diff --git a/Cargo.toml b/Cargo.toml index c9e691cbc..13f5a2c7c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,7 +58,6 @@ debug-assertions = false [features] default = ["mmap"] simd = ["bitpacking/simd"] -streamdict = [] mmap = ["fst/mmap", "atomicwrites"] unstable = ["simd"] diff --git a/src/collector/facet_collector.rs b/src/collector/facet_collector.rs index 8fae1e231..9ac30992c 100644 --- a/src/collector/facet_collector.rs +++ b/src/collector/facet_collector.rs @@ -7,9 +7,6 @@ use schema::Facet; use std::collections::BTreeMap; use std::collections::BinaryHeap; use std::collections::Bound; -use termdict::TermDictionary; -use termdict::TermStreamer; -use termdict::TermStreamerBuilder; use std::collections::BTreeSet; use termdict::TermMerger; use docset::SkipResult; diff --git a/src/core/inverted_index_reader.rs b/src/core/inverted_index_reader.rs index e72e6d19d..ac7b70314 100644 --- a/src/core/inverted_index_reader.rs +++ b/src/core/inverted_index_reader.rs @@ -1,5 +1,5 @@ use directory::{ReadOnlySource, SourceRead}; -use termdict::{TermDictionary, TermDictionaryImpl}; +use termdict::TermDictionary; use postings::{BlockSegmentPostings, SegmentPostings}; use postings::TermInfo; use schema::IndexRecordOption; @@ -23,7 +23,7 @@ use schema::FieldType; /// `InvertedIndexReader` are created by calling /// the `SegmentReader`'s [`.inverted_index(...)`] method pub struct InvertedIndexReader { - termdict: TermDictionaryImpl, + termdict: TermDictionary, postings_source: ReadOnlySource, positions_source: ReadOnlySource, record_option: IndexRecordOption, @@ -32,7 +32,7 @@ pub struct InvertedIndexReader { impl InvertedIndexReader { pub(crate) fn new( - termdict: TermDictionaryImpl, + termdict: TermDictionary, postings_source: ReadOnlySource, positions_source: ReadOnlySource, record_option: IndexRecordOption, @@ -56,7 +56,7 @@ impl InvertedIndexReader { .get_index_record_option() .unwrap_or(IndexRecordOption::Basic); InvertedIndexReader { - termdict: TermDictionaryImpl::empty(field_type), + termdict: TermDictionary::empty(field_type), postings_source: ReadOnlySource::empty(), positions_source: ReadOnlySource::empty(), record_option, @@ -70,7 +70,7 @@ impl InvertedIndexReader { } /// Return the term dictionary datastructure. - pub fn terms(&self) -> &TermDictionaryImpl { + pub fn terms(&self) -> &TermDictionary { &self.termdict } diff --git a/src/core/searcher.rs b/src/core/searcher.rs index b8d3dcdab..b9389b0c2 100644 --- a/src/core/searcher.rs +++ b/src/core/searcher.rs @@ -5,7 +5,7 @@ use collector::Collector; use query::Query; use DocAddress; use schema::{Field, Term}; -use termdict::{TermDictionary, TermMerger}; +use termdict::TermMerger; use std::sync::Arc; use std::fmt; use schema::Schema; diff --git a/src/core/segment_reader.rs b/src/core/segment_reader.rs index 7be2565f6..0dd18c853 100644 --- a/src/core/segment_reader.rs +++ b/src/core/segment_reader.rs @@ -18,7 +18,6 @@ use core::InvertedIndexReader; use schema::Field; use schema::FieldType; use error::ErrorKind; -use termdict::TermDictionaryImpl; use fastfield::FacetReader; use fastfield::FastFieldReader; use schema::Schema; @@ -162,7 +161,7 @@ impl SegmentReader { field_entry.name() )) })?; - let termdict = TermDictionaryImpl::from_source(termdict_source); + let termdict = TermDictionary::from_source(termdict_source); let facet_reader = FacetReader::new(term_ords_reader, termdict); Ok(facet_reader) } @@ -286,7 +285,7 @@ impl SegmentReader { .expect("Index corrupted. Failed to open field positions in composite file."); let inv_idx_reader = Arc::new(InvertedIndexReader::new( - TermDictionaryImpl::from_source(termdict_source), + TermDictionary::from_source(termdict_source), postings_source, positions_source, record_option, diff --git a/src/fastfield/facet_reader.rs b/src/fastfield/facet_reader.rs index d18a145b3..5490cfd19 100644 --- a/src/fastfield/facet_reader.rs +++ b/src/fastfield/facet_reader.rs @@ -2,7 +2,7 @@ use super::MultiValueIntFastFieldReader; use DocId; use termdict::TermOrdinal; use schema::Facet; -use termdict::{TermDictionary, TermDictionaryImpl}; +use termdict::TermDictionary; /// The facet reader makes it possible to access the list of /// facets associated to a given document in a specific @@ -19,7 +19,7 @@ use termdict::{TermDictionary, TermDictionaryImpl}; /// only makes sense for a given segment. pub struct FacetReader { term_ords: MultiValueIntFastFieldReader, - term_dict: TermDictionaryImpl, + term_dict: TermDictionary, } impl FacetReader { @@ -28,11 +28,11 @@ impl FacetReader { /// A facet reader just wraps : /// - a `MultiValueIntFastFieldReader` that makes it possible to /// access the list of facet ords for a given document. - /// - a `TermDictionaryImpl` that helps associating a facet to + /// - a `TermDictionary` that helps associating a facet to /// an ordinal and vice versa. pub fn new( term_ords: MultiValueIntFastFieldReader, - term_dict: TermDictionaryImpl, + term_dict: TermDictionary, ) -> FacetReader { FacetReader { term_ords, @@ -50,7 +50,7 @@ impl FacetReader { } /// Accessor for the facet term dictionary. - pub fn facet_dict(&self) -> &TermDictionaryImpl { + pub fn facet_dict(&self) -> &TermDictionary { &self.term_dict } diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index d172a2209..5e4cc527e 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -14,8 +14,6 @@ use fastfield::FastFieldSerializer; use fastfield::FastFieldReader; use store::StoreWriter; use std::cmp::{max, min}; -use termdict::TermDictionary; -use termdict::TermStreamer; use fieldnorm::FieldNormsSerializer; use fieldnorm::FieldNormsWriter; use fieldnorm::FieldNormReader; diff --git a/src/postings/serializer.rs b/src/postings/serializer.rs index 973038a34..62c0f8e7a 100644 --- a/src/postings/serializer.rs +++ b/src/postings/serializer.rs @@ -1,5 +1,4 @@ use Result; -use termdict::TermDictionaryBuilderImpl; use super::TermInfo; use schema::Field; use schema::FieldEntry; @@ -111,7 +110,7 @@ impl InvertedIndexSerializer { /// The field serializer is in charge of /// the serialization of a specific field. pub struct FieldSerializer<'a> { - term_dictionary_builder: TermDictionaryBuilderImpl<&'a mut CountingWriter>, + term_dictionary_builder: TermDictionaryBuilder<&'a mut CountingWriter>, postings_serializer: PostingsSerializer<&'a mut CountingWriter>, positions_serializer_opt: Option>>, current_term_info: TermInfo, @@ -141,7 +140,7 @@ impl<'a> FieldSerializer<'a> { _ => (false, false), }; let term_dictionary_builder = - TermDictionaryBuilderImpl::new(term_dictionary_write, field_type)?; + TermDictionaryBuilder::new(term_dictionary_write, field_type)?; let postings_serializer = PostingsSerializer::new(postings_write, term_freq_enabled); let positions_serializer_opt = if position_enabled { Some(PositionSerializer::new(positions_write)) diff --git a/src/query/range_query.rs b/src/query/range_query.rs index d42aab41a..599541095 100644 --- a/src/query/range_query.rs +++ b/src/query/range_query.rs @@ -1,6 +1,6 @@ use schema::{Field, IndexRecordOption, Term}; use query::{Query, Scorer, Weight}; -use termdict::{TermDictionary, TermStreamer, TermStreamerBuilder}; +use termdict::{TermDictionary, TermStreamer}; use core::SegmentReader; use common::BitSet; use Result; @@ -213,10 +213,7 @@ pub struct RangeWeight { } impl RangeWeight { - fn term_range<'a, T>(&self, term_dict: &'a T) -> T::Streamer - where - T: TermDictionary<'a> + 'a, - { + fn term_range<'a>(&self, term_dict: &'a TermDictionary) -> TermStreamer<'a> { use std::collections::Bound::*; let mut term_stream_builder = term_dict.range(); term_stream_builder = match self.left_bound { diff --git a/src/termdict/fstdict/mod.rs b/src/termdict/fstdict/mod.rs deleted file mode 100644 index 0f31b6e15..000000000 --- a/src/termdict/fstdict/mod.rs +++ /dev/null @@ -1,25 +0,0 @@ -/*! -The term dictionary contains all of the terms in -`tantivy index` in a sorted manner. - -`fstdict` is the default implementation -of the term dictionary. It is implemented as a wrapper -of the `Fst` crate in order to add a value type. - -A finite state transducer itself associates -each term `&[u8]` to a `u64` that is in fact an address -in a buffer. The value is then accessible via -deserializing the value at this address. - -Keys (`&[u8]`) in this datastructure are sorted. -*/ - -mod termdict; -mod streamer; -mod term_info_store; - -pub use self::termdict::TermDictionaryImpl; -pub use self::termdict::TermDictionaryBuilderImpl; -pub use self::term_info_store::{TermInfoStore, TermInfoStoreWriter}; -pub use self::streamer::TermStreamerImpl; -pub use self::streamer::TermStreamerBuilderImpl; diff --git a/src/termdict/fstdict/streamer.rs b/src/termdict/fstdict/streamer.rs deleted file mode 100644 index 1037610a6..000000000 --- a/src/termdict/fstdict/streamer.rs +++ /dev/null @@ -1,89 +0,0 @@ -use fst::{IntoStreamer, Streamer}; -use fst::map::{Stream, StreamBuilder}; -use postings::TermInfo; -use super::TermDictionaryImpl; -use termdict::{TermDictionary, TermOrdinal, TermStreamer, TermStreamerBuilder}; - -/// See [`TermStreamerBuilder`](./trait.TermStreamerBuilder.html) -pub struct TermStreamerBuilderImpl<'a> { - fst_map: &'a TermDictionaryImpl, - stream_builder: StreamBuilder<'a>, -} - -impl<'a> TermStreamerBuilderImpl<'a> { - pub(crate) fn new(fst_map: &'a TermDictionaryImpl, stream_builder: StreamBuilder<'a>) -> Self { - TermStreamerBuilderImpl { - fst_map, - stream_builder, - } - } -} - -impl<'a> TermStreamerBuilder for TermStreamerBuilderImpl<'a> { - type Streamer = TermStreamerImpl<'a>; - - fn ge>(mut self, bound: T) -> Self { - self.stream_builder = self.stream_builder.ge(bound); - self - } - - fn gt>(mut self, bound: T) -> Self { - self.stream_builder = self.stream_builder.gt(bound); - self - } - - fn le>(mut self, bound: T) -> Self { - self.stream_builder = self.stream_builder.le(bound); - self - } - - fn lt>(mut self, bound: T) -> Self { - self.stream_builder = self.stream_builder.lt(bound); - self - } - - fn into_stream(self) -> Self::Streamer { - TermStreamerImpl { - fst_map: self.fst_map, - stream: self.stream_builder.into_stream(), - term_ord: 0u64, - current_key: Vec::with_capacity(100), - current_value: TermInfo::default(), - } - } -} - -/// See [`TermStreamer`](./trait.TermStreamer.html) -pub struct TermStreamerImpl<'a> { - fst_map: &'a TermDictionaryImpl, - stream: Stream<'a>, - term_ord: TermOrdinal, - current_key: Vec, - current_value: TermInfo, -} - -impl<'a> TermStreamer for TermStreamerImpl<'a> { - fn advance(&mut self) -> bool { - if let Some((term, term_ord)) = self.stream.next() { - self.current_key.clear(); - self.current_key.extend_from_slice(term); - self.term_ord = term_ord; - self.current_value = self.fst_map.term_info_from_ord(term_ord); - true - } else { - false - } - } - - fn term_ord(&self) -> TermOrdinal { - self.term_ord - } - - fn key(&self) -> &[u8] { - &self.current_key - } - - fn value(&self) -> &TermInfo { - &self.current_value - } -} diff --git a/src/termdict/merger.rs b/src/termdict/merger.rs index 76e3891c1..c2d1f5d3d 100644 --- a/src/termdict/merger.rs +++ b/src/termdict/merger.rs @@ -1,11 +1,10 @@ use std::collections::BinaryHeap; -use termdict::TermStreamerImpl; -use std::cmp::Ordering; use termdict::TermStreamer; +use std::cmp::Ordering; use schema::Term; pub struct HeapItem<'a> { - pub streamer: TermStreamerImpl<'a>, + pub streamer: TermStreamer<'a>, pub segment_ord: usize, } @@ -45,7 +44,7 @@ impl<'a> TermMerger<'a> { /// Stream of merged term dictionary /// /// - pub fn new(streams: Vec>) -> TermMerger<'a> { + pub fn new(streams: Vec>) -> TermMerger<'a> { TermMerger { heap: BinaryHeap::new(), current_streamers: streams diff --git a/src/termdict/mod.rs b/src/termdict/mod.rs index 05ca31240..fe31d6622 100644 --- a/src/termdict/mod.rs +++ b/src/termdict/mod.rs @@ -47,193 +47,26 @@ followed by a streaming through at most `1024` elements in the term `stream`. */ -use schema::{Field, FieldType, Term}; -use directory::ReadOnlySource; -use postings::TermInfo; - /// Position of the term in the sorted list of terms. pub type TermOrdinal = u64; -pub use self::merger::TermMerger; - -#[cfg(not(feature = "streamdict"))] -mod fstdict; -#[cfg(not(feature = "streamdict"))] -pub use self::fstdict::{TermDictionaryBuilderImpl, TermDictionaryImpl, TermStreamerBuilderImpl, - TermStreamerImpl}; - -#[cfg(feature = "streamdict")] -mod streamdict; -#[cfg(feature = "streamdict")] -pub use self::streamdict::{TermDictionaryBuilderImpl, TermDictionaryImpl, TermStreamerBuilderImpl, - TermStreamerImpl}; - +mod term_info_store; +mod streamer; +mod termdict; mod merger; -use std::io; -/// Dictionary associating sorted `&[u8]` to values -pub trait TermDictionary<'a> -where - Self: Sized, -{ - /// Streamer type associated to the term dictionary - type Streamer: TermStreamer + 'a; - - /// StreamerBuilder type associated to the term dictionary - type StreamBuilder: TermStreamerBuilder + 'a; - - /// Opens a `TermDictionary` given a data source. - fn from_source(source: ReadOnlySource) -> Self; - - /// Returns the number of terms in the dictionary. - /// Term ordinals range from 0 to `num_terms() - 1`. - fn num_terms(&self) -> usize; - - /// Returns the ordinal associated to a given term. - fn term_ord>(&self, term: K) -> Option; - - /// Returns the term associated to a given term ordinal. - /// - /// Term ordinals are defined as the position of the term in - /// the sorted list of terms. - /// - /// Returns true iff the term has been found. - /// - /// Regardless of whether the term is found or not, - /// the buffer may be modified. - fn ord_to_term(&self, ord: TermOrdinal, bytes: &mut Vec) -> bool; - - /// Returns the number of terms in the dictionary. - fn term_info_from_ord(&self, term_ord: TermOrdinal) -> TermInfo; - - /// Lookups the value corresponding to the key. - fn get>(&self, target_key: K) -> Option; - - /// Returns a range builder, to stream all of the terms - /// within an interval. - fn range(&'a self) -> Self::StreamBuilder; - - /// A stream of all the sorted terms. [See also `.stream_field()`](#method.stream_field) - fn stream(&'a self) -> Self::Streamer { - self.range().into_stream() - } - - /// A stream of all the sorted terms in the given field. - fn stream_field(&'a self, field: Field) -> Self::Streamer { - let start_term = Term::from_field_text(field, ""); - let stop_term = Term::from_field_text(Field(field.0 + 1), ""); - self.range() - .ge(start_term.as_slice()) - .lt(stop_term.as_slice()) - .into_stream() - } - - /// Creates an empty term dictionary which contains no terms. - fn empty(field_type: FieldType) -> Self; -} - -/// Builder for the new term dictionary. -/// -/// Inserting must be done in the order of the `keys`. -pub trait TermDictionaryBuilder: Sized -where - W: io::Write, -{ - /// Creates a new `TermDictionaryBuilder` - fn new(write: W, field_type: FieldType) -> io::Result; - - /// Inserts a `(key, value)` pair in the term dictionary. - /// - /// *Keys have to be inserted in order.* - fn insert>(&mut self, key: K, value: &TermInfo) -> io::Result<()>; - - /// Finalize writing the builder, and returns the underlying - /// `Write` object. - fn finish(self) -> io::Result; -} - -/// `TermStreamer` acts as a cursor over a range of terms of a segment. -/// Terms are guaranteed to be sorted. -pub trait TermStreamer: Sized { - /// Advance position the stream on the next item. - /// Before the first call to `.advance()`, the stream - /// is an unitialized state. - fn advance(&mut self) -> bool; - - /// Accesses the current key. - /// - /// `.key()` should return the key that was returned - /// by the `.next()` method. - /// - /// If the end of the stream as been reached, and `.next()` - /// has been called and returned `None`, `.key()` remains - /// the value of the last key encountered. - /// - /// Before any call to `.next()`, `.key()` returns an empty array. - fn key(&self) -> &[u8]; - - /// Returns the `TermOrdinal` of the given term. - /// - /// May panic if the called as `.advance()` as never - /// been called before. - fn term_ord(&self) -> TermOrdinal; - - /// Accesses the current value. - /// - /// Calling `.value()` after the end of the stream will return the - /// last `.value()` encountered. - /// - /// # Panics - /// - /// Calling `.value()` before the first call to `.advance()` returns - /// `V::default()`. - fn value(&self) -> &TermInfo; - - /// Return the next `(key, value)` pair. - fn next(&mut self) -> Option<(&[u8], &TermInfo)> { - if self.advance() { - Some((self.key(), self.value())) - } else { - None - } - } -} - -/// `TermStreamerBuilder` is an helper object used to define -/// a range of terms that should be streamed. -pub trait TermStreamerBuilder { - /// Associated `TermStreamer` type that this builder is building. - type Streamer: TermStreamer; - - /// Limit the range to terms greater or equal to the bound - fn ge>(self, bound: T) -> Self; - - /// Limit the range to terms strictly greater than the bound - fn gt>(self, bound: T) -> Self; - - /// Limit the range to terms lesser or equal to the bound - fn lt>(self, bound: T) -> Self; - - /// Limit the range to terms lesser or equal to the bound - fn le>(self, bound: T) -> Self; - - /// Creates the stream corresponding to the range - /// of terms defined using the `TermStreamerBuilder`. - fn into_stream(self) -> Self::Streamer; -} +pub use self::termdict::{TermDictionary, TermDictionaryBuilder}; +pub use self::streamer::{TermStreamer, TermStreamerBuilder}; +pub use self::merger::TermMerger; #[cfg(test)] mod tests { - use super::{TermDictionaryBuilderImpl, TermDictionaryImpl, TermStreamerImpl}; + use super::{TermDictionaryBuilder, TermDictionary, TermStreamer}; use directory::{Directory, RAMDirectory, ReadOnlySource}; use std::path::PathBuf; use schema::{Document, FieldType, SchemaBuilder, TEXT}; use core::Index; use std::str; - use termdict::TermStreamer; - use termdict::TermStreamerBuilder; - use termdict::TermDictionary; - use termdict::TermDictionaryBuilder; use postings::TermInfo; const BLOCK_SIZE: usize = 1_500; @@ -264,7 +97,7 @@ mod tests { let write = directory.open_write(&path).unwrap(); let field_type = FieldType::Str(TEXT); let mut term_dictionary_builder = - TermDictionaryBuilderImpl::new(write, field_type).unwrap(); + TermDictionaryBuilder::new(write, field_type).unwrap(); for term in COUNTRIES.iter() { term_dictionary_builder .insert(term.as_bytes(), &make_term_info(0u64)) @@ -273,7 +106,7 @@ mod tests { term_dictionary_builder.finish().unwrap(); } let source = directory.open_read(&path).unwrap(); - let term_dict: TermDictionaryImpl = TermDictionaryImpl::from_source(source); + let term_dict: TermDictionary = TermDictionary::from_source(source); for (term_ord, term) in COUNTRIES.iter().enumerate() { assert_eq!(term_dict.term_ord(term).unwrap(), term_ord as u64); let mut bytes = vec![]; @@ -290,7 +123,7 @@ mod tests { let write = directory.open_write(&path).unwrap(); let field_type = FieldType::Str(TEXT); let mut term_dictionary_builder = - TermDictionaryBuilderImpl::new(write, field_type).unwrap(); + TermDictionaryBuilder::new(write, field_type).unwrap(); term_dictionary_builder .insert("abc".as_bytes(), &make_term_info(34u64)) .unwrap(); @@ -300,7 +133,7 @@ mod tests { term_dictionary_builder.finish().unwrap(); } let source = directory.open_read(&path).unwrap(); - let term_dict: TermDictionaryImpl = TermDictionaryImpl::from_source(source); + let term_dict: TermDictionary = TermDictionary::from_source(source); assert_eq!(term_dict.get("abc").unwrap().doc_freq, 34u32); assert_eq!(term_dict.get("abcd").unwrap().doc_freq, 346u32); let mut stream = term_dict.stream(); @@ -378,7 +211,7 @@ mod tests { let field_type = FieldType::Str(TEXT); let buffer: Vec = { let mut term_dictionary_builder = - TermDictionaryBuilderImpl::new(vec![], field_type).unwrap(); + TermDictionaryBuilder::new(vec![], field_type).unwrap(); for &(ref id, ref i) in &ids { term_dictionary_builder .insert(id.as_bytes(), &make_term_info(*i as u64)) @@ -387,7 +220,7 @@ mod tests { term_dictionary_builder.finish().unwrap() }; let source = ReadOnlySource::from(buffer); - let term_dictionary: TermDictionaryImpl = TermDictionaryImpl::from_source(source); + let term_dictionary: TermDictionary = TermDictionary::from_source(source); { let mut streamer = term_dictionary.stream(); let mut i = 0; @@ -408,7 +241,7 @@ mod tests { let field_type = FieldType::Str(TEXT); let buffer: Vec = { let mut term_dictionary_builder = - TermDictionaryBuilderImpl::new(vec![], field_type).unwrap(); + TermDictionaryBuilder::new(vec![], field_type).unwrap(); // term requires more than 16bits term_dictionary_builder .insert("abcdefghijklmnopqrstuvwxy", &make_term_info(1)) @@ -422,7 +255,7 @@ mod tests { term_dictionary_builder.finish().unwrap() }; let source = ReadOnlySource::from(buffer); - let term_dictionary: TermDictionaryImpl = TermDictionaryImpl::from_source(source); + let term_dictionary: TermDictionary = TermDictionary::from_source(source); let mut kv_stream = term_dictionary.stream(); assert!(kv_stream.advance()); assert_eq!(kv_stream.key(), "abcdefghijklmnopqrstuvwxy".as_bytes()); @@ -443,7 +276,7 @@ mod tests { let field_type = FieldType::Str(TEXT); let buffer: Vec = { let mut term_dictionary_builder = - TermDictionaryBuilderImpl::new(vec![], field_type).unwrap(); + TermDictionaryBuilder::new(vec![], field_type).unwrap(); for &(ref id, ref i) in &ids { term_dictionary_builder .insert(id.as_bytes(), &make_term_info(*i as u64)) @@ -454,7 +287,7 @@ mod tests { let source = ReadOnlySource::from(buffer); - let term_dictionary: TermDictionaryImpl = TermDictionaryImpl::from_source(source); + let term_dictionary: TermDictionary = TermDictionary::from_source(source); { for i in (0..20).chain(6000..8_000) { let &(ref target_key, _) = &ids[i]; @@ -512,7 +345,7 @@ mod tests { let field_type = FieldType::Str(TEXT); let buffer: Vec = { let mut term_dictionary_builder = - TermDictionaryBuilderImpl::new(vec![], field_type).unwrap(); + TermDictionaryBuilder::new(vec![], field_type).unwrap(); term_dictionary_builder .insert(&[], &make_term_info(1 as u64)) .unwrap(); @@ -522,7 +355,7 @@ mod tests { term_dictionary_builder.finish().unwrap() }; let source = ReadOnlySource::from(buffer); - let term_dictionary: TermDictionaryImpl = TermDictionaryImpl::from_source(source); + let term_dictionary: TermDictionary = TermDictionary::from_source(source); let mut stream = term_dictionary.stream(); assert!(stream.advance()); assert!(stream.key().is_empty()); @@ -536,7 +369,7 @@ mod tests { let field_type = FieldType::Str(TEXT); let buffer: Vec = { let mut term_dictionary_builder = - TermDictionaryBuilderImpl::new(vec![], field_type).unwrap(); + TermDictionaryBuilder::new(vec![], field_type).unwrap(); for i in 0u8..10u8 { let number_arr = [i; 1]; term_dictionary_builder @@ -546,9 +379,9 @@ mod tests { term_dictionary_builder.finish().unwrap() }; let source = ReadOnlySource::from(buffer); - let term_dictionary: TermDictionaryImpl = TermDictionaryImpl::from_source(source); + let term_dictionary: TermDictionary = TermDictionary::from_source(source); - let value_list = |mut streamer: TermStreamerImpl| { + let value_list = |mut streamer: TermStreamer| { let mut res: Vec = vec![]; while let Some((_, ref v)) = streamer.next() { res.push(v.doc_freq); diff --git a/src/termdict/streamdict/delta_encoder.rs b/src/termdict/streamdict/delta_encoder.rs deleted file mode 100644 index b7e64f3cf..000000000 --- a/src/termdict/streamdict/delta_encoder.rs +++ /dev/null @@ -1,179 +0,0 @@ -use postings::TermInfo; -use super::CheckPoint; -use std::mem; -use common::BinarySerializable; - -/// Returns the len of the longest -/// common prefix of `s1` and `s2`. -/// -/// ie: the greatest `L` such that -/// for all `0 <= i < L`, `s1[i] == s2[i]` -fn common_prefix_len(s1: &[u8], s2: &[u8]) -> usize { - s1.iter() - .zip(s2.iter()) - .take_while(|&(a, b)| a == b) - .count() -} - -#[derive(Default)] -pub struct TermDeltaEncoder { - last_term: Vec, - prefix_len: usize, -} - -impl TermDeltaEncoder { - pub fn encode<'a>(&mut self, term: &'a [u8]) { - self.prefix_len = common_prefix_len(term, &self.last_term); - self.last_term.truncate(self.prefix_len); - self.last_term.extend_from_slice(&term[self.prefix_len..]); - } - - pub fn term(&self) -> &[u8] { - &self.last_term[..] - } - - pub fn prefix_suffix(&mut self) -> (usize, &[u8]) { - (self.prefix_len, &self.last_term[self.prefix_len..]) - } -} - -#[derive(Default)] -pub struct TermDeltaDecoder { - term: Vec, -} - -impl TermDeltaDecoder { - pub fn with_previous_term(term: Vec) -> TermDeltaDecoder { - TermDeltaDecoder { - term: Vec::from(term), - } - } - - // code - // first bit represents whether the prefix / suffix len can be encoded - // on the same byte. (the next one) - // - - #[inline(always)] - pub fn decode<'a>(&mut self, code: u8, mut cursor: &'a [u8]) -> &'a [u8] { - let (prefix_len, suffix_len): (usize, usize) = if (code & 1u8) == 1u8 { - let b = cursor[0]; - cursor = &cursor[1..]; - let prefix_len = (b & 15u8) as usize; - let suffix_len = (b >> 4u8) as usize; - (prefix_len, suffix_len) - } else { - let prefix_len = u32::deserialize(&mut cursor).unwrap(); - let suffix_len = u32::deserialize(&mut cursor).unwrap(); - (prefix_len as usize, suffix_len as usize) - }; - unsafe { self.term.set_len(prefix_len) }; - self.term.extend_from_slice(&(*cursor)[..suffix_len]); - &cursor[suffix_len..] - } - - pub fn term(&self) -> &[u8] { - &self.term[..] - } -} - -#[derive(Default)] -pub struct DeltaTermInfo { - pub doc_freq: u32, - pub delta_postings_offset: u64, - pub delta_positions_offset: u64, - pub positions_inner_offset: u8, -} - -pub struct TermInfoDeltaEncoder { - term_info: TermInfo, - pub has_positions: bool, -} - -impl TermInfoDeltaEncoder { - pub fn new(has_positions: bool) -> Self { - TermInfoDeltaEncoder { - term_info: TermInfo::default(), - has_positions, - } - } - - pub fn term_info(&self) -> &TermInfo { - &self.term_info - } - - pub fn encode(&mut self, term_info: TermInfo) -> DeltaTermInfo { - let mut delta_term_info = DeltaTermInfo { - doc_freq: term_info.doc_freq, - delta_postings_offset: term_info.postings_offset - self.term_info.postings_offset, - delta_positions_offset: 0u64, - positions_inner_offset: 0, - }; - if self.has_positions { - delta_term_info.delta_positions_offset = - term_info.positions_offset - self.term_info.positions_offset; - delta_term_info.positions_inner_offset = term_info.positions_inner_offset; - } - mem::replace(&mut self.term_info, term_info); - delta_term_info - } -} - -pub struct TermInfoDeltaDecoder { - term_info: TermInfo, - has_positions: bool, -} - -#[inline(always)] -pub fn make_mask(num_bytes: usize) -> u32 { - const MASK: [u32; 4] = [0xffu32, 0xffffu32, 0xffffffu32, 0xffffffffu32]; - *unsafe { MASK.get_unchecked(num_bytes.wrapping_sub(1) as usize) } -} - -impl TermInfoDeltaDecoder { - pub fn from_term_info(term_info: TermInfo, has_positions: bool) -> TermInfoDeltaDecoder { - TermInfoDeltaDecoder { - term_info, - has_positions, - } - } - - pub fn from_checkpoint(checkpoint: &CheckPoint, has_positions: bool) -> TermInfoDeltaDecoder { - TermInfoDeltaDecoder { - term_info: TermInfo { - doc_freq: 0u32, - postings_offset: checkpoint.postings_offset, - positions_offset: checkpoint.positions_offset, - positions_inner_offset: 0u8, - }, - has_positions, - } - } - - #[inline(always)] - pub fn decode<'a>(&mut self, code: u8, mut cursor: &'a [u8]) -> &'a [u8] { - let num_bytes_docfreq: usize = ((code >> 1) & 3) as usize + 1; - let num_bytes_postings_offset: usize = ((code >> 3) & 3) as usize + 1; - let mut v: u64 = unsafe { *(cursor.as_ptr() as *const u64) }; - let doc_freq: u32 = (v as u32) & make_mask(num_bytes_docfreq); - v >>= (num_bytes_docfreq as u64) * 8u64; - let delta_postings_offset: u64 = v & make_mask(num_bytes_postings_offset); - cursor = &cursor[num_bytes_docfreq + num_bytes_postings_offset..]; - self.term_info.doc_freq = doc_freq; - self.term_info.postings_offset += delta_postings_offset; - if self.has_positions { - let num_bytes_positions_offset = ((code >> 5) & 3) as usize + 1; - let delta_positions_offset: u32 = - unsafe { *(cursor.as_ptr() as *const u32) } & make_mask(num_bytes_positions_offset); - self.term_info.positions_offset += delta_positions_offset; - self.term_info.positions_inner_offset = cursor[num_bytes_positions_offset]; - &cursor[num_bytes_positions_offset + 1..] - } else { - cursor - } - } - - pub fn term_info(&self) -> &TermInfo { - &self.term_info - } -} diff --git a/src/termdict/streamdict/mod.rs b/src/termdict/streamdict/mod.rs deleted file mode 100644 index 4bb7ad082..000000000 --- a/src/termdict/streamdict/mod.rs +++ /dev/null @@ -1,41 +0,0 @@ -use std::io::{self, Read, Write}; -use common::BinarySerializable; - -mod termdict; -mod streamer; -mod delta_encoder; - -pub use self::delta_encoder::{TermDeltaDecoder, TermDeltaEncoder}; -pub use self::delta_encoder::{DeltaTermInfo, TermInfoDeltaDecoder, TermInfoDeltaEncoder}; - -pub use self::termdict::TermDictionaryImpl; -pub use self::termdict::TermDictionaryBuilderImpl; -pub use self::streamer::TermStreamerImpl; -pub use self::streamer::TermStreamerBuilderImpl; - -#[derive(Debug)] -pub struct CheckPoint { - pub stream_offset: u32, - pub postings_offset: u32, - pub positions_offset: u32, -} - -impl BinarySerializable for CheckPoint { - fn serialize(&self, writer: &mut W) -> io::Result<()> { - self.stream_offset.serialize(writer)?; - self.postings_offset.serialize(writer)?; - self.positions_offset.serialize(writer)?; - Ok(()) - } - - fn deserialize(reader: &mut R) -> io::Result { - let stream_offset = u32::deserialize(reader)?; - let postings_offset = u32::deserialize(reader)?; - let positions_offset = u32::deserialize(reader)?; - Ok(CheckPoint { - stream_offset, - postings_offset, - positions_offset, - }) - } -} diff --git a/src/termdict/streamdict/streamer.rs b/src/termdict/streamdict/streamer.rs deleted file mode 100644 index c63337434..000000000 --- a/src/termdict/streamdict/streamer.rs +++ /dev/null @@ -1,184 +0,0 @@ -#![allow(should_implement_trait)] - -use std::cmp::max; -use super::TermDictionaryImpl; -use termdict::{TermStreamer, TermStreamerBuilder}; -use postings::TermInfo; -use super::delta_encoder::{TermDeltaDecoder, TermInfoDeltaDecoder}; - -fn stream_before<'a>( - term_dictionary: &'a TermDictionaryImpl, - target_key: &[u8], - has_positions: bool, -) -> TermStreamerImpl<'a> { - let (prev_key, checkpoint) = term_dictionary.strictly_previous_key(target_key.as_ref()); - let stream_data: &'a [u8] = &term_dictionary.stream_data()[checkpoint.stream_offset as usize..]; - TermStreamerImpl { - cursor: stream_data, - term_delta_decoder: TermDeltaDecoder::with_previous_term(prev_key), - term_info_decoder: TermInfoDeltaDecoder::from_checkpoint(&checkpoint, has_positions), - } -} - -/// See [`TermStreamerBuilder`](./trait.TermStreamerBuilder.html) -pub struct TermStreamerBuilderImpl<'a> { - term_dictionary: &'a TermDictionaryImpl, - origin: usize, - offset_from: usize, - offset_to: usize, - current_key: Vec, - term_info: TermInfo, - has_positions: bool, -} - -impl<'a> TermStreamerBuilder for TermStreamerBuilderImpl<'a> { - type Streamer = TermStreamerImpl<'a>; - - /// Limit the range to terms greater or equal to the bound - fn ge>(mut self, bound: T) -> Self { - let target_key = bound.as_ref(); - let streamer = stream_before( - self.term_dictionary, - target_key.as_ref(), - self.has_positions, - ); - let smaller_than = |k: &[u8]| k.lt(target_key); - let (offset_before, current_key, term_info) = get_offset(smaller_than, streamer); - self.current_key = current_key; - self.term_info = term_info; - self.offset_from = offset_before - self.origin; - self - } - - /// Limit the range to terms strictly greater than the bound - fn gt>(mut self, bound: T) -> Self { - let target_key = bound.as_ref(); - let streamer = stream_before( - self.term_dictionary, - target_key.as_ref(), - self.has_positions, - ); - let smaller_than = |k: &[u8]| k.le(target_key); - let (offset_before, current_key, term_info) = get_offset(smaller_than, streamer); - self.current_key = current_key; - self.term_info = term_info; - self.offset_from = offset_before - self.origin; - self - } - - /// Limit the range to terms lesser or equal to the bound - fn lt>(mut self, bound: T) -> Self { - let target_key = bound.as_ref(); - let streamer = stream_before( - self.term_dictionary, - target_key.as_ref(), - self.has_positions, - ); - let smaller_than = |k: &[u8]| k.lt(target_key); - let (offset_before, _, _) = get_offset(smaller_than, streamer); - self.offset_to = offset_before - self.origin; - self - } - - /// Limit the range to terms lesser or equal to the bound - fn le>(mut self, bound: T) -> Self { - let target_key = bound.as_ref(); - let streamer = stream_before( - self.term_dictionary, - target_key.as_ref(), - self.has_positions, - ); - let smaller_than = |k: &[u8]| k.le(target_key); - let (offset_before, _, _) = get_offset(smaller_than, streamer); - self.offset_to = offset_before - self.origin; - self - } - - /// Build the streamer. - fn into_stream(self) -> Self::Streamer { - let data: &[u8] = self.term_dictionary.stream_data(); - let start = self.offset_from; - let stop = max(self.offset_to, start); - let term_delta_decoder = TermDeltaDecoder::with_previous_term(self.current_key); - let term_info_decoder = - TermInfoDeltaDecoder::from_term_info(self.term_info, self.has_positions); - TermStreamerImpl { - cursor: &data[start..stop], - term_delta_decoder, - term_info_decoder, - } - } -} - -/// Returns offset information for the first -/// key in the stream matching a given predicate. -/// -/// returns -/// - the block start -/// - the index within this block -/// - the term_buffer state to initialize the block) -fn get_offset<'a, P: Fn(&[u8]) -> bool>( - predicate: P, - mut streamer: TermStreamerImpl<'a>, -) -> (usize, Vec, TermInfo) { - let mut prev: &[u8] = streamer.cursor; - - let mut term_info = streamer.value().clone(); - let mut prev_data: Vec = Vec::from(streamer.term_delta_decoder.term()); - - while let Some((iter_key, iter_term_info)) = streamer.next() { - if !predicate(iter_key.as_ref()) { - return (prev.as_ptr() as usize, prev_data, term_info); - } - prev = streamer.cursor; - prev_data.clear(); - prev_data.extend_from_slice(iter_key.as_ref()); - term_info = iter_term_info.clone(); - } - (prev.as_ptr() as usize, prev_data, term_info) -} - -impl<'a> TermStreamerBuilderImpl<'a> { - pub(crate) fn new(term_dictionary: &'a TermDictionaryImpl, has_positions: bool) -> Self { - let data = term_dictionary.stream_data(); - let origin = data.as_ptr() as usize; - TermStreamerBuilderImpl { - term_dictionary, - term_info: TermInfo::default(), - origin, - offset_from: 0, - offset_to: data.len(), - current_key: Vec::with_capacity(300), - has_positions, - } - } -} - -/// See [`TermStreamer`](./trait.TermStreamer.html) -pub struct TermStreamerImpl<'a> { - cursor: &'a [u8], - term_delta_decoder: TermDeltaDecoder, - term_info_decoder: TermInfoDeltaDecoder, -} - -impl<'a> TermStreamer for TermStreamerImpl<'a> { - fn advance(&mut self) -> bool { - if self.cursor.is_empty() { - return false; - } - let mut cursor: &[u8] = &self.cursor; - let code: u8 = cursor[0]; - cursor = self.term_delta_decoder.decode(code, &cursor[1..]); - cursor = self.term_info_decoder.decode(code, cursor); - self.cursor = cursor; - true - } - - fn key(&self) -> &[u8] { - self.term_delta_decoder.term() - } - - fn value(&self) -> &TermInfo { - &self.term_info_decoder.term_info() - } -} diff --git a/src/termdict/streamdict/termdict.rs b/src/termdict/streamdict/termdict.rs deleted file mode 100644 index 8d7fb16c2..000000000 --- a/src/termdict/streamdict/termdict.rs +++ /dev/null @@ -1,354 +0,0 @@ -#![allow(should_implement_trait)] - -use std::io::{self, Write}; -use super::CheckPoint; -use fst; - -use fst::raw::Fst; -use directory::ReadOnlySource; -use common::BinarySerializable; -use common::CountingWriter; -use postings::TermInfo; -use schema::FieldType; -use super::{DeltaTermInfo, TermDeltaEncoder, TermInfoDeltaEncoder}; -use fst::raw::Node; -use termdict::{TermDictionary, TermDictionaryBuilder, TermStreamer}; -use super::{TermStreamerBuilderImpl, TermStreamerImpl}; -use termdict::TermStreamerBuilder; -use std::mem::transmute; - -const PADDING_SIZE: usize = 4; -const INDEX_INTERVAL: usize = 1024; - -fn convert_fst_error(e: fst::Error) -> io::Error { - io::Error::new(io::ErrorKind::Other, e) -} - -fn has_positions(field_type: &FieldType) -> bool { - match *field_type { - FieldType::Str(ref text_options) => { - let indexing_options = text_options.get_indexing_options(); - if indexing_options.is_position_enabled() { - true - } else { - false - } - } - _ => false, - } -} - -/// See [`TermDictionaryBuilder`](./trait.TermDictionaryBuilder.html) -pub struct TermDictionaryBuilderImpl { - write: CountingWriter, - term_delta_encoder: TermDeltaEncoder, - term_info_encoder: TermInfoDeltaEncoder, - block_index: fst::MapBuilder>, - checkpoints: Vec, - len: usize, -} - -fn fill_last<'a>(fst: &'a Fst, mut node: Node<'a>, buffer: &mut Vec) { - while let Some(transition) = node.transitions().last() { - buffer.push(transition.inp); - node = fst.node(transition.addr); - } -} - -impl TermDictionaryBuilderImpl -where - W: Write, -{ - fn add_index_entry(&mut self) { - let stream_offset = self.write.written_bytes() as u32; - let term_info = self.term_info_encoder.term_info(); - let postings_offset = term_info.postings_offset as u32; - let positions_offset = term_info.positions_offset as u32; - let checkpoint = CheckPoint { - stream_offset, - postings_offset, - positions_offset, - }; - self.block_index - .insert( - &self.term_delta_encoder.term(), - self.checkpoints.len() as u64, - ) - .expect( - "Serializing fst on a Vec should never fail. \ - Where your terms not in order maybe?", - ); - checkpoint - .serialize(&mut self.checkpoints) - .expect("Serializing checkpoint on a Vec should never fail."); - } - - /// # Warning - /// Horribly dangerous internal API - /// - /// If used, it must be used by systematically alternating calls - /// to insert_key and insert_value. - /// - /// Prefer using `.insert(key, value)` - pub(crate) fn insert_key(&mut self, key: &[u8]) -> io::Result<()> { - if self.len % INDEX_INTERVAL == 0 { - self.add_index_entry(); - } - self.term_delta_encoder.encode(key); - Ok(()) - } - - pub(crate) fn insert_value(&mut self, term_info: &TermInfo) -> io::Result<()> { - let delta_term_info = self.term_info_encoder.encode(term_info.clone()); - let (prefix_len, suffix) = self.term_delta_encoder.prefix_suffix(); - write_term_kv( - prefix_len, - suffix, - &delta_term_info, - self.term_info_encoder.has_positions, - &mut self.write, - )?; - self.len += 1; - Ok(()) - } -} - -fn num_bytes_required(mut n: u32) -> u8 { - for i in 1u8..5u8 { - if n < 256u32 { - return i; - } else { - n /= 256; - } - } - 0u8 -} - -fn write_term_kv( - prefix_len: usize, - suffix: &[u8], - delta_term_info: &DeltaTermInfo, - has_positions: bool, - write: &mut W, -) -> io::Result<()> { - let suffix_len = suffix.len(); - let mut code = 0u8; - let num_bytes_docfreq = num_bytes_required(delta_term_info.doc_freq); - let num_bytes_postings_offset = num_bytes_required(delta_term_info.delta_postings_offset); - let num_bytes_positions_offset = num_bytes_required(delta_term_info.delta_positions_offset); - code |= (num_bytes_docfreq - 1) << 1u8; - code |= (num_bytes_postings_offset - 1) << 3u8; - code |= (num_bytes_positions_offset - 1) << 5u8; - if (prefix_len < 16) && (suffix_len < 16) { - code |= 1u8; - write.write_all(&[code, (prefix_len as u8) | ((suffix_len as u8) << 4u8)])?; - } else { - write.write_all(&[code])?; - (prefix_len as u32).serialize(write)?; - (suffix_len as u32).serialize(write)?; - } - write.write_all(suffix)?; - { - let bytes: [u8; 4] = unsafe { transmute(delta_term_info.doc_freq) }; - write.write_all(&bytes[0..num_bytes_docfreq as usize])?; - } - { - let bytes: [u8; 4] = unsafe { transmute(delta_term_info.delta_postings_offset) }; - write.write_all(&bytes[0..num_bytes_postings_offset as usize])?; - } - if has_positions { - let bytes: [u8; 4] = unsafe { transmute(delta_term_info.delta_positions_offset) }; - write.write_all(&bytes[0..num_bytes_positions_offset as usize])?; - write.write_all(&[delta_term_info.positions_inner_offset])?; - } - Ok(()) -} - -impl TermDictionaryBuilder for TermDictionaryBuilderImpl -where - W: Write, -{ - /// Creates a new `TermDictionaryBuilder` - fn new(mut write: W, field_type: FieldType) -> io::Result { - let has_positions = has_positions(&field_type); - let has_positions_code = if has_positions { 255u8 } else { 0u8 }; - write.write_all(&[has_positions_code])?; - Ok(TermDictionaryBuilderImpl { - write: CountingWriter::wrap(write), - term_delta_encoder: TermDeltaEncoder::default(), - term_info_encoder: TermInfoDeltaEncoder::new(has_positions), - block_index: fst::MapBuilder::new(vec![]).expect("This cannot fail"), - checkpoints: vec![], - len: 0, - }) - } - - /// Inserts a `(key, value)` pair in the term dictionary. - /// - /// *Keys have to be inserted in order.* - fn insert>(&mut self, key_ref: K, value: &TermInfo) -> io::Result<()> { - let key = key_ref.as_ref(); - self.insert_key(key)?; - self.insert_value(value)?; - Ok(()) - } - - /// Finalize writing the builder, and returns the underlying - /// `Write` object. - fn finish(mut self) -> io::Result { - self.add_index_entry(); - self.write.write_all(&[0u8; PADDING_SIZE])?; - let fst_addr = self.write.written_bytes(); - let fst_write = self.block_index.into_inner().map_err(convert_fst_error)?; - self.write.write_all(&fst_write)?; - let check_points_addr = self.write.written_bytes(); - let (mut w, _) = self.write.finish()?; - w.write_all(&self.checkpoints)?; - (fst_addr as u64).serialize(&mut w)?; - (check_points_addr as u64).serialize(&mut w)?; - w.flush()?; - Ok(w) - } -} - -fn open_fst_index(source: ReadOnlySource) -> io::Result { - use self::ReadOnlySource::*; - let fst_result = match source { - Anonymous(data) => Fst::from_shared_bytes(data.data, data.start, data.len), - Mmap(mmap_readonly) => Fst::from_mmap(mmap_readonly), - }; - let fst = fst_result.map_err(convert_fst_error)?; - Ok(fst::Map::from(fst)) -} - -/// See [`TermDictionary`](./trait.TermDictionary.html) -pub struct TermDictionaryImpl { - stream_data: ReadOnlySource, - fst_index: fst::Map, - checkpoints_data: ReadOnlySource, - has_positions: bool, -} - -impl TermDictionaryImpl { - pub(crate) fn stream_data(&self) -> &[u8] { - self.stream_data.as_slice() - } - - pub(crate) fn strictly_previous_key(&self, key: &[u8]) -> (Vec, CheckPoint) { - let (term, checkpoint_offset) = self.strictly_previous_key_checkpoint_offset(key); - let mut checkpoint_data = &self.checkpoints_data.as_slice()[checkpoint_offset..]; - let checkpoint = - CheckPoint::deserialize(&mut checkpoint_data).expect("Checkpoint data is corrupted"); - (term, checkpoint) - } - - fn strictly_previous_key_checkpoint_offset(&self, key: &[u8]) -> (Vec, usize) { - let fst_map = &self.fst_index; - let fst = fst_map.as_fst(); - let mut node = fst.root(); - let mut node_stack: Vec = vec![node]; - - // first check the longest prefix. - for &b in &key[..key.len() - 1] { - node = match node.find_input(b) { - None => { - break; - } - Some(i) => fst.node(node.transition_addr(i)), - }; - node_stack.push(node); - } - - let len_node_stack = node_stack.len(); - for i in (1..len_node_stack).rev() { - let cur_node = &node_stack[i]; - let b: u8 = key[i]; - let last_transition_opt = cur_node - .transitions() - .take_while(|transition| transition.inp < b) - .last(); - - if let Some(last_transition) = last_transition_opt { - let mut result_buffer = Vec::from(&key[..i]); - result_buffer.push(last_transition.inp); - let mut result = Vec::from(&key[..i]); - result.push(last_transition.inp); - let fork_node = fst.node(last_transition.addr); - fill_last(fst, fork_node, &mut result); - let val = fst_map.get(&result).expect("Fst data corrupted") as usize; - return (result, val); - } else if cur_node.is_final() { - // the previous key is a prefix - let result_buffer = Vec::from(&key[..i]); - let val = fst_map.get(&result_buffer).expect("Fst data corrupted") as usize; - return (result_buffer, val); - } - } - (vec![], 0) - } -} - -impl<'a> TermDictionary<'a> for TermDictionaryImpl { - type Streamer = TermStreamerImpl<'a>; - - type StreamBuilder = TermStreamerBuilderImpl<'a>; - - /// Opens a `TermDictionary` given a data source. - fn from_source(mut source: ReadOnlySource) -> Self { - let has_positions = source.slice(0, 1)[0] == 255u8; - source = source.slice_from(1); - - let total_len = source.len(); - let (body, footer) = source.split(total_len - 16); - - let mut footer_buffer: &[u8] = footer.as_slice(); - let fst_addr = u64::deserialize(&mut footer_buffer) - .expect("deserializing 8 byte should never fail") as usize; - let checkpoints_addr = u64::deserialize(&mut footer_buffer) - .expect("deserializing 8 byte should never fail") - as usize; - - let stream_data = body.slice(0, fst_addr - PADDING_SIZE); - let fst_data = body.slice(fst_addr, checkpoints_addr); - let checkpoints_data = body.slice_from(checkpoints_addr); - - let fst_index = open_fst_index(fst_data).expect("Index FST data corrupted"); - - TermDictionaryImpl { - has_positions, - stream_data, - checkpoints_data, - fst_index, - } - } - - /// Lookups the value corresponding to the key. - fn get>(&self, target_key: K) -> Option { - let mut streamer = self.range().ge(&target_key).into_stream(); - if streamer.advance() && streamer.key() == target_key.as_ref() { - Some(streamer.value().clone()) - } else { - None - } - } - - /// Returns a range builder, to stream all of the terms - /// within an interval. - fn range(&'a self) -> Self::StreamBuilder { - Self::StreamBuilder::new(self, self.has_positions) - } -} - -#[cfg(test)] -mod tests { - use super::num_bytes_required; - - #[test] - fn test_num_bytes_required() { - assert_eq!(num_bytes_required(0), 1); - assert_eq!(num_bytes_required(1), 1); - assert_eq!(num_bytes_required(255), 1); - assert_eq!(num_bytes_required(256), 2); - assert_eq!(num_bytes_required(u32::max_value()), 4); - } -} diff --git a/src/termdict/streamer.rs b/src/termdict/streamer.rs new file mode 100644 index 000000000..6486a5202 --- /dev/null +++ b/src/termdict/streamer.rs @@ -0,0 +1,131 @@ +use fst::{IntoStreamer, Streamer}; +use fst::map::{Stream, StreamBuilder}; +use postings::TermInfo; +use super::TermDictionary; +use termdict::TermOrdinal; + +/// `TermStreamerBuilder` is an helper object used to define +/// a range of terms that should be streamed. +pub struct TermStreamerBuilder<'a> { + fst_map: &'a TermDictionary, + stream_builder: StreamBuilder<'a>, +} + + +impl<'a> TermStreamerBuilder<'a> { + + pub(crate) fn new(fst_map: &'a TermDictionary, stream_builder: StreamBuilder<'a>) -> Self { + TermStreamerBuilder { + fst_map, + stream_builder, + } + } + + /// Limit the range to terms greater or equal to the bound + pub fn ge>(mut self, bound: T) -> Self { + self.stream_builder = self.stream_builder.ge(bound); + self + } + + /// Limit the range to terms strictly greater than the bound + pub fn gt>(mut self, bound: T) -> Self { + self.stream_builder = self.stream_builder.gt(bound); + self + } + + /// Limit the range to terms lesser or equal to the bound + pub fn le>(mut self, bound: T) -> Self { + self.stream_builder = self.stream_builder.le(bound); + self + } + + /// Limit the range to terms lesser or equal to the bound + pub fn lt>(mut self, bound: T) -> Self { + self.stream_builder = self.stream_builder.lt(bound); + self + } + + /// Creates the stream corresponding to the range + /// of terms defined using the `TermStreamerBuilder`. + pub fn into_stream(self) -> TermStreamer<'a> { + TermStreamer { + fst_map: self.fst_map, + stream: self.stream_builder.into_stream(), + term_ord: 0u64, + current_key: Vec::with_capacity(100), + current_value: TermInfo::default(), + } + } +} + +/// `TermStreamer` acts as a cursor over a range of terms of a segment. +/// Terms are guaranteed to be sorted. +pub struct TermStreamer<'a> { + fst_map: &'a TermDictionary, + stream: Stream<'a>, + term_ord: TermOrdinal, + current_key: Vec, + current_value: TermInfo, +} + +impl<'a> TermStreamer<'a> { + + /// Advance position the stream on the next item. + /// Before the first call to `.advance()`, the stream + /// is an unitialized state. + pub fn advance(&mut self) -> bool { + if let Some((term, term_ord)) = self.stream.next() { + self.current_key.clear(); + self.current_key.extend_from_slice(term); + self.term_ord = term_ord; + self.current_value = self.fst_map.term_info_from_ord(term_ord); + true + } else { + false + } + } + + /// Returns the `TermOrdinal` of the given term. + /// + /// May panic if the called as `.advance()` as never + /// been called before. + pub fn term_ord(&self) -> TermOrdinal { + self.term_ord + } + + /// Accesses the current key. + /// + /// `.key()` should return the key that was returned + /// by the `.next()` method. + /// + /// If the end of the stream as been reached, and `.next()` + /// has been called and returned `None`, `.key()` remains + /// the value of the last key encountered. + /// + /// Before any call to `.next()`, `.key()` returns an empty array. + pub fn key(&self) -> &[u8] { + &self.current_key + } + + /// Accesses the current value. + /// + /// Calling `.value()` after the end of the stream will return the + /// last `.value()` encountered. + /// + /// # Panics + /// + /// Calling `.value()` before the first call to `.advance()` returns + /// `V::default()`. + pub fn value(&self) -> &TermInfo { + &self.current_value + } + + /// Return the next `(key, value)` pair. + pub fn next(&mut self) -> Option<(&[u8], &TermInfo)> { + if self.advance() { + Some((self.key(), self.value())) + } else { + None + } + } +} diff --git a/src/termdict/fstdict/term_info_store.rs b/src/termdict/term_info_store.rs similarity index 100% rename from src/termdict/fstdict/term_info_store.rs rename to src/termdict/term_info_store.rs diff --git a/src/termdict/fstdict/termdict.rs b/src/termdict/termdict.rs similarity index 60% rename from src/termdict/fstdict/termdict.rs rename to src/termdict/termdict.rs index 5f71b40fc..92966bcea 100644 --- a/src/termdict/fstdict/termdict.rs +++ b/src/termdict/termdict.rs @@ -6,24 +6,48 @@ use common::BinarySerializable; use common::CountingWriter; use schema::FieldType; use postings::TermInfo; -use termdict::{TermDictionary, TermDictionaryBuilder, TermOrdinal}; -use super::{TermInfoStore, TermInfoStoreWriter, TermStreamerBuilderImpl, TermStreamerImpl}; +use termdict::TermOrdinal; +use super::{TermStreamerBuilder, TermStreamer}; +use super::term_info_store::{TermInfoStore, TermInfoStoreWriter}; fn convert_fst_error(e: fst::Error) -> io::Error { io::Error::new(io::ErrorKind::Other, e) } -/// See [`TermDictionaryBuilder`](./trait.TermDictionaryBuilder.html) -pub struct TermDictionaryBuilderImpl { +/// Builder for the new term dictionary. +/// +/// Inserting must be done in the order of the `keys`. +pub struct TermDictionaryBuilder { fst_builder: fst::MapBuilder, term_info_store_writer: TermInfoStoreWriter, term_ord: u64, } -impl TermDictionaryBuilderImpl + +impl TermDictionaryBuilder where - W: Write, + W: Write { + /// Creates a new `TermDictionaryBuilder` + pub fn new(w: W, _field_type: FieldType) -> io::Result { + let fst_builder = fst::MapBuilder::new(w).map_err(convert_fst_error)?; + Ok(TermDictionaryBuilder { + fst_builder, + term_info_store_writer: TermInfoStoreWriter::new(), + term_ord: 0, + }) + } + + /// Inserts a `(key, value)` pair in the term dictionary. + /// + /// *Keys have to be inserted in order.* + pub fn insert>(&mut self, key_ref: K, value: &TermInfo) -> io::Result<()> { + let key = key_ref.as_ref(); + self.insert_key(key)?; + self.insert_value(value)?; + Ok(()) + } + /// # Warning /// Horribly dangerous internal API /// @@ -46,29 +70,10 @@ where self.term_info_store_writer.write_term_info(term_info)?; Ok(()) } -} -impl TermDictionaryBuilder for TermDictionaryBuilderImpl -where - W: Write, -{ - fn new(w: W, _field_type: FieldType) -> io::Result { - let fst_builder = fst::MapBuilder::new(w).map_err(convert_fst_error)?; - Ok(TermDictionaryBuilderImpl { - fst_builder, - term_info_store_writer: TermInfoStoreWriter::new(), - term_ord: 0, - }) - } - - fn insert>(&mut self, key_ref: K, value: &TermInfo) -> io::Result<()> { - let key = key_ref.as_ref(); - self.insert_key(key)?; - self.insert_value(value)?; - Ok(()) - } - - fn finish(mut self) -> io::Result { + /// Finalize writing the builder, and returns the underlying + /// `Write` object. + pub fn finish(mut self) -> io::Result { let mut file = self.fst_builder.into_inner().map_err(convert_fst_error)?; { let mut counting_writer = CountingWriter::wrap(&mut file); @@ -94,18 +99,21 @@ fn open_fst_index(source: ReadOnlySource) -> fst::Map { fst::Map::from(fst) } -/// See [`TermDictionary`](./trait.TermDictionary.html) -pub struct TermDictionaryImpl { +/// The term dictionary contains all of the terms in +/// `tantivy index` in a sorted manner. +/// +/// The `Fst` crate is used to assoicated terms to their +/// respective `TermOrdinal`. The `TermInfoStore` then makes it +/// possible to fetch the associated `TermInfo`. +pub struct TermDictionary { fst_index: fst::Map, term_info_store: TermInfoStore, } -impl<'a> TermDictionary<'a> for TermDictionaryImpl { - type Streamer = TermStreamerImpl<'a>; +impl TermDictionary { - type StreamBuilder = TermStreamerBuilderImpl<'a>; - - fn from_source(source: ReadOnlySource) -> Self { + /// Opens a `TermDictionary` given a data source. + pub fn from_source(source: ReadOnlySource) -> Self { let total_len = source.len(); let length_offset = total_len - 8; let mut split_len_buffer: &[u8] = &source.as_slice()[length_offset..]; @@ -115,15 +123,16 @@ impl<'a> TermDictionary<'a> for TermDictionaryImpl { let fst_source = source.slice(0, split_len); let values_source = source.slice(split_len, length_offset); let fst_index = open_fst_index(fst_source); - TermDictionaryImpl { + TermDictionary { fst_index, term_info_store: TermInfoStore::open(&values_source), } } - fn empty(field_type: FieldType) -> Self { + /// Creates an empty term dictionary which contains no terms. + pub fn empty(field_type: FieldType) -> Self { let term_dictionary_data: Vec = - TermDictionaryBuilderImpl::new(Vec::::new(), field_type) + TermDictionaryBuilder::new(Vec::::new(), field_type) .expect("Creating a TermDictionaryBuilder in a Vec should never fail") .finish() .expect("Writing in a Vec should never fail"); @@ -131,15 +140,27 @@ impl<'a> TermDictionary<'a> for TermDictionaryImpl { Self::from_source(source) } - fn num_terms(&self) -> usize { + /// Returns the number of terms in the dictionary. + /// Term ordinals range from 0 to `num_terms() - 1`. + pub fn num_terms(&self) -> usize { self.term_info_store.num_terms() } - fn term_ord>(&self, key: K) -> Option { + /// Returns the ordinal associated to a given term. + pub fn term_ord>(&self, key: K) -> Option { self.fst_index.get(key) } - fn ord_to_term(&self, mut ord: TermOrdinal, bytes: &mut Vec) -> bool { + /// Returns the term associated to a given term ordinal. + /// + /// Term ordinals are defined as the position of the term in + /// the sorted list of terms. + /// + /// Returns true iff the term has been found. + /// + /// Regardless of whether the term is found or not, + /// the buffer may be modified. + pub fn ord_to_term(&self, mut ord: TermOrdinal, bytes: &mut Vec) -> bool { bytes.clear(); let fst = self.fst_index.as_fst(); let mut node = fst.root(); @@ -159,16 +180,26 @@ impl<'a> TermDictionary<'a> for TermDictionaryImpl { true } - fn term_info_from_ord(&self, term_ord: TermOrdinal) -> TermInfo { + /// Returns the number of terms in the dictionary. + pub fn term_info_from_ord(&self, term_ord: TermOrdinal) -> TermInfo { self.term_info_store.get(term_ord) } - fn get>(&self, key: K) -> Option { + /// Lookups the value corresponding to the key. + pub fn get>(&self, key: K) -> Option { self.term_ord(key) .map(|term_ord| self.term_info_from_ord(term_ord)) } - fn range(&self) -> TermStreamerBuilderImpl { - TermStreamerBuilderImpl::new(self, self.fst_index.range()) + /// Returns a range builder, to stream all of the terms + /// within an interval. + pub fn range<'a>(&'a self) -> TermStreamerBuilder<'a> { + TermStreamerBuilder::new(self, self.fst_index.range()) + } + + + /// A stream of all the sorted terms. [See also `.stream_field()`](#method.stream_field) + pub fn stream<'a>(&'a self) -> TermStreamer<'a> { + self.range().into_stream() } }