diff --git a/columnar/benches/bench_merge.rs b/columnar/benches/bench_merge.rs index 2e8d19864..665e5a48a 100644 --- a/columnar/benches/bench_merge.rs +++ b/columnar/benches/bench_merge.rs @@ -22,7 +22,7 @@ impl Display for Card { } } -const NUM_DOCS: u32 = 100_000; +const NUM_DOCS: u32 = 1_000_000; fn generate_columnar(card: Card, num_docs: u32) -> ColumnarReader { use tantivy_columnar::ColumnarWriter; @@ -88,12 +88,8 @@ fn main() { let columnar_readers = columnar_readers.iter().collect::>(); let merge_row_order = StackMergeOrder::stack(&columnar_readers[..]); - let _ = black_box(merge_columnar( - &columnar_readers, - &[], - merge_row_order.into(), - &mut out, - )); + merge_columnar(&columnar_readers, &[], merge_row_order.into(), &mut out).unwrap(); + black_box(out); }, ); } diff --git a/columnar/src/column_index/merge/mod.rs b/columnar/src/column_index/merge/mod.rs index 1aec9f71c..cc584fd40 100644 --- a/columnar/src/column_index/merge/mod.rs +++ b/columnar/src/column_index/merge/mod.rs @@ -73,14 +73,18 @@ fn detect_cardinality( pub fn merge_column_index<'a>( columns: &'a [ColumnIndex], merge_row_order: &'a MergeRowOrder, + num_values: u32, ) -> SerializableColumnIndex<'a> { // For simplification, we do not try to detect whether the cardinality could be // downgraded thanks to deletes. let cardinality_after_merge = detect_cardinality(columns, merge_row_order); match merge_row_order { - MergeRowOrder::Stack(stack_merge_order) => { - merge_column_index_stacked(columns, cardinality_after_merge, stack_merge_order) - } + MergeRowOrder::Stack(stack_merge_order) => merge_column_index_stacked( + columns, + cardinality_after_merge, + stack_merge_order, + num_values, + ), MergeRowOrder::Shuffled(complex_merge_order) => { merge_column_index_shuffled(columns, cardinality_after_merge, complex_merge_order) } @@ -167,8 +171,12 @@ mod tests { ], ) .into(); - let merged_column_index = merge_column_index(&column_indexes[..], &merge_row_order); - let SerializableColumnIndex::Multivalued(start_index_iterable) = merged_column_index else { + let merged_column_index = merge_column_index(&column_indexes[..], &merge_row_order, 3); + let SerializableColumnIndex::Multivalued { + indices: start_index_iterable, + .. + } = merged_column_index + else { panic!("Excpected a multivalued index") }; let start_indexes: Vec = start_index_iterable.boxed_iter().collect(); @@ -200,8 +208,12 @@ mod tests { ], ) .into(); - let merged_column_index = merge_column_index(&column_indexes[..], &merge_row_order); - let SerializableColumnIndex::Multivalued(start_index_iterable) = merged_column_index else { + let merged_column_index = merge_column_index(&column_indexes[..], &merge_row_order, 6); + let SerializableColumnIndex::Multivalued { + indices: start_index_iterable, + .. + } = merged_column_index + else { panic!("Excpected a multivalued index") }; let start_indexes: Vec = start_index_iterable.boxed_iter().collect(); diff --git a/columnar/src/column_index/merge/shuffled.rs b/columnar/src/column_index/merge/shuffled.rs index f93b89635..400cedf6a 100644 --- a/columnar/src/column_index/merge/shuffled.rs +++ b/columnar/src/column_index/merge/shuffled.rs @@ -22,7 +22,10 @@ pub fn merge_column_index_shuffled<'a>( Cardinality::Multivalued => { let multivalue_start_index = merge_column_index_shuffled_multivalued(column_indexes, shuffle_merge_order); - SerializableColumnIndex::Multivalued(multivalue_start_index) + SerializableColumnIndex::Multivalued { + indices: multivalue_start_index, + stats: None, + } } } } diff --git a/columnar/src/column_index/merge/stacked.rs b/columnar/src/column_index/merge/stacked.rs index ba91b8d64..2f9c1c008 100644 --- a/columnar/src/column_index/merge/stacked.rs +++ b/columnar/src/column_index/merge/stacked.rs @@ -1,6 +1,8 @@ use std::iter; +use std::num::NonZeroU64; use crate::column_index::{SerializableColumnIndex, Set}; +use crate::column_values::ColumnStats; use crate::iterable::Iterable; use crate::{Cardinality, ColumnIndex, RowId, StackMergeOrder}; @@ -12,6 +14,7 @@ pub fn merge_column_index_stacked<'a>( columns: &'a [ColumnIndex], cardinality_after_merge: Cardinality, stack_merge_order: &'a StackMergeOrder, + num_values: u32, ) -> SerializableColumnIndex<'a> { match cardinality_after_merge { Cardinality::Full => SerializableColumnIndex::Full, @@ -27,7 +30,17 @@ pub fn merge_column_index_stacked<'a>( columns, stack_merge_order, }; - SerializableColumnIndex::Multivalued(Box::new(stacked_multivalued_index)) + SerializableColumnIndex::Multivalued { + indices: Box::new(stacked_multivalued_index), + stats: Some(ColumnStats { + gcd: NonZeroU64::new(1).unwrap(), + // The values in the multivalue index are the positions of the values + min_value: 0, + max_value: num_values as u64, + // This is num docs, but it starts at 0 so we need +1 + num_rows: stack_merge_order.num_rows() + 1, + }), + } } } } diff --git a/columnar/src/column_index/multivalued_index.rs b/columnar/src/column_index/multivalued_index.rs index eab82a3e3..96f5c565d 100644 --- a/columnar/src/column_index/multivalued_index.rs +++ b/columnar/src/column_index/multivalued_index.rs @@ -6,20 +6,29 @@ use std::sync::Arc; use common::OwnedBytes; use crate::column_values::{ - load_u64_based_column_values, serialize_u64_based_column_values, CodecType, ColumnValues, + load_u64_based_column_values, serialize_u64_based_column_values, + serialize_u64_with_codec_and_stats, CodecType, ColumnStats, ColumnValues, }; use crate::iterable::Iterable; use crate::{DocId, RowId}; pub fn serialize_multivalued_index( multivalued_index: &dyn Iterable, + stats: Option, output: &mut impl Write, ) -> io::Result<()> { - serialize_u64_based_column_values( - multivalued_index, - &[CodecType::Bitpacked, CodecType::Linear], - output, - )?; + if let Some(stats) = stats { + // TODO: Add something with higher compression that doesn't require a full scan upfront + let estimator = CodecType::Bitpacked.estimator(); + assert!(!estimator.requires_full_scan()); + serialize_u64_with_codec_and_stats(multivalued_index, estimator, stats, output)?; + } else { + serialize_u64_based_column_values( + multivalued_index, + &[CodecType::Bitpacked, CodecType::Linear], + output, + )?; + } Ok(()) } @@ -52,7 +61,7 @@ impl From>> for MultiValueIndex { impl MultiValueIndex { pub fn for_test(start_offsets: &[RowId]) -> MultiValueIndex { let mut buffer = Vec::new(); - serialize_multivalued_index(&start_offsets, &mut buffer).unwrap(); + serialize_multivalued_index(&start_offsets, None, &mut buffer).unwrap(); let bytes = OwnedBytes::new(buffer); open_multivalued_index(bytes).unwrap() } diff --git a/columnar/src/column_index/serialize.rs b/columnar/src/column_index/serialize.rs index f2a99c740..a4ca0c4ea 100644 --- a/columnar/src/column_index/serialize.rs +++ b/columnar/src/column_index/serialize.rs @@ -6,6 +6,7 @@ use common::{CountingWriter, OwnedBytes}; use crate::column_index::multivalued_index::serialize_multivalued_index; use crate::column_index::optional_index::serialize_optional_index; use crate::column_index::ColumnIndex; +use crate::column_values::ColumnStats; use crate::iterable::Iterable; use crate::{Cardinality, RowId}; @@ -15,9 +16,12 @@ pub enum SerializableColumnIndex<'a> { non_null_row_ids: Box + 'a>, num_rows: RowId, }, - // TODO remove the Arc apart from serialization this is not - // dynamic at all. - Multivalued(Box + 'a>), + Multivalued { + /// Iterator emitting the indices for the index + indices: Box + 'a>, + /// In the merge case we can precompute the column stats + stats: Option, + }, } impl<'a> SerializableColumnIndex<'a> { @@ -25,7 +29,7 @@ impl<'a> SerializableColumnIndex<'a> { match self { SerializableColumnIndex::Full => Cardinality::Full, SerializableColumnIndex::Optional { .. } => Cardinality::Optional, - SerializableColumnIndex::Multivalued(_) => Cardinality::Multivalued, + SerializableColumnIndex::Multivalued { .. } => Cardinality::Multivalued, } } } @@ -44,9 +48,10 @@ pub fn serialize_column_index( non_null_row_ids, num_rows, } => serialize_optional_index(non_null_row_ids.as_ref(), num_rows, &mut output)?, - SerializableColumnIndex::Multivalued(multivalued_index) => { - serialize_multivalued_index(&*multivalued_index, &mut output)? - } + SerializableColumnIndex::Multivalued { + indices: multivalued_index, + stats, + } => serialize_multivalued_index(&*multivalued_index, stats, &mut output)?, } let column_index_num_bytes = output.written_bytes() as u32; Ok(column_index_num_bytes) diff --git a/columnar/src/column_values/mod.rs b/columnar/src/column_values/mod.rs index ef5de5154..9c03618a9 100644 --- a/columnar/src/column_values/mod.rs +++ b/columnar/src/column_values/mod.rs @@ -32,7 +32,8 @@ pub use u128_based::{ }; pub use u64_based::{ load_u64_based_column_values, serialize_and_load_u64_based_column_values, - serialize_u64_based_column_values, CodecType, ALL_U64_CODEC_TYPES, + serialize_u64_based_column_values, serialize_u64_with_codec_and_stats, CodecType, + ALL_U64_CODEC_TYPES, }; pub use vec_column::VecColumn; diff --git a/columnar/src/column_values/u64_based/bitpacked.rs b/columnar/src/column_values/u64_based/bitpacked.rs index 3ed999648..50da9a88a 100644 --- a/columnar/src/column_values/u64_based/bitpacked.rs +++ b/columnar/src/column_values/u64_based/bitpacked.rs @@ -128,6 +128,9 @@ impl ColumnCodecEstimator for BitpackedCodecEstimator { bit_packer.close(wrt)?; Ok(()) } + fn codec_type(&self) -> super::CodecType { + super::CodecType::Bitpacked + } } pub struct BitpackedCodec; diff --git a/columnar/src/column_values/u64_based/blockwise_linear.rs b/columnar/src/column_values/u64_based/blockwise_linear.rs index 2abf8205b..f1022aa71 100644 --- a/columnar/src/column_values/u64_based/blockwise_linear.rs +++ b/columnar/src/column_values/u64_based/blockwise_linear.rs @@ -163,6 +163,10 @@ impl ColumnCodecEstimator for BlockwiseLinearEstimator { Ok(()) } + + fn codec_type(&self) -> super::CodecType { + super::CodecType::BlockwiseLinear + } } pub struct BlockwiseLinearCodec; diff --git a/columnar/src/column_values/u64_based/linear.rs b/columnar/src/column_values/u64_based/linear.rs index ba0c9e641..bdbb23b14 100644 --- a/columnar/src/column_values/u64_based/linear.rs +++ b/columnar/src/column_values/u64_based/linear.rs @@ -153,6 +153,12 @@ impl ColumnCodecEstimator for LinearCodecEstimator { self.collect_before_line_estimation(value); } } + fn requires_full_scan(&self) -> bool { + true + } + fn codec_type(&self) -> super::CodecType { + super::CodecType::Linear + } } impl LinearCodecEstimator { diff --git a/columnar/src/column_values/u64_based/mod.rs b/columnar/src/column_values/u64_based/mod.rs index 7afc71e3f..e9310be63 100644 --- a/columnar/src/column_values/u64_based/mod.rs +++ b/columnar/src/column_values/u64_based/mod.rs @@ -37,7 +37,11 @@ pub trait ColumnCodecEstimator: 'static { /// This method will be called for each element of the column during /// `estimation`. fn collect(&mut self, value: u64); - /// Finalizes the first pass phase. + /// Returns true if the estimator needs a full pass over the column before serialization + fn requires_full_scan(&self) -> bool { + false + } + fn codec_type(&self) -> CodecType; fn finalize(&mut self) {} /// Returns an accurate estimation of the number of bytes that will /// be used to represent this column. @@ -150,34 +154,45 @@ pub fn serialize_u64_based_column_values( wrt: &mut dyn Write, ) -> io::Result<()> { let mut stats_collector = StatsCollector::default(); - let mut estimators: Vec<(CodecType, Box)> = - Vec::with_capacity(codec_types.len()); + let mut estimators: Vec> = Vec::with_capacity(codec_types.len()); for &codec_type in codec_types { - estimators.push((codec_type, codec_type.estimator())); + estimators.push(codec_type.estimator()); } for val in vals.boxed_iter() { let val_u64 = val.to_u64(); stats_collector.collect(val_u64); - for (_, estimator) in &mut estimators { + for estimator in &mut estimators { estimator.collect(val_u64); } } - for (_, estimator) in &mut estimators { + for estimator in &mut estimators { estimator.finalize(); } let stats = stats_collector.stats(); - let (_, best_codec, best_codec_estimator) = estimators + let (_, best_codec) = estimators .into_iter() - .flat_map(|(codec_type, estimator)| { + .flat_map(|estimator| { let num_bytes = estimator.estimate(&stats)?; - Some((num_bytes, codec_type, estimator)) + Some((num_bytes, estimator)) }) - .min_by_key(|(num_bytes, _, _)| *num_bytes) + .min_by_key(|(num_bytes, _)| *num_bytes) .ok_or_else(|| { io::Error::new(io::ErrorKind::InvalidData, "No available applicable codec.") })?; - best_codec.to_code().serialize(wrt)?; - best_codec_estimator.serialize( + serialize_u64_with_codec_and_stats(vals, best_codec, stats, wrt)?; + Ok(()) +} + +/// Serializes a given column of u64-mapped values. +/// The codec estimator needs to be collected fully for the Line codec before calling this. +pub fn serialize_u64_with_codec_and_stats( + vals: &dyn Iterable, + codec: Box, + stats: ColumnStats, + wrt: &mut dyn Write, +) -> io::Result<()> { + codec.codec_type().to_code().serialize(wrt)?; + codec.serialize( &stats, &mut vals.boxed_iter().map(MonotonicallyMappableToU64::to_u64), wrt, diff --git a/columnar/src/columnar/merge/mod.rs b/columnar/src/columnar/merge/mod.rs index 9f7666e8f..38a9d302e 100644 --- a/columnar/src/columnar/merge/mod.rs +++ b/columnar/src/columnar/merge/mod.rs @@ -3,7 +3,7 @@ mod merge_mapping; mod term_merger; use std::collections::{BTreeMap, HashSet}; -use std::io; +use std::io::{self}; use std::net::Ipv6Addr; use std::sync::Arc; @@ -156,8 +156,15 @@ fn merge_column( column_values.push(None); } } - let merged_column_index = - crate::column_index::merge_column_index(&column_indexes[..], merge_row_order); + let num_values: u32 = column_values + .iter() + .map(|vals| vals.as_ref().map(|idx| idx.num_vals()).unwrap_or(0)) + .sum(); + let merged_column_index = crate::column_index::merge_column_index( + &column_indexes[..], + merge_row_order, + num_values, + ); let merge_column_values = MergedColumnValues { column_indexes: &column_indexes[..], column_values: &column_values[..], @@ -183,8 +190,15 @@ fn merge_column( } } - let merged_column_index = - crate::column_index::merge_column_index(&column_indexes[..], merge_row_order); + let num_values: u32 = column_values + .iter() + .map(|vals| vals.as_ref().map(|idx| idx.num_vals()).unwrap_or(0)) + .sum(); + let merged_column_index = crate::column_index::merge_column_index( + &column_indexes[..], + merge_row_order, + num_values, + ); let merge_column_values = MergedColumnValues { column_indexes: &column_indexes[..], column_values: &column_values, @@ -214,8 +228,19 @@ fn merge_column( } } } - let merged_column_index = - crate::column_index::merge_column_index(&column_indexes[..], merge_row_order); + let num_values: u32 = bytes_columns + .iter() + .map(|vals| { + vals.as_ref() + .map(|idx| idx.term_ord_column.values.num_vals()) + .unwrap_or(0) + }) + .sum(); + let merged_column_index = crate::column_index::merge_column_index( + &column_indexes[..], + merge_row_order, + num_values, + ); merge_bytes_or_str_column(merged_column_index, &bytes_columns, merge_row_order, wrt)?; } } diff --git a/columnar/src/columnar/writer/mod.rs b/columnar/src/columnar/writer/mod.rs index 1fbc9d85d..9aed64a24 100644 --- a/columnar/src/columnar/writer/mod.rs +++ b/columnar/src/columnar/writer/mod.rs @@ -644,7 +644,10 @@ fn send_to_serialize_column_mappable_to_u128< let multivalued_index_builder = value_index_builders.borrow_multivalued_index_builder(); consume_operation_iterator(op_iterator, multivalued_index_builder, values); let multivalued_index = multivalued_index_builder.finish(num_rows); - SerializableColumnIndex::Multivalued(Box::new(multivalued_index)) + SerializableColumnIndex::Multivalued { + indices: Box::new(multivalued_index), + stats: Default::default(), // TODO: implement stats for u128 + } } }; crate::column::serialize_column_mappable_to_u128( @@ -699,7 +702,10 @@ fn send_to_serialize_column_mappable_to_u64( if sort_values_within_row { sort_values_within_row_in_place(multivalued_index, values); } - SerializableColumnIndex::Multivalued(Box::new(multivalued_index)) + SerializableColumnIndex::Multivalued { + indices: Box::new(multivalued_index), + stats: None, + } } }; crate::column::serialize_column_mappable_to_u64( diff --git a/columnar/src/tests.rs b/columnar/src/tests.rs index 5e5c50f55..f6e99cad2 100644 --- a/columnar/src/tests.rs +++ b/columnar/src/tests.rs @@ -738,35 +738,22 @@ proptest! { #![proptest_config(ProptestConfig::with_cases(1000))] #[test] fn test_columnar_merge_proptest(columnar_docs in proptest::collection::vec(columnar_docs_strategy(), 2..=3)) { - let columnar_readers: Vec = columnar_docs.iter() - .map(|docs| build_columnar(&docs[..])) - .collect::>(); - let columnar_readers_arr: Vec<&ColumnarReader> = columnar_readers.iter().collect(); - let mut output: Vec = Vec::new(); - let stack_merge_order = StackMergeOrder::stack(&columnar_readers_arr[..]).into(); - crate::merge_columnar(&columnar_readers_arr[..], &[], stack_merge_order, &mut output).unwrap(); - let merged_columnar = ColumnarReader::open(output).unwrap(); - let concat_rows: Vec> = columnar_docs.iter().flatten().cloned().collect(); - let expected_merged_columnar = build_columnar(&concat_rows[..]); - assert_columnar_eq_strict(&merged_columnar, &expected_merged_columnar); + test_columnar_docs(columnar_docs); } } -#[test] -fn test_columnar_merging_empty_columnar() { - let columnar_docs: Vec>> = - vec![vec![], vec![vec![("c1", ColumnValue::Str("a"))]]]; +fn test_columnar_docs(columnar_docs: Vec>>) { let columnar_readers: Vec = columnar_docs .iter() .map(|docs| build_columnar(&docs[..])) .collect::>(); let columnar_readers_arr: Vec<&ColumnarReader> = columnar_readers.iter().collect(); let mut output: Vec = Vec::new(); - let stack_merge_order = StackMergeOrder::stack(&columnar_readers_arr[..]); + let stack_merge_order = StackMergeOrder::stack(&columnar_readers_arr[..]).into(); crate::merge_columnar( &columnar_readers_arr[..], &[], - crate::MergeRowOrder::Stack(stack_merge_order), + stack_merge_order, &mut output, ) .unwrap(); @@ -777,6 +764,24 @@ fn test_columnar_merging_empty_columnar() { assert_columnar_eq_strict(&merged_columnar, &expected_merged_columnar); } +#[test] +fn test_columnar_merging_empty_columnar() { + let columnar_docs: Vec>> = + vec![vec![], vec![vec![("c1", ColumnValue::Str("a"))]]]; + test_columnar_docs(columnar_docs); +} +#[test] +fn test_columnar_merging_simple() { + let columnar_docs: Vec>> = vec![ + vec![], + vec![vec![ + ("c1", ColumnValue::Numerical(0u64.into())), + ("c1", ColumnValue::Numerical(0u64.into())), + ]], + ]; + test_columnar_docs(columnar_docs); +} + #[test] fn test_columnar_merging_number_columns() { let columnar_docs: Vec>> = vec![ @@ -793,25 +798,7 @@ fn test_columnar_merging_number_columns() { vec![("c2", ColumnValue::Numerical(u64::MAX.into()))], ], ]; - let columnar_readers: Vec = columnar_docs - .iter() - .map(|docs| build_columnar(&docs[..])) - .collect::>(); - let columnar_readers_arr: Vec<&ColumnarReader> = columnar_readers.iter().collect(); - let mut output: Vec = Vec::new(); - let stack_merge_order = StackMergeOrder::stack(&columnar_readers_arr[..]); - crate::merge_columnar( - &columnar_readers_arr[..], - &[], - crate::MergeRowOrder::Stack(stack_merge_order), - &mut output, - ) - .unwrap(); - let merged_columnar = ColumnarReader::open(output).unwrap(); - let concat_rows: Vec> = - columnar_docs.iter().flatten().cloned().collect(); - let expected_merged_columnar = build_columnar(&concat_rows[..]); - assert_columnar_eq_strict(&merged_columnar, &expected_merged_columnar); + test_columnar_docs(columnar_docs); } // TODO add non trivial remap and merge