diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index 290a83fb70..194bd0deb3 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -847,25 +847,28 @@ impl MergeInsertJob { &self, source: SendableRecordBatchStream, ) -> Result { - // We need to do a full index scan if we're deleting source data - let can_use_scalar_index = matches!( - self.params.delete_not_matched_by_source, // this value marks behavior for rows in target that are not matched by the source. Value assigned earlier. - WhenNotMatchedBySource::Keep - ) && self.params.use_index; - - if can_use_scalar_index { + if self.params.use_index + && matches!( + self.params.delete_not_matched_by_source, + WhenNotMatchedBySource::Keep + ) + { // keeping unmatched rows, no deletion if let Some(index) = self.join_key_as_scalar_index().await? { - self.create_indexed_scan_joined_stream(source, index).await - } else { - self.create_full_table_joined_stream(source).await + return self.create_indexed_scan_joined_stream(source, index).await; } - } else { + } + + if !matches!( + self.params.delete_not_matched_by_source, + WhenNotMatchedBySource::Keep + ) { info!( "The merge insert operation is configured to delete rows from the target table, this requires a potentially costly full table scan" ); - self.create_full_table_joined_stream(source).await } + + self.create_full_table_joined_stream(source).await } async fn update_fragments( @@ -1456,7 +1459,7 @@ impl MergeInsertJob { /// /// The fast path is only available for specific conditions: /// - when_matched is UpdateAll or UpdateIf or Fail - /// - Either use_index is false OR there's no scalar index on join key + /// - The execution will not use the legacy scalar-index join path /// - Source schema matches dataset schema exactly /// - when_not_matched_by_source is Keep, Delete, or DeleteIf async fn can_use_create_plan(&self, source_schema: &Schema) -> Result { @@ -1475,7 +1478,15 @@ impl MergeInsertJob { }, ); - let has_scalar_index = self.join_key_as_scalar_index().await?.is_some(); + let would_use_scalar_index = if self.params.use_index + && matches!( + self.params.delete_not_matched_by_source, + WhenNotMatchedBySource::Keep + ) { + self.join_key_as_scalar_index().await?.is_some() + } else { + false + }; // Check if this is a delete-only operation (no update/insert writes needed from source) // For delete-only, we don't need the full source schema, just key columns for matching @@ -1499,7 +1510,7 @@ impl MergeInsertJob { | WhenMatched::UpdateIf(_) | WhenMatched::Fail | WhenMatched::Delete - ) && (!self.params.use_index || !has_scalar_index) + ) && !would_use_scalar_index && schema_ok && matches!( self.params.delete_not_matched_by_source, @@ -5419,6 +5430,76 @@ MergeInsert: on=[id], when_matched=UpdateAll, when_not_matched=InsertAll, when_n ); } + #[tokio::test] + async fn test_explain_plan_full_schema_delete_by_source_with_fsl_and_scalar_index() { + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new( + "vec", + DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), 4), + true, + ), + ])); + + let dataset_batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new( + FixedSizeListArray::try_new_from_values( + Float32Array::from(vec![ + 1.0, 1.1, 1.2, 1.3, 2.0, 2.1, 2.2, 2.3, 3.0, 3.1, 3.2, 3.3, + ]), + 4, + ) + .unwrap(), + ), + ], + ) + .unwrap(); + + let mut dataset = Dataset::write( + Box::new(RecordBatchIterator::new( + [Ok(dataset_batch)], + schema.clone(), + )), + "memory://test_explain_plan_full_schema_delete_by_source_with_fsl_and_scalar_index", + None, + ) + .await + .unwrap(); + + let scalar_params = ScalarIndexParams::default(); + dataset + .create_index(&["id"], IndexType::Scalar, None, &scalar_params, false) + .await + .unwrap(); + + let merge_insert_job = + MergeInsertBuilder::try_new(Arc::new(dataset), vec!["id".to_string()]) + .unwrap() + .when_matched(WhenMatched::UpdateAll) + .when_not_matched(WhenNotMatched::InsertAll) + .when_not_matched_by_source(WhenNotMatchedBySource::Delete) + .try_build() + .unwrap(); + + let plan = merge_insert_job.explain_plan(None, false).await.unwrap(); + assert!(plan.contains("HashJoinExec")); + assert!(plan.contains("join_type=Full")); + assert!(plan.contains("projection=[_rowid")); + assert!( + plan.contains("LanceRead: uri=") && plan.contains("projection=[id]"), + "target-side scan should prune the FSL payload from the join build side even when a scalar index exists: {plan}" + ); + assert!( + !plan.contains( + "LanceRead: uri=test_explain_plan_full_schema_delete_by_source_with_fsl_and_scalar_index/data, projection=[id, vec]" + ), + "target-side scan should not include the FSL payload in the join build side: {plan}" + ); + } + #[tokio::test] async fn test_merge_insert_full_schema_delete_by_source_with_fsl() { let schema = Arc::new(Schema::new(vec![ @@ -5505,6 +5586,98 @@ MergeInsert: on=[id], when_matched=UpdateAll, when_not_matched=InsertAll, when_n assert_eq!(actual, vec![20.0, 20.1, 20.2, 20.3, 40.0, 40.1, 40.2, 40.3]); } + #[tokio::test] + async fn test_merge_insert_full_schema_delete_by_source_with_fsl_and_scalar_index() { + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new( + "vec", + DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), 4), + true, + ), + ])); + + let dataset_batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new( + FixedSizeListArray::try_new_from_values( + Float32Array::from(vec![ + 1.0, 1.1, 1.2, 1.3, 2.0, 2.1, 2.2, 2.3, 3.0, 3.1, 3.2, 3.3, + ]), + 4, + ) + .unwrap(), + ), + ], + ) + .unwrap(); + + let mut dataset = Dataset::write( + Box::new(RecordBatchIterator::new( + [Ok(dataset_batch)], + schema.clone(), + )), + "memory://test_merge_insert_full_schema_delete_by_source_with_fsl_and_scalar_index", + None, + ) + .await + .unwrap(); + + let scalar_params = ScalarIndexParams::default(); + dataset + .create_index(&["id"], IndexType::Scalar, None, &scalar_params, false) + .await + .unwrap(); + + let source_batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![2, 4])), + Arc::new( + FixedSizeListArray::try_new_from_values( + Float32Array::from(vec![20.0, 20.1, 20.2, 20.3, 40.0, 40.1, 40.2, 40.3]), + 4, + ) + .unwrap(), + ), + ], + ) + .unwrap(); + + let (merged_dataset, stats) = + MergeInsertBuilder::try_new(Arc::new(dataset), vec!["id".to_string()]) + .unwrap() + .when_matched(WhenMatched::UpdateAll) + .when_not_matched(WhenNotMatched::InsertAll) + .when_not_matched_by_source(WhenNotMatchedBySource::Delete) + .try_build() + .unwrap() + .execute_reader(Box::new(RecordBatchIterator::new( + [Ok(source_batch)], + schema.clone(), + ))) + .await + .unwrap(); + + assert_eq!(stats.num_deleted_rows, 2); + assert_eq!(stats.num_updated_rows, 1); + assert_eq!(stats.num_inserted_rows, 1); + + let merged = merged_dataset.scan().try_into_batch().await.unwrap(); + let ids = merged["id"].as_primitive::().values().to_vec(); + assert_eq!(ids, vec![2, 4]); + + let vecs = merged["vec"].as_fixed_size_list(); + let actual = vecs + .values() + .as_primitive::() + .values() + .to_vec(); + assert_eq!(actual, vec![20.0, 20.1, 20.2, 20.3, 40.0, 40.1, 40.2, 40.3]); + } + #[tokio::test] async fn test_analyze_plan() { // Set up test data using lance_datagen