From 2ce485b8cc3534fb92d8ca91c9f3ef6ac8dd02ea Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Tue, 11 Jun 2024 19:03:33 +0800 Subject: [PATCH] skip estimate phase for merge multivalue index precompute stats for merge multivalue index + disable Line encoding for multivalue index. That combination makes it possible to skip the first estimation pass. This gives up to a 2x speedup in merge performance on multivalue indices. This change may decrease compression, because Line compresses very well for documents that have a fixed number of values in each doc. The Line codec should be replaced. ``` merge_multi_and_multi Avg: 22.7880ms (-47.15%) Median: 22.5469ms (-47.38%) [22.3691ms .. 25.8392ms] merge_dense_and_dense Avg: 14.4398ms (+2.18%) Median: 14.2465ms (+0.74%) [14.1620ms .. 16.1270ms] merge_sparse_and_sparse Avg: 10.6559ms (+1.10%) Median: 10.6318ms (+0.91%) [10.5527ms .. 11.2848ms] merge_sparse_and_dense Avg: 12.4886ms (+1.52%) Median: 12.4044ms (+0.84%) [12.3261ms .. 13.9439ms] merge_multi_and_dense Avg: 25.6686ms (-45.56%) Median: 25.4851ms (-45.84%) [25.1618ms .. 27.6226ms] merge_multi_and_sparse Avg: 24.3278ms (-47.00%) Median: 24.1917ms (-47.34%) [23.7159ms .. 
27.0513ms] ``` --- columnar/benches/bench_merge.rs | 10 +--- columnar/src/column_index/merge/mod.rs | 26 +++++--- columnar/src/column_index/merge/shuffled.rs | 5 +- columnar/src/column_index/merge/stacked.rs | 15 ++++- .../src/column_index/multivalued_index.rs | 23 +++++--- columnar/src/column_index/serialize.rs | 19 +++--- columnar/src/column_values/mod.rs | 3 +- .../src/column_values/u64_based/bitpacked.rs | 3 + .../u64_based/blockwise_linear.rs | 4 ++ .../src/column_values/u64_based/linear.rs | 6 ++ columnar/src/column_values/u64_based/mod.rs | 39 ++++++++---- columnar/src/columnar/merge/mod.rs | 39 +++++++++--- columnar/src/columnar/writer/mod.rs | 10 +++- columnar/src/tests.rs | 59 ++++++++----------- 14 files changed, 173 insertions(+), 88 deletions(-) diff --git a/columnar/benches/bench_merge.rs b/columnar/benches/bench_merge.rs index 2e8d19864..665e5a48a 100644 --- a/columnar/benches/bench_merge.rs +++ b/columnar/benches/bench_merge.rs @@ -22,7 +22,7 @@ impl Display for Card { } } -const NUM_DOCS: u32 = 100_000; +const NUM_DOCS: u32 = 1_000_000; fn generate_columnar(card: Card, num_docs: u32) -> ColumnarReader { use tantivy_columnar::ColumnarWriter; @@ -88,12 +88,8 @@ fn main() { let columnar_readers = columnar_readers.iter().collect::>(); let merge_row_order = StackMergeOrder::stack(&columnar_readers[..]); - let _ = black_box(merge_columnar( - &columnar_readers, - &[], - merge_row_order.into(), - &mut out, - )); + merge_columnar(&columnar_readers, &[], merge_row_order.into(), &mut out).unwrap(); + black_box(out); }, ); } diff --git a/columnar/src/column_index/merge/mod.rs b/columnar/src/column_index/merge/mod.rs index 1aec9f71c..cc584fd40 100644 --- a/columnar/src/column_index/merge/mod.rs +++ b/columnar/src/column_index/merge/mod.rs @@ -73,14 +73,18 @@ fn detect_cardinality( pub fn merge_column_index<'a>( columns: &'a [ColumnIndex], merge_row_order: &'a MergeRowOrder, + num_values: u32, ) -> SerializableColumnIndex<'a> { // For simplification, we do not 
try to detect whether the cardinality could be // downgraded thanks to deletes. let cardinality_after_merge = detect_cardinality(columns, merge_row_order); match merge_row_order { - MergeRowOrder::Stack(stack_merge_order) => { - merge_column_index_stacked(columns, cardinality_after_merge, stack_merge_order) - } + MergeRowOrder::Stack(stack_merge_order) => merge_column_index_stacked( + columns, + cardinality_after_merge, + stack_merge_order, + num_values, + ), MergeRowOrder::Shuffled(complex_merge_order) => { merge_column_index_shuffled(columns, cardinality_after_merge, complex_merge_order) } @@ -167,8 +171,12 @@ mod tests { ], ) .into(); - let merged_column_index = merge_column_index(&column_indexes[..], &merge_row_order); - let SerializableColumnIndex::Multivalued(start_index_iterable) = merged_column_index else { + let merged_column_index = merge_column_index(&column_indexes[..], &merge_row_order, 3); + let SerializableColumnIndex::Multivalued { + indices: start_index_iterable, + .. + } = merged_column_index + else { panic!("Excpected a multivalued index") }; let start_indexes: Vec = start_index_iterable.boxed_iter().collect(); @@ -200,8 +208,12 @@ mod tests { ], ) .into(); - let merged_column_index = merge_column_index(&column_indexes[..], &merge_row_order); - let SerializableColumnIndex::Multivalued(start_index_iterable) = merged_column_index else { + let merged_column_index = merge_column_index(&column_indexes[..], &merge_row_order, 6); + let SerializableColumnIndex::Multivalued { + indices: start_index_iterable, + .. 
+ } = merged_column_index + else { panic!("Excpected a multivalued index") }; let start_indexes: Vec = start_index_iterable.boxed_iter().collect(); diff --git a/columnar/src/column_index/merge/shuffled.rs b/columnar/src/column_index/merge/shuffled.rs index f93b89635..400cedf6a 100644 --- a/columnar/src/column_index/merge/shuffled.rs +++ b/columnar/src/column_index/merge/shuffled.rs @@ -22,7 +22,10 @@ pub fn merge_column_index_shuffled<'a>( Cardinality::Multivalued => { let multivalue_start_index = merge_column_index_shuffled_multivalued(column_indexes, shuffle_merge_order); - SerializableColumnIndex::Multivalued(multivalue_start_index) + SerializableColumnIndex::Multivalued { + indices: multivalue_start_index, + stats: None, + } } } } diff --git a/columnar/src/column_index/merge/stacked.rs b/columnar/src/column_index/merge/stacked.rs index ba91b8d64..2f9c1c008 100644 --- a/columnar/src/column_index/merge/stacked.rs +++ b/columnar/src/column_index/merge/stacked.rs @@ -1,6 +1,8 @@ use std::iter; +use std::num::NonZeroU64; use crate::column_index::{SerializableColumnIndex, Set}; +use crate::column_values::ColumnStats; use crate::iterable::Iterable; use crate::{Cardinality, ColumnIndex, RowId, StackMergeOrder}; @@ -12,6 +14,7 @@ pub fn merge_column_index_stacked<'a>( columns: &'a [ColumnIndex], cardinality_after_merge: Cardinality, stack_merge_order: &'a StackMergeOrder, + num_values: u32, ) -> SerializableColumnIndex<'a> { match cardinality_after_merge { Cardinality::Full => SerializableColumnIndex::Full, @@ -27,7 +30,17 @@ pub fn merge_column_index_stacked<'a>( columns, stack_merge_order, }; - SerializableColumnIndex::Multivalued(Box::new(stacked_multivalued_index)) + SerializableColumnIndex::Multivalued { + indices: Box::new(stacked_multivalued_index), + stats: Some(ColumnStats { + gcd: NonZeroU64::new(1).unwrap(), + // The values in the multivalue index are the positions of the values + min_value: 0, + max_value: num_values as u64, + // This is num docs, but it 
starts at 0 so we need +1 + num_rows: stack_merge_order.num_rows() + 1, + }), + } } } } diff --git a/columnar/src/column_index/multivalued_index.rs b/columnar/src/column_index/multivalued_index.rs index eab82a3e3..96f5c565d 100644 --- a/columnar/src/column_index/multivalued_index.rs +++ b/columnar/src/column_index/multivalued_index.rs @@ -6,20 +6,29 @@ use std::sync::Arc; use common::OwnedBytes; use crate::column_values::{ - load_u64_based_column_values, serialize_u64_based_column_values, CodecType, ColumnValues, + load_u64_based_column_values, serialize_u64_based_column_values, + serialize_u64_with_codec_and_stats, CodecType, ColumnStats, ColumnValues, }; use crate::iterable::Iterable; use crate::{DocId, RowId}; pub fn serialize_multivalued_index( multivalued_index: &dyn Iterable, + stats: Option, output: &mut impl Write, ) -> io::Result<()> { - serialize_u64_based_column_values( - multivalued_index, - &[CodecType::Bitpacked, CodecType::Linear], - output, - )?; + if let Some(stats) = stats { + // TODO: Add something with higher compression that doesn't require a full scan upfront + let estimator = CodecType::Bitpacked.estimator(); + assert!(!estimator.requires_full_scan()); + serialize_u64_with_codec_and_stats(multivalued_index, estimator, stats, output)?; + } else { + serialize_u64_based_column_values( + multivalued_index, + &[CodecType::Bitpacked, CodecType::Linear], + output, + )?; + } Ok(()) } @@ -52,7 +61,7 @@ impl From>> for MultiValueIndex { impl MultiValueIndex { pub fn for_test(start_offsets: &[RowId]) -> MultiValueIndex { let mut buffer = Vec::new(); - serialize_multivalued_index(&start_offsets, &mut buffer).unwrap(); + serialize_multivalued_index(&start_offsets, None, &mut buffer).unwrap(); let bytes = OwnedBytes::new(buffer); open_multivalued_index(bytes).unwrap() } diff --git a/columnar/src/column_index/serialize.rs b/columnar/src/column_index/serialize.rs index f2a99c740..a4ca0c4ea 100644 --- a/columnar/src/column_index/serialize.rs +++ 
b/columnar/src/column_index/serialize.rs @@ -6,6 +6,7 @@ use common::{CountingWriter, OwnedBytes}; use crate::column_index::multivalued_index::serialize_multivalued_index; use crate::column_index::optional_index::serialize_optional_index; use crate::column_index::ColumnIndex; +use crate::column_values::ColumnStats; use crate::iterable::Iterable; use crate::{Cardinality, RowId}; @@ -15,9 +16,12 @@ pub enum SerializableColumnIndex<'a> { non_null_row_ids: Box + 'a>, num_rows: RowId, }, - // TODO remove the Arc apart from serialization this is not - // dynamic at all. - Multivalued(Box + 'a>), + Multivalued { + /// Iterator emitting the indices for the index + indices: Box + 'a>, + /// In the merge case we can precompute the column stats + stats: Option, + }, } impl<'a> SerializableColumnIndex<'a> { @@ -25,7 +29,7 @@ impl<'a> SerializableColumnIndex<'a> { match self { SerializableColumnIndex::Full => Cardinality::Full, SerializableColumnIndex::Optional { .. } => Cardinality::Optional, - SerializableColumnIndex::Multivalued(_) => Cardinality::Multivalued, + SerializableColumnIndex::Multivalued { .. } => Cardinality::Multivalued, } } } @@ -44,9 +48,10 @@ pub fn serialize_column_index( non_null_row_ids, num_rows, } => serialize_optional_index(non_null_row_ids.as_ref(), num_rows, &mut output)?, - SerializableColumnIndex::Multivalued(multivalued_index) => { - serialize_multivalued_index(&*multivalued_index, &mut output)? 
- } + SerializableColumnIndex::Multivalued { + indices: multivalued_index, + stats, + } => serialize_multivalued_index(&*multivalued_index, stats, &mut output)?, } let column_index_num_bytes = output.written_bytes() as u32; Ok(column_index_num_bytes) diff --git a/columnar/src/column_values/mod.rs b/columnar/src/column_values/mod.rs index ef5de5154..9c03618a9 100644 --- a/columnar/src/column_values/mod.rs +++ b/columnar/src/column_values/mod.rs @@ -32,7 +32,8 @@ pub use u128_based::{ }; pub use u64_based::{ load_u64_based_column_values, serialize_and_load_u64_based_column_values, - serialize_u64_based_column_values, CodecType, ALL_U64_CODEC_TYPES, + serialize_u64_based_column_values, serialize_u64_with_codec_and_stats, CodecType, + ALL_U64_CODEC_TYPES, }; pub use vec_column::VecColumn; diff --git a/columnar/src/column_values/u64_based/bitpacked.rs b/columnar/src/column_values/u64_based/bitpacked.rs index 3ed999648..50da9a88a 100644 --- a/columnar/src/column_values/u64_based/bitpacked.rs +++ b/columnar/src/column_values/u64_based/bitpacked.rs @@ -128,6 +128,9 @@ impl ColumnCodecEstimator for BitpackedCodecEstimator { bit_packer.close(wrt)?; Ok(()) } + fn codec_type(&self) -> super::CodecType { + super::CodecType::Bitpacked + } } pub struct BitpackedCodec; diff --git a/columnar/src/column_values/u64_based/blockwise_linear.rs b/columnar/src/column_values/u64_based/blockwise_linear.rs index 2abf8205b..f1022aa71 100644 --- a/columnar/src/column_values/u64_based/blockwise_linear.rs +++ b/columnar/src/column_values/u64_based/blockwise_linear.rs @@ -163,6 +163,10 @@ impl ColumnCodecEstimator for BlockwiseLinearEstimator { Ok(()) } + + fn codec_type(&self) -> super::CodecType { + super::CodecType::BlockwiseLinear + } } pub struct BlockwiseLinearCodec; diff --git a/columnar/src/column_values/u64_based/linear.rs b/columnar/src/column_values/u64_based/linear.rs index ba0c9e641..bdbb23b14 100644 --- a/columnar/src/column_values/u64_based/linear.rs +++ 
b/columnar/src/column_values/u64_based/linear.rs @@ -153,6 +153,12 @@ impl ColumnCodecEstimator for LinearCodecEstimator { self.collect_before_line_estimation(value); } } + fn requires_full_scan(&self) -> bool { + true + } + fn codec_type(&self) -> super::CodecType { + super::CodecType::Linear + } } impl LinearCodecEstimator { diff --git a/columnar/src/column_values/u64_based/mod.rs b/columnar/src/column_values/u64_based/mod.rs index 7afc71e3f..e9310be63 100644 --- a/columnar/src/column_values/u64_based/mod.rs +++ b/columnar/src/column_values/u64_based/mod.rs @@ -37,7 +37,11 @@ pub trait ColumnCodecEstimator: 'static { /// This method will be called for each element of the column during /// `estimation`. fn collect(&mut self, value: u64); - /// Finalizes the first pass phase. + /// Returns true if the estimator needs a full pass over the column before serialization + fn requires_full_scan(&self) -> bool { + false + } + fn codec_type(&self) -> CodecType; fn finalize(&mut self) {} /// Returns an accurate estimation of the number of bytes that will /// be used to represent this column. 
@@ -150,34 +154,45 @@ pub fn serialize_u64_based_column_values( wrt: &mut dyn Write, ) -> io::Result<()> { let mut stats_collector = StatsCollector::default(); - let mut estimators: Vec<(CodecType, Box)> = - Vec::with_capacity(codec_types.len()); + let mut estimators: Vec> = Vec::with_capacity(codec_types.len()); for &codec_type in codec_types { - estimators.push((codec_type, codec_type.estimator())); + estimators.push(codec_type.estimator()); } for val in vals.boxed_iter() { let val_u64 = val.to_u64(); stats_collector.collect(val_u64); - for (_, estimator) in &mut estimators { + for estimator in &mut estimators { estimator.collect(val_u64); } } - for (_, estimator) in &mut estimators { + for estimator in &mut estimators { estimator.finalize(); } let stats = stats_collector.stats(); - let (_, best_codec, best_codec_estimator) = estimators + let (_, best_codec) = estimators .into_iter() - .flat_map(|(codec_type, estimator)| { + .flat_map(|estimator| { let num_bytes = estimator.estimate(&stats)?; - Some((num_bytes, codec_type, estimator)) + Some((num_bytes, estimator)) }) - .min_by_key(|(num_bytes, _, _)| *num_bytes) + .min_by_key(|(num_bytes, _)| *num_bytes) .ok_or_else(|| { io::Error::new(io::ErrorKind::InvalidData, "No available applicable codec.") })?; - best_codec.to_code().serialize(wrt)?; - best_codec_estimator.serialize( + serialize_u64_with_codec_and_stats(vals, best_codec, stats, wrt)?; + Ok(()) +} + +/// Serializes a given column of u64-mapped values. +/// The codec estimator needs to be collected fully for the Line codec before calling this. 
+pub fn serialize_u64_with_codec_and_stats( + vals: &dyn Iterable, + codec: Box, + stats: ColumnStats, + wrt: &mut dyn Write, +) -> io::Result<()> { + codec.codec_type().to_code().serialize(wrt)?; + codec.serialize( &stats, &mut vals.boxed_iter().map(MonotonicallyMappableToU64::to_u64), wrt, diff --git a/columnar/src/columnar/merge/mod.rs b/columnar/src/columnar/merge/mod.rs index 9f7666e8f..38a9d302e 100644 --- a/columnar/src/columnar/merge/mod.rs +++ b/columnar/src/columnar/merge/mod.rs @@ -3,7 +3,7 @@ mod merge_mapping; mod term_merger; use std::collections::{BTreeMap, HashSet}; -use std::io; +use std::io::{self}; use std::net::Ipv6Addr; use std::sync::Arc; @@ -156,8 +156,15 @@ fn merge_column( column_values.push(None); } } - let merged_column_index = - crate::column_index::merge_column_index(&column_indexes[..], merge_row_order); + let num_values: u32 = column_values + .iter() + .map(|vals| vals.as_ref().map(|idx| idx.num_vals()).unwrap_or(0)) + .sum(); + let merged_column_index = crate::column_index::merge_column_index( + &column_indexes[..], + merge_row_order, + num_values, + ); let merge_column_values = MergedColumnValues { column_indexes: &column_indexes[..], column_values: &column_values[..], @@ -183,8 +190,15 @@ fn merge_column( } } - let merged_column_index = - crate::column_index::merge_column_index(&column_indexes[..], merge_row_order); + let num_values: u32 = column_values + .iter() + .map(|vals| vals.as_ref().map(|idx| idx.num_vals()).unwrap_or(0)) + .sum(); + let merged_column_index = crate::column_index::merge_column_index( + &column_indexes[..], + merge_row_order, + num_values, + ); let merge_column_values = MergedColumnValues { column_indexes: &column_indexes[..], column_values: &column_values, @@ -214,8 +228,19 @@ fn merge_column( } } } - let merged_column_index = - crate::column_index::merge_column_index(&column_indexes[..], merge_row_order); + let num_values: u32 = bytes_columns + .iter() + .map(|vals| { + vals.as_ref() + .map(|idx| 
idx.term_ord_column.values.num_vals()) + .unwrap_or(0) + }) + .sum(); + let merged_column_index = crate::column_index::merge_column_index( + &column_indexes[..], + merge_row_order, + num_values, + ); merge_bytes_or_str_column(merged_column_index, &bytes_columns, merge_row_order, wrt)?; } } diff --git a/columnar/src/columnar/writer/mod.rs b/columnar/src/columnar/writer/mod.rs index 1fbc9d85d..9aed64a24 100644 --- a/columnar/src/columnar/writer/mod.rs +++ b/columnar/src/columnar/writer/mod.rs @@ -644,7 +644,10 @@ fn send_to_serialize_column_mappable_to_u128< let multivalued_index_builder = value_index_builders.borrow_multivalued_index_builder(); consume_operation_iterator(op_iterator, multivalued_index_builder, values); let multivalued_index = multivalued_index_builder.finish(num_rows); - SerializableColumnIndex::Multivalued(Box::new(multivalued_index)) + SerializableColumnIndex::Multivalued { + indices: Box::new(multivalued_index), + stats: Default::default(), // TODO: implement stats for u128 + } } }; crate::column::serialize_column_mappable_to_u128( @@ -699,7 +702,10 @@ fn send_to_serialize_column_mappable_to_u64( if sort_values_within_row { sort_values_within_row_in_place(multivalued_index, values); } - SerializableColumnIndex::Multivalued(Box::new(multivalued_index)) + SerializableColumnIndex::Multivalued { + indices: Box::new(multivalued_index), + stats: None, + } } }; crate::column::serialize_column_mappable_to_u64( diff --git a/columnar/src/tests.rs b/columnar/src/tests.rs index 5e5c50f55..f6e99cad2 100644 --- a/columnar/src/tests.rs +++ b/columnar/src/tests.rs @@ -738,35 +738,22 @@ proptest! 
{ #![proptest_config(ProptestConfig::with_cases(1000))] #[test] fn test_columnar_merge_proptest(columnar_docs in proptest::collection::vec(columnar_docs_strategy(), 2..=3)) { - let columnar_readers: Vec = columnar_docs.iter() - .map(|docs| build_columnar(&docs[..])) - .collect::>(); - let columnar_readers_arr: Vec<&ColumnarReader> = columnar_readers.iter().collect(); - let mut output: Vec = Vec::new(); - let stack_merge_order = StackMergeOrder::stack(&columnar_readers_arr[..]).into(); - crate::merge_columnar(&columnar_readers_arr[..], &[], stack_merge_order, &mut output).unwrap(); - let merged_columnar = ColumnarReader::open(output).unwrap(); - let concat_rows: Vec> = columnar_docs.iter().flatten().cloned().collect(); - let expected_merged_columnar = build_columnar(&concat_rows[..]); - assert_columnar_eq_strict(&merged_columnar, &expected_merged_columnar); + test_columnar_docs(columnar_docs); } } -#[test] -fn test_columnar_merging_empty_columnar() { - let columnar_docs: Vec>> = - vec![vec![], vec![vec![("c1", ColumnValue::Str("a"))]]]; +fn test_columnar_docs(columnar_docs: Vec>>) { let columnar_readers: Vec = columnar_docs .iter() .map(|docs| build_columnar(&docs[..])) .collect::>(); let columnar_readers_arr: Vec<&ColumnarReader> = columnar_readers.iter().collect(); let mut output: Vec = Vec::new(); - let stack_merge_order = StackMergeOrder::stack(&columnar_readers_arr[..]); + let stack_merge_order = StackMergeOrder::stack(&columnar_readers_arr[..]).into(); crate::merge_columnar( &columnar_readers_arr[..], &[], - crate::MergeRowOrder::Stack(stack_merge_order), + stack_merge_order, &mut output, ) .unwrap(); @@ -777,6 +764,24 @@ fn test_columnar_merging_empty_columnar() { assert_columnar_eq_strict(&merged_columnar, &expected_merged_columnar); } +#[test] +fn test_columnar_merging_empty_columnar() { + let columnar_docs: Vec>> = + vec![vec![], vec![vec![("c1", ColumnValue::Str("a"))]]]; + test_columnar_docs(columnar_docs); +} +#[test] +fn test_columnar_merging_simple() 
{ + let columnar_docs: Vec>> = vec![ + vec![], + vec![vec![ + ("c1", ColumnValue::Numerical(0u64.into())), + ("c1", ColumnValue::Numerical(0u64.into())), + ]], + ]; + test_columnar_docs(columnar_docs); +} + #[test] fn test_columnar_merging_number_columns() { let columnar_docs: Vec>> = vec![ @@ -793,25 +798,7 @@ fn test_columnar_merging_number_columns() { vec![("c2", ColumnValue::Numerical(u64::MAX.into()))], ], ]; - let columnar_readers: Vec = columnar_docs - .iter() - .map(|docs| build_columnar(&docs[..])) - .collect::>(); - let columnar_readers_arr: Vec<&ColumnarReader> = columnar_readers.iter().collect(); - let mut output: Vec = Vec::new(); - let stack_merge_order = StackMergeOrder::stack(&columnar_readers_arr[..]); - crate::merge_columnar( - &columnar_readers_arr[..], - &[], - crate::MergeRowOrder::Stack(stack_merge_order), - &mut output, - ) - .unwrap(); - let merged_columnar = ColumnarReader::open(output).unwrap(); - let concat_rows: Vec> = - columnar_docs.iter().flatten().cloned().collect(); - let expected_merged_columnar = build_columnar(&concat_rows[..]); - assert_columnar_eq_strict(&merged_columnar, &expected_merged_columnar); + test_columnar_docs(columnar_docs); } // TODO add non trivial remap and merge