diff --git a/pkg/preparation/dags/model/node.go b/pkg/preparation/dags/model/node.go index f3594a82..1e0e65cd 100644 --- a/pkg/preparation/dags/model/node.go +++ b/pkg/preparation/dags/model/node.go @@ -282,11 +282,20 @@ func ReadNodeFromDatabase(scanner NodeScanner) (Node, error) { } return unixFSNode, nil case cid.Raw: + var p string + var o uint64 + if path != nil { + p = *path + } + if offset != nil { + o = *offset + } + rawNode := &RawNode{ node: node, - path: *path, + path: p, sourceID: sourceID, - offset: *offset, + offset: o, } if err := validateRawNode(rawNode); err != nil { return nil, err diff --git a/pkg/preparation/sqlrepo/dags.go b/pkg/preparation/sqlrepo/dags.go index 398b8408..5f3fd3a1 100644 --- a/pkg/preparation/sqlrepo/dags.go +++ b/pkg/preparation/sqlrepo/dags.go @@ -185,41 +185,6 @@ func (r *Repo) DirectoryLinks(ctx context.Context, dirScan *model.DirectoryDAGSc return links, nil } -type nodeFinder struct { - r *Repo - stmt *sql.Stmt - ctx context.Context -} - -func (n nodeFinder) Find(spaceDID did.DID, c cid.Cid, tx *sql.Tx) (model.Node, error) { - stmt := n.stmt - if tx != nil { - stmt = tx.StmtContext(n.ctx, stmt) - } - row := stmt.QueryRowContext(n.ctx, util.DbCID(&c), util.DbDID(&spaceDID)) - return n.r.getNodeFromRow(row) -} - -func (r *Repo) nodeFinder(ctx context.Context) (nodeFinder, error) { - stmt, err := r.prepareStmt(ctx, ` - SELECT - cid, - size, - space_did, - ufsdata, - path, - source_id, - offset - FROM nodes - WHERE cid = ? - AND space_did = ? - `) - if err != nil { - return nodeFinder{}, fmt.Errorf("failed to prepare statement: %w", err) - } - return nodeFinder{r: r, stmt: stmt, ctx: ctx}, nil -} - // RowScanner can scan a row into a set of destinations. It should not be // confused with [sql.Scanner], which is used to scan a single value. type RowScanner interface { @@ -246,9 +211,10 @@ type nodeCreator struct { func (r *Repo) nodeCreator(ctx context.Context) (nodeCreator, error) { stmt, err := r.prepareStmt(ctx, ` - INSERT INTO nodes (cid, size, space_did, ufsdata, path, source_id, offset) - VALUES (?, ?, ?, ?, ?, ?, ?) - `) + INSERT INTO nodes (cid, size, space_did, ufsdata, path, source_id, offset) + VALUES (?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(cid, space_did) DO NOTHING + `) if err != nil { return nodeCreator{}, fmt.Errorf("failed to prepare statement: %w", err) @@ -305,8 +271,7 @@ func (n nodeCreator) Create(node model.Node, uploadID id.UploadID, tx *sql.Tx) e uploadID, ) var dummy int - err = row.Scan(&dummy) - if err == nil { + if err := row.Scan(&dummy); err == nil { return nil // Record already exists, nothing to do } createNodeUploadStmt := n.createNodeUploadStmt @@ -324,66 +289,79 @@ func (n nodeCreator) Create(node model.Node, uploadID id.UploadID, tx *sql.Tx) e // If a node with the same CID, size, path, source ID, and offset already exists, it returns that node. // If not, it creates a new raw node with the provided parameters. func (r *Repo) FindOrCreateRawNode(ctx context.Context, cid cid.Cid, size uint64, spaceDID did.DID, uploadID id.UploadID, path string, sourceID id.SourceID, offset uint64) (*model.RawNode, bool, error) { - nf, err := r.nodeFinder(ctx) + // Check if the node is already linked to this upload. + var existsInUpload bool + err := r.db.QueryRowContext(ctx, ` + SELECT EXISTS ( + SELECT 1 FROM node_uploads + WHERE node_cid = ? AND space_did = ? AND upload_id = ? + ) + `, util.DbCID(&cid), util.DbDID(&spaceDID), uploadID).Scan(&existsInUpload) + if err != nil { - return nil, false, err + return nil, false, fmt.Errorf("checking node upload existence: %w", err) + } + + if existsInUpload { + node, err := model.NewRawNode(cid, size, spaceDID, path, sourceID, offset) + if err != nil { + return nil, false, err + } + return node, false, nil } nc, err := r.nodeCreator(ctx) if err != nil { return nil, false, err } + tx, err := r.db.BeginTx(ctx, nil) if err != nil { return nil, false, err } defer tx.Rollback() - node, err := nf.Find(spaceDID, cid, tx) + newNode, err := model.NewRawNode(cid, size, spaceDID, path, sourceID, offset) if err != nil { return nil, false, err } - if node != nil { - // File already exists, return it - if rawNode, ok := node.(*model.RawNode); ok { - return rawNode, false, tx.Commit() - } - return nil, false, errors.New("found entry is not a raw node") - } - newNode, err := model.NewRawNode(cid, size, spaceDID, path, sourceID, offset) + err = nc.Create(newNode, uploadID, tx) if err != nil { return nil, false, err } - err = nc.Create(newNode, uploadID, tx) - if err != nil { + if err := tx.Commit(); err != nil { return nil, false, err } - return newNode, true, tx.Commit() + return newNode, true, nil } // FindOrCreateUnixFSNode finds or creates a UnixFS node in the repository. // If a node with the same CID, size, and ufsdata already exists, it returns that node. // If not, it creates a new UnixFS node with the provided parameters. func (r *Repo) FindOrCreateUnixFSNode(ctx context.Context, cid cid.Cid, size uint64, spaceDID did.DID, uploadID id.UploadID, ufsdata []byte, linkParams []model.LinkParams) (*model.UnixFSNode, bool, error) { - insertQuery, err := r.prepareStmt(ctx, ` - INSERT INTO links ( - name, - t_size, - hash, - parent_id, - space_did, - ordering - ) VALUES (?, ?, ?, ?, ?, ?)`) + // Check if complete + var existsInUpload bool + err := r.db.QueryRowContext(ctx, ` + SELECT EXISTS ( + SELECT 1 FROM node_uploads + WHERE node_cid = ? AND space_did = ? AND upload_id = ? + ) + `, util.DbCID(&cid), util.DbDID(&spaceDID), uploadID).Scan(&existsInUpload) + if err != nil { - return nil, false, fmt.Errorf("failed to prepare statement: %w", err) + return nil, false, fmt.Errorf("checking node upload existence: %w", err) } - nf, err := r.nodeFinder(ctx) - if err != nil { - return nil, false, err + if existsInUpload { + newNode, err := model.NewUnixFSNode(cid, size, spaceDID, ufsdata) + if err != nil { + return nil, false, err + } + return newNode, false, nil } + nc, err := r.nodeCreator(ctx) if err != nil { return nil, false, err @@ -395,62 +373,56 @@ func (r *Repo) FindOrCreateUnixFSNode(ctx context.Context, cid cid.Cid, size uin } defer tx.Rollback() - node, err := nf.Find(spaceDID, cid, tx) - if err != nil { - return nil, false, err - } - if node != nil { - // File already exists, return it - if unixFSNode, ok := node.(*model.UnixFSNode); ok { - return unixFSNode, false, tx.Commit() - } - return nil, false, errors.New("found entry is not a UnixFS node") - } - newNode, err := model.NewUnixFSNode(cid, size, spaceDID, ufsdata) if err != nil { return nil, false, err } err = nc.Create(newNode, uploadID, tx) - if err != nil { return nil, false, err } - if len(linkParams) == 0 { - return newNode, true, tx.Commit() - } - - links := make([]*model.Link, 0, len(linkParams)) - for i, p := range linkParams { - link, err := model.NewLink(p, cid, spaceDID, uint64(i)) + // Insert Links + if len(linkParams) > 0 { + insertLinkStmt, err := tx.PrepareContext(ctx, ` + INSERT INTO links (name, t_size, hash, parent_id, space_did, ordering) + VALUES (?, ?, ?, ?, ?, ?) + ON CONFLICT DO NOTHING + `) if err != nil { return nil, false, err } - links = append(links, link) - } - - insertQuery = tx.StmtContext(ctx, insertQuery) - - for _, link := range links { - hash := link.Hash() - parent := link.Parent() - _, err := insertQuery.ExecContext( - ctx, - link.Name(), - link.TSize(), - util.DbCID(&hash), - util.DbCID(&parent), - util.DbDID(&spaceDID), - link.Order(), - ) - if err != nil { - return nil, false, fmt.Errorf("failed to insert link %s for parent %s: %w", link.Name(), parent, err) + for i, p := range linkParams { + link, err := model.NewLink(p, cid, spaceDID, uint64(i)) + if err != nil { + return nil, false, err + } + hash := link.Hash() + parent := link.Parent() + + _, err = insertLinkStmt.ExecContext( + ctx, + link.Name(), + link.TSize(), + util.DbCID(&hash), + util.DbCID(&parent), + util.DbDID(&spaceDID), + link.Order(), + ) + if err != nil { + return nil, false, err + } } } - return newNode, true, tx.Commit() + + if err := tx.Commit(); err != nil { + return nil, false, err + } + + // Return created=true to trigger the callback and link the node to the upload/shard. + return newNode, true, nil } // HasIncompleteChildren returns whether the given directory scan has at least