From b75a1125610a146f5b9e38176e82d0cdc7c36db3 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Wed, 1 Apr 2026 17:02:54 +0800 Subject: [PATCH] feat: implement prefilter for bulk memtable (#7895) * feat: prefilter in memtable Signed-off-by: evenyag * chore: fmt code Signed-off-by: evenyag * feat: bulk part reader also do prefilter Signed-off-by: evenyag * chore: extract pk filters check Signed-off-by: evenyag * fix: scanbench support explain verbose Signed-off-by: evenyag * feat: add metrics for mem prefilter Signed-off-by: evenyag * chore: address review comment Signed-off-by: evenyag * chore: remove dead code Signed-off-by: evenyag --------- Signed-off-by: evenyag --- src/cmd/src/datanode/scanbench.rs | 4 +- src/mito2/src/memtable.rs | 6 + src/mito2/src/memtable/bulk.rs | 2 - src/mito2/src/memtable/bulk/context.rs | 52 ++++- src/mito2/src/memtable/bulk/part.rs | 80 +++----- src/mito2/src/memtable/bulk/part_reader.rs | 105 ++++++++-- src/mito2/src/memtable/partition_tree/tree.rs | 1 + .../src/memtable/simple_bulk_memtable.rs | 1 + src/mito2/src/memtable/time_series.rs | 1 + src/mito2/src/read/scan_util.rs | 17 ++ src/mito2/src/sst/parquet/prefilter.rs | 185 +++++++++++++++++- 11 files changed, 377 insertions(+), 77 deletions(-) diff --git a/src/cmd/src/datanode/scanbench.rs b/src/cmd/src/datanode/scanbench.rs index 6bfe177fc1..51064126fe 100644 --- a/src/cmd/src/datanode/scanbench.rs +++ b/src/cmd/src/datanode/scanbench.rs @@ -677,7 +677,9 @@ impl ScanbenchCommand { // Scan all partitions let num_partitions = scanner.properties().partitions.len(); - let ctx = QueryScanContext::default(); + let ctx = QueryScanContext { + explain_verbose: self.verbose, + }; let metrics_set = ExecutionPlanMetricsSet::new(); let mut scan_futures = FuturesUnordered::new(); diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index 3ebfdd3628..154d062e07 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -497,6 +497,8 @@ impl MemScanMetrics { metrics.num_rows += inner.num_rows; metrics.num_batches += inner.num_batches; metrics.scan_cost += inner.scan_cost; + metrics.prefilter_cost += inner.prefilter_cost; + metrics.prefilter_rows_filtered += inner.prefilter_rows_filtered; } /// Gets the metrics data. @@ -515,6 +517,10 @@ pub(crate) struct MemScanMetricsData { pub(crate) num_batches: usize, /// Duration to scan the memtable. pub(crate) scan_cost: Duration, + /// Duration of prefilter in memtable scan. + pub(crate) prefilter_cost: Duration, + /// Number of rows filtered by prefilter in memtable scan. + pub(crate) prefilter_rows_filtered: usize, } /// Encoded range in the memtable. diff --git a/src/mito2/src/memtable/bulk.rs b/src/mito2/src/memtable/bulk.rs index 502b61759d..9d25d0c39f 100644 --- a/src/mito2/src/memtable/bulk.rs +++ b/src/mito2/src/memtable/bulk.rs @@ -15,9 +15,7 @@ //! Memtable implementation for bulk load pub(crate) mod chunk_reader; -#[allow(unused)] pub mod context; -#[allow(unused)] pub mod part; pub mod part_reader; mod row_group_reader; diff --git a/src/mito2/src/memtable/bulk/context.rs b/src/mito2/src/memtable/bulk/context.rs index c3274d30e9..7551eb33af 100644 --- a/src/mito2/src/memtable/bulk/context.rs +++ b/src/mito2/src/memtable/bulk/context.rs @@ -17,7 +17,8 @@ use std::collections::VecDeque; use std::sync::Arc; -use mito_codec::row_converter::{DensePrimaryKeyCodec, build_primary_key_codec}; +use common_recordbatch::filter::SimpleFilterEvaluator; +use mito_codec::row_converter::build_primary_key_codec; use parquet::file::metadata::ParquetMetaData; use store_api::metadata::RegionMetadataRef; use store_api::storage::ColumnId; @@ -25,8 +26,8 @@ use table::predicate::Predicate; use crate::error::Result; use crate::sst::parquet::file_range::{PreFilterMode, RangeBase}; -use crate::sst::parquet::flat_format::FlatReadFormat; use crate::sst::parquet::format::ReadFormat; +use crate::sst::parquet::prefilter::CachedPrimaryKeyFilter; use crate::sst::parquet::reader::SimpleFilterContext; use crate::sst::parquet::stats::RowGroupPruningStats; @@ -35,6 +36,9 @@ pub(crate) type BulkIterContextRef = Arc; pub struct BulkIterContext { pub(crate) base: RangeBase, pub(crate) predicate: Option, + /// Pre-extracted primary key filters for PK prefiltering. + /// `None` if PK prefiltering is not applicable. + pk_filters: Option>>, } impl BulkIterContext { @@ -62,7 +66,7 @@ impl BulkIterContext { ) -> Result { let codec = build_primary_key_codec(®ion_metadata); - let simple_filters = predicate + let simple_filters: Vec = predicate .as_ref() .iter() .flat_map(|predicate| { @@ -87,6 +91,9 @@ impl BulkIterContext { .map(|pred| pred.dyn_filters().as_ref().clone()) .unwrap_or_default(); + // Pre-extract PK filters if applicable. + let pk_filters = Self::extract_pk_filters(&read_format, &simple_filters); + Ok(Self { base: RangeBase { filters: simple_filters, @@ -102,6 +109,7 @@ impl BulkIterContext { partition_filter: None, }, predicate, + pk_filters, }) } @@ -133,6 +141,44 @@ impl BulkIterContext { } } + /// Extracts PK filters if flat format with dictionary-encoded PKs is used. + fn extract_pk_filters( + read_format: &ReadFormat, + filters: &[SimpleFilterContext], + ) -> Option>> { + let flat_format = read_format.as_flat()?; + if flat_format.batch_has_raw_pk_columns() { + return None; + } + let metadata = read_format.metadata(); + if metadata.primary_key.is_empty() { + return None; + } + + let pk_filters: Vec<_> = filters + .iter() + .filter_map(|f| f.primary_key_prefilter()) + .collect(); + if pk_filters.is_empty() { + return None; + } + + Some(Arc::new(pk_filters)) + } + + /// Builds a fresh PK filter for a new iterator. Returns `None` if PK + /// prefiltering is not applicable. + pub(crate) fn build_pk_filter(&self) -> Option { + let pk_filters = self.pk_filters.as_ref()?; + let metadata = self.base.read_format.metadata(); + // Parquet PK prefilter always supports the partition column. + let inner = self + .base + .codec + .primary_key_filter(metadata, Arc::clone(pk_filters), false); + Some(CachedPrimaryKeyFilter::new(inner)) + } + pub(crate) fn read_format(&self) -> &ReadFormat { &self.base.read_format } diff --git a/src/mito2/src/memtable/bulk/part.rs b/src/mito2/src/memtable/bulk/part.rs index bf345c038e..986e9409ee 100644 --- a/src/mito2/src/memtable/bulk/part.rs +++ b/src/mito2/src/memtable/bulk/part.rs @@ -14,66 +14,55 @@ //! Bulk part encoder/decoder. -use std::collections::{HashMap, HashSet, VecDeque}; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use std::time::{Duration, Instant}; use api::helper::{ColumnDataTypeWrapper, to_grpc_value}; use api::v1::bulk_wal_entry::Body; -use api::v1::{ArrowIpc, BulkWalEntry, Mutation, OpType, bulk_wal_entry}; +use api::v1::{ArrowIpc, BulkWalEntry, Mutation, OpType}; use bytes::Bytes; use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage}; use common_recordbatch::DfRecordBatch as RecordBatch; use common_time::Timestamp; -use common_time::timestamp::TimeUnit; use datatypes::arrow; -use datatypes::arrow::array::{ - Array, ArrayRef, BinaryBuilder, BinaryDictionaryBuilder, DictionaryArray, StringBuilder, - StringDictionaryBuilder, TimestampMicrosecondArray, TimestampMillisecondArray, - TimestampNanosecondArray, TimestampSecondArray, UInt8Array, UInt8Builder, UInt32Array, - UInt64Array, UInt64Builder, -}; -use datatypes::arrow::compute::{SortColumn, SortOptions, TakeOptions}; +use datatypes::arrow::array::{Array, ArrayRef, StringDictionaryBuilder, UInt8Array, UInt64Array}; +use datatypes::arrow::compute::{SortColumn, SortOptions}; use datatypes::arrow::datatypes::{ DataType as ArrowDataType, Field, Schema, SchemaRef, UInt32Type, }; -use datatypes::arrow_array::BinaryArray; use datatypes::data_type::DataType; -use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector}; -use datatypes::value::{Value, ValueRef}; +use datatypes::prelude::{MutableVector, Vector}; +use datatypes::value::ValueRef; use datatypes::vectors::Helper; -use mito_codec::key_values::{KeyValue, KeyValues, KeyValuesRef}; -use mito_codec::row_converter::{ - DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, build_primary_key_codec, -}; +use mito_codec::key_values::{KeyValue, KeyValues}; +use mito_codec::row_converter::PrimaryKeyCodec; use parquet::arrow::ArrowWriter; use parquet::basic::{Compression, ZstdLevel}; -use parquet::data_type::AsBytes; use parquet::file::metadata::ParquetMetaData; use parquet::file::properties::WriterProperties; use smallvec::SmallVec; -use snafu::{OptionExt, ResultExt, Snafu}; +use snafu::{OptionExt, ResultExt}; use store_api::codec::PrimaryKeyEncoding; -use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef}; +use store_api::metadata::{RegionMetadata, RegionMetadataRef}; use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME; -use store_api::storage::{FileId, RegionId, SequenceNumber, SequenceRange}; -use table::predicate::Predicate; +use store_api::storage::{FileId, SequenceNumber, SequenceRange}; use crate::error::{ - self, ColumnNotFoundSnafu, ComputeArrowSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu, - DataTypeMismatchSnafu, EncodeMemtableSnafu, EncodeSnafu, InvalidMetadataSnafu, - InvalidRequestSnafu, NewRecordBatchSnafu, Result, UnexpectedSnafu, + self, ColumnNotFoundSnafu, ComputeArrowSnafu, CreateDefaultSnafu, DataTypeMismatchSnafu, + EncodeMemtableSnafu, EncodeSnafu, InvalidMetadataSnafu, InvalidRequestSnafu, + NewRecordBatchSnafu, Result, }; use crate::memtable::bulk::context::BulkIterContextRef; use crate::memtable::bulk::part_reader::EncodedBulkPartIter; use crate::memtable::time_series::{ValueBuilder, Values}; use crate::memtable::{BoxedRecordBatchIterator, MemScanMetrics, MemtableStats}; +use crate::sst::SeriesEstimator; use crate::sst::index::IndexOutput; use crate::sst::parquet::file_range::{PreFilterMode, row_group_contains_delete}; use crate::sst::parquet::flat_format::primary_key_column_index; -use crate::sst::parquet::format::{PrimaryKeyArray, PrimaryKeyArrayBuilder, ReadFormat}; +use crate::sst::parquet::format::{PrimaryKeyArray, PrimaryKeyArrayBuilder}; use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo}; -use crate::sst::{SeriesEstimator, to_sst_arrow_schema}; const INIT_DICT_VALUE_CAPACITY: usize = 8; @@ -527,8 +516,6 @@ impl PrimaryKeyColumnBuilder { /// Converter that converts structs into [BulkPart]. pub struct BulkPartConverter { - /// Region metadata. - region_metadata: RegionMetadataRef, /// Schema of the converted batch. schema: SchemaRef, /// Primary key codec for encoding keys @@ -577,7 +564,6 @@ impl BulkPartConverter { }; Self { - region_metadata: region_metadata.clone(), schema, primary_key_codec, key_buf: Vec::new(), @@ -1116,7 +1102,6 @@ pub struct BulkPartEncodeMetrics { pub struct BulkPartEncoder { metadata: RegionMetadataRef, - row_group_size: usize, writer_props: Option, } @@ -1141,7 +1126,6 @@ impl BulkPartEncoder { Ok(Self { metadata, - row_group_size, writer_props, }) } @@ -1182,7 +1166,6 @@ impl BulkPartEncoder { iter_start = Instant::now(); } metrics.iter_cost += iter_start.elapsed(); - iter_start = Instant::now(); if total_rows == 0 { return Ok(None); @@ -1348,11 +1331,6 @@ impl MultiBulkPart { self.batches.len() } - /// Returns an iterator over the record batches. - pub(crate) fn batches(&self) -> impl Iterator { - self.batches.iter() - } - /// Returns the estimated memory size of all batches. pub(crate) fn estimated_size(&self) -> usize { self.batches.iter().map(record_batch_estimated_size).sum() @@ -1400,19 +1378,22 @@ impl MultiBulkPart { mod tests { use api::v1::{Row, SemanticType, WriteHint}; use datafusion_common::ScalarValue; - use datatypes::arrow::array::Float64Array; + use datatypes::arrow::array::{ + BinaryArray, DictionaryArray, Float64Array, TimestampMillisecondArray, + }; + use datatypes::arrow::datatypes::UInt32Type; use datatypes::prelude::{ConcreteDataType, Value}; use datatypes::schema::ColumnSchema; + use mito_codec::row_converter::build_primary_key_codec; use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder}; use store_api::storage::RegionId; use store_api::storage::consts::ReservedColumnId; + use table::predicate::Predicate; use super::*; use crate::memtable::bulk::context::BulkIterContext; use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema}; - use crate::test_util::memtable_util::{ - build_key_values_with_ts_seq_values, metadata_for_test, region_metadata_to_row_schema, - }; + use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test}; struct MutationInput<'a> { k0: &'a str, @@ -1422,13 +1403,6 @@ mod tests { sequence: u64, } - #[derive(Debug, PartialOrd, PartialEq)] - struct BatchOutput<'a> { - pk_values: &'a [Value], - timestamps: &'a [i64], - v1: &'a [Option], - } - fn encode(input: &[MutationInput]) -> EncodedBulkPart { let metadata = metadata_for_test(); let kvs = input @@ -1482,7 +1456,7 @@ mod tests { ]); let projection = &[4u32]; - let mut reader = part + let reader = part .read( Arc::new( BulkIterContext::new( @@ -1523,7 +1497,7 @@ mod tests { let kvs = key_values .into_iter() .map(|(k0, k1, (start, end), sequence)| { - let ts = (start..end); + let ts = start..end; let v1 = (start..end).map(|_| None); build_key_values_with_ts_seq_values(&metadata, k0.to_string(), k1, ts, v1, sequence) }) @@ -1553,7 +1527,7 @@ mod tests { ) .unwrap(), ); - let mut reader = part + let reader = part .read(context, None, None) .unwrap() .expect("expect at least one row group"); @@ -1626,7 +1600,7 @@ mod tests { 100, ); - /// Predicates over field column can do precise filtering. + // Predicates over field column can do precise filtering. check_prune_row_group( &part, Some(Predicate::new(vec![ diff --git a/src/mito2/src/memtable/bulk/part_reader.rs b/src/mito2/src/memtable/bulk/part_reader.rs index 1375e79542..a9caeef08c 100644 --- a/src/mito2/src/memtable/bulk/part_reader.rs +++ b/src/mito2/src/memtable/bulk/part_reader.rs @@ -17,6 +17,7 @@ use std::time::Instant; use datatypes::arrow::array::BooleanArray; use datatypes::arrow::record_batch::RecordBatch; +use mito_codec::row_converter::PrimaryKeyFilter; use parquet::arrow::ProjectionMask; use parquet::arrow::arrow_reader::ParquetRecordBatchReader; use snafu::ResultExt; @@ -29,7 +30,8 @@ use crate::memtable::bulk::row_group_reader::MemtableRowGroupReaderBuilder; use crate::memtable::{MemScanMetrics, MemScanMetricsData}; use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED}; use crate::sst::parquet::file_range::{PreFilterMode, TagDecodeState}; -use crate::sst::parquet::flat_format::sequence_column_index; +use crate::sst::parquet::flat_format::{primary_key_column_index, sequence_column_index}; +use crate::sst::parquet::prefilter::{CachedPrimaryKeyFilter, prefilter_flat_batch_by_primary_key}; /// Iterator for reading data inside a bulk part. pub struct EncodedBulkPartIter { @@ -41,6 +43,8 @@ pub struct EncodedBulkPartIter { sequence: Option, /// Cached skip_fields for current row group. current_skip_fields: bool, + /// Primary key filter for prefiltering before convert_batch. + pk_filter: Option, /// Metrics for this iterator. metrics: MemScanMetricsData, /// Optional memory scan metrics to report to. @@ -69,6 +73,9 @@ impl EncodedBulkPartIter { let builder = MemtableRowGroupReaderBuilder::try_new(&context, projection_mask, parquet_meta, data)?; + // Build PK filter if applicable (flat format with dictionary-encoded PKs). + let pk_filter = context.build_pk_filter(); + let (init_reader, current_skip_fields) = match row_groups_to_read.pop_front() { Some(first_row_group) => { let skip_fields = builder.compute_skip_fields(&context, first_row_group); @@ -85,6 +92,7 @@ impl EncodedBulkPartIter { builder, sequence, current_skip_fields, + pk_filter, metrics: MemScanMetricsData { total_series: series_count, ..Default::default() @@ -116,6 +124,10 @@ impl EncodedBulkPartIter { &self.sequence, batch, self.current_skip_fields, + self.pk_filter + .as_mut() + .map(|f| f as &mut dyn PrimaryKeyFilter), + &mut self.metrics, )? { // Update metrics self.metrics.num_batches += 1; @@ -142,6 +154,10 @@ impl EncodedBulkPartIter { &self.sequence, batch, self.current_skip_fields, + self.pk_filter + .as_mut() + .map(|f| f as &mut dyn PrimaryKeyFilter), + &mut self.metrics, )? { // Update metrics self.metrics.num_batches += 1; @@ -175,12 +191,14 @@ impl Iterator for EncodedBulkPartIter { impl Drop for EncodedBulkPartIter { fn drop(&mut self) { common_telemetry::debug!( - "EncodedBulkPartIter region: {}, metrics: total_series={}, num_rows={}, num_batches={}, scan_cost={:?}", + "EncodedBulkPartIter region: {}, metrics: total_series={}, num_rows={}, num_batches={}, scan_cost={:?}, prefilter_cost={:?}, prefilter_rows_filtered={}", self.context.region_id(), self.metrics.total_series, self.metrics.num_rows, self.metrics.num_batches, - self.metrics.scan_cost + self.metrics.scan_cost, + self.metrics.prefilter_cost, + self.metrics.prefilter_rows_filtered ); // Report MemScanMetrics if not already reported @@ -205,6 +223,8 @@ pub struct BulkPartBatchIter { context: BulkIterContextRef, /// Sequence number filter. sequence: Option, + /// Primary key filter for prefiltering before convert_batch. + pk_filter: Option, /// Metrics for this iterator. metrics: MemScanMetricsData, /// Optional memory scan metrics to report to. @@ -222,10 +242,13 @@ impl BulkPartBatchIter { ) -> Self { assert!(context.read_format().as_flat().is_some()); + let pk_filter = context.build_pk_filter(); + Self { batches: VecDeque::from(batches), context, sequence, + pk_filter, metrics: MemScanMetricsData { total_series: series_count, ..Default::default() @@ -282,8 +305,16 @@ impl BulkPartBatchIter { PreFilterMode::SkipFieldsOnDelete => true, }; - let Some(filtered_batch) = - apply_combined_filters(&self.context, &self.sequence, projected_batch, skip_fields)? + let Some(filtered_batch) = apply_combined_filters( + &self.context, + &self.sequence, + projected_batch, + skip_fields, + self.pk_filter + .as_mut() + .map(|f| f as &mut dyn PrimaryKeyFilter), + &mut self.metrics, + )? else { self.metrics.scan_cost += start.elapsed(); return Ok(None); @@ -323,12 +354,14 @@ impl Iterator for BulkPartBatchIter { impl Drop for BulkPartBatchIter { fn drop(&mut self) { common_telemetry::debug!( - "BulkPartBatchIter region: {}, metrics: total_series={}, num_rows={}, num_batches={}, scan_cost={:?}", + "BulkPartBatchIter region: {}, metrics: total_series={}, num_rows={}, num_batches={}, scan_cost={:?}, prefilter_cost={:?}, prefilter_rows_filtered={}", self.context.region_id(), self.metrics.total_series, self.metrics.num_rows, self.metrics.num_batches, - self.metrics.scan_cost + self.metrics.scan_cost, + self.metrics.prefilter_cost, + self.metrics.prefilter_rows_filtered ); // Report MemScanMetrics if not already reported @@ -353,8 +386,32 @@ fn apply_combined_filters( sequence: &Option, record_batch: RecordBatch, skip_fields: bool, + pk_filter: Option<&mut dyn PrimaryKeyFilter>, + metrics: &mut MemScanMetricsData, ) -> error::Result> { - // Converts the format to the flat format first. + // Apply PK prefilter on raw batch before convert_batch to reduce conversion overhead. + let has_pk_prefilter = pk_filter.is_some(); + let record_batch = if let Some(pk_filter) = pk_filter { + let rows_before = record_batch.num_rows(); + let prefilter_start = Instant::now(); + let pk_col_idx = primary_key_column_index(record_batch.num_columns()); + match prefilter_flat_batch_by_primary_key(record_batch, pk_col_idx, pk_filter)? { + Some(batch) => { + metrics.prefilter_cost += prefilter_start.elapsed(); + metrics.prefilter_rows_filtered += rows_before - batch.num_rows(); + batch + } + None => { + metrics.prefilter_cost += prefilter_start.elapsed(); + metrics.prefilter_rows_filtered += rows_before; + return Ok(None); + } + } + } else { + record_batch + }; + + // Converts the format to the flat format. let format = context.read_format().as_flat().unwrap(); let record_batch = format.convert_batch(record_batch, None)?; @@ -362,12 +419,12 @@ fn apply_combined_filters( let mut combined_filter = None; let mut tag_decode_state = TagDecodeState::new(); - // First, apply predicate filters using the shared method. + // Apply predicate filters using the shared method. if !context.base.filters.is_empty() { let predicate_mask = context.base.compute_filter_mask_flat( &record_batch, skip_fields, - false, + has_pk_prefilter, &mut tag_decode_state, )?; // If predicate filters out the entire batch, return None early @@ -433,6 +490,7 @@ mod tests { use super::*; use crate::memtable::bulk::context::BulkIterContext; + use crate::test_util::sst_util::new_primary_key; #[test] fn test_bulk_part_batch_iter() { @@ -461,9 +519,16 @@ mod tests { vec![1000, 2000, 3000], )); - // Create primary key dictionary array + // Create primary key dictionary array with properly encoded PKs use datatypes::arrow::array::{BinaryArray, DictionaryArray, UInt32Array}; - let values = Arc::new(BinaryArray::from_iter_values([b"key1", b"key2", b"key3"])); + let pk1 = new_primary_key(&["key1"]); + let pk2 = new_primary_key(&["key2"]); + let pk3 = new_primary_key(&["key3"]); + let values = Arc::new(BinaryArray::from_iter_values([ + pk1.as_slice(), + pk2.as_slice(), + pk3.as_slice(), + ])); let keys = UInt32Array::from(vec![0, 1, 2]); let primary_key = Arc::new(DictionaryArray::new(keys, values)); @@ -596,12 +661,17 @@ mod tests { ])); // Create first batch with 2 rows + let pk1 = new_primary_key(&["key1"]); + let pk2 = new_primary_key(&["key2"]); let key1_1 = Arc::new(StringArray::from_iter_values(["key1", "key2"])); let field1_1 = Arc::new(Int64Array::from(vec![11, 12])); let timestamp_1 = Arc::new(datatypes::arrow::array::TimestampMillisecondArray::from( vec![1000, 2000], )); - let values_1 = Arc::new(BinaryArray::from_iter_values([b"key1", b"key2"])); + let values_1 = Arc::new(BinaryArray::from_iter_values([ + pk1.as_slice(), + pk2.as_slice(), + ])); let keys_1 = UInt32Array::from(vec![0, 1]); let primary_key_1 = Arc::new(DictionaryArray::new(keys_1, values_1)); let sequence_1 = Arc::new(UInt64Array::from(vec![1, 2])); @@ -621,12 +691,19 @@ mod tests { .unwrap(); // Create second batch with 3 rows + let pk3 = new_primary_key(&["key3"]); + let pk4 = new_primary_key(&["key4"]); + let pk5 = new_primary_key(&["key5"]); let key1_2 = Arc::new(StringArray::from_iter_values(["key3", "key4", "key5"])); let field1_2 = Arc::new(Int64Array::from(vec![13, 14, 15])); let timestamp_2 = Arc::new(datatypes::arrow::array::TimestampMillisecondArray::from( vec![3000, 4000, 5000], )); - let values_2 = Arc::new(BinaryArray::from_iter_values([b"key3", b"key4", b"key5"])); + let values_2 = Arc::new(BinaryArray::from_iter_values([ + pk3.as_slice(), + pk4.as_slice(), + pk5.as_slice(), + ])); let keys_2 = UInt32Array::from(vec![0, 1, 2]); let primary_key_2 = Arc::new(DictionaryArray::new(keys_2, values_2)); let sequence_2 = Arc::new(UInt64Array::from(vec![3, 4, 5])); diff --git a/src/mito2/src/memtable/partition_tree/tree.rs b/src/mito2/src/memtable/partition_tree/tree.rs index 17977db56a..f5863ae0c8 100644 --- a/src/mito2/src/memtable/partition_tree/tree.rs +++ b/src/mito2/src/memtable/partition_tree/tree.rs @@ -490,6 +490,7 @@ impl TreeIter { num_rows: self.metrics.rows_fetched, num_batches: self.metrics.batches_fetched, scan_cost: self.metrics.iter_elapsed, + ..Default::default() }; mem_scan_metrics.merge_inner(&inner); } diff --git a/src/mito2/src/memtable/simple_bulk_memtable.rs b/src/mito2/src/memtable/simple_bulk_memtable.rs index 6d91f00361..1284741347 100644 --- a/src/mito2/src/memtable/simple_bulk_memtable.rs +++ b/src/mito2/src/memtable/simple_bulk_memtable.rs @@ -372,6 +372,7 @@ impl IterBuilder for BatchRangeBuilder { num_rows: batch.num_rows(), num_batches: 1, scan_cost: self.scan_cost, + ..Default::default() }; metrics.merge_inner(&inner); } diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index d3d00d0703..9666bee51c 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -567,6 +567,7 @@ impl Iter { num_rows: self.metrics.num_rows, num_batches: self.metrics.num_batches, scan_cost: self.metrics.scan_cost, + ..Default::default() }; mem_scan_metrics.merge_inner(&inner); } diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs index 9bf1c17276..d065657242 100644 --- a/src/mito2/src/read/scan_util.rs +++ b/src/mito2/src/read/scan_util.rs @@ -138,6 +138,10 @@ pub(crate) struct ScanMetricsSet { mem_batches: usize, /// Number of series read from memtables. mem_series: usize, + /// Duration of prefilter in memtable scan. + mem_prefilter_cost: Duration, + /// Number of rows filtered by prefilter in memtable scan. + mem_prefilter_rows_filtered: usize, // SST related metrics: /// Duration to build file ranges. @@ -341,6 +345,8 @@ impl fmt::Debug for ScanMetricsSet { mem_rows, mem_batches, mem_series, + mem_prefilter_cost, + mem_prefilter_rows_filtered, inverted_index_apply_metrics, bloom_filter_apply_metrics, fulltext_index_apply_metrics, @@ -509,6 +515,15 @@ impl fmt::Debug for ScanMetricsSet { if !mem_scan_cost.is_zero() { write!(f, ", \"mem_scan_cost\":\"{mem_scan_cost:?}\"")?; } + if !mem_prefilter_cost.is_zero() { + write!(f, ", \"mem_prefilter_cost\":\"{mem_prefilter_cost:?}\"")?; + } + if *mem_prefilter_rows_filtered > 0 { + write!( + f, + ", \"mem_prefilter_rows_filtered\":{mem_prefilter_rows_filtered}" + )?; + } // Write optional verbose metrics if they are not empty if let Some(metrics) = inverted_index_apply_metrics @@ -1061,6 +1076,8 @@ impl PartitionMetrics { metrics.mem_rows += data.num_rows; metrics.mem_batches += data.num_batches; metrics.mem_series += data.total_series; + metrics.mem_prefilter_cost += data.prefilter_cost; + metrics.mem_prefilter_rows_filtered += data.prefilter_rows_filtered; } /// Merges [ScannerMetrics], `build_reader_cost`, `scan_cost` and `yield_cost`. diff --git a/src/mito2/src/sst/parquet/prefilter.rs b/src/mito2/src/sst/parquet/prefilter.rs index 88df56e401..967ddd491b 100644 --- a/src/mito2/src/sst/parquet/prefilter.rs +++ b/src/mito2/src/sst/parquet/prefilter.rs @@ -23,7 +23,7 @@ use std::sync::Arc; use api::v1::SemanticType; use common_recordbatch::filter::SimpleFilterEvaluator; -use datatypes::arrow::array::BinaryArray; +use datatypes::arrow::array::{BinaryArray, BooleanArray, BooleanBufferBuilder}; use datatypes::arrow::record_batch::RecordBatch; use futures::StreamExt; use mito_codec::row_converter::{PrimaryKeyCodec, PrimaryKeyFilter}; @@ -33,7 +33,7 @@ use parquet::schema::types::SchemaDescriptor; use snafu::{OptionExt, ResultExt}; use store_api::metadata::{RegionMetadata, RegionMetadataRef}; -use crate::error::{DecodeSnafu, ReadParquetSnafu, Result, UnexpectedSnafu}; +use crate::error::{ComputeArrowSnafu, DecodeSnafu, ReadParquetSnafu, Result, UnexpectedSnafu}; use crate::sst::parquet::flat_format::primary_key_column_index; use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat}; use crate::sst::parquet::reader::{RowGroupBuildContext, RowGroupReaderBuilder}; @@ -93,6 +93,55 @@ pub(crate) fn matching_row_ranges_by_primary_key( Ok(matched_row_ranges) } +/// Filters a flat-format record batch by primary key, returning only rows whose +/// primary key matches the filter. Returns `None` if all rows are filtered out. +pub(crate) fn prefilter_flat_batch_by_primary_key( + input: RecordBatch, + pk_column_index: usize, + pk_filter: &mut dyn PrimaryKeyFilter, +) -> Result> { + if input.num_rows() == 0 { + return Ok(Some(input)); + } + + let matched_row_ranges = + matching_row_ranges_by_primary_key(&input, pk_column_index, pk_filter)?; + if matched_row_ranges.is_empty() { + return Ok(None); + } + + if matched_row_ranges.len() == 1 + && matched_row_ranges[0].start == 0 + && matched_row_ranges[0].end == input.num_rows() + { + return Ok(Some(input)); + } + + if matched_row_ranges.len() == 1 { + let span = &matched_row_ranges[0]; + return Ok(Some(input.slice(span.start, span.end - span.start))); + } + + let mut builder = BooleanBufferBuilder::new(input.num_rows()); + builder.append_n(input.num_rows(), false); + for span in matched_row_ranges { + for i in span { + builder.set_bit(i, true); + } + } + + let filtered = datatypes::arrow::compute::filter_record_batch( + &input, + &BooleanArray::new(builder.finish(), None), + ) + .context(ComputeArrowSnafu)?; + if filtered.num_rows() == 0 { + Ok(None) + } else { + Ok(Some(filtered)) + } +} + /// Returns whether a filter can be applied by parquet primary-key prefiltering. /// /// Unlike `PartitionTreeMemtable`, parquet prefilter always supports predicates @@ -346,12 +395,19 @@ mod tests { use common_recordbatch::filter::SimpleFilterEvaluator; use datafusion_expr::{col, lit}; - use mito_codec::row_converter::PrimaryKeyFilter; + use datatypes::arrow::array::{ + ArrayRef, DictionaryArray, TimestampMillisecondArray, UInt8Array, UInt32Array, UInt64Array, + }; + use datatypes::arrow::datatypes::{Schema, UInt32Type}; + use mito_codec::row_converter::{PrimaryKeyFilter, build_primary_key_codec}; use store_api::codec::PrimaryKeyEncoding; use super::*; + use crate::sst::internal_fields; use crate::sst::parquet::format::ReadFormat; - use crate::test_util::sst_util::{new_primary_key, sst_region_metadata_with_encoding}; + use crate::test_util::sst_util::{ + new_primary_key, sst_region_metadata, sst_region_metadata_with_encoding, + }; #[test] fn test_is_usable_primary_key_filter_skips_legacy_primary_key_batches() { @@ -416,4 +472,125 @@ mod tests { assert_eq!(hits.load(Ordering::Relaxed), 2); } + + fn new_test_filters(exprs: &[datafusion_expr::Expr]) -> Vec { + exprs + .iter() + .filter_map(SimpleFilterEvaluator::try_new) + .collect() + } + + fn new_raw_batch(primary_keys: &[&[u8]], field_values: &[u64]) -> RecordBatch { + assert_eq!(primary_keys.len(), field_values.len()); + + let metadata = Arc::new(sst_region_metadata()); + let arrow_schema = metadata.schema.arrow_schema(); + let field_column = arrow_schema + .field(arrow_schema.index_of("field_0").unwrap()) + .clone(); + let time_index_column = arrow_schema + .field(arrow_schema.index_of("ts").unwrap()) + .clone(); + let mut fields = vec![field_column, time_index_column]; + fields.extend( + internal_fields() + .into_iter() + .map(|field| field.as_ref().clone()), + ); + let schema = Arc::new(Schema::new(fields)); + + let mut dict_values = Vec::new(); + let mut keys = Vec::with_capacity(primary_keys.len()); + for pk in primary_keys { + let key = dict_values + .iter() + .position(|existing: &&[u8]| existing == pk) + .unwrap_or_else(|| { + dict_values.push(*pk); + dict_values.len() - 1 + }); + keys.push(key as u32); + } + let pk_array: ArrayRef = Arc::new(DictionaryArray::::new( + UInt32Array::from(keys), + Arc::new(BinaryArray::from_iter_values(dict_values.iter().copied())), + )); + + RecordBatch::try_new( + schema, + vec![ + Arc::new(UInt64Array::from(field_values.to_vec())), + Arc::new(TimestampMillisecondArray::from_iter_values( + 0..primary_keys.len() as i64, + )), + pk_array, + Arc::new(UInt64Array::from(vec![1; primary_keys.len()])), + Arc::new(UInt8Array::from(vec![1; primary_keys.len()])), + ], + ) + .unwrap() + } + + fn field_values(batch: &RecordBatch) -> Vec { + batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap() + .values() + .to_vec() + } + + #[test] + fn test_prefilter_primary_key_drops_single_dictionary_batch() { + let metadata = Arc::new(sst_region_metadata()); + let filters = Arc::new(new_test_filters(&[col("tag_0").eq(lit("b"))])); + let mut primary_key_filter = build_primary_key_codec(metadata.as_ref()) + .primary_key_filter(&metadata, filters, false); + let pk_a = new_primary_key(&["a", "x"]); + let batch = new_raw_batch(&[pk_a.as_slice(), pk_a.as_slice()], &[10, 11]); + let pk_col_idx = primary_key_column_index(batch.num_columns()); + + let filtered = + prefilter_flat_batch_by_primary_key(batch, pk_col_idx, primary_key_filter.as_mut()) + .unwrap(); + + assert!(filtered.is_none()); + } + + #[test] + fn test_prefilter_primary_key_builds_mask_for_fragmented_matches() { + let metadata = Arc::new(sst_region_metadata()); + let filters = Arc::new(new_test_filters(&[col("tag_0") + .eq(lit("a")) + .or(col("tag_0").eq(lit("c")))])); + let mut primary_key_filter = build_primary_key_codec(metadata.as_ref()) + .primary_key_filter(&metadata, filters, false); + let pk_a = new_primary_key(&["a", "x"]); + let pk_b = new_primary_key(&["b", "x"]); + let pk_c = new_primary_key(&["c", "x"]); + let pk_d = new_primary_key(&["d", "x"]); + let batch = new_raw_batch( + &[ + pk_a.as_slice(), + pk_a.as_slice(), + pk_b.as_slice(), + pk_b.as_slice(), + pk_c.as_slice(), + pk_c.as_slice(), + pk_d.as_slice(), + pk_d.as_slice(), + ], + &[10, 11, 12, 13, 14, 15, 16, 17], + ); + let pk_col_idx = primary_key_column_index(batch.num_columns()); + + let filtered = + prefilter_flat_batch_by_primary_key(batch, pk_col_idx, primary_key_filter.as_mut()) + .unwrap() + .unwrap(); + + assert_eq!(filtered.num_rows(), 4); + assert_eq!(field_values(&filtered), vec![10, 11, 14, 15]); + } }