mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-26 18:00:41 +00:00
remove last row related
This commit is contained in:
@@ -17,8 +17,7 @@
|
||||
use std::collections::VecDeque;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_recordbatch::filter::SimpleFilterEvaluator;
|
||||
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyFilter, build_primary_key_codec};
|
||||
use mito_codec::row_converter::{DensePrimaryKeyCodec, build_primary_key_codec};
|
||||
use parquet::file::metadata::ParquetMetaData;
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::storage::ColumnId;
|
||||
@@ -87,21 +86,11 @@ impl BulkIterContext {
|
||||
.as_ref()
|
||||
.map(|pred| pred.dyn_filters().as_ref().clone())
|
||||
.unwrap_or_default();
|
||||
let primary_key_filters = predicate
|
||||
.as_ref()
|
||||
.map(|pred| {
|
||||
pred.exprs()
|
||||
.iter()
|
||||
.filter_map(SimpleFilterEvaluator::try_new)
|
||||
.collect::<Vec<_>>()
|
||||
})
|
||||
.filter(|filters| !filters.is_empty())
|
||||
.map(Arc::new);
|
||||
|
||||
Ok(Self {
|
||||
base: RangeBase {
|
||||
filters: simple_filters,
|
||||
primary_key_filters,
|
||||
primary_key_filters: None,
|
||||
dyn_filters,
|
||||
read_format,
|
||||
prune_schema: region_metadata.schema.clone(),
|
||||
@@ -149,10 +138,6 @@ impl BulkIterContext {
|
||||
&self.base.read_format
|
||||
}
|
||||
|
||||
pub(crate) fn new_primary_key_filter(&self) -> Option<Box<dyn PrimaryKeyFilter>> {
|
||||
self.base.new_primary_key_filter()
|
||||
}
|
||||
|
||||
/// Returns the pre-filter mode.
|
||||
pub(crate) fn pre_filter_mode(&self) -> PreFilterMode {
|
||||
self.base.pre_filter_mode
|
||||
|
||||
@@ -17,7 +17,6 @@ use std::time::Instant;
|
||||
|
||||
use datatypes::arrow::array::BooleanArray;
|
||||
use datatypes::arrow::record_batch::RecordBatch;
|
||||
use mito_codec::row_converter::PrimaryKeyFilter;
|
||||
use parquet::arrow::ProjectionMask;
|
||||
use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
|
||||
use snafu::ResultExt;
|
||||
@@ -47,7 +46,6 @@ pub struct EncodedBulkPartIter {
|
||||
metrics: MemScanMetricsData,
|
||||
/// Optional memory scan metrics to report to.
|
||||
mem_scan_metrics: Option<MemScanMetrics>,
|
||||
primary_key_filter: Option<Box<dyn PrimaryKeyFilter>>,
|
||||
}
|
||||
|
||||
impl EncodedBulkPartIter {
|
||||
@@ -60,7 +58,6 @@ impl EncodedBulkPartIter {
|
||||
mem_scan_metrics: Option<MemScanMetrics>,
|
||||
) -> error::Result<Self> {
|
||||
assert!(context.read_format().as_flat().is_some());
|
||||
let primary_key_filter = context.base.new_primary_key_filter();
|
||||
|
||||
let parquet_meta = encoded_part.metadata().parquet_metadata.clone();
|
||||
let data = encoded_part.data().clone();
|
||||
@@ -94,7 +91,6 @@ impl EncodedBulkPartIter {
|
||||
..Default::default()
|
||||
},
|
||||
mem_scan_metrics,
|
||||
primary_key_filter,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -119,7 +115,6 @@ impl EncodedBulkPartIter {
|
||||
if let Some(batch) = apply_combined_filters(
|
||||
&self.context,
|
||||
&self.sequence,
|
||||
&mut self.primary_key_filter,
|
||||
batch,
|
||||
self.current_skip_fields,
|
||||
)? {
|
||||
@@ -146,7 +141,6 @@ impl EncodedBulkPartIter {
|
||||
if let Some(batch) = apply_combined_filters(
|
||||
&self.context,
|
||||
&self.sequence,
|
||||
&mut self.primary_key_filter,
|
||||
batch,
|
||||
self.current_skip_fields,
|
||||
)? {
|
||||
@@ -216,7 +210,6 @@ pub struct BulkPartBatchIter {
|
||||
metrics: MemScanMetricsData,
|
||||
/// Optional memory scan metrics to report to.
|
||||
mem_scan_metrics: Option<MemScanMetrics>,
|
||||
primary_key_filter: Option<Box<dyn PrimaryKeyFilter>>,
|
||||
}
|
||||
|
||||
impl BulkPartBatchIter {
|
||||
@@ -229,7 +222,6 @@ impl BulkPartBatchIter {
|
||||
mem_scan_metrics: Option<MemScanMetrics>,
|
||||
) -> Self {
|
||||
assert!(context.read_format().as_flat().is_some());
|
||||
let primary_key_filter = context.base.new_primary_key_filter();
|
||||
|
||||
Self {
|
||||
batches: VecDeque::from(batches),
|
||||
@@ -240,7 +232,6 @@ impl BulkPartBatchIter {
|
||||
..Default::default()
|
||||
},
|
||||
mem_scan_metrics,
|
||||
primary_key_filter,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -292,13 +283,8 @@ impl BulkPartBatchIter {
|
||||
PreFilterMode::SkipFieldsOnDelete => true,
|
||||
};
|
||||
|
||||
let Some(filtered_batch) = apply_combined_filters(
|
||||
&self.context,
|
||||
&self.sequence,
|
||||
&mut self.primary_key_filter,
|
||||
projected_batch,
|
||||
skip_fields,
|
||||
)?
|
||||
let Some(filtered_batch) =
|
||||
apply_combined_filters(&self.context, &self.sequence, projected_batch, skip_fields)?
|
||||
else {
|
||||
self.metrics.scan_cost += start.elapsed();
|
||||
return Ok(None);
|
||||
@@ -366,23 +352,9 @@ impl Drop for BulkPartBatchIter {
|
||||
fn apply_combined_filters(
|
||||
context: &BulkIterContext,
|
||||
sequence: &Option<SequenceRange>,
|
||||
primary_key_filter: &mut Option<Box<dyn PrimaryKeyFilter>>,
|
||||
record_batch: RecordBatch,
|
||||
skip_fields: bool,
|
||||
) -> error::Result<Option<RecordBatch>> {
|
||||
let covered_primary_key_filter_columns = primary_key_filter
|
||||
.as_ref()
|
||||
.and_then(|_| context.base.covered_primary_key_filter_columns());
|
||||
let record_batch = match primary_key_filter.as_mut() {
|
||||
Some(primary_key_filter) => context
|
||||
.base
|
||||
.prefilter_flat_batch_by_primary_key(record_batch, primary_key_filter.as_mut())?,
|
||||
None => Some(record_batch),
|
||||
};
|
||||
let Some(record_batch) = record_batch else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
// Converts the format to the flat format first.
|
||||
let format = context.read_format().as_flat().unwrap();
|
||||
let record_batch = format.convert_batch(record_batch, None)?;
|
||||
@@ -396,7 +368,6 @@ fn apply_combined_filters(
|
||||
let predicate_mask = context.base.compute_filter_mask_flat(
|
||||
&record_batch,
|
||||
skip_fields,
|
||||
covered_primary_key_filter_columns.as_ref(),
|
||||
&mut tag_decode_state,
|
||||
)?;
|
||||
// If predicate filters out the entire batch, return None early
|
||||
@@ -453,7 +424,7 @@ mod tests {
|
||||
ArrayRef, BinaryArray, DictionaryArray, Int64Array, StringArray, UInt8Array, UInt32Array,
|
||||
UInt64Array,
|
||||
};
|
||||
use datatypes::arrow::datatypes::{DataType, Field, Schema, UInt32Type};
|
||||
use datatypes::arrow::datatypes::{DataType, Field, Schema};
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use datatypes::schema::ColumnSchema;
|
||||
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
|
||||
@@ -462,15 +433,6 @@ mod tests {
|
||||
|
||||
use super::*;
|
||||
use crate::memtable::bulk::context::BulkIterContext;
|
||||
use crate::test_util::sst_util::new_primary_key;
|
||||
|
||||
fn encoded_primary_key_array(tags: &[&str]) -> Arc<DictionaryArray<UInt32Type>> {
|
||||
let values = Arc::new(BinaryArray::from_iter_values(
|
||||
tags.iter().map(|tag| new_primary_key(&[*tag])),
|
||||
));
|
||||
let keys = UInt32Array::from_iter_values(0..tags.len() as u32);
|
||||
Arc::new(DictionaryArray::new(keys, values))
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bulk_part_batch_iter() {
|
||||
@@ -500,7 +462,10 @@ mod tests {
|
||||
));
|
||||
|
||||
// Create primary key dictionary array
|
||||
let primary_key = encoded_primary_key_array(&["key1", "key2", "key3"]);
|
||||
use datatypes::arrow::array::{BinaryArray, DictionaryArray, UInt32Array};
|
||||
let values = Arc::new(BinaryArray::from_iter_values([b"key1", b"key2", b"key3"]));
|
||||
let keys = UInt32Array::from(vec![0, 1, 2]);
|
||||
let primary_key = Arc::new(DictionaryArray::new(keys, values));
|
||||
|
||||
let sequence = Arc::new(UInt64Array::from(vec![1, 2, 3]));
|
||||
let op_type = Arc::new(UInt8Array::from(vec![1, 1, 1])); // PUT operations
|
||||
@@ -636,7 +601,9 @@ mod tests {
|
||||
let timestamp_1 = Arc::new(datatypes::arrow::array::TimestampMillisecondArray::from(
|
||||
vec![1000, 2000],
|
||||
));
|
||||
let primary_key_1 = encoded_primary_key_array(&["key1", "key2"]);
|
||||
let values_1 = Arc::new(BinaryArray::from_iter_values([b"key1", b"key2"]));
|
||||
let keys_1 = UInt32Array::from(vec![0, 1]);
|
||||
let primary_key_1 = Arc::new(DictionaryArray::new(keys_1, values_1));
|
||||
let sequence_1 = Arc::new(UInt64Array::from(vec![1, 2]));
|
||||
let op_type_1 = Arc::new(UInt8Array::from(vec![1, 1]));
|
||||
|
||||
@@ -659,7 +626,9 @@ mod tests {
|
||||
let timestamp_2 = Arc::new(datatypes::arrow::array::TimestampMillisecondArray::from(
|
||||
vec![3000, 4000, 5000],
|
||||
));
|
||||
let primary_key_2 = encoded_primary_key_array(&["key3", "key4", "key5"]);
|
||||
let values_2 = Arc::new(BinaryArray::from_iter_values([b"key3", b"key4", b"key5"]));
|
||||
let keys_2 = UInt32Array::from(vec![0, 1, 2]);
|
||||
let primary_key_2 = Arc::new(DictionaryArray::new(keys_2, values_2));
|
||||
let sequence_2 = Arc::new(UInt64Array::from(vec![3, 4, 5]));
|
||||
let op_type_2 = Arc::new(UInt8Array::from(vec![1, 1, 1]));
|
||||
|
||||
|
||||
@@ -22,7 +22,6 @@ use datatypes::arrow::compute::concat_batches;
|
||||
use datatypes::arrow::record_batch::RecordBatch;
|
||||
use datatypes::vectors::UInt32Vector;
|
||||
use futures::{Stream, TryStreamExt};
|
||||
use mito_codec::row_converter::PrimaryKeyFilter;
|
||||
use snafu::ResultExt;
|
||||
use store_api::storage::{FileId, TimeSeriesRowSelector};
|
||||
|
||||
@@ -313,7 +312,6 @@ impl FlatRowGroupLastRowCachedReader {
|
||||
cache_strategy: CacheStrategy,
|
||||
projection: &[usize],
|
||||
reader: FlatRowGroupReader,
|
||||
primary_key_filter: Option<Box<dyn PrimaryKeyFilter>>,
|
||||
) -> Self {
|
||||
let key = SelectorResultKey {
|
||||
file_id,
|
||||
@@ -327,10 +325,10 @@ impl FlatRowGroupLastRowCachedReader {
|
||||
if is_flat && schema_matches {
|
||||
Self::new_hit(value)
|
||||
} else {
|
||||
Self::new_miss(key, projection, reader, cache_strategy, primary_key_filter)
|
||||
Self::new_miss(key, projection, reader, cache_strategy)
|
||||
}
|
||||
} else {
|
||||
Self::new_miss(key, projection, reader, cache_strategy, primary_key_filter)
|
||||
Self::new_miss(key, projection, reader, cache_strategy)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -352,7 +350,6 @@ impl FlatRowGroupLastRowCachedReader {
|
||||
projection: &[usize],
|
||||
reader: FlatRowGroupReader,
|
||||
cache_strategy: CacheStrategy,
|
||||
primary_key_filter: Option<Box<dyn PrimaryKeyFilter>>,
|
||||
) -> Self {
|
||||
selector_result_cache_miss();
|
||||
Self::Miss(FlatRowGroupLastRowReader::new(
|
||||
@@ -360,7 +357,6 @@ impl FlatRowGroupLastRowCachedReader {
|
||||
projection.to_vec(),
|
||||
reader,
|
||||
cache_strategy,
|
||||
primary_key_filter,
|
||||
))
|
||||
}
|
||||
}
|
||||
@@ -434,7 +430,6 @@ impl BatchBuffer {
|
||||
pub(crate) struct FlatRowGroupLastRowReader {
|
||||
key: SelectorResultKey,
|
||||
reader: FlatRowGroupReader,
|
||||
primary_key_filter: Option<Box<dyn PrimaryKeyFilter>>,
|
||||
selector: FlatLastTimestampSelector,
|
||||
yielded_batches: Vec<RecordBatch>,
|
||||
cache_strategy: CacheStrategy,
|
||||
@@ -449,12 +444,10 @@ impl FlatRowGroupLastRowReader {
|
||||
projection: Vec<usize>,
|
||||
reader: FlatRowGroupReader,
|
||||
cache_strategy: CacheStrategy,
|
||||
primary_key_filter: Option<Box<dyn PrimaryKeyFilter>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
key,
|
||||
reader,
|
||||
primary_key_filter,
|
||||
selector: FlatLastTimestampSelector::default(),
|
||||
yielded_batches: vec![],
|
||||
cache_strategy,
|
||||
@@ -478,11 +471,7 @@ impl FlatRowGroupLastRowReader {
|
||||
return self.flush_pending();
|
||||
}
|
||||
|
||||
while let Some(raw_batch) = self.reader.next_raw_batch()? {
|
||||
let Some(raw_batch) = self.prefilter_primary_keys(raw_batch)? else {
|
||||
continue;
|
||||
};
|
||||
let batch = self.reader.convert_batch(raw_batch)?;
|
||||
while let Some(batch) = self.reader.next_batch()? {
|
||||
self.selector.on_next(batch, &mut self.pending)?;
|
||||
if self.pending.is_full() {
|
||||
return self.flush_pending();
|
||||
@@ -503,26 +492,10 @@ impl FlatRowGroupLastRowReader {
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn prefilter_primary_keys(&mut self, batch: RecordBatch) -> Result<Option<RecordBatch>> {
|
||||
let Some(primary_key_filter) = self.primary_key_filter.as_mut() else {
|
||||
return Ok(Some(batch));
|
||||
};
|
||||
|
||||
self.reader
|
||||
.prefilter_raw_batch_by_primary_key(batch, primary_key_filter.as_mut())
|
||||
}
|
||||
|
||||
fn maybe_update_cache(&mut self) {
|
||||
if self.yielded_batches.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
// Filtered flat last-row scans only contain the subset of series that matched the
|
||||
// encoded primary-key prefilter, so they cannot be published under the shared
|
||||
// selector cache key.
|
||||
if self.primary_key_filter.is_some() {
|
||||
return;
|
||||
}
|
||||
let batches = std::mem::take(&mut self.yielded_batches);
|
||||
let value = Arc::new(SelectorResultValue::new_flat(
|
||||
batches,
|
||||
|
||||
@@ -12,7 +12,6 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashSet;
|
||||
use std::ops::BitAnd;
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -20,20 +19,16 @@ use common_recordbatch::filter::SimpleFilterEvaluator;
|
||||
use common_time::Timestamp;
|
||||
use datatypes::arrow::array::BooleanArray;
|
||||
use datatypes::arrow::buffer::BooleanBuffer;
|
||||
use datatypes::arrow::compute::concat_batches;
|
||||
use datatypes::arrow::record_batch::RecordBatch;
|
||||
use mito_codec::row_converter::PrimaryKeyFilter;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use store_api::storage::ColumnId;
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::error::{ComputeArrowSnafu, RecordBatchSnafu, Result, UnexpectedSnafu};
|
||||
use crate::error::{RecordBatchSnafu, Result};
|
||||
use crate::memtable::BoxedBatchIterator;
|
||||
use crate::read::last_row::{FlatRowGroupLastRowCachedReader, RowGroupLastRowCachedReader};
|
||||
use crate::read::{Batch, BatchReader};
|
||||
use crate::sst::file::FileTimeRange;
|
||||
use crate::sst::parquet::file_range::FileRangeContextRef;
|
||||
use crate::sst::parquet::flat_format::primary_key_column_index;
|
||||
use crate::sst::parquet::format::PrimaryKeyArray;
|
||||
use crate::sst::parquet::reader::{FlatRowGroupReader, ReaderMetrics, RowGroupReader};
|
||||
|
||||
pub enum Source {
|
||||
@@ -268,75 +263,12 @@ impl FlatSource {
|
||||
}
|
||||
}
|
||||
|
||||
struct CachedPrimaryKeyFilter {
|
||||
inner: Box<dyn PrimaryKeyFilter>,
|
||||
last_primary_key: Vec<u8>,
|
||||
last_match: Option<bool>,
|
||||
}
|
||||
|
||||
impl CachedPrimaryKeyFilter {
|
||||
fn new(inner: Box<dyn PrimaryKeyFilter>) -> Self {
|
||||
Self {
|
||||
inner,
|
||||
last_primary_key: Vec::new(),
|
||||
last_match: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PrimaryKeyFilter for CachedPrimaryKeyFilter {
|
||||
fn matches(&mut self, pk: &[u8]) -> bool {
|
||||
if let Some(last_match) = self.last_match
|
||||
&& self.last_primary_key == pk
|
||||
{
|
||||
return last_match;
|
||||
}
|
||||
|
||||
let matched = self.inner.matches(pk);
|
||||
self.last_primary_key.clear();
|
||||
self.last_primary_key.extend_from_slice(pk);
|
||||
self.last_match = Some(matched);
|
||||
matched
|
||||
}
|
||||
}
|
||||
|
||||
fn batch_single_primary_key(batch: &RecordBatch) -> Result<Option<&[u8]>> {
|
||||
let primary_key_index = primary_key_column_index(batch.num_columns());
|
||||
let pk_dict_array = batch
|
||||
.column(primary_key_index)
|
||||
.as_any()
|
||||
.downcast_ref::<PrimaryKeyArray>()
|
||||
.context(UnexpectedSnafu {
|
||||
reason: "Primary key column is not a dictionary array".to_string(),
|
||||
})?;
|
||||
let pk_values = pk_dict_array
|
||||
.values()
|
||||
.as_any()
|
||||
.downcast_ref::<datatypes::arrow::array::BinaryArray>()
|
||||
.context(UnexpectedSnafu {
|
||||
reason: "Primary key values are not binary array".to_string(),
|
||||
})?;
|
||||
let keys = pk_dict_array.keys();
|
||||
if keys.is_empty() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let first_key = keys.value(0);
|
||||
if first_key != keys.value(keys.len() - 1) {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
Ok(Some(pk_values.value(first_key as usize)))
|
||||
}
|
||||
|
||||
/// A flat format reader that returns RecordBatch instead of Batch.
|
||||
pub struct FlatPruneReader {
|
||||
/// Context for file ranges.
|
||||
context: FileRangeContextRef,
|
||||
source: FlatSource,
|
||||
primary_key_filter: Option<CachedPrimaryKeyFilter>,
|
||||
covered_primary_key_filter_columns: Option<HashSet<ColumnId>>,
|
||||
buffered_prefiltered_batch: Option<RecordBatch>,
|
||||
primary_key_filter: Option<Box<dyn PrimaryKeyFilter>>,
|
||||
metrics: ReaderMetrics,
|
||||
/// Whether to skip field filters for this row group.
|
||||
skip_fields: bool,
|
||||
@@ -349,11 +281,7 @@ impl FlatPruneReader {
|
||||
skip_fields: bool,
|
||||
) -> Self {
|
||||
Self {
|
||||
primary_key_filter: ctx
|
||||
.new_primary_key_filter()
|
||||
.map(CachedPrimaryKeyFilter::new),
|
||||
covered_primary_key_filter_columns: ctx.covered_primary_key_filter_columns(),
|
||||
buffered_prefiltered_batch: None,
|
||||
primary_key_filter: ctx.new_primary_key_filter(),
|
||||
context: ctx,
|
||||
source: FlatSource::RowGroup(reader),
|
||||
metrics: Default::default(),
|
||||
@@ -368,8 +296,6 @@ impl FlatPruneReader {
|
||||
) -> Self {
|
||||
Self {
|
||||
primary_key_filter: None,
|
||||
covered_primary_key_filter_columns: None,
|
||||
buffered_prefiltered_batch: None,
|
||||
context: ctx,
|
||||
source: FlatSource::LastRow(reader),
|
||||
metrics: Default::default(),
|
||||
@@ -383,18 +309,25 @@ impl FlatPruneReader {
|
||||
}
|
||||
|
||||
pub(crate) fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
|
||||
loop {
|
||||
let Some(mut raw_batch) = self.next_prefiltered_batch()? else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let scan_start = std::time::Instant::now();
|
||||
self.coalesce_prefiltered_batches(&mut raw_batch)?;
|
||||
let record_batch = self.source.convert_batch(raw_batch)?;
|
||||
self.metrics.scan_cost += scan_start.elapsed();
|
||||
|
||||
// `num_batches` counts decoded flat batches, not raw parquet batches.
|
||||
while let Some(raw_batch) = {
|
||||
let start = std::time::Instant::now();
|
||||
let batch = self.source.next_raw_batch()?;
|
||||
self.metrics.scan_cost += start.elapsed();
|
||||
batch
|
||||
} {
|
||||
self.metrics.num_rows += raw_batch.num_rows();
|
||||
self.metrics.num_batches += 1;
|
||||
|
||||
let num_rows_before_prefilter = raw_batch.num_rows();
|
||||
let Some(prefiltered_batch) = self.prefilter_primary_keys(raw_batch)? else {
|
||||
self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_prefilter;
|
||||
continue;
|
||||
};
|
||||
let prefiltered_rows = num_rows_before_prefilter - prefiltered_batch.num_rows();
|
||||
self.metrics.filter_metrics.rows_precise_filtered += prefiltered_rows;
|
||||
|
||||
let record_batch = self.source.convert_batch(prefiltered_batch)?;
|
||||
|
||||
match self.prune_flat(record_batch)? {
|
||||
Some(filtered_batch) => {
|
||||
return Ok(Some(filtered_batch));
|
||||
@@ -404,56 +337,8 @@ impl FlatPruneReader {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn next_prefiltered_batch(&mut self) -> Result<Option<RecordBatch>> {
|
||||
if let Some(batch) = self.buffered_prefiltered_batch.take() {
|
||||
return Ok(Some(batch));
|
||||
}
|
||||
|
||||
loop {
|
||||
let start = std::time::Instant::now();
|
||||
let Some(raw_batch) = self.source.next_raw_batch()? else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
self.metrics.num_rows += raw_batch.num_rows();
|
||||
|
||||
let num_rows_before_prefilter = raw_batch.num_rows();
|
||||
let Some(prefiltered_batch) = self.prefilter_primary_keys(raw_batch)? else {
|
||||
self.metrics.scan_cost += start.elapsed();
|
||||
self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_prefilter;
|
||||
continue;
|
||||
};
|
||||
let prefiltered_rows = num_rows_before_prefilter - prefiltered_batch.num_rows();
|
||||
self.metrics.filter_metrics.rows_precise_filtered += prefiltered_rows;
|
||||
self.metrics.scan_cost += start.elapsed();
|
||||
return Ok(Some(prefiltered_batch));
|
||||
}
|
||||
}
|
||||
|
||||
fn coalesce_prefiltered_batches(&mut self, batch: &mut RecordBatch) -> Result<()> {
|
||||
let Some(primary_key) = batch_single_primary_key(batch)? else {
|
||||
return Ok(());
|
||||
};
|
||||
let primary_key = primary_key.to_vec();
|
||||
let schema = batch.schema();
|
||||
let mut batches = vec![batch.clone()];
|
||||
|
||||
while let Some(next_batch) = self.next_prefiltered_batch()? {
|
||||
if batch_single_primary_key(&next_batch)? == Some(primary_key.as_slice()) {
|
||||
batches.push(next_batch);
|
||||
} else {
|
||||
self.buffered_prefiltered_batch = Some(next_batch);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if batches.len() > 1 {
|
||||
*batch = concat_batches(&schema, &batches).context(ComputeArrowSnafu)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn prefilter_primary_keys(&mut self, record_batch: RecordBatch) -> Result<Option<RecordBatch>> {
|
||||
@@ -462,7 +347,7 @@ impl FlatPruneReader {
|
||||
};
|
||||
|
||||
self.context
|
||||
.prefilter_flat_batch_by_primary_key(record_batch, primary_key_filter)
|
||||
.prefilter_flat_batch_by_primary_key(record_batch, primary_key_filter.as_mut())
|
||||
}
|
||||
|
||||
/// Prunes batches by the pushed down predicate and returns RecordBatch.
|
||||
@@ -473,11 +358,9 @@ impl FlatPruneReader {
|
||||
}
|
||||
|
||||
let num_rows_before_filter = record_batch.num_rows();
|
||||
let Some(filtered_batch) = self.context.precise_filter_flat(
|
||||
record_batch,
|
||||
self.skip_fields,
|
||||
self.covered_primary_key_filter_columns.as_ref(),
|
||||
)?
|
||||
let Some(filtered_batch) = self
|
||||
.context
|
||||
.precise_filter_flat(record_batch, self.skip_fields)?
|
||||
else {
|
||||
// the entire batch is filtered out
|
||||
self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_filter;
|
||||
@@ -498,121 +381,13 @@ impl FlatPruneReader {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
|
||||
use api::v1::OpType;
|
||||
use datafusion_common::ScalarValue;
|
||||
use datafusion_expr::{Expr, col, lit};
|
||||
use datatypes::arrow::array::{ArrayRef, BinaryArray, DictionaryArray, UInt32Array};
|
||||
use datatypes::arrow::datatypes::{DataType, Field, Schema, TimeUnit, UInt32Type};
|
||||
|
||||
use super::*;
|
||||
use crate::test_util::new_batch;
|
||||
|
||||
struct CountingPrimaryKeyFilter {
|
||||
calls: Arc<AtomicUsize>,
|
||||
matched: bool,
|
||||
}
|
||||
|
||||
impl PrimaryKeyFilter for CountingPrimaryKeyFilter {
|
||||
fn matches(&mut self, _pk: &[u8]) -> bool {
|
||||
self.calls.fetch_add(1, Ordering::Relaxed);
|
||||
self.matched
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_cached_primary_key_filter_reuses_last_match() {
|
||||
let calls = Arc::new(AtomicUsize::new(0));
|
||||
let mut filter = CachedPrimaryKeyFilter::new(Box::new(CountingPrimaryKeyFilter {
|
||||
calls: calls.clone(),
|
||||
matched: true,
|
||||
}));
|
||||
|
||||
assert!(filter.matches(b"series-a"));
|
||||
assert!(filter.matches(b"series-a"));
|
||||
assert_eq!(calls.load(Ordering::Relaxed), 1);
|
||||
|
||||
assert!(filter.matches(b"series-b"));
|
||||
assert!(filter.matches(b"series-b"));
|
||||
assert_eq!(calls.load(Ordering::Relaxed), 2);
|
||||
}
|
||||
|
||||
fn new_flat_raw_batch(primary_keys: &[&[u8]]) -> RecordBatch {
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("value", DataType::Float64, true),
|
||||
Field::new(
|
||||
"ts",
|
||||
DataType::Timestamp(TimeUnit::Millisecond, None),
|
||||
false,
|
||||
),
|
||||
Field::new(
|
||||
"__primary_key",
|
||||
DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
|
||||
false,
|
||||
),
|
||||
Field::new("__sequence", DataType::UInt64, false),
|
||||
Field::new("__op_type", DataType::UInt8, false),
|
||||
]));
|
||||
|
||||
let mut dict_values = Vec::new();
|
||||
let mut keys = Vec::with_capacity(primary_keys.len());
|
||||
for pk in primary_keys {
|
||||
let key = dict_values
|
||||
.iter()
|
||||
.position(|existing: &&[u8]| existing == pk)
|
||||
.unwrap_or_else(|| {
|
||||
dict_values.push(*pk);
|
||||
dict_values.len() - 1
|
||||
});
|
||||
keys.push(key as u32);
|
||||
}
|
||||
|
||||
let pk_array: ArrayRef = Arc::new(DictionaryArray::<UInt32Type>::new(
|
||||
UInt32Array::from(keys),
|
||||
Arc::new(BinaryArray::from_iter_values(dict_values.iter().copied())),
|
||||
));
|
||||
|
||||
RecordBatch::try_new(
|
||||
schema,
|
||||
vec![
|
||||
Arc::new(datatypes::arrow::array::Float64Array::from(
|
||||
vec![1.0; primary_keys.len()],
|
||||
)),
|
||||
Arc::new(
|
||||
datatypes::arrow::array::TimestampMillisecondArray::from_iter_values(
|
||||
0..primary_keys.len() as i64,
|
||||
),
|
||||
),
|
||||
pk_array,
|
||||
Arc::new(datatypes::arrow::array::UInt64Array::from(vec![
|
||||
1;
|
||||
primary_keys
|
||||
.len()
|
||||
])),
|
||||
Arc::new(datatypes::arrow::array::UInt8Array::from(vec![
|
||||
1;
|
||||
primary_keys
|
||||
.len()
|
||||
])),
|
||||
],
|
||||
)
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_batch_single_primary_key_detects_single_series_batch() {
|
||||
let batch = new_flat_raw_batch(&[b"series-a", b"series-a"]);
|
||||
assert_eq!(
|
||||
batch_single_primary_key(&batch).unwrap(),
|
||||
Some(&b"series-a"[..])
|
||||
);
|
||||
|
||||
let batch = new_flat_raw_batch(&[b"series-a", b"series-b"]);
|
||||
assert!(batch_single_primary_key(&batch).unwrap().is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_prune_time_iter_empty() {
|
||||
let input = [];
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
//! Structs and functions for reading ranges from a parquet file. A file range
|
||||
//! is usually a row group in a parquet file.
|
||||
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::collections::HashMap;
|
||||
use std::ops::{BitAnd, Range};
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -294,7 +294,6 @@ impl FileRange {
|
||||
self.context.reader_builder.cache_strategy().clone(),
|
||||
self.context.read_format().projection_indices(),
|
||||
flat_row_group_reader,
|
||||
self.context.new_primary_key_filter(),
|
||||
);
|
||||
FlatPruneReader::new_with_last_row_reader(self.context.clone(), reader, skip_fields)
|
||||
} else {
|
||||
@@ -442,12 +441,6 @@ impl FileRangeContext {
|
||||
self.base.new_primary_key_filter()
|
||||
}
|
||||
|
||||
/// Returns tag columns whose simple filters are already guaranteed by the
|
||||
/// encoded primary-key prefilter.
|
||||
pub(crate) fn covered_primary_key_filter_columns(&self) -> Option<HashSet<ColumnId>> {
|
||||
self.base.covered_primary_key_filter_columns()
|
||||
}
|
||||
|
||||
/// Returns true if a partition filter is configured.
|
||||
pub(crate) fn has_partition_filter(&self) -> bool {
|
||||
self.base.partition_filter.is_some()
|
||||
@@ -491,10 +484,8 @@ impl FileRangeContext {
|
||||
&self,
|
||||
input: RecordBatch,
|
||||
skip_fields: bool,
|
||||
skip_tag_filter_columns: Option<&HashSet<ColumnId>>,
|
||||
) -> Result<Option<RecordBatch>> {
|
||||
self.base
|
||||
.precise_filter_flat(input, skip_fields, skip_tag_filter_columns)
|
||||
self.base.precise_filter_flat(input, skip_fields)
|
||||
}
|
||||
|
||||
/// Applies an encoded primary-key prefilter to the input `RecordBatch`.
|
||||
@@ -669,32 +660,6 @@ impl RangeBase {
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) fn covered_primary_key_filter_columns(&self) -> Option<HashSet<ColumnId>> {
|
||||
if self.read_format.metadata().primary_key.is_empty()
|
||||
|| !self
|
||||
.read_format
|
||||
.as_flat()
|
||||
.is_some_and(|format| format.raw_batch_has_primary_key_dictionary())
|
||||
{
|
||||
return None;
|
||||
}
|
||||
|
||||
let sst_metadata = self.read_format.metadata();
|
||||
let expected_metadata = self.expected_metadata.as_deref();
|
||||
let filters = self.usable_primary_key_filters()?;
|
||||
let column_ids = filters
|
||||
.iter()
|
||||
.filter_map(|filter| {
|
||||
expected_metadata
|
||||
.and_then(|metadata| metadata.column_by_name(filter.column_name()))
|
||||
.or_else(|| sst_metadata.column_by_name(filter.column_name()))
|
||||
.map(|column| column.column_id)
|
||||
})
|
||||
.collect::<HashSet<_>>();
|
||||
|
||||
(!column_ids.is_empty()).then_some(column_ids)
|
||||
}
|
||||
|
||||
/// Applies an encoded primary-key prefilter before flat-row materialization.
|
||||
///
|
||||
/// This only prunes rows that are guaranteed to fail simple primary-key predicates.
|
||||
@@ -920,25 +885,15 @@ impl RangeBase {
|
||||
&self,
|
||||
input: RecordBatch,
|
||||
skip_fields: bool,
|
||||
skip_tag_filter_columns: Option<&HashSet<ColumnId>>,
|
||||
) -> Result<Option<RecordBatch>> {
|
||||
let mut tag_decode_state = TagDecodeState::new();
|
||||
let mask = self.compute_filter_mask_flat(
|
||||
&input,
|
||||
skip_fields,
|
||||
skip_tag_filter_columns,
|
||||
&mut tag_decode_state,
|
||||
)?;
|
||||
let mask = self.compute_filter_mask_flat(&input, skip_fields, &mut tag_decode_state)?;
|
||||
|
||||
// If mask is None, the entire batch is filtered out
|
||||
let Some(mut mask) = mask else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
if self.partition_filter.is_none() && mask.count_set_bits() == input.num_rows() {
|
||||
return Ok(Some(input));
|
||||
}
|
||||
|
||||
// Apply partition filter
|
||||
if let Some(partition_filter) = &self.partition_filter {
|
||||
let record_batch = self.project_record_batch_for_pruning_flat(
|
||||
@@ -977,7 +932,6 @@ impl RangeBase {
|
||||
&self,
|
||||
input: &RecordBatch,
|
||||
skip_fields: bool,
|
||||
skip_tag_filter_columns: Option<&HashSet<ColumnId>>,
|
||||
tag_decode_state: &mut TagDecodeState,
|
||||
) -> Result<Option<BooleanBuffer>> {
|
||||
let mut mask = BooleanBuffer::new_set(input.num_rows());
|
||||
@@ -1005,13 +959,6 @@ impl RangeBase {
|
||||
continue;
|
||||
}
|
||||
|
||||
if skip_tag_filter_columns.is_some_and(|columns| {
|
||||
filter_ctx.semantic_type() == SemanticType::Tag
|
||||
&& columns.contains(&filter_ctx.column_id())
|
||||
}) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Get the column directly by its projected index.
|
||||
// If the column is missing and it's not a tag/time column, this filter is skipped.
|
||||
// Assumes the projection indices align with the input batch schema.
|
||||
@@ -1504,20 +1451,6 @@ mod tests {
|
||||
assert!(base.new_primary_key_filter().is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_covered_primary_key_filter_columns_only_include_prefiltered_tags() {
|
||||
let base = new_test_range_base(&[
|
||||
col("tag_0").eq(lit("b")),
|
||||
col("field_0").eq(lit(1_u64)),
|
||||
col("ts").gt(lit(0_i64)),
|
||||
]);
|
||||
|
||||
let covered_columns = base.covered_primary_key_filter_columns().unwrap();
|
||||
let metadata = sst_region_metadata();
|
||||
assert_eq!(covered_columns.len(), 1);
|
||||
assert!(covered_columns.contains(&metadata.column_by_name("tag_0").unwrap().column_id));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_prefilter_primary_key_ignores_reused_expected_tag_name() {
|
||||
let metadata = Arc::new(sst_region_metadata());
|
||||
@@ -1678,19 +1611,4 @@ mod tests {
|
||||
assert_eq!(filtered.num_rows(), 2);
|
||||
assert_eq!(field_values(&filtered), vec![12, 13]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_precise_filter_flat_skips_prefiltered_tag_decode() {
|
||||
let base = new_test_range_base(&[col("tag_0").eq(lit("b"))]);
|
||||
let skip_tag_filter_columns = base.covered_primary_key_filter_columns().unwrap();
|
||||
let batch = new_raw_batch(&[b"not-a-valid-primary-key"], &[10]);
|
||||
|
||||
let filtered = base
|
||||
.precise_filter_flat(batch, false, Some(&skip_tag_filter_columns))
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(filtered.num_rows(), 1);
|
||||
assert_eq!(field_values(&filtered), vec![10]);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -30,7 +30,7 @@ use datatypes::arrow::error::ArrowError;
|
||||
use datatypes::arrow::record_batch::RecordBatch;
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use datatypes::prelude::DataType;
|
||||
use mito_codec::row_converter::{PrimaryKeyFilter, build_primary_key_codec};
|
||||
use mito_codec::row_converter::build_primary_key_codec;
|
||||
use object_store::ObjectStore;
|
||||
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
|
||||
use parquet::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels};
|
||||
@@ -1606,12 +1606,11 @@ pub struct ReaderMetrics {
|
||||
pub(crate) filter_metrics: ReaderFilterMetrics,
|
||||
/// Duration to build the parquet reader.
|
||||
pub(crate) build_cost: Duration,
|
||||
/// Duration to scan the reader, including parquet fetches and decoding work
|
||||
/// needed to materialize output batches.
|
||||
/// Duration to scan the reader.
|
||||
pub(crate) scan_cost: Duration,
|
||||
/// Number of record batches read.
|
||||
pub(crate) num_record_batches: usize,
|
||||
/// Number of decoded output batches materialized from parquet data.
|
||||
/// Number of batches decoded.
|
||||
pub(crate) num_batches: usize,
|
||||
/// Number of rows read.
|
||||
pub(crate) num_rows: usize,
|
||||
@@ -2164,14 +2163,13 @@ impl FlatRowGroupReader {
|
||||
flat_format.convert_batch(record_batch, self.override_sequence.as_ref())
|
||||
}
|
||||
|
||||
/// Applies the encoded primary-key prefilter to a raw parquet batch before flat conversion.
|
||||
pub(crate) fn prefilter_raw_batch_by_primary_key(
|
||||
&self,
|
||||
record_batch: RecordBatch,
|
||||
primary_key_filter: &mut dyn PrimaryKeyFilter,
|
||||
) -> Result<Option<RecordBatch>> {
|
||||
self.context
|
||||
.prefilter_flat_batch_by_primary_key(record_batch, primary_key_filter)
|
||||
/// Returns the next flat RecordBatch.
|
||||
pub(crate) fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
|
||||
let Some(record_batch) = self.next_raw_batch()? else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
self.convert_batch(record_batch).map(Some)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,53 +0,0 @@
|
||||
CREATE TABLE last_row_selector_cache_filter (
|
||||
host STRING,
|
||||
cpu DOUBLE,
|
||||
ts TIMESTAMP TIME INDEX,
|
||||
PRIMARY KEY (host)
|
||||
) WITH ('sst_format' = 'flat');
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
INSERT INTO last_row_selector_cache_filter VALUES
|
||||
('a', 1.0, 1000),
|
||||
('a', 2.0, 2000),
|
||||
('b', 3.0, 1000),
|
||||
('b', 4.0, 2000);
|
||||
|
||||
Affected Rows: 4
|
||||
|
||||
ADMIN FLUSH_TABLE('last_row_selector_cache_filter');
|
||||
|
||||
+-----------------------------------------------------+
|
||||
| ADMIN FLUSH_TABLE('last_row_selector_cache_filter') |
|
||||
+-----------------------------------------------------+
|
||||
| 0 |
|
||||
+-----------------------------------------------------+
|
||||
|
||||
SELECT host, last_value(cpu ORDER BY ts) AS last_cpu
|
||||
FROM last_row_selector_cache_filter
|
||||
WHERE host = 'a'
|
||||
GROUP BY host
|
||||
ORDER BY host;
|
||||
|
||||
+------+----------+
|
||||
| host | last_cpu |
|
||||
+------+----------+
|
||||
| a | 2.0 |
|
||||
+------+----------+
|
||||
|
||||
SELECT host, last_value(cpu ORDER BY ts) AS last_cpu
|
||||
FROM last_row_selector_cache_filter
|
||||
GROUP BY host
|
||||
ORDER BY host;
|
||||
|
||||
+------+----------+
|
||||
| host | last_cpu |
|
||||
+------+----------+
|
||||
| a | 2.0 |
|
||||
| b | 4.0 |
|
||||
+------+----------+
|
||||
|
||||
DROP TABLE last_row_selector_cache_filter;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
@@ -1,27 +0,0 @@
|
||||
CREATE TABLE last_row_selector_cache_filter (
|
||||
host STRING,
|
||||
cpu DOUBLE,
|
||||
ts TIMESTAMP TIME INDEX,
|
||||
PRIMARY KEY (host)
|
||||
) WITH ('sst_format' = 'flat');
|
||||
|
||||
INSERT INTO last_row_selector_cache_filter VALUES
|
||||
('a', 1.0, 1000),
|
||||
('a', 2.0, 2000),
|
||||
('b', 3.0, 1000),
|
||||
('b', 4.0, 2000);
|
||||
|
||||
ADMIN FLUSH_TABLE('last_row_selector_cache_filter');
|
||||
|
||||
SELECT host, last_value(cpu ORDER BY ts) AS last_cpu
|
||||
FROM last_row_selector_cache_filter
|
||||
WHERE host = 'a'
|
||||
GROUP BY host
|
||||
ORDER BY host;
|
||||
|
||||
SELECT host, last_value(cpu ORDER BY ts) AS last_cpu
|
||||
FROM last_row_selector_cache_filter
|
||||
GROUP BY host
|
||||
ORDER BY host;
|
||||
|
||||
DROP TABLE last_row_selector_cache_filter;
|
||||
Reference in New Issue
Block a user