diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 377ac546c3..a9e2d16a6e 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -700,6 +700,7 @@ def scanner( offset: Optional[int] = None, nearest: Optional[dict] = None, batch_size: Optional[int] = None, + batch_size_bytes: Optional[int] = None, batch_readahead: Optional[int] = None, fragment_readahead: Optional[int] = None, scan_in_order: Optional[bool] = None, @@ -795,9 +796,16 @@ def scanner( } batch_size: int, default None - The target size of batches returned. In some cases batches can be up to - twice this size (but never larger than this). In some cases batches can - be smaller than this size. + The maximum number of rows per batch. In some cases batches can be + smaller than this size. Note: this can be overridden by + ``batch_size_bytes`` or by a dataset-level ``batch_size_bytes`` + configured via ``FileReaderOptions``. + batch_size_bytes: int, default None + If set, the scanner will produce batches whose total size in bytes + is approximately this value, overriding the row-based ``batch_size``. + This can also be configured at the dataset level via + ``FileReaderOptions``. A scanner-level setting takes precedence + over the dataset-level default. io_buffer_size: int, default None The size of the IO buffer. See ``ScannerBuilder.io_buffer_size`` for more information. 
@@ -933,6 +941,7 @@ def setopt(opt, val): setopt(builder.limit, limit) setopt(builder.offset, offset) setopt(builder.batch_size, batch_size) + setopt(builder.batch_size_bytes, batch_size_bytes) setopt(builder.io_buffer_size, io_buffer_size) setopt(builder.batch_readahead, batch_readahead) setopt(builder.fragment_readahead, fragment_readahead) @@ -1016,6 +1025,7 @@ def to_table( offset: Optional[int] = None, nearest: Optional[dict] = None, batch_size: Optional[int] = None, + batch_size_bytes: Optional[int] = None, batch_readahead: Optional[int] = None, fragment_readahead: Optional[int] = None, scan_in_order: Optional[bool] = None, @@ -1143,6 +1153,7 @@ def to_table( offset=offset, nearest=nearest, batch_size=batch_size, + batch_size_bytes=batch_size_bytes, io_buffer_size=io_buffer_size, batch_readahead=batch_readahead, fragment_readahead=fragment_readahead, @@ -1519,6 +1530,7 @@ def to_batches( offset: Optional[int] = None, nearest: Optional[dict] = None, batch_size: Optional[int] = None, + batch_size_bytes: Optional[int] = None, batch_readahead: Optional[int] = None, fragment_readahead: Optional[int] = None, scan_in_order: Optional[bool] = None, @@ -1555,6 +1567,7 @@ def to_batches( offset=offset, nearest=nearest, batch_size=batch_size, + batch_size_bytes=batch_size_bytes, io_buffer_size=io_buffer_size, batch_readahead=batch_readahead, fragment_readahead=fragment_readahead, @@ -4984,6 +4997,7 @@ def __init__(self, ds: LanceDataset): self._columns_with_transform = None self._nearest = None self._batch_size: Optional[int] = None + self._batch_size_bytes: Optional[int] = None self._io_buffer_size: Optional[int] = None self._batch_readahead: Optional[int] = None self._fragment_readahead: Optional[int] = None @@ -5014,10 +5028,28 @@ def apply_defaults(self, default_opts: Dict[str, Any]) -> ScannerBuilder: return self def batch_size(self, batch_size: int) -> ScannerBuilder: - """Set batch size for Scanner""" + """Set the maximum number of rows per batch. 
+ + Note: this can be overridden by ``batch_size_bytes`` or by a + dataset-level ``batch_size_bytes`` configured via + ``FileReaderOptions``. + """ self._batch_size = batch_size return self + def batch_size_bytes(self, batch_size_bytes: int) -> ScannerBuilder: + """Set the target batch size in bytes. + + When set, the scanner will produce batches whose total size in bytes + is approximately this value, overriding the row-based ``batch_size``. + + This can also be configured at the dataset level via + ``FileReaderOptions``. A scanner-level setting takes precedence + over the dataset-level default. + """ + self._batch_size_bytes = batch_size_bytes + return self + def io_buffer_size(self, io_buffer_size: int) -> ScannerBuilder: """ Set the I/O buffer size for the Scanner @@ -5402,6 +5434,7 @@ def to_scanner(self) -> LanceScanner: self._offset, self._nearest, self._batch_size, + self._batch_size_bytes, self._io_buffer_size, self._batch_readahead, self._fragment_readahead, diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 8838b89bc0..22c77c35aa 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -800,7 +800,7 @@ impl Dataset { } #[allow(clippy::too_many_arguments)] - #[pyo3(signature=(columns=None, columns_with_transform=None, filter=None, search_filter=None, prefilter=None, limit=None, offset=None, nearest=None, batch_size=None, io_buffer_size=None, batch_readahead=None, fragment_readahead=None, scan_in_order=None, fragments=None, with_row_id=None, with_row_address=None, use_stats=None, substrait_filter=None, fast_search=None, full_text_query=None, late_materialization=None, blob_handling=None, use_scalar_index=None, include_deleted_rows=None, scan_stats_callback=None, strict_batch_size=None, order_by=None, disable_scoring_autoprojection=None, substrait_aggregate=None))] + #[pyo3(signature=(columns=None, columns_with_transform=None, filter=None, search_filter=None, prefilter=None, limit=None, offset=None, nearest=None, batch_size=None, 
batch_size_bytes=None, io_buffer_size=None, batch_readahead=None, fragment_readahead=None, scan_in_order=None, fragments=None, with_row_id=None, with_row_address=None, use_stats=None, substrait_filter=None, fast_search=None, full_text_query=None, late_materialization=None, blob_handling=None, use_scalar_index=None, include_deleted_rows=None, scan_stats_callback=None, strict_batch_size=None, order_by=None, disable_scoring_autoprojection=None, substrait_aggregate=None))] fn scanner( self_: PyRef<'_, Self>, columns: Option>, @@ -812,6 +812,7 @@ impl Dataset { offset: Option, nearest: Option<&Bound>, batch_size: Option, + batch_size_bytes: Option, io_buffer_size: Option, batch_readahead: Option, fragment_readahead: Option, @@ -956,6 +957,9 @@ impl Dataset { if let Some(batch_size) = batch_size { scanner.batch_size(batch_size); } + if let Some(batch_size_bytes) = batch_size_bytes { + scanner.batch_size_bytes(batch_size_bytes); + } if let Some(io_buffer_size) = io_buffer_size { scanner.io_buffer_size(io_buffer_size); } diff --git a/rust/lance-file/src/reader.rs b/rust/lance-file/src/reader.rs index d41820f75a..72734491d6 100644 --- a/rust/lance-file/src/reader.rs +++ b/rust/lance-file/src/reader.rs @@ -342,6 +342,13 @@ pub struct FileReaderOptions { /// will be read in multiple chunks to control memory usage. /// Default: 8MB (DEFAULT_READ_CHUNK_SIZE) pub read_chunk_size: u64, + /// If set, the reader will produce batches whose total size in bytes + /// is approximately this value, overriding the row-based `batch_size`. + /// + /// This can be set at the dataset level (via `ReadParams::file_reader_options`) + /// to provide a default for all scans, or at the scanner level (via + /// `Scanner::batch_size_bytes`) to override per scan. 
+ pub batch_size_bytes: Option, } impl Default for FileReaderOptions { @@ -349,6 +356,7 @@ impl Default for FileReaderOptions { Self { decoder_config: DecoderConfig::default(), read_chunk_size: DEFAULT_READ_CHUNK_SIZE, + batch_size_bytes: None, } } } @@ -871,6 +879,7 @@ impl FileReader { projection: ReaderProjection, filter: FilterExpression, decoder_config: DecoderConfig, + batch_size_bytes: Option, ) -> Result> { debug!( "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns", @@ -887,7 +896,7 @@ impl FileReader { decoder_plugins, io, decoder_config, - batch_size_bytes: None, + batch_size_bytes, }; let requested_rows = RequestedRows::Ranges(vec![range]); @@ -921,6 +930,7 @@ impl FileReader { projection, filter, self.options.decoder_config.clone(), + self.options.batch_size_bytes, ) } @@ -935,6 +945,7 @@ impl FileReader { projection: ReaderProjection, filter: FilterExpression, decoder_config: DecoderConfig, + batch_size_bytes: Option, ) -> Result> { debug!( "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}", @@ -951,7 +962,7 @@ impl FileReader { decoder_plugins, io, decoder_config, - batch_size_bytes: None, + batch_size_bytes, }; let requested_rows = RequestedRows::Indices(indices); @@ -983,6 +994,7 @@ impl FileReader { projection, FilterExpression::no_filter(), self.options.decoder_config.clone(), + self.options.batch_size_bytes, ) } @@ -997,6 +1009,7 @@ impl FileReader { projection: ReaderProjection, filter: FilterExpression, decoder_config: DecoderConfig, + batch_size_bytes: Option, ) -> Result> { let num_rows = ranges.iter().map(|r| r.end - r.start).sum::(); debug!( @@ -1015,7 +1028,7 @@ impl FileReader { decoder_plugins, io, decoder_config, - batch_size_bytes: None, + batch_size_bytes, }; let requested_rows = RequestedRows::Ranges(ranges); @@ -1047,6 +1060,7 @@ impl FileReader { projection, filter, self.options.decoder_config.clone(), + self.options.batch_size_bytes, ) } @@ 
-1194,7 +1208,7 @@ impl FileReader { decoder_plugins: self.decoder_plugins.clone(), io: self.scheduler.clone(), decoder_config: self.options.decoder_config.clone(), - batch_size_bytes: None, + batch_size_bytes: self.options.batch_size_bytes, }; let requested_rows = RequestedRows::Indices(indices); @@ -1234,7 +1248,7 @@ impl FileReader { decoder_plugins: self.decoder_plugins.clone(), io: self.scheduler.clone(), decoder_config: self.options.decoder_config.clone(), - batch_size_bytes: None, + batch_size_bytes: self.options.batch_size_bytes, }; let requested_rows = RequestedRows::Ranges(ranges); @@ -1274,7 +1288,7 @@ impl FileReader { decoder_plugins: self.decoder_plugins.clone(), io: self.scheduler.clone(), decoder_config: self.options.decoder_config.clone(), - batch_size_bytes: None, + batch_size_bytes: self.options.batch_size_bytes, }; let requested_rows = RequestedRows::Ranges(vec![range]); diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index c5e5d24e65..b45aee6b81 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -257,6 +257,9 @@ pub struct ReadParams { /// File reader options to use when reading data files. /// /// This allows control over features like caching repetition indices and validation. + /// Options set here act as dataset-level defaults and can be overridden on a + /// per-scan basis via [`Scanner::batch_size_bytes`](crate::dataset::scanner::Scanner::batch_size_bytes) or + /// [`Scanner::with_file_reader_options`](crate::dataset::scanner::Scanner::with_file_reader_options). pub file_reader_options: Option, } diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index b7f5094ac3..5f40c79b4f 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -719,6 +719,10 @@ pub struct Scanner { /// The batch size controls the maximum size of rows to return for each read. 
batch_size: Option, + /// If set, the scanner will produce batches whose total size in bytes + /// is approximately this value, overriding the row-based `batch_size`. + batch_size_bytes: Option, + /// Number of batches to prefetch batch_readahead: usize, @@ -989,6 +993,7 @@ impl Scanner { filter: LanceFilter::default(), full_text_query: None, batch_size: None, + batch_size_bytes: None, batch_readahead: get_num_compute_intensive_cpus(), fragment_readahead: None, io_buffer_size: None, @@ -1261,12 +1266,29 @@ impl Scanner { Ok(self) } - /// Set the batch size. + /// Set the maximum number of rows per batch. + /// + /// Note: this can be overridden by [`Self::batch_size_bytes`] or by a dataset-level + /// `batch_size_bytes` set via [`ReadParams::file_reader_options`](crate::dataset::ReadParams::file_reader_options). When a byte-based + /// batch size is active, the row-based batch size is used only as an initial estimate. pub fn batch_size(&mut self, batch_size: usize) -> &mut Self { self.batch_size = Some(batch_size); self } + /// Set the target batch size in bytes. + /// + /// When set, the scanner will produce batches whose total size in bytes + /// is approximately this value, overriding the row-based `batch_size`. + /// + /// This can also be configured at the dataset level via + /// [`ReadParams::file_reader_options`](crate::dataset::ReadParams::file_reader_options). A scanner-level setting takes + /// precedence over the dataset-level default. + pub fn batch_size_bytes(&mut self, batch_size_bytes: u64) -> &mut Self { + self.batch_size_bytes = Some(batch_size_bytes); + self + } + /// Include deleted rows /// /// These are rows that have been deleted from the dataset but are still present in the @@ -1688,6 +1710,30 @@ impl Scanner { self } + /// Compute the resolved file reader options, merging the scanner's explicit + /// `file_reader_options`, the dataset-level defaults, and the `batch_size_bytes` + /// setting. 
+ fn resolved_file_reader_options(&self) -> Option<FileReaderOptions> { let base = self .file_reader_options .clone() .or_else(|| self.dataset.file_reader_options.clone()); match (base, self.batch_size_bytes) { (Some(mut opts), Some(bsb)) => { opts.batch_size_bytes = Some(bsb); Some(opts) } (Some(opts), None) => Some(opts), (None, Some(bsb)) => Some(FileReaderOptions { batch_size_bytes: Some(bsb), ..Default::default() }), (None, None) => None, } } + /// Create a physical expression for a column that may be nested fn create_column_expr( column_name: &str, @@ -2658,6 +2704,10 @@ impl Scanner { read_options = read_options.with_batch_size(batch_size as u32); } + if let Some(file_reader_options) = self.resolved_file_reader_options() { read_options = read_options.with_file_reader_options(file_reader_options); } + if let Some(fragment_readahead) = self.fragment_readahead { read_options = read_options.with_fragment_readahead(fragment_readahead); } @@ -4003,6 +4053,7 @@ impl Scanner { with_row_created_at_version, with_make_deletions_null, ordered_output: ordered, + file_reader_options: self.resolved_file_reader_options(), }; Arc::new(LanceScanExec::new( self.dataset.clone(), @@ -4029,10 +4080,7 @@ impl Scanner { with_row_address: self.projection_plan.physical_projection.with_row_addr, make_deletions_null, ordered_output: self.ordered, - file_reader_options: self - .file_reader_options - .clone() - .or_else(|| self.dataset.file_reader_options.clone()), + file_reader_options: self.resolved_file_reader_options(), }; let fragments = if let Some(fragment) = self.fragments.as_ref() { diff --git a/rust/lance/src/io/exec/filtered_read.rs b/rust/lance/src/io/exec/filtered_read.rs index e3837f9ce4..2360c1a221 100644 --- a/rust/lance/src/io/exec/filtered_read.rs +++ b/rust/lance/src/io/exec/filtered_read.rs @@ -42,6 +42,7 @@ use lance_datafusion::utils::{ ExecutionPlanMetricsSetExt, FRAGMENTS_SCANNED_METRIC,
RANGES_SCANNED_METRIC, ROWS_SCANNED_METRIC, TASK_WAIT_TIME_METRIC, }; +use lance_file::reader::FileReaderOptions; use lance_index::scalar::expression::{FilterPlan, IndexExprResult}; use lance_io::scheduler::{ScanScheduler, SchedulerConfig}; use lance_table::format::Fragment; @@ -118,6 +119,7 @@ struct ScopedFragmentRead { projection: Arc, with_deleted_rows: bool, batch_size: u32, + file_reader_options: Option, // An in-memory filter to apply after reading the fragment (whatever couldn't be // pushed down into the index query) filter: Option, @@ -127,13 +129,17 @@ struct ScopedFragmentRead { impl ScopedFragmentRead { fn frag_read_config(&self) -> FragReadConfig { - FragReadConfig::default() + let mut config = FragReadConfig::default() .with_row_id(self.with_deleted_rows || self.projection.with_row_id) .with_row_address(self.projection.with_row_addr) .with_row_last_updated_at_version(self.projection.with_row_last_updated_at_version) .with_row_created_at_version(self.projection.with_row_created_at_version) .with_scan_scheduler(self.scan_scheduler.clone()) - .with_reader_priority(self.priority) + .with_reader_priority(self.priority); + if let Some(file_reader_options) = &self.file_reader_options { + config = config.with_file_reader_options(file_reader_options.clone()); + } + config } } @@ -669,6 +675,7 @@ impl FilteredReadStream { projection: projection.clone(), with_deleted_rows: options.with_deleted_rows, batch_size: default_batch_size, + file_reader_options: options.file_reader_options.clone(), filter, priority: priority as u32, scan_scheduler: scan_scheduler.clone(), @@ -1243,6 +1250,8 @@ pub struct FilteredReadOptions { pub with_deleted_rows: bool, /// The maximum number of rows per batch pub batch_size: Option, + /// File reader options to use when reading data files. 
+ pub file_reader_options: Option, /// Controls how many fragments to read ahead pub fragment_readahead: Option, /// The fragments to read @@ -1281,6 +1290,7 @@ impl FilteredReadOptions { scan_range_after_filter: None, with_deleted_rows: false, batch_size: None, + file_reader_options: None, fragment_readahead: None, fragments: None, projection, @@ -1369,6 +1379,12 @@ impl FilteredReadOptions { self } + /// Specify the file reader options to use when reading data files. + pub fn with_file_reader_options(mut self, file_reader_options: FileReaderOptions) -> Self { + self.file_reader_options = Some(file_reader_options); + self + } + /// Controls how many fragments to read ahead. /// /// If not set, the default will be 2 * the I/O parallelism. Generally, reading ahead diff --git a/rust/lance/src/io/exec/scan.rs b/rust/lance/src/io/exec/scan.rs index 030016c9d7..c7736eccfb 100644 --- a/rust/lance/src/io/exec/scan.rs +++ b/rust/lance/src/io/exec/scan.rs @@ -27,6 +27,7 @@ use lance_arrow::SchemaExt; use lance_core::utils::tokio::get_num_compute_intensive_cpus; use lance_core::utils::tracing::StreamTracingExt; use lance_core::{Error, ROW_ADDR_FIELD, ROW_ID_FIELD}; +use lance_file::reader::FileReaderOptions; use lance_io::scheduler::{ScanScheduler, SchedulerConfig}; use lance_table::format::Fragment; use log::debug; @@ -274,25 +275,31 @@ impl LanceStream { let scan_scheduler_clone = scan_scheduler.clone(); + let config_for_stream = config.clone(); let batches = stream::iter(file_fragments.into_iter().enumerate()) .map(move |(priority, file_fragment)| { let project_schema = project_schema.clone(); let scan_scheduler = scan_scheduler.clone(); + let config = config_for_stream.clone(); #[allow(clippy::type_complexity)] let frag_task: BoxFuture< Result>>>>, > = tokio::spawn( (async move { + let mut frag_config = FragReadConfig::default() + .with_row_id(config.with_row_id) + .with_row_address(config.with_row_address) + .with_row_last_updated_at_version( + 
config.with_row_last_updated_at_version, + ) + .with_row_created_at_version(config.with_row_created_at_version); + if let Some(file_reader_options) = config.file_reader_options { + frag_config = frag_config.with_file_reader_options(file_reader_options); + } let reader = open_file( file_fragment.fragment, project_schema, - FragReadConfig::default() - .with_row_id(config.with_row_id) - .with_row_address(config.with_row_address) - .with_row_last_updated_at_version( - config.with_row_last_updated_at_version, - ) - .with_row_created_at_version(config.with_row_created_at_version), + frag_config, config.with_make_deletions_null, Some((scan_scheduler, priority as u32)), ) @@ -499,6 +506,7 @@ pub struct LanceScanConfig { pub with_row_created_at_version: bool, pub with_make_deletions_null: bool, pub ordered_output: bool, + pub file_reader_options: Option, } // This is mostly for testing purposes, end users are unlikely to create this @@ -516,6 +524,7 @@ impl Default for LanceScanConfig { with_row_created_at_version: false, with_make_deletions_null: false, ordered_output: false, + file_reader_options: None, } } }