From b0edcefb17db564f9e64f344a0c11ac8c4a44c1c Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 20 Mar 2026 12:05:59 +0800 Subject: [PATCH] cache pk match reuse Signed-off-by: Ruihang Xia --- src/mito2/src/memtable/bulk/part_reader.rs | 1 + src/mito2/src/read/prune.rs | 166 +++++++++++++++++---- src/mito2/src/sst/parquet/file_range.rs | 58 ++++++- 3 files changed, 197 insertions(+), 28 deletions(-) diff --git a/src/mito2/src/memtable/bulk/part_reader.rs b/src/mito2/src/memtable/bulk/part_reader.rs index 1e9d955321..f5bb47d2de 100644 --- a/src/mito2/src/memtable/bulk/part_reader.rs +++ b/src/mito2/src/memtable/bulk/part_reader.rs @@ -368,6 +368,7 @@ fn apply_combined_filters( let predicate_mask = context.base.compute_filter_mask_flat( &record_batch, skip_fields, + None, &mut tag_decode_state, )?; // If predicate filters out the entire batch, return None early diff --git a/src/mito2/src/read/prune.rs b/src/mito2/src/read/prune.rs index 881f1f0836..4dc0dc51dd 100644 --- a/src/mito2/src/read/prune.rs +++ b/src/mito2/src/read/prune.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; use std::ops::BitAnd; use std::sync::Arc; @@ -19,16 +20,20 @@ use common_recordbatch::filter::SimpleFilterEvaluator; use common_time::Timestamp; use datatypes::arrow::array::BooleanArray; use datatypes::arrow::buffer::BooleanBuffer; +use datatypes::arrow::compute::concat_batches; use datatypes::arrow::record_batch::RecordBatch; use mito_codec::row_converter::PrimaryKeyFilter; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; +use store_api::storage::ColumnId; -use crate::error::{RecordBatchSnafu, Result}; +use crate::error::{ComputeArrowSnafu, RecordBatchSnafu, Result, UnexpectedSnafu}; use crate::memtable::BoxedBatchIterator; use crate::read::last_row::{FlatRowGroupLastRowCachedReader, RowGroupLastRowCachedReader}; use crate::read::{Batch, BatchReader}; use crate::sst::file::FileTimeRange; use crate::sst::parquet::file_range::FileRangeContextRef; +use crate::sst::parquet::flat_format::primary_key_column_index; +use crate::sst::parquet::format::PrimaryKeyArray; use crate::sst::parquet::reader::{FlatRowGroupReader, ReaderMetrics, RowGroupReader}; pub enum Source { @@ -263,12 +268,75 @@ impl FlatSource { } } +struct CachedPrimaryKeyFilter { + inner: Box, + last_primary_key: Vec, + last_match: Option, +} + +impl CachedPrimaryKeyFilter { + fn new(inner: Box) -> Self { + Self { + inner, + last_primary_key: Vec::new(), + last_match: None, + } + } +} + +impl PrimaryKeyFilter for CachedPrimaryKeyFilter { + fn matches(&mut self, pk: &[u8]) -> bool { + if let Some(last_match) = self.last_match + && self.last_primary_key == pk + { + return last_match; + } + + let matched = self.inner.matches(pk); + self.last_primary_key.clear(); + self.last_primary_key.extend_from_slice(pk); + self.last_match = Some(matched); + matched + } +} + +fn batch_single_primary_key(batch: &RecordBatch) -> Result> { + let primary_key_index = primary_key_column_index(batch.num_columns()); + let pk_dict_array = batch + .column(primary_key_index) + .as_any() + .downcast_ref::() + .context(UnexpectedSnafu { + reason: "Primary key column is not a dictionary array".to_string(), + })?; + let pk_values = pk_dict_array + .values() + .as_any() + .downcast_ref::() + .context(UnexpectedSnafu { + reason: "Primary key values are not binary array".to_string(), + })?; + let keys = pk_dict_array.keys(); + if keys.is_empty() { + return Ok(None); + } + + let first_key = keys.value(0); + if first_key != keys.value(keys.len() - 1) { + return Ok(None); + } + + Ok(Some(pk_values.value(first_key as usize))) +} + /// A flat format reader that returns RecordBatch instead of Batch. pub struct FlatPruneReader { /// Context for file ranges. context: FileRangeContextRef, source: FlatSource, - primary_key_filter: Option>, + primary_key_filter: Option, + covered_primary_key_filter_columns: Option>, + buffered_prefiltered_batch: Option, metrics: ReaderMetrics, /// Whether to skip field filters for this row group. skip_fields: bool, @@ -281,7 +349,11 @@ impl FlatPruneReader { skip_fields: bool, ) -> Self { Self { - primary_key_filter: ctx.new_primary_key_filter(), + primary_key_filter: ctx + .new_primary_key_filter() + .map(CachedPrimaryKeyFilter::new), + covered_primary_key_filter_columns: ctx.covered_primary_key_filter_columns(), + buffered_prefiltered_batch: None, context: ctx, source: FlatSource::RowGroup(reader), metrics: Default::default(), @@ -296,6 +368,8 @@ impl FlatPruneReader { ) -> Self { Self { primary_key_filter: None, + covered_primary_key_filter_columns: None, + buffered_prefiltered_batch: None, context: ctx, source: FlatSource::LastRow(reader), metrics: Default::default(), @@ -309,25 +383,17 @@ impl FlatPruneReader { } pub(crate) fn next_batch(&mut self) -> Result> { - while let Some(raw_batch) = { - let start = std::time::Instant::now(); - let batch = self.source.next_raw_batch()?; - self.metrics.scan_cost += start.elapsed(); - batch - } { - self.metrics.num_rows += raw_batch.num_rows(); - self.metrics.num_batches += 1; - - let num_rows_before_prefilter = raw_batch.num_rows(); - let Some(prefiltered_batch) = self.prefilter_primary_keys(raw_batch)? else { - self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_prefilter; - continue; + loop { + let Some(mut raw_batch) = self.next_prefiltered_batch()? else { + return Ok(None); }; - let prefiltered_rows = num_rows_before_prefilter - prefiltered_batch.num_rows(); - self.metrics.filter_metrics.rows_precise_filtered += prefiltered_rows; - let record_batch = self.source.convert_batch(prefiltered_batch)?; + let scan_start = std::time::Instant::now(); + self.coalesce_prefiltered_batches(&mut raw_batch)?; + let record_batch = self.source.convert_batch(raw_batch)?; + self.metrics.scan_cost += scan_start.elapsed(); + self.metrics.num_batches += 1; match self.prune_flat(record_batch)? { Some(filtered_batch) => { return Ok(Some(filtered_batch)); @@ -337,8 +403,56 @@ impl FlatPruneReader { } } } + } - Ok(None) + fn next_prefiltered_batch(&mut self) -> Result> { + if let Some(batch) = self.buffered_prefiltered_batch.take() { + return Ok(Some(batch)); + } + + loop { + let start = std::time::Instant::now(); + let Some(raw_batch) = self.source.next_raw_batch()? else { + return Ok(None); + }; + + self.metrics.num_rows += raw_batch.num_rows(); + + let num_rows_before_prefilter = raw_batch.num_rows(); + let Some(prefiltered_batch) = self.prefilter_primary_keys(raw_batch)? else { + self.metrics.scan_cost += start.elapsed(); + self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_prefilter; + continue; + }; + let prefiltered_rows = num_rows_before_prefilter - prefiltered_batch.num_rows(); + self.metrics.filter_metrics.rows_precise_filtered += prefiltered_rows; + self.metrics.scan_cost += start.elapsed(); + return Ok(Some(prefiltered_batch)); + } + } + + fn coalesce_prefiltered_batches(&mut self, batch: &mut RecordBatch) -> Result<()> { + let Some(primary_key) = batch_single_primary_key(batch)? else { + return Ok(()); + }; + let primary_key = primary_key.to_vec(); + let schema = batch.schema(); + let mut batches = vec![batch.clone()]; + + while let Some(next_batch) = self.next_prefiltered_batch()? { + if batch_single_primary_key(&next_batch)? == Some(primary_key.as_slice()) { + batches.push(next_batch); + } else { + self.buffered_prefiltered_batch = Some(next_batch); + break; + } + } + + if batches.len() > 1 { + *batch = concat_batches(&schema, &batches).context(ComputeArrowSnafu)?; + } + + Ok(()) } fn prefilter_primary_keys(&mut self, record_batch: RecordBatch) -> Result> { @@ -347,7 +461,7 @@ impl FlatPruneReader { }; self.context - .prefilter_flat_batch_by_primary_key(record_batch, primary_key_filter.as_mut()) + .prefilter_flat_batch_by_primary_key(record_batch, primary_key_filter) } /// Prunes batches by the pushed down predicate and returns RecordBatch. @@ -358,9 +472,11 @@ impl FlatPruneReader { } let num_rows_before_filter = record_batch.num_rows(); - let Some(filtered_batch) = self - .context - .precise_filter_flat(record_batch, self.skip_fields)? + let Some(filtered_batch) = self.context.precise_filter_flat( + record_batch, + self.skip_fields, + self.covered_primary_key_filter_columns.as_ref(), + )? else { // the entire batch is filtered out self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_filter; diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs index 5a4681ae22..802d8f15b1 100644 --- a/src/mito2/src/sst/parquet/file_range.rs +++ b/src/mito2/src/sst/parquet/file_range.rs @@ -15,7 +15,7 @@ //! Structs and functions for reading ranges from a parquet file. A file range //! is usually a row group in a parquet file. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::ops::{BitAnd, Range}; use std::sync::Arc; @@ -441,6 +441,12 @@ impl FileRangeContext { self.base.new_primary_key_filter() } + /// Returns tag columns whose simple filters are already guaranteed by the + /// encoded primary-key prefilter. + pub(crate) fn covered_primary_key_filter_columns(&self) -> Option> { + self.base.covered_primary_key_filter_columns() + } + /// Returns true if a partition filter is configured. pub(crate) fn has_partition_filter(&self) -> bool { self.base.partition_filter.is_some() @@ -484,8 +490,10 @@ impl FileRangeContext { &self, input: RecordBatch, skip_fields: bool, + skip_tag_filter_columns: Option<&HashSet>, ) -> Result> { - self.base.precise_filter_flat(input, skip_fields) + self.base + .precise_filter_flat(input, skip_fields, skip_tag_filter_columns) } /// Applies an encoded primary-key prefilter to the input `RecordBatch`. @@ -660,6 +668,32 @@ impl RangeBase { ) } + pub(crate) fn covered_primary_key_filter_columns(&self) -> Option> { + if self.read_format.metadata().primary_key.is_empty() + || !self + .read_format + .as_flat() + .is_some_and(|format| format.raw_batch_has_primary_key_dictionary()) + { + return None; + } + + let sst_metadata = self.read_format.metadata(); + let expected_metadata = self.expected_metadata.as_deref(); + let filters = self.usable_primary_key_filters()?; + let column_ids = filters + .iter() + .filter_map(|filter| { + expected_metadata + .and_then(|metadata| metadata.column_by_name(filter.column_name())) + .or_else(|| sst_metadata.column_by_name(filter.column_name())) + .map(|column| column.column_id) + }) + .collect::>(); + + (!column_ids.is_empty()).then_some(column_ids) + } + /// Applies an encoded primary-key prefilter before flat-row materialization. /// /// This only prunes rows that are guaranteed to fail simple primary-key predicates. @@ -885,15 +919,25 @@ impl RangeBase { &self, input: RecordBatch, skip_fields: bool, + skip_tag_filter_columns: Option<&HashSet>, ) -> Result> { let mut tag_decode_state = TagDecodeState::new(); - let mask = self.compute_filter_mask_flat(&input, skip_fields, &mut tag_decode_state)?; + let mask = self.compute_filter_mask_flat( + &input, + skip_fields, + skip_tag_filter_columns, + &mut tag_decode_state, + )?; // If mask is None, the entire batch is filtered out let Some(mut mask) = mask else { return Ok(None); }; + if self.partition_filter.is_none() && mask.count_set_bits() == input.num_rows() { + return Ok(Some(input)); + } + // Apply partition filter if let Some(partition_filter) = &self.partition_filter { let record_batch = self.project_record_batch_for_pruning_flat( @@ -932,6 +976,7 @@ impl RangeBase { &self, input: &RecordBatch, skip_fields: bool, + skip_tag_filter_columns: Option<&HashSet>, tag_decode_state: &mut TagDecodeState, ) -> Result> { let mut mask = BooleanBuffer::new_set(input.num_rows()); @@ -959,6 +1004,13 @@ impl RangeBase { continue; } + if skip_tag_filter_columns.is_some_and(|columns| { + filter_ctx.semantic_type() == SemanticType::Tag + && columns.contains(&filter_ctx.column_id()) + }) { + continue; + } + // Get the column directly by its projected index. // If the column is missing and it's not a tag/time column, this filter is skipped. // Assumes the projection indices align with the input batch schema.