diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs
index 6c0678a308..140966ac33 100644
--- a/src/mito2/src/error.rs
+++ b/src/mito2/src/error.rs
@@ -828,6 +828,20 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
+
+    #[snafu(display("SST file {} does not contain valid stats info", file_path))]
+    StatsNotPresent {
+        file_path: String,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
+    #[snafu(display("Failed to decode stats of file {}", file_path))]
+    DecodeStats {
+        file_path: String,
+        #[snafu(implicit)]
+        location: Location,
+    },
 }
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -958,6 +972,7 @@ impl ErrorExt for Error {
             FulltextPushText { source, .. }
             | FulltextFinish { source, .. }
             | ApplyFulltextIndex { source, .. } => source.status_code(),
+            DecodeStats { .. } | StatsNotPresent { .. } => StatusCode::Internal,
         }
     }
 
diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs
index c008014510..3233e868a1 100644
--- a/src/mito2/src/read.rs
+++ b/src/mito2/src/read.rs
@@ -19,6 +19,7 @@ pub mod dedup;
 pub(crate) mod last_row;
 pub mod merge;
 pub mod projection;
+pub(crate) mod prune;
 pub(crate) mod scan_region;
 pub(crate) mod seq_scan;
 pub(crate) mod unordered_scan;
@@ -54,7 +55,7 @@ use crate::error::{
 };
 use crate::memtable::BoxedBatchIterator;
 use crate::metrics::{READ_BATCHES_RETURN, READ_ROWS_RETURN, READ_STAGE_ELAPSED};
-use crate::sst::parquet::reader::RowGroupReader;
+use crate::read::prune::PruneReader;
 
 /// Storage internal representation of a batch of rows for a primary key (time series).
 ///
@@ -686,8 +687,8 @@ pub enum Source {
     Iter(BoxedBatchIterator),
     /// Source from a [BoxedBatchStream].
     Stream(BoxedBatchStream),
-    /// Source from a [RowGroupReader].
-    RowGroupReader(RowGroupReader),
+    /// Source from a [PruneReader].
+    PruneReader(PruneReader),
 }
 
 impl Source {
@@ -697,7 +698,7 @@ impl Source {
             Source::Reader(reader) => reader.next_batch().await,
             Source::Iter(iter) => iter.next().transpose(),
             Source::Stream(stream) => stream.try_next().await,
-            Source::RowGroupReader(reader) => reader.next_batch().await,
+            Source::PruneReader(reader) => reader.next_batch().await,
         }
     }
 }
diff --git a/src/mito2/src/read/prune.rs b/src/mito2/src/read/prune.rs
new file mode 100644
index 0000000000..6fd790f543
--- /dev/null
+++ b/src/mito2/src/read/prune.rs
@@ -0,0 +1,114 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use crate::error::Result;
+use crate::read::last_row::LastRowReader;
+use crate::read::{Batch, BatchReader};
+use crate::sst::parquet::file_range::FileRangeContextRef;
+use crate::sst::parquet::reader::{ReaderMetrics, RowGroupReader};
+
+pub enum Source {
+    RowGroup(RowGroupReader),
+    LastRow(LastRowReader),
+}
+
+impl Source {
+    async fn next_batch(&mut self) -> Result<Option<Batch>> {
+        match self {
+            Source::RowGroup(r) => r.next_batch().await,
+            Source::LastRow(r) => r.next_batch().await,
+        }
+    }
+}
+
+pub struct PruneReader {
+    /// Context for file ranges.
+    context: FileRangeContextRef,
+    source: Source,
+    metrics: ReaderMetrics,
+}
+
+impl PruneReader {
+    pub(crate) fn new_with_row_group_reader(
+        ctx: FileRangeContextRef,
+        reader: RowGroupReader,
+    ) -> Self {
+        Self {
+            context: ctx,
+            source: Source::RowGroup(reader),
+            metrics: Default::default(),
+        }
+    }
+
+    pub(crate) fn new_with_last_row_reader(
+        ctx: FileRangeContextRef,
+        reader: LastRowReader,
+    ) -> Self {
+        Self {
+            context: ctx,
+            source: Source::LastRow(reader),
+            metrics: Default::default(),
+        }
+    }
+
+    pub(crate) fn reset_source(&mut self, source: Source) {
+        self.source = source;
+    }
+
+    pub(crate) fn metrics(&mut self) -> &ReaderMetrics {
+        match &self.source {
+            Source::RowGroup(r) => r.metrics(),
+            Source::LastRow(_) => &self.metrics,
+        }
+    }
+
+    pub(crate) async fn next_batch(&mut self) -> Result<Option<Batch>> {
+        while let Some(b) = self.source.next_batch().await? {
+            match self.prune(b)? {
+                Some(b) => {
+                    return Ok(Some(b));
+                }
+                None => {
+                    continue;
+                }
+            }
+        }
+        Ok(None)
+    }
+
+    /// Prunes batches by the pushed down predicate.
+    fn prune(&mut self, batch: Batch) -> Result<Option<Batch>> {
+        // fast path
+        if self.context.filters().is_empty() {
+            return Ok(Some(batch));
+        }
+
+        let num_rows_before_filter = batch.num_rows();
+        let Some(batch_filtered) = self.context.precise_filter(batch)? else {
+            // the entire batch is filtered out
+            self.metrics.num_rows_precise_filtered += num_rows_before_filter;
+            return Ok(None);
+        };
+
+        // update metric
+        let filtered_rows = num_rows_before_filter - batch_filtered.num_rows();
+        self.metrics.num_rows_precise_filtered += filtered_rows;
+
+        if !batch_filtered.is_empty() {
+            Ok(Some(batch_filtered))
+        } else {
+            Ok(None)
+        }
+    }
+}
diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs
index e61e01bdd4..d336dcc2e6 100644
--- a/src/mito2/src/read/scan_region.rs
+++ b/src/mito2/src/read/scan_region.rs
@@ -313,7 +313,7 @@ impl ScanRegion {
             .with_append_mode(self.version.options.append_mode)
             .with_filter_deleted(filter_deleted)
             .with_merge_mode(self.version.options.merge_mode())
-            .with_series_row_selector(self.request.series_row_selector.clone());
+            .with_series_row_selector(self.request.series_row_selector);
 
         Ok(input)
     }
diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs
index 0eb3f89d33..2e42c388c4 100644
--- a/src/mito2/src/read/seq_scan.rs
+++ b/src/mito2/src/read/seq_scan.rs
@@ -103,7 +103,11 @@ impl SeqScan {
     }
 
     /// Builds sources from a [ScanPart].
-    fn build_part_sources(part: &ScanPart, sources: &mut Vec<Source>) -> Result<()> {
+    fn build_part_sources(
+        part: &ScanPart,
+        sources: &mut Vec<Source>,
+        row_selector: Option<TimeSeriesRowSelector>,
+    ) -> Result<()> {
         sources.reserve(part.memtable_ranges.len() + part.file_ranges.len());
         // Read memtables.
         for mem in &part.memtable_ranges {
@@ -125,7 +129,7 @@
         let region_id = ranges[0].file_handle().region_id();
         let range_num = ranges.len();
         for range in ranges {
-            let mut reader = range.reader().await?;
+            let mut reader = range.reader(row_selector).await?;
             let compat_batch = range.compat_batch();
             while let Some(mut batch) = reader.next_batch().await? {
                if let Some(compat) = compat_batch {
@@ -166,7 +170,7 @@
                 return Ok(None);
             };
 
-            Self::build_part_sources(part, &mut sources)?;
+            Self::build_part_sources(part, &mut sources, None)?;
         }
 
         Self::build_reader_from_sources(stream_ctx, sources, semaphore).await
@@ -189,7 +193,7 @@
             return Ok(None);
         };
 
-        Self::build_part_sources(part, &mut sources)?;
+        Self::build_part_sources(part, &mut sources, stream_ctx.input.series_row_selector)?;
 
         Self::build_reader_from_sources(stream_ctx, sources, semaphore).await
     }
diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs
index 2e4fd023e9..1de53b40a5 100644
--- a/src/mito2/src/read/unordered_scan.rs
+++ b/src/mito2/src/read/unordered_scan.rs
@@ -182,15 +182,15 @@ impl RegionScanner for UnorderedScan {
             let mut reader_metrics = ReaderMetrics::default();
             // Safety: UnorderedDistributor::build_parts() ensures this.
             for file_range in &part.file_ranges[0] {
-                let reader = file_range.reader().await.map_err(BoxedError::new).context(ExternalSnafu)?;
+                let reader = file_range.reader(None).await.map_err(BoxedError::new).context(ExternalSnafu)?;
                 let compat_batch = file_range.compat_batch();
-                let mut source = Source::RowGroupReader(reader);
+                let mut source = Source::PruneReader(reader);
                 while let Some(batch) = Self::fetch_from_source(&mut source, mapper, cache, compat_batch, &mut metrics).await? {
                     metrics.num_batches += 1;
                     metrics.num_rows += batch.num_rows();
                     yield batch;
                 }
-                if let Source::RowGroupReader(reader) = source {
+                if let Source::PruneReader(mut reader) = source {
                     reader_metrics.merge_from(reader.metrics());
                 }
             }
diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs
index a5fc417b1b..6c51f8258c 100644
--- a/src/mito2/src/sst/parquet/file_range.rs
+++ b/src/mito2/src/sst/parquet/file_range.rs
@@ -18,14 +18,20 @@
 use std::ops::BitAnd;
 use std::sync::Arc;
 
-use api::v1::SemanticType;
+use api::v1::{OpType, SemanticType};
+use common_telemetry::error;
 use datatypes::arrow::array::BooleanArray;
 use datatypes::arrow::buffer::BooleanBuffer;
 use parquet::arrow::arrow_reader::RowSelection;
-use snafu::ResultExt;
+use snafu::{OptionExt, ResultExt};
+use store_api::storage::TimeSeriesRowSelector;
 
-use crate::error::{FieldTypeMismatchSnafu, FilterRecordBatchSnafu, Result};
+use crate::error::{
+    DecodeStatsSnafu, FieldTypeMismatchSnafu, FilterRecordBatchSnafu, Result, StatsNotPresentSnafu,
+};
 use crate::read::compat::CompatBatch;
+use crate::read::last_row::LastRowReader;
+use crate::read::prune::PruneReader;
 use crate::read::Batch;
 use crate::row_converter::{McmpRowCodec, RowCodec};
 use crate::sst::file::FileHandle;
@@ -58,15 +64,69 @@ impl FileRange {
         }
     }
 
+    /// Returns true if the [FileRange] selects all rows in the row group.
+    fn select_all(&self) -> bool {
+        let rows_in_group = self
+            .context
+            .reader_builder
+            .parquet_metadata()
+            .row_group(self.row_group_idx)
+            .num_rows();
+
+        let Some(row_selection) = &self.row_selection else {
+            return true;
+        };
+        row_selection.row_count() == rows_in_group as usize
+    }
+
     /// Returns a reader to read the [FileRange].
-    pub(crate) async fn reader(&self) -> Result<RowGroupReader> {
+    pub(crate) async fn reader(
+        &self,
+        selector: Option<TimeSeriesRowSelector>,
+    ) -> Result<PruneReader> {
         let parquet_reader = self
             .context
             .reader_builder
             .build(self.row_group_idx, self.row_selection.clone())
             .await?;
 
-        Ok(RowGroupReader::new(self.context.clone(), parquet_reader))
+        let use_last_row_reader = if selector
+            .map(|s| s == TimeSeriesRowSelector::LastRow)
+            .unwrap_or(false)
+        {
+            // Only use LastRowReader if the row group does not contain DELETE
+            // and all rows are selected.
+            let put_only = !self
+                .context
+                .contains_delete(self.row_group_idx)
+                .inspect_err(|e| {
+                    error!(e; "Failed to decode min value of op_type, fallback to RowGroupReader");
+                })
+                .unwrap_or(true);
+            put_only && self.select_all()
+        } else {
+            // No selector provided, use RowGroupReader.
+            false
+        };
+
+        let prune_reader = if use_last_row_reader {
+            // Row group is PUT only, use LastRowReader to skip unnecessary rows.
+            PruneReader::new_with_last_row_reader(
+                self.context.clone(),
+                LastRowReader::new(Box::new(RowGroupReader::new(
+                    self.context.clone(),
+                    parquet_reader,
+                )) as _),
+            )
+        } else {
+            // Row group contains DELETE, fallback to default reader.
+            PruneReader::new_with_row_group_reader(
+                self.context.clone(),
+                RowGroupReader::new(self.context.clone(), parquet_reader),
+            )
+        };
+
+        Ok(prune_reader)
     }
 
     /// Returns the helper to compat batches.
@@ -144,6 +204,34 @@ impl FileRangeContext {
     pub(crate) fn precise_filter(&self, input: Batch) -> Result<Option<Batch>> {
         self.base.precise_filter(input)
     }
+
+    /// Decodes parquet metadata and finds if the row group contains a delete op.
+    pub(crate) fn contains_delete(&self, row_group_index: usize) -> Result<bool> {
+        let metadata = self.reader_builder.parquet_metadata();
+        let row_group_metadata = &metadata.row_groups()[row_group_index];
+
+        // safety: The last column of SST must be op_type
+        let column_metadata = &row_group_metadata.columns().last().unwrap();
+        let stats = column_metadata.statistics().context(StatsNotPresentSnafu {
+            file_path: self.reader_builder.file_path(),
+        })?;
+        if stats.has_min_max_set() {
+            stats
+                .min_bytes()
+                .try_into()
+                .map(i32::from_le_bytes)
+                .map(|min_op_type| min_op_type == OpType::Delete as i32)
+                .ok()
+                .context(DecodeStatsSnafu {
+                    file_path: self.reader_builder.file_path(),
+                })
+        } else {
+            DecodeStatsSnafu {
+                file_path: self.reader_builder.file_path(),
+            }
+            .fail()
+        }
+    }
 }
 
 /// Common fields for a range to read and filter batches.
diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs
index 9483d37e6a..6b207a4bd0 100644
--- a/src/mito2/src/sst/parquet/reader.rs
+++ b/src/mito2/src/sst/parquet/reader.rs
@@ -50,6 +50,7 @@ use crate::metrics::{
     PRECISE_FILTER_ROWS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL,
     READ_STAGE_ELAPSED,
 };
+use crate::read::prune::{PruneReader, Source};
 use crate::read::{Batch, BatchReader};
 use crate::row_converter::{McmpRowCodec, SortField};
 use crate::sst::file::FileHandle;
@@ -694,34 +695,34 @@ fn time_range_to_predicate(
 }
 
 /// Parquet reader metrics.
-#[derive(Debug, Default)]
+#[derive(Debug, Default, Clone)]
 pub(crate) struct ReaderMetrics {
     /// Number of row groups before filtering.
-    num_row_groups_before_filtering: usize,
+    pub(crate) num_row_groups_before_filtering: usize,
     /// Number of row groups filtered by fulltext index.
-    num_row_groups_fulltext_index_filtered: usize,
+    pub(crate) num_row_groups_fulltext_index_filtered: usize,
     /// Number of row groups filtered by inverted index.
-    num_row_groups_inverted_index_filtered: usize,
+    pub(crate) num_row_groups_inverted_index_filtered: usize,
     /// Number of row groups filtered by min-max index.
-    num_row_groups_min_max_filtered: usize,
+    pub(crate) num_row_groups_min_max_filtered: usize,
     /// Number of rows filtered by precise filter.
-    num_rows_precise_filtered: usize,
+    pub(crate) num_rows_precise_filtered: usize,
     /// Number of rows in row group before filtering.
-    num_rows_in_row_group_before_filtering: usize,
+    pub(crate) num_rows_in_row_group_before_filtering: usize,
     /// Number of rows in row group filtered by fulltext index.
-    num_rows_in_row_group_fulltext_index_filtered: usize,
+    pub(crate) num_rows_in_row_group_fulltext_index_filtered: usize,
     /// Number of rows in row group filtered by inverted index.
-    num_rows_in_row_group_inverted_index_filtered: usize,
+    pub(crate) num_rows_in_row_group_inverted_index_filtered: usize,
     /// Duration to build the parquet reader.
-    build_cost: Duration,
+    pub(crate) build_cost: Duration,
     /// Duration to scan the reader.
-    scan_cost: Duration,
+    pub(crate) scan_cost: Duration,
     /// Number of record batches read.
-    num_record_batches: usize,
+    pub(crate) num_record_batches: usize,
     /// Number of batches decoded.
-    num_batches: usize,
+    pub(crate) num_batches: usize,
     /// Number of rows read.
-    num_rows: usize,
+    pub(crate) num_rows: usize,
 }
 
 impl ReaderMetrics {
@@ -749,7 +750,7 @@
 pub(crate) struct RowGroupReaderBuilder {
     /// SST file to read.
     ///
-    /// Holds the file handle to avoid the file purge purge it.
+    /// Holds the file handle to avoid the file being purged.
     file_handle: FileHandle,
     /// Path of the file.
     file_path: String,
@@ -776,6 +777,10 @@ impl RowGroupReaderBuilder {
         &self.file_handle
     }
 
+    pub(crate) fn parquet_metadata(&self) -> &Arc<ParquetMetaData> {
+        &self.parquet_meta
+    }
+
     /// Builds a [ParquetRecordBatchReader] to read the row group at `row_group_idx`.
     pub(crate) async fn build(
         &self,
@@ -816,16 +821,16 @@
 /// The state of a [ParquetReader].
 enum ReaderState {
     /// The reader is reading a row group.
-    Readable(RowGroupReader),
+    Readable(PruneReader),
     /// The reader is exhausted.
     Exhausted(ReaderMetrics),
 }
 
 impl ReaderState {
     /// Returns the metrics of the reader.
-    fn metrics(&self) -> &ReaderMetrics {
+    fn metrics(&mut self) -> &ReaderMetrics {
         match self {
-            ReaderState::Readable(reader) => &reader.metrics,
+            ReaderState::Readable(reader) => reader.metrics(),
             ReaderState::Exhausted(m) => m,
         }
     }
@@ -942,15 +947,19 @@
                 .reader_builder()
                 .build(row_group_idx, row_selection)
                 .await?;
+
             // Resets the parquet reader.
-            reader.reset_reader(parquet_reader);
+            reader.reset_source(Source::RowGroup(RowGroupReader::new(
+                self.context.clone(),
+                parquet_reader,
+            )));
             if let Some(batch) = reader.next_batch().await? {
                 return Ok(Some(batch));
            }
        }

        // The reader is exhausted.
-        self.reader_state = ReaderState::Exhausted(std::mem::take(&mut reader.metrics));
+        self.reader_state = ReaderState::Exhausted(reader.metrics().clone());
         Ok(None)
     }
 }
@@ -1019,7 +1028,10 @@
                 .reader_builder()
                 .build(row_group_idx, row_selection)
                 .await?;
-            ReaderState::Readable(RowGroupReader::new(context.clone(), parquet_reader))
+            ReaderState::Readable(PruneReader::new_with_row_group_reader(
+                context.clone(),
+                RowGroupReader::new(context.clone(), parquet_reader),
+            ))
         } else {
             ReaderState::Exhausted(ReaderMetrics::default())
         };
@@ -1070,13 +1082,17 @@ impl RowGroupReader {
         &self.metrics
     }
 
-    /// Resets the parquet reader.
-    fn reset_reader(&mut self, reader: ParquetRecordBatchReader) {
-        self.reader = reader;
+    /// Tries to fetch next [RecordBatch] from the reader.
+    fn fetch_next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
+        self.reader.next().transpose().context(ArrowReaderSnafu {
+            path: self.context.file_path(),
+        })
     }
+}
 
-    /// Tries to fetch next [Batch] from the reader.
-    pub(crate) async fn next_batch(&mut self) -> Result<Option<Batch>> {
+#[async_trait::async_trait]
+impl BatchReader for RowGroupReader {
+    async fn next_batch(&mut self) -> Result<Option<Batch>> {
         let scan_start = Instant::now();
         if let Some(batch) = self.batches.pop_front() {
             self.metrics.num_rows += batch.num_rows();
@@ -1095,7 +1111,6 @@
             self.context
                 .read_format()
                 .convert_record_batch(&record_batch, &mut self.batches)?;
-            self.prune_batches()?;
             self.metrics.num_batches += self.batches.len();
         }
         let batch = self.batches.pop_front();
@@ -1103,45 +1118,6 @@
         self.metrics.scan_cost += scan_start.elapsed();
         Ok(batch)
     }
-
-    /// Tries to fetch next [RecordBatch] from the reader.
-    ///
-    /// If the reader is exhausted, reads next row group.
-    fn fetch_next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
-        self.reader.next().transpose().context(ArrowReaderSnafu {
-            path: self.context.file_path(),
-        })
-    }
-
-    /// Prunes batches by the pushed down predicate.
-    fn prune_batches(&mut self) -> Result<()> {
-        // fast path
-        if self.context.filters().is_empty() {
-            return Ok(());
-        }
-
-        let mut new_batches = VecDeque::new();
-        let batches = std::mem::take(&mut self.batches);
-        for batch in batches {
-            let num_rows_before_filter = batch.num_rows();
-            let Some(batch_filtered) = self.context.precise_filter(batch)? else {
-                // the entire batch is filtered out
-                self.metrics.num_rows_precise_filtered += num_rows_before_filter;
-                continue;
-            };
-
-            // update metric
-            let filtered_rows = num_rows_before_filter - batch_filtered.num_rows();
-            self.metrics.num_rows_precise_filtered += filtered_rows;
-
-            if !batch_filtered.is_empty() {
-                new_batches.push_back(batch_filtered);
-            }
-        }
-        self.batches = new_batches;
-
-        Ok(())
-    }
 }
 
 #[cfg(test)]
diff --git a/src/store-api/src/storage/requests.rs b/src/store-api/src/storage/requests.rs
index 8316afce2b..02bd745ca1 100644
--- a/src/store-api/src/storage/requests.rs
+++ b/src/store-api/src/storage/requests.rs
@@ -17,7 +17,7 @@ use datafusion_expr::expr::Expr;
 use strum::Display;
 
 /// A hint on how to select rows from a time-series.
-#[derive(Debug, Clone, PartialEq, Eq, Hash, Display)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Display)]
 pub enum TimeSeriesRowSelector {
     /// Only keep the last row of each time-series.
     LastRow,
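Note: the following is an illustrative sketch, not part of the patch. It restates the reader-selection rule that `FileRange::reader` applies in the file_range.rs hunk above: the `LastRowReader` fast path is taken only when the scan passes a `LastRow` hint, the row group is PUT-only (decoded from the op_type column statistics), and the `RowSelection` keeps every row. The enum and helper names here are local stand-ins for the real `store_api` and mito2 types.

```rust
// Sketch only: mirrors the decision in FileRange::reader.
// `contains_delete` models FileRangeContext::contains_delete, where Err means
// the op_type statistics could not be decoded.
#[derive(Clone, Copy, PartialEq, Eq)]
enum TimeSeriesRowSelector {
    LastRow,
}

fn use_last_row_reader(
    selector: Option<TimeSeriesRowSelector>,
    contains_delete: Result<bool, ()>,
    selects_all_rows: bool,
) -> bool {
    // The fast path is only considered when the scan hints `LastRow`.
    if selector != Some(TimeSeriesRowSelector::LastRow) {
        return false;
    }
    // Undecodable stats are treated as "may contain DELETE" (`unwrap_or(true)` in
    // the patch), which forces the RowGroupReader fallback.
    let put_only = !contains_delete.unwrap_or(true);
    // LastRowReader is used only for PUT-only row groups whose RowSelection keeps
    // every row; otherwise the plain RowGroupReader path is used.
    put_only && selects_all_rows
}

fn main() {
    assert!(use_last_row_reader(Some(TimeSeriesRowSelector::LastRow), Ok(false), true));
    assert!(!use_last_row_reader(Some(TimeSeriesRowSelector::LastRow), Ok(true), true));
    assert!(!use_last_row_reader(Some(TimeSeriesRowSelector::LastRow), Err(()), true));
    assert!(!use_last_row_reader(None, Ok(false), true));
}
```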