Compare commits

..

13 Commits

Author SHA1 Message Date
trinity-1686a
45dbc46ef1 add conversion for f32 and fixed precision number 2022-10-05 18:01:26 +02:00
Bruce Mitchener
c2f1c250f9 doc: Remove reference to Searcher pool. (#1598)
The pool of searchers was removed in 23fe73a6 as part of #1411.
2022-10-06 00:04:11 +09:00
PSeitz
2063f1717f Merge pull request #1591 from quickwit-oss/ff_refact
disable linear codec for multivalue values
2022-10-05 19:39:36 +08:00
Pascal Seitz
d742275048 renames 2022-10-05 19:16:49 +08:00
PSeitz
b9f06bc287 Update src/fastfield/multivalued/mod.rs
Co-authored-by: Paul Masurel <paul@quickwit.io>
2022-10-05 19:09:19 +08:00
Pascal Seitz
8b42c4c126 disable linear codec for multivalue value index
don't materialize index column on merge
use simpler chain() variant
2022-10-05 19:09:17 +08:00
PSeitz
7905965800 Merge pull request #1594 from quickwit-oss/flat_map_with_buffer
Removing alloc on all .next() in MultiValueColumn
2022-10-05 18:34:15 +08:00
Pascal Seitz
f60a551890 add flat_map_with_buffer to Iterator trait 2022-10-05 17:44:26 +08:00
Paul Masurel
7baa6e3ec5 Removing alloc on all .next() in MultiValueColumn 2022-10-05 17:12:06 +09:00
PSeitz
2100ec5d26 Merge pull request #1593 from waywardmonkeys/doc-improvements
Documentation improvements.
2022-10-05 15:50:08 +08:00
Bruce Mitchener
b3bf9a5716 Documentation improvements. 2022-10-05 14:18:10 +07:00
Paul Masurel
0dc8c458e0 Flaky unit test. (#1592) 2022-10-05 16:15:48 +09:00
trinity-1686a
5945dbf0bd change format for store to make it faster with small documents (#1569)
* use new format for docstore blocks

* move index to end of block

it makes writing the block faster due to one less memcopy
2022-10-04 09:58:55 +02:00
24 changed files with 551 additions and 173 deletions

View File

@@ -34,7 +34,8 @@ impl<T: Deref<Target = [u8]>> HasLen for T {
} }
} }
const HIGHEST_BIT: u64 = 1 << 63; const HIGHEST_BIT_64: u64 = 1 << 63;
const HIGHEST_BIT_32: u32 = 1 << 31;
/// Maps a `i64` to `u64` /// Maps a `i64` to `u64`
/// ///
@@ -58,13 +59,13 @@ const HIGHEST_BIT: u64 = 1 << 63;
/// The reverse mapping is [`u64_to_i64()`]. /// The reverse mapping is [`u64_to_i64()`].
#[inline] #[inline]
pub fn i64_to_u64(val: i64) -> u64 { pub fn i64_to_u64(val: i64) -> u64 {
(val as u64) ^ HIGHEST_BIT (val as u64) ^ HIGHEST_BIT_64
} }
/// Reverse the mapping given by [`i64_to_u64()`]. /// Reverse the mapping given by [`i64_to_u64()`].
#[inline] #[inline]
pub fn u64_to_i64(val: u64) -> i64 { pub fn u64_to_i64(val: u64) -> i64 {
(val ^ HIGHEST_BIT) as i64 (val ^ HIGHEST_BIT_64) as i64
} }
/// Maps a `f64` to `u64` /// Maps a `f64` to `u64`
@@ -88,7 +89,7 @@ pub fn u64_to_i64(val: u64) -> i64 {
pub fn f64_to_u64(val: f64) -> u64 { pub fn f64_to_u64(val: f64) -> u64 {
let bits = val.to_bits(); let bits = val.to_bits();
if val.is_sign_positive() { if val.is_sign_positive() {
bits ^ HIGHEST_BIT bits ^ HIGHEST_BIT_64
} else { } else {
!bits !bits
} }
@@ -97,26 +98,148 @@ pub fn f64_to_u64(val: f64) -> u64 {
/// Reverse the mapping given by [`f64_to_u64()`]. /// Reverse the mapping given by [`f64_to_u64()`].
#[inline] #[inline]
pub fn u64_to_f64(val: u64) -> f64 { pub fn u64_to_f64(val: u64) -> f64 {
f64::from_bits(if val & HIGHEST_BIT != 0 { f64::from_bits(if val & HIGHEST_BIT_64 != 0 {
val ^ HIGHEST_BIT val ^ HIGHEST_BIT_64
} else { } else {
!val !val
}) })
} }
/// Maps a `f32` to `u64`
///
/// # See also
/// Similar mapping for f64 [`u64_to_f64()`].
#[inline]
pub fn f32_to_u64(val: f32) -> u64 {
let bits = val.to_bits();
let res32 = if val.is_sign_positive() {
bits ^ HIGHEST_BIT_32
} else {
!bits
};
res32 as u64
}
/// Reverse the mapping given by [`f32_to_u64()`].
#[inline]
pub fn u64_to_f32(val: u64) -> f32 {
debug_assert!(val <= 1 << 32);
let val = val as u32;
f32::from_bits(if val & HIGHEST_BIT_32 != 0 {
val ^ HIGHEST_BIT_32
} else {
!val
})
}
/// Maps a `f64` to a fixed point representation.
/// Lower bound is inclusive, upper bound is exclusive.
/// `precision` is the number of bits used to represent the number.
///
/// This is a lossy, affine transformation. All provided values must be finite and non-NaN.
/// Care should be taken to not provide values which would cause loss of precision such as values
/// low enough to get sub-normal numbers, value high enough rounding would cause ±Inf to appear, or
/// a precision larger than 50b.
///
/// # See also
/// The reverse mapping is [`fixed_point_to_f64()`].
#[inline]
pub fn f64_to_fixed_point(val: f64, min: f64, max: f64, precision: u8) -> u64 {
debug_assert!((1..53).contains(&precision));
debug_assert!(min < max);
let delta = max - min;
let mult = (1u64 << precision) as f64;
let bucket_size = delta / mult;
let upper_bound = f64_next_down(max).min(max - bucket_size);
// due to different cases of rounding error, we need to enforce upper_bound to be
// max-bucket_size, but also that upper_bound < max, which is not given for small enough
// bucket_size.
let val = val.clamp(min, upper_bound);
let res = (val - min) / bucket_size;
if res.fract() == 0.5 {
res as u64
} else {
// round down when getting x.5
res.round() as u64
}
}
/// Reverse the mapping given by [`f64_to_fixed_point()`].
#[inline]
pub fn fixed_point_to_f64(val: u64, min: f64, max: f64, precision: u8) -> f64 {
let delta = max - min;
let mult = (1u64 << precision) as f64;
let bucket_size = delta / mult;
bucket_size.mul_add(val as f64, min)
}
// taken from rfc/3173-float-next-up-down, commented out part about nan in infinity as it is not
// needed.
fn f64_next_down(this: f64) -> f64 {
const NEG_TINY_BITS: u64 = 0x8000_0000_0000_0001;
const CLEAR_SIGN_MASK: u64 = 0x7fff_ffff_ffff_ffff;
let bits = this.to_bits();
// if this.is_nan() || bits == f64::NEG_INFINITY.to_bits() {
// return this;
// }
let abs = bits & CLEAR_SIGN_MASK;
let next_bits = if abs == 0 {
NEG_TINY_BITS
} else if bits == abs {
bits - 1
} else {
bits + 1
};
f64::from_bits(next_bits)
}
#[cfg(test)] #[cfg(test)]
pub mod test { pub mod test {
use std::cmp::Ordering;
use proptest::prelude::*; use proptest::prelude::*;
use super::{f64_to_u64, i64_to_u64, u64_to_f64, u64_to_i64, BinarySerializable, FixedSize}; use super::{
f32_to_u64, f64_to_fixed_point, f64_to_u64, fixed_point_to_f64, i64_to_u64, u64_to_f32,
u64_to_f64, u64_to_i64, BinarySerializable, FixedSize,
};
fn test_i64_converter_helper(val: i64) { fn test_i64_converter_helper(val: i64) {
assert_eq!(u64_to_i64(i64_to_u64(val)), val); assert_eq!(u64_to_i64(i64_to_u64(val)), val);
} }
fn test_f64_converter_helper(val: f64) { fn test_f64_converter_helper(val: f64) {
assert_eq!(u64_to_f64(f64_to_u64(val)), val); assert_eq!(u64_to_f64(f64_to_u64(val)).total_cmp(&val), Ordering::Equal);
}
fn test_f32_converter_helper(val: f32) {
assert_eq!(u64_to_f32(f32_to_u64(val)).total_cmp(&val), Ordering::Equal);
}
fn test_fixed_point_converter_helper(val: f64, min: f64, max: f64, precision: u8) {
let bucket_count = 1 << precision;
let packed = f64_to_fixed_point(val, min, max, precision);
assert!(packed < bucket_count, "used to much bits");
let depacked = fixed_point_to_f64(packed, min, max, precision);
let repacked = f64_to_fixed_point(depacked, min, max, precision);
assert_eq!(packed, repacked, "generational loss");
let error = (val.clamp(min, crate::f64_next_down(max)) - depacked).abs();
let expected = (max - min) / (bucket_count as f64);
assert!(
error <= (max - min) / (bucket_count as f64) * 2.0,
"error larger than expected"
);
} }
pub fn fixed_size_test<O: BinarySerializable + FixedSize + Default>() { pub fn fixed_size_test<O: BinarySerializable + FixedSize + Default>() {
@@ -125,12 +248,75 @@ pub mod test {
assert_eq!(buffer.len(), O::SIZE_IN_BYTES); assert_eq!(buffer.len(), O::SIZE_IN_BYTES);
} }
fn fixed_point_bound() -> proptest::num::f64::Any {
proptest::num::f64::POSITIVE
| proptest::num::f64::NEGATIVE
| proptest::num::f64::NORMAL
| proptest::num::f64::ZERO
}
proptest! { proptest! {
#[test] #[test]
fn test_f64_converter_monotonicity_proptest((left, right) in (proptest::num::f64::NORMAL, proptest::num::f64::NORMAL)) { fn test_f64_converter_monotonicity_proptest((left, right) in (proptest::num::f64::ANY, proptest::num::f64::ANY)) {
test_f64_converter_helper(left);
test_f64_converter_helper(right);
let left_u64 = f64_to_u64(left); let left_u64 = f64_to_u64(left);
let right_u64 = f64_to_u64(right); let right_u64 = f64_to_u64(right);
assert_eq!(left_u64 < right_u64, left < right);
assert_eq!(left_u64.cmp(&right_u64), left.total_cmp(&right));
}
#[test]
fn test_f32_converter_monotonicity_proptest((left, right) in (proptest::num::f32::ANY, proptest::num::f32::ANY)) {
test_f32_converter_helper(left);
test_f32_converter_helper(right);
let left_u64 = f32_to_u64(left);
let right_u64 = f32_to_u64(right);
assert_eq!(left_u64.cmp(&right_u64), left.total_cmp(&right));
}
#[test]
fn test_fixed_point_converter_proptest((left, right, min, max, precision) in
(fixed_point_bound(), fixed_point_bound(),
fixed_point_bound(), fixed_point_bound(),
proptest::num::u8::ANY)) {
// convert so all input are legal
let (min, max) = if min < max {
(min, max)
} else if min > max {
(max, min)
} else {
return Ok(()); // equals
};
if 1 > precision || precision >= 50 {
return Ok(());
}
let max_full_precision = 53.0 - precision as f64;
if (max / min).abs().log2().abs() > max_full_precision {
return Ok(());
}
// we will go in subnormal territories => loss of precision
if (((max - min).log2() - precision as f64) as i32) < f64::MIN_EXP {
return Ok(());
}
if (max - min).is_infinite() {
return Ok(());
}
test_fixed_point_converter_helper(left, min, max, precision);
test_fixed_point_converter_helper(right, min, max, precision);
let left_u64 = f64_to_fixed_point(left, min, max, precision);
let right_u64 = f64_to_fixed_point(right, min, max, precision);
if left < right {
assert!(left_u64 <= right_u64);
} else if left > right {
assert!(left_u64 >= right_u64)
}
} }
} }
@@ -168,4 +354,27 @@ pub mod test {
assert!(f64_to_u64(-2.0) < f64_to_u64(1.0)); assert!(f64_to_u64(-2.0) < f64_to_u64(1.0));
assert!(f64_to_u64(-2.0) < f64_to_u64(-1.5)); assert!(f64_to_u64(-2.0) < f64_to_u64(-1.5));
} }
#[test]
fn test_f32_converter() {
test_f32_converter_helper(f32::INFINITY);
test_f32_converter_helper(f32::NEG_INFINITY);
test_f32_converter_helper(0.0);
test_f32_converter_helper(-0.0);
test_f32_converter_helper(1.0);
test_f32_converter_helper(-1.0);
}
#[test]
fn test_f32_order() {
assert!(!(f32_to_u64(f32::NEG_INFINITY)..f32_to_u64(f32::INFINITY))
.contains(&f32_to_u64(f32::NAN))); // nan is not a number
assert!(f32_to_u64(1.5) > f32_to_u64(1.0)); // same exponent, different mantissa
assert!(f32_to_u64(2.0) > f32_to_u64(1.0)); // same mantissa, different exponent
assert!(f32_to_u64(2.0) > f32_to_u64(1.5)); // different exponent and mantissa
assert!(f32_to_u64(1.0) > f32_to_u64(-1.0)); // pos > neg
assert!(f32_to_u64(-1.5) < f32_to_u64(-1.0));
assert!(f32_to_u64(-2.0) < f32_to_u64(1.0));
assert!(f32_to_u64(-2.0) < f32_to_u64(-1.5));
}
} }

View File

@@ -452,7 +452,7 @@ fn intermediate_buckets_to_final_buckets_fill_gaps(
histogram_req: &HistogramAggregation, histogram_req: &HistogramAggregation,
sub_aggregation: &AggregationsInternal, sub_aggregation: &AggregationsInternal,
) -> crate::Result<Vec<BucketEntry>> { ) -> crate::Result<Vec<BucketEntry>> {
// Generate the the full list of buckets without gaps. // Generate the full list of buckets without gaps.
// //
// The bounds are the min max from the current buckets, optionally extended by // The bounds are the min max from the current buckets, optionally extended by
// extended_bounds from the request // extended_bounds from the request

View File

@@ -15,12 +15,11 @@ use crate::termdict::TermDictionary;
/// ///
/// It is safe to delete the segment associated with /// It is safe to delete the segment associated with
/// an `InvertedIndexReader`. As long as it is open, /// an `InvertedIndexReader`. As long as it is open,
/// the `FileSlice` it is relying on should /// the [`FileSlice`] it is relying on should
/// stay available. /// stay available.
/// ///
///
/// `InvertedIndexReader` are created by calling /// `InvertedIndexReader` are created by calling
/// the `SegmentReader`'s [`.inverted_index(...)`] method /// [`SegmentReader::inverted_index()`](crate::SegmentReader::inverted_index).
pub struct InvertedIndexReader { pub struct InvertedIndexReader {
termdict: TermDictionary, termdict: TermDictionary,
postings_file_slice: FileSlice, postings_file_slice: FileSlice,
@@ -75,7 +74,7 @@ impl InvertedIndexReader {
/// ///
/// This is useful for enumerating through a list of terms, /// This is useful for enumerating through a list of terms,
/// and consuming the associated posting lists while avoiding /// and consuming the associated posting lists while avoiding
/// reallocating a `BlockSegmentPostings`. /// reallocating a [`BlockSegmentPostings`].
/// ///
/// # Warning /// # Warning
/// ///
@@ -96,7 +95,7 @@ impl InvertedIndexReader {
/// Returns a block postings given a `Term`. /// Returns a block postings given a `Term`.
/// This method is for an advanced usage only. /// This method is for an advanced usage only.
/// ///
/// Most user should prefer using `read_postings` instead. /// Most users should prefer using [`Self::read_postings()`] instead.
pub fn read_block_postings( pub fn read_block_postings(
&self, &self,
term: &Term, term: &Term,
@@ -110,7 +109,7 @@ impl InvertedIndexReader {
/// Returns a block postings given a `term_info`. /// Returns a block postings given a `term_info`.
/// This method is for an advanced usage only. /// This method is for an advanced usage only.
/// ///
/// Most user should prefer using `read_postings` instead. /// Most users should prefer using [`Self::read_postings()`] instead.
pub fn read_block_postings_from_terminfo( pub fn read_block_postings_from_terminfo(
&self, &self,
term_info: &TermInfo, term_info: &TermInfo,
@@ -130,7 +129,7 @@ impl InvertedIndexReader {
/// Returns a posting object given a `term_info`. /// Returns a posting object given a `term_info`.
/// This method is for an advanced usage only. /// This method is for an advanced usage only.
/// ///
/// Most user should prefer using `read_postings` instead. /// Most users should prefer using [`Self::read_postings()`] instead.
pub fn read_postings_from_terminfo( pub fn read_postings_from_terminfo(
&self, &self,
term_info: &TermInfo, term_info: &TermInfo,
@@ -164,12 +163,12 @@ impl InvertedIndexReader {
/// or `None` if the term has never been encountered and indexed. /// or `None` if the term has never been encountered and indexed.
/// ///
/// If the field was not indexed with the indexing options that cover /// If the field was not indexed with the indexing options that cover
/// the requested options, the returned `SegmentPostings` the method does not fail /// the requested options, the returned [`SegmentPostings`] the method does not fail
/// and returns a `SegmentPostings` with as much information as possible. /// and returns a `SegmentPostings` with as much information as possible.
/// ///
/// For instance, requesting `IndexRecordOption::Freq` for a /// For instance, requesting [`IndexRecordOption::WithFreqs`] for a
/// `TextIndexingOptions` that does not index position will return a `SegmentPostings` /// [`TextOptions`](crate::schema::TextOptions) that does not index position
/// with `DocId`s and frequencies. /// will return a [`SegmentPostings`] with `DocId`s and frequencies.
pub fn read_postings( pub fn read_postings(
&self, &self,
term: &Term, term: &Term,
@@ -211,7 +210,7 @@ impl InvertedIndexReader {
/// Returns a block postings given a `Term`. /// Returns a block postings given a `Term`.
/// This method is for an advanced usage only. /// This method is for an advanced usage only.
/// ///
/// Most user should prefer using `read_postings` instead. /// Most users should prefer using [`Self::read_postings()`] instead.
pub async fn warm_postings( pub async fn warm_postings(
&self, &self,
term: &Term, term: &Term,

View File

@@ -216,10 +216,10 @@ impl SegmentReader {
/// term dictionary associated with a specific field, /// term dictionary associated with a specific field,
/// and opening the posting list associated with any term. /// and opening the posting list associated with any term.
/// ///
/// If the field is not marked as index, a warn is logged and an empty `InvertedIndexReader` /// If the field is not marked as index, a warning is logged and an empty `InvertedIndexReader`
/// is returned. /// is returned.
/// Similarly if the field is marked as indexed but no term has been indexed for the given /// Similarly, if the field is marked as indexed but no term has been indexed for the given
/// index. an empty `InvertedIndexReader` is returned (but no warning is logged). /// index, an empty `InvertedIndexReader` is returned (but no warning is logged).
pub fn inverted_index(&self, field: Field) -> crate::Result<Arc<InvertedIndexReader>> { pub fn inverted_index(&self, field: Field) -> crate::Result<Arc<InvertedIndexReader>> {
if let Some(inv_idx_reader) = self if let Some(inv_idx_reader) = self
.inv_idx_reader_cache .inv_idx_reader_cache

View File

@@ -13,8 +13,8 @@ use crate::directory::OwnedBytes;
/// By contract, whatever happens to the directory file, as long as a FileHandle /// By contract, whatever happens to the directory file, as long as a FileHandle
/// is alive, the data associated with it cannot be altered or destroyed. /// is alive, the data associated with it cannot be altered or destroyed.
/// ///
/// The underlying behavior is therefore specific to the `Directory` that created it. /// The underlying behavior is therefore specific to the [`Directory`](crate::Directory) that
/// Despite its name, a `FileSlice` may or may not directly map to an actual file /// created it. Despite its name, a [`FileSlice`] may or may not directly map to an actual file
/// on the filesystem. /// on the filesystem.
#[async_trait] #[async_trait]

View File

@@ -610,9 +610,11 @@ mod tests {
assert!(num_segments <= 4); assert!(num_segments <= 4);
let num_components_except_deletes_and_tempstore = let num_components_except_deletes_and_tempstore =
crate::core::SegmentComponent::iterator().len() - 2; crate::core::SegmentComponent::iterator().len() - 2;
assert_eq!( let num_mmapped = mmap_directory.get_cache_info().mmapped.len();
num_segments * num_components_except_deletes_and_tempstore, assert!(
mmap_directory.get_cache_info().mmapped.len() num_mmapped <= num_segments * num_components_except_deletes_and_tempstore,
"Expected at most {} mmapped files, got {num_mmapped}",
num_segments * num_components_except_deletes_and_tempstore
); );
} }
// This test failed on CI. The last Mmap is dropped from the merging thread so there might // This test failed on CI. The last Mmap is dropped from the merging thread so there might

View File

@@ -26,7 +26,7 @@ pub use self::alive_bitset::{intersect_alive_bitsets, write_alive_bitset, AliveB
pub use self::bytes::{BytesFastFieldReader, BytesFastFieldWriter}; pub use self::bytes::{BytesFastFieldReader, BytesFastFieldWriter};
pub use self::error::{FastFieldNotAvailableError, Result}; pub use self::error::{FastFieldNotAvailableError, Result};
pub use self::facet_reader::FacetReader; pub use self::facet_reader::FacetReader;
pub(crate) use self::multivalued::MultivalueStartIndex; pub(crate) use self::multivalued::{get_fastfield_codecs_for_multivalue, MultivalueStartIndex};
pub use self::multivalued::{MultiValuedFastFieldReader, MultiValuedFastFieldWriter}; pub use self::multivalued::{MultiValuedFastFieldReader, MultiValuedFastFieldWriter};
pub use self::readers::FastFieldReaders; pub use self::readers::FastFieldReaders;
pub(crate) use self::readers::{type_and_cardinality, FastType}; pub(crate) use self::readers::{type_and_cardinality, FastType};

View File

@@ -1,10 +1,22 @@
mod reader; mod reader;
mod writer; mod writer;
use fastfield_codecs::FastFieldCodecType;
pub use self::reader::MultiValuedFastFieldReader; pub use self::reader::MultiValuedFastFieldReader;
pub use self::writer::MultiValuedFastFieldWriter; pub use self::writer::MultiValuedFastFieldWriter;
pub(crate) use self::writer::MultivalueStartIndex; pub(crate) use self::writer::MultivalueStartIndex;
/// The valid codecs for multivalue values excludes the linear interpolation codec.
///
/// This limitation is only valid for the values, not the offset index of the multivalue index.
pub(crate) fn get_fastfield_codecs_for_multivalue() -> [FastFieldCodecType; 2] {
[
FastFieldCodecType::Bitpacked,
FastFieldCodecType::BlockwiseLinear,
]
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use proptest::strategy::Strategy; use proptest::strategy::Strategy;

View File

@@ -3,6 +3,7 @@ use std::io;
use fastfield_codecs::{Column, MonotonicallyMappableToU64, VecColumn}; use fastfield_codecs::{Column, MonotonicallyMappableToU64, VecColumn};
use fnv::FnvHashMap; use fnv::FnvHashMap;
use super::get_fastfield_codecs_for_multivalue;
use crate::fastfield::{value_to_u64, CompositeFastFieldSerializer, FastFieldType}; use crate::fastfield::{value_to_u64, CompositeFastFieldSerializer, FastFieldType};
use crate::indexer::doc_id_mapping::DocIdMapping; use crate::indexer::doc_id_mapping::DocIdMapping;
use crate::postings::UnorderedTermId; use crate::postings::UnorderedTermId;
@@ -194,7 +195,12 @@ impl MultiValuedFastFieldWriter {
} }
} }
let col = VecColumn::from(&values[..]); let col = VecColumn::from(&values[..]);
serializer.create_auto_detect_u64_fast_field_with_idx(self.field, col, 1)?; serializer.create_auto_detect_u64_fast_field_with_idx_and_codecs(
self.field,
col,
1,
&get_fastfield_codecs_for_multivalue(),
)?;
} }
Ok(()) Ok(())
} }
@@ -251,15 +257,11 @@ fn iter_remapped_multivalue_index<'a, C: Column>(
column: &'a C, column: &'a C,
) -> impl Iterator<Item = u64> + 'a { ) -> impl Iterator<Item = u64> + 'a {
let mut offset = 0; let mut offset = 0;
let offsets = doc_id_map std::iter::once(0).chain(doc_id_map.iter_old_doc_ids().map(move |old_doc| {
.iter_old_doc_ids() let num_vals_for_doc = column.get_val(old_doc as u64 + 1) - column.get_val(old_doc as u64);
.map(move |old_doc| { offset += num_vals_for_doc;
let num_vals_for_doc = offset as u64
column.get_val(old_doc as u64 + 1) - column.get_val(old_doc as u64); }))
offset += num_vals_for_doc;
offset
});
std::iter::once(0u64).chain(offsets)
} }
#[cfg(test)] #[cfg(test)]

View File

@@ -70,6 +70,20 @@ impl CompositeFastFieldSerializer {
Ok(()) Ok(())
} }
/// Serialize data into a new u64 fast field. The best compression codec of the the provided
/// will be chosen.
pub fn create_auto_detect_u64_fast_field_with_idx_and_codecs<T: MonotonicallyMappableToU64>(
&mut self,
field: Field,
fastfield_accessor: impl Column<T>,
idx: usize,
codec_types: &[FastFieldCodecType],
) -> io::Result<()> {
let field_write = self.composite_write.for_field_with_idx(field, idx);
fastfield_codecs::serialize(fastfield_accessor, field_write, codec_types)?;
Ok(())
}
/// Start serializing a new [u8] fast field. Use the returned writer to write data into the /// Start serializing a new [u8] fast field. Use the returned writer to write data into the
/// bytes field. To associate the bytes with documents a seperate index must be created on /// bytes field. To associate the bytes with documents a seperate index must be created on
/// index 0. See bytes/writer.rs::serialize for an example. /// index 0. See bytes/writer.rs::serialize for an example.

View File

@@ -0,0 +1,69 @@
pub struct FlatMapWithBuffer<T, F, Iter> {
buffer: Vec<T>,
fill_buffer: F,
underlying_it: Iter,
}
impl<T, F, Iter, I> Iterator for FlatMapWithBuffer<T, F, Iter>
where
Iter: Iterator<Item = I>,
F: Fn(I, &mut Vec<T>),
{
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
while self.buffer.is_empty() {
let next_el = self.underlying_it.next()?;
(self.fill_buffer)(next_el, &mut self.buffer);
// We will pop elements, so we reverse the buffer first.
self.buffer.reverse();
}
self.buffer.pop()
}
}
pub trait FlatMapWithBufferIter: Iterator {
/// Function similar to `flat_map`, but allows reusing a shared `Vec`.
fn flat_map_with_buffer<F, T>(self, fill_buffer: F) -> FlatMapWithBuffer<T, F, Self>
where
F: Fn(Self::Item, &mut Vec<T>),
Self: Sized,
{
FlatMapWithBuffer {
buffer: Vec::with_capacity(10),
fill_buffer,
underlying_it: self,
}
}
}
impl<T: ?Sized> FlatMapWithBufferIter for T where T: Iterator {}
#[cfg(test)]
mod tests {
use crate::indexer::flat_map_with_buffer::FlatMapWithBufferIter;
#[test]
fn test_flat_map_with_buffer_empty() {
let mut empty_iter = std::iter::empty::<usize>()
.flat_map_with_buffer(|_val: usize, _buffer: &mut Vec<usize>| {});
assert!(empty_iter.next().is_none());
}
#[test]
fn test_flat_map_with_buffer_simple() {
let vals: Vec<usize> = (1..5)
.flat_map_with_buffer(|val: usize, buffer: &mut Vec<usize>| buffer.extend(0..val))
.collect();
assert_eq!(&[0, 0, 1, 0, 1, 2, 0, 1, 2, 3], &vals[..]);
}
#[test]
fn test_flat_map_filling_no_elements_does_not_stop_iterator() {
let vals: Vec<usize> = [2, 0, 0, 3]
.into_iter()
.flat_map_with_buffer(|val: usize, buffer: &mut Vec<usize>| buffer.extend(0..val))
.collect();
assert_eq!(&[0, 1, 0, 1, 2], &vals[..]);
}
}

View File

@@ -6,11 +6,13 @@ use fastfield_codecs::VecColumn;
use itertools::Itertools; use itertools::Itertools;
use measure_time::debug_time; use measure_time::debug_time;
use super::sorted_doc_id_multivalue_column::RemappedDocIdMultiValueIndexColumn;
use crate::core::{Segment, SegmentReader}; use crate::core::{Segment, SegmentReader};
use crate::docset::{DocSet, TERMINATED}; use crate::docset::{DocSet, TERMINATED};
use crate::error::DataCorruption; use crate::error::DataCorruption;
use crate::fastfield::{ use crate::fastfield::{
AliveBitSet, Column, CompositeFastFieldSerializer, MultiValueLength, MultiValuedFastFieldReader, get_fastfield_codecs_for_multivalue, AliveBitSet, Column, CompositeFastFieldSerializer,
MultiValueLength, MultiValuedFastFieldReader,
}; };
use crate::fieldnorm::{FieldNormReader, FieldNormReaders, FieldNormsSerializer, FieldNormsWriter}; use crate::fieldnorm::{FieldNormReader, FieldNormReaders, FieldNormsSerializer, FieldNormsWriter};
use crate::indexer::doc_id_mapping::{expect_field_id_for_sort_field, SegmentDocIdMapping}; use crate::indexer::doc_id_mapping::{expect_field_id_for_sort_field, SegmentDocIdMapping};
@@ -423,26 +425,16 @@ impl IndexMerger {
// Creating the index file to point into the data, generic over `BytesFastFieldReader` and // Creating the index file to point into the data, generic over `BytesFastFieldReader` and
// `MultiValuedFastFieldReader` // `MultiValuedFastFieldReader`
// //
fn write_1_n_fast_field_idx_generic<T: MultiValueLength>( fn write_1_n_fast_field_idx_generic<T: MultiValueLength + Send + Sync>(
field: Field, field: Field,
fast_field_serializer: &mut CompositeFastFieldSerializer, fast_field_serializer: &mut CompositeFastFieldSerializer,
doc_id_mapping: &SegmentDocIdMapping, doc_id_mapping: &SegmentDocIdMapping,
reader_and_field_accessors: &[(&SegmentReader, T)], segment_and_ff_readers: &[(&SegmentReader, T)],
) -> crate::Result<()> { ) -> crate::Result<()> {
// TODO Use `Column` implementation instead let column =
RemappedDocIdMultiValueIndexColumn::new(segment_and_ff_readers, doc_id_mapping);
let mut offsets = Vec::with_capacity(doc_id_mapping.len()); fast_field_serializer.create_auto_detect_u64_fast_field(field, column)?;
let mut offset = 0;
for old_doc_addr in doc_id_mapping.iter_old_doc_addrs() {
let reader = &reader_and_field_accessors[old_doc_addr.segment_ord as usize].1;
offsets.push(offset);
offset += reader.get_len(old_doc_addr.doc_id) as u64;
}
offsets.push(offset);
let fastfield_accessor = VecColumn::from(&offsets[..]);
fast_field_serializer.create_auto_detect_u64_fast_field(field, fastfield_accessor)?;
Ok(()) Ok(())
} }
/// Returns the fastfield index (index for the data, not the data). /// Returns the fastfield index (index for the data, not the data).
@@ -452,7 +444,7 @@ impl IndexMerger {
fast_field_serializer: &mut CompositeFastFieldSerializer, fast_field_serializer: &mut CompositeFastFieldSerializer,
doc_id_mapping: &SegmentDocIdMapping, doc_id_mapping: &SegmentDocIdMapping,
) -> crate::Result<()> { ) -> crate::Result<()> {
let reader_ordinal_and_field_accessors = self let segment_and_ff_readers = self
.readers .readers
.iter() .iter()
.map(|reader| { .map(|reader| {
@@ -471,7 +463,7 @@ impl IndexMerger {
field, field,
fast_field_serializer, fast_field_serializer,
doc_id_mapping, doc_id_mapping,
&reader_ordinal_and_field_accessors, &segment_and_ff_readers,
) )
} }
@@ -520,7 +512,12 @@ impl IndexMerger {
} }
let col = VecColumn::from(&vals[..]); let col = VecColumn::from(&vals[..]);
fast_field_serializer.create_auto_detect_u64_fast_field_with_idx(field, col, 1)?; fast_field_serializer.create_auto_detect_u64_fast_field_with_idx_and_codecs(
field,
col,
1,
&get_fastfield_codecs_for_multivalue(),
)?;
} }
Ok(()) Ok(())
} }
@@ -565,10 +562,11 @@ impl IndexMerger {
let fastfield_accessor = let fastfield_accessor =
RemappedDocIdMultiValueColumn::new(&self.readers, doc_id_mapping, field); RemappedDocIdMultiValueColumn::new(&self.readers, doc_id_mapping, field);
fast_field_serializer.create_auto_detect_u64_fast_field_with_idx( fast_field_serializer.create_auto_detect_u64_fast_field_with_idx_and_codecs(
field, field,
fastfield_accessor, fastfield_accessor,
1, 1,
&get_fastfield_codecs_for_multivalue(),
)?; )?;
Ok(()) Ok(())
@@ -580,7 +578,7 @@ impl IndexMerger {
fast_field_serializer: &mut CompositeFastFieldSerializer, fast_field_serializer: &mut CompositeFastFieldSerializer,
doc_id_mapping: &SegmentDocIdMapping, doc_id_mapping: &SegmentDocIdMapping,
) -> crate::Result<()> { ) -> crate::Result<()> {
let reader_and_field_accessors = self let segment_and_ff_readers = self
.readers .readers
.iter() .iter()
.map(|reader| { .map(|reader| {
@@ -591,17 +589,17 @@ impl IndexMerger {
(reader, bytes_reader) (reader, bytes_reader)
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
Self::write_1_n_fast_field_idx_generic( Self::write_1_n_fast_field_idx_generic(
field, field,
fast_field_serializer, fast_field_serializer,
doc_id_mapping, doc_id_mapping,
&reader_and_field_accessors, &segment_and_ff_readers,
)?; )?;
let mut serialize_vals = fast_field_serializer.new_bytes_fast_field(field); let mut serialize_vals = fast_field_serializer.new_bytes_fast_field(field);
for old_doc_addr in doc_id_mapping.iter_old_doc_addrs() { for old_doc_addr in doc_id_mapping.iter_old_doc_addrs() {
let bytes_reader = &reader_and_field_accessors[old_doc_addr.segment_ord as usize].1; let bytes_reader = &segment_and_ff_readers[old_doc_addr.segment_ord as usize].1;
let val = bytes_reader.get_bytes(old_doc_addr.doc_id); let val = bytes_reader.get_bytes(old_doc_addr.doc_id);
serialize_vals.write_all(val)?; serialize_vals.write_all(val)?;
} }

View File

@@ -3,6 +3,7 @@ pub mod delete_queue;
pub mod demuxer; pub mod demuxer;
pub mod doc_id_mapping; pub mod doc_id_mapping;
mod doc_opstamp_mapping; mod doc_opstamp_mapping;
mod flat_map_with_buffer;
pub mod index_writer; pub mod index_writer;
mod index_writer_status; mod index_writer_status;
mod json_term_writer; mod json_term_writer;

View File

@@ -133,15 +133,15 @@ fn merge(
/// Advanced: Merges a list of segments from different indices in a new index. /// Advanced: Merges a list of segments from different indices in a new index.
/// ///
/// Returns `TantivyError` if the the indices list is empty or their /// Returns `TantivyError` if the indices list is empty or their
/// schemas don't match. /// schemas don't match.
/// ///
/// `output_directory`: is assumed to be empty. /// `output_directory`: is assumed to be empty.
/// ///
/// # Warning /// # Warning
/// This function does NOT check or take the `IndexWriter` is running. It is not /// This function does NOT check or take the `IndexWriter` is running. It is not
/// meant to work if you have an IndexWriter running for the origin indices, or /// meant to work if you have an `IndexWriter` running for the origin indices, or
/// the destination Index. /// the destination `Index`.
#[doc(hidden)] #[doc(hidden)]
pub fn merge_indices<T: Into<Box<dyn Directory>>>( pub fn merge_indices<T: Into<Box<dyn Directory>>>(
indices: &[Index], indices: &[Index],
@@ -179,15 +179,15 @@ pub fn merge_indices<T: Into<Box<dyn Directory>>>(
/// Advanced: Merges a list of segments from different indices in a new index. /// Advanced: Merges a list of segments from different indices in a new index.
/// Additional you can provide a delete bitset for each segment to ignore doc_ids. /// Additional you can provide a delete bitset for each segment to ignore doc_ids.
/// ///
/// Returns `TantivyError` if the the indices list is empty or their /// Returns `TantivyError` if the indices list is empty or their
/// schemas don't match. /// schemas don't match.
/// ///
/// `output_directory`: is assumed to be empty. /// `output_directory`: is assumed to be empty.
/// ///
/// # Warning /// # Warning
/// This function does NOT check or take the `IndexWriter` is running. It is not /// This function does NOT check or take the `IndexWriter` is running. It is not
/// meant to work if you have an IndexWriter running for the origin indices, or /// meant to work if you have an `IndexWriter` running for the origin indices, or
/// the destination Index. /// the destination `Index`.
#[doc(hidden)] #[doc(hidden)]
pub fn merge_filtered_segments<T: Into<Box<dyn Directory>>>( pub fn merge_filtered_segments<T: Into<Box<dyn Directory>>>(
segments: &[Segment], segments: &[Segment],

View File

@@ -2,10 +2,11 @@ use std::cmp;
use fastfield_codecs::Column; use fastfield_codecs::Column;
use crate::fastfield::MultiValuedFastFieldReader; use super::flat_map_with_buffer::FlatMapWithBufferIter;
use crate::fastfield::{MultiValueLength, MultiValuedFastFieldReader};
use crate::indexer::doc_id_mapping::SegmentDocIdMapping; use crate::indexer::doc_id_mapping::SegmentDocIdMapping;
use crate::schema::Field; use crate::schema::Field;
use crate::SegmentReader; use crate::{DocAddress, SegmentReader};
pub(crate) struct RemappedDocIdMultiValueColumn<'a> { pub(crate) struct RemappedDocIdMultiValueColumn<'a> {
doc_id_mapping: &'a SegmentDocIdMapping, doc_id_mapping: &'a SegmentDocIdMapping,
@@ -74,11 +75,9 @@ impl<'a> Column for RemappedDocIdMultiValueColumn<'a> {
Box::new( Box::new(
self.doc_id_mapping self.doc_id_mapping
.iter_old_doc_addrs() .iter_old_doc_addrs()
.flat_map(|old_doc_addr| { .flat_map_with_buffer(|old_doc_addr: DocAddress, buffer| {
let ff_reader = &self.fast_field_readers[old_doc_addr.segment_ord as usize]; let ff_reader = &self.fast_field_readers[old_doc_addr.segment_ord as usize];
let mut vals = Vec::new(); ff_reader.get_vals(old_doc_addr.doc_id, buffer);
ff_reader.get_vals(old_doc_addr.doc_id, &mut vals);
vals.into_iter()
}), }),
) )
} }
@@ -94,3 +93,76 @@ impl<'a> Column for RemappedDocIdMultiValueColumn<'a> {
self.num_vals self.num_vals
} }
} }
pub(crate) struct RemappedDocIdMultiValueIndexColumn<'a, T: MultiValueLength> {
doc_id_mapping: &'a SegmentDocIdMapping,
multi_value_length_readers: Vec<&'a T>,
min_value: u64,
max_value: u64,
num_vals: u64,
}
impl<'a, T: MultiValueLength> RemappedDocIdMultiValueIndexColumn<'a, T> {
pub(crate) fn new(
segment_and_ff_readers: &'a [(&'a SegmentReader, T)],
doc_id_mapping: &'a SegmentDocIdMapping,
) -> Self {
// We go through a complete first pass to compute the minimum and the
// maximum value and initialize our Column.
let mut num_vals = 0;
let min_value = 0;
let mut max_value = 0;
let mut multi_value_length_readers = Vec::with_capacity(segment_and_ff_readers.len());
for segment_and_ff_reader in segment_and_ff_readers {
let segment_reader = segment_and_ff_reader.0;
let multi_value_length_reader = &segment_and_ff_reader.1;
if !segment_reader.has_deletes() {
max_value += multi_value_length_reader.get_total_len();
} else {
for doc in segment_reader.doc_ids_alive() {
max_value += multi_value_length_reader.get_len(doc);
}
}
num_vals += segment_reader.num_docs() as u64;
multi_value_length_readers.push(multi_value_length_reader);
}
Self {
doc_id_mapping,
multi_value_length_readers,
min_value,
max_value,
num_vals,
}
}
}
impl<'a, T: MultiValueLength + Send + Sync> Column for RemappedDocIdMultiValueIndexColumn<'a, T> {
fn get_val(&self, _pos: u64) -> u64 {
unimplemented!()
}
fn iter(&self) -> Box<dyn Iterator<Item = u64> + '_> {
let mut offset = 0;
Box::new(
std::iter::once(0).chain(self.doc_id_mapping.iter_old_doc_addrs().map(
move |old_doc_addr| {
let ff_reader =
&self.multi_value_length_readers[old_doc_addr.segment_ord as usize];
offset += ff_reader.get_len(old_doc_addr.doc_id);
offset
},
)),
)
}
fn min_value(&self) -> u64 {
self.min_value
}
fn max_value(&self) -> u64 {
self.max_value
}
fn num_vals(&self) -> u64 {
self.num_vals
}
}

View File

@@ -47,11 +47,11 @@ impl<'a> Iterator for VInt32Reader<'a> {
} }
} }
/// Recorder is in charge of recording relevant information about /// `Recorder` is in charge of recording relevant information about
/// the presence of a term in a document. /// the presence of a term in a document.
/// ///
/// Depending on the `TextIndexingOptions` associated with the /// Depending on the [`TextOptions`](crate::schema::TextOptions) associated
/// field, the recorder may records /// with the field, the recorder may record:
/// * the document frequency /// * the document frequency
/// * the document id /// * the document id
/// * the term frequency /// * the term frequency

View File

@@ -199,7 +199,7 @@ impl TermHashMap {
/// `update` create a new entry for a given key if it does not exists /// `update` create a new entry for a given key if it does not exists
/// or updates the existing entry. /// or updates the existing entry.
/// ///
/// The actual logic for this update is define in the the `updater` /// The actual logic for this update is define in the `updater`
/// argument. /// argument.
/// ///
/// If the key is not present, `updater` will receive `None` and /// If the key is not present, `updater` will receive `None` and

View File

@@ -263,8 +263,7 @@ impl InnerIndexReader {
/// It controls when a new version of the index should be loaded and lends /// It controls when a new version of the index should be loaded and lends
/// you instances of `Searcher` for the last loaded version. /// you instances of `Searcher` for the last loaded version.
/// ///
/// `Clone` does not clone the different pool of searcher. `IndexReader` /// `IndexReader` just wraps an `Arc`.
/// just wraps an `Arc`.
#[derive(Clone)] #[derive(Clone)]
pub struct IndexReader { pub struct IndexReader {
inner: Arc<InnerIndexReader>, inner: Arc<InnerIndexReader>,
@@ -294,9 +293,6 @@ impl IndexReader {
/// ///
/// This method should be called every single time a search /// This method should be called every single time a search
/// query is performed. /// query is performed.
/// The searchers are taken from a pool of `num_searchers` searchers.
/// If no searcher is available
/// this may block.
/// ///
/// The same searcher must be used for a given query, as it ensures /// The same searcher must be used for a given query, as it ensures
/// the use of a consistent segment set. /// the use of a consistent segment set.

View File

@@ -3,7 +3,7 @@ use std::ops::BitOr;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use super::flags::{FastFlag, IndexedFlag, SchemaFlagList, StoredFlag}; use super::flags::{FastFlag, IndexedFlag, SchemaFlagList, StoredFlag};
/// Define how an a bytes field should be handled by tantivy. /// Define how a bytes field should be handled by tantivy.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)] #[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(from = "BytesOptionsDeser")] #[serde(from = "BytesOptionsDeser")]
pub struct BytesOptions { pub struct BytesOptions {

View File

@@ -28,10 +28,10 @@
//! use tantivy::schema::*; //! use tantivy::schema::*;
//! let mut schema_builder = Schema::builder(); //! let mut schema_builder = Schema::builder();
//! let title_options = TextOptions::default() //! let title_options = TextOptions::default()
//! .set_stored() //! .set_stored()
//! .set_indexing_options(TextFieldIndexing::default() //! .set_indexing_options(TextFieldIndexing::default()
//! .set_tokenizer("default") //! .set_tokenizer("default")
//! .set_index_option(IndexRecordOption::WithFreqsAndPositions)); //! .set_index_option(IndexRecordOption::WithFreqsAndPositions));
//! schema_builder.add_text_field("title", title_options); //! schema_builder.add_text_field("title", title_options);
//! let schema = schema_builder.build(); //! let schema = schema_builder.build();
//! ``` //! ```
@@ -45,8 +45,7 @@
//! In the first phase, the ability to search for documents by the given field is determined by the //! In the first phase, the ability to search for documents by the given field is determined by the
//! [`IndexRecordOption`] of our [`TextOptions`]. //! [`IndexRecordOption`] of our [`TextOptions`].
//! //!
//! The effect of each possible setting is described more in detail //! The effect of each possible setting is described more in detail in [`TextOptions`].
//! [`TextIndexingOptions`](enum.TextIndexingOptions.html).
//! //!
//! On the other hand setting the field as stored or not determines whether the field should be //! On the other hand setting the field as stored or not determines whether the field should be
//! returned when [`Searcher::doc()`](crate::Searcher::doc) is called. //! returned when [`Searcher::doc()`](crate::Searcher::doc) is called.
@@ -60,8 +59,8 @@
//! use tantivy::schema::*; //! use tantivy::schema::*;
//! let mut schema_builder = Schema::builder(); //! let mut schema_builder = Schema::builder();
//! let num_stars_options = NumericOptions::default() //! let num_stars_options = NumericOptions::default()
//! .set_stored() //! .set_stored()
//! .set_indexed(); //! .set_indexed();
//! schema_builder.add_u64_field("num_stars", num_stars_options); //! schema_builder.add_u64_field("num_stars", num_stars_options);
//! let schema = schema_builder.build(); //! let schema = schema_builder.build();
//! ``` //! ```
@@ -79,8 +78,8 @@
//! For convenience, it is possible to define your field indexing options by combining different //! For convenience, it is possible to define your field indexing options by combining different
//! flags using the `|` operator. //! flags using the `|` operator.
//! //!
//! For instance, a schema containing the two fields defined in the example above could be rewritten //! For instance, a schema containing the two fields defined in the example above could be
//! : //! rewritten:
//! //!
//! ``` //! ```
//! use tantivy::schema::*; //! use tantivy::schema::*;

View File

@@ -54,27 +54,27 @@ impl Term {
term term
} }
/// Builds a term given a field, and a u64-value /// Builds a term given a field, and a `u64`-value
pub fn from_field_u64(field: Field, val: u64) -> Term { pub fn from_field_u64(field: Field, val: u64) -> Term {
Term::from_fast_value(field, &val) Term::from_fast_value(field, &val)
} }
/// Builds a term given a field, and a i64-value /// Builds a term given a field, and a `i64`-value
pub fn from_field_i64(field: Field, val: i64) -> Term { pub fn from_field_i64(field: Field, val: i64) -> Term {
Term::from_fast_value(field, &val) Term::from_fast_value(field, &val)
} }
/// Builds a term given a field, and a f64-value /// Builds a term given a field, and a `f64`-value
pub fn from_field_f64(field: Field, val: f64) -> Term { pub fn from_field_f64(field: Field, val: f64) -> Term {
Term::from_fast_value(field, &val) Term::from_fast_value(field, &val)
} }
/// Builds a term given a field, and a f64-value /// Builds a term given a field, and a `bool`-value
pub fn from_field_bool(field: Field, val: bool) -> Term { pub fn from_field_bool(field: Field, val: bool) -> Term {
Term::from_fast_value(field, &val) Term::from_fast_value(field, &val)
} }
/// Builds a term given a field, and a DateTime value /// Builds a term given a field, and a `DateTime` value
pub fn from_field_date(field: Field, val: DateTime) -> Term { pub fn from_field_date(field: Field, val: DateTime) -> Term {
Term::from_fast_value(field, &val.truncate(DatePrecision::Seconds)) Term::from_fast_value(field, &val.truncate(DatePrecision::Seconds))
} }
@@ -130,7 +130,7 @@ impl Term {
self.set_fast_value(val); self.set_fast_value(val);
} }
/// Sets a `i64` value in the term. /// Sets a `DateTime` value in the term.
pub fn set_date(&mut self, date: DateTime) { pub fn set_date(&mut self, date: DateTime) {
self.set_fast_value(date); self.set_fast_value(date);
} }

View File

@@ -47,7 +47,9 @@ impl TextOptions {
/// unchanged. The "default" tokenizer will store the terms as lower case and this will be /// unchanged. The "default" tokenizer will store the terms as lower case and this will be
/// reflected in the dictionary. /// reflected in the dictionary.
/// ///
/// The original text can be retrieved via `ord_to_term` from the dictionary. /// The original text can be retrieved via
/// [`TermDictionary::ord_to_term()`](crate::termdict::TermDictionary::ord_to_term)
/// from the dictionary.
#[must_use] #[must_use]
pub fn set_fast(mut self) -> TextOptions { pub fn set_fast(mut self) -> TextOptions {
self.fast = true; self.fast = true;

View File

@@ -1,10 +1,10 @@
use std::io; use std::io;
use std::iter::Sum; use std::iter::Sum;
use std::ops::AddAssign; use std::ops::{AddAssign, Range};
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use common::{BinarySerializable, HasLen, VInt}; use common::{BinarySerializable, HasLen};
use lru::LruCache; use lru::LruCache;
use ownedbytes::OwnedBytes; use ownedbytes::OwnedBytes;
@@ -140,10 +140,10 @@ impl StoreReader {
self.cache.stats() self.cache.stats()
} }
/// Get checkpoint for DocId. The checkpoint can be used to load a block containing the /// Get checkpoint for `DocId`. The checkpoint can be used to load a block containing the
/// document. /// document.
/// ///
/// Advanced API. In most cases use [get](Self::get). /// Advanced API. In most cases use [`get`](Self::get).
fn block_checkpoint(&self, doc_id: DocId) -> crate::Result<Checkpoint> { fn block_checkpoint(&self, doc_id: DocId) -> crate::Result<Checkpoint> {
self.skip_index.seek(doc_id).ok_or_else(|| { self.skip_index.seek(doc_id).ok_or_else(|| {
crate::TantivyError::InvalidArgument(format!("Failed to lookup Doc #{}.", doc_id)) crate::TantivyError::InvalidArgument(format!("Failed to lookup Doc #{}.", doc_id))
@@ -160,7 +160,7 @@ impl StoreReader {
/// Loads and decompresses a block. /// Loads and decompresses a block.
/// ///
/// Advanced API. In most cases use [get](Self::get). /// Advanced API. In most cases use [`get`](Self::get).
fn read_block(&self, checkpoint: &Checkpoint) -> io::Result<Block> { fn read_block(&self, checkpoint: &Checkpoint) -> io::Result<Block> {
let cache_key = checkpoint.byte_range.start; let cache_key = checkpoint.byte_range.start;
if let Some(block) = self.cache.get_from_cache(cache_key) { if let Some(block) = self.cache.get_from_cache(cache_key) {
@@ -205,28 +205,21 @@ impl StoreReader {
/// Advanced API. /// Advanced API.
/// ///
/// In most cases use [get_document_bytes](Self::get_document_bytes). /// In most cases use [`get_document_bytes`](Self::get_document_bytes).
fn get_document_bytes_from_block( fn get_document_bytes_from_block(
block: OwnedBytes, block: OwnedBytes,
doc_id: DocId, doc_id: DocId,
checkpoint: &Checkpoint, checkpoint: &Checkpoint,
) -> crate::Result<OwnedBytes> { ) -> crate::Result<OwnedBytes> {
let mut cursor = &block[..]; let doc_pos = doc_id - checkpoint.doc_range.start;
let cursor_len_before = cursor.len();
for _ in checkpoint.doc_range.start..doc_id {
let doc_length = VInt::deserialize(&mut cursor)?.val() as usize;
cursor = &cursor[doc_length..];
}
let doc_length = VInt::deserialize(&mut cursor)?.val() as usize; let range = block_read_index(&block, doc_pos)?;
let start_pos = cursor_len_before - cursor.len(); Ok(block.slice(range))
let end_pos = cursor_len_before - cursor.len() + doc_length;
Ok(block.slice(start_pos..end_pos))
} }
/// Iterator over all Documents in their order as they are stored in the doc store. /// Iterator over all Documents in their order as they are stored in the doc store.
/// Use this, if you want to extract all Documents from the doc store. /// Use this, if you want to extract all Documents from the doc store.
/// The alive_bitset has to be forwarded from the `SegmentReader` or the results maybe wrong. /// The `alive_bitset` has to be forwarded from the `SegmentReader` or the results may be wrong.
pub fn iter<'a: 'b, 'b>( pub fn iter<'a: 'b, 'b>(
&'b self, &'b self,
alive_bitset: Option<&'a AliveBitSet>, alive_bitset: Option<&'a AliveBitSet>,
@@ -237,9 +230,9 @@ impl StoreReader {
}) })
} }
/// Iterator over all RawDocuments in their order as they are stored in the doc store. /// Iterator over all raw Documents in their order as they are stored in the doc store.
/// Use this, if you want to extract all Documents from the doc store. /// Use this, if you want to extract all Documents from the doc store.
/// The alive_bitset has to be forwarded from the `SegmentReader` or the results maybe wrong. /// The `alive_bitset` has to be forwarded from the `SegmentReader` or the results may be wrong.
pub(crate) fn iter_raw<'a: 'b, 'b>( pub(crate) fn iter_raw<'a: 'b, 'b>(
&'b self, &'b self,
alive_bitset: Option<&'a AliveBitSet>, alive_bitset: Option<&'a AliveBitSet>,
@@ -254,9 +247,7 @@ impl StoreReader {
let mut curr_block = curr_checkpoint let mut curr_block = curr_checkpoint
.as_ref() .as_ref()
.map(|checkpoint| self.read_block(checkpoint).map_err(|e| e.kind())); // map error in order to enable cloning .map(|checkpoint| self.read_block(checkpoint).map_err(|e| e.kind())); // map error in order to enable cloning
let mut block_start_pos = 0; let mut doc_pos = 0;
let mut num_skipped = 0;
let mut reset_block_pos = false;
(0..last_doc_id) (0..last_doc_id)
.filter_map(move |doc_id| { .filter_map(move |doc_id| {
// filter_map is only used to resolve lifetime issues between the two closures on // filter_map is only used to resolve lifetime issues between the two closures on
@@ -268,24 +259,19 @@ impl StoreReader {
curr_block = curr_checkpoint curr_block = curr_checkpoint
.as_ref() .as_ref()
.map(|checkpoint| self.read_block(checkpoint).map_err(|e| e.kind())); .map(|checkpoint| self.read_block(checkpoint).map_err(|e| e.kind()));
reset_block_pos = true; doc_pos = 0;
num_skipped = 0;
} }
let alive = alive_bitset.map_or(true, |bitset| bitset.is_alive(doc_id)); let alive = alive_bitset.map_or(true, |bitset| bitset.is_alive(doc_id));
if alive { let res = if alive {
let ret = Some((curr_block.clone(), num_skipped, reset_block_pos)); Some((curr_block.clone(), doc_pos))
// the map block will move over the num_skipped, so we reset to 0
num_skipped = 0;
reset_block_pos = false;
ret
} else { } else {
// we keep the number of skipped documents to move forward in the map block
num_skipped += 1;
None None
} };
doc_pos += 1;
res
}) })
.map(move |(block, num_skipped, reset_block_pos)| { .map(move |(block, doc_pos)| {
let block = block let block = block
.ok_or_else(|| { .ok_or_else(|| {
DataCorruption::comment_only( DataCorruption::comment_only(
@@ -296,30 +282,9 @@ impl StoreReader {
.map_err(|error_kind| { .map_err(|error_kind| {
std::io::Error::new(error_kind, "error when reading block in doc store") std::io::Error::new(error_kind, "error when reading block in doc store")
})?; })?;
// this flag is set, when filter_map moved to the next block
if reset_block_pos {
block_start_pos = 0;
}
let mut cursor = &block[block_start_pos..];
let mut pos = 0;
// move forward 1 doc + num_skipped in block and return length of current doc
let doc_length = loop {
let doc_length = VInt::deserialize(&mut cursor)?.val() as usize;
let num_bytes_read = block[block_start_pos..].len() - cursor.len();
block_start_pos += num_bytes_read;
pos += 1; let range = block_read_index(&block, doc_pos)?;
if pos == num_skipped + 1 { Ok(block.slice(range))
break doc_length;
} else {
block_start_pos += doc_length;
cursor = &block[block_start_pos..];
}
};
let end_pos = block_start_pos + doc_length;
let doc_bytes = block.slice(block_start_pos..end_pos);
block_start_pos = end_pos;
Ok(doc_bytes)
}) })
} }
@@ -329,11 +294,33 @@ impl StoreReader {
} }
} }
fn block_read_index(block: &[u8], doc_pos: u32) -> crate::Result<Range<usize>> {
let doc_pos = doc_pos as usize;
let size_of_u32 = std::mem::size_of::<u32>();
let index_len_pos = block.len() - size_of_u32;
let index_len = u32::deserialize(&mut &block[index_len_pos..])? as usize;
if doc_pos > index_len {
return Err(crate::TantivyError::InternalError(
"Attempted to read doc from wrong block".to_owned(),
));
}
let index_start = block.len() - (index_len + 1) * size_of_u32;
let index = &block[index_start..index_start + index_len * size_of_u32];
let start_offset = u32::deserialize(&mut &index[doc_pos * size_of_u32..])? as usize;
let end_offset = u32::deserialize(&mut &index[(doc_pos + 1) * size_of_u32..])
.unwrap_or(index_start as u32) as usize;
Ok(start_offset..end_offset)
}
#[cfg(feature = "quickwit")] #[cfg(feature = "quickwit")]
impl StoreReader { impl StoreReader {
/// Advanced API. /// Advanced API.
/// ///
/// In most cases use [get_async](Self::get_async) /// In most cases use [`get_async`](Self::get_async)
/// ///
/// Loads and decompresses a block asynchronously. /// Loads and decompresses a block asynchronously.
async fn read_block_async(&self, checkpoint: &Checkpoint) -> crate::AsyncIoResult<Block> { async fn read_block_async(&self, checkpoint: &Checkpoint) -> crate::AsyncIoResult<Block> {
@@ -357,14 +344,14 @@ impl StoreReader {
Ok(decompressed_block) Ok(decompressed_block)
} }
/// Fetches a document asynchronously. /// Reads raw bytes of a given document asynchronously.
pub async fn get_document_bytes_async(&self, doc_id: DocId) -> crate::Result<OwnedBytes> { pub async fn get_document_bytes_async(&self, doc_id: DocId) -> crate::Result<OwnedBytes> {
let checkpoint = self.block_checkpoint(doc_id)?; let checkpoint = self.block_checkpoint(doc_id)?;
let block = self.read_block_async(&checkpoint).await?; let block = self.read_block_async(&checkpoint).await?;
Self::get_document_bytes_from_block(block, doc_id, &checkpoint) Self::get_document_bytes_from_block(block, doc_id, &checkpoint)
} }
/// Reads raw bytes of a given document. Async version of [get](Self::get). /// Fetches a document asynchronously. Async version of [`get`](Self::get).
pub async fn get_async(&self, doc_id: DocId) -> crate::Result<Document> { pub async fn get_async(&self, doc_id: DocId) -> crate::Result<Document> {
let mut doc_bytes = self.get_document_bytes_async(doc_id).await?; let mut doc_bytes = self.get_document_bytes_async(doc_id).await?;
Ok(Document::deserialize(&mut doc_bytes)?) Ok(Document::deserialize(&mut doc_bytes)?)
@@ -427,7 +414,7 @@ mod tests {
assert_eq!(store.cache_stats().cache_hits, 1); assert_eq!(store.cache_stats().cache_hits, 1);
assert_eq!(store.cache_stats().cache_misses, 2); assert_eq!(store.cache_stats().cache_misses, 2);
assert_eq!(store.cache.peek_lru(), Some(9210)); assert_eq!(store.cache.peek_lru(), Some(11163));
Ok(()) Ok(())
} }

View File

@@ -1,6 +1,6 @@
use std::io::{self, Write}; use std::io::{self, Write};
use common::{BinarySerializable, VInt}; use common::BinarySerializable;
use super::compressors::Compressor; use super::compressors::Compressor;
use super::StoreReader; use super::StoreReader;
@@ -22,6 +22,7 @@ pub struct StoreWriter {
num_docs_in_current_block: DocId, num_docs_in_current_block: DocId,
intermediary_buffer: Vec<u8>, intermediary_buffer: Vec<u8>,
current_block: Vec<u8>, current_block: Vec<u8>,
doc_pos: Vec<u32>,
block_compressor: BlockCompressor, block_compressor: BlockCompressor,
} }
@@ -42,6 +43,7 @@ impl StoreWriter {
block_size, block_size,
num_docs_in_current_block: 0, num_docs_in_current_block: 0,
intermediary_buffer: Vec::new(), intermediary_buffer: Vec::new(),
doc_pos: Vec::new(),
current_block: Vec::new(), current_block: Vec::new(),
block_compressor, block_compressor,
}) })
@@ -53,12 +55,17 @@ impl StoreWriter {
/// The memory used (inclusive childs) /// The memory used (inclusive childs)
pub fn mem_usage(&self) -> usize { pub fn mem_usage(&self) -> usize {
self.intermediary_buffer.capacity() + self.current_block.capacity() self.intermediary_buffer.capacity()
+ self.current_block.capacity()
+ self.doc_pos.capacity() * std::mem::size_of::<u32>()
} }
/// Checks if the current block is full, and if so, compresses and flushes it. /// Checks if the current block is full, and if so, compresses and flushes it.
fn check_flush_block(&mut self) -> io::Result<()> { fn check_flush_block(&mut self) -> io::Result<()> {
if self.current_block.len() > self.block_size { // this does not count the VInt storing the index lenght itself, but it is negligible in
// front of everything else.
let index_len = self.doc_pos.len() * std::mem::size_of::<usize>();
if self.current_block.len() + index_len > self.block_size {
self.send_current_block_to_compressor()?; self.send_current_block_to_compressor()?;
} }
Ok(()) Ok(())
@@ -70,8 +77,19 @@ impl StoreWriter {
if self.current_block.is_empty() { if self.current_block.is_empty() {
return Ok(()); return Ok(());
} }
let size_of_u32 = std::mem::size_of::<u32>();
self.current_block
.reserve((self.doc_pos.len() + 1) * size_of_u32);
for pos in self.doc_pos.iter() {
pos.serialize(&mut self.current_block)?;
}
(self.doc_pos.len() as u32).serialize(&mut self.current_block)?;
self.block_compressor self.block_compressor
.compress_block_and_write(&self.current_block, self.num_docs_in_current_block)?; .compress_block_and_write(&self.current_block, self.num_docs_in_current_block)?;
self.doc_pos.clear();
self.current_block.clear(); self.current_block.clear();
self.num_docs_in_current_block = 0; self.num_docs_in_current_block = 0;
Ok(()) Ok(())
@@ -87,8 +105,7 @@ impl StoreWriter {
// calling store bytes would be preferable for code reuse, but then we can't use // calling store bytes would be preferable for code reuse, but then we can't use
// intermediary_buffer due to the borrow checker // intermediary_buffer due to the borrow checker
// a new buffer costs ~1% indexing performance // a new buffer costs ~1% indexing performance
let doc_num_bytes = self.intermediary_buffer.len(); self.doc_pos.push(self.current_block.len() as u32);
VInt(doc_num_bytes as u64).serialize_into_vec(&mut self.current_block);
self.current_block self.current_block
.write_all(&self.intermediary_buffer[..])?; .write_all(&self.intermediary_buffer[..])?;
self.num_docs_in_current_block += 1; self.num_docs_in_current_block += 1;
@@ -101,8 +118,7 @@ impl StoreWriter {
/// The document id is implicitly the current number /// The document id is implicitly the current number
/// of documents. /// of documents.
pub fn store_bytes(&mut self, serialized_document: &[u8]) -> io::Result<()> { pub fn store_bytes(&mut self, serialized_document: &[u8]) -> io::Result<()> {
let doc_num_bytes = serialized_document.len(); self.doc_pos.push(self.current_block.len() as u32);
VInt(doc_num_bytes as u64).serialize_into_vec(&mut self.current_block);
self.current_block.extend_from_slice(serialized_document); self.current_block.extend_from_slice(serialized_document);
self.num_docs_in_current_block += 1; self.num_docs_in_current_block += 1;
self.check_flush_block()?; self.check_flush_block()?;