Skip to content

Commit 6112a34

Browse files
westonpace and claude authored
feat: add DataFile.create helper for building DataFile metadata (#6427)
## Summary - Adds `DataFile.create(dataset, path, *, base_id=None)` classmethod that reads a lance file's metadata and automatically constructs a `DataFile` with correct field IDs, column indices, file version, and file size - Eliminates the need for manual `DataFile` construction when performing `DataReplacement` operations - Handles packed structs, structural file versions (v2.1+), subset columns, and external base paths Closes #6413 ## Test plan - [x] `test_data_file_create_basic` — verifies fields, column_indices, version, file_size for a two-column file - [x] `test_data_file_create_subset_columns` — single column from a multi-column dataset - [x] `test_data_file_create_end_to_end` — full DataReplacement round-trip using the new helper - [x] `test_data_file_create_unknown_column` — error on column not in dataset schema - [x] All existing `test_table_ops.py` tests still pass 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 40d09bf commit 6112a34

File tree

4 files changed

+251
-9
lines changed

4 files changed

+251
-9
lines changed

python/python/lance/fragment.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,36 @@ def field_ids(self) -> List[int]:
250250
)
251251
return self.fields
252252

253+
@classmethod
def create(
    cls,
    dataset: "LanceDataset",
    path: str,
    *,
    base_id: Optional[int] = None,
) -> "DataFile":
    """Build a DataFile by inspecting an existing lance file.

    Convenience helper for operations (such as DataReplacement) that need
    fully populated DataFile metadata.  The file is opened, its schema and
    version information are read, and its columns are matched against the
    dataset's schema to resolve field IDs and column indices.

    Parameters
    ----------
    dataset : LanceDataset
        The dataset this file will belong to.
    path : str
        The path to the data file, relative to the dataset's data directory.
    base_id : int, optional
        The base path ID if the file is outside the dataset directory.

    Returns
    -------
    DataFile
    """
    # All of the heavy lifting (metadata reads, schema matching) happens
    # in the native extension.
    return _Fragment.create_data_file(dataset._ds, path, base_id=base_id)
282+
253283

254284
class LanceFragment(pa.dataset.Fragment):
255285
def __init__(

python/python/tests/test_table_ops.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,3 +106,95 @@ def test_replacement_after_index(tmp_path: str):
106106
),
107107
read_version=ds3.version,
108108
)
109+
110+
111+
def test_data_file_create_basic(tmp_path: str):
    """DataFile.create should read file metadata and produce correct fields/indices."""
    tbl = pa.table({"a": range(10), "b": range(10, 20)})
    ds = lance.write_dataset(tbl, tmp_path)

    # Write a standalone lance file containing both columns.
    fname = f"{uuid.uuid4()}.lance"
    with LanceFileWriter(f"{tmp_path}/data/{fname}") as writer:
        writer.write_batch(tbl)

    df = DataFile.create(ds, fname)

    # Field IDs should match those recorded by the dataset's first fragment.
    expected_fields = ds.get_fragments()[0].data_files()[0].fields
    assert df.fields == expected_fields
    assert df.column_indices == [0, 1]
    # The file was written with the current stable writer version.
    major, minor = stable_version().split(".")[:2]
    assert df.file_major_version == int(major)
    assert df.file_minor_version == int(minor)
    assert df.file_size_bytes is not None and df.file_size_bytes > 0
132+
133+
134+
def test_data_file_create_subset_columns(tmp_path: str):
    """DataFile.create should work for a file with a subset of dataset columns."""
    ds = lance.write_dataset(pa.table({"a": range(10), "b": range(10, 20)}), tmp_path)
    ds.add_columns({"c": "a + b"})
    ds = lance.dataset(tmp_path)

    # Write a file that holds only column b.
    fname = f"{uuid.uuid4()}.lance"
    only_b = pa.schema([("b", pa.int64())])
    with LanceFileWriter(f"{tmp_path}/data/{fname}", only_b) as writer:
        writer.write_batch(pa.table({"b": range(100, 110)}))

    df = DataFile.create(ds, fname)

    # Only b's field ID should appear; b is the second field in the
    # original data file.
    b_field_id = ds.get_fragments()[0].data_files()[0].fields[1]
    assert df.fields == [b_field_id]
    assert df.column_indices == [0]
156+
157+
158+
def test_data_file_create_end_to_end(tmp_path: str):
    """DataFile.create should work end-to-end with DataReplacement."""
    ds = lance.write_dataset(pa.table({"a": range(100)}), tmp_path)
    ds.add_columns({"b": "a + 1"})
    ds = lance.dataset(tmp_path)

    # Write a replacement file for column b with entirely new values.
    fname = f"{uuid.uuid4()}.lance"
    b_schema = pa.schema([("b", pa.int64())])
    with LanceFileWriter(f"{tmp_path}/data/{fname}", b_schema) as writer:
        writer.write_batch(pa.table({"b": range(200, 300)}))

    # Use DataFile.create instead of constructing the metadata by hand.
    df = DataFile.create(ds, fname)

    replacement_op = lance.LanceOperation.DataReplacement(
        [lance.LanceOperation.DataReplacementGroup(0, df)]
    )
    ds.commit(ds.uri, replacement_op, read_version=ds.version)

    # Column b is fully replaced; column a is untouched.
    result = lance.dataset(tmp_path).to_table()
    assert result.column("b").to_pylist() == list(range(200, 300))
    assert result.column("a").to_pylist() == list(range(100))
186+
187+
188+
def test_data_file_create_unknown_column(tmp_path: str):
    """DataFile.create should raise an error for a file with unknown columns."""
    ds = lance.write_dataset(pa.table({"a": range(10)}), tmp_path)

    # The new file contains column z, which the dataset schema lacks.
    fname = f"{uuid.uuid4()}.lance"
    with LanceFileWriter(f"{tmp_path}/data/{fname}") as writer:
        writer.write_batch(pa.table({"z": range(10)}))

    # The error message should name the offending column.
    with pytest.raises(Exception, match="z"):
        DataFile.create(ds, fname)

python/src/fragment.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,20 @@ impl FileFragment {
9696
Ok(PyLance(metadata))
9797
}
9898

99+
#[staticmethod]
100+
#[pyo3(signature = (dataset, path, base_id=None))]
101+
fn create_data_file(
102+
dataset: &Dataset,
103+
path: &str,
104+
base_id: Option<u32>,
105+
) -> PyResult<PyLance<DataFile>> {
106+
let ds = dataset.ds.clone();
107+
let data_file = rt()
108+
.block_on(None, ds.create_data_file(path, base_id))?
109+
.infer_error()?;
110+
Ok(PyLance(data_file))
111+
}
112+
99113
#[staticmethod]
100114
#[pyo3(signature = (dataset_uri, fragment_id, reader, **kwargs))]
101115
fn create(

rust/lance/src/dataset.rs

Lines changed: 115 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,17 @@ use lance_core::utils::tracing::{
2929
};
3030
use lance_datafusion::projection::ProjectionPlan;
3131
use lance_file::datatypes::populate_schema_dictionary;
32-
use lance_file::reader::FileReaderOptions;
32+
use lance_file::reader::{FileReader, FileReaderOptions};
3333
use lance_file::version::LanceFileVersion;
3434
use lance_index::{IndexType, progress::IndexBuildProgress};
3535
use lance_io::object_store::{
3636
LanceNamespaceStorageOptionsProvider, ObjectStore, ObjectStoreParams, StorageOptions,
3737
StorageOptionsAccessor, StorageOptionsProvider,
3838
};
39-
use lance_io::utils::{read_last_block, read_message, read_metadata_offset, read_struct};
39+
use lance_io::scheduler::{ScanScheduler, SchedulerConfig};
40+
use lance_io::utils::{
41+
CachedFileSize, read_last_block, read_message, read_metadata_offset, read_struct,
42+
};
4043
use lance_namespace::LanceNamespace;
4144
use lance_table::format::{
4245
DataFile, DataStorageFormat, DeletionFile, Fragment, IndexMetadata, Manifest, RowIdMeta, pb,
@@ -57,6 +60,7 @@ use serde::{Deserialize, Serialize};
5760
use std::borrow::Cow;
5861
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
5962
use std::fmt::Debug;
63+
use std::num::NonZero;
6064
use std::ops::Range;
6165
use std::pin::Pin;
6266
use std::sync::Arc;
@@ -1714,14 +1718,116 @@ impl Dataset {
17141718
}
17151719

17161720
/// Resolve the directory that contains `data_file`, honoring its `base_id`
/// (files with a base_id live outside the dataset's default data directory).
pub(crate) fn data_file_dir(&self, data_file: &DataFile) -> Result<Path> {
    self.data_file_dir_for_base(data_file.base_id)
}
1723+
1724+
/// Create a [`DataFile`] by reading metadata from an existing lance file.
///
/// This reads the file's schema and version information, matches columns to
/// the dataset's schema to determine field IDs, and calculates column indices.
/// This is useful for constructing `DataFile` metadata needed for operations
/// like [`Operation::DataReplacement`].
///
/// # Arguments
///
/// * `path` - The path to the data file, relative to the dataset's data directory.
/// * `base_id` - The base path ID if the file is outside the dataset directory.
pub async fn create_data_file(&self, path: &str, base_id: Option<u32>) -> Result<DataFile> {
    let data_dir = self.data_file_dir_for_base(base_id)?;
    let filepath = data_dir.child(path);

    // Get file size
    let file_size = self.object_store().size(&filepath).await?;

    // Read file metadata
    let scheduler = ScanScheduler::new(
        self.object_store.clone(),
        SchedulerConfig::new(2 * 1024 * 1024 * 1024),
    );
    let file = scheduler
        .open_file(&filepath, &CachedFileSize::new(file_size))
        .await?;
    let file_metadata = FileReader::read_all_metadata(&file).await?;

    let file_version = LanceFileVersion::try_from_major_minor(
        file_metadata.major_version as u32,
        file_metadata.minor_version as u32,
    )?;

    // Get top-level column names from file schema in file order
    let column_names: Vec<&str> = file_metadata
        .file_schema
        .fields
        .iter()
        .map(|f| f.name.as_str())
        .collect();

    // Project dataset schema by file column names to get dataset field IDs
    let projected_ds_schema = self.schema().project(&column_names)?;

    // Walk both schemas in parallel to build fields and column_indices.
    // For structural (v2.1+) files only leaf fields occupy a column, so
    // the column-index assignment below differs by version.
    let is_structural = file_version >= LanceFileVersion::V2_1;
    let ds_fields: Vec<_> = projected_ds_schema.fields_pre_order().collect();
    let file_fields: Vec<_> = file_metadata.file_schema.fields_pre_order().collect();

    if ds_fields.len() != file_fields.len() {
        return Err(Error::invalid_input(format!(
            "Schema mismatch: dataset projection has {} fields but file has {} fields",
            ds_fields.len(),
            file_fields.len()
        )));
    }

    let mut fields = Vec::new();
    let mut column_indices = Vec::new();
    let mut curr_column_idx: i32 = 0;
    // Countdown of packed-struct children still to skip; children of a
    // packed struct share the parent's column and get no entry of their own.
    let mut packed_struct_fields_num: usize = 0;

    // Both lists are pre-order traversals of the same projection, so a
    // pairwise zip lines dataset fields up with file fields.
    for (ds_field, file_field) in ds_fields.iter().zip(file_fields.iter()) {
        if ds_field.name != file_field.name {
            return Err(Error::invalid_input(format!(
                "Schema mismatch: expected field '{}' but file has '{}'",
                ds_field.name, file_field.name
            )));
        }

        if packed_struct_fields_num > 0 {
            packed_struct_fields_num -= 1;
            continue;
        }

        if file_field.is_packed_struct() {
            // A packed struct is stored as a single column; record it and
            // arrange to skip its children on the following iterations.
            fields.push(ds_field.id);
            column_indices.push(curr_column_idx);
            curr_column_idx += 1;
            packed_struct_fields_num = file_field.children.len();
        } else if file_field.children.is_empty() || !is_structural {
            // Leaf fields always get a column; non-leaf fields only do in
            // pre-structural (< v2.1) file versions.
            fields.push(ds_field.id);
            column_indices.push(curr_column_idx);
            curr_column_idx += 1;
        }
    }

    // NonZero::new yields None for a zero-byte file, leaving the size unset.
    let file_size_nz = NonZero::new(file_size);
    Ok(DataFile::new(
        path,
        fields,
        column_indices,
        file_metadata.major_version as u32,
        file_metadata.minor_version as u32,
        file_size_nz,
        base_id,
    ))
}
1822+
1823+
/// Resolve the data directory for a given base_id.
1824+
///
1825+
/// If `base_id` is `None`, returns the default data directory.
1826+
fn data_file_dir_for_base(&self, base_id: Option<u32>) -> Result<Path> {
1827+
match base_id {
17181828
Some(base_id) => {
1719-
let base_paths = &self.manifest.base_paths;
1720-
let base_path = base_paths.get(base_id).ok_or_else(|| {
1721-
Error::invalid_input(format!(
1722-
"base_path id {} not found for data_file {}",
1723-
base_id, data_file.path
1724-
))
1829+
let base_path = self.manifest.base_paths.get(&base_id).ok_or_else(|| {
1830+
Error::invalid_input(format!("base_path id {} not found", base_id))
17251831
})?;
17261832
let path = base_path.extract_path(self.session.store_registry())?;
17271833
if base_path.is_dataset_root {

0 commit comments

Comments
 (0)