feat: initial implementation for range cache with time filters (#8130)

* feat: initial implementation for range cache time filters

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: tighten Lt implied time range bound

Signed-off-by: evenyag <realevenyag@gmail.com>

* docs: tighten range cache key comment

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: skip range cache unit asserts on empty implied range

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
Yingwen
2026-05-26 15:03:31 +08:00
committed by GitHub
parent c84462bdc1
commit 6193e8760b
4 changed files with 590 additions and 94 deletions

View File

@@ -19,13 +19,20 @@ use std::sync::Arc;
use async_stream::try_stream;
use common_telemetry::warn;
use common_time::Timestamp;
use common_time::range::TimestampRange;
use common_time::timestamp::TimeUnit;
use datafusion_expr::expr::Expr;
use datafusion_expr::{Between, BinaryExpr, Operator};
use datatypes::arrow::compute::concat_batches;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::ConcreteDataType;
use datatypes::value::scalar_value_to_timestamp;
use futures::TryStreamExt;
use snafu::ResultExt;
use store_api::region_engine::PartitionRange;
use store_api::storage::{FileId, RegionId, TimeSeriesRowSelector};
use table::predicate::is_string_timestamp_literal;
use tokio::sync::{mpsc, oneshot};
use crate::cache::CacheStrategy;
@@ -139,7 +146,6 @@ impl ScanRequestFingerprint {
.unwrap_or(&[])
}
#[allow(dead_code)]
pub(crate) fn without_time_filters(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
@@ -266,6 +272,177 @@ pub(crate) fn collect_partition_range_row_groups(
}
}
/// Returns the timestamp range where all time-only predicates are guaranteed true.
///
/// Returns `Some(min_to_max)` for empty input (vacuously true everywhere).
/// Returns `None` if any expression contains an unsupported shape: `OR`, `NOT`,
/// `IN`, non-literal RHS, unsupported operator, column-name mismatch, an `=`
/// literal that cannot be represented exactly in the column unit, or overflow
/// during bound adjustment.
///
/// This is intentionally stricter than `extract_time_range_from_expr` in
/// `table::predicate`: lower bounds round up and upper bounds round down. If a
/// partition's file-time range is contained by the returned range, every row in
/// that partition satisfies the original time predicates.
///
/// `IsNull`/`IsNotNull` on the time index are not routed into `time_filters`
/// today. If that changes, handle them here before stripping time filters from
/// the cache key.
pub(crate) fn implied_time_range_from_exprs(
ts_col_name: &str,
ts_col_unit: TimeUnit,
exprs: &[&Expr],
) -> Option<TimestampRange> {
let mut acc = TimestampRange::min_to_max();
for expr in exprs {
let r = implied_time_range_from_expr(ts_col_name, ts_col_unit, expr)?;
acc = acc.and(&r);
}
Some(acc)
}
fn implied_time_range_from_expr(
ts_col_name: &str,
ts_col_unit: TimeUnit,
expr: &Expr,
) -> Option<TimestampRange> {
match expr {
Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
Operator::And => {
let l = implied_time_range_from_expr(ts_col_name, ts_col_unit, left)?;
let r = implied_time_range_from_expr(ts_col_name, ts_col_unit, right)?;
Some(l.and(&r))
}
Operator::Eq | Operator::Lt | Operator::LtEq | Operator::Gt | Operator::GtEq => {
implied_from_cmp(ts_col_name, ts_col_unit, left, *op, right)
}
// `OR` would require a strict intersection over a union of half-planes
// (not the loose-span union provided by `TimestampRange::or`), so we
// refuse it. Any other operator is unsupported.
_ => None,
},
Expr::Between(Between {
expr,
negated,
low,
high,
}) => {
if *negated {
return None;
}
implied_from_between(ts_col_name, ts_col_unit, expr, low, high)
}
// Includes `IsNull`, `IsNotNull`, `Not`, `InList`, function calls, etc.
_ => None,
}
}
fn match_ts_column_literal<'a>(
ts_col_name: &str,
left: &'a Expr,
right: &'a Expr,
) -> Option<(Timestamp, bool)> {
let (col, scalar, reverse) = match (left, right) {
(Expr::Column(c), Expr::Literal(s, _)) => (c, s, false),
(Expr::Literal(s, _), Expr::Column(c)) => (c, s, true),
_ => return None,
};
if col.name != ts_col_name {
return None;
}
// Reject string literals: their conversion needs a timezone we do not have,
// and the existing extractor in `table::predicate` rejects them too.
if is_string_timestamp_literal(scalar) {
return None;
}
scalar_value_to_timestamp(scalar, None).map(|t| (t, reverse))
}
fn implied_from_cmp(
ts_col_name: &str,
ts_col_unit: TimeUnit,
left: &Expr,
op: Operator,
right: &Expr,
) -> Option<TimestampRange> {
let (ts, reverse) = match_ts_column_literal(ts_col_name, left, right)?;
// Normalize to "column OP literal".
let op = if reverse {
match op {
Operator::Lt => Operator::Gt,
Operator::LtEq => Operator::GtEq,
Operator::Gt => Operator::Lt,
Operator::GtEq => Operator::LtEq,
Operator::Eq => Operator::Eq,
_ => return None,
}
} else {
op
};
match op {
Operator::GtEq => {
// ts >= L. Round the lower bound up in the column unit.
let b = ts.convert_to_ceil(ts_col_unit)?;
Some(TimestampRange::from_start(b))
}
Operator::Gt => {
// ts > L. floor(L) + 1 is the tight lower bound in the column unit.
let v = ts.convert_to(ts_col_unit)?.value().checked_add(1)?;
Some(TimestampRange::from_start(Timestamp::new(v, ts_col_unit)))
}
Operator::LtEq => {
// ts <= L. Round the upper bound down in the column unit.
let b = ts.convert_to(ts_col_unit)?;
Some(TimestampRange::until_end(b, true))
}
Operator::Lt => {
// ts < L. `ts < ceil(L)` is the tight bound: equal to `ts < L` when
// L is exactly representable, and `ts <= floor(L)` otherwise.
let b = ts.convert_to_ceil(ts_col_unit)?;
Some(TimestampRange::until_end(b, false))
}
Operator::Eq => {
// ts = L. Only provable when L is exactly representable.
let f = ts.convert_to(ts_col_unit)?;
let c = ts.convert_to_ceil(ts_col_unit)?;
if f.value() != c.value() {
return None;
}
Some(TimestampRange::single(f))
}
_ => None,
}
}
fn implied_from_between(
ts_col_name: &str,
ts_col_unit: TimeUnit,
expr: &Expr,
low: &Expr,
high: &Expr,
) -> Option<TimestampRange> {
let Expr::Column(c) = expr else {
return None;
};
if c.name != ts_col_name {
return None;
}
let (low_s, high_s) = match (low, high) {
(Expr::Literal(l, _), Expr::Literal(h, _)) => (l, h),
_ => return None,
};
if is_string_timestamp_literal(low_s) || is_string_timestamp_literal(high_s) {
return None;
}
let low_ts = scalar_value_to_timestamp(low_s, None)?;
let high_ts = scalar_value_to_timestamp(high_s, None)?;
// BETWEEN low AND high is equivalent to ts >= low AND ts <= high.
let lo = low_ts.convert_to_ceil(ts_col_unit)?;
let hi = high_ts.convert_to(ts_col_unit)?;
Some(TimestampRange::new_inclusive(Some(lo), Some(hi)))
}
/// Builds a cache key for the given partition range if it is eligible for caching.
pub(crate) fn build_range_cache_key(
stream_ctx: &StreamContext,
@@ -292,17 +469,36 @@ pub(crate) fn build_range_cache_key(
return None;
}
// TODO(yingwen): We used to call `fingerprint.without_time_filters()` when the query's
// `TimestampRange` fully covered the partition's `FileTimeRange`, so different queries that
// all enclosed the same partition could share a cache entry. The cover check turned out to
// be too coarse: it returned true in cases where the dropped time predicates would still
// have excluded rows, so the cache served results that should have been filtered. Reviving
// the optimization needs a per-predicate implication check that walks each time-only `Expr`
// (recursing through AND/OR/NOT) and proves the predicate is satisfied for every timestamp
// inside the partition's `FileTimeRange` — not the looser "does `extract_time_range_from_expr`
// return a range that covers the partition" used previously. Until then, always carry the
// full fingerprint so cache reuse stays correct.
let scan = fingerprint.clone();
// If the implied range covers this partition's `FileTimeRange`, drop
// time-only predicates from the cache key so that queries with different
// but equally-covering time bounds share an entry. `None` means some
// time-only predicate had an unsupported shape (e.g. `OR`), so we keep
// them in the key.
let range_meta = &stream_ctx.ranges[part_range.identifier];
let (file_min, file_max) = range_meta.time_range;
let covers = match &stream_ctx.scan_implied_time_range {
// An empty implied range can never cover a non-empty file range, so
// short-circuit. We also skip the unit asserts because
// `TimestampRange::empty()` uses `Timestamp::default()` (millisecond),
// which would falsely trip the asserts for non-ms time index columns.
Some(implied) if !implied.is_empty() => {
// The `contains` check is sound only when `file_min`/`file_max`
// share the implied range's unit (the time index column's unit).
// Mito stores time index values in that unit; assert to catch any
// future drift.
if let Some(ts) = implied.start().as_ref().or(implied.end().as_ref()) {
assert_eq!(file_min.unit(), ts.unit());
assert_eq!(file_max.unit(), ts.unit());
}
implied.contains(&file_min) && implied.contains(&file_max)
}
_ => false,
};
let scan = if covers {
fingerprint.without_time_filters()
} else {
fingerprint.clone()
};
Some(RangeScanCacheKey {
region_id: stream_ctx.input.region_metadata().region_id,
@@ -722,11 +918,16 @@ mod tests {
num_rows: 10,
};
let partition_range = range_meta.new_partition_range(0);
let scan_fingerprint = crate::read::scan_region::build_scan_fingerprint(&input);
let (scan_fingerprint, scan_implied_time_range) =
match crate::read::scan_region::build_scan_fingerprint(&input) {
Some(b) => (Some(b.fingerprint), b.implied_time_range),
None => (None, None),
};
let stream_ctx = StreamContext {
input,
ranges: vec![range_meta],
scan_fingerprint,
scan_implied_time_range,
query_start: Instant::now(),
};
@@ -770,57 +971,312 @@ mod tests {
}
#[tokio::test]
async fn preserves_time_filters_when_query_covers_partition_range() {
assert_range_cache_filters(
vec![
col("ts").gt_eq(ts_lit(1000)),
col("ts").lt(ts_lit(2001)),
col("ts").is_not_null(),
col("k0").eq(lit("foo")),
],
TimestampRange::with_unit(1000, 2002, TimeUnit::Millisecond),
(
Timestamp::new_millisecond(1000),
Timestamp::new_millisecond(2000),
),
vec![col("k0").eq(lit("foo")), col("ts").is_not_null()],
vec![col("ts").gt_eq(ts_lit(1000)), col("ts").lt(ts_lit(2001))],
)
.await;
async fn range_cache_time_filter_key_cases() {
let partition = (
Timestamp::new_millisecond(1000),
Timestamp::new_millisecond(2000),
);
struct Case {
filters: Vec<Expr>,
query_time_range: Option<TimestampRange>,
expected_filters: Vec<Expr>,
expected_time_filters: Vec<Expr>,
}
// Time filters are stripped only when their implied range fully covers
// the partition's file-time range. `is_not_null(ts)` stays in regular
// filters because it is not routed into `time_filters`.
for case in [
Case {
filters: vec![
col("ts").gt_eq(ts_lit(1000)),
col("ts").lt(ts_lit(2001)),
col("ts").is_not_null(),
col("k0").eq(lit("foo")),
],
query_time_range: TimestampRange::with_unit(1000, 2002, TimeUnit::Millisecond),
expected_filters: vec![col("k0").eq(lit("foo")), col("ts").is_not_null()],
expected_time_filters: vec![],
},
Case {
filters: vec![
col("ts").gt_eq(ts_lit(500)),
col("ts").lt(ts_lit(3000)),
col("k0").eq(lit("foo")),
],
query_time_range: TimestampRange::with_unit(500, 3000, TimeUnit::Millisecond),
expected_filters: vec![col("k0").eq(lit("foo"))],
expected_time_filters: vec![],
},
Case {
filters: vec![
col("ts").gt_eq(ts_lit(1000)),
col("ts").lt_eq(ts_lit(2000)),
col("k0").eq(lit("foo")),
],
query_time_range: TimestampRange::with_unit(1000, 2001, TimeUnit::Millisecond),
expected_filters: vec![col("k0").eq(lit("foo"))],
expected_time_filters: vec![],
},
Case {
filters: vec![
col("ts").between(ts_lit(1000), ts_lit(2000)),
col("k0").eq(lit("foo")),
],
query_time_range: TimestampRange::with_unit(1000, 2001, TimeUnit::Millisecond),
expected_filters: vec![col("k0").eq(lit("foo"))],
expected_time_filters: vec![],
},
Case {
filters: vec![col("ts").gt_eq(ts_lit(1200)), col("k0").eq(lit("foo"))],
query_time_range: TimestampRange::with_unit(1200, 2001, TimeUnit::Millisecond),
expected_filters: vec![col("k0").eq(lit("foo"))],
expected_time_filters: vec![col("ts").gt_eq(ts_lit(1200))],
},
Case {
filters: vec![
col("ts").gt_eq(ts_lit(1500)),
col("ts").is_not_null(),
col("k0").eq(lit("foo")),
],
query_time_range: None,
expected_filters: vec![col("k0").eq(lit("foo")), col("ts").is_not_null()],
expected_time_filters: vec![col("ts").gt_eq(ts_lit(1500))],
},
] {
assert_range_cache_filters(
case.filters,
case.query_time_range,
partition,
case.expected_filters,
case.expected_time_filters,
)
.await;
}
}
#[tokio::test]
async fn preserves_time_filters_when_query_does_not_cover_partition_range() {
assert_range_cache_filters(
vec![col("ts").gt_eq(ts_lit(1000)), col("k0").eq(lit("foo"))],
TimestampRange::with_unit(1000, 1500, TimeUnit::Millisecond),
(
Timestamp::new_millisecond(1000),
Timestamp::new_millisecond(2000),
),
vec![col("k0").eq(lit("foo"))],
vec![col("ts").gt_eq(ts_lit(1000))],
async fn two_distinct_queries_share_cache_key_when_both_cover() {
let partition_range = (
Timestamp::new_millisecond(1000),
Timestamp::new_millisecond(2000),
);
let (ctx_a, part_a) = new_stream_context(
vec![
col("ts").gt_eq(ts_lit(500)),
col("ts").lt(ts_lit(3000)),
col("k0").eq(lit("foo")),
],
TimestampRange::with_unit(500, 3000, TimeUnit::Millisecond),
partition_range,
)
.await;
let (ctx_b, part_b) = new_stream_context(
vec![
col("ts").gt_eq(ts_lit(100)),
col("ts").lt(ts_lit(5000)),
col("k0").eq(lit("foo")),
],
TimestampRange::with_unit(100, 5000, TimeUnit::Millisecond),
partition_range,
)
.await;
let key_a = build_range_cache_key(&ctx_a, &part_a).unwrap();
let key_b = build_range_cache_key(&ctx_b, &part_b).unwrap();
assert_eq!(key_a.scan, key_b.scan);
assert!(key_a.scan.time_filters().is_empty());
}
#[tokio::test]
async fn preserves_time_filters_when_query_has_no_time_range_limit() {
assert_range_cache_filters(
vec![
col("ts").gt_eq(ts_lit(1000)),
col("ts").is_not_null(),
col("k0").eq(lit("foo")),
],
async fn disables_optimization_on_or_clause() {
let partition_range = (
Timestamp::new_millisecond(1000),
Timestamp::new_millisecond(2000),
);
let or_a = col("ts").gt_eq(ts_lit(1000)).or(col("ts").lt(ts_lit(500)));
let or_b = col("ts").gt_eq(ts_lit(900)).or(col("ts").lt(ts_lit(400)));
let (ctx_a, part_a) = new_stream_context(
vec![or_a.clone(), col("k0").eq(lit("foo"))],
None,
(
Timestamp::new_millisecond(1000),
Timestamp::new_millisecond(2000),
),
vec![col("k0").eq(lit("foo")), col("ts").is_not_null()],
vec![col("ts").gt_eq(ts_lit(1000))],
partition_range,
)
.await;
let (ctx_b, part_b) = new_stream_context(
vec![or_b.clone(), col("k0").eq(lit("foo"))],
None,
partition_range,
)
.await;
assert!(ctx_a.scan_implied_time_range.is_none());
let key_a = build_range_cache_key(&ctx_a, &part_a).unwrap();
let key_b = build_range_cache_key(&ctx_b, &part_b).unwrap();
assert_ne!(key_a.scan, key_b.scan);
assert_eq!(
key_a.scan.time_filters(),
normalized_exprs([or_a]).as_slice()
);
}
#[tokio::test]
async fn empty_implied_range_does_not_panic_on_non_ms_file_range() {
// Contradictory time predicates make the implied range empty. The
// empty range's sentinel timestamps use `Timestamp::default()` (ms),
// so without the `is_empty()` short-circuit the unit asserts would
// panic against a non-ms `range_meta.time_range`.
let partition = (
Timestamp::new_millisecond(1000),
Timestamp::new_millisecond(2000),
);
let (mut ctx, part_range) = new_stream_context(
vec![col("ts").gt_eq(ts_lit(1500)), col("k0").eq(lit("foo"))],
TimestampRange::with_unit(1500, 3000, TimeUnit::Millisecond),
partition,
)
.await;
ctx.scan_implied_time_range = Some(TimestampRange::empty());
ctx.ranges[0].time_range = (
Timestamp::new(1_000_000_000, TimeUnit::Nanosecond),
Timestamp::new(2_000_000_000, TimeUnit::Nanosecond),
);
let key = build_range_cache_key(&ctx, &part_range).unwrap();
// Empty implied range cannot cover, so time filters stay in the key.
assert!(!key.scan.time_filters().is_empty());
}
fn ms_ts(v: i64) -> Timestamp {
Timestamp::new_millisecond(v)
}
fn implied_ms(expr: Expr) -> Option<TimestampRange> {
implied_time_range_from_exprs("ts", TimeUnit::Millisecond, &[&expr])
}
#[test]
fn implied_time_range_supported_exprs() {
for (expr, expected) in [
(
col("ts").gt_eq(ts_lit(1000)),
Some(TimestampRange::from_start(ms_ts(1000))),
),
(
col("ts").gt(ts_lit(1000)),
Some(TimestampRange::from_start(ms_ts(1001))),
),
(
col("ts").lt_eq(ts_lit(2000)),
Some(TimestampRange::until_end(ms_ts(2000), true)),
),
(
col("ts").lt(ts_lit(2000)),
Some(TimestampRange::until_end(ms_ts(2000), false)),
),
(
col("ts").eq(ts_lit(1500)),
Some(TimestampRange::single(ms_ts(1500))),
),
(
ts_lit(1000).lt_eq(col("ts")),
Some(TimestampRange::from_start(ms_ts(1000))),
),
(
col("ts").between(ts_lit(1000), ts_lit(2000)),
Some(TimestampRange::new_inclusive(
Some(ms_ts(1000)),
Some(ms_ts(2000)),
)),
),
(
col("ts")
.gt_eq(ts_lit(1000))
.and(col("ts").lt(ts_lit(2000))),
TimestampRange::with_unit(1000, 2000, TimeUnit::Millisecond),
),
(
col("ts")
.gt_eq(ts_lit(1000))
.and(col("ts").lt(ts_lit(5000)))
.and(col("ts").lt_eq(ts_lit(3000))),
TimestampRange::with_unit(1000, 3001, TimeUnit::Millisecond),
),
] {
assert_eq!(implied_ms(expr), expected);
}
assert_eq!(
implied_time_range_from_exprs("ts", TimeUnit::Millisecond, &[]),
Some(TimestampRange::min_to_max())
);
}
#[test]
fn implied_time_range_unsupported_exprs() {
let not_between = Expr::Between(Between {
expr: Box::new(col("ts")),
negated: true,
low: Box::new(ts_lit(1000)),
high: Box::new(ts_lit(2000)),
});
for expr in [
not_between,
col("ts").gt_eq(ts_lit(1000)).or(col("ts").lt(ts_lit(500))),
Expr::Not(Box::new(col("ts").gt_eq(ts_lit(1000)))),
col("ts").in_list(vec![ts_lit(1000), ts_lit(2000)], false),
col("ts").gt_eq(col("other")),
col("other_ts").gt_eq(ts_lit(1000)),
] {
assert!(implied_ms(expr).is_none());
}
}
#[test]
fn implied_time_range_unit_conversion() {
let second_1 = lit(ScalarValue::TimestampSecond(Some(1), None));
let ns_1500 = lit(ScalarValue::TimestampNanosecond(Some(1_500_000_000), None));
let ns_1500_5 = lit(ScalarValue::TimestampNanosecond(Some(1_500_500_000), None));
for (expr, expected) in [
(
col("ts").gt_eq(second_1.clone()),
Some(TimestampRange::from_start(ms_ts(1000))),
),
(
col("ts").lt_eq(second_1),
Some(TimestampRange::until_end(ms_ts(1000), true)),
),
(
col("ts").eq(ns_1500),
Some(TimestampRange::single(ms_ts(1500))),
),
(col("ts").eq(ns_1500_5.clone()), None),
(
col("ts").gt_eq(ns_1500_5.clone()),
Some(TimestampRange::from_start(ms_ts(1501))),
),
(
col("ts").lt_eq(ns_1500_5.clone()),
Some(TimestampRange::until_end(ms_ts(1500), true)),
),
(
col("ts").gt(ns_1500_5.clone()),
Some(TimestampRange::from_start(ms_ts(1501))),
),
(
col("ts").lt(ns_1500_5),
Some(TimestampRange::until_end(ms_ts(1501), false)),
),
] {
assert_eq!(implied_ms(expr), expected);
}
}
#[test]

View File

@@ -57,7 +57,7 @@ use crate::metrics::READ_SST_COUNT;
use crate::read::compat::{self, FlatCompatBatch};
use crate::read::flat_projection::FlatProjectionMapper;
use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
use crate::read::range_cache::ScanRequestFingerprint;
use crate::read::range_cache::{ScanRequestFingerprint, implied_time_range_from_exprs};
use crate::read::read_columns::{
ReadColumns, merge, read_columns_from_predicate, read_columns_from_projection,
};
@@ -1299,9 +1299,21 @@ fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode {
}
}
/// Builds a [ScanRequestFingerprint] from a [ScanInput] if the scan is eligible
/// Output of [build_scan_fingerprint]: the cache fingerprint plus the derived
/// implied time range used to decide whether the cache key can drop the time
/// predicates for a given partition (see `build_range_cache_key`).
pub(crate) struct ScanFingerprintBundle {
pub(crate) fingerprint: ScanRequestFingerprint,
/// `Some(r)` = all time-only predicates are guaranteed true on `r` (in the
/// column's `TimeUnit`).
/// `None` = at least one time-only predicate could not be proven (e.g.
/// `OR`), so the cache-key optimization is disabled for this scan.
pub(crate) implied_time_range: Option<TimestampRange>,
}
/// Builds a [ScanFingerprintBundle] from a [ScanInput] if the scan is eligible
/// for partition range caching.
pub(crate) fn build_scan_fingerprint(input: &ScanInput) -> Option<ScanRequestFingerprint> {
pub(crate) fn build_scan_fingerprint(input: &ScanInput) -> Option<ScanFingerprintBundle> {
let eligible = !input.compaction
&& !input.files.is_empty()
&& matches!(input.cache_strategy, CacheStrategy::EnableAll(_));
@@ -1334,7 +1346,7 @@ pub(crate) fn build_scan_fingerprint(input: &ScanInput) -> Option<ScanRequestFin
.unwrap_or_default();
let mut filters = Vec::new();
let mut time_filters = Vec::new();
let mut time_only_exprs: Vec<&Expr> = Vec::new();
let mut has_tag_filter = false;
let mut columns = HashSet::new();
@@ -1350,20 +1362,17 @@ pub(crate) fn build_scan_fingerprint(input: &ScanInput) -> Option<ScanRequestFin
_ => false,
};
// TODO(yingwen): The split between `time_filters` and `filters` is currently inert
// because `build_range_cache_key()` always keeps both in the cache key. We used to
// strip `time_filters` when the query's `TimestampRange` covered the partition's
// `FileTimeRange`, but `extract_time_range_from_expr` is not precise enough to prove
// a time predicate is implied by that range (it can return a wider range than the
// predicate, and it does not analyze AND/OR shapes), which let the cache reuse rows
// that should have been filtered. Reviving the optimization needs a per-predicate
// implication check that walks each time-only `Expr` (recursing through AND/OR/NOT)
// and proves the predicate holds for every timestamp inside the partition's
// `FileTimeRange`; until then both buckets land in the fingerprint.
// Route time-only exprs that the legacy extractor recognizes into
// `time_only_exprs` so the implication walker
// (`implied_time_range_from_exprs`, called below) can attempt to drop
// them from the cache key when the partition's `FileTimeRange` is fully
// covered, then stringify them into the fingerprint's `time_filters`
// bucket. Time-only exprs that the extractor doesn't recognize stay in
// `filters` and never get stripped — conservatively correct.
if is_time_only
&& extract_time_range_from_expr(&time_index_name, ts_col_unit, expr).is_some()
{
time_filters.push(expr.to_string());
time_only_exprs.push(expr);
} else {
filters.push(expr.to_string());
}
@@ -1374,31 +1383,38 @@ pub(crate) fn build_scan_fingerprint(input: &ScanInput) -> Option<ScanRequestFin
return None;
}
let implied_time_range =
implied_time_range_from_exprs(&time_index_name, ts_col_unit, &time_only_exprs);
let mut time_filters: Vec<String> = time_only_exprs.iter().map(|e| e.to_string()).collect();
// Ensure the filters are sorted for consistent fingerprinting.
filters.sort_unstable();
time_filters.sort_unstable();
let read_columns = input.read_cols.clone();
Some(
crate::read::range_cache::ScanRequestFingerprintBuilder {
read_column_types: read_columns
.column_ids_iter()
.map(|id| {
metadata
.column_by_id(id)
.map(|col| col.column_schema.data_type.clone())
})
.collect(),
read_columns,
filters,
time_filters,
series_row_selector: input.series_row_selector,
append_mode: input.append_mode,
filter_deleted: input.filter_deleted,
merge_mode: input.merge_mode,
partition_expr_version: metadata.partition_expr_version,
}
.build(),
)
let fingerprint = crate::read::range_cache::ScanRequestFingerprintBuilder {
read_column_types: read_columns
.column_ids_iter()
.map(|id| {
metadata
.column_by_id(id)
.map(|col| col.column_schema.data_type.clone())
})
.collect(),
read_columns,
filters,
time_filters,
series_row_selector: input.series_row_selector,
append_mode: input.append_mode,
filter_deleted: input.filter_deleted,
merge_mode: input.merge_mode,
partition_expr_version: metadata.partition_expr_version,
}
.build();
Some(ScanFingerprintBundle {
fingerprint,
implied_time_range,
})
}
/// Context shared by different streams from a scanner.
@@ -1412,6 +1428,13 @@ pub struct StreamContext {
/// `None` when the scan is not eligible for caching.
#[allow(dead_code)]
pub(crate) scan_fingerprint: Option<ScanRequestFingerprint>,
/// Implied range of every time-only predicate, in the time index column's
/// `TimeUnit`. Used by `build_range_cache_key` to decide whether the
/// partition's `FileTimeRange` is fully covered (allowing `time_filters`
/// to be stripped from the cache key). `None` when caching is ineligible
/// or when the implication walker bailed on an unsupported shape (e.g.
/// `OR`).
pub(crate) scan_implied_time_range: Option<TimestampRange>,
// Metrics:
/// The start time of the query.
@@ -1424,12 +1447,16 @@ impl StreamContext {
let query_start = input.query_start.unwrap_or_else(Instant::now);
let ranges = RangeMeta::seq_scan_ranges(&input);
READ_SST_COUNT.observe(input.num_files() as f64);
let scan_fingerprint = build_scan_fingerprint(&input);
let (scan_fingerprint, scan_implied_time_range) = match build_scan_fingerprint(&input) {
Some(b) => (Some(b.fingerprint), b.implied_time_range),
None => (None, None),
};
Self {
input,
ranges,
scan_fingerprint,
scan_implied_time_range,
query_start,
}
}
@@ -1439,12 +1466,16 @@ impl StreamContext {
let query_start = input.query_start.unwrap_or_else(Instant::now);
let ranges = RangeMeta::unordered_scan_ranges(&input);
READ_SST_COUNT.observe(input.num_files() as f64);
let scan_fingerprint = build_scan_fingerprint(&input);
let (scan_fingerprint, scan_implied_time_range) = match build_scan_fingerprint(&input) {
Some(b) => (Some(b.fingerprint), b.implied_time_range),
None => (None, None),
};
Self {
input,
ranges,
scan_fingerprint,
scan_implied_time_range,
query_start,
}
}
@@ -1841,7 +1872,7 @@ mod tests {
partition_expr_version: 0,
}
.build();
assert_eq!(expected, fingerprint);
assert_eq!(expected, fingerprint.fingerprint);
}
#[tokio::test]
@@ -1914,7 +1945,7 @@ mod tests {
partition_expr_version: metadata.partition_expr_version,
}
.build();
assert_eq!(expected, fingerprint);
assert_eq!(expected, fingerprint.fingerprint);
assert_ne!(0, metadata.partition_expr_version);
}

View File

@@ -1375,6 +1375,7 @@ mod split_tests {
input,
ranges: vec![],
scan_fingerprint: None,
scan_implied_time_range: None,
query_start: std::time::Instant::now(),
}
}
@@ -1755,6 +1756,7 @@ mod tests {
input,
ranges: Vec::new(),
scan_fingerprint: None,
scan_implied_time_range: None,
query_start: Instant::now(),
})
}

View File

@@ -41,7 +41,7 @@ mod stats;
/// In theory, it should be converted to a timestamp scalar value by `TypeConversionRule`.
macro_rules! return_none_if_utf8 {
($lit: ident) => {
if matches!($lit, ScalarValue::Utf8(_)) {
if is_string_timestamp_literal($lit) {
warn!(
"Unexpected ScalarValue::Utf8 in time range predicate: {:?}. Maybe it's an implicit bug, please report it to https://github.com/GreptimeTeam/greptimedb/issues",
$lit
@@ -53,6 +53,13 @@ macro_rules! return_none_if_utf8 {
};
}
pub fn is_string_timestamp_literal(scalar: &ScalarValue) -> bool {
matches!(
scalar,
ScalarValue::Utf8(_) | ScalarValue::LargeUtf8(_) | ScalarValue::Utf8View(_)
)
}
/// Reference-counted pointer to a list of logical exprs and a list of dynamic filter physical exprs.
#[derive(Debug, Clone, Default)]
pub struct Predicate {