diff --git a/src/mito2/src/memtable/bulk/context.rs b/src/mito2/src/memtable/bulk/context.rs index 999ea23eb3..228fa2d6c7 100644 --- a/src/mito2/src/memtable/bulk/context.rs +++ b/src/mito2/src/memtable/bulk/context.rs @@ -17,8 +17,7 @@ use std::collections::VecDeque; use std::sync::Arc; -use common_recordbatch::filter::SimpleFilterEvaluator; -use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyFilter, build_primary_key_codec}; +use mito_codec::row_converter::{DensePrimaryKeyCodec, build_primary_key_codec}; use parquet::file::metadata::ParquetMetaData; use store_api::metadata::RegionMetadataRef; use store_api::storage::ColumnId; @@ -87,21 +86,11 @@ impl BulkIterContext { .as_ref() .map(|pred| pred.dyn_filters().as_ref().clone()) .unwrap_or_default(); - let primary_key_filters = predicate - .as_ref() - .map(|pred| { - pred.exprs() - .iter() - .filter_map(SimpleFilterEvaluator::try_new) - .collect::>() - }) - .filter(|filters| !filters.is_empty()) - .map(Arc::new); Ok(Self { base: RangeBase { filters: simple_filters, - primary_key_filters, + primary_key_filters: None, dyn_filters, read_format, prune_schema: region_metadata.schema.clone(), @@ -149,10 +138,6 @@ impl BulkIterContext { &self.base.read_format } - pub(crate) fn new_primary_key_filter(&self) -> Option> { - self.base.new_primary_key_filter() - } - /// Returns the pre-filter mode. pub(crate) fn pre_filter_mode(&self) -> PreFilterMode { self.base.pre_filter_mode diff --git a/src/mito2/src/memtable/bulk/part_reader.rs b/src/mito2/src/memtable/bulk/part_reader.rs index 383d7059ba..1e9d955321 100644 --- a/src/mito2/src/memtable/bulk/part_reader.rs +++ b/src/mito2/src/memtable/bulk/part_reader.rs @@ -17,7 +17,6 @@ use std::time::Instant; use datatypes::arrow::array::BooleanArray; use datatypes::arrow::record_batch::RecordBatch; -use mito_codec::row_converter::PrimaryKeyFilter; use parquet::arrow::ProjectionMask; use parquet::arrow::arrow_reader::ParquetRecordBatchReader; use snafu::ResultExt; @@ -47,7 +46,6 @@ pub struct EncodedBulkPartIter { metrics: MemScanMetricsData, /// Optional memory scan metrics to report to. mem_scan_metrics: Option, - primary_key_filter: Option>, } impl EncodedBulkPartIter { @@ -60,7 +58,6 @@ impl EncodedBulkPartIter { mem_scan_metrics: Option, ) -> error::Result { assert!(context.read_format().as_flat().is_some()); - let primary_key_filter = context.base.new_primary_key_filter(); let parquet_meta = encoded_part.metadata().parquet_metadata.clone(); let data = encoded_part.data().clone(); @@ -94,7 +91,6 @@ impl EncodedBulkPartIter { ..Default::default() }, mem_scan_metrics, - primary_key_filter, }) } @@ -119,7 +115,6 @@ impl EncodedBulkPartIter { if let Some(batch) = apply_combined_filters( &self.context, &self.sequence, - &mut self.primary_key_filter, batch, self.current_skip_fields, )? { @@ -146,7 +141,6 @@ impl EncodedBulkPartIter { if let Some(batch) = apply_combined_filters( &self.context, &self.sequence, - &mut self.primary_key_filter, batch, self.current_skip_fields, )? { @@ -216,7 +210,6 @@ pub struct BulkPartBatchIter { metrics: MemScanMetricsData, /// Optional memory scan metrics to report to. mem_scan_metrics: Option, - primary_key_filter: Option>, } impl BulkPartBatchIter { @@ -229,7 +222,6 @@ impl BulkPartBatchIter { mem_scan_metrics: Option, ) -> Self { assert!(context.read_format().as_flat().is_some()); - let primary_key_filter = context.base.new_primary_key_filter(); Self { batches: VecDeque::from(batches), @@ -240,7 +232,6 @@ impl BulkPartBatchIter { ..Default::default() }, mem_scan_metrics, - primary_key_filter, } } @@ -292,13 +283,8 @@ impl BulkPartBatchIter { PreFilterMode::SkipFieldsOnDelete => true, }; - let Some(filtered_batch) = apply_combined_filters( - &self.context, - &self.sequence, - &mut self.primary_key_filter, - projected_batch, - skip_fields, - )? + let Some(filtered_batch) = + apply_combined_filters(&self.context, &self.sequence, projected_batch, skip_fields)? else { self.metrics.scan_cost += start.elapsed(); return Ok(None); @@ -366,23 +352,9 @@ impl Drop for BulkPartBatchIter { fn apply_combined_filters( context: &BulkIterContext, sequence: &Option, - primary_key_filter: &mut Option>, record_batch: RecordBatch, skip_fields: bool, ) -> error::Result> { - let covered_primary_key_filter_columns = primary_key_filter - .as_ref() - .and_then(|_| context.base.covered_primary_key_filter_columns()); - let record_batch = match primary_key_filter.as_mut() { - Some(primary_key_filter) => context - .base - .prefilter_flat_batch_by_primary_key(record_batch, primary_key_filter.as_mut())?, - None => Some(record_batch), - }; - let Some(record_batch) = record_batch else { - return Ok(None); - }; - // Converts the format to the flat format first. let format = context.read_format().as_flat().unwrap(); let record_batch = format.convert_batch(record_batch, None)?; @@ -396,7 +368,6 @@ fn apply_combined_filters( let predicate_mask = context.base.compute_filter_mask_flat( &record_batch, skip_fields, - covered_primary_key_filter_columns.as_ref(), &mut tag_decode_state, )?; // If predicate filters out the entire batch, return None early @@ -453,7 +424,7 @@ mod tests { ArrayRef, BinaryArray, DictionaryArray, Int64Array, StringArray, UInt8Array, UInt32Array, UInt64Array, }; - use datatypes::arrow::datatypes::{DataType, Field, Schema, UInt32Type}; + use datatypes::arrow::datatypes::{DataType, Field, Schema}; use datatypes::data_type::ConcreteDataType; use datatypes::schema::ColumnSchema; use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder}; @@ -462,15 +433,6 @@ mod tests { use super::*; use crate::memtable::bulk::context::BulkIterContext; - use crate::test_util::sst_util::new_primary_key; - - fn encoded_primary_key_array(tags: &[&str]) -> Arc> { - let values = Arc::new(BinaryArray::from_iter_values( - tags.iter().map(|tag| new_primary_key(&[*tag])), - )); - let keys = UInt32Array::from_iter_values(0..tags.len() as u32); - Arc::new(DictionaryArray::new(keys, values)) - } #[test] fn test_bulk_part_batch_iter() { @@ -500,7 +462,10 @@ mod tests { )); // Create primary key dictionary array - let primary_key = encoded_primary_key_array(&["key1", "key2", "key3"]); + use datatypes::arrow::array::{BinaryArray, DictionaryArray, UInt32Array}; + let values = Arc::new(BinaryArray::from_iter_values([b"key1", b"key2", b"key3"])); + let keys = UInt32Array::from(vec![0, 1, 2]); + let primary_key = Arc::new(DictionaryArray::new(keys, values)); let sequence = Arc::new(UInt64Array::from(vec![1, 2, 3])); let op_type = Arc::new(UInt8Array::from(vec![1, 1, 1])); // PUT operations @@ -636,7 +601,9 @@ mod tests { let timestamp_1 = Arc::new(datatypes::arrow::array::TimestampMillisecondArray::from( vec![1000, 2000], )); - let primary_key_1 = encoded_primary_key_array(&["key1", "key2"]); + let values_1 = Arc::new(BinaryArray::from_iter_values([b"key1", b"key2"])); + let keys_1 = UInt32Array::from(vec![0, 1]); + let primary_key_1 = Arc::new(DictionaryArray::new(keys_1, values_1)); let sequence_1 = Arc::new(UInt64Array::from(vec![1, 2])); let op_type_1 = Arc::new(UInt8Array::from(vec![1, 1])); @@ -659,7 +626,9 @@ mod tests { let timestamp_2 = Arc::new(datatypes::arrow::array::TimestampMillisecondArray::from( vec![3000, 4000, 5000], )); - let primary_key_2 = encoded_primary_key_array(&["key3", "key4", "key5"]); + let values_2 = Arc::new(BinaryArray::from_iter_values([b"key3", b"key4", b"key5"])); + let keys_2 = UInt32Array::from(vec![0, 1, 2]); + let primary_key_2 = Arc::new(DictionaryArray::new(keys_2, values_2)); let sequence_2 = Arc::new(UInt64Array::from(vec![3, 4, 5])); let op_type_2 = Arc::new(UInt8Array::from(vec![1, 1, 1])); diff --git a/src/mito2/src/read/last_row.rs b/src/mito2/src/read/last_row.rs index b3c5a30f5c..0c13c120a0 100644 --- a/src/mito2/src/read/last_row.rs +++ b/src/mito2/src/read/last_row.rs @@ -22,7 +22,6 @@ use datatypes::arrow::compute::concat_batches; use datatypes::arrow::record_batch::RecordBatch; use datatypes::vectors::UInt32Vector; use futures::{Stream, TryStreamExt}; -use mito_codec::row_converter::PrimaryKeyFilter; use snafu::ResultExt; use store_api::storage::{FileId, TimeSeriesRowSelector}; @@ -313,7 +312,6 @@ impl FlatRowGroupLastRowCachedReader { cache_strategy: CacheStrategy, projection: &[usize], reader: FlatRowGroupReader, - primary_key_filter: Option>, ) -> Self { let key = SelectorResultKey { file_id, @@ -327,10 +325,10 @@ impl FlatRowGroupLastRowCachedReader { if is_flat && schema_matches { Self::new_hit(value) } else { - Self::new_miss(key, projection, reader, cache_strategy, primary_key_filter) + Self::new_miss(key, projection, reader, cache_strategy) } } else { - Self::new_miss(key, projection, reader, cache_strategy, primary_key_filter) + Self::new_miss(key, projection, reader, cache_strategy) } } @@ -352,7 +350,6 @@ impl FlatRowGroupLastRowCachedReader { projection: &[usize], reader: FlatRowGroupReader, cache_strategy: CacheStrategy, - primary_key_filter: Option>, ) -> Self { selector_result_cache_miss(); Self::Miss(FlatRowGroupLastRowReader::new( @@ -360,7 +357,6 @@ impl FlatRowGroupLastRowCachedReader { projection.to_vec(), reader, cache_strategy, - primary_key_filter, )) } } @@ -434,7 +430,6 @@ impl BatchBuffer { pub(crate) struct FlatRowGroupLastRowReader { key: SelectorResultKey, reader: FlatRowGroupReader, - primary_key_filter: Option>, selector: FlatLastTimestampSelector, yielded_batches: Vec, cache_strategy: CacheStrategy, @@ -449,12 +444,10 @@ impl FlatRowGroupLastRowReader { projection: Vec, reader: FlatRowGroupReader, cache_strategy: CacheStrategy, - primary_key_filter: Option>, ) -> Self { Self { key, reader, - primary_key_filter, selector: FlatLastTimestampSelector::default(), yielded_batches: vec![], cache_strategy, @@ -478,11 +471,7 @@ impl FlatRowGroupLastRowReader { return self.flush_pending(); } - while let Some(raw_batch) = self.reader.next_raw_batch()? { - let Some(raw_batch) = self.prefilter_primary_keys(raw_batch)? else { - continue; - }; - let batch = self.reader.convert_batch(raw_batch)?; + while let Some(batch) = self.reader.next_batch()? { self.selector.on_next(batch, &mut self.pending)?; if self.pending.is_full() { return self.flush_pending(); @@ -503,26 +492,10 @@ impl FlatRowGroupLastRowReader { Ok(None) } - fn prefilter_primary_keys(&mut self, batch: RecordBatch) -> Result> { - let Some(primary_key_filter) = self.primary_key_filter.as_mut() else { - return Ok(Some(batch)); - }; - - self.reader - .prefilter_raw_batch_by_primary_key(batch, primary_key_filter.as_mut()) - } - fn maybe_update_cache(&mut self) { if self.yielded_batches.is_empty() { return; } - - // Filtered flat last-row scans only contain the subset of series that matched the - // encoded primary-key prefilter, so they cannot be published under the shared - // selector cache key. - if self.primary_key_filter.is_some() { - return; - } let batches = std::mem::take(&mut self.yielded_batches); let value = Arc::new(SelectorResultValue::new_flat( batches, diff --git a/src/mito2/src/read/prune.rs b/src/mito2/src/read/prune.rs index 7a23eb2069..881f1f0836 100644 --- a/src/mito2/src/read/prune.rs +++ b/src/mito2/src/read/prune.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashSet; use std::ops::BitAnd; use std::sync::Arc; @@ -20,20 +19,16 @@ use common_recordbatch::filter::SimpleFilterEvaluator; use common_time::Timestamp; use datatypes::arrow::array::BooleanArray; use datatypes::arrow::buffer::BooleanBuffer; -use datatypes::arrow::compute::concat_batches; use datatypes::arrow::record_batch::RecordBatch; use mito_codec::row_converter::PrimaryKeyFilter; -use snafu::{OptionExt, ResultExt}; -use store_api::storage::ColumnId; +use snafu::ResultExt; -use crate::error::{ComputeArrowSnafu, RecordBatchSnafu, Result, UnexpectedSnafu}; +use crate::error::{RecordBatchSnafu, Result}; use crate::memtable::BoxedBatchIterator; use crate::read::last_row::{FlatRowGroupLastRowCachedReader, RowGroupLastRowCachedReader}; use crate::read::{Batch, BatchReader}; use crate::sst::file::FileTimeRange; use crate::sst::parquet::file_range::FileRangeContextRef; -use crate::sst::parquet::flat_format::primary_key_column_index; -use crate::sst::parquet::format::PrimaryKeyArray; use crate::sst::parquet::reader::{FlatRowGroupReader, ReaderMetrics, RowGroupReader}; pub enum Source { @@ -268,75 +263,12 @@ impl FlatSource { } } -struct CachedPrimaryKeyFilter { - inner: Box, - last_primary_key: Vec, - last_match: Option, -} - -impl CachedPrimaryKeyFilter { - fn new(inner: Box) -> Self { - Self { - inner, - last_primary_key: Vec::new(), - last_match: None, - } - } -} - -impl PrimaryKeyFilter for CachedPrimaryKeyFilter { - fn matches(&mut self, pk: &[u8]) -> bool { - if let Some(last_match) = self.last_match - && self.last_primary_key == pk - { - return last_match; - } - - let matched = self.inner.matches(pk); - self.last_primary_key.clear(); - self.last_primary_key.extend_from_slice(pk); - self.last_match = Some(matched); - matched - } -} - -fn batch_single_primary_key(batch: &RecordBatch) -> Result> { - let primary_key_index = primary_key_column_index(batch.num_columns()); - let pk_dict_array = batch - .column(primary_key_index) - .as_any() - .downcast_ref::() - .context(UnexpectedSnafu { - reason: "Primary key column is not a dictionary array".to_string(), - })?; - let pk_values = pk_dict_array - .values() - .as_any() - .downcast_ref::() - .context(UnexpectedSnafu { - reason: "Primary key values are not binary array".to_string(), - })?; - let keys = pk_dict_array.keys(); - if keys.is_empty() { - return Ok(None); - } - - let first_key = keys.value(0); - if first_key != keys.value(keys.len() - 1) { - return Ok(None); - } - - Ok(Some(pk_values.value(first_key as usize))) -} - /// A flat format reader that returns RecordBatch instead of Batch. pub struct FlatPruneReader { /// Context for file ranges. context: FileRangeContextRef, source: FlatSource, - primary_key_filter: Option, - covered_primary_key_filter_columns: Option>, - buffered_prefiltered_batch: Option, + primary_key_filter: Option>, metrics: ReaderMetrics, /// Whether to skip field filters for this row group. skip_fields: bool, @@ -349,11 +281,7 @@ impl FlatPruneReader { skip_fields: bool, ) -> Self { Self { - primary_key_filter: ctx - .new_primary_key_filter() - .map(CachedPrimaryKeyFilter::new), - covered_primary_key_filter_columns: ctx.covered_primary_key_filter_columns(), - buffered_prefiltered_batch: None, + primary_key_filter: ctx.new_primary_key_filter(), context: ctx, source: FlatSource::RowGroup(reader), metrics: Default::default(), @@ -368,8 +296,6 @@ impl FlatPruneReader { ) -> Self { Self { primary_key_filter: None, - covered_primary_key_filter_columns: None, - buffered_prefiltered_batch: None, context: ctx, source: FlatSource::LastRow(reader), metrics: Default::default(), @@ -383,18 +309,25 @@ impl FlatPruneReader { } pub(crate) fn next_batch(&mut self) -> Result> { - loop { - let Some(mut raw_batch) = self.next_prefiltered_batch()? else { - return Ok(None); - }; - - let scan_start = std::time::Instant::now(); - self.coalesce_prefiltered_batches(&mut raw_batch)?; - let record_batch = self.source.convert_batch(raw_batch)?; - self.metrics.scan_cost += scan_start.elapsed(); - - // `num_batches` counts decoded flat batches, not raw parquet batches. + while let Some(raw_batch) = { + let start = std::time::Instant::now(); + let batch = self.source.next_raw_batch()?; + self.metrics.scan_cost += start.elapsed(); + batch + } { + self.metrics.num_rows += raw_batch.num_rows(); self.metrics.num_batches += 1; + + let num_rows_before_prefilter = raw_batch.num_rows(); + let Some(prefiltered_batch) = self.prefilter_primary_keys(raw_batch)? else { + self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_prefilter; + continue; + }; + let prefiltered_rows = num_rows_before_prefilter - prefiltered_batch.num_rows(); + self.metrics.filter_metrics.rows_precise_filtered += prefiltered_rows; + + let record_batch = self.source.convert_batch(prefiltered_batch)?; + match self.prune_flat(record_batch)? { Some(filtered_batch) => { return Ok(Some(filtered_batch)); @@ -404,56 +337,8 @@ impl FlatPruneReader { } } } - } - fn next_prefiltered_batch(&mut self) -> Result> { - if let Some(batch) = self.buffered_prefiltered_batch.take() { - return Ok(Some(batch)); - } - - loop { - let start = std::time::Instant::now(); - let Some(raw_batch) = self.source.next_raw_batch()? else { - return Ok(None); - }; - - self.metrics.num_rows += raw_batch.num_rows(); - - let num_rows_before_prefilter = raw_batch.num_rows(); - let Some(prefiltered_batch) = self.prefilter_primary_keys(raw_batch)? else { - self.metrics.scan_cost += start.elapsed(); - self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_prefilter; - continue; - }; - let prefiltered_rows = num_rows_before_prefilter - prefiltered_batch.num_rows(); - self.metrics.filter_metrics.rows_precise_filtered += prefiltered_rows; - self.metrics.scan_cost += start.elapsed(); - return Ok(Some(prefiltered_batch)); - } - } - - fn coalesce_prefiltered_batches(&mut self, batch: &mut RecordBatch) -> Result<()> { - let Some(primary_key) = batch_single_primary_key(batch)? else { - return Ok(()); - }; - let primary_key = primary_key.to_vec(); - let schema = batch.schema(); - let mut batches = vec![batch.clone()]; - - while let Some(next_batch) = self.next_prefiltered_batch()? { - if batch_single_primary_key(&next_batch)? == Some(primary_key.as_slice()) { - batches.push(next_batch); - } else { - self.buffered_prefiltered_batch = Some(next_batch); - break; - } - } - - if batches.len() > 1 { - *batch = concat_batches(&schema, &batches).context(ComputeArrowSnafu)?; - } - - Ok(()) + Ok(None) } fn prefilter_primary_keys(&mut self, record_batch: RecordBatch) -> Result> { @@ -462,7 +347,7 @@ impl FlatPruneReader { }; self.context - .prefilter_flat_batch_by_primary_key(record_batch, primary_key_filter) + .prefilter_flat_batch_by_primary_key(record_batch, primary_key_filter.as_mut()) } /// Prunes batches by the pushed down predicate and returns RecordBatch. @@ -473,11 +358,9 @@ impl FlatPruneReader { } let num_rows_before_filter = record_batch.num_rows(); - let Some(filtered_batch) = self.context.precise_filter_flat( - record_batch, - self.skip_fields, - self.covered_primary_key_filter_columns.as_ref(), - )? + let Some(filtered_batch) = self + .context + .precise_filter_flat(record_batch, self.skip_fields)? else { // the entire batch is filtered out self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_filter; @@ -498,121 +381,13 @@ impl FlatPruneReader { #[cfg(test)] mod tests { - use std::sync::Arc; - use std::sync::atomic::{AtomicUsize, Ordering}; - use api::v1::OpType; use datafusion_common::ScalarValue; use datafusion_expr::{Expr, col, lit}; - use datatypes::arrow::array::{ArrayRef, BinaryArray, DictionaryArray, UInt32Array}; - use datatypes::arrow::datatypes::{DataType, Field, Schema, TimeUnit, UInt32Type}; use super::*; use crate::test_util::new_batch; - struct CountingPrimaryKeyFilter { - calls: Arc, - matched: bool, - } - - impl PrimaryKeyFilter for CountingPrimaryKeyFilter { - fn matches(&mut self, _pk: &[u8]) -> bool { - self.calls.fetch_add(1, Ordering::Relaxed); - self.matched - } - } - - #[test] - fn test_cached_primary_key_filter_reuses_last_match() { - let calls = Arc::new(AtomicUsize::new(0)); - let mut filter = CachedPrimaryKeyFilter::new(Box::new(CountingPrimaryKeyFilter { - calls: calls.clone(), - matched: true, - })); - - assert!(filter.matches(b"series-a")); - assert!(filter.matches(b"series-a")); - assert_eq!(calls.load(Ordering::Relaxed), 1); - - assert!(filter.matches(b"series-b")); - assert!(filter.matches(b"series-b")); - assert_eq!(calls.load(Ordering::Relaxed), 2); - } - - fn new_flat_raw_batch(primary_keys: &[&[u8]]) -> RecordBatch { - let schema = Arc::new(Schema::new(vec![ - Field::new("value", DataType::Float64, true), - Field::new( - "ts", - DataType::Timestamp(TimeUnit::Millisecond, None), - false, - ), - Field::new( - "__primary_key", - DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)), - false, - ), - Field::new("__sequence", DataType::UInt64, false), - Field::new("__op_type", DataType::UInt8, false), - ])); - - let mut dict_values = Vec::new(); - let mut keys = Vec::with_capacity(primary_keys.len()); - for pk in primary_keys { - let key = dict_values - .iter() - .position(|existing: &&[u8]| existing == pk) - .unwrap_or_else(|| { - dict_values.push(*pk); - dict_values.len() - 1 - }); - keys.push(key as u32); - } - - let pk_array: ArrayRef = Arc::new(DictionaryArray::::new( - UInt32Array::from(keys), - Arc::new(BinaryArray::from_iter_values(dict_values.iter().copied())), - )); - - RecordBatch::try_new( - schema, - vec![ - Arc::new(datatypes::arrow::array::Float64Array::from( - vec![1.0; primary_keys.len()], - )), - Arc::new( - datatypes::arrow::array::TimestampMillisecondArray::from_iter_values( - 0..primary_keys.len() as i64, - ), - ), - pk_array, - Arc::new(datatypes::arrow::array::UInt64Array::from(vec![ - 1; - primary_keys - .len() - ])), - Arc::new(datatypes::arrow::array::UInt8Array::from(vec![ - 1; - primary_keys - .len() - ])), - ], - ) - .unwrap() - } - - #[test] - fn test_batch_single_primary_key_detects_single_series_batch() { - let batch = new_flat_raw_batch(&[b"series-a", b"series-a"]); - assert_eq!( - batch_single_primary_key(&batch).unwrap(), - Some(&b"series-a"[..]) - ); - - let batch = new_flat_raw_batch(&[b"series-a", b"series-b"]); - assert!(batch_single_primary_key(&batch).unwrap().is_none()); - } - #[test] fn test_prune_time_iter_empty() { let input = []; diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs index 808a75b85a..5a4681ae22 100644 --- a/src/mito2/src/sst/parquet/file_range.rs +++ b/src/mito2/src/sst/parquet/file_range.rs @@ -15,7 +15,7 @@ //! Structs and functions for reading ranges from a parquet file. A file range //! is usually a row group in a parquet file. -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::ops::{BitAnd, Range}; use std::sync::Arc; @@ -294,7 +294,6 @@ impl FileRange { self.context.reader_builder.cache_strategy().clone(), self.context.read_format().projection_indices(), flat_row_group_reader, - self.context.new_primary_key_filter(), ); FlatPruneReader::new_with_last_row_reader(self.context.clone(), reader, skip_fields) } else { @@ -442,12 +441,6 @@ impl FileRangeContext { self.base.new_primary_key_filter() } - /// Returns tag columns whose simple filters are already guaranteed by the - /// encoded primary-key prefilter. - pub(crate) fn covered_primary_key_filter_columns(&self) -> Option> { - self.base.covered_primary_key_filter_columns() - } - /// Returns true if a partition filter is configured. pub(crate) fn has_partition_filter(&self) -> bool { self.base.partition_filter.is_some() @@ -491,10 +484,8 @@ impl FileRangeContext { &self, input: RecordBatch, skip_fields: bool, - skip_tag_filter_columns: Option<&HashSet>, ) -> Result> { - self.base - .precise_filter_flat(input, skip_fields, skip_tag_filter_columns) + self.base.precise_filter_flat(input, skip_fields) } /// Applies an encoded primary-key prefilter to the input `RecordBatch`. @@ -669,32 +660,6 @@ impl RangeBase { ) } - pub(crate) fn covered_primary_key_filter_columns(&self) -> Option> { - if self.read_format.metadata().primary_key.is_empty() - || !self - .read_format - .as_flat() - .is_some_and(|format| format.raw_batch_has_primary_key_dictionary()) - { - return None; - } - - let sst_metadata = self.read_format.metadata(); - let expected_metadata = self.expected_metadata.as_deref(); - let filters = self.usable_primary_key_filters()?; - let column_ids = filters - .iter() - .filter_map(|filter| { - expected_metadata - .and_then(|metadata| metadata.column_by_name(filter.column_name())) - .or_else(|| sst_metadata.column_by_name(filter.column_name())) - .map(|column| column.column_id) - }) - .collect::>(); - - (!column_ids.is_empty()).then_some(column_ids) - } - /// Applies an encoded primary-key prefilter before flat-row materialization. /// /// This only prunes rows that are guaranteed to fail simple primary-key predicates. @@ -920,25 +885,15 @@ impl RangeBase { &self, input: RecordBatch, skip_fields: bool, - skip_tag_filter_columns: Option<&HashSet>, ) -> Result> { let mut tag_decode_state = TagDecodeState::new(); - let mask = self.compute_filter_mask_flat( - &input, - skip_fields, - skip_tag_filter_columns, - &mut tag_decode_state, - )?; + let mask = self.compute_filter_mask_flat(&input, skip_fields, &mut tag_decode_state)?; // If mask is None, the entire batch is filtered out let Some(mut mask) = mask else { return Ok(None); }; - if self.partition_filter.is_none() && mask.count_set_bits() == input.num_rows() { - return Ok(Some(input)); - } - // Apply partition filter if let Some(partition_filter) = &self.partition_filter { let record_batch = self.project_record_batch_for_pruning_flat( @@ -977,7 +932,6 @@ impl RangeBase { &self, input: &RecordBatch, skip_fields: bool, - skip_tag_filter_columns: Option<&HashSet>, tag_decode_state: &mut TagDecodeState, ) -> Result> { let mut mask = BooleanBuffer::new_set(input.num_rows()); @@ -1005,13 +959,6 @@ impl RangeBase { continue; } - if skip_tag_filter_columns.is_some_and(|columns| { - filter_ctx.semantic_type() == SemanticType::Tag - && columns.contains(&filter_ctx.column_id()) - }) { - continue; - } - // Get the column directly by its projected index. // If the column is missing and it's not a tag/time column, this filter is skipped. // Assumes the projection indices align with the input batch schema. @@ -1504,20 +1451,6 @@ mod tests { assert!(base.new_primary_key_filter().is_none()); } - #[test] - fn test_covered_primary_key_filter_columns_only_include_prefiltered_tags() { - let base = new_test_range_base(&[ - col("tag_0").eq(lit("b")), - col("field_0").eq(lit(1_u64)), - col("ts").gt(lit(0_i64)), - ]); - - let covered_columns = base.covered_primary_key_filter_columns().unwrap(); - let metadata = sst_region_metadata(); - assert_eq!(covered_columns.len(), 1); - assert!(covered_columns.contains(&metadata.column_by_name("tag_0").unwrap().column_id)); - } - #[test] fn test_prefilter_primary_key_ignores_reused_expected_tag_name() { let metadata = Arc::new(sst_region_metadata()); @@ -1678,19 +1611,4 @@ mod tests { assert_eq!(filtered.num_rows(), 2); assert_eq!(field_values(&filtered), vec![12, 13]); } - - #[test] - fn test_precise_filter_flat_skips_prefiltered_tag_decode() { - let base = new_test_range_base(&[col("tag_0").eq(lit("b"))]); - let skip_tag_filter_columns = base.covered_primary_key_filter_columns().unwrap(); - let batch = new_raw_batch(&[b"not-a-valid-primary-key"], &[10]); - - let filtered = base - .precise_filter_flat(batch, false, Some(&skip_tag_filter_columns)) - .unwrap() - .unwrap(); - - assert_eq!(filtered.num_rows(), 1); - assert_eq!(field_values(&filtered), vec![10]); - } } diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 4c15dc29e1..67564e6445 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -30,7 +30,7 @@ use datatypes::arrow::error::ArrowError; use datatypes::arrow::record_batch::RecordBatch; use datatypes::data_type::ConcreteDataType; use datatypes::prelude::DataType; -use mito_codec::row_converter::{PrimaryKeyFilter, build_primary_key_codec}; +use mito_codec::row_converter::build_primary_key_codec; use object_store::ObjectStore; use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection}; use parquet::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels}; @@ -1606,12 +1606,11 @@ pub struct ReaderMetrics { pub(crate) filter_metrics: ReaderFilterMetrics, /// Duration to build the parquet reader. pub(crate) build_cost: Duration, - /// Duration to scan the reader, including parquet fetches and decoding work - /// needed to materialize output batches. + /// Duration to scan the reader. pub(crate) scan_cost: Duration, /// Number of record batches read. pub(crate) num_record_batches: usize, - /// Number of decoded output batches materialized from parquet data. + /// Number of batches decoded. pub(crate) num_batches: usize, /// Number of rows read. pub(crate) num_rows: usize, @@ -2164,14 +2163,13 @@ impl FlatRowGroupReader { flat_format.convert_batch(record_batch, self.override_sequence.as_ref()) } - /// Applies the encoded primary-key prefilter to a raw parquet batch before flat conversion. - pub(crate) fn prefilter_raw_batch_by_primary_key( - &self, - record_batch: RecordBatch, - primary_key_filter: &mut dyn PrimaryKeyFilter, - ) -> Result> { - self.context - .prefilter_flat_batch_by_primary_key(record_batch, primary_key_filter) + /// Returns the next flat RecordBatch. + pub(crate) fn next_batch(&mut self) -> Result> { + let Some(record_batch) = self.next_raw_batch()? else { + return Ok(None); + }; + + self.convert_batch(record_batch).map(Some) } } diff --git a/tests/cases/standalone/common/aggregate/last_row_selector_cache_filter.result b/tests/cases/standalone/common/aggregate/last_row_selector_cache_filter.result deleted file mode 100644 index b932891f4a..0000000000 --- a/tests/cases/standalone/common/aggregate/last_row_selector_cache_filter.result +++ /dev/null @@ -1,53 +0,0 @@ -CREATE TABLE last_row_selector_cache_filter ( - host STRING, - cpu DOUBLE, - ts TIMESTAMP TIME INDEX, - PRIMARY KEY (host) -) WITH ('sst_format' = 'flat'); - -Affected Rows: 0 - -INSERT INTO last_row_selector_cache_filter VALUES - ('a', 1.0, 1000), - ('a', 2.0, 2000), - ('b', 3.0, 1000), - ('b', 4.0, 2000); - -Affected Rows: 4 - -ADMIN FLUSH_TABLE('last_row_selector_cache_filter'); - -+-----------------------------------------------------+ -| ADMIN FLUSH_TABLE('last_row_selector_cache_filter') | -+-----------------------------------------------------+ -| 0 | -+-----------------------------------------------------+ - -SELECT host, last_value(cpu ORDER BY ts) AS last_cpu -FROM last_row_selector_cache_filter -WHERE host = 'a' -GROUP BY host -ORDER BY host; - -+------+----------+ -| host | last_cpu | -+------+----------+ -| a | 2.0 | -+------+----------+ - -SELECT host, last_value(cpu ORDER BY ts) AS last_cpu -FROM last_row_selector_cache_filter -GROUP BY host -ORDER BY host; - -+------+----------+ -| host | last_cpu | -+------+----------+ -| a | 2.0 | -| b | 4.0 | -+------+----------+ - -DROP TABLE last_row_selector_cache_filter; - -Affected Rows: 0 - diff --git a/tests/cases/standalone/common/aggregate/last_row_selector_cache_filter.sql b/tests/cases/standalone/common/aggregate/last_row_selector_cache_filter.sql deleted file mode 100644 index 2e591d77ff..0000000000 --- a/tests/cases/standalone/common/aggregate/last_row_selector_cache_filter.sql +++ /dev/null @@ -1,27 +0,0 @@ -CREATE TABLE last_row_selector_cache_filter ( - host STRING, - cpu DOUBLE, - ts TIMESTAMP TIME INDEX, - PRIMARY KEY (host) -) WITH ('sst_format' = 'flat'); - -INSERT INTO last_row_selector_cache_filter VALUES - ('a', 1.0, 1000), - ('a', 2.0, 2000), - ('b', 3.0, 1000), - ('b', 4.0, 2000); - -ADMIN FLUSH_TABLE('last_row_selector_cache_filter'); - -SELECT host, last_value(cpu ORDER BY ts) AS last_cpu -FROM last_row_selector_cache_filter -WHERE host = 'a' -GROUP BY host -ORDER BY host; - -SELECT host, last_value(cpu ORDER BY ts) AS last_cpu -FROM last_row_selector_cache_filter -GROUP BY host -ORDER BY host; - -DROP TABLE last_row_selector_cache_filter;