mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-22 07:50:38 +00:00
@@ -368,7 +368,6 @@ fn apply_combined_filters(
|
||||
let predicate_mask = context.base.compute_filter_mask_flat(
|
||||
&record_batch,
|
||||
skip_fields,
|
||||
None,
|
||||
&mut tag_decode_state,
|
||||
)?;
|
||||
// If predicate filters out the entire batch, return None early
|
||||
|
||||
@@ -12,7 +12,6 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashSet;
|
||||
use std::ops::BitAnd;
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -24,7 +23,6 @@ use datatypes::arrow::compute::concat_batches;
|
||||
use datatypes::arrow::record_batch::RecordBatch;
|
||||
use mito_codec::row_converter::PrimaryKeyFilter;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use store_api::storage::ColumnId;
|
||||
|
||||
use crate::error::{ComputeArrowSnafu, RecordBatchSnafu, Result, UnexpectedSnafu};
|
||||
use crate::memtable::BoxedBatchIterator;
|
||||
@@ -335,7 +333,6 @@ pub struct FlatPruneReader {
|
||||
context: FileRangeContextRef,
|
||||
source: FlatSource,
|
||||
primary_key_filter: Option<CachedPrimaryKeyFilter>,
|
||||
covered_primary_key_filter_columns: Option<HashSet<ColumnId>>,
|
||||
buffered_prefiltered_batch: Option<RecordBatch>,
|
||||
metrics: ReaderMetrics,
|
||||
/// Whether to skip field filters for this row group.
|
||||
@@ -352,7 +349,6 @@ impl FlatPruneReader {
|
||||
primary_key_filter: ctx
|
||||
.new_primary_key_filter()
|
||||
.map(CachedPrimaryKeyFilter::new),
|
||||
covered_primary_key_filter_columns: ctx.covered_primary_key_filter_columns(),
|
||||
buffered_prefiltered_batch: None,
|
||||
context: ctx,
|
||||
source: FlatSource::RowGroup(reader),
|
||||
@@ -368,7 +364,6 @@ impl FlatPruneReader {
|
||||
) -> Self {
|
||||
Self {
|
||||
primary_key_filter: None,
|
||||
covered_primary_key_filter_columns: None,
|
||||
buffered_prefiltered_batch: None,
|
||||
context: ctx,
|
||||
source: FlatSource::LastRow(reader),
|
||||
@@ -472,11 +467,9 @@ impl FlatPruneReader {
|
||||
}
|
||||
|
||||
let num_rows_before_filter = record_batch.num_rows();
|
||||
let Some(filtered_batch) = self.context.precise_filter_flat(
|
||||
record_batch,
|
||||
self.skip_fields,
|
||||
self.covered_primary_key_filter_columns.as_ref(),
|
||||
)?
|
||||
let Some(filtered_batch) = self
|
||||
.context
|
||||
.precise_filter_flat(record_batch, self.skip_fields)?
|
||||
else {
|
||||
// the entire batch is filtered out
|
||||
self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_filter;
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
//! Structs and functions for reading ranges from a parquet file. A file range
|
||||
//! is usually a row group in a parquet file.
|
||||
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::collections::HashMap;
|
||||
use std::ops::{BitAnd, Range};
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -480,12 +480,6 @@ impl FileRangeContext {
|
||||
self.base.new_primary_key_filter()
|
||||
}
|
||||
|
||||
/// Returns tag columns whose simple filters are already guaranteed by the
|
||||
/// encoded primary-key prefilter.
|
||||
pub(crate) fn covered_primary_key_filter_columns(&self) -> Option<HashSet<ColumnId>> {
|
||||
self.base.covered_primary_key_filter_columns()
|
||||
}
|
||||
|
||||
/// Returns true if a partition filter is configured.
|
||||
pub(crate) fn has_partition_filter(&self) -> bool {
|
||||
self.base.partition_filter.is_some()
|
||||
@@ -529,10 +523,8 @@ impl FileRangeContext {
|
||||
&self,
|
||||
input: RecordBatch,
|
||||
skip_fields: bool,
|
||||
skip_tag_filter_columns: Option<&HashSet<ColumnId>>,
|
||||
) -> Result<Option<RecordBatch>> {
|
||||
self.base
|
||||
.precise_filter_flat(input, skip_fields, skip_tag_filter_columns)
|
||||
self.base.precise_filter_flat(input, skip_fields)
|
||||
}
|
||||
|
||||
/// Applies an encoded primary-key prefilter to the input `RecordBatch`.
|
||||
@@ -707,32 +699,6 @@ impl RangeBase {
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) fn covered_primary_key_filter_columns(&self) -> Option<HashSet<ColumnId>> {
|
||||
if self.read_format.metadata().primary_key.is_empty()
|
||||
|| !self
|
||||
.read_format
|
||||
.as_flat()
|
||||
.is_some_and(|format| format.raw_batch_has_primary_key_dictionary())
|
||||
{
|
||||
return None;
|
||||
}
|
||||
|
||||
let sst_metadata = self.read_format.metadata();
|
||||
let expected_metadata = self.expected_metadata.as_deref();
|
||||
let filters = self.usable_primary_key_filters()?;
|
||||
let column_ids = filters
|
||||
.iter()
|
||||
.filter_map(|filter| {
|
||||
expected_metadata
|
||||
.and_then(|metadata| metadata.column_by_name(filter.column_name()))
|
||||
.or_else(|| sst_metadata.column_by_name(filter.column_name()))
|
||||
.map(|column| column.column_id)
|
||||
})
|
||||
.collect::<HashSet<_>>();
|
||||
|
||||
(!column_ids.is_empty()).then_some(column_ids)
|
||||
}
|
||||
|
||||
/// Applies an encoded primary-key prefilter before flat-row materialization.
|
||||
///
|
||||
/// This only prunes rows that are guaranteed to fail simple primary-key predicates.
|
||||
@@ -958,25 +924,15 @@ impl RangeBase {
|
||||
&self,
|
||||
input: RecordBatch,
|
||||
skip_fields: bool,
|
||||
skip_tag_filter_columns: Option<&HashSet<ColumnId>>,
|
||||
) -> Result<Option<RecordBatch>> {
|
||||
let mut tag_decode_state = TagDecodeState::new();
|
||||
let mask = self.compute_filter_mask_flat(
|
||||
&input,
|
||||
skip_fields,
|
||||
skip_tag_filter_columns,
|
||||
&mut tag_decode_state,
|
||||
)?;
|
||||
let mask = self.compute_filter_mask_flat(&input, skip_fields, &mut tag_decode_state)?;
|
||||
|
||||
// If mask is None, the entire batch is filtered out
|
||||
let Some(mut mask) = mask else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
if self.partition_filter.is_none() && mask.count_set_bits() == input.num_rows() {
|
||||
return Ok(Some(input));
|
||||
}
|
||||
|
||||
// Apply partition filter
|
||||
if let Some(partition_filter) = &self.partition_filter {
|
||||
let record_batch = self.project_record_batch_for_pruning_flat(
|
||||
@@ -1015,7 +971,6 @@ impl RangeBase {
|
||||
&self,
|
||||
input: &RecordBatch,
|
||||
skip_fields: bool,
|
||||
skip_tag_filter_columns: Option<&HashSet<ColumnId>>,
|
||||
tag_decode_state: &mut TagDecodeState,
|
||||
) -> Result<Option<BooleanBuffer>> {
|
||||
let mut mask = BooleanBuffer::new_set(input.num_rows());
|
||||
@@ -1043,13 +998,6 @@ impl RangeBase {
|
||||
continue;
|
||||
}
|
||||
|
||||
if skip_tag_filter_columns.is_some_and(|columns| {
|
||||
filter_ctx.semantic_type() == SemanticType::Tag
|
||||
&& columns.contains(&filter_ctx.column_id())
|
||||
}) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Get the column directly by its projected index.
|
||||
// If the column is missing and it's not a tag/time column, this filter is skipped.
|
||||
// Assumes the projection indices align with the input batch schema.
|
||||
@@ -1336,8 +1284,7 @@ mod tests {
|
||||
use crate::sst::{internal_fields, location};
|
||||
use crate::test_util::sst_util::{
|
||||
new_flat_source_from_record_batches, new_primary_key, new_record_batch_by_range,
|
||||
new_sparse_primary_key, sst_file_handle, sst_region_metadata,
|
||||
sst_region_metadata_with_encoding,
|
||||
sst_file_handle, sst_region_metadata, sst_region_metadata_with_encoding,
|
||||
};
|
||||
|
||||
const FILE_DIR: &str = "/";
|
||||
@@ -1597,33 +1544,6 @@ mod tests {
|
||||
assert!(base.new_primary_key_filter().is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_prefilter_primary_key_ignores_reused_expected_tag_name() {
|
||||
let metadata = Arc::new(sst_region_metadata());
|
||||
let expected_metadata = expected_metadata_with_reused_tag_name(&metadata);
|
||||
let pk_ax = new_primary_key(&["a", "x"]);
|
||||
let pk_by = new_primary_key(&["b", "y"]);
|
||||
let batch = new_raw_batch_with_metadata(
|
||||
metadata.clone(),
|
||||
&[pk_ax.as_slice(), pk_by.as_slice()],
|
||||
&[10, 11],
|
||||
);
|
||||
let base = new_test_range_base_with_expected_metadata(
|
||||
metadata,
|
||||
expected_metadata,
|
||||
&[col("tag_0").eq(lit("b")), col("tag_1").eq(lit("x"))],
|
||||
);
|
||||
let mut primary_key_filter = base.new_primary_key_filter().unwrap();
|
||||
|
||||
let filtered = base
|
||||
.prefilter_flat_batch_by_primary_key(batch, primary_key_filter.as_mut())
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(filtered.num_rows(), 1);
|
||||
assert_eq!(field_values(&filtered), vec![10]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_prefilter_primary_key_drops_single_dictionary_batch() {
|
||||
let pk_a = new_primary_key(&["a", "x"]);
|
||||
@@ -1638,37 +1558,6 @@ mod tests {
|
||||
assert!(filtered.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_prefilter_primary_key_returns_slice_for_contiguous_matches() {
|
||||
let pk_a = new_primary_key(&["a", "x"]);
|
||||
let pk_b = new_primary_key(&["b", "x"]);
|
||||
let pk_c = new_primary_key(&["c", "x"]);
|
||||
let pk_d = new_primary_key(&["d", "x"]);
|
||||
let batch = new_raw_batch(
|
||||
&[
|
||||
pk_a.as_slice(),
|
||||
pk_a.as_slice(),
|
||||
pk_b.as_slice(),
|
||||
pk_b.as_slice(),
|
||||
pk_c.as_slice(),
|
||||
pk_c.as_slice(),
|
||||
pk_d.as_slice(),
|
||||
pk_d.as_slice(),
|
||||
],
|
||||
&[10, 11, 12, 13, 14, 15, 16, 17],
|
||||
);
|
||||
let base = new_test_range_base(&[col("tag_0").eq(lit("b")).or(col("tag_0").eq(lit("c")))]);
|
||||
let mut primary_key_filter = base.new_primary_key_filter().unwrap();
|
||||
|
||||
let filtered = base
|
||||
.prefilter_flat_batch_by_primary_key(batch, primary_key_filter.as_mut())
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(filtered.num_rows(), 4);
|
||||
assert_eq!(field_values(&filtered), vec![12, 13, 14, 15]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_prefilter_primary_key_builds_mask_for_fragmented_matches() {
|
||||
let pk_a = new_primary_key(&["a", "x"]);
|
||||
@@ -1700,64 +1589,6 @@ mod tests {
|
||||
assert_eq!(field_values(&filtered), vec![10, 11, 14, 15]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_matching_row_ranges_by_primary_key_merges_adjacent_spans() {
|
||||
let pk_a = new_primary_key(&["a", "x"]);
|
||||
let pk_b = new_primary_key(&["b", "x"]);
|
||||
let pk_c = new_primary_key(&["c", "x"]);
|
||||
let pk_d = new_primary_key(&["d", "x"]);
|
||||
let batch = new_raw_batch(
|
||||
&[
|
||||
pk_a.as_slice(),
|
||||
pk_a.as_slice(),
|
||||
pk_b.as_slice(),
|
||||
pk_b.as_slice(),
|
||||
pk_c.as_slice(),
|
||||
pk_c.as_slice(),
|
||||
pk_d.as_slice(),
|
||||
pk_d.as_slice(),
|
||||
],
|
||||
&[10, 11, 12, 13, 14, 15, 16, 17],
|
||||
);
|
||||
let base = new_test_range_base(&[col("tag_0").eq(lit("a")).or(col("tag_0").eq(lit("c")))]);
|
||||
let mut primary_key_filter = base.new_primary_key_filter().unwrap();
|
||||
|
||||
let matched = base
|
||||
.matching_row_ranges_by_primary_key(&batch, primary_key_filter.as_mut())
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(matched, vec![0..2, 4..6]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_prefilter_primary_key_sparse_path() {
|
||||
let metadata = Arc::new(sst_region_metadata_with_encoding(
|
||||
PrimaryKeyEncoding::Sparse,
|
||||
));
|
||||
let pk_a = new_sparse_primary_key(&["a", "x"], &metadata, 1, 11);
|
||||
let pk_b = new_sparse_primary_key(&["b", "x"], &metadata, 1, 22);
|
||||
let batch = new_raw_batch_with_metadata(
|
||||
metadata.clone(),
|
||||
&[
|
||||
pk_a.as_slice(),
|
||||
pk_a.as_slice(),
|
||||
pk_b.as_slice(),
|
||||
pk_b.as_slice(),
|
||||
],
|
||||
&[10, 11, 12, 13],
|
||||
);
|
||||
let base = new_test_range_base_with_metadata(metadata, None, &[col("tag_0").eq(lit("b"))]);
|
||||
let mut primary_key_filter = base.new_primary_key_filter().unwrap();
|
||||
|
||||
let filtered = base
|
||||
.prefilter_flat_batch_by_primary_key(batch, primary_key_filter.as_mut())
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(filtered.num_rows(), 2);
|
||||
assert_eq!(field_values(&filtered), vec![12, 13]);
|
||||
}
|
||||
|
||||
async fn fetch_metrics_for_predicate(predicate: Option<Predicate>) -> ParquetFetchMetricsData {
|
||||
let object_store = object_store::ObjectStore::new(Memory::default())
|
||||
.unwrap()
|
||||
|
||||
Reference in New Issue
Block a user