diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 4f4387fda6..a3fe32cbe3 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -24,13 +24,13 @@ use crate::sst::DEFAULT_WRITE_BUFFER_SIZE; use crate::sst::file::FileTimeRange; use crate::sst::index::IndexOutput; -pub(crate) mod async_reader; pub mod file_range; pub mod flat_format; pub mod format; pub(crate) mod helper; pub(crate) mod metadata; pub mod prefilter; +pub(crate) mod push_decoder; pub mod read_columns; pub mod reader; pub mod row_group; diff --git a/src/mito2/src/sst/parquet/prefilter.rs b/src/mito2/src/sst/parquet/prefilter.rs index ddd9c9d075..c98da1abac 100644 --- a/src/mito2/src/sst/parquet/prefilter.rs +++ b/src/mito2/src/sst/parquet/prefilter.rs @@ -39,7 +39,7 @@ use table::predicate::Predicate; use crate::error::{ ComputeArrowSnafu, DecodeSnafu, EvalPartitionFilterSnafu, NewRecordBatchSnafu, - ReadParquetSnafu, RecordBatchSnafu, Result, UnexpectedSnafu, + RecordBatchSnafu, Result, UnexpectedSnafu, }; use crate::sst::parquet::file_range::PreFilterMode; use crate::sst::parquet::flat_format::FlatReadFormat; @@ -582,9 +582,7 @@ pub(crate) async fn execute_prefilter( let mut rows_selected = 0usize; while let Some(batch_result) = stream.next().await { - let batch = batch_result.context(ReadParquetSnafu { - path: reader_builder.file_path(), - })?; + let batch = batch_result?; let num_rows = batch.num_rows(); if num_rows == 0 { continue; diff --git a/src/mito2/src/sst/parquet/async_reader.rs b/src/mito2/src/sst/parquet/push_decoder.rs similarity index 69% rename from src/mito2/src/sst/parquet/async_reader.rs rename to src/mito2/src/sst/parquet/push_decoder.rs index a060fd367d..c44813a5ab 100644 --- a/src/mito2/src/sst/parquet/async_reader.rs +++ b/src/mito2/src/sst/parquet/push_decoder.rs @@ -12,31 +12,37 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! Async file reader implementation for SST parquet files. +//! Push decoder stream implementation for SST parquet files. use std::ops::Range; use std::sync::Arc; use bytes::Bytes; -use futures::FutureExt; -use futures::future::BoxFuture; +use datatypes::arrow::record_batch::RecordBatch; +use futures::StreamExt; +use futures::stream::BoxStream; use object_store::ObjectStore; -use parquet::arrow::async_reader::AsyncFileReader; -use parquet::errors::{ParquetError, Result as ParquetResult}; -use parquet::file::metadata::ParquetMetaData; +use parquet::DecodeResult; +use parquet::arrow::ProjectionMask; +use parquet::arrow::arrow_reader::{ArrowReaderMetadata, RowSelection}; +use parquet::arrow::push_decoder::ParquetPushDecoderBuilder; +use snafu::ResultExt; use crate::cache::file_cache::{FileType, IndexKey}; use crate::cache::{CacheStrategy, PageKey, PageValue}; +use crate::error::{OpenDalSnafu, ReadParquetSnafu, Result}; use crate::metrics::{READ_STAGE_ELAPSED, READ_STAGE_FETCH_PAGES}; use crate::sst::file::RegionFileId; +use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE; use crate::sst::parquet::helper::fetch_byte_ranges; use crate::sst::parquet::row_group::{ParquetFetchMetrics, compute_total_range_size}; -/// An [AsyncFileReader] implementation for SST parquet files. +/// Fetches parquet byte ranges through Greptime's cache hierarchy. /// -/// This reader provides async byte access to parquet data in object storage, -/// with caching support (page cache and write cache). -pub struct SstAsyncFileReader { +/// The push decoder decides which ranges are required for decoding, while this +/// fetcher keeps cache lookup, local write-cache reads, and remote I/O explicit +/// in Greptime code. +pub(crate) struct SstParquetRangeFetcher { /// Region file ID for cache key. region_file_id: RegionFileId, /// Path to the parquet file in object storage. @@ -45,43 +51,34 @@ pub struct SstAsyncFileReader { object_store: ObjectStore, /// Cache strategy for reading pages. cache_strategy: CacheStrategy, - /// Cached parquet metadata. - metadata: Arc, /// Row group index for cache key. row_group_idx: usize, /// Optional metrics for tracking fetch operations. fetch_metrics: Option, } -impl SstAsyncFileReader { - /// Creates a new [SstAsyncFileReader]. - pub fn new( +impl SstParquetRangeFetcher { + /// Creates a new [SstParquetRangeFetcher]. + pub(crate) fn new( region_file_id: RegionFileId, file_path: String, object_store: ObjectStore, cache_strategy: CacheStrategy, - metadata: Arc, row_group_idx: usize, + fetch_metrics: Option, ) -> Self { Self { region_file_id, file_path, object_store, cache_strategy, - metadata, row_group_idx, - fetch_metrics: None, + fetch_metrics, } } - /// Sets the fetch metrics. - pub fn with_fetch_metrics(mut self, metrics: Option) -> Self { - self.fetch_metrics = metrics; - self - } - /// Fetches byte ranges from page cache, write cache, or object store. - async fn fetch_bytes_with_cache(&self, ranges: Vec>) -> ParquetResult> { + async fn fetch_bytes_with_cache(&self, ranges: Vec>) -> Result> { let fetch_start = self .fetch_metrics .as_ref() @@ -123,7 +120,10 @@ impl SstAsyncFileReader { .fetch_metrics .as_ref() .map(|_| std::time::Instant::now()); - let write_cache_result = self.fetch_ranges_from_write_cache(key, &ranges).await; + let write_cache_result = match self.cache_strategy.write_cache() { + Some(cache) => cache.file_cache().read_ranges(key, &ranges).await, + None => None, + }; let pages = match write_cache_result { Some(data) => { @@ -153,7 +153,7 @@ impl SstAsyncFileReader { .map(|_| std::time::Instant::now()); let data = fetch_byte_ranges(&self.file_path, self.object_store.clone(), &ranges) .await - .map_err(|e| ParquetError::External(Box::new(e)))?; + .context(OpenDalSnafu)?; if let Some(metrics) = &self.fetch_metrics { let elapsed = start.map(|start| start.elapsed()).unwrap_or_default(); @@ -180,42 +180,43 @@ impl SstAsyncFileReader { Ok(pages) } - - /// Fetches data from write cache. - /// Returns `None` if the data is not in the cache. - async fn fetch_ranges_from_write_cache( - &self, - key: IndexKey, - ranges: &[Range], - ) -> Option> { - if let Some(cache) = self.cache_strategy.write_cache() { - return cache.file_cache().read_ranges(key, ranges).await; - } - None - } } -impl AsyncFileReader for SstAsyncFileReader { - fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, ParquetResult> { - async move { - let mut result = self.fetch_bytes_with_cache(vec![range]).await?; - Ok(result.pop().unwrap_or_default()) +/// Builds a parquet record batch stream driven directly by [ParquetPushDecoderBuilder]. +pub(crate) fn build_sst_parquet_record_batch_stream( + arrow_metadata: ArrowReaderMetadata, + row_group_idx: usize, + row_selection: Option, + projection: ProjectionMask, + fetcher: SstParquetRangeFetcher, + file_path: String, +) -> Result>> { + let mut builder = ParquetPushDecoderBuilder::new_with_metadata(arrow_metadata) + .with_row_groups(vec![row_group_idx]) + .with_projection(projection) + .with_batch_size(DEFAULT_READ_BATCH_SIZE); + + if let Some(selection) = row_selection { + builder = builder.with_row_selection(selection); + } + + let mut decoder = builder + .build() + .context(ReadParquetSnafu { path: &file_path })?; + + Ok(async_stream::try_stream! { + loop { + match decoder.try_decode().context(ReadParquetSnafu { path: &file_path })? { + DecodeResult::NeedsData(ranges) => { + let data = fetcher.fetch_bytes_with_cache(ranges.clone()).await?; + decoder + .push_ranges(ranges, data) + .context(ReadParquetSnafu { path: &file_path })?; + } + DecodeResult::Data(batch) => yield batch, + DecodeResult::Finished => break, + } } - .boxed() - } - - fn get_byte_ranges( - &mut self, - ranges: Vec>, - ) -> BoxFuture<'_, ParquetResult>> { - async move { self.fetch_bytes_with_cache(ranges).await }.boxed() - } - - fn get_metadata( - &mut self, - _options: Option<&parquet::arrow::arrow_reader::ArrowReaderOptions>, - ) -> BoxFuture<'_, ParquetResult>> { - // Metadata is already cached, return it immediately. - std::future::ready(Ok(self.metadata.clone())).boxed() } + .boxed()) } diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index f03856a521..26d5cb7b61 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -39,7 +39,6 @@ use mito_codec::row_converter::build_primary_key_codec; use object_store::ObjectStore; use parquet::arrow::ProjectionMask; use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions, RowSelection}; -use parquet::arrow::async_reader::{ParquetRecordBatchStream, ParquetRecordBatchStreamBuilder}; use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData}; use partition::expr::PartitionExpr; use snafu::ResultExt; @@ -49,12 +48,12 @@ use store_api::region_request::PathType; use store_api::storage::{ColumnId, FileId}; use table::predicate::Predicate; -use self::stream::{NestedSchemaAligner, ParquetErrorAdapter, ProjectedRecordBatchStream}; +use self::stream::{NestedSchemaAligner, ProjectedRecordBatchStream}; use crate::cache::index::result_cache::PredicateKey; use crate::cache::{CacheStrategy, CachedSstMeta}; #[cfg(feature = "vector_index")] use crate::error::ApplyVectorIndexSnafu; -use crate::error::{ReadDataPartSnafu, ReadParquetSnafu, Result, SerializePartitionExprSnafu}; +use crate::error::{ReadDataPartSnafu, Result, SerializePartitionExprSnafu}; use crate::metrics::{ PRECISE_FILTER_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_TOTAL, READ_STAGE_ELAPSED, @@ -75,7 +74,6 @@ use crate::sst::index::inverted_index::applier::{ #[cfg(feature = "vector_index")] use crate::sst::index::vector_index::applier::VectorIndexApplierRef; use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE; -use crate::sst::parquet::async_reader::SstAsyncFileReader; use crate::sst::parquet::file_range::{ FileRangeContext, FileRangeContextRef, PartitionFilterContext, PreFilterMode, RangeBase, }; @@ -85,6 +83,9 @@ use crate::sst::parquet::metadata::MetadataLoader; use crate::sst::parquet::prefilter::{ PrefilterContextBuilder, build_reader_filter_plan, execute_prefilter, }; +use crate::sst::parquet::push_decoder::{ + SstParquetRangeFetcher, build_sst_parquet_record_batch_stream, +}; use crate::sst::parquet::read_columns::{ProjectionMaskPlan, build_projection_plan}; use crate::sst::parquet::row_group::ParquetFetchMetrics; use crate::sst::parquet::row_selection::RowGroupSelection; @@ -1615,7 +1616,7 @@ impl ReaderMetrics { } } -/// Builder to build a [ParquetRecordBatchStream] for a row group. +/// Builder to build a parquet record batch stream for a row group. pub(crate) struct RowGroupReaderBuilder { /// SST file to read. /// @@ -1675,7 +1676,7 @@ impl RowGroupReaderBuilder { self.prefilter_builder.is_some() } - /// Builds a [ParquetRecordBatchStream] to read the row group at `row_group_idx`. + /// Builds a parquet record batch stream to read the row group at `row_group_idx`. /// /// If prefiltering is applicable (based on `build_ctx`), this performs a two-phase read: /// 1. Reads only the prefilter columns (e.g. PK column), applies filters to get a refined row selection @@ -1735,11 +1736,10 @@ impl RowGroupReaderBuilder { fn make_projected_stream( &self, - stream: ParquetRecordBatchStream, + stream: ProjectedRecordBatchStream, ) -> Result { - let stream = ParquetErrorAdapter::new(stream, self.file_path.clone()); if !self.has_nested_projection { - return Ok(stream.boxed()); + return Ok(stream); } Ok(NestedSchemaAligner::new( @@ -1750,44 +1750,31 @@ impl RowGroupReaderBuilder { .boxed()) } - /// Builds a [ParquetRecordBatchStream] with a custom projection mask. + /// Builds a parquet record batch stream with a custom projection mask. pub(crate) async fn build_with_projection( &self, row_group_idx: usize, row_selection: Option, projection: ProjectionMask, fetch_metrics: Option<&ParquetFetchMetrics>, - ) -> Result> { - // Create async file reader with caching support. - let async_reader = SstAsyncFileReader::new( + ) -> Result { + let range_fetcher = SstParquetRangeFetcher::new( self.file_handle.file_id(), self.file_path.clone(), self.object_store.clone(), self.cache_strategy.clone(), - self.parquet_meta.clone(), row_group_idx, - ) - .with_fetch_metrics(fetch_metrics.cloned()); - - // Build the async stream using ArrowReaderBuilder API. - let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata( - async_reader, - self.arrow_metadata.clone(), + fetch_metrics.cloned(), ); - builder = builder - .with_row_groups(vec![row_group_idx]) - .with_projection(projection) - .with_batch_size(DEFAULT_READ_BATCH_SIZE); - if let Some(selection) = row_selection { - builder = builder.with_row_selection(selection); - } - - let stream = builder.build().context(ReadParquetSnafu { - path: &self.file_path, - })?; - - Ok(stream) + build_sst_parquet_record_batch_stream( + self.arrow_metadata.clone(), + row_group_idx, + row_selection, + projection, + range_fetcher, + self.file_path.clone(), + ) } } diff --git a/src/mito2/src/sst/parquet/reader/stream.rs b/src/mito2/src/sst/parquet/reader/stream.rs index f6211df475..1a17d6be64 100644 --- a/src/mito2/src/sst/parquet/reader/stream.rs +++ b/src/mito2/src/sst/parquet/reader/stream.rs @@ -22,13 +22,9 @@ use datatypes::arrow::datatypes::{DataType, FieldRef, SchemaRef}; use datatypes::arrow::record_batch::RecordBatch; use futures::Stream; use futures::stream::BoxStream; -use parquet::arrow::async_reader::ParquetRecordBatchStream; -use snafu::{IntoError, ResultExt, ensure}; +use snafu::{ResultExt, ensure}; -use crate::error::{ - CastColumnSnafu, NewRecordBatchSnafu, ReadParquetSnafu, Result, UnexpectedSnafu, -}; -use crate::sst::parquet::async_reader::SstAsyncFileReader; +use crate::error::{CastColumnSnafu, NewRecordBatchSnafu, Result, UnexpectedSnafu}; /// Aligns projected batches to the expected output schema for nested projections. /// @@ -208,37 +204,6 @@ fn align_array(array: &ArrayRef, field: &FieldRef) -> Result { cast_column(array, field.as_ref(), &DEFAULT_CAST_OPTIONS).context(CastColumnSnafu) } -/// Maps parquet stream errors into mito errors before batches enter the filler. -pub(crate) struct ParquetErrorAdapter { - inner: ParquetRecordBatchStream, - path: String, -} - -impl ParquetErrorAdapter { - pub(crate) fn new(inner: ParquetRecordBatchStream, path: String) -> Self { - Self { inner, path } - } -} - -impl Stream for ParquetErrorAdapter { - type Item = Result; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.get_mut(); - - match Pin::new(&mut this.inner).poll_next(cx) { - Poll::Ready(Some(Ok(rb))) => Poll::Ready(Some(Ok(rb))), - Poll::Ready(Some(Err(err))) => { - Poll::Ready(Some(Err( - ReadParquetSnafu { path: &this.path }.into_error(err) - ))) - } - Poll::Ready(None) => Poll::Ready(None), - Poll::Pending => Poll::Pending, - } - } -} - #[cfg(test)] mod tests { use std::sync::Arc;