Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 35 additions & 18 deletions pkg/preparation/dags/dags.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,44 +50,58 @@ var _ uploads.RemoveBadNodesFunc = API{}.RemoveBadNodes

// ExecuteDagScansForUpload runs all pending and awaiting children DAG scans for the given upload, until there are no more scans to process.
func (a API) ExecuteDagScansForUpload(ctx context.Context, uploadID id.UploadID, nodeCB func(node model.Node, data []byte) error) error {
var badFsEntryErrs []types.BadFSEntryError
for {
ctx, span := tracer.Start(ctx, "dag-scans-batch", trace.WithAttributes(
passCtx, span := tracer.Start(ctx, "dag-scans-batch", trace.WithAttributes(
attribute.String("upload.id", uploadID.String()),
))
defer span.End() // In case of early return

dagScans, err := a.Repo.IncompleteDAGScansForUpload(ctx, uploadID)
dagScans, err := a.Repo.IncompleteDAGScansForUpload(passCtx, uploadID)
if err != nil {
span.End()
return fmt.Errorf("getting dag scans for upload %s: %w", uploadID, err)
}
log.Debugf("Found %d pending or awaiting children dag scans for upload %s", len(dagScans), uploadID)
if len(dagScans) == 0 {
span.End()
return nil // No pending or awaiting children scans found, exit the loop
}
executions := 0

// badFsEntryErrs is reset each pass so that, when the loop exits because
// a pass made no completions, we return only the bad entries discovered
// in the final pass — not accumulated duplicates from earlier passes.
var badFsEntryErrs []types.BadFSEntryError
completions := 0
for _, dagScan := range dagScans {
if dagScan.CID().Defined() {
span.End()
return fmt.Errorf("tried to execute completed dag scan %s (cid %s)", dagScan.FsEntryID(), dagScan.CID())
}

log.Debugf("Executing dag scan %s", dagScan.FsEntryID())
if err := a.executeDAGScan(ctx, dagScan, nodeCB); err != nil {
completed, err := a.executeDAGScan(passCtx, dagScan, nodeCB)
if err != nil {
var errBadFSEntry types.BadFSEntryError
if errors.As(err, &errBadFSEntry) {
badFsEntryErrs = append(badFsEntryErrs, errBadFSEntry)
continue // Continue with the next dag scan
}
span.End()
return fmt.Errorf("executing dag scan %s: %w", dagScan.FsEntryID(), err)
}

executions++
span.SetAttributes(attribute.Int("executed", executions))
if completed {
completions++
span.SetAttributes(attribute.Int("executed", completions))
}
}

span.End()

if executions == 0 {
// Only completions count as progress. A directory that was deferred
// because its children are still incomplete returns (false, nil) and
// must not keep the loop alive — otherwise a bad file child causes its
// parent to "execute" forever without ever completing.
if completions == 0 {
if len(badFsEntryErrs) > 0 {
return types.NewBadFSEntriesError(badFsEntryErrs)
}
Expand All @@ -96,8 +110,11 @@ func (a API) ExecuteDagScansForUpload(ctx context.Context, uploadID id.UploadID,
}
}

// executeDAGScan executes a dag scan on the given fs entry, creating a unix fs dag for the given file or directory.
func (a API) executeDAGScan(ctx context.Context, dagScan model.DAGScan, nodeCB func(node model.Node, data []byte) error) error {
// executeDAGScan executes a dag scan on the given fs entry, creating a unix fs
// dag for the given file or directory. The returned boolean reports whether
// the scan completed (got a CID and was persisted); a directory scan deferred
// because of incomplete children returns (false, nil).
func (a API) executeDAGScan(ctx context.Context, dagScan model.DAGScan, nodeCB func(node model.Node, data []byte) error) (bool, error) {
var err error
var cid cid.Cid
switch ds := dagScan.(type) {
Expand All @@ -106,33 +123,33 @@ func (a API) executeDAGScan(ctx context.Context, dagScan model.DAGScan, nodeCB f
case *model.DirectoryDAGScan:
cid, err = a.executeDirectoryDAGScan(ctx, ds, nodeCB)
default:
return fmt.Errorf("unrecognized DAG scan type: %T", dagScan)
return false, fmt.Errorf("unrecognized DAG scan type: %T", dagScan)
}

if err != nil {
if errors.Is(err, context.Canceled) {
return fmt.Errorf("executing dag scan: %w", err)
return false, fmt.Errorf("executing dag scan: %w", err)
}

return types.NewBadFSEntryError(dagScan.FsEntryID(), err)
return false, types.NewBadFSEntryError(dagScan.FsEntryID(), err)
}

// If we didn't get a CID back, it means the scan wasn't ready to complete.
if !cid.Defined() {
return nil
return false, nil
}

log.Debugf("Completing DAG scan for %s with CID: %s", dagScan.FsEntryID(), cid)
if err := dagScan.Complete(cid); err != nil {
return fmt.Errorf("completing dag scan: %w", err)
return false, fmt.Errorf("completing dag scan: %w", err)
}

// Update the scan in the repository after completion.
log.Debugf("Updating dag scan %s after execution", dagScan.FsEntryID())
if err := a.Repo.UpdateDAGScan(ctx, dagScan); err != nil {
return fmt.Errorf("updating dag scan after execution: %w", err)
return false, fmt.Errorf("updating dag scan after execution: %w", err)
}
return nil
return true, nil
}

func (a API) executeFileDAGScan(ctx context.Context, dagScan *model.FileDAGScan, nodeCB func(node model.Node, data []byte) error) (cid.Cid, error) {
Expand Down
80 changes: 80 additions & 0 deletions pkg/preparation/dags/dags_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package dags_test

import (
"context"
"errors"
"io/fs"
"testing"
"time"

"github.com/storacha/go-libstoracha/testutil"
"github.com/storacha/guppy/pkg/preparation/dags"
dagmodel "github.com/storacha/guppy/pkg/preparation/dags/model"
"github.com/storacha/guppy/pkg/preparation/internal/testdb"
scanmodel "github.com/storacha/guppy/pkg/preparation/scans/model"
"github.com/storacha/guppy/pkg/preparation/sqlrepo"
"github.com/storacha/guppy/pkg/preparation/types"
"github.com/storacha/guppy/pkg/preparation/types/id"
"github.com/stretchr/testify/require"
)

// TestExecuteDagScansForUpload_TerminatesWhenBadChildBlocksParent is a
// regression test for a livelock in [dags.API.ExecuteDagScansForUpload]. An
// unreadable file leaves its parent directory perpetually "awaiting children":
// previously, the deferred directory scan was counted as progress on every
// pass, so the outer loop never returned and the upload pipeline appeared
// stuck (see stack.txt). With the fix, only completed scans count as progress,
// so a pass with nothing but bad files and their (blocked) ancestors exits
// with [types.BadFSEntriesError], which the upload worker can then hand to
// [uploads.API.handleBadFSEntries].
func TestExecuteDagScansForUpload_TerminatesWhenBadChildBlocksParent(t *testing.T) {
repo := testutil.Must(sqlrepo.New(testdb.CreateTestDB(t)))(t)
spaceDID := testutil.RandomDID(t)
sourceID := id.New()
uploadID := id.New()
modTime := time.Now().UTC().Truncate(time.Second)

parentDir, _, err := repo.FindOrCreateDirectory(t.Context(), "parent", modTime, fs.ModeDir|0755, []byte("parent-cksum"), sourceID, spaceDID)
require.NoError(t, err)

badFile, _, err := repo.FindOrCreateFile(t.Context(), "parent/bad.txt", modTime, 0644, 100, []byte("bad-cksum"), sourceID, spaceDID)
require.NoError(t, err)

require.NoError(t, repo.CreateDirectoryChildren(t.Context(), parentDir, []scanmodel.FSEntry{badFile}))

_, err = repo.CreateDAGScan(t.Context(), badFile.ID(), false, uploadID, spaceDID)
require.NoError(t, err)
_, err = repo.CreateDAGScan(t.Context(), parentDir.ID(), true, uploadID, spaceDID)
require.NoError(t, err)

errFileUnavailable := errors.New("simulated unreadable file (e.g. broken symlink)")
api := dags.API{
Repo: repo,
FileAccessor: func(ctx context.Context, fsEntryID id.FSEntryID) (fs.File, id.SourceID, string, error) {
return nil, id.Nil, "", errFileUnavailable
},
}

// A bounded deadline is the backstop: if the livelock regresses, the
// function never returns and this test fails with context.DeadlineExceeded
// instead of hanging the whole suite.
ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second)
defer cancel()

nodeCalls := 0
nodeCB := func(node dagmodel.Node, data []byte) error {
nodeCalls++
return nil
}

err = api.ExecuteDagScansForUpload(ctx, uploadID, nodeCB)
require.Error(t, err)
require.NotErrorIs(t, err, context.DeadlineExceeded, "ExecuteDagScansForUpload hung — the bad-child livelock has regressed")

var badEntriesErr types.BadFSEntriesError
require.ErrorAs(t, err, &badEntriesErr)
require.Len(t, badEntriesErr.Errs(), 1)
require.Equal(t, badFile.ID(), badEntriesErr.Errs()[0].FsEntryID())
require.ErrorIs(t, badEntriesErr.Errs()[0], errFileUnavailable)
require.Zero(t, nodeCalls, "no nodes should be produced when the only file is unreadable")
}
Loading