Compare commits

..

4 Commits

Author SHA1 Message Date
Pascal Seitz
375d1f9dac prepare for merge 2023-01-24 17:18:41 +08:00
Paul Masurel
2874554ee4 Removed the sorting logic that forced column type to be sorted like (#1816)
* Removed the sorting logic that forced column type to be sorted like
ColumnTypes.

* add comments

Co-authored-by: PSeitz <PSeitz@users.noreply.github.com>
2023-01-20 12:43:28 +01:00
PSeitz
cbc70a9eae Cargo.toml cleanup (#1817) 2023-01-20 12:30:35 +01:00
PSeitz
226d0f88bc add columnar to workspace (#1808) 2023-01-20 11:47:10 +01:00
21 changed files with 326 additions and 338 deletions

View File

@@ -55,6 +55,7 @@ measure_time = "0.8.2"
async-trait = "0.1.53"
arc-swap = "1.5.0"
columnar = { version="0.1", path="./columnar", package ="tantivy-columnar" }
sstable = { version="0.1", path="./sstable", package ="tantivy-sstable", optional = true }
stacker = { version="0.1", path="./stacker", package ="tantivy-stacker" }
tantivy-query-grammar = { version= "0.19.0", path="./query-grammar" }
@@ -107,7 +108,7 @@ unstable = [] # useful for benches.
quickwit = ["sstable"]
[workspace]
members = ["query-grammar", "bitpacker", "common", "fastfield_codecs", "ownedbytes", "stacker", "sstable", "tokenizer-api"]
members = ["query-grammar", "bitpacker", "common", "fastfield_codecs", "ownedbytes", "stacker", "sstable", "tokenizer-api", "columnar"]
# Following the "fail" crate best practices, we isolate
# tests that define specific behavior in fail checkpoints

View File

@@ -5,28 +5,23 @@ edition = "2021"
license = "MIT"
[dependencies]
itertools = "0.10.5"
log = "0.4.17"
fnv = "1.0.7"
fastdivide = "0.4.0"
rand = { version = "0.8.5", optional = true }
measure_time = { version = "0.8.2", optional = true }
prettytable-rs = { version = "0.10.0", optional = true }
stacker = { path = "../stacker", package="tantivy-stacker"}
serde_json = "1"
thiserror = "1"
fnv = "1"
sstable = { path = "../sstable", package = "tantivy-sstable" }
common = { path = "../common", package = "tantivy-common" }
itertools = "0.10"
log = "0.4"
tantivy-bitpacker = { version= "0.3", path = "../bitpacker/" }
prettytable-rs = {version="0.10.0", optional= true}
rand = {version="0.8.3", optional= true}
fastdivide = "0.4"
measure_time = { version="0.8.2", optional=true}
[dev-dependencies]
proptest = "1"
more-asserts = "0.3.0"
rand = "0.8.3"
# temporary
[workspace]
members = []
proptest = "1.0.0"
more-asserts = "0.3.1"
rand = "0.8.5"
[features]
unstable = []

View File

@@ -22,6 +22,9 @@ pub struct Column<T> {
}
impl<T: PartialOrd> Column<T> {
pub fn get_cardinality(&self) -> Cardinality {
self.idx.get_cardinality()
}
pub fn num_rows(&self) -> RowId {
match &self.idx {
ColumnIndex::Full => self.values.num_vals() as u32,

View File

@@ -5,8 +5,8 @@ use std::sync::Arc;
mod set;
mod set_block;
use common::{BinarySerializable, OwnedBytes, VInt};
pub use set::{Set, SetCodec, SelectCursor};
use common::{BinarySerializable, GroupByIteratorExtended, OwnedBytes, VInt};
pub use set::{Set, SetCodec};
use set_block::{
DenseBlock, DenseBlockCodec, SparseBlock, SparseBlockCodec, DENSE_BLOCK_NUM_BYTES,
};
@@ -115,59 +115,7 @@ fn row_addr_from_row_id(row_id: RowId) -> RowAddr {
}
}
enum BlockSelectCursor<'a> {
Dense(<DenseBlock<'a> as Set<u16>>::SelectCursor<'a>),
Sparse(<SparseBlock<'a> as Set<u16>>::SelectCursor<'a>),
}
impl<'a> BlockSelectCursor<'a> {
fn select(&mut self, rank: u16) -> u16 {
match self {
BlockSelectCursor::Dense(dense_select_cursor) => dense_select_cursor.select(rank),
BlockSelectCursor::Sparse(sparse_select_cursor) => sparse_select_cursor.select(rank),
}
}
}
pub struct OptionalIndexSelectCursor<'a> {
current_block_cursor: BlockSelectCursor<'a>,
current_block_id: u16,
// The current block is guaranteed to contain ranks < end_rank.
current_block_end_rank: RowId,
optional_index: &'a OptionalIndex,
block_doc_idx_start: RowId,
num_null_rows_before_block: RowId,
}
impl<'a> OptionalIndexSelectCursor<'a> {
fn search_and_load_block(&mut self, rank: RowId) {
if rank < self.current_block_end_rank {
// we are already in the right block
return;
}
self.current_block_id = self.optional_index.find_block(rank, self.current_block_id);
self.current_block_end_rank = self.optional_index.block_metas.get(self.current_block_id as usize + 1).map(|block_meta| block_meta.non_null_rows_before_block).unwrap_or(u32::MAX);
self.block_doc_idx_start = (self.current_block_id as u32) * ELEMENTS_PER_BLOCK;
let block_meta = self.optional_index.block_metas[self.current_block_id as usize];
self.num_null_rows_before_block = block_meta.non_null_rows_before_block;
let block: Block<'_> = self.optional_index.block(block_meta);
self.current_block_cursor = match block {
Block::Dense(dense_block) => BlockSelectCursor::Dense(dense_block.select_cursor()),
Block::Sparse(sparse_block) => BlockSelectCursor::Sparse(sparse_block.select_cursor()),
};
}
}
impl<'a> SelectCursor<RowId> for OptionalIndexSelectCursor<'a> {
fn select(&mut self, rank: RowId) -> RowId {
self.search_and_load_block(rank);
let index_in_block = (rank - self.num_null_rows_before_block) as u16;
self.current_block_cursor.select(index_in_block) as RowId + self.block_doc_idx_start
}
}
impl Set<RowId> for OptionalIndex {
type SelectCursor<'b> = OptionalIndexSelectCursor<'b> where Self: 'b;
// Check if the value at position `row_id` is not null.
#[inline]
fn contains(&self, row_id: RowId) -> bool {
@@ -200,7 +148,7 @@ impl Set<RowId> for OptionalIndex {
#[inline]
fn select(&self, rank: RowId) -> RowId {
let block_pos = self.find_block(rank, 0);
let block_doc_idx_start = (block_pos as u32) * ELEMENTS_PER_BLOCK;
let block_doc_idx_start = block_pos * ELEMENTS_PER_BLOCK;
let block_meta = self.block_metas[block_pos as usize];
let block: Block<'_> = self.block(block_meta);
let index_in_block = (rank - block_meta.non_null_rows_before_block) as u16;
@@ -211,27 +159,39 @@ impl Set<RowId> for OptionalIndex {
block_doc_idx_start + in_block_rank as u32
}
fn select_cursor<'b>(&'b self) -> OptionalIndexSelectCursor<'b> {
OptionalIndexSelectCursor {
current_block_cursor: BlockSelectCursor::Sparse(SparseBlockCodec::open(b"").select_cursor()),
current_block_id: 0u16,
current_block_end_rank: 0u32, //< this is sufficient to force the first load
optional_index: self,
block_doc_idx_start: 0u32,
num_null_rows_before_block: 0u32,
fn select_batch(&self, ranks: &[u32], output_idxs: &mut [u32]) {
let mut block_pos = 0u32;
let mut start = 0;
let group_by_it = ranks.iter().copied().group_by(move |codec_idx| {
block_pos = self.find_block(*codec_idx, block_pos);
block_pos
});
for (block_pos, block_iter) in group_by_it {
let block_doc_idx_start = block_pos * ELEMENTS_PER_BLOCK;
let block_meta = self.block_metas[block_pos as usize];
let block: Block<'_> = self.block(block_meta);
let offset = block_meta.non_null_rows_before_block;
let indexes_in_block_iter =
block_iter.map(move |codec_idx| (codec_idx - offset) as u16);
match block {
Block::Dense(dense_block) => {
for in_offset in dense_block.select_iter(indexes_in_block_iter) {
output_idxs[start] = in_offset as u32 + block_doc_idx_start;
start += 1;
}
}
Block::Sparse(sparse_block) => {
for in_offset in sparse_block.select_iter(indexes_in_block_iter) {
output_idxs[start] = in_offset as u32 + block_doc_idx_start;
start += 1;
}
}
};
}
}
}
impl OptionalIndex {
pub fn select_batch(&self, ranks: &mut [RowId]) {
let mut select_cursor = self.select_cursor();
for rank in ranks.iter_mut() {
*rank = select_cursor.select(*rank);
}
}
#[inline]
fn block<'a>(&'a self, block_meta: BlockMeta) -> Block<'a> {
let BlockMeta {
@@ -254,14 +214,14 @@ impl OptionalIndex {
}
#[inline]
fn find_block(&self, dense_idx: u32, start_block_pos: u16) -> u16 {
for block_pos in start_block_pos..self.block_metas.len() as u16 {
fn find_block(&self, dense_idx: u32, start_block_pos: u32) -> u32 {
for block_pos in start_block_pos..self.block_metas.len() as u32 {
let offset = self.block_metas[block_pos as usize].non_null_rows_before_block;
if offset > dense_idx {
return block_pos - 1u16;
return block_pos - 1;
}
}
self.block_metas.len() as u16 - 1u16
self.block_metas.len() as u32 - 1u32
}
// TODO Add a good API for the codec_idx to original_idx translation.
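The batched rewrite above leans on the ranks being sorted: consecutive ranks that fall into the same block are grouped (presumably via the `GroupByIteratorExtended` import added at the top of the file), so `find_block` runs once per block instead of once per rank. A minimal standalone sketch of that grouping idea, assuming a hypothetical block size of 64:

    // Group sorted ranks by the (assumed) block they fall into. Because the
    // input is sorted, a single pass that extends the current group is enough;
    // this mirrors what the group_by call does in the diff above.
    fn group_sorted_by_block(ranks: &[u32]) -> Vec<(u32, Vec<u32>)> {
        let mut groups: Vec<(u32, Vec<u32>)> = Vec::new();
        for &rank in ranks {
            let block = rank / 64; // hypothetical block size
            let start_new = groups.last().map_or(true, |(b, _)| *b != block);
            if start_new {
                groups.push((block, Vec::new()));
            }
            groups.last_mut().unwrap().1.push(rank);
        }
        groups
    }

    fn main() {
        let grouped = group_sorted_by_block(&[1, 3, 70, 71, 200]);
        assert_eq!(grouped, vec![(0, vec![1, 3]), (1, vec![70, 71]), (3, vec![200])]);
    }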

View File

@@ -13,19 +13,7 @@ pub trait SetCodec {
fn open<'a>(data: &'a [u8]) -> Self::Reader<'a>;
}
/// Stateful object that makes it possible to compute several selects in a row,
/// provided the ranks passed as arguments are increasing.
pub trait SelectCursor<T> {
// May panic if rank is greater than the number of elements in the Set,
// or if rank is less than the value provided in the previous call.
fn select(&mut self, rank: T) -> T;
}
pub trait Set<T> {
type SelectCursor<'b>: SelectCursor<T> where Self: 'b;
/// Returns true if the elements is contained in the Set
fn contains(&self, el: T) -> bool;
@@ -40,6 +28,11 @@ pub trait Set<T> {
/// May panic if rank is greater than the number of elements in the Set.
fn select(&self, rank: T) -> T;
/// Creates a brand new select cursor.
fn select_cursor<'b>(&'b self,) -> Self::SelectCursor<'b>;
/// Batch version of select.
/// `ranks` is assumed to be sorted.
///
/// # Panics
///
/// May panic if rank is greater than the number of elements in the Set.
fn select_batch(&self, ranks: &[T], outputs: &mut [T]);
}
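As intuition for this contract, here is a naive, hedged reference model over a plain sorted `Vec<u32>` (`VecSet` is a made-up name; the real implementations are the codec-backed blocks in this PR): `select(rank)` is the inverse of ranking, mapping the rank-th smallest member back to the member itself.

    // Naive model of the Set contract; not the actual codec-backed code.
    struct VecSet(Vec<u32>); // sorted, deduplicated

    impl VecSet {
        fn contains(&self, el: u32) -> bool {
            self.0.binary_search(&el).is_ok()
        }
        fn select(&self, rank: u32) -> u32 {
            self.0[rank as usize] // panics if rank >= len, matching the docs
        }
        fn select_batch(&self, ranks: &[u32], outputs: &mut [u32]) {
            for (out, &rank) in outputs.iter_mut().zip(ranks) {
                *out = self.select(rank);
            }
        }
    }

    fn main() {
        let set = VecSet(vec![1, 3, 17]);
        assert!(set.contains(3));
        let mut out = vec![0u32; 2];
        set.select_batch(&[0, 2], &mut out);
        assert_eq!(out, vec![1, 17]);
    }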

View File

@@ -3,7 +3,7 @@ use std::io::{self, Write};
use common::BinarySerializable;
use crate::column_index::optional_index::{Set, SetCodec, SelectCursor, ELEMENTS_PER_BLOCK};
use crate::column_index::optional_index::{Set, SetCodec, ELEMENTS_PER_BLOCK};
#[inline(always)]
fn get_bit_at(input: u64, n: u16) -> bool {
@@ -105,24 +105,7 @@ impl DenseMiniBlock {
#[derive(Copy, Clone)]
pub struct DenseBlock<'a>(&'a [u8]);
pub struct DenseBlockSelectCursor<'a> {
block_id: u16,
dense_block: DenseBlock<'a>,
}
impl<'a> SelectCursor<u16> for DenseBlockSelectCursor<'a> {
#[inline]
fn select(&mut self, rank: u16) -> u16 {
self.block_id = self.dense_block.find_miniblock_containing_rank(rank, self.block_id).unwrap();
let index_block = self.dense_block.mini_block(self.block_id);
let in_block_rank = rank - index_block.rank;
self.block_id * ELEMENTS_PER_MINI_BLOCK + select_u64(index_block.bitvec, in_block_rank)
}
}
impl<'a> Set<u16> for DenseBlock<'a> {
type SelectCursor<'b> = DenseBlockSelectCursor<'a> where Self: 'b;
#[inline(always)]
fn contains(&self, el: u16) -> bool {
let mini_block_id = el / ELEMENTS_PER_MINI_BLOCK;
@@ -153,15 +136,37 @@ impl<'a> Set<u16> for DenseBlock<'a> {
block_id * ELEMENTS_PER_MINI_BLOCK + select_u64(index_block.bitvec, in_block_rank)
}
#[inline(always)]
fn select_cursor<'b>(&'b self,) -> Self::SelectCursor<'b> {
DenseBlockSelectCursor {
block_id: 0,
dense_block: *self,
fn select_batch(&self, ranks: &[u16], outputs: &mut [u16]) {
let orig_ids = self.select_iter(ranks.iter().copied());
for (output, original_id) in outputs.iter_mut().zip(orig_ids) {
*output = original_id;
}
}
}
impl<'a> DenseBlock<'a> {
/// Iterator version of select.
///
/// # Panics
/// Panics if one of the ranks is higher than the number of elements in the set.
pub fn select_iter<'b>(
&self,
rank_it: impl Iterator<Item = u16> + 'b,
) -> impl Iterator<Item = u16> + 'b
where
Self: 'b,
{
let mut block_id = 0u16;
let me = *self;
rank_it.map(move |rank| {
block_id = me.find_miniblock_containing_rank(rank, block_id).unwrap();
let index_block = me.mini_block(block_id);
let in_block_rank = rank - index_block.rank;
block_id * ELEMENTS_PER_MINI_BLOCK + select_u64(index_block.bitvec, in_block_rank)
})
}
}
impl<'a> DenseBlock<'a> {
#[inline]
fn mini_block(&self, mini_block_id: u16) -> DenseMiniBlock {
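The dense codec bottoms out in `select_u64`, which returns the position of the rank-th set bit inside a 64-bit word. A naive sketch of that primitive for illustration only (the actual implementation may use a faster bit trick):

    // Naive select-in-a-word: clear the `rank` lowest set bits, then the
    // position of the remaining lowest set bit is the answer. Panics (in
    // debug builds) if rank >= popcount, consistent with the trait docs.
    fn select_u64_naive(mut word: u64, rank: u16) -> u16 {
        for _ in 0..rank {
            word &= word - 1; // clears the lowest set bit
        }
        word.trailing_zeros() as u16
    }

    fn main() {
        // bits set at positions 1, 3, 17
        let word = (1u64 << 1) | (1u64 << 3) | (1u64 << 17);
        assert_eq!(select_u64_naive(word, 0), 1);
        assert_eq!(select_u64_naive(word, 2), 17);
    }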

View File

@@ -1,4 +1,4 @@
use crate::column_index::optional_index::{Set, SetCodec, SelectCursor};
use crate::column_index::optional_index::{Set, SetCodec};
pub struct SparseBlockCodec;
@@ -24,17 +24,7 @@ impl SetCodec for SparseBlockCodec {
#[derive(Copy, Clone)]
pub struct SparseBlock<'a>(&'a [u8]);
impl<'a> SelectCursor<u16> for SparseBlock<'a> {
#[inline]
fn select(&mut self, rank: u16) -> u16 {
<SparseBlock<'a> as Set<u16>>::select(self, rank)
}
}
impl<'a> Set<u16> for SparseBlock<'a> {
type SelectCursor<'b> = Self where Self: 'b;
#[inline(always)]
fn contains(&self, el: u16) -> bool {
self.binary_search(el).is_ok()
@@ -51,11 +41,12 @@ impl<'a> Set<u16> for SparseBlock<'a> {
u16::from_le_bytes(self.0[offset..offset + 2].try_into().unwrap())
}
#[inline(always)]
fn select_cursor<'b>(&'b self,) -> Self::SelectCursor<'b> {
*self
fn select_batch(&self, ranks: &[u16], outputs: &mut [u16]) {
let orig_ids = self.select_iter(ranks.iter().copied());
for (output, original_id) in outputs.iter_mut().zip(orig_ids) {
*output = original_id;
}
}
}
#[inline(always)]
@@ -105,4 +96,17 @@ impl<'a> SparseBlock<'a> {
}
Err(left)
}
pub fn select_iter<'b>(
&self,
iter: impl Iterator<Item = u16> + 'b,
) -> impl Iterator<Item = u16> + 'b
where
Self: 'b,
{
iter.map(|codec_id| {
let offset = codec_id as usize * 2;
u16::from_le_bytes(self.0[offset..offset + 2].try_into().unwrap())
})
}
}
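The sparse codec is simply the sorted list of contained values, two little-endian bytes each, so `select` degenerates into an offset read, exactly as `select_iter` above shows. A tiny standalone sketch over a hypothetically serialized set:

    // Each contained u16 is stored as two little-endian bytes, so
    // select(rank) is a direct offset lookup.
    fn sparse_select(data: &[u8], rank: u16) -> u16 {
        let offset = rank as usize * 2;
        u16::from_le_bytes(data[offset..offset + 2].try_into().unwrap())
    }

    fn main() {
        // Hand-serialized form of the set {1, 3, 17}.
        let data: Vec<u8> = [1u16, 3, 17].iter().flat_map(|v| v.to_le_bytes()).collect();
        assert_eq!(sparse_select(&data, 2), 17);
    }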

View File

@@ -1,8 +1,9 @@
use std::collections::HashMap;
use crate::column_index::optional_index::set_block::dense::DENSE_BLOCK_NUM_BYTES;
use crate::column_index::optional_index::set_block::{DenseBlockCodec, SparseBlockCodec};
use crate::column_index::optional_index::{Set, SetCodec, SelectCursor};
use crate::column_index::optional_index::set_block::{
DenseBlockCodec, SparseBlockCodec, DENSE_BLOCK_NUM_BYTES,
};
use crate::column_index::optional_index::{Set, SetCodec};
fn test_set_helper<C: SetCodec<Item = u16>>(vals: &[u16]) -> usize {
let mut buffer = Vec::new();
@@ -74,10 +75,12 @@ fn test_simple_translate_codec_codec_idx_to_original_idx_dense() {
.unwrap();
let tested_set = DenseBlockCodec::open(buffer.as_slice());
assert!(tested_set.contains(1));
let mut select_cursor = tested_set.select_cursor();
assert_eq!(select_cursor.select(0), 1);
assert_eq!(select_cursor.select(1), 3);
assert_eq!(select_cursor.select(2), 17);
assert_eq!(
&tested_set
.select_iter([0, 1, 2, 5].iter().copied())
.collect::<Vec<u16>>(),
&[1, 3, 17, 30_001]
);
}
#[test]
@@ -86,10 +89,12 @@ fn test_simple_translate_codec_idx_to_original_idx_sparse() {
SparseBlockCodec::serialize([1, 3, 17].iter().copied(), &mut buffer).unwrap();
let tested_set = SparseBlockCodec::open(buffer.as_slice());
assert!(tested_set.contains(1));
let mut select_cursor = tested_set.select_cursor();
assert_eq!(SelectCursor::select(&mut select_cursor, 0), 1);
assert_eq!(SelectCursor::select(&mut select_cursor, 1), 3);
assert_eq!(SelectCursor::select(&mut select_cursor, 2), 17);
assert_eq!(
&tested_set
.select_iter([0, 1, 2].iter().copied())
.collect::<Vec<u16>>(),
&[1, 3, 17]
);
}
#[test]
@@ -98,8 +103,10 @@ fn test_simple_translate_codec_idx_to_original_idx_dense() {
DenseBlockCodec::serialize(0u16..150u16, &mut buffer).unwrap();
let tested_set = DenseBlockCodec::open(buffer.as_slice());
assert!(tested_set.contains(1));
let mut select_cursor = tested_set.select_cursor();
for i in 0..150 {
assert_eq!(i, select_cursor.select(i));
}
let rg = 0u16..150u16;
let els: Vec<u16> = rg.clone().collect();
assert_eq!(
&tested_set.select_iter(rg.clone()).collect::<Vec<u16>>(),
&els
);
}

View File

@@ -41,10 +41,9 @@ fn test_with_random_sets_simple() {
let null_index = open_optional_index(OwnedBytes::new(out)).unwrap();
let ranks: Vec<u32> = (65_472u32..65_473u32).collect();
let els: Vec<u32> = ranks.iter().copied().map(|rank| rank + 10).collect();
let mut select_cursor = null_index.select_cursor();
for (rank, el) in ranks.iter().copied().zip(els.iter().copied()) {
assert_eq!(select_cursor.select(rank), el);
}
let mut output = vec![0u32; ranks.len()];
null_index.select_batch(&ranks[..], &mut output[..]);
assert_eq!(&output, &els);
}
#[test]
@@ -92,10 +91,11 @@ fn test_null_index(data: &[bool]) {
.filter(|(_pos, val)| **val)
.map(|(pos, _val)| pos as u32)
.collect();
let mut select_iter = null_index.select_cursor();
for i in 0..orig_idx_with_value.len() {
assert_eq!(select_iter.select(i as u32), orig_idx_with_value[i]);
}
let ids: Vec<u32> = (0..orig_idx_with_value.len() as u32).collect();
let mut output = vec![0u32; ids.len()];
null_index.select_batch(&ids[..], &mut output);
// assert_eq!(&output[0..100], &orig_idx_with_value[0..100]);
assert_eq!(output, orig_idx_with_value);
let step_size = (orig_idx_with_value.len() / 100).max(1);
for (dense_idx, orig_idx) in orig_idx_with_value.iter().enumerate().step_by(step_size) {
@@ -115,9 +115,9 @@ fn test_optional_index_test_translation() {
let iter = &[true, false, true, false];
serialize_optional_index(&&iter[..], &mut out).unwrap();
let null_index = open_optional_index(OwnedBytes::new(out)).unwrap();
let mut select_cursor = null_index.select_cursor();
assert_eq!(select_cursor.select(0), 0);
assert_eq!(select_cursor.select(1), 2);
let mut output = vec![0u32; 2];
null_index.select_batch(&[0, 1], &mut output);
assert_eq!(output, &[0, 2]);
}
#[test]
@@ -175,6 +175,7 @@ mod bench {
.map(|_| rng.gen_bool(fill_ratio))
.collect();
serialize_optional_index(&&vals[..], &mut out).unwrap();
let codec = open_optional_index(OwnedBytes::new(out)).unwrap();
codec
}
@@ -310,8 +311,7 @@ mod bench {
};
let mut output = vec![0u32; idxs.len()];
bench.iter(|| {
output.copy_from_slice(&idxs[..]);
codec.select_batch(&mut output);
codec.select_batch(&idxs[..], &mut output);
});
}

View File

@@ -3,24 +3,22 @@ use std::net::Ipv6Addr;
use crate::value::NumericalType;
use crate::InvalidData;
/// The column type represents the column type and can fit in 6 bits.
///
/// - bits[0..3]: Column category type.
/// - bits[3..6]: Numerical type if necessary.
#[derive(Hash, Eq, PartialEq, Debug, Clone, Copy)]
/// The type of a column.
/// Any changes need to be propagated to `COLUMN_TYPES`.
#[derive(Hash, Eq, PartialEq, Debug, Clone, Copy, Ord, PartialOrd)]
#[repr(u8)]
pub enum ColumnType {
I64 = 0u8,
U64 = 1u8,
F64 = 2u8,
Bytes = 10u8,
Str = 14u8,
Bool = 18u8,
IpAddr = 22u8,
DateTime = 26u8,
Bytes = 3u8,
Str = 4u8,
Bool = 5u8,
IpAddr = 6u8,
DateTime = 7u8,
}
#[cfg(test)]
// The order needs to match _exactly_ the order in the enum
const COLUMN_TYPES: [ColumnType; 8] = [
ColumnType::I64,
ColumnType::U64,
@@ -38,18 +36,7 @@ impl ColumnType {
}
pub(crate) fn try_from_code(code: u8) -> Result<ColumnType, InvalidData> {
use ColumnType::*;
match code {
0u8 => Ok(I64),
1u8 => Ok(U64),
2u8 => Ok(F64),
10u8 => Ok(Bytes),
14u8 => Ok(Str),
18u8 => Ok(Bool),
22u8 => Ok(IpAddr),
26u8 => Ok(Self::DateTime),
_ => Err(InvalidData),
}
COLUMN_TYPES.get(code as usize).copied().ok_or(InvalidData)
}
}
@@ -64,18 +51,6 @@ impl From<NumericalType> for ColumnType {
}
impl ColumnType {
/// get column type category
pub(crate) fn column_type_category(self) -> ColumnTypeCategory {
match self {
ColumnType::I64 | ColumnType::U64 | ColumnType::F64 => ColumnTypeCategory::Numerical,
ColumnType::Bytes => ColumnTypeCategory::Bytes,
ColumnType::Str => ColumnTypeCategory::Str,
ColumnType::Bool => ColumnTypeCategory::Bool,
ColumnType::IpAddr => ColumnTypeCategory::IpAddr,
ColumnType::DateTime => ColumnTypeCategory::DateTime,
}
}
pub fn numerical_type(&self) -> Option<NumericalType> {
match self {
ColumnType::I64 => Some(NumericalType::I64),
@@ -154,70 +129,20 @@ impl HasAssociatedColumnType for Ipv6Addr {
}
}
/// Column types are grouped into different categories that
/// correspond to the different `JsonValue` types.
///
/// The columnar writer will apply coercion rules to make sure that
/// at most one column exist per `ColumnTypeCategory`.
///
/// See also [README.md].
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug)]
#[repr(u8)]
pub enum ColumnTypeCategory {
Bool,
Str,
Numerical,
DateTime,
Bytes,
IpAddr,
}
impl From<ColumnType> for ColumnTypeCategory {
fn from(column_type: ColumnType) -> Self {
match column_type {
ColumnType::I64 => ColumnTypeCategory::Numerical,
ColumnType::U64 => ColumnTypeCategory::Numerical,
ColumnType::F64 => ColumnTypeCategory::Numerical,
ColumnType::Bytes => ColumnTypeCategory::Bytes,
ColumnType::Str => ColumnTypeCategory::Str,
ColumnType::Bool => ColumnTypeCategory::Bool,
ColumnType::IpAddr => ColumnTypeCategory::IpAddr,
ColumnType::DateTime => ColumnTypeCategory::DateTime,
}
}
}
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use super::*;
use crate::Cardinality;
#[test]
fn test_column_type_to_code() {
let mut column_type_set: HashSet<ColumnType> = HashSet::new();
for code in u8::MIN..=u8::MAX {
if let Ok(column_type) = ColumnType::try_from_code(code) {
assert_eq!(column_type.to_code(), code);
assert!(column_type_set.insert(column_type));
for (code, expected_column_type) in super::COLUMN_TYPES.iter().copied().enumerate() {
if let Ok(column_type) = ColumnType::try_from_code(code as u8) {
assert_eq!(column_type, expected_column_type);
}
}
assert_eq!(column_type_set.len(), super::COLUMN_TYPES.len());
}
#[test]
fn test_column_category_sort_consistent_with_column_type_sort() {
// This is a very important property because
// we need to serialize columns in the right order.
let mut column_types: Vec<ColumnType> = super::COLUMN_TYPES.iter().copied().collect();
column_types.sort_by_key(|col| col.to_code());
let column_categories: Vec<ColumnTypeCategory> = column_types
.into_iter()
.map(ColumnTypeCategory::from)
.collect();
for (prev, next) in column_categories.iter().zip(column_categories.iter()) {
assert!(prev <= next);
for code in COLUMN_TYPES.len() as u8..=u8::MAX {
assert!(ColumnType::try_from_code(code as u8).is_err());
}
}
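The substantive change in this file: `ColumnType` discriminants are now contiguous (0..=7) and mirror the order of `COLUMN_TYPES`, so decoding a code becomes a bounds-checked table lookup instead of a `match`, and the derived `Ord` follows the same order. A minimal sketch of the scheme, using a hypothetical two-variant enum rather than the real `ColumnType`:

    #[derive(Debug, Clone, Copy, PartialEq)]
    #[repr(u8)]
    enum Kind {
        A = 0,
        B = 1,
    }

    // Variants listed in discriminant order, like COLUMN_TYPES above.
    const KINDS: [Kind; 2] = [Kind::A, Kind::B];

    fn try_from_code(code: u8) -> Option<Kind> {
        // A code is valid exactly when it indexes into the table.
        KINDS.get(code as usize).copied()
    }

    fn main() {
        assert_eq!(try_from_code(1), Some(Kind::B));
        assert_eq!(try_from_code(9), None);
    }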

View File

@@ -1,9 +1,10 @@
use std::collections::HashMap;
use std::io;
use super::column_type::ColumnTypeCategory;
use super::writer::ColumnarSerializer;
use crate::columnar::ColumnarReader;
use crate::dynamic_column::DynamicColumn;
use crate::{Cardinality, ColumnType};
pub enum MergeDocOrder {
/// Columnar tables are simply stacked one above the other.
@@ -19,24 +20,100 @@ pub enum MergeDocOrder {
}
pub fn merge_columnar(
_columnar_readers: &[ColumnarReader],
columnar_readers: &[ColumnarReader],
mapping: MergeDocOrder,
_output: &mut impl io::Write,
output: &mut impl io::Write,
) -> io::Result<()> {
match mapping {
MergeDocOrder::Stack => {
// implement me :)
todo!();
let mut serializer = ColumnarSerializer::new(output);
// TODO handle dictionary merge for Str/Bytes column
let field_name_to_group = group_columns_for_merge(columnar_readers)?;
for (column_name, category_to_columns) in field_name_to_group {
for (_category, columns_to_merge) in category_to_columns {
let column_type = columns_to_merge[0].column_type();
let mut column_serializer =
serializer.serialize_column(column_name.as_bytes(), column_type);
merge_columns(
column_type,
&columns_to_merge,
&mapping,
&mut column_serializer,
)?;
}
MergeDocOrder::Complex(_) => {
// for later
todo!();
}
serializer.finalize()?;
Ok(())
}
/// Column types are grouped into different categories.
/// After merge, all columns belonging to the same category are coerced to
/// the same column type.
///
/// In practice, only Numerical columns are coerced into one type today.
///
/// See also [README.md].
#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
#[repr(u8)]
pub enum ColumnTypeCategory {
Bool,
Str,
Numerical,
DateTime,
Bytes,
IpAddr,
}
impl From<ColumnType> for ColumnTypeCategory {
fn from(column_type: ColumnType) -> Self {
match column_type {
ColumnType::I64 => ColumnTypeCategory::Numerical,
ColumnType::U64 => ColumnTypeCategory::Numerical,
ColumnType::F64 => ColumnTypeCategory::Numerical,
ColumnType::Bytes => ColumnTypeCategory::Bytes,
ColumnType::Str => ColumnTypeCategory::Str,
ColumnType::Bool => ColumnTypeCategory::Bool,
ColumnType::IpAddr => ColumnTypeCategory::IpAddr,
ColumnType::DateTime => ColumnTypeCategory::DateTime,
}
}
}
pub fn collect_columns(
columnar_readers: &[&ColumnarReader],
pub fn detect_cardinality(columns: &[DynamicColumn]) -> Cardinality {
if columns
.iter()
.any(|column| column.get_cardinality().is_multivalue())
{
return Cardinality::Multivalued;
}
if columns
.iter()
.any(|column| column.get_cardinality().is_optional())
{
return Cardinality::Optional;
}
Cardinality::Full
}
pub fn compute_num_docs(columns: &[DynamicColumn], mapping: &MergeDocOrder) -> usize {
// TODO handle deletes
0
}
pub fn merge_columns(
column_type: ColumnType,
columns: &[DynamicColumn],
mapping: &MergeDocOrder,
column_serializer: &mut impl io::Write,
) -> io::Result<()> {
let cardinality = detect_cardinality(columns);
Ok(())
}
pub fn group_columns_for_merge(
columnar_readers: &[ColumnarReader],
) -> io::Result<HashMap<String, HashMap<ColumnTypeCategory, Vec<DynamicColumn>>>> {
// Each column name may have multiple column types associated with it.
// For merging, we group by column type category, since columns within the same category can be merged.
@@ -51,7 +128,7 @@ pub fn collect_columns(
.or_default();
let columns = column_type_to_handles
.entry(handle.column_type().column_type_category())
.entry(handle.column_type().into())
.or_default();
columns.push(handle.open()?);
}
@@ -62,10 +139,9 @@ pub fn collect_columns(
Ok(field_name_to_group)
}
/// Cast numerical type columns to the same type
pub(crate) fn normalize_columns(
map: &mut HashMap<String, HashMap<ColumnTypeCategory, Vec<DynamicColumn>>>,
) {
/// Coerce numerical type columns to the same type
/// TODO rename to `coerce_columns`
fn normalize_columns(map: &mut HashMap<String, HashMap<ColumnTypeCategory, Vec<DynamicColumn>>>) {
for (_field_name, type_category_to_columns) in map.iter_mut() {
for (type_category, columns) in type_category_to_columns {
if type_category == &ColumnTypeCategory::Numerical {
@@ -85,26 +161,20 @@ fn cast_to_common_numerical_column(columns: &[DynamicColumn]) -> Vec<DynamicColu
.all(|column| column.column_type().numerical_type().is_some()));
let coerce_to_i64: Vec<_> = columns
.iter()
.map(|column| column.clone().coerce_to_i64())
.filter_map(|column| column.clone().coerce_to_i64())
.collect();
if coerce_to_i64.iter().all(|column| column.is_some()) {
return coerce_to_i64
.into_iter()
.map(|column| column.unwrap())
.collect();
if coerce_to_i64.len() == columns.len() {
return coerce_to_i64;
}
let coerce_to_u64: Vec<_> = columns
.iter()
.map(|column| column.clone().coerce_to_u64())
.filter_map(|column| column.clone().coerce_to_u64())
.collect();
if coerce_to_u64.iter().all(|column| column.is_some()) {
return coerce_to_u64
.into_iter()
.map(|column| column.unwrap())
.collect();
if coerce_to_u64.len() == columns.len() {
return coerce_to_u64;
}
columns
@@ -151,7 +221,9 @@ mod tests {
ColumnarReader::open(buffer).unwrap()
};
let column_map = collect_columns(&[&columnar1, &columnar2, &columnar3]).unwrap();
let column_map =
group_columns_for_merge(&[columnar1.clone(), columnar2.clone(), columnar3.clone()])
.unwrap();
assert_eq!(column_map.len(), 1);
let cat_to_columns = column_map.get("numbers").unwrap();
assert_eq!(cat_to_columns.len(), 1);
@@ -159,14 +231,14 @@ mod tests {
let numerical = cat_to_columns.get(&ColumnTypeCategory::Numerical).unwrap();
assert!(numerical.iter().all(|column| column.is_f64()));
let column_map = collect_columns(&[&columnar1, &columnar1]).unwrap();
let column_map = group_columns_for_merge(&[columnar1.clone(), columnar1.clone()]).unwrap();
assert_eq!(column_map.len(), 1);
let cat_to_columns = column_map.get("numbers").unwrap();
assert_eq!(cat_to_columns.len(), 1);
let numerical = cat_to_columns.get(&ColumnTypeCategory::Numerical).unwrap();
assert!(numerical.iter().all(|column| column.is_i64()));
let column_map = collect_columns(&[&columnar2, &columnar2]).unwrap();
let column_map = group_columns_for_merge(&[columnar2.clone(), columnar2.clone()]).unwrap();
assert_eq!(column_map.len(), 1);
let cat_to_columns = column_map.get("numbers").unwrap();
assert_eq!(cat_to_columns.len(), 1);
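The refactored `cast_to_common_numerical_column` above uses a `filter_map` plus length-comparison idiom: a coercion wins only if every column in the group coerced successfully. A hedged generic sketch of that cascade step (`coerce_all` is a made-up helper, not part of the crate):

    // Try a fallible coercion on every element; succeed only if none was lost.
    fn coerce_all<T, F>(items: &[T], coerce: F) -> Option<Vec<T>>
    where
        T: Clone,
        F: Fn(T) -> Option<T>,
    {
        let out: Vec<T> = items.iter().cloned().filter_map(coerce).collect();
        if out.len() == items.len() {
            Some(out)
        } else {
            None
        }
    }

    fn main() {
        // Parseable strings stand in for the i64 -> u64 coercion cascade.
        let ok = coerce_all(&["1", "2"], |s| s.parse::<i64>().ok().map(|_| s));
        assert!(ok.is_some());
        let bad = coerce_all(&["1", "x"], |s| s.parse::<i64>().ok().map(|_| s));
        assert!(bad.is_none());
    }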

View File

@@ -0,0 +1 @@

View File

@@ -1,6 +1,7 @@
mod column_type;
mod format_version;
mod merge;
mod merge_index;
mod reader;
mod writer;

View File

@@ -13,6 +13,7 @@ fn io_invalid_data(msg: String) -> io::Error {
/// The ColumnarReader makes it possible to access a set of columns
/// associated to field names.
#[derive(Clone)]
pub struct ColumnarReader {
column_dictionary: Dictionary<RangeSSTable>,
column_data: FileSlice,

View File

@@ -184,10 +184,12 @@ impl CompatibleNumericalTypes {
}
impl NumericalColumnWriter {
pub fn column_type_and_cardinality(&self, num_docs: RowId) -> (NumericalType, Cardinality) {
let numerical_type = self.compatible_numerical_types.to_numerical_type();
let cardinality = self.column_writer.get_cardinality(num_docs);
(numerical_type, cardinality)
pub fn numerical_type(&self) -> NumericalType {
self.compatible_numerical_types.to_numerical_type()
}
pub fn cardinality(&self, num_docs: RowId) -> Cardinality {
self.column_writer.get_cardinality(num_docs)
}
pub fn record_numerical_value(

View File

@@ -8,14 +8,14 @@ use std::net::Ipv6Addr;
use column_operation::ColumnOperation;
use common::CountingWriter;
use serializer::ColumnarSerializer;
pub(crate) use serializer::ColumnarSerializer;
use stacker::{Addr, ArenaHashMap, MemoryArena};
use crate::column_index::SerializableColumnIndex;
use crate::column_values::{
ColumnValues, MonotonicallyMappableToU128, MonotonicallyMappableToU64, VecColumn,
};
use crate::columnar::column_type::{ColumnType, ColumnTypeCategory};
use crate::columnar::column_type::ColumnType;
use crate::columnar::writer::column_writers::{
ColumnWriter, NumericalColumnWriter, StrOrBytesColumnWriter,
};
@@ -276,35 +276,40 @@ impl ColumnarWriter {
}
pub fn serialize(&mut self, num_docs: RowId, wrt: &mut dyn io::Write) -> io::Result<()> {
let mut serializer = ColumnarSerializer::new(wrt);
let mut columns: Vec<(&[u8], ColumnTypeCategory, Addr)> = self
let mut columns: Vec<(&[u8], ColumnType, Addr)> = self
.numerical_field_hash_map
.iter()
.map(|(column_name, addr, _)| (column_name, ColumnTypeCategory::Numerical, addr))
.map(|(column_name, addr, _)| {
let numerical_column_writer: NumericalColumnWriter =
self.numerical_field_hash_map.read(addr);
let column_type = numerical_column_writer.numerical_type().into();
(column_name, column_type, addr)
})
.collect();
columns.extend(
self.bytes_field_hash_map
.iter()
.map(|(term, addr, _)| (term, ColumnTypeCategory::Bytes, addr)),
.map(|(term, addr, _)| (term, ColumnType::Bytes, addr)),
);
columns.extend(
self.str_field_hash_map
.iter()
.map(|(column_name, addr, _)| (column_name, ColumnTypeCategory::Str, addr)),
.map(|(column_name, addr, _)| (column_name, ColumnType::Str, addr)),
);
columns.extend(
self.bool_field_hash_map
.iter()
.map(|(column_name, addr, _)| (column_name, ColumnTypeCategory::Bool, addr)),
.map(|(column_name, addr, _)| (column_name, ColumnType::Bool, addr)),
);
columns.extend(
self.ip_addr_field_hash_map
.iter()
.map(|(column_name, addr, _)| (column_name, ColumnTypeCategory::IpAddr, addr)),
.map(|(column_name, addr, _)| (column_name, ColumnType::IpAddr, addr)),
);
columns.extend(
self.datetime_field_hash_map
.iter()
.map(|(column_name, addr, _)| (column_name, ColumnTypeCategory::DateTime, addr)),
.map(|(column_name, addr, _)| (column_name, ColumnType::DateTime, addr)),
);
columns.sort_unstable_by_key(|(column_name, col_type, _)| (*column_name, *col_type));
@@ -312,8 +317,12 @@ impl ColumnarWriter {
let mut symbol_byte_buffer: Vec<u8> = Vec::new();
for (column_name, column_type, addr) in columns {
match column_type {
ColumnTypeCategory::Bool => {
let column_writer: ColumnWriter = self.bool_field_hash_map.read(addr);
ColumnType::Bool | ColumnType::DateTime => {
let column_writer: ColumnWriter = if column_type == ColumnType::Bool {
self.bool_field_hash_map.read(addr)
} else {
self.datetime_field_hash_map.read(addr)
};
let cardinality = column_writer.get_cardinality(num_docs);
let mut column_serializer =
serializer.serialize_column(column_name, ColumnType::Bool);
@@ -325,7 +334,7 @@ impl ColumnarWriter {
&mut column_serializer,
)?;
}
ColumnTypeCategory::IpAddr => {
ColumnType::IpAddr => {
let column_writer: ColumnWriter = self.ip_addr_field_hash_map.read(addr);
let cardinality = column_writer.get_cardinality(num_docs);
let mut column_serializer =
@@ -338,32 +347,35 @@ impl ColumnarWriter {
&mut column_serializer,
)?;
}
ColumnTypeCategory::Bytes | ColumnTypeCategory::Str => {
let (column_type, str_column_writer): (ColumnType, StrOrBytesColumnWriter) =
if column_type == ColumnTypeCategory::Bytes {
(ColumnType::Bytes, self.bytes_field_hash_map.read(addr))
ColumnType::Bytes | ColumnType::Str => {
let str_or_bytes_column_writer: StrOrBytesColumnWriter =
if column_type == ColumnType::Bytes {
self.bytes_field_hash_map.read(addr)
} else {
(ColumnType::Str, self.str_field_hash_map.read(addr))
self.str_field_hash_map.read(addr)
};
let dictionary_builder =
&dictionaries[str_column_writer.dictionary_id as usize];
let cardinality = str_column_writer.column_writer.get_cardinality(num_docs);
&dictionaries[str_or_bytes_column_writer.dictionary_id as usize];
let cardinality = str_or_bytes_column_writer
.column_writer
.get_cardinality(num_docs);
let mut column_serializer =
serializer.serialize_column(column_name, column_type);
serialize_bytes_or_str_column(
cardinality,
num_docs,
dictionary_builder,
str_column_writer.operation_iterator(arena, &mut symbol_byte_buffer),
str_or_bytes_column_writer
.operation_iterator(arena, &mut symbol_byte_buffer),
buffers,
&mut column_serializer,
)?;
}
ColumnTypeCategory::Numerical => {
ColumnType::I64 | ColumnType::F64 | ColumnType::U64 => {
let numerical_column_writer: NumericalColumnWriter =
self.numerical_field_hash_map.read(addr);
let (numerical_type, cardinality) =
numerical_column_writer.column_type_and_cardinality(num_docs);
let numerical_type = column_type.numerical_type().unwrap();
let cardinality = numerical_column_writer.cardinality(num_docs);
let mut column_serializer =
serializer.serialize_column(column_name, ColumnType::from(numerical_type));
serialize_numerical_column(
@@ -375,20 +387,6 @@ impl ColumnarWriter {
&mut column_serializer,
)?;
}
ColumnTypeCategory::DateTime => {
let column_writer: ColumnWriter = self.datetime_field_hash_map.read(addr);
let cardinality = column_writer.get_cardinality(num_docs);
let mut column_serializer =
serializer.serialize_column(column_name, ColumnType::DateTime);
serialize_numerical_column(
cardinality,
num_docs,
NumericalType::I64,
column_writer.operation_iterator(arena, &mut symbol_byte_buffer),
buffers,
&mut column_serializer,
)?;
}
};
}
serializer.finalize()?;
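Because `ColumnType` now derives `Ord` (see column_type.rs above), the writer can sort the flattened `(column_name, column_type, addr)` triples directly, pinning the serialization order without the old `ColumnTypeCategory` indirection. A small sketch of tuple-key sorting, with made-up data standing in for the real triples:

    fn main() {
        // (name, type code) pairs as a stand-in for (column_name, col_type, addr).
        let mut cols = vec![("b", 1u8), ("a", 2u8), ("a", 0u8)];
        // Tuple keys sort lexicographically: first by name, then by type code.
        cols.sort_unstable_by_key(|&(name, code)| (name, code));
        assert_eq!(cols, vec![("a", 0), ("a", 2), ("b", 1)]);
    }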

View File

@@ -8,7 +8,7 @@ use common::{HasLen, OwnedBytes};
use crate::column::{BytesColumn, Column, StrColumn};
use crate::column_values::{monotonic_map_column, StrictlyMonotonicFn};
use crate::columnar::ColumnType;
use crate::{DateTime, NumericalType};
use crate::{Cardinality, DateTime, NumericalType};
#[derive(Clone)]
pub enum DynamicColumn {
@@ -23,6 +23,18 @@ pub enum DynamicColumn {
}
impl DynamicColumn {
pub fn get_cardinality(&self) -> Cardinality {
match self {
DynamicColumn::Bool(c) => c.get_cardinality(),
DynamicColumn::I64(c) => c.get_cardinality(),
DynamicColumn::U64(c) => c.get_cardinality(),
DynamicColumn::F64(c) => c.get_cardinality(),
DynamicColumn::IpAddr(c) => c.get_cardinality(),
DynamicColumn::DateTime(c) => c.get_cardinality(),
DynamicColumn::Bytes(c) => c.ords().get_cardinality(),
DynamicColumn::Str(c) => c.ords().get_cardinality(),
}
}
pub fn column_type(&self) -> ColumnType {
match self {
DynamicColumn::Bool(_) => ColumnType::Bool,

View File

@@ -62,6 +62,12 @@ pub enum Cardinality {
}
impl Cardinality {
pub fn is_optional(&self) -> bool {
matches!(self, Cardinality::Optional)
}
pub fn is_multivalue(&self) -> bool {
matches!(self, Cardinality::Multivalued)
}
pub(crate) fn to_code(self) -> u8 {
self as u8
}

View File

@@ -30,6 +30,7 @@ use crate::{BlockAddr, DeltaReader, Reader, SSTable, SSTableIndex, TermOrdinal};
/// block boundary.
///
/// (See also README.md)
#[derive(Debug, Clone)]
pub struct Dictionary<TSSTable: SSTable> {
pub sstable_slice: FileSlice,
pub sstable_index: SSTableIndex,

View File

@@ -117,6 +117,7 @@ impl SSTable for MonotonicU64SSTable {
/// `range_sstable[k1].end == range_sstable[k2].start`.
///
/// The first range is not required to start at `0`.
#[derive(Clone, Copy, Debug)]
pub struct RangeSSTable;
impl SSTable for RangeSSTable {

View File

@@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize};
use crate::{common_prefix_len, SSTableDataCorruption, TermOrdinal};
#[derive(Default, Debug, Serialize, Deserialize)]
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
pub struct SSTableIndex {
blocks: Vec<BlockMeta>,
}
@@ -75,7 +75,7 @@ pub struct BlockAddr {
pub first_ordinal: u64,
}
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct BlockMeta {
/// Any byte string that is lexicographically greater or equal to
/// the last key in the block,