mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-05-27 13:40:49 +00:00
Reorganized code and added documentation.
This commit is contained in:
@@ -1,7 +1,6 @@
|
||||
mod serialize;
|
||||
mod timer;
|
||||
mod vint;
|
||||
mod counting_writer;
|
||||
pub mod bitpacker;
|
||||
|
||||
pub use self::serialize::BinarySerializable;
|
||||
@@ -9,7 +8,6 @@ pub use self::timer::Timing;
|
||||
pub use self::timer::TimerTree;
|
||||
pub use self::timer::OpenTimer;
|
||||
pub use self::vint::VInt;
|
||||
pub use self::counting_writer::CountingWriter;
|
||||
|
||||
use std::io;
|
||||
|
||||
|
||||
@@ -34,7 +34,7 @@ impl Searcher {
|
||||
}
|
||||
|
||||
/// Returns the overall number of documents in the index.
|
||||
pub fn num_docs(&self) -> DocId {
|
||||
pub fn num_docs(&self) -> DocId {
|
||||
self.segment_readers
|
||||
.iter()
|
||||
.map(|segment_reader| segment_reader.num_docs())
|
||||
|
||||
@@ -12,9 +12,10 @@ use schema::Document;
|
||||
use directory::ReadOnlySource;
|
||||
use DocId;
|
||||
use std::str;
|
||||
use termdict::TermDictionary;
|
||||
use std::cmp;
|
||||
use postings::TermInfo;
|
||||
use termdict::TermDictionary;
|
||||
use termdict::TermDictionaryImpl;
|
||||
use std::sync::Arc;
|
||||
use std::fmt;
|
||||
use schema::Field;
|
||||
@@ -41,7 +42,7 @@ use postings::FreqHandler;
|
||||
pub struct SegmentReader {
|
||||
segment_id: SegmentId,
|
||||
segment_meta: SegmentMeta,
|
||||
terms: Arc<TermDictionary>,
|
||||
terms: Arc<TermDictionaryImpl>,
|
||||
postings_data: ReadOnlySource,
|
||||
store_reader: StoreReader,
|
||||
fast_fields_reader: Arc<FastFieldsReader>,
|
||||
@@ -133,7 +134,7 @@ impl SegmentReader {
|
||||
pub fn open(segment: Segment) -> Result<SegmentReader> {
|
||||
|
||||
let source = segment.open_read(SegmentComponent::TERMS)?;
|
||||
let terms = TermDictionary::from_source(source)?;
|
||||
let terms = TermDictionaryImpl::from_source(source)?;
|
||||
|
||||
let store_source = segment.open_read(SegmentComponent::STORE)?;
|
||||
let store_reader = StoreReader::from_source(store_source);
|
||||
@@ -173,7 +174,7 @@ impl SegmentReader {
|
||||
}
|
||||
|
||||
/// Return the term dictionary datastructure.
|
||||
pub fn terms(&self) -> &TermDictionary {
|
||||
pub fn terms(&self) -> &TermDictionaryImpl {
|
||||
&self.terms
|
||||
}
|
||||
|
||||
|
||||
@@ -18,6 +18,7 @@ use fastfield::FastFieldReader;
|
||||
use store::StoreWriter;
|
||||
use std::cmp::{min, max};
|
||||
use schema;
|
||||
use termdict::TermStreamer;
|
||||
use postings::SegmentPostingsOption;
|
||||
|
||||
pub struct IndexMerger {
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use Result;
|
||||
use termdict::TermDictionaryBuilder;
|
||||
use termdict::TermDictionaryBuilderImpl;
|
||||
use super::TermInfo;
|
||||
use schema::Field;
|
||||
use schema::FieldEntry;
|
||||
@@ -16,6 +16,7 @@ use std::io::Write;
|
||||
use compression::VIntEncoder;
|
||||
use common::VInt;
|
||||
use common::BinarySerializable;
|
||||
use termdict::TermDictionaryBuilder;
|
||||
|
||||
|
||||
/// `PostingsSerializer` is in charge of serializing
|
||||
@@ -50,7 +51,7 @@ use common::BinarySerializable;
|
||||
/// A description of the serialization format is
|
||||
/// [available here](https://fulmicoton.gitbooks.io/tantivy-doc/content/inverted-index.html).
|
||||
pub struct PostingsSerializer {
|
||||
terms_fst_builder: TermDictionaryBuilder<WritePtr, TermInfo>,
|
||||
terms_fst_builder: TermDictionaryBuilderImpl<WritePtr, TermInfo>,
|
||||
postings_write: WritePtr,
|
||||
positions_write: WritePtr,
|
||||
written_bytes_postings: usize,
|
||||
@@ -74,7 +75,7 @@ impl PostingsSerializer {
|
||||
positions_write: WritePtr,
|
||||
schema: Schema)
|
||||
-> Result<PostingsSerializer> {
|
||||
let terms_fst_builder = try!(TermDictionaryBuilder::new(terms_write));
|
||||
let terms_fst_builder = try!(TermDictionaryBuilderImpl::new(terms_write));
|
||||
Ok(PostingsSerializer {
|
||||
terms_fst_builder: terms_fst_builder,
|
||||
postings_write: postings_write,
|
||||
|
||||
@@ -17,10 +17,10 @@ Keys (`&[u8]`) in this datastructure are sorted.
|
||||
mod termdict;
|
||||
mod streamer;
|
||||
|
||||
pub use self::termdict::TermDictionary;
|
||||
pub use self::termdict::TermDictionaryBuilder;
|
||||
pub use self::streamer::TermStreamer;
|
||||
pub use self::streamer::TermStreamerBuilder;
|
||||
pub use self::termdict::TermDictionaryImpl;
|
||||
pub use self::termdict::TermDictionaryBuilderImpl;
|
||||
pub use self::streamer::TermStreamerImpl;
|
||||
pub use self::streamer::TermStreamerBuilderImpl;
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -1,48 +1,57 @@
|
||||
use fst::{IntoStreamer, Streamer};
|
||||
use fst::map::{StreamBuilder, Stream};
|
||||
use common::BinarySerializable;
|
||||
use super::TermDictionary;
|
||||
use super::TermDictionaryImpl;
|
||||
use termdict::{TermStreamerBuilder, TermStreamer};
|
||||
|
||||
/// `TermStreamerBuilder` is a helper object used to define
|
||||
/// a range of terms that should be streamed.
|
||||
pub struct TermStreamerBuilder<'a, V>
|
||||
/// See [TermStreamerBuilder](./trait.TermStreamerBuilder.html)
|
||||
pub struct TermStreamerBuilderImpl<'a, V>
|
||||
where V: 'a + BinarySerializable + Default
|
||||
{
|
||||
fst_map: &'a TermDictionary<V>,
|
||||
fst_map: &'a TermDictionaryImpl<V>,
|
||||
stream_builder: StreamBuilder<'a>,
|
||||
}
|
||||
|
||||
impl<'a, V> TermStreamerBuilder<'a, V>
|
||||
where V: 'a + BinarySerializable + Default
|
||||
{
|
||||
/// Limit the range to terms greater or equal to the bound
|
||||
pub fn ge<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
|
||||
impl<'a, V> TermStreamerBuilderImpl<'a, V>
|
||||
where V: 'a + BinarySerializable + Default {
|
||||
|
||||
pub(crate) fn new(fst_map: &'a TermDictionaryImpl<V>,
|
||||
stream_builder: StreamBuilder<'a>)
|
||||
-> Self {
|
||||
TermStreamerBuilderImpl {
|
||||
fst_map: fst_map,
|
||||
stream_builder: stream_builder,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, V> TermStreamerBuilder<V> for TermStreamerBuilderImpl<'a, V>
|
||||
where V: 'a + BinarySerializable + Default {
|
||||
|
||||
type Streamer = TermStreamerImpl<'a, V>;
|
||||
|
||||
fn ge<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
|
||||
self.stream_builder = self.stream_builder.ge(bound);
|
||||
self
|
||||
}
|
||||
|
||||
/// Limit the range to terms strictly greater than the bound
|
||||
pub fn gt<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
|
||||
fn gt<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
|
||||
self.stream_builder = self.stream_builder.gt(bound);
|
||||
self
|
||||
}
|
||||
|
||||
/// Limit the range to terms lesser or equal to the bound
|
||||
pub fn le<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
|
||||
fn le<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
|
||||
self.stream_builder = self.stream_builder.le(bound);
|
||||
self
|
||||
}
|
||||
|
||||
/// Limit the range to terms strictly lesser than the bound
|
||||
pub fn lt<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
|
||||
fn lt<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
|
||||
self.stream_builder = self.stream_builder.lt(bound);
|
||||
self
|
||||
}
|
||||
|
||||
/// Creates the stream corresponding to the range
|
||||
/// of terms defined using the `TermStreamerBuilder`.
|
||||
pub fn into_stream(self) -> TermStreamer<'a, V> {
|
||||
TermStreamer {
|
||||
fn into_stream(self) -> Self::Streamer {
|
||||
TermStreamerImpl {
|
||||
fst_map: self.fst_map,
|
||||
stream: self.stream_builder.into_stream(),
|
||||
offset: 0u64,
|
||||
@@ -50,41 +59,24 @@ impl<'a, V> TermStreamerBuilder<'a, V>
|
||||
current_value: V::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn new(fst_map: &'a TermDictionary<V>,
|
||||
stream_builder: StreamBuilder<'a>)
|
||||
-> TermStreamerBuilder<'a, V> {
|
||||
TermStreamerBuilder {
|
||||
fst_map: fst_map,
|
||||
stream_builder: stream_builder,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
/// `TermStreamer` acts as a cursor over a range of terms of a segment.
|
||||
/// Terms are guaranteed to be sorted.
|
||||
pub struct TermStreamer<'a, V>
|
||||
/// See [TermStreamer](./trait.TermStreamer.html)
|
||||
pub struct TermStreamerImpl<'a, V>
|
||||
where V: 'a + BinarySerializable + Default
|
||||
{
|
||||
fst_map: &'a TermDictionary<V>,
|
||||
fst_map: &'a TermDictionaryImpl<V>,
|
||||
stream: Stream<'a>,
|
||||
offset: u64,
|
||||
current_key: Vec<u8>,
|
||||
current_value: V,
|
||||
}
|
||||
|
||||
|
||||
|
||||
impl<'a, V> TermStreamer<'a, V>
|
||||
where V: 'a + BinarySerializable + Default
|
||||
{
|
||||
/// Advance the position of the stream to the next item.
|
||||
/// Before the first call to `.advance()`, the stream
|
||||
/// is in an uninitialized state.
|
||||
pub fn advance(&mut self) -> bool {
|
||||
impl<'a, V> TermStreamer<V> for TermStreamerImpl<'a, V>
|
||||
where V: BinarySerializable + Default {
|
||||
|
||||
fn advance(&mut self) -> bool {
|
||||
if let Some((term, offset)) = self.stream.next() {
|
||||
self.current_key.clear();
|
||||
self.current_key.extend_from_slice(term);
|
||||
@@ -98,30 +90,11 @@ impl<'a, V> TermStreamer<'a, V>
|
||||
}
|
||||
}
|
||||
|
||||
/// Accesses the current key.
|
||||
///
|
||||
/// `.key()` should return the key that was returned
|
||||
/// by the `.next()` method.
|
||||
///
|
||||
/// If the end of the stream has been reached, and `.next()`
|
||||
/// has been called and returned `None`, `.key()` remains
|
||||
/// the value of the last key encountered.
|
||||
///
|
||||
/// Before any call to `.next()`, `.key()` returns an empty array.
|
||||
pub fn key(&self) -> &[u8] {
|
||||
fn key(&self) -> &[u8] {
|
||||
&self.current_key
|
||||
}
|
||||
|
||||
/// Accesses the current value.
|
||||
///
|
||||
/// Calling `.value()` after the end of the stream will return the
|
||||
/// last `.value()` encountered.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Calling `.value()` before the first call to `.advance()` returns
|
||||
/// `V::default()`.
|
||||
pub fn value(&self) -> &V {
|
||||
fn value(&self) -> &V {
|
||||
&self.current_value
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,22 +1,19 @@
|
||||
use std::io::{self, Write};
|
||||
use fst;
|
||||
use fst::raw::Fst;
|
||||
use super::TermStreamerBuilder;
|
||||
use directory::ReadOnlySource;
|
||||
use common::BinarySerializable;
|
||||
use std::marker::PhantomData;
|
||||
use postings::TermInfo;
|
||||
|
||||
use termdict::{TermDictionary, TermDictionaryBuilder};
|
||||
use super::{TermStreamerImpl, TermStreamerBuilderImpl};
|
||||
|
||||
fn convert_fst_error(e: fst::Error) -> io::Error {
|
||||
io::Error::new(io::ErrorKind::Other, e)
|
||||
}
|
||||
|
||||
|
||||
/// Builder for the new term dictionary.
|
||||
///
|
||||
/// Just like for the fst crate, all terms must be inserted in order.
|
||||
pub struct TermDictionaryBuilder<W, V = TermInfo>
|
||||
/// See [TermDictionaryBuilder](./trait.TermDictionaryBuilder.html)
|
||||
pub struct TermDictionaryBuilderImpl<W, V = TermInfo>
|
||||
where W: Write, V: BinarySerializable + Default
|
||||
{
|
||||
fst_builder: fst::MapBuilder<W>,
|
||||
@@ -24,18 +21,9 @@ pub struct TermDictionaryBuilder<W, V = TermInfo>
|
||||
_phantom_: PhantomData<V>,
|
||||
}
|
||||
|
||||
impl<W, V> TermDictionaryBuilder<W, V>
|
||||
impl<W, V> TermDictionaryBuilderImpl<W, V>
|
||||
where W: Write, V: BinarySerializable + Default {
|
||||
/// Creates a new `TermDictionaryBuilder`
|
||||
pub fn new(w: W) -> io::Result<TermDictionaryBuilder<W, V>> {
|
||||
let fst_builder = fst::MapBuilder::new(w).map_err(convert_fst_error)?;
|
||||
Ok(TermDictionaryBuilder {
|
||||
fst_builder: fst_builder,
|
||||
data: Vec::new(),
|
||||
_phantom_: PhantomData,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
/// # Warning
|
||||
/// Horribly dangerous internal API
|
||||
///
|
||||
@@ -58,10 +46,22 @@ impl<W, V> TermDictionaryBuilder<W, V>
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Inserts a `(key, value)` pair in the term dictionary.
|
||||
///
|
||||
/// *Keys have to be inserted in order.*
|
||||
pub fn insert(&mut self, key: &[u8], value: &V) -> io::Result<()> {
|
||||
}
|
||||
|
||||
impl<W, V> TermDictionaryBuilder<W, V> for TermDictionaryBuilderImpl<W, V>
|
||||
where W: Write, V: BinarySerializable + Default {
|
||||
|
||||
fn new(w: W) -> io::Result<Self> {
|
||||
let fst_builder = fst::MapBuilder::new(w).map_err(convert_fst_error)?;
|
||||
Ok(TermDictionaryBuilderImpl {
|
||||
fst_builder: fst_builder,
|
||||
data: Vec::new(),
|
||||
_phantom_: PhantomData,
|
||||
})
|
||||
}
|
||||
|
||||
fn insert<K: AsRef<[u8]>>(&mut self, key_ref: K, value: &V) -> io::Result<()> {
|
||||
let key = key_ref.as_ref();
|
||||
self.fst_builder
|
||||
.insert(key, self.data.len() as u64)
|
||||
.map_err(convert_fst_error)?;
|
||||
@@ -69,9 +69,7 @@ impl<W, V> TermDictionaryBuilder<W, V>
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Finalize writing the builder, and returns the underlying
|
||||
/// `Write` object.
|
||||
pub fn finish(self) -> io::Result<W> {
|
||||
fn finish(self) -> io::Result<W> {
|
||||
let mut file = self.fst_builder.into_inner().map_err(convert_fst_error)?;
|
||||
let footer_size = self.data.len() as u32;
|
||||
file.write_all(&self.data)?;
|
||||
@@ -81,7 +79,6 @@ impl<W, V> TermDictionaryBuilder<W, V>
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
fn open_fst_index(source: ReadOnlySource) -> io::Result<fst::Map> {
|
||||
let fst = match source {
|
||||
ReadOnlySource::Anonymous(data) => {
|
||||
@@ -95,20 +92,35 @@ fn open_fst_index(source: ReadOnlySource) -> io::Result<fst::Map> {
|
||||
Ok(fst::Map::from(fst))
|
||||
}
|
||||
|
||||
/// Datastructure to access the `terms` of a segment.
|
||||
pub struct TermDictionary<V = TermInfo>
|
||||
/// See [TermDictionary](./trait.TermDictionary.html)
|
||||
pub struct TermDictionaryImpl<V = TermInfo>
|
||||
where V: BinarySerializable + Default
|
||||
{
|
||||
{
|
||||
fst_index: fst::Map,
|
||||
values_mmap: ReadOnlySource,
|
||||
_phantom_: PhantomData<V>,
|
||||
}
|
||||
|
||||
impl<V> TermDictionary<V>
|
||||
impl<V> TermDictionaryImpl<V>
|
||||
where V: BinarySerializable + Default
|
||||
{
|
||||
/// Opens a `TermDictionary` given a data source.
|
||||
pub fn from_source(source: ReadOnlySource) -> io::Result<TermDictionary<V>> {
|
||||
/// Deserialize and returns the value at address `offset`
|
||||
pub(crate) fn read_value(&self, offset: u64) -> io::Result<V> {
|
||||
let buffer = self.values_mmap.as_slice();
|
||||
let mut cursor = &buffer[(offset as usize)..];
|
||||
V::deserialize(&mut cursor)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
impl<'a, V> TermDictionary<'a, V> for TermDictionaryImpl<V>
|
||||
where V: BinarySerializable + Default + 'a {
|
||||
|
||||
type Streamer = TermStreamerImpl<'a, V>;
|
||||
|
||||
type StreamBuilder = TermStreamerBuilderImpl<'a, V>;
|
||||
|
||||
fn from_source(source: ReadOnlySource) -> io::Result<Self> {
|
||||
let total_len = source.len();
|
||||
let length_offset = total_len - 4;
|
||||
let mut split_len_buffer: &[u8] = &source.as_slice()[length_offset..];
|
||||
@@ -117,22 +129,14 @@ impl<V> TermDictionary<V>
|
||||
let fst_source = source.slice(0, split_len);
|
||||
let values_source = source.slice(split_len, length_offset);
|
||||
let fst_index = open_fst_index(fst_source)?;
|
||||
Ok(TermDictionary {
|
||||
Ok(TermDictionaryImpl {
|
||||
fst_index: fst_index,
|
||||
values_mmap: values_source,
|
||||
_phantom_: PhantomData,
|
||||
})
|
||||
}
|
||||
|
||||
/// Deserialize and returns the value at address `offset`
|
||||
pub(crate) fn read_value(&self, offset: u64) -> io::Result<V> {
|
||||
let buffer = self.values_mmap.as_slice();
|
||||
let mut cursor = &buffer[(offset as usize)..];
|
||||
V::deserialize(&mut cursor)
|
||||
}
|
||||
|
||||
/// Looks up the value corresponding to the key.
|
||||
pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Option<V> {
|
||||
fn get<K: AsRef<[u8]>>(&self, key: K) -> Option<V> {
|
||||
self.fst_index
|
||||
.get(key)
|
||||
.map(|offset| {
|
||||
@@ -141,9 +145,7 @@ impl<V> TermDictionary<V>
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns a range builder, to stream all of the terms
|
||||
/// within an interval.
|
||||
pub fn range(&self) -> TermStreamerBuilder<V> {
|
||||
TermStreamerBuilder::new(self, self.fst_index.range())
|
||||
fn range(&self) -> TermStreamerBuilderImpl<V> {
|
||||
TermStreamerBuilderImpl::new(self, self.fst_index.range())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,15 +1,17 @@
|
||||
use std::collections::BinaryHeap;
|
||||
use core::SegmentReader;
|
||||
use termdict::TermStreamer;
|
||||
use termdict::TermStreamerImpl;
|
||||
use common::BinarySerializable;
|
||||
use postings::TermInfo;
|
||||
use std::cmp::Ordering;
|
||||
use termdict::TermStreamer;
|
||||
use termdict::TermDictionary;
|
||||
use fst::Streamer;
|
||||
|
||||
pub struct HeapItem<'a, V>
|
||||
where V: 'a + BinarySerializable + Default
|
||||
{
|
||||
pub streamer: TermStreamer<'a, V>,
|
||||
pub streamer: TermStreamerImpl<'a, V>,
|
||||
pub segment_ord: usize,
|
||||
}
|
||||
|
||||
@@ -56,7 +58,7 @@ pub struct TermMerger<'a, V>
|
||||
impl<'a, V> TermMerger<'a, V>
|
||||
where V: 'a + BinarySerializable + Default
|
||||
{
|
||||
fn new(streams: Vec<TermStreamer<'a, V>>) -> TermMerger<'a, V> {
|
||||
fn new(streams: Vec<TermStreamerImpl<'a, V>>) -> TermMerger<'a, V> {
|
||||
TermMerger {
|
||||
heap: BinaryHeap::new(),
|
||||
current_streamers: streams
|
||||
|
||||
@@ -1,45 +1,131 @@
|
||||
/*!
|
||||
The term dictionary contains all of the terms in
|
||||
`tantivy index` in a sorted manner.
|
||||
The term dictionary is one of the key datastructures of
|
||||
tantivy. It associates sorted `terms` to their respective
|
||||
posting list.
|
||||
|
||||
It is implemented as a wrapper of the `Fst` crate in order
|
||||
to add a value type.
|
||||
The term dictionary makes it possible to iterate through
|
||||
the keys in a sorted manner.
|
||||
|
||||
A finite state transducer itself associates
|
||||
each term `&[u8]` to a `u64` that is in fact an address
|
||||
in a buffer. The value is then accessible via
|
||||
deserializing the value at this address.
|
||||
# Example
|
||||
|
||||
Keys (`&[u8]`) in this datastructure are sorted.
|
||||
```
|
||||
extern crate tantivy;
|
||||
use tantivy::termdict::*;
|
||||
use tantivy::directory::ReadOnlySource;
|
||||
|
||||
# fn main() {
|
||||
# run().expect("Test failed");
|
||||
# }
|
||||
# fn run() -> tantivy::Result<()> {
|
||||
let mut term_dictionary_builder = TermDictionaryBuilderImpl::new(vec!())?;
|
||||
|
||||
// keys have to be inserted in order.
|
||||
term_dictionary_builder.insert("apple", &1u32)?;
|
||||
term_dictionary_builder.insert("grape", &2u32)?;
|
||||
term_dictionary_builder.insert("pear", &3u32)?;
|
||||
let buffer: Vec<u8> = term_dictionary_builder.finish()?;
|
||||
|
||||
let source = ReadOnlySource::from(buffer);
|
||||
let term_dictionary = TermDictionaryImpl::from_source(source)?;
|
||||
|
||||
assert_eq!(term_dictionary.get("grape"), Some(2u32));
|
||||
# Ok(())
|
||||
# }
|
||||
```
|
||||
|
||||
|
||||
# Implementations
|
||||
|
||||
There are currently two implementations of the term dictionary.
|
||||
|
||||
## Default implementation : `fstdict`
|
||||
|
||||
The default one relies heavily on the `fst` crate.
|
||||
It associates each term's `&[u8]` representation with a `u64`
|
||||
that is in fact an address in a buffer. The value is then accessible
|
||||
via deserializing the value at this address.
|
||||
|
||||
|
||||
## Stream implementation : `streamdict`
|
||||
|
||||
The `fstdict` is a tiny bit slow when streaming all of
|
||||
the terms.
|
||||
For some use cases (e.g. an analytics engine), it is preferable
|
||||
to use the `streamdict`, that offers better streaming
|
||||
performance, to the detriment of `lookup` performance.
|
||||
|
||||
`streamdict` can be enabled by adding the `streamdict`
|
||||
feature when compiling `tantivy`.
|
||||
|
||||
`streamdict` encodes each term relative to the previous one
|
||||
as follows.
|
||||
|
||||
- number of bytes that needs to be popped.
|
||||
- number of bytes that needs to be added.
|
||||
- sequence of bytes that is to be added
|
||||
- value.
|
||||
|
||||
Because such a structure does not allow for lookups,
|
||||
it comes with a `fst` that indexes 1 out of `1024`
|
||||
terms in this structure.
|
||||
|
||||
A `lookup` therefore consists in a lookup in the `fst`
|
||||
followed by a streaming through at most `1024` elements in the
|
||||
term `stream`.
|
||||
*/
|
||||
|
||||
use schema::{Field, Term};
|
||||
use common::BinarySerializable;
|
||||
use fst;
|
||||
|
||||
#[cfg(not(feature="streamdict"))]
|
||||
use directory::ReadOnlySource;
|
||||
mod fstdict;
|
||||
#[cfg(not(feature="streamdict"))]
|
||||
pub use self::fstdict::{TermDictionary, TermDictionaryBuilder, TermStreamer, TermStreamerBuilder};
|
||||
|
||||
#[cfg(feature="streamdict")]
|
||||
mod streamdict;
|
||||
#[cfg(feature="streamdict")]
|
||||
pub use self::streamdict::{TermDictionary, TermDictionaryBuilder, TermStreamer, TermStreamerBuilder};
|
||||
|
||||
mod merger;
|
||||
pub use self::merger::TermMerger;
|
||||
|
||||
impl<V> TermDictionary<V>
|
||||
where V: BinarySerializable + Default {
|
||||
|
||||
|
||||
#[cfg(not(feature="streamdict"))]
|
||||
mod defaultimpl {
|
||||
pub use super::fstdict::{TermDictionaryImpl, TermDictionaryBuilderImpl, TermStreamerImpl, TermStreamerBuilderImpl};
|
||||
}
|
||||
|
||||
#[cfg(feature="streamdict")]
|
||||
mod defaultimpl {
|
||||
pub use super::streamdict::{TermDictionaryImpl, TermDictionaryBuilderImpl, TermStreamerImpl, TermStreamerBuilderImpl};
|
||||
}
|
||||
|
||||
|
||||
pub use self::defaultimpl::{TermDictionaryImpl, TermDictionaryBuilderImpl, TermStreamerImpl, TermStreamerBuilderImpl};
|
||||
|
||||
mod merger;
|
||||
use std::io;
|
||||
|
||||
|
||||
/// Dictionary associating sorted `&[u8]` to values
|
||||
pub trait TermDictionary<'a, V>
|
||||
where V: BinarySerializable + Default + 'a , Self: Sized {
|
||||
|
||||
/// Streamer type associated to the term dictionary
|
||||
type Streamer: TermStreamer<V> + 'a;
|
||||
|
||||
/// StreamerBuilder type associated to the term dictionary
|
||||
type StreamBuilder: TermStreamerBuilder<V, Streamer=Self::Streamer> + 'a;
|
||||
|
||||
/// Opens a `TermDictionary` given a data source.
|
||||
fn from_source(source: ReadOnlySource) -> io::Result<Self>;
|
||||
|
||||
/// Looks up the value corresponding to the key.
|
||||
fn get<K: AsRef<[u8]>>(&self, target_key: K) -> Option<V>;
|
||||
|
||||
/// Returns a range builder, to stream all of the terms
|
||||
/// within an interval.
|
||||
fn range(&'a self) -> Self::StreamBuilder;
|
||||
|
||||
/// A stream of all the sorted terms. [See also `.stream_field()`](#method.stream_field)
|
||||
pub fn stream(&self) -> TermStreamer<V> {
|
||||
fn stream(&'a self) -> Self::Streamer {
|
||||
self.range().into_stream()
|
||||
}
|
||||
|
||||
/// A stream of all the sorted terms in the given field.
|
||||
pub fn stream_field(&self, field: Field) -> TermStreamer<V> {
|
||||
fn stream_field(&'a self, field: Field) -> Self::Streamer {
|
||||
let start_term = Term::from_field_text(field, "");
|
||||
let stop_term = Term::from_field_text(Field(field.0 + 1), "");
|
||||
self.range()
|
||||
@@ -47,15 +133,62 @@ impl<V> TermDictionary<V>
|
||||
.lt(stop_term.as_slice())
|
||||
.into_stream()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
impl<'a, 'b, V: 'b> fst::Streamer<'b> for TermStreamer<'a, V>
|
||||
where V: 'a + BinarySerializable + Default
|
||||
{
|
||||
type Item = (&'b [u8], &'b V);
|
||||
/// Builder for the new term dictionary.
|
||||
///
|
||||
/// Inserting must be done in the order of the `keys`.
|
||||
pub trait TermDictionaryBuilder<W, V>: Sized
|
||||
where W: io::Write, V: BinarySerializable + Default {
|
||||
|
||||
fn next(&'b mut self) -> Option<(&'b [u8], &V)> {
|
||||
/// Creates a new `TermDictionaryBuilder`
|
||||
fn new(write: W) -> io::Result<Self>;
|
||||
|
||||
/// Inserts a `(key, value)` pair in the term dictionary.
|
||||
///
|
||||
/// *Keys have to be inserted in order.*
|
||||
fn insert<K: AsRef<[u8]>>(&mut self, key: K, value: &V) -> io::Result<()>;
|
||||
|
||||
/// Finalize writing the builder, and returns the underlying
|
||||
/// `Write` object.
|
||||
fn finish(self) -> io::Result<W>;
|
||||
}
|
||||
|
||||
|
||||
/// `TermStreamer` acts as a cursor over a range of terms of a segment.
|
||||
/// Terms are guaranteed to be sorted.
|
||||
pub trait TermStreamer<V>: Sized {
|
||||
|
||||
/// Advance the position of the stream to the next item.
|
||||
/// Before the first call to `.advance()`, the stream
|
||||
/// is in an uninitialized state.
|
||||
fn advance(&mut self) -> bool;
|
||||
|
||||
/// Accesses the current key.
|
||||
///
|
||||
/// `.key()` should return the key that was returned
|
||||
/// by the `.next()` method.
|
||||
///
|
||||
/// If the end of the stream has been reached, and `.next()`
|
||||
/// has been called and returned `None`, `.key()` remains
|
||||
/// the value of the last key encountered.
|
||||
///
|
||||
/// Before any call to `.next()`, `.key()` returns an empty array.
|
||||
fn key(&self) -> &[u8];
|
||||
|
||||
/// Accesses the current value.
|
||||
///
|
||||
/// Calling `.value()` after the end of the stream will return the
|
||||
/// last `.value()` encountered.
|
||||
///
|
||||
/// # Note
|
||||
///
|
||||
/// Calling `.value()` before the first call to `.advance()` returns
|
||||
/// `V::default()`.
|
||||
fn value(&self) -> &V;
|
||||
|
||||
/// Return the next `(key, value)` pair.
|
||||
fn next<'b>(&'b mut self) -> Option<(&'b [u8], &'b V)> {
|
||||
if self.advance() {
|
||||
Some((self.key(), self.value()))
|
||||
} else {
|
||||
@@ -64,17 +197,46 @@ impl<'a, 'b, V: 'b> fst::Streamer<'b> for TermStreamer<'a, V>
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// `TermStreamerBuilder` is a helper object used to define
|
||||
/// a range of terms that should be streamed.
|
||||
pub trait TermStreamerBuilder<V> where V: BinarySerializable + Default {
|
||||
|
||||
/// Associated `TermStreamer` type that this builder is building.
|
||||
type Streamer: TermStreamer<V>;
|
||||
|
||||
/// Limit the range to terms greater or equal to the bound
|
||||
fn ge<T: AsRef<[u8]>>(self, bound: T) -> Self;
|
||||
|
||||
/// Limit the range to terms strictly greater than the bound
|
||||
fn gt<T: AsRef<[u8]>>(self, bound: T) -> Self;
|
||||
|
||||
/// Limit the range to terms strictly lesser than the bound
|
||||
fn lt<T: AsRef<[u8]>>(self, bound: T) -> Self;
|
||||
|
||||
/// Limit the range to terms lesser or equal to the bound
|
||||
fn le<T: AsRef<[u8]>>(self, bound: T) -> Self;
|
||||
|
||||
/// Creates the stream corresponding to the range
|
||||
/// of terms defined using the `TermStreamerBuilder`.
|
||||
fn into_stream(self) -> Self::Streamer;
|
||||
}
|
||||
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{TermDictionary, TermDictionaryBuilder, TermStreamer};
|
||||
use super::{TermDictionaryImpl, TermDictionaryBuilderImpl, TermStreamerImpl};
|
||||
use directory::{RAMDirectory, Directory, ReadOnlySource};
|
||||
use std::path::PathBuf;
|
||||
use fst::Streamer;
|
||||
use schema::{Term, SchemaBuilder, Document, TEXT};
|
||||
use core::Index;
|
||||
use std::str;
|
||||
use termdict::TermStreamer;
|
||||
use termdict::TermStreamerBuilder;
|
||||
use termdict::TermDictionary;
|
||||
use termdict::TermDictionaryBuilder;
|
||||
const BLOCK_SIZE: usize = 1_500;
|
||||
|
||||
|
||||
|
||||
#[test]
|
||||
fn test_term_dictionary() {
|
||||
@@ -82,7 +244,7 @@ mod tests {
|
||||
let path = PathBuf::from("TermDictionary");
|
||||
{
|
||||
let write = directory.open_write(&path).unwrap();
|
||||
let mut term_dictionary_builder = TermDictionaryBuilder::new(write).unwrap();
|
||||
let mut term_dictionary_builder = TermDictionaryBuilderImpl::new(write).unwrap();
|
||||
term_dictionary_builder
|
||||
.insert("abc".as_bytes(), &34u32)
|
||||
.unwrap();
|
||||
@@ -92,7 +254,7 @@ mod tests {
|
||||
term_dictionary_builder.finish().unwrap();
|
||||
}
|
||||
let source = directory.open_read(&path).unwrap();
|
||||
let term_dict: TermDictionary<u32> = TermDictionary::from_source(source).unwrap();
|
||||
let term_dict: TermDictionaryImpl<u32> = TermDictionaryImpl::from_source(source).unwrap();
|
||||
assert_eq!(term_dict.get("abc"), Some(34u32));
|
||||
assert_eq!(term_dict.get("abcd"), Some(346u32));
|
||||
let mut stream = term_dict.stream();
|
||||
@@ -158,14 +320,14 @@ mod tests {
|
||||
.map(|i| (format!("doc{:0>6}", i), i))
|
||||
.collect();
|
||||
let buffer: Vec<u8> = {
|
||||
let mut term_dictionary_builder = TermDictionaryBuilder::new(vec!()).unwrap();
|
||||
let mut term_dictionary_builder = TermDictionaryBuilderImpl::new(vec!()).unwrap();
|
||||
for &(ref id, ref i) in &ids {
|
||||
term_dictionary_builder.insert(id.as_bytes(), i).unwrap();
|
||||
}
|
||||
term_dictionary_builder.finish().unwrap()
|
||||
};
|
||||
let source = ReadOnlySource::from(buffer);
|
||||
let term_dictionary: TermDictionary<u32> = TermDictionary::from_source(source).unwrap();
|
||||
let term_dictionary: TermDictionaryImpl<u32> = TermDictionaryImpl::from_source(source).unwrap();
|
||||
{
|
||||
let mut streamer = term_dictionary.stream();
|
||||
let mut i = 0;
|
||||
@@ -187,7 +349,7 @@ mod tests {
|
||||
.map(|i| (format!("doc{:0>6}", i), i))
|
||||
.collect();
|
||||
let buffer: Vec<u8> = {
|
||||
let mut term_dictionary_builder = TermDictionaryBuilder::new(vec!()).unwrap();
|
||||
let mut term_dictionary_builder = TermDictionaryBuilderImpl::new(vec!()).unwrap();
|
||||
for &(ref id, ref i) in &ids {
|
||||
term_dictionary_builder.insert(id.as_bytes(), i).unwrap();
|
||||
}
|
||||
@@ -196,7 +358,7 @@ mod tests {
|
||||
|
||||
let source = ReadOnlySource::from(buffer);
|
||||
|
||||
let term_dictionary: TermDictionary<u32> = TermDictionary::from_source(source).unwrap();
|
||||
let term_dictionary: TermDictionaryImpl<u32> = TermDictionaryImpl::from_source(source).unwrap();
|
||||
{
|
||||
for i in (0..20).chain(6000..8_000) {
|
||||
let &(ref target_key, _) = &ids[i];
|
||||
@@ -254,7 +416,7 @@ mod tests {
|
||||
#[test]
|
||||
fn test_stream_range_boundaries() {
|
||||
let buffer: Vec<u8> = {
|
||||
let mut term_dictionary_builder = TermDictionaryBuilder::new(vec!()).unwrap();
|
||||
let mut term_dictionary_builder = TermDictionaryBuilderImpl::new(vec!()).unwrap();
|
||||
for i in 0u8..10u8 {
|
||||
let number_arr = [i; 1];
|
||||
term_dictionary_builder.insert(&number_arr, &i).unwrap();
|
||||
@@ -262,9 +424,9 @@ mod tests {
|
||||
term_dictionary_builder.finish().unwrap()
|
||||
};
|
||||
let source = ReadOnlySource::from(buffer);
|
||||
let term_dictionary: TermDictionary<u8> = TermDictionary::from_source(source).unwrap();
|
||||
let term_dictionary: TermDictionaryImpl<u8> = TermDictionaryImpl::from_source(source).unwrap();
|
||||
|
||||
let value_list = |mut streamer: TermStreamer<u8>| {
|
||||
let value_list = |mut streamer: TermStreamerImpl<u8>| {
|
||||
let mut res: Vec<u8> = vec!();
|
||||
while let Some((_, &v)) = streamer.next() {
|
||||
res.push(v);
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
|
||||
mod termdict;
|
||||
mod streamer;
|
||||
mod counting_writer;
|
||||
|
||||
pub use self::termdict::TermDictionary;
|
||||
pub use self::termdict::TermDictionaryBuilder;
|
||||
pub use self::streamer::TermStreamer;
|
||||
pub use self::streamer::TermStreamerBuilder;
|
||||
use self::counting_writer::CountingWriter;
|
||||
pub use self::termdict::TermDictionaryImpl;
|
||||
pub use self::termdict::TermDictionaryBuilderImpl;
|
||||
pub use self::streamer::TermStreamerImpl;
|
||||
pub use self::streamer::TermStreamerBuilderImpl;
|
||||
|
||||
|
||||
@@ -4,36 +4,95 @@ use std::cmp::max;
|
||||
use std::io::Read;
|
||||
use common::VInt;
|
||||
use common::BinarySerializable;
|
||||
use super::TermDictionary;
|
||||
use fst::Streamer;
|
||||
use super::TermDictionaryImpl;
|
||||
use termdict::{TermStreamerBuilder, TermStreamer};
|
||||
|
||||
pub(crate) fn stream_before<'a, V>(term_dictionary: &'a TermDictionary<V>, target_key: &[u8]) -> TermStreamer<'a, V>
|
||||
pub(crate) fn stream_before<'a, V>(term_dictionary: &'a TermDictionaryImpl<V>, target_key: &[u8]) -> TermStreamerImpl<'a, V>
|
||||
where V: 'a + BinarySerializable + Default {
|
||||
let (prev_key, offset) = term_dictionary.strictly_previous_key(target_key.as_ref());
|
||||
let offset: usize = offset as usize;
|
||||
TermStreamer {
|
||||
TermStreamerImpl {
|
||||
cursor: &term_dictionary.stream_data()[offset..],
|
||||
current_key: Vec::from(prev_key),
|
||||
current_value: V::default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// `TermStreamerBuilder` is a helper object used to define
|
||||
/// a range of terms that should be streamed.
|
||||
pub struct TermStreamerBuilder<'a, V>
|
||||
/// See [TermStreamerBuilder](./trait.TermStreamerBuilder.html)
|
||||
pub struct TermStreamerBuilderImpl<'a, V>
|
||||
where V: 'a + BinarySerializable + Default {
|
||||
term_dictionary: &'a TermDictionary<V>,
|
||||
term_dictionary: &'a TermDictionaryImpl<V>,
|
||||
origin: usize,
|
||||
offset_from: usize,
|
||||
offset_to: usize,
|
||||
current_key: Vec<u8>,
|
||||
}
|
||||
|
||||
impl<'a, V> TermStreamerBuilder<V> for TermStreamerBuilderImpl<'a, V>
|
||||
where V: 'a + BinarySerializable + Default {
|
||||
|
||||
type Streamer = TermStreamerImpl<'a, V>;
|
||||
|
||||
/// Limit the range to terms greater or equal to the bound
|
||||
fn ge<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
|
||||
let target_key = bound.as_ref();
|
||||
let streamer = stream_before(&self.term_dictionary, target_key.as_ref());
|
||||
let smaller_than = |k: &[u8]| { k.lt(target_key) };
|
||||
let (offset_before, current_key) = get_offset(smaller_than, streamer);
|
||||
self.current_key = current_key;
|
||||
self.offset_from = offset_before - self.origin;
|
||||
self
|
||||
}
|
||||
|
||||
/// Limit the range to terms strictly greater than the bound
|
||||
fn gt<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
|
||||
let target_key = bound.as_ref();
|
||||
let streamer = stream_before(self.term_dictionary, target_key.as_ref());
|
||||
let smaller_than = |k: &[u8]| { k.le(target_key) };
|
||||
let (offset_before, current_key) = get_offset(smaller_than, streamer);
|
||||
self.current_key = current_key;
|
||||
self.offset_from = offset_before - self.origin;
|
||||
self
|
||||
}
|
||||
|
||||
/// Limit the range to terms strictly lesser than the bound
|
||||
fn lt<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
|
||||
let target_key = bound.as_ref();
|
||||
let streamer = stream_before(self.term_dictionary, target_key.as_ref());
|
||||
let smaller_than = |k: &[u8]| { k.lt(target_key) };
|
||||
let (offset_before, _) = get_offset(smaller_than, streamer);
|
||||
self.offset_to = offset_before - self.origin;
|
||||
self
|
||||
}
|
||||
|
||||
/// Limit the range to terms lesser or equal to the bound
|
||||
fn le<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
|
||||
let target_key = bound.as_ref();
|
||||
let streamer = stream_before(self.term_dictionary, target_key.as_ref());
|
||||
let smaller_than = |k: &[u8]| { k.le(target_key) };
|
||||
let (offset_before, _) = get_offset(smaller_than, streamer);
|
||||
self.offset_to = offset_before - self.origin;
|
||||
self
|
||||
}
|
||||
|
||||
/// Build the streamer.
|
||||
fn into_stream(self) -> Self::Streamer {
|
||||
let data: &[u8] = self.term_dictionary.stream_data();
|
||||
let start = self.offset_from;
|
||||
let stop = max(self.offset_to, start);
|
||||
TermStreamerImpl {
|
||||
cursor: &data[start..stop],
|
||||
current_key: self.current_key,
|
||||
current_value: V::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns offset information for the first
|
||||
/// key in the stream matching a given predicate.
|
||||
///
|
||||
/// returns (start offset, the data required to load the value)
|
||||
fn get_offset<'a, V, P: Fn(&[u8])->bool>(predicate: P, mut streamer: TermStreamer<V>) -> (usize, Vec<u8>)
|
||||
fn get_offset<'a, V, P: Fn(&[u8])->bool>(predicate: P, mut streamer: TermStreamerImpl<V>) -> (usize, Vec<u8>)
|
||||
where V: 'a + BinarySerializable + Default {
|
||||
let mut prev: &[u8] = streamer.cursor;
|
||||
|
||||
@@ -50,55 +109,14 @@ fn get_offset<'a, V, P: Fn(&[u8])->bool>(predicate: P, mut streamer: TermStreame
|
||||
return (prev.as_ptr() as usize, prev_data);
|
||||
}
|
||||
|
||||
impl<'a, V> TermStreamerBuilder<'a, V>
|
||||
impl<'a, V> TermStreamerBuilderImpl<'a, V>
|
||||
where V: 'a + BinarySerializable + Default {
|
||||
|
||||
/// Limit the range to terms greater or equal to the bound
|
||||
pub fn ge<T: AsRef<[u8]>>(mut self, bound: T) -> TermStreamerBuilder<'a, V> {
|
||||
let target_key = bound.as_ref();
|
||||
let streamer = stream_before(&self.term_dictionary, target_key.as_ref());
|
||||
let smaller_than = |k: &[u8]| { k.lt(target_key) };
|
||||
let (offset_before, current_key) = get_offset(smaller_than, streamer);
|
||||
self.current_key = current_key;
|
||||
self.offset_from = offset_before - self.origin;
|
||||
self
|
||||
}
|
||||
|
||||
/// Limit the range to terms strictly greater than the bound
|
||||
pub fn gt<T: AsRef<[u8]>>(mut self, bound: T) -> TermStreamerBuilder<'a, V> {
|
||||
let target_key = bound.as_ref();
|
||||
let streamer = stream_before(self.term_dictionary, target_key.as_ref());
|
||||
let smaller_than = |k: &[u8]| { k.le(target_key) };
|
||||
let (offset_before, current_key) = get_offset(smaller_than, streamer);
|
||||
self.current_key = current_key;
|
||||
self.offset_from = offset_before - self.origin;
|
||||
self
|
||||
}
|
||||
|
||||
/// Limit the range to terms strictly lesser than the bound
|
||||
pub fn lt<T: AsRef<[u8]>>(mut self, bound: T) -> TermStreamerBuilder<'a, V> {
|
||||
let target_key = bound.as_ref();
|
||||
let streamer = stream_before(self.term_dictionary, target_key.as_ref());
|
||||
let smaller_than = |k: &[u8]| { k.lt(target_key) };
|
||||
let (offset_before, _) = get_offset(smaller_than, streamer);
|
||||
self.offset_to = offset_before - self.origin;
|
||||
self
|
||||
}
|
||||
|
||||
/// Limit the range to terms lesser or equal to the bound
|
||||
pub fn le<T: AsRef<[u8]>>(mut self, bound: T) -> TermStreamerBuilder<'a, V> {
|
||||
let target_key = bound.as_ref();
|
||||
let streamer = stream_before(self.term_dictionary, target_key.as_ref());
|
||||
let smaller_than = |k: &[u8]| { k.le(target_key) };
|
||||
let (offset_before, _) = get_offset(smaller_than, streamer);
|
||||
self.offset_to = offset_before - self.origin;
|
||||
self
|
||||
}
|
||||
|
||||
pub(crate) fn new(term_dictionary: &'a TermDictionary<V>) -> TermStreamerBuilder<'a, V> {
|
||||
pub(crate) fn new(term_dictionary: &'a TermDictionaryImpl<V>) -> Self {
|
||||
let data = term_dictionary.stream_data();
|
||||
let origin = data.as_ptr() as usize;
|
||||
TermStreamerBuilder {
|
||||
TermStreamerBuilderImpl {
|
||||
term_dictionary: term_dictionary,
|
||||
origin: origin,
|
||||
offset_from: 0,
|
||||
@@ -106,23 +124,10 @@ impl<'a, V> TermStreamerBuilder<'a, V>
|
||||
current_key: vec!(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Build the streamer.
|
||||
pub fn into_stream(self) -> TermStreamer<'a, V> {
|
||||
let data: &[u8] = self.term_dictionary.stream_data();
|
||||
let start = self.offset_from;
|
||||
let stop = max(self.offset_to, start);
|
||||
TermStreamer {
|
||||
cursor: &data[start..stop],
|
||||
current_key: self.current_key,
|
||||
current_value: V::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// `TermStreamer` acts as a cursor over a range of terms of a segment.
|
||||
/// Terms are guaranteed to be sorted.
|
||||
pub struct TermStreamer<'a, V>
|
||||
/// See [TermStreamer](./trait.TermStreamer.html)
|
||||
pub struct TermStreamerImpl<'a, V>
|
||||
where V: 'a + BinarySerializable + Default {
|
||||
cursor: &'a [u8],
|
||||
current_key: Vec<u8>,
|
||||
@@ -130,14 +135,19 @@ pub struct TermStreamer<'a, V>
|
||||
}
|
||||
|
||||
|
||||
|
||||
impl<'a, V: BinarySerializable> TermStreamer<'a, V>
|
||||
impl<'a, V: BinarySerializable> TermStreamerImpl<'a, V>
|
||||
where V: 'a + BinarySerializable + Default {
|
||||
|
||||
/// Advances the stream to the next item.
|
||||
/// Before the first call to `.advance()`, the stream
|
||||
/// is in an uninitialized state.
|
||||
pub fn advance(&mut self) -> bool {
|
||||
|
||||
pub(crate) fn extract_value(self) -> V {
|
||||
self.current_value
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, V> TermStreamer<V> for TermStreamerImpl<'a, V>
|
||||
where V: BinarySerializable + Default {
|
||||
|
||||
fn advance(&mut self) -> bool {
|
||||
if self.cursor.len() == 0 {
|
||||
return false;
|
||||
}
|
||||
@@ -152,34 +162,11 @@ impl<'a, V: BinarySerializable> TermStreamer<'a, V>
|
||||
return true;
|
||||
}
|
||||
|
||||
/// Accesses the current key.
|
||||
///
|
||||
/// `.key()` should return the key that was returned
|
||||
/// by the `.next()` method.
|
||||
///
|
||||
/// If the end of the stream has been reached, and `.next()`
|
||||
/// has been called and returned `None`, `.key()` remains
|
||||
/// the value of the last key encountered.
|
||||
///
|
||||
/// Before any call to `.next()`, `.key()` returns an empty array.
|
||||
pub fn key(&self) -> &[u8] {
|
||||
fn key(&self) -> &[u8] {
|
||||
&self.current_key
|
||||
}
|
||||
|
||||
/// Accesses the current value.
|
||||
///
|
||||
/// Calling `.value()` after the end of the stream will return the
|
||||
/// last `.value()` encountered.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Calling `.value()` before the first call to `.advance()` returns
|
||||
/// `V::default()`.
|
||||
pub fn value(&self) -> &V {
|
||||
fn value(&self) -> &V {
|
||||
&self.current_value
|
||||
}
|
||||
|
||||
pub(crate) fn extract_value(self) -> V {
|
||||
self.current_value
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -7,12 +7,13 @@ use common::VInt;
|
||||
use directory::ReadOnlySource;
|
||||
use common::BinarySerializable;
|
||||
use std::marker::PhantomData;
|
||||
use common::CountingWriter;
|
||||
use super::CountingWriter;
|
||||
use std::cmp::Ordering;
|
||||
use postings::TermInfo;
|
||||
use fst::raw::Node;
|
||||
use super::TermStreamerBuilder;
|
||||
use super::streamer::stream_before;
|
||||
use termdict::{TermDictionary, TermDictionaryBuilder, TermStreamer};
|
||||
use super::{TermStreamerImpl, TermStreamerBuilderImpl};
|
||||
|
||||
const BLOCK_SIZE: usize = 1024;
|
||||
|
||||
@@ -20,10 +21,8 @@ fn convert_fst_error(e: fst::Error) -> io::Error {
|
||||
io::Error::new(io::ErrorKind::Other, e)
|
||||
}
|
||||
|
||||
/// Builder for the new term dictionary.
|
||||
///
|
||||
/// All terms must be inserted in order.
|
||||
pub struct TermDictionaryBuilder<W, V=TermInfo>
|
||||
/// See [TermDictionaryBuilder](./trait.TermDictionaryBuilder.html)
|
||||
pub struct TermDictionaryBuilderImpl<W, V=TermInfo>
|
||||
where W: Write, V: BinarySerializable + Default {
|
||||
write: CountingWriter<W>,
|
||||
block_index: fst::MapBuilder<Vec<u8>>,
|
||||
@@ -51,34 +50,13 @@ fn fill_last<'a>(fst: &'a Fst, mut node: Node<'a>, buffer: &mut Vec<u8>) {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
impl<W: Write, V: BinarySerializable + Default> TermDictionaryBuilder<W, V> {
|
||||
|
||||
/// Creates a new `TermDictionaryBuilder`
|
||||
pub fn new(write: W) -> io::Result<TermDictionaryBuilder<W, V>> {
|
||||
let buffer: Vec<u8> = vec!();
|
||||
Ok(TermDictionaryBuilder {
|
||||
write: CountingWriter::wrap(write),
|
||||
block_index: fst::MapBuilder::new(buffer)
|
||||
.expect("This cannot fail"),
|
||||
last_key: Vec::with_capacity(128),
|
||||
len: 0,
|
||||
_phantom_: PhantomData,
|
||||
})
|
||||
}
|
||||
impl<W, V> TermDictionaryBuilderImpl<W, V>
|
||||
where W: Write, V: BinarySerializable + Default {
|
||||
|
||||
fn add_index_entry(&mut self) {
|
||||
self.block_index.insert(&self.last_key, self.write.written_bytes() as u64).unwrap();
|
||||
}
|
||||
|
||||
/// Inserts a `(key, value)` pair in the term dictionary.
|
||||
///
|
||||
/// *Keys have to be inserted in order.*
|
||||
pub fn insert(&mut self, key: &[u8], value: &V) -> io::Result<()>{
|
||||
self.insert_key(key)?;
|
||||
self.insert_value(value)
|
||||
}
|
||||
|
||||
/// # Warning
|
||||
/// Horribly dangerous internal API
|
||||
///
|
||||
@@ -104,10 +82,36 @@ impl<W: Write, V: BinarySerializable + Default> TermDictionaryBuilder<W, V> {
|
||||
value.serialize(&mut self.write)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<W, V> TermDictionaryBuilder<W, V> for TermDictionaryBuilderImpl<W, V>
|
||||
where W: Write, V: BinarySerializable + Default {
|
||||
|
||||
/// Creates a new `TermDictionaryBuilder`
|
||||
fn new(write: W) -> io::Result<Self> {
|
||||
let buffer: Vec<u8> = vec!();
|
||||
Ok(TermDictionaryBuilderImpl {
|
||||
write: CountingWriter::wrap(write),
|
||||
block_index: fst::MapBuilder::new(buffer)
|
||||
.expect("This cannot fail"),
|
||||
last_key: Vec::with_capacity(128),
|
||||
len: 0,
|
||||
_phantom_: PhantomData,
|
||||
})
|
||||
}
|
||||
|
||||
/// Inserts a `(key, value)` pair in the term dictionary.
|
||||
///
|
||||
/// *Keys have to be inserted in order.*
|
||||
fn insert<K: AsRef<[u8]>>(&mut self, key_ref: K, value: &V) -> io::Result<()>{
|
||||
let key = key_ref.as_ref();
|
||||
self.insert_key(key)?;
|
||||
self.insert_value(value)
|
||||
}
|
||||
|
||||
/// Finalize writing the builder, and returns the underlying
|
||||
/// `Write` object.
|
||||
pub fn finish(mut self) -> io::Result<W> {
|
||||
fn finish(mut self) -> io::Result<W> {
|
||||
self.add_index_entry();
|
||||
let (mut w, split_len) = self.write.finish()?;
|
||||
let fst_write = self.block_index
|
||||
@@ -121,8 +125,6 @@ impl<W: Write, V: BinarySerializable + Default> TermDictionaryBuilder<W, V> {
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
fn open_fst_index(source: ReadOnlySource) -> io::Result<fst::Map> {
|
||||
Ok(fst::Map::from(match source {
|
||||
ReadOnlySource::Anonymous(data) => try!(Fst::from_shared_bytes(data.data, data.start, data.len).map_err(convert_fst_error)),
|
||||
@@ -130,34 +132,15 @@ fn open_fst_index(source: ReadOnlySource) -> io::Result<fst::Map> {
|
||||
}))
|
||||
}
|
||||
|
||||
/// Datastructure to access the `terms` of a segment.
|
||||
pub struct TermDictionary<V=TermInfo> where V: BinarySerializable + Default {
|
||||
/// See [TermDictionary](./trait.TermDictionary.html)
|
||||
pub struct TermDictionaryImpl<V=TermInfo> where V: BinarySerializable + Default {
|
||||
stream_data: ReadOnlySource,
|
||||
fst_index: fst::Map,
|
||||
_phantom_: PhantomData<V>,
|
||||
}
|
||||
|
||||
impl<V> TermDictionary<V>
|
||||
impl<V> TermDictionaryImpl<V>
|
||||
where V: BinarySerializable + Default {
|
||||
|
||||
/// Opens a `TermDictionary` given a data source.
|
||||
pub fn from_source(source: ReadOnlySource) -> io::Result<TermDictionary<V>> {
|
||||
let total_len = source.len();
|
||||
let length_offset = total_len - 8;
|
||||
let split_len: usize = {
|
||||
let mut split_len_buffer: &[u8] = &source.as_slice()[length_offset..];
|
||||
u64::deserialize(&mut split_len_buffer)? as usize
|
||||
};
|
||||
let stream_data = source.slice(0, split_len);
|
||||
let fst_data = source.slice(split_len, length_offset);
|
||||
let fst_index = open_fst_index(fst_data)?;
|
||||
|
||||
Ok(TermDictionary {
|
||||
stream_data: stream_data,
|
||||
fst_index: fst_index,
|
||||
_phantom_: PhantomData
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn stream_data(&self) -> &[u8] {
|
||||
self.stream_data.as_slice()
|
||||
@@ -212,8 +195,37 @@ impl<V> TermDictionary<V>
|
||||
return (vec!(), 0);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
impl<'a, V> TermDictionary<'a, V> for TermDictionaryImpl<V>
|
||||
where V: BinarySerializable + Default + 'a {
|
||||
|
||||
type Streamer = TermStreamerImpl<'a, V>;
|
||||
|
||||
type StreamBuilder = TermStreamerBuilderImpl<'a, V>;
|
||||
|
||||
/// Opens a `TermDictionary` given a data source.
|
||||
fn from_source(source: ReadOnlySource) -> io::Result<Self> {
|
||||
let total_len = source.len();
|
||||
let length_offset = total_len - 8;
|
||||
let split_len: usize = {
|
||||
let mut split_len_buffer: &[u8] = &source.as_slice()[length_offset..];
|
||||
u64::deserialize(&mut split_len_buffer)? as usize
|
||||
};
|
||||
let stream_data = source.slice(0, split_len);
|
||||
let fst_data = source.slice(split_len, length_offset);
|
||||
let fst_index = open_fst_index(fst_data)?;
|
||||
|
||||
Ok(TermDictionaryImpl {
|
||||
stream_data: stream_data,
|
||||
fst_index: fst_index,
|
||||
_phantom_: PhantomData
|
||||
})
|
||||
}
|
||||
|
||||
/// Looks up the value corresponding to the key.
|
||||
pub fn get<K: AsRef<[u8]>>(&self, target_key: K) -> Option<V> {
|
||||
fn get<K: AsRef<[u8]>>(&self, target_key: K) -> Option<V> {
|
||||
let mut streamer = stream_before(self, target_key.as_ref());
|
||||
while streamer.advance() {
|
||||
let position = streamer.key().cmp(target_key.as_ref());
|
||||
@@ -230,10 +242,9 @@ impl<V> TermDictionary<V>
|
||||
return None;
|
||||
}
|
||||
|
||||
|
||||
/// Returns a range builder, to stream all of the terms
|
||||
/// within an interval.
|
||||
pub fn range(&self) -> TermStreamerBuilder<V> {
|
||||
TermStreamerBuilder::new(self)
|
||||
fn range(&'a self) -> Self::StreamBuilder {
|
||||
Self::StreamBuilder::new(self)
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user