Support for columnar

This commit is contained in:
Paul Masurel
2022-12-21 12:21:30 +09:00
parent 2b89bf9050
commit 8828b6d310
13 changed files with 1804 additions and 2 deletions

View File

@@ -108,7 +108,7 @@ unstable = [] # useful for benches.
quickwit = ["sstable"]
[workspace]
members = ["query-grammar", "bitpacker", "common", "fastfield_codecs", "ownedbytes", "stacker", "sstable"]
members = ["query-grammar", "bitpacker", "common", "fastfield_codecs", "ownedbytes", "stacker", "sstable", "columnar"]
# Following the "fail" crate best practises, we isolate
# tests that define specific behavior in fail check points

26
columnar/Cargo.toml Normal file
View File

@@ -0,0 +1,26 @@
[package]
name = "tantivy-columnar"
version = "0.1.0"
edition = "2021"
[dependencies]
stacker = { path = "../stacker", package="tantivy-stacker"}
serde_json = "1"
thiserror = "1"
fnv = "1"
tantivy-fst = "0.4.0"
sstable = { path = "../sstable", package = "tantivy-sstable" }
common = { path = "../common", package = "tantivy-common" }
fastfield_codecs = { path = "../fastfield_codecs"}
ordered-float = "3.4"
itertools = "0.10"
[features]
# default = ["quickwit"]
# quickwit = ["common/quickwit"]
[dev-dependencies]
proptest = "1"

33
columnar/README.md Normal file
View File

@@ -0,0 +1,33 @@
# Columnar format
This crate describes columnar format used in tantivy.
## Goals
This format is special in the following way.
- it needs to be compact
- it does not required to be loaded in memory.
- it is designed to fit well with quickwit's strange constraint:
we need to be able to load columns rapidly.
- columns of several types can be associated with the same column name.
- it needs to support columns with different types `(str, u64, i64, f64)`
and different cardinality `(required, optional, multivalued)`.
- columns, once loaded, offer cheap random access.
# Format
A quickwit/tantivy style sstable associated
`(column names, column_cardinality, column_type) to range of bytes.
The format of the key is:
`[column_name][ZERO_BYTE][column_type_header: u8]`
Column name may not contain the zero byte.
Listing all columns associated to `column_name` can therefore
be done by listing all keys prefixed by
`[column_name][ZERO_BYTE]`
The associated range of bytes refer to a range of bytes

View File

@@ -0,0 +1,154 @@
use crate::value::NumericalType;
#[derive(Clone, Copy, Hash, Default, Debug, PartialEq, Eq, PartialOrd, Ord)]
#[repr(u8)]
pub enum Cardinality {
#[default]
Required = 0,
Optional = 1,
Multivalued = 2,
}
impl Cardinality {
pub fn to_code(self) -> u8 {
self as u8
}
pub fn try_from_code(code: u8) -> Option<Cardinality> {
match code {
0 => Some(Cardinality::Required),
1 => Some(Cardinality::Optional),
2 => Some(Cardinality::Multivalued),
_ => None,
}
}
}
#[derive(Hash, Eq, PartialEq, Debug, Clone, Copy)]
pub enum ColumnType {
Bytes,
Numerical(NumericalType),
}
impl ColumnType {
pub fn to_code(self) -> u8 {
match self {
ColumnType::Bytes => 0u8,
ColumnType::Numerical(numerical_type) => 1u8 | (numerical_type.to_code() << 1),
}
}
pub fn try_from_code(code: u8) -> Option<ColumnType> {
if code == 0u8 {
return Some(ColumnType::Bytes);
}
if code & 1u8 == 0u8 {
return None;
}
let numerical_type = NumericalType::try_from_code(code >> 1)?;
Some(ColumnType::Numerical(numerical_type))
}
}
/// Represents the type and cardinality of a column.
/// This is encoded over one-byte and added to a column key in the
/// columnar sstable.
///
/// Cardinality is encoded as the first two highest two bits.
/// The low 6 bits encode the column type.
#[derive(Eq, Hash, PartialEq, Debug, Copy, Clone)]
pub struct ColumnTypeAndCardinality {
pub cardinality: Cardinality,
pub typ: ColumnType,
}
#[inline]
const fn compute_mask(num_bits: u8) -> u8 {
if num_bits == 8 {
u8::MAX
} else {
(1u8 << num_bits) - 1
}
}
#[inline]
fn select_bits<const START: u8, const END: u8>(code: u8) -> u8 {
assert!(START <= END);
assert!(END <= 8);
let num_bits: u8 = END - START;
let mask: u8 = compute_mask(num_bits);
(code >> START) & mask
}
#[inline]
fn place_bits<const START: u8, const END: u8>(code: u8) -> u8 {
assert!(START <= END);
assert!(END <= 8);
let num_bits: u8 = END - START;
let mask: u8 = compute_mask(num_bits);
assert!(code <= mask);
code << START
}
impl ColumnTypeAndCardinality {
pub fn to_code(self) -> u8 {
place_bits::<6, 8>(self.cardinality.to_code()) | place_bits::<0, 6>(self.typ.to_code())
}
pub fn try_from_code(code: u8) -> Option<ColumnTypeAndCardinality> {
let typ_code = select_bits::<0, 6>(code);
let cardinality_code = select_bits::<6, 8>(code);
let cardinality = Cardinality::try_from_code(cardinality_code)?;
let typ = ColumnType::try_from_code(typ_code)?;
assert_eq!(typ.to_code(), typ_code);
Some(ColumnTypeAndCardinality { cardinality, typ })
}
}
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use super::ColumnTypeAndCardinality;
use crate::column_type_header::{Cardinality, ColumnType};
#[test]
fn test_column_type_header_to_code() {
let mut column_type_header_set: HashSet<ColumnTypeAndCardinality> = HashSet::new();
for code in u8::MIN..=u8::MAX {
if let Some(column_type_header) = ColumnTypeAndCardinality::try_from_code(code) {
assert_eq!(column_type_header.to_code(), code);
assert!(column_type_header_set.insert(column_type_header));
}
}
assert_eq!(
column_type_header_set.len(),
3 /* cardinality */ * (1 + 3) // column_types
);
}
#[test]
fn test_column_type_to_code() {
let mut column_type_set: HashSet<ColumnType> = HashSet::new();
for code in u8::MIN..=u8::MAX {
if let Some(column_type) = ColumnType::try_from_code(code) {
assert_eq!(column_type.to_code(), code);
assert!(column_type_set.insert(column_type));
}
}
assert_eq!(column_type_set.len(), 1 + 3);
}
#[test]
fn test_cardinality_to_code() {
let mut num_cardinality = 0;
for code in u8::MIN..=u8::MAX {
let cardinality_opt = Cardinality::try_from_code(code);
if let Some(cardinality) = cardinality_opt {
assert_eq!(cardinality.to_code(), code);
num_cardinality += 1;
}
}
assert_eq!(num_cardinality, 3);
}
}

View File

@@ -0,0 +1,78 @@
use std::io;
use fnv::FnvHashMap;
fn fst_err_into_io_err(fst_err: tantivy_fst::Error) -> io::Error {
match fst_err {
tantivy_fst::Error::Fst(fst_err) => {
io::Error::new(io::ErrorKind::Other, format!("FST Error: {:?}", fst_err))
}
tantivy_fst::Error::Io(io_err) => io_err,
}
}
/// `DictionaryBuilder` for dictionary encoding.
///
/// It stores the different terms encounterred and assigns them a temporary value
/// we call unordered id.
///
/// Upon serialization, we will sort the ids and hence build a `UnorderedId -> Term ordinal`
/// mapping.
#[derive(Default)]
pub struct DictionaryBuilder {
dict: FnvHashMap<Vec<u8>, UnorderedId>,
}
pub struct IdMapping {
unordered_to_ord: Vec<OrderedId>,
}
impl IdMapping {
pub fn to_ord(&self, unordered: UnorderedId) -> OrderedId {
self.unordered_to_ord[unordered.0 as usize]
}
}
impl DictionaryBuilder {
/// Get or allocate an unordered id.
/// (This ID is simply an auto-incremented id.)
pub fn get_or_allocate_id(&mut self, term: &[u8]) -> UnorderedId {
if let Some(term_id) = self.dict.get(term) {
return *term_id;
}
let new_id = UnorderedId(self.dict.len() as u32);
self.dict.insert(term.to_vec(), new_id);
new_id
}
/// Serialize the dictionary into an fst, and returns the
/// `UnorderedId -> TermOrdinal` map.
pub fn serialize<'a, W: io::Write + 'a>(&self, wrt: &mut W) -> io::Result<IdMapping> {
serialize_inner(&self.dict, wrt).map_err(fst_err_into_io_err)
}
}
/// Helper function just there for error conversion.
fn serialize_inner<'a, W: io::Write + 'a>(
dict: &FnvHashMap<Vec<u8>, UnorderedId>,
wrt: &mut W,
) -> tantivy_fst::Result<IdMapping> {
let mut terms: Vec<(&[u8], UnorderedId)> =
dict.iter().map(|(k, v)| (k.as_slice(), *v)).collect();
terms.sort_unstable_by_key(|(key, _)| *key);
let mut unordered_to_ord: Vec<OrderedId> = vec![OrderedId(0u32); terms.len()];
let mut fst_builder = tantivy_fst::MapBuilder::new(wrt)?;
for (ord, (key, unordered_id)) in terms.into_iter().enumerate() {
let ordered_id = OrderedId(ord as u32);
fst_builder.insert(key, ord as u64)?;
unordered_to_ord[unordered_id.0 as usize] = ordered_id;
}
fst_builder.finish()?;
Ok(IdMapping { unordered_to_ord })
}
#[derive(Clone, Copy, Debug)]
pub struct UnorderedId(pub u32);
#[derive(Clone, Copy)]
pub struct OrderedId(pub u32);

69
columnar/src/lib.rs Normal file
View File

@@ -0,0 +1,69 @@
// Copyright (C) 2022 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at hello@quickwit.io.
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
mod column_type_header;
mod dictionary;
mod reader;
mod serializer;
mod value;
mod writer;
pub use column_type_header::Cardinality;
pub use reader::ColumnarReader;
pub use serializer::ColumnarSerializer;
pub use writer::ColumnarWriter;
pub type DocId = u32;
#[cfg(test)]
mod tests {
use std::ops::Range;
use common::file_slice::FileSlice;
use crate::column_type_header::ColumnTypeAndCardinality;
use crate::reader::ColumnarReader;
use crate::serializer::ColumnarSerializer;
use crate::value::NumericalValue;
use crate::ColumnarWriter;
#[test]
fn test_dataframe_writer() {
let mut dataframe_writer = ColumnarWriter::default();
dataframe_writer.record_numerical(1u32, b"srical.value", NumericalValue::U64(1u64));
dataframe_writer.record_numerical(2u32, b"srical.value", NumericalValue::U64(2u64));
dataframe_writer.record_numerical(4u32, b"srical.value", NumericalValue::I64(2i64));
let mut buffer: Vec<u8> = Vec::new();
let serializer = ColumnarSerializer::new(&mut buffer);
dataframe_writer.serialize(5, serializer).unwrap();
let columnar_fileslice = FileSlice::from(buffer);
let columnar = ColumnarReader::open(columnar_fileslice).unwrap();
assert_eq!(columnar.num_columns(), 1);
let cols: Vec<(ColumnTypeAndCardinality, Range<u64>)> =
columnar.read_columns("srical.value").unwrap();
assert_eq!(cols.len(), 1);
// Right now this 31 bytes are spent as follows
//
// - header 14 bytes
// - vals 8 //< due to padding? could have been 1byte?.
// - null footer 6 bytes
// - version footer 3 bytes // Should be file-wide
assert_eq!(cols[0].1, 0..31);
}
}

View File

@@ -0,0 +1,66 @@
use std::ops::Range;
use std::{io, mem};
use common::file_slice::FileSlice;
use common::BinarySerializable;
use sstable::{Dictionary, SSTableRange};
use crate::column_type_header::ColumnTypeAndCardinality;
fn io_invalid_data(msg: String) -> io::Error {
io::Error::new(io::ErrorKind::InvalidData, msg) // format!("Invalid key found.
// {key_bytes:?}")));
}
pub struct ColumnarReader {
column_dictionary: Dictionary<SSTableRange>,
column_data: FileSlice,
}
impl ColumnarReader {
pub fn num_columns(&self) -> usize {
self.column_dictionary.num_terms()
}
pub fn open(file_slice: FileSlice) -> io::Result<ColumnarReader> {
let (file_slice_without_sstable_len, sstable_len_bytes) =
file_slice.split_from_end(mem::size_of::<u64>());
let mut sstable_len_bytes = sstable_len_bytes.read_bytes()?;
let sstable_len = u64::deserialize(&mut sstable_len_bytes)?;
let (column_data, sstable) =
file_slice_without_sstable_len.split_from_end(sstable_len as usize);
let column_dictionary = Dictionary::open(sstable)?;
Ok(ColumnarReader {
column_dictionary,
column_data,
})
}
pub fn read_columns(
&self,
field_name: &str,
) -> io::Result<Vec<(ColumnTypeAndCardinality, Range<u64>)>> {
let mut start_key = field_name.to_string();
start_key.push('\0');
let mut end_key = field_name.to_string();
end_key.push(1u8 as char);
let mut stream = self
.column_dictionary
.range()
.ge(start_key.as_bytes())
.lt(end_key.as_bytes())
.into_stream()?;
let mut results = Vec::new();
while stream.advance() {
let key_bytes: &[u8] = stream.key();
if !key_bytes.starts_with(start_key.as_bytes()) {
return Err(io_invalid_data(format!("Invalid key found. {key_bytes:?}")));
}
let column_code: u8 = key_bytes.last().cloned().unwrap();
let column_type_and_cardinality = ColumnTypeAndCardinality::try_from_code(column_code)
.ok_or_else(|| io_invalid_data(format!("Unknown column code `{column_code}`")))?;
let range = stream.value().clone();
results.push((column_type_and_cardinality, range));
}
Ok(results)
}
}

View File

@@ -0,0 +1,39 @@
use std::io;
use std::io::Write;
use std::ops::Range;
use common::CountingWriter;
use sstable::value::RangeWriter;
use sstable::SSTableRange;
pub struct ColumnarSerializer<W: io::Write> {
wrt: CountingWriter<W>,
sstable_range: sstable::Writer<Vec<u8>, RangeWriter>,
}
impl<W: io::Write> ColumnarSerializer<W> {
pub fn new(wrt: W) -> ColumnarSerializer<W> {
let sstable_range: sstable::Writer<Vec<u8>, RangeWriter> =
sstable::Dictionary::<SSTableRange>::builder(Vec::with_capacity(100_000)).unwrap();
ColumnarSerializer {
wrt: CountingWriter::wrap(wrt),
sstable_range,
}
}
pub fn record_column_offsets(&mut self, key: &[u8], byte_range: Range<u64>) -> io::Result<()> {
self.sstable_range.insert(key, &byte_range)
}
pub fn wrt(&mut self) -> &mut CountingWriter<W> {
&mut self.wrt
}
pub fn finalize(mut self) -> io::Result<()> {
let sstable_bytes: Vec<u8> = self.sstable_range.finish()?;
let sstable_num_bytes: u64 = sstable_bytes.len() as u64;
self.wrt.write_all(&sstable_bytes)?;
self.wrt.write_all(&sstable_num_bytes.to_le_bytes()[..])?;
Ok(())
}
}

123
columnar/src/value.rs Normal file
View File

@@ -0,0 +1,123 @@
use ordered_float::NotNan;
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum NumericalValue {
I64(i64),
U64(u64),
F64(NotNan<f64>),
}
impl From<u64> for NumericalValue {
fn from(val: u64) -> NumericalValue {
NumericalValue::U64(val)
}
}
impl From<i64> for NumericalValue {
fn from(val: i64) -> Self {
NumericalValue::I64(val)
}
}
impl From<NotNan<f64>> for NumericalValue {
fn from(val: NotNan<f64>) -> Self {
NumericalValue::F64(val)
}
}
impl NumericalValue {
pub fn numerical_type(&self) -> NumericalType {
match self {
NumericalValue::F64(_) => NumericalType::F64,
NumericalValue::I64(_) => NumericalType::I64,
NumericalValue::U64(_) => NumericalType::U64,
}
}
}
impl Eq for NumericalValue {}
#[derive(Clone, Copy, Debug, Default, Hash, Eq, PartialEq)]
#[repr(u8)]
pub enum NumericalType {
#[default]
I64 = 0,
U64 = 1,
F64 = 2,
}
impl NumericalType {
pub fn to_code(self) -> u8 {
self as u8
}
pub fn try_from_code(code: u8) -> Option<NumericalType> {
match code {
0 => Some(NumericalType::I64),
1 => Some(NumericalType::U64),
2 => Some(NumericalType::F64),
_ => None,
}
}
}
/// We voluntarily avoid using `Into` here to keep this
/// implementation quirk as private as possible.
///
/// This coercion trait actually panics if it is used
/// to convert a loose types to a stricter type.
///
/// The level is strictness is somewhat arbitrary.
/// - i64
/// - u64
/// - f64.
pub(crate) trait Coerce {
fn coerce(numerical_value: NumericalValue) -> Self;
}
impl Coerce for i64 {
fn coerce(value: NumericalValue) -> Self {
match value {
NumericalValue::I64(val) => val,
NumericalValue::U64(val) => val as i64,
NumericalValue::F64(_) => unreachable!(),
}
}
}
impl Coerce for u64 {
fn coerce(value: NumericalValue) -> Self {
match value {
NumericalValue::I64(val) => val as u64,
NumericalValue::U64(val) => val,
NumericalValue::F64(_) => unreachable!(),
}
}
}
impl Coerce for NotNan<f64> {
fn coerce(value: NumericalValue) -> Self {
match value {
NumericalValue::I64(val) => unsafe { NotNan::new_unchecked(val as f64) },
NumericalValue::U64(val) => unsafe { NotNan::new_unchecked(val as f64) },
NumericalValue::F64(val) => val,
}
}
}
#[cfg(test)]
mod tests {
use super::NumericalType;
#[test]
fn test_numerical_type_code() {
let mut num_numerical_type = 0;
for code in u8::MIN..=u8::MAX {
if let Some(numerical_type) = NumericalType::try_from_code(code) {
assert_eq!(numerical_type.to_code(), code);
num_numerical_type += 1;
}
}
assert_eq!(num_numerical_type, 3);
}
}

View File

@@ -0,0 +1,321 @@
use std::fmt;
use std::num::NonZeroU8;
use ordered_float::NotNan;
use thiserror::Error;
use crate::dictionary::UnorderedId;
use crate::value::NumericalValue;
use crate::DocId;
/// When we build a columnar dataframe, we first just group
/// all mutations per column, and append them in append-only object.
///
/// We represents all of these operations as `ColumnOperation`.
#[derive(Eq, PartialEq, Debug, Clone, Copy)]
pub(crate) enum ColumnOperation<T> {
NewDoc(DocId),
Value(T),
}
impl<T> From<T> for ColumnOperation<T> {
fn from(value: T) -> Self {
ColumnOperation::Value(value)
}
}
#[allow(clippy::from_over_into)]
pub(crate) trait SymbolValue: Into<MiniBuffer> + Clone + Copy + fmt::Debug {
fn deserialize(header: NonZeroU8, bytes: &mut &[u8]) -> Result<Self, ParseError>;
}
pub(crate) struct MiniBuffer {
pub bytes: [u8; 9],
pub len: usize,
}
impl MiniBuffer {
pub fn as_slice(&self) -> &[u8] {
&self.bytes[..self.len]
}
}
fn compute_header_byte(typ: SymbolType, len: usize) -> u8 {
assert!(len <= 9);
(len << 4) as u8 | typ as u8
}
impl SymbolValue for NumericalValue {
fn deserialize(header_byte: NonZeroU8, bytes: &mut &[u8]) -> Result<Self, ParseError> {
let (typ, len) = parse_header_byte(header_byte)?;
let value_bytes: &[u8];
(value_bytes, *bytes) = bytes.split_at(len);
let symbol: NumericalValue = match typ {
SymbolType::U64 => {
let mut octet: [u8; 8] = [0u8; 8];
octet[..value_bytes.len()].copy_from_slice(value_bytes);
let val: u64 = u64::from_le_bytes(octet);
NumericalValue::U64(val)
}
SymbolType::I64 => {
let mut octet: [u8; 8] = [0u8; 8];
octet[..value_bytes.len()].copy_from_slice(value_bytes);
let encoded: u64 = u64::from_le_bytes(octet);
let val: i64 = decode_zig_zag(encoded);
NumericalValue::I64(val)
}
SymbolType::Float => {
let octet: [u8; 8] =
value_bytes.try_into().map_err(|_| ParseError::InvalidLen {
typ: SymbolType::Float,
len,
})?;
let val_possibly_nan = f64::from_le_bytes(octet);
let val_not_nan = NotNan::new(val_possibly_nan)
.map_err(|_| ParseError::NaN)?;
NumericalValue::F64(val_not_nan)
}
};
Ok(symbol)
}
}
#[allow(clippy::from_over_into)]
impl Into<MiniBuffer> for NumericalValue {
fn into(self) -> MiniBuffer {
let mut bytes = [0u8; 9];
match self {
NumericalValue::F64(val) => {
let len = 8;
let header_byte = compute_header_byte(SymbolType::Float, len);
bytes[0] = header_byte;
bytes[1..].copy_from_slice(&val.to_le_bytes());
MiniBuffer {
bytes,
len: len + 1,
}
}
NumericalValue::U64(val) => {
let len = compute_num_bytes_for_u64(val);
let header_byte = compute_header_byte(SymbolType::U64, len);
bytes[0] = header_byte;
bytes[1..].copy_from_slice(&val.to_le_bytes());
MiniBuffer {
bytes,
len: len + 1,
}
}
NumericalValue::I64(val) => {
let encoded = encode_zig_zag(val);
let len = compute_num_bytes_for_u64(encoded);
let header_byte = compute_header_byte(SymbolType::I64, len);
bytes[0] = header_byte;
bytes[1..].copy_from_slice(&encoded.to_le_bytes());
MiniBuffer {
bytes,
len: len + 1,
}
}
}
}
}
#[allow(clippy::from_over_into)]
impl Into<MiniBuffer> for UnorderedId {
fn into(self) -> MiniBuffer {
let mut bytes = [0u8; 9];
let val = self.0 as u64;
let len = compute_num_bytes_for_u64(val) + 1;
bytes[0] = len as u8;
bytes[1..].copy_from_slice(&val.to_le_bytes());
MiniBuffer { bytes, len }
}
}
impl SymbolValue for UnorderedId {
fn deserialize(header: NonZeroU8, bytes: &mut &[u8]) -> Result<UnorderedId, ParseError> {
let len = header.get() as usize;
let symbol_bytes: &[u8];
(symbol_bytes, *bytes) = bytes.split_at(len);
let mut value_bytes = [0u8; 4];
value_bytes[..len - 1].copy_from_slice(&symbol_bytes[1..]);
let value = u32::from_le_bytes(value_bytes);
Ok(UnorderedId(value))
}
}
const HEADER_MASK: u8 = (1u8 << 4) - 1u8;
fn compute_num_bytes_for_u64(val: u64) -> usize {
let msb = (64u32 - val.leading_zeros()) as usize;
(msb + 7) / 8
}
fn parse_header_byte(byte: NonZeroU8) -> Result<(SymbolType, usize), ParseError> {
let len = (byte.get() as usize) >> 4;
let typ_code = byte.get() & HEADER_MASK;
let typ = SymbolType::try_from(typ_code)?;
Ok((typ, len))
}
#[derive(Error, Debug)]
pub enum ParseError {
#[error("Type byte unknown `{0}`")]
UnknownType(u8),
#[error("Invalid len for type `{len}` for type `{typ:?}`.")]
InvalidLen { typ: SymbolType, len: usize },
#[error("Missing bytes.")]
MissingBytes,
#[error("Not a number value.")]
NaN,
}
impl<V: SymbolValue> ColumnOperation<V> {
pub fn serialize(self) -> MiniBuffer {
match self {
ColumnOperation::NewDoc(doc) => {
let mut minibuf: [u8; 9] = [0u8; 9];
minibuf[0] = 0u8;
minibuf[1..5].copy_from_slice(&doc.to_le_bytes());
MiniBuffer {
bytes: minibuf,
len: 5,
}
}
ColumnOperation::Value(val) => val.into(),
}
}
pub fn deserialize(bytes: &mut &[u8]) -> Result<Self, ParseError> {
if bytes.is_empty() {
return Err(ParseError::MissingBytes);
}
let header_byte = bytes[0];
*bytes = &bytes[1..];
if let Some(header_byte) = NonZeroU8::new(header_byte) {
let value = V::deserialize(header_byte, bytes)?;
Ok(ColumnOperation::Value(value))
} else {
let doc_bytes: &[u8];
(doc_bytes, *bytes) = bytes.split_at(4);
let doc: u32 =
u32::from_le_bytes(doc_bytes.try_into().map_err(|_| ParseError::MissingBytes)?);
Ok(ColumnOperation::NewDoc(doc))
}
}
}
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
#[repr(u8)]
pub enum SymbolType {
U64 = 1u8,
I64 = 2u8,
Float = 3u8,
}
impl TryFrom<u8> for SymbolType {
type Error = ParseError;
fn try_from(byte: u8) -> Result<Self, ParseError> {
match byte {
1u8 => Ok(SymbolType::U64),
2u8 => Ok(SymbolType::I64),
3u8 => Ok(SymbolType::Float),
_ => Err(ParseError::UnknownType(byte)),
}
}
}
fn encode_zig_zag(n: i64) -> u64 {
((n << 1) ^ (n >> 63)) as u64
}
fn decode_zig_zag(n: u64) -> i64 {
((n >> 1) as i64) ^ (-((n & 1) as i64))
}
#[cfg(test)]
mod tests {
use super::{SymbolType, *};
#[track_caller]
fn test_zig_zag_aux(val: i64) {
let encoded = super::encode_zig_zag(val);
assert_eq!(decode_zig_zag(encoded), val);
if let Some(abs_val) = val.checked_abs() {
let abs_val = abs_val as u64;
assert!(encoded <= abs_val * 2);
}
}
#[test]
fn test_zig_zag() {
assert_eq!(encode_zig_zag(0i64), 0u64);
assert_eq!(encode_zig_zag(-1i64), 1u64);
assert_eq!(encode_zig_zag(1i64), 2u64);
test_zig_zag_aux(0i64);
test_zig_zag_aux(i64::MIN);
test_zig_zag_aux(i64::MAX);
}
use proptest::prelude::any;
use proptest::proptest;
proptest! {
#[test]
fn test_proptest_zig_zag(val in any::<i64>()) {
test_zig_zag_aux(val);
}
}
#[track_caller]
fn ser_deser_header_byte_aux(symbol_type: SymbolType, len: usize) {
let header_byte = compute_header_byte(symbol_type, len);
let (serdeser_numerical_type, serdeser_len) =
parse_header_byte(NonZeroU8::new(header_byte).unwrap()).unwrap();
assert_eq!(symbol_type, serdeser_numerical_type);
assert_eq!(len, serdeser_len);
}
#[test]
fn test_header_byte_serialization() {
for len in 1..9 {
ser_deser_header_byte_aux(SymbolType::Float, len);
ser_deser_header_byte_aux(SymbolType::I64, len);
ser_deser_header_byte_aux(SymbolType::U64, len);
}
}
#[track_caller]
fn ser_deser_symbol(symbol: ColumnOperation<NumericalValue>) {
let buf = symbol.serialize();
let mut bytes = &buf.bytes[..];
let serdeser_symbol = ColumnOperation::deserialize(&mut bytes).unwrap();
assert_eq!(bytes.len() + buf.len, buf.bytes.len());
assert_eq!(symbol, serdeser_symbol);
}
#[test]
fn test_compute_num_bytes_for_u64() {
assert_eq!(compute_num_bytes_for_u64(0), 0);
assert_eq!(compute_num_bytes_for_u64(1), 1);
assert_eq!(compute_num_bytes_for_u64(255), 1);
assert_eq!(compute_num_bytes_for_u64(256), 2);
assert_eq!(compute_num_bytes_for_u64((1 << 16) - 1), 2);
assert_eq!(compute_num_bytes_for_u64(1 << 16), 3);
}
#[test]
fn test_symbol_serialization() {
ser_deser_symbol(ColumnOperation::NewDoc(0));
ser_deser_symbol(ColumnOperation::NewDoc(3));
ser_deser_symbol(ColumnOperation::Value(NumericalValue::I64(0i64)));
ser_deser_symbol(ColumnOperation::Value(NumericalValue::I64(1i64)));
ser_deser_symbol(ColumnOperation::Value(NumericalValue::U64(257u64)));
ser_deser_symbol(ColumnOperation::Value(NumericalValue::I64(-257i64)));
ser_deser_symbol(ColumnOperation::Value(NumericalValue::I64(i64::MIN)));
ser_deser_symbol(ColumnOperation::Value(NumericalValue::U64(0u64)));
ser_deser_symbol(ColumnOperation::Value(NumericalValue::U64(u64::MIN)));
ser_deser_symbol(ColumnOperation::Value(NumericalValue::U64(u64::MAX)));
}
}

675
columnar/src/writer/mod.rs Normal file
View File

@@ -0,0 +1,675 @@
mod column_operation;
mod value_index;
use std::io::{self, Write};
use column_operation::ColumnOperation;
use common::CountingWriter;
use fastfield_codecs::serialize::ValueIndexInfo;
use fastfield_codecs::{Column, MonotonicallyMappableToU64, VecColumn};
use ordered_float::NotNan;
use stacker::{Addr, ArenaHashMap, ExpUnrolledLinkedList, MemoryArena};
use crate::column_type_header::{ColumnType, ColumnTypeAndCardinality};
use crate::dictionary::{DictionaryBuilder, IdMapping, UnorderedId};
use crate::value::{Coerce, NumericalType, NumericalValue};
use crate::writer::column_operation::SymbolValue;
use crate::writer::value_index::{IndexBuilder, SpareIndexBuilders};
use crate::{Cardinality, ColumnarSerializer, DocId};
#[derive(Copy, Clone, Default)]
struct ColumnWriter {
// Detected cardinality of the column so far.
cardinality: Cardinality,
// Last document inserted.
// None if no doc has been added yet.
last_doc_opt: Option<u32>,
// Buffer containing the serialized values.
values: ExpUnrolledLinkedList,
}
#[derive(Clone, Copy, Default)]
pub struct NumericalColumnWriter {
compatible_numerical_types: CompatibleNumericalTypes,
column_writer: ColumnWriter,
}
#[derive(Clone, Copy)]
struct CompatibleNumericalTypes {
all_values_within_i64_range: bool,
all_values_within_u64_range: bool,
}
impl Default for CompatibleNumericalTypes {
fn default() -> CompatibleNumericalTypes {
CompatibleNumericalTypes {
all_values_within_i64_range: true,
all_values_within_u64_range: true,
}
}
}
impl CompatibleNumericalTypes {
pub fn accept_value(&mut self, numerical_value: NumericalValue) {
match numerical_value {
NumericalValue::I64(val_i64) => {
let value_within_u64_range = val_i64 >= 0i64;
self.all_values_within_u64_range &= value_within_u64_range;
}
NumericalValue::U64(val_u64) => {
let value_within_i64_range = val_u64 < i64::MAX as u64;
self.all_values_within_i64_range &= value_within_i64_range;
}
NumericalValue::F64(_) => {
self.all_values_within_i64_range = false;
self.all_values_within_u64_range = false;
}
}
}
pub fn to_numerical_type(self) -> NumericalType {
if self.all_values_within_i64_range {
NumericalType::I64
} else if self.all_values_within_u64_range {
NumericalType::U64
} else {
NumericalType::F64
}
}
}
impl NumericalColumnWriter {
pub fn record_numerical_value(
&mut self,
doc: DocId,
value: NumericalValue,
arena: &mut MemoryArena,
) {
self.compatible_numerical_types.accept_value(value);
self.column_writer.record(doc, value, arena);
}
}
impl ColumnWriter {
fn symbol_iterator<'a, V: SymbolValue>(
&self,
arena: &MemoryArena,
buffer: &'a mut Vec<u8>,
) -> impl Iterator<Item = ColumnOperation<V>> + 'a {
buffer.clear();
self.values.read_to_end(arena, buffer);
let mut cursor: &[u8] = &buffer[..];
std::iter::from_fn(move || {
if cursor.is_empty() {
return None;
}
let symbol = ColumnOperation::deserialize(&mut cursor)
.expect("Failed to deserialize symbol from in-memory. This should never happen.");
Some(symbol)
})
}
fn delta_with_last_doc(&self, doc: DocId) -> u32 {
self.last_doc_opt
.map(|last_doc| doc - last_doc)
.unwrap_or(doc + 1u32)
}
/// Records a change of the document being recorded.
///
/// This function will also update the cardinality of the column
/// if necessary.
fn record(&mut self, doc: DocId, value: NumericalValue, arena: &mut MemoryArena) {
// Difference between `doc` and the last doc.
match self.delta_with_last_doc(doc) {
0 => {
// This is the last encounterred document.
self.cardinality = Cardinality::Multivalued;
}
1 => {
self.last_doc_opt = Some(doc);
self.write_symbol::<NumericalValue>(ColumnOperation::NewDoc(doc), arena);
}
_ => {
self.cardinality = self.cardinality.max(Cardinality::Optional);
self.last_doc_opt = Some(doc);
self.write_symbol::<NumericalValue>(ColumnOperation::NewDoc(doc), arena);
}
}
self.write_symbol(ColumnOperation::Value(value), arena);
}
// Get the cardinality.
// The overall number of docs in the column is necessary to
// deal with the case where the all docs contain 1 value, except some documents
// at the end of the column.
fn get_cardinality(&self, num_docs: DocId) -> Cardinality {
if self.delta_with_last_doc(num_docs) > 1 {
self.cardinality.max(Cardinality::Optional)
} else {
self.cardinality
}
}
fn write_symbol<V: SymbolValue>(
&mut self,
symbol: ColumnOperation<V>,
arena: &mut MemoryArena,
) {
self.values
.writer(arena)
.extend_from_slice(symbol.serialize().as_slice());
}
}
#[derive(Copy, Clone, Default)]
pub struct BytesColumnWriter {
dictionary_id: u32,
column_writer: ColumnWriter,
}
impl BytesColumnWriter {
pub fn with_dictionary_id(dictionary_id: u32) -> BytesColumnWriter {
BytesColumnWriter {
dictionary_id,
column_writer: Default::default(),
}
}
pub fn record_bytes(
&mut self,
doc: DocId,
bytes: &[u8],
dictionaries: &mut [DictionaryBuilder],
arena: &mut MemoryArena,
) {
let unordered_id = dictionaries[self.dictionary_id as usize].get_or_allocate_id(bytes);
let numerical_value = NumericalValue::U64(unordered_id.0 as u64);
self.column_writer.record(doc, numerical_value, arena);
}
}
pub struct ColumnarWriter {
numerical_field_hash_map: ArenaHashMap,
bytes_field_hash_map: ArenaHashMap,
arena: MemoryArena,
// Dictionaries used to store dictionary-encoded values.
dictionaries: Vec<DictionaryBuilder>,
buffers: SpareBuffers,
}
#[derive(Default)]
struct SpareBuffers {
byte_buffer: Vec<u8>,
value_index_builders: SpareIndexBuilders,
i64_values: Vec<i64>,
u64_values: Vec<u64>,
f64_values: Vec<ordered_float::NotNan<f64>>,
}
impl Default for ColumnarWriter {
fn default() -> Self {
ColumnarWriter {
numerical_field_hash_map: ArenaHashMap::new(10_000),
bytes_field_hash_map: ArenaHashMap::new(10_000),
dictionaries: Vec::new(),
arena: MemoryArena::default(),
buffers: SpareBuffers::default(),
}
}
}
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Debug)]
enum BytesOrNumerical {
Bytes,
Numerical,
}
impl ColumnarWriter {
pub fn record_numerical(&mut self, doc: DocId, key: &[u8], numerical_value: NumericalValue) {
let (hash_map, arena) = (&mut self.numerical_field_hash_map, &mut self.arena);
hash_map.mutate_or_create(key, |column_opt: Option<NumericalColumnWriter>| {
let mut column: NumericalColumnWriter = column_opt.unwrap_or_default();
column.record_numerical_value(doc, numerical_value, arena);
column
});
}
pub fn record_bytes(&mut self, doc: DocId, key: &[u8], value: &[u8]) {
let (hash_map, arena, dictionaries) = (
&mut self.bytes_field_hash_map,
&mut self.arena,
&mut self.dictionaries,
);
hash_map.mutate_or_create(key, |column_opt: Option<BytesColumnWriter>| {
let mut column: BytesColumnWriter = column_opt.unwrap_or_else(|| {
let dictionary_id = dictionaries.len() as u32;
dictionaries.push(DictionaryBuilder::default());
BytesColumnWriter::with_dictionary_id(dictionary_id)
});
column.record_bytes(doc, value, dictionaries, arena);
column
});
}
pub fn serialize<W: io::Write>(
&mut self,
num_docs: DocId,
mut serializer: ColumnarSerializer<W>,
) -> io::Result<()> {
let mut field_columns: Vec<(&[u8], BytesOrNumerical, Addr)> = self
.numerical_field_hash_map
.iter()
.map(|(term, addr, _)| (term, BytesOrNumerical::Numerical, addr))
.collect();
field_columns.extend(
self.bytes_field_hash_map
.iter()
.map(|(term, addr, _)| (term, BytesOrNumerical::Bytes, addr)),
);
let mut key_buffer = Vec::new();
field_columns.sort_unstable_by_key(|(key, col_type, _)| (*key, *col_type));
let (arena, buffers, dictionaries) = (&self.arena, &mut self.buffers, &self.dictionaries);
for (key, bytes_or_numerical, addr) in field_columns {
let wrt = serializer.wrt();
let start_offset = wrt.written_bytes();
let column_type_and_cardinality: ColumnTypeAndCardinality =
match bytes_or_numerical {
BytesOrNumerical::Bytes => {
let BytesColumnWriter { dictionary_id, column_writer } =
self.bytes_field_hash_map.read(addr);
let dictionary_builder =
&dictionaries[dictionary_id as usize];
serialize_bytes_column(
&column_writer,
num_docs,
dictionary_builder,
arena,
buffers,
wrt,
)?;
ColumnTypeAndCardinality {
cardinality: column_writer.get_cardinality(num_docs),
typ: ColumnType::Bytes,
}
}
BytesOrNumerical::Numerical => {
let NumericalColumnWriter { compatible_numerical_types, column_writer } =
self.numerical_field_hash_map.read(addr);
let cardinality = column_writer.get_cardinality(num_docs);
let numerical_type = compatible_numerical_types.to_numerical_type();
serialize_numerical_column(
cardinality,
numerical_type,
&column_writer,
num_docs,
arena,
buffers,
wrt,
)?;
ColumnTypeAndCardinality {
cardinality,
typ: ColumnType::Numerical(numerical_type),
}
}
};
let end_offset = wrt.written_bytes();
let key_with_type = prepare_key(key, column_type_and_cardinality, &mut key_buffer);
serializer.record_column_offsets(key_with_type, start_offset..end_offset)?;
}
serializer.finalize()?;
Ok(())
}
}
/// Returns a key consisting of the concatenation of the key and the column_type_and_cardinality
/// code.
fn prepare_key<'a>(
key: &[u8],
column_type_cardinality: ColumnTypeAndCardinality,
buffer: &'a mut Vec<u8>,
) -> &'a [u8] {
buffer.clear();
buffer.extend_from_slice(key);
buffer.push(0u8);
buffer.push(column_type_cardinality.to_code());
&buffer[..]
}
fn serialize_bytes_column<W: io::Write>(
column_writer: &ColumnWriter,
num_docs: DocId,
dictionary_builder: &DictionaryBuilder,
arena: &MemoryArena,
buffers: &mut SpareBuffers,
wrt: &mut CountingWriter<W>,
) -> io::Result<()> {
let start_offset = wrt.written_bytes();
let id_mapping: IdMapping = dictionary_builder.serialize(wrt)?;
let dictionary_num_bytes: u32 = (wrt.written_bytes() - start_offset) as u32;
let cardinality = column_writer.get_cardinality(num_docs);
let SpareBuffers {
byte_buffer,
value_index_builders,
u64_values,
..
} = buffers;
let symbol_iterator = column_writer
.symbol_iterator(arena, byte_buffer)
.map(|symbol: ColumnOperation<UnorderedId>| {
// We map unordered ids to ordered ids.
match symbol {
ColumnOperation::Value(unordered_id) => {
let ordered_id = id_mapping.to_ord(unordered_id);
ColumnOperation::Value(ordered_id.0 as u64)
}
ColumnOperation::NewDoc(doc) => ColumnOperation::NewDoc(doc),
}
});
serialize_column(
symbol_iterator,
cardinality,
num_docs,
value_index_builders,
u64_values,
wrt,
)?;
wrt.write_all(&dictionary_num_bytes.to_le_bytes()[..])?;
Ok(())
}
fn serialize_numerical_column<W: io::Write>(
cardinality: Cardinality,
numerical_type: NumericalType,
column_writer: &ColumnWriter,
num_docs: DocId,
arena: &MemoryArena,
buffers: &mut SpareBuffers,
wrt: &mut W,
) -> io::Result<()> {
let SpareBuffers {
byte_buffer,
value_index_builders,
u64_values,
i64_values,
f64_values,
} = buffers;
let symbol_iterator = column_writer.symbol_iterator(arena, byte_buffer);
match numerical_type {
NumericalType::I64 => {
serialize_column(
coerce_numerical_symbol::<i64>(symbol_iterator),
cardinality,
num_docs,
value_index_builders,
i64_values,
wrt,
)?;
}
NumericalType::U64 => {
serialize_column(
coerce_numerical_symbol::<u64>(symbol_iterator),
cardinality,
num_docs,
value_index_builders,
u64_values,
wrt,
)?;
}
NumericalType::F64 => {
serialize_column(
coerce_numerical_symbol::<NotNan<f64>>(symbol_iterator),
cardinality,
num_docs,
value_index_builders,
f64_values,
wrt,
)?;
}
};
Ok(())
}
fn serialize_column<
T: Copy + Ord + Default + Send + Sync + MonotonicallyMappableToU64,
W: io::Write,
>(
symbol_iterator: impl Iterator<Item = ColumnOperation<T>>,
cardinality: Cardinality,
num_docs: DocId,
value_index_builders: &mut SpareIndexBuilders,
values: &mut Vec<T>,
wrt: &mut W,
) -> io::Result<()>
where
for<'a> VecColumn<'a, T>: Column<T>,
{
match cardinality {
Cardinality::Required => {
consume_symbol_iterator(
symbol_iterator,
value_index_builders.borrow_required_index_builder(),
values,
);
fastfield_codecs::serialize(
VecColumn::from(&values[..]),
wrt,
&fastfield_codecs::ALL_CODEC_TYPES[..],
)?;
}
Cardinality::Optional => {
let optional_index_builder = value_index_builders.borrow_optional_index_builder();
consume_symbol_iterator(symbol_iterator, optional_index_builder, values);
let optional_index = optional_index_builder.finish(num_docs);
fastfield_codecs::serialize::serialize_new(
ValueIndexInfo::SingleValue(Box::new(optional_index)),
VecColumn::from(&values[..]),
wrt,
&fastfield_codecs::ALL_CODEC_TYPES[..],
)?;
}
Cardinality::Multivalued => {
let multivalued_index_builder = value_index_builders.borrow_multivalued_index_builder();
consume_symbol_iterator(symbol_iterator, multivalued_index_builder, values);
let multivalued_index = multivalued_index_builder.finish(num_docs);
fastfield_codecs::serialize::serialize_new(
ValueIndexInfo::MultiValue(Box::new(multivalued_index)),
VecColumn::from(&values[..]),
wrt,
&fastfield_codecs::ALL_CODEC_TYPES[..],
)?;
}
}
Ok(())
}
fn coerce_numerical_symbol<T>(
symbol_iterator: impl Iterator<Item = ColumnOperation<NumericalValue>>,
) -> impl Iterator<Item = ColumnOperation<T>>
where T: Coerce {
symbol_iterator.map(|symbol| match symbol {
ColumnOperation::NewDoc(doc) => ColumnOperation::NewDoc(doc),
ColumnOperation::Value(numerical_value) => {
ColumnOperation::Value(Coerce::coerce(numerical_value))
}
})
}
fn consume_symbol_iterator<T, TIndexBuilder: IndexBuilder>(
symbol_iterator: impl Iterator<Item = ColumnOperation<T>>,
index_builder: &mut TIndexBuilder,
values: &mut Vec<T>,
) {
for symbol in symbol_iterator {
match symbol {
ColumnOperation::NewDoc(doc) => {
index_builder.record_doc(doc);
}
ColumnOperation::Value(value) => {
index_builder.record_value();
values.push(value);
}
}
}
}
#[cfg(test)]
mod tests {
use ordered_float::NotNan;
use stacker::MemoryArena;
use super::prepare_key;
use crate::column_type_header::{ColumnType, ColumnTypeAndCardinality};
use crate::value::{NumericalType, NumericalValue};
use crate::writer::column_operation::ColumnOperation;
use crate::writer::CompatibleNumericalTypes;
use crate::Cardinality;
#[test]
fn test_prepare_key_bytes() {
let mut buffer: Vec<u8> = b"somegarbage".to_vec();
let column_type_and_cardinality = ColumnTypeAndCardinality {
typ: ColumnType::Bytes,
cardinality: Cardinality::Optional,
};
let prepared_key = prepare_key(b"root\0child", column_type_and_cardinality, &mut buffer);
assert_eq!(prepared_key.len(), 12);
assert_eq!(&prepared_key[..10], b"root\0child");
assert_eq!(prepared_key[10], 0u8);
assert_eq!(prepared_key[11], column_type_and_cardinality.to_code());
}
#[test]
fn test_column_writer_required_simple() {
let mut arena = MemoryArena::default();
let mut column_writer = super::ColumnWriter::default();
column_writer.record(0u32, 14i64.into(), &mut arena);
column_writer.record(1u32, 15i64.into(), &mut arena);
column_writer.record(2u32, (-16i64).into(), &mut arena);
assert_eq!(column_writer.get_cardinality(3), Cardinality::Required);
let mut buffer = Vec::new();
let symbols: Vec<ColumnOperation<NumericalValue>> = column_writer
.symbol_iterator(&mut arena, &mut buffer)
.collect();
assert_eq!(symbols.len(), 6);
assert!(matches!(symbols[0], ColumnOperation::NewDoc(0u32)));
assert!(matches!(
symbols[1],
ColumnOperation::Value(NumericalValue::I64(14i64))
));
assert!(matches!(symbols[2], ColumnOperation::NewDoc(1u32)));
assert!(matches!(
symbols[3],
ColumnOperation::Value(NumericalValue::I64(15i64))
));
assert!(matches!(symbols[4], ColumnOperation::NewDoc(2u32)));
assert!(matches!(
symbols[5],
ColumnOperation::Value(NumericalValue::I64(-16i64))
));
}
#[test]
fn test_column_writer_optional_cardinality_missing_first() {
let mut arena = MemoryArena::default();
let mut column_writer = super::ColumnWriter::default();
column_writer.record(1u32, 15i64.into(), &mut arena);
column_writer.record(2u32, (-16i64).into(), &mut arena);
assert_eq!(column_writer.get_cardinality(3), Cardinality::Optional);
let mut buffer = Vec::new();
let symbols: Vec<ColumnOperation<NumericalValue>> = column_writer
.symbol_iterator(&mut arena, &mut buffer)
.collect();
assert_eq!(symbols.len(), 4);
assert!(matches!(symbols[0], ColumnOperation::NewDoc(1u32)));
assert!(matches!(
symbols[1],
ColumnOperation::Value(NumericalValue::I64(15i64))
));
assert!(matches!(symbols[2], ColumnOperation::NewDoc(2u32)));
assert!(matches!(
symbols[3],
ColumnOperation::Value(NumericalValue::I64(-16i64))
));
}
#[test]
fn test_column_writer_optional_cardinality_missing_last() {
let mut arena = MemoryArena::default();
let mut column_writer = super::ColumnWriter::default();
column_writer.record(0u32, 15i64.into(), &mut arena);
assert_eq!(column_writer.get_cardinality(2), Cardinality::Optional);
let mut buffer = Vec::new();
let symbols: Vec<ColumnOperation<NumericalValue>> = column_writer
.symbol_iterator(&mut arena, &mut buffer)
.collect();
assert_eq!(symbols.len(), 2);
assert!(matches!(symbols[0], ColumnOperation::NewDoc(0u32)));
assert!(matches!(
symbols[1],
ColumnOperation::Value(NumericalValue::I64(15i64))
));
}
#[test]
fn test_column_writer_multivalued() {
let mut arena = MemoryArena::default();
let mut column_writer = super::ColumnWriter::default();
column_writer.record(0u32, 16i64.into(), &mut arena);
column_writer.record(0u32, 17i64.into(), &mut arena);
assert_eq!(column_writer.get_cardinality(1), Cardinality::Multivalued);
let mut buffer = Vec::new();
let symbols: Vec<ColumnOperation<NumericalValue>> = column_writer
.symbol_iterator(&mut arena, &mut buffer)
.collect();
assert_eq!(symbols.len(), 3);
assert!(matches!(symbols[0], ColumnOperation::NewDoc(0u32)));
assert!(matches!(
symbols[1],
ColumnOperation::Value(NumericalValue::I64(16i64))
));
assert!(matches!(
symbols[2],
ColumnOperation::Value(NumericalValue::I64(17i64))
));
}
#[track_caller]
fn test_column_writer_coercion_iter_aux(
values: impl Iterator<Item = NumericalValue>,
expected_numerical_type: NumericalType,
) {
let mut compatible_numerical_types = CompatibleNumericalTypes::default();
for value in values {
compatible_numerical_types.accept_value(value);
}
assert_eq!(
compatible_numerical_types.to_numerical_type(),
expected_numerical_type
);
}
#[track_caller]
fn test_column_writer_coercion_aux(
values: &[NumericalValue],
expected_numerical_type: NumericalType,
) {
test_column_writer_coercion_iter_aux(values.iter().copied(), expected_numerical_type);
test_column_writer_coercion_iter_aux(values.iter().rev().copied(), expected_numerical_type);
}
#[test]
fn test_column_writer_coercion() {
test_column_writer_coercion_aux(&[], NumericalType::I64);
test_column_writer_coercion_aux(&[1i64.into()], NumericalType::I64);
test_column_writer_coercion_aux(&[1u64.into()], NumericalType::I64);
// We don't detect exact integer at the moment. We could!
test_column_writer_coercion_aux(&[NotNan::new(1f64).unwrap().into()], NumericalType::F64);
test_column_writer_coercion_aux(&[u64::MAX.into()], NumericalType::U64);
test_column_writer_coercion_aux(&[(i64::MAX as u64).into()], NumericalType::U64);
test_column_writer_coercion_aux(&[(1u64 << 63).into()], NumericalType::U64);
test_column_writer_coercion_aux(&[1i64.into(), 1u64.into()], NumericalType::I64);
test_column_writer_coercion_aux(&[u64::MAX.into(), (-1i64).into()], NumericalType::F64);
}
}

View File

@@ -0,0 +1,218 @@
use fastfield_codecs::serialize::{MultiValueIndexInfo, SingleValueIndexInfo};
use crate::DocId;
/// The `IndexBuilder` interprets a sequence of
/// calls of the form:
/// (record_doc,record_value+)*
/// and can then serialize the results into an index.
///
/// It has different implementation depending on whether the
/// cardinality is required, optional, or multivalued.
pub(crate) trait IndexBuilder {
fn record_doc(&mut self, doc: DocId);
#[inline]
fn record_value(&mut self) {}
}
/// The RequiredIndexBuilder does nothing.
#[derive(Default)]
pub struct RequiredIndexBuilder;
impl IndexBuilder for RequiredIndexBuilder {
#[inline(always)]
fn record_doc(&mut self, _doc: DocId) {}
}
#[derive(Default)]
pub struct OptionalIndexBuilder {
docs: Vec<DocId>,
}
struct SingleValueArrayIndex<'a> {
docs: &'a [DocId],
num_docs: DocId,
}
impl<'a> SingleValueIndexInfo for SingleValueArrayIndex<'a> {
fn num_vals(&self) -> u32 {
self.num_docs as u32
}
fn num_non_nulls(&self) -> u32 {
self.docs.len() as u32
}
fn iter(&self) -> Box<dyn Iterator<Item = u32> + '_> {
Box::new(self.docs.iter().copied())
}
}
impl OptionalIndexBuilder {
pub fn finish(&mut self, num_docs: DocId) -> impl SingleValueIndexInfo + '_ {
debug_assert!(self
.docs
.last()
.copied()
.map(|last_doc| last_doc < num_docs)
.unwrap_or(true));
SingleValueArrayIndex {
docs: &self.docs[..],
num_docs,
}
}
fn reset(&mut self) {
self.docs.clear();
}
}
impl IndexBuilder for OptionalIndexBuilder {
#[inline(always)]
fn record_doc(&mut self, doc: DocId) {
debug_assert!(self
.docs
.last()
.copied()
.map(|prev_doc| doc > prev_doc)
.unwrap_or(true));
self.docs.push(doc);
}
}
#[derive(Default)]
pub struct MultivaluedIndexBuilder {
// TODO should we switch to `start_offset`?
end_values: Vec<DocId>,
total_num_vals_seen: u32,
}
pub struct MultivaluedValueArrayIndex<'a> {
end_offsets: &'a [DocId],
}
impl<'a> MultiValueIndexInfo for MultivaluedValueArrayIndex<'a> {
fn num_docs(&self) -> u32 {
self.end_offsets.len() as u32
}
fn num_vals(&self) -> u32 {
self.end_offsets.last().copied().unwrap_or(0u32)
}
fn iter(&self) -> Box<dyn Iterator<Item = u32> + '_> {
if self.end_offsets.is_empty() {
return Box::new(std::iter::empty());
}
let n = self.end_offsets.len();
Box::new(std::iter::once(0u32).chain(self.end_offsets[..n - 1].iter().copied()))
}
}
impl MultivaluedIndexBuilder {
pub fn finish(&mut self, num_docs: DocId) -> impl MultiValueIndexInfo + '_ {
self.end_values
.resize(num_docs as usize, self.total_num_vals_seen);
MultivaluedValueArrayIndex {
end_offsets: &self.end_values[..],
}
}
fn reset(&mut self) {
self.end_values.clear();
self.total_num_vals_seen = 0;
}
}
impl IndexBuilder for MultivaluedIndexBuilder {
fn record_doc(&mut self, doc: DocId) {
self.end_values
.resize(doc as usize, self.total_num_vals_seen);
}
fn record_value(&mut self) {
self.total_num_vals_seen += 1;
}
}
/// The `SpareIndexBuilders` is there to avoid allocating a
/// new index builder for every single column.
#[derive(Default)]
pub struct SpareIndexBuilders {
required_index_builder: RequiredIndexBuilder,
optional_index_builder: OptionalIndexBuilder,
multivalued_index_builder: MultivaluedIndexBuilder,
}
impl SpareIndexBuilders {
pub fn borrow_required_index_builder(&mut self) -> &mut RequiredIndexBuilder {
&mut self.required_index_builder
}
pub fn borrow_optional_index_builder(&mut self) -> &mut OptionalIndexBuilder {
self.optional_index_builder.reset();
&mut self.optional_index_builder
}
pub fn borrow_multivalued_index_builder(&mut self) -> &mut MultivaluedIndexBuilder {
self.multivalued_index_builder.reset();
&mut self.multivalued_index_builder
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_optional_value_index_builder() {
let mut opt_value_index_builder = OptionalIndexBuilder::default();
opt_value_index_builder.record_doc(0u32);
opt_value_index_builder.record_value();
assert_eq!(
&opt_value_index_builder
.finish(1u32)
.iter()
.collect::<Vec<u32>>(),
&[0]
);
opt_value_index_builder.reset();
opt_value_index_builder.record_doc(1u32);
opt_value_index_builder.record_value();
assert_eq!(
&opt_value_index_builder
.finish(2u32)
.iter()
.collect::<Vec<u32>>(),
&[1]
);
}
#[test]
fn test_multivalued_value_index_builder() {
let mut multivalued_value_index_builder = MultivaluedIndexBuilder::default();
multivalued_value_index_builder.record_doc(1u32);
multivalued_value_index_builder.record_value();
multivalued_value_index_builder.record_value();
multivalued_value_index_builder.record_doc(2u32);
multivalued_value_index_builder.record_value();
assert_eq!(
multivalued_value_index_builder
.finish(4u32)
.iter()
.collect::<Vec<u32>>(),
vec![0, 0, 2, 3]
);
multivalued_value_index_builder.reset();
multivalued_value_index_builder.record_doc(2u32);
multivalued_value_index_builder.record_value();
multivalued_value_index_builder.record_value();
assert_eq!(
multivalued_value_index_builder
.finish(4u32)
.iter()
.collect::<Vec<u32>>(),
vec![0, 0, 0, 2]
);
}
}

View File

@@ -43,7 +43,7 @@ mod null_index_footer;
mod column;
mod gcd;
mod serialize;
pub mod serialize;
pub use ordered_float;