From 141ed51dccb99ca83bc3d9b57a763b416c862bca Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Wed, 7 Feb 2024 16:20:00 +0800 Subject: [PATCH] feat(mito): adjust seg size of inverted index to finer granularity instead of row group level (#3289) * feat(mito): adjust seg size of inverted index to finer granularity instead of row group level Signed-off-by: Zhenchi * fix: wrong metric Signed-off-by: Zhenchi * fix: more suitable name Signed-off-by: Zhenchi * feat: BitVec instead Signed-off-by: Zhenchi --------- Signed-off-by: Zhenchi --- Cargo.lock | 1 + .../src/inverted_index/search/index_apply.rs | 18 +- .../search/index_apply/predicates_apply.rs | 55 ++-- src/mito2/Cargo.toml | 1 + src/mito2/src/access_layer.rs | 1 + src/mito2/src/cache/write_cache.rs | 1 + src/mito2/src/engine/basic_test.rs | 4 +- src/mito2/src/metrics.rs | 2 + src/mito2/src/sst/index.rs | 20 +- src/mito2/src/sst/index/applier.rs | 27 +- src/mito2/src/sst/index/creator.rs | 4 +- src/mito2/src/sst/parquet.rs | 6 + src/mito2/src/sst/parquet/reader.rs | 256 ++++++++++++------ src/mito2/src/sst/parquet/row_selection.rs | 128 +++++++++ 14 files changed, 408 insertions(+), 116 deletions(-) create mode 100644 src/mito2/src/sst/parquet/row_selection.rs diff --git a/Cargo.lock b/Cargo.lock index d2fe7181f8..0665afb7f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5299,6 +5299,7 @@ dependencies = [ "futures", "humantime-serde", "index", + "itertools 0.10.5", "lazy_static", "log-store", "memcomparable", diff --git a/src/index/src/inverted_index/search/index_apply.rs b/src/index/src/inverted_index/search/index_apply.rs index 24478d5e22..654796b4d0 100644 --- a/src/index/src/inverted_index/search/index_apply.rs +++ b/src/index/src/inverted_index/search/index_apply.rs @@ -14,14 +14,26 @@ mod predicates_apply; -use std::collections::BTreeSet; - use async_trait::async_trait; +use common_base::BitVec; pub use predicates_apply::PredicatesIndexApplier; use crate::inverted_index::error::Result; use crate::inverted_index::format::reader::InvertedIndexReader; +/// The output of an apply operation. +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct ApplyOutput { + /// Bitmap of indices that match the predicates. + pub matched_segment_ids: BitVec, + + /// The total number of rows in the index. + pub total_row_count: usize, + + /// The number of rows in each segment. + pub segment_row_count: usize, +} + /// A trait for processing and transforming indices obtained from an inverted index. /// /// Applier instances are reusable and work with various `InvertedIndexReader` instances, @@ -35,7 +47,7 @@ pub trait IndexApplier: Send + Sync { &self, context: SearchContext, reader: &mut (dyn InvertedIndexReader + 'a), - ) -> Result>; + ) -> Result; /// Returns the memory usage of the applier. fn memory_usage(&self) -> usize; diff --git a/src/index/src/inverted_index/search/index_apply/predicates_apply.rs b/src/index/src/inverted_index/search/index_apply/predicates_apply.rs index aba2f8c999..93cdb201c8 100644 --- a/src/index/src/inverted_index/search/index_apply/predicates_apply.rs +++ b/src/index/src/inverted_index/search/index_apply/predicates_apply.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeSet; use std::mem::size_of; use async_trait::async_trait; @@ -26,7 +25,7 @@ use crate::inverted_index::search::fst_apply::{ }; use crate::inverted_index::search::fst_values_mapper::FstValuesMapper; use crate::inverted_index::search::index_apply::{ - IndexApplier, IndexNotFoundStrategy, SearchContext, + ApplyOutput, IndexApplier, IndexNotFoundStrategy, SearchContext, }; use crate::inverted_index::search::predicate::Predicate; @@ -48,8 +47,13 @@ impl IndexApplier for PredicatesIndexApplier { &self, context: SearchContext, reader: &mut (dyn InvertedIndexReader + 'a), - ) -> Result> { + ) -> Result { let metadata = reader.metadata().await?; + let mut output = ApplyOutput { + matched_segment_ids: BitVec::EMPTY, + total_row_count: metadata.total_row_count as _, + segment_row_count: metadata.segment_row_count as _, + }; let mut bitmap = Self::bitmap_full_range(&metadata); // TODO(zhongzc): optimize the order of applying to make it quicker to return empty. @@ -61,7 +65,7 @@ impl IndexApplier for PredicatesIndexApplier { let Some(meta) = metadata.metas.get(name) else { match context.index_not_found_strategy { IndexNotFoundStrategy::ReturnEmpty => { - return Ok(BTreeSet::default()); + return Ok(output); } IndexNotFoundStrategy::Ignore => { continue; @@ -81,7 +85,8 @@ impl IndexApplier for PredicatesIndexApplier { bitmap &= bm; } - Ok(bitmap.iter_ones().collect()) + output.matched_segment_ids = bitmap; + Ok(output) } /// Returns the memory usage of the applier. @@ -206,11 +211,14 @@ mod tests { _ => unreachable!(), } }); - let indices = applier + let output = applier .apply(SearchContext::default(), &mut mock_reader) .await .unwrap(); - assert_eq!(indices, BTreeSet::from_iter([0, 2, 4, 6])); + assert_eq!( + output.matched_segment_ids, + bitvec![u8, Lsb0; 1, 0, 1, 0, 1, 0, 1, 0] + ); // An index reader with a single tag "tag-0" but without value "tag-0_value-0" let mut mock_reader = MockInvertedIndexReader::new(); @@ -223,11 +231,11 @@ mod tests { "tag-0" => Ok(FstMap::from_iter([(b"tag-0_value-1", fst_value(2, 1))]).unwrap()), _ => unreachable!(), }); - let indices = applier + let output = applier .apply(SearchContext::default(), &mut mock_reader) .await .unwrap(); - assert!(indices.is_empty()); + assert_eq!(output.matched_segment_ids.count_ones(), 0); } #[tokio::test] @@ -260,11 +268,14 @@ mod tests { } }); - let indices = applier + let output = applier .apply(SearchContext::default(), &mut mock_reader) .await .unwrap(); - assert_eq!(indices, BTreeSet::from_iter([0, 4, 6])); + assert_eq!( + output.matched_segment_ids, + bitvec![u8, Lsb0; 1, 0, 0, 0, 1, 0, 1, 0] + ); } #[tokio::test] @@ -278,11 +289,14 @@ mod tests { .expect_metadata() .returning(|| Ok(mock_metas(["tag-0"]))); - let indices = applier + let output = applier .apply(SearchContext::default(), &mut mock_reader) .await .unwrap(); - assert_eq!(indices, BTreeSet::from_iter([0, 1, 2, 3, 4, 5, 6, 7])); // full range to scan + assert_eq!( + output.matched_segment_ids, + bitvec![u8, Lsb0; 1, 1, 1, 1, 1, 1, 1, 1] + ); // full range to scan } #[tokio::test] @@ -303,11 +317,11 @@ mod tests { fst_appliers: vec![(s("tag-0"), Box::new(mock_fst_applier))], }; - let indices = applier + let output = applier .apply(SearchContext::default(), &mut mock_reader) .await .unwrap(); - assert!(indices.is_empty()); + assert!(output.matched_segment_ids.is_empty()); } #[tokio::test] @@ -334,7 +348,7 @@ mod tests { .await; assert!(matches!(result, Err(Error::IndexNotFound { .. }))); - let indices = applier + let output = applier .apply( SearchContext { index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty, @@ -343,9 +357,9 @@ mod tests { ) .await .unwrap(); - assert!(indices.is_empty()); + assert!(output.matched_segment_ids.is_empty()); - let indices = applier + let output = applier .apply( SearchContext { index_not_found_strategy: IndexNotFoundStrategy::Ignore, @@ -354,7 +368,10 @@ mod tests { ) .await .unwrap(); - assert_eq!(indices, BTreeSet::from_iter([0, 1, 2, 3, 4, 5, 6, 7])); + assert_eq!( + output.matched_segment_ids, + bitvec![u8, Lsb0; 1, 1, 1, 1, 1, 1, 1, 1] + ); } #[test] diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index 78a659876d..92f1c63525 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -41,6 +41,7 @@ datatypes.workspace = true futures.workspace = true humantime-serde.workspace = true index.workspace = true +itertools.workspace = true lazy_static = "1.4" log-store = { workspace = true, optional = true } memcomparable = "0.2" diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index 630a27f436..eb049c6f3a 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -139,6 +139,7 @@ impl AccessLayer { file_id, file_path: index_file_path, metadata: &request.metadata, + segment_row_count: write_opts.index_segment_row_count, row_group_size: write_opts.row_group_size, object_store: self.object_store.clone(), intermediate_manager: self.intermediate_manager.clone(), diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index efeee37ce5..3e66e4bf90 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -114,6 +114,7 @@ impl WriteCache { file_id, file_path: self.file_cache.cache_file_path(puffin_key), metadata: &write_request.metadata, + segment_row_count: write_opts.index_segment_row_count, row_group_size: write_opts.row_group_size, object_store: self.file_cache.local_store(), intermediate_manager: self.intermediate_manager.clone(), diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs index e4d0afc60e..878932fbb4 100644 --- a/src/mito2/src/engine/basic_test.rs +++ b/src/mito2/src/engine/basic_test.rs @@ -551,10 +551,10 @@ async fn test_region_usage() { let region_stat = region.region_usage().await; assert_eq!(region_stat.wal_usage, 0); - assert_eq!(region_stat.sst_usage, 3006); + assert_eq!(region_stat.sst_usage, 3005); // region total usage - assert_eq!(region_stat.disk_usage(), 4072); + assert_eq!(region_stat.disk_usage(), 4071); } #[tokio::test] diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs index 90a6d5b5b0..374b5954a8 100644 --- a/src/mito2/src/metrics.rs +++ b/src/mito2/src/metrics.rs @@ -132,6 +132,8 @@ lazy_static! { /// Counter of filtered rows by precise filter. pub static ref PRECISE_FILTER_ROWS_TOTAL: IntCounterVec = register_int_counter_vec!("greptime_mito_precise_filter_rows_total", "mito precise filter rows total", &[TYPE_LABEL]).unwrap(); + pub static ref READ_ROWS_IN_ROW_GROUP_TOTAL: IntCounterVec = + register_int_counter_vec!("greptime_mito_read_rows_in_row_group_total", "mito read rows in row group total", &[TYPE_LABEL]).unwrap(); // ------- End of query metrics. // Cache related metrics. diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs index 2c26f63895..00f432efac 100644 --- a/src/mito2/src/sst/index.rs +++ b/src/mito2/src/sst/index.rs @@ -129,6 +129,7 @@ pub(crate) struct IndexerBuilder<'a> { pub(crate) file_path: String, pub(crate) metadata: &'a RegionMetadataRef, pub(crate) row_group_size: usize, + pub(crate) segment_row_count: usize, pub(crate) object_store: ObjectStore, pub(crate) intermediate_manager: IntermediateManager, } @@ -153,6 +154,14 @@ impl<'a> IndexerBuilder<'a> { return Indexer::default(); } + let Some(mut segment_row_count) = NonZeroUsize::new(self.segment_row_count) else { + warn!( + "Segment row count is 0, skip creating index, region_id: {}, file_id: {}", + self.metadata.region_id, self.file_id, + ); + return Indexer::default(); + }; + let Some(row_group_size) = NonZeroUsize::new(self.row_group_size) else { warn!( "Row group size is 0, skip creating index, region_id: {}, file_id: {}", @@ -161,6 +170,11 @@ impl<'a> IndexerBuilder<'a> { return Indexer::default(); }; + // if segment row count not aligned with row group size, adjust it to be aligned. + if row_group_size.get() % segment_row_count.get() != 0 { + segment_row_count = row_group_size; + } + let creator = SstIndexCreator::new( self.file_path, self.file_id, @@ -168,7 +182,7 @@ impl<'a> IndexerBuilder<'a> { self.object_store, self.intermediate_manager, self.mem_threshold_index_create, - row_group_size, + segment_row_count, ) .with_buffer_size(self.write_buffer_size); @@ -263,6 +277,7 @@ mod tests { file_id: FileId::random(), file_path: "test".to_string(), metadata: &metadata, + segment_row_count: 16, row_group_size: 1024, object_store: mock_object_store(), intermediate_manager: mock_intm_mgr(), @@ -282,6 +297,7 @@ mod tests { file_id: FileId::random(), file_path: "test".to_string(), metadata: &metadata, + segment_row_count: 16, row_group_size: 1024, object_store: mock_object_store(), intermediate_manager: mock_intm_mgr(), @@ -301,6 +317,7 @@ mod tests { file_id: FileId::random(), file_path: "test".to_string(), metadata: &metadata, + segment_row_count: 16, row_group_size: 1024, object_store: mock_object_store(), intermediate_manager: mock_intm_mgr(), @@ -320,6 +337,7 @@ mod tests { file_id: FileId::random(), file_path: "test".to_string(), metadata: &metadata, + segment_row_count: 0, row_group_size: 0, object_store: mock_object_store(), intermediate_manager: mock_intm_mgr(), diff --git a/src/mito2/src/sst/index/applier.rs b/src/mito2/src/sst/index/applier.rs index 3355f5c2d9..f14251afb9 100644 --- a/src/mito2/src/sst/index/applier.rs +++ b/src/mito2/src/sst/index/applier.rs @@ -14,13 +14,12 @@ pub mod builder; -use std::collections::BTreeSet; use std::sync::Arc; use futures::{AsyncRead, AsyncSeek}; use index::inverted_index::format::reader::InvertedIndexBlobReader; use index::inverted_index::search::index_apply::{ - IndexApplier, IndexNotFoundStrategy, SearchContext, + ApplyOutput, IndexApplier, IndexNotFoundStrategy, SearchContext, }; use object_store::ObjectStore; use puffin::file_format::reader::{PuffinAsyncReader, PuffinFileReader}; @@ -84,7 +83,7 @@ impl SstIndexApplier { } /// Applies predicates to the provided SST file id and returns the relevant row group ids - pub async fn apply(&self, file_id: FileId) -> Result> { + pub async fn apply(&self, file_id: FileId) -> Result { let _timer = INDEX_APPLY_ELAPSED.start_timer(); let context = SearchContext { @@ -175,6 +174,7 @@ impl Drop for SstIndexApplier { #[cfg(test)] mod tests { + use common_base::BitVec; use futures::io::Cursor; use index::inverted_index::search::index_apply::MockIndexApplier; use object_store::services::Memory; @@ -203,9 +203,13 @@ mod tests { let mut mock_index_applier = MockIndexApplier::new(); mock_index_applier.expect_memory_usage().returning(|| 100); - mock_index_applier - .expect_apply() - .returning(|_, _| Ok(BTreeSet::from_iter([1, 2, 3]))); + mock_index_applier.expect_apply().returning(|_, _| { + Ok(ApplyOutput { + matched_segment_ids: BitVec::EMPTY, + total_row_count: 100, + segment_row_count: 10, + }) + }); let sst_index_applier = SstIndexApplier::new( region_dir.clone(), @@ -214,8 +218,15 @@ mod tests { None, Box::new(mock_index_applier), ); - let ids = sst_index_applier.apply(file_id).await.unwrap(); - assert_eq!(ids, BTreeSet::from_iter([1, 2, 3])); + let output = sst_index_applier.apply(file_id).await.unwrap(); + assert_eq!( + output, + ApplyOutput { + matched_segment_ids: BitVec::EMPTY, + total_row_count: 100, + segment_row_count: 10, + } + ); } #[tokio::test] diff --git a/src/mito2/src/sst/index/creator.rs b/src/mito2/src/sst/index/creator.rs index 51e4e67f38..cb4f3433f0 100644 --- a/src/mito2/src/sst/index/creator.rs +++ b/src/mito2/src/sst/index/creator.rs @@ -84,7 +84,7 @@ impl SstIndexCreator { index_store: ObjectStore, intermediate_manager: IntermediateManager, memory_usage_threshold: Option, - row_group_size: NonZeroUsize, + segment_row_count: NonZeroUsize, ) -> Self { // `memory_usage_threshold` is the total memory usage threshold of the index creation, // so we need to divide it by the number of columns @@ -96,7 +96,7 @@ impl SstIndexCreator { intermediate_manager, )); let sorter = ExternalSorter::factory(temp_file_provider.clone() as _, memory_threshold); - let index_creator = Box::new(SortIndexCreator::new(sorter, row_group_size)); + let index_creator = Box::new(SortIndexCreator::new(sorter, segment_row_count)); let codec = IndexValuesCodec::from_tag_columns(metadata.primary_key_columns()); Self { diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 56485a17a3..c88c29d8bb 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -20,6 +20,7 @@ pub(crate) mod metadata; mod page_reader; pub mod reader; pub mod row_group; +mod row_selection; mod stats; pub mod writer; @@ -38,6 +39,8 @@ pub const PARQUET_METADATA_KEY: &str = "greptime:metadata"; pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024; /// Default row group size for parquet files. const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE; +/// Default segment row count for inverted index. +const DEFAULT_INDEX_SEGMENT_ROW_COUNT: usize = 1024; /// Parquet write options. #[derive(Debug)] @@ -46,6 +49,8 @@ pub struct WriteOptions { pub write_buffer_size: ReadableSize, /// Row group size. pub row_group_size: usize, + /// Segment row count for inverted index. + pub index_segment_row_count: usize, } impl Default for WriteOptions { @@ -53,6 +58,7 @@ impl Default for WriteOptions { WriteOptions { write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE, row_group_size: DEFAULT_ROW_GROUP_SIZE, + index_segment_row_count: DEFAULT_INDEX_SEGMENT_ROW_COUNT, } } } diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 06b6442917..827e5d851c 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -14,7 +14,7 @@ //! Parquet reader. -use std::collections::{BTreeSet, VecDeque}; +use std::collections::{BTreeMap, VecDeque}; use std::ops::BitAnd; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -27,8 +27,9 @@ use common_time::range::TimestampRange; use datafusion_common::arrow::array::BooleanArray; use datafusion_common::arrow::buffer::BooleanBuffer; use datatypes::arrow::record_batch::RecordBatch; +use itertools::Itertools; use object_store::ObjectStore; -use parquet::arrow::arrow_reader::ParquetRecordBatchReader; +use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection}; use parquet::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask}; use parquet::file::metadata::ParquetMetaData; use parquet::format::KeyValue; @@ -43,7 +44,8 @@ use crate::error::{ InvalidParquetSnafu, ReadParquetSnafu, Result, }; use crate::metrics::{ - PRECISE_FILTER_ROWS_TOTAL, READ_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_STAGE_ELAPSED, + PRECISE_FILTER_ROWS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_TOTAL, + READ_ROW_GROUPS_TOTAL, READ_STAGE_ELAPSED, }; use crate::read::{Batch, BatchReader}; use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; @@ -52,6 +54,7 @@ use crate::sst::index::applier::SstIndexApplierRef; use crate::sst::parquet::format::ReadFormat; use crate::sst::parquet::metadata::MetadataLoader; use crate::sst::parquet::row_group::InMemoryRowGroup; +use crate::sst::parquet::row_selection::row_selection_from_row_ranges; use crate::sst::parquet::stats::RowGroupPruningStats; use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY}; @@ -161,7 +164,6 @@ impl ParquetReaderBuilder { let mut metrics = Metrics::default(); - // Computes row groups to read. let row_groups = self .row_groups_to_read(&read_format, &parquet_meta, &mut metrics) .await; @@ -265,85 +267,162 @@ impl ParquetReaderBuilder { Ok(metadata) } - /// Computes row groups to read. + /// Computes row groups to read, along with their respective row selections. async fn row_groups_to_read( &self, read_format: &ReadFormat, parquet_meta: &ParquetMetaData, metrics: &mut Metrics, - ) -> BTreeSet { - let mut row_group_ids: BTreeSet<_> = (0..parquet_meta.num_row_groups()).collect(); - metrics.num_row_groups_unfiltered += row_group_ids.len(); - - // Applies index to prune row groups. - // - // TODO(zhongzc): Devise a mechanism to enforce the non-use of indices - // as an escape route in case of index issues, and it can be used to test - // the correctness of the index. - if let Some(index_applier) = &self.index_applier { - if self.file_handle.meta().inverted_index_available() { - match index_applier.apply(self.file_handle.file_id()).await { - Ok(row_groups) => row_group_ids = row_groups, - Err(err) => { - if cfg!(any(test, feature = "test")) { - panic!( - "Failed to apply index, region_id: {}, file_id: {}, err: {}", - self.file_handle.region_id(), - self.file_handle.file_id(), - err - ); - } else { - warn!( - err; "Failed to apply index, region_id: {}, file_id: {}", - self.file_handle.region_id(), self.file_handle.file_id() - ); - } - } - } - } + ) -> BTreeMap> { + let num_row_groups = parquet_meta.num_row_groups(); + if num_row_groups == 0 { + return BTreeMap::default(); } - metrics.num_row_groups_inverted_index_selected += row_group_ids.len(); + metrics.num_row_groups_before_filtering += num_row_groups; - if row_group_ids.is_empty() { - return row_group_ids; - } + self.prune_row_groups_by_inverted_index(parquet_meta, metrics) + .await + .or_else(|| self.prune_row_groups_by_minmax(read_format, parquet_meta, metrics)) + .unwrap_or_else(|| (0..num_row_groups).map(|i| (i, None)).collect()) + } - // Prunes row groups by min-max index. - if let Some(predicate) = &self.predicate { - let region_meta = read_format.metadata(); - let column_ids = region_meta - .column_metadatas - .iter() - .map(|c| c.column_id) - .collect(); - - let row_groups = row_group_ids - .iter() - .map(|id| parquet_meta.row_group(*id)) - .collect::>(); - let stats = RowGroupPruningStats::new(&row_groups, read_format, column_ids); - let mut mask = predicate - .prune_with_stats(&stats, region_meta.schema.arrow_schema()) - .into_iter(); - - row_group_ids.retain(|_| mask.next().unwrap_or(false)); + /// Applies index to prune row groups. + /// + /// TODO(zhongzc): Devise a mechanism to enforce the non-use of indices + /// as an escape route in case of index issues, and it can be used to test + /// the correctness of the index. + async fn prune_row_groups_by_inverted_index( + &self, + parquet_meta: &ParquetMetaData, + metrics: &mut Metrics, + ) -> Option>> { + let Some(index_applier) = &self.index_applier else { + return None; }; - metrics.num_row_groups_min_max_selected += row_group_ids.len(); - row_group_ids + if !self.file_handle.meta().inverted_index_available() { + return None; + } + + let output = match index_applier.apply(self.file_handle.file_id()).await { + Ok(output) => output, + Err(err) => { + if cfg!(any(test, feature = "test")) { + panic!( + "Failed to apply index, region_id: {}, file_id: {}, err: {}", + self.file_handle.region_id(), + self.file_handle.file_id(), + err + ); + } else { + warn!( + err; "Failed to apply index, region_id: {}, file_id: {}", + self.file_handle.region_id(), self.file_handle.file_id() + ); + } + + return None; + } + }; + + // Let's assume that the number of rows in the first row group + // can represent the `row_group_size` of the Parquet file. + // + // If the file contains only one row group, i.e. the number of rows + // less than the `row_group_size`, the calculation of `row_group_id` + // and `rg_begin_row_id` is still correct. + let row_group_size = parquet_meta.row_group(0).num_rows() as usize; + if row_group_size == 0 { + return None; + } + + let segment_row_count = output.segment_row_count; + let row_groups = output + .matched_segment_ids + .iter_ones() + .map(|seg_id| { + let begin_row_id = seg_id * segment_row_count; + let row_group_id = begin_row_id / row_group_size; + + let rg_begin_row_id = begin_row_id % row_group_size; + let rg_end_row_id = rg_begin_row_id + segment_row_count; + + (row_group_id, rg_begin_row_id..rg_end_row_id) + }) + .group_by(|(row_group_id, _)| *row_group_id) + .into_iter() + .map(|(row_group_id, group)| { + let row_ranges = group.map(|(_, range)| range); + + let total_row_count = parquet_meta.row_group(row_group_id).num_rows() as usize; + let (row_selection, skipped) = + row_selection_from_row_ranges(row_ranges, total_row_count); + + metrics.num_rows_in_row_group_before_filtering += total_row_count; + metrics.num_rows_in_row_group_inverted_index_filtered += skipped; + + (row_group_id, Some(row_selection)) + }) + .collect::>(); + + let filtered = parquet_meta.num_row_groups() - row_groups.len(); + metrics.num_row_groups_inverted_index_filtered += filtered; + + Some(row_groups) + } + + /// Prunes row groups by min-max index. + fn prune_row_groups_by_minmax( + &self, + read_format: &ReadFormat, + parquet_meta: &ParquetMetaData, + metrics: &mut Metrics, + ) -> Option>> { + let Some(predicate) = &self.predicate else { + return None; + }; + + let num_row_groups = parquet_meta.num_row_groups(); + + let region_meta = read_format.metadata(); + let column_ids = region_meta + .column_metadatas + .iter() + .map(|c| c.column_id) + .collect(); + + let row_groups = parquet_meta.row_groups(); + let stats = RowGroupPruningStats::new(row_groups, read_format, column_ids); + let row_groups = predicate + .prune_with_stats(&stats, region_meta.schema.arrow_schema()) + .iter() + .zip(0..num_row_groups) + .filter(|&(mask, _)| *mask) + .map(|(_, id)| (id, None)) + .collect::>(); + + let filtered = num_row_groups - row_groups.len(); + metrics.num_row_groups_min_max_filtered += filtered; + + Some(row_groups) } } /// Parquet reader metrics. #[derive(Debug, Default)] struct Metrics { - /// Number of unfiltered row groups. - num_row_groups_unfiltered: usize, - /// Number of row groups to read after filtering by inverted index. - num_row_groups_inverted_index_selected: usize, - /// Number of row groups to read after filtering by min-max index. - num_row_groups_min_max_selected: usize, + /// Number of row groups before filtering. + num_row_groups_before_filtering: usize, + /// Number of row groups filtered by inverted index. + num_row_groups_inverted_index_filtered: usize, + /// Number of row groups filtered by min-max index. + num_row_groups_min_max_filtered: usize, + /// Number of rows filtered by precise filter. num_rows_precise_filtered: usize, + /// Number of rows in row group before filtering. + num_rows_in_row_group_before_filtering: usize, + /// Number of rows in row group filtered by inverted index. + num_rows_in_row_group_inverted_index_filtered: usize, /// Duration to build the parquet reader. build_cost: Duration, /// Duration to scan the reader. @@ -383,7 +462,11 @@ impl RowGroupReaderBuilder { } /// Builds a [ParquetRecordBatchReader] to read the row group at `row_group_idx`. - async fn build(&mut self, row_group_idx: usize) -> Result { + async fn build( + &mut self, + row_group_idx: usize, + row_selection: Option, + ) -> Result { let mut row_group = InMemoryRowGroup::create( self.file_handle.region_id(), self.file_handle.file_id(), @@ -395,7 +478,7 @@ impl RowGroupReaderBuilder { ); // Fetches data into memory. row_group - .fetch(&self.projection, None) + .fetch(&self.projection, row_selection.as_ref()) .await .context(ReadParquetSnafu { path: &self.file_path, @@ -407,7 +490,7 @@ impl RowGroupReaderBuilder { &self.field_levels, &row_group, DEFAULT_READ_BATCH_SIZE, - None, + row_selection, ) .context(ReadParquetSnafu { path: &self.file_path, @@ -417,8 +500,8 @@ impl RowGroupReaderBuilder { /// Parquet batch reader to read our SST format. pub struct ParquetReader { - /// Indices of row groups to read. - row_groups: BTreeSet, + /// Indices of row groups to read, along with their respective row selections. + row_groups: BTreeMap>, /// Helper to read record batches. /// /// Not `None` if [ParquetReader::stream] is not `None`. @@ -477,8 +560,10 @@ impl Drop for ParquetReader { self.reader_builder.file_handle.region_id(), self.reader_builder.file_handle.file_id(), self.reader_builder.file_handle.time_range(), - self.metrics.num_row_groups_min_max_selected, - self.metrics.num_row_groups_unfiltered, + self.metrics.num_row_groups_before_filtering + - self.metrics.num_row_groups_inverted_index_filtered + - self.metrics.num_row_groups_min_max_filtered, + self.metrics.num_row_groups_before_filtering, self.metrics ); @@ -493,17 +578,23 @@ impl Drop for ParquetReader { .with_label_values(&["parquet"]) .inc_by(self.metrics.num_rows as u64); READ_ROW_GROUPS_TOTAL - .with_label_values(&["unfiltered"]) - .inc_by(self.metrics.num_row_groups_unfiltered as u64); + .with_label_values(&["before_filtering"]) + .inc_by(self.metrics.num_row_groups_before_filtering as u64); READ_ROW_GROUPS_TOTAL - .with_label_values(&["inverted_index_selected"]) - .inc_by(self.metrics.num_row_groups_inverted_index_selected as u64); + .with_label_values(&["inverted_index_filtered"]) + .inc_by(self.metrics.num_row_groups_inverted_index_filtered as u64); READ_ROW_GROUPS_TOTAL - .with_label_values(&["min_max_index_selected"]) - .inc_by(self.metrics.num_row_groups_min_max_selected as u64); + .with_label_values(&["minmax_index_filtered"]) + .inc_by(self.metrics.num_row_groups_min_max_filtered as u64); PRECISE_FILTER_ROWS_TOTAL .with_label_values(&["parquet"]) .inc_by(self.metrics.num_rows_precise_filtered as u64); + READ_ROWS_IN_ROW_GROUP_TOTAL + .with_label_values(&["before_filtering"]) + .inc_by(self.metrics.num_rows_in_row_group_before_filtering as u64); + READ_ROWS_IN_ROW_GROUP_TOTAL + .with_label_values(&["inverted_index_filtered"]) + .inc_by(self.metrics.num_rows_in_row_group_inverted_index_filtered as u64); } } @@ -531,8 +622,11 @@ impl ParquetReader { } // No more items in current row group, reads next row group. - while let Some(row_group_idx) = self.row_groups.pop_first() { - let mut row_group_reader = self.reader_builder.build(row_group_idx).await?; + while let Some((row_group_idx, row_selection)) = self.row_groups.pop_first() { + let mut row_group_reader = self + .reader_builder + .build(row_group_idx, row_selection) + .await?; let Some(record_batch) = row_group_reader .next() diff --git a/src/mito2/src/sst/parquet/row_selection.rs b/src/mito2/src/sst/parquet/row_selection.rs new file mode 100644 index 0000000000..93accf11ac --- /dev/null +++ b/src/mito2/src/sst/parquet/row_selection.rs @@ -0,0 +1,128 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::ops::Range; + +use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; + +type SkipRowCount = usize; + +/// Converts an iterator of row ranges into a `RowSelection` by creating a sequence of `RowSelector`s. +/// Returns the `RowSelection` and the number of rows that were skipped. +/// +/// This function processes each range in the input and either creates a new selector or merges +/// with the existing one, depending on whether the current range is contiguous with the preceding one +/// or if there's a gap that requires skipping rows. It handles both "select" and "skip" actions, +/// optimizing the list of selectors by merging contiguous actions of the same type. +/// +/// Note: overlapping ranges are not supported and will result in an incorrect selection. +pub(crate) fn row_selection_from_row_ranges( + row_ranges: impl Iterator>, + total_row_count: usize, +) -> (RowSelection, SkipRowCount) { + let mut selectors: Vec = Vec::new(); + let mut last_processed_end = 0; + let mut skip_row_count = 0; + + for Range { start, end } in row_ranges { + if start > last_processed_end { + add_or_merge_selector(&mut selectors, start - last_processed_end, true); + skip_row_count += start - last_processed_end; + } + + add_or_merge_selector(&mut selectors, end - start, false); + last_processed_end = end; + } + + skip_row_count += total_row_count.saturating_sub(last_processed_end); + (RowSelection::from(selectors), skip_row_count) +} + +/// Helper function to either add a new `RowSelector` to `selectors` or merge it with the last one +/// if they are of the same type (both skip or both select). +fn add_or_merge_selector(selectors: &mut Vec, count: usize, is_skip: bool) { + if let Some(last) = selectors.last_mut() { + // Merge with last if both actions are same + if last.skip == is_skip { + last.row_count += count; + return; + } + } + // Add new selector otherwise + let new_selector = if is_skip { + RowSelector::skip(count) + } else { + RowSelector::select(count) + }; + selectors.push(new_selector); +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_single_contiguous_range() { + let (selection, skipped) = row_selection_from_row_ranges(Some(5..10).into_iter(), 10); + let expected = RowSelection::from(vec![RowSelector::skip(5), RowSelector::select(5)]); + assert_eq!(selection, expected); + assert_eq!(skipped, 5); + } + + #[test] + fn test_non_contiguous_ranges() { + let ranges = vec![1..3, 5..8]; + let (selection, skipped) = row_selection_from_row_ranges(ranges.iter().cloned(), 10); + let expected = RowSelection::from(vec![ + RowSelector::skip(1), + RowSelector::select(2), + RowSelector::skip(2), + RowSelector::select(3), + ]); + assert_eq!(selection, expected); + assert_eq!(skipped, 5); + } + + #[test] + fn test_empty_range() { + let ranges = vec![]; + let (selection, skipped) = row_selection_from_row_ranges(ranges.iter().cloned(), 10); + let expected = RowSelection::from(vec![]); + assert_eq!(selection, expected); + assert_eq!(skipped, 10); + } + + #[test] + fn test_adjacent_ranges() { + let ranges = vec![1..2, 2..3]; + let (selection, skipped) = row_selection_from_row_ranges(ranges.iter().cloned(), 10); + let expected = RowSelection::from(vec![RowSelector::skip(1), RowSelector::select(2)]); + assert_eq!(selection, expected); + assert_eq!(skipped, 8); + } + + #[test] + fn test_large_gap_between_ranges() { + let ranges = vec![1..2, 100..101]; + let (selection, skipped) = row_selection_from_row_ranges(ranges.iter().cloned(), 10240); + let expected = RowSelection::from(vec![ + RowSelector::skip(1), + RowSelector::select(1), + RowSelector::skip(98), + RowSelector::select(1), + ]); + assert_eq!(selection, expected); + assert_eq!(skipped, 10238); + } +}