Compare commits

..

9 Commits

Author SHA1 Message Date
Paul Masurel
e4759b1d82 Fixes build for no-default-features 2020-06-05 19:40:32 +09:00
Paul Masurel
4026d183bc Small readability change 2020-06-03 09:04:57 +09:00
Paul Masurel
c0f5645cd9 Move for_each functions from Scorer to Weight. (#836)
* Move for_each functions from Scorer to Weight.

* Specialized foreach / foreach_pruning for union of termscorer.
2020-06-01 11:31:18 +09:00
Paul Masurel
cbff874e43 Change the loading of blocks. 2020-05-27 16:36:50 +09:00
Paul Masurel
baf015fc57 Simplification of the segment postings seek implementation. (#834) 2020-05-27 08:49:47 +09:00
Paul Masurel
7275ebdf3c Skiprefactoring skipabsolute (#831)
Simplification of the way we handle positions.
2020-05-25 09:51:23 +09:00
Paul Masurel
b974e7ce34 Closes #828. (#829)
There was a bug in the LogMergePolicy that was surfacing when there were
segments, but all of the segments were larger than the max limit.

After filtering, the list of segments candidate for merge was 0, and
the code was indexing the first element of an empty Vec.
2020-05-22 16:24:07 +09:00
Paul Masurel
8f8f34499f Updated CHANGELOG with the TopCollector offset information and cargo fmt. 2020-05-20 22:26:54 +09:00
Rob Young
6ea6f4bfcd Add offset to TopDocsCollector (#826)
* Add offset to TopDocsCollector

Add an offset to TopDocsCollector and TopDocs to make it clearer how to
handle pagination.

Closes #822

* Address review comments

- Make Debug formatting of TopDocs clearer.
- Add unit tests for limit and offset on TopCollector.
- Change API for using offset to a fluent interface.
- Add some context to the docstring to clarify what limit and offset are
  equivalent to in other projects.

* Changes required by rebase on e25284

- Pass Collector into TweakedScoreTopCollector and
  CustomScoreTopCollector.
- Add std:: qualifier to f32, i32 etc. Not sure why this was not failing
  already.
- Add unit tests for TopDocs with offset including for tweaked and
  custom score collectors.

In order to convert a TopCollector<Score> to a TopCollector<TScore> I
had to add a `into_tscore` method to `TopCollector`. This is a hack but
I don't know how to avoid it.
2020-05-20 22:25:24 +09:00
27 changed files with 1210 additions and 1048 deletions

View File

@@ -18,7 +18,6 @@ The change made it possible to greatly simplify a lot of the docset's code.
- Misc internal optimization and introduction of the `Scorer::for_each_pruning` function. (@fulmicoton)
- Added an offset option to the Top(.*)Collectors. (@robyoung)
Tantivy 0.12.0
======================
- Removing static dispatch in tokenizers for simplicity. (#762)

View File

@@ -109,7 +109,7 @@ pub use self::tweak_score_top_collector::{ScoreSegmentTweaker, ScoreTweaker};
mod facet_collector;
pub use self::facet_collector::FacetCollector;
use crate::query::Scorer;
use crate::query::Weight;
/// `Fruit` is the type for the result of our collection.
/// e.g. `usize` for the `Count` collector.
@@ -159,21 +159,22 @@ pub trait Collector: Sync {
/// Created a segment collector and
fn collect_segment(
&self,
scorer: &mut dyn Scorer,
weight: &dyn Weight,
segment_ord: u32,
segment_reader: &SegmentReader,
reader: &SegmentReader,
) -> crate::Result<<Self::Child as SegmentCollector>::Fruit> {
let mut segment_collector = self.for_segment(segment_ord as u32, segment_reader)?;
if let Some(delete_bitset) = segment_reader.delete_bitset() {
scorer.for_each(&mut |doc, score| {
let mut segment_collector = self.for_segment(segment_ord as u32, reader)?;
if let Some(delete_bitset) = reader.delete_bitset() {
weight.for_each(reader, &mut |doc, score| {
if delete_bitset.is_alive(doc) {
segment_collector.collect(doc, score);
}
});
})?;
} else {
scorer.for_each(&mut |doc, score| {
weight.for_each(reader, &mut |doc, score| {
segment_collector.collect(doc, score);
})
})?;
}
Ok(segment_collector.harvest())
}

View File

@@ -6,9 +6,8 @@ use crate::collector::tweak_score_top_collector::TweakedScoreTopCollector;
use crate::collector::{
CustomScorer, CustomSegmentScorer, ScoreSegmentTweaker, ScoreTweaker, SegmentCollector,
};
use crate::docset::TERMINATED;
use crate::fastfield::FastFieldReader;
use crate::query::Scorer;
use crate::query::Weight;
use crate::schema::Field;
use crate::DocAddress;
use crate::DocId;
@@ -472,45 +471,52 @@ impl Collector for TopDocs {
fn collect_segment(
&self,
scorer: &mut dyn Scorer,
weight: &dyn Weight,
segment_ord: u32,
segment_reader: &SegmentReader,
reader: &SegmentReader,
) -> crate::Result<<Self::Child as SegmentCollector>::Fruit> {
let mut heap: BinaryHeap<ComparableDoc<Score, DocId>> =
BinaryHeap::with_capacity(self.0.limit + self.0.offset);
// first we fill the heap with the first `limit` elements.
let mut doc = scorer.doc();
while doc != TERMINATED && heap.len() < (self.0.limit + self.0.offset) {
if !segment_reader.is_deleted(doc) {
let score = scorer.score();
heap.push(ComparableDoc {
feature: score,
doc,
});
}
doc = scorer.advance();
}
let heap_len = self.0.limit + self.0.offset;
let mut heap: BinaryHeap<ComparableDoc<Score, DocId>> = BinaryHeap::with_capacity(heap_len);
let threshold = heap.peek().map(|el| el.feature).unwrap_or(std::f32::MIN);
if let Some(delete_bitset) = segment_reader.delete_bitset() {
scorer.for_each_pruning(threshold, &mut |doc, score| {
if delete_bitset.is_alive(doc) {
*heap.peek_mut().unwrap() = ComparableDoc {
feature: score,
doc,
};
if let Some(delete_bitset) = reader.delete_bitset() {
let mut threshold = f32::MIN;
weight.for_each_pruning(threshold, reader, &mut |doc, score| {
if delete_bitset.is_deleted(doc) {
return threshold;
}
heap.peek().map(|el| el.feature).unwrap_or(std::f32::MIN)
});
} else {
scorer.for_each_pruning(threshold, &mut |doc, score| {
*heap.peek_mut().unwrap() = ComparableDoc {
let heap_item = ComparableDoc {
feature: score,
doc,
};
if heap.len() < heap_len {
heap.push(heap_item);
if heap.len() == heap_len {
threshold = heap.peek().map(|el| el.feature).unwrap_or(f32::MIN);
}
return threshold;
}
*heap.peek_mut().unwrap() = heap_item;
threshold = heap.peek().map(|el| el.feature).unwrap_or(std::f32::MIN);
threshold
})?;
} else {
weight.for_each_pruning(f32::MIN, reader, &mut |doc, score| {
let heap_item = ComparableDoc {
feature: score,
doc,
};
if heap.len() < heap_len {
heap.push(heap_item);
// TODO the threshold is suboptimal for heap.len == heap_len
if heap.len() == heap_len {
return heap.peek().map(|el| el.feature).unwrap_or(f32::MIN);
} else {
return f32::MIN;
}
}
*heap.peek_mut().unwrap() = heap_item;
heap.peek().map(|el| el.feature).unwrap_or(std::f32::MIN)
});
})?;
}
let fruit = heap
@@ -518,7 +524,6 @@ impl Collector for TopDocs {
.into_iter()
.map(|cid| (cid.feature, DocAddress(segment_ord, cid.doc)))
.collect();
Ok(fruit)
}
}

View File

@@ -24,8 +24,10 @@ use crate::IndexWriter;
use std::borrow::BorrowMut;
use std::collections::HashSet;
use std::fmt;
#[cfg(feature = "mmap")]
use std::path::{Path, PathBuf};
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
fn load_metas(

View File

@@ -7,7 +7,6 @@ use crate::schema::FieldType;
use crate::schema::IndexRecordOption;
use crate::schema::Term;
use crate::termdict::TermDictionary;
use owned_read::OwnedRead;
/// The inverted index reader is in charge of accessing
/// the inverted index associated to a specific field.
@@ -97,8 +96,7 @@ impl InvertedIndexReader {
let offset = term_info.postings_offset as usize;
let end_source = self.postings_source.len();
let postings_slice = self.postings_source.slice(offset, end_source);
let postings_reader = OwnedRead::new(postings_slice);
block_postings.reset(term_info.doc_freq, postings_reader);
block_postings.reset(term_info.doc_freq, postings_slice);
}
/// Returns a block postings given a `Term`.
@@ -127,7 +125,7 @@ impl InvertedIndexReader {
let postings_data = self.postings_source.slice_from(offset);
BlockSegmentPostings::from_data(
term_info.doc_freq,
OwnedRead::new(postings_data),
postings_data,
self.record_option,
requested_option,
)

View File

@@ -140,8 +140,7 @@ impl Searcher {
let segment_readers = self.segment_readers();
let fruits = executor.map(
|(segment_ord, segment_reader)| {
let mut scorer = weight.scorer(segment_reader, 1.0f32)?;
collector.collect_segment(scorer.as_mut(), segment_ord as u32, segment_reader)
collector.collect_segment(weight.as_ref(), segment_ord as u32, segment_reader)
},
segment_readers.iter().enumerate(),
)?;

View File

@@ -346,7 +346,7 @@ impl IndexWriter {
fn drop_sender(&mut self) {
let (sender, _receiver) = channel::bounded(1);
mem::replace(&mut self.operation_sender, sender);
self.operation_sender = sender;
}
/// If there are some merging threads, blocks until they all finish their work and

View File

@@ -54,10 +54,6 @@ impl LogMergePolicy {
impl MergePolicy for LogMergePolicy {
fn compute_merge_candidates(&self, segments: &[SegmentMeta]) -> Vec<MergeCandidate> {
if segments.is_empty() {
return Vec::new();
}
let mut size_sorted_tuples = segments
.iter()
.map(SegmentMeta::num_docs)
@@ -67,27 +63,35 @@ impl MergePolicy for LogMergePolicy {
size_sorted_tuples.sort_by(|x, y| y.1.cmp(&(x.1)));
if size_sorted_tuples.len() <= 1 {
return Vec::new();
}
let size_sorted_log_tuples: Vec<_> = size_sorted_tuples
.into_iter()
.map(|(ind, num_docs)| (ind, f64::from(self.clip_min_size(num_docs)).log2()))
.collect();
let (first_ind, first_score) = size_sorted_log_tuples[0];
let mut current_max_log_size = first_score;
let mut levels = vec![vec![first_ind]];
for &(ind, score) in (&size_sorted_log_tuples).iter().skip(1) {
if score < (current_max_log_size - self.level_log_size) {
current_max_log_size = score;
levels.push(Vec::new());
if let Some(&(first_ind, first_score)) = size_sorted_log_tuples.first() {
let mut current_max_log_size = first_score;
let mut levels = vec![vec![first_ind]];
for &(ind, score) in (&size_sorted_log_tuples).iter().skip(1) {
if score < (current_max_log_size - self.level_log_size) {
current_max_log_size = score;
levels.push(Vec::new());
}
levels.last_mut().unwrap().push(ind);
}
levels.last_mut().unwrap().push(ind);
levels
.iter()
.filter(|level| level.len() >= self.min_merge_size)
.map(|ind_vec| {
MergeCandidate(ind_vec.iter().map(|&ind| segments[ind].id()).collect())
})
.collect()
} else {
return vec![];
}
levels
.iter()
.filter(|level| level.len() >= self.min_merge_size)
.map(|ind_vec| MergeCandidate(ind_vec.iter().map(|&ind| segments[ind].id()).collect()))
.collect()
}
}
@@ -179,6 +183,7 @@ mod tests {
let result_list = test_merge_policy().compute_merge_candidates(&test_input);
assert_eq!(result_list.len(), 2);
}
#[test]
fn test_log_merge_policy_small_segments() {
// segments under min_layer_size are merged together
@@ -194,6 +199,17 @@ mod tests {
assert_eq!(result_list.len(), 1);
}
#[test]
fn test_log_merge_policy_all_segments_too_large_to_merge() {
let eight_large_segments: Vec<SegmentMeta> =
std::iter::repeat_with(|| create_random_segment_meta(100_001))
.take(8)
.collect();
assert!(test_merge_policy()
.compute_merge_candidates(&eight_large_segments)
.is_empty());
}
#[test]
fn test_large_merge_segments() {
let test_input = vec![

View File

@@ -589,48 +589,45 @@ impl IndexMerger {
// of all of the segments containing the given term.
//
// These segments are non-empty and advance has already been called.
if !segment_postings.is_empty() {
// If not, the `term` will be entirely removed.
// We know that there is at least one document containing
// the term, so we add it.
let to_term_ord = field_serializer.new_term(term_bytes)?;
if let Some(ref mut term_ord_mapping) = term_ord_mapping_opt {
for (segment_ord, from_term_ord) in merged_terms.matching_segments() {
term_ord_mapping.register_from_to(segment_ord, from_term_ord, to_term_ord);
}
}
// We can now serialize this postings, by pushing each document to the
// postings serializer.
for (segment_ord, mut segment_postings) in segment_postings {
let old_to_new_doc_id = &merged_doc_id_map[segment_ord];
let mut doc = segment_postings.doc();
while doc != TERMINATED {
// deleted doc are skipped as they do not have a `remapped_doc_id`.
if let Some(remapped_doc_id) = old_to_new_doc_id[doc as usize] {
// we make sure to only write the term iff
// there is at least one document.
let term_freq = segment_postings.term_freq();
segment_postings.positions(&mut positions_buffer);
let delta_positions = delta_computer.compute_delta(&positions_buffer);
field_serializer.write_doc(
remapped_doc_id,
term_freq,
delta_positions,
)?;
}
doc = segment_postings.advance();
}
}
// closing the term.
field_serializer.close_term()?;
if segment_postings.is_empty() {
continue;
}
// If not, the `term` will be entirely removed.
// We know that there is at least one document containing
// the term, so we add it.
let to_term_ord = field_serializer.new_term(term_bytes)?;
if let Some(ref mut term_ord_mapping) = term_ord_mapping_opt {
for (segment_ord, from_term_ord) in merged_terms.matching_segments() {
term_ord_mapping.register_from_to(segment_ord, from_term_ord, to_term_ord);
}
}
// We can now serialize this postings, by pushing each document to the
// postings serializer.
for (segment_ord, mut segment_postings) in segment_postings {
let old_to_new_doc_id = &merged_doc_id_map[segment_ord];
let mut doc = segment_postings.doc();
while doc != TERMINATED {
// deleted doc are skipped as they do not have a `remapped_doc_id`.
if let Some(remapped_doc_id) = old_to_new_doc_id[doc as usize] {
// we make sure to only write the term iff
// there is at least one document.
let term_freq = segment_postings.term_freq();
segment_postings.positions(&mut positions_buffer);
let delta_positions = delta_computer.compute_delta(&positions_buffer);
field_serializer.write_doc(remapped_doc_id, term_freq, delta_positions)?;
}
doc = segment_postings.advance();
}
}
// closing the term.
field_serializer.close_term()?;
}
field_serializer.close()?;
Ok(term_ord_mapping_opt)

View File

@@ -37,9 +37,9 @@ const LONG_SKIP_INTERVAL: u64 = (LONG_SKIP_IN_BLOCKS * COMPRESSION_BLOCK_SIZE) a
#[cfg(test)]
pub mod tests {
use super::{PositionReader, PositionSerializer};
use super::PositionSerializer;
use crate::directory::ReadOnlySource;
use crate::positions::COMPRESSION_BLOCK_SIZE;
use crate::positions::reader::PositionReader;
use std::iter;
fn create_stream_buffer(vals: &[u32]) -> (ReadOnlySource, ReadOnlySource) {
@@ -68,7 +68,7 @@ pub mod tests {
let mut position_reader = PositionReader::new(stream, skip, 0u64);
for &n in &[1, 10, 127, 128, 130, 312] {
let mut v = vec![0u32; n];
position_reader.read(&mut v[..n]);
position_reader.read(0, &mut v[..]);
for i in 0..n {
assert_eq!(v[i], i as u32);
}
@@ -76,19 +76,19 @@ pub mod tests {
}
#[test]
fn test_position_skip() {
let v: Vec<u32> = (0..1_000).collect();
fn test_position_read_with_offset() {
let v: Vec<u32> = (0..1000).collect();
let (stream, skip) = create_stream_buffer(&v[..]);
assert_eq!(skip.len(), 12);
assert_eq!(stream.len(), 1168);
let mut position_reader = PositionReader::new(stream, skip, 0u64);
position_reader.skip(10);
for &n in &[10, 127, COMPRESSION_BLOCK_SIZE, 130, 312] {
let mut v = vec![0u32; n];
position_reader.read(&mut v[..n]);
for i in 0..n {
assert_eq!(v[i], 10u32 + i as u32);
for &offset in &[1u64, 10u64, 127u64, 128u64, 130u64, 312u64] {
for &len in &[1, 10, 130, 500] {
let mut v = vec![0u32; len];
position_reader.read(offset, &mut v[..]);
for i in 0..len {
assert_eq!(v[i], i as u32 + offset as u32);
}
}
}
}
@@ -103,11 +103,12 @@ pub mod tests {
let mut position_reader = PositionReader::new(stream, skip, 0u64);
let mut buf = [0u32; 7];
let mut c = 0;
let mut offset = 0;
for _ in 0..100 {
position_reader.read(&mut buf);
position_reader.read(&mut buf);
position_reader.skip(4);
position_reader.skip(3);
position_reader.read(offset, &mut buf);
position_reader.read(offset, &mut buf);
offset += 7;
for &el in &buf {
assert_eq!(c, el);
c += 1;
@@ -115,6 +116,58 @@ pub mod tests {
}
}
#[test]
fn test_position_reread_anchor_different_than_block() {
let v: Vec<u32> = (0..2_000_000).collect();
let (stream, skip) = create_stream_buffer(&v[..]);
assert_eq!(skip.len(), 15_749);
assert_eq!(stream.len(), 4_987_872);
let mut position_reader = PositionReader::new(stream.clone(), skip.clone(), 0);
let mut buf = [0u32; 256];
position_reader.read(128, &mut buf);
for i in 0..256 {
assert_eq!(buf[i], (128 + i) as u32);
}
position_reader.read(128, &mut buf);
for i in 0..256 {
assert_eq!(buf[i], (128 + i) as u32);
}
}
#[test]
#[should_panic(expected = "offset arguments should be increasing.")]
fn test_position_panic_if_called_previous_anchor() {
let v: Vec<u32> = (0..2_000_000).collect();
let (stream, skip) = create_stream_buffer(&v[..]);
assert_eq!(skip.len(), 15_749);
assert_eq!(stream.len(), 4_987_872);
let mut buf = [0u32; 1];
let mut position_reader = PositionReader::new(stream.clone(), skip.clone(), 200_000);
position_reader.read(230, &mut buf);
position_reader.read(9, &mut buf);
}
#[test]
fn test_positions_bug() {
let mut v: Vec<u32> = vec![];
for i in 1..200 {
for j in 0..i {
v.push(j);
}
}
let (stream, skip) = create_stream_buffer(&v[..]);
let mut buf = Vec::new();
let mut position_reader = PositionReader::new(stream.clone(), skip.clone(), 0);
let mut offset = 0;
for i in 1..24 {
buf.resize(i, 0);
position_reader.read(offset, &mut buf[..]);
offset += i as u64;
let r: Vec<u32> = (0..i).map(|el| el as u32).collect();
assert_eq!(buf, &r[..]);
}
}
#[test]
fn test_position_long_skip_const() {
const CONST_VAL: u32 = 9u32;
@@ -124,7 +177,7 @@ pub mod tests {
assert_eq!(stream.len(), 1_000_000);
let mut position_reader = PositionReader::new(stream, skip, 128 * 1024);
let mut buf = [0u32; 1];
position_reader.read(&mut buf);
position_reader.read(0, &mut buf);
assert_eq!(buf[0], CONST_VAL);
}
@@ -143,7 +196,7 @@ pub mod tests {
] {
let mut position_reader = PositionReader::new(stream.clone(), skip.clone(), offset);
let mut buf = [0u32; 1];
position_reader.read(&mut buf);
position_reader.read(0, &mut buf);
assert_eq!(buf[0], offset as u32);
}
}

View File

@@ -3,7 +3,6 @@ use crate::directory::ReadOnlySource;
use crate::positions::COMPRESSION_BLOCK_SIZE;
use crate::positions::LONG_SKIP_INTERVAL;
use crate::positions::LONG_SKIP_IN_BLOCKS;
use crate::postings::compression::compressed_block_size;
/// Positions works as a long sequence of compressed block.
/// All terms are chained one after the other.
///
@@ -62,22 +61,20 @@ impl Positions {
fn reader(&self, offset: u64) -> PositionReader {
let long_skip_id = (offset / LONG_SKIP_INTERVAL) as usize;
let small_skip = (offset % LONG_SKIP_INTERVAL) as usize;
let offset_num_bytes: u64 = self.long_skip(long_skip_id);
let mut position_read = OwnedRead::new(self.position_source.clone());
position_read.advance(offset_num_bytes as usize);
let mut skip_read = OwnedRead::new(self.skip_source.clone());
skip_read.advance(long_skip_id * LONG_SKIP_IN_BLOCKS);
let mut position_reader = PositionReader {
PositionReader {
bit_packer: self.bit_packer,
skip_read,
position_read,
inner_offset: 0,
buffer: Box::new([0u32; 128]),
ahead: None,
};
position_reader.skip(small_skip);
position_reader
block_offset: std::i64::MAX as u64,
anchor_offset: (long_skip_id as u64) * LONG_SKIP_INTERVAL,
abs_offset: offset,
}
}
}
@@ -85,51 +82,12 @@ pub struct PositionReader {
skip_read: OwnedRead,
position_read: OwnedRead,
bit_packer: BitPacker4x,
inner_offset: usize,
buffer: Box<[u32; 128]>,
ahead: Option<usize>, // if None, no block is loaded.
// if Some(num_blocks), the block currently loaded is num_blocks ahead
// of the block of the next int to read.
}
buffer: Box<[u32; COMPRESSION_BLOCK_SIZE]>,
// `ahead` represents the offset of the block currently loaded
// compared to the cursor of the actual stream.
//
// By contract, when this function is called, the current block has to be
// decompressed.
//
// If the requested number of els ends exactly at a given block, the next
// block is not decompressed.
fn read_impl(
bit_packer: BitPacker4x,
mut position: &[u8],
buffer: &mut [u32; 128],
mut inner_offset: usize,
num_bits: &[u8],
output: &mut [u32],
) -> usize {
let mut output_start = 0;
let mut output_len = output.len();
let mut ahead = 0;
loop {
let available_len = COMPRESSION_BLOCK_SIZE - inner_offset;
// We have enough elements in the current block.
// Let's copy the requested elements in the output buffer,
// and return.
if output_len <= available_len {
output[output_start..].copy_from_slice(&buffer[inner_offset..][..output_len]);
return ahead;
}
output[output_start..][..available_len].copy_from_slice(&buffer[inner_offset..]);
output_len -= available_len;
output_start += available_len;
inner_offset = 0;
let num_bits = num_bits[ahead];
bit_packer.decompress(position, &mut buffer[..], num_bits);
let block_len = compressed_block_size(num_bits);
position = &position[block_len..];
ahead += 1;
}
block_offset: u64,
anchor_offset: u64,
abs_offset: u64,
}
impl PositionReader {
@@ -141,57 +99,65 @@ impl PositionReader {
Positions::new(position_source, skip_source).reader(offset)
}
/// Fills a buffer with the next `output.len()` integers.
/// This does not consume / advance the stream.
pub fn read(&mut self, output: &mut [u32]) {
let skip_data = self.skip_read.as_ref();
let position_data = self.position_read.as_ref();
let num_bits = self.skip_read.get(0);
if self.ahead != Some(0) {
// the block currently available is not the block
// for the current position
fn advance_num_blocks(&mut self, num_blocks: usize) {
let num_bits: usize = self.skip_read.as_ref()[..num_blocks]
.iter()
.cloned()
.map(|num_bits| num_bits as usize)
.sum();
let num_bytes_to_skip = num_bits * COMPRESSION_BLOCK_SIZE / 8;
self.skip_read.advance(num_blocks as usize);
self.position_read.advance(num_bytes_to_skip);
}
/// Fills a buffer with the positions `[offset..offset+output.len())` integers.
///
/// `offset` is required to have a value >= to the offsets given in previous calls
/// for the given `PositionReaderAbsolute` instance.
pub fn read(&mut self, mut offset: u64, mut output: &mut [u32]) {
offset += self.abs_offset;
assert!(
offset >= self.anchor_offset,
"offset arguments should be increasing."
);
let delta_to_block_offset = offset as i64 - self.block_offset as i64;
if delta_to_block_offset < 0 || delta_to_block_offset >= 128 {
// The first position is not within the first block.
// We need to decompress the first block.
let delta_to_anchor_offset = offset - self.anchor_offset;
let num_blocks_to_skip =
(delta_to_anchor_offset / (COMPRESSION_BLOCK_SIZE as u64)) as usize;
self.advance_num_blocks(num_blocks_to_skip);
self.anchor_offset = offset - (offset % COMPRESSION_BLOCK_SIZE as u64);
self.block_offset = self.anchor_offset;
let num_bits = self.skip_read.get(0);
self.bit_packer
.decompress(self.position_read.as_ref(), self.buffer.as_mut(), num_bits);
} else {
let num_blocks_to_skip =
((self.block_offset - self.anchor_offset) / COMPRESSION_BLOCK_SIZE as u64) as usize;
self.advance_num_blocks(num_blocks_to_skip);
self.anchor_offset = self.block_offset;
}
let mut num_bits = self.skip_read.get(0);
let mut position_data = self.position_read.as_ref();
for i in 1.. {
let offset_in_block = (offset as usize) % COMPRESSION_BLOCK_SIZE;
let remaining_in_block = COMPRESSION_BLOCK_SIZE - offset_in_block;
if remaining_in_block >= output.len() {
output.copy_from_slice(&self.buffer[offset_in_block..][..output.len()]);
break;
}
output[..remaining_in_block].copy_from_slice(&self.buffer[offset_in_block..]);
output = &mut output[remaining_in_block..];
offset += remaining_in_block as u64;
position_data = &position_data[(num_bits as usize * COMPRESSION_BLOCK_SIZE / 8)..];
num_bits = self.skip_read.get(i);
self.bit_packer
.decompress(position_data, self.buffer.as_mut(), num_bits);
self.ahead = Some(0);
self.block_offset += COMPRESSION_BLOCK_SIZE as u64;
}
let block_len = compressed_block_size(num_bits);
self.ahead = Some(read_impl(
self.bit_packer,
&position_data[block_len..],
self.buffer.as_mut(),
self.inner_offset,
&skip_data[1..],
output,
));
}
/// Skip the next `skip_len` integer.
///
/// If a full block is skipped, calling
/// `.skip(...)` will avoid decompressing it.
///
/// May panic if the end of the stream is reached.
pub fn skip(&mut self, skip_len: usize) {
let skip_len_plus_inner_offset = skip_len + self.inner_offset;
let num_blocks_to_advance = skip_len_plus_inner_offset / COMPRESSION_BLOCK_SIZE;
self.inner_offset = skip_len_plus_inner_offset % COMPRESSION_BLOCK_SIZE;
self.ahead = self.ahead.and_then(|num_blocks| {
if num_blocks >= num_blocks_to_advance {
Some(num_blocks - num_blocks_to_advance)
} else {
None
}
});
let skip_len_in_bits = self.skip_read.as_ref()[..num_blocks_to_advance]
.iter()
.map(|num_bits| *num_bits as usize)
.sum::<usize>()
* COMPRESSION_BLOCK_SIZE;
let skip_len_in_bytes = skip_len_in_bits / 8;
self.skip_read.advance(num_blocks_to_advance);
self.position_read.advance(skip_len_in_bytes);
}
}

View File

@@ -87,6 +87,7 @@ fn exponential_search(arr: &[u32], target: u32) -> (usize, usize) {
(begin, end)
}
#[inline(never)]
fn galloping(block_docs: &[u32], target: u32) -> usize {
let (start, end) = exponential_search(&block_docs, target);
start + linear_search(&block_docs[start..end], target)
@@ -133,19 +134,14 @@ impl BlockSearcher {
///
/// Indeed, if the block is not full, the remaining items are TERMINATED.
/// It is surprisingly faster, most likely because of the lack of branch misprediction.
pub(crate) fn search_in_block(
self,
block_docs: &AlignedBuffer,
start: usize,
target: u32,
) -> usize {
pub(crate) fn search_in_block(self, block_docs: &AlignedBuffer, target: u32) -> usize {
#[cfg(target_arch = "x86_64")]
{
if self == BlockSearcher::SSE2 {
return sse2::linear_search_sse2_128(block_docs, target);
}
}
start + galloping(&block_docs.0[start..], target)
galloping(&block_docs.0[..], target)
}
}
@@ -199,12 +195,10 @@ mod tests {
assert!(block.len() < COMPRESSION_BLOCK_SIZE);
let mut output_buffer = [TERMINATED; COMPRESSION_BLOCK_SIZE];
output_buffer[..block.len()].copy_from_slice(block);
for i in 0..cursor {
assert_eq!(
block_searcher.search_in_block(&AlignedBuffer(output_buffer), i, target),
cursor
);
}
assert_eq!(
block_searcher.search_in_block(&AlignedBuffer(output_buffer), target),
cursor
);
}
fn util_test_search_in_block_all(block_searcher: BlockSearcher, block: &[u32]) {

View File

@@ -0,0 +1,427 @@
use crate::common::{BinarySerializable, VInt};
use crate::directory::ReadOnlySource;
use crate::postings::compression::{
AlignedBuffer, BlockDecoder, VIntDecoder, COMPRESSION_BLOCK_SIZE,
};
use crate::postings::{BlockInfo, FreqReadingOption, SkipReader};
use crate::schema::IndexRecordOption;
use crate::{DocId, TERMINATED};
/// `BlockSegmentPostings` is a cursor iterating over blocks
/// of documents.
///
/// # Warning
///
/// While it is useful for some very specific high-performance
/// use cases, you should prefer using `SegmentPostings` for most usage.
pub struct BlockSegmentPostings {
pub(crate) doc_decoder: BlockDecoder,
loaded_offset: usize,
freq_decoder: BlockDecoder,
freq_reading_option: FreqReadingOption,
doc_freq: usize,
data: ReadOnlySource,
skip_reader: SkipReader,
}
fn decode_bitpacked_block(
doc_decoder: &mut BlockDecoder,
freq_decoder_opt: Option<&mut BlockDecoder>,
data: &[u8],
doc_offset: DocId,
doc_num_bits: u8,
tf_num_bits: u8,
) {
let num_consumed_bytes = doc_decoder.uncompress_block_sorted(data, doc_offset, doc_num_bits);
if let Some(freq_decoder) = freq_decoder_opt {
freq_decoder.uncompress_block_unsorted(&data[num_consumed_bytes..], tf_num_bits);
}
}
fn decode_vint_block(
doc_decoder: &mut BlockDecoder,
freq_decoder_opt: Option<&mut BlockDecoder>,
data: &[u8],
doc_offset: DocId,
num_vint_docs: usize,
) {
doc_decoder.clear();
let num_consumed_bytes = doc_decoder.uncompress_vint_sorted(data, doc_offset, num_vint_docs);
if let Some(freq_decoder) = freq_decoder_opt {
freq_decoder.uncompress_vint_unsorted(&data[num_consumed_bytes..], num_vint_docs);
}
}
fn split_into_skips_and_postings(
doc_freq: u32,
data: ReadOnlySource,
) -> (Option<ReadOnlySource>, ReadOnlySource) {
if doc_freq < COMPRESSION_BLOCK_SIZE as u32 {
return (None, data);
}
let mut data_byte_arr = data.as_slice();
let skip_len = VInt::deserialize(&mut data_byte_arr)
.expect("Data corrupted")
.0 as usize;
let vint_len = data.len() - data_byte_arr.len();
let (skip_data, postings_data) = data.slice_from(vint_len).split(skip_len);
(Some(skip_data), postings_data)
}
impl BlockSegmentPostings {
pub(crate) fn from_data(
doc_freq: u32,
data: ReadOnlySource,
record_option: IndexRecordOption,
requested_option: IndexRecordOption,
) -> BlockSegmentPostings {
let freq_reading_option = match (record_option, requested_option) {
(IndexRecordOption::Basic, _) => FreqReadingOption::NoFreq,
(_, IndexRecordOption::Basic) => FreqReadingOption::SkipFreq,
(_, _) => FreqReadingOption::ReadFreq,
};
let (skip_data_opt, postings_data) = split_into_skips_and_postings(doc_freq, data);
let skip_reader = match skip_data_opt {
Some(skip_data) => SkipReader::new(skip_data, doc_freq, record_option),
None => SkipReader::new(ReadOnlySource::empty(), doc_freq, record_option),
};
let doc_freq = doc_freq as usize;
let mut block_segment_postings = BlockSegmentPostings {
doc_decoder: BlockDecoder::with_val(TERMINATED),
loaded_offset: std::usize::MAX,
freq_decoder: BlockDecoder::with_val(1),
freq_reading_option,
doc_freq,
data: postings_data,
skip_reader,
};
block_segment_postings.advance();
block_segment_postings
}
// Resets the block segment postings on another position
// in the postings file.
//
// This is useful for enumerating through a list of terms,
// and consuming the associated posting lists while avoiding
// reallocating a `BlockSegmentPostings`.
//
// # Warning
//
// This does not reset the positions list.
pub(crate) fn reset(&mut self, doc_freq: u32, postings_data: ReadOnlySource) {
let (skip_data_opt, postings_data) = split_into_skips_and_postings(doc_freq, postings_data);
self.data = ReadOnlySource::new(postings_data);
self.loaded_offset = std::usize::MAX;
self.loaded_offset = std::usize::MAX;
if let Some(skip_data) = skip_data_opt {
self.skip_reader.reset(skip_data, doc_freq);
} else {
self.skip_reader.reset(ReadOnlySource::empty(), doc_freq);
}
self.doc_freq = doc_freq as usize;
}
/// Returns the document frequency associated to this block postings.
///
/// This `doc_freq` is simply the sum of the length of all of the blocks
/// length, and it does not take in account deleted documents.
pub fn doc_freq(&self) -> usize {
self.doc_freq
}
/// Returns the array of docs in the current block.
///
/// Before the first call to `.advance()`, the block
/// returned by `.docs()` is empty.
#[inline]
pub fn docs(&self) -> &[DocId] {
self.doc_decoder.output_array()
}
#[inline(always)]
pub(crate) fn docs_aligned(&self) -> &AlignedBuffer {
self.doc_decoder.output_aligned()
}
/// Return the document at index `idx` of the block.
#[inline(always)]
pub fn doc(&self, idx: usize) -> u32 {
self.doc_decoder.output(idx)
}
/// Return the array of `term freq` in the block.
#[inline]
pub fn freqs(&self) -> &[u32] {
self.freq_decoder.output_array()
}
/// Return the frequency at index `idx` of the block.
#[inline]
pub fn freq(&self, idx: usize) -> u32 {
self.freq_decoder.output(idx)
}
/// Returns the length of the current block.
///
/// All blocks have a length of `NUM_DOCS_PER_BLOCK`,
/// except the last block that may have a length
/// of any number between 1 and `NUM_DOCS_PER_BLOCK - 1`
#[inline]
pub fn block_len(&self) -> usize {
self.doc_decoder.output_len
}
pub(crate) fn position_offset(&self) -> u64 {
self.skip_reader.position_offset()
}
/// Position on a block that may contains `target_doc`.
///
/// If all docs are smaller than target, the block loaded may be empty,
/// or be the last an incomplete VInt block.
pub fn seek(&mut self, target_doc: DocId) {
self.skip_reader.seek(target_doc);
self.load_block();
}
fn load_block(&mut self) {
let offset = self.skip_reader.byte_offset();
if self.loaded_offset == offset {
return;
}
self.loaded_offset = offset;
match self.skip_reader.block_info() {
BlockInfo::BitPacked {
doc_num_bits,
tf_num_bits,
..
} => {
decode_bitpacked_block(
&mut self.doc_decoder,
if let FreqReadingOption::ReadFreq = self.freq_reading_option {
Some(&mut self.freq_decoder)
} else {
None
},
&self.data.as_slice()[offset..],
self.skip_reader.last_doc_in_previous_block,
doc_num_bits,
tf_num_bits,
);
}
BlockInfo::VInt(num_vint_docs) => {
decode_vint_block(
&mut self.doc_decoder,
if let FreqReadingOption::ReadFreq = self.freq_reading_option {
Some(&mut self.freq_decoder)
} else {
None
},
&self.data.as_slice()[offset..],
self.skip_reader.last_doc_in_previous_block,
num_vint_docs as usize,
);
}
}
}
/// Advance to the next block.
///
/// Returns false iff there was no remaining blocks.
pub fn advance(&mut self) -> bool {
if !self.skip_reader.advance() {
return false;
}
self.load_block();
true
}
/// Returns an empty segment postings object
pub fn empty() -> BlockSegmentPostings {
BlockSegmentPostings {
doc_decoder: BlockDecoder::with_val(TERMINATED),
loaded_offset: std::usize::MAX,
freq_decoder: BlockDecoder::with_val(1),
freq_reading_option: FreqReadingOption::NoFreq,
doc_freq: 0,
data: ReadOnlySource::new(vec![]),
skip_reader: SkipReader::new(ReadOnlySource::new(vec![]), 0, IndexRecordOption::Basic),
}
}
}
#[cfg(test)]
mod tests {
use super::BlockSegmentPostings;
use crate::common::HasLen;
use crate::core::Index;
use crate::docset::{DocSet, TERMINATED};
use crate::postings::compression::COMPRESSION_BLOCK_SIZE;
use crate::postings::postings::Postings;
use crate::postings::SegmentPostings;
use crate::schema::IndexRecordOption;
use crate::schema::Schema;
use crate::schema::Term;
use crate::schema::INDEXED;
use crate::DocId;
#[test]
fn test_empty_segment_postings() {
let mut postings = SegmentPostings::empty();
assert_eq!(postings.advance(), TERMINATED);
assert_eq!(postings.advance(), TERMINATED);
assert_eq!(postings.len(), 0);
}
#[test]
fn test_empty_postings_doc_returns_terminated() {
let mut postings = SegmentPostings::empty();
assert_eq!(postings.doc(), TERMINATED);
assert_eq!(postings.advance(), TERMINATED);
}
#[test]
fn test_empty_postings_doc_term_freq_returns_0() {
let postings = SegmentPostings::empty();
assert_eq!(postings.term_freq(), 1);
}
#[test]
fn test_empty_block_segment_postings() {
let mut postings = BlockSegmentPostings::empty();
assert!(!postings.advance());
assert_eq!(postings.doc_freq(), 0);
}
#[test]
fn test_block_segment_postings() {
let mut block_segments = build_block_postings(&(0..100_000).collect::<Vec<u32>>());
let mut offset: u32 = 0u32;
// checking that the `doc_freq` is correct
assert_eq!(block_segments.doc_freq(), 100_000);
loop {
let block = block_segments.docs();
for (i, doc) in block.iter().cloned().enumerate() {
assert_eq!(offset + (i as u32), doc);
}
offset += block.len() as u32;
if block_segments.advance() {
break;
}
}
}
#[test]
fn test_skip_right_at_new_block() {
let mut doc_ids = (0..128).collect::<Vec<u32>>();
// 128 is missing
doc_ids.push(129);
doc_ids.push(130);
{
let block_segments = build_block_postings(&doc_ids);
let mut docset = SegmentPostings::from_block_postings(block_segments, None);
assert_eq!(docset.seek(128), 129);
assert_eq!(docset.doc(), 129);
assert_eq!(docset.advance(), 130);
assert_eq!(docset.doc(), 130);
assert_eq!(docset.advance(), TERMINATED);
}
{
let block_segments = build_block_postings(&doc_ids);
let mut docset = SegmentPostings::from_block_postings(block_segments, None);
assert_eq!(docset.seek(129), 129);
assert_eq!(docset.doc(), 129);
assert_eq!(docset.advance(), 130);
assert_eq!(docset.doc(), 130);
assert_eq!(docset.advance(), TERMINATED);
}
{
let block_segments = build_block_postings(&doc_ids);
let mut docset = SegmentPostings::from_block_postings(block_segments, None);
assert_eq!(docset.doc(), 0);
assert_eq!(docset.seek(131), TERMINATED);
assert_eq!(docset.doc(), TERMINATED);
}
}
fn build_block_postings(docs: &[DocId]) -> BlockSegmentPostings {
let mut schema_builder = Schema::builder();
let int_field = schema_builder.add_u64_field("id", INDEXED);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
let mut last_doc = 0u32;
for &doc in docs {
for _ in last_doc..doc {
index_writer.add_document(doc!(int_field=>1u64));
}
index_writer.add_document(doc!(int_field=>0u64));
last_doc = doc + 1;
}
index_writer.commit().unwrap();
let searcher = index.reader().unwrap().searcher();
let segment_reader = searcher.segment_reader(0);
let inverted_index = segment_reader.inverted_index(int_field);
let term = Term::from_field_u64(int_field, 0u64);
let term_info = inverted_index.get_term_info(&term).unwrap();
inverted_index.read_block_postings_from_terminfo(&term_info, IndexRecordOption::Basic)
}
#[test]
fn test_block_segment_postings_skip2() {
let mut docs = vec![0];
for i in 0..1300 {
docs.push((i * i / 100) + i);
}
let mut block_postings = build_block_postings(&docs[..]);
for i in vec![0, 424, 10000] {
block_postings.seek(i);
let docs = block_postings.docs();
assert!(docs[0] <= i);
assert!(docs.last().cloned().unwrap_or(0u32) >= i);
}
block_postings.seek(100_000);
assert_eq!(block_postings.doc(COMPRESSION_BLOCK_SIZE - 1), TERMINATED);
}
#[test]
fn test_reset_block_segment_postings() {
let mut schema_builder = Schema::builder();
let int_field = schema_builder.add_u64_field("id", INDEXED);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
// create two postings list, one containg even number,
// the other containing odd numbers.
for i in 0..6 {
let doc = doc!(int_field=> (i % 2) as u64);
index_writer.add_document(doc);
}
index_writer.commit().unwrap();
let searcher = index.reader().unwrap().searcher();
let segment_reader = searcher.segment_reader(0);
let mut block_segments;
{
let term = Term::from_field_u64(int_field, 0u64);
let inverted_index = segment_reader.inverted_index(int_field);
let term_info = inverted_index.get_term_info(&term).unwrap();
block_segments = inverted_index
.read_block_postings_from_terminfo(&term_info, IndexRecordOption::Basic);
}
assert_eq!(block_segments.docs(), &[0, 2, 4]);
{
let term = Term::from_field_u64(int_field, 1u64);
let inverted_index = segment_reader.inverted_index(int_field);
let term_info = inverted_index.get_term_info(&term).unwrap();
inverted_index.reset_block_postings_from_terminfo(&term_info, &mut block_segments);
}
assert!(block_segments.advance());
assert_eq!(block_segments.docs(), &[1, 3, 5]);
}
}

View File

@@ -18,6 +18,12 @@ pub struct BlockEncoder {
pub output_len: usize,
}
impl Default for BlockEncoder {
fn default() -> Self {
BlockEncoder::new()
}
}
impl BlockEncoder {
pub fn new() -> BlockEncoder {
BlockEncoder {
@@ -55,11 +61,13 @@ pub struct BlockDecoder {
pub output_len: usize,
}
impl BlockDecoder {
pub fn new() -> BlockDecoder {
impl Default for BlockDecoder {
fn default() -> Self {
BlockDecoder::with_val(0u32)
}
}
impl BlockDecoder {
pub fn with_val(val: u32) -> BlockDecoder {
BlockDecoder {
bitpacker: BitPacker4x::new(),
@@ -175,7 +183,7 @@ impl VIntDecoder for BlockDecoder {
vint::uncompress_sorted(compressed_data, &mut self.output.0[..num_els], offset)
}
fn uncompress_vint_unsorted<'a>(&mut self, compressed_data: &'a [u8], num_els: usize) -> usize {
fn uncompress_vint_unsorted(&mut self, compressed_data: &[u8], num_els: usize) -> usize {
self.output_len = num_els;
vint::uncompress_unsorted(compressed_data, &mut self.output.0[..num_els])
}
@@ -191,7 +199,7 @@ pub mod tests {
let vals: Vec<u32> = (0u32..128u32).map(|i| i * 7).collect();
let mut encoder = BlockEncoder::new();
let (num_bits, compressed_data) = encoder.compress_block_sorted(&vals, 0);
let mut decoder = BlockDecoder::new();
let mut decoder = BlockDecoder::default();
{
let consumed_num_bytes = decoder.uncompress_block_sorted(compressed_data, 0, num_bits);
assert_eq!(consumed_num_bytes, compressed_data.len());
@@ -204,9 +212,9 @@ pub mod tests {
#[test]
fn test_encode_sorted_block_with_offset() {
let vals: Vec<u32> = (0u32..128u32).map(|i| 11 + i * 7).collect();
let mut encoder = BlockEncoder::new();
let mut encoder = BlockEncoder::default();
let (num_bits, compressed_data) = encoder.compress_block_sorted(&vals, 10);
let mut decoder = BlockDecoder::new();
let mut decoder = BlockDecoder::default();
{
let consumed_num_bytes = decoder.uncompress_block_sorted(compressed_data, 10, num_bits);
assert_eq!(consumed_num_bytes, compressed_data.len());
@@ -221,11 +229,11 @@ pub mod tests {
let mut compressed: Vec<u8> = Vec::new();
let n = 128;
let vals: Vec<u32> = (0..n).map(|i| 11u32 + (i as u32) * 7u32).collect();
let mut encoder = BlockEncoder::new();
let mut encoder = BlockEncoder::default();
let (num_bits, compressed_data) = encoder.compress_block_sorted(&vals, 10);
compressed.extend_from_slice(compressed_data);
compressed.push(173u8);
let mut decoder = BlockDecoder::new();
let mut decoder = BlockDecoder::default();
{
let consumed_num_bytes = decoder.uncompress_block_sorted(&compressed, 10, num_bits);
assert_eq!(consumed_num_bytes, compressed.len() - 1);
@@ -241,11 +249,11 @@ pub mod tests {
let mut compressed: Vec<u8> = Vec::new();
let n = 128;
let vals: Vec<u32> = (0..n).map(|i| 11u32 + (i as u32) * 7u32 % 12).collect();
let mut encoder = BlockEncoder::new();
let mut encoder = BlockEncoder::default();
let (num_bits, compressed_data) = encoder.compress_block_unsorted(&vals);
compressed.extend_from_slice(compressed_data);
compressed.push(173u8);
let mut decoder = BlockDecoder::new();
let mut decoder = BlockDecoder::default();
{
let consumed_num_bytes = decoder.uncompress_block_unsorted(&compressed, num_bits);
assert_eq!(consumed_num_bytes + 1, compressed.len());
@@ -256,6 +264,11 @@ pub mod tests {
}
}
#[test]
fn test_block_decoder_initialization() {
let block = BlockDecoder::with_val(TERMINATED);
assert_eq!(block.output(0), TERMINATED);
}
#[test]
fn test_encode_vint() {
{
@@ -265,7 +278,7 @@ pub mod tests {
for offset in &[0u32, 1u32, 2u32] {
let encoded_data = encoder.compress_vint_sorted(&input, *offset);
assert!(encoded_data.len() <= expected_length);
let mut decoder = BlockDecoder::new();
let mut decoder = BlockDecoder::default();
let consumed_num_bytes =
decoder.uncompress_vint_sorted(&encoded_data, *offset, input.len());
assert_eq!(consumed_num_bytes, encoded_data.len());

View File

@@ -3,11 +3,8 @@ Postings module (also called inverted index)
*/
mod block_search;
mod block_segment_postings;
pub(crate) mod compression;
/// Postings module
///
/// Postings, also called inverted lists, is the key datastructure
/// to full-text search.
mod postings;
mod postings_writer;
mod recorder;
@@ -22,18 +19,17 @@ pub(crate) use self::block_search::BlockSearcher;
pub(crate) use self::postings_writer::MultiFieldPostingsWriter;
pub use self::serializer::{FieldSerializer, InvertedIndexSerializer};
use self::compression::COMPRESSION_BLOCK_SIZE;
pub use self::postings::Postings;
pub(crate) use self::skip::SkipReader;
pub(crate) use self::skip::{BlockInfo, SkipReader};
pub use self::term_info::TermInfo;
pub use self::segment_postings::{BlockSegmentPostings, SegmentPostings};
pub use self::block_segment_postings::BlockSegmentPostings;
pub use self::segment_postings::SegmentPostings;
pub(crate) use self::stacker::compute_table_size;
pub use crate::common::HasLen;
pub(crate) const USE_SKIP_INFO_LIMIT: u32 = COMPRESSION_BLOCK_SIZE as u32;
pub(crate) type UnorderedTermId = u64;
#[cfg_attr(feature = "cargo-clippy", allow(clippy::enum_variant_names))]

View File

@@ -1,54 +1,19 @@
use crate::common::HasLen;
use crate::common::{BinarySerializable, VInt};
use crate::docset::{DocSet, TERMINATED};
use crate::docset::DocSet;
use crate::positions::PositionReader;
use crate::postings::compression::{compressed_block_size, AlignedBuffer};
use crate::postings::compression::{BlockDecoder, VIntDecoder, COMPRESSION_BLOCK_SIZE};
use crate::postings::compression::COMPRESSION_BLOCK_SIZE;
use crate::postings::serializer::PostingsSerializer;
use crate::postings::BlockSearcher;
use crate::postings::FreqReadingOption;
use crate::postings::Postings;
use crate::postings::SkipReader;
use crate::postings::USE_SKIP_INFO_LIMIT;
use crate::schema::IndexRecordOption;
use crate::DocId;
use owned_read::OwnedRead;
use tantivy_fst::Streamer;
struct PositionComputer {
// store the amount of position int
// before reading positions.
//
// if none, position are already loaded in
// the positions vec.
position_to_skip: usize,
position_reader: PositionReader,
}
impl PositionComputer {
pub fn new(position_reader: PositionReader) -> PositionComputer {
PositionComputer {
position_to_skip: 0,
position_reader,
}
}
pub fn add_skip(&mut self, num_skip: usize) {
self.position_to_skip += num_skip;
}
// Positions can only be read once.
pub fn positions_with_offset(&mut self, offset: u32, output: &mut [u32]) {
self.position_reader.skip(self.position_to_skip);
self.position_to_skip = 0;
self.position_reader.read(output);
let mut cum = offset;
for output_mut in output.iter_mut() {
cum += *output_mut;
*output_mut = cum;
}
}
}
use crate::directory::ReadOnlySource;
use crate::postings::BlockSegmentPostings;
/// `SegmentPostings` represents the inverted list or postings associated to
/// a term in a `Segment`.
@@ -58,22 +23,19 @@ impl PositionComputer {
pub struct SegmentPostings {
block_cursor: BlockSegmentPostings,
cur: usize,
position_computer: Option<PositionComputer>,
position_reader: Option<PositionReader>,
block_searcher: BlockSearcher,
}
impl SegmentPostings {
/// Returns an empty segment postings object
pub fn empty() -> Self {
let empty_block_cursor = BlockSegmentPostings::empty();
let mut segment_postings = SegmentPostings {
block_cursor: empty_block_cursor,
cur: COMPRESSION_BLOCK_SIZE,
position_computer: None,
SegmentPostings {
block_cursor: BlockSegmentPostings::empty(),
cur: 0,
position_reader: None,
block_searcher: BlockSearcher::default(),
};
segment_postings.advance();
segment_postings
}
}
/// Creates a segment postings object with the given documents
@@ -97,15 +59,13 @@ impl SegmentPostings {
}
let block_segment_postings = BlockSegmentPostings::from_data(
docs.len() as u32,
OwnedRead::new(buffer),
ReadOnlySource::from(buffer),
IndexRecordOption::Basic,
IndexRecordOption::Basic,
);
SegmentPostings::from_block_postings(block_segment_postings, None)
}
}
impl SegmentPostings {
/// Reads a Segment postings from an &[u8]
///
/// * `len` - number of document in the posting lists.
@@ -114,16 +74,14 @@ impl SegmentPostings {
/// frequencies and/or positions
pub(crate) fn from_block_postings(
segment_block_postings: BlockSegmentPostings,
positions_stream_opt: Option<PositionReader>,
position_reader: Option<PositionReader>,
) -> SegmentPostings {
let mut postings = SegmentPostings {
SegmentPostings {
block_cursor: segment_block_postings,
cur: COMPRESSION_BLOCK_SIZE, // cursor within the block
position_computer: positions_stream_opt.map(PositionComputer::new),
cur: 0, // cursor within the block
position_reader,
block_searcher: BlockSearcher::default(),
};
postings.advance();
postings
}
}
}
@@ -132,91 +90,43 @@ impl DocSet for SegmentPostings {
// next needs to be called a first time to point to the correct element.
#[inline]
fn advance(&mut self) -> DocId {
if self.position_computer.is_some() && self.cur < COMPRESSION_BLOCK_SIZE {
let term_freq = self.term_freq() as usize;
if let Some(position_computer) = self.position_computer.as_mut() {
position_computer.add_skip(term_freq);
}
}
self.cur += 1;
if self.cur >= self.block_cursor.block_len() {
if self.block_cursor.advance() {
self.cur = 0;
} else {
self.cur = COMPRESSION_BLOCK_SIZE - 1;
return TERMINATED;
}
if self.cur == COMPRESSION_BLOCK_SIZE - 1 {
self.cur = 0;
self.block_cursor.advance();
} else {
self.cur += 1;
}
self.doc()
}
fn seek(&mut self, target: DocId) -> DocId {
if self.doc() >= target {
return self.doc();
}
// In the following, thanks to the call to advance above,
// we know that the position is not loaded and we need
// to skip every doc_freq we cross.
// skip blocks until one that might contain the target
// check if we need to go to the next block
let mut sum_freqs_skipped: u32 = 0;
if self
.block_cursor
.docs()
.last()
.map(|&doc| doc < target)
.unwrap_or(true)
{
// We are not in the right block.
if self.position_computer.is_some() {
// First compute all of the freqs skipped from the current block.
sum_freqs_skipped = self.block_cursor.freqs()[self.cur..].iter().sum::<u32>();
match self.block_cursor.skip_to(target) {
BlockSegmentPostingsSkipResult::Success(block_skip_freqs) => {
sum_freqs_skipped += block_skip_freqs;
}
BlockSegmentPostingsSkipResult::Terminated => {
self.block_cursor.doc_decoder.clear();
self.cur = 0;
return TERMINATED;
}
}
} else if self.block_cursor.skip_to(target)
== BlockSegmentPostingsSkipResult::Terminated
{
// no positions needed. no need to sum freqs.
self.block_cursor.doc_decoder.clear();
self.cur = 0;
return TERMINATED;
}
self.cur = 0;
if self.doc() == target {
return target;
}
self.block_cursor.seek(target);
// At this point we are on the block, that might contain our document.
let cur = self.cur;
let output = self.block_cursor.docs_aligned();
let new_cur = self.block_searcher.search_in_block(&output, cur, target);
if let Some(position_computer) = self.position_computer.as_mut() {
sum_freqs_skipped += self.block_cursor.freqs()[cur..new_cur].iter().sum::<u32>();
position_computer.add_skip(sum_freqs_skipped as usize);
}
self.cur = new_cur;
self.cur = self.block_searcher.search_in_block(&output, target);
// The last block is not full and padded with the value TERMINATED,
// so that we are guaranteed to have at least doc in the block (a real one or the padding)
// that is greater or equal to the target.
debug_assert!(self.cur < COMPRESSION_BLOCK_SIZE);
// `doc` is now the first element >= `target`
let doc = output.0[new_cur];
// If all docs are smaller than target the current block should be incomplemented and padded
// with the value `TERMINATED`.
//
// After the search, the cursor should point to the first value of TERMINATED.
let doc = output.0[self.cur];
debug_assert!(doc >= target);
doc
}
/// Return the current document's `DocId`.
///
/// # Panics
///
/// Will panics if called without having called advance before.
#[inline]
fn doc(&self) -> DocId {
self.block_cursor.doc(self.cur)
@@ -225,23 +135,6 @@ impl DocSet for SegmentPostings {
fn size_hint(&self) -> u32 {
self.len() as u32
}
/*
fn append_to_bitset(&mut self, bitset: &mut BitSet) {
// finish the current block
if self.advance() {
for &doc in &self.block_cursor.docs()[self.cur..] {
bitset.insert(doc);
}
// ... iterate through the remaining blocks.
while self.block_cursor.advance() {
for &doc in self.block_cursor.docs() {
bitset.insert(doc);
}
}
}
}
*/
}
impl HasLen for SegmentPostings {
@@ -275,338 +168,33 @@ impl Postings for SegmentPostings {
fn positions_with_offset(&mut self, offset: u32, output: &mut Vec<u32>) {
let term_freq = self.term_freq() as usize;
if let Some(position_comp) = self.position_computer.as_mut() {
if let Some(position_reader) = self.position_reader.as_mut() {
let read_offset = self.block_cursor.position_offset()
+ (self.block_cursor.freqs()[..self.cur]
.iter()
.cloned()
.sum::<u32>() as u64);
output.resize(term_freq, 0u32);
position_comp.positions_with_offset(offset, &mut output[..]);
position_reader.read(read_offset, &mut output[..]);
let mut cum = offset;
for output_mut in output.iter_mut() {
cum += *output_mut;
*output_mut = cum;
}
} else {
output.clear();
}
}
}
/// `BlockSegmentPostings` is a cursor iterating over blocks
/// of documents.
///
/// # Warning
///
/// While it is useful for some very specific high-performance
/// use cases, you should prefer using `SegmentPostings` for most usage.
pub struct BlockSegmentPostings {
doc_decoder: BlockDecoder,
freq_decoder: BlockDecoder,
freq_reading_option: FreqReadingOption,
doc_freq: usize,
doc_offset: DocId,
num_vint_docs: usize,
remaining_data: OwnedRead,
skip_reader: SkipReader,
}
fn split_into_skips_and_postings(
doc_freq: u32,
mut data: OwnedRead,
) -> (Option<OwnedRead>, OwnedRead) {
if doc_freq < USE_SKIP_INFO_LIMIT {
return (None, data);
}
let skip_len = VInt::deserialize(&mut data).expect("Data corrupted").0 as usize;
let mut postings_data = data.clone();
postings_data.advance(skip_len);
data.clip(skip_len);
(Some(data), postings_data)
}
#[derive(Debug, Eq, PartialEq)]
pub enum BlockSegmentPostingsSkipResult {
Terminated,
Success(u32), //< number of term freqs to skip
}
impl BlockSegmentPostings {
pub(crate) fn from_data(
doc_freq: u32,
data: OwnedRead,
record_option: IndexRecordOption,
requested_option: IndexRecordOption,
) -> BlockSegmentPostings {
let freq_reading_option = match (record_option, requested_option) {
(IndexRecordOption::Basic, _) => FreqReadingOption::NoFreq,
(_, IndexRecordOption::Basic) => FreqReadingOption::SkipFreq,
(_, _) => FreqReadingOption::ReadFreq,
};
let (skip_data_opt, postings_data) = split_into_skips_and_postings(doc_freq, data);
let skip_reader = match skip_data_opt {
Some(skip_data) => SkipReader::new(skip_data, record_option),
None => SkipReader::new(OwnedRead::new(&[][..]), record_option),
};
let doc_freq = doc_freq as usize;
let num_vint_docs = doc_freq % COMPRESSION_BLOCK_SIZE;
BlockSegmentPostings {
num_vint_docs,
doc_decoder: BlockDecoder::new(),
freq_decoder: BlockDecoder::with_val(1),
freq_reading_option,
doc_offset: 0,
doc_freq,
remaining_data: postings_data,
skip_reader,
}
}
// Resets the block segment postings on another position
// in the postings file.
//
// This is useful for enumerating through a list of terms,
// and consuming the associated posting lists while avoiding
// reallocating a `BlockSegmentPostings`.
//
// # Warning
//
// This does not reset the positions list.
pub(crate) fn reset(&mut self, doc_freq: u32, postings_data: OwnedRead) {
let (skip_data_opt, postings_data) = split_into_skips_and_postings(doc_freq, postings_data);
let num_vint_docs = (doc_freq as usize) & (COMPRESSION_BLOCK_SIZE - 1);
self.num_vint_docs = num_vint_docs;
self.remaining_data = postings_data;
if let Some(skip_data) = skip_data_opt {
self.skip_reader.reset(skip_data);
} else {
self.skip_reader.reset(OwnedRead::new(&[][..]))
}
self.doc_offset = 0;
self.doc_freq = doc_freq as usize;
}
/// Returns the document frequency associated to this block postings.
///
/// This `doc_freq` is simply the sum of the length of all of the blocks
/// length, and it does not take in account deleted documents.
pub fn doc_freq(&self) -> usize {
self.doc_freq
}
/// Returns the array of docs in the current block.
///
/// Before the first call to `.advance()`, the block
/// returned by `.docs()` is empty.
#[inline]
pub fn docs(&self) -> &[DocId] {
self.doc_decoder.output_array()
}
pub(crate) fn docs_aligned(&self) -> &AlignedBuffer {
self.doc_decoder.output_aligned()
}
/// Return the document at index `idx` of the block.
#[inline]
pub fn doc(&self, idx: usize) -> u32 {
self.doc_decoder.output(idx)
}
/// Return the array of `term freq` in the block.
#[inline]
pub fn freqs(&self) -> &[u32] {
self.freq_decoder.output_array()
}
/// Return the frequency at index `idx` of the block.
#[inline]
pub fn freq(&self, idx: usize) -> u32 {
self.freq_decoder.output(idx)
}
/// Returns the length of the current block.
///
/// All blocks have a length of `NUM_DOCS_PER_BLOCK`,
/// except the last block that may have a length
/// of any number between 1 and `NUM_DOCS_PER_BLOCK - 1`
#[inline]
fn block_len(&self) -> usize {
self.doc_decoder.output_len
}
/// position on a block that may contains `doc_id`.
/// Always advance the current block.
///
/// Returns true if a block that has an element greater or equal to the target is found.
/// Returning true does not guarantee that the smallest element of the block is smaller
/// than the target. It only guarantees that the last element is greater or equal.
///
/// Returns false iff all of the document remaining are smaller than
/// `doc_id`. In that case, all of these document are consumed.
///
pub fn skip_to(&mut self, target_doc: DocId) -> BlockSegmentPostingsSkipResult {
let mut skip_freqs = 0u32;
while self.skip_reader.advance() {
if self.skip_reader.doc() >= target_doc {
// the last document of the current block is larger
// than the target.
//
// We found our block!
let num_bits = self.skip_reader.doc_num_bits();
let num_consumed_bytes = self.doc_decoder.uncompress_block_sorted(
self.remaining_data.as_ref(),
self.doc_offset,
num_bits,
);
self.remaining_data.advance(num_consumed_bytes);
let tf_num_bits = self.skip_reader.tf_num_bits();
match self.freq_reading_option {
FreqReadingOption::NoFreq => {}
FreqReadingOption::SkipFreq => {
let num_bytes_to_skip = compressed_block_size(tf_num_bits);
self.remaining_data.advance(num_bytes_to_skip);
}
FreqReadingOption::ReadFreq => {
let num_consumed_bytes = self
.freq_decoder
.uncompress_block_unsorted(self.remaining_data.as_ref(), tf_num_bits);
self.remaining_data.advance(num_consumed_bytes);
}
}
self.doc_offset = self.skip_reader.doc();
return BlockSegmentPostingsSkipResult::Success(skip_freqs);
} else {
skip_freqs += self.skip_reader.tf_sum();
let advance_len = self.skip_reader.total_block_len();
self.doc_offset = self.skip_reader.doc();
self.remaining_data.advance(advance_len);
}
}
self.doc_decoder.clear();
if self.num_vint_docs == 0 {
return BlockSegmentPostingsSkipResult::Terminated;
}
// we are now on the last, incomplete, variable encoded block.
let num_compressed_bytes = self.doc_decoder.uncompress_vint_sorted(
self.remaining_data.as_ref(),
self.doc_offset,
self.num_vint_docs,
);
self.remaining_data.advance(num_compressed_bytes);
match self.freq_reading_option {
FreqReadingOption::NoFreq | FreqReadingOption::SkipFreq => {}
FreqReadingOption::ReadFreq => {
self.freq_decoder
.uncompress_vint_unsorted(self.remaining_data.as_ref(), self.num_vint_docs);
}
}
self.num_vint_docs = 0;
self.docs()
.last()
.map(|last_doc| {
if *last_doc >= target_doc {
BlockSegmentPostingsSkipResult::Success(skip_freqs)
} else {
BlockSegmentPostingsSkipResult::Terminated
}
})
.unwrap_or(BlockSegmentPostingsSkipResult::Terminated)
}
/// Advance to the next block.
///
/// Returns false iff there was no remaining blocks.
pub fn advance(&mut self) -> bool {
if self.skip_reader.advance() {
let num_bits = self.skip_reader.doc_num_bits();
let num_consumed_bytes = self.doc_decoder.uncompress_block_sorted(
self.remaining_data.as_ref(),
self.doc_offset,
num_bits,
);
self.remaining_data.advance(num_consumed_bytes);
let tf_num_bits = self.skip_reader.tf_num_bits();
match self.freq_reading_option {
FreqReadingOption::NoFreq => {}
FreqReadingOption::SkipFreq => {
let num_bytes_to_skip = compressed_block_size(tf_num_bits);
self.remaining_data.advance(num_bytes_to_skip);
}
FreqReadingOption::ReadFreq => {
let num_consumed_bytes = self
.freq_decoder
.uncompress_block_unsorted(self.remaining_data.as_ref(), tf_num_bits);
self.remaining_data.advance(num_consumed_bytes);
}
}
// it will be used as the next offset.
self.doc_offset = self.doc_decoder.output(COMPRESSION_BLOCK_SIZE - 1);
return true;
}
self.doc_decoder.clear();
if self.num_vint_docs == 0 {
return false;
}
let num_compressed_bytes = self.doc_decoder.uncompress_vint_sorted(
self.remaining_data.as_ref(),
self.doc_offset,
self.num_vint_docs,
);
self.remaining_data.advance(num_compressed_bytes);
match self.freq_reading_option {
FreqReadingOption::NoFreq | FreqReadingOption::SkipFreq => {}
FreqReadingOption::ReadFreq => {
self.freq_decoder
.uncompress_vint_unsorted(self.remaining_data.as_ref(), self.num_vint_docs);
}
}
self.num_vint_docs = 0;
true
}
/// Returns an empty segment postings object
pub fn empty() -> BlockSegmentPostings {
BlockSegmentPostings {
num_vint_docs: 0,
doc_decoder: BlockDecoder::new(),
freq_decoder: BlockDecoder::with_val(1),
freq_reading_option: FreqReadingOption::NoFreq,
doc_offset: 0,
doc_freq: 0,
remaining_data: OwnedRead::new(vec![]),
skip_reader: SkipReader::new(OwnedRead::new(vec![]), IndexRecordOption::Basic),
}
}
}
impl<'b> Streamer<'b> for BlockSegmentPostings {
type Item = &'b [DocId];
fn next(&'b mut self) -> Option<&'b [DocId]> {
if self.advance() {
Some(self.docs())
} else {
None
}
}
}
#[cfg(test)]
mod tests {
use super::BlockSegmentPostings;
use super::BlockSegmentPostingsSkipResult;
use super::SegmentPostings;
use crate::common::HasLen;
use crate::core::Index;
use crate::docset::{DocSet, TERMINATED};
use crate::postings::postings::Postings;
use crate::schema::IndexRecordOption;
use crate::schema::Schema;
use crate::schema::Term;
use crate::schema::INDEXED;
use crate::DocId;
use tantivy_fst::Streamer;
#[test]
fn test_empty_segment_postings() {
@@ -628,165 +216,4 @@ mod tests {
let postings = SegmentPostings::empty();
assert_eq!(postings.term_freq(), 1);
}
#[test]
fn test_empty_block_segment_postings() {
let mut postings = BlockSegmentPostings::empty();
assert!(!postings.advance());
assert_eq!(postings.doc_freq(), 0);
}
#[test]
fn test_block_segment_postings() {
let mut block_segments = build_block_postings(&(0..100_000).collect::<Vec<u32>>());
let mut offset: u32 = 0u32;
// checking that the block before calling advance is empty
assert!(block_segments.docs().is_empty());
// checking that the `doc_freq` is correct
assert_eq!(block_segments.doc_freq(), 100_000);
while let Some(block) = block_segments.next() {
for (i, doc) in block.iter().cloned().enumerate() {
assert_eq!(offset + (i as u32), doc);
}
offset += block.len() as u32;
}
}
#[test]
fn test_skip_right_at_new_block() {
let mut doc_ids = (0..128).collect::<Vec<u32>>();
doc_ids.push(129);
doc_ids.push(130);
{
let block_segments = build_block_postings(&doc_ids);
let mut docset = SegmentPostings::from_block_postings(block_segments, None);
assert_eq!(docset.seek(128), 129);
assert_eq!(docset.doc(), 129);
assert_eq!(docset.advance(), 130);
assert_eq!(docset.doc(), 130);
assert_eq!(docset.advance(), TERMINATED);
}
{
let block_segments = build_block_postings(&doc_ids);
let mut docset = SegmentPostings::from_block_postings(block_segments, None);
assert_eq!(docset.seek(129), 129);
assert_eq!(docset.doc(), 129);
assert_eq!(docset.advance(), 130);
assert_eq!(docset.doc(), 130);
assert_eq!(docset.advance(), TERMINATED);
}
{
let block_segments = build_block_postings(&doc_ids);
let mut docset = SegmentPostings::from_block_postings(block_segments, None);
assert_eq!(docset.doc(), 0);
assert_eq!(docset.seek(131), TERMINATED);
assert_eq!(docset.doc(), TERMINATED);
}
}
fn build_block_postings(docs: &[DocId]) -> BlockSegmentPostings {
let mut schema_builder = Schema::builder();
let int_field = schema_builder.add_u64_field("id", INDEXED);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
let mut last_doc = 0u32;
for &doc in docs {
for _ in last_doc..doc {
index_writer.add_document(doc!(int_field=>1u64));
}
index_writer.add_document(doc!(int_field=>0u64));
last_doc = doc + 1;
}
index_writer.commit().unwrap();
let searcher = index.reader().unwrap().searcher();
let segment_reader = searcher.segment_reader(0);
let inverted_index = segment_reader.inverted_index(int_field);
let term = Term::from_field_u64(int_field, 0u64);
let term_info = inverted_index.get_term_info(&term).unwrap();
inverted_index.read_block_postings_from_terminfo(&term_info, IndexRecordOption::Basic)
}
#[test]
fn test_block_segment_postings_skip() {
for i in 0..4 {
let mut block_postings = build_block_postings(&[3]);
assert_eq!(
block_postings.skip_to(i),
BlockSegmentPostingsSkipResult::Success(0u32)
);
assert_eq!(
block_postings.skip_to(i),
BlockSegmentPostingsSkipResult::Terminated
);
}
let mut block_postings = build_block_postings(&[3]);
assert_eq!(
block_postings.skip_to(4u32),
BlockSegmentPostingsSkipResult::Terminated
);
}
#[test]
fn test_block_segment_postings_skip2() {
let mut docs = vec![0];
for i in 0..1300 {
docs.push((i * i / 100) + i);
}
let mut block_postings = build_block_postings(&docs[..]);
for i in vec![0, 424, 10000] {
assert_eq!(
block_postings.skip_to(i),
BlockSegmentPostingsSkipResult::Success(0u32)
);
let docs = block_postings.docs();
assert!(docs[0] <= i);
assert!(docs.last().cloned().unwrap_or(0u32) >= i);
}
assert_eq!(
block_postings.skip_to(100_000),
BlockSegmentPostingsSkipResult::Terminated
);
assert_eq!(
block_postings.skip_to(101_000),
BlockSegmentPostingsSkipResult::Terminated
);
}
#[test]
fn test_reset_block_segment_postings() {
let mut schema_builder = Schema::builder();
let int_field = schema_builder.add_u64_field("id", INDEXED);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
// create two postings list, one containg even number,
// the other containing odd numbers.
for i in 0..6 {
let doc = doc!(int_field=> (i % 2) as u64);
index_writer.add_document(doc);
}
index_writer.commit().unwrap();
let searcher = index.reader().unwrap().searcher();
let segment_reader = searcher.segment_reader(0);
let mut block_segments;
{
let term = Term::from_field_u64(int_field, 0u64);
let inverted_index = segment_reader.inverted_index(int_field);
let term_info = inverted_index.get_term_info(&term).unwrap();
block_segments = inverted_index
.read_block_postings_from_terminfo(&term_info, IndexRecordOption::Basic);
}
assert!(block_segments.advance());
assert_eq!(block_segments.docs(), &[0, 2, 4]);
{
let term = Term::from_field_u64(int_field, 1u64);
let inverted_index = segment_reader.inverted_index(int_field);
let term_info = inverted_index.get_term_info(&term).unwrap();
inverted_index.reset_block_postings_from_terminfo(&term_info, &mut block_segments);
}
assert!(block_segments.advance());
assert_eq!(block_segments.docs(), &[1, 3, 5]);
}
}

View File

@@ -6,7 +6,6 @@ use crate::directory::WritePtr;
use crate::positions::PositionSerializer;
use crate::postings::compression::{BlockEncoder, VIntEncoder, COMPRESSION_BLOCK_SIZE};
use crate::postings::skip::SkipSerializer;
use crate::postings::USE_SKIP_INFO_LIMIT;
use crate::schema::Schema;
use crate::schema::{Field, FieldEntry, FieldType};
use crate::termdict::{TermDictionaryBuilder, TermOrdinal};
@@ -391,7 +390,7 @@ impl<W: Write> PostingsSerializer<W> {
}
self.block.clear();
}
if doc_freq >= USE_SKIP_INFO_LIMIT {
if doc_freq >= COMPRESSION_BLOCK_SIZE as u32 {
let skip_data = self.skip_write.data();
VInt(skip_data.len() as u64).serialize(&mut self.output_write)?;
self.output_write.write_all(skip_data)?;

View File

@@ -1,7 +1,8 @@
use crate::common::BinarySerializable;
use crate::postings::compression::COMPRESSION_BLOCK_SIZE;
use crate::directory::ReadOnlySource;
use crate::postings::compression::{compressed_block_size, COMPRESSION_BLOCK_SIZE};
use crate::schema::IndexRecordOption;
use crate::DocId;
use crate::{DocId, TERMINATED};
use owned_read::OwnedRead;
pub struct SkipSerializer {
@@ -50,80 +51,143 @@ impl SkipSerializer {
}
pub(crate) struct SkipReader {
doc: DocId,
last_doc_in_block: DocId,
pub(crate) last_doc_in_previous_block: DocId,
owned_read: OwnedRead,
doc_num_bits: u8,
tf_num_bits: u8,
tf_sum: u32,
skip_info: IndexRecordOption,
byte_offset: usize,
remaining_docs: u32, // number of docs remaining, including the
// documents in the current block.
block_info: BlockInfo,
position_offset: u64,
}
#[derive(Clone, Eq, PartialEq, Copy, Debug)]
pub(crate) enum BlockInfo {
BitPacked {
doc_num_bits: u8,
tf_num_bits: u8,
tf_sum: u32,
},
VInt(u32),
}
impl Default for BlockInfo {
fn default() -> Self {
BlockInfo::VInt(0)
}
}
impl SkipReader {
pub fn new(data: OwnedRead, skip_info: IndexRecordOption) -> SkipReader {
pub fn new(data: ReadOnlySource, doc_freq: u32, skip_info: IndexRecordOption) -> SkipReader {
SkipReader {
doc: 0u32,
owned_read: data,
last_doc_in_block: 0u32,
last_doc_in_previous_block: 0u32,
owned_read: OwnedRead::new(data),
skip_info,
doc_num_bits: 0u8,
tf_num_bits: 0u8,
tf_sum: 0u32,
block_info: BlockInfo::default(),
byte_offset: 0,
remaining_docs: doc_freq,
position_offset: 0u64,
}
}
pub fn reset(&mut self, data: OwnedRead) {
self.doc = 0u32;
self.owned_read = data;
self.doc_num_bits = 0u8;
self.tf_num_bits = 0u8;
self.tf_sum = 0u32;
pub fn reset(&mut self, data: ReadOnlySource, doc_freq: u32) {
self.last_doc_in_block = 0u32;
self.last_doc_in_previous_block = 0u32;
self.owned_read = OwnedRead::new(data);
self.block_info = BlockInfo::default();
self.byte_offset = 0;
self.remaining_docs = doc_freq;
}
pub fn total_block_len(&self) -> usize {
(self.doc_num_bits + self.tf_num_bits) as usize * COMPRESSION_BLOCK_SIZE / 8
#[cfg(test)]
#[inline(always)]
pub(crate) fn last_doc_in_block(&self) -> DocId {
self.last_doc_in_block
}
pub fn doc(&self) -> DocId {
self.doc
pub fn position_offset(&self) -> u64 {
self.position_offset
}
pub fn doc_num_bits(&self) -> u8 {
self.doc_num_bits
pub fn byte_offset(&self) -> usize {
self.byte_offset
}
/// Number of bits used to encode term frequencies
fn read_block_info(&mut self) {
let doc_delta = u32::deserialize(&mut self.owned_read).expect("Skip data corrupted");
self.last_doc_in_block += doc_delta as DocId;
let doc_num_bits = self.owned_read.get(0);
match self.skip_info {
IndexRecordOption::Basic => {
self.owned_read.advance(1);
self.block_info = BlockInfo::BitPacked {
doc_num_bits,
tf_num_bits: 0,
tf_sum: 0,
};
}
IndexRecordOption::WithFreqs => {
let tf_num_bits = self.owned_read.get(1);
self.block_info = BlockInfo::BitPacked {
doc_num_bits,
tf_num_bits,
tf_sum: 0,
};
self.owned_read.advance(2);
}
IndexRecordOption::WithFreqsAndPositions => {
let tf_num_bits = self.owned_read.get(1);
self.owned_read.advance(2);
let tf_sum = u32::deserialize(&mut self.owned_read).expect("Failed reading tf_sum");
self.block_info = BlockInfo::BitPacked {
doc_num_bits,
tf_num_bits,
tf_sum,
};
}
}
}
pub fn block_info(&self) -> BlockInfo {
self.block_info
}
/// Advance the skip reader to the block that may contain the target.
///
/// 0 if term frequencies are not enabled.
pub fn tf_num_bits(&self) -> u8 {
self.tf_num_bits
}
pub fn tf_sum(&self) -> u32 {
self.tf_sum
/// If the target is larger than all documents, the skip_reader
/// then advance to the last Variable In block.
pub fn seek(&mut self, target: DocId) {
while self.last_doc_in_block < target {
self.advance();
}
}
pub fn advance(&mut self) -> bool {
if self.owned_read.as_ref().is_empty() {
false
} else {
let doc_delta = u32::deserialize(&mut self.owned_read).expect("Skip data corrupted");
self.doc += doc_delta as DocId;
self.doc_num_bits = self.owned_read.get(0);
match self.skip_info {
IndexRecordOption::Basic => {
self.owned_read.advance(1);
}
IndexRecordOption::WithFreqs => {
self.tf_num_bits = self.owned_read.get(1);
self.owned_read.advance(2);
}
IndexRecordOption::WithFreqsAndPositions => {
self.tf_num_bits = self.owned_read.get(1);
self.owned_read.advance(2);
self.tf_sum =
u32::deserialize(&mut self.owned_read).expect("Failed reading tf_sum");
}
match self.block_info {
BlockInfo::BitPacked {
doc_num_bits,
tf_num_bits,
tf_sum,
} => {
self.remaining_docs -= COMPRESSION_BLOCK_SIZE as u32;
self.byte_offset += compressed_block_size(doc_num_bits + tf_num_bits);
self.position_offset += tf_sum as u64;
}
BlockInfo::VInt(num_vint_docs) => {
self.remaining_docs -= num_vint_docs;
}
}
self.last_doc_in_previous_block = self.last_doc_in_block;
if self.remaining_docs >= COMPRESSION_BLOCK_SIZE as u32 {
self.read_block_info();
true
} else {
self.last_doc_in_block = TERMINATED;
self.block_info = BlockInfo::VInt(self.remaining_docs);
self.remaining_docs > 0
}
}
}
@@ -131,9 +195,11 @@ impl SkipReader {
#[cfg(test)]
mod tests {
use super::BlockInfo;
use super::IndexRecordOption;
use super::{SkipReader, SkipSerializer};
use owned_read::OwnedRead;
use crate::directory::ReadOnlySource;
use crate::postings::compression::COMPRESSION_BLOCK_SIZE;
#[test]
fn test_skip_with_freq() {
@@ -145,15 +211,34 @@ mod tests {
skip_serializer.write_term_freq(2u8);
skip_serializer.data().to_owned()
};
let mut skip_reader = SkipReader::new(OwnedRead::new(buf), IndexRecordOption::WithFreqs);
let doc_freq = 3u32 + (COMPRESSION_BLOCK_SIZE * 2) as u32;
let mut skip_reader = SkipReader::new(
ReadOnlySource::new(buf),
doc_freq,
IndexRecordOption::WithFreqs,
);
assert!(skip_reader.advance());
assert_eq!(skip_reader.doc(), 1u32);
assert_eq!(skip_reader.doc_num_bits(), 2u8);
assert_eq!(skip_reader.tf_num_bits(), 3u8);
assert_eq!(skip_reader.last_doc_in_block(), 1u32);
assert_eq!(
skip_reader.block_info(),
BlockInfo::BitPacked {
doc_num_bits: 2u8,
tf_num_bits: 3u8,
tf_sum: 0
}
);
assert!(skip_reader.advance());
assert_eq!(skip_reader.doc(), 5u32);
assert_eq!(skip_reader.doc_num_bits(), 5u8);
assert_eq!(skip_reader.tf_num_bits(), 2u8);
assert_eq!(skip_reader.last_doc_in_block(), 5u32);
assert_eq!(
skip_reader.block_info(),
BlockInfo::BitPacked {
doc_num_bits: 5u8,
tf_num_bits: 2u8,
tf_sum: 0
}
);
assert!(skip_reader.advance());
assert_eq!(skip_reader.block_info(), BlockInfo::VInt(3u32));
assert!(!skip_reader.advance());
}
@@ -165,13 +250,60 @@ mod tests {
skip_serializer.write_doc(5u32, 5u8);
skip_serializer.data().to_owned()
};
let mut skip_reader = SkipReader::new(OwnedRead::new(buf), IndexRecordOption::Basic);
let doc_freq = 3u32 + (COMPRESSION_BLOCK_SIZE * 2) as u32;
let mut skip_reader = SkipReader::new(
ReadOnlySource::from(buf),
doc_freq,
IndexRecordOption::Basic,
);
assert!(skip_reader.advance());
assert_eq!(skip_reader.doc(), 1u32);
assert_eq!(skip_reader.doc_num_bits(), 2u8);
assert_eq!(skip_reader.last_doc_in_block(), 1u32);
assert_eq!(
skip_reader.block_info(),
BlockInfo::BitPacked {
doc_num_bits: 2u8,
tf_num_bits: 0,
tf_sum: 0u32
}
);
assert!(skip_reader.advance());
assert_eq!(skip_reader.doc(), 5u32);
assert_eq!(skip_reader.doc_num_bits(), 5u8);
assert_eq!(skip_reader.last_doc_in_block(), 5u32);
assert_eq!(
skip_reader.block_info(),
BlockInfo::BitPacked {
doc_num_bits: 5u8,
tf_num_bits: 0,
tf_sum: 0u32
}
);
assert!(skip_reader.advance());
assert_eq!(skip_reader.block_info(), BlockInfo::VInt(3u32));
assert!(!skip_reader.advance());
}
#[test]
fn test_skip_multiple_of_block_size() {
let buf = {
let mut skip_serializer = SkipSerializer::new();
skip_serializer.write_doc(1u32, 2u8);
skip_serializer.data().to_owned()
};
let doc_freq = COMPRESSION_BLOCK_SIZE as u32;
let mut skip_reader = SkipReader::new(
ReadOnlySource::from(buf),
doc_freq,
IndexRecordOption::Basic,
);
assert!(skip_reader.advance());
assert_eq!(skip_reader.last_doc_in_block(), 1u32);
assert_eq!(
skip_reader.block_info(),
BlockInfo::BitPacked {
doc_num_bits: 2u8,
tf_num_bits: 0,
tf_sum: 0u32
}
);
assert!(!skip_reader.advance());
}
}

View File

@@ -51,10 +51,13 @@ where
let term_info = term_stream.value();
let mut block_segment_postings = inverted_index
.read_block_postings_from_terminfo(term_info, IndexRecordOption::Basic);
while block_segment_postings.advance() {
loop {
for &doc in block_segment_postings.docs() {
doc_bitset.insert(doc);
}
if !block_segment_postings.advance() {
break;
}
}
}
let doc_bitset = BitSetDocSet::from(doc_bitset);

View File

@@ -2,6 +2,7 @@ use crate::core::SegmentReader;
use crate::query::explanation::does_not_match;
use crate::query::score_combiner::{DoNothingCombiner, ScoreCombiner, SumWithCoordsCombiner};
use crate::query::term_query::TermScorer;
use crate::query::weight::{for_each_pruning_scorer, for_each_scorer};
use crate::query::EmptyScorer;
use crate::query::Exclude;
use crate::query::Occur;
@@ -10,16 +11,21 @@ use crate::query::Scorer;
use crate::query::Union;
use crate::query::Weight;
use crate::query::{intersect_scorers, Explanation};
use crate::DocId;
use crate::{DocId, Score};
use std::collections::HashMap;
fn scorer_union<TScoreCombiner>(scorers: Vec<Box<dyn Scorer>>) -> Box<dyn Scorer>
enum SpecializedScorer<TScoreCombiner: ScoreCombiner> {
TermUnion(Union<TermScorer, TScoreCombiner>),
Other(Box<dyn Scorer>),
}
fn scorer_union<TScoreCombiner>(scorers: Vec<Box<dyn Scorer>>) -> SpecializedScorer<TScoreCombiner>
where
TScoreCombiner: ScoreCombiner,
{
assert!(!scorers.is_empty());
if scorers.len() == 1 {
return scorers.into_iter().next().unwrap(); //< we checked the size beforehands
return SpecializedScorer::Other(scorers.into_iter().next().unwrap()); //< we checked the size beforehands
}
{
@@ -29,14 +35,21 @@ where
.into_iter()
.map(|scorer| *(scorer.downcast::<TermScorer>().map_err(|_| ()).unwrap()))
.collect();
let scorer: Box<dyn Scorer> =
Box::new(Union::<TermScorer, TScoreCombiner>::from(scorers));
return scorer;
return SpecializedScorer::TermUnion(Union::<TermScorer, TScoreCombiner>::from(
scorers,
));
}
}
SpecializedScorer::Other(Box::new(Union::<_, TScoreCombiner>::from(scorers)))
}
let scorer: Box<dyn Scorer> = Box::new(Union::<_, TScoreCombiner>::from(scorers));
scorer
impl<TScoreCombiner: ScoreCombiner> Into<Box<dyn Scorer>> for SpecializedScorer<TScoreCombiner> {
fn into(self) -> Box<dyn Scorer> {
match self {
Self::TermUnion(union) => Box::new(union),
Self::Other(scorer) => scorer,
}
}
}
pub struct BooleanWeight {
@@ -72,41 +85,50 @@ impl BooleanWeight {
&self,
reader: &SegmentReader,
boost: f32,
) -> crate::Result<Box<dyn Scorer>> {
) -> crate::Result<SpecializedScorer<TScoreCombiner>> {
let mut per_occur_scorers = self.per_occur_scorers(reader, boost)?;
let should_scorer_opt: Option<Box<dyn Scorer>> = per_occur_scorers
let should_scorer_opt: Option<SpecializedScorer<TScoreCombiner>> = per_occur_scorers
.remove(&Occur::Should)
.map(scorer_union::<TScoreCombiner>);
let exclude_scorer_opt: Option<Box<dyn Scorer>> = per_occur_scorers
.remove(&Occur::MustNot)
.map(scorer_union::<TScoreCombiner>);
.map(scorer_union::<TScoreCombiner>)
.map(Into::into);
let must_scorer_opt: Option<Box<dyn Scorer>> = per_occur_scorers
.remove(&Occur::Must)
.map(intersect_scorers);
let positive_scorer: Box<dyn Scorer> = match (should_scorer_opt, must_scorer_opt) {
(Some(should_scorer), Some(must_scorer)) => {
if self.scoring_enabled {
Box::new(RequiredOptionalScorer::<_, _, TScoreCombiner>::new(
must_scorer,
should_scorer,
))
} else {
must_scorer
let positive_scorer: SpecializedScorer<TScoreCombiner> =
match (should_scorer_opt, must_scorer_opt) {
(Some(should_scorer), Some(must_scorer)) => {
if self.scoring_enabled {
SpecializedScorer::Other(Box::new(RequiredOptionalScorer::<
Box<dyn Scorer>,
Box<dyn Scorer>,
TScoreCombiner,
>::new(
must_scorer, should_scorer.into()
)))
} else {
SpecializedScorer::Other(must_scorer)
}
}
}
(None, Some(must_scorer)) => must_scorer,
(Some(should_scorer), None) => should_scorer,
(None, None) => {
return Ok(Box::new(EmptyScorer));
}
};
(None, Some(must_scorer)) => SpecializedScorer::Other(must_scorer),
(Some(should_scorer), None) => should_scorer,
(None, None) => {
return Ok(SpecializedScorer::Other(Box::new(EmptyScorer)));
}
};
if let Some(exclude_scorer) = exclude_scorer_opt {
Ok(Box::new(Exclude::new(positive_scorer, exclude_scorer)))
let positive_scorer_boxed: Box<dyn Scorer> = positive_scorer.into();
Ok(SpecializedScorer::Other(Box::new(Exclude::new(
positive_scorer_boxed,
exclude_scorer,
))))
} else {
Ok(positive_scorer)
}
@@ -126,8 +148,10 @@ impl Weight for BooleanWeight {
}
} else if self.scoring_enabled {
self.complex_scorer::<SumWithCoordsCombiner>(reader, boost)
.map(Into::into)
} else {
self.complex_scorer::<DoNothingCombiner>(reader, boost)
.map(Into::into)
}
}
@@ -150,6 +174,51 @@ impl Weight for BooleanWeight {
}
Ok(explanation)
}
fn for_each(
&self,
reader: &SegmentReader,
callback: &mut dyn FnMut(DocId, Score),
) -> crate::Result<()> {
let scorer = self.complex_scorer::<SumWithCoordsCombiner>(reader, 1.0f32)?;
match scorer {
SpecializedScorer::TermUnion(mut union_scorer) => {
for_each_scorer(&mut union_scorer, callback);
}
SpecializedScorer::Other(mut scorer) => {
for_each_scorer(scorer.as_mut(), callback);
}
}
Ok(())
}
/// Calls `callback` with all of the `(doc, score)` for which score
/// is exceeding a given threshold.
///
/// This method is useful for the TopDocs collector.
/// For all docsets, the blanket implementation has the benefit
/// of prefiltering (doc, score) pairs, avoiding the
/// virtual dispatch cost.
///
/// More importantly, it makes it possible for scorers to implement
/// important optimization (e.g. BlockWAND for union).
fn for_each_pruning(
&self,
threshold: f32,
reader: &SegmentReader,
callback: &mut dyn FnMut(DocId, Score) -> Score,
) -> crate::Result<()> {
let scorer = self.complex_scorer::<SumWithCoordsCombiner>(reader, 1.0f32)?;
match scorer {
SpecializedScorer::TermUnion(mut union_scorer) => {
for_each_pruning_scorer(&mut union_scorer, threshold, callback);
}
SpecializedScorer::Other(mut scorer) => {
for_each_pruning_scorer(scorer.as_mut(), threshold, callback);
}
}
Ok(())
}
}
fn is_positive_occur(occur: Occur) -> bool {

View File

@@ -112,11 +112,6 @@ impl<TDocSet: DocSet, TOtherDocSet: DocSet> DocSet for Intersection<TDocSet, TOt
debug_assert_eq!(left.doc(), right.doc());
// test the remaining scorers;
for docset in self.others.iter_mut() {
// `candidate_ord` is already at the
// right position.
//
// Calling `skip_next` would advance this docset
// and miss it.
let seek_doc = docset.seek(candidate);
if seek_doc > candidate {
candidate = left.seek(seek_doc);

View File

@@ -300,10 +300,13 @@ impl Weight for RangeWeight {
let term_info = term_range.value();
let mut block_segment_postings = inverted_index
.read_block_postings_from_terminfo(term_info, IndexRecordOption::Basic);
while block_segment_postings.advance() {
loop {
for &doc in block_segment_postings.docs() {
doc_bitset.insert(doc);
}
if !block_segment_postings.advance() {
break;
}
}
}
let doc_bitset = BitSetDocSet::from(doc_bitset);

View File

@@ -101,7 +101,10 @@ mod tests {
ConstScorer::from(VecDocSet::from(vec![])),
);
let mut docs = vec![];
reqoptscorer.for_each(&mut |doc, _| docs.push(doc));
while reqoptscorer.doc() != TERMINATED {
docs.push(reqoptscorer.doc());
reqoptscorer.advance();
}
assert_eq!(docs, req);
}

View File

@@ -1,4 +1,4 @@
use crate::docset::{DocSet, TERMINATED};
use crate::docset::DocSet;
use crate::DocId;
use crate::Score;
use downcast_rs::impl_downcast;
@@ -12,41 +12,6 @@ pub trait Scorer: downcast_rs::Downcast + DocSet + 'static {
///
/// This method will perform a bit of computation and is not cached.
fn score(&mut self) -> Score;
/// Iterates through all of the document matched by the DocSet
/// `DocSet` and push the scored documents to the collector.
fn for_each(&mut self, callback: &mut dyn FnMut(DocId, Score)) {
let mut doc = self.doc();
while doc != TERMINATED {
callback(doc, self.score());
doc = self.advance();
}
}
/// Calls `callback` with all of the `(doc, score)` for which score
/// is exceeding a given threshold.
///
/// This method is useful for the TopDocs collector.
/// For all docsets, the blanket implementation has the benefit
/// of prefiltering (doc, score) pairs, avoiding the
/// virtual dispatch cost.
///
/// More importantly, it makes it possible for scorers to implement
/// important optimization (e.g. BlockWAND for union).
fn for_each_pruning(
&mut self,
mut threshold: f32,
callback: &mut dyn FnMut(DocId, Score) -> Score,
) {
let mut doc = self.doc();
while doc != TERMINATED {
let score = self.score();
if score > threshold {
threshold = callback(doc, score);
}
doc = self.advance();
}
}
}
impl_downcast!(Scorer);
@@ -55,11 +20,6 @@ impl Scorer for Box<dyn Scorer> {
fn score(&mut self) -> Score {
self.deref_mut().score()
}
fn for_each(&mut self, callback: &mut dyn FnMut(DocId, Score)) {
let scorer = self.deref_mut();
scorer.for_each(callback);
}
}
/// Wraps a `DocSet` and simply returns a constant `Scorer`.

View File

@@ -4,12 +4,13 @@ use crate::docset::DocSet;
use crate::postings::SegmentPostings;
use crate::query::bm25::BM25Weight;
use crate::query::explanation::does_not_match;
use crate::query::weight::{for_each_pruning_scorer, for_each_scorer};
use crate::query::Weight;
use crate::query::{Explanation, Scorer};
use crate::schema::IndexRecordOption;
use crate::DocId;
use crate::Result;
use crate::Term;
use crate::{DocId, Score};
pub struct TermWeight {
term: Term,
@@ -43,6 +44,39 @@ impl Weight for TermWeight {
.unwrap_or(0))
}
}
/// Iterates through all of the document matched by the DocSet
/// `DocSet` and push the scored documents to the collector.
fn for_each(
&self,
reader: &SegmentReader,
callback: &mut dyn FnMut(DocId, Score),
) -> crate::Result<()> {
let mut scorer = self.scorer_specialized(reader, 1.0f32)?;
for_each_scorer(&mut scorer, callback);
Ok(())
}
/// Calls `callback` with all of the `(doc, score)` for which score
/// is exceeding a given threshold.
///
/// This method is useful for the TopDocs collector.
/// For all docsets, the blanket implementation has the benefit
/// of prefiltering (doc, score) pairs, avoiding the
/// virtual dispatch cost.
///
/// More importantly, it makes it possible for scorers to implement
/// important optimization (e.g. BlockWAND for union).
fn for_each_pruning(
&self,
threshold: f32,
reader: &SegmentReader,
callback: &mut dyn FnMut(DocId, Score) -> Score,
) -> crate::Result<()> {
let mut scorer = self.scorer(reader, 1.0f32)?;
for_each_pruning_scorer(&mut scorer, threshold, callback);
Ok(())
}
}
impl TermWeight {

View File

@@ -198,6 +198,18 @@ where
// TODO Also implement `count` with deletes efficiently.
fn doc(&self) -> DocId {
self.doc
}
fn size_hint(&self) -> u32 {
self.docsets
.iter()
.map(|docset| docset.size_hint())
.max()
.unwrap_or(0u32)
}
fn count_including_deleted(&mut self) -> u32 {
if self.doc == TERMINATED {
return 0;
@@ -219,18 +231,6 @@ where
self.cursor = HORIZON_NUM_TINYBITSETS;
count
}
fn doc(&self) -> DocId {
self.doc
}
fn size_hint(&self) -> u32 {
self.docsets
.iter()
.map(|docset| docset.size_hint())
.max()
.unwrap_or(0u32)
}
}
impl<TScorer, TScoreCombiner> Scorer for Union<TScorer, TScoreCombiner>

View File

@@ -1,7 +1,45 @@
use super::Scorer;
use crate::core::SegmentReader;
use crate::query::Explanation;
use crate::DocId;
use crate::{DocId, Score, TERMINATED};
/// Iterates through all of the document matched by the DocSet
/// `DocSet` and push the scored documents to the collector.
pub(crate) fn for_each_scorer<TScorer: Scorer + ?Sized>(
scorer: &mut TScorer,
callback: &mut dyn FnMut(DocId, Score),
) {
let mut doc = scorer.doc();
while doc != TERMINATED {
callback(doc, scorer.score());
doc = scorer.advance();
}
}
/// Calls `callback` with all of the `(doc, score)` for which score
/// is exceeding a given threshold.
///
/// This method is useful for the TopDocs collector.
/// For all docsets, the blanket implementation has the benefit
/// of prefiltering (doc, score) pairs, avoiding the
/// virtual dispatch cost.
///
/// More importantly, it makes it possible for scorers to implement
/// important optimization (e.g. BlockWAND for union).
pub(crate) fn for_each_pruning_scorer<TScorer: Scorer + ?Sized>(
scorer: &mut TScorer,
mut threshold: f32,
callback: &mut dyn FnMut(DocId, Score) -> Score,
) {
let mut doc = scorer.doc();
while doc != TERMINATED {
let score = scorer.score();
if score > threshold {
threshold = callback(doc, score);
}
doc = scorer.advance();
}
}
/// A Weight is the specialization of a Query
/// for a given set of segments.
@@ -27,4 +65,37 @@ pub trait Weight: Send + Sync + 'static {
Ok(scorer.count_including_deleted())
}
}
/// Iterates through all of the document matched by the DocSet
/// `DocSet` and push the scored documents to the collector.
fn for_each(
&self,
reader: &SegmentReader,
callback: &mut dyn FnMut(DocId, Score),
) -> crate::Result<()> {
let mut scorer = self.scorer(reader, 1.0f32)?;
for_each_scorer(scorer.as_mut(), callback);
Ok(())
}
/// Calls `callback` with all of the `(doc, score)` for which score
/// is exceeding a given threshold.
///
/// This method is useful for the TopDocs collector.
/// For all docsets, the blanket implementation has the benefit
/// of prefiltering (doc, score) pairs, avoiding the
/// virtual dispatch cost.
///
/// More importantly, it makes it possible for scorers to implement
/// important optimization (e.g. BlockWAND for union).
fn for_each_pruning(
&self,
threshold: f32,
reader: &SegmentReader,
callback: &mut dyn FnMut(DocId, Score) -> Score,
) -> crate::Result<()> {
let mut scorer = self.scorer(reader, 1.0f32)?;
for_each_pruning_scorer(scorer.as_mut(), threshold, callback);
Ok(())
}
}