refactor: remove async file reader adapter layer (#8120)

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Author: Ruihang Xia
Date: 2026-05-15 20:58:31 +08:00
Committed by: GitHub
Parent: 7840aa1bb4
Commit: 3cbd15c1a9

5 changed files with 88 additions and 137 deletions


@@ -24,13 +24,13 @@ use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
 use crate::sst::file::FileTimeRange;
 use crate::sst::index::IndexOutput;
 
-pub(crate) mod async_reader;
 pub mod file_range;
 pub mod flat_format;
 pub mod format;
 pub(crate) mod helper;
 pub(crate) mod metadata;
 pub mod prefilter;
+pub(crate) mod push_decoder;
 pub mod read_columns;
 pub mod reader;
 pub mod row_group;


@@ -39,7 +39,7 @@ use table::predicate::Predicate;
 use crate::error::{
     ComputeArrowSnafu, DecodeSnafu, EvalPartitionFilterSnafu, NewRecordBatchSnafu,
-    ReadParquetSnafu, RecordBatchSnafu, Result, UnexpectedSnafu,
+    RecordBatchSnafu, Result, UnexpectedSnafu,
 };
 use crate::sst::parquet::file_range::PreFilterMode;
 use crate::sst::parquet::flat_format::FlatReadFormat;
@@ -582,9 +582,7 @@ pub(crate) async fn execute_prefilter(
     let mut rows_selected = 0usize;
     while let Some(batch_result) = stream.next().await {
-        let batch = batch_result.context(ReadParquetSnafu {
-            path: reader_builder.file_path(),
-        })?;
+        let batch = batch_result?;
         let num_rows = batch.num_rows();
         if num_rows == 0 {
             continue;
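
Note: the bare `?` works here because the stream built by `build_sst_parquet_record_batch_stream` (below) now yields mito `Result<RecordBatch>` items with the file path already attached, instead of `parquet::errors::Result` items that every caller had to wrap. A standalone sketch of the consuming shape, where `Error` and `Batch` are placeholders for mito's error and `RecordBatch` types:

```rust
use futures::StreamExt;
use futures::stream::BoxStream;

// Placeholder types; only the shape of the loop matters here.
type Error = Box<dyn std::error::Error + Send + Sync>;
type Batch = Vec<u64>;

/// Drains a stream whose items are already `Result<Batch, Error>`: a bare `?`
/// propagates errors that were contextualized at the stream's source.
async fn count_rows(mut stream: BoxStream<'static, Result<Batch, Error>>) -> Result<usize, Error> {
    let mut rows = 0;
    while let Some(item) = stream.next().await {
        let batch = item?; // no per-call-site ReadParquetSnafu wrapping needed
        rows += batch.len();
    }
    Ok(rows)
}
```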


@@ -12,31 +12,37 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-//! Async file reader implementation for SST parquet files.
+//! Push decoder stream implementation for SST parquet files.
 
 use std::ops::Range;
-use std::sync::Arc;
 
 use bytes::Bytes;
-use futures::FutureExt;
-use futures::future::BoxFuture;
+use datatypes::arrow::record_batch::RecordBatch;
+use futures::StreamExt;
+use futures::stream::BoxStream;
 use object_store::ObjectStore;
-use parquet::arrow::async_reader::AsyncFileReader;
-use parquet::errors::{ParquetError, Result as ParquetResult};
-use parquet::file::metadata::ParquetMetaData;
+use parquet::DecodeResult;
+use parquet::arrow::ProjectionMask;
+use parquet::arrow::arrow_reader::{ArrowReaderMetadata, RowSelection};
+use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
+use snafu::ResultExt;
 
 use crate::cache::file_cache::{FileType, IndexKey};
 use crate::cache::{CacheStrategy, PageKey, PageValue};
+use crate::error::{OpenDalSnafu, ReadParquetSnafu, Result};
 use crate::metrics::{READ_STAGE_ELAPSED, READ_STAGE_FETCH_PAGES};
 use crate::sst::file::RegionFileId;
+use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
 use crate::sst::parquet::helper::fetch_byte_ranges;
 use crate::sst::parquet::row_group::{ParquetFetchMetrics, compute_total_range_size};
 
-/// An [AsyncFileReader] implementation for SST parquet files.
+/// Fetches parquet byte ranges through Greptime's cache hierarchy.
 ///
-/// This reader provides async byte access to parquet data in object storage,
-/// with caching support (page cache and write cache).
-pub struct SstAsyncFileReader {
+/// The push decoder decides which ranges are required for decoding, while this
+/// fetcher keeps cache lookup, local write-cache reads, and remote I/O explicit
+/// in Greptime code.
+pub(crate) struct SstParquetRangeFetcher {
     /// Region file ID for cache key.
     region_file_id: RegionFileId,
     /// Path to the parquet file in object storage.
@@ -45,43 +51,34 @@ pub struct SstAsyncFileReader {
     object_store: ObjectStore,
     /// Cache strategy for reading pages.
     cache_strategy: CacheStrategy,
-    /// Cached parquet metadata.
-    metadata: Arc<ParquetMetaData>,
     /// Row group index for cache key.
     row_group_idx: usize,
     /// Optional metrics for tracking fetch operations.
    fetch_metrics: Option<ParquetFetchMetrics>,
 }
 
-impl SstAsyncFileReader {
-    /// Creates a new [SstAsyncFileReader].
-    pub fn new(
+impl SstParquetRangeFetcher {
+    /// Creates a new [SstParquetRangeFetcher].
+    pub(crate) fn new(
         region_file_id: RegionFileId,
         file_path: String,
         object_store: ObjectStore,
         cache_strategy: CacheStrategy,
-        metadata: Arc<ParquetMetaData>,
         row_group_idx: usize,
+        fetch_metrics: Option<ParquetFetchMetrics>,
     ) -> Self {
         Self {
             region_file_id,
             file_path,
             object_store,
             cache_strategy,
-            metadata,
             row_group_idx,
-            fetch_metrics: None,
+            fetch_metrics,
         }
     }
 
-    /// Sets the fetch metrics.
-    pub fn with_fetch_metrics(mut self, metrics: Option<ParquetFetchMetrics>) -> Self {
-        self.fetch_metrics = metrics;
-        self
-    }
-
     /// Fetches byte ranges from page cache, write cache, or object store.
-    async fn fetch_bytes_with_cache(&self, ranges: Vec<Range<u64>>) -> ParquetResult<Vec<Bytes>> {
+    async fn fetch_bytes_with_cache(&self, ranges: Vec<Range<u64>>) -> Result<Vec<Bytes>> {
         let fetch_start = self
             .fetch_metrics
             .as_ref()
@@ -123,7 +120,10 @@ impl SstAsyncFileReader {
             .fetch_metrics
             .as_ref()
             .map(|_| std::time::Instant::now());
-        let write_cache_result = self.fetch_ranges_from_write_cache(key, &ranges).await;
+        let write_cache_result = match self.cache_strategy.write_cache() {
+            Some(cache) => cache.file_cache().read_ranges(key, &ranges).await,
+            None => None,
+        };
 
         let pages = match write_cache_result {
             Some(data) => {
@@ -153,7 +153,7 @@
             .map(|_| std::time::Instant::now());
         let data = fetch_byte_ranges(&self.file_path, self.object_store.clone(), &ranges)
             .await
-            .map_err(|e| ParquetError::External(Box::new(e)))?;
+            .context(OpenDalSnafu)?;
 
         if let Some(metrics) = &self.fetch_metrics {
             let elapsed = start.map(|start| start.elapsed()).unwrap_or_default();
@@ -180,42 +180,43 @@
         Ok(pages)
     }
-
-    /// Fetches data from write cache.
-    /// Returns `None` if the data is not in the cache.
-    async fn fetch_ranges_from_write_cache(
-        &self,
-        key: IndexKey,
-        ranges: &[Range<u64>],
-    ) -> Option<Vec<Bytes>> {
-        if let Some(cache) = self.cache_strategy.write_cache() {
-            return cache.file_cache().read_ranges(key, ranges).await;
-        }
-        None
-    }
 }
 
-impl AsyncFileReader for SstAsyncFileReader {
-    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, ParquetResult<Bytes>> {
-        async move {
-            let mut result = self.fetch_bytes_with_cache(vec![range]).await?;
-            Ok(result.pop().unwrap_or_default())
-        }
-        .boxed()
-    }
-
-    fn get_byte_ranges(
-        &mut self,
-        ranges: Vec<Range<u64>>,
-    ) -> BoxFuture<'_, ParquetResult<Vec<Bytes>>> {
-        async move { self.fetch_bytes_with_cache(ranges).await }.boxed()
-    }
-
-    fn get_metadata(
-        &mut self,
-        _options: Option<&parquet::arrow::arrow_reader::ArrowReaderOptions>,
-    ) -> BoxFuture<'_, ParquetResult<Arc<ParquetMetaData>>> {
-        // Metadata is already cached, return it immediately.
-        std::future::ready(Ok(self.metadata.clone())).boxed()
-    }
+/// Builds a parquet record batch stream driven directly by [ParquetPushDecoderBuilder].
+pub(crate) fn build_sst_parquet_record_batch_stream(
+    arrow_metadata: ArrowReaderMetadata,
+    row_group_idx: usize,
+    row_selection: Option<RowSelection>,
+    projection: ProjectionMask,
+    fetcher: SstParquetRangeFetcher,
+    file_path: String,
+) -> Result<BoxStream<'static, Result<RecordBatch>>> {
+    let mut builder = ParquetPushDecoderBuilder::new_with_metadata(arrow_metadata)
+        .with_row_groups(vec![row_group_idx])
+        .with_projection(projection)
+        .with_batch_size(DEFAULT_READ_BATCH_SIZE);
+    if let Some(selection) = row_selection {
+        builder = builder.with_row_selection(selection);
+    }
+    let mut decoder = builder
+        .build()
+        .context(ReadParquetSnafu { path: &file_path })?;
+
+    Ok(async_stream::try_stream! {
+        loop {
+            match decoder.try_decode().context(ReadParquetSnafu { path: &file_path })? {
+                DecodeResult::NeedsData(ranges) => {
+                    let data = fetcher.fetch_bytes_with_cache(ranges.clone()).await?;
+                    decoder
+                        .push_ranges(ranges, data)
+                        .context(ReadParquetSnafu { path: &file_path })?;
+                }
+                DecodeResult::Data(batch) => yield batch,
+                DecodeResult::Finished => break,
+            }
+        }
+    }
+    .boxed())
 }
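
Note: the stream above is an instance of the push-decoder protocol: the decoder announces which byte ranges it still needs, the caller fetches them however it likes, and pushes them back until record batches come out. Below is a condensed sketch of that loop, using only the decoder calls that appear in this diff (`try_decode`, `push_ranges`, `DecodeResult`), with a caller-supplied `fetch` closure standing in for `SstParquetRangeFetcher` and plain `parquet` errors in place of mito's error context:

```rust
use std::future::Future;
use std::ops::Range;

use arrow::record_batch::RecordBatch;
use bytes::Bytes;
use parquet::DecodeResult;
use parquet::arrow::arrow_reader::ArrowReaderMetadata;
use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
use parquet::errors::Result;

/// Drives one row group through the push decoder, delegating all I/O to
/// `fetch`. Same NeedsData/Data/Finished loop as the stream above, minus
/// caching, metrics, and error-path context.
async fn decode_row_group<F, Fut>(
    metadata: ArrowReaderMetadata,
    row_group_idx: usize,
    mut fetch: F,
) -> Result<Vec<RecordBatch>>
where
    F: FnMut(Vec<Range<u64>>) -> Fut,
    Fut: Future<Output = Result<Vec<Bytes>>>,
{
    let mut decoder = ParquetPushDecoderBuilder::new_with_metadata(metadata)
        .with_row_groups(vec![row_group_idx])
        .build()?;
    let mut batches = Vec::new();
    loop {
        match decoder.try_decode()? {
            // The decoder names exactly the byte ranges it still needs.
            DecodeResult::NeedsData(ranges) => {
                let data = fetch(ranges.clone()).await?;
                decoder.push_ranges(ranges, data)?;
            }
            // A fully decoded batch is ready.
            DecodeResult::Data(batch) => batches.push(batch),
            DecodeResult::Finished => break,
        }
    }
    Ok(batches)
}
```

Inverting control this way is what lets the commit delete the `AsyncFileReader` adapter: I/O happens in ordinary Greptime code rather than behind a trait the parquet crate drives.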


@@ -39,7 +39,6 @@ use mito_codec::row_converter::build_primary_key_codec;
 use object_store::ObjectStore;
 use parquet::arrow::ProjectionMask;
 use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions, RowSelection};
-use parquet::arrow::async_reader::{ParquetRecordBatchStream, ParquetRecordBatchStreamBuilder};
 use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData};
 use partition::expr::PartitionExpr;
 use snafu::ResultExt;
@@ -49,12 +48,12 @@ use store_api::region_request::PathType;
 use store_api::storage::{ColumnId, FileId};
 use table::predicate::Predicate;
 
-use self::stream::{NestedSchemaAligner, ParquetErrorAdapter, ProjectedRecordBatchStream};
+use self::stream::{NestedSchemaAligner, ProjectedRecordBatchStream};
 use crate::cache::index::result_cache::PredicateKey;
 use crate::cache::{CacheStrategy, CachedSstMeta};
 #[cfg(feature = "vector_index")]
 use crate::error::ApplyVectorIndexSnafu;
-use crate::error::{ReadDataPartSnafu, ReadParquetSnafu, Result, SerializePartitionExprSnafu};
+use crate::error::{ReadDataPartSnafu, Result, SerializePartitionExprSnafu};
 use crate::metrics::{
     PRECISE_FILTER_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL,
     READ_ROWS_TOTAL, READ_STAGE_ELAPSED,
@@ -75,7 +74,6 @@ use crate::sst::index::inverted_index::applier::{
 #[cfg(feature = "vector_index")]
 use crate::sst::index::vector_index::applier::VectorIndexApplierRef;
 use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
-use crate::sst::parquet::async_reader::SstAsyncFileReader;
 use crate::sst::parquet::file_range::{
     FileRangeContext, FileRangeContextRef, PartitionFilterContext, PreFilterMode, RangeBase,
 };
@@ -85,6 +83,9 @@ use crate::sst::parquet::metadata::MetadataLoader;
 use crate::sst::parquet::prefilter::{
     PrefilterContextBuilder, build_reader_filter_plan, execute_prefilter,
 };
+use crate::sst::parquet::push_decoder::{
+    SstParquetRangeFetcher, build_sst_parquet_record_batch_stream,
+};
 use crate::sst::parquet::read_columns::{ProjectionMaskPlan, build_projection_plan};
 use crate::sst::parquet::row_group::ParquetFetchMetrics;
 use crate::sst::parquet::row_selection::RowGroupSelection;
@@ -1615,7 +1616,7 @@ impl ReaderMetrics {
     }
 }
 
-/// Builder to build a [ParquetRecordBatchStream] for a row group.
+/// Builder to build a parquet record batch stream for a row group.
 pub(crate) struct RowGroupReaderBuilder {
     /// SST file to read.
     ///
@@ -1675,7 +1676,7 @@ impl RowGroupReaderBuilder {
         self.prefilter_builder.is_some()
     }
 
-    /// Builds a [ParquetRecordBatchStream] to read the row group at `row_group_idx`.
+    /// Builds a parquet record batch stream to read the row group at `row_group_idx`.
     ///
     /// If prefiltering is applicable (based on `build_ctx`), this performs a two-phase read:
     /// 1. Reads only the prefilter columns (e.g. PK column), applies filters to get a refined row selection
@@ -1735,11 +1736,10 @@ impl RowGroupReaderBuilder {
 
     fn make_projected_stream(
         &self,
-        stream: ParquetRecordBatchStream<SstAsyncFileReader>,
+        stream: ProjectedRecordBatchStream,
     ) -> Result<ProjectedRecordBatchStream> {
-        let stream = ParquetErrorAdapter::new(stream, self.file_path.clone());
         if !self.has_nested_projection {
-            return Ok(stream.boxed());
+            return Ok(stream);
        }
 
         Ok(NestedSchemaAligner::new(
@@ -1750,44 +1750,31 @@ impl RowGroupReaderBuilder {
         .boxed())
     }
 
-    /// Builds a [ParquetRecordBatchStream] with a custom projection mask.
+    /// Builds a parquet record batch stream with a custom projection mask.
     pub(crate) async fn build_with_projection(
         &self,
         row_group_idx: usize,
         row_selection: Option<RowSelection>,
         projection: ProjectionMask,
         fetch_metrics: Option<&ParquetFetchMetrics>,
-    ) -> Result<ParquetRecordBatchStream<SstAsyncFileReader>> {
-        // Create async file reader with caching support.
-        let async_reader = SstAsyncFileReader::new(
+    ) -> Result<ProjectedRecordBatchStream> {
+        let range_fetcher = SstParquetRangeFetcher::new(
             self.file_handle.file_id(),
             self.file_path.clone(),
             self.object_store.clone(),
             self.cache_strategy.clone(),
-            self.parquet_meta.clone(),
             row_group_idx,
-        )
-        .with_fetch_metrics(fetch_metrics.cloned());
-
-        // Build the async stream using ArrowReaderBuilder API.
-        let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata(
-            async_reader,
-            self.arrow_metadata.clone(),
+            fetch_metrics.cloned(),
         );
-        builder = builder
-            .with_row_groups(vec![row_group_idx])
-            .with_projection(projection)
-            .with_batch_size(DEFAULT_READ_BATCH_SIZE);
-        if let Some(selection) = row_selection {
-            builder = builder.with_row_selection(selection);
-        }
-        let stream = builder.build().context(ReadParquetSnafu {
-            path: &self.file_path,
-        })?;
-        Ok(stream)
+
+        build_sst_parquet_record_batch_stream(
+            self.arrow_metadata.clone(),
+            row_group_idx,
+            row_selection,
+            projection,
+            range_fetcher,
+            self.file_path.clone(),
        )
     }
 }


@@ -22,13 +22,9 @@ use datatypes::arrow::datatypes::{DataType, FieldRef, SchemaRef};
 use datatypes::arrow::record_batch::RecordBatch;
 use futures::Stream;
 use futures::stream::BoxStream;
-use parquet::arrow::async_reader::ParquetRecordBatchStream;
-use snafu::{IntoError, ResultExt, ensure};
+use snafu::{ResultExt, ensure};
 
-use crate::error::{
-    CastColumnSnafu, NewRecordBatchSnafu, ReadParquetSnafu, Result, UnexpectedSnafu,
-};
-use crate::sst::parquet::async_reader::SstAsyncFileReader;
+use crate::error::{CastColumnSnafu, NewRecordBatchSnafu, Result, UnexpectedSnafu};
 
 /// Aligns projected batches to the expected output schema for nested projections.
 ///
@@ -208,37 +204,6 @@ fn align_array(array: &ArrayRef, field: &FieldRef) -> Result<ArrayRef> {
     cast_column(array, field.as_ref(), &DEFAULT_CAST_OPTIONS).context(CastColumnSnafu)
 }
 
-/// Maps parquet stream errors into mito errors before batches enter the filler.
-pub(crate) struct ParquetErrorAdapter {
-    inner: ParquetRecordBatchStream<SstAsyncFileReader>,
-    path: String,
-}
-
-impl ParquetErrorAdapter {
-    pub(crate) fn new(inner: ParquetRecordBatchStream<SstAsyncFileReader>, path: String) -> Self {
-        Self { inner, path }
-    }
-}
-
-impl Stream for ParquetErrorAdapter {
-    type Item = Result<RecordBatch>;
-
-    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        let this = self.get_mut();
-        match Pin::new(&mut this.inner).poll_next(cx) {
-            Poll::Ready(Some(Ok(rb))) => Poll::Ready(Some(Ok(rb))),
-            Poll::Ready(Some(Err(err))) => {
-                Poll::Ready(Some(Err(
-                    ReadParquetSnafu { path: &this.path }.into_error(err)
-                )))
-            }
-            Poll::Ready(None) => Poll::Ready(None),
-            Poll::Pending => Poll::Pending,
-        }
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use std::sync::Arc;
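
Note: the deleted `ParquetErrorAdapter` was a hand-written `Stream` whose only job was converting `ParquetError` into mito's error type at the stream boundary. For reference, the same boundary conversion can be expressed as a `TryStreamExt::map_err` one-liner; this is a generic sketch, not GreptimeDB code, and the commit makes even it unnecessary by producing mito errors inside the push-decoder stream itself:

```rust
use futures::TryStreamExt;
use futures::stream::BoxStream;

/// Generic sketch: convert a fallible stream's error type at the boundary,
/// replacing a hand-rolled `poll_next` adapter like the one removed above.
/// `T`, `E1`, `E2` stand in for RecordBatch and the two error types.
fn map_stream_errors<T, E1, E2>(
    inner: BoxStream<'static, Result<T, E1>>,
    convert: impl FnMut(E1) -> E2 + Send + 'static,
) -> BoxStream<'static, Result<T, E2>>
where
    T: Send + 'static,
    E2: Send + 'static,
{
    // `map_err` touches only the error arm; Ok batches pass through untouched.
    Box::pin(inner.map_err(convert))
}
```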