mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2025-12-28 04:52:55 +00:00
Compare commits
2 Commits
fix_estima
...
column-rea
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c69661f0f0 | ||
|
|
85ebb3c420 |
@@ -259,7 +259,11 @@ impl BitSet {
|
||||
// we do not check saturated els.
|
||||
let higher = el / 64u32;
|
||||
let lower = el % 64u32;
|
||||
self.len += u64::from(self.tinysets[higher as usize].insert_mut(lower));
|
||||
self.len += if self.tinysets[higher as usize].insert_mut(lower) {
|
||||
1
|
||||
} else {
|
||||
0
|
||||
};
|
||||
}
|
||||
|
||||
/// Inserts an element in the `BitSet`
|
||||
@@ -268,7 +272,11 @@ impl BitSet {
|
||||
// we do not check saturated els.
|
||||
let higher = el / 64u32;
|
||||
let lower = el % 64u32;
|
||||
self.len -= u64::from(self.tinysets[higher as usize].remove_mut(lower));
|
||||
self.len -= if self.tinysets[higher as usize].remove_mut(lower) {
|
||||
1
|
||||
} else {
|
||||
0
|
||||
};
|
||||
}
|
||||
|
||||
/// Returns true iff the elements is in the `BitSet`.
|
||||
|
||||
@@ -161,7 +161,8 @@ impl FixedSize for u8 {
|
||||
|
||||
impl BinarySerializable for bool {
|
||||
fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
|
||||
writer.write_u8(u8::from(*self))
|
||||
let val = if *self { 1 } else { 0 };
|
||||
writer.write_u8(val)
|
||||
}
|
||||
fn deserialize<R: Read>(reader: &mut R) -> io::Result<bool> {
|
||||
let val = reader.read_u8()?;
|
||||
|
||||
@@ -3,7 +3,6 @@ use std::io::{self, Write};
|
||||
use ownedbytes::OwnedBytes;
|
||||
use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker};
|
||||
|
||||
use crate::column::EstimateColumn;
|
||||
use crate::serialize::NormalizedHeader;
|
||||
use crate::{Column, FastFieldCodec, FastFieldCodecType};
|
||||
|
||||
@@ -69,14 +68,16 @@ impl FastFieldCodec for BitpackedCodec {
|
||||
assert_eq!(column.min_value(), 0u64);
|
||||
let num_bits = compute_num_bits(column.max_value());
|
||||
let mut bit_packer = BitPacker::new();
|
||||
for val in column.iter() {
|
||||
let mut reader = column.reader();
|
||||
while reader.advance() {
|
||||
let val = reader.get();
|
||||
bit_packer.write(val, num_bits, write)?;
|
||||
}
|
||||
bit_packer.close(write)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn estimate(column: &EstimateColumn) -> Option<f32> {
|
||||
fn estimate(column: &impl Column) -> Option<f32> {
|
||||
let num_bits = compute_num_bits(column.max_value());
|
||||
let num_bits_uncompressed = 64;
|
||||
Some(num_bits as f32 / num_bits_uncompressed as f32)
|
||||
|
||||
@@ -5,7 +5,6 @@ use common::{BinarySerializable, CountingWriter, DeserializeFrom};
|
||||
use ownedbytes::OwnedBytes;
|
||||
use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker};
|
||||
|
||||
use crate::column::EstimateColumn;
|
||||
use crate::line::Line;
|
||||
use crate::serialize::NormalizedHeader;
|
||||
use crate::{Column, FastFieldCodec, FastFieldCodecType, VecColumn};
|
||||
@@ -72,11 +71,13 @@ impl FastFieldCodec for BlockwiseLinearCodec {
|
||||
}
|
||||
|
||||
// Estimate first_chunk and extrapolate
|
||||
fn estimate(column: &EstimateColumn) -> Option<f32> {
|
||||
fn estimate(column: &impl crate::Column) -> Option<f32> {
|
||||
if column.num_vals() < 10 * CHUNK_SIZE as u64 {
|
||||
return None;
|
||||
}
|
||||
let mut first_chunk: Vec<u64> = column.iter().take(CHUNK_SIZE as usize).collect();
|
||||
let mut first_chunk: Vec<u64> = crate::iter_from_reader(column.reader())
|
||||
.take(CHUNK_SIZE as usize)
|
||||
.collect();
|
||||
let line = Line::train(&VecColumn::from(&first_chunk));
|
||||
for (i, buffer_val) in first_chunk.iter_mut().enumerate() {
|
||||
let interpolated_val = line.eval(i as u64);
|
||||
@@ -101,7 +102,7 @@ impl FastFieldCodec for BlockwiseLinearCodec {
|
||||
Some(num_bits as f32 / num_bits_uncompressed as f32)
|
||||
}
|
||||
|
||||
fn serialize(column: &dyn Column, wrt: &mut impl io::Write) -> io::Result<()> {
|
||||
fn serialize(column: &dyn crate::Column, wrt: &mut impl io::Write) -> io::Result<()> {
|
||||
// The BitpackedReader assumes a normalized vector.
|
||||
assert_eq!(column.min_value(), 0);
|
||||
let mut buffer = Vec::with_capacity(CHUNK_SIZE);
|
||||
@@ -110,7 +111,7 @@ impl FastFieldCodec for BlockwiseLinearCodec {
|
||||
let num_blocks = compute_num_blocks(num_vals);
|
||||
let mut blocks = Vec::with_capacity(num_blocks);
|
||||
|
||||
let mut vals = column.iter();
|
||||
let mut vals = crate::iter_from_reader(column.reader());
|
||||
|
||||
let mut bit_packer = BitPacker::new();
|
||||
|
||||
|
||||
@@ -3,7 +3,13 @@ use std::ops::RangeInclusive;
|
||||
|
||||
use tantivy_bitpacker::minmax;
|
||||
|
||||
pub trait Column<T: PartialOrd = u64>: Send + Sync {
|
||||
pub trait Column<T: PartialOrd + Copy + 'static = u64>: Send + Sync {
|
||||
/// Return a `ColumnReader`.
|
||||
fn reader(&self) -> Box<dyn ColumnReader<T> + '_> {
|
||||
// Box::new(ColumnReaderAdapter { column: self, idx: 0, })
|
||||
Box::new(ColumnReaderAdapter::from(self))
|
||||
}
|
||||
|
||||
/// Return the value associated to the given idx.
|
||||
///
|
||||
/// This accessor should return as fast as possible.
|
||||
@@ -11,6 +17,8 @@ pub trait Column<T: PartialOrd = u64>: Send + Sync {
|
||||
/// # Panics
|
||||
///
|
||||
/// May panic if `idx` is greater than the column length.
|
||||
///
|
||||
/// TODO remove to force people to use `.reader()`.
|
||||
fn get_val(&self, idx: u64) -> T;
|
||||
|
||||
/// Fills an output buffer with the fast field values
|
||||
@@ -58,10 +66,70 @@ pub trait Column<T: PartialOrd = u64>: Send + Sync {
|
||||
fn max_value(&self) -> T;
|
||||
|
||||
fn num_vals(&self) -> u64;
|
||||
}
|
||||
|
||||
/// Returns a iterator over the data
|
||||
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = T> + 'a> {
|
||||
Box::new((0..self.num_vals()).map(|idx| self.get_val(idx)))
|
||||
/// `ColumnReader` makes it possible to read forward through a column.
|
||||
pub trait ColumnReader<T = u64> {
|
||||
/// Advance the reader to the target_idx.
|
||||
///
|
||||
/// After a successful call to seek,
|
||||
/// `.get()` should returns `column.get_val(target_idx)`.
|
||||
fn seek(&mut self, target_idx: u64) -> T;
|
||||
|
||||
fn advance(&mut self) -> bool;
|
||||
|
||||
/// Get the current value without advancing the reader
|
||||
fn get(&self) -> T;
|
||||
}
|
||||
|
||||
pub fn iter_from_reader<'a, T: 'static>(
|
||||
mut column_reader: Box<dyn ColumnReader<T> + 'a>,
|
||||
) -> impl Iterator<Item = T> + 'a {
|
||||
std::iter::from_fn(move || {
|
||||
if !column_reader.advance() {
|
||||
return None;
|
||||
}
|
||||
Some(column_reader.get())
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) struct ColumnReaderAdapter<'a, C: ?Sized, T> {
|
||||
column: &'a C,
|
||||
idx: u64,
|
||||
len: u64,
|
||||
_phantom: PhantomData<T>,
|
||||
}
|
||||
|
||||
impl<'a, C: Column<T> + ?Sized, T: Copy + PartialOrd + 'static> From<&'a C>
|
||||
for ColumnReaderAdapter<'a, C, T>
|
||||
{
|
||||
fn from(column: &'a C) -> Self {
|
||||
ColumnReaderAdapter {
|
||||
column,
|
||||
idx: u64::MAX,
|
||||
len: column.num_vals(),
|
||||
_phantom: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T, C: ?Sized> ColumnReader<T> for ColumnReaderAdapter<'a, C, T>
|
||||
where
|
||||
C: Column<T>,
|
||||
T: PartialOrd<T> + Copy + 'static,
|
||||
{
|
||||
fn seek(&mut self, idx: u64) -> T {
|
||||
self.idx = idx;
|
||||
self.get()
|
||||
}
|
||||
|
||||
fn advance(&mut self) -> bool {
|
||||
self.idx = self.idx.wrapping_add(1);
|
||||
self.idx < self.len
|
||||
}
|
||||
|
||||
fn get(&self) -> T {
|
||||
self.column.get_val(self.idx)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -71,7 +139,9 @@ pub struct VecColumn<'a, T = u64> {
|
||||
max_value: T,
|
||||
}
|
||||
|
||||
impl<'a, C: Column<T>, T: Copy + PartialOrd> Column<T> for &'a C {
|
||||
impl<'a, C: Column<T>, T> Column<T> for &'a C
|
||||
where T: Copy + PartialOrd + 'static
|
||||
{
|
||||
fn get_val(&self, idx: u64) -> T {
|
||||
(*self).get_val(idx)
|
||||
}
|
||||
@@ -88,8 +158,8 @@ impl<'a, C: Column<T>, T: Copy + PartialOrd> Column<T> for &'a C {
|
||||
(*self).num_vals()
|
||||
}
|
||||
|
||||
fn iter<'b>(&'b self) -> Box<dyn Iterator<Item = T> + 'b> {
|
||||
(*self).iter()
|
||||
fn reader(&self) -> Box<dyn ColumnReader<T> + '_> {
|
||||
(*self).reader()
|
||||
}
|
||||
|
||||
fn get_range(&self, start: u64, output: &mut [T]) {
|
||||
@@ -97,15 +167,11 @@ impl<'a, C: Column<T>, T: Copy + PartialOrd> Column<T> for &'a C {
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T: Copy + PartialOrd + Send + Sync> Column<T> for VecColumn<'a, T> {
|
||||
impl<'a, T: Copy + PartialOrd + Send + Sync + 'static> Column<T> for VecColumn<'a, T> {
|
||||
fn get_val(&self, position: u64) -> T {
|
||||
self.values[position as usize]
|
||||
}
|
||||
|
||||
fn iter(&self) -> Box<dyn Iterator<Item = T> + '_> {
|
||||
Box::new(self.values.iter().copied())
|
||||
}
|
||||
|
||||
fn min_value(&self) -> T {
|
||||
self.min_value
|
||||
}
|
||||
@@ -137,57 +203,6 @@ where V: AsRef<[T]> + ?Sized
|
||||
}
|
||||
}
|
||||
|
||||
// Creates a view over a Column with a limited number of vals. Stats like min max are unchanged
|
||||
pub struct EstimateColumn<'a> {
|
||||
column: &'a dyn Column,
|
||||
num_vals: u64,
|
||||
}
|
||||
impl<'a> EstimateColumn<'a> {
|
||||
pub(crate) fn new(column: &'a dyn Column) -> Self {
|
||||
let limit_num_vals = column.num_vals().min(100_000);
|
||||
Self {
|
||||
column,
|
||||
num_vals: limit_num_vals,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Column for EstimateColumn<'a> {
|
||||
fn get_val(&self, idx: u64) -> u64 {
|
||||
(*self.column).get_val(idx)
|
||||
}
|
||||
|
||||
fn min_value(&self) -> u64 {
|
||||
(*self.column).min_value()
|
||||
}
|
||||
|
||||
fn max_value(&self) -> u64 {
|
||||
(*self.column).max_value()
|
||||
}
|
||||
|
||||
fn num_vals(&self) -> u64 {
|
||||
self.num_vals
|
||||
}
|
||||
|
||||
fn iter<'b>(&'b self) -> Box<dyn Iterator<Item = u64> + 'b> {
|
||||
Box::new((*self.column).iter().take(self.num_vals as usize))
|
||||
}
|
||||
|
||||
fn get_range(&self, start: u64, output: &mut [u64]) {
|
||||
(*self.column).get_range(start, output)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> From<&'a dyn Column> for EstimateColumn<'a> {
|
||||
fn from(column: &'a dyn Column) -> Self {
|
||||
let limit_num_vals = column.num_vals().min(100_000);
|
||||
Self {
|
||||
column,
|
||||
num_vals: limit_num_vals,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct MonotonicMappingColumn<C, T, Input> {
|
||||
from_column: C,
|
||||
monotonic_mapping: T,
|
||||
@@ -195,15 +210,15 @@ struct MonotonicMappingColumn<C, T, Input> {
|
||||
}
|
||||
|
||||
/// Creates a view of a column transformed by a monotonic mapping.
|
||||
pub fn monotonic_map_column<C, T, Input: PartialOrd, Output: PartialOrd>(
|
||||
pub fn monotonic_map_column<C, T, Input: PartialOrd + Copy, Output: PartialOrd + Copy>(
|
||||
from_column: C,
|
||||
monotonic_mapping: T,
|
||||
) -> impl Column<Output>
|
||||
where
|
||||
C: Column<Input>,
|
||||
T: Fn(Input) -> Output + Send + Sync,
|
||||
Input: Send + Sync,
|
||||
Output: Send + Sync,
|
||||
Input: Send + Sync + 'static,
|
||||
Output: Send + Sync + 'static,
|
||||
{
|
||||
MonotonicMappingColumn {
|
||||
from_column,
|
||||
@@ -212,13 +227,13 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<C, T, Input: PartialOrd, Output: PartialOrd> Column<Output>
|
||||
impl<C, T, Input: PartialOrd + Copy, Output: PartialOrd + Copy> Column<Output>
|
||||
for MonotonicMappingColumn<C, T, Input>
|
||||
where
|
||||
C: Column<Input>,
|
||||
T: Fn(Input) -> Output + Send + Sync,
|
||||
Input: Send + Sync,
|
||||
Output: Send + Sync,
|
||||
Input: Send + Sync + 'static,
|
||||
Output: Send + Sync + 'static,
|
||||
{
|
||||
#[inline]
|
||||
fn get_val(&self, idx: u64) -> Output {
|
||||
@@ -240,14 +255,44 @@ where
|
||||
self.from_column.num_vals()
|
||||
}
|
||||
|
||||
fn iter(&self) -> Box<dyn Iterator<Item = Output> + '_> {
|
||||
Box::new(self.from_column.iter().map(&self.monotonic_mapping))
|
||||
fn reader(&self) -> Box<dyn ColumnReader<Output> + '_> {
|
||||
Box::new(MonotonicMappingColumnReader {
|
||||
col_reader: self.from_column.reader(),
|
||||
monotonic_mapping: &self.monotonic_mapping,
|
||||
intermdiary_type: PhantomData,
|
||||
})
|
||||
}
|
||||
|
||||
// We voluntarily do not implement get_range as it yields a regression,
|
||||
// and we do not have any specialized implementation anyway.
|
||||
}
|
||||
|
||||
struct MonotonicMappingColumnReader<'a, Transform, U> {
|
||||
col_reader: Box<dyn ColumnReader<U> + 'a>,
|
||||
monotonic_mapping: &'a Transform,
|
||||
intermdiary_type: PhantomData<U>,
|
||||
}
|
||||
|
||||
impl<'a, U, V, Transform> ColumnReader<V> for MonotonicMappingColumnReader<'a, Transform, U>
|
||||
where
|
||||
U: Copy,
|
||||
V: Copy,
|
||||
Transform: Fn(U) -> V,
|
||||
{
|
||||
fn seek(&mut self, idx: u64) -> V {
|
||||
let intermediary_value = self.col_reader.seek(idx);
|
||||
(*self.monotonic_mapping)(intermediary_value)
|
||||
}
|
||||
|
||||
fn advance(&mut self) -> bool {
|
||||
self.col_reader.advance()
|
||||
}
|
||||
|
||||
fn get(&self) -> V {
|
||||
(*self.monotonic_mapping)(self.col_reader.get())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct IterColumn<T>(T);
|
||||
|
||||
impl<T> From<T> for IterColumn<T>
|
||||
@@ -261,7 +306,7 @@ where T: Iterator + Clone + ExactSizeIterator
|
||||
impl<T> Column<T::Item> for IterColumn<T>
|
||||
where
|
||||
T: Iterator + Clone + ExactSizeIterator + Send + Sync,
|
||||
T::Item: PartialOrd,
|
||||
T::Item: PartialOrd + Copy + 'static,
|
||||
{
|
||||
fn get_val(&self, idx: u64) -> T::Item {
|
||||
self.0.clone().nth(idx as usize).unwrap()
|
||||
@@ -278,10 +323,6 @@ where
|
||||
fn num_vals(&self) -> u64 {
|
||||
self.0.len() as u64
|
||||
}
|
||||
|
||||
fn iter(&self) -> Box<dyn Iterator<Item = T::Item> + '_> {
|
||||
Box::new(self.0.clone())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -314,7 +355,7 @@ mod tests {
|
||||
let vals: Vec<u64> = (-1..99).map(i64::to_u64).collect();
|
||||
let col = VecColumn::from(&vals);
|
||||
let mapped = monotonic_map_column(col, |el| i64::from_u64(el) * 10i64);
|
||||
let val_i64s: Vec<i64> = mapped.iter().collect();
|
||||
let val_i64s: Vec<i64> = crate::iter_from_reader(mapped.reader()).collect();
|
||||
for i in 0..100 {
|
||||
assert_eq!(val_i64s[i as usize], mapped.get_val(i));
|
||||
}
|
||||
@@ -328,7 +369,7 @@ mod tests {
|
||||
assert_eq!(mapped.min_value(), -10i64);
|
||||
assert_eq!(mapped.max_value(), 980i64);
|
||||
assert_eq!(mapped.num_vals(), 100);
|
||||
let val_i64s: Vec<i64> = mapped.iter().collect();
|
||||
let val_i64s: Vec<i64> = crate::iter_from_reader(mapped.reader()).collect();
|
||||
assert_eq!(val_i64s.len(), 100);
|
||||
for i in 0..100 {
|
||||
assert_eq!(val_i64s[i as usize], mapped.get_val(i));
|
||||
|
||||
@@ -22,7 +22,7 @@ use ownedbytes::OwnedBytes;
|
||||
use tantivy_bitpacker::{self, BitPacker, BitUnpacker};
|
||||
|
||||
use crate::compact_space::build_compact_space::get_compact_space;
|
||||
use crate::Column;
|
||||
use crate::{iter_from_reader, Column, ColumnReader};
|
||||
|
||||
mod blank_range;
|
||||
mod build_compact_space;
|
||||
@@ -173,11 +173,14 @@ impl CompactSpaceCompressor {
|
||||
/// Taking the vals as Vec may cost a lot of memory. It is used to sort the vals.
|
||||
pub fn train_from(column: &impl Column<u128>) -> Self {
|
||||
let mut values_sorted = BTreeSet::new();
|
||||
values_sorted.extend(column.iter());
|
||||
|
||||
let total_num_values = column.num_vals();
|
||||
|
||||
values_sorted.extend(iter_from_reader(column.reader()));
|
||||
|
||||
let compact_space =
|
||||
get_compact_space(&values_sorted, total_num_values, COST_PER_BLANK_IN_BITS);
|
||||
|
||||
let amplitude_compact_space = compact_space.amplitude_compact_space();
|
||||
|
||||
assert!(
|
||||
@@ -218,11 +221,12 @@ impl CompactSpaceCompressor {
|
||||
|
||||
pub fn compress_into(
|
||||
self,
|
||||
vals: impl Iterator<Item = u128>,
|
||||
mut vals: Box<dyn ColumnReader<u128> + '_>,
|
||||
write: &mut impl Write,
|
||||
) -> io::Result<()> {
|
||||
let mut bitpacker = BitPacker::default();
|
||||
for val in vals {
|
||||
while vals.advance() {
|
||||
let val = vals.get();
|
||||
let compact = self
|
||||
.params
|
||||
.compact_space
|
||||
@@ -300,13 +304,13 @@ impl Column<u128> for CompactSpaceDecompressor {
|
||||
self.params.num_vals
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn iter(&self) -> Box<dyn Iterator<Item = u128> + '_> {
|
||||
Box::new(self.iter())
|
||||
}
|
||||
fn get_between_vals(&self, range: RangeInclusive<u128>) -> Vec<u64> {
|
||||
self.get_between_vals(range)
|
||||
}
|
||||
|
||||
fn reader(&self) -> Box<dyn ColumnReader<u128> + '_> {
|
||||
Box::new(self.specialized_reader())
|
||||
}
|
||||
}
|
||||
|
||||
impl CompactSpaceDecompressor {
|
||||
@@ -410,18 +414,13 @@ impl CompactSpaceDecompressor {
|
||||
positions
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn iter_compact(&self) -> impl Iterator<Item = u64> + '_ {
|
||||
(0..self.params.num_vals)
|
||||
.map(move |idx| self.params.bit_unpacker.get(idx as u64, &self.data) as u64)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn iter(&self) -> impl Iterator<Item = u128> + '_ {
|
||||
// TODO: Performance. It would be better to iterate on the ranges and check existence via
|
||||
// the bit_unpacker.
|
||||
self.iter_compact()
|
||||
.map(|compact| self.compact_to_u128(compact))
|
||||
fn specialized_reader(&self) -> CompactSpaceReader<'_> {
|
||||
CompactSpaceReader {
|
||||
data: self.data.as_slice(),
|
||||
params: &self.params,
|
||||
idx: 0u64,
|
||||
len: self.params.num_vals,
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
@@ -439,6 +438,30 @@ impl CompactSpaceDecompressor {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct CompactSpaceReader<'a> {
|
||||
data: &'a [u8],
|
||||
params: &'a IPCodecParams,
|
||||
idx: u64,
|
||||
len: u64,
|
||||
}
|
||||
|
||||
impl<'a> ColumnReader<u128> for CompactSpaceReader<'a> {
|
||||
fn seek(&mut self, target_idx: u64) -> u128 {
|
||||
self.idx = target_idx;
|
||||
self.get()
|
||||
}
|
||||
|
||||
fn advance(&mut self) -> bool {
|
||||
self.idx = self.idx.wrapping_add(1);
|
||||
self.idx < self.len
|
||||
}
|
||||
|
||||
fn get(&self) -> u128 {
|
||||
let compact_code = self.params.bit_unpacker.get(self.idx, self.data);
|
||||
self.params.compact_space.compact_to_u128(compact_code)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
|
||||
@@ -11,7 +11,6 @@ use std::io;
|
||||
use std::io::Write;
|
||||
use std::sync::Arc;
|
||||
|
||||
use column::EstimateColumn;
|
||||
use common::BinarySerializable;
|
||||
use compact_space::CompactSpaceDecompressor;
|
||||
use ownedbytes::OwnedBytes;
|
||||
@@ -30,7 +29,7 @@ mod serialize;
|
||||
|
||||
use self::bitpacked::BitpackedCodec;
|
||||
use self::blockwise_linear::BlockwiseLinearCodec;
|
||||
pub use self::column::{monotonic_map_column, Column, VecColumn};
|
||||
pub use self::column::{iter_from_reader, monotonic_map_column, Column, ColumnReader, VecColumn};
|
||||
use self::linear::LinearCodec;
|
||||
pub use self::monotonic_mapping::MonotonicallyMappableToU64;
|
||||
pub use self::serialize::{
|
||||
@@ -124,7 +123,7 @@ trait FastFieldCodec: 'static {
|
||||
///
|
||||
/// The column iterator should be preferred over using column `get_val` method for
|
||||
/// performance reasons.
|
||||
fn serialize(column: &dyn Column, write: &mut impl Write) -> io::Result<()>;
|
||||
fn serialize(column: &dyn Column<u64>, write: &mut impl Write) -> io::Result<()>;
|
||||
|
||||
/// Returns an estimate of the compression ratio.
|
||||
/// If the codec is not applicable, returns `None`.
|
||||
@@ -133,7 +132,7 @@ trait FastFieldCodec: 'static {
|
||||
///
|
||||
/// It could make sense to also return a value representing
|
||||
/// computational complexity.
|
||||
fn estimate(column: &EstimateColumn) -> Option<f32>;
|
||||
fn estimate(column: &impl Column) -> Option<f32>;
|
||||
}
|
||||
|
||||
pub const ALL_CODEC_TYPES: [FastFieldCodecType; 3] = [
|
||||
@@ -150,7 +149,6 @@ mod tests {
|
||||
|
||||
use crate::bitpacked::BitpackedCodec;
|
||||
use crate::blockwise_linear::BlockwiseLinearCodec;
|
||||
use crate::column::EstimateColumn;
|
||||
use crate::linear::LinearCodec;
|
||||
use crate::serialize::Header;
|
||||
|
||||
@@ -161,9 +159,7 @@ mod tests {
|
||||
let col = &VecColumn::from(data);
|
||||
let header = Header::compute_header(col, &[Codec::CODEC_TYPE])?;
|
||||
let normalized_col = header.normalize_column(col);
|
||||
|
||||
let limited_column = EstimateColumn::new(&normalized_col);
|
||||
let estimation = Codec::estimate(&limited_column)?;
|
||||
let estimation = Codec::estimate(&normalized_col)?;
|
||||
|
||||
let mut out = Vec::new();
|
||||
let col = VecColumn::from(data);
|
||||
@@ -284,16 +280,14 @@ mod tests {
|
||||
let data = (10..=20000_u64).collect::<Vec<_>>();
|
||||
let data: VecColumn = data.as_slice().into();
|
||||
|
||||
let linear_interpol_estimation =
|
||||
LinearCodec::estimate(&EstimateColumn::new(&data)).unwrap();
|
||||
let linear_interpol_estimation = LinearCodec::estimate(&data).unwrap();
|
||||
assert_le!(linear_interpol_estimation, 0.01);
|
||||
|
||||
let multi_linear_interpol_estimation =
|
||||
BlockwiseLinearCodec::estimate(&EstimateColumn::new(&data)).unwrap();
|
||||
let multi_linear_interpol_estimation = BlockwiseLinearCodec::estimate(&data).unwrap();
|
||||
assert_le!(multi_linear_interpol_estimation, 0.2);
|
||||
assert_lt!(linear_interpol_estimation, multi_linear_interpol_estimation);
|
||||
|
||||
let bitpacked_estimation = BitpackedCodec::estimate(&EstimateColumn::new(&data)).unwrap();
|
||||
let bitpacked_estimation = BitpackedCodec::estimate(&data).unwrap();
|
||||
assert_lt!(linear_interpol_estimation, bitpacked_estimation);
|
||||
}
|
||||
#[test]
|
||||
@@ -301,20 +295,18 @@ mod tests {
|
||||
let data: &[u64] = &[200, 10, 10, 10, 10, 1000, 20];
|
||||
|
||||
let data: VecColumn = data.into();
|
||||
let linear_interpol_estimation =
|
||||
LinearCodec::estimate(&EstimateColumn::new(&data)).unwrap();
|
||||
let linear_interpol_estimation = LinearCodec::estimate(&data).unwrap();
|
||||
assert_le!(linear_interpol_estimation, 0.34);
|
||||
|
||||
let bitpacked_estimation = BitpackedCodec::estimate(&EstimateColumn::new(&data)).unwrap();
|
||||
let bitpacked_estimation = BitpackedCodec::estimate(&data).unwrap();
|
||||
assert_lt!(bitpacked_estimation, linear_interpol_estimation);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn estimation_prefer_bitpacked() {
|
||||
let data = VecColumn::from(&[10, 10, 10, 10]);
|
||||
let linear_interpol_estimation =
|
||||
LinearCodec::estimate(&EstimateColumn::new(&data)).unwrap();
|
||||
let bitpacked_estimation = BitpackedCodec::estimate(&EstimateColumn::new(&data)).unwrap();
|
||||
let linear_interpol_estimation = LinearCodec::estimate(&data).unwrap();
|
||||
let bitpacked_estimation = BitpackedCodec::estimate(&data).unwrap();
|
||||
assert_lt!(bitpacked_estimation, linear_interpol_estimation);
|
||||
}
|
||||
|
||||
@@ -326,11 +318,10 @@ mod tests {
|
||||
|
||||
// in this case the linear interpolation can't in fact not be worse than bitpacking,
|
||||
// but the estimator adds some threshold, which leads to estimated worse behavior
|
||||
let linear_interpol_estimation =
|
||||
LinearCodec::estimate(&EstimateColumn::new(&data)).unwrap();
|
||||
let linear_interpol_estimation = LinearCodec::estimate(&data).unwrap();
|
||||
assert_le!(linear_interpol_estimation, 0.35);
|
||||
|
||||
let bitpacked_estimation = BitpackedCodec::estimate(&EstimateColumn::new(&data)).unwrap();
|
||||
let bitpacked_estimation = BitpackedCodec::estimate(&data).unwrap();
|
||||
assert_le!(bitpacked_estimation, 0.32);
|
||||
assert_le!(bitpacked_estimation, linear_interpol_estimation);
|
||||
}
|
||||
|
||||
@@ -67,22 +67,25 @@ impl Line {
|
||||
self.intercept.wrapping_add(linear_part)
|
||||
}
|
||||
|
||||
// Same as train, but the intercept is only estimated from provided sample positions
|
||||
pub fn estimate(ys: &dyn Column, sample_positions: &[u64]) -> Self {
|
||||
Self::train_from(ys, sample_positions.iter().cloned())
|
||||
}
|
||||
|
||||
// Intercept is only computed from provided positions
|
||||
pub fn train_from(
|
||||
ys: &dyn Column,
|
||||
positions_and_values: impl Iterator<Item = (u64, u64)>,
|
||||
) -> Self {
|
||||
let num_vals = if let Some(num_vals) = NonZeroU64::new(ys.num_vals() - 1) {
|
||||
num_vals
|
||||
fn train_from(ys: &dyn Column, positions: impl Iterator<Item = u64>) -> Self {
|
||||
let last_idx = if let Some(last_idx) = NonZeroU64::new(ys.num_vals() - 1) {
|
||||
last_idx
|
||||
} else {
|
||||
return Line::default();
|
||||
};
|
||||
|
||||
let y0 = ys.get_val(0);
|
||||
let y1 = ys.get_val(num_vals.get());
|
||||
let mut ys_reader = ys.reader();
|
||||
let y0 = ys_reader.seek(0);
|
||||
let y1 = ys_reader.seek(last_idx.get());
|
||||
|
||||
// We first independently pick our slope.
|
||||
let slope = compute_slope(y0, y1, num_vals);
|
||||
let slope = compute_slope(y0, y1, last_idx);
|
||||
|
||||
// We picked our slope. Note that it does not have to be perfect.
|
||||
// Now we need to compute the best intercept.
|
||||
@@ -112,8 +115,12 @@ impl Line {
|
||||
intercept: 0,
|
||||
};
|
||||
let heuristic_shift = y0.wrapping_sub(MID_POINT);
|
||||
line.intercept = positions_and_values
|
||||
.map(|(pos, y)| y.wrapping_sub(line.eval(pos)))
|
||||
let mut ys_reader = ys.reader();
|
||||
line.intercept = positions
|
||||
.map(|pos| {
|
||||
let y = ys_reader.seek(pos);
|
||||
y.wrapping_sub(line.eval(pos))
|
||||
})
|
||||
.min_by_key(|&val| val.wrapping_sub(heuristic_shift))
|
||||
.unwrap_or(0u64); //< Never happens.
|
||||
line
|
||||
@@ -130,10 +137,7 @@ impl Line {
|
||||
/// This function is only invariable by translation if all of the
|
||||
/// `ys` are packaged into half of the space. (See heuristic below)
|
||||
pub fn train(ys: &dyn Column) -> Self {
|
||||
Self::train_from(
|
||||
ys,
|
||||
ys.iter().enumerate().map(|(pos, val)| (pos as u64, val)),
|
||||
)
|
||||
Self::train_from(ys, 0..ys.num_vals())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -4,7 +4,6 @@ use common::BinarySerializable;
|
||||
use ownedbytes::OwnedBytes;
|
||||
use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker};
|
||||
|
||||
use crate::column::EstimateColumn;
|
||||
use crate::line::Line;
|
||||
use crate::serialize::NormalizedHeader;
|
||||
use crate::{Column, FastFieldCodec, FastFieldCodecType};
|
||||
@@ -90,8 +89,7 @@ impl FastFieldCodec for LinearCodec {
|
||||
assert_eq!(column.min_value(), 0);
|
||||
let line = Line::train(column);
|
||||
|
||||
let max_offset_from_line = column
|
||||
.iter()
|
||||
let max_offset_from_line = crate::iter_from_reader(column.reader())
|
||||
.enumerate()
|
||||
.map(|(pos, actual_value)| {
|
||||
let calculated_value = line.eval(pos as u64);
|
||||
@@ -108,7 +106,12 @@ impl FastFieldCodec for LinearCodec {
|
||||
linear_params.serialize(write)?;
|
||||
|
||||
let mut bit_packer = BitPacker::new();
|
||||
for (pos, actual_value) in column.iter().enumerate() {
|
||||
let mut col_reader = column.reader();
|
||||
for pos in 0.. {
|
||||
if !col_reader.advance() {
|
||||
break;
|
||||
}
|
||||
let actual_value = col_reader.get();
|
||||
let calculated_value = line.eval(pos as u64);
|
||||
let offset = actual_value.wrapping_sub(calculated_value);
|
||||
bit_packer.write(offset, num_bits, write)?;
|
||||
@@ -122,23 +125,24 @@ impl FastFieldCodec for LinearCodec {
|
||||
/// where the local maxima for the deviation of the calculated value are and
|
||||
/// the offset to shift all values to >=0 is also unknown.
|
||||
#[allow(clippy::question_mark)]
|
||||
fn estimate(column: &EstimateColumn) -> Option<f32> {
|
||||
fn estimate(column: &impl Column) -> Option<f32> {
|
||||
if column.num_vals() < 3 {
|
||||
return None; // disable compressor for this case
|
||||
}
|
||||
|
||||
// let's sample at 0%, 5%, 10% .. 95%, 100%
|
||||
let num_vals = column.num_vals() as f32 / 100.0;
|
||||
let sample_positions_and_values = (0..20)
|
||||
let sample_positions = (0..20)
|
||||
.map(|pos| (num_vals * pos as f32 * 5.0) as u64)
|
||||
.map(|pos| (pos, column.get_val(pos)))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let line = { Line::train_from(column, sample_positions_and_values.iter().cloned()) };
|
||||
let line = Line::estimate(column, &sample_positions);
|
||||
|
||||
let estimated_bit_width = sample_positions_and_values
|
||||
let mut column_reader = column.reader();
|
||||
let estimated_bit_width = sample_positions
|
||||
.into_iter()
|
||||
.map(|(pos, actual_value)| {
|
||||
.map(|pos| {
|
||||
let actual_value = column_reader.seek(pos);
|
||||
let interpolated_val = line.eval(pos as u64);
|
||||
actual_value.wrapping_sub(interpolated_val)
|
||||
})
|
||||
|
||||
@@ -36,7 +36,11 @@ impl MonotonicallyMappableToU64 for i64 {
|
||||
impl MonotonicallyMappableToU64 for bool {
|
||||
#[inline(always)]
|
||||
fn to_u64(self) -> u64 {
|
||||
u64::from(self)
|
||||
if self {
|
||||
1
|
||||
} else {
|
||||
0
|
||||
}
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
|
||||
@@ -28,12 +28,11 @@ use ownedbytes::OwnedBytes;
|
||||
|
||||
use crate::bitpacked::BitpackedCodec;
|
||||
use crate::blockwise_linear::BlockwiseLinearCodec;
|
||||
use crate::column::EstimateColumn;
|
||||
use crate::compact_space::CompactSpaceCompressor;
|
||||
use crate::linear::LinearCodec;
|
||||
use crate::{
|
||||
monotonic_map_column, Column, FastFieldCodec, FastFieldCodecType, MonotonicallyMappableToU64,
|
||||
VecColumn, ALL_CODEC_TYPES,
|
||||
iter_from_reader, monotonic_map_column, Column, FastFieldCodec, FastFieldCodecType,
|
||||
MonotonicallyMappableToU64, VecColumn, ALL_CODEC_TYPES,
|
||||
};
|
||||
|
||||
/// The normalized header gives some parameters after applying the following
|
||||
@@ -80,8 +79,9 @@ impl Header {
|
||||
let num_vals = column.num_vals();
|
||||
let min_value = column.min_value();
|
||||
let max_value = column.max_value();
|
||||
let gcd = crate::gcd::find_gcd(column.iter().map(|val| val - min_value))
|
||||
.filter(|gcd| gcd.get() > 1u64);
|
||||
let gcd =
|
||||
crate::gcd::find_gcd(iter_from_reader(column.reader()).map(|val| val - min_value))
|
||||
.filter(|gcd| gcd.get() > 1u64);
|
||||
let divider = DividerU64::divide_by(gcd.map(|gcd| gcd.get()).unwrap_or(1u64));
|
||||
let shifted_column = monotonic_map_column(&column, |val| divider.divide(val - min_value));
|
||||
let codec_type = detect_codec(shifted_column, codecs)?;
|
||||
@@ -126,6 +126,23 @@ impl BinarySerializable for Header {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn estimate<T: MonotonicallyMappableToU64>(
|
||||
typed_column: impl Column<T>,
|
||||
codec_type: FastFieldCodecType,
|
||||
) -> Option<f32> {
|
||||
let column = monotonic_map_column(typed_column, T::to_u64);
|
||||
let min_value = column.min_value();
|
||||
let gcd = crate::gcd::find_gcd(iter_from_reader(column.reader()).map(|val| val - min_value))
|
||||
.filter(|gcd| gcd.get() > 1u64);
|
||||
let divider = DividerU64::divide_by(gcd.map(|gcd| gcd.get()).unwrap_or(1u64));
|
||||
let normalized_column = monotonic_map_column(&column, |val| divider.divide(val - min_value));
|
||||
match codec_type {
|
||||
FastFieldCodecType::Bitpacked => BitpackedCodec::estimate(&normalized_column),
|
||||
FastFieldCodecType::Linear => LinearCodec::estimate(&normalized_column),
|
||||
FastFieldCodecType::BlockwiseLinear => BlockwiseLinearCodec::estimate(&normalized_column),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn serialize_u128(
|
||||
typed_column: impl Column<u128>,
|
||||
output: &mut impl io::Write,
|
||||
@@ -133,7 +150,7 @@ pub fn serialize_u128(
|
||||
// TODO write header, to later support more codecs
|
||||
let compressor = CompactSpaceCompressor::train_from(&typed_column);
|
||||
compressor
|
||||
.compress_into(typed_column.iter(), output)
|
||||
.compress_into(typed_column.reader(), output)
|
||||
.unwrap();
|
||||
|
||||
Ok(())
|
||||
@@ -161,29 +178,10 @@ pub fn serialize<T: MonotonicallyMappableToU64>(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn estimate<T: MonotonicallyMappableToU64>(
|
||||
typed_column: impl Column<T>,
|
||||
codec_type: FastFieldCodecType,
|
||||
) -> Option<f32> {
|
||||
let column = monotonic_map_column(typed_column, T::to_u64);
|
||||
let min_value = column.min_value();
|
||||
let gcd = crate::gcd::find_gcd(column.iter().map(|val| val - min_value))
|
||||
.filter(|gcd| gcd.get() > 1u64);
|
||||
let divider = DividerU64::divide_by(gcd.map(|gcd| gcd.get()).unwrap_or(1u64));
|
||||
let normalized_column = monotonic_map_column(&column, |val| divider.divide(val - min_value));
|
||||
let estimate_column = EstimateColumn::new(&normalized_column);
|
||||
match codec_type {
|
||||
FastFieldCodecType::Bitpacked => BitpackedCodec::estimate(&estimate_column),
|
||||
FastFieldCodecType::Linear => LinearCodec::estimate(&estimate_column),
|
||||
FastFieldCodecType::BlockwiseLinear => BlockwiseLinearCodec::estimate(&estimate_column),
|
||||
}
|
||||
}
|
||||
|
||||
fn detect_codec(
|
||||
column: impl Column<u64>,
|
||||
codecs: &[FastFieldCodecType],
|
||||
) -> Option<FastFieldCodecType> {
|
||||
let column: EstimateColumn = EstimateColumn::new(&column);
|
||||
let mut estimations = Vec::new();
|
||||
for &codec in codecs {
|
||||
let estimation_opt = match codec {
|
||||
@@ -243,7 +241,8 @@ mod tests {
|
||||
#[test]
|
||||
fn test_serialize_deserialize() {
|
||||
let original = [1u64, 5u64, 10u64];
|
||||
let restored: Vec<u64> = serialize_and_load(&original[..]).iter().collect();
|
||||
let restored: Vec<u64> =
|
||||
crate::iter_from_reader(serialize_and_load(&original[..]).reader()).collect();
|
||||
assert_eq!(&restored, &original[..]);
|
||||
}
|
||||
|
||||
|
||||
@@ -425,7 +425,7 @@ impl SegmentHistogramCollector {
|
||||
let bucket = &mut self.buckets[bucket_pos];
|
||||
bucket.doc_count += 1;
|
||||
if let Some(sub_aggregation) = self.sub_aggregations.as_mut() {
|
||||
sub_aggregation[bucket_pos].collect(doc, bucket_with_accessor)?;
|
||||
(&mut sub_aggregation[bucket_pos]).collect(doc, bucket_with_accessor)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -57,7 +57,7 @@ impl SegmentId {
|
||||
/// Picking the first 8 chars is ok to identify
|
||||
/// segments in a display message (e.g. a5c4dfcb).
|
||||
pub fn short_uuid_string(&self) -> String {
|
||||
self.0.as_simple().to_string()[..8].to_string()
|
||||
(&self.0.as_simple().to_string()[..8]).to_string()
|
||||
}
|
||||
|
||||
/// Returns a segment uuid string.
|
||||
|
||||
@@ -472,8 +472,6 @@ mod tests {
|
||||
// There are more tests in directory/mod.rs
|
||||
// The following tests are specific to the MmapDirectory
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use common::HasLen;
|
||||
|
||||
use super::*;
|
||||
@@ -612,14 +610,7 @@ mod tests {
|
||||
mmap_directory.get_cache_info().mmapped.len()
|
||||
);
|
||||
}
|
||||
// This test failed on CI. The last Mmap is dropped from the merging thread so there might
|
||||
// be a race condition indeed.
|
||||
for _ in 0..10 {
|
||||
if mmap_directory.get_cache_info().mmapped.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
std::thread::sleep(Duration::from_millis(200));
|
||||
}
|
||||
panic!("The cache still contains information. One of the Mmap has not been dropped.");
|
||||
assert!(mmap_directory.get_cache_info().mmapped.is_empty());
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -136,20 +136,6 @@ impl RamDirectory {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
/// Deep clones the directory.
|
||||
///
|
||||
/// Ulterior writes on one of the copy
|
||||
/// will not affect the other copy.
|
||||
pub fn deep_clone(&self) -> RamDirectory {
|
||||
let inner_clone = InnerDirectory {
|
||||
fs: self.fs.read().unwrap().fs.clone(),
|
||||
watch_router: Default::default(),
|
||||
};
|
||||
RamDirectory {
|
||||
fs: Arc::new(RwLock::new(inner_clone)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the sum of the size of the different files
|
||||
/// in the [`RamDirectory`].
|
||||
pub fn total_mem_usage(&self) -> usize {
|
||||
@@ -270,23 +256,4 @@ mod tests {
|
||||
assert_eq!(directory_copy.atomic_read(path_atomic).unwrap(), msg_atomic);
|
||||
assert_eq!(directory_copy.atomic_read(path_seq).unwrap(), msg_seq);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_ram_directory_deep_clone() {
|
||||
let dir = RamDirectory::default();
|
||||
let test = Path::new("test");
|
||||
let test2 = Path::new("test2");
|
||||
dir.atomic_write(test, b"firstwrite").unwrap();
|
||||
let dir_clone = dir.deep_clone();
|
||||
assert_eq!(
|
||||
dir_clone.atomic_read(test).unwrap(),
|
||||
dir.atomic_read(test).unwrap()
|
||||
);
|
||||
dir.atomic_write(test, b"original").unwrap();
|
||||
dir_clone.atomic_write(test, b"clone").unwrap();
|
||||
dir_clone.atomic_write(test2, b"clone2").unwrap();
|
||||
assert_eq!(dir.atomic_read(test).unwrap(), b"original");
|
||||
assert_eq!(&dir_clone.atomic_read(test).unwrap(), b"clone");
|
||||
assert_eq!(&dir_clone.atomic_read(test2).unwrap(), b"clone2");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -41,6 +41,7 @@ mod error;
|
||||
mod facet_reader;
|
||||
mod multivalued;
|
||||
mod readers;
|
||||
mod remapped_column;
|
||||
mod serializer;
|
||||
mod writer;
|
||||
|
||||
@@ -424,7 +425,7 @@ mod tests {
|
||||
permutation
|
||||
}
|
||||
|
||||
fn test_intfastfield_permutation_with_data(permutation: Vec<u64>) -> crate::Result<()> {
|
||||
fn test_intfastfield_permutation_with_data(permutation: &[u64]) -> crate::Result<()> {
|
||||
let path = Path::new("test");
|
||||
let n = permutation.len();
|
||||
let directory = RamDirectory::create();
|
||||
@@ -432,7 +433,7 @@ mod tests {
|
||||
let write: WritePtr = directory.open_write(Path::new("test"))?;
|
||||
let mut serializer = CompositeFastFieldSerializer::from_write(write)?;
|
||||
let mut fast_field_writers = FastFieldsWriter::from_schema(&SCHEMA);
|
||||
for &x in &permutation {
|
||||
for &x in permutation {
|
||||
fast_field_writers.add_document(&doc!(*FIELD=>x));
|
||||
}
|
||||
fast_field_writers.serialize(&mut serializer, &HashMap::new(), None)?;
|
||||
@@ -446,7 +447,6 @@ mod tests {
|
||||
.unwrap()
|
||||
.read_bytes()?;
|
||||
let fast_field_reader = open::<u64>(data)?;
|
||||
|
||||
for a in 0..n {
|
||||
assert_eq!(fast_field_reader.get_val(a as u64), permutation[a as usize]);
|
||||
}
|
||||
@@ -455,16 +455,23 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_intfastfield_permutation_gcd() -> crate::Result<()> {
|
||||
let permutation = generate_permutation_gcd();
|
||||
test_intfastfield_permutation_with_data(permutation)?;
|
||||
fn test_intfastfield_simple() -> crate::Result<()> {
|
||||
let permutation = &[1, 2, 3];
|
||||
test_intfastfield_permutation_with_data(&permutation[..])?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_intfastfield_permutation() -> crate::Result<()> {
|
||||
let permutation = generate_permutation();
|
||||
test_intfastfield_permutation_with_data(permutation)?;
|
||||
test_intfastfield_permutation_with_data(&permutation)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_intfastfield_permutation_gcd() -> crate::Result<()> {
|
||||
let permutation = generate_permutation_gcd();
|
||||
test_intfastfield_permutation_with_data(&permutation)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
mod multivalue_start_index;
|
||||
mod reader;
|
||||
mod writer;
|
||||
|
||||
pub(crate) use self::multivalue_start_index::MultivalueStartIndex;
|
||||
pub use self::reader::MultiValuedFastFieldReader;
|
||||
pub use self::writer::MultiValuedFastFieldWriter;
|
||||
pub(crate) use self::writer::MultivalueStartIndex;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
@@ -402,74 +403,6 @@ mod bench {
|
||||
use crate::schema::{Cardinality, NumericOptions, Schema};
|
||||
use crate::Document;
|
||||
|
||||
fn bench_multi_value_ff_merge_opt(
|
||||
num_docs: usize,
|
||||
segments_every_n_docs: usize,
|
||||
merge_policy: impl crate::indexer::MergePolicy + 'static,
|
||||
) {
|
||||
let mut builder = crate::schema::SchemaBuilder::new();
|
||||
|
||||
let fast_multi =
|
||||
crate::schema::NumericOptions::default().set_fast(Cardinality::MultiValues);
|
||||
let multi_field = builder.add_f64_field("f64s", fast_multi);
|
||||
|
||||
let index = crate::Index::create_in_ram(builder.build());
|
||||
|
||||
let mut writer = index.writer_for_tests().unwrap();
|
||||
writer.set_merge_policy(Box::new(merge_policy));
|
||||
|
||||
for i in 0..num_docs {
|
||||
let mut doc = crate::Document::new();
|
||||
doc.add_f64(multi_field, 0.24);
|
||||
doc.add_f64(multi_field, 0.27);
|
||||
doc.add_f64(multi_field, 0.37);
|
||||
if i % 3 == 0 {
|
||||
doc.add_f64(multi_field, 0.44);
|
||||
}
|
||||
|
||||
writer.add_document(doc).unwrap();
|
||||
if i % segments_every_n_docs == 0 {
|
||||
writer.commit().unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
writer.wait_merging_threads().unwrap();
|
||||
let mut writer = index.writer_for_tests().unwrap();
|
||||
let segment_ids = index.searchable_segment_ids().unwrap();
|
||||
writer.merge(&segment_ids).wait().unwrap();
|
||||
}
|
||||
|
||||
// If a merging thread fails, we should end up with more
|
||||
// than one segment here
|
||||
assert_eq!(1, index.searchable_segments().unwrap().len());
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_multi_value_ff_merge_many_segments(b: &mut Bencher) {
|
||||
let num_docs = 100_000;
|
||||
b.iter(|| {
|
||||
bench_multi_value_ff_merge_opt(num_docs, 1_000, crate::indexer::NoMergePolicy);
|
||||
});
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_multi_value_ff_merge_many_segments_log_merge(b: &mut Bencher) {
|
||||
let num_docs = 100_000;
|
||||
b.iter(|| {
|
||||
let merge_policy = crate::indexer::LogMergePolicy::default();
|
||||
bench_multi_value_ff_merge_opt(num_docs, 1_000, merge_policy);
|
||||
});
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_multi_value_ff_merge_few_segments(b: &mut Bencher) {
|
||||
let num_docs = 100_000;
|
||||
b.iter(|| {
|
||||
bench_multi_value_ff_merge_opt(num_docs, 33_000, crate::indexer::NoMergePolicy);
|
||||
});
|
||||
}
|
||||
|
||||
fn multi_values(num_docs: usize, vals_per_doc: usize) -> Vec<Vec<u64>> {
|
||||
let mut vals = vec![];
|
||||
for _i in 0..num_docs {
|
||||
|
||||
195
src/fastfield/multivalued/multivalue_start_index.rs
Normal file
195
src/fastfield/multivalued/multivalue_start_index.rs
Normal file
@@ -0,0 +1,195 @@
|
||||
use fastfield_codecs::{Column, ColumnReader};
|
||||
|
||||
use crate::indexer::doc_id_mapping::DocIdMapping;
|
||||
use crate::DocId;
|
||||
|
||||
pub(crate) struct MultivalueStartIndex<'a, C: Column> {
|
||||
column: &'a C,
|
||||
doc_id_map: &'a DocIdMapping,
|
||||
min_value: u64,
|
||||
max_value: u64,
|
||||
}
|
||||
|
||||
struct MultivalueStartIndexReader<'a, C: Column> {
|
||||
column: &'a C,
|
||||
doc_id_map: &'a DocIdMapping,
|
||||
idx: u64,
|
||||
val: u64,
|
||||
len: u64,
|
||||
}
|
||||
|
||||
impl<'a, C: Column> MultivalueStartIndexReader<'a, C> {
|
||||
fn new(column: &'a C, doc_id_map: &'a DocIdMapping) -> Self {
|
||||
Self {
|
||||
column,
|
||||
doc_id_map,
|
||||
idx: u64::MAX,
|
||||
val: 0,
|
||||
len: doc_id_map.num_new_doc_ids() as u64 + 1,
|
||||
}
|
||||
}
|
||||
|
||||
fn reset(&mut self) {
|
||||
self.idx = u64::MAX;
|
||||
self.val = 0;
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, C: Column> ColumnReader for MultivalueStartIndexReader<'a, C> {
|
||||
fn seek(&mut self, idx: u64) -> u64 {
|
||||
if self.idx > idx {
|
||||
self.reset();
|
||||
self.advance();
|
||||
}
|
||||
for _ in self.idx..idx {
|
||||
self.advance();
|
||||
}
|
||||
self.get()
|
||||
}
|
||||
|
||||
fn advance(&mut self) -> bool {
|
||||
if self.idx == u64::MAX {
|
||||
self.idx = 0;
|
||||
self.val = 0;
|
||||
return true;
|
||||
}
|
||||
let new_doc_id: DocId = self.idx as DocId;
|
||||
self.idx += 1;
|
||||
if self.idx >= self.len {
|
||||
self.idx = self.len;
|
||||
return false;
|
||||
}
|
||||
let old_doc: DocId = self.doc_id_map.get_old_doc_id(new_doc_id);
|
||||
let num_vals_for_doc =
|
||||
self.column.get_val(old_doc as u64 + 1) - self.column.get_val(old_doc as u64);
|
||||
self.val += num_vals_for_doc;
|
||||
true
|
||||
}
|
||||
|
||||
fn get(&self) -> u64 {
|
||||
self.val
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, C: Column> MultivalueStartIndex<'a, C> {
|
||||
pub fn new(column: &'a C, doc_id_map: &'a DocIdMapping) -> Self {
|
||||
assert_eq!(column.num_vals(), doc_id_map.num_old_doc_ids() as u64 + 1);
|
||||
let iter = MultivalueStartIndexIter::new(column, doc_id_map);
|
||||
let (min_value, max_value) = tantivy_bitpacker::minmax(iter).unwrap_or((0, 0));
|
||||
MultivalueStartIndex {
|
||||
column,
|
||||
doc_id_map,
|
||||
min_value,
|
||||
max_value,
|
||||
}
|
||||
}
|
||||
|
||||
fn specialized_reader(&self) -> MultivalueStartIndexReader<'a, C> {
|
||||
MultivalueStartIndexReader::new(self.column, self.doc_id_map)
|
||||
}
|
||||
}
|
||||
impl<'a, C: Column> Column for MultivalueStartIndex<'a, C> {
|
||||
fn reader(&self) -> Box<dyn ColumnReader + '_> {
|
||||
Box::new(self.specialized_reader())
|
||||
}
|
||||
|
||||
fn get_val(&self, idx: u64) -> u64 {
|
||||
let mut reader = self.specialized_reader();
|
||||
reader.seek(idx)
|
||||
}
|
||||
|
||||
fn min_value(&self) -> u64 {
|
||||
self.min_value
|
||||
}
|
||||
|
||||
fn max_value(&self) -> u64 {
|
||||
self.max_value
|
||||
}
|
||||
|
||||
fn num_vals(&self) -> u64 {
|
||||
(self.doc_id_map.num_new_doc_ids() + 1) as u64
|
||||
}
|
||||
}
|
||||
|
||||
struct MultivalueStartIndexIter<'a, C: Column> {
|
||||
column: &'a C,
|
||||
doc_id_map: &'a DocIdMapping,
|
||||
new_doc_id: usize,
|
||||
offset: u64,
|
||||
}
|
||||
|
||||
impl<'a, C: Column> MultivalueStartIndexIter<'a, C> {
|
||||
fn new(column: &'a C, doc_id_map: &'a DocIdMapping) -> Self {
|
||||
Self {
|
||||
column,
|
||||
doc_id_map,
|
||||
new_doc_id: 0,
|
||||
offset: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, C: Column> Iterator for MultivalueStartIndexIter<'a, C> {
|
||||
type Item = u64;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
if self.new_doc_id > self.doc_id_map.num_new_doc_ids() {
|
||||
return None;
|
||||
}
|
||||
let new_doc_id = self.new_doc_id;
|
||||
self.new_doc_id += 1;
|
||||
let start_offset = self.offset;
|
||||
if new_doc_id < self.doc_id_map.num_new_doc_ids() {
|
||||
let old_doc = self.doc_id_map.get_old_doc_id(new_doc_id as u32) as u64;
|
||||
let num_vals_for_doc = self.column.get_val(old_doc + 1) - self.column.get_val(old_doc);
|
||||
self.offset += num_vals_for_doc;
|
||||
}
|
||||
Some(start_offset)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use fastfield_codecs::VecColumn;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_multivalue_start_index() {
|
||||
let doc_id_mapping = DocIdMapping::from_new_id_to_old_id(vec![4, 1, 2]);
|
||||
assert_eq!(doc_id_mapping.num_old_doc_ids(), 5);
|
||||
let col = VecColumn::from(&[0u64, 3, 5, 10, 12, 16][..]);
|
||||
let multivalue_start_index = MultivalueStartIndex::new(
|
||||
&col, // 3, 2, 5, 2, 4
|
||||
&doc_id_mapping,
|
||||
);
|
||||
assert_eq!(multivalue_start_index.num_vals(), 4);
|
||||
assert_eq!(
|
||||
fastfield_codecs::iter_from_reader(multivalue_start_index.reader())
|
||||
.collect::<Vec<u64>>(),
|
||||
vec![0, 4, 6, 11]
|
||||
); // 4, 2, 5
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_multivalue_get_vals() {
|
||||
let doc_id_mapping =
|
||||
DocIdMapping::from_new_id_to_old_id(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
|
||||
assert_eq!(doc_id_mapping.num_old_doc_ids(), 10);
|
||||
let col = VecColumn::from(&[0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55][..]);
|
||||
let multivalue_start_index = MultivalueStartIndex::new(&col, &doc_id_mapping);
|
||||
assert_eq!(
|
||||
fastfield_codecs::iter_from_reader(multivalue_start_index.reader())
|
||||
.collect::<Vec<u64>>(),
|
||||
vec![0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55]
|
||||
);
|
||||
assert_eq!(multivalue_start_index.num_vals(), 11);
|
||||
let mut multivalue_start_index_reader = multivalue_start_index.reader();
|
||||
assert_eq!(multivalue_start_index_reader.seek(3), 2);
|
||||
assert_eq!(multivalue_start_index_reader.seek(5), 5);
|
||||
assert_eq!(multivalue_start_index_reader.seek(8), 21);
|
||||
assert_eq!(multivalue_start_index_reader.seek(4), 3);
|
||||
assert_eq!(multivalue_start_index_reader.seek(0), 0);
|
||||
assert_eq!(multivalue_start_index_reader.seek(10), 55);
|
||||
}
|
||||
}
|
||||
@@ -1,10 +1,11 @@
|
||||
use std::io;
|
||||
use std::sync::Mutex;
|
||||
|
||||
use fastfield_codecs::{Column, MonotonicallyMappableToU64, VecColumn};
|
||||
use fastfield_codecs::{MonotonicallyMappableToU64, VecColumn};
|
||||
use fnv::FnvHashMap;
|
||||
|
||||
use crate::fastfield::{value_to_u64, CompositeFastFieldSerializer, FastFieldType};
|
||||
use crate::fastfield::{
|
||||
value_to_u64, CompositeFastFieldSerializer, FastFieldType, MultivalueStartIndex,
|
||||
};
|
||||
use crate::indexer::doc_id_mapping::DocIdMapping;
|
||||
use crate::postings::UnorderedTermId;
|
||||
use crate::schema::{Document, Field, Value};
|
||||
@@ -200,155 +201,3 @@ impl MultiValuedFastFieldWriter {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct MultivalueStartIndex<'a, C: Column> {
|
||||
column: &'a C,
|
||||
doc_id_map: &'a DocIdMapping,
|
||||
min_max_opt: Mutex<Option<(u64, u64)>>,
|
||||
random_seeker: Mutex<MultivalueStartIndexRandomSeeker<'a, C>>,
|
||||
}
|
||||
|
||||
struct MultivalueStartIndexRandomSeeker<'a, C: Column> {
|
||||
seek_head: MultivalueStartIndexIter<'a, C>,
|
||||
seek_next_id: u64,
|
||||
}
|
||||
impl<'a, C: Column> MultivalueStartIndexRandomSeeker<'a, C> {
|
||||
fn new(column: &'a C, doc_id_map: &'a DocIdMapping) -> Self {
|
||||
Self {
|
||||
seek_head: MultivalueStartIndexIter {
|
||||
column,
|
||||
doc_id_map,
|
||||
new_doc_id: 0,
|
||||
offset: 0u64,
|
||||
},
|
||||
seek_next_id: 0u64,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, C: Column> MultivalueStartIndex<'a, C> {
|
||||
pub fn new(column: &'a C, doc_id_map: &'a DocIdMapping) -> Self {
|
||||
assert_eq!(column.num_vals(), doc_id_map.num_old_doc_ids() as u64 + 1);
|
||||
MultivalueStartIndex {
|
||||
column,
|
||||
doc_id_map,
|
||||
min_max_opt: Mutex::default(),
|
||||
random_seeker: Mutex::new(MultivalueStartIndexRandomSeeker::new(column, doc_id_map)),
|
||||
}
|
||||
}
|
||||
|
||||
fn minmax(&self) -> (u64, u64) {
|
||||
if let Some((min, max)) = *self.min_max_opt.lock().unwrap() {
|
||||
return (min, max);
|
||||
}
|
||||
let (min, max) = tantivy_bitpacker::minmax(self.iter()).unwrap_or((0u64, 0u64));
|
||||
*self.min_max_opt.lock().unwrap() = Some((min, max));
|
||||
(min, max)
|
||||
}
|
||||
}
|
||||
impl<'a, C: Column> Column for MultivalueStartIndex<'a, C> {
|
||||
fn get_val(&self, idx: u64) -> u64 {
|
||||
let mut random_seeker_lock = self.random_seeker.lock().unwrap();
|
||||
if random_seeker_lock.seek_next_id > idx {
|
||||
*random_seeker_lock =
|
||||
MultivalueStartIndexRandomSeeker::new(self.column, self.doc_id_map);
|
||||
}
|
||||
let to_skip = idx - random_seeker_lock.seek_next_id;
|
||||
random_seeker_lock.seek_next_id = idx + 1;
|
||||
random_seeker_lock.seek_head.nth(to_skip as usize).unwrap()
|
||||
}
|
||||
|
||||
fn min_value(&self) -> u64 {
|
||||
self.minmax().0
|
||||
}
|
||||
|
||||
fn max_value(&self) -> u64 {
|
||||
self.minmax().1
|
||||
}
|
||||
|
||||
fn num_vals(&self) -> u64 {
|
||||
(self.doc_id_map.num_new_doc_ids() + 1) as u64
|
||||
}
|
||||
|
||||
fn iter<'b>(&'b self) -> Box<dyn Iterator<Item = u64> + 'b> {
|
||||
Box::new(MultivalueStartIndexIter::new(self.column, self.doc_id_map))
|
||||
}
|
||||
}
|
||||
|
||||
struct MultivalueStartIndexIter<'a, C: Column> {
|
||||
pub column: &'a C,
|
||||
pub doc_id_map: &'a DocIdMapping,
|
||||
pub new_doc_id: usize,
|
||||
pub offset: u64,
|
||||
}
|
||||
|
||||
impl<'a, C: Column> MultivalueStartIndexIter<'a, C> {
|
||||
fn new(column: &'a C, doc_id_map: &'a DocIdMapping) -> Self {
|
||||
Self {
|
||||
column,
|
||||
doc_id_map,
|
||||
new_doc_id: 0,
|
||||
offset: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, C: Column> Iterator for MultivalueStartIndexIter<'a, C> {
|
||||
type Item = u64;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
if self.new_doc_id > self.doc_id_map.num_new_doc_ids() {
|
||||
return None;
|
||||
}
|
||||
let new_doc_id = self.new_doc_id;
|
||||
self.new_doc_id += 1;
|
||||
let start_offset = self.offset;
|
||||
if new_doc_id < self.doc_id_map.num_new_doc_ids() {
|
||||
let old_doc = self.doc_id_map.get_old_doc_id(new_doc_id as u32) as u64;
|
||||
let num_vals_for_doc = self.column.get_val(old_doc + 1) - self.column.get_val(old_doc);
|
||||
self.offset += num_vals_for_doc;
|
||||
}
|
||||
Some(start_offset)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_multivalue_start_index() {
|
||||
let doc_id_mapping = DocIdMapping::from_new_id_to_old_id(vec![4, 1, 2]);
|
||||
assert_eq!(doc_id_mapping.num_old_doc_ids(), 5);
|
||||
let col = VecColumn::from(&[0u64, 3, 5, 10, 12, 16][..]);
|
||||
let multivalue_start_index = MultivalueStartIndex::new(
|
||||
&col, // 3, 2, 5, 2, 4
|
||||
&doc_id_mapping,
|
||||
);
|
||||
assert_eq!(multivalue_start_index.num_vals(), 4);
|
||||
assert_eq!(
|
||||
multivalue_start_index.iter().collect::<Vec<u64>>(),
|
||||
vec![0, 4, 6, 11]
|
||||
); // 4, 2, 5
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_multivalue_get_vals() {
|
||||
let doc_id_mapping =
|
||||
DocIdMapping::from_new_id_to_old_id(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
|
||||
assert_eq!(doc_id_mapping.num_old_doc_ids(), 10);
|
||||
let col = VecColumn::from(&[0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55][..]);
|
||||
let multivalue_start_index = MultivalueStartIndex::new(&col, &doc_id_mapping);
|
||||
assert_eq!(
|
||||
multivalue_start_index.iter().collect::<Vec<u64>>(),
|
||||
vec![0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55]
|
||||
);
|
||||
assert_eq!(multivalue_start_index.num_vals(), 11);
|
||||
assert_eq!(multivalue_start_index.get_val(3), 2);
|
||||
assert_eq!(multivalue_start_index.get_val(5), 5);
|
||||
assert_eq!(multivalue_start_index.get_val(8), 21);
|
||||
assert_eq!(multivalue_start_index.get_val(4), 3);
|
||||
assert_eq!(multivalue_start_index.get_val(0), 0);
|
||||
assert_eq!(multivalue_start_index.get_val(10), 55);
|
||||
}
|
||||
}
|
||||
|
||||
112
src/fastfield/remapped_column.rs
Normal file
112
src/fastfield/remapped_column.rs
Normal file
@@ -0,0 +1,112 @@
|
||||
use fastfield_codecs::{Column, ColumnReader};
|
||||
use tantivy_bitpacker::BlockedBitpacker;
|
||||
|
||||
use crate::indexer::doc_id_mapping::DocIdMapping;
|
||||
use crate::DocId;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct WriterFastFieldColumn<'map, 'bitp> {
|
||||
pub(crate) doc_id_mapping_opt: Option<&'map DocIdMapping>,
|
||||
pub(crate) vals: &'bitp BlockedBitpacker,
|
||||
pub(crate) min_value: u64,
|
||||
pub(crate) max_value: u64,
|
||||
pub(crate) num_vals: u64,
|
||||
}
|
||||
|
||||
impl<'map, 'bitp> Column for WriterFastFieldColumn<'map, 'bitp> {
|
||||
/// Return the value associated to the given doc.
|
||||
///
|
||||
/// Whenever possible use the Iterator passed to the fastfield creation instead, for performance
|
||||
/// reasons.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// May panic if `doc` is greater than the index.
|
||||
fn get_val(&self, doc: u64) -> u64 {
|
||||
if let Some(doc_id_map) = self.doc_id_mapping_opt {
|
||||
self.vals
|
||||
.get(doc_id_map.get_old_doc_id(doc as u32) as usize) // consider extra
|
||||
// FastFieldReader wrapper for
|
||||
// non doc_id_map
|
||||
} else {
|
||||
self.vals.get(doc as usize)
|
||||
}
|
||||
}
|
||||
|
||||
fn reader(&self) -> Box<dyn ColumnReader + '_> {
|
||||
if let Some(doc_id_mapping) = self.doc_id_mapping_opt {
|
||||
Box::new(RemappedColumnReader {
|
||||
doc_id_mapping,
|
||||
vals: self.vals,
|
||||
idx: u64::MAX,
|
||||
len: doc_id_mapping.num_new_doc_ids() as u64,
|
||||
})
|
||||
} else {
|
||||
Box::new(BitpackedColumnReader {
|
||||
vals: self.vals,
|
||||
idx: u64::MAX,
|
||||
len: self.num_vals,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn min_value(&self) -> u64 {
|
||||
self.min_value
|
||||
}
|
||||
|
||||
fn max_value(&self) -> u64 {
|
||||
self.max_value
|
||||
}
|
||||
|
||||
fn num_vals(&self) -> u64 {
|
||||
self.num_vals
|
||||
}
|
||||
}
|
||||
|
||||
struct RemappedColumnReader<'a> {
|
||||
doc_id_mapping: &'a DocIdMapping,
|
||||
vals: &'a BlockedBitpacker,
|
||||
idx: u64,
|
||||
len: u64,
|
||||
}
|
||||
|
||||
impl<'a> ColumnReader for RemappedColumnReader<'a> {
|
||||
fn seek(&mut self, target_idx: u64) -> u64 {
|
||||
assert!(target_idx < self.len);
|
||||
self.idx = target_idx;
|
||||
self.get()
|
||||
}
|
||||
|
||||
fn advance(&mut self) -> bool {
|
||||
self.idx = self.idx.wrapping_add(1);
|
||||
self.idx < self.len
|
||||
}
|
||||
|
||||
fn get(&self) -> u64 {
|
||||
let old_doc_id: DocId = self.doc_id_mapping.get_old_doc_id(self.idx as DocId);
|
||||
self.vals.get(old_doc_id as usize)
|
||||
}
|
||||
}
|
||||
|
||||
struct BitpackedColumnReader<'a> {
|
||||
vals: &'a BlockedBitpacker,
|
||||
idx: u64,
|
||||
len: u64,
|
||||
}
|
||||
|
||||
impl<'a> ColumnReader for BitpackedColumnReader<'a> {
|
||||
fn seek(&mut self, target_idx: u64) -> u64 {
|
||||
assert!(target_idx < self.len);
|
||||
self.idx = target_idx;
|
||||
self.get()
|
||||
}
|
||||
|
||||
fn advance(&mut self) -> bool {
|
||||
self.idx = self.idx.wrapping_add(1);
|
||||
self.idx < self.len
|
||||
}
|
||||
|
||||
fn get(&self) -> u64 {
|
||||
self.vals.get(self.idx as usize)
|
||||
}
|
||||
}
|
||||
@@ -2,12 +2,13 @@ use std::collections::HashMap;
|
||||
use std::io;
|
||||
|
||||
use common;
|
||||
use fastfield_codecs::{Column, MonotonicallyMappableToU64};
|
||||
use fastfield_codecs::MonotonicallyMappableToU64;
|
||||
use fnv::FnvHashMap;
|
||||
use tantivy_bitpacker::BlockedBitpacker;
|
||||
|
||||
use super::multivalued::MultiValuedFastFieldWriter;
|
||||
use super::FastFieldType;
|
||||
use crate::fastfield::remapped_column::WriterFastFieldColumn;
|
||||
use crate::fastfield::{BytesFastFieldWriter, CompositeFastFieldSerializer};
|
||||
use crate::indexer::doc_id_mapping::DocIdMapping;
|
||||
use crate::postings::UnorderedTermId;
|
||||
@@ -351,7 +352,7 @@ impl IntFastFieldWriter {
|
||||
pub fn serialize(
|
||||
&self,
|
||||
serializer: &mut CompositeFastFieldSerializer,
|
||||
doc_id_map: Option<&DocIdMapping>,
|
||||
doc_id_mapping_opt: Option<&DocIdMapping>,
|
||||
) -> io::Result<()> {
|
||||
let (min, max) = if self.val_min > self.val_max {
|
||||
(0, 0)
|
||||
@@ -359,8 +360,8 @@ impl IntFastFieldWriter {
|
||||
(self.val_min, self.val_max)
|
||||
};
|
||||
|
||||
let fastfield_accessor = WriterFastFieldAccessProvider {
|
||||
doc_id_map,
|
||||
let fastfield_accessor = WriterFastFieldColumn {
|
||||
doc_id_mapping_opt,
|
||||
vals: &self.vals,
|
||||
min_value: min,
|
||||
max_value: max,
|
||||
@@ -372,57 +373,3 @@ impl IntFastFieldWriter {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct WriterFastFieldAccessProvider<'map, 'bitp> {
|
||||
doc_id_map: Option<&'map DocIdMapping>,
|
||||
vals: &'bitp BlockedBitpacker,
|
||||
min_value: u64,
|
||||
max_value: u64,
|
||||
num_vals: u64,
|
||||
}
|
||||
|
||||
impl<'map, 'bitp> Column for WriterFastFieldAccessProvider<'map, 'bitp> {
|
||||
/// Return the value associated to the given doc.
|
||||
///
|
||||
/// Whenever possible use the Iterator passed to the fastfield creation instead, for performance
|
||||
/// reasons.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// May panic if `doc` is greater than the index.
|
||||
fn get_val(&self, doc: u64) -> u64 {
|
||||
if let Some(doc_id_map) = self.doc_id_map {
|
||||
self.vals
|
||||
.get(doc_id_map.get_old_doc_id(doc as u32) as usize) // consider extra
|
||||
// FastFieldReader wrapper for
|
||||
// non doc_id_map
|
||||
} else {
|
||||
self.vals.get(doc as usize)
|
||||
}
|
||||
}
|
||||
|
||||
fn iter(&self) -> Box<dyn Iterator<Item = u64> + '_> {
|
||||
if let Some(doc_id_map) = self.doc_id_map {
|
||||
Box::new(
|
||||
doc_id_map
|
||||
.iter_old_doc_ids()
|
||||
.map(|doc_id| self.vals.get(doc_id as usize)),
|
||||
)
|
||||
} else {
|
||||
Box::new(self.vals.iter())
|
||||
}
|
||||
}
|
||||
|
||||
fn min_value(&self) -> u64 {
|
||||
self.min_value
|
||||
}
|
||||
|
||||
fn max_value(&self) -> u64 {
|
||||
self.max_value
|
||||
}
|
||||
|
||||
fn num_vals(&self) -> u64 {
|
||||
self.num_vals
|
||||
}
|
||||
}
|
||||
|
||||
@@ -246,27 +246,18 @@ impl DeleteCursor {
|
||||
mod tests {
|
||||
|
||||
use super::{DeleteOperation, DeleteQueue};
|
||||
use crate::query::{Explanation, Scorer, Weight};
|
||||
use crate::{DocId, Score, SegmentReader};
|
||||
|
||||
struct DummyWeight;
|
||||
impl Weight for DummyWeight {
|
||||
fn scorer(&self, _reader: &SegmentReader, _boost: Score) -> crate::Result<Box<dyn Scorer>> {
|
||||
Err(crate::TantivyError::InternalError("dummy impl".to_owned()))
|
||||
}
|
||||
|
||||
fn explain(&self, _reader: &SegmentReader, _doc: DocId) -> crate::Result<Explanation> {
|
||||
Err(crate::TantivyError::InternalError("dummy impl".to_owned()))
|
||||
}
|
||||
}
|
||||
use crate::schema::{Field, Term};
|
||||
|
||||
#[test]
|
||||
fn test_deletequeue() {
|
||||
let delete_queue = DeleteQueue::new();
|
||||
|
||||
let make_op = |i: usize| DeleteOperation {
|
||||
opstamp: i as u64,
|
||||
target: Box::new(DummyWeight),
|
||||
let make_op = |i: usize| {
|
||||
let field = Field::from_field_id(1u32);
|
||||
DeleteOperation {
|
||||
opstamp: i as u64,
|
||||
term: Term::from_field_u64(field, i as u64),
|
||||
}
|
||||
};
|
||||
|
||||
delete_queue.push(make_op(1));
|
||||
|
||||
@@ -11,6 +11,7 @@ use super::segment_updater::SegmentUpdater;
|
||||
use super::{AddBatch, AddBatchReceiver, AddBatchSender, PreparedCommit};
|
||||
use crate::core::{Index, Segment, SegmentComponent, SegmentId, SegmentMeta, SegmentReader};
|
||||
use crate::directory::{DirectoryLock, GarbageCollectionResult, TerminatingWrite};
|
||||
use crate::docset::{DocSet, TERMINATED};
|
||||
use crate::error::TantivyError;
|
||||
use crate::fastfield::write_alive_bitset;
|
||||
use crate::indexer::delete_queue::{DeleteCursor, DeleteQueue};
|
||||
@@ -19,9 +20,8 @@ use crate::indexer::index_writer_status::IndexWriterStatus;
|
||||
use crate::indexer::operation::DeleteOperation;
|
||||
use crate::indexer::stamper::Stamper;
|
||||
use crate::indexer::{MergePolicy, SegmentEntry, SegmentWriter};
|
||||
use crate::query::{Query, TermQuery};
|
||||
use crate::schema::{Document, IndexRecordOption, Term};
|
||||
use crate::{FutureResult, IndexReader, Opstamp};
|
||||
use crate::{FutureResult, Opstamp};
|
||||
|
||||
// Size of the margin for the `memory_arena`. A segment is closed when the remaining memory
|
||||
// in the `memory_arena` goes below MARGIN_IN_BYTES.
|
||||
@@ -57,7 +57,6 @@ pub struct IndexWriter {
|
||||
_directory_lock: Option<DirectoryLock>,
|
||||
|
||||
index: Index,
|
||||
index_reader: IndexReader,
|
||||
|
||||
memory_arena_in_bytes_per_thread: usize,
|
||||
|
||||
@@ -93,14 +92,19 @@ fn compute_deleted_bitset(
|
||||
|
||||
// A delete operation should only affect
|
||||
// document that were inserted before it.
|
||||
delete_op
|
||||
.target
|
||||
.for_each(segment_reader, &mut |doc_matching_delete_query, _| {
|
||||
if doc_opstamps.is_deleted(doc_matching_delete_query, delete_op.opstamp) {
|
||||
alive_bitset.remove(doc_matching_delete_query);
|
||||
let inverted_index = segment_reader.inverted_index(delete_op.term.field())?;
|
||||
if let Some(mut docset) =
|
||||
inverted_index.read_postings(&delete_op.term, IndexRecordOption::Basic)?
|
||||
{
|
||||
let mut doc_matching_deleted_term = docset.doc();
|
||||
while doc_matching_deleted_term != TERMINATED {
|
||||
if doc_opstamps.is_deleted(doc_matching_deleted_term, delete_op.opstamp) {
|
||||
alive_bitset.remove(doc_matching_deleted_term);
|
||||
might_have_changed = true;
|
||||
}
|
||||
})?;
|
||||
doc_matching_deleted_term = docset.advance();
|
||||
}
|
||||
}
|
||||
delete_cursor.advance();
|
||||
}
|
||||
Ok(might_have_changed)
|
||||
@@ -298,7 +302,6 @@ impl IndexWriter {
|
||||
|
||||
memory_arena_in_bytes_per_thread,
|
||||
index: index.clone(),
|
||||
index_reader: index.reader()?,
|
||||
|
||||
index_writer_status: IndexWriterStatus::from(document_receiver),
|
||||
operation_sender: document_sender,
|
||||
@@ -663,33 +666,10 @@ impl IndexWriter {
|
||||
/// Like adds, the deletion itself will be visible
|
||||
/// only after calling `commit()`.
|
||||
pub fn delete_term(&self, term: Term) -> Opstamp {
|
||||
let query = TermQuery::new(term, IndexRecordOption::Basic);
|
||||
// For backward compatibility, if Term is invalid for the index, do nothing but return an
|
||||
// Opstamp
|
||||
self.delete_query(Box::new(query))
|
||||
.unwrap_or_else(|_| self.stamper.stamp())
|
||||
}
|
||||
|
||||
/// Delete all documents matching a given query.
|
||||
/// Returns an `Err` if the query can't be executed.
|
||||
///
|
||||
/// Delete operation only affects documents that
|
||||
/// were added in previous commits, and documents
|
||||
/// that were added previously in the same commit.
|
||||
///
|
||||
/// Like adds, the deletion itself will be visible
|
||||
/// only after calling `commit()`.
|
||||
#[doc(hidden)]
|
||||
pub fn delete_query(&self, query: Box<dyn Query>) -> crate::Result<Opstamp> {
|
||||
let weight = query.weight(&self.index_reader.searcher(), false)?;
|
||||
|
||||
let opstamp = self.stamper.stamp();
|
||||
let delete_operation = DeleteOperation {
|
||||
opstamp,
|
||||
target: weight,
|
||||
};
|
||||
let delete_operation = DeleteOperation { opstamp, term };
|
||||
self.delete_queue.push(delete_operation);
|
||||
Ok(opstamp)
|
||||
opstamp
|
||||
}
|
||||
|
||||
/// Returns the opstamp of the last successful commit.
|
||||
@@ -758,17 +738,10 @@ impl IndexWriter {
|
||||
let (batch_opstamp, stamps) = self.get_batch_opstamps(count);
|
||||
|
||||
let mut adds = AddBatch::default();
|
||||
|
||||
for (user_op, opstamp) in user_operations_it.zip(stamps) {
|
||||
match user_op {
|
||||
UserOperation::Delete(term) => {
|
||||
let query = TermQuery::new(term, IndexRecordOption::Basic);
|
||||
let weight = query.weight(&self.index_reader.searcher(), false)?;
|
||||
|
||||
let delete_operation = DeleteOperation {
|
||||
opstamp,
|
||||
target: weight,
|
||||
};
|
||||
let delete_operation = DeleteOperation { opstamp, term };
|
||||
self.delete_queue.push(delete_operation);
|
||||
}
|
||||
UserOperation::Add(document) => {
|
||||
@@ -813,7 +786,7 @@ mod tests {
|
||||
use crate::directory::error::LockError;
|
||||
use crate::error::*;
|
||||
use crate::indexer::NoMergePolicy;
|
||||
use crate::query::{BooleanQuery, Occur, Query, QueryParser, TermQuery};
|
||||
use crate::query::{QueryParser, TermQuery};
|
||||
use crate::schema::{
|
||||
self, Cardinality, Facet, FacetOptions, IndexRecordOption, NumericOptions,
|
||||
TextFieldIndexing, TextOptions, FAST, INDEXED, STORED, STRING, TEXT,
|
||||
@@ -1445,72 +1418,10 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_delete_query_with_sort_by_field() -> crate::Result<()> {
|
||||
let mut schema_builder = schema::Schema::builder();
|
||||
let id_field =
|
||||
schema_builder.add_u64_field("id", schema::INDEXED | schema::STORED | schema::FAST);
|
||||
let schema = schema_builder.build();
|
||||
|
||||
let settings = IndexSettings {
|
||||
sort_by_field: Some(IndexSortByField {
|
||||
field: "id".to_string(),
|
||||
order: Order::Desc,
|
||||
}),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let index = Index::builder()
|
||||
.schema(schema)
|
||||
.settings(settings)
|
||||
.create_in_ram()?;
|
||||
let index_reader = index.reader()?;
|
||||
let mut index_writer = index.writer_for_tests()?;
|
||||
|
||||
// create and delete docs in same commit
|
||||
for id in 0u64..5u64 {
|
||||
index_writer.add_document(doc!(id_field => id))?;
|
||||
}
|
||||
for id in 1u64..4u64 {
|
||||
let term = Term::from_field_u64(id_field, id);
|
||||
let not_term = Term::from_field_u64(id_field, 2);
|
||||
let term = Box::new(TermQuery::new(term, Default::default()));
|
||||
let not_term = Box::new(TermQuery::new(not_term, Default::default()));
|
||||
|
||||
let query: BooleanQuery = vec![
|
||||
(Occur::Must, term as Box<dyn Query>),
|
||||
(Occur::MustNot, not_term as Box<dyn Query>),
|
||||
]
|
||||
.into();
|
||||
|
||||
index_writer.delete_query(Box::new(query))?;
|
||||
}
|
||||
for id in 5u64..10u64 {
|
||||
index_writer.add_document(doc!(id_field => id))?;
|
||||
}
|
||||
index_writer.commit()?;
|
||||
index_reader.reload()?;
|
||||
|
||||
let searcher = index_reader.searcher();
|
||||
assert_eq!(searcher.segment_readers().len(), 1);
|
||||
|
||||
let segment_reader = searcher.segment_reader(0);
|
||||
assert_eq!(segment_reader.num_docs(), 8);
|
||||
assert_eq!(segment_reader.max_doc(), 10);
|
||||
let fast_field_reader = segment_reader.fast_fields().u64(id_field)?;
|
||||
let in_order_alive_ids: Vec<u64> = segment_reader
|
||||
.doc_ids_alive()
|
||||
.map(|doc| fast_field_reader.get_val(doc as u64))
|
||||
.collect();
|
||||
assert_eq!(&in_order_alive_ids[..], &[9, 8, 7, 6, 5, 4, 2, 0]);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
enum IndexingOp {
|
||||
AddDoc { id: u64 },
|
||||
DeleteDoc { id: u64 },
|
||||
DeleteDocQuery { id: u64 },
|
||||
Commit,
|
||||
Merge,
|
||||
}
|
||||
@@ -1518,7 +1429,6 @@ mod tests {
|
||||
fn balanced_operation_strategy() -> impl Strategy<Value = IndexingOp> {
|
||||
prop_oneof![
|
||||
(0u64..20u64).prop_map(|id| IndexingOp::DeleteDoc { id }),
|
||||
(0u64..20u64).prop_map(|id| IndexingOp::DeleteDocQuery { id }),
|
||||
(0u64..20u64).prop_map(|id| IndexingOp::AddDoc { id }),
|
||||
(0u64..1u64).prop_map(|_| IndexingOp::Commit),
|
||||
(0u64..1u64).prop_map(|_| IndexingOp::Merge),
|
||||
@@ -1527,8 +1437,7 @@ mod tests {
|
||||
|
||||
fn adding_operation_strategy() -> impl Strategy<Value = IndexingOp> {
|
||||
prop_oneof![
|
||||
5 => (0u64..100u64).prop_map(|id| IndexingOp::DeleteDoc { id }),
|
||||
5 => (0u64..100u64).prop_map(|id| IndexingOp::DeleteDocQuery { id }),
|
||||
10 => (0u64..100u64).prop_map(|id| IndexingOp::DeleteDoc { id }),
|
||||
50 => (0u64..100u64).prop_map(|id| IndexingOp::AddDoc { id }),
|
||||
2 => (0u64..1u64).prop_map(|_| IndexingOp::Commit),
|
||||
1 => (0u64..1u64).prop_map(|_| IndexingOp::Merge),
|
||||
@@ -1548,10 +1457,6 @@ mod tests {
|
||||
existing_ids.remove(&id);
|
||||
deleted_ids.insert(id);
|
||||
}
|
||||
IndexingOp::DeleteDocQuery { id } => {
|
||||
existing_ids.remove(&id);
|
||||
deleted_ids.insert(id);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
@@ -1634,11 +1539,6 @@ mod tests {
|
||||
IndexingOp::DeleteDoc { id } => {
|
||||
index_writer.delete_term(Term::from_field_u64(id_field, id));
|
||||
}
|
||||
IndexingOp::DeleteDocQuery { id } => {
|
||||
let term = Term::from_field_u64(id_field, id);
|
||||
let query = TermQuery::new(term, Default::default());
|
||||
index_writer.delete_query(Box::new(query))?;
|
||||
}
|
||||
IndexingOp::Commit => {
|
||||
index_writer.commit()?;
|
||||
}
|
||||
|
||||
@@ -1,11 +1,20 @@
|
||||
use crate::query::Weight;
|
||||
use crate::schema::{Document, Term};
|
||||
use crate::Opstamp;
|
||||
|
||||
/// Timestamped Delete operation.
|
||||
#[derive(Clone, Eq, PartialEq, Debug)]
|
||||
pub struct DeleteOperation {
|
||||
pub opstamp: Opstamp,
|
||||
pub target: Box<dyn Weight>,
|
||||
pub term: Term,
|
||||
}
|
||||
|
||||
impl Default for DeleteOperation {
|
||||
fn default() -> Self {
|
||||
DeleteOperation {
|
||||
opstamp: 0u64,
|
||||
term: Term::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Timestamped Add operation.
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use fastfield_codecs::Column;
|
||||
use fastfield_codecs::{Column, ColumnReader};
|
||||
use itertools::Itertools;
|
||||
|
||||
use crate::indexer::doc_id_mapping::SegmentDocIdMapping;
|
||||
use crate::schema::Field;
|
||||
use crate::{DocAddress, SegmentReader};
|
||||
use crate::{DocAddress, DocId, SegmentReader};
|
||||
|
||||
pub(crate) struct SortedDocIdColumn<'a> {
|
||||
doc_id_mapping: &'a SegmentDocIdMapping,
|
||||
@@ -87,17 +87,14 @@ impl<'a> Column for SortedDocIdColumn<'a> {
|
||||
self.fast_field_readers[segment_ord as usize].get_val(doc_id as u64)
|
||||
}
|
||||
|
||||
fn iter(&self) -> Box<dyn Iterator<Item = u64> + '_> {
|
||||
Box::new(
|
||||
self.doc_id_mapping
|
||||
.iter_old_doc_addrs()
|
||||
.map(|old_doc_addr| {
|
||||
let fast_field_reader =
|
||||
&self.fast_field_readers[old_doc_addr.segment_ord as usize];
|
||||
fast_field_reader.get_val(old_doc_addr.doc_id as u64)
|
||||
}),
|
||||
)
|
||||
fn reader(&self) -> Box<dyn ColumnReader<u64> + '_> {
|
||||
Box::new(SortedDocIdColumnReader {
|
||||
doc_id_mapping: self.doc_id_mapping,
|
||||
fast_field_readers: &self.fast_field_readers[..],
|
||||
new_doc_id: u32::MAX,
|
||||
})
|
||||
}
|
||||
|
||||
fn min_value(&self) -> u64 {
|
||||
self.min_value
|
||||
}
|
||||
@@ -110,3 +107,27 @@ impl<'a> Column for SortedDocIdColumn<'a> {
|
||||
self.num_vals
|
||||
}
|
||||
}
|
||||
|
||||
struct SortedDocIdColumnReader<'a> {
|
||||
doc_id_mapping: &'a SegmentDocIdMapping,
|
||||
fast_field_readers: &'a [Arc<dyn Column>],
|
||||
new_doc_id: DocId,
|
||||
}
|
||||
|
||||
impl<'a> ColumnReader for SortedDocIdColumnReader<'a> {
|
||||
fn seek(&mut self, target_idx: u64) -> u64 {
|
||||
assert!(target_idx < self.doc_id_mapping.len() as u64);
|
||||
self.new_doc_id = target_idx as u32;
|
||||
self.get()
|
||||
}
|
||||
|
||||
fn advance(&mut self) -> bool {
|
||||
self.new_doc_id = self.new_doc_id.wrapping_add(1);
|
||||
self.new_doc_id < self.doc_id_mapping.len() as u32
|
||||
}
|
||||
|
||||
fn get(&self) -> u64 {
|
||||
let old_doc = self.doc_id_mapping.get_old_doc_addr(self.new_doc_id);
|
||||
self.fast_field_readers[old_doc.segment_ord as usize].get_val(old_doc.doc_id as u64)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use std::cmp;
|
||||
|
||||
use fastfield_codecs::Column;
|
||||
use fastfield_codecs::{Column, ColumnReader};
|
||||
|
||||
use crate::fastfield::{MultiValueLength, MultiValuedFastFieldReader};
|
||||
use crate::indexer::doc_id_mapping::SegmentDocIdMapping;
|
||||
@@ -95,18 +95,6 @@ impl<'a> Column for SortedDocIdMultiValueColumn<'a> {
|
||||
vals[pos_in_values as usize]
|
||||
}
|
||||
|
||||
fn iter(&self) -> Box<dyn Iterator<Item = u64> + '_> {
|
||||
Box::new(
|
||||
self.doc_id_mapping
|
||||
.iter_old_doc_addrs()
|
||||
.flat_map(|old_doc_addr| {
|
||||
let ff_reader = &self.fast_field_readers[old_doc_addr.segment_ord as usize];
|
||||
let mut vals = Vec::new();
|
||||
ff_reader.get_vals(old_doc_addr.doc_id, &mut vals);
|
||||
vals.into_iter()
|
||||
}),
|
||||
)
|
||||
}
|
||||
fn min_value(&self) -> u64 {
|
||||
self.min_value
|
||||
}
|
||||
@@ -118,4 +106,80 @@ impl<'a> Column for SortedDocIdMultiValueColumn<'a> {
|
||||
fn num_vals(&self) -> u64 {
|
||||
self.num_vals
|
||||
}
|
||||
|
||||
fn reader(&self) -> Box<dyn ColumnReader<u64> + '_> {
|
||||
let mut reader = SortedDocMultiValueColumnReader {
|
||||
doc_id_mapping: self.doc_id_mapping,
|
||||
fast_field_readers: &self.fast_field_readers[..],
|
||||
new_doc_id: u32::MAX,
|
||||
in_buffer_idx: 0,
|
||||
buffer: Vec::new(),
|
||||
idx: u64::MAX,
|
||||
};
|
||||
reader.reset();
|
||||
Box::new(reader)
|
||||
}
|
||||
}
|
||||
|
||||
struct SortedDocMultiValueColumnReader<'a> {
|
||||
doc_id_mapping: &'a SegmentDocIdMapping,
|
||||
fast_field_readers: &'a [MultiValuedFastFieldReader<u64>],
|
||||
|
||||
new_doc_id: DocId,
|
||||
in_buffer_idx: usize,
|
||||
buffer: Vec<u64>,
|
||||
idx: u64,
|
||||
}
|
||||
|
||||
impl<'a> SortedDocMultiValueColumnReader<'a> {
|
||||
fn fill(&mut self) {
|
||||
let old_doc = self.doc_id_mapping.get_old_doc_addr(self.new_doc_id);
|
||||
let ff_reader = &self.fast_field_readers[old_doc.segment_ord as usize];
|
||||
ff_reader.get_vals(old_doc.doc_id, &mut self.buffer);
|
||||
self.in_buffer_idx = 0;
|
||||
}
|
||||
|
||||
fn reset(&mut self) {
|
||||
self.buffer.clear();
|
||||
self.idx = u64::MAX;
|
||||
self.in_buffer_idx = 0;
|
||||
self.new_doc_id = u32::MAX;
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> ColumnReader for SortedDocMultiValueColumnReader<'a> {
|
||||
fn seek(&mut self, target_idx: u64) -> u64 {
|
||||
if target_idx < self.idx {
|
||||
self.reset();
|
||||
self.advance();
|
||||
}
|
||||
for _ in self.idx..target_idx {
|
||||
// TODO could be optimized.
|
||||
assert!(self.advance());
|
||||
}
|
||||
self.get()
|
||||
}
|
||||
|
||||
fn advance(&mut self) -> bool {
|
||||
loop {
|
||||
self.in_buffer_idx += 1;
|
||||
if self.in_buffer_idx < self.buffer.len() {
|
||||
self.idx = self.idx.wrapping_add(1);
|
||||
return true;
|
||||
}
|
||||
self.new_doc_id = self.new_doc_id.wrapping_add(1);
|
||||
if self.new_doc_id >= self.doc_id_mapping.len() as u32 {
|
||||
return false;
|
||||
}
|
||||
self.fill();
|
||||
if !self.buffer.is_empty() {
|
||||
self.idx = self.idx.wrapping_add(1);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn get(&self) -> u64 {
|
||||
self.buffer[self.in_buffer_idx]
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user