diff --git a/columnar/src/column/dictionary_encoded.rs b/columnar/src/column/dictionary_encoded.rs index fd45c69dc..8020e4314 100644 --- a/columnar/src/column/dictionary_encoded.rs +++ b/columnar/src/column/dictionary_encoded.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use sstable::{Dictionary, VoidSSTable}; use crate::column::Column; -use crate::column_index::ColumnIndex; +use crate::RowId; /// Dictionary encoded column. #[derive(Clone)] @@ -17,19 +17,58 @@ pub struct BytesColumn { impl BytesColumn { /// Returns `false` if the term does not exist (e.g. `term_ord` is greater or equal to the /// overll number of terms). - pub fn term_ord_to_str(&self, term_ord: u64, output: &mut Vec) -> io::Result { + pub fn ord_to_bytes(&self, term_ord: u64, output: &mut Vec) -> io::Result { self.dictionary.ord_to_term(term_ord, output) } + pub fn num_rows(&self) -> RowId { + self.term_ord_column.num_rows() + } + pub fn term_ords(&self) -> &Column { &self.term_ord_column } } -impl Deref for BytesColumn { - type Target = ColumnIndex<'static>; +#[derive(Clone)] +pub struct StrColumn(BytesColumn); - fn deref(&self) -> &Self::Target { - &**self.term_ords() +impl From for StrColumn { + fn from(bytes_col: BytesColumn) -> Self { + StrColumn(bytes_col) + } +} + +impl StrColumn { + pub fn ord_to_str(&self, term_ord: u64, output: &mut String) -> io::Result { + unsafe { + let buf = output.as_mut_vec(); + self.0.dictionary.ord_to_term(term_ord, buf)?; + // TODO consider remove checks if it hurts performance. + if std::str::from_utf8(buf.as_slice()).is_err() { + buf.clear(); + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "Not valid utf-8", + )); + } + } + Ok(true) + } + + pub fn num_rows(&self) -> RowId { + self.term_ord_column.num_rows() + } + + pub fn ordinal_dictionary(&self) -> &Column { + &self.0.term_ord_column + } +} + +impl Deref for StrColumn { + type Target = BytesColumn; + + fn deref(&self) -> &Self::Target { + &self.0 } } diff --git a/columnar/src/column/mod.rs b/columnar/src/column/mod.rs index ca31b7b22..37087e964 100644 --- a/columnar/src/column/mod.rs +++ b/columnar/src/column/mod.rs @@ -5,11 +5,11 @@ use std::ops::Deref; use std::sync::Arc; use common::BinarySerializable; -pub use dictionary_encoded::BytesColumn; pub use serialize::{ open_column_bytes, open_column_u128, open_column_u64, serialize_column_u128, serialize_column_u64, }; +pub use dictionary_encoded::{BytesColumn, StrColumn}; use crate::column_index::ColumnIndex; use crate::column_values::ColumnValues; diff --git a/columnar/src/column/serialize.rs b/columnar/src/column/serialize.rs index 4c619cd3b..524fe6fb0 100644 --- a/columnar/src/column/serialize.rs +++ b/columnar/src/column/serialize.rs @@ -80,14 +80,15 @@ pub fn open_column_u128( }) } -pub fn open_column_bytes(data: OwnedBytes) -> io::Result { +pub fn open_column_bytes>(data: OwnedBytes) -> io::Result { let (body, dictionary_len_bytes) = data.rsplit(4); let dictionary_len = u32::from_le_bytes(dictionary_len_bytes.as_slice().try_into().unwrap()); let (dictionary_bytes, column_bytes) = body.split(dictionary_len as usize); let dictionary = Arc::new(Dictionary::from_bytes(dictionary_bytes)?); let term_ord_column = crate::column::open_column_u64::(column_bytes)?; - Ok(BytesColumn { + let bytes_column = BytesColumn { dictionary, term_ord_column, - }) + }; + Ok(bytes_column.into()) } diff --git a/columnar/src/columnar/column_type.rs b/columnar/src/columnar/column_type.rs index 7b7c755c7..f7cabff18 100644 --- a/columnar/src/columnar/column_type.rs +++ b/columnar/src/columnar/column_type.rs @@ -9,6 +9,7 @@ use crate::InvalidData; #[derive(Hash, Eq, PartialEq, Debug, Clone, Copy)] pub enum ColumnType { Bytes, + Str, Numerical(NumericalType), Bool, IpAddr, @@ -21,6 +22,10 @@ impl ColumnType { let numerical_type_code: u8; match self { ColumnType::Bytes => { + column_type_category = ColumnTypeCategory::Bytes; + numerical_type_code = 0u8; + } + ColumnType::Str => { column_type_category = ColumnTypeCategory::Str; numerical_type_code = 0u8; } @@ -64,12 +69,18 @@ impl ColumnType { if numerical_type_code != 0u8 { return Err(InvalidData); } - Ok(ColumnType::Bytes) + Ok(ColumnType::Str) } ColumnTypeCategory::Numerical => { let numerical_type = NumericalType::try_from_code(numerical_type_code)?; Ok(ColumnType::Numerical(numerical_type)) } + ColumnTypeCategory::Bytes => { + if numerical_type_code != 0u8 { + return Err(InvalidData); + } + Ok(ColumnType::Bytes) + } } } } @@ -88,6 +99,7 @@ pub(crate) enum ColumnTypeCategory { Str = 1u8, Numerical = 2u8, IpAddr = 3u8, + Bytes = 4u8, } impl ColumnTypeCategory { @@ -101,6 +113,7 @@ impl ColumnTypeCategory { 1u8 => Ok(Self::Str), 2u8 => Ok(Self::Numerical), 3u8 => Ok(Self::IpAddr), + 4u8 => Ok(Self::Bytes), _ => Err(InvalidData), } } @@ -122,7 +135,7 @@ mod tests { assert!(column_type_set.insert(column_type)); } } - assert_eq!(column_type_set.len(), 2 + 3); + assert_eq!(column_type_set.len(), 3 + 4); } #[test] diff --git a/columnar/src/columnar/merge.rs b/columnar/src/columnar/merge.rs index 7043a4de5..ba32fad74 100644 --- a/columnar/src/columnar/merge.rs +++ b/columnar/src/columnar/merge.rs @@ -1,4 +1,5 @@ use std::io; + use crate::columnar::ColumnarReader; pub enum MergeDocOrder { @@ -9,11 +10,16 @@ pub enum MergeDocOrder { /// rows [n_row_0..n_row_0 + n_row_1 contains the row of columnar_readers[1], in order. /// .. Stack, - /// Some more complex mapping, that can interleaves rows from the different readers and possibly drop rows. + /// Some more complex mapping, that can interleaves rows from the different readers and + /// possibly drop rows. Complex(()), } -pub fn merge(columnar_readers: &[ColumnarReader], mapping: MergeDocOrder, output: &mut impl io::Write) -> io::Result<()> { +pub fn merge( + columnar_readers: &[ColumnarReader], + mapping: MergeDocOrder, + output: &mut impl io::Write, +) -> io::Result<()> { match mapping { MergeDocOrder::Stack => { // implement me :) @@ -23,6 +29,5 @@ pub fn merge(columnar_readers: &[ColumnarReader], mapping: MergeDocOrder, output // for later todo!(); } - } } diff --git a/columnar/src/columnar/mod.rs b/columnar/src/columnar/mod.rs index 4b16f9873..4c44b8063 100644 --- a/columnar/src/columnar/mod.rs +++ b/columnar/src/columnar/mod.rs @@ -1,8 +1,8 @@ mod column_type; mod format_version; +mod merge; mod reader; mod writer; -mod merge; pub use column_type::ColumnType; pub use reader::ColumnarReader; diff --git a/columnar/src/columnar/writer/column_writers.rs b/columnar/src/columnar/writer/column_writers.rs index 89f112f95..acf2f1c22 100644 --- a/columnar/src/columnar/writer/column_writers.rs +++ b/columnar/src/columnar/writer/column_writers.rs @@ -176,14 +176,14 @@ impl NumericalColumnWriter { } #[derive(Copy, Clone, Default)] -pub(crate) struct StrColumnWriter { +pub(crate) struct StrOrBytesColumnWriter { pub(crate) dictionary_id: u32, pub(crate) column_writer: ColumnWriter, } -impl StrColumnWriter { - pub(crate) fn with_dictionary_id(dictionary_id: u32) -> StrColumnWriter { - StrColumnWriter { +impl StrOrBytesColumnWriter { + pub(crate) fn with_dictionary_id(dictionary_id: u32) -> StrOrBytesColumnWriter { + StrOrBytesColumnWriter { dictionary_id, column_writer: Default::default(), } diff --git a/columnar/src/columnar/writer/mod.rs b/columnar/src/columnar/writer/mod.rs index 1e6afcbdf..b1cdb92be 100644 --- a/columnar/src/columnar/writer/mod.rs +++ b/columnar/src/columnar/writer/mod.rs @@ -17,12 +17,12 @@ use crate::column_values::{ }; use crate::columnar::column_type::{ColumnType, ColumnTypeCategory}; use crate::columnar::writer::column_writers::{ - ColumnWriter, NumericalColumnWriter, StrColumnWriter, + ColumnWriter, NumericalColumnWriter, StrOrBytesColumnWriter, }; use crate::columnar::writer::value_index::{IndexBuilder, PreallocatedIndexBuilders}; use crate::dictionary::{DictionaryBuilder, TermIdMapping, UnorderedId}; use crate::value::{Coerce, NumericalType, NumericalValue}; -use crate::{Cardinality, RowId}; +use crate::{column, Cardinality, RowId}; /// This is a set of buffers that are used to temporarily write the values into before passing them /// to the fast field codecs. @@ -54,6 +54,7 @@ pub struct ColumnarWriter { bool_field_hash_map: ArenaHashMap, ip_addr_field_hash_map: ArenaHashMap, bytes_field_hash_map: ArenaHashMap, + str_field_hash_map: ArenaHashMap, arena: MemoryArena, // Dictionaries used to store dictionary-encoded values. dictionaries: Vec, @@ -67,6 +68,7 @@ impl Default for ColumnarWriter { bool_field_hash_map: ArenaHashMap::new(10_000), ip_addr_field_hash_map: ArenaHashMap::new(10_000), bytes_field_hash_map: ArenaHashMap::new(10_000), + str_field_hash_map: ArenaHashMap::new(10_000), dictionaries: Vec::new(), arena: MemoryArena::default(), buffers: SpareBuffers::default(), @@ -134,18 +136,18 @@ impl ColumnarWriter { "key may not contain the 0 byte" ); let (hash_map, arena, dictionaries) = ( - &mut self.bytes_field_hash_map, + &mut self.str_field_hash_map, &mut self.arena, &mut self.dictionaries, ); hash_map.mutate_or_create( column_name.as_bytes(), - |column_opt: Option| { - let mut column: StrColumnWriter = column_opt.unwrap_or_else(|| { + |column_opt: Option| { + let mut column: StrOrBytesColumnWriter = column_opt.unwrap_or_else(|| { // Each column has its own dictionary let dictionary_id = dictionaries.len() as u32; dictionaries.push(DictionaryBuilder::default()); - StrColumnWriter::with_dictionary_id(dictionary_id) + StrOrBytesColumnWriter::with_dictionary_id(dictionary_id) }); column.record_bytes(doc, value.as_bytes(), dictionaries, arena); column @@ -153,6 +155,30 @@ impl ColumnarWriter { ); } + pub fn record_bytes(&mut self, doc: RowId, column_name: &str, value: &[u8]) { + assert!( + !column_name.as_bytes().contains(&0u8), + "key may not contain the 0 byte" + ); + let (hash_map, arena, dictionaries) = ( + &mut self.bytes_field_hash_map, + &mut self.arena, + &mut self.dictionaries, + ); + hash_map.mutate_or_create( + column_name.as_bytes(), + |column_opt: Option| { + let mut column: StrOrBytesColumnWriter = column_opt.unwrap_or_else(|| { + // Each column has its own dictionary + let dictionary_id = dictionaries.len() as u32; + dictionaries.push(DictionaryBuilder::default()); + StrOrBytesColumnWriter::with_dictionary_id(dictionary_id) + }); + column.record_bytes(doc, value, dictionaries, arena); + column + }, + ); + } pub fn serialize(&mut self, num_docs: RowId, wrt: &mut dyn io::Write) -> io::Result<()> { let mut serializer = ColumnarSerializer::new(wrt); let mut field_columns: Vec<(&[u8], ColumnTypeCategory, Addr)> = self @@ -162,6 +188,11 @@ impl ColumnarWriter { .collect(); field_columns.extend( self.bytes_field_hash_map + .iter() + .map(|(term, addr, _)| (term, ColumnTypeCategory::Bytes, addr)), + ); + field_columns.extend( + self.str_field_hash_map .iter() .map(|(term, addr, _)| (term, ColumnTypeCategory::Str, addr)), ); @@ -180,8 +211,8 @@ impl ColumnarWriter { let (arena, buffers, dictionaries) = (&self.arena, &mut self.buffers, &self.dictionaries); let mut symbol_byte_buffer: Vec = Vec::new(); - for (column_name, bytes_or_numerical, addr) in field_columns { - match bytes_or_numerical { + for (column_name, column_type, addr) in field_columns { + match column_type { ColumnTypeCategory::Bool => { let column_writer: ColumnWriter = self.bool_field_hash_map.read(addr); let cardinality = column_writer.get_cardinality(num_docs); @@ -208,14 +239,19 @@ impl ColumnarWriter { &mut column_serializer, )?; } - ColumnTypeCategory::Str => { - let str_column_writer: StrColumnWriter = self.bytes_field_hash_map.read(addr); + ColumnTypeCategory::Bytes | ColumnTypeCategory::Str => { + let (column_type, str_column_writer): (ColumnType, StrOrBytesColumnWriter) = + if column_type == ColumnTypeCategory::Bytes { + (ColumnType::Bytes, self.bytes_field_hash_map.read(addr)) + } else { + (ColumnType::Str, self.str_field_hash_map.read(addr)) + }; let dictionary_builder = &dictionaries[str_column_writer.dictionary_id as usize]; let cardinality = str_column_writer.column_writer.get_cardinality(num_docs); let mut column_serializer = - serializer.serialize_column(column_name, ColumnType::Bytes); - serialize_bytes_column( + serializer.serialize_column(column_name, column_type); + serialize_bytes_or_str_column( cardinality, num_docs, dictionary_builder, @@ -247,7 +283,7 @@ impl ColumnarWriter { } } -fn serialize_bytes_column( +fn serialize_bytes_or_str_column( cardinality: Cardinality, num_docs: RowId, dictionary_builder: &DictionaryBuilder, diff --git a/columnar/src/columnar/writer/serializer.rs b/columnar/src/columnar/writer/serializer.rs index 47364c3fd..959d3850f 100644 --- a/columnar/src/columnar/writer/serializer.rs +++ b/columnar/src/columnar/writer/serializer.rs @@ -5,6 +5,7 @@ use common::CountingWriter; use sstable::value::RangeValueWriter; use sstable::RangeSSTable; +use crate::column; use crate::columnar::ColumnType; pub struct ColumnarSerializer { diff --git a/columnar/src/dynamic_column.rs b/columnar/src/dynamic_column.rs index 940ec3dbc..e672e6ac4 100644 --- a/columnar/src/dynamic_column.rs +++ b/columnar/src/dynamic_column.rs @@ -4,7 +4,7 @@ use std::net::Ipv6Addr; use common::file_slice::FileSlice; use common::{HasLen, OwnedBytes}; -use crate::column::{BytesColumn, Column}; +use crate::column::{BytesColumn, Column, StrColumn}; use crate::columnar::ColumnType; use crate::DateTime; @@ -16,7 +16,8 @@ pub enum DynamicColumn { F64(Column), IpAddr(Column), DateTime(Column), - Str(BytesColumn), + Bytes(BytesColumn), + Str(StrColumn), } impl From> for DynamicColumn { @@ -45,6 +46,12 @@ impl From> for DynamicColumn { impl From for DynamicColumn { fn from(dictionary_encoded_col: BytesColumn) -> Self { + DynamicColumn::Bytes(dictionary_encoded_col) + } +} + +impl From for DynamicColumn { + fn from(dictionary_encoded_col: StrColumn) -> Self { DynamicColumn::Str(dictionary_encoded_col) } } @@ -74,7 +81,10 @@ impl DynamicColumnHandle { fn open_internal(&self, column_bytes: OwnedBytes) -> io::Result { let dynamic_column: DynamicColumn = match self.column_type { - ColumnType::Bytes => crate::column::open_column_bytes(column_bytes)?.into(), + ColumnType::Bytes => { + crate::column::open_column_bytes::(column_bytes)?.into() + } + ColumnType::Str => crate::column::open_column_bytes::(column_bytes)?.into(), ColumnType::Numerical(numerical_type) => match numerical_type { crate::NumericalType::I64 => { crate::column::open_column_u64::(column_bytes)?.into() diff --git a/columnar/src/tests.rs b/columnar/src/tests.rs index 70a4ec34b..4ae785d05 100644 --- a/columnar/src/tests.rs +++ b/columnar/src/tests.rs @@ -4,13 +4,27 @@ use crate::column_values::MonotonicallyMappableToU128; use crate::columnar::ColumnType; use crate::dynamic_column::{DynamicColumn, DynamicColumnHandle}; use crate::value::NumericalValue; -use crate::{Cardinality, ColumnarReader, ColumnarWriter}; +use crate::{Cardinality, ColumnarReader, ColumnarWriter, RowId}; + +#[test] +fn test_dataframe_writer_str() { + let mut dataframe_writer = ColumnarWriter::default(); + dataframe_writer.record_str(1u32, "my_string", "hello"); + dataframe_writer.record_str(3u32, "my_string", "helloeee"); + let mut buffer: Vec = Vec::new(); + dataframe_writer.serialize(5, &mut buffer).unwrap(); + let columnar = ColumnarReader::open(buffer).unwrap(); + assert_eq!(columnar.num_columns(), 1); + let cols: Vec = columnar.read_columns("my_string").unwrap(); + assert_eq!(cols.len(), 1); + assert_eq!(cols[0].num_bytes(), 165); +} #[test] fn test_dataframe_writer_bytes() { let mut dataframe_writer = ColumnarWriter::default(); - dataframe_writer.record_str(1u32, "my_string", "hello"); - dataframe_writer.record_str(3u32, "my_string", "helloeee"); + dataframe_writer.record_bytes(1u32, "my_string", b"hello"); + dataframe_writer.record_bytes(3u32, "my_string", b"helloeee"); let mut buffer: Vec = Vec::new(); dataframe_writer.serialize(5, &mut buffer).unwrap(); let columnar = ColumnarReader::open(buffer).unwrap(); @@ -98,7 +112,7 @@ fn test_dataframe_writer_numerical() { } #[test] -fn test_dictionary_encoded() { +fn test_dictionary_encoded_str() { let mut buffer = Vec::new(); let mut columnar_writer = ColumnarWriter::default(); columnar_writer.record_str(1, "my.column", "a"); @@ -111,18 +125,65 @@ fn test_dictionary_encoded() { let col_handles = columnar_reader.read_columns("my.column").unwrap(); assert_eq!(col_handles.len(), 1); let DynamicColumn::Str(str_col) = col_handles[0].open().unwrap() else { panic!(); }; + let index: Vec> = (0..5) + .map(|row_id| str_col.term_ords().first(row_id)) + .collect(); + assert_eq!(index, &[None, Some(0), None, Some(2), Some(1)]); assert_eq!(str_col.num_rows(), 5); - let mut term_buffer = Vec::new(); - let term_ords = str_col.term_ords(); + let mut term_buffer = String::new(); + let term_ords = str_col.ordinal_dictionary(); assert_eq!(term_ords.first(0), None); assert_eq!(term_ords.first(1), Some(0)); - str_col.dictionary.ord_to_term(0u64, &mut term_buffer).unwrap(); + str_col.ord_to_str(0u64, &mut term_buffer).unwrap(); + assert_eq!(term_buffer, "a"); + assert_eq!(term_ords.first(2), None); + assert_eq!(term_ords.first(3), Some(2)); + str_col.ord_to_str(2u64, &mut term_buffer).unwrap(); + assert_eq!(term_buffer, "c"); + assert_eq!(term_ords.first(4), Some(1)); + str_col.ord_to_str(1u64, &mut term_buffer).unwrap(); + assert_eq!(term_buffer, "b"); +} + +#[test] +fn test_dictionary_encoded_bytes() { + let mut buffer = Vec::new(); + let mut columnar_writer = ColumnarWriter::default(); + columnar_writer.record_bytes(1, "my.column", b"a"); + columnar_writer.record_bytes(3, "my.column", b"c"); + columnar_writer.record_bytes(3, "my.column2", b"different_column!"); + columnar_writer.record_bytes(4, "my.column", b"b"); + columnar_writer.serialize(5, &mut buffer).unwrap(); + let columnar_reader = ColumnarReader::open(buffer).unwrap(); + assert_eq!(columnar_reader.num_columns(), 2); + let col_handles = columnar_reader.read_columns("my.column").unwrap(); + assert_eq!(col_handles.len(), 1); + let DynamicColumn::Bytes(bytes_col) = col_handles[0].open().unwrap() else { panic!(); }; + let index: Vec> = (0..5) + .map(|row_id| bytes_col.term_ords().first(row_id)) + .collect(); + assert_eq!(index, &[None, Some(0), None, Some(2), Some(1)]); + assert_eq!(bytes_col.num_rows(), 5); + let mut term_buffer = Vec::new(); + let term_ords = bytes_col.term_ords(); + assert_eq!(term_ords.first(0), None); + assert_eq!(term_ords.first(1), Some(0)); + bytes_col + .dictionary + .ord_to_term(0u64, &mut term_buffer) + .unwrap(); assert_eq!(term_buffer, b"a"); assert_eq!(term_ords.first(2), None); assert_eq!(term_ords.first(3), Some(2)); - str_col.dictionary.ord_to_term(2u64, &mut term_buffer).unwrap(); + bytes_col + .dictionary + .ord_to_term(2u64, &mut term_buffer) + .unwrap(); assert_eq!(term_buffer, b"c"); assert_eq!(term_ords.first(4), Some(1)); - str_col.dictionary.ord_to_term(1u64, &mut term_buffer).unwrap(); + bytes_col + .dictionary + .ord_to_term(1u64, &mut term_buffer) + .unwrap(); assert_eq!(term_buffer, b"b"); }