Compare commits

...

21 Commits

Author SHA1 Message Date
pascal
74f37f045d Avoid scoring buffered unions when scores are ignored
BufferedUnionScorer can use score_doc during refill only when the score combiner needs scores. DoNothingCombiner now advertises that scoring is unnecessary, preserving the no-score path for count collectors and avoiding wasted score_doc calls.

Add a regression test that verifies DoNothingCombiner does not invoke score() or score_doc() while counting a buffered union.
2026-05-31 21:58:29 +02:00
pascal
72cca113cd cargo fmt, remove impl 2026-05-31 21:48:03 +02:00
pascal
672bf45235 Clarify postings copy variable names 2026-05-31 20:50:35 +02:00
pascal
33ef167441 Share BM25 fieldnorm caches per thread
Reuse BM25 TF normalization caches for weights with the same average fieldnorm using a bounded thread-local LRU. This avoids recomputing and duplicating the cache for many terms on the same field without adding cross-thread contention.
2026-05-31 19:13:18 +02:00
pascal
bf8b263f16 Optimize buffered union scoring with block refills
Add horizon-limited buffering APIs for docsets and scorers so buffered union can refill from block-oriented postings while preserving term frequencies. This lets term scorers score buffered docs directly and reduces per-document refill overhead for dense unions.
2026-05-31 19:13:18 +02:00
pascal
24a97dbe69 Split buffered refill from scorer removal 2026-05-31 12:20:07 +02:00
pascal
34fec8b23e Defer terminated scorer removal during buffered refill 2026-05-31 11:53:11 +02:00
Paul Masurel
46b3fb9ed3 Relying on upstream version of datasketch and stop using HLL 4. (#2936)
We were relying on a fork for:

a bugfix in LIST serialization
a better API exposing a new Coupon type, required for caching coupons.
We also stop using HLL8 in hope to fix
https://datadoghq.atlassian.net/browse/CLOUDPREM-625

Co-authored-by: Paul Masurel <paul.masurel@datadoghq.com>
2026-05-19 13:29:35 +02:00
trinity-1686a
fbe620b9b4 Merge pull request #2933 from quickwit-oss/1686a/sstable-opt
optimise sstable index access pattern
2026-05-19 11:43:17 +02:00
trinity-1686a
95d8a3989a cr 2026-05-19 11:38:48 +02:00
trinity-1686a
ea61a68db4 skip sstable index binary search when ordinal is in same block 2026-05-16 11:35:38 +02:00
trinity-1686a
c367df37c1 refactor sstable index 2026-05-16 11:30:02 +02:00
Mohammad Dashti
d99a5d4e91 Rename validate_aggregation_fields to validate_aggregation_fields_exist
Applies @PSeitz's review suggestion to make the function name more
descriptive of what it checks. Also adds a doc note clarifying why
validation is opt-in rather than enforced by default.
2026-05-16 15:45:20 +08:00
Mohammad Dashti
2de6f075ce Fixed the example 2026-05-16 15:45:20 +08:00
Mohammad Dashti
18080067c7 Applied PR comment:
I would move it outside of the aggregation. You can fetch the fields from the aggregation request and do a validation in a helper function
2026-05-16 15:45:20 +08:00
Mohammad Dashti
95db7d2e5c Revert "Revert all impl."
This reverts commit d5e0991549a05bf80f19f853f7689ad69f96e7e5.
2026-05-16 15:45:20 +08:00
Mohammad Dashti
fc017c4c74 Applied PR comments. 2026-05-16 15:45:20 +08:00
Mohammad Dashti
141c91d028 Added a flag: strict_validation 2026-05-16 15:45:20 +08:00
Mohammad Dashti
36a83e7c1a Fixed agg validation 2026-05-16 15:45:20 +08:00
jinhelin
be11f8a6a1 Fix opening positions file error 2026-05-14 15:55:59 +08:00
dependabot[bot]
4305e4029e Update binggan requirement from 0.16.1 to 0.17.0
Updates the requirements on [binggan](https://github.com/pseitz/binggan) to permit the latest version.
- [Changelog](https://github.com/PSeitz/binggan/blob/main/CHANGELOG.md)
- [Commits](https://github.com/pseitz/binggan/commits)

---
updated-dependencies:
- dependency-name: binggan
  dependency-version: 0.17.0
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-05-12 15:10:20 +08:00
29 changed files with 1152 additions and 378 deletions

View File

@@ -65,7 +65,7 @@ tantivy-bitpacker = { version = "0.10", path = "./bitpacker" }
common = { version = "0.11", path = "./common/", package = "tantivy-common" }
tokenizer-api = { version = "0.7", path = "./tokenizer-api", package = "tantivy-tokenizer-api" }
sketches-ddsketch = { version = "0.4", features = ["use_serde"] }
datasketches = { git = "https://github.com/fulmicoton-dd/datasketches-rust", rev = "7635fb8" }
datasketches = { version = "0.3.0", features = ["hll"] }
futures-util = { version = "0.3.28", optional = true }
futures-channel = { version = "0.3.28", optional = true }
fnv = "1.0.7"
@@ -75,7 +75,7 @@ typetag = "0.2.21"
winapi = "0.3.9"
[dev-dependencies]
binggan = "0.16.1"
binggan = "0.17.0"
rand = "0.9"
maplit = "1.0.2"
matches = "0.1.9"

View File

@@ -23,7 +23,7 @@ downcast-rs = "2.0.1"
proptest = "1"
more-asserts = "0.3.1"
rand = "0.9"
binggan = "0.16.1"
binggan = "0.17.0"
[[bench]]
name = "bench_merge"

View File

@@ -19,6 +19,6 @@ time = { version = "0.3.47", features = ["serde-well-known"] }
serde = { version = "1.0.136", features = ["derive"] }
[dev-dependencies]
binggan = "0.16.1"
binggan = "0.17.0"
proptest = "1.0.0"
rand = "0.9"

View File

@@ -115,6 +115,71 @@ pub fn get_fast_field_names(aggs: &Aggregations) -> HashSet<String> {
fast_field_names
}
/// Validates that all fields referenced in the aggregation request exist in the schema
/// and are configured as fast fields.
///
/// This is a convenience function for upfront validation before executing aggregations.
/// Returns an error if any field doesn't exist or is not a fast field.
///
/// Validation is intentionally opt-in rather than baked into aggregation execution: the
/// default lenient behavior (returning empty results for missing fields) supports
/// schema evolution and federated queries where the same request runs against segments
/// or indices with different schemas.
///
/// # Example
/// ```
/// use tantivy::aggregation::agg_req::{Aggregations, validate_aggregation_fields_exist};
/// use tantivy::schema::{Schema, FAST};
/// use tantivy::Index;
///
/// # fn main() -> tantivy::Result<()> {
/// // Create a simple index
/// let mut schema_builder = Schema::builder();
/// schema_builder.add_f64_field("price", FAST);
/// let schema = schema_builder.build();
/// let index = Index::create_in_ram(schema);
///
/// // Parse aggregation request
/// let agg_req: Aggregations = serde_json::from_str(r#"{
/// "avg_price": { "avg": { "field": "price" } }
/// }"#)?;
///
/// let reader = index.reader()?;
/// let searcher = reader.searcher();
///
/// // Validate fields before executing
/// for segment_reader in searcher.segment_readers() {
/// validate_aggregation_fields_exist(&agg_req, segment_reader)?;
/// }
/// # Ok(())
/// # }
/// ```
pub fn validate_aggregation_fields_exist(
aggs: &Aggregations,
reader: &crate::SegmentReader,
) -> crate::Result<()> {
let field_names = get_fast_field_names(aggs);
let schema = reader.schema();
for field_name in field_names {
// Check if the field is either directly in the schema or could be part of a json field
// present in the schema, and verify it's a fast field.
if let Some((field, _path)) = schema.find_field(&field_name) {
let field_type = schema.get_field_entry(field).field_type();
if !field_type.is_fast() {
return Err(crate::TantivyError::SchemaError(format!(
"Field '{}' is not a fast field. Aggregations require fast fields.",
field_name
)));
}
} else {
return Err(crate::TantivyError::FieldNotFound(field_name));
}
}
Ok(())
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
/// All aggregation types.
pub enum AggregationVariants {

View File

@@ -1436,3 +1436,46 @@ fn test_aggregation_on_json_object_mixed_numerical_segments() {
)
);
}
#[test]
fn test_aggregation_field_validation_helper() {
// Test the standalone validation helper function for field validation
let index = get_test_index_2_segments(false).unwrap();
let reader = index.reader().unwrap();
let searcher = reader.searcher();
let segment_reader = searcher.segment_reader(0);
// Test with invalid field
let agg_req: Aggregations = serde_json::from_str(
r#"{
"avg_test": {
"avg": { "field": "nonexistent_field" }
}
}"#,
)
.unwrap();
let result =
crate::aggregation::agg_req::validate_aggregation_fields_exist(&agg_req, segment_reader);
assert!(result.is_err());
match result {
Err(crate::TantivyError::FieldNotFound(field_name)) => {
assert_eq!(field_name, "nonexistent_field");
}
_ => panic!("Expected FieldNotFound error, got: {:?}", result),
}
// Test with valid field
let agg_req: Aggregations = serde_json::from_str(
r#"{
"avg_test": {
"avg": { "field": "score" }
}
}"#,
)
.unwrap();
let result =
crate::aggregation::agg_req::validate_aggregation_fields_exist(&agg_req, segment_reader);
assert!(result.is_ok());
}

View File

@@ -166,8 +166,12 @@ impl CouponCache {
let should_use_dense =
highest_term_ord < 1_000_000u64 || highest_term_ord < num_terms as u64 * 3u64;
if should_use_dense {
let mut coupon_map: Vec<Coupon> = vec![Coupon::EMPTY; highest_term_ord as usize + 1];
for (term_ord, coupon) in term_ords.into_iter().zip(coupons.into_iter()) {
// We don't really care about the value here. We will populate all the values we will
// read anyway.
let uninitialized_coupon = Coupon::from_hash(0);
let mut coupon_map: Vec<Coupon> =
vec![uninitialized_coupon; highest_term_ord as usize + 1];
for (term_ord, coupon) in term_ords.into_iter().zip(coupons) {
coupon_map[term_ord as usize] = coupon;
}
CouponCache::Dense {
@@ -821,7 +825,7 @@ impl<'de> Deserialize<'de> for CardinalityCollector {
impl CardinalityCollector {
fn new(salt: u8) -> Self {
Self {
sketch: HllSketch::new(LG_K, HllType::Hll4),
sketch: HllSketch::new(LG_K, HllType::Hll8),
salt,
}
}
@@ -852,7 +856,7 @@ impl CardinalityCollector {
let mut union = HllUnion::new(LG_K);
union.update(&self.sketch);
union.update(&right.sketch);
self.sketch = union.to_sketch(HllType::Hll4);
self.sketch = union.to_sketch(HllType::Hll8);
Ok(())
}
}

View File

@@ -138,6 +138,31 @@ pub trait DocSet: Send {
buffer.len()
}
/// Fills a given mutable buffer with the next doc ids smaller than `horizon`.
///
/// Unlike [`DocSet::fill_buffer`], this method must not advance past a doc id greater than or
/// equal to `horizon`.
fn fill_buffer_up_to(
&mut self,
horizon: DocId,
buffer: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN],
) -> usize {
if self.doc() == TERMINATED {
return 0;
}
for (pos, buffer_val) in buffer.iter_mut().enumerate() {
let doc = self.doc();
if doc >= horizon {
return pos;
}
*buffer_val = doc;
if self.advance() == TERMINATED {
return pos + 1;
}
}
buffer.len()
}
/// Returns the current document
/// Right after creating a new `DocSet`, the docset points to the first document.
///
@@ -251,6 +276,14 @@ impl DocSet for &mut dyn DocSet {
(**self).fill_buffer(buffer)
}
fn fill_buffer_up_to(
&mut self,
horizon: DocId,
buffer: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN],
) -> usize {
(**self).fill_buffer_up_to(horizon, buffer)
}
fn fill_bitset_block(
&mut self,
min_doc: DocId,

View File

@@ -6,6 +6,7 @@ use common::{ByteCount, HasLen};
use fnv::FnvHashMap;
use itertools::Itertools;
use crate::directory::error::OpenReadError;
use crate::directory::{CompositeFile, FileSlice};
use crate::error::DataCorruption;
use crate::fastfield::{intersect_alive_bitsets, AliveBitSet, FacetReader, FastFieldReaders};
@@ -159,12 +160,10 @@ impl SegmentReader {
let postings_file = segment.open_read(SegmentComponent::Postings)?;
let postings_composite = CompositeFile::open(&postings_file)?;
let positions_composite = {
if let Ok(positions_file) = segment.open_read(SegmentComponent::Positions) {
CompositeFile::open(&positions_file)?
} else {
CompositeFile::empty()
}
let positions_composite = match segment.open_read(SegmentComponent::Positions) {
Ok(positions_file) => CompositeFile::open(&positions_file)?,
Err(OpenReadError::FileDoesNotExist(_)) => CompositeFile::empty(),
Err(open_read_error) => return Err(open_read_error.into()),
};
let schema = segment.schema();

View File

@@ -240,6 +240,42 @@ impl BlockSegmentPostings {
self.freq_decoder.output_array()
}
pub(crate) fn copy_docs_and_term_freqs(
&self,
block_offset: usize,
horizon: DocId,
docs: &mut [DocId],
term_freqs: &mut [u32],
) -> usize {
debug_assert_eq!(docs.len(), term_freqs.len());
let block_docs = self.docs();
let remaining_docs_in_block = block_docs.len().saturating_sub(block_offset);
let max_len = remaining_docs_in_block.min(docs.len());
if max_len == 0 {
return 0;
}
let source_docs = &block_docs[block_offset..block_offset + max_len];
let len = if source_docs[max_len - 1] < horizon {
max_len
} else {
source_docs
.iter()
.position(|&doc| doc >= horizon)
.unwrap_or(max_len)
};
docs[..len].copy_from_slice(&source_docs[..len]);
let block_freqs = self.freq_output_array();
if block_freqs.len() >= block_offset + len {
term_freqs[..len].copy_from_slice(&block_freqs[block_offset..block_offset + len]);
} else {
term_freqs[..len].fill(1);
}
len
}
/// Return the frequency at index `idx` of the block.
#[inline]
pub fn freq(&self, idx: usize) -> u32 {

View File

@@ -532,6 +532,16 @@ pub(crate) mod tests {
fn score(&mut self) -> Score {
self.0.score()
}
#[inline]
fn can_score_doc(&self) -> bool {
self.0.can_score_doc()
}
#[inline]
fn score_doc(&mut self, doc: DocId, term_freq: u32) -> Score {
self.0.score_doc(doc, term_freq)
}
}
pub fn test_skip_against_unoptimized<F: Fn() -> Box<dyn DocSet>>(

View File

@@ -1,6 +1,6 @@
use common::HasLen;
use crate::docset::DocSet;
use crate::docset::{DocSet, COLLECT_BLOCK_BUFFER_LEN};
use crate::fastfield::AliveBitSet;
use crate::positions::PositionReader;
use crate::postings::compression::COMPRESSION_BLOCK_SIZE;
@@ -151,6 +151,34 @@ impl SegmentPostings {
position_reader,
}
}
pub(crate) fn fill_buffer_up_to_with_term_freqs(
&mut self,
horizon: DocId,
docs: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN],
term_freqs: &mut [u32; COLLECT_BLOCK_BUFFER_LEN],
) -> usize {
let mut num_elems = 0;
while num_elems < COLLECT_BLOCK_BUFFER_LEN && self.doc() < horizon {
let copied = self.block_cursor.copy_docs_and_term_freqs(
self.cur,
horizon,
&mut docs[num_elems..],
&mut term_freqs[num_elems..],
);
if copied == 0 {
break;
}
num_elems += copied;
self.cur += copied;
if self.cur == COMPRESSION_BLOCK_SIZE {
self.cur = 0;
self.block_cursor.advance();
}
}
num_elems
}
}
impl DocSet for SegmentPostings {

View File

@@ -109,6 +109,16 @@ impl Scorer for AllScorer {
fn score(&mut self) -> Score {
1.0
}
#[inline]
fn can_score_doc(&self) -> bool {
true
}
#[inline]
fn score_doc(&mut self, _doc: DocId, _term_freq: u32) -> Score {
1.0
}
}
#[cfg(test)]

View File

@@ -1,5 +1,9 @@
use std::cell::RefCell;
use std::num::NonZeroUsize;
use std::sync::Arc;
use lru::LruCache;
use crate::fieldnorm::FieldNormReader;
use crate::query::Explanation;
use crate::schema::Field;
@@ -59,7 +63,9 @@ fn cached_tf_component(fieldnorm: u32, average_fieldnorm: Score) -> Score {
K1 * (1.0 - B + B * fieldnorm as Score / average_fieldnorm)
}
fn compute_tf_cache(average_fieldnorm: Score) -> Arc<[Score; 256]> {
const BM25_TF_CACHE_CAPACITY: usize = 64;
fn compute_tf_cache_uncached(average_fieldnorm: Score) -> Arc<[Score; 256]> {
let mut cache: [Score; 256] = [0.0; 256];
for (fieldnorm_id, cache_mut) in cache.iter_mut().enumerate() {
let fieldnorm = FieldNormReader::id_to_fieldnorm(fieldnorm_id as u8);
@@ -68,6 +74,36 @@ fn compute_tf_cache(average_fieldnorm: Score) -> Arc<[Score; 256]> {
Arc::new(cache)
}
thread_local! {
static TF_CACHES: RefCell<LruCache<u32, Arc<[Score; 256]>>> = RefCell::new(LruCache::new(
NonZeroUsize::new(BM25_TF_CACHE_CAPACITY).unwrap(),
));
}
/// The cache is shared across all [Bm25Weight] with the same average fieldnorm on the same thread.
/// It is stored in a thread local LRU cache.
///
/// On one query all terms on the same field will share the same average fieldnorm, and thus the
/// same cache. This will lower cache pressure.
///
/// Even between queries (on the same thread), the cache will be reused, which allows the cache to
/// better learn the memory address of the cache and access patterns.
///
/// Thread local is used in order to be defensive about potential contention on the cache.
fn compute_tf_cache(average_fieldnorm: Score) -> Arc<[Score; 256]> {
let cache_key = average_fieldnorm.to_bits();
TF_CACHES.with(|cache_by_average_fieldnorm| {
let mut cache_by_average_fieldnorm = cache_by_average_fieldnorm.borrow_mut();
if let Some(cache) = cache_by_average_fieldnorm.get(&cache_key) {
return cache.clone();
}
let cache = compute_tf_cache_uncached(average_fieldnorm);
cache_by_average_fieldnorm.put(cache_key, cache.clone());
cache
})
}
/// A struct used for computing BM25 scores.
#[derive(Clone)]
pub struct Bm25Weight {
@@ -229,7 +265,7 @@ impl Bm25Weight {
#[cfg(test)]
mod tests {
use super::idf;
use super::{idf, Bm25Weight};
use crate::{assert_nearly_equals, Score};
#[test]
@@ -237,4 +273,12 @@ mod tests {
let score: Score = 2.0;
assert_nearly_equals!(idf(1, 2), score.ln());
}
#[test]
fn test_bm25_tf_cache_is_shared_for_same_average_fieldnorm() {
let weight1 = Bm25Weight::for_one_term(1, 10, 3.0);
let weight2 = Bm25Weight::for_one_term(2, 10, 3.0);
assert!(std::sync::Arc::ptr_eq(&weight1.cache, &weight2.cache));
}
}

View File

@@ -91,10 +91,14 @@ fn into_box_scorer<TScoreCombiner: ScoreCombiner>(
num_docs: u32,
) -> Box<dyn Scorer> {
match scorer {
SpecializedScorer::TermUnion(term_scorers) => {
let union_scorer =
BufferedUnionScorer::build(term_scorers, score_combiner_fn, num_docs);
Box::new(union_scorer)
SpecializedScorer::TermUnion(mut term_scorers) => {
if term_scorers.len() == 1 {
Box::new(term_scorers.pop().unwrap())
} else {
let union_scorer =
BufferedUnionScorer::build(term_scorers, score_combiner_fn, num_docs);
Box::new(union_scorer)
}
}
SpecializedScorer::TermIntersection(term_scorers) => {
let boxed_scorers: Vec<Box<dyn Scorer>> = term_scorers

View File

@@ -112,6 +112,14 @@ impl<S: Scorer> DocSet for BoostScorer<S> {
self.underlying.fill_buffer(buffer)
}
fn fill_buffer_up_to(
&mut self,
horizon: DocId,
buffer: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN],
) -> usize {
self.underlying.fill_buffer_up_to(horizon, buffer)
}
fn doc(&self) -> u32 {
self.underlying.doc()
}
@@ -138,6 +146,27 @@ impl<S: Scorer> Scorer for BoostScorer<S> {
fn score(&mut self) -> Score {
self.underlying.score() * self.boost
}
#[inline]
fn can_score_doc(&self) -> bool {
self.underlying.can_score_doc()
}
#[inline]
fn score_doc(&mut self, doc: DocId, term_freq: u32) -> Score {
self.underlying.score_doc(doc, term_freq) * self.boost
}
#[inline]
fn fill_buffer_up_to_with_term_freqs(
&mut self,
horizon: DocId,
docs: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN],
term_freqs: &mut [u32; COLLECT_BLOCK_BUFFER_LEN],
) -> usize {
self.underlying
.fill_buffer_up_to_with_term_freqs(horizon, docs, term_freqs)
}
}
#[cfg(test)]

View File

@@ -141,6 +141,16 @@ impl<TDocSet: DocSet + 'static> Scorer for ConstScorer<TDocSet> {
fn score(&mut self) -> Score {
self.score
}
#[inline]
fn can_score_doc(&self) -> bool {
true
}
#[inline]
fn score_doc(&mut self, _doc: DocId, _term_freq: u32) -> Score {
self.score
}
}
#[cfg(test)]

View File

@@ -315,6 +315,20 @@ mod tests {
fn score(&mut self) -> Score {
self.foo.get(self.cursor).map(|x| x.1).unwrap_or(0.0)
}
#[inline]
fn can_score_doc(&self) -> bool {
true
}
#[inline]
fn score_doc(&mut self, doc: DocId, _term_freq: u32) -> Score {
self.foo
.iter()
.find(|(candidate_doc, _)| *candidate_doc == doc)
.map(|(_, score)| *score)
.unwrap_or(0.0)
}
}
#[test]

View File

@@ -59,6 +59,16 @@ impl Scorer for EmptyScorer {
fn score(&mut self) -> Score {
0.0
}
#[inline]
fn can_score_doc(&self) -> bool {
true
}
#[inline]
fn score_doc(&mut self, _doc: DocId, _term_freq: u32) -> Score {
0.0
}
}
#[cfg(test)]

View File

@@ -1,5 +1,40 @@
use crate::docset::{DocSet, TERMINATED};
use crate::query::Scorer;
use crate::Score;
use crate::{DocId, Score};
struct ScoreOnlyScorer {
doc: DocId,
score: Score,
}
impl DocSet for ScoreOnlyScorer {
fn advance(&mut self) -> DocId {
self.doc = TERMINATED;
TERMINATED
}
fn doc(&self) -> DocId {
self.doc
}
fn size_hint(&self) -> u32 {
1
}
}
impl Scorer for ScoreOnlyScorer {
fn score(&mut self) -> Score {
self.score
}
fn can_score_doc(&self) -> bool {
true
}
fn score_doc(&mut self, _doc: DocId, _term_freq: u32) -> Score {
self.score
}
}
/// The `ScoreCombiner` trait defines how to compute
/// an overall score given a list of scores.
@@ -10,6 +45,17 @@ pub trait ScoreCombiner: Default + Clone + Send + Copy + 'static {
/// or not.
fn update<TScorer: Scorer>(&mut self, scorer: &mut TScorer);
/// Aggregates the score combiner with an already computed score.
fn update_score(&mut self, doc: DocId, score: Score) {
let mut scorer = ScoreOnlyScorer { doc, score };
self.update(&mut scorer);
}
/// Returns true if this combiner needs scorer scores to compute its state.
fn requires_scoring() -> bool {
true
}
/// Clears the score combiner state back to its initial state.
fn clear(&mut self);
@@ -27,6 +73,12 @@ pub struct DoNothingCombiner;
impl ScoreCombiner for DoNothingCombiner {
fn update<TScorer: Scorer>(&mut self, _scorer: &mut TScorer) {}
fn update_score(&mut self, _doc: DocId, _score: Score) {}
fn requires_scoring() -> bool {
false
}
fn clear(&mut self) {}
#[inline]
@@ -42,10 +94,16 @@ pub struct SumCombiner {
}
impl ScoreCombiner for SumCombiner {
#[inline]
fn update<TScorer: Scorer>(&mut self, scorer: &mut TScorer) {
self.score += scorer.score();
}
#[inline]
fn update_score(&mut self, _doc: DocId, score: Score) {
self.score += score;
}
fn clear(&mut self) {
self.score = 0.0;
}
@@ -77,12 +135,19 @@ impl DisjunctionMaxCombiner {
}
impl ScoreCombiner for DisjunctionMaxCombiner {
#[inline]
fn update<TScorer: Scorer>(&mut self, scorer: &mut TScorer) {
let score = scorer.score();
self.max = Score::max(score, self.max);
self.sum += score;
}
#[inline]
fn update_score(&mut self, _doc: DocId, score: Score) {
self.max = Score::max(score, self.max);
self.sum += score;
}
fn clear(&mut self) {
self.max = 0.0;
self.sum = 0.0;

View File

@@ -2,8 +2,8 @@ use std::ops::DerefMut;
use downcast_rs::impl_downcast;
use crate::docset::DocSet;
use crate::Score;
use crate::docset::{DocSet, COLLECT_BLOCK_BUFFER_LEN};
use crate::{DocId, Score};
/// Scored set of documents matching a query within a specific segment.
///
@@ -13,6 +13,36 @@ pub trait Scorer: downcast_rs::Downcast + DocSet + 'static {
///
/// This method will perform a bit of computation and is not cached.
fn score(&mut self) -> Score;
/// Returns true if [`Scorer::score_doc`] can score buffered docs without
/// repositioning the scorer.
///
/// Scorers whose [`Scorer::score_doc`] needs term frequencies must also override
/// [`Scorer::fill_buffer_up_to_with_term_freqs`].
fn can_score_doc(&self) -> bool {
false
}
/// Returns the score for `doc` with its term frequency.
fn score_doc(&mut self, _doc: DocId, _term_freq: u32) -> Score {
panic!(
"score_doc is not supported by this scorer. You need check can_score_doc() before \
calling this method."
)
}
/// Fills docs up to `horizon`.
///
/// The default implementation does not fill `term_freqs`. Scorers whose
/// [`Scorer::score_doc`] reads term frequencies must override this method.
fn fill_buffer_up_to_with_term_freqs(
&mut self,
horizon: DocId,
docs: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN],
_term_freqs: &mut [u32; COLLECT_BLOCK_BUFFER_LEN],
) -> usize {
DocSet::fill_buffer_up_to(self, horizon, docs)
}
}
impl_downcast!(Scorer);
@@ -22,4 +52,25 @@ impl Scorer for Box<dyn Scorer> {
fn score(&mut self) -> Score {
self.deref_mut().score()
}
#[inline]
fn can_score_doc(&self) -> bool {
self.as_ref().can_score_doc()
}
#[inline]
fn score_doc(&mut self, doc: DocId, term_freq: u32) -> Score {
self.deref_mut().score_doc(doc, term_freq)
}
#[inline]
fn fill_buffer_up_to_with_term_freqs(
&mut self,
horizon: DocId,
docs: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN],
term_freqs: &mut [u32; COLLECT_BLOCK_BUFFER_LEN],
) -> usize {
self.deref_mut()
.fill_buffer_up_to_with_term_freqs(horizon, docs, term_freqs)
}
}

View File

@@ -1,4 +1,4 @@
use crate::docset::DocSet;
use crate::docset::{DocSet, COLLECT_BLOCK_BUFFER_LEN};
use crate::fieldnorm::FieldNormReader;
use crate::postings::{BlockSegmentPostings, FreqReadingOption, Postings, SegmentPostings};
use crate::query::bm25::Bm25Weight;
@@ -147,6 +147,27 @@ impl Scorer for TermScorer {
let term_freq = self.term_freq();
self.similarity_weight.score(fieldnorm_id, term_freq)
}
#[inline]
fn can_score_doc(&self) -> bool {
true
}
#[inline]
fn score_doc(&mut self, doc: DocId, term_freq: u32) -> Score {
let fieldnorm_id = self.fieldnorm_reader.fieldnorm_id(doc);
self.similarity_weight.score(fieldnorm_id, term_freq)
}
fn fill_buffer_up_to_with_term_freqs(
&mut self,
horizon: DocId,
docs: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN],
term_freqs: &mut [u32; COLLECT_BLOCK_BUFFER_LEN],
) -> usize {
self.postings
.fill_buffer_up_to_with_term_freqs(horizon, docs, term_freqs)
}
}
#[cfg(test)]

View File

@@ -10,23 +10,7 @@ use crate::{DocId, Score};
// of upcoming document IDs (the "horizon").
const HORIZON_NUM_TINYBITSETS: usize = HORIZON as usize / 64;
const HORIZON: u32 = 64u32 * 64u32;
// `drain_filter` is not stable yet.
// This function is similar except that it does is not unstable, and
// it does not keep the original vector ordering.
//
// Elements are dropped and not yielded.
fn unordered_drain_filter<T, P>(v: &mut Vec<T>, mut predicate: P)
where P: FnMut(&mut T) -> bool {
let mut i = 0;
while i < v.len() {
if predicate(&mut v[i]) {
v.swap_remove(i);
} else {
i += 1;
}
}
}
const GROUPED_INSERT_MAX_BUCKET_SPAN: u32 = 2;
/// Creates a `DocSet` that iterate through the union of two or more `DocSet`s.
pub struct BufferedUnionScorer<TScorer, TScoreCombiner = DoNothingCombiner> {
@@ -53,31 +37,213 @@ pub struct BufferedUnionScorer<TScorer, TScoreCombiner = DoNothingCombiner> {
score: Score,
/// Number of documents in the segment.
num_docs: u32,
/// Scratch buffer for block-based refill.
refill_docs: [DocId; COLLECT_BLOCK_BUFFER_LEN],
/// Scratch buffer for term frequencies matching `refill_docs`.
refill_term_freqs: [u32; COLLECT_BLOCK_BUFFER_LEN],
/// Whether all children support scoring buffered docs after advancing.
use_score_doc_refill: bool,
}
#[inline]
fn union_bucket(
bitsets: &mut [TinySet; HORIZON_NUM_TINYBITSETS],
bucket_pos: u32,
tinyset: TinySet,
) {
debug_assert!((bucket_pos as usize) < HORIZON_NUM_TINYBITSETS);
// `bucket` comes from a doc delta below `HORIZON`; there are exactly
// `HORIZON / 64` buckets in the refill window.
bitsets[bucket_pos as usize] = bitsets[bucket_pos as usize].union(tinyset);
}
#[inline]
fn insert_delta(bitsets: &mut [TinySet; HORIZON_NUM_TINYBITSETS], delta: DocId) {
debug_assert!(delta < HORIZON);
// `delta < HORIZON`, so `delta / 64` is in the bitset array. The bit
// offset is reduced modulo 64 before being inserted in the TinySet.
bitsets[delta as usize / 64].insert_mut(delta % 64u32);
}
fn insert_and_score_full_buffer<TScorer: Scorer, TScoreCombiner: ScoreCombiner>(
scorer: &mut TScorer,
docs: &[DocId; COLLECT_BLOCK_BUFFER_LEN],
term_freqs: &[u32; COLLECT_BLOCK_BUFFER_LEN],
bitsets: &mut [TinySet; HORIZON_NUM_TINYBITSETS],
score_combiner: &mut [TScoreCombiner; HORIZON as usize],
min_doc: DocId,
) {
debug_assert!(docs.windows(2).all(|pair| pair[0] < pair[1]));
debug_assert!(docs[COLLECT_BLOCK_BUFFER_LEN - 1] - min_doc < HORIZON);
let first_delta = docs[0] - min_doc;
let last_delta = docs[COLLECT_BLOCK_BUFFER_LEN - 1] - min_doc;
let first_bucket = first_delta / 64;
let last_bucket = last_delta / 64;
// Common for very dense scorers: 64 distinct doc ids in one 64-doc bucket
// means all bits in that bucket are present.
if first_bucket == last_bucket {
union_bucket(bitsets, first_bucket, TinySet::full());
score_full_buffer(scorer, docs, term_freqs, score_combiner, min_doc);
return;
}
// 64 sorted distinct integers spanning exactly 64 values are consecutive.
// If they cross a TinySet boundary, this is just the suffix of the first
// bucket plus the prefix of the second bucket.
if last_delta - first_delta == COLLECT_BLOCK_BUFFER_LEN as u32 - 1 {
union_bucket(
bitsets,
first_bucket,
TinySet::range_greater_or_equal(first_delta % 64u32),
);
union_bucket(
bitsets,
last_bucket,
TinySet::range_lower((last_delta + 1) % 64u32),
);
score_full_buffer(scorer, docs, term_freqs, score_combiner, min_doc);
return;
}
// Grouping wins only for very dense buffers that hit the same TinySet many
// times. Once the 64 docs are spread farther, a straight pass is cheaper.
if last_bucket - first_bucket <= GROUPED_INSERT_MAX_BUCKET_SPAN {
let mut bucket = first_bucket;
let mut tinyset = TinySet::empty();
for (&doc, &term_freq) in docs.iter().zip(term_freqs.iter()) {
let delta = doc - min_doc;
let delta_bucket = delta / 64;
if delta_bucket != bucket {
union_bucket(bitsets, bucket, tinyset);
bucket = delta_bucket;
tinyset = TinySet::empty();
}
tinyset.insert_mut(delta % 64u32);
let score = scorer.score_doc(doc, term_freq);
update_score_combiner(score_combiner, delta, doc, score);
}
union_bucket(bitsets, bucket, tinyset);
} else {
for (&doc, &term_freq) in docs.iter().zip(term_freqs.iter()) {
let delta = doc - min_doc;
insert_delta(bitsets, delta);
// TODO: score_doc access the field_norm reader for each _term_, instead of once per
// doc. We could optimize this by caching the field norm for the doc, and
// reusing it for all terms in the doc.
let score = scorer.score_doc(doc, term_freq);
update_score_combiner(score_combiner, delta, doc, score);
}
}
}
#[inline]
fn update_score_combiner<TScoreCombiner: ScoreCombiner>(
score_combiner: &mut [TScoreCombiner; HORIZON as usize],
delta: DocId,
doc: DocId,
score: Score,
) {
debug_assert!(delta < HORIZON);
// Full and partial refill only buffer docs below `horizon`, so their
// deltas are always in the score-combiner window.
score_combiner[delta as usize].update_score(doc, score);
}
fn score_full_buffer<TScorer: Scorer, TScoreCombiner: ScoreCombiner>(
scorer: &mut TScorer,
docs: &[DocId; COLLECT_BLOCK_BUFFER_LEN],
term_freqs: &[u32; COLLECT_BLOCK_BUFFER_LEN],
score_combiner: &mut [TScoreCombiner; HORIZON as usize],
min_doc: DocId,
) {
for (&doc, &term_freq) in docs.iter().zip(term_freqs.iter()) {
let score = scorer.score_doc(doc, term_freq);
update_score_combiner(score_combiner, doc - min_doc, doc, score);
}
}
fn refill_scorer_with_score_docs<TScorer: Scorer, TScoreCombiner: ScoreCombiner>(
scorer: &mut TScorer,
bitsets: &mut [TinySet; HORIZON_NUM_TINYBITSETS],
score_combiner: &mut [TScoreCombiner; HORIZON as usize],
docs: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN],
term_freqs: &mut [u32; COLLECT_BLOCK_BUFFER_LEN],
min_doc: DocId,
horizon: DocId,
) {
loop {
let len = scorer.fill_buffer_up_to_with_term_freqs(horizon, docs, term_freqs);
if len == COLLECT_BLOCK_BUFFER_LEN {
debug_assert!(docs[COLLECT_BLOCK_BUFFER_LEN - 1] != TERMINATED);
debug_assert!(docs[COLLECT_BLOCK_BUFFER_LEN - 1] < horizon);
insert_and_score_full_buffer(
scorer,
docs,
term_freqs,
bitsets,
score_combiner,
min_doc,
);
} else {
for (&doc, &term_freq) in docs[..len].iter().zip(term_freqs[..len].iter()) {
let delta = doc - min_doc;
insert_delta(bitsets, delta);
let score = scorer.score_doc(doc, term_freq);
update_score_combiner(score_combiner, delta, doc, score);
}
break;
}
}
}
fn refill_scorer_from_current_doc<TScorer: Scorer, TScoreCombiner: ScoreCombiner>(
scorer: &mut TScorer,
bitsets: &mut [TinySet; HORIZON_NUM_TINYBITSETS],
score_combiner: &mut [TScoreCombiner; HORIZON as usize],
min_doc: DocId,
horizon: DocId,
) {
loop {
let doc = scorer.doc();
if doc >= horizon {
break;
}
let delta = doc - min_doc;
insert_delta(bitsets, delta);
debug_assert!(delta < HORIZON);
score_combiner[delta as usize].update(scorer);
scorer.advance();
}
}
fn refill<TScorer: Scorer, TScoreCombiner: ScoreCombiner>(
scorers: &mut Vec<TScorer>,
bitsets: &mut [TinySet; HORIZON_NUM_TINYBITSETS],
score_combiner: &mut [TScoreCombiner; HORIZON as usize],
docs: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN],
term_freqs: &mut [u32; COLLECT_BLOCK_BUFFER_LEN],
min_doc: DocId,
use_score_doc_refill: bool,
) {
unordered_drain_filter(scorers, |scorer| {
let horizon = min_doc + HORIZON;
loop {
let doc = scorer.doc();
if doc >= horizon {
return false;
}
// add this document
let delta = doc - min_doc;
bitsets[(delta / 64) as usize].insert_mut(delta % 64u32);
score_combiner[delta as usize].update(scorer);
if scorer.advance() == TERMINATED {
// remove the docset, it has been entirely consumed.
return true;
}
let horizon = min_doc + HORIZON;
for scorer in scorers.iter_mut() {
if use_score_doc_refill {
refill_scorer_with_score_docs(
scorer,
bitsets,
score_combiner,
docs,
term_freqs,
min_doc,
horizon,
);
} else {
refill_scorer_from_current_doc(scorer, bitsets, score_combiner, min_doc, horizon);
}
});
}
scorers.retain(|scorer| scorer.doc() != TERMINATED);
}
impl<TScorer: Scorer, TScoreCombiner: ScoreCombiner> BufferedUnionScorer<TScorer, TScoreCombiner> {
@@ -87,6 +253,8 @@ impl<TScorer: Scorer, TScoreCombiner: ScoreCombiner> BufferedUnionScorer<TScorer
score_combiner_fn: impl FnOnce() -> TScoreCombiner,
num_docs: u32,
) -> BufferedUnionScorer<TScorer, TScoreCombiner> {
let use_score_doc_refill =
TScoreCombiner::requires_scoring() && docsets.iter().all(Scorer::can_score_doc);
let non_empty_docsets: Vec<TScorer> = docsets
.into_iter()
.filter(|docset| docset.doc() != TERMINATED)
@@ -100,6 +268,9 @@ impl<TScorer: Scorer, TScoreCombiner: ScoreCombiner> BufferedUnionScorer<TScorer
doc: 0,
score: 0.0,
num_docs,
refill_docs: [TERMINATED; COLLECT_BLOCK_BUFFER_LEN],
refill_term_freqs: [1u32; COLLECT_BLOCK_BUFFER_LEN],
use_score_doc_refill,
};
if union.refill() {
union.advance();
@@ -120,7 +291,10 @@ impl<TScorer: Scorer, TScoreCombiner: ScoreCombiner> BufferedUnionScorer<TScorer
&mut self.docsets,
&mut self.bitsets,
&mut self.scores,
&mut self.refill_docs,
&mut self.refill_term_freqs,
min_doc,
self.use_score_doc_refill,
);
true
} else {
@@ -248,12 +422,12 @@ where
// The target is outside of the buffered horizon.
// advance all docsets to a doc >= to the target.
unordered_drain_filter(&mut self.docsets, |docset| {
for docset in &mut self.docsets {
if docset.doc() < target {
docset.seek(target);
}
docset.doc() == TERMINATED
});
}
self.docsets.retain(|docset| docset.doc() != TERMINATED);
// at this point all of the docsets
// are positioned on a doc >= to the target.

View File

@@ -10,6 +10,8 @@ pub use simple_union::SimpleUnion;
mod tests {
use std::collections::BTreeSet;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use common::BitSet;
@@ -18,8 +20,8 @@ mod tests {
use crate::postings::tests::test_skip_against_unoptimized;
use crate::query::score_combiner::DoNothingCombiner;
use crate::query::union::bitset_union::BitSetPostingUnion;
use crate::query::{BitSetDocSet, ConstScorer, VecDocSet};
use crate::{tests, DocId};
use crate::query::{BitSetDocSet, ConstScorer, Scorer, VecDocSet};
use crate::{tests, DocId, Score};
fn vec_doc_set_from_docs_list(
docs_list: &[Vec<DocId>],
@@ -66,6 +68,61 @@ mod tests {
}
BitSetDocSet::from(doc_bitset)
}
struct CountingScorer {
docset: VecDocSet,
score_calls: Arc<AtomicUsize>,
score_doc_calls: Arc<AtomicUsize>,
}
impl CountingScorer {
fn new(
doc_ids: Vec<DocId>,
score_calls: Arc<AtomicUsize>,
score_doc_calls: Arc<AtomicUsize>,
) -> Self {
CountingScorer {
docset: VecDocSet::from(doc_ids),
score_calls,
score_doc_calls,
}
}
}
impl DocSet for CountingScorer {
fn advance(&mut self) -> DocId {
self.docset.advance()
}
fn seek(&mut self, target: DocId) -> DocId {
self.docset.seek(target)
}
fn doc(&self) -> DocId {
self.docset.doc()
}
fn size_hint(&self) -> u32 {
self.docset.size_hint()
}
}
impl Scorer for CountingScorer {
fn score(&mut self) -> Score {
self.score_calls.fetch_add(1, Ordering::SeqCst);
1.0
}
fn can_score_doc(&self) -> bool {
true
}
fn score_doc(&mut self, _doc: DocId, _term_freq: u32) -> Score {
self.score_doc_calls.fetch_add(1, Ordering::SeqCst);
1.0
}
}
fn aux_test_union(docs_list: &[Vec<DocId>]) {
for constructor in [
posting_list_union_from_docs_list,
@@ -168,6 +225,22 @@ mod tests {
]);
}
#[test]
fn test_do_nothing_combiner_does_not_score_buffered_docs() {
let score_calls = Arc::new(AtomicUsize::new(0));
let score_doc_calls = Arc::new(AtomicUsize::new(0));
let scorers = vec![
CountingScorer::new(vec![1, 3, 5], score_calls.clone(), score_doc_calls.clone()),
CountingScorer::new(vec![2, 3, 6], score_calls.clone(), score_doc_calls.clone()),
];
let mut union = BufferedUnionScorer::build(scorers, DoNothingCombiner::default, 10);
assert_eq!(union.count_including_deleted(), 5);
assert_eq!(score_calls.load(Ordering::SeqCst), 0);
assert_eq!(score_doc_calls.load(Ordering::SeqCst), 0);
}
fn test_aux_union_skip(docs_list: &[Vec<DocId>], skip_targets: Vec<DocId>) {
for constructor in [
posting_list_union_from_docs_list,

View File

@@ -14,11 +14,8 @@ use itertools::Itertools;
use tantivy_fst::Automaton;
use tantivy_fst::automaton::AlwaysMatch;
use crate::sstable_index_v3::SSTableIndexV3Empty;
use crate::streamer::{Streamer, StreamerBuilder};
use crate::{
BlockAddr, DeltaReader, Reader, SSTable, SSTableIndex, SSTableIndexV3, TermOrdinal, VoidSSTable,
};
use crate::{BlockAddr, DeltaReader, Reader, SSTable, SSTableIndex, TermOrdinal, VoidSSTable};
/// An SSTable is a sorted map that associates sorted `&[u8]` keys
/// to any kind of typed values.
@@ -288,33 +285,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
let (sstable_slice, index_slice) = main_slice.split(index_offset as usize);
let sstable_index_bytes = index_slice.read_bytes()?;
let sstable_index = match version {
2 => SSTableIndex::V2(
crate::sstable_index_v2::SSTableIndex::load(sstable_index_bytes).map_err(|_| {
io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption")
})?,
),
3 => {
let (sstable_index_bytes, mut footerv3_len_bytes) = sstable_index_bytes.rsplit(8);
let store_offset = u64::deserialize(&mut footerv3_len_bytes)?;
if store_offset != 0 {
SSTableIndex::V3(
SSTableIndexV3::load(sstable_index_bytes, store_offset).map_err(|_| {
io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption")
})?,
)
} else {
// if store_offset is zero, there is no index, so we build a pseudo-index
// assuming a single block of sstable covering everything.
SSTableIndex::V3Empty(SSTableIndexV3Empty::load(index_offset as usize))
}
}
_ => {
return Err(io::Error::other(format!(
"Unsupported sstable version, expected one of [2, 3], found {version}"
)));
}
};
let sstable_index = SSTableIndex::open(version, index_offset, sstable_index_bytes)?;
Ok(Dictionary {
sstable_slice,
@@ -525,10 +496,15 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
// Open the block for the first ordinal.
let mut bytes = Vec::new();
let mut current_block_addr = self.sstable_index.get_block_with_ord(ord);
let (mut current_block_addr, block_id) = self.sstable_index.get_and_locate_with_ord(ord);
let mut current_sstable_delta_reader =
self.sstable_delta_reader_block(current_block_addr.clone())?;
let mut current_block_ordinal = current_block_addr.first_ordinal;
let mut current_block_end_bound = self
.sstable_index
.get_block(block_id + 1)
.map(|block_addr| block_addr.first_ordinal)
.unwrap_or(u64::MAX);
loop {
// move to the ord inside the current block
@@ -557,17 +533,19 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
}
};
// TODO optimization: it is silly to do a binary search to get the block every single
// time.
//
// Check if block changed for new term_ord
let new_block_addr = self.sstable_index.get_block_with_ord(next_ord);
if new_block_addr != current_block_addr {
if next_ord >= current_block_end_bound {
let (new_block_addr, block_id) =
self.sstable_index.get_and_locate_with_ord(next_ord);
current_block_addr = new_block_addr;
current_block_ordinal = current_block_addr.first_ordinal;
current_sstable_delta_reader =
self.sstable_delta_reader_block(current_block_addr.clone())?;
bytes.clear();
current_block_end_bound = self
.sstable_index
.get_block(block_id + 1)
.map(|block_addr| block_addr.first_ordinal)
.unwrap_or(u64::MAX)
}
ord = next_ord;
}

319
sstable/src/index/mod.rs Normal file
View File

@@ -0,0 +1,319 @@
pub(crate) mod v2;
pub(crate) mod v3;
use std::io::{self, Read, Write};
use std::ops::Range;
use common::{BinarySerializable, FixedSize, OwnedBytes};
use tantivy_fst::{Automaton, MapBuilder};
use crate::{TermOrdinal, common_prefix_len};
#[derive(Debug, Clone)]
pub enum SSTableIndex {
V2(v2::SSTableIndex),
V3(v3::SSTableIndexV3),
V3Empty(v3::SSTableIndexV3Empty),
}
impl SSTableIndex {
pub(crate) fn open(
version: u32,
index_offset: u64,
index_bytes: OwnedBytes,
) -> io::Result<Self> {
let index = match version {
2 => {
SSTableIndex::V2(v2::SSTableIndex::load(index_bytes).map_err(|_| {
io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption")
})?)
}
3 => {
let (index_bytes, mut footerv3_len_bytes) = index_bytes.rsplit(8);
let store_offset = u64::deserialize(&mut footerv3_len_bytes)?;
if store_offset != 0 {
SSTableIndex::V3(v3::SSTableIndexV3::load(index_bytes, store_offset).map_err(
|_| io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption"),
)?)
} else {
// if store_offset is zero, there is no index, so we build a pseudo-index
// assuming a single block of sstable covering everything.
SSTableIndex::V3Empty(v3::SSTableIndexV3Empty::load(index_offset as usize))
}
}
_ => {
return Err(io::Error::other(format!(
"Unsupported sstable version, expected one of [2, 3], found {version}"
)));
}
};
Ok(index)
}
/// Get the [`BlockAddr`] of the requested block.
pub(crate) fn get_block(&self, block_id: u64) -> Option<BlockAddr> {
match self {
SSTableIndex::V2(v2_index) => v2_index.get_block(block_id as usize),
SSTableIndex::V3(v3_index) => v3_index.get_block(block_id),
SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block(block_id),
}
}
/// Get the block id of the block that would contain `key`.
///
/// Returns None if `key` is lexicographically after the last key recorded.
pub(crate) fn locate_with_key(&self, key: &[u8]) -> Option<u64> {
match self {
SSTableIndex::V2(v2_index) => v2_index.locate_with_key(key).map(|i| i as u64),
SSTableIndex::V3(v3_index) => v3_index.locate_with_key(key),
SSTableIndex::V3Empty(v3_empty) => v3_empty.locate_with_key(key),
}
}
/// Get the [`BlockAddr`] of the block that would contain `key`.
///
/// Returns None if `key` is lexicographically after the last key recorded.
pub fn get_block_with_key(&self, key: &[u8]) -> Option<BlockAddr> {
match self {
SSTableIndex::V2(v2_index) => v2_index.get_block_with_key(key),
SSTableIndex::V3(v3_index) => v3_index.get_block_with_key(key),
SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block_with_key(key),
}
}
pub(crate) fn locate_with_ord(&self, ord: TermOrdinal) -> u64 {
match self {
SSTableIndex::V2(v2_index) => v2_index.locate_with_ord(ord) as u64,
SSTableIndex::V3(v3_index) => v3_index.locate_with_ord(ord),
SSTableIndex::V3Empty(v3_empty) => v3_empty.locate_with_ord(ord),
}
}
/// Get the [`BlockAddr`] of the block containing the `ord`-th term.
pub(crate) fn get_block_with_ord(&self, ord: TermOrdinal) -> BlockAddr {
match self {
SSTableIndex::V2(v2_index) => v2_index.get_block_with_ord(ord),
SSTableIndex::V3(v3_index) => v3_index.get_block_with_ord(ord),
SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block_with_ord(ord),
}
}
pub(crate) fn get_and_locate_with_ord(&self, ord: TermOrdinal) -> (BlockAddr, u64) {
match self {
SSTableIndex::V2(v2_index) => v2_index.get_and_locate_with_ord(ord),
SSTableIndex::V3(v3_index) => v3_index.get_and_locate_with_ord(ord),
SSTableIndex::V3Empty(v3_empty) => v3_empty.get_and_locate_with_ord(ord),
}
}
pub fn get_block_for_automaton<'a>(
&'a self,
automaton: &'a impl Automaton,
) -> impl Iterator<Item = (u64, BlockAddr)> + 'a {
match self {
SSTableIndex::V2(v2_index) => {
BlockIter::V2(v2_index.get_block_for_automaton(automaton))
}
SSTableIndex::V3(v3_index) => {
BlockIter::V3(v3_index.get_block_for_automaton(automaton))
}
SSTableIndex::V3Empty(v3_empty) => {
BlockIter::V3Empty(std::iter::once((0, v3_empty.block_addr.clone())))
}
}
}
}
enum BlockIter<V2, V3, T> {
V2(V2),
V3(V3),
V3Empty(std::iter::Once<T>),
}
impl<V2: Iterator<Item = T>, V3: Iterator<Item = T>, T> Iterator for BlockIter<V2, V3, T> {
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
match self {
BlockIter::V2(v2) => v2.next(),
BlockIter::V3(v3) => v3.next(),
BlockIter::V3Empty(once) => once.next(),
}
}
}
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct BlockAddr {
pub first_ordinal: u64,
pub byte_range: Range<usize>,
}
impl BlockAddr {
fn to_block_start(&self) -> BlockStartAddr {
BlockStartAddr {
first_ordinal: self.first_ordinal,
byte_range_start: self.byte_range.start,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
struct BlockStartAddr {
first_ordinal: u64,
byte_range_start: usize,
}
impl BlockStartAddr {
fn to_block_addr(&self, byte_range_end: usize) -> BlockAddr {
BlockAddr {
first_ordinal: self.first_ordinal,
byte_range: self.byte_range_start..byte_range_end,
}
}
}
#[derive(Debug, Clone)]
pub(crate) struct BlockMeta {
/// Any byte string that is lexicographically greater or equal to
/// the last key in the block,
/// and yet strictly smaller than the first key in the next block.
pub last_key_or_greater: Vec<u8>,
pub block_addr: BlockAddr,
}
impl BinarySerializable for BlockStartAddr {
fn serialize<W: Write + ?Sized>(&self, writer: &mut W) -> io::Result<()> {
let start = self.byte_range_start as u64;
start.serialize(writer)?;
self.first_ordinal.serialize(writer)
}
fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
let byte_range_start = u64::deserialize(reader)? as usize;
let first_ordinal = u64::deserialize(reader)?;
Ok(BlockStartAddr {
first_ordinal,
byte_range_start,
})
}
// Provided method
fn num_bytes(&self) -> u64 {
BlockStartAddr::SIZE_IN_BYTES as u64
}
}
impl FixedSize for BlockStartAddr {
const SIZE_IN_BYTES: usize = 2 * u64::SIZE_IN_BYTES;
}
/// Given that left < right,
/// mutates `left into a shorter byte string left'` that
/// matches `left <= left' < right`.
fn find_shorter_str_in_between(left: &mut Vec<u8>, right: &[u8]) {
assert!(&left[..] < right);
let common_len = common_prefix_len(left, right);
if left.len() == common_len {
return;
}
// It is possible to do one character shorter in some case,
// but it is not worth the extra complexity
for pos in (common_len + 1)..left.len() {
if left[pos] != u8::MAX {
left[pos] += 1;
left.truncate(pos + 1);
return;
}
}
}
#[derive(Default)]
pub struct SSTableIndexBuilder {
blocks: Vec<BlockMeta>,
}
impl SSTableIndexBuilder {
/// In order to make the index as light as possible, we
/// try to find a shorter alternative to the last key of the last block
/// that is still smaller than the next key.
pub(crate) fn shorten_last_block_key_given_next_key(&mut self, next_key: &[u8]) {
if let Some(last_block) = self.blocks.last_mut() {
find_shorter_str_in_between(&mut last_block.last_key_or_greater, next_key);
}
}
pub fn add_block(&mut self, last_key: &[u8], byte_range: Range<usize>, first_ordinal: u64) {
self.blocks.push(BlockMeta {
last_key_or_greater: last_key.to_vec(),
block_addr: BlockAddr {
byte_range,
first_ordinal,
},
})
}
pub fn serialize<W: std::io::Write>(&self, wrt: W) -> io::Result<u64> {
if self.blocks.len() <= 1 {
return Ok(0);
}
let counting_writer = common::CountingWriter::wrap(wrt);
let mut map_builder = MapBuilder::new(counting_writer).map_err(fst_error_to_io_error)?;
for (i, block) in self.blocks.iter().enumerate() {
map_builder
.insert(&block.last_key_or_greater, i as u64)
.map_err(fst_error_to_io_error)?;
}
let counting_writer = map_builder.into_inner().map_err(fst_error_to_io_error)?;
let written_bytes = counting_writer.written_bytes();
let mut wrt = counting_writer.finish();
let mut block_store_writer = v3::BlockAddrStoreWriter::new();
for block in &self.blocks {
block_store_writer.write_block_meta(block.block_addr.clone())?;
}
block_store_writer.serialize(&mut wrt)?;
Ok(written_bytes)
}
}
fn fst_error_to_io_error(error: tantivy_fst::Error) -> io::Error {
match error {
tantivy_fst::Error::Fst(fst_error) => io::Error::other(fst_error),
tantivy_fst::Error::Io(ioerror) => ioerror,
}
}
#[cfg(test)]
mod tests {
#[track_caller]
fn test_find_shorter_str_in_between_aux(left: &[u8], right: &[u8]) {
let mut left_buf = left.to_vec();
super::find_shorter_str_in_between(&mut left_buf, right);
assert!(left_buf.len() <= left.len());
assert!(left <= &left_buf);
assert!(&left_buf[..] < right);
}
#[test]
fn test_find_shorter_str_in_between() {
test_find_shorter_str_in_between_aux(b"", b"hello");
test_find_shorter_str_in_between_aux(b"abc", b"abcd");
test_find_shorter_str_in_between_aux(b"abcd", b"abd");
test_find_shorter_str_in_between_aux(&[0, 0, 0], &[1]);
test_find_shorter_str_in_between_aux(&[0, 0, 0], &[0, 0, 1]);
test_find_shorter_str_in_between_aux(&[0, 0, 255, 255, 255, 0u8], &[0, 1]);
}
use proptest::prelude::*;
proptest! {
#![proptest_config(ProptestConfig::with_cases(100))]
#[test]
fn test_proptest_find_shorter_str(left in any::<Vec<u8>>(), right in any::<Vec<u8>>()) {
if left < right {
test_find_shorter_str_in_between_aux(&left, &right);
}
}
}
}

View File

@@ -77,6 +77,13 @@ impl SSTableIndex {
self.get_block(self.locate_with_ord(ord)).unwrap()
}
pub(crate) fn get_and_locate_with_ord(&self, ord: TermOrdinal) -> (BlockAddr, u64) {
let location = self.locate_with_ord(ord);
// locate_with_ord always returns an index within range
let block_addr = self.get_block(location).unwrap();
(block_addr, location as u64)
}
pub(crate) fn get_block_for_automaton<'a>(
&'a self,
automaton: &'a impl Automaton,

View File

@@ -1,106 +1,14 @@
use std::io::{self, Read, Write};
use std::ops::Range;
use std::sync::Arc;
use common::{BinarySerializable, FixedSize, OwnedBytes};
use tantivy_bitpacker::{BitPacker, compute_num_bits};
use tantivy_fst::raw::Fst;
use tantivy_fst::{Automaton, IntoStreamer, Map, MapBuilder, Streamer};
use tantivy_fst::{Automaton, IntoStreamer, Map, Streamer};
use super::{BlockAddr, BlockStartAddr};
use crate::block_match_automaton::can_block_match_automaton;
use crate::{SSTableDataCorruption, TermOrdinal, common_prefix_len};
#[derive(Debug, Clone)]
pub enum SSTableIndex {
V2(crate::sstable_index_v2::SSTableIndex),
V3(SSTableIndexV3),
V3Empty(SSTableIndexV3Empty),
}
impl SSTableIndex {
/// Get the [`BlockAddr`] of the requested block.
pub(crate) fn get_block(&self, block_id: u64) -> Option<BlockAddr> {
match self {
SSTableIndex::V2(v2_index) => v2_index.get_block(block_id as usize),
SSTableIndex::V3(v3_index) => v3_index.get_block(block_id),
SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block(block_id),
}
}
/// Get the block id of the block that would contain `key`.
///
/// Returns None if `key` is lexicographically after the last key recorded.
pub(crate) fn locate_with_key(&self, key: &[u8]) -> Option<u64> {
match self {
SSTableIndex::V2(v2_index) => v2_index.locate_with_key(key).map(|i| i as u64),
SSTableIndex::V3(v3_index) => v3_index.locate_with_key(key),
SSTableIndex::V3Empty(v3_empty) => v3_empty.locate_with_key(key),
}
}
/// Get the [`BlockAddr`] of the block that would contain `key`.
///
/// Returns None if `key` is lexicographically after the last key recorded.
pub fn get_block_with_key(&self, key: &[u8]) -> Option<BlockAddr> {
match self {
SSTableIndex::V2(v2_index) => v2_index.get_block_with_key(key),
SSTableIndex::V3(v3_index) => v3_index.get_block_with_key(key),
SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block_with_key(key),
}
}
pub(crate) fn locate_with_ord(&self, ord: TermOrdinal) -> u64 {
match self {
SSTableIndex::V2(v2_index) => v2_index.locate_with_ord(ord) as u64,
SSTableIndex::V3(v3_index) => v3_index.locate_with_ord(ord),
SSTableIndex::V3Empty(v3_empty) => v3_empty.locate_with_ord(ord),
}
}
/// Get the [`BlockAddr`] of the block containing the `ord`-th term.
pub(crate) fn get_block_with_ord(&self, ord: TermOrdinal) -> BlockAddr {
match self {
SSTableIndex::V2(v2_index) => v2_index.get_block_with_ord(ord),
SSTableIndex::V3(v3_index) => v3_index.get_block_with_ord(ord),
SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block_with_ord(ord),
}
}
pub fn get_block_for_automaton<'a>(
&'a self,
automaton: &'a impl Automaton,
) -> impl Iterator<Item = (u64, BlockAddr)> + 'a {
match self {
SSTableIndex::V2(v2_index) => {
BlockIter::V2(v2_index.get_block_for_automaton(automaton))
}
SSTableIndex::V3(v3_index) => {
BlockIter::V3(v3_index.get_block_for_automaton(automaton))
}
SSTableIndex::V3Empty(v3_empty) => {
BlockIter::V3Empty(std::iter::once((0, v3_empty.block_addr.clone())))
}
}
}
}
enum BlockIter<V2, V3, T> {
V2(V2),
V3(V3),
V3Empty(std::iter::Once<T>),
}
impl<V2: Iterator<Item = T>, V3: Iterator<Item = T>, T> Iterator for BlockIter<V2, V3, T> {
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
match self {
BlockIter::V2(v2) => v2.next(),
BlockIter::V3(v3) => v3.next(),
BlockIter::V3Empty(once) => once.next(),
}
}
}
use crate::{SSTableDataCorruption, TermOrdinal};
#[derive(Debug, Clone)]
pub struct SSTableIndexV3 {
@@ -160,6 +68,11 @@ impl SSTableIndexV3 {
self.block_addr_store.binary_search_ord(ord).1
}
pub(crate) fn get_and_locate_with_ord(&self, ord: TermOrdinal) -> (BlockAddr, u64) {
let (location, block_addr) = self.block_addr_store.binary_search_ord(ord);
(block_addr, location)
}
pub(crate) fn get_block_for_automaton<'a>(
&'a self,
automaton: &'a impl Automaton,
@@ -216,7 +129,7 @@ impl<A: Automaton> Iterator for GetBlockForAutomaton<'_, A> {
#[derive(Debug, Clone)]
pub struct SSTableIndexV3Empty {
block_addr: BlockAddr,
pub block_addr: BlockAddr,
}
impl SSTableIndexV3Empty {
@@ -230,8 +143,8 @@ impl SSTableIndexV3Empty {
}
/// Get the [`BlockAddr`] of the requested block.
pub(crate) fn get_block(&self, _block_id: u64) -> Option<BlockAddr> {
Some(self.block_addr.clone())
pub(crate) fn get_block(&self, block_id: u64) -> Option<BlockAddr> {
(block_id == 0).then(|| self.block_addr.clone())
}
/// Get the block id of the block that would contain `key`.
@@ -256,146 +169,9 @@ impl SSTableIndexV3Empty {
pub(crate) fn get_block_with_ord(&self, _ord: TermOrdinal) -> BlockAddr {
self.block_addr.clone()
}
}
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct BlockAddr {
pub first_ordinal: u64,
pub byte_range: Range<usize>,
}
impl BlockAddr {
fn to_block_start(&self) -> BlockStartAddr {
BlockStartAddr {
first_ordinal: self.first_ordinal,
byte_range_start: self.byte_range.start,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
struct BlockStartAddr {
first_ordinal: u64,
byte_range_start: usize,
}
impl BlockStartAddr {
fn to_block_addr(&self, byte_range_end: usize) -> BlockAddr {
BlockAddr {
first_ordinal: self.first_ordinal,
byte_range: self.byte_range_start..byte_range_end,
}
}
}
#[derive(Debug, Clone)]
pub(crate) struct BlockMeta {
/// Any byte string that is lexicographically greater or equal to
/// the last key in the block,
/// and yet strictly smaller than the first key in the next block.
pub last_key_or_greater: Vec<u8>,
pub block_addr: BlockAddr,
}
impl BinarySerializable for BlockStartAddr {
fn serialize<W: Write + ?Sized>(&self, writer: &mut W) -> io::Result<()> {
let start = self.byte_range_start as u64;
start.serialize(writer)?;
self.first_ordinal.serialize(writer)
}
fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
let byte_range_start = u64::deserialize(reader)? as usize;
let first_ordinal = u64::deserialize(reader)?;
Ok(BlockStartAddr {
first_ordinal,
byte_range_start,
})
}
// Provided method
fn num_bytes(&self) -> u64 {
BlockStartAddr::SIZE_IN_BYTES as u64
}
}
impl FixedSize for BlockStartAddr {
const SIZE_IN_BYTES: usize = 2 * u64::SIZE_IN_BYTES;
}
/// Given that left < right,
/// mutates `left into a shorter byte string left'` that
/// matches `left <= left' < right`.
fn find_shorter_str_in_between(left: &mut Vec<u8>, right: &[u8]) {
assert!(&left[..] < right);
let common_len = common_prefix_len(left, right);
if left.len() == common_len {
return;
}
// It is possible to do one character shorter in some case,
// but it is not worth the extra complexity
for pos in (common_len + 1)..left.len() {
if left[pos] != u8::MAX {
left[pos] += 1;
left.truncate(pos + 1);
return;
}
}
}
#[derive(Default)]
pub struct SSTableIndexBuilder {
blocks: Vec<BlockMeta>,
}
impl SSTableIndexBuilder {
/// In order to make the index as light as possible, we
/// try to find a shorter alternative to the last key of the last block
/// that is still smaller than the next key.
pub(crate) fn shorten_last_block_key_given_next_key(&mut self, next_key: &[u8]) {
if let Some(last_block) = self.blocks.last_mut() {
find_shorter_str_in_between(&mut last_block.last_key_or_greater, next_key);
}
}
pub fn add_block(&mut self, last_key: &[u8], byte_range: Range<usize>, first_ordinal: u64) {
self.blocks.push(BlockMeta {
last_key_or_greater: last_key.to_vec(),
block_addr: BlockAddr {
byte_range,
first_ordinal,
},
})
}
pub fn serialize<W: std::io::Write>(&self, wrt: W) -> io::Result<u64> {
if self.blocks.len() <= 1 {
return Ok(0);
}
let counting_writer = common::CountingWriter::wrap(wrt);
let mut map_builder = MapBuilder::new(counting_writer).map_err(fst_error_to_io_error)?;
for (i, block) in self.blocks.iter().enumerate() {
map_builder
.insert(&block.last_key_or_greater, i as u64)
.map_err(fst_error_to_io_error)?;
}
let counting_writer = map_builder.into_inner().map_err(fst_error_to_io_error)?;
let written_bytes = counting_writer.written_bytes();
let mut wrt = counting_writer.finish();
let mut block_store_writer = BlockAddrStoreWriter::new();
for block in &self.blocks {
block_store_writer.write_block_meta(block.block_addr.clone())?;
}
block_store_writer.serialize(&mut wrt)?;
Ok(written_bytes)
}
}
fn fst_error_to_io_error(error: tantivy_fst::Error) -> io::Error {
match error {
tantivy_fst::Error::Fst(fst_error) => io::Error::other(fst_error),
tantivy_fst::Error::Io(ioerror) => ioerror,
pub(crate) fn get_and_locate_with_ord(&self, _ord: TermOrdinal) -> (BlockAddr, u64) {
(self.block_addr.clone(), 0)
}
}
@@ -647,14 +423,14 @@ fn binary_search(max: u64, cmp_fn: impl Fn(u64) -> std::cmp::Ordering) -> Result
Err(left)
}
struct BlockAddrStoreWriter {
pub(crate) struct BlockAddrStoreWriter {
buffer_block_metas: Vec<u8>,
buffer_addrs: Vec<u8>,
block_addrs: Vec<BlockAddr>,
}
impl BlockAddrStoreWriter {
fn new() -> Self {
pub(crate) fn new() -> Self {
BlockAddrStoreWriter {
buffer_block_metas: Vec::new(),
buffer_addrs: Vec::new(),
@@ -662,7 +438,7 @@ impl BlockAddrStoreWriter {
}
}
fn flush_block(&mut self) -> io::Result<()> {
pub(crate) fn flush_block(&mut self) -> io::Result<()> {
if self.block_addrs.is_empty() {
return Ok(());
}
@@ -741,7 +517,7 @@ impl BlockAddrStoreWriter {
Ok(())
}
fn write_block_meta(&mut self, block_addr: BlockAddr) -> io::Result<()> {
pub(crate) fn write_block_meta(&mut self, block_addr: BlockAddr) -> io::Result<()> {
self.block_addrs.push(block_addr);
if self.block_addrs.len() >= STORE_BLOCK_LEN {
self.flush_block()?;
@@ -749,7 +525,7 @@ impl BlockAddrStoreWriter {
Ok(())
}
fn serialize<W: std::io::Write>(&mut self, wrt: &mut W) -> io::Result<()> {
pub(crate) fn serialize<W: std::io::Write>(&mut self, wrt: &mut W) -> io::Result<()> {
self.flush_block()?;
let len = self.buffer_block_metas.len() as u64;
len.serialize(wrt)?;
@@ -824,8 +600,9 @@ mod tests {
use common::OwnedBytes;
use super::*;
use crate::SSTableDataCorruption;
use crate::block_match_automaton::tests::EqBuffer;
use crate::index::BlockMeta;
use crate::{SSTableDataCorruption, SSTableIndexBuilder};
#[test]
fn test_sstable_index() {
@@ -874,36 +651,7 @@ mod tests {
assert!(matches!(data_corruption_err, SSTableDataCorruption));
}
#[track_caller]
fn test_find_shorter_str_in_between_aux(left: &[u8], right: &[u8]) {
let mut left_buf = left.to_vec();
super::find_shorter_str_in_between(&mut left_buf, right);
assert!(left_buf.len() <= left.len());
assert!(left <= &left_buf);
assert!(&left_buf[..] < right);
}
#[test]
fn test_find_shorter_str_in_between() {
test_find_shorter_str_in_between_aux(b"", b"hello");
test_find_shorter_str_in_between_aux(b"abc", b"abcd");
test_find_shorter_str_in_between_aux(b"abcd", b"abd");
test_find_shorter_str_in_between_aux(&[0, 0, 0], &[1]);
test_find_shorter_str_in_between_aux(&[0, 0, 0], &[0, 0, 1]);
test_find_shorter_str_in_between_aux(&[0, 0, 255, 255, 255, 0u8], &[0, 1]);
}
use proptest::prelude::*;
proptest! {
#![proptest_config(ProptestConfig::with_cases(100))]
#[test]
fn test_proptest_find_shorter_str(left in any::<Vec<u8>>(), right in any::<Vec<u8>>()) {
if left < right {
test_find_shorter_str_in_between_aux(&left, &right);
}
}
}
// use proptest::prelude::*;
#[test]
fn test_find_best_slop() {

View File

@@ -47,9 +47,8 @@ pub mod merge;
mod streamer;
pub mod value;
mod sstable_index_v3;
pub use sstable_index_v3::{BlockAddr, SSTableIndex, SSTableIndexBuilder, SSTableIndexV3};
mod sstable_index_v2;
mod index;
pub use index::{BlockAddr, SSTableIndex, SSTableIndexBuilder};
pub(crate) mod vint;
pub use dictionary::{Dictionary, TermOrdHit};
pub use streamer::{Streamer, StreamerBuilder};

View File

@@ -27,7 +27,7 @@ rand = "0.9"
zipf = "7.0.0"
rustc-hash = "2.1.0"
proptest = "1.2.0"
binggan = { version = "0.16.1" }
binggan = { version = "0.17.0" }
rand_distr = "0.5"
[features]