From 7c137f270fe88cd66fabb50b0926dc517545f906 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Sat, 21 Mar 2026 03:47:02 +0800 Subject: [PATCH] some simplifications Signed-off-by: Ruihang Xia --- src/mito2/src/memtable/bulk/part_reader.rs | 1 - src/mito2/src/read/prune.rs | 13 +- src/mito2/src/sst/parquet/file_range.rs | 177 +-------------------- 3 files changed, 7 insertions(+), 184 deletions(-) diff --git a/src/mito2/src/memtable/bulk/part_reader.rs b/src/mito2/src/memtable/bulk/part_reader.rs index f5bb47d2de..1e9d955321 100644 --- a/src/mito2/src/memtable/bulk/part_reader.rs +++ b/src/mito2/src/memtable/bulk/part_reader.rs @@ -368,7 +368,6 @@ fn apply_combined_filters( let predicate_mask = context.base.compute_filter_mask_flat( &record_batch, skip_fields, - None, &mut tag_decode_state, )?; // If predicate filters out the entire batch, return None early diff --git a/src/mito2/src/read/prune.rs b/src/mito2/src/read/prune.rs index 4dc0dc51dd..83e7a9dcb6 100644 --- a/src/mito2/src/read/prune.rs +++ b/src/mito2/src/read/prune.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashSet; use std::ops::BitAnd; use std::sync::Arc; @@ -24,7 +23,6 @@ use datatypes::arrow::compute::concat_batches; use datatypes::arrow::record_batch::RecordBatch; use mito_codec::row_converter::PrimaryKeyFilter; use snafu::{OptionExt, ResultExt}; -use store_api::storage::ColumnId; use crate::error::{ComputeArrowSnafu, RecordBatchSnafu, Result, UnexpectedSnafu}; use crate::memtable::BoxedBatchIterator; @@ -335,7 +333,6 @@ pub struct FlatPruneReader { context: FileRangeContextRef, source: FlatSource, primary_key_filter: Option, - covered_primary_key_filter_columns: Option>, buffered_prefiltered_batch: Option, metrics: ReaderMetrics, /// Whether to skip field filters for this row group. @@ -352,7 +349,6 @@ impl FlatPruneReader { primary_key_filter: ctx .new_primary_key_filter() .map(CachedPrimaryKeyFilter::new), - covered_primary_key_filter_columns: ctx.covered_primary_key_filter_columns(), buffered_prefiltered_batch: None, context: ctx, source: FlatSource::RowGroup(reader), @@ -368,7 +364,6 @@ impl FlatPruneReader { ) -> Self { Self { primary_key_filter: None, - covered_primary_key_filter_columns: None, buffered_prefiltered_batch: None, context: ctx, source: FlatSource::LastRow(reader), @@ -472,11 +467,9 @@ impl FlatPruneReader { } let num_rows_before_filter = record_batch.num_rows(); - let Some(filtered_batch) = self.context.precise_filter_flat( - record_batch, - self.skip_fields, - self.covered_primary_key_filter_columns.as_ref(), - )? + let Some(filtered_batch) = self + .context + .precise_filter_flat(record_batch, self.skip_fields)? else { // the entire batch is filtered out self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_filter; diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs index 845f4f76c0..f3d79bbadd 100644 --- a/src/mito2/src/sst/parquet/file_range.rs +++ b/src/mito2/src/sst/parquet/file_range.rs @@ -15,7 +15,7 @@ //! Structs and functions for reading ranges from a parquet file. A file range //! is usually a row group in a parquet file. -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::ops::{BitAnd, Range}; use std::sync::Arc; @@ -480,12 +480,6 @@ impl FileRangeContext { self.base.new_primary_key_filter() } - /// Returns tag columns whose simple filters are already guaranteed by the - /// encoded primary-key prefilter. - pub(crate) fn covered_primary_key_filter_columns(&self) -> Option> { - self.base.covered_primary_key_filter_columns() - } - /// Returns true if a partition filter is configured. pub(crate) fn has_partition_filter(&self) -> bool { self.base.partition_filter.is_some() @@ -529,10 +523,8 @@ impl FileRangeContext { &self, input: RecordBatch, skip_fields: bool, - skip_tag_filter_columns: Option<&HashSet>, ) -> Result> { - self.base - .precise_filter_flat(input, skip_fields, skip_tag_filter_columns) + self.base.precise_filter_flat(input, skip_fields) } /// Applies an encoded primary-key prefilter to the input `RecordBatch`. @@ -707,32 +699,6 @@ impl RangeBase { ) } - pub(crate) fn covered_primary_key_filter_columns(&self) -> Option> { - if self.read_format.metadata().primary_key.is_empty() - || !self - .read_format - .as_flat() - .is_some_and(|format| format.raw_batch_has_primary_key_dictionary()) - { - return None; - } - - let sst_metadata = self.read_format.metadata(); - let expected_metadata = self.expected_metadata.as_deref(); - let filters = self.usable_primary_key_filters()?; - let column_ids = filters - .iter() - .filter_map(|filter| { - expected_metadata - .and_then(|metadata| metadata.column_by_name(filter.column_name())) - .or_else(|| sst_metadata.column_by_name(filter.column_name())) - .map(|column| column.column_id) - }) - .collect::>(); - - (!column_ids.is_empty()).then_some(column_ids) - } - /// Applies an encoded primary-key prefilter before flat-row materialization. /// /// This only prunes rows that are guaranteed to fail simple primary-key predicates. @@ -958,25 +924,15 @@ impl RangeBase { &self, input: RecordBatch, skip_fields: bool, - skip_tag_filter_columns: Option<&HashSet>, ) -> Result> { let mut tag_decode_state = TagDecodeState::new(); - let mask = self.compute_filter_mask_flat( - &input, - skip_fields, - skip_tag_filter_columns, - &mut tag_decode_state, - )?; + let mask = self.compute_filter_mask_flat(&input, skip_fields, &mut tag_decode_state)?; // If mask is None, the entire batch is filtered out let Some(mut mask) = mask else { return Ok(None); }; - if self.partition_filter.is_none() && mask.count_set_bits() == input.num_rows() { - return Ok(Some(input)); - } - // Apply partition filter if let Some(partition_filter) = &self.partition_filter { let record_batch = self.project_record_batch_for_pruning_flat( @@ -1015,7 +971,6 @@ impl RangeBase { &self, input: &RecordBatch, skip_fields: bool, - skip_tag_filter_columns: Option<&HashSet>, tag_decode_state: &mut TagDecodeState, ) -> Result> { let mut mask = BooleanBuffer::new_set(input.num_rows()); @@ -1043,13 +998,6 @@ impl RangeBase { continue; } - if skip_tag_filter_columns.is_some_and(|columns| { - filter_ctx.semantic_type() == SemanticType::Tag - && columns.contains(&filter_ctx.column_id()) - }) { - continue; - } - // Get the column directly by its projected index. // If the column is missing and it's not a tag/time column, this filter is skipped. // Assumes the projection indices align with the input batch schema. @@ -1336,8 +1284,7 @@ mod tests { use crate::sst::{internal_fields, location}; use crate::test_util::sst_util::{ new_flat_source_from_record_batches, new_primary_key, new_record_batch_by_range, - new_sparse_primary_key, sst_file_handle, sst_region_metadata, - sst_region_metadata_with_encoding, + sst_file_handle, sst_region_metadata, sst_region_metadata_with_encoding, }; const FILE_DIR: &str = "/"; @@ -1597,33 +1544,6 @@ mod tests { assert!(base.new_primary_key_filter().is_none()); } - #[test] - fn test_prefilter_primary_key_ignores_reused_expected_tag_name() { - let metadata = Arc::new(sst_region_metadata()); - let expected_metadata = expected_metadata_with_reused_tag_name(&metadata); - let pk_ax = new_primary_key(&["a", "x"]); - let pk_by = new_primary_key(&["b", "y"]); - let batch = new_raw_batch_with_metadata( - metadata.clone(), - &[pk_ax.as_slice(), pk_by.as_slice()], - &[10, 11], - ); - let base = new_test_range_base_with_expected_metadata( - metadata, - expected_metadata, - &[col("tag_0").eq(lit("b")), col("tag_1").eq(lit("x"))], - ); - let mut primary_key_filter = base.new_primary_key_filter().unwrap(); - - let filtered = base - .prefilter_flat_batch_by_primary_key(batch, primary_key_filter.as_mut()) - .unwrap() - .unwrap(); - - assert_eq!(filtered.num_rows(), 1); - assert_eq!(field_values(&filtered), vec![10]); - } - #[test] fn test_prefilter_primary_key_drops_single_dictionary_batch() { let pk_a = new_primary_key(&["a", "x"]); @@ -1638,37 +1558,6 @@ mod tests { assert!(filtered.is_none()); } - #[test] - fn test_prefilter_primary_key_returns_slice_for_contiguous_matches() { - let pk_a = new_primary_key(&["a", "x"]); - let pk_b = new_primary_key(&["b", "x"]); - let pk_c = new_primary_key(&["c", "x"]); - let pk_d = new_primary_key(&["d", "x"]); - let batch = new_raw_batch( - &[ - pk_a.as_slice(), - pk_a.as_slice(), - pk_b.as_slice(), - pk_b.as_slice(), - pk_c.as_slice(), - pk_c.as_slice(), - pk_d.as_slice(), - pk_d.as_slice(), - ], - &[10, 11, 12, 13, 14, 15, 16, 17], - ); - let base = new_test_range_base(&[col("tag_0").eq(lit("b")).or(col("tag_0").eq(lit("c")))]); - let mut primary_key_filter = base.new_primary_key_filter().unwrap(); - - let filtered = base - .prefilter_flat_batch_by_primary_key(batch, primary_key_filter.as_mut()) - .unwrap() - .unwrap(); - - assert_eq!(filtered.num_rows(), 4); - assert_eq!(field_values(&filtered), vec![12, 13, 14, 15]); - } - #[test] fn test_prefilter_primary_key_builds_mask_for_fragmented_matches() { let pk_a = new_primary_key(&["a", "x"]); @@ -1700,64 +1589,6 @@ mod tests { assert_eq!(field_values(&filtered), vec![10, 11, 14, 15]); } - #[test] - fn test_matching_row_ranges_by_primary_key_merges_adjacent_spans() { - let pk_a = new_primary_key(&["a", "x"]); - let pk_b = new_primary_key(&["b", "x"]); - let pk_c = new_primary_key(&["c", "x"]); - let pk_d = new_primary_key(&["d", "x"]); - let batch = new_raw_batch( - &[ - pk_a.as_slice(), - pk_a.as_slice(), - pk_b.as_slice(), - pk_b.as_slice(), - pk_c.as_slice(), - pk_c.as_slice(), - pk_d.as_slice(), - pk_d.as_slice(), - ], - &[10, 11, 12, 13, 14, 15, 16, 17], - ); - let base = new_test_range_base(&[col("tag_0").eq(lit("a")).or(col("tag_0").eq(lit("c")))]); - let mut primary_key_filter = base.new_primary_key_filter().unwrap(); - - let matched = base - .matching_row_ranges_by_primary_key(&batch, primary_key_filter.as_mut()) - .unwrap(); - - assert_eq!(matched, vec![0..2, 4..6]); - } - - #[test] - fn test_prefilter_primary_key_sparse_path() { - let metadata = Arc::new(sst_region_metadata_with_encoding( - PrimaryKeyEncoding::Sparse, - )); - let pk_a = new_sparse_primary_key(&["a", "x"], &metadata, 1, 11); - let pk_b = new_sparse_primary_key(&["b", "x"], &metadata, 1, 22); - let batch = new_raw_batch_with_metadata( - metadata.clone(), - &[ - pk_a.as_slice(), - pk_a.as_slice(), - pk_b.as_slice(), - pk_b.as_slice(), - ], - &[10, 11, 12, 13], - ); - let base = new_test_range_base_with_metadata(metadata, None, &[col("tag_0").eq(lit("b"))]); - let mut primary_key_filter = base.new_primary_key_filter().unwrap(); - - let filtered = base - .prefilter_flat_batch_by_primary_key(batch, primary_key_filter.as_mut()) - .unwrap() - .unwrap(); - - assert_eq!(filtered.num_rows(), 2); - assert_eq!(field_values(&filtered), vec![12, 13]); - } - async fn fetch_metrics_for_predicate(predicate: Option) -> ParquetFetchMetricsData { let object_store = object_store::ObjectStore::new(Memory::default()) .unwrap()