diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index 5269d0eabd..65b070662a 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -30,7 +30,7 @@ use store_api::metadata::RegionMetadataRef; use store_api::storage::{ColumnId, SequenceNumber}; use crate::config::MitoConfig; -use crate::error::Result; +use crate::error::{Result, UnsupportedOperationSnafu}; use crate::flush::WriteBufferManagerRef; use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder}; use crate::memtable::time_series::TimeSeriesMemtableBuilder; @@ -401,6 +401,23 @@ pub(crate) struct MemScanMetricsData { pub trait IterBuilder: Send + Sync { /// Returns the iterator to read the range. fn build(&self, metrics: Option) -> Result; + + /// Returns whether the iterator is a record batch iterator. + fn is_record_batch(&self) -> bool { + false + } + + /// Returns the record batch iterator to read the range. + fn build_record_batch( + &self, + metrics: Option, + ) -> Result { + let _metrics = metrics; + UnsupportedOperationSnafu { + err_msg: "Record batch iterator is not supported by this memtable", + } + .fail() + } } pub type BoxedIterBuilder = Box; @@ -471,6 +488,22 @@ impl MemtableRange { self.context.builder.build(None) } + /// Builds a record batch iterator to read all rows in range. + /// + /// This method doesn't take the optional time range because a bulk part is immutable + /// so we don't need to filter rows out of the time range. + pub fn build_record_batch_iter( + &self, + metrics: Option, + ) -> Result { + self.context.builder.build_record_batch(metrics) + } + + /// Returns whether the iterator is a record batch iterator. + pub fn is_record_batch(&self) -> bool { + self.context.builder.is_record_batch() + } + pub fn num_rows(&self) -> usize { self.num_rows } diff --git a/src/mito2/src/read/prune.rs b/src/mito2/src/read/prune.rs index 80373afc11..3a5fc81e30 100644 --- a/src/mito2/src/read/prune.rs +++ b/src/mito2/src/read/prune.rs @@ -19,6 +19,7 @@ use common_recordbatch::filter::SimpleFilterEvaluator; use common_time::Timestamp; use datatypes::arrow::array::BooleanArray; use datatypes::arrow::buffer::BooleanBuffer; +use datatypes::arrow::record_batch::RecordBatch; use snafu::ResultExt; use crate::error::{RecordBatchSnafu, Result}; @@ -27,7 +28,7 @@ use crate::read::last_row::RowGroupLastRowCachedReader; use crate::read::{Batch, BatchReader}; use crate::sst::file::FileTimeRange; use crate::sst::parquet::file_range::FileRangeContextRef; -use crate::sst::parquet::reader::{ReaderMetrics, RowGroupReader}; +use crate::sst::parquet::reader::{FlatRowGroupReader, ReaderMetrics, RowGroupReader}; pub enum Source { RowGroup(RowGroupReader), @@ -238,6 +239,94 @@ impl Iterator for PruneTimeIterator { } } +pub enum FlatSource { + RowGroup(FlatRowGroupReader), +} + +impl FlatSource { + fn next_batch(&mut self) -> Result> { + match self { + FlatSource::RowGroup(r) => r.next_batch(), + } + } +} + +/// A flat format reader that returns RecordBatch instead of Batch. +pub struct FlatPruneReader { + /// Context for file ranges. + context: FileRangeContextRef, + source: FlatSource, + metrics: ReaderMetrics, +} + +impl FlatPruneReader { + pub(crate) fn new_with_row_group_reader( + ctx: FileRangeContextRef, + reader: FlatRowGroupReader, + ) -> Self { + Self { + context: ctx, + source: FlatSource::RowGroup(reader), + metrics: Default::default(), + } + } + + /// Merge metrics with the inner reader and return the merged metrics. + pub(crate) fn metrics(&self) -> ReaderMetrics { + // FlatRowGroupReader doesn't collect metrics, so just return our own + self.metrics.clone() + } + + pub(crate) fn next_batch(&mut self) -> Result> { + while let Some(record_batch) = { + let start = std::time::Instant::now(); + let batch = self.source.next_batch()?; + self.metrics.scan_cost += start.elapsed(); + batch + } { + // Update metrics for the received batch + self.metrics.num_rows += record_batch.num_rows(); + self.metrics.num_batches += 1; + + match self.prune_flat(record_batch)? { + Some(filtered_batch) => { + return Ok(Some(filtered_batch)); + } + None => { + continue; + } + } + } + + Ok(None) + } + + /// Prunes batches by the pushed down predicate and returns RecordBatch. + fn prune_flat(&mut self, record_batch: RecordBatch) -> Result> { + // fast path + if self.context.filters().is_empty() { + return Ok(Some(record_batch)); + } + + let num_rows_before_filter = record_batch.num_rows(); + let Some(filtered_batch) = self.context.precise_filter_flat(record_batch)? else { + // the entire batch is filtered out + self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_filter; + return Ok(None); + }; + + // update metric + let filtered_rows = num_rows_before_filter - filtered_batch.num_rows(); + self.metrics.filter_metrics.rows_precise_filtered += filtered_rows; + + if filtered_batch.num_rows() > 0 { + Ok(Some(filtered_batch)) + } else { + Ok(None) + } + } +} + #[cfg(test)] mod tests { use api::v1::OpType; diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs index 820fe5a1cd..1e37aacf65 100644 --- a/src/mito2/src/read/scan_util.rs +++ b/src/mito2/src/read/scan_util.rs @@ -20,12 +20,14 @@ use std::time::{Duration, Instant}; use async_stream::try_stream; use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time}; +use datatypes::arrow::record_batch::RecordBatch; use futures::Stream; use prometheus::IntGauge; use smallvec::SmallVec; +use snafu::OptionExt; use store_api::storage::RegionId; -use crate::error::Result; +use crate::error::{Result, UnexpectedSnafu}; use crate::memtable::MemScanMetrics; use crate::metrics::{ IN_PROGRESS_SCAN, PRECISE_FILTER_ROWS_TOTAL, READ_BATCHES_RETURN, READ_ROWS_IN_ROW_GROUP_TOTAL, @@ -33,7 +35,7 @@ use crate::metrics::{ }; use crate::read::range::{RangeBuilderList, RowGroupIndex}; use crate::read::scan_region::StreamContext; -use crate::read::{Batch, BoxedBatchStream, ScannerMetrics, Source}; +use crate::read::{Batch, BoxedBatchStream, BoxedRecordBatchStream, ScannerMetrics, Source}; use crate::sst::file::FileTimeRange; use crate::sst::parquet::file_range::FileRange; use crate::sst::parquet::reader::{ReaderFilterMetrics, ReaderMetrics}; @@ -646,6 +648,35 @@ pub(crate) fn scan_mem_ranges( } } +/// Scans memtable ranges at `index` using flat format that returns RecordBatch. +#[allow(dead_code)] +pub(crate) fn scan_flat_mem_ranges( + stream_ctx: Arc, + part_metrics: PartitionMetrics, + index: RowGroupIndex, +) -> impl Stream> { + try_stream! { + let ranges = stream_ctx.input.build_mem_ranges(index); + part_metrics.inc_num_mem_ranges(ranges.len()); + for range in ranges { + let build_reader_start = Instant::now(); + let mem_scan_metrics = Some(MemScanMetrics::default()); + let mut iter = range.build_record_batch_iter(mem_scan_metrics.clone())?; + part_metrics.inc_build_reader_cost(build_reader_start.elapsed()); + + while let Some(record_batch) = iter.next().transpose()? { + yield record_batch; + } + + // Report the memtable scan metrics to partition metrics + if let Some(ref metrics) = mem_scan_metrics { + let data = metrics.data(); + part_metrics.report_mem_scan_metrics(&data); + } + } + } +} + /// Scans file ranges at `index`. pub(crate) async fn scan_file_ranges( stream_ctx: Arc, @@ -669,6 +700,30 @@ pub(crate) async fn scan_file_ranges( )) } +/// Scans file ranges at `index` using flat reader that returns RecordBatch. +#[allow(dead_code)] +pub(crate) async fn scan_flat_file_ranges( + stream_ctx: Arc, + part_metrics: PartitionMetrics, + index: RowGroupIndex, + read_type: &'static str, + range_builder: Arc, +) -> Result>> { + let mut reader_metrics = ReaderMetrics::default(); + let ranges = range_builder + .build_file_ranges(&stream_ctx.input, index, &mut reader_metrics) + .await?; + part_metrics.inc_num_file_ranges(ranges.len()); + part_metrics.merge_reader_metrics(&reader_metrics); + + Ok(build_flat_file_range_scan_stream( + stream_ctx, + part_metrics, + read_type, + ranges, + )) +} + /// Build the stream of scanning the input [`FileRange`]s. pub fn build_file_range_scan_stream( stream_ctx: Arc, @@ -704,6 +759,49 @@ pub fn build_file_range_scan_stream( } } +/// Build the stream of scanning the input [`FileRange`]s using flat reader that returns RecordBatch. +pub fn build_flat_file_range_scan_stream( + _stream_ctx: Arc, + part_metrics: PartitionMetrics, + read_type: &'static str, + ranges: SmallVec<[FileRange; 2]>, +) -> impl Stream> { + try_stream! { + let reader_metrics = &mut ReaderMetrics::default(); + for range in ranges { + let build_reader_start = Instant::now(); + let mut reader = range.flat_reader().await?; + let build_cost = build_reader_start.elapsed(); + part_metrics.inc_build_reader_cost(build_cost); + + let may_compat = range + .compat_batch() + .map(|compat| { + compat.as_flat().context(UnexpectedSnafu { + reason: "Invalid compat for flat format", + }) + }) + .transpose()?; + while let Some(record_batch) = reader.next_batch()? { + if let Some(flat_compat) = may_compat { + let batch = flat_compat.compat(record_batch)?; + yield batch; + } else { + yield record_batch; + } + } + + let prune_metrics = reader.metrics(); + reader_metrics.merge_from(&prune_metrics); + } + + // Reports metrics. + reader_metrics.observe_rows(read_type); + reader_metrics.filter_metrics.observe(); + part_metrics.merge_reader_metrics(reader_metrics); + } +} + /// Build the stream of scanning the extension range denoted by the [`RowGroupIndex`]. #[cfg(feature = "enterprise")] pub(crate) async fn scan_extension_range( @@ -744,3 +842,19 @@ pub(crate) async fn maybe_scan_other_ranges( .fail() } } + +#[allow(dead_code)] +pub(crate) async fn maybe_scan_flat_other_ranges( + context: &Arc, + index: RowGroupIndex, + metrics: &PartitionMetrics, +) -> Result { + let _ = context; + let _ = index; + let _ = metrics; + + crate::error::UnexpectedSnafu { + reason: "no other ranges scannable in flat format", + } + .fail() +} diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs index 24697c68ec..6e3dcc585e 100644 --- a/src/mito2/src/sst/parquet/file_range.rs +++ b/src/mito2/src/sst/parquet/file_range.rs @@ -22,23 +22,24 @@ use api::v1::{OpType, SemanticType}; use common_telemetry::error; use datatypes::arrow::array::BooleanArray; use datatypes::arrow::buffer::BooleanBuffer; +use datatypes::arrow::record_batch::RecordBatch; use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec}; use parquet::arrow::arrow_reader::RowSelection; use snafu::{OptionExt, ResultExt}; use store_api::storage::TimeSeriesRowSelector; use crate::error::{ - DataTypeMismatchSnafu, DecodeSnafu, DecodeStatsSnafu, RecordBatchSnafu, Result, - StatsNotPresentSnafu, + ComputeArrowSnafu, ConvertVectorSnafu, DataTypeMismatchSnafu, DecodeSnafu, DecodeStatsSnafu, + RecordBatchSnafu, Result, StatsNotPresentSnafu, }; use crate::read::compat::CompatBatch; use crate::read::last_row::RowGroupLastRowCachedReader; -use crate::read::prune::PruneReader; +use crate::read::prune::{FlatPruneReader, PruneReader}; use crate::read::Batch; use crate::sst::file::FileHandle; use crate::sst::parquet::format::ReadFormat; use crate::sst::parquet::reader::{ - MaybeFilter, RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext, + FlatRowGroupReader, MaybeFilter, RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext, }; /// A range of a parquet SST. Now it is a row group. @@ -132,6 +133,21 @@ impl FileRange { Ok(prune_reader) } + /// Creates a flat reader that returns RecordBatch. + pub(crate) async fn flat_reader(&self) -> Result { + let parquet_reader = self + .context + .reader_builder + .build(self.row_group_idx, self.row_selection.clone()) + .await?; + + let flat_row_group_reader = FlatRowGroupReader::new(self.context.clone(), parquet_reader); + let flat_prune_reader = + FlatPruneReader::new_with_row_group_reader(self.context.clone(), flat_row_group_reader); + + Ok(flat_prune_reader) + } + /// Returns the helper to compat batches. pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> { self.context.compat_batch() @@ -208,6 +224,11 @@ impl FileRangeContext { self.base.precise_filter(input) } + /// Filters the input RecordBatch by the pushed down predicate and returns RecordBatch. + pub(crate) fn precise_filter_flat(&self, input: RecordBatch) -> Result> { + self.base.precise_filter_flat(input) + } + //// Decodes parquet metadata and finds if row group contains delete op. pub(crate) fn contains_delete(&self, row_group_index: usize) -> Result { let metadata = self.reader_builder.parquet_metadata(); @@ -334,4 +355,51 @@ impl RangeBase { Ok(Some(input)) } + + /// Filters the input RecordBatch by the pushed down predicate and returns RecordBatch. + pub(crate) fn precise_filter_flat(&self, input: RecordBatch) -> Result> { + let mut mask = BooleanBuffer::new_set(input.num_rows()); + + let flat_format = self + .read_format + .as_flat() + .context(crate::error::UnexpectedSnafu { + reason: "Expected flat format for precise_filter_flat", + })?; + + // Run filter one by one and combine them result + for filter_ctx in &self.filters { + let filter = match filter_ctx.filter() { + MaybeFilter::Filter(f) => f, + // Column matches. + MaybeFilter::Matched => continue, + // Column doesn't match, filter the entire batch. + MaybeFilter::Pruned => return Ok(None), + }; + + // Get the column directly by its projected index + let column_idx = flat_format.projected_index_by_id(filter_ctx.column_id()); + if let Some(idx) = column_idx { + let column = &input.columns()[idx]; + // Convert Arrow Array to Vector + let vector = datatypes::vectors::Helper::try_into_vector(column.clone()) + .context(ConvertVectorSnafu)?; + let result = filter.evaluate_vector(&vector).context(RecordBatchSnafu)?; + mask = mask.bitand(&result); + } else { + // Column not found in projection, continue + continue; + } + } + + let filtered_batch = + datatypes::arrow::compute::filter_record_batch(&input, &BooleanArray::from(mask)) + .context(ComputeArrowSnafu)?; + + if filtered_batch.num_rows() > 0 { + Ok(Some(filtered_batch)) + } else { + Ok(None) + } + } } diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index f70beeac1b..edc3be2465 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -1332,3 +1332,53 @@ where self.next_inner() } } + +/// Reader to read a row group of a parquet file in flat format, returning RecordBatch. +pub(crate) struct FlatRowGroupReader { + /// Context for file ranges. + context: FileRangeContextRef, + /// Inner parquet reader. + reader: ParquetRecordBatchReader, + /// Cached sequence array to override sequences. + override_sequence: Option, +} + +impl FlatRowGroupReader { + /// Creates a new flat reader from file range. + pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self { + // The batch length from the reader should be less than or equal to DEFAULT_READ_BATCH_SIZE. + let override_sequence = context + .read_format() + .new_override_sequence_array(DEFAULT_READ_BATCH_SIZE); + + Self { + context, + reader, + override_sequence, + } + } + + /// Returns the next RecordBatch. + pub(crate) fn next_batch(&mut self) -> Result> { + match self.reader.next() { + Some(batch_result) => { + let record_batch = batch_result.context(ArrowReaderSnafu { + path: self.context.file_path(), + })?; + + // Apply override sequence if needed + if let (Some(flat_format), Some(override_array)) = ( + self.context.read_format().as_flat(), + &self.override_sequence, + ) { + let converted = + flat_format.convert_batch(record_batch, Some(override_array))?; + return Ok(Some(converted)); + } + + Ok(Some(record_batch)) + } + None => Ok(None), + } + } +}