Differentiating between str and bytes, + unit test

This commit is contained in:
Paul Masurel
2023-01-19 14:32:49 +09:00
parent f9abd256b7
commit a86b104a40
11 changed files with 211 additions and 45 deletions

View File

@@ -5,7 +5,7 @@ use std::sync::Arc;
use sstable::{Dictionary, VoidSSTable};
use crate::column::Column;
use crate::column_index::ColumnIndex;
use crate::RowId;
/// Dictionary encoded column.
#[derive(Clone)]
@@ -17,19 +17,58 @@ pub struct BytesColumn {
impl BytesColumn {
/// Returns `false` if the term does not exist (e.g. `term_ord` is greater or equal to the
/// overall number of terms).
pub fn term_ord_to_str(&self, term_ord: u64, output: &mut Vec<u8>) -> io::Result<bool> {
pub fn ord_to_bytes(&self, term_ord: u64, output: &mut Vec<u8>) -> io::Result<bool> {
self.dictionary.ord_to_term(term_ord, output)
}
/// Returns the number of rows, as reported by the term ordinal column.
pub fn num_rows(&self) -> RowId {
self.term_ord_column.num_rows()
}
/// Returns a reference to the `u64` column stored in `term_ord_column`.
///
/// NOTE(review): presumably each value is a term ordinal that can be resolved
/// through the dictionary of this column -- confirm against the writer.
pub fn term_ords(&self) -> &Column<u64> {
&self.term_ord_column
}
}
impl Deref for BytesColumn {
type Target = ColumnIndex<'static>;
/// Dictionary encoded column whose terms are read as strings.
///
/// This is a newtype around [`BytesColumn`]: it is built from a `BytesColumn`
/// without inspecting its content, and UTF-8 validity is only checked when a
/// term is read through `ord_to_str`.
#[derive(Clone)]
pub struct StrColumn(BytesColumn);
fn deref(&self) -> &Self::Target {
&**self.term_ords()
impl From<BytesColumn> for StrColumn {
fn from(bytes_col: BytesColumn) -> Self {
StrColumn(bytes_col)
}
}
impl StrColumn {
    /// Fetches the term associated with `term_ord` and writes it into `output`.
    ///
    /// Returns `Ok(true)` when the term was found and written, and `Ok(false)` when
    /// the dictionary reports that the ordinal does not exist (in which case `output`
    /// is left empty).
    ///
    /// # Errors
    /// Returns the `io::Error` of the dictionary lookup if it fails, or an error of
    /// kind `InvalidData` if the stored term is not valid UTF-8. `output` is left
    /// empty in both cases.
    pub fn ord_to_str(&self, term_ord: u64, output: &mut String) -> io::Result<bool> {
        // Move the bytes out of `output` so the dictionary can write into the same
        // allocation in safe code: `output` never holds bytes that have not been
        // validated, even if the lookup bails out early.
        let mut buf: Vec<u8> = std::mem::take(output).into_bytes();
        if !self.0.dictionary.ord_to_term(term_ord, &mut buf)? {
            return Ok(false);
        }
        // TODO consider remove checks if it hurts performance.
        match String::from_utf8(buf) {
            Ok(term) => {
                *output = term;
                Ok(true)
            }
            Err(_) => Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Not valid utf-8",
            )),
        }
    }

    /// Returns the number of rows of the underlying bytes column.
    pub fn num_rows(&self) -> RowId {
        self.0.num_rows()
    }

    /// Returns the `u64` column stored in the `term_ord_column` field of the
    /// wrapped bytes column.
    pub fn ordinal_dictionary(&self) -> &Column<u64> {
        &self.0.term_ord_column
    }
}
// Exposes the wrapped `BytesColumn`, so its methods and fields can be reached
// directly from a `StrColumn`.
impl Deref for StrColumn {
type Target = BytesColumn;
fn deref(&self) -> &Self::Target {
&self.0
}
}

View File

@@ -5,11 +5,11 @@ use std::ops::Deref;
use std::sync::Arc;
use common::BinarySerializable;
pub use dictionary_encoded::BytesColumn;
pub use serialize::{
open_column_bytes, open_column_u128, open_column_u64, serialize_column_u128,
serialize_column_u64,
};
pub use dictionary_encoded::{BytesColumn, StrColumn};
use crate::column_index::ColumnIndex;
use crate::column_values::ColumnValues;

View File

@@ -80,14 +80,15 @@ pub fn open_column_u128<T: MonotonicallyMappableToU128>(
})
}
pub fn open_column_bytes(data: OwnedBytes) -> io::Result<BytesColumn> {
pub fn open_column_bytes<T: From<BytesColumn>>(data: OwnedBytes) -> io::Result<T> {
let (body, dictionary_len_bytes) = data.rsplit(4);
let dictionary_len = u32::from_le_bytes(dictionary_len_bytes.as_slice().try_into().unwrap());
let (dictionary_bytes, column_bytes) = body.split(dictionary_len as usize);
let dictionary = Arc::new(Dictionary::from_bytes(dictionary_bytes)?);
let term_ord_column = crate::column::open_column_u64::<u64>(column_bytes)?;
Ok(BytesColumn {
let bytes_column = BytesColumn {
dictionary,
term_ord_column,
})
};
Ok(bytes_column.into())
}

View File

@@ -9,6 +9,7 @@ use crate::InvalidData;
#[derive(Hash, Eq, PartialEq, Debug, Clone, Copy)]
pub enum ColumnType {
Bytes,
Str,
Numerical(NumericalType),
Bool,
IpAddr,
@@ -21,6 +22,10 @@ impl ColumnType {
let numerical_type_code: u8;
match self {
ColumnType::Bytes => {
column_type_category = ColumnTypeCategory::Bytes;
numerical_type_code = 0u8;
}
ColumnType::Str => {
column_type_category = ColumnTypeCategory::Str;
numerical_type_code = 0u8;
}
@@ -64,12 +69,18 @@ impl ColumnType {
if numerical_type_code != 0u8 {
return Err(InvalidData);
}
Ok(ColumnType::Bytes)
Ok(ColumnType::Str)
}
ColumnTypeCategory::Numerical => {
let numerical_type = NumericalType::try_from_code(numerical_type_code)?;
Ok(ColumnType::Numerical(numerical_type))
}
ColumnTypeCategory::Bytes => {
if numerical_type_code != 0u8 {
return Err(InvalidData);
}
Ok(ColumnType::Bytes)
}
}
}
}
@@ -88,6 +99,7 @@ pub(crate) enum ColumnTypeCategory {
Str = 1u8,
Numerical = 2u8,
IpAddr = 3u8,
Bytes = 4u8,
}
impl ColumnTypeCategory {
@@ -101,6 +113,7 @@ impl ColumnTypeCategory {
1u8 => Ok(Self::Str),
2u8 => Ok(Self::Numerical),
3u8 => Ok(Self::IpAddr),
4u8 => Ok(Self::Bytes),
_ => Err(InvalidData),
}
}
@@ -122,7 +135,7 @@ mod tests {
assert!(column_type_set.insert(column_type));
}
}
assert_eq!(column_type_set.len(), 2 + 3);
assert_eq!(column_type_set.len(), 3 + 4);
}
#[test]

View File

@@ -1,4 +1,5 @@
use std::io;
use crate::columnar::ColumnarReader;
pub enum MergeDocOrder {
@@ -9,11 +10,16 @@ pub enum MergeDocOrder {
/// rows [n_row_0..n_row_0 + n_row_1] contain the rows of columnar_readers[1], in order.
/// ..
Stack,
/// Some more complex mapping, that can interleaves rows from the different readers and possibly drop rows.
/// Some more complex mapping, that can interleave rows from the different readers and
/// possibly drop rows.
Complex(()),
}
pub fn merge(columnar_readers: &[ColumnarReader], mapping: MergeDocOrder, output: &mut impl io::Write) -> io::Result<()> {
pub fn merge(
columnar_readers: &[ColumnarReader],
mapping: MergeDocOrder,
output: &mut impl io::Write,
) -> io::Result<()> {
match mapping {
MergeDocOrder::Stack => {
// implement me :)
@@ -23,6 +29,5 @@ pub fn merge(columnar_readers: &[ColumnarReader], mapping: MergeDocOrder, output
// for later
todo!();
}
}
}

View File

@@ -1,8 +1,8 @@
mod column_type;
mod format_version;
mod merge;
mod reader;
mod writer;
mod merge;
pub use column_type::ColumnType;
pub use reader::ColumnarReader;

View File

@@ -176,14 +176,14 @@ impl NumericalColumnWriter {
}
#[derive(Copy, Clone, Default)]
pub(crate) struct StrColumnWriter {
pub(crate) struct StrOrBytesColumnWriter {
pub(crate) dictionary_id: u32,
pub(crate) column_writer: ColumnWriter,
}
impl StrColumnWriter {
pub(crate) fn with_dictionary_id(dictionary_id: u32) -> StrColumnWriter {
StrColumnWriter {
impl StrOrBytesColumnWriter {
pub(crate) fn with_dictionary_id(dictionary_id: u32) -> StrOrBytesColumnWriter {
StrOrBytesColumnWriter {
dictionary_id,
column_writer: Default::default(),
}

View File

@@ -17,12 +17,12 @@ use crate::column_values::{
};
use crate::columnar::column_type::{ColumnType, ColumnTypeCategory};
use crate::columnar::writer::column_writers::{
ColumnWriter, NumericalColumnWriter, StrColumnWriter,
ColumnWriter, NumericalColumnWriter, StrOrBytesColumnWriter,
};
use crate::columnar::writer::value_index::{IndexBuilder, PreallocatedIndexBuilders};
use crate::dictionary::{DictionaryBuilder, TermIdMapping, UnorderedId};
use crate::value::{Coerce, NumericalType, NumericalValue};
use crate::{Cardinality, RowId};
use crate::{column, Cardinality, RowId};
/// This is a set of buffers that are used to temporarily write the values into before passing them
/// to the fast field codecs.
@@ -54,6 +54,7 @@ pub struct ColumnarWriter {
bool_field_hash_map: ArenaHashMap,
ip_addr_field_hash_map: ArenaHashMap,
bytes_field_hash_map: ArenaHashMap,
str_field_hash_map: ArenaHashMap,
arena: MemoryArena,
// Dictionaries used to store dictionary-encoded values.
dictionaries: Vec<DictionaryBuilder>,
@@ -67,6 +68,7 @@ impl Default for ColumnarWriter {
bool_field_hash_map: ArenaHashMap::new(10_000),
ip_addr_field_hash_map: ArenaHashMap::new(10_000),
bytes_field_hash_map: ArenaHashMap::new(10_000),
str_field_hash_map: ArenaHashMap::new(10_000),
dictionaries: Vec::new(),
arena: MemoryArena::default(),
buffers: SpareBuffers::default(),
@@ -134,18 +136,18 @@ impl ColumnarWriter {
"key may not contain the 0 byte"
);
let (hash_map, arena, dictionaries) = (
&mut self.bytes_field_hash_map,
&mut self.str_field_hash_map,
&mut self.arena,
&mut self.dictionaries,
);
hash_map.mutate_or_create(
column_name.as_bytes(),
|column_opt: Option<StrColumnWriter>| {
let mut column: StrColumnWriter = column_opt.unwrap_or_else(|| {
|column_opt: Option<StrOrBytesColumnWriter>| {
let mut column: StrOrBytesColumnWriter = column_opt.unwrap_or_else(|| {
// Each column has its own dictionary
let dictionary_id = dictionaries.len() as u32;
dictionaries.push(DictionaryBuilder::default());
StrColumnWriter::with_dictionary_id(dictionary_id)
StrOrBytesColumnWriter::with_dictionary_id(dictionary_id)
});
column.record_bytes(doc, value.as_bytes(), dictionaries, arena);
column
@@ -153,6 +155,30 @@ impl ColumnarWriter {
);
}
/// Records the bytes `value` for row `doc` in the bytes column `column_name`.
///
/// The column writer is created on first use, together with a dictionary of its own.
///
/// # Panics
/// Panics if `column_name` contains a `0` byte.
pub fn record_bytes(&mut self, doc: RowId, column_name: &str, value: &[u8]) {
    let column_key: &[u8] = column_name.as_bytes();
    assert!(
        !column_key.contains(&0u8),
        "key may not contain the 0 byte"
    );
    // Borrow the fields one by one, so the closure can mutate the dictionaries
    // and the arena while the hash map itself is mutably borrowed.
    let dictionaries = &mut self.dictionaries;
    let arena = &mut self.arena;
    self.bytes_field_hash_map.mutate_or_create(
        column_key,
        |existing_writer: Option<StrOrBytesColumnWriter>| {
            let mut writer: StrOrBytesColumnWriter = match existing_writer {
                Some(writer) => writer,
                None => {
                    // Each column has its own dictionary
                    let new_dictionary_id = dictionaries.len() as u32;
                    dictionaries.push(DictionaryBuilder::default());
                    StrOrBytesColumnWriter::with_dictionary_id(new_dictionary_id)
                }
            };
            writer.record_bytes(doc, value, dictionaries, arena);
            writer
        },
    );
}
pub fn serialize(&mut self, num_docs: RowId, wrt: &mut dyn io::Write) -> io::Result<()> {
let mut serializer = ColumnarSerializer::new(wrt);
let mut field_columns: Vec<(&[u8], ColumnTypeCategory, Addr)> = self
@@ -162,6 +188,11 @@ impl ColumnarWriter {
.collect();
field_columns.extend(
self.bytes_field_hash_map
.iter()
.map(|(term, addr, _)| (term, ColumnTypeCategory::Bytes, addr)),
);
field_columns.extend(
self.str_field_hash_map
.iter()
.map(|(term, addr, _)| (term, ColumnTypeCategory::Str, addr)),
);
@@ -180,8 +211,8 @@ impl ColumnarWriter {
let (arena, buffers, dictionaries) = (&self.arena, &mut self.buffers, &self.dictionaries);
let mut symbol_byte_buffer: Vec<u8> = Vec::new();
for (column_name, bytes_or_numerical, addr) in field_columns {
match bytes_or_numerical {
for (column_name, column_type, addr) in field_columns {
match column_type {
ColumnTypeCategory::Bool => {
let column_writer: ColumnWriter = self.bool_field_hash_map.read(addr);
let cardinality = column_writer.get_cardinality(num_docs);
@@ -208,14 +239,19 @@ impl ColumnarWriter {
&mut column_serializer,
)?;
}
ColumnTypeCategory::Str => {
let str_column_writer: StrColumnWriter = self.bytes_field_hash_map.read(addr);
ColumnTypeCategory::Bytes | ColumnTypeCategory::Str => {
let (column_type, str_column_writer): (ColumnType, StrOrBytesColumnWriter) =
if column_type == ColumnTypeCategory::Bytes {
(ColumnType::Bytes, self.bytes_field_hash_map.read(addr))
} else {
(ColumnType::Str, self.str_field_hash_map.read(addr))
};
let dictionary_builder =
&dictionaries[str_column_writer.dictionary_id as usize];
let cardinality = str_column_writer.column_writer.get_cardinality(num_docs);
let mut column_serializer =
serializer.serialize_column(column_name, ColumnType::Bytes);
serialize_bytes_column(
serializer.serialize_column(column_name, column_type);
serialize_bytes_or_str_column(
cardinality,
num_docs,
dictionary_builder,
@@ -247,7 +283,7 @@ impl ColumnarWriter {
}
}
fn serialize_bytes_column(
fn serialize_bytes_or_str_column(
cardinality: Cardinality,
num_docs: RowId,
dictionary_builder: &DictionaryBuilder,

View File

@@ -5,6 +5,7 @@ use common::CountingWriter;
use sstable::value::RangeValueWriter;
use sstable::RangeSSTable;
use crate::column;
use crate::columnar::ColumnType;
pub struct ColumnarSerializer<W: io::Write> {

View File

@@ -4,7 +4,7 @@ use std::net::Ipv6Addr;
use common::file_slice::FileSlice;
use common::{HasLen, OwnedBytes};
use crate::column::{BytesColumn, Column};
use crate::column::{BytesColumn, Column, StrColumn};
use crate::columnar::ColumnType;
use crate::DateTime;
@@ -16,7 +16,8 @@ pub enum DynamicColumn {
F64(Column<f64>),
IpAddr(Column<Ipv6Addr>),
DateTime(Column<DateTime>),
Str(BytesColumn),
Bytes(BytesColumn),
Str(StrColumn),
}
impl From<Column<i64>> for DynamicColumn {
@@ -45,6 +46,12 @@ impl From<Column<bool>> for DynamicColumn {
impl From<BytesColumn> for DynamicColumn {
fn from(dictionary_encoded_col: BytesColumn) -> Self {
DynamicColumn::Bytes(dictionary_encoded_col)
}
}
impl From<StrColumn> for DynamicColumn {
fn from(dictionary_encoded_col: StrColumn) -> Self {
DynamicColumn::Str(dictionary_encoded_col)
}
}
@@ -74,7 +81,10 @@ impl DynamicColumnHandle {
fn open_internal(&self, column_bytes: OwnedBytes) -> io::Result<DynamicColumn> {
let dynamic_column: DynamicColumn = match self.column_type {
ColumnType::Bytes => crate::column::open_column_bytes(column_bytes)?.into(),
ColumnType::Bytes => {
crate::column::open_column_bytes::<BytesColumn>(column_bytes)?.into()
}
ColumnType::Str => crate::column::open_column_bytes::<StrColumn>(column_bytes)?.into(),
ColumnType::Numerical(numerical_type) => match numerical_type {
crate::NumericalType::I64 => {
crate::column::open_column_u64::<i64>(column_bytes)?.into()

View File

@@ -4,13 +4,27 @@ use crate::column_values::MonotonicallyMappableToU128;
use crate::columnar::ColumnType;
use crate::dynamic_column::{DynamicColumn, DynamicColumnHandle};
use crate::value::NumericalValue;
use crate::{Cardinality, ColumnarReader, ColumnarWriter};
use crate::{Cardinality, ColumnarReader, ColumnarWriter, RowId};
#[test]
fn test_dataframe_writer_str() {
    // Two values recorded in one str column, serialized over 5 rows.
    let mut writer = ColumnarWriter::default();
    for (row_id, value) in [(1u32, "hello"), (3u32, "helloeee")] {
        writer.record_str(row_id, "my_string", value);
    }
    let mut serialized: Vec<u8> = Vec::new();
    writer.serialize(5, &mut serialized).unwrap();

    let reader = ColumnarReader::open(serialized).unwrap();
    assert_eq!(reader.num_columns(), 1);
    let handles: Vec<DynamicColumnHandle> = reader.read_columns("my_string").unwrap();
    assert_eq!(handles.len(), 1);
    assert_eq!(handles[0].num_bytes(), 165);
}
#[test]
fn test_dataframe_writer_bytes() {
let mut dataframe_writer = ColumnarWriter::default();
dataframe_writer.record_str(1u32, "my_string", "hello");
dataframe_writer.record_str(3u32, "my_string", "helloeee");
dataframe_writer.record_bytes(1u32, "my_string", b"hello");
dataframe_writer.record_bytes(3u32, "my_string", b"helloeee");
let mut buffer: Vec<u8> = Vec::new();
dataframe_writer.serialize(5, &mut buffer).unwrap();
let columnar = ColumnarReader::open(buffer).unwrap();
@@ -98,7 +112,7 @@ fn test_dataframe_writer_numerical() {
}
#[test]
fn test_dictionary_encoded() {
fn test_dictionary_encoded_str() {
let mut buffer = Vec::new();
let mut columnar_writer = ColumnarWriter::default();
columnar_writer.record_str(1, "my.column", "a");
@@ -111,18 +125,65 @@ fn test_dictionary_encoded() {
let col_handles = columnar_reader.read_columns("my.column").unwrap();
assert_eq!(col_handles.len(), 1);
let DynamicColumn::Str(str_col) = col_handles[0].open().unwrap() else { panic!(); };
let index: Vec<Option<u64>> = (0..5)
.map(|row_id| str_col.term_ords().first(row_id))
.collect();
assert_eq!(index, &[None, Some(0), None, Some(2), Some(1)]);
assert_eq!(str_col.num_rows(), 5);
let mut term_buffer = Vec::new();
let term_ords = str_col.term_ords();
let mut term_buffer = String::new();
let term_ords = str_col.ordinal_dictionary();
assert_eq!(term_ords.first(0), None);
assert_eq!(term_ords.first(1), Some(0));
str_col.dictionary.ord_to_term(0u64, &mut term_buffer).unwrap();
str_col.ord_to_str(0u64, &mut term_buffer).unwrap();
assert_eq!(term_buffer, "a");
assert_eq!(term_ords.first(2), None);
assert_eq!(term_ords.first(3), Some(2));
str_col.ord_to_str(2u64, &mut term_buffer).unwrap();
assert_eq!(term_buffer, "c");
assert_eq!(term_ords.first(4), Some(1));
str_col.ord_to_str(1u64, &mut term_buffer).unwrap();
assert_eq!(term_buffer, "b");
}
#[test]
fn test_dictionary_encoded_bytes() {
let mut buffer = Vec::new();
let mut columnar_writer = ColumnarWriter::default();
columnar_writer.record_bytes(1, "my.column", b"a");
columnar_writer.record_bytes(3, "my.column", b"c");
columnar_writer.record_bytes(3, "my.column2", b"different_column!");
columnar_writer.record_bytes(4, "my.column", b"b");
columnar_writer.serialize(5, &mut buffer).unwrap();
let columnar_reader = ColumnarReader::open(buffer).unwrap();
assert_eq!(columnar_reader.num_columns(), 2);
let col_handles = columnar_reader.read_columns("my.column").unwrap();
assert_eq!(col_handles.len(), 1);
let DynamicColumn::Bytes(bytes_col) = col_handles[0].open().unwrap() else { panic!(); };
let index: Vec<Option<u64>> = (0..5)
.map(|row_id| bytes_col.term_ords().first(row_id))
.collect();
assert_eq!(index, &[None, Some(0), None, Some(2), Some(1)]);
assert_eq!(bytes_col.num_rows(), 5);
let mut term_buffer = Vec::new();
let term_ords = bytes_col.term_ords();
assert_eq!(term_ords.first(0), None);
assert_eq!(term_ords.first(1), Some(0));
bytes_col
.dictionary
.ord_to_term(0u64, &mut term_buffer)
.unwrap();
assert_eq!(term_buffer, b"a");
assert_eq!(term_ords.first(2), None);
assert_eq!(term_ords.first(3), Some(2));
str_col.dictionary.ord_to_term(2u64, &mut term_buffer).unwrap();
bytes_col
.dictionary
.ord_to_term(2u64, &mut term_buffer)
.unwrap();
assert_eq!(term_buffer, b"c");
assert_eq!(term_ords.first(4), Some(1));
str_col.dictionary.ord_to_term(1u64, &mut term_buffer).unwrap();
bytes_col
.dictionary
.ord_to_term(1u64, &mut term_buffer)
.unwrap();
assert_eq!(term_buffer, b"b");
}