diff --git a/Cargo.toml b/Cargo.toml index dadb7ee99..21b412b87 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -108,7 +108,7 @@ unstable = [] # useful for benches. quickwit = ["sstable"] [workspace] -members = ["query-grammar", "bitpacker", "common", "fastfield_codecs", "ownedbytes", "stacker", "sstable"] +members = ["query-grammar", "bitpacker", "common", "fastfield_codecs", "ownedbytes", "stacker", "sstable", "columnar"] # Following the "fail" crate best practises, we isolate # tests that define specific behavior in fail check points diff --git a/columnar/Cargo.toml b/columnar/Cargo.toml new file mode 100644 index 000000000..55ff5c761 --- /dev/null +++ b/columnar/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "tantivy-columnar" +version = "0.1.0" +edition = "2021" + +[dependencies] +stacker = { path = "../stacker", package="tantivy-stacker"} +serde_json = "1" +thiserror = "1" +fnv = "1" +tantivy-fst = "0.4.0" +sstable = { path = "../sstable", package = "tantivy-sstable" } +common = { path = "../common", package = "tantivy-common" } +fastfield_codecs = { path = "../fastfield_codecs"} +ordered-float = "3.4" +itertools = "0.10" + +[features] +# default = ["quickwit"] +# quickwit = ["common/quickwit"] + + + + +[dev-dependencies] +proptest = "1" diff --git a/columnar/README.md b/columnar/README.md new file mode 100644 index 000000000..35a4f778d --- /dev/null +++ b/columnar/README.md @@ -0,0 +1,33 @@ +# Columnar format + +This crate describes the columnar format used in tantivy. + + +## Goals + +This format is special in the following ways. +- it needs to be compact +- it is not required to be loaded in memory. +- it is designed to fit well with quickwit's strange constraint: +we need to be able to load columns rapidly. +- columns of several types can be associated with the same column name. +- it needs to support columns with different types `(str, u64, i64, f64)` +and different cardinality `(required, optional, multivalued)`. 
+- columns, once loaded, offer cheap random access. +# Format + +A quickwit/tantivy style sstable associates +`(column_name, column_cardinality, column_type)` to a range of bytes. + +The format of the key is: +`[column_name][ZERO_BYTE][column_type_header: u8]` + +Column name may not contain the zero byte. + +Listing all columns associated to `column_name` can therefore +be done by listing all keys prefixed by +`[column_name][ZERO_BYTE]` + +The associated range of bytes refers to a range of bytes in the column data. + diff --git a/columnar/src/column_type_header.rs b/columnar/src/column_type_header.rs new file mode 100644 index 000000000..769557eff --- /dev/null +++ b/columnar/src/column_type_header.rs @@ -0,0 +1,154 @@ +use crate::value::NumericalType; + +#[derive(Clone, Copy, Hash, Default, Debug, PartialEq, Eq, PartialOrd, Ord)] +#[repr(u8)] +pub enum Cardinality { + #[default] + Required = 0, + Optional = 1, + Multivalued = 2, +} + +impl Cardinality { + pub fn to_code(self) -> u8 { + self as u8 + } + + pub fn try_from_code(code: u8) -> Option<Self> { + match code { + 0 => Some(Cardinality::Required), + 1 => Some(Cardinality::Optional), + 2 => Some(Cardinality::Multivalued), + _ => None, + } + } +} + +#[derive(Hash, Eq, PartialEq, Debug, Clone, Copy)] +pub enum ColumnType { + Bytes, + Numerical(NumericalType), +} + +impl ColumnType { + pub fn to_code(self) -> u8 { + match self { + ColumnType::Bytes => 0u8, + ColumnType::Numerical(numerical_type) => 1u8 | (numerical_type.to_code() << 1), + } + } + + pub fn try_from_code(code: u8) -> Option<Self> { + if code == 0u8 { + return Some(ColumnType::Bytes); + } + if code & 1u8 == 0u8 { + return None; + } + let numerical_type = NumericalType::try_from_code(code >> 1)?; + Some(ColumnType::Numerical(numerical_type)) + } +} + +/// Represents the type and cardinality of a column. +/// This is encoded over one-byte and added to a column key in the +/// columnar sstable. +/// +/// Cardinality is encoded in the two highest bits. 
+/// The low 6 bits encode the column type. +#[derive(Eq, Hash, PartialEq, Debug, Copy, Clone)] +pub struct ColumnTypeAndCardinality { + pub cardinality: Cardinality, + pub typ: ColumnType, +} + +#[inline] +const fn compute_mask(num_bits: u8) -> u8 { + if num_bits == 8 { + u8::MAX + } else { + (1u8 << num_bits) - 1 + } +} + +#[inline] +fn select_bits<const START: u8, const END: u8>(code: u8) -> u8 { + assert!(START <= END); + assert!(END <= 8); + let num_bits: u8 = END - START; + let mask: u8 = compute_mask(num_bits); + (code >> START) & mask +} + +#[inline] +fn place_bits<const START: u8, const END: u8>(code: u8) -> u8 { + assert!(START <= END); + assert!(END <= 8); + let num_bits: u8 = END - START; + let mask: u8 = compute_mask(num_bits); + assert!(code <= mask); + code << START +} + +impl ColumnTypeAndCardinality { + pub fn to_code(self) -> u8 { + place_bits::<6, 8>(self.cardinality.to_code()) | place_bits::<0, 6>(self.typ.to_code()) + } + + pub fn try_from_code(code: u8) -> Option<Self> { + let typ_code = select_bits::<0, 6>(code); + let cardinality_code = select_bits::<6, 8>(code); + let cardinality = Cardinality::try_from_code(cardinality_code)?; + let typ = ColumnType::try_from_code(typ_code)?; + assert_eq!(typ.to_code(), typ_code); + Some(ColumnTypeAndCardinality { cardinality, typ }) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + + use super::ColumnTypeAndCardinality; + use crate::column_type_header::{Cardinality, ColumnType}; + + #[test] + fn test_column_type_header_to_code() { + let mut column_type_header_set: HashSet<ColumnTypeAndCardinality> = HashSet::new(); + for code in u8::MIN..=u8::MAX { + if let Some(column_type_header) = ColumnTypeAndCardinality::try_from_code(code) { + assert_eq!(column_type_header.to_code(), code); + assert!(column_type_header_set.insert(column_type_header)); + } + } + assert_eq!( + column_type_header_set.len(), + 3 /* cardinality */ * (1 + 3) // column_types + ); + } + + #[test] + fn test_column_type_to_code() { + let mut column_type_set: HashSet<ColumnType> = HashSet::new(); + for code in u8::MIN..=u8::MAX 
{ + if let Some(column_type) = ColumnType::try_from_code(code) { + assert_eq!(column_type.to_code(), code); + assert!(column_type_set.insert(column_type)); + } + } + assert_eq!(column_type_set.len(), 1 + 3); + } + + #[test] + fn test_cardinality_to_code() { + let mut num_cardinality = 0; + for code in u8::MIN..=u8::MAX { + let cardinality_opt = Cardinality::try_from_code(code); + if let Some(cardinality) = cardinality_opt { + assert_eq!(cardinality.to_code(), code); + num_cardinality += 1; + } + } + assert_eq!(num_cardinality, 3); + } +} diff --git a/columnar/src/dictionary.rs b/columnar/src/dictionary.rs new file mode 100644 index 000000000..11754b7b6 --- /dev/null +++ b/columnar/src/dictionary.rs @@ -0,0 +1,78 @@ +use std::io; + +use fnv::FnvHashMap; + +fn fst_err_into_io_err(fst_err: tantivy_fst::Error) -> io::Error { + match fst_err { + tantivy_fst::Error::Fst(fst_err) => { + io::Error::new(io::ErrorKind::Other, format!("FST Error: {:?}", fst_err)) + } + tantivy_fst::Error::Io(io_err) => io_err, + } +} + +/// `DictionaryBuilder` for dictionary encoding. +/// +/// It stores the different terms encountered and assigns them a temporary value +/// we call unordered id. +/// +/// Upon serialization, we will sort the terms and hence build a `UnorderedId -> Term ordinal` +/// mapping. +#[derive(Default)] +pub struct DictionaryBuilder { + dict: FnvHashMap<Vec<u8>, UnorderedId>, +} + +pub struct IdMapping { + unordered_to_ord: Vec<OrderedId>, +} + +impl IdMapping { + pub fn to_ord(&self, unordered: UnorderedId) -> OrderedId { + self.unordered_to_ord[unordered.0 as usize] + } +} + +impl DictionaryBuilder { + /// Get or allocate an unordered id. + /// (This ID is simply an auto-incremented id.) 
+ pub fn get_or_allocate_id(&mut self, term: &[u8]) -> UnorderedId { + if let Some(term_id) = self.dict.get(term) { + return *term_id; + } + let new_id = UnorderedId(self.dict.len() as u32); + self.dict.insert(term.to_vec(), new_id); + new_id + } + + /// Serializes the dictionary into an fst, and returns the + /// `UnorderedId -> TermOrdinal` map. + pub fn serialize<'a, W: io::Write + 'a>(&self, wrt: &mut W) -> io::Result<IdMapping> { + serialize_inner(&self.dict, wrt).map_err(fst_err_into_io_err) + } +} + +/// Helper function just there for error conversion. +fn serialize_inner<'a, W: io::Write + 'a>( + dict: &FnvHashMap<Vec<u8>, UnorderedId>, + wrt: &mut W, +) -> tantivy_fst::Result<IdMapping> { + let mut terms: Vec<(&[u8], UnorderedId)> = + dict.iter().map(|(k, v)| (k.as_slice(), *v)).collect(); + terms.sort_unstable_by_key(|(key, _)| *key); + let mut unordered_to_ord: Vec<OrderedId> = vec![OrderedId(0u32); terms.len()]; + let mut fst_builder = tantivy_fst::MapBuilder::new(wrt)?; + for (ord, (key, unordered_id)) in terms.into_iter().enumerate() { + let ordered_id = OrderedId(ord as u32); + fst_builder.insert(key, ord as u64)?; + unordered_to_ord[unordered_id.0 as usize] = ordered_id; + } + fst_builder.finish()?; + Ok(IdMapping { unordered_to_ord }) +} + +#[derive(Clone, Copy, Debug)] +pub struct UnorderedId(pub u32); + +#[derive(Clone, Copy)] +pub struct OrderedId(pub u32); diff --git a/columnar/src/lib.rs b/columnar/src/lib.rs new file mode 100644 index 000000000..89b5d2504 --- /dev/null +++ b/columnar/src/lib.rs @@ -0,0 +1,69 @@ +// Copyright (C) 2022 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. 
+// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +mod column_type_header; +mod dictionary; +mod reader; +mod serializer; +mod value; +mod writer; + +pub use column_type_header::Cardinality; +pub use reader::ColumnarReader; +pub use serializer::ColumnarSerializer; +pub use writer::ColumnarWriter; + +pub type DocId = u32; + +#[cfg(test)] +mod tests { + use std::ops::Range; + + use common::file_slice::FileSlice; + + use crate::column_type_header::ColumnTypeAndCardinality; + use crate::reader::ColumnarReader; + use crate::serializer::ColumnarSerializer; + use crate::value::NumericalValue; + use crate::ColumnarWriter; + + #[test] + fn test_dataframe_writer() { + let mut dataframe_writer = ColumnarWriter::default(); + dataframe_writer.record_numerical(1u32, b"srical.value", NumericalValue::U64(1u64)); + dataframe_writer.record_numerical(2u32, b"srical.value", NumericalValue::U64(2u64)); + dataframe_writer.record_numerical(4u32, b"srical.value", NumericalValue::I64(2i64)); + let mut buffer: Vec<u8> = Vec::new(); + let serializer = ColumnarSerializer::new(&mut buffer); + dataframe_writer.serialize(5, serializer).unwrap(); + let columnar_fileslice = FileSlice::from(buffer); + let columnar = ColumnarReader::open(columnar_fileslice).unwrap(); + assert_eq!(columnar.num_columns(), 1); + let cols: Vec<(ColumnTypeAndCardinality, Range<u64>)> = + columnar.read_columns("srical.value").unwrap(); + assert_eq!(cols.len(), 1); + // Right now these 31 bytes are spent as follows + // + // - header 14 bytes + // - vals 8 //< due to padding? could have been 1byte?. 
+ // - null footer 6 bytes + // - version footer 3 bytes // Should be file-wide + assert_eq!(cols[0].1, 0..31); + } +} diff --git a/columnar/src/reader/mod.rs b/columnar/src/reader/mod.rs new file mode 100644 index 000000000..b5dc1a1f9 --- /dev/null +++ b/columnar/src/reader/mod.rs @@ -0,0 +1,66 @@ +use std::ops::Range; +use std::{io, mem}; + +use common::file_slice::FileSlice; +use common::BinarySerializable; +use sstable::{Dictionary, SSTableRange}; + +use crate::column_type_header::ColumnTypeAndCardinality; + +fn io_invalid_data(msg: String) -> io::Error { + io::Error::new(io::ErrorKind::InvalidData, msg) // format!("Invalid key found. + // {key_bytes:?}"))); +} +pub struct ColumnarReader { + column_dictionary: Dictionary<SSTableRange>, + column_data: FileSlice, +} + +impl ColumnarReader { + pub fn num_columns(&self) -> usize { + self.column_dictionary.num_terms() + } + + pub fn open(file_slice: FileSlice) -> io::Result<ColumnarReader> { + let (file_slice_without_sstable_len, sstable_len_bytes) = + file_slice.split_from_end(mem::size_of::<u64>()); + let mut sstable_len_bytes = sstable_len_bytes.read_bytes()?; + let sstable_len = u64::deserialize(&mut sstable_len_bytes)?; + let (column_data, sstable) = + file_slice_without_sstable_len.split_from_end(sstable_len as usize); + let column_dictionary = Dictionary::open(sstable)?; + Ok(ColumnarReader { + column_dictionary, + column_data, + }) + } + + pub fn read_columns( + &self, + field_name: &str, + ) -> io::Result<Vec<(ColumnTypeAndCardinality, Range<u64>)>> { + let mut start_key = field_name.to_string(); + start_key.push('\0'); + let mut end_key = field_name.to_string(); + end_key.push(1u8 as char); + let mut stream = self + .column_dictionary + .range() + .ge(start_key.as_bytes()) + .lt(end_key.as_bytes()) + .into_stream()?; + let mut results = Vec::new(); + while stream.advance() { + let key_bytes: &[u8] = stream.key(); + if !key_bytes.starts_with(start_key.as_bytes()) { + return Err(io_invalid_data(format!("Invalid key found. 
{key_bytes:?}"))); + } + let column_code: u8 = key_bytes.last().cloned().unwrap(); + let column_type_and_cardinality = ColumnTypeAndCardinality::try_from_code(column_code) + .ok_or_else(|| io_invalid_data(format!("Unknown column code `{column_code}`")))?; + let range = stream.value().clone(); + results.push((column_type_and_cardinality, range)); + } + Ok(results) + } +} diff --git a/columnar/src/serializer.rs b/columnar/src/serializer.rs new file mode 100644 index 000000000..a66b06585 --- /dev/null +++ b/columnar/src/serializer.rs @@ -0,0 +1,39 @@ +use std::io; +use std::io::Write; +use std::ops::Range; + +use common::CountingWriter; +use sstable::value::RangeWriter; +use sstable::SSTableRange; + +pub struct ColumnarSerializer<W: io::Write> { + wrt: CountingWriter<W>, + sstable_range: sstable::Writer<Vec<u8>, RangeWriter>, +} + +impl<W: io::Write> ColumnarSerializer<W> { + pub fn new(wrt: W) -> ColumnarSerializer<W> { + let sstable_range: sstable::Writer<Vec<u8>, RangeWriter> = + sstable::Dictionary::<SSTableRange>::builder(Vec::with_capacity(100_000)).unwrap(); + ColumnarSerializer { + wrt: CountingWriter::wrap(wrt), + sstable_range, + } + } + + pub fn record_column_offsets(&mut self, key: &[u8], byte_range: Range<u64>) -> io::Result<()> { + self.sstable_range.insert(key, &byte_range) + } + + pub fn wrt(&mut self) -> &mut CountingWriter<W> { + &mut self.wrt + } + + pub fn finalize(mut self) -> io::Result<()> { + let sstable_bytes: Vec<u8> = self.sstable_range.finish()?; + let sstable_num_bytes: u64 = sstable_bytes.len() as u64; + self.wrt.write_all(&sstable_bytes)?; + self.wrt.write_all(&sstable_num_bytes.to_le_bytes()[..])?; + Ok(()) + } +} diff --git a/columnar/src/value.rs b/columnar/src/value.rs new file mode 100644 index 000000000..aa200a7e5 --- /dev/null +++ b/columnar/src/value.rs @@ -0,0 +1,123 @@ +use ordered_float::NotNan; + +#[derive(Copy, Clone, Debug, PartialEq)] +pub enum NumericalValue { + I64(i64), + U64(u64), + F64(NotNan<f64>), +} + +impl From<u64> for NumericalValue { + fn from(val: u64) -> NumericalValue { 
+ NumericalValue::U64(val) + } +} + +impl From<i64> for NumericalValue { + fn from(val: i64) -> Self { + NumericalValue::I64(val) + } +} + +impl From<NotNan<f64>> for NumericalValue { + fn from(val: NotNan<f64>) -> Self { + NumericalValue::F64(val) + } +} + +impl NumericalValue { + pub fn numerical_type(&self) -> NumericalType { + match self { + NumericalValue::F64(_) => NumericalType::F64, + NumericalValue::I64(_) => NumericalType::I64, + NumericalValue::U64(_) => NumericalType::U64, + } + } +} + +impl Eq for NumericalValue {} + +#[derive(Clone, Copy, Debug, Default, Hash, Eq, PartialEq)] +#[repr(u8)] +pub enum NumericalType { + #[default] + I64 = 0, + U64 = 1, + F64 = 2, +} + +impl NumericalType { + pub fn to_code(self) -> u8 { + self as u8 + } + + pub fn try_from_code(code: u8) -> Option<Self> { + match code { + 0 => Some(NumericalType::I64), + 1 => Some(NumericalType::U64), + 2 => Some(NumericalType::F64), + _ => None, + } + } +} + +/// We voluntarily avoid using `Into` here to keep this +/// implementation quirk as private as possible. +/// +/// This coercion trait actually panics if it is used +/// to convert a looser type to a stricter type. +/// +/// The level of strictness is somewhat arbitrary. +/// - i64 +/// - u64 +/// - f64. 
+pub(crate) trait Coerce { + fn coerce(numerical_value: NumericalValue) -> Self; +} + +impl Coerce for i64 { + fn coerce(value: NumericalValue) -> Self { + match value { + NumericalValue::I64(val) => val, + NumericalValue::U64(val) => val as i64, + NumericalValue::F64(_) => unreachable!(), + } + } +} + +impl Coerce for u64 { + fn coerce(value: NumericalValue) -> Self { + match value { + NumericalValue::I64(val) => val as u64, + NumericalValue::U64(val) => val, + NumericalValue::F64(_) => unreachable!(), + } + } +} + +impl Coerce for NotNan<f64> { + fn coerce(value: NumericalValue) -> Self { + match value { + NumericalValue::I64(val) => unsafe { NotNan::new_unchecked(val as f64) }, + NumericalValue::U64(val) => unsafe { NotNan::new_unchecked(val as f64) }, + NumericalValue::F64(val) => val, + } + } +} + +#[cfg(test)] +mod tests { + use super::NumericalType; + + #[test] + fn test_numerical_type_code() { + let mut num_numerical_type = 0; + for code in u8::MIN..=u8::MAX { + if let Some(numerical_type) = NumericalType::try_from_code(code) { + assert_eq!(numerical_type.to_code(), code); + num_numerical_type += 1; + } + } + assert_eq!(num_numerical_type, 3); + } +} diff --git a/columnar/src/writer/column_operation.rs b/columnar/src/writer/column_operation.rs new file mode 100644 index 000000000..e01842562 --- /dev/null +++ b/columnar/src/writer/column_operation.rs @@ -0,0 +1,321 @@ +use std::fmt; +use std::num::NonZeroU8; + +use ordered_float::NotNan; +use thiserror::Error; + +use crate::dictionary::UnorderedId; +use crate::value::NumericalValue; +use crate::DocId; + +/// When we build a columnar dataframe, we first just group +/// all mutations per column, and append them in an append-only object. +/// +/// We represent all of these operations as `ColumnOperation`. 
+#[derive(Eq, PartialEq, Debug, Clone, Copy)] +pub(crate) enum ColumnOperation<T> { + NewDoc(DocId), + Value(T), +} + +impl<T> From<T> for ColumnOperation<T> { + fn from(value: T) -> Self { + ColumnOperation::Value(value) + } +} + +#[allow(clippy::from_over_into)] +pub(crate) trait SymbolValue: Into<MiniBuffer> + Clone + Copy + fmt::Debug { + fn deserialize(header: NonZeroU8, bytes: &mut &[u8]) -> Result<Self, ParseError>; +} + +pub(crate) struct MiniBuffer { + pub bytes: [u8; 9], + pub len: usize, +} + +impl MiniBuffer { + pub fn as_slice(&self) -> &[u8] { + &self.bytes[..self.len] + } +} + +fn compute_header_byte(typ: SymbolType, len: usize) -> u8 { + assert!(len <= 9); + (len << 4) as u8 | typ as u8 +} + +impl SymbolValue for NumericalValue { + fn deserialize(header_byte: NonZeroU8, bytes: &mut &[u8]) -> Result<Self, ParseError> { + let (typ, len) = parse_header_byte(header_byte)?; + let value_bytes: &[u8] = bytes.get(..len).ok_or(ParseError::MissingBytes)?; + *bytes = &bytes[len..]; + let symbol: NumericalValue = match typ { + SymbolType::U64 => { + let mut octet: [u8; 8] = [0u8; 8]; + octet[..value_bytes.len()].copy_from_slice(value_bytes); + let val: u64 = u64::from_le_bytes(octet); + NumericalValue::U64(val) + } + SymbolType::I64 => { + let mut octet: [u8; 8] = [0u8; 8]; + octet[..value_bytes.len()].copy_from_slice(value_bytes); + let encoded: u64 = u64::from_le_bytes(octet); + let val: i64 = decode_zig_zag(encoded); + NumericalValue::I64(val) + } + SymbolType::Float => { + let octet: [u8; 8] = + value_bytes.try_into().map_err(|_| ParseError::InvalidLen { + typ: SymbolType::Float, + len, + })?; + let val_possibly_nan = f64::from_le_bytes(octet); + let val_not_nan = NotNan::new(val_possibly_nan) + .map_err(|_| ParseError::NaN)?; + NumericalValue::F64(val_not_nan) + } + }; + Ok(symbol) + } +} + +#[allow(clippy::from_over_into)] +impl Into<MiniBuffer> for NumericalValue { + fn into(self) -> MiniBuffer { + let mut bytes = [0u8; 9]; + match self { + NumericalValue::F64(val) => { + let len = 8; + let header_byte = compute_header_byte(SymbolType::Float, len); 
+ bytes[0] = header_byte; + bytes[1..].copy_from_slice(&val.to_le_bytes()); + MiniBuffer { + bytes, + len: len + 1, + } + } + NumericalValue::U64(val) => { + let len = compute_num_bytes_for_u64(val); + let header_byte = compute_header_byte(SymbolType::U64, len); + bytes[0] = header_byte; + bytes[1..].copy_from_slice(&val.to_le_bytes()); + MiniBuffer { + bytes, + len: len + 1, + } + } + NumericalValue::I64(val) => { + let encoded = encode_zig_zag(val); + let len = compute_num_bytes_for_u64(encoded); + let header_byte = compute_header_byte(SymbolType::I64, len); + bytes[0] = header_byte; + bytes[1..].copy_from_slice(&encoded.to_le_bytes()); + MiniBuffer { + bytes, + len: len + 1, + } + } + } + } +} + +#[allow(clippy::from_over_into)] +impl Into<MiniBuffer> for UnorderedId { + fn into(self) -> MiniBuffer { + let mut bytes = [0u8; 9]; + let val = self.0 as u64; + let len = compute_num_bytes_for_u64(val) + 1; + bytes[0] = len as u8; + bytes[1..].copy_from_slice(&val.to_le_bytes()); + MiniBuffer { bytes, len } + } +} + +impl SymbolValue for UnorderedId { + fn deserialize(header: NonZeroU8, bytes: &mut &[u8]) -> Result<Self, ParseError> { + let len = header.get() as usize - 1; // The header counts itself and was already consumed. + let symbol_bytes: &[u8]; + (symbol_bytes, *bytes) = bytes.split_at(len); + let mut value_bytes = [0u8; 4]; + value_bytes[..len].copy_from_slice(symbol_bytes); + let value = u32::from_le_bytes(value_bytes); + Ok(UnorderedId(value)) + } +} + +const HEADER_MASK: u8 = (1u8 << 4) - 1u8; + +fn compute_num_bytes_for_u64(val: u64) -> usize { + let msb = (64u32 - val.leading_zeros()) as usize; + (msb + 7) / 8 +} + +fn parse_header_byte(byte: NonZeroU8) -> Result<(SymbolType, usize), ParseError> { + let len = (byte.get() as usize) >> 4; + let typ_code = byte.get() & HEADER_MASK; + let typ = SymbolType::try_from(typ_code)?; + Ok((typ, len)) +} + +#[derive(Error, Debug)] +pub enum ParseError { + #[error("Type byte unknown `{0}`")] + UnknownType(u8), + #[error("Invalid len `{len}` for type `{typ:?}`.")] + InvalidLen { typ: 
SymbolType, len: usize }, + #[error("Missing bytes.")] + MissingBytes, + #[error("Not a number value.")] + NaN, +} + +impl<V: SymbolValue> ColumnOperation<V> { + pub fn serialize(self) -> MiniBuffer { + match self { + ColumnOperation::NewDoc(doc) => { + let mut minibuf: [u8; 9] = [0u8; 9]; + minibuf[0] = 0u8; + minibuf[1..5].copy_from_slice(&doc.to_le_bytes()); + MiniBuffer { + bytes: minibuf, + len: 5, + } + } + ColumnOperation::Value(val) => val.into(), + } + } + + pub fn deserialize(bytes: &mut &[u8]) -> Result<Self, ParseError> { + if bytes.is_empty() { + return Err(ParseError::MissingBytes); + } + let header_byte = bytes[0]; + *bytes = &bytes[1..]; + if let Some(header_byte) = NonZeroU8::new(header_byte) { + let value = V::deserialize(header_byte, bytes)?; + Ok(ColumnOperation::Value(value)) + } else { + let doc_bytes: &[u8] = bytes.get(..4).ok_or(ParseError::MissingBytes)?; + *bytes = &bytes[4..]; + let doc: u32 = + u32::from_le_bytes(doc_bytes.try_into().map_err(|_| ParseError::MissingBytes)?); + Ok(ColumnOperation::NewDoc(doc)) + } + } +} + +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +#[repr(u8)] +pub enum SymbolType { + U64 = 1u8, + I64 = 2u8, + Float = 3u8, +} + +impl TryFrom<u8> for SymbolType { + type Error = ParseError; + + fn try_from(byte: u8) -> Result<Self, ParseError> { + match byte { + 1u8 => Ok(SymbolType::U64), + 2u8 => Ok(SymbolType::I64), + 3u8 => Ok(SymbolType::Float), + _ => Err(ParseError::UnknownType(byte)), + } + } +} + +fn encode_zig_zag(n: i64) -> u64 { + ((n << 1) ^ (n >> 63)) as u64 +} + +fn decode_zig_zag(n: u64) -> i64 { + ((n >> 1) as i64) ^ (-((n & 1) as i64)) +} + +#[cfg(test)] +mod tests { + use super::{SymbolType, *}; + + #[track_caller] + fn test_zig_zag_aux(val: i64) { + let encoded = super::encode_zig_zag(val); + assert_eq!(decode_zig_zag(encoded), val); + if let Some(abs_val) = val.checked_abs() { + let abs_val = abs_val as u64; + assert!(encoded <= abs_val * 2); + } + } + + #[test] + fn test_zig_zag() { + assert_eq!(encode_zig_zag(0i64), 0u64); + assert_eq!(encode_zig_zag(-1i64), 1u64); 
+ assert_eq!(encode_zig_zag(1i64), 2u64); + test_zig_zag_aux(0i64); + test_zig_zag_aux(i64::MIN); + test_zig_zag_aux(i64::MAX); + } + + use proptest::prelude::any; + use proptest::proptest; + + proptest! { + #[test] + fn test_proptest_zig_zag(val in any::<i64>()) { + test_zig_zag_aux(val); + } + } + + #[track_caller] + fn ser_deser_header_byte_aux(symbol_type: SymbolType, len: usize) { + let header_byte = compute_header_byte(symbol_type, len); + let (serdeser_numerical_type, serdeser_len) = + parse_header_byte(NonZeroU8::new(header_byte).unwrap()).unwrap(); + assert_eq!(symbol_type, serdeser_numerical_type); + assert_eq!(len, serdeser_len); + } + + #[test] + fn test_header_byte_serialization() { + for len in 1..9 { + ser_deser_header_byte_aux(SymbolType::Float, len); + ser_deser_header_byte_aux(SymbolType::I64, len); + ser_deser_header_byte_aux(SymbolType::U64, len); + } + } + + #[track_caller] + fn ser_deser_symbol(symbol: ColumnOperation<NumericalValue>) { + let buf = symbol.serialize(); + let mut bytes = &buf.bytes[..]; + let serdeser_symbol = ColumnOperation::deserialize(&mut bytes).unwrap(); + assert_eq!(bytes.len() + buf.len, buf.bytes.len()); + assert_eq!(symbol, serdeser_symbol); + } + + #[test] + fn test_compute_num_bytes_for_u64() { + assert_eq!(compute_num_bytes_for_u64(0), 0); + assert_eq!(compute_num_bytes_for_u64(1), 1); + assert_eq!(compute_num_bytes_for_u64(255), 1); + assert_eq!(compute_num_bytes_for_u64(256), 2); + assert_eq!(compute_num_bytes_for_u64((1 << 16) - 1), 2); + assert_eq!(compute_num_bytes_for_u64(1 << 16), 3); + } + + #[test] + fn test_symbol_serialization() { + ser_deser_symbol(ColumnOperation::NewDoc(0)); + ser_deser_symbol(ColumnOperation::NewDoc(3)); + ser_deser_symbol(ColumnOperation::Value(NumericalValue::I64(0i64))); + ser_deser_symbol(ColumnOperation::Value(NumericalValue::I64(1i64))); + ser_deser_symbol(ColumnOperation::Value(NumericalValue::U64(257u64))); + ser_deser_symbol(ColumnOperation::Value(NumericalValue::I64(-257i64))); 
+ ser_deser_symbol(ColumnOperation::Value(NumericalValue::I64(i64::MIN))); + ser_deser_symbol(ColumnOperation::Value(NumericalValue::U64(0u64))); + ser_deser_symbol(ColumnOperation::Value(NumericalValue::U64(u64::MIN))); + ser_deser_symbol(ColumnOperation::Value(NumericalValue::U64(u64::MAX))); + } +} diff --git a/columnar/src/writer/mod.rs b/columnar/src/writer/mod.rs new file mode 100644 index 000000000..8508a6d2e --- /dev/null +++ b/columnar/src/writer/mod.rs @@ -0,0 +1,675 @@ +mod column_operation; +mod value_index; + +use std::io::{self, Write}; + +use column_operation::ColumnOperation; +use common::CountingWriter; +use fastfield_codecs::serialize::ValueIndexInfo; +use fastfield_codecs::{Column, MonotonicallyMappableToU64, VecColumn}; +use ordered_float::NotNan; +use stacker::{Addr, ArenaHashMap, ExpUnrolledLinkedList, MemoryArena}; + +use crate::column_type_header::{ColumnType, ColumnTypeAndCardinality}; +use crate::dictionary::{DictionaryBuilder, IdMapping, UnorderedId}; +use crate::value::{Coerce, NumericalType, NumericalValue}; +use crate::writer::column_operation::SymbolValue; +use crate::writer::value_index::{IndexBuilder, SpareIndexBuilders}; +use crate::{Cardinality, ColumnarSerializer, DocId}; + +#[derive(Copy, Clone, Default)] +struct ColumnWriter { + // Detected cardinality of the column so far. + cardinality: Cardinality, + // Last document inserted. + // None if no doc has been added yet. + last_doc_opt: Option<DocId>, + // Buffer containing the serialized values. 
+ values: ExpUnrolledLinkedList, +} + +#[derive(Clone, Copy, Default)] +pub struct NumericalColumnWriter { + compatible_numerical_types: CompatibleNumericalTypes, + column_writer: ColumnWriter, +} + +#[derive(Clone, Copy)] +struct CompatibleNumericalTypes { + all_values_within_i64_range: bool, + all_values_within_u64_range: bool, +} + +impl Default for CompatibleNumericalTypes { + fn default() -> CompatibleNumericalTypes { + CompatibleNumericalTypes { + all_values_within_i64_range: true, + all_values_within_u64_range: true, + } + } +} + +impl CompatibleNumericalTypes { + pub fn accept_value(&mut self, numerical_value: NumericalValue) { + match numerical_value { + NumericalValue::I64(val_i64) => { + let value_within_u64_range = val_i64 >= 0i64; + self.all_values_within_u64_range &= value_within_u64_range; + } + NumericalValue::U64(val_u64) => { + let value_within_i64_range = val_u64 <= i64::MAX as u64; + self.all_values_within_i64_range &= value_within_i64_range; + } + NumericalValue::F64(_) => { + self.all_values_within_i64_range = false; + self.all_values_within_u64_range = false; + } + } + } + + pub fn to_numerical_type(self) -> NumericalType { + if self.all_values_within_i64_range { + NumericalType::I64 + } else if self.all_values_within_u64_range { + NumericalType::U64 + } else { + NumericalType::F64 + } + } +} + +impl NumericalColumnWriter { + pub fn record_numerical_value( + &mut self, + doc: DocId, + value: NumericalValue, + arena: &mut MemoryArena, + ) { + self.compatible_numerical_types.accept_value(value); + self.column_writer.record(doc, value, arena); + } +} + +impl ColumnWriter { + fn symbol_iterator<'a, V: SymbolValue>( + &self, + arena: &MemoryArena, + buffer: &'a mut Vec<u8>, + ) -> impl Iterator<Item = ColumnOperation<V>> + 'a { + buffer.clear(); + self.values.read_to_end(arena, buffer); + let mut cursor: &[u8] = &buffer[..]; + std::iter::from_fn(move || { + if cursor.is_empty() { + return None; + } + let symbol = ColumnOperation::deserialize(&mut cursor) + .expect("Failed to 
deserialize symbol from in-memory. This should never happen."); + Some(symbol) + }) + } + + fn delta_with_last_doc(&self, doc: DocId) -> u32 { + self.last_doc_opt + .map(|last_doc| doc - last_doc) + .unwrap_or(doc + 1u32) + } + + /// Records a change of the document being recorded. + /// + /// This function will also update the cardinality of the column + /// if necessary. + fn record(&mut self, doc: DocId, value: NumericalValue, arena: &mut MemoryArena) { + // Difference between `doc` and the last doc. + match self.delta_with_last_doc(doc) { + 0 => { + // This is the last encountered document. + self.cardinality = Cardinality::Multivalued; + } + 1 => { + self.last_doc_opt = Some(doc); + self.write_symbol::<NumericalValue>(ColumnOperation::NewDoc(doc), arena); + } + _ => { + self.cardinality = self.cardinality.max(Cardinality::Optional); + self.last_doc_opt = Some(doc); + self.write_symbol::<NumericalValue>(ColumnOperation::NewDoc(doc), arena); + } + } + self.write_symbol(ColumnOperation::Value(value), arena); + } + + // Get the cardinality. + // The overall number of docs in the column is necessary to + // deal with the case where all docs contain 1 value, except some documents + // at the end of the column. 
+ fn get_cardinality(&self, num_docs: DocId) -> Cardinality { + if self.delta_with_last_doc(num_docs) > 1 { + self.cardinality.max(Cardinality::Optional) + } else { + self.cardinality + } + } + + fn write_symbol<V: SymbolValue>( + &mut self, + symbol: ColumnOperation<V>, + arena: &mut MemoryArena, + ) { + self.values + .writer(arena) + .extend_from_slice(symbol.serialize().as_slice()); + } +} + +#[derive(Copy, Clone, Default)] +pub struct BytesColumnWriter { + dictionary_id: u32, + column_writer: ColumnWriter, +} + +impl BytesColumnWriter { + pub fn with_dictionary_id(dictionary_id: u32) -> BytesColumnWriter { + BytesColumnWriter { + dictionary_id, + column_writer: Default::default(), + } + } + + pub fn record_bytes( + &mut self, + doc: DocId, + bytes: &[u8], + dictionaries: &mut [DictionaryBuilder], + arena: &mut MemoryArena, + ) { + let unordered_id = dictionaries[self.dictionary_id as usize].get_or_allocate_id(bytes); + let numerical_value = NumericalValue::U64(unordered_id.0 as u64); + self.column_writer.record(doc, numerical_value, arena); + } +} + +pub struct ColumnarWriter { + numerical_field_hash_map: ArenaHashMap, + bytes_field_hash_map: ArenaHashMap, + arena: MemoryArena, + // Dictionaries used to store dictionary-encoded values. 
+ dictionaries: Vec, + buffers: SpareBuffers, +} + +#[derive(Default)] +struct SpareBuffers { + byte_buffer: Vec, + value_index_builders: SpareIndexBuilders, + i64_values: Vec, + u64_values: Vec, + f64_values: Vec>, +} + +impl Default for ColumnarWriter { + fn default() -> Self { + ColumnarWriter { + numerical_field_hash_map: ArenaHashMap::new(10_000), + bytes_field_hash_map: ArenaHashMap::new(10_000), + dictionaries: Vec::new(), + arena: MemoryArena::default(), + buffers: SpareBuffers::default(), + } + } +} + +#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Debug)] +enum BytesOrNumerical { + Bytes, + Numerical, +} + +impl ColumnarWriter { + pub fn record_numerical(&mut self, doc: DocId, key: &[u8], numerical_value: NumericalValue) { + let (hash_map, arena) = (&mut self.numerical_field_hash_map, &mut self.arena); + hash_map.mutate_or_create(key, |column_opt: Option| { + let mut column: NumericalColumnWriter = column_opt.unwrap_or_default(); + column.record_numerical_value(doc, numerical_value, arena); + column + }); + } + + pub fn record_bytes(&mut self, doc: DocId, key: &[u8], value: &[u8]) { + let (hash_map, arena, dictionaries) = ( + &mut self.bytes_field_hash_map, + &mut self.arena, + &mut self.dictionaries, + ); + hash_map.mutate_or_create(key, |column_opt: Option| { + let mut column: BytesColumnWriter = column_opt.unwrap_or_else(|| { + let dictionary_id = dictionaries.len() as u32; + dictionaries.push(DictionaryBuilder::default()); + BytesColumnWriter::with_dictionary_id(dictionary_id) + }); + column.record_bytes(doc, value, dictionaries, arena); + column + }); + } + + pub fn serialize( + &mut self, + num_docs: DocId, + mut serializer: ColumnarSerializer, + ) -> io::Result<()> { + let mut field_columns: Vec<(&[u8], BytesOrNumerical, Addr)> = self + .numerical_field_hash_map + .iter() + .map(|(term, addr, _)| (term, BytesOrNumerical::Numerical, addr)) + .collect(); + field_columns.extend( + self.bytes_field_hash_map + .iter() + .map(|(term, addr, _)| 
(term, BytesOrNumerical::Bytes, addr)), + ); + let mut key_buffer = Vec::new(); + field_columns.sort_unstable_by_key(|(key, col_type, _)| (*key, *col_type)); + let (arena, buffers, dictionaries) = (&self.arena, &mut self.buffers, &self.dictionaries); + for (key, bytes_or_numerical, addr) in field_columns { + let wrt = serializer.wrt(); + let start_offset = wrt.written_bytes(); + let column_type_and_cardinality: ColumnTypeAndCardinality = + match bytes_or_numerical { + BytesOrNumerical::Bytes => { + let BytesColumnWriter { dictionary_id, column_writer } = + self.bytes_field_hash_map.read(addr); + let dictionary_builder = + &dictionaries[dictionary_id as usize]; + serialize_bytes_column( + &column_writer, + num_docs, + dictionary_builder, + arena, + buffers, + wrt, + )?; + ColumnTypeAndCardinality { + cardinality: column_writer.get_cardinality(num_docs), + typ: ColumnType::Bytes, + } + } + BytesOrNumerical::Numerical => { + let NumericalColumnWriter { compatible_numerical_types, column_writer } = + self.numerical_field_hash_map.read(addr); + let cardinality = column_writer.get_cardinality(num_docs); + let numerical_type = compatible_numerical_types.to_numerical_type(); + serialize_numerical_column( + cardinality, + numerical_type, + &column_writer, + num_docs, + arena, + buffers, + wrt, + )?; + ColumnTypeAndCardinality { + cardinality, + typ: ColumnType::Numerical(numerical_type), + } + } + }; + let end_offset = wrt.written_bytes(); + let key_with_type = prepare_key(key, column_type_and_cardinality, &mut key_buffer); + serializer.record_column_offsets(key_with_type, start_offset..end_offset)?; + } + serializer.finalize()?; + Ok(()) + } +} + +/// Returns a key consisting of the concatenation of the key and the column_type_and_cardinality +/// code. 
+fn prepare_key<'a>( + key: &[u8], + column_type_cardinality: ColumnTypeAndCardinality, + buffer: &'a mut Vec, +) -> &'a [u8] { + buffer.clear(); + buffer.extend_from_slice(key); + buffer.push(0u8); + buffer.push(column_type_cardinality.to_code()); + &buffer[..] +} + +fn serialize_bytes_column( + column_writer: &ColumnWriter, + num_docs: DocId, + dictionary_builder: &DictionaryBuilder, + arena: &MemoryArena, + buffers: &mut SpareBuffers, + wrt: &mut CountingWriter, +) -> io::Result<()> { + let start_offset = wrt.written_bytes(); + let id_mapping: IdMapping = dictionary_builder.serialize(wrt)?; + let dictionary_num_bytes: u32 = (wrt.written_bytes() - start_offset) as u32; + let cardinality = column_writer.get_cardinality(num_docs); + let SpareBuffers { + byte_buffer, + value_index_builders, + u64_values, + .. + } = buffers; + let symbol_iterator = column_writer + .symbol_iterator(arena, byte_buffer) + .map(|symbol: ColumnOperation| { + // We map unordered ids to ordered ids. + match symbol { + ColumnOperation::Value(unordered_id) => { + let ordered_id = id_mapping.to_ord(unordered_id); + ColumnOperation::Value(ordered_id.0 as u64) + } + ColumnOperation::NewDoc(doc) => ColumnOperation::NewDoc(doc), + } + }); + serialize_column( + symbol_iterator, + cardinality, + num_docs, + value_index_builders, + u64_values, + wrt, + )?; + wrt.write_all(&dictionary_num_bytes.to_le_bytes()[..])?; + Ok(()) +} + +fn serialize_numerical_column( + cardinality: Cardinality, + numerical_type: NumericalType, + column_writer: &ColumnWriter, + num_docs: DocId, + arena: &MemoryArena, + buffers: &mut SpareBuffers, + wrt: &mut W, +) -> io::Result<()> { + let SpareBuffers { + byte_buffer, + value_index_builders, + u64_values, + i64_values, + f64_values, + } = buffers; + let symbol_iterator = column_writer.symbol_iterator(arena, byte_buffer); + match numerical_type { + NumericalType::I64 => { + serialize_column( + coerce_numerical_symbol::(symbol_iterator), + cardinality, + num_docs, + 
value_index_builders, + i64_values, + wrt, + )?; + } + NumericalType::U64 => { + serialize_column( + coerce_numerical_symbol::(symbol_iterator), + cardinality, + num_docs, + value_index_builders, + u64_values, + wrt, + )?; + } + NumericalType::F64 => { + serialize_column( + coerce_numerical_symbol::>(symbol_iterator), + cardinality, + num_docs, + value_index_builders, + f64_values, + wrt, + )?; + } + }; + Ok(()) +} + +fn serialize_column< + T: Copy + Ord + Default + Send + Sync + MonotonicallyMappableToU64, + W: io::Write, +>( + symbol_iterator: impl Iterator>, + cardinality: Cardinality, + num_docs: DocId, + value_index_builders: &mut SpareIndexBuilders, + values: &mut Vec, + wrt: &mut W, +) -> io::Result<()> +where + for<'a> VecColumn<'a, T>: Column, +{ + match cardinality { + Cardinality::Required => { + consume_symbol_iterator( + symbol_iterator, + value_index_builders.borrow_required_index_builder(), + values, + ); + fastfield_codecs::serialize( + VecColumn::from(&values[..]), + wrt, + &fastfield_codecs::ALL_CODEC_TYPES[..], + )?; + } + Cardinality::Optional => { + let optional_index_builder = value_index_builders.borrow_optional_index_builder(); + consume_symbol_iterator(symbol_iterator, optional_index_builder, values); + let optional_index = optional_index_builder.finish(num_docs); + fastfield_codecs::serialize::serialize_new( + ValueIndexInfo::SingleValue(Box::new(optional_index)), + VecColumn::from(&values[..]), + wrt, + &fastfield_codecs::ALL_CODEC_TYPES[..], + )?; + } + Cardinality::Multivalued => { + let multivalued_index_builder = value_index_builders.borrow_multivalued_index_builder(); + consume_symbol_iterator(symbol_iterator, multivalued_index_builder, values); + let multivalued_index = multivalued_index_builder.finish(num_docs); + fastfield_codecs::serialize::serialize_new( + ValueIndexInfo::MultiValue(Box::new(multivalued_index)), + VecColumn::from(&values[..]), + wrt, + &fastfield_codecs::ALL_CODEC_TYPES[..], + )?; + } + } + Ok(()) +} + +fn 
coerce_numerical_symbol( + symbol_iterator: impl Iterator>, +) -> impl Iterator> +where T: Coerce { + symbol_iterator.map(|symbol| match symbol { + ColumnOperation::NewDoc(doc) => ColumnOperation::NewDoc(doc), + ColumnOperation::Value(numerical_value) => { + ColumnOperation::Value(Coerce::coerce(numerical_value)) + } + }) +} + +fn consume_symbol_iterator( + symbol_iterator: impl Iterator>, + index_builder: &mut TIndexBuilder, + values: &mut Vec, +) { + for symbol in symbol_iterator { + match symbol { + ColumnOperation::NewDoc(doc) => { + index_builder.record_doc(doc); + } + ColumnOperation::Value(value) => { + index_builder.record_value(); + values.push(value); + } + } + } +} + +#[cfg(test)] +mod tests { + + use ordered_float::NotNan; + use stacker::MemoryArena; + + use super::prepare_key; + use crate::column_type_header::{ColumnType, ColumnTypeAndCardinality}; + use crate::value::{NumericalType, NumericalValue}; + use crate::writer::column_operation::ColumnOperation; + use crate::writer::CompatibleNumericalTypes; + use crate::Cardinality; + + #[test] + fn test_prepare_key_bytes() { + let mut buffer: Vec = b"somegarbage".to_vec(); + let column_type_and_cardinality = ColumnTypeAndCardinality { + typ: ColumnType::Bytes, + cardinality: Cardinality::Optional, + }; + let prepared_key = prepare_key(b"root\0child", column_type_and_cardinality, &mut buffer); + assert_eq!(prepared_key.len(), 12); + assert_eq!(&prepared_key[..10], b"root\0child"); + assert_eq!(prepared_key[10], 0u8); + assert_eq!(prepared_key[11], column_type_and_cardinality.to_code()); + } + + #[test] + fn test_column_writer_required_simple() { + let mut arena = MemoryArena::default(); + let mut column_writer = super::ColumnWriter::default(); + column_writer.record(0u32, 14i64.into(), &mut arena); + column_writer.record(1u32, 15i64.into(), &mut arena); + column_writer.record(2u32, (-16i64).into(), &mut arena); + assert_eq!(column_writer.get_cardinality(3), Cardinality::Required); + let mut buffer = 
Vec::new(); + let symbols: Vec> = column_writer + .symbol_iterator(&mut arena, &mut buffer) + .collect(); + assert_eq!(symbols.len(), 6); + assert!(matches!(symbols[0], ColumnOperation::NewDoc(0u32))); + assert!(matches!( + symbols[1], + ColumnOperation::Value(NumericalValue::I64(14i64)) + )); + assert!(matches!(symbols[2], ColumnOperation::NewDoc(1u32))); + assert!(matches!( + symbols[3], + ColumnOperation::Value(NumericalValue::I64(15i64)) + )); + assert!(matches!(symbols[4], ColumnOperation::NewDoc(2u32))); + assert!(matches!( + symbols[5], + ColumnOperation::Value(NumericalValue::I64(-16i64)) + )); + } + + #[test] + fn test_column_writer_optional_cardinality_missing_first() { + let mut arena = MemoryArena::default(); + let mut column_writer = super::ColumnWriter::default(); + column_writer.record(1u32, 15i64.into(), &mut arena); + column_writer.record(2u32, (-16i64).into(), &mut arena); + assert_eq!(column_writer.get_cardinality(3), Cardinality::Optional); + let mut buffer = Vec::new(); + let symbols: Vec> = column_writer + .symbol_iterator(&mut arena, &mut buffer) + .collect(); + assert_eq!(symbols.len(), 4); + assert!(matches!(symbols[0], ColumnOperation::NewDoc(1u32))); + assert!(matches!( + symbols[1], + ColumnOperation::Value(NumericalValue::I64(15i64)) + )); + assert!(matches!(symbols[2], ColumnOperation::NewDoc(2u32))); + assert!(matches!( + symbols[3], + ColumnOperation::Value(NumericalValue::I64(-16i64)) + )); + } + + #[test] + fn test_column_writer_optional_cardinality_missing_last() { + let mut arena = MemoryArena::default(); + let mut column_writer = super::ColumnWriter::default(); + column_writer.record(0u32, 15i64.into(), &mut arena); + assert_eq!(column_writer.get_cardinality(2), Cardinality::Optional); + let mut buffer = Vec::new(); + let symbols: Vec> = column_writer + .symbol_iterator(&mut arena, &mut buffer) + .collect(); + assert_eq!(symbols.len(), 2); + assert!(matches!(symbols[0], ColumnOperation::NewDoc(0u32))); + assert!(matches!( + 
symbols[1], + ColumnOperation::Value(NumericalValue::I64(15i64)) + )); + } + + #[test] + fn test_column_writer_multivalued() { + let mut arena = MemoryArena::default(); + let mut column_writer = super::ColumnWriter::default(); + column_writer.record(0u32, 16i64.into(), &mut arena); + column_writer.record(0u32, 17i64.into(), &mut arena); + assert_eq!(column_writer.get_cardinality(1), Cardinality::Multivalued); + let mut buffer = Vec::new(); + let symbols: Vec> = column_writer + .symbol_iterator(&mut arena, &mut buffer) + .collect(); + assert_eq!(symbols.len(), 3); + assert!(matches!(symbols[0], ColumnOperation::NewDoc(0u32))); + assert!(matches!( + symbols[1], + ColumnOperation::Value(NumericalValue::I64(16i64)) + )); + assert!(matches!( + symbols[2], + ColumnOperation::Value(NumericalValue::I64(17i64)) + )); + } + + #[track_caller] + fn test_column_writer_coercion_iter_aux( + values: impl Iterator, + expected_numerical_type: NumericalType, + ) { + let mut compatible_numerical_types = CompatibleNumericalTypes::default(); + for value in values { + compatible_numerical_types.accept_value(value); + } + assert_eq!( + compatible_numerical_types.to_numerical_type(), + expected_numerical_type + ); + } + + #[track_caller] + fn test_column_writer_coercion_aux( + values: &[NumericalValue], + expected_numerical_type: NumericalType, + ) { + test_column_writer_coercion_iter_aux(values.iter().copied(), expected_numerical_type); + test_column_writer_coercion_iter_aux(values.iter().rev().copied(), expected_numerical_type); + } + + #[test] + fn test_column_writer_coercion() { + test_column_writer_coercion_aux(&[], NumericalType::I64); + test_column_writer_coercion_aux(&[1i64.into()], NumericalType::I64); + test_column_writer_coercion_aux(&[1u64.into()], NumericalType::I64); + // We don't detect exact integer at the moment. We could! 
+ test_column_writer_coercion_aux(&[NotNan::new(1f64).unwrap().into()], NumericalType::F64); + test_column_writer_coercion_aux(&[u64::MAX.into()], NumericalType::U64); + test_column_writer_coercion_aux(&[(i64::MAX as u64).into()], NumericalType::U64); + test_column_writer_coercion_aux(&[(1u64 << 63).into()], NumericalType::U64); + test_column_writer_coercion_aux(&[1i64.into(), 1u64.into()], NumericalType::I64); + test_column_writer_coercion_aux(&[u64::MAX.into(), (-1i64).into()], NumericalType::F64); + } +} diff --git a/columnar/src/writer/value_index.rs b/columnar/src/writer/value_index.rs new file mode 100644 index 000000000..7284eb845 --- /dev/null +++ b/columnar/src/writer/value_index.rs @@ -0,0 +1,218 @@ +use fastfield_codecs::serialize::{MultiValueIndexInfo, SingleValueIndexInfo}; + +use crate::DocId; + +/// The `IndexBuilder` interprets a sequence of +/// calls of the form: +/// (record_doc,record_value+)* +/// and can then serialize the results into an index. +/// +/// It has different implementation depending on whether the +/// cardinality is required, optional, or multivalued. +pub(crate) trait IndexBuilder { + fn record_doc(&mut self, doc: DocId); + #[inline] + fn record_value(&mut self) {} +} + +/// The RequiredIndexBuilder does nothing. 
+#[derive(Default)] +pub struct RequiredIndexBuilder; + +impl IndexBuilder for RequiredIndexBuilder { + #[inline(always)] + fn record_doc(&mut self, _doc: DocId) {} +} + +#[derive(Default)] +pub struct OptionalIndexBuilder { + docs: Vec, +} + +struct SingleValueArrayIndex<'a> { + docs: &'a [DocId], + num_docs: DocId, +} + +impl<'a> SingleValueIndexInfo for SingleValueArrayIndex<'a> { + fn num_vals(&self) -> u32 { + self.num_docs as u32 + } + + fn num_non_nulls(&self) -> u32 { + self.docs.len() as u32 + } + + fn iter(&self) -> Box + '_> { + Box::new(self.docs.iter().copied()) + } +} + +impl OptionalIndexBuilder { + pub fn finish(&mut self, num_docs: DocId) -> impl SingleValueIndexInfo + '_ { + debug_assert!(self + .docs + .last() + .copied() + .map(|last_doc| last_doc < num_docs) + .unwrap_or(true)); + SingleValueArrayIndex { + docs: &self.docs[..], + num_docs, + } + } + + fn reset(&mut self) { + self.docs.clear(); + } +} + +impl IndexBuilder for OptionalIndexBuilder { + #[inline(always)] + fn record_doc(&mut self, doc: DocId) { + debug_assert!(self + .docs + .last() + .copied() + .map(|prev_doc| doc > prev_doc) + .unwrap_or(true)); + self.docs.push(doc); + } +} + +#[derive(Default)] +pub struct MultivaluedIndexBuilder { + // TODO should we switch to `start_offset`? 
+ end_values: Vec, + total_num_vals_seen: u32, +} + +pub struct MultivaluedValueArrayIndex<'a> { + end_offsets: &'a [DocId], +} + +impl<'a> MultiValueIndexInfo for MultivaluedValueArrayIndex<'a> { + fn num_docs(&self) -> u32 { + self.end_offsets.len() as u32 + } + + fn num_vals(&self) -> u32 { + self.end_offsets.last().copied().unwrap_or(0u32) + } + + fn iter(&self) -> Box + '_> { + if self.end_offsets.is_empty() { + return Box::new(std::iter::empty()); + } + let n = self.end_offsets.len(); + Box::new(std::iter::once(0u32).chain(self.end_offsets[..n - 1].iter().copied())) + } +} + +impl MultivaluedIndexBuilder { + pub fn finish(&mut self, num_docs: DocId) -> impl MultiValueIndexInfo + '_ { + self.end_values + .resize(num_docs as usize, self.total_num_vals_seen); + MultivaluedValueArrayIndex { + end_offsets: &self.end_values[..], + } + } + + fn reset(&mut self) { + self.end_values.clear(); + self.total_num_vals_seen = 0; + } +} + +impl IndexBuilder for MultivaluedIndexBuilder { + fn record_doc(&mut self, doc: DocId) { + self.end_values + .resize(doc as usize, self.total_num_vals_seen); + } + + fn record_value(&mut self) { + self.total_num_vals_seen += 1; + } +} + +/// The `SpareIndexBuilders` is there to avoid allocating a +/// new index builder for every single column. 
+#[derive(Default)] +pub struct SpareIndexBuilders { + required_index_builder: RequiredIndexBuilder, + optional_index_builder: OptionalIndexBuilder, + multivalued_index_builder: MultivaluedIndexBuilder, +} + +impl SpareIndexBuilders { + pub fn borrow_required_index_builder(&mut self) -> &mut RequiredIndexBuilder { + &mut self.required_index_builder + } + + pub fn borrow_optional_index_builder(&mut self) -> &mut OptionalIndexBuilder { + self.optional_index_builder.reset(); + &mut self.optional_index_builder + } + + pub fn borrow_multivalued_index_builder(&mut self) -> &mut MultivaluedIndexBuilder { + self.multivalued_index_builder.reset(); + &mut self.multivalued_index_builder + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_optional_value_index_builder() { + let mut opt_value_index_builder = OptionalIndexBuilder::default(); + opt_value_index_builder.record_doc(0u32); + opt_value_index_builder.record_value(); + assert_eq!( + &opt_value_index_builder + .finish(1u32) + .iter() + .collect::>(), + &[0] + ); + opt_value_index_builder.reset(); + opt_value_index_builder.record_doc(1u32); + opt_value_index_builder.record_value(); + assert_eq!( + &opt_value_index_builder + .finish(2u32) + .iter() + .collect::>(), + &[1] + ); + } + + #[test] + fn test_multivalued_value_index_builder() { + let mut multivalued_value_index_builder = MultivaluedIndexBuilder::default(); + multivalued_value_index_builder.record_doc(1u32); + multivalued_value_index_builder.record_value(); + multivalued_value_index_builder.record_value(); + multivalued_value_index_builder.record_doc(2u32); + multivalued_value_index_builder.record_value(); + assert_eq!( + multivalued_value_index_builder + .finish(4u32) + .iter() + .collect::>(), + vec![0, 0, 2, 3] + ); + multivalued_value_index_builder.reset(); + multivalued_value_index_builder.record_doc(2u32); + multivalued_value_index_builder.record_value(); + multivalued_value_index_builder.record_value(); + assert_eq!( + 
multivalued_value_index_builder + .finish(4u32) + .iter() + .collect::>(), + vec![0, 0, 0, 2] + ); + } +} diff --git a/fastfield_codecs/src/lib.rs b/fastfield_codecs/src/lib.rs index 1b4a2d6ea..6c8e9e267 100644 --- a/fastfield_codecs/src/lib.rs +++ b/fastfield_codecs/src/lib.rs @@ -43,7 +43,7 @@ mod null_index_footer; mod column; mod gcd; -mod serialize; +pub mod serialize; pub use ordered_float;