From ef6dd5b99f0164c7717ced424ee62d84c2210df5 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Wed, 7 Jan 2026 19:15:34 +0800 Subject: [PATCH] fix: precise filter time index if not in projection (#7531) * fix: precise filter time index if not in projection Signed-off-by: evenyag * test: add sqlness test Signed-off-by: evenyag --------- Signed-off-by: evenyag --- src/mito2/src/engine/projection_test.rs | 118 +++++++++++++++++- src/mito2/src/sst/parquet/file_range.rs | 9 +- .../standalone/common/select/prune.result | 65 ++++++++++ .../cases/standalone/common/select/prune.sql | 25 ++++ 4 files changed, 215 insertions(+), 2 deletions(-) diff --git a/src/mito2/src/engine/projection_test.rs b/src/mito2/src/engine/projection_test.rs index 9e9509b6c9..7726005b0b 100644 --- a/src/mito2/src/engine/projection_test.rs +++ b/src/mito2/src/engine/projection_test.rs @@ -15,12 +15,15 @@ use api::v1::value::ValueData; use api::v1::{Row, Rows}; use common_recordbatch::RecordBatches; +use datafusion_common::ScalarValue; +use datafusion_expr::{col, lit}; use store_api::region_engine::RegionEngine; use store_api::region_request::RegionRequest; use store_api::storage::{RegionId, ScanRequest}; use crate::config::MitoConfig; -use crate::test_util::{CreateRequestBuilder, TestEnv, put_rows, rows_schema}; +use crate::test_util::batch_util::sort_batches_and_print; +use crate::test_util::{CreateRequestBuilder, TestEnv, flush_region, put_rows, rows_schema}; /// Build rows for multiple tags and fields. fn build_rows_multi_tags_fields( @@ -51,6 +54,26 @@ fn build_rows_multi_tags_fields( .collect() } +/// Build rows for fields only (no tags). 
+fn build_rows_fields_only(field_starts: &[usize], ts_range: (usize, usize)) -> Vec<Row> { + (ts_range.0..ts_range.1) + .enumerate() + .map(|(idx, ts)| { + let mut values = Vec::with_capacity(field_starts.len() + 1); + for field_start in field_starts { + values.push(api::v1::Value { + value_data: Some(ValueData::F64Value((field_start + idx) as f64)), + }); + } + values.push(api::v1::Value { + value_data: Some(ValueData::TimestampMillisecondValue(ts as i64 * 1000)), + }); + + api::v1::Row { values } + }) + .collect() +} + #[tokio::test] async fn test_scan_projection() { test_scan_projection_with_format(false).await; @@ -100,3 +123,96 @@ async fn test_scan_projection_with_format(flat_format: bool) { +-------+---------+---------------------+"; assert_eq!(expected, batches.pretty_print().unwrap()); } + +#[tokio::test] +async fn test_scan_projection_without_primary_key() { + common_telemetry::init_default_ut_logging(); + + test_scan_projection_without_primary_key_with_format(false).await; + test_scan_projection_without_primary_key_with_format(true).await; +} + +async fn test_scan_projection_without_primary_key_with_format(flat_format: bool) { + common_telemetry::info!( + "Test projection without pk start, flat format: {}", + flat_format + ); + + let mut env = TestEnv::new().await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; + + let region_id = RegionId::new(1, 1); + // [field_0, field_1, field_2, ts] - no primary key + let request = CreateRequestBuilder::new().tag_num(0).field_num(3).build(); + + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + // Insert first batch and flush to SST #1 (ts: 0, 1, 2) + // field_starts: [0, 10, 20] → field_0: 0,1,2, field_1: 10,11,12, field_2: 20,21,22 + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows_fields_only(&[0, 10, 20], (0, 3)), + }; 
+ put_rows(&engine, region_id, rows).await; + flush_region(&engine, region_id, None).await; + + // Insert second batch and flush to SST #2 (ts: 3, 4, 5) + // field_starts: [3, 13, 23] → field_0: 3,4,5, field_1: 13,14,15, field_2: 23,24,25 + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows_fields_only(&[3, 13, 23], (3, 6)), + }; + put_rows(&engine, region_id, rows).await; + flush_region(&engine, region_id, None).await; + + // Insert third batch, keep in memtable (ts: 6, 7, 8) + // field_starts: [6, 16, 26] → field_0: 6,7,8, field_1: 16,17,18, field_2: 26,27,28 + let rows = Rows { + schema: column_schemas, + rows: build_rows_fields_only(&[6, 16, 26], (6, 9)), + }; + put_rows(&engine, region_id, rows).await; + + // Scan with projection on field_0 and field_1, filter ts >= 2s + let request = ScanRequest { + projection: Some(vec![0, 1]), // field_0 and field_1 (not ts) + filters: vec![col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(2000), None)))], + ..Default::default() + }; + let stream = engine.scan_to_stream(region_id, request).await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + // Filter ts >= 2s returns 7 rows: + // - ts=2 from SST#1: field_0=2, field_1=12 + // - ts=3,4,5 from SST#2: field_0=3,4,5, field_1=13,14,15 + // - ts=6,7,8 from memtable: field_0=6,7,8, field_1=16,17,18 + let expected = "\ ++---------+---------+ +| field_0 | field_1 | ++---------+---------+ +| 2.0 | 12.0 | +| 3.0 | 13.0 | +| 4.0 | 14.0 | +| 5.0 | 15.0 | +| 6.0 | 16.0 | +| 7.0 | 17.0 | +| 8.0 | 18.0 | ++---------+---------+"; + assert_eq!( + expected, + sort_batches_and_print(&batches, &["field_0", "field_1"]) + ); + + common_telemetry::info!( + "Test projection without pk success, flat format: {}", + flat_format + ); +} diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs index ae30b8fcb5..1d54eadac6 100644 --- a/src/mito2/src/sst/parquet/file_range.rs +++ 
b/src/mito2/src/sst/parquet/file_range.rs @@ -44,7 +44,9 @@ use crate::read::compat::CompatBatch; use crate::read::last_row::RowGroupLastRowCachedReader; use crate::read::prune::{FlatPruneReader, PruneReader}; use crate::sst::file::FileHandle; -use crate::sst::parquet::flat_format::{DecodedPrimaryKeys, decode_primary_keys}; +use crate::sst::parquet::flat_format::{ + DecodedPrimaryKeys, decode_primary_keys, time_index_column_index, +}; use crate::sst::parquet::format::ReadFormat; use crate::sst::parquet::reader::{ FlatRowGroupReader, MaybeFilter, RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext, @@ -606,6 +608,11 @@ impl RangeBase { .evaluate_array(&tag_column) .context(RecordBatchSnafu)?; mask = mask.bitand(&result); + } else if filter_ctx.semantic_type() == SemanticType::Timestamp { + let time_index_pos = time_index_column_index(input.num_columns()); + let column = &input.columns()[time_index_pos]; + let result = filter.evaluate_array(column).context(RecordBatchSnafu)?; + mask = mask.bitand(&result); } // Non-tag column not found in projection. 
} diff --git a/tests/cases/standalone/common/select/prune.result b/tests/cases/standalone/common/select/prune.result index 88475852f7..d5e2b5c3de 100644 --- a/tests/cases/standalone/common/select/prune.result +++ b/tests/cases/standalone/common/select/prune.result @@ -128,3 +128,68 @@ drop table demo; Affected Rows: 0 +CREATE TABLE test_time_filter( + host STRING, + greptime_timestamp TIMESTAMP, + TIME INDEX(greptime_timestamp) +) +WITH +( + 'append_mode' = 'true', + 'sst_format' = 'flat' +); + +Affected Rows: 0 + +INSERT INTO test_time_filter(host, greptime_timestamp) VALUES ('hello', '2026-01-07T00:00:00'), ('world', '2026-01-07T00:00:01'), ('test', '2026-01-07T00:00:00'), ('go', '2026-01-07T00:00:01'); + +Affected Rows: 4 + +SELECT host FROM test_time_filter WHERE greptime_timestamp > '2026-01-07 00:00:00'; + ++-------+ +| host | ++-------+ +| go | +| world | ++-------+ + +SELECT host, greptime_timestamp FROM test_time_filter WHERE greptime_timestamp > '2026-01-07 00:00:00'; + ++-------+---------------------+ +| host | greptime_timestamp | ++-------+---------------------+ +| go | 2026-01-07T00:00:01 | +| world | 2026-01-07T00:00:01 | ++-------+---------------------+ + +ADMIN flush_table('test_time_filter'); + ++---------------------------------------+ +| ADMIN flush_table('test_time_filter') | ++---------------------------------------+ +| 0 | ++---------------------------------------+ + +SELECT host FROM test_time_filter WHERE greptime_timestamp > '2026-01-07 00:00:00'; + ++-------+ +| host | ++-------+ +| go | +| world | ++-------+ + +SELECT host, greptime_timestamp FROM test_time_filter WHERE greptime_timestamp > '2026-01-07 00:00:00'; + ++-------+---------------------+ +| host | greptime_timestamp | ++-------+---------------------+ +| go | 2026-01-07T00:00:01 | +| world | 2026-01-07T00:00:01 | ++-------+---------------------+ + +DROP TABLE test_time_filter; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/select/prune.sql 
b/tests/cases/standalone/common/select/prune.sql index 4b976cdb1c..226c1cef69 100644 --- a/tests/cases/standalone/common/select/prune.sql +++ b/tests/cases/standalone/common/select/prune.sql @@ -38,3 +38,28 @@ SELECT * FROM demo where host in ('test1'); explain analyze SELECT * FROM demo where host in ('test1'); drop table demo; + +CREATE TABLE test_time_filter( + host STRING, + greptime_timestamp TIMESTAMP, + TIME INDEX(greptime_timestamp) +) +WITH +( + 'append_mode' = 'true', + 'sst_format' = 'flat' +); + +INSERT INTO test_time_filter(host, greptime_timestamp) VALUES ('hello', '2026-01-07T00:00:00'), ('world', '2026-01-07T00:00:01'), ('test', '2026-01-07T00:00:00'), ('go', '2026-01-07T00:00:01'); + +SELECT host FROM test_time_filter WHERE greptime_timestamp > '2026-01-07 00:00:00'; + +SELECT host, greptime_timestamp FROM test_time_filter WHERE greptime_timestamp > '2026-01-07 00:00:00'; + +ADMIN flush_table('test_time_filter'); + +SELECT host FROM test_time_filter WHERE greptime_timestamp > '2026-01-07 00:00:00'; + +SELECT host, greptime_timestamp FROM test_time_filter WHERE greptime_timestamp > '2026-01-07 00:00:00'; + +DROP TABLE test_time_filter;