add benchmark of term streams merge (#1024)

* add benchmark of term streams merge
* use union based on FST for merging the term dictionaries
* Rename TermMerger benchmark
Authored by Stéphane Campinas on 2021-05-31 16:15:01 +02:00, committed by GitHub
parent dff0ffd38a
commit 41ea14840d
4 changed files with 112 additions and 77 deletions
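
The gist of the change: instead of pulling per-segment streamers through a BinaryHeap and regrouping equal keys by hand, TermMerger now drives a union over the term dictionaries' FSTs, which yields each distinct term once, in sorted byte order, together with one IndexedValue (input index plus value) per input stream that contains the term. Below is a minimal standalone sketch of that mechanic, assuming tantivy_fst keeps the upstream fst crate's Map::from_iter and stream APIs; the toy dictionaries stand in for real TermDictionary FSTs.

use tantivy_fst::map::OpBuilder;
use tantivy_fst::{Map, Streamer};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Two tiny sorted "term dictionaries" mapping term bytes -> term ordinal.
    let dict_a = Map::from_iter(vec![("apple", 0u64), ("banana", 1)])?;
    let dict_b = Map::from_iter(vec![("banana", 0u64), ("cherry", 1)])?;

    // Push one stream per dictionary, then take the union, as
    // TermMerger::new does with the per-segment TermStreamer streams.
    let mut op_builder = OpBuilder::new();
    op_builder.push(dict_a.stream());
    op_builder.push(dict_b.stream());
    let mut union = op_builder.union();

    // The union visits each distinct term once, in sorted order. Each
    // IndexedValue reports which input stream matched (`index`, playing
    // the role of the segment ordinal) and that stream's value
    // (`value`, the term ordinal in its dictionary).
    while let Some((term, indexed_values)) = union.next() {
        for iv in indexed_values {
            println!("{:?}: stream {} -> term ord {}", term, iv.index, iv.value);
        }
    }
    Ok(())
}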

CHANGELOG.md

@@ -18,7 +18,7 @@ Tantivy 0.15.0
 - Add detection to avoid small doc store blocks on merge (@PSeitz). #1054
 - Make doc store compression dynamic (@PSeitz). #1060
 - Switch to json for footer version handling (@PSeitz). #1060
+- Updated TermMerger implementation to rely on the union feature of the FST (@scampi) #469
 
 Tantivy 0.14.0
 =========================

src/indexer/merger.rs

@@ -871,13 +871,11 @@ impl IndexMerger {
         let mut total_doc_freq = 0;
 
         // Let's compute the list of non-empty posting lists
-        for heap_item in merged_terms.current_kvs() {
-            let segment_ord = heap_item.segment_ord;
-            let term_info = heap_item.streamer.value();
-            let segment_reader = &self.readers[heap_item.segment_ord];
+        for (segment_ord, term_info) in merged_terms.current_segment_ordinals_and_term_infos() {
+            let segment_reader = &self.readers[segment_ord];
             let inverted_index: &InvertedIndexReader = &*field_readers[segment_ord];
             let segment_postings = inverted_index
-                .read_postings_from_terminfo(term_info, segment_postings_option)?;
+                .read_postings_from_terminfo(&term_info, segment_postings_option)?;
             let delete_bitset_opt = segment_reader.delete_bitset();
             let doc_freq = if let Some(delete_bitset) = delete_bitset_opt {
                 segment_postings.doc_freq_given_deletes(delete_bitset)

src/termdict/streamer.rs

@@ -78,8 +78,8 @@ pub struct TermStreamer<'a, A = AlwaysMatch>
 where
     A: Automaton,
 {
-    fst_map: &'a TermDictionary,
-    stream: Stream<'a, A>,
+    pub(crate) fst_map: &'a TermDictionary,
+    pub(crate) stream: Stream<'a, A>,
     term_ord: TermOrdinal,
     current_key: Vec<u8>,
     current_value: TermInfo,

src/termdict/merger.rs

@@ -1,32 +1,11 @@
 use crate::postings::TermInfo;
 use crate::termdict::TermDictionary;
 use crate::termdict::TermOrdinal;
 use crate::termdict::TermStreamer;
-use std::cmp::Ordering;
-use std::collections::BinaryHeap;
-
-pub struct HeapItem<'a> {
-    pub streamer: TermStreamer<'a>,
-    pub segment_ord: usize,
-}
-
-impl<'a> PartialEq for HeapItem<'a> {
-    fn eq(&self, other: &Self) -> bool {
-        self.segment_ord == other.segment_ord
-    }
-}
-
-impl<'a> Eq for HeapItem<'a> {}
-
-impl<'a> PartialOrd for HeapItem<'a> {
-    fn partial_cmp(&self, other: &HeapItem<'a>) -> Option<Ordering> {
-        Some(self.cmp(other))
-    }
-}
-
-impl<'a> Ord for HeapItem<'a> {
-    fn cmp(&self, other: &HeapItem<'a>) -> Ordering {
-        (&other.streamer.key(), &other.segment_ord).cmp(&(&self.streamer.key(), &self.segment_ord))
-    }
-}
+use tantivy_fst::map::OpBuilder;
+use tantivy_fst::map::Union;
+use tantivy_fst::raw::IndexedValue;
+use tantivy_fst::Streamer;
 
 /// Given a list of sorted term streams,
 /// returns an iterator over sorted unique terms.
@@ -34,61 +13,50 @@ impl<'a> Ord for HeapItem<'a> {
 /// The item yielded is actually a pair with
 /// - the term
 /// - a slice with the ordinal of the segments containing
-///   the terms.
+///   the term.
 pub struct TermMerger<'a> {
-    heap: BinaryHeap<HeapItem<'a>>,
-    current_streamers: Vec<HeapItem<'a>>,
+    dictionaries: Vec<&'a TermDictionary>,
+    union: Union<'a>,
+    current_key: Vec<u8>,
+    current_segment_and_term_ordinals: Vec<IndexedValue>,
 }
 
 impl<'a> TermMerger<'a> {
     /// Stream of merged term dictionaries.
     pub fn new(streams: Vec<TermStreamer<'a>>) -> TermMerger<'a> {
+        let mut op_builder = OpBuilder::new();
+        let mut dictionaries = vec![];
+        for streamer in streams {
+            op_builder.push(streamer.stream);
+            dictionaries.push(streamer.fst_map);
+        }
         TermMerger {
-            heap: BinaryHeap::new(),
-            current_streamers: streams
-                .into_iter()
-                .enumerate()
-                .map(|(ord, streamer)| HeapItem {
-                    streamer,
-                    segment_ord: ord,
-                })
-                .collect(),
+            dictionaries,
+            union: op_builder.union(),
+            current_key: vec![],
+            current_segment_and_term_ordinals: vec![],
         }
     }
 
-    pub(crate) fn matching_segments<'b: 'a>(
-        &'b self,
-    ) -> impl 'b + Iterator<Item = (usize, TermOrdinal)> {
-        self.current_streamers
+    pub fn matching_segments<'b: 'a>(&'b self) -> impl 'b + Iterator<Item = (usize, TermOrdinal)> {
+        self.current_segment_and_term_ordinals
             .iter()
-            .map(|heap_item| (heap_item.segment_ord, heap_item.streamer.term_ord()))
-    }
-
-    fn advance_segments(&mut self) {
-        let streamers = &mut self.current_streamers;
-        let heap = &mut self.heap;
-        for mut heap_item in streamers.drain(..) {
-            if heap_item.streamer.advance() {
-                heap.push(heap_item);
-            }
-        }
+            .map(|iv| (iv.index, iv.value))
     }
 
     /// Advance the term iterator to the next term.
     /// Returns true if there is indeed another term,
     /// false if there is none.
     pub fn advance(&mut self) -> bool {
-        self.advance_segments();
-        if let Some(head) = self.heap.pop() {
-            self.current_streamers.push(head);
-            while let Some(next_streamer) = self.heap.peek() {
-                if self.current_streamers[0].streamer.key() != next_streamer.streamer.key() {
-                    break;
-                }
-                let next_heap_it = self.heap.pop().unwrap(); // safe: we peeked beforehand
-                self.current_streamers.push(next_heap_it);
-            }
+        if let Some((k, values)) = self.union.next() {
+            self.current_key.clear();
+            self.current_key.extend_from_slice(k);
+            self.current_segment_and_term_ordinals.clear();
+            self.current_segment_and_term_ordinals
+                .extend_from_slice(values);
+            self.current_segment_and_term_ordinals
+                .sort_by_key(|iv| iv.index);
             true
         } else {
             false
@@ -101,16 +69,85 @@ impl<'a> TermMerger<'a> {
     /// iff advance() has been called before
     /// and "true" was returned.
     pub fn key(&self) -> &[u8] {
-        self.current_streamers[0].streamer.key()
+        &self.current_key
    }
 
-    /// Returns the sorted list of segment ordinals
-    /// that include the current term.
+    /// Returns an iterator over (segment ordinal, TermInfo) pairs, sorted by segment ordinal.
     ///
     /// This method may be called
     /// iff advance() has been called before
     /// and "true" was returned.
-    pub fn current_kvs(&self) -> &[HeapItem<'a>] {
-        &self.current_streamers[..]
+    pub fn current_segment_ordinals_and_term_infos<'b: 'a>(
+        &'b self,
+    ) -> impl 'b + Iterator<Item = (usize, TermInfo)> {
+        self.current_segment_and_term_ordinals
+            .iter()
+            .map(move |iv| {
+                (
+                    iv.index,
+                    self.dictionaries[iv.index].term_info_from_ord(iv.value),
+                )
+            })
     }
 }
+
+#[cfg(all(test, feature = "unstable"))]
+mod bench {
+    use super::TermMerger;
+    use crate::directory::FileSlice;
+    use crate::postings::TermInfo;
+    use crate::termdict::{TermDictionary, TermDictionaryBuilder};
+    use rand::distributions::Alphanumeric;
+    use rand::{thread_rng, Rng};
+    use test::{self, Bencher};
+
+    fn make_term_info(term_ord: u64) -> TermInfo {
+        let offset = |term_ord: u64| (term_ord * 100 + term_ord * term_ord) as usize;
+        TermInfo {
+            doc_freq: term_ord as u32,
+            postings_range: offset(term_ord)..offset(term_ord + 1),
+            positions_range: offset(term_ord)..offset(term_ord + 1),
+        }
+    }
+
+    /// Create a dictionary of random strings.
+    fn rand_dict(num_terms: usize) -> crate::Result<TermDictionary> {
+        let buffer: Vec<u8> = {
+            let mut terms = vec![];
+            for _i in 0..num_terms {
+                let rand_string: String = thread_rng()
+                    .sample_iter(&Alphanumeric)
+                    .take(thread_rng().gen_range(30..42))
+                    .map(char::from)
+                    .collect();
+                terms.push(rand_string);
+            }
+            terms.sort();
+            let mut term_dictionary_builder = TermDictionaryBuilder::create(Vec::new())?;
+            for i in 0..num_terms {
+                term_dictionary_builder.insert(terms[i].as_bytes(), &make_term_info(i as u64))?;
+            }
+            term_dictionary_builder.finish()?
+        };
+        let file = FileSlice::from(buffer);
+        TermDictionary::open(file)
+    }
+
+    #[bench]
+    fn bench_termmerger(b: &mut Bencher) -> crate::Result<()> {
+        let dict1 = rand_dict(100_000)?;
+        let dict2 = rand_dict(100_000)?;
+        b.iter(|| -> crate::Result<u32> {
+            let stream1 = dict1.stream()?;
+            let stream2 = dict2.stream()?;
+            let mut merger = TermMerger::new(vec![stream1, stream2]);
+            let mut count = 0;
+            while merger.advance() {
+                count += 1;
+            }
+            Ok(count)
+        });
+        Ok(())
+    }
+}
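
Taken together, the reworked API is consumed as in the sketch below, which mirrors the loop in IndexMerger above and uses only the methods visible in this diff; the free function merge_example and its arguments are illustrative, not part of the commit, and the sketch assumes it lives inside the tantivy crate (it uses crate::Result).

// Hedged sketch: merging two term dictionaries with the new TermMerger.
// `dict1` and `dict2` are TermDictionary values, e.g. built like
// `rand_dict` in the benchmark above.
fn merge_example(dict1: &TermDictionary, dict2: &TermDictionary) -> crate::Result<()> {
    let mut merger = TermMerger::new(vec![dict1.stream()?, dict2.stream()?]);
    while merger.advance() {
        // The current unique term, valid until the next call to advance().
        let term: &[u8] = merger.key();
        // Segments containing the term, sorted by segment ordinal; each
        // TermInfo is rebuilt from the term ordinal via that segment's
        // dictionary (term_info_from_ord).
        for (segment_ord, term_info) in merger.current_segment_ordinals_and_term_infos() {
            let _ = (term, segment_ord, term_info);
        }
    }
    Ok(())
}

Since the bench module is compiled only under #[cfg(all(test, feature = "unstable"))], it runs on a nightly toolchain, e.g. with something like cargo +nightly bench --features unstable.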