[ENH]: Allow cascading functions#7129
Conversation
Reviewer ChecklistPlease leverage this checklist to ensure your code review is thorough before approving Testing, Bugs, Errors, Logs, Documentation
System Compatibility
Quality
|
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
| return globalDB.WithContext(ctx) | ||
| } | ||
|
|
||
| func TryGetDB(ctx context.Context) *gorm.DB { |
| return maxDepth, nil | ||
| } | ||
|
|
||
| func (g *attachedFunctionGraphState) incomingCollectionIDs(collectionID string) ([]string, error) { |
There was a problem hiding this comment.
make graph tests
| coordinator *Coordinator | ||
| ctx context.Context | ||
| databaseName string | ||
| incomingByOutput map[string][]*dbmodel.AttachedFunction |
There was a problem hiding this comment.
would be good to comment these fields
| return incoming, nil | ||
| } | ||
|
|
||
| incoming, err := g.coordinator.catalog.metaDomain.AttachedFunctionDb(g.ctx).GetAttachedFunctions(nil, nil, nil, &collectionID, nil, false) |
There was a problem hiding this comment.
is this an indexed lookup, is that a concern if not?
| return outgoing, nil | ||
| } | ||
|
|
||
| outgoing, err := g.coordinator.catalog.metaDomain.AttachedFunctionDb(g.ctx).GetAttachedFunctions(nil, nil, &collectionID, nil, nil, false) |
There was a problem hiding this comment.
is this an indexed lookup, is that a concern if not?
|
|
||
| // Rebuild the graph under locks before validating/inserting. | ||
| graphState, err = s.buildAttachFunctionGraph(txCtx, req.InputCollectionId, existingOutputCollectionID, req.Database) | ||
| if err != nil { |
There was a problem hiding this comment.
didnt detect that it changed??
| return err | ||
| } | ||
|
|
||
| // Rebuild the graph under locks before validating/inserting. |
There was a problem hiding this comment.
seems like we just lock and rebuild?
| } | ||
|
|
||
| // Check if input collection is being used as an output collection by any attached function | ||
| inputCollectionIDStr := req.InputCollectionId |
There was a problem hiding this comment.
would appreciate some commentary here
| functionIDs := make([]uuid.UUID, 0, len(attachedFunctionsUsingAsOutput)) | ||
| seenFunctionIDs := make(map[uuid.UUID]struct{}, len(attachedFunctionsUsingAsOutput)) | ||
| for _, attachedFunction := range attachedFunctionsUsingAsOutput { | ||
| if _, ok := seenFunctionIDs[attachedFunction.FunctionID]; ok { |
There was a problem hiding this comment.
more commenting in this block, it's a bit unreadable
| inputCollectionIDStr := req.InputCollectionId | ||
| attachedFunctionsUsingAsOutput, err := s.catalog.metaDomain.AttachedFunctionDb(txCtx).GetAttachedFunctions(nil, nil, nil, &inputCollectionIDStr, nil, false) | ||
| // Check if output collection already exists so we can materialize and then lock the full graph in a stable order. | ||
| existingOutputCollections, err := s.catalog.metaDomain.CollectionDb(txCtx).GetCollections(nil, &req.OutputCollectionName, req.TenantId, req.Database, nil, nil, false) |
| inputCollectionIDStr := req.InputCollectionId | ||
| attachedFunctionsUsingAsOutput, err := s.catalog.metaDomain.AttachedFunctionDb(txCtx).GetAttachedFunctions(nil, nil, nil, &inputCollectionIDStr, nil, false) | ||
| // Check if output collection already exists so we can materialize and then lock the full graph in a stable order. | ||
| existingOutputCollections, err := s.catalog.metaDomain.CollectionDb(txCtx).GetCollections(nil, &req.OutputCollectionName, req.TenantId, req.Database, nil, nil, false) |
There was a problem hiding this comment.
plurality is confusing
|
|
||
| outputTailDepth := 0 | ||
| if len(existingOutputCollections) > 0 { | ||
| outputTailDepth, err = graphState.collectionTailDepth(existingOutputCollectionID, map[string]int{}, map[string]struct{}{}) |
There was a problem hiding this comment.
should be in 547's if block
| } | ||
|
|
||
| outputTailDepth := 0 | ||
| if len(existingOutputCollections) > 0 { |
There was a problem hiding this comment.
seems like it can be collapes with reaches check block above
cd63962 to
f147dab
Compare
This comment has been minimized.
This comment has been minimized.
2223dbb to
ea10072
Compare

Description of changes
This PR allows "cascading" (chained) functions: attaching a function whose input collection is itself the output of another function. Previously this was unconditionally blocked. Now it's allowed only when every upstream function writing to that collection is async. Sync upstream functions still block the attachment.
Two additional safety checks are also introduced:
maxAttachedFunctionDepth = 5Improvements & Bug fixes
New functionality
Test plan
How are these changes tested?
test_task_api.py has been adjusted for ths PR. Tests have been added and adjusted in that file.
pytestfor python,yarn testfor js,cargo testfor rustMigration plan
Are there any migrations, or any forwards/backwards compatibility changes needed in order to make sure this change deploys reliably?
Observability plan
What is the plan to instrument and monitor this change?
Documentation Changes
Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the _docs section?_