Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 113 additions & 12 deletions src/mito2/src/sst/index/bloom_filter/applier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::time::Instant;

use common_base::range_read::RangeReader;
use common_telemetry::{tracing, warn};
use datatypes::data_type::ConcreteDataType;
use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
use index::bloom_filter::reader::{
BloomFilterReadMetrics, BloomFilterReader, BloomFilterReaderImpl,
Expand All @@ -30,6 +31,7 @@ use object_store::ObjectStore;
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
use puffin::puffin_manager::{PuffinManager, PuffinReader};
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::region_request::PathType;
use store_api::storage::ColumnId;

Expand All @@ -38,7 +40,6 @@ use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
use crate::cache::index::bloom_filter_index::{
BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag,
};
use crate::cache::index::result_cache::PredicateKey;
use crate::error::{
ApplyBloomFilterIndexSnafu, Error, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu,
Result,
Expand Down Expand Up @@ -133,10 +134,10 @@ pub struct BloomFilterIndexApplier {

/// Bloom filter predicates.
/// For each column, the value will be retained only if it contains __all__ predicates.
predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>,
default_predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>,

/// Predicate key. Used to identify the predicate and fetch result from cache.
predicate_key: PredicateKey,
/// Expected predicate column types from the latest region metadata.
expected_predicate_col_types: BTreeMap<ColumnId, ConcreteDataType>,
}

impl BloomFilterIndexApplier {
Expand All @@ -149,8 +150,9 @@ impl BloomFilterIndexApplier {
object_store: ObjectStore,
puffin_manager_factory: PuffinManagerFactory,
predicates: BTreeMap<ColumnId, Vec<InListPredicate>>,
expected_predicate_col_types: BTreeMap<ColumnId, ConcreteDataType>,
) -> Self {
let predicates = Arc::new(predicates);
let default_predicates = Arc::new(predicates);
Self {
table_dir,
path_type,
Expand All @@ -159,8 +161,8 @@ impl BloomFilterIndexApplier {
puffin_manager_factory,
puffin_metadata_cache: None,
bloom_filter_index_cache: None,
predicate_key: PredicateKey::new_bloom(predicates.clone()),
predicates,
default_predicates,
expected_predicate_col_types,
}
}

Expand Down Expand Up @@ -207,6 +209,7 @@ impl BloomFilterIndexApplier {
&self,
file_id: RegionIndexId,
file_size_hint: Option<u64>,
predicates: &BTreeMap<ColumnId, Vec<InListPredicate>>,
row_groups: impl Iterator<Item = (usize, bool)>,
mut metrics: Option<&mut BloomFilterIndexApplyMetrics>,
) -> Result<Vec<(usize, Vec<Range<usize>>)>> {
Expand All @@ -230,7 +233,7 @@ impl BloomFilterIndexApplier {
.map(|(i, range)| (*i, vec![range.clone()]))
.collect::<Vec<_>>();

for (column_id, predicates) in self.predicates.iter() {
for (column_id, predicates) in predicates {
let blob = match self
.blob_reader(file_id, *column_id, file_size_hint, metrics.as_deref_mut())
.await?
Expand Down Expand Up @@ -438,9 +441,45 @@ impl BloomFilterIndexApplier {
Ok(())
}

/// Returns the predicate key.
pub fn predicate_key(&self) -> &PredicateKey {
&self.predicate_key
/// Returns compatible bloom filter predicates with the given SST metadata.
///
/// Returns `None` when no compatible predicate remains for this SST.
pub fn compatible_predicate_for_sst(
&self,
sst_metadata: &RegionMetadataRef,
) -> Option<Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>> {
let mut has_type_mismatch = false;
let mut compatible_col_ids = Vec::new();

for (col_id, expected) in &self.expected_predicate_col_types {
if let Some(sst_col) = sst_metadata.column_by_id(*col_id)
&& sst_col.column_schema.data_type != *expected
{
has_type_mismatch = true;
continue;
}
Comment thread
fengys1996 marked this conversation as resolved.
Outdated

if self.default_predicates.contains_key(col_id) {
compatible_col_ids.push(*col_id);
}
}
Comment thread
fengys1996 marked this conversation as resolved.

if compatible_col_ids.is_empty() {
return None;
}

if !has_type_mismatch {
return Some(self.default_predicates.clone());
}

let mut compatible_predicates = BTreeMap::new();
for col_id in compatible_col_ids {
if let Some(predicates) = self.default_predicates.get(&col_id) {
compatible_predicates.insert(col_id, predicates.clone());
}
}
Comment thread
fengys1996 marked this conversation as resolved.

Some(Arc::new(compatible_predicates))
}
}

Expand All @@ -456,9 +495,12 @@ fn is_blob_not_found(err: &Error) -> bool {

#[cfg(test)]
mod tests {
use std::collections::BTreeSet;

use datafusion_expr::{Expr, col, lit};
use futures::future::BoxFuture;
use index::Bytes;
use object_store::services::Memory;
use puffin::puffin_manager::PuffinWriter;
use store_api::metadata::RegionMetadata;
use store_api::storage::FileId;
Expand All @@ -470,6 +512,62 @@ mod tests {
mock_object_store, mock_region_metadata, new_batch, new_intm_mgr,
};

#[tokio::test]
async fn test_compatible_predicate_for_sst() {
let (_d, puffin_manager_factory) =
PuffinManagerFactory::new_for_test_async("test_plan_for_sst_basic_").await;
let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
let table_dir = "table_dir".to_string();

let predicates = BTreeMap::from_iter([(
1,
vec![InListPredicate {
list: BTreeSet::from_iter([Bytes::from("foo")]),
}],
)]);
let expected_predicate_col_types =
BTreeMap::from_iter([(1, ConcreteDataType::string_datatype())]);

let applier = BloomFilterIndexApplier::new(
table_dir,
PathType::Bare,
object_store,
puffin_manager_factory,
predicates,
expected_predicate_col_types,
);
let predicates = applier.compatible_predicate_for_sst(&mock_region_metadata());
assert!(predicates.is_some());
}

#[tokio::test]
async fn test_compatible_predicate_for_sst_type_mismatch() {
let (_d, puffin_manager_factory) =
PuffinManagerFactory::new_for_test_async("test_plan_for_sst_type_mismatch_").await;
let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
let table_dir = "table_dir".to_string();

let predicates = BTreeMap::from_iter([(
1,
vec![InListPredicate {
list: BTreeSet::from_iter([Bytes::from("foo")]),
}],
)]);
let expected_predicate_col_types =
BTreeMap::from_iter([(1, ConcreteDataType::int64_datatype())]);

let applier = BloomFilterIndexApplier::new(
table_dir,
PathType::Bare,
object_store,
puffin_manager_factory,
predicates,
expected_predicate_col_types,
);
let predicates = applier.compatible_predicate_for_sst(&mock_region_metadata());
assert!(predicates.is_none());
}

#[allow(clippy::type_complexity)]
fn tester(
table_dir: String,
Expand All @@ -496,8 +594,11 @@ mod tests {
);

let applier = builder.build(&exprs).unwrap().unwrap();
let predicates = applier
.compatible_predicate_for_sst(&Arc::new(metadata.clone()))
.unwrap();
applier
.apply(file_id, None, row_groups.into_iter(), None)
.apply(file_id, None, &predicates, row_groups.into_iter(), None)
.await
.unwrap()
.into_iter()
Expand Down
25 changes: 19 additions & 6 deletions src/mito2/src/sst/index/bloom_filter/applier/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,14 @@ impl<'a> BloomFilterIndexApplierBuilder<'a> {
return Ok(None);
}

let expected_predicate_column_types = self.expected_predicate_column_types();
let applier = BloomFilterIndexApplier::new(
self.table_dir,
self.path_type,
self.object_store,
self.puffin_manager_factory,
self.predicates,
expected_predicate_column_types,
)
.with_file_cache(self.file_cache)
.with_puffin_metadata_cache(self.puffin_metadata_cache)
Expand Down Expand Up @@ -137,6 +139,17 @@ impl<'a> BloomFilterIndexApplierBuilder<'a> {
}
}

/// Returns `(column_id, data_type)` pairs for predicate columns.
fn expected_predicate_column_types(&self) -> BTreeMap<ColumnId, ConcreteDataType> {
self.predicates
.keys()
.filter_map(|col_id| {
let col = self.metadata.column_by_id(*col_id)?;
Some((*col_id, col.column_schema.data_type.clone()))
})
.collect()
}

/// Helper function to get the column id and type
fn column_id_and_type(
&self,
Expand Down Expand Up @@ -404,7 +417,7 @@ mod tests {
let result = builder.build(&exprs).unwrap();
assert!(result.is_some());

let predicates = result.unwrap().predicates;
let predicates = result.unwrap().default_predicates;
assert_eq!(predicates.len(), 1);

let column_predicates = predicates.get(&1).unwrap();
Expand Down Expand Up @@ -443,7 +456,7 @@ mod tests {
let result = builder.build(&exprs).unwrap();
assert!(result.is_some());

let predicates = result.unwrap().predicates;
let predicates = result.unwrap().default_predicates;
let column_predicates = predicates.get(&2).unwrap();
assert_eq!(column_predicates.len(), 1);
assert_eq!(column_predicates[0].list.len(), 3);
Expand Down Expand Up @@ -473,7 +486,7 @@ mod tests {
let result = builder().build(&[expr]).unwrap();
assert!(result.is_some());

let predicates = result.unwrap().predicates;
let predicates = result.unwrap().default_predicates;
let column_predicates = predicates.get(&1).unwrap();
assert_eq!(column_predicates.len(), 1);
assert_eq!(column_predicates[0].list.len(), 4);
Expand Down Expand Up @@ -537,7 +550,7 @@ mod tests {
let result = builder.build(&exprs).unwrap();
assert!(result.is_some());

let predicates = result.unwrap().predicates;
let predicates = result.unwrap().default_predicates;
assert_eq!(predicates.len(), 2);
assert!(predicates.contains_key(&1));
assert!(predicates.contains_key(&2));
Expand Down Expand Up @@ -575,7 +588,7 @@ mod tests {
let result = builder.build(&exprs).unwrap();
assert!(result.is_some());

let predicates = result.unwrap().predicates;
let predicates = result.unwrap().default_predicates;
assert!(!predicates.contains_key(&1)); // Null equality should be ignored
let column2_predicates = predicates.get(&2).unwrap();
assert_eq!(column2_predicates[0].list.len(), 2);
Expand Down Expand Up @@ -644,7 +657,7 @@ mod tests {
let result = builder.build(&exprs).unwrap();
assert!(result.is_some());

let predicates = result.unwrap().predicates;
let predicates = result.unwrap().default_predicates;
let column_predicates = predicates.get(&1).unwrap();
assert_eq!(column_predicates.len(), 2);
}
Expand Down
23 changes: 15 additions & 8 deletions src/mito2/src/sst/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ mod tests {

use super::*;
use crate::access_layer::{FilePathProvider, Metrics, RegionFilePathFactory, WriteType};
use crate::cache::index::result_cache::PredicateKey;
use crate::cache::test_util::assert_parquet_metadata_equal;
use crate::cache::{CacheManager, CacheStrategy, PageKey};
use crate::config::IndexConfig;
Expand Down Expand Up @@ -985,11 +986,14 @@ mod tests {
assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 2);
assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
let bloom_predicates = bloom_filter_applier
.as_ref()
.unwrap()
.compatible_predicate_for_sst(&metadata)
.unwrap();
let bloom_predicate_key = PredicateKey::new_bloom(bloom_predicates);
let cached = index_result_cache
.get(
bloom_filter_applier.unwrap().predicate_key(),
handle.file_id().file_id(),
)
.get(&bloom_predicate_key, handle.file_id().file_id())
.unwrap();
assert!(cached.contains_row_group(2));
assert!(cached.contains_row_group(3));
Expand Down Expand Up @@ -1055,11 +1059,14 @@ mod tests {
assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 140);
let bloom_predicates = bloom_filter_applier
.as_ref()
.unwrap()
.compatible_predicate_for_sst(&metadata)
.unwrap();
let bloom_predicate_key = PredicateKey::new_bloom(bloom_predicates);
let cached = index_result_cache
.get(
bloom_filter_applier.unwrap().predicate_key(),
handle.file_id().file_id(),
)
.get(&bloom_predicate_key, handle.file_id().file_id())
.unwrap();
assert!(cached.contains_row_group(0));
assert!(cached.contains_row_group(1));
Expand Down
Loading
Loading