feat: support pre filter mode in bulk memtable

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
evenyag
2025-10-28 19:56:37 +08:00
parent 9c800af75d
commit d1372092ae
6 changed files with 125 additions and 16 deletions

View File

@@ -43,6 +43,22 @@ impl BulkIterContext {
projection: Option<&[ColumnId]>,
predicate: Option<Predicate>,
skip_auto_convert: bool,
) -> Result<Self> {
Self::new_with_pre_filter_mode(
region_metadata,
projection,
predicate,
skip_auto_convert,
PreFilterMode::All,
)
}
pub fn new_with_pre_filter_mode(
region_metadata: RegionMetadataRef,
projection: Option<&[ColumnId]>,
predicate: Option<Predicate>,
skip_auto_convert: bool,
pre_filter_mode: PreFilterMode,
) -> Result<Self> {
let codec = build_primary_key_codec(&region_metadata);
@@ -73,18 +89,23 @@ impl BulkIterContext {
codec,
// We don't need to compat batches since all batches in the memtable have the same schema.
compat_batch: None,
pre_filter_mode: PreFilterMode::All,
pre_filter_mode,
},
predicate,
})
}
/// Prunes row groups by stats.
pub(crate) fn row_groups_to_read(&self, file_meta: &Arc<ParquetMetaData>) -> VecDeque<usize> {
pub(crate) fn row_groups_to_read(
&self,
file_meta: &Arc<ParquetMetaData>,
skip_fields: bool,
) -> VecDeque<usize> {
let region_meta = self.base.read_format.metadata();
let row_groups = file_meta.row_groups();
// expected_metadata is set to None since we always expect the region metadata of a memtable to be up-to-date.
let stats = RowGroupPruningStats::new(row_groups, &self.base.read_format, None, false);
let stats =
RowGroupPruningStats::new(row_groups, &self.base.read_format, None, skip_fields);
if let Some(predicate) = self.predicate.as_ref() {
predicate
.prune_with_stats(&stats, region_meta.schema.arrow_schema())
@@ -105,4 +126,9 @@ impl BulkIterContext {
pub(crate) fn read_format(&self) -> &ReadFormat {
&self.base.read_format
}
/// Gets the [`PreFilterMode`] stored in the base context.
pub(crate) fn pre_filter_mode(&self) -> PreFilterMode {
    let base = &self.base;
    base.pre_filter_mode
}
}

View File

@@ -65,6 +65,7 @@ use crate::memtable::bulk::context::BulkIterContextRef;
use crate::memtable::bulk::part_reader::EncodedBulkPartIter;
use crate::memtable::time_series::{ValueBuilder, Values};
use crate::sst::index::IndexOutput;
use crate::sst::parquet::file_range::{PreFilterMode, row_group_contains_delete};
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::format::{PrimaryKeyArray, PrimaryKeyArrayBuilder, ReadFormat};
use crate::sst::parquet::helper::parse_parquet_metadata;
@@ -572,8 +573,13 @@ impl EncodedBulkPart {
context: BulkIterContextRef,
sequence: Option<SequenceRange>,
) -> Result<Option<BoxedRecordBatchIterator>> {
// Compute skip_fields for row group pruning using the same approach as compute_skip_fields in reader.rs.
let skip_fields_for_pruning =
Self::compute_skip_fields(context.pre_filter_mode(), &self.metadata.parquet_metadata);
// use predicate to find row groups to read.
let row_groups_to_read = context.row_groups_to_read(&self.metadata.parquet_metadata);
let row_groups_to_read =
context.row_groups_to_read(&self.metadata.parquet_metadata, skip_fields_for_pruning);
if row_groups_to_read.is_empty() {
// All row groups are filtered.
@@ -589,6 +595,20 @@ impl EncodedBulkPart {
)?;
Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
}
/// Decides whether field columns are skipped, given the [`PreFilterMode`].
///
/// - `All`: never skips fields.
/// - `SkipFields`: always skips fields.
/// - `SkipFieldsOnDelete`: skips fields as soon as one row group is reported to
///   contain a delete op. A row group whose check yields no answer is counted
///   as containing one.
fn compute_skip_fields(pre_filter_mode: PreFilterMode, parquet_meta: &ParquetMetaData) -> bool {
    // Settle the two modes that do not depend on the parquet metadata first.
    match pre_filter_mode {
        PreFilterMode::All => return false,
        PreFilterMode::SkipFields => return true,
        PreFilterMode::SkipFieldsOnDelete => {}
    }

    // Only `SkipFieldsOnDelete` reaches this point: scan the row groups and stop
    // at the first one that contains (or may contain) a delete op.
    let total_row_groups = parquet_meta.num_row_groups();
    for idx in 0..total_row_groups {
        let has_delete = row_group_contains_delete(parquet_meta, idx, "memtable").unwrap_or(true);
        if has_delete {
            return true;
        }
    }
    false
}
}
#[derive(Debug, Clone)]

View File

@@ -27,6 +27,7 @@ use store_api::storage::SequenceRange;
use crate::error::{self, ComputeArrowSnafu, DecodeArrowRowGroupSnafu};
use crate::memtable::bulk::context::{BulkIterContext, BulkIterContextRef};
use crate::memtable::bulk::row_group_reader::MemtableRowGroupReaderBuilder;
use crate::sst::parquet::file_range::PreFilterMode;
use crate::sst::parquet::flat_format::sequence_column_index;
use crate::sst::parquet::reader::RowGroupReaderContext;
@@ -38,6 +39,8 @@ pub struct EncodedBulkPartIter {
builder: MemtableRowGroupReaderBuilder,
/// Sequence number filter.
sequence: Option<SequenceRange>,
/// Cached skip_fields for current row group.
current_skip_fields: bool,
}
impl EncodedBulkPartIter {
@@ -58,16 +61,22 @@ impl EncodedBulkPartIter {
let builder =
MemtableRowGroupReaderBuilder::try_new(&context, projection_mask, parquet_meta, data)?;
let init_reader = row_groups_to_read
.pop_front()
.map(|first_row_group| builder.build_row_group_reader(first_row_group, None))
.transpose()?;
let (init_reader, current_skip_fields) = match row_groups_to_read.pop_front() {
Some(first_row_group) => {
let skip_fields = builder.compute_skip_fields(&context, first_row_group);
let reader = builder.build_row_group_reader(first_row_group, None)?;
(Some(reader), skip_fields)
}
None => (None, false),
};
Ok(Self {
context,
row_groups_to_read,
current_reader: init_reader,
builder,
sequence,
current_skip_fields,
})
}
@@ -80,19 +89,34 @@ impl EncodedBulkPartIter {
for batch in current {
let batch = batch.context(DecodeArrowRowGroupSnafu)?;
if let Some(batch) = apply_combined_filters(&self.context, &self.sequence, batch)? {
if let Some(batch) = apply_combined_filters(
&self.context,
&self.sequence,
batch,
self.current_skip_fields,
)? {
return Ok(Some(batch));
}
}
// Previous row group exhausted, read next row group
while let Some(next_row_group) = self.row_groups_to_read.pop_front() {
// Compute skip_fields for this row group
self.current_skip_fields = self
.builder
.compute_skip_fields(&self.context, next_row_group);
let next_reader = self.builder.build_row_group_reader(next_row_group, None)?;
let current = self.current_reader.insert(next_reader);
for batch in current {
let batch = batch.context(DecodeArrowRowGroupSnafu)?;
if let Some(batch) = apply_combined_filters(&self.context, &self.sequence, batch)? {
if let Some(batch) = apply_combined_filters(
&self.context,
&self.sequence,
batch,
self.current_skip_fields,
)? {
return Ok(Some(batch));
}
}
@@ -152,8 +176,14 @@ impl BulkPartRecordBatchIter {
// Apply projection first.
let projected_batch = self.apply_projection(record_batch)?;
// Apply combined filtering (both predicate and sequence filters)
// For BulkPartRecordBatchIter, we don't have row group information.
let skip_fields = match self.context.pre_filter_mode() {
PreFilterMode::All => false,
PreFilterMode::SkipFields => true,
PreFilterMode::SkipFieldsOnDelete => true,
};
let Some(filtered_batch) =
apply_combined_filters(&self.context, &self.sequence, projected_batch)?
apply_combined_filters(&self.context, &self.sequence, projected_batch, skip_fields)?
else {
return Ok(None);
};
@@ -181,6 +211,7 @@ fn apply_combined_filters(
context: &BulkIterContext,
sequence: &Option<SequenceRange>,
record_batch: RecordBatch,
skip_fields: bool,
) -> error::Result<Option<RecordBatch>> {
// Converts the format to the flat format first.
let format = context.read_format().as_flat().unwrap();
@@ -191,10 +222,9 @@ fn apply_combined_filters(
// First, apply predicate filters using the shared method.
if !context.base.filters.is_empty() {
// BulkIterContext always uses PreFilterMode::All, so skip_fields should be false
let predicate_mask = context
.base
.compute_filter_mask_flat(&record_batch, false)?;
.compute_filter_mask_flat(&record_batch, skip_fields)?;
// If predicate filters out the entire batch, return None early
let Some(mask) = predicate_mask else {
return Ok(None);

View File

@@ -169,4 +169,23 @@ impl MemtableRowGroupReaderBuilder {
)
.context(ReadDataPartSnafu)
}
/// Decides whether field filters are skipped for the row group at `row_group_idx`.
///
/// The decision follows the context's [`PreFilterMode`]: `All` never skips,
/// `SkipFields` always skips, and `SkipFieldsOnDelete` skips when this row group
/// is reported to contain a delete op (falling back to `true` when the check
/// yields no answer).
pub(crate) fn compute_skip_fields(
    &self,
    context: &BulkIterContextRef,
    row_group_idx: usize,
) -> bool {
    use crate::sst::parquet::file_range::{PreFilterMode, row_group_contains_delete};

    let mode = context.pre_filter_mode();
    match mode {
        PreFilterMode::SkipFieldsOnDelete => {
            // Look only at the requested row group, not the whole part.
            let contains_delete =
                row_group_contains_delete(&self.parquet_metadata, row_group_idx, "memtable");
            contains_delete.unwrap_or(true)
        }
        PreFilterMode::SkipFields => true,
        PreFilterMode::All => false,
    }
}
}

View File

@@ -68,6 +68,7 @@ use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
use crate::sst::parquet::file_range::PreFilterMode;
use crate::sst::parquet::reader::ReaderMetrics;
/// Parallel scan channel size for flat format.
@@ -949,6 +950,17 @@ impl ScanInput {
}
}
/// Chooses the [`PreFilterMode`] from `append_mode` and `merge_mode`.
///
/// Append mode always yields `All`. Otherwise `LastRow` maps to
/// `SkipFieldsOnDelete` and `LastNonNull` maps to `SkipFields`.
fn pre_filter_mode(&self) -> PreFilterMode {
    if self.append_mode {
        PreFilterMode::All
    } else {
        match self.merge_mode {
            MergeMode::LastNonNull => PreFilterMode::SkipFields,
            MergeMode::LastRow => PreFilterMode::SkipFieldsOnDelete,
        }
    }
}
/// Prunes a file to scan and returns the builder to build readers.
pub async fn prune_file(
&self,
@@ -956,6 +968,7 @@ impl ScanInput {
reader_metrics: &mut ReaderMetrics,
) -> Result<FileRangeBuilder> {
let predicate = self.predicate_for_file(file);
let filter_mode = self.pre_filter_mode();
let res = self
.access_layer
.read_sst(file.clone())
@@ -968,6 +981,7 @@ impl ScanInput {
.expected_metadata(Some(self.mapper.metadata().clone()))
.flat_format(self.flat_format)
.compaction(self.compaction)
.pre_filter_mode(filter_mode)
.build_reader_input(reader_metrics)
.await;
let (mut file_range_ctx, selection) = match res {

View File

@@ -282,7 +282,7 @@ impl FileRangeContext {
PreFilterMode::SkipFields => true,
PreFilterMode::SkipFieldsOnDelete => {
// Check if this specific row group contains delete op
self.contains_delete(row_group_idx).unwrap_or(false)
self.contains_delete(row_group_idx).unwrap_or(true)
}
}
}
@@ -299,8 +299,8 @@ impl FileRangeContext {
pub(crate) enum PreFilterMode {
/// Filters all columns.
All,
/// If doesn't contain delete op, filters all columns.
/// Otherwise, skips fields.
/// If the range doesn't contain delete op, filters all columns.
/// Otherwise (it contains delete op, or that cannot be determined), skips filtering fields.
SkipFieldsOnDelete,
/// Always skip fields.
SkipFields,