mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-31 20:30:37 +00:00
@@ -368,6 +368,7 @@ fn apply_combined_filters(
|
||||
let predicate_mask = context.base.compute_filter_mask_flat(
|
||||
&record_batch,
|
||||
skip_fields,
|
||||
None,
|
||||
&mut tag_decode_state,
|
||||
)?;
|
||||
// If predicate filters out the entire batch, return None early
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashSet;
|
||||
use std::ops::BitAnd;
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -19,16 +20,20 @@ use common_recordbatch::filter::SimpleFilterEvaluator;
|
||||
use common_time::Timestamp;
|
||||
use datatypes::arrow::array::BooleanArray;
|
||||
use datatypes::arrow::buffer::BooleanBuffer;
|
||||
use datatypes::arrow::compute::concat_batches;
|
||||
use datatypes::arrow::record_batch::RecordBatch;
|
||||
use mito_codec::row_converter::PrimaryKeyFilter;
|
||||
use snafu::ResultExt;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use store_api::storage::ColumnId;
|
||||
|
||||
use crate::error::{RecordBatchSnafu, Result};
|
||||
use crate::error::{ComputeArrowSnafu, RecordBatchSnafu, Result, UnexpectedSnafu};
|
||||
use crate::memtable::BoxedBatchIterator;
|
||||
use crate::read::last_row::{FlatRowGroupLastRowCachedReader, RowGroupLastRowCachedReader};
|
||||
use crate::read::{Batch, BatchReader};
|
||||
use crate::sst::file::FileTimeRange;
|
||||
use crate::sst::parquet::file_range::FileRangeContextRef;
|
||||
use crate::sst::parquet::flat_format::primary_key_column_index;
|
||||
use crate::sst::parquet::format::PrimaryKeyArray;
|
||||
use crate::sst::parquet::reader::{FlatRowGroupReader, ReaderMetrics, RowGroupReader};
|
||||
|
||||
pub enum Source {
|
||||
@@ -263,12 +268,75 @@ impl FlatSource {
|
||||
}
|
||||
}
|
||||
|
||||
struct CachedPrimaryKeyFilter {
|
||||
inner: Box<dyn PrimaryKeyFilter>,
|
||||
last_primary_key: Vec<u8>,
|
||||
last_match: Option<bool>,
|
||||
}
|
||||
|
||||
impl CachedPrimaryKeyFilter {
|
||||
fn new(inner: Box<dyn PrimaryKeyFilter>) -> Self {
|
||||
Self {
|
||||
inner,
|
||||
last_primary_key: Vec::new(),
|
||||
last_match: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PrimaryKeyFilter for CachedPrimaryKeyFilter {
|
||||
fn matches(&mut self, pk: &[u8]) -> bool {
|
||||
if let Some(last_match) = self.last_match
|
||||
&& self.last_primary_key == pk
|
||||
{
|
||||
return last_match;
|
||||
}
|
||||
|
||||
let matched = self.inner.matches(pk);
|
||||
self.last_primary_key.clear();
|
||||
self.last_primary_key.extend_from_slice(pk);
|
||||
self.last_match = Some(matched);
|
||||
matched
|
||||
}
|
||||
}
|
||||
|
||||
fn batch_single_primary_key(batch: &RecordBatch) -> Result<Option<&[u8]>> {
|
||||
let primary_key_index = primary_key_column_index(batch.num_columns());
|
||||
let pk_dict_array = batch
|
||||
.column(primary_key_index)
|
||||
.as_any()
|
||||
.downcast_ref::<PrimaryKeyArray>()
|
||||
.context(UnexpectedSnafu {
|
||||
reason: "Primary key column is not a dictionary array".to_string(),
|
||||
})?;
|
||||
let pk_values = pk_dict_array
|
||||
.values()
|
||||
.as_any()
|
||||
.downcast_ref::<datatypes::arrow::array::BinaryArray>()
|
||||
.context(UnexpectedSnafu {
|
||||
reason: "Primary key values are not binary array".to_string(),
|
||||
})?;
|
||||
let keys = pk_dict_array.keys();
|
||||
if keys.is_empty() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let first_key = keys.value(0);
|
||||
if first_key != keys.value(keys.len() - 1) {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
Ok(Some(pk_values.value(first_key as usize)))
|
||||
}
|
||||
|
||||
/// A flat format reader that returns RecordBatch instead of Batch.
|
||||
pub struct FlatPruneReader {
|
||||
/// Context for file ranges.
|
||||
context: FileRangeContextRef,
|
||||
source: FlatSource,
|
||||
primary_key_filter: Option<Box<dyn PrimaryKeyFilter>>,
|
||||
primary_key_filter: Option<CachedPrimaryKeyFilter>,
|
||||
covered_primary_key_filter_columns: Option<HashSet<ColumnId>>,
|
||||
buffered_prefiltered_batch: Option<RecordBatch>,
|
||||
metrics: ReaderMetrics,
|
||||
/// Whether to skip field filters for this row group.
|
||||
skip_fields: bool,
|
||||
@@ -281,7 +349,11 @@ impl FlatPruneReader {
|
||||
skip_fields: bool,
|
||||
) -> Self {
|
||||
Self {
|
||||
primary_key_filter: ctx.new_primary_key_filter(),
|
||||
primary_key_filter: ctx
|
||||
.new_primary_key_filter()
|
||||
.map(CachedPrimaryKeyFilter::new),
|
||||
covered_primary_key_filter_columns: ctx.covered_primary_key_filter_columns(),
|
||||
buffered_prefiltered_batch: None,
|
||||
context: ctx,
|
||||
source: FlatSource::RowGroup(reader),
|
||||
metrics: Default::default(),
|
||||
@@ -296,6 +368,8 @@ impl FlatPruneReader {
|
||||
) -> Self {
|
||||
Self {
|
||||
primary_key_filter: None,
|
||||
covered_primary_key_filter_columns: None,
|
||||
buffered_prefiltered_batch: None,
|
||||
context: ctx,
|
||||
source: FlatSource::LastRow(reader),
|
||||
metrics: Default::default(),
|
||||
@@ -309,25 +383,17 @@ impl FlatPruneReader {
|
||||
}
|
||||
|
||||
pub(crate) fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
|
||||
while let Some(raw_batch) = {
|
||||
let start = std::time::Instant::now();
|
||||
let batch = self.source.next_raw_batch()?;
|
||||
self.metrics.scan_cost += start.elapsed();
|
||||
batch
|
||||
} {
|
||||
self.metrics.num_rows += raw_batch.num_rows();
|
||||
self.metrics.num_batches += 1;
|
||||
|
||||
let num_rows_before_prefilter = raw_batch.num_rows();
|
||||
let Some(prefiltered_batch) = self.prefilter_primary_keys(raw_batch)? else {
|
||||
self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_prefilter;
|
||||
continue;
|
||||
loop {
|
||||
let Some(mut raw_batch) = self.next_prefiltered_batch()? else {
|
||||
return Ok(None);
|
||||
};
|
||||
let prefiltered_rows = num_rows_before_prefilter - prefiltered_batch.num_rows();
|
||||
self.metrics.filter_metrics.rows_precise_filtered += prefiltered_rows;
|
||||
|
||||
let record_batch = self.source.convert_batch(prefiltered_batch)?;
|
||||
let scan_start = std::time::Instant::now();
|
||||
self.coalesce_prefiltered_batches(&mut raw_batch)?;
|
||||
let record_batch = self.source.convert_batch(raw_batch)?;
|
||||
self.metrics.scan_cost += scan_start.elapsed();
|
||||
|
||||
self.metrics.num_batches += 1;
|
||||
match self.prune_flat(record_batch)? {
|
||||
Some(filtered_batch) => {
|
||||
return Ok(Some(filtered_batch));
|
||||
@@ -337,8 +403,56 @@ impl FlatPruneReader {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
fn next_prefiltered_batch(&mut self) -> Result<Option<RecordBatch>> {
|
||||
if let Some(batch) = self.buffered_prefiltered_batch.take() {
|
||||
return Ok(Some(batch));
|
||||
}
|
||||
|
||||
loop {
|
||||
let start = std::time::Instant::now();
|
||||
let Some(raw_batch) = self.source.next_raw_batch()? else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
self.metrics.num_rows += raw_batch.num_rows();
|
||||
|
||||
let num_rows_before_prefilter = raw_batch.num_rows();
|
||||
let Some(prefiltered_batch) = self.prefilter_primary_keys(raw_batch)? else {
|
||||
self.metrics.scan_cost += start.elapsed();
|
||||
self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_prefilter;
|
||||
continue;
|
||||
};
|
||||
let prefiltered_rows = num_rows_before_prefilter - prefiltered_batch.num_rows();
|
||||
self.metrics.filter_metrics.rows_precise_filtered += prefiltered_rows;
|
||||
self.metrics.scan_cost += start.elapsed();
|
||||
return Ok(Some(prefiltered_batch));
|
||||
}
|
||||
}
|
||||
|
||||
fn coalesce_prefiltered_batches(&mut self, batch: &mut RecordBatch) -> Result<()> {
|
||||
let Some(primary_key) = batch_single_primary_key(batch)? else {
|
||||
return Ok(());
|
||||
};
|
||||
let primary_key = primary_key.to_vec();
|
||||
let schema = batch.schema();
|
||||
let mut batches = vec![batch.clone()];
|
||||
|
||||
while let Some(next_batch) = self.next_prefiltered_batch()? {
|
||||
if batch_single_primary_key(&next_batch)? == Some(primary_key.as_slice()) {
|
||||
batches.push(next_batch);
|
||||
} else {
|
||||
self.buffered_prefiltered_batch = Some(next_batch);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if batches.len() > 1 {
|
||||
*batch = concat_batches(&schema, &batches).context(ComputeArrowSnafu)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn prefilter_primary_keys(&mut self, record_batch: RecordBatch) -> Result<Option<RecordBatch>> {
|
||||
@@ -347,7 +461,7 @@ impl FlatPruneReader {
|
||||
};
|
||||
|
||||
self.context
|
||||
.prefilter_flat_batch_by_primary_key(record_batch, primary_key_filter.as_mut())
|
||||
.prefilter_flat_batch_by_primary_key(record_batch, primary_key_filter)
|
||||
}
|
||||
|
||||
/// Prunes batches by the pushed down predicate and returns RecordBatch.
|
||||
@@ -358,9 +472,11 @@ impl FlatPruneReader {
|
||||
}
|
||||
|
||||
let num_rows_before_filter = record_batch.num_rows();
|
||||
let Some(filtered_batch) = self
|
||||
.context
|
||||
.precise_filter_flat(record_batch, self.skip_fields)?
|
||||
let Some(filtered_batch) = self.context.precise_filter_flat(
|
||||
record_batch,
|
||||
self.skip_fields,
|
||||
self.covered_primary_key_filter_columns.as_ref(),
|
||||
)?
|
||||
else {
|
||||
// the entire batch is filtered out
|
||||
self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_filter;
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
//! Structs and functions for reading ranges from a parquet file. A file range
|
||||
//! is usually a row group in a parquet file.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::ops::{BitAnd, Range};
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -441,6 +441,12 @@ impl FileRangeContext {
|
||||
self.base.new_primary_key_filter()
|
||||
}
|
||||
|
||||
/// Returns tag columns whose simple filters are already guaranteed by the
|
||||
/// encoded primary-key prefilter.
|
||||
pub(crate) fn covered_primary_key_filter_columns(&self) -> Option<HashSet<ColumnId>> {
|
||||
self.base.covered_primary_key_filter_columns()
|
||||
}
|
||||
|
||||
/// Returns true if a partition filter is configured.
|
||||
pub(crate) fn has_partition_filter(&self) -> bool {
|
||||
self.base.partition_filter.is_some()
|
||||
@@ -484,8 +490,10 @@ impl FileRangeContext {
|
||||
&self,
|
||||
input: RecordBatch,
|
||||
skip_fields: bool,
|
||||
skip_tag_filter_columns: Option<&HashSet<ColumnId>>,
|
||||
) -> Result<Option<RecordBatch>> {
|
||||
self.base.precise_filter_flat(input, skip_fields)
|
||||
self.base
|
||||
.precise_filter_flat(input, skip_fields, skip_tag_filter_columns)
|
||||
}
|
||||
|
||||
/// Applies an encoded primary-key prefilter to the input `RecordBatch`.
|
||||
@@ -660,6 +668,32 @@ impl RangeBase {
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) fn covered_primary_key_filter_columns(&self) -> Option<HashSet<ColumnId>> {
|
||||
if self.read_format.metadata().primary_key.is_empty()
|
||||
|| !self
|
||||
.read_format
|
||||
.as_flat()
|
||||
.is_some_and(|format| format.raw_batch_has_primary_key_dictionary())
|
||||
{
|
||||
return None;
|
||||
}
|
||||
|
||||
let sst_metadata = self.read_format.metadata();
|
||||
let expected_metadata = self.expected_metadata.as_deref();
|
||||
let filters = self.usable_primary_key_filters()?;
|
||||
let column_ids = filters
|
||||
.iter()
|
||||
.filter_map(|filter| {
|
||||
expected_metadata
|
||||
.and_then(|metadata| metadata.column_by_name(filter.column_name()))
|
||||
.or_else(|| sst_metadata.column_by_name(filter.column_name()))
|
||||
.map(|column| column.column_id)
|
||||
})
|
||||
.collect::<HashSet<_>>();
|
||||
|
||||
(!column_ids.is_empty()).then_some(column_ids)
|
||||
}
|
||||
|
||||
/// Applies an encoded primary-key prefilter before flat-row materialization.
|
||||
///
|
||||
/// This only prunes rows that are guaranteed to fail simple primary-key predicates.
|
||||
@@ -885,15 +919,25 @@ impl RangeBase {
|
||||
&self,
|
||||
input: RecordBatch,
|
||||
skip_fields: bool,
|
||||
skip_tag_filter_columns: Option<&HashSet<ColumnId>>,
|
||||
) -> Result<Option<RecordBatch>> {
|
||||
let mut tag_decode_state = TagDecodeState::new();
|
||||
let mask = self.compute_filter_mask_flat(&input, skip_fields, &mut tag_decode_state)?;
|
||||
let mask = self.compute_filter_mask_flat(
|
||||
&input,
|
||||
skip_fields,
|
||||
skip_tag_filter_columns,
|
||||
&mut tag_decode_state,
|
||||
)?;
|
||||
|
||||
// If mask is None, the entire batch is filtered out
|
||||
let Some(mut mask) = mask else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
if self.partition_filter.is_none() && mask.count_set_bits() == input.num_rows() {
|
||||
return Ok(Some(input));
|
||||
}
|
||||
|
||||
// Apply partition filter
|
||||
if let Some(partition_filter) = &self.partition_filter {
|
||||
let record_batch = self.project_record_batch_for_pruning_flat(
|
||||
@@ -932,6 +976,7 @@ impl RangeBase {
|
||||
&self,
|
||||
input: &RecordBatch,
|
||||
skip_fields: bool,
|
||||
skip_tag_filter_columns: Option<&HashSet<ColumnId>>,
|
||||
tag_decode_state: &mut TagDecodeState,
|
||||
) -> Result<Option<BooleanBuffer>> {
|
||||
let mut mask = BooleanBuffer::new_set(input.num_rows());
|
||||
@@ -959,6 +1004,13 @@ impl RangeBase {
|
||||
continue;
|
||||
}
|
||||
|
||||
if skip_tag_filter_columns.is_some_and(|columns| {
|
||||
filter_ctx.semantic_type() == SemanticType::Tag
|
||||
&& columns.contains(&filter_ctx.column_id())
|
||||
}) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Get the column directly by its projected index.
|
||||
// If the column is missing and it's not a tag/time column, this filter is skipped.
|
||||
// Assumes the projection indices align with the input batch schema.
|
||||
|
||||
Reference in New Issue
Block a user