diff --git a/java/lance-jni/src/blocking_dataset.rs b/java/lance-jni/src/blocking_dataset.rs index aa56d54de9..332509633c 100644 --- a/java/lance-jni/src/blocking_dataset.rs +++ b/java/lance-jni/src/blocking_dataset.rs @@ -772,7 +772,10 @@ pub fn inner_commit_append<'local>( for f in fragment_objs { fragments.push(f.extract_object(env)?); } - let op = Operation::Append { fragments }; + let op = Operation::Append { + fragments, + row_ids: None, + }; let path_str = path.extract(env)?; let read_version = env.get_u64_opt(&read_version_obj)?; let storage_options = extract_storage_options(env, &storage_options_obj)?; diff --git a/java/lance-jni/src/transaction.rs b/java/lance-jni/src/transaction.rs index d46140020f..9c95a51cf8 100644 --- a/java/lance-jni/src/transaction.rs +++ b/java/lance-jni/src/transaction.rs @@ -351,6 +351,7 @@ fn convert_to_java_operation_inner<'local>( match operation { Operation::Append { fragments: rust_fragments, + .. } => { let java_fragments = export_vec(env, &rust_fragments)?; @@ -1090,7 +1091,10 @@ fn convert_to_rust_operation( import_vec_from_method(env, java_operation, "fragments", |env, fragment| { fragment.extract_object(env) })?; - Operation::Append { fragments } + Operation::Append { + fragments, + row_ids: None, + } } "Delete" => { let updated_fragments: Vec = import_vec_from_method( diff --git a/protos/table.proto b/protos/table.proto index e73d22b6b9..28b186f26a 100644 --- a/protos/table.proto +++ b/protos/table.proto @@ -205,6 +205,7 @@ message Manifest { // The branch of the dataset. None means main branch. optional string branch = 20; + } // Manifest // external dataset base path @@ -690,4 +691,3 @@ message ShardField { // Transform parameters (e.g., num_buckets for bucket transform). map parameters = 6; } - diff --git a/protos/transaction.proto b/protos/transaction.proto index 06268feb25..a0f8c8e9ff 100644 --- a/protos/transaction.proto +++ b/protos/transaction.proto @@ -35,10 +35,22 @@ message Transaction { // Add new rows to the dataset. message Append { + message ReservedRowIds { + uint64 start_row_id = 1; + uint64 num_rows = 2; + } + // The new fragments to append. // // Fragment IDs are not yet assigned. repeated DataFragment fragments = 1; + // Stable row ids to assign to the new rows, if pre-reserved. + // + // The requested range must be contained within the row ids reserved by the + // dataset version identified by read_version. Multiple append operations + // may consume disjoint sub-ranges from the same reservation as long as + // they continue to reference that reserve version. + optional ReservedRowIds row_ids = 2; } // Mark rows as deleted. @@ -164,6 +176,11 @@ message Transaction { uint32 num_fragments = 1; } + // An operation that reserves stable row ids for a future append. + message ReserveRowIds { + uint64 num_rows = 1; + } + // An operation that clones a dataset. message Clone { // - true: Performs a metadata-only clone (copies manifest without data files). @@ -339,6 +356,7 @@ message Transaction { UpdateMemWalState update_mem_wal_state = 112; Clone clone = 113; UpdateBases update_bases = 114; + ReserveRowIds reserve_row_ids = 115; } // Fields 200/202 (`blob_append` / `blob_overwrite`) previously represented blob dataset ops. diff --git a/python/src/transaction.rs b/python/src/transaction.rs index eae5b49a15..eff436dfed 100644 --- a/python/src/transaction.rs +++ b/python/src/transaction.rs @@ -250,7 +250,10 @@ impl FromPyObject<'_> for PyLance { } "Append" => { let fragments = extract_vec(&ob.getattr("fragments")?)?; - let op = Operation::Append { fragments }; + let op = Operation::Append { + fragments, + row_ids: None, + }; Ok(Self(op)) } "Delete" => { @@ -403,7 +406,7 @@ impl<'py> IntoPyObject<'py> for PyLance<&Operation> { .expect("Failed to import LanceOperation namespace"); match self.0 { - Operation::Append { fragments } => { + Operation::Append { fragments, .. } => { let fragments = export_vec(py, fragments.as_slice())?; let cls = namespace .getattr("Append") diff --git a/rust/lance-table/src/format.rs b/rust/lance-table/src/format.rs index 842c76f1e5..c2c3fc34c1 100644 --- a/rust/lance-table/src/format.rs +++ b/rust/lance-table/src/format.rs @@ -16,8 +16,8 @@ pub use fragment::*; pub use index::{IndexFile, IndexMetadata, index_metadata_codec, list_index_files_with_sizes}; pub use manifest::{ - BasePath, DETACHED_VERSION_MASK, DataStorageFormat, Manifest, SelfDescribingFileReader, - WriterVersion, is_detached_version, + BasePath, DETACHED_VERSION_MASK, DataStorageFormat, Manifest, ReservedRowIds, + SelfDescribingFileReader, WriterVersion, is_detached_version, }; pub use transaction::Transaction; diff --git a/rust/lance-table/src/format/manifest.rs b/rust/lance-table/src/format/manifest.rs index 71de80c547..b20714c753 100644 --- a/rust/lance-table/src/format/manifest.rs +++ b/rust/lance-table/src/format/manifest.rs @@ -24,6 +24,14 @@ use lance_core::{Error, Result}; use lance_io::object_store::{ObjectStore, ObjectStoreRegistry}; use lance_io::utils::read_struct; +#[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf)] +pub struct ReservedRowIds { + /// The first stable row id in the reserved range. + pub start_row_id: u64, + /// The number of stable row ids in the reserved range. + pub num_rows: u64, +} + /// Manifest of a dataset /// /// * Schema @@ -82,7 +90,7 @@ pub struct Manifest { /// accelerating the fragment search using offset ranges. fragment_offsets: Vec, - /// The max row id used so far. + /// The next row id that will be allocated. pub next_row_id: u64, /// The storage format of the data files. @@ -233,6 +241,8 @@ impl Manifest { /// - Any persistent storage operations /// - Modifications to the original data /// - If the shallow clone is for branch, ref_name is the source branch + /// + /// The clone keeps the current `next_row_id` progress. pub fn shallow_clone( &self, ref_name: Option, @@ -839,6 +849,24 @@ impl From for pb::BasePath { } } +impl From for ReservedRowIds { + fn from(value: pb::transaction::append::ReservedRowIds) -> Self { + Self { + start_row_id: value.start_row_id, + num_rows: value.num_rows, + } + } +} + +impl From<&ReservedRowIds> for pb::transaction::append::ReservedRowIds { + fn from(value: &ReservedRowIds) -> Self { + Self { + start_row_id: value.start_row_id, + num_rows: value.num_rows, + } + } +} + impl TryFrom for Manifest { type Error = Error; diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 8ef2c79a67..4b1ff34f63 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -116,6 +116,7 @@ use lance_core::box_error; use lance_index::scalar::lance_format::LanceIndexStore; use lance_namespace::models::{DeclareTableRequest, DescribeTableRequest}; use lance_table::feature_flags::{apply_feature_flags, can_read_dataset}; +pub use lance_table::format::ReservedRowIds; use lance_table::io::deletion::{DELETIONS_DIR, relative_deletion_file_path}; pub use schema_evolution::{ BatchInfo, BatchUDF, ColumnAlteration, NewColumnTransform, UDFCheckpointStore, @@ -405,6 +406,42 @@ impl From for ProjectionRequest { } impl Dataset { + /// Returns the stable row-id range reserved by the current dataset version, + /// if any. + /// + /// This only reports the reservation created by the current version's + /// `ReserveRowIds` transaction. It does not scan older versions or expose + /// a global list of outstanding reservations. + /// + /// In other words, this returns `Some(...)` only when the current dataset + /// version itself was produced by `ReserveRowIds`; otherwise it returns + /// `None`. + pub async fn reserved_row_ids(&self) -> Result> { + let Some(transaction) = self.read_transaction().await? else { + return Ok(None); + }; + + let Operation::ReserveRowIds { num_rows } = transaction.operation else { + return Ok(None); + }; + + let start_row_id = self + .manifest + .next_row_id + .checked_sub(num_rows) + .ok_or_else(|| { + Error::internal(format!( + "Manifest next_row_id={} is smaller than reserved num_rows={}", + self.manifest.next_row_id, num_rows + )) + })?; + + Ok(Some(ReservedRowIds { + start_row_id, + num_rows, + })) + } + /// Open an existing dataset. /// /// See also [DatasetBuilder]. diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs index 986fe8a844..1c668b507f 100644 --- a/rust/lance/src/dataset/fragment.rs +++ b/rust/lance/src/dataset/fragment.rs @@ -3894,6 +3894,7 @@ mod tests { let op = Operation::Append { fragments: vec![frag], + row_ids: None, }; let dataset = Dataset::commit( &dataset.uri, diff --git a/rust/lance/src/dataset/rowids.rs b/rust/lance/src/dataset/rowids.rs index d40a9adfd5..610a623659 100644 --- a/rust/lance/src/dataset/rowids.rs +++ b/rust/lance/src/dataset/rowids.rs @@ -133,7 +133,11 @@ async fn load_row_id_index(dataset: &Dataset) -> Result() + .values() + .iter() + .copied() + .collect::>(); + assert_eq!(row_ids, vec![2, 3, 4, 5]); + + let invalid_reuse = InsertBuilder::new(Arc::new(dataset.clone())) + .with_row_ids(requested) + .with_params(&append_params) + .execute_uncommitted(vec![sequence_batch(200..202)]) + .await + .unwrap(); + let err = CommitBuilder::new(Arc::new(dataset)) + .execute(invalid_reuse) + .await + .unwrap_err(); + assert!(matches!(err, Error::InvalidInput { .. })); + } + + #[tokio::test] + async fn test_append_with_reserved_row_ids_conflict() { + let temp_dir = lance_core::utils::tempfile::TempStrDir::default(); + let tmp_path = &temp_dir; + let schema = sequence_batch(0..0).schema(); + let reader = RecordBatchIterator::new(vec![].into_iter().map(Ok), schema); + let dataset = Dataset::write( + reader, + tmp_path, + Some(WriteParams { + enable_stable_row_ids: true, + ..Default::default() + }), + ) + .await + .unwrap(); + + let reserve_txn = TransactionBuilder::new( + dataset.manifest().version, + Operation::ReserveRowIds { num_rows: 6 }, + ) + .build(); + let dataset = CommitBuilder::new(Arc::new(dataset)) + .execute(reserve_txn) + .await + .unwrap(); + let reserved = dataset.reserved_row_ids().await.unwrap().unwrap(); + + let append_params = WriteParams { + mode: WriteMode::Append, + ..Default::default() + }; + let first_transaction = InsertBuilder::new(Arc::new(dataset.clone())) + .with_row_ids(ReservedRowIds { + start_row_id: reserved.start_row_id + 1, + num_rows: 4, + }) + .with_params(&append_params) + .execute_uncommitted(vec![sequence_batch(100..104)]) + .await + .unwrap(); + let second_transaction = InsertBuilder::new(Arc::new(dataset.clone())) + .with_row_ids(ReservedRowIds { + start_row_id: reserved.start_row_id + 3, + num_rows: 2, + }) + .with_params(&append_params) + .execute_uncommitted(vec![sequence_batch(200..202)]) + .await + .unwrap(); + + let dataset = CommitBuilder::new(Arc::new(dataset)) + .execute(first_transaction) + .await + .unwrap(); + + let result = CommitBuilder::new(Arc::new(dataset)) + .execute(second_transaction) + .await; + + let err = result.unwrap_err(); + assert!(matches!(err, Error::CommitConflict { .. })); + assert!(err.to_string().contains("Reserved row ids overlap")); + } + #[tokio::test] async fn test_scan_row_ids() { // Write dataset with multiple files -> _rowid != _rowaddr diff --git a/rust/lance/src/dataset/tests/dataset_transactions.rs b/rust/lance/src/dataset/tests/dataset_transactions.rs index 2b49d0963e..99abe4fa4e 100644 --- a/rust/lance/src/dataset/tests/dataset_transactions.rs +++ b/rust/lance/src/dataset/tests/dataset_transactions.rs @@ -301,7 +301,14 @@ async fn test_inline_transaction() { } fn make_tx(read_version: u64) -> Transaction { - Transaction::new(read_version, Operation::Append { fragments: vec![] }, None) + Transaction::new( + read_version, + Operation::Append { + fragments: vec![], + row_ids: None, + }, + None, + ) } async fn delete_external_tx_file(ds: &Dataset) { diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs index b5f2e05ff0..5e78bd8b20 100644 --- a/rust/lance/src/dataset/transaction.rs +++ b/rust/lance/src/dataset/transaction.rs @@ -28,7 +28,7 @@ use lance_table::rowids::read_row_ids; use lance_table::{ format::{ BasePath, DataFile, DataStorageFormat, Fragment, IndexFile, IndexMetadata, Manifest, - RowIdMeta, pb, + ReservedRowIds, RowIdMeta, pb, }, io::{ commit::CommitHandler, @@ -113,7 +113,10 @@ pub struct UpdateMap { pub enum Operation { /// Adding new fragments to the dataset. The fragments contained within /// haven't yet been assigned a final ID. - Append { fragments: Vec }, + Append { + fragments: Vec, + row_ids: Option, + }, /// Updated fragments contain those that have been modified with new deletion /// files. The deleted fragment IDs are those that should be removed from /// the manifest. @@ -186,6 +189,11 @@ pub enum Operation { /// has been committed. It is used during a rewrite operation to allow /// indices to be remapped to the new row ids as part of the operation. ReserveFragments { num_fragments: u32 }, + /// Reserves stable row ids for a future append operation. + /// + /// Reservations belong to the current version line and are not inherited by + /// later shallow clones. + ReserveRowIds { num_rows: u64 }, /// Update values in the dataset. /// @@ -281,6 +289,7 @@ impl std::fmt::Display for Operation { Self::Merge { .. } => write!(f, "Merge"), Self::Restore { .. } => write!(f, "Restore"), Self::ReserveFragments { .. } => write!(f, "ReserveFragments"), + Self::ReserveRowIds { .. } => write!(f, "ReserveRowIds"), Self::Update { .. } => write!(f, "Update"), Self::Project { .. } => write!(f, "Project"), Self::UpdateConfig { .. } => write!(f, "UpdateConfig"), @@ -312,7 +321,16 @@ impl PartialEq for Operation { a.len() == b.len() && a.iter().all(|f| b.contains(f)) } match (self, other) { - (Self::Append { fragments: a }, Self::Append { fragments: b }) => compare_vec(a, b), + ( + Self::Append { + fragments: a_fragments, + row_ids: a_row_ids, + }, + Self::Append { + fragments: b_fragments, + row_ids: b_row_ids, + }, + ) => compare_vec(a_fragments, b_fragments) && a_row_ids == b_row_ids, ( Self::Clone { is_shallow: a_is_shallow, @@ -411,6 +429,7 @@ impl PartialEq for Operation { Self::ReserveFragments { num_fragments: a }, Self::ReserveFragments { num_fragments: b }, ) => a == b, + (Self::ReserveRowIds { num_rows: a }, Self::ReserveRowIds { num_rows: b }) => a == b, ( Self::Update { removed_fragment_ids: a_removed, @@ -1128,6 +1147,96 @@ impl PartialEq for Operation { (Self::Clone { .. }, Self::UpdateBases { .. }) => { std::mem::discriminant(self) == std::mem::discriminant(other) } + (Self::ReserveRowIds { .. }, Self::Append { .. }) => { + std::mem::discriminant(self) == std::mem::discriminant(other) + } + (Self::ReserveRowIds { .. }, Self::Delete { .. }) => { + std::mem::discriminant(self) == std::mem::discriminant(other) + } + (Self::ReserveRowIds { .. }, Self::Overwrite { .. }) => { + std::mem::discriminant(self) == std::mem::discriminant(other) + } + (Self::ReserveRowIds { .. }, Self::CreateIndex { .. }) => { + std::mem::discriminant(self) == std::mem::discriminant(other) + } + (Self::ReserveRowIds { .. }, Self::Rewrite { .. }) => { + std::mem::discriminant(self) == std::mem::discriminant(other) + } + (Self::ReserveRowIds { .. }, Self::Merge { .. }) => { + std::mem::discriminant(self) == std::mem::discriminant(other) + } + (Self::ReserveRowIds { .. }, Self::Restore { .. }) => { + std::mem::discriminant(self) == std::mem::discriminant(other) + } + (Self::ReserveRowIds { .. }, Self::ReserveFragments { .. }) => { + std::mem::discriminant(self) == std::mem::discriminant(other) + } + (Self::ReserveRowIds { .. }, Self::Update { .. }) => { + std::mem::discriminant(self) == std::mem::discriminant(other) + } + (Self::ReserveRowIds { .. }, Self::Project { .. }) => { + std::mem::discriminant(self) == std::mem::discriminant(other) + } + (Self::ReserveRowIds { .. }, Self::UpdateConfig { .. }) => { + std::mem::discriminant(self) == std::mem::discriminant(other) + } + (Self::ReserveRowIds { .. }, Self::DataReplacement { .. }) => { + std::mem::discriminant(self) == std::mem::discriminant(other) + } + (Self::ReserveRowIds { .. }, Self::UpdateMemWalState { .. }) => { + std::mem::discriminant(self) == std::mem::discriminant(other) + } + (Self::ReserveRowIds { .. }, Self::Clone { .. }) => { + std::mem::discriminant(self) == std::mem::discriminant(other) + } + (Self::ReserveRowIds { .. }, Self::UpdateBases { .. }) => { + std::mem::discriminant(self) == std::mem::discriminant(other) + } + (Self::Append { .. }, Self::ReserveRowIds { .. }) => { + std::mem::discriminant(self) == std::mem::discriminant(other) + } + (Self::Delete { .. }, Self::ReserveRowIds { .. }) => { + std::mem::discriminant(self) == std::mem::discriminant(other) + } + (Self::Overwrite { .. }, Self::ReserveRowIds { .. }) => { + std::mem::discriminant(self) == std::mem::discriminant(other) + } + (Self::CreateIndex { .. }, Self::ReserveRowIds { .. }) => { + std::mem::discriminant(self) == std::mem::discriminant(other) + } + (Self::Rewrite { .. }, Self::ReserveRowIds { .. }) => { + std::mem::discriminant(self) == std::mem::discriminant(other) + } + (Self::Merge { .. }, Self::ReserveRowIds { .. }) => { + std::mem::discriminant(self) == std::mem::discriminant(other) + } + (Self::Restore { .. }, Self::ReserveRowIds { .. }) => { + std::mem::discriminant(self) == std::mem::discriminant(other) + } + (Self::ReserveFragments { .. }, Self::ReserveRowIds { .. }) => { + std::mem::discriminant(self) == std::mem::discriminant(other) + } + (Self::Update { .. }, Self::ReserveRowIds { .. }) => { + std::mem::discriminant(self) == std::mem::discriminant(other) + } + (Self::Project { .. }, Self::ReserveRowIds { .. }) => { + std::mem::discriminant(self) == std::mem::discriminant(other) + } + (Self::UpdateConfig { .. }, Self::ReserveRowIds { .. }) => { + std::mem::discriminant(self) == std::mem::discriminant(other) + } + (Self::DataReplacement { .. }, Self::ReserveRowIds { .. }) => { + std::mem::discriminant(self) == std::mem::discriminant(other) + } + (Self::UpdateMemWalState { .. }, Self::ReserveRowIds { .. }) => { + std::mem::discriminant(self) == std::mem::discriminant(other) + } + (Self::Clone { .. }, Self::ReserveRowIds { .. }) => { + std::mem::discriminant(self) == std::mem::discriminant(other) + } + (Self::UpdateBases { .. }, Self::ReserveRowIds { .. }) => { + std::mem::discriminant(self) == std::mem::discriminant(other) + } } } } @@ -1273,6 +1382,7 @@ impl Operation { Self::Rewrite { .. } => "Rewrite", Self::Merge { .. } => "Merge", Self::ReserveFragments { .. } => "ReserveFragments", + Self::ReserveRowIds { .. } => "ReserveRowIds", Self::Restore { .. } => "Restore", Self::Update { .. } => "Update", Self::Project { .. } => "Project", @@ -1492,6 +1602,18 @@ impl Transaction { } } + fn reserved_row_ids_end(row_ids: &ReservedRowIds) -> Result { + row_ids + .start_row_id + .checked_add(row_ids.num_rows) + .ok_or_else(|| { + Error::invalid_input(format!( + "reserved row ids overflow: start_row_id={}, num_rows={}", + row_ids.start_row_id, row_ids.num_rows + )) + }) + } + pub(crate) async fn restore_old_manifest( object_store: &ObjectStore, commit_handler: &dyn CommitHandler, @@ -1607,7 +1729,6 @@ impl Transaction { } } }; - let maybe_existing_fragments = current_manifest .map(|m| m.fragments.as_ref()) @@ -1624,13 +1745,33 @@ impl Transaction { "Clone operation should not enter build_manifest.".to_string(), )); } - Operation::Append { fragments } => { + Operation::Append { fragments, row_ids } => { final_fragments.extend(maybe_existing_fragments?.clone()); let mut new_fragments = Self::fragments_with_ids(fragments.clone(), &mut fragment_id) .collect::>(); - if let Some(next_row_id) = &mut next_row_id { - Self::assign_row_ids(next_row_id, new_fragments.as_mut_slice())?; + match (&mut next_row_id, row_ids) { + (Some(next_row_id), Some(row_ids)) => { + let requested_end = Self::reserved_row_ids_end(row_ids)?; + if requested_end > *next_row_id { + return Err(Error::invalid_input(format!( + "Requested row ids exceed next_row_id: start_row_id={}, num_rows={}, next_row_id={}", + row_ids.start_row_id, row_ids.num_rows, next_row_id + ))); + } + Self::assign_provided_row_ids(row_ids, new_fragments.as_mut_slice())?; + } + (Some(next_row_id), None) => { + Self::assign_row_ids(next_row_id, new_fragments.as_mut_slice())?; + } + (None, Some(_)) => { + return Err(Error::not_supported_source( + "Reserved row ids require a dataset created with stable row ids".into(), + )); + } + (None, None) => {} + } + if next_row_id.is_some() { // Add version metadata for all new fragments let new_version = current_manifest.map(|m| m.version + 1).unwrap_or(1); for fragment in new_fragments.iter_mut() { @@ -1956,6 +2097,27 @@ impl Transaction { Operation::ReserveFragments { .. } | Operation::UpdateConfig { .. } => { final_fragments.extend(maybe_existing_fragments?.clone()); } + Operation::ReserveRowIds { num_rows } => { + final_fragments.extend(maybe_existing_fragments?.clone()); + let next_row_id = next_row_id.as_mut().ok_or_else(|| { + Error::not_supported_source( + "Reserved row ids require a dataset created with stable row ids".into(), + ) + })?; + if *num_rows == 0 { + return Err(Error::invalid_input( + "num_rows must be greater than 0".to_string(), + )); + } + let start_row_id = *next_row_id; + let end_row_id = start_row_id.checked_add(*num_rows).ok_or_else(|| { + Error::invalid_input(format!( + "next_row_id overflow when reserving row ids: next_row_id={}, num_rows={}", + start_row_id, num_rows + )) + })?; + *next_row_id = end_row_id; + } Operation::Merge { fragments, .. } => { final_fragments.extend(fragments.clone()); @@ -2690,6 +2852,57 @@ impl Transaction { } Ok(()) } + + fn assign_provided_row_ids(row_ids: &ReservedRowIds, fragments: &mut [Fragment]) -> Result<()> { + let total_rows = fragments.iter().try_fold(0_u64, |total_rows, fragment| { + let physical_rows = fragment + .physical_rows + .ok_or_else(|| Error::internal("Fragment does not have physical rows"))? + as u64; + total_rows.checked_add(physical_rows).ok_or_else(|| { + Error::invalid_input(format!( + "provided row id count overflow when summing fragment rows: total_rows={}, fragment_rows={}", + total_rows, physical_rows + )) + }) + })?; + + if total_rows > row_ids.num_rows { + return Err(Error::invalid_input(format!( + "provided row id count {} is smaller than appended row count {}", + row_ids.num_rows, total_rows + ))); + } + + let mut next_reserved_row_id = row_ids.start_row_id; + for fragment in fragments { + if fragment.row_id_meta.is_some() { + return Err(Error::invalid_input(format!( + "cannot assign reserved row ids to fragment {} because row_id_meta is already present", + fragment.id + ))); + } + + let physical_rows = fragment + .physical_rows + .ok_or_else(|| Error::internal("Fragment does not have physical rows"))? + as u64; + let end_row_id = + next_reserved_row_id + .checked_add(physical_rows) + .ok_or_else(|| { + Error::invalid_input(format!( + "reserved row id overflow while assigning row ids: start_row_id={}, fragment_rows={}", + next_reserved_row_id, physical_rows + )) + })?; + let row_id_sequence = RowIdSequence::from(next_reserved_row_id..end_row_id); + fragment.row_id_meta = Some(RowIdMeta::Inline(write_row_ids(&row_id_sequence))); + next_reserved_row_id = end_row_id; + } + + Ok(()) + } } impl From<&DataReplacementGroup> for pb::transaction::DataReplacementGroup { @@ -2724,14 +2937,16 @@ impl TryFrom for Transaction { fn try_from(message: pb::Transaction) -> Result { let operation = match message.operation { - Some(pb::transaction::Operation::Append(pb::transaction::Append { fragments })) => { - Operation::Append { - fragments: fragments - .into_iter() - .map(Fragment::try_from) - .collect::>>()?, - } - } + Some(pb::transaction::Operation::Append(pb::transaction::Append { + fragments, + row_ids, + })) => Operation::Append { + fragments: fragments + .into_iter() + .map(Fragment::try_from) + .collect::>>()?, + row_ids: row_ids.map(ReservedRowIds::from), + }, Some(pb::transaction::Operation::Clone(pb::transaction::Clone { is_shallow, ref_name, @@ -2787,6 +3002,9 @@ impl TryFrom for Transaction { Some(pb::transaction::Operation::ReserveFragments( pb::transaction::ReserveFragments { num_fragments }, )) => Operation::ReserveFragments { num_fragments }, + Some(pb::transaction::Operation::ReserveRowIds(pb::transaction::ReserveRowIds { + num_rows, + })) => Operation::ReserveRowIds { num_rows }, Some(pb::transaction::Operation::Rewrite(pb::transaction::Rewrite { old_fragments, new_fragments, @@ -3079,9 +3297,12 @@ impl TryFrom for RewriteGroup { impl From<&Transaction> for pb::Transaction { fn from(value: &Transaction) -> Self { let operation = match &value.operation { - Operation::Append { fragments } => { + Operation::Append { fragments, row_ids } => { pb::transaction::Operation::Append(pb::transaction::Append { fragments: fragments.iter().map(pb::DataFragment::from).collect(), + row_ids: row_ids + .as_ref() + .map(pb::transaction::append::ReservedRowIds::from), }) } Operation::Clone { @@ -3139,6 +3360,11 @@ impl From<&Transaction> for pb::Transaction { num_fragments: *num_fragments, }) } + Operation::ReserveRowIds { num_rows } => { + pb::transaction::Operation::ReserveRowIds(pb::transaction::ReserveRowIds { + num_rows: *num_rows, + }) + } Operation::Rewrite { groups, rewritten_indices, @@ -3345,7 +3571,12 @@ pub fn validate_operation(manifest: Option<&Manifest>, operation: &Operation) -> }; match operation { - Operation::Append { fragments } => { + Operation::Append { fragments, row_ids } => { + if row_ids.is_some() && !manifest.uses_stable_row_ids() { + return Err(Error::not_supported( + "Reserved row ids require a dataset created with stable row ids", + )); + } // Fragments must contain all fields in the schema schema_fragments_valid(Some(manifest), &manifest.schema, fragments) } @@ -3489,6 +3720,7 @@ mod tests { use chrono::Utc; use lance_core::datatypes::Schema as LanceSchema; use lance_io::utils::CachedFileSize; + use lance_table::feature_flags::FLAG_STABLE_ROW_IDS; use std::sync::Arc; use uuid::Uuid; @@ -3782,6 +4014,31 @@ mod tests { assert_eq!(fragments[0].files[1].path, "mixed.lance"); } + #[test] + fn test_reserve_row_ids() { + let mut manifest = sample_manifest(); + manifest.reader_feature_flags = FLAG_STABLE_ROW_IDS; + manifest.writer_feature_flags = FLAG_STABLE_ROW_IDS; + manifest.next_row_id = 10; + + let transaction = Transaction::new( + manifest.version, + Operation::ReserveRowIds { num_rows: 5 }, + None, + ); + + let (next_manifest, _) = transaction + .build_manifest( + Some(&manifest), + Vec::new(), + "txn", + &ManifestWriteConfig::default(), + ) + .unwrap(); + + assert_eq!(next_manifest.next_row_id, 15); + } + #[test] fn test_assign_row_ids_new_fragment() { // Test assigning row IDs to a fragment without existing row IDs diff --git a/rust/lance/src/dataset/write/commit.rs b/rust/lance/src/dataset/write/commit.rs index ddf1b76942..043fce1929 100644 --- a/rust/lance/src/dataset/write/commit.rs +++ b/rust/lance/src/dataset/write/commit.rs @@ -50,6 +50,72 @@ pub struct CommitBuilder<'a> { } impl<'a> CommitBuilder<'a> { + async fn validate_append_row_ids( + dest: &WriteDestination<'_>, + transaction: &Transaction, + ) -> Result<()> { + let Some(dataset) = dest.dataset() else { + return Ok(()); + }; + + let Operation::Append { + row_ids: Some(requested_row_ids), + .. + } = &transaction.operation + else { + return Ok(()); + }; + + let base_dataset = if dataset.manifest.version == transaction.read_version { + dataset.clone() + } else { + dataset.checkout_version(transaction.read_version).await? + }; + + let Some(reserved_row_ids) = base_dataset.reserved_row_ids().await? else { + return Err(Error::invalid_input(format!( + "Append row ids require read_version {} to be a ReserveRowIds transaction", + transaction.read_version + ))); + }; + + let requested_end_row_id = requested_row_ids + .start_row_id + .checked_add(requested_row_ids.num_rows) + .ok_or_else(|| { + Error::invalid_input(format!( + "Requested row ids overflow: start_row_id={}, num_rows={}", + requested_row_ids.start_row_id, requested_row_ids.num_rows + )) + })?; + let reserved_end_row_id = reserved_row_ids + .start_row_id + .checked_add(reserved_row_ids.num_rows) + .ok_or_else(|| { + Error::invalid_input(format!( + "Reserved row ids overflow: start_row_id={}, num_rows={}", + reserved_row_ids.start_row_id, reserved_row_ids.num_rows + )) + })?; + + let requested_within_reserved = reserved_row_ids.start_row_id + <= requested_row_ids.start_row_id + && requested_end_row_id <= reserved_end_row_id; + + if requested_within_reserved { + Ok(()) + } else { + Err(Error::invalid_input(format!( + "Requested row ids start_row_id={} num_rows={} are not contained within the read_version {} reservation start_row_id={} num_rows={}", + requested_row_ids.start_row_id, + requested_row_ids.num_rows, + transaction.read_version, + reserved_row_ids.start_row_id, + reserved_row_ids.num_rows + ))) + } + } + pub fn new(dest: impl Into>) -> Self { Self { dest: dest.into(), @@ -266,6 +332,7 @@ impl<'a> CommitBuilder<'a> { } else { validate_operation(None, &transaction.operation)?; } + Self::validate_append_row_ids(&dest, &transaction).await?; let (metadata_cache, index_cache) = match &dest { WriteDestination::Dataset(ds) => (ds.metadata_cache.clone(), ds.index_cache.clone()), @@ -423,10 +490,11 @@ impl<'a> CommitBuilder<'a> { } if transactions .iter() - .any(|t| !matches!(t.operation, Operation::Append { .. })) + .any(|t| !matches!(t.operation, Operation::Append { row_ids: None, .. })) { return Err(Error::not_supported_source( - "Only append transactions are supported in batch commits".into(), + "Only append transactions without reserved row ids are supported in batch commits" + .into(), )); } @@ -438,10 +506,11 @@ impl<'a> CommitBuilder<'a> { fragments: transactions .iter() .flat_map(|t| match &t.operation { - Operation::Append { fragments } => fragments.clone(), + Operation::Append { fragments, .. } => fragments.clone(), _ => unreachable!(), }) .collect(), + row_ids: None, }, read_version, tag: None, @@ -506,6 +575,7 @@ mod tests { uuid: uuid::Uuid::new_v4().hyphenated().to_string(), operation: Operation::Append { fragments: vec![sample_fragment()], + row_ids: None, }, read_version, tag: None, @@ -768,10 +838,10 @@ mod tests { let append1 = sample_transaction(1); let append2 = sample_transaction(2); let mut expected_fragments = vec![]; - if let Operation::Append { fragments } = &append1.operation { + if let Operation::Append { fragments, .. } = &append1.operation { expected_fragments.extend(fragments.clone()); } - if let Operation::Append { fragments } = &append2.operation { + if let Operation::Append { fragments, .. } = &append2.operation { expected_fragments.extend(fragments.clone()); } let res = CommitBuilder::new(dataset.clone()) @@ -780,7 +850,7 @@ mod tests { .unwrap(); let transaction = res.merged; assert!( - matches!(transaction.operation, Operation::Append { fragments } if fragments == expected_fragments) + matches!(transaction.operation, Operation::Append { fragments, .. } if fragments == expected_fragments) ); assert_eq!(transaction.read_version, 1); } diff --git a/rust/lance/src/dataset/write/insert.rs b/rust/lance/src/dataset/write/insert.rs index 20209ed7f3..4259146bb9 100644 --- a/rust/lance/src/dataset/write/insert.rs +++ b/rust/lance/src/dataset/write/insert.rs @@ -14,7 +14,7 @@ use lance_datafusion::utils::StreamingWriteSource; use lance_file::version::LanceFileVersion; use lance_io::object_store::ObjectStore; use lance_table::feature_flags::can_write_dataset; -use lance_table::format::Fragment; +use lance_table::format::{Fragment, ReservedRowIds}; use lance_table::io::commit::CommitHandler; use object_store::path::Path; @@ -49,6 +49,7 @@ pub struct InsertBuilder<'a> { // TODO: make these parameters a part of the builder, and add specific methods. params: Option<&'a WriteParams>, write_progress: Option, + row_ids: Option, } impl<'a> InsertBuilder<'a> { @@ -57,6 +58,7 @@ impl<'a> InsertBuilder<'a> { dest: dest.into(), params: None, write_progress: None, + row_ids: None, } } @@ -77,6 +79,23 @@ impl<'a> InsertBuilder<'a> { self } + /// Use a pre-reserved range of stable row ids for the appended rows. + /// + /// The provided range must come from the `ReserveRowIds` transaction at the + /// append transaction's `read_version`. + /// + /// If `row_ids.num_rows` is larger than the number of rows actually + /// appended then only the prefix is assigned to written rows. + /// + /// Multiple append transactions may consume disjoint sub-ranges from the + /// same reservation, as long as each append still uses the reserve + /// transaction's version as its `read_version` and the requested ranges do + /// not overlap with each other. + pub fn with_row_ids(mut self, row_ids: ReservedRowIds) -> Self { + self.row_ids = Some(row_ids); + self + } + /// Execute the insert operation with the given data. /// /// This writes the data fragments and commits them into the dataset. @@ -215,12 +234,13 @@ impl<'a> InsertBuilder<'a> { ) .await?; - let transaction = Self::build_transaction(written_schema, written_fragments, &context)?; + let transaction = self.build_transaction(written_schema, written_fragments, &context)?; Ok((transaction, context)) } fn build_transaction( + &self, schema: Schema, fragments: Vec, context: &WriteContext<'_>, @@ -262,7 +282,10 @@ impl<'a> InsertBuilder<'a> { config_upsert_values: None, initial_bases: context.params.initial_bases.clone(), }, - WriteMode::Append => Operation::Append { fragments }, + WriteMode::Append => Operation::Append { + fragments, + row_ids: self.row_ids.clone(), + }, }; let transaction = TransactionBuilder::new( @@ -281,6 +304,29 @@ impl<'a> InsertBuilder<'a> { fn validate_write(&self, context: &mut WriteContext, data_schema: &Schema) -> Result<()> { // Write mode + if self.row_ids.is_some() { + match (&context.params.mode, &context.dest) { + (WriteMode::Append, WriteDestination::Dataset(dataset)) => { + if !dataset.manifest.uses_stable_row_ids() { + return Err(Error::not_supported_source( + "Reserved row ids require a dataset created with stable row ids".into(), + )); + } + } + (WriteMode::Append, WriteDestination::Uri(_)) => { + return Err(Error::invalid_input_source( + "Reserved row ids can only be used when appending to an existing dataset" + .into(), + )); + } + _ => { + return Err(Error::invalid_input_source( + "Reserved row ids can only be used with append writes".into(), + )); + } + } + } + match (&context.params.mode, &context.dest) { (WriteMode::Create, WriteDestination::Dataset(ds)) => { return Err(Error::dataset_already_exists(ds.uri.clone())); diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs index 7c35957fd7..303ddc3dee 100644 --- a/rust/lance/src/io/commit.rs +++ b/rust/lance/src/io/commit.rs @@ -1236,7 +1236,10 @@ mod tests { let base_path = Path::from("test"); let transaction = Transaction::new( 42, - Operation::Append { fragments: vec![] }, + Operation::Append { + fragments: vec![], + row_ids: None, + }, Some("hello world".to_string()), ); diff --git a/rust/lance/src/io/commit/conflict_resolver.rs b/rust/lance/src/io/commit/conflict_resolver.rs index c270d2ab7e..d3edcb627c 100644 --- a/rust/lance/src/io/commit/conflict_resolver.rs +++ b/rust/lance/src/io/commit/conflict_resolver.rs @@ -52,6 +52,7 @@ impl<'a> TransactionRebase<'a> { | Operation::Overwrite { .. } | Operation::CreateIndex { .. } | Operation::ReserveFragments { .. } + | Operation::ReserveRowIds { .. } | Operation::Project { .. } | Operation::UpdateConfig { .. } | Operation::UpdateMemWalState { .. } @@ -211,6 +212,9 @@ impl<'a> TransactionRebase<'a> { Operation::ReserveFragments { .. } => { self.check_reserve_fragments_txn(other_transaction, other_version) } + Operation::ReserveRowIds { .. } => { + self.check_reserve_row_ids_txn(other_transaction, other_version) + } Operation::Project { .. } => self.check_project_txn(other_transaction, other_version), Operation::UpdateConfig { .. } => { self.check_update_config_txn(other_transaction, other_version) @@ -234,6 +238,7 @@ impl<'a> TransactionRebase<'a> { match &other_transaction.operation { Operation::CreateIndex { .. } | Operation::ReserveFragments { .. } + | Operation::ReserveRowIds { .. } | Operation::Clone { .. } | Operation::Project { .. } | Operation::Append { .. } @@ -382,6 +387,7 @@ impl<'a> TransactionRebase<'a> { match &other_transaction.operation { Operation::CreateIndex { .. } | Operation::ReserveFragments { .. } + | Operation::ReserveRowIds { .. } | Operation::Project { .. } | Operation::Clone { .. } | Operation::UpdateConfig { .. } @@ -542,7 +548,7 @@ impl<'a> TransactionRebase<'a> { Operation::Delete { .. } | Operation::Update { .. } => Ok(()), // Merge, reserve, and project don't change row ids, so this should be fine. Operation::Merge { .. } => Ok(()), - Operation::ReserveFragments { .. } => Ok(()), + Operation::ReserveFragments { .. } | Operation::ReserveRowIds { .. } => Ok(()), Operation::Project { .. } => Ok(()), // Should be compatible with rewrite if it didn't move the rows // we indexed. If it did, we could retry. @@ -656,6 +662,7 @@ impl<'a> TransactionRebase<'a> { // existing fragments or update fragments we don't touch. Operation::Append { .. } | Operation::ReserveFragments { .. } + | Operation::ReserveRowIds { .. } | Operation::Project { .. } | Operation::Clone { .. } | Operation::UpdateConfig { .. } @@ -835,6 +842,7 @@ impl<'a> TransactionRebase<'a> { | Operation::Merge { .. } | Operation::Restore { .. } | Operation::ReserveFragments { .. } + | Operation::ReserveRowIds { .. } | Operation::Update { .. } | Operation::Project { .. } | Operation::UpdateBases { .. } => Ok(()), @@ -846,6 +854,51 @@ impl<'a> TransactionRebase<'a> { other_transaction: &Transaction, other_version: u64, ) -> Result<()> { + if let ( + Operation::Append { + row_ids: Some(current_row_ids), + .. + }, + Operation::Append { + row_ids: Some(other_row_ids), + .. + }, + ) = (&self.transaction.operation, &other_transaction.operation) + { + let current_end = current_row_ids + .start_row_id + .checked_add(current_row_ids.num_rows) + .ok_or_else(|| { + Error::invalid_input(format!( + "reserved row ids overflow: start_row_id={}, num_rows={}", + current_row_ids.start_row_id, current_row_ids.num_rows + )) + })?; + let other_end = other_row_ids + .start_row_id + .checked_add(other_row_ids.num_rows) + .ok_or_else(|| { + Error::invalid_input(format!( + "reserved row ids overflow: start_row_id={}, num_rows={}", + other_row_ids.start_row_id, other_row_ids.num_rows + )) + })?; + if current_row_ids.start_row_id < other_end && other_row_ids.start_row_id < current_end + { + return Err(Error::commit_conflict_source( + other_version, + format!( + "Reserved row ids overlap with a concurrent append: current start_row_id={} num_rows={}, other start_row_id={} num_rows={}", + current_row_ids.start_row_id, + current_row_ids.num_rows, + other_row_ids.start_row_id, + other_row_ids.num_rows + ) + .into(), + )); + } + } + match &other_transaction.operation { // Append is not compatible with any operation that completely // overwrites the schema. @@ -860,6 +913,7 @@ impl<'a> TransactionRebase<'a> { | Operation::Delete { .. } | Operation::Update { .. } | Operation::ReserveFragments { .. } + | Operation::ReserveRowIds { .. } | Operation::Project { .. } | Operation::UpdateBases { .. } | Operation::Merge { .. } @@ -883,6 +937,7 @@ impl<'a> TransactionRebase<'a> { | Operation::Merge { .. } | Operation::UpdateConfig { .. } | Operation::ReserveFragments { .. } + | Operation::ReserveRowIds { .. } | Operation::Project { .. } | Operation::UpdateBases { .. } => Ok(()), Operation::CreateIndex { new_indices, .. } => { @@ -962,6 +1017,7 @@ impl<'a> TransactionRebase<'a> { match &other_transaction.operation { Operation::CreateIndex { .. } | Operation::ReserveFragments { .. } + | Operation::ReserveRowIds { .. } | Operation::Clone { .. } | Operation::UpdateConfig { .. } | Operation::UpdateBases { .. } => Ok(()), @@ -998,6 +1054,7 @@ impl<'a> TransactionRebase<'a> { | Operation::Merge { .. } | Operation::Restore { .. } | Operation::ReserveFragments { .. } + | Operation::ReserveRowIds { .. } | Operation::UpdateBases { .. } | Operation::Update { .. } | Operation::Project { .. } @@ -1025,6 +1082,7 @@ impl<'a> TransactionRebase<'a> { | Operation::DataReplacement { .. } | Operation::Merge { .. } | Operation::ReserveFragments { .. } + | Operation::ReserveRowIds { .. } | Operation::Update { .. } | Operation::Project { .. } | Operation::Clone { .. } @@ -1034,6 +1092,14 @@ impl<'a> TransactionRebase<'a> { } } + fn check_reserve_row_ids_txn( + &mut self, + other_transaction: &Transaction, + other_version: u64, + ) -> Result<()> { + self.check_reserve_fragments_txn(other_transaction, other_version) + } + fn check_project_txn( &mut self, other_transaction: &Transaction, @@ -1050,6 +1116,7 @@ impl<'a> TransactionRebase<'a> { | Operation::Rewrite { .. } | Operation::Clone { .. } | Operation::ReserveFragments { .. } + | Operation::ReserveRowIds { .. } | Operation::UpdateBases { .. } => Ok(()), Operation::Merge { .. } | Operation::Project { .. } => { // Need to recompute the schema @@ -1114,6 +1181,7 @@ impl<'a> TransactionRebase<'a> { | Operation::Merge { .. } | Operation::Restore { .. } | Operation::ReserveFragments { .. } + | Operation::ReserveRowIds { .. } | Operation::Update { .. } | Operation::Project { .. } | Operation::UpdateMemWalState { .. } @@ -1178,6 +1246,7 @@ impl<'a> TransactionRebase<'a> { Operation::UpdateConfig { .. } | Operation::Rewrite { .. } | Operation::ReserveFragments { .. } + | Operation::ReserveRowIds { .. } | Operation::UpdateBases { .. } => Ok(()), Operation::Append { .. } | Operation::Overwrite { .. } @@ -1279,6 +1348,7 @@ impl<'a> TransactionRebase<'a> { | Operation::Merge { .. } | Operation::Restore { .. } | Operation::ReserveFragments { .. } + | Operation::ReserveRowIds { .. } | Operation::Project { .. } | Operation::Clone { .. } | Operation::UpdateConfig { .. } @@ -2184,6 +2254,7 @@ mod tests { let other_operations = [ Operation::Append { fragments: vec![fragment0.clone()], + row_ids: None, }, Operation::CreateIndex { new_indices: vec![index0.clone()], @@ -2253,6 +2324,7 @@ mod tests { ( Operation::Append { fragments: vec![fragment0.clone()], + row_ids: None, }, [ Compatible, // append @@ -2938,7 +3010,10 @@ mod tests { // Test against various data operations let data_operations = vec![ - Operation::Append { fragments: vec![] }, + Operation::Append { + fragments: vec![], + row_ids: None, + }, Operation::Delete { deleted_fragment_ids: vec![0], updated_fragments: vec![], @@ -3102,6 +3177,7 @@ mod tests { | Operation::Overwrite { .. } | Operation::CreateIndex { .. } | Operation::ReserveFragments { .. } + | Operation::ReserveRowIds { .. } | Operation::Project { .. } | Operation::UpdateConfig { .. } | Operation::UpdateBases { .. }