From aaf1b2c6b6e7ee5dcd294459666a8720b461e471 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Wed, 24 May 2017 10:14:40 +0900 Subject: [PATCH] Reorganized code and added documentation. --- src/common/mod.rs | 2 - src/core/searcher.rs | 2 +- src/core/segment_reader.rs | 9 +- src/indexer/merger.rs | 1 + src/postings/serializer.rs | 7 +- src/termdict/fstdict/mod.rs | 8 +- src/termdict/fstdict/streamer.rs | 103 +++----- src/termdict/fstdict/termdict.rs | 94 +++---- src/termdict/merger.rs | 8 +- src/termdict/mod.rs | 248 +++++++++++++++--- .../streamdict}/counting_writer.rs | 0 src/termdict/streamdict/mod.rs | 10 +- src/termdict/streamdict/streamer.rs | 187 ++++++------- src/termdict/streamdict/termdict.rs | 129 ++++----- 14 files changed, 474 insertions(+), 334 deletions(-) rename src/{common => termdict/streamdict}/counting_writer.rs (100%) diff --git a/src/common/mod.rs b/src/common/mod.rs index 84b4cadc5..eef6b283d 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -1,7 +1,6 @@ mod serialize; mod timer; mod vint; -mod counting_writer; pub mod bitpacker; pub use self::serialize::BinarySerializable; @@ -9,7 +8,6 @@ pub use self::timer::Timing; pub use self::timer::TimerTree; pub use self::timer::OpenTimer; pub use self::vint::VInt; -pub use self::counting_writer::CountingWriter; use std::io; diff --git a/src/core/searcher.rs b/src/core/searcher.rs index 8f6d36b82..6e7078b5f 100644 --- a/src/core/searcher.rs +++ b/src/core/searcher.rs @@ -34,7 +34,7 @@ impl Searcher { } /// Returns the overall number of documents in the index. - pub fn num_docs(&self) -> DocId { + pub fn num_docs(&self) -> DocId { self.segment_readers .iter() .map(|segment_reader| segment_reader.num_docs()) diff --git a/src/core/segment_reader.rs b/src/core/segment_reader.rs index 77ee2148d..208a73559 100644 --- a/src/core/segment_reader.rs +++ b/src/core/segment_reader.rs @@ -12,9 +12,10 @@ use schema::Document; use directory::ReadOnlySource; use DocId; use std::str; +use termdict::TermDictionary; use std::cmp; use postings::TermInfo; -use termdict::TermDictionary; +use termdict::TermDictionaryImpl; use std::sync::Arc; use std::fmt; use schema::Field; @@ -41,7 +42,7 @@ use postings::FreqHandler; pub struct SegmentReader { segment_id: SegmentId, segment_meta: SegmentMeta, - terms: Arc, + terms: Arc, postings_data: ReadOnlySource, store_reader: StoreReader, fast_fields_reader: Arc, @@ -133,7 +134,7 @@ impl SegmentReader { pub fn open(segment: Segment) -> Result { let source = segment.open_read(SegmentComponent::TERMS)?; - let terms = TermDictionary::from_source(source)?; + let terms = TermDictionaryImpl::from_source(source)?; let store_source = segment.open_read(SegmentComponent::STORE)?; let store_reader = StoreReader::from_source(store_source); @@ -173,7 +174,7 @@ impl SegmentReader { } /// Return the term dictionary datastructure. - pub fn terms(&self) -> &TermDictionary { + pub fn terms(&self) -> &TermDictionaryImpl { &self.terms } diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 3528818ae..111478792 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -18,6 +18,7 @@ use fastfield::FastFieldReader; use store::StoreWriter; use std::cmp::{min, max}; use schema; +use termdict::TermStreamer; use postings::SegmentPostingsOption; pub struct IndexMerger { diff --git a/src/postings/serializer.rs b/src/postings/serializer.rs index 1313ad445..36f489290 100644 --- a/src/postings/serializer.rs +++ b/src/postings/serializer.rs @@ -1,5 +1,5 @@ use Result; -use termdict::TermDictionaryBuilder; +use termdict::TermDictionaryBuilderImpl; use super::TermInfo; use schema::Field; use schema::FieldEntry; @@ -16,6 +16,7 @@ use std::io::Write; use compression::VIntEncoder; use common::VInt; use common::BinarySerializable; +use termdict::TermDictionaryBuilder; /// `PostingsSerializer` is in charge of serializing @@ -50,7 +51,7 @@ use common::BinarySerializable; /// A description of the serialization format is /// [available here](https://fulmicoton.gitbooks.io/tantivy-doc/content/inverted-index.html). pub struct PostingsSerializer { - terms_fst_builder: TermDictionaryBuilder, + terms_fst_builder: TermDictionaryBuilderImpl, postings_write: WritePtr, positions_write: WritePtr, written_bytes_postings: usize, @@ -74,7 +75,7 @@ impl PostingsSerializer { positions_write: WritePtr, schema: Schema) -> Result { - let terms_fst_builder = try!(TermDictionaryBuilder::new(terms_write)); + let terms_fst_builder = try!(TermDictionaryBuilderImpl::new(terms_write)); Ok(PostingsSerializer { terms_fst_builder: terms_fst_builder, postings_write: postings_write, diff --git a/src/termdict/fstdict/mod.rs b/src/termdict/fstdict/mod.rs index a0b6589d5..3f121752f 100644 --- a/src/termdict/fstdict/mod.rs +++ b/src/termdict/fstdict/mod.rs @@ -17,10 +17,10 @@ Keys (`&[u8]`) in this datastructure are sorted. mod termdict; mod streamer; -pub use self::termdict::TermDictionary; -pub use self::termdict::TermDictionaryBuilder; -pub use self::streamer::TermStreamer; -pub use self::streamer::TermStreamerBuilder; +pub use self::termdict::TermDictionaryImpl; +pub use self::termdict::TermDictionaryBuilderImpl; +pub use self::streamer::TermStreamerImpl; +pub use self::streamer::TermStreamerBuilderImpl; diff --git a/src/termdict/fstdict/streamer.rs b/src/termdict/fstdict/streamer.rs index 082a818b8..6cf6e57db 100644 --- a/src/termdict/fstdict/streamer.rs +++ b/src/termdict/fstdict/streamer.rs @@ -1,48 +1,57 @@ use fst::{IntoStreamer, Streamer}; use fst::map::{StreamBuilder, Stream}; use common::BinarySerializable; -use super::TermDictionary; +use super::TermDictionaryImpl; +use termdict::{TermStreamerBuilder, TermStreamer}; -/// `TermStreamerBuilder` is an helper object used to define -/// a range of terms that should be streamed. -pub struct TermStreamerBuilder<'a, V> +/// See [TermStreamerBuilder](./trait.TermStreamerBuilder.html) +pub struct TermStreamerBuilderImpl<'a, V> where V: 'a + BinarySerializable + Default { - fst_map: &'a TermDictionary, + fst_map: &'a TermDictionaryImpl, stream_builder: StreamBuilder<'a>, } -impl<'a, V> TermStreamerBuilder<'a, V> - where V: 'a + BinarySerializable + Default -{ - /// Limit the range to terms greater or equal to the bound - pub fn ge>(mut self, bound: T) -> Self { +impl<'a, V> TermStreamerBuilderImpl<'a, V> + where V: 'a + BinarySerializable + Default { + + pub(crate) fn new(fst_map: &'a TermDictionaryImpl, + stream_builder: StreamBuilder<'a>) + -> Self { + TermStreamerBuilderImpl { + fst_map: fst_map, + stream_builder: stream_builder, + } + } +} + +impl<'a, V> TermStreamerBuilder for TermStreamerBuilderImpl<'a, V> + where V: 'a + BinarySerializable + Default { + + type Streamer = TermStreamerImpl<'a, V>; + + fn ge>(mut self, bound: T) -> Self { self.stream_builder = self.stream_builder.ge(bound); self } - /// Limit the range to terms strictly greater than the bound - pub fn gt>(mut self, bound: T) -> Self { + fn gt>(mut self, bound: T) -> Self { self.stream_builder = self.stream_builder.gt(bound); self } - /// Limit the range to terms lesser or equal to the bound - pub fn le>(mut self, bound: T) -> Self { + fn le>(mut self, bound: T) -> Self { self.stream_builder = self.stream_builder.le(bound); self } - /// Limit the range to terms lesser or equal to the bound - pub fn lt>(mut self, bound: T) -> Self { + fn lt>(mut self, bound: T) -> Self { self.stream_builder = self.stream_builder.lt(bound); self } - /// Creates the stream corresponding to the range - /// of terms defined using the `TermStreamerBuilder`. - pub fn into_stream(self) -> TermStreamer<'a, V> { - TermStreamer { + fn into_stream(self) -> Self::Streamer { + TermStreamerImpl { fst_map: self.fst_map, stream: self.stream_builder.into_stream(), offset: 0u64, @@ -50,41 +59,24 @@ impl<'a, V> TermStreamerBuilder<'a, V> current_value: V::default(), } } - - pub(crate) fn new(fst_map: &'a TermDictionary, - stream_builder: StreamBuilder<'a>) - -> TermStreamerBuilder<'a, V> { - TermStreamerBuilder { - fst_map: fst_map, - stream_builder: stream_builder, - } - } } - - -/// `TermStreamer` acts as a cursor over a range of terms of a segment. -/// Terms are guaranteed to be sorted. -pub struct TermStreamer<'a, V> +/// See [TermStreamer](./trait.TermStreamer.html) +pub struct TermStreamerImpl<'a, V> where V: 'a + BinarySerializable + Default { - fst_map: &'a TermDictionary, + fst_map: &'a TermDictionaryImpl, stream: Stream<'a>, offset: u64, current_key: Vec, current_value: V, } - - -impl<'a, V> TermStreamer<'a, V> - where V: 'a + BinarySerializable + Default -{ - /// Advance position the stream on the next item. - /// Before the first call to `.advance()`, the stream - /// is an unitialized state. - pub fn advance(&mut self) -> bool { +impl<'a, V> TermStreamer for TermStreamerImpl<'a, V> + where V: BinarySerializable + Default { + + fn advance(&mut self) -> bool { if let Some((term, offset)) = self.stream.next() { self.current_key.clear(); self.current_key.extend_from_slice(term); @@ -98,30 +90,11 @@ impl<'a, V> TermStreamer<'a, V> } } - /// Accesses the current key. - /// - /// `.key()` should return the key that was returned - /// by the `.next()` method. - /// - /// If the end of the stream as been reached, and `.next()` - /// has been called and returned `None`, `.key()` remains - /// the value of the last key encounterred. - /// - /// Before any call to `.next()`, `.key()` returns an empty array. - pub fn key(&self) -> &[u8] { + fn key(&self) -> &[u8] { &self.current_key } - /// Accesses the current value. - /// - /// Calling `.value()` after the end of the stream will return the - /// last `.value()` encounterred. - /// - /// # Panics - /// - /// Calling `.value()` before the first call to `.advance()` returns - /// `V::default()`. - pub fn value(&self) -> &V { + fn value(&self) -> &V { &self.current_value } } diff --git a/src/termdict/fstdict/termdict.rs b/src/termdict/fstdict/termdict.rs index 2978fdeb4..4a9f3c997 100644 --- a/src/termdict/fstdict/termdict.rs +++ b/src/termdict/fstdict/termdict.rs @@ -1,22 +1,19 @@ use std::io::{self, Write}; use fst; use fst::raw::Fst; -use super::TermStreamerBuilder; use directory::ReadOnlySource; use common::BinarySerializable; use std::marker::PhantomData; use postings::TermInfo; - +use termdict::{TermDictionary, TermDictionaryBuilder}; +use super::{TermStreamerImpl, TermStreamerBuilderImpl}; fn convert_fst_error(e: fst::Error) -> io::Error { io::Error::new(io::ErrorKind::Other, e) } - -/// Builder for the new term dictionary. -/// -/// Just like for the fst crate, all terms must be inserted in order. -pub struct TermDictionaryBuilder +/// See [TermDictionaryBuilder](./trait.TermDictionaryBuilder.html) +pub struct TermDictionaryBuilderImpl where W: Write, V: BinarySerializable + Default { fst_builder: fst::MapBuilder, @@ -24,18 +21,9 @@ pub struct TermDictionaryBuilder _phantom_: PhantomData, } -impl TermDictionaryBuilder +impl TermDictionaryBuilderImpl where W: Write, V: BinarySerializable + Default { - /// Creates a new `TermDictionaryBuilder` - pub fn new(w: W) -> io::Result> { - let fst_builder = fst::MapBuilder::new(w).map_err(convert_fst_error)?; - Ok(TermDictionaryBuilder { - fst_builder: fst_builder, - data: Vec::new(), - _phantom_: PhantomData, - }) - } - + /// # Warning /// Horribly dangerous internal API /// @@ -58,10 +46,22 @@ impl TermDictionaryBuilder Ok(()) } - /// Inserts a `(key, value)` pair in the term dictionary. - /// - /// *Keys have to be inserted in order.* - pub fn insert(&mut self, key: &[u8], value: &V) -> io::Result<()> { +} + +impl TermDictionaryBuilder for TermDictionaryBuilderImpl + where W: Write, V: BinarySerializable + Default { + + fn new(w: W) -> io::Result { + let fst_builder = fst::MapBuilder::new(w).map_err(convert_fst_error)?; + Ok(TermDictionaryBuilderImpl { + fst_builder: fst_builder, + data: Vec::new(), + _phantom_: PhantomData, + }) + } + + fn insert>(&mut self, key_ref: K, value: &V) -> io::Result<()> { + let key = key_ref.as_ref(); self.fst_builder .insert(key, self.data.len() as u64) .map_err(convert_fst_error)?; @@ -69,9 +69,7 @@ impl TermDictionaryBuilder Ok(()) } - /// Finalize writing the builder, and returns the underlying - /// `Write` object. - pub fn finish(self) -> io::Result { + fn finish(self) -> io::Result { let mut file = self.fst_builder.into_inner().map_err(convert_fst_error)?; let footer_size = self.data.len() as u32; file.write_all(&self.data)?; @@ -81,7 +79,6 @@ impl TermDictionaryBuilder } } - fn open_fst_index(source: ReadOnlySource) -> io::Result { let fst = match source { ReadOnlySource::Anonymous(data) => { @@ -95,20 +92,35 @@ fn open_fst_index(source: ReadOnlySource) -> io::Result { Ok(fst::Map::from(fst)) } -/// Datastructure to access the `terms` of a segment. -pub struct TermDictionary +/// See [TermDictionary](./trait.TermDictionary.html) +pub struct TermDictionaryImpl where V: BinarySerializable + Default -{ +{ fst_index: fst::Map, values_mmap: ReadOnlySource, _phantom_: PhantomData, } -impl TermDictionary +impl TermDictionaryImpl where V: BinarySerializable + Default { - /// Opens a `TermDictionary` given a data source. - pub fn from_source(source: ReadOnlySource) -> io::Result> { + /// Deserialize and returns the value at address `offset` + pub(crate) fn read_value(&self, offset: u64) -> io::Result { + let buffer = self.values_mmap.as_slice(); + let mut cursor = &buffer[(offset as usize)..]; + V::deserialize(&mut cursor) + } +} + + +impl<'a, V> TermDictionary<'a, V> for TermDictionaryImpl + where V: BinarySerializable + Default + 'a { + + type Streamer = TermStreamerImpl<'a, V>; + + type StreamBuilder = TermStreamerBuilderImpl<'a, V>; + + fn from_source(source: ReadOnlySource) -> io::Result { let total_len = source.len(); let length_offset = total_len - 4; let mut split_len_buffer: &[u8] = &source.as_slice()[length_offset..]; @@ -117,22 +129,14 @@ impl TermDictionary let fst_source = source.slice(0, split_len); let values_source = source.slice(split_len, length_offset); let fst_index = open_fst_index(fst_source)?; - Ok(TermDictionary { + Ok(TermDictionaryImpl { fst_index: fst_index, values_mmap: values_source, _phantom_: PhantomData, }) } - /// Deserialize and returns the value at address `offset` - pub(crate) fn read_value(&self, offset: u64) -> io::Result { - let buffer = self.values_mmap.as_slice(); - let mut cursor = &buffer[(offset as usize)..]; - V::deserialize(&mut cursor) - } - - /// Lookups the value corresponding to the key. - pub fn get>(&self, key: K) -> Option { + fn get>(&self, key: K) -> Option { self.fst_index .get(key) .map(|offset| { @@ -141,9 +145,7 @@ impl TermDictionary }) } - /// Returns a range builder, to stream all of the terms - /// within an interval. - pub fn range(&self) -> TermStreamerBuilder { - TermStreamerBuilder::new(self, self.fst_index.range()) + fn range(&self) -> TermStreamerBuilderImpl { + TermStreamerBuilderImpl::new(self, self.fst_index.range()) } } diff --git a/src/termdict/merger.rs b/src/termdict/merger.rs index f56fbbdfc..d0c699695 100644 --- a/src/termdict/merger.rs +++ b/src/termdict/merger.rs @@ -1,15 +1,17 @@ use std::collections::BinaryHeap; use core::SegmentReader; -use termdict::TermStreamer; +use termdict::TermStreamerImpl; use common::BinarySerializable; use postings::TermInfo; use std::cmp::Ordering; +use termdict::TermStreamer; +use termdict::TermDictionary; use fst::Streamer; pub struct HeapItem<'a, V> where V: 'a + BinarySerializable + Default { - pub streamer: TermStreamer<'a, V>, + pub streamer: TermStreamerImpl<'a, V>, pub segment_ord: usize, } @@ -56,7 +58,7 @@ pub struct TermMerger<'a, V> impl<'a, V> TermMerger<'a, V> where V: 'a + BinarySerializable + Default { - fn new(streams: Vec>) -> TermMerger<'a, V> { + fn new(streams: Vec>) -> TermMerger<'a, V> { TermMerger { heap: BinaryHeap::new(), current_streamers: streams diff --git a/src/termdict/mod.rs b/src/termdict/mod.rs index 610aa303d..624e09124 100644 --- a/src/termdict/mod.rs +++ b/src/termdict/mod.rs @@ -1,45 +1,131 @@ /*! -The term dictionary contains all of the terms in -`tantivy index` in a sorted manner. +The term dictionary is one of the key datastructure of +tantivy. It associates sorted `terms` to their respective +posting list. -It is implemented as a wrapper of the `Fst` crate in order -to add a value type. +The term dictionary makes it possible to iterate through +the keys in a sorted manner. -A finite state transducer itself associates -each term `&[u8]` to a `u64` that is in fact an address -in a buffer. The value is then accessible via -deserializing the value at this address. +# Example -Keys (`&[u8]`) in this datastructure are sorted. +``` +extern crate tantivy; +use tantivy::termdict::*; +use tantivy::directory::ReadOnlySource; + +# fn main() { +# run().expect("Test failed"); +# } +# fn run() -> tantivy::Result<()> { +let mut term_dictionary_builder = TermDictionaryBuilderImpl::new(vec!())?; + +// keys have to be insert in order. +term_dictionary_builder.insert("apple", &1u32)?; +term_dictionary_builder.insert("grape", &2u32)?; +term_dictionary_builder.insert("pear", &3u32)?; +let buffer: Vec = term_dictionary_builder.finish()?; + +let source = ReadOnlySource::from(buffer); +let term_dictionary = TermDictionaryImpl::from_source(source)?; + +assert_eq!(term_dictionary.get("grape"), Some(2u32)); +# Ok(()) +# } +``` + + +# Implementations + +There is currently two implementations of the term dictionary. + +## Default implementation : `fstdict` + +The default one relies heavily on the `fst` crate. +It associate each terms `&[u8]` representation to a `u64` +that is in fact an address in a buffer. The value is then accessible +via deserializing the value at this address. + + +## Stream implementation : `streamdict` + +The `fstdict` is a tiny bit slow when streaming all of +the terms. +For some use case (analytics engine), it is preferrable +to use the `streamdict`, that offers better streaming +performance, to the detriment of `lookup` performance. + +`streamdict` can be enabled by adding the `streamdict` +feature when compiling `tantivy`. + +`streamdict` encodes each term relatively to the precedent +as follows. + +- number of bytes that needs to be popped. +- number of bytes that needs to be added. +- sequence of bytes that is to be added +- value. + +Because such a structure does not allow for lookups, +it comes with a `fst` that indexes 1 out of `1024` +terms in this structure. + +A `lookup` therefore consists in a lookup in the `fst` +followed by a streaming through at most `1024` elements in the +term `stream`. */ use schema::{Field, Term}; use common::BinarySerializable; -use fst; - -#[cfg(not(feature="streamdict"))] +use directory::ReadOnlySource; mod fstdict; -#[cfg(not(feature="streamdict"))] -pub use self::fstdict::{TermDictionary, TermDictionaryBuilder, TermStreamer, TermStreamerBuilder}; - -#[cfg(feature="streamdict")] mod streamdict; -#[cfg(feature="streamdict")] -pub use self::streamdict::{TermDictionary, TermDictionaryBuilder, TermStreamer, TermStreamerBuilder}; - -mod merger; pub use self::merger::TermMerger; -impl TermDictionary - where V: BinarySerializable + Default { - + +#[cfg(not(feature="streamdict"))] +mod defaultimpl { + pub use super::fstdict::{TermDictionaryImpl, TermDictionaryBuilderImpl, TermStreamerImpl, TermStreamerBuilderImpl}; +} + +#[cfg(feature="streamdict")] +mod defaultimpl { + pub use super::streamdict::{TermDictionaryImpl, TermDictionaryBuilderImpl, TermStreamerImpl, TermStreamerBuilderImpl}; +} + + +pub use self::defaultimpl::{TermDictionaryImpl, TermDictionaryBuilderImpl, TermStreamerImpl, TermStreamerBuilderImpl}; + +mod merger; +use std::io; + + +/// Dictionary associating sorted `&[u8]` to values +pub trait TermDictionary<'a, V> + where V: BinarySerializable + Default + 'a , Self: Sized { + + /// Streamer type associated to the term dictionary + type Streamer: TermStreamer + 'a; + + /// StreamerBuilder type associated to the term dictionary + type StreamBuilder: TermStreamerBuilder + 'a; + + /// Opens a `TermDictionary` given a data source. + fn from_source(source: ReadOnlySource) -> io::Result; + + /// Lookups the value corresponding to the key. + fn get>(&self, target_key: K) -> Option; + + /// Returns a range builder, to stream all of the terms + /// within an interval. + fn range(&'a self) -> Self::StreamBuilder; + /// A stream of all the sorted terms. [See also `.stream_field()`](#method.stream_field) - pub fn stream(&self) -> TermStreamer { + fn stream(&'a self) -> Self::Streamer { self.range().into_stream() } /// A stream of all the sorted terms in the given field. - pub fn stream_field(&self, field: Field) -> TermStreamer { + fn stream_field(&'a self, field: Field) -> Self::Streamer { let start_term = Term::from_field_text(field, ""); let stop_term = Term::from_field_text(Field(field.0 + 1), ""); self.range() @@ -47,15 +133,62 @@ impl TermDictionary .lt(stop_term.as_slice()) .into_stream() } - } -impl<'a, 'b, V: 'b> fst::Streamer<'b> for TermStreamer<'a, V> - where V: 'a + BinarySerializable + Default -{ - type Item = (&'b [u8], &'b V); +/// Builder for the new term dictionary. +/// +/// Inserting must be done in the order of the `keys`. +pub trait TermDictionaryBuilder: Sized + where W: io::Write, V: BinarySerializable + Default { - fn next(&'b mut self) -> Option<(&'b [u8], &V)> { + /// Creates a new `TermDictionaryBuilder` + fn new(write: W) -> io::Result; + + /// Inserts a `(key, value)` pair in the term dictionary. + /// + /// *Keys have to be inserted in order.* + fn insert>(&mut self, key: K, value: &V) -> io::Result<()>; + + /// Finalize writing the builder, and returns the underlying + /// `Write` object. + fn finish(self) -> io::Result; +} + + +/// `TermStreamer` acts as a cursor over a range of terms of a segment. +/// Terms are guaranteed to be sorted. +pub trait TermStreamer: Sized { + + /// Advance position the stream on the next item. + /// Before the first call to `.advance()`, the stream + /// is an unitialized state. + fn advance(&mut self) -> bool; + + /// Accesses the current key. + /// + /// `.key()` should return the key that was returned + /// by the `.next()` method. + /// + /// If the end of the stream as been reached, and `.next()` + /// has been called and returned `None`, `.key()` remains + /// the value of the last key encounterred. + /// + /// Before any call to `.next()`, `.key()` returns an empty array. + fn key(&self) -> &[u8]; + + /// Accesses the current value. + /// + /// Calling `.value()` after the end of the stream will return the + /// last `.value()` encounterred. + /// + /// # Panics + /// + /// Calling `.value()` before the first call to `.advance()` returns + /// `V::default()`. + fn value(&self) -> &V; + + /// Return the next `(key, value)` pair. + fn next<'b>(&'b mut self) -> Option<(&'b [u8], &'b V)> { if self.advance() { Some((self.key(), self.value())) } else { @@ -64,17 +197,46 @@ impl<'a, 'b, V: 'b> fst::Streamer<'b> for TermStreamer<'a, V> } } + +/// `TermStreamerBuilder` is an helper object used to define +/// a range of terms that should be streamed. +pub trait TermStreamerBuilder where V: BinarySerializable + Default { + + /// Associated `TermStreamer` type that this builder is building. + type Streamer: TermStreamer; + + /// Limit the range to terms greater or equal to the bound + fn ge>(self, bound: T) -> Self; + + /// Limit the range to terms strictly greater than the bound + fn gt>(self, bound: T) -> Self; + + /// Limit the range to terms lesser or equal to the bound + fn lt>(self, bound: T) -> Self; + + /// Limit the range to terms lesser or equal to the bound + fn le>(self, bound: T) -> Self; + + /// Creates the stream corresponding to the range + /// of terms defined using the `TermStreamerBuilder`. + fn into_stream(self) -> Self::Streamer; +} + + #[cfg(test)] mod tests { - use super::{TermDictionary, TermDictionaryBuilder, TermStreamer}; + use super::{TermDictionaryImpl, TermDictionaryBuilderImpl, TermStreamerImpl}; use directory::{RAMDirectory, Directory, ReadOnlySource}; use std::path::PathBuf; - use fst::Streamer; use schema::{Term, SchemaBuilder, Document, TEXT}; use core::Index; use std::str; + use termdict::TermStreamer; + use termdict::TermStreamerBuilder; + use termdict::TermDictionary; + use termdict::TermDictionaryBuilder; const BLOCK_SIZE: usize = 1_500; - + #[test] fn test_term_dictionary() { @@ -82,7 +244,7 @@ mod tests { let path = PathBuf::from("TermDictionary"); { let write = directory.open_write(&path).unwrap(); - let mut term_dictionary_builder = TermDictionaryBuilder::new(write).unwrap(); + let mut term_dictionary_builder = TermDictionaryBuilderImpl::new(write).unwrap(); term_dictionary_builder .insert("abc".as_bytes(), &34u32) .unwrap(); @@ -92,7 +254,7 @@ mod tests { term_dictionary_builder.finish().unwrap(); } let source = directory.open_read(&path).unwrap(); - let term_dict: TermDictionary = TermDictionary::from_source(source).unwrap(); + let term_dict: TermDictionaryImpl = TermDictionaryImpl::from_source(source).unwrap(); assert_eq!(term_dict.get("abc"), Some(34u32)); assert_eq!(term_dict.get("abcd"), Some(346u32)); let mut stream = term_dict.stream(); @@ -158,14 +320,14 @@ mod tests { .map(|i| (format!("doc{:0>6}", i), i)) .collect(); let buffer: Vec = { - let mut term_dictionary_builder = TermDictionaryBuilder::new(vec!()).unwrap(); + let mut term_dictionary_builder = TermDictionaryBuilderImpl::new(vec!()).unwrap(); for &(ref id, ref i) in &ids { term_dictionary_builder.insert(id.as_bytes(), i).unwrap(); } term_dictionary_builder.finish().unwrap() }; let source = ReadOnlySource::from(buffer); - let term_dictionary: TermDictionary = TermDictionary::from_source(source).unwrap(); + let term_dictionary: TermDictionaryImpl = TermDictionaryImpl::from_source(source).unwrap(); { let mut streamer = term_dictionary.stream(); let mut i = 0; @@ -187,7 +349,7 @@ mod tests { .map(|i| (format!("doc{:0>6}", i), i)) .collect(); let buffer: Vec = { - let mut term_dictionary_builder = TermDictionaryBuilder::new(vec!()).unwrap(); + let mut term_dictionary_builder = TermDictionaryBuilderImpl::new(vec!()).unwrap(); for &(ref id, ref i) in &ids { term_dictionary_builder.insert(id.as_bytes(), i).unwrap(); } @@ -196,7 +358,7 @@ mod tests { let source = ReadOnlySource::from(buffer); - let term_dictionary: TermDictionary = TermDictionary::from_source(source).unwrap(); + let term_dictionary: TermDictionaryImpl = TermDictionaryImpl::from_source(source).unwrap(); { for i in (0..20).chain(6000..8_000) { let &(ref target_key, _) = &ids[i]; @@ -254,7 +416,7 @@ mod tests { #[test] fn test_stream_range_boundaries() { let buffer: Vec = { - let mut term_dictionary_builder = TermDictionaryBuilder::new(vec!()).unwrap(); + let mut term_dictionary_builder = TermDictionaryBuilderImpl::new(vec!()).unwrap(); for i in 0u8..10u8 { let number_arr = [i; 1]; term_dictionary_builder.insert(&number_arr, &i).unwrap(); @@ -262,9 +424,9 @@ mod tests { term_dictionary_builder.finish().unwrap() }; let source = ReadOnlySource::from(buffer); - let term_dictionary: TermDictionary = TermDictionary::from_source(source).unwrap(); + let term_dictionary: TermDictionaryImpl = TermDictionaryImpl::from_source(source).unwrap(); - let value_list = |mut streamer: TermStreamer| { + let value_list = |mut streamer: TermStreamerImpl| { let mut res: Vec = vec!(); while let Some((_, &v)) = streamer.next() { res.push(v); diff --git a/src/common/counting_writer.rs b/src/termdict/streamdict/counting_writer.rs similarity index 100% rename from src/common/counting_writer.rs rename to src/termdict/streamdict/counting_writer.rs diff --git a/src/termdict/streamdict/mod.rs b/src/termdict/streamdict/mod.rs index a58773605..c20668455 100644 --- a/src/termdict/streamdict/mod.rs +++ b/src/termdict/streamdict/mod.rs @@ -1,9 +1,11 @@ mod termdict; mod streamer; +mod counting_writer; -pub use self::termdict::TermDictionary; -pub use self::termdict::TermDictionaryBuilder; -pub use self::streamer::TermStreamer; -pub use self::streamer::TermStreamerBuilder; +use self::counting_writer::CountingWriter; +pub use self::termdict::TermDictionaryImpl; +pub use self::termdict::TermDictionaryBuilderImpl; +pub use self::streamer::TermStreamerImpl; +pub use self::streamer::TermStreamerBuilderImpl; diff --git a/src/termdict/streamdict/streamer.rs b/src/termdict/streamdict/streamer.rs index 01c2c38c9..50fb5bb9f 100644 --- a/src/termdict/streamdict/streamer.rs +++ b/src/termdict/streamdict/streamer.rs @@ -4,36 +4,95 @@ use std::cmp::max; use std::io::Read; use common::VInt; use common::BinarySerializable; -use super::TermDictionary; -use fst::Streamer; +use super::TermDictionaryImpl; +use termdict::{TermStreamerBuilder, TermStreamer}; -pub(crate) fn stream_before<'a, V>(term_dictionary: &'a TermDictionary, target_key: &[u8]) -> TermStreamer<'a, V> +pub(crate) fn stream_before<'a, V>(term_dictionary: &'a TermDictionaryImpl, target_key: &[u8]) -> TermStreamerImpl<'a, V> where V: 'a + BinarySerializable + Default { let (prev_key, offset) = term_dictionary.strictly_previous_key(target_key.as_ref()); let offset: usize = offset as usize; - TermStreamer { + TermStreamerImpl { cursor: &term_dictionary.stream_data()[offset..], current_key: Vec::from(prev_key), current_value: V::default(), } } -/// `TermStreamerBuilder` is an helper object used to define -/// a range of terms that should be streamed. -pub struct TermStreamerBuilder<'a, V> +/// See [TermStreamerBuilder](./trait.TermStreamerBuilder.html) +pub struct TermStreamerBuilderImpl<'a, V> where V: 'a + BinarySerializable + Default { - term_dictionary: &'a TermDictionary, + term_dictionary: &'a TermDictionaryImpl, origin: usize, offset_from: usize, offset_to: usize, current_key: Vec, } +impl<'a, V> TermStreamerBuilder for TermStreamerBuilderImpl<'a, V> + where V: 'a + BinarySerializable + Default { + + type Streamer = TermStreamerImpl<'a, V>; + + /// Limit the range to terms greater or equal to the bound + fn ge>(mut self, bound: T) -> Self { + let target_key = bound.as_ref(); + let streamer = stream_before(&self.term_dictionary, target_key.as_ref()); + let smaller_than = |k: &[u8]| { k.lt(target_key) }; + let (offset_before, current_key) = get_offset(smaller_than, streamer); + self.current_key = current_key; + self.offset_from = offset_before - self.origin; + self + } + + /// Limit the range to terms strictly greater than the bound + fn gt>(mut self, bound: T) -> Self { + let target_key = bound.as_ref(); + let streamer = stream_before(self.term_dictionary, target_key.as_ref()); + let smaller_than = |k: &[u8]| { k.le(target_key) }; + let (offset_before, current_key) = get_offset(smaller_than, streamer); + self.current_key = current_key; + self.offset_from = offset_before - self.origin; + self + } + + /// Limit the range to terms lesser or equal to the bound + fn lt>(mut self, bound: T) -> Self { + let target_key = bound.as_ref(); + let streamer = stream_before(self.term_dictionary, target_key.as_ref()); + let smaller_than = |k: &[u8]| { k.lt(target_key) }; + let (offset_before, _) = get_offset(smaller_than, streamer); + self.offset_to = offset_before - self.origin; + self + } + + /// Limit the range to terms lesser or equal to the bound + fn le>(mut self, bound: T) -> Self { + let target_key = bound.as_ref(); + let streamer = stream_before(self.term_dictionary, target_key.as_ref()); + let smaller_than = |k: &[u8]| { k.le(target_key) }; + let (offset_before, _) = get_offset(smaller_than, streamer); + self.offset_to = offset_before - self.origin; + self + } + + /// Build the streamer. + fn into_stream(self) -> Self::Streamer { + let data: &[u8] = self.term_dictionary.stream_data(); + let start = self.offset_from; + let stop = max(self.offset_to, start); + TermStreamerImpl { + cursor: &data[start..stop], + current_key: self.current_key, + current_value: V::default(), + } + } +} + /// Returns offset information for the first /// key in the stream matching a given predicate. /// /// returns (start offset, the data required to load the value) -fn get_offset<'a, V, P: Fn(&[u8])->bool>(predicate: P, mut streamer: TermStreamer) -> (usize, Vec) +fn get_offset<'a, V, P: Fn(&[u8])->bool>(predicate: P, mut streamer: TermStreamerImpl) -> (usize, Vec) where V: 'a + BinarySerializable + Default { let mut prev: &[u8] = streamer.cursor; @@ -50,55 +109,14 @@ fn get_offset<'a, V, P: Fn(&[u8])->bool>(predicate: P, mut streamer: TermStreame return (prev.as_ptr() as usize, prev_data); } -impl<'a, V> TermStreamerBuilder<'a, V> +impl<'a, V> TermStreamerBuilderImpl<'a, V> where V: 'a + BinarySerializable + Default { - /// Limit the range to terms greater or equal to the bound - pub fn ge>(mut self, bound: T) -> TermStreamerBuilder<'a, V> { - let target_key = bound.as_ref(); - let streamer = stream_before(&self.term_dictionary, target_key.as_ref()); - let smaller_than = |k: &[u8]| { k.lt(target_key) }; - let (offset_before, current_key) = get_offset(smaller_than, streamer); - self.current_key = current_key; - self.offset_from = offset_before - self.origin; - self - } - /// Limit the range to terms strictly greater than the bound - pub fn gt>(mut self, bound: T) -> TermStreamerBuilder<'a, V> { - let target_key = bound.as_ref(); - let streamer = stream_before(self.term_dictionary, target_key.as_ref()); - let smaller_than = |k: &[u8]| { k.le(target_key) }; - let (offset_before, current_key) = get_offset(smaller_than, streamer); - self.current_key = current_key; - self.offset_from = offset_before - self.origin; - self - } - - /// Limit the range to terms lesser or equal to the bound - pub fn lt>(mut self, bound: T) -> TermStreamerBuilder<'a, V> { - let target_key = bound.as_ref(); - let streamer = stream_before(self.term_dictionary, target_key.as_ref()); - let smaller_than = |k: &[u8]| { k.lt(target_key) }; - let (offset_before, _) = get_offset(smaller_than, streamer); - self.offset_to = offset_before - self.origin; - self - } - - /// Limit the range to terms lesser or equal to the bound - pub fn le>(mut self, bound: T) -> TermStreamerBuilder<'a, V> { - let target_key = bound.as_ref(); - let streamer = stream_before(self.term_dictionary, target_key.as_ref()); - let smaller_than = |k: &[u8]| { k.le(target_key) }; - let (offset_before, _) = get_offset(smaller_than, streamer); - self.offset_to = offset_before - self.origin; - self - } - - pub(crate) fn new(term_dictionary: &'a TermDictionary) -> TermStreamerBuilder<'a, V> { + pub(crate) fn new(term_dictionary: &'a TermDictionaryImpl) -> Self { let data = term_dictionary.stream_data(); let origin = data.as_ptr() as usize; - TermStreamerBuilder { + TermStreamerBuilderImpl { term_dictionary: term_dictionary, origin: origin, offset_from: 0, @@ -106,23 +124,10 @@ impl<'a, V> TermStreamerBuilder<'a, V> current_key: vec!(), } } - - /// Build the streamer. - pub fn into_stream(self) -> TermStreamer<'a, V> { - let data: &[u8] = self.term_dictionary.stream_data(); - let start = self.offset_from; - let stop = max(self.offset_to, start); - TermStreamer { - cursor: &data[start..stop], - current_key: self.current_key, - current_value: V::default(), - } - } } -/// `TermStreamer` acts as a cursor over a range of terms of a segment. -/// Terms are guaranteed to be sorted. -pub struct TermStreamer<'a, V> +/// See [TermStreamer](./trait.TermStreamer.html) +pub struct TermStreamerImpl<'a, V> where V: 'a + BinarySerializable + Default { cursor: &'a [u8], current_key: Vec, @@ -130,14 +135,19 @@ pub struct TermStreamer<'a, V> } - -impl<'a, V: BinarySerializable> TermStreamer<'a, V> +impl<'a, V: BinarySerializable> TermStreamerImpl<'a, V> where V: 'a + BinarySerializable + Default { - /// Advance position the stream on the next item. - /// Before the first call to `.advance()`, the stream - /// is an unitialized state. - pub fn advance(&mut self) -> bool { + + pub(crate) fn extract_value(self) -> V { + self.current_value + } +} + +impl<'a, V> TermStreamer for TermStreamerImpl<'a, V> + where V: BinarySerializable + Default { + + fn advance(&mut self) -> bool { if self.cursor.len() == 0 { return false; } @@ -152,34 +162,11 @@ impl<'a, V: BinarySerializable> TermStreamer<'a, V> return true; } - /// Accesses the current key. - /// - /// `.key()` should return the key that was returned - /// by the `.next()` method. - /// - /// If the end of the stream as been reached, and `.next()` - /// has been called and returned `None`, `.key()` remains - /// the value of the last key encounterred. - /// - /// Before any call to `.next()`, `.key()` returns an empty array. - pub fn key(&self) -> &[u8] { + fn key(&self) -> &[u8] { &self.current_key } - /// Accesses the current value. - /// - /// Calling `.value()` after the end of the stream will return the - /// last `.value()` encounterred. - /// - /// # Panics - /// - /// Calling `.value()` before the first call to `.advance()` returns - /// `V::default()`. - pub fn value(&self) -> &V { + fn value(&self) -> &V { &self.current_value } - - pub(crate) fn extract_value(self) -> V { - self.current_value - } -} +} \ No newline at end of file diff --git a/src/termdict/streamdict/termdict.rs b/src/termdict/streamdict/termdict.rs index 9f3614f73..150765f88 100644 --- a/src/termdict/streamdict/termdict.rs +++ b/src/termdict/streamdict/termdict.rs @@ -7,12 +7,13 @@ use common::VInt; use directory::ReadOnlySource; use common::BinarySerializable; use std::marker::PhantomData; -use common::CountingWriter; +use super::CountingWriter; use std::cmp::Ordering; use postings::TermInfo; use fst::raw::Node; -use super::TermStreamerBuilder; use super::streamer::stream_before; +use termdict::{TermDictionary, TermDictionaryBuilder, TermStreamer}; +use super::{TermStreamerImpl, TermStreamerBuilderImpl}; const BLOCK_SIZE: usize = 1024; @@ -20,10 +21,8 @@ fn convert_fst_error(e: fst::Error) -> io::Error { io::Error::new(io::ErrorKind::Other, e) } -/// Builder for the new term dictionary. -/// -/// All terms must be inserted in order. -pub struct TermDictionaryBuilder +/// See [TermDictionaryBuilder](./trait.TermDictionaryBuilder.html) +pub struct TermDictionaryBuilderImpl where W: Write, V: BinarySerializable + Default { write: CountingWriter, block_index: fst::MapBuilder>, @@ -51,34 +50,13 @@ fn fill_last<'a>(fst: &'a Fst, mut node: Node<'a>, buffer: &mut Vec) { } } - -impl TermDictionaryBuilder { - - /// Creates a new `TermDictionaryBuilder` - pub fn new(write: W) -> io::Result> { - let buffer: Vec = vec!(); - Ok(TermDictionaryBuilder { - write: CountingWriter::wrap(write), - block_index: fst::MapBuilder::new(buffer) - .expect("This cannot fail"), - last_key: Vec::with_capacity(128), - len: 0, - _phantom_: PhantomData, - }) - } +impl TermDictionaryBuilderImpl + where W: Write, V: BinarySerializable + Default { fn add_index_entry(&mut self) { self.block_index.insert(&self.last_key, self.write.written_bytes() as u64).unwrap(); } - /// Inserts a `(key, value)` pair in the term dictionary. - /// - /// *Keys have to be inserted in order.* - pub fn insert(&mut self, key: &[u8], value: &V) -> io::Result<()>{ - self.insert_key(key)?; - self.insert_value(value) - } - /// # Warning /// Horribly dangerous internal API /// @@ -104,10 +82,36 @@ impl TermDictionaryBuilder { value.serialize(&mut self.write)?; Ok(()) } +} + +impl TermDictionaryBuilder for TermDictionaryBuilderImpl + where W: Write, V: BinarySerializable + Default { + + /// Creates a new `TermDictionaryBuilder` + fn new(write: W) -> io::Result { + let buffer: Vec = vec!(); + Ok(TermDictionaryBuilderImpl { + write: CountingWriter::wrap(write), + block_index: fst::MapBuilder::new(buffer) + .expect("This cannot fail"), + last_key: Vec::with_capacity(128), + len: 0, + _phantom_: PhantomData, + }) + } + + /// Inserts a `(key, value)` pair in the term dictionary. + /// + /// *Keys have to be inserted in order.* + fn insert>(&mut self, key_ref: K, value: &V) -> io::Result<()>{ + let key = key_ref.as_ref(); + self.insert_key(key)?; + self.insert_value(value) + } /// Finalize writing the builder, and returns the underlying /// `Write` object. - pub fn finish(mut self) -> io::Result { + fn finish(mut self) -> io::Result { self.add_index_entry(); let (mut w, split_len) = self.write.finish()?; let fst_write = self.block_index @@ -121,8 +125,6 @@ impl TermDictionaryBuilder { } - - fn open_fst_index(source: ReadOnlySource) -> io::Result { Ok(fst::Map::from(match source { ReadOnlySource::Anonymous(data) => try!(Fst::from_shared_bytes(data.data, data.start, data.len).map_err(convert_fst_error)), @@ -130,34 +132,15 @@ fn open_fst_index(source: ReadOnlySource) -> io::Result { })) } -/// Datastructure to access the `terms` of a segment. -pub struct TermDictionary where V: BinarySerializable + Default { +/// See [TermDictionary](./trait.TermDictionary.html) +pub struct TermDictionaryImpl where V: BinarySerializable + Default { stream_data: ReadOnlySource, fst_index: fst::Map, _phantom_: PhantomData, } -impl TermDictionary +impl TermDictionaryImpl where V: BinarySerializable + Default { - - /// Opens a `TermDictionary` given a data source. - pub fn from_source(source: ReadOnlySource) -> io::Result> { - let total_len = source.len(); - let length_offset = total_len - 8; - let split_len: usize = { - let mut split_len_buffer: &[u8] = &source.as_slice()[length_offset..]; - u64::deserialize(&mut split_len_buffer)? as usize - }; - let stream_data = source.slice(0, split_len); - let fst_data = source.slice(split_len, length_offset); - let fst_index = open_fst_index(fst_data)?; - - Ok(TermDictionary { - stream_data: stream_data, - fst_index: fst_index, - _phantom_: PhantomData - }) - } pub(crate) fn stream_data(&self) -> &[u8] { self.stream_data.as_slice() @@ -212,8 +195,37 @@ impl TermDictionary return (vec!(), 0); } +} + + +impl<'a, V> TermDictionary<'a, V> for TermDictionaryImpl + where V: BinarySerializable + Default + 'a { + + type Streamer = TermStreamerImpl<'a, V>; + + type StreamBuilder = TermStreamerBuilderImpl<'a, V>; + + /// Opens a `TermDictionary` given a data source. + fn from_source(source: ReadOnlySource) -> io::Result { + let total_len = source.len(); + let length_offset = total_len - 8; + let split_len: usize = { + let mut split_len_buffer: &[u8] = &source.as_slice()[length_offset..]; + u64::deserialize(&mut split_len_buffer)? as usize + }; + let stream_data = source.slice(0, split_len); + let fst_data = source.slice(split_len, length_offset); + let fst_index = open_fst_index(fst_data)?; + + Ok(TermDictionaryImpl { + stream_data: stream_data, + fst_index: fst_index, + _phantom_: PhantomData + }) + } + /// Lookups the value corresponding to the key. - pub fn get>(&self, target_key: K) -> Option { + fn get>(&self, target_key: K) -> Option { let mut streamer = stream_before(self, target_key.as_ref()); while streamer.advance() { let position = streamer.key().cmp(target_key.as_ref()); @@ -230,10 +242,9 @@ impl TermDictionary return None; } - /// Returns a range builder, to stream all of the terms /// within an interval. - pub fn range(&self) -> TermStreamerBuilder { - TermStreamerBuilder::new(self) + fn range(&'a self) -> Self::StreamBuilder { + Self::StreamBuilder::new(self) } -} +} \ No newline at end of file