feat: add PruneReader for optimized row filtering (#4370)

* Add PruneReader for optimized row filtering and error handling

 - Introduced `PruneReader`, which wraps a `RowGroupReader` or `LastRowReader` and applies the pushed-down precise filter to each batch, replacing `RowGroupReader` as the batch source.

* Make `ReaderMetrics` fields public for external access

* Add row selection support to SeqScan and FileRange readers

 - Updated `SeqScan::build_part_sources` to accept an optional `TimeSeriesRowSelector`.

* Refactor `scan_region.rs` and `file_range.rs`

 - Removed unnecessary cloning of `series_row_selector` in `scan_region.rs`.
 - Added a `select_all` method to `file_range.rs` that checks whether all rows in a row group are selected.
 - Updated the `reader` method to use `LastRowReader` only when all rows are selected and no DELETE operations are present (condensed in the sketch after the commit metadata below).

* Enhance `PruneReader` and `ParquetReader` with reset functionality and metrics handling

 - Made the `Source` enum public in `prune.rs`.

* chore: Update src/mito2/src/sst/parquet/reader.rs

---------

Co-authored-by: Yingwen <realevenyag@gmail.com>
Commit 9fbc4ba649 (parent 2e7b12c344), committed by GitHub
Author: Lei, HUANG
Date: 2024-07-15 22:23:34 +08:00
9 changed files with 282 additions and 84 deletions
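The reader-selection rule from the refactor bullet above, condensed into a hedged sketch. The free function below is hypothetical; the real check lives inline in `FileRange::reader` in the `file_range.rs` hunk further down:

```rust
use store_api::storage::TimeSeriesRowSelector;

/// Hypothetical condensation of the check in `FileRange::reader`:
/// use `LastRowReader` only when the caller hints `LastRow`, the row group
/// contains no DELETE op, and the row selection covers the whole group.
fn should_use_last_row_reader(
    selector: Option<TimeSeriesRowSelector>,
    put_only: bool,
    selects_all_rows: bool,
) -> bool {
    matches!(selector, Some(TimeSeriesRowSelector::LastRow)) && put_only && selects_all_rows
}
```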

@@ -828,6 +828,20 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("SST file {} does not contain valid stats info", file_path))]
StatsNotPresent {
file_path: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to decode stats of file {}", file_path))]
DecodeStats {
file_path: String,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -958,6 +972,7 @@ impl ErrorExt for Error {
FulltextPushText { source, .. }
| FulltextFinish { source, .. }
| ApplyFulltextIndex { source, .. } => source.status_code(),
DecodeStats { .. } | StatsNotPresent { .. } => StatusCode::Internal,
}
}

@@ -19,6 +19,7 @@ pub mod dedup;
pub(crate) mod last_row;
pub mod merge;
pub mod projection;
pub(crate) mod prune;
pub(crate) mod scan_region;
pub(crate) mod seq_scan;
pub(crate) mod unordered_scan;
@@ -54,7 +55,7 @@ use crate::error::{
};
use crate::memtable::BoxedBatchIterator;
use crate::metrics::{READ_BATCHES_RETURN, READ_ROWS_RETURN, READ_STAGE_ELAPSED};
use crate::sst::parquet::reader::RowGroupReader;
use crate::read::prune::PruneReader;
/// Storage internal representation of a batch of rows for a primary key (time series).
///
@@ -686,8 +687,8 @@ pub enum Source {
Iter(BoxedBatchIterator),
/// Source from a [BoxedBatchStream].
Stream(BoxedBatchStream),
/// Source from a [RowGroupReader].
RowGroupReader(RowGroupReader),
/// Source from a [PruneReader].
PruneReader(PruneReader),
}
impl Source {
@@ -697,7 +698,7 @@ impl Source {
Source::Reader(reader) => reader.next_batch().await,
Source::Iter(iter) => iter.next().transpose(),
Source::Stream(stream) => stream.try_next().await,
Source::RowGroupReader(reader) => reader.next_batch().await,
Source::PruneReader(reader) => reader.next_batch().await,
}
}
}

src/mito2/src/read/prune.rs (new file, 114 lines)

@@ -0,0 +1,114 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::error::Result;
use crate::read::last_row::LastRowReader;
use crate::read::{Batch, BatchReader};
use crate::sst::parquet::file_range::FileRangeContextRef;
use crate::sst::parquet::reader::{ReaderMetrics, RowGroupReader};
pub enum Source {
RowGroup(RowGroupReader),
LastRow(LastRowReader),
}
impl Source {
async fn next_batch(&mut self) -> Result<Option<Batch>> {
match self {
Source::RowGroup(r) => r.next_batch().await,
Source::LastRow(r) => r.next_batch().await,
}
}
}
pub struct PruneReader {
/// Context for file ranges.
context: FileRangeContextRef,
source: Source,
metrics: ReaderMetrics,
}
impl PruneReader {
pub(crate) fn new_with_row_group_reader(
ctx: FileRangeContextRef,
reader: RowGroupReader,
) -> Self {
Self {
context: ctx,
source: Source::RowGroup(reader),
metrics: Default::default(),
}
}
pub(crate) fn new_with_last_row_reader(
ctx: FileRangeContextRef,
reader: LastRowReader,
) -> Self {
Self {
context: ctx,
source: Source::LastRow(reader),
metrics: Default::default(),
}
}
pub(crate) fn reset_source(&mut self, source: Source) {
self.source = source;
}
pub(crate) fn metrics(&mut self) -> &ReaderMetrics {
match &self.source {
Source::RowGroup(r) => r.metrics(),
Source::LastRow(_) => &self.metrics,
}
}
pub(crate) async fn next_batch(&mut self) -> Result<Option<Batch>> {
while let Some(b) = self.source.next_batch().await? {
match self.prune(b)? {
Some(b) => {
return Ok(Some(b));
}
None => {
continue;
}
}
}
Ok(None)
}
/// Prunes batches by the pushed down predicate.
fn prune(&mut self, batch: Batch) -> Result<Option<Batch>> {
// fast path
if self.context.filters().is_empty() {
return Ok(Some(batch));
}
let num_rows_before_filter = batch.num_rows();
let Some(batch_filtered) = self.context.precise_filter(batch)? else {
// the entire batch is filtered out
self.metrics.num_rows_precise_filtered += num_rows_before_filter;
return Ok(None);
};
// update metric
let filtered_rows = num_rows_before_filter - batch_filtered.num_rows();
self.metrics.num_rows_precise_filtered += filtered_rows;
if !batch_filtered.is_empty() {
Ok(Some(batch_filtered))
} else {
Ok(None)
}
}
}
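A minimal crate-internal sketch of driving the new reader; the `drain` helper below is hypothetical, but `next_batch` and its filtering behavior come from the module above:

```rust
use crate::error::Result;
use crate::read::prune::PruneReader;

/// Hypothetical helper: exhaust a `PruneReader`, counting only the rows
/// that survived the pushed-down precise filter applied in `prune`.
async fn drain(mut reader: PruneReader) -> Result<usize> {
    let mut rows = 0;
    while let Some(batch) = reader.next_batch().await? {
        rows += batch.num_rows();
    }
    Ok(rows)
}
```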

@@ -313,7 +313,7 @@ impl ScanRegion {
.with_append_mode(self.version.options.append_mode)
.with_filter_deleted(filter_deleted)
.with_merge_mode(self.version.options.merge_mode())
.with_series_row_selector(self.request.series_row_selector.clone());
.with_series_row_selector(self.request.series_row_selector);
Ok(input)
}

@@ -103,7 +103,11 @@ impl SeqScan {
}
/// Builds sources from a [ScanPart].
fn build_part_sources(part: &ScanPart, sources: &mut Vec<Source>) -> Result<()> {
fn build_part_sources(
part: &ScanPart,
sources: &mut Vec<Source>,
row_selector: Option<TimeSeriesRowSelector>,
) -> Result<()> {
sources.reserve(part.memtable_ranges.len() + part.file_ranges.len());
// Read memtables.
for mem in &part.memtable_ranges {
@@ -125,7 +129,7 @@ impl SeqScan {
let region_id = ranges[0].file_handle().region_id();
let range_num = ranges.len();
for range in ranges {
let mut reader = range.reader().await?;
let mut reader = range.reader(row_selector).await?;
let compat_batch = range.compat_batch();
while let Some(mut batch) = reader.next_batch().await? {
if let Some(compat) = compat_batch {
@@ -166,7 +170,7 @@ impl SeqScan {
return Ok(None);
};
Self::build_part_sources(part, &mut sources)?;
Self::build_part_sources(part, &mut sources, None)?;
}
Self::build_reader_from_sources(stream_ctx, sources, semaphore).await
@@ -189,7 +193,7 @@ impl SeqScan {
return Ok(None);
};
Self::build_part_sources(part, &mut sources)?;
Self::build_part_sources(part, &mut sources, stream_ctx.input.series_row_selector)?;
Self::build_reader_from_sources(stream_ctx, sources, semaphore).await
}

@@ -182,15 +182,15 @@ impl RegionScanner for UnorderedScan {
let mut reader_metrics = ReaderMetrics::default();
// Safety: UnorderedDistributor::build_parts() ensures this.
for file_range in &part.file_ranges[0] {
let reader = file_range.reader().await.map_err(BoxedError::new).context(ExternalSnafu)?;
let reader = file_range.reader(None).await.map_err(BoxedError::new).context(ExternalSnafu)?;
let compat_batch = file_range.compat_batch();
let mut source = Source::RowGroupReader(reader);
let mut source = Source::PruneReader(reader);
while let Some(batch) = Self::fetch_from_source(&mut source, mapper, cache, compat_batch, &mut metrics).await? {
metrics.num_batches += 1;
metrics.num_rows += batch.num_rows();
yield batch;
}
if let Source::RowGroupReader(reader) = source {
if let Source::PruneReader(mut reader) = source {
reader_metrics.merge_from(reader.metrics());
}
}

@@ -18,14 +18,20 @@
use std::ops::BitAnd;
use std::sync::Arc;
use api::v1::SemanticType;
use api::v1::{OpType, SemanticType};
use common_telemetry::error;
use datatypes::arrow::array::BooleanArray;
use datatypes::arrow::buffer::BooleanBuffer;
use parquet::arrow::arrow_reader::RowSelection;
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::TimeSeriesRowSelector;
use crate::error::{FieldTypeMismatchSnafu, FilterRecordBatchSnafu, Result};
use crate::error::{
DecodeStatsSnafu, FieldTypeMismatchSnafu, FilterRecordBatchSnafu, Result, StatsNotPresentSnafu,
};
use crate::read::compat::CompatBatch;
use crate::read::last_row::LastRowReader;
use crate::read::prune::PruneReader;
use crate::read::Batch;
use crate::row_converter::{McmpRowCodec, RowCodec};
use crate::sst::file::FileHandle;
@@ -58,15 +64,69 @@ impl FileRange {
}
}
/// Returns true if [FileRange] selects all rows in row group.
fn select_all(&self) -> bool {
let rows_in_group = self
.context
.reader_builder
.parquet_metadata()
.row_group(self.row_group_idx)
.num_rows();
let Some(row_selection) = &self.row_selection else {
return true;
};
row_selection.row_count() == rows_in_group as usize
}
/// Returns a reader to read the [FileRange].
pub(crate) async fn reader(&self) -> Result<RowGroupReader> {
pub(crate) async fn reader(
&self,
selector: Option<TimeSeriesRowSelector>,
) -> Result<PruneReader> {
let parquet_reader = self
.context
.reader_builder
.build(self.row_group_idx, self.row_selection.clone())
.await?;
Ok(RowGroupReader::new(self.context.clone(), parquet_reader))
let use_last_row_reader = if selector
.map(|s| s == TimeSeriesRowSelector::LastRow)
.unwrap_or(false)
{
// Only use LastRowReader if row group does not contain DELETE
// and all rows are selected.
let put_only = !self
.context
.contains_delete(self.row_group_idx)
.inspect_err(|e| {
error!(e; "Failed to decode min value of op_type, fallback to RowGroupReader");
})
.unwrap_or(true);
put_only && self.select_all()
} else {
// No selector provided, use RowGroupReader
false
};
let prune_reader = if use_last_row_reader {
// Row group is PUT only, use LastRowReader to skip unnecessary rows.
PruneReader::new_with_last_row_reader(
self.context.clone(),
LastRowReader::new(Box::new(RowGroupReader::new(
self.context.clone(),
parquet_reader,
)) as _),
)
} else {
// Not eligible for LastRowReader; fall back to the default reader.
PruneReader::new_with_row_group_reader(
self.context.clone(),
RowGroupReader::new(self.context.clone(), parquet_reader),
)
};
Ok(prune_reader)
}
/// Returns the helper to compat batches.
@@ -144,6 +204,34 @@ impl FileRangeContext {
pub(crate) fn precise_filter(&self, input: Batch) -> Result<Option<Batch>> {
self.base.precise_filter(input)
}
/// Decodes parquet metadata and finds if the row group contains a delete op.
pub(crate) fn contains_delete(&self, row_group_index: usize) -> Result<bool> {
let metadata = self.reader_builder.parquet_metadata();
let row_group_metadata = &metadata.row_groups()[row_group_index];
// safety: The last column of SST must be op_type
let column_metadata = &row_group_metadata.columns().last().unwrap();
let stats = column_metadata.statistics().context(StatsNotPresentSnafu {
file_path: self.reader_builder.file_path(),
})?;
if stats.has_min_max_set() {
stats
.min_bytes()
.try_into()
.map(i32::from_le_bytes)
.map(|min_op_type| min_op_type == OpType::Delete as i32)
.ok()
.context(DecodeStatsSnafu {
file_path: self.reader_builder.file_path(),
})
} else {
DecodeStatsSnafu {
file_path: self.reader_builder.file_path(),
}
.fail()
}
}
}
/// Common fields for a range to read and filter batches.
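Two details in the hunk above deserve a note. `select_all` leans on `RowSelection::row_count`, which counts only the *selected* rows across the selection's skip/select runs. And `contains_delete` works because `OpType` orders DELETE below PUT (assumed here to be `Delete = 0`, `Put = 1`, as in `api::v1`), so the column-level min statistic of `op_type` equals DELETE exactly when the row group holds at least one delete. A standalone sketch of the `RowSelection` arithmetic:

```rust
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

fn main() {
    // A selection over a 100-row group that keeps only rows 10..20.
    let selection = RowSelection::from(vec![
        RowSelector::skip(10),
        RowSelector::select(10),
        RowSelector::skip(80),
    ]);
    // row_count() counts selected rows only, so this selection does not
    // cover the group and select_all() above would return false.
    assert_eq!(selection.row_count(), 10);
}
```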

@@ -50,6 +50,7 @@ use crate::metrics::{
PRECISE_FILTER_ROWS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_TOTAL,
READ_ROW_GROUPS_TOTAL, READ_STAGE_ELAPSED,
};
use crate::read::prune::{PruneReader, Source};
use crate::read::{Batch, BatchReader};
use crate::row_converter::{McmpRowCodec, SortField};
use crate::sst::file::FileHandle;
@@ -694,34 +695,34 @@ fn time_range_to_predicate(
}
/// Parquet reader metrics.
#[derive(Debug, Default)]
#[derive(Debug, Default, Clone)]
pub(crate) struct ReaderMetrics {
/// Number of row groups before filtering.
num_row_groups_before_filtering: usize,
pub(crate) num_row_groups_before_filtering: usize,
/// Number of row groups filtered by fulltext index.
num_row_groups_fulltext_index_filtered: usize,
pub(crate) num_row_groups_fulltext_index_filtered: usize,
/// Number of row groups filtered by inverted index.
num_row_groups_inverted_index_filtered: usize,
pub(crate) num_row_groups_inverted_index_filtered: usize,
/// Number of row groups filtered by min-max index.
num_row_groups_min_max_filtered: usize,
pub(crate) num_row_groups_min_max_filtered: usize,
/// Number of rows filtered by precise filter.
num_rows_precise_filtered: usize,
pub(crate) num_rows_precise_filtered: usize,
/// Number of rows in row group before filtering.
num_rows_in_row_group_before_filtering: usize,
pub(crate) num_rows_in_row_group_before_filtering: usize,
/// Number of rows in row group filtered by fulltext index.
num_rows_in_row_group_fulltext_index_filtered: usize,
pub(crate) num_rows_in_row_group_fulltext_index_filtered: usize,
/// Number of rows in row group filtered by inverted index.
num_rows_in_row_group_inverted_index_filtered: usize,
pub(crate) num_rows_in_row_group_inverted_index_filtered: usize,
/// Duration to build the parquet reader.
build_cost: Duration,
pub(crate) build_cost: Duration,
/// Duration to scan the reader.
scan_cost: Duration,
pub(crate) scan_cost: Duration,
/// Number of record batches read.
num_record_batches: usize,
pub(crate) num_record_batches: usize,
/// Number of batches decoded.
num_batches: usize,
pub(crate) num_batches: usize,
/// Number of rows read.
num_rows: usize,
pub(crate) num_rows: usize,
}
impl ReaderMetrics {
@@ -749,7 +750,7 @@ impl ReaderMetrics {
pub(crate) struct RowGroupReaderBuilder {
/// SST file to read.
///
/// Holds the file handle to avoid the file purge purge it.
/// Holds the file handle to prevent the file purger from removing it.
file_handle: FileHandle,
/// Path of the file.
file_path: String,
@@ -776,6 +777,10 @@ impl RowGroupReaderBuilder {
&self.file_handle
}
pub(crate) fn parquet_metadata(&self) -> &Arc<ParquetMetaData> {
&self.parquet_meta
}
/// Builds a [ParquetRecordBatchReader] to read the row group at `row_group_idx`.
pub(crate) async fn build(
&self,
@@ -816,16 +821,16 @@ impl RowGroupReaderBuilder {
/// The state of a [ParquetReader].
enum ReaderState {
/// The reader is reading a row group.
Readable(RowGroupReader),
Readable(PruneReader),
/// The reader is exhausted.
Exhausted(ReaderMetrics),
}
impl ReaderState {
/// Returns the metrics of the reader.
fn metrics(&self) -> &ReaderMetrics {
fn metrics(&mut self) -> &ReaderMetrics {
match self {
ReaderState::Readable(reader) => &reader.metrics,
ReaderState::Readable(reader) => reader.metrics(),
ReaderState::Exhausted(m) => m,
}
}
@@ -942,15 +947,19 @@ impl BatchReader for ParquetReader {
.reader_builder()
.build(row_group_idx, row_selection)
.await?;
// Resets the parquet reader.
reader.reset_reader(parquet_reader);
reader.reset_source(Source::RowGroup(RowGroupReader::new(
self.context.clone(),
parquet_reader,
)));
if let Some(batch) = reader.next_batch().await? {
return Ok(Some(batch));
}
}
// The reader is exhausted.
self.reader_state = ReaderState::Exhausted(std::mem::take(&mut reader.metrics));
self.reader_state = ReaderState::Exhausted(reader.metrics().clone());
Ok(None)
}
}
@@ -1019,7 +1028,10 @@ impl ParquetReader {
.reader_builder()
.build(row_group_idx, row_selection)
.await?;
ReaderState::Readable(RowGroupReader::new(context.clone(), parquet_reader))
ReaderState::Readable(PruneReader::new_with_row_group_reader(
context.clone(),
RowGroupReader::new(context.clone(), parquet_reader),
))
} else {
ReaderState::Exhausted(ReaderMetrics::default())
};
@@ -1070,13 +1082,17 @@ impl RowGroupReader {
&self.metrics
}
/// Resets the parquet reader.
fn reset_reader(&mut self, reader: ParquetRecordBatchReader) {
self.reader = reader;
/// Tries to fetch next [RecordBatch] from the reader.
fn fetch_next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
self.reader.next().transpose().context(ArrowReaderSnafu {
path: self.context.file_path(),
})
}
}
/// Tries to fetch next [Batch] from the reader.
pub(crate) async fn next_batch(&mut self) -> Result<Option<Batch>> {
#[async_trait::async_trait]
impl BatchReader for RowGroupReader {
async fn next_batch(&mut self) -> Result<Option<Batch>> {
let scan_start = Instant::now();
if let Some(batch) = self.batches.pop_front() {
self.metrics.num_rows += batch.num_rows();
@@ -1095,7 +1111,6 @@ impl RowGroupReader {
self.context
.read_format()
.convert_record_batch(&record_batch, &mut self.batches)?;
self.prune_batches()?;
self.metrics.num_batches += self.batches.len();
}
let batch = self.batches.pop_front();
@@ -1103,45 +1118,6 @@ impl RowGroupReader {
self.metrics.scan_cost += scan_start.elapsed();
Ok(batch)
}
/// Tries to fetch next [RecordBatch] from the reader.
///
/// If the reader is exhausted, reads next row group.
fn fetch_next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
self.reader.next().transpose().context(ArrowReaderSnafu {
path: self.context.file_path(),
})
}
/// Prunes batches by the pushed down predicate.
fn prune_batches(&mut self) -> Result<()> {
// fast path
if self.context.filters().is_empty() {
return Ok(());
}
let mut new_batches = VecDeque::new();
let batches = std::mem::take(&mut self.batches);
for batch in batches {
let num_rows_before_filter = batch.num_rows();
let Some(batch_filtered) = self.context.precise_filter(batch)? else {
// the entire batch is filtered out
self.metrics.num_rows_precise_filtered += num_rows_before_filter;
continue;
};
// update metric
let filtered_rows = num_rows_before_filter - batch_filtered.num_rows();
self.metrics.num_rows_precise_filtered += filtered_rows;
if !batch_filtered.is_empty() {
new_batches.push_back(batch_filtered);
}
}
self.batches = new_batches;
Ok(())
}
}
#[cfg(test)]
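The deleted `prune_batches` above is the crux of the refactor: `RowGroupReader` no longer filters its buffered `VecDeque` in place; `PruneReader::prune` (the new module earlier in this commit) now filters each batch as it streams out. A toy sketch of the shape change, with made-up batch and filter types:

```rust
/// Toy stand-ins: a "batch" is a Vec<i32>; the "precise filter" keeps positives.
fn prune(batch: Vec<i32>) -> Option<Vec<i32>> {
    let kept: Vec<i32> = batch.into_iter().filter(|v| *v > 0).collect();
    (!kept.is_empty()).then_some(kept)
}

/// Streaming form: pull batches and return the first survivor, instead of
/// filtering a whole buffered queue before handing anything out.
fn next_surviving(source: impl IntoIterator<Item = Vec<i32>>) -> Option<Vec<i32>> {
    source.into_iter().find_map(prune)
}

fn main() {
    let batches = vec![vec![-1, -2], vec![-3, 4, 5]];
    assert_eq!(next_surviving(batches), Some(vec![4, 5]));
}
```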

@@ -17,7 +17,7 @@ use datafusion_expr::expr::Expr;
use strum::Display;
/// A hint on how to select rows from a time-series.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Display)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Display)]
pub enum TimeSeriesRowSelector {
/// Only keep the last row of each time-series.
LastRow,
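
This derive is what let the `scan_region.rs` hunk earlier drop the `.clone()`: an `Option` of a `Copy` type is itself `Copy` and can be read out repeatedly. A minimal standalone sketch with a local stand-in for the request type:

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TimeSeriesRowSelector {
    LastRow,
}

/// Local stand-in for the scan request carrying the selector hint.
struct ScanRequest {
    series_row_selector: Option<TimeSeriesRowSelector>,
}

fn main() {
    let request = ScanRequest {
        series_row_selector: Some(TimeSeriesRowSelector::LastRow),
    };
    // Before the derive, this read required request.series_row_selector.clone().
    let first = request.series_row_selector;
    let second = request.series_row_selector; // Option<T: Copy> is Copy.
    assert_eq!(first, second);
}
```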