feat: functions and structs to scan flat format file and mem ranges (#6817)

* feat: implement function to scan flat memtable ranges

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: implement function to scan flat file ranges

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: compat batch in scan file range

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: scan other ranges

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: fix compiler errors

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
Yingwen
2025-08-27 14:31:30 +08:00
committed by GitHub
parent b921e41abf
commit 906e1ca0bf
5 changed files with 362 additions and 8 deletions

View File

@@ -30,7 +30,7 @@ use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use crate::config::MitoConfig;
use crate::error::Result;
use crate::error::{Result, UnsupportedOperationSnafu};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
@@ -401,6 +401,23 @@ pub(crate) struct MemScanMetricsData {
pub trait IterBuilder: Send + Sync {
/// Returns the iterator to read the range.
fn build(&self, metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator>;
/// Returns whether the iterator is a record batch iterator.
fn is_record_batch(&self) -> bool {
false
}
/// Returns the record batch iterator to read the range.
fn build_record_batch(
&self,
metrics: Option<MemScanMetrics>,
) -> Result<BoxedRecordBatchIterator> {
let _metrics = metrics;
UnsupportedOperationSnafu {
err_msg: "Record batch iterator is not supported by this memtable",
}
.fail()
}
}
pub type BoxedIterBuilder = Box<dyn IterBuilder>;
@@ -471,6 +488,22 @@ impl MemtableRange {
self.context.builder.build(None)
}
/// Builds a record batch iterator to read all rows in range.
///
/// This method doesn't take the optional time range because a bulk part is immutable
/// so we don't need to filter rows out of the time range.
pub fn build_record_batch_iter(
&self,
metrics: Option<MemScanMetrics>,
) -> Result<BoxedRecordBatchIterator> {
self.context.builder.build_record_batch(metrics)
}
/// Returns whether the iterator is a record batch iterator.
pub fn is_record_batch(&self) -> bool {
self.context.builder.is_record_batch()
}
pub fn num_rows(&self) -> usize {
self.num_rows
}

View File

@@ -19,6 +19,7 @@ use common_recordbatch::filter::SimpleFilterEvaluator;
use common_time::Timestamp;
use datatypes::arrow::array::BooleanArray;
use datatypes::arrow::buffer::BooleanBuffer;
use datatypes::arrow::record_batch::RecordBatch;
use snafu::ResultExt;
use crate::error::{RecordBatchSnafu, Result};
@@ -27,7 +28,7 @@ use crate::read::last_row::RowGroupLastRowCachedReader;
use crate::read::{Batch, BatchReader};
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::file_range::FileRangeContextRef;
use crate::sst::parquet::reader::{ReaderMetrics, RowGroupReader};
use crate::sst::parquet::reader::{FlatRowGroupReader, ReaderMetrics, RowGroupReader};
pub enum Source {
RowGroup(RowGroupReader),
@@ -238,6 +239,94 @@ impl Iterator for PruneTimeIterator {
}
}
pub enum FlatSource {
RowGroup(FlatRowGroupReader),
}
impl FlatSource {
fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
match self {
FlatSource::RowGroup(r) => r.next_batch(),
}
}
}
/// A flat format reader that returns RecordBatch instead of Batch.
pub struct FlatPruneReader {
/// Context for file ranges.
context: FileRangeContextRef,
source: FlatSource,
metrics: ReaderMetrics,
}
impl FlatPruneReader {
pub(crate) fn new_with_row_group_reader(
ctx: FileRangeContextRef,
reader: FlatRowGroupReader,
) -> Self {
Self {
context: ctx,
source: FlatSource::RowGroup(reader),
metrics: Default::default(),
}
}
/// Merge metrics with the inner reader and return the merged metrics.
pub(crate) fn metrics(&self) -> ReaderMetrics {
// FlatRowGroupReader doesn't collect metrics, so just return our own
self.metrics.clone()
}
pub(crate) fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
while let Some(record_batch) = {
let start = std::time::Instant::now();
let batch = self.source.next_batch()?;
self.metrics.scan_cost += start.elapsed();
batch
} {
// Update metrics for the received batch
self.metrics.num_rows += record_batch.num_rows();
self.metrics.num_batches += 1;
match self.prune_flat(record_batch)? {
Some(filtered_batch) => {
return Ok(Some(filtered_batch));
}
None => {
continue;
}
}
}
Ok(None)
}
/// Prunes batches by the pushed down predicate and returns RecordBatch.
fn prune_flat(&mut self, record_batch: RecordBatch) -> Result<Option<RecordBatch>> {
// fast path
if self.context.filters().is_empty() {
return Ok(Some(record_batch));
}
let num_rows_before_filter = record_batch.num_rows();
let Some(filtered_batch) = self.context.precise_filter_flat(record_batch)? else {
// the entire batch is filtered out
self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_filter;
return Ok(None);
};
// update metric
let filtered_rows = num_rows_before_filter - filtered_batch.num_rows();
self.metrics.filter_metrics.rows_precise_filtered += filtered_rows;
if filtered_batch.num_rows() > 0 {
Ok(Some(filtered_batch))
} else {
Ok(None)
}
}
}
#[cfg(test)]
mod tests {
use api::v1::OpType;

View File

@@ -20,12 +20,14 @@ use std::time::{Duration, Instant};
use async_stream::try_stream;
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};
use datatypes::arrow::record_batch::RecordBatch;
use futures::Stream;
use prometheus::IntGauge;
use smallvec::SmallVec;
use snafu::OptionExt;
use store_api::storage::RegionId;
use crate::error::Result;
use crate::error::{Result, UnexpectedSnafu};
use crate::memtable::MemScanMetrics;
use crate::metrics::{
IN_PROGRESS_SCAN, PRECISE_FILTER_ROWS_TOTAL, READ_BATCHES_RETURN, READ_ROWS_IN_ROW_GROUP_TOTAL,
@@ -33,7 +35,7 @@ use crate::metrics::{
};
use crate::read::range::{RangeBuilderList, RowGroupIndex};
use crate::read::scan_region::StreamContext;
use crate::read::{Batch, BoxedBatchStream, ScannerMetrics, Source};
use crate::read::{Batch, BoxedBatchStream, BoxedRecordBatchStream, ScannerMetrics, Source};
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::file_range::FileRange;
use crate::sst::parquet::reader::{ReaderFilterMetrics, ReaderMetrics};
@@ -646,6 +648,35 @@ pub(crate) fn scan_mem_ranges(
}
}
/// Scans memtable ranges at `index` using flat format that returns RecordBatch.
#[allow(dead_code)]
pub(crate) fn scan_flat_mem_ranges(
stream_ctx: Arc<StreamContext>,
part_metrics: PartitionMetrics,
index: RowGroupIndex,
) -> impl Stream<Item = Result<RecordBatch>> {
try_stream! {
let ranges = stream_ctx.input.build_mem_ranges(index);
part_metrics.inc_num_mem_ranges(ranges.len());
for range in ranges {
let build_reader_start = Instant::now();
let mem_scan_metrics = Some(MemScanMetrics::default());
let mut iter = range.build_record_batch_iter(mem_scan_metrics.clone())?;
part_metrics.inc_build_reader_cost(build_reader_start.elapsed());
while let Some(record_batch) = iter.next().transpose()? {
yield record_batch;
}
// Report the memtable scan metrics to partition metrics
if let Some(ref metrics) = mem_scan_metrics {
let data = metrics.data();
part_metrics.report_mem_scan_metrics(&data);
}
}
}
}
/// Scans file ranges at `index`.
pub(crate) async fn scan_file_ranges(
stream_ctx: Arc<StreamContext>,
@@ -669,6 +700,30 @@ pub(crate) async fn scan_file_ranges(
))
}
/// Scans file ranges at `index` using flat reader that returns RecordBatch.
#[allow(dead_code)]
pub(crate) async fn scan_flat_file_ranges(
stream_ctx: Arc<StreamContext>,
part_metrics: PartitionMetrics,
index: RowGroupIndex,
read_type: &'static str,
range_builder: Arc<RangeBuilderList>,
) -> Result<impl Stream<Item = Result<RecordBatch>>> {
let mut reader_metrics = ReaderMetrics::default();
let ranges = range_builder
.build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
.await?;
part_metrics.inc_num_file_ranges(ranges.len());
part_metrics.merge_reader_metrics(&reader_metrics);
Ok(build_flat_file_range_scan_stream(
stream_ctx,
part_metrics,
read_type,
ranges,
))
}
/// Build the stream of scanning the input [`FileRange`]s.
pub fn build_file_range_scan_stream(
stream_ctx: Arc<StreamContext>,
@@ -704,6 +759,49 @@ pub fn build_file_range_scan_stream(
}
}
/// Build the stream of scanning the input [`FileRange`]s using flat reader that returns RecordBatch.
pub fn build_flat_file_range_scan_stream(
_stream_ctx: Arc<StreamContext>,
part_metrics: PartitionMetrics,
read_type: &'static str,
ranges: SmallVec<[FileRange; 2]>,
) -> impl Stream<Item = Result<RecordBatch>> {
try_stream! {
let reader_metrics = &mut ReaderMetrics::default();
for range in ranges {
let build_reader_start = Instant::now();
let mut reader = range.flat_reader().await?;
let build_cost = build_reader_start.elapsed();
part_metrics.inc_build_reader_cost(build_cost);
let may_compat = range
.compat_batch()
.map(|compat| {
compat.as_flat().context(UnexpectedSnafu {
reason: "Invalid compat for flat format",
})
})
.transpose()?;
while let Some(record_batch) = reader.next_batch()? {
if let Some(flat_compat) = may_compat {
let batch = flat_compat.compat(record_batch)?;
yield batch;
} else {
yield record_batch;
}
}
let prune_metrics = reader.metrics();
reader_metrics.merge_from(&prune_metrics);
}
// Reports metrics.
reader_metrics.observe_rows(read_type);
reader_metrics.filter_metrics.observe();
part_metrics.merge_reader_metrics(reader_metrics);
}
}
/// Build the stream of scanning the extension range denoted by the [`RowGroupIndex`].
#[cfg(feature = "enterprise")]
pub(crate) async fn scan_extension_range(
@@ -744,3 +842,19 @@ pub(crate) async fn maybe_scan_other_ranges(
.fail()
}
}
#[allow(dead_code)]
pub(crate) async fn maybe_scan_flat_other_ranges(
context: &Arc<StreamContext>,
index: RowGroupIndex,
metrics: &PartitionMetrics,
) -> Result<BoxedRecordBatchStream> {
let _ = context;
let _ = index;
let _ = metrics;
crate::error::UnexpectedSnafu {
reason: "no other ranges scannable in flat format",
}
.fail()
}

View File

@@ -22,23 +22,24 @@ use api::v1::{OpType, SemanticType};
use common_telemetry::error;
use datatypes::arrow::array::BooleanArray;
use datatypes::arrow::buffer::BooleanBuffer;
use datatypes::arrow::record_batch::RecordBatch;
use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
use parquet::arrow::arrow_reader::RowSelection;
use snafu::{OptionExt, ResultExt};
use store_api::storage::TimeSeriesRowSelector;
use crate::error::{
DataTypeMismatchSnafu, DecodeSnafu, DecodeStatsSnafu, RecordBatchSnafu, Result,
StatsNotPresentSnafu,
ComputeArrowSnafu, ConvertVectorSnafu, DataTypeMismatchSnafu, DecodeSnafu, DecodeStatsSnafu,
RecordBatchSnafu, Result, StatsNotPresentSnafu,
};
use crate::read::compat::CompatBatch;
use crate::read::last_row::RowGroupLastRowCachedReader;
use crate::read::prune::PruneReader;
use crate::read::prune::{FlatPruneReader, PruneReader};
use crate::read::Batch;
use crate::sst::file::FileHandle;
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::reader::{
MaybeFilter, RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext,
FlatRowGroupReader, MaybeFilter, RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext,
};
/// A range of a parquet SST. Now it is a row group.
@@ -132,6 +133,21 @@ impl FileRange {
Ok(prune_reader)
}
/// Creates a flat reader that returns RecordBatch.
pub(crate) async fn flat_reader(&self) -> Result<FlatPruneReader> {
let parquet_reader = self
.context
.reader_builder
.build(self.row_group_idx, self.row_selection.clone())
.await?;
let flat_row_group_reader = FlatRowGroupReader::new(self.context.clone(), parquet_reader);
let flat_prune_reader =
FlatPruneReader::new_with_row_group_reader(self.context.clone(), flat_row_group_reader);
Ok(flat_prune_reader)
}
/// Returns the helper to compat batches.
pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> {
self.context.compat_batch()
@@ -208,6 +224,11 @@ impl FileRangeContext {
self.base.precise_filter(input)
}
/// Filters the input RecordBatch by the pushed down predicate and returns RecordBatch.
pub(crate) fn precise_filter_flat(&self, input: RecordBatch) -> Result<Option<RecordBatch>> {
self.base.precise_filter_flat(input)
}
//// Decodes parquet metadata and finds if row group contains delete op.
pub(crate) fn contains_delete(&self, row_group_index: usize) -> Result<bool> {
let metadata = self.reader_builder.parquet_metadata();
@@ -334,4 +355,51 @@ impl RangeBase {
Ok(Some(input))
}
/// Filters the input RecordBatch by the pushed down predicate and returns RecordBatch.
pub(crate) fn precise_filter_flat(&self, input: RecordBatch) -> Result<Option<RecordBatch>> {
let mut mask = BooleanBuffer::new_set(input.num_rows());
let flat_format = self
.read_format
.as_flat()
.context(crate::error::UnexpectedSnafu {
reason: "Expected flat format for precise_filter_flat",
})?;
// Run filter one by one and combine them result
for filter_ctx in &self.filters {
let filter = match filter_ctx.filter() {
MaybeFilter::Filter(f) => f,
// Column matches.
MaybeFilter::Matched => continue,
// Column doesn't match, filter the entire batch.
MaybeFilter::Pruned => return Ok(None),
};
// Get the column directly by its projected index
let column_idx = flat_format.projected_index_by_id(filter_ctx.column_id());
if let Some(idx) = column_idx {
let column = &input.columns()[idx];
// Convert Arrow Array to Vector
let vector = datatypes::vectors::Helper::try_into_vector(column.clone())
.context(ConvertVectorSnafu)?;
let result = filter.evaluate_vector(&vector).context(RecordBatchSnafu)?;
mask = mask.bitand(&result);
} else {
// Column not found in projection, continue
continue;
}
}
let filtered_batch =
datatypes::arrow::compute::filter_record_batch(&input, &BooleanArray::from(mask))
.context(ComputeArrowSnafu)?;
if filtered_batch.num_rows() > 0 {
Ok(Some(filtered_batch))
} else {
Ok(None)
}
}
}

View File

@@ -1332,3 +1332,53 @@ where
self.next_inner()
}
}
/// Reader to read a row group of a parquet file in flat format, returning RecordBatch.
pub(crate) struct FlatRowGroupReader {
/// Context for file ranges.
context: FileRangeContextRef,
/// Inner parquet reader.
reader: ParquetRecordBatchReader,
/// Cached sequence array to override sequences.
override_sequence: Option<ArrayRef>,
}
impl FlatRowGroupReader {
/// Creates a new flat reader from file range.
pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
// The batch length from the reader should be less than or equal to DEFAULT_READ_BATCH_SIZE.
let override_sequence = context
.read_format()
.new_override_sequence_array(DEFAULT_READ_BATCH_SIZE);
Self {
context,
reader,
override_sequence,
}
}
/// Returns the next RecordBatch.
pub(crate) fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
match self.reader.next() {
Some(batch_result) => {
let record_batch = batch_result.context(ArrowReaderSnafu {
path: self.context.file_path(),
})?;
// Apply override sequence if needed
if let (Some(flat_format), Some(override_array)) = (
self.context.read_format().as_flat(),
&self.override_sequence,
) {
let converted =
flat_format.convert_batch(record_batch, Some(override_array))?;
return Ok(Some(converted));
}
Ok(Some(record_batch))
}
None => Ok(None),
}
}
}