feat: implement prefilter for bulk memtable (#7895)

* feat: prefilter in memtable

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: fmt code

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: bulk part reader also do prefilter

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: extract pk filters check

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: scanbench support explain verbose

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: add metrics for mem prefilter

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: address review comment

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: remove dead code

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
Yingwen
2026-04-01 17:02:54 +08:00
committed by GitHub
parent 2b4e12c358
commit b75a112561
11 changed files with 377 additions and 77 deletions

View File

@@ -677,7 +677,9 @@ impl ScanbenchCommand {
// Scan all partitions
let num_partitions = scanner.properties().partitions.len();
let ctx = QueryScanContext::default();
let ctx = QueryScanContext {
explain_verbose: self.verbose,
};
let metrics_set = ExecutionPlanMetricsSet::new();
let mut scan_futures = FuturesUnordered::new();

View File

@@ -497,6 +497,8 @@ impl MemScanMetrics {
metrics.num_rows += inner.num_rows;
metrics.num_batches += inner.num_batches;
metrics.scan_cost += inner.scan_cost;
metrics.prefilter_cost += inner.prefilter_cost;
metrics.prefilter_rows_filtered += inner.prefilter_rows_filtered;
}
/// Gets the metrics data.
@@ -515,6 +517,10 @@ pub(crate) struct MemScanMetricsData {
pub(crate) num_batches: usize,
/// Duration to scan the memtable.
pub(crate) scan_cost: Duration,
/// Duration of prefilter in memtable scan.
pub(crate) prefilter_cost: Duration,
/// Number of rows filtered by prefilter in memtable scan.
pub(crate) prefilter_rows_filtered: usize,
}
/// Encoded range in the memtable.

View File

@@ -15,9 +15,7 @@
//! Memtable implementation for bulk load
pub(crate) mod chunk_reader;
#[allow(unused)]
pub mod context;
#[allow(unused)]
pub mod part;
pub mod part_reader;
mod row_group_reader;

View File

@@ -17,7 +17,8 @@
use std::collections::VecDeque;
use std::sync::Arc;
use mito_codec::row_converter::{DensePrimaryKeyCodec, build_primary_key_codec};
use common_recordbatch::filter::SimpleFilterEvaluator;
use mito_codec::row_converter::build_primary_key_codec;
use parquet::file::metadata::ParquetMetaData;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
@@ -25,8 +26,8 @@ use table::predicate::Predicate;
use crate::error::Result;
use crate::sst::parquet::file_range::{PreFilterMode, RangeBase};
use crate::sst::parquet::flat_format::FlatReadFormat;
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::prefilter::CachedPrimaryKeyFilter;
use crate::sst::parquet::reader::SimpleFilterContext;
use crate::sst::parquet::stats::RowGroupPruningStats;
@@ -35,6 +36,9 @@ pub(crate) type BulkIterContextRef = Arc<BulkIterContext>;
pub struct BulkIterContext {
pub(crate) base: RangeBase,
pub(crate) predicate: Option<Predicate>,
/// Pre-extracted primary key filters for PK prefiltering.
/// `None` if PK prefiltering is not applicable.
pk_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
}
impl BulkIterContext {
@@ -62,7 +66,7 @@ impl BulkIterContext {
) -> Result<Self> {
let codec = build_primary_key_codec(&region_metadata);
let simple_filters = predicate
let simple_filters: Vec<SimpleFilterContext> = predicate
.as_ref()
.iter()
.flat_map(|predicate| {
@@ -87,6 +91,9 @@ impl BulkIterContext {
.map(|pred| pred.dyn_filters().as_ref().clone())
.unwrap_or_default();
// Pre-extract PK filters if applicable.
let pk_filters = Self::extract_pk_filters(&read_format, &simple_filters);
Ok(Self {
base: RangeBase {
filters: simple_filters,
@@ -102,6 +109,7 @@ impl BulkIterContext {
partition_filter: None,
},
predicate,
pk_filters,
})
}
@@ -133,6 +141,44 @@ impl BulkIterContext {
}
}
/// Extracts PK filters if flat format with dictionary-encoded PKs is used.
fn extract_pk_filters(
read_format: &ReadFormat,
filters: &[SimpleFilterContext],
) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
let flat_format = read_format.as_flat()?;
if flat_format.batch_has_raw_pk_columns() {
return None;
}
let metadata = read_format.metadata();
if metadata.primary_key.is_empty() {
return None;
}
let pk_filters: Vec<_> = filters
.iter()
.filter_map(|f| f.primary_key_prefilter())
.collect();
if pk_filters.is_empty() {
return None;
}
Some(Arc::new(pk_filters))
}
/// Builds a fresh PK filter for a new iterator. Returns `None` if PK
/// prefiltering is not applicable.
pub(crate) fn build_pk_filter(&self) -> Option<CachedPrimaryKeyFilter> {
let pk_filters = self.pk_filters.as_ref()?;
let metadata = self.base.read_format.metadata();
// Parquet PK prefilter always supports the partition column.
let inner = self
.base
.codec
.primary_key_filter(metadata, Arc::clone(pk_filters), false);
Some(CachedPrimaryKeyFilter::new(inner))
}
pub(crate) fn read_format(&self) -> &ReadFormat {
&self.base.read_format
}

View File

@@ -14,66 +14,55 @@
//! Bulk part encoder/decoder.
use std::collections::{HashMap, HashSet, VecDeque};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, Instant};
use api::helper::{ColumnDataTypeWrapper, to_grpc_value};
use api::v1::bulk_wal_entry::Body;
use api::v1::{ArrowIpc, BulkWalEntry, Mutation, OpType, bulk_wal_entry};
use api::v1::{ArrowIpc, BulkWalEntry, Mutation, OpType};
use bytes::Bytes;
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use common_recordbatch::DfRecordBatch as RecordBatch;
use common_time::Timestamp;
use common_time::timestamp::TimeUnit;
use datatypes::arrow;
use datatypes::arrow::array::{
Array, ArrayRef, BinaryBuilder, BinaryDictionaryBuilder, DictionaryArray, StringBuilder,
StringDictionaryBuilder, TimestampMicrosecondArray, TimestampMillisecondArray,
TimestampNanosecondArray, TimestampSecondArray, UInt8Array, UInt8Builder, UInt32Array,
UInt64Array, UInt64Builder,
};
use datatypes::arrow::compute::{SortColumn, SortOptions, TakeOptions};
use datatypes::arrow::array::{Array, ArrayRef, StringDictionaryBuilder, UInt8Array, UInt64Array};
use datatypes::arrow::compute::{SortColumn, SortOptions};
use datatypes::arrow::datatypes::{
DataType as ArrowDataType, Field, Schema, SchemaRef, UInt32Type,
};
use datatypes::arrow_array::BinaryArray;
use datatypes::data_type::DataType;
use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
use datatypes::value::{Value, ValueRef};
use datatypes::prelude::{MutableVector, Vector};
use datatypes::value::ValueRef;
use datatypes::vectors::Helper;
use mito_codec::key_values::{KeyValue, KeyValues, KeyValuesRef};
use mito_codec::row_converter::{
DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, build_primary_key_codec,
};
use mito_codec::key_values::{KeyValue, KeyValues};
use mito_codec::row_converter::PrimaryKeyCodec;
use parquet::arrow::ArrowWriter;
use parquet::basic::{Compression, ZstdLevel};
use parquet::data_type::AsBytes;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use smallvec::SmallVec;
use snafu::{OptionExt, ResultExt, Snafu};
use snafu::{OptionExt, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
use store_api::storage::{FileId, RegionId, SequenceNumber, SequenceRange};
use table::predicate::Predicate;
use store_api::storage::{FileId, SequenceNumber, SequenceRange};
use crate::error::{
self, ColumnNotFoundSnafu, ComputeArrowSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu,
DataTypeMismatchSnafu, EncodeMemtableSnafu, EncodeSnafu, InvalidMetadataSnafu,
InvalidRequestSnafu, NewRecordBatchSnafu, Result, UnexpectedSnafu,
self, ColumnNotFoundSnafu, ComputeArrowSnafu, CreateDefaultSnafu, DataTypeMismatchSnafu,
EncodeMemtableSnafu, EncodeSnafu, InvalidMetadataSnafu, InvalidRequestSnafu,
NewRecordBatchSnafu, Result,
};
use crate::memtable::bulk::context::BulkIterContextRef;
use crate::memtable::bulk::part_reader::EncodedBulkPartIter;
use crate::memtable::time_series::{ValueBuilder, Values};
use crate::memtable::{BoxedRecordBatchIterator, MemScanMetrics, MemtableStats};
use crate::sst::SeriesEstimator;
use crate::sst::index::IndexOutput;
use crate::sst::parquet::file_range::{PreFilterMode, row_group_contains_delete};
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::format::{PrimaryKeyArray, PrimaryKeyArrayBuilder, ReadFormat};
use crate::sst::parquet::format::{PrimaryKeyArray, PrimaryKeyArrayBuilder};
use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo};
use crate::sst::{SeriesEstimator, to_sst_arrow_schema};
const INIT_DICT_VALUE_CAPACITY: usize = 8;
@@ -527,8 +516,6 @@ impl PrimaryKeyColumnBuilder {
/// Converter that converts structs into [BulkPart].
pub struct BulkPartConverter {
/// Region metadata.
region_metadata: RegionMetadataRef,
/// Schema of the converted batch.
schema: SchemaRef,
/// Primary key codec for encoding keys
@@ -577,7 +564,6 @@ impl BulkPartConverter {
};
Self {
region_metadata: region_metadata.clone(),
schema,
primary_key_codec,
key_buf: Vec::new(),
@@ -1116,7 +1102,6 @@ pub struct BulkPartEncodeMetrics {
pub struct BulkPartEncoder {
metadata: RegionMetadataRef,
row_group_size: usize,
writer_props: Option<WriterProperties>,
}
@@ -1141,7 +1126,6 @@ impl BulkPartEncoder {
Ok(Self {
metadata,
row_group_size,
writer_props,
})
}
@@ -1182,7 +1166,6 @@ impl BulkPartEncoder {
iter_start = Instant::now();
}
metrics.iter_cost += iter_start.elapsed();
iter_start = Instant::now();
if total_rows == 0 {
return Ok(None);
@@ -1348,11 +1331,6 @@ impl MultiBulkPart {
self.batches.len()
}
/// Returns an iterator over the record batches.
pub(crate) fn batches(&self) -> impl Iterator<Item = &RecordBatch> {
self.batches.iter()
}
/// Returns the estimated memory size of all batches.
pub(crate) fn estimated_size(&self) -> usize {
self.batches.iter().map(record_batch_estimated_size).sum()
@@ -1400,19 +1378,22 @@ impl MultiBulkPart {
mod tests {
use api::v1::{Row, SemanticType, WriteHint};
use datafusion_common::ScalarValue;
use datatypes::arrow::array::Float64Array;
use datatypes::arrow::array::{
BinaryArray, DictionaryArray, Float64Array, TimestampMillisecondArray,
};
use datatypes::arrow::datatypes::UInt32Type;
use datatypes::prelude::{ConcreteDataType, Value};
use datatypes::schema::ColumnSchema;
use mito_codec::row_converter::build_primary_key_codec;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use store_api::storage::RegionId;
use store_api::storage::consts::ReservedColumnId;
use table::predicate::Predicate;
use super::*;
use crate::memtable::bulk::context::BulkIterContext;
use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
use crate::test_util::memtable_util::{
build_key_values_with_ts_seq_values, metadata_for_test, region_metadata_to_row_schema,
};
use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
struct MutationInput<'a> {
k0: &'a str,
@@ -1422,13 +1403,6 @@ mod tests {
sequence: u64,
}
#[derive(Debug, PartialOrd, PartialEq)]
struct BatchOutput<'a> {
pk_values: &'a [Value],
timestamps: &'a [i64],
v1: &'a [Option<f64>],
}
fn encode(input: &[MutationInput]) -> EncodedBulkPart {
let metadata = metadata_for_test();
let kvs = input
@@ -1482,7 +1456,7 @@ mod tests {
]);
let projection = &[4u32];
let mut reader = part
let reader = part
.read(
Arc::new(
BulkIterContext::new(
@@ -1523,7 +1497,7 @@ mod tests {
let kvs = key_values
.into_iter()
.map(|(k0, k1, (start, end), sequence)| {
let ts = (start..end);
let ts = start..end;
let v1 = (start..end).map(|_| None);
build_key_values_with_ts_seq_values(&metadata, k0.to_string(), k1, ts, v1, sequence)
})
@@ -1553,7 +1527,7 @@ mod tests {
)
.unwrap(),
);
let mut reader = part
let reader = part
.read(context, None, None)
.unwrap()
.expect("expect at least one row group");
@@ -1626,7 +1600,7 @@ mod tests {
100,
);
/// Predicates over field column can do precise filtering.
// Predicates over field column can do precise filtering.
check_prune_row_group(
&part,
Some(Predicate::new(vec![

View File

@@ -17,6 +17,7 @@ use std::time::Instant;
use datatypes::arrow::array::BooleanArray;
use datatypes::arrow::record_batch::RecordBatch;
use mito_codec::row_converter::PrimaryKeyFilter;
use parquet::arrow::ProjectionMask;
use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
use snafu::ResultExt;
@@ -29,7 +30,8 @@ use crate::memtable::bulk::row_group_reader::MemtableRowGroupReaderBuilder;
use crate::memtable::{MemScanMetrics, MemScanMetricsData};
use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
use crate::sst::parquet::file_range::{PreFilterMode, TagDecodeState};
use crate::sst::parquet::flat_format::sequence_column_index;
use crate::sst::parquet::flat_format::{primary_key_column_index, sequence_column_index};
use crate::sst::parquet::prefilter::{CachedPrimaryKeyFilter, prefilter_flat_batch_by_primary_key};
/// Iterator for reading data inside a bulk part.
pub struct EncodedBulkPartIter {
@@ -41,6 +43,8 @@ pub struct EncodedBulkPartIter {
sequence: Option<SequenceRange>,
/// Cached skip_fields for current row group.
current_skip_fields: bool,
/// Primary key filter for prefiltering before convert_batch.
pk_filter: Option<CachedPrimaryKeyFilter>,
/// Metrics for this iterator.
metrics: MemScanMetricsData,
/// Optional memory scan metrics to report to.
@@ -69,6 +73,9 @@ impl EncodedBulkPartIter {
let builder =
MemtableRowGroupReaderBuilder::try_new(&context, projection_mask, parquet_meta, data)?;
// Build PK filter if applicable (flat format with dictionary-encoded PKs).
let pk_filter = context.build_pk_filter();
let (init_reader, current_skip_fields) = match row_groups_to_read.pop_front() {
Some(first_row_group) => {
let skip_fields = builder.compute_skip_fields(&context, first_row_group);
@@ -85,6 +92,7 @@ impl EncodedBulkPartIter {
builder,
sequence,
current_skip_fields,
pk_filter,
metrics: MemScanMetricsData {
total_series: series_count,
..Default::default()
@@ -116,6 +124,10 @@ impl EncodedBulkPartIter {
&self.sequence,
batch,
self.current_skip_fields,
self.pk_filter
.as_mut()
.map(|f| f as &mut dyn PrimaryKeyFilter),
&mut self.metrics,
)? {
// Update metrics
self.metrics.num_batches += 1;
@@ -142,6 +154,10 @@ impl EncodedBulkPartIter {
&self.sequence,
batch,
self.current_skip_fields,
self.pk_filter
.as_mut()
.map(|f| f as &mut dyn PrimaryKeyFilter),
&mut self.metrics,
)? {
// Update metrics
self.metrics.num_batches += 1;
@@ -175,12 +191,14 @@ impl Iterator for EncodedBulkPartIter {
impl Drop for EncodedBulkPartIter {
fn drop(&mut self) {
common_telemetry::debug!(
"EncodedBulkPartIter region: {}, metrics: total_series={}, num_rows={}, num_batches={}, scan_cost={:?}",
"EncodedBulkPartIter region: {}, metrics: total_series={}, num_rows={}, num_batches={}, scan_cost={:?}, prefilter_cost={:?}, prefilter_rows_filtered={}",
self.context.region_id(),
self.metrics.total_series,
self.metrics.num_rows,
self.metrics.num_batches,
self.metrics.scan_cost
self.metrics.scan_cost,
self.metrics.prefilter_cost,
self.metrics.prefilter_rows_filtered
);
// Report MemScanMetrics if not already reported
@@ -205,6 +223,8 @@ pub struct BulkPartBatchIter {
context: BulkIterContextRef,
/// Sequence number filter.
sequence: Option<SequenceRange>,
/// Primary key filter for prefiltering before convert_batch.
pk_filter: Option<CachedPrimaryKeyFilter>,
/// Metrics for this iterator.
metrics: MemScanMetricsData,
/// Optional memory scan metrics to report to.
@@ -222,10 +242,13 @@ impl BulkPartBatchIter {
) -> Self {
assert!(context.read_format().as_flat().is_some());
let pk_filter = context.build_pk_filter();
Self {
batches: VecDeque::from(batches),
context,
sequence,
pk_filter,
metrics: MemScanMetricsData {
total_series: series_count,
..Default::default()
@@ -282,8 +305,16 @@ impl BulkPartBatchIter {
PreFilterMode::SkipFieldsOnDelete => true,
};
let Some(filtered_batch) =
apply_combined_filters(&self.context, &self.sequence, projected_batch, skip_fields)?
let Some(filtered_batch) = apply_combined_filters(
&self.context,
&self.sequence,
projected_batch,
skip_fields,
self.pk_filter
.as_mut()
.map(|f| f as &mut dyn PrimaryKeyFilter),
&mut self.metrics,
)?
else {
self.metrics.scan_cost += start.elapsed();
return Ok(None);
@@ -323,12 +354,14 @@ impl Iterator for BulkPartBatchIter {
impl Drop for BulkPartBatchIter {
fn drop(&mut self) {
common_telemetry::debug!(
"BulkPartBatchIter region: {}, metrics: total_series={}, num_rows={}, num_batches={}, scan_cost={:?}",
"BulkPartBatchIter region: {}, metrics: total_series={}, num_rows={}, num_batches={}, scan_cost={:?}, prefilter_cost={:?}, prefilter_rows_filtered={}",
self.context.region_id(),
self.metrics.total_series,
self.metrics.num_rows,
self.metrics.num_batches,
self.metrics.scan_cost
self.metrics.scan_cost,
self.metrics.prefilter_cost,
self.metrics.prefilter_rows_filtered
);
// Report MemScanMetrics if not already reported
@@ -353,8 +386,32 @@ fn apply_combined_filters(
sequence: &Option<SequenceRange>,
record_batch: RecordBatch,
skip_fields: bool,
pk_filter: Option<&mut dyn PrimaryKeyFilter>,
metrics: &mut MemScanMetricsData,
) -> error::Result<Option<RecordBatch>> {
// Converts the format to the flat format first.
// Apply PK prefilter on raw batch before convert_batch to reduce conversion overhead.
let has_pk_prefilter = pk_filter.is_some();
let record_batch = if let Some(pk_filter) = pk_filter {
let rows_before = record_batch.num_rows();
let prefilter_start = Instant::now();
let pk_col_idx = primary_key_column_index(record_batch.num_columns());
match prefilter_flat_batch_by_primary_key(record_batch, pk_col_idx, pk_filter)? {
Some(batch) => {
metrics.prefilter_cost += prefilter_start.elapsed();
metrics.prefilter_rows_filtered += rows_before - batch.num_rows();
batch
}
None => {
metrics.prefilter_cost += prefilter_start.elapsed();
metrics.prefilter_rows_filtered += rows_before;
return Ok(None);
}
}
} else {
record_batch
};
// Converts the format to the flat format.
let format = context.read_format().as_flat().unwrap();
let record_batch = format.convert_batch(record_batch, None)?;
@@ -362,12 +419,12 @@ fn apply_combined_filters(
let mut combined_filter = None;
let mut tag_decode_state = TagDecodeState::new();
// First, apply predicate filters using the shared method.
// Apply predicate filters using the shared method.
if !context.base.filters.is_empty() {
let predicate_mask = context.base.compute_filter_mask_flat(
&record_batch,
skip_fields,
false,
has_pk_prefilter,
&mut tag_decode_state,
)?;
// If predicate filters out the entire batch, return None early
@@ -433,6 +490,7 @@ mod tests {
use super::*;
use crate::memtable::bulk::context::BulkIterContext;
use crate::test_util::sst_util::new_primary_key;
#[test]
fn test_bulk_part_batch_iter() {
@@ -461,9 +519,16 @@ mod tests {
vec![1000, 2000, 3000],
));
// Create primary key dictionary array
// Create primary key dictionary array with properly encoded PKs
use datatypes::arrow::array::{BinaryArray, DictionaryArray, UInt32Array};
let values = Arc::new(BinaryArray::from_iter_values([b"key1", b"key2", b"key3"]));
let pk1 = new_primary_key(&["key1"]);
let pk2 = new_primary_key(&["key2"]);
let pk3 = new_primary_key(&["key3"]);
let values = Arc::new(BinaryArray::from_iter_values([
pk1.as_slice(),
pk2.as_slice(),
pk3.as_slice(),
]));
let keys = UInt32Array::from(vec![0, 1, 2]);
let primary_key = Arc::new(DictionaryArray::new(keys, values));
@@ -596,12 +661,17 @@ mod tests {
]));
// Create first batch with 2 rows
let pk1 = new_primary_key(&["key1"]);
let pk2 = new_primary_key(&["key2"]);
let key1_1 = Arc::new(StringArray::from_iter_values(["key1", "key2"]));
let field1_1 = Arc::new(Int64Array::from(vec![11, 12]));
let timestamp_1 = Arc::new(datatypes::arrow::array::TimestampMillisecondArray::from(
vec![1000, 2000],
));
let values_1 = Arc::new(BinaryArray::from_iter_values([b"key1", b"key2"]));
let values_1 = Arc::new(BinaryArray::from_iter_values([
pk1.as_slice(),
pk2.as_slice(),
]));
let keys_1 = UInt32Array::from(vec![0, 1]);
let primary_key_1 = Arc::new(DictionaryArray::new(keys_1, values_1));
let sequence_1 = Arc::new(UInt64Array::from(vec![1, 2]));
@@ -621,12 +691,19 @@ mod tests {
.unwrap();
// Create second batch with 3 rows
let pk3 = new_primary_key(&["key3"]);
let pk4 = new_primary_key(&["key4"]);
let pk5 = new_primary_key(&["key5"]);
let key1_2 = Arc::new(StringArray::from_iter_values(["key3", "key4", "key5"]));
let field1_2 = Arc::new(Int64Array::from(vec![13, 14, 15]));
let timestamp_2 = Arc::new(datatypes::arrow::array::TimestampMillisecondArray::from(
vec![3000, 4000, 5000],
));
let values_2 = Arc::new(BinaryArray::from_iter_values([b"key3", b"key4", b"key5"]));
let values_2 = Arc::new(BinaryArray::from_iter_values([
pk3.as_slice(),
pk4.as_slice(),
pk5.as_slice(),
]));
let keys_2 = UInt32Array::from(vec![0, 1, 2]);
let primary_key_2 = Arc::new(DictionaryArray::new(keys_2, values_2));
let sequence_2 = Arc::new(UInt64Array::from(vec![3, 4, 5]));

View File

@@ -490,6 +490,7 @@ impl TreeIter {
num_rows: self.metrics.rows_fetched,
num_batches: self.metrics.batches_fetched,
scan_cost: self.metrics.iter_elapsed,
..Default::default()
};
mem_scan_metrics.merge_inner(&inner);
}

View File

@@ -372,6 +372,7 @@ impl IterBuilder for BatchRangeBuilder {
num_rows: batch.num_rows(),
num_batches: 1,
scan_cost: self.scan_cost,
..Default::default()
};
metrics.merge_inner(&inner);
}

View File

@@ -567,6 +567,7 @@ impl Iter {
num_rows: self.metrics.num_rows,
num_batches: self.metrics.num_batches,
scan_cost: self.metrics.scan_cost,
..Default::default()
};
mem_scan_metrics.merge_inner(&inner);
}

View File

@@ -138,6 +138,10 @@ pub(crate) struct ScanMetricsSet {
mem_batches: usize,
/// Number of series read from memtables.
mem_series: usize,
/// Duration of prefilter in memtable scan.
mem_prefilter_cost: Duration,
/// Number of rows filtered by prefilter in memtable scan.
mem_prefilter_rows_filtered: usize,
// SST related metrics:
/// Duration to build file ranges.
@@ -341,6 +345,8 @@ impl fmt::Debug for ScanMetricsSet {
mem_rows,
mem_batches,
mem_series,
mem_prefilter_cost,
mem_prefilter_rows_filtered,
inverted_index_apply_metrics,
bloom_filter_apply_metrics,
fulltext_index_apply_metrics,
@@ -509,6 +515,15 @@ impl fmt::Debug for ScanMetricsSet {
if !mem_scan_cost.is_zero() {
write!(f, ", \"mem_scan_cost\":\"{mem_scan_cost:?}\"")?;
}
if !mem_prefilter_cost.is_zero() {
write!(f, ", \"mem_prefilter_cost\":\"{mem_prefilter_cost:?}\"")?;
}
if *mem_prefilter_rows_filtered > 0 {
write!(
f,
", \"mem_prefilter_rows_filtered\":{mem_prefilter_rows_filtered}"
)?;
}
// Write optional verbose metrics if they are not empty
if let Some(metrics) = inverted_index_apply_metrics
@@ -1061,6 +1076,8 @@ impl PartitionMetrics {
metrics.mem_rows += data.num_rows;
metrics.mem_batches += data.num_batches;
metrics.mem_series += data.total_series;
metrics.mem_prefilter_cost += data.prefilter_cost;
metrics.mem_prefilter_rows_filtered += data.prefilter_rows_filtered;
}
/// Merges [ScannerMetrics], `build_reader_cost`, `scan_cost` and `yield_cost`.

View File

@@ -23,7 +23,7 @@ use std::sync::Arc;
use api::v1::SemanticType;
use common_recordbatch::filter::SimpleFilterEvaluator;
use datatypes::arrow::array::BinaryArray;
use datatypes::arrow::array::{BinaryArray, BooleanArray, BooleanBufferBuilder};
use datatypes::arrow::record_batch::RecordBatch;
use futures::StreamExt;
use mito_codec::row_converter::{PrimaryKeyCodec, PrimaryKeyFilter};
@@ -33,7 +33,7 @@ use parquet::schema::types::SchemaDescriptor;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use crate::error::{DecodeSnafu, ReadParquetSnafu, Result, UnexpectedSnafu};
use crate::error::{ComputeArrowSnafu, DecodeSnafu, ReadParquetSnafu, Result, UnexpectedSnafu};
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat};
use crate::sst::parquet::reader::{RowGroupBuildContext, RowGroupReaderBuilder};
@@ -93,6 +93,55 @@ pub(crate) fn matching_row_ranges_by_primary_key(
Ok(matched_row_ranges)
}
/// Filters a flat-format record batch by primary key, returning only rows whose
/// primary key matches the filter. Returns `None` if all rows are filtered out.
pub(crate) fn prefilter_flat_batch_by_primary_key(
input: RecordBatch,
pk_column_index: usize,
pk_filter: &mut dyn PrimaryKeyFilter,
) -> Result<Option<RecordBatch>> {
if input.num_rows() == 0 {
return Ok(Some(input));
}
let matched_row_ranges =
matching_row_ranges_by_primary_key(&input, pk_column_index, pk_filter)?;
if matched_row_ranges.is_empty() {
return Ok(None);
}
if matched_row_ranges.len() == 1
&& matched_row_ranges[0].start == 0
&& matched_row_ranges[0].end == input.num_rows()
{
return Ok(Some(input));
}
if matched_row_ranges.len() == 1 {
let span = &matched_row_ranges[0];
return Ok(Some(input.slice(span.start, span.end - span.start)));
}
let mut builder = BooleanBufferBuilder::new(input.num_rows());
builder.append_n(input.num_rows(), false);
for span in matched_row_ranges {
for i in span {
builder.set_bit(i, true);
}
}
let filtered = datatypes::arrow::compute::filter_record_batch(
&input,
&BooleanArray::new(builder.finish(), None),
)
.context(ComputeArrowSnafu)?;
if filtered.num_rows() == 0 {
Ok(None)
} else {
Ok(Some(filtered))
}
}
/// Returns whether a filter can be applied by parquet primary-key prefiltering.
///
/// Unlike `PartitionTreeMemtable`, parquet prefilter always supports predicates
@@ -346,12 +395,19 @@ mod tests {
use common_recordbatch::filter::SimpleFilterEvaluator;
use datafusion_expr::{col, lit};
use mito_codec::row_converter::PrimaryKeyFilter;
use datatypes::arrow::array::{
ArrayRef, DictionaryArray, TimestampMillisecondArray, UInt8Array, UInt32Array, UInt64Array,
};
use datatypes::arrow::datatypes::{Schema, UInt32Type};
use mito_codec::row_converter::{PrimaryKeyFilter, build_primary_key_codec};
use store_api::codec::PrimaryKeyEncoding;
use super::*;
use crate::sst::internal_fields;
use crate::sst::parquet::format::ReadFormat;
use crate::test_util::sst_util::{new_primary_key, sst_region_metadata_with_encoding};
use crate::test_util::sst_util::{
new_primary_key, sst_region_metadata, sst_region_metadata_with_encoding,
};
#[test]
fn test_is_usable_primary_key_filter_skips_legacy_primary_key_batches() {
@@ -416,4 +472,125 @@ mod tests {
assert_eq!(hits.load(Ordering::Relaxed), 2);
}
fn new_test_filters(exprs: &[datafusion_expr::Expr]) -> Vec<SimpleFilterEvaluator> {
exprs
.iter()
.filter_map(SimpleFilterEvaluator::try_new)
.collect()
}
fn new_raw_batch(primary_keys: &[&[u8]], field_values: &[u64]) -> RecordBatch {
assert_eq!(primary_keys.len(), field_values.len());
let metadata = Arc::new(sst_region_metadata());
let arrow_schema = metadata.schema.arrow_schema();
let field_column = arrow_schema
.field(arrow_schema.index_of("field_0").unwrap())
.clone();
let time_index_column = arrow_schema
.field(arrow_schema.index_of("ts").unwrap())
.clone();
let mut fields = vec![field_column, time_index_column];
fields.extend(
internal_fields()
.into_iter()
.map(|field| field.as_ref().clone()),
);
let schema = Arc::new(Schema::new(fields));
let mut dict_values = Vec::new();
let mut keys = Vec::with_capacity(primary_keys.len());
for pk in primary_keys {
let key = dict_values
.iter()
.position(|existing: &&[u8]| existing == pk)
.unwrap_or_else(|| {
dict_values.push(*pk);
dict_values.len() - 1
});
keys.push(key as u32);
}
let pk_array: ArrayRef = Arc::new(DictionaryArray::<UInt32Type>::new(
UInt32Array::from(keys),
Arc::new(BinaryArray::from_iter_values(dict_values.iter().copied())),
));
RecordBatch::try_new(
schema,
vec![
Arc::new(UInt64Array::from(field_values.to_vec())),
Arc::new(TimestampMillisecondArray::from_iter_values(
0..primary_keys.len() as i64,
)),
pk_array,
Arc::new(UInt64Array::from(vec![1; primary_keys.len()])),
Arc::new(UInt8Array::from(vec![1; primary_keys.len()])),
],
)
.unwrap()
}
fn field_values(batch: &RecordBatch) -> Vec<u64> {
batch
.column(0)
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap()
.values()
.to_vec()
}
#[test]
fn test_prefilter_primary_key_drops_single_dictionary_batch() {
let metadata = Arc::new(sst_region_metadata());
let filters = Arc::new(new_test_filters(&[col("tag_0").eq(lit("b"))]));
let mut primary_key_filter = build_primary_key_codec(metadata.as_ref())
.primary_key_filter(&metadata, filters, false);
let pk_a = new_primary_key(&["a", "x"]);
let batch = new_raw_batch(&[pk_a.as_slice(), pk_a.as_slice()], &[10, 11]);
let pk_col_idx = primary_key_column_index(batch.num_columns());
let filtered =
prefilter_flat_batch_by_primary_key(batch, pk_col_idx, primary_key_filter.as_mut())
.unwrap();
assert!(filtered.is_none());
}
#[test]
fn test_prefilter_primary_key_builds_mask_for_fragmented_matches() {
let metadata = Arc::new(sst_region_metadata());
let filters = Arc::new(new_test_filters(&[col("tag_0")
.eq(lit("a"))
.or(col("tag_0").eq(lit("c")))]));
let mut primary_key_filter = build_primary_key_codec(metadata.as_ref())
.primary_key_filter(&metadata, filters, false);
let pk_a = new_primary_key(&["a", "x"]);
let pk_b = new_primary_key(&["b", "x"]);
let pk_c = new_primary_key(&["c", "x"]);
let pk_d = new_primary_key(&["d", "x"]);
let batch = new_raw_batch(
&[
pk_a.as_slice(),
pk_a.as_slice(),
pk_b.as_slice(),
pk_b.as_slice(),
pk_c.as_slice(),
pk_c.as_slice(),
pk_d.as_slice(),
pk_d.as_slice(),
],
&[10, 11, 12, 13, 14, 15, 16, 17],
);
let pk_col_idx = primary_key_column_index(batch.num_columns());
let filtered =
prefilter_flat_batch_by_primary_key(batch, pk_col_idx, primary_key_filter.as_mut())
.unwrap()
.unwrap();
assert_eq!(filtered.num_rows(), 4);
assert_eq!(field_values(&filtered), vec![10, 11, 14, 15]);
}
}