From 51beee8e81b05065a8e6c21a16123107b6ce5b3e Mon Sep 17 00:00:00 2001 From: evenyag Date: Mon, 18 May 2026 17:54:58 +0800 Subject: [PATCH 1/4] feat: initial implementation for range cache time filters Signed-off-by: evenyag --- src/mito2/src/read/range_cache.rs | 509 +++++++++++++++++++++++++++--- src/mito2/src/read/scan_region.rs | 111 ++++--- src/mito2/src/read/scan_util.rs | 2 + src/table/src/predicate.rs | 9 +- 4 files changed, 541 insertions(+), 90 deletions(-) diff --git a/src/mito2/src/read/range_cache.rs b/src/mito2/src/read/range_cache.rs index 7d1010205d74..b8e01fcf8393 100644 --- a/src/mito2/src/read/range_cache.rs +++ b/src/mito2/src/read/range_cache.rs @@ -19,13 +19,20 @@ use std::sync::Arc; use async_stream::try_stream; use common_telemetry::warn; +use common_time::Timestamp; +use common_time::range::TimestampRange; +use common_time::timestamp::TimeUnit; +use datafusion_expr::expr::Expr; +use datafusion_expr::{Between, BinaryExpr, Operator}; use datatypes::arrow::compute::concat_batches; use datatypes::arrow::record_batch::RecordBatch; use datatypes::prelude::ConcreteDataType; +use datatypes::value::scalar_value_to_timestamp; use futures::TryStreamExt; use snafu::ResultExt; use store_api::region_engine::PartitionRange; use store_api::storage::{FileId, RegionId, TimeSeriesRowSelector}; +use table::predicate::is_string_timestamp_literal; use tokio::sync::{mpsc, oneshot}; use crate::cache::CacheStrategy; @@ -139,7 +146,6 @@ impl ScanRequestFingerprint { .unwrap_or(&[]) } - #[allow(dead_code)] pub(crate) fn without_time_filters(&self) -> Self { Self { inner: Arc::clone(&self.inner), @@ -266,6 +272,177 @@ pub(crate) fn collect_partition_range_row_groups( } } +/// Returns the timestamp range where all time-only predicates are guaranteed true. +/// +/// Returns `Some(min_to_max)` for empty input (vacuously true everywhere). 
+/// Returns `None` if any expression contains an unsupported shape: `OR`, `NOT`, +/// `IN`, non-literal RHS, unsupported operator, column-name mismatch, an `=` +/// literal that cannot be represented exactly in the column unit, or overflow +/// during bound adjustment. +/// +/// This is intentionally stricter than `extract_time_range_from_expr` in +/// `table::predicate`: lower bounds round up and upper bounds round down. If a +/// partition's file-time range is contained by the returned range, every row in +/// that partition satisfies the original time predicates. +/// +/// `IsNull`/`IsNotNull` on the time index are not routed into `time_filters` +/// today. If that changes, handle them here before stripping time filters from +/// the cache key. +pub(crate) fn implied_time_range_from_exprs( + ts_col_name: &str, + ts_col_unit: TimeUnit, + exprs: &[&Expr], +) -> Option<TimestampRange> { + let mut acc = TimestampRange::min_to_max(); + for expr in exprs { + let r = implied_time_range_from_expr(ts_col_name, ts_col_unit, expr)?; + acc = acc.and(&r); + } + Some(acc) +} + +fn implied_time_range_from_expr( + ts_col_name: &str, + ts_col_unit: TimeUnit, + expr: &Expr, +) -> Option<TimestampRange> { + match expr { + Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op { + Operator::And => { + let l = implied_time_range_from_expr(ts_col_name, ts_col_unit, left)?; + let r = implied_time_range_from_expr(ts_col_name, ts_col_unit, right)?; + Some(l.and(&r)) + } + Operator::Eq | Operator::Lt | Operator::LtEq | Operator::Gt | Operator::GtEq => { + implied_from_cmp(ts_col_name, ts_col_unit, left, *op, right) + } + // `OR` would require a strict intersection over a union of half-planes + // (not the loose-span union provided by `TimestampRange::or`), so we + // refuse it. Any other operator is unsupported. 
+ _ => None, + }, + Expr::Between(Between { + expr, + negated, + low, + high, + }) => { + if *negated { + return None; + } + implied_from_between(ts_col_name, ts_col_unit, expr, low, high) + } + // Includes `IsNull`, `IsNotNull`, `Not`, `InList`, function calls, etc. + _ => None, + } +} + +fn match_ts_column_literal<'a>( + ts_col_name: &str, + left: &'a Expr, + right: &'a Expr, +) -> Option<(Timestamp, bool)> { + let (col, scalar, reverse) = match (left, right) { + (Expr::Column(c), Expr::Literal(s, _)) => (c, s, false), + (Expr::Literal(s, _), Expr::Column(c)) => (c, s, true), + _ => return None, + }; + if col.name != ts_col_name { + return None; + } + // Reject string literals: their conversion needs a timezone we do not have, + // and the existing extractor in `table::predicate` rejects them too. + if is_string_timestamp_literal(scalar) { + return None; + } + scalar_value_to_timestamp(scalar, None).map(|t| (t, reverse)) +} + +fn implied_from_cmp( + ts_col_name: &str, + ts_col_unit: TimeUnit, + left: &Expr, + op: Operator, + right: &Expr, +) -> Option<TimestampRange> { + let (ts, reverse) = match_ts_column_literal(ts_col_name, left, right)?; + // Normalize to "column OP literal". + let op = if reverse { + match op { + Operator::Lt => Operator::Gt, + Operator::LtEq => Operator::GtEq, + Operator::Gt => Operator::Lt, + Operator::GtEq => Operator::LtEq, + Operator::Eq => Operator::Eq, + _ => return None, + } + } else { + op + }; + + match op { + Operator::GtEq => { + // ts >= L. Round the lower bound up in the column unit. + let b = ts.convert_to_ceil(ts_col_unit)?; + Some(TimestampRange::from_start(b)) + } + Operator::Gt => { + // ts > L. floor(L) + 1 is the tight lower bound in the column unit. + let v = ts.convert_to(ts_col_unit)?.value().checked_add(1)?; + Some(TimestampRange::from_start(Timestamp::new(v, ts_col_unit))) + } + Operator::LtEq => { + // ts <= L. Round the upper bound down in the column unit. 
+ let b = ts.convert_to(ts_col_unit)?; + Some(TimestampRange::until_end(b, true)) + } + Operator::Lt => { + // ts < L. Use the exclusive floor bound; it is sound even when the + // literal is not exactly representable in the column unit. + let b = ts.convert_to(ts_col_unit)?; + Some(TimestampRange::until_end(b, false)) + } + Operator::Eq => { + // ts = L. Only provable when L is exactly representable. + let f = ts.convert_to(ts_col_unit)?; + let c = ts.convert_to_ceil(ts_col_unit)?; + if f.value() != c.value() { + return None; + } + Some(TimestampRange::single(f)) + } + _ => None, + } +} + +fn implied_from_between( + ts_col_name: &str, + ts_col_unit: TimeUnit, + expr: &Expr, + low: &Expr, + high: &Expr, +) -> Option<TimestampRange> { + let Expr::Column(c) = expr else { + return None; + }; + if c.name != ts_col_name { + return None; + } + let (low_s, high_s) = match (low, high) { + (Expr::Literal(l, _), Expr::Literal(h, _)) => (l, h), + _ => return None, + }; + if is_string_timestamp_literal(low_s) || is_string_timestamp_literal(high_s) { + return None; + } + let low_ts = scalar_value_to_timestamp(low_s, None)?; + let high_ts = scalar_value_to_timestamp(high_s, None)?; + // BETWEEN low AND high is equivalent to ts >= low AND ts <= high. + let lo = low_ts.convert_to_ceil(ts_col_unit)?; + let hi = high_ts.convert_to(ts_col_unit)?; + Some(TimestampRange::new_inclusive(Some(lo), Some(hi))) +} + /// Builds a cache key for the given partition range if it is eligible for caching. pub(crate) fn build_range_cache_key( stream_ctx: &StreamContext, @@ -292,17 +469,24 @@ pub(crate) fn build_range_cache_key( return None; } - // TODO(yingwen): We used to call `fingerprint.without_time_filters()` when the query's - // `TimestampRange` fully covered the partition's `FileTimeRange`, so different queries that - // all enclosed the same partition could share a cache entry. 
The cover check turned out to - // be too coarse: it returned true in cases where the dropped time predicates would still - // have excluded rows, so the cache served results that should have been filtered. Reviving - // the optimization needs a per-predicate implication check that walks each time-only `Expr` - // (recursing through AND/OR/NOT) and proves the predicate is satisfied for every timestamp - // inside the partition's `FileTimeRange` — not the looser "does `extract_time_range_from_expr` - // return a range that covers the partition" used previously. Until then, always carry the - // full fingerprint so cache reuse stays correct. - let scan = fingerprint.clone(); + // If every time-only predicate is implied to be true on this partition's + // `FileTimeRange`, drop them from the cache key so that two queries with + // different but equally-covering time bounds share the same entry. The + // implied range is computed once per scan by `implied_time_range_from_exprs` + // (see `build_scan_fingerprint`); `None` means at least one time-only + // predicate had an unsupported shape (e.g. `OR`), so the optimization is + // disabled and the time predicates remain part of the key. 
+ let range_meta = &stream_ctx.ranges[part_range.identifier]; + let (file_min, file_max) = range_meta.time_range; + let covers = match &stream_ctx.scan_implied_time_range { + Some(implied) => implied.contains(&file_min) && implied.contains(&file_max), + None => false, + }; + let scan = if covers { + fingerprint.without_time_filters() + } else { + fingerprint.clone() + }; Some(RangeScanCacheKey { region_id: stream_ctx.input.region_metadata().region_id, @@ -722,11 +906,16 @@ mod tests { num_rows: 10, }; let partition_range = range_meta.new_partition_range(0); - let scan_fingerprint = crate::read::scan_region::build_scan_fingerprint(&input); + let (scan_fingerprint, scan_implied_time_range) = + match crate::read::scan_region::build_scan_fingerprint(&input) { + Some(b) => (Some(b.fingerprint), b.implied_time_range), + None => (None, None), + }; let stream_ctx = StreamContext { input, ranges: vec![range_meta], scan_fingerprint, + scan_implied_time_range, query_start: Instant::now(), }; @@ -770,57 +959,279 @@ mod tests { } #[tokio::test] - async fn preserves_time_filters_when_query_covers_partition_range() { - assert_range_cache_filters( + async fn range_cache_time_filter_key_cases() { + let partition = ( + Timestamp::new_millisecond(1000), + Timestamp::new_millisecond(2000), + ); + + struct Case { + filters: Vec, + query_time_range: Option, + expected_filters: Vec, + expected_time_filters: Vec, + } + + // Time filters are stripped only when their implied range fully covers + // the partition's file-time range. `is_not_null(ts)` stays in regular + // filters because it is not routed into `time_filters`. 
+ for case in [ + Case { + filters: vec![ + col("ts").gt_eq(ts_lit(1000)), + col("ts").lt(ts_lit(2001)), + col("ts").is_not_null(), + col("k0").eq(lit("foo")), + ], + query_time_range: TimestampRange::with_unit(1000, 2002, TimeUnit::Millisecond), + expected_filters: vec![col("k0").eq(lit("foo")), col("ts").is_not_null()], + expected_time_filters: vec![], + }, + Case { + filters: vec![ + col("ts").gt_eq(ts_lit(500)), + col("ts").lt(ts_lit(3000)), + col("k0").eq(lit("foo")), + ], + query_time_range: TimestampRange::with_unit(500, 3000, TimeUnit::Millisecond), + expected_filters: vec![col("k0").eq(lit("foo"))], + expected_time_filters: vec![], + }, + Case { + filters: vec![ + col("ts").gt_eq(ts_lit(1000)), + col("ts").lt_eq(ts_lit(2000)), + col("k0").eq(lit("foo")), + ], + query_time_range: TimestampRange::with_unit(1000, 2001, TimeUnit::Millisecond), + expected_filters: vec![col("k0").eq(lit("foo"))], + expected_time_filters: vec![], + }, + Case { + filters: vec![ + col("ts").between(ts_lit(1000), ts_lit(2000)), + col("k0").eq(lit("foo")), + ], + query_time_range: TimestampRange::with_unit(1000, 2001, TimeUnit::Millisecond), + expected_filters: vec![col("k0").eq(lit("foo"))], + expected_time_filters: vec![], + }, + Case { + filters: vec![col("ts").gt_eq(ts_lit(1200)), col("k0").eq(lit("foo"))], + query_time_range: TimestampRange::with_unit(1200, 2001, TimeUnit::Millisecond), + expected_filters: vec![col("k0").eq(lit("foo"))], + expected_time_filters: vec![col("ts").gt_eq(ts_lit(1200))], + }, + Case { + filters: vec![ + col("ts").gt_eq(ts_lit(1500)), + col("ts").is_not_null(), + col("k0").eq(lit("foo")), + ], + query_time_range: None, + expected_filters: vec![col("k0").eq(lit("foo")), col("ts").is_not_null()], + expected_time_filters: vec![col("ts").gt_eq(ts_lit(1500))], + }, + ] { + assert_range_cache_filters( + case.filters, + case.query_time_range, + partition, + case.expected_filters, + case.expected_time_filters, + ) + .await; + } + } + + #[tokio::test] + async 
fn two_distinct_queries_share_cache_key_when_both_cover() { + let partition_range = ( + Timestamp::new_millisecond(1000), + Timestamp::new_millisecond(2000), + ); + + let (ctx_a, part_a) = new_stream_context( vec![ - col("ts").gt_eq(ts_lit(1000)), - col("ts").lt(ts_lit(2001)), - col("ts").is_not_null(), + col("ts").gt_eq(ts_lit(500)), + col("ts").lt(ts_lit(3000)), col("k0").eq(lit("foo")), ], - TimestampRange::with_unit(1000, 2002, TimeUnit::Millisecond), - ( - Timestamp::new_millisecond(1000), - Timestamp::new_millisecond(2000), - ), - vec![col("k0").eq(lit("foo")), col("ts").is_not_null()], - vec![col("ts").gt_eq(ts_lit(1000)), col("ts").lt(ts_lit(2001))], + TimestampRange::with_unit(500, 3000, TimeUnit::Millisecond), + partition_range, + ) + .await; + let (ctx_b, part_b) = new_stream_context( + vec![ + col("ts").gt_eq(ts_lit(100)), + col("ts").lt(ts_lit(5000)), + col("k0").eq(lit("foo")), + ], + TimestampRange::with_unit(100, 5000, TimeUnit::Millisecond), + partition_range, ) .await; + + let key_a = build_range_cache_key(&ctx_a, &part_a).unwrap(); + let key_b = build_range_cache_key(&ctx_b, &part_b).unwrap(); + assert_eq!(key_a.scan, key_b.scan); + assert!(key_a.scan.time_filters().is_empty()); } #[tokio::test] - async fn preserves_time_filters_when_query_does_not_cover_partition_range() { - assert_range_cache_filters( - vec![col("ts").gt_eq(ts_lit(1000)), col("k0").eq(lit("foo"))], - TimestampRange::with_unit(1000, 1500, TimeUnit::Millisecond), - ( - Timestamp::new_millisecond(1000), - Timestamp::new_millisecond(2000), - ), - vec![col("k0").eq(lit("foo"))], - vec![col("ts").gt_eq(ts_lit(1000))], + async fn disables_optimization_on_or_clause() { + let partition_range = ( + Timestamp::new_millisecond(1000), + Timestamp::new_millisecond(2000), + ); + + let or_a = col("ts").gt_eq(ts_lit(1000)).or(col("ts").lt(ts_lit(500))); + let or_b = col("ts").gt_eq(ts_lit(900)).or(col("ts").lt(ts_lit(400))); + + let (ctx_a, part_a) = new_stream_context( + vec![or_a.clone(), 
col("k0").eq(lit("foo"))], + None, + partition_range, ) .await; + let (ctx_b, part_b) = new_stream_context( + vec![or_b.clone(), col("k0").eq(lit("foo"))], + None, + partition_range, + ) + .await; + + assert!(ctx_a.scan_implied_time_range.is_none()); + let key_a = build_range_cache_key(&ctx_a, &part_a).unwrap(); + let key_b = build_range_cache_key(&ctx_b, &part_b).unwrap(); + assert_ne!(key_a.scan, key_b.scan); + assert_eq!( + key_a.scan.time_filters(), + normalized_exprs([or_a]).as_slice() + ); } - #[tokio::test] - async fn preserves_time_filters_when_query_has_no_time_range_limit() { - assert_range_cache_filters( - vec![ + fn ms_ts(v: i64) -> Timestamp { + Timestamp::new_millisecond(v) + } + + fn implied_ms(expr: Expr) -> Option { + implied_time_range_from_exprs("ts", TimeUnit::Millisecond, &[&expr]) + } + + #[test] + fn implied_time_range_supported_exprs() { + for (expr, expected) in [ + ( col("ts").gt_eq(ts_lit(1000)), - col("ts").is_not_null(), - col("k0").eq(lit("foo")), - ], - None, + Some(TimestampRange::from_start(ms_ts(1000))), + ), ( - Timestamp::new_millisecond(1000), - Timestamp::new_millisecond(2000), + col("ts").gt(ts_lit(1000)), + Some(TimestampRange::from_start(ms_ts(1001))), ), - vec![col("k0").eq(lit("foo")), col("ts").is_not_null()], - vec![col("ts").gt_eq(ts_lit(1000))], - ) - .await; + ( + col("ts").lt_eq(ts_lit(2000)), + Some(TimestampRange::until_end(ms_ts(2000), true)), + ), + ( + col("ts").lt(ts_lit(2000)), + Some(TimestampRange::until_end(ms_ts(2000), false)), + ), + ( + col("ts").eq(ts_lit(1500)), + Some(TimestampRange::single(ms_ts(1500))), + ), + ( + ts_lit(1000).lt_eq(col("ts")), + Some(TimestampRange::from_start(ms_ts(1000))), + ), + ( + col("ts").between(ts_lit(1000), ts_lit(2000)), + Some(TimestampRange::new_inclusive( + Some(ms_ts(1000)), + Some(ms_ts(2000)), + )), + ), + ( + col("ts") + .gt_eq(ts_lit(1000)) + .and(col("ts").lt(ts_lit(2000))), + TimestampRange::with_unit(1000, 2000, TimeUnit::Millisecond), + ), + ( + col("ts") + 
.gt_eq(ts_lit(1000)) + .and(col("ts").lt(ts_lit(5000))) + .and(col("ts").lt_eq(ts_lit(3000))), + TimestampRange::with_unit(1000, 3001, TimeUnit::Millisecond), + ), + ] { + assert_eq!(implied_ms(expr), expected); + } + + assert_eq!( + implied_time_range_from_exprs("ts", TimeUnit::Millisecond, &[]), + Some(TimestampRange::min_to_max()) + ); + } + + #[test] + fn implied_time_range_unsupported_exprs() { + let not_between = Expr::Between(Between { + expr: Box::new(col("ts")), + negated: true, + low: Box::new(ts_lit(1000)), + high: Box::new(ts_lit(2000)), + }); + + for expr in [ + not_between, + col("ts").gt_eq(ts_lit(1000)).or(col("ts").lt(ts_lit(500))), + Expr::Not(Box::new(col("ts").gt_eq(ts_lit(1000)))), + col("ts").in_list(vec![ts_lit(1000), ts_lit(2000)], false), + col("ts").gt_eq(col("other")), + col("other_ts").gt_eq(ts_lit(1000)), + ] { + assert!(implied_ms(expr).is_none()); + } + } + + #[test] + fn implied_time_range_unit_conversion() { + let second_1 = lit(ScalarValue::TimestampSecond(Some(1), None)); + let ns_1500 = lit(ScalarValue::TimestampNanosecond(Some(1_500_000_000), None)); + let ns_1500_5 = lit(ScalarValue::TimestampNanosecond(Some(1_500_500_000), None)); + + for (expr, expected) in [ + ( + col("ts").gt_eq(second_1.clone()), + Some(TimestampRange::from_start(ms_ts(1000))), + ), + ( + col("ts").lt_eq(second_1), + Some(TimestampRange::until_end(ms_ts(1000), true)), + ), + ( + col("ts").eq(ns_1500), + Some(TimestampRange::single(ms_ts(1500))), + ), + (col("ts").eq(ns_1500_5.clone()), None), + ( + col("ts").gt_eq(ns_1500_5.clone()), + Some(TimestampRange::from_start(ms_ts(1501))), + ), + ( + col("ts").lt_eq(ns_1500_5.clone()), + Some(TimestampRange::until_end(ms_ts(1500), true)), + ), + ( + col("ts").gt(ns_1500_5), + Some(TimestampRange::from_start(ms_ts(1501))), + ), + ] { + assert_eq!(implied_ms(expr), expected); + } } #[test] diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index fb30913534e2..85a45a17b04f 100644 --- 
a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -61,7 +61,7 @@ use crate::metrics::READ_SST_COUNT; use crate::read::compat::{self, FlatCompatBatch}; use crate::read::flat_projection::FlatProjectionMapper; use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex}; -use crate::read::range_cache::ScanRequestFingerprint; +use crate::read::range_cache::{ScanRequestFingerprint, implied_time_range_from_exprs}; use crate::read::read_columns::{ ReadColumns, merge, read_columns_from_predicate, read_columns_from_projection, }; @@ -1316,9 +1316,21 @@ fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode { } } -/// Builds a [ScanRequestFingerprint] from a [ScanInput] if the scan is eligible +/// Output of [build_scan_fingerprint]: the cache fingerprint plus the derived +/// implied time range used to decide whether the cache key can drop the time +/// predicates for a given partition (see `build_range_cache_key`). +pub(crate) struct ScanFingerprintBundle { + pub(crate) fingerprint: ScanRequestFingerprint, + /// `Some(r)` = all time-only predicates are guaranteed true on `r` (in the + /// column's `TimeUnit`). + /// `None` = at least one time-only predicate could not be proven (e.g. + /// `OR`), so the cache-key optimization is disabled for this scan. + pub(crate) implied_time_range: Option, +} + +/// Builds a [ScanFingerprintBundle] from a [ScanInput] if the scan is eligible /// for partition range caching. 
-pub(crate) fn build_scan_fingerprint(input: &ScanInput) -> Option { +pub(crate) fn build_scan_fingerprint(input: &ScanInput) -> Option { let eligible = !input.compaction && !input.files.is_empty() && matches!(input.cache_strategy, CacheStrategy::EnableAll(_)); @@ -1351,7 +1363,7 @@ pub(crate) fn build_scan_fingerprint(input: &ScanInput) -> Option = Vec::new(); let mut has_tag_filter = false; let mut columns = HashSet::new(); @@ -1367,20 +1379,17 @@ pub(crate) fn build_scan_fingerprint(input: &ScanInput) -> Option false, }; - // TODO(yingwen): The split between `time_filters` and `filters` is currently inert - // because `build_range_cache_key()` always keeps both in the cache key. We used to - // strip `time_filters` when the query's `TimestampRange` covered the partition's - // `FileTimeRange`, but `extract_time_range_from_expr` is not precise enough to prove - // a time predicate is implied by that range (it can return a wider range than the - // predicate, and it does not analyze AND/OR shapes), which let the cache reuse rows - // that should have been filtered. Reviving the optimization needs a per-predicate - // implication check that walks each time-only `Expr` (recursing through AND/OR/NOT) - // and proves the predicate holds for every timestamp inside the partition's - // `FileTimeRange`; until then both buckets land in the fingerprint. + // Route time-only exprs that the legacy extractor recognizes into + // `time_only_exprs` so the implication walker + // (`implied_time_range_from_exprs`, called below) can attempt to drop + // them from the cache key when the partition's `FileTimeRange` is fully + // covered, then stringify them into the fingerprint's `time_filters` + // bucket. Time-only exprs that the extractor doesn't recognize stay in + // `filters` and never get stripped — conservatively correct. 
if is_time_only && extract_time_range_from_expr(&time_index_name, ts_col_unit, expr).is_some() { - time_filters.push(expr.to_string()); + time_only_exprs.push(expr); } else { filters.push(expr.to_string()); } @@ -1391,31 +1400,38 @@ pub(crate) fn build_scan_fingerprint(input: &ScanInput) -> Option = time_only_exprs.iter().map(|e| e.to_string()).collect(); + // Ensure the filters are sorted for consistent fingerprinting. filters.sort_unstable(); time_filters.sort_unstable(); let read_columns = input.read_cols.clone(); - Some( - crate::read::range_cache::ScanRequestFingerprintBuilder { - read_column_types: read_columns - .column_ids_iter() - .map(|id| { - metadata - .column_by_id(id) - .map(|col| col.column_schema.data_type.clone()) - }) - .collect(), - read_columns, - filters, - time_filters, - series_row_selector: input.series_row_selector, - append_mode: input.append_mode, - filter_deleted: input.filter_deleted, - merge_mode: input.merge_mode, - partition_expr_version: metadata.partition_expr_version, - } - .build(), - ) + let fingerprint = crate::read::range_cache::ScanRequestFingerprintBuilder { + read_column_types: read_columns + .column_ids_iter() + .map(|id| { + metadata + .column_by_id(id) + .map(|col| col.column_schema.data_type.clone()) + }) + .collect(), + read_columns, + filters, + time_filters, + series_row_selector: input.series_row_selector, + append_mode: input.append_mode, + filter_deleted: input.filter_deleted, + merge_mode: input.merge_mode, + partition_expr_version: metadata.partition_expr_version, + } + .build(); + + Some(ScanFingerprintBundle { + fingerprint, + implied_time_range, + }) } /// Context shared by different streams from a scanner. @@ -1429,6 +1445,13 @@ pub struct StreamContext { /// `None` when the scan is not eligible for caching. #[allow(dead_code)] pub(crate) scan_fingerprint: Option, + /// Implied range of every time-only predicate, in the time index column's + /// `TimeUnit`. 
Used by `build_range_cache_key` to decide whether the + /// partition's `FileTimeRange` is fully covered (allowing `time_filters` + /// to be stripped from the cache key). `None` when caching is ineligible + /// or when the implication walker bailed on an unsupported shape (e.g. + /// `OR`). + pub(crate) scan_implied_time_range: Option, // Metrics: /// The start time of the query. @@ -1441,12 +1464,16 @@ impl StreamContext { let query_start = input.query_start.unwrap_or_else(Instant::now); let ranges = RangeMeta::seq_scan_ranges(&input); READ_SST_COUNT.observe(input.num_files() as f64); - let scan_fingerprint = build_scan_fingerprint(&input); + let (scan_fingerprint, scan_implied_time_range) = match build_scan_fingerprint(&input) { + Some(b) => (Some(b.fingerprint), b.implied_time_range), + None => (None, None), + }; Self { input, ranges, scan_fingerprint, + scan_implied_time_range, query_start, } } @@ -1456,12 +1483,16 @@ impl StreamContext { let query_start = input.query_start.unwrap_or_else(Instant::now); let ranges = RangeMeta::unordered_scan_ranges(&input); READ_SST_COUNT.observe(input.num_files() as f64); - let scan_fingerprint = build_scan_fingerprint(&input); + let (scan_fingerprint, scan_implied_time_range) = match build_scan_fingerprint(&input) { + Some(b) => (Some(b.fingerprint), b.implied_time_range), + None => (None, None), + }; Self { input, ranges, scan_fingerprint, + scan_implied_time_range, query_start, } } @@ -1969,7 +2000,7 @@ mod tests { partition_expr_version: 0, } .build(); - assert_eq!(expected, fingerprint); + assert_eq!(expected, fingerprint.fingerprint); } #[tokio::test] @@ -2042,7 +2073,7 @@ mod tests { partition_expr_version: metadata.partition_expr_version, } .build(); - assert_eq!(expected, fingerprint); + assert_eq!(expected, fingerprint.fingerprint); assert_ne!(0, metadata.partition_expr_version); } diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs index 5b7e46b0c1ce..1d97b2eb7693 100644 --- 
a/src/mito2/src/read/scan_util.rs +++ b/src/mito2/src/read/scan_util.rs @@ -1375,6 +1375,7 @@ mod split_tests { input, ranges: vec![], scan_fingerprint: None, + scan_implied_time_range: None, query_start: std::time::Instant::now(), } } @@ -1755,6 +1756,7 @@ mod tests { input, ranges: Vec::new(), scan_fingerprint: None, + scan_implied_time_range: None, query_start: Instant::now(), }) } diff --git a/src/table/src/predicate.rs b/src/table/src/predicate.rs index e559e2c2962d..29b10430492b 100644 --- a/src/table/src/predicate.rs +++ b/src/table/src/predicate.rs @@ -41,7 +41,7 @@ mod stats; /// In theory, it should be converted to a timestamp scalar value by `TypeConversionRule`. macro_rules! return_none_if_utf8 { ($lit: ident) => { - if matches!($lit, ScalarValue::Utf8(_)) { + if is_string_timestamp_literal($lit) { warn!( "Unexpected ScalarValue::Utf8 in time range predicate: {:?}. Maybe it's an implicit bug, please report it to https://github.com/GreptimeTeam/greptimedb/issues", $lit @@ -53,6 +53,13 @@ macro_rules! return_none_if_utf8 { }; } +pub fn is_string_timestamp_literal(scalar: &ScalarValue) -> bool { + matches!( + scalar, + ScalarValue::Utf8(_) | ScalarValue::LargeUtf8(_) | ScalarValue::Utf8View(_) + ) +} + /// Reference-counted pointer to a list of logical exprs and a list of dynamic filter physical exprs. 
#[derive(Debug, Clone, Default)] pub struct Predicate { From c17eab5718b463d5808ed19168865292dea9333c Mon Sep 17 00:00:00 2001 From: evenyag Date: Tue, 19 May 2026 17:22:54 +0800 Subject: [PATCH 2/4] refactor: tighten Lt implied time range bound Signed-off-by: evenyag --- src/mito2/src/read/range_cache.rs | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/src/mito2/src/read/range_cache.rs b/src/mito2/src/read/range_cache.rs index b8e01fcf8393..9ce0c631559a 100644 --- a/src/mito2/src/read/range_cache.rs +++ b/src/mito2/src/read/range_cache.rs @@ -397,9 +397,9 @@ fn implied_from_cmp( Some(TimestampRange::until_end(b, true)) } Operator::Lt => { - // ts < L. Use the exclusive floor bound; it is sound even when the - // literal is not exactly representable in the column unit. - let b = ts.convert_to(ts_col_unit)?; + // ts < L. `ts < ceil(L)` is the tight bound: equal to `ts < L` when + // L is exactly representable, and `ts <= floor(L)` otherwise. + let b = ts.convert_to_ceil(ts_col_unit)?; Some(TimestampRange::until_end(b, false)) } Operator::Eq => { @@ -479,7 +479,17 @@ pub(crate) fn build_range_cache_key( let range_meta = &stream_ctx.ranges[part_range.identifier]; let (file_min, file_max) = range_meta.time_range; let covers = match &stream_ctx.scan_implied_time_range { - Some(implied) => implied.contains(&file_min) && implied.contains(&file_max), + Some(implied) => { + // The `contains` check is sound only when `file_min`/`file_max` + // share the implied range's unit (the time index column's unit). + // Mito stores time index values in that unit; assert to catch any + // future drift. 
+ if let Some(ts) = implied.start().as_ref().or(implied.end().as_ref()) { + assert_eq!(file_min.unit(), ts.unit()); + assert_eq!(file_max.unit(), ts.unit()); + } + implied.contains(&file_min) && implied.contains(&file_max) + } None => false, }; let scan = if covers { @@ -1226,9 +1236,13 @@ mod tests { Some(TimestampRange::until_end(ms_ts(1500), true)), ), ( - col("ts").gt(ns_1500_5), + col("ts").gt(ns_1500_5.clone()), Some(TimestampRange::from_start(ms_ts(1501))), ), + ( + col("ts").lt(ns_1500_5), + Some(TimestampRange::until_end(ms_ts(1501), false)), + ), ] { assert_eq!(implied_ms(expr), expected); } From 037fbf2916884e3ca4752ee0b176acbbe36918fb Mon Sep 17 00:00:00 2001 From: evenyag Date: Tue, 19 May 2026 18:07:05 +0800 Subject: [PATCH 3/4] docs: tighten range cache key comment Signed-off-by: evenyag --- src/mito2/src/read/range_cache.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/mito2/src/read/range_cache.rs b/src/mito2/src/read/range_cache.rs index 9ce0c631559a..21119312ae64 100644 --- a/src/mito2/src/read/range_cache.rs +++ b/src/mito2/src/read/range_cache.rs @@ -469,13 +469,11 @@ pub(crate) fn build_range_cache_key( return None; } - // If every time-only predicate is implied to be true on this partition's - // `FileTimeRange`, drop them from the cache key so that two queries with - // different but equally-covering time bounds share the same entry. The - // implied range is computed once per scan by `implied_time_range_from_exprs` - // (see `build_scan_fingerprint`); `None` means at least one time-only - // predicate had an unsupported shape (e.g. `OR`), so the optimization is - // disabled and the time predicates remain part of the key. + // If the implied range covers this partition's `FileTimeRange`, drop + // time-only predicates from the cache key so that queries with different + // but equally-covering time bounds share an entry. `None` means some + // time-only predicate had an unsupported shape (e.g. 
`OR`), so we keep + // them in the key. let range_meta = &stream_ctx.ranges[part_range.identifier]; let (file_min, file_max) = range_meta.time_range; let covers = match &stream_ctx.scan_implied_time_range { From 72a669e4ccc3a51e34bb719968d4c64591a53821 Mon Sep 17 00:00:00 2001 From: evenyag Date: Mon, 25 May 2026 15:50:26 +0800 Subject: [PATCH 4/4] fix: skip range cache unit asserts on empty implied range Signed-off-by: evenyag --- src/mito2/src/read/range_cache.rs | 37 +++++++++++++++++++++++++++++-- 1 file changed, 35 insertions(+), 2 deletions(-) diff --git a/src/mito2/src/read/range_cache.rs b/src/mito2/src/read/range_cache.rs index 21119312ae64..af212ab23e63 100644 --- a/src/mito2/src/read/range_cache.rs +++ b/src/mito2/src/read/range_cache.rs @@ -477,7 +477,11 @@ pub(crate) fn build_range_cache_key( let range_meta = &stream_ctx.ranges[part_range.identifier]; let (file_min, file_max) = range_meta.time_range; let covers = match &stream_ctx.scan_implied_time_range { - Some(implied) => { + // An empty implied range can never cover a non-empty file range, so + // short-circuit. We also skip the unit asserts because + // `TimestampRange::empty()` uses `Timestamp::default()` (millisecond), + // which would falsely trip the asserts for non-ms time index columns. + Some(implied) if !implied.is_empty() => { // The `contains` check is sound only when `file_min`/`file_max` // share the implied range's unit (the time index column's unit). // Mito stores time index values in that unit; assert to catch any @@ -488,7 +492,7 @@ pub(crate) fn build_range_cache_key( } implied.contains(&file_min) && implied.contains(&file_max) } - None => false, + _ => false, }; let scan = if covers { fingerprint.without_time_filters() @@ -1119,6 +1123,35 @@ mod tests { ); } + #[tokio::test] + async fn empty_implied_range_does_not_panic_on_non_ms_file_range() { + // Contradictory time predicates make the implied range empty. 
The + // empty range's sentinel timestamps use `Timestamp::default()` (ms), + // so without the `is_empty()` short-circuit the unit asserts would + // panic against a non-ms `range_meta.time_range`. + let partition = ( + Timestamp::new_millisecond(1000), + Timestamp::new_millisecond(2000), + ); + + let (mut ctx, part_range) = new_stream_context( + vec![col("ts").gt_eq(ts_lit(1500)), col("k0").eq(lit("foo"))], + TimestampRange::with_unit(1500, 3000, TimeUnit::Millisecond), + partition, + ) + .await; + + ctx.scan_implied_time_range = Some(TimestampRange::empty()); + ctx.ranges[0].time_range = ( + Timestamp::new(1_000_000_000, TimeUnit::Nanosecond), + Timestamp::new(2_000_000_000, TimeUnit::Nanosecond), + ); + + let key = build_range_cache_key(&ctx, &part_range).unwrap(); + // Empty implied range cannot cover, so time filters stay in the key. + assert!(!key.scan.time_filters().is_empty()); + } + fn ms_ts(v: i64) -> Timestamp { Timestamp::new_millisecond(v) }