Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 65 additions & 9 deletions src/mito2/src/sst/index/bloom_filter/applier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::time::Instant;

use common_base::range_read::RangeReader;
use common_telemetry::{tracing, warn};
use datatypes::data_type::ConcreteDataType;
use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
use index::bloom_filter::reader::{
BloomFilterReadMetrics, BloomFilterReader, BloomFilterReaderImpl,
Expand All @@ -30,6 +31,7 @@ use object_store::ObjectStore;
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
use puffin::puffin_manager::{PuffinManager, PuffinReader};
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::region_request::PathType;
use store_api::storage::ColumnId;

Expand Down Expand Up @@ -133,10 +135,19 @@ pub struct BloomFilterIndexApplier {

/// Bloom filter predicates.
/// For each column, the value will be retained only if it contains __all__ predicates.
predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>,
predicates: BTreeMap<ColumnId, Vec<InListPredicate>>,

/// Predicate key. Used to identify the predicate and fetch result from cache.
predicate_key: PredicateKey,
/// Default apply plan built from all collected predicates.
default_plan: SstApplyPlan,

/// Expected predicate column types from the latest region metadata.
expected_predicate_col_types: BTreeMap<ColumnId, ConcreteDataType>,
}

/// Bloom filter apply plan: a set of per-column predicates bundled with the
/// predicate key that identifies them.
///
/// `BloomFilterIndexApplier::plan_for_sst` returns one of these for an SST, and
/// `BloomFilterIndexApplier::build_apply_plan` constructs them.
#[derive(Clone)]
pub(crate) struct SstApplyPlan {
/// Key identifying `predicates`; callers use it to look up and store index
/// results in the index result cache. When built by `build_apply_plan` it is
/// derived from the same predicate map via `PredicateKey::new_bloom`.
pub predicate_key: PredicateKey,
/// In-list predicates keyed by column id, shared behind an `Arc` so the plan
/// and its key can reference the same map.
pub predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>,
}

impl BloomFilterIndexApplier {
Expand All @@ -149,8 +160,9 @@ impl BloomFilterIndexApplier {
object_store: ObjectStore,
puffin_manager_factory: PuffinManagerFactory,
predicates: BTreeMap<ColumnId, Vec<InListPredicate>>,
expected_predicate_col_types: BTreeMap<ColumnId, ConcreteDataType>,
) -> Self {
let predicates = Arc::new(predicates);
let default_plan = Self::build_apply_plan(predicates.clone());
Self {
table_dir,
path_type,
Expand All @@ -159,8 +171,9 @@ impl BloomFilterIndexApplier {
puffin_manager_factory,
puffin_metadata_cache: None,
bloom_filter_index_cache: None,
predicate_key: PredicateKey::new_bloom(predicates.clone()),
default_plan,
predicates,
expected_predicate_col_types,
}
}

Expand Down Expand Up @@ -207,6 +220,7 @@ impl BloomFilterIndexApplier {
&self,
file_id: RegionIndexId,
file_size_hint: Option<u64>,
predicates: &BTreeMap<ColumnId, Vec<InListPredicate>>,
row_groups: impl Iterator<Item = (usize, bool)>,
mut metrics: Option<&mut BloomFilterIndexApplyMetrics>,
) -> Result<Vec<(usize, Vec<Range<usize>>)>> {
Expand All @@ -230,7 +244,7 @@ impl BloomFilterIndexApplier {
.map(|(i, range)| (*i, vec![range.clone()]))
.collect::<Vec<_>>();

for (column_id, predicates) in self.predicates.iter() {
for (column_id, predicates) in predicates {
let blob = match self
.blob_reader(file_id, *column_id, file_size_hint, metrics.as_deref_mut())
.await?
Expand Down Expand Up @@ -439,8 +453,43 @@ impl BloomFilterIndexApplier {
}

/// Returns the predicate key.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

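The doc comment above (`/// Returns the predicate key.`) is outdated: it now sits on `plan_for_sst`, which returns an `Option<SstApplyPlan>` rather than the predicate key.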

pub fn predicate_key(&self) -> &PredicateKey {
&self.predicate_key
pub fn plan_for_sst(&self, sst_metadata: &RegionMetadataRef) -> Option<SstApplyPlan> {
let mut compatible_predicates = BTreeMap::new();
let mut has_type_mismatch = false;

for (col_id, expected) in &self.expected_predicate_col_types {
if let Some(sst_col) = sst_metadata.column_by_id(*col_id)
&& sst_col.column_schema.data_type != *expected
{
has_type_mismatch = true;
continue;
}
Comment thread
fengys1996 marked this conversation as resolved.

if let Some(predicates) = self.predicates.get(col_id) {
compatible_predicates.insert(*col_id, predicates.clone());
}
}

if compatible_predicates.is_empty() {
return None;
}
Comment on lines +466 to +468
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the inverted index applier we track a `has_type_mismatch` flag and reuse the default plan when no mismatch is found. Should we follow the same pattern here?

if !has_type_mismatch {
return Ok(Some(self.default_plan.clone()));
}


if !has_type_mismatch {
return Some(self.default_plan.clone());
}

Some(Self::build_apply_plan(compatible_predicates))
}

/// Builds an [`SstApplyPlan`] from per-column predicates.
///
/// The predicate map is moved behind an `Arc` once; the predicate key is
/// created from a handle to that same allocation, so the plan and its key
/// share one map.
fn build_apply_plan(
    predicates_by_col: BTreeMap<ColumnId, Vec<InListPredicate>>,
) -> SstApplyPlan {
    let shared = Arc::new(predicates_by_col);
    SstApplyPlan {
        // Key first: it only needs another handle to the shared map.
        predicate_key: PredicateKey::new_bloom(Arc::clone(&shared)),
        predicates: shared,
    }
}
}

Expand Down Expand Up @@ -496,8 +545,15 @@ mod tests {
);

let applier = builder.build(&exprs).unwrap().unwrap();
let plan = applier.plan_for_sst(&Arc::new(metadata.clone())).unwrap();
applier
.apply(file_id, None, row_groups.into_iter(), None)
.apply(
file_id,
None,
&plan.predicates,
row_groups.into_iter(),
None,
)
.await
.unwrap()
.into_iter()
Expand Down
13 changes: 13 additions & 0 deletions src/mito2/src/sst/index/bloom_filter/applier/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,14 @@ impl<'a> BloomFilterIndexApplierBuilder<'a> {
return Ok(None);
}

let expected_predicate_column_types = self.expected_predicate_column_types();
let applier = BloomFilterIndexApplier::new(
self.table_dir,
self.path_type,
self.object_store,
self.puffin_manager_factory,
self.predicates,
expected_predicate_column_types,
)
.with_file_cache(self.file_cache)
.with_puffin_metadata_cache(self.puffin_metadata_cache)
Expand Down Expand Up @@ -137,6 +139,17 @@ impl<'a> BloomFilterIndexApplierBuilder<'a> {
}
}

/// Collects the data type of every predicate column, keyed by column id.
///
/// Types are read from `self.metadata`; a predicate column that the metadata
/// does not contain is left out of the returned map.
fn expected_predicate_column_types(&self) -> BTreeMap<ColumnId, ConcreteDataType> {
    let mut types_by_col = BTreeMap::new();
    for column_id in self.predicates.keys() {
        if let Some(column) = self.metadata.column_by_id(*column_id) {
            types_by_col.insert(*column_id, column.column_schema.data_type.clone());
        }
    }
    types_by_col
}

/// Helper function to get the column id and type
fn column_id_and_type(
&self,
Expand Down
17 changes: 11 additions & 6 deletions src/mito2/src/sst/parquet/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,7 @@ impl ParquetReaderBuilder {
}

self.prune_row_groups_by_bloom_filter(
read_format.metadata(),
row_group_size,
parquet_meta,
&mut output,
Expand Down Expand Up @@ -888,6 +889,7 @@ impl ParquetReaderBuilder {

async fn prune_row_groups_by_bloom_filter(
&self,
sst_metadata: &RegionMetadataRef,
row_group_size: usize,
parquet_meta: &ParquetMetaData,
output: &mut RowGroupSelection,
Expand All @@ -906,12 +908,14 @@ impl ParquetReaderBuilder {
&self.bloom_filter_index_appliers[..]
};
for index_applier in appliers.iter().flatten() {
let predicate_key = index_applier.predicate_key();
let Some(plan) = index_applier.plan_for_sst(sst_metadata) else {
continue;
};
// Fast path: return early if the result is in the cache.
let cached = self
.cache_strategy
.index_result_cache()
.and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
let cached = self.cache_strategy.index_result_cache().and_then(|cache| {
let file_id = self.file_handle.file_id().file_id();
cache.get(&plan.predicate_key, file_id)
});
if let Some(result) = cached.as_ref()
&& all_required_row_groups_searched(output, result)
{
Expand Down Expand Up @@ -939,6 +943,7 @@ impl ParquetReaderBuilder {
.apply(
self.file_handle.index_id(),
Some(file_size_hint),
&plan.predicates,
rgs,
metrics.bloom_filter_apply_metrics.as_mut(),
)
Expand All @@ -959,7 +964,7 @@ impl ParquetReaderBuilder {
}

self.apply_index_result_and_update_cache(
predicate_key,
&plan.predicate_key,
self.file_handle.file_id().file_id(),
selection,
output,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
-- Regression test for skip index with column type change.
CREATE TABLE monitoring_data_skip (
host STRING SKIPPING INDEX,
`region` STRING,
cpu_usage DOUBLE SKIPPING INDEX,
`timestamp` TIMESTAMP TIME INDEX
) WITH ('append_mode'='true');

Affected Rows: 0

INSERT INTO monitoring_data_skip (host, region, cpu_usage, `timestamp`) VALUES
('web-01', 'us-east', 12.5, '2026-05-06 10:00:00'),
('web-01', 'us-east', 15.2, '2026-05-06 10:01:00'),
('web-02', 'us-east', 23.7, '2026-05-06 10:01:00'),
('db-01', 'us-east', 45.0, '2026-05-06 10:02:00'),
('db-02', 'us-west', 82.2, '2026-05-06 10:02:00'),
('cache-01', 'eu-central', 55.4, '2026-05-06 10:02:00'),
('queue-01', 'ap-south', 99.1, '2026-05-06 10:02:00');

Affected Rows: 7

ADMIN FLUSH_TABLE('monitoring_data_skip');

+-------------------------------------------+
| ADMIN FLUSH_TABLE('monitoring_data_skip') |
+-------------------------------------------+
| 0 |
+-------------------------------------------+

ALTER TABLE monitoring_data_skip
MODIFY COLUMN cpu_usage STRING;

Affected Rows: 0

SELECT host, region, cpu_usage, `timestamp` FROM monitoring_data_skip
WHERE cpu_usage = '23.7';

+--------+---------+-----------+---------------------+
| host | region | cpu_usage | timestamp |
+--------+---------+-----------+---------------------+
| web-02 | us-east | 23.7 | 2026-05-06 10:01:00 |
+--------+---------+-----------+---------------------+

DROP TABLE monitoring_data_skip;

Affected Rows: 0
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
-- Regression test for skip index with column type change.
CREATE TABLE monitoring_data_skip (
host STRING SKIPPING INDEX,
`region` STRING,
cpu_usage DOUBLE SKIPPING INDEX,
`timestamp` TIMESTAMP TIME INDEX
) WITH ('append_mode'='true');

INSERT INTO monitoring_data_skip (host, region, cpu_usage, `timestamp`) VALUES
('web-01', 'us-east', 12.5, '2026-05-06 10:00:00'),
('web-01', 'us-east', 15.2, '2026-05-06 10:01:00'),
('web-02', 'us-east', 23.7, '2026-05-06 10:01:00'),
('db-01', 'us-east', 45.0, '2026-05-06 10:02:00'),
('db-02', 'us-west', 82.2, '2026-05-06 10:02:00'),
('cache-01', 'eu-central', 55.4, '2026-05-06 10:02:00'),
('queue-01', 'ap-south', 99.1, '2026-05-06 10:02:00');

ADMIN FLUSH_TABLE('monitoring_data_skip');

ALTER TABLE monitoring_data_skip
MODIFY COLUMN cpu_usage STRING;

SELECT host, region, cpu_usage, `timestamp` FROM monitoring_data_skip
WHERE cpu_usage = '23.7';

DROP TABLE monitoring_data_skip;
Loading