From 17375b79b125eea474f70c63fee4eb77ea181410 Mon Sep 17 00:00:00 2001 From: frrist Date: Thu, 16 Apr 2026 19:01:14 -0700 Subject: [PATCH] fix: avoid DAG scan livelock on bad FS entries MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit An unreadable file (e.g. a broken /etc/alternatives symlink under /bin) produced a `types.BadFSEntryError` that left its `dag_scans` row with `cid IS NULL`. The parent directory's scan then returned `(cid.Undef, nil)` — "no error, deferred on incomplete children" — and the outer loop in `ExecuteDagScansForUpload` counted that as progress, so the `executions == 0 → return BadFSEntriesError` guard never fired and the upload pipeline appeared stuck. `executeDAGScan` now returns a `(completed, error)` pair so the outer loop can distinguish "scan got a CID" from "scan was deferred". Only completions count as progress, so a pass containing nothing but bad files and their blocked ancestors now exits with `BadFSEntriesError`, which the existing handler in `uploads.go` cleans up and the `--retry` path in `cmd/upload` can then resume. `badFsEntryErrs` is reset per pass to avoid duplicate entries in the returned error, and the stray in-loop `defer span.End()` is replaced with explicit calls on each exit path. Adds a regression test under a bounded deadline so any future reintroduction of the livelock fails fast instead of hanging. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- pkg/preparation/dags/dags.go | 53 +++++++++++++------- pkg/preparation/dags/dags_test.go | 80 +++++++++++++++++++++++++++++++ 2 files changed, 115 insertions(+), 18 deletions(-) create mode 100644 pkg/preparation/dags/dags_test.go diff --git a/pkg/preparation/dags/dags.go b/pkg/preparation/dags/dags.go index 6d126e92..bbfc66c6 100644 --- a/pkg/preparation/dags/dags.go +++ b/pkg/preparation/dags/dags.go @@ -50,44 +50,58 @@ var _ uploads.RemoveBadNodesFunc = API{}.RemoveBadNodes // ExecuteDagScansForUpload runs all pending and awaiting children DAG scans for the given upload, until there are no more scans to process. func (a API) ExecuteDagScansForUpload(ctx context.Context, uploadID id.UploadID, nodeCB func(node model.Node, data []byte) error) error { - var badFsEntryErrs []types.BadFSEntryError for { - ctx, span := tracer.Start(ctx, "dag-scans-batch", trace.WithAttributes( + passCtx, span := tracer.Start(ctx, "dag-scans-batch", trace.WithAttributes( attribute.String("upload.id", uploadID.String()), )) - defer span.End() // In case of early return - dagScans, err := a.Repo.IncompleteDAGScansForUpload(ctx, uploadID) + dagScans, err := a.Repo.IncompleteDAGScansForUpload(passCtx, uploadID) if err != nil { + span.End() return fmt.Errorf("getting dag scans for upload %s: %w", uploadID, err) } log.Debugf("Found %d pending or awaiting children dag scans for upload %s", len(dagScans), uploadID) if len(dagScans) == 0 { + span.End() return nil // No pending or awaiting children scans found, exit the loop } - executions := 0 + + // badFsEntryErrs is reset each pass so that, when the loop exits because + // a pass made no completions, we return only the bad entries discovered + // in the final pass — not accumulated duplicates from earlier passes. 
+ var badFsEntryErrs []types.BadFSEntryError + completions := 0 for _, dagScan := range dagScans { if dagScan.CID().Defined() { + span.End() return fmt.Errorf("tried to execute completed dag scan %s (cid %s)", dagScan.FsEntryID(), dagScan.CID()) } log.Debugf("Executing dag scan %s", dagScan.FsEntryID()) - if err := a.executeDAGScan(ctx, dagScan, nodeCB); err != nil { + completed, err := a.executeDAGScan(passCtx, dagScan, nodeCB) + if err != nil { var errBadFSEntry types.BadFSEntryError if errors.As(err, &errBadFSEntry) { badFsEntryErrs = append(badFsEntryErrs, errBadFSEntry) continue // Continue with the next dag scan } + span.End() return fmt.Errorf("executing dag scan %s: %w", dagScan.FsEntryID(), err) } - executions++ - span.SetAttributes(attribute.Int("executed", executions)) + if completed { + completions++ + span.SetAttributes(attribute.Int("executed", completions)) + } } span.End() - if executions == 0 { + // Only completions count as progress. A directory that was deferred + // because its children are still incomplete returns (false, nil) and + // must not keep the loop alive — otherwise a bad file child causes its + // parent to "execute" forever without ever completing. + if completions == 0 { if len(badFsEntryErrs) > 0 { return types.NewBadFSEntriesError(badFsEntryErrs) } @@ -96,8 +110,11 @@ func (a API) ExecuteDagScansForUpload(ctx context.Context, uploadID id.UploadID, } } -// executeDAGScan executes a dag scan on the given fs entry, creating a unix fs dag for the given file or directory. -func (a API) executeDAGScan(ctx context.Context, dagScan model.DAGScan, nodeCB func(node model.Node, data []byte) error) error { +// executeDAGScan executes a dag scan on the given fs entry, creating a unix fs +// dag for the given file or directory. The returned boolean reports whether +// the scan completed (got a CID and was persisted); a directory scan deferred +// because of incomplete children returns (false, nil). 
+func (a API) executeDAGScan(ctx context.Context, dagScan model.DAGScan, nodeCB func(node model.Node, data []byte) error) (bool, error) { var err error var cid cid.Cid switch ds := dagScan.(type) { @@ -106,33 +123,33 @@ func (a API) executeDAGScan(ctx context.Context, dagScan model.DAGScan, nodeCB f case *model.DirectoryDAGScan: cid, err = a.executeDirectoryDAGScan(ctx, ds, nodeCB) default: - return fmt.Errorf("unrecognized DAG scan type: %T", dagScan) + return false, fmt.Errorf("unrecognized DAG scan type: %T", dagScan) } if err != nil { if errors.Is(err, context.Canceled) { - return fmt.Errorf("executing dag scan: %w", err) + return false, fmt.Errorf("executing dag scan: %w", err) } - return types.NewBadFSEntryError(dagScan.FsEntryID(), err) + return false, types.NewBadFSEntryError(dagScan.FsEntryID(), err) } // If we didn't get a CID back, it means the scan wasn't ready to complete. if !cid.Defined() { - return nil + return false, nil } log.Debugf("Completing DAG scan for %s with CID: %s", dagScan.FsEntryID(), cid) if err := dagScan.Complete(cid); err != nil { - return fmt.Errorf("completing dag scan: %w", err) + return false, fmt.Errorf("completing dag scan: %w", err) } // Update the scan in the repository after completion. 
log.Debugf("Updating dag scan %s after execution", dagScan.FsEntryID()) if err := a.Repo.UpdateDAGScan(ctx, dagScan); err != nil { - return fmt.Errorf("updating dag scan after execution: %w", err) + return false, fmt.Errorf("updating dag scan after execution: %w", err) } - return nil + return true, nil } func (a API) executeFileDAGScan(ctx context.Context, dagScan *model.FileDAGScan, nodeCB func(node model.Node, data []byte) error) (cid.Cid, error) { diff --git a/pkg/preparation/dags/dags_test.go b/pkg/preparation/dags/dags_test.go new file mode 100644 index 00000000..a943cb0b --- /dev/null +++ b/pkg/preparation/dags/dags_test.go @@ -0,0 +1,80 @@ +package dags_test + +import ( + "context" + "errors" + "io/fs" + "testing" + "time" + + "github.com/storacha/go-libstoracha/testutil" + "github.com/storacha/guppy/pkg/preparation/dags" + dagmodel "github.com/storacha/guppy/pkg/preparation/dags/model" + "github.com/storacha/guppy/pkg/preparation/internal/testdb" + scanmodel "github.com/storacha/guppy/pkg/preparation/scans/model" + "github.com/storacha/guppy/pkg/preparation/sqlrepo" + "github.com/storacha/guppy/pkg/preparation/types" + "github.com/storacha/guppy/pkg/preparation/types/id" + "github.com/stretchr/testify/require" +) + +// TestExecuteDagScansForUpload_TerminatesWhenBadChildBlocksParent is a +// regression test for a livelock in [dags.API.ExecuteDagScansForUpload]. An +// unreadable file leaves its parent directory perpetually "awaiting children": +// previously, the deferred directory scan was counted as progress on every +// pass, so the outer loop never returned and the upload pipeline appeared +// stuck. With the fix, only completed scans count as progress, +// so a pass with nothing but bad files and their (blocked) ancestors exits +// with [types.BadFSEntriesError], which the upload worker can then hand to +// [uploads.API.handleBadFSEntries]. 
+func TestExecuteDagScansForUpload_TerminatesWhenBadChildBlocksParent(t *testing.T) { + repo := testutil.Must(sqlrepo.New(testdb.CreateTestDB(t)))(t) + spaceDID := testutil.RandomDID(t) + sourceID := id.New() + uploadID := id.New() + modTime := time.Now().UTC().Truncate(time.Second) + + parentDir, _, err := repo.FindOrCreateDirectory(t.Context(), "parent", modTime, fs.ModeDir|0755, []byte("parent-cksum"), sourceID, spaceDID) + require.NoError(t, err) + + badFile, _, err := repo.FindOrCreateFile(t.Context(), "parent/bad.txt", modTime, 0644, 100, []byte("bad-cksum"), sourceID, spaceDID) + require.NoError(t, err) + + require.NoError(t, repo.CreateDirectoryChildren(t.Context(), parentDir, []scanmodel.FSEntry{badFile})) + + _, err = repo.CreateDAGScan(t.Context(), badFile.ID(), false, uploadID, spaceDID) + require.NoError(t, err) + _, err = repo.CreateDAGScan(t.Context(), parentDir.ID(), true, uploadID, spaceDID) + require.NoError(t, err) + + errFileUnavailable := errors.New("simulated unreadable file (e.g. broken symlink)") + api := dags.API{ + Repo: repo, + FileAccessor: func(ctx context.Context, fsEntryID id.FSEntryID) (fs.File, id.SourceID, string, error) { + return nil, id.Nil, "", errFileUnavailable + }, + } + + // A bounded deadline is the backstop: if the livelock regresses, the + // function never returns and this test fails with context.DeadlineExceeded + // instead of hanging the whole suite. 
+ ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second) + defer cancel() + + nodeCalls := 0 + nodeCB := func(node dagmodel.Node, data []byte) error { + nodeCalls++ + return nil + } + + err = api.ExecuteDagScansForUpload(ctx, uploadID, nodeCB) + require.Error(t, err) + require.NotErrorIs(t, err, context.DeadlineExceeded, "ExecuteDagScansForUpload hung — the bad-child livelock has regressed") + + var badEntriesErr types.BadFSEntriesError + require.ErrorAs(t, err, &badEntriesErr) + require.Len(t, badEntriesErr.Errs(), 1) + require.Equal(t, badFile.ID(), badEntriesErr.Errs()[0].FsEntryID()) + require.ErrorIs(t, badEntriesErr.Errs()[0], errFileUnavailable) + require.Zero(t, nodeCalls, "no nodes should be produced when the only file is unreadable") +}