From 41ea14840de84f94fa9b6dfb20f2644416380ded Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Campinas?= Date: Mon, 31 May 2021 16:15:01 +0200 Subject: [PATCH] add benchmark of term streams merge (#1024) * add benchmark of term streams merge * use union based on FST for merging the term dictionaries * Rename TermMerger benchmark --- CHANGELOG.md | 2 +- src/indexer/merger.rs | 8 +- src/termdict/fst_termdict/streamer.rs | 4 +- src/termdict/merger.rs | 175 ++++++++++++++++---------- 4 files changed, 112 insertions(+), 77 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5277168c7..30f495beb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,7 +18,7 @@ Tantivy 0.15.0 - Add detection to avoid small doc store blocks on merge (@PSeitz). #1054 - Make doc store compression dynamic (@PSeitz). #1060 - Switch to json for footer version handling (@PSeitz). #1060 - +- Updated TermMerger implementation to rely on the union feature of the FST (@scampi) #469 Tantivy 0.14.0 ========================= diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index fdd63bbf4..04a656afc 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -871,13 +871,11 @@ impl IndexMerger { let mut total_doc_freq = 0; // Let's compute the list of non-empty posting lists - for heap_item in merged_terms.current_kvs() { - let segment_ord = heap_item.segment_ord; - let term_info = heap_item.streamer.value(); - let segment_reader = &self.readers[heap_item.segment_ord]; + for (segment_ord, term_info) in merged_terms.current_segment_ordinals_and_term_infos() { + let segment_reader = &self.readers[segment_ord]; let inverted_index: &InvertedIndexReader = &*field_readers[segment_ord]; let segment_postings = inverted_index - .read_postings_from_terminfo(term_info, segment_postings_option)?; + .read_postings_from_terminfo(&term_info, segment_postings_option)?; let delete_bitset_opt = segment_reader.delete_bitset(); let doc_freq = if let Some(delete_bitset) = delete_bitset_opt { 
segment_postings.doc_freq_given_deletes(delete_bitset) diff --git a/src/termdict/fst_termdict/streamer.rs b/src/termdict/fst_termdict/streamer.rs index 66ce02c2a..10a72d6dc 100644 --- a/src/termdict/fst_termdict/streamer.rs +++ b/src/termdict/fst_termdict/streamer.rs @@ -78,8 +78,8 @@ pub struct TermStreamer<'a, A = AlwaysMatch> where A: Automaton, { - fst_map: &'a TermDictionary, - stream: Stream<'a, A>, + pub(crate) fst_map: &'a TermDictionary, + pub(crate) stream: Stream<'a, A>, term_ord: TermOrdinal, current_key: Vec<u8>, current_value: TermInfo, diff --git a/src/termdict/merger.rs b/src/termdict/merger.rs index 720db3e80..2c3d2e18e 100644 --- a/src/termdict/merger.rs +++ b/src/termdict/merger.rs @@ -1,32 +1,11 @@ +use crate::postings::TermInfo; +use crate::termdict::TermDictionary; use crate::termdict::TermOrdinal; use crate::termdict::TermStreamer; -use std::cmp::Ordering; -use std::collections::BinaryHeap; - -pub struct HeapItem<'a> { - pub streamer: TermStreamer<'a>, - pub segment_ord: usize, -} - -impl<'a> PartialEq for HeapItem<'a> { - fn eq(&self, other: &Self) -> bool { - self.segment_ord == other.segment_ord - } -} - -impl<'a> Eq for HeapItem<'a> {} - -impl<'a> PartialOrd for HeapItem<'a> { - fn partial_cmp(&self, other: &HeapItem<'a>) -> Option<Ordering> { - Some(self.cmp(other)) - } -} - -impl<'a> Ord for HeapItem<'a> { - fn cmp(&self, other: &HeapItem<'a>) -> Ordering { - (&other.streamer.key(), &other.segment_ord).cmp(&(&self.streamer.key(), &self.segment_ord)) - } -} +use tantivy_fst::map::OpBuilder; +use tantivy_fst::map::Union; +use tantivy_fst::raw::IndexedValue; +use tantivy_fst::Streamer; /// Given a list of sorted term streams, /// returns an iterator over sorted unique terms. @@ -34,61 +13,50 @@ impl<'a> Ord for HeapItem<'a> { /// The item yield is actually a pair with /// - the term /// - a slice with the ordinal of the segments containing -/// the terms. +/// the term.
pub struct TermMerger<'a> { - heap: BinaryHeap<HeapItem<'a>>, - current_streamers: Vec<HeapItem<'a>>, + dictionaries: Vec<&'a TermDictionary>, + union: Union<'a>, + current_key: Vec<u8>, + current_segment_and_term_ordinals: Vec<IndexedValue>, } impl<'a> TermMerger<'a> { /// Stream of merged term dictionary /// pub fn new(streams: Vec<TermStreamer<'a>>) -> TermMerger<'a> { + let mut op_builder = OpBuilder::new(); + let mut dictionaries = vec![]; + for streamer in streams { + op_builder.push(streamer.stream); + dictionaries.push(streamer.fst_map); + } TermMerger { - heap: BinaryHeap::new(), - current_streamers: streams - .into_iter() - .enumerate() - .map(|(ord, streamer)| HeapItem { - streamer, - segment_ord: ord, - }) - .collect(), + dictionaries, + union: op_builder.union(), + current_key: vec![], + current_segment_and_term_ordinals: vec![], } } - pub(crate) fn matching_segments<'b: 'a>( - &'b self, - ) -> impl 'b + Iterator<Item = (usize, TermOrdinal)> { - self.current_streamers + pub fn matching_segments<'b: 'a>(&'b self) -> impl 'b + Iterator<Item = (usize, TermOrdinal)> { + self.current_segment_and_term_ordinals .iter() - .map(|heap_item| (heap_item.segment_ord, heap_item.streamer.term_ord())) - } - - fn advance_segments(&mut self) { - let streamers = &mut self.current_streamers; - let heap = &mut self.heap; - for mut heap_item in streamers.drain(..) { - if heap_item.streamer.advance() { - heap.push(heap_item); - } - } + .map(|iv| (iv.index, iv.value)) } /// Advance the term iterator to the next term. /// Returns true if there is indeed another term /// False if there is none.
pub fn advance(&mut self) -> bool { - self.advance_segments(); - if let Some(head) = self.heap.pop() { - self.current_streamers.push(head); - while let Some(next_streamer) = self.heap.peek() { - if self.current_streamers[0].streamer.key() != next_streamer.streamer.key() { - break; - } - let next_heap_it = self.heap.pop().unwrap(); // safe : we peeked beforehand - self.current_streamers.push(next_heap_it); - } + if let Some((k, values)) = self.union.next() { + self.current_key.clear(); + self.current_key.extend_from_slice(k); + self.current_segment_and_term_ordinals.clear(); + self.current_segment_and_term_ordinals + .extend_from_slice(values); + self.current_segment_and_term_ordinals + .sort_by_key(|iv| iv.index); true } else { false @@ -101,16 +69,85 @@ impl<'a> TermMerger<'a> { /// iff advance() has been called before /// and "true" was returned. pub fn key(&self) -> &[u8] { - self.current_streamers[0].streamer.key() + &self.current_key } - /// Returns the sorted list of segment ordinals - /// that include the current term. + /// Iterator over (segment ordinal, TermInfo) pairs, sorted by the segment ordinal. /// /// This method may be called /// iff advance() has been called before /// and "true" was returned. - pub fn current_kvs(&self) -> &[HeapItem<'a>] { - &self.current_streamers[..]
+ pub fn current_segment_ordinals_and_term_infos<'b: 'a>( + &'b self, + ) -> impl 'b + Iterator<Item = (usize, TermInfo)> { + self.current_segment_and_term_ordinals + .iter() + .map(move |iv| { + ( + iv.index, + self.dictionaries[iv.index].term_info_from_ord(iv.value), + ) + }) + } +} + +#[cfg(all(test, feature = "unstable"))] +mod bench { + use super::TermMerger; + use crate::directory::FileSlice; + use crate::postings::TermInfo; + use crate::termdict::{TermDictionary, TermDictionaryBuilder}; + use rand::distributions::Alphanumeric; + use rand::{thread_rng, Rng}; + use test::{self, Bencher}; + + fn make_term_info(term_ord: u64) -> TermInfo { + let offset = |term_ord: u64| (term_ord * 100 + term_ord * term_ord) as usize; + TermInfo { + doc_freq: term_ord as u32, + postings_range: offset(term_ord)..offset(term_ord + 1), + positions_range: offset(term_ord)..offset(term_ord + 1), + } + } + + /// Create a dictionary of random strings. + fn rand_dict(num_terms: usize) -> crate::Result<TermDictionary> { + let buffer: Vec<u8> = { + let mut terms = vec![]; + for _i in 0..num_terms { + let rand_string: String = thread_rng() + .sample_iter(&Alphanumeric) + .take(thread_rng().gen_range(30..42)) + .map(char::from) + .collect(); + terms.push(rand_string); + } + terms.sort(); + + let mut term_dictionary_builder = TermDictionaryBuilder::create(Vec::new())?; + for i in 0..num_terms { + term_dictionary_builder.insert(terms[i].as_bytes(), &make_term_info(i as u64))?; + } + term_dictionary_builder.finish()? + }; + let file = FileSlice::from(buffer); + TermDictionary::open(file) + } + + #[bench] + fn bench_termmerger(b: &mut Bencher) -> crate::Result<()> { + let dict1 = rand_dict(100_000)?; + let dict2 = rand_dict(100_000)?; + b.iter(|| -> crate::Result<u32> { + let stream1 = dict1.stream()?; + let stream2 = dict2.stream()?; + let mut merger = TermMerger::new(vec![stream1, stream2]); + let mut count = 0; + while merger.advance() { + count += 1; + } + Ok(count) + }); + Ok(()) } }