5 changes: 4 additions & 1 deletion java/lance-jni/src/blocking_dataset.rs
@@ -772,7 +772,10 @@ pub fn inner_commit_append<'local>(
    for f in fragment_objs {
        fragments.push(f.extract_object(env)?);
    }
-   let op = Operation::Append { fragments };
+   let op = Operation::Append {
+       fragments,
+       row_ids: None,
+   };
    let path_str = path.extract(env)?;
    let read_version = env.get_u64_opt(&read_version_obj)?;
    let storage_options = extract_storage_options(env, &storage_options_obj)?;
6 changes: 5 additions & 1 deletion java/lance-jni/src/transaction.rs
@@ -351,6 +351,7 @@ fn convert_to_java_operation_inner<'local>(
    match operation {
        Operation::Append {
            fragments: rust_fragments,
+           ..
        } => {
Comment on lines 351 to 355
P1: Handle ReserveRowIds in JNI operation conversion

This JNI conversion match does not include Operation::ReserveRowIds; it currently falls through to _ => unimplemented!() in convert_to_java_operation_inner. As a result, reading transactions from Java (for example nativeReadTransaction or transaction listing) will panic whenever a dataset version was produced by ReserveRowIds, failing hard instead of returning a usable Java operation object.


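A minimal sketch of the kind of arm the comment asks for, not part of this PR; the Java class path and its (long numRows) constructor are hypothetical stand-ins for whatever the bindings actually define:

    Operation::ReserveRowIds { num_rows } => {
        // Hypothetical Java-side class; the real bindings may name it differently.
        let cls = env.find_class("com/lancedb/lance/operation/ReserveRowIds")?;
        // "(J)V" declares a constructor taking a single Java long.
        env.new_object(cls, "(J)V", &[JValue::Long(num_rows as i64)])
    }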
            let java_fragments = export_vec(env, &rust_fragments)?;

@@ -1090,7 +1091,10 @@ fn convert_to_rust_operation(
            import_vec_from_method(env, java_operation, "fragments", |env, fragment| {
                fragment.extract_object(env)
            })?;
-           Operation::Append { fragments }
+           Operation::Append {
+               fragments,
+               row_ids: None,
+           }
        }
        "Delete" => {
            let updated_fragments: Vec<Fragment> = import_vec_from_method(
2 changes: 1 addition & 1 deletion protos/table.proto
@@ -205,6 +205,7 @@ message Manifest {

  // The branch of the dataset. None means main branch.
  optional string branch = 20;
+
} // Manifest

// external dataset base path
@@ -690,4 +691,3 @@ message ShardField {
  // Transform parameters (e.g., num_buckets for bucket transform).
  map<string, string> parameters = 6;
}
-
18 changes: 18 additions & 0 deletions protos/transaction.proto
@@ -35,10 +35,22 @@ message Transaction {

  // Add new rows to the dataset.
  message Append {
+   message ReservedRowIds {
+     uint64 start_row_id = 1;
+     uint64 num_rows = 2;
+   }
+
    // The new fragments to append.
    //
    // Fragment IDs are not yet assigned.
    repeated DataFragment fragments = 1;
+   // Stable row ids to assign to the new rows, if pre-reserved.
+   //
+   // The requested range must be contained within the row ids reserved by the
+   // dataset version identified by read_version. Multiple append operations
+   // may consume disjoint sub-ranges from the same reservation as long as
+   // they continue to reference that reserve version.
+   optional ReservedRowIds row_ids = 2;
  }

  // Mark rows as deleted.
@@ -164,6 +176,11 @@
    uint32 num_fragments = 1;
  }

+ // An operation that reserves stable row ids for a future append.
+ message ReserveRowIds {
+   uint64 num_rows = 1;
+ }
+
  // An operation that clones a dataset.
  message Clone {
    // - true: Performs a metadata-only clone (copies manifest without data files).
@@ -339,6 +356,7 @@
    UpdateMemWalState update_mem_wal_state = 112;
    Clone clone = 113;
    UpdateBases update_bases = 114;
+   ReserveRowIds reserve_row_ids = 115;
  }

  // Fields 200/202 (`blob_append` / `blob_overwrite`) previously represented blob dataset ops.
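To make the sub-range rule in the Append comment concrete, here is a hypothetical Rust sketch (the numeric values are invented for illustration): one version reserves ten row ids, and two later appends consume disjoint slices of that reservation while both referencing the reserving version.

    // Hypothetical reservation: some version N reserved stable row ids [100, 110).
    let reservation = ReservedRowIds { start_row_id: 100, num_rows: 10 };

    // Two appends may each consume a disjoint sub-range of that reservation,
    // as long as both keep read_version pointing at the reserving version N.
    let first = ReservedRowIds { start_row_id: 100, num_rows: 4 };  // rows 100..104
    let second = ReservedRowIds { start_row_id: 104, num_rows: 6 }; // rows 104..110

    // An overlapping sub-range (e.g. start_row_id: 103) would be rejected at
    // commit time with a "Reserved row ids overlap" conflict (see the tests below).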
7 changes: 5 additions & 2 deletions python/src/transaction.rs
@@ -250,7 +250,10 @@ impl FromPyObject<'_> for PyLance<Operation> {
            }
            "Append" => {
                let fragments = extract_vec(&ob.getattr("fragments")?)?;
-               let op = Operation::Append { fragments };
+               let op = Operation::Append {
+                   fragments,
+                   row_ids: None,
+               };
                Ok(Self(op))
            }
            "Delete" => {
@@ -403,7 +406,7 @@ impl<'py> IntoPyObject<'py> for PyLance<&Operation> {
            .expect("Failed to import LanceOperation namespace");

        match self.0 {
-           Operation::Append { fragments } => {
+           Operation::Append { fragments, .. } => {
                let fragments = export_vec(py, fragments.as_slice())?;
Comment on lines 408 to 410
P1: Handle ReserveRowIds when exporting Python operations

After introducing Operation::ReserveRowIds, this IntoPyObject match still has no arm for it and falls through to the _ => todo!() branch later in the same function. That makes Dataset.read_transaction(...) / get_transactions(...) crash at runtime for any version created by a reserve-row-ids commit, which breaks Python transaction/history APIs as soon as such a dataset is encountered.

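A minimal sketch of the missing arm, assuming the lance Python package exposes a LanceOperation.ReserveRowIds class whose constructor takes num_rows (both the attribute name and signature are hypothetical):

    Operation::ReserveRowIds { num_rows } => {
        // Hypothetical Python-side class; the real attribute may differ.
        let cls = namespace.getattr("ReserveRowIds")?;
        cls.call1((num_rows,))
    }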

                let cls = namespace
                    .getattr("Append")
4 changes: 2 additions & 2 deletions rust/lance-table/src/format.rs
@@ -16,8 +16,8 @@ pub use fragment::*;
pub use index::{IndexFile, IndexMetadata, index_metadata_codec, list_index_files_with_sizes};

pub use manifest::{
-   BasePath, DETACHED_VERSION_MASK, DataStorageFormat, Manifest, SelfDescribingFileReader,
-   WriterVersion, is_detached_version,
+   BasePath, DETACHED_VERSION_MASK, DataStorageFormat, Manifest, ReservedRowIds,
+   SelfDescribingFileReader, WriterVersion, is_detached_version,
};
pub use transaction::Transaction;

30 changes: 29 additions & 1 deletion rust/lance-table/src/format/manifest.rs
@@ -24,6 +24,14 @@ use lance_core::{Error, Result};
use lance_io::object_store::{ObjectStore, ObjectStoreRegistry};
use lance_io::utils::read_struct;

+#[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf)]
+pub struct ReservedRowIds {
+   /// The first stable row id in the reserved range.
+   pub start_row_id: u64,
+   /// The number of stable row ids in the reserved range.
+   pub num_rows: u64,
+}
+
/// Manifest of a dataset
///
/// * Schema
@@ -82,7 +90,7 @@ pub struct Manifest {
    /// accelerating the fragment search using offset ranges.
    fragment_offsets: Vec<usize>,

-   /// The max row id used so far.
+   /// The next row id that will be allocated.
    pub next_row_id: u64,

    /// The storage format of the data files.
@@ -233,6 +241,8 @@ impl Manifest {
    /// - Any persistent storage operations
    /// - Modifications to the original data
    /// - If the shallow clone is for branch, ref_name is the source branch
+   ///
+   /// The clone keeps the current `next_row_id` progress.
    pub fn shallow_clone(
        &self,
        ref_name: Option<String>,
@@ -839,6 +849,24 @@ impl From<BasePath> for pb::BasePath {
    }
}

+impl From<pb::transaction::append::ReservedRowIds> for ReservedRowIds {
+   fn from(value: pb::transaction::append::ReservedRowIds) -> Self {
+       Self {
+           start_row_id: value.start_row_id,
+           num_rows: value.num_rows,
+       }
+   }
+}
+
+impl From<&ReservedRowIds> for pb::transaction::append::ReservedRowIds {
+   fn from(value: &ReservedRowIds) -> Self {
+       Self {
+           start_row_id: value.start_row_id,
+           num_rows: value.num_rows,
+       }
+   }
+}
+
impl TryFrom<pb::Manifest> for Manifest {
    type Error = Error;

37 changes: 37 additions & 0 deletions rust/lance/src/dataset.rs
@@ -116,6 +116,7 @@ use lance_core::box_error;
use lance_index::scalar::lance_format::LanceIndexStore;
use lance_namespace::models::{DeclareTableRequest, DescribeTableRequest};
use lance_table::feature_flags::{apply_feature_flags, can_read_dataset};
+pub use lance_table::format::ReservedRowIds;
use lance_table::io::deletion::{DELETIONS_DIR, relative_deletion_file_path};
pub use schema_evolution::{
    BatchInfo, BatchUDF, ColumnAlteration, NewColumnTransform, UDFCheckpointStore,
@@ -405,6 +406,42 @@ impl From<Schema> for ProjectionRequest {
}

impl Dataset {
+   /// Returns the stable row-id range reserved by the current dataset version,
+   /// if any.
+   ///
+   /// This only reports the reservation created by the current version's
+   /// `ReserveRowIds` transaction. It does not scan older versions or expose
+   /// a global list of outstanding reservations.
+   ///
+   /// In other words, this returns `Some(...)` only when the current dataset
+   /// version itself was produced by `ReserveRowIds`; otherwise it returns
+   /// `None`.
+   pub async fn reserved_row_ids(&self) -> Result<Option<ReservedRowIds>> {
+       let Some(transaction) = self.read_transaction().await? else {
+           return Ok(None);
+       };
+
+       let Operation::ReserveRowIds { num_rows } = transaction.operation else {
+           return Ok(None);
+       };
+
+       let start_row_id = self
+           .manifest
+           .next_row_id
+           .checked_sub(num_rows)
+           .ok_or_else(|| {
+               Error::internal(format!(
+                   "Manifest next_row_id={} is smaller than reserved num_rows={}",
+                   self.manifest.next_row_id, num_rows
+               ))
+           })?;
+
+       Ok(Some(ReservedRowIds {
+           start_row_id,
+           num_rows,
+       }))
+   }
+
    /// Open an existing dataset.
    ///
    /// See also [DatasetBuilder].
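For context, a minimal usage sketch of the new API, condensed from the tests added below (it assumes a dataset already opened with enable_stable_row_ids and elides error handling):

    // Reserve 10 stable row ids as a standalone transaction.
    let reserve_txn = TransactionBuilder::new(
        dataset.manifest().version,
        Operation::ReserveRowIds { num_rows: 10 },
    )
    .build();
    let dataset = CommitBuilder::new(Arc::new(dataset))
        .execute(reserve_txn)
        .await?;

    // The current version was produced by ReserveRowIds, so this is Some(...).
    let reserved = dataset.reserved_row_ids().await?.unwrap();
    assert_eq!(reserved.num_rows, 10);
    // start_row_id is derived as manifest.next_row_id - num_rows.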
1 change: 1 addition & 0 deletions rust/lance/src/dataset/fragment.rs
@@ -3894,6 +3894,7 @@ mod tests {

        let op = Operation::Append {
            fragments: vec![frag],
+           row_ids: None,
        };
        let dataset = Dataset::commit(
            &dataset.uri,
148 changes: 147 additions & 1 deletion rust/lance/src/dataset/rowids.rs
@@ -133,7 +133,11 @@ async fn load_row_id_index(dataset: &Dataset) -> Result<lance_table::rowids::Row
mod test {
    use std::ops::Range;

-   use crate::dataset::{UpdateBuilder, WriteMode, WriteParams, builder::DatasetBuilder};
+   use crate::dataset::transaction::{Operation, TransactionBuilder};
+   use crate::dataset::{
+       CommitBuilder, InsertBuilder, ReservedRowIds, UpdateBuilder, WriteMode, WriteParams,
+       builder::DatasetBuilder,
+   };

    use super::*;

@@ -317,6 +321,148 @@ mod test {
        assert!(index.get(60).is_none());
    }

+   #[tokio::test]
+   async fn test_append_with_reserved_row_ids() {
+       let temp_dir = lance_core::utils::tempfile::TempStrDir::default();
+       let tmp_path = &temp_dir;
+       let schema = sequence_batch(0..0).schema();
+       let reader = RecordBatchIterator::new(vec![].into_iter().map(Ok), schema);
+       let dataset = Dataset::write(
+           reader,
+           tmp_path,
+           Some(WriteParams {
+               enable_stable_row_ids: true,
+               ..Default::default()
+           }),
+       )
+       .await
+       .unwrap();
+
+       let reserve_txn = TransactionBuilder::new(
+           dataset.manifest().version,
+           Operation::ReserveRowIds { num_rows: 7 },
+       )
+       .build();
+       let dataset = CommitBuilder::new(Arc::new(dataset))
+           .execute(reserve_txn)
+           .await
+           .unwrap();
+
+       let reserved = dataset.reserved_row_ids().await.unwrap().unwrap();
+
+       let append_params = WriteParams {
+           mode: WriteMode::Append,
+           max_rows_per_file: 2,
+           ..Default::default()
+       };
+       let requested = ReservedRowIds {
+           start_row_id: reserved.start_row_id + 2,
+           num_rows: 5,
+       };
+       let dataset = InsertBuilder::new(Arc::new(dataset))
+           .with_row_ids(requested.clone())
+           .with_params(&append_params)
+           .execute(vec![sequence_batch(100..104)])
+           .await
+           .unwrap();
+
+       assert!(dataset.reserved_row_ids().await.unwrap().is_none());
+       assert_eq!(dataset.get_fragments().len(), 2);
+
+       let batch = dataset
+           .scan()
+           .with_row_id()
+           .project(&["id"])
+           .unwrap()
+           .try_into_batch()
+           .await
+           .unwrap();
+       let row_ids = batch[ROW_ID]
+           .as_primitive::<UInt64Type>()
+           .values()
+           .iter()
+           .copied()
+           .collect::<Vec<_>>();
+       assert_eq!(row_ids, vec![2, 3, 4, 5]);
+
+       let invalid_reuse = InsertBuilder::new(Arc::new(dataset.clone()))
+           .with_row_ids(requested)
+           .with_params(&append_params)
+           .execute_uncommitted(vec![sequence_batch(200..202)])
+           .await
+           .unwrap();
+       let err = CommitBuilder::new(Arc::new(dataset))
+           .execute(invalid_reuse)
+           .await
+           .unwrap_err();
+       assert!(matches!(err, Error::InvalidInput { .. }));
+   }
+
+   #[tokio::test]
+   async fn test_append_with_reserved_row_ids_conflict() {
+       let temp_dir = lance_core::utils::tempfile::TempStrDir::default();
+       let tmp_path = &temp_dir;
+       let schema = sequence_batch(0..0).schema();
+       let reader = RecordBatchIterator::new(vec![].into_iter().map(Ok), schema);
+       let dataset = Dataset::write(
+           reader,
+           tmp_path,
+           Some(WriteParams {
+               enable_stable_row_ids: true,
+               ..Default::default()
+           }),
+       )
+       .await
+       .unwrap();
+
+       let reserve_txn = TransactionBuilder::new(
+           dataset.manifest().version,
+           Operation::ReserveRowIds { num_rows: 6 },
+       )
+       .build();
+       let dataset = CommitBuilder::new(Arc::new(dataset))
+           .execute(reserve_txn)
+           .await
+           .unwrap();
+       let reserved = dataset.reserved_row_ids().await.unwrap().unwrap();
+
+       let append_params = WriteParams {
+           mode: WriteMode::Append,
+           ..Default::default()
+       };
+       let first_transaction = InsertBuilder::new(Arc::new(dataset.clone()))
+           .with_row_ids(ReservedRowIds {
+               start_row_id: reserved.start_row_id + 1,
+               num_rows: 4,
+           })
+           .with_params(&append_params)
+           .execute_uncommitted(vec![sequence_batch(100..104)])
+           .await
+           .unwrap();
+       let second_transaction = InsertBuilder::new(Arc::new(dataset.clone()))
+           .with_row_ids(ReservedRowIds {
+               start_row_id: reserved.start_row_id + 3,
+               num_rows: 2,
+           })
+           .with_params(&append_params)
+           .execute_uncommitted(vec![sequence_batch(200..202)])
+           .await
+           .unwrap();
+
+       let dataset = CommitBuilder::new(Arc::new(dataset))
+           .execute(first_transaction)
+           .await
+           .unwrap();
+
+       let result = CommitBuilder::new(Arc::new(dataset))
+           .execute(second_transaction)
+           .await;
+
+       let err = result.unwrap_err();
+       assert!(matches!(err, Error::CommitConflict { .. }));
+       assert!(err.to_string().contains("Reserved row ids overlap"));
+   }
+
    #[tokio::test]
    async fn test_scan_row_ids() {
        // Write dataset with multiple files -> _rowid != _rowaddr