From 50148b25b5673b0eb2c5137b9f2bdb7f0a4222dd Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Thu, 17 Jul 2025 01:00:10 +0800 Subject: [PATCH] fix: row selection intersection removes trailing rows (#6539) * fix: row selection intersection removes trailing rows Signed-off-by: Zhenchi * fix typos Signed-off-by: Zhenchi --------- Signed-off-by: Zhenchi --- src/mito2/src/sst/parquet/row_selection.rs | 78 ++++++++++++++++++---- 1 file changed, 65 insertions(+), 13 deletions(-) diff --git a/src/mito2/src/sst/parquet/row_selection.rs b/src/mito2/src/sst/parquet/row_selection.rs index efe2bae8ef..10acf76f18 100644 --- a/src/mito2/src/sst/parquet/row_selection.rs +++ b/src/mito2/src/sst/parquet/row_selection.rs @@ -294,7 +294,7 @@ impl RowGroupSelection { let Some(y) = self.selection_in_rg.get(rg_id) else { continue; }; - let selection = x.selection.intersection(&y.selection); + let selection = intersect_row_selections(&x.selection, &y.selection); let row_count = selection.row_count(); let selector_len = selector_len(&selection); if row_count > 0 { @@ -423,6 +423,68 @@ impl RowGroupSelection { } } +/// Ported from `parquet` but trailing rows are removed. 
+/// +/// Combines two `RowSelection`s and returns their intersection, ending as soon as either input runs out. +/// For example: +/// left: NNYYYYNNYYNYN +/// right: NYNNNNNNY +/// +/// returned: NNNNNNNNY (this function) +/// NNNNNNNNYYNYN (`RowSelection::intersection`) +fn intersect_row_selections(left: &RowSelection, right: &RowSelection) -> RowSelection { + let mut l_iter = left.iter().copied().peekable(); + let mut r_iter = right.iter().copied().peekable(); + + let iter = std::iter::from_fn(move || { + loop { + let l = l_iter.peek_mut(); + let r = r_iter.peek_mut(); + + match (l, r) { + (Some(a), _) if a.row_count == 0 => { + l_iter.next().unwrap(); + } + (_, Some(b)) if b.row_count == 0 => { + r_iter.next().unwrap(); + } + (Some(l), Some(r)) => { + return match (l.skip, r.skip) { + // Both sides select: emit the shorter run as a select and shrink the longer one by that many rows + (false, false) => { + if l.row_count < r.row_count { + r.row_count -= l.row_count; + l_iter.next() + } else { + l.row_count -= r.row_count; + r_iter.next() + } + } + // At least one side skips: emit a skip for the shorter run and shrink the longer one by that many rows + _ => { + if l.row_count < r.row_count { + let skip = l.row_count; + r.row_count -= l.row_count; + l_iter.next(); + Some(RowSelector::skip(skip)) + } else { + let skip = r.row_count; + l.row_count -= skip; + r_iter.next(); + Some(RowSelector::skip(skip)) + } + } + }; + } + (None, _) => return None, + (_, None) => return None, + } + } + }); + + iter.collect() +} + /// Converts an iterator of row ranges into a `RowSelection` by creating a sequence of `RowSelector`s. 
/// /// This function processes each range in the input and either creates a new selector or merges @@ -448,10 +510,6 @@ pub(crate) fn row_selection_from_row_ranges( last_processed_end = end; } - if last_processed_end < total_row_count { - add_or_merge_selector(&mut selectors, total_row_count - last_processed_end, true); - } - RowSelection::from(selectors) } @@ -546,7 +604,6 @@ mod tests { RowSelector::select(2), RowSelector::skip(2), RowSelector::select(3), - RowSelector::skip(2), ]); assert_eq!(selection, expected); } @@ -555,7 +612,7 @@ mod tests { fn test_empty_range() { let ranges = []; let selection = row_selection_from_row_ranges(ranges.iter().cloned(), 10); - let expected = RowSelection::from(vec![RowSelector::skip(10)]); + let expected = RowSelection::from(vec![]); assert_eq!(selection, expected); } @@ -563,11 +620,7 @@ mod tests { fn test_adjacent_ranges() { let ranges = [1..2, 2..3]; let selection = row_selection_from_row_ranges(ranges.iter().cloned(), 10); - let expected = RowSelection::from(vec![ - RowSelector::skip(1), - RowSelector::select(2), - RowSelector::skip(7), - ]); + let expected = RowSelection::from(vec![RowSelector::skip(1), RowSelector::select(2)]); assert_eq!(selection, expected); } @@ -580,7 +633,6 @@ mod tests { RowSelector::select(1), RowSelector::skip(98), RowSelector::select(1), - RowSelector::skip(10139), ]); assert_eq!(selection, expected); }