skip estimate phase when merging multivalue indices

Precompute stats for the merged multivalue index and disable the Linear codec for
the multivalue index. That combination makes it possible to skip the first
estimation pass and gives up to a 2x improvement in merge performance on multivalue indices.

This change may reduce compression of the multivalue index, because the Linear
codec compresses very well when every document has a fixed number of values.
The Linear codec should eventually be replaced by a codec that compresses well
without requiring a full scan upfront.

```
merge_multi_and_multi          Avg: 22.7880ms (-47.15%)    Median: 22.5469ms (-47.38%)    [22.3691ms .. 25.8392ms]
merge_dense_and_dense          Avg: 14.4398ms (+2.18%)     Median: 14.2465ms (+0.74%)     [14.1620ms .. 16.1270ms]
merge_sparse_and_sparse        Avg: 10.6559ms (+1.10%)     Median: 10.6318ms (+0.91%)     [10.5527ms .. 11.2848ms]
merge_sparse_and_dense         Avg: 12.4886ms (+1.52%)     Median: 12.4044ms (+0.84%)     [12.3261ms .. 13.9439ms]
merge_multi_and_dense          Avg: 25.6686ms (-45.56%)    Median: 25.4851ms (-45.84%)    [25.1618ms .. 27.6226ms]
merge_multi_and_sparse         Avg: 24.3278ms (-47.00%)    Median: 24.1917ms (-47.34%)    [23.7159ms .. 27.0513ms]
```
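To illustrate the fast path: in a stacked merge the stats of the multivalue index are known before any value is visited (the offsets are monotonically increasing, start at 0, and end at the total number of values), so the serializer can be handed a precomputed `ColumnStats` together with the Bitpacked estimator and skip the estimation pass entirely. The helper below is a minimal sketch, not code from this commit; it assumes the crate-internal `ColumnStats`, `CodecType::estimator()`, and `serialize_u64_with_codec_and_stats` items that this commit introduces or touches, and the function name is hypothetical.

```
use std::io;
use std::num::NonZeroU64;

// Crate-internal imports (paths assumed for this sketch).
use crate::column_values::{serialize_u64_with_codec_and_stats, CodecType, ColumnStats};

/// Hypothetical helper: serialize the start offsets of a merged multivalue index
/// in a single pass, using stats that are known before looking at the values.
fn serialize_merged_multivalue_offsets(
    start_offsets: &[u32], // one offset per doc plus a final end offset
    num_values: u32,       // total number of values across the merged columns
    num_docs: u32,         // number of docs after the merge
    out: &mut impl io::Write,
) -> io::Result<()> {
    let stats = ColumnStats {
        gcd: NonZeroU64::new(1).unwrap(),
        // The stored values are positions into the value column: 0..=num_values.
        min_value: 0,
        max_value: num_values as u64,
        // num_docs + 1 offsets are written, since the index starts at 0.
        num_rows: num_docs + 1,
    };
    // Bitpacked can serialize from the stats alone; the Linear codec would need a
    // full pass over the values first, which is exactly what we want to avoid.
    let estimator = CodecType::Bitpacked.estimator();
    assert!(!estimator.requires_full_scan());
    serialize_u64_with_codec_and_stats(&start_offsets, estimator, stats, out)
}
```

The regular write path is unchanged: without precomputed stats, `serialize_multivalued_index` still runs the estimators for both Bitpacked and Linear, so only the stacked-merge path gives up the Linear codec.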
Pascal Seitz
2024-06-11 19:03:33 +08:00
parent c3b92a5412
commit 2ce485b8cc
14 changed files with 173 additions and 88 deletions

View File

@@ -22,7 +22,7 @@ impl Display for Card {
     }
 }
-const NUM_DOCS: u32 = 100_000;
+const NUM_DOCS: u32 = 1_000_000;
 fn generate_columnar(card: Card, num_docs: u32) -> ColumnarReader {
     use tantivy_columnar::ColumnarWriter;
@@ -88,12 +88,8 @@ fn main() {
             let columnar_readers = columnar_readers.iter().collect::<Vec<_>>();
             let merge_row_order = StackMergeOrder::stack(&columnar_readers[..]);
-            let _ = black_box(merge_columnar(
-                &columnar_readers,
-                &[],
-                merge_row_order.into(),
-                &mut out,
-            ));
+            merge_columnar(&columnar_readers, &[], merge_row_order.into(), &mut out).unwrap();
+            black_box(out);
         },
     );
 }

View File

@@ -73,14 +73,18 @@ fn detect_cardinality(
 pub fn merge_column_index<'a>(
     columns: &'a [ColumnIndex],
     merge_row_order: &'a MergeRowOrder,
+    num_values: u32,
 ) -> SerializableColumnIndex<'a> {
     // For simplification, we do not try to detect whether the cardinality could be
     // downgraded thanks to deletes.
     let cardinality_after_merge = detect_cardinality(columns, merge_row_order);
     match merge_row_order {
-        MergeRowOrder::Stack(stack_merge_order) => {
-            merge_column_index_stacked(columns, cardinality_after_merge, stack_merge_order)
-        }
+        MergeRowOrder::Stack(stack_merge_order) => merge_column_index_stacked(
+            columns,
+            cardinality_after_merge,
+            stack_merge_order,
+            num_values,
+        ),
         MergeRowOrder::Shuffled(complex_merge_order) => {
             merge_column_index_shuffled(columns, cardinality_after_merge, complex_merge_order)
         }
@@ -167,8 +171,12 @@ mod tests {
             ],
         )
         .into();
-        let merged_column_index = merge_column_index(&column_indexes[..], &merge_row_order);
-        let SerializableColumnIndex::Multivalued(start_index_iterable) = merged_column_index else {
+        let merged_column_index = merge_column_index(&column_indexes[..], &merge_row_order, 3);
+        let SerializableColumnIndex::Multivalued {
+            indices: start_index_iterable,
+            ..
+        } = merged_column_index
+        else {
            panic!("Excpected a multivalued index")
         };
         let start_indexes: Vec<RowId> = start_index_iterable.boxed_iter().collect();
@@ -200,8 +208,12 @@ mod tests {
             ],
         )
         .into();
-        let merged_column_index = merge_column_index(&column_indexes[..], &merge_row_order);
-        let SerializableColumnIndex::Multivalued(start_index_iterable) = merged_column_index else {
+        let merged_column_index = merge_column_index(&column_indexes[..], &merge_row_order, 6);
+        let SerializableColumnIndex::Multivalued {
+            indices: start_index_iterable,
+            ..
+        } = merged_column_index
+        else {
            panic!("Excpected a multivalued index")
         };
         let start_indexes: Vec<RowId> = start_index_iterable.boxed_iter().collect();

View File

@@ -22,7 +22,10 @@ pub fn merge_column_index_shuffled<'a>(
         Cardinality::Multivalued => {
             let multivalue_start_index =
                 merge_column_index_shuffled_multivalued(column_indexes, shuffle_merge_order);
-            SerializableColumnIndex::Multivalued(multivalue_start_index)
+            SerializableColumnIndex::Multivalued {
+                indices: multivalue_start_index,
+                stats: None,
+            }
         }
     }
 }

View File

@@ -1,6 +1,8 @@
 use std::iter;
+use std::num::NonZeroU64;
 
 use crate::column_index::{SerializableColumnIndex, Set};
+use crate::column_values::ColumnStats;
 use crate::iterable::Iterable;
 use crate::{Cardinality, ColumnIndex, RowId, StackMergeOrder};
@@ -12,6 +14,7 @@ pub fn merge_column_index_stacked<'a>(
     columns: &'a [ColumnIndex],
     cardinality_after_merge: Cardinality,
     stack_merge_order: &'a StackMergeOrder,
+    num_values: u32,
 ) -> SerializableColumnIndex<'a> {
     match cardinality_after_merge {
         Cardinality::Full => SerializableColumnIndex::Full,
@@ -27,7 +30,17 @@ pub fn merge_column_index_stacked<'a>(
                 columns,
                 stack_merge_order,
             };
-            SerializableColumnIndex::Multivalued(Box::new(stacked_multivalued_index))
+            SerializableColumnIndex::Multivalued {
+                indices: Box::new(stacked_multivalued_index),
+                stats: Some(ColumnStats {
+                    gcd: NonZeroU64::new(1).unwrap(),
+                    // The values in the multivalue index are the positions of the values
+                    min_value: 0,
+                    max_value: num_values as u64,
+                    // This is num docs, but it starts at 0 so we need +1
+                    num_rows: stack_merge_order.num_rows() + 1,
+                }),
+            }
         }
     }
 }

View File

@@ -6,20 +6,29 @@ use std::sync::Arc;
 use common::OwnedBytes;
 
 use crate::column_values::{
-    load_u64_based_column_values, serialize_u64_based_column_values, CodecType, ColumnValues,
+    load_u64_based_column_values, serialize_u64_based_column_values,
+    serialize_u64_with_codec_and_stats, CodecType, ColumnStats, ColumnValues,
 };
 use crate::iterable::Iterable;
 use crate::{DocId, RowId};
 
 pub fn serialize_multivalued_index(
     multivalued_index: &dyn Iterable<RowId>,
+    stats: Option<ColumnStats>,
     output: &mut impl Write,
 ) -> io::Result<()> {
-    serialize_u64_based_column_values(
-        multivalued_index,
-        &[CodecType::Bitpacked, CodecType::Linear],
-        output,
-    )?;
+    if let Some(stats) = stats {
+        // TODO: Add something with higher compression that doesn't require a full scan upfront
+        let estimator = CodecType::Bitpacked.estimator();
+        assert!(!estimator.requires_full_scan());
+        serialize_u64_with_codec_and_stats(multivalued_index, estimator, stats, output)?;
+    } else {
+        serialize_u64_based_column_values(
+            multivalued_index,
+            &[CodecType::Bitpacked, CodecType::Linear],
+            output,
+        )?;
+    }
     Ok(())
 }
@@ -52,7 +61,7 @@ impl From<Arc<dyn ColumnValues<RowId>>> for MultiValueIndex {
 impl MultiValueIndex {
     pub fn for_test(start_offsets: &[RowId]) -> MultiValueIndex {
         let mut buffer = Vec::new();
-        serialize_multivalued_index(&start_offsets, &mut buffer).unwrap();
+        serialize_multivalued_index(&start_offsets, None, &mut buffer).unwrap();
         let bytes = OwnedBytes::new(buffer);
         open_multivalued_index(bytes).unwrap()
     }

View File

@@ -6,6 +6,7 @@ use common::{CountingWriter, OwnedBytes};
 use crate::column_index::multivalued_index::serialize_multivalued_index;
 use crate::column_index::optional_index::serialize_optional_index;
 use crate::column_index::ColumnIndex;
+use crate::column_values::ColumnStats;
 use crate::iterable::Iterable;
 use crate::{Cardinality, RowId};
@@ -15,9 +16,12 @@ pub enum SerializableColumnIndex<'a> {
         non_null_row_ids: Box<dyn Iterable<RowId> + 'a>,
         num_rows: RowId,
     },
-    // TODO remove the Arc<dyn> apart from serialization this is not
-    // dynamic at all.
-    Multivalued(Box<dyn Iterable<RowId> + 'a>),
+    Multivalued {
+        /// Iterator emitting the indices for the index
+        indices: Box<dyn Iterable<RowId> + 'a>,
+        /// In the merge case we can precompute the column stats
+        stats: Option<ColumnStats>,
+    },
 }
 
 impl<'a> SerializableColumnIndex<'a> {
@@ -25,7 +29,7 @@ impl<'a> SerializableColumnIndex<'a> {
         match self {
             SerializableColumnIndex::Full => Cardinality::Full,
             SerializableColumnIndex::Optional { .. } => Cardinality::Optional,
-            SerializableColumnIndex::Multivalued(_) => Cardinality::Multivalued,
+            SerializableColumnIndex::Multivalued { .. } => Cardinality::Multivalued,
         }
     }
 }
@@ -44,9 +48,10 @@ pub fn serialize_column_index(
             non_null_row_ids,
             num_rows,
         } => serialize_optional_index(non_null_row_ids.as_ref(), num_rows, &mut output)?,
-        SerializableColumnIndex::Multivalued(multivalued_index) => {
-            serialize_multivalued_index(&*multivalued_index, &mut output)?
-        }
+        SerializableColumnIndex::Multivalued {
+            indices: multivalued_index,
+            stats,
+        } => serialize_multivalued_index(&*multivalued_index, stats, &mut output)?,
     }
     let column_index_num_bytes = output.written_bytes() as u32;
     Ok(column_index_num_bytes)

View File

@@ -32,7 +32,8 @@ pub use u128_based::{
 };
 pub use u64_based::{
     load_u64_based_column_values, serialize_and_load_u64_based_column_values,
-    serialize_u64_based_column_values, CodecType, ALL_U64_CODEC_TYPES,
+    serialize_u64_based_column_values, serialize_u64_with_codec_and_stats, CodecType,
+    ALL_U64_CODEC_TYPES,
 };
 pub use vec_column::VecColumn;

View File

@@ -128,6 +128,9 @@ impl ColumnCodecEstimator for BitpackedCodecEstimator {
         bit_packer.close(wrt)?;
         Ok(())
     }
+    fn codec_type(&self) -> super::CodecType {
+        super::CodecType::Bitpacked
+    }
 }
 
 pub struct BitpackedCodec;

View File

@@ -163,6 +163,10 @@ impl ColumnCodecEstimator for BlockwiseLinearEstimator {
         Ok(())
     }
+
+    fn codec_type(&self) -> super::CodecType {
+        super::CodecType::BlockwiseLinear
+    }
 }
 
 pub struct BlockwiseLinearCodec;

View File

@@ -153,6 +153,12 @@ impl ColumnCodecEstimator for LinearCodecEstimator {
             self.collect_before_line_estimation(value);
         }
     }
+    fn requires_full_scan(&self) -> bool {
+        true
+    }
+    fn codec_type(&self) -> super::CodecType {
+        super::CodecType::Linear
+    }
 }
 
 impl LinearCodecEstimator {

View File

@@ -37,7 +37,11 @@ pub trait ColumnCodecEstimator<T = u64>: 'static {
     /// This method will be called for each element of the column during
     /// `estimation`.
     fn collect(&mut self, value: u64);
-    /// Finalizes the first pass phase.
+    /// Returns true if the estimator needs a full pass over the column before serialization
+    fn requires_full_scan(&self) -> bool {
+        false
+    }
+    fn codec_type(&self) -> CodecType;
     fn finalize(&mut self) {}
     /// Returns an accurate estimation of the number of bytes that will
     /// be used to represent this column.
@@ -150,34 +154,45 @@ pub fn serialize_u64_based_column_values<T: MonotonicallyMappableToU64>(
     wrt: &mut dyn Write,
 ) -> io::Result<()> {
     let mut stats_collector = StatsCollector::default();
-    let mut estimators: Vec<(CodecType, Box<dyn ColumnCodecEstimator>)> =
-        Vec::with_capacity(codec_types.len());
+    let mut estimators: Vec<Box<dyn ColumnCodecEstimator>> = Vec::with_capacity(codec_types.len());
     for &codec_type in codec_types {
-        estimators.push((codec_type, codec_type.estimator()));
+        estimators.push(codec_type.estimator());
     }
     for val in vals.boxed_iter() {
         let val_u64 = val.to_u64();
         stats_collector.collect(val_u64);
-        for (_, estimator) in &mut estimators {
+        for estimator in &mut estimators {
            estimator.collect(val_u64);
         }
     }
-    for (_, estimator) in &mut estimators {
+    for estimator in &mut estimators {
         estimator.finalize();
     }
     let stats = stats_collector.stats();
-    let (_, best_codec, best_codec_estimator) = estimators
+    let (_, best_codec) = estimators
         .into_iter()
-        .flat_map(|(codec_type, estimator)| {
+        .flat_map(|estimator| {
             let num_bytes = estimator.estimate(&stats)?;
-            Some((num_bytes, codec_type, estimator))
+            Some((num_bytes, estimator))
         })
-        .min_by_key(|(num_bytes, _, _)| *num_bytes)
+        .min_by_key(|(num_bytes, _)| *num_bytes)
         .ok_or_else(|| {
             io::Error::new(io::ErrorKind::InvalidData, "No available applicable codec.")
         })?;
-    best_codec.to_code().serialize(wrt)?;
-    best_codec_estimator.serialize(
+    serialize_u64_with_codec_and_stats(vals, best_codec, stats, wrt)?;
+    Ok(())
+}
+
+/// Serializes a given column of u64-mapped values.
+/// The codec estimator needs to be collected fully for the Line codec before calling this.
+pub fn serialize_u64_with_codec_and_stats<T: MonotonicallyMappableToU64>(
+    vals: &dyn Iterable<T>,
+    codec: Box<dyn ColumnCodecEstimator>,
+    stats: ColumnStats,
+    wrt: &mut dyn Write,
+) -> io::Result<()> {
+    codec.codec_type().to_code().serialize(wrt)?;
+    codec.serialize(
         &stats,
         &mut vals.boxed_iter().map(MonotonicallyMappableToU64::to_u64),
         wrt,

View File

@@ -3,7 +3,7 @@ mod merge_mapping;
 mod term_merger;
 
 use std::collections::{BTreeMap, HashSet};
-use std::io;
+use std::io::{self};
 use std::net::Ipv6Addr;
 use std::sync::Arc;
@@ -156,8 +156,15 @@ fn merge_column(
                     column_values.push(None);
                 }
             }
-            let merged_column_index =
-                crate::column_index::merge_column_index(&column_indexes[..], merge_row_order);
+            let num_values: u32 = column_values
+                .iter()
+                .map(|vals| vals.as_ref().map(|idx| idx.num_vals()).unwrap_or(0))
+                .sum();
+            let merged_column_index = crate::column_index::merge_column_index(
+                &column_indexes[..],
+                merge_row_order,
+                num_values,
+            );
             let merge_column_values = MergedColumnValues {
                 column_indexes: &column_indexes[..],
                 column_values: &column_values[..],
@@ -183,8 +190,15 @@ fn merge_column(
                 }
             }
-            let merged_column_index =
-                crate::column_index::merge_column_index(&column_indexes[..], merge_row_order);
+            let num_values: u32 = column_values
+                .iter()
+                .map(|vals| vals.as_ref().map(|idx| idx.num_vals()).unwrap_or(0))
+                .sum();
+            let merged_column_index = crate::column_index::merge_column_index(
+                &column_indexes[..],
+                merge_row_order,
+                num_values,
+            );
             let merge_column_values = MergedColumnValues {
                 column_indexes: &column_indexes[..],
                 column_values: &column_values,
@@ -214,8 +228,19 @@ fn merge_column(
                     }
                 }
             }
-            let merged_column_index =
-                crate::column_index::merge_column_index(&column_indexes[..], merge_row_order);
+            let num_values: u32 = bytes_columns
+                .iter()
+                .map(|vals| {
+                    vals.as_ref()
+                        .map(|idx| idx.term_ord_column.values.num_vals())
+                        .unwrap_or(0)
+                })
+                .sum();
+            let merged_column_index = crate::column_index::merge_column_index(
+                &column_indexes[..],
+                merge_row_order,
+                num_values,
+            );
             merge_bytes_or_str_column(merged_column_index, &bytes_columns, merge_row_order, wrt)?;
         }
     }

View File

@@ -644,7 +644,10 @@ fn send_to_serialize_column_mappable_to_u128<
             let multivalued_index_builder = value_index_builders.borrow_multivalued_index_builder();
             consume_operation_iterator(op_iterator, multivalued_index_builder, values);
             let multivalued_index = multivalued_index_builder.finish(num_rows);
-            SerializableColumnIndex::Multivalued(Box::new(multivalued_index))
+            SerializableColumnIndex::Multivalued {
+                indices: Box::new(multivalued_index),
+                stats: Default::default(), // TODO: implement stats for u128
+            }
         }
     };
     crate::column::serialize_column_mappable_to_u128(
@@ -699,7 +702,10 @@ fn send_to_serialize_column_mappable_to_u64(
             if sort_values_within_row {
                 sort_values_within_row_in_place(multivalued_index, values);
             }
-            SerializableColumnIndex::Multivalued(Box::new(multivalued_index))
+            SerializableColumnIndex::Multivalued {
+                indices: Box::new(multivalued_index),
+                stats: None,
+            }
         }
     };
     crate::column::serialize_column_mappable_to_u64(

View File

@@ -738,35 +738,22 @@ proptest! {
     #![proptest_config(ProptestConfig::with_cases(1000))]
     #[test]
     fn test_columnar_merge_proptest(columnar_docs in proptest::collection::vec(columnar_docs_strategy(), 2..=3)) {
-        let columnar_readers: Vec<ColumnarReader> = columnar_docs.iter()
-            .map(|docs| build_columnar(&docs[..]))
-            .collect::<Vec<_>>();
-        let columnar_readers_arr: Vec<&ColumnarReader> = columnar_readers.iter().collect();
-        let mut output: Vec<u8> = Vec::new();
-        let stack_merge_order = StackMergeOrder::stack(&columnar_readers_arr[..]).into();
-        crate::merge_columnar(&columnar_readers_arr[..], &[], stack_merge_order, &mut output).unwrap();
-        let merged_columnar = ColumnarReader::open(output).unwrap();
-        let concat_rows: Vec<Vec<(&'static str, ColumnValue)>> = columnar_docs.iter().flatten().cloned().collect();
-        let expected_merged_columnar = build_columnar(&concat_rows[..]);
-        assert_columnar_eq_strict(&merged_columnar, &expected_merged_columnar);
+        test_columnar_docs(columnar_docs);
     }
 }
 
-#[test]
-fn test_columnar_merging_empty_columnar() {
-    let columnar_docs: Vec<Vec<Vec<(&str, ColumnValue)>>> =
-        vec![vec![], vec![vec![("c1", ColumnValue::Str("a"))]]];
+fn test_columnar_docs(columnar_docs: Vec<Vec<Vec<(&'static str, ColumnValue)>>>) {
     let columnar_readers: Vec<ColumnarReader> = columnar_docs
         .iter()
         .map(|docs| build_columnar(&docs[..]))
         .collect::<Vec<_>>();
     let columnar_readers_arr: Vec<&ColumnarReader> = columnar_readers.iter().collect();
     let mut output: Vec<u8> = Vec::new();
-    let stack_merge_order = StackMergeOrder::stack(&columnar_readers_arr[..]);
+    let stack_merge_order = StackMergeOrder::stack(&columnar_readers_arr[..]).into();
     crate::merge_columnar(
         &columnar_readers_arr[..],
         &[],
-        crate::MergeRowOrder::Stack(stack_merge_order),
+        stack_merge_order,
         &mut output,
     )
     .unwrap();
@@ -777,6 +764,24 @@ fn test_columnar_merging_empty_columnar() {
     assert_columnar_eq_strict(&merged_columnar, &expected_merged_columnar);
 }
 
+#[test]
+fn test_columnar_merging_empty_columnar() {
+    let columnar_docs: Vec<Vec<Vec<(&str, ColumnValue)>>> =
+        vec![vec![], vec![vec![("c1", ColumnValue::Str("a"))]]];
+    test_columnar_docs(columnar_docs);
+}
+
+#[test]
+fn test_columnar_merging_simple() {
+    let columnar_docs: Vec<Vec<Vec<(&str, ColumnValue)>>> = vec![
+        vec![],
+        vec![vec![
+            ("c1", ColumnValue::Numerical(0u64.into())),
+            ("c1", ColumnValue::Numerical(0u64.into())),
+        ]],
+    ];
+    test_columnar_docs(columnar_docs);
+}
+
 #[test]
 fn test_columnar_merging_number_columns() {
     let columnar_docs: Vec<Vec<Vec<(&str, ColumnValue)>>> = vec![
@@ -793,25 +798,7 @@ fn test_columnar_merging_number_columns() {
             vec![("c2", ColumnValue::Numerical(u64::MAX.into()))],
         ],
     ];
-    let columnar_readers: Vec<ColumnarReader> = columnar_docs
-        .iter()
-        .map(|docs| build_columnar(&docs[..]))
-        .collect::<Vec<_>>();
-    let columnar_readers_arr: Vec<&ColumnarReader> = columnar_readers.iter().collect();
-    let mut output: Vec<u8> = Vec::new();
-    let stack_merge_order = StackMergeOrder::stack(&columnar_readers_arr[..]);
-    crate::merge_columnar(
-        &columnar_readers_arr[..],
-        &[],
-        crate::MergeRowOrder::Stack(stack_merge_order),
-        &mut output,
-    )
-    .unwrap();
-    let merged_columnar = ColumnarReader::open(output).unwrap();
-    let concat_rows: Vec<Vec<(&'static str, ColumnValue)>> =
-        columnar_docs.iter().flatten().cloned().collect();
-    let expected_merged_columnar = build_columnar(&concat_rows[..]);
-    assert_columnar_eq_strict(&merged_columnar, &expected_merged_columnar);
+    test_columnar_docs(columnar_docs);
 }
 
 // TODO add non trivial remap and merge