Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
203 changes: 188 additions & 15 deletions rust/lance/src/dataset/write/merge_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -847,25 +847,28 @@ impl MergeInsertJob {
&self,
source: SendableRecordBatchStream,
) -> Result<SendableRecordBatchStream> {
// We need to do a full index scan if we're deleting source data
let can_use_scalar_index = matches!(
self.params.delete_not_matched_by_source, // this value marks behavior for rows in target that are not matched by the source. Value assigned earlier.
WhenNotMatchedBySource::Keep
) && self.params.use_index;

if can_use_scalar_index {
if self.params.use_index
&& matches!(
self.params.delete_not_matched_by_source,
WhenNotMatchedBySource::Keep
)
{
// keeping unmatched rows, no deletion
if let Some(index) = self.join_key_as_scalar_index().await? {
self.create_indexed_scan_joined_stream(source, index).await
} else {
self.create_full_table_joined_stream(source).await
return self.create_indexed_scan_joined_stream(source, index).await;
}
} else {
}

if !matches!(
self.params.delete_not_matched_by_source,
WhenNotMatchedBySource::Keep
) {
info!(
"The merge insert operation is configured to delete rows from the target table, this requires a potentially costly full table scan"
);
self.create_full_table_joined_stream(source).await
}

self.create_full_table_joined_stream(source).await
}

async fn update_fragments(
Expand Down Expand Up @@ -1456,7 +1459,7 @@ impl MergeInsertJob {
///
/// The fast path is only available for specific conditions:
/// - when_matched is UpdateAll or UpdateIf or Fail
/// - Either use_index is false OR there's no scalar index on join key
/// - The execution will not use the legacy scalar-index join path
/// - Source schema matches dataset schema exactly
/// - when_not_matched_by_source is Keep, Delete, or DeleteIf
async fn can_use_create_plan(&self, source_schema: &Schema) -> Result<bool> {
Expand All @@ -1475,7 +1478,15 @@ impl MergeInsertJob {
},
);

let has_scalar_index = self.join_key_as_scalar_index().await?.is_some();
let would_use_scalar_index = if self.params.use_index
&& matches!(
self.params.delete_not_matched_by_source,
WhenNotMatchedBySource::Keep
) {
self.join_key_as_scalar_index().await?.is_some()
} else {
false
};

// Check if this is a delete-only operation (no update/insert writes needed from source)
// For delete-only, we don't need the full source schema, just key columns for matching
Expand All @@ -1499,7 +1510,7 @@ impl MergeInsertJob {
| WhenMatched::UpdateIf(_)
| WhenMatched::Fail
| WhenMatched::Delete
) && (!self.params.use_index || !has_scalar_index)
) && !would_use_scalar_index
&& schema_ok
&& matches!(
self.params.delete_not_matched_by_source,
Expand Down Expand Up @@ -5419,6 +5430,76 @@ MergeInsert: on=[id], when_matched=UpdateAll, when_not_matched=InsertAll, when_n
);
}

#[tokio::test]
async fn test_explain_plan_full_schema_delete_by_source_with_fsl_and_scalar_index() {
    // Target schema: an Int32 join key plus a fixed-size-list (width 4) payload column.
    let vec_field = Field::new(
        "vec",
        DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), 4),
        true,
    );
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        vec_field,
    ]));

    // Three target rows, each carrying a 4-wide float vector.
    let flat_values = Float32Array::from(vec![
        1.0, 1.1, 1.2, 1.3, 2.0, 2.1, 2.2, 2.3, 3.0, 3.1, 3.2, 3.3,
    ]);
    let target_batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(Int32Array::from(vec![1, 2, 3])),
            Arc::new(FixedSizeListArray::try_new_from_values(flat_values, 4).unwrap()),
        ],
    )
    .unwrap();

    let reader = RecordBatchIterator::new([Ok(target_batch)], schema.clone());
    let mut dataset = Dataset::write(
        Box::new(reader),
        "memory://test_explain_plan_full_schema_delete_by_source_with_fsl_and_scalar_index",
        None,
    )
    .await
    .unwrap();

    // Create a scalar index on the join key; the plan below must still prune the payload.
    let index_params = ScalarIndexParams::default();
    dataset
        .create_index(&["id"], IndexType::Scalar, None, &index_params, false)
        .await
        .unwrap();

    let job = MergeInsertBuilder::try_new(Arc::new(dataset), vec!["id".to_string()])
        .unwrap()
        .when_matched(WhenMatched::UpdateAll)
        .when_not_matched(WhenNotMatched::InsertAll)
        .when_not_matched_by_source(WhenNotMatchedBySource::Delete)
        .try_build()
        .unwrap();

    // Inspect the textual plan: a full hash join over _rowid, with the target-side
    // read projecting only the join key (not the FSL payload).
    let plan = job.explain_plan(None, false).await.unwrap();
    assert!(plan.contains("HashJoinExec"));
    assert!(plan.contains("join_type=Full"));
    assert!(plan.contains("projection=[_rowid"));
    assert!(
        plan.contains("LanceRead: uri=") && plan.contains("projection=[id]"),
        "target-side scan should prune the FSL payload from the join build side even when a scalar index exists: {plan}"
    );
    assert!(
        !plan.contains(
            "LanceRead: uri=test_explain_plan_full_schema_delete_by_source_with_fsl_and_scalar_index/data, projection=[id, vec]"
        ),
        "target-side scan should not include the FSL payload in the join build side: {plan}"
    );
}

#[tokio::test]
async fn test_merge_insert_full_schema_delete_by_source_with_fsl() {
let schema = Arc::new(Schema::new(vec![
Expand Down Expand Up @@ -5505,6 +5586,98 @@ MergeInsert: on=[id], when_matched=UpdateAll, when_not_matched=InsertAll, when_n
assert_eq!(actual, vec![20.0, 20.1, 20.2, 20.3, 40.0, 40.1, 40.2, 40.3]);
}

#[tokio::test]
async fn test_merge_insert_full_schema_delete_by_source_with_fsl_and_scalar_index() {
    // Shared schema for target and source: Int32 join key + fixed-size-list payload.
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new(
            "vec",
            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), 4),
            true,
        ),
    ]));

    // Target rows: ids 1..=3, each with a 4-wide float vector.
    let target_values = Float32Array::from(vec![
        1.0, 1.1, 1.2, 1.3, 2.0, 2.1, 2.2, 2.3, 3.0, 3.1, 3.2, 3.3,
    ]);
    let target_batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(Int32Array::from(vec![1, 2, 3])),
            Arc::new(FixedSizeListArray::try_new_from_values(target_values, 4).unwrap()),
        ],
    )
    .unwrap();

    let mut dataset = Dataset::write(
        Box::new(RecordBatchIterator::new(
            [Ok(target_batch)],
            schema.clone(),
        )),
        "memory://test_merge_insert_full_schema_delete_by_source_with_fsl_and_scalar_index",
        None,
    )
    .await
    .unwrap();

    // Index the join key so the merge runs with a scalar index present.
    let index_params = ScalarIndexParams::default();
    dataset
        .create_index(&["id"], IndexType::Scalar, None, &index_params, false)
        .await
        .unwrap();

    // Source rows: id 2 updates an existing row, id 4 inserts a new one;
    // ids 1 and 3 are absent from the source and should be deleted.
    let source_values =
        Float32Array::from(vec![20.0, 20.1, 20.2, 20.3, 40.0, 40.1, 40.2, 40.3]);
    let source_batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(Int32Array::from(vec![2, 4])),
            Arc::new(FixedSizeListArray::try_new_from_values(source_values, 4).unwrap()),
        ],
    )
    .unwrap();

    let (merged_dataset, stats) =
        MergeInsertBuilder::try_new(Arc::new(dataset), vec!["id".to_string()])
            .unwrap()
            .when_matched(WhenMatched::UpdateAll)
            .when_not_matched(WhenNotMatched::InsertAll)
            .when_not_matched_by_source(WhenNotMatchedBySource::Delete)
            .try_build()
            .unwrap()
            .execute_reader(Box::new(RecordBatchIterator::new(
                [Ok(source_batch)],
                schema.clone(),
            )))
            .await
            .unwrap();

    // Two deletes (ids 1 and 3), one update (id 2), one insert (id 4).
    assert_eq!(stats.num_deleted_rows, 2);
    assert_eq!(stats.num_updated_rows, 1);
    assert_eq!(stats.num_inserted_rows, 1);

    // Verify the surviving rows and their payload values round-tripped correctly.
    let merged = merged_dataset.scan().try_into_batch().await.unwrap();
    assert_eq!(
        merged["id"].as_primitive::<Int32Type>().values().to_vec(),
        vec![2, 4]
    );
    let payload = merged["vec"]
        .as_fixed_size_list()
        .values()
        .as_primitive::<Float32Type>()
        .values()
        .to_vec();
    assert_eq!(payload, vec![20.0, 20.1, 20.2, 20.3, 40.0, 40.1, 40.2, 40.3]);
}

#[tokio::test]
async fn test_analyze_plan() {
// Set up test data using lance_datagen
Expand Down
Loading