refactor: introduce row group selection (#6075)

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
Zhenchi
2025-05-12 15:15:17 +08:00
committed by GitHub
parent 36ff36e094
commit 36d9346ffc
5 changed files with 880 additions and 560 deletions

View File

@@ -14,11 +14,9 @@
//! Structs for partition ranges.
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};
use common_time::Timestamp;
use parquet::arrow::arrow_reader::RowSelection;
use smallvec::{smallvec, SmallVec};
use store_api::region_engine::PartitionRange;
use store_api::storage::TimeSeriesDistribution;
@@ -31,6 +29,7 @@ use crate::sst::file::{overlaps, FileHandle, FileTimeRange};
use crate::sst::parquet::file_range::{FileRange, FileRangeContextRef};
use crate::sst::parquet::format::parquet_row_group_time_range;
use crate::sst::parquet::reader::ReaderMetrics;
use crate::sst::parquet::row_selection::RowGroupSelection;
use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
const ALL_ROW_GROUPS: i64 = -1;
@@ -371,20 +370,16 @@ pub(crate) struct FileRangeBuilder {
/// Context for the file.
/// None indicates nothing to read.
context: Option<FileRangeContextRef>,
/// Row selections for each row group to read.
/// It skips the row group if it is not in the map.
row_groups: BTreeMap<usize, Option<RowSelection>>,
/// Row group selection for the file to read.
selection: RowGroupSelection,
}
impl FileRangeBuilder {
/// Builds a file range builder from context and row groups.
pub(crate) fn new(
context: FileRangeContextRef,
row_groups: BTreeMap<usize, Option<RowSelection>>,
) -> Self {
pub(crate) fn new(context: FileRangeContextRef, selection: RowGroupSelection) -> Self {
Self {
context: Some(context),
row_groups,
selection,
}
}
@@ -397,21 +392,25 @@ impl FileRangeBuilder {
if row_group_index >= 0 {
let row_group_index = row_group_index as usize;
// Scans one row group.
let Some(row_selection) = self.row_groups.get(&row_group_index) else {
let Some(row_selection) = self.selection.get(row_group_index) else {
return;
};
ranges.push(FileRange::new(
context,
row_group_index,
row_selection.clone(),
Some(row_selection.clone()),
));
} else {
// Scans all row groups.
ranges.extend(
self.row_groups
self.selection
.iter()
.map(|(row_group_index, row_selection)| {
FileRange::new(context.clone(), *row_group_index, row_selection.clone())
FileRange::new(
context.clone(),
*row_group_index,
Some(row_selection.clone()),
)
}),
);
}

View File

@@ -787,7 +787,7 @@ impl ScanInput {
.expected_metadata(Some(self.mapper.metadata().clone()))
.build_reader_input(reader_metrics)
.await;
let (mut file_range_ctx, row_groups) = match res {
let (mut file_range_ctx, selection) = match res {
Ok(x) => x,
Err(e) => {
if e.is_object_not_found() && self.ignore_file_not_found {
@@ -810,7 +810,7 @@ impl ScanInput {
)?;
file_range_ctx.set_compat_batch(Some(compat));
}
Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), row_groups))
Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
}
/// Scans the input source in another task and sends batches to the sender.

View File

@@ -30,7 +30,7 @@ pub(crate) mod metadata;
pub(crate) mod page_reader;
pub mod reader;
pub mod row_group;
mod row_selection;
pub mod row_selection;
pub(crate) mod stats;
pub mod writer;

View File

@@ -14,8 +14,7 @@
//! Parquet reader.
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::ops::Range;
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::{Duration, Instant};
@@ -27,7 +26,6 @@ use datafusion_expr::Expr;
use datatypes::arrow::error::ArrowError;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use itertools::Itertools;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
use parquet::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
@@ -58,9 +56,7 @@ use crate::sst::parquet::file_range::{FileRangeContext, FileRangeContextRef};
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::metadata::MetadataLoader;
use crate::sst::parquet::row_group::InMemoryRowGroup;
use crate::sst::parquet::row_selection::{
intersect_row_selections, row_selection_from_row_ranges, row_selection_from_sorted_row_ids,
};
use crate::sst::parquet::row_selection::RowGroupSelection;
use crate::sst::parquet::stats::RowGroupPruningStats;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY};
@@ -176,8 +172,8 @@ impl ParquetReaderBuilder {
pub async fn build(&self) -> Result<ParquetReader> {
let mut metrics = ReaderMetrics::default();
let (context, row_groups) = self.build_reader_input(&mut metrics).await?;
ParquetReader::new(Arc::new(context), row_groups).await
let (context, selection) = self.build_reader_input(&mut metrics).await?;
ParquetReader::new(Arc::new(context), selection).await
}
/// Builds a [FileRangeContext] and collects row groups to read.
@@ -186,7 +182,7 @@ impl ParquetReaderBuilder {
pub(crate) async fn build_reader_input(
&self,
metrics: &mut ReaderMetrics,
) -> Result<(FileRangeContext, RowGroupMap)> {
) -> Result<(FileRangeContext, RowGroupSelection)> {
let start = Instant::now();
let file_path = self.file_handle.file_path(&self.file_dir);
@@ -224,7 +220,7 @@ impl ParquetReaderBuilder {
let field_levels =
parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint)
.context(ReadDataPartSnafu)?;
let row_groups = self
let selection = self
.row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics)
.await;
@@ -260,7 +256,7 @@ impl ParquetReaderBuilder {
metrics.build_cost += start.elapsed();
Ok((context, row_groups))
Ok((context, selection))
}
/// Decodes region metadata from key value.
@@ -331,24 +327,24 @@ impl ParquetReaderBuilder {
read_format: &ReadFormat,
parquet_meta: &ParquetMetaData,
metrics: &mut ReaderFilterMetrics,
) -> BTreeMap<usize, Option<RowSelection>> {
) -> RowGroupSelection {
let num_row_groups = parquet_meta.num_row_groups();
let num_rows = parquet_meta.file_metadata().num_rows();
if num_row_groups == 0 || num_rows == 0 {
return BTreeMap::default();
return RowGroupSelection::default();
}
// Let's assume that the number of rows in the first row group
// can represent the `row_group_size` of the Parquet file.
let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
if row_group_size == 0 {
return BTreeMap::default();
return RowGroupSelection::default();
}
metrics.rg_total += num_row_groups;
metrics.rows_total += num_rows as usize;
let mut output = (0..num_row_groups).map(|i| (i, None)).collect();
let mut output = RowGroupSelection::new(row_group_size, num_rows as _);
self.prune_row_groups_by_fulltext_index(row_group_size, parquet_meta, &mut output, metrics)
.await;
@@ -357,7 +353,7 @@ impl ParquetReaderBuilder {
}
let inverted_filtered = self
.prune_row_groups_by_inverted_index(row_group_size, parquet_meta, &mut output, metrics)
.prune_row_groups_by_inverted_index(row_group_size, &mut output, metrics)
.await;
if output.is_empty() {
return output;
@@ -365,14 +361,19 @@ impl ParquetReaderBuilder {
if !inverted_filtered {
self.prune_row_groups_by_minmax(read_format, parquet_meta, &mut output, metrics);
if output.is_empty() {
return output;
}
}
self.prune_row_groups_by_bloom_filter(parquet_meta, &mut output, metrics)
self.prune_row_groups_by_bloom_filter(row_group_size, parquet_meta, &mut output, metrics)
.await;
if output.is_empty() {
return output;
}
self.prune_row_groups_by_fulltext_bloom(parquet_meta, &mut output, metrics)
self.prune_row_groups_by_fulltext_bloom(row_group_size, parquet_meta, &mut output, metrics)
.await;
output
}
@@ -381,7 +382,7 @@ impl ParquetReaderBuilder {
&self,
row_group_size: usize,
parquet_meta: &ParquetMetaData,
output: &mut BTreeMap<usize, Option<RowSelection>>,
output: &mut RowGroupSelection,
metrics: &mut ReaderFilterMetrics,
) -> bool {
let Some(index_applier) = &self.fulltext_index_applier else {
@@ -419,46 +420,21 @@ impl ParquetReaderBuilder {
}
};
let row_group_to_row_ids =
Self::group_row_ids(apply_res, row_group_size, parquet_meta.num_row_groups());
Self::prune_row_groups_by_rows(
parquet_meta,
row_group_to_row_ids,
output,
&mut metrics.rg_fulltext_filtered,
&mut metrics.rows_fulltext_filtered,
let selection = RowGroupSelection::from_row_ids(
apply_res,
row_group_size,
parquet_meta.num_row_groups(),
);
let intersection = output.intersect(&selection);
metrics.rg_fulltext_filtered += output.row_group_count() - intersection.row_group_count();
metrics.rows_fulltext_filtered += output.row_count() - intersection.row_count();
*output = intersection;
true
}
/// Groups row IDs into row groups, with each group's row IDs starting from 0.
fn group_row_ids(
row_ids: BTreeSet<u32>,
row_group_size: usize,
num_row_groups: usize,
) -> Vec<(usize, Vec<usize>)> {
let est_rows_per_group = row_ids.len() / num_row_groups;
let mut row_group_to_row_ids: Vec<(usize, Vec<usize>)> = Vec::with_capacity(num_row_groups);
for row_id in row_ids {
let row_group_id = row_id as usize / row_group_size;
let row_id_in_group = row_id as usize % row_group_size;
if let Some((rg_id, row_ids)) = row_group_to_row_ids.last_mut()
&& *rg_id == row_group_id
{
row_ids.push(row_id_in_group);
} else {
let mut row_ids = Vec::with_capacity(est_rows_per_group);
row_ids.push(row_id_in_group);
row_group_to_row_ids.push((row_group_id, row_ids));
}
}
row_group_to_row_ids
}
/// Applies index to prune row groups.
///
/// TODO(zhongzc): Devise a mechanism to enforce the non-use of indices
@@ -467,8 +443,7 @@ impl ParquetReaderBuilder {
async fn prune_row_groups_by_inverted_index(
&self,
row_group_size: usize,
parquet_meta: &ParquetMetaData,
output: &mut BTreeMap<usize, Option<RowSelection>>,
output: &mut RowGroupSelection,
metrics: &mut ReaderFilterMetrics,
) -> bool {
let Some(index_applier) = &self.inverted_index_applier else {
@@ -503,32 +478,14 @@ impl ParquetReaderBuilder {
}
};
let segment_row_count = apply_output.segment_row_count;
let grouped_in_row_groups = apply_output
.matched_segment_ids
.iter_ones()
.map(|seg_id| {
let begin_row_id = seg_id * segment_row_count;
let row_group_id = begin_row_id / row_group_size;
let selection =
RowGroupSelection::from_inverted_index_apply_output(row_group_size, apply_output);
let intersection = output.intersect(&selection);
let rg_begin_row_id = begin_row_id % row_group_size;
let rg_end_row_id = rg_begin_row_id + segment_row_count;
metrics.rg_inverted_filtered += output.row_group_count() - intersection.row_group_count();
metrics.rows_inverted_filtered += output.row_count() - intersection.row_count();
(row_group_id, rg_begin_row_id..rg_end_row_id)
})
.chunk_by(|(row_group_id, _)| *row_group_id);
let ranges_in_row_groups = grouped_in_row_groups
.into_iter()
.map(|(row_group_id, group)| (row_group_id, group.map(|(_, ranges)| ranges)));
Self::prune_row_groups_by_ranges(
parquet_meta,
ranges_in_row_groups,
output,
&mut metrics.rg_inverted_filtered,
&mut metrics.rows_inverted_filtered,
);
*output = intersection;
true
}
@@ -538,14 +495,14 @@ impl ParquetReaderBuilder {
&self,
read_format: &ReadFormat,
parquet_meta: &ParquetMetaData,
output: &mut BTreeMap<usize, Option<RowSelection>>,
output: &mut RowGroupSelection,
metrics: &mut ReaderFilterMetrics,
) -> bool {
let Some(predicate) = &self.predicate else {
return false;
};
let row_groups_before = output.len();
let row_groups_before = output.row_group_count();
let region_meta = read_format.metadata();
let row_groups = parquet_meta.row_groups();
@@ -560,31 +517,27 @@ impl ParquetReaderBuilder {
// Here we use the schema of the SST to build the physical expression. If the column
// in the SST doesn't have the same column id as the column in the expected metadata,
// we will get a None statistics for that column.
let res = predicate
predicate
.prune_with_stats(&stats, prune_schema)
.iter()
.zip(0..parquet_meta.num_row_groups())
.filter_map(|(mask, row_group)| {
.for_each(|(mask, row_group)| {
if !*mask {
return None;
output.remove_row_group(row_group);
}
});
let selection = output.remove(&row_group)?;
Some((row_group, selection))
})
.collect::<BTreeMap<_, _>>();
let row_groups_after = res.len();
let row_groups_after = output.row_group_count();
metrics.rg_minmax_filtered += row_groups_before - row_groups_after;
*output = res;
true
}
async fn prune_row_groups_by_bloom_filter(
&self,
row_group_size: usize,
parquet_meta: &ParquetMetaData,
output: &mut BTreeMap<usize, Option<RowSelection>>,
output: &mut RowGroupSelection,
metrics: &mut ReaderFilterMetrics,
) -> bool {
let Some(index_applier) = &self.bloom_filter_index_applier else {
@@ -604,7 +557,7 @@ impl ParquetReaderBuilder {
.row_groups()
.iter()
.enumerate()
.map(|(i, rg)| (rg.num_rows() as usize, output.contains_key(&i))),
.map(|(i, rg)| (rg.num_rows() as usize, output.contains_row_group(i))),
)
.await
{
@@ -628,23 +581,22 @@ impl ParquetReaderBuilder {
}
};
Self::prune_row_groups_by_ranges(
parquet_meta,
apply_output
.into_iter()
.map(|(rg, ranges)| (rg, ranges.into_iter())),
output,
&mut metrics.rg_bloom_filtered,
&mut metrics.rows_bloom_filtered,
);
let selection = RowGroupSelection::from_row_ranges(apply_output, row_group_size);
let intersection = output.intersect(&selection);
metrics.rg_bloom_filtered += output.row_group_count() - intersection.row_group_count();
metrics.rows_bloom_filtered += output.row_count() - intersection.row_count();
*output = intersection;
true
}
async fn prune_row_groups_by_fulltext_bloom(
&self,
row_group_size: usize,
parquet_meta: &ParquetMetaData,
output: &mut BTreeMap<usize, Option<RowSelection>>,
output: &mut RowGroupSelection,
metrics: &mut ReaderFilterMetrics,
) -> bool {
let Some(index_applier) = &self.fulltext_index_applier else {
@@ -664,7 +616,7 @@ impl ParquetReaderBuilder {
.row_groups()
.iter()
.enumerate()
.map(|(i, rg)| (rg.num_rows() as usize, output.contains_key(&i))),
.map(|(i, rg)| (rg.num_rows() as usize, output.contains_row_group(i))),
)
.await
{
@@ -689,123 +641,16 @@ impl ParquetReaderBuilder {
}
};
Self::prune_row_groups_by_ranges(
parquet_meta,
apply_output
.into_iter()
.map(|(rg, ranges)| (rg, ranges.into_iter())),
output,
&mut metrics.rg_fulltext_filtered,
&mut metrics.rows_fulltext_filtered,
);
let selection = RowGroupSelection::from_row_ranges(apply_output, row_group_size);
let intersection = output.intersect(&selection);
metrics.rg_fulltext_filtered += output.row_group_count() - intersection.row_group_count();
metrics.rows_fulltext_filtered += output.row_count() - intersection.row_count();
*output = intersection;
true
}
/// Prunes row groups by rows. The `rows_in_row_groups` is like a map from row group to
/// a list of row ids to keep.
fn prune_row_groups_by_rows(
parquet_meta: &ParquetMetaData,
rows_in_row_groups: Vec<(usize, Vec<usize>)>,
output: &mut BTreeMap<usize, Option<RowSelection>>,
filtered_row_groups: &mut usize,
filtered_rows: &mut usize,
) {
let row_groups_before = output.len();
let mut rows_in_row_group_before = 0;
let mut rows_in_row_group_after = 0;
let mut res = BTreeMap::new();
for (row_group, row_ids) in rows_in_row_groups {
let Some(selection) = output.remove(&row_group) else {
continue;
};
let total_row_count = parquet_meta.row_group(row_group).num_rows() as usize;
rows_in_row_group_before += selection
.as_ref()
.map_or(total_row_count, |s| s.row_count());
let new_selection =
row_selection_from_sorted_row_ids(row_ids.into_iter(), total_row_count);
let intersected_selection = intersect_row_selections(selection, Some(new_selection));
let num_rows_after = intersected_selection
.as_ref()
.map_or(total_row_count, |s| s.row_count());
rows_in_row_group_after += num_rows_after;
if num_rows_after > 0 {
res.insert(row_group, intersected_selection);
}
}
// Pruned row groups.
while let Some((row_group, selection)) = output.pop_first() {
let total_row_count = parquet_meta.row_group(row_group).num_rows() as usize;
rows_in_row_group_before += selection
.as_ref()
.map_or(total_row_count, |s| s.row_count());
}
let row_groups_after = res.len();
*filtered_row_groups += row_groups_before - row_groups_after;
*filtered_rows += rows_in_row_group_before - rows_in_row_group_after;
*output = res;
}
/// Prunes row groups by ranges. The `ranges_in_row_groups` is like a map from row group to
/// a list of row ranges to keep.
fn prune_row_groups_by_ranges(
parquet_meta: &ParquetMetaData,
ranges_in_row_groups: impl Iterator<Item = (usize, impl Iterator<Item = Range<usize>>)>,
output: &mut BTreeMap<usize, Option<RowSelection>>,
filtered_row_groups: &mut usize,
filtered_rows: &mut usize,
) {
let row_groups_before = output.len();
let mut rows_in_row_group_before = 0;
let mut rows_in_row_group_after = 0;
let mut res = BTreeMap::new();
for (row_group, row_ranges) in ranges_in_row_groups {
let Some(selection) = output.remove(&row_group) else {
continue;
};
let total_row_count = parquet_meta.row_group(row_group).num_rows() as usize;
rows_in_row_group_before += selection
.as_ref()
.map_or(total_row_count, |s| s.row_count());
let new_selection = row_selection_from_row_ranges(row_ranges, total_row_count);
let intersected_selection = intersect_row_selections(selection, Some(new_selection));
let num_rows_after = intersected_selection
.as_ref()
.map_or(total_row_count, |s| s.row_count());
rows_in_row_group_after += num_rows_after;
if num_rows_after > 0 {
res.insert(row_group, intersected_selection);
}
}
// Pruned row groups.
while let Some((row_group, selection)) = output.pop_first() {
let total_row_count = parquet_meta.row_group(row_group).num_rows() as usize;
rows_in_row_group_before += selection
.as_ref()
.map_or(total_row_count, |s| s.row_count());
}
let row_groups_after = res.len();
*filtered_row_groups += row_groups_before - row_groups_after;
*filtered_rows += rows_in_row_group_before - rows_in_row_group_after;
*output = res;
}
}
/// Metrics of filtering rows groups and rows.
@@ -1119,14 +964,12 @@ fn pruned_by_default(filter: &SimpleFilterEvaluator, column: &ColumnMetadata) ->
Some(!matches)
}
type RowGroupMap = BTreeMap<usize, Option<RowSelection>>;
/// Parquet batch reader to read our SST format.
pub struct ParquetReader {
/// File range context.
context: FileRangeContextRef,
/// Indices of row groups to read, along with their respective row selections.
row_groups: RowGroupMap,
/// Row group selection to read.
selection: RowGroupSelection,
/// Reader of current row group.
reader_state: ReaderState,
}
@@ -1144,11 +987,11 @@ impl BatchReader for ParquetReader {
}
// No more items in current row group, reads next row group.
while let Some((row_group_idx, row_selection)) = self.row_groups.pop_first() {
while let Some((row_group_idx, row_selection)) = self.selection.pop_first() {
let parquet_reader = self
.context
.reader_builder()
.build(row_group_idx, row_selection)
.build(row_group_idx, Some(row_selection))
.await?;
// Resets the parquet reader.
@@ -1198,15 +1041,12 @@ impl Drop for ParquetReader {
impl ParquetReader {
/// Creates a new reader.
async fn new(
context: FileRangeContextRef,
mut row_groups: BTreeMap<usize, Option<RowSelection>>,
) -> Result<Self> {
async fn new(context: FileRangeContextRef, mut selection: RowGroupSelection) -> Result<Self> {
// No more items in current row group, reads next row group.
let reader_state = if let Some((row_group_idx, row_selection)) = row_groups.pop_first() {
let reader_state = if let Some((row_group_idx, row_selection)) = selection.pop_first() {
let parquet_reader = context
.reader_builder()
.build(row_group_idx, row_selection)
.build(row_group_idx, Some(row_selection))
.await?;
ReaderState::Readable(PruneReader::new_with_row_group_reader(
context.clone(),
@@ -1218,7 +1058,7 @@ impl ParquetReader {
Ok(ParquetReader {
context,
row_groups,
selection,
reader_state,
})
}
@@ -1354,300 +1194,3 @@ where
self.next_inner()
}
}
#[cfg(test)]
mod tests {
use parquet::arrow::arrow_reader::RowSelector;
use parquet::file::metadata::{FileMetaData, RowGroupMetaData};
use parquet::schema::types::{SchemaDescriptor, Type};
use super::*;
fn mock_parquet_metadata_from_row_groups(num_rows_in_row_groups: Vec<i64>) -> ParquetMetaData {
let tp = Arc::new(Type::group_type_builder("test").build().unwrap());
let schema_descr = Arc::new(SchemaDescriptor::new(tp));
let mut row_groups = Vec::new();
for num_rows in &num_rows_in_row_groups {
let row_group = RowGroupMetaData::builder(schema_descr.clone())
.set_num_rows(*num_rows)
.build()
.unwrap();
row_groups.push(row_group);
}
let file_meta = FileMetaData::new(
0,
num_rows_in_row_groups.iter().sum(),
None,
None,
schema_descr,
None,
);
ParquetMetaData::new(file_meta, row_groups)
}
#[test]
fn test_group_row_ids() {
let row_ids = [0, 1, 2, 5, 6, 7, 8, 12].into_iter().collect();
let row_group_size = 5;
let num_row_groups = 3;
let row_group_to_row_ids =
ParquetReaderBuilder::group_row_ids(row_ids, row_group_size, num_row_groups);
assert_eq!(
row_group_to_row_ids,
vec![(0, vec![0, 1, 2]), (1, vec![0, 1, 2, 3]), (2, vec![2])]
);
}
#[test]
fn prune_row_groups_by_rows_from_empty() {
let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]);
let rows_in_row_groups = vec![(0, vec![5, 6, 7, 8, 9]), (2, vec![0, 1, 2, 3, 4])];
// The original output is empty. No row groups are pruned.
let mut output = BTreeMap::new();
let mut filtered_row_groups = 0;
let mut filtered_rows = 0;
ParquetReaderBuilder::prune_row_groups_by_rows(
&parquet_meta,
rows_in_row_groups,
&mut output,
&mut filtered_row_groups,
&mut filtered_rows,
);
assert!(output.is_empty());
assert_eq!(filtered_row_groups, 0);
assert_eq!(filtered_rows, 0);
}
#[test]
fn prune_row_groups_by_rows_from_full() {
let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]);
let rows_in_row_groups = vec![(0, vec![5, 6, 7, 8, 9]), (2, vec![0, 1, 2, 3, 4])];
// The original output is full.
let mut output = BTreeMap::from([(0, None), (1, None), (2, None)]);
let mut filtered_row_groups = 0;
let mut filtered_rows = 0;
ParquetReaderBuilder::prune_row_groups_by_rows(
&parquet_meta,
rows_in_row_groups,
&mut output,
&mut filtered_row_groups,
&mut filtered_rows,
);
assert_eq!(
output,
BTreeMap::from([
(
0,
Some(RowSelection::from(vec![
RowSelector::skip(5),
RowSelector::select(5),
]))
),
(2, Some(RowSelection::from(vec![RowSelector::select(5)]))),
])
);
assert_eq!(filtered_row_groups, 1);
assert_eq!(filtered_rows, 15);
}
#[test]
fn prune_row_groups_by_rows_from_not_full() {
let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]);
let rows_in_row_groups = vec![(0, vec![5, 6, 7, 8, 9]), (2, vec![0, 1, 2, 3, 4])];
// The original output is not full.
let mut output = BTreeMap::from([
(
0,
Some(RowSelection::from(vec![
RowSelector::select(5),
RowSelector::skip(5),
])),
),
(
1,
Some(RowSelection::from(vec![
RowSelector::select(5),
RowSelector::skip(5),
])),
),
(2, Some(RowSelection::from(vec![RowSelector::select(5)]))),
]);
let mut filtered_row_groups = 0;
let mut filtered_rows = 0;
ParquetReaderBuilder::prune_row_groups_by_rows(
&parquet_meta,
rows_in_row_groups,
&mut output,
&mut filtered_row_groups,
&mut filtered_rows,
);
assert_eq!(
output,
BTreeMap::from([(2, Some(RowSelection::from(vec![RowSelector::select(5)])))])
);
assert_eq!(filtered_row_groups, 2);
assert_eq!(filtered_rows, 10);
}
#[test]
fn prune_row_groups_by_ranges_from_empty() {
let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]);
let ranges_in_row_groups = [(0, Some(5..10).into_iter()), (2, Some(0..5).into_iter())];
// The original output is empty. No row groups are pruned.
let mut output = BTreeMap::new();
let mut filtered_row_groups = 0;
let mut filtered_rows = 0;
ParquetReaderBuilder::prune_row_groups_by_ranges(
&parquet_meta,
ranges_in_row_groups.into_iter(),
&mut output,
&mut filtered_row_groups,
&mut filtered_rows,
);
assert!(output.is_empty());
assert_eq!(filtered_row_groups, 0);
assert_eq!(filtered_rows, 0);
}
#[test]
fn prune_row_groups_by_ranges_from_full() {
let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]);
let ranges_in_row_groups = [(0, Some(5..10).into_iter()), (2, Some(0..5).into_iter())];
// The original output is full.
let mut output = BTreeMap::from([(0, None), (1, None), (2, None)]);
let mut filtered_row_groups = 0;
let mut filtered_rows = 0;
ParquetReaderBuilder::prune_row_groups_by_ranges(
&parquet_meta,
ranges_in_row_groups.into_iter(),
&mut output,
&mut filtered_row_groups,
&mut filtered_rows,
);
assert_eq!(
output,
BTreeMap::from([
(
0,
Some(RowSelection::from(vec![
RowSelector::skip(5),
RowSelector::select(5),
]))
),
(2, Some(RowSelection::from(vec![RowSelector::select(5)]))),
])
);
assert_eq!(filtered_row_groups, 1);
assert_eq!(filtered_rows, 15);
}
#[test]
fn prune_row_groups_by_ranges_from_not_full() {
let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]);
let ranges_in_row_groups = [(0, Some(5..10).into_iter()), (2, Some(0..5).into_iter())];
// The original output is not full.
let mut output = BTreeMap::from([
(
0,
Some(RowSelection::from(vec![
RowSelector::select(5),
RowSelector::skip(5),
])),
),
(
1,
Some(RowSelection::from(vec![
RowSelector::select(5),
RowSelector::skip(5),
])),
),
(2, Some(RowSelection::from(vec![RowSelector::select(5)]))),
]);
let mut filtered_row_groups = 0;
let mut filtered_rows = 0;
ParquetReaderBuilder::prune_row_groups_by_ranges(
&parquet_meta,
ranges_in_row_groups.into_iter(),
&mut output,
&mut filtered_row_groups,
&mut filtered_rows,
);
assert_eq!(
output,
BTreeMap::from([(2, Some(RowSelection::from(vec![RowSelector::select(5)])))])
);
assert_eq!(filtered_row_groups, 2);
assert_eq!(filtered_rows, 10);
}
#[test]
fn prune_row_groups_by_ranges_exceed_end() {
let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]);
// The range exceeds the end of the row group.
let ranges_in_row_groups = [(0, Some(5..10).into_iter()), (2, Some(0..10).into_iter())];
let mut output = BTreeMap::from([
(
0,
Some(RowSelection::from(vec![
RowSelector::select(5),
RowSelector::skip(5),
])),
),
(
1,
Some(RowSelection::from(vec![
RowSelector::select(5),
RowSelector::skip(5),
])),
),
(2, Some(RowSelection::from(vec![RowSelector::select(5)]))),
]);
let mut filtered_row_groups = 0;
let mut filtered_rows = 0;
ParquetReaderBuilder::prune_row_groups_by_ranges(
&parquet_meta,
ranges_in_row_groups.into_iter(),
&mut output,
&mut filtered_row_groups,
&mut filtered_rows,
);
assert_eq!(
output,
BTreeMap::from([(2, Some(RowSelection::from(vec![RowSelector::select(5)])))])
);
assert_eq!(filtered_row_groups, 2);
assert_eq!(filtered_rows, 10);
}
}

View File

@@ -12,10 +12,333 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{BTreeMap, BTreeSet};
use std::ops::Range;
use index::inverted_index::search::index_apply::ApplyOutput;
use itertools::Itertools;
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
/// A selection of row groups.
#[derive(Debug, Clone, Default)]
pub struct RowGroupSelection {
/// Row group id to row selection.
selection_in_rg: BTreeMap<usize, RowSelectionWithCount>,
/// Total number of rows in the selection.
row_count: usize,
}
/// A row selection with its count.
#[derive(Debug, Clone)]
struct RowSelectionWithCount {
/// Row selection.
selection: RowSelection,
/// Number of rows in the selection.
row_count: usize,
}
impl RowGroupSelection {
/// Creates a new `RowGroupSelection` with all row groups selected.
///
/// # Arguments
/// * `row_group_size` - The number of rows in each row group (except possibly the last one)
/// * `total_row_count` - Total number of rows
pub fn new(row_group_size: usize, total_row_count: usize) -> Self {
let mut selection_in_rg = BTreeMap::new();
let row_group_count = total_row_count.div_ceil(row_group_size);
for rg_id in 0..row_group_count {
// The last row group may have fewer rows than `row_group_size`
let row_group_size = if rg_id == row_group_count - 1 {
total_row_count - (row_group_count - 1) * row_group_size
} else {
row_group_size
};
let selection = RowSelection::from(vec![RowSelector::select(row_group_size)]);
selection_in_rg.insert(
rg_id,
RowSelectionWithCount {
selection,
row_count: row_group_size,
},
);
}
Self {
selection_in_rg,
row_count: total_row_count,
}
}
/// Returns the row selection for a given row group.
///
/// `None` indicates not selected.
pub fn get(&self, rg_id: usize) -> Option<&RowSelection> {
self.selection_in_rg.get(&rg_id).map(|x| &x.selection)
}
/// Creates a new `RowGroupSelection` from the output of inverted index application.
///
/// # Arguments
/// * `row_group_size` - The number of rows in each row group (except possibly the last one)
/// * `apply_output` - The output from applying the inverted index
///
/// # Assumptions
/// * All row groups (except possibly the last one) have the same number of rows
/// * The last row group may have fewer rows than `row_group_size`
pub fn from_inverted_index_apply_output(
row_group_size: usize,
apply_output: ApplyOutput,
) -> Self {
// Step 1: Convert segment IDs to row ranges within row groups
// For each segment ID, calculate its corresponding row range in the row group
let segment_row_count = apply_output.segment_row_count;
let row_group_ranges = apply_output.matched_segment_ids.iter_ones().map(|seg_id| {
// Calculate the global row ID where this segment starts
let begin_row_id = seg_id * segment_row_count;
// Determine which row group this segment belongs to
let row_group_id = begin_row_id / row_group_size;
// Calculate the offset within the row group
let rg_begin_row_id = begin_row_id % row_group_size;
// Ensure the end row ID doesn't exceed the row group size
let rg_end_row_id = (rg_begin_row_id + segment_row_count).min(row_group_size);
(row_group_id, rg_begin_row_id..rg_end_row_id)
});
// Step 2: Group ranges by row group ID and create row selections
let mut total_row_count = 0;
let selection_in_rg = row_group_ranges
.chunk_by(|(row_group_id, _)| *row_group_id)
.into_iter()
.map(|(row_group_id, group)| {
// Extract just the ranges from the group
let ranges = group.map(|(_, ranges)| ranges);
// Create row selection from the ranges
// Note: We use `row_group_size` here, which is safe because:
// 1. For non-last row groups, it's the actual size
// 2. For the last row group, any ranges beyond the actual size will be clipped
// by the min() operation above
let selection = row_selection_from_row_ranges(ranges, row_group_size);
let row_count = selection.row_count();
total_row_count += row_count;
(
row_group_id,
RowSelectionWithCount {
selection,
row_count,
},
)
})
.collect();
Self {
selection_in_rg,
row_count: total_row_count,
}
}
/// Creates a new `RowGroupSelection` from a set of row IDs.
///
/// # Arguments
/// * `row_ids` - Set of row IDs to select
/// * `row_group_size` - The number of rows in each row group (except possibly the last one)
/// * `num_row_groups` - Total number of row groups
///
/// # Assumptions
/// * All row groups (except possibly the last one) have the same number of rows
/// * The last row group may have fewer rows than `row_group_size`
/// * All row IDs must within the range of [0, num_row_groups * row_group_size)
pub fn from_row_ids(
row_ids: BTreeSet<u32>,
row_group_size: usize,
num_row_groups: usize,
) -> Self {
// Step 1: Group row IDs by their row group
let row_group_to_row_ids =
Self::group_row_ids_by_row_group(row_ids, row_group_size, num_row_groups);
// Step 2: Create row selections for each row group
let mut total_row_count = 0;
let selection_in_rg = row_group_to_row_ids
.into_iter()
.map(|(row_group_id, row_ids)| {
let selection =
row_selection_from_sorted_row_ids(row_ids.into_iter(), row_group_size);
let row_count = selection.row_count();
total_row_count += row_count;
(
row_group_id,
RowSelectionWithCount {
selection,
row_count,
},
)
})
.collect();
Self {
selection_in_rg,
row_count: total_row_count,
}
}
/// Creates a new `RowGroupSelection` from a set of row ranges.
///
/// # Arguments
/// * `row_ranges` - A vector of (row_group_id, row_ranges) pairs
/// * `row_group_size` - The number of rows in each row group (except possibly the last one)
///
/// # Assumptions
/// * All row groups (except possibly the last one) have the same number of rows
/// * The last row group may have fewer rows than `row_group_size`
/// * All ranges in `row_ranges` must be within the bounds of their respective row groups
/// (i.e., for row group i, all ranges must be within [0, row_group_size) or [0, remaining_rows) for the last row group)
/// * Ranges within the same row group must not overlap. Overlapping ranges will result in undefined behavior.
pub fn from_row_ranges(
row_ranges: Vec<(usize, Vec<Range<usize>>)>,
row_group_size: usize,
) -> Self {
let mut total_row_count = 0;
let selection_in_rg = row_ranges
.into_iter()
.map(|(row_group_id, ranges)| {
let selection = row_selection_from_row_ranges(ranges.into_iter(), row_group_size);
let row_count = selection.row_count();
total_row_count += row_count;
(
row_group_id,
RowSelectionWithCount {
selection,
row_count,
},
)
})
.collect();
Self {
selection_in_rg,
row_count: total_row_count,
}
}
/// Groups row IDs by their row group.
///
/// # Arguments
/// * `row_ids` - Set of row IDs to group
/// * `row_group_size` - Size of each row group
/// * `num_row_groups` - Total number of row groups
///
/// # Returns
/// A vector of (row_group_id, row_ids) pairs, where row_ids are the IDs within that row group.
fn group_row_ids_by_row_group(
row_ids: BTreeSet<u32>,
row_group_size: usize,
num_row_groups: usize,
) -> Vec<(usize, Vec<usize>)> {
let est_rows_per_group = row_ids.len() / num_row_groups;
let mut row_group_to_row_ids: Vec<(usize, Vec<usize>)> = Vec::with_capacity(num_row_groups);
for row_id in row_ids {
let row_group_id = row_id as usize / row_group_size;
let row_id_in_group = row_id as usize % row_group_size;
if let Some((rg_id, row_ids)) = row_group_to_row_ids.last_mut()
&& *rg_id == row_group_id
{
row_ids.push(row_id_in_group);
} else {
let mut row_ids = Vec::with_capacity(est_rows_per_group);
row_ids.push(row_id_in_group);
row_group_to_row_ids.push((row_group_id, row_ids));
}
}
row_group_to_row_ids
}
/// Intersects two `RowGroupSelection`s.
pub fn intersect(&self, other: &Self) -> Self {
let mut res = BTreeMap::new();
let mut total_row_count = 0;
for (rg_id, x) in other.selection_in_rg.iter() {
let Some(y) = self.selection_in_rg.get(rg_id) else {
continue;
};
let selection = x.selection.intersection(&y.selection);
let row_count = selection.row_count();
if row_count > 0 {
total_row_count += row_count;
res.insert(
*rg_id,
RowSelectionWithCount {
selection,
row_count,
},
);
}
}
Self {
selection_in_rg: res,
row_count: total_row_count,
}
}
/// Returns the number of row groups in the selection.
pub fn row_group_count(&self) -> usize {
self.selection_in_rg.len()
}
/// Returns the number of rows in the selection.
pub fn row_count(&self) -> usize {
self.row_count
}
/// Returns the first row group in the selection.
pub fn pop_first(&mut self) -> Option<(usize, RowSelection)> {
let (
row_group_id,
RowSelectionWithCount {
selection,
row_count,
},
) = self.selection_in_rg.pop_first()?;
self.row_count -= row_count;
Some((row_group_id, selection))
}
/// Removes a row group from the selection.
pub fn remove_row_group(&mut self, row_group_id: usize) {
let Some(RowSelectionWithCount { row_count, .. }) =
self.selection_in_rg.remove(&row_group_id)
else {
return;
};
self.row_count -= row_count;
}
/// Returns true if the selection is empty.
pub fn is_empty(&self) -> bool {
self.selection_in_rg.is_empty()
}
/// Returns true if the selection contains a row group with the given ID.
pub fn contains_row_group(&self, row_group_id: usize) -> bool {
self.selection_in_rg.contains_key(&row_group_id)
}
/// Returns an iterator over the row groups in the selection.
pub fn iter(&self) -> impl Iterator<Item = (&usize, &RowSelection)> {
self.selection_in_rg
.iter()
.map(|(row_group_id, x)| (row_group_id, &x.selection))
}
}
/// Converts an iterator of row ranges into a `RowSelection` by creating a sequence of `RowSelector`s.
///
/// This function processes each range in the input and either creates a new selector or merges
@@ -78,19 +401,6 @@ pub(crate) fn row_selection_from_sorted_row_ids(
RowSelection::from(selectors)
}
/// Intersects two `RowSelection`s.
pub(crate) fn intersect_row_selections(
a: Option<RowSelection>,
b: Option<RowSelection>,
) -> Option<RowSelection> {
match (a, b) {
(Some(a), Some(b)) => Some(a.intersection(&b)),
(Some(a), None) => Some(a),
(None, Some(b)) => Some(b),
(None, None) => None,
}
}
/// Helper function to either add a new `RowSelector` to `selectors` or merge it with the last one
/// if they are of the same type (both skip or both select).
fn add_or_merge_selector(selectors: &mut Vec<RowSelector>, count: usize, is_skip: bool) {
@@ -111,6 +421,7 @@ fn add_or_merge_selector(selectors: &mut Vec<RowSelector>, count: usize, is_skip
}
#[cfg(test)]
#[allow(clippy::single_range_in_vec_init)]
mod tests {
use super::*;
@@ -210,4 +521,471 @@ mod tests {
let expected = RowSelection::from(vec![RowSelector::skip(10)]);
assert_eq!(selection, expected);
}
#[test]
fn test_group_row_ids() {
let row_ids = [0, 1, 2, 5, 6, 7, 8, 12].into_iter().collect();
let row_group_size = 5;
let num_row_groups = 3;
let row_group_to_row_ids =
RowGroupSelection::group_row_ids_by_row_group(row_ids, row_group_size, num_row_groups);
assert_eq!(
row_group_to_row_ids,
vec![(0, vec![0, 1, 2]), (1, vec![0, 1, 2, 3]), (2, vec![2])]
);
}
#[test]
fn test_row_group_selection_new() {
// Test with regular case
let selection = RowGroupSelection::new(100, 250);
assert_eq!(selection.row_count(), 250);
assert_eq!(selection.row_group_count(), 3);
// Check content of each row group
let row_selection = selection.get(0).unwrap();
assert_eq!(row_selection.row_count(), 100);
let row_selection = selection.get(1).unwrap();
assert_eq!(row_selection.row_count(), 100);
let row_selection = selection.get(2).unwrap();
assert_eq!(row_selection.row_count(), 50);
// Test with empty selection
let selection = RowGroupSelection::new(100, 0);
assert_eq!(selection.row_count(), 0);
assert_eq!(selection.row_group_count(), 0);
assert!(selection.get(0).is_none());
// Test with single row group
let selection = RowGroupSelection::new(100, 50);
assert_eq!(selection.row_count(), 50);
assert_eq!(selection.row_group_count(), 1);
let row_selection = selection.get(0).unwrap();
assert_eq!(row_selection.row_count(), 50);
// Test with row count that doesn't divide evenly
let selection = RowGroupSelection::new(100, 150);
assert_eq!(selection.row_count(), 150);
assert_eq!(selection.row_group_count(), 2);
let row_selection = selection.get(0).unwrap();
assert_eq!(row_selection.row_count(), 100);
let row_selection = selection.get(1).unwrap();
assert_eq!(row_selection.row_count(), 50);
// Test with row count that's just over a multiple of row_group_size
let selection = RowGroupSelection::new(100, 101);
assert_eq!(selection.row_count(), 101);
assert_eq!(selection.row_group_count(), 2);
let row_selection = selection.get(0).unwrap();
assert_eq!(row_selection.row_count(), 100);
let row_selection = selection.get(1).unwrap();
assert_eq!(row_selection.row_count(), 1);
}
#[test]
fn test_from_row_ids() {
let row_group_size = 100;
let num_row_groups = 3;
// Test with regular case
let row_ids: BTreeSet<u32> = vec![5, 15, 25, 35, 105, 115, 205, 215]
.into_iter()
.collect();
let selection = RowGroupSelection::from_row_ids(row_ids, row_group_size, num_row_groups);
assert_eq!(selection.row_count(), 8);
assert_eq!(selection.row_group_count(), 3);
// Check content of each row group
let row_selection = selection.get(0).unwrap();
assert_eq!(row_selection.row_count(), 4); // 5, 15, 25, 35
let row_selection = selection.get(1).unwrap();
assert_eq!(row_selection.row_count(), 2); // 105, 115
let row_selection = selection.get(2).unwrap();
assert_eq!(row_selection.row_count(), 2); // 205, 215
// Test with empty row IDs
let empty_row_ids: BTreeSet<u32> = BTreeSet::new();
let selection =
RowGroupSelection::from_row_ids(empty_row_ids, row_group_size, num_row_groups);
assert_eq!(selection.row_count(), 0);
assert_eq!(selection.row_group_count(), 0);
assert!(selection.get(0).is_none());
// Test with consecutive row IDs
let consecutive_row_ids: BTreeSet<u32> = vec![5, 6, 7, 8, 9].into_iter().collect();
let selection =
RowGroupSelection::from_row_ids(consecutive_row_ids, row_group_size, num_row_groups);
assert_eq!(selection.row_count(), 5);
assert_eq!(selection.row_group_count(), 1);
let row_selection = selection.get(0).unwrap();
assert_eq!(row_selection.row_count(), 5); // 5, 6, 7, 8, 9
// Test with row IDs at row group boundaries
let boundary_row_ids: BTreeSet<u32> = vec![0, 99, 100, 199, 200, 249].into_iter().collect();
let selection =
RowGroupSelection::from_row_ids(boundary_row_ids, row_group_size, num_row_groups);
assert_eq!(selection.row_count(), 6);
assert_eq!(selection.row_group_count(), 3);
let row_selection = selection.get(0).unwrap();
assert_eq!(row_selection.row_count(), 2); // 0, 99
let row_selection = selection.get(1).unwrap();
assert_eq!(row_selection.row_count(), 2); // 100, 199
let row_selection = selection.get(2).unwrap();
assert_eq!(row_selection.row_count(), 2); // 200, 249
// Test with single row group
let single_group_row_ids: BTreeSet<u32> = vec![5, 10, 15].into_iter().collect();
let selection = RowGroupSelection::from_row_ids(single_group_row_ids, row_group_size, 1);
assert_eq!(selection.row_count(), 3);
assert_eq!(selection.row_group_count(), 1);
let row_selection = selection.get(0).unwrap();
assert_eq!(row_selection.row_count(), 3); // 5, 10, 15
}
#[test]
fn test_from_row_ranges() {
let row_group_size = 100;
// Test with regular case
let ranges = vec![
(0, vec![5..15, 25..35]), // Within [0, 100)
(1, vec![5..15]), // Within [0, 100)
(2, vec![0..5, 10..15]), // Within [0, 50) for last row group
];
let selection = RowGroupSelection::from_row_ranges(ranges, row_group_size);
assert_eq!(selection.row_count(), 40);
assert_eq!(selection.row_group_count(), 3);
// Check content of each row group
let row_selection = selection.get(0).unwrap();
assert_eq!(row_selection.row_count(), 20); // 5..15 (10) + 25..35 (10)
let row_selection = selection.get(1).unwrap();
assert_eq!(row_selection.row_count(), 10); // 5..15 (10)
let row_selection = selection.get(2).unwrap();
assert_eq!(row_selection.row_count(), 10); // 0..5 (5) + 10..15 (5)
// Test with empty ranges
let empty_ranges: Vec<(usize, Vec<Range<usize>>)> = vec![];
let selection = RowGroupSelection::from_row_ranges(empty_ranges, row_group_size);
assert_eq!(selection.row_count(), 0);
assert_eq!(selection.row_group_count(), 0);
assert!(selection.get(0).is_none());
// Test with adjacent ranges within same row group
let adjacent_ranges = vec![
(0, vec![5..15, 15..25]), // Adjacent ranges within [0, 100)
];
let selection = RowGroupSelection::from_row_ranges(adjacent_ranges, row_group_size);
assert_eq!(selection.row_count(), 20);
assert_eq!(selection.row_group_count(), 1);
let row_selection = selection.get(0).unwrap();
assert_eq!(row_selection.row_count(), 20); // 5..15 (10) + 15..25 (10)
// Test with ranges at row group boundaries
let boundary_ranges = vec![
(0, vec![0..10, 90..100]), // Ranges at start and end of first row group
(1, vec![0..100]), // Full range of second row group
(2, vec![0..50]), // Full range of last row group
];
let selection = RowGroupSelection::from_row_ranges(boundary_ranges, row_group_size);
assert_eq!(selection.row_count(), 170);
assert_eq!(selection.row_group_count(), 3);
let row_selection = selection.get(0).unwrap();
assert_eq!(row_selection.row_count(), 20); // 0..10 (10) + 90..100 (10)
let row_selection = selection.get(1).unwrap();
assert_eq!(row_selection.row_count(), 100); // 0..100 (100)
let row_selection = selection.get(2).unwrap();
assert_eq!(row_selection.row_count(), 50); // 0..50 (50)
// Test with single row group
let single_group_ranges = vec![
(0, vec![0..50]), // Half of first row group
];
let selection = RowGroupSelection::from_row_ranges(single_group_ranges, row_group_size);
assert_eq!(selection.row_count(), 50);
assert_eq!(selection.row_group_count(), 1);
let row_selection = selection.get(0).unwrap();
assert_eq!(row_selection.row_count(), 50); // 0..50 (50)
}
#[test]
fn test_intersect() {
let row_group_size = 100;
// Test case 1: Regular intersection with partial overlap
let ranges1 = vec![
(0, vec![5..15, 25..35]), // Within [0, 100)
(1, vec![5..15]), // Within [0, 100)
];
let selection1 = RowGroupSelection::from_row_ranges(ranges1, row_group_size);
let ranges2 = vec![
(0, vec![10..20]), // Within [0, 100)
(1, vec![10..20]), // Within [0, 100)
(2, vec![0..10]), // Within [0, 50) for last row group
];
let selection2 = RowGroupSelection::from_row_ranges(ranges2, row_group_size);
let intersection = selection1.intersect(&selection2);
assert_eq!(intersection.row_count(), 10);
assert_eq!(intersection.row_group_count(), 2);
let row_selection = intersection.get(0).unwrap();
assert_eq!(row_selection.row_count(), 5); // 10..15 (5)
let row_selection = intersection.get(1).unwrap();
assert_eq!(row_selection.row_count(), 5); // 10..15 (5)
// Test case 2: Empty intersection with empty selection
let empty_ranges: Vec<(usize, Vec<Range<usize>>)> = vec![];
let empty_selection = RowGroupSelection::from_row_ranges(empty_ranges, row_group_size);
let intersection = selection1.intersect(&empty_selection);
assert_eq!(intersection.row_count(), 0);
assert_eq!(intersection.row_group_count(), 0);
assert!(intersection.get(0).is_none());
// Test case 3: No overlapping row groups
let non_overlapping_ranges = vec![
(3, vec![0..10]), // Within [0, 50) for last row group
];
let non_overlapping_selection =
RowGroupSelection::from_row_ranges(non_overlapping_ranges, row_group_size);
let intersection = selection1.intersect(&non_overlapping_selection);
assert_eq!(intersection.row_count(), 0);
assert_eq!(intersection.row_group_count(), 0);
assert!(intersection.get(0).is_none());
// Test case 4: Full overlap within same row group
let full_overlap_ranges1 = vec![
(0, vec![0..50]), // Within [0, 100)
];
let full_overlap_ranges2 = vec![
(0, vec![0..50]), // Within [0, 100)
];
let selection1 = RowGroupSelection::from_row_ranges(full_overlap_ranges1, row_group_size);
let selection2 = RowGroupSelection::from_row_ranges(full_overlap_ranges2, row_group_size);
let intersection = selection1.intersect(&selection2);
assert_eq!(intersection.row_count(), 50);
assert_eq!(intersection.row_group_count(), 1);
let row_selection = intersection.get(0).unwrap();
assert_eq!(row_selection.row_count(), 50); // 0..50 (50)
// Test case 5: Partial overlap at row group boundaries
let boundary_ranges1 = vec![
(0, vec![0..10, 90..100]), // Within [0, 100)
(1, vec![0..100]), // Within [0, 100)
];
let boundary_ranges2 = vec![
(0, vec![5..15, 95..100]), // Within [0, 100)
(1, vec![50..100]), // Within [0, 100)
];
let selection1 = RowGroupSelection::from_row_ranges(boundary_ranges1, row_group_size);
let selection2 = RowGroupSelection::from_row_ranges(boundary_ranges2, row_group_size);
let intersection = selection1.intersect(&selection2);
assert_eq!(intersection.row_count(), 60);
assert_eq!(intersection.row_group_count(), 2);
let row_selection = intersection.get(0).unwrap();
assert_eq!(row_selection.row_count(), 10); // 5..10 (5) + 95..100 (5)
let row_selection = intersection.get(1).unwrap();
assert_eq!(row_selection.row_count(), 50); // 50..100 (50)
// Test case 6: Multiple ranges with complex overlap
let complex_ranges1 = vec![
(0, vec![5..15, 25..35, 45..55]), // Within [0, 100)
(1, vec![10..20, 30..40]), // Within [0, 100)
];
let complex_ranges2 = vec![
(0, vec![10..20, 30..40, 50..60]), // Within [0, 100)
(1, vec![15..25, 35..45]), // Within [0, 100)
];
let selection1 = RowGroupSelection::from_row_ranges(complex_ranges1, row_group_size);
let selection2 = RowGroupSelection::from_row_ranges(complex_ranges2, row_group_size);
let intersection = selection1.intersect(&selection2);
assert_eq!(intersection.row_count(), 25);
assert_eq!(intersection.row_group_count(), 2);
let row_selection = intersection.get(0).unwrap();
assert_eq!(row_selection.row_count(), 15); // 10..15 (5) + 30..35 (5) + 50..55 (5)
let row_selection = intersection.get(1).unwrap();
assert_eq!(row_selection.row_count(), 10); // 15..20 (5) + 35..40 (5)
// Test case 7: Intersection with last row group (smaller size)
let last_rg_ranges1 = vec![
(2, vec![0..25, 30..40]), // Within [0, 50) for last row group
];
let last_rg_ranges2 = vec![
(2, vec![20..30, 35..45]), // Within [0, 50) for last row group
];
let selection1 = RowGroupSelection::from_row_ranges(last_rg_ranges1, row_group_size);
let selection2 = RowGroupSelection::from_row_ranges(last_rg_ranges2, row_group_size);
let intersection = selection1.intersect(&selection2);
assert_eq!(intersection.row_count(), 10);
assert_eq!(intersection.row_group_count(), 1);
let row_selection = intersection.get(2).unwrap();
assert_eq!(row_selection.row_count(), 10); // 20..25 (5) + 35..40 (5)
// Test case 8: Intersection with empty ranges in one selection
let empty_ranges = vec![
(0, vec![]), // Empty ranges
(1, vec![5..15]), // Within [0, 100)
];
let selection1 = RowGroupSelection::from_row_ranges(empty_ranges, row_group_size);
let ranges2 = vec![
(0, vec![5..15, 25..35]), // Within [0, 100)
(1, vec![5..15]), // Within [0, 100)
];
let selection2 = RowGroupSelection::from_row_ranges(ranges2, row_group_size);
let intersection = selection1.intersect(&selection2);
assert_eq!(intersection.row_count(), 10);
assert_eq!(intersection.row_group_count(), 1);
let row_selection = intersection.get(1).unwrap();
assert_eq!(row_selection.row_count(), 10); // 5..15 (10)
}
#[test]
fn test_pop_first() {
let row_group_size = 100;
let ranges = vec![
(0, vec![5..15]), // Within [0, 100)
(1, vec![5..15]), // Within [0, 100)
(2, vec![0..5]), // Within [0, 50) for last row group
];
let mut selection = RowGroupSelection::from_row_ranges(ranges, row_group_size);
// Test popping first row group
let (rg_id, row_selection) = selection.pop_first().unwrap();
assert_eq!(rg_id, 0);
assert_eq!(row_selection.row_count(), 10); // 5..15 (10)
assert_eq!(selection.row_count(), 15);
assert_eq!(selection.row_group_count(), 2);
// Verify remaining row groups' content
let row_selection = selection.get(1).unwrap();
assert_eq!(row_selection.row_count(), 10); // 5..15 (10)
let row_selection = selection.get(2).unwrap();
assert_eq!(row_selection.row_count(), 5); // 0..5 (5)
// Test popping second row group
let (rg_id, row_selection) = selection.pop_first().unwrap();
assert_eq!(rg_id, 1);
assert_eq!(row_selection.row_count(), 10); // 5..15 (10)
assert_eq!(selection.row_count(), 5);
assert_eq!(selection.row_group_count(), 1);
// Verify remaining row group's content
let row_selection = selection.get(2).unwrap();
assert_eq!(row_selection.row_count(), 5); // 0..5 (5)
// Test popping last row group
let (rg_id, row_selection) = selection.pop_first().unwrap();
assert_eq!(rg_id, 2);
assert_eq!(row_selection.row_count(), 5); // 0..5 (5)
assert_eq!(selection.row_count(), 0);
assert_eq!(selection.row_group_count(), 0);
assert!(selection.is_empty());
// Test popping from empty selection
let mut empty_selection = RowGroupSelection::from_row_ranges(vec![], row_group_size);
assert!(empty_selection.pop_first().is_none());
assert_eq!(empty_selection.row_count(), 0);
assert_eq!(empty_selection.row_group_count(), 0);
assert!(empty_selection.is_empty());
}
#[test]
fn test_remove_row_group() {
let row_group_size = 100;
let ranges = vec![
(0, vec![5..15]), // Within [0, 100)
(1, vec![5..15]), // Within [0, 100)
(2, vec![0..5]), // Within [0, 50) for last row group
];
let mut selection = RowGroupSelection::from_row_ranges(ranges, row_group_size);
// Test removing existing row group
selection.remove_row_group(1);
assert_eq!(selection.row_count(), 15);
assert_eq!(selection.row_group_count(), 2);
assert!(!selection.contains_row_group(1));
// Verify remaining row groups' content
let row_selection = selection.get(0).unwrap();
assert_eq!(row_selection.row_count(), 10); // 5..15 (10)
let row_selection = selection.get(2).unwrap();
assert_eq!(row_selection.row_count(), 5); // 0..5 (5)
// Test removing non-existent row group
selection.remove_row_group(5);
assert_eq!(selection.row_count(), 15);
assert_eq!(selection.row_group_count(), 2);
// Test removing all row groups
selection.remove_row_group(0);
assert_eq!(selection.row_count(), 5);
assert_eq!(selection.row_group_count(), 1);
let row_selection = selection.get(2).unwrap();
assert_eq!(row_selection.row_count(), 5); // 0..5 (5)
selection.remove_row_group(2);
assert_eq!(selection.row_count(), 0);
assert_eq!(selection.row_group_count(), 0);
assert!(selection.is_empty());
// Test removing from empty selection
let mut empty_selection = RowGroupSelection::from_row_ranges(vec![], row_group_size);
empty_selection.remove_row_group(0);
assert_eq!(empty_selection.row_count(), 0);
assert_eq!(empty_selection.row_group_count(), 0);
assert!(empty_selection.is_empty());
}
#[test]
fn test_contains_row_group() {
let row_group_size = 100;
let ranges = vec![
(0, vec![5..15]), // Within [0, 100)
(1, vec![5..15]), // Within [0, 100)
];
let selection = RowGroupSelection::from_row_ranges(ranges, row_group_size);
assert!(selection.contains_row_group(0));
assert!(selection.contains_row_group(1));
assert!(!selection.contains_row_group(2));
// Test empty selection
let empty_selection = RowGroupSelection::from_row_ranges(vec![], row_group_size);
assert!(!empty_selection.contains_row_group(0));
}
}