feat(mito): parquet memtable reader (#4967)

* wip: row group reader base

* wip: memtable row group reader

* Refactor MemtableRowGroupReader to streamline data fetching

 - Added early return when fetch_ranges is empty to optimize performance.
 - Replaced inline chunk data assignment with a call to `assign_dense_chunk` for cleaner code; a sketch of the resulting fetch path follows.
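
A rough sketch of the streamlined dense-read path described above. This is an assumed shape that mirrors the dense branch of `MemtableRowGroupPageFetcher::fetch` further down in this diff; the helper names (`calc_dense_read_ranges`, `fetch_bytes`, `assign_dense_chunk`) come from that code.

fn fetch(&mut self, projection: &ProjectionMask) {
    // Compute the byte ranges still missing for the projected columns.
    let fetch_ranges = self.base.calc_dense_read_ranges(projection);
    if fetch_ranges.is_empty() {
        // Early return: every projected column is already populated.
        return;
    }
    // Slice the in-memory bytes and let the shared helper assign the chunks.
    let chunk_data = self.fetch_bytes(&fetch_ranges);
    self.base.assign_dense_chunk(projection, chunk_data);
}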

* wip: row group reader

* wip: reuse RowGroupReader

* wip: bulk part reader

* Enhance BulkPart Iteration with Filtering

 - Introduced `RangeBase` to `BulkIterContext` for improved filter handling.
 - Implemented filter application in `BulkPartIter` to prune batches against the pushed-down predicates (see the sketch after this list).
 - Updated `SimpleFilterContext::new_opt` to be `pub(crate)` so the memtable reader can reuse it.
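
A sketch of the batch-level filtering, assuming the `PruneReader` shape this series introduces later; `context.base.filters` and `RangeBase::precise_filter` are taken from the diff below.

fn prune(&mut self, batch: Batch) -> error::Result<Option<Batch>> {
    // Fast path: no filters, the batch passes through untouched.
    if self.context.base.filters.is_empty() {
        return Ok(Some(batch));
    }
    // Apply the pushed-down simple filters to the batch.
    let Some(filtered) = self.context.base.precise_filter(batch)? else {
        // The entire batch is filtered out.
        return Ok(None);
    };
    if filtered.is_empty() {
        Ok(None)
    } else {
        Ok(Some(filtered))
    }
}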

* chore: add prune test

* fix: clippy

* fix: introduce prune reader for memtable and add more prune tests

* Enhance BulkPart read method to return Option<BoxedBatchIterator>

 - Modified `BulkPart::read` to return `Option<BoxedBatchIterator>` to handle cases where no row groups are selected.
 - Added logic to return `None` when all row groups are filtered out.
 - Updated tests to handle the new return type and added a test case to verify behavior when no row groups match the predicate (usage sketch below).
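
A minimal usage sketch for the new return type; the caller and `predicate` here are hypothetical, standing for whatever `Option<Predicate>` a scan pushes down. `None` means every row group was pruned, so the part can be skipped entirely.

let context = Arc::new(BulkIterContext::new(
    part.metadata().region_metadata.clone(),
    &None, // no projection: read all columns
    predicate,
));
match part.read(context)? {
    Some(iter) => {
        for batch in iter {
            let batch = batch?;
            // Consume the batch...
        }
    }
    None => {
        // All row groups were filtered out; nothing to read from this part.
    }
}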

* refactor/separate-paraquet-reader: Add helper function to parse parquet metadata and integrate it into BulkPartEncoder

* refactor/separate-paraquet-reader:
 Change BulkPartEncoder row_group_size from Option to usize and update tests

* refactor/separate-paraquet-reader: Add context module for bulk memtable iteration and refactor part reading

 • Introduce context module to encapsulate context for bulk memtable iteration.
 • Refactor BulkPart to use BulkIterContextRef for reading operations.
 • Remove redundant code in BulkPart by centralizing context creation and row group pruning logic in the new context module (a condensed sketch of the pruning helper follows this list).
 • Create new file context.rs with structures and logic for handling iteration context.
 • Adjust part_reader.rs and row_group_reader.rs to reference the new BulkIterContextRef.
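
A condensed sketch of the pruning helper that now lives in context.rs; it mirrors `BulkIterContext::row_groups_to_read` from the diff below.

pub(crate) fn row_groups_to_read(&self, file_meta: &Arc<ParquetMetaData>) -> VecDeque<usize> {
    let stats = RowGroupPruningStats::new(file_meta.row_groups(), &self.base.read_format, None);
    match self.predicate.as_ref() {
        // Keep only the row groups whose statistics may satisfy the predicate.
        Some(predicate) => predicate
            .prune_with_stats(&stats, self.base.read_format.metadata().schema.arrow_schema())
            .iter()
            .zip(0..file_meta.num_row_groups())
            .filter_map(|(selected, idx)| selected.then_some(idx))
            .collect(),
        // No predicate: read every row group.
        None => (0..file_meta.num_row_groups()).collect(),
    }
}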

* refactor/separate-paraquet-reader: Refactor RowGroupReader traits and implementations in memtable and parquet reader modules

 • Rename RowGroupReaderVirtual to RowGroupReaderContext for clarity.
 • Replace BulkPartVirt with direct usage of BulkIterContextRef in MemtableRowGroupReader.
 • Simplify MemtableRowGroupReaderBuilder by directly passing context instead of creating a BulkPartVirt instance.
 • Update RowGroupReaderBase to use a `context` field instead of `virt`, reflecting the trait renaming.
 • Replace FileRangeVirt with FileRangeContextRef and adjust implementations accordingly (trait sketch below).
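
The renamed trait and the two aliases that now share `RowGroupReaderBase`, condensed from reader.rs and row_group_reader.rs below (doc comment paraphrased):

/// What a RowGroupReaderBase needs from its surroundings; implemented by
/// FileRangeContextRef (SST reads) and BulkIterContextRef (bulk memtable reads).
pub(crate) trait RowGroupReaderContext: Send {
    fn map_result(
        &self,
        result: std::result::Result<Option<RecordBatch>, ArrowError>,
    ) -> Result<Option<RecordBatch>>;

    fn read_format(&self) -> &ReadFormat;
}

pub(crate) type RowGroupReader = RowGroupReaderBase<FileRangeContextRef>;
pub(crate) type MemtableRowGroupReader = RowGroupReaderBase<BulkIterContextRef>;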

* refactor/separate-paraquet-reader: Refactor column page reader creation and remove unused code

 • Centralize creation of SerializedPageReader in the RowGroupBase::column_reader method (sketched below).
 • Remove unused RowGroupCachedReader and related code from MemtableRowGroupPageFetcher.
 • Eliminate redundant error handling for invalid column index in multiple places.
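
Sketch of the centralized construction, mirroring `RowGroupBase::column_reader` in row_group.rs below; both the SST `InMemoryRowGroup` and the memtable page fetcher now call it instead of building `SerializedPageReader`s inline.

pub(crate) fn column_reader(
    &self,
    col_idx: usize,
) -> parquet::errors::Result<SerializedPageReader<ColumnChunkData>> {
    match &self.column_chunks[col_idx] {
        // The caller must have fetched this column's chunk first.
        None => Err(ParquetError::General(format!(
            "Invalid column index {col_idx}, column was not fetched"
        ))),
        Some(data) => {
            let page_locations = self.page_locations.map(|index| index[col_idx].clone());
            SerializedPageReader::new(
                data.clone(),
                self.metadata.column(col_idx),
                self.row_count,
                page_locations,
            )
        }
    }
}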

* chore: rebase main and resolve conflicts

* fix: some comments

* chore: resolve conflicts

* chore: resolve conflicts
Lei, HUANG
2024-12-24 17:59:26 +08:00
committed by Yingwen
parent dd3a509607
commit 31cfab81ad
10 changed files with 1061 additions and 245 deletions


@@ -723,10 +723,20 @@ pub enum Error {
#[snafu(display("Failed to iter data part"))]
ReadDataPart {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: parquet::errors::ParquetError,
},
#[snafu(display("Failed to read row group in memtable"))]
DecodeArrowRowGroup {
#[snafu(source)]
error: ArrowError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid region options, {}", reason))]
InvalidRegionOptions {
reason: String,
@@ -1029,6 +1039,7 @@ impl ErrorExt for Error {
RegionBusy { .. } => StatusCode::RegionBusy,
GetSchemaMetadata { source, .. } => source.status_code(),
Timeout { .. } => StatusCode::Cancelled,
DecodeArrowRowGroup { .. } => StatusCode::Internal,
}
}


@@ -27,8 +27,12 @@ use crate::memtable::{
BoxedBatchIterator, KeyValues, Memtable, MemtableId, MemtableRanges, MemtableRef, MemtableStats,
};
#[allow(unused)]
mod context;
#[allow(unused)]
pub(crate) mod part;
mod part_reader;
mod row_group_reader;
#[derive(Debug)]
pub struct BulkMemtable {


@@ -0,0 +1,117 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Context for iterating bulk memtable.
use std::collections::VecDeque;
use std::sync::Arc;
use parquet::file::metadata::ParquetMetaData;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use table::predicate::Predicate;
use crate::row_converter::McmpRowCodec;
use crate::sst::parquet::file_range::RangeBase;
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::reader::SimpleFilterContext;
use crate::sst::parquet::stats::RowGroupPruningStats;
pub(crate) type BulkIterContextRef = Arc<BulkIterContext>;
pub(crate) struct BulkIterContext {
pub(crate) base: RangeBase,
pub(crate) predicate: Option<Predicate>,
}
impl BulkIterContext {
pub(crate) fn new(
region_metadata: RegionMetadataRef,
projection: &Option<&[ColumnId]>,
predicate: Option<Predicate>,
) -> Self {
let codec = McmpRowCodec::new_with_primary_keys(&region_metadata);
let simple_filters = predicate
.as_ref()
.iter()
.flat_map(|predicate| {
predicate
.exprs()
.iter()
.filter_map(|expr| SimpleFilterContext::new_opt(&region_metadata, None, expr))
})
.collect();
let read_format = build_read_format(region_metadata, projection);
Self {
base: RangeBase {
filters: simple_filters,
read_format,
codec,
// We don't need to compat batches since all batches in the memtable have the same schema.
compat_batch: None,
},
predicate,
}
}
/// Prunes row groups by stats.
pub(crate) fn row_groups_to_read(&self, file_meta: &Arc<ParquetMetaData>) -> VecDeque<usize> {
let region_meta = self.base.read_format.metadata();
let row_groups = file_meta.row_groups();
// `expected_metadata` is `None` since the region metadata of a memtable is always up-to-date.
let stats = RowGroupPruningStats::new(row_groups, &self.base.read_format, None);
if let Some(predicate) = self.predicate.as_ref() {
predicate
.prune_with_stats(&stats, region_meta.schema.arrow_schema())
.iter()
.zip(0..file_meta.num_row_groups())
.filter_map(|(selected, row_group)| {
if !*selected {
return None;
}
Some(row_group)
})
.collect::<VecDeque<_>>()
} else {
(0..file_meta.num_row_groups()).collect()
}
}
pub(crate) fn read_format(&self) -> &ReadFormat {
&self.base.read_format
}
}
fn build_read_format(
region_metadata: RegionMetadataRef,
projection: &Option<&[ColumnId]>,
) -> ReadFormat {
let read_format = if let Some(column_ids) = &projection {
ReadFormat::new(region_metadata, column_ids.iter().copied())
} else {
// No projection, lists all column ids to read.
ReadFormat::new(
region_metadata.clone(),
region_metadata
.column_metadatas
.iter()
.map(|col| col.column_id),
)
};
read_format
}


@@ -13,10 +13,12 @@
// limitations under the License.
//! Bulk part encoder/decoder.
use std::collections::VecDeque;
use std::sync::Arc;
use api::v1::Mutation;
use bytes::Bytes;
use common_time::timestamp::TimeUnit;
use datafusion::arrow::array::{TimestampNanosecondArray, UInt64Builder};
use datatypes::arrow;
@@ -26,93 +28,145 @@ use datatypes::arrow::array::{
UInt8Builder,
};
use datatypes::arrow::compute::TakeOptions;
use datatypes::arrow::datatypes::{DataType as ArrowDataType, SchemaRef};
use datatypes::arrow::datatypes::SchemaRef;
use datatypes::arrow_array::BinaryArray;
use datatypes::data_type::DataType;
use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
use datatypes::types::TimestampType;
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use parquet::arrow::ArrowWriter;
use parquet::data_type::AsBytes;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use table::predicate::Predicate;
use crate::error;
use crate::error::{ComputeArrowSnafu, EncodeMemtableSnafu, NewRecordBatchSnafu, Result};
use crate::memtable::bulk::context::BulkIterContextRef;
use crate::memtable::bulk::part_reader::BulkPartIter;
use crate::memtable::key_values::KeyValuesRef;
use crate::read::Batch;
use crate::memtable::BoxedBatchIterator;
use crate::row_converter::{McmpRowCodec, RowCodec};
use crate::sst::parquet::format::PrimaryKeyArray;
use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat};
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::to_sst_arrow_schema;
#[derive(Debug)]
pub struct BulkPart {
data: Vec<u8>,
data: Bytes,
metadata: BulkPartMeta,
}
impl BulkPart {
pub fn new(data: Vec<u8>, metadata: BulkPartMeta) -> Self {
pub fn new(data: Bytes, metadata: BulkPartMeta) -> Self {
Self { data, metadata }
}
pub(crate) fn metadata(&self) -> &BulkPartMeta {
&self.metadata
}
pub(crate) fn read(&self, context: BulkIterContextRef) -> Result<Option<BoxedBatchIterator>> {
// use predicate to find row groups to read.
let row_groups_to_read = context.row_groups_to_read(&self.metadata.parquet_metadata);
if row_groups_to_read.is_empty() {
// All row groups are filtered.
return Ok(None);
}
let iter = BulkPartIter::try_new(
context,
row_groups_to_read,
self.metadata.parquet_metadata.clone(),
self.data.clone(),
)?;
Ok(Some(Box::new(iter) as BoxedBatchIterator))
}
}
#[derive(Debug)]
pub struct BulkPartMeta {
/// Total rows in part.
pub num_rows: usize,
/// Max timestamp in part.
pub max_timestamp: i64,
/// Min timestamp in part.
pub min_timestamp: i64,
}
impl Default for BulkPartMeta {
fn default() -> Self {
Self {
num_rows: 0,
max_timestamp: i64::MIN,
min_timestamp: i64::MAX,
}
}
/// Part file metadata.
pub parquet_metadata: Arc<ParquetMetaData>,
/// Part region schema.
pub region_metadata: RegionMetadataRef,
}
pub struct BulkPartEncoder {
metadata: RegionMetadataRef,
arrow_schema: SchemaRef,
pk_encoder: McmpRowCodec,
row_group_size: usize,
dedup: bool,
writer_props: Option<WriterProperties>,
}
impl BulkPartEncoder {
pub(crate) fn new(
metadata: RegionMetadataRef,
dedup: bool,
row_group_size: usize,
) -> BulkPartEncoder {
let codec = McmpRowCodec::new_with_primary_keys(&metadata);
let writer_props = Some(
WriterProperties::builder()
.set_write_batch_size(row_group_size)
.set_max_row_group_size(row_group_size)
.build(),
);
Self {
metadata,
pk_encoder: codec,
row_group_size,
dedup,
writer_props,
}
}
}
impl BulkPartEncoder {
/// Encodes mutations to a [BulkPart]. Returns `None` if there is nothing to encode.
fn encode_mutations(&self, mutations: &[Mutation], dest: &mut BulkPart) -> Result<bool> {
fn encode_mutations(&self, mutations: &[Mutation]) -> Result<Option<BulkPart>> {
let Some((arrow_record_batch, min_ts, max_ts)) =
mutations_to_record_batch(mutations, &self.metadata, &self.pk_encoder, false)?
mutations_to_record_batch(mutations, &self.metadata, &self.pk_encoder, self.dedup)?
else {
return Ok(false);
return Ok(None);
};
let mut buf = Vec::with_capacity(4096);
let arrow_schema = arrow_record_batch.schema();
{
let mut writer = ArrowWriter::try_new(&mut dest.data, arrow_schema, None)
.context(EncodeMemtableSnafu)?;
let file_metadata = {
let mut writer =
ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
.context(EncodeMemtableSnafu)?;
writer
.write(&arrow_record_batch)
.context(EncodeMemtableSnafu)?;
let _metadata = writer.finish().context(EncodeMemtableSnafu)?;
}
dest.metadata = BulkPartMeta {
num_rows: arrow_record_batch.num_rows(),
max_timestamp: max_ts,
min_timestamp: min_ts,
writer.finish().context(EncodeMemtableSnafu)?
};
Ok(true)
}
/// Decodes [BulkPart] to [Batch]es.
fn decode_to_batches(&self, _part: &BulkPart, _dest: &mut VecDeque<Batch>) -> Result<()> {
todo!()
let buf = Bytes::from(buf);
let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);
Ok(Some(BulkPart {
data: buf,
metadata: BulkPartMeta {
num_rows: arrow_record_batch.num_rows(),
max_timestamp: max_ts,
min_timestamp: min_ts,
parquet_metadata,
region_metadata: self.metadata.clone(),
},
}))
}
}
@@ -379,10 +433,12 @@ fn binary_array_to_dictionary(input: &BinaryArray) -> Result<PrimaryKeyArray> {
mod tests {
use std::collections::VecDeque;
use datafusion_common::ScalarValue;
use datatypes::prelude::{ScalarVector, Value};
use datatypes::vectors::{Float64Vector, TimestampMillisecondVector};
use super::*;
use crate::memtable::bulk::context::BulkIterContext;
use crate::sst::parquet::format::ReadFormat;
use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
@@ -444,7 +500,7 @@ mod tests {
k0: &'a str,
k1: u32,
timestamps: &'a [i64],
v0: &'a [Option<f64>],
v1: &'a [Option<f64>],
sequence: u64,
}
@@ -452,7 +508,7 @@ mod tests {
struct BatchOutput<'a> {
pk_values: &'a [Value],
timestamps: &'a [i64],
v0: &'a [Option<f64>],
v1: &'a [Option<f64>],
}
fn check_mutations_to_record_batches(
@@ -470,7 +526,7 @@ mod tests {
m.k0.to_string(),
m.k1,
m.timestamps.iter().copied(),
m.v0.iter().copied(),
m.v1.iter().copied(),
m.sequence,
)
.mutation
@@ -526,7 +582,7 @@ mod tests {
for idx in 0..expected.len() {
assert_eq!(expected[idx].pk_values, &batch_values[idx].0);
assert_eq!(expected[idx].timestamps, &batch_values[idx].1);
assert_eq!(expected[idx].v0, &batch_values[idx].2);
assert_eq!(expected[idx].v1, &batch_values[idx].2);
}
}
@@ -537,13 +593,13 @@ mod tests {
k0: "a",
k1: 0,
timestamps: &[0],
v0: &[Some(0.1)],
v1: &[Some(0.1)],
sequence: 0,
}],
&[BatchOutput {
pk_values: &[Value::String("a".into()), Value::UInt32(0)],
timestamps: &[0],
v0: &[Some(0.1)],
v1: &[Some(0.1)],
}],
(0, 0),
true,
@@ -555,28 +611,28 @@ mod tests {
k0: "a",
k1: 0,
timestamps: &[0],
v0: &[Some(0.1)],
v1: &[Some(0.1)],
sequence: 0,
},
MutationInput {
k0: "b",
k1: 0,
timestamps: &[0],
v0: &[Some(0.0)],
v1: &[Some(0.0)],
sequence: 0,
},
MutationInput {
k0: "a",
k1: 0,
timestamps: &[1],
v0: &[Some(0.2)],
v1: &[Some(0.2)],
sequence: 1,
},
MutationInput {
k0: "a",
k1: 1,
timestamps: &[1],
v0: &[Some(0.3)],
v1: &[Some(0.3)],
sequence: 2,
},
],
@@ -584,17 +640,17 @@ mod tests {
BatchOutput {
pk_values: &[Value::String("a".into()), Value::UInt32(0)],
timestamps: &[0, 1],
v0: &[Some(0.1), Some(0.2)],
v1: &[Some(0.1), Some(0.2)],
},
BatchOutput {
pk_values: &[Value::String("a".into()), Value::UInt32(1)],
timestamps: &[1],
v0: &[Some(0.3)],
v1: &[Some(0.3)],
},
BatchOutput {
pk_values: &[Value::String("b".into()), Value::UInt32(0)],
timestamps: &[0],
v0: &[Some(0.0)],
v1: &[Some(0.0)],
},
],
(0, 1),
@@ -607,21 +663,21 @@ mod tests {
k0: "a",
k1: 0,
timestamps: &[0],
v0: &[Some(0.1)],
v1: &[Some(0.1)],
sequence: 0,
},
MutationInput {
k0: "b",
k1: 0,
timestamps: &[0],
v0: &[Some(0.0)],
v1: &[Some(0.0)],
sequence: 0,
},
MutationInput {
k0: "a",
k1: 0,
timestamps: &[0],
v0: &[Some(0.2)],
v1: &[Some(0.2)],
sequence: 1,
},
],
@@ -629,12 +685,12 @@ mod tests {
BatchOutput {
pk_values: &[Value::String("a".into()), Value::UInt32(0)],
timestamps: &[0],
v0: &[Some(0.2)],
v1: &[Some(0.2)],
},
BatchOutput {
pk_values: &[Value::String("b".into()), Value::UInt32(0)],
timestamps: &[0],
v0: &[Some(0.0)],
v1: &[Some(0.0)],
},
],
(0, 0),
@@ -646,21 +702,21 @@ mod tests {
k0: "a",
k1: 0,
timestamps: &[0],
v0: &[Some(0.1)],
v1: &[Some(0.1)],
sequence: 0,
},
MutationInput {
k0: "b",
k1: 0,
timestamps: &[0],
v0: &[Some(0.0)],
v1: &[Some(0.0)],
sequence: 0,
},
MutationInput {
k0: "a",
k1: 0,
timestamps: &[0],
v0: &[Some(0.2)],
v1: &[Some(0.2)],
sequence: 1,
},
],
@@ -668,16 +724,194 @@ mod tests {
BatchOutput {
pk_values: &[Value::String("a".into()), Value::UInt32(0)],
timestamps: &[0, 0],
v0: &[Some(0.2), Some(0.1)],
v1: &[Some(0.2), Some(0.1)],
},
BatchOutput {
pk_values: &[Value::String("b".into()), Value::UInt32(0)],
timestamps: &[0],
v0: &[Some(0.0)],
v1: &[Some(0.0)],
},
],
(0, 0),
false,
);
}
fn encode(input: &[MutationInput]) -> BulkPart {
let metadata = metadata_for_test();
let mutations = input
.iter()
.map(|m| {
build_key_values_with_ts_seq_values(
&metadata,
m.k0.to_string(),
m.k1,
m.timestamps.iter().copied(),
m.v1.iter().copied(),
m.sequence,
)
.mutation
})
.collect::<Vec<_>>();
let encoder = BulkPartEncoder::new(metadata, true, 1024);
encoder.encode_mutations(&mutations).unwrap().unwrap()
}
#[test]
fn test_write_and_read_part_projection() {
let part = encode(&[
MutationInput {
k0: "a",
k1: 0,
timestamps: &[1],
v1: &[Some(0.1)],
sequence: 0,
},
MutationInput {
k0: "b",
k1: 0,
timestamps: &[1],
v1: &[Some(0.0)],
sequence: 0,
},
MutationInput {
k0: "a",
k1: 0,
timestamps: &[2],
v1: &[Some(0.2)],
sequence: 1,
},
]);
let projection = &[4u32];
let mut reader = part
.read(Arc::new(BulkIterContext::new(
part.metadata.region_metadata.clone(),
&Some(projection.as_slice()),
None,
)))
.unwrap()
.expect("expect at least one row group");
let mut total_rows_read = 0;
let mut field = vec![];
for res in reader {
let batch = res.unwrap();
assert_eq!(1, batch.fields().len());
assert_eq!(4, batch.fields()[0].column_id);
field.extend(
batch.fields()[0]
.data
.as_any()
.downcast_ref::<Float64Vector>()
.unwrap()
.iter_data()
.map(|v| v.unwrap()),
);
total_rows_read += batch.num_rows();
}
assert_eq!(3, total_rows_read);
assert_eq!(vec![0.1, 0.2, 0.0], field);
}
fn prepare(key_values: Vec<(&str, u32, (i64, i64), u64)>) -> BulkPart {
let metadata = metadata_for_test();
let mutations = key_values
.into_iter()
.map(|(k0, k1, (start, end), sequence)| {
let ts = (start..end);
let v1 = (start..end).map(|_| None);
build_key_values_with_ts_seq_values(&metadata, k0.to_string(), k1, ts, v1, sequence)
.mutation
})
.collect::<Vec<_>>();
let encoder = BulkPartEncoder::new(metadata, true, 100);
encoder.encode_mutations(&mutations).unwrap().unwrap()
}
fn check_prune_row_group(part: &BulkPart, predicate: Option<Predicate>, expected_rows: usize) {
let context = Arc::new(BulkIterContext::new(
part.metadata.region_metadata.clone(),
&None,
predicate,
));
let mut reader = part
.read(context)
.unwrap()
.expect("expect at least one row group");
let mut total_rows_read = 0;
for res in reader {
let batch = res.unwrap();
total_rows_read += batch.num_rows();
}
// Check the total number of rows read after pruning.
assert_eq!(expected_rows, total_rows_read);
}
#[test]
fn test_prune_row_groups() {
let part = prepare(vec![
("a", 0, (0, 40), 1),
("a", 1, (0, 60), 1),
("b", 0, (0, 100), 2),
("b", 1, (100, 180), 3),
("b", 1, (180, 210), 4),
]);
let context = Arc::new(BulkIterContext::new(
part.metadata.region_metadata.clone(),
&None,
Some(Predicate::new(vec![datafusion_expr::col("ts").eq(
datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(300), None)),
)])),
));
assert!(part.read(context).unwrap().is_none());
check_prune_row_group(&part, None, 310);
check_prune_row_group(
&part,
Some(Predicate::new(vec![
datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
])),
40,
);
check_prune_row_group(
&part,
Some(Predicate::new(vec![
datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
datafusion_expr::col("k1").eq(datafusion_expr::lit(1u32)),
])),
60,
);
check_prune_row_group(
&part,
Some(Predicate::new(vec![
datafusion_expr::col("k0").eq(datafusion_expr::lit("a"))
])),
100,
);
check_prune_row_group(
&part,
Some(Predicate::new(vec![
datafusion_expr::col("k0").eq(datafusion_expr::lit("b")),
datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
])),
100,
);
// Predicates over field columns can do precise filtering.
check_prune_row_group(
&part,
Some(Predicate::new(vec![
datafusion_expr::col("v0").eq(datafusion_expr::lit(150i64))
])),
1,
);
}
}


@@ -0,0 +1,149 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::VecDeque;
use std::sync::Arc;
use bytes::Bytes;
use parquet::arrow::ProjectionMask;
use parquet::file::metadata::ParquetMetaData;
use crate::error;
use crate::memtable::bulk::context::BulkIterContextRef;
use crate::memtable::bulk::row_group_reader::{
MemtableRowGroupReader, MemtableRowGroupReaderBuilder,
};
use crate::read::Batch;
/// Iterator for reading data inside a bulk part.
pub struct BulkPartIter {
row_groups_to_read: VecDeque<usize>,
current_reader: Option<PruneReader>,
builder: MemtableRowGroupReaderBuilder,
}
impl BulkPartIter {
/// Creates a new [BulkPartIter].
pub(crate) fn try_new(
context: BulkIterContextRef,
mut row_groups_to_read: VecDeque<usize>,
parquet_meta: Arc<ParquetMetaData>,
data: Bytes,
) -> error::Result<Self> {
let projection_mask = ProjectionMask::roots(
parquet_meta.file_metadata().schema_descr(),
context.read_format().projection_indices().iter().copied(),
);
let builder = MemtableRowGroupReaderBuilder::try_new(
context.clone(),
projection_mask,
parquet_meta,
data,
)?;
let init_reader = row_groups_to_read
.pop_front()
.map(|first_row_group| builder.build_row_group_reader(first_row_group, None))
.transpose()?
.map(|r| PruneReader::new(context, r));
Ok(Self {
row_groups_to_read,
current_reader: init_reader,
builder,
})
}
pub(crate) fn next_batch(&mut self) -> error::Result<Option<Batch>> {
let Some(current) = &mut self.current_reader else {
// All row groups are exhausted.
return Ok(None);
};
if let Some(batch) = current.next_batch()? {
return Ok(Some(batch));
}
// Previous row group exhausted, read next row group
while let Some(next_row_group) = self.row_groups_to_read.pop_front() {
current.reset(self.builder.build_row_group_reader(next_row_group, None)?);
if let Some(next_batch) = current.next_batch()? {
return Ok(Some(next_batch));
}
}
Ok(None)
}
}
impl Iterator for BulkPartIter {
type Item = error::Result<Batch>;
fn next(&mut self) -> Option<Self::Item> {
self.next_batch().transpose()
}
}
struct PruneReader {
context: BulkIterContextRef,
row_group_reader: MemtableRowGroupReader,
}
//todo(hl): maybe we also need to support lastrow mode here.
impl PruneReader {
fn new(context: BulkIterContextRef, reader: MemtableRowGroupReader) -> Self {
Self {
context,
row_group_reader: reader,
}
}
/// Iterates current inner reader until exhausted.
fn next_batch(&mut self) -> error::Result<Option<Batch>> {
while let Some(b) = self.row_group_reader.next_inner()? {
match self.prune(b)? {
Some(b) => {
return Ok(Some(b));
}
None => {
continue;
}
}
}
Ok(None)
}
/// Prunes batch according to filters.
fn prune(&mut self, batch: Batch) -> error::Result<Option<Batch>> {
//todo(hl): add metrics.
// fast path
if self.context.base.filters.is_empty() {
return Ok(Some(batch));
}
let Some(batch_filtered) = self.context.base.precise_filter(batch)? else {
// the entire batch is filtered out
return Ok(None);
};
if !batch_filtered.is_empty() {
Ok(Some(batch_filtered))
} else {
Ok(None)
}
}
fn reset(&mut self, reader: MemtableRowGroupReader) {
self.row_group_reader = reader;
}
}


@@ -0,0 +1,189 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::ops::Range;
use std::sync::Arc;
use bytes::Bytes;
use datatypes::arrow::array::RecordBatch;
use datatypes::arrow::error::ArrowError;
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowGroups, RowSelection};
use parquet::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
use parquet::column::page::{PageIterator, PageReader};
use parquet::file::metadata::ParquetMetaData;
use snafu::ResultExt;
use crate::error;
use crate::error::ReadDataPartSnafu;
use crate::memtable::bulk::context::BulkIterContextRef;
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::reader::{RowGroupReaderBase, RowGroupReaderContext};
use crate::sst::parquet::row_group::{ColumnChunkIterator, RowGroupBase};
use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
/// Helper for reading a specific row group inside memtable Parquet parts.
// This is similar to [mito2::sst::parquet::row_group::InMemoryRowGroup] since
// it's a workaround for the lack of keyword generics.
pub struct MemtableRowGroupPageFetcher<'a> {
/// Shared structs for reading row group.
base: RowGroupBase<'a>,
bytes: Bytes,
}
impl<'a> MemtableRowGroupPageFetcher<'a> {
pub(crate) fn create(
row_group_idx: usize,
parquet_meta: &'a ParquetMetaData,
bytes: Bytes,
) -> Self {
let metadata = parquet_meta.row_group(row_group_idx);
let row_count = metadata.num_rows() as usize;
let page_locations = parquet_meta
.offset_index()
.map(|x| x[row_group_idx].as_slice());
Self {
base: RowGroupBase {
metadata,
page_locations,
row_count,
column_chunks: vec![None; metadata.columns().len()],
// The cached `column_uncompressed_pages` are never used in memtable readers.
column_uncompressed_pages: vec![None; metadata.columns().len()],
},
bytes,
}
}
/// Fetches column pages from memory file.
pub(crate) fn fetch(&mut self, projection: &ProjectionMask, selection: Option<&RowSelection>) {
if let Some((selection, page_locations)) = selection.zip(self.base.page_locations) {
// Selection provided.
let (fetch_ranges, page_start_offsets) =
self.base
.calc_sparse_read_ranges(projection, page_locations, selection);
if fetch_ranges.is_empty() {
return;
}
let chunk_data = self.fetch_bytes(&fetch_ranges);
self.base
.assign_sparse_chunk(projection, chunk_data, page_start_offsets);
} else {
let fetch_ranges = self.base.calc_dense_read_ranges(projection);
if fetch_ranges.is_empty() {
// Nothing to fetch.
return;
}
let chunk_data = self.fetch_bytes(&fetch_ranges);
self.base.assign_dense_chunk(projection, chunk_data);
}
}
fn fetch_bytes(&self, ranges: &[Range<u64>]) -> Vec<Bytes> {
ranges
.iter()
.map(|range| self.bytes.slice(range.start as usize..range.end as usize))
.collect()
}
/// Creates a page reader to read column at `i`.
fn column_page_reader(&self, i: usize) -> parquet::errors::Result<Box<dyn PageReader>> {
let reader = self.base.column_reader(i)?;
Ok(Box::new(reader))
}
}
impl RowGroups for MemtableRowGroupPageFetcher<'_> {
fn num_rows(&self) -> usize {
self.base.row_count
}
fn column_chunks(&self, i: usize) -> parquet::errors::Result<Box<dyn PageIterator>> {
Ok(Box::new(ColumnChunkIterator {
reader: Some(self.column_page_reader(i)),
}))
}
}
impl RowGroupReaderContext for BulkIterContextRef {
fn map_result(
&self,
result: Result<Option<RecordBatch>, ArrowError>,
) -> error::Result<Option<RecordBatch>> {
result.context(error::DecodeArrowRowGroupSnafu)
}
fn read_format(&self) -> &ReadFormat {
self.as_ref().read_format()
}
}
pub(crate) type MemtableRowGroupReader = RowGroupReaderBase<BulkIterContextRef>;
pub(crate) struct MemtableRowGroupReaderBuilder {
context: BulkIterContextRef,
projection: ProjectionMask,
parquet_metadata: Arc<ParquetMetaData>,
field_levels: FieldLevels,
data: Bytes,
}
impl MemtableRowGroupReaderBuilder {
pub(crate) fn try_new(
context: BulkIterContextRef,
projection: ProjectionMask,
parquet_metadata: Arc<ParquetMetaData>,
data: Bytes,
) -> error::Result<Self> {
let parquet_schema_desc = parquet_metadata.file_metadata().schema_descr();
let hint = Some(context.read_format().arrow_schema().fields());
let field_levels =
parquet_to_arrow_field_levels(parquet_schema_desc, projection.clone(), hint)
.context(ReadDataPartSnafu)?;
Ok(Self {
context,
projection,
parquet_metadata,
field_levels,
data,
})
}
/// Builds a reader to read the row group at `row_group_idx` from memory.
pub(crate) fn build_row_group_reader(
&self,
row_group_idx: usize,
row_selection: Option<RowSelection>,
) -> error::Result<MemtableRowGroupReader> {
let mut row_group = MemtableRowGroupPageFetcher::create(
row_group_idx,
&self.parquet_metadata,
self.data.clone(),
);
// Fetches data from memory part. Currently, row selection is not supported.
row_group.fetch(&self.projection, row_selection.as_ref());
// Builds the parquet reader.
// Now the row selection is None.
let reader = ParquetRecordBatchReader::try_new_with_row_groups(
&self.field_levels,
&row_group,
DEFAULT_READ_BATCH_SIZE,
row_selection,
)
.context(ReadDataPartSnafu)?;
Ok(MemtableRowGroupReader::create(self.context.clone(), reader))
}
}


@@ -99,11 +99,8 @@ impl RowGroupLastRowCachedReader {
return Self::new_miss(key, row_group_reader, None);
};
if let Some(value) = cache_manager.get_selector_result(&key) {
let schema_matches = value.projection
== row_group_reader
.context()
.read_format()
.projection_indices();
let schema_matches =
value.projection == row_group_reader.read_format().projection_indices();
if schema_matches {
// Schema matches, use cache batches.
Self::new_hit(value)
@@ -218,29 +215,23 @@ impl RowGroupLastRowReader {
};
// All last rows in row group are yielded, update cache.
self.update_cache();
self.maybe_update_cache();
Ok(last_batch)
}
/// Updates row group's last row cache if cache manager is present.
fn update_cache(&mut self) {
if self.yielded_batches.is_empty() {
// We always expect that row groups yield batches.
return;
fn maybe_update_cache(&mut self) {
if let Some(cache) = &self.cache_manager {
if self.yielded_batches.is_empty() {
// We always expect that row groups yield batches.
return;
}
let value = Arc::new(SelectorResultValue {
result: std::mem::take(&mut self.yielded_batches),
projection: self.reader.read_format().projection_indices().to_vec(),
});
cache.put_selector_result(self.key, value)
}
let Some(cache) = &self.cache_manager else {
return;
};
let value = Arc::new(SelectorResultValue {
result: std::mem::take(&mut self.yielded_batches),
projection: self
.reader
.context()
.read_format()
.projection_indices()
.to_vec(),
});
cache.put_selector_result(self.key, value);
}
fn metrics(&self) -> &ReaderMetrics {


@@ -27,11 +27,11 @@ pub(crate) mod file_range;
pub mod format;
pub(crate) mod helper;
pub(crate) mod metadata;
mod page_reader;
pub(crate) mod page_reader;
pub mod reader;
pub mod row_group;
mod row_selection;
mod stats;
pub(crate) mod stats;
pub mod writer;
/// Key of metadata in parquet SST.


@@ -24,6 +24,7 @@ use async_trait::async_trait;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{debug, warn};
use datafusion_expr::Expr;
use datatypes::arrow::error::ArrowError;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use itertools::Itertools;
@@ -39,7 +40,8 @@ use table::predicate::Predicate;
use crate::cache::CacheManagerRef;
use crate::error::{
ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadParquetSnafu, Result,
ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadDataPartSnafu,
ReadParquetSnafu, Result,
};
use crate::metrics::{
PRECISE_FILTER_ROWS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_TOTAL,
@@ -207,8 +209,7 @@ impl ParquetReaderBuilder {
let hint = Some(read_format.arrow_schema().fields());
let field_levels =
parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint)
.context(ReadParquetSnafu { path: &file_path })?;
.context(ReadDataPartSnafu)?;
let row_groups = self
.row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics)
.await;
@@ -871,7 +872,7 @@ impl SimpleFilterContext {
///
/// Returns None if the column to filter doesn't exist in the SST metadata or the
/// expected metadata.
fn new_opt(
pub(crate) fn new_opt(
sst_meta: &RegionMetadataRef,
expected_meta: Option<&RegionMetadata>,
expr: &Expr,
@@ -1035,10 +1036,51 @@ impl ParquetReader {
}
}
/// RowGroupReaderContext represents the fields that cannot be shared
/// between different `RowGroupReader`s.
pub(crate) trait RowGroupReaderContext: Send {
fn map_result(
&self,
result: std::result::Result<Option<RecordBatch>, ArrowError>,
) -> Result<Option<RecordBatch>>;
fn read_format(&self) -> &ReadFormat;
}
impl RowGroupReaderContext for FileRangeContextRef {
fn map_result(
&self,
result: std::result::Result<Option<RecordBatch>, ArrowError>,
) -> Result<Option<RecordBatch>> {
result.context(ArrowReaderSnafu {
path: self.file_path(),
})
}
fn read_format(&self) -> &ReadFormat {
self.as_ref().read_format()
}
}
/// [RowGroupReader] that reads from [FileRange].
pub(crate) type RowGroupReader = RowGroupReaderBase<FileRangeContextRef>;
impl RowGroupReader {
/// Creates a new reader from file range.
pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
Self {
context,
reader,
batches: VecDeque::new(),
metrics: ReaderMetrics::default(),
}
}
}
/// Reader to read a row group of a parquet file.
pub struct RowGroupReader {
/// Context for file ranges.
context: FileRangeContextRef,
pub(crate) struct RowGroupReaderBase<T> {
/// Context of the [RowGroupReader], which adapts it to different underlying implementations.
context: T,
/// Inner parquet reader.
reader: ParquetRecordBatchReader,
/// Buffered batches to return.
@@ -1047,9 +1089,12 @@ pub struct RowGroupReader {
metrics: ReaderMetrics,
}
impl RowGroupReader {
impl<T> RowGroupReaderBase<T>
where
T: RowGroupReaderContext,
{
/// Creates a new reader.
pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
pub(crate) fn create(context: T, reader: ParquetRecordBatchReader) -> Self {
Self {
context,
reader,
@@ -1062,21 +1107,19 @@ impl RowGroupReader {
pub(crate) fn metrics(&self) -> &ReaderMetrics {
&self.metrics
}
pub(crate) fn context(&self) -> &FileRangeContextRef {
&self.context
/// Gets [ReadFormat] of underlying reader.
pub(crate) fn read_format(&self) -> &ReadFormat {
self.context.read_format()
}
/// Tries to fetch next [RecordBatch] from the reader.
fn fetch_next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
self.reader.next().transpose().context(ArrowReaderSnafu {
path: self.context.file_path(),
})
self.context.map_result(self.reader.next().transpose())
}
}
#[async_trait::async_trait]
impl BatchReader for RowGroupReader {
async fn next_batch(&mut self) -> Result<Option<Batch>> {
/// Returns the next [Batch].
pub(crate) fn next_inner(&mut self) -> Result<Option<Batch>> {
let scan_start = Instant::now();
if let Some(batch) = self.batches.pop_front() {
self.metrics.num_rows += batch.num_rows();
@@ -1104,6 +1147,16 @@ impl BatchReader for RowGroupReader {
}
}
#[async_trait::async_trait]
impl<T> BatchReader for RowGroupReaderBase<T>
where
T: RowGroupReaderContext,
{
async fn next_batch(&mut self) -> Result<Option<Batch>> {
self.next_inner()
}
}
#[cfg(test)]
mod tests {
use parquet::arrow::arrow_reader::RowSelector;


@@ -38,25 +38,196 @@ use crate::sst::file::FileId;
use crate::sst::parquet::helper::fetch_byte_ranges;
use crate::sst::parquet::page_reader::RowGroupCachedReader;
/// An in-memory collection of column chunks
pub struct InMemoryRowGroup<'a> {
metadata: &'a RowGroupMetaData,
page_locations: Option<&'a [Vec<PageLocation>]>,
pub(crate) struct RowGroupBase<'a> {
pub(crate) metadata: &'a RowGroupMetaData,
pub(crate) page_locations: Option<&'a [Vec<PageLocation>]>,
/// Compressed page of each column.
column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
row_count: usize,
region_id: RegionId,
file_id: FileId,
row_group_idx: usize,
cache_manager: Option<CacheManagerRef>,
pub(crate) column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
pub(crate) row_count: usize,
/// Row group level cached pages for each column.
///
/// These pages are uncompressed pages of a row group.
/// `column_uncompressed_pages.len()` equals to `column_chunks.len()`.
column_uncompressed_pages: Vec<Option<Arc<PageValue>>>,
pub(crate) column_uncompressed_pages: Vec<Option<Arc<PageValue>>>,
}
impl<'a> RowGroupBase<'a> {
pub(crate) fn new(parquet_meta: &'a ParquetMetaData, row_group_idx: usize) -> Self {
let metadata = parquet_meta.row_group(row_group_idx);
// `page_locations` is always `None` if we don't set
// [with_page_index()](https://docs.rs/parquet/latest/parquet/arrow/arrow_reader/struct.ArrowReaderOptions.html#method.with_page_index)
// to `true`.
let page_locations = parquet_meta
.offset_index()
.map(|x| x[row_group_idx].as_slice());
Self {
metadata,
page_locations,
column_chunks: vec![None; metadata.columns().len()],
row_count: metadata.num_rows() as usize,
column_uncompressed_pages: vec![None; metadata.columns().len()],
}
}
pub(crate) fn calc_sparse_read_ranges(
&self,
projection: &ProjectionMask,
page_locations: &[Vec<PageLocation>],
selection: &RowSelection,
) -> (Vec<Range<u64>>, Vec<Vec<usize>>) {
// If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
// `RowSelection`
let mut page_start_offsets: Vec<Vec<usize>> = vec![];
let ranges = self
.column_chunks
.iter()
.zip(self.metadata.columns())
.enumerate()
.filter(|&(idx, (chunk, _chunk_meta))| chunk.is_none() && projection.leaf_included(idx))
.flat_map(|(idx, (_chunk, chunk_meta))| {
// If the first page does not start at the beginning of the column,
// then we need to also fetch a dictionary page.
let mut ranges = vec![];
let (start, _len) = chunk_meta.byte_range();
match page_locations[idx].first() {
Some(first) if first.offset as u64 != start => {
ranges.push(start..first.offset as u64);
}
_ => (),
}
ranges.extend(
selection
.scan_ranges(&page_locations[idx])
.iter()
.map(|range| range.start as u64..range.end as u64),
);
page_start_offsets.push(ranges.iter().map(|range| range.start as usize).collect());
ranges
})
.collect::<Vec<_>>();
(ranges, page_start_offsets)
}
pub(crate) fn assign_sparse_chunk(
&mut self,
projection: &ProjectionMask,
data: Vec<Bytes>,
page_start_offsets: Vec<Vec<usize>>,
) {
let mut page_start_offsets = page_start_offsets.into_iter();
let mut chunk_data = data.into_iter();
for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
if chunk.is_some() || !projection.leaf_included(idx) {
continue;
}
if let Some(offsets) = page_start_offsets.next() {
let mut chunks = Vec::with_capacity(offsets.len());
for _ in 0..offsets.len() {
chunks.push(chunk_data.next().unwrap());
}
*chunk = Some(Arc::new(ColumnChunkData::Sparse {
length: self.metadata.column(idx).byte_range().1 as usize,
data: offsets.into_iter().zip(chunks).collect(),
}))
}
}
}
pub(crate) fn calc_dense_read_ranges(&self, projection: &ProjectionMask) -> Vec<Range<u64>> {
self.column_chunks
.iter()
.zip(&self.column_uncompressed_pages)
.enumerate()
.filter(|&(idx, (chunk, uncompressed_pages))| {
// Don't need to fetch column data if we already cache the column's pages.
chunk.is_none() && projection.leaf_included(idx) && uncompressed_pages.is_none()
})
.map(|(idx, (_chunk, _pages))| {
let column = self.metadata.column(idx);
let (start, length) = column.byte_range();
start..(start + length)
})
.collect::<Vec<_>>()
}
/// Assigns uncompressed chunk binary data to [RowGroupBase::column_chunks]
/// and returns the chunk offset and binary data assigned.
pub(crate) fn assign_dense_chunk(
&mut self,
projection: &ProjectionMask,
chunk_data: Vec<Bytes>,
) -> Vec<(usize, Bytes)> {
let mut chunk_data = chunk_data.into_iter();
let mut res = vec![];
for (idx, (chunk, row_group_pages)) in self
.column_chunks
.iter_mut()
.zip(&self.column_uncompressed_pages)
.enumerate()
{
if chunk.is_some() || !projection.leaf_included(idx) || row_group_pages.is_some() {
continue;
}
// Get the fetched page.
let Some(data) = chunk_data.next() else {
continue;
};
let column = self.metadata.column(idx);
res.push((idx, data.clone()));
*chunk = Some(Arc::new(ColumnChunkData::Dense {
offset: column.byte_range().0 as usize,
data,
}));
}
res
}
/// Create [PageReader] from [RowGroupBase::column_chunks]
pub(crate) fn column_reader(
&self,
col_idx: usize,
) -> Result<SerializedPageReader<ColumnChunkData>> {
let page_reader = match &self.column_chunks[col_idx] {
None => {
return Err(ParquetError::General(format!(
"Invalid column index {col_idx}, column was not fetched"
)))
}
Some(data) => {
let page_locations = self.page_locations.map(|index| index[col_idx].clone());
SerializedPageReader::new(
data.clone(),
self.metadata.column(col_idx),
self.row_count,
page_locations,
)?
}
};
// This column doesn't cache uncompressed pages.
Ok(page_reader)
}
}
/// An in-memory collection of column chunks
pub struct InMemoryRowGroup<'a> {
region_id: RegionId,
file_id: FileId,
row_group_idx: usize,
cache_manager: Option<CacheManagerRef>,
file_path: &'a str,
/// Object store.
object_store: ObjectStore,
base: RowGroupBase<'a>,
}
impl<'a> InMemoryRowGroup<'a> {
@@ -73,24 +244,12 @@ impl<'a> InMemoryRowGroup<'a> {
file_path: &'a str,
object_store: ObjectStore,
) -> Self {
let metadata = parquet_meta.row_group(row_group_idx);
// `page_locations` is always `None` if we don't set
// [with_page_index()](https://docs.rs/parquet/latest/parquet/arrow/arrow_reader/struct.ArrowReaderOptions.html#method.with_page_index)
// to `true`.
let page_locations = parquet_meta
.offset_index()
.map(|x| x[row_group_idx].as_slice());
Self {
metadata,
row_count: metadata.num_rows() as usize,
column_chunks: vec![None; metadata.columns().len()],
page_locations,
base: RowGroupBase::new(parquet_meta, row_group_idx),
region_id,
file_id,
row_group_idx,
cache_manager,
column_uncompressed_pages: vec![None; metadata.columns().len()],
file_path,
object_store,
}
@@ -102,65 +261,15 @@ impl<'a> InMemoryRowGroup<'a> {
projection: &ProjectionMask,
selection: Option<&RowSelection>,
) -> Result<()> {
if let Some((selection, page_locations)) = selection.zip(self.page_locations) {
// If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
// `RowSelection`
let mut page_start_offsets: Vec<Vec<usize>> = vec![];
if let Some((selection, page_locations)) = selection.zip(self.base.page_locations) {
let (fetch_ranges, page_start_offsets) =
self.base
.calc_sparse_read_ranges(projection, page_locations, selection);
let fetch_ranges = self
.column_chunks
.iter()
.zip(self.metadata.columns())
.enumerate()
.filter(|&(idx, (chunk, _chunk_meta))| {
chunk.is_none() && projection.leaf_included(idx)
})
.flat_map(|(idx, (_chunk, chunk_meta))| {
// If the first page does not start at the beginning of the column,
// then we need to also fetch a dictionary page.
let mut ranges = vec![];
let (start, _len) = chunk_meta.byte_range();
match page_locations[idx].first() {
Some(first) if first.offset as u64 != start => {
ranges.push(start..first.offset as u64);
}
_ => (),
}
ranges.extend(
selection
.scan_ranges(&page_locations[idx])
.iter()
.map(|range| range.start as u64..range.end as u64),
);
page_start_offsets
.push(ranges.iter().map(|range| range.start as usize).collect());
ranges
})
.collect::<Vec<_>>();
let mut chunk_data = self.fetch_bytes(&fetch_ranges).await?.into_iter();
let mut page_start_offsets = page_start_offsets.into_iter();
for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
if chunk.is_some() || !projection.leaf_included(idx) {
continue;
}
if let Some(offsets) = page_start_offsets.next() {
let mut chunks = Vec::with_capacity(offsets.len());
for _ in 0..offsets.len() {
chunks.push(chunk_data.next().unwrap());
}
*chunk = Some(Arc::new(ColumnChunkData::Sparse {
length: self.metadata.column(idx).byte_range().1 as usize,
data: offsets.into_iter().zip(chunks).collect(),
}))
}
}
let chunk_data = self.fetch_bytes(&fetch_ranges).await?;
// Assign sparse chunk data to base.
self.base
.assign_sparse_chunk(projection, chunk_data, page_start_offsets);
} else {
// Now we only use cache in dense chunk data.
self.fetch_pages_from_cache(projection);
@@ -169,46 +278,24 @@ impl<'a> InMemoryRowGroup<'a> {
// is a synchronous, CPU-bound operation.
yield_now().await;
let fetch_ranges = self
.column_chunks
.iter()
.zip(&self.column_uncompressed_pages)
.enumerate()
.filter(|&(idx, (chunk, uncompressed_pages))| {
// Don't need to fetch column data if we already cache the column's pages.
chunk.is_none() && projection.leaf_included(idx) && uncompressed_pages.is_none()
})
.map(|(idx, (_chunk, _pages))| {
let column = self.metadata.column(idx);
let (start, length) = column.byte_range();
start..(start + length)
})
.collect::<Vec<_>>();
// Calculate ranges to read.
let fetch_ranges = self.base.calc_dense_read_ranges(projection);
if fetch_ranges.is_empty() {
// Nothing to fetch.
return Ok(());
}
let mut chunk_data = self.fetch_bytes(&fetch_ranges).await?.into_iter();
// Fetch data with ranges
let chunk_data = self.fetch_bytes(&fetch_ranges).await?;
for (idx, (chunk, row_group_pages)) in self
.column_chunks
.iter_mut()
.zip(&self.column_uncompressed_pages)
.enumerate()
{
if chunk.is_some() || !projection.leaf_included(idx) || row_group_pages.is_some() {
continue;
}
// Assigns fetched data to base.
let assigned_columns = self.base.assign_dense_chunk(projection, chunk_data);
// Get the fetched page.
let Some(data) = chunk_data.next() else {
continue;
};
let column = self.metadata.column(idx);
if let Some(cache) = &self.cache_manager {
// Put fetched data to cache if necessary.
if let Some(cache) = &self.cache_manager {
for (col_idx, data) in assigned_columns {
let column = self.base.metadata.column(col_idx);
if !cache_uncompressed_pages(column) {
// For columns that have multiple uncompressed pages, we only cache the compressed page
// to save memory.
@@ -216,17 +303,12 @@ impl<'a> InMemoryRowGroup<'a> {
self.region_id,
self.file_id,
self.row_group_idx,
idx,
col_idx,
);
cache
.put_pages(page_key, Arc::new(PageValue::new_compressed(data.clone())));
}
}
*chunk = Some(Arc::new(ColumnChunkData::Dense {
offset: column.byte_range().0 as usize,
data,
}));
}
}
@@ -237,7 +319,8 @@ impl<'a> InMemoryRowGroup<'a> {
/// If the page is in the cache, sets the column chunk or `column_uncompressed_pages` for the column.
fn fetch_pages_from_cache(&mut self, projection: &ProjectionMask) {
let _timer = READ_STAGE_FETCH_PAGES.start_timer();
self.column_chunks
self.base
.column_chunks
.iter_mut()
.enumerate()
.filter(|(idx, chunk)| chunk.is_none() && projection.leaf_included(*idx))
@@ -245,7 +328,7 @@ impl<'a> InMemoryRowGroup<'a> {
let Some(cache) = &self.cache_manager else {
return;
};
let column = self.metadata.column(idx);
let column = self.base.metadata.column(idx);
if cache_uncompressed_pages(column) {
// Fetches uncompressed pages for the row group.
let page_key = PageKey::new_uncompressed(
@@ -254,7 +337,7 @@ impl<'a> InMemoryRowGroup<'a> {
self.row_group_idx,
idx,
);
self.column_uncompressed_pages[idx] = cache.get_pages(&page_key);
self.base.column_uncompressed_pages[idx] = cache.get_pages(&page_key);
} else {
// Fetches the compressed page from the cache.
let page_key = PageKey::new_compressed(
@@ -308,34 +391,19 @@ impl<'a> InMemoryRowGroup<'a> {
/// Creates a page reader to read column at `i`.
fn column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
if let Some(cached_pages) = &self.column_uncompressed_pages[i] {
if let Some(cached_pages) = &self.base.column_uncompressed_pages[i] {
debug_assert!(!cached_pages.row_group.is_empty());
// Hits the row group level page cache.
return Ok(Box::new(RowGroupCachedReader::new(&cached_pages.row_group)));
}
let page_reader = match &self.column_chunks[i] {
None => {
return Err(ParquetError::General(format!(
"Invalid column index {i}, column was not fetched"
)))
}
Some(data) => {
let page_locations = self.page_locations.map(|index| index[i].clone());
SerializedPageReader::new(
data.clone(),
self.metadata.column(i),
self.row_count,
page_locations,
)?
}
};
let page_reader = self.base.column_reader(i)?;
let Some(cache) = &self.cache_manager else {
return Ok(Box::new(page_reader));
};
let column = self.metadata.column(i);
let column = self.base.metadata.column(i);
if cache_uncompressed_pages(column) {
// This column uses the row group level page cache.
// We collect all pages and put them into the cache.
@@ -362,7 +430,7 @@ fn cache_uncompressed_pages(column: &ColumnChunkMetaData) -> bool {
impl RowGroups for InMemoryRowGroup<'_> {
fn num_rows(&self) -> usize {
self.row_count
self.base.row_count
}
fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
@@ -430,8 +498,8 @@ impl ChunkReader for ColumnChunkData {
}
/// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`]
struct ColumnChunkIterator {
reader: Option<Result<Box<dyn PageReader>>>,
pub(crate) struct ColumnChunkIterator {
pub(crate) reader: Option<Result<Box<dyn PageReader>>>,
}
impl Iterator for ColumnChunkIterator {