diff --git a/columnar/benches/bench_access.rs b/columnar/benches/bench_access.rs index 571d949ec..e40d3f179 100644 --- a/columnar/benches/bench_access.rs +++ b/columnar/benches/bench_access.rs @@ -1,73 +1,13 @@ -use core::fmt; -use std::fmt::{Display, Formatter}; - use binggan::{black_box, InputGroup}; -use tantivy_columnar::*; +use common::*; +use tantivy_columnar::Column; -pub enum Card { - MultiSparse, - Multi, - Sparse, - Dense, - Full, -} -impl Display for Card { - fn fmt(&self, f: &mut Formatter) -> fmt::Result { - match self { - Card::MultiSparse => write!(f, "multi sparse 1/13"), - Card::Multi => write!(f, "multi 2x"), - Card::Sparse => write!(f, "sparse 1/13"), - Card::Dense => write!(f, "dense 1/12"), - Card::Full => write!(f, "full"), - } - } -} +pub mod common; const NUM_DOCS: u32 = 2_000_000; -pub fn generate_columnar(card: Card, num_docs: u32) -> ColumnarReader { - use tantivy_columnar::ColumnarWriter; - - let mut columnar_writer = ColumnarWriter::default(); - - match card { - Card::MultiSparse => { - columnar_writer.record_numerical(0, "price", 10u64); - columnar_writer.record_numerical(0, "price", 10u64); - } - _ => {} - } - - for i in 0..num_docs { - match card { - Card::MultiSparse | Card::Sparse => { - if i % 13 == 0 { - columnar_writer.record_numerical(i, "price", i as u64); - } - } - Card::Dense => { - if i % 12 == 0 { - columnar_writer.record_numerical(i, "price", i as u64); - } - } - Card::Full => { - columnar_writer.record_numerical(i, "price", i as u64); - } - Card::Multi => { - columnar_writer.record_numerical(i, "price", i as u64); - columnar_writer.record_numerical(i, "price", i as u64); - } - } - } - - let mut wrt: Vec = Vec::new(); - columnar_writer.serialize(num_docs, &mut wrt).unwrap(); - let reader = ColumnarReader::open(wrt).unwrap(); - reader -} - pub fn generate_columnar_and_open(card: Card, num_docs: u32) -> Column { - let reader = generate_columnar(card, num_docs); + let reader = generate_columnar_with_name(card, num_docs, "price"); reader.read_columns("price").unwrap()[0] .open_u64_lenient() .unwrap() @@ -116,9 +56,8 @@ fn bench_group(mut runner: InputGroup) { column.first_vals(&docs, &mut buffer); for val in buffer.iter() { - if let Some(val) = val { - sum += *val; - } + let Some(val) = val else { continue }; + sum += *val; } } diff --git a/columnar/benches/bench_merge.rs b/columnar/benches/bench_merge.rs index b30efd43a..9e6d33d41 100644 --- a/columnar/benches/bench_merge.rs +++ b/columnar/benches/bench_merge.rs @@ -1,7 +1,7 @@ -mod bench_access; +pub mod common; -use bench_access::{generate_columnar, Card}; use binggan::{black_box, BenchRunner}; +use common::{generate_columnar_with_name, Card}; use tantivy_columnar::*; const NUM_DOCS: u32 = 100_000; @@ -13,8 +13,8 @@ fn main() { inputs.push(( format!("merge_{card1}_and_{card2}"), vec![ - generate_columnar(card1, NUM_DOCS), - generate_columnar(card2, NUM_DOCS), + generate_columnar_with_name(card1, NUM_DOCS, "price"), + generate_columnar_with_name(card2, NUM_DOCS, "price"), ], )); }; diff --git a/columnar/benches/common.rs b/columnar/benches/common.rs new file mode 100644 index 000000000..b087b0ac8 --- /dev/null +++ b/columnar/benches/common.rs @@ -0,0 +1,59 @@ +extern crate tantivy_columnar; + +use core::fmt; +use std::fmt::{Display, Formatter}; + +use tantivy_columnar::{ColumnarReader, ColumnarWriter}; + +pub enum Card { + MultiSparse, + Multi, + Sparse, + Dense, + Full, +} +impl Display for Card { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + match self { + Card::MultiSparse => write!(f, 
"multi sparse 1/13"), + Card::Multi => write!(f, "multi 2x"), + Card::Sparse => write!(f, "sparse 1/13"), + Card::Dense => write!(f, "dense 1/12"), + Card::Full => write!(f, "full"), + } + } +} +pub fn generate_columnar_with_name(card: Card, num_docs: u32, column_name: &str) -> ColumnarReader { + let mut columnar_writer = ColumnarWriter::default(); + + if let Card::MultiSparse = card { + columnar_writer.record_numerical(0, column_name, 10u64); + columnar_writer.record_numerical(0, column_name, 10u64); + } + + for i in 0..num_docs { + match card { + Card::MultiSparse | Card::Sparse => { + if i % 13 == 0 { + columnar_writer.record_numerical(i, column_name, i as u64); + } + } + Card::Dense => { + if i % 12 == 0 { + columnar_writer.record_numerical(i, column_name, i as u64); + } + } + Card::Full => { + columnar_writer.record_numerical(i, column_name, i as u64); + } + Card::Multi => { + columnar_writer.record_numerical(i, column_name, i as u64); + columnar_writer.record_numerical(i, column_name, i as u64); + } + } + } + + let mut wrt: Vec = Vec::new(); + columnar_writer.serialize(num_docs, &mut wrt).unwrap(); + ColumnarReader::open(wrt).unwrap() +} diff --git a/columnar/compat_tests_data/v1.columnar b/columnar/compat_tests_data/v1.columnar index 512b4394e..58e7b9efb 100644 Binary files a/columnar/compat_tests_data/v1.columnar and b/columnar/compat_tests_data/v1.columnar differ diff --git a/columnar/compat_tests_data/v2.columnar b/columnar/compat_tests_data/v2.columnar new file mode 100644 index 000000000..78bbcf868 Binary files /dev/null and b/columnar/compat_tests_data/v2.columnar differ diff --git a/columnar/src/column/mod.rs b/columnar/src/column/mod.rs index dd6dc0f21..4349c7407 100644 --- a/columnar/src/column/mod.rs +++ b/columnar/src/column/mod.rs @@ -136,7 +136,7 @@ impl Column { .map(|value_row_id: RowId| self.values.get_val(value_row_id)) } - /// Get the docids of values which are in the provided value range. + /// Get the docids of values which are in the provided value and docid range. 
#[inline] pub fn get_docids_for_value_range( &self, diff --git a/columnar/src/column/serialize.rs b/columnar/src/column/serialize.rs index 4198487bb..73fc5e7f5 100644 --- a/columnar/src/column/serialize.rs +++ b/columnar/src/column/serialize.rs @@ -12,7 +12,7 @@ use crate::column_values::{ CodecType, MonotonicallyMappableToU128, MonotonicallyMappableToU64, }; use crate::iterable::Iterable; -use crate::StrColumn; +use crate::{StrColumn, Version}; pub fn serialize_column_mappable_to_u128( column_index: SerializableColumnIndex<'_>, @@ -40,25 +40,9 @@ pub fn serialize_column_mappable_to_u64( Ok(()) } -pub fn open_column_u64(bytes: OwnedBytes) -> io::Result> { - let (body, column_index_num_bytes_payload) = bytes.rsplit(4); - let column_index_num_bytes = u32::from_le_bytes( - column_index_num_bytes_payload - .as_slice() - .try_into() - .unwrap(), - ); - let (column_index_data, column_values_data) = body.split(column_index_num_bytes as usize); - let column_index = crate::column_index::open_column_index(column_index_data)?; - let column_values = load_u64_based_column_values(column_values_data)?; - Ok(Column { - index: column_index, - values: column_values, - }) -} - -pub fn open_column_u128( +pub fn open_column_u64( bytes: OwnedBytes, + format_version: Version, ) -> io::Result> { let (body, column_index_num_bytes_payload) = bytes.rsplit(4); let column_index_num_bytes = u32::from_le_bytes( @@ -68,7 +52,27 @@ pub fn open_column_u128( .unwrap(), ); let (column_index_data, column_values_data) = body.split(column_index_num_bytes as usize); - let column_index = crate::column_index::open_column_index(column_index_data)?; + let column_index = crate::column_index::open_column_index(column_index_data, format_version)?; + let column_values = load_u64_based_column_values(column_values_data)?; + Ok(Column { + index: column_index, + values: column_values, + }) +} + +pub fn open_column_u128( + bytes: OwnedBytes, + format_version: Version, +) -> io::Result> { + let (body, column_index_num_bytes_payload) = bytes.rsplit(4); + let column_index_num_bytes = u32::from_le_bytes( + column_index_num_bytes_payload + .as_slice() + .try_into() + .unwrap(), + ); + let (column_index_data, column_values_data) = body.split(column_index_num_bytes as usize); + let column_index = crate::column_index::open_column_index(column_index_data, format_version)?; let column_values = crate::column_values::open_u128_mapped(column_values_data)?; Ok(Column { index: column_index, @@ -79,7 +83,10 @@ pub fn open_column_u128( /// Open the column as u64. /// /// See [`open_u128_as_compact_u64`] for more details. 
-pub fn open_column_u128_as_compact_u64(bytes: OwnedBytes) -> io::Result> { +pub fn open_column_u128_as_compact_u64( + bytes: OwnedBytes, + format_version: Version, +) -> io::Result> { let (body, column_index_num_bytes_payload) = bytes.rsplit(4); let column_index_num_bytes = u32::from_le_bytes( column_index_num_bytes_payload @@ -88,7 +95,7 @@ pub fn open_column_u128_as_compact_u64(bytes: OwnedBytes) -> io::Result io::Result io::Result { +pub fn open_column_bytes(data: OwnedBytes, format_version: Version) -> io::Result { let (body, dictionary_len_bytes) = data.rsplit(4); let dictionary_len = u32::from_le_bytes(dictionary_len_bytes.as_slice().try_into().unwrap()); let (dictionary_bytes, column_bytes) = body.split(dictionary_len as usize); let dictionary = Arc::new(Dictionary::from_bytes(dictionary_bytes)?); - let term_ord_column = crate::column::open_column_u64::(column_bytes)?; + let term_ord_column = crate::column::open_column_u64::(column_bytes, format_version)?; Ok(BytesColumn { dictionary, term_ord_column, }) } -pub fn open_column_str(data: OwnedBytes) -> io::Result { - let bytes_column = open_column_bytes(data)?; +pub fn open_column_str(data: OwnedBytes, format_version: Version) -> io::Result { + let bytes_column = open_column_bytes(data, format_version)?; Ok(StrColumn::wrap(bytes_column)) } diff --git a/columnar/src/column_index/merge/mod.rs b/columnar/src/column_index/merge/mod.rs index 1aec9f71c..3c4c3df4f 100644 --- a/columnar/src/column_index/merge/mod.rs +++ b/columnar/src/column_index/merge/mod.rs @@ -95,8 +95,12 @@ pub fn merge_column_index<'a>( #[cfg(test)] mod tests { + use common::OwnedBytes; + use crate::column_index::merge::detect_cardinality; - use crate::column_index::multivalued_index::MultiValueIndex; + use crate::column_index::multivalued_index::{ + open_multivalued_index, serialize_multivalued_index, MultiValueIndex, + }; use crate::column_index::{merge_column_index, OptionalIndex, SerializableColumnIndex}; use crate::{ Cardinality, ColumnIndex, MergeRowOrder, RowAddr, RowId, ShuffleMergeOrder, StackMergeOrder, @@ -171,7 +175,11 @@ mod tests { let SerializableColumnIndex::Multivalued(start_index_iterable) = merged_column_index else { panic!("Excpected a multivalued index") }; - let start_indexes: Vec = start_index_iterable.boxed_iter().collect(); + let mut output = Vec::new(); + serialize_multivalued_index(&start_index_iterable, &mut output).unwrap(); + let multivalue = + open_multivalued_index(OwnedBytes::new(output), crate::Version::V2).unwrap(); + let start_indexes: Vec = multivalue.get_start_index_column().iter().collect(); assert_eq!(&start_indexes, &[0, 3, 5]); } @@ -200,11 +208,16 @@ mod tests { ], ) .into(); + let merged_column_index = merge_column_index(&column_indexes[..], &merge_row_order); let SerializableColumnIndex::Multivalued(start_index_iterable) = merged_column_index else { panic!("Excpected a multivalued index") }; - let start_indexes: Vec = start_index_iterable.boxed_iter().collect(); + let mut output = Vec::new(); + serialize_multivalued_index(&start_index_iterable, &mut output).unwrap(); + let multivalue = + open_multivalued_index(OwnedBytes::new(output), crate::Version::V2).unwrap(); + let start_indexes: Vec = multivalue.get_start_index_column().iter().collect(); assert_eq!(&start_indexes, &[0, 3, 5, 6]); } } diff --git a/columnar/src/column_index/merge/shuffled.rs b/columnar/src/column_index/merge/shuffled.rs index f93b89635..9a985b4b9 100644 --- a/columnar/src/column_index/merge/shuffled.rs +++ b/columnar/src/column_index/merge/shuffled.rs @@ 
-1,6 +1,8 @@ use std::iter; -use crate::column_index::{SerializableColumnIndex, Set}; +use crate::column_index::{ + SerializableColumnIndex, SerializableMultivalueIndex, SerializableOptionalIndex, Set, +}; use crate::iterable::Iterable; use crate::{Cardinality, ColumnIndex, RowId, ShuffleMergeOrder}; @@ -14,15 +16,24 @@ pub fn merge_column_index_shuffled<'a>( Cardinality::Optional => { let non_null_row_ids = merge_column_index_shuffled_optional(column_indexes, shuffle_merge_order); - SerializableColumnIndex::Optional { + SerializableColumnIndex::Optional(SerializableOptionalIndex { non_null_row_ids, num_rows: shuffle_merge_order.num_rows(), - } + }) } Cardinality::Multivalued => { - let multivalue_start_index = - merge_column_index_shuffled_multivalued(column_indexes, shuffle_merge_order); - SerializableColumnIndex::Multivalued(multivalue_start_index) + let non_null_row_ids = + merge_column_index_shuffled_optional(column_indexes, shuffle_merge_order); + SerializableColumnIndex::Multivalued(SerializableMultivalueIndex { + doc_ids_with_values: SerializableOptionalIndex { + non_null_row_ids, + num_rows: shuffle_merge_order.num_rows(), + }, + start_offsets: merge_column_index_shuffled_multivalued( + column_indexes, + shuffle_merge_order, + ), + }) } } } @@ -102,11 +113,18 @@ fn iter_num_values<'a>( /// Transforms an iterator containing the number of vals per row (with `num_rows` elements) /// into a `start_offset` iterator starting at 0 and (with `num_rows + 1` element) +/// +/// This will filter values with 0 values as these are covered by the optional index in the +/// multivalue index. fn integrate_num_vals(num_vals: impl Iterator) -> impl Iterator { - iter::once(0u32).chain(num_vals.scan(0, |state, num_vals| { - *state += num_vals; - Some(*state) - })) + iter::once(0u32).chain( + num_vals + .filter(|num_vals| *num_vals != 0) + .scan(0, |state, num_vals| { + *state += num_vals; + Some(*state) + }), + ) } impl<'a> Iterable for ShuffledMultivaluedIndex<'a> { @@ -134,7 +152,7 @@ mod tests { #[test] fn test_integrate_num_vals_several() { - assert!(integrate_num_vals([3, 0, 10, 20].into_iter()).eq([0, 3, 3, 13, 33].into_iter())); + assert!(integrate_num_vals([3, 0, 10, 20].into_iter()).eq([0, 3, 13, 33].into_iter())); } #[test] @@ -157,10 +175,10 @@ mod tests { Cardinality::Optional, &shuffle_merge_order, ); - let SerializableColumnIndex::Optional { + let SerializableColumnIndex::Optional(SerializableOptionalIndex { non_null_row_ids, num_rows, - } = serializable_index + }) = serializable_index else { panic!() }; diff --git a/columnar/src/column_index/merge/stacked.rs b/columnar/src/column_index/merge/stacked.rs index ba91b8d64..9c0890abe 100644 --- a/columnar/src/column_index/merge/stacked.rs +++ b/columnar/src/column_index/merge/stacked.rs @@ -1,6 +1,8 @@ -use std::iter; +use std::ops::Range; -use crate::column_index::{SerializableColumnIndex, Set}; +use crate::column_index::multivalued_index::{MultiValueIndex, SerializableMultivalueIndex}; +use crate::column_index::serialize::SerializableOptionalIndex; +use crate::column_index::SerializableColumnIndex; use crate::iterable::Iterable; use crate::{Cardinality, ColumnIndex, RowId, StackMergeOrder}; @@ -15,23 +17,149 @@ pub fn merge_column_index_stacked<'a>( ) -> SerializableColumnIndex<'a> { match cardinality_after_merge { Cardinality::Full => SerializableColumnIndex::Full, - Cardinality::Optional => SerializableColumnIndex::Optional { + Cardinality::Optional => SerializableColumnIndex::Optional(SerializableOptionalIndex { non_null_row_ids: 
Box::new(StackedOptionalIndex { columns, stack_merge_order, }), num_rows: stack_merge_order.num_rows(), - }, + }), Cardinality::Multivalued => { - let stacked_multivalued_index = StackedMultivaluedIndex { - columns, - stack_merge_order, - }; - SerializableColumnIndex::Multivalued(Box::new(stacked_multivalued_index)) + let serializable_multivalue_index = + make_serializable_multivalued_index(columns, stack_merge_order); + SerializableColumnIndex::Multivalued(serializable_multivalue_index) } } } +struct StackedDocIdsWithValues<'a> { + column_indexes: &'a [ColumnIndex], + stack_merge_order: &'a StackMergeOrder, +} + +impl Iterable for StackedDocIdsWithValues<'_> { + fn boxed_iter(&self) -> Box + '_> { + Box::new((0..self.column_indexes.len()).flat_map(|i| { + let column_index = &self.column_indexes[i]; + let doc_range = self.stack_merge_order.columnar_range(i); + get_doc_ids_with_values(column_index, doc_range) + })) + } +} + +fn get_doc_ids_with_values<'a>( + column_index: &'a ColumnIndex, + doc_range: Range, +) -> Box + 'a> { + match column_index { + ColumnIndex::Empty { .. } => Box::new(0..0), + ColumnIndex::Full => Box::new(doc_range), + ColumnIndex::Optional(optional_index) => Box::new( + optional_index + .iter_rows() + .map(move |row| row + doc_range.start), + ), + ColumnIndex::Multivalued(multivalued_index) => match multivalued_index { + MultiValueIndex::MultiValueIndexV1(multivalued_index) => { + Box::new((0..multivalued_index.num_docs()).filter_map(move |docid| { + let range = multivalued_index.range(docid); + if range.is_empty() { + None + } else { + Some(docid + doc_range.start) + } + })) + } + MultiValueIndex::MultiValueIndexV2(multivalued_index) => Box::new( + multivalued_index + .optional_index + .iter_rows() + .map(move |row| row + doc_range.start), + ), + }, + } +} + +fn stack_doc_ids_with_values<'a>( + column_indexes: &'a [ColumnIndex], + stack_merge_order: &'a StackMergeOrder, +) -> SerializableOptionalIndex<'a> { + let num_rows = stack_merge_order.num_rows(); + SerializableOptionalIndex { + non_null_row_ids: Box::new(StackedDocIdsWithValues { + column_indexes, + stack_merge_order, + }), + num_rows, + } +} + +struct StackedStartOffsets<'a> { + column_indexes: &'a [ColumnIndex], + stack_merge_order: &'a StackMergeOrder, +} + +fn get_num_values_iterator<'a>( + column_index: &'a ColumnIndex, + num_docs: u32, +) -> Box + 'a> { + match column_index { + ColumnIndex::Empty { .. 
} => Box::new(std::iter::empty()), + ColumnIndex::Full => Box::new(std::iter::repeat(1u32).take(num_docs as usize)), + ColumnIndex::Optional(optional_index) => { + Box::new(std::iter::repeat(1u32).take(optional_index.num_non_nulls() as usize)) + } + ColumnIndex::Multivalued(multivalued_index) => Box::new( + multivalued_index + .get_start_index_column() + .iter() + .scan(0u32, |previous_start_offset, current_start_offset| { + let num_vals = current_start_offset - *previous_start_offset; + *previous_start_offset = current_start_offset; + Some(num_vals) + }) + .skip(1), + ), + } +} + +impl<'a> Iterable for StackedStartOffsets<'a> { + fn boxed_iter(&self) -> Box + '_> { + let num_values_it = (0..self.column_indexes.len()).flat_map(|columnar_id| { + let num_docs = self.stack_merge_order.columnar_range(columnar_id).len() as u32; + let column_index = &self.column_indexes[columnar_id]; + get_num_values_iterator(column_index, num_docs) + }); + Box::new(std::iter::once(0u32).chain(num_values_it.into_iter().scan( + 0u32, + |cumulated, el| { + *cumulated += el; + Some(*cumulated) + }, + ))) + } +} + +fn stack_start_offsets<'a>( + column_indexes: &'a [ColumnIndex], + stack_merge_order: &'a StackMergeOrder, +) -> Box + 'a> { + Box::new(StackedStartOffsets { + column_indexes, + stack_merge_order, + }) +} + +fn make_serializable_multivalued_index<'a>( + columns: &'a [ColumnIndex], + stack_merge_order: &'a StackMergeOrder, +) -> SerializableMultivalueIndex<'a> { + SerializableMultivalueIndex { + doc_ids_with_values: stack_doc_ids_with_values(columns, stack_merge_order), + start_offsets: stack_start_offsets(columns, stack_merge_order), + } +} + struct StackedOptionalIndex<'a> { columns: &'a [ColumnIndex], stack_merge_order: &'a StackMergeOrder, @@ -62,87 +190,3 @@ impl<'a> Iterable for StackedOptionalIndex<'a> { ) } } - -#[derive(Clone, Copy)] -struct StackedMultivaluedIndex<'a> { - columns: &'a [ColumnIndex], - stack_merge_order: &'a StackMergeOrder, -} - -fn convert_column_opt_to_multivalued_index<'a>( - column_index_opt: &'a ColumnIndex, - num_rows: RowId, -) -> Box + 'a> { - match column_index_opt { - ColumnIndex::Empty { .. 
} => Box::new(iter::repeat(0u32).take(num_rows as usize + 1)), - ColumnIndex::Full => Box::new(0..num_rows + 1), - ColumnIndex::Optional(optional_index) => { - Box::new( - (0..num_rows) - // TODO optimize - .map(|row_id| optional_index.rank(row_id)) - .chain(std::iter::once(optional_index.num_non_nulls())), - ) - } - ColumnIndex::Multivalued(multivalued_index) => multivalued_index.start_index_column.iter(), - } -} - -impl<'a> Iterable for StackedMultivaluedIndex<'a> { - fn boxed_iter(&self) -> Box + '_> { - let multivalued_indexes = - self.columns - .iter() - .enumerate() - .map(|(columnar_id, column_opt)| { - let num_rows = - self.stack_merge_order.columnar_range(columnar_id).len() as RowId; - convert_column_opt_to_multivalued_index(column_opt, num_rows) - }); - stack_multivalued_indexes(multivalued_indexes) - } -} - -// Refactor me -fn stack_multivalued_indexes<'a>( - mut multivalued_indexes: impl Iterator + 'a>> + 'a, -) -> Box + 'a> { - let mut offset = 0; - let mut last_row_id = 0; - let mut current_it = multivalued_indexes.next(); - Box::new(std::iter::from_fn(move || loop { - if let Some(row_id) = current_it.as_mut()?.next() { - last_row_id = offset + row_id; - return Some(last_row_id); - } - offset = last_row_id; - loop { - current_it = multivalued_indexes.next(); - if current_it.as_mut()?.next().is_some() { - break; - } - } - })) -} - -#[cfg(test)] -mod tests { - use crate::RowId; - - fn it<'a>(row_ids: &'a [RowId]) -> Box + 'a> { - Box::new(row_ids.iter().copied()) - } - - #[test] - fn test_stack() { - let columns = [ - it(&[0u32, 0u32]), - it(&[0u32, 1u32, 1u32, 4u32]), - it(&[0u32, 3u32, 5u32]), - it(&[0u32, 4u32]), - ] - .into_iter(); - let start_offsets: Vec = super::stack_multivalued_indexes(columns).collect(); - assert_eq!(start_offsets, &[0, 0, 1, 1, 4, 7, 9, 13]); - } -} diff --git a/columnar/src/column_index/mod.rs b/columnar/src/column_index/mod.rs index f52e26ff4..1edfa16b2 100644 --- a/columnar/src/column_index/mod.rs +++ b/columnar/src/column_index/mod.rs @@ -11,8 +11,11 @@ mod serialize; use std::ops::Range; pub use merge::merge_column_index; +pub(crate) use multivalued_index::SerializableMultivalueIndex; pub use optional_index::{OptionalIndex, Set}; -pub use serialize::{open_column_index, serialize_column_index, SerializableColumnIndex}; +pub use serialize::{ + open_column_index, serialize_column_index, SerializableColumnIndex, SerializableOptionalIndex, +}; use crate::column_index::multivalued_index::MultiValueIndex; use crate::{Cardinality, DocId, RowId}; @@ -131,15 +134,41 @@ impl ColumnIndex { let row_end = optional_index.rank(doc_id_range.end); row_start..row_end } - ColumnIndex::Multivalued(multivalued_index) => { - let end_docid = doc_id_range.end.min(multivalued_index.num_docs() - 1) + 1; - let start_docid = doc_id_range.start.min(end_docid); + ColumnIndex::Multivalued(multivalued_index) => match multivalued_index { + MultiValueIndex::MultiValueIndexV1(index) => { + let row_start = index.start_index_column.get_val(doc_id_range.start); + let row_end = index.start_index_column.get_val(doc_id_range.end); + row_start..row_end + } + MultiValueIndex::MultiValueIndexV2(index) => { + // In this case we will use the optional_index select the next values + // that are valid. There are different cases to consider: + // Not exists below means does not exist in the optional + // index, because it has no values. + // * doc_id_range may cover a range of docids which are non existent + // => rank + // will give us the next document outside the range with a value. 
They both + // get the same rank and therefore return a zero range + // + // * doc_id_range.start and doc_id_range.end may not exist, but docids in + // between may have values + // => rank will give us the next document outside the range with a value. + // + // * doc_id_range.start may be not existent but doc_id_range.end may exist + // * doc_id_range.start may exist but doc_id_range.end may not exist + // * doc_id_range.start and doc_id_range.end may exist + // => rank on doc_id_range.end will give use the next value, which matches + // how the `start_index_column` works, so we get the value start of the next + // docid which we use to create the exclusive range. + // + let rank_start = index.optional_index.rank(doc_id_range.start); + let row_start = index.start_index_column.get_val(rank_start); + let rank_end = index.optional_index.rank(doc_id_range.end); + let row_end = index.start_index_column.get_val(rank_end); - let row_start = multivalued_index.start_index_column.get_val(start_docid); - let row_end = multivalued_index.start_index_column.get_val(end_docid); - - row_start..row_end - } + row_start..row_end + } + }, } } diff --git a/columnar/src/column_index/multivalued_index.rs b/columnar/src/column_index/multivalued_index.rs index eab82a3e3..cef5a1221 100644 --- a/columnar/src/column_index/multivalued_index.rs +++ b/columnar/src/column_index/multivalued_index.rs @@ -3,64 +3,98 @@ use std::io::Write; use std::ops::Range; use std::sync::Arc; -use common::OwnedBytes; +use common::{CountingWriter, OwnedBytes}; +use super::optional_index::{open_optional_index, serialize_optional_index}; +use super::{OptionalIndex, SerializableOptionalIndex, Set}; use crate::column_values::{ load_u64_based_column_values, serialize_u64_based_column_values, CodecType, ColumnValues, }; use crate::iterable::Iterable; -use crate::{DocId, RowId}; +use crate::{DocId, RowId, Version}; + +pub struct SerializableMultivalueIndex<'a> { + pub doc_ids_with_values: SerializableOptionalIndex<'a>, + pub start_offsets: Box + 'a>, +} pub fn serialize_multivalued_index( - multivalued_index: &dyn Iterable, + multivalued_index: &SerializableMultivalueIndex, output: &mut impl Write, ) -> io::Result<()> { + let SerializableMultivalueIndex { + doc_ids_with_values, + start_offsets, + } = multivalued_index; + let mut count_writer = CountingWriter::wrap(output); + let SerializableOptionalIndex { + non_null_row_ids, + num_rows, + } = doc_ids_with_values; + serialize_optional_index(&**non_null_row_ids, *num_rows, &mut count_writer)?; + let optional_len = count_writer.written_bytes() as u32; + let output = count_writer.finish(); serialize_u64_based_column_values( - multivalued_index, + &**start_offsets, &[CodecType::Bitpacked, CodecType::Linear], output, )?; + output.write_all(&optional_len.to_le_bytes())?; Ok(()) } -pub fn open_multivalued_index(bytes: OwnedBytes) -> io::Result { - let start_index_column: Arc> = load_u64_based_column_values(bytes)?; - Ok(MultiValueIndex { start_index_column }) +pub fn open_multivalued_index( + bytes: OwnedBytes, + format_version: Version, +) -> io::Result { + match format_version { + Version::V1 => { + let start_index_column: Arc> = + load_u64_based_column_values(bytes)?; + Ok(MultiValueIndex::MultiValueIndexV1(MultiValueIndexV1 { + start_index_column, + })) + } + Version::V2 => { + let (body_bytes, optional_index_len) = bytes.rsplit(4); + let optional_index_len = + u32::from_le_bytes(optional_index_len.as_slice().try_into().unwrap()); + let (optional_index_bytes, start_index_bytes) = + 
body_bytes.split(optional_index_len as usize); + let optional_index = open_optional_index(optional_index_bytes)?; + let start_index_column: Arc> = + load_u64_based_column_values(start_index_bytes)?; + Ok(MultiValueIndex::MultiValueIndexV2(MultiValueIndexV2 { + optional_index, + start_index_column, + })) + } + } } #[derive(Clone)] /// Index to resolve value range for given doc_id. /// Starts at 0. -pub struct MultiValueIndex { +pub enum MultiValueIndex { + MultiValueIndexV1(MultiValueIndexV1), + MultiValueIndexV2(MultiValueIndexV2), +} + +#[derive(Clone)] +/// Index to resolve value range for given doc_id. +/// Starts at 0. +pub struct MultiValueIndexV1 { pub start_index_column: Arc>, } -impl std::fmt::Debug for MultiValueIndex { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - f.debug_struct("MultiValuedIndex") - .field("num_rows", &self.start_index_column.num_vals()) - .finish_non_exhaustive() - } -} - -impl From>> for MultiValueIndex { - fn from(start_index_column: Arc>) -> Self { - MultiValueIndex { start_index_column } - } -} - -impl MultiValueIndex { - pub fn for_test(start_offsets: &[RowId]) -> MultiValueIndex { - let mut buffer = Vec::new(); - serialize_multivalued_index(&start_offsets, &mut buffer).unwrap(); - let bytes = OwnedBytes::new(buffer); - open_multivalued_index(bytes).unwrap() - } - +impl MultiValueIndexV1 { /// Returns `[start, end)`, such that the values associated with /// the given document are `start..end`. #[inline] pub(crate) fn range(&self, doc_id: DocId) -> Range { + if doc_id >= self.num_docs() { + return 0..0; + } let start = self.start_index_column.get_val(doc_id); let end = self.start_index_column.get_val(doc_id + 1); start..end @@ -83,7 +117,6 @@ impl MultiValueIndex { /// /// TODO: Instead of a linear scan we can employ a exponential search into binary search to /// match a docid to its value position. - #[allow(clippy::bool_to_int_with_if)] pub(crate) fn select_batch_in_place(&self, docid_start: DocId, ranks: &mut Vec) { if ranks.is_empty() { return; @@ -111,11 +144,170 @@ impl MultiValueIndex { } } +#[derive(Clone)] +/// Index to resolve value range for given doc_id. +/// Starts at 0. 
+pub struct MultiValueIndexV2 { + pub optional_index: OptionalIndex, + pub start_index_column: Arc>, +} + +impl std::fmt::Debug for MultiValueIndex { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + let index = match self { + MultiValueIndex::MultiValueIndexV1(idx) => &idx.start_index_column, + MultiValueIndex::MultiValueIndexV2(idx) => &idx.start_index_column, + }; + f.debug_struct("MultiValuedIndex") + .field("num_rows", &index.num_vals()) + .finish_non_exhaustive() + } +} + +impl MultiValueIndex { + pub fn for_test(start_offsets: &[RowId]) -> MultiValueIndex { + assert!(!start_offsets.is_empty()); + assert_eq!(start_offsets[0], 0); + let mut doc_with_values = Vec::new(); + let mut compact_start_offsets: Vec = vec![0]; + for doc in 0..start_offsets.len() - 1 { + if start_offsets[doc] < start_offsets[doc + 1] { + doc_with_values.push(doc as RowId); + compact_start_offsets.push(start_offsets[doc + 1]); + } + } + let serializable_multivalued_index = SerializableMultivalueIndex { + doc_ids_with_values: SerializableOptionalIndex { + non_null_row_ids: Box::new(&doc_with_values[..]), + num_rows: start_offsets.len() as u32 - 1, + }, + start_offsets: Box::new(&compact_start_offsets[..]), + }; + let mut buffer = Vec::new(); + serialize_multivalued_index(&serializable_multivalued_index, &mut buffer).unwrap(); + let bytes = OwnedBytes::new(buffer); + open_multivalued_index(bytes, Version::V2).unwrap() + } + + pub fn get_start_index_column(&self) -> &Arc> { + match self { + MultiValueIndex::MultiValueIndexV1(idx) => &idx.start_index_column, + MultiValueIndex::MultiValueIndexV2(idx) => &idx.start_index_column, + } + } + + /// Returns `[start, end)` values range, such that the values associated with + /// the given document are `start..end`. + #[inline] + pub(crate) fn range(&self, doc_id: DocId) -> Range { + match self { + MultiValueIndex::MultiValueIndexV1(idx) => idx.range(doc_id), + MultiValueIndex::MultiValueIndexV2(idx) => idx.range(doc_id), + } + } + + /// Returns the number of documents in the index. + #[inline] + pub fn num_docs(&self) -> u32 { + match self { + MultiValueIndex::MultiValueIndexV1(idx) => idx.start_index_column.num_vals() - 1, + MultiValueIndex::MultiValueIndexV2(idx) => idx.optional_index.num_docs(), + } + } + + /// Converts a list of ranks (row ids of values) in a 1:n index to the corresponding list of + /// docids. Positions are converted inplace to docids. + /// + /// Since there is no index for value pos -> docid, but docid -> value pos range, we scan the + /// index. + /// + /// Correctness: positions needs to be sorted. idx_reader needs to contain monotonically + /// increasing positions. + /// + /// TODO: Instead of a linear scan we can employ a exponential search into binary search to + /// match a docid to its value position. + pub(crate) fn select_batch_in_place(&self, docid_start: DocId, ranks: &mut Vec) { + match self { + MultiValueIndex::MultiValueIndexV1(idx) => { + idx.select_batch_in_place(docid_start, ranks) + } + MultiValueIndex::MultiValueIndexV2(idx) => { + idx.select_batch_in_place(docid_start, ranks) + } + } + } +} +impl MultiValueIndexV2 { + /// Returns `[start, end)`, such that the values associated with + /// the given document are `start..end`. 
+ #[inline] + pub(crate) fn range(&self, doc_id: DocId) -> Range { + let Some(rank) = self.optional_index.rank_if_exists(doc_id) else { + return 0..0; + }; + let start = self.start_index_column.get_val(rank); + let end = self.start_index_column.get_val(rank + 1); + start..end + } + + /// Returns the number of documents in the index. + #[inline] + pub fn num_docs(&self) -> u32 { + self.optional_index.num_docs() + } + + /// Converts a list of ranks (row ids of values) in a 1:n index to the corresponding list of + /// docids. Positions are converted inplace to docids. + /// + /// Since there is no index for value pos -> docid, but docid -> value pos range, we scan the + /// index. + /// + /// Correctness: positions needs to be sorted. idx_reader needs to contain monotonically + /// increasing positions. + /// + /// TODO: Instead of a linear scan we can employ a exponential search into binary search to + /// match a docid to its value position. + pub(crate) fn select_batch_in_place(&self, docid_start: DocId, ranks: &mut Vec) { + if ranks.is_empty() { + return; + } + let mut cur_pos_in_idx = self.optional_index.rank(docid_start); + let mut last_doc = None; + + assert!(cur_pos_in_idx <= ranks[0]); + + let mut write_doc_pos = 0; + for i in 0..ranks.len() { + let pos = ranks[i]; + loop { + let end = self.start_index_column.get_val(cur_pos_in_idx + 1); + if end > pos { + ranks[write_doc_pos] = cur_pos_in_idx; + write_doc_pos += if last_doc == Some(cur_pos_in_idx) { + 0 + } else { + 1 + }; + last_doc = Some(cur_pos_in_idx); + break; + } + cur_pos_in_idx += 1; + } + } + ranks.truncate(write_doc_pos); + + for rank in ranks.iter_mut() { + *rank = self.optional_index.select(*rank); + } + } +} + #[cfg(test)] mod tests { use std::ops::Range; use super::MultiValueIndex; + use crate::{ColumnarReader, DynamicColumn}; fn index_to_pos_helper( index: &MultiValueIndex, @@ -134,6 +326,7 @@ mod tests { let positions = &[10u32, 11, 15, 20, 21, 22]; assert_eq!(index_to_pos_helper(&index, 0..5, positions), vec![1, 3, 4]); assert_eq!(index_to_pos_helper(&index, 1..5, positions), vec![1, 3, 4]); + assert_eq!(index_to_pos_helper(&index, 0..5, &[9]), vec![0]); assert_eq!(index_to_pos_helper(&index, 1..5, &[10]), vec![1]); assert_eq!(index_to_pos_helper(&index, 1..5, &[11]), vec![1]); @@ -141,4 +334,67 @@ mod tests { assert_eq!(index_to_pos_helper(&index, 2..5, &[12, 14]), vec![2]); assert_eq!(index_to_pos_helper(&index, 2..5, &[12, 14, 15]), vec![2, 3]); } + + #[test] + fn test_range_to_rowids() { + use crate::ColumnarWriter; + + let mut columnar_writer = ColumnarWriter::default(); + + // This column gets coerced to u64 + columnar_writer.record_numerical(1, "full", u64::MAX); + columnar_writer.record_numerical(1, "full", u64::MAX); + + columnar_writer.record_numerical(5, "full", u64::MAX); + columnar_writer.record_numerical(5, "full", u64::MAX); + + let mut wrt: Vec = Vec::new(); + columnar_writer.serialize(7, &mut wrt).unwrap(); + + let reader = ColumnarReader::open(wrt).unwrap(); + // Open the column as u64 + let column = reader.read_columns("full").unwrap()[0] + .open() + .unwrap() + .coerce_numerical(crate::NumericalType::U64) + .unwrap(); + let DynamicColumn::U64(column) = column else { + panic!(); + }; + + let row_id_range = column.index.docid_range_to_rowids(1..2); + assert_eq!(row_id_range, 0..2); + + let row_id_range = column.index.docid_range_to_rowids(0..2); + assert_eq!(row_id_range, 0..2); + + let row_id_range = column.index.docid_range_to_rowids(0..4); + assert_eq!(row_id_range, 0..2); + + let row_id_range 
= column.index.docid_range_to_rowids(3..4); + assert_eq!(row_id_range, 2..2); + + let row_id_range = column.index.docid_range_to_rowids(1..6); + assert_eq!(row_id_range, 0..4); + + let row_id_range = column.index.docid_range_to_rowids(3..6); + assert_eq!(row_id_range, 2..4); + + let row_id_range = column.index.docid_range_to_rowids(0..6); + assert_eq!(row_id_range, 0..4); + + let row_id_range = column.index.docid_range_to_rowids(0..6); + assert_eq!(row_id_range, 0..4); + + let check = |range, expected| { + let full_range = 0..=u64::MAX; + let mut docids = Vec::new(); + column.get_docids_for_value_range(full_range, range, &mut docids); + assert_eq!(docids, expected); + }; + + // check(0..1, vec![]); + // check(0..2, vec![1]); + check(1..2, vec![1]); + } } diff --git a/columnar/src/column_index/optional_index/mod.rs b/columnar/src/column_index/optional_index/mod.rs index bd12ade19..a615f78a0 100644 --- a/columnar/src/column_index/optional_index/mod.rs +++ b/columnar/src/column_index/optional_index/mod.rs @@ -86,8 +86,14 @@ pub struct OptionalIndex { block_metas: Arc<[BlockMeta]>, } +impl<'a> Iterable for &'a OptionalIndex { + fn boxed_iter(&self) -> Box + '_> { + Box::new(self.iter_rows()) + } +} + impl std::fmt::Debug for OptionalIndex { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { f.debug_struct("OptionalIndex") .field("num_rows", &self.num_rows) .field("num_non_null_rows", &self.num_non_null_rows) diff --git a/columnar/src/column_index/optional_index/set.rs b/columnar/src/column_index/optional_index/set.rs index b2bf9cbe2..25f90596f 100644 --- a/columnar/src/column_index/optional_index/set.rs +++ b/columnar/src/column_index/optional_index/set.rs @@ -28,10 +28,11 @@ pub trait Set { /// Returns true if the elements is contained in the Set fn contains(&self, el: T) -> bool; - /// Returns the number of rows in the set that are < `el` + /// Returns the element's rank (its position in the set). + /// If the set does not contain the element, it will return the next existing elements rank. fn rank(&self, el: T) -> T; - /// If the set contains `el` returns the element rank. + /// If the set contains `el`, returns the element's rank (its position in the set). /// If the set does not contain the element, it returns `None`. 
fn rank_if_exists(&self, el: T) -> Option; diff --git a/columnar/src/column_index/optional_index/set_block/tests.rs b/columnar/src/column_index/optional_index/set_block/tests.rs index 8957ada25..b6fe5ba4d 100644 --- a/columnar/src/column_index/optional_index/set_block/tests.rs +++ b/columnar/src/column_index/optional_index/set_block/tests.rs @@ -22,8 +22,8 @@ fn test_set_helper>(vals: &[u16]) -> usize { vals.iter().cloned().take_while(|v| *v < val).count() as u16 ); } - for rank in 0..vals.len() { - assert_eq!(tested_set.select(rank as u16), vals[rank]); + for (rank, val) in vals.iter().enumerate() { + assert_eq!(tested_set.select(rank as u16), *val); } buffer.len() } @@ -107,3 +107,41 @@ fn test_simple_translate_codec_idx_to_original_idx_dense() { assert_eq!(i, select_cursor.select(i)); } } + +#[test] +fn test_simple_translate_idx_to_value_idx_dense() { + let mut buffer = Vec::new(); + DenseBlockCodec::serialize([1, 10].iter().copied(), &mut buffer).unwrap(); + let tested_set = DenseBlockCodec::open(buffer.as_slice()); + assert!(tested_set.contains(1)); + assert!(!tested_set.contains(2)); + assert_eq!(tested_set.rank(0), 0); + assert_eq!(tested_set.rank(1), 0); + for rank in 2..10 { + // ranks that don't exist select the next highest one + assert_eq!(tested_set.rank_if_exists(rank), None); + assert_eq!(tested_set.rank(rank), 1); + } + assert_eq!(tested_set.rank(10), 1); +} + +#[test] +fn test_simple_translate_idx_to_value_idx_sparse() { + let mut buffer = Vec::new(); + SparseBlockCodec::serialize([1, 10].iter().copied(), &mut buffer).unwrap(); + let tested_set = SparseBlockCodec::open(buffer.as_slice()); + assert!(tested_set.contains(1)); + assert!(!tested_set.contains(2)); + assert_eq!(tested_set.rank(0), 0); + assert_eq!(tested_set.select(tested_set.rank(0)), 1); + assert_eq!(tested_set.rank(1), 0); + assert_eq!(tested_set.select(tested_set.rank(1)), 1); + for rank in 2..10 { + // ranks that don't exist select the next highest one + assert_eq!(tested_set.rank_if_exists(rank), None); + assert_eq!(tested_set.rank(rank), 1); + assert_eq!(tested_set.select(tested_set.rank(rank)), 10); + } + assert_eq!(tested_set.rank(10), 1); + assert_eq!(tested_set.select(tested_set.rank(10)), 10); +} diff --git a/columnar/src/column_index/serialize.rs b/columnar/src/column_index/serialize.rs index f2a99c740..107faf0dd 100644 --- a/columnar/src/column_index/serialize.rs +++ b/columnar/src/column_index/serialize.rs @@ -3,28 +3,39 @@ use std::io::Write; use common::{CountingWriter, OwnedBytes}; +use super::multivalued_index::SerializableMultivalueIndex; +use super::OptionalIndex; use crate::column_index::multivalued_index::serialize_multivalued_index; use crate::column_index::optional_index::serialize_optional_index; use crate::column_index::ColumnIndex; use crate::iterable::Iterable; -use crate::{Cardinality, RowId}; +use crate::{Cardinality, RowId, Version}; + +pub struct SerializableOptionalIndex<'a> { + pub non_null_row_ids: Box + 'a>, + pub num_rows: RowId, +} + +impl<'a> From<&'a OptionalIndex> for SerializableOptionalIndex<'a> { + fn from(optional_index: &'a OptionalIndex) -> Self { + SerializableOptionalIndex { + non_null_row_ids: Box::new(optional_index), + num_rows: optional_index.num_docs(), + } + } +} pub enum SerializableColumnIndex<'a> { Full, - Optional { - non_null_row_ids: Box + 'a>, - num_rows: RowId, - }, - // TODO remove the Arc apart from serialization this is not - // dynamic at all. 
- Multivalued(Box + 'a>), + Optional(SerializableOptionalIndex<'a>), + Multivalued(SerializableMultivalueIndex<'a>), } impl<'a> SerializableColumnIndex<'a> { pub fn get_cardinality(&self) -> Cardinality { match self { SerializableColumnIndex::Full => Cardinality::Full, - SerializableColumnIndex::Optional { .. } => Cardinality::Optional, + SerializableColumnIndex::Optional(_) => Cardinality::Optional, SerializableColumnIndex::Multivalued(_) => Cardinality::Multivalued, } } @@ -40,12 +51,12 @@ pub fn serialize_column_index( output.write_all(&[cardinality])?; match column_index { SerializableColumnIndex::Full => {} - SerializableColumnIndex::Optional { + SerializableColumnIndex::Optional(SerializableOptionalIndex { non_null_row_ids, num_rows, - } => serialize_optional_index(non_null_row_ids.as_ref(), num_rows, &mut output)?, + }) => serialize_optional_index(non_null_row_ids.as_ref(), num_rows, &mut output)?, SerializableColumnIndex::Multivalued(multivalued_index) => { - serialize_multivalued_index(&*multivalued_index, &mut output)? + serialize_multivalued_index(&multivalued_index, &mut output)? } } let column_index_num_bytes = output.written_bytes() as u32; @@ -53,7 +64,10 @@ pub fn serialize_column_index( } /// Open a serialized column index. -pub fn open_column_index(mut bytes: OwnedBytes) -> io::Result { +pub fn open_column_index( + mut bytes: OwnedBytes, + format_version: Version, +) -> io::Result { if bytes.is_empty() { return Err(io::Error::new( io::ErrorKind::UnexpectedEof, @@ -70,7 +84,8 @@ pub fn open_column_index(mut bytes: OwnedBytes) -> io::Result { Ok(ColumnIndex::Optional(optional_index)) } Cardinality::Multivalued => { - let multivalue_index = super::multivalued_index::open_multivalued_index(bytes)?; + let multivalue_index = + super::multivalued_index::open_multivalued_index(bytes, format_version)?; Ok(ColumnIndex::Multivalued(multivalue_index)) } } diff --git a/columnar/src/columnar/format_version.rs b/columnar/src/columnar/format_version.rs index a46913be8..1314fb754 100644 --- a/columnar/src/columnar/format_version.rs +++ b/columnar/src/columnar/format_version.rs @@ -11,7 +11,7 @@ const MAGIC_BYTES: [u8; 4] = [2, 113, 119, 66]; pub fn footer() -> [u8; VERSION_FOOTER_NUM_BYTES] { let mut footer_bytes = [0u8; VERSION_FOOTER_NUM_BYTES]; - footer_bytes[0..4].copy_from_slice(&Version::V1.to_bytes()); + footer_bytes[0..4].copy_from_slice(&CURRENT_VERSION.to_bytes()); footer_bytes[4..8].copy_from_slice(&MAGIC_BYTES[..]); footer_bytes } @@ -23,18 +23,20 @@ pub fn parse_footer(footer_bytes: [u8; VERSION_FOOTER_NUM_BYTES]) -> Result fmt::Result { match self { Version::V1 => write!(f, "v1"), + Version::V2 => write!(f, "v2"), } } } @@ -48,6 +50,7 @@ impl Version { let code = u32::from_le_bytes(bytes); match code { 1u32 => Ok(Version::V1), + 2u32 => Ok(Version::V2), _ => Err(InvalidData), } } @@ -60,9 +63,9 @@ mod tests { use super::*; #[test] - fn test_footer_dserialization() { + fn test_footer_deserialization() { let parsed_version: Version = parse_footer(footer()).unwrap(); - assert_eq!(Version::V1, parsed_version); + assert_eq!(Version::V2, parsed_version); } #[test] @@ -76,11 +79,10 @@ mod tests { for &i in &version_to_tests { let version_res = Version::try_from_bytes(i.to_le_bytes()); if let Ok(version) = version_res { - assert_eq!(version, Version::V1); assert_eq!(version.to_bytes(), i.to_le_bytes()); valid_versions.insert(i); } } - assert_eq!(valid_versions.len(), 1); + assert_eq!(valid_versions.len(), 2); } } diff --git a/columnar/src/columnar/merge/merge_mapping.rs 
b/columnar/src/columnar/merge/merge_mapping.rs index 078ed44bb..842886182 100644 --- a/columnar/src/columnar/merge/merge_mapping.rs +++ b/columnar/src/columnar/merge/merge_mapping.rs @@ -59,7 +59,6 @@ pub enum MergeRowOrder { Stack(StackMergeOrder), /// Some more complex mapping, that may interleaves rows from the different readers and /// drop rows, or do both. - /// TODO: remove ordering part here Shuffled(ShuffleMergeOrder), } diff --git a/columnar/src/columnar/merge/mod.rs b/columnar/src/columnar/merge/mod.rs index 9f7666e8f..d970d6861 100644 --- a/columnar/src/columnar/merge/mod.rs +++ b/columnar/src/columnar/merge/mod.rs @@ -7,7 +7,6 @@ use std::io; use std::net::Ipv6Addr; use std::sync::Arc; -use itertools::Itertools; pub use merge_mapping::{MergeRowOrder, ShuffleMergeOrder, StackMergeOrder}; use super::writer::ColumnarSerializer; @@ -371,20 +370,8 @@ fn is_empty_after_merge( true } ColumnIndex::Multivalued(multivalued_index) => { - for (doc_id, (start_index, end_index)) in multivalued_index - .start_index_column - .iter() - .tuple_windows() - .enumerate() - { - let doc_id = doc_id as u32; - if start_index == end_index { - // There are no values in this document - continue; - } - // The document contains values and is present in the alive bitset. - // The column is therefore not empty. - if alive_bitset.contains(doc_id) { + for alive_docid in alive_bitset.iter() { + if !multivalued_index.range(alive_docid).is_empty() { return false; } } diff --git a/columnar/src/columnar/merge/tests.rs b/columnar/src/columnar/merge/tests.rs index 697fe3d24..1419bf2c9 100644 --- a/columnar/src/columnar/merge/tests.rs +++ b/columnar/src/columnar/merge/tests.rs @@ -1,3 +1,5 @@ +use itertools::Itertools; + use super::*; use crate::{Cardinality, ColumnarWriter, HasAssociatedColumnType, RowId}; diff --git a/columnar/src/columnar/reader/mod.rs b/columnar/src/columnar/reader/mod.rs index 23af3f0ee..f850e4f6f 100644 --- a/columnar/src/columnar/reader/mod.rs +++ b/columnar/src/columnar/reader/mod.rs @@ -6,7 +6,7 @@ use sstable::{Dictionary, RangeSSTable}; use crate::columnar::{format_version, ColumnType}; use crate::dynamic_column::DynamicColumnHandle; -use crate::RowId; +use crate::{RowId, Version}; fn io_invalid_data(msg: String) -> io::Error { io::Error::new(io::ErrorKind::InvalidData, msg) @@ -19,6 +19,7 @@ pub struct ColumnarReader { column_dictionary: Dictionary, column_data: FileSlice, num_rows: RowId, + format_version: Version, } impl fmt::Debug for ColumnarReader { @@ -53,6 +54,7 @@ impl fmt::Debug for ColumnarReader { fn read_all_columns_in_stream( mut stream: sstable::Streamer<'_, RangeSSTable>, column_data: &FileSlice, + format_version: Version, ) -> io::Result> { let mut results = Vec::new(); while stream.advance() { @@ -67,6 +69,7 @@ fn read_all_columns_in_stream( let dynamic_column_handle = DynamicColumnHandle { file_slice, column_type, + format_version, }; results.push(dynamic_column_handle); } @@ -88,7 +91,7 @@ impl ColumnarReader { let num_rows = u32::deserialize(&mut &footer_bytes[8..12])?; let version_footer_bytes: [u8; format_version::VERSION_FOOTER_NUM_BYTES] = footer_bytes[12..].try_into().unwrap(); - let _version = format_version::parse_footer(version_footer_bytes)?; + let format_version = format_version::parse_footer(version_footer_bytes)?; let (column_data, sstable) = file_slice_without_sstable_len.split_from_end(sstable_len as usize); let column_dictionary = Dictionary::open(sstable)?; @@ -96,6 +99,7 @@ impl ColumnarReader { column_dictionary, column_data, num_rows, + 
format_version, }) } @@ -126,6 +130,7 @@ impl ColumnarReader { let column_handle = DynamicColumnHandle { file_slice, column_type, + format_version: self.format_version, }; Some((column_name, column_handle)) } else { @@ -167,7 +172,7 @@ impl ColumnarReader { .stream_for_column_range(column_name) .into_stream_async() .await?; - read_all_columns_in_stream(stream, &self.column_data) + read_all_columns_in_stream(stream, &self.column_data, self.format_version) } /// Get all columns for the given column name. @@ -176,7 +181,7 @@ impl ColumnarReader { /// different types. pub fn read_columns(&self, column_name: &str) -> io::Result> { let stream = self.stream_for_column_range(column_name).into_stream()?; - read_all_columns_in_stream(stream, &self.column_data) + read_all_columns_in_stream(stream, &self.column_data, self.format_version) } /// Return the number of columns in the columnar. diff --git a/columnar/src/columnar/writer/mod.rs b/columnar/src/columnar/writer/mod.rs index d5fda430c..239a7422d 100644 --- a/columnar/src/columnar/writer/mod.rs +++ b/columnar/src/columnar/writer/mod.rs @@ -12,7 +12,7 @@ use common::CountingWriter; pub(crate) use serializer::ColumnarSerializer; use stacker::{Addr, ArenaHashMap, MemoryArena}; -use crate::column_index::SerializableColumnIndex; +use crate::column_index::{SerializableColumnIndex, SerializableOptionalIndex}; use crate::column_values::{MonotonicallyMappableToU128, MonotonicallyMappableToU64}; use crate::columnar::column_type::ColumnType; use crate::columnar::writer::column_writers::{ @@ -554,16 +554,16 @@ fn send_to_serialize_column_mappable_to_u128< let optional_index_builder = value_index_builders.borrow_optional_index_builder(); consume_operation_iterator(op_iterator, optional_index_builder, values); let optional_index = optional_index_builder.finish(num_rows); - SerializableColumnIndex::Optional { + SerializableColumnIndex::Optional(SerializableOptionalIndex { num_rows, non_null_row_ids: Box::new(optional_index), - } + }) } Cardinality::Multivalued => { let multivalued_index_builder = value_index_builders.borrow_multivalued_index_builder(); consume_operation_iterator(op_iterator, multivalued_index_builder, values); - let multivalued_index = multivalued_index_builder.finish(num_rows); - SerializableColumnIndex::Multivalued(Box::new(multivalued_index)) + let serializable_multivalued_index = multivalued_index_builder.finish(num_rows); + SerializableColumnIndex::Multivalued(serializable_multivalued_index) } }; crate::column::serialize_column_mappable_to_u128( @@ -574,15 +574,6 @@ fn send_to_serialize_column_mappable_to_u128< Ok(()) } -fn sort_values_within_row_in_place(multivalued_index: &[RowId], values: &mut [u64]) { - let mut start_index: usize = 0; - for end_index in multivalued_index.iter().copied() { - let end_index = end_index as usize; - values[start_index..end_index].sort_unstable(); - start_index = end_index; - } -} - fn send_to_serialize_column_mappable_to_u64( op_iterator: impl Iterator>, cardinality: Cardinality, @@ -606,19 +597,22 @@ fn send_to_serialize_column_mappable_to_u64( let optional_index_builder = value_index_builders.borrow_optional_index_builder(); consume_operation_iterator(op_iterator, optional_index_builder, values); let optional_index = optional_index_builder.finish(num_rows); - SerializableColumnIndex::Optional { + SerializableColumnIndex::Optional(SerializableOptionalIndex { non_null_row_ids: Box::new(optional_index), num_rows, - } + }) } Cardinality::Multivalued => { let multivalued_index_builder = 
value_index_builders.borrow_multivalued_index_builder(); consume_operation_iterator(op_iterator, multivalued_index_builder, values); - let multivalued_index = multivalued_index_builder.finish(num_rows); + let serializable_multivalued_index = multivalued_index_builder.finish(num_rows); if sort_values_within_row { - sort_values_within_row_in_place(multivalued_index, values); + sort_values_within_row_in_place( + serializable_multivalued_index.start_offsets.boxed_iter(), + values, + ); } - SerializableColumnIndex::Multivalued(Box::new(multivalued_index)) + SerializableColumnIndex::Multivalued(serializable_multivalued_index) } }; crate::column::serialize_column_mappable_to_u64( @@ -629,6 +623,18 @@ fn send_to_serialize_column_mappable_to_u64( Ok(()) } +fn sort_values_within_row_in_place( + multivalued_index: impl Iterator, + values: &mut [u64], +) { + let mut start_index: usize = 0; + for end_index in multivalued_index { + let end_index = end_index as usize; + values[start_index..end_index].sort_unstable(); + start_index = end_index; + } +} + fn coerce_numerical_symbol( operation_iterator: impl Iterator>, ) -> impl Iterator> diff --git a/columnar/src/columnar/writer/value_index.rs b/columnar/src/columnar/writer/value_index.rs index ab57a7a7f..a35432e3a 100644 --- a/columnar/src/columnar/writer/value_index.rs +++ b/columnar/src/columnar/writer/value_index.rs @@ -1,3 +1,4 @@ +use crate::column_index::{SerializableMultivalueIndex, SerializableOptionalIndex}; use crate::iterable::Iterable; use crate::RowId; @@ -59,31 +60,47 @@ impl IndexBuilder for OptionalIndexBuilder { #[derive(Default)] pub struct MultivaluedIndexBuilder { - start_offsets: Vec, + doc_with_values: Vec, + start_offsets: Vec, total_num_vals_seen: u32, + current_row: RowId, + current_row_has_value: bool, } impl MultivaluedIndexBuilder { - pub fn finish(&mut self, num_docs: RowId) -> &[u32] { - self.start_offsets - .resize(num_docs as usize + 1, self.total_num_vals_seen); - &self.start_offsets[..] 
+ pub fn finish(&mut self, num_docs: RowId) -> SerializableMultivalueIndex<'_> { + self.start_offsets.push(self.total_num_vals_seen); + let non_null_row_ids: Box> = Box::new(&self.doc_with_values[..]); + SerializableMultivalueIndex { + doc_ids_with_values: SerializableOptionalIndex { + non_null_row_ids, + num_rows: num_docs, + }, + start_offsets: Box::new(&self.start_offsets[..]), + } } fn reset(&mut self) { + self.doc_with_values.clear(); self.start_offsets.clear(); - self.start_offsets.push(0u32); self.total_num_vals_seen = 0; + self.current_row = 0; + self.current_row_has_value = false; } } impl IndexBuilder for MultivaluedIndexBuilder { fn record_row(&mut self, row_id: RowId) { - self.start_offsets - .resize(row_id as usize + 1, self.total_num_vals_seen); + self.current_row = row_id; + self.current_row_has_value = false; } fn record_value(&mut self) { + if !self.current_row_has_value { + self.current_row_has_value = true; + self.doc_with_values.push(self.current_row); + self.start_offsets.push(self.total_num_vals_seen); + } self.total_num_vals_seen += 1; } } @@ -141,6 +158,32 @@ mod tests { ); } + #[test] + fn test_multivalued_value_index_builder_simple() { + let mut multivalued_value_index_builder = MultivaluedIndexBuilder::default(); + { + multivalued_value_index_builder.record_row(0u32); + multivalued_value_index_builder.record_value(); + multivalued_value_index_builder.record_value(); + let serialized_multivalue_index = multivalued_value_index_builder.finish(1u32); + let start_offsets: Vec = serialized_multivalue_index + .start_offsets + .boxed_iter() + .collect(); + assert_eq!(&start_offsets, &[0, 2]); + } + multivalued_value_index_builder.reset(); + multivalued_value_index_builder.record_row(0u32); + multivalued_value_index_builder.record_value(); + multivalued_value_index_builder.record_value(); + let serialized_multivalue_index = multivalued_value_index_builder.finish(1u32); + let start_offsets: Vec = serialized_multivalue_index + .start_offsets + .boxed_iter() + .collect(); + assert_eq!(&start_offsets, &[0, 2]); + } + #[test] fn test_multivalued_value_index_builder() { let mut multivalued_value_index_builder = MultivaluedIndexBuilder::default(); @@ -149,17 +192,15 @@ mod tests { multivalued_value_index_builder.record_value(); multivalued_value_index_builder.record_row(2u32); multivalued_value_index_builder.record_value(); - assert_eq!( - multivalued_value_index_builder.finish(4u32).to_vec(), - vec![0, 0, 2, 3, 3] - ); - multivalued_value_index_builder.reset(); - multivalued_value_index_builder.record_row(2u32); - multivalued_value_index_builder.record_value(); - multivalued_value_index_builder.record_value(); - assert_eq!( - multivalued_value_index_builder.finish(4u32).to_vec(), - vec![0, 0, 0, 2, 2] - ); + let SerializableMultivalueIndex { + doc_ids_with_values, + start_offsets, + } = multivalued_value_index_builder.finish(4u32); + assert_eq!(doc_ids_with_values.num_rows, 4u32); + let doc_ids_with_values: Vec = + doc_ids_with_values.non_null_row_ids.boxed_iter().collect(); + assert_eq!(&doc_ids_with_values, &[1u32, 2u32]); + let start_offsets: Vec = start_offsets.boxed_iter().collect(); + assert_eq!(&start_offsets[..], &[0, 2, 3]); } } diff --git a/columnar/src/compat_tests.rs b/columnar/src/compat_tests.rs index cbb11333e..8a504ab26 100644 --- a/columnar/src/compat_tests.rs +++ b/columnar/src/compat_tests.rs @@ -1,24 +1,29 @@ use std::path::PathBuf; -use crate::{Column, ColumnarReader, DynamicColumn, CURRENT_VERSION}; +use itertools::Itertools; + +use crate::{ + 
diff --git a/columnar/src/compat_tests.rs b/columnar/src/compat_tests.rs
index cbb11333e..8a504ab26 100644
--- a/columnar/src/compat_tests.rs
+++ b/columnar/src/compat_tests.rs
@@ -1,24 +1,29 @@
 use std::path::PathBuf;

-use crate::{Column, ColumnarReader, DynamicColumn, CURRENT_VERSION};
+use itertools::Itertools;
+
+use crate::{
+    merge_columnar, Cardinality, Column, ColumnarReader, DynamicColumn, StackMergeOrder,
+    CURRENT_VERSION,
+};

 const NUM_DOCS: u32 = u16::MAX as u32;

-fn generate_columnar(num_docs: u32) -> Vec<u8> {
+fn generate_columnar(num_docs: u32, value_offset: u64) -> Vec<u8> {
     use crate::ColumnarWriter;

     let mut columnar_writer = ColumnarWriter::default();

     for i in 0..num_docs {
         if i % 100 == 0 {
-            columnar_writer.record_numerical(i, "sparse", i as u64);
+            columnar_writer.record_numerical(i, "sparse", value_offset + i as u64);
         }
-        if i % 2 == 0 {
-            columnar_writer.record_numerical(i, "dense", i as u64);
+        if i % 5 == 0 {
+            columnar_writer.record_numerical(i, "dense", value_offset + i as u64);
         }
-        columnar_writer.record_numerical(i, "full", i as u64);
-        columnar_writer.record_numerical(i, "multi", i as u64);
-        columnar_writer.record_numerical(i, "multi", i as u64);
+        columnar_writer.record_numerical(i, "full", value_offset + i as u64);
+        columnar_writer.record_numerical(i, "multi", value_offset + i as u64);
+        columnar_writer.record_numerical(i, "multi", value_offset + i as u64);
     }

     let mut wrt: Vec<u8> = Vec::new();
@@ -35,7 +40,7 @@ fn create_format() {
     if PathBuf::from(file_path.clone()).exists() {
         return;
     }
-    let columnar = generate_columnar(NUM_DOCS);
+    let columnar = generate_columnar(NUM_DOCS, 0);
     std::fs::write(file_path, columnar).unwrap();
 }

@@ -49,27 +54,120 @@ fn test_format_v1() {
     let path = path_for_version("v1");
     test_format(&path);
 }

+#[test]
+fn test_format_v2() {
+    let path = path_for_version("v2");
+    test_format(&path);
+}
+
 fn test_format(path: &str) {
     let file_content = std::fs::read(path).unwrap();
     let reader = ColumnarReader::open(file_content).unwrap();
-    let column = open_column(&reader, "full");
-    assert_eq!(column.first(0).unwrap(), 0);
-    assert_eq!(column.first(NUM_DOCS - 1).unwrap(), NUM_DOCS as u64 - 1);
+    check_columns(&reader);

-    let column = open_column(&reader, "multi");
-    assert_eq!(column.first(0).unwrap(), 0);
-    assert_eq!(column.first(NUM_DOCS - 1).unwrap(), NUM_DOCS as u64 - 1);
+    // Test merge
+    let reader2 = ColumnarReader::open(generate_columnar(NUM_DOCS, NUM_DOCS as u64)).unwrap();
+    let columnar_readers = vec![&reader, &reader2];
+    let merge_row_order = StackMergeOrder::stack(&columnar_readers[..]);
+    let mut out = Vec::new();
+    merge_columnar(&columnar_readers, &[], merge_row_order.into(), &mut out).unwrap();
+    let reader = ColumnarReader::open(out).unwrap();
+    check_columns(&reader);
+}

-    let column = open_column(&reader, "sparse");
-    assert_eq!(column.first(0).unwrap(), 0);
-    assert_eq!(column.first(NUM_DOCS - 1), None);
-    assert_eq!(column.first(65000), Some(65000));
+fn check_columns(reader: &ColumnarReader) {
+    let column = open_column(reader, "full");
+    check_column(&column, |doc_id| vec![(doc_id, doc_id as u64).into()]);
+    assert_eq!(column.get_cardinality(), Cardinality::Full);

-    let column = open_column(&reader, "dense");
-    assert_eq!(column.first(0).unwrap(), 0);
-    assert_eq!(column.first(NUM_DOCS - 1).unwrap(), NUM_DOCS as u64 - 1);
-    assert_eq!(column.first(NUM_DOCS - 2), None);
+    let column = open_column(reader, "multi");
+    check_column(&column, |doc_id| {
+        vec![
+            (doc_id * 2, doc_id as u64).into(),
+            (doc_id * 2 + 1, doc_id as u64).into(),
+        ]
+    });
+    assert_eq!(column.get_cardinality(), Cardinality::Multivalued);
+
+    let column = open_column(reader, "sparse");
+    check_column(&column, |doc_id| {
+        if doc_id % 100 == 0 {
+            vec![(doc_id / 100, doc_id as u64).into()]
+        } else {
+            vec![]
+        }
+    });
+    assert_eq!(column.get_cardinality(), Cardinality::Optional);
+
+    let column = open_column(reader, "dense");
+    check_column(&column, |doc_id| {
+        if doc_id % 5 == 0 {
+            vec![(doc_id / 5, doc_id as u64).into()]
+        } else {
+            vec![]
+        }
+    });
+    assert_eq!(column.get_cardinality(), Cardinality::Optional);
+}
+
+struct RowIdAndValue {
+    row_id: u32,
+    value: u64,
+}
+impl From<(u32, u64)> for RowIdAndValue {
+    fn from((row_id, value): (u32, u64)) -> Self {
+        Self { row_id, value }
+    }
+}
+
+fn check_column<F: Fn(u32) -> Vec<RowIdAndValue>>(column: &Column<u64>, expected: F) {
+    let num_docs = column.num_docs();
+    let test_doc = |doc: u32| {
+        if expected(doc).is_empty() {
+            assert_eq!(column.first(doc), None);
+        } else {
+            assert_eq!(column.first(doc), Some(expected(doc)[0].value));
+        }
+        let values = column.values_for_doc(doc).collect_vec();
+        assert_eq!(values, expected(doc).iter().map(|x| x.value).collect_vec());
+        let mut row_ids = Vec::new();
+        column.row_ids_for_docs(&[doc], &mut vec![], &mut row_ids);
+        assert_eq!(
+            row_ids,
+            expected(doc).iter().map(|x| x.row_id).collect_vec()
+        );
+        let values = column.values_for_doc(doc).collect_vec();
+        assert_eq!(values, expected(doc).iter().map(|x| x.value).collect_vec());
+
+        // Docid rowid conversion
+        let mut row_ids = Vec::new();
+        let safe_next_doc = |doc: u32| (doc + 1).min(num_docs - 1);
+        column
+            .index
+            .docids_to_rowids(&[doc, safe_next_doc(doc)], &mut vec![], &mut row_ids);
+        let expected_rowids = expected(doc)
+            .iter()
+            .map(|x| x.row_id)
+            .chain(expected(safe_next_doc(doc)).iter().map(|x| x.row_id))
+            .collect_vec();
+        assert_eq!(row_ids, expected_rowids);
+        let rowid_range = column
+            .index
+            .docid_range_to_rowids(doc..safe_next_doc(doc) + 1);
+        if expected_rowids.is_empty() {
+            assert!(rowid_range.is_empty());
+        } else {
+            assert_eq!(
+                rowid_range,
+                expected_rowids[0]..expected_rowids.last().unwrap() + 1
+            );
+        }
+    };
+    test_doc(0);
+    test_doc(num_docs - 1);
+    test_doc(num_docs - 2);
+    test_doc(65000);
 }

 fn open_column(reader: &ColumnarReader, name: &str) -> Column<u64> {
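
// Editorial sketch (not part of the patch): what `check_column` boils down to for the
// "sparse" column of the compat file, spelled out for a single document. Doc 200 is the
// third document carrying a value (after docs 0 and 100), so it maps to value row 2, and
// the value equals the doc id because the file is written with value_offset == 0 (the
// second, merged segment uses value_offset == NUM_DOCS, so the same invariant appears to
// keep holding after stacking). Assumes `column` was opened via `open_column` above.
fn spot_check_sparse(column: &Column<u64>) {
    assert_eq!(column.first(200), Some(200u64));
    assert_eq!(column.values_for_doc(200).collect::<Vec<_>>(), vec![200u64]);
    let mut row_ids = Vec::new();
    column.row_ids_for_docs(&[200], &mut vec![], &mut row_ids);
    assert_eq!(row_ids, vec![2u32]);
    assert_eq!(column.first(201), None); // 201 % 100 != 0: nothing was recorded
}
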
"dense"); + check_column(&column, |doc_id| { + if doc_id % 5 == 0 { + vec![(doc_id / 5, doc_id as u64).into()] + } else { + vec![] + } + }); + assert_eq!(column.get_cardinality(), Cardinality::Optional); +} + +struct RowIdAndValue { + row_id: u32, + value: u64, +} +impl From<(u32, u64)> for RowIdAndValue { + fn from((row_id, value): (u32, u64)) -> Self { + Self { row_id, value } + } +} + +fn check_column Vec>(column: &Column, expected: F) { + let num_docs = column.num_docs(); + let test_doc = |doc: u32| { + if expected(doc).is_empty() { + assert_eq!(column.first(doc), None); + } else { + assert_eq!(column.first(doc), Some(expected(doc)[0].value)); + } + let values = column.values_for_doc(doc).collect_vec(); + assert_eq!(values, expected(doc).iter().map(|x| x.value).collect_vec()); + let mut row_ids = Vec::new(); + column.row_ids_for_docs(&[doc], &mut vec![], &mut row_ids); + assert_eq!( + row_ids, + expected(doc).iter().map(|x| x.row_id).collect_vec() + ); + let values = column.values_for_doc(doc).collect_vec(); + assert_eq!(values, expected(doc).iter().map(|x| x.value).collect_vec()); + + // Docid rowid conversion + let mut row_ids = Vec::new(); + let safe_next_doc = |doc: u32| (doc + 1).min(num_docs - 1); + column + .index + .docids_to_rowids(&[doc, safe_next_doc(doc)], &mut vec![], &mut row_ids); + let expected_rowids = expected(doc) + .iter() + .map(|x| x.row_id) + .chain(expected(safe_next_doc(doc)).iter().map(|x| x.row_id)) + .collect_vec(); + assert_eq!(row_ids, expected_rowids); + let rowid_range = column + .index + .docid_range_to_rowids(doc..safe_next_doc(doc) + 1); + if expected_rowids.is_empty() { + assert!(rowid_range.is_empty()); + } else { + assert_eq!( + rowid_range, + expected_rowids[0]..expected_rowids.last().unwrap() + 1 + ); + } + }; + test_doc(0); + test_doc(num_docs - 1); + test_doc(num_docs - 2); + test_doc(65000); } fn open_column(reader: &ColumnarReader, name: &str) -> Column { diff --git a/columnar/src/dynamic_column.rs b/columnar/src/dynamic_column.rs index 0a18d4207..2b9d69770 100644 --- a/columnar/src/dynamic_column.rs +++ b/columnar/src/dynamic_column.rs @@ -8,7 +8,7 @@ use common::{ByteCount, DateTime, HasLen, OwnedBytes}; use crate::column::{BytesColumn, Column, StrColumn}; use crate::column_values::{monotonic_map_column, StrictlyMonotonicFn}; use crate::columnar::ColumnType; -use crate::{Cardinality, ColumnIndex, ColumnValues, NumericalType}; +use crate::{Cardinality, ColumnIndex, ColumnValues, NumericalType, Version}; #[derive(Clone)] pub enum DynamicColumn { @@ -232,6 +232,7 @@ static_dynamic_conversions!(Column, IpAddr); pub struct DynamicColumnHandle { pub(crate) file_slice: FileSlice, pub(crate) column_type: ColumnType, + pub(crate) format_version: Version, } impl DynamicColumnHandle { @@ -260,11 +261,15 @@ impl DynamicColumnHandle { let column_bytes = self.file_slice.read_bytes()?; match self.column_type { ColumnType::Str | ColumnType::Bytes => { - let column: BytesColumn = crate::column::open_column_bytes(column_bytes)?; + let column: BytesColumn = + crate::column::open_column_bytes(column_bytes, self.format_version)?; Ok(Some(column.term_ord_column)) } ColumnType::IpAddr => { - let column = crate::column::open_column_u128_as_compact_u64(column_bytes)?; + let column = crate::column::open_column_u128_as_compact_u64( + column_bytes, + self.format_version, + )?; Ok(Some(column)) } ColumnType::Bool @@ -272,7 +277,8 @@ impl DynamicColumnHandle { | ColumnType::U64 | ColumnType::F64 | ColumnType::DateTime => { - let column = 
diff --git a/columnar/src/iterable.rs b/columnar/src/iterable.rs
index ec9c88665..f59d37325 100644
--- a/columnar/src/iterable.rs
+++ b/columnar/src/iterable.rs
@@ -1,4 +1,7 @@
 use std::ops::Range;
+use std::sync::Arc;
+
+use crate::{ColumnValues, RowId};

 pub trait Iterable<T = u64> {
     fn boxed_iter(&self) -> Box<dyn Iterator<Item = T> + '_>;
@@ -17,3 +20,9 @@ where Range<T>: Iterator<Item = T>
         Box::new(self.clone())
     }
 }
+
+impl Iterable for Arc<dyn ColumnValues<RowId>> {
+    fn boxed_iter(&self) -> Box<dyn Iterator<Item = u64> + '_> {
+        Box::new(self.iter().map(|row_id| row_id as u64))
+    }
+}
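
// Editorial sketch (not part of the patch): the same pattern as the new
// `Arc<dyn ColumnValues<RowId>>` impl above, applied to a plain owned Vec<u32>.
// Because `Iterable`'s type parameter defaults to u64, the u32 row ids are widened
// lazily instead of being copied into a second buffer. Only compiles inside this
// crate, where the `Iterable` trait is defined.
struct OwnedRowIds(Vec<u32>);

impl Iterable for OwnedRowIds {
    fn boxed_iter(&self) -> Box<dyn Iterator<Item = u64> + '_> {
        Box::new(self.0.iter().map(|&row_id| row_id as u64))
    }
}
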
diff --git a/columnar/src/tests.rs b/columnar/src/tests.rs
index efdb9d050..b3d435b3e 100644
--- a/columnar/src/tests.rs
+++ b/columnar/src/tests.rs
@@ -79,7 +79,7 @@ fn test_dataframe_writer_u64_multivalued() {
     assert_eq!(columnar.num_columns(), 1);
     let cols: Vec<DynamicColumnHandle> = columnar.read_columns("divisor").unwrap();
     assert_eq!(cols.len(), 1);
-    assert_eq!(cols[0].num_bytes(), 29);
+    assert_eq!(cols[0].num_bytes(), 50);
     let dyn_i64_col = cols[0].open().unwrap();
     let DynamicColumn::I64(divisor_col) = dyn_i64_col else {
         panic!();
     };
@@ -304,7 +304,7 @@ fn column_value_strategy() -> impl Strategy<Value = ColumnValue> {
                 ip_addr_byte
             ))),
         1 => any::<bool>().prop_map(ColumnValue::Bool),
-        1 => (0_679_723_993i64..1_679_723_995i64)
+        1 => (679_723_993i64..1_679_723_995i64)
            .prop_map(|val| { ColumnValue::DateTime(DateTime::from_timestamp_secs(val)) })
     ]
 }
@@ -392,6 +392,7 @@ fn assert_columnar_eq(
     }
 }

+#[track_caller]
 fn assert_column_eq(
     left: &Column<u64>,
     right: &Column<u64>,
@@ -740,24 +741,68 @@ fn columnar_docs_and_remap(
 proptest! {
     #![proptest_config(ProptestConfig::with_cases(1000))]
     #[test]
-    fn test_columnar_merge_and_remap_proptest((columnar_docs, shuffle_merge_order) in columnar_docs_and_remap()) {
-        let shuffled_rows: Vec<Vec<(&'static str, ColumnValue)>> = shuffle_merge_order.iter()
-            .map(|row_addr| columnar_docs[row_addr.segment_ord as usize][row_addr.row_id as usize].clone())
-            .collect();
-        let expected_merged_columnar = build_columnar(&shuffled_rows[..]);
-        let columnar_readers: Vec<ColumnarReader> = columnar_docs.iter()
-            .map(|docs| build_columnar(&docs[..]))
-            .collect::<Vec<_>>();
-        let columnar_readers_arr: Vec<&ColumnarReader> = columnar_readers.iter().collect();
-        let mut output: Vec<u8> = Vec::new();
-        let segment_num_rows: Vec<RowId> = columnar_docs.iter().map(|docs| docs.len() as RowId).collect();
-        let shuffle_merge_order = ShuffleMergeOrder::for_test(&segment_num_rows, shuffle_merge_order);
-        crate::merge_columnar(&columnar_readers_arr[..], &[], shuffle_merge_order.into(), &mut output).unwrap();
-        let merged_columnar = ColumnarReader::open(output).unwrap();
-        assert_columnar_eq(&merged_columnar, &expected_merged_columnar, true);
+    fn test_columnar_merge_and_remap_proptest((columnar_docs, shuffle_merge_order) in
+columnar_docs_and_remap()) {
+        test_columnar_merge_and_remap(columnar_docs, shuffle_merge_order);
     }
 }

+fn test_columnar_merge_and_remap(
+    columnar_docs: Vec<Vec<Vec<(&'static str, ColumnValue)>>>,
+    shuffle_merge_order: Vec<RowAddr>,
+) {
+    let shuffled_rows: Vec<Vec<(&'static str, ColumnValue)>> = shuffle_merge_order
+        .iter()
+        .map(|row_addr| {
+            columnar_docs[row_addr.segment_ord as usize][row_addr.row_id as usize].clone()
+        })
+        .collect();
+    let expected_merged_columnar = build_columnar(&shuffled_rows[..]);
+    let columnar_readers: Vec<ColumnarReader> = columnar_docs
+        .iter()
+        .map(|docs| build_columnar(&docs[..]))
+        .collect::<Vec<_>>();
+    let columnar_readers_ref: Vec<&ColumnarReader> = columnar_readers.iter().collect();
+    let mut output: Vec<u8> = Vec::new();
+    let segment_num_rows: Vec<RowId> = columnar_docs
+        .iter()
+        .map(|docs| docs.len() as RowId)
+        .collect();
+    let shuffle_merge_order = ShuffleMergeOrder::for_test(&segment_num_rows, shuffle_merge_order);
+    crate::merge_columnar(
+        &columnar_readers_ref[..],
+        &[],
+        shuffle_merge_order.into(),
+        &mut output,
+    )
+    .unwrap();
+    let merged_columnar = ColumnarReader::open(output).unwrap();
+    assert_columnar_eq(&merged_columnar, &expected_merged_columnar, true);
+}
+
+#[test]
+fn test_columnar_merge_and_remap_bug_1() {
+    let columnar_docs = vec![vec![
+        vec![
+            ("c1", ColumnValue::Numerical(NumericalValue::U64(0))),
+            ("c1", ColumnValue::Numerical(NumericalValue::U64(0))),
+        ],
+        vec![],
+    ]];
+    let shuffle_merge_order: Vec<RowAddr> = vec![
+        RowAddr {
+            segment_ord: 0,
+            row_id: 1,
+        },
+        RowAddr {
+            segment_ord: 0,
+            row_id: 0,
+        },
+    ];
+
+    test_columnar_merge_and_remap(columnar_docs, shuffle_merge_order);
+}
+
 #[test]
 fn test_columnar_merge_empty() {
     let columnar_reader_1 = build_columnar(&[]);
diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs
index 0b62a0178..4792574d0 100644
--- a/src/indexer/index_writer.rs
+++ b/src/indexer/index_writer.rs
@@ -2212,7 +2212,7 @@ mod tests {
     #[test]
     fn test_fast_field_range() {
-        let ops: Vec<_> = (0..1000).map(|id| IndexingOp::add(id)).collect();
+        let ops: Vec<_> = (0..1000).map(IndexingOp::add).collect();
         assert!(test_operation_strategy(&ops, true).is_ok());
     }