diff --git a/Cargo.toml b/Cargo.toml index 731045b9c..ce9f528d6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -106,7 +106,7 @@ unstable = [] # useful for benches. quickwit = ["sstable"] [workspace] -members = ["query-grammar", "bitpacker", "common", "fastfield_codecs", "ownedbytes", "stacker", "sstable"] +members = ["query-grammar", "bitpacker", "common", "fastfield_codecs", "ownedbytes", "stacker", "sstable", "columnar"] # Following the "fail" crate best practises, we isolate # tests that define specific behavior in fail check points diff --git a/columnar/Cargo.toml b/columnar/Cargo.toml new file mode 100644 index 000000000..b67c4cbfd --- /dev/null +++ b/columnar/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "tantivy-columnar" +version = "0.1.0" +edition = "2021" +license = "MIT" + +[dependencies] +stacker = { path = "../stacker", package="tantivy-stacker"} +serde_json = "1" +thiserror = "1" +fnv = "1" +sstable = { path = "../sstable", package = "tantivy-sstable" } +common = { path = "../common", package = "tantivy-common" } +fastfield_codecs = { path = "../fastfield_codecs"} +itertools = "0.10" + +[dev-dependencies] +proptest = "1" diff --git a/columnar/README.md b/columnar/README.md new file mode 100644 index 000000000..2c875415e --- /dev/null +++ b/columnar/README.md @@ -0,0 +1,67 @@ +# Columnar format + +This crate describes columnar format used in tantivy. + +## Goals + +This format is special in the following way. +- it needs to be compact +- it does not required to be loaded in memory. +- it is designed to fit well with quickwit's strange constraint: +we need to be able to load columns rapidly. +- columns of several types can be associated with the same column name. +- it needs to support columns with different types `(str, u64, i64, f64)` +and different cardinality `(required, optional, multivalued)`. +- columns, once loaded, offer cheap random access. + +# Coercion rules + +Users can create a columnar by inserting rows to a `ColumnarWriter`, +and serializing it into a `Write` object. +Nothing prevents a user from recording values with different type to the same `column_name`. + +In that case, `tantivy-columnar`'s behavior is as follows: +- JsonValues are grouped into 3 types (String, Number, bool). +Values that corresponds to different groups are mapped to different columns. For instance, String values are treated independently +from Number or boolean values. `tantivy-columnar` will simply emit several columns associated to a given column_name. +- Only one column for a given json value type is emitted. If number values with different number types are recorded (e.g. u64, i64, f64), +`tantivy-columnar` will pick the first type that can represents the set of appended value, with the following prioriy order (`i64`, `u64`, `f64`). +`i64` is picked over `u64` as it is likely to yield less change of types. Most use cases strictly requiring `u64` show the +restriction on 50% of the values (e.g. a 64-bit hash). On the other hand, a lot of use cases can show rare negative value. + +# Columnar format + +This columnar format may have more than one column (with different types) associated to the same `column_name` (see [Coercion rules](#coercion-rules) above). +The `(column_name, columne_type)` couple however uniquely identifies a column. +That couple is serialized as a column `column_key`. The format of that key is: +`[column_name][ZERO_BYTE][column_type_header: u8]` + +``` +COLUMNAR:= + [COLUMNAR_DATA] + [COLUMNAR_KEY_TO_DATA_INDEX] + [COLUMNAR_FOOTER]; + + +# Columns are sorted by their column key. +COLUMNAR_DATA:= + [COLUMN_DATA]+; + +COLUMNAR_FOOTER := [RANGE_SSTABLE_BYTES_LEN: 8 bytes little endian] + +``` + +The columnar file starts by the actual column data, concatenated one after the other, +sorted by column key. + +A sstable associates +`(column name, column_cardinality, column_type) to range of bytes. + +Column name may not contain the zero byte `\0`. + +Listing all columns associated to `column_name` can therefore +be done by listing all keys prefixed by +`[column_name][ZERO_BYTE]` + +The associated range of bytes refer to a range of bytes + diff --git a/columnar/src/column_type_header.rs b/columnar/src/column_type_header.rs new file mode 100644 index 000000000..87add9204 --- /dev/null +++ b/columnar/src/column_type_header.rs @@ -0,0 +1,201 @@ +use crate::utils::{place_bits, select_bits}; +use crate::value::NumericalType; +use crate::InvalidData; + +/// Enum describing the number of values that can exist per document +/// (or per row if you will). +/// +/// The cardinality must fit on 2 bits. +#[derive(Clone, Copy, Hash, Default, Debug, PartialEq, Eq, PartialOrd, Ord)] +#[repr(u8)] +pub enum Cardinality { + /// All documents contain exactly one value. + /// Required is the default for auto-detecting the Cardinality, since it is the most strict. + #[default] + Required = 0, + /// All documents contain at most one value. + Optional = 1, + /// All documents may contain any number of values. + Multivalued = 2, +} + +impl Cardinality { + pub(crate) fn to_code(self) -> u8 { + self as u8 + } + + pub(crate) fn try_from_code(code: u8) -> Result { + match code { + 0 => Ok(Cardinality::Required), + 1 => Ok(Cardinality::Optional), + 2 => Ok(Cardinality::Multivalued), + _ => Err(InvalidData), + } + } +} + +/// The column type represents the column type and can fit on 6-bits. +/// +/// - bits[0..3]: Column category type. +/// - bits[3..6]: Numerical type if necessary. +#[derive(Hash, Eq, PartialEq, Debug, Clone, Copy)] +pub enum ColumnType { + Bytes, + Numerical(NumericalType), + Bool, +} + +impl ColumnType { + /// Encoded over 6 bits. + pub(crate) fn to_code(self) -> u8 { + let column_type_category; + let numerical_type_code: u8; + match self { + ColumnType::Bytes => { + column_type_category = ColumnTypeCategory::Str; + numerical_type_code = 0u8; + } + ColumnType::Numerical(numerical_type) => { + column_type_category = ColumnTypeCategory::Numerical; + numerical_type_code = numerical_type.to_code(); + } + ColumnType::Bool => { + column_type_category = ColumnTypeCategory::Bool; + numerical_type_code = 0u8; + } + } + place_bits::<0, 3>(column_type_category.to_code()) | place_bits::<3, 6>(numerical_type_code) + } + + pub(crate) fn try_from_code(code: u8) -> Result { + if select_bits::<6, 8>(code) != 0u8 { + return Err(InvalidData); + } + let column_type_category_code = select_bits::<0, 3>(code); + let numerical_type_code = select_bits::<3, 6>(code); + let column_type_category = ColumnTypeCategory::try_from_code(column_type_category_code)?; + match column_type_category { + ColumnTypeCategory::Bool => { + if numerical_type_code != 0u8 { + return Err(InvalidData); + } + Ok(ColumnType::Bool) + } + ColumnTypeCategory::Str => { + if numerical_type_code != 0u8 { + return Err(InvalidData); + } + Ok(ColumnType::Bytes) + } + ColumnTypeCategory::Numerical => { + let numerical_type = NumericalType::try_from_code(numerical_type_code)?; + Ok(ColumnType::Numerical(numerical_type)) + } + } + } +} + +/// Column types are grouped into different categories that +/// corresponds to the different types of `JsonValue` types. +/// +/// The columnar writer will apply coercion rules to make sure that +/// at most one column exist per `ColumnTypeCategory`. +/// +/// See also [README.md]. +#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Debug)] +#[repr(u8)] +pub(crate) enum ColumnTypeCategory { + Bool = 0u8, + Str = 1u8, + Numerical = 2u8, +} + +impl ColumnTypeCategory { + pub fn to_code(self) -> u8 { + self as u8 + } + + pub fn try_from_code(code: u8) -> Result { + match code { + 0u8 => Ok(Self::Bool), + 1u8 => Ok(Self::Str), + 2u8 => Ok(Self::Numerical), + _ => Err(InvalidData), + } + } +} + +/// Represents the type and cardinality of a column. +/// This is encoded over one-byte and added to a column key in the +/// columnar sstable. +/// +/// - [0..6] bits: encodes the column type +/// - [6..8] bits: encodes the cardinality +#[derive(Eq, Hash, PartialEq, Debug, Copy, Clone)] +pub struct ColumnTypeAndCardinality { + pub typ: ColumnType, + pub cardinality: Cardinality, +} + +impl ColumnTypeAndCardinality { + pub fn to_code(self) -> u8 { + place_bits::<0, 6>(self.typ.to_code()) | place_bits::<6, 8>(self.cardinality.to_code()) + } + + pub fn try_from_code(code: u8) -> Result { + let typ_code = select_bits::<0, 6>(code); + let cardinality_code = select_bits::<6, 8>(code); + let cardinality = Cardinality::try_from_code(cardinality_code)?; + let typ = ColumnType::try_from_code(typ_code)?; + assert_eq!(typ.to_code(), typ_code); + Ok(ColumnTypeAndCardinality { cardinality, typ }) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + + use super::ColumnTypeAndCardinality; + use crate::column_type_header::{Cardinality, ColumnType}; + + #[test] + fn test_column_type_header_to_code() { + let mut column_type_header_set: HashSet = HashSet::new(); + for code in u8::MIN..=u8::MAX { + if let Ok(column_type_header) = ColumnTypeAndCardinality::try_from_code(code) { + assert_eq!(column_type_header.to_code(), code); + assert!(column_type_header_set.insert(column_type_header)); + } + } + assert_eq!( + column_type_header_set.len(), + 3 /* cardinality */ * + (1 + 1 + 3) // column_types (str, bool, numerical x 3) + ); + } + + #[test] + fn test_column_type_to_code() { + let mut column_type_set: HashSet = HashSet::new(); + for code in u8::MIN..=u8::MAX { + if let Ok(column_type) = ColumnType::try_from_code(code) { + assert_eq!(column_type.to_code(), code); + assert!(column_type_set.insert(column_type)); + } + } + assert_eq!(column_type_set.len(), 2 + 3); + } + + #[test] + fn test_cardinality_to_code() { + let mut num_cardinality = 0; + for code in u8::MIN..=u8::MAX { + if let Ok(cardinality) = Cardinality::try_from_code(code) { + assert_eq!(cardinality.to_code(), code); + num_cardinality += 1; + } + } + assert_eq!(num_cardinality, 3); + } +} diff --git a/columnar/src/dictionary.rs b/columnar/src/dictionary.rs new file mode 100644 index 000000000..82ccb91df --- /dev/null +++ b/columnar/src/dictionary.rs @@ -0,0 +1,84 @@ +use std::io; + +use fnv::FnvHashMap; +use sstable::SSTable; + +pub(crate) struct TermIdMapping { + unordered_to_ord: Vec, +} + +impl TermIdMapping { + pub fn to_ord(&self, unordered: UnorderedId) -> OrderedId { + self.unordered_to_ord[unordered.0 as usize] + } +} + +/// When we add values, we cannot know their ordered id yet. +/// For this reason, we temporarily assign them a `UnorderedId` +/// that will be mapped to an `OrderedId` upon serialization. +#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)] +pub struct UnorderedId(pub u32); + +#[derive(Clone, Copy, Hash, PartialEq, Eq, Debug)] +pub struct OrderedId(pub u32); + +/// `DictionaryBuilder` for dictionary encoding. +/// +/// It stores the different terms encounterred and assigns them a temporary value +/// we call unordered id. +/// +/// Upon serialization, we will sort the ids and hence build a `UnorderedId -> Term ordinal` +/// mapping. +#[derive(Default)] +pub(crate) struct DictionaryBuilder { + dict: FnvHashMap, UnorderedId>, +} + +impl DictionaryBuilder { + /// Get or allocate an unordered id. + /// (This ID is simply an auto-incremented id.) + pub fn get_or_allocate_id(&mut self, term: &[u8]) -> UnorderedId { + if let Some(term_id) = self.dict.get(term) { + return *term_id; + } + let new_id = UnorderedId(self.dict.len() as u32); + self.dict.insert(term.to_vec(), new_id); + new_id + } + + /// Serialize the dictionary into an fst, and returns the + /// `UnorderedId -> TermOrdinal` map. + pub fn serialize<'a, W: io::Write + 'a>(&self, wrt: &mut W) -> io::Result { + let mut terms: Vec<(&[u8], UnorderedId)> = + self.dict.iter().map(|(k, v)| (k.as_slice(), *v)).collect(); + terms.sort_unstable_by_key(|(key, _)| *key); + // TODO Remove the allocation. + let mut unordered_to_ord: Vec = vec![OrderedId(0u32); terms.len()]; + let mut sstable_builder = sstable::VoidSSTable::writer(wrt); + for (ord, (key, unordered_id)) in terms.into_iter().enumerate() { + let ordered_id = OrderedId(ord as u32); + sstable_builder.insert(key, &())?; + unordered_to_ord[unordered_id.0 as usize] = ordered_id; + } + sstable_builder.finish()?; + Ok(TermIdMapping { unordered_to_ord }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_dictionary_builder() { + let mut dictionary_builder = DictionaryBuilder::default(); + let hello_uid = dictionary_builder.get_or_allocate_id(b"hello"); + let happy_uid = dictionary_builder.get_or_allocate_id(b"happy"); + let tax_uid = dictionary_builder.get_or_allocate_id(b"tax"); + let mut buffer = Vec::new(); + let id_mapping = dictionary_builder.serialize(&mut buffer).unwrap(); + assert_eq!(id_mapping.to_ord(hello_uid), OrderedId(1)); + assert_eq!(id_mapping.to_ord(happy_uid), OrderedId(0)); + assert_eq!(id_mapping.to_ord(tax_uid), OrderedId(2)); + } +} diff --git a/columnar/src/lib.rs b/columnar/src/lib.rs new file mode 100644 index 000000000..0e28de4da --- /dev/null +++ b/columnar/src/lib.rs @@ -0,0 +1,89 @@ +mod column_type_header; +mod dictionary; +mod reader; +pub(crate) mod utils; +mod value; +mod writer; + +pub use column_type_header::Cardinality; +pub use reader::ColumnarReader; +pub use value::{NumericalType, NumericalValue}; +pub use writer::ColumnarWriter; + +pub type DocId = u32; + +#[derive(Copy, Clone, Debug)] +pub struct InvalidData; + +#[cfg(test)] +mod tests { + use std::ops::Range; + + use common::file_slice::FileSlice; + + use crate::column_type_header::{ColumnType, ColumnTypeAndCardinality}; + use crate::reader::ColumnarReader; + use crate::value::NumericalValue; + use crate::{Cardinality, ColumnarWriter}; + + #[test] + fn test_dataframe_writer_bytes() { + let mut dataframe_writer = ColumnarWriter::default(); + dataframe_writer.record_str(1u32, "my_string", "hello"); + dataframe_writer.record_str(3u32, "my_string", "helloeee"); + let mut buffer: Vec = Vec::new(); + dataframe_writer.serialize(5, &mut buffer).unwrap(); + let columnar_fileslice = FileSlice::from(buffer); + let columnar = ColumnarReader::open(columnar_fileslice).unwrap(); + assert_eq!(columnar.num_columns(), 1); + let cols: Vec<(ColumnTypeAndCardinality, Range)> = + columnar.read_columns("my_string").unwrap(); + assert_eq!(cols.len(), 1); + assert_eq!(cols[0].1, 0..158); + } + + #[test] + fn test_dataframe_writer_bool() { + let mut dataframe_writer = ColumnarWriter::default(); + dataframe_writer.record_bool(1u32, "bool.value", false); + let mut buffer: Vec = Vec::new(); + dataframe_writer.serialize(5, &mut buffer).unwrap(); + let columnar_fileslice = FileSlice::from(buffer); + let columnar = ColumnarReader::open(columnar_fileslice).unwrap(); + assert_eq!(columnar.num_columns(), 1); + let cols: Vec<(ColumnTypeAndCardinality, Range)> = + columnar.read_columns("bool.value").unwrap(); + assert_eq!(cols.len(), 1); + assert_eq!( + cols[0].0, + ColumnTypeAndCardinality { + cardinality: Cardinality::Optional, + typ: ColumnType::Bool + } + ); + assert_eq!(cols[0].1, 0..21); + } + + #[test] + fn test_dataframe_writer_numerical() { + let mut dataframe_writer = ColumnarWriter::default(); + dataframe_writer.record_numerical(1u32, "srical.value", NumericalValue::U64(12u64)); + dataframe_writer.record_numerical(2u32, "srical.value", NumericalValue::U64(13u64)); + dataframe_writer.record_numerical(4u32, "srical.value", NumericalValue::U64(15u64)); + let mut buffer: Vec = Vec::new(); + dataframe_writer.serialize(5, &mut buffer).unwrap(); + let columnar_fileslice = FileSlice::from(buffer); + let columnar = ColumnarReader::open(columnar_fileslice).unwrap(); + assert_eq!(columnar.num_columns(), 1); + let cols: Vec<(ColumnTypeAndCardinality, Range)> = + columnar.read_columns("srical.value").unwrap(); + assert_eq!(cols.len(), 1); + // Right now this 31 bytes are spent as follows + // + // - header 14 bytes + // - vals 8 //< due to padding? could have been 1byte?. + // - null footer 6 bytes + // - version footer 3 bytes // Should be file-wide + assert_eq!(cols[0].1, 0..31); + } +} diff --git a/columnar/src/reader/mod.rs b/columnar/src/reader/mod.rs new file mode 100644 index 000000000..586b13507 --- /dev/null +++ b/columnar/src/reader/mod.rs @@ -0,0 +1,110 @@ +use std::ops::Range; +use std::{io, mem}; + +use common::file_slice::FileSlice; +use common::BinarySerializable; +use sstable::{Dictionary, RangeSSTable}; + +use crate::column_type_header::ColumnTypeAndCardinality; + +fn io_invalid_data(msg: String) -> io::Error { + io::Error::new(io::ErrorKind::InvalidData, msg) +} + +/// The ColumnarReader makes it possible to access a set of columns +/// associated to field names. +pub struct ColumnarReader { + column_dictionary: Dictionary, + column_data: FileSlice, +} + +impl ColumnarReader { + /// Opens a new Columnar file. + pub fn open(file_slice: F) -> io::Result + where FileSlice: From { + Self::open_inner(file_slice.into()) + } + + fn open_inner(file_slice: FileSlice) -> io::Result { + let (file_slice_without_sstable_len, sstable_len_bytes) = + file_slice.split_from_end(mem::size_of::()); + let mut sstable_len_bytes = sstable_len_bytes.read_bytes()?; + let sstable_len = u64::deserialize(&mut sstable_len_bytes)?; + let (column_data, sstable) = + file_slice_without_sstable_len.split_from_end(sstable_len as usize); + let column_dictionary = Dictionary::open(sstable)?; + Ok(ColumnarReader { + column_dictionary, + column_data, + }) + } + + // TODO fix ugly API + pub fn list_columns( + &self, + ) -> io::Result, u64)>> { + let mut stream = self.column_dictionary.stream()?; + let mut results = Vec::new(); + while stream.advance() { + let key_bytes: &[u8] = stream.key(); + let column_code: u8 = key_bytes.last().cloned().unwrap(); + let column_type_and_cardinality = ColumnTypeAndCardinality::try_from_code(column_code) + .map_err(|_| io_invalid_data(format!("Unknown column code `{column_code}`")))?; + let range = stream.value().clone(); + let column_name = String::from_utf8_lossy(&key_bytes[..key_bytes.len() - 1]); + let range_len = range.end - range.start; + results.push(( + column_name.to_string(), + column_type_and_cardinality, + range, + range_len, + )); + } + Ok(results) + } + + /// Get all columns for the given column name. + /// + /// There can be more than one column associated to a given column name, provided they have + /// different types. + // TODO fix ugly API + pub fn read_columns( + &self, + column_name: &str, + ) -> io::Result)>> { + // Each column is a associated to a given `column_key`, + // that starts by `column_name\0column_header`. + // + // Listing the columns associated to the given column name is therefore equivalent to + // listing `column_key` with the prefix `column_name\0`. + // + // This is in turn equivalent to searching for the range + // `[column_name,\0`..column_name\1)`. + let mut start_key = column_name.to_string(); + start_key.push('\0'); + let mut end_key = column_name.to_string(); + end_key.push(1u8 as char); + let mut stream = self + .column_dictionary + .range() + .ge(start_key.as_bytes()) + .lt(end_key.as_bytes()) + .into_stream()?; + let mut results = Vec::new(); + while stream.advance() { + let key_bytes: &[u8] = stream.key(); + assert!(key_bytes.starts_with(start_key.as_bytes())); + let column_code: u8 = key_bytes.last().cloned().unwrap(); + let column_type_and_cardinality = ColumnTypeAndCardinality::try_from_code(column_code) + .map_err(|_| io_invalid_data(format!("Unknown column code `{column_code}`")))?; + let range = stream.value().clone(); + results.push((column_type_and_cardinality, range)); + } + Ok(results) + } + + /// Return the number of columns in the columnar. + pub fn num_columns(&self) -> usize { + self.column_dictionary.num_terms() + } +} diff --git a/columnar/src/utils.rs b/columnar/src/utils.rs new file mode 100644 index 000000000..50ada1001 --- /dev/null +++ b/columnar/src/utils.rs @@ -0,0 +1,76 @@ +const fn compute_mask(num_bits: u8) -> u8 { + if num_bits == 8 { + u8::MAX + } else { + (1u8 << num_bits) - 1 + } +} + +#[inline(always)] +#[must_use] +pub(crate) fn select_bits(code: u8) -> u8 { + assert!(START <= END); + assert!(END <= 8); + let num_bits: u8 = END - START; + let mask: u8 = compute_mask(num_bits); + (code >> START) & mask +} + +#[inline(always)] +#[must_use] +pub(crate) fn place_bits(code: u8) -> u8 { + assert!(START <= END); + assert!(END <= 8); + let num_bits: u8 = END - START; + let mask: u8 = compute_mask(num_bits); + assert!(code <= mask); + code << START +} + +/// Pop-front one bytes from a slice of bytes. +#[inline(always)] +pub fn pop_first_byte(bytes: &mut &[u8]) -> Option { + if bytes.is_empty() { + return None; + } + let first_byte = bytes[0]; + *bytes = &bytes[1..]; + Some(first_byte) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_select_bits() { + assert_eq!(255u8, select_bits::<0, 8>(255u8)); + assert_eq!(0u8, select_bits::<0, 0>(255u8)); + assert_eq!(8u8, select_bits::<0, 4>(8u8)); + assert_eq!(4u8, select_bits::<1, 4>(8u8)); + assert_eq!(0u8, select_bits::<1, 3>(8u8)); + } + + #[test] + fn test_place_bits() { + assert_eq!(255u8, place_bits::<0, 8>(255u8)); + assert_eq!(4u8, place_bits::<2, 3>(1u8)); + assert_eq!(0u8, place_bits::<2, 2>(0u8)); + } + + #[test] + #[should_panic] + fn test_place_bits_overflows() { + let _ = place_bits::<1, 4>(8u8); + } + + #[test] + fn test_pop_first_byte() { + let mut cursor: &[u8] = &b"abcd"[..]; + assert_eq!(pop_first_byte(&mut cursor), Some(b'a')); + assert_eq!(pop_first_byte(&mut cursor), Some(b'b')); + assert_eq!(pop_first_byte(&mut cursor), Some(b'c')); + assert_eq!(pop_first_byte(&mut cursor), Some(b'd')); + assert_eq!(pop_first_byte(&mut cursor), None); + } +} diff --git a/columnar/src/value.rs b/columnar/src/value.rs new file mode 100644 index 000000000..258e80b18 --- /dev/null +++ b/columnar/src/value.rs @@ -0,0 +1,124 @@ +use crate::InvalidData; + +#[derive(Copy, Clone, Debug, PartialEq)] +pub enum NumericalValue { + I64(i64), + U64(u64), + F64(f64), +} + +impl From for NumericalValue { + fn from(val: u64) -> NumericalValue { + NumericalValue::U64(val) + } +} + +impl From for NumericalValue { + fn from(val: i64) -> Self { + NumericalValue::I64(val) + } +} + +impl From for NumericalValue { + fn from(val: f64) -> Self { + NumericalValue::F64(val) + } +} + +impl NumericalValue { + pub fn numerical_type(&self) -> NumericalType { + match self { + NumericalValue::F64(_) => NumericalType::F64, + NumericalValue::I64(_) => NumericalType::I64, + NumericalValue::U64(_) => NumericalType::U64, + } + } +} + +impl Eq for NumericalValue {} + +#[derive(Clone, Copy, Debug, Default, Hash, Eq, PartialEq)] +#[repr(u8)] +pub enum NumericalType { + #[default] + I64 = 0, + U64 = 1, + F64 = 2, +} + +impl NumericalType { + pub fn to_code(self) -> u8 { + self as u8 + } + + pub fn try_from_code(code: u8) -> Result { + match code { + 0 => Ok(NumericalType::I64), + 1 => Ok(NumericalType::U64), + 2 => Ok(NumericalType::F64), + _ => Err(InvalidData), + } + } +} + +/// We voluntarily avoid using `Into` here to keep this +/// implementation quirk as private as possible. +/// +/// # Panics +/// This coercion trait actually panics if it is used +/// to convert a loose types to a stricter type. +/// +/// The level is strictness is somewhat arbitrary. +/// - i64 +/// - u64 +/// - f64. +pub(crate) trait Coerce { + fn coerce(numerical_value: NumericalValue) -> Self; +} + +impl Coerce for i64 { + fn coerce(value: NumericalValue) -> Self { + match value { + NumericalValue::I64(val) => val, + NumericalValue::U64(val) => val as i64, + NumericalValue::F64(_) => unreachable!(), + } + } +} + +impl Coerce for u64 { + fn coerce(value: NumericalValue) -> Self { + match value { + NumericalValue::I64(val) => val as u64, + NumericalValue::U64(val) => val, + NumericalValue::F64(_) => unreachable!(), + } + } +} + +impl Coerce for f64 { + fn coerce(value: NumericalValue) -> Self { + match value { + NumericalValue::I64(val) => val as f64, + NumericalValue::U64(val) => val as f64, + NumericalValue::F64(val) => val, + } + } +} + +#[cfg(test)] +mod tests { + use super::NumericalType; + + #[test] + fn test_numerical_type_code() { + let mut num_numerical_type = 0; + for code in u8::MIN..=u8::MAX { + if let Ok(numerical_type) = NumericalType::try_from_code(code) { + assert_eq!(numerical_type.to_code(), code); + num_numerical_type += 1; + } + } + assert_eq!(num_numerical_type, 3); + } +} diff --git a/columnar/src/writer/column_operation.rs b/columnar/src/writer/column_operation.rs new file mode 100644 index 000000000..9e24d32e1 --- /dev/null +++ b/columnar/src/writer/column_operation.rs @@ -0,0 +1,346 @@ +use crate::dictionary::UnorderedId; +use crate::utils::{place_bits, pop_first_byte, select_bits}; +use crate::value::NumericalValue; +use crate::{DocId, InvalidData, NumericalType}; + +/// When we build a columnar dataframe, we first just group +/// all mutations per column, and appends them in append-only buffer +/// in the stacker. +/// +/// These ColumnOperation are therefore serialize/deserialized +/// in memory. +/// +/// We represents all of these operations as `ColumnOperation`. +#[derive(Eq, PartialEq, Debug, Clone, Copy)] +pub(super) enum ColumnOperation { + NewDoc(DocId), + Value(T), +} + +#[derive(Copy, Clone, Eq, PartialEq, Debug)] +struct ColumnOperationMetadata { + op_type: ColumnOperationType, + len: u8, +} + +impl ColumnOperationMetadata { + fn to_code(self) -> u8 { + place_bits::<0, 4>(self.len) | place_bits::<4, 8>(self.op_type.to_code()) + } + + fn try_from_code(code: u8) -> Result { + let len = select_bits::<0, 4>(code); + let typ_code = select_bits::<4, 8>(code); + let column_type = ColumnOperationType::try_from_code(typ_code)?; + Ok(ColumnOperationMetadata { + op_type: column_type, + len, + }) + } +} + +#[derive(Copy, Clone, Eq, PartialEq, Debug)] +#[repr(u8)] +enum ColumnOperationType { + NewDoc = 0u8, + AddValue = 1u8, +} + +impl ColumnOperationType { + pub fn to_code(self) -> u8 { + self as u8 + } + + pub fn try_from_code(code: u8) -> Result { + match code { + 0 => Ok(Self::NewDoc), + 1 => Ok(Self::AddValue), + _ => Err(InvalidData), + } + } +} + +impl ColumnOperation { + pub(super) fn serialize(self) -> impl AsRef<[u8]> { + let mut minibuf = MiniBuffer::default(); + let column_op_metadata = match self { + ColumnOperation::NewDoc(new_doc) => { + let symbol_len = new_doc.serialize(&mut minibuf.bytes[1..]); + ColumnOperationMetadata { + op_type: ColumnOperationType::NewDoc, + len: symbol_len, + } + } + ColumnOperation::Value(val) => { + let symbol_len = val.serialize(&mut minibuf.bytes[1..]); + ColumnOperationMetadata { + op_type: ColumnOperationType::AddValue, + len: symbol_len, + } + } + }; + minibuf.bytes[0] = column_op_metadata.to_code(); + // +1 for the metadata + minibuf.len = 1 + column_op_metadata.len; + minibuf + } + + /// Deserialize a colummn operation. + /// Returns None if the buffer is empty. + /// + /// Panics if the payload is invalid: + /// this deserialize method is meant to target in memory. + pub(super) fn deserialize(bytes: &mut &[u8]) -> Option { + let column_op_metadata_byte = pop_first_byte(bytes)?; + let column_op_metadata = ColumnOperationMetadata::try_from_code(column_op_metadata_byte) + .expect("Invalid op metadata byte"); + let symbol_bytes: &[u8]; + (symbol_bytes, *bytes) = bytes.split_at(column_op_metadata.len as usize); + match column_op_metadata.op_type { + ColumnOperationType::NewDoc => { + let new_doc = u32::deserialize(symbol_bytes); + Some(ColumnOperation::NewDoc(new_doc)) + } + ColumnOperationType::AddValue => { + let value = V::deserialize(symbol_bytes); + Some(ColumnOperation::Value(value)) + } + } + } +} + +impl From for ColumnOperation { + fn from(value: T) -> Self { + ColumnOperation::Value(value) + } +} + +// Serialization trait very local to the writer. +// As we write fast fields, we accumulate them in "in memory". +// In order to limit memory usage, and in order +// to benefit from the stacker, we do this by serialization our data +// as "Symbols". +#[allow(clippy::from_over_into)] +pub(super) trait SymbolValue: Clone + Copy { + // Serializes the symbol into the given buffer. + // Returns the number of bytes written into the buffer. + /// # Panics + /// May not exceed 9bytes + fn serialize(self, buffer: &mut [u8]) -> u8; + // Panics if invalid + fn deserialize(bytes: &[u8]) -> Self; +} + +impl SymbolValue for bool { + fn serialize(self, buffer: &mut [u8]) -> u8 { + buffer[0] = if self { 1u8 } else { 0u8 }; + 1u8 + } + + fn deserialize(bytes: &[u8]) -> Self { + bytes[0] == 1u8 + } +} + +#[derive(Default)] +struct MiniBuffer { + pub bytes: [u8; 10], + pub len: u8, +} + +impl AsRef<[u8]> for MiniBuffer { + fn as_ref(&self) -> &[u8] { + &self.bytes[..self.len as usize] + } +} + +impl SymbolValue for NumericalValue { + fn deserialize(mut bytes: &[u8]) -> Self { + let type_code = pop_first_byte(&mut bytes).unwrap(); + let symbol_type = NumericalType::try_from_code(type_code).unwrap(); + let mut octet: [u8; 8] = [0u8; 8]; + octet[..bytes.len()].copy_from_slice(bytes); + match symbol_type { + NumericalType::U64 => { + let val: u64 = u64::from_le_bytes(octet); + NumericalValue::U64(val) + } + NumericalType::I64 => { + let encoded: u64 = u64::from_le_bytes(octet); + let val: i64 = decode_zig_zag(encoded); + NumericalValue::I64(val) + } + NumericalType::F64 => { + debug_assert_eq!(bytes.len(), 8); + let val: f64 = f64::from_le_bytes(octet); + NumericalValue::F64(val) + } + } + } + + /// F64: Serialize with a fixed size of 9 bytes + /// U64: Serialize without leading zeroes + /// I64: ZigZag encoded and serialize without leading zeroes + fn serialize(self, output: &mut [u8]) -> u8 { + match self { + NumericalValue::F64(val) => { + output[0] = NumericalType::F64 as u8; + output[1..9].copy_from_slice(&val.to_le_bytes()); + 9u8 + } + NumericalValue::U64(val) => { + let len = compute_num_bytes_for_u64(val) as u8; + output[0] = NumericalType::U64 as u8; + output[1..9].copy_from_slice(&val.to_le_bytes()); + len + 1u8 + } + NumericalValue::I64(val) => { + let zig_zag_encoded = encode_zig_zag(val); + let len = compute_num_bytes_for_u64(zig_zag_encoded) as u8; + output[0] = NumericalType::I64 as u8; + output[1..9].copy_from_slice(&zig_zag_encoded.to_le_bytes()); + len + 1u8 + } + } + } +} + +impl SymbolValue for u32 { + fn serialize(self, output: &mut [u8]) -> u8 { + let len = compute_num_bytes_for_u64(self as u64); + output[0..4].copy_from_slice(&self.to_le_bytes()); + len as u8 + } + + fn deserialize(bytes: &[u8]) -> Self { + let mut quartet: [u8; 4] = [0u8; 4]; + quartet[..bytes.len()].copy_from_slice(bytes); + u32::from_le_bytes(quartet) + } +} + +impl SymbolValue for UnorderedId { + fn serialize(self, output: &mut [u8]) -> u8 { + self.0.serialize(output) + } + + fn deserialize(bytes: &[u8]) -> Self { + UnorderedId(u32::deserialize(bytes)) + } +} + +fn compute_num_bytes_for_u64(val: u64) -> usize { + let msb = (64u32 - val.leading_zeros()) as usize; + (msb + 7) / 8 +} + +fn encode_zig_zag(n: i64) -> u64 { + ((n << 1) ^ (n >> 63)) as u64 +} + +fn decode_zig_zag(n: u64) -> i64 { + ((n >> 1) as i64) ^ (-((n & 1) as i64)) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[track_caller] + fn test_zig_zag_aux(val: i64) { + let encoded = super::encode_zig_zag(val); + assert_eq!(decode_zig_zag(encoded), val); + if let Some(abs_val) = val.checked_abs() { + let abs_val = abs_val as u64; + assert!(encoded <= abs_val * 2); + } + } + + #[test] + fn test_zig_zag() { + assert_eq!(encode_zig_zag(0i64), 0u64); + assert_eq!(encode_zig_zag(-1i64), 1u64); + assert_eq!(encode_zig_zag(1i64), 2u64); + test_zig_zag_aux(0i64); + test_zig_zag_aux(i64::MIN); + test_zig_zag_aux(i64::MAX); + } + + use proptest::prelude::any; + use proptest::proptest; + + proptest! { + #[test] + fn test_proptest_zig_zag(val in any::()) { + test_zig_zag_aux(val); + } + } + + #[test] + fn test_column_op_metadata_byte_serialization() { + for len in 0..=15 { + for op_type in [ColumnOperationType::AddValue, ColumnOperationType::NewDoc] { + let column_op_metadata = ColumnOperationMetadata { op_type, len }; + let column_op_metadata_code = column_op_metadata.to_code(); + let serdeser_metadata = + ColumnOperationMetadata::try_from_code(column_op_metadata_code).unwrap(); + assert_eq!(column_op_metadata, serdeser_metadata); + } + } + } + + #[track_caller] + fn ser_deser_symbol(column_op: ColumnOperation) { + let buf = column_op.serialize(); + let mut buffer = buf.as_ref().to_vec(); + buffer.extend_from_slice(b"234234"); + let mut bytes = &buffer[..]; + let serdeser_symbol = ColumnOperation::deserialize(&mut bytes).unwrap(); + assert_eq!(bytes.len() + buf.as_ref().len() as usize, buffer.len()); + assert_eq!(column_op, serdeser_symbol); + } + + #[test] + fn test_compute_num_bytes_for_u64() { + assert_eq!(compute_num_bytes_for_u64(0), 0); + assert_eq!(compute_num_bytes_for_u64(1), 1); + assert_eq!(compute_num_bytes_for_u64(255), 1); + assert_eq!(compute_num_bytes_for_u64(256), 2); + assert_eq!(compute_num_bytes_for_u64((1 << 16) - 1), 2); + assert_eq!(compute_num_bytes_for_u64(1 << 16), 3); + } + + #[test] + fn test_symbol_serialization() { + ser_deser_symbol(ColumnOperation::NewDoc(0)); + ser_deser_symbol(ColumnOperation::NewDoc(3)); + ser_deser_symbol(ColumnOperation::Value(NumericalValue::I64(0i64))); + ser_deser_symbol(ColumnOperation::Value(NumericalValue::I64(1i64))); + ser_deser_symbol(ColumnOperation::Value(NumericalValue::U64(257u64))); + ser_deser_symbol(ColumnOperation::Value(NumericalValue::I64(-257i64))); + ser_deser_symbol(ColumnOperation::Value(NumericalValue::I64(i64::MIN))); + ser_deser_symbol(ColumnOperation::Value(NumericalValue::U64(0u64))); + ser_deser_symbol(ColumnOperation::Value(NumericalValue::U64(u64::MIN))); + ser_deser_symbol(ColumnOperation::Value(NumericalValue::U64(u64::MAX))); + } + + fn test_column_operation_unordered_aux(val: u32, expected_len: usize) { + let column_op = ColumnOperation::Value(UnorderedId(val)); + let minibuf = column_op.serialize(); + assert_eq!(minibuf.as_ref().len() as usize, expected_len); + let mut buf = minibuf.as_ref().to_vec(); + buf.extend_from_slice(&[2, 2, 2, 2, 2, 2]); + let mut cursor = &buf[..]; + let column_op_serdeser: ColumnOperation = + ColumnOperation::deserialize(&mut cursor).unwrap(); + assert_eq!(column_op_serdeser, ColumnOperation::Value(UnorderedId(val))); + assert_eq!(cursor.len() + expected_len, buf.len()); + } + + #[test] + fn test_column_operation_unordered() { + test_column_operation_unordered_aux(300u32, 3); + test_column_operation_unordered_aux(1u32, 2); + test_column_operation_unordered_aux(0u32, 1); + } +} diff --git a/columnar/src/writer/column_writers.rs b/columnar/src/writer/column_writers.rs new file mode 100644 index 000000000..d0e398756 --- /dev/null +++ b/columnar/src/writer/column_writers.rs @@ -0,0 +1,271 @@ +use std::cmp::Ordering; + +use stacker::{ExpUnrolledLinkedList, MemoryArena}; + +use crate::dictionary::{DictionaryBuilder, UnorderedId}; +use crate::writer::column_operation::{ColumnOperation, SymbolValue}; +use crate::{Cardinality, DocId, NumericalType, NumericalValue}; + +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +#[repr(u8)] +enum DocumentStep { + SameDoc = 0, + NextDoc = 1, + SkippedDoc = 2, +} + +#[inline(always)] +fn delta_with_last_doc(last_doc_opt: Option, doc: u32) -> DocumentStep { + let expected_next_doc = last_doc_opt.map(|last_doc| last_doc + 1).unwrap_or(0u32); + match doc.cmp(&expected_next_doc) { + Ordering::Less => DocumentStep::SameDoc, + Ordering::Equal => DocumentStep::NextDoc, + Ordering::Greater => DocumentStep::SkippedDoc, + } +} + +#[derive(Copy, Clone, Default)] +pub struct ColumnWriter { + // Detected cardinality of the column so far. + cardinality: Cardinality, + // Last document inserted. + // None if no doc has been added yet. + last_doc_opt: Option, + // Buffer containing the serialized values. + values: ExpUnrolledLinkedList, +} + +impl ColumnWriter { + /// Returns an iterator over the Symbol that have been recorded + /// for the given column. + pub(super) fn operation_iterator<'a, V: SymbolValue>( + &self, + arena: &MemoryArena, + buffer: &'a mut Vec, + ) -> impl Iterator> + 'a { + buffer.clear(); + self.values.read_to_end(arena, buffer); + let mut cursor: &[u8] = &buffer[..]; + std::iter::from_fn(move || ColumnOperation::deserialize(&mut cursor)) + } + + /// Records a change of the document being recorded. + /// + /// This function will also update the cardinality of the column + /// if necessary. + pub(super) fn record(&mut self, doc: DocId, value: S, arena: &mut MemoryArena) { + // Difference between `doc` and the last doc. + match delta_with_last_doc(self.last_doc_opt, doc) { + DocumentStep::SameDoc => { + // This is the last encounterred document. + self.cardinality = Cardinality::Multivalued; + } + DocumentStep::NextDoc => { + self.last_doc_opt = Some(doc); + self.write_symbol::(ColumnOperation::NewDoc(doc), arena); + } + DocumentStep::SkippedDoc => { + self.cardinality = self.cardinality.max(Cardinality::Optional); + self.last_doc_opt = Some(doc); + self.write_symbol::(ColumnOperation::NewDoc(doc), arena); + } + } + self.write_symbol(ColumnOperation::Value(value), arena); + } + + // Get the cardinality. + // The overall number of docs in the column is necessary to + // deal with the case where the all docs contain 1 value, except some documents + // at the end of the column. + pub(crate) fn get_cardinality(&self, num_docs: DocId) -> Cardinality { + match delta_with_last_doc(self.last_doc_opt, num_docs) { + DocumentStep::SameDoc | DocumentStep::NextDoc => self.cardinality, + DocumentStep::SkippedDoc => self.cardinality.max(Cardinality::Optional), + } + } + + /// Appends a new symbol to the `ColumnWriter`. + fn write_symbol( + &mut self, + column_operation: ColumnOperation, + arena: &mut MemoryArena, + ) { + self.values + .writer(arena) + .extend_from_slice(column_operation.serialize().as_ref()); + } +} + +#[derive(Clone, Copy, Default)] +pub(crate) struct NumericalColumnWriter { + compatible_numerical_types: CompatibleNumericalTypes, + column_writer: ColumnWriter, +} + +/// State used to store what types are still acceptable +/// after having seen a set of numerical values. +#[derive(Clone, Copy)] +struct CompatibleNumericalTypes { + all_values_within_i64_range: bool, + all_values_within_u64_range: bool, + // f64 is always acceptable. +} + +impl Default for CompatibleNumericalTypes { + fn default() -> CompatibleNumericalTypes { + CompatibleNumericalTypes { + all_values_within_i64_range: true, + all_values_within_u64_range: true, + } + } +} + +impl CompatibleNumericalTypes { + fn accept_value(&mut self, numerical_value: NumericalValue) { + match numerical_value { + NumericalValue::I64(val_i64) => { + let value_within_u64_range = val_i64 >= 0i64; + self.all_values_within_u64_range &= value_within_u64_range; + } + NumericalValue::U64(val_u64) => { + let value_within_i64_range = val_u64 < i64::MAX as u64; + self.all_values_within_i64_range &= value_within_i64_range; + } + NumericalValue::F64(_) => { + self.all_values_within_i64_range = false; + self.all_values_within_u64_range = false; + } + } + } + + pub fn to_numerical_type(self) -> NumericalType { + if self.all_values_within_i64_range { + NumericalType::I64 + } else if self.all_values_within_u64_range { + NumericalType::U64 + } else { + NumericalType::F64 + } + } +} + +impl NumericalColumnWriter { + pub fn column_type_and_cardinality(&self, num_docs: DocId) -> (NumericalType, Cardinality) { + let numerical_type = self.compatible_numerical_types.to_numerical_type(); + let cardinality = self.column_writer.get_cardinality(num_docs); + (numerical_type, cardinality) + } + + pub fn record_numerical_value( + &mut self, + doc: DocId, + value: NumericalValue, + arena: &mut MemoryArena, + ) { + self.compatible_numerical_types.accept_value(value); + self.column_writer.record(doc, value, arena); + } + + pub(super) fn operation_iterator<'a>( + self, + arena: &MemoryArena, + buffer: &'a mut Vec, + ) -> impl Iterator> + 'a { + self.column_writer.operation_iterator(arena, buffer) + } +} + +#[derive(Copy, Clone, Default)] +pub(crate) struct StrColumnWriter { + pub(crate) dictionary_id: u32, + pub(crate) column_writer: ColumnWriter, +} + +impl StrColumnWriter { + pub(crate) fn with_dictionary_id(dictionary_id: u32) -> StrColumnWriter { + StrColumnWriter { + dictionary_id, + column_writer: Default::default(), + } + } + + pub(crate) fn record_bytes( + &mut self, + doc: DocId, + bytes: &[u8], + dictionaries: &mut [DictionaryBuilder], + arena: &mut MemoryArena, + ) { + let unordered_id = dictionaries[self.dictionary_id as usize].get_or_allocate_id(bytes); + self.column_writer.record(doc, unordered_id, arena); + } + + pub(super) fn operation_iterator<'a>( + &self, + arena: &MemoryArena, + byte_buffer: &'a mut Vec, + ) -> impl Iterator> + 'a { + self.column_writer.operation_iterator(arena, byte_buffer) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_delta_with_last_doc() { + assert_eq!(delta_with_last_doc(None, 0u32), DocumentStep::NextDoc); + assert_eq!(delta_with_last_doc(None, 1u32), DocumentStep::SkippedDoc); + assert_eq!(delta_with_last_doc(None, 2u32), DocumentStep::SkippedDoc); + assert_eq!(delta_with_last_doc(Some(0u32), 0u32), DocumentStep::SameDoc); + assert_eq!(delta_with_last_doc(Some(1u32), 1u32), DocumentStep::SameDoc); + assert_eq!(delta_with_last_doc(Some(1u32), 2u32), DocumentStep::NextDoc); + assert_eq!( + delta_with_last_doc(Some(1u32), 3u32), + DocumentStep::SkippedDoc + ); + assert_eq!( + delta_with_last_doc(Some(1u32), 4u32), + DocumentStep::SkippedDoc + ); + } + + #[track_caller] + fn test_column_writer_coercion_iter_aux( + values: impl Iterator, + expected_numerical_type: NumericalType, + ) { + let mut compatible_numerical_types = CompatibleNumericalTypes::default(); + for value in values { + compatible_numerical_types.accept_value(value); + } + assert_eq!( + compatible_numerical_types.to_numerical_type(), + expected_numerical_type + ); + } + + #[track_caller] + fn test_column_writer_coercion_aux( + values: &[NumericalValue], + expected_numerical_type: NumericalType, + ) { + test_column_writer_coercion_iter_aux(values.iter().copied(), expected_numerical_type); + test_column_writer_coercion_iter_aux(values.iter().rev().copied(), expected_numerical_type); + } + + #[test] + fn test_column_writer_coercion() { + test_column_writer_coercion_aux(&[], NumericalType::I64); + test_column_writer_coercion_aux(&[1i64.into()], NumericalType::I64); + test_column_writer_coercion_aux(&[1u64.into()], NumericalType::I64); + // We don't detect exact integer at the moment. We could! + test_column_writer_coercion_aux(&[1f64.into()], NumericalType::F64); + test_column_writer_coercion_aux(&[u64::MAX.into()], NumericalType::U64); + test_column_writer_coercion_aux(&[(i64::MAX as u64).into()], NumericalType::U64); + test_column_writer_coercion_aux(&[(1u64 << 63).into()], NumericalType::U64); + test_column_writer_coercion_aux(&[1i64.into(), 1u64.into()], NumericalType::I64); + test_column_writer_coercion_aux(&[u64::MAX.into(), (-1i64).into()], NumericalType::F64); + } +} diff --git a/columnar/src/writer/mod.rs b/columnar/src/writer/mod.rs new file mode 100644 index 000000000..8812ac4f0 --- /dev/null +++ b/columnar/src/writer/mod.rs @@ -0,0 +1,517 @@ +mod column_operation; +mod column_writers; +mod serializer; +mod value_index; + +use std::io; + +use column_operation::ColumnOperation; +use common::CountingWriter; +use fastfield_codecs::serialize::ValueIndexInfo; +use fastfield_codecs::{Column, MonotonicallyMappableToU64, VecColumn}; +use serializer::ColumnarSerializer; +use stacker::{Addr, ArenaHashMap, MemoryArena}; + +use crate::column_type_header::{ColumnType, ColumnTypeAndCardinality, ColumnTypeCategory}; +use crate::dictionary::{DictionaryBuilder, TermIdMapping, UnorderedId}; +use crate::value::{Coerce, NumericalType, NumericalValue}; +use crate::writer::column_writers::{ColumnWriter, NumericalColumnWriter, StrColumnWriter}; +use crate::writer::value_index::{IndexBuilder, SpareIndexBuilders}; +use crate::{Cardinality, DocId}; + +/// This is a set of buffers that are used to temporarily write the values into before passing them +/// to the fast field codecs. +#[derive(Default)] +struct SpareBuffers { + value_index_builders: SpareIndexBuilders, + i64_values: Vec, + u64_values: Vec, + f64_values: Vec, + bool_values: Vec, +} + +/// Makes it possible to create a new columnar. +/// +/// ```rust +/// use tantivy_columnar::ColumnarWriter; +/// fn main() { +/// let mut columnar_writer = ColumnarWriter::default(); +/// columnar_writer.record_str(0u32 /* doc id */, "product_name", "Red backpack"); +/// columnar_writer.record_numerical(0u32 /* doc id */, "price", 10u64); +/// columnar_writer.record_str(1u32 /* doc id */, "product_name", "Apple"); +/// columnar_writer.record_numerical(0u32 /* doc id */, "price", 10.5f64); //< uh oh we ended up mixing integer and floats. +/// let mut wrt: Vec = Vec::new(); +/// columnar_writer.serialize(2u32, &mut wrt).unwrap(); +/// } +/// ``` +pub struct ColumnarWriter { + numerical_field_hash_map: ArenaHashMap, + bool_field_hash_map: ArenaHashMap, + bytes_field_hash_map: ArenaHashMap, + arena: MemoryArena, + // Dictionaries used to store dictionary-encoded values. + dictionaries: Vec, + buffers: SpareBuffers, +} + +impl Default for ColumnarWriter { + fn default() -> Self { + ColumnarWriter { + numerical_field_hash_map: ArenaHashMap::new(10_000), + bool_field_hash_map: ArenaHashMap::new(10_000), + bytes_field_hash_map: ArenaHashMap::new(10_000), + dictionaries: Vec::new(), + arena: MemoryArena::default(), + buffers: SpareBuffers::default(), + } + } +} + +impl ColumnarWriter { + pub fn record_numerical + Copy>( + &mut self, + doc: DocId, + column_name: &str, + numerical_value: T, + ) { + assert!( + !column_name.as_bytes().contains(&0u8), + "key may not contain the 0 byte" + ); + let (hash_map, arena) = (&mut self.numerical_field_hash_map, &mut self.arena); + hash_map.mutate_or_create( + column_name.as_bytes(), + |column_opt: Option| { + let mut column: NumericalColumnWriter = column_opt.unwrap_or_default(); + column.record_numerical_value(doc, numerical_value.into(), arena); + column + }, + ); + } + + pub fn record_bool(&mut self, doc: DocId, column_name: &str, val: bool) { + assert!( + !column_name.as_bytes().contains(&0u8), + "key may not contain the 0 byte" + ); + let (hash_map, arena) = (&mut self.bool_field_hash_map, &mut self.arena); + hash_map.mutate_or_create( + column_name.as_bytes(), + |column_opt: Option| { + let mut column: ColumnWriter = column_opt.unwrap_or_default(); + column.record(doc, val, arena); + column + }, + ); + } + + pub fn record_str(&mut self, doc: DocId, column_name: &str, value: &str) { + assert!( + !column_name.as_bytes().contains(&0u8), + "key may not contain the 0 byte" + ); + let (hash_map, arena, dictionaries) = ( + &mut self.bytes_field_hash_map, + &mut self.arena, + &mut self.dictionaries, + ); + hash_map.mutate_or_create( + column_name.as_bytes(), + |column_opt: Option| { + let mut column: StrColumnWriter = column_opt.unwrap_or_else(|| { + // Each column has its own dictionary + let dictionary_id = dictionaries.len() as u32; + dictionaries.push(DictionaryBuilder::default()); + StrColumnWriter::with_dictionary_id(dictionary_id) + }); + column.record_bytes(doc, value.as_bytes(), dictionaries, arena); + column + }, + ); + } + + pub fn serialize(&mut self, num_docs: DocId, wrt: &mut dyn io::Write) -> io::Result<()> { + let mut serializer = ColumnarSerializer::new(wrt); + let mut field_columns: Vec<(&[u8], ColumnTypeCategory, Addr)> = self + .numerical_field_hash_map + .iter() + .map(|(term, addr, _)| (term, ColumnTypeCategory::Numerical, addr)) + .collect(); + field_columns.extend( + self.bytes_field_hash_map + .iter() + .map(|(term, addr, _)| (term, ColumnTypeCategory::Str, addr)), + ); + field_columns.extend( + self.bool_field_hash_map + .iter() + .map(|(term, addr, _)| (term, ColumnTypeCategory::Bool, addr)), + ); + field_columns.sort_unstable_by_key(|(column_name, col_type, _)| (*column_name, *col_type)); + let (arena, buffers, dictionaries) = (&self.arena, &mut self.buffers, &self.dictionaries); + let mut symbol_byte_buffer: Vec = Vec::new(); + for (column_name, bytes_or_numerical, addr) in field_columns { + match bytes_or_numerical { + ColumnTypeCategory::Bool => { + let column_writer: ColumnWriter = self.bool_field_hash_map.read(addr); + let cardinality = column_writer.get_cardinality(num_docs); + let column_type_and_cardinality = ColumnTypeAndCardinality { + cardinality, + typ: ColumnType::Bool, + }; + let mut column_serializer = + serializer.serialize_column(column_name, column_type_and_cardinality); + serialize_bool_column( + cardinality, + num_docs, + column_writer.operation_iterator(arena, &mut symbol_byte_buffer), + buffers, + &mut column_serializer, + )?; + } + ColumnTypeCategory::Str => { + let str_column_writer: StrColumnWriter = self.bytes_field_hash_map.read(addr); + let dictionary_builder = + &dictionaries[str_column_writer.dictionary_id as usize]; + let cardinality = str_column_writer.column_writer.get_cardinality(num_docs); + let column_type_and_cardinality = ColumnTypeAndCardinality { + cardinality, + typ: ColumnType::Bytes, + }; + let mut column_serializer = + serializer.serialize_column(column_name, column_type_and_cardinality); + serialize_bytes_column( + cardinality, + num_docs, + dictionary_builder, + str_column_writer.operation_iterator(arena, &mut symbol_byte_buffer), + buffers, + &mut column_serializer, + )?; + } + ColumnTypeCategory::Numerical => { + let numerical_column_writer: NumericalColumnWriter = + self.numerical_field_hash_map.read(addr); + let (numerical_type, cardinality) = + numerical_column_writer.column_type_and_cardinality(num_docs); + let column_type_and_cardinality = ColumnTypeAndCardinality { + cardinality, + typ: ColumnType::Numerical(numerical_type), + }; + let mut column_serializer = + serializer.serialize_column(column_name, column_type_and_cardinality); + serialize_numerical_column( + cardinality, + num_docs, + numerical_type, + numerical_column_writer.operation_iterator(arena, &mut symbol_byte_buffer), + buffers, + &mut column_serializer, + )?; + } + }; + } + serializer.finalize()?; + Ok(()) + } +} + +fn serialize_bytes_column( + cardinality: Cardinality, + num_docs: DocId, + dictionary_builder: &DictionaryBuilder, + operation_it: impl Iterator>, + buffers: &mut SpareBuffers, + wrt: impl io::Write, +) -> io::Result<()> { + let SpareBuffers { + value_index_builders, + u64_values, + .. + } = buffers; + let mut counting_writer = CountingWriter::wrap(wrt); + let term_id_mapping: TermIdMapping = dictionary_builder.serialize(&mut counting_writer)?; + let dictionary_num_bytes: u32 = counting_writer.written_bytes() as u32; + let mut wrt = counting_writer.finish(); + let operation_iterator = operation_it.map(|symbol: ColumnOperation| { + // We map unordered ids to ordered ids. + match symbol { + ColumnOperation::Value(unordered_id) => { + let ordered_id = term_id_mapping.to_ord(unordered_id); + ColumnOperation::Value(ordered_id.0 as u64) + } + ColumnOperation::NewDoc(doc) => ColumnOperation::NewDoc(doc), + } + }); + serialize_column( + operation_iterator, + cardinality, + num_docs, + value_index_builders, + u64_values, + &mut wrt, + )?; + wrt.write_all(&dictionary_num_bytes.to_le_bytes()[..])?; + Ok(()) +} + +fn serialize_numerical_column( + cardinality: Cardinality, + num_docs: DocId, + numerical_type: NumericalType, + op_iterator: impl Iterator>, + buffers: &mut SpareBuffers, + wrt: &mut impl io::Write, +) -> io::Result<()> { + let SpareBuffers { + value_index_builders, + u64_values, + i64_values, + f64_values, + .. + } = buffers; + match numerical_type { + NumericalType::I64 => { + serialize_column( + coerce_numerical_symbol::(op_iterator), + cardinality, + num_docs, + value_index_builders, + i64_values, + wrt, + )?; + } + NumericalType::U64 => { + serialize_column( + coerce_numerical_symbol::(op_iterator), + cardinality, + num_docs, + value_index_builders, + u64_values, + wrt, + )?; + } + NumericalType::F64 => { + serialize_column( + coerce_numerical_symbol::(op_iterator), + cardinality, + num_docs, + value_index_builders, + f64_values, + wrt, + )?; + } + }; + Ok(()) +} + +fn serialize_bool_column( + cardinality: Cardinality, + num_docs: DocId, + column_operations_it: impl Iterator>, + buffers: &mut SpareBuffers, + wrt: &mut impl io::Write, +) -> io::Result<()> { + let SpareBuffers { + value_index_builders, + bool_values, + .. + } = buffers; + serialize_column( + column_operations_it, + cardinality, + num_docs, + value_index_builders, + bool_values, + wrt, + )?; + Ok(()) +} + +fn serialize_column< + T: Copy + Default + std::fmt::Debug + Send + Sync + MonotonicallyMappableToU64 + PartialOrd, +>( + op_iterator: impl Iterator>, + cardinality: Cardinality, + num_docs: DocId, + value_index_builders: &mut SpareIndexBuilders, + values: &mut Vec, + mut wrt: impl io::Write, +) -> io::Result<()> +where + for<'a> VecColumn<'a, T>: Column, +{ + values.clear(); + match cardinality { + Cardinality::Required => { + consume_operation_iterator( + op_iterator, + value_index_builders.borrow_required_index_builder(), + values, + ); + fastfield_codecs::serialize( + VecColumn::from(&values[..]), + &mut wrt, + &fastfield_codecs::ALL_CODEC_TYPES[..], + )?; + } + Cardinality::Optional => { + let optional_index_builder = value_index_builders.borrow_optional_index_builder(); + consume_operation_iterator(op_iterator, optional_index_builder, values); + let optional_index = optional_index_builder.finish(num_docs); + fastfield_codecs::serialize::serialize_new( + ValueIndexInfo::SingleValue(Box::new(optional_index)), + VecColumn::from(&values[..]), + &mut wrt, + &fastfield_codecs::ALL_CODEC_TYPES[..], + )?; + } + Cardinality::Multivalued => { + let multivalued_index_builder = value_index_builders.borrow_multivalued_index_builder(); + consume_operation_iterator(op_iterator, multivalued_index_builder, values); + let multivalued_index = multivalued_index_builder.finish(num_docs); + fastfield_codecs::serialize::serialize_new( + ValueIndexInfo::MultiValue(Box::new(multivalued_index)), + VecColumn::from(&values[..]), + &mut wrt, + &fastfield_codecs::ALL_CODEC_TYPES[..], + )?; + } + } + Ok(()) +} + +fn coerce_numerical_symbol( + operation_iterator: impl Iterator>, +) -> impl Iterator> +where T: Coerce { + operation_iterator.map(|symbol| match symbol { + ColumnOperation::NewDoc(doc) => ColumnOperation::NewDoc(doc), + ColumnOperation::Value(numerical_value) => { + ColumnOperation::Value(Coerce::coerce(numerical_value)) + } + }) +} + +fn consume_operation_iterator( + operation_iterator: impl Iterator>, + index_builder: &mut TIndexBuilder, + values: &mut Vec, +) { + for symbol in operation_iterator { + match symbol { + ColumnOperation::NewDoc(doc) => { + index_builder.record_doc(doc); + } + ColumnOperation::Value(value) => { + index_builder.record_value(); + values.push(value); + } + } + } +} + +#[cfg(test)] +mod tests { + use column_operation::ColumnOperation; + use stacker::MemoryArena; + + use super::*; + use crate::value::NumericalValue; + use crate::Cardinality; + + #[test] + fn test_column_writer_required_simple() { + let mut arena = MemoryArena::default(); + let mut column_writer = super::ColumnWriter::default(); + column_writer.record(0u32, NumericalValue::from(14i64), &mut arena); + column_writer.record(1u32, NumericalValue::from(15i64), &mut arena); + column_writer.record(2u32, NumericalValue::from(-16i64), &mut arena); + assert_eq!(column_writer.get_cardinality(3), Cardinality::Required); + let mut buffer = Vec::new(); + let symbols: Vec> = column_writer + .operation_iterator(&mut arena, &mut buffer) + .collect(); + assert_eq!(symbols.len(), 6); + assert!(matches!(symbols[0], ColumnOperation::NewDoc(0u32))); + assert!(matches!( + symbols[1], + ColumnOperation::Value(NumericalValue::I64(14i64)) + )); + assert!(matches!(symbols[2], ColumnOperation::NewDoc(1u32))); + assert!(matches!( + symbols[3], + ColumnOperation::Value(NumericalValue::I64(15i64)) + )); + assert!(matches!(symbols[4], ColumnOperation::NewDoc(2u32))); + assert!(matches!( + symbols[5], + ColumnOperation::Value(NumericalValue::I64(-16i64)) + )); + } + + #[test] + fn test_column_writer_optional_cardinality_missing_first() { + let mut arena = MemoryArena::default(); + let mut column_writer = super::ColumnWriter::default(); + column_writer.record(1u32, NumericalValue::from(15i64), &mut arena); + column_writer.record(2u32, NumericalValue::from(-16i64), &mut arena); + assert_eq!(column_writer.get_cardinality(3), Cardinality::Optional); + let mut buffer = Vec::new(); + let symbols: Vec> = column_writer + .operation_iterator(&mut arena, &mut buffer) + .collect(); + assert_eq!(symbols.len(), 4); + assert!(matches!(symbols[0], ColumnOperation::NewDoc(1u32))); + assert!(matches!( + symbols[1], + ColumnOperation::Value(NumericalValue::I64(15i64)) + )); + assert!(matches!(symbols[2], ColumnOperation::NewDoc(2u32))); + assert!(matches!( + symbols[3], + ColumnOperation::Value(NumericalValue::I64(-16i64)) + )); + } + + #[test] + fn test_column_writer_optional_cardinality_missing_last() { + let mut arena = MemoryArena::default(); + let mut column_writer = super::ColumnWriter::default(); + column_writer.record(0u32, NumericalValue::from(15i64), &mut arena); + assert_eq!(column_writer.get_cardinality(2), Cardinality::Optional); + let mut buffer = Vec::new(); + let symbols: Vec> = column_writer + .operation_iterator(&mut arena, &mut buffer) + .collect(); + assert_eq!(symbols.len(), 2); + assert!(matches!(symbols[0], ColumnOperation::NewDoc(0u32))); + assert!(matches!( + symbols[1], + ColumnOperation::Value(NumericalValue::I64(15i64)) + )); + } + + #[test] + fn test_column_writer_multivalued() { + let mut arena = MemoryArena::default(); + let mut column_writer = super::ColumnWriter::default(); + column_writer.record(0u32, NumericalValue::from(16i64), &mut arena); + column_writer.record(0u32, NumericalValue::from(17i64), &mut arena); + assert_eq!(column_writer.get_cardinality(1), Cardinality::Multivalued); + let mut buffer = Vec::new(); + let symbols: Vec> = column_writer + .operation_iterator(&mut arena, &mut buffer) + .collect(); + assert_eq!(symbols.len(), 3); + assert!(matches!(symbols[0], ColumnOperation::NewDoc(0u32))); + assert!(matches!( + symbols[1], + ColumnOperation::Value(NumericalValue::I64(16i64)) + )); + assert!(matches!( + symbols[2], + ColumnOperation::Value(NumericalValue::I64(17i64)) + )); + } +} diff --git a/columnar/src/writer/serializer.rs b/columnar/src/writer/serializer.rs new file mode 100644 index 000000000..fa351f496 --- /dev/null +++ b/columnar/src/writer/serializer.rs @@ -0,0 +1,116 @@ +use std::io; +use std::io::Write; + +use common::CountingWriter; +use sstable::value::RangeValueWriter; +use sstable::RangeSSTable; + +use crate::column_type_header::ColumnTypeAndCardinality; + +pub struct ColumnarSerializer { + wrt: CountingWriter, + sstable_range: sstable::Writer, RangeValueWriter>, + prepare_key_buffer: Vec, +} + +/// Returns a key consisting of the concatenation of the key and the column_type_and_cardinality +/// code. +fn prepare_key<'a>( + key: &[u8], + column_type_cardinality: ColumnTypeAndCardinality, + buffer: &'a mut Vec, +) { + buffer.clear(); + buffer.extend_from_slice(key); + buffer.push(0u8); + buffer.push(column_type_cardinality.to_code()); +} + +impl ColumnarSerializer { + pub(crate) fn new(wrt: W) -> ColumnarSerializer { + let sstable_range: sstable::Writer, RangeValueWriter> = + sstable::Dictionary::::builder(Vec::with_capacity(100_000)).unwrap(); + ColumnarSerializer { + wrt: CountingWriter::wrap(wrt), + sstable_range, + prepare_key_buffer: Vec::new(), + } + } + + pub fn serialize_column<'a>( + &'a mut self, + column_name: &[u8], + column_type_cardinality: ColumnTypeAndCardinality, + ) -> impl io::Write + 'a { + let start_offset = self.wrt.written_bytes(); + prepare_key( + column_name, + column_type_cardinality, + &mut self.prepare_key_buffer, + ); + ColumnSerializer { + columnar_serializer: self, + start_offset, + } + } + + pub(crate) fn finalize(mut self) -> io::Result<()> { + let sstable_bytes: Vec = self.sstable_range.finish()?; + let sstable_num_bytes: u64 = sstable_bytes.len() as u64; + self.wrt.write_all(&sstable_bytes)?; + self.wrt.write_all(&sstable_num_bytes.to_le_bytes()[..])?; + Ok(()) + } +} + +struct ColumnSerializer<'a, W: io::Write> { + columnar_serializer: &'a mut ColumnarSerializer, + start_offset: u64, +} + +impl<'a, W: io::Write> Drop for ColumnSerializer<'a, W> { + fn drop(&mut self) { + let end_offset: u64 = self.columnar_serializer.wrt.written_bytes(); + let byte_range = self.start_offset..end_offset; + self.columnar_serializer.sstable_range.insert_cannot_fail( + &self.columnar_serializer.prepare_key_buffer[..], + &byte_range, + ); + self.columnar_serializer.prepare_key_buffer.clear(); + } +} + +impl<'a, W: io::Write> io::Write for ColumnSerializer<'a, W> { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.columnar_serializer.wrt.write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + self.columnar_serializer.wrt.flush() + } + + fn write_all(&mut self, buf: &[u8]) -> io::Result<()> { + self.columnar_serializer.wrt.write_all(buf) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::column_type_header::ColumnType; + use crate::Cardinality; + + #[test] + fn test_prepare_key_bytes() { + let mut buffer: Vec = b"somegarbage".to_vec(); + let column_type_and_cardinality = ColumnTypeAndCardinality { + typ: ColumnType::Bytes, + cardinality: Cardinality::Optional, + }; + prepare_key(b"root\0child", column_type_and_cardinality, &mut buffer); + assert_eq!(buffer.len(), 12); + assert_eq!(&buffer[..10], b"root\0child"); + assert_eq!(buffer[10], 0u8); + assert_eq!(buffer[11], column_type_and_cardinality.to_code()); + } +} diff --git a/columnar/src/writer/value_index.rs b/columnar/src/writer/value_index.rs new file mode 100644 index 000000000..8d97877a1 --- /dev/null +++ b/columnar/src/writer/value_index.rs @@ -0,0 +1,220 @@ +use fastfield_codecs::serialize::{MultiValueIndexInfo, SingleValueIndexInfo}; + +use crate::DocId; + +/// The `IndexBuilder` interprets a sequence of +/// calls of the form: +/// (record_doc,record_value+)* +/// and can then serialize the results into an index to associate docids with their value[s]. +/// +/// It has different implementation depending on whether the +/// cardinality is required, optional, or multivalued. +pub(crate) trait IndexBuilder { + fn record_doc(&mut self, doc: DocId); + #[inline] + fn record_value(&mut self) {} +} + +/// The RequiredIndexBuilder does nothing. +#[derive(Default)] +pub struct RequiredIndexBuilder; + +impl IndexBuilder for RequiredIndexBuilder { + #[inline(always)] + fn record_doc(&mut self, _doc: DocId) {} +} + +#[derive(Default)] +pub struct OptionalIndexBuilder { + docs: Vec, +} + +struct SingleValueArrayIndex<'a> { + // DocIds with a value. DocIds are strictly increasing + docs: &'a [DocId], + num_docs: DocId, +} + +impl<'a> SingleValueIndexInfo for SingleValueArrayIndex<'a> { + fn num_vals(&self) -> u32 { + self.num_docs as u32 + } + + fn num_non_nulls(&self) -> u32 { + self.docs.len() as u32 + } + + fn iter(&self) -> Box + '_> { + Box::new(self.docs.iter().copied()) + } +} + +impl OptionalIndexBuilder { + pub fn finish(&mut self, num_docs: DocId) -> impl SingleValueIndexInfo + '_ { + debug_assert!(self + .docs + .last() + .copied() + .map(|last_doc| last_doc < num_docs) + .unwrap_or(true)); + SingleValueArrayIndex { + docs: &self.docs[..], + num_docs, + } + } + + fn reset(&mut self) { + self.docs.clear(); + } +} + +impl IndexBuilder for OptionalIndexBuilder { + #[inline(always)] + fn record_doc(&mut self, doc: DocId) { + debug_assert!(self + .docs + .last() + .copied() + .map(|prev_doc| doc > prev_doc) + .unwrap_or(true)); + self.docs.push(doc); + } +} + +#[derive(Default)] +pub struct MultivaluedIndexBuilder { + // TODO should we switch to `start_offset`? + // contains the num values so far for each `DocId`. + end_offsets: Vec, + total_num_vals_seen: u32, +} + +pub struct MultivaluedValueArrayIndex<'a> { + end_offsets: &'a [DocId], +} + +impl<'a> MultiValueIndexInfo for MultivaluedValueArrayIndex<'a> { + fn num_docs(&self) -> u32 { + self.end_offsets.len() as u32 + } + + fn num_vals(&self) -> u32 { + self.end_offsets.last().copied().unwrap_or(0u32) + } + + fn iter(&self) -> Box + '_> { + if self.end_offsets.is_empty() { + return Box::new(std::iter::empty()); + } + let n = self.end_offsets.len(); + Box::new(std::iter::once(0u32).chain(self.end_offsets[..n - 1].iter().copied())) + } +} + +impl MultivaluedIndexBuilder { + pub fn finish(&mut self, num_docs: DocId) -> impl MultiValueIndexInfo + '_ { + self.end_offsets + .resize(num_docs as usize, self.total_num_vals_seen); + MultivaluedValueArrayIndex { + end_offsets: &self.end_offsets[..], + } + } + + fn reset(&mut self) { + self.end_offsets.clear(); + self.total_num_vals_seen = 0; + } +} + +impl IndexBuilder for MultivaluedIndexBuilder { + fn record_doc(&mut self, doc: DocId) { + self.end_offsets + .resize(doc as usize, self.total_num_vals_seen); + } + + fn record_value(&mut self) { + self.total_num_vals_seen += 1; + } +} + +/// The `SpareIndexBuilders` is there to avoid allocating a +/// new index builder for every single column. +#[derive(Default)] +pub struct SpareIndexBuilders { + required_index_builder: RequiredIndexBuilder, + optional_index_builder: OptionalIndexBuilder, + multivalued_index_builder: MultivaluedIndexBuilder, +} + +impl SpareIndexBuilders { + pub fn borrow_required_index_builder(&mut self) -> &mut RequiredIndexBuilder { + &mut self.required_index_builder + } + + pub fn borrow_optional_index_builder(&mut self) -> &mut OptionalIndexBuilder { + self.optional_index_builder.reset(); + &mut self.optional_index_builder + } + + pub fn borrow_multivalued_index_builder(&mut self) -> &mut MultivaluedIndexBuilder { + self.multivalued_index_builder.reset(); + &mut self.multivalued_index_builder + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_optional_value_index_builder() { + let mut opt_value_index_builder = OptionalIndexBuilder::default(); + opt_value_index_builder.record_doc(0u32); + opt_value_index_builder.record_value(); + assert_eq!( + &opt_value_index_builder + .finish(1u32) + .iter() + .collect::>(), + &[0] + ); + opt_value_index_builder.reset(); + opt_value_index_builder.record_doc(1u32); + opt_value_index_builder.record_value(); + assert_eq!( + &opt_value_index_builder + .finish(2u32) + .iter() + .collect::>(), + &[1] + ); + } + + #[test] + fn test_multivalued_value_index_builder() { + let mut multivalued_value_index_builder = MultivaluedIndexBuilder::default(); + multivalued_value_index_builder.record_doc(1u32); + multivalued_value_index_builder.record_value(); + multivalued_value_index_builder.record_value(); + multivalued_value_index_builder.record_doc(2u32); + multivalued_value_index_builder.record_value(); + assert_eq!( + multivalued_value_index_builder + .finish(4u32) + .iter() + .collect::>(), + vec![0, 0, 2, 3] + ); + multivalued_value_index_builder.reset(); + multivalued_value_index_builder.record_doc(2u32); + multivalued_value_index_builder.record_value(); + multivalued_value_index_builder.record_value(); + assert_eq!( + multivalued_value_index_builder + .finish(4u32) + .iter() + .collect::>(), + vec![0, 0, 0, 2] + ); + } +} diff --git a/fastfield_codecs/src/lib.rs b/fastfield_codecs/src/lib.rs index 79763a153..1ae7a6b28 100644 --- a/fastfield_codecs/src/lib.rs +++ b/fastfield_codecs/src/lib.rs @@ -42,7 +42,7 @@ mod null_index_footer; mod column; mod gcd; -mod serialize; +pub mod serialize; use self::bitpacked::BitpackedCodec; use self::blockwise_linear::BlockwiseLinearCodec; diff --git a/sstable/src/lib.rs b/sstable/src/lib.rs index 6237ea7de..725b3fa41 100644 --- a/sstable/src/lib.rs +++ b/sstable/src/lib.rs @@ -209,8 +209,11 @@ where } /// Inserts a `(key, value)` pair in the term dictionary. + /// Keys have to be inserted in order. /// - /// *Keys have to be inserted in order.* + /// # Panics + /// + /// Will panics if keys are inserted in an invalid order. #[inline] pub fn insert>( &mut self, @@ -295,6 +298,17 @@ where Ok(wrt.into_inner()?) } } + +impl Writer, TValueWriter> +where TValueWriter: value::ValueWriter +{ + #[inline] + pub fn insert_cannot_fail>(&mut self, key: K, value: &TValueWriter::Value) { + self.insert(key, value) + .expect("SSTable over a Vec should never fail"); + } +} + #[cfg(test)] mod test { use std::io;