Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rust/lance/src/dataset/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//! [Transaction Specification](https://lance.org/format/table/transaction/#transaction-types).

use super::ManifestWriteConfig;
use super::write::merge_insert::inserted_rows::KeyExistenceFilter;
use super::write::key_existence_filter::KeyExistenceFilter;
use crate::dataset::transaction::UpdateMode::RewriteRows;
use crate::index::mem_wal::update_mem_wal_index_merged_generations;
use crate::utils::temporal::timestamp_to_nanos;
Expand Down
56 changes: 55 additions & 1 deletion rust/lance/src/dataset/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use lance_table::format::{BasePath, DataFile, Fragment};
use lance_table::io::commit::{CommitHandler, commit_handler_from_url};
use lance_table::io::manifest::ManifestDescribing;
use object_store::path::Path;
use std::collections::{HashMap, HashSet};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::num::NonZero;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
Expand All @@ -48,6 +48,7 @@ use super::utils::SchemaAdapter;
mod commit;
pub mod delete;
mod insert;
pub mod key_existence_filter;
pub mod merge_insert;
mod retry;
pub mod update;
Expand All @@ -57,6 +58,59 @@ pub use commit::CommitBuilder;
pub use delete::{DeleteBuilder, DeleteResult};
pub use insert::InsertBuilder;

use roaring::RoaringTreemap;

/// Apply deletions to fragments in the dataset.
///
/// Concurrently iterates over all fragments in the dataset based on `removed_row_addrs`
/// (row addresses in RoaringTreemap format), applies `extend_deletions` to matching fragments,
/// and returns the modified fragments along with the IDs of fully deleted fragments.
pub async fn apply_deletions(
dataset: &Dataset,
removed_row_addrs: &RoaringTreemap,
) -> Result<(Vec<Fragment>, Vec<u64>)> {
let bitmaps = Arc::new(removed_row_addrs.bitmaps().collect::<BTreeMap<_, _>>());

enum FragmentChange {
Unchanged,
Modified(Box<Fragment>),
Removed(u64),
}

let mut updated_fragments = Vec::new();
let mut removed_fragments = Vec::new();

let mut stream = futures::stream::iter(dataset.get_fragments())
.map(move |fragment| {
let bitmaps_ref = bitmaps.clone();
async move {
let fragment_id = fragment.id();
if let Some(bitmap) = bitmaps_ref.get(&(fragment_id as u32)) {
match fragment.extend_deletions(*bitmap).await {
Ok(Some(new_fragment)) => {
Ok(FragmentChange::Modified(Box::new(new_fragment.metadata)))
}
Ok(None) => Ok(FragmentChange::Removed(fragment_id as u64)),
Err(e) => Err(e),
}
} else {
Ok(FragmentChange::Unchanged)
}
}
})
.buffer_unordered(dataset.object_store.io_parallelism());

while let Some(res) = stream.next().await.transpose()? {
match res {
FragmentChange::Unchanged => {}
FragmentChange::Modified(fragment) => updated_fragments.push(*fragment),
FragmentChange::Removed(fragment_id) => removed_fragments.push(fragment_id),
}
}

Ok((updated_fragments, removed_fragments))
}

/// The destination to write data to.
#[derive(Debug, Clone)]
pub enum WriteDestination<'a> {
Expand Down
54 changes: 2 additions & 52 deletions rust/lance/src/dataset/write/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,15 @@ use crate::{
};
use datafusion::logical_expr::Expr;
use datafusion::scalar::ScalarValue;
use futures::{StreamExt, TryStreamExt};
use futures::TryStreamExt;
use lance_core::utils::mask::RowAddrTreeMap;
use lance_core::{Error, ROW_ID, Result};
use lance_table::format::Fragment;
use roaring::RoaringTreemap;
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Duration;

use super::CommitBuilder;
use super::apply_deletions;
use super::retry::{RetryConfig, RetryExecutor, execute_with_retry};

/// Result of a delete operation.
Expand All @@ -31,55 +30,6 @@ pub struct DeleteResult {
pub num_deleted_rows: u64,
}

/// Apply deletions to fragments based on a RoaringTreemap of row IDs.
///
/// Returns the set of modified fragments and removed fragments, if any.
async fn apply_deletions(
dataset: &Dataset,
removed_row_addrs: &RoaringTreemap,
) -> Result<(Vec<Fragment>, Vec<u64>)> {
let bitmaps = Arc::new(removed_row_addrs.bitmaps().collect::<BTreeMap<_, _>>());

enum FragmentChange {
Unchanged,
Modified(Box<Fragment>),
Removed(u64),
}

let mut updated_fragments = Vec::new();
let mut removed_fragments = Vec::new();

let mut stream = futures::stream::iter(dataset.get_fragments())
.map(move |fragment| {
let bitmaps_ref = bitmaps.clone();
async move {
let fragment_id = fragment.id();
if let Some(bitmap) = bitmaps_ref.get(&(fragment_id as u32)) {
match fragment.extend_deletions(*bitmap).await {
Ok(Some(new_fragment)) => {
Ok(FragmentChange::Modified(Box::new(new_fragment.metadata)))
}
Ok(None) => Ok(FragmentChange::Removed(fragment_id as u64)),
Err(e) => Err(e),
}
} else {
Ok(FragmentChange::Unchanged)
}
}
})
.buffer_unordered(dataset.object_store.io_parallelism());

while let Some(res) = stream.next().await.transpose()? {
match res {
FragmentChange::Unchanged => {}
FragmentChange::Modified(fragment) => updated_fragments.push(*fragment),
FragmentChange::Removed(fragment_id) => removed_fragments.push(fragment_id),
}
}

Ok((updated_fragments, removed_fragments))
}

/// Builder for configuring delete operations with retry support
///
/// This operation is similar to SQL's DELETE statement. It allows you to remove
Expand Down
57 changes: 4 additions & 53 deletions rust/lance/src/dataset/write/merge_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@
// Internal column name for the merge action. Using "__action" to avoid collisions with user columns.
const MERGE_ACTION_COLUMN: &str = "__action";

pub mod inserted_rows;

use super::key_existence_filter::KeyExistenceFilter;
use assign_action::merge_insert_action;
use inserted_rows::KeyExistenceFilter;

use super::retry::{RetryConfig, RetryExecutor, execute_with_retry};
use super::{CommitBuilder, WriteParams, write_fragments_internal};
Expand Down Expand Up @@ -102,7 +100,7 @@ use log::info;
use roaring::RoaringTreemap;
use snafu::ResultExt;
use std::{
collections::{BTreeMap, HashSet},
collections::HashSet,
sync::{
Arc, Mutex,
atomic::{AtomicU32, Ordering},
Expand Down Expand Up @@ -1637,7 +1635,7 @@ impl MergeInsertJob {
let removed_row_addrs = RoaringTreemap::from_iter(removed_row_addr_vec.into_iter());

let (old_fragments, removed_fragment_ids) =
Self::apply_deletions(&self.dataset, &removed_row_addrs).await?;
super::apply_deletions(&self.dataset, &removed_row_addrs).await?;

// Commit updated and new fragments
let operation = Operation::Update {
Expand Down Expand Up @@ -1676,53 +1674,6 @@ impl MergeInsertJob {
})
}

// Delete a batch of rows by id, returns the fragments modified and the fragments removed
async fn apply_deletions(
dataset: &Dataset,
removed_row_ids: &RoaringTreemap,
) -> Result<(Vec<Fragment>, Vec<u64>)> {
let bitmaps = Arc::new(removed_row_ids.bitmaps().collect::<BTreeMap<_, _>>());

enum FragmentChange {
Unchanged,
Modified(Box<Fragment>),
Removed(u64),
}

let mut updated_fragments = Vec::new();
let mut removed_fragments = Vec::new();

let mut stream = futures::stream::iter(dataset.get_fragments())
.map(move |fragment| {
let bitmaps_ref = bitmaps.clone();
async move {
let fragment_id = fragment.id();
if let Some(bitmap) = bitmaps_ref.get(&(fragment_id as u32)) {
match fragment.extend_deletions(*bitmap).await {
Ok(Some(new_fragment)) => {
Ok(FragmentChange::Modified(Box::new(new_fragment.metadata)))
}
Ok(None) => Ok(FragmentChange::Removed(fragment_id as u64)),
Err(e) => Err(e),
}
} else {
Ok(FragmentChange::Unchanged)
}
}
})
.buffer_unordered(dataset.object_store.io_parallelism());

while let Some(res) = stream.next().await.transpose()? {
match res {
FragmentChange::Unchanged => {}
FragmentChange::Modified(fragment) => updated_fragments.push(*fragment),
FragmentChange::Removed(fragment_id) => removed_fragments.push(fragment_id),
}
}

Ok((updated_fragments, removed_fragments))
}

/// Generate the execution plan and return it as a formatted string for debugging.
///
/// This method takes an optional schema representing the source data and calls `create_plan()`
Expand Down Expand Up @@ -2223,7 +2174,7 @@ impl Merger {
mod tests {
use super::*;
use crate::dataset::scanner::ColumnOrdering;
use crate::dataset::write::merge_insert::inserted_rows::{
use crate::dataset::write::key_existence_filter::{
KeyExistenceFilter, KeyExistenceFilterBuilder, extract_key_value_from_batch,
};
use crate::index::vector::VectorIndexParams;
Expand Down
56 changes: 3 additions & 53 deletions rust/lance/src/dataset/write/merge_insert/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,15 @@
mod delete;
mod write;

use std::collections::BTreeMap;
use std::sync::Arc;

use datafusion::physical_plan::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder};
use futures::StreamExt;
use lance_table::format::Fragment;
use roaring::RoaringTreemap;

pub use delete::DeleteOnlyMergeInsertExec;
pub use write::FullSchemaMergeInsertExec;

use super::MergeStats;
use crate::Dataset;

// Re-export apply_deletions from the parent write module for use by exec submodules
pub(super) use crate::dataset::write::apply_deletions;

pub(super) struct MergeInsertMetrics {
pub num_inserted_rows: Count,
Expand Down Expand Up @@ -60,49 +56,3 @@ impl MergeInsertMetrics {
}
}
}

pub(super) async fn apply_deletions(
dataset: &Dataset,
removed_row_addrs: &RoaringTreemap,
) -> crate::Result<(Vec<Fragment>, Vec<u64>)> {
let bitmaps = Arc::new(removed_row_addrs.bitmaps().collect::<BTreeMap<_, _>>());

enum FragmentChange {
Unchanged,
Modified(Box<Fragment>),
Removed(u64),
}

let mut updated_fragments = Vec::new();
let mut removed_fragments = Vec::new();

let mut stream = futures::stream::iter(dataset.get_fragments())
.map(move |fragment| {
let bitmaps_ref = bitmaps.clone();
async move {
let fragment_id = fragment.id();
if let Some(bitmap) = bitmaps_ref.get(&(fragment_id as u32)) {
match fragment.extend_deletions(*bitmap).await {
Ok(Some(new_fragment)) => {
Ok(FragmentChange::Modified(Box::new(new_fragment.metadata)))
}
Ok(None) => Ok(FragmentChange::Removed(fragment_id as u64)),
Err(e) => Err(e),
}
} else {
Ok(FragmentChange::Unchanged)
}
}
})
.buffer_unordered(dataset.object_store.io_parallelism());

while let Some(res) = stream.next().await.transpose()? {
match res {
FragmentChange::Unchanged => {}
FragmentChange::Modified(fragment) => updated_fragments.push(*fragment),
FragmentChange::Removed(fragment_id) => removed_fragments.push(fragment_id),
}
}

Ok((updated_fragments, removed_fragments))
}
2 changes: 1 addition & 1 deletion rust/lance/src/dataset/write/merge_insert/exec/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use roaring::RoaringTreemap;

use crate::dataset::transaction::UpdateMode::RewriteRows;
use crate::dataset::utils::CapturedRowIds;
use crate::dataset::write::merge_insert::inserted_rows::{
use crate::dataset::write::key_existence_filter::{
KeyExistenceFilter, KeyExistenceFilterBuilder, extract_key_value_from_batch,
};
use crate::dataset::write::merge_insert::{
Expand Down
Loading
Loading