Compare commits

...

1 Commits

Author SHA1 Message Date
Paul Masurel
a088de3079 proptests
Added shuffling on write
2023-03-26 14:30:17 +09:00
13 changed files with 604 additions and 22 deletions

View File

@@ -23,6 +23,7 @@ serde = "1.0.152"
proptest = "1"
more-asserts = "0.3.1"
rand = "0.8.5"
serde_json = "1"
[features]
unstable = []

View File

@@ -1,6 +1,6 @@
use std::io;
use std::ops::Deref;
use std::sync::Arc;
use std::{fmt, io};
use sstable::{Dictionary, VoidSSTable};
@@ -21,6 +21,14 @@ pub struct BytesColumn {
pub(crate) term_ord_column: Column<u64>,
}
impl fmt::Debug for BytesColumn {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("BytesColumn")
.field("term_ord_column", &self.term_ord_column)
.finish()
}
}
impl BytesColumn {
/// Fills the given `output` buffer with the term associated to the ordinal `ord`.
///
@@ -56,6 +64,12 @@ impl BytesColumn {
#[derive(Clone)]
pub struct StrColumn(BytesColumn);
impl fmt::Debug for StrColumn {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{:?}", self.term_ord_column)
}
}
impl From<StrColumn> for BytesColumn {
fn from(str_column: StrColumn) -> BytesColumn {
str_column.0

View File

@@ -1,6 +1,7 @@
mod dictionary_encoded;
mod serialize;
use core::fmt;
use std::fmt::Debug;
use std::io::Write;
use std::ops::{Deref, Range, RangeInclusive};
@@ -24,6 +25,16 @@ pub struct Column<T = u64> {
pub values: Arc<dyn ColumnValues<T>>,
}
impl<T: fmt::Debug + PartialOrd + Send + Sync + Copy + 'static> fmt::Debug for Column<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let num_docs = self.num_docs();
let entries = (0..num_docs)
.map(|i| (i, self.values_for_doc(i).collect::<Vec<_>>()))
.filter(|(_, vals)| !vals.is_empty());
f.debug_map().entries(entries).finish()
}
}
impl<T: PartialOrd + Default> Column<T> {
pub fn build_empty_column(num_docs: u32) -> Column<T> {
Column {

View File

@@ -12,7 +12,7 @@ pub use serialize::{open_column_index, serialize_column_index, SerializableColum
use crate::column_index::multivalued_index::MultiValueIndex;
use crate::{Cardinality, DocId, RowId};
#[derive(Clone)]
#[derive(Clone, Debug)]
pub enum ColumnIndex {
Empty {
num_docs: u32,
@@ -37,11 +37,15 @@ impl From<MultiValueIndex> for ColumnIndex {
}
impl ColumnIndex {
// Returns the cardinality of the column index.
//
// By convention, if the column contains no docs, we consider that it is
// full.
#[inline]
pub fn get_cardinality(&self) -> Cardinality {
match self {
ColumnIndex::Empty { num_docs: 0 } | ColumnIndex::Full => Cardinality::Full,
ColumnIndex::Empty { .. } => Cardinality::Optional,
ColumnIndex::Full => Cardinality::Full,
ColumnIndex::Optional(_) => Cardinality::Optional,
ColumnIndex::Multivalued(_) => Cardinality::Multivalued,
}
@@ -152,3 +156,21 @@ impl ColumnIndex {
}
}
}
#[cfg(test)]
mod tests {
use crate::{Cardinality, ColumnIndex};
#[test]
fn test_column_index_get_cardinality() {
assert_eq!(
ColumnIndex::Empty { num_docs: 0 }.get_cardinality(),
Cardinality::Full
);
assert_eq!(ColumnIndex::Full.get_cardinality(), Cardinality::Full);
assert_eq!(
ColumnIndex::Empty { num_docs: 1 }.get_cardinality(),
Cardinality::Optional
);
}
}

View File

@@ -35,6 +35,14 @@ pub struct MultiValueIndex {
pub start_index_column: Arc<dyn crate::ColumnValues<RowId>>,
}
impl std::fmt::Debug for MultiValueIndex {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("MultiValuedIndex")
.field("num_rows", &self.start_index_column.num_vals())
.finish_non_exhaustive()
}
}
impl From<Arc<dyn ColumnValues<RowId>>> for MultiValueIndex {
fn from(start_index_column: Arc<dyn ColumnValues<RowId>>) -> Self {
MultiValueIndex { start_index_column }

View File

@@ -88,6 +88,15 @@ pub struct OptionalIndex {
block_metas: Arc<[BlockMeta]>,
}
impl std::fmt::Debug for OptionalIndex {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("OptionalIndex")
.field("num_rows", &self.num_rows)
.field("num_non_null_rows", &self.num_non_null_rows)
.finish_non_exhaustive()
}
}
/// Splits a value address into lower and upper 16bits.
/// The lower 16 bits are the value in the block
/// The upper 16 bits are the block index

View File

@@ -1,3 +1,4 @@
use std::fmt;
use std::fmt::Debug;
use std::net::Ipv6Addr;
@@ -21,6 +22,22 @@ pub enum ColumnType {
DateTime = 7u8,
}
impl fmt::Display for ColumnType {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let short_str = match self {
ColumnType::I64 => "i64",
ColumnType::U64 => "u64",
ColumnType::F64 => "f64",
ColumnType::Bytes => "bytes",
ColumnType::Str => "str",
ColumnType::Bool => "bool",
ColumnType::IpAddr => "ip",
ColumnType::DateTime => "datetime",
};
write!(f, "{}", short_str)
}
}
// The order needs to match _exactly_ the order in the enum
const COLUMN_TYPES: [ColumnType; 8] = [
ColumnType::I64,

View File

@@ -28,7 +28,7 @@ use crate::{
///
/// See also [README.md].
#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
enum ColumnTypeCategory {
pub(crate) enum ColumnTypeCategory {
Bool,
Str,
Numerical,
@@ -78,7 +78,7 @@ pub fn merge_columnar(
output: &mut impl io::Write,
) -> io::Result<()> {
let mut serializer = ColumnarSerializer::new(output);
let num_rows_per_column = columnar_readers
let num_rows_per_columnar = columnar_readers
.iter()
.map(|reader| reader.num_rows())
.collect::<Vec<u32>>();
@@ -89,7 +89,7 @@ pub fn merge_columnar(
serializer.serialize_column(column_name.as_bytes(), column_type);
merge_column(
column_type,
&num_rows_per_column,
&num_rows_per_columnar,
columns,
&merge_row_order,
&mut column_serializer,

View File

@@ -5,6 +5,8 @@ mod reader;
mod writer;
pub use column_type::{ColumnType, HasAssociatedColumnType};
#[cfg(test)]
pub(crate) use merge::ColumnTypeCategory;
pub use merge::{merge_columnar, MergeRowOrder, ShuffleMergeOrder, StackMergeOrder};
pub use reader::ColumnarReader;
pub use writer::ColumnarWriter;

View File

@@ -1,4 +1,4 @@
use std::{io, mem};
use std::{fmt, io, mem};
use common::file_slice::FileSlice;
use common::BinarySerializable;
@@ -21,6 +21,32 @@ pub struct ColumnarReader {
num_rows: RowId,
}
impl fmt::Debug for ColumnarReader {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let num_rows = self.num_rows();
let columns = self.list_columns().unwrap();
let num_cols = columns.len();
let mut debug_struct = f.debug_struct("Columnar");
debug_struct
.field("num_rows", &num_rows)
.field("num_cols", &num_cols);
for (col_name, dynamic_column_handle) in columns.into_iter().take(5) {
let col = dynamic_column_handle.open().unwrap();
if col.num_values() > 10 {
debug_struct.field(&col_name, &"..");
} else {
debug_struct.field(&col_name, &col);
}
}
if num_cols > 5 {
debug_struct.finish_non_exhaustive()?;
} else {
debug_struct.finish()?;
}
Ok(())
}
}
/// Functions by both the async/sync code listing columns.
/// It takes a stream from the column sstable and return the list of
/// `DynamicColumn` available in it.

View File

@@ -1,6 +1,6 @@
use std::io;
use std::net::Ipv6Addr;
use std::sync::Arc;
use std::{fmt, io};
use common::file_slice::FileSlice;
use common::{ByteCount, DateTime, HasLen, OwnedBytes};
@@ -8,7 +8,7 @@ use common::{ByteCount, DateTime, HasLen, OwnedBytes};
use crate::column::{BytesColumn, Column, StrColumn};
use crate::column_values::{monotonic_map_column, StrictlyMonotonicFn};
use crate::columnar::ColumnType;
use crate::{Cardinality, NumericalType};
use crate::{Cardinality, ColumnIndex, NumericalType};
#[derive(Clone)]
pub enum DynamicColumn {
@@ -22,19 +22,54 @@ pub enum DynamicColumn {
Str(StrColumn),
}
impl DynamicColumn {
pub fn get_cardinality(&self) -> Cardinality {
impl fmt::Debug for DynamicColumn {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "[{} {} |", self.get_cardinality(), self.column_type())?;
match self {
DynamicColumn::Bool(c) => c.get_cardinality(),
DynamicColumn::I64(c) => c.get_cardinality(),
DynamicColumn::U64(c) => c.get_cardinality(),
DynamicColumn::F64(c) => c.get_cardinality(),
DynamicColumn::IpAddr(c) => c.get_cardinality(),
DynamicColumn::DateTime(c) => c.get_cardinality(),
DynamicColumn::Bytes(c) => c.ords().get_cardinality(),
DynamicColumn::Str(c) => c.ords().get_cardinality(),
DynamicColumn::Bool(col) => write!(f, " {:?}", col)?,
DynamicColumn::I64(col) => write!(f, " {:?}", col)?,
DynamicColumn::U64(col) => write!(f, " {:?}", col)?,
DynamicColumn::F64(col) => write!(f, "{:?}", col)?,
DynamicColumn::IpAddr(col) => write!(f, "{:?}", col)?,
DynamicColumn::DateTime(col) => write!(f, "{:?}", col)?,
DynamicColumn::Bytes(col) => write!(f, "{:?}", col)?,
DynamicColumn::Str(col) => write!(f, "{:?}", col)?,
}
write!(f, "]")
}
}
impl DynamicColumn {
pub fn column_index(&self) -> &ColumnIndex {
match self {
DynamicColumn::Bool(c) => &c.index,
DynamicColumn::I64(c) => &c.index,
DynamicColumn::U64(c) => &c.index,
DynamicColumn::F64(c) => &c.index,
DynamicColumn::IpAddr(c) => &c.index,
DynamicColumn::DateTime(c) => &c.index,
DynamicColumn::Bytes(c) => &c.ords().index,
DynamicColumn::Str(c) => &c.ords().index,
}
}
pub fn get_cardinality(&self) -> Cardinality {
self.column_index().get_cardinality()
}
pub fn num_values(&self) -> u32 {
match self {
DynamicColumn::Bool(c) => c.values.num_vals(),
DynamicColumn::I64(c) => c.values.num_vals(),
DynamicColumn::U64(c) => c.values.num_vals(),
DynamicColumn::F64(c) => c.values.num_vals(),
DynamicColumn::IpAddr(c) => c.values.num_vals(),
DynamicColumn::DateTime(c) => c.values.num_vals(),
DynamicColumn::Bytes(c) => c.ords().values.num_vals(),
DynamicColumn::Str(c) => c.ords().values.num_vals(),
}
}
pub fn column_type(&self) -> ColumnType {
match self {
DynamicColumn::Bool(_) => ColumnType::Bool,

View File

@@ -7,6 +7,7 @@ extern crate more_asserts;
#[cfg(all(test, feature = "unstable"))]
extern crate test;
use std::fmt::Display;
use std::io;
mod block_accessor;
@@ -75,6 +76,17 @@ pub enum Cardinality {
Multivalued = 2,
}
impl Display for Cardinality {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
let short_str = match self {
Cardinality::Full => "full",
Cardinality::Optional => "opt",
Cardinality::Multivalued => "mult",
};
write!(f, "{short_str}")
}
}
impl Cardinality {
pub fn is_optional(&self) -> bool {
matches!(self, Cardinality::Optional)
@@ -85,7 +97,6 @@ impl Cardinality {
pub(crate) fn to_code(self) -> u8 {
self as u8
}
pub(crate) fn try_from_code(code: u8) -> Result<Cardinality, InvalidData> {
match code {
0 => Ok(Cardinality::Full),

View File

@@ -1,10 +1,17 @@
use std::collections::HashMap;
use std::fmt::Debug;
use std::net::Ipv6Addr;
use common::DateTime;
use proptest::prelude::*;
use crate::column_values::MonotonicallyMappableToU128;
use crate::columnar::ColumnType;
use crate::dynamic_column::{DynamicColumn, DynamicColumnHandle};
use crate::value::NumericalValue;
use crate::{Cardinality, ColumnarReader, ColumnarWriter};
use crate::value::{Coerce, NumericalValue};
use crate::{
BytesColumn, Cardinality, Column, ColumnarReader, ColumnarWriter, RowId, StackMergeOrder,
};
#[test]
fn test_dataframe_writer_str() {
@@ -210,3 +217,422 @@ fn test_dictionary_encoded_bytes() {
.unwrap();
assert_eq!(term_buffer, b"b");
}
fn num_strategy() -> impl Strategy<Value = NumericalValue> {
prop_oneof![
Just(NumericalValue::U64(0u64)),
Just(NumericalValue::U64(u64::MAX)),
Just(NumericalValue::I64(0i64)),
Just(NumericalValue::I64(i64::MIN)),
Just(NumericalValue::I64(i64::MAX)),
Just(NumericalValue::F64(1.2f64)),
]
}
#[derive(Debug, Clone, Copy)]
enum ColumnValue {
Str(&'static str),
Bytes(&'static [u8]),
Numerical(NumericalValue),
IpAddr(Ipv6Addr),
Bool(bool),
DateTime(DateTime),
}
impl ColumnValue {
pub(crate) fn column_type_category(&self) -> ColumnTypeCategory {
match self {
ColumnValue::Str(_) => ColumnTypeCategory::Str,
ColumnValue::Bytes(_) => ColumnTypeCategory::Bytes,
ColumnValue::Numerical(numerical_val) => ColumnTypeCategory::Numerical,
ColumnValue::IpAddr(_) => ColumnTypeCategory::IpAddr,
ColumnValue::Bool(_) => ColumnTypeCategory::Bool,
ColumnValue::DateTime(_) => ColumnTypeCategory::DateTime,
}
}
}
fn column_name_strategy() -> impl Strategy<Value = &'static str> {
prop_oneof![Just("c1"), Just("c2")]
}
fn string_strategy() -> impl Strategy<Value = &'static str> {
prop_oneof![Just("a"), Just("b")]
}
fn bytes_strategy() -> impl Strategy<Value = &'static [u8]> {
prop_oneof![Just(&[0u8][..]), Just(&[1u8][..])]
}
// A random column value
fn column_value_strategy() -> impl Strategy<Value = ColumnValue> {
prop_oneof![
string_strategy().prop_map(|s| ColumnValue::Str(s)),
bytes_strategy().prop_map(|b| ColumnValue::Bytes(b)),
num_strategy().prop_map(|n| ColumnValue::Numerical(n)),
(1u16..3u16).prop_map(|ip_addr_byte| ColumnValue::IpAddr(Ipv6Addr::new(
127,
0,
0,
0,
0,
0,
0,
ip_addr_byte
))),
any::<bool>().prop_map(|b| ColumnValue::Bool(b)),
(0_679_723_993i64..1_679_723_995i64)
.prop_map(|val| { ColumnValue::DateTime(DateTime::from_timestamp_secs(val)) })
]
}
// A document contains up to 4 values.
fn doc_strategy() -> impl Strategy<Value = Vec<(&'static str, ColumnValue)>> {
proptest::collection::vec((column_name_strategy(), column_value_strategy()), 0..4)
}
// A columnar contains up to 2 docs.
fn columnar_docs_strategy() -> impl Strategy<Value = Vec<Vec<(&'static str, ColumnValue)>>> {
proptest::collection::vec(doc_strategy(), 0..=2)
}
fn columnar_docs_and_mapping_strategy(
) -> impl Strategy<Value = (Vec<Vec<(&'static str, ColumnValue)>>, Vec<RowId>)> {
columnar_docs_strategy().prop_flat_map(|docs| {
permutation_strategy(docs.len()).prop_map(move |permutation| (docs.clone(), permutation))
})
}
fn permutation_strategy(n: usize) -> impl Strategy<Value = Vec<RowId>> {
Just((0u32..n as RowId).collect()).prop_shuffle()
}
fn build_columnar_with_mapping(
docs: &[Vec<(&'static str, ColumnValue)>],
old_to_new_row_ids_opt: Option<&[RowId]>,
) -> ColumnarReader {
let num_docs = docs.len() as u32;
let mut buffer = Vec::new();
let mut columnar_writer = ColumnarWriter::default();
for (doc_id, vals) in docs.iter().enumerate() {
for (column_name, col_val) in vals {
match *col_val {
ColumnValue::Str(str_val) => {
columnar_writer.record_str(doc_id as u32, column_name, str_val);
}
ColumnValue::Bytes(bytes) => {
columnar_writer.record_bytes(doc_id as u32, column_name, bytes)
}
ColumnValue::Numerical(num) => {
columnar_writer.record_numerical(doc_id as u32, column_name, num);
}
ColumnValue::IpAddr(ip_addr) => {
columnar_writer.record_ip_addr(doc_id as u32, column_name, ip_addr);
}
ColumnValue::Bool(bool_val) => {
columnar_writer.record_bool(doc_id as u32, column_name, bool_val);
}
ColumnValue::DateTime(date_time) => {
columnar_writer.record_datetime(doc_id as u32, column_name, date_time);
}
}
}
}
columnar_writer
.serialize(num_docs, old_to_new_row_ids_opt, &mut buffer)
.unwrap();
let columnar_reader = ColumnarReader::open(buffer).unwrap();
columnar_reader
}
fn build_columnar(docs: &[Vec<(&'static str, ColumnValue)>]) -> ColumnarReader {
build_columnar_with_mapping(docs, None)
}
fn assert_columnar_eq(left: &ColumnarReader, right: &ColumnarReader) {
assert_eq!(left.num_rows(), right.num_rows());
let left_columns = left.list_columns().unwrap();
let right_columns = right.list_columns().unwrap();
assert_eq!(left_columns.len(), right_columns.len());
for i in 0..left_columns.len() {
assert_eq!(left_columns[i].0, right_columns[i].0);
let left_column = left_columns[i].1.open().unwrap();
let right_column = right_columns[i].1.open().unwrap();
assert_dyn_column_eq(&left_column, &right_column);
}
}
fn assert_column_eq<T: PartialEq + Copy>(left: &Column<T>, right: &Column<T>) {}
fn assert_bytes_column_eq(left: &BytesColumn, right: &BytesColumn) {}
fn assert_dyn_column_eq(left_dyn_column: &DynamicColumn, right_dyn_column: &DynamicColumn) {
assert_eq!(
&left_dyn_column.column_type(),
&right_dyn_column.column_type()
);
assert_eq!(
&left_dyn_column.get_cardinality(),
&right_dyn_column.get_cardinality()
);
match &(left_dyn_column, right_dyn_column) {
(DynamicColumn::Bool(left_col), DynamicColumn::Bool(right_col)) => {
assert_column_eq(left_col, right_col);
}
(DynamicColumn::I64(left_col), DynamicColumn::I64(right_col)) => {
assert_column_eq(left_col, right_col);
}
(DynamicColumn::U64(left_col), DynamicColumn::U64(right_col)) => {
assert_column_eq(left_col, right_col);
}
(DynamicColumn::F64(left_col), DynamicColumn::F64(right_col)) => {
assert_column_eq(left_col, right_col);
}
(DynamicColumn::DateTime(left_col), DynamicColumn::DateTime(right_col)) => {
assert_column_eq(left_col, right_col);
}
(DynamicColumn::IpAddr(left_col), DynamicColumn::IpAddr(right_col)) => {
assert_column_eq(left_col, right_col);
}
(DynamicColumn::Bytes(left_col), DynamicColumn::Bytes(right_col)) => {
assert_bytes_column_eq(left_col, right_col);
}
(DynamicColumn::Str(left_col), DynamicColumn::Str(right_col)) => {
assert_bytes_column_eq(left_col, right_col);
}
_ => {
unreachable!()
}
}
}
trait AssertEqualToColumnValue {
fn assert_equal_to_column_value(&self, column_value: &ColumnValue);
}
use crate::columnar::ColumnTypeCategory;
impl AssertEqualToColumnValue for bool {
fn assert_equal_to_column_value(&self, column_value: &ColumnValue) {
let ColumnValue::Bool(val) = column_value else { panic!() };
assert_eq!(self, val);
}
}
impl AssertEqualToColumnValue for Ipv6Addr {
fn assert_equal_to_column_value(&self, column_value: &ColumnValue) {
let ColumnValue::IpAddr(val) = column_value else { panic!() };
assert_eq!(self, val);
}
}
impl<T: Coerce + PartialEq + Debug + Into<NumericalValue>> AssertEqualToColumnValue for T {
fn assert_equal_to_column_value(&self, column_value: &ColumnValue) {
let ColumnValue::Numerical(num) = column_value else { panic!() };
assert_eq!(self, &T::coerce(*num));
}
}
impl AssertEqualToColumnValue for DateTime {
fn assert_equal_to_column_value(&self, column_value: &ColumnValue) {
let ColumnValue::DateTime(dt) = column_value else { panic!() };
assert_eq!(self, dt);
}
}
fn assert_column_values<
T: AssertEqualToColumnValue + PartialEq + Copy + PartialOrd + Debug + Send + Sync + 'static,
>(
col: &Column<T>,
expected: &HashMap<u32, Vec<&ColumnValue>>,
) {
let mut num_non_empty_rows = 0;
for doc in 0..col.num_docs() {
let doc_vals: Vec<T> = col.values_for_doc(doc).collect();
if doc_vals.is_empty() {
continue;
}
num_non_empty_rows += 1;
let expected_vals = expected.get(&doc).unwrap();
assert_eq!(doc_vals.len(), expected_vals.len());
for (val, &expected) in doc_vals.iter().zip(expected_vals.iter()) {
val.assert_equal_to_column_value(expected)
}
}
assert_eq!(num_non_empty_rows, expected.len());
}
fn assert_bytes_column_values(
col: &BytesColumn,
expected: &HashMap<u32, Vec<&ColumnValue>>,
is_str: bool,
) {
let mut num_non_empty_rows = 0;
let mut buffer = Vec::new();
for doc in 0..col.term_ord_column.num_docs() {
let doc_vals: Vec<u64> = col.term_ords(doc).collect();
if doc_vals.is_empty() {
continue;
}
let expected_vals = expected.get(&doc).unwrap();
assert_eq!(doc_vals.len(), expected_vals.len());
for (&expected_col_val, &ord) in expected_vals.iter().zip(&doc_vals) {
col.ord_to_bytes(ord, &mut buffer).unwrap();
match expected_col_val {
ColumnValue::Str(str_val) => {
assert!(is_str);
assert_eq!(str_val.as_bytes(), &buffer);
}
ColumnValue::Bytes(bytes_val) => {
assert!(!is_str);
assert_eq!(bytes_val, &buffer);
}
_ => {
panic!();
}
}
}
num_non_empty_rows += 1;
}
assert_eq!(num_non_empty_rows, expected.len());
}
proptest! {
/// This proptest attempts to create a tiny columnar based of up to 3 rows, and checks that the resulting
/// columnar matches the row data.
#[test]
fn test_single_columnar_builder_proptest(docs in columnar_docs_strategy()) {
let columnar = build_columnar(&docs[..]);
assert_eq!(columnar.num_rows() as usize, docs.len());
let mut expected_columns: HashMap<(&str, ColumnTypeCategory), HashMap<u32, Vec<&ColumnValue>> > = Default::default();
for (doc_id, doc_vals) in docs.iter().enumerate() {
for (col_name, col_val) in doc_vals {
expected_columns
.entry((col_name, col_val.column_type_category()))
.or_default()
.entry(doc_id as u32)
.or_default()
.push(col_val);
}
}
let column_list = columnar.list_columns().unwrap();
assert_eq!(expected_columns.len(), column_list.len());
for (column_name, column) in column_list {
let dynamic_column = column.open().unwrap();
let col_category: ColumnTypeCategory = dynamic_column.column_type().into();
let expected_col_values: &HashMap<u32, Vec<&ColumnValue>> = expected_columns.get(&(column_name.as_str(), col_category)).unwrap();
match &dynamic_column {
DynamicColumn::Bool(col) =>
assert_column_values(col, expected_col_values),
DynamicColumn::I64(col) =>
assert_column_values(col, expected_col_values),
DynamicColumn::U64(col) =>
assert_column_values(col, expected_col_values),
DynamicColumn::F64(col) =>
assert_column_values(col, expected_col_values),
DynamicColumn::IpAddr(col) =>
assert_column_values(col, expected_col_values),
DynamicColumn::DateTime(col) =>
assert_column_values(col, expected_col_values),
DynamicColumn::Bytes(col) =>
assert_bytes_column_values(col, expected_col_values, false),
DynamicColumn::Str(col) =>
assert_bytes_column_values(col, expected_col_values, true),
}
}
}
/// Same as `test_single_columnar_builder_proptest` but with a shuffling mapping.
#[test]
fn test_single_columnar_builder_with_shuffle_proptest((docs, mapping) in columnar_docs_and_mapping_strategy()) {
let columnar = build_columnar_with_mapping(&docs[..], Some(&mapping));
assert_eq!(columnar.num_rows() as usize, docs.len());
let mut expected_columns: HashMap<(&str, ColumnTypeCategory), HashMap<u32, Vec<&ColumnValue>> > = Default::default();
for (doc_id, doc_vals) in docs.iter().enumerate() {
for (col_name, col_val) in doc_vals {
expected_columns
.entry((col_name, col_val.column_type_category()))
.or_default()
.entry(mapping[doc_id])
.or_default()
.push(col_val);
}
}
let column_list = columnar.list_columns().unwrap();
assert_eq!(expected_columns.len(), column_list.len());
for (column_name, column) in column_list {
let dynamic_column = column.open().unwrap();
let col_category: ColumnTypeCategory = dynamic_column.column_type().into();
let expected_col_values: &HashMap<u32, Vec<&ColumnValue>> = expected_columns.get(&(column_name.as_str(), col_category)).unwrap();
for doc_id in 0..columnar.num_rows() {
match &dynamic_column {
DynamicColumn::Bool(col) =>
assert_column_values(col, expected_col_values),
DynamicColumn::I64(col) =>
assert_column_values(col, expected_col_values),
DynamicColumn::U64(col) =>
assert_column_values(col, expected_col_values),
DynamicColumn::F64(col) =>
assert_column_values(col, expected_col_values),
DynamicColumn::IpAddr(col) =>
assert_column_values(col, expected_col_values),
DynamicColumn::DateTime(col) =>
assert_column_values(col, expected_col_values),
DynamicColumn::Bytes(col) =>
assert_bytes_column_values(col, expected_col_values, false),
DynamicColumn::Str(col) =>
assert_bytes_column_values(col, expected_col_values, true),
}
}
}
}
/// This tests create 2 or 3 random small columnar and attempts to merge them.
/// It compares the resulting merged dataframe with what would have been obtained by building the
/// dataframe from the concatenated rows to begin with.
#[test]
fn test_columnar_merge_proptest(columnar_docs in proptest::collection::vec(columnar_docs_strategy(), 2..=3)) {
let columnar_readers: Vec<ColumnarReader> = columnar_docs.iter()
.map(|docs| build_columnar(&docs[..]))
.collect::<Vec<_>>();
let columnar_readers_arr: Vec<&ColumnarReader> = columnar_readers.iter().collect();
let mut output: Vec<u8> = Vec::new();
let stack_merge_order = StackMergeOrder::stack(&columnar_readers_arr[..]);
crate::merge_columnar(&columnar_readers_arr[..], &[], crate::MergeRowOrder::Stack(stack_merge_order), &mut output).unwrap();
let merged_columnar = ColumnarReader::open(output).unwrap();
let concat_rows: Vec<Vec<(&'static str, ColumnValue)>> = columnar_docs.iter().cloned().flatten().collect();
let expected_merged_columnar = build_columnar(&concat_rows[..]);
assert_columnar_eq(&merged_columnar, &expected_merged_columnar);
}
}
#[test]
fn test_columnar_failing_test() {
let columnar_docs: Vec<Vec<Vec<(&str, ColumnValue)>>> =
vec![vec![], vec![vec![("c1", ColumnValue::Str("a"))]]];
let columnar_readers: Vec<ColumnarReader> = columnar_docs
.iter()
.map(|docs| build_columnar(&docs[..]))
.collect::<Vec<_>>();
let columnar_readers_arr: Vec<&ColumnarReader> = columnar_readers.iter().collect();
let mut output: Vec<u8> = Vec::new();
let stack_merge_order = StackMergeOrder::stack(&columnar_readers_arr[..]);
crate::merge_columnar(
&columnar_readers_arr[..],
&[],
crate::MergeRowOrder::Stack(stack_merge_order),
&mut output,
)
.unwrap();
let merged_columnar = ColumnarReader::open(output).unwrap();
let concat_rows: Vec<Vec<(&'static str, ColumnValue)>> =
columnar_docs.iter().cloned().flatten().collect();
let expected_merged_columnar = build_columnar(&concat_rows[..]);
assert_columnar_eq(&merged_columnar, &expected_merged_columnar);
}
// TODO add non trivial remap and merge
// TODO test required
// TODO add support for empty columnar.