Compare commits

...

7 Commits

Author SHA1 Message Date
Pascal Seitz
5d53b11a2c change debug_assert to assert in term length
change `debug_assert` to `assert` since the bug with the truncated Terms only occurs in prod
2024-04-04 22:54:28 +08:00
PSeitz
4e79e11007 add collect_block to BoxableSegmentCollector (#2331) 2024-03-21 09:10:25 +01:00
PSeitz
67ebba3c3c expose collect_block buffer size (#2326)
* expose buffer of collect_block

* flip shard_size segment_size
2024-03-15 08:02:08 +01:00
PSeitz
7ce950f141 add method to fetch block of first vals in columnar (#2330)
* add method to fetch block of first vals in columnar

add method to fetch block of first vals in columnar (this is way faster
than single calls for full columns)
add benchmark
fix import warnings

```
test bench_get_block_first_on_full_column                  ... bench:          56 ns/iter (+/- 26)
test bench_get_block_first_on_full_column_single_calls     ... bench:         311 ns/iter (+/- 6)
test bench_get_block_first_on_multi_column                 ... bench:         378 ns/iter (+/- 15)
test bench_get_block_first_on_multi_column_single_calls    ... bench:         546 ns/iter (+/- 13)
test bench_get_block_first_on_optional_column              ... bench:         291 ns/iter (+/- 6)
test bench_get_block_first_on_optional_column_single_calls ... bench:         362 ns/iter (+/- 8)
```

* use remainder
2024-03-15 08:01:47 +01:00
dependabot[bot]
0cffe5fb09 Update base64 requirement from 0.21.0 to 0.22.0 (#2324)
Updates the requirements on [base64](https://github.com/marshallpierce/rust-base64) to permit the latest version.
- [Changelog](https://github.com/marshallpierce/rust-base64/blob/master/RELEASE-NOTES.md)
- [Commits](https://github.com/marshallpierce/rust-base64/compare/v0.21.0...v0.22.0)

---
updated-dependencies:
- dependency-name: base64
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-03-15 15:50:34 +09:00
PSeitz
b0e65560a1 handle ip adresses in term aggregation (#2319)
* handle ip adresses in term aggregation

Stores IpAdresses during the segment term aggregation via u64 representation
and convert to u128(IpV6Adress) via downcast when converting to intermediate results.

Enable Downcasting on `ColumnValues`
Expose u64 variant for u128 encoded data via `open_u64_lenient` method.
Remove lifetime in VecColumn, to avoid 'static lifetime requirement coming
from downcast trait.

* rename method
2024-03-14 09:41:18 +01:00
PSeitz
ec37295b2f add fast path for full columns in fetch_block (#2328)
Spotted in `range_date_histogram` query in quickwit benchmark:
5% of time copying docs around, which is not needed in the full index case

remove Column to ColumnIndex deref
2024-03-14 04:07:11 +01:00
57 changed files with 607 additions and 174 deletions

View File

@@ -16,7 +16,7 @@ exclude = ["benches/*.json", "benches/*.txt"]
[dependencies] [dependencies]
oneshot = "0.1.5" oneshot = "0.1.5"
base64 = "0.21.0" base64 = "0.22.0"
byteorder = "1.4.3" byteorder = "1.4.3"
crc32fast = "1.3.2" crc32fast = "1.3.2"
once_cell = "1.10.0" once_cell = "1.10.0"

View File

@@ -1,4 +1,3 @@
use std::convert::TryInto;
use std::io; use std::io;
use std::ops::{Range, RangeInclusive}; use std::ops::{Range, RangeInclusive};

View File

@@ -17,6 +17,7 @@ sstable = { version= "0.2", path = "../sstable", package = "tantivy-sstable" }
common = { version= "0.6", path = "../common", package = "tantivy-common" } common = { version= "0.6", path = "../common", package = "tantivy-common" }
tantivy-bitpacker = { version= "0.5", path = "../bitpacker/" } tantivy-bitpacker = { version= "0.5", path = "../bitpacker/" }
serde = "1.0.152" serde = "1.0.152"
downcast-rs = "1.2.0"
[dev-dependencies] [dev-dependencies]
proptest = "1" proptest = "1"

View File

@@ -0,0 +1,155 @@
#![feature(test)]
extern crate test;
use std::sync::Arc;
use rand::prelude::*;
use tantivy_columnar::column_values::{serialize_and_load_u64_based_column_values, CodecType};
use tantivy_columnar::*;
use test::{black_box, Bencher};
struct Columns {
pub optional: Column,
pub full: Column,
pub multi: Column,
}
fn get_test_columns() -> Columns {
let data = generate_permutation();
let mut dataframe_writer = ColumnarWriter::default();
for (idx, val) in data.iter().enumerate() {
dataframe_writer.record_numerical(idx as u32, "full_values", NumericalValue::U64(*val));
if idx % 2 == 0 {
dataframe_writer.record_numerical(
idx as u32,
"optional_values",
NumericalValue::U64(*val),
);
}
dataframe_writer.record_numerical(idx as u32, "multi_values", NumericalValue::U64(*val));
dataframe_writer.record_numerical(idx as u32, "multi_values", NumericalValue::U64(*val));
}
let mut buffer: Vec<u8> = Vec::new();
dataframe_writer
.serialize(data.len() as u32, None, &mut buffer)
.unwrap();
let columnar = ColumnarReader::open(buffer).unwrap();
let cols: Vec<DynamicColumnHandle> = columnar.read_columns("optional_values").unwrap();
assert_eq!(cols.len(), 1);
let optional = cols[0].open_u64_lenient().unwrap().unwrap();
assert_eq!(optional.index.get_cardinality(), Cardinality::Optional);
let cols: Vec<DynamicColumnHandle> = columnar.read_columns("full_values").unwrap();
assert_eq!(cols.len(), 1);
let column_full = cols[0].open_u64_lenient().unwrap().unwrap();
assert_eq!(column_full.index.get_cardinality(), Cardinality::Full);
let cols: Vec<DynamicColumnHandle> = columnar.read_columns("multi_values").unwrap();
assert_eq!(cols.len(), 1);
let multi = cols[0].open_u64_lenient().unwrap().unwrap();
assert_eq!(multi.index.get_cardinality(), Cardinality::Multivalued);
Columns {
optional,
full: column_full,
multi,
}
}
const NUM_VALUES: u64 = 100_000;
fn generate_permutation() -> Vec<u64> {
let mut permutation: Vec<u64> = (0u64..NUM_VALUES).collect();
permutation.shuffle(&mut StdRng::from_seed([1u8; 32]));
permutation
}
pub fn serialize_and_load(column: &[u64], codec_type: CodecType) -> Arc<dyn ColumnValues<u64>> {
serialize_and_load_u64_based_column_values(&column, &[codec_type])
}
fn run_bench_on_column_full_scan(b: &mut Bencher, column: Column) {
let num_iter = black_box(NUM_VALUES);
b.iter(|| {
let mut sum = 0u64;
for i in 0..num_iter as u32 {
let val = column.first(i);
sum += val.unwrap_or(0);
}
sum
});
}
fn run_bench_on_column_block_fetch(b: &mut Bencher, column: Column) {
let mut block: Vec<Option<u64>> = vec![None; 64];
let fetch_docids = (0..64).collect::<Vec<_>>();
b.iter(move || {
column.first_vals(&fetch_docids, &mut block);
block[0]
});
}
fn run_bench_on_column_block_single_calls(b: &mut Bencher, column: Column) {
let mut block: Vec<Option<u64>> = vec![None; 64];
let fetch_docids = (0..64).collect::<Vec<_>>();
b.iter(move || {
for i in 0..fetch_docids.len() {
block[i] = column.first(fetch_docids[i]);
}
block[0]
});
}
/// Column first method
#[bench]
fn bench_get_first_on_full_column_full_scan(b: &mut Bencher) {
let column = get_test_columns().full;
run_bench_on_column_full_scan(b, column);
}
#[bench]
fn bench_get_first_on_optional_column_full_scan(b: &mut Bencher) {
let column = get_test_columns().optional;
run_bench_on_column_full_scan(b, column);
}
#[bench]
fn bench_get_first_on_multi_column_full_scan(b: &mut Bencher) {
let column = get_test_columns().multi;
run_bench_on_column_full_scan(b, column);
}
/// Block fetch column accessor
#[bench]
fn bench_get_block_first_on_optional_column(b: &mut Bencher) {
let column = get_test_columns().optional;
run_bench_on_column_block_fetch(b, column);
}
#[bench]
fn bench_get_block_first_on_multi_column(b: &mut Bencher) {
let column = get_test_columns().multi;
run_bench_on_column_block_fetch(b, column);
}
#[bench]
fn bench_get_block_first_on_full_column(b: &mut Bencher) {
let column = get_test_columns().full;
run_bench_on_column_block_fetch(b, column);
}
#[bench]
fn bench_get_block_first_on_optional_column_single_calls(b: &mut Bencher) {
let column = get_test_columns().optional;
run_bench_on_column_block_single_calls(b, column);
}
#[bench]
fn bench_get_block_first_on_multi_column_single_calls(b: &mut Bencher) {
let column = get_test_columns().multi;
run_bench_on_column_block_single_calls(b, column);
}
#[bench]
fn bench_get_block_first_on_full_column_single_calls(b: &mut Bencher) {
let column = get_test_columns().full;
run_bench_on_column_block_single_calls(b, column);
}

View File

@@ -16,14 +16,6 @@ fn generate_permutation() -> Vec<u64> {
permutation permutation
} }
fn generate_random() -> Vec<u64> {
let mut permutation: Vec<u64> = (0u64..100_000u64)
.map(|el| el + random::<u16>() as u64)
.collect();
permutation.shuffle(&mut StdRng::from_seed([1u8; 32]));
permutation
}
// Warning: this generates the same permutation at each call // Warning: this generates the same permutation at each call
fn generate_permutation_gcd() -> Vec<u64> { fn generate_permutation_gcd() -> Vec<u64> {
let mut permutation: Vec<u64> = (1u64..100_000u64).map(|el| el * 1000).collect(); let mut permutation: Vec<u64> = (1u64..100_000u64).map(|el| el * 1000).collect();

View File

@@ -14,20 +14,32 @@ impl<T: PartialOrd + Copy + std::fmt::Debug + Send + Sync + 'static + Default>
ColumnBlockAccessor<T> ColumnBlockAccessor<T>
{ {
#[inline] #[inline]
pub fn fetch_block(&mut self, docs: &[u32], accessor: &Column<T>) { pub fn fetch_block<'a>(&'a mut self, docs: &'a [u32], accessor: &Column<T>) {
self.docid_cache.clear(); if accessor.index.get_cardinality().is_full() {
self.row_id_cache.clear(); self.val_cache.resize(docs.len(), T::default());
accessor.row_ids_for_docs(docs, &mut self.docid_cache, &mut self.row_id_cache); accessor.values.get_vals(docs, &mut self.val_cache);
self.val_cache.resize(self.row_id_cache.len(), T::default()); } else {
accessor self.docid_cache.clear();
.values self.row_id_cache.clear();
.get_vals(&self.row_id_cache, &mut self.val_cache); accessor.row_ids_for_docs(docs, &mut self.docid_cache, &mut self.row_id_cache);
self.val_cache.resize(self.row_id_cache.len(), T::default());
accessor
.values
.get_vals(&self.row_id_cache, &mut self.val_cache);
}
} }
#[inline] #[inline]
pub fn fetch_block_with_missing(&mut self, docs: &[u32], accessor: &Column<T>, missing: T) { pub fn fetch_block_with_missing(&mut self, docs: &[u32], accessor: &Column<T>, missing: T) {
self.fetch_block(docs, accessor); self.fetch_block(docs, accessor);
// We can compare docid_cache with docs to find missing docs // no missing values
if docs.len() != self.docid_cache.len() || accessor.index.is_multivalue() { if accessor.index.get_cardinality().is_full() {
return;
}
// We can compare docid_cache length with docs to find missing docs
// For multi value columns we can't rely on the length and always need to scan
if accessor.index.get_cardinality().is_multivalue() || docs.len() != self.docid_cache.len()
{
self.missing_docids_cache.clear(); self.missing_docids_cache.clear();
find_missing_docs(docs, &self.docid_cache, |doc| { find_missing_docs(docs, &self.docid_cache, |doc| {
self.missing_docids_cache.push(doc); self.missing_docids_cache.push(doc);
@@ -44,11 +56,25 @@ impl<T: PartialOrd + Copy + std::fmt::Debug + Send + Sync + 'static + Default>
} }
#[inline] #[inline]
pub fn iter_docid_vals(&self) -> impl Iterator<Item = (DocId, T)> + '_ { /// Returns an iterator over the docids and values
self.docid_cache /// The passed in `docs` slice needs to be the same slice that was passed to `fetch_block` or
.iter() /// `fetch_block_with_missing`.
.cloned() ///
.zip(self.val_cache.iter().cloned()) /// The docs is used if the column is full (each docs has exactly one value), otherwise the
/// internal docid vec is used for the iterator, which e.g. may contain duplicate docs.
pub fn iter_docid_vals<'a>(
&'a self,
docs: &'a [u32],
accessor: &Column<T>,
) -> impl Iterator<Item = (DocId, T)> + '_ {
if accessor.index.get_cardinality().is_full() {
docs.iter().cloned().zip(self.val_cache.iter().cloned())
} else {
self.docid_cache
.iter()
.cloned()
.zip(self.val_cache.iter().cloned())
}
} }
} }

View File

@@ -3,17 +3,17 @@ mod serialize;
use std::fmt::{self, Debug}; use std::fmt::{self, Debug};
use std::io::Write; use std::io::Write;
use std::ops::{Deref, Range, RangeInclusive}; use std::ops::{Range, RangeInclusive};
use std::sync::Arc; use std::sync::Arc;
use common::BinarySerializable; use common::BinarySerializable;
pub use dictionary_encoded::{BytesColumn, StrColumn}; pub use dictionary_encoded::{BytesColumn, StrColumn};
pub use serialize::{ pub use serialize::{
open_column_bytes, open_column_str, open_column_u128, open_column_u64, open_column_bytes, open_column_str, open_column_u128, open_column_u128_as_compact_u64,
serialize_column_mappable_to_u128, serialize_column_mappable_to_u64, open_column_u64, serialize_column_mappable_to_u128, serialize_column_mappable_to_u64,
}; };
use crate::column_index::ColumnIndex; use crate::column_index::{ColumnIndex, Set};
use crate::column_values::monotonic_mapping::StrictlyMonotonicMappingToInternal; use crate::column_values::monotonic_mapping::StrictlyMonotonicMappingToInternal;
use crate::column_values::{monotonic_map_column, ColumnValues}; use crate::column_values::{monotonic_map_column, ColumnValues};
use crate::{Cardinality, DocId, EmptyColumnValues, MonotonicallyMappableToU64, RowId}; use crate::{Cardinality, DocId, EmptyColumnValues, MonotonicallyMappableToU64, RowId};
@@ -83,10 +83,36 @@ impl<T: PartialOrd + Copy + Debug + Send + Sync + 'static> Column<T> {
self.values.max_value() self.values.max_value()
} }
#[inline]
pub fn first(&self, row_id: RowId) -> Option<T> { pub fn first(&self, row_id: RowId) -> Option<T> {
self.values_for_doc(row_id).next() self.values_for_doc(row_id).next()
} }
/// Load the first value for each docid in the provided slice.
#[inline]
pub fn first_vals(&self, docids: &[DocId], output: &mut [Option<T>]) {
match &self.index {
ColumnIndex::Empty { .. } => {}
ColumnIndex::Full => self.values.get_vals_opt(docids, output),
ColumnIndex::Optional(optional_index) => {
for (i, docid) in docids.iter().enumerate() {
output[i] = optional_index
.rank_if_exists(*docid)
.map(|rowid| self.values.get_val(rowid));
}
}
ColumnIndex::Multivalued(multivalued_index) => {
for (i, docid) in docids.iter().enumerate() {
let range = multivalued_index.range(*docid);
let is_empty = range.start == range.end;
if !is_empty {
output[i] = Some(self.values.get_val(range.start));
}
}
}
}
}
/// Translates a block of docis to row_ids. /// Translates a block of docis to row_ids.
/// ///
/// returns the row_ids and the matching docids on the same index /// returns the row_ids and the matching docids on the same index
@@ -105,7 +131,8 @@ impl<T: PartialOrd + Copy + Debug + Send + Sync + 'static> Column<T> {
} }
pub fn values_for_doc(&self, doc_id: DocId) -> impl Iterator<Item = T> + '_ { pub fn values_for_doc(&self, doc_id: DocId) -> impl Iterator<Item = T> + '_ {
self.value_row_ids(doc_id) self.index
.value_row_ids(doc_id)
.map(|value_row_id: RowId| self.values.get_val(value_row_id)) .map(|value_row_id: RowId| self.values.get_val(value_row_id))
} }
@@ -147,14 +174,6 @@ impl<T: PartialOrd + Copy + Debug + Send + Sync + 'static> Column<T> {
} }
} }
impl<T> Deref for Column<T> {
type Target = ColumnIndex;
fn deref(&self) -> &Self::Target {
&self.index
}
}
impl BinarySerializable for Cardinality { impl BinarySerializable for Cardinality {
fn serialize<W: Write + ?Sized>(&self, writer: &mut W) -> std::io::Result<()> { fn serialize<W: Write + ?Sized>(&self, writer: &mut W) -> std::io::Result<()> {
self.to_code().serialize(writer) self.to_code().serialize(writer)
@@ -176,6 +195,7 @@ struct FirstValueWithDefault<T: Copy> {
impl<T: PartialOrd + Debug + Send + Sync + Copy + 'static> ColumnValues<T> impl<T: PartialOrd + Debug + Send + Sync + Copy + 'static> ColumnValues<T>
for FirstValueWithDefault<T> for FirstValueWithDefault<T>
{ {
#[inline(always)]
fn get_val(&self, idx: u32) -> T { fn get_val(&self, idx: u32) -> T {
self.column.first(idx).unwrap_or(self.default_value) self.column.first(idx).unwrap_or(self.default_value)
} }

View File

@@ -76,6 +76,26 @@ pub fn open_column_u128<T: MonotonicallyMappableToU128>(
}) })
} }
/// Open the column as u64.
///
/// See [`open_u128_as_compact_u64`] for more details.
pub fn open_column_u128_as_compact_u64(bytes: OwnedBytes) -> io::Result<Column<u64>> {
let (body, column_index_num_bytes_payload) = bytes.rsplit(4);
let column_index_num_bytes = u32::from_le_bytes(
column_index_num_bytes_payload
.as_slice()
.try_into()
.unwrap(),
);
let (column_index_data, column_values_data) = body.split(column_index_num_bytes as usize);
let column_index = crate::column_index::open_column_index(column_index_data)?;
let column_values = crate::column_values::open_u128_as_compact_u64(column_values_data)?;
Ok(Column {
index: column_index,
values: column_values,
})
}
pub fn open_column_bytes(data: OwnedBytes) -> io::Result<BytesColumn> { pub fn open_column_bytes(data: OwnedBytes) -> io::Result<BytesColumn> {
let (body, dictionary_len_bytes) = data.rsplit(4); let (body, dictionary_len_bytes) = data.rsplit(4);
let dictionary_len = u32::from_le_bytes(dictionary_len_bytes.as_slice().try_into().unwrap()); let dictionary_len = u32::from_le_bytes(dictionary_len_bytes.as_slice().try_into().unwrap());

View File

@@ -42,10 +42,6 @@ impl From<MultiValueIndex> for ColumnIndex {
} }
impl ColumnIndex { impl ColumnIndex {
#[inline]
pub fn is_multivalue(&self) -> bool {
matches!(self, ColumnIndex::Multivalued(_))
}
/// Returns the cardinality of the column index. /// Returns the cardinality of the column index.
/// ///
/// By convention, if the column contains no docs, we consider that it is /// By convention, if the column contains no docs, we consider that it is

View File

@@ -1,4 +1,3 @@
use std::convert::TryInto;
use std::io::{self, Write}; use std::io::{self, Write};
use common::BinarySerializable; use common::BinarySerializable;

View File

@@ -1,5 +1,4 @@
use proptest::prelude::{any, prop, *}; use proptest::prelude::*;
use proptest::strategy::Strategy;
use proptest::{prop_oneof, proptest}; use proptest::{prop_oneof, proptest};
use super::*; use super::*;

View File

@@ -10,7 +10,7 @@ pub(crate) struct MergedColumnValues<'a, T> {
pub(crate) merge_row_order: &'a MergeRowOrder, pub(crate) merge_row_order: &'a MergeRowOrder,
} }
impl<'a, T: Copy + PartialOrd + Debug> Iterable<T> for MergedColumnValues<'a, T> { impl<'a, T: Copy + PartialOrd + Debug + 'static> Iterable<T> for MergedColumnValues<'a, T> {
fn boxed_iter(&self) -> Box<dyn Iterator<Item = T> + '_> { fn boxed_iter(&self) -> Box<dyn Iterator<Item = T> + '_> {
match self.merge_row_order { match self.merge_row_order {
MergeRowOrder::Stack(_) => Box::new( MergeRowOrder::Stack(_) => Box::new(

View File

@@ -10,6 +10,7 @@ use std::fmt::Debug;
use std::ops::{Range, RangeInclusive}; use std::ops::{Range, RangeInclusive};
use std::sync::Arc; use std::sync::Arc;
use downcast_rs::DowncastSync;
pub use monotonic_mapping::{MonotonicallyMappableToU64, StrictlyMonotonicFn}; pub use monotonic_mapping::{MonotonicallyMappableToU64, StrictlyMonotonicFn};
pub use monotonic_mapping_u128::MonotonicallyMappableToU128; pub use monotonic_mapping_u128::MonotonicallyMappableToU128;
@@ -25,7 +26,10 @@ mod monotonic_column;
pub(crate) use merge::MergedColumnValues; pub(crate) use merge::MergedColumnValues;
pub use stats::ColumnStats; pub use stats::ColumnStats;
pub use u128_based::{open_u128_mapped, serialize_column_values_u128}; pub use u128_based::{
open_u128_as_compact_u64, open_u128_mapped, serialize_column_values_u128,
CompactSpaceU64Accessor,
};
pub use u64_based::{ pub use u64_based::{
load_u64_based_column_values, serialize_and_load_u64_based_column_values, load_u64_based_column_values, serialize_and_load_u64_based_column_values,
serialize_u64_based_column_values, CodecType, ALL_U64_CODEC_TYPES, serialize_u64_based_column_values, CodecType, ALL_U64_CODEC_TYPES,
@@ -41,7 +45,7 @@ use crate::RowId;
/// ///
/// Any methods with a default and specialized implementation need to be called in the /// Any methods with a default and specialized implementation need to be called in the
/// wrappers that implement the trait: Arc and MonotonicMappingColumn /// wrappers that implement the trait: Arc and MonotonicMappingColumn
pub trait ColumnValues<T: PartialOrd = u64>: Send + Sync { pub trait ColumnValues<T: PartialOrd = u64>: Send + Sync + DowncastSync {
/// Return the value associated with the given idx. /// Return the value associated with the given idx.
/// ///
/// This accessor should return as fast as possible. /// This accessor should return as fast as possible.
@@ -68,11 +72,40 @@ pub trait ColumnValues<T: PartialOrd = u64>: Send + Sync {
out_x4[3] = self.get_val(idx_x4[3]); out_x4[3] = self.get_val(idx_x4[3]);
} }
let step_size = 4; let out_and_idx_chunks = output
let cutoff = indexes.len() - indexes.len() % step_size; .chunks_exact_mut(4)
.into_remainder()
.into_iter()
.zip(indexes.chunks_exact(4).remainder());
for (out, idx) in out_and_idx_chunks {
*out = self.get_val(*idx);
}
}
for idx in cutoff..indexes.len() { /// Allows to push down multiple fetch calls, to avoid dynamic dispatch overhead.
output[idx] = self.get_val(indexes[idx]); /// The slightly weird `Option<T>` in output allows pushdown to full columns.
///
/// idx and output should have the same length
///
/// # Panics
///
/// May panic if `idx` is greater than the column length.
fn get_vals_opt(&self, indexes: &[u32], output: &mut [Option<T>]) {
assert!(indexes.len() == output.len());
let out_and_idx_chunks = output.chunks_exact_mut(4).zip(indexes.chunks_exact(4));
for (out_x4, idx_x4) in out_and_idx_chunks {
out_x4[0] = Some(self.get_val(idx_x4[0]));
out_x4[1] = Some(self.get_val(idx_x4[1]));
out_x4[2] = Some(self.get_val(idx_x4[2]));
out_x4[3] = Some(self.get_val(idx_x4[3]));
}
let out_and_idx_chunks = output
.chunks_exact_mut(4)
.into_remainder()
.into_iter()
.zip(indexes.chunks_exact(4).remainder());
for (out, idx) in out_and_idx_chunks {
*out = Some(self.get_val(*idx));
} }
} }
@@ -139,6 +172,7 @@ pub trait ColumnValues<T: PartialOrd = u64>: Send + Sync {
Box::new((0..self.num_vals()).map(|idx| self.get_val(idx))) Box::new((0..self.num_vals()).map(|idx| self.get_val(idx)))
} }
} }
downcast_rs::impl_downcast!(sync ColumnValues<T> where T: PartialOrd);
/// Empty column of values. /// Empty column of values.
pub struct EmptyColumnValues; pub struct EmptyColumnValues;
@@ -161,12 +195,17 @@ impl<T: PartialOrd + Default> ColumnValues<T> for EmptyColumnValues {
} }
} }
impl<T: Copy + PartialOrd + Debug> ColumnValues<T> for Arc<dyn ColumnValues<T>> { impl<T: Copy + PartialOrd + Debug + 'static> ColumnValues<T> for Arc<dyn ColumnValues<T>> {
#[inline(always)] #[inline(always)]
fn get_val(&self, idx: u32) -> T { fn get_val(&self, idx: u32) -> T {
self.as_ref().get_val(idx) self.as_ref().get_val(idx)
} }
#[inline(always)]
fn get_vals_opt(&self, indexes: &[u32], output: &mut [Option<T>]) {
self.as_ref().get_vals_opt(indexes, output)
}
#[inline(always)] #[inline(always)]
fn min_value(&self) -> T { fn min_value(&self) -> T {
self.as_ref().min_value() self.as_ref().min_value()

View File

@@ -31,10 +31,10 @@ pub fn monotonic_map_column<C, T, Input, Output>(
monotonic_mapping: T, monotonic_mapping: T,
) -> impl ColumnValues<Output> ) -> impl ColumnValues<Output>
where where
C: ColumnValues<Input>, C: ColumnValues<Input> + 'static,
T: StrictlyMonotonicFn<Input, Output> + Send + Sync, T: StrictlyMonotonicFn<Input, Output> + Send + Sync + 'static,
Input: PartialOrd + Debug + Send + Sync + Clone, Input: PartialOrd + Debug + Send + Sync + Clone + 'static,
Output: PartialOrd + Debug + Send + Sync + Clone, Output: PartialOrd + Debug + Send + Sync + Clone + 'static,
{ {
MonotonicMappingColumn { MonotonicMappingColumn {
from_column, from_column,
@@ -45,10 +45,10 @@ where
impl<C, T, Input, Output> ColumnValues<Output> for MonotonicMappingColumn<C, T, Input> impl<C, T, Input, Output> ColumnValues<Output> for MonotonicMappingColumn<C, T, Input>
where where
C: ColumnValues<Input>, C: ColumnValues<Input> + 'static,
T: StrictlyMonotonicFn<Input, Output> + Send + Sync, T: StrictlyMonotonicFn<Input, Output> + Send + Sync + 'static,
Input: PartialOrd + Send + Debug + Sync + Clone, Input: PartialOrd + Send + Debug + Sync + Clone + 'static,
Output: PartialOrd + Send + Debug + Sync + Clone, Output: PartialOrd + Send + Debug + Sync + Clone + 'static,
{ {
#[inline(always)] #[inline(always)]
fn get_val(&self, idx: u32) -> Output { fn get_val(&self, idx: u32) -> Output {
@@ -107,7 +107,7 @@ mod tests {
#[test] #[test]
fn test_monotonic_mapping_iter() { fn test_monotonic_mapping_iter() {
let vals: Vec<u64> = (0..100u64).map(|el| el * 10).collect(); let vals: Vec<u64> = (0..100u64).map(|el| el * 10).collect();
let col = VecColumn::from(&vals); let col = VecColumn::from(vals);
let mapped = monotonic_map_column( let mapped = monotonic_map_column(
col, col,
StrictlyMonotonicMappingInverter::from(StrictlyMonotonicMappingToInternal::<i64>::new()), StrictlyMonotonicMappingInverter::from(StrictlyMonotonicMappingToInternal::<i64>::new()),

View File

@@ -22,7 +22,7 @@ mod build_compact_space;
use build_compact_space::get_compact_space; use build_compact_space::get_compact_space;
use common::{BinarySerializable, CountingWriter, OwnedBytes, VInt, VIntU128}; use common::{BinarySerializable, CountingWriter, OwnedBytes, VInt, VIntU128};
use tantivy_bitpacker::{self, BitPacker, BitUnpacker}; use tantivy_bitpacker::{BitPacker, BitUnpacker};
use crate::column_values::ColumnValues; use crate::column_values::ColumnValues;
use crate::RowId; use crate::RowId;
@@ -292,6 +292,63 @@ impl BinarySerializable for IPCodecParams {
} }
} }
/// Exposes the compact space compressed values as u64.
///
/// This allows faster access to the values, as u64 is faster to work with than u128.
/// It also allows to handle u128 values like u64, via the `open_u64_lenient` as a uniform
/// access interface.
///
/// When converting from the internal u64 to u128 `compact_to_u128` can be used.
pub struct CompactSpaceU64Accessor(CompactSpaceDecompressor);
impl CompactSpaceU64Accessor {
pub(crate) fn open(data: OwnedBytes) -> io::Result<CompactSpaceU64Accessor> {
let decompressor = CompactSpaceU64Accessor(CompactSpaceDecompressor::open(data)?);
Ok(decompressor)
}
/// Convert a compact space value to u128
pub fn compact_to_u128(&self, compact: u32) -> u128 {
self.0.compact_to_u128(compact)
}
}
impl ColumnValues<u64> for CompactSpaceU64Accessor {
#[inline]
fn get_val(&self, doc: u32) -> u64 {
let compact = self.0.get_compact(doc);
compact as u64
}
fn min_value(&self) -> u64 {
self.0.u128_to_compact(self.0.min_value()).unwrap() as u64
}
fn max_value(&self) -> u64 {
self.0.u128_to_compact(self.0.max_value()).unwrap() as u64
}
fn num_vals(&self) -> u32 {
self.0.params.num_vals
}
#[inline]
fn iter(&self) -> Box<dyn Iterator<Item = u64> + '_> {
Box::new(self.0.iter_compact().map(|el| el as u64))
}
#[inline]
fn get_row_ids_for_value_range(
&self,
value_range: RangeInclusive<u64>,
position_range: Range<u32>,
positions: &mut Vec<u32>,
) {
let value_range = self.0.compact_to_u128(*value_range.start() as u32)
..=self.0.compact_to_u128(*value_range.end() as u32);
self.0
.get_row_ids_for_value_range(value_range, position_range, positions)
}
}
impl ColumnValues<u128> for CompactSpaceDecompressor { impl ColumnValues<u128> for CompactSpaceDecompressor {
#[inline] #[inline]
fn get_val(&self, doc: u32) -> u128 { fn get_val(&self, doc: u32) -> u128 {
@@ -402,9 +459,14 @@ impl CompactSpaceDecompressor {
.map(|compact| self.compact_to_u128(compact)) .map(|compact| self.compact_to_u128(compact))
} }
#[inline]
pub fn get_compact(&self, idx: u32) -> u32 {
self.params.bit_unpacker.get(idx, &self.data) as u32
}
#[inline] #[inline]
pub fn get(&self, idx: u32) -> u128 { pub fn get(&self, idx: u32) -> u128 {
let compact = self.params.bit_unpacker.get(idx, &self.data) as u32; let compact = self.get_compact(idx);
self.compact_to_u128(compact) self.compact_to_u128(compact)
} }

View File

@@ -6,7 +6,9 @@ use std::sync::Arc;
mod compact_space; mod compact_space;
use common::{BinarySerializable, OwnedBytes, VInt}; use common::{BinarySerializable, OwnedBytes, VInt};
use compact_space::{CompactSpaceCompressor, CompactSpaceDecompressor}; pub use compact_space::{
CompactSpaceCompressor, CompactSpaceDecompressor, CompactSpaceU64Accessor,
};
use crate::column_values::monotonic_map_column; use crate::column_values::monotonic_map_column;
use crate::column_values::monotonic_mapping::{ use crate::column_values::monotonic_mapping::{
@@ -108,6 +110,23 @@ pub fn open_u128_mapped<T: MonotonicallyMappableToU128 + Debug>(
StrictlyMonotonicMappingToInternal::<T>::new().into(); StrictlyMonotonicMappingToInternal::<T>::new().into();
Ok(Arc::new(monotonic_map_column(reader, inverted))) Ok(Arc::new(monotonic_map_column(reader, inverted)))
} }
/// Returns the u64 representation of the u128 data.
/// The internal representation of the data as u64 is useful for faster processing.
///
/// In order to convert to u128 back cast to `CompactSpaceU64Accessor` and call
/// `compact_to_u128`.
///
/// # Notice
/// In case there are new codecs added, check for usages of `CompactSpaceDecompressorU64` and
/// also handle the new codecs.
pub fn open_u128_as_compact_u64(mut bytes: OwnedBytes) -> io::Result<Arc<dyn ColumnValues<u64>>> {
let header = U128Header::deserialize(&mut bytes)?;
assert_eq!(header.codec_type, U128FastFieldCodecType::CompactSpace);
let reader = CompactSpaceU64Accessor::open(bytes)?;
Ok(Arc::new(reader))
}
#[cfg(test)] #[cfg(test)]
pub mod tests { pub mod tests {
use super::*; use super::*;

View File

@@ -63,7 +63,6 @@ impl ColumnValues for BitpackedReader {
fn get_val(&self, doc: u32) -> u64 { fn get_val(&self, doc: u32) -> u64 {
self.stats.min_value + self.stats.gcd.get() * self.bit_unpacker.get(doc, &self.data) self.stats.min_value + self.stats.gcd.get() * self.bit_unpacker.get(doc, &self.data)
} }
#[inline] #[inline]
fn min_value(&self) -> u64 { fn min_value(&self) -> u64 {
self.stats.min_value self.stats.min_value

View File

@@ -63,7 +63,10 @@ impl BlockwiseLinearEstimator {
if self.block.is_empty() { if self.block.is_empty() {
return; return;
} }
let line = Line::train(&VecColumn::from(&self.block)); let column = VecColumn::from(std::mem::take(&mut self.block));
let line = Line::train(&column);
self.block = column.into();
let mut max_value = 0u64; let mut max_value = 0u64;
for (i, buffer_val) in self.block.iter().enumerate() { for (i, buffer_val) in self.block.iter().enumerate() {
let interpolated_val = line.eval(i as u32); let interpolated_val = line.eval(i as u32);
@@ -125,7 +128,7 @@ impl ColumnCodecEstimator for BlockwiseLinearEstimator {
*buffer_val = gcd_divider.divide(*buffer_val - stats.min_value); *buffer_val = gcd_divider.divide(*buffer_val - stats.min_value);
} }
let line = Line::train(&VecColumn::from(&buffer)); let line = Line::train(&VecColumn::from(buffer.to_vec()));
assert!(!buffer.is_empty()); assert!(!buffer.is_empty());

View File

@@ -184,7 +184,7 @@ mod tests {
} }
fn test_eval_max_err(ys: &[u64]) -> Option<u64> { fn test_eval_max_err(ys: &[u64]) -> Option<u64> {
let line = Line::train(&VecColumn::from(&ys)); let line = Line::train(&VecColumn::from(ys.to_vec()));
ys.iter() ys.iter()
.enumerate() .enumerate()
.map(|(x, y)| y.wrapping_sub(line.eval(x as u32))) .map(|(x, y)| y.wrapping_sub(line.eval(x as u32)))

View File

@@ -173,7 +173,9 @@ impl LinearCodecEstimator {
fn collect_before_line_estimation(&mut self, value: u64) { fn collect_before_line_estimation(&mut self, value: u64) {
self.block.push(value); self.block.push(value);
if self.block.len() == LINE_ESTIMATION_BLOCK_LEN { if self.block.len() == LINE_ESTIMATION_BLOCK_LEN {
let line = Line::train(&VecColumn::from(&self.block)); let column = VecColumn::from(std::mem::take(&mut self.block));
let line = Line::train(&column);
self.block = column.into();
let block = std::mem::take(&mut self.block); let block = std::mem::take(&mut self.block);
for val in block { for val in block {
self.collect_after_line_estimation(&line, val); self.collect_after_line_estimation(&line, val);

View File

@@ -1,5 +1,4 @@
use proptest::prelude::*; use proptest::prelude::*;
use proptest::strategy::Strategy;
use proptest::{prop_oneof, proptest}; use proptest::{prop_oneof, proptest};
#[test] #[test]

View File

@@ -4,14 +4,14 @@ use tantivy_bitpacker::minmax;
use crate::ColumnValues; use crate::ColumnValues;
/// VecColumn provides `Column` over a slice. /// VecColumn provides `Column` over a `Vec<T>`.
pub struct VecColumn<'a, T = u64> { pub struct VecColumn<T = u64> {
pub(crate) values: &'a [T], pub(crate) values: Vec<T>,
pub(crate) min_value: T, pub(crate) min_value: T,
pub(crate) max_value: T, pub(crate) max_value: T,
} }
impl<'a, T: Copy + PartialOrd + Send + Sync + Debug> ColumnValues<T> for VecColumn<'a, T> { impl<T: Copy + PartialOrd + Send + Sync + Debug + 'static> ColumnValues<T> for VecColumn<T> {
fn get_val(&self, position: u32) -> T { fn get_val(&self, position: u32) -> T {
self.values[position as usize] self.values[position as usize]
} }
@@ -37,11 +37,8 @@ impl<'a, T: Copy + PartialOrd + Send + Sync + Debug> ColumnValues<T> for VecColu
} }
} }
impl<'a, T: Copy + PartialOrd + Default, V> From<&'a V> for VecColumn<'a, T> impl<T: Copy + PartialOrd + Default> From<Vec<T>> for VecColumn<T> {
where V: AsRef<[T]> + ?Sized fn from(values: Vec<T>) -> Self {
{
fn from(values: &'a V) -> Self {
let values = values.as_ref();
let (min_value, max_value) = minmax(values.iter().copied()).unwrap_or_default(); let (min_value, max_value) = minmax(values.iter().copied()).unwrap_or_default();
Self { Self {
values, values,
@@ -50,3 +47,8 @@ where V: AsRef<[T]> + ?Sized
} }
} }
} }
impl From<VecColumn> for Vec<u64> {
fn from(column: VecColumn) -> Self {
column.values
}
}

View File

@@ -1,7 +1,3 @@
use std::collections::BTreeMap;
use itertools::Itertools;
use super::*; use super::*;
use crate::{Cardinality, ColumnarWriter, HasAssociatedColumnType, RowId}; use crate::{Cardinality, ColumnarWriter, HasAssociatedColumnType, RowId};

View File

@@ -13,9 +13,7 @@ pub(crate) use serializer::ColumnarSerializer;
use stacker::{Addr, ArenaHashMap, MemoryArena}; use stacker::{Addr, ArenaHashMap, MemoryArena};
use crate::column_index::SerializableColumnIndex; use crate::column_index::SerializableColumnIndex;
use crate::column_values::{ use crate::column_values::{MonotonicallyMappableToU128, MonotonicallyMappableToU64};
ColumnValues, MonotonicallyMappableToU128, MonotonicallyMappableToU64, VecColumn,
};
use crate::columnar::column_type::ColumnType; use crate::columnar::column_type::ColumnType;
use crate::columnar::writer::column_writers::{ use crate::columnar::writer::column_writers::{
ColumnWriter, NumericalColumnWriter, StrOrBytesColumnWriter, ColumnWriter, NumericalColumnWriter, StrOrBytesColumnWriter,
@@ -645,10 +643,7 @@ fn send_to_serialize_column_mappable_to_u128<
value_index_builders: &mut PreallocatedIndexBuilders, value_index_builders: &mut PreallocatedIndexBuilders,
values: &mut Vec<T>, values: &mut Vec<T>,
mut wrt: impl io::Write, mut wrt: impl io::Write,
) -> io::Result<()> ) -> io::Result<()> {
where
for<'a> VecColumn<'a, T>: ColumnValues<T>,
{
values.clear(); values.clear();
// TODO: split index and values // TODO: split index and values
let serializable_column_index = match cardinality { let serializable_column_index = match cardinality {
@@ -701,10 +696,7 @@ fn send_to_serialize_column_mappable_to_u64(
value_index_builders: &mut PreallocatedIndexBuilders, value_index_builders: &mut PreallocatedIndexBuilders,
values: &mut Vec<u64>, values: &mut Vec<u64>,
mut wrt: impl io::Write, mut wrt: impl io::Write,
) -> io::Result<()> ) -> io::Result<()> {
where
for<'a> VecColumn<'a, u64>: ColumnValues<u64>,
{
values.clear(); values.clear();
let serializable_column_index = match cardinality { let serializable_column_index = match cardinality {
Cardinality::Full => { Cardinality::Full => {

View File

@@ -96,7 +96,6 @@ impl<'a, W: io::Write> io::Write for ColumnSerializer<'a, W> {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::columnar::column_type::ColumnType;
#[test] #[test]
fn test_prepare_key_bytes() { fn test_prepare_key_bytes() {

View File

@@ -8,7 +8,7 @@ use common::{ByteCount, DateTime, HasLen, OwnedBytes};
use crate::column::{BytesColumn, Column, StrColumn}; use crate::column::{BytesColumn, Column, StrColumn};
use crate::column_values::{monotonic_map_column, StrictlyMonotonicFn}; use crate::column_values::{monotonic_map_column, StrictlyMonotonicFn};
use crate::columnar::ColumnType; use crate::columnar::ColumnType;
use crate::{Cardinality, ColumnIndex, NumericalType}; use crate::{Cardinality, ColumnIndex, ColumnValues, NumericalType};
#[derive(Clone)] #[derive(Clone)]
pub enum DynamicColumn { pub enum DynamicColumn {
@@ -247,7 +247,12 @@ impl DynamicColumnHandle {
} }
/// Returns the `u64` fast field reader reader associated with `fields` of types /// Returns the `u64` fast field reader reader associated with `fields` of types
/// Str, u64, i64, f64, bool, or datetime. /// Str, u64, i64, f64, bool, ip, or datetime.
///
/// Notice that for IpAddr, the fastfield reader will return the u64 representation of the
/// IpAddr.
/// In order to convert to u128 back cast to `CompactSpaceU64Accessor` and call
/// `compact_to_u128`.
/// ///
/// If not, the fastfield reader will returns the u64-value associated with the original /// If not, the fastfield reader will returns the u64-value associated with the original
/// FastValue. /// FastValue.
@@ -258,7 +263,10 @@ impl DynamicColumnHandle {
let column: BytesColumn = crate::column::open_column_bytes(column_bytes)?; let column: BytesColumn = crate::column::open_column_bytes(column_bytes)?;
Ok(Some(column.term_ord_column)) Ok(Some(column.term_ord_column))
} }
ColumnType::IpAddr => Ok(None), ColumnType::IpAddr => {
let column = crate::column::open_column_u128_as_compact_u64(column_bytes)?;
Ok(Some(column))
}
ColumnType::Bool ColumnType::Bool
| ColumnType::I64 | ColumnType::I64
| ColumnType::U64 | ColumnType::U64

View File

@@ -113,6 +113,9 @@ impl Cardinality {
pub fn is_multivalue(&self) -> bool { pub fn is_multivalue(&self) -> bool {
matches!(self, Cardinality::Multivalued) matches!(self, Cardinality::Multivalued)
} }
pub fn is_full(&self) -> bool {
matches!(self, Cardinality::Full)
}
pub(crate) fn to_code(self) -> u8 { pub(crate) fn to_code(self) -> u8 {
self as u8 self as u8
} }

View File

@@ -1,4 +1,3 @@
use std::convert::TryInto;
use std::io::Write; use std::io::Write;
use std::{fmt, io, u64}; use std::{fmt, io, u64};

View File

@@ -1,4 +1,3 @@
use std::convert::TryInto;
use std::ops::{Deref, Range}; use std::ops::{Deref, Range};
use std::sync::Arc; use std::sync::Arc;
use std::{fmt, io}; use std::{fmt, io};

View File

@@ -170,8 +170,8 @@ impl AggregationWithAccessor {
ColumnType::Str, ColumnType::Str,
ColumnType::DateTime, ColumnType::DateTime,
ColumnType::Bool, ColumnType::Bool,
ColumnType::IpAddr,
// ColumnType::Bytes Unsupported // ColumnType::Bytes Unsupported
// ColumnType::IpAddr Unsupported
]; ];
// In case the column is empty we want the shim column to match the missing type // In case the column is empty we want the shim column to match the missing type

View File

@@ -816,38 +816,38 @@ fn test_aggregation_on_json_object_mixed_types() {
let mut index_writer: IndexWriter = index.writer_for_tests().unwrap(); let mut index_writer: IndexWriter = index.writer_for_tests().unwrap();
// => Segment with all values numeric // => Segment with all values numeric
index_writer index_writer
.add_document(doc!(json => json!({"mixed_type": 10.0}))) .add_document(doc!(json => json!({"mixed_type": 10.0, "mixed_price": 10.0})))
.unwrap(); .unwrap();
index_writer.commit().unwrap(); index_writer.commit().unwrap();
// => Segment with all values text // => Segment with all values text
index_writer index_writer
.add_document(doc!(json => json!({"mixed_type": "blue"}))) .add_document(doc!(json => json!({"mixed_type": "blue", "mixed_price": 5.0})))
.unwrap(); .unwrap();
index_writer index_writer
.add_document(doc!(json => json!({"mixed_type": "blue"}))) .add_document(doc!(json => json!({"mixed_type": "blue", "mixed_price": 5.0})))
.unwrap(); .unwrap();
index_writer index_writer
.add_document(doc!(json => json!({"mixed_type": "blue"}))) .add_document(doc!(json => json!({"mixed_type": "blue", "mixed_price": 5.0})))
.unwrap(); .unwrap();
index_writer.commit().unwrap(); index_writer.commit().unwrap();
// => Segment with all boolen // => Segment with all boolen
index_writer index_writer
.add_document(doc!(json => json!({"mixed_type": true}))) .add_document(doc!(json => json!({"mixed_type": true, "mixed_price": "no_price"})))
.unwrap(); .unwrap();
index_writer.commit().unwrap(); index_writer.commit().unwrap();
// => Segment with mixed values // => Segment with mixed values
index_writer index_writer
.add_document(doc!(json => json!({"mixed_type": "red"}))) .add_document(doc!(json => json!({"mixed_type": "red", "mixed_price": 1.0})))
.unwrap(); .unwrap();
index_writer index_writer
.add_document(doc!(json => json!({"mixed_type": "red"}))) .add_document(doc!(json => json!({"mixed_type": "red", "mixed_price": 1.0})))
.unwrap(); .unwrap();
index_writer index_writer
.add_document(doc!(json => json!({"mixed_type": -20.5}))) .add_document(doc!(json => json!({"mixed_type": -20.5, "mixed_price": -20.5})))
.unwrap(); .unwrap();
index_writer index_writer
.add_document(doc!(json => json!({"mixed_type": true}))) .add_document(doc!(json => json!({"mixed_type": true, "mixed_price": "no_price"})))
.unwrap(); .unwrap();
index_writer.commit().unwrap(); index_writer.commit().unwrap();
@@ -861,7 +861,7 @@ fn test_aggregation_on_json_object_mixed_types() {
"order": { "min_price": "desc" } "order": { "min_price": "desc" }
}, },
"aggs": { "aggs": {
"min_price": { "min": { "field": "json.mixed_type" } } "min_price": { "min": { "field": "json.mixed_price" } }
} }
}, },
"rangeagg": { "rangeagg": {
@@ -885,7 +885,6 @@ fn test_aggregation_on_json_object_mixed_types() {
let aggregation_results = searcher.search(&AllQuery, &aggregation_collector).unwrap(); let aggregation_results = searcher.search(&AllQuery, &aggregation_collector).unwrap();
let aggregation_res_json = serde_json::to_value(aggregation_results).unwrap(); let aggregation_res_json = serde_json::to_value(aggregation_results).unwrap();
// pretty print as json
use pretty_assertions::assert_eq; use pretty_assertions::assert_eq;
assert_eq!( assert_eq!(
&aggregation_res_json, &aggregation_res_json,
@@ -901,10 +900,10 @@ fn test_aggregation_on_json_object_mixed_types() {
"termagg": { "termagg": {
"buckets": [ "buckets": [
{ "doc_count": 1, "key": 10.0, "min_price": { "value": 10.0 } }, { "doc_count": 1, "key": 10.0, "min_price": { "value": 10.0 } },
{ "doc_count": 3, "key": "blue", "min_price": { "value": 5.0 } },
{ "doc_count": 2, "key": "red", "min_price": { "value": 1.0 } },
{ "doc_count": 1, "key": -20.5, "min_price": { "value": -20.5 } }, { "doc_count": 1, "key": -20.5, "min_price": { "value": -20.5 } },
{ "doc_count": 2, "key": "red", "min_price": { "value": null } },
{ "doc_count": 2, "key": 1.0, "key_as_string": "true", "min_price": { "value": null } }, { "doc_count": 2, "key": 1.0, "key_as_string": "true", "min_price": { "value": null } },
{ "doc_count": 3, "key": "blue", "min_price": { "value": null } },
], ],
"sum_other_doc_count": 0 "sum_other_doc_count": 0
} }

View File

@@ -1,5 +1,4 @@
use std::cmp::Ordering; use std::cmp::Ordering;
use std::fmt::Display;
use columnar::ColumnType; use columnar::ColumnType;
use itertools::Itertools; use itertools::Itertools;
@@ -310,7 +309,10 @@ impl SegmentAggregationCollector for SegmentHistogramCollector {
.column_block_accessor .column_block_accessor
.fetch_block(docs, &bucket_agg_accessor.accessor); .fetch_block(docs, &bucket_agg_accessor.accessor);
for (doc, val) in bucket_agg_accessor.column_block_accessor.iter_docid_vals() { for (doc, val) in bucket_agg_accessor
.column_block_accessor
.iter_docid_vals(docs, &bucket_agg_accessor.accessor)
{
let val = self.f64_from_fastfield_u64(val); let val = self.f64_from_fastfield_u64(val);
let bucket_pos = get_bucket_pos(val); let bucket_pos = get_bucket_pos(val);
@@ -597,13 +599,11 @@ mod tests {
use serde_json::Value; use serde_json::Value;
use super::*; use super::*;
use crate::aggregation::agg_req::Aggregations;
use crate::aggregation::agg_result::AggregationResults; use crate::aggregation::agg_result::AggregationResults;
use crate::aggregation::tests::{ use crate::aggregation::tests::{
exec_request, exec_request_with_query, exec_request_with_query_and_memory_limit, exec_request, exec_request_with_query, exec_request_with_query_and_memory_limit,
get_test_index_2_segments, get_test_index_from_values, get_test_index_with_num_docs, get_test_index_2_segments, get_test_index_from_values, get_test_index_with_num_docs,
}; };
use crate::aggregation::AggregationCollector;
use crate::query::AllQuery; use crate::query::AllQuery;
#[test] #[test]

View File

@@ -236,7 +236,10 @@ impl SegmentAggregationCollector for SegmentRangeCollector {
.column_block_accessor .column_block_accessor
.fetch_block(docs, &bucket_agg_accessor.accessor); .fetch_block(docs, &bucket_agg_accessor.accessor);
for (doc, val) in bucket_agg_accessor.column_block_accessor.iter_docid_vals() { for (doc, val) in bucket_agg_accessor
.column_block_accessor
.iter_docid_vals(docs, &bucket_agg_accessor.accessor)
{
let bucket_pos = self.get_bucket_pos(val); let bucket_pos = self.get_bucket_pos(val);
let bucket = &mut self.buckets[bucket_pos]; let bucket = &mut self.buckets[bucket_pos];

View File

@@ -1,6 +1,10 @@
use std::fmt::Debug; use std::fmt::Debug;
use std::net::Ipv6Addr;
use columnar::{BytesColumn, ColumnType, MonotonicallyMappableToU64, StrColumn}; use columnar::column_values::CompactSpaceU64Accessor;
use columnar::{
BytesColumn, ColumnType, MonotonicallyMappableToU128, MonotonicallyMappableToU64, StrColumn,
};
use rustc_hash::FxHashMap; use rustc_hash::FxHashMap;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@@ -105,9 +109,9 @@ pub struct TermsAggregation {
/// ///
/// Defaults to 10 * size. /// Defaults to 10 * size.
#[serde(skip_serializing_if = "Option::is_none", default)] #[serde(skip_serializing_if = "Option::is_none", default)]
#[serde(alias = "segment_size")] #[serde(alias = "shard_size")]
#[serde(alias = "split_size")] #[serde(alias = "split_size")]
pub shard_size: Option<u32>, pub segment_size: Option<u32>,
/// If you set the `show_term_doc_count_error` parameter to true, the terms aggregation will /// If you set the `show_term_doc_count_error` parameter to true, the terms aggregation will
/// include doc_count_error_upper_bound, which is an upper bound to the error on the /// include doc_count_error_upper_bound, which is an upper bound to the error on the
@@ -196,7 +200,7 @@ impl TermsAggregationInternal {
pub(crate) fn from_req(req: &TermsAggregation) -> Self { pub(crate) fn from_req(req: &TermsAggregation) -> Self {
let size = req.size.unwrap_or(10); let size = req.size.unwrap_or(10);
let mut segment_size = req.shard_size.unwrap_or(size * 10); let mut segment_size = req.segment_size.unwrap_or(size * 10);
let order = req.order.clone().unwrap_or_default(); let order = req.order.clone().unwrap_or_default();
segment_size = segment_size.max(size); segment_size = segment_size.max(size);
@@ -306,7 +310,10 @@ impl SegmentAggregationCollector for SegmentTermCollector {
} }
// has subagg // has subagg
if let Some(blueprint) = self.blueprint.as_ref() { if let Some(blueprint) = self.blueprint.as_ref() {
for (doc, term_id) in bucket_agg_accessor.column_block_accessor.iter_docid_vals() { for (doc, term_id) in bucket_agg_accessor
.column_block_accessor
.iter_docid_vals(docs, &bucket_agg_accessor.accessor)
{
let sub_aggregations = self let sub_aggregations = self
.term_buckets .term_buckets
.sub_aggs .sub_aggs
@@ -535,6 +542,27 @@ impl SegmentTermCollector {
let val = bool::from_u64(val); let val = bool::from_u64(val);
dict.insert(IntermediateKey::Bool(val), intermediate_entry); dict.insert(IntermediateKey::Bool(val), intermediate_entry);
} }
} else if self.column_type == ColumnType::IpAddr {
let compact_space_accessor = agg_with_accessor
.accessor
.values
.clone()
.downcast_arc::<CompactSpaceU64Accessor>()
.map_err(|_| {
TantivyError::AggregationError(
crate::aggregation::AggregationError::InternalError(
"Type mismatch: Could not downcast to CompactSpaceU64Accessor"
.to_string(),
),
)
})?;
for (val, doc_count) in entries {
let intermediate_entry = into_intermediate_bucket_entry(val, doc_count)?;
let val: u128 = compact_space_accessor.compact_to_u128(val as u32);
let val = Ipv6Addr::from_u128(val);
dict.insert(IntermediateKey::IpAddr(val), intermediate_entry);
}
} else { } else {
for (val, doc_count) in entries { for (val, doc_count) in entries {
let intermediate_entry = into_intermediate_bucket_entry(val, doc_count)?; let intermediate_entry = into_intermediate_bucket_entry(val, doc_count)?;
@@ -587,6 +615,9 @@ pub(crate) fn cut_off_buckets<T: GetDocCount + Debug>(
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::net::IpAddr;
use std::str::FromStr;
use common::DateTime; use common::DateTime;
use time::{Date, Month}; use time::{Date, Month};
@@ -597,7 +628,7 @@ mod tests {
}; };
use crate::aggregation::AggregationLimits; use crate::aggregation::AggregationLimits;
use crate::indexer::NoMergePolicy; use crate::indexer::NoMergePolicy;
use crate::schema::{Schema, FAST, STRING}; use crate::schema::{IntoIpv6Addr, Schema, FAST, STRING};
use crate::{Index, IndexWriter}; use crate::{Index, IndexWriter};
#[test] #[test]
@@ -1179,9 +1210,9 @@ mod tests {
assert_eq!(res["my_texts"]["buckets"][0]["key"], "terma"); assert_eq!(res["my_texts"]["buckets"][0]["key"], "terma");
assert_eq!(res["my_texts"]["buckets"][0]["doc_count"], 4); assert_eq!(res["my_texts"]["buckets"][0]["doc_count"], 4);
assert_eq!(res["my_texts"]["buckets"][1]["key"], "termc"); assert_eq!(res["my_texts"]["buckets"][1]["key"], "termb");
assert_eq!(res["my_texts"]["buckets"][1]["doc_count"], 0); assert_eq!(res["my_texts"]["buckets"][1]["doc_count"], 0);
assert_eq!(res["my_texts"]["buckets"][2]["key"], "termb"); assert_eq!(res["my_texts"]["buckets"][2]["key"], "termc");
assert_eq!(res["my_texts"]["buckets"][2]["doc_count"], 0); assert_eq!(res["my_texts"]["buckets"][2]["doc_count"], 0);
assert_eq!(res["my_texts"]["sum_other_doc_count"], 0); assert_eq!(res["my_texts"]["sum_other_doc_count"], 0);
assert_eq!(res["my_texts"]["doc_count_error_upper_bound"], 0); assert_eq!(res["my_texts"]["doc_count_error_upper_bound"], 0);
@@ -1927,4 +1958,44 @@ mod tests {
Ok(()) Ok(())
} }
#[test]
fn terms_aggregation_ip_addr() -> crate::Result<()> {
let mut schema_builder = Schema::builder();
let field = schema_builder.add_ip_addr_field("ip_field", FAST);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
{
let mut writer = index.writer_with_num_threads(1, 15_000_000)?;
// IpV6 loopback
writer.add_document(doc!(field=>IpAddr::from_str("::1").unwrap().into_ipv6_addr()))?;
writer.add_document(doc!(field=>IpAddr::from_str("::1").unwrap().into_ipv6_addr()))?;
// IpV4
writer.add_document(
doc!(field=>IpAddr::from_str("127.0.0.1").unwrap().into_ipv6_addr()),
)?;
writer.commit()?;
}
let agg_req: Aggregations = serde_json::from_value(json!({
"my_bool": {
"terms": {
"field": "ip_field"
},
}
}))
.unwrap();
let res = exec_request_with_query(agg_req, &index, None)?;
// print as json
// println!("{}", serde_json::to_string_pretty(&res).unwrap());
assert_eq!(res["my_bool"]["buckets"][0]["key"], "::1");
assert_eq!(res["my_bool"]["buckets"][0]["doc_count"], 2);
assert_eq!(res["my_bool"]["buckets"][1]["key"], "127.0.0.1");
assert_eq!(res["my_bool"]["buckets"][1]["doc_count"], 1);
assert_eq!(res["my_bool"]["buckets"][2]["key"], serde_json::Value::Null);
Ok(())
}
} }

View File

@@ -5,6 +5,7 @@
use std::cmp::Ordering; use std::cmp::Ordering;
use std::collections::hash_map::Entry; use std::collections::hash_map::Entry;
use std::hash::Hash; use std::hash::Hash;
use std::net::Ipv6Addr;
use columnar::ColumnType; use columnar::ColumnType;
use itertools::Itertools; use itertools::Itertools;
@@ -41,6 +42,8 @@ pub struct IntermediateAggregationResults {
/// This might seem redundant with `Key`, but the point is to have a different /// This might seem redundant with `Key`, but the point is to have a different
/// Serialize implementation. /// Serialize implementation.
pub enum IntermediateKey { pub enum IntermediateKey {
/// Ip Addr key
IpAddr(Ipv6Addr),
/// Bool key /// Bool key
Bool(bool), Bool(bool),
/// String key /// String key
@@ -60,6 +63,14 @@ impl From<IntermediateKey> for Key {
fn from(value: IntermediateKey) -> Self { fn from(value: IntermediateKey) -> Self {
match value { match value {
IntermediateKey::Str(s) => Self::Str(s), IntermediateKey::Str(s) => Self::Str(s),
IntermediateKey::IpAddr(s) => {
// Prefer to use the IPv4 representation if possible
if let Some(ip) = s.to_ipv4_mapped() {
Self::Str(ip.to_string())
} else {
Self::Str(s.to_string())
}
}
IntermediateKey::F64(f) => Self::F64(f), IntermediateKey::F64(f) => Self::F64(f),
IntermediateKey::Bool(f) => Self::F64(f as u64 as f64), IntermediateKey::Bool(f) => Self::F64(f as u64 as f64),
} }
@@ -75,6 +86,7 @@ impl std::hash::Hash for IntermediateKey {
IntermediateKey::Str(text) => text.hash(state), IntermediateKey::Str(text) => text.hash(state),
IntermediateKey::F64(val) => val.to_bits().hash(state), IntermediateKey::F64(val) => val.to_bits().hash(state),
IntermediateKey::Bool(val) => val.hash(state), IntermediateKey::Bool(val) => val.hash(state),
IntermediateKey::IpAddr(val) => val.hash(state),
} }
} }
} }

View File

@@ -274,6 +274,10 @@ pub trait SegmentCollector: 'static {
fn collect(&mut self, doc: DocId, score: Score); fn collect(&mut self, doc: DocId, score: Score);
/// The query pushes the scored document to the collector via this method. /// The query pushes the scored document to the collector via this method.
/// This method is used when the collector does not require scoring.
///
/// See [`COLLECT_BLOCK_BUFFER_LEN`](crate::COLLECT_BLOCK_BUFFER_LEN) for the
/// buffer size passed to the collector.
fn collect_block(&mut self, docs: &[DocId]) { fn collect_block(&mut self, docs: &[DocId]) {
for doc in docs { for doc in docs {
self.collect(*doc, 0.0); self.collect(*doc, 0.0);

View File

@@ -52,10 +52,16 @@ impl<TCollector: Collector> Collector for CollectorWrapper<TCollector> {
impl SegmentCollector for Box<dyn BoxableSegmentCollector> { impl SegmentCollector for Box<dyn BoxableSegmentCollector> {
type Fruit = Box<dyn Fruit>; type Fruit = Box<dyn Fruit>;
#[inline]
fn collect(&mut self, doc: u32, score: Score) { fn collect(&mut self, doc: u32, score: Score) {
self.as_mut().collect(doc, score); self.as_mut().collect(doc, score);
} }
#[inline]
fn collect_block(&mut self, docs: &[DocId]) {
self.as_mut().collect_block(docs);
}
fn harvest(self) -> Box<dyn Fruit> { fn harvest(self) -> Box<dyn Fruit> {
BoxableSegmentCollector::harvest_from_box(self) BoxableSegmentCollector::harvest_from_box(self)
} }
@@ -63,6 +69,11 @@ impl SegmentCollector for Box<dyn BoxableSegmentCollector> {
pub trait BoxableSegmentCollector { pub trait BoxableSegmentCollector {
fn collect(&mut self, doc: u32, score: Score); fn collect(&mut self, doc: u32, score: Score);
fn collect_block(&mut self, docs: &[DocId]) {
for &doc in docs {
self.collect(doc, 0.0);
}
}
fn harvest_from_box(self: Box<Self>) -> Box<dyn Fruit>; fn harvest_from_box(self: Box<Self>) -> Box<dyn Fruit>;
} }
@@ -71,9 +82,14 @@ pub struct SegmentCollectorWrapper<TSegmentCollector: SegmentCollector>(TSegment
impl<TSegmentCollector: SegmentCollector> BoxableSegmentCollector impl<TSegmentCollector: SegmentCollector> BoxableSegmentCollector
for SegmentCollectorWrapper<TSegmentCollector> for SegmentCollectorWrapper<TSegmentCollector>
{ {
#[inline]
fn collect(&mut self, doc: u32, score: Score) { fn collect(&mut self, doc: u32, score: Score) {
self.0.collect(doc, score); self.0.collect(doc, score);
} }
#[inline]
fn collect_block(&mut self, docs: &[DocId]) {
self.0.collect_block(docs);
}
fn harvest_from_box(self: Box<Self>) -> Box<dyn Fruit> { fn harvest_from_box(self: Box<Self>) -> Box<dyn Fruit> {
Box::new(self.0.harvest()) Box::new(self.0.harvest())

View File

@@ -137,7 +137,6 @@ mod mmap_specific {
use tempfile::TempDir; use tempfile::TempDir;
use super::*; use super::*;
use crate::Directory;
#[test] #[test]
fn test_index_on_commit_reload_policy_mmap() -> crate::Result<()> { fn test_index_on_commit_reload_policy_mmap() -> crate::Result<()> {

View File

@@ -9,7 +9,10 @@ use crate::DocId;
/// to compare `[u32; 4]`. /// to compare `[u32; 4]`.
pub const TERMINATED: DocId = i32::MAX as u32; pub const TERMINATED: DocId = i32::MAX as u32;
pub const BUFFER_LEN: usize = 64; /// The collect_block method on `SegmentCollector` uses a buffer of this size.
/// Passed results to `collect_block` will not exceed this size and will be
/// exactly this size as long as we can fill the buffer.
pub const COLLECT_BLOCK_BUFFER_LEN: usize = 64;
/// Represents an iterable set of sorted doc ids. /// Represents an iterable set of sorted doc ids.
pub trait DocSet: Send { pub trait DocSet: Send {
@@ -61,7 +64,7 @@ pub trait DocSet: Send {
/// This method is only here for specific high-performance /// This method is only here for specific high-performance
/// use case where batching. The normal way to /// use case where batching. The normal way to
/// go through the `DocId`'s is to call `.advance()`. /// go through the `DocId`'s is to call `.advance()`.
fn fill_buffer(&mut self, buffer: &mut [DocId; BUFFER_LEN]) -> usize { fn fill_buffer(&mut self, buffer: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN]) -> usize {
if self.doc() == TERMINATED { if self.doc() == TERMINATED {
return 0; return 0;
} }
@@ -151,7 +154,7 @@ impl<TDocSet: DocSet + ?Sized> DocSet for Box<TDocSet> {
unboxed.seek(target) unboxed.seek(target)
} }
fn fill_buffer(&mut self, buffer: &mut [DocId; BUFFER_LEN]) -> usize { fn fill_buffer(&mut self, buffer: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN]) -> usize {
let unboxed: &mut TDocSet = self.borrow_mut(); let unboxed: &mut TDocSet = self.borrow_mut();
unboxed.fill_buffer(buffer) unboxed.fill_buffer(buffer)
} }

View File

@@ -158,8 +158,7 @@ mod tests_indexsorting {
use crate::indexer::doc_id_mapping::DocIdMapping; use crate::indexer::doc_id_mapping::DocIdMapping;
use crate::indexer::NoMergePolicy; use crate::indexer::NoMergePolicy;
use crate::query::QueryParser; use crate::query::QueryParser;
use crate::schema::document::Value; use crate::schema::*;
use crate::schema::{Schema, *};
use crate::{DocAddress, Index, IndexSettings, IndexSortByField, Order}; use crate::{DocAddress, Index, IndexSettings, IndexSortByField, Order};
fn create_test_index( fn create_test_index(

View File

@@ -806,7 +806,6 @@ mod tests {
use columnar::{Cardinality, Column, MonotonicallyMappableToU128}; use columnar::{Cardinality, Column, MonotonicallyMappableToU128};
use itertools::Itertools; use itertools::Itertools;
use proptest::prop_oneof; use proptest::prop_oneof;
use proptest::strategy::Strategy;
use super::super::operation::UserOperation; use super::super::operation::UserOperation;
use crate::collector::TopDocs; use crate::collector::TopDocs;

View File

@@ -144,10 +144,10 @@ mod tests {
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use super::*; use super::*;
use crate::index::{SegmentId, SegmentMeta, SegmentMetaInventory}; use crate::index::SegmentMetaInventory;
use crate::indexer::merge_policy::MergePolicy; use crate::indexer::merge_policy::MergePolicy;
use crate::schema;
use crate::schema::INDEXED; use crate::schema::INDEXED;
use crate::{schema, SegmentId};
static INVENTORY: Lazy<SegmentMetaInventory> = Lazy::new(SegmentMetaInventory::default); static INVENTORY: Lazy<SegmentMetaInventory> = Lazy::new(SegmentMetaInventory::default);

View File

@@ -103,7 +103,7 @@ impl SegmentRegister {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::index::{SegmentId, SegmentMetaInventory}; use crate::index::SegmentMetaInventory;
use crate::indexer::delete_queue::*; use crate::indexer::delete_queue::*;
fn segment_ids(segment_register: &SegmentRegister) -> Vec<SegmentId> { fn segment_ids(segment_register: &SegmentRegister) -> Vec<SegmentId> {

View File

@@ -213,7 +213,7 @@ pub use common::{f64_to_u64, i64_to_u64, u64_to_f64, u64_to_i64, HasLen};
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
pub use self::docset::{DocSet, TERMINATED}; pub use self::docset::{DocSet, COLLECT_BLOCK_BUFFER_LEN, TERMINATED};
#[deprecated( #[deprecated(
since = "0.22.0", since = "0.22.0",
note = "Will be removed in tantivy 0.23. Use export from snippet module instead" note = "Will be removed in tantivy 0.23. Use export from snippet module instead"
@@ -391,7 +391,6 @@ pub mod tests {
use crate::index::SegmentReader; use crate::index::SegmentReader;
use crate::merge_policy::NoMergePolicy; use crate::merge_policy::NoMergePolicy;
use crate::query::BooleanQuery; use crate::query::BooleanQuery;
use crate::schema::document::Value;
use crate::schema::*; use crate::schema::*;
use crate::{DateTime, DocAddress, Index, IndexWriter, Postings, ReloadPolicy}; use crate::{DateTime, DocAddress, Index, IndexWriter, Postings, ReloadPolicy};

View File

@@ -204,7 +204,11 @@ impl<Rec: Recorder> SpecializedPostingsWriter<Rec> {
impl<Rec: Recorder> PostingsWriter for SpecializedPostingsWriter<Rec> { impl<Rec: Recorder> PostingsWriter for SpecializedPostingsWriter<Rec> {
#[inline] #[inline]
fn subscribe(&mut self, doc: DocId, position: u32, term: &Term, ctx: &mut IndexingContext) { fn subscribe(&mut self, doc: DocId, position: u32, term: &Term, ctx: &mut IndexingContext) {
debug_assert!(term.serialized_term().len() >= 4); assert!(
term.serialized_term().len() >= 4,
"Term too short expect >=4 but got {:?}",
term.serialized_term()
);
self.total_num_tokens += 1; self.total_num_tokens += 1;
let (term_index, arena) = (&mut ctx.term_index, &mut ctx.arena); let (term_index, arena) = (&mut ctx.term_index, &mut ctx.arena);
term_index.mutate_or_create(term.serialized_term(), |opt_recorder: Option<Rec>| { term_index.mutate_or_create(term.serialized_term(), |opt_recorder: Option<Rec>| {

View File

@@ -1,4 +1,4 @@
use crate::docset::{DocSet, BUFFER_LEN, TERMINATED}; use crate::docset::{DocSet, COLLECT_BLOCK_BUFFER_LEN, TERMINATED};
use crate::index::SegmentReader; use crate::index::SegmentReader;
use crate::query::boost_query::BoostScorer; use crate::query::boost_query::BoostScorer;
use crate::query::explanation::does_not_match; use crate::query::explanation::does_not_match;
@@ -54,7 +54,7 @@ impl DocSet for AllScorer {
self.doc self.doc
} }
fn fill_buffer(&mut self, buffer: &mut [DocId; BUFFER_LEN]) -> usize { fn fill_buffer(&mut self, buffer: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN]) -> usize {
if self.doc() == TERMINATED { if self.doc() == TERMINATED {
return 0; return 0;
} }
@@ -96,7 +96,7 @@ impl Scorer for AllScorer {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::AllQuery; use super::AllQuery;
use crate::docset::{DocSet, BUFFER_LEN, TERMINATED}; use crate::docset::{DocSet, COLLECT_BLOCK_BUFFER_LEN, TERMINATED};
use crate::query::{AllScorer, EnableScoring, Query}; use crate::query::{AllScorer, EnableScoring, Query};
use crate::schema::{Schema, TEXT}; use crate::schema::{Schema, TEXT};
use crate::{Index, IndexWriter}; use crate::{Index, IndexWriter};
@@ -162,16 +162,16 @@ mod tests {
pub fn test_fill_buffer() { pub fn test_fill_buffer() {
let mut postings = AllScorer { let mut postings = AllScorer {
doc: 0u32, doc: 0u32,
max_doc: BUFFER_LEN as u32 * 2 + 9, max_doc: COLLECT_BLOCK_BUFFER_LEN as u32 * 2 + 9,
}; };
let mut buffer = [0u32; BUFFER_LEN]; let mut buffer = [0u32; COLLECT_BLOCK_BUFFER_LEN];
assert_eq!(postings.fill_buffer(&mut buffer), BUFFER_LEN); assert_eq!(postings.fill_buffer(&mut buffer), COLLECT_BLOCK_BUFFER_LEN);
for i in 0u32..BUFFER_LEN as u32 { for i in 0u32..COLLECT_BLOCK_BUFFER_LEN as u32 {
assert_eq!(buffer[i as usize], i); assert_eq!(buffer[i as usize], i);
} }
assert_eq!(postings.fill_buffer(&mut buffer), BUFFER_LEN); assert_eq!(postings.fill_buffer(&mut buffer), COLLECT_BLOCK_BUFFER_LEN);
for i in 0u32..BUFFER_LEN as u32 { for i in 0u32..COLLECT_BLOCK_BUFFER_LEN as u32 {
assert_eq!(buffer[i as usize], i + BUFFER_LEN as u32); assert_eq!(buffer[i as usize], i + COLLECT_BLOCK_BUFFER_LEN as u32);
} }
assert_eq!(postings.fill_buffer(&mut buffer), 9); assert_eq!(postings.fill_buffer(&mut buffer), 9);
} }

View File

@@ -1,6 +1,6 @@
use std::collections::HashMap; use std::collections::HashMap;
use crate::docset::BUFFER_LEN; use crate::docset::COLLECT_BLOCK_BUFFER_LEN;
use crate::index::SegmentReader; use crate::index::SegmentReader;
use crate::postings::FreqReadingOption; use crate::postings::FreqReadingOption;
use crate::query::explanation::does_not_match; use crate::query::explanation::does_not_match;
@@ -228,7 +228,7 @@ impl<TScoreCombiner: ScoreCombiner + Sync> Weight for BooleanWeight<TScoreCombin
callback: &mut dyn FnMut(&[DocId]), callback: &mut dyn FnMut(&[DocId]),
) -> crate::Result<()> { ) -> crate::Result<()> {
let scorer = self.complex_scorer(reader, 1.0, || DoNothingCombiner)?; let scorer = self.complex_scorer(reader, 1.0, || DoNothingCombiner)?;
let mut buffer = [0u32; BUFFER_LEN]; let mut buffer = [0u32; COLLECT_BLOCK_BUFFER_LEN];
match scorer { match scorer {
SpecializedScorer::TermUnion(term_scorers) => { SpecializedScorer::TermUnion(term_scorers) => {

View File

@@ -1,6 +1,6 @@
use std::fmt; use std::fmt;
use crate::docset::BUFFER_LEN; use crate::docset::COLLECT_BLOCK_BUFFER_LEN;
use crate::fastfield::AliveBitSet; use crate::fastfield::AliveBitSet;
use crate::query::{EnableScoring, Explanation, Query, Scorer, Weight}; use crate::query::{EnableScoring, Explanation, Query, Scorer, Weight};
use crate::{DocId, DocSet, Score, SegmentReader, Term}; use crate::{DocId, DocSet, Score, SegmentReader, Term};
@@ -105,7 +105,7 @@ impl<S: Scorer> DocSet for BoostScorer<S> {
self.underlying.seek(target) self.underlying.seek(target)
} }
fn fill_buffer(&mut self, buffer: &mut [DocId; BUFFER_LEN]) -> usize { fn fill_buffer(&mut self, buffer: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN]) -> usize {
self.underlying.fill_buffer(buffer) self.underlying.fill_buffer(buffer)
} }

View File

@@ -1,6 +1,6 @@
use std::fmt; use std::fmt;
use crate::docset::BUFFER_LEN; use crate::docset::COLLECT_BLOCK_BUFFER_LEN;
use crate::query::{EnableScoring, Explanation, Query, Scorer, Weight}; use crate::query::{EnableScoring, Explanation, Query, Scorer, Weight};
use crate::{DocId, DocSet, Score, SegmentReader, TantivyError, Term}; use crate::{DocId, DocSet, Score, SegmentReader, TantivyError, Term};
@@ -119,7 +119,7 @@ impl<TDocSet: DocSet> DocSet for ConstScorer<TDocSet> {
self.docset.seek(target) self.docset.seek(target)
} }
fn fill_buffer(&mut self, buffer: &mut [DocId; BUFFER_LEN]) -> usize { fn fill_buffer(&mut self, buffer: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN]) -> usize {
self.docset.fill_buffer(buffer) self.docset.fill_buffer(buffer)
} }

View File

@@ -1,5 +1,5 @@
use super::term_scorer::TermScorer; use super::term_scorer::TermScorer;
use crate::docset::{DocSet, BUFFER_LEN}; use crate::docset::{DocSet, COLLECT_BLOCK_BUFFER_LEN};
use crate::fieldnorm::FieldNormReader; use crate::fieldnorm::FieldNormReader;
use crate::index::SegmentReader; use crate::index::SegmentReader;
use crate::postings::SegmentPostings; use crate::postings::SegmentPostings;
@@ -64,7 +64,7 @@ impl Weight for TermWeight {
callback: &mut dyn FnMut(&[DocId]), callback: &mut dyn FnMut(&[DocId]),
) -> crate::Result<()> { ) -> crate::Result<()> {
let mut scorer = self.specialized_scorer(reader, 1.0)?; let mut scorer = self.specialized_scorer(reader, 1.0)?;
let mut buffer = [0u32; BUFFER_LEN]; let mut buffer = [0u32; COLLECT_BLOCK_BUFFER_LEN];
for_each_docset_buffered(&mut scorer, &mut buffer, callback); for_each_docset_buffered(&mut scorer, &mut buffer, callback);
Ok(()) Ok(())
} }

View File

@@ -53,7 +53,7 @@ impl HasLen for VecDocSet {
pub mod tests { pub mod tests {
use super::*; use super::*;
use crate::docset::{DocSet, BUFFER_LEN}; use crate::docset::{DocSet, COLLECT_BLOCK_BUFFER_LEN};
use crate::DocId; use crate::DocId;
#[test] #[test]
@@ -72,16 +72,16 @@ pub mod tests {
#[test] #[test]
pub fn test_fill_buffer() { pub fn test_fill_buffer() {
let doc_ids: Vec<DocId> = (1u32..=(BUFFER_LEN as u32 * 2 + 9)).collect(); let doc_ids: Vec<DocId> = (1u32..=(COLLECT_BLOCK_BUFFER_LEN as u32 * 2 + 9)).collect();
let mut postings = VecDocSet::from(doc_ids); let mut postings = VecDocSet::from(doc_ids);
let mut buffer = [0u32; BUFFER_LEN]; let mut buffer = [0u32; COLLECT_BLOCK_BUFFER_LEN];
assert_eq!(postings.fill_buffer(&mut buffer), BUFFER_LEN); assert_eq!(postings.fill_buffer(&mut buffer), COLLECT_BLOCK_BUFFER_LEN);
for i in 0u32..BUFFER_LEN as u32 { for i in 0u32..COLLECT_BLOCK_BUFFER_LEN as u32 {
assert_eq!(buffer[i as usize], i + 1); assert_eq!(buffer[i as usize], i + 1);
} }
assert_eq!(postings.fill_buffer(&mut buffer), BUFFER_LEN); assert_eq!(postings.fill_buffer(&mut buffer), COLLECT_BLOCK_BUFFER_LEN);
for i in 0u32..BUFFER_LEN as u32 { for i in 0u32..COLLECT_BLOCK_BUFFER_LEN as u32 {
assert_eq!(buffer[i as usize], i + 1 + BUFFER_LEN as u32); assert_eq!(buffer[i as usize], i + 1 + COLLECT_BLOCK_BUFFER_LEN as u32);
} }
assert_eq!(postings.fill_buffer(&mut buffer), 9); assert_eq!(postings.fill_buffer(&mut buffer), 9);
} }

View File

@@ -1,5 +1,5 @@
use super::Scorer; use super::Scorer;
use crate::docset::BUFFER_LEN; use crate::docset::COLLECT_BLOCK_BUFFER_LEN;
use crate::index::SegmentReader; use crate::index::SegmentReader;
use crate::query::Explanation; use crate::query::Explanation;
use crate::{DocId, DocSet, Score, TERMINATED}; use crate::{DocId, DocSet, Score, TERMINATED};
@@ -22,7 +22,7 @@ pub(crate) fn for_each_scorer<TScorer: Scorer + ?Sized>(
#[inline] #[inline]
pub(crate) fn for_each_docset_buffered<T: DocSet + ?Sized>( pub(crate) fn for_each_docset_buffered<T: DocSet + ?Sized>(
docset: &mut T, docset: &mut T,
buffer: &mut [DocId; BUFFER_LEN], buffer: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN],
mut callback: impl FnMut(&[DocId]), mut callback: impl FnMut(&[DocId]),
) { ) {
loop { loop {
@@ -105,7 +105,7 @@ pub trait Weight: Send + Sync + 'static {
) -> crate::Result<()> { ) -> crate::Result<()> {
let mut docset = self.scorer(reader, 1.0)?; let mut docset = self.scorer(reader, 1.0)?;
let mut buffer = [0u32; BUFFER_LEN]; let mut buffer = [0u32; COLLECT_BLOCK_BUFFER_LEN];
for_each_docset_buffered(&mut docset, &mut buffer, callback); for_each_docset_buffered(&mut docset, &mut buffer, callback);
Ok(()) Ok(())
} }

View File

@@ -1,6 +1,5 @@
mod warming; mod warming;
use std::convert::TryInto;
use std::sync::atomic::AtomicU64; use std::sync::atomic::AtomicU64;
use std::sync::{atomic, Arc, Weak}; use std::sync::{atomic, Arc, Weak};

View File

@@ -288,7 +288,6 @@ impl TermInfoStoreWriter {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use common;
use common::BinarySerializable; use common::BinarySerializable;
use tantivy_bitpacker::{compute_num_bits, BitPacker}; use tantivy_bitpacker::{compute_num_bits, BitPacker};

View File

@@ -1,7 +1,7 @@
use std::borrow::Cow; use std::borrow::Cow;
use std::mem; use std::mem;
use rust_stemmers::{self, Algorithm}; use rust_stemmers::Algorithm;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use super::{Token, TokenFilter, TokenStream, Tokenizer}; use super::{Token, TokenFilter, TokenStream, Tokenizer};

View File

@@ -95,7 +95,6 @@ impl TokenStream for PreTokenizedStream {
mod tests { mod tests {
use super::*; use super::*;
use crate::tokenizer::Token;
#[test] #[test]
fn test_tokenized_stream() { fn test_tokenized_stream() {