diff --git a/pkg/preparation/dags/dags.go b/pkg/preparation/dags/dags.go index 6d126e92..bbfc66c6 100644 --- a/pkg/preparation/dags/dags.go +++ b/pkg/preparation/dags/dags.go @@ -50,44 +50,58 @@ var _ uploads.RemoveBadNodesFunc = API{}.RemoveBadNodes // ExecuteDagScansForUpload runs all pending and awaiting children DAG scans for the given upload, until there are no more scans to process. func (a API) ExecuteDagScansForUpload(ctx context.Context, uploadID id.UploadID, nodeCB func(node model.Node, data []byte) error) error { - var badFsEntryErrs []types.BadFSEntryError for { - ctx, span := tracer.Start(ctx, "dag-scans-batch", trace.WithAttributes( + passCtx, span := tracer.Start(ctx, "dag-scans-batch", trace.WithAttributes( attribute.String("upload.id", uploadID.String()), )) - defer span.End() // In case of early return - dagScans, err := a.Repo.IncompleteDAGScansForUpload(ctx, uploadID) + dagScans, err := a.Repo.IncompleteDAGScansForUpload(passCtx, uploadID) if err != nil { + span.End() return fmt.Errorf("getting dag scans for upload %s: %w", uploadID, err) } log.Debugf("Found %d pending or awaiting children dag scans for upload %s", len(dagScans), uploadID) if len(dagScans) == 0 { + span.End() return nil // No pending or awaiting children scans found, exit the loop } - executions := 0 + + // badFsEntryErrs is reset each pass so that, when the loop exits because + // a pass made no completions, we return only the bad entries discovered + // in the final pass — not accumulated duplicates from earlier passes. + var badFsEntryErrs []types.BadFSEntryError + completions := 0 for _, dagScan := range dagScans { if dagScan.CID().Defined() { + span.End() return fmt.Errorf("tried to execute completed dag scan %s (cid %s)", dagScan.FsEntryID(), dagScan.CID()) } log.Debugf("Executing dag scan %s", dagScan.FsEntryID()) - if err := a.executeDAGScan(ctx, dagScan, nodeCB); err != nil { + completed, err := a.executeDAGScan(passCtx, dagScan, nodeCB) + if err != nil { var errBadFSEntry types.BadFSEntryError if errors.As(err, &errBadFSEntry) { badFsEntryErrs = append(badFsEntryErrs, errBadFSEntry) continue // Continue with the next dag scan } + span.End() return fmt.Errorf("executing dag scan %s: %w", dagScan.FsEntryID(), err) } - executions++ - span.SetAttributes(attribute.Int("executed", executions)) + if completed { + completions++ + span.SetAttributes(attribute.Int("executed", completions)) + } } span.End() - if executions == 0 { + // Only completions count as progress. A directory that was deferred + // because its children are still incomplete returns (false, nil) and + // must not keep the loop alive — otherwise a bad file child causes its + // parent to "execute" forever without ever completing. + if completions == 0 { if len(badFsEntryErrs) > 0 { return types.NewBadFSEntriesError(badFsEntryErrs) } @@ -96,8 +110,11 @@ func (a API) ExecuteDagScansForUpload(ctx context.Context, uploadID id.UploadID, } } -// executeDAGScan executes a dag scan on the given fs entry, creating a unix fs dag for the given file or directory. -func (a API) executeDAGScan(ctx context.Context, dagScan model.DAGScan, nodeCB func(node model.Node, data []byte) error) error { +// executeDAGScan executes a dag scan on the given fs entry, creating a unix fs +// dag for the given file or directory. The returned boolean reports whether +// the scan completed (got a CID and was persisted); a directory scan deferred +// because of incomplete children returns (false, nil). +func (a API) executeDAGScan(ctx context.Context, dagScan model.DAGScan, nodeCB func(node model.Node, data []byte) error) (bool, error) { var err error var cid cid.Cid switch ds := dagScan.(type) { @@ -106,33 +123,33 @@ func (a API) executeDAGScan(ctx context.Context, dagScan model.DAGScan, nodeCB f case *model.DirectoryDAGScan: cid, err = a.executeDirectoryDAGScan(ctx, ds, nodeCB) default: - return fmt.Errorf("unrecognized DAG scan type: %T", dagScan) + return false, fmt.Errorf("unrecognized DAG scan type: %T", dagScan) } if err != nil { if errors.Is(err, context.Canceled) { - return fmt.Errorf("executing dag scan: %w", err) + return false, fmt.Errorf("executing dag scan: %w", err) } - return types.NewBadFSEntryError(dagScan.FsEntryID(), err) + return false, types.NewBadFSEntryError(dagScan.FsEntryID(), err) } // If we didn't get a CID back, it means the scan wasn't ready to complete. if !cid.Defined() { - return nil + return false, nil } log.Debugf("Completing DAG scan for %s with CID: %s", dagScan.FsEntryID(), cid) if err := dagScan.Complete(cid); err != nil { - return fmt.Errorf("completing dag scan: %w", err) + return false, fmt.Errorf("completing dag scan: %w", err) } // Update the scan in the repository after completion. log.Debugf("Updating dag scan %s after execution", dagScan.FsEntryID()) if err := a.Repo.UpdateDAGScan(ctx, dagScan); err != nil { - return fmt.Errorf("updating dag scan after execution: %w", err) + return false, fmt.Errorf("updating dag scan after execution: %w", err) } - return nil + return true, nil } func (a API) executeFileDAGScan(ctx context.Context, dagScan *model.FileDAGScan, nodeCB func(node model.Node, data []byte) error) (cid.Cid, error) { diff --git a/pkg/preparation/dags/dags_test.go b/pkg/preparation/dags/dags_test.go new file mode 100644 index 00000000..a943cb0b --- /dev/null +++ b/pkg/preparation/dags/dags_test.go @@ -0,0 +1,80 @@ +package dags_test + +import ( + "context" + "errors" + "io/fs" + "testing" + "time" + + "github.com/storacha/go-libstoracha/testutil" + "github.com/storacha/guppy/pkg/preparation/dags" + dagmodel "github.com/storacha/guppy/pkg/preparation/dags/model" + "github.com/storacha/guppy/pkg/preparation/internal/testdb" + scanmodel "github.com/storacha/guppy/pkg/preparation/scans/model" + "github.com/storacha/guppy/pkg/preparation/sqlrepo" + "github.com/storacha/guppy/pkg/preparation/types" + "github.com/storacha/guppy/pkg/preparation/types/id" + "github.com/stretchr/testify/require" +) + +// TestExecuteDagScansForUpload_TerminatesWhenBadChildBlocksParent is a +// regression test for a livelock in [dags.API.ExecuteDagScansForUpload]. An +// unreadable file leaves its parent directory perpetually "awaiting children": +// previously, the deferred directory scan was counted as progress on every +// pass, so the outer loop never returned and the upload pipeline appeared +// stuck (see stack.txt). With the fix, only completed scans count as progress, +// so a pass with nothing but bad files and their (blocked) ancestors exits +// with [types.BadFSEntriesError], which the upload worker can then hand to +// [uploads.API.handleBadFSEntries]. +func TestExecuteDagScansForUpload_TerminatesWhenBadChildBlocksParent(t *testing.T) { + repo := testutil.Must(sqlrepo.New(testdb.CreateTestDB(t)))(t) + spaceDID := testutil.RandomDID(t) + sourceID := id.New() + uploadID := id.New() + modTime := time.Now().UTC().Truncate(time.Second) + + parentDir, _, err := repo.FindOrCreateDirectory(t.Context(), "parent", modTime, fs.ModeDir|0755, []byte("parent-cksum"), sourceID, spaceDID) + require.NoError(t, err) + + badFile, _, err := repo.FindOrCreateFile(t.Context(), "parent/bad.txt", modTime, 0644, 100, []byte("bad-cksum"), sourceID, spaceDID) + require.NoError(t, err) + + require.NoError(t, repo.CreateDirectoryChildren(t.Context(), parentDir, []scanmodel.FSEntry{badFile})) + + _, err = repo.CreateDAGScan(t.Context(), badFile.ID(), false, uploadID, spaceDID) + require.NoError(t, err) + _, err = repo.CreateDAGScan(t.Context(), parentDir.ID(), true, uploadID, spaceDID) + require.NoError(t, err) + + errFileUnavailable := errors.New("simulated unreadable file (e.g. broken symlink)") + api := dags.API{ + Repo: repo, + FileAccessor: func(ctx context.Context, fsEntryID id.FSEntryID) (fs.File, id.SourceID, string, error) { + return nil, id.Nil, "", errFileUnavailable + }, + } + + // A bounded deadline is the backstop: if the livelock regresses, the + // function never returns and this test fails with context.DeadlineExceeded + // instead of hanging the whole suite. + ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second) + defer cancel() + + nodeCalls := 0 + nodeCB := func(node dagmodel.Node, data []byte) error { + nodeCalls++ + return nil + } + + err = api.ExecuteDagScansForUpload(ctx, uploadID, nodeCB) + require.Error(t, err) + require.NotErrorIs(t, err, context.DeadlineExceeded, "ExecuteDagScansForUpload hung — the bad-child livelock has regressed") + + var badEntriesErr types.BadFSEntriesError + require.ErrorAs(t, err, &badEntriesErr) + require.Len(t, badEntriesErr.Errs(), 1) + require.Equal(t, badFile.ID(), badEntriesErr.Errs()[0].FsEntryID()) + require.ErrorIs(t, badEntriesErr.Errs()[0], errFileUnavailable) + require.Zero(t, nodeCalls, "no nodes should be produced when the only file is unreadable") +}