mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-01-09 10:32:55 +00:00
Compare commits
4 Commits
range
...
use_column
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
375d1f9dac | ||
|
|
2874554ee4 | ||
|
|
cbc70a9eae | ||
|
|
226d0f88bc |
@@ -55,6 +55,7 @@ measure_time = "0.8.2"
|
|||||||
async-trait = "0.1.53"
|
async-trait = "0.1.53"
|
||||||
arc-swap = "1.5.0"
|
arc-swap = "1.5.0"
|
||||||
|
|
||||||
|
columnar = { version="0.1", path="./columnar", package ="tantivy-columnar" }
|
||||||
sstable = { version="0.1", path="./sstable", package ="tantivy-sstable", optional = true }
|
sstable = { version="0.1", path="./sstable", package ="tantivy-sstable", optional = true }
|
||||||
stacker = { version="0.1", path="./stacker", package ="tantivy-stacker" }
|
stacker = { version="0.1", path="./stacker", package ="tantivy-stacker" }
|
||||||
tantivy-query-grammar = { version= "0.19.0", path="./query-grammar" }
|
tantivy-query-grammar = { version= "0.19.0", path="./query-grammar" }
|
||||||
@@ -107,7 +108,7 @@ unstable = [] # useful for benches.
|
|||||||
quickwit = ["sstable"]
|
quickwit = ["sstable"]
|
||||||
|
|
||||||
[workspace]
|
[workspace]
|
||||||
members = ["query-grammar", "bitpacker", "common", "fastfield_codecs", "ownedbytes", "stacker", "sstable", "tokenizer-api"]
|
members = ["query-grammar", "bitpacker", "common", "fastfield_codecs", "ownedbytes", "stacker", "sstable", "tokenizer-api", "columnar"]
|
||||||
|
|
||||||
# Following the "fail" crate best practises, we isolate
|
# Following the "fail" crate best practises, we isolate
|
||||||
# tests that define specific behavior in fail check points
|
# tests that define specific behavior in fail check points
|
||||||
|
|||||||
@@ -5,28 +5,23 @@ edition = "2021"
|
|||||||
license = "MIT"
|
license = "MIT"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
itertools = "0.10.5"
|
||||||
|
log = "0.4.17"
|
||||||
|
fnv = "1.0.7"
|
||||||
|
fastdivide = "0.4.0"
|
||||||
|
rand = { version = "0.8.5", optional = true }
|
||||||
|
measure_time = { version = "0.8.2", optional = true }
|
||||||
|
prettytable-rs = { version = "0.10.0", optional = true }
|
||||||
|
|
||||||
stacker = { path = "../stacker", package="tantivy-stacker"}
|
stacker = { path = "../stacker", package="tantivy-stacker"}
|
||||||
serde_json = "1"
|
|
||||||
thiserror = "1"
|
|
||||||
fnv = "1"
|
|
||||||
sstable = { path = "../sstable", package = "tantivy-sstable" }
|
sstable = { path = "../sstable", package = "tantivy-sstable" }
|
||||||
common = { path = "../common", package = "tantivy-common" }
|
common = { path = "../common", package = "tantivy-common" }
|
||||||
itertools = "0.10"
|
|
||||||
log = "0.4"
|
|
||||||
tantivy-bitpacker = { version= "0.3", path = "../bitpacker/" }
|
tantivy-bitpacker = { version= "0.3", path = "../bitpacker/" }
|
||||||
prettytable-rs = {version="0.10.0", optional= true}
|
|
||||||
rand = {version="0.8.3", optional= true}
|
|
||||||
fastdivide = "0.4"
|
|
||||||
measure_time = { version="0.8.2", optional=true}
|
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
proptest = "1"
|
proptest = "1.0.0"
|
||||||
more-asserts = "0.3.0"
|
more-asserts = "0.3.1"
|
||||||
rand = "0.8.3"
|
rand = "0.8.5"
|
||||||
|
|
||||||
# temporary
|
|
||||||
[workspace]
|
|
||||||
members = []
|
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
unstable = []
|
unstable = []
|
||||||
|
|||||||
@@ -22,6 +22,9 @@ pub struct Column<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<T: PartialOrd> Column<T> {
|
impl<T: PartialOrd> Column<T> {
|
||||||
|
pub fn get_cardinality(&self) -> Cardinality {
|
||||||
|
self.idx.get_cardinality()
|
||||||
|
}
|
||||||
pub fn num_rows(&self) -> RowId {
|
pub fn num_rows(&self) -> RowId {
|
||||||
match &self.idx {
|
match &self.idx {
|
||||||
ColumnIndex::Full => self.values.num_vals() as u32,
|
ColumnIndex::Full => self.values.num_vals() as u32,
|
||||||
|
|||||||
@@ -1,7 +1,8 @@
|
|||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
|
||||||
use crate::column_index::optional_index::set_block::dense::DENSE_BLOCK_NUM_BYTES;
|
use crate::column_index::optional_index::set_block::{
|
||||||
use crate::column_index::optional_index::set_block::{DenseBlockCodec, SparseBlockCodec};
|
DenseBlockCodec, SparseBlockCodec, DENSE_BLOCK_NUM_BYTES,
|
||||||
|
};
|
||||||
use crate::column_index::optional_index::{Set, SetCodec};
|
use crate::column_index::optional_index::{Set, SetCodec};
|
||||||
|
|
||||||
fn test_set_helper<C: SetCodec<Item = u16>>(vals: &[u16]) -> usize {
|
fn test_set_helper<C: SetCodec<Item = u16>>(vals: &[u16]) -> usize {
|
||||||
|
|||||||
@@ -3,24 +3,22 @@ use std::net::Ipv6Addr;
|
|||||||
use crate::value::NumericalType;
|
use crate::value::NumericalType;
|
||||||
use crate::InvalidData;
|
use crate::InvalidData;
|
||||||
|
|
||||||
/// The column type represents the column type and can fit on 6-bits.
|
/// The column type represents the column type.
|
||||||
///
|
/// Any changes need to be propagated to `COLUMN_TYPES`.
|
||||||
/// - bits[0..3]: Column category type.
|
#[derive(Hash, Eq, PartialEq, Debug, Clone, Copy, Ord, PartialOrd)]
|
||||||
/// - bits[3..6]: Numerical type if necessary.
|
|
||||||
#[derive(Hash, Eq, PartialEq, Debug, Clone, Copy)]
|
|
||||||
#[repr(u8)]
|
#[repr(u8)]
|
||||||
pub enum ColumnType {
|
pub enum ColumnType {
|
||||||
I64 = 0u8,
|
I64 = 0u8,
|
||||||
U64 = 1u8,
|
U64 = 1u8,
|
||||||
F64 = 2u8,
|
F64 = 2u8,
|
||||||
Bytes = 10u8,
|
Bytes = 3u8,
|
||||||
Str = 14u8,
|
Str = 4u8,
|
||||||
Bool = 18u8,
|
Bool = 5u8,
|
||||||
IpAddr = 22u8,
|
IpAddr = 6u8,
|
||||||
DateTime = 26u8,
|
DateTime = 7u8,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
// The order needs to match _exactly_ the order in the enum
|
||||||
const COLUMN_TYPES: [ColumnType; 8] = [
|
const COLUMN_TYPES: [ColumnType; 8] = [
|
||||||
ColumnType::I64,
|
ColumnType::I64,
|
||||||
ColumnType::U64,
|
ColumnType::U64,
|
||||||
@@ -38,18 +36,7 @@ impl ColumnType {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn try_from_code(code: u8) -> Result<ColumnType, InvalidData> {
|
pub(crate) fn try_from_code(code: u8) -> Result<ColumnType, InvalidData> {
|
||||||
use ColumnType::*;
|
COLUMN_TYPES.get(code as usize).copied().ok_or(InvalidData)
|
||||||
match code {
|
|
||||||
0u8 => Ok(I64),
|
|
||||||
1u8 => Ok(U64),
|
|
||||||
2u8 => Ok(F64),
|
|
||||||
10u8 => Ok(Bytes),
|
|
||||||
14u8 => Ok(Str),
|
|
||||||
18u8 => Ok(Bool),
|
|
||||||
22u8 => Ok(IpAddr),
|
|
||||||
26u8 => Ok(Self::DateTime),
|
|
||||||
_ => Err(InvalidData),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -64,18 +51,6 @@ impl From<NumericalType> for ColumnType {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl ColumnType {
|
impl ColumnType {
|
||||||
/// get column type category
|
|
||||||
pub(crate) fn column_type_category(self) -> ColumnTypeCategory {
|
|
||||||
match self {
|
|
||||||
ColumnType::I64 | ColumnType::U64 | ColumnType::F64 => ColumnTypeCategory::Numerical,
|
|
||||||
ColumnType::Bytes => ColumnTypeCategory::Bytes,
|
|
||||||
ColumnType::Str => ColumnTypeCategory::Str,
|
|
||||||
ColumnType::Bool => ColumnTypeCategory::Bool,
|
|
||||||
ColumnType::IpAddr => ColumnTypeCategory::IpAddr,
|
|
||||||
ColumnType::DateTime => ColumnTypeCategory::DateTime,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn numerical_type(&self) -> Option<NumericalType> {
|
pub fn numerical_type(&self) -> Option<NumericalType> {
|
||||||
match self {
|
match self {
|
||||||
ColumnType::I64 => Some(NumericalType::I64),
|
ColumnType::I64 => Some(NumericalType::I64),
|
||||||
@@ -154,70 +129,20 @@ impl HasAssociatedColumnType for Ipv6Addr {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Column types are grouped into different categories that
|
|
||||||
/// corresponds to the different types of `JsonValue` types.
|
|
||||||
///
|
|
||||||
/// The columnar writer will apply coercion rules to make sure that
|
|
||||||
/// at most one column exist per `ColumnTypeCategory`.
|
|
||||||
///
|
|
||||||
/// See also [README.md].
|
|
||||||
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug)]
|
|
||||||
#[repr(u8)]
|
|
||||||
pub enum ColumnTypeCategory {
|
|
||||||
Bool,
|
|
||||||
Str,
|
|
||||||
Numerical,
|
|
||||||
DateTime,
|
|
||||||
Bytes,
|
|
||||||
IpAddr,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<ColumnType> for ColumnTypeCategory {
|
|
||||||
fn from(column_type: ColumnType) -> Self {
|
|
||||||
match column_type {
|
|
||||||
ColumnType::I64 => ColumnTypeCategory::Numerical,
|
|
||||||
ColumnType::U64 => ColumnTypeCategory::Numerical,
|
|
||||||
ColumnType::F64 => ColumnTypeCategory::Numerical,
|
|
||||||
ColumnType::Bytes => ColumnTypeCategory::Bytes,
|
|
||||||
ColumnType::Str => ColumnTypeCategory::Str,
|
|
||||||
ColumnType::Bool => ColumnTypeCategory::Bool,
|
|
||||||
ColumnType::IpAddr => ColumnTypeCategory::IpAddr,
|
|
||||||
ColumnType::DateTime => ColumnTypeCategory::DateTime,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use std::collections::HashSet;
|
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::Cardinality;
|
use crate::Cardinality;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_column_type_to_code() {
|
fn test_column_type_to_code() {
|
||||||
let mut column_type_set: HashSet<ColumnType> = HashSet::new();
|
for (code, expected_column_type) in super::COLUMN_TYPES.iter().copied().enumerate() {
|
||||||
for code in u8::MIN..=u8::MAX {
|
if let Ok(column_type) = ColumnType::try_from_code(code as u8) {
|
||||||
if let Ok(column_type) = ColumnType::try_from_code(code) {
|
assert_eq!(column_type, expected_column_type);
|
||||||
assert_eq!(column_type.to_code(), code);
|
|
||||||
assert!(column_type_set.insert(column_type));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
assert_eq!(column_type_set.len(), super::COLUMN_TYPES.len());
|
for code in COLUMN_TYPES.len() as u8..=u8::MAX {
|
||||||
}
|
assert!(ColumnType::try_from_code(code as u8).is_err());
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_column_category_sort_consistent_with_column_type_sort() {
|
|
||||||
// This is a very important property because we
|
|
||||||
// need to serialize columns in the right order.
|
|
||||||
let mut column_types: Vec<ColumnType> = super::COLUMN_TYPES.iter().copied().collect();
|
|
||||||
column_types.sort_by_key(|col| col.to_code());
|
|
||||||
let column_categories: Vec<ColumnTypeCategory> = column_types
|
|
||||||
.into_iter()
|
|
||||||
.map(ColumnTypeCategory::from)
|
|
||||||
.collect();
|
|
||||||
for (prev, next) in column_categories.iter().zip(column_categories.iter()) {
|
|
||||||
assert!(prev <= next);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,9 +1,10 @@
|
|||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::io;
|
use std::io;
|
||||||
|
|
||||||
use super::column_type::ColumnTypeCategory;
|
use super::writer::ColumnarSerializer;
|
||||||
use crate::columnar::ColumnarReader;
|
use crate::columnar::ColumnarReader;
|
||||||
use crate::dynamic_column::DynamicColumn;
|
use crate::dynamic_column::DynamicColumn;
|
||||||
|
use crate::{Cardinality, ColumnType};
|
||||||
|
|
||||||
pub enum MergeDocOrder {
|
pub enum MergeDocOrder {
|
||||||
/// Columnar tables are simply stacked one above the other.
|
/// Columnar tables are simply stacked one above the other.
|
||||||
@@ -19,24 +20,100 @@ pub enum MergeDocOrder {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn merge_columnar(
|
pub fn merge_columnar(
|
||||||
_columnar_readers: &[ColumnarReader],
|
columnar_readers: &[ColumnarReader],
|
||||||
mapping: MergeDocOrder,
|
mapping: MergeDocOrder,
|
||||||
_output: &mut impl io::Write,
|
output: &mut impl io::Write,
|
||||||
) -> io::Result<()> {
|
) -> io::Result<()> {
|
||||||
match mapping {
|
let mut serializer = ColumnarSerializer::new(output);
|
||||||
MergeDocOrder::Stack => {
|
|
||||||
// implement me :)
|
// TODO handle dictionary merge for Str/Bytes column
|
||||||
todo!();
|
let field_name_to_group = group_columns_for_merge(columnar_readers)?;
|
||||||
|
for (column_name, category_to_columns) in field_name_to_group {
|
||||||
|
for (_category, columns_to_merge) in category_to_columns {
|
||||||
|
let column_type = columns_to_merge[0].column_type();
|
||||||
|
let mut column_serialzier =
|
||||||
|
serializer.serialize_column(column_name.as_bytes(), column_type);
|
||||||
|
merge_columns(
|
||||||
|
column_type,
|
||||||
|
&columns_to_merge,
|
||||||
|
&mapping,
|
||||||
|
&mut column_serialzier,
|
||||||
|
)?;
|
||||||
}
|
}
|
||||||
MergeDocOrder::Complex(_) => {
|
}
|
||||||
// for later
|
serializer.finalize()?;
|
||||||
todo!();
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Column types are grouped into different categories.
|
||||||
|
/// After merge, all columns belonging to the same category are coerced to
|
||||||
|
/// the same column type.
|
||||||
|
///
|
||||||
|
/// In practice, only Numerical columns are coerced into one type today.
|
||||||
|
///
|
||||||
|
/// See also [README.md].
|
||||||
|
#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
|
||||||
|
#[repr(u8)]
|
||||||
|
pub enum ColumnTypeCategory {
|
||||||
|
Bool,
|
||||||
|
Str,
|
||||||
|
Numerical,
|
||||||
|
DateTime,
|
||||||
|
Bytes,
|
||||||
|
IpAddr,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<ColumnType> for ColumnTypeCategory {
|
||||||
|
fn from(column_type: ColumnType) -> Self {
|
||||||
|
match column_type {
|
||||||
|
ColumnType::I64 => ColumnTypeCategory::Numerical,
|
||||||
|
ColumnType::U64 => ColumnTypeCategory::Numerical,
|
||||||
|
ColumnType::F64 => ColumnTypeCategory::Numerical,
|
||||||
|
ColumnType::Bytes => ColumnTypeCategory::Bytes,
|
||||||
|
ColumnType::Str => ColumnTypeCategory::Str,
|
||||||
|
ColumnType::Bool => ColumnTypeCategory::Bool,
|
||||||
|
ColumnType::IpAddr => ColumnTypeCategory::IpAddr,
|
||||||
|
ColumnType::DateTime => ColumnTypeCategory::DateTime,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn collect_columns(
|
pub fn detect_cardinality(columns: &[DynamicColumn]) -> Cardinality {
|
||||||
columnar_readers: &[&ColumnarReader],
|
if columns
|
||||||
|
.iter()
|
||||||
|
.any(|column| column.get_cardinality().is_multivalue())
|
||||||
|
{
|
||||||
|
return Cardinality::Multivalued;
|
||||||
|
}
|
||||||
|
if columns
|
||||||
|
.iter()
|
||||||
|
.any(|column| column.get_cardinality().is_optional())
|
||||||
|
{
|
||||||
|
return Cardinality::Optional;
|
||||||
|
}
|
||||||
|
Cardinality::Full
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn compute_num_docs(columns: &[DynamicColumn], mapping: &MergeDocOrder) -> usize {
|
||||||
|
// TODO handle deletes
|
||||||
|
|
||||||
|
0
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn merge_columns(
|
||||||
|
column_type: ColumnType,
|
||||||
|
columns: &[DynamicColumn],
|
||||||
|
mapping: &MergeDocOrder,
|
||||||
|
column_serializer: &mut impl io::Write,
|
||||||
|
) -> io::Result<()> {
|
||||||
|
let cardinality = detect_cardinality(columns);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn group_columns_for_merge(
|
||||||
|
columnar_readers: &[ColumnarReader],
|
||||||
) -> io::Result<HashMap<String, HashMap<ColumnTypeCategory, Vec<DynamicColumn>>>> {
|
) -> io::Result<HashMap<String, HashMap<ColumnTypeCategory, Vec<DynamicColumn>>>> {
|
||||||
// Each column name may have multiple types of column associated.
|
// Each column name may have multiple types of column associated.
|
||||||
// For merging we are interested in the same column type category since they can be merged.
|
// For merging we are interested in the same column type category since they can be merged.
|
||||||
@@ -51,7 +128,7 @@ pub fn collect_columns(
|
|||||||
.or_default();
|
.or_default();
|
||||||
|
|
||||||
let columns = column_type_to_handles
|
let columns = column_type_to_handles
|
||||||
.entry(handle.column_type().column_type_category())
|
.entry(handle.column_type().into())
|
||||||
.or_default();
|
.or_default();
|
||||||
columns.push(handle.open()?);
|
columns.push(handle.open()?);
|
||||||
}
|
}
|
||||||
@@ -62,10 +139,9 @@ pub fn collect_columns(
|
|||||||
Ok(field_name_to_group)
|
Ok(field_name_to_group)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Cast numerical type columns to the same type
|
/// Coerce numerical type columns to the same type
|
||||||
pub(crate) fn normalize_columns(
|
/// TODO rename to `coerce_columns`
|
||||||
map: &mut HashMap<String, HashMap<ColumnTypeCategory, Vec<DynamicColumn>>>,
|
fn normalize_columns(map: &mut HashMap<String, HashMap<ColumnTypeCategory, Vec<DynamicColumn>>>) {
|
||||||
) {
|
|
||||||
for (_field_name, type_category_to_columns) in map.iter_mut() {
|
for (_field_name, type_category_to_columns) in map.iter_mut() {
|
||||||
for (type_category, columns) in type_category_to_columns {
|
for (type_category, columns) in type_category_to_columns {
|
||||||
if type_category == &ColumnTypeCategory::Numerical {
|
if type_category == &ColumnTypeCategory::Numerical {
|
||||||
@@ -85,26 +161,20 @@ fn cast_to_common_numerical_column(columns: &[DynamicColumn]) -> Vec<DynamicColu
|
|||||||
.all(|column| column.column_type().numerical_type().is_some()));
|
.all(|column| column.column_type().numerical_type().is_some()));
|
||||||
let coerce_to_i64: Vec<_> = columns
|
let coerce_to_i64: Vec<_> = columns
|
||||||
.iter()
|
.iter()
|
||||||
.map(|column| column.clone().coerce_to_i64())
|
.filter_map(|column| column.clone().coerce_to_i64())
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
if coerce_to_i64.iter().all(|column| column.is_some()) {
|
if coerce_to_i64.len() == columns.len() {
|
||||||
return coerce_to_i64
|
return coerce_to_i64;
|
||||||
.into_iter()
|
|
||||||
.map(|column| column.unwrap())
|
|
||||||
.collect();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let coerce_to_u64: Vec<_> = columns
|
let coerce_to_u64: Vec<_> = columns
|
||||||
.iter()
|
.iter()
|
||||||
.map(|column| column.clone().coerce_to_u64())
|
.filter_map(|column| column.clone().coerce_to_u64())
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
if coerce_to_u64.iter().all(|column| column.is_some()) {
|
if coerce_to_u64.len() == columns.len() {
|
||||||
return coerce_to_u64
|
return coerce_to_u64;
|
||||||
.into_iter()
|
|
||||||
.map(|column| column.unwrap())
|
|
||||||
.collect();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
columns
|
columns
|
||||||
@@ -151,7 +221,9 @@ mod tests {
|
|||||||
ColumnarReader::open(buffer).unwrap()
|
ColumnarReader::open(buffer).unwrap()
|
||||||
};
|
};
|
||||||
|
|
||||||
let column_map = collect_columns(&[&columnar1, &columnar2, &columnar3]).unwrap();
|
let column_map =
|
||||||
|
group_columns_for_merge(&[columnar1.clone(), columnar2.clone(), columnar3.clone()])
|
||||||
|
.unwrap();
|
||||||
assert_eq!(column_map.len(), 1);
|
assert_eq!(column_map.len(), 1);
|
||||||
let cat_to_columns = column_map.get("numbers").unwrap();
|
let cat_to_columns = column_map.get("numbers").unwrap();
|
||||||
assert_eq!(cat_to_columns.len(), 1);
|
assert_eq!(cat_to_columns.len(), 1);
|
||||||
@@ -159,14 +231,14 @@ mod tests {
|
|||||||
let numerical = cat_to_columns.get(&ColumnTypeCategory::Numerical).unwrap();
|
let numerical = cat_to_columns.get(&ColumnTypeCategory::Numerical).unwrap();
|
||||||
assert!(numerical.iter().all(|column| column.is_f64()));
|
assert!(numerical.iter().all(|column| column.is_f64()));
|
||||||
|
|
||||||
let column_map = collect_columns(&[&columnar1, &columnar1]).unwrap();
|
let column_map = group_columns_for_merge(&[columnar1.clone(), columnar1.clone()]).unwrap();
|
||||||
assert_eq!(column_map.len(), 1);
|
assert_eq!(column_map.len(), 1);
|
||||||
let cat_to_columns = column_map.get("numbers").unwrap();
|
let cat_to_columns = column_map.get("numbers").unwrap();
|
||||||
assert_eq!(cat_to_columns.len(), 1);
|
assert_eq!(cat_to_columns.len(), 1);
|
||||||
let numerical = cat_to_columns.get(&ColumnTypeCategory::Numerical).unwrap();
|
let numerical = cat_to_columns.get(&ColumnTypeCategory::Numerical).unwrap();
|
||||||
assert!(numerical.iter().all(|column| column.is_i64()));
|
assert!(numerical.iter().all(|column| column.is_i64()));
|
||||||
|
|
||||||
let column_map = collect_columns(&[&columnar2, &columnar2]).unwrap();
|
let column_map = group_columns_for_merge(&[columnar2.clone(), columnar2.clone()]).unwrap();
|
||||||
assert_eq!(column_map.len(), 1);
|
assert_eq!(column_map.len(), 1);
|
||||||
let cat_to_columns = column_map.get("numbers").unwrap();
|
let cat_to_columns = column_map.get("numbers").unwrap();
|
||||||
assert_eq!(cat_to_columns.len(), 1);
|
assert_eq!(cat_to_columns.len(), 1);
|
||||||
|
|||||||
1
columnar/src/columnar/merge_index.rs
Normal file
1
columnar/src/columnar/merge_index.rs
Normal file
@@ -0,0 +1 @@
|
|||||||
|
|
||||||
@@ -1,6 +1,7 @@
|
|||||||
mod column_type;
|
mod column_type;
|
||||||
mod format_version;
|
mod format_version;
|
||||||
mod merge;
|
mod merge;
|
||||||
|
mod merge_index;
|
||||||
mod reader;
|
mod reader;
|
||||||
mod writer;
|
mod writer;
|
||||||
|
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ fn io_invalid_data(msg: String) -> io::Error {
|
|||||||
|
|
||||||
/// The ColumnarReader makes it possible to access a set of columns
|
/// The ColumnarReader makes it possible to access a set of columns
|
||||||
/// associated to field names.
|
/// associated to field names.
|
||||||
|
#[derive(Clone)]
|
||||||
pub struct ColumnarReader {
|
pub struct ColumnarReader {
|
||||||
column_dictionary: Dictionary<RangeSSTable>,
|
column_dictionary: Dictionary<RangeSSTable>,
|
||||||
column_data: FileSlice,
|
column_data: FileSlice,
|
||||||
|
|||||||
@@ -184,10 +184,12 @@ impl CompatibleNumericalTypes {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl NumericalColumnWriter {
|
impl NumericalColumnWriter {
|
||||||
pub fn column_type_and_cardinality(&self, num_docs: RowId) -> (NumericalType, Cardinality) {
|
pub fn numerical_type(&self) -> NumericalType {
|
||||||
let numerical_type = self.compatible_numerical_types.to_numerical_type();
|
self.compatible_numerical_types.to_numerical_type()
|
||||||
let cardinality = self.column_writer.get_cardinality(num_docs);
|
}
|
||||||
(numerical_type, cardinality)
|
|
||||||
|
pub fn cardinality(&self, num_docs: RowId) -> Cardinality {
|
||||||
|
self.column_writer.get_cardinality(num_docs)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn record_numerical_value(
|
pub fn record_numerical_value(
|
||||||
|
|||||||
@@ -8,14 +8,14 @@ use std::net::Ipv6Addr;
|
|||||||
|
|
||||||
use column_operation::ColumnOperation;
|
use column_operation::ColumnOperation;
|
||||||
use common::CountingWriter;
|
use common::CountingWriter;
|
||||||
use serializer::ColumnarSerializer;
|
pub(crate) use serializer::ColumnarSerializer;
|
||||||
use stacker::{Addr, ArenaHashMap, MemoryArena};
|
use stacker::{Addr, ArenaHashMap, MemoryArena};
|
||||||
|
|
||||||
use crate::column_index::SerializableColumnIndex;
|
use crate::column_index::SerializableColumnIndex;
|
||||||
use crate::column_values::{
|
use crate::column_values::{
|
||||||
ColumnValues, MonotonicallyMappableToU128, MonotonicallyMappableToU64, VecColumn,
|
ColumnValues, MonotonicallyMappableToU128, MonotonicallyMappableToU64, VecColumn,
|
||||||
};
|
};
|
||||||
use crate::columnar::column_type::{ColumnType, ColumnTypeCategory};
|
use crate::columnar::column_type::ColumnType;
|
||||||
use crate::columnar::writer::column_writers::{
|
use crate::columnar::writer::column_writers::{
|
||||||
ColumnWriter, NumericalColumnWriter, StrOrBytesColumnWriter,
|
ColumnWriter, NumericalColumnWriter, StrOrBytesColumnWriter,
|
||||||
};
|
};
|
||||||
@@ -276,35 +276,40 @@ impl ColumnarWriter {
|
|||||||
}
|
}
|
||||||
pub fn serialize(&mut self, num_docs: RowId, wrt: &mut dyn io::Write) -> io::Result<()> {
|
pub fn serialize(&mut self, num_docs: RowId, wrt: &mut dyn io::Write) -> io::Result<()> {
|
||||||
let mut serializer = ColumnarSerializer::new(wrt);
|
let mut serializer = ColumnarSerializer::new(wrt);
|
||||||
let mut columns: Vec<(&[u8], ColumnTypeCategory, Addr)> = self
|
let mut columns: Vec<(&[u8], ColumnType, Addr)> = self
|
||||||
.numerical_field_hash_map
|
.numerical_field_hash_map
|
||||||
.iter()
|
.iter()
|
||||||
.map(|(column_name, addr, _)| (column_name, ColumnTypeCategory::Numerical, addr))
|
.map(|(column_name, addr, _)| {
|
||||||
|
let numerical_column_writer: NumericalColumnWriter =
|
||||||
|
self.numerical_field_hash_map.read(addr);
|
||||||
|
let column_type = numerical_column_writer.numerical_type().into();
|
||||||
|
(column_name, column_type, addr)
|
||||||
|
})
|
||||||
.collect();
|
.collect();
|
||||||
columns.extend(
|
columns.extend(
|
||||||
self.bytes_field_hash_map
|
self.bytes_field_hash_map
|
||||||
.iter()
|
.iter()
|
||||||
.map(|(term, addr, _)| (term, ColumnTypeCategory::Bytes, addr)),
|
.map(|(term, addr, _)| (term, ColumnType::Bytes, addr)),
|
||||||
);
|
);
|
||||||
columns.extend(
|
columns.extend(
|
||||||
self.str_field_hash_map
|
self.str_field_hash_map
|
||||||
.iter()
|
.iter()
|
||||||
.map(|(column_name, addr, _)| (column_name, ColumnTypeCategory::Str, addr)),
|
.map(|(column_name, addr, _)| (column_name, ColumnType::Str, addr)),
|
||||||
);
|
);
|
||||||
columns.extend(
|
columns.extend(
|
||||||
self.bool_field_hash_map
|
self.bool_field_hash_map
|
||||||
.iter()
|
.iter()
|
||||||
.map(|(column_name, addr, _)| (column_name, ColumnTypeCategory::Bool, addr)),
|
.map(|(column_name, addr, _)| (column_name, ColumnType::Bool, addr)),
|
||||||
);
|
);
|
||||||
columns.extend(
|
columns.extend(
|
||||||
self.ip_addr_field_hash_map
|
self.ip_addr_field_hash_map
|
||||||
.iter()
|
.iter()
|
||||||
.map(|(column_name, addr, _)| (column_name, ColumnTypeCategory::IpAddr, addr)),
|
.map(|(column_name, addr, _)| (column_name, ColumnType::IpAddr, addr)),
|
||||||
);
|
);
|
||||||
columns.extend(
|
columns.extend(
|
||||||
self.datetime_field_hash_map
|
self.datetime_field_hash_map
|
||||||
.iter()
|
.iter()
|
||||||
.map(|(column_name, addr, _)| (column_name, ColumnTypeCategory::DateTime, addr)),
|
.map(|(column_name, addr, _)| (column_name, ColumnType::DateTime, addr)),
|
||||||
);
|
);
|
||||||
columns.sort_unstable_by_key(|(column_name, col_type, _)| (*column_name, *col_type));
|
columns.sort_unstable_by_key(|(column_name, col_type, _)| (*column_name, *col_type));
|
||||||
|
|
||||||
@@ -312,8 +317,12 @@ impl ColumnarWriter {
|
|||||||
let mut symbol_byte_buffer: Vec<u8> = Vec::new();
|
let mut symbol_byte_buffer: Vec<u8> = Vec::new();
|
||||||
for (column_name, column_type, addr) in columns {
|
for (column_name, column_type, addr) in columns {
|
||||||
match column_type {
|
match column_type {
|
||||||
ColumnTypeCategory::Bool => {
|
ColumnType::Bool | ColumnType::DateTime => {
|
||||||
let column_writer: ColumnWriter = self.bool_field_hash_map.read(addr);
|
let column_writer: ColumnWriter = if column_type == ColumnType::Bool {
|
||||||
|
self.bool_field_hash_map.read(addr)
|
||||||
|
} else {
|
||||||
|
self.datetime_field_hash_map.read(addr)
|
||||||
|
};
|
||||||
let cardinality = column_writer.get_cardinality(num_docs);
|
let cardinality = column_writer.get_cardinality(num_docs);
|
||||||
let mut column_serializer =
|
let mut column_serializer =
|
||||||
serializer.serialize_column(column_name, ColumnType::Bool);
|
serializer.serialize_column(column_name, ColumnType::Bool);
|
||||||
@@ -325,7 +334,7 @@ impl ColumnarWriter {
|
|||||||
&mut column_serializer,
|
&mut column_serializer,
|
||||||
)?;
|
)?;
|
||||||
}
|
}
|
||||||
ColumnTypeCategory::IpAddr => {
|
ColumnType::IpAddr => {
|
||||||
let column_writer: ColumnWriter = self.ip_addr_field_hash_map.read(addr);
|
let column_writer: ColumnWriter = self.ip_addr_field_hash_map.read(addr);
|
||||||
let cardinality = column_writer.get_cardinality(num_docs);
|
let cardinality = column_writer.get_cardinality(num_docs);
|
||||||
let mut column_serializer =
|
let mut column_serializer =
|
||||||
@@ -338,32 +347,35 @@ impl ColumnarWriter {
|
|||||||
&mut column_serializer,
|
&mut column_serializer,
|
||||||
)?;
|
)?;
|
||||||
}
|
}
|
||||||
ColumnTypeCategory::Bytes | ColumnTypeCategory::Str => {
|
ColumnType::Bytes | ColumnType::Str => {
|
||||||
let (column_type, str_column_writer): (ColumnType, StrOrBytesColumnWriter) =
|
let str_or_bytes_column_writer: StrOrBytesColumnWriter =
|
||||||
if column_type == ColumnTypeCategory::Bytes {
|
if column_type == ColumnType::Bytes {
|
||||||
(ColumnType::Bytes, self.bytes_field_hash_map.read(addr))
|
self.bytes_field_hash_map.read(addr)
|
||||||
} else {
|
} else {
|
||||||
(ColumnType::Str, self.str_field_hash_map.read(addr))
|
self.str_field_hash_map.read(addr)
|
||||||
};
|
};
|
||||||
let dictionary_builder =
|
let dictionary_builder =
|
||||||
&dictionaries[str_column_writer.dictionary_id as usize];
|
&dictionaries[str_or_bytes_column_writer.dictionary_id as usize];
|
||||||
let cardinality = str_column_writer.column_writer.get_cardinality(num_docs);
|
let cardinality = str_or_bytes_column_writer
|
||||||
|
.column_writer
|
||||||
|
.get_cardinality(num_docs);
|
||||||
let mut column_serializer =
|
let mut column_serializer =
|
||||||
serializer.serialize_column(column_name, column_type);
|
serializer.serialize_column(column_name, column_type);
|
||||||
serialize_bytes_or_str_column(
|
serialize_bytes_or_str_column(
|
||||||
cardinality,
|
cardinality,
|
||||||
num_docs,
|
num_docs,
|
||||||
dictionary_builder,
|
dictionary_builder,
|
||||||
str_column_writer.operation_iterator(arena, &mut symbol_byte_buffer),
|
str_or_bytes_column_writer
|
||||||
|
.operation_iterator(arena, &mut symbol_byte_buffer),
|
||||||
buffers,
|
buffers,
|
||||||
&mut column_serializer,
|
&mut column_serializer,
|
||||||
)?;
|
)?;
|
||||||
}
|
}
|
||||||
ColumnTypeCategory::Numerical => {
|
ColumnType::I64 | ColumnType::F64 | ColumnType::U64 => {
|
||||||
let numerical_column_writer: NumericalColumnWriter =
|
let numerical_column_writer: NumericalColumnWriter =
|
||||||
self.numerical_field_hash_map.read(addr);
|
self.numerical_field_hash_map.read(addr);
|
||||||
let (numerical_type, cardinality) =
|
let numerical_type = column_type.numerical_type().unwrap();
|
||||||
numerical_column_writer.column_type_and_cardinality(num_docs);
|
let cardinality = numerical_column_writer.cardinality(num_docs);
|
||||||
let mut column_serializer =
|
let mut column_serializer =
|
||||||
serializer.serialize_column(column_name, ColumnType::from(numerical_type));
|
serializer.serialize_column(column_name, ColumnType::from(numerical_type));
|
||||||
serialize_numerical_column(
|
serialize_numerical_column(
|
||||||
@@ -375,20 +387,6 @@ impl ColumnarWriter {
|
|||||||
&mut column_serializer,
|
&mut column_serializer,
|
||||||
)?;
|
)?;
|
||||||
}
|
}
|
||||||
ColumnTypeCategory::DateTime => {
|
|
||||||
let column_writer: ColumnWriter = self.datetime_field_hash_map.read(addr);
|
|
||||||
let cardinality = column_writer.get_cardinality(num_docs);
|
|
||||||
let mut column_serializer =
|
|
||||||
serializer.serialize_column(column_name, ColumnType::DateTime);
|
|
||||||
serialize_numerical_column(
|
|
||||||
cardinality,
|
|
||||||
num_docs,
|
|
||||||
NumericalType::I64,
|
|
||||||
column_writer.operation_iterator(arena, &mut symbol_byte_buffer),
|
|
||||||
buffers,
|
|
||||||
&mut column_serializer,
|
|
||||||
)?;
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
serializer.finalize()?;
|
serializer.finalize()?;
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ use common::{HasLen, OwnedBytes};
|
|||||||
use crate::column::{BytesColumn, Column, StrColumn};
|
use crate::column::{BytesColumn, Column, StrColumn};
|
||||||
use crate::column_values::{monotonic_map_column, StrictlyMonotonicFn};
|
use crate::column_values::{monotonic_map_column, StrictlyMonotonicFn};
|
||||||
use crate::columnar::ColumnType;
|
use crate::columnar::ColumnType;
|
||||||
use crate::{DateTime, NumericalType};
|
use crate::{Cardinality, DateTime, NumericalType};
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub enum DynamicColumn {
|
pub enum DynamicColumn {
|
||||||
@@ -23,6 +23,18 @@ pub enum DynamicColumn {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl DynamicColumn {
|
impl DynamicColumn {
|
||||||
|
pub fn get_cardinality(&self) -> Cardinality {
|
||||||
|
match self {
|
||||||
|
DynamicColumn::Bool(c) => c.get_cardinality(),
|
||||||
|
DynamicColumn::I64(c) => c.get_cardinality(),
|
||||||
|
DynamicColumn::U64(c) => c.get_cardinality(),
|
||||||
|
DynamicColumn::F64(c) => c.get_cardinality(),
|
||||||
|
DynamicColumn::IpAddr(c) => c.get_cardinality(),
|
||||||
|
DynamicColumn::DateTime(c) => c.get_cardinality(),
|
||||||
|
DynamicColumn::Bytes(c) => c.ords().get_cardinality(),
|
||||||
|
DynamicColumn::Str(c) => c.ords().get_cardinality(),
|
||||||
|
}
|
||||||
|
}
|
||||||
pub fn column_type(&self) -> ColumnType {
|
pub fn column_type(&self) -> ColumnType {
|
||||||
match self {
|
match self {
|
||||||
DynamicColumn::Bool(_) => ColumnType::Bool,
|
DynamicColumn::Bool(_) => ColumnType::Bool,
|
||||||
|
|||||||
@@ -62,6 +62,12 @@ pub enum Cardinality {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl Cardinality {
|
impl Cardinality {
|
||||||
|
pub fn is_optional(&self) -> bool {
|
||||||
|
matches!(self, Cardinality::Optional)
|
||||||
|
}
|
||||||
|
pub fn is_multivalue(&self) -> bool {
|
||||||
|
matches!(self, Cardinality::Multivalued)
|
||||||
|
}
|
||||||
pub(crate) fn to_code(self) -> u8 {
|
pub(crate) fn to_code(self) -> u8 {
|
||||||
self as u8
|
self as u8
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -30,6 +30,7 @@ use crate::{BlockAddr, DeltaReader, Reader, SSTable, SSTableIndex, TermOrdinal};
|
|||||||
/// block boundary.
|
/// block boundary.
|
||||||
///
|
///
|
||||||
/// (See also README.md)
|
/// (See also README.md)
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
pub struct Dictionary<TSSTable: SSTable> {
|
pub struct Dictionary<TSSTable: SSTable> {
|
||||||
pub sstable_slice: FileSlice,
|
pub sstable_slice: FileSlice,
|
||||||
pub sstable_index: SSTableIndex,
|
pub sstable_index: SSTableIndex,
|
||||||
|
|||||||
@@ -117,6 +117,7 @@ impl SSTable for MonotonicU64SSTable {
|
|||||||
/// `range_sstable[k1].end == range_sstable[k2].start`.
|
/// `range_sstable[k1].end == range_sstable[k2].start`.
|
||||||
///
|
///
|
||||||
/// The first range is not required to start at `0`.
|
/// The first range is not required to start at `0`.
|
||||||
|
#[derive(Clone, Copy, Debug)]
|
||||||
pub struct RangeSSTable;
|
pub struct RangeSSTable;
|
||||||
|
|
||||||
impl SSTable for RangeSSTable {
|
impl SSTable for RangeSSTable {
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize};
|
|||||||
|
|
||||||
use crate::{common_prefix_len, SSTableDataCorruption, TermOrdinal};
|
use crate::{common_prefix_len, SSTableDataCorruption, TermOrdinal};
|
||||||
|
|
||||||
#[derive(Default, Debug, Serialize, Deserialize)]
|
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
|
||||||
pub struct SSTableIndex {
|
pub struct SSTableIndex {
|
||||||
blocks: Vec<BlockMeta>,
|
blocks: Vec<BlockMeta>,
|
||||||
}
|
}
|
||||||
@@ -75,7 +75,7 @@ pub struct BlockAddr {
|
|||||||
pub first_ordinal: u64,
|
pub first_ordinal: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
pub(crate) struct BlockMeta {
|
pub(crate) struct BlockMeta {
|
||||||
/// Any byte string that is lexicographically greater or equal to
|
/// Any byte string that is lexicographically greater or equal to
|
||||||
/// the last key in the block,
|
/// the last key in the block,
|
||||||
|
|||||||
Reference in New Issue
Block a user