diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 923d8a2713..c6b69fe607 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -616,15 +616,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to read arrow record batch from parquet file {}", path))] - ArrowReader { - path: String, - #[snafu(source)] - error: ArrowError, - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Column not found, column: {column}"))] ColumnNotFound { column: String, @@ -1349,7 +1340,6 @@ impl ErrorExt for Error { RegionState { .. } | UpdateManifest { .. } => StatusCode::RegionNotReady, JsonOptions { .. } => StatusCode::InvalidArguments, EmptyRegionDir { .. } | EmptyManifestDir { .. } => StatusCode::RegionNotFound, - ArrowReader { .. } => StatusCode::StorageUnavailable, ConvertValue { source, .. } => source.status_code(), ApplyBloomFilterIndex { source, .. } => source.status_code(), InvalidPartitionExpr { source, .. } => source.status_code(), diff --git a/src/mito2/src/memtable/bulk.rs b/src/mito2/src/memtable/bulk.rs index e649681b76..502b61759d 100644 --- a/src/mito2/src/memtable/bulk.rs +++ b/src/mito2/src/memtable/bulk.rs @@ -14,6 +14,7 @@ //! Memtable implementation for bulk load +pub(crate) mod chunk_reader; #[allow(unused)] pub mod context; #[allow(unused)] diff --git a/src/mito2/src/memtable/bulk/chunk_reader.rs b/src/mito2/src/memtable/bulk/chunk_reader.rs new file mode 100644 index 0000000000..e632cd1b37 --- /dev/null +++ b/src/mito2/src/memtable/bulk/chunk_reader.rs @@ -0,0 +1,65 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! ChunkReader implementation for in-memory parquet bytes. + +use std::io::Cursor; + +use bytes::Bytes; +use parquet::errors::{ParquetError, Result}; +use parquet::file::reader::{ChunkReader, Length}; + +/// A [ChunkReader] implementation for in-memory parquet bytes. +/// +/// This provides byte access to parquet data stored in memory (Bytes), +/// used for reading parquet data from bulk memtable. +#[derive(Clone)] +pub struct MemtableChunkReader { + /// The in-memory parquet data. + data: Bytes, +} + +impl MemtableChunkReader { + /// Creates a new [MemtableChunkReader] from the given bytes. 
+ pub fn new(data: Bytes) -> Self { + Self { data } + } +} + +impl Length for MemtableChunkReader { + fn len(&self) -> u64 { + self.data.len() as u64 + } +} + +impl ChunkReader for MemtableChunkReader { + type T = Cursor; + + fn get_read(&self, start: u64) -> Result { + let start = start as usize; + if start > self.data.len() { + return Err(ParquetError::IndexOutOfBound(start, self.data.len())); + } + Ok(Cursor::new(self.data.slice(start..))) + } + + fn get_bytes(&self, start: u64, length: usize) -> Result { + let start = start as usize; + let end = start + length; + if end > self.data.len() { + return Err(ParquetError::IndexOutOfBound(end, self.data.len())); + } + Ok(self.data.slice(start..end)) + } +} diff --git a/src/mito2/src/memtable/bulk/part_reader.rs b/src/mito2/src/memtable/bulk/part_reader.rs index 904aae8c90..edb9ff52d9 100644 --- a/src/mito2/src/memtable/bulk/part_reader.rs +++ b/src/mito2/src/memtable/bulk/part_reader.rs @@ -30,7 +30,6 @@ use crate::memtable::{MemScanMetrics, MemScanMetricsData}; use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED}; use crate::sst::parquet::file_range::{PreFilterMode, TagDecodeState}; use crate::sst::parquet::flat_format::sequence_column_index; -use crate::sst::parquet::reader::RowGroupReaderContext; /// Iterator for reading data inside a bulk part. pub struct EncodedBulkPartIter { diff --git a/src/mito2/src/memtable/bulk/row_group_reader.rs b/src/mito2/src/memtable/bulk/row_group_reader.rs index fccd22db10..40a5b2f85d 100644 --- a/src/mito2/src/memtable/bulk/row_group_reader.rs +++ b/src/mito2/src/memtable/bulk/row_group_reader.rs @@ -12,124 +12,27 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::ops::Range; use std::sync::Arc; use bytes::Bytes; -use datatypes::arrow::array::RecordBatch; -use datatypes::arrow::error::ArrowError; -use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowGroups, RowSelection}; -use parquet::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels}; -use parquet::column::page::{PageIterator, PageReader}; -use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData}; +use parquet::arrow::ProjectionMask; +use parquet::arrow::arrow_reader::{ + ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader, + ParquetRecordBatchReaderBuilder, RowSelection, +}; +use parquet::file::metadata::ParquetMetaData; use snafu::ResultExt; use crate::error; use crate::error::ReadDataPartSnafu; +use crate::memtable::bulk::chunk_reader::MemtableChunkReader; use crate::memtable::bulk::context::BulkIterContextRef; use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE; -use crate::sst::parquet::format::ReadFormat; -use crate::sst::parquet::reader::RowGroupReaderContext; -use crate::sst::parquet::row_group::{ColumnChunkIterator, RowGroupBase}; - -/// Helper for reading specific row group inside Memtable Parquet parts. -// This is similar to [mito2::sst::parquet::row_group::InMemoryRowGroup] since -// it's a workaround for lacking of keyword generics. -pub struct MemtableRowGroupPageFetcher<'a> { - /// Shared structs for reading row group. - base: RowGroupBase<'a>, - bytes: Bytes, -} - -impl<'a> MemtableRowGroupPageFetcher<'a> { - pub(crate) fn create( - row_group_idx: usize, - parquet_meta: &'a ParquetMetaData, - bytes: Bytes, - ) -> Self { - Self { - // the cached `column_uncompressed_pages` would never be used in Memtable readers. - base: RowGroupBase::new(parquet_meta, row_group_idx), - bytes, - } - } - - /// Fetches column pages from memory file. 
- pub(crate) fn fetch(&mut self, projection: &ProjectionMask, selection: Option<&RowSelection>) { - if let Some((selection, offset_index)) = selection.zip(self.base.offset_index) { - // Selection provided. - let (fetch_ranges, page_start_offsets) = - self.base - .calc_sparse_read_ranges(projection, offset_index, selection); - if fetch_ranges.is_empty() { - return; - } - let chunk_data = self.fetch_bytes(&fetch_ranges); - - self.base - .assign_sparse_chunk(projection, chunk_data, page_start_offsets); - } else { - let fetch_ranges = self.base.calc_dense_read_ranges(projection); - if fetch_ranges.is_empty() { - // Nothing to fetch. - return; - } - let chunk_data = self.fetch_bytes(&fetch_ranges); - self.base.assign_dense_chunk(projection, chunk_data); - } - } - - fn fetch_bytes(&self, ranges: &[Range]) -> Vec { - ranges - .iter() - .map(|range| self.bytes.slice(range.start as usize..range.end as usize)) - .collect() - } - - /// Creates a page reader to read column at `i`. - fn column_page_reader(&self, i: usize) -> parquet::errors::Result> { - let reader = self.base.column_reader(i)?; - Ok(Box::new(reader)) - } -} - -impl RowGroups for MemtableRowGroupPageFetcher<'_> { - fn num_rows(&self) -> usize { - self.base.row_count - } - - fn column_chunks(&self, i: usize) -> parquet::errors::Result> { - Ok(Box::new(ColumnChunkIterator { - reader: Some(self.column_page_reader(i)), - })) - } - - fn row_groups(&self) -> Box + '_> { - Box::new(std::iter::once(self.base.row_group_metadata())) - } - - fn metadata(&self) -> &ParquetMetaData { - self.base.parquet_metadata() - } -} - -impl RowGroupReaderContext for BulkIterContextRef { - fn map_result( - &self, - result: Result, ArrowError>, - ) -> error::Result> { - result.context(error::DecodeArrowRowGroupSnafu) - } - - fn read_format(&self) -> &ReadFormat { - self.as_ref().read_format() - } -} pub(crate) struct MemtableRowGroupReaderBuilder { projection: ProjectionMask, parquet_metadata: Arc, - field_levels: FieldLevels, + 
arrow_metadata: ArrowReaderMetadata, data: Bytes, } @@ -140,15 +43,16 @@ impl MemtableRowGroupReaderBuilder { parquet_metadata: Arc, data: Bytes, ) -> error::Result { - let parquet_schema_desc = parquet_metadata.file_metadata().schema_descr(); - let hint = Some(context.read_format().arrow_schema().fields()); - let field_levels = - parquet_to_arrow_field_levels(parquet_schema_desc, projection.clone(), hint) + // Create ArrowReaderMetadata for building the reader. + let arrow_reader_options = + ArrowReaderOptions::new().with_schema(context.read_format().arrow_schema().clone()); + let arrow_metadata = + ArrowReaderMetadata::try_new(parquet_metadata.clone(), arrow_reader_options) .context(ReadDataPartSnafu)?; Ok(Self { projection, parquet_metadata, - field_levels, + arrow_metadata, data, }) } @@ -159,23 +63,21 @@ impl MemtableRowGroupReaderBuilder { row_group_idx: usize, row_selection: Option, ) -> error::Result { - let mut row_group = MemtableRowGroupPageFetcher::create( - row_group_idx, - &self.parquet_metadata, - self.data.clone(), - ); - // Fetches data from memory part. Currently, row selection is not supported. - row_group.fetch(&self.projection, row_selection.as_ref()); + let chunk_reader = MemtableChunkReader::new(self.data.clone()); - // Builds the parquet reader. - // Now the row selection is None. - ParquetRecordBatchReader::try_new_with_row_groups( - &self.field_levels, - &row_group, - DEFAULT_READ_BATCH_SIZE, - row_selection, + let mut builder = ParquetRecordBatchReaderBuilder::new_with_metadata( + chunk_reader, + self.arrow_metadata.clone(), ) - .context(ReadDataPartSnafu) + .with_row_groups(vec![row_group_idx]) + .with_projection(self.projection.clone()) + .with_batch_size(DEFAULT_READ_BATCH_SIZE); + + if let Some(selection) = row_selection { + builder = builder.with_row_selection(selection); + } + + builder.build().context(ReadDataPartSnafu) } /// Computes whether to skip field filters for a specific row group based on PreFilterMode. 
diff --git a/src/mito2/src/read/last_row.rs b/src/mito2/src/read/last_row.rs index 0c13c120a0..1dc4102311 100644 --- a/src/mito2/src/read/last_row.rs +++ b/src/mito2/src/read/last_row.rs @@ -333,10 +333,10 @@ impl FlatRowGroupLastRowCachedReader { } /// Returns the next RecordBatch. - pub(crate) fn next_batch(&mut self) -> Result> { + pub(crate) async fn next_batch(&mut self) -> Result> { match self { FlatRowGroupLastRowCachedReader::Hit(r) => r.next_batch(), - FlatRowGroupLastRowCachedReader::Miss(r) => r.next_batch(), + FlatRowGroupLastRowCachedReader::Miss(r) => r.next_batch().await, } } @@ -466,12 +466,12 @@ impl FlatRowGroupLastRowReader { Ok(Some(merged)) } - fn next_batch(&mut self) -> Result> { + async fn next_batch(&mut self) -> Result> { if self.pending.is_full() { return self.flush_pending(); } - while let Some(batch) = self.reader.next_batch()? { + while let Some(batch) = self.reader.next_batch().await? { self.selector.on_next(batch, &mut self.pending)?; if self.pending.is_full() { return self.flush_pending(); diff --git a/src/mito2/src/read/prune.rs b/src/mito2/src/read/prune.rs index 2f9fa002d4..6766bf3f38 100644 --- a/src/mito2/src/read/prune.rs +++ b/src/mito2/src/read/prune.rs @@ -247,10 +247,10 @@ pub enum FlatSource { } impl FlatSource { - fn next_batch(&mut self) -> Result> { + async fn next_batch(&mut self) -> Result> { match self { - FlatSource::RowGroup(r) => r.next_batch(), - FlatSource::LastRow(r) => r.next_batch(), + FlatSource::RowGroup(r) => r.next_batch().await, + FlatSource::LastRow(r) => r.next_batch().await, } } } @@ -297,13 +297,16 @@ impl FlatPruneReader { self.metrics.clone() } - pub(crate) fn next_batch(&mut self) -> Result> { - while let Some(record_batch) = { + pub(crate) async fn next_batch(&mut self) -> Result> { + loop { let start = std::time::Instant::now(); - let batch = self.source.next_batch()?; + let batch = self.source.next_batch().await?; self.metrics.scan_cost += start.elapsed(); - batch - } { + + let 
Some(record_batch) = batch else { + return Ok(None); + }; + // Update metrics for the received batch self.metrics.num_rows += record_batch.num_rows(); self.metrics.num_batches += 1; @@ -317,8 +320,6 @@ impl FlatPruneReader { } } } - - Ok(None) } /// Prunes batches by the pushed down predicate and returns RecordBatch. diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs index 6f68616709..9bf1c17276 100644 --- a/src/mito2/src/read/scan_util.rs +++ b/src/mito2/src/read/scan_util.rs @@ -1533,7 +1533,7 @@ pub fn build_flat_file_range_scan_stream( .transpose()?; let mapper = range.compaction_projection_mapper(); - while let Some(record_batch) = reader.next_batch()? { + while let Some(record_batch) = reader.next_batch().await? { let record_batch = if let Some(mapper) = mapper { let batch = mapper.project(record_batch)?; batch diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index fb8e1d1fc2..79a08a209d 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -24,6 +24,7 @@ use crate::sst::DEFAULT_WRITE_BUFFER_SIZE; use crate::sst::file::FileTimeRange; use crate::sst::index::IndexOutput; +pub(crate) mod async_reader; pub mod file_range; pub mod flat_format; pub mod format; diff --git a/src/mito2/src/sst/parquet/async_reader.rs b/src/mito2/src/sst/parquet/async_reader.rs new file mode 100644 index 0000000000..a060fd367d --- /dev/null +++ b/src/mito2/src/sst/parquet/async_reader.rs @@ -0,0 +1,221 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +//! Async file reader implementation for SST parquet files. + +use std::ops::Range; +use std::sync::Arc; + +use bytes::Bytes; +use futures::FutureExt; +use futures::future::BoxFuture; +use object_store::ObjectStore; +use parquet::arrow::async_reader::AsyncFileReader; +use parquet::errors::{ParquetError, Result as ParquetResult}; +use parquet::file::metadata::ParquetMetaData; + +use crate::cache::file_cache::{FileType, IndexKey}; +use crate::cache::{CacheStrategy, PageKey, PageValue}; +use crate::metrics::{READ_STAGE_ELAPSED, READ_STAGE_FETCH_PAGES}; +use crate::sst::file::RegionFileId; +use crate::sst::parquet::helper::fetch_byte_ranges; +use crate::sst::parquet::row_group::{ParquetFetchMetrics, compute_total_range_size}; + +/// An [AsyncFileReader] implementation for SST parquet files. +/// +/// This reader provides async byte access to parquet data in object storage, +/// with caching support (page cache and write cache). +pub struct SstAsyncFileReader { + /// Region file ID for cache key. + region_file_id: RegionFileId, + /// Path to the parquet file in object storage. + file_path: String, + /// Object store for reading data. + object_store: ObjectStore, + /// Cache strategy for reading pages. + cache_strategy: CacheStrategy, + /// Cached parquet metadata. + metadata: Arc, + /// Row group index for cache key. + row_group_idx: usize, + /// Optional metrics for tracking fetch operations. + fetch_metrics: Option, +} + +impl SstAsyncFileReader { + /// Creates a new [SstAsyncFileReader]. + pub fn new( + region_file_id: RegionFileId, + file_path: String, + object_store: ObjectStore, + cache_strategy: CacheStrategy, + metadata: Arc, + row_group_idx: usize, + ) -> Self { + Self { + region_file_id, + file_path, + object_store, + cache_strategy, + metadata, + row_group_idx, + fetch_metrics: None, + } + } + + /// Sets the fetch metrics. 
+ pub fn with_fetch_metrics(mut self, metrics: Option) -> Self { + self.fetch_metrics = metrics; + self + } + + /// Fetches byte ranges from page cache, write cache, or object store. + async fn fetch_bytes_with_cache(&self, ranges: Vec>) -> ParquetResult> { + let fetch_start = self + .fetch_metrics + .as_ref() + .map(|_| std::time::Instant::now()); + let _timer = READ_STAGE_FETCH_PAGES.start_timer(); + + let page_key = PageKey::new( + self.region_file_id.file_id(), + self.row_group_idx, + ranges.clone(), + ); + + // Check page cache first. + if let Some(pages) = self.cache_strategy.get_pages(&page_key) { + if let Some(metrics) = &self.fetch_metrics { + let total_size: u64 = ranges.iter().map(|r| r.end - r.start).sum(); + let mut metrics_data = metrics.data.lock().unwrap(); + metrics_data.page_cache_hit += 1; + metrics_data.pages_to_fetch_mem += ranges.len(); + metrics_data.page_size_to_fetch_mem += total_size; + metrics_data.page_size_needed += total_size; + if let Some(start) = fetch_start { + metrics_data.total_fetch_elapsed += start.elapsed(); + } + } + return Ok(pages.compressed.clone()); + } + + // Calculate total range size for metrics. + let (total_range_size, unaligned_size) = compute_total_range_size(&ranges); + + // Check write cache. 
+ let key = IndexKey::new( + self.region_file_id.region_id(), + self.region_file_id.file_id(), + FileType::Parquet, + ); + let fetch_write_cache_start = self + .fetch_metrics + .as_ref() + .map(|_| std::time::Instant::now()); + let write_cache_result = self.fetch_ranges_from_write_cache(key, &ranges).await; + + let pages = match write_cache_result { + Some(data) => { + if let Some(metrics) = &self.fetch_metrics { + let elapsed = fetch_write_cache_start + .map(|start| start.elapsed()) + .unwrap_or_default(); + let range_size_needed: u64 = ranges.iter().map(|r| r.end - r.start).sum(); + let mut metrics_data = metrics.data.lock().unwrap(); + metrics_data.write_cache_fetch_elapsed += elapsed; + metrics_data.write_cache_hit += 1; + metrics_data.pages_to_fetch_write_cache += ranges.len(); + metrics_data.page_size_to_fetch_write_cache += unaligned_size; + metrics_data.page_size_needed += range_size_needed; + } + data + } + None => { + // Fetch data from object store. + let _timer = READ_STAGE_ELAPSED + .with_label_values(&["cache_miss_read"]) + .start_timer(); + + let start = self + .fetch_metrics + .as_ref() + .map(|_| std::time::Instant::now()); + let data = fetch_byte_ranges(&self.file_path, self.object_store.clone(), &ranges) + .await + .map_err(|e| ParquetError::External(Box::new(e)))?; + + if let Some(metrics) = &self.fetch_metrics { + let elapsed = start.map(|start| start.elapsed()).unwrap_or_default(); + let range_size_needed: u64 = ranges.iter().map(|r| r.end - r.start).sum(); + let mut metrics_data = metrics.data.lock().unwrap(); + metrics_data.store_fetch_elapsed += elapsed; + metrics_data.cache_miss += 1; + metrics_data.pages_to_fetch_store += ranges.len(); + metrics_data.page_size_to_fetch_store += unaligned_size; + metrics_data.page_size_needed += range_size_needed; + } + data + } + }; + + // Put pages back to the cache. 
+ let page_value = PageValue::new(pages.clone(), total_range_size); + self.cache_strategy + .put_pages(page_key, Arc::new(page_value)); + + if let (Some(metrics), Some(start)) = (&self.fetch_metrics, fetch_start) { + metrics.data.lock().unwrap().total_fetch_elapsed += start.elapsed(); + } + + Ok(pages) + } + + /// Fetches data from write cache. + /// Returns `None` if the data is not in the cache. + async fn fetch_ranges_from_write_cache( + &self, + key: IndexKey, + ranges: &[Range], + ) -> Option> { + if let Some(cache) = self.cache_strategy.write_cache() { + return cache.file_cache().read_ranges(key, ranges).await; + } + None + } +} + +impl AsyncFileReader for SstAsyncFileReader { + fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, ParquetResult> { + async move { + let mut result = self.fetch_bytes_with_cache(vec![range]).await?; + Ok(result.pop().unwrap_or_default()) + } + .boxed() + } + + fn get_byte_ranges( + &mut self, + ranges: Vec>, + ) -> BoxFuture<'_, ParquetResult>> { + async move { self.fetch_bytes_with_cache(ranges).await }.boxed() + } + + fn get_metadata( + &mut self, + _options: Option<&parquet::arrow::arrow_reader::ArrowReaderOptions>, + ) -> BoxFuture<'_, ParquetResult>> { + // Metadata is already cached, return it immediately. 
+ std::future::ready(Ok(self.metadata.clone())).boxed() + } +} diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 855204b80e..f152c97075 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -26,14 +26,15 @@ use common_telemetry::{tracing, warn}; use datafusion_expr::Expr; use datatypes::arrow::array::ArrayRef; use datatypes::arrow::datatypes::Field; -use datatypes::arrow::error::ArrowError; use datatypes::arrow::record_batch::RecordBatch; use datatypes::data_type::ConcreteDataType; use datatypes::prelude::DataType; +use futures::StreamExt; use mito_codec::row_converter::build_primary_key_codec; use object_store::ObjectStore; -use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection}; -use parquet::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels}; +use parquet::arrow::ProjectionMask; +use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions, RowSelection}; +use parquet::arrow::async_reader::{ParquetRecordBatchStream, ParquetRecordBatchStreamBuilder}; use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData}; use partition::expr::PartitionExpr; use snafu::ResultExt; @@ -47,9 +48,7 @@ use crate::cache::index::result_cache::PredicateKey; use crate::cache::{CacheStrategy, CachedSstMeta}; #[cfg(feature = "vector_index")] use crate::error::ApplyVectorIndexSnafu; -use crate::error::{ - ArrowReaderSnafu, ReadDataPartSnafu, ReadParquetSnafu, Result, SerializePartitionExprSnafu, -}; +use crate::error::{ReadDataPartSnafu, ReadParquetSnafu, Result, SerializePartitionExprSnafu}; use crate::metrics::{ PRECISE_FILTER_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_TOTAL, READ_STAGE_ELAPSED, @@ -70,13 +69,14 @@ use crate::sst::index::inverted_index::applier::{ #[cfg(feature = "vector_index")] use crate::sst::index::vector_index::applier::VectorIndexApplierRef; use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE; +use 
crate::sst::parquet::async_reader::SstAsyncFileReader; use crate::sst::parquet::file_range::{ FileRangeContext, FileRangeContextRef, PartitionFilterContext, PreFilterMode, RangeBase, row_group_contains_delete, }; use crate::sst::parquet::format::{ReadFormat, need_override_sequence}; use crate::sst::parquet::metadata::MetadataLoader; -use crate::sst::parquet::row_group::{InMemoryRowGroup, ParquetFetchMetrics}; +use crate::sst::parquet::row_group::ParquetFetchMetrics; use crate::sst::parquet::row_selection::RowGroupSelection; use crate::sst::parquet::stats::RowGroupPruningStats; use crate::sst::tag_maybe_to_dictionary_field; @@ -415,6 +415,12 @@ impl ParquetReaderBuilder { .set_override_sequence(self.file_handle.meta_ref().sequence.map(|x| x.get())); } + // Computes the projection mask. + let parquet_schema_desc = parquet_meta.file_metadata().schema_descr(); + let indices = read_format.projection_indices(); + // Now we assumes we don't have nested schemas. + // TODO(yingwen): Revisit this if we introduce nested types such as JSON type. + let projection_mask = ProjectionMask::roots(parquet_schema_desc, indices.iter().copied()); let selection = self .row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics) .await; @@ -446,26 +452,20 @@ impl ParquetReaderBuilder { .map(|meta| meta.schema.clone()) .unwrap_or_else(|| region_meta.schema.clone()); - // Computes the projection mask. - let parquet_schema_desc = parquet_meta.file_metadata().schema_descr(); - let indices = read_format.projection_indices(); - // Now we assumes we don't have nested schemas. - // TODO(yingwen): Revisit this if we introduce nested types such as JSON type. - let projection_mask = ProjectionMask::roots(parquet_schema_desc, indices.iter().copied()); - - // Computes the field levels. 
- let hint = Some(read_format.arrow_schema().fields()); - let field_levels = - parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint) + // Create ArrowReaderMetadata for async stream building. + let arrow_reader_options = + ArrowReaderOptions::new().with_schema(read_format.arrow_schema().clone()); + let arrow_metadata = + ArrowReaderMetadata::try_new(parquet_meta.clone(), arrow_reader_options) .context(ReadDataPartSnafu)?; let reader_builder = RowGroupReaderBuilder { file_handle: self.file_handle.clone(), file_path, parquet_meta, + arrow_metadata, object_store: self.object_store.clone(), projection: projection_mask, - field_levels, cache_strategy: self.cache_strategy.clone(), }; @@ -1640,7 +1640,7 @@ impl ReaderMetrics { } } -/// Builder to build a [ParquetRecordBatchReader] for a row group. +/// Builder to build a [ParquetRecordBatchStream] for a row group. pub(crate) struct RowGroupReaderBuilder { /// SST file to read. /// @@ -1650,12 +1650,12 @@ pub(crate) struct RowGroupReaderBuilder { file_path: String, /// Metadata of the parquet file. parquet_meta: Arc, + /// Arrow reader metadata for building async stream. + arrow_metadata: ArrowReaderMetadata, /// Object store as an Operator. object_store: ObjectStore, /// Projection mask. projection: ProjectionMask, - /// Field levels to read. - field_levels: FieldLevels, /// Cache. cache_strategy: CacheStrategy, } @@ -1679,48 +1679,43 @@ impl RowGroupReaderBuilder { &self.cache_strategy } - /// Builds a [ParquetRecordBatchReader] to read the row group at `row_group_idx`. + /// Builds a [ParquetRecordBatchStream] to read the row group at `row_group_idx`. 
pub(crate) async fn build( &self, row_group_idx: usize, row_selection: Option, fetch_metrics: Option<&ParquetFetchMetrics>, - ) -> Result { - let fetch_start = Instant::now(); - - let mut row_group = InMemoryRowGroup::create( - self.file_handle.region_id(), - self.file_handle.file_id().file_id(), - &self.parquet_meta, - row_group_idx, - self.cache_strategy.clone(), - &self.file_path, + ) -> Result> { + // Create async file reader with caching support. + let async_reader = SstAsyncFileReader::new( + self.file_handle.file_id(), + self.file_path.clone(), self.object_store.clone(), - ); - // Fetches data into memory. - row_group - .fetch(&self.projection, row_selection.as_ref(), fetch_metrics) - .await - .context(ReadParquetSnafu { - path: &self.file_path, - })?; + self.cache_strategy.clone(), + self.parquet_meta.clone(), + row_group_idx, + ) + .with_fetch_metrics(fetch_metrics.cloned()); - // Record total fetch elapsed time. - if let Some(metrics) = fetch_metrics { - metrics.data.lock().unwrap().total_fetch_elapsed += fetch_start.elapsed(); + // Build the async stream using ArrowReaderBuilder API. + let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata( + async_reader, + self.arrow_metadata.clone(), + ); + builder = builder + .with_row_groups(vec![row_group_idx]) + .with_projection(self.projection.clone()) + .with_batch_size(DEFAULT_READ_BATCH_SIZE); + + if let Some(selection) = row_selection { + builder = builder.with_row_selection(selection); } - // Builds the parquet reader. - // Now the row selection is None. 
- ParquetRecordBatchReader::try_new_with_row_groups( - &self.field_levels, - &row_group, - DEFAULT_READ_BATCH_SIZE, - row_selection, - ) - .context(ReadParquetSnafu { + let stream = builder.build().context(ReadParquetSnafu { path: &self.file_path, - }) + })?; + + Ok(stream) } } @@ -1850,7 +1845,7 @@ impl ParquetReader { pub async fn next_record_batch(&mut self) -> Result> { loop { if let Some(reader) = &mut self.reader { - if let Some(batch) = reader.next_batch()? { + if let Some(batch) = reader.next_batch().await? { return Ok(Some(batch)); } self.reader = None; @@ -1929,27 +1924,19 @@ impl ParquetReader { /// RowGroupReaderContext represents the fields that cannot be shared /// between different `RowGroupReader`s. pub(crate) trait RowGroupReaderContext: Send { - fn map_result( - &self, - result: std::result::Result, ArrowError>, - ) -> Result>; - fn read_format(&self) -> &ReadFormat; + + fn file_path(&self) -> &str; } impl RowGroupReaderContext for FileRangeContextRef { - fn map_result( - &self, - result: std::result::Result, ArrowError>, - ) -> Result> { - result.context(ArrowReaderSnafu { - path: self.file_path(), - }) - } - fn read_format(&self) -> &ReadFormat { self.as_ref().read_format() } + + fn file_path(&self) -> &str { + self.as_ref().file_path() + } } /// [RowGroupReader] that reads from [FileRange]. @@ -1957,8 +1944,11 @@ pub(crate) type RowGroupReader = RowGroupReaderBase; impl RowGroupReader { /// Creates a new reader from file range. - pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self { - Self::create(context, reader) + pub(crate) fn new( + context: FileRangeContextRef, + stream: ParquetRecordBatchStream, + ) -> Self { + Self::create(context, stream) } } @@ -1966,8 +1956,8 @@ impl RowGroupReader { pub(crate) struct RowGroupReaderBase { /// Context of [RowGroupReader] so adapts to different underlying implementation. context: T, - /// Inner parquet reader. 
- reader: ParquetRecordBatchReader, + /// Inner parquet record batch stream. + stream: ParquetRecordBatchStream, /// Buffered batches to return. batches: VecDeque, /// Local scan metrics. @@ -1981,7 +1971,7 @@ where T: RowGroupReaderContext, { /// Creates a new reader to read the primary key format. - pub(crate) fn create(context: T, reader: ParquetRecordBatchReader) -> Self { + pub(crate) fn create(context: T, stream: ParquetRecordBatchStream) -> Self { // The batch length from the reader should be less than or equal to DEFAULT_READ_BATCH_SIZE. let override_sequence = context .read_format() @@ -1990,7 +1980,7 @@ where Self { context, - reader, + stream, batches: VecDeque::new(), metrics: ReaderMetrics::default(), override_sequence, @@ -2007,13 +1997,18 @@ where self.context.read_format() } - /// Tries to fetch next [RecordBatch] from the reader. - fn fetch_next_record_batch(&mut self) -> Result> { - self.context.map_result(self.reader.next().transpose()) + /// Tries to fetch next [RecordBatch] from the stream asynchronously. + async fn fetch_next_record_batch(&mut self) -> Result> { + match self.stream.next().await.transpose() { + Ok(batch) => Ok(batch), + Err(e) => Err(e).context(ReadParquetSnafu { + path: self.context.file_path(), + }), + } } /// Returns the next [Batch]. - pub(crate) fn next_inner(&mut self) -> Result> { + pub(crate) async fn next_inner(&mut self) -> Result> { let scan_start = Instant::now(); if let Some(batch) = self.batches.pop_front() { self.metrics.num_rows += batch.num_rows(); @@ -2023,7 +2018,7 @@ where // We need to fetch next record batch and convert it to batches. while self.batches.is_empty() { - let Some(record_batch) = self.fetch_next_record_batch()? else { + let Some(record_batch) = self.fetch_next_record_batch().await? 
else { self.metrics.scan_cost += scan_start.elapsed(); return Ok(None); }; @@ -2051,10 +2046,10 @@ where #[async_trait::async_trait] impl BatchReader for RowGroupReaderBase where - T: RowGroupReaderContext, + T: RowGroupReaderContext + Send + Sync, { async fn next_batch(&mut self) -> Result> { - self.next_inner() + self.next_inner().await } } @@ -2062,15 +2057,18 @@ where pub(crate) struct FlatRowGroupReader { /// Context for file ranges. context: FileRangeContextRef, - /// Inner parquet reader. - reader: ParquetRecordBatchReader, + /// Inner parquet record batch stream. + stream: ParquetRecordBatchStream, /// Cached sequence array to override sequences. override_sequence: Option, } impl FlatRowGroupReader { /// Creates a new flat reader from file range. - pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self { + pub(crate) fn new( + context: FileRangeContextRef, + stream: ParquetRecordBatchStream, + ) -> Self { // The batch length from the reader should be less than or equal to DEFAULT_READ_BATCH_SIZE. let override_sequence = context .read_format() @@ -2078,16 +2076,16 @@ impl FlatRowGroupReader { Self { context, - reader, + stream, override_sequence, } } /// Returns the next RecordBatch. - pub(crate) fn next_batch(&mut self) -> Result> { - match self.reader.next() { + pub(crate) async fn next_batch(&mut self) -> Result> { + match self.stream.next().await { Some(batch_result) => { - let record_batch = batch_result.context(ArrowReaderSnafu { + let record_batch = batch_result.context(ReadParquetSnafu { path: self.context.file_path(), })?; diff --git a/src/mito2/src/sst/parquet/row_group.rs b/src/mito2/src/sst/parquet/row_group.rs index 8f3f6c5f62..38ef62c6b8 100644 --- a/src/mito2/src/sst/parquet/row_group.rs +++ b/src/mito2/src/sst/parquet/row_group.rs @@ -12,28 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! 
Ports private structs from [parquet crate](https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/arrow/async_reader/mod.rs#L644-L650). +//! Parquet row group reading utilities. use std::ops::Range; use std::sync::Arc; -use bytes::{Buf, Bytes}; -use object_store::ObjectStore; -use parquet::arrow::ProjectionMask; -use parquet::arrow::arrow_reader::{RowGroups, RowSelection}; -use parquet::column::page::{PageIterator, PageReader}; -use parquet::errors::{ParquetError, Result}; -use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData}; -use parquet::file::page_index::offset_index::OffsetIndexMetaData; -use parquet::file::reader::{ChunkReader, Length}; -use parquet::file::serialized_reader::SerializedPageReader; -use store_api::storage::{FileId, RegionId}; -use tokio::task::yield_now; - -use crate::cache::file_cache::{FileType, IndexKey}; -use crate::cache::{CacheStrategy, PageKey, PageValue}; -use crate::metrics::{READ_STAGE_ELAPSED, READ_STAGE_FETCH_PAGES}; -use crate::sst::parquet::helper::{MERGE_GAP, fetch_byte_ranges}; +use crate::sst::parquet::helper::MERGE_GAP; /// Inner data for ParquetFetchMetrics. #[derive(Default, Debug, Clone)] @@ -74,9 +58,9 @@ impl ParquetFetchMetricsData { } /// Metrics for tracking page/row group fetch operations. -#[derive(Default)] +#[derive(Default, Clone)] pub struct ParquetFetchMetrics { - pub data: std::sync::Mutex, + pub data: Arc>, } impl std::fmt::Debug for ParquetFetchMetrics { @@ -204,363 +188,12 @@ impl ParquetFetchMetrics { } } -pub(crate) struct RowGroupBase<'a> { - parquet_metadata: &'a ParquetMetaData, - row_group_idx: usize, - pub(crate) offset_index: Option<&'a [OffsetIndexMetaData]>, - /// Compressed page of each column. 
- column_chunks: Vec>>, - pub(crate) row_count: usize, -} - -impl<'a> RowGroupBase<'a> { - pub(crate) fn new(parquet_meta: &'a ParquetMetaData, row_group_idx: usize) -> Self { - let metadata = parquet_meta.row_group(row_group_idx); - // `offset_index` is always `None` if we don't set - // [with_page_index()](https://docs.rs/parquet/latest/parquet/arrow/arrow_reader/struct.ArrowReaderOptions.html#method.with_page_index) - // to `true`. - let offset_index = parquet_meta - .offset_index() - // filter out empty offset indexes (old versions specified Some(vec![]) when no present) - .filter(|index| !index.is_empty()) - .map(|x| x[row_group_idx].as_slice()); - - Self { - parquet_metadata: parquet_meta, - row_group_idx, - offset_index, - column_chunks: vec![None; metadata.columns().len()], - row_count: metadata.num_rows() as usize, - } - } - - pub(crate) fn calc_sparse_read_ranges( - &self, - projection: &ProjectionMask, - offset_index: &[OffsetIndexMetaData], - selection: &RowSelection, - ) -> (Vec>, Vec>) { - // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the - // `RowSelection` - let mut page_start_offsets: Vec> = vec![]; - let ranges = self - .column_chunks - .iter() - .zip(self.row_group_metadata().columns()) - .enumerate() - .filter(|&(idx, (chunk, _chunk_meta))| chunk.is_none() && projection.leaf_included(idx)) - .flat_map(|(idx, (_chunk, chunk_meta))| { - // If the first page does not start at the beginning of the column, - // then we need to also fetch a dictionary page. 
- let mut ranges = vec![]; - let (start, _len) = chunk_meta.byte_range(); - match offset_index[idx].page_locations.first() { - Some(first) if first.offset as u64 != start => { - ranges.push(start..first.offset as u64); - } - _ => (), - } - - ranges.extend( - selection - .scan_ranges(&offset_index[idx].page_locations) - .iter() - .map(|range| range.start..range.end), - ); - page_start_offsets.push(ranges.iter().map(|range| range.start as usize).collect()); - - ranges - }) - .collect::>(); - (ranges, page_start_offsets) - } - - pub(crate) fn assign_sparse_chunk( - &mut self, - projection: &ProjectionMask, - data: Vec, - page_start_offsets: Vec>, - ) { - let mut page_start_offsets = page_start_offsets.into_iter(); - let mut chunk_data = data.into_iter(); - - for (idx, chunk) in self.column_chunks.iter_mut().enumerate() { - if chunk.is_some() || !projection.leaf_included(idx) { - continue; - } - - if let Some(offsets) = page_start_offsets.next() { - let mut chunks = Vec::with_capacity(offsets.len()); - for _ in 0..offsets.len() { - chunks.push(chunk_data.next().unwrap()); - } - - let column = self - .parquet_metadata - .row_group(self.row_group_idx) - .column(idx); - *chunk = Some(Arc::new(ColumnChunkData::Sparse { - length: column.byte_range().1 as usize, - data: offsets.into_iter().zip(chunks).collect(), - })) - } - } - } - - pub(crate) fn calc_dense_read_ranges(&self, projection: &ProjectionMask) -> Vec> { - self.column_chunks - .iter() - .enumerate() - .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx)) - .map(|(idx, _chunk)| { - let column = self.row_group_metadata().column(idx); - let (start, length) = column.byte_range(); - start..(start + length) - }) - .collect::>() - } - - /// Assigns compressed chunk binary data to [RowGroupBase::column_chunks] - /// and returns the chunk offset and binary data assigned. 
- pub(crate) fn assign_dense_chunk( - &mut self, - projection: &ProjectionMask, - chunk_data: Vec, - ) { - let mut chunk_data = chunk_data.into_iter(); - - for (idx, chunk) in self.column_chunks.iter_mut().enumerate() { - if chunk.is_some() || !projection.leaf_included(idx) { - continue; - } - - // Get the fetched page. - let Some(data) = chunk_data.next() else { - continue; - }; - - let column = self - .parquet_metadata - .row_group(self.row_group_idx) - .column(idx); - *chunk = Some(Arc::new(ColumnChunkData::Dense { - offset: column.byte_range().0 as usize, - data, - })); - } - } - - /// Create [PageReader] from [RowGroupBase::column_chunks] - pub(crate) fn column_reader( - &self, - col_idx: usize, - ) -> Result> { - let page_reader = match &self.column_chunks[col_idx] { - None => { - return Err(ParquetError::General(format!( - "Invalid column index {col_idx}, column was not fetched" - ))); - } - Some(data) => { - let page_locations = self - .offset_index - // filter out empty offset indexes (old versions specified Some(vec![]) when no present) - .filter(|index| !index.is_empty()) - .map(|index| index[col_idx].page_locations.clone()); - SerializedPageReader::new( - data.clone(), - self.row_group_metadata().column(col_idx), - self.row_count, - page_locations, - )? - } - }; - - Ok(page_reader) - } - - pub(crate) fn parquet_metadata(&self) -> &ParquetMetaData { - self.parquet_metadata - } - - pub(crate) fn row_group_metadata(&self) -> &RowGroupMetaData { - self.parquet_metadata().row_group(self.row_group_idx) - } -} - -/// An in-memory collection of column chunks -pub struct InMemoryRowGroup<'a> { - region_id: RegionId, - file_id: FileId, - row_group_idx: usize, - cache_strategy: CacheStrategy, - file_path: &'a str, - /// Object store. - object_store: ObjectStore, - base: RowGroupBase<'a>, -} - -impl<'a> InMemoryRowGroup<'a> { - /// Creates a new [InMemoryRowGroup] by `row_group_idx`. - /// - /// # Panics - /// Panics if the `row_group_idx` is invalid. 
- pub fn create( - region_id: RegionId, - file_id: FileId, - parquet_meta: &'a ParquetMetaData, - row_group_idx: usize, - cache_strategy: CacheStrategy, - file_path: &'a str, - object_store: ObjectStore, - ) -> Self { - Self { - region_id, - file_id, - row_group_idx, - cache_strategy, - file_path, - object_store, - base: RowGroupBase::new(parquet_meta, row_group_idx), - } - } - - /// Fetches the necessary column data into memory - pub async fn fetch( - &mut self, - projection: &ProjectionMask, - selection: Option<&RowSelection>, - metrics: Option<&ParquetFetchMetrics>, - ) -> Result<()> { - if let Some((selection, offset_index)) = selection.zip(self.base.offset_index) { - let (fetch_ranges, page_start_offsets) = - self.base - .calc_sparse_read_ranges(projection, offset_index, selection); - - let chunk_data = self.fetch_bytes(&fetch_ranges, metrics).await?; - // Assign sparse chunk data to base. - self.base - .assign_sparse_chunk(projection, chunk_data, page_start_offsets); - } else { - // Release the CPU to avoid blocking the runtime. Since `fetch_pages_from_cache` - // is a synchronous, CPU-bound operation. - yield_now().await; - - // Calculate ranges to read. - let fetch_ranges = self.base.calc_dense_read_ranges(projection); - - if fetch_ranges.is_empty() { - // Nothing to fetch. - return Ok(()); - } - - // Fetch data with ranges - let chunk_data = self.fetch_bytes(&fetch_ranges, metrics).await?; - - // Assigns fetched data to base. - self.base.assign_dense_chunk(projection, chunk_data); - } - - Ok(()) - } - - /// Try to fetch data from the memory cache or the WriteCache, - /// if not in WriteCache, fetch data from object store directly. - async fn fetch_bytes( - &self, - ranges: &[Range], - metrics: Option<&ParquetFetchMetrics>, - ) -> Result> { - // Now fetch page timer includes the whole time to read pages. 
- let _timer = READ_STAGE_FETCH_PAGES.start_timer(); - - let page_key = PageKey::new(self.file_id, self.row_group_idx, ranges.to_vec()); - if let Some(pages) = self.cache_strategy.get_pages(&page_key) { - if let Some(metrics) = metrics { - let total_size: u64 = ranges.iter().map(|r| r.end - r.start).sum(); - let mut metrics_data = metrics.data.lock().unwrap(); - metrics_data.page_cache_hit += 1; - metrics_data.pages_to_fetch_mem += ranges.len(); - metrics_data.page_size_to_fetch_mem += total_size; - metrics_data.page_size_needed += total_size; - } - return Ok(pages.compressed.clone()); - } - - // Calculate total range size for metrics. - let (total_range_size, unaligned_size) = compute_total_range_size(ranges); - - let key = IndexKey::new(self.region_id, self.file_id, FileType::Parquet); - let fetch_write_cache_start = metrics.map(|_| std::time::Instant::now()); - let write_cache_result = self.fetch_ranges_from_write_cache(key, ranges).await; - let pages = match write_cache_result { - Some(data) => { - if let Some(metrics) = metrics { - let elapsed = fetch_write_cache_start - .map(|start| start.elapsed()) - .unwrap_or_default(); - let range_size_needed: u64 = ranges.iter().map(|r| r.end - r.start).sum(); - let mut metrics_data = metrics.data.lock().unwrap(); - metrics_data.write_cache_fetch_elapsed += elapsed; - metrics_data.write_cache_hit += 1; - metrics_data.pages_to_fetch_write_cache += ranges.len(); - metrics_data.page_size_to_fetch_write_cache += unaligned_size; - metrics_data.page_size_needed += range_size_needed; - } - data - } - None => { - // Fetch data from object store. 
- let _timer = READ_STAGE_ELAPSED - .with_label_values(&["cache_miss_read"]) - .start_timer(); - - let start = metrics.map(|_| std::time::Instant::now()); - let data = fetch_byte_ranges(self.file_path, self.object_store.clone(), ranges) - .await - .map_err(|e| ParquetError::External(Box::new(e)))?; - if let Some(metrics) = metrics { - let elapsed = start.map(|start| start.elapsed()).unwrap_or_default(); - let range_size_needed: u64 = ranges.iter().map(|r| r.end - r.start).sum(); - let mut metrics_data = metrics.data.lock().unwrap(); - metrics_data.store_fetch_elapsed += elapsed; - metrics_data.cache_miss += 1; - metrics_data.pages_to_fetch_store += ranges.len(); - metrics_data.page_size_to_fetch_store += unaligned_size; - metrics_data.page_size_needed += range_size_needed; - } - data - } - }; - - // Put pages back to the cache. - let page_value = PageValue::new(pages.clone(), total_range_size); - self.cache_strategy - .put_pages(page_key, Arc::new(page_value)); - - Ok(pages) - } - - /// Fetches data from write cache. - /// Returns `None` if the data is not in the cache. - async fn fetch_ranges_from_write_cache( - &self, - key: IndexKey, - ranges: &[Range], - ) -> Option> { - if let Some(cache) = self.cache_strategy.write_cache() { - return cache.file_cache().read_ranges(key, ranges).await; - } - None - } -} - /// Computes the max possible buffer size to read the given `ranges`. 
 /// Returns (aligned_size, unaligned_size) where:
 /// - aligned_size: total size aligned to pooled buffer size
 /// - unaligned_size: actual total size without alignment
 // See https://github.com/apache/opendal/blob/v0.54.0/core/src/types/read/reader.rs#L166-L192
-fn compute_total_range_size(ranges: &[Range<u64>]) -> (u64, u64) {
+pub(crate) fn compute_total_range_size(ranges: &[Range<u64>]) -> (u64, u64) {
     if ranges.is_empty() {
         return (0, 0);
     }
@@ -602,96 +235,3 @@ fn align_to_pooled_buf_size(size: u64) -> u64 {
     const POOLED_BUF_SIZE: u64 = 2 * 1024 * 1024;
     size.div_ceil(POOLED_BUF_SIZE) * POOLED_BUF_SIZE
 }
-
-impl RowGroups for InMemoryRowGroup<'_> {
-    fn num_rows(&self) -> usize {
-        self.base.row_count
-    }
-
-    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
-        // Creates a page reader to read column at `i`.
-        let page_reader = self.base.column_reader(i)?;
-
-        Ok(Box::new(ColumnChunkIterator {
-            reader: Some(Ok(Box::new(page_reader))),
-        }))
-    }
-
-    fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
-        Box::new(std::iter::once(self.base.row_group_metadata()))
-    }
-
-    fn metadata(&self) -> &ParquetMetaData {
-        self.base.parquet_metadata()
-    }
-}
-
-/// An in-memory column chunk
-#[derive(Clone)]
-pub(crate) enum ColumnChunkData {
-    /// Column chunk data representing only a subset of data pages
-    Sparse {
-        /// Length of the full column chunk
-        length: usize,
-        /// Set of data pages included in this sparse chunk. Each element is a tuple
-        /// of (page offset, page data)
-        data: Vec<(usize, Bytes)>,
-    },
-    /// Full column chunk and its offset
-    Dense { offset: usize, data: Bytes },
-}
-
-impl ColumnChunkData {
-    fn get(&self, start: u64) -> Result<Bytes> {
-        match &self {
-            ColumnChunkData::Sparse { data, .. } => data
-                .binary_search_by_key(&start, |(offset, _)| *offset as u64)
-                .map(|idx| data[idx].1.clone())
-                .map_err(|_| {
-                    ParquetError::General(format!(
-                        "Invalid offset in sparse column chunk data: {start}"
-                    ))
-                }),
-            ColumnChunkData::Dense { offset, data } => {
-                let start = start as usize - *offset;
-                Ok(data.slice(start..))
-            }
-        }
-    }
-}
-
-impl Length for ColumnChunkData {
-    fn len(&self) -> u64 {
-        match &self {
-            ColumnChunkData::Sparse { length, .. } => *length as u64,
-            ColumnChunkData::Dense { data, .. } => data.len() as u64,
-        }
-    }
-}
-
-impl ChunkReader for ColumnChunkData {
-    type T = bytes::buf::Reader<Bytes>;
-
-    fn get_read(&self, start: u64) -> Result<Self::T> {
-        Ok(self.get(start)?.reader())
-    }
-
-    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
-        Ok(self.get(start)?.slice(..length))
-    }
-}
-
-/// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`]
-pub(crate) struct ColumnChunkIterator {
-    pub(crate) reader: Option<Result<Box<dyn PageReader>>>,
-}
-
-impl Iterator for ColumnChunkIterator {
-    type Item = Result<Box<dyn PageReader>>;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        self.reader.take()
-    }
-}
-
-impl PageIterator for ColumnChunkIterator {}