fix(mito2): drop unsound time-filter cache-key stripping (#8105)

* fix(mito2): drop unsound time-filter cache-key stripping

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: update comments and test

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
Yingwen
2026-05-13 18:43:22 +08:00
committed by GitHub
parent d6638374e9
commit 5e468190a5
3 changed files with 124 additions and 32 deletions

View File

@@ -15,10 +15,13 @@
use std::collections::BTreeMap;
use api::v1::Rows;
use common_base::readable_size::ReadableSize;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_recordbatch::RecordBatches;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_common::ScalarValue;
use datafusion_expr::{col, lit};
use datatypes::arrow::array::AsArray;
use datatypes::arrow::datatypes::{Float64Type, TimestampMillisecondType};
use futures::TryStreamExt;
@@ -795,3 +798,97 @@ async fn test_series_scan_flat_small_permits() {
);
}
}
// Regression test: `ts = a OR ts = b` extracts to a `TimestampRange` that
// `GenericRange::or` widens into `[min(a, b), max(a, b) + 1)`. Two such
// predicates with different `a` values can both extract to ranges that cover
// the same partition while selecting different (or no) rows. The previous
// cover check would strip both predicates from the cache key, letting the
// second scan return the first scan's cached row.
#[tokio::test]
async fn test_range_cache_separates_or_equality_time_filters() {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_flat_format: true,
// Explicitly enable the range result cache: the bug only reproduces
// when the second scan can replay the first scan's cached batches.
range_result_cache_size: ReadableSize::mb(64),
..Default::default()
})
.await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let column_schemas = test_util::rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Partition rows ts=5..10 (5000ms..9000ms), flushed to SST.
test_util::put_rows(
&engine,
region_id,
Rows {
schema: column_schemas.clone(),
rows: test_util::build_rows(5, 10),
},
)
.await;
test_util::flush_region(&engine, region_id, None).await;
let ts_lit = |ms: i64| lit(ScalarValue::TimestampMillisecond(Some(ms), None));
let tag_filter = || col("tag_0").gt_eq(lit(ScalarValue::Utf8(Some("0".to_string()))));
// First scan: (ts = 5000) OR (ts = 100000) -- extracts to `[5000, 100001)`,
// which covers the partition `[5000, 9000]`. Selects ts=5.
let stream = engine
.scan_to_stream(
region_id,
ScanRequest {
filters: vec![
tag_filter(),
col("ts").eq(ts_lit(5000)).or(col("ts").eq(ts_lit(100000))),
],
..Default::default()
},
)
.await
.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected_first = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 5 | 5.0 | 1970-01-01T00:00:05 |
+-------+---------+---------------------+";
assert_eq!(expected_first, batches.pretty_print().unwrap());
// Second scan: (ts = 3000) OR (ts = 100000) -- extracts to `[3000, 100001)`,
// which also covers the partition. Selects nothing. With the buggy cover
// check both scans built the same cache key (tag filter only), so this scan
// would replay the first scan's cached row and incorrectly return ts=5.
let stream = engine
.scan_to_stream(
region_id,
ScanRequest {
filters: vec![
tag_filter(),
col("ts").eq(ts_lit(3000)).or(col("ts").eq(ts_lit(100000))),
],
..Default::default()
},
)
.await
.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let row_count: usize = batches.iter().map(|b| b.num_rows()).sum();
assert_eq!(
0,
row_count,
"expected empty result, got: {}",
batches.pretty_print().unwrap()
);
}

View File

@@ -19,7 +19,6 @@ use std::sync::Arc;
use async_stream::try_stream;
use common_telemetry::warn;
use common_time::range::TimestampRange;
use datatypes::arrow::compute::concat_batches;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::ConcreteDataType;
@@ -36,7 +35,6 @@ use crate::read::read_columns::ReadColumns;
use crate::read::scan_region::StreamContext;
use crate::read::scan_util::PartitionMetrics;
use crate::region::options::MergeMode;
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
const RANGE_CACHE_COMPACT_THRESHOLD_BYTES: usize = 8 * 1024 * 1024;
@@ -141,6 +139,7 @@ impl ScanRequestFingerprint {
.unwrap_or(&[])
}
#[allow(dead_code)]
pub(crate) fn without_time_filters(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
@@ -293,15 +292,17 @@ pub(crate) fn build_range_cache_key(
return None;
}
let range_meta = &stream_ctx.ranges[part_range.identifier];
let scan = if query_time_range_covers_partition_range(
stream_ctx.input.time_range.as_ref(),
range_meta.time_range,
) {
fingerprint.without_time_filters()
} else {
fingerprint.clone()
};
// TODO(yingwen): We used to call `fingerprint.without_time_filters()` when the query's
// `TimestampRange` fully covered the partition's `FileTimeRange`, so different queries that
// all enclosed the same partition could share a cache entry. The cover check turned out to
// be too coarse: it returned true in cases where the dropped time predicates would still
// have excluded rows, so the cache served results that should have been filtered. Reviving
// the optimization needs a per-predicate implication check that walks each time-only `Expr`
// (recursing through AND/OR/NOT) and proves the predicate is satisfied for every timestamp
// inside the partition's `FileTimeRange` — not the looser "does `extract_time_range_from_expr`
// return a range that covers the partition" used previously. Until then, always carry the
// full fingerprint so cache reuse stays correct.
let scan = fingerprint.clone();
Some(RangeScanCacheKey {
region_id: stream_ctx.input.region_metadata().region_id,
@@ -310,18 +311,6 @@ pub(crate) fn build_range_cache_key(
})
}
fn query_time_range_covers_partition_range(
query_time_range: Option<&TimestampRange>,
partition_time_range: FileTimeRange,
) -> bool {
let Some(query_time_range) = query_time_range else {
return true;
};
let (part_start, part_end) = partition_time_range;
query_time_range.contains(&part_start) && query_time_range.contains(&part_end)
}
/// Returns a stream that replays cached record batches.
pub(crate) fn cached_flat_range_stream(value: Arc<RangeScanCacheValue>) -> BoxedRecordBatchStream {
Box::pin(try_stream! {
@@ -636,6 +625,7 @@ mod tests {
use crate::read::flat_projection::FlatProjectionMapper;
use crate::read::range::{RangeMeta, RowGroupIndex, SourceIndex};
use crate::read::scan_region::{PredicateGroup, ScanInput};
use crate::sst::file::FileTimeRange;
use crate::test_util::memtable_util::metadata_with_primary_key;
use crate::test_util::scheduler_util::SchedulerEnv;
use crate::test_util::sst_util::sst_file_handle_with_file_id;
@@ -780,7 +770,7 @@ mod tests {
}
#[tokio::test]
async fn strips_time_only_filters_when_query_covers_partition_range() {
async fn preserves_time_filters_when_query_covers_partition_range() {
assert_range_cache_filters(
vec![
col("ts").gt_eq(ts_lit(1000)),
@@ -794,7 +784,7 @@ mod tests {
Timestamp::new_millisecond(2000),
),
vec![col("k0").eq(lit("foo")), col("ts").is_not_null()],
vec![],
vec![col("ts").gt_eq(ts_lit(1000)), col("ts").lt(ts_lit(2001))],
)
.await;
}
@@ -815,7 +805,7 @@ mod tests {
}
#[tokio::test]
async fn strips_time_only_filters_when_query_has_no_time_range_limit() {
async fn preserves_time_filters_when_query_has_no_time_range_limit() {
assert_range_cache_filters(
vec![
col("ts").gt_eq(ts_lit(1000)),
@@ -828,7 +818,7 @@ mod tests {
Timestamp::new_millisecond(2000),
),
vec![col("k0").eq(lit("foo")), col("ts").is_not_null()],
vec![],
vec![col("ts").gt_eq(ts_lit(1000))],
)
.await;
}

View File

@@ -1367,16 +1367,21 @@ pub(crate) fn build_scan_fingerprint(input: &ScanInput) -> Option<ScanRequestFin
_ => false,
};
// TODO(yingwen): The split between `time_filters` and `filters` is currently inert
// because `build_range_cache_key()` always keeps both in the cache key. We used to
// strip `time_filters` when the query's `TimestampRange` covered the partition's
// `FileTimeRange`, but `extract_time_range_from_expr` is not precise enough to prove
// a time predicate is implied by that range (it can return a wider range than the
// predicate, and it does not analyze AND/OR shapes), which let the cache reuse rows
// that should have been filtered. Reviving the optimization needs a per-predicate
// implication check that walks each time-only `Expr` (recursing through AND/OR/NOT)
// and proves the predicate holds for every timestamp inside the partition's
// `FileTimeRange`; until then both buckets land in the fingerprint.
if is_time_only
&& extract_time_range_from_expr(&time_index_name, ts_col_unit, expr).is_some()
{
// Range-reducible time predicates can be safely dropped from the
// cache key when the query time range covers the partition range.
time_filters.push(expr.to_string());
} else {
// Non-time filters and non-range time predicates (those that
// extract_time_range_from_expr cannot convert to a TimestampRange)
// always stay in the cache key.
filters.push(expr.to_string());
}
}