diff --git a/src/mito2/src/engine/scan_test.rs b/src/mito2/src/engine/scan_test.rs index d4d48b9fe6..75fbc848ea 100644 --- a/src/mito2/src/engine/scan_test.rs +++ b/src/mito2/src/engine/scan_test.rs @@ -15,10 +15,13 @@ use std::collections::BTreeMap; use api::v1::Rows; +use common_base::readable_size::ReadableSize; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_recordbatch::RecordBatches; use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; +use datafusion_common::ScalarValue; +use datafusion_expr::{col, lit}; use datatypes::arrow::array::AsArray; use datatypes::arrow::datatypes::{Float64Type, TimestampMillisecondType}; use futures::TryStreamExt; @@ -795,3 +798,97 @@ async fn test_series_scan_flat_small_permits() { ); } } + +// Regression test: `ts = a OR ts = b` extracts to a `TimestampRange` that +// `GenericRange::or` widens into `[min(a, b), max(a, b) + 1)`. Two such +// predicates with different `a` values can both extract to ranges that cover +// the same partition while selecting different (or no) rows. The previous +// cover check would strip both predicates from the cache key, letting the +// second scan return the first scan's cached row. +#[tokio::test] +async fn test_range_cache_separates_or_equality_time_filters() { + let mut env = TestEnv::new().await; + let engine = env + .create_engine(MitoConfig { + default_flat_format: true, + // Explicitly enable the range result cache: the bug only reproduces + // when the second scan can replay the first scan's cached batches. + range_result_cache_size: ReadableSize::mb(64), + ..Default::default() + }) + .await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + let column_schemas = test_util::rows_schema(&request); + + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + // Partition rows ts=5..10 (5000ms..9000ms), flushed to SST. + test_util::put_rows( + &engine, + region_id, + Rows { + schema: column_schemas.clone(), + rows: test_util::build_rows(5, 10), + }, + ) + .await; + test_util::flush_region(&engine, region_id, None).await; + + let ts_lit = |ms: i64| lit(ScalarValue::TimestampMillisecond(Some(ms), None)); + let tag_filter = || col("tag_0").gt_eq(lit(ScalarValue::Utf8(Some("0".to_string())))); + + // First scan: (ts = 5000) OR (ts = 100000) -- extracts to `[5000, 100001)`, + // which covers the partition `[5000, 9000]`. Selects ts=5. + let stream = engine + .scan_to_stream( + region_id, + ScanRequest { + filters: vec![ + tag_filter(), + col("ts").eq(ts_lit(5000)).or(col("ts").eq(ts_lit(100000))), + ], + ..Default::default() + }, + ) + .await + .unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let expected_first = "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| 5 | 5.0 | 1970-01-01T00:00:05 | ++-------+---------+---------------------+"; + assert_eq!(expected_first, batches.pretty_print().unwrap()); + + // Second scan: (ts = 3000) OR (ts = 100000) -- extracts to `[3000, 100001)`, + // which also covers the partition. Selects nothing. With the buggy cover + // check both scans built the same cache key (tag filter only), so this scan + // would replay the first scan's cached row and incorrectly return ts=5. + let stream = engine + .scan_to_stream( + region_id, + ScanRequest { + filters: vec![ + tag_filter(), + col("ts").eq(ts_lit(3000)).or(col("ts").eq(ts_lit(100000))), + ], + ..Default::default() + }, + ) + .await + .unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let row_count: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!( + 0, + row_count, + "expected empty result, got: {}", + batches.pretty_print().unwrap() + ); +} diff --git a/src/mito2/src/read/range_cache.rs b/src/mito2/src/read/range_cache.rs index 208715f57d..7d1010205d 100644 --- a/src/mito2/src/read/range_cache.rs +++ b/src/mito2/src/read/range_cache.rs @@ -19,7 +19,6 @@ use std::sync::Arc; use async_stream::try_stream; use common_telemetry::warn; -use common_time::range::TimestampRange; use datatypes::arrow::compute::concat_batches; use datatypes::arrow::record_batch::RecordBatch; use datatypes::prelude::ConcreteDataType; @@ -36,7 +35,6 @@ use crate::read::read_columns::ReadColumns; use crate::read::scan_region::StreamContext; use crate::read::scan_util::PartitionMetrics; use crate::region::options::MergeMode; -use crate::sst::file::FileTimeRange; use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE; const RANGE_CACHE_COMPACT_THRESHOLD_BYTES: usize = 8 * 1024 * 1024; @@ -141,6 +139,7 @@ impl ScanRequestFingerprint { .unwrap_or(&[]) } + #[allow(dead_code)] pub(crate) fn without_time_filters(&self) -> Self { Self { inner: Arc::clone(&self.inner), @@ -293,15 +292,17 @@ pub(crate) fn build_range_cache_key( return None; } - let range_meta = &stream_ctx.ranges[part_range.identifier]; - let scan = if query_time_range_covers_partition_range( - stream_ctx.input.time_range.as_ref(), - range_meta.time_range, - ) { - fingerprint.without_time_filters() - } else { - fingerprint.clone() - }; + // TODO(yingwen): We used to call `fingerprint.without_time_filters()` when the query's + // `TimestampRange` fully covered the partition's `FileTimeRange`, so different queries that + // all enclosed the same partition could share a cache entry. The cover check turned out to + // be too coarse: it returned true in cases where the dropped time predicates would still + // have excluded rows, so the cache served results that should have been filtered. Reviving + // the optimization needs a per-predicate implication check that walks each time-only `Expr` + // (recursing through AND/OR/NOT) and proves the predicate is satisfied for every timestamp + // inside the partition's `FileTimeRange` — not the looser "does `extract_time_range_from_expr` + // return a range that covers the partition" used previously. Until then, always carry the + // full fingerprint so cache reuse stays correct. + let scan = fingerprint.clone(); Some(RangeScanCacheKey { region_id: stream_ctx.input.region_metadata().region_id, @@ -310,18 +311,6 @@ pub(crate) fn build_range_cache_key( }) } -fn query_time_range_covers_partition_range( - query_time_range: Option<&TimestampRange>, - partition_time_range: FileTimeRange, -) -> bool { - let Some(query_time_range) = query_time_range else { - return true; - }; - - let (part_start, part_end) = partition_time_range; - query_time_range.contains(&part_start) && query_time_range.contains(&part_end) -} - /// Returns a stream that replays cached record batches. pub(crate) fn cached_flat_range_stream(value: Arc) -> BoxedRecordBatchStream { Box::pin(try_stream! { @@ -636,6 +625,7 @@ mod tests { use crate::read::flat_projection::FlatProjectionMapper; use crate::read::range::{RangeMeta, RowGroupIndex, SourceIndex}; use crate::read::scan_region::{PredicateGroup, ScanInput}; + use crate::sst::file::FileTimeRange; use crate::test_util::memtable_util::metadata_with_primary_key; use crate::test_util::scheduler_util::SchedulerEnv; use crate::test_util::sst_util::sst_file_handle_with_file_id; @@ -780,7 +770,7 @@ mod tests { } #[tokio::test] - async fn strips_time_only_filters_when_query_covers_partition_range() { + async fn preserves_time_filters_when_query_covers_partition_range() { assert_range_cache_filters( vec![ col("ts").gt_eq(ts_lit(1000)), @@ -794,7 +784,7 @@ mod tests { Timestamp::new_millisecond(2000), ), vec![col("k0").eq(lit("foo")), col("ts").is_not_null()], - vec![], + vec![col("ts").gt_eq(ts_lit(1000)), col("ts").lt(ts_lit(2001))], ) .await; } @@ -815,7 +805,7 @@ mod tests { } #[tokio::test] - async fn strips_time_only_filters_when_query_has_no_time_range_limit() { + async fn preserves_time_filters_when_query_has_no_time_range_limit() { assert_range_cache_filters( vec![ col("ts").gt_eq(ts_lit(1000)), @@ -828,7 +818,7 @@ mod tests { Timestamp::new_millisecond(2000), ), vec![col("k0").eq(lit("foo")), col("ts").is_not_null()], - vec![], + vec![col("ts").gt_eq(ts_lit(1000))], ) .await; } diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 19360f7208..fb30913534 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -1367,16 +1367,21 @@ pub(crate) fn build_scan_fingerprint(input: &ScanInput) -> Option false, }; + // TODO(yingwen): The split between `time_filters` and `filters` is currently inert + // because `build_range_cache_key()` always keeps both in the cache key. We used to + // strip `time_filters` when the query's `TimestampRange` covered the partition's + // `FileTimeRange`, but `extract_time_range_from_expr` is not precise enough to prove + // a time predicate is implied by that range (it can return a wider range than the + // predicate, and it does not analyze AND/OR shapes), which let the cache reuse rows + // that should have been filtered. Reviving the optimization needs a per-predicate + // implication check that walks each time-only `Expr` (recursing through AND/OR/NOT) + // and proves the predicate holds for every timestamp inside the partition's + // `FileTimeRange`; until then both buckets land in the fingerprint. if is_time_only && extract_time_range_from_expr(&time_index_name, ts_col_unit, expr).is_some() { - // Range-reducible time predicates can be safely dropped from the - // cache key when the query time range covers the partition range. time_filters.push(expr.to_string()); } else { - // Non-time filters and non-range time predicates (those that - // extract_time_range_from_expr cannot convert to a TimestampRange) - // always stay in the cache key. filters.push(expr.to_string()); } }