Compare commits

..

1 Commits

Author SHA1 Message Date
Paul Masurel
ab703486aa Updated columnar todo 2023-03-21 18:55:23 +09:00
45 changed files with 436 additions and 1542 deletions

View File

@@ -32,7 +32,7 @@ log = "0.4.16"
serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.79"
num_cpus = "1.13.1"
fs4 = { version = "0.6.3", optional = true }
fs2 = { version = "0.4.3", optional = true }
levenshtein_automata = "0.2.1"
uuid = { version = "1.0.0", features = ["v4", "serde"] }
crossbeam-channel = "0.5.4"
@@ -94,7 +94,7 @@ overflow-checks = true
[features]
default = ["mmap", "stopwords", "lz4-compression"]
mmap = ["fs4", "tempfile", "memmap2"]
mmap = ["fs2", "tempfile", "memmap2"]
stopwords = []
brotli-compression = ["brotli"]

View File

@@ -23,7 +23,6 @@ serde = "1.0.152"
proptest = "1"
more-asserts = "0.3.1"
rand = "0.8.5"
serde_json = "1"
[features]
unstable = []

View File

@@ -1,28 +1,22 @@
# zero to one
* revisit line codec
* add columns from schema on merge
* Plugging JSON
* replug examples
* move datetime to quickwit common
* switch to nanos
* reintroduce the gcd map.
# Perf and Size
* remove alloc in `ord_to_term`
+ multivaued range queries restrat frm the beginning all of the time.
* re-add ZSTD compression for dictionaries
no systematic monotonic mapping
consider removing multilinear
f32?
adhoc solution for bool?
add metrics helper for aggregate. sum(row_id)
review inline absence/presence
improv perf of select using PDEP
compare with roaring bitmap/elias fano etc etc.
SIMD range? (see blog post)
Add alignment?
Consider another codec to bridge the gap between few and 5k elements
* no systematic monotonic mapping
* consider removing multilinear
* f32?
* adhoc solution for bool?
* add metrics helper for aggregate. sum(row_id)
* review inline absence/presence
* improv perf of select using PDEP
* compare with roaring bitmap/elias fano etc etc.
* SIMD range? (see blog post)
* Add alignment?
* Consider another codec to bridge the gap between few and 5k elements
# Cleanup and rationalization
in benchmark, unify percent vs ratio, f32 vs f64.
@@ -30,15 +24,10 @@ investigate if should have better errors? io::Error is overused at the moment.
rename rank/select in unit tests
Review the public API via cargo doc
go through TODOs
remove all doc_id occurences -> row_id
use the rank & select naming in unit tests branch.
multi-linear -> blockwise
linear codec -> simply a multiplication for the index column
rename columnar to something more explicit, like column_dictionary or columnar_table
rename fastfield -> column
document changes
rationalization FastFieldValue, HasColumnType
isolate u128_based and uniform naming
# Other
fix enhance column-cli

View File

@@ -1,36 +0,0 @@
use crate::{Column, DocId, RowId};
#[derive(Debug, Default, Clone)]
pub struct ColumnBlockAccessor<T> {
val_cache: Vec<T>,
docid_cache: Vec<DocId>,
row_id_cache: Vec<RowId>,
}
impl<T: PartialOrd + Copy + std::fmt::Debug + Send + Sync + 'static + Default>
ColumnBlockAccessor<T>
{
#[inline]
pub fn fetch_block(&mut self, docs: &[u32], accessor: &Column<T>) {
self.docid_cache.clear();
self.row_id_cache.clear();
accessor.row_ids_for_docs(docs, &mut self.docid_cache, &mut self.row_id_cache);
self.val_cache.resize(self.row_id_cache.len(), T::default());
accessor
.values
.get_vals(&self.row_id_cache, &mut self.val_cache);
}
#[inline]
pub fn iter_vals(&self) -> impl Iterator<Item = T> + '_ {
self.val_cache.iter().cloned()
}
#[inline]
pub fn iter_docid_vals(&self) -> impl Iterator<Item = (DocId, T)> + '_ {
self.docid_cache
.iter()
.cloned()
.zip(self.val_cache.iter().cloned())
}
}

View File

@@ -1,6 +1,6 @@
use std::io;
use std::ops::Deref;
use std::sync::Arc;
use std::{fmt, io};
use sstable::{Dictionary, VoidSSTable};
@@ -21,14 +21,6 @@ pub struct BytesColumn {
pub(crate) term_ord_column: Column<u64>,
}
impl fmt::Debug for BytesColumn {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("BytesColumn")
.field("term_ord_column", &self.term_ord_column)
.finish()
}
}
impl BytesColumn {
/// Fills the given `output` buffer with the term associated to the ordinal `ord`.
///
@@ -64,12 +56,6 @@ impl BytesColumn {
#[derive(Clone)]
pub struct StrColumn(BytesColumn);
impl fmt::Debug for StrColumn {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{:?}", self.term_ord_column)
}
}
impl From<StrColumn> for BytesColumn {
fn from(str_column: StrColumn) -> BytesColumn {
str_column.0

View File

@@ -1,7 +1,6 @@
mod dictionary_encoded;
mod serialize;
use core::fmt;
use std::fmt::Debug;
use std::io::Write;
use std::ops::{Deref, Range, RangeInclusive};
@@ -17,33 +16,14 @@ pub use serialize::{
use crate::column_index::ColumnIndex;
use crate::column_values::monotonic_mapping::StrictlyMonotonicMappingToInternal;
use crate::column_values::{monotonic_map_column, ColumnValues};
use crate::{Cardinality, DocId, EmptyColumnValues, MonotonicallyMappableToU64, RowId};
use crate::{Cardinality, MonotonicallyMappableToU64, RowId};
#[derive(Clone)]
pub struct Column<T = u64> {
pub index: ColumnIndex,
pub idx: ColumnIndex,
pub values: Arc<dyn ColumnValues<T>>,
}
impl<T: fmt::Debug + PartialOrd + Send + Sync + Copy + 'static> fmt::Debug for Column<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let num_docs = self.num_docs();
let entries = (0..num_docs)
.map(|i| (i, self.values_for_doc(i).collect::<Vec<_>>()))
.filter(|(_, vals)| !vals.is_empty());
f.debug_map().entries(entries).finish()
}
}
impl<T: PartialOrd + Default> Column<T> {
pub fn build_empty_column(num_docs: u32) -> Column<T> {
Column {
index: ColumnIndex::Empty { num_docs },
values: Arc::new(EmptyColumnValues),
}
}
}
impl<T: MonotonicallyMappableToU64> Column<T> {
pub fn to_u64_monotonic(self) -> Column<u64> {
let values = Arc::new(monotonic_map_column(
@@ -51,7 +31,7 @@ impl<T: MonotonicallyMappableToU64> Column<T> {
StrictlyMonotonicMappingToInternal::<T>::new(),
));
Column {
index: self.index,
idx: self.idx,
values,
}
}
@@ -60,11 +40,11 @@ impl<T: MonotonicallyMappableToU64> Column<T> {
impl<T: PartialOrd + Copy + Debug + Send + Sync + 'static> Column<T> {
#[inline]
pub fn get_cardinality(&self) -> Cardinality {
self.index.get_cardinality()
self.idx.get_cardinality()
}
pub fn num_docs(&self) -> RowId {
match &self.index {
match &self.idx {
ColumnIndex::Empty { num_docs } => *num_docs,
ColumnIndex::Full => self.values.num_vals(),
ColumnIndex::Optional(optional_index) => optional_index.num_docs(),
@@ -88,25 +68,8 @@ impl<T: PartialOrd + Copy + Debug + Send + Sync + 'static> Column<T> {
self.values_for_doc(row_id).next()
}
/// Translates a block of docis to row_ids.
///
/// returns the row_ids and the matching docids on the same index
/// e.g.
/// DocId In: [0, 5, 6]
/// DocId Out: [0, 0, 6, 6]
/// RowId Out: [0, 1, 2, 3]
#[inline]
pub fn row_ids_for_docs(
&self,
doc_ids: &[DocId],
doc_ids_out: &mut Vec<DocId>,
row_ids: &mut Vec<RowId>,
) {
self.index.docids_to_rowids(doc_ids, doc_ids_out, row_ids)
}
pub fn values_for_doc(&self, doc_id: DocId) -> impl Iterator<Item = T> + '_ {
self.value_row_ids(doc_id)
pub fn values_for_doc(&self, row_id: RowId) -> impl Iterator<Item = T> + '_ {
self.value_row_ids(row_id)
.map(|value_row_id: RowId| self.values.get_val(value_row_id))
}
@@ -119,15 +82,13 @@ impl<T: PartialOrd + Copy + Debug + Send + Sync + 'static> Column<T> {
doc_ids: &mut Vec<u32>,
) {
// convert passed docid range to row id range
let rowid_range = self
.index
.docid_range_to_rowids(selected_docid_range.clone());
let rowid_range = self.idx.docid_range_to_rowids(selected_docid_range.clone());
// Load rows
self.values
.get_row_ids_for_value_range(value_range, rowid_range, doc_ids);
// Convert rows to docids
self.index
self.idx
.select_batch_in_place(selected_docid_range.start, doc_ids);
}
@@ -152,7 +113,7 @@ impl<T> Deref for Column<T> {
type Target = ColumnIndex;
fn deref(&self) -> &Self::Target {
&self.index
&self.idx
}
}
@@ -190,7 +151,7 @@ impl<T: PartialOrd + Debug + Send + Sync + Copy + 'static> ColumnValues<T>
}
fn num_vals(&self) -> u32 {
match &self.column.index {
match &self.column.idx {
ColumnIndex::Empty { .. } => 0u32,
ColumnIndex::Full => self.column.values.num_vals(),
ColumnIndex::Optional(optional_idx) => optional_idx.num_docs(),

View File

@@ -52,7 +52,7 @@ pub fn open_column_u64<T: MonotonicallyMappableToU64>(bytes: OwnedBytes) -> io::
let column_index = crate::column_index::open_column_index(column_index_data)?;
let column_values = load_u64_based_column_values(column_values_data)?;
Ok(Column {
index: column_index,
idx: column_index,
values: column_values,
})
}
@@ -71,7 +71,7 @@ pub fn open_column_u128<T: MonotonicallyMappableToU128>(
let column_index = crate::column_index::open_column_index(column_index_data)?;
let column_values = crate::column_values::open_u128_mapped(column_values_data)?;
Ok(Column {
index: column_index,
idx: column_index,
values: column_values,
})
}

View File

@@ -8,16 +8,17 @@ use crate::column_index::SerializableColumnIndex;
use crate::{Cardinality, ColumnIndex, MergeRowOrder};
// For simplification, we never have cardinality go down due to deletes.
fn detect_cardinality(columns: &[ColumnIndex]) -> Cardinality {
fn detect_cardinality(columns: &[Option<ColumnIndex>]) -> Cardinality {
columns
.iter()
.flatten()
.map(ColumnIndex::get_cardinality)
.max()
.unwrap_or(Cardinality::Full)
}
pub fn merge_column_index<'a>(
columns: &'a [ColumnIndex],
columns: &'a [Option<ColumnIndex>],
merge_row_order: &'a MergeRowOrder,
) -> SerializableColumnIndex<'a> {
// For simplification, we do not try to detect whether the cardinality could be
@@ -52,33 +53,34 @@ mod tests {
let optional_index: ColumnIndex = OptionalIndex::for_test(1, &[]).into();
let multivalued_index: ColumnIndex = MultiValueIndex::for_test(&[0, 1]).into();
assert_eq!(
detect_cardinality(&[optional_index.clone(), ColumnIndex::Empty { num_docs: 0 }]),
detect_cardinality(&[Some(optional_index.clone()), None]),
Cardinality::Optional
);
assert_eq!(
detect_cardinality(&[optional_index.clone(), ColumnIndex::Full]),
detect_cardinality(&[Some(optional_index.clone()), Some(ColumnIndex::Full)]),
Cardinality::Optional
);
assert_eq!(
detect_cardinality(&[Some(multivalued_index.clone()), None]),
Cardinality::Multivalued
);
assert_eq!(
detect_cardinality(&[
multivalued_index.clone(),
ColumnIndex::Empty { num_docs: 0 }
Some(multivalued_index.clone()),
Some(optional_index.clone())
]),
Cardinality::Multivalued
);
assert_eq!(
detect_cardinality(&[multivalued_index.clone(), optional_index.clone()]),
Cardinality::Multivalued
);
assert_eq!(
detect_cardinality(&[optional_index, multivalued_index]),
detect_cardinality(&[Some(optional_index), Some(multivalued_index)]),
Cardinality::Multivalued
);
}
#[test]
fn test_merge_index_multivalued_sorted() {
let column_indexes: Vec<ColumnIndex> = vec![MultiValueIndex::for_test(&[0, 2, 5]).into()];
let column_indexes: Vec<Option<ColumnIndex>> =
vec![Some(MultiValueIndex::for_test(&[0, 2, 5]).into())];
let merge_row_order: MergeRowOrder = ShuffleMergeOrder::for_test(
&[2],
vec![
@@ -102,10 +104,10 @@ mod tests {
#[test]
fn test_merge_index_multivalued_sorted_several_segment() {
let column_indexes: Vec<ColumnIndex> = vec![
MultiValueIndex::for_test(&[0, 2, 5]).into(),
ColumnIndex::Empty { num_docs: 0 },
MultiValueIndex::for_test(&[0, 1, 4]).into(),
let column_indexes: Vec<Option<ColumnIndex>> = vec![
Some(MultiValueIndex::for_test(&[0, 2, 5]).into()),
None,
Some(MultiValueIndex::for_test(&[0, 1, 4]).into()),
];
let merge_row_order: MergeRowOrder = ShuffleMergeOrder::for_test(
&[2, 0, 2],

View File

@@ -5,7 +5,7 @@ use crate::iterable::Iterable;
use crate::{Cardinality, ColumnIndex, RowId, ShuffleMergeOrder};
pub fn merge_column_index_shuffled<'a>(
column_indexes: &'a [ColumnIndex],
column_indexes: &'a [Option<ColumnIndex>],
cardinality_after_merge: Cardinality,
shuffle_merge_order: &'a ShuffleMergeOrder,
) -> SerializableColumnIndex<'a> {
@@ -33,41 +33,41 @@ pub fn merge_column_index_shuffled<'a>(
///
/// In other words the column_indexes passed as argument may NOT be multivalued.
fn merge_column_index_shuffled_optional<'a>(
column_indexes: &'a [ColumnIndex],
column_indexes: &'a [Option<ColumnIndex>],
merge_order: &'a ShuffleMergeOrder,
) -> Box<dyn Iterable<RowId> + 'a> {
Box::new(ShuffledIndex {
Box::new(ShuffledOptionalIndex {
column_indexes,
merge_order,
})
}
struct ShuffledIndex<'a> {
column_indexes: &'a [ColumnIndex],
struct ShuffledOptionalIndex<'a> {
column_indexes: &'a [Option<ColumnIndex>],
merge_order: &'a ShuffleMergeOrder,
}
impl<'a> Iterable<u32> for ShuffledIndex<'a> {
impl<'a> Iterable<u32> for ShuffledOptionalIndex<'a> {
fn boxed_iter(&self) -> Box<dyn Iterator<Item = u32> + '_> {
Box::new(
self.merge_order
.iter_new_to_old_row_addrs()
.enumerate()
.filter_map(|(new_row_id, old_row_addr)| {
let column_index = &self.column_indexes[old_row_addr.segment_ord as usize];
let row_id = new_row_id as u32;
if column_index.has_value(old_row_addr.row_id) {
Some(row_id)
} else {
None
}
}),
)
Box::new(self.merge_order
.iter_new_to_old_row_addrs()
.enumerate()
.filter_map(|(new_row_id, old_row_addr)| {
let Some(column_index) = &self.column_indexes[old_row_addr.segment_ord as usize] else {
return None;
};
let row_id = new_row_id as u32;
if column_index.has_value(old_row_addr.row_id) {
Some(row_id)
} else {
None
}
}))
}
}
fn merge_column_index_shuffled_multivalued<'a>(
column_indexes: &'a [ColumnIndex],
column_indexes: &'a [Option<ColumnIndex>],
merge_order: &'a ShuffleMergeOrder,
) -> Box<dyn Iterable<RowId> + 'a> {
Box::new(ShuffledMultivaluedIndex {
@@ -77,16 +77,19 @@ fn merge_column_index_shuffled_multivalued<'a>(
}
struct ShuffledMultivaluedIndex<'a> {
column_indexes: &'a [ColumnIndex],
column_indexes: &'a [Option<ColumnIndex>],
merge_order: &'a ShuffleMergeOrder,
}
fn iter_num_values<'a>(
column_indexes: &'a [ColumnIndex],
column_indexes: &'a [Option<ColumnIndex>],
merge_order: &'a ShuffleMergeOrder,
) -> impl Iterator<Item = u32> + 'a {
merge_order.iter_new_to_old_row_addrs().map(|row_addr| {
let column_index = &column_indexes[row_addr.segment_ord as usize];
let Some(column_index) = &column_indexes[row_addr.segment_ord as usize] else {
// No values in the entire column. It surely means there are 0 values associated to this row.
return 0u32;
};
match column_index {
ColumnIndex::Empty { .. } => 0u32,
ColumnIndex::Full => 1,
@@ -140,7 +143,7 @@ mod tests {
#[test]
fn test_merge_column_index_optional_shuffle() {
let optional_index: ColumnIndex = OptionalIndex::for_test(2, &[0]).into();
let column_indexes = vec![optional_index, ColumnIndex::Full];
let column_indexes = vec![Some(optional_index), Some(ColumnIndex::Full)];
let row_addrs = vec![
RowAddr {
segment_ord: 0u32,

View File

@@ -9,7 +9,7 @@ use crate::{Cardinality, ColumnIndex, RowId, StackMergeOrder};
///
/// There are no sort nor deletes involved.
pub fn merge_column_index_stacked<'a>(
columns: &'a [ColumnIndex],
columns: &'a [Option<ColumnIndex>],
cardinality_after_merge: Cardinality,
stack_merge_order: &'a StackMergeOrder,
) -> SerializableColumnIndex<'a> {
@@ -33,7 +33,7 @@ pub fn merge_column_index_stacked<'a>(
}
struct StackedOptionalIndex<'a> {
columns: &'a [ColumnIndex],
columns: &'a [Option<ColumnIndex>],
stack_merge_order: &'a StackMergeOrder,
}
@@ -46,16 +46,16 @@ impl<'a> Iterable<RowId> for StackedOptionalIndex<'a> {
.flat_map(|(columnar_id, column_index_opt)| {
let columnar_row_range = self.stack_merge_order.columnar_range(columnar_id);
let rows_it: Box<dyn Iterator<Item = RowId>> = match column_index_opt {
ColumnIndex::Full => Box::new(columnar_row_range),
ColumnIndex::Optional(optional_index) => Box::new(
Some(ColumnIndex::Full) => Box::new(columnar_row_range),
Some(ColumnIndex::Optional(optional_index)) => Box::new(
optional_index
.iter_rows()
.map(move |row_id: RowId| columnar_row_range.start + row_id),
),
ColumnIndex::Multivalued(_) => {
Some(ColumnIndex::Multivalued(_)) => {
panic!("No multivalued index is allowed when stacking column index");
}
ColumnIndex::Empty { .. } => Box::new(std::iter::empty()),
None | Some(ColumnIndex::Empty { .. }) => Box::new(std::iter::empty()),
};
rows_it
}),
@@ -65,18 +65,20 @@ impl<'a> Iterable<RowId> for StackedOptionalIndex<'a> {
#[derive(Clone, Copy)]
struct StackedMultivaluedIndex<'a> {
columns: &'a [ColumnIndex],
columns: &'a [Option<ColumnIndex>],
stack_merge_order: &'a StackMergeOrder,
}
fn convert_column_opt_to_multivalued_index<'a>(
column_index_opt: &'a ColumnIndex,
column_index_opt: Option<&'a ColumnIndex>,
num_rows: RowId,
) -> Box<dyn Iterator<Item = RowId> + 'a> {
match column_index_opt {
ColumnIndex::Empty { .. } => Box::new(iter::repeat(0u32).take(num_rows as usize + 1)),
ColumnIndex::Full => Box::new(0..num_rows + 1),
ColumnIndex::Optional(optional_index) => {
None | Some(ColumnIndex::Empty { .. }) => {
Box::new(iter::repeat(0u32).take(num_rows as usize + 1))
}
Some(ColumnIndex::Full) => Box::new(0..num_rows + 1),
Some(ColumnIndex::Optional(optional_index)) => {
Box::new(
(0..num_rows)
// TODO optimize
@@ -84,7 +86,9 @@ fn convert_column_opt_to_multivalued_index<'a>(
.chain(std::iter::once(optional_index.num_non_nulls())),
)
}
ColumnIndex::Multivalued(multivalued_index) => multivalued_index.start_index_column.iter(),
Some(ColumnIndex::Multivalued(multivalued_index)) => {
multivalued_index.start_index_column.iter()
}
}
}
@@ -93,6 +97,7 @@ impl<'a> Iterable<RowId> for StackedMultivaluedIndex<'a> {
let multivalued_indexes =
self.columns
.iter()
.map(Option::as_ref)
.enumerate()
.map(|(columnar_id, column_opt)| {
let num_rows =

View File

@@ -12,7 +12,7 @@ pub use serialize::{open_column_index, serialize_column_index, SerializableColum
use crate::column_index::multivalued_index::MultiValueIndex;
use crate::{Cardinality, DocId, RowId};
#[derive(Clone, Debug)]
#[derive(Clone)]
pub enum ColumnIndex {
Empty {
num_docs: u32,
@@ -37,15 +37,11 @@ impl From<MultiValueIndex> for ColumnIndex {
}
impl ColumnIndex {
// Returns the cardinality of the column index.
//
// By convention, if the column contains no docs, we consider that it is
// full.
#[inline]
pub fn get_cardinality(&self) -> Cardinality {
match self {
ColumnIndex::Empty { num_docs: 0 } | ColumnIndex::Full => Cardinality::Full,
ColumnIndex::Empty { .. } => Cardinality::Optional,
ColumnIndex::Full => Cardinality::Full,
ColumnIndex::Optional(_) => Cardinality::Optional,
ColumnIndex::Multivalued(_) => Cardinality::Multivalued,
}
@@ -78,45 +74,6 @@ impl ColumnIndex {
}
}
/// Translates a block of docis to row_ids.
///
/// returns the row_ids and the matching docids on the same index
/// e.g.
/// DocId In: [0, 5, 6]
/// DocId Out: [0, 0, 6, 6]
/// RowId Out: [0, 1, 2, 3]
#[inline]
pub fn docids_to_rowids(
&self,
doc_ids: &[DocId],
doc_ids_out: &mut Vec<DocId>,
row_ids: &mut Vec<RowId>,
) {
match self {
ColumnIndex::Empty { .. } => {}
ColumnIndex::Full => {
doc_ids_out.extend_from_slice(doc_ids);
row_ids.extend_from_slice(doc_ids);
}
ColumnIndex::Optional(optional_index) => {
for doc_id in doc_ids {
if let Some(row_id) = optional_index.rank_if_exists(*doc_id) {
doc_ids_out.push(*doc_id);
row_ids.push(row_id);
}
}
}
ColumnIndex::Multivalued(multivalued_index) => {
for doc_id in doc_ids {
for row_id in multivalued_index.range(*doc_id) {
doc_ids_out.push(*doc_id);
row_ids.push(row_id);
}
}
}
}
}
pub fn docid_range_to_rowids(&self, doc_id: Range<DocId>) -> Range<RowId> {
match self {
ColumnIndex::Empty { .. } => 0..0,
@@ -156,21 +113,3 @@ impl ColumnIndex {
}
}
}
#[cfg(test)]
mod tests {
use crate::{Cardinality, ColumnIndex};
#[test]
fn test_column_index_get_cardinality() {
assert_eq!(
ColumnIndex::Empty { num_docs: 0 }.get_cardinality(),
Cardinality::Full
);
assert_eq!(ColumnIndex::Full.get_cardinality(), Cardinality::Full);
assert_eq!(
ColumnIndex::Empty { num_docs: 1 }.get_cardinality(),
Cardinality::Optional
);
}
}

View File

@@ -35,14 +35,6 @@ pub struct MultiValueIndex {
pub start_index_column: Arc<dyn crate::ColumnValues<RowId>>,
}
impl std::fmt::Debug for MultiValueIndex {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("MultiValuedIndex")
.field("num_rows", &self.start_index_column.num_vals())
.finish_non_exhaustive()
}
}
impl From<Arc<dyn ColumnValues<RowId>>> for MultiValueIndex {
fn from(start_index_column: Arc<dyn ColumnValues<RowId>>) -> Self {
MultiValueIndex { start_index_column }
@@ -114,8 +106,11 @@ impl MultiValueIndex {
#[cfg(test)]
mod tests {
use std::ops::Range;
use std::sync::Arc;
use super::MultiValueIndex;
use crate::column_values::IterColumn;
use crate::{ColumnValues, RowId};
fn index_to_pos_helper(
index: &MultiValueIndex,
@@ -129,7 +124,9 @@ mod tests {
#[test]
fn test_positions_to_docid() {
let index = MultiValueIndex::for_test(&[0, 10, 12, 15, 22, 23]);
let offsets: Vec<RowId> = vec![0, 10, 12, 15, 22, 23]; // docid values are [0..10, 10..12, 12..15, etc.]
let column: Arc<dyn ColumnValues<RowId>> = Arc::new(IterColumn::from(offsets.into_iter()));
let index = MultiValueIndex::from(column);
assert_eq!(index.num_docs(), 5);
let positions = &[10u32, 11, 15, 20, 21, 22];
assert_eq!(index_to_pos_helper(&index, 0..5, positions), vec![1, 3, 4]);

View File

@@ -88,15 +88,6 @@ pub struct OptionalIndex {
block_metas: Arc<[BlockMeta]>,
}
impl std::fmt::Debug for OptionalIndex {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("OptionalIndex")
.field("num_rows", &self.num_rows)
.field("num_non_null_rows", &self.num_non_null_rows)
.finish_non_exhaustive()
}
}
/// Splits a value address into lower and upper 16bits.
/// The lower 16 bits are the value in the block
/// The upper 16 bits are the block index

View File

@@ -5,7 +5,7 @@ use crate::iterable::Iterable;
use crate::{ColumnIndex, ColumnValues, MergeRowOrder};
pub(crate) struct MergedColumnValues<'a, T> {
pub(crate) column_indexes: &'a [ColumnIndex],
pub(crate) column_indexes: &'a [Option<ColumnIndex>],
pub(crate) column_values: &'a [Option<Arc<dyn ColumnValues<T>>>],
pub(crate) merge_row_order: &'a MergeRowOrder,
}
@@ -23,7 +23,8 @@ impl<'a, T: Copy + PartialOrd + Debug> Iterable<T> for MergedColumnValues<'a, T>
shuffle_merge_order
.iter_new_to_old_row_addrs()
.flat_map(|row_addr| {
let column_index = &self.column_indexes[row_addr.segment_ord as usize];
let column_index =
self.column_indexes[row_addr.segment_ord as usize].as_ref()?;
let column_values =
self.column_values[row_addr.segment_ord as usize].as_ref()?;
let value_range = column_index.value_row_ids(row_addr.row_id);

View File

@@ -110,26 +110,20 @@ pub trait ColumnValues<T: PartialOrd = u64>: Send + Sync {
}
}
/// Returns a lower bound for this column of values.
/// Returns the minimum value for this fast field.
///
/// All values are guaranteed to be higher than `.min_value()`
/// but this value is not necessary the best boundary value.
///
/// We have
/// ∀i < self.num_vals(), self.get_val(i) >= self.min_value()
/// But we don't have necessarily
/// ∃i < self.num_vals(), self.get_val(i) == self.min_value()
/// This min_value may not be exact.
/// For instance, the min value does not take in account of possible
/// deleted document. All values are however guaranteed to be higher than
/// `.min_value()`.
fn min_value(&self) -> T;
/// Returns an upper bound for this column of values.
/// Returns the maximum value for this fast field.
///
/// All values are guaranteed to be lower than `.max_value()`
/// but this value is not necessary the best boundary value.
///
/// We have
/// ∀i < self.num_vals(), self.get_val(i) <= self.max_value()
/// But we don't have necessarily
/// ∃i < self.num_vals(), self.get_val(i) == self.max_value()
/// This max_value may not be exact.
/// For instance, the max value does not take in account of possible
/// deleted document. All values are however guaranteed to be higher than
/// `.max_value()`.
fn max_value(&self) -> T;
/// The number of values in the column.
@@ -141,27 +135,6 @@ pub trait ColumnValues<T: PartialOrd = u64>: Send + Sync {
}
}
/// Empty column of values.
pub struct EmptyColumnValues;
impl<T: PartialOrd + Default> ColumnValues<T> for EmptyColumnValues {
fn get_val(&self, _idx: u32) -> T {
panic!("Internal Error: Called get_val of empty column.")
}
fn min_value(&self) -> T {
T::default()
}
fn max_value(&self) -> T {
T::default()
}
fn num_vals(&self) -> u32 {
0
}
}
impl<T: Copy + PartialOrd + Debug> ColumnValues<T> for Arc<dyn ColumnValues<T>> {
#[inline(always)]
fn get_val(&self, idx: u32) -> T {
@@ -205,5 +178,54 @@ impl<T: Copy + PartialOrd + Debug> ColumnValues<T> for Arc<dyn ColumnValues<T>>
}
}
/// Wraps an cloneable iterator into a `Column`.
pub struct IterColumn<T>(T);
impl<T> From<T> for IterColumn<T>
where T: Iterator + Clone + ExactSizeIterator
{
fn from(iter: T) -> Self {
IterColumn(iter)
}
}
impl<T> ColumnValues<T::Item> for IterColumn<T>
where
T: Iterator + Clone + ExactSizeIterator + Send + Sync,
T::Item: PartialOrd + Debug,
{
fn get_val(&self, idx: u32) -> T::Item {
self.0.clone().nth(idx as usize).unwrap()
}
fn min_value(&self) -> T::Item {
self.0.clone().next().unwrap()
}
fn max_value(&self) -> T::Item {
self.0.clone().last().unwrap()
}
fn num_vals(&self) -> u32 {
self.0.len() as u32
}
fn iter(&self) -> Box<dyn Iterator<Item = T::Item> + '_> {
Box::new(self.0.clone())
}
}
#[cfg(all(test, feature = "unstable"))]
mod bench;
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_range_as_col() {
let col = IterColumn::from(10..100);
assert_eq!(col.num_vals(), 90);
assert_eq!(col.max_value(), 99);
}
}

View File

@@ -1,4 +1,3 @@
use std::fmt;
use std::fmt::Debug;
use std::net::Ipv6Addr;
@@ -22,22 +21,6 @@ pub enum ColumnType {
DateTime = 7u8,
}
impl fmt::Display for ColumnType {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let short_str = match self {
ColumnType::I64 => "i64",
ColumnType::U64 => "u64",
ColumnType::F64 => "f64",
ColumnType::Bytes => "bytes",
ColumnType::Str => "str",
ColumnType::Bool => "bool",
ColumnType::IpAddr => "ip",
ColumnType::DateTime => "datetime",
};
write!(f, "{}", short_str)
}
}
// The order needs to match _exactly_ the order in the enum
const COLUMN_TYPES: [ColumnType; 8] = [
ColumnType::I64,

View File

@@ -28,7 +28,7 @@ use crate::{
///
/// See also [README.md].
#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
pub(crate) enum ColumnTypeCategory {
enum ColumnTypeCategory {
Bool,
Str,
Numerical,
@@ -78,10 +78,6 @@ pub fn merge_columnar(
output: &mut impl io::Write,
) -> io::Result<()> {
let mut serializer = ColumnarSerializer::new(output);
let num_rows_per_columnar = columnar_readers
.iter()
.map(|reader| reader.num_rows())
.collect::<Vec<u32>>();
let columns_to_merge = group_columns_for_merge(columnar_readers, required_columns)?;
for ((column_name, column_type), columns) in columns_to_merge {
@@ -89,7 +85,6 @@ pub fn merge_columnar(
serializer.serialize_column(column_name.as_bytes(), column_type);
merge_column(
column_type,
&num_rows_per_columnar,
columns,
&merge_row_order,
&mut column_serializer,
@@ -113,7 +108,6 @@ fn dynamic_column_to_u64_monotonic(dynamic_column: DynamicColumn) -> Option<Colu
fn merge_column(
column_type: ColumnType,
num_docs_per_column: &[u32],
columns: Vec<Option<DynamicColumn>>,
merge_row_order: &MergeRowOrder,
wrt: &mut impl io::Write,
@@ -124,19 +118,17 @@ fn merge_column(
| ColumnType::F64
| ColumnType::DateTime
| ColumnType::Bool => {
let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns.len());
let mut column_indexes: Vec<Option<ColumnIndex>> = Vec::with_capacity(columns.len());
let mut column_values: Vec<Option<Arc<dyn ColumnValues>>> =
Vec::with_capacity(columns.len());
for (i, dynamic_column_opt) in columns.into_iter().enumerate() {
if let Some(Column { index: idx, values }) =
for dynamic_column_opt in columns {
if let Some(Column { idx, values }) =
dynamic_column_opt.and_then(dynamic_column_to_u64_monotonic)
{
column_indexes.push(idx);
column_indexes.push(Some(idx));
column_values.push(Some(values));
} else {
column_indexes.push(ColumnIndex::Empty {
num_docs: num_docs_per_column[i],
});
column_indexes.push(None);
column_values.push(None);
}
}
@@ -150,19 +142,15 @@ fn merge_column(
serialize_column_mappable_to_u64(merged_column_index, &merge_column_values, wrt)?;
}
ColumnType::IpAddr => {
let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns.len());
let mut column_indexes: Vec<Option<ColumnIndex>> = Vec::with_capacity(columns.len());
let mut column_values: Vec<Option<Arc<dyn ColumnValues<Ipv6Addr>>>> =
Vec::with_capacity(columns.len());
for (i, dynamic_column_opt) in columns.into_iter().enumerate() {
if let Some(DynamicColumn::IpAddr(Column { index: idx, values })) =
dynamic_column_opt
{
column_indexes.push(idx);
for dynamic_column_opt in columns {
if let Some(DynamicColumn::IpAddr(Column { idx, values })) = dynamic_column_opt {
column_indexes.push(Some(idx));
column_values.push(Some(values));
} else {
column_indexes.push(ColumnIndex::Empty {
num_docs: num_docs_per_column[i],
});
column_indexes.push(None);
column_values.push(None);
}
}
@@ -178,22 +166,20 @@ fn merge_column(
serialize_column_mappable_to_u128(merged_column_index, &merge_column_values, wrt)?;
}
ColumnType::Bytes | ColumnType::Str => {
let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns.len());
let mut column_indexes: Vec<Option<ColumnIndex>> = Vec::with_capacity(columns.len());
let mut bytes_columns: Vec<Option<BytesColumn>> = Vec::with_capacity(columns.len());
for (i, dynamic_column_opt) in columns.into_iter().enumerate() {
for dynamic_column_opt in columns {
match dynamic_column_opt {
Some(DynamicColumn::Str(str_column)) => {
column_indexes.push(str_column.term_ord_column.index.clone());
column_indexes.push(Some(str_column.term_ord_column.idx.clone()));
bytes_columns.push(Some(str_column.into()));
}
Some(DynamicColumn::Bytes(bytes_column)) => {
column_indexes.push(bytes_column.term_ord_column.index.clone());
column_indexes.push(Some(bytes_column.term_ord_column.idx.clone()));
bytes_columns.push(Some(bytes_column));
}
_ => {
column_indexes.push(ColumnIndex::Empty {
num_docs: num_docs_per_column[i],
});
column_indexes.push(None);
bytes_columns.push(None);
}
}

View File

@@ -1,5 +1,3 @@
use itertools::Itertools;
use super::*;
use crate::{Cardinality, ColumnarWriter, HasAssociatedColumnType, RowId};
@@ -251,8 +249,6 @@ fn test_merge_columnar_texts() {
let cols = columnar_reader.read_columns("texts").unwrap();
let dynamic_column = cols[0].open().unwrap();
let DynamicColumn::Str(vals) = dynamic_column else { panic!() };
assert_eq!(vals.ords().get_cardinality(), Cardinality::Optional);
let get_str_for_ord = |ord| {
let mut out = String::new();
vals.ord_to_str(ord, &mut out).unwrap();
@@ -380,93 +376,3 @@ fn test_merge_columnar_byte_with_missing() {
assert_eq!(get_bytes_for_row(6), vec![b"b".to_vec()]);
assert_eq!(get_bytes_for_row(7), vec![b"a".to_vec(), b"b".to_vec()]);
}
#[test]
fn test_merge_columnar_different_types() {
let columnar1 = make_text_columnar_multiple_columns(&[("mixed", &[&["a"]])]);
let columnar2 = make_text_columnar_multiple_columns(&[("mixed", &[&[], &["b"]])]);
let columnar3 = make_columnar("mixed", &[1i64]);
let mut buffer = Vec::new();
let columnars = &[&columnar1, &columnar2, &columnar3];
let stack_merge_order = StackMergeOrder::stack(columnars);
crate::columnar::merge_columnar(
columnars,
&[],
MergeRowOrder::Stack(stack_merge_order),
&mut buffer,
)
.unwrap();
let columnar_reader = ColumnarReader::open(buffer).unwrap();
assert_eq!(columnar_reader.num_rows(), 4);
assert_eq!(columnar_reader.num_columns(), 2);
let cols = columnar_reader.read_columns("mixed").unwrap();
// numeric column
let dynamic_column = cols[0].open().unwrap();
let DynamicColumn::I64(vals) = dynamic_column else { panic!() };
assert_eq!(vals.get_cardinality(), Cardinality::Optional);
assert_eq!(vals.values_for_doc(0).collect_vec(), vec![]);
assert_eq!(vals.values_for_doc(1).collect_vec(), vec![]);
assert_eq!(vals.values_for_doc(2).collect_vec(), vec![]);
assert_eq!(vals.values_for_doc(3).collect_vec(), vec![1]);
assert_eq!(vals.values_for_doc(4).collect_vec(), vec![]);
// text column
let dynamic_column = cols[1].open().unwrap();
let DynamicColumn::Str(vals) = dynamic_column else { panic!() };
assert_eq!(vals.ords().get_cardinality(), Cardinality::Optional);
let get_str_for_ord = |ord| {
let mut out = String::new();
vals.ord_to_str(ord, &mut out).unwrap();
out
};
assert_eq!(vals.dictionary.num_terms(), 2);
assert_eq!(get_str_for_ord(0), "a");
assert_eq!(get_str_for_ord(1), "b");
let get_str_for_row = |row_id| {
let term_ords: Vec<String> = vals
.term_ords(row_id)
.map(|el| {
let mut out = String::new();
vals.ord_to_str(el, &mut out).unwrap();
out
})
.collect();
term_ords
};
assert_eq!(get_str_for_row(0), vec!["a".to_string()]);
assert_eq!(get_str_for_row(1), Vec::<String>::new());
assert_eq!(get_str_for_row(2), vec!["b".to_string()]);
assert_eq!(get_str_for_row(3), Vec::<String>::new());
}
#[test]
fn test_merge_columnar_different_empty_cardinality() {
let columnar1 = make_text_columnar_multiple_columns(&[("mixed", &[&["a"]])]);
let columnar2 = make_columnar("mixed", &[1i64]);
let mut buffer = Vec::new();
let columnars = &[&columnar1, &columnar2];
let stack_merge_order = StackMergeOrder::stack(columnars);
crate::columnar::merge_columnar(
columnars,
&[],
MergeRowOrder::Stack(stack_merge_order),
&mut buffer,
)
.unwrap();
let columnar_reader = ColumnarReader::open(buffer).unwrap();
assert_eq!(columnar_reader.num_rows(), 2);
assert_eq!(columnar_reader.num_columns(), 2);
let cols = columnar_reader.read_columns("mixed").unwrap();
// numeric column
let dynamic_column = cols[0].open().unwrap();
assert_eq!(dynamic_column.get_cardinality(), Cardinality::Optional);
// text column
let dynamic_column = cols[1].open().unwrap();
assert_eq!(dynamic_column.get_cardinality(), Cardinality::Optional);
}

View File

@@ -5,8 +5,6 @@ mod reader;
mod writer;
pub use column_type::{ColumnType, HasAssociatedColumnType};
#[cfg(test)]
pub(crate) use merge::ColumnTypeCategory;
pub use merge::{merge_columnar, MergeRowOrder, ShuffleMergeOrder, StackMergeOrder};
pub use reader::ColumnarReader;
pub use writer::ColumnarWriter;

View File

@@ -1,4 +1,4 @@
use std::{fmt, io, mem};
use std::{io, mem};
use common::file_slice::FileSlice;
use common::BinarySerializable;
@@ -21,32 +21,6 @@ pub struct ColumnarReader {
num_rows: RowId,
}
impl fmt::Debug for ColumnarReader {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let num_rows = self.num_rows();
let columns = self.list_columns().unwrap();
let num_cols = columns.len();
let mut debug_struct = f.debug_struct("Columnar");
debug_struct
.field("num_rows", &num_rows)
.field("num_cols", &num_cols);
for (col_name, dynamic_column_handle) in columns.into_iter().take(5) {
let col = dynamic_column_handle.open().unwrap();
if col.num_values() > 10 {
debug_struct.field(&col_name, &"..");
} else {
debug_struct.field(&col_name, &col);
}
}
if num_cols > 5 {
debug_struct.finish_non_exhaustive()?;
} else {
debug_struct.finish()?;
}
Ok(())
}
}
/// Functions by both the async/sync code listing columns.
/// It takes a stream from the column sstable and return the list of
/// `DynamicColumn` available in it.

View File

@@ -1,6 +1,6 @@
use std::io;
use std::net::Ipv6Addr;
use std::sync::Arc;
use std::{fmt, io};
use common::file_slice::FileSlice;
use common::{ByteCount, DateTime, HasLen, OwnedBytes};
@@ -8,7 +8,7 @@ use common::{ByteCount, DateTime, HasLen, OwnedBytes};
use crate::column::{BytesColumn, Column, StrColumn};
use crate::column_values::{monotonic_map_column, StrictlyMonotonicFn};
use crate::columnar::ColumnType;
use crate::{Cardinality, ColumnIndex, NumericalType};
use crate::{Cardinality, NumericalType};
#[derive(Clone)]
pub enum DynamicColumn {
@@ -22,54 +22,19 @@ pub enum DynamicColumn {
Str(StrColumn),
}
impl fmt::Debug for DynamicColumn {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "[{} {} |", self.get_cardinality(), self.column_type())?;
match self {
DynamicColumn::Bool(col) => write!(f, " {:?}", col)?,
DynamicColumn::I64(col) => write!(f, " {:?}", col)?,
DynamicColumn::U64(col) => write!(f, " {:?}", col)?,
DynamicColumn::F64(col) => write!(f, "{:?}", col)?,
DynamicColumn::IpAddr(col) => write!(f, "{:?}", col)?,
DynamicColumn::DateTime(col) => write!(f, "{:?}", col)?,
DynamicColumn::Bytes(col) => write!(f, "{:?}", col)?,
DynamicColumn::Str(col) => write!(f, "{:?}", col)?,
}
write!(f, "]")
}
}
impl DynamicColumn {
pub fn column_index(&self) -> &ColumnIndex {
match self {
DynamicColumn::Bool(c) => &c.index,
DynamicColumn::I64(c) => &c.index,
DynamicColumn::U64(c) => &c.index,
DynamicColumn::F64(c) => &c.index,
DynamicColumn::IpAddr(c) => &c.index,
DynamicColumn::DateTime(c) => &c.index,
DynamicColumn::Bytes(c) => &c.ords().index,
DynamicColumn::Str(c) => &c.ords().index,
}
}
pub fn get_cardinality(&self) -> Cardinality {
self.column_index().get_cardinality()
}
pub fn num_values(&self) -> u32 {
match self {
DynamicColumn::Bool(c) => c.values.num_vals(),
DynamicColumn::I64(c) => c.values.num_vals(),
DynamicColumn::U64(c) => c.values.num_vals(),
DynamicColumn::F64(c) => c.values.num_vals(),
DynamicColumn::IpAddr(c) => c.values.num_vals(),
DynamicColumn::DateTime(c) => c.values.num_vals(),
DynamicColumn::Bytes(c) => c.ords().values.num_vals(),
DynamicColumn::Str(c) => c.ords().values.num_vals(),
DynamicColumn::Bool(c) => c.get_cardinality(),
DynamicColumn::I64(c) => c.get_cardinality(),
DynamicColumn::U64(c) => c.get_cardinality(),
DynamicColumn::F64(c) => c.get_cardinality(),
DynamicColumn::IpAddr(c) => c.get_cardinality(),
DynamicColumn::DateTime(c) => c.get_cardinality(),
DynamicColumn::Bytes(c) => c.ords().get_cardinality(),
DynamicColumn::Str(c) => c.ords().get_cardinality(),
}
}
pub fn column_type(&self) -> ColumnType {
match self {
DynamicColumn::Bool(_) => ColumnType::Bool,
@@ -108,11 +73,11 @@ impl DynamicColumn {
fn coerce_to_f64(self) -> Option<DynamicColumn> {
match self {
DynamicColumn::I64(column) => Some(DynamicColumn::F64(Column {
index: column.index,
idx: column.idx,
values: Arc::new(monotonic_map_column(column.values, MapI64ToF64)),
})),
DynamicColumn::U64(column) => Some(DynamicColumn::F64(Column {
index: column.index,
idx: column.idx,
values: Arc::new(monotonic_map_column(column.values, MapU64ToF64)),
})),
DynamicColumn::F64(_) => Some(self),
@@ -126,7 +91,7 @@ impl DynamicColumn {
return None;
}
Some(DynamicColumn::I64(Column {
index: column.index,
idx: column.idx,
values: Arc::new(monotonic_map_column(column.values, MapU64ToI64)),
}))
}
@@ -141,7 +106,7 @@ impl DynamicColumn {
return None;
}
Some(DynamicColumn::U64(Column {
index: column.index,
idx: column.idx,
values: Arc::new(monotonic_map_column(column.values, MapI64ToU64)),
}))
}

View File

@@ -7,10 +7,8 @@ extern crate more_asserts;
#[cfg(all(test, feature = "unstable"))]
extern crate test;
use std::fmt::Display;
use std::io;
mod block_accessor;
mod column;
mod column_index;
pub mod column_values;
@@ -21,12 +19,9 @@ mod iterable;
pub(crate) mod utils;
mod value;
pub use block_accessor::ColumnBlockAccessor;
pub use column::{BytesColumn, Column, StrColumn};
pub use column_index::ColumnIndex;
pub use column_values::{
ColumnValues, EmptyColumnValues, MonotonicallyMappableToU128, MonotonicallyMappableToU64,
};
pub use column_values::{ColumnValues, MonotonicallyMappableToU128, MonotonicallyMappableToU64};
pub use columnar::{
merge_columnar, ColumnType, ColumnarReader, ColumnarWriter, HasAssociatedColumnType,
MergeRowOrder, ShuffleMergeOrder, StackMergeOrder,
@@ -76,17 +71,6 @@ pub enum Cardinality {
Multivalued = 2,
}
impl Display for Cardinality {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
let short_str = match self {
Cardinality::Full => "full",
Cardinality::Optional => "opt",
Cardinality::Multivalued => "mult",
};
write!(f, "{short_str}")
}
}
impl Cardinality {
pub fn is_optional(&self) -> bool {
matches!(self, Cardinality::Optional)
@@ -97,6 +81,7 @@ impl Cardinality {
pub(crate) fn to_code(self) -> u8 {
self as u8
}
pub(crate) fn try_from_code(code: u8) -> Result<Cardinality, InvalidData> {
match code {
0 => Ok(Cardinality::Full),

View File

@@ -1,17 +1,10 @@
use std::collections::HashMap;
use std::fmt::Debug;
use std::net::Ipv6Addr;
use common::DateTime;
use proptest::prelude::*;
use crate::column_values::MonotonicallyMappableToU128;
use crate::columnar::ColumnType;
use crate::dynamic_column::{DynamicColumn, DynamicColumnHandle};
use crate::value::{Coerce, NumericalValue};
use crate::{
BytesColumn, Cardinality, Column, ColumnarReader, ColumnarWriter, RowId, StackMergeOrder,
};
use crate::value::NumericalValue;
use crate::{Cardinality, ColumnarReader, ColumnarWriter};
#[test]
fn test_dataframe_writer_str() {
@@ -24,7 +17,7 @@ fn test_dataframe_writer_str() {
assert_eq!(columnar.num_columns(), 1);
let cols: Vec<DynamicColumnHandle> = columnar.read_columns("my_string").unwrap();
assert_eq!(cols.len(), 1);
assert_eq!(cols[0].num_bytes(), 89);
assert_eq!(cols[0].num_bytes(), 158);
}
#[test]
@@ -38,7 +31,7 @@ fn test_dataframe_writer_bytes() {
assert_eq!(columnar.num_columns(), 1);
let cols: Vec<DynamicColumnHandle> = columnar.read_columns("my_string").unwrap();
assert_eq!(cols.len(), 1);
assert_eq!(cols[0].num_bytes(), 89);
assert_eq!(cols[0].num_bytes(), 158);
}
#[test]
@@ -133,7 +126,7 @@ fn test_dataframe_writer_numerical() {
assert_eq!(cols[0].num_bytes(), 33);
let column = cols[0].open().unwrap();
let DynamicColumn::I64(column_i64) = column else { panic!(); };
assert_eq!(column_i64.index.get_cardinality(), Cardinality::Optional);
assert_eq!(column_i64.idx.get_cardinality(), Cardinality::Optional);
assert_eq!(column_i64.first(0), None);
assert_eq!(column_i64.first(1), Some(12i64));
assert_eq!(column_i64.first(2), Some(13i64));
@@ -217,422 +210,3 @@ fn test_dictionary_encoded_bytes() {
.unwrap();
assert_eq!(term_buffer, b"b");
}
fn num_strategy() -> impl Strategy<Value = NumericalValue> {
prop_oneof![
Just(NumericalValue::U64(0u64)),
Just(NumericalValue::U64(u64::MAX)),
Just(NumericalValue::I64(0i64)),
Just(NumericalValue::I64(i64::MIN)),
Just(NumericalValue::I64(i64::MAX)),
Just(NumericalValue::F64(1.2f64)),
]
}
#[derive(Debug, Clone, Copy)]
enum ColumnValue {
Str(&'static str),
Bytes(&'static [u8]),
Numerical(NumericalValue),
IpAddr(Ipv6Addr),
Bool(bool),
DateTime(DateTime),
}
impl ColumnValue {
pub(crate) fn column_type_category(&self) -> ColumnTypeCategory {
match self {
ColumnValue::Str(_) => ColumnTypeCategory::Str,
ColumnValue::Bytes(_) => ColumnTypeCategory::Bytes,
ColumnValue::Numerical(numerical_val) => ColumnTypeCategory::Numerical,
ColumnValue::IpAddr(_) => ColumnTypeCategory::IpAddr,
ColumnValue::Bool(_) => ColumnTypeCategory::Bool,
ColumnValue::DateTime(_) => ColumnTypeCategory::DateTime,
}
}
}
fn column_name_strategy() -> impl Strategy<Value = &'static str> {
prop_oneof![Just("c1"), Just("c2")]
}
fn string_strategy() -> impl Strategy<Value = &'static str> {
prop_oneof![Just("a"), Just("b")]
}
fn bytes_strategy() -> impl Strategy<Value = &'static [u8]> {
prop_oneof![Just(&[0u8][..]), Just(&[1u8][..])]
}
// A random column value
fn column_value_strategy() -> impl Strategy<Value = ColumnValue> {
prop_oneof![
string_strategy().prop_map(|s| ColumnValue::Str(s)),
bytes_strategy().prop_map(|b| ColumnValue::Bytes(b)),
num_strategy().prop_map(|n| ColumnValue::Numerical(n)),
(1u16..3u16).prop_map(|ip_addr_byte| ColumnValue::IpAddr(Ipv6Addr::new(
127,
0,
0,
0,
0,
0,
0,
ip_addr_byte
))),
any::<bool>().prop_map(|b| ColumnValue::Bool(b)),
(0_679_723_993i64..1_679_723_995i64)
.prop_map(|val| { ColumnValue::DateTime(DateTime::from_timestamp_secs(val)) })
]
}
// A document contains up to 4 values.
fn doc_strategy() -> impl Strategy<Value = Vec<(&'static str, ColumnValue)>> {
proptest::collection::vec((column_name_strategy(), column_value_strategy()), 0..4)
}
// A columnar contains up to 2 docs.
fn columnar_docs_strategy() -> impl Strategy<Value = Vec<Vec<(&'static str, ColumnValue)>>> {
proptest::collection::vec(doc_strategy(), 0..=2)
}
fn columnar_docs_and_mapping_strategy(
) -> impl Strategy<Value = (Vec<Vec<(&'static str, ColumnValue)>>, Vec<RowId>)> {
columnar_docs_strategy().prop_flat_map(|docs| {
permutation_strategy(docs.len()).prop_map(move |permutation| (docs.clone(), permutation))
})
}
fn permutation_strategy(n: usize) -> impl Strategy<Value = Vec<RowId>> {
Just((0u32..n as RowId).collect()).prop_shuffle()
}
fn build_columnar_with_mapping(
docs: &[Vec<(&'static str, ColumnValue)>],
old_to_new_row_ids_opt: Option<&[RowId]>,
) -> ColumnarReader {
let num_docs = docs.len() as u32;
let mut buffer = Vec::new();
let mut columnar_writer = ColumnarWriter::default();
for (doc_id, vals) in docs.iter().enumerate() {
for (column_name, col_val) in vals {
match *col_val {
ColumnValue::Str(str_val) => {
columnar_writer.record_str(doc_id as u32, column_name, str_val);
}
ColumnValue::Bytes(bytes) => {
columnar_writer.record_bytes(doc_id as u32, column_name, bytes)
}
ColumnValue::Numerical(num) => {
columnar_writer.record_numerical(doc_id as u32, column_name, num);
}
ColumnValue::IpAddr(ip_addr) => {
columnar_writer.record_ip_addr(doc_id as u32, column_name, ip_addr);
}
ColumnValue::Bool(bool_val) => {
columnar_writer.record_bool(doc_id as u32, column_name, bool_val);
}
ColumnValue::DateTime(date_time) => {
columnar_writer.record_datetime(doc_id as u32, column_name, date_time);
}
}
}
}
columnar_writer
.serialize(num_docs, old_to_new_row_ids_opt, &mut buffer)
.unwrap();
let columnar_reader = ColumnarReader::open(buffer).unwrap();
columnar_reader
}
fn build_columnar(docs: &[Vec<(&'static str, ColumnValue)>]) -> ColumnarReader {
build_columnar_with_mapping(docs, None)
}
fn assert_columnar_eq(left: &ColumnarReader, right: &ColumnarReader) {
assert_eq!(left.num_rows(), right.num_rows());
let left_columns = left.list_columns().unwrap();
let right_columns = right.list_columns().unwrap();
assert_eq!(left_columns.len(), right_columns.len());
for i in 0..left_columns.len() {
assert_eq!(left_columns[i].0, right_columns[i].0);
let left_column = left_columns[i].1.open().unwrap();
let right_column = right_columns[i].1.open().unwrap();
assert_dyn_column_eq(&left_column, &right_column);
}
}
fn assert_column_eq<T: PartialEq + Copy>(left: &Column<T>, right: &Column<T>) {}
fn assert_bytes_column_eq(left: &BytesColumn, right: &BytesColumn) {}
fn assert_dyn_column_eq(left_dyn_column: &DynamicColumn, right_dyn_column: &DynamicColumn) {
assert_eq!(
&left_dyn_column.column_type(),
&right_dyn_column.column_type()
);
assert_eq!(
&left_dyn_column.get_cardinality(),
&right_dyn_column.get_cardinality()
);
match &(left_dyn_column, right_dyn_column) {
(DynamicColumn::Bool(left_col), DynamicColumn::Bool(right_col)) => {
assert_column_eq(left_col, right_col);
}
(DynamicColumn::I64(left_col), DynamicColumn::I64(right_col)) => {
assert_column_eq(left_col, right_col);
}
(DynamicColumn::U64(left_col), DynamicColumn::U64(right_col)) => {
assert_column_eq(left_col, right_col);
}
(DynamicColumn::F64(left_col), DynamicColumn::F64(right_col)) => {
assert_column_eq(left_col, right_col);
}
(DynamicColumn::DateTime(left_col), DynamicColumn::DateTime(right_col)) => {
assert_column_eq(left_col, right_col);
}
(DynamicColumn::IpAddr(left_col), DynamicColumn::IpAddr(right_col)) => {
assert_column_eq(left_col, right_col);
}
(DynamicColumn::Bytes(left_col), DynamicColumn::Bytes(right_col)) => {
assert_bytes_column_eq(left_col, right_col);
}
(DynamicColumn::Str(left_col), DynamicColumn::Str(right_col)) => {
assert_bytes_column_eq(left_col, right_col);
}
_ => {
unreachable!()
}
}
}
trait AssertEqualToColumnValue {
fn assert_equal_to_column_value(&self, column_value: &ColumnValue);
}
use crate::columnar::ColumnTypeCategory;
impl AssertEqualToColumnValue for bool {
fn assert_equal_to_column_value(&self, column_value: &ColumnValue) {
let ColumnValue::Bool(val) = column_value else { panic!() };
assert_eq!(self, val);
}
}
impl AssertEqualToColumnValue for Ipv6Addr {
fn assert_equal_to_column_value(&self, column_value: &ColumnValue) {
let ColumnValue::IpAddr(val) = column_value else { panic!() };
assert_eq!(self, val);
}
}
impl<T: Coerce + PartialEq + Debug + Into<NumericalValue>> AssertEqualToColumnValue for T {
fn assert_equal_to_column_value(&self, column_value: &ColumnValue) {
let ColumnValue::Numerical(num) = column_value else { panic!() };
assert_eq!(self, &T::coerce(*num));
}
}
impl AssertEqualToColumnValue for DateTime {
fn assert_equal_to_column_value(&self, column_value: &ColumnValue) {
let ColumnValue::DateTime(dt) = column_value else { panic!() };
assert_eq!(self, dt);
}
}
fn assert_column_values<
T: AssertEqualToColumnValue + PartialEq + Copy + PartialOrd + Debug + Send + Sync + 'static,
>(
col: &Column<T>,
expected: &HashMap<u32, Vec<&ColumnValue>>,
) {
let mut num_non_empty_rows = 0;
for doc in 0..col.num_docs() {
let doc_vals: Vec<T> = col.values_for_doc(doc).collect();
if doc_vals.is_empty() {
continue;
}
num_non_empty_rows += 1;
let expected_vals = expected.get(&doc).unwrap();
assert_eq!(doc_vals.len(), expected_vals.len());
for (val, &expected) in doc_vals.iter().zip(expected_vals.iter()) {
val.assert_equal_to_column_value(expected)
}
}
assert_eq!(num_non_empty_rows, expected.len());
}
fn assert_bytes_column_values(
col: &BytesColumn,
expected: &HashMap<u32, Vec<&ColumnValue>>,
is_str: bool,
) {
let mut num_non_empty_rows = 0;
let mut buffer = Vec::new();
for doc in 0..col.term_ord_column.num_docs() {
let doc_vals: Vec<u64> = col.term_ords(doc).collect();
if doc_vals.is_empty() {
continue;
}
let expected_vals = expected.get(&doc).unwrap();
assert_eq!(doc_vals.len(), expected_vals.len());
for (&expected_col_val, &ord) in expected_vals.iter().zip(&doc_vals) {
col.ord_to_bytes(ord, &mut buffer).unwrap();
match expected_col_val {
ColumnValue::Str(str_val) => {
assert!(is_str);
assert_eq!(str_val.as_bytes(), &buffer);
}
ColumnValue::Bytes(bytes_val) => {
assert!(!is_str);
assert_eq!(bytes_val, &buffer);
}
_ => {
panic!();
}
}
}
num_non_empty_rows += 1;
}
assert_eq!(num_non_empty_rows, expected.len());
}
proptest! {
/// This proptest attempts to create a tiny columnar based of up to 3 rows, and checks that the resulting
/// columnar matches the row data.
#[test]
fn test_single_columnar_builder_proptest(docs in columnar_docs_strategy()) {
let columnar = build_columnar(&docs[..]);
assert_eq!(columnar.num_rows() as usize, docs.len());
let mut expected_columns: HashMap<(&str, ColumnTypeCategory), HashMap<u32, Vec<&ColumnValue>> > = Default::default();
for (doc_id, doc_vals) in docs.iter().enumerate() {
for (col_name, col_val) in doc_vals {
expected_columns
.entry((col_name, col_val.column_type_category()))
.or_default()
.entry(doc_id as u32)
.or_default()
.push(col_val);
}
}
let column_list = columnar.list_columns().unwrap();
assert_eq!(expected_columns.len(), column_list.len());
for (column_name, column) in column_list {
let dynamic_column = column.open().unwrap();
let col_category: ColumnTypeCategory = dynamic_column.column_type().into();
let expected_col_values: &HashMap<u32, Vec<&ColumnValue>> = expected_columns.get(&(column_name.as_str(), col_category)).unwrap();
match &dynamic_column {
DynamicColumn::Bool(col) =>
assert_column_values(col, expected_col_values),
DynamicColumn::I64(col) =>
assert_column_values(col, expected_col_values),
DynamicColumn::U64(col) =>
assert_column_values(col, expected_col_values),
DynamicColumn::F64(col) =>
assert_column_values(col, expected_col_values),
DynamicColumn::IpAddr(col) =>
assert_column_values(col, expected_col_values),
DynamicColumn::DateTime(col) =>
assert_column_values(col, expected_col_values),
DynamicColumn::Bytes(col) =>
assert_bytes_column_values(col, expected_col_values, false),
DynamicColumn::Str(col) =>
assert_bytes_column_values(col, expected_col_values, true),
}
}
}
/// Same as `test_single_columnar_builder_proptest` but with a shuffling mapping.
#[test]
fn test_single_columnar_builder_with_shuffle_proptest((docs, mapping) in columnar_docs_and_mapping_strategy()) {
let columnar = build_columnar_with_mapping(&docs[..], Some(&mapping));
assert_eq!(columnar.num_rows() as usize, docs.len());
let mut expected_columns: HashMap<(&str, ColumnTypeCategory), HashMap<u32, Vec<&ColumnValue>> > = Default::default();
for (doc_id, doc_vals) in docs.iter().enumerate() {
for (col_name, col_val) in doc_vals {
expected_columns
.entry((col_name, col_val.column_type_category()))
.or_default()
.entry(mapping[doc_id])
.or_default()
.push(col_val);
}
}
let column_list = columnar.list_columns().unwrap();
assert_eq!(expected_columns.len(), column_list.len());
for (column_name, column) in column_list {
let dynamic_column = column.open().unwrap();
let col_category: ColumnTypeCategory = dynamic_column.column_type().into();
let expected_col_values: &HashMap<u32, Vec<&ColumnValue>> = expected_columns.get(&(column_name.as_str(), col_category)).unwrap();
for doc_id in 0..columnar.num_rows() {
match &dynamic_column {
DynamicColumn::Bool(col) =>
assert_column_values(col, expected_col_values),
DynamicColumn::I64(col) =>
assert_column_values(col, expected_col_values),
DynamicColumn::U64(col) =>
assert_column_values(col, expected_col_values),
DynamicColumn::F64(col) =>
assert_column_values(col, expected_col_values),
DynamicColumn::IpAddr(col) =>
assert_column_values(col, expected_col_values),
DynamicColumn::DateTime(col) =>
assert_column_values(col, expected_col_values),
DynamicColumn::Bytes(col) =>
assert_bytes_column_values(col, expected_col_values, false),
DynamicColumn::Str(col) =>
assert_bytes_column_values(col, expected_col_values, true),
}
}
}
}
/// This tests create 2 or 3 random small columnar and attempts to merge them.
/// It compares the resulting merged dataframe with what would have been obtained by building the
/// dataframe from the concatenated rows to begin with.
#[test]
fn test_columnar_merge_proptest(columnar_docs in proptest::collection::vec(columnar_docs_strategy(), 2..=3)) {
let columnar_readers: Vec<ColumnarReader> = columnar_docs.iter()
.map(|docs| build_columnar(&docs[..]))
.collect::<Vec<_>>();
let columnar_readers_arr: Vec<&ColumnarReader> = columnar_readers.iter().collect();
let mut output: Vec<u8> = Vec::new();
let stack_merge_order = StackMergeOrder::stack(&columnar_readers_arr[..]);
crate::merge_columnar(&columnar_readers_arr[..], &[], crate::MergeRowOrder::Stack(stack_merge_order), &mut output).unwrap();
let merged_columnar = ColumnarReader::open(output).unwrap();
let concat_rows: Vec<Vec<(&'static str, ColumnValue)>> = columnar_docs.iter().cloned().flatten().collect();
let expected_merged_columnar = build_columnar(&concat_rows[..]);
assert_columnar_eq(&merged_columnar, &expected_merged_columnar);
}
}
#[test]
fn test_columnar_failing_test() {
let columnar_docs: Vec<Vec<Vec<(&str, ColumnValue)>>> =
vec![vec![], vec![vec![("c1", ColumnValue::Str("a"))]]];
let columnar_readers: Vec<ColumnarReader> = columnar_docs
.iter()
.map(|docs| build_columnar(&docs[..]))
.collect::<Vec<_>>();
let columnar_readers_arr: Vec<&ColumnarReader> = columnar_readers.iter().collect();
let mut output: Vec<u8> = Vec::new();
let stack_merge_order = StackMergeOrder::stack(&columnar_readers_arr[..]);
crate::merge_columnar(
&columnar_readers_arr[..],
&[],
crate::MergeRowOrder::Stack(stack_merge_order),
&mut output,
)
.unwrap();
let merged_columnar = ColumnarReader::open(output).unwrap();
let concat_rows: Vec<Vec<(&'static str, ColumnValue)>> =
columnar_docs.iter().cloned().flatten().collect();
let expected_merged_columnar = build_columnar(&concat_rows[..]);
assert_columnar_eq(&merged_columnar, &expected_merged_columnar);
}
// TODO add non trivial remap and merge
// TODO test required
// TODO add support for empty columnar.

View File

@@ -1,63 +0,0 @@
use std::io::{self, Read, Write};
use crate::BinarySerializable;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u32)]
pub enum DictionaryKind {
Fst = 1,
SSTable = 2,
}
#[derive(Debug, Clone, PartialEq)]
pub struct DictionaryFooter {
pub kind: DictionaryKind,
pub version: u32,
}
impl DictionaryFooter {
pub fn verify_equal(&self, other: &DictionaryFooter) -> io::Result<()> {
if self.kind != other.kind {
return Err(io::Error::new(
io::ErrorKind::Other,
format!(
"Invalid dictionary type, expected {:?}, found {:?}",
self.kind, other.kind
),
));
}
if self.version != other.version {
return Err(io::Error::new(
io::ErrorKind::Other,
format!(
"Unsuported dictionary version, expected {}, found {}",
self.version, other.version
),
));
}
Ok(())
}
}
impl BinarySerializable for DictionaryFooter {
fn serialize<W: Write + ?Sized>(&self, writer: &mut W) -> io::Result<()> {
self.version.serialize(writer)?;
(self.kind as u32).serialize(writer)
}
fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
let version = u32::deserialize(reader)?;
let kind = u32::deserialize(reader)?;
let kind = match kind {
1 => DictionaryKind::Fst,
2 => DictionaryKind::SSTable,
_ => {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("invalid dictionary kind: {kind}"),
))
}
};
Ok(DictionaryFooter { kind, version })
}
}

View File

@@ -7,7 +7,6 @@ pub use byteorder::LittleEndian as Endianness;
mod bitset;
mod byte_count;
mod datetime;
mod dictionary_footer;
pub mod file_slice;
mod group_by;
mod serialize;
@@ -16,7 +15,6 @@ mod writer;
pub use bitset::*;
pub use byte_count::ByteCount;
pub use datetime::{DatePrecision, DateTime};
pub use dictionary_footer::*;
pub use group_by::GroupByIteratorExtended;
pub use ownedbytes::{OwnedBytes, StableDeref};
pub use serialize::{BinarySerializable, DeserializeFrom, FixedSize};

View File

@@ -8,7 +8,7 @@ use super::collector::DEFAULT_MEMORY_LIMIT;
use super::{AggregationError, DEFAULT_BUCKET_LIMIT};
use crate::TantivyError;
/// An estimate for memory consumption. Non recursive
/// An estimate for memory consumption
pub trait MemoryConsumption {
fn memory_consumption(&self) -> usize;
}
@@ -83,13 +83,12 @@ impl AggregationLimits {
self.memory_consumption
.fetch_add(num_bytes, std::sync::atomic::Ordering::Relaxed);
}
/// Returns the estimated memory consumed by the aggregations
pub fn get_memory_consumed(&self) -> ByteCount {
self.memory_consumption
.load(std::sync::atomic::Ordering::Relaxed)
.into()
}
pub(crate) fn get_bucket_limit(&self) -> u32 {
pub fn get_bucket_limit(&self) -> u32 {
self.bucket_limit
}
}

View File

@@ -1,6 +1,8 @@
//! This will enhance the request tree with access to the fastfield and metadata.
use columnar::{Column, ColumnBlockAccessor, ColumnType, StrColumn};
use std::sync::Arc;
use columnar::{Column, ColumnType, ColumnValues, StrColumn};
use super::agg_req::{Aggregation, Aggregations, BucketAggregationType, MetricAggregation};
use super::bucket::{
@@ -43,7 +45,6 @@ pub struct BucketAggregationWithAccessor {
pub(crate) bucket_agg: BucketAggregationType,
pub(crate) sub_aggregation: AggregationsWithAccessor,
pub(crate) limits: AggregationLimits,
pub(crate) column_block_accessor: ColumnBlockAccessor<u64>,
}
impl BucketAggregationWithAccessor {
@@ -84,7 +85,6 @@ impl BucketAggregationWithAccessor {
bucket_agg: bucket.clone(),
str_dict_column,
limits,
column_block_accessor: Default::default(),
})
}
}
@@ -95,7 +95,6 @@ pub struct MetricAggregationWithAccessor {
pub metric: MetricAggregation,
pub field_type: ColumnType,
pub accessor: Column<u64>,
pub column_block_accessor: ColumnBlockAccessor<u64>,
}
impl MetricAggregationWithAccessor {
@@ -116,7 +115,6 @@ impl MetricAggregationWithAccessor {
accessor,
field_type,
metric: metric.clone(),
column_block_accessor: Default::default(),
})
}
}
@@ -161,11 +159,31 @@ fn get_ff_reader_and_validate(
let ff_fields = reader.fast_fields();
let ff_field_with_type = ff_fields
.u64_lenient_with_type(field_name)?
.unwrap_or_else(|| {
(
Column::build_empty_column(reader.num_docs()),
ColumnType::U64,
)
});
.unwrap_or_else(|| (build_empty_column(reader.num_docs()), ColumnType::U64));
Ok(ff_field_with_type)
}
// Empty Column
fn build_empty_column(num_docs: u32) -> Column {
struct EmptyValues;
impl ColumnValues for EmptyValues {
fn get_val(&self, _idx: u32) -> u64 {
unimplemented!("Internal Error: Called get_val of empty column.")
}
fn min_value(&self) -> u64 {
unimplemented!("Internal Error: Called min_value of empty column.")
}
fn max_value(&self) -> u64 {
unimplemented!("Internal Error: Called max_value of empty column.")
}
fn num_vals(&self) -> u32 {
0
}
}
Column {
idx: columnar::ColumnIndex::Empty { num_docs },
values: Arc::new(EmptyValues),
}
}

View File

@@ -20,7 +20,7 @@ use crate::aggregation::segment_agg_result::{
build_segment_agg_collector, AggregationLimits, SegmentAggregationCollector,
};
use crate::aggregation::{f64_from_fastfield_u64, format_date, VecWithNames};
use crate::TantivyError;
use crate::{DocId, TantivyError};
/// Histogram is a bucket aggregation, where buckets are created dynamically for given `interval`.
/// Each document value is rounded down to its bucket.
@@ -235,7 +235,7 @@ impl SegmentAggregationCollector for SegmentHistogramCollector {
fn collect(
&mut self,
doc: crate::DocId,
agg_with_accessor: &mut AggregationsWithAccessor,
agg_with_accessor: &AggregationsWithAccessor,
) -> crate::Result<()> {
self.collect_block(&[doc], agg_with_accessor)
}
@@ -244,9 +244,11 @@ impl SegmentAggregationCollector for SegmentHistogramCollector {
fn collect_block(
&mut self,
docs: &[crate::DocId],
agg_with_accessor: &mut AggregationsWithAccessor,
agg_with_accessor: &AggregationsWithAccessor,
) -> crate::Result<()> {
let bucket_agg_accessor = &mut agg_with_accessor.buckets.values[self.accessor_idx];
let accessor = &agg_with_accessor.buckets.values[self.accessor_idx].accessor;
let sub_aggregation_accessor =
&agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation;
let mem_pre = self.get_memory_consumption();
@@ -255,26 +257,20 @@ impl SegmentAggregationCollector for SegmentHistogramCollector {
let offset = self.offset;
let get_bucket_pos = |val| (get_bucket_pos_f64(val, interval, offset) as i64);
bucket_agg_accessor
.column_block_accessor
.fetch_block(docs, &bucket_agg_accessor.accessor);
for doc in docs {
for val in accessor.values_for_doc(*doc) {
let val = self.f64_from_fastfield_u64(val);
for (doc, val) in bucket_agg_accessor.column_block_accessor.iter_docid_vals() {
let val = self.f64_from_fastfield_u64(val);
let bucket_pos = get_bucket_pos(val);
let bucket_pos = get_bucket_pos(val);
if bounds.contains(val) {
let bucket = self.buckets.entry(bucket_pos).or_insert_with(|| {
let key = get_bucket_key_from_pos(bucket_pos as f64, interval, offset);
SegmentHistogramBucketEntry { key, doc_count: 0 }
});
bucket.doc_count += 1;
if let Some(sub_aggregation_blueprint) = self.sub_aggregation_blueprint.as_mut() {
self.sub_aggregations
.entry(bucket_pos)
.or_insert_with(|| sub_aggregation_blueprint.clone())
.collect(doc, &mut bucket_agg_accessor.sub_aggregation)?;
if bounds.contains(val) {
self.increment_bucket(
bucket_pos,
*doc,
sub_aggregation_accessor,
interval,
offset,
)?;
}
}
}
@@ -287,9 +283,9 @@ impl SegmentAggregationCollector for SegmentHistogramCollector {
Ok(())
}
fn flush(&mut self, agg_with_accessor: &mut AggregationsWithAccessor) -> crate::Result<()> {
fn flush(&mut self, agg_with_accessor: &AggregationsWithAccessor) -> crate::Result<()> {
let sub_aggregation_accessor =
&mut agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation;
&agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation;
for sub_aggregation in self.sub_aggregations.values_mut() {
sub_aggregation.flush(sub_aggregation_accessor)?;
@@ -364,6 +360,29 @@ impl SegmentHistogramCollector {
})
}
#[inline]
fn increment_bucket(
&mut self,
bucket_pos: i64,
doc: DocId,
bucket_with_accessor: &AggregationsWithAccessor,
interval: f64,
offset: f64,
) -> crate::Result<()> {
let bucket = self.buckets.entry(bucket_pos).or_insert_with(|| {
let key = get_bucket_key_from_pos(bucket_pos as f64, interval, offset);
SegmentHistogramBucketEntry { key, doc_count: 0 }
});
bucket.doc_count += 1;
if let Some(sub_aggregation_blueprint) = self.sub_aggregation_blueprint.as_mut() {
self.sub_aggregations
.entry(bucket_pos)
.or_insert_with(|| sub_aggregation_blueprint.clone())
.collect(doc, bucket_with_accessor)?;
}
Ok(())
}
#[inline]
fn f64_from_fastfield_u64(&self, val: u64) -> f64 {
f64_from_fastfield_u64(val, &self.column_type)

View File

@@ -212,7 +212,7 @@ impl SegmentAggregationCollector for SegmentRangeCollector {
fn collect(
&mut self,
doc: crate::DocId,
agg_with_accessor: &mut AggregationsWithAccessor,
agg_with_accessor: &AggregationsWithAccessor,
) -> crate::Result<()> {
self.collect_block(&[doc], agg_with_accessor)
}
@@ -221,31 +221,30 @@ impl SegmentAggregationCollector for SegmentRangeCollector {
fn collect_block(
&mut self,
docs: &[crate::DocId],
agg_with_accessor: &mut AggregationsWithAccessor,
agg_with_accessor: &AggregationsWithAccessor,
) -> crate::Result<()> {
let bucket_agg_accessor = &mut agg_with_accessor.buckets.values[self.accessor_idx];
let accessor = &agg_with_accessor.buckets.values[self.accessor_idx].accessor;
let sub_aggregation_accessor =
&agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation;
for doc in docs {
for val in accessor.values_for_doc(*doc) {
let bucket_pos = self.get_bucket_pos(val);
bucket_agg_accessor
.column_block_accessor
.fetch_block(docs, &bucket_agg_accessor.accessor);
let bucket = &mut self.buckets[bucket_pos];
for (doc, val) in bucket_agg_accessor.column_block_accessor.iter_docid_vals() {
let bucket_pos = self.get_bucket_pos(val);
let bucket = &mut self.buckets[bucket_pos];
bucket.bucket.doc_count += 1;
if let Some(sub_aggregation) = &mut bucket.bucket.sub_aggregation {
sub_aggregation.collect(doc, &mut bucket_agg_accessor.sub_aggregation)?;
bucket.bucket.doc_count += 1;
if let Some(sub_aggregation) = &mut bucket.bucket.sub_aggregation {
sub_aggregation.collect(*doc, sub_aggregation_accessor)?;
}
}
}
Ok(())
}
fn flush(&mut self, agg_with_accessor: &mut AggregationsWithAccessor) -> crate::Result<()> {
fn flush(&mut self, agg_with_accessor: &AggregationsWithAccessor) -> crate::Result<()> {
let sub_aggregation_accessor =
&mut agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation;
&agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation;
for bucket in self.buckets.iter_mut() {
if let Some(sub_agg) = bucket.bucket.sub_aggregation.as_mut() {

View File

@@ -1,11 +1,10 @@
use std::fmt::Debug;
use columnar::ColumnType;
use columnar::{Cardinality, ColumnType};
use rustc_hash::FxHashMap;
use serde::{Deserialize, Serialize};
use super::{CustomOrder, Order, OrderTarget};
use crate::aggregation::agg_limits::MemoryConsumption;
use crate::aggregation::agg_req_with_accessor::{
AggregationsWithAccessor, BucketAggregationWithAccessor,
};
@@ -211,16 +210,7 @@ struct TermBuckets {
}
impl TermBuckets {
fn get_memory_consumption(&self) -> usize {
let sub_aggs_mem = self.sub_aggs.memory_consumption();
let buckets_mem = self.entries.memory_consumption();
sub_aggs_mem + buckets_mem
}
fn force_flush(
&mut self,
agg_with_accessor: &mut AggregationsWithAccessor,
) -> crate::Result<()> {
fn force_flush(&mut self, agg_with_accessor: &AggregationsWithAccessor) -> crate::Result<()> {
for sub_aggregations in &mut self.sub_aggs.values_mut() {
sub_aggregations.as_mut().flush(agg_with_accessor)?;
}
@@ -238,6 +228,7 @@ pub struct SegmentTermCollector {
blueprint: Option<Box<dyn SegmentAggregationCollector>>,
field_type: ColumnType,
accessor_idx: usize,
val_cache: Vec<u64>,
}
pub(crate) fn get_agg_name_and_property(name: &str) -> (&str, &str) {
@@ -266,7 +257,7 @@ impl SegmentAggregationCollector for SegmentTermCollector {
fn collect(
&mut self,
doc: crate::DocId,
agg_with_accessor: &mut AggregationsWithAccessor,
agg_with_accessor: &AggregationsWithAccessor,
) -> crate::Result<()> {
self.collect_block(&[doc], agg_with_accessor)
}
@@ -275,42 +266,53 @@ impl SegmentAggregationCollector for SegmentTermCollector {
fn collect_block(
&mut self,
docs: &[crate::DocId],
agg_with_accessor: &mut AggregationsWithAccessor,
agg_with_accessor: &AggregationsWithAccessor,
) -> crate::Result<()> {
let bucket_agg_accessor = &mut agg_with_accessor.buckets.values[self.accessor_idx];
let accessor = &agg_with_accessor.buckets.values[self.accessor_idx].accessor;
let sub_aggregation_accessor =
&agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation;
let mem_pre = self.get_memory_consumption();
bucket_agg_accessor
.column_block_accessor
.fetch_block(docs, &bucket_agg_accessor.accessor);
for term_id in bucket_agg_accessor.column_block_accessor.iter_vals() {
let entry = self.term_buckets.entries.entry(term_id).or_default();
*entry += 1;
}
// has subagg
if let Some(blueprint) = self.blueprint.as_ref() {
for (doc, term_id) in bucket_agg_accessor.column_block_accessor.iter_docid_vals() {
let sub_aggregations = self
.term_buckets
.sub_aggs
.entry(term_id)
.or_insert_with(|| blueprint.clone());
sub_aggregations.collect(doc, &mut bucket_agg_accessor.sub_aggregation)?;
if accessor.get_cardinality() == Cardinality::Full {
self.val_cache.resize(docs.len(), 0);
accessor.values.get_vals(docs, &mut self.val_cache);
for term_id in self.val_cache.iter().cloned() {
let entry = self.term_buckets.entries.entry(term_id).or_default();
*entry += 1;
}
// has subagg
if let Some(blueprint) = self.blueprint.as_ref() {
for (doc, term_id) in docs.iter().zip(self.val_cache.iter().cloned()) {
let sub_aggregations = self
.term_buckets
.sub_aggs
.entry(term_id)
.or_insert_with(|| blueprint.clone());
sub_aggregations.collect(*doc, sub_aggregation_accessor)?;
}
}
} else {
for doc in docs {
for term_id in accessor.values_for_doc(*doc) {
let entry = self.term_buckets.entries.entry(term_id).or_default();
*entry += 1;
// TODO: check if seperate loop is faster (may depend on the codec)
if let Some(blueprint) = self.blueprint.as_ref() {
let sub_aggregations = self
.term_buckets
.sub_aggs
.entry(term_id)
.or_insert_with(|| blueprint.clone());
sub_aggregations.collect(*doc, sub_aggregation_accessor)?;
}
}
}
}
let mem_delta = self.get_memory_consumption() - mem_pre;
let limits = &agg_with_accessor.buckets.values[self.accessor_idx].limits;
limits.add_memory_consumed(mem_delta as u64);
limits.validate_memory_consumption()?;
Ok(())
}
fn flush(&mut self, agg_with_accessor: &mut AggregationsWithAccessor) -> crate::Result<()> {
fn flush(&mut self, agg_with_accessor: &AggregationsWithAccessor) -> crate::Result<()> {
let sub_aggregation_accessor =
&mut agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation;
&agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation;
self.term_buckets.force_flush(sub_aggregation_accessor)?;
Ok(())
@@ -318,12 +320,6 @@ impl SegmentAggregationCollector for SegmentTermCollector {
}
impl SegmentTermCollector {
fn get_memory_consumption(&self) -> usize {
let self_mem = std::mem::size_of::<Self>();
let term_buckets_mem = self.term_buckets.get_memory_consumption();
self_mem + term_buckets_mem
}
pub(crate) fn from_req_and_validate(
req: &TermsAggregation,
sub_aggregations: &AggregationsWithAccessor,
@@ -360,6 +356,7 @@ impl SegmentTermCollector {
blueprint,
field_type,
accessor_idx,
val_cache: Default::default(),
})
}
@@ -528,10 +525,9 @@ mod tests {
};
use crate::aggregation::metric::{AverageAggregation, StatsAggregation};
use crate::aggregation::tests::{
exec_request, exec_request_with_query, exec_request_with_query_and_memory_limit,
get_test_index_from_terms, get_test_index_from_values_and_terms,
exec_request, exec_request_with_query, get_test_index_from_terms,
get_test_index_from_values_and_terms,
};
use crate::aggregation::AggregationLimits;
#[test]
fn terms_aggregation_test_single_segment() -> crate::Result<()> {
@@ -1336,40 +1332,34 @@ mod tests {
Ok(())
}
#[test]
fn terms_aggregation_term_bucket_limit() -> crate::Result<()> {
let terms: Vec<String> = (0..20_000).map(|el| el.to_string()).collect();
let terms_per_segment = vec![terms.iter().map(|el| el.as_str()).collect()];
// TODO reenable with memory limit
//#[test]
// fn terms_aggregation_term_bucket_limit() -> crate::Result<()> {
// let terms: Vec<String> = (0..100_000).map(|el| el.to_string()).collect();
// let terms_per_segment = vec![terms.iter().map(|el| el.as_str()).collect()];
let index = get_test_index_from_terms(true, &terms_per_segment)?;
// let index = get_test_index_from_terms(true, &terms_per_segment)?;
let agg_req: Aggregations = vec![(
"my_texts".to_string(),
Aggregation::Bucket(Box::new(BucketAggregation {
bucket_agg: BucketAggregationType::Terms(TermsAggregation {
field: "string_id".to_string(),
min_doc_count: Some(0),
..Default::default()
}),
sub_aggregation: Default::default(),
})),
)]
.into_iter()
.collect();
// let agg_req: Aggregations = vec![(
//"my_texts".to_string(),
// Aggregation::Bucket(BucketAggregation {
// bucket_agg: BucketAggregationType::Terms(TermsAggregation {
// field: "string_id".to_string(),
// min_doc_count: Some(0),
//..Default::default()
//}),
// sub_aggregation: Default::default(),
//}),
//)]
//.into_iter()
//.collect();
let res = exec_request_with_query_and_memory_limit(
agg_req,
&index,
None,
AggregationLimits::new(Some(50_000), None),
)
.unwrap_err();
assert!(res
.to_string()
.contains("Aborting aggregation because memory limit was exceeded. Limit: 50.00 KB"));
// let res = exec_request_with_query(agg_req, &index, None);
Ok(())
}
// assert!(res.is_err());
// Ok(())
//}
#[test]
fn terms_aggregation_different_tokenizer_on_ff_test() -> crate::Result<()> {

View File

@@ -46,7 +46,7 @@ impl SegmentAggregationCollector for BufAggregationCollector {
fn collect(
&mut self,
doc: crate::DocId,
agg_with_accessor: &mut AggregationsWithAccessor,
agg_with_accessor: &AggregationsWithAccessor,
) -> crate::Result<()> {
self.staged_docs[self.num_staged_docs] = doc;
self.num_staged_docs += 1;
@@ -62,7 +62,7 @@ impl SegmentAggregationCollector for BufAggregationCollector {
fn collect_block(
&mut self,
docs: &[crate::DocId],
agg_with_accessor: &mut AggregationsWithAccessor,
agg_with_accessor: &AggregationsWithAccessor,
) -> crate::Result<()> {
self.collector.collect_block(docs, agg_with_accessor)?;
@@ -70,7 +70,7 @@ impl SegmentAggregationCollector for BufAggregationCollector {
}
#[inline]
fn flush(&mut self, agg_with_accessor: &mut AggregationsWithAccessor) -> crate::Result<()> {
fn flush(&mut self, agg_with_accessor: &AggregationsWithAccessor) -> crate::Result<()> {
self.collector
.collect_block(&self.staged_docs[..self.num_staged_docs], agg_with_accessor)?;
self.num_staged_docs = 0;

View File

@@ -156,10 +156,7 @@ impl SegmentCollector for AggregationSegmentCollector {
if self.error.is_some() {
return;
}
if let Err(err) = self
.agg_collector
.collect(doc, &mut self.aggs_with_accessor)
{
if let Err(err) = self.agg_collector.collect(doc, &self.aggs_with_accessor) {
self.error = Some(err);
}
}
@@ -173,7 +170,7 @@ impl SegmentCollector for AggregationSegmentCollector {
}
if let Err(err) = self
.agg_collector
.collect_block(docs, &mut self.aggs_with_accessor)
.collect_block(docs, &self.aggs_with_accessor)
{
self.error = Some(err);
}
@@ -183,7 +180,7 @@ impl SegmentCollector for AggregationSegmentCollector {
if let Some(err) = self.error {
return Err(err);
}
self.agg_collector.flush(&mut self.aggs_with_accessor)?;
self.agg_collector.flush(&self.aggs_with_accessor)?;
Box::new(self.agg_collector).into_intermediate_aggregations_result(&self.aggs_with_accessor)
}
}

View File

@@ -45,6 +45,7 @@ impl IntermediateAggregationResults {
req: Aggregations,
limits: &AggregationLimits,
) -> crate::Result<AggregationResults> {
// TODO count and validate buckets
let res = self.into_final_bucket_result_internal(&(req.into()), limits)?;
let bucket_count = res.get_bucket_count() as u32;
if bucket_count > limits.get_bucket_limit() {

View File

@@ -1,10 +1,8 @@
use columnar::ColumnType;
use columnar::{Cardinality, Column, ColumnType};
use serde::{Deserialize, Serialize};
use super::*;
use crate::aggregation::agg_req_with_accessor::{
AggregationsWithAccessor, MetricAggregationWithAccessor,
};
use crate::aggregation::agg_req_with_accessor::AggregationsWithAccessor;
use crate::aggregation::intermediate_agg_result::{
IntermediateAggregationResults, IntermediateMetricResult,
};
@@ -176,18 +174,21 @@ impl SegmentStatsCollector {
}
}
#[inline]
pub(crate) fn collect_block_with_field(
&mut self,
docs: &[DocId],
agg_accessor: &mut MetricAggregationWithAccessor,
) {
agg_accessor
.column_block_accessor
.fetch_block(docs, &agg_accessor.accessor);
for val in agg_accessor.column_block_accessor.iter_vals() {
let val1 = f64_from_fastfield_u64(val, &self.field_type);
self.stats.collect(val1);
pub(crate) fn collect_block_with_field(&mut self, docs: &[DocId], field: &Column<u64>) {
if field.get_cardinality() == Cardinality::Full {
self.val_cache.resize(docs.len(), 0);
field.values.get_vals(docs, &mut self.val_cache);
for val in self.val_cache.iter() {
let val1 = f64_from_fastfield_u64(*val, &self.field_type);
self.stats.collect(val1);
}
} else {
for doc in docs {
for val in field.values_for_doc(*doc) {
let val1 = f64_from_fastfield_u64(val, &self.field_type);
self.stats.collect(val1);
}
}
}
}
}
@@ -234,7 +235,7 @@ impl SegmentAggregationCollector for SegmentStatsCollector {
fn collect(
&mut self,
doc: crate::DocId,
agg_with_accessor: &mut AggregationsWithAccessor,
agg_with_accessor: &AggregationsWithAccessor,
) -> crate::Result<()> {
let field = &agg_with_accessor.metrics.values[self.accessor_idx].accessor;
@@ -250,9 +251,9 @@ impl SegmentAggregationCollector for SegmentStatsCollector {
fn collect_block(
&mut self,
docs: &[crate::DocId],
agg_with_accessor: &mut AggregationsWithAccessor,
agg_with_accessor: &AggregationsWithAccessor,
) -> crate::Result<()> {
let field = &mut agg_with_accessor.metrics.values[self.accessor_idx];
let field = &agg_with_accessor.metrics.values[self.accessor_idx].accessor;
self.collect_block_with_field(docs, field);
Ok(())
}

View File

@@ -28,18 +28,18 @@ pub(crate) trait SegmentAggregationCollector: CollectorClone + Debug {
fn collect(
&mut self,
doc: crate::DocId,
agg_with_accessor: &mut AggregationsWithAccessor,
agg_with_accessor: &AggregationsWithAccessor,
) -> crate::Result<()>;
fn collect_block(
&mut self,
docs: &[crate::DocId],
agg_with_accessor: &mut AggregationsWithAccessor,
agg_with_accessor: &AggregationsWithAccessor,
) -> crate::Result<()>;
/// Finalize method. Some Aggregator collect blocks of docs before calling `collect_block`.
/// This method ensures those staged docs will be collected.
fn flush(&mut self, _agg_with_accessor: &mut AggregationsWithAccessor) -> crate::Result<()> {
fn flush(&mut self, _agg_with_accessor: &AggregationsWithAccessor) -> crate::Result<()> {
Ok(())
}
}
@@ -206,7 +206,7 @@ impl SegmentAggregationCollector for GenericSegmentAggregationResultsCollector {
fn collect(
&mut self,
doc: crate::DocId,
agg_with_accessor: &mut AggregationsWithAccessor,
agg_with_accessor: &AggregationsWithAccessor,
) -> crate::Result<()> {
self.collect_block(&[doc], agg_with_accessor)?;
@@ -216,7 +216,7 @@ impl SegmentAggregationCollector for GenericSegmentAggregationResultsCollector {
fn collect_block(
&mut self,
docs: &[crate::DocId],
agg_with_accessor: &mut AggregationsWithAccessor,
agg_with_accessor: &AggregationsWithAccessor,
) -> crate::Result<()> {
if let Some(metrics) = self.metrics.as_mut() {
for collector in metrics {
@@ -233,7 +233,7 @@ impl SegmentAggregationCollector for GenericSegmentAggregationResultsCollector {
Ok(())
}
fn flush(&mut self, agg_with_accessor: &mut AggregationsWithAccessor) -> crate::Result<()> {
fn flush(&mut self, agg_with_accessor: &AggregationsWithAccessor) -> crate::Result<()> {
if let Some(metrics) = &mut self.metrics {
for collector in metrics {
collector.flush(agg_with_accessor)?;

View File

@@ -7,7 +7,7 @@ use std::sync::{Arc, RwLock, Weak};
use std::{fmt, result};
use common::StableDeref;
use fs4::FileExt;
use fs2::FileExt;
use memmap2::Mmap;
use serde::{Deserialize, Serialize};
use tempfile::TempDir;

View File

@@ -130,7 +130,7 @@ mod tests {
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 95);
assert_eq!(file.len(), 161);
let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap();
let column = fast_field_readers
.u64("field")
@@ -180,7 +180,7 @@ mod tests {
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 123);
assert_eq!(file.len(), 189);
let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap();
let col = fast_field_readers
.u64("field")
@@ -213,7 +213,7 @@ mod tests {
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 96);
assert_eq!(file.len(), 162);
let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap();
let fast_field_reader = fast_field_readers
.u64("field")
@@ -245,7 +245,7 @@ mod tests {
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 4491);
assert_eq!(file.len(), 4557);
{
let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap();
let col = fast_field_readers
@@ -278,7 +278,7 @@ mod tests {
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 267);
assert_eq!(file.len(), 333_usize);
{
let fast_field_readers = FastFieldReaders::open(file, schema).unwrap();
@@ -772,7 +772,7 @@ mod tests {
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 104);
assert_eq!(file.len(), 175);
let fast_field_readers = FastFieldReaders::open(file, schema).unwrap();
let bool_col = fast_field_readers.bool("field_bool").unwrap();
assert_eq!(bool_col.first(0), Some(true));
@@ -804,7 +804,7 @@ mod tests {
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 116);
assert_eq!(file.len(), 187);
let readers = FastFieldReaders::open(file, schema).unwrap();
let bool_col = readers.bool("field_bool").unwrap();
for i in 0..25 {
@@ -829,7 +829,7 @@ mod tests {
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 106);
assert_eq!(file.len(), 177);
let fastfield_readers = FastFieldReaders::open(file, schema).unwrap();
let col = fastfield_readers.bool("field_bool").unwrap();
assert_eq!(col.first(0), None);

View File

@@ -6,6 +6,8 @@ license = "MIT"
[dependencies]
common = {path="../common", package="tantivy-common"}
ciborium = "0.2"
serde = "1"
tantivy-fst = "0.4"
[dev-dependencies]

View File

@@ -26,95 +26,3 @@ possible.
- it allows incremental encoding of the keys
- the front compression is leveraged to optimize
the intersection with an automaton
# On disk format
Overview of the SSTable format. Unless noted otherwise, numbers are little-endian.
### SSTable
```
+-------+-------+-----+--------+
| Block | Block | ... | Footer |
+-------+-------+-----+--------+
|----( # of blocks)---|
```
- Block(`SSTBlock`): list of independent block, terminated by a single empty block.
- Footer(`SSTFooter`)
### SSTBlock
```
+----------+--------+-------+-------+-----+
| BlockLen | Values | Delta | Delta | ... |
+----------+--------+-------+-------+-----+
|----( # of deltas)---|
```
- BlockLen(u32): length of the block
- Values: an application defined format storing a sequence of value, capable of determining it own length
- Delta
### Delta
```
+---------+--------+
| KeepAdd | Suffix |
+---------+--------+
```
- KeepAdd
- Suffix: KeepAdd.add bytes of key suffix
### KeepAdd
KeepAdd can be represented in two different representation, a very compact 1byte one which is enough for most usage, and a longer variable-len one when required
When keep < 16 and add < 16
```
+-----+------+
| Add | Keep |
+-----+------+
```
- Add(u4): number of bytes to push
- Keep(u4): number of bytes to pop
Otherwise:
```
+------+------+-----+
| 0x01 | Keep | Add |
+------+------+-----+
```
- Add(VInt): number of bytes to push
- Keep(VInt): number of bytes to pop
Note: there is no ambiguity between both representation as Add is always guarantee to be non-zero, except for the very first key of an SSTable, where Keep is guaranteed to be zero.
### SSTFooter
```
+-------+-------+-----+-------------+---------+---------+------+
| Block | Block | ... | IndexOffset | NumTerm | Version | Type |
+-------+-------+-----+-------------+---------+---------+------+
|----( # of blocks)---|
```
- Block(SSTBlock): uses IndexValue for its Values format
- IndexOffset(u64): Offset to the start of the SSTFooter
- NumTerm(u64): number of terms in the sstable
- Version(u32): Currently defined to 0x00\_00\_00\_01
- Type(u32): Defined to 0x00\_00\_00\_02
### IndexValue
```
+------------+----------+-------+-------+-----+
| EntryCount | StartPos | Entry | Entry | ... |
+------------+----------+-------+-------+-----+
|---( # of entries)---|
```
- EntryCount(VInt): number of entries
- StartPos(VInt): the start pos of the first (data) block referenced by this (index) block
- Entry (IndexEntry)
### Entry
```
+----------+--------------+
| BlockLen | FirstOrdinal |
+----------+--------------+
```
- BlockLen(VInt): length of the block
- FirstOrdinal(VInt): ordinal of the first element in the given block

View File

@@ -18,7 +18,6 @@ where W: io::Write
value_writer: TValueWriter,
// Only here to avoid allocations.
stateless_buffer: Vec<u8>,
block_len: usize,
}
impl<W, TValueWriter> DeltaWriter<W, TValueWriter>
@@ -32,14 +31,15 @@ where
write: CountingWriter::wrap(BufWriter::new(wrt)),
value_writer: TValueWriter::default(),
stateless_buffer: Vec::new(),
block_len: BLOCK_LEN,
}
}
}
pub fn set_block_len(&mut self, block_len: usize) {
self.block_len = block_len
}
impl<W, TValueWriter> DeltaWriter<W, TValueWriter>
where
W: io::Write,
TValueWriter: value::ValueWriter,
{
pub fn flush_block(&mut self) -> io::Result<Option<Range<usize>>> {
if self.block.is_empty() {
return Ok(None);
@@ -82,7 +82,7 @@ where
}
pub fn flush_block_if_required(&mut self) -> io::Result<Option<Range<usize>>> {
if self.block.len() > self.block_len {
if self.block.len() > BLOCK_LEN {
return self.flush_block();
}
Ok(None)

View File

@@ -5,7 +5,7 @@ use std::ops::{Bound, RangeBounds};
use std::sync::Arc;
use common::file_slice::FileSlice;
use common::{BinarySerializable, DictionaryFooter, OwnedBytes};
use common::{BinarySerializable, OwnedBytes};
use tantivy_fst::automaton::AlwaysMatch;
use tantivy_fst::Automaton;
@@ -110,7 +110,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
/// only block for up to `limit` matching terms.
///
/// It works by identifying
/// - `first_block`: the block containing the start boundary key
/// - `first_block`: the block containing the start boudary key
/// - `last_block`: the block containing the end boundary key.
///
/// And then returning the range that spans over all blocks between.
@@ -178,15 +178,10 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
/// Opens a `TermDictionary`.
pub fn open(term_dictionary_file: FileSlice) -> io::Result<Self> {
let (main_slice, footer_len_slice) = term_dictionary_file.split_from_end(24);
let (main_slice, footer_len_slice) = term_dictionary_file.split_from_end(16);
let mut footer_len_bytes: OwnedBytes = footer_len_slice.read_bytes()?;
let index_offset = u64::deserialize(&mut footer_len_bytes)?;
let num_terms = u64::deserialize(&mut footer_len_bytes)?;
let footer = DictionaryFooter::deserialize(&mut footer_len_bytes)?;
crate::FOOTER.verify_equal(&footer)?;
let (sstable_slice, index_slice) = main_slice.split(index_offset as usize);
let sstable_index_bytes = index_slice.read_bytes()?;
let sstable_index = SSTableIndex::load(sstable_index_bytes.as_slice())
@@ -236,7 +231,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
let suffix = sstable_delta_reader.suffix();
match prefix_len.cmp(&ok_bytes) {
Ordering::Less => return Ok(None), // popped bytes already matched => too far
Ordering::Less => return Ok(None), // poped bytes already matched => too far
Ordering::Equal => (),
Ordering::Greater => {
// the ok prefix is less than current entry prefix => continue to next elem

View File

@@ -17,8 +17,6 @@ pub use dictionary::Dictionary;
pub use streamer::{Streamer, StreamerBuilder};
mod block_reader;
use common::{BinarySerializable, DictionaryFooter, DictionaryKind};
pub use self::block_reader::BlockReader;
pub use self::delta::{DeltaReader, DeltaWriter};
pub use self::merge::VoidMerge;
@@ -28,10 +26,6 @@ use crate::value::{RangeValueReader, RangeValueWriter};
pub type TermOrdinal = u64;
const DEFAULT_KEY_CAPACITY: usize = 50;
const FOOTER: DictionaryFooter = DictionaryFooter {
kind: DictionaryKind::SSTable,
version: 1,
};
/// Given two byte string returns the length of
/// the longest common prefix.
@@ -207,14 +201,6 @@ where
}
}
/// Set the target block length.
///
/// The delta part of a block will generally be slightly larger than the requested `block_len`,
/// however this does not account for the length of the Value part of the table.
pub fn set_block_len(&mut self, block_len: usize) {
self.delta_writer.set_block_len(block_len)
}
/// Returns the last inserted key.
/// If no key has been inserted yet, or the block was just
/// flushed, this function returns "".
@@ -302,7 +288,6 @@ where
self.first_ordinal_of_the_block = self.num_terms;
}
let mut wrt = self.delta_writer.finish();
// add a final empty block as an end marker
wrt.write_all(&0u32.to_le_bytes())?;
let offset = wrt.written_bytes();
@@ -310,9 +295,6 @@ where
self.index_builder.serialize(&mut wrt)?;
wrt.write_all(&offset.to_le_bytes())?;
wrt.write_all(&self.num_terms.to_le_bytes())?;
FOOTER.serialize(&mut wrt)?;
let wrt = wrt.finish();
Ok(wrt.into_inner()?)
}
@@ -389,26 +371,19 @@ mod test {
assert_eq!(
&buffer,
&[
// block
7u8, 0u8, 0u8, 0u8, // block len
16u8, 17u8, // keep 0 push 1 | 17
33u8, 18u8, 19u8, // keep 1 push 2 | 18 19
17u8, 20u8, // keep 1 push 1 | 20
// end of block
0u8, 0u8, 0u8, 0u8, // no more blocks
// block len
7u8, 0u8, 0u8, 0u8, // keep 0 push 1 | ""
16u8, 17u8, // keep 1 push 2 | 18 19
33u8, 18u8, 19u8, // keep 1 push 1 | 20
17u8, 20u8, 0u8, 0u8, 0u8, 0u8, // no more blocks
// index
7u8, 0u8, 0u8, 0u8, // block len
1, // num blocks
0, // offset
11, // len of 1st block
0, // first ord of 1st block
32, 17, 20, // keep 0 push 2 | 17 20
// end of block
0, 0, 0, 0, // no more blocks
15, 0, 0, 0, 0, 0, 0, 0, // index start offset
3, 0, 0, 0, 0, 0, 0, 0, // num_term
1, 0, 0, 0, // version
2, 0, 0, 0, // dictionary kind. sstable = 2
161, 102, 98, 108, 111, 99, 107, 115, 129, 162, 115, 108, 97, 115, 116, 95, 107,
101, 121, 95, 111, 114, 95, 103, 114, 101, 97, 116, 101, 114, 130, 17, 20, 106, 98,
108, 111, 99, 107, 95, 97, 100, 100, 114, 162, 106, 98, 121, 116, 101, 95, 114, 97,
110, 103, 101, 162, 101, 115, 116, 97, 114, 116, 0, 99, 101, 110, 100, 11, 109,
102, 105, 114, 115, 116, 95, 111, 114, 100, 105, 110, 97, 108, 0, 15, 0, 0, 0, 0,
0, 0, 0, // offset for the index
3u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8 // num terms
]
);
let mut sstable_reader = VoidSSTable::reader(&buffer[..]);
@@ -526,8 +501,8 @@ mod test {
fn test_proptest_sstable_ranges(words in prop::collection::btree_set("[a-c]{0,6}", 1..100),
(lower_bound, upper_bound) in bounds_strategy(),
) {
// TODO tweak block size.
let mut builder = Dictionary::<VoidSSTable>::builder(Vec::new()).unwrap();
builder.set_block_len(16);
for word in &words {
builder.insert(word.as_bytes(), &()).unwrap();
}

View File

@@ -1,9 +1,11 @@
use std::io::{self, Write};
use std::io;
use std::ops::Range;
use crate::{common_prefix_len, SSTable, SSTableDataCorruption, TermOrdinal};
use serde::{Deserialize, Serialize};
#[derive(Default, Debug, Clone)]
use crate::{common_prefix_len, SSTableDataCorruption, TermOrdinal};
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
pub struct SSTableIndex {
blocks: Vec<BlockMeta>,
}
@@ -11,17 +13,7 @@ pub struct SSTableIndex {
impl SSTableIndex {
/// Load an index from its binary representation
pub fn load(data: &[u8]) -> Result<SSTableIndex, SSTableDataCorruption> {
let mut reader = IndexSSTable::reader(data);
let mut blocks = Vec::new();
while reader.advance().map_err(|_| SSTableDataCorruption)? {
blocks.push(BlockMeta {
last_key_or_greater: reader.key().to_vec(),
block_addr: reader.value().clone(),
});
}
Ok(SSTableIndex { blocks })
ciborium::de::from_reader(data).map_err(|_| SSTableDataCorruption)
}
/// Get the [`BlockAddr`] of the requested block.
@@ -31,7 +23,7 @@ impl SSTableIndex {
.map(|block_meta| block_meta.block_addr.clone())
}
/// Get the block id of the block that would contain `key`.
/// Get the block id of the block that woudl contain `key`.
///
/// Returns None if `key` is lexicographically after the last key recorded.
pub(crate) fn locate_with_key(&self, key: &[u8]) -> Option<usize> {
@@ -77,13 +69,13 @@ impl SSTableIndex {
}
}
#[derive(Clone, Eq, PartialEq, Debug)]
#[derive(Clone, Eq, PartialEq, Debug, Serialize, Deserialize)]
pub struct BlockAddr {
pub byte_range: Range<usize>,
pub first_ordinal: u64,
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct BlockMeta {
/// Any byte string that is lexicographically greater or equal to
/// the last key in the block,
@@ -138,45 +130,11 @@ impl SSTableIndexBuilder {
}
pub fn serialize<W: std::io::Write>(&self, wrt: W) -> io::Result<()> {
// we can't use a plain writer as it would generate an index
let mut sstable_writer = IndexSSTable::delta_writer(wrt);
// in tests, set a smaller block size to stress-test
#[cfg(test)]
sstable_writer.set_block_len(16);
let mut previous_key = Vec::with_capacity(crate::DEFAULT_KEY_CAPACITY);
for block in self.index.blocks.iter() {
let keep_len = common_prefix_len(&previous_key, &block.last_key_or_greater);
sstable_writer.write_suffix(keep_len, &block.last_key_or_greater[keep_len..]);
sstable_writer.write_value(&block.block_addr);
sstable_writer.flush_block_if_required()?;
previous_key.clear();
previous_key.extend_from_slice(&block.last_key_or_greater);
}
sstable_writer.flush_block()?;
sstable_writer.finish().write_all(&0u32.to_le_bytes())?;
Ok(())
ciborium::ser::into_writer(&self.index, wrt)
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
}
}
/// SSTable representing an index
///
/// `last_key_or_greater` is used as the key, the value contains the
/// length and first ordinal of each block. The start offset is implicitly
/// obtained from lengths.
struct IndexSSTable;
impl SSTable for IndexSSTable {
type Value = BlockAddr;
type ValueReader = crate::value::index::IndexValueReader;
type ValueWriter = crate::value::index::IndexValueWriter;
}
#[cfg(test)]
mod tests {
use super::{BlockAddr, SSTableIndex, SSTableIndexBuilder};

View File

@@ -1,132 +0,0 @@
use std::io;
use crate::value::{deserialize_vint_u64, ValueReader, ValueWriter};
use crate::{vint, BlockAddr};
#[derive(Default)]
pub(crate) struct IndexValueReader {
vals: Vec<BlockAddr>,
}
impl ValueReader for IndexValueReader {
type Value = BlockAddr;
#[inline(always)]
fn value(&self, idx: usize) -> &Self::Value {
&self.vals[idx]
}
fn load(&mut self, mut data: &[u8]) -> io::Result<usize> {
let original_num_bytes = data.len();
let num_vals = deserialize_vint_u64(&mut data) as usize;
self.vals.clear();
let mut first_ordinal = 0u64;
let mut prev_start = deserialize_vint_u64(&mut data) as usize;
for _ in 0..num_vals {
let len = deserialize_vint_u64(&mut data);
let delta_ordinal = deserialize_vint_u64(&mut data);
first_ordinal += delta_ordinal;
let end = prev_start + len as usize;
self.vals.push(BlockAddr {
byte_range: prev_start..end,
first_ordinal,
});
prev_start = end;
}
Ok(original_num_bytes - data.len())
}
}
#[derive(Default)]
pub(crate) struct IndexValueWriter {
vals: Vec<BlockAddr>,
}
impl ValueWriter for IndexValueWriter {
type Value = BlockAddr;
fn write(&mut self, val: &Self::Value) {
self.vals.push(val.clone());
}
fn serialize_block(&self, output: &mut Vec<u8>) {
let mut prev_ord = 0u64;
vint::serialize_into_vec(self.vals.len() as u64, output);
let start_pos = if let Some(block_addr) = self.vals.first() {
block_addr.byte_range.start as u64
} else {
0
};
vint::serialize_into_vec(start_pos, output);
// TODO use array_windows when it gets stabilized
for elem in self.vals.windows(2) {
let [current, next] = elem else {
unreachable!("windows should always return exactly 2 elements");
};
let len = next.byte_range.start - current.byte_range.start;
vint::serialize_into_vec(len as u64, output);
let delta = current.first_ordinal - prev_ord;
vint::serialize_into_vec(delta, output);
prev_ord = current.first_ordinal;
}
if let Some(last) = self.vals.last() {
let len = last.byte_range.end - last.byte_range.start;
vint::serialize_into_vec(len as u64, output);
let delta = last.first_ordinal - prev_ord;
vint::serialize_into_vec(delta, output);
}
}
fn clear(&mut self) {
self.vals.clear();
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_index_reader_writer() {
crate::value::tests::test_value_reader_writer::<_, IndexValueReader, IndexValueWriter>(&[]);
crate::value::tests::test_value_reader_writer::<_, IndexValueReader, IndexValueWriter>(&[
BlockAddr {
byte_range: 0..10,
first_ordinal: 0,
},
]);
crate::value::tests::test_value_reader_writer::<_, IndexValueReader, IndexValueWriter>(&[
BlockAddr {
byte_range: 0..10,
first_ordinal: 0,
},
BlockAddr {
byte_range: 10..20,
first_ordinal: 5,
},
]);
crate::value::tests::test_value_reader_writer::<_, IndexValueReader, IndexValueWriter>(&[
BlockAddr {
byte_range: 0..10,
first_ordinal: 0,
},
BlockAddr {
byte_range: 10..20,
first_ordinal: 5,
},
BlockAddr {
byte_range: 20..30,
first_ordinal: 10,
},
]);
crate::value::tests::test_value_reader_writer::<_, IndexValueReader, IndexValueWriter>(&[
BlockAddr {
byte_range: 5..10,
first_ordinal: 2,
},
]);
}
}

View File

@@ -1,4 +1,3 @@
pub(crate) mod index;
mod range;
mod u64_monotonic;
mod void;