diff --git a/src/common/recordbatch/src/filter.rs b/src/common/recordbatch/src/filter.rs index d7c522e656..9f1b596a49 100644 --- a/src/common/recordbatch/src/filter.rs +++ b/src/common/recordbatch/src/filter.rs @@ -48,7 +48,7 @@ use crate::error::{ArrowComputeSnafu, Result, ToArrowScalarSnafu, UnsupportedOpe /// /// This struct contains normalized predicate expr. In the form of /// `col` `op` `literal` where the `col` is provided from input. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct SimpleFilterEvaluator { /// Name of the referenced column. column_name: String, diff --git a/src/mito-codec/benches/bench_primary_key_filter.rs b/src/mito-codec/benches/bench_primary_key_filter.rs index 47158f87f8..2cc35abb22 100644 --- a/src/mito-codec/benches/bench_primary_key_filter.rs +++ b/src/mito-codec/benches/bench_primary_key_filter.rs @@ -246,18 +246,18 @@ fn bench_primary_key_filter(c: &mut Criterion) { let dense_pk = encode_dense_pk(&metadata, &row); let dense_codec = DensePrimaryKeyCodec::new(&metadata); - let mut dense_fast = dense_codec.primary_key_filter(&metadata, filters.clone()); + let mut dense_fast = dense_codec.primary_key_filter(&metadata, filters.clone(), false); let mut dense_offsets = Vec::new(); let sparse_pk = encode_sparse_pk(&metadata, &row); let sparse_codec = SparsePrimaryKeyCodec::new(&metadata); - let mut sparse_fast = sparse_codec.primary_key_filter(&metadata, filters.clone()); + let mut sparse_fast = sparse_codec.primary_key_filter(&metadata, filters.clone(), false); let mut sparse_offsets = std::collections::HashMap::new(); let mut group = c.benchmark_group(format!("primary_key_filter/{case_name}")); group.bench_function("dense/fast", |b| { - b.iter(|| black_box(dense_fast.matches(black_box(&dense_pk)))) + b.iter(|| black_box(dense_fast.matches(black_box(&dense_pk)).unwrap())) }); group.bench_function("dense/scalar", |b| { b.iter(|| { @@ -272,7 +272,7 @@ fn bench_primary_key_filter(c: &mut Criterion) { }); group.bench_function("sparse/fast", |b| { - b.iter(|| black_box(sparse_fast.matches(black_box(&sparse_pk)))) + b.iter(|| black_box(sparse_fast.matches(black_box(&sparse_pk)).unwrap())) }); group.bench_function("sparse/scalar", |b| { b.iter(|| { diff --git a/src/mito-codec/src/error.rs b/src/mito-codec/src/error.rs index 1be0074b1e..78a656415a 100644 --- a/src/mito-codec/src/error.rs +++ b/src/mito-codec/src/error.rs @@ -72,6 +72,14 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to evaluate filter"))] + EvaluateFilter { + #[snafu(source(from(common_recordbatch::error::Error, Box::new)))] + source: Box<common_recordbatch::error::Error>, + #[snafu(implicit)] + location: Location, + }, } pub type Result<T> = std::result::Result<T, Error>; @@ -86,6 +94,7 @@ impl ErrorExt for Error { StatusCode::InvalidArguments } NotSupportedField { .. } | UnsupportedOperation { .. } => StatusCode::Unsupported, EvaluateFilter { source, ..
} => source.status_code(), } } diff --git a/src/mito-codec/src/primary_key_filter.rs b/src/mito-codec/src/primary_key_filter.rs index 189c7a08cd..70fda7bf54 100644 --- a/src/mito-codec/src/primary_key_filter.rs +++ b/src/mito-codec/src/primary_key_filter.rs @@ -20,11 +20,12 @@ use common_recordbatch::filter::SimpleFilterEvaluator; use datatypes::data_type::ConcreteDataType; use datatypes::value::Value; use memcomparable::Serializer; +use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME; use store_api::storage::ColumnId; -use crate::error::Result; +use crate::error::{EvaluateFilterSnafu, Result}; use crate::row_converter::{ DensePrimaryKeyCodec, PrimaryKeyFilter, SortField, SparsePrimaryKeyCodec, }; @@ -41,8 +42,12 @@ struct PrimaryKeyFilterInner { } impl PrimaryKeyFilterInner { - fn new(metadata: RegionMetadataRef, filters: Arc<Vec<SimpleFilterEvaluator>>) -> Self { - let compiled_filters = Self::compile_filters(&metadata, &filters); + fn new( + metadata: RegionMetadataRef, + filters: Arc<Vec<SimpleFilterEvaluator>>, + skip_partition_column: bool, + ) -> Self { + let compiled_filters = Self::compile_filters(&metadata, &filters, skip_partition_column); Self { filters, compiled_filters, @@ -52,6 +57,7 @@ impl PrimaryKeyFilterInner { fn compile_filters( metadata: &RegionMetadataRef, filters: &[SimpleFilterEvaluator], + skip_partition_column: bool, ) -> Vec { if filters.is_empty() || metadata.primary_key.is_empty() { return Vec::new(); } @@ -59,7 +65,7 @@ impl PrimaryKeyFilterInner { let mut compiled_filters = Vec::with_capacity(filters.len()); for (filter_idx, filter) in filters.iter().enumerate() { - if is_partition_column(filter.column_name()) { + if skip_partition_column && is_partition_column(filter.column_name()) { continue; } @@ -91,43 +97,36 @@ impl PrimaryKeyFilterInner { compiled_filters } - fn evaluate_filters<'a>(&self, accessor: &mut impl PrimaryKeyValueAccessor<'a>) -> bool { + fn evaluate_filters<'a>( + &self, + accessor: &mut impl PrimaryKeyValueAccessor<'a>, + ) -> Result<bool> { if self.compiled_filters.is_empty() { - return true; + return Ok(true); } for compiled in &self.compiled_filters { let filter = &self.filters[compiled.filter_idx]; let passed = if let Some(fast_path) = &compiled.fast_path { - let encoded_value = match accessor.encoded_value(compiled) { - Ok(v) => v, - Err(e) => { - common_telemetry::error!(e; "Failed to decode primary key"); - return true; - } - }; + let encoded_value = accessor.encoded_value(compiled)?; fast_path.matches(encoded_value) } else { - let value = match accessor.decode_value(compiled) { - Ok(v) => v, - Err(e) => { - common_telemetry::error!(e; "Failed to decode primary key"); - return true; - } - }; + let value = accessor.decode_value(compiled)?; // Safety: arrow schema and datatypes are constructed from the same source. let scalar_value = value.try_to_scalar_value(&compiled.data_type).unwrap(); - filter.evaluate_scalar(&scalar_value).unwrap_or(true) + filter + .evaluate_scalar(&scalar_value) + .context(EvaluateFilterSnafu)?
}; if !passed { - return false; + return Ok(false); } } - true + Ok(true) } } @@ -258,9 +257,10 @@ impl DensePrimaryKeyFilter { metadata: RegionMetadataRef, filters: Arc<Vec<SimpleFilterEvaluator>>, codec: DensePrimaryKeyCodec, + skip_partition_column: bool, ) -> Self { Self { - inner: PrimaryKeyFilterInner::new(metadata, filters), + inner: PrimaryKeyFilterInner::new(metadata, filters, skip_partition_column), codec, offsets_buf: Vec::new(), } } } impl PrimaryKeyFilter for DensePrimaryKeyFilter { - fn matches(&mut self, pk: &[u8]) -> bool { + fn matches(&mut self, pk: &[u8]) -> Result<bool> { self.offsets_buf.clear(); let mut accessor = DensePrimaryKeyValueAccessor { pk, @@ -311,9 +311,10 @@ impl SparsePrimaryKeyFilter { metadata: RegionMetadataRef, filters: Arc<Vec<SimpleFilterEvaluator>>, codec: SparsePrimaryKeyCodec, + skip_partition_column: bool, ) -> Self { Self { - inner: PrimaryKeyFilterInner::new(metadata, filters), + inner: PrimaryKeyFilterInner::new(metadata, filters, skip_partition_column), codec, offsets_map: HashMap::new(), } } } impl PrimaryKeyFilter for SparsePrimaryKeyFilter { - fn matches(&mut self, pk: &[u8]) -> bool { + fn matches(&mut self, pk: &[u8]) -> Result<bool> { self.offsets_map.clear(); let mut accessor = SparsePrimaryKeyValueAccessor { pk, @@ -369,6 +370,7 @@ mod tests { use datatypes::schema::ColumnSchema; use datatypes::value::{OrderedFloat, ValueRef}; use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder}; + use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME; use store_api::storage::{ColumnId, RegionId}; use super::*; @@ -423,6 +425,36 @@ mod tests { Arc::new(metadata) } + fn setup_partitioned_metadata() -> RegionMetadataRef { + let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1)); + builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + DATA_SCHEMA_TABLE_ID_COLUMN_NAME, + ConcreteDataType::uint32_datatype(), + false, + ), + semantic_type: SemanticType::Tag, + column_id: 10, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("tag", ConcreteDataType::string_datatype(), true), + semantic_type: SemanticType::Tag, + column_id: 1, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + greptime_timestamp(), + ConcreteDataType::timestamp_nanosecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 2, + }) + .primary_key(vec![10, 1]); + Arc::new(builder.build().unwrap()) + } + fn create_test_row() -> Vec<(ColumnId, ValueRef<'static>)> { vec![ (1, ValueRef::String("greptime-frontend-6989d9899-22222")), @@ -479,8 +511,8 @@ )]); let pk = encode_sparse_pk(&metadata, create_test_row()); let codec = SparsePrimaryKeyCodec::new(&metadata); - let mut filter = SparsePrimaryKeyFilter::new(metadata, filters, codec); - assert!(filter.matches(&pk)); + let mut filter = SparsePrimaryKeyFilter::new(metadata, filters, codec, false); + assert!(filter.matches(&pk).unwrap()); } #[test] @@ -492,8 +524,8 @@ )]); let pk = encode_sparse_pk(&metadata, create_test_row()); let codec = SparsePrimaryKeyCodec::new(&metadata); - let mut filter = SparsePrimaryKeyFilter::new(metadata, filters, codec); - assert!(!filter.matches(&pk)); + let mut filter = SparsePrimaryKeyFilter::new(metadata, filters, codec, false); + assert!(!filter.matches(&pk).unwrap()); } #[test] @@ -505,8 +537,8 @@ )]); let pk = encode_sparse_pk(&metadata, create_test_row()); let codec =
SparsePrimaryKeyCodec::new(&metadata); - let mut filter = SparsePrimaryKeyFilter::new(metadata, filters, codec); - assert!(filter.matches(&pk)); + let mut filter = SparsePrimaryKeyFilter::new(metadata, filters, codec, false); + assert!(filter.matches(&pk).unwrap()); } #[test] @@ -518,8 +550,8 @@ mod tests { )]); let pk = encode_dense_pk(&metadata, create_test_row()); let codec = DensePrimaryKeyCodec::new(&metadata); - let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec); - assert!(filter.matches(&pk)); + let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec, false); + assert!(filter.matches(&pk).unwrap()); } #[test] @@ -531,8 +563,8 @@ mod tests { )]); let pk = encode_dense_pk(&metadata, create_test_row()); let codec = DensePrimaryKeyCodec::new(&metadata); - let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec); - assert!(!filter.matches(&pk)); + let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec, false); + assert!(!filter.matches(&pk).unwrap()); } #[test] @@ -544,8 +576,8 @@ mod tests { )]); let pk = encode_dense_pk(&metadata, create_test_row()); let codec = DensePrimaryKeyCodec::new(&metadata); - let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec); - assert!(filter.matches(&pk)); + let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec, false); + assert!(filter.matches(&pk).unwrap()); } #[test] @@ -563,8 +595,9 @@ mod tests { for (op, value, expected) in cases { let filters = Arc::new(vec![create_filter_with_op("pod", op, value)]); - let mut filter = DensePrimaryKeyFilter::new(metadata.clone(), filters, codec.clone()); - assert_eq!(expected, filter.matches(&pk)); + let mut filter = + DensePrimaryKeyFilter::new(metadata.clone(), filters, codec.clone(), false); + assert_eq!(expected, filter.matches(&pk).unwrap()); } } @@ -583,8 +616,9 @@ mod tests { for (op, value, expected) in cases { let filters = Arc::new(vec![create_filter_with_op("pod", op, value)]); - let mut filter = SparsePrimaryKeyFilter::new(metadata.clone(), filters, codec.clone()); - assert_eq!(expected, filter.matches(&pk)); + let mut filter = + SparsePrimaryKeyFilter::new(metadata.clone(), filters, codec.clone(), false); + assert_eq!(expected, filter.matches(&pk).unwrap()); } } @@ -616,8 +650,52 @@ mod tests { .unwrap(); let filters = Arc::new(vec![create_filter_with_op("f", Operator::Eq, 0.0_f64)]); - let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec); + let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec, false); - assert!(filter.matches(&pk)); + assert!(filter.matches(&pk).unwrap()); + } + + #[test] + fn test_dense_primary_key_filter_matches_partition_column_by_default() { + let metadata = setup_partitioned_metadata(); + let codec = DensePrimaryKeyCodec::new(&metadata); + let mut pk = Vec::new(); + codec + .encode_to_vec( + [ValueRef::UInt32(42), ValueRef::String("host-a")].into_iter(), + &mut pk, + ) + .unwrap(); + + let filters = Arc::new(vec![create_filter_with_op( + DATA_SCHEMA_TABLE_ID_COLUMN_NAME, + Operator::Eq, + 42_u32, + )]); + let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec, false); + + assert!(filter.matches(&pk).unwrap()); + } + + #[test] + fn test_dense_primary_key_filter_can_skip_partition_column() { + let metadata = setup_partitioned_metadata(); + let codec = DensePrimaryKeyCodec::new(&metadata); + let mut pk = Vec::new(); + codec + .encode_to_vec( + [ValueRef::UInt32(42), ValueRef::String("host-a")].into_iter(), + &mut pk, + ) + .unwrap(); + + let 
filters = Arc::new(vec![create_filter_with_op( + DATA_SCHEMA_TABLE_ID_COLUMN_NAME, + Operator::Eq, + 7_u32, + )]); + let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec, true); + + assert!(filter.matches(&pk).unwrap()); } } diff --git a/src/mito-codec/src/row_converter.rs b/src/mito-codec/src/row_converter.rs index 6fe33a9ee7..fa57e1d96e 100644 --- a/src/mito-codec/src/row_converter.rs +++ b/src/mito-codec/src/row_converter.rs @@ -53,7 +53,7 @@ pub trait PrimaryKeyCodecExt { pub trait PrimaryKeyFilter: Send + Sync { /// Returns true if the primary key matches the filter. - fn matches(&mut self, pk: &[u8]) -> bool; + fn matches(&mut self, pk: &[u8]) -> Result<bool>; } /// Composite values decoded from primary key bytes. @@ -120,6 +120,7 @@ pub trait PrimaryKeyCodec: Send + Sync + Debug { &self, metadata: &RegionMetadataRef, filters: Arc<Vec<SimpleFilterEvaluator>>, + skip_partition_column: bool, ) -> Box<dyn PrimaryKeyFilter>; /// Returns the estimated size of the primary key. diff --git a/src/mito-codec/src/row_converter/dense.rs b/src/mito-codec/src/row_converter/dense.rs index 6cc70feaea..4bc774c941 100644 --- a/src/mito-codec/src/row_converter/dense.rs +++ b/src/mito-codec/src/row_converter/dense.rs @@ -556,11 +556,13 @@ impl PrimaryKeyCodec for DensePrimaryKeyCodec { &self, metadata: &RegionMetadataRef, filters: Arc<Vec<SimpleFilterEvaluator>>, + skip_partition_column: bool, ) -> Box<dyn PrimaryKeyFilter> { Box::new(DensePrimaryKeyFilter::new( metadata.clone(), filters, self.clone(), + skip_partition_column, )) } diff --git a/src/mito-codec/src/row_converter/sparse.rs b/src/mito-codec/src/row_converter/sparse.rs index 4638ddcefb..00ec51530c 100644 --- a/src/mito-codec/src/row_converter/sparse.rs +++ b/src/mito-codec/src/row_converter/sparse.rs @@ -357,11 +357,13 @@ impl PrimaryKeyCodec for SparsePrimaryKeyCodec { &self, metadata: &RegionMetadataRef, filters: Arc<Vec<SimpleFilterEvaluator>>, + skip_partition_column: bool, ) -> Box<dyn PrimaryKeyFilter> { Box::new(SparsePrimaryKeyFilter::new( metadata.clone(), filters, self.clone(), + skip_partition_column, )) } diff --git a/src/mito2/src/engine/row_selector_test.rs b/src/mito2/src/engine/row_selector_test.rs index d79152e57f..26d7327c2f 100644 --- a/src/mito2/src/engine/row_selector_test.rs +++ b/src/mito2/src/engine/row_selector_test.rs @@ -13,12 +13,15 @@ // limitations under the License.
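The `skip_partition_column` flag added above threads through every `PrimaryKeyCodec::primary_key_filter` implementation. A minimal sketch of the call-site choice (types and method names taken from this diff; the helper function itself is hypothetical):

```rust
use std::sync::Arc;

// Sketch only: `codec`, `metadata`, and `filters` come from the surrounding code.
fn build_pk_filter(
    codec: &dyn PrimaryKeyCodec,
    metadata: &RegionMetadataRef,
    filters: Arc<Vec<SimpleFilterEvaluator>>,
) -> Box<dyn PrimaryKeyFilter> {
    // `true` keeps the old PartitionTreeMemtable behavior: predicates on the
    // partition column were already handled by partition pruning, so they are
    // dropped. The parquet prefilter passes `false` so it can evaluate them
    // against the encoded primary key.
    codec.primary_key_filter(metadata, filters, /* skip_partition_column */ false)
}
```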
use api::v1::Rows; +use common_base::readable_size::ReadableSize; use common_recordbatch::RecordBatches; +use datafusion_expr::{col, lit}; use store_api::region_engine::RegionEngine; use store_api::region_request::RegionRequest; use store_api::storage::{RegionId, ScanRequest, TimeSeriesRowSelector}; use crate::config::MitoConfig; +use crate::engine::MitoEngine; use crate::test_util::batch_util::sort_batches_and_print; use crate::test_util::{ CreateRequestBuilder, TestEnv, build_rows_for_key, flush_region, put_rows, rows_schema, @@ -107,6 +110,27 @@ async fn test_last_row(append_mode: bool, flat_format: bool) { assert_eq!(expected, sort_batches_and_print(&batches, &["tag_0", "ts"])); } +async fn scan_last_row( + engine: &MitoEngine, + region_id: RegionId, + filters: Vec<datafusion_expr::Expr>, +) -> String { + let scanner = engine + .scanner( + region_id, + ScanRequest { + filters, + series_row_selector: Some(TimeSeriesRowSelector::LastRow), + ..Default::default() + }, + ) + .await + .unwrap(); + let stream = scanner.scan().await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + sort_batches_and_print(&batches, &["tag_0", "ts"]) +} + #[tokio::test] async fn test_last_row_append_mode_disabled() { test_last_row(false, false).await; } @@ -126,3 +150,69 @@ async fn test_last_row_flat_format_append_mode_disabled() { async fn test_last_row_flat_format_append_mode_enabled() { test_last_row(true, true).await; } + +#[tokio::test] +async fn test_last_row_flat_format_prefilter_does_not_poison_selector_cache() { + let mut env = TestEnv::new().await; + let engine = env + .create_engine(MitoConfig { + selector_result_cache_size: ReadableSize::mb(1), + ..Default::default() + }) + .await; + let region_id = RegionId::new(1, 1); + + env.get_schema_metadata_manager() + .register_region_table_info( + region_id.table_id(), + "test_table", + "test_catalog", + "test_schema", + None, + env.get_kv_backend(), + ) + .await; + + let request = CreateRequestBuilder::new() + .insert_option("sst_format", "flat") + .build(); + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let rows = Rows { + schema: column_schemas, + rows: [ + build_rows_for_key("a", 0, 3, 0), + build_rows_for_key("b", 0, 3, 10), + ] + .concat(), + }; + put_rows(&engine, region_id, rows).await; + flush_region(&engine, region_id, Some(16)).await; + + let filtered = scan_last_row(&engine, region_id, vec![col("tag_0").eq(lit("a"))]).await; + assert_eq!( + "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| a | 2.0 | 1970-01-01T00:00:02 | ++-------+---------+---------------------+", + filtered + ); + + let unfiltered = scan_last_row(&engine, region_id, vec![]).await; + assert_eq!( + "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| a | 2.0 | 1970-01-01T00:00:02 | +| b | 12.0 | 1970-01-01T00:00:02 | ++-------+---------+---------------------+", + unfiltered + ); +} diff --git a/src/mito2/src/memtable/bulk/part_reader.rs b/src/mito2/src/memtable/bulk/part_reader.rs index edb9ff52d9..1375e79542 100644 --- a/src/mito2/src/memtable/bulk/part_reader.rs +++ b/src/mito2/src/memtable/bulk/part_reader.rs @@ -367,6 +367,7 @@ fn apply_combined_filters( let predicate_mask = context.base.compute_filter_mask_flat( &record_batch, skip_fields, + false, &mut tag_decode_state, )?; // If predicate filters out the entire batch, return None early diff
--git a/src/mito2/src/memtable/partition_tree/partition.rs b/src/mito2/src/memtable/partition_tree/partition.rs index e6e3b8bf81..0ffbce4867 100644 --- a/src/mito2/src/memtable/partition_tree/partition.rs +++ b/src/mito2/src/memtable/partition_tree/partition.rs @@ -152,7 +152,8 @@ impl Partition { filters: &Arc<Vec<SimpleFilterEvaluator>>, ) -> Option<Box<dyn PrimaryKeyFilter>> { if need_prune_key { - let filter = row_codec.primary_key_filter(metadata, filters.clone()); + // TODO(yingwen): Remove `skip_partition_column` after dropping PartitionTreeMemtable. + let filter = row_codec.primary_key_filter(metadata, filters.clone(), true); Some(filter) } else { None diff --git a/src/mito2/src/memtable/partition_tree/shard.rs b/src/mito2/src/memtable/partition_tree/shard.rs index 162f937a7c..c5dc25f573 100644 --- a/src/mito2/src/memtable/partition_tree/shard.rs +++ b/src/mito2/src/memtable/partition_tree/shard.rs @@ -19,9 +19,10 @@ use std::time::{Duration, Instant}; use mito_codec::key_values::KeyValue; use mito_codec::row_converter::PrimaryKeyFilter; +use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; -use crate::error::Result; +use crate::error::{DecodeSnafu, Result}; use crate::memtable::partition_tree::data::{ DATA_INIT_CAP, DataBatch, DataParts, DataPartsReader, DataPartsReaderBuilder, }; @@ -243,7 +244,7 @@ impl ShardReader { // Safety: `key_filter` is some so the shard has primary keys. let key = self.key_dict.as_ref().unwrap().key_by_pk_index(pk_index); let now = Instant::now(); - if key_filter.matches(key) { + if key_filter.matches(key).context(DecodeSnafu)? { self.prune_pk_cost += now.elapsed(); self.last_yield_pk_index = Some(pk_index); self.keys_after_pruning += 1; diff --git a/src/mito2/src/memtable/partition_tree/shard_builder.rs b/src/mito2/src/memtable/partition_tree/shard_builder.rs index 26de85767d..78eeb463c6 100644 --- a/src/mito2/src/memtable/partition_tree/shard_builder.rs +++ b/src/mito2/src/memtable/partition_tree/shard_builder.rs @@ -20,9 +20,10 @@ use std::time::{Duration, Instant}; use mito_codec::key_values::KeyValue; use mito_codec::row_converter::PrimaryKeyFilter; +use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; -use crate::error::Result; +use crate::error::{DecodeSnafu, Result}; use crate::memtable::partition_tree::data::{ DATA_INIT_CAP, DataBatch, DataBuffer, DataBufferReader, DataBufferReaderBuilder, DataParts, }; @@ -281,7 +282,7 @@ impl ShardBuilderReader { self.keys_before_pruning += 1; let key = self.dict_reader.key_by_pk_index(pk_index); let now = Instant::now(); - if key_filter.matches(key).context(DecodeSnafu)?
{ self.prune_pk_cost += now.elapsed(); self.last_yield_pk_index = Some(pk_index); self.keys_after_pruning += 1; diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 79a08a209d..4a3466a29c 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -142,6 +142,7 @@ mod tests { use crate::sst::index::{IndexBuildType, Indexer, IndexerBuilder, IndexerBuilderImpl}; use crate::sst::parquet::flat_format::FlatWriteFormat; use crate::sst::parquet::reader::{ParquetReader, ParquetReaderBuilder, ReaderMetrics}; + use crate::sst::parquet::row_selection::RowGroupSelection; use crate::sst::parquet::writer::ParquetWriter; use crate::sst::{ DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, location, to_flat_sst_arrow_schema, @@ -1113,6 +1114,39 @@ mod tests { assert!(reader.next_record_batch().await.unwrap().is_none()); } + fn new_record_batch_from_rows(rows: &[(&str, &str, i64)]) -> RecordBatch { + let metadata = Arc::new(sst_region_metadata()); + let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default()); + + let mut tag_0_builder = StringDictionaryBuilder::<UInt32Type>::new(); + let mut tag_1_builder = StringDictionaryBuilder::<UInt32Type>::new(); + let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new(); + let mut field_values = Vec::with_capacity(rows.len()); + let mut timestamps = Vec::with_capacity(rows.len()); + + for (tag_0, tag_1, ts) in rows { + tag_0_builder.append_value(*tag_0); + tag_1_builder.append_value(*tag_1); + pk_builder.append(new_primary_key(&[tag_0, tag_1])).unwrap(); + field_values.push(*ts as u64); + timestamps.push(*ts); + } + + RecordBatch::try_new( + flat_schema, + vec![ + Arc::new(tag_0_builder.finish()) as ArrayRef, + Arc::new(tag_1_builder.finish()) as ArrayRef, + Arc::new(UInt64Array::from(field_values)) as ArrayRef, + Arc::new(TimestampMillisecondArray::from(timestamps)) as ArrayRef, + Arc::new(pk_builder.finish()) as ArrayRef, + Arc::new(UInt64Array::from_value(1000, rows.len())) as ArrayRef, + Arc::new(UInt8Array::from_value(OpType::Put as u8, rows.len())) as ArrayRef, + ], + ) + .unwrap() + } + /// Creates a flat format RecordBatch for testing with sparse primary key encoding. /// Similar to `new_record_batch_by_range` but without individual primary key columns.
fn new_record_batch_by_range_sparse( @@ -1642,6 +1676,133 @@ mod tests { assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100); } + #[tokio::test] + async fn test_reader_prefilter_with_outer_selection_and_trailing_filtered_rows() { + let mut env = TestEnv::new().await; + let object_store = env.init_object_store_manager(); + let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare); + let metadata = Arc::new(sst_region_metadata()); + let row_group_size = 10; + + let flat_source = new_flat_source_from_record_batches(vec![ + new_record_batch_by_range(&["a", "d"], 0, 3), + new_record_batch_by_range(&["b", "d"], 3, 10), + ]); + let write_opts = WriteOptions { + row_group_size, + ..Default::default() + }; + let indexer_builder = create_test_indexer_builder( + &env, + object_store.clone(), + file_path.clone(), + metadata.clone(), + row_group_size, + ); + let info = write_flat_sst( + object_store.clone(), + metadata.clone(), + indexer_builder, + file_path, + flat_source, + &write_opts, + ) + .await; + let handle = create_file_handle_from_sst_info(&info, &metadata); + + let builder = + ParquetReaderBuilder::new(FILE_DIR.to_string(), PathType::Bare, handle, object_store) + .flat_format(true) + .predicate(Some(Predicate::new(vec![col("tag_0").eq(lit("a"))]))); + + let mut metrics = ReaderMetrics::default(); + let (context, _) = builder + .build_reader_input(&mut metrics) + .await + .unwrap() + .unwrap(); + let selection = RowGroupSelection::from_row_ranges( + vec![(0, std::iter::once(0..6).collect())], + row_group_size, + ); + + let mut reader = ParquetReader::new(Arc::new(context), selection) + .await + .unwrap(); + check_record_batch_reader_result( + &mut reader, + &[new_record_batch_by_range(&["a", "d"], 0, 3)], + ) + .await; + } + + #[tokio::test] + async fn test_reader_prefilter_with_outer_selection_disjoint_matches_and_trailing_gap() { + let mut env = TestEnv::new().await; + let object_store = env.init_object_store_manager(); + let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare); + let metadata = Arc::new(sst_region_metadata()); + let row_group_size = 8; + + let flat_source = new_flat_source_from_record_batches(vec![ + new_record_batch_by_range(&["a", "d"], 0, 2), + new_record_batch_by_range(&["b", "d"], 2, 4), + new_record_batch_by_range(&["a", "d"], 4, 6), + new_record_batch_by_range(&["c", "d"], 6, 8), + ]); + let write_opts = WriteOptions { + row_group_size, + ..Default::default() + }; + let indexer_builder = create_test_indexer_builder( + &env, + object_store.clone(), + file_path.clone(), + metadata.clone(), + row_group_size, + ); + let info = write_flat_sst( + object_store.clone(), + metadata.clone(), + indexer_builder, + file_path, + flat_source, + &write_opts, + ) + .await; + let handle = create_file_handle_from_sst_info(&info, &metadata); + + let builder = + ParquetReaderBuilder::new(FILE_DIR.to_string(), PathType::Bare, handle, object_store) + .flat_format(true) + .predicate(Some(Predicate::new(vec![col("tag_0").eq(lit("a"))]))); + + let mut metrics = ReaderMetrics::default(); + let (context, _) = builder + .build_reader_input(&mut metrics) + .await + .unwrap() + .unwrap(); + let selection = RowGroupSelection::from_row_ranges( + vec![(0, std::iter::once(0..8).collect())], + row_group_size, + ); + + let mut reader = ParquetReader::new(Arc::new(context), selection) + .await + .unwrap(); + check_record_batch_reader_result( + &mut reader, + &[new_record_batch_from_rows(&[ + ("a", "d", 0), + ("a", "d", 1), + ("a", "d", 4), + ("a", 
"d", 5), + ])], + ) + .await; + } + #[tokio::test] async fn test_write_flat_read_with_inverted_index_sparse() { common_telemetry::init_default_ut_logging(); diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs index 3a5251cb1a..8b4a61acb7 100644 --- a/src/mito2/src/sst/parquet/file_range.rs +++ b/src/mito2/src/sst/parquet/file_range.rs @@ -37,6 +37,7 @@ use store_api::metadata::RegionMetadataRef; use store_api::storage::{ColumnId, TimeSeriesRowSelector}; use table::predicate::Predicate; +use crate::cache::CacheStrategy; use crate::error::{ ComputeArrowSnafu, DataTypeMismatchSnafu, DecodeSnafu, DecodeStatsSnafu, EvalPartitionFilterSnafu, NewRecordBatchSnafu, RecordBatchSnafu, Result, StatsNotPresentSnafu, @@ -53,7 +54,8 @@ use crate::sst::parquet::flat_format::{ }; use crate::sst::parquet::format::ReadFormat; use crate::sst::parquet::reader::{ - FlatRowGroupReader, MaybeFilter, RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext, + FlatRowGroupReader, MaybeFilter, RowGroupBuildContext, RowGroupReader, RowGroupReaderBuilder, + SimpleFilterContext, }; use crate::sst::parquet::row_group::ParquetFetchMetrics; use crate::sst::parquet::stats::RowGroupPruningStats; @@ -181,14 +183,17 @@ impl FileRange { if !self.in_dynamic_filter_range() { return Ok(None); } + // Compute skip_fields once for this row group + let skip_fields = self.context.should_skip_fields(self.row_group_idx); let parquet_reader = self .context .reader_builder - .build( + .build(self.context.build_context( self.row_group_idx, self.row_selection.clone(), fetch_metrics, - ) + skip_fields, + )) .await?; let use_last_row_reader = if selector @@ -210,9 +215,6 @@ impl FileRange { false }; - // Compute skip_fields once for this row group - let skip_fields = self.context.should_skip_fields(self.row_group_idx); - let prune_reader = if use_last_row_reader { // Row group is PUT only, use LastRowReader to skip unnecessary rows. let reader = RowGroupLastRowCachedReader::new( @@ -243,14 +245,17 @@ impl FileRange { if !self.in_dynamic_filter_range() { return Ok(None); } + // Compute skip_fields once for this row group + let skip_fields = self.context.should_skip_fields(self.row_group_idx); let parquet_reader = self .context .reader_builder - .build( + .build(self.context.build_context( self.row_group_idx, self.row_selection.clone(), fetch_metrics, - ) + skip_fields, + )) .await?; let use_last_row_reader = if selector @@ -271,16 +276,20 @@ impl FileRange { false }; - // Compute skip_fields once for this row group - let skip_fields = self.context.should_skip_fields(self.row_group_idx); - let flat_prune_reader = if use_last_row_reader { let flat_row_group_reader = FlatRowGroupReader::new(self.context.clone(), parquet_reader); + // Flat PK prefilter makes the input stream predicate-dependent, so cached + // selector results are not reusable across queries with different filters. 
+ let cache_strategy = if self.context.reader_builder.has_flat_primary_key_prefilter() { + CacheStrategy::Disabled + } else { + self.context.reader_builder.cache_strategy().clone() + }; let reader = FlatRowGroupLastRowCachedReader::new( self.file_handle().file_id().file_id(), self.row_group_idx, - self.context.reader_builder.cache_strategy().clone(), + cache_strategy, self.context.read_format().projection_indices(), flat_row_group_reader, ); @@ -387,7 +396,11 @@ impl FileRangeContext { input: RecordBatch, skip_fields: bool, ) -> Result<Option<RecordBatch>> { - self.base.precise_filter_flat(input, skip_fields) + self.base.precise_filter_flat( + input, + skip_fields, + self.reader_builder.has_flat_primary_key_prefilter(), + ) } /// Determines whether to skip field filters based on PreFilterMode and row group delete status. @@ -408,6 +421,23 @@ impl FileRangeContext { row_group_contains_delete(metadata, row_group_index, self.reader_builder.file_path()) } + /// Creates a [RowGroupBuildContext] for building row group readers with prefiltering. + pub(crate) fn build_context<'a>( + &'a self, + row_group_idx: usize, + row_selection: Option<RowSelection>, + fetch_metrics: Option<&'a ParquetFetchMetrics>, + skip_fields: bool, + ) -> RowGroupBuildContext<'a> { + RowGroupBuildContext { + filters: &self.base.filters, + skip_fields, + row_group_idx, + row_selection, + fetch_metrics, + } + } + /// Returns the estimated memory size of this context. /// Mainly accounts for the parquet metadata size. pub(crate) fn memory_size(&self) -> usize { @@ -600,9 +630,15 @@ impl RangeBase { &self, input: RecordBatch, skip_fields: bool, + skip_prefiltered_pk_filters: bool, ) -> Result<Option<RecordBatch>> { let mut tag_decode_state = TagDecodeState::new(); - let mask = self.compute_filter_mask_flat(&input, skip_fields, &mut tag_decode_state)?; + let mask = self.compute_filter_mask_flat( + &input, + skip_fields, + skip_prefiltered_pk_filters, + &mut tag_decode_state, + )?; // If mask is None, the entire batch is filtered out let Some(mut mask) = mask else { @@ -647,6 +683,7 @@ impl RangeBase { &self, input: &RecordBatch, skip_fields: bool, + skip_prefiltered_pk_filters: bool, tag_decode_state: &mut TagDecodeState, ) -> Result<Option<BooleanBuffer>> { let mut mask = BooleanBuffer::new_set(input.num_rows()); @@ -674,6 +711,12 @@ impl RangeBase { continue; } + // Flat parquet PK prefiltering already applied these tag predicates while refining + // row selection, so skip them here to avoid decoding/evaluating the same condition twice. + if skip_prefiltered_pk_filters && filter_ctx.usable_primary_key_filter() { + continue; + } + // Get the column directly by its projected index. // If the column is missing and it's not a tag/time column, this filter is skipped. // Assumes the projection indices align with the input batch schema.
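Condensed view of how the two skip flags reach the flat mask computation (all names from this diff; the snippet restates the `precise_filter_flat` wiring and is not standalone):

```rust
// Inside FileRangeContext::precise_filter_flat (sketch):
// `skip_fields` comes from PreFilterMode / row-group delete status, while
// `skip_prefiltered_pk_filters` is true iff the flat PK prefilter already
// refined the row selection for this row group, so re-evaluating the same
// tag predicates here would be redundant work.
let mask = self.base.compute_filter_mask_flat(
    &input,
    skip_fields,
    self.reader_builder.has_flat_primary_key_prefilter(),
    &mut tag_decode_state,
)?;
```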
@@ -926,3 +969,62 @@ impl RangeBase { RecordBatch::try_new(arrow_schema.clone(), columns).context(NewRecordBatchSnafu) } } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use datafusion_expr::{col, lit}; + + use super::*; + use crate::sst::parquet::format::ReadFormat; + use crate::test_util::sst_util::{new_record_batch_with_custom_sequence, sst_region_metadata}; + + fn new_test_range_base(filters: Vec<SimpleFilterContext>) -> RangeBase { + let metadata: RegionMetadataRef = Arc::new(sst_region_metadata()); + let read_format = ReadFormat::new_flat( + metadata.clone(), + metadata.column_metadatas.iter().map(|c| c.column_id), + None, + "test", + true, + ) + .unwrap(); + + RangeBase { + filters, + dyn_filters: vec![], + read_format, + expected_metadata: None, + prune_schema: metadata.schema.clone(), + codec: mito_codec::row_converter::build_primary_key_codec(metadata.as_ref()), + compat_batch: None, + compaction_projection_mapper: None, + pre_filter_mode: PreFilterMode::All, + partition_filter: None, + } + } + + #[test] + fn test_compute_filter_mask_flat_skips_prefiltered_pk_filters() { + let metadata: RegionMetadataRef = Arc::new(sst_region_metadata()); + let filters = vec![ + SimpleFilterContext::new_opt(&metadata, None, &col("tag_0").eq(lit("a"))).unwrap(), + SimpleFilterContext::new_opt(&metadata, None, &col("field_0").gt(lit(1_u64))).unwrap(), + ]; + let base = new_test_range_base(filters); + let batch = new_record_batch_with_custom_sequence(&["b", "x"], 0, 4, 1); + + let mask_without_skip = base + .compute_filter_mask_flat(&batch, false, false, &mut TagDecodeState::new()) + .unwrap() + .unwrap(); + assert_eq!(mask_without_skip.count_set_bits(), 0); + + let mask_with_skip = base + .compute_filter_mask_flat(&batch, false, true, &mut TagDecodeState::new()) + .unwrap() + .unwrap(); + assert_eq!(mask_with_skip.count_set_bits(), 2); + } +} diff --git a/src/mito2/src/sst/parquet/flat_format.rs b/src/mito2/src/sst/parquet/flat_format.rs index 8a59e9a97d..ca39cac7e1 100644 --- a/src/mito2/src/sst/parquet/flat_format.rs +++ b/src/mito2/src/sst/parquet/flat_format.rs @@ -282,6 +282,13 @@ impl FlatReadFormat { } } + /// Returns `true` if raw batches from parquet use the flat layout with a + /// dictionary-encoded `__primary_key` column (i.e., [`ParquetAdapter::Flat`]). + /// Returns `false` for the legacy primary-key-to-flat conversion path. + pub(crate) fn raw_batch_has_primary_key_dictionary(&self) -> bool { + matches!(&self.parquet_adapter, ParquetAdapter::Flat(_)) + } + /// Creates a sequence array to override. pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option { self.override_sequence diff --git a/src/mito2/src/sst/parquet/prefilter.rs b/src/mito2/src/sst/parquet/prefilter.rs index 5de2e3512f..07efbd052f 100644 --- a/src/mito2/src/sst/parquet/prefilter.rs +++ b/src/mito2/src/sst/parquet/prefilter.rs @@ -12,31 +12,40 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! Helpers for parquet prefiltering. +//! Prefilter framework for parquet reader. +//! +//! Prefilter optimization reduces I/O by reading only a subset of columns first +//! (the prefilter phase), applying filters to compute a refined row selection, +//! then reading the remaining columns with the refined selection.
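The refinement step composes two parquet `RowSelection`s. A runnable illustration of the `and_then` semantics the module relies on (values are made up; only the `parquet` crate is required):

```rust
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

fn main() {
    // Outer selection: rows 0..6 of a 10-row row group survived earlier pruning.
    let outer = RowSelection::from(vec![RowSelector::select(6), RowSelector::skip(4)]);
    // The prefilter phase matched the first 3 of the 6 *yielded* rows.
    let prefilter = RowSelection::from(vec![RowSelector::select(3), RowSelector::skip(3)]);
    // `and_then` interprets `prefilter` relative to the rows `outer` selects,
    // yielding absolute rows 0..3 of the row group.
    let refined = outer.and_then(&prefilter);
    assert_eq!(refined.row_count(), 3);
}
```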
use std::ops::Range; +use std::sync::Arc; use api::v1::SemanticType; use common_recordbatch::filter::SimpleFilterEvaluator; -use datatypes::arrow::array::{BinaryArray, BooleanArray}; +use datatypes::arrow::array::BinaryArray; use datatypes::arrow::record_batch::RecordBatch; -use mito_codec::primary_key_filter::is_partition_column; -use mito_codec::row_converter::PrimaryKeyFilter; +use futures::StreamExt; +use mito_codec::row_converter::{PrimaryKeyCodec, PrimaryKeyFilter}; +use parquet::arrow::ProjectionMask; +use parquet::arrow::arrow_reader::RowSelection; +use parquet::schema::types::SchemaDescriptor; use snafu::{OptionExt, ResultExt}; use store_api::metadata::{RegionMetadata, RegionMetadataRef}; -use crate::error::{ComputeArrowSnafu, Result, UnexpectedSnafu}; +use crate::error::{DecodeSnafu, ReadParquetSnafu, Result, UnexpectedSnafu}; use crate::sst::parquet::flat_format::primary_key_column_index; -use crate::sst::parquet::format::PrimaryKeyArray; +use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat}; +use crate::sst::parquet::reader::{RowGroupBuildContext, RowGroupReaderBuilder}; +use crate::sst::parquet::row_selection::row_selection_from_row_ranges_exact; -#[cfg_attr(not(test), allow(dead_code))] pub(crate) fn matching_row_ranges_by_primary_key( input: &RecordBatch, + pk_column_index: usize, pk_filter: &mut dyn PrimaryKeyFilter, ) -> Result<Vec<Range<usize>>> { let pk_dict_array = input - .column(primary_key_index) + .column(pk_column_index) .as_any() .downcast_ref::<PrimaryKeyArray>() .context(UnexpectedSnafu { reason: "Primary key column is not a dictionary array", })?; @@ -65,7 +74,10 @@ pub(crate) fn matching_row_ranges_by_primary_key( end += 1; } - if pk_filter.matches(pk_values.value(key as usize)) { + if pk_filter + .matches(pk_values.value(key as usize)) + .context(DecodeSnafu)? + { if let Some(last) = matched_row_ranges.last_mut() && last.end == start { @@ -81,68 +93,15 @@ pub(crate) fn matching_row_ranges_by_primary_key( Ok(matched_row_ranges) } -#[cfg_attr(not(test), allow(dead_code))] -pub(crate) fn prefilter_flat_batch_by_primary_key( - input: RecordBatch, - pk_filter: &mut dyn PrimaryKeyFilter, -) -> Result<Option<RecordBatch>> { - if input.num_rows() == 0 { - return Ok(Some(input)); - } - - let matched_row_ranges = matching_row_ranges_by_primary_key(&input, pk_filter)?; - if matched_row_ranges.is_empty() { - return Ok(None); - } - - if matched_row_ranges.len() == 1 - && matched_row_ranges[0].start == 0 - && matched_row_ranges[0].end == input.num_rows() - { - return Ok(Some(input)); - } - - if matched_row_ranges.len() == 1 { - let span = &matched_row_ranges[0]; - return Ok(Some(input.slice(span.start, span.end - span.start))); - } - - let mut mask = vec![false; input.num_rows()]; - for span in matched_row_ranges { - mask[span].fill(true); - } - - let filtered = - datatypes::arrow::compute::filter_record_batch(&input, &BooleanArray::from(mask)) - .context(ComputeArrowSnafu)?; - if filtered.num_rows() == 0 { - Ok(None) - } else { - Ok(Some(filtered)) - } -} - -#[cfg_attr(not(test), allow(dead_code))] -pub(crate) fn retain_usable_primary_key_filters( - sst_metadata: &RegionMetadataRef, - expected_metadata: Option<&RegionMetadata>, - filters: &mut Vec<SimpleFilterEvaluator>, -) { - filters.retain(|filter| is_usable_primary_key_filter(sst_metadata, expected_metadata, filter)); -} - -#[cfg_attr(not(test), allow(dead_code))] +/// Returns whether a filter can be applied by parquet primary-key prefiltering. +/// +/// Unlike `PartitionTreeMemtable`, parquet prefilter always supports predicates +/// on the partition column.
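+/// For illustration (based on the tests in this diff): with primary key +/// (`__table_id`, `tag_0`), predicates like `tag_0 = 'a'` or `__table_id = 1` +/// are usable, `field_0 > 1` is not (not a tag column), and when the expected +/// metadata reuses a tag name under a different column id the filter is +/// rejected so stale encodings are never matched.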
pub(crate) fn is_usable_primary_key_filter( sst_metadata: &RegionMetadataRef, expected_metadata: Option<&RegionMetadata>, filter: &SimpleFilterEvaluator, ) -> bool { - // TODO(yingwen): The primary key filter always skips the partition column. Consider using a flag - // to control this behavior. We can remove this behavior after we remove the PartitionTreeMemtable. - if is_partition_column(filter.column_name()) { - return false; - } - let sst_column = match expected_metadata { Some(expected_metadata) => { let Some(expected_column) = expected_metadata.column_by_name(filter.column_name()) @@ -176,7 +135,6 @@ pub(crate) fn is_usable_primary_key_filter( .is_some() } -#[cfg_attr(not(test), allow(dead_code))] pub(crate) struct CachedPrimaryKeyFilter { inner: Box<dyn PrimaryKeyFilter>, last_primary_key: Vec<u8>, } impl CachedPrimaryKeyFilter { - #[cfg_attr(not(test), allow(dead_code))] pub(crate) fn new(inner: Box<dyn PrimaryKeyFilter>) -> Self { Self { inner, } } } impl PrimaryKeyFilter for CachedPrimaryKeyFilter { - fn matches(&mut self, pk: &[u8]) -> bool { + fn matches(&mut self, pk: &[u8]) -> mito_codec::error::Result<bool> { if let Some(last_match) = self.last_match && self.last_primary_key == pk { - return last_match; + return Ok(last_match); } - let matched = self.inner.matches(pk); + let matched = self.inner.matches(pk)?; self.last_primary_key.clear(); self.last_primary_key.extend_from_slice(pk); self.last_match = Some(matched); - matched + Ok(matched) } } -#[cfg_attr(not(test), allow(dead_code))] -pub(crate) fn batch_single_primary_key(batch: &RecordBatch) -> Result<Option<&[u8]>> { - let primary_key_index = primary_key_column_index(batch.num_columns()); - let pk_dict_array = batch - .column(primary_key_index) - .as_any() - .downcast_ref::<PrimaryKeyArray>() - .context(UnexpectedSnafu { - reason: "Primary key column is not a dictionary array", - })?; - let pk_values = pk_dict_array - .values() - .as_any() - .downcast_ref::<BinaryArray>() - .context(UnexpectedSnafu { - reason: "Primary key values are not binary array", - })?; - let keys = pk_dict_array.keys(); - if keys.is_empty() { - return Ok(None); +/// Context for prefiltering a row group. +/// +/// Currently supports primary key (PK) filtering only. +/// Will be extended with simple column filters and physical filters in the future. +pub(crate) struct PrefilterContext { + /// PK filter instance. + pk_filter: Box<dyn PrimaryKeyFilter>, + /// Projection mask for reading only the PK column. + pk_projection: ProjectionMask, + /// Index of the PK column within the prefilter projection batch. + /// This is 0 when we project only the PK column. + pk_column_index: usize, +} + +/// Pre-built state for constructing [PrefilterContext] per row group. +/// +/// Fields invariant across row groups (projection mask, codec, metadata, filters) +/// are computed once. A fresh [PrefilterContext] with its own mutable PK filter +/// is created via [PrefilterContextBuilder::build()] for each row group. +pub(crate) struct PrefilterContextBuilder { + pk_projection: ProjectionMask, + pk_column_index: usize, + codec: Arc<dyn PrimaryKeyCodec>, + metadata: RegionMetadataRef, + pk_filters: Arc<Vec<SimpleFilterEvaluator>>, +} + +impl PrefilterContextBuilder { + /// Creates a builder if prefiltering is applicable.
+ /// + /// Returns `None` if: + /// - No primary key filters are available + /// - The read format doesn't use flat layout with dictionary-encoded PKs + /// - The primary key is empty + pub(crate) fn new( + read_format: &ReadFormat, + codec: &Arc<dyn PrimaryKeyCodec>, + primary_key_filters: Option<&Arc<Vec<SimpleFilterEvaluator>>>, + parquet_schema: &SchemaDescriptor, + ) -> Option<Self> { + let pk_filters = primary_key_filters?; + if pk_filters.is_empty() { + return None; + } + + let metadata = read_format.metadata(); + if metadata.primary_key.is_empty() { + return None; + } + + // Only flat format with dictionary-encoded PKs supports PK prefiltering. + let flat_format = read_format.as_flat()?; + if !flat_format.raw_batch_has_primary_key_dictionary() { + return None; + } + + // Compute PK-only projection mask. + let num_parquet_columns = parquet_schema.num_columns(); + let pk_index = primary_key_column_index(num_parquet_columns); + let pk_projection = ProjectionMask::roots(parquet_schema, [pk_index]); + + // The PK column is the only column in the projection, so its index is 0. + let pk_column_index = 0; + + Some(Self { + pk_projection, + pk_column_index, + codec: Arc::clone(codec), + metadata: metadata.clone(), + pk_filters: Arc::clone(pk_filters), + }) } - let first_key = keys.value(0); - if first_key != keys.value(keys.len() - 1) { - return Ok(None); + /// Builds a [PrefilterContext] for a specific row group. + pub(crate) fn build(&self) -> PrefilterContext { + // Parquet PK prefilter always supports the partition column. Only + // PartitionTreeMemtable skips it after partition pruning. + let pk_filter = + self.codec + .primary_key_filter(&self.metadata, Arc::clone(&self.pk_filters), false); + let pk_filter = Box::new(CachedPrimaryKeyFilter::new(pk_filter)); + PrefilterContext { + pk_filter, + pk_projection: self.pk_projection.clone(), + pk_column_index: self.pk_column_index, + } + } +} + +/// Result of prefiltering a row group. +pub(crate) struct PrefilterResult { + /// Refined row selection after prefiltering. + pub(crate) refined_selection: RowSelection, + /// Number of rows filtered out by prefiltering. + pub(crate) filtered_rows: usize, +} + +/// Executes prefiltering on a row group. +/// +/// Reads only the prefilter columns (currently the PK dictionary column), +/// applies filters, and returns a refined [RowSelection]. +pub(crate) async fn execute_prefilter( + prefilter_ctx: &mut PrefilterContext, + reader_builder: &RowGroupReaderBuilder, + build_ctx: &RowGroupBuildContext<'_>, +) -> Result<PrefilterResult> { + // Reads PK column only. + let mut pk_stream = reader_builder + .build_with_projection( + build_ctx.row_group_idx, + build_ctx.row_selection.clone(), + prefilter_ctx.pk_projection.clone(), + build_ctx.fetch_metrics, + ) + .await?; + + // Applies PK filter to each batch and collects matching row ranges.
+ let mut matched_row_ranges: Vec<Range<usize>> = Vec::new(); + let mut row_offset = 0; + let mut rows_before_filter = 0usize; + + while let Some(batch_result) = pk_stream.next().await { + let batch = batch_result.context(ReadParquetSnafu { + path: reader_builder.file_path(), + })?; + let batch_num_rows = batch.num_rows(); + if batch_num_rows == 0 { + continue; + } + rows_before_filter += batch_num_rows; + + let ranges = matching_row_ranges_by_primary_key( + &batch, + prefilter_ctx.pk_column_index, + prefilter_ctx.pk_filter.as_mut(), + )?; + matched_row_ranges.extend( + ranges + .into_iter() + .map(|range| (range.start + row_offset)..(range.end + row_offset)), + ); + row_offset += batch_num_rows; } - Ok(Some(pk_values.value(first_key as usize))) + // Converts matched ranges to RowSelection. + let rows_selected: usize = matched_row_ranges.iter().map(|r| r.end - r.start).sum(); + let filtered_rows = rows_before_filter.saturating_sub(rows_selected); + + let refined_selection = if rows_selected == 0 { + RowSelection::from(vec![]) + } else { + // Build the prefilter selection relative to the yielded rows + // (not total_rows), since matched_row_ranges are offsets within + // the rows actually read from the stream. + let prefilter_selection = + row_selection_from_row_ranges_exact(matched_row_ranges.into_iter(), rows_before_filter); + + // Use and_then to apply prefilter selection within the context + // of the original selection, since prefilter offsets are relative + // to the original selection's selected rows. + match &build_ctx.row_selection { + Some(original) => original.and_then(&prefilter_selection), + None => prefilter_selection, + } + }; + + Ok(PrefilterResult { + refined_selection, + filtered_rows, + }) } #[cfg(test)] @@ -245,175 +344,14 @@ mod tests { use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; - use api::v1::SemanticType; use common_recordbatch::filter::SimpleFilterEvaluator; use datafusion_expr::{col, lit}; - use datatypes::arrow::array::{ - ArrayRef, BinaryArray, DictionaryArray, TimestampMillisecondArray, UInt8Array, UInt32Array, - UInt64Array, - }; - use datatypes::arrow::datatypes::{Schema, UInt32Type}; - use datatypes::arrow::record_batch::RecordBatch; - use datatypes::prelude::ConcreteDataType; - use mito_codec::row_converter::{PrimaryKeyFilter, build_primary_key_codec}; + use mito_codec::row_converter::PrimaryKeyFilter; use store_api::codec::PrimaryKeyEncoding; - use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder}; - use store_api::storage::ColumnSchema; use super::*; - use crate::sst::internal_fields; use crate::sst::parquet::format::ReadFormat; - use crate::test_util::sst_util::{ - new_primary_key, sst_region_metadata, sst_region_metadata_with_encoding, - }; - - fn new_test_filters(exprs: &[datafusion_expr::Expr]) -> Vec<SimpleFilterEvaluator> { - exprs - .iter() - .filter_map(SimpleFilterEvaluator::try_new) - .collect() - } - - fn expected_metadata_with_reused_tag_name( - old_metadata: &RegionMetadata, - ) -> Arc<RegionMetadata> { - let mut builder = RegionMetadataBuilder::new(old_metadata.region_id); - builder - .push_column_metadata(ColumnMetadata { - column_schema: ColumnSchema::new( - "tag_0".to_string(), - ConcreteDataType::string_datatype(), - true, - ), - semantic_type: SemanticType::Tag, - column_id: 10, - }) - .push_column_metadata(ColumnMetadata { - column_schema: ColumnSchema::new( - "tag_1".to_string(), - ConcreteDataType::string_datatype(), - true, - ), - semantic_type: SemanticType::Tag, - column_id: 1, - }) - .push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new( - "field_0".to_string(), - ConcreteDataType::uint64_datatype(), - true, - ), - semantic_type: SemanticType::Field, - column_id: 2, - }) - .push_column_metadata(ColumnMetadata { - column_schema: ColumnSchema::new( - "ts".to_string(), - ConcreteDataType::timestamp_millisecond_datatype(), - false, - ), - semantic_type: SemanticType::Timestamp, - column_id: 3, - }) - .primary_key(vec![10, 1]); - - Arc::new(builder.build().unwrap()) - } - - fn new_raw_batch_with_metadata( - metadata: Arc<RegionMetadata>, - primary_keys: &[&[u8]], - field_values: &[u64], - ) -> RecordBatch { - assert_eq!(primary_keys.len(), field_values.len()); - - let arrow_schema = metadata.schema.arrow_schema(); - let field_column = arrow_schema - .field(arrow_schema.index_of("field_0").unwrap()) - .clone(); - let time_index_column = arrow_schema - .field(arrow_schema.index_of("ts").unwrap()) - .clone(); - let mut fields = vec![field_column, time_index_column]; - fields.extend( - internal_fields() - .into_iter() - .map(|field| field.as_ref().clone()), - ); - let schema = Arc::new(Schema::new(fields)); - - let mut dict_values = Vec::new(); - let mut keys = Vec::with_capacity(primary_keys.len()); - for pk in primary_keys { - let key = dict_values - .iter() - .position(|existing: &&[u8]| existing == pk) - .unwrap_or_else(|| { - dict_values.push(*pk); - dict_values.len() - 1 - }); - keys.push(key as u32); - } - - let pk_array: ArrayRef = Arc::new(DictionaryArray::<UInt32Type>::new( - UInt32Array::from(keys), - Arc::new(BinaryArray::from_iter_values(dict_values.iter().copied())), - )); - - RecordBatch::try_new( - schema, - vec![ - Arc::new(UInt64Array::from(field_values.to_vec())), - Arc::new(TimestampMillisecondArray::from_iter_values( - 0..primary_keys.len() as i64, - )), - pk_array, - Arc::new(UInt64Array::from(vec![1; primary_keys.len()])), - Arc::new(UInt8Array::from(vec![1; primary_keys.len()])), - ], - ) - .unwrap() - } - - fn new_raw_batch(primary_keys: &[&[u8]], field_values: &[u64]) -> RecordBatch { - new_raw_batch_with_metadata(Arc::new(sst_region_metadata()), primary_keys, field_values) - } - - fn field_values(batch: &RecordBatch) -> Vec<u64> { - batch - .column(0) - .as_any() - .downcast_ref::<UInt64Array>() - .unwrap() - .values() - .to_vec() - } - - #[test] - fn test_retain_usable_primary_key_filters_skips_non_tag_filters() { - let metadata = Arc::new(sst_region_metadata()); - let mut filters = - new_test_filters(&[col("field_0").eq(lit(1_u64)), col("ts").gt(lit(0_i64))]); - - retain_usable_primary_key_filters(&metadata, None, &mut filters); - - assert!(filters.is_empty()); - } - - #[test] - fn test_retain_usable_primary_key_filters_skips_reused_expected_tag_name() { - let metadata = Arc::new(sst_region_metadata()); - let expected_metadata = expected_metadata_with_reused_tag_name(&metadata); - let mut filters = new_test_filters(&[col("tag_0").eq(lit("b"))]); - - retain_usable_primary_key_filters( - &metadata, - Some(expected_metadata.as_ref()), - &mut filters, - ); - - assert!(filters.is_empty()); - } + use crate::test_util::sst_util::{new_primary_key, sst_region_metadata_with_encoding}; #[test] fn test_is_usable_primary_key_filter_skips_legacy_primary_key_batches() { @@ -435,52 +373,16 @@ mod tests { } #[test] - fn test_prefilter_primary_key_drops_single_dictionary_batch() { - let metadata = Arc::new(sst_region_metadata()); - let filters = Arc::new(new_test_filters(&[col("tag_0").eq(lit("b"))])); - let mut primary_key_filter = - build_primary_key_codec(metadata.as_ref()).primary_key_filter(&metadata, filters); - let pk_a =
new_primary_key(&["a", "x"]); - let batch = new_raw_batch(&[pk_a.as_slice(), pk_a.as_slice()], &[10, 11]); + fn test_is_usable_primary_key_filter_supports_partition_column_by_default() { + let metadata = Arc::new(sst_region_metadata_with_encoding( + PrimaryKeyEncoding::Sparse, + )); + let filter = SimpleFilterEvaluator::try_new( + &col(store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME).eq(lit(1_u32)), + ) + .unwrap(); - let filtered = - prefilter_flat_batch_by_primary_key(batch, primary_key_filter.as_mut()).unwrap(); - - assert!(filtered.is_none()); - } - - #[test] - fn test_prefilter_primary_key_builds_mask_for_fragmented_matches() { - let metadata = Arc::new(sst_region_metadata()); - let filters = Arc::new(new_test_filters(&[col("tag_0") - .eq(lit("a")) - .or(col("tag_0").eq(lit("c")))])); - let mut primary_key_filter = - build_primary_key_codec(metadata.as_ref()).primary_key_filter(&metadata, filters); - let pk_a = new_primary_key(&["a", "x"]); - let pk_b = new_primary_key(&["b", "x"]); - let pk_c = new_primary_key(&["c", "x"]); - let pk_d = new_primary_key(&["d", "x"]); - let batch = new_raw_batch( - &[ - pk_a.as_slice(), - pk_a.as_slice(), - pk_b.as_slice(), - pk_b.as_slice(), - pk_c.as_slice(), - pk_c.as_slice(), - pk_d.as_slice(), - pk_d.as_slice(), - ], - &[10, 11, 12, 13, 14, 15, 16, 17], - ); - - let filtered = prefilter_flat_batch_by_primary_key(batch, primary_key_filter.as_mut()) - .unwrap() - .unwrap(); - - assert_eq!(filtered.num_rows(), 4); - assert_eq!(field_values(&filtered), vec![10, 11, 14, 15]); + assert!(is_usable_primary_key_filter(&metadata, None, &filter)); } struct CountingPrimaryKeyFilter { @@ -489,9 +391,9 @@ mod tests { } impl PrimaryKeyFilter for CountingPrimaryKeyFilter { - fn matches(&mut self, pk: &[u8]) -> bool { + fn matches(&mut self, pk: &[u8]) -> mito_codec::error::Result<bool> { self.hits.fetch_add(1, Ordering::Relaxed); - pk == self.expected.as_slice() + Ok(pk == self.expected.as_slice()) } } @@ -504,25 +406,14 @@ mod tests { expected: expected.clone(), })); - assert!(filter.matches(expected.as_slice())); - assert!(filter.matches(expected.as_slice())); - assert!(!filter.matches(new_primary_key(&["b", "x"]).as_slice())); + assert!(filter.matches(expected.as_slice()).unwrap()); + assert!(filter.matches(expected.as_slice()).unwrap()); + assert!( + !filter + .matches(new_primary_key(&["b", "x"]).as_slice()) + .unwrap() + ); assert_eq!(hits.load(Ordering::Relaxed), 2); } - - #[test] - fn test_batch_single_primary_key() { - let pk_a = new_primary_key(&["a", "x"]); - let pk_b = new_primary_key(&["b", "x"]); - - let batch = new_raw_batch(&[pk_a.as_slice(), pk_a.as_slice()], &[10, 11]); - assert_eq!( - batch_single_primary_key(&batch).unwrap(), - Some(pk_a.as_slice()) - ); - - let batch = new_raw_batch(&[pk_a.as_slice(), pk_b.as_slice()], &[10, 11]); - assert_eq!(batch_single_primary_key(&batch).unwrap(), None); - } } diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index f152c97075..8832cd4a16 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -76,6 +76,9 @@ use crate::sst::parquet::file_range::{ }; use crate::sst::parquet::format::{ReadFormat, need_override_sequence}; use crate::sst::parquet::metadata::MetadataLoader; +use crate::sst::parquet::prefilter::{ + PrefilterContextBuilder, execute_prefilter, is_usable_primary_key_filter, +}; use crate::sst::parquet::row_group::ParquetFetchMetrics; use crate::sst::parquet::row_selection::RowGroupSelection; use
crate::sst::parquet::stats::RowGroupPruningStats; @@ -459,16 +462,6 @@ impl ParquetReaderBuilder { ArrowReaderMetadata::try_new(parquet_meta.clone(), arrow_reader_options) .context(ReadDataPartSnafu)?; - let reader_builder = RowGroupReaderBuilder { - file_handle: self.file_handle.clone(), - file_path, - parquet_meta, - arrow_metadata, - object_store: self.object_store.clone(), - projection: projection_mask, - cache_strategy: self.cache_strategy.clone(), - }; - let filters = if let Some(predicate) = &self.predicate { predicate .exprs() @@ -493,6 +486,33 @@ impl ParquetReaderBuilder { let codec = build_primary_key_codec(read_format.metadata()); + // Extract primary key filters from precomputed filter contexts for prefiltering. + let primary_key_filters = { + let pk_filters = filters + .iter() + .filter_map(SimpleFilterContext::primary_key_prefilter) + .collect::<Vec<_>>(); + (!pk_filters.is_empty()).then_some(Arc::new(pk_filters)) + }; + + let prefilter_builder = PrefilterContextBuilder::new( + &read_format, + &codec, + primary_key_filters.as_ref(), + parquet_meta.file_metadata().schema_descr(), + ); + + let reader_builder = RowGroupReaderBuilder { + file_handle: self.file_handle.clone(), + file_path, + parquet_meta, + arrow_metadata, + object_store: self.object_store.clone(), + projection: projection_mask, + cache_strategy: self.cache_strategy.clone(), + prefilter_builder, + }; + let partition_filter = self.build_partition_filter(&read_format, &prune_schema)?; let context = FileRangeContext::new( @@ -1658,6 +1678,25 @@ pub(crate) struct RowGroupReaderBuilder { projection: ProjectionMask, /// Cache. cache_strategy: CacheStrategy, + /// Pre-built prefilter state. `None` if prefiltering is not applicable. + prefilter_builder: Option<PrefilterContextBuilder>, +} + +/// Context passed to [RowGroupReaderBuilder::build()] carrying all information +/// needed for prefiltering decisions. +pub(crate) struct RowGroupBuildContext<'a> { + /// Simple filters pushed down. Used by prefilter on other columns. + #[allow(dead_code)] + pub(crate) filters: &'a [SimpleFilterContext], + /// Whether to skip field filters. Used by prefilter on other columns. + #[allow(dead_code)] + pub(crate) skip_fields: bool, + /// Index of the row group to read. + pub(crate) row_group_idx: usize, + /// Row selection for the row group. `None` means all rows. + pub(crate) row_selection: Option<RowSelection>, + /// Metrics for tracking fetch operations. + pub(crate) fetch_metrics: Option<&'a ParquetFetchMetrics>, } impl RowGroupReaderBuilder { @@ -1679,11 +1718,58 @@ impl RowGroupReaderBuilder { &self.cache_strategy } + pub(crate) fn has_flat_primary_key_prefilter(&self) -> bool { + self.prefilter_builder.is_some() + } + /// Builds a [ParquetRecordBatchStream] to read the row group at `row_group_idx`. + /// + /// If prefiltering is applicable (based on `build_ctx`), this performs a two-phase read: + /// 1. Reads only the prefilter columns (e.g. PK column), applies filters to get a refined row selection + /// 2. Reads the full projection with the refined row selection pub(crate) async fn build( + &self, + build_ctx: RowGroupBuildContext<'_>, + ) -> Result> { + let prefilter_ctx = self.prefilter_builder.as_ref().map(|b| b.build()); + + let Some(mut prefilter_ctx) = prefilter_ctx else { + // No prefilter applicable, build stream with full projection.
@@ -1739,6 +1825,8 @@ pub(crate) struct SimpleFilterContext {
     semantic_type: SemanticType,
     /// The data type of the column.
     data_type: ConcreteDataType,
+    /// Whether this filter can be applied by flat parquet primary-key prefiltering.
+    usable_primary_key_filter: bool,
 }
 
 impl SimpleFilterContext {
@@ -1752,6 +1840,10 @@ impl SimpleFilterContext {
         expr: &Expr,
     ) -> Option<Self> {
         let filter = SimpleFilterEvaluator::try_new(expr)?;
+        // Parquet PK prefilter always supports the partition column. Only
+        // PartitionTreeMemtable skips it after partition pruning.
+        let usable_primary_key_filter =
+            is_usable_primary_key_filter(sst_meta, expected_meta, &filter);
         let (column_metadata, maybe_filter) = match expected_meta {
             Some(meta) => {
                 // Gets the column metadata from the expected metadata.
@@ -1782,11 +1874,15 @@ impl SimpleFilterContext {
             }
         };
 
+        let usable_primary_key_filter =
+            matches!(maybe_filter, MaybeFilter::Filter(_)) && usable_primary_key_filter;
+
         Some(Self {
            filter: maybe_filter,
            column_id: column_metadata.column_id,
            semantic_type: column_metadata.semantic_type,
            data_type: column_metadata.column_schema.data_type.clone(),
+           usable_primary_key_filter,
        })
    }
@@ -1809,6 +1905,23 @@ impl SimpleFilterContext {
     pub(crate) fn data_type(&self) -> &ConcreteDataType {
         &self.data_type
     }
+
+    /// Returns whether this filter is eligible for flat parquet PK prefiltering.
+    pub(crate) fn usable_primary_key_filter(&self) -> bool {
+        self.usable_primary_key_filter
+    }
+
+    /// Returns the filter evaluator when it is eligible for PK prefiltering.
+    pub(crate) fn primary_key_prefilter(&self) -> Option<SimpleFilterEvaluator> {
+        if !self.usable_primary_key_filter {
+            return None;
+        }
+
+        match &self.filter {
+            MaybeFilter::Filter(filter) => Some(filter.clone()),
+            MaybeFilter::Matched | MaybeFilter::Pruned => None,
+        }
+    }
 }
 
 /// Prune a column by its default value.
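For context on the comment about partition columns above: the `skip_partition_column` flag added to `compile_filters` earlier in this patch controls whether partition-column predicates take part in primary-key filtering. A reduced sketch of that switch, with `"__table_id"` standing in for the value of `DATA_SCHEMA_TABLE_ID_COLUMN_NAME` and a plain name comparison standing in for the real `is_partition_column` check:

```rust
// Stand-in for is_partition_column; the real check compares against
// DATA_SCHEMA_TABLE_ID_COLUMN_NAME from store_api::metric_engine_consts.
fn is_partition_column(name: &str) -> bool {
    name == "__table_id"
}

/// Returns the column names whose filters get compiled against the primary key.
fn compile<'a>(filters: &[&'a str], skip_partition_column: bool) -> Vec<&'a str> {
    filters
        .iter()
        .copied()
        .filter(|col| !(skip_partition_column && is_partition_column(col)))
        .collect()
}

fn main() {
    let filters = ["__table_id", "tag_0"];
    // The parquet PK prefilter keeps partition-column filters...
    assert_eq!(compile(&filters, false), vec!["__table_id", "tag_0"]);
    // ...while PartitionTreeMemtable drops them after partition pruning.
    assert_eq!(compile(&filters, true), vec!["tag_0"]);
}
```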
@@ -1856,17 +1969,17 @@ impl ParquetReader {
             return Ok(None);
         };
 
+        let skip_fields = self.context.should_skip_fields(row_group_idx);
         let parquet_reader = self
             .context
             .reader_builder()
-            .build(
+            .build(self.context.build_context(
                 row_group_idx,
                 Some(row_selection),
                 Some(&self.fetch_metrics),
-            )
+                skip_fields,
+            ))
             .await?;
-
-        let skip_fields = self.context.should_skip_fields(row_group_idx);
         self.reader = Some(FlatPruneReader::new_with_row_group_reader(
             self.context.clone(),
             FlatRowGroupReader::new(self.context.clone(), parquet_reader),
@@ -1889,11 +2002,16 @@ impl ParquetReader {
         debug_assert!(context.read_format().as_flat().is_some());
         let fetch_metrics = ParquetFetchMetrics::default();
         let reader = if let Some((row_group_idx, row_selection)) = selection.pop_first() {
+            let skip_fields = context.should_skip_fields(row_group_idx);
             let parquet_reader = context
                 .reader_builder()
-                .build(row_group_idx, Some(row_selection), Some(&fetch_metrics))
+                .build(context.build_context(
+                    row_group_idx,
+                    Some(row_selection),
+                    Some(&fetch_metrics),
+                    skip_fields,
+                ))
                 .await?;
-            let skip_fields = context.should_skip_fields(row_group_idx);
             Some(FlatPruneReader::new_with_row_group_reader(
                 context.clone(),
                 FlatRowGroupReader::new(context.clone(), parquet_reader),
@@ -2111,11 +2229,15 @@ mod tests {
     use datafusion_expr::expr::ScalarFunction;
     use datafusion_expr::{
         ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
+        col, lit,
     };
     use datatypes::arrow::array::{ArrayRef, Int64Array};
     use datatypes::arrow::record_batch::RecordBatch;
+    use datatypes::prelude::ConcreteDataType;
+    use datatypes::schema::ColumnSchema;
     use object_store::services::Memory;
     use parquet::arrow::ArrowWriter;
+    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
     use store_api::region_request::PathType;
     use table::predicate::Predicate;
 
@@ -2207,4 +2329,80 @@ mod tests {
 
         assert!(!selection.is_empty());
     }
+
+    fn expected_metadata_with_reused_tag_name(
+        old_metadata: &RegionMetadata,
+    ) -> Arc<RegionMetadata> {
+        let mut builder = RegionMetadataBuilder::new(old_metadata.region_id);
+        builder
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new(
+                    "tag_0".to_string(),
+                    ConcreteDataType::string_datatype(),
+                    true,
+                ),
+                semantic_type: SemanticType::Tag,
+                column_id: 10,
+            })
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new(
+                    "tag_1".to_string(),
+                    ConcreteDataType::string_datatype(),
+                    true,
+                ),
+                semantic_type: SemanticType::Tag,
+                column_id: 1,
+            })
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new(
+                    "field_0".to_string(),
+                    ConcreteDataType::uint64_datatype(),
+                    true,
+                ),
+                semantic_type: SemanticType::Field,
+                column_id: 2,
+            })
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new(
+                    "ts".to_string(),
+                    ConcreteDataType::timestamp_millisecond_datatype(),
+                    false,
+                ),
+                semantic_type: SemanticType::Timestamp,
+                column_id: 3,
+            })
+            .primary_key(vec![10, 1]);
+
+        Arc::new(builder.build().unwrap())
+    }
+
+    #[test]
+    fn test_simple_filter_context_marks_usable_primary_key_filter() {
+        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
+        let ctx =
+            SimpleFilterContext::new_opt(&metadata, None, &col("tag_0").eq(lit("a"))).unwrap();
+
+        assert!(ctx.usable_primary_key_filter());
+        assert!(ctx.primary_key_prefilter().is_some());
+    }
+
+    #[test]
+    fn test_simple_filter_context_skips_non_usable_primary_key_filter() {
+        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
+
+        let field_ctx =
+            SimpleFilterContext::new_opt(&metadata, None, &col("field_0").eq(lit(1_u64))).unwrap();
+        assert!(!field_ctx.usable_primary_key_filter());
+        assert!(field_ctx.primary_key_prefilter().is_none());
+
+        let expected_metadata = expected_metadata_with_reused_tag_name(metadata.as_ref());
+        let mismatched_ctx = SimpleFilterContext::new_opt(
+            &metadata,
+            Some(expected_metadata.as_ref()),
+            &col("tag_0").eq(lit("a")),
+        )
+        .unwrap();
+        assert!(!mismatched_ctx.usable_primary_key_filter());
+        assert!(mismatched_ctx.primary_key_prefilter().is_none());
+    }
 }
diff --git a/src/mito2/src/sst/parquet/row_group.rs b/src/mito2/src/sst/parquet/row_group.rs
index 38ef62c6b8..8822882c5d 100644
--- a/src/mito2/src/sst/parquet/row_group.rs
+++ b/src/mito2/src/sst/parquet/row_group.rs
@@ -48,12 +48,16 @@ pub struct ParquetFetchMetricsData {
     pub store_fetch_elapsed: std::time::Duration,
     /// Total elapsed time for fetching row groups.
     pub total_fetch_elapsed: std::time::Duration,
+    /// Elapsed time for prefilter execution.
+    pub prefilter_cost: std::time::Duration,
+    /// Number of rows filtered out by prefiltering.
+    pub prefilter_filtered_rows: usize,
 }
 
 impl ParquetFetchMetricsData {
     /// Returns true if the metrics are empty (contain no meaningful data).
     fn is_empty(&self) -> bool {
-        self.total_fetch_elapsed.is_zero()
+        self.total_fetch_elapsed.is_zero() && self.prefilter_cost.is_zero()
     }
 }
 
@@ -84,6 +88,8 @@ impl std::fmt::Debug for ParquetFetchMetrics {
             write_cache_fetch_elapsed,
             store_fetch_elapsed,
             total_fetch_elapsed,
+            prefilter_cost,
+            prefilter_filtered_rows,
         } = *data;
 
         write!(f, "{{")?;
@@ -142,6 +148,16 @@ impl std::fmt::Debug for ParquetFetchMetrics {
         if !store_fetch_elapsed.is_zero() {
             write!(f, ", \"store_fetch_elapsed\":\"{:?}\"", store_fetch_elapsed)?;
         }
+        if !prefilter_cost.is_zero() {
+            write!(f, ", \"prefilter_cost\":\"{:?}\"", prefilter_cost)?;
+        }
+        if prefilter_filtered_rows > 0 {
+            write!(
+                f,
+                ", \"prefilter_filtered_rows\":{}",
+                prefilter_filtered_rows
+            )?;
+        }
 
         write!(f, "}}")
     }
@@ -169,6 +185,8 @@ impl ParquetFetchMetrics {
             write_cache_fetch_elapsed,
             store_fetch_elapsed,
             total_fetch_elapsed,
+            prefilter_cost,
+            prefilter_filtered_rows,
         } = *other.data.lock().unwrap();
 
         let mut data = self.data.lock().unwrap();
@@ -185,6 +203,8 @@ impl ParquetFetchMetrics {
         data.write_cache_fetch_elapsed += write_cache_fetch_elapsed;
         data.store_fetch_elapsed += store_fetch_elapsed;
         data.total_fetch_elapsed += total_fetch_elapsed;
+        data.prefilter_cost += prefilter_cost;
+        data.prefilter_filtered_rows += prefilter_filtered_rows;
     }
 }
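The metrics plumbing above is mechanical; the one subtlety is that per-reader metrics are merged into a shared accumulator. A reduced model with only the two new counters (`PrefilterMetrics` is an illustrative name; the real struct is `ParquetFetchMetricsData`, which carries many more fields behind a `Mutex` inside `ParquetFetchMetrics`):

```rust
use std::time::{Duration, Instant};

// Reduced model of the two counters added in this patch.
#[derive(Debug, Default)]
struct PrefilterMetrics {
    prefilter_cost: Duration,
    prefilter_filtered_rows: usize,
}

impl PrefilterMetrics {
    /// Accumulates another reader's metrics, mirroring the merge logic above.
    fn merge_from(&mut self, other: &PrefilterMetrics) {
        self.prefilter_cost += other.prefilter_cost;
        self.prefilter_filtered_rows += other.prefilter_filtered_rows;
    }
}

fn main() {
    let start = Instant::now();
    // ... prefilter work for one row group would run here ...
    let per_row_group = PrefilterMetrics {
        prefilter_cost: start.elapsed(),
        prefilter_filtered_rows: 128,
    };

    let mut total = PrefilterMetrics::default();
    total.merge_from(&per_row_group);
    assert_eq!(total.prefilter_filtered_rows, 128);
}
```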
diff --git a/src/mito2/src/sst/parquet/row_selection.rs b/src/mito2/src/sst/parquet/row_selection.rs
index 595f1d352a..763e244ef2 100644
--- a/src/mito2/src/sst/parquet/row_selection.rs
+++ b/src/mito2/src/sst/parquet/row_selection.rs
@@ -554,11 +554,43 @@ fn intersect_row_selections(left: &RowSelection, right: &RowSelection) -> RowSel
 /// or if there's a gap that requires skipping rows. It handles both "select" and "skip" actions,
 /// optimizing the list of selectors by merging contiguous actions of the same type.
 ///
+/// The returned selection intentionally stops at the end of the last matched range and may omit a
+/// trailing `skip` that would extend it to `total_row_count`. That is fine when the selection is
+/// used directly by the parquet reader, which simply stops once the selectors are exhausted.
+///
 /// Note: overlapping ranges are not supported and will result in an incorrect selection.
 pub(crate) fn row_selection_from_row_ranges(
     row_ranges: impl Iterator<Item = Range<usize>>,
     total_row_count: usize,
 ) -> RowSelection {
+    let (selectors, _) = build_selectors_from_row_ranges(row_ranges, total_row_count);
+    RowSelection::from(selectors)
+}
+
+/// Like [`row_selection_from_row_ranges`] but guarantees the resulting selection
+/// covers exactly `total_row_count` rows by appending a trailing skip if needed.
+///
+/// Required when the result is used as the inner operand of [`RowSelection::and_then`], because
+/// `and_then` expects the inner selection to account for every row selected by the outer one.
+pub(crate) fn row_selection_from_row_ranges_exact(
+    row_ranges: impl Iterator<Item = Range<usize>>,
+    total_row_count: usize,
+) -> RowSelection {
+    let (mut selectors, last_processed_end) =
+        build_selectors_from_row_ranges(row_ranges, total_row_count);
+    if last_processed_end < total_row_count {
+        // Preserve the full logical length of the selection even when the final rows are all
+        // filtered out. Without this trailing skip, `and_then` sees an undersized inner
+        // selection and panics.
+        add_or_merge_selector(&mut selectors, total_row_count - last_processed_end, true);
+    }
+    RowSelection::from(selectors)
+}
+
+fn build_selectors_from_row_ranges(
+    row_ranges: impl Iterator<Item = Range<usize>>,
+    total_row_count: usize,
+) -> (Vec<RowSelector>, usize) {
     let mut selectors: Vec<RowSelector> = Vec::new();
     let mut last_processed_end = 0;
 
@@ -572,7 +604,7 @@ pub(crate) fn row_selection_from_row_ranges(
         last_processed_end = end;
     }
 
-    RowSelection::from(selectors)
+    (selectors, last_processed_end)
 }
 
 /// Converts an iterator of sorted row IDs into a `RowSelection`.
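The `and_then` contract that motivates `row_selection_from_row_ranges_exact` can be demonstrated directly with the parquet crate's `RowSelection` API; this sketch mirrors `test_exact_compatible_with_and_then` in the test hunk below. The inner selection must account for exactly as many rows as the outer one selects:

```rust
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

fn main() {
    // Outer selection: of 10 rows, earlier pruning kept the first 6.
    let outer = RowSelection::from(vec![RowSelector::select(6), RowSelector::skip(4)]);
    // Inner selection over those 6 surviving rows: keep rows 0..3. The trailing
    // skip(3) is what row_selection_from_row_ranges_exact guarantees; without
    // it the inner selection would cover fewer than 6 rows and, per the doc
    // comment above, `and_then` would panic.
    let inner = RowSelection::from(vec![RowSelector::select(3), RowSelector::skip(3)]);
    let refined = outer.and_then(&inner);
    assert_eq!(
        refined,
        RowSelection::from(vec![RowSelector::select(3), RowSelector::skip(7)])
    );
}
```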
@@ -707,6 +739,56 @@ mod tests {
         assert_eq!(selection, expected);
     }
 
+    #[test]
+    fn test_exact_single_range_with_trailing_skip() {
+        let selection = row_selection_from_row_ranges_exact(Some(0..3).into_iter(), 6);
+        let expected = RowSelection::from(vec![RowSelector::select(3), RowSelector::skip(3)]);
+        assert_eq!(selection, expected);
+        assert_eq!(selection.row_count(), 3);
+    }
+
+    #[test]
+    fn test_exact_non_contiguous_ranges() {
+        let ranges = [1..3, 5..8];
+        let selection = row_selection_from_row_ranges_exact(ranges.iter().cloned(), 10);
+        let expected = RowSelection::from(vec![
+            RowSelector::skip(1),
+            RowSelector::select(2),
+            RowSelector::skip(2),
+            RowSelector::select(3),
+            RowSelector::skip(2),
+        ]);
+        assert_eq!(selection, expected);
+        assert_eq!(selection.row_count(), 5);
+    }
+
+    #[test]
+    fn test_exact_empty_ranges() {
+        let selection = row_selection_from_row_ranges_exact([].iter().cloned(), 10);
+        let expected = RowSelection::from(vec![RowSelector::skip(10)]);
+        assert_eq!(selection, expected);
+        assert_eq!(selection.row_count(), 0);
+    }
+
+    #[test]
+    fn test_exact_range_covers_all_rows() {
+        let selection = row_selection_from_row_ranges_exact(Some(0..10).into_iter(), 10);
+        let expected = RowSelection::from(vec![RowSelector::select(10)]);
+        assert_eq!(selection, expected);
+        assert_eq!(selection.row_count(), 10);
+    }
+
+    #[test]
+    fn test_exact_compatible_with_and_then() {
+        // Outer selects rows 0..6 out of 10.
+        let outer = RowSelection::from(vec![RowSelector::select(6), RowSelector::skip(4)]);
+        // Inner: within those 6 rows, select only rows 0..3.
+        let inner = row_selection_from_row_ranges_exact(Some(0..3).into_iter(), 6);
+        let result = outer.and_then(&inner);
+        let expected = RowSelection::from(vec![RowSelector::select(3), RowSelector::skip(7)]);
+        assert_eq!(result, expected);
+    }
+
     #[test]
     fn test_row_ids_to_selection() {
         let row_ids = [1, 3, 5, 7, 9].into_iter();