diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 377ac546c3..af17873310 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -3551,7 +3551,7 @@ def merge_index_metadata( This method does NOT commit changes. This API merges temporary scalar index files (for example per-fragment - BTree or inverted index outputs). + BTree, Bitmap, or inverted index outputs). After this method returns, callers MUST explicitly commit the index manifest using lance.LanceDataset.commit(...) with a LanceOperation.CreateIndex. diff --git a/python/python/lance/indices/__init__.py b/python/python/lance/indices/__init__.py index edf9e5091f..4050bbc780 100644 --- a/python/python/lance/indices/__init__.py +++ b/python/python/lance/indices/__init__.py @@ -28,6 +28,7 @@ class IndexFileVersion(str, Enum): class SupportedDistributedIndices(str, Enum): # Scalar index types BTREE = "BTREE" + BITMAP = "BITMAP" INVERTED = "INVERTED" # Precise vector index types supported by distributed merge diff --git a/python/python/tests/test_scalar_index.py b/python/python/tests/test_scalar_index.py index 4b7cdc4b7f..127f64f576 100644 --- a/python/python/tests/test_scalar_index.py +++ b/python/python/tests/test_scalar_index.py @@ -3746,6 +3746,112 @@ def test_distribute_btree_index_build(tmp_path): ) +def test_distribute_bitmap_index_build(tmp_path): + """ + Per-fragment BITMAP shards under one index UUID, merge_index_metadata, commit, + then match a monolithic bitmap index on the same data. 
+ """ + from lance.dataset import Index + + num_fragments = 4 + rows_per_fragment = 500 + tiers_cycle = ["low", "med", "high"] + + def tier_for_row(global_row: int) -> str: + return tiers_cycle[global_row % 3] + + first = pa.table( + { + "id": pa.array(range(rows_per_fragment)), + "tier": pa.array([tier_for_row(j) for j in range(rows_per_fragment)]), + } + ) + ds = lance.write_dataset(first, tmp_path, max_rows_per_file=rows_per_fragment) + for i in range(1, num_fragments): + start = i * rows_per_fragment + frag = pa.table( + { + "id": pa.array(range(start, start + rows_per_fragment)), + "tier": pa.array( + [tier_for_row(j) for j in range(start, start + rows_per_fragment)] + ), + } + ) + ds = lance.write_dataset( + frag, tmp_path, mode="append", max_rows_per_file=rows_per_fragment + ) + + index_id = str(uuid.uuid4()) + index_name = "tier_bitmap_dist_idx" + fragments = ds.get_fragments() + fragment_ids = [f.fragment_id for f in fragments] + + for fragment in fragments: + ds.create_scalar_index( + column="tier", + index_type="BITMAP", + name=index_name, + replace=False, + index_uuid=index_id, + fragment_ids=[fragment.fragment_id], + ) + + test_tier = "med" + pre_merge = ds.scanner( + filter=f"tier = '{test_tier}'", + columns=["id", "tier"], + ).to_table() + assert pre_merge.num_rows > 0 + + ds.merge_index_metadata(index_id, index_type="BITMAP") + + field_id = ds.schema.get_field_index("tier") + index = Index( + uuid=index_id, + name=index_name, + fields=[field_id], + dataset_version=ds.version, + fragment_ids=set(fragment_ids), + index_version=0, + ) + create_index_op = lance.LanceOperation.CreateIndex( + new_indices=[index], + removed_indices=[], + ) + ds_committed = lance.LanceDataset.commit( + ds.uri, + create_index_op, + read_version=ds.version, + ) + + stats = ds_committed.stats.index_stats(index_name) + assert stats["name"] == index_name + assert stats["index_type"] == "Bitmap" + + dist_results = ds_committed.scanner( + filter=f"tier = '{test_tier}'", + 
columns=["id", "tier"], + ).to_table() + + ref_path = tmp_path / "ref_bitmap_mono" + full = ds_committed.scanner().to_table() + ref_ds = lance.write_dataset(full, ref_path) + ref_ds.create_scalar_index( + column="tier", + index_type="BITMAP", + name="ref_tier_bitmap", + ) + ref_results = ref_ds.scanner( + filter=f"tier = '{test_tier}'", + columns=["id", "tier"], + ).to_table() + + assert dist_results.num_rows == ref_results.num_rows + assert sorted(dist_results.column("id").to_pylist()) == sorted( + ref_results.column("id").to_pylist() + ) + + def test_btree_fragment_ids_parameter_validation(tmp_path): """ Test validation of fragment_ids parameter for B-tree indices. diff --git a/rust/lance-index/src/scalar/bitmap.rs b/rust/lance-index/src/scalar/bitmap.rs index dd490265e0..2c333cf6ad 100644 --- a/rust/lance-index/src/scalar/bitmap.rs +++ b/rust/lance-index/src/scalar/bitmap.rs @@ -3,6 +3,7 @@ use std::{ any::Any, + cmp, collections::{BTreeMap, HashMap}, fmt::Debug, ops::Bound, @@ -30,13 +31,14 @@ use lance_core::{ }; use roaring::RoaringBitmap; use serde::Serialize; -use tracing::instrument; +use tracing::{instrument, warn}; use super::{AnyQuery, IndexStore, ScalarIndex}; use super::{ BuiltinIndexType, SargableQuery, ScalarIndexParams, SearchResult, btree::OrderableScalarValue, }; use crate::pbold; +use crate::progress::IndexBuildProgress; use crate::{Index, IndexType, metrics::MetricsCollector}; use crate::{ frag_reuse::FragReuseIndex, @@ -50,6 +52,8 @@ use crate::{ }, }; use crate::{scalar::IndexReader, scalar::expression::ScalarQueryParser}; +use lance_io::object_store::ObjectStore; +use object_store::path::Path; pub const BITMAP_LOOKUP_NAME: &str = "bitmap_page_lookup.lance"; pub const INDEX_STATS_METADATA_KEY: &str = "lance:index_stats"; @@ -60,6 +64,11 @@ const MAX_ROWS_PER_CHUNK: usize = 2 * 1024; const BITMAP_INDEX_VERSION: u32 = 0; +/// Per-shard bitmap lookup file for distributed builds (`fragment_ids`). 
+pub fn part_bitmap_file_path(partition_id: u64) -> String { + format!("part_{partition_id}_{}", BITMAP_LOOKUP_NAME) +} + // We only need to open a file reader if we need to load a bitmap. If all // bitmaps are cached we don't open it. If we do open it we should only open it once. #[derive(Clone)] @@ -606,7 +615,13 @@ impl ScalarIndex for BitmapIndex { dest_store: &dyn IndexStore, _old_data_filter: Option, ) -> Result { - BitmapIndexPlugin::streaming_build_and_write(new_data, Some(self), dest_store).await?; + BitmapIndexPlugin::streaming_build_and_write( + new_data, + Some(self), + dest_store, + BITMAP_LOOKUP_NAME, + ) + .await?; Ok(CreatedIndex { index_details: prost_types::Any::from_msg(&pbold::BitmapIndexDetails::default()) @@ -834,7 +849,7 @@ impl BitmapIndexPlugin { data: SendableRecordBatchStream, index_store: &dyn IndexStore, ) -> Result<()> { - Self::streaming_build_and_write(data, None, index_store).await + Self::streaming_build_and_write(data, None, index_store, BITMAP_LOOKUP_NAME).await } /// Builds and writes a bitmap index in a streaming fashion from value-sorted @@ -848,6 +863,7 @@ impl BitmapIndexPlugin { mut data_source: SendableRecordBatchStream, old_index: Option<&BitmapIndex>, index_store: &dyn IndexStore, + output_lookup_file: &str, ) -> Result<()> { let value_type = data_source.schema().field(0).data_type().clone(); @@ -857,7 +873,7 @@ impl BitmapIndexPlugin { ])); let index_file = index_store - .new_index_file(BITMAP_LOOKUP_NAME, schema) + .new_index_file(output_lookup_file, schema) .await?; let mut writer = BitmapBatchWriter::new(index_file); @@ -1067,13 +1083,13 @@ impl ScalarIndexPlugin for BitmapIndexPlugin { fragment_ids: Option>, _progress: Arc, ) -> Result { - if fragment_ids.is_some() { - return Err(Error::invalid_input_source( - "Bitmap index does not support fragment training".into(), - )); - } + let output_lookup_file = fragment_ids + .as_ref() + .and_then(|ids| ids.first()) + .map(|&frag_id| part_bitmap_file_path((frag_id as u64) 
<< 32)) + .unwrap_or_else(|| BITMAP_LOOKUP_NAME.to_string()); - Self::train_bitmap_index(data, index_store).await?; + Self::streaming_build_and_write(data, None, index_store, &output_lookup_file).await?; Ok(CreatedIndex { index_details: prost_types::Any::from_msg(&pbold::BitmapIndexDetails::default()) .unwrap(), @@ -1110,6 +1126,203 @@ impl ScalarIndexPlugin for BitmapIndexPlugin { } } +/// Extract partition id from `part_{id}_bitmap_page_lookup.lance` (same pattern as btree shards). +fn extract_bitmap_shard_partition_id(filename: &str) -> Result<u64> { + if !filename.starts_with("part_") { + return Err(Error::internal(format!( + "Invalid bitmap shard file name: {filename}" + ))); + } + let parts: Vec<&str> = filename.split('_').collect(); + if parts.len() < 3 { + return Err(Error::internal(format!( + "Invalid bitmap shard file name: {filename}" + ))); + } + parts[1].parse::<u64>().map_err(|_| { + Error::internal(format!( + "Failed to parse partition id from bitmap shard file: {filename}" + )) + }) +} + +async fn list_bitmap_shard_files( + object_store: &ObjectStore, + index_dir: &Path, +) -> Result<Vec<String>> { + let mut shards = Vec::new(); + let mut list_stream = object_store.list(Some(index_dir.clone())); + while let Some(item) = list_stream.next().await { + let meta = item.map_err(|e| Error::io(format!("list bitmap index dir: {e}")))?; + let file_name = meta.location.filename().unwrap_or_default(); + if file_name.starts_with("part_") && file_name.ends_with(BITMAP_LOOKUP_NAME) { + shards.push(file_name.to_string()); + } + } + if shards.is_empty() { + return Err(Error::internal(format!( + "No bitmap shard files (part_*_{BITMAP_LOOKUP_NAME}) found in {index_dir}" + ))); + } + shards.sort(); + Ok(shards) +} + +struct BitmapShardMergeCursor { + reader: Arc<dyn IndexReader>, + next_row: usize, + num_rows: usize, + current_key: Option<OrderableScalarValue>, + current_bitmap: RowAddrTreeMap, +} + +impl BitmapShardMergeCursor { + async fn new(reader: Arc<dyn IndexReader>) -> Result<Self> { + let num_rows = reader.num_rows(); + let mut s = Self { +
reader, + next_row: 0, + num_rows, + current_key: None, + current_bitmap: RowAddrTreeMap::default(), + }; + s.advance().await?; + Ok(s) + } + + async fn advance(&mut self) -> Result<()> { + if self.next_row >= self.num_rows { + self.current_key = None; + return Ok(()); + } + let batch = self + .reader + .read_range(self.next_row..self.next_row + 1, Some(&["keys", "bitmaps"])) + .await?; + self.next_row += 1; + let keys = batch.column(0); + let key = OrderableScalarValue(ScalarValue::try_from_array(keys, 0)?); + let binary_bitmaps = batch + .column(1) + .as_any() + .downcast_ref::<BinaryArray>() + .ok_or_else(|| Error::internal("Invalid bitmap column type in shard".to_string()))?; + let bitmap_bytes = binary_bitmaps.value(0); + let bitmap = RowAddrTreeMap::deserialize_from(bitmap_bytes).unwrap(); + self.current_key = Some(key); + self.current_bitmap = bitmap; + Ok(()) + } +} + +async fn cleanup_bitmap_shard_files(store: &dyn IndexStore, shard_names: &[String]) { + for name in shard_names { + if let Err(e) = store.delete_index_file(name).await { + warn!( + target: "lance_index::scalar::bitmap", + "failed to delete bitmap shard file {name} after merge: {e}" + ); + } + } +} + +/// Merge per-fragment bitmap index shards into a single [`BITMAP_LOOKUP_NAME`] file. +/// +/// Workers write [`part_bitmap_file_path`] outputs during fragment-scoped training. +/// This performs a K-way merge on keys and unions [`RowAddrTreeMap`] for matching keys.
+pub async fn merge_index_files( + object_store: &ObjectStore, + index_dir: &Path, + store: Arc<dyn IndexStore>, + _batch_readhead: Option<usize>, + progress: Arc<dyn IndexBuildProgress>, +) -> Result<()> { + progress + .stage_start("merge_bitmap_shards", None, "shards") + .await?; + let shard_names = list_bitmap_shard_files(object_store, index_dir).await?; + for name in &shard_names { + extract_bitmap_shard_partition_id(name)?; + } + + let mut readers: Vec<Arc<dyn IndexReader>> = Vec::with_capacity(shard_names.len()); + for name in &shard_names { + readers.push(store.open_index_file(name).await?); + } + + let value_type = readers[0].schema().fields[0].data_type().clone(); + for r in readers.iter().skip(1) { + if r.schema().fields[0].data_type() != value_type { + return Err(Error::invalid_input(format!( + "Bitmap shard schema mismatch across shards (expected {value_type:?})" + ))); + } + } + + let mut cursors = Vec::with_capacity(readers.len()); + for reader in readers { + cursors.push(BitmapShardMergeCursor::new(reader).await?); + } + progress.stage_complete("merge_bitmap_shards").await?; + + progress + .stage_start("write_merged_bitmap", None, "keys") + .await?; + + let schema = Arc::new(Schema::new(vec![ + Field::new("keys", value_type.clone(), true), + Field::new("bitmaps", DataType::Binary, true), + ])); + let mut writer = + BitmapBatchWriter::new(store.new_index_file(BITMAP_LOOKUP_NAME, schema).await?); + + let mut keys_emitted: u64 = 0; + loop { + let mut min_key: Option<OrderableScalarValue> = None; + for c in &cursors { + if let Some(k) = &c.current_key { + min_key = Some(match &min_key { + None => k.clone(), + Some(m) => cmp::min(k.clone(), m.clone()), + }); + } + } + let Some(min_key) = min_key else { + break; + }; + + let mut merged = RowAddrTreeMap::default(); + for c in &mut cursors { + if let Some(k) = &c.current_key + && *k == min_key + { + merged |= &c.current_bitmap; + } + } + + writer.emit(min_key.0.clone(), &merged).await?; + + for c in &mut cursors { + if let Some(k) = &c.current_key + && *k == min_key + { + c.advance().await?; + }
+ } + keys_emitted += 1; + progress + .stage_progress("write_merged_bitmap", keys_emitted) + .await?; + } + + writer.finish().await?; + progress.stage_complete("write_merged_bitmap").await?; + + cleanup_bitmap_shard_files(store.as_ref(), &shard_names).await; + + Ok(()) +} + #[cfg(test)] mod tests { use super::*; @@ -1814,4 +2027,118 @@ mod tests { _ => panic!("Expected Exact search result"), } } + + #[tokio::test] + async fn test_bitmap_merge_distributed_shards() { + use crate::progress::noop_progress; + + let tmpdir = TempObjDir::default(); + let os = Arc::new(ObjectStore::local()); + let store = Arc::new(LanceIndexStore::new( + os.clone(), + tmpdir.clone(), + Arc::new(LanceCache::no_cache()), + )); + + let schema = Arc::new(Schema::new(vec![ + Field::new("value", DataType::Utf8, false), + Field::new("_rowid", DataType::UInt64, false), + ])); + + let batch0 = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(StringArray::from(vec!["a", "a", "b"])), + Arc::new(UInt64Array::from(vec![0_u64, 1, 2])), + ], + ) + .unwrap(); + let batch0 = sort_batch_by_value(&batch0); + let s0 = schema.clone(); + let stream0 = Box::pin(RecordBatchStreamAdapter::new( + s0, + stream::once(async move { Ok(batch0) }), + )); + BitmapIndexPlugin::streaming_build_and_write( + stream0, + None, + store.as_ref(), + &part_bitmap_file_path(0), + ) + .await + .unwrap(); + + let batch1 = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(StringArray::from(vec!["a", "c"])), + Arc::new(UInt64Array::from(vec![100_u64, 101])), + ], + ) + .unwrap(); + let batch1 = sort_batch_by_value(&batch1); + let stream1 = Box::pin(RecordBatchStreamAdapter::new( + schema, + stream::once(async move { Ok(batch1) }), + )); + BitmapIndexPlugin::streaming_build_and_write( + stream1, + None, + store.as_ref(), + &part_bitmap_file_path(1_u64 << 32), + ) + .await + .unwrap(); + + merge_index_files(&os, &tmpdir, store.clone(), None, noop_progress()) + .await + .unwrap(); + + let index = BitmapIndex::load(store, 
None, &LanceCache::no_cache()) + .await + .unwrap(); + + let query = SargableQuery::Equals(ScalarValue::Utf8(Some("a".to_string()))); + let result = index.search(&query, &NoOpMetricsCollector).await.unwrap(); + if let SearchResult::Exact(row_ids) = result { + let mut actual: Vec<u64> = row_ids + .true_rows() + .row_addrs() + .unwrap() + .map(u64::from) + .collect(); + actual.sort(); + assert_eq!(actual, vec![0, 1, 100]); + } else { + panic!("expected exact search result for key a"); + } + + let query = SargableQuery::Equals(ScalarValue::Utf8(Some("b".to_string()))); + let result = index.search(&query, &NoOpMetricsCollector).await.unwrap(); + if let SearchResult::Exact(row_ids) = result { + let actual: Vec<u64> = row_ids + .true_rows() + .row_addrs() + .unwrap() + .map(u64::from) + .collect(); + assert_eq!(actual, vec![2]); + } else { + panic!("expected exact search result for key b"); + } + + let query = SargableQuery::Equals(ScalarValue::Utf8(Some("c".to_string()))); + let result = index.search(&query, &NoOpMetricsCollector).await.unwrap(); + if let SearchResult::Exact(row_ids) = result { + let actual: Vec<u64> = row_ids + .true_rows() + .row_addrs() + .unwrap() + .map(u64::from) + .collect(); + assert_eq!(actual, vec![101]); + } else { + panic!("expected exact search result for key c"); + } + } } diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 8ef2c79a67..1f15ca0d19 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -2792,6 +2792,16 @@ impl Dataset { ) .await } + IndexType::Bitmap => { + lance_index::scalar::bitmap::merge_index_files( + self.object_store(), + &index_dir, + Arc::new(store), + batch_readhead, + progress, + ) + .await + } IndexType::IvfFlat | IndexType::IvfPq | IndexType::IvfSq | IndexType::Vector => { Err(Error::invalid_input( "Vector distributed indexing no longer supports merge_index_metadata; \