Compare commits

...

1 Commits

Author SHA1 Message Date
Paul Masurel
85ebb3c420 Introducing ColumnReader.
Introducing a ColumnReader trait and .reader() to Column,
hence removing the dreaded Mutex in the `MultiValueStartIndex`
thingy.
2022-09-21 12:47:44 +09:00
7 changed files with 256 additions and 166 deletions

View File

@@ -4,6 +4,11 @@ use std::ops::RangeInclusive;
use tantivy_bitpacker::minmax; use tantivy_bitpacker::minmax;
pub trait Column<T: PartialOrd = u64>: Send + Sync { pub trait Column<T: PartialOrd = u64>: Send + Sync {
/// Return a `ColumnReader`.
fn reader(&self) -> Box<dyn ColumnReader<T> + '_> {
Box::new(ColumnReaderAdapter { column: self })
}
/// Return the value associated to the given idx. /// Return the value associated to the given idx.
/// ///
/// This accessor should return as fast as possible. /// This accessor should return as fast as possible.
@@ -11,6 +16,8 @@ pub trait Column<T: PartialOrd = u64>: Send + Sync {
/// # Panics /// # Panics
/// ///
/// May panic if `idx` is greater than the column length. /// May panic if `idx` is greater than the column length.
///
/// TODO remove to force people to use `.reader()`.
fn get_val(&self, idx: u64) -> T; fn get_val(&self, idx: u64) -> T;
/// Fills an output buffer with the fast field values /// Fills an output buffer with the fast field values
@@ -60,11 +67,40 @@ pub trait Column<T: PartialOrd = u64>: Send + Sync {
fn num_vals(&self) -> u64; fn num_vals(&self) -> u64;
/// Returns a iterator over the data /// Returns a iterator over the data
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = T> + 'a> { ///
/// TODO get rid of `.iter()` and extend ColumnReader instead.
fn iter(&self) -> Box<dyn Iterator<Item = T> + '_> {
Box::new((0..self.num_vals()).map(|idx| self.get_val(idx))) Box::new((0..self.num_vals()).map(|idx| self.get_val(idx)))
} }
} }
/// `ColumnReader` makes it possible to read forward through a column.
///
/// TODO add methods to make it possible to scan the column and replace `.iter()`
pub trait ColumnReader<T = u64> {
fn seek(&mut self, idx: u64) -> T;
}
pub(crate) struct ColumnReaderAdapter<'a, C: ?Sized> {
column: &'a C,
}
impl<'a, C: ?Sized> From<&'a C> for ColumnReaderAdapter<'a, C> {
fn from(column: &'a C) -> Self {
ColumnReaderAdapter { column }
}
}
impl<'a, T, C: ?Sized> ColumnReader<T> for ColumnReaderAdapter<'a, C>
where
C: Column<T>,
T: PartialOrd<T>,
{
fn seek(&mut self, idx: u64) -> T {
self.column.get_val(idx)
}
}
pub struct VecColumn<'a, T = u64> { pub struct VecColumn<'a, T = u64> {
values: &'a [T], values: &'a [T],
min_value: T, min_value: T,
@@ -88,7 +124,11 @@ impl<'a, C: Column<T>, T: Copy + PartialOrd> Column<T> for &'a C {
(*self).num_vals() (*self).num_vals()
} }
fn iter<'b>(&'b self) -> Box<dyn Iterator<Item = T> + 'b> { fn reader(&self) -> Box<dyn ColumnReader<T> + '_> {
(*self).reader()
}
fn iter(&self) -> Box<dyn Iterator<Item = T> + '_> {
(*self).iter() (*self).iter()
} }
@@ -193,10 +233,36 @@ where
Box::new(self.from_column.iter().map(&self.monotonic_mapping)) Box::new(self.from_column.iter().map(&self.monotonic_mapping))
} }
fn reader(&self) -> Box<dyn ColumnReader<Output> + '_> {
Box::new(MonotonicMappingColumnReader {
col_reader: ColumnReaderAdapter::from(&self.from_column),
monotonic_mapping: &self.monotonic_mapping,
intermdiary_type: PhantomData,
})
}
// We voluntarily do not implement get_range as it yields a regression, // We voluntarily do not implement get_range as it yields a regression,
// and we do not have any specialized implementation anyway. // and we do not have any specialized implementation anyway.
} }
struct MonotonicMappingColumnReader<'a, ColR, Transform, U> {
col_reader: ColR,
monotonic_mapping: &'a Transform,
intermdiary_type: PhantomData<U>,
}
impl<'a, U, V, ColR, Transform> ColumnReader<V>
for MonotonicMappingColumnReader<'a, ColR, Transform, U>
where
ColR: ColumnReader<U> + 'a,
Transform: Fn(U) -> V,
{
fn seek(&mut self, idx: u64) -> V {
let intermediary_value = self.col_reader.seek(idx);
(*self.monotonic_mapping)(intermediary_value)
}
}
pub struct IterColumn<T>(T); pub struct IterColumn<T>(T);
impl<T> From<T> for IterColumn<T> impl<T> From<T> for IterColumn<T>

View File

@@ -29,7 +29,7 @@ mod serialize;
use self::bitpacked::BitpackedCodec; use self::bitpacked::BitpackedCodec;
use self::blockwise_linear::BlockwiseLinearCodec; use self::blockwise_linear::BlockwiseLinearCodec;
pub use self::column::{monotonic_map_column, Column, VecColumn}; pub use self::column::{monotonic_map_column, Column, ColumnReader, VecColumn};
use self::linear::LinearCodec; use self::linear::LinearCodec;
pub use self::monotonic_mapping::MonotonicallyMappableToU64; pub use self::monotonic_mapping::MonotonicallyMappableToU64;
pub use self::serialize::{ pub use self::serialize::{

View File

@@ -74,17 +74,18 @@ impl Line {
// Intercept is only computed from provided positions // Intercept is only computed from provided positions
fn train_from(ys: &dyn Column, positions: impl Iterator<Item = u64>) -> Self { fn train_from(ys: &dyn Column, positions: impl Iterator<Item = u64>) -> Self {
let num_vals = if let Some(num_vals) = NonZeroU64::new(ys.num_vals() - 1) { let last_idx = if let Some(last_idx) = NonZeroU64::new(ys.num_vals() - 1) {
num_vals last_idx
} else { } else {
return Line::default(); return Line::default();
}; };
let y0 = ys.get_val(0); let mut ys_reader = ys.reader();
let y1 = ys.get_val(num_vals.get()); let y0 = ys_reader.seek(0);
let y1 = ys_reader.seek(last_idx.get());
// We first independently pick our slope. // We first independently pick our slope.
let slope = compute_slope(y0, y1, num_vals); let slope = compute_slope(y0, y1, last_idx);
// We picked our slope. Note that it does not have to be perfect. // We picked our slope. Note that it does not have to be perfect.
// Now we need to compute the best intercept. // Now we need to compute the best intercept.
@@ -114,9 +115,10 @@ impl Line {
intercept: 0, intercept: 0,
}; };
let heuristic_shift = y0.wrapping_sub(MID_POINT); let heuristic_shift = y0.wrapping_sub(MID_POINT);
let mut ys_reader = ys.reader();
line.intercept = positions line.intercept = positions
.map(|pos| { .map(|pos| {
let y = ys.get_val(pos); let y = ys_reader.seek(pos);
y.wrapping_sub(line.eval(pos)) y.wrapping_sub(line.eval(pos))
}) })
.min_by_key(|&val| val.wrapping_sub(heuristic_shift)) .min_by_key(|&val| val.wrapping_sub(heuristic_shift))

View File

@@ -134,10 +134,11 @@ impl FastFieldCodec for LinearCodec {
let line = Line::estimate(column, &sample_positions); let line = Line::estimate(column, &sample_positions);
let mut column_reader = column.reader();
let estimated_bit_width = sample_positions let estimated_bit_width = sample_positions
.into_iter() .into_iter()
.map(|pos| { .map(|pos| {
let actual_value = column.get_val(pos); let actual_value = column_reader.seek(pos);
let interpolated_val = line.eval(pos as u64); let interpolated_val = line.eval(pos as u64);
actual_value.wrapping_sub(interpolated_val) actual_value.wrapping_sub(interpolated_val)
}) })

View File

@@ -1,9 +1,10 @@
mod multivalue_start_index;
mod reader; mod reader;
mod writer; mod writer;
pub(crate) use self::multivalue_start_index::MultivalueStartIndex;
pub use self::reader::MultiValuedFastFieldReader; pub use self::reader::MultiValuedFastFieldReader;
pub use self::writer::MultiValuedFastFieldWriter; pub use self::writer::MultiValuedFastFieldWriter;
pub(crate) use self::writer::MultivalueStartIndex;
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {

View File

@@ -0,0 +1,171 @@
use fastfield_codecs::{Column, ColumnReader};
use crate::indexer::doc_id_mapping::DocIdMapping;
pub(crate) struct MultivalueStartIndex<'a, C: Column> {
column: &'a C,
doc_id_map: &'a DocIdMapping,
min_value: u64,
max_value: u64,
}
struct MultivalueStartIndexReader<'a, C: Column> {
seek_head: MultivalueStartIndexIter<'a, C>,
seek_next_id: u64,
}
impl<'a, C: Column> MultivalueStartIndexReader<'a, C> {
fn new(column: &'a C, doc_id_map: &'a DocIdMapping) -> Self {
Self {
seek_head: MultivalueStartIndexIter {
column,
doc_id_map,
new_doc_id: 0,
offset: 0u64,
},
seek_next_id: 0u64,
}
}
fn reset(&mut self) {
self.seek_next_id = 0;
self.seek_head.new_doc_id = 0;
self.seek_head.offset = 0;
}
}
impl<'a, C: Column> ColumnReader for MultivalueStartIndexReader<'a, C> {
fn seek(&mut self, idx: u64) -> u64 {
if self.seek_next_id > idx {
self.reset();
}
let to_skip = idx - self.seek_next_id;
self.seek_next_id = idx + 1;
self.seek_head.nth(to_skip as usize).unwrap()
}
}
impl<'a, C: Column> MultivalueStartIndex<'a, C> {
pub fn new(column: &'a C, doc_id_map: &'a DocIdMapping) -> Self {
assert_eq!(column.num_vals(), doc_id_map.num_old_doc_ids() as u64 + 1);
let iter = MultivalueStartIndexIter::new(column, doc_id_map);
let (min_value, max_value) = tantivy_bitpacker::minmax(iter).unwrap_or((0, 0));
MultivalueStartIndex {
column,
doc_id_map,
min_value,
max_value,
}
}
fn specialized_reader(&self) -> MultivalueStartIndexReader<'a, C> {
MultivalueStartIndexReader::new(self.column, self.doc_id_map)
}
}
impl<'a, C: Column> Column for MultivalueStartIndex<'a, C> {
fn reader(&self) -> Box<dyn ColumnReader + '_> {
Box::new(self.specialized_reader())
}
fn get_val(&self, idx: u64) -> u64 {
let mut reader = self.specialized_reader();
reader.seek(idx)
}
fn min_value(&self) -> u64 {
self.min_value
}
fn max_value(&self) -> u64 {
self.max_value
}
fn num_vals(&self) -> u64 {
(self.doc_id_map.num_new_doc_ids() + 1) as u64
}
fn iter<'b>(&'b self) -> Box<dyn Iterator<Item = u64> + 'b> {
Box::new(MultivalueStartIndexIter::new(self.column, self.doc_id_map))
}
}
struct MultivalueStartIndexIter<'a, C: Column> {
column: &'a C,
doc_id_map: &'a DocIdMapping,
new_doc_id: usize,
offset: u64,
}
impl<'a, C: Column> MultivalueStartIndexIter<'a, C> {
fn new(column: &'a C, doc_id_map: &'a DocIdMapping) -> Self {
Self {
column,
doc_id_map,
new_doc_id: 0,
offset: 0,
}
}
}
impl<'a, C: Column> Iterator for MultivalueStartIndexIter<'a, C> {
type Item = u64;
fn next(&mut self) -> Option<Self::Item> {
if self.new_doc_id > self.doc_id_map.num_new_doc_ids() {
return None;
}
let new_doc_id = self.new_doc_id;
self.new_doc_id += 1;
let start_offset = self.offset;
if new_doc_id < self.doc_id_map.num_new_doc_ids() {
let old_doc = self.doc_id_map.get_old_doc_id(new_doc_id as u32) as u64;
let num_vals_for_doc = self.column.get_val(old_doc + 1) - self.column.get_val(old_doc);
self.offset += num_vals_for_doc;
}
Some(start_offset)
}
}
#[cfg(test)]
mod tests {
use fastfield_codecs::VecColumn;
use super::*;
#[test]
fn test_multivalue_start_index() {
let doc_id_mapping = DocIdMapping::from_new_id_to_old_id(vec![4, 1, 2]);
assert_eq!(doc_id_mapping.num_old_doc_ids(), 5);
let col = VecColumn::from(&[0u64, 3, 5, 10, 12, 16][..]);
let multivalue_start_index = MultivalueStartIndex::new(
&col, // 3, 2, 5, 2, 4
&doc_id_mapping,
);
assert_eq!(multivalue_start_index.num_vals(), 4);
assert_eq!(
multivalue_start_index.iter().collect::<Vec<u64>>(),
vec![0, 4, 6, 11]
); // 4, 2, 5
}
#[test]
fn test_multivalue_get_vals() {
let doc_id_mapping =
DocIdMapping::from_new_id_to_old_id(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
assert_eq!(doc_id_mapping.num_old_doc_ids(), 10);
let col = VecColumn::from(&[0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55][..]);
let multivalue_start_index = MultivalueStartIndex::new(&col, &doc_id_mapping);
assert_eq!(
multivalue_start_index.iter().collect::<Vec<u64>>(),
vec![0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55]
);
assert_eq!(multivalue_start_index.num_vals(), 11);
let mut multivalue_start_index_reader = multivalue_start_index.reader();
assert_eq!(multivalue_start_index_reader.seek(3), 2);
assert_eq!(multivalue_start_index_reader.seek(5), 5);
assert_eq!(multivalue_start_index_reader.seek(8), 21);
assert_eq!(multivalue_start_index_reader.seek(4), 3);
assert_eq!(multivalue_start_index_reader.seek(0), 0);
assert_eq!(multivalue_start_index_reader.seek(10), 55);
}
}

View File

@@ -1,10 +1,11 @@
use std::io; use std::io;
use std::sync::Mutex;
use fastfield_codecs::{Column, MonotonicallyMappableToU64, VecColumn}; use fastfield_codecs::{MonotonicallyMappableToU64, VecColumn};
use fnv::FnvHashMap; use fnv::FnvHashMap;
use crate::fastfield::{value_to_u64, CompositeFastFieldSerializer, FastFieldType}; use crate::fastfield::{
value_to_u64, CompositeFastFieldSerializer, FastFieldType, MultivalueStartIndex,
};
use crate::indexer::doc_id_mapping::DocIdMapping; use crate::indexer::doc_id_mapping::DocIdMapping;
use crate::postings::UnorderedTermId; use crate::postings::UnorderedTermId;
use crate::schema::{Document, Field, Value}; use crate::schema::{Document, Field, Value};
@@ -200,155 +201,3 @@ impl MultiValuedFastFieldWriter {
Ok(()) Ok(())
} }
} }
pub(crate) struct MultivalueStartIndex<'a, C: Column> {
column: &'a C,
doc_id_map: &'a DocIdMapping,
min_max_opt: Mutex<Option<(u64, u64)>>,
random_seeker: Mutex<MultivalueStartIndexRandomSeeker<'a, C>>,
}
struct MultivalueStartIndexRandomSeeker<'a, C: Column> {
seek_head: MultivalueStartIndexIter<'a, C>,
seek_next_id: u64,
}
impl<'a, C: Column> MultivalueStartIndexRandomSeeker<'a, C> {
fn new(column: &'a C, doc_id_map: &'a DocIdMapping) -> Self {
Self {
seek_head: MultivalueStartIndexIter {
column,
doc_id_map,
new_doc_id: 0,
offset: 0u64,
},
seek_next_id: 0u64,
}
}
}
impl<'a, C: Column> MultivalueStartIndex<'a, C> {
pub fn new(column: &'a C, doc_id_map: &'a DocIdMapping) -> Self {
assert_eq!(column.num_vals(), doc_id_map.num_old_doc_ids() as u64 + 1);
MultivalueStartIndex {
column,
doc_id_map,
min_max_opt: Mutex::default(),
random_seeker: Mutex::new(MultivalueStartIndexRandomSeeker::new(column, doc_id_map)),
}
}
fn minmax(&self) -> (u64, u64) {
if let Some((min, max)) = *self.min_max_opt.lock().unwrap() {
return (min, max);
}
let (min, max) = tantivy_bitpacker::minmax(self.iter()).unwrap_or((0u64, 0u64));
*self.min_max_opt.lock().unwrap() = Some((min, max));
(min, max)
}
}
impl<'a, C: Column> Column for MultivalueStartIndex<'a, C> {
fn get_val(&self, idx: u64) -> u64 {
let mut random_seeker_lock = self.random_seeker.lock().unwrap();
if random_seeker_lock.seek_next_id > idx {
*random_seeker_lock =
MultivalueStartIndexRandomSeeker::new(self.column, self.doc_id_map);
}
let to_skip = idx - random_seeker_lock.seek_next_id;
random_seeker_lock.seek_next_id = idx + 1;
random_seeker_lock.seek_head.nth(to_skip as usize).unwrap()
}
fn min_value(&self) -> u64 {
self.minmax().0
}
fn max_value(&self) -> u64 {
self.minmax().1
}
fn num_vals(&self) -> u64 {
(self.doc_id_map.num_new_doc_ids() + 1) as u64
}
fn iter<'b>(&'b self) -> Box<dyn Iterator<Item = u64> + 'b> {
Box::new(MultivalueStartIndexIter::new(self.column, self.doc_id_map))
}
}
struct MultivalueStartIndexIter<'a, C: Column> {
pub column: &'a C,
pub doc_id_map: &'a DocIdMapping,
pub new_doc_id: usize,
pub offset: u64,
}
impl<'a, C: Column> MultivalueStartIndexIter<'a, C> {
fn new(column: &'a C, doc_id_map: &'a DocIdMapping) -> Self {
Self {
column,
doc_id_map,
new_doc_id: 0,
offset: 0,
}
}
}
impl<'a, C: Column> Iterator for MultivalueStartIndexIter<'a, C> {
type Item = u64;
fn next(&mut self) -> Option<Self::Item> {
if self.new_doc_id > self.doc_id_map.num_new_doc_ids() {
return None;
}
let new_doc_id = self.new_doc_id;
self.new_doc_id += 1;
let start_offset = self.offset;
if new_doc_id < self.doc_id_map.num_new_doc_ids() {
let old_doc = self.doc_id_map.get_old_doc_id(new_doc_id as u32) as u64;
let num_vals_for_doc = self.column.get_val(old_doc + 1) - self.column.get_val(old_doc);
self.offset += num_vals_for_doc;
}
Some(start_offset)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_multivalue_start_index() {
let doc_id_mapping = DocIdMapping::from_new_id_to_old_id(vec![4, 1, 2]);
assert_eq!(doc_id_mapping.num_old_doc_ids(), 5);
let col = VecColumn::from(&[0u64, 3, 5, 10, 12, 16][..]);
let multivalue_start_index = MultivalueStartIndex::new(
&col, // 3, 2, 5, 2, 4
&doc_id_mapping,
);
assert_eq!(multivalue_start_index.num_vals(), 4);
assert_eq!(
multivalue_start_index.iter().collect::<Vec<u64>>(),
vec![0, 4, 6, 11]
); // 4, 2, 5
}
#[test]
fn test_multivalue_get_vals() {
let doc_id_mapping =
DocIdMapping::from_new_id_to_old_id(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
assert_eq!(doc_id_mapping.num_old_doc_ids(), 10);
let col = VecColumn::from(&[0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55][..]);
let multivalue_start_index = MultivalueStartIndex::new(&col, &doc_id_mapping);
assert_eq!(
multivalue_start_index.iter().collect::<Vec<u64>>(),
vec![0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55]
);
assert_eq!(multivalue_start_index.num_vals(), 11);
assert_eq!(multivalue_start_index.get_val(3), 2);
assert_eq!(multivalue_start_index.get_val(5), 5);
assert_eq!(multivalue_start_index.get_val(8), 21);
assert_eq!(multivalue_start_index.get_val(4), 3);
assert_eq!(multivalue_start_index.get_val(0), 0);
assert_eq!(multivalue_start_index.get_val(10), 55);
}
}