Skip to content
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
222 changes: 222 additions & 0 deletions src/table/src/predicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,10 @@
op: &Operator,
right: &Expr,
) -> Option<TimestampRange> {
if let Some(range) = get_casted_timestamp_filter(ts_col_name, ts_col_unit, left, op, right) {
return Some(range);
}

match op {
Operator::Eq => get_timestamp_filter(ts_col_name, left, right)
.and_then(|(ts, _)| ts.convert_to(ts_col_unit))
Expand Down Expand Up @@ -310,7 +314,105 @@
}
}

fn get_casted_timestamp_filter(
ts_col_name: &str,
ts_col_unit: TimeUnit,
left: &Expr,
op: &Operator,
right: &Expr,
) -> Option<TimestampRange> {
let (lit, op) = match (left, right) {
(expr, Expr::Literal(scalar, _)) if is_casted_time_index(expr, ts_col_name) => {
(scalar, op.clone())

Check failure on line 326 in src/table/src/predicate.rs

View workflow job for this annotation

GitHub Actions / Clippy

using `clone` on type `Operator` which implements the `Copy` trait
}
(Expr::Literal(scalar, _), expr) if is_casted_time_index(expr, ts_col_name) => {
(scalar, reverse_operator(op)?)
}
_ => return None,
};

return_none_if_utf8!(lit);
let ScalarValue::TimestampMillisecond(Some(lit_ms), None) = lit else {
return None;
};

// Avoid epoch-boundary cases until negative timestamp cast semantics are tested.
// With truncation-toward-zero, predicates such as `CAST(ts AS ms) >= 0` may include
// negative sub-millisecond raw timestamps, so `ts >= 0` would be too narrow.
if *lit_ms <= 0 {
return None;
}

let lit_ts = Timestamp::new_millisecond(*lit_ms);
let next_ms = lit_ms.checked_add(1).map(Timestamp::new_millisecond)?;

match &op {
Operator::Eq => TimestampRange::new(
lit_ts.convert_to(ts_col_unit)?,
next_ms.convert_to(ts_col_unit)?,
),
Operator::Lt => lit_ts
.convert_to(ts_col_unit)
.map(|ts| TimestampRange::until_end(ts, false)),
Operator::LtEq => next_ms
.convert_to(ts_col_unit)
.map(|ts| TimestampRange::until_end(ts, false)),
Operator::Gt => next_ms
.convert_to(ts_col_unit)
.map(TimestampRange::from_start),
Operator::GtEq => lit_ts
.convert_to(ts_col_unit)
.map(TimestampRange::from_start),
_ => None,
}
}

fn is_casted_time_index(expr: &Expr, ts_col_name: &str) -> bool {
let Expr::Cast(cast) = expr else {
return false;
};

if !matches!(
&cast.data_type,
datatypes::arrow::datatypes::DataType::Timestamp(
datatypes::arrow::datatypes::TimeUnit::Millisecond,
None
)
) {
return false;
}
Comment on lines +375 to +383
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The type paths for DataType and TimeUnit are unnecessarily verbose. Since datatypes::arrow is imported as arrow at the top of the file, you can use arrow::datatypes::DataType and arrow::datatypes::TimeUnit directly to improve readability.

    if !matches!(
        &cast.data_type,
        arrow::datatypes::DataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None)
    ) {
        return false;
    }


let Expr::Column(col) = cast.expr.as_ref() else {
return false;
};

col.name == ts_col_name
}

fn reverse_operator(op: &Operator) -> Option<Operator> {
match op {
Operator::Eq => Some(Operator::Eq),
Operator::Lt => Some(Operator::Gt),
Operator::LtEq => Some(Operator::GtEq),
Operator::Gt => Some(Operator::Lt),
Operator::GtEq => Some(Operator::LtEq),
_ => None,
}
}

fn get_timestamp_filter(ts_col_name: &str, left: &Expr, right: &Expr) -> Option<(Timestamp, bool)> {
// Design note for extracting `CAST(time_index AS Timestamp(ms)) <op> literal` (#7913):
// this helper currently accepts only raw `time_index <op> literal` filters. If casted
// time-index filters are added, derive a pruning range from the cast bucket, not by
// blindly replacing `CAST(ts)` with `ts`. For positive millisecond literals and finer
// raw units whose cast-to-ms semantics drop the sub-ms remainder, the safe mappings are:
// `CAST(ts)=L` => `[L, L+1ms)`, `< L` => `(-inf, L)`, `<= L` => `(-inf, L+1ms)`,
// `> L` => `[L+1ms, +inf)`, and `>= L` => `[L, +inf)`, converted to the raw unit.
// Literal-left comparisons can be normalized by reversing the operator. Keep the original
// filter applied when the extracted range is only a pruning approximation, and return
// `None` instead of risking a narrower-than-true range for unsupported operators (`!=`),
// timezone/overflow ambiguity, or negative literals until Arrow/DataFusion cast semantics
// for negative epochs are covered by tests.
let (col, lit, reverse) = match (left, right) {
(Expr::Column(column), Expr::Literal(scalar, _)) => (column, scalar, false),
(Expr::Literal(scalar, _), Expr::Column(column)) => (column, scalar, true),
Expand Down Expand Up @@ -421,6 +523,24 @@
);
}

fn check_build_predicate_with_unit(expr: Expr, unit: TimeUnit, expect: TimestampRange) {
assert_eq!(expect, build_time_range_predicate("ts", unit, &[expr]));
}

fn cast_to_ms_col(name: &str) -> Expr {
Expr::Cast(datafusion_expr::Cast {
expr: Box::new(col(name)),
data_type: DataType::Timestamp(
datatypes::arrow::datatypes::TimeUnit::Millisecond,
None,
),
})
}

fn ms_lit(value: i64) -> Expr {
lit(ScalarValue::TimestampMillisecond(Some(value), None))
}

#[test]
fn test_gt() {
// ts > 1ms
Expand Down Expand Up @@ -577,6 +697,108 @@
);
}

#[test]
fn test_casted_time_index_filter() {
let ts = cast_to_ms_col("ts");
let unit = TimeUnit::Microsecond;

check_build_predicate_with_unit(
ts.clone().eq(ms_lit(1000)),
unit,
TimestampRange::new(
Timestamp::new(1_000_000, unit),
Timestamp::new(1_001_000, unit),
)
.unwrap(),
);
check_build_predicate_with_unit(
ts.clone().lt(ms_lit(1000)),
unit,
TimestampRange::until_end(Timestamp::new(1_000_000, unit), false),
);
check_build_predicate_with_unit(
ts.clone().lt_eq(ms_lit(1000)),
unit,
TimestampRange::until_end(Timestamp::new(1_001_000, unit), false),
);
check_build_predicate_with_unit(
ts.clone().gt(ms_lit(1000)),
unit,
TimestampRange::from_start(Timestamp::new(1_001_000, unit)),
);
check_build_predicate_with_unit(
ts.gt_eq(ms_lit(1000)),
unit,
TimestampRange::from_start(Timestamp::new(1_000_000, unit)),
);
}

#[test]
fn test_casted_time_index_filter_literal_left() {
let ts = cast_to_ms_col("ts");
let unit = TimeUnit::Nanosecond;

check_build_predicate_with_unit(
ms_lit(1000).lt(ts.clone()),
unit,
TimestampRange::from_start(Timestamp::new(1_001_000_000, unit)),
);
check_build_predicate_with_unit(
ms_lit(1000).lt_eq(ts.clone()),
unit,
TimestampRange::from_start(Timestamp::new(1_000_000_000, unit)),
);
check_build_predicate_with_unit(
ms_lit(1000).gt(ts.clone()),
unit,
TimestampRange::until_end(Timestamp::new(1_000_000_000, unit), false),
);
check_build_predicate_with_unit(
ms_lit(1000).gt_eq(ts.clone()),
unit,
TimestampRange::until_end(Timestamp::new(1_001_000_000, unit), false),
);
check_build_predicate_with_unit(
ms_lit(1000).eq(ts),
unit,
TimestampRange::new(
Timestamp::new(1_000_000_000, unit),
Timestamp::new(1_001_000_000, unit),
)
.unwrap(),
);
}

#[test]
fn test_casted_time_index_filter_unsupported_cases() {
check_build_predicate_with_unit(
cast_to_ms_col("other").eq(ms_lit(1000)),
TimeUnit::Microsecond,
TimestampRange::min_to_max(),
);

let cast_to_second = Expr::Cast(datafusion_expr::Cast {
expr: Box::new(col("ts")),
data_type: DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Second, None),
});
check_build_predicate_with_unit(
cast_to_second.eq(ms_lit(1000)),
TimeUnit::Microsecond,
TimestampRange::min_to_max(),
);

check_build_predicate_with_unit(
cast_to_ms_col("ts").gt_eq(ms_lit(0)),
TimeUnit::Microsecond,
TimestampRange::min_to_max(),
);
check_build_predicate_with_unit(
cast_to_ms_col("ts").eq(lit(ScalarValue::TimestampMicrosecond(Some(1000), None))),
TimeUnit::Microsecond,
TimestampRange::min_to_max(),
);
}

async fn gen_test_parquet_file(dir: &TempDir, cnt: usize) -> (String, Arc<Schema>) {
let path = dir
.path()
Expand Down
Loading