From 36d9346ffcc1cd81309235ece74df6a843ab0ab8 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Mon, 12 May 2025 15:15:17 +0800 Subject: [PATCH] refactor: introduce row group selection (#6075) Signed-off-by: Zhenchi --- src/mito2/src/read/range.rs | 27 +- src/mito2/src/read/scan_region.rs | 4 +- src/mito2/src/sst/parquet.rs | 2 +- src/mito2/src/sst/parquet/reader.rs | 603 ++-------------- src/mito2/src/sst/parquet/row_selection.rs | 804 ++++++++++++++++++++- 5 files changed, 880 insertions(+), 560 deletions(-) diff --git a/src/mito2/src/read/range.rs b/src/mito2/src/read/range.rs index 4d01016d64..1b480f19a4 100644 --- a/src/mito2/src/read/range.rs +++ b/src/mito2/src/read/range.rs @@ -14,11 +14,9 @@ //! Structs for partition ranges. -use std::collections::BTreeMap; use std::sync::{Arc, Mutex}; use common_time::Timestamp; -use parquet::arrow::arrow_reader::RowSelection; use smallvec::{smallvec, SmallVec}; use store_api::region_engine::PartitionRange; use store_api::storage::TimeSeriesDistribution; @@ -31,6 +29,7 @@ use crate::sst::file::{overlaps, FileHandle, FileTimeRange}; use crate::sst::parquet::file_range::{FileRange, FileRangeContextRef}; use crate::sst::parquet::format::parquet_row_group_time_range; use crate::sst::parquet::reader::ReaderMetrics; +use crate::sst::parquet::row_selection::RowGroupSelection; use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE; const ALL_ROW_GROUPS: i64 = -1; @@ -371,20 +370,16 @@ pub(crate) struct FileRangeBuilder { /// Context for the file. /// None indicates nothing to read. context: Option, - /// Row selections for each row group to read. - /// It skips the row group if it is not in the map. - row_groups: BTreeMap>, + /// Row group selection for the file to read. + selection: RowGroupSelection, } impl FileRangeBuilder { /// Builds a file range builder from context and row groups. 
- pub(crate) fn new( - context: FileRangeContextRef, - row_groups: BTreeMap>, - ) -> Self { + pub(crate) fn new(context: FileRangeContextRef, selection: RowGroupSelection) -> Self { Self { context: Some(context), - row_groups, + selection, } } @@ -397,21 +392,25 @@ impl FileRangeBuilder { if row_group_index >= 0 { let row_group_index = row_group_index as usize; // Scans one row group. - let Some(row_selection) = self.row_groups.get(&row_group_index) else { + let Some(row_selection) = self.selection.get(row_group_index) else { return; }; ranges.push(FileRange::new( context, row_group_index, - row_selection.clone(), + Some(row_selection.clone()), )); } else { // Scans all row groups. ranges.extend( - self.row_groups + self.selection .iter() .map(|(row_group_index, row_selection)| { - FileRange::new(context.clone(), *row_group_index, row_selection.clone()) + FileRange::new( + context.clone(), + *row_group_index, + Some(row_selection.clone()), + ) }), ); } diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index a35dd00e1b..80b6cb275b 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -787,7 +787,7 @@ impl ScanInput { .expected_metadata(Some(self.mapper.metadata().clone())) .build_reader_input(reader_metrics) .await; - let (mut file_range_ctx, row_groups) = match res { + let (mut file_range_ctx, selection) = match res { Ok(x) => x, Err(e) => { if e.is_object_not_found() && self.ignore_file_not_found { @@ -810,7 +810,7 @@ impl ScanInput { )?; file_range_ctx.set_compat_batch(Some(compat)); } - Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), row_groups)) + Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection)) } /// Scans the input source in another task and sends batches to the sender. 
diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index fd06f92fc6..43c543b1b1 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -30,7 +30,7 @@ pub(crate) mod metadata; pub(crate) mod page_reader; pub mod reader; pub mod row_group; -mod row_selection; +pub mod row_selection; pub(crate) mod stats; pub mod writer; diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index ffa5c5d003..bf5c99d7b2 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -14,8 +14,7 @@ //! Parquet reader. -use std::collections::{BTreeMap, BTreeSet, VecDeque}; -use std::ops::Range; +use std::collections::VecDeque; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -27,7 +26,6 @@ use datafusion_expr::Expr; use datatypes::arrow::error::ArrowError; use datatypes::arrow::record_batch::RecordBatch; use datatypes::data_type::ConcreteDataType; -use itertools::Itertools; use object_store::ObjectStore; use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection}; use parquet::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask}; @@ -58,9 +56,7 @@ use crate::sst::parquet::file_range::{FileRangeContext, FileRangeContextRef}; use crate::sst::parquet::format::ReadFormat; use crate::sst::parquet::metadata::MetadataLoader; use crate::sst::parquet::row_group::InMemoryRowGroup; -use crate::sst::parquet::row_selection::{ - intersect_row_selections, row_selection_from_row_ranges, row_selection_from_sorted_row_ids, -}; +use crate::sst::parquet::row_selection::RowGroupSelection; use crate::sst::parquet::stats::RowGroupPruningStats; use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY}; @@ -176,8 +172,8 @@ impl ParquetReaderBuilder { pub async fn build(&self) -> Result { let mut metrics = ReaderMetrics::default(); - let (context, row_groups) = self.build_reader_input(&mut metrics).await?; - ParquetReader::new(Arc::new(context), 
row_groups).await + let (context, selection) = self.build_reader_input(&mut metrics).await?; + ParquetReader::new(Arc::new(context), selection).await } /// Builds a [FileRangeContext] and collects row groups to read. @@ -186,7 +182,7 @@ impl ParquetReaderBuilder { pub(crate) async fn build_reader_input( &self, metrics: &mut ReaderMetrics, - ) -> Result<(FileRangeContext, RowGroupMap)> { + ) -> Result<(FileRangeContext, RowGroupSelection)> { let start = Instant::now(); let file_path = self.file_handle.file_path(&self.file_dir); @@ -224,7 +220,7 @@ impl ParquetReaderBuilder { let field_levels = parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint) .context(ReadDataPartSnafu)?; - let row_groups = self + let selection = self .row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics) .await; @@ -260,7 +256,7 @@ impl ParquetReaderBuilder { metrics.build_cost += start.elapsed(); - Ok((context, row_groups)) + Ok((context, selection)) } /// Decodes region metadata from key value. @@ -331,24 +327,24 @@ impl ParquetReaderBuilder { read_format: &ReadFormat, parquet_meta: &ParquetMetaData, metrics: &mut ReaderFilterMetrics, - ) -> BTreeMap> { + ) -> RowGroupSelection { let num_row_groups = parquet_meta.num_row_groups(); let num_rows = parquet_meta.file_metadata().num_rows(); if num_row_groups == 0 || num_rows == 0 { - return BTreeMap::default(); + return RowGroupSelection::default(); } // Let's assume that the number of rows in the first row group // can represent the `row_group_size` of the Parquet file. 
let row_group_size = parquet_meta.row_group(0).num_rows() as usize; if row_group_size == 0 { - return BTreeMap::default(); + return RowGroupSelection::default(); } metrics.rg_total += num_row_groups; metrics.rows_total += num_rows as usize; - let mut output = (0..num_row_groups).map(|i| (i, None)).collect(); + let mut output = RowGroupSelection::new(row_group_size, num_rows as _); self.prune_row_groups_by_fulltext_index(row_group_size, parquet_meta, &mut output, metrics) .await; @@ -357,7 +353,7 @@ impl ParquetReaderBuilder { } let inverted_filtered = self - .prune_row_groups_by_inverted_index(row_group_size, parquet_meta, &mut output, metrics) + .prune_row_groups_by_inverted_index(row_group_size, &mut output, metrics) .await; if output.is_empty() { return output; @@ -365,14 +361,19 @@ impl ParquetReaderBuilder { if !inverted_filtered { self.prune_row_groups_by_minmax(read_format, parquet_meta, &mut output, metrics); + if output.is_empty() { + return output; + } } - self.prune_row_groups_by_bloom_filter(parquet_meta, &mut output, metrics) + self.prune_row_groups_by_bloom_filter(row_group_size, parquet_meta, &mut output, metrics) .await; + if output.is_empty() { + return output; + } - self.prune_row_groups_by_fulltext_bloom(parquet_meta, &mut output, metrics) + self.prune_row_groups_by_fulltext_bloom(row_group_size, parquet_meta, &mut output, metrics) .await; - output } @@ -381,7 +382,7 @@ impl ParquetReaderBuilder { &self, row_group_size: usize, parquet_meta: &ParquetMetaData, - output: &mut BTreeMap>, + output: &mut RowGroupSelection, metrics: &mut ReaderFilterMetrics, ) -> bool { let Some(index_applier) = &self.fulltext_index_applier else { @@ -419,46 +420,21 @@ impl ParquetReaderBuilder { } }; - let row_group_to_row_ids = - Self::group_row_ids(apply_res, row_group_size, parquet_meta.num_row_groups()); - Self::prune_row_groups_by_rows( - parquet_meta, - row_group_to_row_ids, - output, - &mut metrics.rg_fulltext_filtered, - &mut metrics.rows_fulltext_filtered, + 
let selection = RowGroupSelection::from_row_ids( + apply_res, + row_group_size, + parquet_meta.num_row_groups(), ); + let intersection = output.intersect(&selection); + + metrics.rg_fulltext_filtered += output.row_group_count() - intersection.row_group_count(); + metrics.rows_fulltext_filtered += output.row_count() - intersection.row_count(); + + *output = intersection; true } - /// Groups row IDs into row groups, with each group's row IDs starting from 0. - fn group_row_ids( - row_ids: BTreeSet, - row_group_size: usize, - num_row_groups: usize, - ) -> Vec<(usize, Vec)> { - let est_rows_per_group = row_ids.len() / num_row_groups; - - let mut row_group_to_row_ids: Vec<(usize, Vec)> = Vec::with_capacity(num_row_groups); - for row_id in row_ids { - let row_group_id = row_id as usize / row_group_size; - let row_id_in_group = row_id as usize % row_group_size; - - if let Some((rg_id, row_ids)) = row_group_to_row_ids.last_mut() - && *rg_id == row_group_id - { - row_ids.push(row_id_in_group); - } else { - let mut row_ids = Vec::with_capacity(est_rows_per_group); - row_ids.push(row_id_in_group); - row_group_to_row_ids.push((row_group_id, row_ids)); - } - } - - row_group_to_row_ids - } - /// Applies index to prune row groups. 
/// /// TODO(zhongzc): Devise a mechanism to enforce the non-use of indices @@ -467,8 +443,7 @@ impl ParquetReaderBuilder { async fn prune_row_groups_by_inverted_index( &self, row_group_size: usize, - parquet_meta: &ParquetMetaData, - output: &mut BTreeMap>, + output: &mut RowGroupSelection, metrics: &mut ReaderFilterMetrics, ) -> bool { let Some(index_applier) = &self.inverted_index_applier else { @@ -503,32 +478,14 @@ impl ParquetReaderBuilder { } }; - let segment_row_count = apply_output.segment_row_count; - let grouped_in_row_groups = apply_output - .matched_segment_ids - .iter_ones() - .map(|seg_id| { - let begin_row_id = seg_id * segment_row_count; - let row_group_id = begin_row_id / row_group_size; + let selection = + RowGroupSelection::from_inverted_index_apply_output(row_group_size, apply_output); + let intersection = output.intersect(&selection); - let rg_begin_row_id = begin_row_id % row_group_size; - let rg_end_row_id = rg_begin_row_id + segment_row_count; + metrics.rg_inverted_filtered += output.row_group_count() - intersection.row_group_count(); + metrics.rows_inverted_filtered += output.row_count() - intersection.row_count(); - (row_group_id, rg_begin_row_id..rg_end_row_id) - }) - .chunk_by(|(row_group_id, _)| *row_group_id); - - let ranges_in_row_groups = grouped_in_row_groups - .into_iter() - .map(|(row_group_id, group)| (row_group_id, group.map(|(_, ranges)| ranges))); - - Self::prune_row_groups_by_ranges( - parquet_meta, - ranges_in_row_groups, - output, - &mut metrics.rg_inverted_filtered, - &mut metrics.rows_inverted_filtered, - ); + *output = intersection; true } @@ -538,14 +495,14 @@ impl ParquetReaderBuilder { &self, read_format: &ReadFormat, parquet_meta: &ParquetMetaData, - output: &mut BTreeMap>, + output: &mut RowGroupSelection, metrics: &mut ReaderFilterMetrics, ) -> bool { let Some(predicate) = &self.predicate else { return false; }; - let row_groups_before = output.len(); + let row_groups_before = output.row_group_count(); let 
region_meta = read_format.metadata(); let row_groups = parquet_meta.row_groups(); @@ -560,31 +517,27 @@ impl ParquetReaderBuilder { // Here we use the schema of the SST to build the physical expression. If the column // in the SST doesn't have the same column id as the column in the expected metadata, // we will get a None statistics for that column. - let res = predicate + predicate .prune_with_stats(&stats, prune_schema) .iter() .zip(0..parquet_meta.num_row_groups()) - .filter_map(|(mask, row_group)| { + .for_each(|(mask, row_group)| { if !*mask { - return None; + output.remove_row_group(row_group); } + }); - let selection = output.remove(&row_group)?; - Some((row_group, selection)) - }) - .collect::>(); - - let row_groups_after = res.len(); + let row_groups_after = output.row_group_count(); metrics.rg_minmax_filtered += row_groups_before - row_groups_after; - *output = res; true } async fn prune_row_groups_by_bloom_filter( &self, + row_group_size: usize, parquet_meta: &ParquetMetaData, - output: &mut BTreeMap>, + output: &mut RowGroupSelection, metrics: &mut ReaderFilterMetrics, ) -> bool { let Some(index_applier) = &self.bloom_filter_index_applier else { @@ -604,7 +557,7 @@ impl ParquetReaderBuilder { .row_groups() .iter() .enumerate() - .map(|(i, rg)| (rg.num_rows() as usize, output.contains_key(&i))), + .map(|(i, rg)| (rg.num_rows() as usize, output.contains_row_group(i))), ) .await { @@ -628,23 +581,22 @@ impl ParquetReaderBuilder { } }; - Self::prune_row_groups_by_ranges( - parquet_meta, - apply_output - .into_iter() - .map(|(rg, ranges)| (rg, ranges.into_iter())), - output, - &mut metrics.rg_bloom_filtered, - &mut metrics.rows_bloom_filtered, - ); + let selection = RowGroupSelection::from_row_ranges(apply_output, row_group_size); + let intersection = output.intersect(&selection); + + metrics.rg_bloom_filtered += output.row_group_count() - intersection.row_group_count(); + metrics.rows_bloom_filtered += output.row_count() - intersection.row_count(); + + 
*output = intersection; true } async fn prune_row_groups_by_fulltext_bloom( &self, + row_group_size: usize, parquet_meta: &ParquetMetaData, - output: &mut BTreeMap>, + output: &mut RowGroupSelection, metrics: &mut ReaderFilterMetrics, ) -> bool { let Some(index_applier) = &self.fulltext_index_applier else { @@ -664,7 +616,7 @@ impl ParquetReaderBuilder { .row_groups() .iter() .enumerate() - .map(|(i, rg)| (rg.num_rows() as usize, output.contains_key(&i))), + .map(|(i, rg)| (rg.num_rows() as usize, output.contains_row_group(i))), ) .await { @@ -689,123 +641,16 @@ impl ParquetReaderBuilder { } }; - Self::prune_row_groups_by_ranges( - parquet_meta, - apply_output - .into_iter() - .map(|(rg, ranges)| (rg, ranges.into_iter())), - output, - &mut metrics.rg_fulltext_filtered, - &mut metrics.rows_fulltext_filtered, - ); + let selection = RowGroupSelection::from_row_ranges(apply_output, row_group_size); + let intersection = output.intersect(&selection); + + metrics.rg_fulltext_filtered += output.row_group_count() - intersection.row_group_count(); + metrics.rows_fulltext_filtered += output.row_count() - intersection.row_count(); + + *output = intersection; true } - - /// Prunes row groups by rows. The `rows_in_row_groups` is like a map from row group to - /// a list of row ids to keep. 
- fn prune_row_groups_by_rows( - parquet_meta: &ParquetMetaData, - rows_in_row_groups: Vec<(usize, Vec)>, - output: &mut BTreeMap>, - filtered_row_groups: &mut usize, - filtered_rows: &mut usize, - ) { - let row_groups_before = output.len(); - let mut rows_in_row_group_before = 0; - let mut rows_in_row_group_after = 0; - - let mut res = BTreeMap::new(); - for (row_group, row_ids) in rows_in_row_groups { - let Some(selection) = output.remove(&row_group) else { - continue; - }; - - let total_row_count = parquet_meta.row_group(row_group).num_rows() as usize; - rows_in_row_group_before += selection - .as_ref() - .map_or(total_row_count, |s| s.row_count()); - - let new_selection = - row_selection_from_sorted_row_ids(row_ids.into_iter(), total_row_count); - let intersected_selection = intersect_row_selections(selection, Some(new_selection)); - - let num_rows_after = intersected_selection - .as_ref() - .map_or(total_row_count, |s| s.row_count()); - rows_in_row_group_after += num_rows_after; - - if num_rows_after > 0 { - res.insert(row_group, intersected_selection); - } - } - - // Pruned row groups. - while let Some((row_group, selection)) = output.pop_first() { - let total_row_count = parquet_meta.row_group(row_group).num_rows() as usize; - rows_in_row_group_before += selection - .as_ref() - .map_or(total_row_count, |s| s.row_count()); - } - - let row_groups_after = res.len(); - *filtered_row_groups += row_groups_before - row_groups_after; - *filtered_rows += rows_in_row_group_before - rows_in_row_group_after; - - *output = res; - } - - /// Prunes row groups by ranges. The `ranges_in_row_groups` is like a map from row group to - /// a list of row ranges to keep. 
- fn prune_row_groups_by_ranges( - parquet_meta: &ParquetMetaData, - ranges_in_row_groups: impl Iterator>)>, - output: &mut BTreeMap>, - filtered_row_groups: &mut usize, - filtered_rows: &mut usize, - ) { - let row_groups_before = output.len(); - let mut rows_in_row_group_before = 0; - let mut rows_in_row_group_after = 0; - - let mut res = BTreeMap::new(); - for (row_group, row_ranges) in ranges_in_row_groups { - let Some(selection) = output.remove(&row_group) else { - continue; - }; - - let total_row_count = parquet_meta.row_group(row_group).num_rows() as usize; - rows_in_row_group_before += selection - .as_ref() - .map_or(total_row_count, |s| s.row_count()); - - let new_selection = row_selection_from_row_ranges(row_ranges, total_row_count); - let intersected_selection = intersect_row_selections(selection, Some(new_selection)); - - let num_rows_after = intersected_selection - .as_ref() - .map_or(total_row_count, |s| s.row_count()); - rows_in_row_group_after += num_rows_after; - - if num_rows_after > 0 { - res.insert(row_group, intersected_selection); - } - } - - // Pruned row groups. - while let Some((row_group, selection)) = output.pop_first() { - let total_row_count = parquet_meta.row_group(row_group).num_rows() as usize; - rows_in_row_group_before += selection - .as_ref() - .map_or(total_row_count, |s| s.row_count()); - } - - let row_groups_after = res.len(); - *filtered_row_groups += row_groups_before - row_groups_after; - *filtered_rows += rows_in_row_group_before - rows_in_row_group_after; - - *output = res; - } } /// Metrics of filtering rows groups and rows. @@ -1119,14 +964,12 @@ fn pruned_by_default(filter: &SimpleFilterEvaluator, column: &ColumnMetadata) -> Some(!matches) } -type RowGroupMap = BTreeMap>; - /// Parquet batch reader to read our SST format. pub struct ParquetReader { /// File range context. context: FileRangeContextRef, - /// Indices of row groups to read, along with their respective row selections. 
- row_groups: RowGroupMap, + /// Row group selection to read. + selection: RowGroupSelection, /// Reader of current row group. reader_state: ReaderState, } @@ -1144,11 +987,11 @@ impl BatchReader for ParquetReader { } // No more items in current row group, reads next row group. - while let Some((row_group_idx, row_selection)) = self.row_groups.pop_first() { + while let Some((row_group_idx, row_selection)) = self.selection.pop_first() { let parquet_reader = self .context .reader_builder() - .build(row_group_idx, row_selection) + .build(row_group_idx, Some(row_selection)) .await?; // Resets the parquet reader. @@ -1198,15 +1041,12 @@ impl Drop for ParquetReader { impl ParquetReader { /// Creates a new reader. - async fn new( - context: FileRangeContextRef, - mut row_groups: BTreeMap>, - ) -> Result { + async fn new(context: FileRangeContextRef, mut selection: RowGroupSelection) -> Result { // No more items in current row group, reads next row group. - let reader_state = if let Some((row_group_idx, row_selection)) = row_groups.pop_first() { + let reader_state = if let Some((row_group_idx, row_selection)) = selection.pop_first() { let parquet_reader = context .reader_builder() - .build(row_group_idx, row_selection) + .build(row_group_idx, Some(row_selection)) .await?; ReaderState::Readable(PruneReader::new_with_row_group_reader( context.clone(), @@ -1218,7 +1058,7 @@ impl ParquetReader { Ok(ParquetReader { context, - row_groups, + selection, reader_state, }) } @@ -1354,300 +1194,3 @@ where self.next_inner() } } - -#[cfg(test)] -mod tests { - use parquet::arrow::arrow_reader::RowSelector; - use parquet::file::metadata::{FileMetaData, RowGroupMetaData}; - use parquet::schema::types::{SchemaDescriptor, Type}; - - use super::*; - - fn mock_parquet_metadata_from_row_groups(num_rows_in_row_groups: Vec) -> ParquetMetaData { - let tp = Arc::new(Type::group_type_builder("test").build().unwrap()); - let schema_descr = Arc::new(SchemaDescriptor::new(tp)); - - let mut row_groups = 
Vec::new(); - for num_rows in &num_rows_in_row_groups { - let row_group = RowGroupMetaData::builder(schema_descr.clone()) - .set_num_rows(*num_rows) - .build() - .unwrap(); - row_groups.push(row_group); - } - - let file_meta = FileMetaData::new( - 0, - num_rows_in_row_groups.iter().sum(), - None, - None, - schema_descr, - None, - ); - ParquetMetaData::new(file_meta, row_groups) - } - - #[test] - fn test_group_row_ids() { - let row_ids = [0, 1, 2, 5, 6, 7, 8, 12].into_iter().collect(); - let row_group_size = 5; - let num_row_groups = 3; - - let row_group_to_row_ids = - ParquetReaderBuilder::group_row_ids(row_ids, row_group_size, num_row_groups); - - assert_eq!( - row_group_to_row_ids, - vec![(0, vec![0, 1, 2]), (1, vec![0, 1, 2, 3]), (2, vec![2])] - ); - } - - #[test] - fn prune_row_groups_by_rows_from_empty() { - let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]); - - let rows_in_row_groups = vec![(0, vec![5, 6, 7, 8, 9]), (2, vec![0, 1, 2, 3, 4])]; - - // The original output is empty. No row groups are pruned. - let mut output = BTreeMap::new(); - let mut filtered_row_groups = 0; - let mut filtered_rows = 0; - - ParquetReaderBuilder::prune_row_groups_by_rows( - &parquet_meta, - rows_in_row_groups, - &mut output, - &mut filtered_row_groups, - &mut filtered_rows, - ); - - assert!(output.is_empty()); - assert_eq!(filtered_row_groups, 0); - assert_eq!(filtered_rows, 0); - } - - #[test] - fn prune_row_groups_by_rows_from_full() { - let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]); - - let rows_in_row_groups = vec![(0, vec![5, 6, 7, 8, 9]), (2, vec![0, 1, 2, 3, 4])]; - - // The original output is full. 
- let mut output = BTreeMap::from([(0, None), (1, None), (2, None)]); - let mut filtered_row_groups = 0; - let mut filtered_rows = 0; - - ParquetReaderBuilder::prune_row_groups_by_rows( - &parquet_meta, - rows_in_row_groups, - &mut output, - &mut filtered_row_groups, - &mut filtered_rows, - ); - - assert_eq!( - output, - BTreeMap::from([ - ( - 0, - Some(RowSelection::from(vec![ - RowSelector::skip(5), - RowSelector::select(5), - ])) - ), - (2, Some(RowSelection::from(vec![RowSelector::select(5)]))), - ]) - ); - assert_eq!(filtered_row_groups, 1); - assert_eq!(filtered_rows, 15); - } - - #[test] - fn prune_row_groups_by_rows_from_not_full() { - let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]); - - let rows_in_row_groups = vec![(0, vec![5, 6, 7, 8, 9]), (2, vec![0, 1, 2, 3, 4])]; - - // The original output is not full. - let mut output = BTreeMap::from([ - ( - 0, - Some(RowSelection::from(vec![ - RowSelector::select(5), - RowSelector::skip(5), - ])), - ), - ( - 1, - Some(RowSelection::from(vec![ - RowSelector::select(5), - RowSelector::skip(5), - ])), - ), - (2, Some(RowSelection::from(vec![RowSelector::select(5)]))), - ]); - let mut filtered_row_groups = 0; - let mut filtered_rows = 0; - - ParquetReaderBuilder::prune_row_groups_by_rows( - &parquet_meta, - rows_in_row_groups, - &mut output, - &mut filtered_row_groups, - &mut filtered_rows, - ); - - assert_eq!( - output, - BTreeMap::from([(2, Some(RowSelection::from(vec![RowSelector::select(5)])))]) - ); - assert_eq!(filtered_row_groups, 2); - assert_eq!(filtered_rows, 10); - } - - #[test] - fn prune_row_groups_by_ranges_from_empty() { - let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]); - - let ranges_in_row_groups = [(0, Some(5..10).into_iter()), (2, Some(0..5).into_iter())]; - - // The original output is empty. No row groups are pruned. 
- let mut output = BTreeMap::new(); - let mut filtered_row_groups = 0; - let mut filtered_rows = 0; - - ParquetReaderBuilder::prune_row_groups_by_ranges( - &parquet_meta, - ranges_in_row_groups.into_iter(), - &mut output, - &mut filtered_row_groups, - &mut filtered_rows, - ); - - assert!(output.is_empty()); - assert_eq!(filtered_row_groups, 0); - assert_eq!(filtered_rows, 0); - } - - #[test] - fn prune_row_groups_by_ranges_from_full() { - let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]); - - let ranges_in_row_groups = [(0, Some(5..10).into_iter()), (2, Some(0..5).into_iter())]; - - // The original output is full. - let mut output = BTreeMap::from([(0, None), (1, None), (2, None)]); - let mut filtered_row_groups = 0; - let mut filtered_rows = 0; - - ParquetReaderBuilder::prune_row_groups_by_ranges( - &parquet_meta, - ranges_in_row_groups.into_iter(), - &mut output, - &mut filtered_row_groups, - &mut filtered_rows, - ); - - assert_eq!( - output, - BTreeMap::from([ - ( - 0, - Some(RowSelection::from(vec![ - RowSelector::skip(5), - RowSelector::select(5), - ])) - ), - (2, Some(RowSelection::from(vec![RowSelector::select(5)]))), - ]) - ); - assert_eq!(filtered_row_groups, 1); - assert_eq!(filtered_rows, 15); - } - - #[test] - fn prune_row_groups_by_ranges_from_not_full() { - let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]); - - let ranges_in_row_groups = [(0, Some(5..10).into_iter()), (2, Some(0..5).into_iter())]; - - // The original output is not full. 
- let mut output = BTreeMap::from([ - ( - 0, - Some(RowSelection::from(vec![ - RowSelector::select(5), - RowSelector::skip(5), - ])), - ), - ( - 1, - Some(RowSelection::from(vec![ - RowSelector::select(5), - RowSelector::skip(5), - ])), - ), - (2, Some(RowSelection::from(vec![RowSelector::select(5)]))), - ]); - let mut filtered_row_groups = 0; - let mut filtered_rows = 0; - - ParquetReaderBuilder::prune_row_groups_by_ranges( - &parquet_meta, - ranges_in_row_groups.into_iter(), - &mut output, - &mut filtered_row_groups, - &mut filtered_rows, - ); - - assert_eq!( - output, - BTreeMap::from([(2, Some(RowSelection::from(vec![RowSelector::select(5)])))]) - ); - assert_eq!(filtered_row_groups, 2); - assert_eq!(filtered_rows, 10); - } - - #[test] - fn prune_row_groups_by_ranges_exceed_end() { - let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]); - - // The range exceeds the end of the row group. - let ranges_in_row_groups = [(0, Some(5..10).into_iter()), (2, Some(0..10).into_iter())]; - - let mut output = BTreeMap::from([ - ( - 0, - Some(RowSelection::from(vec![ - RowSelector::select(5), - RowSelector::skip(5), - ])), - ), - ( - 1, - Some(RowSelection::from(vec![ - RowSelector::select(5), - RowSelector::skip(5), - ])), - ), - (2, Some(RowSelection::from(vec![RowSelector::select(5)]))), - ]); - let mut filtered_row_groups = 0; - let mut filtered_rows = 0; - - ParquetReaderBuilder::prune_row_groups_by_ranges( - &parquet_meta, - ranges_in_row_groups.into_iter(), - &mut output, - &mut filtered_row_groups, - &mut filtered_rows, - ); - - assert_eq!( - output, - BTreeMap::from([(2, Some(RowSelection::from(vec![RowSelector::select(5)])))]) - ); - assert_eq!(filtered_row_groups, 2); - assert_eq!(filtered_rows, 10); - } -} diff --git a/src/mito2/src/sst/parquet/row_selection.rs b/src/mito2/src/sst/parquet/row_selection.rs index 6b83242b73..964f148cb1 100644 --- a/src/mito2/src/sst/parquet/row_selection.rs +++ b/src/mito2/src/sst/parquet/row_selection.rs @@ 
-12,10 +12,333 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::{BTreeMap, BTreeSet};
 use std::ops::Range;
 
+use index::inverted_index::search::index_apply::ApplyOutput;
+use itertools::Itertools;
 use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
 
+/// A selection of row groups.
+#[derive(Debug, Clone, Default)]
+pub struct RowGroupSelection {
+    /// Row group id to row selection.
+    selection_in_rg: BTreeMap<usize, RowSelectionWithCount>,
+    /// Total number of rows in the selection.
+    row_count: usize,
+}
+
+/// A row selection with its count.
+#[derive(Debug, Clone)]
+struct RowSelectionWithCount {
+    /// Row selection.
+    selection: RowSelection,
+    /// Number of rows in the selection.
+    row_count: usize,
+}
+
+impl RowGroupSelection {
+    /// Creates a new `RowGroupSelection` with all row groups selected.
+    ///
+    /// # Arguments
+    /// * `row_group_size` - The number of rows in each row group (except possibly the last one)
+    /// * `total_row_count` - Total number of rows
+    pub fn new(row_group_size: usize, total_row_count: usize) -> Self {
+        let mut selection_in_rg = BTreeMap::new();
+
+        let row_group_count = total_row_count.div_ceil(row_group_size);
+        for rg_id in 0..row_group_count {
+            // The last row group may have fewer rows than `row_group_size`
+            let row_group_size = if rg_id == row_group_count - 1 {
+                total_row_count - (row_group_count - 1) * row_group_size
+            } else {
+                row_group_size
+            };
+
+            let selection = RowSelection::from(vec![RowSelector::select(row_group_size)]);
+            selection_in_rg.insert(
+                rg_id,
+                RowSelectionWithCount {
+                    selection,
+                    row_count: row_group_size,
+                },
+            );
+        }
+
+        Self {
+            selection_in_rg,
+            row_count: total_row_count,
+        }
+    }
+
+    /// Returns the row selection for a given row group.
+    ///
+    /// `None` indicates not selected.
+    pub fn get(&self, rg_id: usize) -> Option<&RowSelection> {
+        self.selection_in_rg.get(&rg_id).map(|x| &x.selection)
+    }
+
+    /// Creates a new `RowGroupSelection` from the output of inverted index application.
+    ///
+    /// # Arguments
+    /// * `row_group_size` - The number of rows in each row group (except possibly the last one)
+    /// * `apply_output` - The output from applying the inverted index
+    ///
+    /// # Assumptions
+    /// * All row groups (except possibly the last one) have the same number of rows
+    /// * The last row group may have fewer rows than `row_group_size`
+    pub fn from_inverted_index_apply_output(
+        row_group_size: usize,
+        apply_output: ApplyOutput,
+    ) -> Self {
+        // Step 1: Convert segment IDs to row ranges within row groups
+        // For each segment ID, calculate its corresponding row range in the row group
+        let segment_row_count = apply_output.segment_row_count;
+        let row_group_ranges = apply_output.matched_segment_ids.iter_ones().map(|seg_id| {
+            // Calculate the global row ID where this segment starts
+            let begin_row_id = seg_id * segment_row_count;
+            // Determine which row group this segment belongs to
+            let row_group_id = begin_row_id / row_group_size;
+            // Calculate the offset within the row group
+            let rg_begin_row_id = begin_row_id % row_group_size;
+            // Clip the end to the row group size; segment rows past that boundary are dropped
+            let rg_end_row_id = (rg_begin_row_id + segment_row_count).min(row_group_size);
+
+            (row_group_id, rg_begin_row_id..rg_end_row_id)
+        });
+
+        // Step 2: Group ranges by row group ID and create row selections
+        let mut total_row_count = 0;
+        let selection_in_rg = row_group_ranges
+            .chunk_by(|(row_group_id, _)| *row_group_id)
+            .into_iter()
+            .map(|(row_group_id, group)| {
+                // Extract just the ranges from the group
+                let ranges = group.map(|(_, ranges)| ranges);
+                // Create row selection from the ranges
+                // Note: We use `row_group_size` here for every row group:
+                // 1. For non-last row groups, it's the actual size
+                // 2. For the last row group, it's only an upper bound: the min() above clips
+                //    ranges to `row_group_size`, not to the number of rows actually present
+                let selection = row_selection_from_row_ranges(ranges, row_group_size);
+                let row_count = selection.row_count();
+                total_row_count += row_count;
+                (
+                    row_group_id,
+                    RowSelectionWithCount {
+                        selection,
+                        row_count,
+                    },
+                )
+            })
+            .collect();
+
+        Self {
+            selection_in_rg,
+            row_count: total_row_count,
+        }
+    }
+
+    /// Creates a new `RowGroupSelection` from a set of row IDs.
+    ///
+    /// # Arguments
+    /// * `row_ids` - Set of row IDs to select
+    /// * `row_group_size` - The number of rows in each row group (except possibly the last one)
+    /// * `num_row_groups` - Total number of row groups
+    ///
+    /// # Assumptions
+    /// * All row groups (except possibly the last one) have the same number of rows
+    /// * The last row group may have fewer rows than `row_group_size`
+    /// * All row IDs must be within the range [0, num_row_groups * row_group_size)
+    pub fn from_row_ids(
+        row_ids: BTreeSet<u32>,
+        row_group_size: usize,
+        num_row_groups: usize,
+    ) -> Self {
+        // Step 1: Group row IDs by their row group
+        let row_group_to_row_ids =
+            Self::group_row_ids_by_row_group(row_ids, row_group_size, num_row_groups);
+
+        // Step 2: Create row selections for each row group
+        let mut total_row_count = 0;
+        let selection_in_rg = row_group_to_row_ids
+            .into_iter()
+            .map(|(row_group_id, row_ids)| {
+                let selection =
+                    row_selection_from_sorted_row_ids(row_ids.into_iter(), row_group_size);
+                let row_count = selection.row_count();
+                total_row_count += row_count;
+                (
+                    row_group_id,
+                    RowSelectionWithCount {
+                        selection,
+                        row_count,
+                    },
+                )
+            })
+            .collect();
+
+        Self {
+            selection_in_rg,
+            row_count: total_row_count,
+        }
+    }
+
+    /// Creates a new `RowGroupSelection` from a set of row ranges.
+    ///
+    /// # Arguments
+    /// * `row_ranges` - A vector of (row_group_id, row_ranges) pairs
+    /// * `row_group_size` - The number of rows in each row group (except possibly the last one)
+    ///
+    /// # Assumptions
+    /// * All row groups (except possibly the last one) have the same number of rows
+    /// * The last row group may have fewer rows than `row_group_size`
+    /// * All ranges in `row_ranges` must be within the bounds of their respective row groups
+    ///   (i.e., for row group i, all ranges must be within [0, row_group_size) or [0, remaining_rows) for the last row group)
+    /// * Ranges within the same row group must not overlap. Overlapping ranges will result in undefined behavior.
+    pub fn from_row_ranges(
+        row_ranges: Vec<(usize, Vec<Range<usize>>)>,
+        row_group_size: usize,
+    ) -> Self {
+        let mut total_row_count = 0;
+        let selection_in_rg = row_ranges
+            .into_iter()
+            .map(|(row_group_id, ranges)| {
+                let selection = row_selection_from_row_ranges(ranges.into_iter(), row_group_size);
+                let row_count = selection.row_count();
+                total_row_count += row_count;
+                (
+                    row_group_id,
+                    RowSelectionWithCount {
+                        selection,
+                        row_count,
+                    },
+                )
+            })
+            .collect();
+
+        Self {
+            selection_in_rg,
+            row_count: total_row_count,
+        }
+    }
+
+    /// Groups row IDs by their row group.
+    ///
+    /// # Arguments
+    /// * `row_ids` - Set of row IDs to group
+    /// * `row_group_size` - Size of each row group
+    /// * `num_row_groups` - Total number of row groups
+    ///
+    /// # Returns
+    /// A vector of (row_group_id, row_ids) pairs, where row_ids are the IDs within that row group.
+    fn group_row_ids_by_row_group(
+        row_ids: BTreeSet<u32>,
+        row_group_size: usize,
+        num_row_groups: usize,
+    ) -> Vec<(usize, Vec<usize>)> {
+        let est_rows_per_group = row_ids.len() / num_row_groups.max(1);
+        let mut row_group_to_row_ids: Vec<(usize, Vec<usize>)> = Vec::with_capacity(num_row_groups);
+
+        for row_id in row_ids {
+            let row_group_id = row_id as usize / row_group_size;
+            let row_id_in_group = row_id as usize % row_group_size;
+
+            if let Some((rg_id, row_ids)) = row_group_to_row_ids.last_mut()
+                && *rg_id == row_group_id
+            {
+                row_ids.push(row_id_in_group);
+            } else {
+                let mut row_ids = Vec::with_capacity(est_rows_per_group);
+                row_ids.push(row_id_in_group);
+                row_group_to_row_ids.push((row_group_id, row_ids));
+            }
+        }
+
+        row_group_to_row_ids
+    }
+
+    /// Intersects two `RowGroupSelection`s, dropping row groups whose intersection is empty.
+    pub fn intersect(&self, other: &Self) -> Self {
+        let mut res = BTreeMap::new();
+        let mut total_row_count = 0;
+
+        for (rg_id, x) in other.selection_in_rg.iter() {
+            let Some(y) = self.selection_in_rg.get(rg_id) else {
+                continue;
+            };
+            let selection = x.selection.intersection(&y.selection);
+            let row_count = selection.row_count();
+            if row_count > 0 {
+                total_row_count += row_count;
+                res.insert(
+                    *rg_id,
+                    RowSelectionWithCount {
+                        selection,
+                        row_count,
+                    },
+                );
+            }
+        }
+
+        Self {
+            selection_in_rg: res,
+            row_count: total_row_count,
+        }
+    }
+
+    /// Returns the number of row groups in the selection.
+    pub fn row_group_count(&self) -> usize {
+        self.selection_in_rg.len()
+    }
+
+    /// Returns the number of rows in the selection.
+    pub fn row_count(&self) -> usize {
+        self.row_count
+    }
+
+    /// Removes and returns the first row group in the selection.
+    pub fn pop_first(&mut self) -> Option<(usize, RowSelection)> {
+        let (
+            row_group_id,
+            RowSelectionWithCount {
+                selection,
+                row_count,
+            },
+        ) = self.selection_in_rg.pop_first()?;
+
+        self.row_count -= row_count;
+        Some((row_group_id, selection))
+    }
+
+    /// Removes a row group from the selection.
+    pub fn remove_row_group(&mut self, row_group_id: usize) {
+        let Some(RowSelectionWithCount { row_count, .. }) =
+            self.selection_in_rg.remove(&row_group_id)
+        else {
+            return;
+        };
+        self.row_count -= row_count;
+    }
+
+    /// Returns true if the selection is empty.
+    pub fn is_empty(&self) -> bool {
+        self.selection_in_rg.is_empty()
+    }
+
+    /// Returns true if the selection contains a row group with the given ID.
+    pub fn contains_row_group(&self, row_group_id: usize) -> bool {
+        self.selection_in_rg.contains_key(&row_group_id)
+    }
+
+    /// Returns an iterator over the row groups in the selection.
+    pub fn iter(&self) -> impl Iterator<Item = (&usize, &RowSelection)> {
+        self.selection_in_rg
+            .iter()
+            .map(|(row_group_id, x)| (row_group_id, &x.selection))
+    }
+}
+
 /// Converts an iterator of row ranges into a `RowSelection` by creating a sequence of `RowSelector`s.
 ///
 /// This function processes each range in the input and either creates a new selector or merges
@@ -78,19 +401,6 @@ pub(crate) fn row_selection_from_sorted_row_ids(
     RowSelection::from(selectors)
 }
 
-/// Intersects two `RowSelection`s.
-pub(crate) fn intersect_row_selections(
-    a: Option<RowSelection>,
-    b: Option<RowSelection>,
-) -> Option<RowSelection> {
-    match (a, b) {
-        (Some(a), Some(b)) => Some(a.intersection(&b)),
-        (Some(a), None) => Some(a),
-        (None, Some(b)) => Some(b),
-        (None, None) => None,
-    }
-}
-
 /// Helper function to either add a new `RowSelector` to `selectors` or merge it with the last one
 /// if they are of the same type (both skip or both select).
fn add_or_merge_selector(selectors: &mut Vec, count: usize, is_skip: bool) { @@ -111,6 +421,7 @@ fn add_or_merge_selector(selectors: &mut Vec, count: usize, is_skip } #[cfg(test)] +#[allow(clippy::single_range_in_vec_init)] mod tests { use super::*; @@ -210,4 +521,471 @@ mod tests { let expected = RowSelection::from(vec![RowSelector::skip(10)]); assert_eq!(selection, expected); } + + #[test] + fn test_group_row_ids() { + let row_ids = [0, 1, 2, 5, 6, 7, 8, 12].into_iter().collect(); + let row_group_size = 5; + let num_row_groups = 3; + + let row_group_to_row_ids = + RowGroupSelection::group_row_ids_by_row_group(row_ids, row_group_size, num_row_groups); + + assert_eq!( + row_group_to_row_ids, + vec![(0, vec![0, 1, 2]), (1, vec![0, 1, 2, 3]), (2, vec![2])] + ); + } + + #[test] + fn test_row_group_selection_new() { + // Test with regular case + let selection = RowGroupSelection::new(100, 250); + assert_eq!(selection.row_count(), 250); + assert_eq!(selection.row_group_count(), 3); + + // Check content of each row group + let row_selection = selection.get(0).unwrap(); + assert_eq!(row_selection.row_count(), 100); + + let row_selection = selection.get(1).unwrap(); + assert_eq!(row_selection.row_count(), 100); + + let row_selection = selection.get(2).unwrap(); + assert_eq!(row_selection.row_count(), 50); + + // Test with empty selection + let selection = RowGroupSelection::new(100, 0); + assert_eq!(selection.row_count(), 0); + assert_eq!(selection.row_group_count(), 0); + assert!(selection.get(0).is_none()); + + // Test with single row group + let selection = RowGroupSelection::new(100, 50); + assert_eq!(selection.row_count(), 50); + assert_eq!(selection.row_group_count(), 1); + + let row_selection = selection.get(0).unwrap(); + assert_eq!(row_selection.row_count(), 50); + + // Test with row count that doesn't divide evenly + let selection = RowGroupSelection::new(100, 150); + assert_eq!(selection.row_count(), 150); + assert_eq!(selection.row_group_count(), 2); + + let 
row_selection = selection.get(0).unwrap(); + assert_eq!(row_selection.row_count(), 100); + + let row_selection = selection.get(1).unwrap(); + assert_eq!(row_selection.row_count(), 50); + + // Test with row count that's just over a multiple of row_group_size + let selection = RowGroupSelection::new(100, 101); + assert_eq!(selection.row_count(), 101); + assert_eq!(selection.row_group_count(), 2); + + let row_selection = selection.get(0).unwrap(); + assert_eq!(row_selection.row_count(), 100); + + let row_selection = selection.get(1).unwrap(); + assert_eq!(row_selection.row_count(), 1); + } + + #[test] + fn test_from_row_ids() { + let row_group_size = 100; + let num_row_groups = 3; + + // Test with regular case + let row_ids: BTreeSet = vec![5, 15, 25, 35, 105, 115, 205, 215] + .into_iter() + .collect(); + let selection = RowGroupSelection::from_row_ids(row_ids, row_group_size, num_row_groups); + assert_eq!(selection.row_count(), 8); + assert_eq!(selection.row_group_count(), 3); + + // Check content of each row group + let row_selection = selection.get(0).unwrap(); + assert_eq!(row_selection.row_count(), 4); // 5, 15, 25, 35 + + let row_selection = selection.get(1).unwrap(); + assert_eq!(row_selection.row_count(), 2); // 105, 115 + + let row_selection = selection.get(2).unwrap(); + assert_eq!(row_selection.row_count(), 2); // 205, 215 + + // Test with empty row IDs + let empty_row_ids: BTreeSet = BTreeSet::new(); + let selection = + RowGroupSelection::from_row_ids(empty_row_ids, row_group_size, num_row_groups); + assert_eq!(selection.row_count(), 0); + assert_eq!(selection.row_group_count(), 0); + assert!(selection.get(0).is_none()); + + // Test with consecutive row IDs + let consecutive_row_ids: BTreeSet = vec![5, 6, 7, 8, 9].into_iter().collect(); + let selection = + RowGroupSelection::from_row_ids(consecutive_row_ids, row_group_size, num_row_groups); + assert_eq!(selection.row_count(), 5); + assert_eq!(selection.row_group_count(), 1); + + let row_selection = 
selection.get(0).unwrap(); + assert_eq!(row_selection.row_count(), 5); // 5, 6, 7, 8, 9 + + // Test with row IDs at row group boundaries + let boundary_row_ids: BTreeSet = vec![0, 99, 100, 199, 200, 249].into_iter().collect(); + let selection = + RowGroupSelection::from_row_ids(boundary_row_ids, row_group_size, num_row_groups); + assert_eq!(selection.row_count(), 6); + assert_eq!(selection.row_group_count(), 3); + + let row_selection = selection.get(0).unwrap(); + assert_eq!(row_selection.row_count(), 2); // 0, 99 + + let row_selection = selection.get(1).unwrap(); + assert_eq!(row_selection.row_count(), 2); // 100, 199 + + let row_selection = selection.get(2).unwrap(); + assert_eq!(row_selection.row_count(), 2); // 200, 249 + + // Test with single row group + let single_group_row_ids: BTreeSet = vec![5, 10, 15].into_iter().collect(); + let selection = RowGroupSelection::from_row_ids(single_group_row_ids, row_group_size, 1); + assert_eq!(selection.row_count(), 3); + assert_eq!(selection.row_group_count(), 1); + + let row_selection = selection.get(0).unwrap(); + assert_eq!(row_selection.row_count(), 3); // 5, 10, 15 + } + + #[test] + fn test_from_row_ranges() { + let row_group_size = 100; + + // Test with regular case + let ranges = vec![ + (0, vec![5..15, 25..35]), // Within [0, 100) + (1, vec![5..15]), // Within [0, 100) + (2, vec![0..5, 10..15]), // Within [0, 50) for last row group + ]; + let selection = RowGroupSelection::from_row_ranges(ranges, row_group_size); + assert_eq!(selection.row_count(), 40); + assert_eq!(selection.row_group_count(), 3); + + // Check content of each row group + let row_selection = selection.get(0).unwrap(); + assert_eq!(row_selection.row_count(), 20); // 5..15 (10) + 25..35 (10) + + let row_selection = selection.get(1).unwrap(); + assert_eq!(row_selection.row_count(), 10); // 5..15 (10) + + let row_selection = selection.get(2).unwrap(); + assert_eq!(row_selection.row_count(), 10); // 0..5 (5) + 10..15 (5) + + // Test with empty ranges 
+ let empty_ranges: Vec<(usize, Vec>)> = vec![]; + let selection = RowGroupSelection::from_row_ranges(empty_ranges, row_group_size); + assert_eq!(selection.row_count(), 0); + assert_eq!(selection.row_group_count(), 0); + assert!(selection.get(0).is_none()); + + // Test with adjacent ranges within same row group + let adjacent_ranges = vec![ + (0, vec![5..15, 15..25]), // Adjacent ranges within [0, 100) + ]; + let selection = RowGroupSelection::from_row_ranges(adjacent_ranges, row_group_size); + assert_eq!(selection.row_count(), 20); + assert_eq!(selection.row_group_count(), 1); + + let row_selection = selection.get(0).unwrap(); + assert_eq!(row_selection.row_count(), 20); // 5..15 (10) + 15..25 (10) + + // Test with ranges at row group boundaries + let boundary_ranges = vec![ + (0, vec![0..10, 90..100]), // Ranges at start and end of first row group + (1, vec![0..100]), // Full range of second row group + (2, vec![0..50]), // Full range of last row group + ]; + let selection = RowGroupSelection::from_row_ranges(boundary_ranges, row_group_size); + assert_eq!(selection.row_count(), 170); + assert_eq!(selection.row_group_count(), 3); + + let row_selection = selection.get(0).unwrap(); + assert_eq!(row_selection.row_count(), 20); // 0..10 (10) + 90..100 (10) + + let row_selection = selection.get(1).unwrap(); + assert_eq!(row_selection.row_count(), 100); // 0..100 (100) + + let row_selection = selection.get(2).unwrap(); + assert_eq!(row_selection.row_count(), 50); // 0..50 (50) + + // Test with single row group + let single_group_ranges = vec![ + (0, vec![0..50]), // Half of first row group + ]; + let selection = RowGroupSelection::from_row_ranges(single_group_ranges, row_group_size); + assert_eq!(selection.row_count(), 50); + assert_eq!(selection.row_group_count(), 1); + + let row_selection = selection.get(0).unwrap(); + assert_eq!(row_selection.row_count(), 50); // 0..50 (50) + } + + #[test] + fn test_intersect() { + let row_group_size = 100; + + // Test case 1: 
Regular intersection with partial overlap + let ranges1 = vec![ + (0, vec![5..15, 25..35]), // Within [0, 100) + (1, vec![5..15]), // Within [0, 100) + ]; + let selection1 = RowGroupSelection::from_row_ranges(ranges1, row_group_size); + + let ranges2 = vec![ + (0, vec![10..20]), // Within [0, 100) + (1, vec![10..20]), // Within [0, 100) + (2, vec![0..10]), // Within [0, 50) for last row group + ]; + let selection2 = RowGroupSelection::from_row_ranges(ranges2, row_group_size); + + let intersection = selection1.intersect(&selection2); + assert_eq!(intersection.row_count(), 10); + assert_eq!(intersection.row_group_count(), 2); + + let row_selection = intersection.get(0).unwrap(); + assert_eq!(row_selection.row_count(), 5); // 10..15 (5) + + let row_selection = intersection.get(1).unwrap(); + assert_eq!(row_selection.row_count(), 5); // 10..15 (5) + + // Test case 2: Empty intersection with empty selection + let empty_ranges: Vec<(usize, Vec>)> = vec![]; + let empty_selection = RowGroupSelection::from_row_ranges(empty_ranges, row_group_size); + let intersection = selection1.intersect(&empty_selection); + assert_eq!(intersection.row_count(), 0); + assert_eq!(intersection.row_group_count(), 0); + assert!(intersection.get(0).is_none()); + + // Test case 3: No overlapping row groups + let non_overlapping_ranges = vec![ + (3, vec![0..10]), // Within [0, 50) for last row group + ]; + let non_overlapping_selection = + RowGroupSelection::from_row_ranges(non_overlapping_ranges, row_group_size); + let intersection = selection1.intersect(&non_overlapping_selection); + assert_eq!(intersection.row_count(), 0); + assert_eq!(intersection.row_group_count(), 0); + assert!(intersection.get(0).is_none()); + + // Test case 4: Full overlap within same row group + let full_overlap_ranges1 = vec![ + (0, vec![0..50]), // Within [0, 100) + ]; + let full_overlap_ranges2 = vec![ + (0, vec![0..50]), // Within [0, 100) + ]; + let selection1 = 
RowGroupSelection::from_row_ranges(full_overlap_ranges1, row_group_size); + let selection2 = RowGroupSelection::from_row_ranges(full_overlap_ranges2, row_group_size); + let intersection = selection1.intersect(&selection2); + assert_eq!(intersection.row_count(), 50); + assert_eq!(intersection.row_group_count(), 1); + + let row_selection = intersection.get(0).unwrap(); + assert_eq!(row_selection.row_count(), 50); // 0..50 (50) + + // Test case 5: Partial overlap at row group boundaries + let boundary_ranges1 = vec![ + (0, vec![0..10, 90..100]), // Within [0, 100) + (1, vec![0..100]), // Within [0, 100) + ]; + let boundary_ranges2 = vec![ + (0, vec![5..15, 95..100]), // Within [0, 100) + (1, vec![50..100]), // Within [0, 100) + ]; + let selection1 = RowGroupSelection::from_row_ranges(boundary_ranges1, row_group_size); + let selection2 = RowGroupSelection::from_row_ranges(boundary_ranges2, row_group_size); + let intersection = selection1.intersect(&selection2); + assert_eq!(intersection.row_count(), 60); + assert_eq!(intersection.row_group_count(), 2); + + let row_selection = intersection.get(0).unwrap(); + assert_eq!(row_selection.row_count(), 10); // 5..10 (5) + 95..100 (5) + + let row_selection = intersection.get(1).unwrap(); + assert_eq!(row_selection.row_count(), 50); // 50..100 (50) + + // Test case 6: Multiple ranges with complex overlap + let complex_ranges1 = vec![ + (0, vec![5..15, 25..35, 45..55]), // Within [0, 100) + (1, vec![10..20, 30..40]), // Within [0, 100) + ]; + let complex_ranges2 = vec![ + (0, vec![10..20, 30..40, 50..60]), // Within [0, 100) + (1, vec![15..25, 35..45]), // Within [0, 100) + ]; + let selection1 = RowGroupSelection::from_row_ranges(complex_ranges1, row_group_size); + let selection2 = RowGroupSelection::from_row_ranges(complex_ranges2, row_group_size); + let intersection = selection1.intersect(&selection2); + assert_eq!(intersection.row_count(), 25); + assert_eq!(intersection.row_group_count(), 2); + + let row_selection = 
intersection.get(0).unwrap(); + assert_eq!(row_selection.row_count(), 15); // 10..15 (5) + 30..35 (5) + 50..55 (5) + + let row_selection = intersection.get(1).unwrap(); + assert_eq!(row_selection.row_count(), 10); // 15..20 (5) + 35..40 (5) + + // Test case 7: Intersection with last row group (smaller size) + let last_rg_ranges1 = vec![ + (2, vec![0..25, 30..40]), // Within [0, 50) for last row group + ]; + let last_rg_ranges2 = vec![ + (2, vec![20..30, 35..45]), // Within [0, 50) for last row group + ]; + let selection1 = RowGroupSelection::from_row_ranges(last_rg_ranges1, row_group_size); + let selection2 = RowGroupSelection::from_row_ranges(last_rg_ranges2, row_group_size); + let intersection = selection1.intersect(&selection2); + assert_eq!(intersection.row_count(), 10); + assert_eq!(intersection.row_group_count(), 1); + + let row_selection = intersection.get(2).unwrap(); + assert_eq!(row_selection.row_count(), 10); // 20..25 (5) + 35..40 (5) + + // Test case 8: Intersection with empty ranges in one selection + let empty_ranges = vec![ + (0, vec![]), // Empty ranges + (1, vec![5..15]), // Within [0, 100) + ]; + let selection1 = RowGroupSelection::from_row_ranges(empty_ranges, row_group_size); + let ranges2 = vec![ + (0, vec![5..15, 25..35]), // Within [0, 100) + (1, vec![5..15]), // Within [0, 100) + ]; + let selection2 = RowGroupSelection::from_row_ranges(ranges2, row_group_size); + let intersection = selection1.intersect(&selection2); + assert_eq!(intersection.row_count(), 10); + assert_eq!(intersection.row_group_count(), 1); + + let row_selection = intersection.get(1).unwrap(); + assert_eq!(row_selection.row_count(), 10); // 5..15 (10) + } + + #[test] + fn test_pop_first() { + let row_group_size = 100; + let ranges = vec![ + (0, vec![5..15]), // Within [0, 100) + (1, vec![5..15]), // Within [0, 100) + (2, vec![0..5]), // Within [0, 50) for last row group + ]; + let mut selection = RowGroupSelection::from_row_ranges(ranges, row_group_size); + + // Test 
popping first row group + let (rg_id, row_selection) = selection.pop_first().unwrap(); + assert_eq!(rg_id, 0); + assert_eq!(row_selection.row_count(), 10); // 5..15 (10) + assert_eq!(selection.row_count(), 15); + assert_eq!(selection.row_group_count(), 2); + + // Verify remaining row groups' content + let row_selection = selection.get(1).unwrap(); + assert_eq!(row_selection.row_count(), 10); // 5..15 (10) + + let row_selection = selection.get(2).unwrap(); + assert_eq!(row_selection.row_count(), 5); // 0..5 (5) + + // Test popping second row group + let (rg_id, row_selection) = selection.pop_first().unwrap(); + assert_eq!(rg_id, 1); + assert_eq!(row_selection.row_count(), 10); // 5..15 (10) + assert_eq!(selection.row_count(), 5); + assert_eq!(selection.row_group_count(), 1); + + // Verify remaining row group's content + let row_selection = selection.get(2).unwrap(); + assert_eq!(row_selection.row_count(), 5); // 0..5 (5) + + // Test popping last row group + let (rg_id, row_selection) = selection.pop_first().unwrap(); + assert_eq!(rg_id, 2); + assert_eq!(row_selection.row_count(), 5); // 0..5 (5) + assert_eq!(selection.row_count(), 0); + assert_eq!(selection.row_group_count(), 0); + assert!(selection.is_empty()); + + // Test popping from empty selection + let mut empty_selection = RowGroupSelection::from_row_ranges(vec![], row_group_size); + assert!(empty_selection.pop_first().is_none()); + assert_eq!(empty_selection.row_count(), 0); + assert_eq!(empty_selection.row_group_count(), 0); + assert!(empty_selection.is_empty()); + } + + #[test] + fn test_remove_row_group() { + let row_group_size = 100; + let ranges = vec![ + (0, vec![5..15]), // Within [0, 100) + (1, vec![5..15]), // Within [0, 100) + (2, vec![0..5]), // Within [0, 50) for last row group + ]; + let mut selection = RowGroupSelection::from_row_ranges(ranges, row_group_size); + + // Test removing existing row group + selection.remove_row_group(1); + assert_eq!(selection.row_count(), 15); + 
assert_eq!(selection.row_group_count(), 2); + assert!(!selection.contains_row_group(1)); + + // Verify remaining row groups' content + let row_selection = selection.get(0).unwrap(); + assert_eq!(row_selection.row_count(), 10); // 5..15 (10) + + let row_selection = selection.get(2).unwrap(); + assert_eq!(row_selection.row_count(), 5); // 0..5 (5) + + // Test removing non-existent row group + selection.remove_row_group(5); + assert_eq!(selection.row_count(), 15); + assert_eq!(selection.row_group_count(), 2); + + // Test removing all row groups + selection.remove_row_group(0); + assert_eq!(selection.row_count(), 5); + assert_eq!(selection.row_group_count(), 1); + + let row_selection = selection.get(2).unwrap(); + assert_eq!(row_selection.row_count(), 5); // 0..5 (5) + + selection.remove_row_group(2); + assert_eq!(selection.row_count(), 0); + assert_eq!(selection.row_group_count(), 0); + assert!(selection.is_empty()); + + // Test removing from empty selection + let mut empty_selection = RowGroupSelection::from_row_ranges(vec![], row_group_size); + empty_selection.remove_row_group(0); + assert_eq!(empty_selection.row_count(), 0); + assert_eq!(empty_selection.row_group_count(), 0); + assert!(empty_selection.is_empty()); + } + + #[test] + fn test_contains_row_group() { + let row_group_size = 100; + let ranges = vec![ + (0, vec![5..15]), // Within [0, 100) + (1, vec![5..15]), // Within [0, 100) + ]; + let selection = RowGroupSelection::from_row_ranges(ranges, row_group_size); + + assert!(selection.contains_row_group(0)); + assert!(selection.contains_row_group(1)); + assert!(!selection.contains_row_group(2)); + + // Test empty selection + let empty_selection = RowGroupSelection::from_row_ranges(vec![], row_group_size); + assert!(!empty_selection.contains_row_group(0)); + } }