From 835f228bfab1836de1df642c03689ca814686a63 Mon Sep 17 00:00:00 2001 From: PSeitz Date: Sat, 25 Mar 2023 14:58:15 +0800 Subject: [PATCH] fix cardinality when merging empty columns (#1960) fixes #1958 --- columnar/src/column_index/merge/mod.rs | 34 ++++---- columnar/src/column_index/merge/shuffled.rs | 53 ++++++------ columnar/src/column_index/merge/stacked.rs | 29 +++---- columnar/src/column_values/merge.rs | 5 +- columnar/src/columnar/merge/mod.rs | 38 ++++++--- columnar/src/columnar/merge/tests.rs | 94 +++++++++++++++++++++ 6 files changed, 174 insertions(+), 79 deletions(-) diff --git a/columnar/src/column_index/merge/mod.rs b/columnar/src/column_index/merge/mod.rs index 3b031b88d..f8b69c008 100644 --- a/columnar/src/column_index/merge/mod.rs +++ b/columnar/src/column_index/merge/mod.rs @@ -8,17 +8,16 @@ use crate::column_index::SerializableColumnIndex; use crate::{Cardinality, ColumnIndex, MergeRowOrder}; // For simplification, we never have cardinality go down due to deletes. -fn detect_cardinality(columns: &[Option]) -> Cardinality { +fn detect_cardinality(columns: &[ColumnIndex]) -> Cardinality { columns .iter() - .flatten() .map(ColumnIndex::get_cardinality) .max() .unwrap_or(Cardinality::Full) } pub fn merge_column_index<'a>( - columns: &'a [Option], + columns: &'a [ColumnIndex], merge_row_order: &'a MergeRowOrder, ) -> SerializableColumnIndex<'a> { // For simplification, we do not try to detect whether the cardinality could be @@ -53,34 +52,33 @@ mod tests { let optional_index: ColumnIndex = OptionalIndex::for_test(1, &[]).into(); let multivalued_index: ColumnIndex = MultiValueIndex::for_test(&[0, 1]).into(); assert_eq!( - detect_cardinality(&[Some(optional_index.clone()), None]), + detect_cardinality(&[optional_index.clone(), ColumnIndex::Empty { num_docs: 0 }]), Cardinality::Optional ); assert_eq!( - detect_cardinality(&[Some(optional_index.clone()), Some(ColumnIndex::Full)]), + detect_cardinality(&[optional_index.clone(), ColumnIndex::Full]), Cardinality::Optional ); - assert_eq!( - detect_cardinality(&[Some(multivalued_index.clone()), None]), - Cardinality::Multivalued - ); assert_eq!( detect_cardinality(&[ - Some(multivalued_index.clone()), - Some(optional_index.clone()) + multivalued_index.clone(), + ColumnIndex::Empty { num_docs: 0 } ]), Cardinality::Multivalued ); assert_eq!( - detect_cardinality(&[Some(optional_index), Some(multivalued_index)]), + detect_cardinality(&[multivalued_index.clone(), optional_index.clone()]), + Cardinality::Multivalued + ); + assert_eq!( + detect_cardinality(&[optional_index, multivalued_index]), Cardinality::Multivalued ); } #[test] fn test_merge_index_multivalued_sorted() { - let column_indexes: Vec> = - vec![Some(MultiValueIndex::for_test(&[0, 2, 5]).into())]; + let column_indexes: Vec = vec![MultiValueIndex::for_test(&[0, 2, 5]).into()]; let merge_row_order: MergeRowOrder = ShuffleMergeOrder::for_test( &[2], vec![ @@ -104,10 +102,10 @@ mod tests { #[test] fn test_merge_index_multivalued_sorted_several_segment() { - let column_indexes: Vec> = vec![ - Some(MultiValueIndex::for_test(&[0, 2, 5]).into()), - None, - Some(MultiValueIndex::for_test(&[0, 1, 4]).into()), + let column_indexes: Vec = vec![ + MultiValueIndex::for_test(&[0, 2, 5]).into(), + ColumnIndex::Empty { num_docs: 0 }, + MultiValueIndex::for_test(&[0, 1, 4]).into(), ]; let merge_row_order: MergeRowOrder = ShuffleMergeOrder::for_test( &[2, 0, 2], diff --git a/columnar/src/column_index/merge/shuffled.rs b/columnar/src/column_index/merge/shuffled.rs index fd2543fd0..f809b0f8f 100644 --- a/columnar/src/column_index/merge/shuffled.rs +++ b/columnar/src/column_index/merge/shuffled.rs @@ -5,7 +5,7 @@ use crate::iterable::Iterable; use crate::{Cardinality, ColumnIndex, RowId, ShuffleMergeOrder}; pub fn merge_column_index_shuffled<'a>( - column_indexes: &'a [Option], + column_indexes: &'a [ColumnIndex], cardinality_after_merge: Cardinality, shuffle_merge_order: &'a ShuffleMergeOrder, ) -> SerializableColumnIndex<'a> { @@ -33,41 +33,41 @@ pub fn merge_column_index_shuffled<'a>( /// /// In other words the column_indexes passed as argument may NOT be multivalued. fn merge_column_index_shuffled_optional<'a>( - column_indexes: &'a [Option], + column_indexes: &'a [ColumnIndex], merge_order: &'a ShuffleMergeOrder, ) -> Box + 'a> { - Box::new(ShuffledOptionalIndex { + Box::new(ShuffledIndex { column_indexes, merge_order, }) } -struct ShuffledOptionalIndex<'a> { - column_indexes: &'a [Option], +struct ShuffledIndex<'a> { + column_indexes: &'a [ColumnIndex], merge_order: &'a ShuffleMergeOrder, } -impl<'a> Iterable for ShuffledOptionalIndex<'a> { +impl<'a> Iterable for ShuffledIndex<'a> { fn boxed_iter(&self) -> Box + '_> { - Box::new(self.merge_order - .iter_new_to_old_row_addrs() - .enumerate() - .filter_map(|(new_row_id, old_row_addr)| { - let Some(column_index) = &self.column_indexes[old_row_addr.segment_ord as usize] else { - return None; - }; - let row_id = new_row_id as u32; - if column_index.has_value(old_row_addr.row_id) { - Some(row_id) - } else { - None - } - })) + Box::new( + self.merge_order + .iter_new_to_old_row_addrs() + .enumerate() + .filter_map(|(new_row_id, old_row_addr)| { + let column_index = &self.column_indexes[old_row_addr.segment_ord as usize]; + let row_id = new_row_id as u32; + if column_index.has_value(old_row_addr.row_id) { + Some(row_id) + } else { + None + } + }), + ) } } fn merge_column_index_shuffled_multivalued<'a>( - column_indexes: &'a [Option], + column_indexes: &'a [ColumnIndex], merge_order: &'a ShuffleMergeOrder, ) -> Box + 'a> { Box::new(ShuffledMultivaluedIndex { @@ -77,19 +77,16 @@ fn merge_column_index_shuffled_multivalued<'a>( } struct ShuffledMultivaluedIndex<'a> { - column_indexes: &'a [Option], + column_indexes: &'a [ColumnIndex], merge_order: &'a ShuffleMergeOrder, } fn iter_num_values<'a>( - column_indexes: &'a [Option], + column_indexes: &'a [ColumnIndex], merge_order: &'a ShuffleMergeOrder, ) -> impl Iterator + 'a { merge_order.iter_new_to_old_row_addrs().map(|row_addr| { - let Some(column_index) = &column_indexes[row_addr.segment_ord as usize] else { - // No values in the entire column. It surely means there are 0 values associated to this row. - return 0u32; - }; + let column_index = &column_indexes[row_addr.segment_ord as usize]; match column_index { ColumnIndex::Empty { .. } => 0u32, ColumnIndex::Full => 1, @@ -143,7 +140,7 @@ mod tests { #[test] fn test_merge_column_index_optional_shuffle() { let optional_index: ColumnIndex = OptionalIndex::for_test(2, &[0]).into(); - let column_indexes = vec![Some(optional_index), Some(ColumnIndex::Full)]; + let column_indexes = vec![optional_index, ColumnIndex::Full]; let row_addrs = vec![ RowAddr { segment_ord: 0u32, diff --git a/columnar/src/column_index/merge/stacked.rs b/columnar/src/column_index/merge/stacked.rs index e039ec2da..9ef294b60 100644 --- a/columnar/src/column_index/merge/stacked.rs +++ b/columnar/src/column_index/merge/stacked.rs @@ -9,7 +9,7 @@ use crate::{Cardinality, ColumnIndex, RowId, StackMergeOrder}; /// /// There are no sort nor deletes involved. pub fn merge_column_index_stacked<'a>( - columns: &'a [Option], + columns: &'a [ColumnIndex], cardinality_after_merge: Cardinality, stack_merge_order: &'a StackMergeOrder, ) -> SerializableColumnIndex<'a> { @@ -33,7 +33,7 @@ pub fn merge_column_index_stacked<'a>( } struct StackedOptionalIndex<'a> { - columns: &'a [Option], + columns: &'a [ColumnIndex], stack_merge_order: &'a StackMergeOrder, } @@ -46,16 +46,16 @@ impl<'a> Iterable for StackedOptionalIndex<'a> { .flat_map(|(columnar_id, column_index_opt)| { let columnar_row_range = self.stack_merge_order.columnar_range(columnar_id); let rows_it: Box> = match column_index_opt { - Some(ColumnIndex::Full) => Box::new(columnar_row_range), - Some(ColumnIndex::Optional(optional_index)) => Box::new( + ColumnIndex::Full => Box::new(columnar_row_range), + ColumnIndex::Optional(optional_index) => Box::new( optional_index .iter_rows() .map(move |row_id: RowId| columnar_row_range.start + row_id), ), - Some(ColumnIndex::Multivalued(_)) => { + ColumnIndex::Multivalued(_) => { panic!("No multivalued index is allowed when stacking column index"); } - None | Some(ColumnIndex::Empty { .. }) => Box::new(std::iter::empty()), + ColumnIndex::Empty { .. } => Box::new(std::iter::empty()), }; rows_it }), @@ -65,20 +65,18 @@ impl<'a> Iterable for StackedOptionalIndex<'a> { #[derive(Clone, Copy)] struct StackedMultivaluedIndex<'a> { - columns: &'a [Option], + columns: &'a [ColumnIndex], stack_merge_order: &'a StackMergeOrder, } fn convert_column_opt_to_multivalued_index<'a>( - column_index_opt: Option<&'a ColumnIndex>, + column_index_opt: &'a ColumnIndex, num_rows: RowId, ) -> Box + 'a> { match column_index_opt { - None | Some(ColumnIndex::Empty { .. }) => { - Box::new(iter::repeat(0u32).take(num_rows as usize + 1)) - } - Some(ColumnIndex::Full) => Box::new(0..num_rows + 1), - Some(ColumnIndex::Optional(optional_index)) => { + ColumnIndex::Empty { .. } => Box::new(iter::repeat(0u32).take(num_rows as usize + 1)), + ColumnIndex::Full => Box::new(0..num_rows + 1), + ColumnIndex::Optional(optional_index) => { Box::new( (0..num_rows) // TODO optimize @@ -86,9 +84,7 @@ fn convert_column_opt_to_multivalued_index<'a>( .chain(std::iter::once(optional_index.num_non_nulls())), ) } - Some(ColumnIndex::Multivalued(multivalued_index)) => { - multivalued_index.start_index_column.iter() - } + ColumnIndex::Multivalued(multivalued_index) => multivalued_index.start_index_column.iter(), } } @@ -97,7 +93,6 @@ impl<'a> Iterable for StackedMultivaluedIndex<'a> { let multivalued_indexes = self.columns .iter() - .map(Option::as_ref) .enumerate() .map(|(columnar_id, column_opt)| { let num_rows = diff --git a/columnar/src/column_values/merge.rs b/columnar/src/column_values/merge.rs index d70b4295b..ff3d657f4 100644 --- a/columnar/src/column_values/merge.rs +++ b/columnar/src/column_values/merge.rs @@ -5,7 +5,7 @@ use crate::iterable::Iterable; use crate::{ColumnIndex, ColumnValues, MergeRowOrder}; pub(crate) struct MergedColumnValues<'a, T> { - pub(crate) column_indexes: &'a [Option], + pub(crate) column_indexes: &'a [ColumnIndex], pub(crate) column_values: &'a [Option>>], pub(crate) merge_row_order: &'a MergeRowOrder, } @@ -23,8 +23,7 @@ impl<'a, T: Copy + PartialOrd + Debug> Iterable for MergedColumnValues<'a, T> shuffle_merge_order .iter_new_to_old_row_addrs() .flat_map(|row_addr| { - let column_index = - self.column_indexes[row_addr.segment_ord as usize].as_ref()?; + let column_index = &self.column_indexes[row_addr.segment_ord as usize]; let column_values = self.column_values[row_addr.segment_ord as usize].as_ref()?; let value_range = column_index.value_row_ids(row_addr.row_id); diff --git a/columnar/src/columnar/merge/mod.rs b/columnar/src/columnar/merge/mod.rs index 069a559c7..10d2c7343 100644 --- a/columnar/src/columnar/merge/mod.rs +++ b/columnar/src/columnar/merge/mod.rs @@ -78,6 +78,10 @@ pub fn merge_columnar( output: &mut impl io::Write, ) -> io::Result<()> { let mut serializer = ColumnarSerializer::new(output); + let num_rows_per_column = columnar_readers + .iter() + .map(|reader| reader.num_rows()) + .collect::>(); let columns_to_merge = group_columns_for_merge(columnar_readers, required_columns)?; for ((column_name, column_type), columns) in columns_to_merge { @@ -85,6 +89,7 @@ pub fn merge_columnar( serializer.serialize_column(column_name.as_bytes(), column_type); merge_column( column_type, + &num_rows_per_column, columns, &merge_row_order, &mut column_serializer, @@ -108,6 +113,7 @@ fn dynamic_column_to_u64_monotonic(dynamic_column: DynamicColumn) -> Option>, merge_row_order: &MergeRowOrder, wrt: &mut impl io::Write, @@ -118,17 +124,19 @@ fn merge_column( | ColumnType::F64 | ColumnType::DateTime | ColumnType::Bool => { - let mut column_indexes: Vec> = Vec::with_capacity(columns.len()); + let mut column_indexes: Vec = Vec::with_capacity(columns.len()); let mut column_values: Vec>> = Vec::with_capacity(columns.len()); - for dynamic_column_opt in columns { + for (idx, dynamic_column_opt) in columns.into_iter().enumerate() { if let Some(Column { idx, values }) = dynamic_column_opt.and_then(dynamic_column_to_u64_monotonic) { - column_indexes.push(Some(idx)); + column_indexes.push(idx); column_values.push(Some(values)); } else { - column_indexes.push(None); + column_indexes.push(ColumnIndex::Empty { + num_docs: num_docs_per_column[idx], + }); column_values.push(None); } } @@ -142,15 +150,17 @@ fn merge_column( serialize_column_mappable_to_u64(merged_column_index, &merge_column_values, wrt)?; } ColumnType::IpAddr => { - let mut column_indexes: Vec> = Vec::with_capacity(columns.len()); + let mut column_indexes: Vec = Vec::with_capacity(columns.len()); let mut column_values: Vec>>> = Vec::with_capacity(columns.len()); - for dynamic_column_opt in columns { + for (idx, dynamic_column_opt) in columns.into_iter().enumerate() { if let Some(DynamicColumn::IpAddr(Column { idx, values })) = dynamic_column_opt { - column_indexes.push(Some(idx)); + column_indexes.push(idx); column_values.push(Some(values)); } else { - column_indexes.push(None); + column_indexes.push(ColumnIndex::Empty { + num_docs: num_docs_per_column[idx], + }); column_values.push(None); } } @@ -166,20 +176,22 @@ fn merge_column( serialize_column_mappable_to_u128(merged_column_index, &merge_column_values, wrt)?; } ColumnType::Bytes | ColumnType::Str => { - let mut column_indexes: Vec> = Vec::with_capacity(columns.len()); + let mut column_indexes: Vec = Vec::with_capacity(columns.len()); let mut bytes_columns: Vec> = Vec::with_capacity(columns.len()); - for dynamic_column_opt in columns { + for (idx, dynamic_column_opt) in columns.into_iter().enumerate() { match dynamic_column_opt { Some(DynamicColumn::Str(str_column)) => { - column_indexes.push(Some(str_column.term_ord_column.idx.clone())); + column_indexes.push(str_column.term_ord_column.idx.clone()); bytes_columns.push(Some(str_column.into())); } Some(DynamicColumn::Bytes(bytes_column)) => { - column_indexes.push(Some(bytes_column.term_ord_column.idx.clone())); + column_indexes.push(bytes_column.term_ord_column.idx.clone()); bytes_columns.push(Some(bytes_column)); } _ => { - column_indexes.push(None); + column_indexes.push(ColumnIndex::Empty { + num_docs: num_docs_per_column[idx], + }); bytes_columns.push(None); } } diff --git a/columnar/src/columnar/merge/tests.rs b/columnar/src/columnar/merge/tests.rs index 10eb8e1be..616d14c77 100644 --- a/columnar/src/columnar/merge/tests.rs +++ b/columnar/src/columnar/merge/tests.rs @@ -1,3 +1,5 @@ +use itertools::Itertools; + use super::*; use crate::{Cardinality, ColumnarWriter, HasAssociatedColumnType, RowId}; @@ -249,6 +251,8 @@ fn test_merge_columnar_texts() { let cols = columnar_reader.read_columns("texts").unwrap(); let dynamic_column = cols[0].open().unwrap(); let DynamicColumn::Str(vals) = dynamic_column else { panic!() }; + assert_eq!(vals.ords().get_cardinality(), Cardinality::Optional); + let get_str_for_ord = |ord| { let mut out = String::new(); vals.ord_to_str(ord, &mut out).unwrap(); @@ -376,3 +380,93 @@ fn test_merge_columnar_byte_with_missing() { assert_eq!(get_bytes_for_row(6), vec![b"b".to_vec()]); assert_eq!(get_bytes_for_row(7), vec![b"a".to_vec(), b"b".to_vec()]); } + +#[test] +fn test_merge_columnar_different_types() { + let columnar1 = make_text_columnar_multiple_columns(&[("mixed", &[&["a"]])]); + let columnar2 = make_text_columnar_multiple_columns(&[("mixed", &[&[], &["b"]])]); + let columnar3 = make_columnar("mixed", &[1i64]); + let mut buffer = Vec::new(); + let columnars = &[&columnar1, &columnar2, &columnar3]; + let stack_merge_order = StackMergeOrder::stack(columnars); + crate::columnar::merge_columnar( + columnars, + &[], + MergeRowOrder::Stack(stack_merge_order), + &mut buffer, + ) + .unwrap(); + let columnar_reader = ColumnarReader::open(buffer).unwrap(); + assert_eq!(columnar_reader.num_rows(), 4); + assert_eq!(columnar_reader.num_columns(), 2); + let cols = columnar_reader.read_columns("mixed").unwrap(); + + // numeric column + let dynamic_column = cols[0].open().unwrap(); + let DynamicColumn::I64(vals) = dynamic_column else { panic!() }; + assert_eq!(vals.get_cardinality(), Cardinality::Optional); + assert_eq!(vals.values_for_doc(0).collect_vec(), vec![]); + assert_eq!(vals.values_for_doc(1).collect_vec(), vec![]); + assert_eq!(vals.values_for_doc(2).collect_vec(), vec![]); + assert_eq!(vals.values_for_doc(3).collect_vec(), vec![1]); + assert_eq!(vals.values_for_doc(4).collect_vec(), vec![]); + + // text column + let dynamic_column = cols[1].open().unwrap(); + let DynamicColumn::Str(vals) = dynamic_column else { panic!() }; + assert_eq!(vals.ords().get_cardinality(), Cardinality::Optional); + let get_str_for_ord = |ord| { + let mut out = String::new(); + vals.ord_to_str(ord, &mut out).unwrap(); + out + }; + + assert_eq!(vals.dictionary.num_terms(), 2); + assert_eq!(get_str_for_ord(0), "a"); + assert_eq!(get_str_for_ord(1), "b"); + + let get_str_for_row = |row_id| { + let term_ords: Vec = vals + .term_ords(row_id) + .map(|el| { + let mut out = String::new(); + vals.ord_to_str(el, &mut out).unwrap(); + out + }) + .collect(); + term_ords + }; + + assert_eq!(get_str_for_row(0), vec!["a".to_string()]); + assert_eq!(get_str_for_row(1), Vec::::new()); + assert_eq!(get_str_for_row(2), vec!["b".to_string()]); + assert_eq!(get_str_for_row(3), Vec::::new()); +} + +#[test] +fn test_merge_columnar_different_empty_cardinality() { + let columnar1 = make_text_columnar_multiple_columns(&[("mixed", &[&["a"]])]); + let columnar2 = make_columnar("mixed", &[1i64]); + let mut buffer = Vec::new(); + let columnars = &[&columnar1, &columnar2]; + let stack_merge_order = StackMergeOrder::stack(columnars); + crate::columnar::merge_columnar( + columnars, + &[], + MergeRowOrder::Stack(stack_merge_order), + &mut buffer, + ) + .unwrap(); + let columnar_reader = ColumnarReader::open(buffer).unwrap(); + assert_eq!(columnar_reader.num_rows(), 2); + assert_eq!(columnar_reader.num_columns(), 2); + let cols = columnar_reader.read_columns("mixed").unwrap(); + + // numeric column + let dynamic_column = cols[0].open().unwrap(); + assert_eq!(dynamic_column.get_cardinality(), Cardinality::Optional); + + // text column + let dynamic_column = cols[1].open().unwrap(); + assert_eq!(dynamic_column.get_cardinality(), Cardinality::Optional); +}