Compare commits

...

2 Commits

Author:  Paul Masurel
SHA1:    c69661f0f0
Message: Removing iterators.
Date:    2022-09-21 23:26:01 +09:00

Author:  Paul Masurel
SHA1:    85ebb3c420
Message: Introducing ColumnReader.
         Introducing a ColumnReader trait and .reader() to Column,
         hence removing the dreaded Mutex in the `MultiValueStartIndex`
         thingy.
Date:    2022-09-21 12:47:44 +09:00
16 changed files with 638 additions and 314 deletions
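The commits above replace the boxed `iter()` on `Column` with a forward-only `ColumnReader` cursor. The following sketch is not part of the diff; it only illustrates the usage pattern, assuming the `fastfield_codecs` exports exactly as shown in the hunks below (`Column`, `ColumnReader`, `VecColumn`, `iter_from_reader`):

use fastfield_codecs::{iter_from_reader, Column, ColumnReader, VecColumn};

fn sum_column(column: &impl Column<u64>) -> u64 {
    // Pull values through the new cursor: advance() moves forward, get() reads.
    let mut sum = 0u64;
    let mut reader = column.reader();
    while reader.advance() {
        sum += reader.get();
    }
    sum
}

fn main() {
    let vals = [3u64, 1, 4, 1, 5];
    let column = VecColumn::from(&vals[..]);
    assert_eq!(sum_column(&column), 14);
    // iter_from_reader bridges back to iterator-based call sites.
    let collected: Vec<u64> = iter_from_reader(column.reader()).collect();
    assert_eq!(collected, &vals[..]);
}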

View File

@@ -68,7 +68,9 @@ impl FastFieldCodec for BitpackedCodec {
assert_eq!(column.min_value(), 0u64);
let num_bits = compute_num_bits(column.max_value());
let mut bit_packer = BitPacker::new();
for val in column.iter() {
let mut reader = column.reader();
while reader.advance() {
let val = reader.get();
bit_packer.write(val, num_bits, write)?;
}
bit_packer.close(write)?;

View File

@@ -75,7 +75,9 @@ impl FastFieldCodec for BlockwiseLinearCodec {
if column.num_vals() < 10 * CHUNK_SIZE as u64 {
return None;
}
let mut first_chunk: Vec<u64> = column.iter().take(CHUNK_SIZE as usize).collect();
let mut first_chunk: Vec<u64> = crate::iter_from_reader(column.reader())
.take(CHUNK_SIZE as usize)
.collect();
let line = Line::train(&VecColumn::from(&first_chunk));
for (i, buffer_val) in first_chunk.iter_mut().enumerate() {
let interpolated_val = line.eval(i as u64);
@@ -109,7 +111,7 @@ impl FastFieldCodec for BlockwiseLinearCodec {
let num_blocks = compute_num_blocks(num_vals);
let mut blocks = Vec::with_capacity(num_blocks);
let mut vals = column.iter();
let mut vals = crate::iter_from_reader(column.reader());
let mut bit_packer = BitPacker::new();

View File

@@ -3,7 +3,13 @@ use std::ops::RangeInclusive;
use tantivy_bitpacker::minmax;
pub trait Column<T: PartialOrd = u64>: Send + Sync {
pub trait Column<T: PartialOrd + Copy + 'static = u64>: Send + Sync {
/// Return a `ColumnReader`.
fn reader(&self) -> Box<dyn ColumnReader<T> + '_> {
// Box::new(ColumnReaderAdapter { column: self, idx: 0, })
Box::new(ColumnReaderAdapter::from(self))
}
/// Return the value associated to the given idx.
///
/// This accessor should return as fast as possible.
@@ -11,6 +17,8 @@ pub trait Column<T: PartialOrd = u64>: Send + Sync {
/// # Panics
///
/// May panic if `idx` is greater than the column length.
///
/// TODO remove to force people to use `.reader()`.
fn get_val(&self, idx: u64) -> T;
/// Fills an output buffer with the fast field values
@@ -58,10 +66,70 @@ pub trait Column<T: PartialOrd = u64>: Send + Sync {
fn max_value(&self) -> T;
fn num_vals(&self) -> u64;
}
/// Returns an iterator over the data
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = T> + 'a> {
Box::new((0..self.num_vals()).map(|idx| self.get_val(idx)))
/// `ColumnReader` makes it possible to read forward through a column.
pub trait ColumnReader<T = u64> {
/// Advance the reader to `target_idx`.
///
/// After a successful call to `seek`,
/// `.get()` should return `column.get_val(target_idx)`.
fn seek(&mut self, target_idx: u64) -> T;
fn advance(&mut self) -> bool;
/// Get the current value without advancing the reader
fn get(&self) -> T;
}
pub fn iter_from_reader<'a, T: 'static>(
mut column_reader: Box<dyn ColumnReader<T> + 'a>,
) -> impl Iterator<Item = T> + 'a {
std::iter::from_fn(move || {
if !column_reader.advance() {
return None;
}
Some(column_reader.get())
})
}
pub(crate) struct ColumnReaderAdapter<'a, C: ?Sized, T> {
column: &'a C,
idx: u64,
len: u64,
_phantom: PhantomData<T>,
}
impl<'a, C: Column<T> + ?Sized, T: Copy + PartialOrd + 'static> From<&'a C>
for ColumnReaderAdapter<'a, C, T>
{
fn from(column: &'a C) -> Self {
ColumnReaderAdapter {
column,
idx: u64::MAX,
len: column.num_vals(),
_phantom: PhantomData,
}
}
}
impl<'a, T, C: ?Sized> ColumnReader<T> for ColumnReaderAdapter<'a, C, T>
where
C: Column<T>,
T: PartialOrd<T> + Copy + 'static,
{
fn seek(&mut self, idx: u64) -> T {
self.idx = idx;
self.get()
}
fn advance(&mut self) -> bool {
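// Note: `idx` starts at `u64::MAX`, so the first `advance()` wraps around to index 0.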
self.idx = self.idx.wrapping_add(1);
self.idx < self.len
}
fn get(&self) -> T {
self.column.get_val(self.idx)
}
}
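A short illustration (again, not part of the diff) of the contract documented on `ColumnReader` above, exercised through the default `reader()` adapter; it assumes the same `fastfield_codecs` exports:

use fastfield_codecs::{Column, ColumnReader, VecColumn};

fn main() {
    let vals = [10u64, 20, 30, 40];
    let column = VecColumn::from(&vals[..]);
    let mut reader = column.reader();
    // seek(i) positions the cursor on index i and returns column.get_val(i).
    assert_eq!(reader.seek(2), 30);
    assert_eq!(reader.get(), 30);
    // advance() steps to the next index and reports whether it is still in bounds.
    assert!(reader.advance());
    assert_eq!(reader.get(), 40);
    assert!(!reader.advance());
}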
@@ -71,7 +139,9 @@ pub struct VecColumn<'a, T = u64> {
max_value: T,
}
impl<'a, C: Column<T>, T: Copy + PartialOrd> Column<T> for &'a C {
impl<'a, C: Column<T>, T> Column<T> for &'a C
where T: Copy + PartialOrd + 'static
{
fn get_val(&self, idx: u64) -> T {
(*self).get_val(idx)
}
@@ -88,8 +158,8 @@ impl<'a, C: Column<T>, T: Copy + PartialOrd> Column<T> for &'a C {
(*self).num_vals()
}
fn iter<'b>(&'b self) -> Box<dyn Iterator<Item = T> + 'b> {
(*self).iter()
fn reader(&self) -> Box<dyn ColumnReader<T> + '_> {
(*self).reader()
}
fn get_range(&self, start: u64, output: &mut [T]) {
@@ -97,15 +167,11 @@ impl<'a, C: Column<T>, T: Copy + PartialOrd> Column<T> for &'a C {
}
}
impl<'a, T: Copy + PartialOrd + Send + Sync> Column<T> for VecColumn<'a, T> {
impl<'a, T: Copy + PartialOrd + Send + Sync + 'static> Column<T> for VecColumn<'a, T> {
fn get_val(&self, position: u64) -> T {
self.values[position as usize]
}
fn iter(&self) -> Box<dyn Iterator<Item = T> + '_> {
Box::new(self.values.iter().copied())
}
fn min_value(&self) -> T {
self.min_value
}
@@ -144,15 +210,15 @@ struct MonotonicMappingColumn<C, T, Input> {
}
/// Creates a view of a column transformed by a monotonic mapping.
pub fn monotonic_map_column<C, T, Input: PartialOrd, Output: PartialOrd>(
pub fn monotonic_map_column<C, T, Input: PartialOrd + Copy, Output: PartialOrd + Copy>(
from_column: C,
monotonic_mapping: T,
) -> impl Column<Output>
where
C: Column<Input>,
T: Fn(Input) -> Output + Send + Sync,
Input: Send + Sync,
Output: Send + Sync,
Input: Send + Sync + 'static,
Output: Send + Sync + 'static,
{
MonotonicMappingColumn {
from_column,
@@ -161,13 +227,13 @@ where
}
}
impl<C, T, Input: PartialOrd, Output: PartialOrd> Column<Output>
impl<C, T, Input: PartialOrd + Copy, Output: PartialOrd + Copy> Column<Output>
for MonotonicMappingColumn<C, T, Input>
where
C: Column<Input>,
T: Fn(Input) -> Output + Send + Sync,
Input: Send + Sync,
Output: Send + Sync,
Input: Send + Sync + 'static,
Output: Send + Sync + 'static,
{
#[inline]
fn get_val(&self, idx: u64) -> Output {
@@ -189,14 +255,44 @@ where
self.from_column.num_vals()
}
fn iter(&self) -> Box<dyn Iterator<Item = Output> + '_> {
Box::new(self.from_column.iter().map(&self.monotonic_mapping))
fn reader(&self) -> Box<dyn ColumnReader<Output> + '_> {
Box::new(MonotonicMappingColumnReader {
col_reader: self.from_column.reader(),
monotonic_mapping: &self.monotonic_mapping,
intermediary_type: PhantomData,
})
}
// We voluntarily do not implement get_range as it yields a regression,
// and we do not have any specialized implementation anyway.
}
struct MonotonicMappingColumnReader<'a, Transform, U> {
col_reader: Box<dyn ColumnReader<U> + 'a>,
monotonic_mapping: &'a Transform,
intermediary_type: PhantomData<U>,
}
impl<'a, U, V, Transform> ColumnReader<V> for MonotonicMappingColumnReader<'a, Transform, U>
where
U: Copy,
V: Copy,
Transform: Fn(U) -> V,
{
fn seek(&mut self, idx: u64) -> V {
let intermediary_value = self.col_reader.seek(idx);
(*self.monotonic_mapping)(intermediary_value)
}
fn advance(&mut self) -> bool {
self.col_reader.advance()
}
fn get(&self) -> V {
(*self.monotonic_mapping)(self.col_reader.get())
}
}
pub struct IterColumn<T>(T);
impl<T> From<T> for IterColumn<T>
@@ -210,7 +306,7 @@ where T: Iterator + Clone + ExactSizeIterator
impl<T> Column<T::Item> for IterColumn<T>
where
T: Iterator + Clone + ExactSizeIterator + Send + Sync,
T::Item: PartialOrd,
T::Item: PartialOrd + Copy + 'static,
{
fn get_val(&self, idx: u64) -> T::Item {
self.0.clone().nth(idx as usize).unwrap()
@@ -227,10 +323,6 @@ where
fn num_vals(&self) -> u64 {
self.0.len() as u64
}
fn iter(&self) -> Box<dyn Iterator<Item = T::Item> + '_> {
Box::new(self.0.clone())
}
}
#[cfg(test)]
@@ -263,7 +355,7 @@ mod tests {
let vals: Vec<u64> = (-1..99).map(i64::to_u64).collect();
let col = VecColumn::from(&vals);
let mapped = monotonic_map_column(col, |el| i64::from_u64(el) * 10i64);
let val_i64s: Vec<i64> = mapped.iter().collect();
let val_i64s: Vec<i64> = crate::iter_from_reader(mapped.reader()).collect();
for i in 0..100 {
assert_eq!(val_i64s[i as usize], mapped.get_val(i));
}
@@ -277,7 +369,7 @@ mod tests {
assert_eq!(mapped.min_value(), -10i64);
assert_eq!(mapped.max_value(), 980i64);
assert_eq!(mapped.num_vals(), 100);
let val_i64s: Vec<i64> = mapped.iter().collect();
let val_i64s: Vec<i64> = crate::iter_from_reader(mapped.reader()).collect();
assert_eq!(val_i64s.len(), 100);
for i in 0..100 {
assert_eq!(val_i64s[i as usize], mapped.get_val(i));

View File

@@ -22,7 +22,7 @@ use ownedbytes::OwnedBytes;
use tantivy_bitpacker::{self, BitPacker, BitUnpacker};
use crate::compact_space::build_compact_space::get_compact_space;
use crate::Column;
use crate::{iter_from_reader, Column, ColumnReader};
mod blank_range;
mod build_compact_space;
@@ -173,11 +173,14 @@ impl CompactSpaceCompressor {
/// Taking the vals as Vec may cost a lot of memory. It is used to sort the vals.
pub fn train_from(column: &impl Column<u128>) -> Self {
let mut values_sorted = BTreeSet::new();
values_sorted.extend(column.iter());
let total_num_values = column.num_vals();
values_sorted.extend(iter_from_reader(column.reader()));
let compact_space =
get_compact_space(&values_sorted, total_num_values, COST_PER_BLANK_IN_BITS);
let amplitude_compact_space = compact_space.amplitude_compact_space();
assert!(
@@ -218,11 +221,12 @@ impl CompactSpaceCompressor {
pub fn compress_into(
self,
vals: impl Iterator<Item = u128>,
mut vals: Box<dyn ColumnReader<u128> + '_>,
write: &mut impl Write,
) -> io::Result<()> {
let mut bitpacker = BitPacker::default();
for val in vals {
while vals.advance() {
let val = vals.get();
let compact = self
.params
.compact_space
@@ -300,13 +304,13 @@ impl Column<u128> for CompactSpaceDecompressor {
self.params.num_vals
}
#[inline]
fn iter(&self) -> Box<dyn Iterator<Item = u128> + '_> {
Box::new(self.iter())
}
fn get_between_vals(&self, range: RangeInclusive<u128>) -> Vec<u64> {
self.get_between_vals(range)
}
fn reader(&self) -> Box<dyn ColumnReader<u128> + '_> {
Box::new(self.specialized_reader())
}
}
impl CompactSpaceDecompressor {
@@ -410,18 +414,13 @@ impl CompactSpaceDecompressor {
positions
}
#[inline]
fn iter_compact(&self) -> impl Iterator<Item = u64> + '_ {
(0..self.params.num_vals)
.map(move |idx| self.params.bit_unpacker.get(idx as u64, &self.data) as u64)
}
#[inline]
fn iter(&self) -> impl Iterator<Item = u128> + '_ {
// TODO: Performance. It would be better to iterate on the ranges and check existence via
// the bit_unpacker.
self.iter_compact()
.map(|compact| self.compact_to_u128(compact))
fn specialized_reader(&self) -> CompactSpaceReader<'_> {
CompactSpaceReader {
data: self.data.as_slice(),
params: &self.params,
idx: 0u64,
len: self.params.num_vals,
}
}
#[inline]
@@ -439,6 +438,30 @@ impl CompactSpaceDecompressor {
}
}
pub struct CompactSpaceReader<'a> {
data: &'a [u8],
params: &'a IPCodecParams,
idx: u64,
len: u64,
}
impl<'a> ColumnReader<u128> for CompactSpaceReader<'a> {
fn seek(&mut self, target_idx: u64) -> u128 {
self.idx = target_idx;
self.get()
}
fn advance(&mut self) -> bool {
self.idx = self.idx.wrapping_add(1);
self.idx < self.len
}
fn get(&self) -> u128 {
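// Unpack the compact code stored at `idx`, then map it back to the original u128 value space.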
let compact_code = self.params.bit_unpacker.get(self.idx, self.data);
self.params.compact_space.compact_to_u128(compact_code)
}
}
#[cfg(test)]
mod tests {

View File

@@ -29,7 +29,7 @@ mod serialize;
use self::bitpacked::BitpackedCodec;
use self::blockwise_linear::BlockwiseLinearCodec;
pub use self::column::{monotonic_map_column, Column, VecColumn};
pub use self::column::{iter_from_reader, monotonic_map_column, Column, ColumnReader, VecColumn};
use self::linear::LinearCodec;
pub use self::monotonic_mapping::MonotonicallyMappableToU64;
pub use self::serialize::{

View File

@@ -74,17 +74,18 @@ impl Line {
// Intercept is only computed from provided positions
fn train_from(ys: &dyn Column, positions: impl Iterator<Item = u64>) -> Self {
let num_vals = if let Some(num_vals) = NonZeroU64::new(ys.num_vals() - 1) {
num_vals
let last_idx = if let Some(last_idx) = NonZeroU64::new(ys.num_vals() - 1) {
last_idx
} else {
return Line::default();
};
let y0 = ys.get_val(0);
let y1 = ys.get_val(num_vals.get());
let mut ys_reader = ys.reader();
let y0 = ys_reader.seek(0);
let y1 = ys_reader.seek(last_idx.get());
// We first independently pick our slope.
let slope = compute_slope(y0, y1, num_vals);
let slope = compute_slope(y0, y1, last_idx);
// We picked our slope. Note that it does not have to be perfect.
// Now we need to compute the best intercept.
@@ -114,9 +115,10 @@ impl Line {
intercept: 0,
};
let heuristic_shift = y0.wrapping_sub(MID_POINT);
let mut ys_reader = ys.reader();
line.intercept = positions
.map(|pos| {
let y = ys.get_val(pos);
let y = ys_reader.seek(pos);
y.wrapping_sub(line.eval(pos))
})
.min_by_key(|&val| val.wrapping_sub(heuristic_shift))

View File

@@ -89,8 +89,7 @@ impl FastFieldCodec for LinearCodec {
assert_eq!(column.min_value(), 0);
let line = Line::train(column);
let max_offset_from_line = column
.iter()
let max_offset_from_line = crate::iter_from_reader(column.reader())
.enumerate()
.map(|(pos, actual_value)| {
let calculated_value = line.eval(pos as u64);
@@ -107,7 +106,12 @@ impl FastFieldCodec for LinearCodec {
linear_params.serialize(write)?;
let mut bit_packer = BitPacker::new();
for (pos, actual_value) in column.iter().enumerate() {
let mut col_reader = column.reader();
for pos in 0.. {
if !col_reader.advance() {
break;
}
let actual_value = col_reader.get();
let calculated_value = line.eval(pos as u64);
let offset = actual_value.wrapping_sub(calculated_value);
bit_packer.write(offset, num_bits, write)?;
@@ -134,10 +138,11 @@ impl FastFieldCodec for LinearCodec {
let line = Line::estimate(column, &sample_positions);
let mut column_reader = column.reader();
let estimated_bit_width = sample_positions
.into_iter()
.map(|pos| {
let actual_value = column.get_val(pos);
let actual_value = column_reader.seek(pos);
let interpolated_val = line.eval(pos as u64);
actual_value.wrapping_sub(interpolated_val)
})

View File

@@ -31,8 +31,8 @@ use crate::blockwise_linear::BlockwiseLinearCodec;
use crate::compact_space::CompactSpaceCompressor;
use crate::linear::LinearCodec;
use crate::{
monotonic_map_column, Column, FastFieldCodec, FastFieldCodecType, MonotonicallyMappableToU64,
VecColumn, ALL_CODEC_TYPES,
iter_from_reader, monotonic_map_column, Column, FastFieldCodec, FastFieldCodecType,
MonotonicallyMappableToU64, VecColumn, ALL_CODEC_TYPES,
};
/// The normalized header gives some parameters after applying the following
@@ -79,8 +79,9 @@ impl Header {
let num_vals = column.num_vals();
let min_value = column.min_value();
let max_value = column.max_value();
let gcd = crate::gcd::find_gcd(column.iter().map(|val| val - min_value))
.filter(|gcd| gcd.get() > 1u64);
let gcd =
crate::gcd::find_gcd(iter_from_reader(column.reader()).map(|val| val - min_value))
.filter(|gcd| gcd.get() > 1u64);
let divider = DividerU64::divide_by(gcd.map(|gcd| gcd.get()).unwrap_or(1u64));
let shifted_column = monotonic_map_column(&column, |val| divider.divide(val - min_value));
let codec_type = detect_codec(shifted_column, codecs)?;
@@ -131,7 +132,7 @@ pub fn estimate<T: MonotonicallyMappableToU64>(
) -> Option<f32> {
let column = monotonic_map_column(typed_column, T::to_u64);
let min_value = column.min_value();
let gcd = crate::gcd::find_gcd(column.iter().map(|val| val - min_value))
let gcd = crate::gcd::find_gcd(iter_from_reader(column.reader()).map(|val| val - min_value))
.filter(|gcd| gcd.get() > 1u64);
let divider = DividerU64::divide_by(gcd.map(|gcd| gcd.get()).unwrap_or(1u64));
let normalized_column = monotonic_map_column(&column, |val| divider.divide(val - min_value));
@@ -149,7 +150,7 @@ pub fn serialize_u128(
// TODO write header, to later support more codecs
let compressor = CompactSpaceCompressor::train_from(&typed_column);
compressor
.compress_into(typed_column.iter(), output)
.compress_into(typed_column.reader(), output)
.unwrap();
Ok(())
@@ -240,7 +241,8 @@ mod tests {
#[test]
fn test_serialize_deserialize() {
let original = [1u64, 5u64, 10u64];
let restored: Vec<u64> = serialize_and_load(&original[..]).iter().collect();
let restored: Vec<u64> =
crate::iter_from_reader(serialize_and_load(&original[..]).reader()).collect();
assert_eq!(&restored, &original[..]);
}

View File

@@ -41,6 +41,7 @@ mod error;
mod facet_reader;
mod multivalued;
mod readers;
mod remapped_column;
mod serializer;
mod writer;
@@ -424,7 +425,7 @@ mod tests {
permutation
}
fn test_intfastfield_permutation_with_data(permutation: Vec<u64>) -> crate::Result<()> {
fn test_intfastfield_permutation_with_data(permutation: &[u64]) -> crate::Result<()> {
let path = Path::new("test");
let n = permutation.len();
let directory = RamDirectory::create();
@@ -432,7 +433,7 @@ mod tests {
let write: WritePtr = directory.open_write(Path::new("test"))?;
let mut serializer = CompositeFastFieldSerializer::from_write(write)?;
let mut fast_field_writers = FastFieldsWriter::from_schema(&SCHEMA);
for &x in &permutation {
for &x in permutation {
fast_field_writers.add_document(&doc!(*FIELD=>x));
}
fast_field_writers.serialize(&mut serializer, &HashMap::new(), None)?;
@@ -446,7 +447,6 @@ mod tests {
.unwrap()
.read_bytes()?;
let fast_field_reader = open::<u64>(data)?;
for a in 0..n {
assert_eq!(fast_field_reader.get_val(a as u64), permutation[a as usize]);
}
@@ -455,16 +455,23 @@ mod tests {
}
#[test]
fn test_intfastfield_permutation_gcd() -> crate::Result<()> {
let permutation = generate_permutation_gcd();
test_intfastfield_permutation_with_data(permutation)?;
fn test_intfastfield_simple() -> crate::Result<()> {
let permutation = &[1, 2, 3];
test_intfastfield_permutation_with_data(&permutation[..])?;
Ok(())
}
#[test]
fn test_intfastfield_permutation() -> crate::Result<()> {
let permutation = generate_permutation();
test_intfastfield_permutation_with_data(permutation)?;
test_intfastfield_permutation_with_data(&permutation)?;
Ok(())
}
#[test]
fn test_intfastfield_permutation_gcd() -> crate::Result<()> {
let permutation = generate_permutation_gcd();
test_intfastfield_permutation_with_data(&permutation)?;
Ok(())
}

View File

@@ -1,9 +1,10 @@
mod multivalue_start_index;
mod reader;
mod writer;
pub(crate) use self::multivalue_start_index::MultivalueStartIndex;
pub use self::reader::MultiValuedFastFieldReader;
pub use self::writer::MultiValuedFastFieldWriter;
pub(crate) use self::writer::MultivalueStartIndex;
#[cfg(test)]
mod tests {

View File

@@ -0,0 +1,195 @@
use fastfield_codecs::{Column, ColumnReader};
use crate::indexer::doc_id_mapping::DocIdMapping;
use crate::DocId;
pub(crate) struct MultivalueStartIndex<'a, C: Column> {
column: &'a C,
doc_id_map: &'a DocIdMapping,
min_value: u64,
max_value: u64,
}
struct MultivalueStartIndexReader<'a, C: Column> {
column: &'a C,
doc_id_map: &'a DocIdMapping,
idx: u64,
val: u64,
len: u64,
}
impl<'a, C: Column> MultivalueStartIndexReader<'a, C> {
fn new(column: &'a C, doc_id_map: &'a DocIdMapping) -> Self {
Self {
column,
doc_id_map,
idx: u64::MAX,
val: 0,
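// One start offset per new doc id, plus a final end offset.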
len: doc_id_map.num_new_doc_ids() as u64 + 1,
}
}
fn reset(&mut self) {
self.idx = u64::MAX;
self.val = 0;
}
}
impl<'a, C: Column> ColumnReader for MultivalueStartIndexReader<'a, C> {
fn seek(&mut self, idx: u64) -> u64 {
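// The reader only moves forward; seeking backwards restarts the scan from the beginning.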
if self.idx > idx {
self.reset();
self.advance();
}
for _ in self.idx..idx {
self.advance();
}
self.get()
}
fn advance(&mut self) -> bool {
if self.idx == u64::MAX {
self.idx = 0;
self.val = 0;
return true;
}
let new_doc_id: DocId = self.idx as DocId;
self.idx += 1;
if self.idx >= self.len {
self.idx = self.len;
return false;
}
let old_doc: DocId = self.doc_id_map.get_old_doc_id(new_doc_id);
let num_vals_for_doc =
self.column.get_val(old_doc as u64 + 1) - self.column.get_val(old_doc as u64);
self.val += num_vals_for_doc;
true
}
fn get(&self) -> u64 {
self.val
}
}
impl<'a, C: Column> MultivalueStartIndex<'a, C> {
pub fn new(column: &'a C, doc_id_map: &'a DocIdMapping) -> Self {
assert_eq!(column.num_vals(), doc_id_map.num_old_doc_ids() as u64 + 1);
let iter = MultivalueStartIndexIter::new(column, doc_id_map);
let (min_value, max_value) = tantivy_bitpacker::minmax(iter).unwrap_or((0, 0));
MultivalueStartIndex {
column,
doc_id_map,
min_value,
max_value,
}
}
fn specialized_reader(&self) -> MultivalueStartIndexReader<'a, C> {
MultivalueStartIndexReader::new(self.column, self.doc_id_map)
}
}
impl<'a, C: Column> Column for MultivalueStartIndex<'a, C> {
fn reader(&self) -> Box<dyn ColumnReader + '_> {
Box::new(self.specialized_reader())
}
fn get_val(&self, idx: u64) -> u64 {
let mut reader = self.specialized_reader();
reader.seek(idx)
}
fn min_value(&self) -> u64 {
self.min_value
}
fn max_value(&self) -> u64 {
self.max_value
}
fn num_vals(&self) -> u64 {
(self.doc_id_map.num_new_doc_ids() + 1) as u64
}
}
struct MultivalueStartIndexIter<'a, C: Column> {
column: &'a C,
doc_id_map: &'a DocIdMapping,
new_doc_id: usize,
offset: u64,
}
impl<'a, C: Column> MultivalueStartIndexIter<'a, C> {
fn new(column: &'a C, doc_id_map: &'a DocIdMapping) -> Self {
Self {
column,
doc_id_map,
new_doc_id: 0,
offset: 0,
}
}
}
impl<'a, C: Column> Iterator for MultivalueStartIndexIter<'a, C> {
type Item = u64;
fn next(&mut self) -> Option<Self::Item> {
if self.new_doc_id > self.doc_id_map.num_new_doc_ids() {
return None;
}
let new_doc_id = self.new_doc_id;
self.new_doc_id += 1;
let start_offset = self.offset;
if new_doc_id < self.doc_id_map.num_new_doc_ids() {
let old_doc = self.doc_id_map.get_old_doc_id(new_doc_id as u32) as u64;
let num_vals_for_doc = self.column.get_val(old_doc + 1) - self.column.get_val(old_doc);
self.offset += num_vals_for_doc;
}
Some(start_offset)
}
}
#[cfg(test)]
mod tests {
use fastfield_codecs::VecColumn;
use super::*;
#[test]
fn test_multivalue_start_index() {
let doc_id_mapping = DocIdMapping::from_new_id_to_old_id(vec![4, 1, 2]);
assert_eq!(doc_id_mapping.num_old_doc_ids(), 5);
let col = VecColumn::from(&[0u64, 3, 5, 10, 12, 16][..]);
let multivalue_start_index = MultivalueStartIndex::new(
&col, // 3, 2, 5, 2, 4
&doc_id_mapping,
);
assert_eq!(multivalue_start_index.num_vals(), 4);
assert_eq!(
fastfield_codecs::iter_from_reader(multivalue_start_index.reader())
.collect::<Vec<u64>>(),
vec![0, 4, 6, 11]
); // 4, 2, 5
}
#[test]
fn test_multivalue_get_vals() {
let doc_id_mapping =
DocIdMapping::from_new_id_to_old_id(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
assert_eq!(doc_id_mapping.num_old_doc_ids(), 10);
let col = VecColumn::from(&[0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55][..]);
let multivalue_start_index = MultivalueStartIndex::new(&col, &doc_id_mapping);
assert_eq!(
fastfield_codecs::iter_from_reader(multivalue_start_index.reader())
.collect::<Vec<u64>>(),
vec![0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55]
);
assert_eq!(multivalue_start_index.num_vals(), 11);
let mut multivalue_start_index_reader = multivalue_start_index.reader();
assert_eq!(multivalue_start_index_reader.seek(3), 2);
assert_eq!(multivalue_start_index_reader.seek(5), 5);
assert_eq!(multivalue_start_index_reader.seek(8), 21);
assert_eq!(multivalue_start_index_reader.seek(4), 3);
assert_eq!(multivalue_start_index_reader.seek(0), 0);
assert_eq!(multivalue_start_index_reader.seek(10), 55);
}
}

View File

@@ -1,10 +1,11 @@
use std::io;
use std::sync::Mutex;
use fastfield_codecs::{Column, MonotonicallyMappableToU64, VecColumn};
use fastfield_codecs::{MonotonicallyMappableToU64, VecColumn};
use fnv::FnvHashMap;
use crate::fastfield::{value_to_u64, CompositeFastFieldSerializer, FastFieldType};
use crate::fastfield::{
value_to_u64, CompositeFastFieldSerializer, FastFieldType, MultivalueStartIndex,
};
use crate::indexer::doc_id_mapping::DocIdMapping;
use crate::postings::UnorderedTermId;
use crate::schema::{Document, Field, Value};
@@ -200,155 +201,3 @@ impl MultiValuedFastFieldWriter {
Ok(())
}
}
pub(crate) struct MultivalueStartIndex<'a, C: Column> {
column: &'a C,
doc_id_map: &'a DocIdMapping,
min_max_opt: Mutex<Option<(u64, u64)>>,
random_seeker: Mutex<MultivalueStartIndexRandomSeeker<'a, C>>,
}
struct MultivalueStartIndexRandomSeeker<'a, C: Column> {
seek_head: MultivalueStartIndexIter<'a, C>,
seek_next_id: u64,
}
impl<'a, C: Column> MultivalueStartIndexRandomSeeker<'a, C> {
fn new(column: &'a C, doc_id_map: &'a DocIdMapping) -> Self {
Self {
seek_head: MultivalueStartIndexIter {
column,
doc_id_map,
new_doc_id: 0,
offset: 0u64,
},
seek_next_id: 0u64,
}
}
}
impl<'a, C: Column> MultivalueStartIndex<'a, C> {
pub fn new(column: &'a C, doc_id_map: &'a DocIdMapping) -> Self {
assert_eq!(column.num_vals(), doc_id_map.num_old_doc_ids() as u64 + 1);
MultivalueStartIndex {
column,
doc_id_map,
min_max_opt: Mutex::default(),
random_seeker: Mutex::new(MultivalueStartIndexRandomSeeker::new(column, doc_id_map)),
}
}
fn minmax(&self) -> (u64, u64) {
if let Some((min, max)) = *self.min_max_opt.lock().unwrap() {
return (min, max);
}
let (min, max) = tantivy_bitpacker::minmax(self.iter()).unwrap_or((0u64, 0u64));
*self.min_max_opt.lock().unwrap() = Some((min, max));
(min, max)
}
}
impl<'a, C: Column> Column for MultivalueStartIndex<'a, C> {
fn get_val(&self, idx: u64) -> u64 {
let mut random_seeker_lock = self.random_seeker.lock().unwrap();
if random_seeker_lock.seek_next_id > idx {
*random_seeker_lock =
MultivalueStartIndexRandomSeeker::new(self.column, self.doc_id_map);
}
let to_skip = idx - random_seeker_lock.seek_next_id;
random_seeker_lock.seek_next_id = idx + 1;
random_seeker_lock.seek_head.nth(to_skip as usize).unwrap()
}
fn min_value(&self) -> u64 {
self.minmax().0
}
fn max_value(&self) -> u64 {
self.minmax().1
}
fn num_vals(&self) -> u64 {
(self.doc_id_map.num_new_doc_ids() + 1) as u64
}
fn iter<'b>(&'b self) -> Box<dyn Iterator<Item = u64> + 'b> {
Box::new(MultivalueStartIndexIter::new(self.column, self.doc_id_map))
}
}
struct MultivalueStartIndexIter<'a, C: Column> {
pub column: &'a C,
pub doc_id_map: &'a DocIdMapping,
pub new_doc_id: usize,
pub offset: u64,
}
impl<'a, C: Column> MultivalueStartIndexIter<'a, C> {
fn new(column: &'a C, doc_id_map: &'a DocIdMapping) -> Self {
Self {
column,
doc_id_map,
new_doc_id: 0,
offset: 0,
}
}
}
impl<'a, C: Column> Iterator for MultivalueStartIndexIter<'a, C> {
type Item = u64;
fn next(&mut self) -> Option<Self::Item> {
if self.new_doc_id > self.doc_id_map.num_new_doc_ids() {
return None;
}
let new_doc_id = self.new_doc_id;
self.new_doc_id += 1;
let start_offset = self.offset;
if new_doc_id < self.doc_id_map.num_new_doc_ids() {
let old_doc = self.doc_id_map.get_old_doc_id(new_doc_id as u32) as u64;
let num_vals_for_doc = self.column.get_val(old_doc + 1) - self.column.get_val(old_doc);
self.offset += num_vals_for_doc;
}
Some(start_offset)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_multivalue_start_index() {
let doc_id_mapping = DocIdMapping::from_new_id_to_old_id(vec![4, 1, 2]);
assert_eq!(doc_id_mapping.num_old_doc_ids(), 5);
let col = VecColumn::from(&[0u64, 3, 5, 10, 12, 16][..]);
let multivalue_start_index = MultivalueStartIndex::new(
&col, // 3, 2, 5, 2, 4
&doc_id_mapping,
);
assert_eq!(multivalue_start_index.num_vals(), 4);
assert_eq!(
multivalue_start_index.iter().collect::<Vec<u64>>(),
vec![0, 4, 6, 11]
); // 4, 2, 5
}
#[test]
fn test_multivalue_get_vals() {
let doc_id_mapping =
DocIdMapping::from_new_id_to_old_id(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
assert_eq!(doc_id_mapping.num_old_doc_ids(), 10);
let col = VecColumn::from(&[0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55][..]);
let multivalue_start_index = MultivalueStartIndex::new(&col, &doc_id_mapping);
assert_eq!(
multivalue_start_index.iter().collect::<Vec<u64>>(),
vec![0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55]
);
assert_eq!(multivalue_start_index.num_vals(), 11);
assert_eq!(multivalue_start_index.get_val(3), 2);
assert_eq!(multivalue_start_index.get_val(5), 5);
assert_eq!(multivalue_start_index.get_val(8), 21);
assert_eq!(multivalue_start_index.get_val(4), 3);
assert_eq!(multivalue_start_index.get_val(0), 0);
assert_eq!(multivalue_start_index.get_val(10), 55);
}
}

View File

@@ -0,0 +1,112 @@
use fastfield_codecs::{Column, ColumnReader};
use tantivy_bitpacker::BlockedBitpacker;
use crate::indexer::doc_id_mapping::DocIdMapping;
use crate::DocId;
#[derive(Clone)]
pub(crate) struct WriterFastFieldColumn<'map, 'bitp> {
pub(crate) doc_id_mapping_opt: Option<&'map DocIdMapping>,
pub(crate) vals: &'bitp BlockedBitpacker,
pub(crate) min_value: u64,
pub(crate) max_value: u64,
pub(crate) num_vals: u64,
}
impl<'map, 'bitp> Column for WriterFastFieldColumn<'map, 'bitp> {
/// Return the value associated to the given doc.
///
/// Whenever possible use the Iterator passed to the fastfield creation instead, for performance
/// reasons.
///
/// # Panics
///
/// May panic if `doc` is greater than the index.
fn get_val(&self, doc: u64) -> u64 {
if let Some(doc_id_map) = self.doc_id_mapping_opt {
self.vals
.get(doc_id_map.get_old_doc_id(doc as u32) as usize) // consider extra
// FastFieldReader wrapper for
// non doc_id_map
} else {
self.vals.get(doc as usize)
}
}
fn reader(&self) -> Box<dyn ColumnReader + '_> {
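// With a doc id mapping, values are read in the remapped (new) doc order; otherwise in insertion order.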
if let Some(doc_id_mapping) = self.doc_id_mapping_opt {
Box::new(RemappedColumnReader {
doc_id_mapping,
vals: self.vals,
idx: u64::MAX,
len: doc_id_mapping.num_new_doc_ids() as u64,
})
} else {
Box::new(BitpackedColumnReader {
vals: self.vals,
idx: u64::MAX,
len: self.num_vals,
})
}
}
fn min_value(&self) -> u64 {
self.min_value
}
fn max_value(&self) -> u64 {
self.max_value
}
fn num_vals(&self) -> u64 {
self.num_vals
}
}
struct RemappedColumnReader<'a> {
doc_id_mapping: &'a DocIdMapping,
vals: &'a BlockedBitpacker,
idx: u64,
len: u64,
}
impl<'a> ColumnReader for RemappedColumnReader<'a> {
fn seek(&mut self, target_idx: u64) -> u64 {
assert!(target_idx < self.len);
self.idx = target_idx;
self.get()
}
fn advance(&mut self) -> bool {
self.idx = self.idx.wrapping_add(1);
self.idx < self.len
}
fn get(&self) -> u64 {
let old_doc_id: DocId = self.doc_id_mapping.get_old_doc_id(self.idx as DocId);
self.vals.get(old_doc_id as usize)
}
}
struct BitpackedColumnReader<'a> {
vals: &'a BlockedBitpacker,
idx: u64,
len: u64,
}
impl<'a> ColumnReader for BitpackedColumnReader<'a> {
fn seek(&mut self, target_idx: u64) -> u64 {
assert!(target_idx < self.len);
self.idx = target_idx;
self.get()
}
fn advance(&mut self) -> bool {
self.idx = self.idx.wrapping_add(1);
self.idx < self.len
}
fn get(&self) -> u64 {
self.vals.get(self.idx as usize)
}
}

View File

@@ -2,12 +2,13 @@ use std::collections::HashMap;
use std::io;
use common;
use fastfield_codecs::{Column, MonotonicallyMappableToU64};
use fastfield_codecs::MonotonicallyMappableToU64;
use fnv::FnvHashMap;
use tantivy_bitpacker::BlockedBitpacker;
use super::multivalued::MultiValuedFastFieldWriter;
use super::FastFieldType;
use crate::fastfield::remapped_column::WriterFastFieldColumn;
use crate::fastfield::{BytesFastFieldWriter, CompositeFastFieldSerializer};
use crate::indexer::doc_id_mapping::DocIdMapping;
use crate::postings::UnorderedTermId;
@@ -351,7 +352,7 @@ impl IntFastFieldWriter {
pub fn serialize(
&self,
serializer: &mut CompositeFastFieldSerializer,
doc_id_map: Option<&DocIdMapping>,
doc_id_mapping_opt: Option<&DocIdMapping>,
) -> io::Result<()> {
let (min, max) = if self.val_min > self.val_max {
(0, 0)
@@ -359,8 +360,8 @@ impl IntFastFieldWriter {
(self.val_min, self.val_max)
};
let fastfield_accessor = WriterFastFieldAccessProvider {
doc_id_map,
let fastfield_accessor = WriterFastFieldColumn {
doc_id_mapping_opt,
vals: &self.vals,
min_value: min,
max_value: max,
@@ -372,57 +373,3 @@ impl IntFastFieldWriter {
Ok(())
}
}
#[derive(Clone)]
struct WriterFastFieldAccessProvider<'map, 'bitp> {
doc_id_map: Option<&'map DocIdMapping>,
vals: &'bitp BlockedBitpacker,
min_value: u64,
max_value: u64,
num_vals: u64,
}
impl<'map, 'bitp> Column for WriterFastFieldAccessProvider<'map, 'bitp> {
/// Return the value associated to the given doc.
///
/// Whenever possible use the Iterator passed to the fastfield creation instead, for performance
/// reasons.
///
/// # Panics
///
/// May panic if `doc` is greater than the index.
fn get_val(&self, doc: u64) -> u64 {
if let Some(doc_id_map) = self.doc_id_map {
self.vals
.get(doc_id_map.get_old_doc_id(doc as u32) as usize) // consider extra
// FastFieldReader wrapper for
// non doc_id_map
} else {
self.vals.get(doc as usize)
}
}
fn iter(&self) -> Box<dyn Iterator<Item = u64> + '_> {
if let Some(doc_id_map) = self.doc_id_map {
Box::new(
doc_id_map
.iter_old_doc_ids()
.map(|doc_id| self.vals.get(doc_id as usize)),
)
} else {
Box::new(self.vals.iter())
}
}
fn min_value(&self) -> u64 {
self.min_value
}
fn max_value(&self) -> u64 {
self.max_value
}
fn num_vals(&self) -> u64 {
self.num_vals
}
}

View File

@@ -1,11 +1,11 @@
use std::sync::Arc;
use fastfield_codecs::Column;
use fastfield_codecs::{Column, ColumnReader};
use itertools::Itertools;
use crate::indexer::doc_id_mapping::SegmentDocIdMapping;
use crate::schema::Field;
use crate::{DocAddress, SegmentReader};
use crate::{DocAddress, DocId, SegmentReader};
pub(crate) struct SortedDocIdColumn<'a> {
doc_id_mapping: &'a SegmentDocIdMapping,
@@ -87,17 +87,14 @@ impl<'a> Column for SortedDocIdColumn<'a> {
self.fast_field_readers[segment_ord as usize].get_val(doc_id as u64)
}
fn iter(&self) -> Box<dyn Iterator<Item = u64> + '_> {
Box::new(
self.doc_id_mapping
.iter_old_doc_addrs()
.map(|old_doc_addr| {
let fast_field_reader =
&self.fast_field_readers[old_doc_addr.segment_ord as usize];
fast_field_reader.get_val(old_doc_addr.doc_id as u64)
}),
)
fn reader(&self) -> Box<dyn ColumnReader<u64> + '_> {
Box::new(SortedDocIdColumnReader {
doc_id_mapping: self.doc_id_mapping,
fast_field_readers: &self.fast_field_readers[..],
new_doc_id: u32::MAX,
})
}
fn min_value(&self) -> u64 {
self.min_value
}
@@ -110,3 +107,27 @@ impl<'a> Column for SortedDocIdColumn<'a> {
self.num_vals
}
}
struct SortedDocIdColumnReader<'a> {
doc_id_mapping: &'a SegmentDocIdMapping,
fast_field_readers: &'a [Arc<dyn Column>],
new_doc_id: DocId,
}
impl<'a> ColumnReader for SortedDocIdColumnReader<'a> {
fn seek(&mut self, target_idx: u64) -> u64 {
assert!(target_idx < self.doc_id_mapping.len() as u64);
self.new_doc_id = target_idx as u32;
self.get()
}
fn advance(&mut self) -> bool {
self.new_doc_id = self.new_doc_id.wrapping_add(1);
self.new_doc_id < self.doc_id_mapping.len() as u32
}
fn get(&self) -> u64 {
let old_doc = self.doc_id_mapping.get_old_doc_addr(self.new_doc_id);
self.fast_field_readers[old_doc.segment_ord as usize].get_val(old_doc.doc_id as u64)
}
}

View File

@@ -1,6 +1,6 @@
use std::cmp;
use fastfield_codecs::Column;
use fastfield_codecs::{Column, ColumnReader};
use crate::fastfield::{MultiValueLength, MultiValuedFastFieldReader};
use crate::indexer::doc_id_mapping::SegmentDocIdMapping;
@@ -95,18 +95,6 @@ impl<'a> Column for SortedDocIdMultiValueColumn<'a> {
vals[pos_in_values as usize]
}
fn iter(&self) -> Box<dyn Iterator<Item = u64> + '_> {
Box::new(
self.doc_id_mapping
.iter_old_doc_addrs()
.flat_map(|old_doc_addr| {
let ff_reader = &self.fast_field_readers[old_doc_addr.segment_ord as usize];
let mut vals = Vec::new();
ff_reader.get_vals(old_doc_addr.doc_id, &mut vals);
vals.into_iter()
}),
)
}
fn min_value(&self) -> u64 {
self.min_value
}
@@ -118,4 +106,80 @@ impl<'a> Column for SortedDocIdMultiValueColumn<'a> {
fn num_vals(&self) -> u64 {
self.num_vals
}
fn reader(&self) -> Box<dyn ColumnReader<u64> + '_> {
let mut reader = SortedDocMultiValueColumnReader {
doc_id_mapping: self.doc_id_mapping,
fast_field_readers: &self.fast_field_readers[..],
new_doc_id: u32::MAX,
in_buffer_idx: 0,
buffer: Vec::new(),
idx: u64::MAX,
};
reader.reset();
Box::new(reader)
}
}
struct SortedDocMultiValueColumnReader<'a> {
doc_id_mapping: &'a SegmentDocIdMapping,
fast_field_readers: &'a [MultiValuedFastFieldReader<u64>],
new_doc_id: DocId,
in_buffer_idx: usize,
buffer: Vec<u64>,
idx: u64,
}
impl<'a> SortedDocMultiValueColumnReader<'a> {
fn fill(&mut self) {
let old_doc = self.doc_id_mapping.get_old_doc_addr(self.new_doc_id);
let ff_reader = &self.fast_field_readers[old_doc.segment_ord as usize];
ff_reader.get_vals(old_doc.doc_id, &mut self.buffer);
self.in_buffer_idx = 0;
}
fn reset(&mut self) {
self.buffer.clear();
self.idx = u64::MAX;
self.in_buffer_idx = 0;
self.new_doc_id = u32::MAX;
}
}
impl<'a> ColumnReader for SortedDocMultiValueColumnReader<'a> {
fn seek(&mut self, target_idx: u64) -> u64 {
if target_idx < self.idx {
self.reset();
self.advance();
}
for _ in self.idx..target_idx {
// TODO could be optimized.
assert!(self.advance());
}
self.get()
}
fn advance(&mut self) -> bool {
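// Step to the next value, refilling the buffer from the next document (and skipping documents with no values) once the current buffer is exhausted.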
loop {
self.in_buffer_idx += 1;
if self.in_buffer_idx < self.buffer.len() {
self.idx = self.idx.wrapping_add(1);
return true;
}
self.new_doc_id = self.new_doc_id.wrapping_add(1);
if self.new_doc_id >= self.doc_id_mapping.len() as u32 {
return false;
}
self.fill();
if !self.buffer.is_empty() {
self.idx = self.idx.wrapping_add(1);
return true;
}
}
}
fn get(&self) -> u64 {
self.buffer[self.in_buffer_idx]
}
}