diff --git a/src/mito2/src/read/prune.rs b/src/mito2/src/read/prune.rs index 87240f94b4..18177c8edd 100644 --- a/src/mito2/src/read/prune.rs +++ b/src/mito2/src/read/prune.rs @@ -314,18 +314,21 @@ impl FlatPruneReader { } pub(crate) fn next_batch(&mut self) -> Result> { - while let Some(raw_batch) = { + loop { let start = std::time::Instant::now(); - let batch = self.source.next_raw_batch()?; - self.metrics.scan_cost += start.elapsed(); - batch - } { - // Update metrics for the received batch + let Some(raw_batch) = self.source.next_raw_batch()? else { + return Ok(None); + }; + + // Account rows as soon as parquet yields a raw batch. The scan timer spans + // the raw read, encoded primary-key prefilter, and flat conversion so + // `scan_cost` keeps the same meaning after splitting `next_raw_batch()` + // from `convert_batch()`. self.metrics.num_rows += raw_batch.num_rows(); - self.metrics.num_batches += 1; let num_rows_before_prefilter = raw_batch.num_rows(); let Some(prefiltered_batch) = self.prefilter_primary_keys(raw_batch)? else { + self.metrics.scan_cost += start.elapsed(); self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_prefilter; continue; }; @@ -333,7 +336,10 @@ impl FlatPruneReader { self.metrics.filter_metrics.rows_precise_filtered += prefiltered_rows; let record_batch = self.source.convert_batch(prefiltered_batch)?; + self.metrics.scan_cost += start.elapsed(); + // `num_batches` counts decoded flat batches, not raw parquet batches. + self.metrics.num_batches += 1; match self.prune_flat(record_batch)? { Some(filtered_batch) => { return Ok(Some(filtered_batch)); @@ -343,8 +349,6 @@ impl FlatPruneReader { } } } - - Ok(None) } fn prefilter_primary_keys(&mut self, record_batch: RecordBatch) -> Result> { diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs index e9c90b9103..64e894da0e 100644 --- a/src/mito2/src/sst/parquet/file_range.rs +++ b/src/mito2/src/sst/parquet/file_range.rs @@ -29,6 +29,7 @@ use datatypes::arrow::buffer::BooleanBuffer; use datatypes::arrow::record_batch::RecordBatch; use datatypes::prelude::ConcreteDataType; use datatypes::schema::Schema; +use mito_codec::primary_key_filter::is_partition_column; use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec, PrimaryKeyFilter}; use parquet::arrow::arrow_reader::RowSelection; use parquet::file::metadata::ParquetMetaData; @@ -491,12 +492,31 @@ impl TagDecodeState { } impl RangeBase { + fn has_usable_primary_key_filter(&self) -> bool { + let Some(filters) = &self.primary_key_filters else { + return false; + }; + let metadata = self.read_format.metadata(); + + filters.iter().any(|filter| { + !is_partition_column(filter.column_name()) + && metadata + .column_by_name(filter.column_name()) + .is_some_and(|column| { + column.semantic_type == SemanticType::Tag + && metadata.primary_key_index(column.column_id).is_some() + }) + }) + } + /// Builds an encoded primary-key filter for flat scan pre-filtering. pub(crate) fn new_primary_key_filter(&self) -> Option> { - let filters = self.primary_key_filters.as_ref()?; - if self.read_format.metadata().primary_key.is_empty() { + if self.read_format.metadata().primary_key.is_empty() + || !self.has_usable_primary_key_filter() + { return None; } + let filters = self.primary_key_filters.as_ref()?; Some( self.codec @@ -1071,12 +1091,19 @@ mod tests { }; use datatypes::arrow::datatypes::UInt32Type; use mito_codec::row_converter::build_primary_key_codec; + use store_api::codec::PrimaryKeyEncoding; + use store_api::metadata::RegionMetadata; use super::*; - use crate::test_util::sst_util::{new_primary_key, sst_region_metadata}; + use crate::test_util::sst_util::{ + new_primary_key, new_sparse_primary_key, sst_region_metadata, + sst_region_metadata_with_encoding, + }; - fn new_test_range_base(exprs: &[Expr]) -> RangeBase { - let metadata = Arc::new(sst_region_metadata()); + fn new_test_range_base_with_metadata( + metadata: Arc, + exprs: &[Expr], + ) -> RangeBase { let read_format = ReadFormat::new_flat( metadata.clone(), metadata.column_metadatas.iter().map(|c| c.column_id), @@ -1106,10 +1133,17 @@ mod tests { } } - fn new_raw_batch(primary_keys: &[&[u8]], field_values: &[u64]) -> RecordBatch { + fn new_test_range_base(exprs: &[Expr]) -> RangeBase { + new_test_range_base_with_metadata(Arc::new(sst_region_metadata()), exprs) + } + + fn new_raw_batch_with_metadata( + metadata: Arc, + primary_keys: &[&[u8]], + field_values: &[u64], + ) -> RecordBatch { assert_eq!(primary_keys.len(), field_values.len()); - let metadata = Arc::new(sst_region_metadata()); let schema = ReadFormat::new_flat( metadata.clone(), metadata.column_metadatas.iter().map(|c| c.column_id), @@ -1154,6 +1188,10 @@ mod tests { .unwrap() } + fn new_raw_batch(primary_keys: &[&[u8]], field_values: &[u64]) -> RecordBatch { + new_raw_batch_with_metadata(Arc::new(sst_region_metadata()), primary_keys, field_values) + } + fn field_values(batch: &RecordBatch) -> Vec { batch .column(0) @@ -1164,6 +1202,13 @@ mod tests { .to_vec() } + #[test] + fn test_new_primary_key_filter_skips_non_tag_filters() { + let base = new_test_range_base(&[col("field_0").eq(lit(1_u64)), col("ts").gt(lit(0_i64))]); + + assert!(base.new_primary_key_filter().is_none()); + } + #[test] fn test_prefilter_primary_key_drops_single_dictionary_batch() { let pk_a = new_primary_key(&["a", "x"]); @@ -1239,4 +1284,33 @@ mod tests { assert_eq!(filtered.num_rows(), 4); assert_eq!(field_values(&filtered), vec![10, 11, 14, 15]); } + + #[test] + fn test_prefilter_primary_key_sparse_path() { + let metadata = Arc::new(sst_region_metadata_with_encoding( + PrimaryKeyEncoding::Sparse, + )); + let pk_a = new_sparse_primary_key(&["a", "x"], &metadata, 1, 11); + let pk_b = new_sparse_primary_key(&["b", "x"], &metadata, 1, 22); + let batch = new_raw_batch_with_metadata( + metadata.clone(), + &[ + pk_a.as_slice(), + pk_a.as_slice(), + pk_b.as_slice(), + pk_b.as_slice(), + ], + &[10, 11, 12, 13], + ); + let base = new_test_range_base_with_metadata(metadata, &[col("tag_0").eq(lit("b"))]); + let mut primary_key_filter = base.new_primary_key_filter().unwrap(); + + let filtered = base + .prefilter_flat_batch_by_primary_key(batch, primary_key_filter.as_mut()) + .unwrap() + .unwrap(); + + assert_eq!(filtered.num_rows(), 2); + assert_eq!(field_values(&filtered), vec![12, 13]); + } } diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 1fe0cf8daa..1f80d1ee02 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -1633,11 +1633,12 @@ pub struct ReaderMetrics { pub(crate) filter_metrics: ReaderFilterMetrics, /// Duration to build the parquet reader. pub(crate) build_cost: Duration, - /// Duration to scan the reader. + /// Duration to scan the reader, including parquet fetches and decoding work + /// needed to materialize output batches. pub(crate) scan_cost: Duration, /// Number of record batches read. pub(crate) num_record_batches: usize, - /// Number of batches decoded. + /// Number of decoded output batches materialized from parquet data. pub(crate) num_batches: usize, /// Number of rows read. pub(crate) num_rows: usize,