Compare commits

..

1 Commits

Author SHA1 Message Date
Paul Masurel
d30cafa80a Fixes the behavior of the range doc set, especially on multivalued
cardinality.

We stop scanning from the beginning of the column which is a great
improvement.
This is done by introducing a stateful `SelectCursor`.
2023-02-13 04:02:14 +01:00
91 changed files with 1194 additions and 793 deletions

View File

@@ -2,9 +2,9 @@ name: Coverage
on:
push:
branches: [main]
branches: [ main ]
pull_request:
branches: [main]
branches: [ main ]
jobs:
coverage:
@@ -16,7 +16,7 @@ jobs:
- uses: Swatinem/rust-cache@v2
- uses: taiki-e/install-action@cargo-llvm-cov
- name: Generate code coverage
run: cargo +nightly llvm-cov --all-features --workspace --doctests --lcov --output-path lcov.info
run: cargo +nightly llvm-cov --all-features --workspace --lcov --output-path lcov.info
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v3
continue-on-error: true

23
ci/before_deploy.ps1 Normal file
View File

@@ -0,0 +1,23 @@
# This script takes care of packaging the build artifacts that will go in the
# release zipfile
# Remember where we started: the zip is written there and we return to it at the end.
$SRC_DIR = $PWD.Path
# Stage the artifacts in a uniquely named (GUID) directory under the temp dir.
$STAGE = [System.Guid]::NewGuid().ToString()
Set-Location $ENV:Temp
New-Item -Type Directory -Name $STAGE
Set-Location $STAGE
# Archive name is <crate>-<tag>-<target>.zip, placed in the source directory.
$ZIP = "$SRC_DIR\$($Env:CRATE_NAME)-$($Env:APPVEYOR_REPO_TAG_NAME)-$($Env:TARGET).zip"
# TODO Update this to package the right artifacts
Copy-Item "$SRC_DIR\target\$($Env:TARGET)\release\hello.exe" '.\'
# Add everything in the staging directory to the archive and hand it to AppVeyor.
7z a "$ZIP" *
Push-AppveyorArtifact "$ZIP"
# Clean up: empty the staging directory, remove it, and go back to the source directory.
Remove-Item *.* -Force
Set-Location ..
Remove-Item $STAGE
Set-Location $SRC_DIR

33
ci/before_deploy.sh Normal file
View File

@@ -0,0 +1,33 @@
# This script takes care of building your crate and packaging it for release
set -ex

# Builds the release binary for $TARGET and packs it into
# $CRATE_NAME-$TRAVIS_TAG-$TARGET.tar.gz in the directory the script was started from.
main() {
    local src stage
    src=$(pwd)
    stage=

    # The staging directory is created differently per OS: on osx `mktemp` is
    # invoked with an explicit `-t tmp` prefix.
    case "$TRAVIS_OS_NAME" in
        linux)
            stage=$(mktemp -d)
            ;;
        osx)
            stage=$(mktemp -d -t tmp)
            ;;
        *)
            # Without a staging directory the `cp`/`cd`/`tar` below would operate
            # on `/` and `$HOME`; refuse to continue instead.
            echo "unsupported TRAVIS_OS_NAME: '$TRAVIS_OS_NAME'" >&2
            return 1
            ;;
    esac

    test -f Cargo.lock || cargo generate-lockfile

    # TODO Update this to build the artifacts that matter to you
    cross rustc --bin hello --target "$TARGET" --release -- -C lto

    # TODO Update this to package the right artifacts
    cp "target/$TARGET/release/hello" "$stage/"

    # Archive the content of the staging directory, then clean it up.
    cd "$stage"
    tar czf "$src/$CRATE_NAME-$TRAVIS_TAG-$TARGET.tar.gz" *
    cd "$src"
    rm -rf "$stage"
}

main

47
ci/install.sh Normal file
View File

@@ -0,0 +1,47 @@
set -ex

# Installs what the build needs: the iOS rustup target when $TARGET asks for one,
# and the latest tagged release of `cross` for the host platform.
main() {
    local target=
    local sort=
    if [ "$TRAVIS_OS_NAME" = linux ]; then
        target=x86_64-unknown-linux-musl
        sort=sort
    else
        target=x86_64-apple-darwin
        sort=gsort # for `sort --version-sort`, from brew's coreutils.
    fi

    # Builds for iOS are done on OSX, but require the specific target to be
    # installed.
    case "$TARGET" in
        aarch64-apple-ios | armv7-apple-ios | armv7s-apple-ios | i386-apple-ios | x86_64-apple-ios)
            rustup target install "$TARGET"
            ;;
    esac

    # This fetches latest stable release: list the remote tags, keep the
    # version-looking ones, and pick the highest one in version order.
    local tag
    tag=$(git ls-remote --tags --refs --exit-code https://github.com/japaric/cross \
        | cut -d/ -f3 \
        | grep -E '^v[0.1.0-9.]+$' \
        | "$sort" --version-sort \
        | tail -n1)

    curl -LSfs https://japaric.github.io/trust/install.sh | \
        sh -s -- \
            --force \
            --git japaric/cross \
            --tag "$tag" \
            --target "$target"
}

main

30
ci/script.sh Normal file
View File

@@ -0,0 +1,30 @@
#!/usr/bin/env bash
# This script takes care of testing your crate
set -ex

# Runs either the coverage build (when $CODECOV is set) or the cross build and
# tests for $TARGET, then runs every example found in `examples/`.
main() {
    if [ -n "$CODECOV" ]; then
        echo "Codecov"
        cargo build --verbose && cargo coverage --verbose --all && bash <(curl -s https://codecov.io/bash) -s target/kcov
    else
        echo "Build"
        cross build --target "$TARGET"
        # Setting $DISABLE_TESTS stops right after the build: no tests, no examples.
        if [ -n "$DISABLE_TESTS" ]; then
            return
        fi
        echo "Test"
        cross test --target "$TARGET" --no-default-features --features mmap
        cross test --target "$TARGET" --no-default-features --features mmap query-grammar
    fi

    # Iterate over the glob directly instead of parsing `ls` output, so that
    # unusual file names are passed through intact.
    for example in examples/*.rs; do
        # An unmatched glob is left as the literal pattern: skip it.
        [ -e "$example" ] || continue
        cargo run --example "$(basename "$example" .rs)"
    done
}

# we don't run the "test phase" when doing deploys
if [ -z "$TRAVIS_TAG" ]; then
    main
fi

View File

@@ -22,6 +22,11 @@ tantivy-bitpacker = { version= "0.3", path = "../bitpacker/" }
proptest = "1"
more-asserts = "0.3.1"
rand = "0.8.5"
criterion = "0.4"
[features]
unstable = []
[[bench]]
name = "bench_index"
harness = false

View File

@@ -0,0 +1,91 @@
use std::ops::Range;
use criterion::*;
use rand::prelude::*;
use tantivy_columnar::column_index::MultiValueIndex;
use tantivy_columnar::RowId;
const WINDOW: usize = 40;
fn bench_multi_value_index_util(
len_range: Range<u32>,
num_rows: RowId,
select_value_ratio: f64,
b: &mut criterion::Bencher,
) {
let mut start_index: Vec<RowId> = vec![0u32];
let mut cursor: u32 = 0u32;
let mut rng = StdRng::from_seed([16u8; 32]);
for i in 0..num_rows {
let num_vals = rng.gen_range(len_range.clone());
cursor += num_vals;
start_index.push(cursor);
}
let select_rows: Vec<RowId> = (0u32..cursor)
.filter(|i| rng.gen_bool(select_value_ratio))
.collect();
let mv_index = MultiValueIndex::for_test(&start_index);
// mv_index.select_batch_in_place(0, &mut select_rows[..]);
let mut buffer = Vec::new();
b.iter(|| {
let mut start_row = 0u32;
let mut len = 0;
for chunk in select_rows.chunks(WINDOW) {
buffer.clear();
buffer.extend_from_slice(chunk);
mv_index.select_batch_in_place(start_row, &mut buffer);
start_row = buffer.last().copied().unwrap();
len += buffer.len()
}
assert_eq!(len, 4303);
len
});
}
fn bench_multi_value_index_util2(
len_range: Range<u32>,
num_rows: RowId,
select_value_ratio: f64,
b: &mut criterion::Bencher,
) {
let mut start_index: Vec<RowId> = vec![0u32];
let mut cursor: u32 = 0u32;
let mut rng = StdRng::from_seed([16u8; 32]);
for i in 0..num_rows {
let num_vals = rng.gen_range(len_range.clone());
cursor += num_vals;
start_index.push(cursor);
}
let select_rows: Vec<RowId> = (0u32..cursor)
.filter(|i| rng.gen_bool(select_value_ratio))
.collect();
let mv_index = MultiValueIndex::for_test(&start_index);
// mv_index.select_batch_in_place(0, &mut select_rows[..]);
let mut buffer = Vec::new();
b.iter(|| {
let mut mv_index_cursor = mv_index.select_cursor();
let mut len = 0;
for chunk in select_rows.chunks(WINDOW) {
buffer.clear();
buffer.extend_from_slice(chunk);
mv_index_cursor.select_batch_in_place(&mut buffer);
len += buffer.len();
}
assert_eq!(len, 4303);
len
});
}
fn select_benchmark(c: &mut criterion::Criterion) {
c.bench_function("bench_multi_value_index_10_100", |b| {
bench_multi_value_index_util(0..10, 100_000, 0.01f64, b)
});
c.bench_function("bench_multi_value_cursor_index_10_100", |b| {
bench_multi_value_index_util2(0..10, 100_000, 0.01f64, b)
});
}
criterion_group!(benches, select_benchmark);
criterion_main!(benches);

View File

@@ -58,7 +58,7 @@ fn bench_intfastfield_getrange_u128_50percent_hit(b: &mut Bencher) {
b.iter(|| {
let mut positions = Vec::new();
column.get_row_ids_for_value_range(
column.get_docids_for_value_range(
*FIFTY_PERCENT_RANGE.start() as u128..=*FIFTY_PERCENT_RANGE.end() as u128,
0..data.len() as u32,
&mut positions,
@@ -74,7 +74,7 @@ fn bench_intfastfield_getrange_u128_single_hit(b: &mut Bencher) {
b.iter(|| {
let mut positions = Vec::new();
column.get_row_ids_for_value_range(
column.get_docids_for_value_range(
*SINGLE_ITEM_RANGE.start() as u128..=*SINGLE_ITEM_RANGE.end() as u128,
0..data.len() as u32,
&mut positions,
@@ -90,7 +90,7 @@ fn bench_intfastfield_getrange_u128_hit_all(b: &mut Bencher) {
b.iter(|| {
let mut positions = Vec::new();
column.get_row_ids_for_value_range(0..=u128::MAX, 0..data.len() as u32, &mut positions);
column.get_docids_for_value_range(0..=u128::MAX, 0..data.len() as u32, &mut positions);
positions
});
}

View File

@@ -89,7 +89,7 @@ fn bench_intfastfield_getrange_u64_50percent_hit(b: &mut Bencher) {
let column: Arc<dyn ColumnValues<u64>> = serialize_and_load(&data, CodecType::Bitpacked);
b.iter(|| {
let mut positions = Vec::new();
column.get_row_ids_for_value_range(
column.get_docids_for_value_range(
FIFTY_PERCENT_RANGE,
0..data.len() as u32,
&mut positions,
@@ -106,7 +106,7 @@ fn bench_intfastfield_getrange_u64_1percent_hit(b: &mut Bencher) {
b.iter(|| {
let mut positions = Vec::new();
column.get_row_ids_for_value_range(
column.get_docids_for_value_range(
ONE_PERCENT_ITEM_RANGE,
0..data.len() as u32,
&mut positions,
@@ -123,7 +123,7 @@ fn bench_intfastfield_getrange_u64_single_hit(b: &mut Bencher) {
b.iter(|| {
let mut positions = Vec::new();
column.get_row_ids_for_value_range(SINGLE_ITEM_RANGE, 0..data.len() as u32, &mut positions);
column.get_docids_for_value_range(SINGLE_ITEM_RANGE, 0..data.len() as u32, &mut positions);
positions
});
}
@@ -136,7 +136,7 @@ fn bench_intfastfield_getrange_u64_hit_all(b: &mut Bencher) {
b.iter(|| {
let mut positions = Vec::new();
column.get_row_ids_for_value_range(0..=u64::MAX, 0..data.len() as u32, &mut positions);
column.get_docids_for_value_range(0..=u64::MAX, 0..data.len() as u32, &mut positions);
positions
});
}

View File

@@ -11,7 +11,6 @@
# Perf and Size
* remove alloc in `ord_to_term`
+ multivalued range queries restart from the beginning all of the time.
* re-add ZSTD compression for dictionaries
no systematic monotonic mapping
consider removing multilinear

View File

@@ -32,7 +32,7 @@ impl BytesColumn {
/// Returns the number of rows in the column.
pub fn num_rows(&self) -> RowId {
self.term_ord_column.num_docs()
self.term_ord_column.num_rows()
}
pub fn term_ords(&self, row_id: RowId) -> impl Iterator<Item = u64> + '_ {

View File

@@ -3,7 +3,7 @@ mod serialize;
use std::fmt::Debug;
use std::io::Write;
use std::ops::{Deref, Range, RangeInclusive};
use std::ops::Deref;
use std::sync::Arc;
use common::BinarySerializable;
@@ -42,14 +42,14 @@ impl<T: PartialOrd + Copy + Debug + Send + Sync + 'static> Column<T> {
self.idx.get_cardinality()
}
pub fn num_docs(&self) -> RowId {
pub fn num_rows(&self) -> RowId {
match &self.idx {
ColumnIndex::Full => self.values.num_vals() as u32,
ColumnIndex::Optional(optional_index) => optional_index.num_docs(),
ColumnIndex::Optional(optional_index) => optional_index.num_rows(),
ColumnIndex::Multivalued(col_index) => {
// The multivalued index contains all value start row_id,
// and one extra value at the end with the overall number of rows.
col_index.num_docs()
col_index.num_rows()
}
}
}
@@ -71,25 +71,6 @@ impl<T: PartialOrd + Copy + Debug + Send + Sync + 'static> Column<T> {
.map(|value_row_id: RowId| self.values.get_val(value_row_id))
}
/// Get the docids of values which are in the provided value range.
#[inline]
pub fn get_docids_for_value_range(
&self,
value_range: RangeInclusive<T>,
selected_docid_range: Range<u32>,
docids: &mut Vec<u32>,
) {
// convert passed docid range to row id range
let rowid_range = self.idx.docid_range_to_rowids(selected_docid_range.clone());
// Load rows
self.values
.get_row_ids_for_value_range(value_range, rowid_range, docids);
// Convert rows to docids
self.idx
.select_batch_in_place(docids, selected_docid_range.start);
}
/// Fills the output vector with the (possibly multiple) values that are associated with
/// `row_id`.
///
@@ -151,8 +132,8 @@ impl<T: PartialOrd + Debug + Send + Sync + Copy + 'static> ColumnValues<T>
fn num_vals(&self) -> u32 {
match &self.column.idx {
ColumnIndex::Full => self.column.values.num_vals(),
ColumnIndex::Optional(optional_idx) => optional_idx.num_docs(),
ColumnIndex::Multivalued(multivalue_idx) => multivalue_idx.num_docs(),
ColumnIndex::Optional(optional_idx) => optional_idx.num_rows(),
ColumnIndex::Multivalued(multivalue_idx) => multivalue_idx.num_rows(),
}
}
}

View File

@@ -93,7 +93,11 @@ fn iter_num_values<'a>(
match column_index {
ColumnIndex::Full => 1,
ColumnIndex::Optional(optional_index) => {
u32::from(optional_index.contains(row_addr.row_id))
if optional_index.contains(row_addr.row_id) {
1u32
} else {
0u32
}
}
ColumnIndex::Multivalued(multivalued_index) => {
multivalued_index.range(row_addr.row_id).len() as u32

View File

@@ -9,8 +9,89 @@ pub use merge::merge_column_index;
pub use optional_index::{OptionalIndex, Set};
pub use serialize::{open_column_index, serialize_column_index, SerializableColumnIndex};
use crate::column_index::multivalued_index::MultiValueIndex;
use crate::{Cardinality, DocId, RowId};
pub use crate::column_index::multivalued_index::{MultiValueIndex, MultiValueIndexCursor};
use crate::column_index::optional_index::OptionalIndexSelectCursor;
use crate::{Cardinality, RowId};
/// Stateful cursor translating batches of ranks (value row ids) into row ids,
/// whatever the cardinality of the underlying column index.
///
/// It remembers the last rank it was handed so that consecutive calls can be
/// checked to supply strictly increasing ranks.
pub struct ColumnIndexSelectCursor {
// Last rank of the previous non-empty batch; `None` until a batch has been processed.
last_rank: Option<RowId>,
// Cardinality-specific cursor doing the actual rank -> row id translation.
cardinality_specific_impl: CardinalitySpecificSelectCursor,
}
impl From<CardinalitySpecificSelectCursor> for ColumnIndexSelectCursor {
fn from(cardinality_specific_impl: CardinalitySpecificSelectCursor) -> Self {
ColumnIndexSelectCursor {
last_rank: None,
cardinality_specific_impl,
}
}
}
/// The per-cardinality implementation backing `ColumnIndexSelectCursor`.
enum CardinalitySpecificSelectCursor {
// Full cardinality: ranks and row ids coincide, so no state is needed.
Full,
// Optional cardinality: delegates to the optional index cursor.
Optional(OptionalIndexSelectCursor),
// Multivalued cardinality: delegates to the multivalued index cursor.
Multivalued(MultiValueIndexCursor),
}
/// The point of this cursor object is to compute batches of `select` operations.
///
/// Regardless of cardinality, a column index can always be seen as a mapping
/// from row_id -> start_value_row_id. By definition, it is increasing:
/// if `left <= right`, then `column_index[left] <= column_index[right]`.
///
/// The select operation then identifies, given a value row id, which row it
/// belongs to: it is the inverse mapping.
///
/// As a more formal definition, `select(rank)` is defined as the only `i` such that
/// `mapping[i] <= rank` and `mapping[i+1] > rank`.
/// Another way to define it is to say that it is the last `i` such that
/// `mapping[i] <= rank`.
/// Finally, it can be defined as the number of `i` such that
/// `mapping[i] <= rank`, minus one.
///
/// `select_batch_in_place` is a complex function that computes
/// select operations in batches and in place.
///
/// For optimization reasons, it only supports being supplied strictly increasing
/// values of `rank_ids`, even across calls.
///
/// The caller is also required to only supply rank_ids lower than max(mapping).
/// Under those conditions, the returned `row_ids` are guaranteed to be unique.
///
/// # Panics
///
/// Panics if the supplied rank_ids are not increasing from one call to another.
/// We only check that the `rank_ids` Vec is increasing in debug mode for
/// performance reasons.
impl ColumnIndexSelectCursor {
/// Replaces, in place, the ranks in `rank_ids` with the row ids they belong to.
///
/// With a multivalued index the result can be shorter than the input, as
/// several ranks may map to the same row id. An empty `rank_ids` is a no-op.
pub fn select_batch_in_place(&mut self, rank_ids: &mut Vec<RowId>) {
// `rank_ids` has to be sorted.
debug_assert!(rank_ids.windows(2).all(|window| window[0] < window[1]));
// Two consecutive calls must pass strictly increasing `rank_ids`.
let (Some(first_rank), Some(new_last_rank)) = (rank_ids.first().copied(), rank_ids.last().copied()) else {
// rank_ids is empty, there is nothing to do.
return;
};
if let Some(last_rank) = self.last_rank {
assert!(last_rank < first_rank);
}
// Remember the last rank of this batch for the check on the next call.
self.last_rank = Some(new_last_rank);
match &mut self.cardinality_specific_impl {
CardinalitySpecificSelectCursor::Full => {
// No need to do anything:
// `value_idx` and `row_idx` are the same.
}
CardinalitySpecificSelectCursor::Optional(optional_index) => {
optional_index.select_batch_in_place(&mut rank_ids[..]);
}
CardinalitySpecificSelectCursor::Multivalued(multivalued_index) => {
// The multivalued cursor keeps its own row position between calls,
// so the scan resumes where the previous batch stopped.
multivalued_index.select_batch_in_place(rank_ids)
}
}
}
}
#[derive(Clone)]
pub enum ColumnIndex {
@@ -43,61 +124,39 @@ impl ColumnIndex {
}
/// Returns true if and only if there are at least one value associated to the row.
pub fn has_value(&self, doc_id: DocId) -> bool {
pub fn has_value(&self, row_id: RowId) -> bool {
match self {
ColumnIndex::Full => true,
ColumnIndex::Optional(optional_index) => optional_index.contains(doc_id),
ColumnIndex::Optional(optional_index) => optional_index.contains(row_id),
ColumnIndex::Multivalued(multivalued_index) => {
!multivalued_index.range(doc_id).is_empty()
multivalued_index.range(row_id).len() > 0
}
}
}
pub fn value_row_ids(&self, doc_id: DocId) -> Range<RowId> {
pub fn value_row_ids(&self, row_id: RowId) -> Range<RowId> {
match self {
ColumnIndex::Full => doc_id..doc_id + 1,
ColumnIndex::Full => row_id..row_id + 1,
ColumnIndex::Optional(optional_index) => {
if let Some(val) = optional_index.rank_if_exists(doc_id) {
if let Some(val) = optional_index.rank_if_exists(row_id) {
val..val + 1
} else {
0..0
}
}
ColumnIndex::Multivalued(multivalued_index) => multivalued_index.range(doc_id),
ColumnIndex::Multivalued(multivalued_index) => multivalued_index.range(row_id),
}
}
pub fn docid_range_to_rowids(&self, doc_id: Range<DocId>) -> Range<RowId> {
pub fn select_cursor(&self) -> ColumnIndexSelectCursor {
match self {
ColumnIndex::Full => doc_id,
ColumnIndex::Full => CardinalitySpecificSelectCursor::Full.into(),
ColumnIndex::Optional(optional_index) => {
let row_start = optional_index.rank(doc_id.start);
let row_end = optional_index.rank(doc_id.end);
row_start..row_end
CardinalitySpecificSelectCursor::Optional(optional_index.select_cursor()).into()
}
ColumnIndex::Multivalued(multivalued_index) => {
let end_docid = doc_id.end.min(multivalued_index.num_docs() - 1) + 1;
let start_docid = doc_id.start.min(end_docid);
let row_start = multivalued_index.start_index_column.get_val(start_docid);
let row_end = multivalued_index.start_index_column.get_val(end_docid);
row_start..row_end
}
}
}
pub fn select_batch_in_place(&self, rank_ids: &mut Vec<RowId>, doc_id_start: DocId) {
match self {
ColumnIndex::Full => {
// No need to do anything:
// value_idx and row_idx are the same.
}
ColumnIndex::Optional(optional_index) => {
optional_index.select_batch(&mut rank_ids[..]);
}
ColumnIndex::Multivalued(multivalued_index) => {
multivalued_index.select_batch_in_place(doc_id_start, rank_ids)
CardinalitySpecificSelectCursor::Multivalued(multivalued_index.select_cursor())
.into()
}
}
}

View File

@@ -8,7 +8,7 @@ use common::OwnedBytes;
use crate::column_values::u64_based::CodecType;
use crate::column_values::ColumnValues;
use crate::iterable::Iterable;
use crate::{DocId, RowId};
use crate::RowId;
pub fn serialize_multivalued_index(
multivalued_index: &dyn Iterable<RowId>,
@@ -35,12 +35,6 @@ pub struct MultiValueIndex {
pub start_index_column: Arc<dyn crate::ColumnValues<RowId>>,
}
impl From<Arc<dyn ColumnValues<RowId>>> for MultiValueIndex {
fn from(start_index_column: Arc<dyn ColumnValues<RowId>>) -> Self {
MultiValueIndex { start_index_column }
}
}
impl MultiValueIndex {
pub fn for_test(start_offsets: &[RowId]) -> MultiValueIndex {
let mut buffer = Vec::new();
@@ -52,90 +46,314 @@ impl MultiValueIndex {
/// Returns `[start, end)`, such that the values associated with
/// the given document are `start..end`.
#[inline]
pub(crate) fn range(&self, doc_id: DocId) -> Range<RowId> {
let start = self.start_index_column.get_val(doc_id);
let end = self.start_index_column.get_val(doc_id + 1);
pub(crate) fn range(&self, row_id: RowId) -> Range<RowId> {
let start = self.start_index_column.get_val(row_id);
let end = self.start_index_column.get_val(row_id + 1);
start..end
}
/// Returns the number of rows in the index.
#[inline]
pub fn num_docs(&self) -> u32 {
pub fn num_rows(&self) -> u32 {
self.start_index_column.num_vals() - 1
}
/// Converts a list of ranks (row ids of values) in a 1:n index to the corresponding list of
/// docids. Positions are converted inplace to docids.
pub fn select_cursor(&self) -> MultiValueIndexCursor {
MultiValueIndexCursor {
multivalued_index: self.clone(),
row_cursor: 0u32,
}
}
}
pub struct MultiValueIndexCursor {
multivalued_index: MultiValueIndex,
row_cursor: RowId,
}
impl MultiValueIndexCursor {
/// See contract in `ColumnIndexSelectCursor`.
///
/// Since there is no index for value pos -> docid, but docid -> value pos range, we scan the
/// index.
/// Multi valued cardinality is special for two different
/// ranks `rank_left` and `rank_right`, we can end up with
/// the same `select(rank_left)` and `select(rank_right)`.
///
/// Correctness: positions needs to be sorted. idx_reader needs to contain monotonically
/// increasing positions.
///
/// TODO: Instead of a linear scan we can employ a exponential search into binary search to
/// match a docid to its value position.
#[allow(clippy::bool_to_int_with_if)]
pub(crate) fn select_batch_in_place(&self, docid_start: DocId, ranks: &mut Vec<u32>) {
/// For this reason, this function includes extra complexity
/// to prevent the cursor from emitting the same row_id.
/// - From a last call, by skipping ranks mapping to
/// the same row_id
/// - With the batch, by simply deduplicating the output.
pub fn select_batch_in_place(&mut self, ranks: &mut Vec<RowId>) {
if ranks.is_empty() {
return;
}
let mut cur_doc = docid_start;
let mut last_doc = None;
let mut row_cursor = self.row_cursor;
assert!(self.start_index_column.get_val(docid_start) as u32 <= ranks[0]);
let mut write_cursor_id = usize::MAX;
let mut last_written_row_id = u32::MAX;
let mut write_doc_pos = 0;
for i in 0..ranks.len() {
let pos = ranks[i];
loop {
let end = self.start_index_column.get_val(cur_doc + 1) as u32;
if end > pos {
ranks[write_doc_pos] = cur_doc;
write_doc_pos += if last_doc == Some(cur_doc) { 0 } else { 1 };
last_doc = Some(cur_doc);
break;
}
cur_doc += 1;
// We skip all of the ranks that we already passed.
//
// It is possible in the case of multivalued, for a the first
// few rank to belong to the same row_id as the last rank
// of the previous call.
let start_bound = self
.multivalued_index
.start_index_column
.get_val(row_cursor);
let mut skip = 0;
while ranks[skip] < start_bound {
skip += 1;
if skip == ranks.len() {
ranks.clear();
return;
}
}
ranks.truncate(write_doc_pos);
for i in skip..ranks.len() {
let rank = ranks[i];
let row_id = loop {
// TODO See if we can find a way to introduce a function in
// ColumnValue to remove dynamic dispatch.
// This is tricky however... because it only applies to T=u32.
//
// TODO consider using exponential search.
let end = self
.multivalued_index
.start_index_column
.get_val(row_cursor + 1) as u32;
if end > rank {
break row_cursor;
}
row_cursor += 1;
};
// We remove duplicates in a branchless fashion: we only advance
// the write cursor when we are writing a value different from
// the last written value.
write_cursor_id =
write_cursor_id.wrapping_add(if row_id == last_written_row_id { 0 } else { 1 });
ranks[write_cursor_id] = row_id;
last_written_row_id = row_id;
}
self.row_cursor = row_cursor + 1;
ranks.truncate(write_cursor_id + 1);
}
}
#[cfg(test)]
mod tests {
use std::ops::Range;
use std::sync::Arc;
use super::MultiValueIndex;
use crate::column_values::IterColumn;
use crate::{ColumnValues, RowId};
use proptest::prelude::*;
fn index_to_pos_helper(
index: &MultiValueIndex,
doc_id_range: Range<u32>,
positions: &[u32],
) -> Vec<u32> {
fn index_to_pos_helper(index: &MultiValueIndex, positions: &[u32]) -> Vec<u32> {
let mut positions = positions.to_vec();
index.select_batch_in_place(doc_id_range.start, &mut positions);
let mut cursor = index.select_cursor();
cursor.select_batch_in_place(&mut positions);
positions
}
// Value row id ranges are [0..10, 10..12, 12..15, etc.]
const START_OFFSETS: &[RowId] = &[0, 10, 12, 15, 22, 23];
/// Builds a multivalued index from `start_offsets`, runs a fresh select cursor
/// over `ranks` in a single batch, and checks the resulting row ids.
#[track_caller]
fn test_multivalue_select_cursor_aux(
    start_offsets: &'static [RowId],
    ranks: &[RowId],
    expected: &[RowId],
) {
    let start_index_column: Arc<dyn ColumnValues<RowId>> =
        Arc::new(IterColumn::from(start_offsets.iter().copied()));
    let index = MultiValueIndex { start_index_column };
    let row_ids = index_to_pos_helper(&index, ranks);
    assert_eq!(&row_ids, expected);
}
#[test]
fn test_positions_to_docid() {
let offsets: Vec<RowId> = vec![0, 10, 12, 15, 22, 23]; // docid values are [0..10, 10..12, 12..15, etc.]
let column: Arc<dyn ColumnValues<RowId>> = Arc::new(IterColumn::from(offsets.into_iter()));
let index = MultiValueIndex::from(column);
assert_eq!(index.num_docs(), 5);
let positions = &[10u32, 11, 15, 20, 21, 22];
assert_eq!(index_to_pos_helper(&index, 0..5, positions), vec![1, 3, 4]);
assert_eq!(index_to_pos_helper(&index, 1..5, positions), vec![1, 3, 4]);
assert_eq!(index_to_pos_helper(&index, 0..5, &[9]), vec![0]);
assert_eq!(index_to_pos_helper(&index, 1..5, &[10]), vec![1]);
assert_eq!(index_to_pos_helper(&index, 1..5, &[11]), vec![1]);
assert_eq!(index_to_pos_helper(&index, 2..5, &[12]), vec![2]);
assert_eq!(index_to_pos_helper(&index, 2..5, &[12, 14]), vec![2]);
assert_eq!(index_to_pos_helper(&index, 2..5, &[12, 14, 15]), vec![2, 3]);
fn test_multivalue_select_cursor_empty() {
test_multivalue_select_cursor_aux(START_OFFSETS, &[], &[]);
}
#[test]
fn test_multivalue_select_cursor_single() {
    // With START_OFFSETS, row 0 spans ranks 0..10, row 1 spans 10..12 and
    // row 2 starts at 12: probe both ends of each boundary.
    test_multivalue_select_cursor_aux(START_OFFSETS, &[0], &[0]);
    test_multivalue_select_cursor_aux(START_OFFSETS, &[9], &[0]);
    test_multivalue_select_cursor_aux(START_OFFSETS, &[10], &[1]);
    test_multivalue_select_cursor_aux(START_OFFSETS, &[11], &[1]);
    test_multivalue_select_cursor_aux(START_OFFSETS, &[12], &[2]);
}
#[test]
fn test_multivalue_select_cursor_duplicates() {
test_multivalue_select_cursor_aux(START_OFFSETS, &[12, 14], &[2]);
}
#[test]
fn test_multivalue_select_cursor_complex() {
test_multivalue_select_cursor_aux(START_OFFSETS, &[10, 11, 15, 20, 21, 22], &[1, 3, 4])
}
// Corner case: every rank of a batch maps to a row that was already emitted.
// With offsets [0, 10] there is a single row (row 0) holding ranks 0..10.
#[test]
fn test_multivalue_select_corner_case_skip_all() {
let column: Arc<dyn ColumnValues<RowId>> =
Arc::new(IterColumn::from([0, 10].into_iter()));
let index = MultiValueIndex {
start_index_column: column,
};
let mut cursor = index.select_cursor();
{
// First batch emits row 0.
let mut ranks = vec![0];
cursor.select_batch_in_place(&mut ranks);
assert_eq!(ranks, &[0]);
}
{
// Rank 5 also belongs to row 0, which was already emitted: the whole
// batch is skipped and the output is empty.
let mut ranks = vec![5];
cursor.select_batch_in_place(&mut ranks);
assert_eq!(ranks, &[]);
}
}
// Same single-row index as above (offsets [0, 10]), but with two follow-up
// batches: once row 0 has been emitted, every later batch mapping to it must
// come back empty, not only the first one.
#[test]
fn test_multi_value_index_cursor_bug() {
let column: Arc<dyn ColumnValues<RowId>> =
Arc::new(IterColumn::from([0, 10].into_iter()));
let index = MultiValueIndex {
start_index_column: column,
};
let mut cursor = index.select_cursor();
{
// Emits row 0.
let mut ranks = vec![0];
cursor.select_batch_in_place(&mut ranks);
assert_eq!(ranks, &[0]);
}
{
// Rank 4 is still in row 0: nothing to emit.
let mut ranks = vec![4];
cursor.select_batch_in_place(&mut ranks);
assert_eq!(ranks, &[]);
}
{
// Rank 9 is the last rank of row 0: still nothing to emit.
let mut ranks = vec![9];
cursor.select_batch_in_place(&mut ranks);
assert_eq!(ranks, &[]);
}
}
#[test]
fn test_multivalue_select_cursor_skip_already_emitted() {
let column: Arc<dyn ColumnValues<RowId>> =
Arc::new(IterColumn::from(START_OFFSETS.iter().copied()));
let index = MultiValueIndex {
start_index_column: column,
};
let mut cursor = index.select_cursor();
{
let mut ranks = vec![1, 10];
cursor.select_batch_in_place(&mut ranks);
assert_eq!(ranks, &[0, 1]);
}
{
// Here we skip row_id = 1.
let mut ranks = vec![11, 12];
cursor.select_batch_in_place(&mut ranks);
assert_eq!(ranks, &[2]);
}
}
/// Generates start offsets for a small multivalued index holding exactly 10 values.
///
/// The offsets begin at 0, grow by 0 to 2 per row, stop before reaching 10,
/// and always end with a final 10.
fn start_index_strategy() -> impl Strategy<Value = Vec<RowId>> {
    proptest::collection::vec(0u32..3u32, 1..6).prop_map(|deltas: Vec<u32>| {
        let mut start_offsets: Vec<RowId> = Vec::with_capacity(deltas.len() + 1);
        start_offsets.push(0u32);
        // Running sums of the deltas, cut at the first one reaching 10.
        let partial_sums = deltas
            .into_iter()
            .scan(0u32, |running_total, delta| {
                *running_total += delta;
                Some(*running_total)
            })
            .take_while(|&running_total| running_total < 10);
        start_offsets.extend(partial_sums);
        start_offsets.push(10);
        start_offsets
    })
}
/// Generates a sequence of select batches.
///
/// A set of 1 to 10 distinct ranks in `0..10` is drawn and, being a `BTreeSet`,
/// iterated in increasing order. It is then cut into consecutive batches at a
/// random set of split positions, so that the batches are increasing both
/// within and across batches. A split position of 0 produces an empty first batch.
fn query_strategy() -> impl Strategy<Value = Vec<Vec<RowId>> > {
proptest::collection::btree_set(0u32..10u32, 1..=10)
.prop_flat_map(|els| {
let els: Vec<RowId> = els.into_iter().collect();
proptest::collection::btree_set(0..els.len(), 0..els.len())
.prop_map(move |mut split_positions| {
// Always close the last batch at the end of `els`.
split_positions.insert(els.len());
let mut queries: Vec<Vec<RowId>> = Vec::with_capacity(split_positions.len() + 1);
// `cursor` is the start of the batch currently being cut.
let mut cursor = 0;
for split_position in split_positions {
queries.push(els[cursor..split_position].to_vec());
cursor = split_position;
}
queries
})
})
}
/// Naive reference cursor, used to cross-check the optimized implementation.
struct SimpleSelectCursor {
    start_indexes: Vec<RowId>,
    last_emitted_row_id: Option<RowId>,
}

impl SimpleSelectCursor {
    /// Linear search for the row `i` such that
    /// `start_indexes[i] <= rank < start_indexes[i + 1]`.
    ///
    /// Panics if no row contains `rank`.
    fn select(&self, rank: u32) -> RowId {
        let containing_row = self
            .start_indexes
            .windows(2)
            .position(|bounds| bounds[0] <= rank && rank < bounds[1]);
        match containing_row {
            Some(row) => row as u32,
            None => panic!(),
        }
    }

    /// Maps every rank to its row id, removes consecutive duplicates, and drops
    /// the leading row id when it equals the last one emitted by a previous call.
    fn select_batch_in_place(&mut self, ranks: &mut Vec<RowId>) {
        if ranks.is_empty() {
            return;
        }
        ranks
            .iter_mut()
            .for_each(|rank_or_row| *rank_or_row = self.select(*rank_or_row));
        ranks.dedup();
        if ranks.first().copied() == self.last_emitted_row_id {
            ranks.remove(0);
        }
        if let Some(&most_recent) = ranks.last() {
            self.last_emitted_row_id = Some(most_recent);
        }
    }
}
proptest! {
// Feeds the same sequence of batches to the select cursor under test and to the
// naive `SimpleSelectCursor`, and checks that they agree after every batch.
#[test]
fn test_multi_value_index_cursor_proptest(start_indexes in start_index_strategy(), mut queries in query_strategy()) {
let mut simple_select_cursor = SimpleSelectCursor {
start_indexes: start_indexes.clone(),
last_emitted_row_id: None
};
let column: Arc<dyn ColumnValues<RowId>> =
Arc::new(IterColumn::from(start_indexes.into_iter()));
let index = MultiValueIndex { start_index_column: column };
let mut select_cursor = index.select_cursor();
for query in queries.iter_mut() {
// Both cursors work in place, so each one gets its own copy of the batch.
let mut query_clone = query.clone();
select_cursor.select_batch_in_place(query);
simple_select_cursor.select_batch_in_place(&mut query_clone);
assert_eq!(&query[..], &query_clone[..]);
}
}
}
}

View File

@@ -11,7 +11,7 @@ use set_block::{
};
use crate::iterable::Iterable;
use crate::{DocId, InvalidData, RowId};
use crate::{InvalidData, RowId};
/// The threshold for the number of elements after which we switch to dense block encoding.
///
@@ -118,17 +118,17 @@ impl<'a> BlockSelectCursor<'a> {
}
}
}
pub struct OptionalIndexSelectCursor<'a> {
current_block_cursor: BlockSelectCursor<'a>,
pub struct OptionalIndexSelectCursor {
current_block_cursor: BlockSelectCursor<'static>,
current_block_id: u16,
// The current block is guaranteed to contain ranks < end_rank.
current_block_end_rank: RowId,
optional_index: &'a OptionalIndex,
optional_index: OptionalIndex,
block_doc_idx_start: RowId,
num_null_rows_before_block: RowId,
}
impl<'a> OptionalIndexSelectCursor<'a> {
impl OptionalIndexSelectCursor {
fn search_and_load_block(&mut self, rank: RowId) {
if rank < self.current_block_end_rank {
// we are already in the right block
@@ -145,14 +145,23 @@ impl<'a> OptionalIndexSelectCursor<'a> {
let block_meta = self.optional_index.block_metas[self.current_block_id as usize];
self.num_null_rows_before_block = block_meta.non_null_rows_before_block;
let block: Block<'_> = self.optional_index.block(block_meta);
self.current_block_cursor = match block {
let current_block_cursor = match block {
Block::Dense(dense_block) => BlockSelectCursor::Dense(dense_block.select_cursor()),
Block::Sparse(sparse_block) => BlockSelectCursor::Sparse(sparse_block.select_cursor()),
};
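// We are building a self-owned `OptionalIndexSelectCursor`: the block cursor borrows from
// `self.optional_index`, which this cursor owns, so its lifetime is erased to `'static`.
// NOTE(review): this relies on the borrowed block data staying valid when the cursor
// itself is moved — confirm against `OptionalIndex`'s storage.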
self.current_block_cursor = unsafe { std::mem::transmute(current_block_cursor) };
}
/// Replaces every rank of `ranks`, in place and in order, with `select(rank)`.
pub fn select_batch_in_place(&mut self, ranks: &mut [RowId]) {
    // TODO see if we can batch at the block level as well for optimization purposes.
    ranks
        .iter_mut()
        .for_each(|rank_or_row| *rank_or_row = self.select(*rank_or_row));
}
}
impl<'a> SelectCursor<RowId> for OptionalIndexSelectCursor<'a> {
impl<'a> SelectCursor<RowId> for OptionalIndexSelectCursor {
fn select(&mut self, rank: RowId) -> RowId {
self.search_and_load_block(rank);
let index_in_block = (rank - self.num_null_rows_before_block) as u16;
@@ -161,7 +170,7 @@ impl<'a> SelectCursor<RowId> for OptionalIndexSelectCursor<'a> {
}
impl Set<RowId> for OptionalIndex {
type SelectCursor<'b> = OptionalIndexSelectCursor<'b> where Self: 'b;
type SelectCursor<'a> = OptionalIndexSelectCursor;
// Check if value at position is not null.
#[inline]
fn contains(&self, row_id: RowId) -> bool {
@@ -177,11 +186,11 @@ impl Set<RowId> for OptionalIndex {
}
#[inline]
fn rank(&self, doc_id: DocId) -> RowId {
fn rank(&self, row_id: RowId) -> RowId {
let RowAddr {
block_id,
in_block_row_id,
} = row_addr_from_row_id(doc_id);
} = row_addr_from_row_id(row_id);
let block_meta = self.block_metas[block_id as usize];
let block = self.block(block_meta);
let block_offset_row_id = match block {
@@ -192,11 +201,11 @@ impl Set<RowId> for OptionalIndex {
}
#[inline]
fn rank_if_exists(&self, doc_id: DocId) -> Option<RowId> {
fn rank_if_exists(&self, row_id: RowId) -> Option<RowId> {
let RowAddr {
block_id,
in_block_row_id,
} = row_addr_from_row_id(doc_id);
} = row_addr_from_row_id(row_id);
let block_meta = self.block_metas[block_id as usize];
let block = self.block(block_meta);
let block_offset_row_id = match block {
@@ -220,14 +229,14 @@ impl Set<RowId> for OptionalIndex {
block_doc_idx_start + in_block_rank as u32
}
fn select_cursor(&self) -> OptionalIndexSelectCursor<'_> {
fn select_cursor(&self) -> OptionalIndexSelectCursor {
OptionalIndexSelectCursor {
current_block_cursor: BlockSelectCursor::Sparse(
SparseBlockCodec::open(b"").select_cursor(),
),
current_block_id: 0u16,
current_block_end_rank: 0u32, //< this is sufficient to force the first load
optional_index: self,
optional_index: self.clone(),
block_doc_idx_start: 0u32,
num_null_rows_before_block: 0u32,
}
@@ -247,7 +256,7 @@ impl OptionalIndex {
open_optional_index(bytes).unwrap()
}
pub fn num_docs(&self) -> RowId {
pub fn num_rows(&self) -> RowId {
self.num_rows
}
@@ -255,7 +264,7 @@ impl OptionalIndex {
self.num_non_null_rows
}
pub fn iter_rows(&self) -> impl Iterator<Item = RowId> + '_ {
pub fn iter_rows<'a>(&'a self) -> impl Iterator<Item = RowId> + 'a {
// TODO optimize
let mut select_batch = self.select_cursor();
(0..self.num_non_null_rows).map(move |rank| select_batch.select(rank))
@@ -268,7 +277,7 @@ impl OptionalIndex {
}
#[inline]
fn block(&self, block_meta: BlockMeta) -> Block<'_> {
fn block<'a>(&'a self, block_meta: BlockMeta) -> Block<'a> {
let BlockMeta {
start_byte_offset,
block_variant,
@@ -351,7 +360,7 @@ fn serialize_optional_index_block(block_els: &[u16], out: &mut impl io::Write) -
Ok(())
}
pub fn serialize_optional_index<W: io::Write>(
pub fn serialize_optional_index<'a, W: io::Write>(
non_null_rows: &dyn Iterable<RowId>,
num_rows: RowId,
output: &mut W,
@@ -427,7 +436,7 @@ impl SerializedBlockMeta {
}
#[inline]
fn to_bytes(self) -> [u8; SERIALIZED_BLOCK_META_NUM_BYTES] {
fn to_bytes(&self) -> [u8; SERIALIZED_BLOCK_META_NUM_BYTES] {
assert!(self.num_non_null_rows > 0);
let mut bytes = [0u8; SERIALIZED_BLOCK_META_NUM_BYTES];
bytes[0..2].copy_from_slice(&self.block_id.to_le_bytes());
@@ -501,7 +510,7 @@ pub fn open_optional_index(bytes: OwnedBytes) -> io::Result<OptionalIndex> {
num_non_empty_block_bytes as usize * SERIALIZED_BLOCK_META_NUM_BYTES;
let (block_data, block_metas) = bytes.rsplit(block_metas_num_bytes);
let (block_metas, num_non_null_rows) =
deserialize_optional_index_block_metadatas(block_metas.as_slice(), num_rows);
deserialize_optional_index_block_metadatas(block_metas.as_slice(), num_rows).into();
let optional_index = OptionalIndex {
num_rows,
num_non_null_rows,

View File

@@ -10,7 +10,7 @@ pub trait SetCodec {
///
/// May panic if the elements are not sorted.
fn serialize(els: impl Iterator<Item = Self::Item>, wrt: impl io::Write) -> io::Result<()>;
fn open(data: &[u8]) -> Self::Reader<'_>;
fn open<'a>(data: &'a [u8]) -> Self::Reader<'a>;
}
/// Stateful object that makes it possible to compute several select in a row,
@@ -43,5 +43,5 @@ pub trait Set<T> {
fn select(&self, rank: T) -> T;
/// Creates a brand new select cursor.
fn select_cursor(&self) -> Self::SelectCursor<'_>;
fn select_cursor<'b>(&'b self) -> Self::SelectCursor<'b>;
}

View File

@@ -45,7 +45,7 @@ impl SetCodec for DenseBlockCodec {
}
#[inline]
fn open(data: &[u8]) -> Self::Reader<'_> {
fn open<'a>(data: &'a [u8]) -> Self::Reader<'a> {
assert_eq!(data.len(), DENSE_BLOCK_NUM_BYTES as usize);
DenseBlock(data)
}
@@ -94,7 +94,7 @@ impl DenseMiniBlock {
Self { bitvec, rank }
}
fn to_bytes(self) -> [u8; MINI_BLOCK_NUM_BYTES] {
fn to_bytes(&self) -> [u8; MINI_BLOCK_NUM_BYTES] {
let mut bytes = [0u8; MINI_BLOCK_NUM_BYTES];
bytes[..MINI_BLOCK_BITVEC_NUM_BYTES].copy_from_slice(&self.bitvec.to_le_bytes());
bytes[MINI_BLOCK_BITVEC_NUM_BYTES..].copy_from_slice(&self.rank.to_le_bytes());
@@ -166,7 +166,7 @@ impl<'a> Set<u16> for DenseBlock<'a> {
}
#[inline(always)]
fn select_cursor(&self) -> Self::SelectCursor<'_> {
fn select_cursor<'b>(&'b self) -> Self::SelectCursor<'b> {
DenseBlockSelectCursor {
block_id: 0,
dense_block: *self,

View File

@@ -16,7 +16,7 @@ impl SetCodec for SparseBlockCodec {
Ok(())
}
fn open(data: &[u8]) -> Self::Reader<'_> {
fn open<'a>(data: &'a [u8]) -> Self::Reader<'a> {
SparseBlock(data)
}
}
@@ -56,7 +56,7 @@ impl<'a> Set<u16> for SparseBlock<'a> {
}
#[inline(always)]
fn select_cursor(&self) -> Self::SelectCursor<'_> {
fn select_cursor<'b>(&'b self) -> Self::SelectCursor<'b> {
*self
}
}

View File

@@ -142,7 +142,7 @@ fn test_optional_index_large() {
fn test_optional_index_iter_aux(row_ids: &[RowId], num_rows: RowId) {
let optional_index = OptionalIndex::for_test(num_rows, row_ids);
assert_eq!(optional_index.num_docs(), num_rows);
assert_eq!(optional_index.num_rows(), num_rows);
assert!(optional_index.iter_rows().eq(row_ids.iter().copied()));
}
@@ -154,7 +154,7 @@ fn test_optional_index_iter_empty() {
fn test_optional_index_rank_aux(row_ids: &[RowId]) {
let num_rows = row_ids.last().copied().unwrap_or(0u32) + 1;
let null_index = OptionalIndex::for_test(num_rows, row_ids);
assert_eq!(null_index.num_docs(), num_rows);
assert_eq!(null_index.num_rows(), num_rows);
for (row_id, row_val) in row_ids.iter().copied().enumerate() {
assert_eq!(null_index.rank(row_val), row_id as u32);
assert_eq!(null_index.rank_if_exists(row_val), Some(row_id as u32));
@@ -196,7 +196,7 @@ fn test_optional_index_for_tests() {
assert!(optional_index.contains(1));
assert!(optional_index.contains(2));
assert!(!optional_index.contains(3));
assert_eq!(optional_index.num_docs(), 4);
assert_eq!(optional_index.num_rows(), 4);
}
#[cfg(all(test, feature = "unstable"))]
@@ -212,13 +212,10 @@ mod bench {
fn gen_bools(fill_ratio: f64) -> OptionalIndex {
let mut out = Vec::new();
let mut rng: StdRng = StdRng::from_seed([1u8; 32]);
let vals: Vec<RowId> = (0..TOTAL_NUM_VALUES)
let vals: Vec<bool> = (0..TOTAL_NUM_VALUES)
.map(|_| rng.gen_bool(fill_ratio))
.enumerate()
.filter(|(pos, val)| *val)
.map(|(pos, _)| pos as RowId)
.collect();
serialize_optional_index(&&vals[..], TOTAL_NUM_VALUES, &mut out).unwrap();
serialize_optional_index(&&vals[..], &mut out).unwrap();
let codec = open_optional_index(OwnedBytes::new(out)).unwrap();
codec
}

View File

@@ -6,7 +6,6 @@ use std::sync::Arc;
use tantivy_bitpacker::minmax;
use crate::column_values::monotonic_mapping::StrictlyMonotonicFn;
use crate::RowId;
/// `ColumnValues` provides access to a dense field column.
///
@@ -36,21 +35,21 @@ pub trait ColumnValues<T: PartialOrd = u64>: Send + Sync {
}
}
/// Get the row ids of values which are in the provided value range.
/// Get the positions of values which are in the provided value range.
///
/// Note that position == docid for single value fast fields
#[inline(always)]
fn get_row_ids_for_value_range(
fn get_docids_for_value_range(
&self,
value_range: RangeInclusive<T>,
row_id_range: Range<RowId>,
row_id_hits: &mut Vec<RowId>,
doc_id_range: Range<u32>,
positions: &mut Vec<u32>,
) {
let row_id_range = row_id_range.start..row_id_range.end.min(self.num_vals());
for idx in row_id_range.start..row_id_range.end {
let doc_id_range = doc_id_range.start..doc_id_range.end.min(self.num_vals());
for idx in doc_id_range.start..doc_id_range.end {
let val = self.get_val(idx);
if value_range.contains(&val) {
row_id_hits.push(idx);
positions.push(idx);
}
}
}
@@ -110,14 +109,31 @@ impl<T: Copy + PartialOrd + Debug> ColumnValues<T> for Arc<dyn ColumnValues<T>>
fn get_range(&self, start: u64, output: &mut [T]) {
self.as_ref().get_range(start, output)
}
}
fn get_row_ids_for_value_range(
&self,
value_range: RangeInclusive<T>,
row_id_range: Range<RowId>,
row_id_hits: &mut Vec<RowId>,
) {
self.as_ref().get_row_ids_for_value_range(value_range, row_id_range, row_id_hits)
impl<'a, C: ColumnValues<T> + ?Sized, T: Copy + PartialOrd + Debug> ColumnValues<T> for &'a C {
fn get_val(&self, idx: u32) -> T {
(*self).get_val(idx)
}
fn min_value(&self) -> T {
(*self).min_value()
}
fn max_value(&self) -> T {
(*self).max_value()
}
fn num_vals(&self) -> u32 {
(*self).num_vals()
}
fn iter<'b>(&'b self) -> Box<dyn Iterator<Item = T> + 'b> {
(*self).iter()
}
fn get_range(&self, start: u64, output: &mut [T]) {
(*self).get_range(start, output)
}
}
@@ -241,13 +257,13 @@ where
)
}
fn get_row_ids_for_value_range(
fn get_docids_for_value_range(
&self,
range: RangeInclusive<Output>,
doc_id_range: Range<u32>,
positions: &mut Vec<u32>,
) {
self.from_column.get_row_ids_for_value_range(
self.from_column.get_docids_for_value_range(
self.monotonic_mapping.inverse(range.start().clone())
..=self.monotonic_mapping.inverse(range.end().clone()),
doc_id_range,

View File

@@ -313,7 +313,7 @@ impl ColumnValues<u128> for CompactSpaceDecompressor {
}
#[inline]
fn get_row_ids_for_value_range(
fn get_docids_for_value_range(
&self,
value_range: RangeInclusive<u128>,
positions_range: Range<u32>,
@@ -709,7 +709,7 @@ mod tests {
doc_id_range: Range<u32>,
) -> Vec<u32> {
let mut positions = Vec::new();
column.get_row_ids_for_value_range(value_range, doc_id_range, &mut positions);
column.get_docids_for_value_range(value_range, doc_id_range, &mut positions);
positions
}

View File

@@ -1,4 +1,5 @@
#![warn(missing_docs)]
#![cfg_attr(all(feature = "unstable", test), feature(test))]
//! # `fastfield_codecs`
//!
@@ -25,10 +26,10 @@ mod stats;
pub(crate) mod u64_based;
mod column;
pub(crate) mod serialize;
pub mod serialize;
pub use serialize::serialize_column_values_u128;
pub use stats::ColumnStats;
pub use stats::Stats;
pub use u64_based::{
load_u64_based_column_values, serialize_and_load_u64_based_column_values,
serialize_u64_based_column_values, CodecType, ALL_U64_CODEC_TYPES,
@@ -136,7 +137,6 @@ mod bench {
use test::{self, Bencher};
use super::*;
use crate::column_values::u64_based::*;
fn get_data() -> Vec<u64> {
let mut rng = StdRng::seed_from_u64(2u64);
@@ -152,30 +152,23 @@ mod bench {
data
}
fn compute_stats(vals: impl Iterator<Item = u64>) -> ColumnStats {
let mut stats_collector = StatsCollector::default();
for val in vals {
stats_collector.collect(val);
}
stats_collector.stats()
}
#[inline(never)]
fn value_iter() -> impl Iterator<Item = u64> {
0..20_000
}
fn get_reader_for_bench<Codec: ColumnCodec>(data: &[u64]) -> Codec::ColumnValues {
fn get_reader_for_bench<Codec: FastFieldCodec>(data: &[u64]) -> Codec::Reader {
let mut bytes = Vec::new();
let stats = compute_stats(data.iter().cloned());
let mut codec_serializer = Codec::estimator();
for val in data {
codec_serializer.collect(*val);
}
codec_serializer.serialize(&stats, Box::new(data.iter().copied()).as_mut(), &mut bytes);
Codec::load(OwnedBytes::new(bytes)).unwrap()
let min_value = *data.iter().min().unwrap();
let data = data.iter().map(|el| *el - min_value).collect::<Vec<_>>();
let col = VecColumn::from(&data);
let normalized_header = NormalizedHeader {
num_vals: col.num_vals(),
max_value: col.max_value(),
};
Codec::serialize(&VecColumn::from(&data), &mut bytes).unwrap();
Codec::open_from_bytes(OwnedBytes::new(bytes), normalized_header).unwrap()
}
fn bench_get<Codec: ColumnCodec>(b: &mut Bencher, data: &[u64]) {
fn bench_get<Codec: FastFieldCodec>(b: &mut Bencher, data: &[u64]) {
let col = get_reader_for_bench::<Codec>(data);
b.iter(|| {
let mut sum = 0u64;
@@ -199,22 +192,18 @@ mod bench {
});
}
fn bench_get_dynamic<Codec: ColumnCodec>(b: &mut Bencher, data: &[u64]) {
fn bench_get_dynamic<Codec: FastFieldCodec>(b: &mut Bencher, data: &[u64]) {
let col = Arc::new(get_reader_for_bench::<Codec>(data));
bench_get_dynamic_helper(b, col);
}
fn bench_create<Codec: ColumnCodec>(b: &mut Bencher, data: &[u64]) {
let stats = compute_stats(data.iter().cloned());
fn bench_create<Codec: FastFieldCodec>(b: &mut Bencher, data: &[u64]) {
let min_value = *data.iter().min().unwrap();
let data = data.iter().map(|el| *el - min_value).collect::<Vec<_>>();
let mut bytes = Vec::new();
b.iter(|| {
bytes.clear();
let mut codec_serializer = Codec::estimator();
for val in data.iter().take(1024) {
codec_serializer.collect(*val);
}
codec_serializer.serialize(&stats, Box::new(data.iter().copied()).as_mut(), &mut bytes)
Codec::serialize(&VecColumn::from(&data), &mut bytes).unwrap();
});
}

View File

@@ -1,7 +1,6 @@
use std::fmt::Debug;
use std::marker::PhantomData;
use common::DateTime;
use fastdivide::DividerU64;
use super::MonotonicallyMappableToU128;
@@ -123,7 +122,6 @@ pub(crate) struct StrictlyMonotonicMappingToInternalGCDBaseval {
min_value: u64,
}
impl StrictlyMonotonicMappingToInternalGCDBaseval {
/// Creates a linear mapping `x -> gcd*x + min_value`.
pub(crate) fn new(gcd: u64, min_value: u64) -> Self {
let gcd_divider = DividerU64::divide_by(gcd);
Self {
@@ -152,9 +150,7 @@ impl<External: MonotonicallyMappableToU64> StrictlyMonotonicFn<External, u64>
pub(crate) struct StrictlyMonotonicMappingToInternalBaseval {
min_value: u64,
}
impl StrictlyMonotonicMappingToInternalBaseval {
/// Creates a linear mapping `x -> x + min_value`.
#[inline(always)]
pub(crate) fn new(min_value: u64) -> Self {
Self { min_value }
@@ -199,15 +195,17 @@ impl MonotonicallyMappableToU64 for i64 {
}
}
impl MonotonicallyMappableToU64 for DateTime {
impl MonotonicallyMappableToU64 for crate::DateTime {
#[inline(always)]
fn to_u64(self) -> u64 {
common::i64_to_u64(self.into_timestamp_micros())
common::i64_to_u64(self.timestamp_micros)
}
#[inline(always)]
fn from_u64(val: u64) -> Self {
DateTime::from_timestamp_micros(common::u64_to_i64(val))
crate::DateTime {
timestamp_micros: common::u64_to_i64(val),
}
}
}

View File

@@ -8,6 +8,19 @@ use crate::column_values::U128FastFieldCodecType;
use crate::iterable::Iterable;
use crate::MonotonicallyMappableToU128;
/// The normalized header gives some parameters after applying the following
/// normalization of the vector:
/// `val -> (val - min_value) / gcd`
///
/// By design, after normalization, `min_value = 0` and `gcd = 1`.
#[derive(Debug, Copy, Clone)]
pub struct NormalizedHeader {
/// The number of values in the underlying column.
pub num_vals: u32,
/// The max value of the underlying column.
pub max_value: u64,
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) struct U128Header {
pub num_vals: u32,

View File

@@ -6,28 +6,21 @@ use common::{BinarySerializable, VInt};
use crate::RowId;
/// Column statistics.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct ColumnStats {
/// GCD of the elements `el - min(column)`.
pub struct Stats {
pub gcd: NonZeroU64,
/// Minimum value of the column.
pub min_value: u64,
/// Maximum value of the column.
pub max_value: u64,
/// Number of rows in the column.
pub num_rows: RowId,
}
impl ColumnStats {
/// Amplitude of value.
/// Difference between the maximum and the minimum value.
impl Stats {
pub fn amplitude(&self) -> u64 {
self.max_value - self.min_value
}
}
impl BinarySerializable for ColumnStats {
impl BinarySerializable for Stats {
fn serialize<W: Write + ?Sized>(&self, writer: &mut W) -> io::Result<()> {
VInt(self.min_value).serialize(writer)?;
VInt(self.gcd.get()).serialize(writer)?;
@@ -44,7 +37,7 @@ impl BinarySerializable for ColumnStats {
let amplitude = VInt::deserialize(reader)?.0 * gcd.get();
let max_value = min_value + amplitude;
let num_rows = VInt::deserialize(reader)?.0 as RowId;
Ok(ColumnStats {
Ok(Stats {
min_value,
max_value,
num_rows,
@@ -59,21 +52,21 @@ mod tests {
use common::BinarySerializable;
use crate::column_values::ColumnStats;
use crate::column_values::Stats;
#[track_caller]
fn test_stats_ser_deser_aux(stats: &ColumnStats, num_bytes: usize) {
fn test_stats_ser_deser_aux(stats: &Stats, num_bytes: usize) {
let mut buffer: Vec<u8> = Vec::new();
stats.serialize(&mut buffer).unwrap();
assert_eq!(buffer.len(), num_bytes);
let deser_stats = ColumnStats::deserialize(&mut &buffer[..]).unwrap();
let deser_stats = Stats::deserialize(&mut &buffer[..]).unwrap();
assert_eq!(stats, &deser_stats);
}
#[test]
fn test_stats_serialization() {
test_stats_ser_deser_aux(
&(ColumnStats {
&(Stats {
gcd: NonZeroU64::new(3).unwrap(),
min_value: 1,
max_value: 3001,
@@ -82,7 +75,7 @@ mod tests {
5,
);
test_stats_ser_deser_aux(
&(ColumnStats {
&(Stats {
gcd: NonZeroU64::new(1_000).unwrap(),
min_value: 1,
max_value: 3001,
@@ -91,7 +84,7 @@ mod tests {
5,
);
test_stats_ser_deser_aux(
&(ColumnStats {
&(Stats {
gcd: NonZeroU64::new(1).unwrap(),
min_value: 0,
max_value: 0,

View File

@@ -4,7 +4,7 @@ use common::{BinarySerializable, OwnedBytes};
use fastdivide::DividerU64;
use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker};
use crate::column_values::u64_based::{ColumnCodec, ColumnCodecEstimator, ColumnStats};
use crate::column_values::u64_based::{ColumnCodec, ColumnCodecEstimator, Stats};
use crate::{ColumnValues, RowId};
/// Depending on the field type, a different
@@ -13,7 +13,7 @@ use crate::{ColumnValues, RowId};
pub struct BitpackedReader {
data: OwnedBytes,
bit_unpacker: BitUnpacker,
stats: ColumnStats,
stats: Stats,
}
impl ColumnValues for BitpackedReader {
@@ -36,7 +36,7 @@ impl ColumnValues for BitpackedReader {
}
}
fn num_bits(stats: &ColumnStats) -> u8 {
fn num_bits(stats: &Stats) -> u8 {
compute_num_bits(stats.amplitude() / stats.gcd)
}
@@ -46,14 +46,14 @@ pub struct BitpackedCodecEstimator;
impl ColumnCodecEstimator for BitpackedCodecEstimator {
fn collect(&mut self, _value: u64) {}
fn estimate(&self, stats: &ColumnStats) -> Option<u64> {
fn estimate(&self, stats: &Stats) -> Option<u64> {
let num_bits_per_value = num_bits(stats);
Some(stats.num_bytes() + (stats.num_rows as u64 * (num_bits_per_value as u64) + 7) / 8)
}
fn serialize(
&self,
stats: &ColumnStats,
stats: &Stats,
vals: &mut dyn Iterator<Item = u64>,
wrt: &mut dyn Write,
) -> io::Result<()> {
@@ -72,12 +72,12 @@ impl ColumnCodecEstimator for BitpackedCodecEstimator {
pub struct BitpackedCodec;
impl ColumnCodec for BitpackedCodec {
type ColumnValues = BitpackedReader;
type Reader = BitpackedReader;
type Estimator = BitpackedCodecEstimator;
/// Opens a fast field given a file.
fn load(mut data: OwnedBytes) -> io::Result<Self::ColumnValues> {
let stats = ColumnStats::deserialize(&mut data)?;
fn load(mut data: OwnedBytes) -> io::Result<Self::Reader> {
let stats = Stats::deserialize(&mut data)?;
let num_bits = num_bits(&stats);
let bit_unpacker = BitUnpacker::new(num_bits);
Ok(BitpackedReader {

View File

@@ -7,7 +7,7 @@ use fastdivide::DividerU64;
use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker};
use crate::column_values::u64_based::line::Line;
use crate::column_values::u64_based::{ColumnCodec, ColumnCodecEstimator, ColumnStats};
use crate::column_values::u64_based::{ColumnCodec, ColumnCodecEstimator, Stats};
use crate::column_values::{ColumnValues, VecColumn};
use crate::MonotonicallyMappableToU64;
@@ -84,7 +84,7 @@ impl ColumnCodecEstimator for BlockwiseLinearEstimator {
self.block.clear();
}
}
fn estimate(&self, stats: &ColumnStats) -> Option<u64> {
fn estimate(&self, stats: &Stats) -> Option<u64> {
let mut estimate = 4 + stats.num_bytes() + self.meta_num_bytes + self.values_num_bytes;
if stats.gcd.get() > 1 {
let estimate_gain_from_gcd =
@@ -100,7 +100,7 @@ impl ColumnCodecEstimator for BlockwiseLinearEstimator {
fn serialize(
&self,
stats: &ColumnStats,
stats: &Stats,
mut vals: &mut dyn Iterator<Item = u64>,
wrt: &mut dyn Write,
) -> io::Result<()> {
@@ -165,12 +165,12 @@ impl ColumnCodecEstimator for BlockwiseLinearEstimator {
pub struct BlockwiseLinearCodec;
impl ColumnCodec<u64> for BlockwiseLinearCodec {
type ColumnValues = BlockwiseLinearReader;
type Reader = BlockwiseLinearReader;
type Estimator = BlockwiseLinearEstimator;
fn load(mut bytes: OwnedBytes) -> io::Result<Self::ColumnValues> {
let stats = ColumnStats::deserialize(&mut bytes)?;
fn load(mut bytes: OwnedBytes) -> io::Result<Self::Reader> {
let stats = Stats::deserialize(&mut bytes)?;
let footer_len: u32 = (&bytes[bytes.len() - 4..]).deserialize()?;
let footer_offset = bytes.len() - 4 - footer_len as usize;
let (data, mut footer) = bytes.split(footer_offset);
@@ -195,7 +195,7 @@ impl ColumnCodec<u64> for BlockwiseLinearCodec {
pub struct BlockwiseLinearReader {
blocks: Arc<[Block]>,
data: OwnedBytes,
stats: ColumnStats,
stats: Stats,
}
impl ColumnValues for BlockwiseLinearReader {

View File

@@ -5,7 +5,7 @@ use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker};
use super::line::Line;
use super::ColumnValues;
use crate::column_values::u64_based::{ColumnCodec, ColumnCodecEstimator, ColumnStats};
use crate::column_values::u64_based::{ColumnCodec, ColumnCodecEstimator, Stats};
use crate::column_values::VecColumn;
use crate::RowId;
@@ -18,7 +18,7 @@ const LINE_ESTIMATION_BLOCK_LEN: usize = 512;
pub struct LinearReader {
data: OwnedBytes,
linear_params: LinearParams,
stats: ColumnStats,
stats: Stats,
}
impl ColumnValues for LinearReader {
@@ -106,7 +106,7 @@ impl ColumnCodecEstimator for LinearCodecEstimator {
}
}
fn estimate(&self, stats: &ColumnStats) -> Option<u64> {
fn estimate(&self, stats: &Stats) -> Option<u64> {
let line = self.line?;
let amplitude = self.max_deviation - self.min_deviation;
let num_bits = compute_num_bits(amplitude);
@@ -123,7 +123,7 @@ impl ColumnCodecEstimator for LinearCodecEstimator {
fn serialize(
&self,
stats: &ColumnStats,
stats: &Stats,
vals: &mut dyn Iterator<Item = u64>,
wrt: &mut dyn io::Write,
) -> io::Result<()> {
@@ -184,12 +184,12 @@ impl LinearCodecEstimator {
}
impl ColumnCodec for LinearCodec {
type ColumnValues = LinearReader;
type Reader = LinearReader;
type Estimator = LinearCodecEstimator;
fn load(mut data: OwnedBytes) -> io::Result<Self::ColumnValues> {
let stats = ColumnStats::deserialize(&mut data)?;
fn load(mut data: OwnedBytes) -> io::Result<Self::Reader> {
let stats = Stats::deserialize(&mut data)?;
let linear_params = LinearParams::deserialize(&mut data)?;
Ok(LinearReader {
stats,

View File

@@ -10,64 +10,39 @@ use std::sync::Arc;
use common::{BinarySerializable, OwnedBytes};
use crate::column_index::MultiValueIndex;
use crate::column_values::monotonic_mapping::{
StrictlyMonotonicMappingInverter, StrictlyMonotonicMappingToInternal,
};
pub use crate::column_values::u64_based::bitpacked::BitpackedCodec;
pub use crate::column_values::u64_based::blockwise_linear::BlockwiseLinearCodec;
pub use crate::column_values::u64_based::linear::LinearCodec;
pub use crate::column_values::u64_based::stats_collector::StatsCollector;
use crate::column_values::{monotonic_map_column, ColumnStats};
use crate::column_values::u64_based::bitpacked::BitpackedCodec;
use crate::column_values::u64_based::blockwise_linear::BlockwiseLinearCodec;
use crate::column_values::u64_based::linear::LinearCodec;
use crate::column_values::u64_based::stats_collector::StatsCollector;
use crate::column_values::{monotonic_map_column, Stats};
use crate::iterable::Iterable;
use crate::{ColumnValues, MonotonicallyMappableToU64};
/// A `ColumnCodecEstimator` is in charge of gathering all
/// data required to serialize a column.
///
/// This happens during a first pass on data of the column elements.
/// During that pass, all column estimators receive a call to their
/// `.collect(el)`.
///
/// After this first pass, finalize is called.
/// `.estimate(..)` then should return an accurate estimation of the
/// size of the serialized column (were we to pick this codec.).
/// `.serialize(..)` then serializes the column using this codec.
pub trait ColumnCodecEstimator<T = u64>: 'static {
/// Records a new value for estimation.
/// This method will be called for each element of the column during
/// `estimation`.
fn collect(&mut self, value: u64);
/// Finalizes the first pass phase.
fn estimate(&self, stats: &Stats) -> Option<u64>;
fn finalize(&mut self) {}
/// Returns an accurate estimation of the number of bytes that will
/// be used to represent this column.
fn estimate(&self, stats: &ColumnStats) -> Option<u64>;
/// Serializes the column using the given codec.
/// This constitutes a second pass over the columns values.
fn serialize(
&self,
stats: &ColumnStats,
stats: &Stats,
vals: &mut dyn Iterator<Item = T>,
wrt: &mut dyn io::Write,
) -> io::Result<()>;
}
/// A column codec describes a colunm serialization format.
pub trait ColumnCodec<T: PartialOrd = u64> {
/// Specialized `ColumnValues` type.
type ColumnValues: ColumnValues<T> + 'static;
/// `Estimator` for the given codec.
type Reader: ColumnValues<T> + 'static;
type Estimator: ColumnCodecEstimator + Default;
/// Loads a column that has been serialized using this codec.
fn load(bytes: OwnedBytes) -> io::Result<Self::ColumnValues>;
fn load(bytes: OwnedBytes) -> io::Result<Self::Reader>;
/// Returns an estimator.
fn estimator() -> Self::Estimator {
Self::Estimator::default()
}
/// Returns a boxed estimator.
fn boxed_estimator() -> Box<dyn ColumnCodecEstimator> {
Box::new(Self::estimator())
}
@@ -88,7 +63,6 @@ pub enum CodecType {
BlockwiseLinear = 2u8,
}
/// List of all available u64-base codecs.
pub const ALL_U64_CODEC_TYPES: [CodecType; 3] = [
CodecType::Bitpacked,
CodecType::Linear,
@@ -133,7 +107,6 @@ fn load_specific_codec<C: ColumnCodec, T: MonotonicallyMappableToU64>(
}
impl CodecType {
/// Returns a boxed codec estimator associated to a given `CodecType`.
pub fn estimator(&self) -> Box<dyn ColumnCodecEstimator> {
match self {
CodecType::Bitpacked => BitpackedCodec::boxed_estimator(),
@@ -143,8 +116,7 @@ impl CodecType {
}
}
/// Serializes a given column of u64-mapped values.
pub fn serialize_u64_based_column_values<T: MonotonicallyMappableToU64>(
pub fn serialize_u64_based_column_values<'a, T: MonotonicallyMappableToU64>(
vals: &dyn Iterable<T>,
codec_types: &[CodecType],
wrt: &mut dyn Write,
@@ -185,14 +157,11 @@ pub fn serialize_u64_based_column_values<T: MonotonicallyMappableToU64>(
Ok(())
}
/// Load u64-based column values.
///
/// This method first identifies the codec off the first byte.
pub fn load_u64_based_column_values<T: MonotonicallyMappableToU64>(
mut bytes: OwnedBytes,
) -> io::Result<Arc<dyn ColumnValues<T>>> {
let codec_type: CodecType = bytes
.first()
.get(0)
.copied()
.and_then(CodecType::try_from_code)
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "Failed to read codec type"))?;

View File

@@ -2,7 +2,7 @@ use std::num::NonZeroU64;
use fastdivide::DividerU64;
use crate::column_values::ColumnStats;
use crate::column_values::Stats;
use crate::RowId;
/// Compute the gcd of two non null numbers.
@@ -33,14 +33,14 @@ pub struct StatsCollector {
}
impl StatsCollector {
pub fn stats(&self) -> ColumnStats {
pub fn stats(&self) -> Stats {
let (min_value, max_value) = self.min_max_opt.unwrap_or((0u64, 0u64));
let increment_gcd = if let Some((increment_gcd, _)) = self.increment_gcd_opt {
increment_gcd
} else {
NonZeroU64::new(1u64).unwrap()
};
ColumnStats {
Stats {
min_value,
max_value,
num_rows: self.num_rows,
@@ -97,9 +97,9 @@ mod tests {
use std::num::NonZeroU64;
use crate::column_values::u64_based::stats_collector::{compute_gcd, StatsCollector};
use crate::column_values::u64_based::ColumnStats;
use crate::column_values::u64_based::Stats;
fn compute_stats(vals: impl Iterator<Item = u64>) -> ColumnStats {
fn compute_stats(vals: impl Iterator<Item = u64>) -> Stats {
let mut stats_collector = StatsCollector::default();
for val in vals {
stats_collector.collect(val);
@@ -144,7 +144,7 @@ mod tests {
fn test_stats() {
assert_eq!(
compute_stats([].into_iter()),
ColumnStats {
Stats {
gcd: NonZeroU64::new(1).unwrap(),
min_value: 0,
max_value: 0,
@@ -153,7 +153,7 @@ mod tests {
);
assert_eq!(
compute_stats([0, 1].into_iter()),
ColumnStats {
Stats {
gcd: NonZeroU64::new(1).unwrap(),
min_value: 0,
max_value: 1,
@@ -162,7 +162,7 @@ mod tests {
);
assert_eq!(
compute_stats([0, 1].into_iter()),
ColumnStats {
Stats {
gcd: NonZeroU64::new(1).unwrap(),
min_value: 0,
max_value: 1,
@@ -171,7 +171,7 @@ mod tests {
);
assert_eq!(
compute_stats([10, 20, 30].into_iter()),
ColumnStats {
Stats {
gcd: NonZeroU64::new(10).unwrap(),
min_value: 10,
max_value: 30,
@@ -180,7 +180,7 @@ mod tests {
);
assert_eq!(
compute_stats([10, 50, 10, 30].into_iter()),
ColumnStats {
Stats {
gcd: NonZeroU64::new(20).unwrap(),
min_value: 10,
max_value: 50,
@@ -189,7 +189,7 @@ mod tests {
);
assert_eq!(
compute_stats([10, 0, 30].into_iter()),
ColumnStats {
Stats {
gcd: NonZeroU64::new(10).unwrap(),
min_value: 0,
max_value: 30,

View File

@@ -60,7 +60,7 @@ pub(crate) fn create_and_validate<TColumnCodec: ColumnCodec>(
.map(|(pos, _)| pos as u32)
.collect();
let mut positions = Vec::new();
reader.get_row_ids_for_value_range(
reader.get_docids_for_value_range(
vals[test_rand_idx]..=vals[test_rand_idx],
0..vals.len() as u32,
&mut positions,

View File

@@ -111,7 +111,7 @@ impl HasAssociatedColumnType for bool {
}
}
impl HasAssociatedColumnType for common::DateTime {
impl HasAssociatedColumnType for crate::DateTime {
fn column_type() -> ColumnType {
ColumnType::DateTime
}

View File

@@ -4,7 +4,7 @@ pub const VERSION_FOOTER_NUM_BYTES: usize = MAGIC_BYTES.len() + std::mem::size_o
/// We end the file by these 4 bytes just to somewhat identify that
/// this is indeed a columnar file.
const MAGIC_BYTES: [u8; 4] = [2, 113, 119, 66];
const MAGIC_BYTES: [u8; 4] = [2, 113, 119, 066];
pub fn footer() -> [u8; VERSION_FOOTER_NUM_BYTES] {
let mut footer_bytes = [0u8; VERSION_FOOTER_NUM_BYTES];
@@ -27,8 +27,8 @@ pub enum Version {
}
impl Version {
fn to_bytes(self) -> [u8; 4] {
(self as u32).to_le_bytes()
fn to_bytes(&self) -> [u8; 4] {
(*self as u32).to_le_bytes()
}
fn try_from_bytes(bytes: [u8; 4]) -> Result<Version, InvalidData> {

View File

@@ -58,7 +58,7 @@ impl<'a> RemappedTermOrdinalsValues<'a> {
.enumerate()
.flat_map(|(segment_ord, byte_column)| {
let segment_ord = self.term_ord_mapping.get_segment(segment_ord as u32);
byte_column.iter().flat_map(move |bytes_column| {
byte_column.into_iter().flat_map(move |bytes_column| {
bytes_column
.ords()
.values

View File

@@ -174,7 +174,6 @@ fn merge_column(
Ok(())
}
#[allow(clippy::type_complexity)]
fn group_columns_for_merge(
columnar_readers: &[&ColumnarReader],
) -> io::Result<BTreeMap<(String, ColumnType), Vec<Option<DynamicColumn>>>> {

View File

@@ -162,7 +162,7 @@ mod tests {
}
#[test]
#[should_panic(expected = "Input type forbidden")]
#[should_panic(expect = "Input type forbidden")]
fn test_list_columns_strict_typing_panics_on_wrong_types() {
let mut columnar_writer = ColumnarWriter::default();
columnar_writer.record_column_type("count", ColumnType::U64, false);

View File

@@ -47,7 +47,6 @@ struct SpareBuffers {
/// let mut wrt: Vec<u8> = Vec::new();
/// columnar_writer.serialize(2u32, None, &mut wrt).unwrap();
/// ```
#[derive(Default)]
pub struct ColumnarWriter {
numerical_field_hash_map: ArenaHashMap,
datetime_field_hash_map: ArenaHashMap,
@@ -61,6 +60,22 @@ pub struct ColumnarWriter {
buffers: SpareBuffers,
}
impl Default for ColumnarWriter {
fn default() -> Self {
ColumnarWriter {
numerical_field_hash_map: ArenaHashMap::new(10_000),
bool_field_hash_map: ArenaHashMap::new(10_000),
ip_addr_field_hash_map: ArenaHashMap::new(10_000),
bytes_field_hash_map: ArenaHashMap::new(10_000),
str_field_hash_map: ArenaHashMap::new(10_000),
datetime_field_hash_map: ArenaHashMap::new(10_000),
dictionaries: Vec::new(),
arena: MemoryArena::default(),
buffers: SpareBuffers::default(),
}
}
}
#[inline]
fn mutate_or_create_column<V, TMutator>(
arena_hash_map: &mut ArenaHashMap,
@@ -251,15 +266,11 @@ impl ColumnarWriter {
});
}
pub fn record_datetime(&mut self, doc: RowId, column_name: &str, datetime: common::DateTime) {
pub fn record_datetime(&mut self, doc: RowId, column_name: &str, datetime: crate::DateTime) {
let (hash_map, arena) = (&mut self.datetime_field_hash_map, &mut self.arena);
mutate_or_create_column(hash_map, column_name, |column_opt: Option<ColumnWriter>| {
let mut column: ColumnWriter = column_opt.unwrap_or_default();
column.record(
doc,
NumericalValue::I64(datetime.into_timestamp_micros()),
arena,
);
column.record(doc, NumericalValue::I64(datetime.timestamp_micros), arena);
column
});
}
@@ -656,7 +667,7 @@ where
Ok(())
}
fn sort_values_within_row_in_place(multivalued_index: &[RowId], values: &mut [u64]) {
fn sort_values_within_row_in_place(multivalued_index: &[RowId], values: &mut Vec<u64>) {
let mut start_index: usize = 0;
for end_index in multivalued_index.iter().copied() {
let end_index = end_index as usize;

View File

@@ -29,7 +29,7 @@ pub struct OptionalIndexBuilder {
}
impl OptionalIndexBuilder {
pub fn finish(&mut self, num_rows: RowId) -> impl Iterable<RowId> + '_ {
pub fn finish<'a>(&'a mut self, num_rows: RowId) -> impl Iterable<RowId> + 'a {
debug_assert!(self
.docs
.last()

View File

@@ -3,12 +3,12 @@ use std::net::Ipv6Addr;
use std::sync::Arc;
use common::file_slice::FileSlice;
use common::{DateTime, HasLen, OwnedBytes};
use common::{HasLen, OwnedBytes};
use crate::column::{BytesColumn, Column, StrColumn};
use crate::column_values::{monotonic_map_column, StrictlyMonotonicFn};
use crate::columnar::ColumnType;
use crate::{Cardinality, NumericalType};
use crate::{Cardinality, DateTime, NumericalType};
#[derive(Clone)]
pub enum DynamicColumn {
@@ -166,9 +166,9 @@ impl StrictlyMonotonicFn<i64, u64> for MapI64ToU64 {
macro_rules! static_dynamic_conversions {
($typ:ty, $enum_name:ident) => {
impl From<DynamicColumn> for Option<$typ> {
fn from(dynamic_column: DynamicColumn) -> Option<$typ> {
if let DynamicColumn::$enum_name(col) = dynamic_column {
impl Into<Option<$typ>> for DynamicColumn {
fn into(self) -> Option<$typ> {
if let DynamicColumn::$enum_name(col) = self {
Some(col)
} else {
None
@@ -188,7 +188,7 @@ static_dynamic_conversions!(Column<bool>, Bool);
static_dynamic_conversions!(Column<u64>, U64);
static_dynamic_conversions!(Column<i64>, I64);
static_dynamic_conversions!(Column<f64>, F64);
static_dynamic_conversions!(Column<DateTime>, DateTime);
static_dynamic_conversions!(Column<crate::DateTime>, DateTime);
static_dynamic_conversions!(StrColumn, Str);
static_dynamic_conversions!(BytesColumn, Bytes);
static_dynamic_conversions!(Column<Ipv6Addr>, IpAddr);
@@ -243,7 +243,7 @@ impl DynamicColumnHandle {
ColumnType::Bool => crate::column::open_column_u64::<bool>(column_bytes)?.into(),
ColumnType::IpAddr => crate::column::open_column_u128::<Ipv6Addr>(column_bytes)?.into(),
ColumnType::DateTime => {
crate::column::open_column_u64::<DateTime>(column_bytes)?.into()
crate::column::open_column_u64::<crate::DateTime>(column_bytes)?.into()
}
};
Ok(dynamic_column)

View File

@@ -10,7 +10,7 @@ extern crate test;
use std::io;
mod column;
mod column_index;
pub mod column_index;
pub mod column_values;
mod columnar;
mod dictionary;
@@ -32,7 +32,6 @@ pub use value::{NumericalType, NumericalValue};
pub use self::dynamic_column::{DynamicColumn, DynamicColumnHandle};
pub type RowId = u32;
pub type DocId = u32;
#[derive(Clone, Copy)]
pub struct RowAddr {
@@ -43,7 +42,16 @@ pub struct RowAddr {
pub use sstable::Dictionary;
pub type Streamer<'a> = sstable::Streamer<'a, VoidSSTable>;
pub use common::DateTime;
/// A date/time value stored as a plain number of microseconds.
///
/// This is a thin wrapper around an `i64`. Its `Default` value has
/// `timestamp_micros == 0`, and comparisons follow the wrapped integer.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default, Debug)]
pub struct DateTime {
    /// Timestamp, in microseconds.
    pub timestamp_micros: i64,
}

impl DateTime {
    /// Returns the wrapped timestamp, in microseconds.
    ///
    /// Usable in `const` contexts.
    #[inline]
    pub const fn into_timestamp_micros(self) -> i64 {
        self.timestamp_micros
    }
}
#[derive(Copy, Clone, Debug)]
pub struct InvalidData;

View File

@@ -75,7 +75,7 @@ fn test_dataframe_writer_u64_multivalued() {
divisor_col.get_cardinality(),
crate::Cardinality::Multivalued
);
assert_eq!(divisor_col.num_docs(), 7);
assert_eq!(divisor_col.num_rows(), 7);
}
#[test]

View File

@@ -1,5 +1,3 @@
use common::DateTime;
use crate::InvalidData;
#[derive(Copy, Clone, PartialEq, Debug)]
@@ -106,10 +104,10 @@ impl Coerce for f64 {
}
}
impl Coerce for DateTime {
impl Coerce for crate::DateTime {
fn coerce(value: NumericalValue) -> Self {
let timestamp_micros = i64::coerce(value);
DateTime::from_timestamp_micros(timestamp_micros)
crate::DateTime { timestamp_micros }
}
}

View File

@@ -16,8 +16,6 @@ repository = "https://github.com/quickwit-oss/tantivy"
byteorder = "1.4.3"
ownedbytes = { version= "0.5", path="../ownedbytes" }
async-trait = "0.1"
time = { version = "0.3.10", features = ["serde-well-known"] }
serde = { version = "1.0.136", features = ["derive"] }
[dev-dependencies]
proptest = "1.0.0"

View File

@@ -1,136 +0,0 @@
use std::fmt;
use serde::{Deserialize, Serialize};
use time::format_description::well_known::Rfc3339;
use time::{OffsetDateTime, PrimitiveDateTime, UtcOffset};
/// Precision to which a [`DateTime`] can be truncated.
///
/// Used by `DateTime::truncate`. With serde, variant names are written in
/// lowercase (`rename_all = "lowercase"`). The default variant is `Seconds`.
#[derive(
Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Default,
)]
#[serde(rename_all = "lowercase")]
pub enum DatePrecision {
/// Seconds precision (the default).
#[default]
Seconds,
/// Milliseconds precision.
Milliseconds,
/// Microseconds precision.
Microseconds,
}
/// A date/time value with microsecond precision.
///
/// This timestamp does not carry any explicit time zone information.
/// Users are responsible for applying the provided conversion
/// functions consistently. Internally the time zone is assumed
/// to be UTC, which is also used implicitly for JSON serialization.
///
/// All constructors and conversions are provided as explicit
/// functions and not by implementing any `From`/`Into` traits
/// to prevent unintended usage.
#[derive(Clone, Default, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct DateTime {
// UNIX timestamp in microseconds (see the `from_timestamp_*` constructors).
// The derived `Default` leaves this at 0.
pub(crate) timestamp_micros: i64,
}
impl DateTime {
    /// Builds a `DateTime` from a UNIX timestamp given in whole seconds.
    pub const fn from_timestamp_secs(seconds: i64) -> Self {
        Self::from_timestamp_micros(seconds * 1_000_000)
    }

    /// Builds a `DateTime` from a UNIX timestamp given in milliseconds.
    pub const fn from_timestamp_millis(milliseconds: i64) -> Self {
        Self::from_timestamp_micros(milliseconds * 1_000)
    }

    /// Builds a `DateTime` from a UNIX timestamp given in microseconds.
    pub const fn from_timestamp_micros(microseconds: i64) -> Self {
        Self {
            timestamp_micros: microseconds,
        }
    }

    /// Builds a `DateTime` from an `OffsetDateTime`.
    ///
    /// Only the UNIX timestamp of `dt` is kept: the value is effectively
    /// converted to UTC and its original time zone is dropped.
    pub const fn from_utc(dt: OffsetDateTime) -> Self {
        let whole_seconds_in_micros = dt.unix_timestamp() * 1_000_000;
        let sub_second_micros = dt.microsecond() as i64;
        Self::from_timestamp_micros(whole_seconds_in_micros + sub_second_micros)
    }

    /// Builds a `DateTime` from a `PrimitiveDateTime`.
    ///
    /// The input is interpreted as if it were expressed in UTC. If it was
    /// not, the original value can only be recovered through
    /// [`Self::into_primitive()`].
    pub fn from_primitive(dt: PrimitiveDateTime) -> Self {
        let assumed_utc = dt.assume_utc();
        Self::from_utc(assumed_utc)
    }

    /// Returns the UNIX timestamp in seconds.
    pub const fn into_timestamp_secs(self) -> i64 {
        let Self { timestamp_micros } = self;
        timestamp_micros / 1_000_000
    }

    /// Returns the UNIX timestamp in milliseconds.
    pub const fn into_timestamp_millis(self) -> i64 {
        let Self { timestamp_micros } = self;
        timestamp_micros / 1_000
    }

    /// Returns the UNIX timestamp in microseconds.
    pub const fn into_timestamp_micros(self) -> i64 {
        let Self { timestamp_micros } = self;
        timestamp_micros
    }

    /// Converts to an `OffsetDateTime` in UTC.
    ///
    /// # Panics
    ///
    /// Panics if `OffsetDateTime::from_unix_timestamp_nanos` rejects the
    /// timestamp.
    pub fn into_utc(self) -> OffsetDateTime {
        // Widen before scaling to nanoseconds so the multiplication cannot overflow.
        let nanos_since_epoch = (self.timestamp_micros as i128) * 1000;
        let as_utc = OffsetDateTime::from_unix_timestamp_nanos(nanos_since_epoch)
            .expect("valid UNIX timestamp");
        debug_assert_eq!(UtcOffset::UTC, as_utc.offset());
        as_utc
    }

    /// Converts to an `OffsetDateTime` expressed in the given time zone offset.
    pub fn into_offset(self, offset: UtcOffset) -> OffsetDateTime {
        let as_utc = self.into_utc();
        as_utc.to_offset(offset)
    }

    /// Converts to a `PrimitiveDateTime`, which carries no time zone.
    ///
    /// This is the counterpart of [`Self::from_primitive()`]. For values
    /// built any other way, the result is the UTC date and time.
    pub fn into_primitive(self) -> PrimitiveDateTime {
        let as_utc = self.into_utc();
        // Only the date and time parts are kept; the (UTC) offset is dropped.
        debug_assert_eq!(UtcOffset::UTC, as_utc.offset());
        let (date, time) = (as_utc.date(), as_utc.time());
        PrimitiveDateTime::new(date, time)
    }

    /// Truncates the microseconds value to the requested precision.
    ///
    /// The truncation uses integer division, which rounds toward zero.
    pub fn truncate(self, precision: DatePrecision) -> Self {
        // Number of microseconds in one unit of the requested precision.
        let micros_per_unit: i64 = match precision {
            DatePrecision::Seconds => 1_000_000,
            DatePrecision::Milliseconds => 1_000,
            DatePrecision::Microseconds => 1,
        };
        let truncated_timestamp_micros = (self.timestamp_micros / micros_per_unit) * micros_per_unit;
        Self {
            timestamp_micros: truncated_timestamp_micros,
        }
    }
}
impl fmt::Debug for DateTime {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let utc_rfc3339 = self.into_utc().format(&Rfc3339).map_err(|_| fmt::Error)?;
f.write_str(&utc_rfc3339)
}
}

View File

@@ -5,14 +5,12 @@ use std::ops::Deref;
pub use byteorder::LittleEndian as Endianness;
mod bitset;
mod datetime;
pub mod file_slice;
mod group_by;
mod serialize;
mod vint;
mod writer;
pub use bitset::*;
pub use datetime::{DatePrecision, DateTime};
pub use group_by::GroupByIteratorExtended;
pub use ownedbytes::{OwnedBytes, StableDeref};
pub use serialize::{BinarySerializable, DeserializeFrom, FixedSize};

View File

@@ -24,7 +24,8 @@ fn main() -> tantivy::Result<()> {
)
.set_stored();
let text_field = schema_builder.add_text_field("text", text_fieldtype);
let score_fieldtype = crate::schema::NumericOptions::default().set_fast();
let score_fieldtype =
crate::schema::NumericOptions::default().set_fast();
let highscore_field = schema_builder.add_f64_field("highscore", score_fieldtype.clone());
let price_field = schema_builder.add_f64_field("price", score_fieldtype);

View File

@@ -7,7 +7,9 @@
// Of course, you can have a look at the tantivy's built-in collectors
// such as the `CountCollector` for more examples.
use columnar::Column;
use std::sync::Arc;
use fastfield_codecs::Column;
// ---
// Importing tantivy...
use tantivy::collector::{Collector, SegmentCollector};
@@ -95,7 +97,7 @@ impl Collector for StatsCollector {
}
struct StatsSegmentCollector {
fast_field_reader: Column,
fast_field_reader: Arc<dyn Column<u64>>,
stats: Stats,
}
@@ -103,14 +105,10 @@ impl SegmentCollector for StatsSegmentCollector {
type Fruit = Option<Stats>;
fn collect(&mut self, doc: u32, _score: Score) {
// Since we know the values are single value, we could call `first_or_default_col` on the
// column and fetch single values.
for value in self.fast_field_reader.values(doc) {
let value = value as f64;
self.stats.count += 1;
self.stats.sum += value;
self.stats.squared_sum += value * value;
}
let value = self.fast_field_reader.get_val(doc) as f64;
self.stats.count += 1;
self.stats.sum += value;
self.stats.squared_sum += value * value;
}
fn harvest(self) -> <Self as SegmentCollector>::Fruit {

View File

@@ -71,7 +71,7 @@ fn main() -> tantivy::Result<()> {
let reader = index.reader()?;
let searcher = reader.searcher();
{
let mut facet_collector = FacetCollector::for_field("classification");
let mut facet_collector = FacetCollector::for_field(classification);
facet_collector.add_facet("/Felidae");
let facet_counts = searcher.search(&AllQuery, &facet_collector)?;
// This lists all of the facet counts, right below "/Felidae".
@@ -97,7 +97,7 @@ fn main() -> tantivy::Result<()> {
let facet = Facet::from("/Felidae/Pantherinae");
let facet_term = Term::from_facet(classification, &facet);
let facet_term_query = TermQuery::new(facet_term, IndexRecordOption::Basic);
let mut facet_collector = FacetCollector::for_field("classification");
let mut facet_collector = FacetCollector::for_field(classification);
facet_collector.add_facet("/Felidae/Pantherinae");
let facet_counts = searcher.search(&facet_term_query, &facet_collector)?;
let facets: Vec<(&Facet, u64)> = facet_counts.get("/Felidae/Pantherinae").collect();

View File

@@ -56,7 +56,7 @@ fn main() -> tantivy::Result<()> {
);
let top_docs_by_custom_score =
TopDocs::with_limit(2).tweak_score(move |segment_reader: &SegmentReader| {
let ingredient_reader = segment_reader.facet_reader("ingredient").unwrap();
let ingredient_reader = segment_reader.facet_reader(ingredient).unwrap();
let facet_dict = ingredient_reader.facet_dict();
let query_ords: HashSet<u64> = facets
@@ -64,9 +64,12 @@ fn main() -> tantivy::Result<()> {
.filter_map(|key| facet_dict.term_ord(key.encoded_str()).unwrap())
.collect();
let mut facet_ords_buffer: Vec<u64> = Vec::with_capacity(20);
move |doc: DocId, original_score: Score| {
let missing_ingredients = ingredient_reader
.facet_ords(doc)
ingredient_reader.facet_ords(doc, &mut facet_ords_buffer);
let missing_ingredients = facet_ords_buffer
.iter()
.filter(|ord| !query_ords.contains(ord))
.count();
let tweak = 1.0 / 4_f32.powi(missing_ingredients as i32);

View File

@@ -48,10 +48,7 @@ impl Warmer for DynamicPriceColumn {
fn warm(&self, searcher: &Searcher) -> tantivy::Result<()> {
for segment in searcher.segment_readers() {
let key = (segment.segment_id(), segment.delete_opstamp());
let product_id_reader = segment
.fast_fields()
.u64(&self.field)?
.first_or_default_col(0);
let product_id_reader = segment.fast_fields().u64(&self.field)?;
let product_ids: Vec<ProductId> = segment
.doc_ids_alive()
.map(|doc| product_id_reader.get_val(doc))

View File

@@ -66,7 +66,7 @@ impl BucketAggregationWithAccessor {
BucketAggregationType::Terms(TermsAggregation {
field: field_name, ..
}) => {
str_dict_column = reader.fast_fields().str(field_name)?;
str_dict_column = reader.fast_fields().str(&field_name)?;
get_ff_reader_and_validate(reader, field_name)?
}
};

View File

@@ -79,9 +79,8 @@ pub enum DateHistogramParseError {
fn parse_into_milliseconds(input: &str) -> Result<u64, DateHistogramParseError> {
let split_boundary = input
.as_bytes()
.iter()
.take_while(|byte| byte.is_ascii_digit())
.char_indices()
.take_while(|(pos, el)| el.is_numeric())
.count();
let (number, unit) = input.split_at(split_boundary);
if number.is_empty() {
@@ -90,12 +89,7 @@ fn parse_into_milliseconds(input: &str) -> Result<u64, DateHistogramParseError>
if unit.is_empty() {
return Err(DateHistogramParseError::UnitMissing(input.to_string()));
}
let number: u64 = number
.parse()
// Technically this should never happen, but there was a bug
// here and being defensive does not hurt.
.map_err(|_err| DateHistogramParseError::NumberMissing(input.to_string()))?;
let number: u64 = number.parse().unwrap();
let multiplier_from_unit = match unit {
"ms" => 1,
"s" => 1000,
@@ -113,7 +107,7 @@ mod tests {
use super::*;
#[test]
fn test_parse_into_milliseconds() {
fn parser_test() {
assert_eq!(parse_into_milliseconds("1m").unwrap(), 60_000);
assert_eq!(parse_into_milliseconds("2m").unwrap(), 120_000);
assert_eq!(
@@ -129,9 +123,4 @@ mod tests {
DateHistogramParseError::NumberMissing("ms".to_string())
);
}
#[test]
fn test_parse_into_milliseconds_do_not_accept_non_ascii() {
assert!(parse_into_milliseconds("m").is_err());
}
}

View File

@@ -1,4 +1,4 @@
// mod date_histogram;
mod date_histogram;
mod histogram;
// pub use date_histogram::*;
pub use date_histogram::*;
pub use histogram::*;

View File

@@ -1,5 +1,7 @@
use std::fmt::Debug;
use columnar::Column;
use itertools::Itertools;
use rustc_hash::FxHashMap;
use serde::{Deserialize, Serialize};
@@ -11,9 +13,11 @@ use crate::aggregation::intermediate_agg_result::{
IntermediateBucketResult, IntermediateTermBucketEntry, IntermediateTermBucketResult,
};
use crate::aggregation::segment_agg_result::{
build_segment_agg_collector, SegmentAggregationCollector,
build_segment_agg_collector, GenericSegmentAggregationResultsCollector,
SegmentAggregationCollector,
};
use crate::error::DataCorruption;
use crate::schema::Type;
use crate::{DocId, TantivyError};
/// Creates a bucket for every unique term and counts the number of occurrences.
@@ -74,9 +78,9 @@ use crate::{DocId, TantivyError};
/// ...
/// "aggregations": {
/// "genres": {
/// "doc_count_error_upper_bound": 0,
/// "sum_other_doc_count": 0,
/// "buckets": [
/// "doc_count_error_upper_bound": 0,
/// "sum_other_doc_count": 0,
/// "buckets": [
/// { "key": "drumnbass", "doc_count": 6 },
/// { "key": "raggae", "doc_count": 4 },
/// { "key": "jazz", "doc_count": 2 }
@@ -241,6 +245,15 @@ impl TermBucketEntry {
}
impl TermBuckets {
pub(crate) fn from_req_and_validate(
sub_aggregation: &AggregationsWithAccessor,
_max_term_id: usize,
) -> crate::Result<Self> {
Ok(TermBuckets {
entries: Default::default(),
})
}
fn force_flush(&mut self, agg_with_accessor: &AggregationsWithAccessor) -> crate::Result<()> {
for entry in &mut self.entries.values_mut() {
if let Some(sub_aggregations) = entry.sub_aggregations.as_mut() {

View File

@@ -4,7 +4,10 @@ use super::agg_req::Aggregations;
use super::agg_req_with_accessor::AggregationsWithAccessor;
use super::agg_result::AggregationResults;
use super::intermediate_agg_result::IntermediateAggregationResults;
use super::segment_agg_result::{build_segment_agg_collector, SegmentAggregationCollector};
use super::segment_agg_result::{
build_segment_agg_collector, GenericSegmentAggregationResultsCollector,
SegmentAggregationCollector,
};
use crate::aggregation::agg_req_with_accessor::get_aggs_with_accessor_and_validate;
use crate::collector::{Collector, SegmentCollector};
use crate::schema::Schema;

View File

@@ -1,4 +1,6 @@
use columnar::{BytesColumn, Column};
use std::sync::Arc;
use columnar::{BytesColumn, ColumnValues};
use super::*;
use crate::collector::{Count, FilterCollector, TopDocs};
@@ -55,7 +57,7 @@ pub fn test_filter_collector() -> crate::Result<()> {
assert_eq!(filtered_top_docs.len(), 0);
fn date_filter(value: DateTime) -> bool {
fn date_filter(value: columnar::DateTime) -> bool {
(crate::DateTime::from(value).into_utc()
- OffsetDateTime::parse("2019-04-09T00:00:00+00:00", &Rfc3339).unwrap())
.whole_weeks()
@@ -158,7 +160,7 @@ pub struct FastFieldTestCollector {
pub struct FastFieldSegmentCollector {
vals: Vec<u64>,
reader: Column,
reader: Arc<dyn columnar::ColumnValues>,
}
impl FastFieldTestCollector {
@@ -201,7 +203,8 @@ impl SegmentCollector for FastFieldSegmentCollector {
type Fruit = Vec<u64>;
fn collect(&mut self, doc: DocId, _score: Score) {
self.vals.extend(self.reader.values(doc));
let val = self.reader.get_val(doc);
self.vals.push(val);
}
fn harvest(self) -> Vec<u64> {

View File

@@ -457,10 +457,9 @@ impl TopDocs {
/// // Typically, fast_fields.
/// //
/// // In our case, we will get a reader for the popularity
/// // fast field. For simplicity we read the first or default value in the fast
/// // field.
/// // fast field.
/// let popularity_reader =
/// segment_reader.fast_fields().u64("popularity").unwrap().first_or_default_col(0);
/// segment_reader.fast_fields().u64("popularity").unwrap();
///
/// // We can now define our actual scoring function
/// move |doc: DocId, original_score: Score| {
@@ -567,9 +566,9 @@ impl TopDocs {
/// // Note that this is implemented by using a `(u64, u64)`
/// // as a score.
/// let popularity_reader =
/// segment_reader.fast_fields().u64("popularity").unwrap().first_or_default_col(0);
/// segment_reader.fast_fields().u64("popularity").unwrap();
/// let boosted_reader =
/// segment_reader.fast_fields().u64("boosted").unwrap().first_or_default_col(0);
/// segment_reader.fast_fields().u64("boosted").unwrap();
///
/// // We can now define our actual scoring function
/// move |doc: DocId| {

View File

@@ -196,21 +196,8 @@ impl MmapDirectory {
directory_path,
)));
}
#[allow(clippy::bind_instead_of_map)]
let canonical_path: PathBuf = directory_path.canonicalize().or_else(|io_err| {
let directory_path = directory_path.to_owned();
#[cfg(windows)]
{
// `canonicalize` returns "Incorrect function" (error code 1)
// for virtual drives (network drives, ramdisk, etc.).
if io_err.raw_os_error() == Some(1) && directory_path.exists() {
// Should call `std::path::absolute` when it is stabilised.
return Ok(directory_path);
}
}
Err(OpenDirectoryError::wrap_io_error(io_err, directory_path))
let canonical_path: PathBuf = directory_path.canonicalize().map_err(|io_err| {
OpenDirectoryError::wrap_io_error(io_err, PathBuf::from(directory_path))
})?;
if !canonical_path.is_dir() {
return Err(OpenDirectoryError::NotADirectory(PathBuf::from(
@@ -456,16 +443,6 @@ impl Directory for MmapDirectory {
Ok(self.inner.watch(watch_callback))
}
#[cfg(windows)]
fn sync_directory(&self) -> Result<(), io::Error> {
// On Windows, it is not necessary to fsync the parent directory to
// ensure that the directory entry containing the file has also reached
// disk, and calling sync_data on a handle to directory is a no-op on
// local disks, but will return an error on virtual drives.
Ok(())
}
#[cfg(not(windows))]
fn sync_directory(&self) -> Result<(), io::Error> {
let mut open_opts = OpenOptions::new();
@@ -473,6 +450,19 @@ impl Directory for MmapDirectory {
// write must not be set, or it fails with EISDIR
open_opts.read(true);
// On Windows, opening a directory requires FILE_FLAG_BACKUP_SEMANTICS
// and calling sync_all() only works if write access is requested.
#[cfg(windows)]
{
use std::os::windows::fs::OpenOptionsExt;
use winapi::um::winbase;
open_opts
.write(true)
.custom_flags(winbase::FILE_FLAG_BACKUP_SEMANTICS);
}
let fd = open_opts.open(&self.inner.root_path)?;
fd.sync_data()?;
Ok(())

View File

@@ -49,6 +49,11 @@ impl AliveBitSet {
Self::open(alive_bitset_bytes)
}
/// Builds an `AliveBitSet` from a `BitSet`, going through a
/// `ReadOnlyBitSet` built from the same data.
pub(crate) fn from_bitset(bitset: &BitSet) -> AliveBitSet {
    AliveBitSet::from(ReadOnlyBitSet::from(bitset))
}
/// Opens an alive bitset given its file.
pub fn open(bytes: OwnedBytes) -> AliveBitSet {
let bitset = ReadOnlyBitSet::open(bytes);

View File

@@ -54,7 +54,6 @@ impl FacetReader {
self.facet_column.ords().values(doc)
}
/// Accessor to the facet dictionary.
pub fn facet_dict(&self) -> &columnar::Dictionary {
self.facet_column.dictionary()
}

View File

@@ -22,6 +22,7 @@
use std::net::Ipv6Addr;
pub use columnar::Column;
use columnar::MonotonicallyMappableToU64;
pub use self::alive_bitset::{intersect_alive_bitsets, write_alive_bitset, AliveBitSet};
pub use self::error::{FastFieldNotAvailableError, Result};
@@ -102,13 +103,25 @@ impl FastValue for DateTime {
}
}
/// Maps a `DateTime` to and from `u64` by delegating to the mapping of its
/// `timestamp_micros` field.
impl columnar::MonotonicallyMappableToU64 for DateTime {
    /// Encodes the wrapped timestamp using the field type's own `to_u64`.
    fn to_u64(self) -> u64 {
        let DateTime { timestamp_micros } = self;
        timestamp_micros.to_u64()
    }

    /// Decodes a timestamp with the field type's `from_u64` and wraps it.
    fn from_u64(val: u64) -> Self {
        let timestamp_micros = MonotonicallyMappableToU64::from_u64(val);
        DateTime { timestamp_micros }
    }
}
#[cfg(test)]
mod tests {
use std::ops::{Range, RangeInclusive};
use std::path::Path;
use columnar::{Column, MonotonicallyMappableToU64};
use columnar::Column;
use common::{HasLen, TerminatingWrite};
use once_cell::sync::Lazy;
use rand::prelude::SliceRandom;
@@ -160,10 +173,7 @@ mod tests {
assert_eq!(file.len(), 161);
let fast_field_readers = FastFieldReaders::open(file).unwrap();
let column = fast_field_readers
.u64("field")
.unwrap()
.first_or_default_col(0);
let column = fast_field_readers.u64("field").unwrap();
assert_eq!(column.get_val(0), 13u64);
assert_eq!(column.get_val(1), 14u64);
assert_eq!(column.get_val(2), 2u64);
@@ -210,10 +220,7 @@ mod tests {
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 189);
let fast_field_readers = FastFieldReaders::open(file).unwrap();
let col = fast_field_readers
.u64("field")
.unwrap()
.first_or_default_col(0);
let col = fast_field_readers.u64("field").unwrap();
assert_eq!(col.get_val(0), 4u64);
assert_eq!(col.get_val(1), 14_082_001u64);
assert_eq!(col.get_val(2), 3_052u64);
@@ -243,10 +250,7 @@ mod tests {
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 162);
let fast_field_readers = FastFieldReaders::open(file).unwrap();
let fast_field_reader = fast_field_readers
.u64("field")
.unwrap()
.first_or_default_col(0);
let fast_field_reader = fast_field_readers.u64("field").unwrap();
for doc in 0..10_000 {
assert_eq!(fast_field_reader.get_val(doc), 100_000u64);
}
@@ -276,10 +280,7 @@ mod tests {
assert_eq!(file.len(), 4557);
{
let fast_field_readers = FastFieldReaders::open(file).unwrap();
let col = fast_field_readers
.u64("field")
.unwrap()
.first_or_default_col(0);
let col = fast_field_readers.u64("field").unwrap();
for doc in 1..10_000 {
assert_eq!(col.get_val(doc), 5_000_000_000_000_000_000u64 + doc as u64);
}
@@ -310,10 +311,7 @@ mod tests {
{
let fast_field_readers = FastFieldReaders::open(file).unwrap();
let col = fast_field_readers
.i64("field")
.unwrap()
.first_or_default_col(0);
let col = fast_field_readers.i64("field").unwrap();
assert_eq!(col.min_value(), -100i64);
assert_eq!(col.max_value(), 9_999i64);
for (doc, i) in (-100i64..10_000i64).enumerate() {
@@ -348,18 +346,7 @@ mod tests {
let file = directory.open_read(path).unwrap();
let fast_field_readers = FastFieldReaders::open(file).unwrap();
let col = fast_field_readers.i64("field").unwrap();
assert_eq!(col.first(0), None);
let col = fast_field_readers
.i64("field")
.unwrap()
.first_or_default_col(0);
assert_eq!(col.get_val(0), 0);
let col = fast_field_readers
.i64("field")
.unwrap()
.first_or_default_col(-100);
assert_eq!(col.get_val(0), -100);
assert_eq!(col.get_val(0), 0i64);
}
#[test]
@@ -380,11 +367,8 @@ mod tests {
let file = directory.open_read(path).unwrap();
let fast_field_readers = FastFieldReaders::open(file).unwrap();
let col = fast_field_readers
.date("date")
.unwrap()
.first_or_default_col(DateTime::default());
assert_eq!(col.get_val(0), DateTime::default());
let col = fast_field_readers.date("date").unwrap();
assert_eq!(col.get_val(0), columnar::DateTime::default());
}
// Warning: this generates the same permutation at each call
@@ -416,10 +400,7 @@ mod tests {
}
let file = directory.open_read(path).unwrap();
let fast_field_readers = FastFieldReaders::open(file).unwrap();
let col = fast_field_readers
.u64("field")
.unwrap()
.first_or_default_col(0);
let col = fast_field_readers.u64("field").unwrap();
for a in 0..n {
assert_eq!(col.get_val(a as u32), permutation[a]);
}
@@ -446,7 +427,7 @@ mod tests {
let mut index_writer = index.writer_for_tests().unwrap();
index_writer.set_merge_policy(Box::new(NoMergePolicy));
index_writer
.add_document(doc!(date_field => DateTime::from_utc(OffsetDateTime::now_utc())))
.add_document(doc!(date_field =>DateTime::from_utc(OffsetDateTime::now_utc())))
.unwrap();
index_writer.commit().unwrap();
index_writer.add_document(doc!()).unwrap();
@@ -744,12 +725,12 @@ mod tests {
let segment_reader = searcher.segment_reader(0);
let fast_fields = segment_reader.fast_fields();
let date_fast_field = fast_fields
.column_opt::<DateTime>("date")
.column_opt::<columnar::DateTime>("date")
.unwrap()
.unwrap()
.first_or_default_col(Default::default());
let dates_fast_field = fast_fields
.column_opt::<DateTime>("multi_date")
.column_opt::<columnar::DateTime>("multi_date")
.unwrap()
.unwrap();
let mut dates = vec![];
@@ -803,10 +784,10 @@ mod tests {
assert_eq!(file.len(), 175);
let fast_field_readers = FastFieldReaders::open(file).unwrap();
let bool_col = fast_field_readers.bool("field_bool").unwrap();
assert_eq!(bool_col.first(0), Some(true));
assert_eq!(bool_col.first(1), Some(false));
assert_eq!(bool_col.first(2), Some(true));
assert_eq!(bool_col.first(3), Some(false));
assert_eq!(bool_col.get_val(0), true);
assert_eq!(bool_col.get_val(1), false);
assert_eq!(bool_col.get_val(2), true);
assert_eq!(bool_col.get_val(3), false);
}
#[test]
@@ -836,8 +817,8 @@ mod tests {
let readers = FastFieldReaders::open(file).unwrap();
let bool_col = readers.bool("field_bool").unwrap();
for i in 0..25 {
assert_eq!(bool_col.first(i * 2), Some(true));
assert_eq!(bool_col.first(i * 2 + 1), Some(false));
assert_eq!(bool_col.get_val(i * 2), true);
assert_eq!(bool_col.get_val(i * 2 + 1), false);
}
}
@@ -860,17 +841,7 @@ mod tests {
assert_eq!(file.len(), 177);
let fastfield_readers = FastFieldReaders::open(file).unwrap();
let col = fastfield_readers.bool("field_bool").unwrap();
assert_eq!(col.first(0), None);
let col = fastfield_readers
.bool("field_bool")
.unwrap()
.first_or_default_col(false);
assert_eq!(col.get_val(0), false);
let col = fastfield_readers
.bool("field_bool")
.unwrap()
.first_or_default_col(true);
assert_eq!(col.get_val(0), true);
}
fn get_index(docs: &[crate::Document], schema: &Schema) -> crate::Result<RamDirectory> {
@@ -924,7 +895,7 @@ mod tests {
let col = readers.date("field").unwrap();
for (i, time) in times.iter().enumerate() {
let dt: DateTime = col.first(i as u32).unwrap().into();
let dt: crate::DateTime = col.get_val(i as u32).into();
assert_eq!(dt, time.truncate(precision));
}
readers.column_num_bytes("field").unwrap()
@@ -960,17 +931,13 @@ mod tests {
let reader = index.reader().unwrap();
let searcher = reader.searcher();
let segment = &searcher.segment_readers()[0];
let field = segment
.fast_fields()
.u64("url_norm_hash")
.unwrap()
.first_or_default_col(0);
let field = segment.fast_fields().u64("url_norm_hash").unwrap();
let numbers = vec![100, 200, 300];
let test_range = |range: RangeInclusive<u64>| {
let expexted_count = numbers.iter().filter(|num| range.contains(num)).count();
let mut vec = vec![];
field.get_row_ids_for_value_range(range, 0..u32::MAX, &mut vec);
field.get_docids_for_value_range(range, 0..u32::MAX, &mut vec);
assert_eq!(vec.len(), expexted_count);
};
test_range(50..=50);
@@ -998,7 +965,7 @@ mod tests {
let searcher = index.reader().unwrap().searcher();
let fastfields = searcher.segment_reader(0u32).fast_fields();
let column: Column<Ipv6Addr> = fastfields.column_opt("ip").unwrap().unwrap();
assert_eq!(column.num_docs(), 3);
assert_eq!(column.num_rows(), 3);
assert_eq!(column.first(0), None);
assert_eq!(column.first(1), Some(ip_addr));
assert_eq!(column.first(2), None);
@@ -1034,17 +1001,13 @@ mod tests {
let reader = index.reader().unwrap();
let searcher = reader.searcher();
let segment = &searcher.segment_readers()[0];
let field = segment
.fast_fields()
.u64("url_norm_hash")
.unwrap()
.first_or_default_col(0);
let field = segment.fast_fields().u64("url_norm_hash").unwrap();
let numbers = vec![1000, 1001, 1003];
let test_range = |range: RangeInclusive<u64>| {
let expexted_count = numbers.iter().filter(|num| range.contains(num)).count();
let mut vec = vec![];
field.get_row_ids_for_value_range(range, 0..u32::MAX, &mut vec);
field.get_docids_for_value_range(range, 0..u32::MAX, &mut vec);
assert_eq!(vec.len(), expexted_count);
};
let test_range_variant = |start, stop| {

View File

@@ -81,49 +81,39 @@ impl FastFieldReaders {
/// - Rows with no value are associated with the default value.
/// - Rows with several values are associated with the first value.
pub fn column_first_or_default<T>(&self, field: &str) -> crate::Result<Arc<dyn ColumnValues<T>>>
where
T: PartialOrd + Copy + HasAssociatedColumnType + Send + Sync + 'static,
DynamicColumn: Into<Option<Column<T>>>,
{
let col: Column<T> = self.column(field)?;
Ok(col.first_or_default_col(T::default_value()))
}
/// Returns a typed column associated to a given field name.
///
/// Returns an error if no column associated with that field_name exists.
pub fn column<T>(&self, field: &str) -> crate::Result<Column<T>>
where
T: PartialOrd + Copy + HasAssociatedColumnType + Send + Sync + 'static,
DynamicColumn: Into<Option<Column<T>>>,
{
let col_opt: Option<Column<T>> = self.column_opt(field)?;
col_opt.ok_or_else(|| {
crate::TantivyError::SchemaError(format!(
if let Some(col) = col_opt {
Ok(col.first_or_default_col(T::default_value()))
} else {
Err(crate::TantivyError::SchemaError(format!(
"Field `{field}` is missing or is not configured as a fast field."
))
})
)))
}
}
/// Returns the `u64` fast field reader associated with `field`.
///
/// If `field` is not a u64 fast field, this method returns an Error.
pub fn u64(&self, field: &str) -> crate::Result<Column<u64>> {
self.column(field)
pub fn u64(&self, field: &str) -> crate::Result<Arc<dyn ColumnValues<u64>>> {
self.column_first_or_default(field)
}
/// Returns the `date` fast field reader associated with `field`.
///
/// If `field` is not a date fast field, this method returns an Error.
pub fn date(&self, field: &str) -> crate::Result<Column<common::DateTime>> {
self.column(field)
pub fn date(&self, field: &str) -> crate::Result<Arc<dyn ColumnValues<columnar::DateTime>>> {
self.column_first_or_default(field)
}
/// Returns the `ip` fast field reader associated with `field`.
///
/// If `field` is not a u128 fast field, this method returns an Error.
pub fn ip_addr(&self, field: &str) -> crate::Result<Column<Ipv6Addr>> {
self.column(field)
pub fn ip_addr(&self, field: &str) -> crate::Result<Arc<dyn ColumnValues<Ipv6Addr>>> {
self.column_first_or_default(field)
}
/// Returns a `str` column.
@@ -156,7 +146,8 @@ impl FastFieldReaders {
.columnar
.read_columns(field_name)?
.into_iter()
.find(|column| column.column_type() == column_type);
.filter(|column| column.column_type() == column_type)
.next();
Ok(dynamic_column_handle_opt)
}
@@ -174,21 +165,21 @@ impl FastFieldReaders {
/// Returns the `i64` fast field reader associated with `field`.
///
/// If `field` is not a i64 fast field, this method returns an Error.
pub fn i64(&self, field_name: &str) -> crate::Result<Column<i64>> {
self.column(field_name)
pub fn i64(&self, field_name: &str) -> crate::Result<Arc<dyn ColumnValues<i64>>> {
self.column_first_or_default(field_name)
}
/// Returns the `f64` fast field reader associated with `field`.
///
/// If `field` is not a f64 fast field, this method returns an Error.
pub fn f64(&self, field_name: &str) -> crate::Result<Column<f64>> {
self.column(field_name)
pub fn f64(&self, field_name: &str) -> crate::Result<Arc<dyn ColumnValues<f64>>> {
self.column_first_or_default(field_name)
}
/// Returns the `bool` fast field reader associated with `field`.
///
/// If `field` is not a bool fast field, this method returns an Error.
pub fn bool(&self, field_name: &str) -> crate::Result<Column<bool>> {
self.column(field_name)
pub fn bool(&self, field_name: &str) -> crate::Result<Arc<dyn ColumnValues<bool>>> {
self.column_first_or_default(field_name)
}
}

View File

@@ -126,7 +126,7 @@ impl FastFieldsWriter {
self.columnar_writer.record_datetime(
doc_id,
field_name.as_str(),
truncated_datetime,
truncated_datetime.into(),
);
}
Value::Facet(facet) => {

View File

@@ -802,7 +802,6 @@ mod tests {
use std::net::Ipv6Addr;
use columnar::{Cardinality, Column, MonotonicallyMappableToU128};
use itertools::Itertools;
use proptest::prop_oneof;
use proptest::strategy::Strategy;
@@ -1487,10 +1486,9 @@ mod tests {
assert_eq!(segment_reader.num_docs(), 8);
assert_eq!(segment_reader.max_doc(), 10);
let fast_field_reader = segment_reader.fast_fields().u64("id")?;
let in_order_alive_ids: Vec<u64> = segment_reader
.doc_ids_alive()
.flat_map(|doc| fast_field_reader.values(doc))
.map(|doc| fast_field_reader.get_val(doc))
.collect();
assert_eq!(&in_order_alive_ids[..], &[9, 8, 7, 6, 5, 4, 1, 0]);
Ok(())
@@ -1550,7 +1548,7 @@ mod tests {
let fast_field_reader = segment_reader.fast_fields().u64("id")?;
let in_order_alive_ids: Vec<u64> = segment_reader
.doc_ids_alive()
.flat_map(|doc| fast_field_reader.values(doc))
.map(|doc| fast_field_reader.get_val(doc))
.collect();
assert_eq!(&in_order_alive_ids[..], &[9, 8, 7, 6, 5, 4, 2, 0]);
Ok(())
@@ -1795,7 +1793,7 @@ mod tests {
let ff_reader = segment_reader.fast_fields().u64("id").unwrap();
segment_reader
.doc_ids_alive()
.flat_map(move |doc| ff_reader.values(doc).collect_vec().into_iter())
.map(move |doc| ff_reader.get_val(doc))
})
.collect();
@@ -1806,7 +1804,7 @@ mod tests {
let ff_reader = segment_reader.fast_fields().u64("id").unwrap();
segment_reader
.doc_ids_alive()
.flat_map(move |doc| ff_reader.values(doc).collect_vec().into_iter())
.map(move |doc| ff_reader.get_val(doc))
})
.collect();
@@ -1872,7 +1870,7 @@ mod tests {
.column_opt::<Ipv6Addr>("ips")
.unwrap()
.unwrap();
ff_reader.num_docs() as usize
ff_reader.num_rows() as usize
})
.sum();
assert_eq!(num_docs, num_docs_expected);
@@ -1938,7 +1936,7 @@ mod tests {
let vals: Vec<u64> = ff_reader.values(doc).collect();
assert_eq!(vals.len(), 2);
assert_eq!(vals[0], vals[1]);
assert_eq!(id_reader.first(doc), Some(vals[0]));
assert_eq!(id_reader.get_val(doc), vals[0]);
let bool_vals: Vec<bool> = bool_ff_reader.values(doc).collect();
assert_eq!(bool_vals.len(), 2);
@@ -2091,12 +2089,11 @@ mod tests {
let do_search_ip_field = |term: &str| do_search(term, ip_field).len() as u64;
// Range query on single value field
let query = gen_query_inclusive("ip", ip, ip);
assert_eq!(do_search_ip_field(&query), count);
// let query = gen_query_inclusive("ip", ip, ip);
// assert_eq!(do_search_ip_field(&query), count);
// Range query on multi value field
let query = gen_query_inclusive("ips", ip, ip);
assert_eq!(do_search_ip_field(&query), count);
}
@@ -2114,8 +2111,8 @@ mod tests {
let do_search_ip_field = |term: &str| do_search(term, ip_field).len() as u64;
// Range query on single value field
let query = gen_query_inclusive("ip", ip, ip);
assert_eq!(do_search_ip_field(&query), count);
// let query = gen_query_inclusive("ip", ip, ip);
// assert_eq!(do_search_ip_field(&query), count);
// Range query on multi value field
let query = gen_query_inclusive("ips", ip, ip);
@@ -2125,11 +2122,7 @@ mod tests {
// test facets
for segment_reader in searcher.segment_readers().iter() {
let facet_reader = segment_reader.facet_reader("facet").unwrap();
let ff_reader = segment_reader
.fast_fields()
.u64("id")
.unwrap()
.first_or_default_col(0);
let ff_reader = segment_reader.fast_fields().u64("id").unwrap();
for doc_id in segment_reader.doc_ids_alive() {
let facet_ords: Vec<u64> = facet_reader.facet_ords(doc_id).collect();
assert_eq!(facet_ords.len(), 1);
@@ -2379,30 +2372,6 @@ mod tests {
test_operation_strategy(&ops[..], false, true).unwrap();
}
#[test]
fn test_range_query_bug_1() {
    // Regression scenario: three documents added out of id order, followed by
    // a commit, replayed through the shared operation-strategy harness.
    use IndexingOp::{AddDoc, Commit};
    let operations = [
        AddDoc { id: 9 },
        AddDoc { id: 0 },
        AddDoc { id: 13 },
        Commit,
    ];
    let outcome = test_operation_strategy(&operations[..], false, true);
    outcome.unwrap();
}
#[test]
fn test_range_query_bug_2() {
    // Regression scenario: four documents with increasing ids and no explicit
    // commit, replayed through the shared operation-strategy harness.
    use IndexingOp::AddDoc;
    let operations = [
        AddDoc { id: 3 },
        AddDoc { id: 6 },
        AddDoc { id: 9 },
        AddDoc { id: 10 },
    ];
    let outcome = test_operation_strategy(&operations[..], false, false);
    outcome.unwrap();
}
#[test]
fn test_index_doc_missing_field() -> crate::Result<()> {
let mut schema_builder = schema::Schema::builder();

View File

@@ -110,8 +110,8 @@ impl DeltaComputer {
}
}
fn convert_to_merge_order(
columnars: &[&ColumnarReader],
fn convert_to_merge_order<'a>(
columnars: &[&'a ColumnarReader],
doc_id_mapping: SegmentDocIdMapping,
) -> MergeRowOrder {
match doc_id_mapping.mapping_type() {
@@ -369,8 +369,11 @@ impl IndexMerger {
.readers
.iter()
.map(|segment_reader| {
let alive_bitset = segment_reader.alive_bitset()?;
Some(alive_bitset.bitset().clone())
if let Some(alive_bitset) = segment_reader.alive_bitset() {
Some(alive_bitset.bitset().clone())
} else {
None
}
})
.collect();
Ok(SegmentDocIdMapping::new(
@@ -413,8 +416,11 @@ impl IndexMerger {
.readers
.iter()
.map(|reader| {
let alive_bitset = reader.alive_bitset()?;
Some(alive_bitset.bitset().clone())
if let Some(bitset) = reader.alive_bitset() {
Some(bitset.bitset().clone())
} else {
None
}
})
.collect();
Ok(SegmentDocIdMapping::new(

View File

@@ -183,17 +183,17 @@ mod tests {
let fast_fields = segment_reader.fast_fields();
let fast_field = fast_fields.u64("intval").unwrap();
assert_eq!(fast_field.first(5), Some(1u64));
assert_eq!(fast_field.first(4), Some(2u64));
assert_eq!(fast_field.first(3), Some(3u64));
assert_eq!(fast_field.get_val(5), 1u64);
assert_eq!(fast_field.get_val(4), 2u64);
assert_eq!(fast_field.get_val(3), 3u64);
if force_disjunct_segment_sort_values {
assert_eq!(fast_field.first(2), Some(20u64));
assert_eq!(fast_field.first(1), Some(100u64));
assert_eq!(fast_field.get_val(2), 20u64);
assert_eq!(fast_field.get_val(1), 100u64);
} else {
assert_eq!(fast_field.first(2), Some(10u64));
assert_eq!(fast_field.first(1), Some(20u64));
assert_eq!(fast_field.get_val(2), 10u64);
assert_eq!(fast_field.get_val(1), 20u64);
}
assert_eq!(fast_field.first(0), Some(1_000u64));
assert_eq!(fast_field.get_val(0), 1_000u64);
// test new field norm mapping
{
@@ -560,9 +560,7 @@ mod bench_sorted_index_merge {
let merger: IndexMerger =
IndexMerger::open(index.schema(), index.settings().clone(), &segments[..])?;
b.iter(|| {
merger
.generate_doc_id_mapping_with_sort_by_field(&sort_by_field)
.unwrap();
merger.generate_doc_id_mapping(&sort_by_field).unwrap();
});
Ok(())

View File

@@ -333,12 +333,12 @@ impl SegmentWriter {
///
/// As a user, you should rather use `IndexWriter`'s add_document.
pub fn add_document(&mut self, add_operation: AddOperation) -> crate::Result<()> {
let AddOperation { document, opstamp } = add_operation;
self.doc_opstamps.push(opstamp);
self.fast_field_writers.add_document(&document)?;
self.index_document(&document)?;
let doc = add_operation.document;
self.doc_opstamps.push(add_operation.opstamp);
self.fast_field_writers.add_document(&doc)?;
self.index_document(&doc)?;
let doc_writer = self.segment_serializer.get_store_writer();
doc_writer.store(&document, &self.schema)?;
doc_writer.store(&doc, &self.schema)?;
self.max_doc += 1;
Ok(())
}

View File

@@ -123,12 +123,146 @@ mod functional_test;
mod macros;
mod future_result;
pub use common::DateTime;
/// Re-export of the `time` crate
///
/// Tantivy uses [`time`](https://crates.io/crates/time) for dates.
pub use time;
use crate::time::format_description::well_known::Rfc3339;
use crate::time::{OffsetDateTime, PrimitiveDateTime, UtcOffset};
/// A date/time value with microsecond precision.
///
/// The value is stored as a single signed count of microseconds relative to
/// the UNIX epoch. It does not carry any explicit time zone information:
/// users are responsible for applying the provided conversion functions
/// consistently. Internally the time zone is assumed to be UTC, which is
/// also used implicitly for JSON serialization.
///
/// Constructors and conversions to and from raw timestamps and `time` types
/// are provided as explicit functions rather than `From`/`Into` impls, to
/// prevent unintended usage. The exception is the pair of `From` impls that
/// convert to and from `columnar::DateTime`, which simply copy the
/// microsecond timestamp.
#[derive(Clone, Default, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct DateTime {
// Timestamp in microseconds since the UNIX epoch (may be negative).
pub(crate) timestamp_micros: i64,
}
/// Converts the columnar date/time representation into tantivy's `DateTime`
/// by copying its microsecond timestamp unchanged.
impl From<columnar::DateTime> for DateTime {
    fn from(columnar_datetime: columnar::DateTime) -> Self {
        let timestamp_micros = columnar_datetime.timestamp_micros;
        Self { timestamp_micros }
    }
}
/// Converts tantivy's `DateTime` into the columnar representation by copying
/// its microsecond timestamp unchanged.
impl From<DateTime> for columnar::DateTime {
    fn from(datetime: crate::DateTime) -> Self {
        let timestamp_micros = datetime.timestamp_micros;
        Self { timestamp_micros }
    }
}
impl DateTime {
/// Create new from UNIX timestamp in seconds
pub const fn from_timestamp_secs(seconds: i64) -> Self {
Self {
timestamp_micros: seconds * 1_000_000,
}
}
/// Create new from UNIX timestamp in milliseconds
pub const fn from_timestamp_millis(milliseconds: i64) -> Self {
Self {
timestamp_micros: milliseconds * 1_000,
}
}
/// Create new from UNIX timestamp in microseconds.
pub const fn from_timestamp_micros(microseconds: i64) -> Self {
Self {
timestamp_micros: microseconds,
}
}
/// Create new from `OffsetDateTime`
///
/// The given date/time is converted to UTC and the actual
/// time zone is discarded.
pub const fn from_utc(dt: OffsetDateTime) -> Self {
let timestamp_micros = dt.unix_timestamp() * 1_000_000 + dt.microsecond() as i64;
Self { timestamp_micros }
}
/// Create new from `PrimitiveDateTime`
///
/// Implicitly assumes that the given date/time is in UTC!
/// Otherwise the original value must only be reobtained with
/// [`Self::into_primitive()`].
pub fn from_primitive(dt: PrimitiveDateTime) -> Self {
Self::from_utc(dt.assume_utc())
}
/// Convert to UNIX timestamp in seconds.
pub const fn into_timestamp_secs(self) -> i64 {
self.timestamp_micros / 1_000_000
}
/// Convert to UNIX timestamp in milliseconds.
pub const fn into_timestamp_millis(self) -> i64 {
self.timestamp_micros / 1_000
}
/// Convert to UNIX timestamp in microseconds.
pub const fn into_timestamp_micros(self) -> i64 {
self.timestamp_micros
}
/// Convert to UTC `OffsetDateTime`
pub fn into_utc(self) -> OffsetDateTime {
let timestamp_nanos = self.timestamp_micros as i128 * 1000;
let utc_datetime = OffsetDateTime::from_unix_timestamp_nanos(timestamp_nanos)
.expect("valid UNIX timestamp");
debug_assert_eq!(UtcOffset::UTC, utc_datetime.offset());
utc_datetime
}
/// Convert to `OffsetDateTime` with the given time zone
pub fn into_offset(self, offset: UtcOffset) -> OffsetDateTime {
self.into_utc().to_offset(offset)
}
/// Convert to `PrimitiveDateTime` without any time zone
///
/// The value should have been constructed with [`Self::from_primitive()`].
/// Otherwise the time zone is implicitly assumed to be UTC.
pub fn into_primitive(self) -> PrimitiveDateTime {
let utc_datetime = self.into_utc();
// Discard the UTC time zone offset
debug_assert_eq!(UtcOffset::UTC, utc_datetime.offset());
PrimitiveDateTime::new(utc_datetime.date(), utc_datetime.time())
}
/// Truncates the microseconds value to the corresponding precision.
pub(crate) fn truncate(self, precision: DatePrecision) -> Self {
let truncated_timestamp_micros = match precision {
DatePrecision::Seconds => (self.timestamp_micros / 1_000_000) * 1_000_000,
DatePrecision::Milliseconds => (self.timestamp_micros / 1_000) * 1_000,
DatePrecision::Microseconds => self.timestamp_micros,
};
Self {
timestamp_micros: truncated_timestamp_micros,
}
}
}
impl fmt::Debug for DateTime {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let utc_rfc3339 = self.into_utc().format(&Rfc3339).map_err(|_| fmt::Error)?;
f.write_str(&utc_rfc3339)
}
}
pub use crate::error::TantivyError;
pub use crate::future_result::FutureResult;
@@ -911,21 +1045,21 @@ pub mod tests {
let fast_field_reader_opt = segment_reader.fast_fields().u64("unsigned");
assert!(fast_field_reader_opt.is_ok());
let fast_field_reader = fast_field_reader_opt.unwrap();
assert_eq!(fast_field_reader.first(0), Some(4u64))
assert_eq!(fast_field_reader.get_val(0), 4u64)
}
{
let fast_field_reader_res = segment_reader.fast_fields().i64("signed");
assert!(fast_field_reader_res.is_ok());
let fast_field_reader = fast_field_reader_res.unwrap();
assert_eq!(fast_field_reader.first(0), Some(4i64))
assert_eq!(fast_field_reader.get_val(0), 4i64)
}
{
let fast_field_reader_res = segment_reader.fast_fields().f64("float");
assert!(fast_field_reader_res.is_ok());
let fast_field_reader = fast_field_reader_res.unwrap();
assert_eq!(fast_field_reader.first(0), Some(4f64))
assert_eq!(fast_field_reader.get_val(0), 4f64)
}
Ok(())
}

View File

@@ -13,7 +13,7 @@ pub(crate) struct IndexingContext {
impl IndexingContext {
/// Create a new IndexingContext given the size of the term hash map.
pub(crate) fn new(table_size: usize) -> IndexingContext {
let term_index = ArenaHashMap::with_capacity(table_size);
let term_index = ArenaHashMap::new(table_size);
IndexingContext {
arena: MemoryArena::default(),
term_index,

View File

@@ -1,7 +1,9 @@
use core::fmt::Debug;
use std::ops::RangeInclusive;
use std::sync::Arc;
use columnar::Column;
use columnar::column_index::ColumnIndexSelectCursor;
use columnar::{Column, ColumnValues};
use crate::fastfield::MakeZero;
use crate::{DocId, DocSet, TERMINATED};
@@ -43,7 +45,9 @@ impl VecCursor {
pub(crate) struct RangeDocSet<T: MakeZero> {
/// The range filter on the values.
value_range: RangeInclusive<T>,
column: Column<T>,
column_index_select_cursor: ColumnIndexSelectCursor,
column_values: Arc<dyn ColumnValues<T>>,
/// The next docid start range to fetch (inclusive).
next_fetch_start: u32,
/// Number of docs range checked in a batch.
@@ -63,13 +67,15 @@ pub(crate) struct RangeDocSet<T: MakeZero> {
const DEFAULT_FETCH_HORIZON: u32 = 128;
impl<T: MakeZero + Send + Sync + PartialOrd + Copy + Debug + 'static> RangeDocSet<T> {
pub(crate) fn new(value_range: RangeInclusive<T>, column: Column<T>) -> Self {
let column_index_select_cursor = column.select_cursor();
let mut range_docset = Self {
value_range,
column,
column_values: column.values,
loaded_docs: VecCursor::new(),
next_fetch_start: 0,
fetch_horizon: DEFAULT_FETCH_HORIZON,
last_seek_pos_opt: None,
column_index_select_cursor,
};
range_docset.reset_fetch_range();
range_docset.fetch_block();
@@ -106,25 +112,21 @@ impl<T: MakeZero + Send + Sync + PartialOrd + Copy + Debug + 'static> RangeDocSe
fn fetch_horizon(&mut self, horizon: u32) -> bool {
let mut finished_to_end = false;
let limit = self.column.num_docs();
let limit = self.column_values.num_vals();
let mut end = self.next_fetch_start + horizon;
if end >= limit {
end = limit;
finished_to_end = true;
}
let last_value = self.loaded_docs.last_value();
let doc_buffer: &mut Vec<DocId> = self.loaded_docs.get_cleared_data();
self.column.get_docids_for_value_range(
self.column_values.get_docids_for_value_range(
self.value_range.clone(),
self.next_fetch_start..end,
doc_buffer,
);
if let Some(last_value) = last_value {
while self.loaded_docs.current() == Some(last_value) {
self.loaded_docs.next();
}
}
self.column_index_select_cursor
.select_batch_in_place(doc_buffer);
self.next_fetch_start = end;
finished_to_end
@@ -137,7 +139,7 @@ impl<T: MakeZero + Send + Sync + PartialOrd + Copy + Debug + 'static> DocSet for
if let Some(docid) = self.loaded_docs.next() {
return docid;
}
if self.next_fetch_start >= self.column.values.num_vals() {
if self.next_fetch_start >= self.column_values.num_vals() {
return TERMINATED;
}
self.fetch_block();

View File

@@ -154,12 +154,6 @@ pub mod tests {
assert!(test_id_range_for_docs(ops).is_ok());
}
#[test]
fn test_range_regression3() {
    // Regression scenario: ids 9, 0 and 13 inserted in that order.
    let docs = vec![doc_from_id_1(9), doc_from_id_1(0), doc_from_id_1(13)];
    let outcome = test_id_range_for_docs(docs);
    assert!(outcome.is_ok());
}
#[test]
fn test_range_regression_simplified() {
let mut schema_builder = SchemaBuilder::new();

View File

@@ -1,10 +1,24 @@
use std::ops::BitOr;
pub use common::DatePrecision;
use serde::{Deserialize, Serialize};
use crate::schema::flags::{FastFlag, IndexedFlag, SchemaFlagList, StoredFlag};
/// Precision of a `DateTime` value.
///
/// Because of `#[serde(rename_all = "lowercase")]`, the variants are
/// (de)serialized as `seconds`, `milliseconds` and `microseconds`.
/// The default precision is [`DatePrecision::Seconds`].
#[derive(
Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Default,
)]
#[serde(rename_all = "lowercase")]
pub enum DatePrecision {
/// Seconds precision (the default).
#[default]
Seconds,
/// Milliseconds precision.
Milliseconds,
/// Microseconds precision.
Microseconds,
}
/// Defines how DateTime field should be handled by tantivy.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct DateOptions {

View File

@@ -380,7 +380,9 @@ mod binary_serialize {
}
Value::Date(ref val) => {
DATE_CODE.serialize(writer)?;
let timestamp_micros = val.into_timestamp_micros();
let DateTime {
timestamp_micros, ..
} = val;
timestamp_micros.serialize(writer)
}
Value::Facet(ref facet) => {

View File

@@ -90,7 +90,7 @@ impl CheckpointBlock {
return Ok(());
}
let mut doc = read_u32_vint(data);
let mut start_offset = VInt::deserialize_u64(data)? as usize;
let mut start_offset = read_u32_vint(data) as usize;
for _ in 0..len {
let num_docs = read_u32_vint(data);
let block_num_bytes = read_u32_vint(data) as usize;
@@ -147,15 +147,6 @@ mod tests {
test_aux_ser_deser(&checkpoints)
}
#[test]
fn test_block_serialize_large_byte_range() -> io::Result<()> {
    // The byte offsets deliberately exceed `u32::MAX`, so the round trip only
    // succeeds if offsets are (de)serialized with more than 32 bits.
    let large_checkpoint = Checkpoint {
        doc_range: 10..12,
        byte_range: 8_000_000_000..9_000_000_000,
    };
    let checkpoints = vec![large_checkpoint];
    test_aux_ser_deser(&checkpoints)
}
#[test]
fn test_block_serialize() -> io::Result<()> {
let offsets: Vec<usize> = (0..11).map(|i| i * i * i).collect();

View File

@@ -27,10 +27,8 @@ pub struct FacetTokenStream<'a> {
impl Tokenizer for FacetTokenizer {
fn token_stream<'a>(&self, text: &'a str) -> BoxTokenStream<'a> {
let token = Token {
position: 0,
..Default::default()
};
let mut token = Token::default();
token.position = 0;
FacetTokenStream {
text,
state: State::RootFacetNotEmitted, //< pos is the first char that has not been processed yet.

View File

@@ -48,7 +48,8 @@ impl Dictionary<VoidSSTable> {
dictionary_writer.insert(term, &()).unwrap();
}
dictionary_writer.finish().unwrap();
Dictionary::from_bytes(OwnedBytes::new(buffer)).unwrap()
let dictionary = Dictionary::from_bytes(OwnedBytes::new(buffer)).unwrap();
dictionary
}
}

View File

@@ -103,21 +103,9 @@ fn compute_previous_power_of_two(n: usize) -> usize {
1 << msb
}
/// The default map starts with an empty table, no occupied slots and a
/// default-constructed memory arena.
impl Default for ArenaHashMap {
    fn default() -> Self {
        Self {
            table: Box::new([]),
            memory_arena: MemoryArena::default(),
            mask: 0,
            occupied: Vec::new(),
            len: 0,
        }
    }
}
impl ArenaHashMap {
pub fn with_capacity(table_size: usize) -> ArenaHashMap {
pub fn new(table_size: usize) -> ArenaHashMap {
assert!(table_size > 0);
let table_size_power_of_2 = compute_previous_power_of_two(table_size);
let memory_arena = MemoryArena::default();
let table: Vec<KeyValue> = iter::repeat(KeyValue::default())
@@ -149,7 +137,7 @@ impl ArenaHashMap {
#[inline]
fn is_saturated(&self) -> bool {
self.table.len() <= self.occupied.len() * 3
self.table.len() < self.occupied.len() * 3
}
#[inline]
@@ -202,7 +190,7 @@ impl ArenaHashMap {
}
fn resize(&mut self) {
let new_len = (self.table.len() * 2).max(1 << 13);
let new_len = self.table.len() * 2;
let mask = new_len - 1;
self.mask = mask;
let new_table = vec![KeyValue::default(); new_len].into_boxed_slice();
@@ -300,7 +288,7 @@ mod tests {
#[test]
fn test_hash_map() {
let mut hash_map: ArenaHashMap = ArenaHashMap::default();
let mut hash_map: ArenaHashMap = ArenaHashMap::new(1 << 18);
hash_map.mutate_or_create(b"abc", |opt_val: Option<u32>| {
assert_eq!(opt_val, None);
3u32