Compare commits

..

11 Commits

Author SHA1 Message Date
Pascal Seitz
ba3215b469 reuse samples, add EstimateColumn
Estimations can be expensive, since the samples span the whole column
and, depending on the implementation, get_val cannot be easily computed
without an index.
EstimateColumn adds a view over the column which limits num_vals
to 100_000.
2022-09-25 23:54:03 +08:00
PSeitz
dac7da780e Merge pull request #1545 from waywardmonkeys/remove-some-refs
clippy: Remove borrows that the compiler will do.
2022-09-23 15:33:23 +08:00
PSeitz
20c87903b2 fix multivalue ff index creation regression (#1543)
Fixes the multivalue ff regression by avoiding `get_val`. Line::train repeatedly calls get_val, but the get_val implementation on Column for multivalues is very slow. The fix is to use the iterator instead. The long-term fix should be to remove get_val access in serialization.

Old Code

test fastfield::bench::bench_multi_value_ff_merge_few_segments                                                           ... bench:  46,103,960 ns/iter (+/- 2,066,083)
test fastfield::bench::bench_multi_value_ff_merge_many_segments                                                          ... bench:  83,073,036 ns/iter (+/- 4,373,615)
test fastfield::bench::bench_multi_value_ff_merge_many_segments_log_merge                                                ... bench:  64,178,576 ns/iter (+/- 1,466,700)

Current

running 3 tests
test fastfield::multivalued::bench::bench_multi_value_ff_merge_few_segments                                              ... bench:  57,379,523 ns/iter (+/- 3,220,787)
test fastfield::multivalued::bench::bench_multi_value_ff_merge_many_segments                                             ... bench:  90,831,688 ns/iter (+/- 1,445,486)
test fastfield::multivalued::bench::bench_multi_value_ff_merge_many_segments_log_merge                                   ... bench: 158,313,264 ns/iter (+/- 28,823,250)

With Fix

running 3 tests
test fastfield::multivalued::bench::bench_multi_value_ff_merge_few_segments                                              ... bench:  57,635,671 ns/iter (+/- 2,707,361)
test fastfield::multivalued::bench::bench_multi_value_ff_merge_many_segments                                             ... bench:  91,468,712 ns/iter (+/- 11,393,581)
test fastfield::multivalued::bench::bench_multi_value_ff_merge_many_segments_log_merge                                   ... bench:  73,909,138 ns/iter (+/- 15,846,097)
2022-09-23 15:36:29 +09:00
PSeitz
f9c3947803 Merge pull request #1546 from waywardmonkeys/use-ux-from-bool
Use u8::from(bool), u64::from(bool).
2022-09-23 09:06:24 +08:00
Bruce Mitchener
e9a384bb15 Use u8::from(bool), u64::from(bool). 2022-09-22 22:44:53 +07:00
Bruce Mitchener
d231671fe2 clippy: Remove borrows that the compiler will do.
This started showing up with clippy in rust 1.64.
2022-09-22 22:38:23 +07:00
trinity-1686a
fa3d786a2f Add support for deleting all documents matching query (#1535)
* add support for deleting all documents matching query

#1494
2022-09-22 21:26:09 +09:00
Paul Masurel
75aafeeb9b Added a function to deep clone RamDirectory. (#1544) 2022-09-22 12:04:02 +02:00
PSeitz
6f066c7f65 Merge pull request #1541 from quickwit-oss/add_bench
add benchmarks for multivalued fastfield merge
2022-09-22 15:28:00 +08:00
Pascal Seitz
22e56aaee3 add benchmarks for multivalued fastfield merge 2022-09-22 11:25:41 +08:00
Paul Masurel
d641979127 Minor refactor of fast fields (#1538) 2022-09-21 12:55:03 +09:00
26 changed files with 679 additions and 742 deletions

View File

@@ -259,11 +259,7 @@ impl BitSet {
// we do not check saturated els. // we do not check saturated els.
let higher = el / 64u32; let higher = el / 64u32;
let lower = el % 64u32; let lower = el % 64u32;
self.len += if self.tinysets[higher as usize].insert_mut(lower) { self.len += u64::from(self.tinysets[higher as usize].insert_mut(lower));
1
} else {
0
};
} }
/// Inserts an element in the `BitSet` /// Inserts an element in the `BitSet`
@@ -272,11 +268,7 @@ impl BitSet {
// we do not check saturated els. // we do not check saturated els.
let higher = el / 64u32; let higher = el / 64u32;
let lower = el % 64u32; let lower = el % 64u32;
self.len -= if self.tinysets[higher as usize].remove_mut(lower) { self.len -= u64::from(self.tinysets[higher as usize].remove_mut(lower));
1
} else {
0
};
} }
/// Returns true iff the elements is in the `BitSet`. /// Returns true iff the elements is in the `BitSet`.

View File

@@ -161,8 +161,7 @@ impl FixedSize for u8 {
impl BinarySerializable for bool { impl BinarySerializable for bool {
fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> { fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
let val = if *self { 1 } else { 0 }; writer.write_u8(u8::from(*self))
writer.write_u8(val)
} }
fn deserialize<R: Read>(reader: &mut R) -> io::Result<bool> { fn deserialize<R: Read>(reader: &mut R) -> io::Result<bool> {
let val = reader.read_u8()?; let val = reader.read_u8()?;

View File

@@ -3,6 +3,7 @@ use std::io::{self, Write};
use ownedbytes::OwnedBytes; use ownedbytes::OwnedBytes;
use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker}; use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker};
use crate::column::EstimateColumn;
use crate::serialize::NormalizedHeader; use crate::serialize::NormalizedHeader;
use crate::{Column, FastFieldCodec, FastFieldCodecType}; use crate::{Column, FastFieldCodec, FastFieldCodecType};
@@ -68,16 +69,14 @@ impl FastFieldCodec for BitpackedCodec {
assert_eq!(column.min_value(), 0u64); assert_eq!(column.min_value(), 0u64);
let num_bits = compute_num_bits(column.max_value()); let num_bits = compute_num_bits(column.max_value());
let mut bit_packer = BitPacker::new(); let mut bit_packer = BitPacker::new();
let mut reader = column.reader(); for val in column.iter() {
while reader.advance() {
let val = reader.get();
bit_packer.write(val, num_bits, write)?; bit_packer.write(val, num_bits, write)?;
} }
bit_packer.close(write)?; bit_packer.close(write)?;
Ok(()) Ok(())
} }
fn estimate(column: &impl Column) -> Option<f32> { fn estimate(column: &EstimateColumn) -> Option<f32> {
let num_bits = compute_num_bits(column.max_value()); let num_bits = compute_num_bits(column.max_value());
let num_bits_uncompressed = 64; let num_bits_uncompressed = 64;
Some(num_bits as f32 / num_bits_uncompressed as f32) Some(num_bits as f32 / num_bits_uncompressed as f32)

View File

@@ -5,6 +5,7 @@ use common::{BinarySerializable, CountingWriter, DeserializeFrom};
use ownedbytes::OwnedBytes; use ownedbytes::OwnedBytes;
use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker}; use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker};
use crate::column::EstimateColumn;
use crate::line::Line; use crate::line::Line;
use crate::serialize::NormalizedHeader; use crate::serialize::NormalizedHeader;
use crate::{Column, FastFieldCodec, FastFieldCodecType, VecColumn}; use crate::{Column, FastFieldCodec, FastFieldCodecType, VecColumn};
@@ -71,13 +72,11 @@ impl FastFieldCodec for BlockwiseLinearCodec {
} }
// Estimate first_chunk and extrapolate // Estimate first_chunk and extrapolate
fn estimate(column: &impl crate::Column) -> Option<f32> { fn estimate(column: &EstimateColumn) -> Option<f32> {
if column.num_vals() < 10 * CHUNK_SIZE as u64 { if column.num_vals() < 10 * CHUNK_SIZE as u64 {
return None; return None;
} }
let mut first_chunk: Vec<u64> = crate::iter_from_reader(column.reader()) let mut first_chunk: Vec<u64> = column.iter().take(CHUNK_SIZE as usize).collect();
.take(CHUNK_SIZE as usize)
.collect();
let line = Line::train(&VecColumn::from(&first_chunk)); let line = Line::train(&VecColumn::from(&first_chunk));
for (i, buffer_val) in first_chunk.iter_mut().enumerate() { for (i, buffer_val) in first_chunk.iter_mut().enumerate() {
let interpolated_val = line.eval(i as u64); let interpolated_val = line.eval(i as u64);
@@ -102,7 +101,7 @@ impl FastFieldCodec for BlockwiseLinearCodec {
Some(num_bits as f32 / num_bits_uncompressed as f32) Some(num_bits as f32 / num_bits_uncompressed as f32)
} }
fn serialize(column: &dyn crate::Column, wrt: &mut impl io::Write) -> io::Result<()> { fn serialize(column: &dyn Column, wrt: &mut impl io::Write) -> io::Result<()> {
// The BitpackedReader assumes a normalized vector. // The BitpackedReader assumes a normalized vector.
assert_eq!(column.min_value(), 0); assert_eq!(column.min_value(), 0);
let mut buffer = Vec::with_capacity(CHUNK_SIZE); let mut buffer = Vec::with_capacity(CHUNK_SIZE);
@@ -111,7 +110,7 @@ impl FastFieldCodec for BlockwiseLinearCodec {
let num_blocks = compute_num_blocks(num_vals); let num_blocks = compute_num_blocks(num_vals);
let mut blocks = Vec::with_capacity(num_blocks); let mut blocks = Vec::with_capacity(num_blocks);
let mut vals = crate::iter_from_reader(column.reader()); let mut vals = column.iter();
let mut bit_packer = BitPacker::new(); let mut bit_packer = BitPacker::new();

View File

@@ -3,13 +3,7 @@ use std::ops::RangeInclusive;
use tantivy_bitpacker::minmax; use tantivy_bitpacker::minmax;
pub trait Column<T: PartialOrd + Copy + 'static = u64>: Send + Sync { pub trait Column<T: PartialOrd = u64>: Send + Sync {
/// Return a `ColumnReader`.
fn reader(&self) -> Box<dyn ColumnReader<T> + '_> {
// Box::new(ColumnReaderAdapter { column: self, idx: 0, })
Box::new(ColumnReaderAdapter::from(self))
}
/// Return the value associated to the given idx. /// Return the value associated to the given idx.
/// ///
/// This accessor should return as fast as possible. /// This accessor should return as fast as possible.
@@ -17,8 +11,6 @@ pub trait Column<T: PartialOrd + Copy + 'static = u64>: Send + Sync {
/// # Panics /// # Panics
/// ///
/// May panic if `idx` is greater than the column length. /// May panic if `idx` is greater than the column length.
///
/// TODO remove to force people to use `.reader()`.
fn get_val(&self, idx: u64) -> T; fn get_val(&self, idx: u64) -> T;
/// Fills an output buffer with the fast field values /// Fills an output buffer with the fast field values
@@ -66,70 +58,10 @@ pub trait Column<T: PartialOrd + Copy + 'static = u64>: Send + Sync {
fn max_value(&self) -> T; fn max_value(&self) -> T;
fn num_vals(&self) -> u64; fn num_vals(&self) -> u64;
}
/// `ColumnReader` makes it possible to read forward through a column. /// Returns a iterator over the data
pub trait ColumnReader<T = u64> { fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = T> + 'a> {
/// Advance the reader to the target_idx. Box::new((0..self.num_vals()).map(|idx| self.get_val(idx)))
///
/// After a successful call to seek,
/// `.get()` should returns `column.get_val(target_idx)`.
fn seek(&mut self, target_idx: u64) -> T;
fn advance(&mut self) -> bool;
/// Get the current value without advancing the reader
fn get(&self) -> T;
}
pub fn iter_from_reader<'a, T: 'static>(
mut column_reader: Box<dyn ColumnReader<T> + 'a>,
) -> impl Iterator<Item = T> + 'a {
std::iter::from_fn(move || {
if !column_reader.advance() {
return None;
}
Some(column_reader.get())
})
}
pub(crate) struct ColumnReaderAdapter<'a, C: ?Sized, T> {
column: &'a C,
idx: u64,
len: u64,
_phantom: PhantomData<T>,
}
impl<'a, C: Column<T> + ?Sized, T: Copy + PartialOrd + 'static> From<&'a C>
for ColumnReaderAdapter<'a, C, T>
{
fn from(column: &'a C) -> Self {
ColumnReaderAdapter {
column,
idx: u64::MAX,
len: column.num_vals(),
_phantom: PhantomData,
}
}
}
impl<'a, T, C: ?Sized> ColumnReader<T> for ColumnReaderAdapter<'a, C, T>
where
C: Column<T>,
T: PartialOrd<T> + Copy + 'static,
{
fn seek(&mut self, idx: u64) -> T {
self.idx = idx;
self.get()
}
fn advance(&mut self) -> bool {
self.idx = self.idx.wrapping_add(1);
self.idx < self.len
}
fn get(&self) -> T {
self.column.get_val(self.idx)
} }
} }
@@ -139,9 +71,7 @@ pub struct VecColumn<'a, T = u64> {
max_value: T, max_value: T,
} }
impl<'a, C: Column<T>, T> Column<T> for &'a C impl<'a, C: Column<T>, T: Copy + PartialOrd> Column<T> for &'a C {
where T: Copy + PartialOrd + 'static
{
fn get_val(&self, idx: u64) -> T { fn get_val(&self, idx: u64) -> T {
(*self).get_val(idx) (*self).get_val(idx)
} }
@@ -158,8 +88,8 @@ where T: Copy + PartialOrd + 'static
(*self).num_vals() (*self).num_vals()
} }
fn reader(&self) -> Box<dyn ColumnReader<T> + '_> { fn iter<'b>(&'b self) -> Box<dyn Iterator<Item = T> + 'b> {
(*self).reader() (*self).iter()
} }
fn get_range(&self, start: u64, output: &mut [T]) { fn get_range(&self, start: u64, output: &mut [T]) {
@@ -167,11 +97,15 @@ where T: Copy + PartialOrd + 'static
} }
} }
impl<'a, T: Copy + PartialOrd + Send + Sync + 'static> Column<T> for VecColumn<'a, T> { impl<'a, T: Copy + PartialOrd + Send + Sync> Column<T> for VecColumn<'a, T> {
fn get_val(&self, position: u64) -> T { fn get_val(&self, position: u64) -> T {
self.values[position as usize] self.values[position as usize]
} }
fn iter(&self) -> Box<dyn Iterator<Item = T> + '_> {
Box::new(self.values.iter().copied())
}
fn min_value(&self) -> T { fn min_value(&self) -> T {
self.min_value self.min_value
} }
@@ -203,6 +137,57 @@ where V: AsRef<[T]> + ?Sized
} }
} }
// Creates a view over a Column with a limited number of vals. Stats like min max are unchanged
pub struct EstimateColumn<'a> {
column: &'a dyn Column,
num_vals: u64,
}
impl<'a> EstimateColumn<'a> {
pub(crate) fn new(column: &'a dyn Column) -> Self {
let limit_num_vals = column.num_vals().min(100_000);
Self {
column,
num_vals: limit_num_vals,
}
}
}
impl<'a> Column for EstimateColumn<'a> {
fn get_val(&self, idx: u64) -> u64 {
(*self.column).get_val(idx)
}
fn min_value(&self) -> u64 {
(*self.column).min_value()
}
fn max_value(&self) -> u64 {
(*self.column).max_value()
}
fn num_vals(&self) -> u64 {
self.num_vals
}
fn iter<'b>(&'b self) -> Box<dyn Iterator<Item = u64> + 'b> {
Box::new((*self.column).iter().take(self.num_vals as usize))
}
fn get_range(&self, start: u64, output: &mut [u64]) {
(*self.column).get_range(start, output)
}
}
impl<'a> From<&'a dyn Column> for EstimateColumn<'a> {
fn from(column: &'a dyn Column) -> Self {
let limit_num_vals = column.num_vals().min(100_000);
Self {
column,
num_vals: limit_num_vals,
}
}
}
struct MonotonicMappingColumn<C, T, Input> { struct MonotonicMappingColumn<C, T, Input> {
from_column: C, from_column: C,
monotonic_mapping: T, monotonic_mapping: T,
@@ -210,15 +195,15 @@ struct MonotonicMappingColumn<C, T, Input> {
} }
/// Creates a view of a column transformed by a monotonic mapping. /// Creates a view of a column transformed by a monotonic mapping.
pub fn monotonic_map_column<C, T, Input: PartialOrd + Copy, Output: PartialOrd + Copy>( pub fn monotonic_map_column<C, T, Input: PartialOrd, Output: PartialOrd>(
from_column: C, from_column: C,
monotonic_mapping: T, monotonic_mapping: T,
) -> impl Column<Output> ) -> impl Column<Output>
where where
C: Column<Input>, C: Column<Input>,
T: Fn(Input) -> Output + Send + Sync, T: Fn(Input) -> Output + Send + Sync,
Input: Send + Sync + 'static, Input: Send + Sync,
Output: Send + Sync + 'static, Output: Send + Sync,
{ {
MonotonicMappingColumn { MonotonicMappingColumn {
from_column, from_column,
@@ -227,13 +212,13 @@ where
} }
} }
impl<C, T, Input: PartialOrd + Copy, Output: PartialOrd + Copy> Column<Output> impl<C, T, Input: PartialOrd, Output: PartialOrd> Column<Output>
for MonotonicMappingColumn<C, T, Input> for MonotonicMappingColumn<C, T, Input>
where where
C: Column<Input>, C: Column<Input>,
T: Fn(Input) -> Output + Send + Sync, T: Fn(Input) -> Output + Send + Sync,
Input: Send + Sync + 'static, Input: Send + Sync,
Output: Send + Sync + 'static, Output: Send + Sync,
{ {
#[inline] #[inline]
fn get_val(&self, idx: u64) -> Output { fn get_val(&self, idx: u64) -> Output {
@@ -255,44 +240,14 @@ where
self.from_column.num_vals() self.from_column.num_vals()
} }
fn reader(&self) -> Box<dyn ColumnReader<Output> + '_> { fn iter(&self) -> Box<dyn Iterator<Item = Output> + '_> {
Box::new(MonotonicMappingColumnReader { Box::new(self.from_column.iter().map(&self.monotonic_mapping))
col_reader: self.from_column.reader(),
monotonic_mapping: &self.monotonic_mapping,
intermdiary_type: PhantomData,
})
} }
// We voluntarily do not implement get_range as it yields a regression, // We voluntarily do not implement get_range as it yields a regression,
// and we do not have any specialized implementation anyway. // and we do not have any specialized implementation anyway.
} }
struct MonotonicMappingColumnReader<'a, Transform, U> {
col_reader: Box<dyn ColumnReader<U> + 'a>,
monotonic_mapping: &'a Transform,
intermdiary_type: PhantomData<U>,
}
impl<'a, U, V, Transform> ColumnReader<V> for MonotonicMappingColumnReader<'a, Transform, U>
where
U: Copy,
V: Copy,
Transform: Fn(U) -> V,
{
fn seek(&mut self, idx: u64) -> V {
let intermediary_value = self.col_reader.seek(idx);
(*self.monotonic_mapping)(intermediary_value)
}
fn advance(&mut self) -> bool {
self.col_reader.advance()
}
fn get(&self) -> V {
(*self.monotonic_mapping)(self.col_reader.get())
}
}
pub struct IterColumn<T>(T); pub struct IterColumn<T>(T);
impl<T> From<T> for IterColumn<T> impl<T> From<T> for IterColumn<T>
@@ -306,7 +261,7 @@ where T: Iterator + Clone + ExactSizeIterator
impl<T> Column<T::Item> for IterColumn<T> impl<T> Column<T::Item> for IterColumn<T>
where where
T: Iterator + Clone + ExactSizeIterator + Send + Sync, T: Iterator + Clone + ExactSizeIterator + Send + Sync,
T::Item: PartialOrd + Copy + 'static, T::Item: PartialOrd,
{ {
fn get_val(&self, idx: u64) -> T::Item { fn get_val(&self, idx: u64) -> T::Item {
self.0.clone().nth(idx as usize).unwrap() self.0.clone().nth(idx as usize).unwrap()
@@ -323,6 +278,10 @@ where
fn num_vals(&self) -> u64 { fn num_vals(&self) -> u64 {
self.0.len() as u64 self.0.len() as u64
} }
fn iter(&self) -> Box<dyn Iterator<Item = T::Item> + '_> {
Box::new(self.0.clone())
}
} }
#[cfg(test)] #[cfg(test)]
@@ -355,7 +314,7 @@ mod tests {
let vals: Vec<u64> = (-1..99).map(i64::to_u64).collect(); let vals: Vec<u64> = (-1..99).map(i64::to_u64).collect();
let col = VecColumn::from(&vals); let col = VecColumn::from(&vals);
let mapped = monotonic_map_column(col, |el| i64::from_u64(el) * 10i64); let mapped = monotonic_map_column(col, |el| i64::from_u64(el) * 10i64);
let val_i64s: Vec<i64> = crate::iter_from_reader(mapped.reader()).collect(); let val_i64s: Vec<i64> = mapped.iter().collect();
for i in 0..100 { for i in 0..100 {
assert_eq!(val_i64s[i as usize], mapped.get_val(i)); assert_eq!(val_i64s[i as usize], mapped.get_val(i));
} }
@@ -369,7 +328,7 @@ mod tests {
assert_eq!(mapped.min_value(), -10i64); assert_eq!(mapped.min_value(), -10i64);
assert_eq!(mapped.max_value(), 980i64); assert_eq!(mapped.max_value(), 980i64);
assert_eq!(mapped.num_vals(), 100); assert_eq!(mapped.num_vals(), 100);
let val_i64s: Vec<i64> = crate::iter_from_reader(mapped.reader()).collect(); let val_i64s: Vec<i64> = mapped.iter().collect();
assert_eq!(val_i64s.len(), 100); assert_eq!(val_i64s.len(), 100);
for i in 0..100 { for i in 0..100 {
assert_eq!(val_i64s[i as usize], mapped.get_val(i)); assert_eq!(val_i64s[i as usize], mapped.get_val(i));

View File

@@ -22,7 +22,7 @@ use ownedbytes::OwnedBytes;
use tantivy_bitpacker::{self, BitPacker, BitUnpacker}; use tantivy_bitpacker::{self, BitPacker, BitUnpacker};
use crate::compact_space::build_compact_space::get_compact_space; use crate::compact_space::build_compact_space::get_compact_space;
use crate::{iter_from_reader, Column, ColumnReader}; use crate::Column;
mod blank_range; mod blank_range;
mod build_compact_space; mod build_compact_space;
@@ -173,14 +173,11 @@ impl CompactSpaceCompressor {
/// Taking the vals as Vec may cost a lot of memory. It is used to sort the vals. /// Taking the vals as Vec may cost a lot of memory. It is used to sort the vals.
pub fn train_from(column: &impl Column<u128>) -> Self { pub fn train_from(column: &impl Column<u128>) -> Self {
let mut values_sorted = BTreeSet::new(); let mut values_sorted = BTreeSet::new();
values_sorted.extend(column.iter());
let total_num_values = column.num_vals(); let total_num_values = column.num_vals();
values_sorted.extend(iter_from_reader(column.reader()));
let compact_space = let compact_space =
get_compact_space(&values_sorted, total_num_values, COST_PER_BLANK_IN_BITS); get_compact_space(&values_sorted, total_num_values, COST_PER_BLANK_IN_BITS);
let amplitude_compact_space = compact_space.amplitude_compact_space(); let amplitude_compact_space = compact_space.amplitude_compact_space();
assert!( assert!(
@@ -221,12 +218,11 @@ impl CompactSpaceCompressor {
pub fn compress_into( pub fn compress_into(
self, self,
mut vals: Box<dyn ColumnReader<u128> + '_>, vals: impl Iterator<Item = u128>,
write: &mut impl Write, write: &mut impl Write,
) -> io::Result<()> { ) -> io::Result<()> {
let mut bitpacker = BitPacker::default(); let mut bitpacker = BitPacker::default();
while vals.advance() { for val in vals {
let val = vals.get();
let compact = self let compact = self
.params .params
.compact_space .compact_space
@@ -304,13 +300,13 @@ impl Column<u128> for CompactSpaceDecompressor {
self.params.num_vals self.params.num_vals
} }
#[inline]
fn iter(&self) -> Box<dyn Iterator<Item = u128> + '_> {
Box::new(self.iter())
}
fn get_between_vals(&self, range: RangeInclusive<u128>) -> Vec<u64> { fn get_between_vals(&self, range: RangeInclusive<u128>) -> Vec<u64> {
self.get_between_vals(range) self.get_between_vals(range)
} }
fn reader(&self) -> Box<dyn ColumnReader<u128> + '_> {
Box::new(self.specialized_reader())
}
} }
impl CompactSpaceDecompressor { impl CompactSpaceDecompressor {
@@ -414,13 +410,18 @@ impl CompactSpaceDecompressor {
positions positions
} }
fn specialized_reader(&self) -> CompactSpaceReader<'_> { #[inline]
CompactSpaceReader { fn iter_compact(&self) -> impl Iterator<Item = u64> + '_ {
data: self.data.as_slice(), (0..self.params.num_vals)
params: &self.params, .map(move |idx| self.params.bit_unpacker.get(idx as u64, &self.data) as u64)
idx: 0u64, }
len: self.params.num_vals,
} #[inline]
fn iter(&self) -> impl Iterator<Item = u128> + '_ {
// TODO: Performance. It would be better to iterate on the ranges and check existence via
// the bit_unpacker.
self.iter_compact()
.map(|compact| self.compact_to_u128(compact))
} }
#[inline] #[inline]
@@ -438,30 +439,6 @@ impl CompactSpaceDecompressor {
} }
} }
pub struct CompactSpaceReader<'a> {
data: &'a [u8],
params: &'a IPCodecParams,
idx: u64,
len: u64,
}
impl<'a> ColumnReader<u128> for CompactSpaceReader<'a> {
fn seek(&mut self, target_idx: u64) -> u128 {
self.idx = target_idx;
self.get()
}
fn advance(&mut self) -> bool {
self.idx = self.idx.wrapping_add(1);
self.idx < self.len
}
fn get(&self) -> u128 {
let compact_code = self.params.bit_unpacker.get(self.idx, self.data);
self.params.compact_space.compact_to_u128(compact_code)
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {

View File

@@ -11,6 +11,7 @@ use std::io;
use std::io::Write; use std::io::Write;
use std::sync::Arc; use std::sync::Arc;
use column::EstimateColumn;
use common::BinarySerializable; use common::BinarySerializable;
use compact_space::CompactSpaceDecompressor; use compact_space::CompactSpaceDecompressor;
use ownedbytes::OwnedBytes; use ownedbytes::OwnedBytes;
@@ -29,7 +30,7 @@ mod serialize;
use self::bitpacked::BitpackedCodec; use self::bitpacked::BitpackedCodec;
use self::blockwise_linear::BlockwiseLinearCodec; use self::blockwise_linear::BlockwiseLinearCodec;
pub use self::column::{iter_from_reader, monotonic_map_column, Column, ColumnReader, VecColumn}; pub use self::column::{monotonic_map_column, Column, VecColumn};
use self::linear::LinearCodec; use self::linear::LinearCodec;
pub use self::monotonic_mapping::MonotonicallyMappableToU64; pub use self::monotonic_mapping::MonotonicallyMappableToU64;
pub use self::serialize::{ pub use self::serialize::{
@@ -123,7 +124,7 @@ trait FastFieldCodec: 'static {
/// ///
/// The column iterator should be preferred over using column `get_val` method for /// The column iterator should be preferred over using column `get_val` method for
/// performance reasons. /// performance reasons.
fn serialize(column: &dyn Column<u64>, write: &mut impl Write) -> io::Result<()>; fn serialize(column: &dyn Column, write: &mut impl Write) -> io::Result<()>;
/// Returns an estimate of the compression ratio. /// Returns an estimate of the compression ratio.
/// If the codec is not applicable, returns `None`. /// If the codec is not applicable, returns `None`.
@@ -132,7 +133,7 @@ trait FastFieldCodec: 'static {
/// ///
/// It could make sense to also return a value representing /// It could make sense to also return a value representing
/// computational complexity. /// computational complexity.
fn estimate(column: &impl Column) -> Option<f32>; fn estimate(column: &EstimateColumn) -> Option<f32>;
} }
pub const ALL_CODEC_TYPES: [FastFieldCodecType; 3] = [ pub const ALL_CODEC_TYPES: [FastFieldCodecType; 3] = [
@@ -149,6 +150,7 @@ mod tests {
use crate::bitpacked::BitpackedCodec; use crate::bitpacked::BitpackedCodec;
use crate::blockwise_linear::BlockwiseLinearCodec; use crate::blockwise_linear::BlockwiseLinearCodec;
use crate::column::EstimateColumn;
use crate::linear::LinearCodec; use crate::linear::LinearCodec;
use crate::serialize::Header; use crate::serialize::Header;
@@ -159,7 +161,9 @@ mod tests {
let col = &VecColumn::from(data); let col = &VecColumn::from(data);
let header = Header::compute_header(col, &[Codec::CODEC_TYPE])?; let header = Header::compute_header(col, &[Codec::CODEC_TYPE])?;
let normalized_col = header.normalize_column(col); let normalized_col = header.normalize_column(col);
let estimation = Codec::estimate(&normalized_col)?;
let limited_column = EstimateColumn::new(&normalized_col);
let estimation = Codec::estimate(&limited_column)?;
let mut out = Vec::new(); let mut out = Vec::new();
let col = VecColumn::from(data); let col = VecColumn::from(data);
@@ -280,14 +284,16 @@ mod tests {
let data = (10..=20000_u64).collect::<Vec<_>>(); let data = (10..=20000_u64).collect::<Vec<_>>();
let data: VecColumn = data.as_slice().into(); let data: VecColumn = data.as_slice().into();
let linear_interpol_estimation = LinearCodec::estimate(&data).unwrap(); let linear_interpol_estimation =
LinearCodec::estimate(&EstimateColumn::new(&data)).unwrap();
assert_le!(linear_interpol_estimation, 0.01); assert_le!(linear_interpol_estimation, 0.01);
let multi_linear_interpol_estimation = BlockwiseLinearCodec::estimate(&data).unwrap(); let multi_linear_interpol_estimation =
BlockwiseLinearCodec::estimate(&EstimateColumn::new(&data)).unwrap();
assert_le!(multi_linear_interpol_estimation, 0.2); assert_le!(multi_linear_interpol_estimation, 0.2);
assert_lt!(linear_interpol_estimation, multi_linear_interpol_estimation); assert_lt!(linear_interpol_estimation, multi_linear_interpol_estimation);
let bitpacked_estimation = BitpackedCodec::estimate(&data).unwrap(); let bitpacked_estimation = BitpackedCodec::estimate(&EstimateColumn::new(&data)).unwrap();
assert_lt!(linear_interpol_estimation, bitpacked_estimation); assert_lt!(linear_interpol_estimation, bitpacked_estimation);
} }
#[test] #[test]
@@ -295,18 +301,20 @@ mod tests {
let data: &[u64] = &[200, 10, 10, 10, 10, 1000, 20]; let data: &[u64] = &[200, 10, 10, 10, 10, 1000, 20];
let data: VecColumn = data.into(); let data: VecColumn = data.into();
let linear_interpol_estimation = LinearCodec::estimate(&data).unwrap(); let linear_interpol_estimation =
LinearCodec::estimate(&EstimateColumn::new(&data)).unwrap();
assert_le!(linear_interpol_estimation, 0.34); assert_le!(linear_interpol_estimation, 0.34);
let bitpacked_estimation = BitpackedCodec::estimate(&data).unwrap(); let bitpacked_estimation = BitpackedCodec::estimate(&EstimateColumn::new(&data)).unwrap();
assert_lt!(bitpacked_estimation, linear_interpol_estimation); assert_lt!(bitpacked_estimation, linear_interpol_estimation);
} }
#[test] #[test]
fn estimation_prefer_bitpacked() { fn estimation_prefer_bitpacked() {
let data = VecColumn::from(&[10, 10, 10, 10]); let data = VecColumn::from(&[10, 10, 10, 10]);
let linear_interpol_estimation = LinearCodec::estimate(&data).unwrap(); let linear_interpol_estimation =
let bitpacked_estimation = BitpackedCodec::estimate(&data).unwrap(); LinearCodec::estimate(&EstimateColumn::new(&data)).unwrap();
let bitpacked_estimation = BitpackedCodec::estimate(&EstimateColumn::new(&data)).unwrap();
assert_lt!(bitpacked_estimation, linear_interpol_estimation); assert_lt!(bitpacked_estimation, linear_interpol_estimation);
} }
@@ -318,10 +326,11 @@ mod tests {
// in this case the linear interpolation can't in fact not be worse than bitpacking, // in this case the linear interpolation can't in fact not be worse than bitpacking,
// but the estimator adds some threshold, which leads to estimated worse behavior // but the estimator adds some threshold, which leads to estimated worse behavior
let linear_interpol_estimation = LinearCodec::estimate(&data).unwrap(); let linear_interpol_estimation =
LinearCodec::estimate(&EstimateColumn::new(&data)).unwrap();
assert_le!(linear_interpol_estimation, 0.35); assert_le!(linear_interpol_estimation, 0.35);
let bitpacked_estimation = BitpackedCodec::estimate(&data).unwrap(); let bitpacked_estimation = BitpackedCodec::estimate(&EstimateColumn::new(&data)).unwrap();
assert_le!(bitpacked_estimation, 0.32); assert_le!(bitpacked_estimation, 0.32);
assert_le!(bitpacked_estimation, linear_interpol_estimation); assert_le!(bitpacked_estimation, linear_interpol_estimation);
} }

View File

@@ -67,25 +67,22 @@ impl Line {
self.intercept.wrapping_add(linear_part) self.intercept.wrapping_add(linear_part)
} }
// Same as train, but the intercept is only estimated from provided sample positions
pub fn estimate(ys: &dyn Column, sample_positions: &[u64]) -> Self {
Self::train_from(ys, sample_positions.iter().cloned())
}
// Intercept is only computed from provided positions // Intercept is only computed from provided positions
fn train_from(ys: &dyn Column, positions: impl Iterator<Item = u64>) -> Self { pub fn train_from(
let last_idx = if let Some(last_idx) = NonZeroU64::new(ys.num_vals() - 1) { ys: &dyn Column,
last_idx positions_and_values: impl Iterator<Item = (u64, u64)>,
) -> Self {
let num_vals = if let Some(num_vals) = NonZeroU64::new(ys.num_vals() - 1) {
num_vals
} else { } else {
return Line::default(); return Line::default();
}; };
let mut ys_reader = ys.reader(); let y0 = ys.get_val(0);
let y0 = ys_reader.seek(0); let y1 = ys.get_val(num_vals.get());
let y1 = ys_reader.seek(last_idx.get());
// We first independently pick our slope. // We first independently pick our slope.
let slope = compute_slope(y0, y1, last_idx); let slope = compute_slope(y0, y1, num_vals);
// We picked our slope. Note that it does not have to be perfect. // We picked our slope. Note that it does not have to be perfect.
// Now we need to compute the best intercept. // Now we need to compute the best intercept.
@@ -115,12 +112,8 @@ impl Line {
intercept: 0, intercept: 0,
}; };
let heuristic_shift = y0.wrapping_sub(MID_POINT); let heuristic_shift = y0.wrapping_sub(MID_POINT);
let mut ys_reader = ys.reader(); line.intercept = positions_and_values
line.intercept = positions .map(|(pos, y)| y.wrapping_sub(line.eval(pos)))
.map(|pos| {
let y = ys_reader.seek(pos);
y.wrapping_sub(line.eval(pos))
})
.min_by_key(|&val| val.wrapping_sub(heuristic_shift)) .min_by_key(|&val| val.wrapping_sub(heuristic_shift))
.unwrap_or(0u64); //< Never happens. .unwrap_or(0u64); //< Never happens.
line line
@@ -137,7 +130,10 @@ impl Line {
/// This function is only invariable by translation if all of the /// This function is only invariable by translation if all of the
/// `ys` are packaged into half of the space. (See heuristic below) /// `ys` are packaged into half of the space. (See heuristic below)
pub fn train(ys: &dyn Column) -> Self { pub fn train(ys: &dyn Column) -> Self {
Self::train_from(ys, 0..ys.num_vals()) Self::train_from(
ys,
ys.iter().enumerate().map(|(pos, val)| (pos as u64, val)),
)
} }
} }

View File

@@ -4,6 +4,7 @@ use common::BinarySerializable;
use ownedbytes::OwnedBytes; use ownedbytes::OwnedBytes;
use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker}; use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker};
use crate::column::EstimateColumn;
use crate::line::Line; use crate::line::Line;
use crate::serialize::NormalizedHeader; use crate::serialize::NormalizedHeader;
use crate::{Column, FastFieldCodec, FastFieldCodecType}; use crate::{Column, FastFieldCodec, FastFieldCodecType};
@@ -89,7 +90,8 @@ impl FastFieldCodec for LinearCodec {
assert_eq!(column.min_value(), 0); assert_eq!(column.min_value(), 0);
let line = Line::train(column); let line = Line::train(column);
let max_offset_from_line = crate::iter_from_reader(column.reader()) let max_offset_from_line = column
.iter()
.enumerate() .enumerate()
.map(|(pos, actual_value)| { .map(|(pos, actual_value)| {
let calculated_value = line.eval(pos as u64); let calculated_value = line.eval(pos as u64);
@@ -106,12 +108,7 @@ impl FastFieldCodec for LinearCodec {
linear_params.serialize(write)?; linear_params.serialize(write)?;
let mut bit_packer = BitPacker::new(); let mut bit_packer = BitPacker::new();
let mut col_reader = column.reader(); for (pos, actual_value) in column.iter().enumerate() {
for pos in 0.. {
if !col_reader.advance() {
break;
}
let actual_value = col_reader.get();
let calculated_value = line.eval(pos as u64); let calculated_value = line.eval(pos as u64);
let offset = actual_value.wrapping_sub(calculated_value); let offset = actual_value.wrapping_sub(calculated_value);
bit_packer.write(offset, num_bits, write)?; bit_packer.write(offset, num_bits, write)?;
@@ -125,24 +122,23 @@ impl FastFieldCodec for LinearCodec {
/// where the local maxima for the deviation of the calculated value are and /// where the local maxima for the deviation of the calculated value are and
/// the offset to shift all values to >=0 is also unknown. /// the offset to shift all values to >=0 is also unknown.
#[allow(clippy::question_mark)] #[allow(clippy::question_mark)]
fn estimate(column: &impl Column) -> Option<f32> { fn estimate(column: &EstimateColumn) -> Option<f32> {
if column.num_vals() < 3 { if column.num_vals() < 3 {
return None; // disable compressor for this case return None; // disable compressor for this case
} }
// let's sample at 0%, 5%, 10% .. 95%, 100% // let's sample at 0%, 5%, 10% .. 95%, 100%
let num_vals = column.num_vals() as f32 / 100.0; let num_vals = column.num_vals() as f32 / 100.0;
let sample_positions = (0..20) let sample_positions_and_values = (0..20)
.map(|pos| (num_vals * pos as f32 * 5.0) as u64) .map(|pos| (num_vals * pos as f32 * 5.0) as u64)
.map(|pos| (pos, column.get_val(pos)))
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let line = Line::estimate(column, &sample_positions); let line = { Line::train_from(column, sample_positions_and_values.iter().cloned()) };
let mut column_reader = column.reader(); let estimated_bit_width = sample_positions_and_values
let estimated_bit_width = sample_positions
.into_iter() .into_iter()
.map(|pos| { .map(|(pos, actual_value)| {
let actual_value = column_reader.seek(pos);
let interpolated_val = line.eval(pos as u64); let interpolated_val = line.eval(pos as u64);
actual_value.wrapping_sub(interpolated_val) actual_value.wrapping_sub(interpolated_val)
}) })

View File

@@ -36,11 +36,7 @@ impl MonotonicallyMappableToU64 for i64 {
impl MonotonicallyMappableToU64 for bool { impl MonotonicallyMappableToU64 for bool {
#[inline(always)] #[inline(always)]
fn to_u64(self) -> u64 { fn to_u64(self) -> u64 {
if self { u64::from(self)
1
} else {
0
}
} }
#[inline(always)] #[inline(always)]

View File

@@ -28,11 +28,12 @@ use ownedbytes::OwnedBytes;
use crate::bitpacked::BitpackedCodec; use crate::bitpacked::BitpackedCodec;
use crate::blockwise_linear::BlockwiseLinearCodec; use crate::blockwise_linear::BlockwiseLinearCodec;
use crate::column::EstimateColumn;
use crate::compact_space::CompactSpaceCompressor; use crate::compact_space::CompactSpaceCompressor;
use crate::linear::LinearCodec; use crate::linear::LinearCodec;
use crate::{ use crate::{
iter_from_reader, monotonic_map_column, Column, FastFieldCodec, FastFieldCodecType, monotonic_map_column, Column, FastFieldCodec, FastFieldCodecType, MonotonicallyMappableToU64,
MonotonicallyMappableToU64, VecColumn, ALL_CODEC_TYPES, VecColumn, ALL_CODEC_TYPES,
}; };
/// The normalized header gives some parameters after applying the following /// The normalized header gives some parameters after applying the following
@@ -79,9 +80,8 @@ impl Header {
let num_vals = column.num_vals(); let num_vals = column.num_vals();
let min_value = column.min_value(); let min_value = column.min_value();
let max_value = column.max_value(); let max_value = column.max_value();
let gcd = let gcd = crate::gcd::find_gcd(column.iter().map(|val| val - min_value))
crate::gcd::find_gcd(iter_from_reader(column.reader()).map(|val| val - min_value)) .filter(|gcd| gcd.get() > 1u64);
.filter(|gcd| gcd.get() > 1u64);
let divider = DividerU64::divide_by(gcd.map(|gcd| gcd.get()).unwrap_or(1u64)); let divider = DividerU64::divide_by(gcd.map(|gcd| gcd.get()).unwrap_or(1u64));
let shifted_column = monotonic_map_column(&column, |val| divider.divide(val - min_value)); let shifted_column = monotonic_map_column(&column, |val| divider.divide(val - min_value));
let codec_type = detect_codec(shifted_column, codecs)?; let codec_type = detect_codec(shifted_column, codecs)?;
@@ -126,23 +126,6 @@ impl BinarySerializable for Header {
} }
} }
/// Asks the codec selected by `codec_type` for its estimate on `typed_column`.
///
/// Before the codec sees the data, the column is normalized:
/// values are mapped to `u64`, shifted so that the minimum becomes `0`, and
/// divided by the GCD of the shifted values whenever that GCD is larger than 1.
///
/// Returns whatever the codec's `estimate` returns (an `Option<f32>`).
pub fn estimate<T: MonotonicallyMappableToU64>(
    typed_column: impl Column<T>,
    codec_type: FastFieldCodecType,
) -> Option<f32> {
    let u64_column = monotonic_map_column(typed_column, T::to_u64);
    let min_value = u64_column.min_value();
    let shifted_vals = iter_from_reader(u64_column.reader()).map(|val| val - min_value);
    // A GCD of 1 (or no GCD at all) means there is nothing to divide out.
    let gcd_divisor = crate::gcd::find_gcd(shifted_vals)
        .filter(|gcd| gcd.get() > 1u64)
        .map(|gcd| gcd.get())
        .unwrap_or(1u64);
    let divider = DividerU64::divide_by(gcd_divisor);
    let normalized_column =
        monotonic_map_column(&u64_column, |val| divider.divide(val - min_value));
    match codec_type {
        FastFieldCodecType::Bitpacked => BitpackedCodec::estimate(&normalized_column),
        FastFieldCodecType::Linear => LinearCodec::estimate(&normalized_column),
        FastFieldCodecType::BlockwiseLinear => BlockwiseLinearCodec::estimate(&normalized_column),
    }
}
pub fn serialize_u128( pub fn serialize_u128(
typed_column: impl Column<u128>, typed_column: impl Column<u128>,
output: &mut impl io::Write, output: &mut impl io::Write,
@@ -150,7 +133,7 @@ pub fn serialize_u128(
// TODO write header, to later support more codecs // TODO write header, to later support more codecs
let compressor = CompactSpaceCompressor::train_from(&typed_column); let compressor = CompactSpaceCompressor::train_from(&typed_column);
compressor compressor
.compress_into(typed_column.reader(), output) .compress_into(typed_column.iter(), output)
.unwrap(); .unwrap();
Ok(()) Ok(())
@@ -178,10 +161,29 @@ pub fn serialize<T: MonotonicallyMappableToU64>(
Ok(()) Ok(())
} }
/// Asks the codec selected by `codec_type` for its estimate on `typed_column`.
///
/// Before the codec sees the data, the column is normalized:
/// values are mapped to `u64`, shifted so that the minimum becomes `0`, and
/// divided by the GCD of the shifted values whenever that GCD is larger than 1.
/// The normalized column is then wrapped in an [`EstimateColumn`], which is
/// what every codec's `estimate` receives.
///
/// Returns whatever the codec's `estimate` returns (an `Option<f32>`).
pub fn estimate<T: MonotonicallyMappableToU64>(
    typed_column: impl Column<T>,
    codec_type: FastFieldCodecType,
) -> Option<f32> {
    let u64_column = monotonic_map_column(typed_column, T::to_u64);
    let min_value = u64_column.min_value();
    let shifted_vals = u64_column.iter().map(|val| val - min_value);
    // A GCD of 1 (or no GCD at all) means there is nothing to divide out.
    let gcd_divisor = crate::gcd::find_gcd(shifted_vals)
        .filter(|gcd| gcd.get() > 1u64)
        .map(|gcd| gcd.get())
        .unwrap_or(1u64);
    let divider = DividerU64::divide_by(gcd_divisor);
    let normalized_column =
        monotonic_map_column(&u64_column, |val| divider.divide(val - min_value));
    let estimate_column = EstimateColumn::new(&normalized_column);
    match codec_type {
        FastFieldCodecType::Bitpacked => BitpackedCodec::estimate(&estimate_column),
        FastFieldCodecType::Linear => LinearCodec::estimate(&estimate_column),
        FastFieldCodecType::BlockwiseLinear => BlockwiseLinearCodec::estimate(&estimate_column),
    }
}
fn detect_codec( fn detect_codec(
column: impl Column<u64>, column: impl Column<u64>,
codecs: &[FastFieldCodecType], codecs: &[FastFieldCodecType],
) -> Option<FastFieldCodecType> { ) -> Option<FastFieldCodecType> {
let column: EstimateColumn = EstimateColumn::new(&column);
let mut estimations = Vec::new(); let mut estimations = Vec::new();
for &codec in codecs { for &codec in codecs {
let estimation_opt = match codec { let estimation_opt = match codec {
@@ -241,8 +243,7 @@ mod tests {
#[test] #[test]
fn test_serialize_deserialize() { fn test_serialize_deserialize() {
let original = [1u64, 5u64, 10u64]; let original = [1u64, 5u64, 10u64];
let restored: Vec<u64> = let restored: Vec<u64> = serialize_and_load(&original[..]).iter().collect();
crate::iter_from_reader(serialize_and_load(&original[..]).reader()).collect();
assert_eq!(&restored, &original[..]); assert_eq!(&restored, &original[..]);
} }

View File

@@ -425,7 +425,7 @@ impl SegmentHistogramCollector {
let bucket = &mut self.buckets[bucket_pos]; let bucket = &mut self.buckets[bucket_pos];
bucket.doc_count += 1; bucket.doc_count += 1;
if let Some(sub_aggregation) = self.sub_aggregations.as_mut() { if let Some(sub_aggregation) = self.sub_aggregations.as_mut() {
(&mut sub_aggregation[bucket_pos]).collect(doc, bucket_with_accessor)?; sub_aggregation[bucket_pos].collect(doc, bucket_with_accessor)?;
} }
Ok(()) Ok(())
} }

View File

@@ -57,7 +57,7 @@ impl SegmentId {
/// Picking the first 8 chars is ok to identify /// Picking the first 8 chars is ok to identify
/// segments in a display message (e.g. a5c4dfcb). /// segments in a display message (e.g. a5c4dfcb).
pub fn short_uuid_string(&self) -> String { pub fn short_uuid_string(&self) -> String {
(&self.0.as_simple().to_string()[..8]).to_string() self.0.as_simple().to_string()[..8].to_string()
} }
/// Returns a segment uuid string. /// Returns a segment uuid string.

View File

@@ -472,6 +472,8 @@ mod tests {
// There are more tests in directory/mod.rs // There are more tests in directory/mod.rs
// The following tests are specific to the MmapDirectory // The following tests are specific to the MmapDirectory
use std::time::Duration;
use common::HasLen; use common::HasLen;
use super::*; use super::*;
@@ -610,7 +612,14 @@ mod tests {
mmap_directory.get_cache_info().mmapped.len() mmap_directory.get_cache_info().mmapped.len()
); );
} }
assert!(mmap_directory.get_cache_info().mmapped.is_empty()); // This test failed on CI. The last Mmap is dropped from the merging thread so there might
Ok(()) // be a race condition indeed.
for _ in 0..10 {
if mmap_directory.get_cache_info().mmapped.is_empty() {
return Ok(());
}
std::thread::sleep(Duration::from_millis(200));
}
panic!("The cache still contains information. One of the Mmap has not been dropped.");
} }
} }

View File

@@ -136,6 +136,20 @@ impl RamDirectory {
Self::default() Self::default()
} }
/// Creates an independent copy of this directory.
///
/// The file map is cloned while holding the read lock, so later writes
/// to either directory are not visible through the other one.
/// The copy starts with a default `watch_router` rather than the original's.
pub fn deep_clone(&self) -> RamDirectory {
    let fs_snapshot = self.fs.read().unwrap().fs.clone();
    RamDirectory {
        fs: Arc::new(RwLock::new(InnerDirectory {
            fs: fs_snapshot,
            watch_router: Default::default(),
        })),
    }
}
/// Returns the sum of the size of the different files /// Returns the sum of the size of the different files
/// in the [`RamDirectory`]. /// in the [`RamDirectory`].
pub fn total_mem_usage(&self) -> usize { pub fn total_mem_usage(&self) -> usize {
@@ -256,4 +270,23 @@ mod tests {
assert_eq!(directory_copy.atomic_read(path_atomic).unwrap(), msg_atomic); assert_eq!(directory_copy.atomic_read(path_atomic).unwrap(), msg_atomic);
assert_eq!(directory_copy.atomic_read(path_seq).unwrap(), msg_seq); assert_eq!(directory_copy.atomic_read(path_seq).unwrap(), msg_seq);
} }
/// A deep clone starts with the same content as the original, after which
/// writes on either side stay invisible to the other.
#[test]
fn test_ram_directory_deep_clone() {
    let shared_path = Path::new("test");
    let clone_only_path = Path::new("test2");

    let original_dir = RamDirectory::default();
    original_dir.atomic_write(shared_path, b"firstwrite").unwrap();

    // Right after cloning, both directories hold the same bytes.
    let cloned_dir = original_dir.deep_clone();
    assert_eq!(
        cloned_dir.atomic_read(shared_path).unwrap(),
        original_dir.atomic_read(shared_path).unwrap()
    );

    // Diverging writes must not leak across.
    original_dir.atomic_write(shared_path, b"original").unwrap();
    cloned_dir.atomic_write(shared_path, b"clone").unwrap();
    cloned_dir.atomic_write(clone_only_path, b"clone2").unwrap();
    assert_eq!(original_dir.atomic_read(shared_path).unwrap(), b"original");
    assert_eq!(&cloned_dir.atomic_read(shared_path).unwrap(), b"clone");
    assert_eq!(&cloned_dir.atomic_read(clone_only_path).unwrap(), b"clone2");
}
} }

View File

@@ -41,7 +41,6 @@ mod error;
mod facet_reader; mod facet_reader;
mod multivalued; mod multivalued;
mod readers; mod readers;
mod remapped_column;
mod serializer; mod serializer;
mod writer; mod writer;
@@ -425,7 +424,7 @@ mod tests {
permutation permutation
} }
fn test_intfastfield_permutation_with_data(permutation: &[u64]) -> crate::Result<()> { fn test_intfastfield_permutation_with_data(permutation: Vec<u64>) -> crate::Result<()> {
let path = Path::new("test"); let path = Path::new("test");
let n = permutation.len(); let n = permutation.len();
let directory = RamDirectory::create(); let directory = RamDirectory::create();
@@ -433,7 +432,7 @@ mod tests {
let write: WritePtr = directory.open_write(Path::new("test"))?; let write: WritePtr = directory.open_write(Path::new("test"))?;
let mut serializer = CompositeFastFieldSerializer::from_write(write)?; let mut serializer = CompositeFastFieldSerializer::from_write(write)?;
let mut fast_field_writers = FastFieldsWriter::from_schema(&SCHEMA); let mut fast_field_writers = FastFieldsWriter::from_schema(&SCHEMA);
for &x in permutation { for &x in &permutation {
fast_field_writers.add_document(&doc!(*FIELD=>x)); fast_field_writers.add_document(&doc!(*FIELD=>x));
} }
fast_field_writers.serialize(&mut serializer, &HashMap::new(), None)?; fast_field_writers.serialize(&mut serializer, &HashMap::new(), None)?;
@@ -447,6 +446,7 @@ mod tests {
.unwrap() .unwrap()
.read_bytes()?; .read_bytes()?;
let fast_field_reader = open::<u64>(data)?; let fast_field_reader = open::<u64>(data)?;
for a in 0..n { for a in 0..n {
assert_eq!(fast_field_reader.get_val(a as u64), permutation[a as usize]); assert_eq!(fast_field_reader.get_val(a as u64), permutation[a as usize]);
} }
@@ -455,23 +455,16 @@ mod tests {
} }
#[test] #[test]
fn test_intfastfield_simple() -> crate::Result<()> { fn test_intfastfield_permutation_gcd() -> crate::Result<()> {
let permutation = &[1, 2, 3]; let permutation = generate_permutation_gcd();
test_intfastfield_permutation_with_data(&permutation[..])?; test_intfastfield_permutation_with_data(permutation)?;
Ok(()) Ok(())
} }
#[test] #[test]
fn test_intfastfield_permutation() -> crate::Result<()> { fn test_intfastfield_permutation() -> crate::Result<()> {
let permutation = generate_permutation(); let permutation = generate_permutation();
test_intfastfield_permutation_with_data(&permutation)?; test_intfastfield_permutation_with_data(permutation)?;
Ok(())
}
#[test]
fn test_intfastfield_permutation_gcd() -> crate::Result<()> {
let permutation = generate_permutation_gcd();
test_intfastfield_permutation_with_data(&permutation)?;
Ok(()) Ok(())
} }

View File

@@ -1,10 +1,9 @@
mod multivalue_start_index;
mod reader; mod reader;
mod writer; mod writer;
pub(crate) use self::multivalue_start_index::MultivalueStartIndex;
pub use self::reader::MultiValuedFastFieldReader; pub use self::reader::MultiValuedFastFieldReader;
pub use self::writer::MultiValuedFastFieldWriter; pub use self::writer::MultiValuedFastFieldWriter;
pub(crate) use self::writer::MultivalueStartIndex;
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
@@ -403,6 +402,74 @@ mod bench {
use crate::schema::{Cardinality, NumericOptions, Schema}; use crate::schema::{Cardinality, NumericOptions, Schema};
use crate::Document; use crate::Document;
/// Shared body of the multivalued fast field merge benchmarks.
///
/// Indexes `num_docs` documents carrying three (every third document: four)
/// `f64` values in a multivalued fast field, committing whenever the document
/// index is a multiple of `segments_every_n_docs`. Afterwards all searchable
/// segments are merged and the index is expected to hold exactly one segment.
fn bench_multi_value_ff_merge_opt(
    num_docs: usize,
    segments_every_n_docs: usize,
    merge_policy: impl crate::indexer::MergePolicy + 'static,
) {
    let mut schema_builder = crate::schema::SchemaBuilder::new();
    let multi_field = schema_builder.add_f64_field(
        "f64s",
        crate::schema::NumericOptions::default().set_fast(Cardinality::MultiValues),
    );
    let index = crate::Index::create_in_ram(schema_builder.build());
    let mut index_writer = index.writer_for_tests().unwrap();
    index_writer.set_merge_policy(Box::new(merge_policy));

    for doc_idx in 0..num_docs {
        let mut doc = crate::Document::new();
        doc.add_f64(multi_field, 0.24);
        doc.add_f64(multi_field, 0.27);
        doc.add_f64(multi_field, 0.37);
        let has_extra_value = doc_idx % 3 == 0;
        if has_extra_value {
            doc.add_f64(multi_field, 0.44);
        }
        index_writer.add_document(doc).unwrap();
        let is_commit_point = doc_idx % segments_every_n_docs == 0;
        if is_commit_point {
            index_writer.commit().unwrap();
        }
    }

    {
        // Let pending merges finish, then force a merge of everything
        // that is searchable using a fresh writer.
        index_writer.wait_merging_threads().unwrap();
        let mut merge_writer = index.writer_for_tests().unwrap();
        let segment_ids = index.searchable_segment_ids().unwrap();
        merge_writer.merge(&segment_ids).wait().unwrap();
    }

    // If a merging thread fails, we should end up with more
    // than one segment here
    let num_segments = index.searchable_segments().unwrap().len();
    assert_eq!(1, num_segments);
}
/// Merge benchmark: 100k documents, a commit every 1000 documents, no merge policy.
#[bench]
fn bench_multi_value_ff_merge_many_segments(b: &mut Bencher) {
    const NUM_DOCS: usize = 100_000;
    const DOCS_PER_COMMIT: usize = 1_000;
    b.iter(|| {
        bench_multi_value_ff_merge_opt(NUM_DOCS, DOCS_PER_COMMIT, crate::indexer::NoMergePolicy)
    });
}
/// Merge benchmark: 100k documents, a commit every 1000 documents,
/// with the default `LogMergePolicy` active while indexing.
#[bench]
fn bench_multi_value_ff_merge_many_segments_log_merge(b: &mut Bencher) {
    const NUM_DOCS: usize = 100_000;
    const DOCS_PER_COMMIT: usize = 1_000;
    b.iter(|| {
        // A fresh policy instance is built for every iteration.
        let log_merge_policy = crate::indexer::LogMergePolicy::default();
        bench_multi_value_ff_merge_opt(NUM_DOCS, DOCS_PER_COMMIT, log_merge_policy)
    });
}
/// Merge benchmark: 100k documents, a commit every 33000 documents, no merge policy.
#[bench]
fn bench_multi_value_ff_merge_few_segments(b: &mut Bencher) {
    const NUM_DOCS: usize = 100_000;
    const DOCS_PER_COMMIT: usize = 33_000;
    b.iter(|| {
        bench_multi_value_ff_merge_opt(NUM_DOCS, DOCS_PER_COMMIT, crate::indexer::NoMergePolicy)
    });
}
fn multi_values(num_docs: usize, vals_per_doc: usize) -> Vec<Vec<u64>> { fn multi_values(num_docs: usize, vals_per_doc: usize) -> Vec<Vec<u64>> {
let mut vals = vec![]; let mut vals = vec![];
for _i in 0..num_docs { for _i in 0..num_docs {

View File

@@ -1,195 +0,0 @@
use fastfield_codecs::{Column, ColumnReader};
use crate::indexer::doc_id_mapping::DocIdMapping;
use crate::DocId;
/// Start-offset column of a multivalued fast field, seen through a doc id mapping.
///
/// The wrapped `column` holds the start offsets in old doc id order; the values
/// exposed by this type are the start offsets recomputed for the new doc id order.
pub(crate) struct MultivalueStartIndex<'a, C: Column> {
// Start offsets in old doc id order.
column: &'a C,
// Mapping from new doc ids to old doc ids.
doc_id_map: &'a DocIdMapping,
// Minimum and maximum of the remapped start offsets, computed once in `new`.
min_value: u64,
max_value: u64,
}
/// Forward cursor over the remapped start offsets.
///
/// `idx == u64::MAX` marks a cursor that has not been advanced yet.
struct MultivalueStartIndexReader<'a, C: Column> {
    column: &'a C,
    doc_id_map: &'a DocIdMapping,
    /// Current position, or `u64::MAX` before the first `advance`.
    idx: u64,
    /// Start offset at the current position.
    val: u64,
    /// Number of positions: one per new doc id, plus one.
    len: u64,
}

impl<'a, C: Column> MultivalueStartIndexReader<'a, C> {
    /// Builds a cursor placed before the first position.
    fn new(column: &'a C, doc_id_map: &'a DocIdMapping) -> Self {
        let len = doc_id_map.num_new_doc_ids() as u64 + 1;
        MultivalueStartIndexReader {
            column,
            doc_id_map,
            idx: u64::MAX,
            val: 0,
            len,
        }
    }

    /// Puts the cursor back before the first position.
    fn reset(&mut self) {
        self.val = 0;
        self.idx = u64::MAX;
    }
}
// `ColumnReader` implementation that rebuilds the start offsets incrementally:
// each step adds the number of values of one document to a running total.
impl<'a, C: Column> ColumnReader for MultivalueStartIndexReader<'a, C> {
// Moves the cursor to `idx` and returns the start offset at that position.
//
// The cursor can only walk forward. When it is past `idx` (this includes the
// `u64::MAX` "not started" sentinel), it is reset and advanced once so that it
// sits on position 0, then advanced one step at a time up to `idx`.
fn seek(&mut self, idx: u64) -> u64 {
if self.idx > idx {
self.reset();
self.advance();
}
for _ in self.idx..idx {
self.advance();
}
self.get()
}
// Moves the cursor one position forward.
//
// Returns `false` once the cursor has moved past the last position; in that
// case `idx` is pinned to `len`.
fn advance(&mut self) -> bool {
// First call after creation/reset: land on position 0, whose offset is 0.
if self.idx == u64::MAX {
self.idx = 0;
self.val = 0;
return true;
}
// The document being left behind is the one at the current position.
let new_doc_id: DocId = self.idx as DocId;
self.idx += 1;
if self.idx >= self.len {
self.idx = self.len;
return false;
}
let old_doc: DocId = self.doc_id_map.get_old_doc_id(new_doc_id);
// Number of values of that document = difference of two consecutive
// start offsets in the underlying (old order) column.
let num_vals_for_doc =
self.column.get_val(old_doc as u64 + 1) - self.column.get_val(old_doc as u64);
self.val += num_vals_for_doc;
true
}
// Returns the start offset at the current cursor position.
fn get(&self) -> u64 {
self.val
}
}
impl<'a, C: Column> MultivalueStartIndex<'a, C> {
    /// Wraps `column` so that its start offsets follow the order of `doc_id_map`.
    ///
    /// The minimum and maximum remapped offsets are computed eagerly with one
    /// full pass over the remapped values (`(0, 0)` when there are none).
    ///
    /// # Panics
    ///
    /// Panics if `column` does not hold exactly one more value than
    /// `doc_id_map` has old doc ids.
    pub fn new(column: &'a C, doc_id_map: &'a DocIdMapping) -> Self {
        assert_eq!(column.num_vals(), doc_id_map.num_old_doc_ids() as u64 + 1);
        let remapped_offsets = MultivalueStartIndexIter::new(column, doc_id_map);
        let (min_value, max_value) =
            tantivy_bitpacker::minmax(remapped_offsets).unwrap_or((0, 0));
        Self {
            column,
            doc_id_map,
            min_value,
            max_value,
        }
    }

    /// Returns a concrete (unboxed) reader placed before the first position.
    fn specialized_reader(&self) -> MultivalueStartIndexReader<'a, C> {
        MultivalueStartIndexReader::new(self.column, self.doc_id_map)
    }
}
impl<'a, C: Column> Column for MultivalueStartIndex<'a, C> {
    /// Returns a boxed reader placed before the first position.
    fn reader(&self) -> Box<dyn ColumnReader + '_> {
        let reader = self.specialized_reader();
        Box::new(reader)
    }

    /// Returns the remapped start offset at `idx`.
    ///
    /// A new reader is created for every call and walked forward from the
    /// beginning up to `idx`.
    fn get_val(&self, idx: u64) -> u64 {
        self.specialized_reader().seek(idx)
    }

    /// Smallest remapped start offset, as computed at construction time.
    fn min_value(&self) -> u64 {
        self.min_value
    }

    /// Largest remapped start offset, as computed at construction time.
    fn max_value(&self) -> u64 {
        self.max_value
    }

    /// One start offset per new doc id, plus one.
    fn num_vals(&self) -> u64 {
        let num_new_docs = self.doc_id_map.num_new_doc_ids();
        (num_new_docs + 1) as u64
    }
}
/// Iterator over the start offsets recomputed for the new doc id order.
struct MultivalueStartIndexIter<'a, C: Column> {
    /// Start offsets in old doc id order.
    column: &'a C,
    /// Mapping from new doc ids to old doc ids.
    doc_id_map: &'a DocIdMapping,
    /// New doc id whose start offset is emitted next.
    new_doc_id: usize,
    /// Running total of values of all documents emitted so far.
    offset: u64,
}

impl<'a, C: Column> MultivalueStartIndexIter<'a, C> {
    /// Builds an iterator positioned on the first new doc id, with offset 0.
    fn new(column: &'a C, doc_id_map: &'a DocIdMapping) -> Self {
        let (new_doc_id, offset) = (0usize, 0u64);
        MultivalueStartIndexIter {
            column,
            doc_id_map,
            new_doc_id,
            offset,
        }
    }
}
impl<'a, C: Column> Iterator for MultivalueStartIndexIter<'a, C> {
    type Item = u64;

    /// Yields the start offset of the next new doc id.
    ///
    /// One extra item is produced after the last document: the total number
    /// of values, which closes the last document's range.
    fn next(&mut self) -> Option<Self::Item> {
        let num_new_docs = self.doc_id_map.num_new_doc_ids();
        let new_doc_id = self.new_doc_id;
        if new_doc_id > num_new_docs {
            return None;
        }
        self.new_doc_id += 1;
        let start_offset = self.offset;
        // The trailing item has no document attached, so nothing is added for it.
        if new_doc_id < num_new_docs {
            let old_doc = self.doc_id_map.get_old_doc_id(new_doc_id as u32) as u64;
            let doc_end = self.column.get_val(old_doc + 1);
            let doc_start = self.column.get_val(old_doc);
            self.offset += doc_end - doc_start;
        }
        Some(start_offset)
    }
}
#[cfg(test)]
mod tests {
    use fastfield_codecs::VecColumn;

    use super::*;

    /// Start offsets are recomputed when documents are reordered and dropped.
    #[test]
    fn test_multivalue_start_index() {
        // Old start offsets; the per-document value counts are 3, 2, 5, 2, 4.
        let start_offsets = VecColumn::from(&[0u64, 3, 5, 10, 12, 16][..]);
        let doc_id_mapping = DocIdMapping::from_new_id_to_old_id(vec![4, 1, 2]);
        assert_eq!(doc_id_mapping.num_old_doc_ids(), 5);

        let start_index = MultivalueStartIndex::new(&start_offsets, &doc_id_mapping);
        assert_eq!(start_index.num_vals(), 4);

        // Old docs 4, 1, 2 hold 4, 2 and 5 values.
        let remapped: Vec<u64> =
            fastfield_codecs::iter_from_reader(start_index.reader()).collect();
        assert_eq!(remapped, vec![0, 4, 6, 11]);
    }

    /// With an identity mapping, iteration and seeking return the original offsets.
    #[test]
    fn test_multivalue_get_vals() {
        let start_offsets = VecColumn::from(&[0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55][..]);
        let doc_id_mapping =
            DocIdMapping::from_new_id_to_old_id(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
        assert_eq!(doc_id_mapping.num_old_doc_ids(), 10);

        let start_index = MultivalueStartIndex::new(&start_offsets, &doc_id_mapping);
        let remapped: Vec<u64> =
            fastfield_codecs::iter_from_reader(start_index.reader()).collect();
        assert_eq!(remapped, vec![0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55]);
        assert_eq!(start_index.num_vals(), 11);

        // Forward and backward seeks on the same reader, in this exact order.
        let mut reader = start_index.reader();
        let seeks_and_expected = [(3u64, 2u64), (5, 5), (8, 21), (4, 3), (0, 0), (10, 55)];
        for &(idx, expected) in seeks_and_expected.iter() {
            assert_eq!(reader.seek(idx), expected);
        }
    }
}

View File

@@ -1,11 +1,10 @@
use std::io; use std::io;
use std::sync::Mutex;
use fastfield_codecs::{MonotonicallyMappableToU64, VecColumn}; use fastfield_codecs::{Column, MonotonicallyMappableToU64, VecColumn};
use fnv::FnvHashMap; use fnv::FnvHashMap;
use crate::fastfield::{ use crate::fastfield::{value_to_u64, CompositeFastFieldSerializer, FastFieldType};
value_to_u64, CompositeFastFieldSerializer, FastFieldType, MultivalueStartIndex,
};
use crate::indexer::doc_id_mapping::DocIdMapping; use crate::indexer::doc_id_mapping::DocIdMapping;
use crate::postings::UnorderedTermId; use crate::postings::UnorderedTermId;
use crate::schema::{Document, Field, Value}; use crate::schema::{Document, Field, Value};
@@ -201,3 +200,155 @@ impl MultiValuedFastFieldWriter {
Ok(()) Ok(())
} }
} }
/// Start-offset column of a multivalued fast field, seen through a doc id mapping.
///
/// The wrapped `column` holds the start offsets in old doc id order; the values
/// exposed by this type are the start offsets recomputed for the new doc id order.
pub(crate) struct MultivalueStartIndex<'a, C: Column> {
// Start offsets in old doc id order.
column: &'a C,
// Mapping from new doc ids to old doc ids.
doc_id_map: &'a DocIdMapping,
// Lazily computed (min, max) of the remapped offsets; `None` until first requested.
min_max_opt: Mutex<Option<(u64, u64)>>,
// Forward-only cursor reused by `get_val` between calls.
random_seeker: Mutex<MultivalueStartIndexRandomSeeker<'a, C>>,
}
struct MultivalueStartIndexRandomSeeker<'a, C: Column> {
seek_head: MultivalueStartIndexIter<'a, C>,
seek_next_id: u64,
}
impl<'a, C: Column> MultivalueStartIndexRandomSeeker<'a, C> {
fn new(column: &'a C, doc_id_map: &'a DocIdMapping) -> Self {
Self {
seek_head: MultivalueStartIndexIter {
column,
doc_id_map,
new_doc_id: 0,
offset: 0u64,
},
seek_next_id: 0u64,
}
}
}
impl<'a, C: Column> MultivalueStartIndex<'a, C> {
    /// Wraps `column` so that its start offsets follow the order of `doc_id_map`.
    ///
    /// Nothing is computed up front: the min/max cache starts empty and the
    /// seeker used by `get_val` starts on the first value.
    ///
    /// # Panics
    ///
    /// Panics if `column` does not hold exactly one more value than
    /// `doc_id_map` has old doc ids.
    pub fn new(column: &'a C, doc_id_map: &'a DocIdMapping) -> Self {
        assert_eq!(column.num_vals(), doc_id_map.num_old_doc_ids() as u64 + 1);
        MultivalueStartIndex {
            column,
            doc_id_map,
            min_max_opt: Mutex::default(),
            random_seeker: Mutex::new(MultivalueStartIndexRandomSeeker::new(column, doc_id_map)),
        }
    }

    /// Returns the `(min, max)` of the remapped start offsets, computing and
    /// caching them on first use (`(0, 0)` when there are no values).
    ///
    /// The cache lock is taken once and held while the values are scanned, so
    /// the check and the store cannot be interleaved with another caller and
    /// the full scan runs at most once. `self.iter()` builds a fresh iterator
    /// and does not touch this mutex, so holding the guard cannot deadlock.
    fn minmax(&self) -> (u64, u64) {
        let mut min_max_guard = self.min_max_opt.lock().unwrap();
        *min_max_guard.get_or_insert_with(|| {
            tantivy_bitpacker::minmax(self.iter()).unwrap_or((0u64, 0u64))
        })
    }
}
impl<'a, C: Column> Column for MultivalueStartIndex<'a, C> {
    /// Returns the remapped start offset at `idx`.
    ///
    /// A shared forward-only seeker is reused across calls: asking for an
    /// index at or after the seeker's position only skips forward, while
    /// asking for an earlier index restarts the seeker from the beginning.
    ///
    /// # Panics
    ///
    /// Panics if the underlying iterator has no value at `idx`.
    fn get_val(&self, idx: u64) -> u64 {
        let mut seeker = self.random_seeker.lock().unwrap();
        let must_rewind = idx < seeker.seek_next_id;
        if must_rewind {
            *seeker = MultivalueStartIndexRandomSeeker::new(self.column, self.doc_id_map);
        }
        let num_to_skip = idx - seeker.seek_next_id;
        seeker.seek_next_id = idx + 1;
        seeker.seek_head.nth(num_to_skip as usize).unwrap()
    }

    /// Smallest remapped start offset (computed lazily, then cached).
    fn min_value(&self) -> u64 {
        let (min, _max) = self.minmax();
        min
    }

    /// Largest remapped start offset (computed lazily, then cached).
    fn max_value(&self) -> u64 {
        let (_min, max) = self.minmax();
        max
    }

    /// One start offset per new doc id, plus one.
    fn num_vals(&self) -> u64 {
        let num_new_docs = self.doc_id_map.num_new_doc_ids();
        (num_new_docs + 1) as u64
    }

    /// Iterates over all remapped start offsets from the beginning,
    /// independently of the seeker used by `get_val`.
    fn iter<'b>(&'b self) -> Box<dyn Iterator<Item = u64> + 'b> {
        let offsets = MultivalueStartIndexIter::new(self.column, self.doc_id_map);
        Box::new(offsets)
    }
}
/// Iterator over the start offsets recomputed for the new doc id order.
struct MultivalueStartIndexIter<'a, C: Column> {
    /// Start offsets in old doc id order.
    pub column: &'a C,
    /// Mapping from new doc ids to old doc ids.
    pub doc_id_map: &'a DocIdMapping,
    /// New doc id whose start offset is emitted next.
    pub new_doc_id: usize,
    /// Running total of values of all documents emitted so far.
    pub offset: u64,
}

impl<'a, C: Column> MultivalueStartIndexIter<'a, C> {
    /// Builds an iterator positioned on the first new doc id, with offset 0.
    fn new(column: &'a C, doc_id_map: &'a DocIdMapping) -> Self {
        let (new_doc_id, offset) = (0usize, 0u64);
        MultivalueStartIndexIter {
            column,
            doc_id_map,
            new_doc_id,
            offset,
        }
    }
}
impl<'a, C: Column> Iterator for MultivalueStartIndexIter<'a, C> {
    type Item = u64;

    /// Yields the start offset of the next new doc id.
    ///
    /// One extra item is produced after the last document: the total number
    /// of values, which closes the last document's range.
    fn next(&mut self) -> Option<Self::Item> {
        let num_new_docs = self.doc_id_map.num_new_doc_ids();
        let current_doc = self.new_doc_id;
        if current_doc > num_new_docs {
            return None;
        }
        self.new_doc_id += 1;
        let emitted_offset = self.offset;
        // The trailing item has no document attached, so nothing is added for it.
        if current_doc < num_new_docs {
            let old_doc = self.doc_id_map.get_old_doc_id(current_doc as u32) as u64;
            let doc_end = self.column.get_val(old_doc + 1);
            let doc_start = self.column.get_val(old_doc);
            self.offset += doc_end - doc_start;
        }
        Some(emitted_offset)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Start offsets are recomputed when documents are reordered and dropped.
    #[test]
    fn test_multivalue_start_index() {
        // Old start offsets; the per-document value counts are 3, 2, 5, 2, 4.
        let start_offsets = VecColumn::from(&[0u64, 3, 5, 10, 12, 16][..]);
        let doc_id_mapping = DocIdMapping::from_new_id_to_old_id(vec![4, 1, 2]);
        assert_eq!(doc_id_mapping.num_old_doc_ids(), 5);

        let start_index = MultivalueStartIndex::new(&start_offsets, &doc_id_mapping);
        assert_eq!(start_index.num_vals(), 4);

        // Old docs 4, 1, 2 hold 4, 2 and 5 values.
        let remapped: Vec<u64> = start_index.iter().collect();
        assert_eq!(remapped, vec![0, 4, 6, 11]);
    }

    /// With an identity mapping, iteration and random access return the original offsets.
    #[test]
    fn test_multivalue_get_vals() {
        let start_offsets = VecColumn::from(&[0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55][..]);
        let doc_id_mapping =
            DocIdMapping::from_new_id_to_old_id(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
        assert_eq!(doc_id_mapping.num_old_doc_ids(), 10);

        let start_index = MultivalueStartIndex::new(&start_offsets, &doc_id_mapping);
        let remapped: Vec<u64> = start_index.iter().collect();
        assert_eq!(remapped, vec![0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55]);
        assert_eq!(start_index.num_vals(), 11);

        // Forward and backward lookups, in this exact order, so that both the
        // skip-ahead path and the restart path of `get_val` are exercised.
        let lookups_and_expected = [(3u64, 2u64), (5, 5), (8, 21), (4, 3), (0, 0), (10, 55)];
        for &(idx, expected) in lookups_and_expected.iter() {
            assert_eq!(start_index.get_val(idx), expected);
        }
    }
}

View File

@@ -1,112 +0,0 @@
use fastfield_codecs::{Column, ColumnReader};
use tantivy_bitpacker::BlockedBitpacker;
use crate::indexer::doc_id_mapping::DocIdMapping;
use crate::DocId;
/// `Column` view over the values buffered in a `BlockedBitpacker`, optionally
/// read through a doc id mapping.
#[derive(Clone)]
pub(crate) struct WriterFastFieldColumn<'map, 'bitp> {
// When set, positions are translated to old doc ids before reading `vals`.
pub(crate) doc_id_mapping_opt: Option<&'map DocIdMapping>,
// Buffered values, indexed by (old) doc id.
pub(crate) vals: &'bitp BlockedBitpacker,
// Value returned by `min_value()`.
pub(crate) min_value: u64,
// Value returned by `max_value()`.
pub(crate) max_value: u64,
// Value returned by `num_vals()`.
pub(crate) num_vals: u64,
}
impl<'map, 'bitp> Column for WriterFastFieldColumn<'map, 'bitp> {
    /// Return the value associated to the given doc.
    ///
    /// Whenever possible use the Iterator passed to the fastfield creation instead, for
    /// performance reasons.
    ///
    /// # Panics
    ///
    /// May panic if `doc` is greater than the index.
    fn get_val(&self, doc: u64) -> u64 {
        // With a mapping, `doc` is a new doc id and must be translated first.
        // (Idea noted in the original code: a dedicated wrapper for the
        // unmapped case.)
        let stored_idx = match self.doc_id_mapping_opt {
            Some(doc_id_map) => doc_id_map.get_old_doc_id(doc as u32) as usize,
            None => doc as usize,
        };
        self.vals.get(stored_idx)
    }

    /// Returns a reader placed before the first value.
    ///
    /// With a mapping, the reader spans the new doc ids of the mapping;
    /// without one, it spans `num_vals` values in storage order.
    fn reader(&self) -> Box<dyn ColumnReader + '_> {
        match self.doc_id_mapping_opt {
            Some(doc_id_mapping) => Box::new(RemappedColumnReader {
                doc_id_mapping,
                vals: self.vals,
                idx: u64::MAX,
                len: doc_id_mapping.num_new_doc_ids() as u64,
            }),
            None => Box::new(BitpackedColumnReader {
                vals: self.vals,
                idx: u64::MAX,
                len: self.num_vals,
            }),
        }
    }

    /// Returns the stored `min_value` field.
    fn min_value(&self) -> u64 {
        self.min_value
    }

    /// Returns the stored `max_value` field.
    fn max_value(&self) -> u64 {
        self.max_value
    }

    /// Returns the stored `num_vals` field.
    fn num_vals(&self) -> u64 {
        self.num_vals
    }
}
/// Reader over bitpacked values that translates every position through a
/// doc id mapping before reading.
struct RemappedColumnReader<'a> {
    doc_id_mapping: &'a DocIdMapping,
    vals: &'a BlockedBitpacker,
    /// Current position (a new doc id).
    idx: u64,
    /// Number of positions.
    len: u64,
}

impl<'a> ColumnReader for RemappedColumnReader<'a> {
    /// Jumps to `target_idx` and returns the value there.
    ///
    /// # Panics
    ///
    /// Panics if `target_idx` is not smaller than `len`.
    fn seek(&mut self, target_idx: u64) -> u64 {
        assert!(target_idx < self.len);
        self.idx = target_idx;
        self.get()
    }

    /// Steps to the next position; returns `false` once past the end.
    ///
    /// The increment wraps, so a reader created with `idx == u64::MAX`
    /// lands on position 0 with its first step.
    fn advance(&mut self) -> bool {
        let next_idx = self.idx.wrapping_add(1);
        self.idx = next_idx;
        next_idx < self.len
    }

    /// Returns the value of the old document mapped to the current position.
    fn get(&self) -> u64 {
        let new_doc_id = self.idx as DocId;
        let old_doc_id: DocId = self.doc_id_mapping.get_old_doc_id(new_doc_id);
        self.vals.get(old_doc_id as usize)
    }
}
/// Reader over bitpacked values in storage order.
struct BitpackedColumnReader<'a> {
    vals: &'a BlockedBitpacker,
    /// Current position.
    idx: u64,
    /// Number of positions.
    len: u64,
}

impl<'a> ColumnReader for BitpackedColumnReader<'a> {
    /// Jumps to `target_idx` and returns the value there.
    ///
    /// # Panics
    ///
    /// Panics if `target_idx` is not smaller than `len`.
    fn seek(&mut self, target_idx: u64) -> u64 {
        assert!(target_idx < self.len);
        self.idx = target_idx;
        self.get()
    }

    /// Steps to the next position; returns `false` once past the end.
    ///
    /// The increment wraps, so a reader created with `idx == u64::MAX`
    /// lands on position 0 with its first step.
    fn advance(&mut self) -> bool {
        let next_idx = self.idx.wrapping_add(1);
        self.idx = next_idx;
        next_idx < self.len
    }

    /// Returns the value stored at the current position.
    fn get(&self) -> u64 {
        let position = self.idx as usize;
        self.vals.get(position)
    }
}

View File

@@ -2,13 +2,12 @@ use std::collections::HashMap;
use std::io; use std::io;
use common; use common;
use fastfield_codecs::MonotonicallyMappableToU64; use fastfield_codecs::{Column, MonotonicallyMappableToU64};
use fnv::FnvHashMap; use fnv::FnvHashMap;
use tantivy_bitpacker::BlockedBitpacker; use tantivy_bitpacker::BlockedBitpacker;
use super::multivalued::MultiValuedFastFieldWriter; use super::multivalued::MultiValuedFastFieldWriter;
use super::FastFieldType; use super::FastFieldType;
use crate::fastfield::remapped_column::WriterFastFieldColumn;
use crate::fastfield::{BytesFastFieldWriter, CompositeFastFieldSerializer}; use crate::fastfield::{BytesFastFieldWriter, CompositeFastFieldSerializer};
use crate::indexer::doc_id_mapping::DocIdMapping; use crate::indexer::doc_id_mapping::DocIdMapping;
use crate::postings::UnorderedTermId; use crate::postings::UnorderedTermId;
@@ -352,7 +351,7 @@ impl IntFastFieldWriter {
pub fn serialize( pub fn serialize(
&self, &self,
serializer: &mut CompositeFastFieldSerializer, serializer: &mut CompositeFastFieldSerializer,
doc_id_mapping_opt: Option<&DocIdMapping>, doc_id_map: Option<&DocIdMapping>,
) -> io::Result<()> { ) -> io::Result<()> {
let (min, max) = if self.val_min > self.val_max { let (min, max) = if self.val_min > self.val_max {
(0, 0) (0, 0)
@@ -360,8 +359,8 @@ impl IntFastFieldWriter {
(self.val_min, self.val_max) (self.val_min, self.val_max)
}; };
let fastfield_accessor = WriterFastFieldColumn { let fastfield_accessor = WriterFastFieldAccessProvider {
doc_id_mapping_opt, doc_id_map,
vals: &self.vals, vals: &self.vals,
min_value: min, min_value: min,
max_value: max, max_value: max,
@@ -373,3 +372,57 @@ impl IntFastFieldWriter {
Ok(()) Ok(())
} }
} }
/// `Column` view over the values buffered in a `BlockedBitpacker`, optionally
/// read through a doc id mapping.
#[derive(Clone)]
struct WriterFastFieldAccessProvider<'map, 'bitp> {
    /// When set, positions are translated to old doc ids before reading `vals`.
    doc_id_map: Option<&'map DocIdMapping>,
    /// Buffered values, indexed by (old) doc id.
    vals: &'bitp BlockedBitpacker,
    min_value: u64,
    max_value: u64,
    num_vals: u64,
}

impl<'map, 'bitp> Column for WriterFastFieldAccessProvider<'map, 'bitp> {
    /// Return the value associated to the given doc.
    ///
    /// Whenever possible use the Iterator passed to the fastfield creation instead, for
    /// performance reasons.
    ///
    /// # Panics
    ///
    /// May panic if `doc` is greater than the index.
    fn get_val(&self, doc: u64) -> u64 {
        // With a mapping, `doc` is a new doc id and must be translated first.
        // (Idea noted in the original code: a dedicated wrapper for the
        // unmapped case.)
        let stored_idx = match self.doc_id_map {
            Some(doc_id_map) => doc_id_map.get_old_doc_id(doc as u32) as usize,
            None => doc as usize,
        };
        self.vals.get(stored_idx)
    }

    /// Iterates over all values: in the order of the mapping's old doc ids
    /// when a mapping is set, in storage order otherwise.
    fn iter(&self) -> Box<dyn Iterator<Item = u64> + '_> {
        match self.doc_id_map {
            Some(doc_id_map) => {
                let remapped_vals = doc_id_map
                    .iter_old_doc_ids()
                    .map(|doc_id| self.vals.get(doc_id as usize));
                Box::new(remapped_vals)
            }
            None => Box::new(self.vals.iter()),
        }
    }

    /// Returns the stored `min_value` field.
    fn min_value(&self) -> u64 {
        self.min_value
    }

    /// Returns the stored `max_value` field.
    fn max_value(&self) -> u64 {
        self.max_value
    }

    /// Returns the stored `num_vals` field.
    fn num_vals(&self) -> u64 {
        self.num_vals
    }
}

View File

@@ -246,18 +246,27 @@ impl DeleteCursor {
mod tests { mod tests {
use super::{DeleteOperation, DeleteQueue}; use super::{DeleteOperation, DeleteQueue};
use crate::schema::{Field, Term}; use crate::query::{Explanation, Scorer, Weight};
use crate::{DocId, Score, SegmentReader};
struct DummyWeight;
impl Weight for DummyWeight {
fn scorer(&self, _reader: &SegmentReader, _boost: Score) -> crate::Result<Box<dyn Scorer>> {
Err(crate::TantivyError::InternalError("dummy impl".to_owned()))
}
fn explain(&self, _reader: &SegmentReader, _doc: DocId) -> crate::Result<Explanation> {
Err(crate::TantivyError::InternalError("dummy impl".to_owned()))
}
}
#[test] #[test]
fn test_deletequeue() { fn test_deletequeue() {
let delete_queue = DeleteQueue::new(); let delete_queue = DeleteQueue::new();
let make_op = |i: usize| { let make_op = |i: usize| DeleteOperation {
let field = Field::from_field_id(1u32); opstamp: i as u64,
DeleteOperation { target: Box::new(DummyWeight),
opstamp: i as u64,
term: Term::from_field_u64(field, i as u64),
}
}; };
delete_queue.push(make_op(1)); delete_queue.push(make_op(1));

View File

@@ -11,7 +11,6 @@ use super::segment_updater::SegmentUpdater;
use super::{AddBatch, AddBatchReceiver, AddBatchSender, PreparedCommit}; use super::{AddBatch, AddBatchReceiver, AddBatchSender, PreparedCommit};
use crate::core::{Index, Segment, SegmentComponent, SegmentId, SegmentMeta, SegmentReader}; use crate::core::{Index, Segment, SegmentComponent, SegmentId, SegmentMeta, SegmentReader};
use crate::directory::{DirectoryLock, GarbageCollectionResult, TerminatingWrite}; use crate::directory::{DirectoryLock, GarbageCollectionResult, TerminatingWrite};
use crate::docset::{DocSet, TERMINATED};
use crate::error::TantivyError; use crate::error::TantivyError;
use crate::fastfield::write_alive_bitset; use crate::fastfield::write_alive_bitset;
use crate::indexer::delete_queue::{DeleteCursor, DeleteQueue}; use crate::indexer::delete_queue::{DeleteCursor, DeleteQueue};
@@ -20,8 +19,9 @@ use crate::indexer::index_writer_status::IndexWriterStatus;
use crate::indexer::operation::DeleteOperation; use crate::indexer::operation::DeleteOperation;
use crate::indexer::stamper::Stamper; use crate::indexer::stamper::Stamper;
use crate::indexer::{MergePolicy, SegmentEntry, SegmentWriter}; use crate::indexer::{MergePolicy, SegmentEntry, SegmentWriter};
use crate::query::{Query, TermQuery};
use crate::schema::{Document, IndexRecordOption, Term}; use crate::schema::{Document, IndexRecordOption, Term};
use crate::{FutureResult, Opstamp}; use crate::{FutureResult, IndexReader, Opstamp};
// Size of the margin for the `memory_arena`. A segment is closed when the remaining memory // Size of the margin for the `memory_arena`. A segment is closed when the remaining memory
// in the `memory_arena` goes below MARGIN_IN_BYTES. // in the `memory_arena` goes below MARGIN_IN_BYTES.
@@ -57,6 +57,7 @@ pub struct IndexWriter {
_directory_lock: Option<DirectoryLock>, _directory_lock: Option<DirectoryLock>,
index: Index, index: Index,
index_reader: IndexReader,
memory_arena_in_bytes_per_thread: usize, memory_arena_in_bytes_per_thread: usize,
@@ -92,19 +93,14 @@ fn compute_deleted_bitset(
// A delete operation should only affect // A delete operation should only affect
// document that were inserted before it. // document that were inserted before it.
let inverted_index = segment_reader.inverted_index(delete_op.term.field())?; delete_op
if let Some(mut docset) = .target
inverted_index.read_postings(&delete_op.term, IndexRecordOption::Basic)? .for_each(segment_reader, &mut |doc_matching_delete_query, _| {
{ if doc_opstamps.is_deleted(doc_matching_delete_query, delete_op.opstamp) {
let mut doc_matching_deleted_term = docset.doc(); alive_bitset.remove(doc_matching_delete_query);
while doc_matching_deleted_term != TERMINATED {
if doc_opstamps.is_deleted(doc_matching_deleted_term, delete_op.opstamp) {
alive_bitset.remove(doc_matching_deleted_term);
might_have_changed = true; might_have_changed = true;
} }
doc_matching_deleted_term = docset.advance(); })?;
}
}
delete_cursor.advance(); delete_cursor.advance();
} }
Ok(might_have_changed) Ok(might_have_changed)
@@ -302,6 +298,7 @@ impl IndexWriter {
memory_arena_in_bytes_per_thread, memory_arena_in_bytes_per_thread,
index: index.clone(), index: index.clone(),
index_reader: index.reader()?,
index_writer_status: IndexWriterStatus::from(document_receiver), index_writer_status: IndexWriterStatus::from(document_receiver),
operation_sender: document_sender, operation_sender: document_sender,
@@ -666,10 +663,33 @@ impl IndexWriter {
/// Like adds, the deletion itself will be visible /// Like adds, the deletion itself will be visible
/// only after calling `commit()`. /// only after calling `commit()`.
pub fn delete_term(&self, term: Term) -> Opstamp { pub fn delete_term(&self, term: Term) -> Opstamp {
let query = TermQuery::new(term, IndexRecordOption::Basic);
// For backward compatibility, if Term is invalid for the index, do nothing but return an
// Opstamp
self.delete_query(Box::new(query))
.unwrap_or_else(|_| self.stamper.stamp())
}
/// Delete all documents matching a given query.
/// Returns an `Err` if the query can't be executed.
///
/// Delete operation only affects documents that
/// were added in previous commits, and documents
/// that were added previously in the same commit.
///
/// Like adds, the deletion itself will be visible
/// only after calling `commit()`.
#[doc(hidden)]
pub fn delete_query(&self, query: Box<dyn Query>) -> crate::Result<Opstamp> {
let weight = query.weight(&self.index_reader.searcher(), false)?;
let opstamp = self.stamper.stamp(); let opstamp = self.stamper.stamp();
let delete_operation = DeleteOperation { opstamp, term }; let delete_operation = DeleteOperation {
opstamp,
target: weight,
};
self.delete_queue.push(delete_operation); self.delete_queue.push(delete_operation);
opstamp Ok(opstamp)
} }
/// Returns the opstamp of the last successful commit. /// Returns the opstamp of the last successful commit.
@@ -738,10 +758,17 @@ impl IndexWriter {
let (batch_opstamp, stamps) = self.get_batch_opstamps(count); let (batch_opstamp, stamps) = self.get_batch_opstamps(count);
let mut adds = AddBatch::default(); let mut adds = AddBatch::default();
for (user_op, opstamp) in user_operations_it.zip(stamps) { for (user_op, opstamp) in user_operations_it.zip(stamps) {
match user_op { match user_op {
UserOperation::Delete(term) => { UserOperation::Delete(term) => {
let delete_operation = DeleteOperation { opstamp, term }; let query = TermQuery::new(term, IndexRecordOption::Basic);
let weight = query.weight(&self.index_reader.searcher(), false)?;
let delete_operation = DeleteOperation {
opstamp,
target: weight,
};
self.delete_queue.push(delete_operation); self.delete_queue.push(delete_operation);
} }
UserOperation::Add(document) => { UserOperation::Add(document) => {
@@ -786,7 +813,7 @@ mod tests {
use crate::directory::error::LockError; use crate::directory::error::LockError;
use crate::error::*; use crate::error::*;
use crate::indexer::NoMergePolicy; use crate::indexer::NoMergePolicy;
use crate::query::{QueryParser, TermQuery}; use crate::query::{BooleanQuery, Occur, Query, QueryParser, TermQuery};
use crate::schema::{ use crate::schema::{
self, Cardinality, Facet, FacetOptions, IndexRecordOption, NumericOptions, self, Cardinality, Facet, FacetOptions, IndexRecordOption, NumericOptions,
TextFieldIndexing, TextOptions, FAST, INDEXED, STORED, STRING, TEXT, TextFieldIndexing, TextOptions, FAST, INDEXED, STORED, STRING, TEXT,
@@ -1418,10 +1445,72 @@ mod tests {
Ok(()) Ok(())
} }
/// Checks that `delete_query`, issued in the same commit as the documents it targets, removes
/// exactly the matching documents from an index sorted by `id` in descending order.
#[test]
fn test_delete_query_with_sort_by_field() -> crate::Result<()> {
    let mut schema_builder = schema::Schema::builder();
    let id_field =
        schema_builder.add_u64_field("id", schema::INDEXED | schema::STORED | schema::FAST);

    let sort_by_id_desc = IndexSortByField {
        field: "id".to_string(),
        order: Order::Desc,
    };
    let index = Index::builder()
        .schema(schema_builder.build())
        .settings(IndexSettings {
            sort_by_field: Some(sort_by_id_desc),
            ..Default::default()
        })
        .create_in_ram()?;
    let index_reader = index.reader()?;
    let mut index_writer = index.writer_for_tests()?;

    // Create and delete docs in the same commit.
    for id in 0u64..5u64 {
        index_writer.add_document(doc!(id_field => id))?;
    }

    // Each query is "id AND NOT 2": ids 1 and 3 are deleted, while the query for id 2 matches
    // nothing.
    for id in 1u64..4u64 {
        let must_match: Box<dyn Query> = Box::new(TermQuery::new(
            Term::from_field_u64(id_field, id),
            Default::default(),
        ));
        let must_not_match: Box<dyn Query> = Box::new(TermQuery::new(
            Term::from_field_u64(id_field, 2),
            Default::default(),
        ));
        let delete_query = BooleanQuery::from(vec![
            (Occur::Must, must_match),
            (Occur::MustNot, must_not_match),
        ]);
        index_writer.delete_query(Box::new(delete_query))?;
    }

    // These documents are added after the deletes were issued.
    for id in 5u64..10u64 {
        index_writer.add_document(doc!(id_field => id))?;
    }
    index_writer.commit()?;
    index_reader.reload()?;

    let searcher = index_reader.searcher();
    assert_eq!(searcher.segment_readers().len(), 1);

    let segment_reader = searcher.segment_reader(0);
    assert_eq!(segment_reader.num_docs(), 8);
    assert_eq!(segment_reader.max_doc(), 10);

    let id_reader = segment_reader.fast_fields().u64(id_field)?;
    let alive_ids_in_doc_order: Vec<u64> = segment_reader
        .doc_ids_alive()
        .map(|doc| id_reader.get_val(doc as u64))
        .collect();
    assert_eq!(&alive_ids_in_doc_order[..], &[9, 8, 7, 6, 5, 4, 2, 0]);
    Ok(())
}
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy)]
enum IndexingOp { enum IndexingOp {
AddDoc { id: u64 }, AddDoc { id: u64 },
DeleteDoc { id: u64 }, DeleteDoc { id: u64 },
DeleteDocQuery { id: u64 },
Commit, Commit,
Merge, Merge,
} }
@@ -1429,6 +1518,7 @@ mod tests {
fn balanced_operation_strategy() -> impl Strategy<Value = IndexingOp> { fn balanced_operation_strategy() -> impl Strategy<Value = IndexingOp> {
prop_oneof![ prop_oneof![
(0u64..20u64).prop_map(|id| IndexingOp::DeleteDoc { id }), (0u64..20u64).prop_map(|id| IndexingOp::DeleteDoc { id }),
(0u64..20u64).prop_map(|id| IndexingOp::DeleteDocQuery { id }),
(0u64..20u64).prop_map(|id| IndexingOp::AddDoc { id }), (0u64..20u64).prop_map(|id| IndexingOp::AddDoc { id }),
(0u64..1u64).prop_map(|_| IndexingOp::Commit), (0u64..1u64).prop_map(|_| IndexingOp::Commit),
(0u64..1u64).prop_map(|_| IndexingOp::Merge), (0u64..1u64).prop_map(|_| IndexingOp::Merge),
@@ -1437,7 +1527,8 @@ mod tests {
fn adding_operation_strategy() -> impl Strategy<Value = IndexingOp> { fn adding_operation_strategy() -> impl Strategy<Value = IndexingOp> {
prop_oneof![ prop_oneof![
10 => (0u64..100u64).prop_map(|id| IndexingOp::DeleteDoc { id }), 5 => (0u64..100u64).prop_map(|id| IndexingOp::DeleteDoc { id }),
5 => (0u64..100u64).prop_map(|id| IndexingOp::DeleteDocQuery { id }),
50 => (0u64..100u64).prop_map(|id| IndexingOp::AddDoc { id }), 50 => (0u64..100u64).prop_map(|id| IndexingOp::AddDoc { id }),
2 => (0u64..1u64).prop_map(|_| IndexingOp::Commit), 2 => (0u64..1u64).prop_map(|_| IndexingOp::Commit),
1 => (0u64..1u64).prop_map(|_| IndexingOp::Merge), 1 => (0u64..1u64).prop_map(|_| IndexingOp::Merge),
@@ -1457,6 +1548,10 @@ mod tests {
existing_ids.remove(&id); existing_ids.remove(&id);
deleted_ids.insert(id); deleted_ids.insert(id);
} }
IndexingOp::DeleteDocQuery { id } => {
existing_ids.remove(&id);
deleted_ids.insert(id);
}
_ => {} _ => {}
} }
} }
@@ -1539,6 +1634,11 @@ mod tests {
IndexingOp::DeleteDoc { id } => { IndexingOp::DeleteDoc { id } => {
index_writer.delete_term(Term::from_field_u64(id_field, id)); index_writer.delete_term(Term::from_field_u64(id_field, id));
} }
IndexingOp::DeleteDocQuery { id } => {
let term = Term::from_field_u64(id_field, id);
let query = TermQuery::new(term, Default::default());
index_writer.delete_query(Box::new(query))?;
}
IndexingOp::Commit => { IndexingOp::Commit => {
index_writer.commit()?; index_writer.commit()?;
} }

View File

@@ -1,20 +1,11 @@
use crate::query::Weight;
use crate::schema::{Document, Term}; use crate::schema::{Document, Term};
use crate::Opstamp; use crate::Opstamp;
/// Timestamped Delete operation. /// Timestamped Delete operation.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct DeleteOperation { pub struct DeleteOperation {
pub opstamp: Opstamp, pub opstamp: Opstamp,
pub term: Term, pub target: Box<dyn Weight>,
}
impl Default for DeleteOperation {
fn default() -> Self {
DeleteOperation {
opstamp: 0u64,
term: Term::new(),
}
}
} }
/// Timestamped Add operation. /// Timestamped Add operation.

View File

@@ -1,11 +1,11 @@
use std::sync::Arc; use std::sync::Arc;
use fastfield_codecs::{Column, ColumnReader}; use fastfield_codecs::Column;
use itertools::Itertools; use itertools::Itertools;
use crate::indexer::doc_id_mapping::SegmentDocIdMapping; use crate::indexer::doc_id_mapping::SegmentDocIdMapping;
use crate::schema::Field; use crate::schema::Field;
use crate::{DocAddress, DocId, SegmentReader}; use crate::{DocAddress, SegmentReader};
pub(crate) struct SortedDocIdColumn<'a> { pub(crate) struct SortedDocIdColumn<'a> {
doc_id_mapping: &'a SegmentDocIdMapping, doc_id_mapping: &'a SegmentDocIdMapping,
@@ -87,14 +87,17 @@ impl<'a> Column for SortedDocIdColumn<'a> {
self.fast_field_readers[segment_ord as usize].get_val(doc_id as u64) self.fast_field_readers[segment_ord as usize].get_val(doc_id as u64)
} }
fn reader(&self) -> Box<dyn ColumnReader<u64> + '_> { fn iter(&self) -> Box<dyn Iterator<Item = u64> + '_> {
Box::new(SortedDocIdColumnReader { Box::new(
doc_id_mapping: self.doc_id_mapping, self.doc_id_mapping
fast_field_readers: &self.fast_field_readers[..], .iter_old_doc_addrs()
new_doc_id: u32::MAX, .map(|old_doc_addr| {
}) let fast_field_reader =
&self.fast_field_readers[old_doc_addr.segment_ord as usize];
fast_field_reader.get_val(old_doc_addr.doc_id as u64)
}),
)
} }
fn min_value(&self) -> u64 { fn min_value(&self) -> u64 {
self.min_value self.min_value
} }
@@ -107,27 +110,3 @@ impl<'a> Column for SortedDocIdColumn<'a> {
self.num_vals self.num_vals
} }
} }
/// Cursor over a merged single-value column, addressed by new doc id.
struct SortedDocIdColumnReader<'a> {
    /// Maps a new doc id to the address (segment ordinal + doc id) it is read from.
    doc_id_mapping: &'a SegmentDocIdMapping,
    /// One fast field reader per segment, indexed by segment ordinal.
    fast_field_readers: &'a [Arc<dyn Column>],
    /// Current position of the cursor.
    new_doc_id: DocId,
}

impl<'a> ColumnReader for SortedDocIdColumnReader<'a> {
    /// Moves the cursor to `target_idx` and returns the value found there.
    ///
    /// # Panics
    ///
    /// Panics if `target_idx` is not smaller than the length of the doc id mapping.
    fn seek(&mut self, target_idx: u64) -> u64 {
        assert!(target_idx < self.doc_id_mapping.len() as u64);
        self.new_doc_id = target_idx as u32;
        self.get()
    }

    /// Steps to the next doc id; returns `false` once the end of the mapping is passed.
    fn advance(&mut self) -> bool {
        // Wrapping add: a cursor positioned at `u32::MAX` moves to doc id 0.
        let next_doc_id = self.new_doc_id.wrapping_add(1);
        self.new_doc_id = next_doc_id;
        next_doc_id < self.doc_id_mapping.len() as u32
    }

    /// Reads the value at the current cursor position.
    fn get(&self) -> u64 {
        let old_addr = self.doc_id_mapping.get_old_doc_addr(self.new_doc_id);
        let segment_reader = &self.fast_field_readers[old_addr.segment_ord as usize];
        segment_reader.get_val(old_addr.doc_id as u64)
    }
}

View File

@@ -1,6 +1,6 @@
use std::cmp; use std::cmp;
use fastfield_codecs::{Column, ColumnReader}; use fastfield_codecs::Column;
use crate::fastfield::{MultiValueLength, MultiValuedFastFieldReader}; use crate::fastfield::{MultiValueLength, MultiValuedFastFieldReader};
use crate::indexer::doc_id_mapping::SegmentDocIdMapping; use crate::indexer::doc_id_mapping::SegmentDocIdMapping;
@@ -95,6 +95,18 @@ impl<'a> Column for SortedDocIdMultiValueColumn<'a> {
vals[pos_in_values as usize] vals[pos_in_values as usize]
} }
/// Iterates over every value of the merged multi-value column, one document at a time, in the
/// order yielded by `iter_old_doc_addrs`.
fn iter(&self) -> Box<dyn Iterator<Item = u64> + '_> {
    let old_doc_addrs = self.doc_id_mapping.iter_old_doc_addrs();
    let values = old_doc_addrs.flat_map(|old_doc_addr| {
        let segment_reader = &self.fast_field_readers[old_doc_addr.segment_ord as usize];
        // A fresh buffer per document: the values are handed over to the returned iterator.
        let mut doc_vals = Vec::new();
        segment_reader.get_vals(old_doc_addr.doc_id, &mut doc_vals);
        doc_vals.into_iter()
    });
    Box::new(values)
}
fn min_value(&self) -> u64 { fn min_value(&self) -> u64 {
self.min_value self.min_value
} }
@@ -106,80 +118,4 @@ impl<'a> Column for SortedDocIdMultiValueColumn<'a> {
fn num_vals(&self) -> u64 { fn num_vals(&self) -> u64 {
self.num_vals self.num_vals
} }
/// Creates a cursor over the flattened values of this column, positioned before the first
/// value.
fn reader(&self) -> Box<dyn ColumnReader<u64> + '_> {
    let mut column_reader = SortedDocMultiValueColumnReader {
        doc_id_mapping: self.doc_id_mapping,
        fast_field_readers: &self.fast_field_readers[..],
        new_doc_id: u32::MAX,
        in_buffer_idx: 0,
        buffer: Vec::new(),
        idx: u64::MAX,
    };
    // Put the cursor into its initial "nothing read yet" state.
    column_reader.reset();
    Box::new(column_reader)
}
}
/// Cursor over the flattened values of a merged multi-value column.
struct SortedDocMultiValueColumnReader<'a> {
    /// Maps a new doc id to the address (segment ordinal + doc id) it is read from.
    doc_id_mapping: &'a SegmentDocIdMapping,
    /// One multi-value reader per segment, indexed by segment ordinal.
    fast_field_readers: &'a [MultiValuedFastFieldReader<u64>],
    /// Document whose values are currently held in `buffer`.
    new_doc_id: DocId,
    /// Position of the current value inside `buffer`.
    in_buffer_idx: usize,
    /// Values of the current document.
    buffer: Vec<u64>,
    /// Position of the current value in the flattened column.
    idx: u64,
}

impl<'a> SortedDocMultiValueColumnReader<'a> {
    /// Loads the values of the document `new_doc_id` into `buffer` and rewinds the in-buffer
    /// position.
    fn fill(&mut self) {
        let old_addr = self.doc_id_mapping.get_old_doc_addr(self.new_doc_id);
        let segment_ord = old_addr.segment_ord as usize;
        self.fast_field_readers[segment_ord].get_vals(old_addr.doc_id, &mut self.buffer);
        self.in_buffer_idx = 0;
    }

    /// Returns the cursor to its initial state, before the first value.
    fn reset(&mut self) {
        // `MAX` sentinels: the first wrapping increment brings both counters to 0.
        self.new_doc_id = u32::MAX;
        self.idx = u64::MAX;
        self.in_buffer_idx = 0;
        self.buffer.clear();
    }
}
impl<'a> ColumnReader for SortedDocMultiValueColumnReader<'a> {
/// Moves the cursor to position `target_idx` of the flattened column and returns the value
/// found there.
///
/// A target located before the current position restarts the cursor from the beginning.
///
/// # Panics
///
/// Panics if the column is exhausted before `target_idx` is reached.
fn seek(&mut self, target_idx: u64) -> u64 {
    let must_restart = target_idx < self.idx;
    if must_restart {
        self.reset();
        self.advance();
    }
    // The range is fixed before stepping, so changes to `self.idx` do not affect it.
    let pending_steps = self.idx..target_idx;
    for _ in pending_steps {
        // TODO could be optimized.
        assert!(self.advance());
    }
    self.get()
}
/// Steps to the next value, skipping documents that have no values.
///
/// Returns `false` once the doc id mapping is exhausted.
fn advance(&mut self) -> bool {
    loop {
        self.in_buffer_idx += 1;
        let found_value = if self.in_buffer_idx < self.buffer.len() {
            // The current document still has a value left.
            true
        } else {
            // Current document is used up: move on to the next one.
            self.new_doc_id = self.new_doc_id.wrapping_add(1);
            if self.new_doc_id >= self.doc_id_mapping.len() as u32 {
                return false;
            }
            self.fill();
            !self.buffer.is_empty()
        };
        if found_value {
            self.idx = self.idx.wrapping_add(1);
            return true;
        }
    }
}
/// Returns the value the cursor currently points at.
///
/// # Panics
///
/// Panics if the in-buffer position is out of bounds for the buffered values.
fn get(&self) -> u64 {
    let position = self.in_buffer_idx;
    self.buffer[position]
}
} }