mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-06-01 08:00:41 +00:00
Compare commits
21 Commits
agg_cleanu
...
faster_uni
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
74f37f045d | ||
|
|
72cca113cd | ||
|
|
672bf45235 | ||
|
|
33ef167441 | ||
|
|
bf8b263f16 | ||
|
|
24a97dbe69 | ||
|
|
34fec8b23e | ||
|
|
46b3fb9ed3 | ||
|
|
fbe620b9b4 | ||
|
|
95d8a3989a | ||
|
|
ea61a68db4 | ||
|
|
c367df37c1 | ||
|
|
d99a5d4e91 | ||
|
|
2de6f075ce | ||
|
|
18080067c7 | ||
|
|
95db7d2e5c | ||
|
|
fc017c4c74 | ||
|
|
141c91d028 | ||
|
|
36a83e7c1a | ||
|
|
be11f8a6a1 | ||
|
|
4305e4029e |
@@ -65,7 +65,7 @@ tantivy-bitpacker = { version = "0.10", path = "./bitpacker" }
|
||||
common = { version = "0.11", path = "./common/", package = "tantivy-common" }
|
||||
tokenizer-api = { version = "0.7", path = "./tokenizer-api", package = "tantivy-tokenizer-api" }
|
||||
sketches-ddsketch = { version = "0.4", features = ["use_serde"] }
|
||||
datasketches = { git = "https://github.com/fulmicoton-dd/datasketches-rust", rev = "7635fb8" }
|
||||
datasketches = { version = "0.3.0", features = ["hll"] }
|
||||
futures-util = { version = "0.3.28", optional = true }
|
||||
futures-channel = { version = "0.3.28", optional = true }
|
||||
fnv = "1.0.7"
|
||||
@@ -75,7 +75,7 @@ typetag = "0.2.21"
|
||||
winapi = "0.3.9"
|
||||
|
||||
[dev-dependencies]
|
||||
binggan = "0.16.1"
|
||||
binggan = "0.17.0"
|
||||
rand = "0.9"
|
||||
maplit = "1.0.2"
|
||||
matches = "0.1.9"
|
||||
|
||||
@@ -23,7 +23,7 @@ downcast-rs = "2.0.1"
|
||||
proptest = "1"
|
||||
more-asserts = "0.3.1"
|
||||
rand = "0.9"
|
||||
binggan = "0.16.1"
|
||||
binggan = "0.17.0"
|
||||
|
||||
[[bench]]
|
||||
name = "bench_merge"
|
||||
|
||||
@@ -19,6 +19,6 @@ time = { version = "0.3.47", features = ["serde-well-known"] }
|
||||
serde = { version = "1.0.136", features = ["derive"] }
|
||||
|
||||
[dev-dependencies]
|
||||
binggan = "0.16.1"
|
||||
binggan = "0.17.0"
|
||||
proptest = "1.0.0"
|
||||
rand = "0.9"
|
||||
|
||||
@@ -115,6 +115,71 @@ pub fn get_fast_field_names(aggs: &Aggregations) -> HashSet<String> {
|
||||
fast_field_names
|
||||
}
|
||||
|
||||
/// Validates that all fields referenced in the aggregation request exist in the schema
|
||||
/// and are configured as fast fields.
|
||||
///
|
||||
/// This is a convenience function for upfront validation before executing aggregations.
|
||||
/// Returns an error if any field doesn't exist or is not a fast field.
|
||||
///
|
||||
/// Validation is intentionally opt-in rather than baked into aggregation execution: the
|
||||
/// default lenient behavior (returning empty results for missing fields) supports
|
||||
/// schema evolution and federated queries where the same request runs against segments
|
||||
/// or indices with different schemas.
|
||||
///
|
||||
/// # Example
|
||||
/// ```
|
||||
/// use tantivy::aggregation::agg_req::{Aggregations, validate_aggregation_fields_exist};
|
||||
/// use tantivy::schema::{Schema, FAST};
|
||||
/// use tantivy::Index;
|
||||
///
|
||||
/// # fn main() -> tantivy::Result<()> {
|
||||
/// // Create a simple index
|
||||
/// let mut schema_builder = Schema::builder();
|
||||
/// schema_builder.add_f64_field("price", FAST);
|
||||
/// let schema = schema_builder.build();
|
||||
/// let index = Index::create_in_ram(schema);
|
||||
///
|
||||
/// // Parse aggregation request
|
||||
/// let agg_req: Aggregations = serde_json::from_str(r#"{
|
||||
/// "avg_price": { "avg": { "field": "price" } }
|
||||
/// }"#)?;
|
||||
///
|
||||
/// let reader = index.reader()?;
|
||||
/// let searcher = reader.searcher();
|
||||
///
|
||||
/// // Validate fields before executing
|
||||
/// for segment_reader in searcher.segment_readers() {
|
||||
/// validate_aggregation_fields_exist(&agg_req, segment_reader)?;
|
||||
/// }
|
||||
/// # Ok(())
|
||||
/// # }
|
||||
/// ```
|
||||
pub fn validate_aggregation_fields_exist(
|
||||
aggs: &Aggregations,
|
||||
reader: &crate::SegmentReader,
|
||||
) -> crate::Result<()> {
|
||||
let field_names = get_fast_field_names(aggs);
|
||||
let schema = reader.schema();
|
||||
|
||||
for field_name in field_names {
|
||||
// Check if the field is either directly in the schema or could be part of a json field
|
||||
// present in the schema, and verify it's a fast field.
|
||||
if let Some((field, _path)) = schema.find_field(&field_name) {
|
||||
let field_type = schema.get_field_entry(field).field_type();
|
||||
if !field_type.is_fast() {
|
||||
return Err(crate::TantivyError::SchemaError(format!(
|
||||
"Field '{}' is not a fast field. Aggregations require fast fields.",
|
||||
field_name
|
||||
)));
|
||||
}
|
||||
} else {
|
||||
return Err(crate::TantivyError::FieldNotFound(field_name));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
|
||||
/// All aggregation types.
|
||||
pub enum AggregationVariants {
|
||||
|
||||
@@ -1436,3 +1436,46 @@ fn test_aggregation_on_json_object_mixed_numerical_segments() {
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_aggregation_field_validation_helper() {
|
||||
// Test the standalone validation helper function for field validation
|
||||
let index = get_test_index_2_segments(false).unwrap();
|
||||
let reader = index.reader().unwrap();
|
||||
let searcher = reader.searcher();
|
||||
let segment_reader = searcher.segment_reader(0);
|
||||
|
||||
// Test with invalid field
|
||||
let agg_req: Aggregations = serde_json::from_str(
|
||||
r#"{
|
||||
"avg_test": {
|
||||
"avg": { "field": "nonexistent_field" }
|
||||
}
|
||||
}"#,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let result =
|
||||
crate::aggregation::agg_req::validate_aggregation_fields_exist(&agg_req, segment_reader);
|
||||
assert!(result.is_err());
|
||||
match result {
|
||||
Err(crate::TantivyError::FieldNotFound(field_name)) => {
|
||||
assert_eq!(field_name, "nonexistent_field");
|
||||
}
|
||||
_ => panic!("Expected FieldNotFound error, got: {:?}", result),
|
||||
}
|
||||
|
||||
// Test with valid field
|
||||
let agg_req: Aggregations = serde_json::from_str(
|
||||
r#"{
|
||||
"avg_test": {
|
||||
"avg": { "field": "score" }
|
||||
}
|
||||
}"#,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let result =
|
||||
crate::aggregation::agg_req::validate_aggregation_fields_exist(&agg_req, segment_reader);
|
||||
assert!(result.is_ok());
|
||||
}
|
||||
|
||||
@@ -166,8 +166,12 @@ impl CouponCache {
|
||||
let should_use_dense =
|
||||
highest_term_ord < 1_000_000u64 || highest_term_ord < num_terms as u64 * 3u64;
|
||||
if should_use_dense {
|
||||
let mut coupon_map: Vec<Coupon> = vec![Coupon::EMPTY; highest_term_ord as usize + 1];
|
||||
for (term_ord, coupon) in term_ords.into_iter().zip(coupons.into_iter()) {
|
||||
// We don't really care about the value here. We will populate all the values we will
|
||||
// read anyway.
|
||||
let uninitialized_coupon = Coupon::from_hash(0);
|
||||
let mut coupon_map: Vec<Coupon> =
|
||||
vec![uninitialized_coupon; highest_term_ord as usize + 1];
|
||||
for (term_ord, coupon) in term_ords.into_iter().zip(coupons) {
|
||||
coupon_map[term_ord as usize] = coupon;
|
||||
}
|
||||
CouponCache::Dense {
|
||||
@@ -821,7 +825,7 @@ impl<'de> Deserialize<'de> for CardinalityCollector {
|
||||
impl CardinalityCollector {
|
||||
fn new(salt: u8) -> Self {
|
||||
Self {
|
||||
sketch: HllSketch::new(LG_K, HllType::Hll4),
|
||||
sketch: HllSketch::new(LG_K, HllType::Hll8),
|
||||
salt,
|
||||
}
|
||||
}
|
||||
@@ -852,7 +856,7 @@ impl CardinalityCollector {
|
||||
let mut union = HllUnion::new(LG_K);
|
||||
union.update(&self.sketch);
|
||||
union.update(&right.sketch);
|
||||
self.sketch = union.to_sketch(HllType::Hll4);
|
||||
self.sketch = union.to_sketch(HllType::Hll8);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -138,6 +138,31 @@ pub trait DocSet: Send {
|
||||
buffer.len()
|
||||
}
|
||||
|
||||
/// Fills a given mutable buffer with the next doc ids smaller than `horizon`.
|
||||
///
|
||||
/// Unlike [`DocSet::fill_buffer`], this method must not advance past a doc id greater than or
|
||||
/// equal to `horizon`.
|
||||
fn fill_buffer_up_to(
|
||||
&mut self,
|
||||
horizon: DocId,
|
||||
buffer: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN],
|
||||
) -> usize {
|
||||
if self.doc() == TERMINATED {
|
||||
return 0;
|
||||
}
|
||||
for (pos, buffer_val) in buffer.iter_mut().enumerate() {
|
||||
let doc = self.doc();
|
||||
if doc >= horizon {
|
||||
return pos;
|
||||
}
|
||||
*buffer_val = doc;
|
||||
if self.advance() == TERMINATED {
|
||||
return pos + 1;
|
||||
}
|
||||
}
|
||||
buffer.len()
|
||||
}
|
||||
|
||||
/// Returns the current document
|
||||
/// Right after creating a new `DocSet`, the docset points to the first document.
|
||||
///
|
||||
@@ -251,6 +276,14 @@ impl DocSet for &mut dyn DocSet {
|
||||
(**self).fill_buffer(buffer)
|
||||
}
|
||||
|
||||
fn fill_buffer_up_to(
|
||||
&mut self,
|
||||
horizon: DocId,
|
||||
buffer: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN],
|
||||
) -> usize {
|
||||
(**self).fill_buffer_up_to(horizon, buffer)
|
||||
}
|
||||
|
||||
fn fill_bitset_block(
|
||||
&mut self,
|
||||
min_doc: DocId,
|
||||
|
||||
@@ -6,6 +6,7 @@ use common::{ByteCount, HasLen};
|
||||
use fnv::FnvHashMap;
|
||||
use itertools::Itertools;
|
||||
|
||||
use crate::directory::error::OpenReadError;
|
||||
use crate::directory::{CompositeFile, FileSlice};
|
||||
use crate::error::DataCorruption;
|
||||
use crate::fastfield::{intersect_alive_bitsets, AliveBitSet, FacetReader, FastFieldReaders};
|
||||
@@ -159,12 +160,10 @@ impl SegmentReader {
|
||||
let postings_file = segment.open_read(SegmentComponent::Postings)?;
|
||||
let postings_composite = CompositeFile::open(&postings_file)?;
|
||||
|
||||
let positions_composite = {
|
||||
if let Ok(positions_file) = segment.open_read(SegmentComponent::Positions) {
|
||||
CompositeFile::open(&positions_file)?
|
||||
} else {
|
||||
CompositeFile::empty()
|
||||
}
|
||||
let positions_composite = match segment.open_read(SegmentComponent::Positions) {
|
||||
Ok(positions_file) => CompositeFile::open(&positions_file)?,
|
||||
Err(OpenReadError::FileDoesNotExist(_)) => CompositeFile::empty(),
|
||||
Err(open_read_error) => return Err(open_read_error.into()),
|
||||
};
|
||||
|
||||
let schema = segment.schema();
|
||||
|
||||
@@ -240,6 +240,42 @@ impl BlockSegmentPostings {
|
||||
self.freq_decoder.output_array()
|
||||
}
|
||||
|
||||
pub(crate) fn copy_docs_and_term_freqs(
|
||||
&self,
|
||||
block_offset: usize,
|
||||
horizon: DocId,
|
||||
docs: &mut [DocId],
|
||||
term_freqs: &mut [u32],
|
||||
) -> usize {
|
||||
debug_assert_eq!(docs.len(), term_freqs.len());
|
||||
let block_docs = self.docs();
|
||||
let remaining_docs_in_block = block_docs.len().saturating_sub(block_offset);
|
||||
let max_len = remaining_docs_in_block.min(docs.len());
|
||||
if max_len == 0 {
|
||||
return 0;
|
||||
}
|
||||
|
||||
let source_docs = &block_docs[block_offset..block_offset + max_len];
|
||||
let len = if source_docs[max_len - 1] < horizon {
|
||||
max_len
|
||||
} else {
|
||||
source_docs
|
||||
.iter()
|
||||
.position(|&doc| doc >= horizon)
|
||||
.unwrap_or(max_len)
|
||||
};
|
||||
|
||||
docs[..len].copy_from_slice(&source_docs[..len]);
|
||||
|
||||
let block_freqs = self.freq_output_array();
|
||||
if block_freqs.len() >= block_offset + len {
|
||||
term_freqs[..len].copy_from_slice(&block_freqs[block_offset..block_offset + len]);
|
||||
} else {
|
||||
term_freqs[..len].fill(1);
|
||||
}
|
||||
len
|
||||
}
|
||||
|
||||
/// Return the frequency at index `idx` of the block.
|
||||
#[inline]
|
||||
pub fn freq(&self, idx: usize) -> u32 {
|
||||
|
||||
@@ -532,6 +532,16 @@ pub(crate) mod tests {
|
||||
fn score(&mut self) -> Score {
|
||||
self.0.score()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn can_score_doc(&self) -> bool {
|
||||
self.0.can_score_doc()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn score_doc(&mut self, doc: DocId, term_freq: u32) -> Score {
|
||||
self.0.score_doc(doc, term_freq)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn test_skip_against_unoptimized<F: Fn() -> Box<dyn DocSet>>(
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use common::HasLen;
|
||||
|
||||
use crate::docset::DocSet;
|
||||
use crate::docset::{DocSet, COLLECT_BLOCK_BUFFER_LEN};
|
||||
use crate::fastfield::AliveBitSet;
|
||||
use crate::positions::PositionReader;
|
||||
use crate::postings::compression::COMPRESSION_BLOCK_SIZE;
|
||||
@@ -151,6 +151,34 @@ impl SegmentPostings {
|
||||
position_reader,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn fill_buffer_up_to_with_term_freqs(
|
||||
&mut self,
|
||||
horizon: DocId,
|
||||
docs: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN],
|
||||
term_freqs: &mut [u32; COLLECT_BLOCK_BUFFER_LEN],
|
||||
) -> usize {
|
||||
let mut num_elems = 0;
|
||||
while num_elems < COLLECT_BLOCK_BUFFER_LEN && self.doc() < horizon {
|
||||
let copied = self.block_cursor.copy_docs_and_term_freqs(
|
||||
self.cur,
|
||||
horizon,
|
||||
&mut docs[num_elems..],
|
||||
&mut term_freqs[num_elems..],
|
||||
);
|
||||
if copied == 0 {
|
||||
break;
|
||||
}
|
||||
num_elems += copied;
|
||||
self.cur += copied;
|
||||
|
||||
if self.cur == COMPRESSION_BLOCK_SIZE {
|
||||
self.cur = 0;
|
||||
self.block_cursor.advance();
|
||||
}
|
||||
}
|
||||
num_elems
|
||||
}
|
||||
}
|
||||
|
||||
impl DocSet for SegmentPostings {
|
||||
|
||||
@@ -109,6 +109,16 @@ impl Scorer for AllScorer {
|
||||
fn score(&mut self) -> Score {
|
||||
1.0
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn can_score_doc(&self) -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn score_doc(&mut self, _doc: DocId, _term_freq: u32) -> Score {
|
||||
1.0
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -1,5 +1,9 @@
|
||||
use std::cell::RefCell;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::sync::Arc;
|
||||
|
||||
use lru::LruCache;
|
||||
|
||||
use crate::fieldnorm::FieldNormReader;
|
||||
use crate::query::Explanation;
|
||||
use crate::schema::Field;
|
||||
@@ -59,7 +63,9 @@ fn cached_tf_component(fieldnorm: u32, average_fieldnorm: Score) -> Score {
|
||||
K1 * (1.0 - B + B * fieldnorm as Score / average_fieldnorm)
|
||||
}
|
||||
|
||||
fn compute_tf_cache(average_fieldnorm: Score) -> Arc<[Score; 256]> {
|
||||
const BM25_TF_CACHE_CAPACITY: usize = 64;
|
||||
|
||||
fn compute_tf_cache_uncached(average_fieldnorm: Score) -> Arc<[Score; 256]> {
|
||||
let mut cache: [Score; 256] = [0.0; 256];
|
||||
for (fieldnorm_id, cache_mut) in cache.iter_mut().enumerate() {
|
||||
let fieldnorm = FieldNormReader::id_to_fieldnorm(fieldnorm_id as u8);
|
||||
@@ -68,6 +74,36 @@ fn compute_tf_cache(average_fieldnorm: Score) -> Arc<[Score; 256]> {
|
||||
Arc::new(cache)
|
||||
}
|
||||
|
||||
thread_local! {
|
||||
static TF_CACHES: RefCell<LruCache<u32, Arc<[Score; 256]>>> = RefCell::new(LruCache::new(
|
||||
NonZeroUsize::new(BM25_TF_CACHE_CAPACITY).unwrap(),
|
||||
));
|
||||
}
|
||||
|
||||
/// The cache is shared across all [Bm25Weight] with the same average fieldnorm on the same thread.
|
||||
/// It is stored in a thread local LRU cache.
|
||||
///
|
||||
/// On one query all terms on the same field will share the same average fieldnorm, and thus the
|
||||
/// same cache. This will lower cache pressure.
|
||||
///
|
||||
/// Even between queries (on the same thread), the cache will be reused, which allows the cache to
|
||||
/// better learn the memory address of the cache and access patterns.
|
||||
///
|
||||
/// Thread local is used in order to be defensive about potential contention on the cache.
|
||||
fn compute_tf_cache(average_fieldnorm: Score) -> Arc<[Score; 256]> {
|
||||
let cache_key = average_fieldnorm.to_bits();
|
||||
TF_CACHES.with(|cache_by_average_fieldnorm| {
|
||||
let mut cache_by_average_fieldnorm = cache_by_average_fieldnorm.borrow_mut();
|
||||
if let Some(cache) = cache_by_average_fieldnorm.get(&cache_key) {
|
||||
return cache.clone();
|
||||
}
|
||||
|
||||
let cache = compute_tf_cache_uncached(average_fieldnorm);
|
||||
cache_by_average_fieldnorm.put(cache_key, cache.clone());
|
||||
cache
|
||||
})
|
||||
}
|
||||
|
||||
/// A struct used for computing BM25 scores.
|
||||
#[derive(Clone)]
|
||||
pub struct Bm25Weight {
|
||||
@@ -229,7 +265,7 @@ impl Bm25Weight {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
use super::idf;
|
||||
use super::{idf, Bm25Weight};
|
||||
use crate::{assert_nearly_equals, Score};
|
||||
|
||||
#[test]
|
||||
@@ -237,4 +273,12 @@ mod tests {
|
||||
let score: Score = 2.0;
|
||||
assert_nearly_equals!(idf(1, 2), score.ln());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bm25_tf_cache_is_shared_for_same_average_fieldnorm() {
|
||||
let weight1 = Bm25Weight::for_one_term(1, 10, 3.0);
|
||||
let weight2 = Bm25Weight::for_one_term(2, 10, 3.0);
|
||||
|
||||
assert!(std::sync::Arc::ptr_eq(&weight1.cache, &weight2.cache));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -91,10 +91,14 @@ fn into_box_scorer<TScoreCombiner: ScoreCombiner>(
|
||||
num_docs: u32,
|
||||
) -> Box<dyn Scorer> {
|
||||
match scorer {
|
||||
SpecializedScorer::TermUnion(term_scorers) => {
|
||||
let union_scorer =
|
||||
BufferedUnionScorer::build(term_scorers, score_combiner_fn, num_docs);
|
||||
Box::new(union_scorer)
|
||||
SpecializedScorer::TermUnion(mut term_scorers) => {
|
||||
if term_scorers.len() == 1 {
|
||||
Box::new(term_scorers.pop().unwrap())
|
||||
} else {
|
||||
let union_scorer =
|
||||
BufferedUnionScorer::build(term_scorers, score_combiner_fn, num_docs);
|
||||
Box::new(union_scorer)
|
||||
}
|
||||
}
|
||||
SpecializedScorer::TermIntersection(term_scorers) => {
|
||||
let boxed_scorers: Vec<Box<dyn Scorer>> = term_scorers
|
||||
|
||||
@@ -112,6 +112,14 @@ impl<S: Scorer> DocSet for BoostScorer<S> {
|
||||
self.underlying.fill_buffer(buffer)
|
||||
}
|
||||
|
||||
fn fill_buffer_up_to(
|
||||
&mut self,
|
||||
horizon: DocId,
|
||||
buffer: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN],
|
||||
) -> usize {
|
||||
self.underlying.fill_buffer_up_to(horizon, buffer)
|
||||
}
|
||||
|
||||
fn doc(&self) -> u32 {
|
||||
self.underlying.doc()
|
||||
}
|
||||
@@ -138,6 +146,27 @@ impl<S: Scorer> Scorer for BoostScorer<S> {
|
||||
fn score(&mut self) -> Score {
|
||||
self.underlying.score() * self.boost
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn can_score_doc(&self) -> bool {
|
||||
self.underlying.can_score_doc()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn score_doc(&mut self, doc: DocId, term_freq: u32) -> Score {
|
||||
self.underlying.score_doc(doc, term_freq) * self.boost
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn fill_buffer_up_to_with_term_freqs(
|
||||
&mut self,
|
||||
horizon: DocId,
|
||||
docs: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN],
|
||||
term_freqs: &mut [u32; COLLECT_BLOCK_BUFFER_LEN],
|
||||
) -> usize {
|
||||
self.underlying
|
||||
.fill_buffer_up_to_with_term_freqs(horizon, docs, term_freqs)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -141,6 +141,16 @@ impl<TDocSet: DocSet + 'static> Scorer for ConstScorer<TDocSet> {
|
||||
fn score(&mut self) -> Score {
|
||||
self.score
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn can_score_doc(&self) -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn score_doc(&mut self, _doc: DocId, _term_freq: u32) -> Score {
|
||||
self.score
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -315,6 +315,20 @@ mod tests {
|
||||
fn score(&mut self) -> Score {
|
||||
self.foo.get(self.cursor).map(|x| x.1).unwrap_or(0.0)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn can_score_doc(&self) -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn score_doc(&mut self, doc: DocId, _term_freq: u32) -> Score {
|
||||
self.foo
|
||||
.iter()
|
||||
.find(|(candidate_doc, _)| *candidate_doc == doc)
|
||||
.map(|(_, score)| *score)
|
||||
.unwrap_or(0.0)
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -59,6 +59,16 @@ impl Scorer for EmptyScorer {
|
||||
fn score(&mut self) -> Score {
|
||||
0.0
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn can_score_doc(&self) -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn score_doc(&mut self, _doc: DocId, _term_freq: u32) -> Score {
|
||||
0.0
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -1,5 +1,40 @@
|
||||
use crate::docset::{DocSet, TERMINATED};
|
||||
use crate::query::Scorer;
|
||||
use crate::Score;
|
||||
use crate::{DocId, Score};
|
||||
|
||||
struct ScoreOnlyScorer {
|
||||
doc: DocId,
|
||||
score: Score,
|
||||
}
|
||||
|
||||
impl DocSet for ScoreOnlyScorer {
|
||||
fn advance(&mut self) -> DocId {
|
||||
self.doc = TERMINATED;
|
||||
TERMINATED
|
||||
}
|
||||
|
||||
fn doc(&self) -> DocId {
|
||||
self.doc
|
||||
}
|
||||
|
||||
fn size_hint(&self) -> u32 {
|
||||
1
|
||||
}
|
||||
}
|
||||
|
||||
impl Scorer for ScoreOnlyScorer {
|
||||
fn score(&mut self) -> Score {
|
||||
self.score
|
||||
}
|
||||
|
||||
fn can_score_doc(&self) -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
fn score_doc(&mut self, _doc: DocId, _term_freq: u32) -> Score {
|
||||
self.score
|
||||
}
|
||||
}
|
||||
|
||||
/// The `ScoreCombiner` trait defines how to compute
|
||||
/// an overall score given a list of scores.
|
||||
@@ -10,6 +45,17 @@ pub trait ScoreCombiner: Default + Clone + Send + Copy + 'static {
|
||||
/// or not.
|
||||
fn update<TScorer: Scorer>(&mut self, scorer: &mut TScorer);
|
||||
|
||||
/// Aggregates the score combiner with an already computed score.
|
||||
fn update_score(&mut self, doc: DocId, score: Score) {
|
||||
let mut scorer = ScoreOnlyScorer { doc, score };
|
||||
self.update(&mut scorer);
|
||||
}
|
||||
|
||||
/// Returns true if this combiner needs scorer scores to compute its state.
|
||||
fn requires_scoring() -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
/// Clears the score combiner state back to its initial state.
|
||||
fn clear(&mut self);
|
||||
|
||||
@@ -27,6 +73,12 @@ pub struct DoNothingCombiner;
|
||||
impl ScoreCombiner for DoNothingCombiner {
|
||||
fn update<TScorer: Scorer>(&mut self, _scorer: &mut TScorer) {}
|
||||
|
||||
fn update_score(&mut self, _doc: DocId, _score: Score) {}
|
||||
|
||||
fn requires_scoring() -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
fn clear(&mut self) {}
|
||||
|
||||
#[inline]
|
||||
@@ -42,10 +94,16 @@ pub struct SumCombiner {
|
||||
}
|
||||
|
||||
impl ScoreCombiner for SumCombiner {
|
||||
#[inline]
|
||||
fn update<TScorer: Scorer>(&mut self, scorer: &mut TScorer) {
|
||||
self.score += scorer.score();
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn update_score(&mut self, _doc: DocId, score: Score) {
|
||||
self.score += score;
|
||||
}
|
||||
|
||||
fn clear(&mut self) {
|
||||
self.score = 0.0;
|
||||
}
|
||||
@@ -77,12 +135,19 @@ impl DisjunctionMaxCombiner {
|
||||
}
|
||||
|
||||
impl ScoreCombiner for DisjunctionMaxCombiner {
|
||||
#[inline]
|
||||
fn update<TScorer: Scorer>(&mut self, scorer: &mut TScorer) {
|
||||
let score = scorer.score();
|
||||
self.max = Score::max(score, self.max);
|
||||
self.sum += score;
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn update_score(&mut self, _doc: DocId, score: Score) {
|
||||
self.max = Score::max(score, self.max);
|
||||
self.sum += score;
|
||||
}
|
||||
|
||||
fn clear(&mut self) {
|
||||
self.max = 0.0;
|
||||
self.sum = 0.0;
|
||||
|
||||
@@ -2,8 +2,8 @@ use std::ops::DerefMut;
|
||||
|
||||
use downcast_rs::impl_downcast;
|
||||
|
||||
use crate::docset::DocSet;
|
||||
use crate::Score;
|
||||
use crate::docset::{DocSet, COLLECT_BLOCK_BUFFER_LEN};
|
||||
use crate::{DocId, Score};
|
||||
|
||||
/// Scored set of documents matching a query within a specific segment.
|
||||
///
|
||||
@@ -13,6 +13,36 @@ pub trait Scorer: downcast_rs::Downcast + DocSet + 'static {
|
||||
///
|
||||
/// This method will perform a bit of computation and is not cached.
|
||||
fn score(&mut self) -> Score;
|
||||
|
||||
/// Returns true if [`Scorer::score_doc`] can score buffered docs without
|
||||
/// repositioning the scorer.
|
||||
///
|
||||
/// Scorers whose [`Scorer::score_doc`] needs term frequencies must also override
|
||||
/// [`Scorer::fill_buffer_up_to_with_term_freqs`].
|
||||
fn can_score_doc(&self) -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
/// Returns the score for `doc` with its term frequency.
|
||||
fn score_doc(&mut self, _doc: DocId, _term_freq: u32) -> Score {
|
||||
panic!(
|
||||
"score_doc is not supported by this scorer. You need check can_score_doc() before \
|
||||
calling this method."
|
||||
)
|
||||
}
|
||||
|
||||
/// Fills docs up to `horizon`.
|
||||
///
|
||||
/// The default implementation does not fill `term_freqs`. Scorers whose
|
||||
/// [`Scorer::score_doc`] reads term frequencies must override this method.
|
||||
fn fill_buffer_up_to_with_term_freqs(
|
||||
&mut self,
|
||||
horizon: DocId,
|
||||
docs: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN],
|
||||
_term_freqs: &mut [u32; COLLECT_BLOCK_BUFFER_LEN],
|
||||
) -> usize {
|
||||
DocSet::fill_buffer_up_to(self, horizon, docs)
|
||||
}
|
||||
}
|
||||
|
||||
impl_downcast!(Scorer);
|
||||
@@ -22,4 +52,25 @@ impl Scorer for Box<dyn Scorer> {
|
||||
fn score(&mut self) -> Score {
|
||||
self.deref_mut().score()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn can_score_doc(&self) -> bool {
|
||||
self.as_ref().can_score_doc()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn score_doc(&mut self, doc: DocId, term_freq: u32) -> Score {
|
||||
self.deref_mut().score_doc(doc, term_freq)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn fill_buffer_up_to_with_term_freqs(
|
||||
&mut self,
|
||||
horizon: DocId,
|
||||
docs: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN],
|
||||
term_freqs: &mut [u32; COLLECT_BLOCK_BUFFER_LEN],
|
||||
) -> usize {
|
||||
self.deref_mut()
|
||||
.fill_buffer_up_to_with_term_freqs(horizon, docs, term_freqs)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use crate::docset::DocSet;
|
||||
use crate::docset::{DocSet, COLLECT_BLOCK_BUFFER_LEN};
|
||||
use crate::fieldnorm::FieldNormReader;
|
||||
use crate::postings::{BlockSegmentPostings, FreqReadingOption, Postings, SegmentPostings};
|
||||
use crate::query::bm25::Bm25Weight;
|
||||
@@ -147,6 +147,27 @@ impl Scorer for TermScorer {
|
||||
let term_freq = self.term_freq();
|
||||
self.similarity_weight.score(fieldnorm_id, term_freq)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn can_score_doc(&self) -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn score_doc(&mut self, doc: DocId, term_freq: u32) -> Score {
|
||||
let fieldnorm_id = self.fieldnorm_reader.fieldnorm_id(doc);
|
||||
self.similarity_weight.score(fieldnorm_id, term_freq)
|
||||
}
|
||||
|
||||
fn fill_buffer_up_to_with_term_freqs(
|
||||
&mut self,
|
||||
horizon: DocId,
|
||||
docs: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN],
|
||||
term_freqs: &mut [u32; COLLECT_BLOCK_BUFFER_LEN],
|
||||
) -> usize {
|
||||
self.postings
|
||||
.fill_buffer_up_to_with_term_freqs(horizon, docs, term_freqs)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -10,23 +10,7 @@ use crate::{DocId, Score};
|
||||
// of upcoming document IDs (the "horizon").
|
||||
const HORIZON_NUM_TINYBITSETS: usize = HORIZON as usize / 64;
|
||||
const HORIZON: u32 = 64u32 * 64u32;
|
||||
|
||||
// `drain_filter` is not stable yet.
|
||||
// This function is similar except that it does is not unstable, and
|
||||
// it does not keep the original vector ordering.
|
||||
//
|
||||
// Elements are dropped and not yielded.
|
||||
fn unordered_drain_filter<T, P>(v: &mut Vec<T>, mut predicate: P)
|
||||
where P: FnMut(&mut T) -> bool {
|
||||
let mut i = 0;
|
||||
while i < v.len() {
|
||||
if predicate(&mut v[i]) {
|
||||
v.swap_remove(i);
|
||||
} else {
|
||||
i += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
const GROUPED_INSERT_MAX_BUCKET_SPAN: u32 = 2;
|
||||
|
||||
/// Creates a `DocSet` that iterate through the union of two or more `DocSet`s.
|
||||
pub struct BufferedUnionScorer<TScorer, TScoreCombiner = DoNothingCombiner> {
|
||||
@@ -53,31 +37,213 @@ pub struct BufferedUnionScorer<TScorer, TScoreCombiner = DoNothingCombiner> {
|
||||
score: Score,
|
||||
/// Number of documents in the segment.
|
||||
num_docs: u32,
|
||||
/// Scratch buffer for block-based refill.
|
||||
refill_docs: [DocId; COLLECT_BLOCK_BUFFER_LEN],
|
||||
/// Scratch buffer for term frequencies matching `refill_docs`.
|
||||
refill_term_freqs: [u32; COLLECT_BLOCK_BUFFER_LEN],
|
||||
/// Whether all children support scoring buffered docs after advancing.
|
||||
use_score_doc_refill: bool,
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn union_bucket(
|
||||
bitsets: &mut [TinySet; HORIZON_NUM_TINYBITSETS],
|
||||
bucket_pos: u32,
|
||||
tinyset: TinySet,
|
||||
) {
|
||||
debug_assert!((bucket_pos as usize) < HORIZON_NUM_TINYBITSETS);
|
||||
// `bucket` comes from a doc delta below `HORIZON`; there are exactly
|
||||
// `HORIZON / 64` buckets in the refill window.
|
||||
bitsets[bucket_pos as usize] = bitsets[bucket_pos as usize].union(tinyset);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn insert_delta(bitsets: &mut [TinySet; HORIZON_NUM_TINYBITSETS], delta: DocId) {
|
||||
debug_assert!(delta < HORIZON);
|
||||
// `delta < HORIZON`, so `delta / 64` is in the bitset array. The bit
|
||||
// offset is reduced modulo 64 before being inserted in the TinySet.
|
||||
bitsets[delta as usize / 64].insert_mut(delta % 64u32);
|
||||
}
|
||||
|
||||
fn insert_and_score_full_buffer<TScorer: Scorer, TScoreCombiner: ScoreCombiner>(
|
||||
scorer: &mut TScorer,
|
||||
docs: &[DocId; COLLECT_BLOCK_BUFFER_LEN],
|
||||
term_freqs: &[u32; COLLECT_BLOCK_BUFFER_LEN],
|
||||
bitsets: &mut [TinySet; HORIZON_NUM_TINYBITSETS],
|
||||
score_combiner: &mut [TScoreCombiner; HORIZON as usize],
|
||||
min_doc: DocId,
|
||||
) {
|
||||
debug_assert!(docs.windows(2).all(|pair| pair[0] < pair[1]));
|
||||
debug_assert!(docs[COLLECT_BLOCK_BUFFER_LEN - 1] - min_doc < HORIZON);
|
||||
|
||||
let first_delta = docs[0] - min_doc;
|
||||
let last_delta = docs[COLLECT_BLOCK_BUFFER_LEN - 1] - min_doc;
|
||||
let first_bucket = first_delta / 64;
|
||||
let last_bucket = last_delta / 64;
|
||||
|
||||
// Common for very dense scorers: 64 distinct doc ids in one 64-doc bucket
|
||||
// means all bits in that bucket are present.
|
||||
if first_bucket == last_bucket {
|
||||
union_bucket(bitsets, first_bucket, TinySet::full());
|
||||
score_full_buffer(scorer, docs, term_freqs, score_combiner, min_doc);
|
||||
return;
|
||||
}
|
||||
|
||||
// 64 sorted distinct integers spanning exactly 64 values are consecutive.
|
||||
// If they cross a TinySet boundary, this is just the suffix of the first
|
||||
// bucket plus the prefix of the second bucket.
|
||||
if last_delta - first_delta == COLLECT_BLOCK_BUFFER_LEN as u32 - 1 {
|
||||
union_bucket(
|
||||
bitsets,
|
||||
first_bucket,
|
||||
TinySet::range_greater_or_equal(first_delta % 64u32),
|
||||
);
|
||||
union_bucket(
|
||||
bitsets,
|
||||
last_bucket,
|
||||
TinySet::range_lower((last_delta + 1) % 64u32),
|
||||
);
|
||||
score_full_buffer(scorer, docs, term_freqs, score_combiner, min_doc);
|
||||
return;
|
||||
}
|
||||
|
||||
// Grouping wins only for very dense buffers that hit the same TinySet many
|
||||
// times. Once the 64 docs are spread farther, a straight pass is cheaper.
|
||||
if last_bucket - first_bucket <= GROUPED_INSERT_MAX_BUCKET_SPAN {
|
||||
let mut bucket = first_bucket;
|
||||
let mut tinyset = TinySet::empty();
|
||||
for (&doc, &term_freq) in docs.iter().zip(term_freqs.iter()) {
|
||||
let delta = doc - min_doc;
|
||||
let delta_bucket = delta / 64;
|
||||
if delta_bucket != bucket {
|
||||
union_bucket(bitsets, bucket, tinyset);
|
||||
bucket = delta_bucket;
|
||||
tinyset = TinySet::empty();
|
||||
}
|
||||
tinyset.insert_mut(delta % 64u32);
|
||||
let score = scorer.score_doc(doc, term_freq);
|
||||
update_score_combiner(score_combiner, delta, doc, score);
|
||||
}
|
||||
union_bucket(bitsets, bucket, tinyset);
|
||||
} else {
|
||||
for (&doc, &term_freq) in docs.iter().zip(term_freqs.iter()) {
|
||||
let delta = doc - min_doc;
|
||||
insert_delta(bitsets, delta);
|
||||
// TODO: score_doc access the field_norm reader for each _term_, instead of once per
|
||||
// doc. We could optimize this by caching the field norm for the doc, and
|
||||
// reusing it for all terms in the doc.
|
||||
let score = scorer.score_doc(doc, term_freq);
|
||||
update_score_combiner(score_combiner, delta, doc, score);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn update_score_combiner<TScoreCombiner: ScoreCombiner>(
|
||||
score_combiner: &mut [TScoreCombiner; HORIZON as usize],
|
||||
delta: DocId,
|
||||
doc: DocId,
|
||||
score: Score,
|
||||
) {
|
||||
debug_assert!(delta < HORIZON);
|
||||
// Full and partial refill only buffer docs below `horizon`, so their
|
||||
// deltas are always in the score-combiner window.
|
||||
score_combiner[delta as usize].update_score(doc, score);
|
||||
}
|
||||
|
||||
fn score_full_buffer<TScorer: Scorer, TScoreCombiner: ScoreCombiner>(
|
||||
scorer: &mut TScorer,
|
||||
docs: &[DocId; COLLECT_BLOCK_BUFFER_LEN],
|
||||
term_freqs: &[u32; COLLECT_BLOCK_BUFFER_LEN],
|
||||
score_combiner: &mut [TScoreCombiner; HORIZON as usize],
|
||||
min_doc: DocId,
|
||||
) {
|
||||
for (&doc, &term_freq) in docs.iter().zip(term_freqs.iter()) {
|
||||
let score = scorer.score_doc(doc, term_freq);
|
||||
update_score_combiner(score_combiner, doc - min_doc, doc, score);
|
||||
}
|
||||
}
|
||||
|
||||
fn refill_scorer_with_score_docs<TScorer: Scorer, TScoreCombiner: ScoreCombiner>(
|
||||
scorer: &mut TScorer,
|
||||
bitsets: &mut [TinySet; HORIZON_NUM_TINYBITSETS],
|
||||
score_combiner: &mut [TScoreCombiner; HORIZON as usize],
|
||||
docs: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN],
|
||||
term_freqs: &mut [u32; COLLECT_BLOCK_BUFFER_LEN],
|
||||
min_doc: DocId,
|
||||
horizon: DocId,
|
||||
) {
|
||||
loop {
|
||||
let len = scorer.fill_buffer_up_to_with_term_freqs(horizon, docs, term_freqs);
|
||||
if len == COLLECT_BLOCK_BUFFER_LEN {
|
||||
debug_assert!(docs[COLLECT_BLOCK_BUFFER_LEN - 1] != TERMINATED);
|
||||
debug_assert!(docs[COLLECT_BLOCK_BUFFER_LEN - 1] < horizon);
|
||||
insert_and_score_full_buffer(
|
||||
scorer,
|
||||
docs,
|
||||
term_freqs,
|
||||
bitsets,
|
||||
score_combiner,
|
||||
min_doc,
|
||||
);
|
||||
} else {
|
||||
for (&doc, &term_freq) in docs[..len].iter().zip(term_freqs[..len].iter()) {
|
||||
let delta = doc - min_doc;
|
||||
insert_delta(bitsets, delta);
|
||||
let score = scorer.score_doc(doc, term_freq);
|
||||
update_score_combiner(score_combiner, delta, doc, score);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn refill_scorer_from_current_doc<TScorer: Scorer, TScoreCombiner: ScoreCombiner>(
|
||||
scorer: &mut TScorer,
|
||||
bitsets: &mut [TinySet; HORIZON_NUM_TINYBITSETS],
|
||||
score_combiner: &mut [TScoreCombiner; HORIZON as usize],
|
||||
min_doc: DocId,
|
||||
horizon: DocId,
|
||||
) {
|
||||
loop {
|
||||
let doc = scorer.doc();
|
||||
if doc >= horizon {
|
||||
break;
|
||||
}
|
||||
let delta = doc - min_doc;
|
||||
insert_delta(bitsets, delta);
|
||||
debug_assert!(delta < HORIZON);
|
||||
score_combiner[delta as usize].update(scorer);
|
||||
scorer.advance();
|
||||
}
|
||||
}
|
||||
|
||||
fn refill<TScorer: Scorer, TScoreCombiner: ScoreCombiner>(
|
||||
scorers: &mut Vec<TScorer>,
|
||||
bitsets: &mut [TinySet; HORIZON_NUM_TINYBITSETS],
|
||||
score_combiner: &mut [TScoreCombiner; HORIZON as usize],
|
||||
docs: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN],
|
||||
term_freqs: &mut [u32; COLLECT_BLOCK_BUFFER_LEN],
|
||||
min_doc: DocId,
|
||||
use_score_doc_refill: bool,
|
||||
) {
|
||||
unordered_drain_filter(scorers, |scorer| {
|
||||
let horizon = min_doc + HORIZON;
|
||||
loop {
|
||||
let doc = scorer.doc();
|
||||
if doc >= horizon {
|
||||
return false;
|
||||
}
|
||||
// add this document
|
||||
let delta = doc - min_doc;
|
||||
bitsets[(delta / 64) as usize].insert_mut(delta % 64u32);
|
||||
score_combiner[delta as usize].update(scorer);
|
||||
if scorer.advance() == TERMINATED {
|
||||
// remove the docset, it has been entirely consumed.
|
||||
return true;
|
||||
}
|
||||
let horizon = min_doc + HORIZON;
|
||||
for scorer in scorers.iter_mut() {
|
||||
if use_score_doc_refill {
|
||||
refill_scorer_with_score_docs(
|
||||
scorer,
|
||||
bitsets,
|
||||
score_combiner,
|
||||
docs,
|
||||
term_freqs,
|
||||
min_doc,
|
||||
horizon,
|
||||
);
|
||||
} else {
|
||||
refill_scorer_from_current_doc(scorer, bitsets, score_combiner, min_doc, horizon);
|
||||
}
|
||||
});
|
||||
}
|
||||
scorers.retain(|scorer| scorer.doc() != TERMINATED);
|
||||
}
|
||||
|
||||
impl<TScorer: Scorer, TScoreCombiner: ScoreCombiner> BufferedUnionScorer<TScorer, TScoreCombiner> {
|
||||
@@ -87,6 +253,8 @@ impl<TScorer: Scorer, TScoreCombiner: ScoreCombiner> BufferedUnionScorer<TScorer
|
||||
score_combiner_fn: impl FnOnce() -> TScoreCombiner,
|
||||
num_docs: u32,
|
||||
) -> BufferedUnionScorer<TScorer, TScoreCombiner> {
|
||||
let use_score_doc_refill =
|
||||
TScoreCombiner::requires_scoring() && docsets.iter().all(Scorer::can_score_doc);
|
||||
let non_empty_docsets: Vec<TScorer> = docsets
|
||||
.into_iter()
|
||||
.filter(|docset| docset.doc() != TERMINATED)
|
||||
@@ -100,6 +268,9 @@ impl<TScorer: Scorer, TScoreCombiner: ScoreCombiner> BufferedUnionScorer<TScorer
|
||||
doc: 0,
|
||||
score: 0.0,
|
||||
num_docs,
|
||||
refill_docs: [TERMINATED; COLLECT_BLOCK_BUFFER_LEN],
|
||||
refill_term_freqs: [1u32; COLLECT_BLOCK_BUFFER_LEN],
|
||||
use_score_doc_refill,
|
||||
};
|
||||
if union.refill() {
|
||||
union.advance();
|
||||
@@ -120,7 +291,10 @@ impl<TScorer: Scorer, TScoreCombiner: ScoreCombiner> BufferedUnionScorer<TScorer
|
||||
&mut self.docsets,
|
||||
&mut self.bitsets,
|
||||
&mut self.scores,
|
||||
&mut self.refill_docs,
|
||||
&mut self.refill_term_freqs,
|
||||
min_doc,
|
||||
self.use_score_doc_refill,
|
||||
);
|
||||
true
|
||||
} else {
|
||||
@@ -248,12 +422,12 @@ where
|
||||
|
||||
// The target is outside of the buffered horizon.
|
||||
// advance all docsets to a doc >= to the target.
|
||||
unordered_drain_filter(&mut self.docsets, |docset| {
|
||||
for docset in &mut self.docsets {
|
||||
if docset.doc() < target {
|
||||
docset.seek(target);
|
||||
}
|
||||
docset.doc() == TERMINATED
|
||||
});
|
||||
}
|
||||
self.docsets.retain(|docset| docset.doc() != TERMINATED);
|
||||
|
||||
// at this point all of the docsets
|
||||
// are positioned on a doc >= to the target.
|
||||
|
||||
@@ -10,6 +10,8 @@ pub use simple_union::SimpleUnion;
|
||||
mod tests {
|
||||
|
||||
use std::collections::BTreeSet;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
use common::BitSet;
|
||||
|
||||
@@ -18,8 +20,8 @@ mod tests {
|
||||
use crate::postings::tests::test_skip_against_unoptimized;
|
||||
use crate::query::score_combiner::DoNothingCombiner;
|
||||
use crate::query::union::bitset_union::BitSetPostingUnion;
|
||||
use crate::query::{BitSetDocSet, ConstScorer, VecDocSet};
|
||||
use crate::{tests, DocId};
|
||||
use crate::query::{BitSetDocSet, ConstScorer, Scorer, VecDocSet};
|
||||
use crate::{tests, DocId, Score};
|
||||
|
||||
fn vec_doc_set_from_docs_list(
|
||||
docs_list: &[Vec<DocId>],
|
||||
@@ -66,6 +68,61 @@ mod tests {
|
||||
}
|
||||
BitSetDocSet::from(doc_bitset)
|
||||
}
|
||||
|
||||
struct CountingScorer {
|
||||
docset: VecDocSet,
|
||||
score_calls: Arc<AtomicUsize>,
|
||||
score_doc_calls: Arc<AtomicUsize>,
|
||||
}
|
||||
|
||||
impl CountingScorer {
|
||||
fn new(
|
||||
doc_ids: Vec<DocId>,
|
||||
score_calls: Arc<AtomicUsize>,
|
||||
score_doc_calls: Arc<AtomicUsize>,
|
||||
) -> Self {
|
||||
CountingScorer {
|
||||
docset: VecDocSet::from(doc_ids),
|
||||
score_calls,
|
||||
score_doc_calls,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl DocSet for CountingScorer {
|
||||
fn advance(&mut self) -> DocId {
|
||||
self.docset.advance()
|
||||
}
|
||||
|
||||
fn seek(&mut self, target: DocId) -> DocId {
|
||||
self.docset.seek(target)
|
||||
}
|
||||
|
||||
fn doc(&self) -> DocId {
|
||||
self.docset.doc()
|
||||
}
|
||||
|
||||
fn size_hint(&self) -> u32 {
|
||||
self.docset.size_hint()
|
||||
}
|
||||
}
|
||||
|
||||
impl Scorer for CountingScorer {
|
||||
fn score(&mut self) -> Score {
|
||||
self.score_calls.fetch_add(1, Ordering::SeqCst);
|
||||
1.0
|
||||
}
|
||||
|
||||
fn can_score_doc(&self) -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
fn score_doc(&mut self, _doc: DocId, _term_freq: u32) -> Score {
|
||||
self.score_doc_calls.fetch_add(1, Ordering::SeqCst);
|
||||
1.0
|
||||
}
|
||||
}
|
||||
|
||||
fn aux_test_union(docs_list: &[Vec<DocId>]) {
|
||||
for constructor in [
|
||||
posting_list_union_from_docs_list,
|
||||
@@ -168,6 +225,22 @@ mod tests {
|
||||
]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_do_nothing_combiner_does_not_score_buffered_docs() {
|
||||
let score_calls = Arc::new(AtomicUsize::new(0));
|
||||
let score_doc_calls = Arc::new(AtomicUsize::new(0));
|
||||
let scorers = vec![
|
||||
CountingScorer::new(vec![1, 3, 5], score_calls.clone(), score_doc_calls.clone()),
|
||||
CountingScorer::new(vec![2, 3, 6], score_calls.clone(), score_doc_calls.clone()),
|
||||
];
|
||||
|
||||
let mut union = BufferedUnionScorer::build(scorers, DoNothingCombiner::default, 10);
|
||||
|
||||
assert_eq!(union.count_including_deleted(), 5);
|
||||
assert_eq!(score_calls.load(Ordering::SeqCst), 0);
|
||||
assert_eq!(score_doc_calls.load(Ordering::SeqCst), 0);
|
||||
}
|
||||
|
||||
fn test_aux_union_skip(docs_list: &[Vec<DocId>], skip_targets: Vec<DocId>) {
|
||||
for constructor in [
|
||||
posting_list_union_from_docs_list,
|
||||
|
||||
@@ -14,11 +14,8 @@ use itertools::Itertools;
|
||||
use tantivy_fst::Automaton;
|
||||
use tantivy_fst::automaton::AlwaysMatch;
|
||||
|
||||
use crate::sstable_index_v3::SSTableIndexV3Empty;
|
||||
use crate::streamer::{Streamer, StreamerBuilder};
|
||||
use crate::{
|
||||
BlockAddr, DeltaReader, Reader, SSTable, SSTableIndex, SSTableIndexV3, TermOrdinal, VoidSSTable,
|
||||
};
|
||||
use crate::{BlockAddr, DeltaReader, Reader, SSTable, SSTableIndex, TermOrdinal, VoidSSTable};
|
||||
|
||||
/// An SSTable is a sorted map that associates sorted `&[u8]` keys
|
||||
/// to any kind of typed values.
|
||||
@@ -288,33 +285,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
|
||||
let (sstable_slice, index_slice) = main_slice.split(index_offset as usize);
|
||||
let sstable_index_bytes = index_slice.read_bytes()?;
|
||||
|
||||
let sstable_index = match version {
|
||||
2 => SSTableIndex::V2(
|
||||
crate::sstable_index_v2::SSTableIndex::load(sstable_index_bytes).map_err(|_| {
|
||||
io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption")
|
||||
})?,
|
||||
),
|
||||
3 => {
|
||||
let (sstable_index_bytes, mut footerv3_len_bytes) = sstable_index_bytes.rsplit(8);
|
||||
let store_offset = u64::deserialize(&mut footerv3_len_bytes)?;
|
||||
if store_offset != 0 {
|
||||
SSTableIndex::V3(
|
||||
SSTableIndexV3::load(sstable_index_bytes, store_offset).map_err(|_| {
|
||||
io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption")
|
||||
})?,
|
||||
)
|
||||
} else {
|
||||
// if store_offset is zero, there is no index, so we build a pseudo-index
|
||||
// assuming a single block of sstable covering everything.
|
||||
SSTableIndex::V3Empty(SSTableIndexV3Empty::load(index_offset as usize))
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
return Err(io::Error::other(format!(
|
||||
"Unsupported sstable version, expected one of [2, 3], found {version}"
|
||||
)));
|
||||
}
|
||||
};
|
||||
let sstable_index = SSTableIndex::open(version, index_offset, sstable_index_bytes)?;
|
||||
|
||||
Ok(Dictionary {
|
||||
sstable_slice,
|
||||
@@ -525,10 +496,15 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
|
||||
|
||||
// Open the block for the first ordinal.
|
||||
let mut bytes = Vec::new();
|
||||
let mut current_block_addr = self.sstable_index.get_block_with_ord(ord);
|
||||
let (mut current_block_addr, block_id) = self.sstable_index.get_and_locate_with_ord(ord);
|
||||
let mut current_sstable_delta_reader =
|
||||
self.sstable_delta_reader_block(current_block_addr.clone())?;
|
||||
let mut current_block_ordinal = current_block_addr.first_ordinal;
|
||||
let mut current_block_end_bound = self
|
||||
.sstable_index
|
||||
.get_block(block_id + 1)
|
||||
.map(|block_addr| block_addr.first_ordinal)
|
||||
.unwrap_or(u64::MAX);
|
||||
|
||||
loop {
|
||||
// move to the ord inside the current block
|
||||
@@ -557,17 +533,19 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
|
||||
}
|
||||
};
|
||||
|
||||
// TODO optimization: it is silly to do a binary search to get the block every single
|
||||
// time.
|
||||
//
|
||||
// Check if block changed for new term_ord
|
||||
let new_block_addr = self.sstable_index.get_block_with_ord(next_ord);
|
||||
if new_block_addr != current_block_addr {
|
||||
if next_ord >= current_block_end_bound {
|
||||
let (new_block_addr, block_id) =
|
||||
self.sstable_index.get_and_locate_with_ord(next_ord);
|
||||
current_block_addr = new_block_addr;
|
||||
current_block_ordinal = current_block_addr.first_ordinal;
|
||||
current_sstable_delta_reader =
|
||||
self.sstable_delta_reader_block(current_block_addr.clone())?;
|
||||
bytes.clear();
|
||||
current_block_end_bound = self
|
||||
.sstable_index
|
||||
.get_block(block_id + 1)
|
||||
.map(|block_addr| block_addr.first_ordinal)
|
||||
.unwrap_or(u64::MAX)
|
||||
}
|
||||
ord = next_ord;
|
||||
}
|
||||
|
||||
319
sstable/src/index/mod.rs
Normal file
319
sstable/src/index/mod.rs
Normal file
@@ -0,0 +1,319 @@
|
||||
pub(crate) mod v2;
|
||||
pub(crate) mod v3;
|
||||
|
||||
use std::io::{self, Read, Write};
|
||||
use std::ops::Range;
|
||||
|
||||
use common::{BinarySerializable, FixedSize, OwnedBytes};
|
||||
use tantivy_fst::{Automaton, MapBuilder};
|
||||
|
||||
use crate::{TermOrdinal, common_prefix_len};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum SSTableIndex {
|
||||
V2(v2::SSTableIndex),
|
||||
V3(v3::SSTableIndexV3),
|
||||
V3Empty(v3::SSTableIndexV3Empty),
|
||||
}
|
||||
|
||||
impl SSTableIndex {
|
||||
pub(crate) fn open(
|
||||
version: u32,
|
||||
index_offset: u64,
|
||||
index_bytes: OwnedBytes,
|
||||
) -> io::Result<Self> {
|
||||
let index = match version {
|
||||
2 => {
|
||||
SSTableIndex::V2(v2::SSTableIndex::load(index_bytes).map_err(|_| {
|
||||
io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption")
|
||||
})?)
|
||||
}
|
||||
3 => {
|
||||
let (index_bytes, mut footerv3_len_bytes) = index_bytes.rsplit(8);
|
||||
let store_offset = u64::deserialize(&mut footerv3_len_bytes)?;
|
||||
if store_offset != 0 {
|
||||
SSTableIndex::V3(v3::SSTableIndexV3::load(index_bytes, store_offset).map_err(
|
||||
|_| io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption"),
|
||||
)?)
|
||||
} else {
|
||||
// if store_offset is zero, there is no index, so we build a pseudo-index
|
||||
// assuming a single block of sstable covering everything.
|
||||
SSTableIndex::V3Empty(v3::SSTableIndexV3Empty::load(index_offset as usize))
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
return Err(io::Error::other(format!(
|
||||
"Unsupported sstable version, expected one of [2, 3], found {version}"
|
||||
)));
|
||||
}
|
||||
};
|
||||
Ok(index)
|
||||
}
|
||||
|
||||
/// Get the [`BlockAddr`] of the requested block.
|
||||
pub(crate) fn get_block(&self, block_id: u64) -> Option<BlockAddr> {
|
||||
match self {
|
||||
SSTableIndex::V2(v2_index) => v2_index.get_block(block_id as usize),
|
||||
SSTableIndex::V3(v3_index) => v3_index.get_block(block_id),
|
||||
SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block(block_id),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the block id of the block that would contain `key`.
|
||||
///
|
||||
/// Returns None if `key` is lexicographically after the last key recorded.
|
||||
pub(crate) fn locate_with_key(&self, key: &[u8]) -> Option<u64> {
|
||||
match self {
|
||||
SSTableIndex::V2(v2_index) => v2_index.locate_with_key(key).map(|i| i as u64),
|
||||
SSTableIndex::V3(v3_index) => v3_index.locate_with_key(key),
|
||||
SSTableIndex::V3Empty(v3_empty) => v3_empty.locate_with_key(key),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the [`BlockAddr`] of the block that would contain `key`.
|
||||
///
|
||||
/// Returns None if `key` is lexicographically after the last key recorded.
|
||||
pub fn get_block_with_key(&self, key: &[u8]) -> Option<BlockAddr> {
|
||||
match self {
|
||||
SSTableIndex::V2(v2_index) => v2_index.get_block_with_key(key),
|
||||
SSTableIndex::V3(v3_index) => v3_index.get_block_with_key(key),
|
||||
SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block_with_key(key),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn locate_with_ord(&self, ord: TermOrdinal) -> u64 {
|
||||
match self {
|
||||
SSTableIndex::V2(v2_index) => v2_index.locate_with_ord(ord) as u64,
|
||||
SSTableIndex::V3(v3_index) => v3_index.locate_with_ord(ord),
|
||||
SSTableIndex::V3Empty(v3_empty) => v3_empty.locate_with_ord(ord),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the [`BlockAddr`] of the block containing the `ord`-th term.
|
||||
pub(crate) fn get_block_with_ord(&self, ord: TermOrdinal) -> BlockAddr {
|
||||
match self {
|
||||
SSTableIndex::V2(v2_index) => v2_index.get_block_with_ord(ord),
|
||||
SSTableIndex::V3(v3_index) => v3_index.get_block_with_ord(ord),
|
||||
SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block_with_ord(ord),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn get_and_locate_with_ord(&self, ord: TermOrdinal) -> (BlockAddr, u64) {
|
||||
match self {
|
||||
SSTableIndex::V2(v2_index) => v2_index.get_and_locate_with_ord(ord),
|
||||
SSTableIndex::V3(v3_index) => v3_index.get_and_locate_with_ord(ord),
|
||||
SSTableIndex::V3Empty(v3_empty) => v3_empty.get_and_locate_with_ord(ord),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_block_for_automaton<'a>(
|
||||
&'a self,
|
||||
automaton: &'a impl Automaton,
|
||||
) -> impl Iterator<Item = (u64, BlockAddr)> + 'a {
|
||||
match self {
|
||||
SSTableIndex::V2(v2_index) => {
|
||||
BlockIter::V2(v2_index.get_block_for_automaton(automaton))
|
||||
}
|
||||
SSTableIndex::V3(v3_index) => {
|
||||
BlockIter::V3(v3_index.get_block_for_automaton(automaton))
|
||||
}
|
||||
SSTableIndex::V3Empty(v3_empty) => {
|
||||
BlockIter::V3Empty(std::iter::once((0, v3_empty.block_addr.clone())))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum BlockIter<V2, V3, T> {
|
||||
V2(V2),
|
||||
V3(V3),
|
||||
V3Empty(std::iter::Once<T>),
|
||||
}
|
||||
|
||||
impl<V2: Iterator<Item = T>, V3: Iterator<Item = T>, T> Iterator for BlockIter<V2, V3, T> {
|
||||
type Item = T;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
match self {
|
||||
BlockIter::V2(v2) => v2.next(),
|
||||
BlockIter::V3(v3) => v3.next(),
|
||||
BlockIter::V3Empty(once) => once.next(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Eq, PartialEq, Debug)]
|
||||
pub struct BlockAddr {
|
||||
pub first_ordinal: u64,
|
||||
pub byte_range: Range<usize>,
|
||||
}
|
||||
|
||||
impl BlockAddr {
|
||||
fn to_block_start(&self) -> BlockStartAddr {
|
||||
BlockStartAddr {
|
||||
first_ordinal: self.first_ordinal,
|
||||
byte_range_start: self.byte_range.start,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
struct BlockStartAddr {
|
||||
first_ordinal: u64,
|
||||
byte_range_start: usize,
|
||||
}
|
||||
|
||||
impl BlockStartAddr {
|
||||
fn to_block_addr(&self, byte_range_end: usize) -> BlockAddr {
|
||||
BlockAddr {
|
||||
first_ordinal: self.first_ordinal,
|
||||
byte_range: self.byte_range_start..byte_range_end,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct BlockMeta {
|
||||
/// Any byte string that is lexicographically greater or equal to
|
||||
/// the last key in the block,
|
||||
/// and yet strictly smaller than the first key in the next block.
|
||||
pub last_key_or_greater: Vec<u8>,
|
||||
pub block_addr: BlockAddr,
|
||||
}
|
||||
|
||||
impl BinarySerializable for BlockStartAddr {
|
||||
fn serialize<W: Write + ?Sized>(&self, writer: &mut W) -> io::Result<()> {
|
||||
let start = self.byte_range_start as u64;
|
||||
start.serialize(writer)?;
|
||||
self.first_ordinal.serialize(writer)
|
||||
}
|
||||
|
||||
fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
|
||||
let byte_range_start = u64::deserialize(reader)? as usize;
|
||||
let first_ordinal = u64::deserialize(reader)?;
|
||||
Ok(BlockStartAddr {
|
||||
first_ordinal,
|
||||
byte_range_start,
|
||||
})
|
||||
}
|
||||
|
||||
// Provided method
|
||||
fn num_bytes(&self) -> u64 {
|
||||
BlockStartAddr::SIZE_IN_BYTES as u64
|
||||
}
|
||||
}
|
||||
|
||||
impl FixedSize for BlockStartAddr {
|
||||
const SIZE_IN_BYTES: usize = 2 * u64::SIZE_IN_BYTES;
|
||||
}
|
||||
|
||||
/// Given that left < right,
|
||||
/// mutates `left into a shorter byte string left'` that
|
||||
/// matches `left <= left' < right`.
|
||||
fn find_shorter_str_in_between(left: &mut Vec<u8>, right: &[u8]) {
|
||||
assert!(&left[..] < right);
|
||||
let common_len = common_prefix_len(left, right);
|
||||
if left.len() == common_len {
|
||||
return;
|
||||
}
|
||||
// It is possible to do one character shorter in some case,
|
||||
// but it is not worth the extra complexity
|
||||
for pos in (common_len + 1)..left.len() {
|
||||
if left[pos] != u8::MAX {
|
||||
left[pos] += 1;
|
||||
left.truncate(pos + 1);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct SSTableIndexBuilder {
|
||||
blocks: Vec<BlockMeta>,
|
||||
}
|
||||
|
||||
impl SSTableIndexBuilder {
|
||||
/// In order to make the index as light as possible, we
|
||||
/// try to find a shorter alternative to the last key of the last block
|
||||
/// that is still smaller than the next key.
|
||||
pub(crate) fn shorten_last_block_key_given_next_key(&mut self, next_key: &[u8]) {
|
||||
if let Some(last_block) = self.blocks.last_mut() {
|
||||
find_shorter_str_in_between(&mut last_block.last_key_or_greater, next_key);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_block(&mut self, last_key: &[u8], byte_range: Range<usize>, first_ordinal: u64) {
|
||||
self.blocks.push(BlockMeta {
|
||||
last_key_or_greater: last_key.to_vec(),
|
||||
block_addr: BlockAddr {
|
||||
byte_range,
|
||||
first_ordinal,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
pub fn serialize<W: std::io::Write>(&self, wrt: W) -> io::Result<u64> {
|
||||
if self.blocks.len() <= 1 {
|
||||
return Ok(0);
|
||||
}
|
||||
let counting_writer = common::CountingWriter::wrap(wrt);
|
||||
let mut map_builder = MapBuilder::new(counting_writer).map_err(fst_error_to_io_error)?;
|
||||
for (i, block) in self.blocks.iter().enumerate() {
|
||||
map_builder
|
||||
.insert(&block.last_key_or_greater, i as u64)
|
||||
.map_err(fst_error_to_io_error)?;
|
||||
}
|
||||
let counting_writer = map_builder.into_inner().map_err(fst_error_to_io_error)?;
|
||||
let written_bytes = counting_writer.written_bytes();
|
||||
let mut wrt = counting_writer.finish();
|
||||
|
||||
let mut block_store_writer = v3::BlockAddrStoreWriter::new();
|
||||
for block in &self.blocks {
|
||||
block_store_writer.write_block_meta(block.block_addr.clone())?;
|
||||
}
|
||||
block_store_writer.serialize(&mut wrt)?;
|
||||
|
||||
Ok(written_bytes)
|
||||
}
|
||||
}
|
||||
|
||||
fn fst_error_to_io_error(error: tantivy_fst::Error) -> io::Error {
|
||||
match error {
|
||||
tantivy_fst::Error::Fst(fst_error) => io::Error::other(fst_error),
|
||||
tantivy_fst::Error::Io(ioerror) => ioerror,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
#[track_caller]
|
||||
fn test_find_shorter_str_in_between_aux(left: &[u8], right: &[u8]) {
|
||||
let mut left_buf = left.to_vec();
|
||||
super::find_shorter_str_in_between(&mut left_buf, right);
|
||||
assert!(left_buf.len() <= left.len());
|
||||
assert!(left <= &left_buf);
|
||||
assert!(&left_buf[..] < right);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_find_shorter_str_in_between() {
|
||||
test_find_shorter_str_in_between_aux(b"", b"hello");
|
||||
test_find_shorter_str_in_between_aux(b"abc", b"abcd");
|
||||
test_find_shorter_str_in_between_aux(b"abcd", b"abd");
|
||||
test_find_shorter_str_in_between_aux(&[0, 0, 0], &[1]);
|
||||
test_find_shorter_str_in_between_aux(&[0, 0, 0], &[0, 0, 1]);
|
||||
test_find_shorter_str_in_between_aux(&[0, 0, 255, 255, 255, 0u8], &[0, 1]);
|
||||
}
|
||||
|
||||
use proptest::prelude::*;
|
||||
|
||||
proptest! {
|
||||
#![proptest_config(ProptestConfig::with_cases(100))]
|
||||
#[test]
|
||||
fn test_proptest_find_shorter_str(left in any::<Vec<u8>>(), right in any::<Vec<u8>>()) {
|
||||
if left < right {
|
||||
test_find_shorter_str_in_between_aux(&left, &right);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -77,6 +77,13 @@ impl SSTableIndex {
|
||||
self.get_block(self.locate_with_ord(ord)).unwrap()
|
||||
}
|
||||
|
||||
pub(crate) fn get_and_locate_with_ord(&self, ord: TermOrdinal) -> (BlockAddr, u64) {
|
||||
let location = self.locate_with_ord(ord);
|
||||
// locate_with_ord always returns an index within range
|
||||
let block_addr = self.get_block(location).unwrap();
|
||||
(block_addr, location as u64)
|
||||
}
|
||||
|
||||
pub(crate) fn get_block_for_automaton<'a>(
|
||||
&'a self,
|
||||
automaton: &'a impl Automaton,
|
||||
@@ -1,106 +1,14 @@
|
||||
use std::io::{self, Read, Write};
|
||||
use std::ops::Range;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common::{BinarySerializable, FixedSize, OwnedBytes};
|
||||
use tantivy_bitpacker::{BitPacker, compute_num_bits};
|
||||
use tantivy_fst::raw::Fst;
|
||||
use tantivy_fst::{Automaton, IntoStreamer, Map, MapBuilder, Streamer};
|
||||
use tantivy_fst::{Automaton, IntoStreamer, Map, Streamer};
|
||||
|
||||
use super::{BlockAddr, BlockStartAddr};
|
||||
use crate::block_match_automaton::can_block_match_automaton;
|
||||
use crate::{SSTableDataCorruption, TermOrdinal, common_prefix_len};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum SSTableIndex {
|
||||
V2(crate::sstable_index_v2::SSTableIndex),
|
||||
V3(SSTableIndexV3),
|
||||
V3Empty(SSTableIndexV3Empty),
|
||||
}
|
||||
|
||||
impl SSTableIndex {
|
||||
/// Get the [`BlockAddr`] of the requested block.
|
||||
pub(crate) fn get_block(&self, block_id: u64) -> Option<BlockAddr> {
|
||||
match self {
|
||||
SSTableIndex::V2(v2_index) => v2_index.get_block(block_id as usize),
|
||||
SSTableIndex::V3(v3_index) => v3_index.get_block(block_id),
|
||||
SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block(block_id),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the block id of the block that would contain `key`.
|
||||
///
|
||||
/// Returns None if `key` is lexicographically after the last key recorded.
|
||||
pub(crate) fn locate_with_key(&self, key: &[u8]) -> Option<u64> {
|
||||
match self {
|
||||
SSTableIndex::V2(v2_index) => v2_index.locate_with_key(key).map(|i| i as u64),
|
||||
SSTableIndex::V3(v3_index) => v3_index.locate_with_key(key),
|
||||
SSTableIndex::V3Empty(v3_empty) => v3_empty.locate_with_key(key),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the [`BlockAddr`] of the block that would contain `key`.
|
||||
///
|
||||
/// Returns None if `key` is lexicographically after the last key recorded.
|
||||
pub fn get_block_with_key(&self, key: &[u8]) -> Option<BlockAddr> {
|
||||
match self {
|
||||
SSTableIndex::V2(v2_index) => v2_index.get_block_with_key(key),
|
||||
SSTableIndex::V3(v3_index) => v3_index.get_block_with_key(key),
|
||||
SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block_with_key(key),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn locate_with_ord(&self, ord: TermOrdinal) -> u64 {
|
||||
match self {
|
||||
SSTableIndex::V2(v2_index) => v2_index.locate_with_ord(ord) as u64,
|
||||
SSTableIndex::V3(v3_index) => v3_index.locate_with_ord(ord),
|
||||
SSTableIndex::V3Empty(v3_empty) => v3_empty.locate_with_ord(ord),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the [`BlockAddr`] of the block containing the `ord`-th term.
|
||||
pub(crate) fn get_block_with_ord(&self, ord: TermOrdinal) -> BlockAddr {
|
||||
match self {
|
||||
SSTableIndex::V2(v2_index) => v2_index.get_block_with_ord(ord),
|
||||
SSTableIndex::V3(v3_index) => v3_index.get_block_with_ord(ord),
|
||||
SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block_with_ord(ord),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_block_for_automaton<'a>(
|
||||
&'a self,
|
||||
automaton: &'a impl Automaton,
|
||||
) -> impl Iterator<Item = (u64, BlockAddr)> + 'a {
|
||||
match self {
|
||||
SSTableIndex::V2(v2_index) => {
|
||||
BlockIter::V2(v2_index.get_block_for_automaton(automaton))
|
||||
}
|
||||
SSTableIndex::V3(v3_index) => {
|
||||
BlockIter::V3(v3_index.get_block_for_automaton(automaton))
|
||||
}
|
||||
SSTableIndex::V3Empty(v3_empty) => {
|
||||
BlockIter::V3Empty(std::iter::once((0, v3_empty.block_addr.clone())))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum BlockIter<V2, V3, T> {
|
||||
V2(V2),
|
||||
V3(V3),
|
||||
V3Empty(std::iter::Once<T>),
|
||||
}
|
||||
|
||||
impl<V2: Iterator<Item = T>, V3: Iterator<Item = T>, T> Iterator for BlockIter<V2, V3, T> {
|
||||
type Item = T;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
match self {
|
||||
BlockIter::V2(v2) => v2.next(),
|
||||
BlockIter::V3(v3) => v3.next(),
|
||||
BlockIter::V3Empty(once) => once.next(),
|
||||
}
|
||||
}
|
||||
}
|
||||
use crate::{SSTableDataCorruption, TermOrdinal};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct SSTableIndexV3 {
|
||||
@@ -160,6 +68,11 @@ impl SSTableIndexV3 {
|
||||
self.block_addr_store.binary_search_ord(ord).1
|
||||
}
|
||||
|
||||
pub(crate) fn get_and_locate_with_ord(&self, ord: TermOrdinal) -> (BlockAddr, u64) {
|
||||
let (location, block_addr) = self.block_addr_store.binary_search_ord(ord);
|
||||
(block_addr, location)
|
||||
}
|
||||
|
||||
pub(crate) fn get_block_for_automaton<'a>(
|
||||
&'a self,
|
||||
automaton: &'a impl Automaton,
|
||||
@@ -216,7 +129,7 @@ impl<A: Automaton> Iterator for GetBlockForAutomaton<'_, A> {
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct SSTableIndexV3Empty {
|
||||
block_addr: BlockAddr,
|
||||
pub block_addr: BlockAddr,
|
||||
}
|
||||
|
||||
impl SSTableIndexV3Empty {
|
||||
@@ -230,8 +143,8 @@ impl SSTableIndexV3Empty {
|
||||
}
|
||||
|
||||
/// Get the [`BlockAddr`] of the requested block.
|
||||
pub(crate) fn get_block(&self, _block_id: u64) -> Option<BlockAddr> {
|
||||
Some(self.block_addr.clone())
|
||||
pub(crate) fn get_block(&self, block_id: u64) -> Option<BlockAddr> {
|
||||
(block_id == 0).then(|| self.block_addr.clone())
|
||||
}
|
||||
|
||||
/// Get the block id of the block that would contain `key`.
|
||||
@@ -256,146 +169,9 @@ impl SSTableIndexV3Empty {
|
||||
pub(crate) fn get_block_with_ord(&self, _ord: TermOrdinal) -> BlockAddr {
|
||||
self.block_addr.clone()
|
||||
}
|
||||
}
|
||||
#[derive(Clone, Eq, PartialEq, Debug)]
|
||||
pub struct BlockAddr {
|
||||
pub first_ordinal: u64,
|
||||
pub byte_range: Range<usize>,
|
||||
}
|
||||
|
||||
impl BlockAddr {
|
||||
fn to_block_start(&self) -> BlockStartAddr {
|
||||
BlockStartAddr {
|
||||
first_ordinal: self.first_ordinal,
|
||||
byte_range_start: self.byte_range.start,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
struct BlockStartAddr {
|
||||
first_ordinal: u64,
|
||||
byte_range_start: usize,
|
||||
}
|
||||
|
||||
impl BlockStartAddr {
|
||||
fn to_block_addr(&self, byte_range_end: usize) -> BlockAddr {
|
||||
BlockAddr {
|
||||
first_ordinal: self.first_ordinal,
|
||||
byte_range: self.byte_range_start..byte_range_end,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct BlockMeta {
|
||||
/// Any byte string that is lexicographically greater or equal to
|
||||
/// the last key in the block,
|
||||
/// and yet strictly smaller than the first key in the next block.
|
||||
pub last_key_or_greater: Vec<u8>,
|
||||
pub block_addr: BlockAddr,
|
||||
}
|
||||
|
||||
impl BinarySerializable for BlockStartAddr {
|
||||
fn serialize<W: Write + ?Sized>(&self, writer: &mut W) -> io::Result<()> {
|
||||
let start = self.byte_range_start as u64;
|
||||
start.serialize(writer)?;
|
||||
self.first_ordinal.serialize(writer)
|
||||
}
|
||||
|
||||
fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
|
||||
let byte_range_start = u64::deserialize(reader)? as usize;
|
||||
let first_ordinal = u64::deserialize(reader)?;
|
||||
Ok(BlockStartAddr {
|
||||
first_ordinal,
|
||||
byte_range_start,
|
||||
})
|
||||
}
|
||||
|
||||
// Provided method
|
||||
fn num_bytes(&self) -> u64 {
|
||||
BlockStartAddr::SIZE_IN_BYTES as u64
|
||||
}
|
||||
}
|
||||
|
||||
impl FixedSize for BlockStartAddr {
|
||||
const SIZE_IN_BYTES: usize = 2 * u64::SIZE_IN_BYTES;
|
||||
}
|
||||
|
||||
/// Given that left < right,
|
||||
/// mutates `left into a shorter byte string left'` that
|
||||
/// matches `left <= left' < right`.
|
||||
fn find_shorter_str_in_between(left: &mut Vec<u8>, right: &[u8]) {
|
||||
assert!(&left[..] < right);
|
||||
let common_len = common_prefix_len(left, right);
|
||||
if left.len() == common_len {
|
||||
return;
|
||||
}
|
||||
// It is possible to do one character shorter in some case,
|
||||
// but it is not worth the extra complexity
|
||||
for pos in (common_len + 1)..left.len() {
|
||||
if left[pos] != u8::MAX {
|
||||
left[pos] += 1;
|
||||
left.truncate(pos + 1);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct SSTableIndexBuilder {
|
||||
blocks: Vec<BlockMeta>,
|
||||
}
|
||||
|
||||
impl SSTableIndexBuilder {
|
||||
/// In order to make the index as light as possible, we
|
||||
/// try to find a shorter alternative to the last key of the last block
|
||||
/// that is still smaller than the next key.
|
||||
pub(crate) fn shorten_last_block_key_given_next_key(&mut self, next_key: &[u8]) {
|
||||
if let Some(last_block) = self.blocks.last_mut() {
|
||||
find_shorter_str_in_between(&mut last_block.last_key_or_greater, next_key);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_block(&mut self, last_key: &[u8], byte_range: Range<usize>, first_ordinal: u64) {
|
||||
self.blocks.push(BlockMeta {
|
||||
last_key_or_greater: last_key.to_vec(),
|
||||
block_addr: BlockAddr {
|
||||
byte_range,
|
||||
first_ordinal,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
pub fn serialize<W: std::io::Write>(&self, wrt: W) -> io::Result<u64> {
|
||||
if self.blocks.len() <= 1 {
|
||||
return Ok(0);
|
||||
}
|
||||
let counting_writer = common::CountingWriter::wrap(wrt);
|
||||
let mut map_builder = MapBuilder::new(counting_writer).map_err(fst_error_to_io_error)?;
|
||||
for (i, block) in self.blocks.iter().enumerate() {
|
||||
map_builder
|
||||
.insert(&block.last_key_or_greater, i as u64)
|
||||
.map_err(fst_error_to_io_error)?;
|
||||
}
|
||||
let counting_writer = map_builder.into_inner().map_err(fst_error_to_io_error)?;
|
||||
let written_bytes = counting_writer.written_bytes();
|
||||
let mut wrt = counting_writer.finish();
|
||||
|
||||
let mut block_store_writer = BlockAddrStoreWriter::new();
|
||||
for block in &self.blocks {
|
||||
block_store_writer.write_block_meta(block.block_addr.clone())?;
|
||||
}
|
||||
block_store_writer.serialize(&mut wrt)?;
|
||||
|
||||
Ok(written_bytes)
|
||||
}
|
||||
}
|
||||
|
||||
fn fst_error_to_io_error(error: tantivy_fst::Error) -> io::Error {
|
||||
match error {
|
||||
tantivy_fst::Error::Fst(fst_error) => io::Error::other(fst_error),
|
||||
tantivy_fst::Error::Io(ioerror) => ioerror,
|
||||
pub(crate) fn get_and_locate_with_ord(&self, _ord: TermOrdinal) -> (BlockAddr, u64) {
|
||||
(self.block_addr.clone(), 0)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -647,14 +423,14 @@ fn binary_search(max: u64, cmp_fn: impl Fn(u64) -> std::cmp::Ordering) -> Result
|
||||
Err(left)
|
||||
}
|
||||
|
||||
struct BlockAddrStoreWriter {
|
||||
pub(crate) struct BlockAddrStoreWriter {
|
||||
buffer_block_metas: Vec<u8>,
|
||||
buffer_addrs: Vec<u8>,
|
||||
block_addrs: Vec<BlockAddr>,
|
||||
}
|
||||
|
||||
impl BlockAddrStoreWriter {
|
||||
fn new() -> Self {
|
||||
pub(crate) fn new() -> Self {
|
||||
BlockAddrStoreWriter {
|
||||
buffer_block_metas: Vec::new(),
|
||||
buffer_addrs: Vec::new(),
|
||||
@@ -662,7 +438,7 @@ impl BlockAddrStoreWriter {
|
||||
}
|
||||
}
|
||||
|
||||
fn flush_block(&mut self) -> io::Result<()> {
|
||||
pub(crate) fn flush_block(&mut self) -> io::Result<()> {
|
||||
if self.block_addrs.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
@@ -741,7 +517,7 @@ impl BlockAddrStoreWriter {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn write_block_meta(&mut self, block_addr: BlockAddr) -> io::Result<()> {
|
||||
pub(crate) fn write_block_meta(&mut self, block_addr: BlockAddr) -> io::Result<()> {
|
||||
self.block_addrs.push(block_addr);
|
||||
if self.block_addrs.len() >= STORE_BLOCK_LEN {
|
||||
self.flush_block()?;
|
||||
@@ -749,7 +525,7 @@ impl BlockAddrStoreWriter {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn serialize<W: std::io::Write>(&mut self, wrt: &mut W) -> io::Result<()> {
|
||||
pub(crate) fn serialize<W: std::io::Write>(&mut self, wrt: &mut W) -> io::Result<()> {
|
||||
self.flush_block()?;
|
||||
let len = self.buffer_block_metas.len() as u64;
|
||||
len.serialize(wrt)?;
|
||||
@@ -824,8 +600,9 @@ mod tests {
|
||||
use common::OwnedBytes;
|
||||
|
||||
use super::*;
|
||||
use crate::SSTableDataCorruption;
|
||||
use crate::block_match_automaton::tests::EqBuffer;
|
||||
use crate::index::BlockMeta;
|
||||
use crate::{SSTableDataCorruption, SSTableIndexBuilder};
|
||||
|
||||
#[test]
|
||||
fn test_sstable_index() {
|
||||
@@ -874,36 +651,7 @@ mod tests {
|
||||
assert!(matches!(data_corruption_err, SSTableDataCorruption));
|
||||
}
|
||||
|
||||
#[track_caller]
|
||||
fn test_find_shorter_str_in_between_aux(left: &[u8], right: &[u8]) {
|
||||
let mut left_buf = left.to_vec();
|
||||
super::find_shorter_str_in_between(&mut left_buf, right);
|
||||
assert!(left_buf.len() <= left.len());
|
||||
assert!(left <= &left_buf);
|
||||
assert!(&left_buf[..] < right);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_find_shorter_str_in_between() {
|
||||
test_find_shorter_str_in_between_aux(b"", b"hello");
|
||||
test_find_shorter_str_in_between_aux(b"abc", b"abcd");
|
||||
test_find_shorter_str_in_between_aux(b"abcd", b"abd");
|
||||
test_find_shorter_str_in_between_aux(&[0, 0, 0], &[1]);
|
||||
test_find_shorter_str_in_between_aux(&[0, 0, 0], &[0, 0, 1]);
|
||||
test_find_shorter_str_in_between_aux(&[0, 0, 255, 255, 255, 0u8], &[0, 1]);
|
||||
}
|
||||
|
||||
use proptest::prelude::*;
|
||||
|
||||
proptest! {
|
||||
#![proptest_config(ProptestConfig::with_cases(100))]
|
||||
#[test]
|
||||
fn test_proptest_find_shorter_str(left in any::<Vec<u8>>(), right in any::<Vec<u8>>()) {
|
||||
if left < right {
|
||||
test_find_shorter_str_in_between_aux(&left, &right);
|
||||
}
|
||||
}
|
||||
}
|
||||
// use proptest::prelude::*;
|
||||
|
||||
#[test]
|
||||
fn test_find_best_slop() {
|
||||
@@ -47,9 +47,8 @@ pub mod merge;
|
||||
mod streamer;
|
||||
pub mod value;
|
||||
|
||||
mod sstable_index_v3;
|
||||
pub use sstable_index_v3::{BlockAddr, SSTableIndex, SSTableIndexBuilder, SSTableIndexV3};
|
||||
mod sstable_index_v2;
|
||||
mod index;
|
||||
pub use index::{BlockAddr, SSTableIndex, SSTableIndexBuilder};
|
||||
pub(crate) mod vint;
|
||||
pub use dictionary::{Dictionary, TermOrdHit};
|
||||
pub use streamer::{Streamer, StreamerBuilder};
|
||||
|
||||
@@ -27,7 +27,7 @@ rand = "0.9"
|
||||
zipf = "7.0.0"
|
||||
rustc-hash = "2.1.0"
|
||||
proptest = "1.2.0"
|
||||
binggan = { version = "0.16.1" }
|
||||
binggan = { version = "0.17.0" }
|
||||
rand_distr = "0.5"
|
||||
|
||||
[features]
|
||||
|
||||
Reference in New Issue
Block a user