feat: EncodedBulkPartIter iters flat format and returns RecordBatch (#6655)
* feat: implements iter to read bulk part

  Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: BulkPartEncoder encodes BulkPart instead of mutation

  Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
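With this change the bulk part read path yields Arrow RecordBatch values in the flat format and applies the sequence-number visibility check with Arrow compute kernels instead of Batch-level filtering. A minimal sketch of that filtering step, assuming the plain arrow crate (GreptimeDB reaches the same API through datatypes::arrow; the two-column layout and names below are illustrative, not the real flat schema):

// Hedged sketch: sequence-number filtering over a flat RecordBatch with Arrow
// compute kernels, mirroring what the new iterator does. Plain `arrow` crate
// paths are assumed; the column layout is illustrative, not GreptimeDB's.
use std::sync::Arc;

use arrow::array::{ArrayRef, Float64Array, UInt64Array};
use arrow::compute::filter_record_batch;
use arrow::compute::kernels::cmp::lt_eq;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

fn filter_by_sequence(
    batch: &RecordBatch,
    sequence_column_index: usize,
    max_sequence: u64,
) -> Result<RecordBatch, ArrowError> {
    // Build the scalar once, like `sequence.map(UInt64Array::new_scalar)` in the diff.
    let max = UInt64Array::new_scalar(max_sequence);
    // Keep rows whose sequence is <= the visible sequence number.
    let mask = lt_eq(batch.column(sequence_column_index), &max)?;
    filter_record_batch(batch, &mask)
}

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("value", DataType::Float64, false),
        Field::new("__sequence", DataType::UInt64, false),
    ]));
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Float64Array::from(vec![0.1, 0.2, 0.3])) as ArrayRef,
            Arc::new(UInt64Array::from(vec![1, 2, 3])),
        ],
    )?;
    // Rows with sequence > 2 are dropped.
    let visible = filter_by_sequence(&batch, 1, 2)?;
    assert_eq!(visible.num_rows(), 2);
    Ok(())
}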
@@ -458,7 +458,6 @@ fn bulk_part_record_batch_iter_filter(c: &mut Criterion) {
metadata.clone(),
&None, // No projection
Some(predicate.clone()), // With hostname filter
true,
));

// Create and iterate over BulkPartRecordBatchIter with filter

@@ -480,7 +479,6 @@ fn bulk_part_record_batch_iter_filter(c: &mut Criterion) {
metadata.clone(),
&None, // No projection
None, // No predicate
true,
));

// Create and iterate over BulkPartRecordBatchIter
@@ -22,6 +22,7 @@ use std::time::Duration;

pub use bulk::part::EncodedBulkPart;
use common_time::Timestamp;
use datatypes::arrow::record_batch::RecordBatch;
use mito_codec::key_values::KeyValue;
pub use mito_codec::key_values::KeyValues;
use serde::{Deserialize, Serialize};

@@ -137,6 +138,8 @@ impl MemtableStats {

pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send>;

pub type BoxedRecordBatchIterator = Box<dyn Iterator<Item = Result<RecordBatch>> + Send>;

/// Ranges in a memtable.
#[derive(Default)]
pub struct MemtableRanges {
@@ -41,7 +41,6 @@ impl BulkIterContext {
region_metadata: RegionMetadataRef,
projection: &Option<&[ColumnId]>,
predicate: Option<Predicate>,
flat_format: bool,
) -> Self {
let codec = build_primary_key_codec(&region_metadata);

@@ -56,7 +55,7 @@ impl BulkIterContext {
})
.collect();

let read_format = build_read_format(region_metadata, projection, flat_format);
let read_format = build_read_format(region_metadata, projection, true);

Self {
base: RangeBase {
@@ -60,7 +60,7 @@ use crate::error::{
use crate::memtable::bulk::context::BulkIterContextRef;
use crate::memtable::bulk::part_reader::EncodedBulkPartIter;
use crate::memtable::time_series::{ValueBuilder, Values};
use crate::memtable::BoxedBatchIterator;
use crate::memtable::BoxedRecordBatchIterator;
use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat};
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::to_sst_arrow_schema;

@@ -511,7 +511,7 @@ impl EncodedBulkPart {
&self,
context: BulkIterContextRef,
sequence: Option<SequenceNumber>,
) -> Result<Option<BoxedBatchIterator>> {
) -> Result<Option<BoxedRecordBatchIterator>> {
// use predicate to find row groups to read.
let row_groups_to_read = context.row_groups_to_read(&self.metadata.parquet_metadata);

@@ -527,7 +527,7 @@ impl EncodedBulkPart {
self.data.clone(),
sequence,
)?;
Ok(Some(Box::new(iter) as BoxedBatchIterator))
Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
}
}
@@ -547,19 +547,12 @@ pub struct BulkPartMeta {

pub struct BulkPartEncoder {
metadata: RegionMetadataRef,
pk_encoder: DensePrimaryKeyCodec,
row_group_size: usize,
dedup: bool,
writer_props: Option<WriterProperties>,
}

impl BulkPartEncoder {
pub(crate) fn new(
metadata: RegionMetadataRef,
dedup: bool,
row_group_size: usize,
) -> BulkPartEncoder {
let codec = DensePrimaryKeyCodec::new(&metadata);
pub(crate) fn new(metadata: RegionMetadataRef, row_group_size: usize) -> BulkPartEncoder {
let writer_props = Some(
WriterProperties::builder()
.set_write_batch_size(row_group_size)
@@ -568,33 +561,27 @@ impl BulkPartEncoder {
);
Self {
metadata,
pk_encoder: codec,
row_group_size,
dedup,
writer_props,
}
}
}

impl BulkPartEncoder {
/// Encodes mutations to a [EncodedBulkPart], returns true if encoded data has been written to `dest`.
fn encode_mutations(&self, mutations: &[Mutation]) -> Result<Option<EncodedBulkPart>> {
let Some((arrow_record_batch, min_ts, max_ts)) =
mutations_to_record_batch(mutations, &self.metadata, &self.pk_encoder, self.dedup)?
else {
/// Encodes bulk part to a [EncodedBulkPart], returns the encoded data.
fn encode_part(&self, part: &BulkPart) -> Result<Option<EncodedBulkPart>> {
if part.batch.num_rows() == 0 {
return Ok(None);
};
}

let mut buf = Vec::with_capacity(4096);
let arrow_schema = arrow_record_batch.schema();
let arrow_schema = part.batch.schema();

let file_metadata = {
let mut writer =
ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
.context(EncodeMemtableSnafu)?;
writer
.write(&arrow_record_batch)
.context(EncodeMemtableSnafu)?;
writer.write(&part.batch).context(EncodeMemtableSnafu)?;
writer.finish().context(EncodeMemtableSnafu)?
};
@@ -604,9 +591,9 @@ impl BulkPartEncoder {
Ok(Some(EncodedBulkPart {
data: buf,
metadata: BulkPartMeta {
num_rows: arrow_record_batch.num_rows(),
max_timestamp: max_ts,
min_timestamp: min_ts,
num_rows: part.batch.num_rows(),
max_timestamp: part.max_ts,
min_timestamp: part.min_ts,
parquet_metadata,
region_metadata: self.metadata.clone(),
},
@@ -881,6 +868,7 @@ mod tests {

use api::v1::{Row, WriteHint};
use datafusion_common::ScalarValue;
use datatypes::arrow::array::Float64Array;
use datatypes::prelude::{ConcreteDataType, ScalarVector, Value};
use datatypes::vectors::{Float64Vector, TimestampMillisecondVector};
use store_api::storage::consts::ReservedColumnId;

@@ -890,8 +878,7 @@ mod tests {
use crate::sst::parquet::format::{PrimaryKeyReadFormat, ReadFormat};
use crate::sst::{to_flat_sst_arrow_schema, FlatSchemaOptions};
use crate::test_util::memtable_util::{
build_key_values_with_ts_seq_values, metadata_for_test, metadata_with_primary_key,
region_metadata_to_row_schema,
build_key_values_with_ts_seq_values, metadata_for_test, region_metadata_to_row_schema,
};

fn check_binary_array_to_dictionary(
@@ -1191,7 +1178,7 @@ mod tests {

fn encode(input: &[MutationInput]) -> EncodedBulkPart {
let metadata = metadata_for_test();
let mutations = input
let kvs = input
.iter()
.map(|m| {
build_key_values_with_ts_seq_values(

@@ -1202,11 +1189,17 @@ mod tests {
m.v1.iter().copied(),
m.sequence,
)
.mutation
})
.collect::<Vec<_>>();
let encoder = BulkPartEncoder::new(metadata, true, 1024);
encoder.encode_mutations(&mutations).unwrap().unwrap()
let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
let primary_key_codec = build_primary_key_codec(&metadata);
let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
for kv in kvs {
converter.append_key_values(&kv).unwrap();
}
let part = converter.convert().unwrap();
let encoder = BulkPartEncoder::new(metadata, 1024);
encoder.encode_part(&part).unwrap().unwrap()
}

#[test]
@@ -1236,14 +1229,12 @@ mod tests {
]);

let projection = &[4u32];

let mut reader = part
.read(
Arc::new(BulkIterContext::new(
part.metadata.region_metadata.clone(),
&Some(projection.as_slice()),
None,
false,
)),
None,
)

@@ -1251,19 +1242,17 @@ mod tests {
.expect("expect at least one row group");

let mut total_rows_read = 0;
let mut field = vec![];
let mut field: Vec<f64> = vec![];
for res in reader {
let batch = res.unwrap();
assert_eq!(1, batch.fields().len());
assert_eq!(4, batch.fields()[0].column_id);
field.extend(
batch.fields()[0]
.data
assert_eq!(5, batch.num_columns());
field.extend_from_slice(
batch
.column(0)
.as_any()
.downcast_ref::<Float64Vector>()
.downcast_ref::<Float64Array>()
.unwrap()
.iter_data()
.map(|v| v.unwrap()),
.values(),
);
total_rows_read += batch.num_rows();
}
@@ -1273,17 +1262,23 @@ mod tests {

fn prepare(key_values: Vec<(&str, u32, (i64, i64), u64)>) -> EncodedBulkPart {
let metadata = metadata_for_test();
let mutations = key_values
let kvs = key_values
.into_iter()
.map(|(k0, k1, (start, end), sequence)| {
let ts = (start..end);
let v1 = (start..end).map(|_| None);
build_key_values_with_ts_seq_values(&metadata, k0.to_string(), k1, ts, v1, sequence)
.mutation
})
.collect::<Vec<_>>();
let encoder = BulkPartEncoder::new(metadata, true, 100);
encoder.encode_mutations(&mutations).unwrap().unwrap()
let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
let primary_key_codec = build_primary_key_codec(&metadata);
let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
for kv in kvs {
converter.append_key_values(&kv).unwrap();
}
let part = converter.convert().unwrap();
let encoder = BulkPartEncoder::new(metadata, 1024);
encoder.encode_part(&part).unwrap().unwrap()
}

fn check_prune_row_group(
@@ -1295,7 +1290,6 @@ mod tests {
part.metadata.region_metadata.clone(),
&None,
predicate,
false,
));
let mut reader = part
.read(context, None)

@@ -1326,7 +1320,6 @@ mod tests {
Some(Predicate::new(vec![datafusion_expr::col("ts").eq(
datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(300), None)),
)])),
false,
));
assert!(part.read(context, None).unwrap().is_none());
@@ -20,26 +20,26 @@ use bytes::Bytes;
use datatypes::arrow::array::{BooleanArray, Scalar, UInt64Array};
use datatypes::arrow::buffer::BooleanBuffer;
use datatypes::arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
use parquet::arrow::ProjectionMask;
use parquet::file::metadata::ParquetMetaData;
use snafu::ResultExt;
use store_api::storage::SequenceNumber;

use crate::error::{self, ComputeArrowSnafu};
use crate::memtable::bulk::context::BulkIterContextRef;
use crate::memtable::bulk::row_group_reader::{
MemtableRowGroupReader, MemtableRowGroupReaderBuilder,
};
use crate::read::Batch;
use crate::error::{self, ComputeArrowSnafu, DecodeArrowRowGroupSnafu};
use crate::memtable::bulk::context::{BulkIterContext, BulkIterContextRef};
use crate::memtable::bulk::row_group_reader::MemtableRowGroupReaderBuilder;
use crate::sst::parquet::flat_format::sequence_column_index;
use crate::sst::parquet::reader::MaybeFilter;
use crate::sst::parquet::reader::{MaybeFilter, RowGroupReaderContext};

/// Iterator for reading data inside a bulk part.
pub struct EncodedBulkPartIter {
context: BulkIterContextRef,
row_groups_to_read: VecDeque<usize>,
current_reader: Option<PruneReader>,
current_reader: Option<ParquetRecordBatchReader>,
builder: MemtableRowGroupReaderBuilder,
sequence: Option<SequenceNumber>,
/// Sequence number filter.
sequence: Option<Scalar<UInt64Array>>,
}

impl EncodedBulkPartIter {
@@ -51,24 +51,23 @@ impl EncodedBulkPartIter {
data: Bytes,
sequence: Option<SequenceNumber>,
) -> error::Result<Self> {
assert!(context.read_format().as_flat().is_some());

let sequence = sequence.map(UInt64Array::new_scalar);

let projection_mask = ProjectionMask::roots(
parquet_meta.file_metadata().schema_descr(),
context.read_format().projection_indices().iter().copied(),
);

let builder = MemtableRowGroupReaderBuilder::try_new(
context.clone(),
projection_mask,
parquet_meta,
data,
)?;
let builder =
MemtableRowGroupReaderBuilder::try_new(&context, projection_mask, parquet_meta, data)?;

let init_reader = row_groups_to_read
.pop_front()
.map(|first_row_group| builder.build_row_group_reader(first_row_group, None))
.transpose()?
.map(|r| PruneReader::new(context, r));
.transpose()?;
Ok(Self {
context,
row_groups_to_read,
current_reader: init_reader,
builder,
@@ -76,88 +75,42 @@ impl EncodedBulkPartIter {
})
}

pub(crate) fn next_batch(&mut self) -> error::Result<Option<Batch>> {
/// Fetches next non-empty record batch.
pub(crate) fn next_record_batch(&mut self) -> error::Result<Option<RecordBatch>> {
let Some(current) = &mut self.current_reader else {
// All row group exhausted.
return Ok(None);
};

if let Some(mut batch) = current.next_batch()? {
batch.filter_by_sequence(self.sequence)?;
return Ok(Some(batch));
for batch in current {
let batch = batch.context(DecodeArrowRowGroupSnafu)?;
if let Some(batch) = apply_combined_filters(&self.context, &self.sequence, batch)? {
return Ok(Some(batch));
}
}

// Previous row group exhausted, read next row group
while let Some(next_row_group) = self.row_groups_to_read.pop_front() {
current.reset(self.builder.build_row_group_reader(next_row_group, None)?);
if let Some(mut next_batch) = current.next_batch()? {
next_batch.filter_by_sequence(self.sequence)?;
return Ok(Some(next_batch));
let next_reader = self.builder.build_row_group_reader(next_row_group, None)?;
let current = self.current_reader.insert(next_reader);

for batch in current {
let batch = batch.context(DecodeArrowRowGroupSnafu)?;
if let Some(batch) = apply_combined_filters(&self.context, &self.sequence, batch)? {
return Ok(Some(batch));
}
}
}

Ok(None)
}
}

impl Iterator for EncodedBulkPartIter {
type Item = error::Result<Batch>;
type Item = error::Result<RecordBatch>;

fn next(&mut self) -> Option<Self::Item> {
self.next_batch().transpose()
}
}

struct PruneReader {
context: BulkIterContextRef,
row_group_reader: MemtableRowGroupReader,
}

//todo(hl): maybe we also need to support lastrow mode here.
impl PruneReader {
fn new(context: BulkIterContextRef, reader: MemtableRowGroupReader) -> Self {
Self {
context,
row_group_reader: reader,
}
}

/// Iterates current inner reader until exhausted.
fn next_batch(&mut self) -> error::Result<Option<Batch>> {
while let Some(b) = self.row_group_reader.next_inner()? {
match self.prune(b)? {
Some(b) => {
return Ok(Some(b));
}
None => {
continue;
}
}
}
Ok(None)
}

/// Prunes batch according to filters.
fn prune(&mut self, batch: Batch) -> error::Result<Option<Batch>> {
//todo(hl): add metrics.

// fast path
if self.context.base.filters.is_empty() {
return Ok(Some(batch));
}

let Some(batch_filtered) = self.context.base.precise_filter(batch)? else {
// the entire batch is filtered out
return Ok(None);
};
if !batch_filtered.is_empty() {
Ok(Some(batch_filtered))
} else {
Ok(None)
}
}

fn reset(&mut self, reader: MemtableRowGroupReader) {
self.row_group_reader = reader;
self.next_record_batch().transpose()
}
}
@@ -201,95 +154,13 @@ impl BulkPartRecordBatchIter {
.context(ComputeArrowSnafu)
}

// TODO(yingwen): Supports sparse encoding which doesn't have decoded primary key columns.
/// Applies both predicate filtering and sequence filtering in a single pass.
/// Returns None if the filtered batch is empty.
fn apply_combined_filters(
&self,
record_batch: RecordBatch,
) -> error::Result<Option<RecordBatch>> {
let num_rows = record_batch.num_rows();
let mut combined_filter = None;

// First, apply predicate filters.
if !self.context.base.filters.is_empty() {
let num_rows = record_batch.num_rows();
let mut mask = BooleanBuffer::new_set(num_rows);

// Run filter one by one and combine them result, similar to RangeBase::precise_filter
for filter_ctx in &self.context.base.filters {
let filter = match filter_ctx.filter() {
MaybeFilter::Filter(f) => f,
// Column matches.
MaybeFilter::Matched => continue,
// Column doesn't match, filter the entire batch.
MaybeFilter::Pruned => return Ok(None),
};

// Safety: We checked the format type in new().
let Some(column_index) = self
.context
.read_format()
.as_flat()
.unwrap()
.projected_index_by_id(filter_ctx.column_id())
else {
continue;
};
let array = record_batch.column(column_index);
let result = filter
.evaluate_array(array)
.context(crate::error::RecordBatchSnafu)?;

mask = mask.bitand(&result);
}
// Convert the mask to BooleanArray
combined_filter = Some(BooleanArray::from(mask));
}

// Filters rows by the given `sequence`. Only preserves rows with sequence less than or equal to `sequence`.
if let Some(sequence) = &self.sequence {
let sequence_column =
record_batch.column(sequence_column_index(record_batch.num_columns()));
let sequence_filter =
datatypes::arrow::compute::kernels::cmp::lt_eq(sequence_column, sequence)
.context(ComputeArrowSnafu)?;
// Combine with existing filter using AND operation
combined_filter = match combined_filter {
None => Some(sequence_filter),
Some(existing_filter) => {
let and_result =
datatypes::arrow::compute::and(&existing_filter, &sequence_filter)
.context(ComputeArrowSnafu)?;
Some(and_result)
}
};
}

// Apply the combined filter if any filters were applied
let Some(filter_array) = combined_filter else {
// No filters applied, return original batch
return Ok(Some(record_batch));
};
let select_count = filter_array.true_count();
if select_count == 0 {
return Ok(None);
}
if select_count == num_rows {
return Ok(Some(record_batch));
}
let filtered_batch =
datatypes::arrow::compute::filter_record_batch(&record_batch, &filter_array)
.context(ComputeArrowSnafu)?;

Ok(Some(filtered_batch))
}

fn process_batch(&mut self, record_batch: RecordBatch) -> error::Result<Option<RecordBatch>> {
// Apply projection first.
let projected_batch = self.apply_projection(record_batch)?;
// Apply combined filtering (both predicate and sequence filters)
let Some(filtered_batch) = self.apply_combined_filters(projected_batch)? else {
let Some(filtered_batch) =
apply_combined_filters(&self.context, &self.sequence, projected_batch)?
else {
return Ok(None);
};
@@ -307,6 +178,89 @@ impl Iterator for BulkPartRecordBatchIter {
}
}

// TODO(yingwen): Supports sparse encoding which doesn't have decoded primary key columns.
/// Applies both predicate filtering and sequence filtering in a single pass.
/// Returns None if the filtered batch is empty.
fn apply_combined_filters(
context: &BulkIterContext,
sequence: &Option<Scalar<UInt64Array>>,
record_batch: RecordBatch,
) -> error::Result<Option<RecordBatch>> {
let num_rows = record_batch.num_rows();
let mut combined_filter = None;

// First, apply predicate filters.
if !context.base.filters.is_empty() {
let num_rows = record_batch.num_rows();
let mut mask = BooleanBuffer::new_set(num_rows);

// Run filter one by one and combine them result, similar to RangeBase::precise_filter
for filter_ctx in &context.base.filters {
let filter = match filter_ctx.filter() {
MaybeFilter::Filter(f) => f,
// Column matches.
MaybeFilter::Matched => continue,
// Column doesn't match, filter the entire batch.
MaybeFilter::Pruned => return Ok(None),
};

// Safety: We checked the format type in new().
let Some(column_index) = context
.read_format()
.as_flat()
.unwrap()
.projected_index_by_id(filter_ctx.column_id())
else {
continue;
};
let array = record_batch.column(column_index);
let result = filter
.evaluate_array(array)
.context(crate::error::RecordBatchSnafu)?;

mask = mask.bitand(&result);
}
// Convert the mask to BooleanArray
combined_filter = Some(BooleanArray::from(mask));
}

// Filters rows by the given `sequence`. Only preserves rows with sequence less than or equal to `sequence`.
if let Some(sequence) = sequence {
let sequence_column =
record_batch.column(sequence_column_index(record_batch.num_columns()));
let sequence_filter =
datatypes::arrow::compute::kernels::cmp::lt_eq(sequence_column, sequence)
.context(ComputeArrowSnafu)?;
// Combine with existing filter using AND operation
combined_filter = match combined_filter {
None => Some(sequence_filter),
Some(existing_filter) => {
let and_result = datatypes::arrow::compute::and(&existing_filter, &sequence_filter)
.context(ComputeArrowSnafu)?;
Some(and_result)
}
};
}

// Apply the combined filter if any filters were applied
let Some(filter_array) = combined_filter else {
// No filters applied, return original batch
return Ok(Some(record_batch));
};
let select_count = filter_array.true_count();
if select_count == 0 {
return Ok(None);
}
if select_count == num_rows {
return Ok(Some(record_batch));
}
let filtered_batch =
datatypes::arrow::compute::filter_record_batch(&record_batch, &filter_array)
.context(ComputeArrowSnafu)?;

Ok(Some(filtered_batch))
}
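A standalone distillation of the mask-combining pattern above, assuming the plain arrow crate: start from an all-true BooleanBuffer, AND in one boolean result per filter, and apply the final mask with filter_record_batch. The schema and the filter values below are made up for illustration:

// Hedged sketch of the combined-filter pattern; not GreptimeDB's actual code.
use std::ops::BitAnd;
use std::sync::Arc;

use arrow::array::{ArrayRef, BooleanArray, Int64Array, StringArray};
use arrow::buffer::BooleanBuffer;
use arrow::compute::filter_record_batch;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

fn combine_and_filter(
    batch: &RecordBatch,
    filters: &[BooleanArray],
) -> Result<RecordBatch, ArrowError> {
    // All rows selected until a filter says otherwise.
    let mut mask = BooleanBuffer::new_set(batch.num_rows());
    for filter in filters {
        // AND each per-filter result into the running mask.
        mask = mask.bitand(filter.values());
    }
    filter_record_batch(batch, &BooleanArray::from(mask))
}

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("host", DataType::Utf8, false),
        Field::new("value", DataType::Int64, false),
    ]));
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(StringArray::from(vec!["a", "b", "a"])) as ArrayRef,
            Arc::new(Int64Array::from(vec![1, 2, 3])),
        ],
    )?;
    // Two hypothetical filter results; only the last row survives both.
    let filters = vec![
        BooleanArray::from(vec![true, false, true]),
        BooleanArray::from(vec![false, true, true]),
    ];
    let filtered = combine_and_filter(&batch, &filters)?;
    assert_eq!(filtered.num_rows(), 1);
    Ok(())
}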
#[cfg(test)]
mod tests {
use std::sync::Arc;

@@ -412,7 +366,6 @@ mod tests {
Arc::new(region_metadata.clone()),
&None, // No projection
None, // No predicate
true,
));
// Iterates all rows.
let iter = BulkPartRecordBatchIter::new(record_batch.clone(), context.clone(), None);

@@ -436,7 +389,6 @@ mod tests {
Arc::new(region_metadata),
&Some(&[0, 2]),
Some(Predicate::new(vec![col("key1").eq(lit("key2"))])),
true,
));
// Creates iter with projection and predicate.
let iter = BulkPartRecordBatchIter::new(record_batch.clone(), context.clone(), None);
@@ -28,7 +28,7 @@ use crate::error;
use crate::error::ReadDataPartSnafu;
use crate::memtable::bulk::context::BulkIterContextRef;
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::reader::{RowGroupReaderBase, RowGroupReaderContext};
use crate::sst::parquet::reader::RowGroupReaderContext;
use crate::sst::parquet::row_group::{ColumnChunkIterator, RowGroupBase};
use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;

@@ -118,10 +118,7 @@ impl RowGroupReaderContext for BulkIterContextRef {
}
}

pub(crate) type MemtableRowGroupReader = RowGroupReaderBase<BulkIterContextRef>;

pub(crate) struct MemtableRowGroupReaderBuilder {
context: BulkIterContextRef,
projection: ProjectionMask,
parquet_metadata: Arc<ParquetMetaData>,
field_levels: FieldLevels,
@@ -130,7 +127,7 @@ pub(crate) struct MemtableRowGroupReaderBuilder {

impl MemtableRowGroupReaderBuilder {
pub(crate) fn try_new(
context: BulkIterContextRef,
context: &BulkIterContextRef,
projection: ProjectionMask,
parquet_metadata: Arc<ParquetMetaData>,
data: Bytes,

@@ -141,7 +138,6 @@ impl MemtableRowGroupReaderBuilder {
parquet_to_arrow_field_levels(parquet_schema_desc, projection.clone(), hint)
.context(ReadDataPartSnafu)?;
Ok(Self {
context,
projection,
parquet_metadata,
field_levels,

@@ -154,7 +150,7 @@ impl MemtableRowGroupReaderBuilder {
&self,
row_group_idx: usize,
row_selection: Option<RowSelection>,
) -> error::Result<MemtableRowGroupReader> {
) -> error::Result<ParquetRecordBatchReader> {
let mut row_group = MemtableRowGroupPageFetcher::create(
row_group_idx,
&self.parquet_metadata,

@@ -165,13 +161,12 @@ impl MemtableRowGroupReaderBuilder {

// Builds the parquet reader.
// Now the row selection is None.
let reader = ParquetRecordBatchReader::try_new_with_row_groups(
ParquetRecordBatchReader::try_new_with_row_groups(
&self.field_levels,
&row_group,
DEFAULT_READ_BATCH_SIZE,
row_selection,
)
.context(ReadDataPartSnafu)?;
Ok(MemtableRowGroupReader::create(self.context.clone(), reader))
.context(ReadDataPartSnafu)
}
}
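The builder change above hands back a ParquetRecordBatchReader built directly over the selected row groups of the in-memory part. A hedged sketch of the same row-group-granular reading through the public ParquetRecordBatchReaderBuilder API; the schema, row-group size, and indices below are illustrative only:

// Hedged sketch: write a batch to an in-memory parquet buffer, then read back
// a single row group, the way the memtable reader iterates selected row groups
// instead of the whole part. Requires the `arrow`, `parquet`, and `bytes` crates.
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use bytes::Bytes;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("ts", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from_iter_values(0..1000)) as ArrayRef],
    )?;

    // Encode into an in-memory parquet buffer with small row groups, similar to
    // how BulkPartEncoder writes a BulkPart into a Vec<u8>.
    let mut buf = Vec::new();
    let props = WriterProperties::builder()
        .set_max_row_group_size(256)
        .build();
    let mut writer = ArrowWriter::try_new(&mut buf, schema, Some(props))?;
    writer.write(&batch)?;
    writer.close()?;

    // Read back only row group 1 (rows 256..512) in 128-row record batches.
    let reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buf))?
        .with_row_groups(vec![1])
        .with_batch_size(128)
        .build()?;
    let rows: usize = reader
        .map(|rb| rb.map(|b| b.num_rows()))
        .sum::<Result<usize, _>>()?;
    assert_eq!(rows, 256);
    Ok(())
}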