diff --git a/columnar/src/column_index/merge/stacked.rs b/columnar/src/column_index/merge/stacked.rs
index 4a1a70375..32724f5b0 100644
--- a/columnar/src/column_index/merge/stacked.rs
+++ b/columnar/src/column_index/merge/stacked.rs
@@ -56,7 +56,7 @@ fn get_doc_ids_with_values<'a>(
         ColumnIndex::Full => Box::new(doc_range),
         ColumnIndex::Optional(optional_index) => Box::new(
             optional_index
-                .iter_rows()
+                .iter_docs()
                 .map(move |row| row + doc_range.start),
         ),
         ColumnIndex::Multivalued(multivalued_index) => match multivalued_index {
@@ -73,7 +73,7 @@ fn get_doc_ids_with_values<'a>(
             MultiValueIndex::MultiValueIndexV2(multivalued_index) => Box::new(
                 multivalued_index
                     .optional_index
-                    .iter_rows()
+                    .iter_docs()
                     .map(move |row| row + doc_range.start),
             ),
         },
@@ -177,7 +177,7 @@ impl<'a> Iterable for StackedOptionalIndex<'a> {
             ColumnIndex::Full => Box::new(columnar_row_range),
             ColumnIndex::Optional(optional_index) => Box::new(
                 optional_index
-                    .iter_rows()
+                    .iter_docs()
                     .map(move |row_id: RowId| columnar_row_range.start + row_id),
            ),
             ColumnIndex::Multivalued(_) => {
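Note on the rename driving this file: `iter_rows()` becomes `iter_docs()` because the optional index enumerates the *documents* that carry a value, not value rows. A minimal sketch of that contract, written as it could appear next to the tests in `optional_index/tests.rs` further down in this patch (it reuses the `OptionalIndex::for_test(num_docs, non_null_docs)` helper visible there; not part of the patch itself):

```rust
// Sketch only: 10 documents, of which docs 2, 3 and 7 carry a value.
#[test]
fn iter_docs_contract() {
    let optional_index = OptionalIndex::for_test(10, &[2, 3, 7]);
    assert_eq!(optional_index.num_docs(), 10);
    assert_eq!(optional_index.num_non_nulls(), 3);
    // iter_docs yields the doc ids carrying a value, in ascending order.
    assert!(optional_index.iter_docs().eq([2u32, 3, 7]));
}
```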
diff --git a/columnar/src/column_index/optional_index/mod.rs b/columnar/src/column_index/optional_index/mod.rs
index f3ab523e2..0923c0c2e 100644
--- a/columnar/src/column_index/optional_index/mod.rs
+++ b/columnar/src/column_index/optional_index/mod.rs
@@ -80,23 +80,23 @@ impl BlockVariant {
 /// index is the block index. For each block `byte_start` and `offset` is computed.
 #[derive(Clone)]
 pub struct OptionalIndex {
-    num_rows: RowId,
-    num_non_null_rows: RowId,
+    num_docs: RowId,
+    num_non_null_docs: RowId,
     block_data: OwnedBytes,
     block_metas: Arc<[BlockMeta]>,
 }
 
 impl Iterable<RowId> for &OptionalIndex {
     fn boxed_iter(&self) -> Box<dyn Iterator<Item = RowId> + '_> {
-        Box::new(self.iter_rows())
+        Box::new(self.iter_docs())
     }
 }
 
 impl std::fmt::Debug for OptionalIndex {
     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
         f.debug_struct("OptionalIndex")
-            .field("num_rows", &self.num_rows)
-            .field("num_non_null_rows", &self.num_non_null_rows)
+            .field("num_docs", &self.num_docs)
+            .field("num_non_null_docs", &self.num_non_null_docs)
             .finish_non_exhaustive()
     }
 }
@@ -271,17 +271,17 @@ impl OptionalIndex {
     }
 
     pub fn num_docs(&self) -> RowId {
-        self.num_rows
+        self.num_docs
     }
 
     pub fn num_non_nulls(&self) -> RowId {
-        self.num_non_null_rows
+        self.num_non_null_docs
     }
 
-    pub fn iter_rows(&self) -> impl Iterator<Item = RowId> + '_ {
+    pub fn iter_docs(&self) -> impl Iterator<Item = RowId> + '_ {
         // TODO optimize
         let mut select_batch = self.select_cursor();
-        (0..self.num_non_null_rows).map(move |rank| select_batch.select(rank))
+        (0..self.num_non_null_docs).map(move |rank| select_batch.select(rank))
     }
     pub fn select_batch(&self, ranks: &mut [RowId]) {
         let mut select_cursor = self.select_cursor();
@@ -519,15 +519,15 @@ pub fn open_optional_index(bytes: OwnedBytes) -> io::Result<OptionalIndex> {
     let (mut bytes, num_non_empty_blocks_bytes) = bytes.rsplit(2);
     let num_non_empty_block_bytes =
         u16::from_le_bytes(num_non_empty_blocks_bytes.as_slice().try_into().unwrap());
-    let num_rows = VInt::deserialize_u64(&mut bytes)? as u32;
+    let num_docs = VInt::deserialize_u64(&mut bytes)? as u32;
     let block_metas_num_bytes =
         num_non_empty_block_bytes as usize * SERIALIZED_BLOCK_META_NUM_BYTES;
     let (block_data, block_metas) = bytes.rsplit(block_metas_num_bytes);
-    let (block_metas, num_non_null_rows) =
-        deserialize_optional_index_block_metadatas(block_metas.as_slice(), num_rows);
+    let (block_metas, num_non_null_docs) =
+        deserialize_optional_index_block_metadatas(block_metas.as_slice(), num_docs);
     let optional_index = OptionalIndex {
-        num_rows,
-        num_non_null_rows,
+        num_docs,
+        num_non_null_docs,
         block_data,
         block_metas: block_metas.into(),
     };
diff --git a/columnar/src/column_index/optional_index/tests.rs b/columnar/src/column_index/optional_index/tests.rs
index 2bcb77fd6..41d496390 100644
--- a/columnar/src/column_index/optional_index/tests.rs
+++ b/columnar/src/column_index/optional_index/tests.rs
@@ -164,7 +164,7 @@ fn test_optional_index_large() {
 fn test_optional_index_iter_aux(row_ids: &[RowId], num_rows: RowId) {
     let optional_index = OptionalIndex::for_test(num_rows, row_ids);
     assert_eq!(optional_index.num_docs(), num_rows);
-    assert!(optional_index.iter_rows().eq(row_ids.iter().copied()));
+    assert!(optional_index.iter_docs().eq(row_ids.iter().copied()));
 }
 
 #[test]
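For readers new to this file: `iter_docs` is a rank-to-doc translation, i.e. the i-th non-null document is `select(i)`, which is exactly what the `// TODO optimize` body above does through `select_cursor()`. A self-contained sketch of that relationship over a plain bitmap (the real code uses the dense/sparse block encoding, not a linear scan):

```rust
// Illustrative only: the block encoding in optional_index/mod.rs implements
// `select` far more efficiently than this linear scan.
fn select_linear(bitmap: &[bool], rank: u32) -> Option<u32> {
    let mut seen = 0u32;
    for (doc, &is_set) in bitmap.iter().enumerate() {
        if is_set {
            if seen == rank {
                return Some(doc as u32);
            }
            seen += 1;
        }
    }
    None
}

fn main() {
    let bitmap = [false, false, true, true, false, false, false, true];
    // rank 0 -> doc 2, rank 1 -> doc 3, rank 2 -> doc 7:
    // the sequence iter_docs() enumerates via the select cursor.
    assert_eq!(select_linear(&bitmap, 0), Some(2));
    assert_eq!(select_linear(&bitmap, 2), Some(7));
}
```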
diff --git a/columnar/src/columnar/merge/merge_dict_column.rs b/columnar/src/columnar/merge/merge_dict_column.rs
index e2247a156..cee7daf68 100644
--- a/columnar/src/columnar/merge/merge_dict_column.rs
+++ b/columnar/src/columnar/merge/merge_dict_column.rs
@@ -3,7 +3,7 @@ use std::io::{self, Write};
 use common::{BitSet, CountingWriter, ReadOnlyBitSet};
 use sstable::{SSTable, Streamer, TermOrdinal, VoidSSTable};
 
-use super::term_merger::TermMerger;
+use super::term_merger::{TermMerger, TermsWithSegmentOrd};
 use crate::column::serialize_column_mappable_to_u64;
 use crate::column_index::SerializableColumnIndex;
 use crate::iterable::Iterable;
@@ -126,14 +126,17 @@ fn serialize_merged_dict(
     let mut term_ord_mapping = TermOrdinalMapping::default();
 
     let mut field_term_streams = Vec::new();
-    for column_opt in bytes_columns.iter() {
+    for (segment_ord, column_opt) in bytes_columns.iter().enumerate() {
         if let Some(column) = column_opt {
             term_ord_mapping.add_segment(column.dictionary.num_terms());
             let terms: Streamer<VoidSSTable> = column.dictionary.stream()?;
-            field_term_streams.push(terms);
+            field_term_streams.push(TermsWithSegmentOrd { terms, segment_ord });
         } else {
             term_ord_mapping.add_segment(0);
-            field_term_streams.push(Streamer::empty());
+            field_term_streams.push(TermsWithSegmentOrd {
+                terms: Streamer::empty(),
+                segment_ord,
+            });
         }
     }
 
@@ -191,6 +194,7 @@ fn serialize_merged_dict(
 
 #[derive(Default, Debug)]
 struct TermOrdinalMapping {
+    /// Contains the new term ordinals for each segment.
     per_segment_new_term_ordinals: Vec<Vec<TermOrdinal>>,
 }
 
@@ -205,6 +209,6 @@ impl TermOrdinalMapping {
     }
 
     fn get_segment(&self, segment_ord: u32) -> &[TermOrdinal] {
-        &(self.per_segment_new_term_ordinals[segment_ord as usize])[..]
+        &self.per_segment_new_term_ordinals[segment_ord as usize]
     }
 }
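Context for `TermOrdinalMapping`: once the merged dictionary is written, every (segment, old term ordinal) pair must be translated to the term's ordinal in the merged dictionary. A hedged, self-contained sketch of that lookup with hand-built data (illustrative, not the crate's API):

```rust
// Sketch of the term-ordinal remapping idea behind TermOrdinalMapping.
fn remap(per_segment_new_term_ordinals: &[Vec<u64>], segment_ord: usize, old_ord: u64) -> u64 {
    per_segment_new_term_ordinals[segment_ord][old_ord as usize]
}

fn main() {
    // Segment 0 had terms ["a", "c"], segment 1 had ["b", "c"].
    // Merged dictionary: ["a", "b", "c"] -> ordinals 0, 1, 2.
    let mapping = vec![vec![0u64, 2], vec![1u64, 2]];
    assert_eq!(remap(&mapping, 0, 1), 2); // "c" in segment 0 -> merged ordinal 2
    assert_eq!(remap(&mapping, 1, 0), 1); // "b" in segment 1 -> merged ordinal 1
}
```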
diff --git a/columnar/src/columnar/merge/merge_mapping.rs b/columnar/src/columnar/merge/merge_mapping.rs
index 842886182..2072e108a 100644
--- a/columnar/src/columnar/merge/merge_mapping.rs
+++ b/columnar/src/columnar/merge/merge_mapping.rs
@@ -26,7 +26,7 @@ impl StackMergeOrder {
         let mut cumulated_row_ids: Vec<RowId> = Vec::with_capacity(columnars.len());
         let mut cumulated_row_id = 0;
         for columnar in columnars {
-            cumulated_row_id += columnar.num_rows();
+            cumulated_row_id += columnar.num_docs();
             cumulated_row_ids.push(cumulated_row_id);
         }
         StackMergeOrder { cumulated_row_ids }
diff --git a/columnar/src/columnar/merge/mod.rs b/columnar/src/columnar/merge/mod.rs
index f07590750..b286698df 100644
--- a/columnar/src/columnar/merge/mod.rs
+++ b/columnar/src/columnar/merge/mod.rs
@@ -80,13 +80,12 @@ pub fn merge_columnar(
     output: &mut impl io::Write,
 ) -> io::Result<()> {
     let mut serializer = ColumnarSerializer::new(output);
-    let num_rows_per_columnar = columnar_readers
+    let num_docs_per_columnar = columnar_readers
         .iter()
-        .map(|reader| reader.num_rows())
+        .map(|reader| reader.num_docs())
         .collect::<Vec<u32>>();
 
-    let columns_to_merge =
-        group_columns_for_merge(columnar_readers, required_columns, &merge_row_order)?;
+    let columns_to_merge = group_columns_for_merge(columnar_readers, required_columns)?;
     for res in columns_to_merge {
         let ((column_name, _column_type_category), grouped_columns) = res;
         let grouped_columns = grouped_columns.open(&merge_row_order)?;
@@ -94,15 +93,18 @@ pub fn merge_columnar(
             continue;
         }
-        let column_type = grouped_columns.column_type_after_merge();
+        let column_type_after_merge = grouped_columns.column_type_after_merge();
         let mut columns = grouped_columns.columns;
-        coerce_columns(column_type, &mut columns)?;
+        // Make sure the number of columns is the same as the number of columnar readers.
+        // Otherwise num_docs_per_columnar would be incorrect.
+        assert_eq!(columns.len(), columnar_readers.len());
+        coerce_columns(column_type_after_merge, &mut columns)?;
 
         let mut column_serializer =
-            serializer.start_serialize_column(column_name.as_bytes(), column_type);
+            serializer.start_serialize_column(column_name.as_bytes(), column_type_after_merge);
 
         merge_column(
-            column_type,
-            &num_rows_per_columnar,
+            column_type_after_merge,
+            &num_docs_per_columnar,
             columns,
             &merge_row_order,
             &mut column_serializer,
@@ -128,7 +130,7 @@ fn dynamic_column_to_u64_monotonic(dynamic_column: DynamicColumn) -> Option<Col
 fn merge_column(
     column_type: ColumnType,
     num_docs_per_column: &[u32],
-    columns: Vec<Option<DynamicColumn>>,
+    columns_to_merge: Vec<Option<DynamicColumn>>,
     merge_row_order: &MergeRowOrder,
     wrt: &mut impl io::Write,
 ) -> io::Result<()> {
@@ -138,10 +140,10 @@ fn merge_column(
         | ColumnType::F64
         | ColumnType::DateTime
         | ColumnType::Bool => {
-            let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns.len());
+            let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns_to_merge.len());
             let mut column_values: Vec<Option<Arc<dyn ColumnValues>>> =
-                Vec::with_capacity(columns.len());
-            for (i, dynamic_column_opt) in columns.into_iter().enumerate() {
+                Vec::with_capacity(columns_to_merge.len());
+            for (i, dynamic_column_opt) in columns_to_merge.into_iter().enumerate() {
                 if let Some(Column { index: idx, values }) =
                     dynamic_column_opt.and_then(dynamic_column_to_u64_monotonic)
                 {
@@ -164,10 +166,10 @@ fn merge_column(
             serialize_column_mappable_to_u64(merged_column_index, &merge_column_values, wrt)?;
         }
         ColumnType::IpAddr => {
-            let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns.len());
+            let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns_to_merge.len());
             let mut column_values: Vec<Option<Arc<dyn ColumnValues<Ipv6Addr>>>> =
-                Vec::with_capacity(columns.len());
-            for (i, dynamic_column_opt) in columns.into_iter().enumerate() {
+                Vec::with_capacity(columns_to_merge.len());
+            for (i, dynamic_column_opt) in columns_to_merge.into_iter().enumerate() {
                 if let Some(DynamicColumn::IpAddr(Column { index: idx, values })) =
                     dynamic_column_opt
                 {
@@ -192,9 +194,10 @@ fn merge_column(
             serialize_column_mappable_to_u128(merged_column_index, &merge_column_values, wrt)?;
         }
         ColumnType::Bytes | ColumnType::Str => {
-            let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns.len());
-            let mut bytes_columns: Vec<Option<BytesColumn>> = Vec::with_capacity(columns.len());
-            for (i, dynamic_column_opt) in columns.into_iter().enumerate() {
+            let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns_to_merge.len());
+            let mut bytes_columns: Vec<Option<BytesColumn>> =
+                Vec::with_capacity(columns_to_merge.len());
+            for (i, dynamic_column_opt) in columns_to_merge.into_iter().enumerate() {
                 match dynamic_column_opt {
                     Some(DynamicColumn::Str(str_column)) => {
                         column_indexes.push(str_column.term_ord_column.index.clone());
@@ -248,7 +251,7 @@ impl GroupedColumns {
         if column_type.len() == 1 {
             return column_type.into_iter().next().unwrap();
         }
-        // At the moment, only the numerical categorical column type has more than one possible
+        // At the moment, only the numerical column type category has more than one possible
         // column type.
         assert!(self
             .columns
@@ -361,7 +364,7 @@ fn is_empty_after_merge(
         ColumnIndex::Empty { .. } => true,
         ColumnIndex::Full => alive_bitset.len() == 0,
         ColumnIndex::Optional(optional_index) => {
-            for doc in optional_index.iter_rows() {
+            for doc in optional_index.iter_docs() {
                 if alive_bitset.contains(doc) {
                     return false;
                 }
@@ -391,7 +394,6 @@ fn is_empty_after_merge(
 fn group_columns_for_merge<'a>(
     columnar_readers: &'a [&'a ColumnarReader],
     required_columns: &'a [(String, ColumnType)],
-    _merge_row_order: &'a MergeRowOrder,
 ) -> io::Result<BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle>> {
     let mut columns: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> =
         BTreeMap::new();
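The new `assert_eq!` makes the positional invariant explicit: `columns[i]` must belong to `columnar_readers[i]`, because `merge_column` pairs each column slot with `num_docs_per_columnar[i]`; a reader that lacks the column still occupies its slot as `None`. A small sketch of why the lengths must match (illustrative types, not the crate's):

```rust
// Sketch: the merge relies on positional alignment between per-reader
// column slots and per-reader doc counts.
fn stacked_doc_count<T>(columns: &[Option<T>], num_docs_per_columnar: &[u32]) -> u32 {
    assert_eq!(columns.len(), num_docs_per_columnar.len());
    // A missing column still occupies its slot and contributes its
    // reader's doc count (as null docs) to the stacked result.
    num_docs_per_columnar.iter().sum()
}

fn main() {
    let columns: Vec<Option<&str>> = vec![Some("reader 0"), None, Some("reader 2")];
    assert_eq!(stacked_doc_count(&columns, &[3, 2, 3]), 8);
}
```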
diff --git a/columnar/src/columnar/merge/term_merger.rs b/columnar/src/columnar/merge/term_merger.rs
index d3e219ea6..7b63790fb 100644
--- a/columnar/src/columnar/merge/term_merger.rs
+++ b/columnar/src/columnar/merge/term_merger.rs
@@ -5,28 +5,29 @@ use sstable::TermOrdinal;
 
 use crate::Streamer;
 
-pub struct HeapItem<'a> {
-    pub streamer: Streamer<'a>,
+/// The terms of a column with the ordinal of the segment.
+pub struct TermsWithSegmentOrd<'a> {
+    pub terms: Streamer<'a>,
     pub segment_ord: usize,
 }
 
-impl PartialEq for HeapItem<'_> {
+impl PartialEq for TermsWithSegmentOrd<'_> {
     fn eq(&self, other: &Self) -> bool {
         self.segment_ord == other.segment_ord
     }
 }
 
-impl Eq for HeapItem<'_> {}
+impl Eq for TermsWithSegmentOrd<'_> {}
 
-impl<'a> PartialOrd for HeapItem<'a> {
-    fn partial_cmp(&self, other: &HeapItem<'a>) -> Option<Ordering> {
+impl<'a> PartialOrd for TermsWithSegmentOrd<'a> {
+    fn partial_cmp(&self, other: &TermsWithSegmentOrd<'a>) -> Option<Ordering> {
         Some(self.cmp(other))
     }
 }
 
-impl<'a> Ord for HeapItem<'a> {
-    fn cmp(&self, other: &HeapItem<'a>) -> Ordering {
-        (&other.streamer.key(), &other.segment_ord).cmp(&(&self.streamer.key(), &self.segment_ord))
+impl<'a> Ord for TermsWithSegmentOrd<'a> {
+    fn cmp(&self, other: &TermsWithSegmentOrd<'a>) -> Ordering {
+        (&other.terms.key(), &other.segment_ord).cmp(&(&self.terms.key(), &self.segment_ord))
     }
 }
 
@@ -37,39 +38,32 @@ impl<'a> Ord for TermsWithSegmentOrd<'a> {
 /// - the term
 /// - a slice with the ordinal of the segments containing the terms.
 pub struct TermMerger<'a> {
-    heap: BinaryHeap<HeapItem<'a>>,
-    current_streamers: Vec<HeapItem<'a>>,
+    heap: BinaryHeap<TermsWithSegmentOrd<'a>>,
+    term_streams_with_segment: Vec<TermsWithSegmentOrd<'a>>,
 }
 
 impl<'a> TermMerger<'a> {
     /// Stream of merged term dictionary
-    pub fn new(streams: Vec<Streamer<'a>>) -> TermMerger<'a> {
+    pub fn new(term_streams_with_segment: Vec<TermsWithSegmentOrd<'a>>) -> TermMerger<'a> {
         TermMerger {
             heap: BinaryHeap::new(),
-            current_streamers: streams
-                .into_iter()
-                .enumerate()
-                .map(|(ord, streamer)| HeapItem {
-                    streamer,
-                    segment_ord: ord,
-                })
-                .collect(),
+            term_streams_with_segment,
         }
     }
 
     pub(crate) fn matching_segments<'b: 'a>(
         &'b self,
     ) -> impl 'b + Iterator<Item = (usize, TermOrdinal)> {
-        self.current_streamers
+        self.term_streams_with_segment
             .iter()
-            .map(|heap_item| (heap_item.segment_ord, heap_item.streamer.term_ord()))
+            .map(|heap_item| (heap_item.segment_ord, heap_item.terms.term_ord()))
     }
 
     fn advance_segments(&mut self) {
-        let streamers = &mut self.current_streamers;
+        let streamers = &mut self.term_streams_with_segment;
         let heap = &mut self.heap;
         for mut heap_item in streamers.drain(..) {
-            if heap_item.streamer.advance() {
+            if heap_item.terms.advance() {
                 heap.push(heap_item);
             }
         }
@@ -81,13 +75,13 @@ impl<'a> TermMerger<'a> {
     pub fn advance(&mut self) -> bool {
         self.advance_segments();
         if let Some(head) = self.heap.pop() {
-            self.current_streamers.push(head);
+            self.term_streams_with_segment.push(head);
             while let Some(next_streamer) = self.heap.peek() {
-                if self.current_streamers[0].streamer.key() != next_streamer.streamer.key() {
+                if self.term_streams_with_segment[0].terms.key() != next_streamer.terms.key() {
                     break;
                 }
                 let next_heap_it = self.heap.pop().unwrap(); // safe : we peeked beforehand
-                self.current_streamers.push(next_heap_it);
+                self.term_streams_with_segment.push(next_heap_it);
             }
             true
         } else {
@@ -101,6 +95,6 @@ impl<'a> TermMerger<'a> {
     /// if and only if advance() has been called before
     /// and "true" was returned.
     pub fn key(&self) -> &[u8] {
-        self.current_streamers[0].streamer.key()
+        self.term_streams_with_segment[0].terms.key()
    }
 }
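Why `Ord` above compares `other` against `self`: the order is deliberately inverted so that `std::collections::BinaryHeap` (a max-heap) pops the smallest `(key, segment_ord)` first, which is what `advance()` needs for a k-way merge. The same effect, shown with the standard `Reverse` adapter in a self-contained sketch:

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

fn main() {
    // A max-heap over Reverse(T) behaves as a min-heap over T,
    // mirroring the hand-reversed Ord impl in term_merger.rs.
    let mut heap = BinaryHeap::new();
    for (key, segment_ord) in [("b", 0usize), ("a", 1), ("a", 0)] {
        heap.push(Reverse((key, segment_ord)));
    }
    // Pops in ascending (key, segment_ord) order.
    assert_eq!(heap.pop().unwrap().0, ("a", 0));
    assert_eq!(heap.pop().unwrap().0, ("a", 1));
    assert_eq!(heap.pop().unwrap().0, ("b", 0));
}
```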
diff --git a/columnar/src/columnar/merge/tests.rs b/columnar/src/columnar/merge/tests.rs
index 1419bf2c9..1eea5e9ef 100644
--- a/columnar/src/columnar/merge/tests.rs
+++ b/columnar/src/columnar/merge/tests.rs
@@ -1,7 +1,10 @@
 use itertools::Itertools;
+use proptest::collection::vec;
+use proptest::prelude::*;
 
 use super::*;
-use crate::{Cardinality, ColumnarWriter, HasAssociatedColumnType, RowId};
+use crate::columnar::{merge_columnar, ColumnarReader, MergeRowOrder, StackMergeOrder};
+use crate::{Cardinality, ColumnarWriter, DynamicColumn, HasAssociatedColumnType, RowId};
 
 fn make_columnar<T: Into<NumericalValue> + HasAssociatedColumnType + Copy>(
     column_name: &str,
@@ -26,9 +29,8 @@ fn test_column_coercion_to_u64() {
     // u64 type
     let columnar2 = make_columnar("numbers", &[u64::MAX]);
     let columnars = &[&columnar1, &columnar2];
-    let merge_order = StackMergeOrder::stack(columnars).into();
     let column_map: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> =
-        group_columns_for_merge(columnars, &[], &merge_order).unwrap();
+        group_columns_for_merge(columnars, &[]).unwrap();
     assert_eq!(column_map.len(), 1);
     assert!(column_map.contains_key(&("numbers".to_string(), ColumnTypeCategory::Numerical)));
 }
@@ -38,9 +40,8 @@ fn test_column_coercion_to_i64() {
     let columnar1 = make_columnar("numbers", &[-1i64]);
     let columnar2 = make_columnar("numbers", &[2u64]);
     let columnars = &[&columnar1, &columnar2];
-    let merge_order = StackMergeOrder::stack(columnars).into();
     let column_map: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> =
-        group_columns_for_merge(columnars, &[], &merge_order).unwrap();
+        group_columns_for_merge(columnars, &[]).unwrap();
     assert_eq!(column_map.len(), 1);
     assert!(column_map.contains_key(&("numbers".to_string(), ColumnTypeCategory::Numerical)));
 }
@@ -63,14 +64,8 @@ fn test_group_columns_with_required_column() {
     let columnar1 = make_columnar("numbers", &[1i64]);
     let columnar2 = make_columnar("numbers", &[2u64]);
     let columnars = &[&columnar1, &columnar2];
-    let merge_order = StackMergeOrder::stack(columnars).into();
     let column_map: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> =
-        group_columns_for_merge(
-            &[&columnar1, &columnar2],
-            &[("numbers".to_string(), ColumnType::U64)],
-            &merge_order,
-        )
-        .unwrap();
+        group_columns_for_merge(columnars, &[("numbers".to_string(), ColumnType::U64)]).unwrap();
     assert_eq!(column_map.len(), 1);
     assert!(column_map.contains_key(&("numbers".to_string(), ColumnTypeCategory::Numerical)));
 }
@@ -80,13 +75,9 @@ fn test_group_columns_required_column_with_no_existing_columns() {
     let columnar1 = make_columnar("numbers", &[2u64]);
     let columnar2 = make_columnar("numbers", &[2u64]);
     let columnars = &[&columnar1, &columnar2];
-    let merge_order = StackMergeOrder::stack(columnars).into();
-    let column_map: BTreeMap<_, _> = group_columns_for_merge(
-        columnars,
-        &[("required_col".to_string(), ColumnType::Str)],
-        &merge_order,
-    )
-    .unwrap();
+    let column_map: BTreeMap<_, _> =
+        group_columns_for_merge(columnars, &[("required_col".to_string(), ColumnType::Str)])
+            .unwrap();
     assert_eq!(column_map.len(), 2);
     let columns = &column_map
         .get(&("required_col".to_string(), ColumnTypeCategory::Str))
@@ -102,14 +93,8 @@ fn test_group_columns_required_column_is_above_all_columns_have_the_same_type_ru
     let columnar1 = make_columnar("numbers", &[2i64]);
     let columnar2 = make_columnar("numbers", &[2i64]);
     let columnars = &[&columnar1, &columnar2];
-    let merge_order = StackMergeOrder::stack(columnars).into();
     let column_map: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> =
-        group_columns_for_merge(
-            columnars,
-            &[("numbers".to_string(), ColumnType::U64)],
-            &merge_order,
-        )
-        .unwrap();
+        group_columns_for_merge(columnars, &[("numbers".to_string(), ColumnType::U64)]).unwrap();
     assert_eq!(column_map.len(), 1);
     assert!(column_map.contains_key(&("numbers".to_string(), ColumnTypeCategory::Numerical)));
 }
@@ -119,9 +104,8 @@ fn test_missing_column() {
     let columnar1 = make_columnar("numbers", &[-1i64]);
     let columnar2 = make_columnar("numbers2", &[2u64]);
     let columnars = &[&columnar1, &columnar2];
-    let merge_order = StackMergeOrder::stack(columnars).into();
     let column_map: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> =
-        group_columns_for_merge(columnars, &[], &merge_order).unwrap();
+        group_columns_for_merge(columnars, &[]).unwrap();
     assert_eq!(column_map.len(), 2);
     assert!(column_map.contains_key(&("numbers".to_string(), ColumnTypeCategory::Numerical)));
     {
@@ -224,7 +208,7 @@ fn test_merge_columnar_numbers() {
     )
     .unwrap();
     let columnar_reader = ColumnarReader::open(buffer).unwrap();
-    assert_eq!(columnar_reader.num_rows(), 3);
+    assert_eq!(columnar_reader.num_docs(), 3);
     assert_eq!(columnar_reader.num_columns(), 1);
     let cols = columnar_reader.read_columns("numbers").unwrap();
     let dynamic_column = cols[0].open().unwrap();
@@ -252,7 +236,7 @@ fn test_merge_columnar_texts() {
     )
     .unwrap();
     let columnar_reader = ColumnarReader::open(buffer).unwrap();
-    assert_eq!(columnar_reader.num_rows(), 3);
+    assert_eq!(columnar_reader.num_docs(), 3);
     assert_eq!(columnar_reader.num_columns(), 1);
     let cols = columnar_reader.read_columns("texts").unwrap();
     let dynamic_column = cols[0].open().unwrap();
@@ -301,7 +285,7 @@ fn test_merge_columnar_byte() {
     )
     .unwrap();
     let columnar_reader = ColumnarReader::open(buffer).unwrap();
-    assert_eq!(columnar_reader.num_rows(), 4);
+    assert_eq!(columnar_reader.num_docs(), 4);
     assert_eq!(columnar_reader.num_columns(), 1);
     let cols = columnar_reader.read_columns("bytes").unwrap();
     let dynamic_column = cols[0].open().unwrap();
@@ -357,7 +341,7 @@ fn test_merge_columnar_byte_with_missing() {
     )
     .unwrap();
     let columnar_reader = ColumnarReader::open(buffer).unwrap();
-    assert_eq!(columnar_reader.num_rows(), 3 + 2 + 3);
+    assert_eq!(columnar_reader.num_docs(), 3 + 2 + 3);
     assert_eq!(columnar_reader.num_columns(), 2);
     let cols = columnar_reader.read_columns("col").unwrap();
     let dynamic_column = cols[0].open().unwrap();
@@ -409,7 +393,7 @@ fn test_merge_columnar_different_types() {
     )
     .unwrap();
     let columnar_reader = ColumnarReader::open(buffer).unwrap();
-    assert_eq!(columnar_reader.num_rows(), 4);
+    assert_eq!(columnar_reader.num_docs(), 4);
     assert_eq!(columnar_reader.num_columns(), 2);
 
     let cols = columnar_reader.read_columns("mixed").unwrap();
@@ -474,7 +458,7 @@ fn test_merge_columnar_different_empty_cardinality() {
     )
     .unwrap();
     let columnar_reader = ColumnarReader::open(buffer).unwrap();
-    assert_eq!(columnar_reader.num_rows(), 2);
+    assert_eq!(columnar_reader.num_docs(), 2);
     assert_eq!(columnar_reader.num_columns(), 2);
 
     let cols = columnar_reader.read_columns("mixed").unwrap();
@@ -486,3 +470,119 @@ fn test_merge_columnar_different_empty_cardinality() {
     let dynamic_column = cols[1].open().unwrap();
     assert_eq!(dynamic_column.get_cardinality(), Cardinality::Optional);
 }
+
+#[derive(Debug, Clone)]
+struct ColumnSpec {
+    column_name: String,
+    /// (row_id, term)
+    terms: Vec<(RowId, Vec<u8>)>,
+}
+
+#[derive(Clone, Debug)]
+struct ColumnarSpec {
+    columns: Vec<ColumnSpec>,
+}
+
+/// Generate a random (row_id, term) pair:
+/// - row_id in [0..10]
+/// - term is either from POSSIBLE_TERMS or random bytes
+fn rowid_and_term_strategy() -> impl Strategy<Value = (RowId, Vec<u8>)> {
+    const POSSIBLE_TERMS: &[&[u8]] = &[b"a", b"b", b"allo"];
+
+    let term_strat = prop_oneof![
+        // pick from the fixed list
+        (0..POSSIBLE_TERMS.len()).prop_map(|i| POSSIBLE_TERMS[i].to_vec()),
+        // or random bytes (length 0..10)
+        prop::collection::vec(any::<u8>(), 0..10),
+    ];
+
+    (0u32..11, term_strat)
+}
+
+/// Generate one ColumnSpec, with a random name and a random list of (row_id, term).
+/// We sort it by row_id so that data is in ascending order.
+fn column_spec_strategy() -> impl Strategy<Value = ColumnSpec> {
+    let column_name = prop_oneof![
+        Just("col".to_string()),
+        Just("col2".to_string()),
+        "col.*".prop_map(|s| s),
+    ];
+
+    // We'll produce 0..8 (rowid,term) entries for this column
+    let data_strat = vec(rowid_and_term_strategy(), 0..8).prop_map(|mut pairs| {
+        // Sort by row_id
+        pairs.sort_by_key(|(row_id, _)| *row_id);
+        pairs
+    });
+
+    (column_name, data_strat).prop_map(|(name, data)| ColumnSpec {
+        column_name: name,
+        terms: data,
+    })
+}
+
+/// Strategy to generate a ColumnarSpec
+fn columnar_strategy() -> impl Strategy<Value = ColumnarSpec> {
+    vec(column_spec_strategy(), 0..3).prop_map(|columns| ColumnarSpec { columns })
+}
+
+/// Strategy to generate multiple ColumnarSpecs, each of which we will treat
+/// as one "columnar" to be merged together.
+fn columnars_strategy() -> impl Strategy<Value = Vec<ColumnarSpec>> {
+    vec(columnar_strategy(), 1..4)
+}
+
+/// Build a `ColumnarReader` from a `ColumnarSpec`
+fn build_columnar(spec: &ColumnarSpec) -> ColumnarReader {
+    let mut writer = ColumnarWriter::default();
+    let mut max_row_id = 0;
+    for col in &spec.columns {
+        for &(row_id, ref term) in &col.terms {
+            writer.record_bytes(row_id, &col.column_name, term);
+            max_row_id = max_row_id.max(row_id);
+        }
+    }
+
+    let mut buffer = Vec::new();
+    writer.serialize(max_row_id + 1, &mut buffer).unwrap();
+    ColumnarReader::open(buffer).unwrap()
+}
+
+proptest! {
+    // We just test that the merge_columnar function doesn't crash.
+    #![proptest_config(ProptestConfig::with_cases(256))]
+    #[test]
+    fn test_merge_columnar_bytes_no_crash(columnars in columnars_strategy(), second_merge_columnars in columnars_strategy()) {
+        let columnars: Vec<ColumnarReader> = columnars.iter()
+            .map(build_columnar)
+            .collect();
+
+        let mut out = Vec::new();
+        let columnar_refs: Vec<&ColumnarReader> = columnars.iter().collect();
+        let stack_merge_order = StackMergeOrder::stack(&columnar_refs);
+        merge_columnar(
+            &columnar_refs,
+            &[],
+            MergeRowOrder::Stack(stack_merge_order),
+            &mut out,
+        ).unwrap();
+
+        let merged_reader = ColumnarReader::open(out).unwrap();
+
+        // Merge the second set of columnars with the result of the first merge
+        let mut columnars: Vec<ColumnarReader> = second_merge_columnars.iter()
+            .map(build_columnar)
+            .collect();
+        columnars.push(merged_reader);
+        let mut out = Vec::new();
+        let columnar_refs: Vec<&ColumnarReader> = columnars.iter().collect();
+        let stack_merge_order = StackMergeOrder::stack(&columnar_refs);
+        merge_columnar(
+            &columnar_refs,
+            &[],
+            MergeRowOrder::Stack(stack_merge_order),
+            &mut out,
+        ).unwrap();
+
+    }
+}
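A deterministic companion to the property test above can help when reproducing a shrunken failure by hand. This sketch reuses `ColumnSpec`, `build_columnar` and the merge calls introduced in this hunk; the expected count follows from `max_row_id + 1 = 4` docs per columnar (not part of the patch):

```rust
#[test]
fn merge_two_handwritten_columnars() {
    let spec = ColumnarSpec {
        columns: vec![ColumnSpec {
            column_name: "col".to_string(),
            terms: vec![(0, b"a".to_vec()), (3, b"allo".to_vec())],
        }],
    };
    let readers = [build_columnar(&spec), build_columnar(&spec)];
    let refs: Vec<&ColumnarReader> = readers.iter().collect();
    let mut out = Vec::new();
    let merge_order = MergeRowOrder::Stack(StackMergeOrder::stack(&refs));
    merge_columnar(&refs, &[], merge_order, &mut out).unwrap();
    // Each columnar serializes max_row_id + 1 = 4 docs; stacking gives 8.
    assert_eq!(ColumnarReader::open(out).unwrap().num_docs(), 8);
}
```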
diff --git a/columnar/src/columnar/reader/mod.rs b/columnar/src/columnar/reader/mod.rs
index f1936f846..cb659bb5d 100644
--- a/columnar/src/columnar/reader/mod.rs
+++ b/columnar/src/columnar/reader/mod.rs
@@ -19,13 +19,13 @@ fn io_invalid_data(msg: String) -> io::Error {
 pub struct ColumnarReader {
     column_dictionary: Dictionary<RangeSSTable>,
     column_data: FileSlice,
-    num_rows: RowId,
+    num_docs: RowId,
     format_version: Version,
 }
 
 impl fmt::Debug for ColumnarReader {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        let num_rows = self.num_rows();
+        let num_rows = self.num_docs();
         let columns = self.list_columns().unwrap();
         let num_cols = columns.len();
         let mut debug_struct = f.debug_struct("Columnar");
@@ -112,13 +112,13 @@ impl ColumnarReader {
         Ok(ColumnarReader {
             column_dictionary,
             column_data,
-            num_rows,
+            num_docs: num_rows,
             format_version,
         })
     }
 
-    pub fn num_rows(&self) -> RowId {
-        self.num_rows
+    pub fn num_docs(&self) -> RowId {
+        self.num_docs
     }
     // Iterate over the columns in a sorted way
     pub fn iter_columns(
diff --git a/columnar/src/tests.rs b/columnar/src/tests.rs
index a5ab9d0ff..b7ce7f27f 100644
--- a/columnar/src/tests.rs
+++ b/columnar/src/tests.rs
@@ -380,7 +380,7 @@ fn assert_columnar_eq(
     right: &ColumnarReader,
     lenient_on_numerical_value: bool,
 ) {
-    assert_eq!(left.num_rows(), right.num_rows());
+    assert_eq!(left.num_docs(), right.num_docs());
     let left_columns = left.list_columns().unwrap();
     let right_columns = right.list_columns().unwrap();
     assert_eq!(left_columns.len(), right_columns.len());
@@ -588,7 +588,7 @@ proptest! {
     #[test]
     fn test_single_columnar_builder_proptest(docs in columnar_docs_strategy()) {
         let columnar = build_columnar(&docs[..]);
-        assert_eq!(columnar.num_rows() as usize, docs.len());
+        assert_eq!(columnar.num_docs() as usize, docs.len());
         let mut expected_columns: HashMap<(&str, ColumnTypeCategory), HashMap<u32, Vec<ColumnValue>>> = Default::default();
         for (doc_id, doc_vals) in docs.iter().enumerate() {
             for (col_name, col_val) in doc_vals {
@@ -820,7 +820,7 @@ fn test_columnar_merge_empty() {
     )
     .unwrap();
     let merged_columnar = ColumnarReader::open(output).unwrap();
-    assert_eq!(merged_columnar.num_rows(), 0);
+    assert_eq!(merged_columnar.num_docs(), 0);
     assert_eq!(merged_columnar.num_columns(), 0);
 }
 
@@ -846,7 +846,7 @@ fn test_columnar_merge_single_str_column() {
     )
     .unwrap();
     let merged_columnar = ColumnarReader::open(output).unwrap();
-    assert_eq!(merged_columnar.num_rows(), 1);
+    assert_eq!(merged_columnar.num_docs(), 1);
     assert_eq!(merged_columnar.num_columns(), 1);
 }
 
@@ -878,7 +878,7 @@ fn test_delete_decrease_cardinality() {
     )
     .unwrap();
     let merged_columnar = ColumnarReader::open(output).unwrap();
-    assert_eq!(merged_columnar.num_rows(), 1);
+    assert_eq!(merged_columnar.num_docs(), 1);
     assert_eq!(merged_columnar.num_columns(), 1);
     let cols = merged_columnar.read_columns("c").unwrap();
     assert_eq!(cols.len(), 1);
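Taken together, the patch is a mechanical `rows` → `docs` rename on the public surface (`ColumnarReader::num_rows` → `num_docs`, `OptionalIndex::iter_rows` → `iter_docs`), plus the `TermsWithSegmentOrd` refactor in the term merger, the simplified `group_columns_for_merge` signature, and the new merge proptest. Downstream call sites update as in this sketch (crate path assumed to be `tantivy_columnar`):

```rust
use tantivy_columnar::ColumnarReader;

// Before the patch this read `reader.num_rows()`.
fn doc_count(reader: &ColumnarReader) -> u32 {
    reader.num_docs()
}
```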