Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions pkg/preparation/dags/model/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,11 +282,20 @@ func ReadNodeFromDatabase(scanner NodeScanner) (Node, error) {
}
return unixFSNode, nil
case cid.Raw:
var p string
var o uint64
if path != nil {
p = *path
}
if offset != nil {
o = *offset
}

rawNode := &RawNode{
node: node,
path: *path,
path: p,
sourceID: sourceID,
offset: *offset,
offset: o,
}
if err := validateRawNode(rawNode); err != nil {
return nil, err
Expand Down
186 changes: 79 additions & 107 deletions pkg/preparation/sqlrepo/dags.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,41 +185,6 @@ func (r *Repo) DirectoryLinks(ctx context.Context, dirScan *model.DirectoryDAGSc
return links, nil
}

type nodeFinder struct {
r *Repo
stmt *sql.Stmt
ctx context.Context
}

func (n nodeFinder) Find(spaceDID did.DID, c cid.Cid, tx *sql.Tx) (model.Node, error) {
stmt := n.stmt
if tx != nil {
stmt = tx.StmtContext(n.ctx, stmt)
}
row := stmt.QueryRowContext(n.ctx, util.DbCID(&c), util.DbDID(&spaceDID))
return n.r.getNodeFromRow(row)
}

func (r *Repo) nodeFinder(ctx context.Context) (nodeFinder, error) {
stmt, err := r.prepareStmt(ctx, `
SELECT
cid,
size,
space_did,
ufsdata,
path,
source_id,
offset
FROM nodes
WHERE cid = ?
AND space_did = ?
`)
if err != nil {
return nodeFinder{}, fmt.Errorf("failed to prepare statement: %w", err)
}
return nodeFinder{r: r, stmt: stmt, ctx: ctx}, nil
}

// RowScanner can scan a row into a set of destinations. It should not be
// confused with [sql.Scanner], which is used to scan a single value.
type RowScanner interface {
Expand All @@ -246,9 +211,10 @@ type nodeCreator struct {

func (r *Repo) nodeCreator(ctx context.Context) (nodeCreator, error) {
stmt, err := r.prepareStmt(ctx, `
INSERT INTO nodes (cid, size, space_did, ufsdata, path, source_id, offset)
VALUES (?, ?, ?, ?, ?, ?, ?)
`)
INSERT INTO nodes (cid, size, space_did, ufsdata, path, source_id, offset)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(cid, space_did) DO NOTHING
`)

if err != nil {
return nodeCreator{}, fmt.Errorf("failed to prepare statement: %w", err)
Expand Down Expand Up @@ -305,8 +271,7 @@ func (n nodeCreator) Create(node model.Node, uploadID id.UploadID, tx *sql.Tx) e
uploadID,
)
var dummy int
err = row.Scan(&dummy)
if err == nil {
if err := row.Scan(&dummy); err == nil {
return nil // Record already exists, nothing to do
}
createNodeUploadStmt := n.createNodeUploadStmt
Expand All @@ -324,66 +289,79 @@ func (n nodeCreator) Create(node model.Node, uploadID id.UploadID, tx *sql.Tx) e
// If a node with the same CID, size, path, source ID, and offset already exists, it returns that node.
// If not, it creates a new raw node with the provided parameters.
func (r *Repo) FindOrCreateRawNode(ctx context.Context, cid cid.Cid, size uint64, spaceDID did.DID, uploadID id.UploadID, path string, sourceID id.SourceID, offset uint64) (*model.RawNode, bool, error) {
nf, err := r.nodeFinder(ctx)
// Check if the node is already linked to this upload.
var existsInUpload bool
err := r.db.QueryRowContext(ctx, `
SELECT EXISTS (
SELECT 1 FROM node_uploads
WHERE node_cid = ? AND space_did = ? AND upload_id = ?
)
`, util.DbCID(&cid), util.DbDID(&spaceDID), uploadID).Scan(&existsInUpload)

if err != nil {
return nil, false, err
return nil, false, fmt.Errorf("checking node upload existence: %w", err)
}

if existsInUpload {
node, err := model.NewRawNode(cid, size, spaceDID, path, sourceID, offset)
if err != nil {
return nil, false, err
}
return node, false, nil
}
nc, err := r.nodeCreator(ctx)
if err != nil {
return nil, false, err
}

tx, err := r.db.BeginTx(ctx, nil)
if err != nil {
return nil, false, err
}
defer tx.Rollback()

node, err := nf.Find(spaceDID, cid, tx)
newNode, err := model.NewRawNode(cid, size, spaceDID, path, sourceID, offset)
if err != nil {
return nil, false, err
}
if node != nil {
// File already exists, return it
if rawNode, ok := node.(*model.RawNode); ok {
return rawNode, false, tx.Commit()
}
return nil, false, errors.New("found entry is not a raw node")
}

newNode, err := model.NewRawNode(cid, size, spaceDID, path, sourceID, offset)
err = nc.Create(newNode, uploadID, tx)
if err != nil {
return nil, false, err
}

err = nc.Create(newNode, uploadID, tx)
if err != nil {
if err := tx.Commit(); err != nil {
return nil, false, err
}

return newNode, true, tx.Commit()
return newNode, true, nil
}

// FindOrCreateUnixFSNode finds or creates a UnixFS node in the repository.
// If a node with the same CID, size, and ufsdata already exists, it returns that node.
// If not, it creates a new UnixFS node with the provided parameters.
func (r *Repo) FindOrCreateUnixFSNode(ctx context.Context, cid cid.Cid, size uint64, spaceDID did.DID, uploadID id.UploadID, ufsdata []byte, linkParams []model.LinkParams) (*model.UnixFSNode, bool, error) {
insertQuery, err := r.prepareStmt(ctx, `
INSERT INTO links (
name,
t_size,
hash,
parent_id,
space_did,
ordering
) VALUES (?, ?, ?, ?, ?, ?)`)
// Check if complete
var existsInUpload bool
err := r.db.QueryRowContext(ctx, `
SELECT EXISTS (
SELECT 1 FROM node_uploads
WHERE node_cid = ? AND space_did = ? AND upload_id = ?
)
`, util.DbCID(&cid), util.DbDID(&spaceDID), uploadID).Scan(&existsInUpload)

if err != nil {
return nil, false, fmt.Errorf("failed to prepare statement: %w", err)
return nil, false, fmt.Errorf("checking node upload existence: %w", err)
}

nf, err := r.nodeFinder(ctx)
if err != nil {
return nil, false, err
if existsInUpload {
newNode, err := model.NewUnixFSNode(cid, size, spaceDID, ufsdata)
if err != nil {
return nil, false, err
}
return newNode, false, nil
}

nc, err := r.nodeCreator(ctx)
if err != nil {
return nil, false, err
Expand All @@ -395,62 +373,56 @@ func (r *Repo) FindOrCreateUnixFSNode(ctx context.Context, cid cid.Cid, size uin
}
defer tx.Rollback()

node, err := nf.Find(spaceDID, cid, tx)
if err != nil {
return nil, false, err
}
if node != nil {
// File already exists, return it
if unixFSNode, ok := node.(*model.UnixFSNode); ok {
return unixFSNode, false, tx.Commit()
}
return nil, false, errors.New("found entry is not a UnixFS node")
}

newNode, err := model.NewUnixFSNode(cid, size, spaceDID, ufsdata)
if err != nil {
return nil, false, err
}

err = nc.Create(newNode, uploadID, tx)

if err != nil {
return nil, false, err
}

if len(linkParams) == 0 {
return newNode, true, tx.Commit()
}

links := make([]*model.Link, 0, len(linkParams))
for i, p := range linkParams {
link, err := model.NewLink(p, cid, spaceDID, uint64(i))
// Insert Links
if len(linkParams) > 0 {
insertLinkStmt, err := tx.PrepareContext(ctx, `
INSERT INTO links (name, t_size, hash, parent_id, space_did, ordering)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT DO NOTHING
`)
if err != nil {
return nil, false, err
}
links = append(links, link)
}

insertQuery = tx.StmtContext(ctx, insertQuery)

for _, link := range links {
hash := link.Hash()
parent := link.Parent()

_, err := insertQuery.ExecContext(
ctx,
link.Name(),
link.TSize(),
util.DbCID(&hash),
util.DbCID(&parent),
util.DbDID(&spaceDID),
link.Order(),
)
if err != nil {
return nil, false, fmt.Errorf("failed to insert link %s for parent %s: %w", link.Name(), parent, err)
for i, p := range linkParams {
link, err := model.NewLink(p, cid, spaceDID, uint64(i))
if err != nil {
return nil, false, err
}
hash := link.Hash()
parent := link.Parent()

_, err = insertLinkStmt.ExecContext(
ctx,
link.Name(),
link.TSize(),
util.DbCID(&hash),
util.DbCID(&parent),
util.DbDID(&spaceDID),
link.Order(),
)
if err != nil {
return nil, false, err
}
}
}
return newNode, true, tx.Commit()

if err := tx.Commit(); err != nil {
return nil, false, err
}

// Return created=true to trigger the callback and link the node to the upload/shard.
return newNode, true, nil
}

// HasIncompleteChildren returns whether the given directory scan has at least
Expand Down