Removed streamdict

Closes #271
Paul Masurel
2018-04-21 17:12:14 +09:00
parent 9b79e21bd7
commit 175b76f119
20 changed files with 249 additions and 1138 deletions

View File

@@ -58,7 +58,6 @@ debug-assertions = false
[features]
default = ["mmap"]
simd = ["bitpacking/simd"]
streamdict = []
mmap = ["fst/mmap", "atomicwrites"]
unstable = ["simd"]

View File

@@ -7,9 +7,6 @@ use schema::Facet;
use std::collections::BTreeMap;
use std::collections::BinaryHeap;
use std::collections::Bound;
use termdict::TermDictionary;
use termdict::TermStreamer;
use termdict::TermStreamerBuilder;
use std::collections::BTreeSet;
use termdict::TermMerger;
use docset::SkipResult;

View File

@@ -1,5 +1,5 @@
use directory::{ReadOnlySource, SourceRead};
use termdict::{TermDictionary, TermDictionaryImpl};
use termdict::TermDictionary;
use postings::{BlockSegmentPostings, SegmentPostings};
use postings::TermInfo;
use schema::IndexRecordOption;
@@ -23,7 +23,7 @@ use schema::FieldType;
/// `InvertedIndexReader`s are created by calling
/// the `SegmentReader`'s [`.inverted_index(...)`] method
pub struct InvertedIndexReader {
termdict: TermDictionaryImpl,
termdict: TermDictionary,
postings_source: ReadOnlySource,
positions_source: ReadOnlySource,
record_option: IndexRecordOption,
@@ -32,7 +32,7 @@ pub struct InvertedIndexReader {
impl InvertedIndexReader {
pub(crate) fn new(
termdict: TermDictionaryImpl,
termdict: TermDictionary,
postings_source: ReadOnlySource,
positions_source: ReadOnlySource,
record_option: IndexRecordOption,
@@ -56,7 +56,7 @@ impl InvertedIndexReader {
.get_index_record_option()
.unwrap_or(IndexRecordOption::Basic);
InvertedIndexReader {
termdict: TermDictionaryImpl::empty(field_type),
termdict: TermDictionary::empty(field_type),
postings_source: ReadOnlySource::empty(),
positions_source: ReadOnlySource::empty(),
record_option,
@@ -70,7 +70,7 @@ impl InvertedIndexReader {
}
/// Returns the term dictionary data structure.
pub fn terms(&self) -> &TermDictionaryImpl {
pub fn terms(&self) -> &TermDictionary {
&self.termdict
}
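As a rough sketch of how this accessor composes with the concrete dictionary API introduced below (the `inv_index_reader` variable and the term bytes are hypothetical):
let termdict = inv_index_reader.terms();
if let Some(term_info) = termdict.get(b"hello") {
    // `TermInfo` carries the postings metadata, e.g. the document frequency.
    println!("doc_freq: {}", term_info.doc_freq);
}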

View File

@@ -5,7 +5,7 @@ use collector::Collector;
use query::Query;
use DocAddress;
use schema::{Field, Term};
use termdict::{TermDictionary, TermMerger};
use termdict::TermMerger;
use std::sync::Arc;
use std::fmt;
use schema::Schema;

View File

@@ -18,7 +18,6 @@ use core::InvertedIndexReader;
use schema::Field;
use schema::FieldType;
use error::ErrorKind;
use termdict::TermDictionaryImpl;
use fastfield::FacetReader;
use fastfield::FastFieldReader;
use schema::Schema;
@@ -162,7 +161,7 @@ impl SegmentReader {
field_entry.name()
))
})?;
let termdict = TermDictionaryImpl::from_source(termdict_source);
let termdict = TermDictionary::from_source(termdict_source);
let facet_reader = FacetReader::new(term_ords_reader, termdict);
Ok(facet_reader)
}
@@ -286,7 +285,7 @@ impl SegmentReader {
.expect("Index corrupted. Failed to open field positions in composite file.");
let inv_idx_reader = Arc::new(InvertedIndexReader::new(
TermDictionaryImpl::from_source(termdict_source),
TermDictionary::from_source(termdict_source),
postings_source,
positions_source,
record_option,

View File

@@ -2,7 +2,7 @@ use super::MultiValueIntFastFieldReader;
use DocId;
use termdict::TermOrdinal;
use schema::Facet;
use termdict::{TermDictionary, TermDictionaryImpl};
use termdict::TermDictionary;
/// The facet reader makes it possible to access the list of
/// facets associated with a given document in a specific
@@ -19,7 +19,7 @@ use termdict::{TermDictionary, TermDictionaryImpl};
/// only makes sense for a given segment.
pub struct FacetReader {
term_ords: MultiValueIntFastFieldReader<u64>,
term_dict: TermDictionaryImpl,
term_dict: TermDictionary,
}
impl FacetReader {
@@ -28,11 +28,11 @@ impl FacetReader {
/// A facet reader just wraps:
/// - a `MultiValueIntFastFieldReader` that makes it possible to
/// access the list of facet ords for a given document.
/// - a `TermDictionaryImpl` that helps associating a facet to
/// - a `TermDictionary` that helps associate a facet with
/// an ordinal and vice versa.
pub fn new(
term_ords: MultiValueIntFastFieldReader<u64>,
term_dict: TermDictionaryImpl,
term_dict: TermDictionary,
) -> FacetReader {
FacetReader {
term_ords,
@@ -50,7 +50,7 @@ impl FacetReader {
}
/// Accessor for the facet term dictionary.
pub fn facet_dict(&self) -> &TermDictionaryImpl {
pub fn facet_dict(&self) -> &TermDictionary {
&self.term_dict
}
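As a minimal sketch of how the two pieces combine (the `facet_reader` and `facet_ord` names are hypothetical), a facet ordinal stored in the fast field can be turned back into its facet bytes through the dictionary's `ord_to_term`, shown later in this commit:
let mut facet_bytes: Vec<u8> = Vec::new();
if facet_reader.facet_dict().ord_to_term(facet_ord, &mut facet_bytes) {
    // `facet_bytes` now holds the bytes of the facet path.
}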

View File

@@ -14,8 +14,6 @@ use fastfield::FastFieldSerializer;
use fastfield::FastFieldReader;
use store::StoreWriter;
use std::cmp::{max, min};
use termdict::TermDictionary;
use termdict::TermStreamer;
use fieldnorm::FieldNormsSerializer;
use fieldnorm::FieldNormsWriter;
use fieldnorm::FieldNormReader;

View File

@@ -1,5 +1,4 @@
use Result;
use termdict::TermDictionaryBuilderImpl;
use super::TermInfo;
use schema::Field;
use schema::FieldEntry;
@@ -111,7 +110,7 @@ impl InvertedIndexSerializer {
/// The field serializer is in charge of
/// the serialization of a specific field.
pub struct FieldSerializer<'a> {
term_dictionary_builder: TermDictionaryBuilderImpl<&'a mut CountingWriter<WritePtr>>,
term_dictionary_builder: TermDictionaryBuilder<&'a mut CountingWriter<WritePtr>>,
postings_serializer: PostingsSerializer<&'a mut CountingWriter<WritePtr>>,
positions_serializer_opt: Option<PositionSerializer<&'a mut CountingWriter<WritePtr>>>,
current_term_info: TermInfo,
@@ -141,7 +140,7 @@ impl<'a> FieldSerializer<'a> {
_ => (false, false),
};
let term_dictionary_builder =
TermDictionaryBuilderImpl::new(term_dictionary_write, field_type)?;
TermDictionaryBuilder::new(term_dictionary_write, field_type)?;
let postings_serializer = PostingsSerializer::new(postings_write, term_freq_enabled);
let positions_serializer_opt = if position_enabled {
Some(PositionSerializer::new(positions_write))

View File

@@ -1,6 +1,6 @@
use schema::{Field, IndexRecordOption, Term};
use query::{Query, Scorer, Weight};
use termdict::{TermDictionary, TermStreamer, TermStreamerBuilder};
use termdict::{TermDictionary, TermStreamer};
use core::SegmentReader;
use common::BitSet;
use Result;
@@ -213,10 +213,7 @@ pub struct RangeWeight {
}
impl RangeWeight {
fn term_range<'a, T>(&self, term_dict: &'a T) -> T::Streamer
where
T: TermDictionary<'a> + 'a,
{
fn term_range<'a>(&self, term_dict: &'a TermDictionary) -> TermStreamer<'a> {
use std::collections::Bound::*;
let mut term_stream_builder = term_dict.range();
term_stream_builder = match self.left_bound {
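Sketched against the new concrete API, a range query's term streaming boils down to something like the following (the bounds are illustrative):
// Stream every term in [b"aaa", b"zzz") out of a `TermDictionary`.
let mut stream = term_dict.range().ge(b"aaa").lt(b"zzz").into_stream();
while stream.advance() {
    let _term_bytes = stream.key();
    let _term_info = stream.value();
}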

View File

@@ -1,25 +0,0 @@
/*!
The term dictionary contains all of the terms of
the tantivy index, in sorted order.
`fstdict` is the default implementation
of the term dictionary. It is implemented as a wrapper
around the `fst` crate in order to add a value type.
A finite state transducer by itself associates
each term `&[u8]` with a `u64` that is in fact an address
in a buffer. The value is then accessible by
deserializing the buffer at this address.
Keys (`&[u8]`) in this data structure are sorted.
*/
mod termdict;
mod streamer;
mod term_info_store;
pub use self::termdict::TermDictionaryImpl;
pub use self::termdict::TermDictionaryBuilderImpl;
pub use self::term_info_store::{TermInfoStore, TermInfoStoreWriter};
pub use self::streamer::TermStreamerImpl;
pub use self::streamer::TermStreamerBuilderImpl;
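The idea described in the module documentation above, reduced to a minimal sketch directly against the `fst` crate (keys must be inserted in sorted order; the values here stand in for addresses of serialized `TermInfo`s):
use fst::MapBuilder;

let mut builder = MapBuilder::memory();
builder.insert("apple", 0u64).unwrap();   // value = address of the term's info
builder.insert("banana", 18u64).unwrap(); // keys are inserted in sorted order
let map = builder.into_map();
assert_eq!(map.get("banana"), Some(18u64));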

View File

@@ -1,89 +0,0 @@
use fst::{IntoStreamer, Streamer};
use fst::map::{Stream, StreamBuilder};
use postings::TermInfo;
use super::TermDictionaryImpl;
use termdict::{TermDictionary, TermOrdinal, TermStreamer, TermStreamerBuilder};
/// See [`TermStreamerBuilder`](./trait.TermStreamerBuilder.html)
pub struct TermStreamerBuilderImpl<'a> {
fst_map: &'a TermDictionaryImpl,
stream_builder: StreamBuilder<'a>,
}
impl<'a> TermStreamerBuilderImpl<'a> {
pub(crate) fn new(fst_map: &'a TermDictionaryImpl, stream_builder: StreamBuilder<'a>) -> Self {
TermStreamerBuilderImpl {
fst_map,
stream_builder,
}
}
}
impl<'a> TermStreamerBuilder for TermStreamerBuilderImpl<'a> {
type Streamer = TermStreamerImpl<'a>;
fn ge<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
self.stream_builder = self.stream_builder.ge(bound);
self
}
fn gt<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
self.stream_builder = self.stream_builder.gt(bound);
self
}
fn le<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
self.stream_builder = self.stream_builder.le(bound);
self
}
fn lt<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
self.stream_builder = self.stream_builder.lt(bound);
self
}
fn into_stream(self) -> Self::Streamer {
TermStreamerImpl {
fst_map: self.fst_map,
stream: self.stream_builder.into_stream(),
term_ord: 0u64,
current_key: Vec::with_capacity(100),
current_value: TermInfo::default(),
}
}
}
/// See [`TermStreamer`](./trait.TermStreamer.html)
pub struct TermStreamerImpl<'a> {
fst_map: &'a TermDictionaryImpl,
stream: Stream<'a>,
term_ord: TermOrdinal,
current_key: Vec<u8>,
current_value: TermInfo,
}
impl<'a> TermStreamer for TermStreamerImpl<'a> {
fn advance(&mut self) -> bool {
if let Some((term, term_ord)) = self.stream.next() {
self.current_key.clear();
self.current_key.extend_from_slice(term);
self.term_ord = term_ord;
self.current_value = self.fst_map.term_info_from_ord(term_ord);
true
} else {
false
}
}
fn term_ord(&self) -> TermOrdinal {
self.term_ord
}
fn key(&self) -> &[u8] {
&self.current_key
}
fn value(&self) -> &TermInfo {
&self.current_value
}
}

View File

@@ -1,11 +1,10 @@
use std::collections::BinaryHeap;
use termdict::TermStreamerImpl;
use std::cmp::Ordering;
use termdict::TermStreamer;
use std::cmp::Ordering;
use schema::Term;
pub struct HeapItem<'a> {
pub streamer: TermStreamerImpl<'a>,
pub streamer: TermStreamer<'a>,
pub segment_ord: usize,
}
@@ -45,7 +44,7 @@ impl<'a> TermMerger<'a> {
/// Stream of merged term dictionary
///
///
pub fn new(streams: Vec<TermStreamerImpl<'a>>) -> TermMerger<'a> {
pub fn new(streams: Vec<TermStreamer<'a>>) -> TermMerger<'a> {
TermMerger {
heap: BinaryHeap::new(),
current_streamers: streams

View File

@@ -47,193 +47,26 @@ followed by streaming through at most `1024` elements of the
term `stream`.
*/
use schema::{Field, FieldType, Term};
use directory::ReadOnlySource;
use postings::TermInfo;
/// Position of the term in the sorted list of terms.
pub type TermOrdinal = u64;
pub use self::merger::TermMerger;
#[cfg(not(feature = "streamdict"))]
mod fstdict;
#[cfg(not(feature = "streamdict"))]
pub use self::fstdict::{TermDictionaryBuilderImpl, TermDictionaryImpl, TermStreamerBuilderImpl,
TermStreamerImpl};
#[cfg(feature = "streamdict")]
mod streamdict;
#[cfg(feature = "streamdict")]
pub use self::streamdict::{TermDictionaryBuilderImpl, TermDictionaryImpl, TermStreamerBuilderImpl,
TermStreamerImpl};
mod term_info_store;
mod streamer;
mod termdict;
mod merger;
use std::io;
/// Dictionary associating sorted `&[u8]` keys with values
pub trait TermDictionary<'a>
where
Self: Sized,
{
/// Streamer type associated to the term dictionary
type Streamer: TermStreamer + 'a;
/// StreamerBuilder type associated to the term dictionary
type StreamBuilder: TermStreamerBuilder<Streamer = Self::Streamer> + 'a;
/// Opens a `TermDictionary` given a data source.
fn from_source(source: ReadOnlySource) -> Self;
/// Returns the number of terms in the dictionary.
/// Term ordinals range from 0 to `num_terms() - 1`.
fn num_terms(&self) -> usize;
/// Returns the ordinal associated with a given term.
fn term_ord<K: AsRef<[u8]>>(&self, term: K) -> Option<TermOrdinal>;
/// Returns the term associated to a given term ordinal.
///
/// Term ordinals are defined as the position of the term in
/// the sorted list of terms.
///
/// Returns true iff the term has been found.
///
/// Regardless of whether the term is found or not,
/// the buffer may be modified.
fn ord_to_term(&self, ord: TermOrdinal, bytes: &mut Vec<u8>) -> bool;
/// Returns the `TermInfo` associated with the given term ordinal.
fn term_info_from_ord(&self, term_ord: TermOrdinal) -> TermInfo;
/// Looks up the value corresponding to the key.
fn get<K: AsRef<[u8]>>(&self, target_key: K) -> Option<TermInfo>;
/// Returns a range builder, to stream all of the terms
/// within an interval.
fn range(&'a self) -> Self::StreamBuilder;
/// A stream of all the sorted terms. [See also `.stream_field()`](#method.stream_field)
fn stream(&'a self) -> Self::Streamer {
self.range().into_stream()
}
/// A stream of all the sorted terms in the given field.
fn stream_field(&'a self, field: Field) -> Self::Streamer {
let start_term = Term::from_field_text(field, "");
let stop_term = Term::from_field_text(Field(field.0 + 1), "");
self.range()
.ge(start_term.as_slice())
.lt(stop_term.as_slice())
.into_stream()
}
/// Creates an empty term dictionary which contains no terms.
fn empty(field_type: FieldType) -> Self;
}
/// Builder for the new term dictionary.
///
/// Inserting must be done in the order of the `keys`.
pub trait TermDictionaryBuilder<W>: Sized
where
W: io::Write,
{
/// Creates a new `TermDictionaryBuilder`
fn new(write: W, field_type: FieldType) -> io::Result<Self>;
/// Inserts a `(key, value)` pair in the term dictionary.
///
/// *Keys have to be inserted in order.*
fn insert<K: AsRef<[u8]>>(&mut self, key: K, value: &TermInfo) -> io::Result<()>;
/// Finalize writing the builder, and returns the underlying
/// `Write` object.
fn finish(self) -> io::Result<W>;
}
/// `TermStreamer` acts as a cursor over a range of terms of a segment.
/// Terms are guaranteed to be sorted.
pub trait TermStreamer: Sized {
/// Advances the stream to the next item.
/// Before the first call to `.advance()`, the stream
/// is in an uninitialized state.
fn advance(&mut self) -> bool;
/// Accesses the current key.
///
/// `.key()` should return the key that was returned
/// by the `.next()` method.
///
/// If the end of the stream has been reached, and `.next()`
/// has been called and returned `None`, `.key()` remains
/// the value of the last key encountered.
///
/// Before any call to `.next()`, `.key()` returns an empty array.
fn key(&self) -> &[u8];
/// Returns the `TermOrdinal` of the current term.
///
/// May panic if `.advance()` has never
/// been called before.
fn term_ord(&self) -> TermOrdinal;
/// Accesses the current value.
///
/// Calling `.value()` after the end of the stream will return the
/// last `.value()` encountered.
///
/// Calling `.value()` before the first call to `.advance()` returns
/// `TermInfo::default()`.
fn value(&self) -> &TermInfo;
/// Return the next `(key, value)` pair.
fn next(&mut self) -> Option<(&[u8], &TermInfo)> {
if self.advance() {
Some((self.key(), self.value()))
} else {
None
}
}
}
/// `TermStreamerBuilder` is a helper object used to define
/// a range of terms that should be streamed.
pub trait TermStreamerBuilder {
/// Associated `TermStreamer` type that this builder is building.
type Streamer: TermStreamer;
/// Limit the range to terms greater or equal to the bound
fn ge<T: AsRef<[u8]>>(self, bound: T) -> Self;
/// Limit the range to terms strictly greater than the bound
fn gt<T: AsRef<[u8]>>(self, bound: T) -> Self;
/// Limit the range to terms strictly lesser than the bound
fn lt<T: AsRef<[u8]>>(self, bound: T) -> Self;
/// Limit the range to terms lesser or equal to the bound
fn le<T: AsRef<[u8]>>(self, bound: T) -> Self;
/// Creates the stream corresponding to the range
/// of terms defined using the `TermStreamerBuilder`.
fn into_stream(self) -> Self::Streamer;
}
pub use self::termdict::{TermDictionary, TermDictionaryBuilder};
pub use self::streamer::{TermStreamer, TermStreamerBuilder};
pub use self::merger::TermMerger;
#[cfg(test)]
mod tests {
use super::{TermDictionaryBuilderImpl, TermDictionaryImpl, TermStreamerImpl};
use super::{TermDictionaryBuilder, TermDictionary, TermStreamer};
use directory::{Directory, RAMDirectory, ReadOnlySource};
use std::path::PathBuf;
use schema::{Document, FieldType, SchemaBuilder, TEXT};
use core::Index;
use std::str;
use termdict::TermStreamer;
use termdict::TermStreamerBuilder;
use termdict::TermDictionary;
use termdict::TermDictionaryBuilder;
use postings::TermInfo;
const BLOCK_SIZE: usize = 1_500;
@@ -264,7 +97,7 @@ mod tests {
let write = directory.open_write(&path).unwrap();
let field_type = FieldType::Str(TEXT);
let mut term_dictionary_builder =
TermDictionaryBuilderImpl::new(write, field_type).unwrap();
TermDictionaryBuilder::new(write, field_type).unwrap();
for term in COUNTRIES.iter() {
term_dictionary_builder
.insert(term.as_bytes(), &make_term_info(0u64))
@@ -273,7 +106,7 @@ mod tests {
term_dictionary_builder.finish().unwrap();
}
let source = directory.open_read(&path).unwrap();
let term_dict: TermDictionaryImpl = TermDictionaryImpl::from_source(source);
let term_dict: TermDictionary = TermDictionary::from_source(source);
for (term_ord, term) in COUNTRIES.iter().enumerate() {
assert_eq!(term_dict.term_ord(term).unwrap(), term_ord as u64);
let mut bytes = vec![];
@@ -290,7 +123,7 @@ mod tests {
let write = directory.open_write(&path).unwrap();
let field_type = FieldType::Str(TEXT);
let mut term_dictionary_builder =
TermDictionaryBuilderImpl::new(write, field_type).unwrap();
TermDictionaryBuilder::new(write, field_type).unwrap();
term_dictionary_builder
.insert("abc".as_bytes(), &make_term_info(34u64))
.unwrap();
@@ -300,7 +133,7 @@ mod tests {
term_dictionary_builder.finish().unwrap();
}
let source = directory.open_read(&path).unwrap();
let term_dict: TermDictionaryImpl = TermDictionaryImpl::from_source(source);
let term_dict: TermDictionary = TermDictionary::from_source(source);
assert_eq!(term_dict.get("abc").unwrap().doc_freq, 34u32);
assert_eq!(term_dict.get("abcd").unwrap().doc_freq, 346u32);
let mut stream = term_dict.stream();
@@ -378,7 +211,7 @@ mod tests {
let field_type = FieldType::Str(TEXT);
let buffer: Vec<u8> = {
let mut term_dictionary_builder =
TermDictionaryBuilderImpl::new(vec![], field_type).unwrap();
TermDictionaryBuilder::new(vec![], field_type).unwrap();
for &(ref id, ref i) in &ids {
term_dictionary_builder
.insert(id.as_bytes(), &make_term_info(*i as u64))
@@ -387,7 +220,7 @@ mod tests {
term_dictionary_builder.finish().unwrap()
};
let source = ReadOnlySource::from(buffer);
let term_dictionary: TermDictionaryImpl = TermDictionaryImpl::from_source(source);
let term_dictionary: TermDictionary = TermDictionary::from_source(source);
{
let mut streamer = term_dictionary.stream();
let mut i = 0;
@@ -408,7 +241,7 @@ mod tests {
let field_type = FieldType::Str(TEXT);
let buffer: Vec<u8> = {
let mut term_dictionary_builder =
TermDictionaryBuilderImpl::new(vec![], field_type).unwrap();
TermDictionaryBuilder::new(vec![], field_type).unwrap();
// term requires more than 16 bits
term_dictionary_builder
.insert("abcdefghijklmnopqrstuvwxy", &make_term_info(1))
@@ -422,7 +255,7 @@ mod tests {
term_dictionary_builder.finish().unwrap()
};
let source = ReadOnlySource::from(buffer);
let term_dictionary: TermDictionaryImpl = TermDictionaryImpl::from_source(source);
let term_dictionary: TermDictionary = TermDictionary::from_source(source);
let mut kv_stream = term_dictionary.stream();
assert!(kv_stream.advance());
assert_eq!(kv_stream.key(), "abcdefghijklmnopqrstuvwxy".as_bytes());
@@ -443,7 +276,7 @@ mod tests {
let field_type = FieldType::Str(TEXT);
let buffer: Vec<u8> = {
let mut term_dictionary_builder =
TermDictionaryBuilderImpl::new(vec![], field_type).unwrap();
TermDictionaryBuilder::new(vec![], field_type).unwrap();
for &(ref id, ref i) in &ids {
term_dictionary_builder
.insert(id.as_bytes(), &make_term_info(*i as u64))
@@ -454,7 +287,7 @@ mod tests {
let source = ReadOnlySource::from(buffer);
let term_dictionary: TermDictionaryImpl = TermDictionaryImpl::from_source(source);
let term_dictionary: TermDictionary = TermDictionary::from_source(source);
{
for i in (0..20).chain(6000..8_000) {
let &(ref target_key, _) = &ids[i];
@@ -512,7 +345,7 @@ mod tests {
let field_type = FieldType::Str(TEXT);
let buffer: Vec<u8> = {
let mut term_dictionary_builder =
TermDictionaryBuilderImpl::new(vec![], field_type).unwrap();
TermDictionaryBuilder::new(vec![], field_type).unwrap();
term_dictionary_builder
.insert(&[], &make_term_info(1 as u64))
.unwrap();
@@ -522,7 +355,7 @@ mod tests {
term_dictionary_builder.finish().unwrap()
};
let source = ReadOnlySource::from(buffer);
let term_dictionary: TermDictionaryImpl = TermDictionaryImpl::from_source(source);
let term_dictionary: TermDictionary = TermDictionary::from_source(source);
let mut stream = term_dictionary.stream();
assert!(stream.advance());
assert!(stream.key().is_empty());
@@ -536,7 +369,7 @@ mod tests {
let field_type = FieldType::Str(TEXT);
let buffer: Vec<u8> = {
let mut term_dictionary_builder =
TermDictionaryBuilderImpl::new(vec![], field_type).unwrap();
TermDictionaryBuilder::new(vec![], field_type).unwrap();
for i in 0u8..10u8 {
let number_arr = [i; 1];
term_dictionary_builder
@@ -546,9 +379,9 @@ mod tests {
term_dictionary_builder.finish().unwrap()
};
let source = ReadOnlySource::from(buffer);
let term_dictionary: TermDictionaryImpl = TermDictionaryImpl::from_source(source);
let term_dictionary: TermDictionary = TermDictionary::from_source(source);
let value_list = |mut streamer: TermStreamerImpl| {
let value_list = |mut streamer: TermStreamer| {
let mut res: Vec<u32> = vec![];
while let Some((_, ref v)) = streamer.next() {
res.push(v.doc_freq);

View File

@@ -1,179 +0,0 @@
use postings::TermInfo;
use super::CheckPoint;
use std::mem;
use common::BinarySerializable;
/// Returns the length of the longest
/// common prefix of `s1` and `s2`,
///
/// i.e. the greatest `L` such that
/// for all `0 <= i < L`, `s1[i] == s2[i]`.
fn common_prefix_len(s1: &[u8], s2: &[u8]) -> usize {
s1.iter()
.zip(s2.iter())
.take_while(|&(a, b)| a == b)
.count()
}
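For instance:
// "sea" is shared, so only the suffixes ("rch" / "side") differ.
assert_eq!(common_prefix_len(b"search", b"seaside"), 3);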
#[derive(Default)]
pub struct TermDeltaEncoder {
last_term: Vec<u8>,
prefix_len: usize,
}
impl TermDeltaEncoder {
pub fn encode<'a>(&mut self, term: &'a [u8]) {
self.prefix_len = common_prefix_len(term, &self.last_term);
self.last_term.truncate(self.prefix_len);
self.last_term.extend_from_slice(&term[self.prefix_len..]);
}
pub fn term(&self) -> &[u8] {
&self.last_term[..]
}
pub fn prefix_suffix(&mut self) -> (usize, &[u8]) {
(self.prefix_len, &self.last_term[self.prefix_len..])
}
}
#[derive(Default)]
pub struct TermDeltaDecoder {
term: Vec<u8>,
}
impl TermDeltaDecoder {
pub fn with_previous_term(term: Vec<u8>) -> TermDeltaDecoder {
TermDeltaDecoder {
term: Vec::from(term),
}
}
// Layout of `code`: the first (lowest) bit indicates whether the
// prefix and suffix lengths are small enough to be packed together
// into the single byte that follows.
//
#[inline(always)]
pub fn decode<'a>(&mut self, code: u8, mut cursor: &'a [u8]) -> &'a [u8] {
let (prefix_len, suffix_len): (usize, usize) = if (code & 1u8) == 1u8 {
let b = cursor[0];
cursor = &cursor[1..];
let prefix_len = (b & 15u8) as usize;
let suffix_len = (b >> 4u8) as usize;
(prefix_len, suffix_len)
} else {
let prefix_len = u32::deserialize(&mut cursor).unwrap();
let suffix_len = u32::deserialize(&mut cursor).unwrap();
(prefix_len as usize, suffix_len as usize)
};
unsafe { self.term.set_len(prefix_len) };
self.term.extend_from_slice(&(*cursor)[..suffix_len]);
&cursor[suffix_len..]
}
pub fn term(&self) -> &[u8] {
&self.term[..]
}
}
#[derive(Default)]
pub struct DeltaTermInfo {
pub doc_freq: u32,
pub delta_postings_offset: u64,
pub delta_positions_offset: u64,
pub positions_inner_offset: u8,
}
pub struct TermInfoDeltaEncoder {
term_info: TermInfo,
pub has_positions: bool,
}
impl TermInfoDeltaEncoder {
pub fn new(has_positions: bool) -> Self {
TermInfoDeltaEncoder {
term_info: TermInfo::default(),
has_positions,
}
}
pub fn term_info(&self) -> &TermInfo {
&self.term_info
}
pub fn encode(&mut self, term_info: TermInfo) -> DeltaTermInfo {
let mut delta_term_info = DeltaTermInfo {
doc_freq: term_info.doc_freq,
delta_postings_offset: term_info.postings_offset - self.term_info.postings_offset,
delta_positions_offset: 0u64,
positions_inner_offset: 0,
};
if self.has_positions {
delta_term_info.delta_positions_offset =
term_info.positions_offset - self.term_info.positions_offset;
delta_term_info.positions_inner_offset = term_info.positions_inner_offset;
}
mem::replace(&mut self.term_info, term_info);
delta_term_info
}
}
pub struct TermInfoDeltaDecoder {
term_info: TermInfo,
has_positions: bool,
}
#[inline(always)]
pub fn make_mask(num_bytes: usize) -> u32 {
const MASK: [u32; 4] = [0xffu32, 0xffffu32, 0xffffffu32, 0xffffffffu32];
*unsafe { MASK.get_unchecked(num_bytes.wrapping_sub(1) as usize) }
}
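That is, a mask keeping the lowest `num_bytes` bytes:
assert_eq!(make_mask(1), 0xff);
assert_eq!(make_mask(2), 0xffff);
assert_eq!(make_mask(4), 0xffff_ffff);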
impl TermInfoDeltaDecoder {
pub fn from_term_info(term_info: TermInfo, has_positions: bool) -> TermInfoDeltaDecoder {
TermInfoDeltaDecoder {
term_info,
has_positions,
}
}
pub fn from_checkpoint(checkpoint: &CheckPoint, has_positions: bool) -> TermInfoDeltaDecoder {
TermInfoDeltaDecoder {
term_info: TermInfo {
doc_freq: 0u32,
postings_offset: checkpoint.postings_offset,
positions_offset: checkpoint.positions_offset,
positions_inner_offset: 0u8,
},
has_positions,
}
}
#[inline(always)]
pub fn decode<'a>(&mut self, code: u8, mut cursor: &'a [u8]) -> &'a [u8] {
let num_bytes_docfreq: usize = ((code >> 1) & 3) as usize + 1;
let num_bytes_postings_offset: usize = ((code >> 3) & 3) as usize + 1;
let mut v: u64 = unsafe { *(cursor.as_ptr() as *const u64) };
let doc_freq: u32 = (v as u32) & make_mask(num_bytes_docfreq);
v >>= (num_bytes_docfreq as u64) * 8u64;
let delta_postings_offset: u64 = v & make_mask(num_bytes_postings_offset);
cursor = &cursor[num_bytes_docfreq + num_bytes_postings_offset..];
self.term_info.doc_freq = doc_freq;
self.term_info.postings_offset += delta_postings_offset;
if self.has_positions {
let num_bytes_positions_offset = ((code >> 5) & 3) as usize + 1;
let delta_positions_offset: u32 =
unsafe { *(cursor.as_ptr() as *const u32) } & make_mask(num_bytes_positions_offset);
self.term_info.positions_offset += delta_positions_offset;
self.term_info.positions_inner_offset = cursor[num_bytes_positions_offset];
&cursor[num_bytes_positions_offset + 1..]
} else {
cursor
}
}
pub fn term_info(&self) -> &TermInfo {
&self.term_info
}
}

View File

@@ -1,41 +0,0 @@
use std::io::{self, Read, Write};
use common::BinarySerializable;
mod termdict;
mod streamer;
mod delta_encoder;
pub use self::delta_encoder::{TermDeltaDecoder, TermDeltaEncoder};
pub use self::delta_encoder::{DeltaTermInfo, TermInfoDeltaDecoder, TermInfoDeltaEncoder};
pub use self::termdict::TermDictionaryImpl;
pub use self::termdict::TermDictionaryBuilderImpl;
pub use self::streamer::TermStreamerImpl;
pub use self::streamer::TermStreamerBuilderImpl;
#[derive(Debug)]
pub struct CheckPoint {
pub stream_offset: u32,
pub postings_offset: u32,
pub positions_offset: u32,
}
impl BinarySerializable for CheckPoint {
fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
self.stream_offset.serialize(writer)?;
self.postings_offset.serialize(writer)?;
self.positions_offset.serialize(writer)?;
Ok(())
}
fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
let stream_offset = u32::deserialize(reader)?;
let postings_offset = u32::deserialize(reader)?;
let positions_offset = u32::deserialize(reader)?;
Ok(CheckPoint {
stream_offset,
postings_offset,
positions_offset,
})
}
}

View File

@@ -1,184 +0,0 @@
#![allow(should_implement_trait)]
use std::cmp::max;
use super::TermDictionaryImpl;
use termdict::{TermStreamer, TermStreamerBuilder};
use postings::TermInfo;
use super::delta_encoder::{TermDeltaDecoder, TermInfoDeltaDecoder};
fn stream_before<'a>(
term_dictionary: &'a TermDictionaryImpl,
target_key: &[u8],
has_positions: bool,
) -> TermStreamerImpl<'a> {
let (prev_key, checkpoint) = term_dictionary.strictly_previous_key(target_key.as_ref());
let stream_data: &'a [u8] = &term_dictionary.stream_data()[checkpoint.stream_offset as usize..];
TermStreamerImpl {
cursor: stream_data,
term_delta_decoder: TermDeltaDecoder::with_previous_term(prev_key),
term_info_decoder: TermInfoDeltaDecoder::from_checkpoint(&checkpoint, has_positions),
}
}
/// See [`TermStreamerBuilder`](./trait.TermStreamerBuilder.html)
pub struct TermStreamerBuilderImpl<'a> {
term_dictionary: &'a TermDictionaryImpl,
origin: usize,
offset_from: usize,
offset_to: usize,
current_key: Vec<u8>,
term_info: TermInfo,
has_positions: bool,
}
impl<'a> TermStreamerBuilder for TermStreamerBuilderImpl<'a> {
type Streamer = TermStreamerImpl<'a>;
/// Limit the range to terms greater or equal to the bound
fn ge<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
let target_key = bound.as_ref();
let streamer = stream_before(
self.term_dictionary,
target_key.as_ref(),
self.has_positions,
);
let smaller_than = |k: &[u8]| k.lt(target_key);
let (offset_before, current_key, term_info) = get_offset(smaller_than, streamer);
self.current_key = current_key;
self.term_info = term_info;
self.offset_from = offset_before - self.origin;
self
}
/// Limit the range to terms strictly greater than the bound
fn gt<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
let target_key = bound.as_ref();
let streamer = stream_before(
self.term_dictionary,
target_key.as_ref(),
self.has_positions,
);
let smaller_than = |k: &[u8]| k.le(target_key);
let (offset_before, current_key, term_info) = get_offset(smaller_than, streamer);
self.current_key = current_key;
self.term_info = term_info;
self.offset_from = offset_before - self.origin;
self
}
/// Limit the range to terms strictly lesser than the bound
fn lt<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
let target_key = bound.as_ref();
let streamer = stream_before(
self.term_dictionary,
target_key.as_ref(),
self.has_positions,
);
let smaller_than = |k: &[u8]| k.lt(target_key);
let (offset_before, _, _) = get_offset(smaller_than, streamer);
self.offset_to = offset_before - self.origin;
self
}
/// Limit the range to terms lesser or equal to the bound
fn le<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
let target_key = bound.as_ref();
let streamer = stream_before(
self.term_dictionary,
target_key.as_ref(),
self.has_positions,
);
let smaller_than = |k: &[u8]| k.le(target_key);
let (offset_before, _, _) = get_offset(smaller_than, streamer);
self.offset_to = offset_before - self.origin;
self
}
/// Build the streamer.
fn into_stream(self) -> Self::Streamer {
let data: &[u8] = self.term_dictionary.stream_data();
let start = self.offset_from;
let stop = max(self.offset_to, start);
let term_delta_decoder = TermDeltaDecoder::with_previous_term(self.current_key);
let term_info_decoder =
TermInfoDeltaDecoder::from_term_info(self.term_info, self.has_positions);
TermStreamerImpl {
cursor: &data[start..stop],
term_delta_decoder,
term_info_decoder,
}
}
}
/// Returns offset information for the first
/// key in the stream matching a given predicate.
///
/// Returns:
/// - the block start
/// - the index within this block
/// - the term_buffer state to initialize the block
fn get_offset<'a, P: Fn(&[u8]) -> bool>(
predicate: P,
mut streamer: TermStreamerImpl<'a>,
) -> (usize, Vec<u8>, TermInfo) {
let mut prev: &[u8] = streamer.cursor;
let mut term_info = streamer.value().clone();
let mut prev_data: Vec<u8> = Vec::from(streamer.term_delta_decoder.term());
while let Some((iter_key, iter_term_info)) = streamer.next() {
if !predicate(iter_key.as_ref()) {
return (prev.as_ptr() as usize, prev_data, term_info);
}
prev = streamer.cursor;
prev_data.clear();
prev_data.extend_from_slice(iter_key.as_ref());
term_info = iter_term_info.clone();
}
(prev.as_ptr() as usize, prev_data, term_info)
}
impl<'a> TermStreamerBuilderImpl<'a> {
pub(crate) fn new(term_dictionary: &'a TermDictionaryImpl, has_positions: bool) -> Self {
let data = term_dictionary.stream_data();
let origin = data.as_ptr() as usize;
TermStreamerBuilderImpl {
term_dictionary,
term_info: TermInfo::default(),
origin,
offset_from: 0,
offset_to: data.len(),
current_key: Vec::with_capacity(300),
has_positions,
}
}
}
/// See [`TermStreamer`](./trait.TermStreamer.html)
pub struct TermStreamerImpl<'a> {
cursor: &'a [u8],
term_delta_decoder: TermDeltaDecoder,
term_info_decoder: TermInfoDeltaDecoder,
}
impl<'a> TermStreamer for TermStreamerImpl<'a> {
fn advance(&mut self) -> bool {
if self.cursor.is_empty() {
return false;
}
let mut cursor: &[u8] = &self.cursor;
let code: u8 = cursor[0];
cursor = self.term_delta_decoder.decode(code, &cursor[1..]);
cursor = self.term_info_decoder.decode(code, cursor);
self.cursor = cursor;
true
}
fn key(&self) -> &[u8] {
self.term_delta_decoder.term()
}
fn value(&self) -> &TermInfo {
&self.term_info_decoder.term_info()
}
}

View File

@@ -1,354 +0,0 @@
#![allow(should_implement_trait)]
use std::io::{self, Write};
use super::CheckPoint;
use fst;
use fst::raw::Fst;
use directory::ReadOnlySource;
use common::BinarySerializable;
use common::CountingWriter;
use postings::TermInfo;
use schema::FieldType;
use super::{DeltaTermInfo, TermDeltaEncoder, TermInfoDeltaEncoder};
use fst::raw::Node;
use termdict::{TermDictionary, TermDictionaryBuilder, TermStreamer};
use super::{TermStreamerBuilderImpl, TermStreamerImpl};
use termdict::TermStreamerBuilder;
use std::mem::transmute;
const PADDING_SIZE: usize = 4;
const INDEX_INTERVAL: usize = 1024;
fn convert_fst_error(e: fst::Error) -> io::Error {
io::Error::new(io::ErrorKind::Other, e)
}
fn has_positions(field_type: &FieldType) -> bool {
match *field_type {
FieldType::Str(ref text_options) => {
let indexing_options = text_options.get_indexing_options();
if indexing_options.is_position_enabled() {
true
} else {
false
}
}
_ => false,
}
}
/// See [`TermDictionaryBuilder`](./trait.TermDictionaryBuilder.html)
pub struct TermDictionaryBuilderImpl<W> {
write: CountingWriter<W>,
term_delta_encoder: TermDeltaEncoder,
term_info_encoder: TermInfoDeltaEncoder,
block_index: fst::MapBuilder<Vec<u8>>,
checkpoints: Vec<u8>,
len: usize,
}
fn fill_last<'a>(fst: &'a Fst, mut node: Node<'a>, buffer: &mut Vec<u8>) {
while let Some(transition) = node.transitions().last() {
buffer.push(transition.inp);
node = fst.node(transition.addr);
}
}
impl<W> TermDictionaryBuilderImpl<W>
where
W: Write,
{
fn add_index_entry(&mut self) {
let stream_offset = self.write.written_bytes() as u32;
let term_info = self.term_info_encoder.term_info();
let postings_offset = term_info.postings_offset as u32;
let positions_offset = term_info.positions_offset as u32;
let checkpoint = CheckPoint {
stream_offset,
postings_offset,
positions_offset,
};
self.block_index
.insert(
&self.term_delta_encoder.term(),
self.checkpoints.len() as u64,
)
.expect(
"Serializing fst on a Vec<u8> should never fail. \
Were your terms perhaps not in order?",
);
checkpoint
.serialize(&mut self.checkpoints)
.expect("Serializing checkpoint on a Vec<u8> should never fail.");
}
/// # Warning
/// Horribly dangerous internal API
///
/// If used, it must be used by systematically alternating calls
/// to insert_key and insert_value.
///
/// Prefer using `.insert(key, value)`
pub(crate) fn insert_key(&mut self, key: &[u8]) -> io::Result<()> {
if self.len % INDEX_INTERVAL == 0 {
self.add_index_entry();
}
self.term_delta_encoder.encode(key);
Ok(())
}
pub(crate) fn insert_value(&mut self, term_info: &TermInfo) -> io::Result<()> {
let delta_term_info = self.term_info_encoder.encode(term_info.clone());
let (prefix_len, suffix) = self.term_delta_encoder.prefix_suffix();
write_term_kv(
prefix_len,
suffix,
&delta_term_info,
self.term_info_encoder.has_positions,
&mut self.write,
)?;
self.len += 1;
Ok(())
}
}
fn num_bytes_required(mut n: u32) -> u8 {
for i in 1u8..5u8 {
if n < 256u32 {
return i;
} else {
n /= 256;
}
}
0u8
}
fn write_term_kv<W: Write>(
prefix_len: usize,
suffix: &[u8],
delta_term_info: &DeltaTermInfo,
has_positions: bool,
write: &mut W,
) -> io::Result<()> {
let suffix_len = suffix.len();
let mut code = 0u8;
let num_bytes_docfreq = num_bytes_required(delta_term_info.doc_freq);
let num_bytes_postings_offset = num_bytes_required(delta_term_info.delta_postings_offset);
let num_bytes_positions_offset = num_bytes_required(delta_term_info.delta_positions_offset);
code |= (num_bytes_docfreq - 1) << 1u8;
code |= (num_bytes_postings_offset - 1) << 3u8;
code |= (num_bytes_positions_offset - 1) << 5u8;
if (prefix_len < 16) && (suffix_len < 16) {
code |= 1u8;
write.write_all(&[code, (prefix_len as u8) | ((suffix_len as u8) << 4u8)])?;
} else {
write.write_all(&[code])?;
(prefix_len as u32).serialize(write)?;
(suffix_len as u32).serialize(write)?;
}
write.write_all(suffix)?;
{
let bytes: [u8; 4] = unsafe { transmute(delta_term_info.doc_freq) };
write.write_all(&bytes[0..num_bytes_docfreq as usize])?;
}
{
let bytes: [u8; 4] = unsafe { transmute(delta_term_info.delta_postings_offset) };
write.write_all(&bytes[0..num_bytes_postings_offset as usize])?;
}
if has_positions {
let bytes: [u8; 4] = unsafe { transmute(delta_term_info.delta_positions_offset) };
write.write_all(&bytes[0..num_bytes_positions_offset as usize])?;
write.write_all(&[delta_term_info.positions_inner_offset])?;
}
Ok(())
}
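A worked example of the `code` byte (values chosen for illustration): with doc_freq = 300 (2 bytes), delta_postings_offset = 17 (1 byte), delta_positions_offset = 70_000 (3 bytes), and prefix_len = 3, suffix_len = 5 (both under 16):
// bit 0    = 1          (prefix/suffix lengths packed into one byte)
// bits 1-2 = 0b01       (num_bytes_docfreq - 1)
// bits 3-4 = 0b00       (num_bytes_postings_offset - 1)
// bits 5-6 = 0b10       (num_bytes_positions_offset - 1)
// code     = 0b0100_0011, followed by 0b0101_0011 = (3 | 5 << 4)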
impl<W> TermDictionaryBuilder<W> for TermDictionaryBuilderImpl<W>
where
W: Write,
{
/// Creates a new `TermDictionaryBuilder`
fn new(mut write: W, field_type: FieldType) -> io::Result<Self> {
let has_positions = has_positions(&field_type);
let has_positions_code = if has_positions { 255u8 } else { 0u8 };
write.write_all(&[has_positions_code])?;
Ok(TermDictionaryBuilderImpl {
write: CountingWriter::wrap(write),
term_delta_encoder: TermDeltaEncoder::default(),
term_info_encoder: TermInfoDeltaEncoder::new(has_positions),
block_index: fst::MapBuilder::new(vec![]).expect("This cannot fail"),
checkpoints: vec![],
len: 0,
})
}
/// Inserts a `(key, value)` pair in the term dictionary.
///
/// *Keys have to be inserted in order.*
fn insert<K: AsRef<[u8]>>(&mut self, key_ref: K, value: &TermInfo) -> io::Result<()> {
let key = key_ref.as_ref();
self.insert_key(key)?;
self.insert_value(value)?;
Ok(())
}
/// Finalize writing the builder, and returns the underlying
/// `Write` object.
fn finish(mut self) -> io::Result<W> {
self.add_index_entry();
self.write.write_all(&[0u8; PADDING_SIZE])?;
let fst_addr = self.write.written_bytes();
let fst_write = self.block_index.into_inner().map_err(convert_fst_error)?;
self.write.write_all(&fst_write)?;
let check_points_addr = self.write.written_bytes();
let (mut w, _) = self.write.finish()?;
w.write_all(&self.checkpoints)?;
(fst_addr as u64).serialize(&mut w)?;
(check_points_addr as u64).serialize(&mut w)?;
w.flush()?;
Ok(w)
}
}
fn open_fst_index(source: ReadOnlySource) -> io::Result<fst::Map> {
use self::ReadOnlySource::*;
let fst_result = match source {
Anonymous(data) => Fst::from_shared_bytes(data.data, data.start, data.len),
Mmap(mmap_readonly) => Fst::from_mmap(mmap_readonly),
};
let fst = fst_result.map_err(convert_fst_error)?;
Ok(fst::Map::from(fst))
}
/// See [`TermDictionary`](./trait.TermDictionary.html)
pub struct TermDictionaryImpl {
stream_data: ReadOnlySource,
fst_index: fst::Map,
checkpoints_data: ReadOnlySource,
has_positions: bool,
}
impl TermDictionaryImpl {
pub(crate) fn stream_data(&self) -> &[u8] {
self.stream_data.as_slice()
}
pub(crate) fn strictly_previous_key(&self, key: &[u8]) -> (Vec<u8>, CheckPoint) {
let (term, checkpoint_offset) = self.strictly_previous_key_checkpoint_offset(key);
let mut checkpoint_data = &self.checkpoints_data.as_slice()[checkpoint_offset..];
let checkpoint =
CheckPoint::deserialize(&mut checkpoint_data).expect("Checkpoint data is corrupted");
(term, checkpoint)
}
fn strictly_previous_key_checkpoint_offset(&self, key: &[u8]) -> (Vec<u8>, usize) {
let fst_map = &self.fst_index;
let fst = fst_map.as_fst();
let mut node = fst.root();
let mut node_stack: Vec<Node> = vec![node];
// first check the longest prefix.
for &b in &key[..key.len() - 1] {
node = match node.find_input(b) {
None => {
break;
}
Some(i) => fst.node(node.transition_addr(i)),
};
node_stack.push(node);
}
let len_node_stack = node_stack.len();
for i in (1..len_node_stack).rev() {
let cur_node = &node_stack[i];
let b: u8 = key[i];
let last_transition_opt = cur_node
.transitions()
.take_while(|transition| transition.inp < b)
.last();
if let Some(last_transition) = last_transition_opt {
let mut result = Vec::from(&key[..i]);
result.push(last_transition.inp);
let fork_node = fst.node(last_transition.addr);
fill_last(fst, fork_node, &mut result);
let val = fst_map.get(&result).expect("Fst data corrupted") as usize;
return (result, val);
} else if cur_node.is_final() {
// the previous key is a prefix
let result_buffer = Vec::from(&key[..i]);
let val = fst_map.get(&result_buffer).expect("Fst data corrupted") as usize;
return (result_buffer, val);
}
}
(vec![], 0)
}
}
impl<'a> TermDictionary<'a> for TermDictionaryImpl {
type Streamer = TermStreamerImpl<'a>;
type StreamBuilder = TermStreamerBuilderImpl<'a>;
/// Opens a `TermDictionary` given a data source.
fn from_source(mut source: ReadOnlySource) -> Self {
let has_positions = source.slice(0, 1)[0] == 255u8;
source = source.slice_from(1);
let total_len = source.len();
let (body, footer) = source.split(total_len - 16);
let mut footer_buffer: &[u8] = footer.as_slice();
let fst_addr = u64::deserialize(&mut footer_buffer)
.expect("deserializing 8 byte should never fail") as usize;
let checkpoints_addr = u64::deserialize(&mut footer_buffer)
.expect("deserializing 8 byte should never fail")
as usize;
let stream_data = body.slice(0, fst_addr - PADDING_SIZE);
let fst_data = body.slice(fst_addr, checkpoints_addr);
let checkpoints_data = body.slice_from(checkpoints_addr);
let fst_index = open_fst_index(fst_data).expect("Index FST data corrupted");
TermDictionaryImpl {
has_positions,
stream_data,
checkpoints_data,
fst_index,
}
}
/// Looks up the value corresponding to the key.
fn get<K: AsRef<[u8]>>(&self, target_key: K) -> Option<TermInfo> {
let mut streamer = self.range().ge(&target_key).into_stream();
if streamer.advance() && streamer.key() == target_key.as_ref() {
Some(streamer.value().clone())
} else {
None
}
}
/// Returns a range builder, to stream all of the terms
/// within an interval.
fn range(&'a self) -> Self::StreamBuilder {
Self::StreamBuilder::new(self, self.has_positions)
}
}
#[cfg(test)]
mod tests {
use super::num_bytes_required;
#[test]
fn test_num_bytes_required() {
assert_eq!(num_bytes_required(0), 1);
assert_eq!(num_bytes_required(1), 1);
assert_eq!(num_bytes_required(255), 1);
assert_eq!(num_bytes_required(256), 2);
assert_eq!(num_bytes_required(u32::max_value()), 4);
}
}

src/termdict/streamer.rs (new file, 131 lines added)
View File

@@ -0,0 +1,131 @@
use fst::{IntoStreamer, Streamer};
use fst::map::{Stream, StreamBuilder};
use postings::TermInfo;
use super::TermDictionary;
use termdict::TermOrdinal;
/// `TermStreamerBuilder` is a helper object used to define
/// a range of terms that should be streamed.
pub struct TermStreamerBuilder<'a> {
fst_map: &'a TermDictionary,
stream_builder: StreamBuilder<'a>,
}
impl<'a> TermStreamerBuilder<'a> {
pub(crate) fn new(fst_map: &'a TermDictionary, stream_builder: StreamBuilder<'a>) -> Self {
TermStreamerBuilder {
fst_map,
stream_builder,
}
}
/// Limit the range to terms greater or equal to the bound
pub fn ge<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
self.stream_builder = self.stream_builder.ge(bound);
self
}
/// Limit the range to terms strictly greater than the bound
pub fn gt<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
self.stream_builder = self.stream_builder.gt(bound);
self
}
/// Limit the range to terms lesser or equal to the bound
pub fn le<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
self.stream_builder = self.stream_builder.le(bound);
self
}
/// Limit the range to terms strictly lesser than the bound
pub fn lt<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
self.stream_builder = self.stream_builder.lt(bound);
self
}
/// Creates the stream corresponding to the range
/// of terms defined using the `TermStreamerBuilder`.
pub fn into_stream(self) -> TermStreamer<'a> {
TermStreamer {
fst_map: self.fst_map,
stream: self.stream_builder.into_stream(),
term_ord: 0u64,
current_key: Vec::with_capacity(100),
current_value: TermInfo::default(),
}
}
}
/// `TermStreamer` acts as a cursor over a range of terms of a segment.
/// Terms are guaranteed to be sorted.
pub struct TermStreamer<'a> {
fst_map: &'a TermDictionary,
stream: Stream<'a>,
term_ord: TermOrdinal,
current_key: Vec<u8>,
current_value: TermInfo,
}
impl<'a> TermStreamer<'a> {
/// Advances the stream to the next item.
/// Before the first call to `.advance()`, the stream
/// is in an uninitialized state.
pub fn advance(&mut self) -> bool {
if let Some((term, term_ord)) = self.stream.next() {
self.current_key.clear();
self.current_key.extend_from_slice(term);
self.term_ord = term_ord;
self.current_value = self.fst_map.term_info_from_ord(term_ord);
true
} else {
false
}
}
/// Returns the `TermOrdinal` of the current term.
///
/// May panic if `.advance()` has never
/// been called before.
pub fn term_ord(&self) -> TermOrdinal {
self.term_ord
}
/// Accesses the current key.
///
/// `.key()` should return the key that was returned
/// by the `.next()` method.
///
/// If the end of the stream has been reached, and `.next()`
/// has been called and returned `None`, `.key()` remains
/// the value of the last key encountered.
///
/// Before any call to `.next()`, `.key()` returns an empty array.
pub fn key(&self) -> &[u8] {
&self.current_key
}
/// Accesses the current value.
///
/// Calling `.value()` after the end of the stream will return the
/// last `.value()` encountered.
///
/// Calling `.value()` before the first call to `.advance()` returns
/// `TermInfo::default()`.
pub fn value(&self) -> &TermInfo {
&self.current_value
}
/// Return the next `(key, value)` pair.
pub fn next(&mut self) -> Option<(&[u8], &TermInfo)> {
if self.advance() {
Some((self.key(), self.value()))
} else {
None
}
}
}
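Putting the new streamer together, a minimal iteration sketch (the `term_dict` value is assumed to be a `TermDictionary` opened elsewhere):
let mut streamer = term_dict.stream();
while let Some((key, term_info)) = streamer.next() {
    // `key` borrows the streamer's internal buffer; copy it out if it must
    // outlive the next call to `.next()`.
    println!("{:?} -> doc_freq {}", key, term_info.doc_freq);
}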

View File

@@ -6,24 +6,48 @@ use common::BinarySerializable;
use common::CountingWriter;
use schema::FieldType;
use postings::TermInfo;
use termdict::{TermDictionary, TermDictionaryBuilder, TermOrdinal};
use super::{TermInfoStore, TermInfoStoreWriter, TermStreamerBuilderImpl, TermStreamerImpl};
use termdict::TermOrdinal;
use super::{TermStreamerBuilder, TermStreamer};
use super::term_info_store::{TermInfoStore, TermInfoStoreWriter};
fn convert_fst_error(e: fst::Error) -> io::Error {
io::Error::new(io::ErrorKind::Other, e)
}
/// See [`TermDictionaryBuilder`](./trait.TermDictionaryBuilder.html)
pub struct TermDictionaryBuilderImpl<W> {
/// Builder for the new term dictionary.
///
/// Inserting must be done in the order of the `keys`.
pub struct TermDictionaryBuilder<W> {
fst_builder: fst::MapBuilder<W>,
term_info_store_writer: TermInfoStoreWriter,
term_ord: u64,
}
impl<W> TermDictionaryBuilderImpl<W>
impl<W> TermDictionaryBuilder<W>
where
W: Write,
W: Write
{
/// Creates a new `TermDictionaryBuilder`
pub fn new(w: W, _field_type: FieldType) -> io::Result<Self> {
let fst_builder = fst::MapBuilder::new(w).map_err(convert_fst_error)?;
Ok(TermDictionaryBuilder {
fst_builder,
term_info_store_writer: TermInfoStoreWriter::new(),
term_ord: 0,
})
}
/// Inserts a `(key, value)` pair in the term dictionary.
///
/// *Keys have to be inserted in order.*
pub fn insert<K: AsRef<[u8]>>(&mut self, key_ref: K, value: &TermInfo) -> io::Result<()> {
let key = key_ref.as_ref();
self.insert_key(key)?;
self.insert_value(value)?;
Ok(())
}
/// # Warning
/// Horribly dangerous internal API
///
@@ -46,29 +70,10 @@ where
self.term_info_store_writer.write_term_info(term_info)?;
Ok(())
}
}
impl<W> TermDictionaryBuilder<W> for TermDictionaryBuilderImpl<W>
where
W: Write,
{
fn new(w: W, _field_type: FieldType) -> io::Result<Self> {
let fst_builder = fst::MapBuilder::new(w).map_err(convert_fst_error)?;
Ok(TermDictionaryBuilderImpl {
fst_builder,
term_info_store_writer: TermInfoStoreWriter::new(),
term_ord: 0,
})
}
fn insert<K: AsRef<[u8]>>(&mut self, key_ref: K, value: &TermInfo) -> io::Result<()> {
let key = key_ref.as_ref();
self.insert_key(key)?;
self.insert_value(value)?;
Ok(())
}
fn finish(mut self) -> io::Result<W> {
/// Finalize writing the builder, and returns the underlying
/// `Write` object.
pub fn finish(mut self) -> io::Result<W> {
let mut file = self.fst_builder.into_inner().map_err(convert_fst_error)?;
{
let mut counting_writer = CountingWriter::wrap(&mut file);
@@ -94,18 +99,21 @@ fn open_fst_index(source: ReadOnlySource) -> fst::Map {
fst::Map::from(fst)
}
/// See [`TermDictionary`](./trait.TermDictionary.html)
pub struct TermDictionaryImpl {
/// The term dictionary contains all of the terms of
/// the tantivy index, in sorted order.
///
/// The `fst` crate is used to associate terms with their
/// respective `TermOrdinal`. The `TermInfoStore` then makes it
/// possible to fetch the associated `TermInfo`.
pub struct TermDictionary {
fst_index: fst::Map,
term_info_store: TermInfoStore,
}
impl<'a> TermDictionary<'a> for TermDictionaryImpl {
type Streamer = TermStreamerImpl<'a>;
impl TermDictionary {
type StreamBuilder = TermStreamerBuilderImpl<'a>;
fn from_source(source: ReadOnlySource) -> Self {
/// Opens a `TermDictionary` given a data source.
pub fn from_source(source: ReadOnlySource) -> Self {
let total_len = source.len();
let length_offset = total_len - 8;
let mut split_len_buffer: &[u8] = &source.as_slice()[length_offset..];
@@ -115,15 +123,16 @@ impl<'a> TermDictionary<'a> for TermDictionaryImpl {
let fst_source = source.slice(0, split_len);
let values_source = source.slice(split_len, length_offset);
let fst_index = open_fst_index(fst_source);
TermDictionaryImpl {
TermDictionary {
fst_index,
term_info_store: TermInfoStore::open(&values_source),
}
}
fn empty(field_type: FieldType) -> Self {
/// Creates an empty term dictionary which contains no terms.
pub fn empty(field_type: FieldType) -> Self {
let term_dictionary_data: Vec<u8> =
TermDictionaryBuilderImpl::new(Vec::<u8>::new(), field_type)
TermDictionaryBuilder::new(Vec::<u8>::new(), field_type)
.expect("Creating a TermDictionaryBuilder in a Vec<u8> should never fail")
.finish()
.expect("Writing in a Vec<u8> should never fail");
@@ -131,15 +140,27 @@ impl<'a> TermDictionary<'a> for TermDictionaryImpl {
Self::from_source(source)
}
fn num_terms(&self) -> usize {
/// Returns the number of terms in the dictionary.
/// Term ordinals range from 0 to `num_terms() - 1`.
pub fn num_terms(&self) -> usize {
self.term_info_store.num_terms()
}
fn term_ord<K: AsRef<[u8]>>(&self, key: K) -> Option<TermOrdinal> {
/// Returns the ordinal associated with a given term.
pub fn term_ord<K: AsRef<[u8]>>(&self, key: K) -> Option<TermOrdinal> {
self.fst_index.get(key)
}
fn ord_to_term(&self, mut ord: TermOrdinal, bytes: &mut Vec<u8>) -> bool {
/// Returns the term associated to a given term ordinal.
///
/// Term ordinals are defined as the position of the term in
/// the sorted list of terms.
///
/// Returns true iff the term has been found.
///
/// Regardless of whether the term is found or not,
/// the buffer may be modified.
pub fn ord_to_term(&self, mut ord: TermOrdinal, bytes: &mut Vec<u8>) -> bool {
bytes.clear();
let fst = self.fst_index.as_fst();
let mut node = fst.root();
@@ -159,16 +180,26 @@ impl<'a> TermDictionary<'a> for TermDictionaryImpl {
true
}
fn term_info_from_ord(&self, term_ord: TermOrdinal) -> TermInfo {
/// Returns the `TermInfo` associated with the given term ordinal.
pub fn term_info_from_ord(&self, term_ord: TermOrdinal) -> TermInfo {
self.term_info_store.get(term_ord)
}
fn get<K: AsRef<[u8]>>(&self, key: K) -> Option<TermInfo> {
/// Looks up the value corresponding to the key.
pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Option<TermInfo> {
self.term_ord(key)
.map(|term_ord| self.term_info_from_ord(term_ord))
}
fn range(&self) -> TermStreamerBuilderImpl {
TermStreamerBuilderImpl::new(self, self.fst_index.range())
/// Returns a range builder, to stream all of the terms
/// within an interval.
pub fn range<'a>(&'a self) -> TermStreamerBuilder<'a> {
TermStreamerBuilder::new(self, self.fst_index.range())
}
/// A stream of all the sorted terms. [See also `.stream_field()`](#method.stream_field)
pub fn stream<'a>(&'a self) -> TermStreamer<'a> {
self.range().into_stream()
}
}
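End to end, the new concrete types compose just as in the tests above; a condensed sketch (`make_term_info` stands in for building a real `TermInfo`, as in the test module):
let field_type = FieldType::Str(TEXT);
let mut builder = TermDictionaryBuilder::new(Vec::new(), field_type).unwrap();
builder.insert(b"apple", &make_term_info(1u64)).unwrap();  // keys in sorted order
builder.insert(b"banana", &make_term_info(2u64)).unwrap();
let buffer: Vec<u8> = builder.finish().unwrap();

let term_dict = TermDictionary::from_source(ReadOnlySource::from(buffer));
assert_eq!(term_dict.term_ord(b"banana"), Some(1));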