From 767c3b44c8a62e66249be9d279ee50f9b60bfa67 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Sun, 22 Mar 2026 01:37:36 +0800 Subject: [PATCH] feat: prefilter flat parquet scans by primary key --- src/mito2/src/error.rs | 10 + src/mito2/src/memtable/bulk/context.rs | 1 + src/mito2/src/read/last_row.rs | 8 +- src/mito2/src/read/prune.rs | 168 +++++++- src/mito2/src/read/scan_util.rs | 2 +- src/mito2/src/sst/parquet.rs | 12 +- src/mito2/src/sst/parquet/file_range.rs | 392 +++++++++++++++++-- src/mito2/src/sst/parquet/flat_format.rs | 9 + src/mito2/src/sst/parquet/reader.rs | 285 +++++++++----- src/mito2/src/sst/parquet/row_group.rs | 468 ++++++++++++++++++++++- 10 files changed, 1196 insertions(+), 159 deletions(-) diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index c6b69fe607..923d8a2713 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -616,6 +616,15 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to read arrow record batch from parquet file {}", path))] + ArrowReader { + path: String, + #[snafu(source)] + error: ArrowError, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Column not found, column: {column}"))] ColumnNotFound { column: String, @@ -1340,6 +1349,7 @@ impl ErrorExt for Error { RegionState { .. } | UpdateManifest { .. } => StatusCode::RegionNotReady, JsonOptions { .. } => StatusCode::InvalidArguments, EmptyRegionDir { .. } | EmptyManifestDir { .. } => StatusCode::RegionNotFound, + ArrowReader { .. } => StatusCode::StorageUnavailable, ConvertValue { source, .. } => source.status_code(), ApplyBloomFilterIndex { source, .. } => source.status_code(), InvalidPartitionExpr { source, .. 
} => source.status_code(), diff --git a/src/mito2/src/memtable/bulk/context.rs b/src/mito2/src/memtable/bulk/context.rs index c3274d30e9..228fa2d6c7 100644 --- a/src/mito2/src/memtable/bulk/context.rs +++ b/src/mito2/src/memtable/bulk/context.rs @@ -90,6 +90,7 @@ impl BulkIterContext { Ok(Self { base: RangeBase { filters: simple_filters, + primary_key_filters: None, dyn_filters, read_format, prune_schema: region_metadata.schema.clone(), diff --git a/src/mito2/src/read/last_row.rs b/src/mito2/src/read/last_row.rs index 1dc4102311..0c13c120a0 100644 --- a/src/mito2/src/read/last_row.rs +++ b/src/mito2/src/read/last_row.rs @@ -333,10 +333,10 @@ impl FlatRowGroupLastRowCachedReader { } /// Returns the next RecordBatch. - pub(crate) async fn next_batch(&mut self) -> Result> { + pub(crate) fn next_batch(&mut self) -> Result> { match self { FlatRowGroupLastRowCachedReader::Hit(r) => r.next_batch(), - FlatRowGroupLastRowCachedReader::Miss(r) => r.next_batch().await, + FlatRowGroupLastRowCachedReader::Miss(r) => r.next_batch(), } } @@ -466,12 +466,12 @@ impl FlatRowGroupLastRowReader { Ok(Some(merged)) } - async fn next_batch(&mut self) -> Result> { + fn next_batch(&mut self) -> Result> { if self.pending.is_full() { return self.flush_pending(); } - while let Some(batch) = self.reader.next_batch().await? { + while let Some(batch) = self.reader.next_batch()? 
{ self.selector.on_next(batch, &mut self.pending)?; if self.pending.is_full() { return self.flush_pending(); diff --git a/src/mito2/src/read/prune.rs b/src/mito2/src/read/prune.rs index 6766bf3f38..75c649c3d7 100644 --- a/src/mito2/src/read/prune.rs +++ b/src/mito2/src/read/prune.rs @@ -19,15 +19,19 @@ use common_recordbatch::filter::SimpleFilterEvaluator; use common_time::Timestamp; use datatypes::arrow::array::BooleanArray; use datatypes::arrow::buffer::BooleanBuffer; +use datatypes::arrow::compute::concat_batches; use datatypes::arrow::record_batch::RecordBatch; -use snafu::ResultExt; +use mito_codec::row_converter::PrimaryKeyFilter; +use snafu::{OptionExt, ResultExt}; -use crate::error::{RecordBatchSnafu, Result}; +use crate::error::{ComputeArrowSnafu, RecordBatchSnafu, Result, UnexpectedSnafu}; use crate::memtable::BoxedBatchIterator; use crate::read::last_row::{FlatRowGroupLastRowCachedReader, RowGroupLastRowCachedReader}; use crate::read::{Batch, BatchReader}; use crate::sst::file::FileTimeRange; use crate::sst::parquet::file_range::FileRangeContextRef; +use crate::sst::parquet::flat_format::primary_key_column_index; +use crate::sst::parquet::format::PrimaryKeyArray; use crate::sst::parquet::reader::{FlatRowGroupReader, ReaderMetrics, RowGroupReader}; pub enum Source { @@ -247,12 +251,80 @@ pub enum FlatSource { } impl FlatSource { - async fn next_batch(&mut self) -> Result> { + fn next_raw_batch(&mut self) -> Result> { match self { - FlatSource::RowGroup(r) => r.next_batch().await, - FlatSource::LastRow(r) => r.next_batch().await, + FlatSource::RowGroup(r) => r.next_raw_batch(), + FlatSource::LastRow(r) => r.next_batch(), } } + + fn convert_batch(&self, batch: RecordBatch) -> Result { + match self { + FlatSource::RowGroup(r) => r.convert_batch(batch), + FlatSource::LastRow(_) => Ok(batch), + } + } +} + +struct CachedPrimaryKeyFilter { + inner: Box, + last_primary_key: Vec, + last_match: Option, +} + +impl CachedPrimaryKeyFilter { + fn new(inner: Box) -> 
Self { + Self { + inner, + last_primary_key: Vec::new(), + last_match: None, + } + } +} + +impl PrimaryKeyFilter for CachedPrimaryKeyFilter { + fn matches(&mut self, pk: &[u8]) -> bool { + if let Some(last_match) = self.last_match + && self.last_primary_key == pk + { + return last_match; + } + + let matched = self.inner.matches(pk); + self.last_primary_key.clear(); + self.last_primary_key.extend_from_slice(pk); + self.last_match = Some(matched); + matched + } +} + +fn batch_single_primary_key(batch: &RecordBatch) -> Result> { + let primary_key_index = primary_key_column_index(batch.num_columns()); + let pk_dict_array = batch + .column(primary_key_index) + .as_any() + .downcast_ref::() + .context(UnexpectedSnafu { + reason: "Primary key column is not a dictionary array".to_string(), + })?; + let pk_values = pk_dict_array + .values() + .as_any() + .downcast_ref::() + .context(UnexpectedSnafu { + reason: "Primary key values are not binary array".to_string(), + })?; + let keys = pk_dict_array.keys(); + if keys.is_empty() { + return Ok(None); + } + + let first_key = keys.value(0); + if first_key != keys.value(keys.len() - 1) { + return Ok(None); + } + + Ok(Some(pk_values.value(first_key as usize))) } /// A flat format reader that returns RecordBatch instead of Batch. @@ -260,6 +332,8 @@ pub struct FlatPruneReader { /// Context for file ranges. context: FileRangeContextRef, source: FlatSource, + primary_key_filter: Option, + buffered_prefiltered_batch: Option, metrics: ReaderMetrics, /// Whether to skip field filters for this row group. 
skip_fields: bool, @@ -272,6 +346,10 @@ impl FlatPruneReader { skip_fields: bool, ) -> Self { Self { + primary_key_filter: ctx + .new_primary_key_filter() + .map(CachedPrimaryKeyFilter::new), + buffered_prefiltered_batch: None, context: ctx, source: FlatSource::RowGroup(reader), metrics: Default::default(), @@ -285,6 +363,8 @@ impl FlatPruneReader { skip_fields: bool, ) -> Self { Self { + primary_key_filter: None, + buffered_prefiltered_batch: None, context: ctx, source: FlatSource::LastRow(reader), metrics: Default::default(), @@ -297,20 +377,18 @@ impl FlatPruneReader { self.metrics.clone() } - pub(crate) async fn next_batch(&mut self) -> Result> { + pub(crate) fn next_batch(&mut self) -> Result> { loop { - let start = std::time::Instant::now(); - let batch = self.source.next_batch().await?; - self.metrics.scan_cost += start.elapsed(); - - let Some(record_batch) = batch else { + let Some(mut raw_batch) = self.next_prefiltered_batch()? else { return Ok(None); }; - // Update metrics for the received batch - self.metrics.num_rows += record_batch.num_rows(); - self.metrics.num_batches += 1; + let scan_start = std::time::Instant::now(); + self.coalesce_prefiltered_batches(&mut raw_batch)?; + let record_batch = self.source.convert_batch(raw_batch)?; + self.metrics.scan_cost += scan_start.elapsed(); + self.metrics.num_batches += 1; match self.prune_flat(record_batch)? { Some(filtered_batch) => { return Ok(Some(filtered_batch)); @@ -322,6 +400,68 @@ impl FlatPruneReader { } } + fn next_prefiltered_batch(&mut self) -> Result> { + if let Some(batch) = self.buffered_prefiltered_batch.take() { + return Ok(Some(batch)); + } + + loop { + let start = std::time::Instant::now(); + let Some(raw_batch) = self.source.next_raw_batch()? else { + return Ok(None); + }; + + self.metrics.num_rows += raw_batch.num_rows(); + + let num_rows_before_prefilter = raw_batch.num_rows(); + let Some(prefiltered_batch) = self.prefilter_primary_keys(raw_batch)? 
else { + self.metrics.scan_cost += start.elapsed(); + self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_prefilter; + continue; + }; + let prefiltered_rows = num_rows_before_prefilter - prefiltered_batch.num_rows(); + self.metrics.filter_metrics.rows_precise_filtered += prefiltered_rows; + self.metrics.scan_cost += start.elapsed(); + return Ok(Some(prefiltered_batch)); + } + } + + fn coalesce_prefiltered_batches(&mut self, batch: &mut RecordBatch) -> Result<()> { + let Some(primary_key) = batch_single_primary_key(batch)? else { + return Ok(()); + }; + let primary_key = primary_key.to_vec(); + let schema = batch.schema(); + let mut batches: Vec = Vec::new(); + + while let Some(next_batch) = self.next_prefiltered_batch()? { + if batch_single_primary_key(&next_batch)? == Some(primary_key.as_slice()) { + if batches.is_empty() { + batches.push(batch.clone()); + } + batches.push(next_batch); + } else { + self.buffered_prefiltered_batch = Some(next_batch); + break; + } + } + + if !batches.is_empty() { + *batch = concat_batches(&schema, &batches).context(ComputeArrowSnafu)?; + } + + Ok(()) + } + + fn prefilter_primary_keys(&mut self, record_batch: RecordBatch) -> Result> { + let Some(primary_key_filter) = self.primary_key_filter.as_mut() else { + return Ok(Some(record_batch)); + }; + + self.context + .prefilter_flat_batch_by_primary_key(record_batch, primary_key_filter) + } + /// Prunes batches by the pushed down predicate and returns RecordBatch. fn prune_flat(&mut self, record_batch: RecordBatch) -> Result> { // fast path diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs index 9bf1c17276..6f68616709 100644 --- a/src/mito2/src/read/scan_util.rs +++ b/src/mito2/src/read/scan_util.rs @@ -1533,7 +1533,7 @@ pub fn build_flat_file_range_scan_stream( .transpose()?; let mapper = range.compaction_projection_mapper(); - while let Some(record_batch) = reader.next_batch().await? { + while let Some(record_batch) = reader.next_batch()? 
{ let record_batch = if let Some(mapper) = mapper { let batch = mapper.project(record_batch)?; batch diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 79a08a209d..796fedf9a0 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -41,9 +41,17 @@ pub mod writer; pub const PARQUET_METADATA_KEY: &str = "greptime:metadata"; /// Default batch size to read parquet files. -pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024; +/// +/// This is a runtime-only scan granularity, so we align it with DataFusion's +/// default execution batch size to reduce rebatching and concatenation in the +/// query pipeline. +pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 8 * 1024; /// Default row group size for parquet files. -pub const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE; +/// +/// Keep the existing persisted/on-disk default stable. It intentionally stays +/// decoupled from [`DEFAULT_READ_BATCH_SIZE`] so we can tune runtime scan +/// batching without changing the row group layout of newly written SSTs. +pub const DEFAULT_ROW_GROUP_SIZE: usize = 100 * 1024; /// Parquet write options. #[derive(Debug, Clone)] diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs index 3a5251cb1a..d0acdd1a64 100644 --- a/src/mito2/src/sst/parquet/file_range.rs +++ b/src/mito2/src/sst/parquet/file_range.rs @@ -16,10 +16,11 @@ //! is usually a row group in a parquet file. 
use std::collections::HashMap; -use std::ops::BitAnd; +use std::ops::{BitAnd, Range}; use std::sync::Arc; use api::v1::{OpType, SemanticType}; +use common_recordbatch::filter::SimpleFilterEvaluator; use common_telemetry::error; use datafusion::physical_plan::PhysicalExpr; use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr; @@ -28,17 +29,18 @@ use datatypes::arrow::buffer::BooleanBuffer; use datatypes::arrow::record_batch::RecordBatch; use datatypes::prelude::ConcreteDataType; use datatypes::schema::Schema; -use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec}; +use mito_codec::primary_key_filter::is_partition_column; +use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec, PrimaryKeyFilter}; use parquet::arrow::arrow_reader::RowSelection; use parquet::file::metadata::ParquetMetaData; use snafu::{OptionExt, ResultExt}; use store_api::codec::PrimaryKeyEncoding; -use store_api::metadata::RegionMetadataRef; +use store_api::metadata::{RegionMetadata, RegionMetadataRef}; use store_api::storage::{ColumnId, TimeSeriesRowSelector}; use table::predicate::Predicate; use crate::error::{ - ComputeArrowSnafu, DataTypeMismatchSnafu, DecodeSnafu, DecodeStatsSnafu, + ArrowReaderSnafu, ComputeArrowSnafu, DataTypeMismatchSnafu, DecodeSnafu, DecodeStatsSnafu, EvalPartitionFilterSnafu, NewRecordBatchSnafu, RecordBatchSnafu, Result, StatsNotPresentSnafu, UnexpectedSnafu, }; @@ -49,13 +51,14 @@ use crate::read::last_row::{FlatRowGroupLastRowCachedReader, RowGroupLastRowCach use crate::read::prune::{FlatPruneReader, PruneReader}; use crate::sst::file::FileHandle; use crate::sst::parquet::flat_format::{ - DecodedPrimaryKeys, decode_primary_keys, time_index_column_index, + DecodedPrimaryKeys, decode_primary_keys, primary_key_column_index, time_index_column_index, }; -use crate::sst::parquet::format::ReadFormat; +use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat}; use crate::sst::parquet::reader::{ FlatRowGroupReader, MaybeFilter, 
RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext, }; -use crate::sst::parquet::row_group::ParquetFetchMetrics; +use crate::sst::parquet::row_group::{InMemoryRowGroup, ParquetFetchMetrics}; +use crate::sst::parquet::row_selection::row_selection_from_row_ranges; use crate::sst::parquet::stats::RowGroupPruningStats; /// Checks if a row group contains delete operations by examining the min value of op_type column. @@ -144,7 +147,7 @@ impl FileRange { std::slice::from_ref(curr_row_group), read_format, self.context.base.expected_metadata.clone(), - self.compute_skip_fields(), + self.context.should_skip_fields(self.row_group_idx), ); // not costly to create a predicate here since dynamic filters are wrapped in Arc @@ -156,22 +159,6 @@ impl FileRange { .unwrap_or(true) // unexpected, not skip just in case } - fn compute_skip_fields(&self) -> bool { - match self.context.base.pre_filter_mode { - PreFilterMode::All => false, - PreFilterMode::SkipFields => true, - PreFilterMode::SkipFieldsOnDelete => { - // Check if this specific row group contains delete op - row_group_contains_delete( - self.context.reader_builder.parquet_metadata(), - self.row_group_idx, - self.context.reader_builder.file_path(), - ) - .unwrap_or(true) - } - } - } - /// Returns a reader to read the [FileRange]. 
pub(crate) async fn reader( &self, @@ -243,15 +230,6 @@ impl FileRange { if !self.in_dynamic_filter_range() { return Ok(None); } - let parquet_reader = self - .context - .reader_builder - .build( - self.row_group_idx, - self.row_selection.clone(), - fetch_metrics, - ) - .await?; let use_last_row_reader = if selector .map(|s| s == TimeSeriesRowSelector::LastRow) @@ -275,8 +253,21 @@ impl FileRange { let skip_fields = self.context.should_skip_fields(self.row_group_idx); let flat_prune_reader = if use_last_row_reader { - let flat_row_group_reader = - FlatRowGroupReader::new(self.context.clone(), parquet_reader); + let row_selection = self.row_selection.clone(); + if row_selection + .as_ref() + .is_some_and(|selection| selection.row_count() == 0) + { + return Ok(None); + } + + let flat_row_group_reader = FlatRowGroupReader::new( + self.context.clone(), + self.context + .reader_builder + .build(self.row_group_idx, row_selection, fetch_metrics) + .await?, + ); let reader = FlatRowGroupLastRowCachedReader::new( self.file_handle().file_id().file_id(), self.row_group_idx, @@ -286,18 +277,138 @@ impl FileRange { ); FlatPruneReader::new_with_last_row_reader(self.context.clone(), reader, skip_fields) } else { - let flat_row_group_reader = - FlatRowGroupReader::new(self.context.clone(), parquet_reader); - FlatPruneReader::new_with_row_group_reader( - self.context.clone(), - flat_row_group_reader, - skip_fields, - ) + match self.prefiltered_flat_reader_input(fetch_metrics).await? 
{ + PrefilteredFlatReaderInput::Selection(row_selection) => { + if row_selection + .as_ref() + .is_some_and(|selection| selection.row_count() == 0) + { + return Ok(None); + } + + let flat_row_group_reader = FlatRowGroupReader::new( + self.context.clone(), + self.context + .reader_builder + .build(self.row_group_idx, row_selection, fetch_metrics) + .await?, + ); + FlatPruneReader::new_with_row_group_reader( + self.context.clone(), + flat_row_group_reader, + skip_fields, + ) + } + PrefilteredFlatReaderInput::Prefetched(mut row_group) => { + let flat_row_group_reader = FlatRowGroupReader::new( + self.context.clone(), + self.context + .reader_builder + .build_on_row_group( + &mut row_group, + self.row_selection.clone(), + fetch_metrics, + ) + .await?, + ); + FlatPruneReader::new_with_row_group_reader( + self.context.clone(), + flat_row_group_reader, + skip_fields, + ) + } + } }; Ok(Some(flat_prune_reader)) } + async fn prefiltered_flat_reader_input<'a>( + &'a self, + fetch_metrics: Option<&ParquetFetchMetrics>, + ) -> Result> { + if !self.select_all() { + return Ok(PrefilteredFlatReaderInput::Selection( + self.row_selection.clone(), + )); + } + + let Some(mut primary_key_filter) = self.context.new_primary_key_filter() else { + return Ok(PrefilteredFlatReaderInput::Selection( + self.row_selection.clone(), + )); + }; + + let read_format = ReadFormat::new_flat( + self.context.read_format().metadata().clone(), + std::iter::empty::(), + Some( + self.context + .reader_builder + .parquet_metadata() + .file_metadata() + .schema_descr() + .num_columns(), + ), + self.context.file_path(), + false, + )?; + let mut row_group = self + .context + .reader_builder + .new_in_memory_row_group(self.row_group_idx); + let reader = self + .context + .reader_builder + .build_on_row_group_with_read_format( + &mut row_group, + self.row_selection.clone(), + fetch_metrics, + &read_format, + ) + .await?; + + let rows_in_group = self + .context + .reader_builder + .parquet_metadata() + 
.row_group(self.row_group_idx) + .num_rows() as usize; + let mut matched_row_ranges: Vec> = Vec::new(); + let mut row_offset = 0; + for batch_result in reader { + let batch = batch_result.context(ArrowReaderSnafu { + path: self.context.file_path(), + })?; + let batch_num_rows = batch.num_rows(); + matched_row_ranges.extend( + self.context + .base + .matching_row_ranges_by_primary_key(&batch, primary_key_filter.as_mut())? + .into_iter() + .map(|range| (range.start + row_offset)..(range.end + row_offset)), + ); + row_offset += batch_num_rows; + } + + if matched_row_ranges.is_empty() { + return Ok(PrefilteredFlatReaderInput::Selection(Some( + RowSelection::from(vec![]), + ))); + } + + if matched_row_ranges.len() == 1 + && matched_row_ranges[0].start == 0 + && matched_row_ranges[0].end == rows_in_group + { + return Ok(PrefilteredFlatReaderInput::Prefetched(row_group)); + } + + Ok(PrefilteredFlatReaderInput::Selection(Some( + row_selection_from_row_ranges(matched_row_ranges.into_iter(), rows_in_group), + ))) + } + /// Returns the helper to compat batches. pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> { self.context.compat_batch() @@ -314,6 +425,11 @@ impl FileRange { } } +enum PrefilteredFlatReaderInput<'a> { + Selection(Option), + Prefetched(InMemoryRowGroup<'a>), +} + /// Context shared by ranges of the same parquet SST. pub(crate) struct FileRangeContext { /// Row group reader builder for the file. @@ -343,6 +459,11 @@ impl FileRangeContext { &self.base.filters } + /// Builds an encoded primary-key filter for flat scan pre-filtering. + pub(crate) fn new_primary_key_filter(&self) -> Option> { + self.base.new_primary_key_filter() + } + /// Returns true if a partition filter is configured. pub(crate) fn has_partition_filter(&self) -> bool { self.base.partition_filter.is_some() @@ -390,6 +511,16 @@ impl FileRangeContext { self.base.precise_filter_flat(input, skip_fields) } + /// Applies an encoded primary-key prefilter to the input `RecordBatch`. 
+ pub(crate) fn prefilter_flat_batch_by_primary_key( + &self, + input: RecordBatch, + primary_key_filter: &mut dyn PrimaryKeyFilter, + ) -> Result> { + self.base + .prefilter_flat_batch_by_primary_key(input, primary_key_filter) + } + /// Determines whether to skip field filters based on PreFilterMode and row group delete status. pub(crate) fn should_skip_fields(&self, row_group_idx: usize) -> bool { match self.base.pre_filter_mode { @@ -439,6 +570,11 @@ pub(crate) struct PartitionFilterContext { pub(crate) struct RangeBase { /// Filters pushed down. pub(crate) filters: Vec, + /// Simple filters that can be compiled into encoded primary-key checks. + /// + /// This set is pre-validated against the SST/expected metadata and only contains + /// tag filters on primary-key columns, excluding partition columns. + pub(crate) primary_key_filters: Option>>, /// Dynamic filter physical exprs. pub(crate) dyn_filters: Vec>, /// Helper to read the SST. @@ -473,6 +609,178 @@ impl TagDecodeState { } impl RangeBase { + pub(crate) fn retain_usable_primary_key_filters( + sst_metadata: &RegionMetadataRef, + expected_metadata: Option<&RegionMetadata>, + filters: &mut Vec, + ) { + filters.retain(|filter| { + Self::is_usable_primary_key_filter(sst_metadata, expected_metadata, filter) + }); + } + + fn is_usable_primary_key_filter( + sst_metadata: &RegionMetadataRef, + expected_metadata: Option<&RegionMetadata>, + filter: &SimpleFilterEvaluator, + ) -> bool { + if is_partition_column(filter.column_name()) { + return false; + } + + let sst_column = match expected_metadata { + Some(expected_metadata) => { + let Some(expected_column) = expected_metadata.column_by_name(filter.column_name()) + else { + return false; + }; + let Some(sst_column) = sst_metadata.column_by_id(expected_column.column_id) else { + return false; + }; + + if sst_column.column_schema.name != expected_column.column_schema.name + || sst_column.semantic_type != expected_column.semantic_type + || 
sst_column.column_schema.data_type != expected_column.column_schema.data_type + { + return false; + } + + sst_column + } + None => { + let Some(sst_column) = sst_metadata.column_by_name(filter.column_name()) else { + return false; + }; + sst_column + } + }; + + sst_column.semantic_type == SemanticType::Tag + && sst_metadata + .primary_key_index(sst_column.column_id) + .is_some() + } + + /// Builds an encoded primary-key filter for flat scan pre-filtering. + pub(crate) fn new_primary_key_filter(&self) -> Option> { + if self.read_format.metadata().primary_key.is_empty() + || !self + .read_format + .as_flat() + .is_some_and(|format| format.raw_batch_has_primary_key_dictionary()) + { + return None; + } + let filters = self.primary_key_filters.as_ref()?; + if filters.is_empty() { + return None; + } + let filters = Arc::clone(filters); + + Some( + self.codec + .primary_key_filter(self.read_format.metadata(), filters), + ) + } + + /// Applies an encoded primary-key prefilter before flat-row materialization. + /// + /// This only prunes rows that are guaranteed to fail simple primary-key predicates. + /// The normal precise filter still runs after flat conversion. 
+ pub(crate) fn prefilter_flat_batch_by_primary_key( + &self, + input: RecordBatch, + primary_key_filter: &mut dyn PrimaryKeyFilter, + ) -> Result> { + if input.num_rows() == 0 { + return Ok(Some(input)); + } + + let matched_row_ranges = + self.matching_row_ranges_by_primary_key(&input, primary_key_filter)?; + if matched_row_ranges.is_empty() { + return Ok(None); + } + + if matched_row_ranges.len() == 1 + && matched_row_ranges[0].start == 0 + && matched_row_ranges[0].end == input.num_rows() + { + return Ok(Some(input)); + } + + if matched_row_ranges.len() == 1 { + let span = &matched_row_ranges[0]; + return Ok(Some(input.slice(span.start, span.end - span.start))); + } + + let mut mask = vec![false; input.num_rows()]; + for span in matched_row_ranges { + mask[span].fill(true); + } + + let filtered = + datatypes::arrow::compute::filter_record_batch(&input, &BooleanArray::from(mask)) + .context(ComputeArrowSnafu)?; + if filtered.num_rows() == 0 { + Ok(None) + } else { + Ok(Some(filtered)) + } + } + + fn matching_row_ranges_by_primary_key( + &self, + input: &RecordBatch, + primary_key_filter: &mut dyn PrimaryKeyFilter, + ) -> Result>> { + let primary_key_index = primary_key_column_index(input.num_columns()); + let pk_dict_array = input + .column(primary_key_index) + .as_any() + .downcast_ref::() + .context(UnexpectedSnafu { + reason: "Primary key column is not a dictionary array".to_string(), + })?; + let pk_values = pk_dict_array + .values() + .as_any() + .downcast_ref::() + .context(UnexpectedSnafu { + reason: "Primary key values are not binary array".to_string(), + })?; + let keys = pk_dict_array.keys(); + let key_values = keys.values(); + + if key_values.is_empty() { + return Ok(std::iter::once(0..input.num_rows()).collect()); + } + + let mut matched_row_ranges: Vec> = Vec::new(); + let mut start = 0; + while start < key_values.len() { + let key = key_values[start]; + let mut end = start + 1; + while end < key_values.len() && key_values[end] == key { + end += 1; + 
} + + if primary_key_filter.matches(pk_values.value(key as usize)) { + if let Some(last) = matched_row_ranges.last_mut() + && last.end == start + { + last.end = end; + } else { + matched_row_ranges.push(start..end); + } + } + + start = end; + } + + Ok(matched_row_ranges) + } + /// TRY THE BEST to perform pushed down predicate precisely on the input batch. /// Return the filtered batch. If the entire batch is filtered out, return None. /// diff --git a/src/mito2/src/sst/parquet/flat_format.rs b/src/mito2/src/sst/parquet/flat_format.rs index 8a59e9a97d..5281f96d8d 100644 --- a/src/mito2/src/sst/parquet/flat_format.rs +++ b/src/mito2/src/sst/parquet/flat_format.rs @@ -282,6 +282,15 @@ impl FlatReadFormat { } } + /// Returns true if raw parquet batches already use the flat layout with an encoded + /// `__primary_key` dictionary column. + pub(crate) fn raw_batch_has_primary_key_dictionary(&self) -> bool { + match &self.parquet_adapter { + ParquetAdapter::Flat(_) => true, + ParquetAdapter::PrimaryKeyToFlat(_) => false, + } + } + /// Creates a sequence array to override. 
pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option { self.override_sequence diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index f152c97075..ab59fb406e 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -26,15 +26,14 @@ use common_telemetry::{tracing, warn}; use datafusion_expr::Expr; use datatypes::arrow::array::ArrayRef; use datatypes::arrow::datatypes::Field; +use datatypes::arrow::error::ArrowError; use datatypes::arrow::record_batch::RecordBatch; use datatypes::data_type::ConcreteDataType; use datatypes::prelude::DataType; -use futures::StreamExt; use mito_codec::row_converter::build_primary_key_codec; use object_store::ObjectStore; -use parquet::arrow::ProjectionMask; -use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions, RowSelection}; -use parquet::arrow::async_reader::{ParquetRecordBatchStream, ParquetRecordBatchStreamBuilder}; +use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection}; +use parquet::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels}; use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData}; use partition::expr::PartitionExpr; use snafu::ResultExt; @@ -48,7 +47,9 @@ use crate::cache::index::result_cache::PredicateKey; use crate::cache::{CacheStrategy, CachedSstMeta}; #[cfg(feature = "vector_index")] use crate::error::ApplyVectorIndexSnafu; -use crate::error::{ReadDataPartSnafu, ReadParquetSnafu, Result, SerializePartitionExprSnafu}; +use crate::error::{ + ArrowReaderSnafu, ReadDataPartSnafu, ReadParquetSnafu, Result, SerializePartitionExprSnafu, +}; use crate::metrics::{ PRECISE_FILTER_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_TOTAL, READ_STAGE_ELAPSED, @@ -69,14 +70,13 @@ use crate::sst::index::inverted_index::applier::{ #[cfg(feature = "vector_index")] use crate::sst::index::vector_index::applier::VectorIndexApplierRef; use 
crate::sst::parquet::DEFAULT_READ_BATCH_SIZE; -use crate::sst::parquet::async_reader::SstAsyncFileReader; use crate::sst::parquet::file_range::{ FileRangeContext, FileRangeContextRef, PartitionFilterContext, PreFilterMode, RangeBase, row_group_contains_delete, }; use crate::sst::parquet::format::{ReadFormat, need_override_sequence}; use crate::sst::parquet::metadata::MetadataLoader; -use crate::sst::parquet::row_group::ParquetFetchMetrics; +use crate::sst::parquet::row_group::{InMemoryRowGroup, ParquetFetchMetrics}; use crate::sst::parquet::row_selection::RowGroupSelection; use crate::sst::parquet::stats::RowGroupPruningStats; use crate::sst::tag_maybe_to_dictionary_field; @@ -415,12 +415,6 @@ impl ParquetReaderBuilder { .set_override_sequence(self.file_handle.meta_ref().sequence.map(|x| x.get())); } - // Computes the projection mask. - let parquet_schema_desc = parquet_meta.file_metadata().schema_descr(); - let indices = read_format.projection_indices(); - // Now we assumes we don't have nested schemas. - // TODO(yingwen): Revisit this if we introduce nested types such as JSON type. - let projection_mask = ProjectionMask::roots(parquet_schema_desc, indices.iter().copied()); let selection = self .row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics) .await; @@ -452,20 +446,26 @@ impl ParquetReaderBuilder { .map(|meta| meta.schema.clone()) .unwrap_or_else(|| region_meta.schema.clone()); - // Create ArrowReaderMetadata for async stream building. - let arrow_reader_options = - ArrowReaderOptions::new().with_schema(read_format.arrow_schema().clone()); - let arrow_metadata = - ArrowReaderMetadata::try_new(parquet_meta.clone(), arrow_reader_options) + // Computes the projection mask. + let parquet_schema_desc = parquet_meta.file_metadata().schema_descr(); + let indices = read_format.projection_indices(); + // Now we assumes we don't have nested schemas. + // TODO(yingwen): Revisit this if we introduce nested types such as JSON type. 
+ let projection_mask = ProjectionMask::roots(parquet_schema_desc, indices.iter().copied()); + + // Computes the field levels. + let hint = Some(read_format.arrow_schema().fields()); + let field_levels = + parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint) .context(ReadDataPartSnafu)?; let reader_builder = RowGroupReaderBuilder { file_handle: self.file_handle.clone(), file_path, parquet_meta, - arrow_metadata, object_store: self.object_store.clone(), projection: projection_mask, + field_levels, cache_strategy: self.cache_strategy.clone(), }; @@ -485,6 +485,20 @@ impl ParquetReaderBuilder { vec![] }; + let primary_key_filters = self.predicate.as_ref().and_then(|predicate| { + let mut filters = predicate + .exprs() + .iter() + .filter_map(SimpleFilterEvaluator::try_new) + .collect::>(); + RangeBase::retain_usable_primary_key_filters( + read_format.metadata(), + self.expected_metadata.as_deref(), + &mut filters, + ); + (!filters.is_empty()).then_some(Arc::new(filters)) + }); + let dyn_filters = if let Some(predicate) = &self.predicate { predicate.dyn_filters().as_ref().clone() } else { @@ -499,6 +513,7 @@ impl ParquetReaderBuilder { reader_builder, RangeBase { filters, + primary_key_filters, dyn_filters, read_format, expected_metadata: self.expected_metadata.clone(), @@ -1640,7 +1655,7 @@ impl ReaderMetrics { } } -/// Builder to build a [ParquetRecordBatchStream] for a row group. +/// Builder to build a [ParquetRecordBatchReader] for a row group. pub(crate) struct RowGroupReaderBuilder { /// SST file to read. /// @@ -1650,12 +1665,12 @@ pub(crate) struct RowGroupReaderBuilder { file_path: String, /// Metadata of the parquet file. parquet_meta: Arc, - /// Arrow reader metadata for building async stream. - arrow_metadata: ArrowReaderMetadata, /// Object store as an Operator. object_store: ObjectStore, /// Projection mask. projection: ProjectionMask, + /// Field levels to read. + field_levels: FieldLevels, /// Cache. 
cache_strategy: CacheStrategy, } @@ -1679,43 +1694,121 @@ impl RowGroupReaderBuilder { &self.cache_strategy } - /// Builds a [ParquetRecordBatchStream] to read the row group at `row_group_idx`. + pub(crate) fn new_in_memory_row_group(&self, row_group_idx: usize) -> InMemoryRowGroup<'_> { + InMemoryRowGroup::create( + self.file_handle.region_id(), + self.file_handle.file_id().file_id(), + &self.parquet_meta, + row_group_idx, + self.cache_strategy.clone(), + &self.file_path, + self.object_store.clone(), + ) + } + + fn projection_and_field_levels( + &self, + read_format: &ReadFormat, + ) -> Result<(ProjectionMask, FieldLevels)> { + let parquet_schema_desc = self.parquet_meta.file_metadata().schema_descr(); + let projection_mask = ProjectionMask::roots( + parquet_schema_desc, + read_format.projection_indices().iter().copied(), + ); + let hint = Some(read_format.arrow_schema().fields()); + let field_levels = + parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint) + .context(ReadDataPartSnafu)?; + + Ok((projection_mask, field_levels)) + } + + /// Builds a [ParquetRecordBatchReader] to read the row group at `row_group_idx`. pub(crate) async fn build( &self, row_group_idx: usize, row_selection: Option, fetch_metrics: Option<&ParquetFetchMetrics>, - ) -> Result> { - // Create async file reader with caching support. - let async_reader = SstAsyncFileReader::new( - self.file_handle.file_id(), - self.file_path.clone(), - self.object_store.clone(), - self.cache_strategy.clone(), - self.parquet_meta.clone(), - row_group_idx, + ) -> Result { + let mut row_group = self.new_in_memory_row_group(row_group_idx); + self.build_on_row_group_with_projection( + &mut row_group, + row_selection, + fetch_metrics, + self.projection.clone(), + self.field_levels.clone(), ) - .with_fetch_metrics(fetch_metrics.cloned()); + .await + } - // Build the async stream using ArrowReaderBuilder API. 
- let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata( - async_reader, - self.arrow_metadata.clone(), - ); - builder = builder - .with_row_groups(vec![row_group_idx]) - .with_projection(self.projection.clone()) - .with_batch_size(DEFAULT_READ_BATCH_SIZE); + pub(crate) async fn build_on_row_group_with_read_format( + &self, + row_group: &mut InMemoryRowGroup<'_>, + row_selection: Option, + fetch_metrics: Option<&ParquetFetchMetrics>, + read_format: &ReadFormat, + ) -> Result { + let (projection, field_levels) = self.projection_and_field_levels(read_format)?; + self.build_on_row_group_with_projection( + row_group, + row_selection, + fetch_metrics, + projection, + field_levels, + ) + .await + } - if let Some(selection) = row_selection { - builder = builder.with_row_selection(selection); + pub(crate) async fn build_on_row_group( + &self, + row_group: &mut InMemoryRowGroup<'_>, + row_selection: Option, + fetch_metrics: Option<&ParquetFetchMetrics>, + ) -> Result { + self.build_on_row_group_with_projection( + row_group, + row_selection, + fetch_metrics, + self.projection.clone(), + self.field_levels.clone(), + ) + .await + } + + async fn build_on_row_group_with_projection( + &self, + row_group: &mut InMemoryRowGroup<'_>, + row_selection: Option, + fetch_metrics: Option<&ParquetFetchMetrics>, + projection: ProjectionMask, + field_levels: FieldLevels, + ) -> Result { + let fetch_start = Instant::now(); + + // Fetches data into memory. + row_group + .fetch(&projection, row_selection.as_ref(), fetch_metrics) + .await + .context(ReadParquetSnafu { + path: &self.file_path, + })?; + + // Record total fetch elapsed time. + if let Some(metrics) = fetch_metrics { + metrics.data.lock().unwrap().total_fetch_elapsed += fetch_start.elapsed(); } - let stream = builder.build().context(ReadParquetSnafu { + // Builds the parquet reader. + // Now the row selection is None. 
+ ParquetRecordBatchReader::try_new_with_row_groups( + &field_levels, + row_group, + DEFAULT_READ_BATCH_SIZE, + row_selection, + ) + .context(ReadParquetSnafu { path: &self.file_path, - })?; - - Ok(stream) + }) } } @@ -1845,7 +1938,7 @@ impl ParquetReader { pub async fn next_record_batch(&mut self) -> Result> { loop { if let Some(reader) = &mut self.reader { - if let Some(batch) = reader.next_batch().await? { + if let Some(batch) = reader.next_batch()? { return Ok(Some(batch)); } self.reader = None; @@ -1924,18 +2017,26 @@ impl ParquetReader { /// RowGroupReaderContext represents the fields that cannot be shared /// between different `RowGroupReader`s. pub(crate) trait RowGroupReaderContext: Send { - fn read_format(&self) -> &ReadFormat; + fn map_result( + &self, + result: std::result::Result, ArrowError>, + ) -> Result>; - fn file_path(&self) -> &str; + fn read_format(&self) -> &ReadFormat; } impl RowGroupReaderContext for FileRangeContextRef { - fn read_format(&self) -> &ReadFormat { - self.as_ref().read_format() + fn map_result( + &self, + result: std::result::Result, ArrowError>, + ) -> Result> { + result.context(ArrowReaderSnafu { + path: self.file_path(), + }) } - fn file_path(&self) -> &str { - self.as_ref().file_path() + fn read_format(&self) -> &ReadFormat { + self.as_ref().read_format() } } @@ -1944,11 +2045,8 @@ pub(crate) type RowGroupReader = RowGroupReaderBase; impl RowGroupReader { /// Creates a new reader from file range. - pub(crate) fn new( - context: FileRangeContextRef, - stream: ParquetRecordBatchStream, - ) -> Self { - Self::create(context, stream) + pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self { + Self::create(context, reader) } } @@ -1956,8 +2054,8 @@ impl RowGroupReader { pub(crate) struct RowGroupReaderBase { /// Context of [RowGroupReader] so adapts to different underlying implementation. context: T, - /// Inner parquet record batch stream. 
- stream: ParquetRecordBatchStream, + /// Inner parquet reader. + reader: ParquetRecordBatchReader, /// Buffered batches to return. batches: VecDeque, /// Local scan metrics. @@ -1971,7 +2069,7 @@ where T: RowGroupReaderContext, { /// Creates a new reader to read the primary key format. - pub(crate) fn create(context: T, stream: ParquetRecordBatchStream) -> Self { + pub(crate) fn create(context: T, reader: ParquetRecordBatchReader) -> Self { // The batch length from the reader should be less than or equal to DEFAULT_READ_BATCH_SIZE. let override_sequence = context .read_format() @@ -1980,7 +2078,7 @@ where Self { context, - stream, + reader, batches: VecDeque::new(), metrics: ReaderMetrics::default(), override_sequence, @@ -1997,18 +2095,13 @@ where self.context.read_format() } - /// Tries to fetch next [RecordBatch] from the stream asynchronously. - async fn fetch_next_record_batch(&mut self) -> Result> { - match self.stream.next().await.transpose() { - Ok(batch) => Ok(batch), - Err(e) => Err(e).context(ReadParquetSnafu { - path: self.context.file_path(), - }), - } + /// Tries to fetch next [RecordBatch] from the reader. + fn fetch_next_record_batch(&mut self) -> Result> { + self.context.map_result(self.reader.next().transpose()) } /// Returns the next [Batch]. - pub(crate) async fn next_inner(&mut self) -> Result> { + pub(crate) fn next_inner(&mut self) -> Result> { let scan_start = Instant::now(); if let Some(batch) = self.batches.pop_front() { self.metrics.num_rows += batch.num_rows(); @@ -2018,7 +2111,7 @@ where // We need to fetch next record batch and convert it to batches. while self.batches.is_empty() { - let Some(record_batch) = self.fetch_next_record_batch().await? else { + let Some(record_batch) = self.fetch_next_record_batch()? 
else { self.metrics.scan_cost += scan_start.elapsed(); return Ok(None); }; @@ -2046,10 +2139,10 @@ where #[async_trait::async_trait] impl BatchReader for RowGroupReaderBase where - T: RowGroupReaderContext + Send + Sync, + T: RowGroupReaderContext, { async fn next_batch(&mut self) -> Result> { - self.next_inner().await + self.next_inner() } } @@ -2057,18 +2150,15 @@ where pub(crate) struct FlatRowGroupReader { /// Context for file ranges. context: FileRangeContextRef, - /// Inner parquet record batch stream. - stream: ParquetRecordBatchStream, + /// Inner parquet reader. + reader: ParquetRecordBatchReader, /// Cached sequence array to override sequences. override_sequence: Option, } impl FlatRowGroupReader { /// Creates a new flat reader from file range. - pub(crate) fn new( - context: FileRangeContextRef, - stream: ParquetRecordBatchStream, - ) -> Self { + pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self { // The batch length from the reader should be less than or equal to DEFAULT_READ_BATCH_SIZE. let override_sequence = context .read_format() @@ -2076,28 +2166,39 @@ impl FlatRowGroupReader { Self { context, - stream, + reader, override_sequence, } } - /// Returns the next RecordBatch. - pub(crate) async fn next_batch(&mut self) -> Result> { - match self.stream.next().await { + /// Returns the next raw RecordBatch from parquet before flat-format conversion. + pub(crate) fn next_raw_batch(&mut self) -> Result> { + match self.reader.next() { Some(batch_result) => { - let record_batch = batch_result.context(ReadParquetSnafu { + let record_batch = batch_result.context(ArrowReaderSnafu { path: self.context.file_path(), })?; - - // Safety: Only flat format use FlatRowGroupReader. 
- let flat_format = self.context.read_format().as_flat().unwrap(); - let record_batch = - flat_format.convert_batch(record_batch, self.override_sequence.as_ref())?; Ok(Some(record_batch)) } None => Ok(None), } } + + /// Converts a raw parquet batch into the projected flat batch. + pub(crate) fn convert_batch(&self, record_batch: RecordBatch) -> Result { + // Safety: Only flat format use FlatRowGroupReader. + let flat_format = self.context.read_format().as_flat().unwrap(); + flat_format.convert_batch(record_batch, self.override_sequence.as_ref()) + } + + /// Returns the next flat RecordBatch. + pub(crate) fn next_batch(&mut self) -> Result> { + let Some(record_batch) = self.next_raw_batch()? else { + return Ok(None); + }; + + self.convert_batch(record_batch).map(Some) + } } #[cfg(test)] diff --git a/src/mito2/src/sst/parquet/row_group.rs b/src/mito2/src/sst/parquet/row_group.rs index 38ef62c6b8..6d245f8988 100644 --- a/src/mito2/src/sst/parquet/row_group.rs +++ b/src/mito2/src/sst/parquet/row_group.rs @@ -12,12 +12,28 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! Parquet row group reading utilities. +//! Ports private structs from [parquet crate](https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/arrow/async_reader/mod.rs#L644-L650). 
use std::ops::Range; use std::sync::Arc; -use crate::sst::parquet::helper::MERGE_GAP; +use bytes::{Buf, Bytes}; +use object_store::ObjectStore; +use parquet::arrow::ProjectionMask; +use parquet::arrow::arrow_reader::{RowGroups, RowSelection}; +use parquet::column::page::{PageIterator, PageReader}; +use parquet::errors::{ParquetError, Result}; +use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData}; +use parquet::file::page_index::offset_index::OffsetIndexMetaData; +use parquet::file::reader::{ChunkReader, Length}; +use parquet::file::serialized_reader::SerializedPageReader; +use store_api::storage::{FileId, RegionId}; +use tokio::task::yield_now; + +use crate::cache::file_cache::{FileType, IndexKey}; +use crate::cache::{CacheStrategy, PageKey, PageValue}; +use crate::metrics::{READ_STAGE_ELAPSED, READ_STAGE_FETCH_PAGES}; +use crate::sst::parquet::helper::{MERGE_GAP, fetch_byte_ranges}; /// Inner data for ParquetFetchMetrics. #[derive(Default, Debug, Clone)] @@ -58,9 +74,9 @@ impl ParquetFetchMetricsData { } /// Metrics for tracking page/row group fetch operations. -#[derive(Default, Clone)] +#[derive(Default)] pub struct ParquetFetchMetrics { - pub data: Arc>, + pub data: std::sync::Mutex, } impl std::fmt::Debug for ParquetFetchMetrics { @@ -188,6 +204,357 @@ impl ParquetFetchMetrics { } } +pub(crate) struct RowGroupBase<'a> { + parquet_metadata: &'a ParquetMetaData, + row_group_idx: usize, + pub(crate) offset_index: Option<&'a [OffsetIndexMetaData]>, + /// Compressed page of each column. + column_chunks: Vec>>, + pub(crate) row_count: usize, +} + +impl<'a> RowGroupBase<'a> { + pub(crate) fn new(parquet_meta: &'a ParquetMetaData, row_group_idx: usize) -> Self { + let metadata = parquet_meta.row_group(row_group_idx); + // `offset_index` is always `None` if we don't set + // [with_page_index()](https://docs.rs/parquet/latest/parquet/arrow/arrow_reader/struct.ArrowReaderOptions.html#method.with_page_index) + // to `true`. 
+ let offset_index = parquet_meta + .offset_index() + // filter out empty offset indexes (old versions specified Some(vec![]) when not present) + .filter(|index| !index.is_empty()) + .map(|x| x[row_group_idx].as_slice()); + + Self { + parquet_metadata: parquet_meta, + row_group_idx, + offset_index, + column_chunks: vec![None; metadata.columns().len()], + row_count: metadata.num_rows() as usize, + } + } + + pub(crate) fn calc_sparse_read_ranges( + &self, + projection: &ProjectionMask, + offset_index: &[OffsetIndexMetaData], + selection: &RowSelection, + ) -> (Vec>, Vec>) { + // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the + // `RowSelection` + let mut page_start_offsets: Vec> = vec![]; + let ranges = self + .column_chunks + .iter() + .zip(self.row_group_metadata().columns()) + .enumerate() + .filter(|&(idx, (chunk, _chunk_meta))| chunk.is_none() && projection.leaf_included(idx)) + .flat_map(|(idx, (_chunk, chunk_meta))| { + // If the first page does not start at the beginning of the column, + // then we need to also fetch a dictionary page.
+ let mut ranges = vec![]; + let (start, _len) = chunk_meta.byte_range(); + match offset_index[idx].page_locations.first() { + Some(first) if first.offset as u64 != start => { + ranges.push(start..first.offset as u64); + } + _ => (), + } + + ranges.extend( + selection + .scan_ranges(&offset_index[idx].page_locations) + .iter() + .map(|range| range.start..range.end), + ); + page_start_offsets.push(ranges.iter().map(|range| range.start as usize).collect()); + + ranges + }) + .collect::>(); + (ranges, page_start_offsets) + } + + pub(crate) fn assign_sparse_chunk( + &mut self, + projection: &ProjectionMask, + data: Vec, + page_start_offsets: Vec>, + ) { + let mut page_start_offsets = page_start_offsets.into_iter(); + let mut chunk_data = data.into_iter(); + + for (idx, chunk) in self.column_chunks.iter_mut().enumerate() { + if chunk.is_some() || !projection.leaf_included(idx) { + continue; + } + + if let Some(offsets) = page_start_offsets.next() { + let mut chunks = Vec::with_capacity(offsets.len()); + for _ in 0..offsets.len() { + chunks.push(chunk_data.next().unwrap()); + } + + let column = self + .parquet_metadata + .row_group(self.row_group_idx) + .column(idx); + *chunk = Some(Arc::new(ColumnChunkData::Sparse { + length: column.byte_range().1 as usize, + data: offsets.into_iter().zip(chunks).collect(), + })) + } + } + } + + pub(crate) fn calc_dense_read_ranges(&self, projection: &ProjectionMask) -> Vec> { + self.column_chunks + .iter() + .enumerate() + .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx)) + .map(|(idx, _chunk)| { + let column = self.row_group_metadata().column(idx); + let (start, length) = column.byte_range(); + start..(start + length) + }) + .collect::>() + } + + /// Assigns compressed chunk binary data to [RowGroupBase::column_chunks] + /// and returns the chunk offset and binary data assigned. 
+ pub(crate) fn assign_dense_chunk( + &mut self, + projection: &ProjectionMask, + chunk_data: Vec, + ) { + let mut chunk_data = chunk_data.into_iter(); + + for (idx, chunk) in self.column_chunks.iter_mut().enumerate() { + if chunk.is_some() || !projection.leaf_included(idx) { + continue; + } + + // Get the fetched page. + let Some(data) = chunk_data.next() else { + continue; + }; + + let column = self + .parquet_metadata + .row_group(self.row_group_idx) + .column(idx); + *chunk = Some(Arc::new(ColumnChunkData::Dense { + offset: column.byte_range().0 as usize, + data, + })); + } + } + + /// Creates a [PageReader] from [RowGroupBase::column_chunks] + pub(crate) fn column_reader( + &self, + col_idx: usize, + ) -> Result> { + let page_reader = match &self.column_chunks[col_idx] { + None => { + return Err(ParquetError::General(format!( + "Invalid column index {col_idx}, column was not fetched" + ))); + } + Some(data) => { + let page_locations = self + .offset_index + // filter out empty offset indexes (old versions specified Some(vec![]) when not present) + .filter(|index| !index.is_empty()) + .map(|index| index[col_idx].page_locations.clone()); + SerializedPageReader::new( + data.clone(), + self.row_group_metadata().column(col_idx), + self.row_count, + page_locations, + )? + } + }; + + Ok(page_reader) + } + + pub(crate) fn parquet_metadata(&self) -> &ParquetMetaData { + self.parquet_metadata + } + + pub(crate) fn row_group_metadata(&self) -> &RowGroupMetaData { + self.parquet_metadata().row_group(self.row_group_idx) + } +} + +/// An in-memory collection of column chunks +pub struct InMemoryRowGroup<'a> { + region_id: RegionId, + file_id: FileId, + row_group_idx: usize, + cache_strategy: CacheStrategy, + file_path: &'a str, + /// Object store. + object_store: ObjectStore, + base: RowGroupBase<'a>, +} + +impl<'a> InMemoryRowGroup<'a> { + /// Creates a new [InMemoryRowGroup] by `row_group_idx`. + /// + /// # Panics + /// Panics if the `row_group_idx` is invalid.
+ pub fn create( + region_id: RegionId, + file_id: FileId, + parquet_meta: &'a ParquetMetaData, + row_group_idx: usize, + cache_strategy: CacheStrategy, + file_path: &'a str, + object_store: ObjectStore, + ) -> Self { + Self { + region_id, + file_id, + row_group_idx, + cache_strategy, + file_path, + object_store, + base: RowGroupBase::new(parquet_meta, row_group_idx), + } + } + + /// Fetches the necessary column data into memory + pub async fn fetch( + &mut self, + projection: &ProjectionMask, + selection: Option<&RowSelection>, + metrics: Option<&ParquetFetchMetrics>, + ) -> Result<()> { + if let Some((selection, offset_index)) = selection.zip(self.base.offset_index) { + let (fetch_ranges, page_start_offsets) = + self.base + .calc_sparse_read_ranges(projection, offset_index, selection); + + let chunk_data = self.fetch_bytes(&fetch_ranges, metrics).await?; + // Assign sparse chunk data to base. + self.base + .assign_sparse_chunk(projection, chunk_data, page_start_offsets); + } else { + // Release the CPU to avoid blocking the runtime. Since `fetch_pages_from_cache` + // is a synchronous, CPU-bound operation. + yield_now().await; + + // Calculate ranges to read. + let fetch_ranges = self.base.calc_dense_read_ranges(projection); + + if fetch_ranges.is_empty() { + // Nothing to fetch. + return Ok(()); + } + + // Fetch data with ranges + let chunk_data = self.fetch_bytes(&fetch_ranges, metrics).await?; + + // Assigns fetched data to base. + self.base.assign_dense_chunk(projection, chunk_data); + } + + Ok(()) + } + + /// Try to fetch data from the memory cache or the WriteCache, + /// if not in WriteCache, fetch data from object store directly. + async fn fetch_bytes( + &self, + ranges: &[Range], + metrics: Option<&ParquetFetchMetrics>, + ) -> Result> { + // Now fetch page timer includes the whole time to read pages. 
+ let _timer = READ_STAGE_FETCH_PAGES.start_timer(); + + let page_key = PageKey::new(self.file_id, self.row_group_idx, ranges.to_vec()); + if let Some(pages) = self.cache_strategy.get_pages(&page_key) { + if let Some(metrics) = metrics { + let total_size: u64 = ranges.iter().map(|r| r.end - r.start).sum(); + let mut metrics_data = metrics.data.lock().unwrap(); + metrics_data.page_cache_hit += 1; + metrics_data.pages_to_fetch_mem += ranges.len(); + metrics_data.page_size_to_fetch_mem += total_size; + metrics_data.page_size_needed += total_size; + } + return Ok(pages.compressed.clone()); + } + + // Calculate total range size for metrics. + let (total_range_size, unaligned_size) = compute_total_range_size(ranges); + + let key = IndexKey::new(self.region_id, self.file_id, FileType::Parquet); + let fetch_write_cache_start = metrics.map(|_| std::time::Instant::now()); + let write_cache_result = self.fetch_ranges_from_write_cache(key, ranges).await; + let pages = match write_cache_result { + Some(data) => { + if let Some(metrics) = metrics { + let elapsed = fetch_write_cache_start + .map(|start| start.elapsed()) + .unwrap_or_default(); + let range_size_needed: u64 = ranges.iter().map(|r| r.end - r.start).sum(); + let mut metrics_data = metrics.data.lock().unwrap(); + metrics_data.write_cache_fetch_elapsed += elapsed; + metrics_data.write_cache_hit += 1; + metrics_data.pages_to_fetch_write_cache += ranges.len(); + metrics_data.page_size_to_fetch_write_cache += unaligned_size; + metrics_data.page_size_needed += range_size_needed; + } + data + } + None => { + // Fetch data from object store. 
+ let _timer = READ_STAGE_ELAPSED + .with_label_values(&["cache_miss_read"]) + .start_timer(); + + let start = metrics.map(|_| std::time::Instant::now()); + let data = fetch_byte_ranges(self.file_path, self.object_store.clone(), ranges) + .await + .map_err(|e| ParquetError::External(Box::new(e)))?; + if let Some(metrics) = metrics { + let elapsed = start.map(|start| start.elapsed()).unwrap_or_default(); + let range_size_needed: u64 = ranges.iter().map(|r| r.end - r.start).sum(); + let mut metrics_data = metrics.data.lock().unwrap(); + metrics_data.store_fetch_elapsed += elapsed; + metrics_data.cache_miss += 1; + metrics_data.pages_to_fetch_store += ranges.len(); + metrics_data.page_size_to_fetch_store += unaligned_size; + metrics_data.page_size_needed += range_size_needed; + } + data + } + }; + + // Put pages back to the cache. + let page_value = PageValue::new(pages.clone(), total_range_size); + self.cache_strategy + .put_pages(page_key, Arc::new(page_value)); + + Ok(pages) + } + + /// Fetches data from write cache. + /// Returns `None` if the data is not in the cache. + async fn fetch_ranges_from_write_cache( + &self, + key: IndexKey, + ranges: &[Range], + ) -> Option> { + if let Some(cache) = self.cache_strategy.write_cache() { + return cache.file_cache().read_ranges(key, ranges).await; + } + None + } +} + /// Computes the max possible buffer size to read the given `ranges`. /// Returns (aligned_size, unaligned_size) where: /// - aligned_size: total size aligned to pooled buffer size @@ -235,3 +602,96 @@ fn align_to_pooled_buf_size(size: u64) -> u64 { const POOLED_BUF_SIZE: u64 = 2 * 1024 * 1024; size.div_ceil(POOLED_BUF_SIZE) * POOLED_BUF_SIZE } + +impl RowGroups for InMemoryRowGroup<'_> { + fn num_rows(&self) -> usize { + self.base.row_count + } + + fn column_chunks(&self, i: usize) -> Result> { + // Creates a page reader to read column at `i`. 
+ let page_reader = self.base.column_reader(i)?; + + Ok(Box::new(ColumnChunkIterator { + reader: Some(Ok(Box::new(page_reader))), + })) + } + + fn row_groups(&self) -> Box + '_> { + Box::new(std::iter::once(self.base.row_group_metadata())) + } + + fn metadata(&self) -> &ParquetMetaData { + self.base.parquet_metadata() + } +} + +/// An in-memory column chunk +#[derive(Clone)] +pub(crate) enum ColumnChunkData { + /// Column chunk data representing only a subset of data pages + Sparse { + /// Length of the full column chunk + length: usize, + /// Set of data pages included in this sparse chunk. Each element is a tuple + /// of (page offset, page data) + data: Vec<(usize, Bytes)>, + }, + /// Full column chunk and its offset + Dense { offset: usize, data: Bytes }, +} + +impl ColumnChunkData { + fn get(&self, start: u64) -> Result { + match &self { + ColumnChunkData::Sparse { data, .. } => data + .binary_search_by_key(&start, |(offset, _)| *offset as u64) + .map(|idx| data[idx].1.clone()) + .map_err(|_| { + ParquetError::General(format!( + "Invalid offset in sparse column chunk data: {start}" + )) + }), + ColumnChunkData::Dense { offset, data } => { + let start = start as usize - *offset; + Ok(data.slice(start..)) + } + } + } +} + +impl Length for ColumnChunkData { + fn len(&self) -> u64 { + match &self { + ColumnChunkData::Sparse { length, .. } => *length as u64, + ColumnChunkData::Dense { data, .. 
} => data.len() as u64, + } + } +} + +impl ChunkReader for ColumnChunkData { + type T = bytes::buf::Reader; + + fn get_read(&self, start: u64) -> Result { + Ok(self.get(start)?.reader()) + } + + fn get_bytes(&self, start: u64, length: usize) -> Result { + Ok(self.get(start)?.slice(..length)) + } +} + +/// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`] +pub(crate) struct ColumnChunkIterator { + pub(crate) reader: Option>>, +} + +impl Iterator for ColumnChunkIterator { + type Item = Result>; + + fn next(&mut self) -> Option { + self.reader.take() + } +} + +impl PageIterator for ColumnChunkIterator {}