From 1330e6f10d0e27f7cd5438c16aee9bdd8e1a26cc Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Tue, 24 Jan 2023 17:06:23 +0800 Subject: [PATCH] prepare for merge --- columnar/src/column/mod.rs | 3 + columnar/src/columnar/merge.rs | 97 ++++++++++++++++++++-------- columnar/src/columnar/merge_index.rs | 1 + columnar/src/columnar/mod.rs | 1 + columnar/src/columnar/reader/mod.rs | 1 + columnar/src/columnar/writer/mod.rs | 2 +- columnar/src/dynamic_column.rs | 14 +++- columnar/src/lib.rs | 6 ++ sstable/src/dictionary.rs | 1 + sstable/src/lib.rs | 1 + sstable/src/sstable_index.rs | 4 +- 11 files changed, 99 insertions(+), 32 deletions(-) create mode 100644 columnar/src/columnar/merge_index.rs diff --git a/columnar/src/column/mod.rs b/columnar/src/column/mod.rs index a0aaea1da..0ac5625a4 100644 --- a/columnar/src/column/mod.rs +++ b/columnar/src/column/mod.rs @@ -23,6 +23,9 @@ pub struct Column { } impl Column { + pub fn get_cardinality(&self) -> Cardinality { + self.idx.get_cardinality() + } pub fn num_rows(&self) -> RowId { match &self.idx { ColumnIndex::Full => self.values.num_vals() as u32, diff --git a/columnar/src/columnar/merge.rs b/columnar/src/columnar/merge.rs index 3255dc309..1970b8926 100644 --- a/columnar/src/columnar/merge.rs +++ b/columnar/src/columnar/merge.rs @@ -1,9 +1,11 @@ use std::collections::HashMap; use std::io; -use super::column_type::ColumnTypeCategory; use crate::columnar::ColumnarReader; +use crate::columnar::column_type::ColumnTypeCategory; use crate::dynamic_column::DynamicColumn; +use super::writer::ColumnarSerializer; +use crate::{Cardinality, ColumnType}; pub enum MergeDocOrder { /// Columnar tables are simply stacked one above the other. @@ -19,24 +21,67 @@ pub enum MergeDocOrder { } pub fn merge_columnar( - _columnar_readers: &[ColumnarReader], + columnar_readers: &[ColumnarReader], mapping: MergeDocOrder, - _output: &mut impl io::Write, + output: &mut impl io::Write, ) -> io::Result<()> { - match mapping { - MergeDocOrder::Stack => { - // implement me :) - todo!(); - } - MergeDocOrder::Complex(_) => { - // for later - todo!(); + let mut serializer = ColumnarSerializer::new(output); + + // TODO handle dictionary merge for Str/Bytes column + let field_name_to_group = group_columns_for_merge(columnar_readers)?; + for (column_name, category_to_columns) in field_name_to_group { + for (_category, columns_to_merge) in category_to_columns { + let column_type = columns_to_merge[0].column_type(); + let mut column_serialzier = + serializer.serialize_column(column_name.as_bytes(), column_type); + merge_columns( + column_type, + &columns_to_merge, + &mapping, + &mut column_serialzier, + )?; } } + serializer.finalize()?; + + Ok(()) } -pub fn collect_columns( - columnar_readers: &[&ColumnarReader], +pub fn detect_cardinality(columns: &[DynamicColumn]) -> Cardinality { + if columns + .iter() + .any(|column| column.get_cardinality().is_multivalue()) + { + return Cardinality::Multivalued; + } + if columns + .iter() + .any(|column| column.get_cardinality().is_optional()) + { + return Cardinality::Optional; + } + Cardinality::Full +} + +pub fn compute_num_docs(columns: &[DynamicColumn], mapping: &MergeDocOrder) -> usize { + // TODO handle deletes + + 0 +} + +pub fn merge_columns( + column_type: ColumnType, + columns: &[DynamicColumn], + mapping: &MergeDocOrder, + column_serializer: &mut impl io::Write, +) -> io::Result<()> { + let cardinality = detect_cardinality(columns); + + Ok(()) +} + +pub fn group_columns_for_merge( + columnar_readers: &[ColumnarReader], ) -> io::Result>>> { // Each column name may have multiple types of column associated. // For merging we are interested in the same column type category since they can be merged. @@ -85,26 +130,20 @@ fn cast_to_common_numerical_column(columns: &[DynamicColumn]) -> Vec = columns .iter() - .map(|column| column.clone().coerce_to_i64()) + .filter_map(|column| column.clone().coerce_to_i64()) .collect(); - if coerce_to_i64.iter().all(|column| column.is_some()) { - return coerce_to_i64 - .into_iter() - .map(|column| column.unwrap()) - .collect(); + if coerce_to_i64.len() == columns.len() { + return coerce_to_i64; } let coerce_to_u64: Vec<_> = columns .iter() - .map(|column| column.clone().coerce_to_u64()) + .filter_map(|column| column.clone().coerce_to_u64()) .collect(); - if coerce_to_u64.iter().all(|column| column.is_some()) { - return coerce_to_u64 - .into_iter() - .map(|column| column.unwrap()) - .collect(); + if coerce_to_u64.len() == columns.len() { + return coerce_to_u64; } columns @@ -151,7 +190,9 @@ mod tests { ColumnarReader::open(buffer).unwrap() }; - let column_map = collect_columns(&[&columnar1, &columnar2, &columnar3]).unwrap(); + let column_map = + group_columns_for_merge(&[columnar1.clone(), columnar2.clone(), columnar3.clone()]) + .unwrap(); assert_eq!(column_map.len(), 1); let cat_to_columns = column_map.get("numbers").unwrap(); assert_eq!(cat_to_columns.len(), 1); @@ -159,14 +200,14 @@ mod tests { let numerical = cat_to_columns.get(&ColumnTypeCategory::Numerical).unwrap(); assert!(numerical.iter().all(|column| column.is_f64())); - let column_map = collect_columns(&[&columnar1, &columnar1]).unwrap(); + let column_map = group_columns_for_merge(&[columnar1.clone(), columnar1.clone()]).unwrap(); assert_eq!(column_map.len(), 1); let cat_to_columns = column_map.get("numbers").unwrap(); assert_eq!(cat_to_columns.len(), 1); let numerical = cat_to_columns.get(&ColumnTypeCategory::Numerical).unwrap(); assert!(numerical.iter().all(|column| column.is_i64())); - let column_map = collect_columns(&[&columnar2, &columnar2]).unwrap(); + let column_map = group_columns_for_merge(&[columnar2.clone(), columnar2.clone()]).unwrap(); assert_eq!(column_map.len(), 1); let cat_to_columns = column_map.get("numbers").unwrap(); assert_eq!(cat_to_columns.len(), 1); diff --git a/columnar/src/columnar/merge_index.rs b/columnar/src/columnar/merge_index.rs new file mode 100644 index 000000000..8b1378917 --- /dev/null +++ b/columnar/src/columnar/merge_index.rs @@ -0,0 +1 @@ + diff --git a/columnar/src/columnar/mod.rs b/columnar/src/columnar/mod.rs index 30c4677b7..6be7d6ebb 100644 --- a/columnar/src/columnar/mod.rs +++ b/columnar/src/columnar/mod.rs @@ -1,6 +1,7 @@ mod column_type; mod format_version; mod merge; +mod merge_index; mod reader; mod writer; diff --git a/columnar/src/columnar/reader/mod.rs b/columnar/src/columnar/reader/mod.rs index 130ab452a..f6ffe8d9c 100644 --- a/columnar/src/columnar/reader/mod.rs +++ b/columnar/src/columnar/reader/mod.rs @@ -13,6 +13,7 @@ fn io_invalid_data(msg: String) -> io::Error { /// The ColumnarReader makes it possible to access a set of columns /// associated to field names. +#[derive(Clone)] pub struct ColumnarReader { column_dictionary: Dictionary, column_data: FileSlice, diff --git a/columnar/src/columnar/writer/mod.rs b/columnar/src/columnar/writer/mod.rs index c7f751d3a..59dd3d20d 100644 --- a/columnar/src/columnar/writer/mod.rs +++ b/columnar/src/columnar/writer/mod.rs @@ -8,7 +8,7 @@ use std::net::Ipv6Addr; use column_operation::ColumnOperation; use common::CountingWriter; -use serializer::ColumnarSerializer; +pub(crate) use serializer::ColumnarSerializer; use stacker::{Addr, ArenaHashMap, MemoryArena}; use crate::column_index::SerializableColumnIndex; diff --git a/columnar/src/dynamic_column.rs b/columnar/src/dynamic_column.rs index f668d08c8..64b7979c1 100644 --- a/columnar/src/dynamic_column.rs +++ b/columnar/src/dynamic_column.rs @@ -8,7 +8,7 @@ use common::{HasLen, OwnedBytes}; use crate::column::{BytesColumn, Column, StrColumn}; use crate::column_values::{monotonic_map_column, StrictlyMonotonicFn}; use crate::columnar::ColumnType; -use crate::{DateTime, NumericalType}; +use crate::{Cardinality, DateTime, NumericalType}; #[derive(Clone)] pub enum DynamicColumn { @@ -23,6 +23,18 @@ pub enum DynamicColumn { } impl DynamicColumn { + pub fn get_cardinality(&self) -> Cardinality { + match self { + DynamicColumn::Bool(c) => c.get_cardinality(), + DynamicColumn::I64(c) => c.get_cardinality(), + DynamicColumn::U64(c) => c.get_cardinality(), + DynamicColumn::F64(c) => c.get_cardinality(), + DynamicColumn::IpAddr(c) => c.get_cardinality(), + DynamicColumn::DateTime(c) => c.get_cardinality(), + DynamicColumn::Bytes(c) => c.ords().get_cardinality(), + DynamicColumn::Str(c) => c.ords().get_cardinality(), + } + } pub fn column_type(&self) -> ColumnType { match self { DynamicColumn::Bool(_) => ColumnType::Bool, diff --git a/columnar/src/lib.rs b/columnar/src/lib.rs index 20cd981bb..ee349d015 100644 --- a/columnar/src/lib.rs +++ b/columnar/src/lib.rs @@ -66,6 +66,12 @@ pub enum Cardinality { } impl Cardinality { + pub fn is_optional(&self) -> bool { + matches!(self, Cardinality::Optional) + } + pub fn is_multivalue(&self) -> bool { + matches!(self, Cardinality::Multivalued) + } pub(crate) fn to_code(self) -> u8 { self as u8 } diff --git a/sstable/src/dictionary.rs b/sstable/src/dictionary.rs index afab84629..eafc439c9 100644 --- a/sstable/src/dictionary.rs +++ b/sstable/src/dictionary.rs @@ -30,6 +30,7 @@ use crate::{BlockAddr, DeltaReader, Reader, SSTable, SSTableIndex, TermOrdinal, /// block boundary. /// /// (See also README.md) +#[derive(Debug, Clone)] pub struct Dictionary { pub sstable_slice: FileSlice, pub sstable_index: SSTableIndex, diff --git a/sstable/src/lib.rs b/sstable/src/lib.rs index 725b3fa41..30d55d926 100644 --- a/sstable/src/lib.rs +++ b/sstable/src/lib.rs @@ -117,6 +117,7 @@ impl SSTable for MonotonicU64SSTable { /// `range_sstable[k1].end == range_sstable[k2].start`. /// /// The first range is not required to start at `0`. +#[derive(Clone, Copy, Debug)] pub struct RangeSSTable; impl SSTable for RangeSSTable { diff --git a/sstable/src/sstable_index.rs b/sstable/src/sstable_index.rs index ccf997f05..43f80d099 100644 --- a/sstable/src/sstable_index.rs +++ b/sstable/src/sstable_index.rs @@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize}; use crate::{common_prefix_len, SSTableDataCorruption, TermOrdinal}; -#[derive(Default, Debug, Serialize, Deserialize)] +#[derive(Default, Debug, Clone, Serialize, Deserialize)] pub struct SSTableIndex { blocks: Vec, } @@ -75,7 +75,7 @@ pub struct BlockAddr { pub first_ordinal: u64, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub(crate) struct BlockMeta { /// Any byte string that is lexicographically greater or equal to /// the last key in the block,