mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-02-15 04:10:36 +00:00
Compare commits
2 Commits
column-han
...
columnar-c
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1a72844048 | ||
|
|
d91df6cc7e |
2
.gitignore
vendored
2
.gitignore
vendored
@@ -13,3 +13,5 @@ benchmark
|
||||
.idea
|
||||
trace.dat
|
||||
cargo-timing*
|
||||
columnar/columnar-cli/*.json
|
||||
**/perf.data*
|
||||
|
||||
@@ -10,6 +10,7 @@ serde_json = "1"
|
||||
thiserror = "1"
|
||||
fnv = "1"
|
||||
sstable = { path = "../sstable", package = "tantivy-sstable" }
|
||||
zstd = "0.12"
|
||||
common = { path = "../common", package = "tantivy-common" }
|
||||
fastfield_codecs = { path = "../fastfield_codecs"}
|
||||
itertools = "0.10"
|
||||
|
||||
@@ -16,36 +16,42 @@ and different cardinality `(required, optional, multivalued)`.
|
||||
|
||||
# Coercion rules
|
||||
|
||||
Users can create a columnar by inserting rows to a `ColumnarWriter`,
|
||||
and serializing it into a `Write` object.
|
||||
Nothing prevents a user from recording values with different type to the same `column_name`.
|
||||
Users can create a columnar by appending rows to a writer.
|
||||
Nothing prevents a user from recording values with different to a same `column_key`.
|
||||
|
||||
In that case, `tantivy-columnar`'s behavior is as follows:
|
||||
- JsonValues are grouped into 3 types (String, Number, bool).
|
||||
Values that corresponds to different groups are mapped to different columns. For instance, String values are treated independently
|
||||
from Number or boolean values. `tantivy-columnar` will simply emit several columns associated to a given column_name.
|
||||
- Only one column for a given json value type is emitted. If number values with different number types are recorded (e.g. u64, i64, f64),
|
||||
`tantivy-columnar` will pick the first type that can represents the set of appended value, with the following prioriy order (`i64`, `u64`, `f64`).
|
||||
`i64` is picked over `u64` as it is likely to yield less change of types. Most use cases strictly requiring `u64` show the
|
||||
restriction on 50% of the values (e.g. a 64-bit hash). On the other hand, a lot of use cases can show rare negative value.
|
||||
- Values that corresponds to different JsonValue type are mapped to different columns. For instance, String values are treated independently from Number or boolean values. `tantivy-columnar` will simply emit several columns associated to a given column_name.
|
||||
- Only one column for a given json value type is emitted. If number values with different number types are recorded (e.g. u64, i64, f64), `tantivy-columnar` will pick the first type that can represents the set of appended value, with the following prioriy order (`i64`, `u64`, `f64`). `i64` is picked over `u64` as it is likely to yield less change of types. Most use cases strictly requiring `u64` show the restriction on 50% of the values (e.g. a 64-bit hash). On the other hand, a lot of use cases can show rare negative value.
|
||||
|
||||
# Columnar format
|
||||
|
||||
This columnar format may have more than one column (with different types) associated to the same `column_name` (see [Coercion rules](#coercion-rules) above).
|
||||
The `(column_name, columne_type)` couple however uniquely identifies a column.
|
||||
That couple is serialized as a column `column_key`. The format of that key is:
|
||||
Because this columnar format tries to avoid some coercion.
|
||||
There can be several columns (with different type) associated to a single `column_name`.
|
||||
|
||||
Each column is associated to `column_key`.
|
||||
The format of that key is:
|
||||
`[column_name][ZERO_BYTE][column_type_header: u8]`
|
||||
|
||||
```
|
||||
COLUMNAR:=
|
||||
[COLUMNAR_DATA]
|
||||
[COLUMNAR_KEY_TO_DATA_INDEX]
|
||||
[COLUMNAR_INDEX]
|
||||
[COLUMNAR_FOOTER];
|
||||
|
||||
|
||||
# Columns are sorted by their column key.
|
||||
COLUMNAR_DATA:=
|
||||
[COLUMN_DATA]+;
|
||||
[COLUMN]+;
|
||||
|
||||
COLUMN:=
|
||||
COMPRESSED_COLUMN | NON_COMPRESSED_COLUMN;
|
||||
|
||||
# COLUMN_DATA is compressed when it exceeds a threshold of 100KB.
|
||||
|
||||
COMPRESSED_COLUMN := [b'1'][zstd(COLUMN_DATA)]
|
||||
NON_COMPRESSED_COLUMN:= [b'0'][COLUMN_DATA]
|
||||
|
||||
COLUMNAR_INDEX := [RANGE_SSTABLE_BYTES]
|
||||
|
||||
COLUMNAR_FOOTER := [RANGE_SSTABLE_BYTES_LEN: 8 bytes little endian]
|
||||
|
||||
@@ -54,10 +60,10 @@ COLUMNAR_FOOTER := [RANGE_SSTABLE_BYTES_LEN: 8 bytes little endian]
|
||||
The columnar file starts by the actual column data, concatenated one after the other,
|
||||
sorted by column key.
|
||||
|
||||
A sstable associates
|
||||
A quickwit/tantivy style sstable associates
|
||||
`(column names, column_cardinality, column_type) to range of bytes.
|
||||
|
||||
Column name may not contain the zero byte `\0`.
|
||||
Column name may not contain the zero byte.
|
||||
|
||||
Listing all columns associated to `column_name` can therefore
|
||||
be done by listing all keys prefixed by
|
||||
|
||||
17
columnar/columnar-cli/Cargo.toml
Normal file
17
columnar/columnar-cli/Cargo.toml
Normal file
@@ -0,0 +1,17 @@
|
||||
[package]
|
||||
name = "tantivy-columnar-cli"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
license = "MIT"
|
||||
|
||||
[dependencies]
|
||||
columnar = {path="../", package="tantivy-columnar"}
|
||||
serde_json = "1"
|
||||
serde_json_borrow = {git="https://github.com/PSeitz/serde_json_borrow/"}
|
||||
serde = "1"
|
||||
|
||||
[workspace]
|
||||
members = []
|
||||
|
||||
[profile.release]
|
||||
debug = true
|
||||
126
columnar/columnar-cli/src/main.rs
Normal file
126
columnar/columnar-cli/src/main.rs
Normal file
@@ -0,0 +1,126 @@
|
||||
use columnar::ColumnarWriter;
|
||||
use columnar::NumericalValue;
|
||||
use serde_json_borrow;
|
||||
use std::fs::File;
|
||||
use std::io;
|
||||
use std::io::BufRead;
|
||||
use std::io::BufReader;
|
||||
use std::time::Instant;
|
||||
|
||||
#[derive(Default)]
|
||||
struct JsonStack {
|
||||
path: String,
|
||||
stack: Vec<usize>,
|
||||
}
|
||||
|
||||
impl JsonStack {
|
||||
fn push(&mut self, seg: &str) {
|
||||
let len = self.path.len();
|
||||
self.stack.push(len);
|
||||
self.path.push('.');
|
||||
self.path.push_str(seg);
|
||||
}
|
||||
|
||||
fn pop(&mut self) {
|
||||
if let Some(len) = self.stack.pop() {
|
||||
self.path.truncate(len);
|
||||
}
|
||||
}
|
||||
|
||||
fn path(&self) -> &str {
|
||||
&self.path[1..]
|
||||
}
|
||||
}
|
||||
|
||||
fn append_json_to_columnar(
|
||||
doc: u32,
|
||||
json_value: &serde_json_borrow::Value,
|
||||
columnar: &mut ColumnarWriter,
|
||||
stack: &mut JsonStack,
|
||||
) -> usize {
|
||||
let mut count = 0;
|
||||
match json_value {
|
||||
serde_json_borrow::Value::Null => {}
|
||||
serde_json_borrow::Value::Bool(val) => {
|
||||
columnar.record_numerical(
|
||||
doc,
|
||||
stack.path(),
|
||||
NumericalValue::from(if *val { 1u64 } else { 0u64 }),
|
||||
);
|
||||
count += 1;
|
||||
}
|
||||
serde_json_borrow::Value::Number(num) => {
|
||||
let numerical_value: NumericalValue = if let Some(num_i64) = num.as_i64() {
|
||||
num_i64.into()
|
||||
} else if let Some(num_u64) = num.as_u64() {
|
||||
num_u64.into()
|
||||
} else if let Some(num_f64) = num.as_f64() {
|
||||
num_f64.into()
|
||||
} else {
|
||||
panic!();
|
||||
};
|
||||
count += 1;
|
||||
columnar.record_numerical(
|
||||
doc,
|
||||
stack.path(),
|
||||
numerical_value,
|
||||
);
|
||||
}
|
||||
serde_json_borrow::Value::Str(msg) => {
|
||||
columnar.record_str(
|
||||
doc,
|
||||
stack.path(),
|
||||
msg.as_bytes(),
|
||||
);
|
||||
count += 1;
|
||||
},
|
||||
serde_json_borrow::Value::Array(vals) => {
|
||||
for val in vals {
|
||||
count += append_json_to_columnar(doc, val, columnar, stack);
|
||||
}
|
||||
},
|
||||
serde_json_borrow::Value::Object(json_map) => {
|
||||
for (child_key, child_val) in json_map {
|
||||
stack.push(child_key);
|
||||
count += append_json_to_columnar(doc, child_val, columnar, stack);
|
||||
stack.pop();
|
||||
}
|
||||
},
|
||||
}
|
||||
count
|
||||
}
|
||||
|
||||
fn main() -> io::Result<()> {
|
||||
let file = File::open("gh_small.json")?;
|
||||
let mut reader = BufReader::new(file);
|
||||
let mut line = String::with_capacity(100);
|
||||
let mut columnar = columnar::ColumnarWriter::default();
|
||||
let mut doc = 0;
|
||||
let start = Instant::now();
|
||||
let mut stack = JsonStack::default();
|
||||
let mut total_count = 0;
|
||||
loop {
|
||||
line.clear();
|
||||
let len = reader.read_line(&mut line)?;
|
||||
if len == 0 {
|
||||
break;
|
||||
}
|
||||
let Ok(json_value) = serde_json::from_str::<serde_json_borrow::Value>(&line) else { continue; };
|
||||
total_count += append_json_to_columnar(doc, &json_value, &mut columnar, &mut stack);
|
||||
doc += 1;
|
||||
}
|
||||
println!("value count {total_count}");
|
||||
println!("record {:?}", start.elapsed());
|
||||
let mut buffer = Vec::new();
|
||||
columnar.serialize(doc, &mut buffer)?;
|
||||
println!("num docs: {doc}, {:?}", start.elapsed());
|
||||
println!("buffer len {} MB", buffer.len() / 1_000_000);
|
||||
let columnar = columnar::ColumnarReader::open(buffer)?;
|
||||
for (column_name, typ, offsets, num_bytes) in columnar.list_columns()? {
|
||||
if num_bytes>1_000_000 {
|
||||
println!("{column_name} {typ:?} {offsets:?} {}", num_bytes / 1_000_000);
|
||||
}
|
||||
}
|
||||
println!("{} columns", columnar.num_columns());
|
||||
Ok(())
|
||||
}
|
||||
@@ -1,11 +1,8 @@
|
||||
use crate::utils::{place_bits, select_bits};
|
||||
use crate::value::NumericalType;
|
||||
use crate::InvalidData;
|
||||
|
||||
/// Enum describing the number of values that can exist per document
|
||||
/// (or per row if you will).
|
||||
///
|
||||
/// The cardinality must fit on 2 bits.
|
||||
#[derive(Clone, Copy, Hash, Default, Debug, PartialEq, Eq, PartialOrd, Ord)]
|
||||
#[repr(u8)]
|
||||
pub enum Cardinality {
|
||||
@@ -23,20 +20,16 @@ impl Cardinality {
|
||||
self as u8
|
||||
}
|
||||
|
||||
pub(crate) fn try_from_code(code: u8) -> Result<Cardinality, InvalidData> {
|
||||
pub(crate) fn try_from_code(code: u8) -> Option<Cardinality> {
|
||||
match code {
|
||||
0 => Ok(Cardinality::Required),
|
||||
1 => Ok(Cardinality::Optional),
|
||||
2 => Ok(Cardinality::Multivalued),
|
||||
_ => Err(InvalidData),
|
||||
0 => Some(Cardinality::Required),
|
||||
1 => Some(Cardinality::Optional),
|
||||
2 => Some(Cardinality::Multivalued),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The column type represents the column type and can fit on 6-bits.
|
||||
///
|
||||
/// - bits[0..3]: Column category type.
|
||||
/// - bits[3..6]: Numerical type if necessary.
|
||||
#[derive(Hash, Eq, PartialEq, Debug, Clone, Copy)]
|
||||
pub enum ColumnType {
|
||||
Bytes,
|
||||
@@ -47,79 +40,73 @@ pub enum ColumnType {
|
||||
impl ColumnType {
|
||||
/// Encoded over 6 bits.
|
||||
pub(crate) fn to_code(self) -> u8 {
|
||||
let column_type_category;
|
||||
let numerical_type_code: u8;
|
||||
let high_type;
|
||||
let low_code: u8;
|
||||
match self {
|
||||
ColumnType::Bytes => {
|
||||
column_type_category = ColumnTypeCategory::Str;
|
||||
numerical_type_code = 0u8;
|
||||
high_type = GeneralType::Str;
|
||||
low_code = 0u8;
|
||||
}
|
||||
ColumnType::Numerical(numerical_type) => {
|
||||
column_type_category = ColumnTypeCategory::Numerical;
|
||||
numerical_type_code = numerical_type.to_code();
|
||||
high_type = GeneralType::Numerical;
|
||||
low_code = numerical_type.to_code();
|
||||
}
|
||||
ColumnType::Bool => {
|
||||
column_type_category = ColumnTypeCategory::Bool;
|
||||
numerical_type_code = 0u8;
|
||||
high_type = GeneralType::Bool;
|
||||
low_code = 0u8;
|
||||
}
|
||||
}
|
||||
place_bits::<0, 3>(column_type_category.to_code()) | place_bits::<3, 6>(numerical_type_code)
|
||||
place_bits::<3, 6>(high_type.to_code()) | place_bits::<0, 3>(low_code)
|
||||
}
|
||||
|
||||
pub(crate) fn try_from_code(code: u8) -> Result<ColumnType, InvalidData> {
|
||||
pub(crate) fn try_from_code(code: u8) -> Option<ColumnType> {
|
||||
if select_bits::<6, 8>(code) != 0u8 {
|
||||
return Err(InvalidData);
|
||||
return None;
|
||||
}
|
||||
let column_type_category_code = select_bits::<0, 3>(code);
|
||||
let numerical_type_code = select_bits::<3, 6>(code);
|
||||
let column_type_category = ColumnTypeCategory::try_from_code(column_type_category_code)?;
|
||||
match column_type_category {
|
||||
ColumnTypeCategory::Bool => {
|
||||
if numerical_type_code != 0u8 {
|
||||
return Err(InvalidData);
|
||||
let high_code = select_bits::<3, 6>(code);
|
||||
let low_code = select_bits::<0, 3>(code);
|
||||
let high_type = GeneralType::try_from_code(high_code)?;
|
||||
match high_type {
|
||||
GeneralType::Bool => {
|
||||
if low_code != 0u8 {
|
||||
return None;
|
||||
}
|
||||
Ok(ColumnType::Bool)
|
||||
Some(ColumnType::Bool)
|
||||
}
|
||||
ColumnTypeCategory::Str => {
|
||||
if numerical_type_code != 0u8 {
|
||||
return Err(InvalidData);
|
||||
GeneralType::Str => {
|
||||
if low_code != 0u8 {
|
||||
return None;
|
||||
}
|
||||
Ok(ColumnType::Bytes)
|
||||
Some(ColumnType::Bytes)
|
||||
}
|
||||
ColumnTypeCategory::Numerical => {
|
||||
let numerical_type = NumericalType::try_from_code(numerical_type_code)?;
|
||||
Ok(ColumnType::Numerical(numerical_type))
|
||||
GeneralType::Numerical => {
|
||||
let numerical_type = NumericalType::try_from_code(low_code)?;
|
||||
Some(ColumnType::Numerical(numerical_type))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Column types are grouped into different categories that
|
||||
/// corresponds to the different types of `JsonValue` types.
|
||||
///
|
||||
/// The columnar writer will apply coercion rules to make sure that
|
||||
/// at most one column exist per `ColumnTypeCategory`.
|
||||
///
|
||||
/// See also [README.md].
|
||||
/// This corresponds to the JsonType.
|
||||
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Debug)]
|
||||
#[repr(u8)]
|
||||
pub(crate) enum ColumnTypeCategory {
|
||||
pub(crate) enum GeneralType {
|
||||
Bool = 0u8,
|
||||
Str = 1u8,
|
||||
Numerical = 2u8,
|
||||
}
|
||||
|
||||
impl ColumnTypeCategory {
|
||||
impl GeneralType {
|
||||
pub fn to_code(self) -> u8 {
|
||||
self as u8
|
||||
}
|
||||
|
||||
pub fn try_from_code(code: u8) -> Result<Self, InvalidData> {
|
||||
pub fn try_from_code(code: u8) -> Option<Self> {
|
||||
match code {
|
||||
0u8 => Ok(Self::Bool),
|
||||
1u8 => Ok(Self::Str),
|
||||
2u8 => Ok(Self::Numerical),
|
||||
_ => Err(InvalidData),
|
||||
0u8 => Some(Self::Bool),
|
||||
1u8 => Some(Self::Str),
|
||||
2u8 => Some(Self::Numerical),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -128,12 +115,12 @@ impl ColumnTypeCategory {
|
||||
/// This is encoded over one-byte and added to a column key in the
|
||||
/// columnar sstable.
|
||||
///
|
||||
/// - [0..6] bits: encodes the column type
|
||||
/// - [6..8] bits: encodes the cardinality
|
||||
/// Cardinality is encoded as the first two highest two bits.
|
||||
/// The low 6 bits encode the column type.
|
||||
#[derive(Eq, Hash, PartialEq, Debug, Copy, Clone)]
|
||||
pub struct ColumnTypeAndCardinality {
|
||||
pub typ: ColumnType,
|
||||
pub cardinality: Cardinality,
|
||||
pub typ: ColumnType,
|
||||
}
|
||||
|
||||
impl ColumnTypeAndCardinality {
|
||||
@@ -141,13 +128,13 @@ impl ColumnTypeAndCardinality {
|
||||
place_bits::<6, 8>(self.cardinality.to_code()) | place_bits::<0, 6>(self.typ.to_code())
|
||||
}
|
||||
|
||||
pub fn try_from_code(code: u8) -> Result<ColumnTypeAndCardinality, InvalidData> {
|
||||
pub fn try_from_code(code: u8) -> Option<ColumnTypeAndCardinality> {
|
||||
let typ_code = select_bits::<0, 6>(code);
|
||||
let cardinality_code = select_bits::<6, 8>(code);
|
||||
let cardinality = Cardinality::try_from_code(cardinality_code)?;
|
||||
let typ = ColumnType::try_from_code(typ_code)?;
|
||||
assert_eq!(typ.to_code(), typ_code);
|
||||
Ok(ColumnTypeAndCardinality { cardinality, typ })
|
||||
Some(ColumnTypeAndCardinality { cardinality, typ })
|
||||
}
|
||||
}
|
||||
|
||||
@@ -162,7 +149,7 @@ mod tests {
|
||||
fn test_column_type_header_to_code() {
|
||||
let mut column_type_header_set: HashSet<ColumnTypeAndCardinality> = HashSet::new();
|
||||
for code in u8::MIN..=u8::MAX {
|
||||
if let Ok(column_type_header) = ColumnTypeAndCardinality::try_from_code(code) {
|
||||
if let Some(column_type_header) = ColumnTypeAndCardinality::try_from_code(code) {
|
||||
assert_eq!(column_type_header.to_code(), code);
|
||||
assert!(column_type_header_set.insert(column_type_header));
|
||||
}
|
||||
@@ -178,7 +165,7 @@ mod tests {
|
||||
fn test_column_type_to_code() {
|
||||
let mut column_type_set: HashSet<ColumnType> = HashSet::new();
|
||||
for code in u8::MIN..=u8::MAX {
|
||||
if let Ok(column_type) = ColumnType::try_from_code(code) {
|
||||
if let Some(column_type) = ColumnType::try_from_code(code) {
|
||||
assert_eq!(column_type.to_code(), code);
|
||||
assert!(column_type_set.insert(column_type));
|
||||
}
|
||||
@@ -190,7 +177,8 @@ mod tests {
|
||||
fn test_cardinality_to_code() {
|
||||
let mut num_cardinality = 0;
|
||||
for code in u8::MIN..=u8::MAX {
|
||||
if let Ok(cardinality) = Cardinality::try_from_code(code) {
|
||||
let cardinality_opt = Cardinality::try_from_code(code);
|
||||
if let Some(cardinality) = cardinality_opt {
|
||||
assert_eq!(cardinality.to_code(), code);
|
||||
num_cardinality += 1;
|
||||
}
|
||||
|
||||
@@ -3,11 +3,11 @@ use std::io;
|
||||
use fnv::FnvHashMap;
|
||||
use sstable::SSTable;
|
||||
|
||||
pub(crate) struct TermIdMapping {
|
||||
pub(crate) struct IdMapping {
|
||||
unordered_to_ord: Vec<OrderedId>,
|
||||
}
|
||||
|
||||
impl TermIdMapping {
|
||||
impl IdMapping {
|
||||
pub fn to_ord(&self, unordered: UnorderedId) -> OrderedId {
|
||||
self.unordered_to_ord[unordered.0 as usize]
|
||||
}
|
||||
@@ -48,7 +48,7 @@ impl DictionaryBuilder {
|
||||
|
||||
/// Serialize the dictionary into an fst, and returns the
|
||||
/// `UnorderedId -> TermOrdinal` map.
|
||||
pub fn serialize<'a, W: io::Write + 'a>(&self, wrt: &mut W) -> io::Result<TermIdMapping> {
|
||||
pub fn serialize<'a, W: io::Write + 'a>(&self, wrt: &mut W) -> io::Result<IdMapping> {
|
||||
let mut terms: Vec<(&[u8], UnorderedId)> =
|
||||
self.dict.iter().map(|(k, v)| (k.as_slice(), *v)).collect();
|
||||
terms.sort_unstable_by_key(|(key, _)| *key);
|
||||
@@ -61,7 +61,7 @@ impl DictionaryBuilder {
|
||||
unordered_to_ord[unordered_id.0 as usize] = ordered_id;
|
||||
}
|
||||
sstable_builder.finish()?;
|
||||
Ok(TermIdMapping { unordered_to_ord })
|
||||
Ok(IdMapping { unordered_to_ord })
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -9,35 +9,34 @@ pub use column_type_header::Cardinality;
|
||||
pub use reader::ColumnarReader;
|
||||
pub use value::{NumericalType, NumericalValue};
|
||||
pub use writer::ColumnarWriter;
|
||||
pub use reader::ColumnHandle;
|
||||
|
||||
pub type DocId = u32;
|
||||
|
||||
#[derive(Copy, Clone, Debug)]
|
||||
pub struct InvalidData;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::ops::Range;
|
||||
|
||||
use common::file_slice::FileSlice;
|
||||
use crate::column_type_header::ColumnType;
|
||||
use crate::reader::{ColumnarReader, ColumnHandle};
|
||||
|
||||
use crate::column_type_header::{ColumnType, ColumnTypeAndCardinality};
|
||||
use crate::reader::ColumnarReader;
|
||||
use crate::value::NumericalValue;
|
||||
use crate::{Cardinality, ColumnarWriter};
|
||||
|
||||
#[test]
|
||||
fn test_dataframe_writer_bytes() {
|
||||
let mut dataframe_writer = ColumnarWriter::default();
|
||||
dataframe_writer.record_str(1u32, "my_string", "hello");
|
||||
dataframe_writer.record_str(3u32, "my_string", "helloeee");
|
||||
dataframe_writer.record_str(1u32, "my_string", b"hello");
|
||||
dataframe_writer.record_str(3u32, "my_string", b"helloeee");
|
||||
let mut buffer: Vec<u8> = Vec::new();
|
||||
dataframe_writer.serialize(5, &mut buffer).unwrap();
|
||||
let columnar_fileslice = FileSlice::from(buffer);
|
||||
let columnar = ColumnarReader::open(columnar_fileslice).unwrap();
|
||||
assert_eq!(columnar.num_columns(), 1);
|
||||
let cols: Vec<ColumnHandle> =
|
||||
let cols: Vec<(ColumnTypeAndCardinality, Range<u64>)> =
|
||||
columnar.read_columns("my_string").unwrap();
|
||||
assert_eq!(cols.len(), 1);
|
||||
assert_eq!(cols[0].num_bytes(), 158);
|
||||
assert_eq!(cols[0].1, 0..159);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -49,21 +48,17 @@ mod tests {
|
||||
let columnar_fileslice = FileSlice::from(buffer);
|
||||
let columnar = ColumnarReader::open(columnar_fileslice).unwrap();
|
||||
assert_eq!(columnar.num_columns(), 1);
|
||||
let cols: Vec<ColumnHandle> =
|
||||
let cols: Vec<(ColumnTypeAndCardinality, Range<u64>)> =
|
||||
columnar.read_columns("bool.value").unwrap();
|
||||
assert_eq!(cols.len(), 1);
|
||||
let col = cols.into_iter().next().unwrap();
|
||||
assert_eq!(
|
||||
col.column_type(),
|
||||
ColumnType::Bool
|
||||
);
|
||||
assert_eq!(
|
||||
col.cardinality(),
|
||||
Cardinality::Optional);
|
||||
assert_eq!(
|
||||
col.column_name(),
|
||||
"bool.value"
|
||||
cols[0].0,
|
||||
ColumnTypeAndCardinality {
|
||||
cardinality: Cardinality::Optional,
|
||||
typ: ColumnType::Bool
|
||||
}
|
||||
);
|
||||
assert_eq!(cols[0].1, 0..22);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -77,7 +72,7 @@ mod tests {
|
||||
let columnar_fileslice = FileSlice::from(buffer);
|
||||
let columnar = ColumnarReader::open(columnar_fileslice).unwrap();
|
||||
assert_eq!(columnar.num_columns(), 1);
|
||||
let cols: Vec<ColumnHandle> =
|
||||
let cols: Vec<(ColumnTypeAndCardinality, Range<u64>)> =
|
||||
columnar.read_columns("srical.value").unwrap();
|
||||
assert_eq!(cols.len(), 1);
|
||||
// Right now this 31 bytes are spent as follows
|
||||
@@ -86,6 +81,6 @@ mod tests {
|
||||
// - vals 8 //< due to padding? could have been 1byte?.
|
||||
// - null footer 6 bytes
|
||||
// - version footer 3 bytes // Should be file-wide
|
||||
assert_eq!(cols[0].num_bytes(), 31);
|
||||
assert_eq!(cols[0].1, 0..32);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,42 +0,0 @@
|
||||
use common::HasLen;
|
||||
use common::file_slice::FileSlice;
|
||||
|
||||
use crate::Cardinality;
|
||||
use crate::column_type_header::ColumnType;
|
||||
|
||||
|
||||
pub struct ColumnHandle {
|
||||
column_name: String, //< Mostly for debug and display.
|
||||
data: FileSlice,
|
||||
column_type: ColumnType,
|
||||
cardinality: Cardinality,
|
||||
}
|
||||
|
||||
impl ColumnHandle {
|
||||
pub fn new(column_name: String, data: FileSlice, column_type: ColumnType, cardinality: Cardinality) -> Self {
|
||||
ColumnHandle {
|
||||
column_name,
|
||||
data,
|
||||
column_type,
|
||||
cardinality,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn column_name(&self) -> &str {
|
||||
self.column_name.as_str()
|
||||
}
|
||||
|
||||
pub fn num_bytes(&self) -> usize {
|
||||
self.data.len()
|
||||
}
|
||||
|
||||
pub fn column_type(&self) -> ColumnType {
|
||||
self.column_type
|
||||
}
|
||||
|
||||
pub fn cardinality(&self) -> Cardinality {
|
||||
self.cardinality
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
mod column_handle;
|
||||
|
||||
use std::ops::Range;
|
||||
use std::{io, mem};
|
||||
|
||||
@@ -8,11 +6,10 @@ use common::BinarySerializable;
|
||||
use sstable::{Dictionary, RangeSSTable};
|
||||
|
||||
use crate::column_type_header::ColumnTypeAndCardinality;
|
||||
pub use crate::reader::column_handle::ColumnHandle;
|
||||
|
||||
fn io_invalid_data(msg: String) -> io::Error {
|
||||
io::Error::new(io::ErrorKind::InvalidData, msg)
|
||||
// {key_bytes:?}")));
|
||||
io::Error::new(io::ErrorKind::InvalidData, msg) // format!("Invalid key found.
|
||||
// {key_bytes:?}")));
|
||||
}
|
||||
|
||||
/// The ColumnarReader makes it possible to access a set of columns
|
||||
@@ -53,7 +50,7 @@ impl ColumnarReader {
|
||||
let key_bytes: &[u8] = stream.key();
|
||||
let column_code: u8 = key_bytes.last().cloned().unwrap();
|
||||
let column_type_and_cardinality = ColumnTypeAndCardinality::try_from_code(column_code)
|
||||
.map_err(|_| io_invalid_data(format!("Unknown column code `{column_code}`")))?;
|
||||
.ok_or_else(|| io_invalid_data(format!("Unknown column code `{column_code}`")))?;
|
||||
let range = stream.value().clone();
|
||||
let column_name = String::from_utf8_lossy(&key_bytes[..key_bytes.len() - 1]);
|
||||
let range_len = range.end - range.start;
|
||||
@@ -67,26 +64,15 @@ impl ColumnarReader {
|
||||
Ok(results)
|
||||
}
|
||||
|
||||
/// Get all columns for the given column name.
|
||||
///
|
||||
/// There can be more than one column associated to a given column name, provided they have
|
||||
/// different types.
|
||||
/// Get all columns for the given field_name.
|
||||
// TODO fix ugly API
|
||||
pub fn read_columns(
|
||||
&self,
|
||||
column_name: &str,
|
||||
) -> io::Result<Vec<ColumnHandle>> {
|
||||
// Each column is a associated to a given `column_key`,
|
||||
// that starts by `column_name\0column_header`.
|
||||
//
|
||||
// Listing the columns associate to the given column name is therefore equivalent to listing
|
||||
// `column_key` with the prefix `column_name\0`.
|
||||
//
|
||||
// This is in turn equivalent to searching for the range
|
||||
// `[column_name,\0`..column_name\1)`.
|
||||
let mut start_key = column_name.to_string();
|
||||
field_name: &str,
|
||||
) -> io::Result<Vec<(ColumnTypeAndCardinality, Range<u64>)>> {
|
||||
let mut start_key = field_name.to_string();
|
||||
start_key.push('\0');
|
||||
let mut end_key = column_name.to_string();
|
||||
let mut end_key = field_name.to_string();
|
||||
end_key.push(1u8 as char);
|
||||
let mut stream = self
|
||||
.column_dictionary
|
||||
@@ -94,17 +80,17 @@ impl ColumnarReader {
|
||||
.ge(start_key.as_bytes())
|
||||
.lt(end_key.as_bytes())
|
||||
.into_stream()?;
|
||||
let mut results: Vec<ColumnHandle> = Vec::new();
|
||||
let mut results = Vec::new();
|
||||
while stream.advance() {
|
||||
let key_bytes: &[u8] = stream.key();
|
||||
assert!(key_bytes.starts_with(start_key.as_bytes()));
|
||||
if !key_bytes.starts_with(start_key.as_bytes()) {
|
||||
return Err(io_invalid_data(format!("Invalid key found. {key_bytes:?}")));
|
||||
}
|
||||
let column_code: u8 = key_bytes.last().cloned().unwrap();
|
||||
let column_type_and_cardinality = ColumnTypeAndCardinality::try_from_code(column_code)
|
||||
.map_err(|_| io_invalid_data(format!("Unknown column code `{column_code}`")))?;
|
||||
let Range { start, end } = stream.value().clone();
|
||||
let column_data = self.column_data.slice(start as usize..end as usize);
|
||||
let column_handle = ColumnHandle::new(column_name.to_string(), column_data, column_type_and_cardinality.typ, column_type_and_cardinality.cardinality);
|
||||
results.push(column_handle);
|
||||
.ok_or_else(|| io_invalid_data(format!("Unknown column code `{column_code}`")))?;
|
||||
let range = stream.value().clone();
|
||||
results.push((column_type_and_cardinality, range));
|
||||
}
|
||||
Ok(results)
|
||||
}
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
use crate::InvalidData;
|
||||
|
||||
#[derive(Copy, Clone, Debug, PartialEq)]
|
||||
pub enum NumericalValue {
|
||||
I64(i64),
|
||||
@@ -51,12 +49,12 @@ impl NumericalType {
|
||||
self as u8
|
||||
}
|
||||
|
||||
pub fn try_from_code(code: u8) -> Result<NumericalType, InvalidData> {
|
||||
pub fn try_from_code(code: u8) -> Option<NumericalType> {
|
||||
match code {
|
||||
0 => Ok(NumericalType::I64),
|
||||
1 => Ok(NumericalType::U64),
|
||||
2 => Ok(NumericalType::F64),
|
||||
_ => Err(InvalidData),
|
||||
0 => Some(NumericalType::I64),
|
||||
1 => Some(NumericalType::U64),
|
||||
2 => Some(NumericalType::F64),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -64,7 +62,6 @@ impl NumericalType {
|
||||
/// We voluntarily avoid using `Into` here to keep this
|
||||
/// implementation quirk as private as possible.
|
||||
///
|
||||
/// # Panics
|
||||
/// This coercion trait actually panics if it is used
|
||||
/// to convert a loose types to a stricter type.
|
||||
///
|
||||
@@ -114,7 +111,7 @@ mod tests {
|
||||
fn test_numerical_type_code() {
|
||||
let mut num_numerical_type = 0;
|
||||
for code in u8::MIN..=u8::MAX {
|
||||
if let Ok(numerical_type) = NumericalType::try_from_code(code) {
|
||||
if let Some(numerical_type) = NumericalType::try_from_code(code) {
|
||||
assert_eq!(numerical_type.to_code(), code);
|
||||
num_numerical_type += 1;
|
||||
}
|
||||
|
||||
@@ -77,7 +77,7 @@ impl ColumnWriter {
|
||||
// The overall number of docs in the column is necessary to
|
||||
// deal with the case where the all docs contain 1 value, except some documents
|
||||
// at the end of the column.
|
||||
pub(crate) fn get_cardinality(&self, num_docs: DocId) -> Cardinality {
|
||||
pub fn get_cardinality(&self, num_docs: DocId) -> Cardinality {
|
||||
match delta_with_last_doc(self.last_doc_opt, num_docs) {
|
||||
DocumentStep::SameDoc | DocumentStep::NextDoc => self.cardinality,
|
||||
DocumentStep::SkippedDoc => self.cardinality.max(Cardinality::Optional),
|
||||
@@ -105,7 +105,7 @@ pub(crate) struct NumericalColumnWriter {
|
||||
/// State used to store what types are still acceptable
|
||||
/// after having seen a set of numerical values.
|
||||
#[derive(Clone, Copy)]
|
||||
struct CompatibleNumericalTypes {
|
||||
pub(crate) struct CompatibleNumericalTypes {
|
||||
all_values_within_i64_range: bool,
|
||||
all_values_within_u64_range: bool,
|
||||
// f64 is always acceptable.
|
||||
@@ -155,7 +155,6 @@ impl NumericalColumnWriter {
|
||||
let cardinality = self.column_writer.get_cardinality(num_docs);
|
||||
(numerical_type, cardinality)
|
||||
}
|
||||
|
||||
pub fn record_numerical_value(
|
||||
&mut self,
|
||||
doc: DocId,
|
||||
@@ -176,13 +175,13 @@ impl NumericalColumnWriter {
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Default)]
|
||||
pub(crate) struct StrColumnWriter {
|
||||
pub struct StrColumnWriter {
|
||||
pub(crate) dictionary_id: u32,
|
||||
pub(crate) column_writer: ColumnWriter,
|
||||
}
|
||||
|
||||
impl StrColumnWriter {
|
||||
pub(crate) fn with_dictionary_id(dictionary_id: u32) -> StrColumnWriter {
|
||||
pub fn with_dictionary_id(dictionary_id: u32) -> StrColumnWriter {
|
||||
StrColumnWriter {
|
||||
dictionary_id,
|
||||
column_writer: Default::default(),
|
||||
|
||||
@@ -11,13 +11,17 @@ use fastfield_codecs::{Column, MonotonicallyMappableToU64, VecColumn};
|
||||
use serializer::ColumnarSerializer;
|
||||
use stacker::{Addr, ArenaHashMap, MemoryArena};
|
||||
|
||||
use crate::column_type_header::{ColumnType, ColumnTypeAndCardinality, ColumnTypeCategory};
|
||||
use crate::dictionary::{DictionaryBuilder, TermIdMapping, UnorderedId};
|
||||
use crate::column_type_header::{ColumnType, ColumnTypeAndCardinality, GeneralType};
|
||||
use crate::dictionary::{DictionaryBuilder, IdMapping, UnorderedId};
|
||||
use crate::value::{Coerce, NumericalType, NumericalValue};
|
||||
use crate::writer::column_writers::{ColumnWriter, NumericalColumnWriter, StrColumnWriter};
|
||||
use crate::writer::value_index::{IndexBuilder, SpareIndexBuilders};
|
||||
use crate::{Cardinality, DocId};
|
||||
|
||||
/// Threshold above which a column data will be compressed
|
||||
/// using ZSTD.
|
||||
const COLUMN_COMPRESSION_THRESHOLD: usize = 100_000;
|
||||
|
||||
/// This is a set of buffers that are only here
|
||||
/// to limit the amount of allocation.
|
||||
#[derive(Default)]
|
||||
@@ -30,20 +34,6 @@ struct SpareBuffers {
|
||||
column_buffer: Vec<u8>,
|
||||
}
|
||||
|
||||
/// Makes it possible to create a new columnar.
|
||||
///
|
||||
/// ```rust
|
||||
/// use tantivy_columnar::ColumnarWriter;
|
||||
/// fn main() {
|
||||
/// let mut columnar_writer = ColumnarWriter::default();
|
||||
/// columnar_writer.record_str(0u32 /* doc id */, "product_name", "Red backpack");
|
||||
/// columnar_writer.record_numerical(0u32 /* doc id */, "price", 10u64);
|
||||
/// columnar_writer.record_str(1u32 /* doc id */, "product_name", "Apple");
|
||||
/// columnar_writer.record_numerical(0u32 /* doc id */, "price", 10.5f64); //< uh oh we ended up mixing integer and floats.
|
||||
/// let mut wrt: Vec<u8> = Vec::new();
|
||||
/// columnar_writer.serialize(2u32, &mut wrt).unwrap();
|
||||
/// }
|
||||
/// ```
|
||||
pub struct ColumnarWriter {
|
||||
numerical_field_hash_map: ArenaHashMap,
|
||||
bool_field_hash_map: ArenaHashMap,
|
||||
@@ -68,11 +58,11 @@ impl Default for ColumnarWriter {
|
||||
}
|
||||
|
||||
impl ColumnarWriter {
|
||||
pub fn record_numerical<T: Into<NumericalValue> + Copy>(
|
||||
pub fn record_numerical(
|
||||
&mut self,
|
||||
doc: DocId,
|
||||
column_name: &str,
|
||||
numerical_value: T,
|
||||
numerical_value: NumericalValue,
|
||||
) {
|
||||
assert!(
|
||||
!column_name.as_bytes().contains(&0u8),
|
||||
@@ -83,7 +73,7 @@ impl ColumnarWriter {
|
||||
column_name.as_bytes(),
|
||||
|column_opt: Option<NumericalColumnWriter>| {
|
||||
let mut column: NumericalColumnWriter = column_opt.unwrap_or_default();
|
||||
column.record_numerical_value(doc, numerical_value.into(), arena);
|
||||
column.record_numerical_value(doc, numerical_value, arena);
|
||||
column
|
||||
},
|
||||
);
|
||||
@@ -105,7 +95,7 @@ impl ColumnarWriter {
|
||||
);
|
||||
}
|
||||
|
||||
pub fn record_str(&mut self, doc: DocId, column_name: &str, value: &str) {
|
||||
pub fn record_str(&mut self, doc: DocId, column_name: &str, value: &[u8]) {
|
||||
assert!(
|
||||
!column_name.as_bytes().contains(&0u8),
|
||||
"key may not contain the 0 byte"
|
||||
@@ -123,7 +113,7 @@ impl ColumnarWriter {
|
||||
dictionaries.push(DictionaryBuilder::default());
|
||||
StrColumnWriter::with_dictionary_id(dictionary_id)
|
||||
});
|
||||
column.record_bytes(doc, value.as_bytes(), dictionaries, arena);
|
||||
column.record_bytes(doc, value, dictionaries, arena);
|
||||
column
|
||||
},
|
||||
);
|
||||
@@ -131,27 +121,27 @@ impl ColumnarWriter {
|
||||
|
||||
pub fn serialize(&mut self, num_docs: DocId, wrt: &mut dyn io::Write) -> io::Result<()> {
|
||||
let mut serializer = ColumnarSerializer::new(wrt);
|
||||
let mut field_columns: Vec<(&[u8], ColumnTypeCategory, Addr)> = self
|
||||
let mut field_columns: Vec<(&[u8], GeneralType, Addr)> = self
|
||||
.numerical_field_hash_map
|
||||
.iter()
|
||||
.map(|(term, addr, _)| (term, ColumnTypeCategory::Numerical, addr))
|
||||
.map(|(term, addr, _)| (term, GeneralType::Numerical, addr))
|
||||
.collect();
|
||||
field_columns.extend(
|
||||
self.bytes_field_hash_map
|
||||
.iter()
|
||||
.map(|(term, addr, _)| (term, ColumnTypeCategory::Str, addr)),
|
||||
.map(|(term, addr, _)| (term, GeneralType::Str, addr)),
|
||||
);
|
||||
field_columns.extend(
|
||||
self.bool_field_hash_map
|
||||
.iter()
|
||||
.map(|(term, addr, _)| (term, ColumnTypeCategory::Bool, addr)),
|
||||
.map(|(term, addr, _)| (term, GeneralType::Bool, addr)),
|
||||
);
|
||||
field_columns.sort_unstable_by_key(|(column_name, col_type, _)| (*column_name, *col_type));
|
||||
let (arena, buffers, dictionaries) = (&self.arena, &mut self.buffers, &self.dictionaries);
|
||||
let mut symbol_byte_buffer: Vec<u8> = Vec::new();
|
||||
for (column_name, bytes_or_numerical, addr) in field_columns {
|
||||
match bytes_or_numerical {
|
||||
ColumnTypeCategory::Bool => {
|
||||
GeneralType::Bool => {
|
||||
let column_writer: ColumnWriter = self.bool_field_hash_map.read(addr);
|
||||
let cardinality = column_writer.get_cardinality(num_docs);
|
||||
let column_type_and_cardinality = ColumnTypeAndCardinality {
|
||||
@@ -168,7 +158,7 @@ impl ColumnarWriter {
|
||||
column_serializer,
|
||||
)?;
|
||||
}
|
||||
ColumnTypeCategory::Str => {
|
||||
GeneralType::Str => {
|
||||
let str_column_writer: StrColumnWriter = self.bytes_field_hash_map.read(addr);
|
||||
let dictionary_builder =
|
||||
&dictionaries[str_column_writer.dictionary_id as usize];
|
||||
@@ -188,7 +178,7 @@ impl ColumnarWriter {
|
||||
column_serializer,
|
||||
)?;
|
||||
}
|
||||
ColumnTypeCategory::Numerical => {
|
||||
GeneralType::Numerical => {
|
||||
let numerical_column_writer: NumericalColumnWriter =
|
||||
self.numerical_field_hash_map.read(addr);
|
||||
let (numerical_type, cardinality) =
|
||||
@@ -216,7 +206,15 @@ impl ColumnarWriter {
|
||||
}
|
||||
|
||||
fn compress_and_write_column<W: io::Write>(column_bytes: &[u8], wrt: &mut W) -> io::Result<()> {
|
||||
wrt.write_all(column_bytes)?;
|
||||
if column_bytes.len() >= COLUMN_COMPRESSION_THRESHOLD {
|
||||
wrt.write_all(&[1])?;
|
||||
let mut encoder = zstd::Encoder::new(wrt, 3)?;
|
||||
encoder.write_all(column_bytes)?;
|
||||
encoder.finish()?;
|
||||
} else {
|
||||
wrt.write_all(&[0])?;
|
||||
wrt.write_all(column_bytes)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -235,13 +233,13 @@ fn serialize_bytes_column<W: io::Write>(
|
||||
..
|
||||
} = buffers;
|
||||
column_buffer.clear();
|
||||
let term_id_mapping: TermIdMapping = dictionary_builder.serialize(column_buffer)?;
|
||||
let id_mapping: IdMapping = dictionary_builder.serialize(column_buffer)?;
|
||||
let dictionary_num_bytes: u32 = column_buffer.len() as u32;
|
||||
let operation_iterator = operation_it.map(|symbol: ColumnOperation<UnorderedId>| {
|
||||
// We map unordered ids to ordered ids.
|
||||
match symbol {
|
||||
ColumnOperation::Value(unordered_id) => {
|
||||
let ordered_id = term_id_mapping.to_ord(unordered_id);
|
||||
let ordered_id = id_mapping.to_ord(unordered_id);
|
||||
ColumnOperation::Value(ordered_id.0 as u64)
|
||||
}
|
||||
ColumnOperation::NewDoc(doc) => ColumnOperation::NewDoc(doc),
|
||||
|
||||
@@ -253,7 +253,8 @@ mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use super::{FileHandle, FileSlice};
|
||||
use crate::{file_slice::combine_ranges, HasLen};
|
||||
use crate::file_slice::combine_ranges;
|
||||
use crate::HasLen;
|
||||
|
||||
#[test]
|
||||
fn test_file_slice() -> io::Result<()> {
|
||||
|
||||
@@ -42,7 +42,8 @@ pub trait Column<T: PartialOrd = u64>: Send + Sync {
|
||||
positions: &mut Vec<u32>,
|
||||
) {
|
||||
let doc_id_range = doc_id_range.start..doc_id_range.end.min(self.num_vals());
|
||||
for idx in doc_id_range {
|
||||
|
||||
for idx in doc_id_range.start..doc_id_range.end {
|
||||
let val = self.get_val(idx);
|
||||
if value_range.contains(&val) {
|
||||
positions.push(idx);
|
||||
|
||||
Reference in New Issue
Block a user