mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-05-21 02:30:43 +00:00
* use optional index in multivalued index
For mostly empty multivalued indices there was a large overhead during
creation when iterating all docids. This is alleviated by placing an
optional index in the multivalued index to mark documents that have values.
There's some performance overhead when accessing values in a multivalued
index. The accessing cost is now optional index + multivalue index. The
sparse codec performs relatively bad with the binary_search when accessing
data. This is reflected in the benchmarks below.
This changes the format of columnar to v2, but code is added to handle the v1
formats.
```
Running benches/bench_access.rs (/home/pascal/Development/tantivy/optional_multivalues/target/release/deps/bench_access-ea323c028db88db4)
multi sparse 1/13
access_values_for_doc Avg: 42.8946ms (+241.80%) Median: 42.8869ms (+244.10%) [42.7484ms .. 43.1074ms]
access_first_vals Avg: 42.8022ms (+421.93%) Median: 42.7553ms (+439.84%) [42.6794ms .. 43.7404ms]
multi 2x
access_values_for_doc Avg: 31.1244ms (+24.17%) Median: 30.8339ms (+23.46%) [30.7192ms .. 33.6059ms]
access_first_vals Avg: 24.3070ms (+70.92%) Median: 24.0966ms (+70.18%) [23.9328ms .. 26.4851ms]
sparse 1/13
access_values_for_doc Avg: 42.2490ms (+0.61%) Median: 42.2346ms (+2.28%) [41.8988ms .. 43.7821ms]
access_first_vals Avg: 43.6272ms (+0.23%) Median: 43.6197ms (+1.78%) [43.4920ms .. 43.9009ms]
dense 1/12
access_values_for_doc Avg: 8.6184ms (+23.18%) Median: 8.6126ms (+23.78%) [8.5843ms .. 8.7527ms]
access_first_vals Avg: 6.8112ms (+4.47%) Median: 6.8002ms (+4.55%) [6.7887ms .. 6.8991ms]
full
access_values_for_doc Avg: 9.4073ms (-5.09%) Median: 9.4023ms (-2.23%) [9.3694ms .. 9.4568ms]
access_first_vals Avg: 4.9531ms (+6.24%) Median: 4.9502ms (+7.85%) [4.9423ms .. 4.9718ms]
```
```
Running benches/bench_merge.rs (/home/pascal/Development/tantivy/optional_multivalues/target/release/deps/bench_merge-475697dfceb3639f)
merge_multi 2x_and_multi 2x Avg: 20.2280ms (+34.33%) Median: 20.1829ms (+35.33%) [19.9933ms .. 20.8806ms]
merge_multi sparse 1/13_and_multi sparse 1/13 Avg: 0.8961ms (-78.04%) Median: 0.8943ms (-77.61%) [0.8899ms .. 0.9272ms]
merge_dense 1/12_and_dense 1/12 Avg: 0.6619ms (-1.26%) Median: 0.6616ms (+2.20%) [0.6473ms .. 0.6837ms]
merge_sparse 1/13_and_sparse 1/13 Avg: 0.5508ms (-0.85%) Median: 0.5508ms (+2.80%) [0.5420ms .. 0.5634ms]
merge_sparse 1/13_and_dense 1/12 Avg: 0.6046ms (-4.64%) Median: 0.6038ms (+2.80%) [0.5939ms .. 0.6296ms]
merge_multi sparse 1/13_and_dense 1/12 Avg: 0.9111ms (-83.48%) Median: 0.9063ms (-83.50%) [0.9047ms .. 0.9663ms]
merge_multi sparse 1/13_and_sparse 1/13 Avg: 0.8451ms (-89.49%) Median: 0.8428ms (-89.43%) [0.8411ms .. 0.8563ms]
merge_multi 2x_and_dense 1/12 Avg: 10.6624ms (-4.82%) Median: 10.6568ms (-4.49%) [10.5738ms .. 10.8353ms]
merge_multi 2x_and_sparse 1/13 Avg: 10.6336ms (-22.95%) Median: 10.5925ms (-22.33%) [10.5149ms .. 11.5657ms]
```
* Update columnar/src/columnar/format_version.rs
Co-authored-by: Paul Masurel <paul@quickwit.io>
* Update columnar/src/column_index/mod.rs
Co-authored-by: Paul Masurel <paul@quickwit.io>
---------
Co-authored-by: Paul Masurel <paul@quickwit.io>
887 lines
33 KiB
Rust
887 lines
33 KiB
Rust
use std::collections::HashMap;
|
|
use std::fmt::Debug;
|
|
use std::net::Ipv6Addr;
|
|
|
|
use common::DateTime;
|
|
use proptest::prelude::*;
|
|
use proptest::sample::subsequence;
|
|
|
|
use crate::column_values::MonotonicallyMappableToU128;
|
|
use crate::columnar::{ColumnType, ColumnTypeCategory};
|
|
use crate::dynamic_column::{DynamicColumn, DynamicColumnHandle};
|
|
use crate::value::{Coerce, NumericalValue};
|
|
use crate::{
|
|
BytesColumn, Cardinality, Column, ColumnarReader, ColumnarWriter, RowAddr, RowId,
|
|
ShuffleMergeOrder, StackMergeOrder,
|
|
};
|
|
|
|
#[test]
|
|
fn test_dataframe_writer_str() {
|
|
let mut dataframe_writer = ColumnarWriter::default();
|
|
dataframe_writer.record_str(1u32, "my_string", "hello");
|
|
dataframe_writer.record_str(3u32, "my_string", "helloeee");
|
|
let mut buffer: Vec<u8> = Vec::new();
|
|
dataframe_writer.serialize(5, &mut buffer).unwrap();
|
|
let columnar = ColumnarReader::open(buffer).unwrap();
|
|
assert_eq!(columnar.num_columns(), 1);
|
|
let cols: Vec<DynamicColumnHandle> = columnar.read_columns("my_string").unwrap();
|
|
assert_eq!(cols.len(), 1);
|
|
assert_eq!(cols[0].num_bytes(), 73);
|
|
}
|
|
|
|
#[test]
|
|
fn test_dataframe_writer_bytes() {
|
|
let mut dataframe_writer = ColumnarWriter::default();
|
|
dataframe_writer.record_bytes(1u32, "my_string", b"hello");
|
|
dataframe_writer.record_bytes(3u32, "my_string", b"helloeee");
|
|
let mut buffer: Vec<u8> = Vec::new();
|
|
dataframe_writer.serialize(5, &mut buffer).unwrap();
|
|
let columnar = ColumnarReader::open(buffer).unwrap();
|
|
assert_eq!(columnar.num_columns(), 1);
|
|
let cols: Vec<DynamicColumnHandle> = columnar.read_columns("my_string").unwrap();
|
|
assert_eq!(cols.len(), 1);
|
|
assert_eq!(cols[0].num_bytes(), 73);
|
|
}
|
|
|
|
#[test]
|
|
fn test_dataframe_writer_bool() {
|
|
let mut dataframe_writer = ColumnarWriter::default();
|
|
dataframe_writer.record_bool(1u32, "bool.value", false);
|
|
dataframe_writer.record_bool(3u32, "bool.value", true);
|
|
let mut buffer: Vec<u8> = Vec::new();
|
|
dataframe_writer.serialize(5, &mut buffer).unwrap();
|
|
let columnar = ColumnarReader::open(buffer).unwrap();
|
|
assert_eq!(columnar.num_columns(), 1);
|
|
let cols: Vec<DynamicColumnHandle> = columnar.read_columns("bool.value").unwrap();
|
|
assert_eq!(cols.len(), 1);
|
|
assert_eq!(cols[0].num_bytes(), 22);
|
|
assert_eq!(cols[0].column_type(), ColumnType::Bool);
|
|
let dyn_bool_col = cols[0].open().unwrap();
|
|
let DynamicColumn::Bool(bool_col) = dyn_bool_col else {
|
|
panic!();
|
|
};
|
|
let vals: Vec<Option<bool>> = (0..5).map(|row_id| bool_col.first(row_id)).collect();
|
|
assert_eq!(&vals, &[None, Some(false), None, Some(true), None,]);
|
|
}
|
|
|
|
#[test]
|
|
fn test_dataframe_writer_u64_multivalued() {
|
|
let mut dataframe_writer = ColumnarWriter::default();
|
|
dataframe_writer.record_numerical(2u32, "divisor", 2u64);
|
|
dataframe_writer.record_numerical(3u32, "divisor", 3u64);
|
|
dataframe_writer.record_numerical(4u32, "divisor", 2u64);
|
|
dataframe_writer.record_numerical(5u32, "divisor", 5u64);
|
|
dataframe_writer.record_numerical(6u32, "divisor", 2u64);
|
|
dataframe_writer.record_numerical(6u32, "divisor", 3u64);
|
|
let mut buffer: Vec<u8> = Vec::new();
|
|
dataframe_writer.serialize(7, &mut buffer).unwrap();
|
|
let columnar = ColumnarReader::open(buffer).unwrap();
|
|
assert_eq!(columnar.num_columns(), 1);
|
|
let cols: Vec<DynamicColumnHandle> = columnar.read_columns("divisor").unwrap();
|
|
assert_eq!(cols.len(), 1);
|
|
assert_eq!(cols[0].num_bytes(), 50);
|
|
let dyn_i64_col = cols[0].open().unwrap();
|
|
let DynamicColumn::I64(divisor_col) = dyn_i64_col else {
|
|
panic!();
|
|
};
|
|
assert_eq!(
|
|
divisor_col.get_cardinality(),
|
|
crate::Cardinality::Multivalued
|
|
);
|
|
assert_eq!(divisor_col.num_docs(), 7);
|
|
}
|
|
|
|
#[test]
|
|
fn test_dataframe_writer_ip_addr() {
|
|
let mut dataframe_writer = ColumnarWriter::default();
|
|
dataframe_writer.record_ip_addr(1, "ip_addr", Ipv6Addr::from_u128(1001));
|
|
dataframe_writer.record_ip_addr(3, "ip_addr", Ipv6Addr::from_u128(1050));
|
|
let mut buffer: Vec<u8> = Vec::new();
|
|
dataframe_writer.serialize(5, &mut buffer).unwrap();
|
|
let columnar = ColumnarReader::open(buffer).unwrap();
|
|
assert_eq!(columnar.num_columns(), 1);
|
|
let cols: Vec<DynamicColumnHandle> = columnar.read_columns("ip_addr").unwrap();
|
|
assert_eq!(cols.len(), 1);
|
|
assert_eq!(cols[0].num_bytes(), 42);
|
|
assert_eq!(cols[0].column_type(), ColumnType::IpAddr);
|
|
let dyn_bool_col = cols[0].open().unwrap();
|
|
let DynamicColumn::IpAddr(ip_col) = dyn_bool_col else {
|
|
panic!();
|
|
};
|
|
let vals: Vec<Option<Ipv6Addr>> = (0..5).map(|row_id| ip_col.first(row_id)).collect();
|
|
assert_eq!(
|
|
&vals,
|
|
&[
|
|
None,
|
|
Some(Ipv6Addr::from_u128(1001)),
|
|
None,
|
|
Some(Ipv6Addr::from_u128(1050)),
|
|
None,
|
|
]
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn test_dataframe_writer_numerical() {
|
|
let mut dataframe_writer = ColumnarWriter::default();
|
|
dataframe_writer.record_numerical(1u32, "srical.value", NumericalValue::U64(12u64));
|
|
dataframe_writer.record_numerical(2u32, "srical.value", NumericalValue::U64(13u64));
|
|
dataframe_writer.record_numerical(4u32, "srical.value", NumericalValue::U64(15u64));
|
|
let mut buffer: Vec<u8> = Vec::new();
|
|
dataframe_writer.serialize(6, &mut buffer).unwrap();
|
|
let columnar = ColumnarReader::open(buffer).unwrap();
|
|
assert_eq!(columnar.num_columns(), 1);
|
|
let cols: Vec<DynamicColumnHandle> = columnar.read_columns("srical.value").unwrap();
|
|
assert_eq!(cols.len(), 1);
|
|
// Right now this 31 bytes are spent as follows
|
|
//
|
|
// - header 14 bytes
|
|
// - vals 8 //< due to padding? could have been 1byte?.
|
|
// - null footer 6 bytes
|
|
assert_eq!(cols[0].num_bytes(), 33);
|
|
let column = cols[0].open().unwrap();
|
|
let DynamicColumn::I64(column_i64) = column else {
|
|
panic!();
|
|
};
|
|
assert_eq!(column_i64.index.get_cardinality(), Cardinality::Optional);
|
|
assert_eq!(column_i64.first(0), None);
|
|
assert_eq!(column_i64.first(1), Some(12i64));
|
|
assert_eq!(column_i64.first(2), Some(13i64));
|
|
assert_eq!(column_i64.first(3), None);
|
|
assert_eq!(column_i64.first(4), Some(15i64));
|
|
assert_eq!(column_i64.first(5), None);
|
|
assert_eq!(column_i64.first(6), None); //< we can change the spec for that one.
|
|
}
|
|
|
|
#[test]
|
|
fn test_dictionary_encoded_str() {
|
|
let mut buffer = Vec::new();
|
|
let mut columnar_writer = ColumnarWriter::default();
|
|
columnar_writer.record_str(1, "my.column", "a");
|
|
columnar_writer.record_str(3, "my.column", "c");
|
|
columnar_writer.record_str(3, "my.column2", "different_column!");
|
|
columnar_writer.record_str(4, "my.column", "b");
|
|
columnar_writer.serialize(5, &mut buffer).unwrap();
|
|
let columnar_reader = ColumnarReader::open(buffer).unwrap();
|
|
assert_eq!(columnar_reader.num_columns(), 2);
|
|
let col_handles = columnar_reader.read_columns("my.column").unwrap();
|
|
assert_eq!(col_handles.len(), 1);
|
|
let DynamicColumn::Str(str_col) = col_handles[0].open().unwrap() else {
|
|
panic!();
|
|
};
|
|
let index: Vec<Option<u64>> = (0..5).map(|row_id| str_col.ords().first(row_id)).collect();
|
|
assert_eq!(index, &[None, Some(0), None, Some(2), Some(1)]);
|
|
assert_eq!(str_col.num_rows(), 5);
|
|
let mut term_buffer = String::new();
|
|
let term_ords = str_col.ords();
|
|
assert_eq!(term_ords.first(0), None);
|
|
assert_eq!(term_ords.first(1), Some(0));
|
|
str_col.ord_to_str(0u64, &mut term_buffer).unwrap();
|
|
assert_eq!(term_buffer, "a");
|
|
assert_eq!(term_ords.first(2), None);
|
|
assert_eq!(term_ords.first(3), Some(2));
|
|
str_col.ord_to_str(2u64, &mut term_buffer).unwrap();
|
|
assert_eq!(term_buffer, "c");
|
|
assert_eq!(term_ords.first(4), Some(1));
|
|
str_col.ord_to_str(1u64, &mut term_buffer).unwrap();
|
|
assert_eq!(term_buffer, "b");
|
|
}
|
|
|
|
#[test]
|
|
fn test_dictionary_encoded_bytes() {
|
|
let mut buffer = Vec::new();
|
|
let mut columnar_writer = ColumnarWriter::default();
|
|
columnar_writer.record_bytes(1, "my.column", b"a");
|
|
columnar_writer.record_bytes(3, "my.column", b"c");
|
|
columnar_writer.record_bytes(3, "my.column2", b"different_column!");
|
|
columnar_writer.record_bytes(4, "my.column", b"b");
|
|
columnar_writer.serialize(5, &mut buffer).unwrap();
|
|
let columnar_reader = ColumnarReader::open(buffer).unwrap();
|
|
assert_eq!(columnar_reader.num_columns(), 2);
|
|
let col_handles = columnar_reader.read_columns("my.column").unwrap();
|
|
assert_eq!(col_handles.len(), 1);
|
|
let DynamicColumn::Bytes(bytes_col) = col_handles[0].open().unwrap() else {
|
|
panic!();
|
|
};
|
|
let index: Vec<Option<u64>> = (0..5)
|
|
.map(|row_id| bytes_col.ords().first(row_id))
|
|
.collect();
|
|
assert_eq!(index, &[None, Some(0), None, Some(2), Some(1)]);
|
|
assert_eq!(bytes_col.num_rows(), 5);
|
|
let mut term_buffer = Vec::new();
|
|
let term_ords = bytes_col.ords();
|
|
assert_eq!(term_ords.first(0), None);
|
|
assert_eq!(term_ords.first(1), Some(0));
|
|
bytes_col
|
|
.dictionary
|
|
.ord_to_term(0u64, &mut term_buffer)
|
|
.unwrap();
|
|
assert_eq!(term_buffer, b"a");
|
|
assert_eq!(term_ords.first(2), None);
|
|
assert_eq!(term_ords.first(3), Some(2));
|
|
bytes_col
|
|
.dictionary
|
|
.ord_to_term(2u64, &mut term_buffer)
|
|
.unwrap();
|
|
assert_eq!(term_buffer, b"c");
|
|
assert_eq!(term_ords.first(4), Some(1));
|
|
bytes_col
|
|
.dictionary
|
|
.ord_to_term(1u64, &mut term_buffer)
|
|
.unwrap();
|
|
assert_eq!(term_buffer, b"b");
|
|
}
|
|
|
|
fn num_strategy() -> impl Strategy<Value = NumericalValue> {
|
|
prop_oneof![
|
|
3 => Just(NumericalValue::U64(0u64)),
|
|
3 => Just(NumericalValue::U64(u64::MAX)),
|
|
3 => Just(NumericalValue::I64(0i64)),
|
|
3 => Just(NumericalValue::I64(i64::MIN)),
|
|
3 => Just(NumericalValue::I64(i64::MAX)),
|
|
3 => Just(NumericalValue::F64(1.2f64)),
|
|
1 => any::<f64>().prop_map(NumericalValue::from),
|
|
1 => any::<u64>().prop_map(NumericalValue::from),
|
|
1 => any::<i64>().prop_map(NumericalValue::from),
|
|
]
|
|
}
|
|
|
|
#[derive(Debug, Clone, Copy)]
|
|
enum ColumnValue {
|
|
Str(&'static str),
|
|
Bytes(&'static [u8]),
|
|
Numerical(NumericalValue),
|
|
IpAddr(Ipv6Addr),
|
|
Bool(bool),
|
|
DateTime(DateTime),
|
|
}
|
|
|
|
impl<T: Into<NumericalValue>> From<T> for ColumnValue {
|
|
fn from(val: T) -> ColumnValue {
|
|
ColumnValue::Numerical(val.into())
|
|
}
|
|
}
|
|
|
|
impl ColumnValue {
|
|
pub(crate) fn column_type_category(&self) -> ColumnTypeCategory {
|
|
match self {
|
|
ColumnValue::Str(_) => ColumnTypeCategory::Str,
|
|
ColumnValue::Bytes(_) => ColumnTypeCategory::Bytes,
|
|
ColumnValue::Numerical(_) => ColumnTypeCategory::Numerical,
|
|
ColumnValue::IpAddr(_) => ColumnTypeCategory::IpAddr,
|
|
ColumnValue::Bool(_) => ColumnTypeCategory::Bool,
|
|
ColumnValue::DateTime(_) => ColumnTypeCategory::DateTime,
|
|
}
|
|
}
|
|
}
|
|
|
|
fn column_name_strategy() -> impl Strategy<Value = &'static str> {
|
|
prop_oneof![Just("c1"), Just("c2")]
|
|
}
|
|
|
|
fn string_strategy() -> impl Strategy<Value = &'static str> {
|
|
prop_oneof![Just("a"), Just("b")]
|
|
}
|
|
|
|
fn bytes_strategy() -> impl Strategy<Value = &'static [u8]> {
|
|
prop_oneof![Just(&[0u8][..]), Just(&[1u8][..])]
|
|
}
|
|
|
|
// A random column value
|
|
fn column_value_strategy() -> impl Strategy<Value = ColumnValue> {
|
|
prop_oneof![
|
|
10 => string_strategy().prop_map(ColumnValue::Str),
|
|
1 => bytes_strategy().prop_map(ColumnValue::Bytes),
|
|
40 => num_strategy().prop_map(ColumnValue::Numerical),
|
|
1 => (1u16..3u16).prop_map(|ip_addr_byte| ColumnValue::IpAddr(Ipv6Addr::new(
|
|
127,
|
|
0,
|
|
0,
|
|
0,
|
|
0,
|
|
0,
|
|
0,
|
|
ip_addr_byte
|
|
))),
|
|
1 => any::<bool>().prop_map(ColumnValue::Bool),
|
|
1 => (679_723_993i64..1_679_723_995i64)
|
|
.prop_map(|val| { ColumnValue::DateTime(DateTime::from_timestamp_secs(val)) })
|
|
]
|
|
}
|
|
|
|
// A document contains up to 4 values.
|
|
fn doc_strategy() -> impl Strategy<Value = Vec<(&'static str, ColumnValue)>> {
|
|
proptest::collection::vec((column_name_strategy(), column_value_strategy()), 0..=4)
|
|
}
|
|
|
|
fn num_docs_strategy() -> impl Strategy<Value = usize> {
|
|
prop_oneof!(
|
|
// We focus heavily on the 0..2 case as we assume it is sufficient to cover all edge cases.
|
|
0usize..=3usize,
|
|
// We leave 50% of the effort exploring more defensively.
|
|
3usize..=12usize
|
|
)
|
|
}
|
|
|
|
// A columnar contains up to 2 docs.
|
|
fn columnar_docs_strategy() -> impl Strategy<Value = Vec<Vec<(&'static str, ColumnValue)>>> {
|
|
num_docs_strategy()
|
|
.prop_flat_map(|num_docs| proptest::collection::vec(doc_strategy(), num_docs))
|
|
}
|
|
|
|
fn permutation_and_subset_strategy(n: usize) -> impl Strategy<Value = Vec<usize>> {
|
|
let vals: Vec<usize> = (0..n).collect();
|
|
subsequence(vals, 0..=n).prop_shuffle()
|
|
}
|
|
|
|
fn build_columnar_with_mapping(docs: &[Vec<(&'static str, ColumnValue)>]) -> ColumnarReader {
|
|
let num_docs = docs.len() as u32;
|
|
let mut buffer = Vec::new();
|
|
let mut columnar_writer = ColumnarWriter::default();
|
|
for (doc_id, vals) in docs.iter().enumerate() {
|
|
for (column_name, col_val) in vals {
|
|
match *col_val {
|
|
ColumnValue::Str(str_val) => {
|
|
columnar_writer.record_str(doc_id as u32, column_name, str_val);
|
|
}
|
|
ColumnValue::Bytes(bytes) => {
|
|
columnar_writer.record_bytes(doc_id as u32, column_name, bytes)
|
|
}
|
|
ColumnValue::Numerical(num) => {
|
|
columnar_writer.record_numerical(doc_id as u32, column_name, num);
|
|
}
|
|
ColumnValue::IpAddr(ip_addr) => {
|
|
columnar_writer.record_ip_addr(doc_id as u32, column_name, ip_addr);
|
|
}
|
|
ColumnValue::Bool(bool_val) => {
|
|
columnar_writer.record_bool(doc_id as u32, column_name, bool_val);
|
|
}
|
|
ColumnValue::DateTime(date_time) => {
|
|
columnar_writer.record_datetime(doc_id as u32, column_name, date_time);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
columnar_writer.serialize(num_docs, &mut buffer).unwrap();
|
|
|
|
ColumnarReader::open(buffer).unwrap()
|
|
}
|
|
|
|
fn build_columnar(docs: &[Vec<(&'static str, ColumnValue)>]) -> ColumnarReader {
|
|
build_columnar_with_mapping(docs)
|
|
}
|
|
|
|
fn assert_columnar_eq_strict(left: &ColumnarReader, right: &ColumnarReader) {
|
|
assert_columnar_eq(left, right, false);
|
|
}
|
|
|
|
fn assert_columnar_eq(
|
|
left: &ColumnarReader,
|
|
right: &ColumnarReader,
|
|
lenient_on_numerical_value: bool,
|
|
) {
|
|
assert_eq!(left.num_rows(), right.num_rows());
|
|
let left_columns = left.list_columns().unwrap();
|
|
let right_columns = right.list_columns().unwrap();
|
|
assert_eq!(left_columns.len(), right_columns.len());
|
|
for i in 0..left_columns.len() {
|
|
assert_eq!(left_columns[i].0, right_columns[i].0);
|
|
let left_column = left_columns[i].1.open().unwrap();
|
|
let right_column = right_columns[i].1.open().unwrap();
|
|
assert_dyn_column_eq(&left_column, &right_column, lenient_on_numerical_value);
|
|
}
|
|
}
|
|
|
|
#[track_caller]
|
|
fn assert_column_eq<T: Copy + PartialOrd + Debug + Send + Sync + 'static>(
|
|
left: &Column<T>,
|
|
right: &Column<T>,
|
|
) {
|
|
assert_eq!(left.get_cardinality(), right.get_cardinality());
|
|
assert_eq!(left.num_docs(), right.num_docs());
|
|
let num_docs = left.num_docs();
|
|
for doc in 0..num_docs {
|
|
assert_eq!(
|
|
left.index.value_row_ids(doc),
|
|
right.index.value_row_ids(doc)
|
|
);
|
|
}
|
|
assert_eq!(left.values.num_vals(), right.values.num_vals());
|
|
let num_vals = left.values.num_vals();
|
|
for i in 0..num_vals {
|
|
assert_eq!(left.values.get_val(i), right.values.get_val(i));
|
|
}
|
|
}
|
|
|
|
fn assert_bytes_column_eq(left: &BytesColumn, right: &BytesColumn) {
|
|
assert_eq!(
|
|
left.term_ord_column.get_cardinality(),
|
|
right.term_ord_column.get_cardinality()
|
|
);
|
|
assert_eq!(left.num_rows(), right.num_rows());
|
|
assert_column_eq(&left.term_ord_column, &right.term_ord_column);
|
|
assert_eq!(left.dictionary.num_terms(), right.dictionary.num_terms());
|
|
let num_terms = left.dictionary.num_terms();
|
|
let mut left_terms = left.dictionary.stream().unwrap();
|
|
let mut right_terms = right.dictionary.stream().unwrap();
|
|
for _ in 0..num_terms {
|
|
assert!(left_terms.advance());
|
|
assert!(right_terms.advance());
|
|
assert_eq!(left_terms.key(), right_terms.key());
|
|
}
|
|
assert!(!left_terms.advance());
|
|
assert!(!right_terms.advance());
|
|
}
|
|
|
|
fn assert_dyn_column_eq(
|
|
left_dyn_column: &DynamicColumn,
|
|
right_dyn_column: &DynamicColumn,
|
|
lenient_on_numerical_value: bool,
|
|
) {
|
|
assert_eq!(
|
|
&left_dyn_column.get_cardinality(),
|
|
&right_dyn_column.get_cardinality()
|
|
);
|
|
match &(left_dyn_column, right_dyn_column) {
|
|
(DynamicColumn::Bool(left_col), DynamicColumn::Bool(right_col)) => {
|
|
assert_column_eq(left_col, right_col);
|
|
}
|
|
(DynamicColumn::I64(left_col), DynamicColumn::I64(right_col)) => {
|
|
assert_column_eq(left_col, right_col);
|
|
}
|
|
(DynamicColumn::U64(left_col), DynamicColumn::U64(right_col)) => {
|
|
assert_column_eq(left_col, right_col);
|
|
}
|
|
(DynamicColumn::F64(left_col), DynamicColumn::F64(right_col)) => {
|
|
assert_column_eq(left_col, right_col);
|
|
}
|
|
(DynamicColumn::DateTime(left_col), DynamicColumn::DateTime(right_col)) => {
|
|
assert_column_eq(left_col, right_col);
|
|
}
|
|
(DynamicColumn::IpAddr(left_col), DynamicColumn::IpAddr(right_col)) => {
|
|
assert_column_eq(left_col, right_col);
|
|
}
|
|
(DynamicColumn::Bytes(left_col), DynamicColumn::Bytes(right_col)) => {
|
|
assert_bytes_column_eq(left_col, right_col);
|
|
}
|
|
(DynamicColumn::Str(left_col), DynamicColumn::Str(right_col)) => {
|
|
assert_bytes_column_eq(left_col, right_col);
|
|
}
|
|
(left, right) => {
|
|
if lenient_on_numerical_value {
|
|
assert_eq!(
|
|
ColumnTypeCategory::from(left.column_type()),
|
|
ColumnTypeCategory::from(right.column_type())
|
|
);
|
|
} else {
|
|
panic!(
|
|
"Column type are not the same: {:?} vs {:?}",
|
|
left.column_type(),
|
|
right.column_type()
|
|
);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
trait AssertEqualToColumnValue {
|
|
fn assert_equal_to_column_value(&self, column_value: &ColumnValue);
|
|
}
|
|
|
|
impl AssertEqualToColumnValue for bool {
|
|
fn assert_equal_to_column_value(&self, column_value: &ColumnValue) {
|
|
let ColumnValue::Bool(val) = column_value else {
|
|
panic!()
|
|
};
|
|
assert_eq!(self, val);
|
|
}
|
|
}
|
|
|
|
impl AssertEqualToColumnValue for Ipv6Addr {
|
|
fn assert_equal_to_column_value(&self, column_value: &ColumnValue) {
|
|
let ColumnValue::IpAddr(val) = column_value else {
|
|
panic!()
|
|
};
|
|
assert_eq!(self, val);
|
|
}
|
|
}
|
|
|
|
impl<T: Coerce + PartialEq + Debug + Into<NumericalValue>> AssertEqualToColumnValue for T {
|
|
fn assert_equal_to_column_value(&self, column_value: &ColumnValue) {
|
|
let ColumnValue::Numerical(num) = column_value else {
|
|
panic!()
|
|
};
|
|
assert_eq!(self, &T::coerce(*num));
|
|
}
|
|
}
|
|
|
|
impl AssertEqualToColumnValue for DateTime {
|
|
fn assert_equal_to_column_value(&self, column_value: &ColumnValue) {
|
|
let ColumnValue::DateTime(dt) = column_value else {
|
|
panic!()
|
|
};
|
|
assert_eq!(self, dt);
|
|
}
|
|
}
|
|
|
|
fn assert_column_values<
|
|
T: AssertEqualToColumnValue + PartialEq + Copy + PartialOrd + Debug + Send + Sync + 'static,
|
|
>(
|
|
col: &Column<T>,
|
|
expected: &HashMap<u32, Vec<&ColumnValue>>,
|
|
) {
|
|
let mut num_non_empty_rows = 0;
|
|
for doc in 0..col.num_docs() {
|
|
let doc_vals: Vec<T> = col.values_for_doc(doc).collect();
|
|
if doc_vals.is_empty() {
|
|
continue;
|
|
}
|
|
num_non_empty_rows += 1;
|
|
let expected_vals = expected.get(&doc).unwrap();
|
|
assert_eq!(doc_vals.len(), expected_vals.len());
|
|
for (val, &expected) in doc_vals.iter().zip(expected_vals.iter()) {
|
|
val.assert_equal_to_column_value(expected)
|
|
}
|
|
}
|
|
assert_eq!(num_non_empty_rows, expected.len());
|
|
}
|
|
|
|
fn assert_bytes_column_values(
|
|
col: &BytesColumn,
|
|
expected: &HashMap<u32, Vec<&ColumnValue>>,
|
|
is_str: bool,
|
|
) {
|
|
let mut num_non_empty_rows = 0;
|
|
let mut buffer = Vec::new();
|
|
for doc in 0..col.term_ord_column.num_docs() {
|
|
let doc_vals: Vec<u64> = col.term_ords(doc).collect();
|
|
if doc_vals.is_empty() {
|
|
continue;
|
|
}
|
|
let expected_vals = expected.get(&doc).unwrap();
|
|
assert_eq!(doc_vals.len(), expected_vals.len());
|
|
for (&expected_col_val, &ord) in expected_vals.iter().zip(&doc_vals) {
|
|
col.ord_to_bytes(ord, &mut buffer).unwrap();
|
|
match expected_col_val {
|
|
ColumnValue::Str(str_val) => {
|
|
assert!(is_str);
|
|
assert_eq!(str_val.as_bytes(), &buffer);
|
|
}
|
|
ColumnValue::Bytes(bytes_val) => {
|
|
assert!(!is_str);
|
|
assert_eq!(bytes_val, &buffer);
|
|
}
|
|
_ => {
|
|
panic!();
|
|
}
|
|
}
|
|
}
|
|
num_non_empty_rows += 1;
|
|
}
|
|
assert_eq!(num_non_empty_rows, expected.len());
|
|
}
|
|
|
|
// This proptest attempts to create a tiny columnar based of up to 3 rows, and checks that the
|
|
// resulting columnar matches the row data.
|
|
proptest! {
|
|
#![proptest_config(ProptestConfig::with_cases(500))]
|
|
#[test]
|
|
fn test_single_columnar_builder_proptest(docs in columnar_docs_strategy()) {
|
|
let columnar = build_columnar(&docs[..]);
|
|
assert_eq!(columnar.num_rows() as usize, docs.len());
|
|
let mut expected_columns: HashMap<(&str, ColumnTypeCategory), HashMap<u32, Vec<&ColumnValue>> > = Default::default();
|
|
for (doc_id, doc_vals) in docs.iter().enumerate() {
|
|
for (col_name, col_val) in doc_vals {
|
|
expected_columns
|
|
.entry((col_name, col_val.column_type_category()))
|
|
.or_default()
|
|
.entry(doc_id as u32)
|
|
.or_default()
|
|
.push(col_val);
|
|
}
|
|
}
|
|
let column_list = columnar.list_columns().unwrap();
|
|
assert_eq!(expected_columns.len(), column_list.len());
|
|
for (column_name, column) in column_list {
|
|
let dynamic_column = column.open().unwrap();
|
|
let col_category: ColumnTypeCategory = dynamic_column.column_type().into();
|
|
let expected_col_values: &HashMap<u32, Vec<&ColumnValue>> = expected_columns.get(&(column_name.as_str(), col_category)).unwrap();
|
|
match &dynamic_column {
|
|
DynamicColumn::Bool(col) =>
|
|
assert_column_values(col, expected_col_values),
|
|
DynamicColumn::I64(col) =>
|
|
assert_column_values(col, expected_col_values),
|
|
DynamicColumn::U64(col) =>
|
|
assert_column_values(col, expected_col_values),
|
|
DynamicColumn::F64(col) =>
|
|
assert_column_values(col, expected_col_values),
|
|
DynamicColumn::IpAddr(col) =>
|
|
assert_column_values(col, expected_col_values),
|
|
DynamicColumn::DateTime(col) =>
|
|
assert_column_values(col, expected_col_values),
|
|
DynamicColumn::Bytes(col) =>
|
|
assert_bytes_column_values(col, expected_col_values, false),
|
|
DynamicColumn::Str(col) =>
|
|
assert_bytes_column_values(col, expected_col_values, true),
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// This tests create 2 or 3 random small columnar and attempts to merge them.
|
|
// It compares the resulting merged dataframe with what would have been obtained by building the
|
|
// dataframe from the concatenated rows to begin with.
|
|
proptest! {
|
|
#![proptest_config(ProptestConfig::with_cases(1000))]
|
|
#[test]
|
|
fn test_columnar_merge_proptest(columnar_docs in proptest::collection::vec(columnar_docs_strategy(), 2..=3)) {
|
|
let columnar_readers: Vec<ColumnarReader> = columnar_docs.iter()
|
|
.map(|docs| build_columnar(&docs[..]))
|
|
.collect::<Vec<_>>();
|
|
let columnar_readers_arr: Vec<&ColumnarReader> = columnar_readers.iter().collect();
|
|
let mut output: Vec<u8> = Vec::new();
|
|
let stack_merge_order = StackMergeOrder::stack(&columnar_readers_arr[..]).into();
|
|
crate::merge_columnar(&columnar_readers_arr[..], &[], stack_merge_order, &mut output).unwrap();
|
|
let merged_columnar = ColumnarReader::open(output).unwrap();
|
|
let concat_rows: Vec<Vec<(&'static str, ColumnValue)>> = columnar_docs.iter().flatten().cloned().collect();
|
|
let expected_merged_columnar = build_columnar(&concat_rows[..]);
|
|
assert_columnar_eq_strict(&merged_columnar, &expected_merged_columnar);
|
|
}
|
|
}
|
|
|
|
#[test]
|
|
fn test_columnar_merging_empty_columnar() {
|
|
let columnar_docs: Vec<Vec<Vec<(&str, ColumnValue)>>> =
|
|
vec![vec![], vec![vec![("c1", ColumnValue::Str("a"))]]];
|
|
let columnar_readers: Vec<ColumnarReader> = columnar_docs
|
|
.iter()
|
|
.map(|docs| build_columnar(&docs[..]))
|
|
.collect::<Vec<_>>();
|
|
let columnar_readers_arr: Vec<&ColumnarReader> = columnar_readers.iter().collect();
|
|
let mut output: Vec<u8> = Vec::new();
|
|
let stack_merge_order = StackMergeOrder::stack(&columnar_readers_arr[..]);
|
|
crate::merge_columnar(
|
|
&columnar_readers_arr[..],
|
|
&[],
|
|
crate::MergeRowOrder::Stack(stack_merge_order),
|
|
&mut output,
|
|
)
|
|
.unwrap();
|
|
let merged_columnar = ColumnarReader::open(output).unwrap();
|
|
let concat_rows: Vec<Vec<(&'static str, ColumnValue)>> =
|
|
columnar_docs.iter().flatten().cloned().collect();
|
|
let expected_merged_columnar = build_columnar(&concat_rows[..]);
|
|
assert_columnar_eq_strict(&merged_columnar, &expected_merged_columnar);
|
|
}
|
|
|
|
#[test]
|
|
fn test_columnar_merging_number_columns() {
|
|
let columnar_docs: Vec<Vec<Vec<(&str, ColumnValue)>>> = vec![
|
|
// columnar 1
|
|
vec![
|
|
// doc 1.1
|
|
vec![("c2", ColumnValue::Numerical(0i64.into()))],
|
|
],
|
|
// columnar2
|
|
vec![
|
|
// doc 2.1
|
|
vec![("c2", ColumnValue::Numerical(0u64.into()))],
|
|
// doc 2.2
|
|
vec![("c2", ColumnValue::Numerical(u64::MAX.into()))],
|
|
],
|
|
];
|
|
let columnar_readers: Vec<ColumnarReader> = columnar_docs
|
|
.iter()
|
|
.map(|docs| build_columnar(&docs[..]))
|
|
.collect::<Vec<_>>();
|
|
let columnar_readers_arr: Vec<&ColumnarReader> = columnar_readers.iter().collect();
|
|
let mut output: Vec<u8> = Vec::new();
|
|
let stack_merge_order = StackMergeOrder::stack(&columnar_readers_arr[..]);
|
|
crate::merge_columnar(
|
|
&columnar_readers_arr[..],
|
|
&[],
|
|
crate::MergeRowOrder::Stack(stack_merge_order),
|
|
&mut output,
|
|
)
|
|
.unwrap();
|
|
let merged_columnar = ColumnarReader::open(output).unwrap();
|
|
let concat_rows: Vec<Vec<(&'static str, ColumnValue)>> =
|
|
columnar_docs.iter().flatten().cloned().collect();
|
|
let expected_merged_columnar = build_columnar(&concat_rows[..]);
|
|
assert_columnar_eq_strict(&merged_columnar, &expected_merged_columnar);
|
|
}
|
|
|
|
// TODO add non trivial remap and merge
|
|
// TODO test required_columns
|
|
// TODO document edge case: required_columns incompatible with values.
|
|
|
|
fn columnar_docs_and_remap(
|
|
) -> impl Strategy<Value = (Vec<Vec<Vec<(&'static str, ColumnValue)>>>, Vec<RowAddr>)> {
|
|
proptest::collection::vec(columnar_docs_strategy(), 2..=3).prop_flat_map(
|
|
|columnars_docs: Vec<Vec<Vec<(&str, ColumnValue)>>>| {
|
|
let row_addrs: Vec<RowAddr> = columnars_docs
|
|
.iter()
|
|
.enumerate()
|
|
.flat_map(|(segment_ord, columnar_docs)| {
|
|
(0u32..columnar_docs.len() as u32).map(move |row_id| RowAddr {
|
|
segment_ord: segment_ord as u32,
|
|
row_id,
|
|
})
|
|
})
|
|
.collect();
|
|
permutation_and_subset_strategy(row_addrs.len()).prop_map(move |shuffled_subset| {
|
|
let shuffled_row_addr_subset: Vec<RowAddr> =
|
|
shuffled_subset.iter().map(|ord| row_addrs[*ord]).collect();
|
|
(columnars_docs.clone(), shuffled_row_addr_subset)
|
|
})
|
|
},
|
|
)
|
|
}
|
|
|
|
proptest! {
|
|
#![proptest_config(ProptestConfig::with_cases(1000))]
|
|
#[test]
|
|
fn test_columnar_merge_and_remap_proptest((columnar_docs, shuffle_merge_order) in
|
|
columnar_docs_and_remap()) {
|
|
test_columnar_merge_and_remap(columnar_docs, shuffle_merge_order);
|
|
}
|
|
}
|
|
|
|
fn test_columnar_merge_and_remap(
|
|
columnar_docs: Vec<Vec<Vec<(&'static str, ColumnValue)>>>,
|
|
shuffle_merge_order: Vec<RowAddr>,
|
|
) {
|
|
let shuffled_rows: Vec<Vec<(&'static str, ColumnValue)>> = shuffle_merge_order
|
|
.iter()
|
|
.map(|row_addr| {
|
|
columnar_docs[row_addr.segment_ord as usize][row_addr.row_id as usize].clone()
|
|
})
|
|
.collect();
|
|
let expected_merged_columnar = build_columnar(&shuffled_rows[..]);
|
|
let columnar_readers: Vec<ColumnarReader> = columnar_docs
|
|
.iter()
|
|
.map(|docs| build_columnar(&docs[..]))
|
|
.collect::<Vec<_>>();
|
|
let columnar_readers_ref: Vec<&ColumnarReader> = columnar_readers.iter().collect();
|
|
let mut output: Vec<u8> = Vec::new();
|
|
let segment_num_rows: Vec<RowId> = columnar_docs
|
|
.iter()
|
|
.map(|docs| docs.len() as RowId)
|
|
.collect();
|
|
let shuffle_merge_order = ShuffleMergeOrder::for_test(&segment_num_rows, shuffle_merge_order);
|
|
crate::merge_columnar(
|
|
&columnar_readers_ref[..],
|
|
&[],
|
|
shuffle_merge_order.into(),
|
|
&mut output,
|
|
)
|
|
.unwrap();
|
|
let merged_columnar = ColumnarReader::open(output).unwrap();
|
|
assert_columnar_eq(&merged_columnar, &expected_merged_columnar, true);
|
|
}
|
|
|
|
#[test]
|
|
fn test_columnar_merge_and_remap_bug_1() {
|
|
let columnar_docs = vec![vec![
|
|
vec![
|
|
("c1", ColumnValue::Numerical(NumericalValue::U64(0))),
|
|
("c1", ColumnValue::Numerical(NumericalValue::U64(0))),
|
|
],
|
|
vec![],
|
|
]];
|
|
let shuffle_merge_order: Vec<RowAddr> = vec![
|
|
RowAddr {
|
|
segment_ord: 0,
|
|
row_id: 1,
|
|
},
|
|
RowAddr {
|
|
segment_ord: 0,
|
|
row_id: 0,
|
|
},
|
|
];
|
|
|
|
test_columnar_merge_and_remap(columnar_docs, shuffle_merge_order);
|
|
}
|
|
|
|
#[test]
|
|
fn test_columnar_merge_empty() {
|
|
let columnar_reader_1 = build_columnar(&[]);
|
|
let rows: &[Vec<_>] = &[vec![("c1", ColumnValue::Str("a"))]][..];
|
|
let columnar_reader_2 = build_columnar(rows);
|
|
let mut output: Vec<u8> = Vec::new();
|
|
let segment_num_rows: Vec<RowId> = vec![0, 0];
|
|
let shuffle_merge_order = ShuffleMergeOrder::for_test(&segment_num_rows, vec![]);
|
|
crate::merge_columnar(
|
|
&[&columnar_reader_1, &columnar_reader_2],
|
|
&[],
|
|
shuffle_merge_order.into(),
|
|
&mut output,
|
|
)
|
|
.unwrap();
|
|
let merged_columnar = ColumnarReader::open(output).unwrap();
|
|
assert_eq!(merged_columnar.num_rows(), 0);
|
|
assert_eq!(merged_columnar.num_columns(), 0);
|
|
}
|
|
|
|
#[test]
|
|
fn test_columnar_merge_single_str_column() {
|
|
let columnar_reader_1 = build_columnar(&[]);
|
|
let rows: &[Vec<_>] = &[vec![("c1", ColumnValue::Str("a"))]][..];
|
|
let columnar_reader_2 = build_columnar(rows);
|
|
let mut output: Vec<u8> = Vec::new();
|
|
let segment_num_rows: Vec<RowId> = vec![0, 1];
|
|
let shuffle_merge_order = ShuffleMergeOrder::for_test(
|
|
&segment_num_rows,
|
|
vec![RowAddr {
|
|
segment_ord: 1u32,
|
|
row_id: 0u32,
|
|
}],
|
|
);
|
|
crate::merge_columnar(
|
|
&[&columnar_reader_1, &columnar_reader_2],
|
|
&[],
|
|
shuffle_merge_order.into(),
|
|
&mut output,
|
|
)
|
|
.unwrap();
|
|
let merged_columnar = ColumnarReader::open(output).unwrap();
|
|
assert_eq!(merged_columnar.num_rows(), 1);
|
|
assert_eq!(merged_columnar.num_columns(), 1);
|
|
}
|
|
|
|
#[test]
|
|
fn test_delete_decrease_cardinality() {
|
|
let columnar_reader_1 = build_columnar(&[]);
|
|
let rows: &[Vec<_>] = &[
|
|
vec![
|
|
("c", ColumnValue::from(0i64)),
|
|
("c", ColumnValue::from(0i64)),
|
|
],
|
|
vec![("c", ColumnValue::from(0i64))],
|
|
][..];
|
|
// c is multivalued here
|
|
let columnar_reader_2 = build_columnar(rows);
|
|
let mut output: Vec<u8> = Vec::new();
|
|
let shuffle_merge_order = ShuffleMergeOrder::for_test(
|
|
&[0, 2],
|
|
vec![RowAddr {
|
|
segment_ord: 1u32,
|
|
row_id: 1u32,
|
|
}],
|
|
);
|
|
crate::merge_columnar(
|
|
&[&columnar_reader_1, &columnar_reader_2],
|
|
&[],
|
|
shuffle_merge_order.into(),
|
|
&mut output,
|
|
)
|
|
.unwrap();
|
|
let merged_columnar = ColumnarReader::open(output).unwrap();
|
|
assert_eq!(merged_columnar.num_rows(), 1);
|
|
assert_eq!(merged_columnar.num_columns(), 1);
|
|
let cols = merged_columnar.read_columns("c").unwrap();
|
|
assert_eq!(cols.len(), 1);
|
|
assert_eq!(cols[0].column_type(), ColumnType::I64);
|
|
assert_eq!(cols[0].open().unwrap().get_cardinality(), Cardinality::Full);
|
|
}
|