From a42a96f470fd6e4cf875ebd0741e36bd543ec4fd Mon Sep 17 00:00:00 2001 From: PSeitz Date: Wed, 8 Mar 2023 21:04:37 +0800 Subject: [PATCH] fix panic in dict column merge (#1930) * fix panic in dict column merge * Bugfix and added unit test --------- Co-authored-by: Paul Masurel --- columnar/src/column_values/u64_based/tests.rs | 2 +- .../src/columnar/merge/merge_dict_column.rs | 28 +++---- columnar/src/columnar/merge/tests.rs | 76 +++++++++++++++++-- 3 files changed, 81 insertions(+), 25 deletions(-) diff --git a/columnar/src/column_values/u64_based/tests.rs b/columnar/src/column_values/u64_based/tests.rs index 0a446930e..0f90da119 100644 --- a/columnar/src/column_values/u64_based/tests.rs +++ b/columnar/src/column_values/u64_based/tests.rs @@ -1,6 +1,6 @@ use proptest::prelude::*; use proptest::strategy::Strategy; -use proptest::{num, prop_oneof, proptest}; +use proptest::{prop_oneof, proptest}; #[test] fn test_serialize_and_load_simple() { diff --git a/columnar/src/columnar/merge/merge_dict_column.rs b/columnar/src/columnar/merge/merge_dict_column.rs index 464aa49b1..a49e2078a 100644 --- a/columnar/src/columnar/merge/merge_dict_column.rs +++ b/columnar/src/columnar/merge/merge_dict_column.rs @@ -52,21 +52,18 @@ impl<'a> Iterable for RemappedTermOrdinalsValues<'a> { impl<'a> RemappedTermOrdinalsValues<'a> { fn boxed_iter_stacked(&self) -> Box + '_> { - let iter = self - .bytes_columns - .iter() - .enumerate() - .flat_map(|(segment_ord, byte_column)| { - let segment_ord = self.term_ord_mapping.get_segment(segment_ord as u32); - byte_column.iter().flat_map(move |bytes_column| { - bytes_column - .ords() - .values - .iter() - .map(move |term_ord| segment_ord[term_ord as usize]) - }) - }); - // TODO see if we can better decompose the mapping / and the stacking + let iter = self.bytes_columns.iter().flatten().enumerate().flat_map( + move |(seg_ord_with_column, bytes_column)| { + let term_ord_after_merge_mapping = self + .term_ord_mapping + .get_segment(seg_ord_with_column as u32); + bytes_column + .ords() + .values + .iter() + .map(move |term_ord| term_ord_after_merge_mapping[term_ord as usize]) + }, + ); Box::new(iter) } @@ -133,7 +130,6 @@ fn serialize_merged_dict( let mut merged_terms = TermMerger::new(field_term_streams); let mut sstable_builder = sstable::VoidSSTable::writer(output); - // TODO support complex `merge_row_order`. match merge_row_order { MergeRowOrder::Stack(_) => { let mut current_term_ord = 0; diff --git a/columnar/src/columnar/merge/tests.rs b/columnar/src/columnar/merge/tests.rs index b23c6a6ac..10eb8e1be 100644 --- a/columnar/src/columnar/merge/tests.rs +++ b/columnar/src/columnar/merge/tests.rs @@ -153,20 +153,24 @@ fn make_numerical_columnar_multiple_columns( ColumnarReader::open(buffer).unwrap() } -fn make_byte_columnar_multiple_columns(columns: &[(&str, &[&[&[u8]]])]) -> ColumnarReader { +#[track_caller] +fn make_byte_columnar_multiple_columns( + columns: &[(&str, &[&[&[u8]]])], + num_rows: u32, +) -> ColumnarReader { let mut dataframe_writer = ColumnarWriter::default(); for (column_name, column_values) in columns { + assert_eq!( + column_values.len(), + num_rows as usize, + "All columns must have `{num_rows}` rows" + ); for (row_id, vals) in column_values.iter().enumerate() { for val in vals.iter() { dataframe_writer.record_bytes(row_id as u32, column_name, val); } } } - let num_rows = columns - .iter() - .map(|(_, val_rows)| val_rows.len() as RowId) - .max() - .unwrap_or(0u32); let mut buffer: Vec = Vec::new(); dataframe_writer .serialize(num_rows, None, &mut buffer) @@ -272,8 +276,8 @@ fn test_merge_columnar_texts() { #[test] fn test_merge_columnar_byte() { - let columnar1 = make_byte_columnar_multiple_columns(&[("bytes", &[&[b"bbbb"], &[b"baaa"]])]); - let columnar2 = make_byte_columnar_multiple_columns(&[("bytes", &[&[], &[b"a"]])]); + let columnar1 = make_byte_columnar_multiple_columns(&[("bytes", &[&[b"bbbb"], &[b"baaa"]])], 2); + let columnar2 = make_byte_columnar_multiple_columns(&[("bytes", &[&[], &[b"a"]])], 2); let mut buffer = Vec::new(); let columnars = &[&columnar1, &columnar2]; let stack_merge_order = StackMergeOrder::stack(columnars); @@ -316,3 +320,59 @@ fn test_merge_columnar_byte() { assert_eq!(get_bytes_for_row(2), b""); assert_eq!(get_bytes_for_row(3), b"a"); } + +#[test] +fn test_merge_columnar_byte_with_missing() { + let columnar1 = make_byte_columnar_multiple_columns(&[], 3); + let columnar2 = make_byte_columnar_multiple_columns(&[("col", &[&[b"b"], &[]])], 2); + let columnar3 = make_byte_columnar_multiple_columns( + &[ + ("col", &[&[], &[b"b"], &[b"a", b"b"]]), + ("col2", &[&[b"hello"], &[], &[b"a", b"b"]]), + ], + 3, + ); + let mut buffer = Vec::new(); + let columnars = &[&columnar1, &columnar2, &columnar3]; + let stack_merge_order = StackMergeOrder::stack(columnars); + crate::columnar::merge_columnar( + columnars, + &[], + MergeRowOrder::Stack(stack_merge_order), + &mut buffer, + ) + .unwrap(); + let columnar_reader = ColumnarReader::open(buffer).unwrap(); + assert_eq!(columnar_reader.num_rows(), 3 + 2 + 3); + assert_eq!(columnar_reader.num_columns(), 2); + let cols = columnar_reader.read_columns("col").unwrap(); + let dynamic_column = cols[0].open().unwrap(); + let DynamicColumn::Bytes(vals) = dynamic_column else { panic!() }; + let get_bytes_for_ord = |ord| { + let mut out = Vec::new(); + vals.ord_to_bytes(ord, &mut out).unwrap(); + out + }; + assert_eq!(vals.dictionary.num_terms(), 2); + assert_eq!(get_bytes_for_ord(0), b"a"); + assert_eq!(get_bytes_for_ord(1), b"b"); + let get_bytes_for_row = |row_id| { + let terms: Vec> = vals + .term_ords(row_id) + .map(|term_ord| { + let mut out = Vec::new(); + vals.ord_to_bytes(term_ord, &mut out).unwrap(); + out + }) + .collect(); + terms + }; + assert!(get_bytes_for_row(0).is_empty()); + assert!(get_bytes_for_row(1).is_empty()); + assert!(get_bytes_for_row(2).is_empty()); + assert_eq!(get_bytes_for_row(3), vec![b"b".to_vec()]); + assert!(get_bytes_for_row(4).is_empty()); + assert!(get_bytes_for_row(5).is_empty()); + assert_eq!(get_bytes_for_row(6), vec![b"b".to_vec()]); + assert_eq!(get_bytes_for_row(7), vec![b"a".to_vec(), b"b".to_vec()]); +}