feat: Parquet reader builder supports building multiple ranges to read (#3841)

* chore: change `&mut self` to `&self`

* feat: define partition and partition context

* refactor: move precise_filter to PartitionContext

* feat: filter wip

* feat: compute projection and fields in format

* feat: use RowGroupReader to implement ParquetReader

* fix: use expected meta to get column id for filters

* feat: partition returns row group reader

* style: fix clippy

* feat: add build partitions method

* docs: comment

* refactor: rename Partition to FileRange

* chore: address CR comments

* feat: avoid allocating column ids while constructing ReadFormat

Author: Yingwen
Date: 2024-05-10 15:39:38 +08:00
Committed by: GitHub
Parent: 89dbf6ddd2
Commit: 5a0629eaa0
4 changed files with 553 additions and 287 deletions
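
As a quick orientation before the diff: the change lets callers split a parquet SST into per-row-group `FileRange`s and read them independently. A minimal usage sketch, assuming crate-internal visibility of `ParquetReaderBuilder`, `FileRange` and `RowGroupReader`; the `read_by_ranges` helper, the async setup and the simplified error handling are illustrative only, not part of this change:

```rust
// Sketch only: assumes it lives in the same crate/module where
// ParquetReaderBuilder, FileRange and RowGroupReader are visible,
// and that `builder` is already configured for one SST file.
async fn read_by_ranges(builder: &ParquetReaderBuilder) -> Result<()> {
    // Collect one FileRange per row group to read (plus its row selection).
    let mut ranges = Vec::new();
    builder.build_file_ranges(&mut ranges).await?;

    // All ranges share a FileRangeContext, so each one can be read on its
    // own, e.g. handed to a separate task for parallel scanning.
    for range in ranges {
        let mut reader = range.reader().await?;
        while let Some(_batch) = reader.next_batch().await? {
            // Consume the batch here.
        }
    }
    Ok(())
}
```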


@@ -14,6 +14,7 @@
//! SST in parquet format.
pub(crate) mod file_range;
mod format;
pub(crate) mod helper;
pub(crate) mod metadata;


@@ -0,0 +1,186 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Structs and functions for reading ranges from a parquet file. A file range
//! is usually a row group in a parquet file.
use std::ops::BitAnd;
use std::sync::Arc;
use api::v1::SemanticType;
use datatypes::arrow::array::BooleanArray;
use datatypes::arrow::buffer::BooleanBuffer;
use parquet::arrow::arrow_reader::RowSelection;
use snafu::ResultExt;
use crate::error::{FieldTypeMismatchSnafu, FilterRecordBatchSnafu, Result};
use crate::read::Batch;
use crate::row_converter::{McmpRowCodec, RowCodec};
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::reader::{RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext};
/// A range of a parquet SST. Now it is a row group.
/// We can read different file ranges in parallel.
pub struct FileRange {
/// Shared context.
context: FileRangeContextRef,
/// Index of the row group in the SST.
row_group_idx: usize,
/// Row selection for the row group. `None` means all rows.
row_selection: Option<RowSelection>,
}
impl FileRange {
/// Creates a new [FileRange].
pub(crate) fn new(
context: FileRangeContextRef,
row_group_idx: usize,
row_selection: Option<RowSelection>,
) -> Self {
Self {
context,
row_group_idx,
row_selection,
}
}
/// Returns a reader to read the [FileRange].
#[allow(dead_code)]
pub(crate) async fn reader(&self) -> Result<RowGroupReader> {
let parquet_reader = self
.context
.reader_builder
.build(self.row_group_idx, self.row_selection.clone())
.await?;
Ok(RowGroupReader::new(self.context.clone(), parquet_reader))
}
}
/// Context shared by ranges of the same parquet SST.
pub(crate) struct FileRangeContext {
// Row group reader builder for the file.
reader_builder: RowGroupReaderBuilder,
/// Filters pushed down.
filters: Vec<SimpleFilterContext>,
/// Helper to read the SST.
read_format: ReadFormat,
/// Decoder for primary keys
codec: McmpRowCodec,
}
pub(crate) type FileRangeContextRef = Arc<FileRangeContext>;
impl FileRangeContext {
/// Creates a new [FileRangeContext].
pub(crate) fn new(
reader_builder: RowGroupReaderBuilder,
filters: Vec<SimpleFilterContext>,
read_format: ReadFormat,
codec: McmpRowCodec,
) -> Self {
Self {
reader_builder,
filters,
read_format,
codec,
}
}
/// Returns the path of the file to read.
pub(crate) fn file_path(&self) -> &str {
self.reader_builder.file_path()
}
/// Returns filters pushed down.
pub(crate) fn filters(&self) -> &[SimpleFilterContext] {
&self.filters
}
/// Returns the format helper.
pub(crate) fn read_format(&self) -> &ReadFormat {
&self.read_format
}
/// Returns the reader builder.
pub(crate) fn reader_builder(&self) -> &RowGroupReaderBuilder {
&self.reader_builder
}
    /// Tries its best to apply the pushed down predicates precisely to the input batch.
    /// Returns the filtered batch, or `None` if the entire batch is filtered out.
    ///
    /// Supported filter expression types are defined in [SimpleFilterEvaluator].
    ///
    /// When a filter references a primary key column, this method decodes
    /// the primary key and puts it into the batch.
pub(crate) fn precise_filter(&self, mut input: Batch) -> Result<Option<Batch>> {
let mut mask = BooleanBuffer::new_set(input.num_rows());
        // Run filters one by one and combine their results
// TODO(ruihang): run primary key filter first. It may short circuit other filters
for filter in &self.filters {
let result = match filter.semantic_type() {
SemanticType::Tag => {
let pk_values = if let Some(pk_values) = input.pk_values() {
pk_values
} else {
input.set_pk_values(self.codec.decode(input.primary_key())?);
input.pk_values().unwrap()
};
// Safety: this is a primary key
let pk_index = self
.read_format
.metadata()
.primary_key_index(filter.column_id())
.unwrap();
let pk_value = pk_values[pk_index]
.try_to_scalar_value(filter.data_type())
.context(FieldTypeMismatchSnafu)?;
if filter
.filter()
.evaluate_scalar(&pk_value)
.context(FilterRecordBatchSnafu)?
{
continue;
} else {
// PK not match means the entire batch is filtered out.
return Ok(None);
}
}
SemanticType::Field => {
let Some(field_index) = self.read_format.field_index_by_id(filter.column_id())
else {
continue;
};
let field_col = &input.fields()[field_index].data;
filter
.filter()
.evaluate_vector(field_col)
.context(FilterRecordBatchSnafu)?
}
SemanticType::Timestamp => filter
.filter()
.evaluate_vector(input.timestamps())
.context(FilterRecordBatchSnafu)?,
};
mask = mask.bitand(&result);
}
input.filter(&BooleanArray::from(mask).into())?;
Ok(Some(input))
}
}
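
The `precise_filter` above starts from an all-true row mask, intersects it with each filter's result, and applies the combined mask once at the end. A standalone sketch of just that masking pattern, using the `arrow` crates directly (the `combine_masks` helper and the sample buffers are illustrative, not part of this change):

```rust
use std::ops::BitAnd;

use arrow::array::BooleanArray;
use arrow::buffer::BooleanBuffer;

/// Intersects per-filter row masks into one selection mask, mirroring how
/// `precise_filter` AND-s the result of every pushed-down filter.
fn combine_masks(num_rows: usize, filter_results: &[BooleanBuffer]) -> BooleanArray {
    // Start with every row selected.
    let mut mask = BooleanBuffer::new_set(num_rows);
    for result in filter_results {
        // Keep only the rows that also pass this filter.
        mask = mask.bitand(result);
    }
    BooleanArray::from(mask)
}

fn main() {
    // Rows 0, 1, 3 pass the first filter; rows 0, 2, 3 pass the second.
    let f1 = BooleanBuffer::collect_bool(4, |i| i != 2);
    let f2 = BooleanBuffer::collect_bool(4, |i| i != 1);
    let mask = combine_masks(4, &[f1, f2]);
    // Only rows 0 and 3 pass both filters.
    assert_eq!(mask.true_count(), 2);
}
```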


@@ -121,16 +121,19 @@ pub(crate) struct ReadFormat {
/// Field column id to its index in `schema` (SST schema).
/// In SST schema, fields are stored in the front of the schema.
field_id_to_index: HashMap<ColumnId, usize>,
/// Indices of columns to read from the SST. It contains all internal columns.
projection_indices: Vec<usize>,
/// Field column id to their index in the projected schema (
/// the schema of [Batch]).
///
/// This field is set at the first call to [convert_record_batch](Self::convert_record_batch).
field_id_to_projected_index: Option<HashMap<ColumnId, usize>>,
field_id_to_projected_index: HashMap<ColumnId, usize>,
}
impl ReadFormat {
/// Creates a helper with existing `metadata`.
pub(crate) fn new(metadata: RegionMetadataRef) -> ReadFormat {
/// Creates a helper with existing `metadata` and `column_ids` to read.
pub(crate) fn new(
metadata: RegionMetadataRef,
column_ids: impl Iterator<Item = ColumnId>,
) -> ReadFormat {
let field_id_to_index: HashMap<_, _> = metadata
.field_columns()
.enumerate()
@@ -138,11 +141,42 @@ impl ReadFormat {
.collect();
let arrow_schema = to_sst_arrow_schema(&metadata);
// Maps column id of a projected field to its index in SST.
let mut projected_field_id_index: Vec<_> = column_ids
.filter_map(|column_id| {
// Only apply projection to fields.
field_id_to_index
.get(&column_id)
.copied()
.map(|index| (column_id, index))
})
.collect();
let mut projection_indices: Vec<_> = projected_field_id_index
.iter()
.map(|(_column_id, index)| *index)
// We need to add all fixed position columns.
.chain(arrow_schema.fields.len() - FIXED_POS_COLUMN_NUM..arrow_schema.fields.len())
.collect();
projection_indices.sort_unstable();
// Sort fields by their indices in the SST. Then the order of fields is their order
// in the Batch.
projected_field_id_index.sort_unstable_by_key(|x| x.1);
        // Because the SST puts fields before other columns, we don't need to consider other
// columns.
let field_id_to_projected_index = projected_field_id_index
.into_iter()
.map(|(column_id, _)| column_id)
.enumerate()
.map(|(index, column_id)| (column_id, index))
.collect();
ReadFormat {
metadata,
arrow_schema,
field_id_to_index,
field_id_to_projected_index: None,
projection_indices,
field_id_to_projected_index,
}
}
@@ -159,35 +193,16 @@ impl ReadFormat {
&self.metadata
}
/// Gets sorted projection indices to read `columns` from parquet files.
///
/// This function ignores columns not in `metadata` to for compatibility between
/// different schemas.
pub(crate) fn projection_indices(
&self,
columns: impl IntoIterator<Item = ColumnId>,
) -> Vec<usize> {
let mut indices: Vec<_> = columns
.into_iter()
.filter_map(|column_id| {
// Only apply projection to fields.
self.field_id_to_index.get(&column_id).copied()
})
// We need to add all fixed position columns.
.chain(
self.arrow_schema.fields.len() - FIXED_POS_COLUMN_NUM
..self.arrow_schema.fields.len(),
)
.collect();
indices.sort_unstable();
indices
/// Gets sorted projection indices to read.
pub(crate) fn projection_indices(&self) -> &[usize] {
&self.projection_indices
}
/// Convert a arrow record batch into `batches`.
///
    /// Note that the `record_batch` may only contain a subset of columns if it is projected.
pub(crate) fn convert_record_batch(
&mut self,
&self,
record_batch: &RecordBatch,
batches: &mut VecDeque<Batch>,
) -> Result<()> {
@@ -204,10 +219,6 @@ impl ReadFormat {
}
);
if self.field_id_to_projected_index.is_none() {
self.init_id_to_projected_index(record_batch);
}
let mut fixed_pos_columns = record_batch
.columns()
.iter()
@@ -270,19 +281,6 @@ impl ReadFormat {
Ok(())
}
fn init_id_to_projected_index(&mut self, record_batch: &RecordBatch) {
let mut name_to_projected_index = HashMap::new();
for (index, field) in record_batch.schema().fields().iter().enumerate() {
let Some(column) = self.metadata.column_by_name(field.name()) else {
continue;
};
if column.semantic_type == SemanticType::Field {
name_to_projected_index.insert(column.column_id, index);
}
}
self.field_id_to_projected_index = Some(name_to_projected_index);
}
/// Returns min values of specific column in row groups.
pub(crate) fn min_values(
&self,
@@ -513,13 +511,8 @@ impl ReadFormat {
}
/// Index of a field column by its column id.
/// This function is only available after the first call to
/// [convert_record_batch](Self::convert_record_batch). Otherwise
/// it always return `None`
pub fn field_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
self.field_id_to_projected_index
.as_ref()
.and_then(|m| m.get(&column_id).copied())
self.field_id_to_projected_index.get(&column_id).copied()
}
}
@@ -753,18 +746,18 @@ mod tests {
#[test]
fn test_projection_indices() {
let metadata = build_test_region_metadata();
let read_format = ReadFormat::new(metadata);
// Only read tag1
assert_eq!(vec![2, 3, 4, 5], read_format.projection_indices([3]));
let read_format = ReadFormat::new(metadata.clone(), [3].iter().copied());
assert_eq!(&[2, 3, 4, 5], read_format.projection_indices());
// Only read field1
assert_eq!(vec![0, 2, 3, 4, 5], read_format.projection_indices([4]));
let read_format = ReadFormat::new(metadata.clone(), [4].iter().copied());
assert_eq!(&[0, 2, 3, 4, 5], read_format.projection_indices());
// Only read ts
assert_eq!(vec![2, 3, 4, 5], read_format.projection_indices([5]));
let read_format = ReadFormat::new(metadata.clone(), [5].iter().copied());
assert_eq!(&[2, 3, 4, 5], read_format.projection_indices());
// Read field0, tag0, ts
assert_eq!(
vec![1, 2, 3, 4, 5],
read_format.projection_indices([2, 1, 5])
);
let read_format = ReadFormat::new(metadata, [2, 1, 5].iter().copied());
assert_eq!(&[1, 2, 3, 4, 5], read_format.projection_indices());
}
#[test]
@@ -805,7 +798,12 @@ mod tests {
fn test_convert_empty_record_batch() {
let metadata = build_test_region_metadata();
let arrow_schema = build_test_arrow_schema();
let mut read_format = ReadFormat::new(metadata);
let column_ids: Vec<_> = metadata
.column_metadatas
.iter()
.map(|col| col.column_id)
.collect();
let read_format = ReadFormat::new(metadata, column_ids.iter().copied());
assert_eq!(arrow_schema, *read_format.arrow_schema());
let record_batch = RecordBatch::new_empty(arrow_schema);
@@ -819,7 +817,12 @@ mod tests {
#[test]
fn test_convert_record_batch() {
let metadata = build_test_region_metadata();
let mut read_format = ReadFormat::new(metadata);
let column_ids: Vec<_> = metadata
.column_metadatas
.iter()
.map(|col| col.column_id)
.collect();
let read_format = ReadFormat::new(metadata, column_ids.iter().copied());
let columns: Vec<ArrayRef> = vec![
Arc::new(Int64Array::from(vec![1, 1, 10, 10])), // field1

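
The reworked `ReadFormat::new` above computes the projection eagerly from the requested column ids instead of lazily from the first record batch. A simplified, std-only sketch of that computation (the `compute_projection` helper and the plain `u32`/`usize` types are stand-ins for the real crate types):

```rust
use std::collections::HashMap;

type ColumnId = u32;

/// Simplified sketch of the projection logic in `ReadFormat::new`: keep only
/// the requested field columns, always append the fixed-position columns at
/// the tail of the SST schema, and remember where each projected field lands
/// in the resulting `Batch`.
fn compute_projection(
    column_ids: impl Iterator<Item = ColumnId>,
    field_id_to_index: &HashMap<ColumnId, usize>,
    total_columns: usize,
    fixed_pos_columns: usize,
) -> (Vec<usize>, HashMap<ColumnId, usize>) {
    // (column id, index in the SST schema) for every projected field column.
    let mut projected: Vec<(ColumnId, usize)> = column_ids
        .filter_map(|id| field_id_to_index.get(&id).map(|idx| (id, *idx)))
        .collect();

    // Indices to read from the parquet file: the projected fields plus the
    // fixed-position columns at the end of the SST schema.
    let mut projection_indices: Vec<usize> = projected
        .iter()
        .map(|(_, idx)| *idx)
        .chain(total_columns - fixed_pos_columns..total_columns)
        .collect();
    projection_indices.sort_unstable();

    // Fields appear in the Batch in their SST order, so sort by SST index
    // before assigning projected positions.
    projected.sort_unstable_by_key(|(_, idx)| *idx);
    let field_id_to_projected_index = projected
        .into_iter()
        .enumerate()
        .map(|(pos, (id, _))| (id, pos))
        .collect();

    (projection_indices, field_id_to_projected_index)
}

fn main() {
    // Two field columns at SST indices 0 and 1, four fixed-position columns
    // behind them (six columns in total), mirroring `test_projection_indices`.
    let field_id_to_index = HashMap::from([(4, 0), (2, 1)]);
    // Read column ids 2 (a field), 1 (a tag) and 5 (the timestamp).
    let (indices, projected) =
        compute_projection([2, 1, 5].into_iter(), &field_id_to_index, 6, 4);
    assert_eq!(indices, vec![1, 2, 3, 4, 5]);
    // Column id 2 is the only projected field and becomes the first field in the Batch.
    assert_eq!(projected.get(&2), Some(&0));
}
```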

@@ -15,7 +15,6 @@
//! Parquet reader.
use std::collections::{BTreeMap, VecDeque};
use std::ops::BitAnd;
use std::sync::Arc;
use std::time::{Duration, Instant};
@@ -24,9 +23,9 @@ use async_trait::async_trait;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{debug, warn};
use common_time::range::TimestampRange;
use datafusion_common::arrow::array::BooleanArray;
use datafusion_common::arrow::buffer::BooleanBuffer;
use datafusion_expr::Expr;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use itertools::Itertools;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
@@ -40,17 +39,17 @@ use table::predicate::Predicate;
use crate::cache::CacheManagerRef;
use crate::error::{
ArrowReaderSnafu, FieldTypeMismatchSnafu, FilterRecordBatchSnafu, InvalidMetadataSnafu,
InvalidParquetSnafu, ReadParquetSnafu, Result,
ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadParquetSnafu, Result,
};
use crate::metrics::{
PRECISE_FILTER_ROWS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_TOTAL,
READ_ROW_GROUPS_TOTAL, READ_STAGE_ELAPSED,
};
use crate::read::{Batch, BatchReader};
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
use crate::row_converter::{McmpRowCodec, SortField};
use crate::sst::file::FileHandle;
use crate::sst::index::applier::SstIndexApplierRef;
use crate::sst::parquet::file_range::{FileRange, FileRangeContext, FileRangeContextRef};
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::metadata::MetadataLoader;
use crate::sst::parquet::row_group::InMemoryRowGroup;
@@ -147,10 +146,30 @@ impl ParquetReaderBuilder {
self
}
/// Builds and initializes a [ParquetReader].
/// Builds a [ParquetReader].
///
    /// This needs to perform an IO operation.
pub async fn build(&self) -> Result<ParquetReader> {
let (context, row_groups) = self.build_reader_input().await?;
ParquetReader::new(context, row_groups).await
}
/// Builds [FileRange]s to read and pushes them to `file_ranges`.
#[allow(dead_code)]
pub async fn build_file_ranges(&self, file_ranges: &mut Vec<FileRange>) -> Result<()> {
let (context, row_groups) = self.build_reader_input().await?;
file_ranges.reserve_exact(row_groups.len());
for (row_group_idx, row_selection) in row_groups {
let file_range = FileRange::new(context.clone(), row_group_idx, row_selection);
file_ranges.push(file_range);
}
Ok(())
}
/// Builds a [FileRangeContext] and collects row groups to read.
///
    /// This needs to perform an IO operation.
async fn build_reader_input(&self) -> Result<(FileRangeContextRef, RowGroupMap)> {
let start = Instant::now();
let file_path = self.file_handle.file_path(&self.file_dir);
@@ -159,18 +178,28 @@ impl ParquetReaderBuilder {
let parquet_meta = self.read_parquet_metadata(&file_path, file_size).await?;
// Decodes region metadata.
let key_value_meta = parquet_meta.file_metadata().key_value_metadata();
let region_meta = Self::get_region_metadata(&file_path, key_value_meta)?;
let read_format = ReadFormat::new(Arc::new(region_meta));
// Gets the metadata stored in the SST.
let region_meta = Arc::new(Self::get_region_metadata(&file_path, key_value_meta)?);
let read_format = if let Some(column_ids) = &self.projection {
ReadFormat::new(region_meta.clone(), column_ids.iter().copied())
} else {
// Lists all column ids to read, we always use the expected metadata if possible.
let expected_meta = self.expected_metadata.as_ref().unwrap_or(&region_meta);
ReadFormat::new(
region_meta.clone(),
expected_meta
.column_metadatas
.iter()
.map(|col| col.column_id),
)
};
// Computes the projection mask.
let parquet_schema_desc = parquet_meta.file_metadata().schema_descr();
let projection_mask = if let Some(column_ids) = self.projection.as_ref() {
let indices = read_format.projection_indices(column_ids.iter().copied());
// Now we assumes we don't have nested schemas.
ProjectionMask::roots(parquet_schema_desc, indices)
} else {
ProjectionMask::all()
};
let indices = read_format.projection_indices();
        // Now we assume we don't have nested schemas.
// TODO(yingwen): Revisit this if we introduce nested types such as JSON type.
let projection_mask = ProjectionMask::roots(parquet_schema_desc, indices.iter().copied());
// Computes the field levels.
let hint = Some(read_format.arrow_schema().fields());
@@ -196,16 +225,21 @@ impl ParquetReaderBuilder {
metrics.build_cost = start.elapsed();
let predicate = if let Some(predicate) = &self.predicate {
let filters = if let Some(predicate) = &self.predicate {
predicate
.exprs()
.iter()
.filter_map(|expr| SimpleFilterEvaluator::try_new(expr.df_expr()))
.filter_map(|expr| {
SimpleFilterContext::new_opt(
&region_meta,
self.expected_metadata.as_deref(),
expr.df_expr(),
)
})
.collect::<Vec<_>>()
} else {
vec![]
};
let codec = McmpRowCodec::new(
read_format
.metadata()
@@ -214,16 +248,8 @@ impl ParquetReaderBuilder {
.collect(),
);
Ok(ParquetReader {
row_groups,
read_format,
reader_builder,
predicate,
current_reader: None,
batches: VecDeque::new(),
codec,
metrics,
})
let context = FileRangeContext::new(reader_builder, filters, read_format, codec);
Ok((Arc::new(context), row_groups))
}
/// Decodes region metadata from key value.
@@ -451,7 +477,7 @@ struct Metrics {
}
/// Builder to build a [ParquetRecordBatchReader] for a row group.
struct RowGroupReaderBuilder {
pub(crate) struct RowGroupReaderBuilder {
/// SST file to read.
///
    /// Holds the file handle to avoid the file purger purging it.
@@ -472,13 +498,13 @@ struct RowGroupReaderBuilder {
impl RowGroupReaderBuilder {
/// Path of the file to read.
fn file_path(&self) -> &str {
pub(crate) fn file_path(&self) -> &str {
&self.file_path
}
/// Builds a [ParquetRecordBatchReader] to read the row group at `row_group_idx`.
async fn build(
&mut self,
pub(crate) async fn build(
&self,
row_group_idx: usize,
row_selection: Option<RowSelection>,
) -> Result<ParquetRecordBatchReader> {
@@ -513,158 +539,285 @@ impl RowGroupReaderBuilder {
}
}
/// The state of a [ParquetReader].
enum ReaderState {
/// The reader is reading a row group.
Readable(RowGroupReader),
/// The reader is exhausted.
Exhausted(Metrics),
}
impl ReaderState {
/// Returns the metrics of the reader.
fn metrics(&self) -> &Metrics {
match self {
ReaderState::Readable(reader) => &reader.metrics,
ReaderState::Exhausted(m) => m,
}
}
}
/// Context to evaluate the column filter.
pub(crate) struct SimpleFilterContext {
/// Filter to evaluate.
filter: SimpleFilterEvaluator,
/// Id of the column to evaluate.
column_id: ColumnId,
/// Semantic type of the column.
semantic_type: SemanticType,
/// The data type of the column.
data_type: ConcreteDataType,
}
impl SimpleFilterContext {
/// Creates a context for the `expr`.
///
/// Returns None if the column to filter doesn't exist in the SST metadata or the
/// expected metadata.
fn new_opt(
sst_meta: &RegionMetadataRef,
expected_meta: Option<&RegionMetadata>,
expr: &Expr,
) -> Option<Self> {
let filter = SimpleFilterEvaluator::try_new(expr)?;
let column_metadata = match expected_meta {
Some(meta) => {
// Gets the column metadata from the expected metadata.
let column = meta.column_by_name(filter.column_name())?;
                // Checks if the column is present in the SST metadata. We still use the
// column from the expected metadata.
let sst_column = sst_meta.column_by_id(column.column_id)?;
debug_assert_eq!(column.semantic_type, sst_column.semantic_type);
column
}
None => sst_meta.column_by_name(filter.column_name())?,
};
Some(Self {
filter,
column_id: column_metadata.column_id,
semantic_type: column_metadata.semantic_type,
data_type: column_metadata.column_schema.data_type.clone(),
})
}
/// Returns the filter to evaluate.
pub(crate) fn filter(&self) -> &SimpleFilterEvaluator {
&self.filter
}
/// Returns the column id.
pub(crate) fn column_id(&self) -> ColumnId {
self.column_id
}
/// Returns the semantic type of the column.
pub(crate) fn semantic_type(&self) -> SemanticType {
self.semantic_type
}
/// Returns the data type of the column.
pub(crate) fn data_type(&self) -> &ConcreteDataType {
&self.data_type
}
}
type RowGroupMap = BTreeMap<usize, Option<RowSelection>>;
/// Parquet batch reader to read our SST format.
pub struct ParquetReader {
/// File range context.
context: FileRangeContextRef,
/// Indices of row groups to read, along with their respective row selections.
row_groups: BTreeMap<usize, Option<RowSelection>>,
/// Helper to read record batches.
///
/// Not `None` if [ParquetReader::stream] is not `None`.
read_format: ReadFormat,
/// Builder to build row group readers.
///
/// The builder contains the file handle, so don't drop the builder while using
/// the [ParquetReader].
reader_builder: RowGroupReaderBuilder,
/// Predicate pushed down to this reader.
predicate: Vec<SimpleFilterEvaluator>,
row_groups: RowGroupMap,
/// Reader of current row group.
current_reader: Option<ParquetRecordBatchReader>,
/// Buffered batches to return.
batches: VecDeque<Batch>,
/// Decoder for primary keys
codec: McmpRowCodec,
/// Local metrics.
metrics: Metrics,
reader_state: ReaderState,
}
#[async_trait]
impl BatchReader for ParquetReader {
async fn next_batch(&mut self) -> Result<Option<Batch>> {
let ReaderState::Readable(reader) = &mut self.reader_state else {
return Ok(None);
};
let start = Instant::now();
// We don't collect the elapsed time if the reader returns an error.
if let Some(batch) = reader.next_batch().await? {
reader.metrics.scan_cost += start.elapsed();
return Ok(Some(batch));
}
// No more items in current row group, reads next row group.
while let Some((row_group_idx, row_selection)) = self.row_groups.pop_first() {
let parquet_reader = self
.context
.reader_builder()
.build(row_group_idx, row_selection)
.await?;
// Resets the parquet reader.
reader.reset_reader(parquet_reader);
if let Some(batch) = reader.next_batch().await? {
reader.metrics.scan_cost += start.elapsed();
return Ok(Some(batch));
}
}
// The reader is exhausted.
reader.metrics.scan_cost += start.elapsed();
self.reader_state = ReaderState::Exhausted(std::mem::take(&mut reader.metrics));
Ok(None)
}
}
impl Drop for ParquetReader {
fn drop(&mut self) {
let metrics = self.reader_state.metrics();
debug!(
"Read parquet {} {}, range: {:?}, {}/{} row groups, metrics: {:?}",
self.context.reader_builder().file_handle.region_id(),
self.context.reader_builder().file_handle.file_id(),
self.context.reader_builder().file_handle.time_range(),
metrics.num_row_groups_before_filtering
- metrics.num_row_groups_inverted_index_filtered
- metrics.num_row_groups_min_max_filtered,
metrics.num_row_groups_before_filtering,
metrics
);
// Report metrics.
READ_STAGE_ELAPSED
.with_label_values(&["build_parquet_reader"])
.observe(metrics.build_cost.as_secs_f64());
READ_STAGE_ELAPSED
.with_label_values(&["scan_row_groups"])
.observe(metrics.scan_cost.as_secs_f64());
READ_ROWS_TOTAL
.with_label_values(&["parquet"])
.inc_by(metrics.num_rows as u64);
READ_ROW_GROUPS_TOTAL
.with_label_values(&["before_filtering"])
.inc_by(metrics.num_row_groups_before_filtering as u64);
READ_ROW_GROUPS_TOTAL
.with_label_values(&["inverted_index_filtered"])
.inc_by(metrics.num_row_groups_inverted_index_filtered as u64);
READ_ROW_GROUPS_TOTAL
.with_label_values(&["minmax_index_filtered"])
.inc_by(metrics.num_row_groups_min_max_filtered as u64);
PRECISE_FILTER_ROWS_TOTAL
.with_label_values(&["parquet"])
.inc_by(metrics.num_rows_precise_filtered as u64);
READ_ROWS_IN_ROW_GROUP_TOTAL
.with_label_values(&["before_filtering"])
.inc_by(metrics.num_rows_in_row_group_before_filtering as u64);
READ_ROWS_IN_ROW_GROUP_TOTAL
.with_label_values(&["inverted_index_filtered"])
.inc_by(metrics.num_rows_in_row_group_inverted_index_filtered as u64);
}
}
impl ParquetReader {
/// Creates a new reader.
async fn new(
context: FileRangeContextRef,
mut row_groups: BTreeMap<usize, Option<RowSelection>>,
) -> Result<Self> {
// No more items in current row group, reads next row group.
let reader_state = if let Some((row_group_idx, row_selection)) = row_groups.pop_first() {
let parquet_reader = context
.reader_builder()
.build(row_group_idx, row_selection)
.await?;
ReaderState::Readable(RowGroupReader::new(context.clone(), parquet_reader))
} else {
ReaderState::Exhausted(Metrics::default())
};
Ok(ParquetReader {
context,
row_groups,
reader_state,
})
}
/// Returns the metadata of the SST.
pub fn metadata(&self) -> &RegionMetadataRef {
self.context.read_format().metadata()
}
#[cfg(test)]
pub fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
self.context.reader_builder().parquet_meta.clone()
}
}
/// Reader to read a row group of a parquet file.
pub(crate) struct RowGroupReader {
/// Context for file ranges.
context: FileRangeContextRef,
/// Inner parquet reader.
reader: ParquetRecordBatchReader,
/// Buffered batches to return.
batches: VecDeque<Batch>,
/// Local scan metrics.
metrics: Metrics,
}
impl RowGroupReader {
/// Creates a new reader.
pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
Self {
context,
reader,
batches: VecDeque::new(),
metrics: Metrics::default(),
}
}
/// Resets the parquet reader.
fn reset_reader(&mut self, reader: ParquetRecordBatchReader) {
self.reader = reader;
}
/// Tries to fetch next [Batch] from the reader.
async fn next_batch(&mut self) -> Result<Option<Batch>> {
if let Some(batch) = self.batches.pop_front() {
self.metrics.scan_cost += start.elapsed();
self.metrics.num_rows += batch.num_rows();
return Ok(Some(batch));
}
// We need to fetch next record batch and convert it to batches.
while self.batches.is_empty() {
let Some(record_batch) = self.fetch_next_record_batch().await? else {
self.metrics.scan_cost += start.elapsed();
let Some(record_batch) = self.fetch_next_record_batch()? else {
return Ok(None);
};
self.metrics.num_record_batches += 1;
self.read_format
self.context
.read_format()
.convert_record_batch(&record_batch, &mut self.batches)?;
self.prune_batches()?;
self.metrics.num_batches += self.batches.len();
}
let batch = self.batches.pop_front();
self.metrics.scan_cost += start.elapsed();
self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
Ok(batch)
}
}
impl Drop for ParquetReader {
fn drop(&mut self) {
debug!(
"Read parquet {} {}, range: {:?}, {}/{} row groups, metrics: {:?}",
self.reader_builder.file_handle.region_id(),
self.reader_builder.file_handle.file_id(),
self.reader_builder.file_handle.time_range(),
self.metrics.num_row_groups_before_filtering
- self.metrics.num_row_groups_inverted_index_filtered
- self.metrics.num_row_groups_min_max_filtered,
self.metrics.num_row_groups_before_filtering,
self.metrics
);
// Report metrics.
READ_STAGE_ELAPSED
.with_label_values(&["build_parquet_reader"])
.observe(self.metrics.build_cost.as_secs_f64());
READ_STAGE_ELAPSED
.with_label_values(&["scan_row_groups"])
.observe(self.metrics.scan_cost.as_secs_f64());
READ_ROWS_TOTAL
.with_label_values(&["parquet"])
.inc_by(self.metrics.num_rows as u64);
READ_ROW_GROUPS_TOTAL
.with_label_values(&["before_filtering"])
.inc_by(self.metrics.num_row_groups_before_filtering as u64);
READ_ROW_GROUPS_TOTAL
.with_label_values(&["inverted_index_filtered"])
.inc_by(self.metrics.num_row_groups_inverted_index_filtered as u64);
READ_ROW_GROUPS_TOTAL
.with_label_values(&["minmax_index_filtered"])
.inc_by(self.metrics.num_row_groups_min_max_filtered as u64);
PRECISE_FILTER_ROWS_TOTAL
.with_label_values(&["parquet"])
.inc_by(self.metrics.num_rows_precise_filtered as u64);
READ_ROWS_IN_ROW_GROUP_TOTAL
.with_label_values(&["before_filtering"])
.inc_by(self.metrics.num_rows_in_row_group_before_filtering as u64);
READ_ROWS_IN_ROW_GROUP_TOTAL
.with_label_values(&["inverted_index_filtered"])
.inc_by(self.metrics.num_rows_in_row_group_inverted_index_filtered as u64);
}
}
impl ParquetReader {
/// Returns the metadata of the SST.
pub fn metadata(&self) -> &RegionMetadataRef {
self.read_format.metadata()
}
/// Tries to fetch next [RecordBatch] from the reader.
///
/// If the reader is exhausted, reads next row group.
async fn fetch_next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
if let Some(row_group_reader) = &mut self.current_reader {
if let Some(record_batch) =
row_group_reader
.next()
.transpose()
.context(ArrowReaderSnafu {
path: self.reader_builder.file_path(),
})?
{
return Ok(Some(record_batch));
}
}
// No more items in current row group, reads next row group.
while let Some((row_group_idx, row_selection)) = self.row_groups.pop_first() {
let mut row_group_reader = self
.reader_builder
.build(row_group_idx, row_selection)
.await?;
let Some(record_batch) =
row_group_reader
.next()
.transpose()
.context(ArrowReaderSnafu {
path: self.reader_builder.file_path(),
})?
else {
continue;
};
// Sets current reader to this reader.
self.current_reader = Some(row_group_reader);
return Ok(Some(record_batch));
}
Ok(None)
fn fetch_next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
self.reader.next().transpose().context(ArrowReaderSnafu {
path: self.context.file_path(),
})
}
/// Prunes batches by the pushed down predicate.
fn prune_batches(&mut self) -> Result<()> {
// fast path
if self.predicate.is_empty() {
if self.context.filters().is_empty() {
return Ok(());
}
@@ -672,7 +825,7 @@ impl ParquetReader {
let batches = std::mem::take(&mut self.batches);
for batch in batches {
let num_rows_before_filter = batch.num_rows();
let Some(batch_filtered) = self.precise_filter(batch)? else {
let Some(batch_filtered) = self.context.precise_filter(batch)? else {
// the entire batch is filtered out
self.metrics.num_rows_precise_filtered += num_rows_before_filter;
continue;
@@ -690,81 +843,4 @@ impl ParquetReader {
Ok(())
}
/// TRY THE BEST to perform pushed down predicate precisely on the input batch.
/// Return the filtered batch. If the entire batch is filtered out, return None.
///
/// Supported filter expr type is defined in [SimpleFilterEvaluator].
///
/// When a filter is referencing primary key column, this method will decode
/// the primary key and put it into the batch.
fn precise_filter(&self, mut input: Batch) -> Result<Option<Batch>> {
let mut mask = BooleanBuffer::new_set(input.num_rows());
// Run filter one by one and combine them result
// TODO(ruihang): run primary key filter first. It may short circuit other filters
for filter in &self.predicate {
let column_name = filter.column_name();
let Some(column_metadata) = self.read_format.metadata().column_by_name(column_name)
else {
// column not found, skip
// in situation like an column is added later
continue;
};
let result = match column_metadata.semantic_type {
SemanticType::Tag => {
let pk_values = if let Some(pk_values) = input.pk_values() {
pk_values
} else {
input.set_pk_values(self.codec.decode(input.primary_key())?);
input.pk_values().unwrap()
};
// Safety: this is a primary key
let pk_index = self
.read_format
.metadata()
.primary_key_index(column_metadata.column_id)
.unwrap();
let pk_value = pk_values[pk_index]
.try_to_scalar_value(&column_metadata.column_schema.data_type)
.context(FieldTypeMismatchSnafu)?;
if filter
.evaluate_scalar(&pk_value)
.context(FilterRecordBatchSnafu)?
{
continue;
} else {
// PK not match means the entire batch is filtered out.
return Ok(None);
}
}
SemanticType::Field => {
let Some(field_index) = self
.read_format
.field_index_by_id(column_metadata.column_id)
else {
continue;
};
let field_col = &input.fields()[field_index].data;
filter
.evaluate_vector(field_col)
.context(FilterRecordBatchSnafu)?
}
SemanticType::Timestamp => filter
.evaluate_vector(input.timestamps())
.context(FilterRecordBatchSnafu)?,
};
mask = mask.bitand(&result);
}
input.filter(&BooleanArray::from(mask).into())?;
Ok(Some(input))
}
#[cfg(test)]
pub fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
self.reader_builder.parquet_meta.clone()
}
}
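
For reference, the column resolution performed by `SimpleFilterContext::new_opt` above prefers the expected (latest) region metadata when it is provided, and keeps a filter only if the SST also contains the resolved column id. A simplified, std-only sketch of that lookup (the `ColumnMeta` struct and the `resolve_filter_column` helper are illustrative stand-ins for the real metadata types):

```rust
/// Minimal stand-in for a column entry in region metadata.
struct ColumnMeta {
    name: String,
    column_id: u32,
}

/// Sketch of the resolution in `SimpleFilterContext::new_opt`: resolve the
/// filter's column by name against the expected metadata when present, but
/// keep the filter only if the SST also contains that column id; otherwise
/// fall back to the SST metadata itself.
fn resolve_filter_column<'a>(
    sst_columns: &'a [ColumnMeta],
    expected_columns: Option<&'a [ColumnMeta]>,
    filter_column_name: &str,
) -> Option<&'a ColumnMeta> {
    match expected_columns {
        Some(expected) => {
            // Resolve by name in the expected (latest) metadata ...
            let column = expected.iter().find(|c| c.name == filter_column_name)?;
            // ... and require that the SST contains the same column id.
            sst_columns.iter().find(|c| c.column_id == column.column_id)?;
            Some(column)
        }
        // Without expected metadata, resolve directly against the SST.
        None => sst_columns.iter().find(|c| c.name == filter_column_name),
    }
}

fn main() {
    let sst = vec![ColumnMeta { name: "host".to_string(), column_id: 1 }];
    let expected = vec![
        ColumnMeta { name: "host".to_string(), column_id: 1 },
        ColumnMeta { name: "new_field".to_string(), column_id: 9 },
    ];
    // "host" resolves through the expected metadata and exists in the SST.
    assert!(resolve_filter_column(&sst, Some(expected.as_slice()), "host").is_some());
    // "new_field" was added after this SST was written, so its filter is skipped.
    assert!(resolve_filter_column(&sst, Some(expected.as_slice()), "new_field").is_none());
}
```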