feat: prefilter flat parquet scans by primary key

This commit is contained in:
Ruihang Xia
2026-03-22 01:37:36 +08:00
parent fe45ae446c
commit 767c3b44c8
10 changed files with 1196 additions and 159 deletions

View File

@@ -616,6 +616,15 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to read arrow record batch from parquet file {}", path))]
ArrowReader {
path: String,
#[snafu(source)]
error: ArrowError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Column not found, column: {column}"))]
ColumnNotFound {
column: String,
@@ -1340,6 +1349,7 @@ impl ErrorExt for Error {
RegionState { .. } | UpdateManifest { .. } => StatusCode::RegionNotReady,
JsonOptions { .. } => StatusCode::InvalidArguments,
EmptyRegionDir { .. } | EmptyManifestDir { .. } => StatusCode::RegionNotFound,
ArrowReader { .. } => StatusCode::StorageUnavailable,
ConvertValue { source, .. } => source.status_code(),
ApplyBloomFilterIndex { source, .. } => source.status_code(),
InvalidPartitionExpr { source, .. } => source.status_code(),

View File

@@ -90,6 +90,7 @@ impl BulkIterContext {
Ok(Self {
base: RangeBase {
filters: simple_filters,
primary_key_filters: None,
dyn_filters,
read_format,
prune_schema: region_metadata.schema.clone(),

View File

@@ -333,10 +333,10 @@ impl FlatRowGroupLastRowCachedReader {
}
/// Returns the next RecordBatch.
pub(crate) async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
pub(crate) fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
match self {
FlatRowGroupLastRowCachedReader::Hit(r) => r.next_batch(),
FlatRowGroupLastRowCachedReader::Miss(r) => r.next_batch().await,
FlatRowGroupLastRowCachedReader::Miss(r) => r.next_batch(),
}
}
@@ -466,12 +466,12 @@ impl FlatRowGroupLastRowReader {
Ok(Some(merged))
}
async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
if self.pending.is_full() {
return self.flush_pending();
}
while let Some(batch) = self.reader.next_batch().await? {
while let Some(batch) = self.reader.next_batch()? {
self.selector.on_next(batch, &mut self.pending)?;
if self.pending.is_full() {
return self.flush_pending();

View File

@@ -19,15 +19,19 @@ use common_recordbatch::filter::SimpleFilterEvaluator;
use common_time::Timestamp;
use datatypes::arrow::array::BooleanArray;
use datatypes::arrow::buffer::BooleanBuffer;
use datatypes::arrow::compute::concat_batches;
use datatypes::arrow::record_batch::RecordBatch;
use snafu::ResultExt;
use mito_codec::row_converter::PrimaryKeyFilter;
use snafu::{OptionExt, ResultExt};
use crate::error::{RecordBatchSnafu, Result};
use crate::error::{ComputeArrowSnafu, RecordBatchSnafu, Result, UnexpectedSnafu};
use crate::memtable::BoxedBatchIterator;
use crate::read::last_row::{FlatRowGroupLastRowCachedReader, RowGroupLastRowCachedReader};
use crate::read::{Batch, BatchReader};
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::file_range::FileRangeContextRef;
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::format::PrimaryKeyArray;
use crate::sst::parquet::reader::{FlatRowGroupReader, ReaderMetrics, RowGroupReader};
pub enum Source {
@@ -247,12 +251,80 @@ pub enum FlatSource {
}
impl FlatSource {
async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
fn next_raw_batch(&mut self) -> Result<Option<RecordBatch>> {
match self {
FlatSource::RowGroup(r) => r.next_batch().await,
FlatSource::LastRow(r) => r.next_batch().await,
FlatSource::RowGroup(r) => r.next_raw_batch(),
FlatSource::LastRow(r) => r.next_batch(),
}
}
/// Converts a raw parquet batch into the output record batch representation.
///
/// Row-group batches are converted by their reader; last-row batches are
/// already converted by the cached reader, so they pass through unchanged.
fn convert_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
    if let FlatSource::RowGroup(reader) = self {
        reader.convert_batch(batch)
    } else {
        Ok(batch)
    }
}
}
struct CachedPrimaryKeyFilter {
inner: Box<dyn PrimaryKeyFilter>,
last_primary_key: Vec<u8>,
last_match: Option<bool>,
}
impl CachedPrimaryKeyFilter {
fn new(inner: Box<dyn PrimaryKeyFilter>) -> Self {
Self {
inner,
last_primary_key: Vec::new(),
last_match: None,
}
}
}
impl PrimaryKeyFilter for CachedPrimaryKeyFilter {
fn matches(&mut self, pk: &[u8]) -> bool {
if let Some(last_match) = self.last_match
&& self.last_primary_key == pk
{
return last_match;
}
let matched = self.inner.matches(pk);
self.last_primary_key.clear();
self.last_primary_key.extend_from_slice(pk);
self.last_match = Some(matched);
matched
}
}
/// Returns the encoded primary key shared by every row of `batch`, or `None`
/// when the batch is empty or spans more than one primary key.
///
/// Only the first and last dictionary keys are compared, so this relies on
/// rows being clustered by primary key within a batch — presumably guaranteed
/// by the flat SST sort order. NOTE(review): confirm that invariant.
///
/// # Errors
/// Returns `Unexpected` if the primary-key column is not a binary dictionary
/// array in the expected flat layout.
fn batch_single_primary_key(batch: &RecordBatch) -> Result<Option<&[u8]>> {
// The flat format stores the encoded primary key at a fixed column position
// derived from the total column count.
let primary_key_index = primary_key_column_index(batch.num_columns());
let pk_dict_array = batch
.column(primary_key_index)
.as_any()
.downcast_ref::<PrimaryKeyArray>()
.context(UnexpectedSnafu {
reason: "Primary key column is not a dictionary array".to_string(),
})?;
let pk_values = pk_dict_array
.values()
.as_any()
.downcast_ref::<datatypes::arrow::array::BinaryArray>()
.context(UnexpectedSnafu {
reason: "Primary key values are not binary array".to_string(),
})?;
let keys = pk_dict_array.keys();
// Empty batch: there is no single key to report.
if keys.is_empty() {
return Ok(None);
}
let first_key = keys.value(0);
// Differing first/last keys means the batch holds multiple primary keys.
if first_key != keys.value(keys.len() - 1) {
return Ok(None);
}
Ok(Some(pk_values.value(first_key as usize)))
}
/// A flat format reader that returns RecordBatch instead of Batch.
@@ -260,6 +332,8 @@ pub struct FlatPruneReader {
/// Context for file ranges.
context: FileRangeContextRef,
source: FlatSource,
primary_key_filter: Option<CachedPrimaryKeyFilter>,
buffered_prefiltered_batch: Option<RecordBatch>,
metrics: ReaderMetrics,
/// Whether to skip field filters for this row group.
skip_fields: bool,
@@ -272,6 +346,10 @@ impl FlatPruneReader {
skip_fields: bool,
) -> Self {
Self {
primary_key_filter: ctx
.new_primary_key_filter()
.map(CachedPrimaryKeyFilter::new),
buffered_prefiltered_batch: None,
context: ctx,
source: FlatSource::RowGroup(reader),
metrics: Default::default(),
@@ -285,6 +363,8 @@ impl FlatPruneReader {
skip_fields: bool,
) -> Self {
Self {
primary_key_filter: None,
buffered_prefiltered_batch: None,
context: ctx,
source: FlatSource::LastRow(reader),
metrics: Default::default(),
@@ -297,20 +377,18 @@ impl FlatPruneReader {
self.metrics.clone()
}
pub(crate) async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
pub(crate) fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
loop {
let start = std::time::Instant::now();
let batch = self.source.next_batch().await?;
self.metrics.scan_cost += start.elapsed();
let Some(record_batch) = batch else {
let Some(mut raw_batch) = self.next_prefiltered_batch()? else {
return Ok(None);
};
// Update metrics for the received batch
self.metrics.num_rows += record_batch.num_rows();
self.metrics.num_batches += 1;
let scan_start = std::time::Instant::now();
self.coalesce_prefiltered_batches(&mut raw_batch)?;
let record_batch = self.source.convert_batch(raw_batch)?;
self.metrics.scan_cost += scan_start.elapsed();
self.metrics.num_batches += 1;
match self.prune_flat(record_batch)? {
Some(filtered_batch) => {
return Ok(Some(filtered_batch));
@@ -322,6 +400,68 @@ impl FlatPruneReader {
}
}
/// Returns the next raw batch after applying the primary-key prefilter.
///
/// A batch buffered by [`Self::coalesce_prefiltered_batches`] is returned
/// first. Otherwise raw batches are pulled from the source; batches whose
/// rows are all pruned by the primary-key filter are skipped (and counted as
/// precisely filtered rows) until a surviving batch or end of input.
fn next_prefiltered_batch(&mut self) -> Result<Option<RecordBatch>> {
// Hand back a batch stashed by the coalescing pass, if any.
if let Some(batch) = self.buffered_prefiltered_batch.take() {
return Ok(Some(batch));
}
loop {
let start = std::time::Instant::now();
let Some(raw_batch) = self.source.next_raw_batch()? else {
return Ok(None);
};
// Count rows as read before prefiltering so metrics reflect source volume.
self.metrics.num_rows += raw_batch.num_rows();
let num_rows_before_prefilter = raw_batch.num_rows();
let Some(prefiltered_batch) = self.prefilter_primary_keys(raw_batch)? else {
// Entire batch pruned: account for it and fetch the next one.
self.metrics.scan_cost += start.elapsed();
self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_prefilter;
continue;
};
// Partially pruned: only the removed rows count as filtered.
let prefiltered_rows = num_rows_before_prefilter - prefiltered_batch.num_rows();
self.metrics.filter_metrics.rows_precise_filtered += prefiltered_rows;
self.metrics.scan_cost += start.elapsed();
return Ok(Some(prefiltered_batch));
}
}
/// Merges consecutive prefiltered batches that share one primary key into
/// `batch`, reducing the number of small batches after prefiltering.
///
/// Stops at the first batch with a different (or non-unique) primary key and
/// buffers it for the next [`Self::next_prefiltered_batch`] call. If `batch`
/// itself spans multiple primary keys it is left untouched.
fn coalesce_prefiltered_batches(&mut self, batch: &mut RecordBatch) -> Result<()> {
// Only coalesce when the current batch is entirely one primary key.
let Some(primary_key) = batch_single_primary_key(batch)? else {
return Ok(());
};
// Own the key bytes: the borrow into `batch` must end before we mutate it.
let primary_key = primary_key.to_vec();
let schema = batch.schema();
let mut batches: Vec<RecordBatch> = Vec::new();
while let Some(next_batch) = self.next_prefiltered_batch()? {
if batch_single_primary_key(&next_batch)? == Some(primary_key.as_slice()) {
// Lazily seed with the original batch on the first match so a
// lone batch avoids the concat below entirely.
if batches.is_empty() {
batches.push(batch.clone());
}
batches.push(next_batch);
} else {
// Different key: save it for the next call and stop merging.
self.buffered_prefiltered_batch = Some(next_batch);
break;
}
}
if !batches.is_empty() {
*batch = concat_batches(&schema, &batches).context(ComputeArrowSnafu)?;
}
Ok(())
}
/// Applies the cached primary-key filter to `record_batch`, if one exists.
///
/// Returns the surviving batch, or `None` when every row is pruned. With no
/// filter configured the batch passes through untouched.
fn prefilter_primary_keys(&mut self, record_batch: RecordBatch) -> Result<Option<RecordBatch>> {
    match self.primary_key_filter.as_mut() {
        Some(filter) => self
            .context
            .prefilter_flat_batch_by_primary_key(record_batch, filter),
        None => Ok(Some(record_batch)),
    }
}
/// Prunes batches by the pushed down predicate and returns RecordBatch.
fn prune_flat(&mut self, record_batch: RecordBatch) -> Result<Option<RecordBatch>> {
// fast path

View File

@@ -1533,7 +1533,7 @@ pub fn build_flat_file_range_scan_stream(
.transpose()?;
let mapper = range.compaction_projection_mapper();
while let Some(record_batch) = reader.next_batch().await? {
while let Some(record_batch) = reader.next_batch()? {
let record_batch = if let Some(mapper) = mapper {
let batch = mapper.project(record_batch)?;
batch

View File

@@ -41,9 +41,17 @@ pub mod writer;
pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";
/// Default batch size to read parquet files.
pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024;
///
/// This is a runtime-only scan granularity, so we align it with DataFusion's
/// default execution batch size to reduce rebatching and concatenation in the
/// query pipeline.
pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 8 * 1024;
/// Default row group size for parquet files.
pub const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE;
///
/// Keep the existing persisted/on-disk default stable. It intentionally stays
/// decoupled from [`DEFAULT_READ_BATCH_SIZE`] so we can tune runtime scan
/// batching without changing the row group layout of newly written SSTs.
pub const DEFAULT_ROW_GROUP_SIZE: usize = 100 * 1024;
/// Parquet write options.
#[derive(Debug, Clone)]

View File

@@ -16,10 +16,11 @@
//! is usually a row group in a parquet file.
use std::collections::HashMap;
use std::ops::BitAnd;
use std::ops::{BitAnd, Range};
use std::sync::Arc;
use api::v1::{OpType, SemanticType};
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::error;
use datafusion::physical_plan::PhysicalExpr;
use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr;
@@ -28,17 +29,18 @@ use datatypes::arrow::buffer::BooleanBuffer;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::Schema;
use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
use mito_codec::primary_key_filter::is_partition_column;
use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec, PrimaryKeyFilter};
use parquet::arrow::arrow_reader::RowSelection;
use parquet::file::metadata::ParquetMetaData;
use snafu::{OptionExt, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::{ColumnId, TimeSeriesRowSelector};
use table::predicate::Predicate;
use crate::error::{
ComputeArrowSnafu, DataTypeMismatchSnafu, DecodeSnafu, DecodeStatsSnafu,
ArrowReaderSnafu, ComputeArrowSnafu, DataTypeMismatchSnafu, DecodeSnafu, DecodeStatsSnafu,
EvalPartitionFilterSnafu, NewRecordBatchSnafu, RecordBatchSnafu, Result, StatsNotPresentSnafu,
UnexpectedSnafu,
};
@@ -49,13 +51,14 @@ use crate::read::last_row::{FlatRowGroupLastRowCachedReader, RowGroupLastRowCach
use crate::read::prune::{FlatPruneReader, PruneReader};
use crate::sst::file::FileHandle;
use crate::sst::parquet::flat_format::{
DecodedPrimaryKeys, decode_primary_keys, time_index_column_index,
DecodedPrimaryKeys, decode_primary_keys, primary_key_column_index, time_index_column_index,
};
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat};
use crate::sst::parquet::reader::{
FlatRowGroupReader, MaybeFilter, RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext,
};
use crate::sst::parquet::row_group::ParquetFetchMetrics;
use crate::sst::parquet::row_group::{InMemoryRowGroup, ParquetFetchMetrics};
use crate::sst::parquet::row_selection::row_selection_from_row_ranges;
use crate::sst::parquet::stats::RowGroupPruningStats;
/// Checks if a row group contains delete operations by examining the min value of op_type column.
@@ -144,7 +147,7 @@ impl FileRange {
std::slice::from_ref(curr_row_group),
read_format,
self.context.base.expected_metadata.clone(),
self.compute_skip_fields(),
self.context.should_skip_fields(self.row_group_idx),
);
// not costly to create a predicate here since dynamic filters are wrapped in Arc
@@ -156,22 +159,6 @@ impl FileRange {
.unwrap_or(true) // unexpected, not skip just in case
}
fn compute_skip_fields(&self) -> bool {
match self.context.base.pre_filter_mode {
PreFilterMode::All => false,
PreFilterMode::SkipFields => true,
PreFilterMode::SkipFieldsOnDelete => {
// Check if this specific row group contains delete op
row_group_contains_delete(
self.context.reader_builder.parquet_metadata(),
self.row_group_idx,
self.context.reader_builder.file_path(),
)
.unwrap_or(true)
}
}
}
/// Returns a reader to read the [FileRange].
pub(crate) async fn reader(
&self,
@@ -243,15 +230,6 @@ impl FileRange {
if !self.in_dynamic_filter_range() {
return Ok(None);
}
let parquet_reader = self
.context
.reader_builder
.build(
self.row_group_idx,
self.row_selection.clone(),
fetch_metrics,
)
.await?;
let use_last_row_reader = if selector
.map(|s| s == TimeSeriesRowSelector::LastRow)
@@ -275,8 +253,21 @@ impl FileRange {
let skip_fields = self.context.should_skip_fields(self.row_group_idx);
let flat_prune_reader = if use_last_row_reader {
let flat_row_group_reader =
FlatRowGroupReader::new(self.context.clone(), parquet_reader);
let row_selection = self.row_selection.clone();
if row_selection
.as_ref()
.is_some_and(|selection| selection.row_count() == 0)
{
return Ok(None);
}
let flat_row_group_reader = FlatRowGroupReader::new(
self.context.clone(),
self.context
.reader_builder
.build(self.row_group_idx, row_selection, fetch_metrics)
.await?,
);
let reader = FlatRowGroupLastRowCachedReader::new(
self.file_handle().file_id().file_id(),
self.row_group_idx,
@@ -286,18 +277,138 @@ impl FileRange {
);
FlatPruneReader::new_with_last_row_reader(self.context.clone(), reader, skip_fields)
} else {
let flat_row_group_reader =
FlatRowGroupReader::new(self.context.clone(), parquet_reader);
FlatPruneReader::new_with_row_group_reader(
self.context.clone(),
flat_row_group_reader,
skip_fields,
)
match self.prefiltered_flat_reader_input(fetch_metrics).await? {
PrefilteredFlatReaderInput::Selection(row_selection) => {
if row_selection
.as_ref()
.is_some_and(|selection| selection.row_count() == 0)
{
return Ok(None);
}
let flat_row_group_reader = FlatRowGroupReader::new(
self.context.clone(),
self.context
.reader_builder
.build(self.row_group_idx, row_selection, fetch_metrics)
.await?,
);
FlatPruneReader::new_with_row_group_reader(
self.context.clone(),
flat_row_group_reader,
skip_fields,
)
}
PrefilteredFlatReaderInput::Prefetched(mut row_group) => {
let flat_row_group_reader = FlatRowGroupReader::new(
self.context.clone(),
self.context
.reader_builder
.build_on_row_group(
&mut row_group,
self.row_selection.clone(),
fetch_metrics,
)
.await?,
);
FlatPruneReader::new_with_row_group_reader(
self.context.clone(),
flat_row_group_reader,
skip_fields,
)
}
}
};
Ok(Some(flat_prune_reader))
}
/// Evaluates the primary-key prefilter against the primary-key column of the
/// row group and decides how the flat reader should be built.
///
/// Returns:
/// - `Selection(..)` with the original selection when prefiltering does not
///   apply (a partial row selection exists, or no primary-key filter);
/// - `Selection(Some(empty))` when no row matches;
/// - `Selection(Some(..))` narrowed to the matching row ranges; or
/// - `Prefetched(row_group)` when every row matches, so the already-fetched
///   in-memory row group can be reused instead of fetching again.
async fn prefiltered_flat_reader_input<'a>(
&'a self,
fetch_metrics: Option<&ParquetFetchMetrics>,
) -> Result<PrefilteredFlatReaderInput<'a>> {
// A partial selection means index pruning already ran; keep it as-is.
if !self.select_all() {
return Ok(PrefilteredFlatReaderInput::Selection(
self.row_selection.clone(),
));
}
// No usable primary-key filter: nothing to prefilter with.
let Some(mut primary_key_filter) = self.context.new_primary_key_filter() else {
return Ok(PrefilteredFlatReaderInput::Selection(
self.row_selection.clone(),
));
};
// Build a format that projects every parquet column so the primary-key
// column position matches what the range helpers expect.
let read_format = ReadFormat::new_flat(
self.context.read_format().metadata().clone(),
std::iter::empty::<ColumnId>(),
Some(
self.context
.reader_builder
.parquet_metadata()
.file_metadata()
.schema_descr()
.num_columns(),
),
self.context.file_path(),
false,
)?;
let mut row_group = self
.context
.reader_builder
.new_in_memory_row_group(self.row_group_idx);
let reader = self
.context
.reader_builder
.build_on_row_group_with_read_format(
&mut row_group,
self.row_selection.clone(),
fetch_metrics,
&read_format,
)
.await?;
let rows_in_group = self
.context
.reader_builder
.parquet_metadata()
.row_group(self.row_group_idx)
.num_rows() as usize;
// Collect matching row ranges, shifting each batch-local range by the
// running row offset to make them row-group absolute.
let mut matched_row_ranges: Vec<Range<usize>> = Vec::new();
let mut row_offset = 0;
for batch_result in reader {
let batch = batch_result.context(ArrowReaderSnafu {
path: self.context.file_path(),
})?;
let batch_num_rows = batch.num_rows();
matched_row_ranges.extend(
self.context
.base
.matching_row_ranges_by_primary_key(&batch, primary_key_filter.as_mut())?
.into_iter()
.map(|range| (range.start + row_offset)..(range.end + row_offset)),
);
row_offset += batch_num_rows;
}
// Nothing matched: an explicitly empty selection lets the caller skip
// building the reader.
if matched_row_ranges.is_empty() {
return Ok(PrefilteredFlatReaderInput::Selection(Some(
RowSelection::from(vec![]),
)));
}
// Everything matched: reuse the fetched row group to avoid a second fetch.
if matched_row_ranges.len() == 1
&& matched_row_ranges[0].start == 0
&& matched_row_ranges[0].end == rows_in_group
{
return Ok(PrefilteredFlatReaderInput::Prefetched(row_group));
}
Ok(PrefilteredFlatReaderInput::Selection(Some(
row_selection_from_row_ranges(matched_row_ranges.into_iter(), rows_in_group),
)))
}
/// Returns the helper to compat batches.
pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> {
self.context.compat_batch()
@@ -314,6 +425,11 @@ impl FileRange {
}
}
/// Outcome of primary-key prefiltering for a flat row-group reader.
enum PrefilteredFlatReaderInput<'a> {
/// Build the reader from this row selection (`None` selects all rows).
Selection(Option<RowSelection>),
/// Reuse this already-fetched in-memory row group (all rows matched).
Prefetched(InMemoryRowGroup<'a>),
}
/// Context shared by ranges of the same parquet SST.
pub(crate) struct FileRangeContext {
/// Row group reader builder for the file.
@@ -343,6 +459,11 @@ impl FileRangeContext {
&self.base.filters
}
/// Builds an encoded primary-key filter for flat scan pre-filtering.
///
/// Returns `None` when prefiltering is not applicable; the criteria are
/// decided by [`RangeBase::new_primary_key_filter`].
pub(crate) fn new_primary_key_filter(&self) -> Option<Box<dyn PrimaryKeyFilter>> {
self.base.new_primary_key_filter()
}
/// Returns true if a partition filter is configured.
pub(crate) fn has_partition_filter(&self) -> bool {
self.base.partition_filter.is_some()
@@ -390,6 +511,16 @@ impl FileRangeContext {
self.base.precise_filter_flat(input, skip_fields)
}
/// Applies an encoded primary-key prefilter to the input `RecordBatch`.
///
/// Thin delegation to [`RangeBase::prefilter_flat_batch_by_primary_key`];
/// returns `None` when every row of `input` is pruned.
pub(crate) fn prefilter_flat_batch_by_primary_key(
&self,
input: RecordBatch,
primary_key_filter: &mut dyn PrimaryKeyFilter,
) -> Result<Option<RecordBatch>> {
self.base
.prefilter_flat_batch_by_primary_key(input, primary_key_filter)
}
/// Determines whether to skip field filters based on PreFilterMode and row group delete status.
pub(crate) fn should_skip_fields(&self, row_group_idx: usize) -> bool {
match self.base.pre_filter_mode {
@@ -439,6 +570,11 @@ pub(crate) struct PartitionFilterContext {
pub(crate) struct RangeBase {
/// Filters pushed down.
pub(crate) filters: Vec<SimpleFilterContext>,
/// Simple filters that can be compiled into encoded primary-key checks.
///
/// This set is pre-validated against the SST/expected metadata and only contains
/// tag filters on primary-key columns, excluding partition columns.
pub(crate) primary_key_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
/// Dynamic filter physical exprs.
pub(crate) dyn_filters: Vec<Arc<DynamicFilterPhysicalExpr>>,
/// Helper to read the SST.
@@ -473,6 +609,178 @@ impl TagDecodeState {
}
impl RangeBase {
/// Drops filters that cannot be compiled into encoded primary-key checks.
///
/// The retention criteria (tag column, part of the primary key, metadata
/// compatible, not a partition column) live in
/// [`Self::is_usable_primary_key_filter`].
pub(crate) fn retain_usable_primary_key_filters(
    sst_metadata: &RegionMetadataRef,
    expected_metadata: Option<&RegionMetadata>,
    filters: &mut Vec<SimpleFilterEvaluator>,
) {
    let usable = |candidate: &SimpleFilterEvaluator| {
        Self::is_usable_primary_key_filter(sst_metadata, expected_metadata, candidate)
    };
    filters.retain(usable);
}
/// Returns true if `filter` can be evaluated against the encoded primary key
/// of this SST.
///
/// A filter is usable only when its column is a tag that participates in the
/// SST's primary key, is not a partition column, and — when an expected
/// (latest) metadata is supplied — has the same name, semantic type, and data
/// type in both metadatas, so the encoding the filter sees matches the SST.
fn is_usable_primary_key_filter(
sst_metadata: &RegionMetadataRef,
expected_metadata: Option<&RegionMetadata>,
filter: &SimpleFilterEvaluator,
) -> bool {
// Partition columns are handled by the separate partition filter path.
if is_partition_column(filter.column_name()) {
return false;
}
let sst_column = match expected_metadata {
Some(expected_metadata) => {
// Resolve via the expected metadata first, then map by column id
// into the SST metadata to survive renames/schema drift.
let Some(expected_column) = expected_metadata.column_by_name(filter.column_name())
else {
return false;
};
let Some(sst_column) = sst_metadata.column_by_id(expected_column.column_id) else {
return false;
};
// Any mismatch means the encoded key bytes may differ; bail out.
if sst_column.column_schema.name != expected_column.column_schema.name
|| sst_column.semantic_type != expected_column.semantic_type
|| sst_column.column_schema.data_type != expected_column.column_schema.data_type
{
return false;
}
sst_column
}
None => {
let Some(sst_column) = sst_metadata.column_by_name(filter.column_name()) else {
return false;
};
sst_column
}
};
// Only tag columns inside the primary key are encoded into the key bytes.
sst_column.semantic_type == SemanticType::Tag
&& sst_metadata
.primary_key_index(sst_column.column_id)
.is_some()
}
/// Builds an encoded primary-key filter for flat scan pre-filtering.
///
/// Returns `None` when prefiltering cannot help: the region has no primary
/// key, raw batches lack the encoded `__primary_key` dictionary column, or no
/// usable primary-key filters were pushed down.
pub(crate) fn new_primary_key_filter(&self) -> Option<Box<dyn PrimaryKeyFilter>> {
if self.read_format.metadata().primary_key.is_empty()
|| !self
.read_format
.as_flat()
.is_some_and(|format| format.raw_batch_has_primary_key_dictionary())
{
return None;
}
let filters = self.primary_key_filters.as_ref()?;
if filters.is_empty() {
return None;
}
// Cheap Arc clone: the filter set is shared, not copied.
let filters = Arc::clone(filters);
Some(
self.codec
.primary_key_filter(self.read_format.metadata(), filters),
)
}
/// Applies an encoded primary-key prefilter before flat-row materialization.
///
/// This only prunes rows that are guaranteed to fail simple primary-key
/// predicates. The normal precise filter still runs after flat conversion.
/// Returns `None` when every row is pruned, and avoids copying when the
/// whole batch survives or the result is a single contiguous slice.
pub(crate) fn prefilter_flat_batch_by_primary_key(
&self,
input: RecordBatch,
primary_key_filter: &mut dyn PrimaryKeyFilter,
) -> Result<Option<RecordBatch>> {
if input.num_rows() == 0 {
return Ok(Some(input));
}
let matched_row_ranges =
self.matching_row_ranges_by_primary_key(&input, primary_key_filter)?;
if matched_row_ranges.is_empty() {
return Ok(None);
}
// Whole batch matched: return it untouched, no copy.
if matched_row_ranges.len() == 1
&& matched_row_ranges[0].start == 0
&& matched_row_ranges[0].end == input.num_rows()
{
return Ok(Some(input));
}
// Single contiguous range: zero-copy slice instead of a boolean filter.
if matched_row_ranges.len() == 1 {
let span = &matched_row_ranges[0];
return Ok(Some(input.slice(span.start, span.end - span.start)));
}
// Multiple ranges: build a boolean mask and filter the batch.
let mut mask = vec![false; input.num_rows()];
for span in matched_row_ranges {
mask[span].fill(true);
}
let filtered =
datatypes::arrow::compute::filter_record_batch(&input, &BooleanArray::from(mask))
.context(ComputeArrowSnafu)?;
if filtered.num_rows() == 0 {
Ok(None)
} else {
Ok(Some(filtered))
}
}
/// Computes the row ranges of `input` whose primary key passes
/// `primary_key_filter`, merging adjacent matching runs.
///
/// Rows are grouped into runs of equal dictionary keys and the filter is
/// evaluated once per run. NOTE(review): `keys.values()` reads the raw key
/// buffer and ignores any null mask on the dictionary keys — presumably the
/// primary-key column is never null here; confirm against the writer.
fn matching_row_ranges_by_primary_key(
&self,
input: &RecordBatch,
primary_key_filter: &mut dyn PrimaryKeyFilter,
) -> Result<Vec<Range<usize>>> {
let primary_key_index = primary_key_column_index(input.num_columns());
let pk_dict_array = input
.column(primary_key_index)
.as_any()
.downcast_ref::<PrimaryKeyArray>()
.context(UnexpectedSnafu {
reason: "Primary key column is not a dictionary array".to_string(),
})?;
let pk_values = pk_dict_array
.values()
.as_any()
.downcast_ref::<datatypes::arrow::array::BinaryArray>()
.context(UnexpectedSnafu {
reason: "Primary key values are not binary array".to_string(),
})?;
let keys = pk_dict_array.keys();
let key_values = keys.values();
// No keys at all: treat every row as matching (conservative).
if key_values.is_empty() {
return Ok(std::iter::once(0..input.num_rows()).collect());
}
let mut matched_row_ranges: Vec<Range<usize>> = Vec::new();
let mut start = 0;
while start < key_values.len() {
// Extend `end` to cover the whole run of rows sharing this key.
let key = key_values[start];
let mut end = start + 1;
while end < key_values.len() && key_values[end] == key {
end += 1;
}
if primary_key_filter.matches(pk_values.value(key as usize)) {
// Coalesce with the previous range when they touch.
if let Some(last) = matched_row_ranges.last_mut()
&& last.end == start
{
last.end = end;
} else {
matched_row_ranges.push(start..end);
}
}
start = end;
}
Ok(matched_row_ranges)
}
/// Tries its best to apply the pushed-down predicates precisely on the input batch.
/// Return the filtered batch. If the entire batch is filtered out, return None.
///

View File

@@ -282,6 +282,15 @@ impl FlatReadFormat {
}
}
/// Returns true if raw parquet batches already use the flat layout with an
/// encoded `__primary_key` dictionary column.
pub(crate) fn raw_batch_has_primary_key_dictionary(&self) -> bool {
    // Only the flat adapter keeps the encoded primary-key dictionary as-is;
    // the primary-key-to-flat adapter must decode it first.
    matches!(&self.parquet_adapter, ParquetAdapter::Flat(_))
}
/// Creates a sequence array to override.
pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
self.override_sequence

View File

@@ -26,15 +26,14 @@ use common_telemetry::{tracing, warn};
use datafusion_expr::Expr;
use datatypes::arrow::array::ArrayRef;
use datatypes::arrow::datatypes::Field;
use datatypes::arrow::error::ArrowError;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::DataType;
use futures::StreamExt;
use mito_codec::row_converter::build_primary_key_codec;
use object_store::ObjectStore;
use parquet::arrow::ProjectionMask;
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions, RowSelection};
use parquet::arrow::async_reader::{ParquetRecordBatchStream, ParquetRecordBatchStreamBuilder};
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
use parquet::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels};
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData};
use partition::expr::PartitionExpr;
use snafu::ResultExt;
@@ -48,7 +47,9 @@ use crate::cache::index::result_cache::PredicateKey;
use crate::cache::{CacheStrategy, CachedSstMeta};
#[cfg(feature = "vector_index")]
use crate::error::ApplyVectorIndexSnafu;
use crate::error::{ReadDataPartSnafu, ReadParquetSnafu, Result, SerializePartitionExprSnafu};
use crate::error::{
ArrowReaderSnafu, ReadDataPartSnafu, ReadParquetSnafu, Result, SerializePartitionExprSnafu,
};
use crate::metrics::{
PRECISE_FILTER_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL,
READ_ROWS_TOTAL, READ_STAGE_ELAPSED,
@@ -69,14 +70,13 @@ use crate::sst::index::inverted_index::applier::{
#[cfg(feature = "vector_index")]
use crate::sst::index::vector_index::applier::VectorIndexApplierRef;
use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
use crate::sst::parquet::async_reader::SstAsyncFileReader;
use crate::sst::parquet::file_range::{
FileRangeContext, FileRangeContextRef, PartitionFilterContext, PreFilterMode, RangeBase,
row_group_contains_delete,
};
use crate::sst::parquet::format::{ReadFormat, need_override_sequence};
use crate::sst::parquet::metadata::MetadataLoader;
use crate::sst::parquet::row_group::ParquetFetchMetrics;
use crate::sst::parquet::row_group::{InMemoryRowGroup, ParquetFetchMetrics};
use crate::sst::parquet::row_selection::RowGroupSelection;
use crate::sst::parquet::stats::RowGroupPruningStats;
use crate::sst::tag_maybe_to_dictionary_field;
@@ -415,12 +415,6 @@ impl ParquetReaderBuilder {
.set_override_sequence(self.file_handle.meta_ref().sequence.map(|x| x.get()));
}
// Computes the projection mask.
let parquet_schema_desc = parquet_meta.file_metadata().schema_descr();
let indices = read_format.projection_indices();
// For now we assume there are no nested schemas.
// TODO(yingwen): Revisit this if we introduce nested types such as JSON type.
let projection_mask = ProjectionMask::roots(parquet_schema_desc, indices.iter().copied());
let selection = self
.row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics)
.await;
@@ -452,20 +446,26 @@ impl ParquetReaderBuilder {
.map(|meta| meta.schema.clone())
.unwrap_or_else(|| region_meta.schema.clone());
// Create ArrowReaderMetadata for async stream building.
let arrow_reader_options =
ArrowReaderOptions::new().with_schema(read_format.arrow_schema().clone());
let arrow_metadata =
ArrowReaderMetadata::try_new(parquet_meta.clone(), arrow_reader_options)
// Computes the projection mask.
let parquet_schema_desc = parquet_meta.file_metadata().schema_descr();
let indices = read_format.projection_indices();
// For now we assume there are no nested schemas.
// TODO(yingwen): Revisit this if we introduce nested types such as JSON type.
let projection_mask = ProjectionMask::roots(parquet_schema_desc, indices.iter().copied());
// Computes the field levels.
let hint = Some(read_format.arrow_schema().fields());
let field_levels =
parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint)
.context(ReadDataPartSnafu)?;
let reader_builder = RowGroupReaderBuilder {
file_handle: self.file_handle.clone(),
file_path,
parquet_meta,
arrow_metadata,
object_store: self.object_store.clone(),
projection: projection_mask,
field_levels,
cache_strategy: self.cache_strategy.clone(),
};
@@ -485,6 +485,20 @@ impl ParquetReaderBuilder {
vec![]
};
let primary_key_filters = self.predicate.as_ref().and_then(|predicate| {
let mut filters = predicate
.exprs()
.iter()
.filter_map(SimpleFilterEvaluator::try_new)
.collect::<Vec<_>>();
RangeBase::retain_usable_primary_key_filters(
read_format.metadata(),
self.expected_metadata.as_deref(),
&mut filters,
);
(!filters.is_empty()).then_some(Arc::new(filters))
});
let dyn_filters = if let Some(predicate) = &self.predicate {
predicate.dyn_filters().as_ref().clone()
} else {
@@ -499,6 +513,7 @@ impl ParquetReaderBuilder {
reader_builder,
RangeBase {
filters,
primary_key_filters,
dyn_filters,
read_format,
expected_metadata: self.expected_metadata.clone(),
@@ -1640,7 +1655,7 @@ impl ReaderMetrics {
}
}
/// Builder to build a [ParquetRecordBatchStream] for a row group.
/// Builder to build a [ParquetRecordBatchReader] for a row group.
pub(crate) struct RowGroupReaderBuilder {
/// SST file to read.
///
@@ -1650,12 +1665,12 @@ pub(crate) struct RowGroupReaderBuilder {
file_path: String,
/// Metadata of the parquet file.
parquet_meta: Arc<ParquetMetaData>,
/// Arrow reader metadata for building async stream.
arrow_metadata: ArrowReaderMetadata,
/// Object store as an Operator.
object_store: ObjectStore,
/// Projection mask.
projection: ProjectionMask,
/// Field levels to read.
field_levels: FieldLevels,
/// Cache.
cache_strategy: CacheStrategy,
}
@@ -1679,43 +1694,121 @@ impl RowGroupReaderBuilder {
&self.cache_strategy
}
/// Builds a [ParquetRecordBatchStream] to read the row group at `row_group_idx`.
/// Creates an [`InMemoryRowGroup`] for the row group at `row_group_idx`,
/// wired to this builder's file, parquet metadata, cache, and object store.
/// No data is fetched yet; fetching happens when the reader is built.
pub(crate) fn new_in_memory_row_group(&self, row_group_idx: usize) -> InMemoryRowGroup<'_> {
InMemoryRowGroup::create(
self.file_handle.region_id(),
self.file_handle.file_id().file_id(),
&self.parquet_meta,
row_group_idx,
self.cache_strategy.clone(),
&self.file_path,
self.object_store.clone(),
)
}
/// Computes the projection mask and arrow field levels for `read_format`.
///
/// Uses the read format's projection indices as parquet root columns
/// (assumes a non-nested schema) and its arrow schema as the type hint for
/// [`parquet_to_arrow_field_levels`].
fn projection_and_field_levels(
&self,
read_format: &ReadFormat,
) -> Result<(ProjectionMask, FieldLevels)> {
let parquet_schema_desc = self.parquet_meta.file_metadata().schema_descr();
let projection_mask = ProjectionMask::roots(
parquet_schema_desc,
read_format.projection_indices().iter().copied(),
);
// Hint with the target arrow schema so field types match the read format.
let hint = Some(read_format.arrow_schema().fields());
let field_levels =
parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint)
.context(ReadDataPartSnafu)?;
Ok((projection_mask, field_levels))
}
/// Builds a [ParquetRecordBatchReader] to read the row group at `row_group_idx`.
pub(crate) async fn build(
&self,
row_group_idx: usize,
row_selection: Option<RowSelection>,
fetch_metrics: Option<&ParquetFetchMetrics>,
) -> Result<ParquetRecordBatchStream<SstAsyncFileReader>> {
// Create async file reader with caching support.
let async_reader = SstAsyncFileReader::new(
self.file_handle.file_id(),
self.file_path.clone(),
self.object_store.clone(),
self.cache_strategy.clone(),
self.parquet_meta.clone(),
row_group_idx,
) -> Result<ParquetRecordBatchReader> {
let mut row_group = self.new_in_memory_row_group(row_group_idx);
self.build_on_row_group_with_projection(
&mut row_group,
row_selection,
fetch_metrics,
self.projection.clone(),
self.field_levels.clone(),
)
.with_fetch_metrics(fetch_metrics.cloned());
.await
}
// Build the async stream using ArrowReaderBuilder API.
let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata(
async_reader,
self.arrow_metadata.clone(),
);
builder = builder
.with_row_groups(vec![row_group_idx])
.with_projection(self.projection.clone())
.with_batch_size(DEFAULT_READ_BATCH_SIZE);
/// Builds a [ParquetRecordBatchReader] over `row_group`, using a projection
/// derived from `read_format` instead of the builder's default projection.
pub(crate) async fn build_on_row_group_with_read_format(
    &self,
    row_group: &mut InMemoryRowGroup<'_>,
    row_selection: Option<RowSelection>,
    fetch_metrics: Option<&ParquetFetchMetrics>,
    read_format: &ReadFormat,
) -> Result<ParquetRecordBatchReader> {
    let (mask, levels) = self.projection_and_field_levels(read_format)?;
    self.build_on_row_group_with_projection(row_group, row_selection, fetch_metrics, mask, levels)
        .await
}
if let Some(selection) = row_selection {
builder = builder.with_row_selection(selection);
/// Builds a [ParquetRecordBatchReader] over `row_group` with the builder's
/// precomputed projection and field levels.
pub(crate) async fn build_on_row_group(
    &self,
    row_group: &mut InMemoryRowGroup<'_>,
    row_selection: Option<RowSelection>,
    fetch_metrics: Option<&ParquetFetchMetrics>,
) -> Result<ParquetRecordBatchReader> {
    let projection = self.projection.clone();
    let field_levels = self.field_levels.clone();
    self.build_on_row_group_with_projection(
        row_group,
        row_selection,
        fetch_metrics,
        projection,
        field_levels,
    )
    .await
}
async fn build_on_row_group_with_projection(
&self,
row_group: &mut InMemoryRowGroup<'_>,
row_selection: Option<RowSelection>,
fetch_metrics: Option<&ParquetFetchMetrics>,
projection: ProjectionMask,
field_levels: FieldLevels,
) -> Result<ParquetRecordBatchReader> {
let fetch_start = Instant::now();
// Fetches data into memory.
row_group
.fetch(&projection, row_selection.as_ref(), fetch_metrics)
.await
.context(ReadParquetSnafu {
path: &self.file_path,
})?;
// Record total fetch elapsed time.
if let Some(metrics) = fetch_metrics {
metrics.data.lock().unwrap().total_fetch_elapsed += fetch_start.elapsed();
}
let stream = builder.build().context(ReadParquetSnafu {
// Builds the parquet reader.
// Now the row selection is None.
ParquetRecordBatchReader::try_new_with_row_groups(
&field_levels,
row_group,
DEFAULT_READ_BATCH_SIZE,
row_selection,
)
.context(ReadParquetSnafu {
path: &self.file_path,
})?;
Ok(stream)
})
}
}
@@ -1845,7 +1938,7 @@ impl ParquetReader {
pub async fn next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
loop {
if let Some(reader) = &mut self.reader {
if let Some(batch) = reader.next_batch().await? {
if let Some(batch) = reader.next_batch()? {
return Ok(Some(batch));
}
self.reader = None;
@@ -1924,18 +2017,26 @@ impl ParquetReader {
/// RowGroupReaderContext represents the fields that cannot be shared
/// between different `RowGroupReader`s.
pub(crate) trait RowGroupReaderContext: Send {
fn read_format(&self) -> &ReadFormat;
fn map_result(
&self,
result: std::result::Result<Option<RecordBatch>, ArrowError>,
) -> Result<Option<RecordBatch>>;
fn file_path(&self) -> &str;
fn read_format(&self) -> &ReadFormat;
}
impl RowGroupReaderContext for FileRangeContextRef {
fn read_format(&self) -> &ReadFormat {
self.as_ref().read_format()
fn map_result(
&self,
result: std::result::Result<Option<RecordBatch>, ArrowError>,
) -> Result<Option<RecordBatch>> {
result.context(ArrowReaderSnafu {
path: self.file_path(),
})
}
fn file_path(&self) -> &str {
self.as_ref().file_path()
fn read_format(&self) -> &ReadFormat {
self.as_ref().read_format()
}
}
@@ -1944,11 +2045,8 @@ pub(crate) type RowGroupReader = RowGroupReaderBase<FileRangeContextRef>;
impl RowGroupReader {
/// Creates a new reader from file range.
pub(crate) fn new(
context: FileRangeContextRef,
stream: ParquetRecordBatchStream<SstAsyncFileReader>,
) -> Self {
Self::create(context, stream)
pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
Self::create(context, reader)
}
}
@@ -1956,8 +2054,8 @@ impl RowGroupReader {
pub(crate) struct RowGroupReaderBase<T> {
/// Context of [RowGroupReader] so adapts to different underlying implementation.
context: T,
/// Inner parquet record batch stream.
stream: ParquetRecordBatchStream<SstAsyncFileReader>,
/// Inner parquet reader.
reader: ParquetRecordBatchReader,
/// Buffered batches to return.
batches: VecDeque<Batch>,
/// Local scan metrics.
@@ -1971,7 +2069,7 @@ where
T: RowGroupReaderContext,
{
/// Creates a new reader to read the primary key format.
pub(crate) fn create(context: T, stream: ParquetRecordBatchStream<SstAsyncFileReader>) -> Self {
pub(crate) fn create(context: T, reader: ParquetRecordBatchReader) -> Self {
// The batch length from the reader should be less than or equal to DEFAULT_READ_BATCH_SIZE.
let override_sequence = context
.read_format()
@@ -1980,7 +2078,7 @@ where
Self {
context,
stream,
reader,
batches: VecDeque::new(),
metrics: ReaderMetrics::default(),
override_sequence,
@@ -1997,18 +2095,13 @@ where
self.context.read_format()
}
/// Tries to fetch next [RecordBatch] from the stream asynchronously.
async fn fetch_next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
match self.stream.next().await.transpose() {
Ok(batch) => Ok(batch),
Err(e) => Err(e).context(ReadParquetSnafu {
path: self.context.file_path(),
}),
}
/// Tries to fetch next [RecordBatch] from the reader, mapping arrow errors
/// through the context.
fn fetch_next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
    let maybe_batch = self.reader.next().transpose();
    self.context.map_result(maybe_batch)
}
/// Returns the next [Batch].
pub(crate) async fn next_inner(&mut self) -> Result<Option<Batch>> {
pub(crate) fn next_inner(&mut self) -> Result<Option<Batch>> {
let scan_start = Instant::now();
if let Some(batch) = self.batches.pop_front() {
self.metrics.num_rows += batch.num_rows();
@@ -2018,7 +2111,7 @@ where
// We need to fetch next record batch and convert it to batches.
while self.batches.is_empty() {
let Some(record_batch) = self.fetch_next_record_batch().await? else {
let Some(record_batch) = self.fetch_next_record_batch()? else {
self.metrics.scan_cost += scan_start.elapsed();
return Ok(None);
};
@@ -2046,10 +2139,10 @@ where
#[async_trait::async_trait]
impl<T> BatchReader for RowGroupReaderBase<T>
where
T: RowGroupReaderContext + Send + Sync,
T: RowGroupReaderContext,
{
async fn next_batch(&mut self) -> Result<Option<Batch>> {
self.next_inner().await
self.next_inner()
}
}
@@ -2057,18 +2150,15 @@ where
pub(crate) struct FlatRowGroupReader {
/// Context for file ranges.
context: FileRangeContextRef,
/// Inner parquet record batch stream.
stream: ParquetRecordBatchStream<SstAsyncFileReader>,
/// Inner parquet reader.
reader: ParquetRecordBatchReader,
/// Cached sequence array to override sequences.
override_sequence: Option<ArrayRef>,
}
impl FlatRowGroupReader {
/// Creates a new flat reader from file range.
pub(crate) fn new(
context: FileRangeContextRef,
stream: ParquetRecordBatchStream<SstAsyncFileReader>,
) -> Self {
pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
// The batch length from the reader should be less than or equal to DEFAULT_READ_BATCH_SIZE.
let override_sequence = context
.read_format()
@@ -2076,28 +2166,39 @@ impl FlatRowGroupReader {
Self {
context,
stream,
reader,
override_sequence,
}
}
/// Returns the next RecordBatch.
pub(crate) async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
match self.stream.next().await {
/// Returns the next raw RecordBatch from parquet before flat-format conversion.
pub(crate) fn next_raw_batch(&mut self) -> Result<Option<RecordBatch>> {
match self.reader.next() {
Some(batch_result) => {
let record_batch = batch_result.context(ReadParquetSnafu {
let record_batch = batch_result.context(ArrowReaderSnafu {
path: self.context.file_path(),
})?;
// Safety: Only flat format use FlatRowGroupReader.
let flat_format = self.context.read_format().as_flat().unwrap();
let record_batch =
flat_format.convert_batch(record_batch, self.override_sequence.as_ref())?;
Ok(Some(record_batch))
}
None => Ok(None),
}
}
/// Converts a raw parquet batch into the projected flat batch.
pub(crate) fn convert_batch(&self, record_batch: RecordBatch) -> Result<RecordBatch> {
    // Safety: Only flat format use FlatRowGroupReader.
    self.context
        .read_format()
        .as_flat()
        .unwrap()
        .convert_batch(record_batch, self.override_sequence.as_ref())
}
/// Returns the next flat RecordBatch, or `None` when the row group is
/// exhausted.
pub(crate) fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
    match self.next_raw_batch()? {
        Some(raw) => Ok(Some(self.convert_batch(raw)?)),
        None => Ok(None),
    }
}
}
#[cfg(test)]

View File

@@ -12,12 +12,28 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! Parquet row group reading utilities.
//! Ports private structs from [parquet crate](https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/arrow/async_reader/mod.rs#L644-L650).
use std::ops::Range;
use std::sync::Arc;
use crate::sst::parquet::helper::MERGE_GAP;
use bytes::{Buf, Bytes};
use object_store::ObjectStore;
use parquet::arrow::ProjectionMask;
use parquet::arrow::arrow_reader::{RowGroups, RowSelection};
use parquet::column::page::{PageIterator, PageReader};
use parquet::errors::{ParquetError, Result};
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
use parquet::file::page_index::offset_index::OffsetIndexMetaData;
use parquet::file::reader::{ChunkReader, Length};
use parquet::file::serialized_reader::SerializedPageReader;
use store_api::storage::{FileId, RegionId};
use tokio::task::yield_now;
use crate::cache::file_cache::{FileType, IndexKey};
use crate::cache::{CacheStrategy, PageKey, PageValue};
use crate::metrics::{READ_STAGE_ELAPSED, READ_STAGE_FETCH_PAGES};
use crate::sst::parquet::helper::{MERGE_GAP, fetch_byte_ranges};
/// Inner data for ParquetFetchMetrics.
#[derive(Default, Debug, Clone)]
@@ -58,9 +74,9 @@ impl ParquetFetchMetricsData {
}
/// Metrics for tracking page/row group fetch operations.
#[derive(Default, Clone)]
#[derive(Default)]
pub struct ParquetFetchMetrics {
pub data: Arc<std::sync::Mutex<ParquetFetchMetricsData>>,
pub data: std::sync::Mutex<ParquetFetchMetricsData>,
}
impl std::fmt::Debug for ParquetFetchMetrics {
@@ -188,6 +204,357 @@ impl ParquetFetchMetrics {
}
}
/// Shared state for reading one parquet row group: borrowed file metadata
/// plus the compressed column chunk buffers fetched so far.
pub(crate) struct RowGroupBase<'a> {
    /// Metadata of the whole parquet file the row group belongs to.
    parquet_metadata: &'a ParquetMetaData,
    /// Index of this row group within the file.
    row_group_idx: usize,
    /// Page offset index of this row group, present only when the file
    /// metadata was loaded with the page index enabled.
    pub(crate) offset_index: Option<&'a [OffsetIndexMetaData]>,
    /// Compressed page of each column. `None` until fetched.
    column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
    /// Number of rows in this row group.
    pub(crate) row_count: usize,
}
impl<'a> RowGroupBase<'a> {
    /// Creates the reading state for row group `row_group_idx` of the file
    /// described by `parquet_meta`. No column data is fetched yet.
    ///
    /// # Panics
    /// Panics if `row_group_idx` is out of bounds for `parquet_meta`.
    pub(crate) fn new(parquet_meta: &'a ParquetMetaData, row_group_idx: usize) -> Self {
        let metadata = parquet_meta.row_group(row_group_idx);
        // `offset_index` is always `None` if we don't set
        // [with_page_index()](https://docs.rs/parquet/latest/parquet/arrow/arrow_reader/struct.ArrowReaderOptions.html#method.with_page_index)
        // to `true`.
        let offset_index = parquet_meta
            .offset_index()
            // filter out empty offset indexes (old versions specified Some(vec![]) when no present)
            .filter(|index| !index.is_empty())
            .map(|x| x[row_group_idx].as_slice());

        Self {
            parquet_metadata: parquet_meta,
            row_group_idx,
            offset_index,
            // One empty slot per column chunk; filled by the `assign_*` methods.
            column_chunks: vec![None; metadata.columns().len()],
            row_count: metadata.num_rows() as usize,
        }
    }

    /// Computes the byte ranges to fetch when both a [RowSelection] and the
    /// page [OffsetIndexMetaData] are available, so only the pages needed by
    /// the selection are read.
    ///
    /// Returns `(ranges, page_start_offsets)`: the file ranges to fetch and,
    /// per fetched column, the start offset of every fetched page. The latter
    /// is consumed by [RowGroupBase::assign_sparse_chunk].
    pub(crate) fn calc_sparse_read_ranges(
        &self,
        projection: &ProjectionMask,
        offset_index: &[OffsetIndexMetaData],
        selection: &RowSelection,
    ) -> (Vec<Range<u64>>, Vec<Vec<usize>>) {
        // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
        // `RowSelection`
        let mut page_start_offsets: Vec<Vec<usize>> = vec![];
        let ranges = self
            .column_chunks
            .iter()
            .zip(self.row_group_metadata().columns())
            .enumerate()
            // Skip columns that are already fetched or excluded by the projection.
            .filter(|&(idx, (chunk, _chunk_meta))| chunk.is_none() && projection.leaf_included(idx))
            .flat_map(|(idx, (_chunk, chunk_meta))| {
                // If the first page does not start at the beginning of the column,
                // then we need to also fetch a dictionary page.
                let mut ranges = vec![];
                let (start, _len) = chunk_meta.byte_range();
                match offset_index[idx].page_locations.first() {
                    Some(first) if first.offset as u64 != start => {
                        ranges.push(start..first.offset as u64);
                    }
                    _ => (),
                }

                // Only fetch the pages that overlap the row selection.
                ranges.extend(
                    selection
                        .scan_ranges(&offset_index[idx].page_locations)
                        .iter()
                        .map(|range| range.start..range.end),
                );
                page_start_offsets.push(ranges.iter().map(|range| range.start as usize).collect());

                ranges
            })
            .collect::<Vec<_>>();
        (ranges, page_start_offsets)
    }

    /// Stores fetched sparse page `data` into [RowGroupBase::column_chunks].
    ///
    /// `data` and `page_start_offsets` must come from the same
    /// [RowGroupBase::calc_sparse_read_ranges] call: pages are consumed in
    /// order, grouped per column by each offset list.
    pub(crate) fn assign_sparse_chunk(
        &mut self,
        projection: &ProjectionMask,
        data: Vec<Bytes>,
        page_start_offsets: Vec<Vec<usize>>,
    ) {
        let mut page_start_offsets = page_start_offsets.into_iter();
        let mut chunk_data = data.into_iter();

        for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
            // Mirrors the column filter used in `calc_sparse_read_ranges`.
            if chunk.is_some() || !projection.leaf_included(idx) {
                continue;
            }

            if let Some(offsets) = page_start_offsets.next() {
                let mut chunks = Vec::with_capacity(offsets.len());
                for _ in 0..offsets.len() {
                    chunks.push(chunk_data.next().unwrap());
                }

                let column = self
                    .parquet_metadata
                    .row_group(self.row_group_idx)
                    .column(idx);
                *chunk = Some(Arc::new(ColumnChunkData::Sparse {
                    length: column.byte_range().1 as usize,
                    data: offsets.into_iter().zip(chunks).collect(),
                }))
            }
        }
    }

    /// Computes the full byte range of every projected, not-yet-fetched
    /// column chunk (whole chunks, no page-level pruning).
    pub(crate) fn calc_dense_read_ranges(&self, projection: &ProjectionMask) -> Vec<Range<u64>> {
        self.column_chunks
            .iter()
            .enumerate()
            .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
            .map(|(idx, _chunk)| {
                let column = self.row_group_metadata().column(idx);
                let (start, length) = column.byte_range();
                start..(start + length)
            })
            .collect::<Vec<_>>()
    }

    /// Assigns compressed chunk binary data to [RowGroupBase::column_chunks].
    ///
    /// `chunk_data` must be in the same order as the ranges returned by
    /// [RowGroupBase::calc_dense_read_ranges].
    pub(crate) fn assign_dense_chunk(
        &mut self,
        projection: &ProjectionMask,
        chunk_data: Vec<Bytes>,
    ) {
        let mut chunk_data = chunk_data.into_iter();

        for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
            if chunk.is_some() || !projection.leaf_included(idx) {
                continue;
            }
            // Get the fetched page.
            let Some(data) = chunk_data.next() else {
                continue;
            };

            let column = self
                .parquet_metadata
                .row_group(self.row_group_idx)
                .column(idx);
            *chunk = Some(Arc::new(ColumnChunkData::Dense {
                offset: column.byte_range().0 as usize,
                data,
            }));
        }
    }

    /// Create [PageReader] from [RowGroupBase::column_chunks]
    ///
    /// Returns an error if the chunk of column `col_idx` has not been
    /// fetched yet via one of the `assign_*` methods.
    pub(crate) fn column_reader(
        &self,
        col_idx: usize,
    ) -> Result<SerializedPageReader<ColumnChunkData>> {
        let page_reader = match &self.column_chunks[col_idx] {
            None => {
                return Err(ParquetError::General(format!(
                    "Invalid column index {col_idx}, column was not fetched"
                )));
            }
            Some(data) => {
                let page_locations = self
                    .offset_index
                    // filter out empty offset indexes (old versions specified Some(vec![]) when no present)
                    .filter(|index| !index.is_empty())
                    .map(|index| index[col_idx].page_locations.clone());
                SerializedPageReader::new(
                    data.clone(),
                    self.row_group_metadata().column(col_idx),
                    self.row_count,
                    page_locations,
                )?
            }
        };

        Ok(page_reader)
    }

    /// Returns the metadata of the whole parquet file.
    pub(crate) fn parquet_metadata(&self) -> &ParquetMetaData {
        self.parquet_metadata
    }

    /// Returns the metadata of this row group.
    pub(crate) fn row_group_metadata(&self) -> &RowGroupMetaData {
        self.parquet_metadata().row_group(self.row_group_idx)
    }
}
/// An in-memory collection of column chunks
pub struct InMemoryRowGroup<'a> {
    /// Region the file belongs to; part of the write cache key.
    region_id: RegionId,
    /// Id of the parquet file; part of the page and write cache keys.
    file_id: FileId,
    /// Index of the row group to read.
    row_group_idx: usize,
    /// Cache used for page lookups and write cache reads.
    cache_strategy: CacheStrategy,
    /// Path of the file in the object store.
    file_path: &'a str,
    /// Object store.
    object_store: ObjectStore,
    /// Shared row-group reading state (fetched chunks, offsets, row count).
    base: RowGroupBase<'a>,
}
impl<'a> InMemoryRowGroup<'a> {
    /// Creates a new [InMemoryRowGroup] by `row_group_idx`.
    ///
    /// # Panics
    /// Panics if the `row_group_idx` is invalid.
    pub fn create(
        region_id: RegionId,
        file_id: FileId,
        parquet_meta: &'a ParquetMetaData,
        row_group_idx: usize,
        cache_strategy: CacheStrategy,
        file_path: &'a str,
        object_store: ObjectStore,
    ) -> Self {
        Self {
            region_id,
            file_id,
            row_group_idx,
            cache_strategy,
            file_path,
            object_store,
            base: RowGroupBase::new(parquet_meta, row_group_idx),
        }
    }

    /// Fetches the necessary column data into memory
    ///
    /// When both `selection` and the page offset index are present, only the
    /// pages required by the selection are fetched (sparse read); otherwise
    /// the whole chunk of every projected column is fetched (dense read).
    /// Fetch statistics are recorded into `metrics` when provided.
    pub async fn fetch(
        &mut self,
        projection: &ProjectionMask,
        selection: Option<&RowSelection>,
        metrics: Option<&ParquetFetchMetrics>,
    ) -> Result<()> {
        if let Some((selection, offset_index)) = selection.zip(self.base.offset_index) {
            let (fetch_ranges, page_start_offsets) =
                self.base
                    .calc_sparse_read_ranges(projection, offset_index, selection);

            let chunk_data = self.fetch_bytes(&fetch_ranges, metrics).await?;

            // Assign sparse chunk data to base.
            self.base
                .assign_sparse_chunk(projection, chunk_data, page_start_offsets);
        } else {
            // Release the CPU to avoid blocking the runtime. Since `fetch_pages_from_cache`
            // is a synchronous, CPU-bound operation.
            yield_now().await;

            // Calculate ranges to read.
            let fetch_ranges = self.base.calc_dense_read_ranges(projection);
            if fetch_ranges.is_empty() {
                // Nothing to fetch.
                return Ok(());
            }

            // Fetch data with ranges
            let chunk_data = self.fetch_bytes(&fetch_ranges, metrics).await?;

            // Assigns fetched data to base.
            self.base.assign_dense_chunk(projection, chunk_data);
        }

        Ok(())
    }

    /// Try to fetch data from the memory cache or the WriteCache,
    /// if not in WriteCache, fetch data from object store directly.
    ///
    /// Fetched bytes are inserted back into the page cache keyed by
    /// `(file_id, row_group_idx, ranges)` so repeated reads of the same
    /// range set hit memory.
    async fn fetch_bytes(
        &self,
        ranges: &[Range<u64>],
        metrics: Option<&ParquetFetchMetrics>,
    ) -> Result<Vec<Bytes>> {
        // Now fetch page timer includes the whole time to read pages.
        let _timer = READ_STAGE_FETCH_PAGES.start_timer();
        let page_key = PageKey::new(self.file_id, self.row_group_idx, ranges.to_vec());
        // Fast path: the exact range set is already in the page cache.
        if let Some(pages) = self.cache_strategy.get_pages(&page_key) {
            if let Some(metrics) = metrics {
                let total_size: u64 = ranges.iter().map(|r| r.end - r.start).sum();
                let mut metrics_data = metrics.data.lock().unwrap();
                metrics_data.page_cache_hit += 1;
                metrics_data.pages_to_fetch_mem += ranges.len();
                metrics_data.page_size_to_fetch_mem += total_size;
                metrics_data.page_size_needed += total_size;
            }
            return Ok(pages.compressed.clone());
        }

        // Calculate total range size for metrics.
        let (total_range_size, unaligned_size) = compute_total_range_size(ranges);
        let key = IndexKey::new(self.region_id, self.file_id, FileType::Parquet);
        // Timer is only created when metrics are requested.
        let fetch_write_cache_start = metrics.map(|_| std::time::Instant::now());
        let write_cache_result = self.fetch_ranges_from_write_cache(key, ranges).await;
        let pages = match write_cache_result {
            Some(data) => {
                // Served from the local write cache.
                if let Some(metrics) = metrics {
                    let elapsed = fetch_write_cache_start
                        .map(|start| start.elapsed())
                        .unwrap_or_default();
                    let range_size_needed: u64 = ranges.iter().map(|r| r.end - r.start).sum();
                    let mut metrics_data = metrics.data.lock().unwrap();
                    metrics_data.write_cache_fetch_elapsed += elapsed;
                    metrics_data.write_cache_hit += 1;
                    metrics_data.pages_to_fetch_write_cache += ranges.len();
                    metrics_data.page_size_to_fetch_write_cache += unaligned_size;
                    metrics_data.page_size_needed += range_size_needed;
                }
                data
            }
            None => {
                // Fetch data from object store.
                let _timer = READ_STAGE_ELAPSED
                    .with_label_values(&["cache_miss_read"])
                    .start_timer();
                let start = metrics.map(|_| std::time::Instant::now());
                let data = fetch_byte_ranges(self.file_path, self.object_store.clone(), ranges)
                    .await
                    .map_err(|e| ParquetError::External(Box::new(e)))?;
                if let Some(metrics) = metrics {
                    let elapsed = start.map(|start| start.elapsed()).unwrap_or_default();
                    let range_size_needed: u64 = ranges.iter().map(|r| r.end - r.start).sum();
                    let mut metrics_data = metrics.data.lock().unwrap();
                    metrics_data.store_fetch_elapsed += elapsed;
                    metrics_data.cache_miss += 1;
                    metrics_data.pages_to_fetch_store += ranges.len();
                    metrics_data.page_size_to_fetch_store += unaligned_size;
                    metrics_data.page_size_needed += range_size_needed;
                }
                data
            }
        };

        // Put pages back to the cache.
        let page_value = PageValue::new(pages.clone(), total_range_size);
        self.cache_strategy
            .put_pages(page_key, Arc::new(page_value));

        Ok(pages)
    }

    /// Fetches data from write cache.
    /// Returns `None` if the data is not in the cache.
    async fn fetch_ranges_from_write_cache(
        &self,
        key: IndexKey,
        ranges: &[Range<u64>],
    ) -> Option<Vec<Bytes>> {
        if let Some(cache) = self.cache_strategy.write_cache() {
            return cache.file_cache().read_ranges(key, ranges).await;
        }
        None
    }
}
/// Computes the max possible buffer size to read the given `ranges`.
/// Returns (aligned_size, unaligned_size) where:
/// - aligned_size: total size aligned to pooled buffer size
@@ -235,3 +602,96 @@ fn align_to_pooled_buf_size(size: u64) -> u64 {
const POOLED_BUF_SIZE: u64 = 2 * 1024 * 1024;
size.div_ceil(POOLED_BUF_SIZE) * POOLED_BUF_SIZE
}
impl RowGroups for InMemoryRowGroup<'_> {
fn num_rows(&self) -> usize {
self.base.row_count
}
fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
// Creates a page reader to read column at `i`.
let page_reader = self.base.column_reader(i)?;
Ok(Box::new(ColumnChunkIterator {
reader: Some(Ok(Box::new(page_reader))),
}))
}
fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
Box::new(std::iter::once(self.base.row_group_metadata()))
}
fn metadata(&self) -> &ParquetMetaData {
self.base.parquet_metadata()
}
}
/// An in-memory column chunk
#[derive(Clone)]
pub(crate) enum ColumnChunkData {
    /// Column chunk data representing only a subset of data pages
    Sparse {
        /// Length of the full column chunk
        length: usize,
        /// Set of data pages included in this sparse chunk. Each element is a tuple
        /// of (page offset, page data)
        data: Vec<(usize, Bytes)>,
    },
    /// Full column chunk and its offset
    Dense { offset: usize, data: Bytes },
}
impl ColumnChunkData {
    /// Returns the bytes starting at the absolute file offset `start`.
    ///
    /// For `Sparse` chunks, `start` must exactly match a fetched page offset;
    /// for `Dense` chunks, any offset inside the chunk works.
    fn get(&self, start: u64) -> Result<Bytes> {
        match self {
            Self::Sparse { data, .. } => {
                // Pages are stored sorted by offset, so a binary search locates the page.
                match data.binary_search_by_key(&start, |(offset, _)| *offset as u64) {
                    Ok(idx) => Ok(data[idx].1.clone()),
                    Err(_) => Err(ParquetError::General(format!(
                        "Invalid offset in sparse column chunk data: {start}"
                    ))),
                }
            }
            Self::Dense { offset, data } => {
                // Translate the absolute file offset into a chunk-relative one.
                let relative = start as usize - *offset;
                Ok(data.slice(relative..))
            }
        }
    }
}
impl Length for ColumnChunkData {
    /// Length of the full column chunk in bytes (even for sparse chunks,
    /// which report the whole chunk's length).
    fn len(&self) -> u64 {
        let bytes = match self {
            Self::Sparse { length, .. } => *length,
            Self::Dense { data, .. } => data.len(),
        };
        bytes as u64
    }
}
impl ChunkReader for ColumnChunkData {
    type T = bytes::buf::Reader<Bytes>;

    fn get_read(&self, start: u64) -> Result<Self::T> {
        // Wrap the page bytes in a `Read` adapter.
        let bytes = self.get(start)?;
        Ok(bytes.reader())
    }

    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
        // Zero-copy slice of the first `length` bytes at `start`.
        let bytes = self.get(start)?;
        Ok(bytes.slice(..length))
    }
}
/// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`]
pub(crate) struct ColumnChunkIterator {
    /// The single page reader (or the error from constructing it); taken on first `next()`.
    pub(crate) reader: Option<Result<Box<dyn PageReader>>>,
}
impl Iterator for ColumnChunkIterator {
    type Item = Result<Box<dyn PageReader>>;

    fn next(&mut self) -> Option<Self::Item> {
        // Yields the stored reader exactly once; subsequent calls return `None`.
        self.reader.take()
    }
}
impl PageIterator for ColumnChunkIterator {}