feat: use ArrowReaderBuilder instead of the RowGroups API (#7853)

* feat: use ArrowReaderBuilder instead of the RowGroups API

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: make row_group_idx required

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: remove unused variant

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: collect total_fetch_elapsed metrics

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
Yingwen
2026-03-25 11:10:19 +08:00
committed by GitHub
parent 13cdfa9b59
commit 04aa84af62
12 changed files with 423 additions and 705 deletions

View File

@@ -616,15 +616,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to read arrow record batch from parquet file {}", path))]
ArrowReader {
path: String,
#[snafu(source)]
error: ArrowError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Column not found, column: {column}"))]
ColumnNotFound {
column: String,
@@ -1349,7 +1340,6 @@ impl ErrorExt for Error {
RegionState { .. } | UpdateManifest { .. } => StatusCode::RegionNotReady,
JsonOptions { .. } => StatusCode::InvalidArguments,
EmptyRegionDir { .. } | EmptyManifestDir { .. } => StatusCode::RegionNotFound,
ArrowReader { .. } => StatusCode::StorageUnavailable,
ConvertValue { source, .. } => source.status_code(),
ApplyBloomFilterIndex { source, .. } => source.status_code(),
InvalidPartitionExpr { source, .. } => source.status_code(),

View File

@@ -14,6 +14,7 @@
//! Memtable implementation for bulk load
pub(crate) mod chunk_reader;
#[allow(unused)]
pub mod context;
#[allow(unused)]

View File

@@ -0,0 +1,65 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! ChunkReader implementation for in-memory parquet bytes.
use std::io::Cursor;
use bytes::Bytes;
use parquet::errors::{ParquetError, Result};
use parquet::file::reader::{ChunkReader, Length};
/// A [ChunkReader] implementation for in-memory parquet bytes.
///
/// This provides byte access to parquet data stored in memory (Bytes),
/// used for reading parquet data from bulk memtable.
// `Clone` is cheap: `Bytes` is reference-counted, so cloning the reader
// shares the same underlying buffer instead of copying it.
#[derive(Clone)]
pub struct MemtableChunkReader {
    /// The in-memory parquet data.
    data: Bytes,
}
impl MemtableChunkReader {
/// Creates a new [MemtableChunkReader] from the given bytes.
pub fn new(data: Bytes) -> Self {
Self { data }
}
}
impl Length for MemtableChunkReader {
    /// Total size of the in-memory parquet data in bytes.
    fn len(&self) -> u64 {
        let byte_count = self.data.len();
        // `usize` always fits in `u64` on supported platforms.
        byte_count as u64
    }
}
impl ChunkReader for MemtableChunkReader {
    type T = Cursor<Bytes>;

    /// Returns a cursor over the data starting at `start`, up to the end of
    /// the buffer.
    ///
    /// # Errors
    /// Returns [`ParquetError::IndexOutOfBound`] if `start` is past the end
    /// of the buffer.
    fn get_read(&self, start: u64) -> Result<Self::T> {
        let start = start as usize;
        // `start == len` is allowed and yields an empty cursor.
        if start > self.data.len() {
            return Err(ParquetError::IndexOutOfBound(start, self.data.len()));
        }
        Ok(Cursor::new(self.data.slice(start..)))
    }

    /// Returns exactly `length` bytes beginning at `start` as a zero-copy
    /// slice of the underlying buffer.
    ///
    /// # Errors
    /// Returns [`ParquetError::IndexOutOfBound`] if the requested range
    /// overflows or extends past the end of the buffer.
    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
        let start = start as usize;
        // Use checked addition so a pathological `start`/`length` pair cannot
        // wrap around in release builds (where overflow checks are off) and
        // silently pass the bounds check below with a tiny `end`.
        let end = start
            .checked_add(length)
            .ok_or_else(|| ParquetError::IndexOutOfBound(usize::MAX, self.data.len()))?;
        if end > self.data.len() {
            return Err(ParquetError::IndexOutOfBound(end, self.data.len()));
        }
        Ok(self.data.slice(start..end))
    }
}

View File

@@ -30,7 +30,6 @@ use crate::memtable::{MemScanMetrics, MemScanMetricsData};
use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
use crate::sst::parquet::file_range::{PreFilterMode, TagDecodeState};
use crate::sst::parquet::flat_format::sequence_column_index;
use crate::sst::parquet::reader::RowGroupReaderContext;
/// Iterator for reading data inside a bulk part.
pub struct EncodedBulkPartIter {

View File

@@ -12,124 +12,27 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::ops::Range;
use std::sync::Arc;
use bytes::Bytes;
use datatypes::arrow::array::RecordBatch;
use datatypes::arrow::error::ArrowError;
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowGroups, RowSelection};
use parquet::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels};
use parquet::column::page::{PageIterator, PageReader};
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
use parquet::arrow::ProjectionMask;
use parquet::arrow::arrow_reader::{
ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
ParquetRecordBatchReaderBuilder, RowSelection,
};
use parquet::file::metadata::ParquetMetaData;
use snafu::ResultExt;
use crate::error;
use crate::error::ReadDataPartSnafu;
use crate::memtable::bulk::chunk_reader::MemtableChunkReader;
use crate::memtable::bulk::context::BulkIterContextRef;
use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::reader::RowGroupReaderContext;
use crate::sst::parquet::row_group::{ColumnChunkIterator, RowGroupBase};
/// Helper for reading specific row group inside Memtable Parquet parts.
// This is similar to [mito2::sst::parquet::row_group::InMemoryRowGroup] since
// it's a workaround for lacking of keyword generics.
pub struct MemtableRowGroupPageFetcher<'a> {
/// Shared structs for reading row group.
base: RowGroupBase<'a>,
bytes: Bytes,
}
impl<'a> MemtableRowGroupPageFetcher<'a> {
pub(crate) fn create(
row_group_idx: usize,
parquet_meta: &'a ParquetMetaData,
bytes: Bytes,
) -> Self {
Self {
// the cached `column_uncompressed_pages` would never be used in Memtable readers.
base: RowGroupBase::new(parquet_meta, row_group_idx),
bytes,
}
}
/// Fetches column pages from memory file.
pub(crate) fn fetch(&mut self, projection: &ProjectionMask, selection: Option<&RowSelection>) {
if let Some((selection, offset_index)) = selection.zip(self.base.offset_index) {
// Selection provided.
let (fetch_ranges, page_start_offsets) =
self.base
.calc_sparse_read_ranges(projection, offset_index, selection);
if fetch_ranges.is_empty() {
return;
}
let chunk_data = self.fetch_bytes(&fetch_ranges);
self.base
.assign_sparse_chunk(projection, chunk_data, page_start_offsets);
} else {
let fetch_ranges = self.base.calc_dense_read_ranges(projection);
if fetch_ranges.is_empty() {
// Nothing to fetch.
return;
}
let chunk_data = self.fetch_bytes(&fetch_ranges);
self.base.assign_dense_chunk(projection, chunk_data);
}
}
fn fetch_bytes(&self, ranges: &[Range<u64>]) -> Vec<Bytes> {
ranges
.iter()
.map(|range| self.bytes.slice(range.start as usize..range.end as usize))
.collect()
}
/// Creates a page reader to read column at `i`.
fn column_page_reader(&self, i: usize) -> parquet::errors::Result<Box<dyn PageReader>> {
let reader = self.base.column_reader(i)?;
Ok(Box::new(reader))
}
}
impl RowGroups for MemtableRowGroupPageFetcher<'_> {
fn num_rows(&self) -> usize {
self.base.row_count
}
fn column_chunks(&self, i: usize) -> parquet::errors::Result<Box<dyn PageIterator>> {
Ok(Box::new(ColumnChunkIterator {
reader: Some(self.column_page_reader(i)),
}))
}
fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
Box::new(std::iter::once(self.base.row_group_metadata()))
}
fn metadata(&self) -> &ParquetMetaData {
self.base.parquet_metadata()
}
}
impl RowGroupReaderContext for BulkIterContextRef {
fn map_result(
&self,
result: Result<Option<RecordBatch>, ArrowError>,
) -> error::Result<Option<RecordBatch>> {
result.context(error::DecodeArrowRowGroupSnafu)
}
fn read_format(&self) -> &ReadFormat {
self.as_ref().read_format()
}
}
pub(crate) struct MemtableRowGroupReaderBuilder {
projection: ProjectionMask,
parquet_metadata: Arc<ParquetMetaData>,
field_levels: FieldLevels,
arrow_metadata: ArrowReaderMetadata,
data: Bytes,
}
@@ -140,15 +43,16 @@ impl MemtableRowGroupReaderBuilder {
parquet_metadata: Arc<ParquetMetaData>,
data: Bytes,
) -> error::Result<Self> {
let parquet_schema_desc = parquet_metadata.file_metadata().schema_descr();
let hint = Some(context.read_format().arrow_schema().fields());
let field_levels =
parquet_to_arrow_field_levels(parquet_schema_desc, projection.clone(), hint)
// Create ArrowReaderMetadata for building the reader.
let arrow_reader_options =
ArrowReaderOptions::new().with_schema(context.read_format().arrow_schema().clone());
let arrow_metadata =
ArrowReaderMetadata::try_new(parquet_metadata.clone(), arrow_reader_options)
.context(ReadDataPartSnafu)?;
Ok(Self {
projection,
parquet_metadata,
field_levels,
arrow_metadata,
data,
})
}
@@ -159,23 +63,21 @@ impl MemtableRowGroupReaderBuilder {
row_group_idx: usize,
row_selection: Option<RowSelection>,
) -> error::Result<ParquetRecordBatchReader> {
let mut row_group = MemtableRowGroupPageFetcher::create(
row_group_idx,
&self.parquet_metadata,
self.data.clone(),
);
// Fetches data from memory part. Currently, row selection is not supported.
row_group.fetch(&self.projection, row_selection.as_ref());
let chunk_reader = MemtableChunkReader::new(self.data.clone());
// Builds the parquet reader.
// Now the row selection is None.
ParquetRecordBatchReader::try_new_with_row_groups(
&self.field_levels,
&row_group,
DEFAULT_READ_BATCH_SIZE,
row_selection,
let mut builder = ParquetRecordBatchReaderBuilder::new_with_metadata(
chunk_reader,
self.arrow_metadata.clone(),
)
.context(ReadDataPartSnafu)
.with_row_groups(vec![row_group_idx])
.with_projection(self.projection.clone())
.with_batch_size(DEFAULT_READ_BATCH_SIZE);
if let Some(selection) = row_selection {
builder = builder.with_row_selection(selection);
}
builder.build().context(ReadDataPartSnafu)
}
/// Computes whether to skip field filters for a specific row group based on PreFilterMode.

View File

@@ -333,10 +333,10 @@ impl FlatRowGroupLastRowCachedReader {
}
/// Returns the next RecordBatch.
pub(crate) fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
pub(crate) async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
match self {
FlatRowGroupLastRowCachedReader::Hit(r) => r.next_batch(),
FlatRowGroupLastRowCachedReader::Miss(r) => r.next_batch(),
FlatRowGroupLastRowCachedReader::Miss(r) => r.next_batch().await,
}
}
@@ -466,12 +466,12 @@ impl FlatRowGroupLastRowReader {
Ok(Some(merged))
}
fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
if self.pending.is_full() {
return self.flush_pending();
}
while let Some(batch) = self.reader.next_batch()? {
while let Some(batch) = self.reader.next_batch().await? {
self.selector.on_next(batch, &mut self.pending)?;
if self.pending.is_full() {
return self.flush_pending();

View File

@@ -247,10 +247,10 @@ pub enum FlatSource {
}
impl FlatSource {
fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
match self {
FlatSource::RowGroup(r) => r.next_batch(),
FlatSource::LastRow(r) => r.next_batch(),
FlatSource::RowGroup(r) => r.next_batch().await,
FlatSource::LastRow(r) => r.next_batch().await,
}
}
}
@@ -297,13 +297,16 @@ impl FlatPruneReader {
self.metrics.clone()
}
pub(crate) fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
while let Some(record_batch) = {
pub(crate) async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
loop {
let start = std::time::Instant::now();
let batch = self.source.next_batch()?;
let batch = self.source.next_batch().await?;
self.metrics.scan_cost += start.elapsed();
batch
} {
let Some(record_batch) = batch else {
return Ok(None);
};
// Update metrics for the received batch
self.metrics.num_rows += record_batch.num_rows();
self.metrics.num_batches += 1;
@@ -317,8 +320,6 @@ impl FlatPruneReader {
}
}
}
Ok(None)
}
/// Prunes batches by the pushed down predicate and returns RecordBatch.

View File

@@ -1533,7 +1533,7 @@ pub fn build_flat_file_range_scan_stream(
.transpose()?;
let mapper = range.compaction_projection_mapper();
while let Some(record_batch) = reader.next_batch()? {
while let Some(record_batch) = reader.next_batch().await? {
let record_batch = if let Some(mapper) = mapper {
let batch = mapper.project(record_batch)?;
batch

View File

@@ -24,6 +24,7 @@ use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
use crate::sst::file::FileTimeRange;
use crate::sst::index::IndexOutput;
pub(crate) mod async_reader;
pub mod file_range;
pub mod flat_format;
pub mod format;

View File

@@ -0,0 +1,221 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Async file reader implementation for SST parquet files.
use std::ops::Range;
use std::sync::Arc;
use bytes::Bytes;
use futures::FutureExt;
use futures::future::BoxFuture;
use object_store::ObjectStore;
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::errors::{ParquetError, Result as ParquetResult};
use parquet::file::metadata::ParquetMetaData;
use crate::cache::file_cache::{FileType, IndexKey};
use crate::cache::{CacheStrategy, PageKey, PageValue};
use crate::metrics::{READ_STAGE_ELAPSED, READ_STAGE_FETCH_PAGES};
use crate::sst::file::RegionFileId;
use crate::sst::parquet::helper::fetch_byte_ranges;
use crate::sst::parquet::row_group::{ParquetFetchMetrics, compute_total_range_size};
/// An [AsyncFileReader] implementation for SST parquet files.
///
/// This reader provides async byte access to parquet data in object storage,
/// with caching support (page cache and write cache).
pub struct SstAsyncFileReader {
    /// Region file ID, used to build page-cache and write-cache keys.
    region_file_id: RegionFileId,
    /// Path to the parquet file in object storage.
    file_path: String,
    /// Object store for reading data on cache misses.
    object_store: ObjectStore,
    /// Cache strategy for reading pages.
    cache_strategy: CacheStrategy,
    /// Cached parquet metadata; returned directly by `get_metadata` without
    /// touching storage.
    metadata: Arc<ParquetMetaData>,
    /// Row group index, part of the page-cache key.
    row_group_idx: usize,
    /// Optional metrics for tracking fetch operations; `None` disables
    /// metric collection entirely.
    fetch_metrics: Option<ParquetFetchMetrics>,
}
impl SstAsyncFileReader {
    /// Creates a new [SstAsyncFileReader].
    ///
    /// Metrics collection is disabled by default; enable it with
    /// [`Self::with_fetch_metrics`].
    pub fn new(
        region_file_id: RegionFileId,
        file_path: String,
        object_store: ObjectStore,
        cache_strategy: CacheStrategy,
        metadata: Arc<ParquetMetaData>,
        row_group_idx: usize,
    ) -> Self {
        Self {
            region_file_id,
            file_path,
            object_store,
            cache_strategy,
            metadata,
            row_group_idx,
            fetch_metrics: None,
        }
    }

    /// Sets the fetch metrics.
    pub fn with_fetch_metrics(mut self, metrics: Option<ParquetFetchMetrics>) -> Self {
        self.fetch_metrics = metrics;
        self
    }

    /// Fetches byte ranges from page cache, write cache, or object store.
    ///
    /// Lookup order:
    /// 1. page cache (in-memory, keyed by file id + row group + ranges);
    /// 2. write cache (local file cache);
    /// 3. object store.
    /// Data obtained from (2) or (3) is inserted back into the page cache
    /// before returning.
    async fn fetch_bytes_with_cache(&self, ranges: Vec<Range<u64>>) -> ParquetResult<Vec<Bytes>> {
        // Only pay for `Instant::now()` when metrics are enabled.
        let fetch_start = self
            .fetch_metrics
            .as_ref()
            .map(|_| std::time::Instant::now());
        let _timer = READ_STAGE_FETCH_PAGES.start_timer();
        let page_key = PageKey::new(
            self.region_file_id.file_id(),
            self.row_group_idx,
            ranges.clone(),
        );
        // Check page cache first.
        if let Some(pages) = self.cache_strategy.get_pages(&page_key) {
            if let Some(metrics) = &self.fetch_metrics {
                let total_size: u64 = ranges.iter().map(|r| r.end - r.start).sum();
                let mut metrics_data = metrics.data.lock().unwrap();
                metrics_data.page_cache_hit += 1;
                metrics_data.pages_to_fetch_mem += ranges.len();
                metrics_data.page_size_to_fetch_mem += total_size;
                metrics_data.page_size_needed += total_size;
                // This path returns early, so total fetch time must be
                // accounted here rather than at the end of the function.
                if let Some(start) = fetch_start {
                    metrics_data.total_fetch_elapsed += start.elapsed();
                }
            }
            return Ok(pages.compressed.clone());
        }
        // Calculate total range size for metrics.
        // NOTE(review): assumes `compute_total_range_size` returns
        // (merged/aligned total size, unaligned size) — confirm in row_group.rs.
        let (total_range_size, unaligned_size) = compute_total_range_size(&ranges);
        // Check write cache.
        let key = IndexKey::new(
            self.region_file_id.region_id(),
            self.region_file_id.file_id(),
            FileType::Parquet,
        );
        let fetch_write_cache_start = self
            .fetch_metrics
            .as_ref()
            .map(|_| std::time::Instant::now());
        let write_cache_result = self.fetch_ranges_from_write_cache(key, &ranges).await;
        let pages = match write_cache_result {
            Some(data) => {
                // Write-cache hit: record elapsed time and sizes under one
                // lock acquisition.
                if let Some(metrics) = &self.fetch_metrics {
                    let elapsed = fetch_write_cache_start
                        .map(|start| start.elapsed())
                        .unwrap_or_default();
                    let range_size_needed: u64 = ranges.iter().map(|r| r.end - r.start).sum();
                    let mut metrics_data = metrics.data.lock().unwrap();
                    metrics_data.write_cache_fetch_elapsed += elapsed;
                    metrics_data.write_cache_hit += 1;
                    metrics_data.pages_to_fetch_write_cache += ranges.len();
                    metrics_data.page_size_to_fetch_write_cache += unaligned_size;
                    metrics_data.page_size_needed += range_size_needed;
                }
                data
            }
            None => {
                // Fetch data from object store.
                let _timer = READ_STAGE_ELAPSED
                    .with_label_values(&["cache_miss_read"])
                    .start_timer();
                let start = self
                    .fetch_metrics
                    .as_ref()
                    .map(|_| std::time::Instant::now());
                let data = fetch_byte_ranges(&self.file_path, self.object_store.clone(), &ranges)
                    .await
                    .map_err(|e| ParquetError::External(Box::new(e)))?;
                if let Some(metrics) = &self.fetch_metrics {
                    let elapsed = start.map(|start| start.elapsed()).unwrap_or_default();
                    let range_size_needed: u64 = ranges.iter().map(|r| r.end - r.start).sum();
                    let mut metrics_data = metrics.data.lock().unwrap();
                    metrics_data.store_fetch_elapsed += elapsed;
                    metrics_data.cache_miss += 1;
                    metrics_data.pages_to_fetch_store += ranges.len();
                    metrics_data.page_size_to_fetch_store += unaligned_size;
                    metrics_data.page_size_needed += range_size_needed;
                }
                data
            }
        };
        // Put pages back to the cache so subsequent reads of the same ranges
        // hit the page-cache fast path above.
        let page_value = PageValue::new(pages.clone(), total_range_size);
        self.cache_strategy
            .put_pages(page_key, Arc::new(page_value));
        // Total fetch time for the write-cache and object-store paths
        // (the page-cache path accounted for its own elapsed time above).
        if let (Some(metrics), Some(start)) = (&self.fetch_metrics, fetch_start) {
            metrics.data.lock().unwrap().total_fetch_elapsed += start.elapsed();
        }
        Ok(pages)
    }

    /// Fetches data from write cache.
    /// Returns `None` if the write cache is not configured or the data is
    /// not in the cache.
    async fn fetch_ranges_from_write_cache(
        &self,
        key: IndexKey,
        ranges: &[Range<u64>],
    ) -> Option<Vec<Bytes>> {
        if let Some(cache) = self.cache_strategy.write_cache() {
            return cache.file_cache().read_ranges(key, ranges).await;
        }
        None
    }
}
impl AsyncFileReader for SstAsyncFileReader {
    /// Reads a single byte range through the cache hierarchy.
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, ParquetResult<Bytes>> {
        let fut = async move {
            let mut chunks = self.fetch_bytes_with_cache(vec![range]).await?;
            // One requested range yields one chunk; an empty result degrades
            // to empty bytes.
            Ok(chunks.pop().unwrap_or_default())
        };
        fut.boxed()
    }

    /// Reads multiple byte ranges through the cache hierarchy.
    fn get_byte_ranges(
        &mut self,
        ranges: Vec<Range<u64>>,
    ) -> BoxFuture<'_, ParquetResult<Vec<Bytes>>> {
        let fut = async move { self.fetch_bytes_with_cache(ranges).await };
        fut.boxed()
    }

    /// Returns the parquet metadata cached at construction time; never
    /// touches storage.
    fn get_metadata(
        &mut self,
        _options: Option<&parquet::arrow::arrow_reader::ArrowReaderOptions>,
    ) -> BoxFuture<'_, ParquetResult<Arc<ParquetMetaData>>> {
        let cached = self.metadata.clone();
        std::future::ready(Ok(cached)).boxed()
    }
}

View File

@@ -26,14 +26,15 @@ use common_telemetry::{tracing, warn};
use datafusion_expr::Expr;
use datatypes::arrow::array::ArrayRef;
use datatypes::arrow::datatypes::Field;
use datatypes::arrow::error::ArrowError;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::DataType;
use futures::StreamExt;
use mito_codec::row_converter::build_primary_key_codec;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
use parquet::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels};
use parquet::arrow::ProjectionMask;
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions, RowSelection};
use parquet::arrow::async_reader::{ParquetRecordBatchStream, ParquetRecordBatchStreamBuilder};
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData};
use partition::expr::PartitionExpr;
use snafu::ResultExt;
@@ -47,9 +48,7 @@ use crate::cache::index::result_cache::PredicateKey;
use crate::cache::{CacheStrategy, CachedSstMeta};
#[cfg(feature = "vector_index")]
use crate::error::ApplyVectorIndexSnafu;
use crate::error::{
ArrowReaderSnafu, ReadDataPartSnafu, ReadParquetSnafu, Result, SerializePartitionExprSnafu,
};
use crate::error::{ReadDataPartSnafu, ReadParquetSnafu, Result, SerializePartitionExprSnafu};
use crate::metrics::{
PRECISE_FILTER_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL,
READ_ROWS_TOTAL, READ_STAGE_ELAPSED,
@@ -70,13 +69,14 @@ use crate::sst::index::inverted_index::applier::{
#[cfg(feature = "vector_index")]
use crate::sst::index::vector_index::applier::VectorIndexApplierRef;
use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
use crate::sst::parquet::async_reader::SstAsyncFileReader;
use crate::sst::parquet::file_range::{
FileRangeContext, FileRangeContextRef, PartitionFilterContext, PreFilterMode, RangeBase,
row_group_contains_delete,
};
use crate::sst::parquet::format::{ReadFormat, need_override_sequence};
use crate::sst::parquet::metadata::MetadataLoader;
use crate::sst::parquet::row_group::{InMemoryRowGroup, ParquetFetchMetrics};
use crate::sst::parquet::row_group::ParquetFetchMetrics;
use crate::sst::parquet::row_selection::RowGroupSelection;
use crate::sst::parquet::stats::RowGroupPruningStats;
use crate::sst::tag_maybe_to_dictionary_field;
@@ -415,6 +415,12 @@ impl ParquetReaderBuilder {
.set_override_sequence(self.file_handle.meta_ref().sequence.map(|x| x.get()));
}
// Computes the projection mask.
let parquet_schema_desc = parquet_meta.file_metadata().schema_descr();
let indices = read_format.projection_indices();
// Now we assumes we don't have nested schemas.
// TODO(yingwen): Revisit this if we introduce nested types such as JSON type.
let projection_mask = ProjectionMask::roots(parquet_schema_desc, indices.iter().copied());
let selection = self
.row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics)
.await;
@@ -446,26 +452,20 @@ impl ParquetReaderBuilder {
.map(|meta| meta.schema.clone())
.unwrap_or_else(|| region_meta.schema.clone());
// Computes the projection mask.
let parquet_schema_desc = parquet_meta.file_metadata().schema_descr();
let indices = read_format.projection_indices();
// Now we assumes we don't have nested schemas.
// TODO(yingwen): Revisit this if we introduce nested types such as JSON type.
let projection_mask = ProjectionMask::roots(parquet_schema_desc, indices.iter().copied());
// Computes the field levels.
let hint = Some(read_format.arrow_schema().fields());
let field_levels =
parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint)
// Create ArrowReaderMetadata for async stream building.
let arrow_reader_options =
ArrowReaderOptions::new().with_schema(read_format.arrow_schema().clone());
let arrow_metadata =
ArrowReaderMetadata::try_new(parquet_meta.clone(), arrow_reader_options)
.context(ReadDataPartSnafu)?;
let reader_builder = RowGroupReaderBuilder {
file_handle: self.file_handle.clone(),
file_path,
parquet_meta,
arrow_metadata,
object_store: self.object_store.clone(),
projection: projection_mask,
field_levels,
cache_strategy: self.cache_strategy.clone(),
};
@@ -1640,7 +1640,7 @@ impl ReaderMetrics {
}
}
/// Builder to build a [ParquetRecordBatchReader] for a row group.
/// Builder to build a [ParquetRecordBatchStream] for a row group.
pub(crate) struct RowGroupReaderBuilder {
/// SST file to read.
///
@@ -1650,12 +1650,12 @@ pub(crate) struct RowGroupReaderBuilder {
file_path: String,
/// Metadata of the parquet file.
parquet_meta: Arc<ParquetMetaData>,
/// Arrow reader metadata for building async stream.
arrow_metadata: ArrowReaderMetadata,
/// Object store as an Operator.
object_store: ObjectStore,
/// Projection mask.
projection: ProjectionMask,
/// Field levels to read.
field_levels: FieldLevels,
/// Cache.
cache_strategy: CacheStrategy,
}
@@ -1679,48 +1679,43 @@ impl RowGroupReaderBuilder {
&self.cache_strategy
}
/// Builds a [ParquetRecordBatchReader] to read the row group at `row_group_idx`.
/// Builds a [ParquetRecordBatchStream] to read the row group at `row_group_idx`.
pub(crate) async fn build(
&self,
row_group_idx: usize,
row_selection: Option<RowSelection>,
fetch_metrics: Option<&ParquetFetchMetrics>,
) -> Result<ParquetRecordBatchReader> {
let fetch_start = Instant::now();
let mut row_group = InMemoryRowGroup::create(
self.file_handle.region_id(),
self.file_handle.file_id().file_id(),
&self.parquet_meta,
row_group_idx,
self.cache_strategy.clone(),
&self.file_path,
) -> Result<ParquetRecordBatchStream<SstAsyncFileReader>> {
// Create async file reader with caching support.
let async_reader = SstAsyncFileReader::new(
self.file_handle.file_id(),
self.file_path.clone(),
self.object_store.clone(),
);
// Fetches data into memory.
row_group
.fetch(&self.projection, row_selection.as_ref(), fetch_metrics)
.await
.context(ReadParquetSnafu {
path: &self.file_path,
})?;
self.cache_strategy.clone(),
self.parquet_meta.clone(),
row_group_idx,
)
.with_fetch_metrics(fetch_metrics.cloned());
// Record total fetch elapsed time.
if let Some(metrics) = fetch_metrics {
metrics.data.lock().unwrap().total_fetch_elapsed += fetch_start.elapsed();
// Build the async stream using ArrowReaderBuilder API.
let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata(
async_reader,
self.arrow_metadata.clone(),
);
builder = builder
.with_row_groups(vec![row_group_idx])
.with_projection(self.projection.clone())
.with_batch_size(DEFAULT_READ_BATCH_SIZE);
if let Some(selection) = row_selection {
builder = builder.with_row_selection(selection);
}
// Builds the parquet reader.
// Now the row selection is None.
ParquetRecordBatchReader::try_new_with_row_groups(
&self.field_levels,
&row_group,
DEFAULT_READ_BATCH_SIZE,
row_selection,
)
.context(ReadParquetSnafu {
let stream = builder.build().context(ReadParquetSnafu {
path: &self.file_path,
})
})?;
Ok(stream)
}
}
@@ -1850,7 +1845,7 @@ impl ParquetReader {
pub async fn next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
loop {
if let Some(reader) = &mut self.reader {
if let Some(batch) = reader.next_batch()? {
if let Some(batch) = reader.next_batch().await? {
return Ok(Some(batch));
}
self.reader = None;
@@ -1929,27 +1924,19 @@ impl ParquetReader {
/// RowGroupReaderContext represents the fields that cannot be shared
/// between different `RowGroupReader`s.
pub(crate) trait RowGroupReaderContext: Send {
fn map_result(
&self,
result: std::result::Result<Option<RecordBatch>, ArrowError>,
) -> Result<Option<RecordBatch>>;
fn read_format(&self) -> &ReadFormat;
fn file_path(&self) -> &str;
}
impl RowGroupReaderContext for FileRangeContextRef {
fn map_result(
&self,
result: std::result::Result<Option<RecordBatch>, ArrowError>,
) -> Result<Option<RecordBatch>> {
result.context(ArrowReaderSnafu {
path: self.file_path(),
})
}
fn read_format(&self) -> &ReadFormat {
self.as_ref().read_format()
}
fn file_path(&self) -> &str {
self.as_ref().file_path()
}
}
/// [RowGroupReader] that reads from [FileRange].
@@ -1957,8 +1944,11 @@ pub(crate) type RowGroupReader = RowGroupReaderBase<FileRangeContextRef>;
impl RowGroupReader {
/// Creates a new reader from file range.
pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
Self::create(context, reader)
pub(crate) fn new(
context: FileRangeContextRef,
stream: ParquetRecordBatchStream<SstAsyncFileReader>,
) -> Self {
Self::create(context, stream)
}
}
@@ -1966,8 +1956,8 @@ impl RowGroupReader {
pub(crate) struct RowGroupReaderBase<T> {
/// Context of [RowGroupReader] so adapts to different underlying implementation.
context: T,
/// Inner parquet reader.
reader: ParquetRecordBatchReader,
/// Inner parquet record batch stream.
stream: ParquetRecordBatchStream<SstAsyncFileReader>,
/// Buffered batches to return.
batches: VecDeque<Batch>,
/// Local scan metrics.
@@ -1981,7 +1971,7 @@ where
T: RowGroupReaderContext,
{
/// Creates a new reader to read the primary key format.
pub(crate) fn create(context: T, reader: ParquetRecordBatchReader) -> Self {
pub(crate) fn create(context: T, stream: ParquetRecordBatchStream<SstAsyncFileReader>) -> Self {
// The batch length from the reader should be less than or equal to DEFAULT_READ_BATCH_SIZE.
let override_sequence = context
.read_format()
@@ -1990,7 +1980,7 @@ where
Self {
context,
reader,
stream,
batches: VecDeque::new(),
metrics: ReaderMetrics::default(),
override_sequence,
@@ -2007,13 +1997,18 @@ where
self.context.read_format()
}
/// Tries to fetch next [RecordBatch] from the reader.
fn fetch_next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
self.context.map_result(self.reader.next().transpose())
/// Tries to fetch next [RecordBatch] from the stream asynchronously.
async fn fetch_next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
match self.stream.next().await.transpose() {
Ok(batch) => Ok(batch),
Err(e) => Err(e).context(ReadParquetSnafu {
path: self.context.file_path(),
}),
}
}
/// Returns the next [Batch].
pub(crate) fn next_inner(&mut self) -> Result<Option<Batch>> {
pub(crate) async fn next_inner(&mut self) -> Result<Option<Batch>> {
let scan_start = Instant::now();
if let Some(batch) = self.batches.pop_front() {
self.metrics.num_rows += batch.num_rows();
@@ -2023,7 +2018,7 @@ where
// We need to fetch next record batch and convert it to batches.
while self.batches.is_empty() {
let Some(record_batch) = self.fetch_next_record_batch()? else {
let Some(record_batch) = self.fetch_next_record_batch().await? else {
self.metrics.scan_cost += scan_start.elapsed();
return Ok(None);
};
@@ -2051,10 +2046,10 @@ where
#[async_trait::async_trait]
impl<T> BatchReader for RowGroupReaderBase<T>
where
T: RowGroupReaderContext,
T: RowGroupReaderContext + Send + Sync,
{
async fn next_batch(&mut self) -> Result<Option<Batch>> {
self.next_inner()
self.next_inner().await
}
}
@@ -2062,15 +2057,18 @@ where
pub(crate) struct FlatRowGroupReader {
/// Context for file ranges.
context: FileRangeContextRef,
/// Inner parquet reader.
reader: ParquetRecordBatchReader,
/// Inner parquet record batch stream.
stream: ParquetRecordBatchStream<SstAsyncFileReader>,
/// Cached sequence array to override sequences.
override_sequence: Option<ArrayRef>,
}
impl FlatRowGroupReader {
/// Creates a new flat reader from file range.
pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
pub(crate) fn new(
context: FileRangeContextRef,
stream: ParquetRecordBatchStream<SstAsyncFileReader>,
) -> Self {
// The batch length from the reader should be less than or equal to DEFAULT_READ_BATCH_SIZE.
let override_sequence = context
.read_format()
@@ -2078,16 +2076,16 @@ impl FlatRowGroupReader {
Self {
context,
reader,
stream,
override_sequence,
}
}
/// Returns the next RecordBatch.
pub(crate) fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
match self.reader.next() {
pub(crate) async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
match self.stream.next().await {
Some(batch_result) => {
let record_batch = batch_result.context(ArrowReaderSnafu {
let record_batch = batch_result.context(ReadParquetSnafu {
path: self.context.file_path(),
})?;

View File

@@ -12,28 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! Ports private structs from [parquet crate](https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/arrow/async_reader/mod.rs#L644-L650).
//! Parquet row group reading utilities.
use std::ops::Range;
use std::sync::Arc;
use bytes::{Buf, Bytes};
use object_store::ObjectStore;
use parquet::arrow::ProjectionMask;
use parquet::arrow::arrow_reader::{RowGroups, RowSelection};
use parquet::column::page::{PageIterator, PageReader};
use parquet::errors::{ParquetError, Result};
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
use parquet::file::page_index::offset_index::OffsetIndexMetaData;
use parquet::file::reader::{ChunkReader, Length};
use parquet::file::serialized_reader::SerializedPageReader;
use store_api::storage::{FileId, RegionId};
use tokio::task::yield_now;
use crate::cache::file_cache::{FileType, IndexKey};
use crate::cache::{CacheStrategy, PageKey, PageValue};
use crate::metrics::{READ_STAGE_ELAPSED, READ_STAGE_FETCH_PAGES};
use crate::sst::parquet::helper::{MERGE_GAP, fetch_byte_ranges};
use crate::sst::parquet::helper::MERGE_GAP;
/// Inner data for ParquetFetchMetrics.
#[derive(Default, Debug, Clone)]
@@ -74,9 +58,9 @@ impl ParquetFetchMetricsData {
}
/// Metrics for tracking page/row group fetch operations.
#[derive(Default)]
#[derive(Default, Clone)]
pub struct ParquetFetchMetrics {
pub data: std::sync::Mutex<ParquetFetchMetricsData>,
pub data: Arc<std::sync::Mutex<ParquetFetchMetricsData>>,
}
impl std::fmt::Debug for ParquetFetchMetrics {
@@ -204,363 +188,12 @@ impl ParquetFetchMetrics {
}
}
/// Shared state for reading the column chunks of a single row group.
///
/// Borrows the file-level [`ParquetMetaData`] and caches the compressed
/// bytes of each projected column as they are fetched.
pub(crate) struct RowGroupBase<'a> {
    /// Metadata of the whole parquet file.
    parquet_metadata: &'a ParquetMetaData,
    /// Index of the row group this state targets.
    row_group_idx: usize,
    /// Offset index (page locations) for this row group, present only when
    /// the file metadata was loaded with the page index enabled.
    pub(crate) offset_index: Option<&'a [OffsetIndexMetaData]>,
    /// Compressed page of each column.
    column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
    /// Number of rows in this row group.
    pub(crate) row_count: usize,
}
impl<'a> RowGroupBase<'a> {
    /// Creates state for reading row group `row_group_idx` of `parquet_meta`.
    ///
    /// # Panics
    /// Panics if `row_group_idx` is out of bounds for the file metadata.
    pub(crate) fn new(parquet_meta: &'a ParquetMetaData, row_group_idx: usize) -> Self {
        let metadata = parquet_meta.row_group(row_group_idx);
        // `offset_index` is always `None` if we don't set
        // [with_page_index()](https://docs.rs/parquet/latest/parquet/arrow/arrow_reader/struct.ArrowReaderOptions.html#method.with_page_index)
        // to `true`.
        let offset_index = parquet_meta
            .offset_index()
            // filter out empty offset indexes (old versions specified Some(vec![]) when no present)
            .filter(|index| !index.is_empty())
            .map(|x| x[row_group_idx].as_slice());

        Self {
            parquet_metadata: parquet_meta,
            row_group_idx,
            offset_index,
            // One slot per column; filled lazily as chunks are fetched.
            column_chunks: vec![None; metadata.columns().len()],
            row_count: metadata.num_rows() as usize,
        }
    }

    /// Computes the byte ranges to fetch when a [`RowSelection`] and an
    /// offset index are available, so only the pages the selection needs
    /// are read.
    ///
    /// Returns the ranges to fetch plus, per projected un-fetched column,
    /// the file offsets where each fetched page starts (consumed later by
    /// [`RowGroupBase::assign_sparse_chunk`], which expects the same
    /// projection and ordering).
    pub(crate) fn calc_sparse_read_ranges(
        &self,
        projection: &ProjectionMask,
        offset_index: &[OffsetIndexMetaData],
        selection: &RowSelection,
    ) -> (Vec<Range<u64>>, Vec<Vec<usize>>) {
        // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
        // `RowSelection`
        let mut page_start_offsets: Vec<Vec<usize>> = vec![];
        let ranges = self
            .column_chunks
            .iter()
            .zip(self.row_group_metadata().columns())
            .enumerate()
            // Skip columns that were already fetched or are not projected.
            .filter(|&(idx, (chunk, _chunk_meta))| chunk.is_none() && projection.leaf_included(idx))
            .flat_map(|(idx, (_chunk, chunk_meta))| {
                // If the first page does not start at the beginning of the column,
                // then we need to also fetch a dictionary page.
                let mut ranges = vec![];
                let (start, _len) = chunk_meta.byte_range();
                match offset_index[idx].page_locations.first() {
                    Some(first) if first.offset as u64 != start => {
                        ranges.push(start..first.offset as u64);
                    }
                    _ => (),
                }

                ranges.extend(
                    selection
                        .scan_ranges(&offset_index[idx].page_locations)
                        .iter()
                        .map(|range| range.start..range.end),
                );
                // NOTE: pushed as a side effect of the flat_map closure, so the
                // offsets stay in lockstep with the filtered column order.
                page_start_offsets.push(ranges.iter().map(|range| range.start as usize).collect());

                ranges
            })
            .collect::<Vec<_>>();
        (ranges, page_start_offsets)
    }

    /// Stores fetched sparse page bytes into the matching column slots.
    ///
    /// `data` and `page_start_offsets` must come from a fetch driven by
    /// [`RowGroupBase::calc_sparse_read_ranges`] with the same `projection`,
    /// so the iterators line up with the projected, un-fetched columns.
    pub(crate) fn assign_sparse_chunk(
        &mut self,
        projection: &ProjectionMask,
        data: Vec<Bytes>,
        page_start_offsets: Vec<Vec<usize>>,
    ) {
        let mut page_start_offsets = page_start_offsets.into_iter();
        let mut chunk_data = data.into_iter();

        for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
            if chunk.is_some() || !projection.leaf_included(idx) {
                continue;
            }

            if let Some(offsets) = page_start_offsets.next() {
                // One fetched buffer per page offset of this column.
                let mut chunks = Vec::with_capacity(offsets.len());
                for _ in 0..offsets.len() {
                    chunks.push(chunk_data.next().unwrap());
                }

                let column = self
                    .parquet_metadata
                    .row_group(self.row_group_idx)
                    .column(idx);
                *chunk = Some(Arc::new(ColumnChunkData::Sparse {
                    length: column.byte_range().1 as usize,
                    data: offsets.into_iter().zip(chunks).collect(),
                }))
            }
        }
    }

    /// Computes the byte ranges covering the whole column chunk of every
    /// projected column that has not been fetched yet (dense read path).
    pub(crate) fn calc_dense_read_ranges(&self, projection: &ProjectionMask) -> Vec<Range<u64>> {
        self.column_chunks
            .iter()
            .enumerate()
            .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
            .map(|(idx, _chunk)| {
                let column = self.row_group_metadata().column(idx);
                let (start, length) = column.byte_range();
                start..(start + length)
            })
            .collect::<Vec<_>>()
    }

    /// Assigns compressed chunk binary data to [RowGroupBase::column_chunks].
    ///
    /// `chunk_data` must contain one buffer per projected, un-fetched column,
    /// in column order (as produced by
    /// [`RowGroupBase::calc_dense_read_ranges`]); extra columns are left
    /// unset if `chunk_data` runs out.
    pub(crate) fn assign_dense_chunk(
        &mut self,
        projection: &ProjectionMask,
        chunk_data: Vec<Bytes>,
    ) {
        let mut chunk_data = chunk_data.into_iter();

        for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
            if chunk.is_some() || !projection.leaf_included(idx) {
                continue;
            }

            // Get the fetched page.
            let Some(data) = chunk_data.next() else {
                continue;
            };

            let column = self
                .parquet_metadata
                .row_group(self.row_group_idx)
                .column(idx);
            *chunk = Some(Arc::new(ColumnChunkData::Dense {
                offset: column.byte_range().0 as usize,
                data,
            }));
        }
    }

    /// Create [PageReader] from [RowGroupBase::column_chunks]
    ///
    /// # Errors
    /// Returns a [`ParquetError::General`] if the column at `col_idx` has not
    /// been fetched yet.
    pub(crate) fn column_reader(
        &self,
        col_idx: usize,
    ) -> Result<SerializedPageReader<ColumnChunkData>> {
        let page_reader = match &self.column_chunks[col_idx] {
            None => {
                return Err(ParquetError::General(format!(
                    "Invalid column index {col_idx}, column was not fetched"
                )));
            }
            Some(data) => {
                let page_locations = self
                    .offset_index
                    // filter out empty offset indexes (old versions specified Some(vec![]) when no present)
                    .filter(|index| !index.is_empty())
                    .map(|index| index[col_idx].page_locations.clone());
                SerializedPageReader::new(
                    data.clone(),
                    self.row_group_metadata().column(col_idx),
                    self.row_count,
                    page_locations,
                )?
            }
        };

        Ok(page_reader)
    }

    /// Returns the metadata of the whole parquet file.
    pub(crate) fn parquet_metadata(&self) -> &ParquetMetaData {
        self.parquet_metadata
    }

    /// Returns the metadata of the row group this state targets.
    pub(crate) fn row_group_metadata(&self) -> &RowGroupMetaData {
        self.parquet_metadata().row_group(self.row_group_idx)
    }
}
/// An in-memory collection of column chunks
pub struct InMemoryRowGroup<'a> {
    /// Region owning the file; part of the write-cache index key.
    region_id: RegionId,
    /// Id of the SST file being read; part of cache keys.
    file_id: FileId,
    /// Index of the row group to read from the file.
    row_group_idx: usize,
    /// Cache layers (page cache / write cache) consulted before the store.
    cache_strategy: CacheStrategy,
    /// Path of the parquet file in the object store.
    file_path: &'a str,
    /// Object store.
    object_store: ObjectStore,
    /// Shared fetch/assign state for the row group's column chunks.
    base: RowGroupBase<'a>,
}
impl<'a> InMemoryRowGroup<'a> {
    /// Creates a new [InMemoryRowGroup] by `row_group_idx`.
    ///
    /// # Panics
    /// Panics if the `row_group_idx` is invalid.
    pub fn create(
        region_id: RegionId,
        file_id: FileId,
        parquet_meta: &'a ParquetMetaData,
        row_group_idx: usize,
        cache_strategy: CacheStrategy,
        file_path: &'a str,
        object_store: ObjectStore,
    ) -> Self {
        Self {
            region_id,
            file_id,
            row_group_idx,
            cache_strategy,
            file_path,
            object_store,
            base: RowGroupBase::new(parquet_meta, row_group_idx),
        }
    }

    /// Fetches the necessary column data into memory
    ///
    /// When both a `selection` and an offset index are available, only the
    /// pages the selection needs are fetched (sparse read); otherwise whole
    /// column chunks are fetched (dense read). `metrics`, when provided,
    /// accumulates cache hit/miss counters and fetch timings.
    pub async fn fetch(
        &mut self,
        projection: &ProjectionMask,
        selection: Option<&RowSelection>,
        metrics: Option<&ParquetFetchMetrics>,
    ) -> Result<()> {
        if let Some((selection, offset_index)) = selection.zip(self.base.offset_index) {
            let (fetch_ranges, page_start_offsets) =
                self.base
                    .calc_sparse_read_ranges(projection, offset_index, selection);

            let chunk_data = self.fetch_bytes(&fetch_ranges, metrics).await?;

            // Assign sparse chunk data to base.
            self.base
                .assign_sparse_chunk(projection, chunk_data, page_start_offsets);
        } else {
            // Release the CPU to avoid blocking the runtime. Since `fetch_pages_from_cache`
            // is a synchronous, CPU-bound operation.
            yield_now().await;

            // Calculate ranges to read.
            let fetch_ranges = self.base.calc_dense_read_ranges(projection);

            if fetch_ranges.is_empty() {
                // Nothing to fetch.
                return Ok(());
            }

            // Fetch data with ranges
            let chunk_data = self.fetch_bytes(&fetch_ranges, metrics).await?;

            // Assigns fetched data to base.
            self.base.assign_dense_chunk(projection, chunk_data);
        }

        Ok(())
    }

    /// Try to fetch data from the memory cache or the WriteCache,
    /// if not in WriteCache, fetch data from object store directly.
    ///
    /// Pages fetched from the write cache or the store are inserted back
    /// into the page cache before returning.
    async fn fetch_bytes(
        &self,
        ranges: &[Range<u64>],
        metrics: Option<&ParquetFetchMetrics>,
    ) -> Result<Vec<Bytes>> {
        // Now fetch page timer includes the whole time to read pages.
        let _timer = READ_STAGE_FETCH_PAGES.start_timer();
        let page_key = PageKey::new(self.file_id, self.row_group_idx, ranges.to_vec());
        if let Some(pages) = self.cache_strategy.get_pages(&page_key) {
            // Page cache hit: record sizes and return the cached compressed pages.
            if let Some(metrics) = metrics {
                let total_size: u64 = ranges.iter().map(|r| r.end - r.start).sum();
                let mut metrics_data = metrics.data.lock().unwrap();
                metrics_data.page_cache_hit += 1;
                metrics_data.pages_to_fetch_mem += ranges.len();
                metrics_data.page_size_to_fetch_mem += total_size;
                metrics_data.page_size_needed += total_size;
            }
            return Ok(pages.compressed.clone());
        }

        // Calculate total range size for metrics.
        let (total_range_size, unaligned_size) = compute_total_range_size(ranges);
        let key = IndexKey::new(self.region_id, self.file_id, FileType::Parquet);
        // Only take a timestamp when metrics are requested.
        let fetch_write_cache_start = metrics.map(|_| std::time::Instant::now());
        let write_cache_result = self.fetch_ranges_from_write_cache(key, ranges).await;
        let pages = match write_cache_result {
            Some(data) => {
                if let Some(metrics) = metrics {
                    let elapsed = fetch_write_cache_start
                        .map(|start| start.elapsed())
                        .unwrap_or_default();
                    let range_size_needed: u64 = ranges.iter().map(|r| r.end - r.start).sum();
                    let mut metrics_data = metrics.data.lock().unwrap();
                    metrics_data.write_cache_fetch_elapsed += elapsed;
                    metrics_data.write_cache_hit += 1;
                    metrics_data.pages_to_fetch_write_cache += ranges.len();
                    metrics_data.page_size_to_fetch_write_cache += unaligned_size;
                    metrics_data.page_size_needed += range_size_needed;
                }
                data
            }
            None => {
                // Fetch data from object store.
                let _timer = READ_STAGE_ELAPSED
                    .with_label_values(&["cache_miss_read"])
                    .start_timer();
                let start = metrics.map(|_| std::time::Instant::now());
                let data = fetch_byte_ranges(self.file_path, self.object_store.clone(), ranges)
                    .await
                    .map_err(|e| ParquetError::External(Box::new(e)))?;
                if let Some(metrics) = metrics {
                    let elapsed = start.map(|start| start.elapsed()).unwrap_or_default();
                    let range_size_needed: u64 = ranges.iter().map(|r| r.end - r.start).sum();
                    let mut metrics_data = metrics.data.lock().unwrap();
                    metrics_data.store_fetch_elapsed += elapsed;
                    metrics_data.cache_miss += 1;
                    metrics_data.pages_to_fetch_store += ranges.len();
                    metrics_data.page_size_to_fetch_store += unaligned_size;
                    metrics_data.page_size_needed += range_size_needed;
                }
                data
            }
        };

        // Put pages back to the cache.
        let page_value = PageValue::new(pages.clone(), total_range_size);
        self.cache_strategy
            .put_pages(page_key, Arc::new(page_value));

        Ok(pages)
    }

    /// Fetches data from write cache.
    /// Returns `None` if the data is not in the cache.
    async fn fetch_ranges_from_write_cache(
        &self,
        key: IndexKey,
        ranges: &[Range<u64>],
    ) -> Option<Vec<Bytes>> {
        if let Some(cache) = self.cache_strategy.write_cache() {
            return cache.file_cache().read_ranges(key, ranges).await;
        }
        None
    }
}
/// Computes the max possible buffer size to read the given `ranges`.
/// Returns (aligned_size, unaligned_size) where:
/// - aligned_size: total size aligned to pooled buffer size
/// - unaligned_size: actual total size without alignment
// See https://github.com/apache/opendal/blob/v0.54.0/core/src/types/read/reader.rs#L166-L192
fn compute_total_range_size(ranges: &[Range<u64>]) -> (u64, u64) {
pub(crate) fn compute_total_range_size(ranges: &[Range<u64>]) -> (u64, u64) {
if ranges.is_empty() {
return (0, 0);
}
@@ -602,96 +235,3 @@ fn align_to_pooled_buf_size(size: u64) -> u64 {
const POOLED_BUF_SIZE: u64 = 2 * 1024 * 1024;
size.div_ceil(POOLED_BUF_SIZE) * POOLED_BUF_SIZE
}
impl RowGroups for InMemoryRowGroup<'_> {
fn num_rows(&self) -> usize {
self.base.row_count
}
fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
// Creates a page reader to read column at `i`.
let page_reader = self.base.column_reader(i)?;
Ok(Box::new(ColumnChunkIterator {
reader: Some(Ok(Box::new(page_reader))),
}))
}
fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
Box::new(std::iter::once(self.base.row_group_metadata()))
}
fn metadata(&self) -> &ParquetMetaData {
self.base.parquet_metadata()
}
}
/// An in-memory column chunk
#[derive(Clone)]
pub(crate) enum ColumnChunkData {
    /// Column chunk data representing only a subset of data pages
    Sparse {
        /// Length of the full column chunk
        length: usize,
        /// Set of data pages included in this sparse chunk. Each element is a tuple
        /// of (page offset, page data)
        data: Vec<(usize, Bytes)>,
    },
    /// Full column chunk and its offset
    /// (`offset` is the chunk's absolute byte position in the file).
    Dense { offset: usize, data: Bytes },
}
impl ColumnChunkData {
    /// Returns the bytes beginning at absolute file offset `start`.
    ///
    /// Sparse chunks require `start` to match the start of a fetched page
    /// exactly; dense chunks accept any offset inside the chunk.
    fn get(&self, start: u64) -> Result<Bytes> {
        match self {
            Self::Sparse { data, .. } => {
                let lookup = data.binary_search_by_key(&start, |(offset, _)| *offset as u64);
                match lookup {
                    Ok(idx) => Ok(data[idx].1.clone()),
                    Err(_) => Err(ParquetError::General(format!(
                        "Invalid offset in sparse column chunk data: {start}"
                    ))),
                }
            }
            Self::Dense { offset, data } => {
                // Translate the absolute file offset into a chunk-relative one.
                let relative = start as usize - *offset;
                Ok(data.slice(relative..))
            }
        }
    }
}
impl Length for ColumnChunkData {
    /// Total length in bytes of the (possibly sparse) column chunk.
    fn len(&self) -> u64 {
        match self {
            Self::Sparse { length, .. } => *length as u64,
            Self::Dense { data, .. } => data.len() as u64,
        }
    }
}
impl ChunkReader for ColumnChunkData {
    type T = bytes::buf::Reader<Bytes>;

    /// Returns a reader over the chunk data starting at `start`.
    fn get_read(&self, start: u64) -> Result<Self::T> {
        let bytes = self.get(start)?;
        Ok(bytes.reader())
    }

    /// Returns `length` bytes of the chunk starting at `start`.
    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
        let bytes = self.get(start)?;
        Ok(bytes.slice(..length))
    }
}
/// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`]
pub(crate) struct ColumnChunkIterator {
    /// The single page reader; taken (replaced with `None`) on the first call to `next`.
    pub(crate) reader: Option<Result<Box<dyn PageReader>>>,
}
impl Iterator for ColumnChunkIterator {
    type Item = Result<Box<dyn PageReader>>;

    /// Yields the stored reader exactly once, then `None` forever.
    fn next(&mut self) -> Option<Self::Item> {
        // Equivalent to `Option::take`: moves the value out, leaving `None`.
        std::mem::take(&mut self.reader)
    }
}

impl PageIterator for ColumnChunkIterator {}