Merge branch 'issue/155'

Conflicts:
	src/indexer/merger.rs
	src/indexer/segment_writer.rs
Paul Masurel committed on 2017-05-19 20:14:36 +09:00
25 changed files with 831 additions and 515 deletions

View File

@@ -7,7 +7,6 @@ mod segment;
mod index_meta;
mod pool;
mod segment_meta;
mod term_iterator;
pub use self::searcher::Searcher;
pub use self::segment_component::SegmentComponent;
@@ -18,7 +17,6 @@ pub use self::segment::SerializableSegment;
pub use self::index::Index;
pub use self::segment_meta::SegmentMeta;
pub use self::index_meta::IndexMeta;
pub use self::term_iterator::TermIterator;
use std::path::PathBuf;

View File

@@ -7,8 +7,9 @@ use query::Query;
use DocId;
use DocAddress;
use schema::Term;
use core::TermIterator;
use termdict::TermMerger;
use std::fmt;
use postings::TermInfo;
/// Holds a list of `SegmentReader`s ready for search.
@@ -49,18 +50,6 @@ impl Searcher {
.fold(0u32, |acc, val| acc + val)
}
/// Returns a Stream over all of the sorted unique terms of
/// the searcher.
///
/// This includes all of the fields from all of the segment_readers.
/// See [TermIterator](struct.TermIterator.html).
///
/// # Warning
/// This API is very likely to change in the future.
pub fn terms(&self) -> TermIterator {
TermIterator::from(self.segment_readers())
}
/// Return the list of segment readers
pub fn segment_readers(&self) -> &[SegmentReader] {
&self.segment_readers
@@ -75,6 +64,18 @@ impl Searcher {
pub fn search<C: Collector>(&self, query: &Query, collector: &mut C) -> Result<TimerTree> {
query.search(self, collector)
}
/// Returns a Stream over all of the sorted unique terms of
/// the searcher.
///
/// This includes all of the fields from all of the segment_readers.
/// See [TermIterator](struct.TermIterator.html).
///
/// # Warning
/// This API is very likely to change in the future.
pub fn terms(&self) -> TermMerger<TermInfo> {
TermMerger::from(self.segment_readers())
}
}
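
A minimal usage sketch of the new `terms()` accessor (hedged: it assumes `Searcher` is re-exported at the crate root, and elides the index/searcher setup into the function argument):

```rust
use tantivy::Searcher;

/// Minimal sketch: count the unique terms across all segments,
/// using the merged stream returned by `Searcher::terms()`.
fn count_unique_terms(searcher: &Searcher) -> usize {
    let mut term_stream = searcher.terms();
    let mut count = 0;
    while term_stream.advance() {
        // `key()` yields the raw term bytes (field id prefix + value bytes).
        let _term_bytes: &[u8] = term_stream.key();
        count += 1;
    }
    count
}
```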

View File

@@ -12,8 +12,9 @@ use schema::Document;
use directory::ReadOnlySource;
use DocId;
use std::str;
use std::cmp;
use postings::TermInfo;
use datastruct::FstMap;
use termdict::TermDictionary;
use std::sync::Arc;
use std::fmt;
use schema::Field;
@@ -42,7 +43,7 @@ use schema::TextIndexingOptions;
pub struct SegmentReader {
segment_id: SegmentId,
segment_meta: SegmentMeta,
term_infos: Arc<FstMap<TermInfo>>,
terms: Arc<TermDictionary>,
postings_data: ReadOnlySource,
store_reader: StoreReader,
fast_fields_reader: Arc<FastFieldsReader>,
@@ -133,16 +134,19 @@ impl SegmentReader {
/// Open a new segment for reading.
pub fn open(segment: Segment) -> Result<SegmentReader> {
let source = try!(segment.open_read(SegmentComponent::TERMS));
let term_infos = try!(FstMap::from_source(source));
let store_reader = StoreReader::from(try!(segment.open_read(SegmentComponent::STORE)));
let postings_shared_mmap = try!(segment.open_read(SegmentComponent::POSTINGS));
let source = segment.open_read(SegmentComponent::TERMS)?;
let terms = TermDictionary::from_source(source)?;
let fast_field_data = try!(segment.open_read(SegmentComponent::FASTFIELDS));
let fast_fields_reader = try!(FastFieldsReader::open(fast_field_data));
let store_source = segment.open_read(SegmentComponent::STORE)?;
let store_reader = StoreReader::from_source(store_source);
let fieldnorms_data = try!(segment.open_read(SegmentComponent::FIELDNORMS));
let fieldnorms_reader = try!(FastFieldsReader::open(fieldnorms_data));
let postings_shared_mmap = segment.open_read(SegmentComponent::POSTINGS)?;
let fast_field_data = segment.open_read(SegmentComponent::FASTFIELDS)?;
let fast_fields_reader = FastFieldsReader::from_source(fast_field_data)?;
let fieldnorms_data = segment.open_read(SegmentComponent::FIELDNORMS)?;
let fieldnorms_reader = FastFieldsReader::from_source(fieldnorms_data)?;
let positions_data = segment
.open_read(SegmentComponent::POSITIONS)
@@ -159,7 +163,7 @@ impl SegmentReader {
Ok(SegmentReader {
segment_meta: segment.meta().clone(),
postings_data: postings_shared_mmap,
term_infos: Arc::new(term_infos),
terms: Arc::new(terms),
segment_id: segment.id(),
store_reader: store_reader,
fast_fields_reader: Arc::new(fast_fields_reader),
@@ -171,8 +175,8 @@ impl SegmentReader {
}
/// Return the term dictionary datastructure.
pub fn term_infos(&self) -> &FstMap<TermInfo> {
&self.term_infos
pub fn terms(&self) -> &TermDictionary {
&self.terms
}
/// Returns the document (or to be accurate, its stored field)
@@ -201,39 +205,35 @@ impl SegmentReader {
let field = term.field();
let field_entry = self.schema.get_field_entry(field);
let term_info = get!(self.get_term_info(term));
let maximum_option = get!(field_entry.field_type().get_segment_postings_option());
let best_effort_option = cmp::min(maximum_option, option);
Some(self.read_postings_from_terminfo(&term_info, best_effort_option))
}
/// Returns a posting object given a `term_info`.
/// This method is for an advanced usage only.
///
/// Most user should prefer using `read_postings` instead.
pub fn read_postings_from_terminfo(&self,
term_info: &TermInfo,
option: SegmentPostingsOption)
-> SegmentPostings {
let offset = term_info.postings_offset as usize;
let postings_data = &self.postings_data[offset..];
let freq_handler = match *field_entry.field_type() {
FieldType::Str(ref options) => {
let indexing_options = options.get_indexing_options();
match option {
SegmentPostingsOption::NoFreq => FreqHandler::new_without_freq(),
SegmentPostingsOption::Freq => {
if indexing_options.is_termfreq_enabled() {
FreqHandler::new_with_freq()
} else {
FreqHandler::new_without_freq()
}
}
SegmentPostingsOption::FreqAndPositions => {
if indexing_options == TextIndexingOptions::TokenizedWithFreqAndPosition {
let offset = term_info.positions_offset as usize;
let offseted_position_data = &self.positions_data[offset..];
FreqHandler::new_with_freq_and_position(offseted_position_data)
} else if indexing_options.is_termfreq_enabled() {
FreqHandler::new_with_freq()
} else {
FreqHandler::new_without_freq()
}
}
}
let freq_handler = match option {
SegmentPostingsOption::NoFreq => FreqHandler::new_without_freq(),
SegmentPostingsOption::Freq => FreqHandler::new_with_freq(),
SegmentPostingsOption::FreqAndPositions => {
let offset = term_info.positions_offset as usize;
let offseted_position_data = &self.positions_data[offset..];
FreqHandler::new_with_freq_and_position(offseted_position_data)
}
_ => FreqHandler::new_without_freq(),
};
Some(SegmentPostings::from_data(term_info.doc_freq,
postings_data,
&self.delete_bitset,
freq_handler))
SegmentPostings::from_data(term_info.doc_freq,
postings_data,
&self.delete_bitset,
freq_handler)
}
@@ -262,7 +262,7 @@ impl SegmentReader {
/// Returns the term info associated with the term.
pub fn get_term_info(&self, term: &Term) -> Option<TermInfo> {
self.term_infos.get(term.as_slice())
self.terms.get(term.as_slice())
}
/// Returns the segment id
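
As a side note, the clamping via `cmp::min` above relies on the new `Ord` derive for `SegmentPostingsOption` (see the postings option diff further down). A minimal sketch of that behavior, assuming the variant ordering `NoFreq < Freq < FreqAndPositions` shown in that diff:

```rust
use std::cmp;
use tantivy::SegmentPostingsOption;

fn main() {
    // The field was indexed with term frequencies but without positions...
    let maximum_option = SegmentPostingsOption::Freq;
    // ...while the caller asked for positions as well.
    let requested = SegmentPostingsOption::FreqAndPositions;
    // `read_postings` silently downgrades to the best the field can provide.
    let best_effort = cmp::min(maximum_option, requested);
    assert_eq!(best_effort, SegmentPostingsOption::Freq);
}
```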

View File

@@ -1,182 +0,0 @@
use fst::Streamer;
use std::mem;
use std::collections::BinaryHeap;
use fst::map::Keys;
use schema::Field;
use schema::Term;
use core::SegmentReader;
use std::cmp::Ordering;
#[derive(PartialEq, Eq, Debug)]
struct HeapItem {
term: Term,
segment_ord: usize,
}
impl PartialOrd for HeapItem {
fn partial_cmp(&self, other: &HeapItem) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for HeapItem {
fn cmp(&self, other: &HeapItem) -> Ordering {
(&other.term, &other.segment_ord).cmp(&(&self.term, &self.segment_ord))
}
}
/// Given a list of sorted term streams,
/// returns an iterator over sorted unique terms.
///
/// The item yield is actually a pair with
/// - the term
/// - a slice with the ordinal of the segments containing
/// the terms.
pub struct TermIterator<'a> {
key_streams: Vec<Keys<'a>>,
heap: BinaryHeap<HeapItem>,
// Buffer hosting the list of segment ordinals containing
// the current term.
current_term: Term,
current_segment_ords: Vec<usize>,
}
impl<'a> TermIterator<'a> {
fn new(key_streams: Vec<Keys<'a>>) -> TermIterator<'a> {
let key_streams_len = key_streams.len();
TermIterator {
key_streams: key_streams,
heap: BinaryHeap::new(),
current_term: Term::from_field_text(Field(0), ""),
current_segment_ords: (0..key_streams_len).collect(),
}
}
/// Advance the term iterator to the next term.
/// Returns true if there is indeed another term
/// False if there is none.
pub fn advance(&mut self) -> bool {
self.advance_segments();
if let Some(mut head) = self.heap.pop() {
mem::swap(&mut self.current_term, &mut head.term);
self.current_segment_ords.push(head.segment_ord);
loop {
match self.heap.peek() {
Some(&ref next_heap_it) if next_heap_it.term == self.current_term => {}
_ => {
break;
}
}
let next_heap_it = self.heap.pop().unwrap(); // safe : we peeked beforehand
self.current_segment_ords.push(next_heap_it.segment_ord);
}
true
} else {
false
}
}
/// Returns the current term.
///
/// This method may be called
/// iff advance() has been called before
/// and "true" was returned.
pub fn term(&self) -> &Term {
&self.current_term
}
/// Returns the sorted list of segment ordinals
/// that include the current term.
///
/// This method may be called
/// iff advance() has been called before
/// and "true" was returned.
pub fn segment_ords(&self) -> &[usize] {
&self.current_segment_ords[..]
}
fn advance_segments(&mut self) {
for segment_ord in self.current_segment_ords.drain(..) {
if let Some(term) = self.key_streams[segment_ord].next() {
self.heap
.push(HeapItem {
term: Term::from_bytes(term),
segment_ord: segment_ord,
});
}
}
}
}
impl<'a, 'f> Streamer<'a> for TermIterator<'f> {
type Item = &'a Term;
fn next(&'a mut self) -> Option<Self::Item> {
if self.advance() {
Some(&self.current_term)
} else {
None
}
}
}
impl<'a> From<&'a [SegmentReader]> for TermIterator<'a> {
fn from(segment_readers: &'a [SegmentReader]) -> TermIterator<'a> {
TermIterator::new(segment_readers
.iter()
.map(|reader| reader.term_infos().keys())
.collect())
}
}
#[cfg(test)]
mod tests {
use super::*;
use schema::{SchemaBuilder, Document, TEXT};
use core::Index;
#[test]
fn test_term_iterator() {
let mut schema_builder = SchemaBuilder::default();
let text_field = schema_builder.add_text_field("text", TEXT);
let index = Index::create_in_ram(schema_builder.build());
{
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
{
{
let mut doc = Document::default();
doc.add_text(text_field, "a b d f");
index_writer.add_document(doc);
}
index_writer.commit().unwrap();
}
{
{
let mut doc = Document::default();
doc.add_text(text_field, "a b c d f");
index_writer.add_document(doc);
}
index_writer.commit().unwrap();
}
{
{
let mut doc = Document::default();
doc.add_text(text_field, "e f");
index_writer.add_document(doc);
}
index_writer.commit().unwrap();
}
}
index.load_searchers().unwrap();
let searcher = index.searcher();
let mut term_it = searcher.terms();
let mut terms = String::new();
while let Some(term) = term_it.next() {
terms.push_str(term.text());
}
assert_eq!(terms, "abcdef");
}
}

View File

@@ -1,150 +0,0 @@
use std::io;
use std::io::Write;
use fst;
use fst::raw::Fst;
use directory::ReadOnlySource;
use common::BinarySerializable;
use std::marker::PhantomData;
fn convert_fst_error(e: fst::Error) -> io::Error {
io::Error::new(io::ErrorKind::Other, e)
}
pub struct FstMapBuilder<W: Write, V: BinarySerializable> {
fst_builder: fst::MapBuilder<W>,
data: Vec<u8>,
_phantom_: PhantomData<V>,
}
impl<W: Write, V: BinarySerializable> FstMapBuilder<W, V> {
pub fn new(w: W) -> io::Result<FstMapBuilder<W, V>> {
let fst_builder = try!(fst::MapBuilder::new(w).map_err(convert_fst_error));
Ok(FstMapBuilder {
fst_builder: fst_builder,
data: Vec::new(),
_phantom_: PhantomData,
})
}
/// Horribly unsafe, nobody should ever do that... except me :)
///
/// If used, it must be used by systematically alternating calls
/// to insert_key and insert_value.
///
/// TODO see if I can bend Rust typesystem to enforce that
/// in a nice way.
pub fn insert_key(&mut self, key: &[u8]) -> io::Result<()> {
try!(self.fst_builder
.insert(key, self.data.len() as u64)
.map_err(convert_fst_error));
Ok(())
}
/// Horribly unsafe, nobody should ever do that... except me :)
pub fn insert_value(&mut self, value: &V) -> io::Result<()> {
try!(value.serialize(&mut self.data));
Ok(())
}
#[cfg(test)]
pub fn insert(&mut self, key: &[u8], value: &V) -> io::Result<()> {
try!(self.fst_builder
.insert(key, self.data.len() as u64)
.map_err(convert_fst_error));
try!(value.serialize(&mut self.data));
Ok(())
}
pub fn finish(self) -> io::Result<W> {
let mut file = try!(self.fst_builder.into_inner().map_err(convert_fst_error));
let footer_size = self.data.len() as u32;
try!(file.write_all(&self.data));
try!((footer_size as u32).serialize(&mut file));
try!(file.flush());
Ok(file)
}
}
pub struct FstMap<V: BinarySerializable> {
fst_index: fst::Map,
values_mmap: ReadOnlySource,
_phantom_: PhantomData<V>,
}
fn open_fst_index(source: ReadOnlySource) -> io::Result<fst::Map> {
Ok(fst::Map::from(match source {
ReadOnlySource::Anonymous(data) => {
try!(Fst::from_shared_bytes(data.data, data.start, data.len)
.map_err(convert_fst_error))
}
ReadOnlySource::Mmap(mmap_readonly) => {
try!(Fst::from_mmap(mmap_readonly).map_err(convert_fst_error))
}
}))
}
impl<V: BinarySerializable> FstMap<V> {
pub fn keys(&self) -> fst::map::Keys {
self.fst_index.keys()
}
pub fn from_source(source: ReadOnlySource) -> io::Result<FstMap<V>> {
let total_len = source.len();
let length_offset = total_len - 4;
let mut split_len_buffer: &[u8] = &source.as_slice()[length_offset..];
let footer_size = try!(u32::deserialize(&mut split_len_buffer)) as usize;
let split_len = length_offset - footer_size;
let fst_source = source.slice(0, split_len);
let values_source = source.slice(split_len, length_offset);
let fst_index = try!(open_fst_index(fst_source));
Ok(FstMap {
fst_index: fst_index,
values_mmap: values_source,
_phantom_: PhantomData,
})
}
fn read_value(&self, offset: u64) -> V {
let buffer = self.values_mmap.as_slice();
let mut cursor = &buffer[(offset as usize)..];
V::deserialize(&mut cursor).expect("Data in FST is corrupted")
}
pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Option<V> {
self.fst_index
.get(key)
.map(|offset| self.read_value(offset))
}
}
#[cfg(test)]
mod tests {
use super::*;
use directory::{RAMDirectory, Directory};
use std::path::PathBuf;
use fst::Streamer;
#[test]
fn test_fstmap() {
let mut directory = RAMDirectory::create();
let path = PathBuf::from("fstmap");
{
let write = directory.open_write(&path).unwrap();
let mut fstmap_builder = FstMapBuilder::new(write).unwrap();
fstmap_builder.insert("abc".as_bytes(), &34u32).unwrap();
fstmap_builder.insert("abcd".as_bytes(), &346u32).unwrap();
fstmap_builder.finish().unwrap();
}
let source = directory.open_read(&path).unwrap();
let fstmap: FstMap<u32> = FstMap::from_source(source).unwrap();
assert_eq!(fstmap.get("abc"), Some(34u32));
assert_eq!(fstmap.get("abcd"), Some(346u32));
let mut keys = fstmap.keys();
assert_eq!(keys.next().unwrap(), "abc".as_bytes());
assert_eq!(keys.next().unwrap(), "abcd".as_bytes());
assert_eq!(keys.next(), None);
}
}

View File

@@ -1,7 +1,4 @@
mod fstmap;
mod skip;
pub mod stacker;
pub use self::fstmap::FstMapBuilder;
pub use self::fstmap::FstMap;
pub use self::skip::{SkipListBuilder, SkipList};

View File

@@ -1,3 +1,8 @@
/*!
WORM directory abstraction.
*/
mod mmap_directory;
mod ram_directory;
mod directory;

View File

@@ -1,25 +1,27 @@
//! # Fast fields
//!
//! Fast fields are the equivalent of `DocValues` in `Lucene`.
//! Fast fields is a non-compressed column-oriented fashion storage
//! of `tantivy`.
//!
//! It is designed for the fast random access of some document
//! fields given a document id.
//!
//! `FastField` are useful when a field is required for all or most of
//! the `DocSet` : for instance for scoring, grouping, filtering, or facetting.
//!
//!
//! Fields have to be declared as `FAST` in the schema.
//! Currently only 64-bits integers (signed or unsigned) are
//! supported.
//!
//! They are stored in a bitpacked fashion so that their
//! memory usage is directly linear with the amplitude of the
//! values stored.
//!
//! Read access performance is comparable to that of an array lookup.
/*!
Fast fields are a column-oriented storage of `tantivy`.
They are the equivalent of `Lucene`'s `DocValues`.
Fast fields are designed for fast random access to some document
fields given a document id.
`FastField`s are useful when a field is required for all or most of
the `DocSet`: for instance for scoring, grouping, filtering, or faceting.
Fields have to be declared as `FAST` in the schema.
Currently only 64-bit integers (signed or unsigned) are
supported.
They are stored in a bitpacked fashion so that their
memory usage is directly linear with the amplitude of the
values stored.
Read access performance is comparable to that of an array lookup.
*/
mod reader;
mod writer;
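
The "memory usage is directly linear with the amplitude" claim above boils down to the usual bitpacking arithmetic. A back-of-the-envelope sketch (not tantivy's actual serializer):

```rust
/// Bits needed per value when bitpacking values in [min_value, max_value]:
/// roughly ceil(log2(amplitude + 1)).
fn bits_per_value(min_value: u64, max_value: u64) -> u32 {
    let amplitude = max_value - min_value;
    if amplitude == 0 {
        0
    } else {
        64 - amplitude.leading_zeros()
    }
}

fn main() {
    // Values between 100 and 9_100 cost 14 bits per document,
    // regardless of how large the absolute values are.
    assert_eq!(bits_per_value(100, 9_100), 14);
    // A constant column costs (close to) 0 bits per document.
    assert_eq!(bits_per_value(42, 42), 0);
}
```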
@@ -95,7 +97,7 @@ mod tests {
assert_eq!(source.len(), 38 as usize);
}
{
let fast_field_readers = FastFieldsReader::open(source).unwrap();
let fast_field_readers = FastFieldsReader::from_source(source).unwrap();
let fast_field_reader: U64FastFieldReader =
fast_field_readers.open_reader(*FIELD).unwrap();
assert_eq!(fast_field_reader.get(0), 13u64);
@@ -129,7 +131,7 @@ mod tests {
assert_eq!(source.len(), 63 as usize);
}
{
let fast_field_readers = FastFieldsReader::open(source).unwrap();
let fast_field_readers = FastFieldsReader::from_source(source).unwrap();
let fast_field_reader: U64FastFieldReader =
fast_field_readers.open_reader(*FIELD).unwrap();
assert_eq!(fast_field_reader.get(0), 4u64);
@@ -165,7 +167,7 @@ mod tests {
assert_eq!(source.len(), 36 as usize);
}
{
let fast_field_readers = FastFieldsReader::open(source).unwrap();
let fast_field_readers = FastFieldsReader::from_source(source).unwrap();
let fast_field_reader: U64FastFieldReader =
fast_field_readers.open_reader(*FIELD).unwrap();
for doc in 0..10_000 {
@@ -198,7 +200,7 @@ mod tests {
assert_eq!(source.len(), 80044 as usize);
}
{
let fast_field_readers = FastFieldsReader::open(source).unwrap();
let fast_field_readers = FastFieldsReader::from_source(source).unwrap();
let fast_field_reader: U64FastFieldReader =
fast_field_readers.open_reader(*FIELD).unwrap();
assert_eq!(fast_field_reader.get(0), 0u64);
@@ -235,7 +237,7 @@ mod tests {
assert_eq!(source.len(), 17711 as usize);
}
{
let fast_field_readers = FastFieldsReader::open(source).unwrap();
let fast_field_readers = FastFieldsReader::from_source(source).unwrap();
let fast_field_reader: I64FastFieldReader =
fast_field_readers.open_reader(i64_field).unwrap();
assert_eq!(fast_field_reader.min_value(), -100i64);
@@ -266,7 +268,7 @@ mod tests {
let source = directory.open_read(&path).unwrap();
{
let fast_field_readers = FastFieldsReader::open(source).unwrap();
let fast_field_readers = FastFieldsReader::from_source(source).unwrap();
let fast_field_reader: I64FastFieldReader =
fast_field_readers.open_reader(i64_field).unwrap();
assert_eq!(fast_field_reader.get(0u32), 0i64);
@@ -299,7 +301,7 @@ mod tests {
}
let source = directory.open_read(&path).unwrap();
{
let fast_field_readers = FastFieldsReader::open(source).unwrap();
let fast_field_readers = FastFieldsReader::from_source(source).unwrap();
let fast_field_reader: U64FastFieldReader =
fast_field_readers.open_reader(*FIELD).unwrap();
let mut a = 0u64;
@@ -357,7 +359,7 @@ mod tests {
}
let source = directory.open_read(&path).unwrap();
{
let fast_field_readers = FastFieldsReader::open(source).unwrap();
let fast_field_readers = FastFieldsReader::from_source(source).unwrap();
let fast_field_reader: U64FastFieldReader =
fast_field_readers.open_reader(*FIELD).unwrap();
b.iter(|| {
@@ -388,7 +390,7 @@ mod tests {
}
let source = directory.open_read(&path).unwrap();
{
let fast_field_readers = FastFieldsReader::open(source).unwrap();
let fast_field_readers = FastFieldsReader::from_source(source).unwrap();
let fast_field_reader: U64FastFieldReader =
fast_field_readers.open_reader(*FIELD).unwrap();
b.iter(|| {

View File

@@ -128,7 +128,7 @@ impl From<Vec<u64>> for U64FastFieldReader {
serializer.close().unwrap();
}
let source = directory.open_read(path).unwrap();
let fast_field_readers = FastFieldsReader::open(source).unwrap();
let fast_field_readers = FastFieldsReader::from_source(source).unwrap();
fast_field_readers.open_reader(field).unwrap()
}
}
@@ -194,12 +194,12 @@ pub struct FastFieldsReader {
}
impl FastFieldsReader {
/// Opens the `FastFieldsReader` file
/// Opens a `FastFieldsReader`
///
/// When opening the fast field reader,
/// the list of the offsets is read (as a footer of the
/// data file).
pub fn open(source: ReadOnlySource) -> io::Result<FastFieldsReader> {
pub fn from_source(source: ReadOnlySource) -> io::Result<FastFieldsReader> {
let header_offset;
let field_offsets: Vec<(Field, u32)>;
{

View File

@@ -273,7 +273,7 @@ fn index_documents(heap: &mut Heap,
//
// Tantivy does not resize its hashtable. When it reaches
// capacity, we just stop indexing new document.
if segment_writer.is_termdic_saturated() {
if segment_writer.is_term_saturated() {
info!("Term dic saturated, flushing segment with maxdoc={}.",
segment_writer.max_doc());
break;

View File

@@ -10,13 +10,15 @@ use fastfield::U64FastFieldReader;
use itertools::Itertools;
use postings::Postings;
use postings::DocSet;
use core::TermIterator;
use fastfield::DeleteBitSet;
use schema::{Schema, Field};
use termdict::TermMerger;
use fastfield::FastFieldSerializer;
use fastfield::FastFieldReader;
use store::StoreWriter;
use std::cmp::{min, max};
use schema;
use postings::SegmentPostingsOption;
pub struct IndexMerger {
schema: Schema,
@@ -161,7 +163,6 @@ impl IndexMerger {
return Err(Error::SchemaError(error_msg));
}
}
}
if u64_readers.is_empty() {
@@ -172,24 +173,25 @@ impl IndexMerger {
assert!(min_val <= max_val);
try!(fast_field_serializer.new_u64_fast_field(field, min_val, max_val));
fast_field_serializer
.new_u64_fast_field(field, min_val, max_val)?;
for (max_doc, u64_reader, delete_bitset) in u64_readers {
for doc_id in 0..max_doc {
if !delete_bitset.is_deleted(doc_id) {
let val = u64_reader.get(doc_id);
try!(fast_field_serializer.add_val(val));
fast_field_serializer.add_val(val)?;
}
}
}
try!(fast_field_serializer.close_field());
fast_field_serializer.close_field()?;
}
Ok(())
}
fn write_postings(&self, postings_serializer: &mut PostingsSerializer) -> Result<()> {
fn write_postings(&self, serializer: &mut PostingsSerializer) -> Result<()> {
let mut merged_terms = TermIterator::from(&self.readers[..]);
let mut merged_terms = TermMerger::from(&self.readers[..]);
let mut delta_position_computer = DeltaPositionComputer::new();
let mut max_doc = 0;
@@ -212,6 +214,8 @@ impl IndexMerger {
let mut last_field: Option<Field> = None;
let mut segment_postings_option = SegmentPostingsOption::FreqAndPositions;
while merged_terms.advance() {
// Create the total list of doc ids
// by stacking the doc ids from the different segment.
@@ -223,59 +227,85 @@ impl IndexMerger {
// - Segment 2's doc ids become [seg0.max_doc + seg1.max_doc,
// seg0.max_doc + seg1.max_doc + seg2.max_doc]
// ...
let term = merged_terms.term();
let mut term_written = false;
let segment_postings = merged_terms
.segment_ords()
let term_bytes = merged_terms.key();
let current_field = schema::extract_field_from_term_bytes(term_bytes);
if last_field != Some(current_field) {
// we reached a new field.
let field_entry = self.schema.get_field_entry(current_field);
// ... set the segment postings option for the new field.
segment_postings_option = field_entry
.field_type()
.get_segment_postings_option()
.expect("Encounterred a field that is not supposed to be
indexed. Have you modified the index?");
last_field = Some(current_field);
// it is perfectly safe to call `.new_field`
// even if there is no postings associated.
serializer.new_field(current_field);
}
// Let's compute the list of non-empty posting lists
let segment_postings: Vec<_> = merged_terms
.current_kvs()
.iter()
.cloned()
.flat_map(|segment_ord| {
self.readers[segment_ord]
.read_postings_all_info(term)
.map(|segment_postings| (segment_ord, segment_postings))
})
.collect::<Vec<_>>();
// We can remove the term if all documents which
// contained it have been deleted.
if !segment_postings.is_empty() {
// We can now serialize this postings, by pushing each document to the
// postings serializer.
for (segment_ord, mut segment_postings) in segment_postings {
let old_to_new_doc_id = &merged_doc_id_map[segment_ord];
while segment_postings.advance() {
if let Some(remapped_doc_id) =
old_to_new_doc_id[segment_postings.doc() as usize] {
if !term_written {
let current_field = term.field();
if last_field != Some(current_field) {
postings_serializer.new_field(current_field);
last_field = Some(current_field);
}
// we make sure to only write the term iff
// there is at least one document.
postings_serializer.new_term(term.as_slice())?;
term_written = true;
}
let delta_positions: &[u32] =
delta_position_computer
.compute_delta_positions(segment_postings.positions());
try!(postings_serializer.write_doc(remapped_doc_id,
segment_postings.term_freq(),
delta_positions));
}
.flat_map(|heap_item| {
let segment_ord = heap_item.segment_ord;
let term_info = heap_item.streamer.value();
let segment_reader = &self.readers[heap_item.segment_ord];
let mut segment_postings =
segment_reader
.read_postings_from_terminfo(&term_info, segment_postings_option);
if segment_postings.advance() {
Some((segment_ord, segment_postings))
} else {
None
}
}
})
.collect();
if term_written {
try!(postings_serializer.close_term());
// At this point, `segment_postings` contains the posting list
// of all of the segments containing the given term.
//
// These segments are non-empty and advance has already been called.
if segment_postings.is_empty() {
// by continuing here, the `term` will be entirely removed.
continue;
}
// We know that there is at least one document containing
// the term, so we add it.
serializer.new_term(term_bytes)?;
// We can now serialize this postings, by pushing each document to the
// postings serializer.
for (segment_ord, mut segment_postings) in segment_postings {
let old_to_new_doc_id = &merged_doc_id_map[segment_ord];
loop {
// `.advance()` has been called once before the loop.
// Hence we cannot use a `while segment_postings.advance()` loop.
if let Some(remapped_doc_id) =
old_to_new_doc_id[segment_postings.doc() as usize] {
// we make sure to only write the term iff
// there is at least one document.
let delta_positions: &[u32] =
delta_position_computer
.compute_delta_positions(segment_postings.positions());
let term_freq = segment_postings.term_freq();
serializer
.write_doc(remapped_doc_id, term_freq, delta_positions)?;
}
if !segment_postings.advance() {
break;
}
}
}
// closing the term.
serializer.close_term()?;
}
Ok(())
}
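
For readers following the doc id "stacking" comment above, here is a stand-alone sketch of what `merged_doc_id_map` conceptually contains. The real merger derives it from each segment's `DeleteBitSet`; the helper below is illustrative only:

```rust
/// Live documents are renumbered consecutively across the stacked segments;
/// deleted documents map to `None` and disappear from the merged segment.
fn build_doc_id_map(alive: &[Vec<bool>]) -> Vec<Vec<Option<u32>>> {
    let mut next_doc_id = 0u32;
    let mut mapping = Vec::with_capacity(alive.len());
    for segment_alive in alive {
        let mut segment_mapping = Vec::with_capacity(segment_alive.len());
        for &is_alive in segment_alive {
            if is_alive {
                segment_mapping.push(Some(next_doc_id));
                next_doc_id += 1;
            } else {
                segment_mapping.push(None);
            }
        }
        mapping.push(segment_mapping);
    }
    mapping
}

fn main() {
    // Segment 0 has 3 docs (doc 1 deleted), segment 1 has 2 docs.
    let mapping = build_doc_id_map(&[vec![true, false, true], vec![true, true]]);
    assert_eq!(mapping[0], vec![Some(0), None, Some(1)]);
    // Segment 1's docs are stacked right after segment 0's live docs.
    assert_eq!(mapping[1], vec![Some(2), Some(3)]);
}
```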

View File

@@ -58,10 +58,10 @@ impl SegmentSerializer {
/// Finalize the segment serialization.
pub fn close(self) -> Result<()> {
try!(self.fast_field_serializer.close());
try!(self.postings_serializer.close());
try!(self.store_writer.close());
try!(self.fieldnorms_serializer.close());
self.fast_field_serializer.close()?;
self.postings_serializer.close()?;
self.store_writer.close()?;
self.fieldnorms_serializer.close()?;
Ok(())
}
}

View File

@@ -98,8 +98,8 @@ impl<'a> SegmentWriter<'a> {
/// Return true if the term dictionary hashmap is reaching capacity.
/// It is one of the condition that triggers a `SegmentWriter` to
/// be finalized.
pub(crate) fn is_termdic_saturated(&self) -> bool {
self.multifield_postings.is_termdic_saturated()
pub(crate) fn is_term_saturated(&self) -> bool {
self.multifield_postings.is_term_saturated()
}

View File

@@ -12,6 +12,8 @@
#![doc(test(attr(allow(unused_variables), deny(warnings))))]
#![allow(unknown_lints)]
#![warn(missing_docs)]
//! # `tantivy`
@@ -109,9 +111,10 @@ mod datastruct;
pub mod termdict;
/// Query module
pub mod query;
/// Directory module
pub mod directory;
/// Collector module
pub mod collector;
@@ -134,8 +137,6 @@ pub use postings::DocSet;
pub use postings::Postings;
pub use postings::SegmentPostingsOption;
pub use core::TermIterator;
/// Expose the current version of tantivy, as well
/// whether it was compiled with the simd compression.
@@ -147,8 +148,7 @@ pub fn version() -> &'static str {
}
}
/// Tantivy's makes it possible to personalize when
/// the indexer should merge its segments
/// Defines tantivy's merging strategy
pub mod merge_policy {
pub use indexer::MergePolicy;
pub use indexer::LogMergePolicy;

View File

@@ -78,6 +78,7 @@ impl<'a> MultiFieldPostingsWriter<'a> {
/// Serialize the inverted index.
/// It pushes all term, one field at a time, towards the
/// postings serializer.
#[allow(needless_range_loop)]
pub fn serialize(&self, serializer: &mut PostingsSerializer) -> Result<()> {
let mut term_offsets: Vec<(&[u8], u32)> = self.term_index.iter().collect();
term_offsets.sort_by_key(|&(k, _v)| k);
@@ -108,7 +109,7 @@ impl<'a> MultiFieldPostingsWriter<'a> {
}
/// Return true iff the term dictionary is saturated.
pub fn is_termdic_saturated(&self) -> bool {
pub fn is_term_saturated(&self) -> bool {
self.term_index.is_saturated()
}
}

View File

@@ -6,7 +6,7 @@
/// avoid this extra cost when the information is not required.
/// For instance, positions are useful when running phrase queries
/// but useless in other queries.
#[derive(Clone, Copy, Debug)]
#[derive(Clone, Copy, Debug, PartialEq, PartialOrd, Ord, Eq)]
pub enum SegmentPostingsOption {
/// Only the doc ids are decoded
NoFreq,
@@ -15,3 +15,15 @@ pub enum SegmentPostingsOption {
/// DocIds, term frequencies and positions will be decoded.
FreqAndPositions,
}
#[cfg(test)]
mod tests {
use super::SegmentPostingsOption;
#[test]
fn test_cmp_segment_postings_option() {
assert!(SegmentPostingsOption::FreqAndPositions > SegmentPostingsOption::Freq);
assert!(SegmentPostingsOption::Freq > SegmentPostingsOption::NoFreq);
}
}

View File

@@ -1,5 +1,5 @@
use Result;
use datastruct::FstMapBuilder;
use termdict::TermDictionaryBuilder;
use super::TermInfo;
use schema::Field;
use schema::FieldEntry;
@@ -50,7 +50,7 @@ use common::BinarySerializable;
/// A description of the serialization format is
/// [available here](https://fulmicoton.gitbooks.io/tantivy-doc/content/inverted-index.html).
pub struct PostingsSerializer {
terms_fst_builder: FstMapBuilder<WritePtr, TermInfo>,
terms_fst_builder: TermDictionaryBuilder<WritePtr, TermInfo>,
postings_write: WritePtr,
positions_write: WritePtr,
written_bytes_postings: usize,
@@ -74,7 +74,7 @@ impl PostingsSerializer {
positions_write: WritePtr,
schema: Schema)
-> Result<PostingsSerializer> {
let terms_fst_builder = try!(FstMapBuilder::new(terms_write));
let terms_fst_builder = try!(TermDictionaryBuilder::new(terms_write));
Ok(PostingsSerializer {
terms_fst_builder: terms_fst_builder,
postings_write: postings_write,

View File

@@ -2,7 +2,8 @@ use schema::{TextOptions, IntOptions};
use serde_json::Value as JsonValue;
use schema::Value;
use postings::SegmentPostingsOption;
use schema::TextIndexingOptions;
/// Possible error that may occur while parsing a field value
/// At this point the JSON is known to be valid.
@@ -39,6 +40,34 @@ impl FieldType {
}
}
/// Given a field configuration, return the maximal possible
/// `SegmentPostingsOption` available.
///
/// If the field is not indexed, then returns `None`.
pub fn get_segment_postings_option(&self) -> Option<SegmentPostingsOption> {
match *self {
FieldType::Str(ref text_options) => {
match text_options.get_indexing_options() {
TextIndexingOptions::Untokenized |
TextIndexingOptions::TokenizedNoFreq => Some(SegmentPostingsOption::NoFreq),
TextIndexingOptions::TokenizedWithFreq => Some(SegmentPostingsOption::Freq),
TextIndexingOptions::TokenizedWithFreqAndPosition => {
Some(SegmentPostingsOption::FreqAndPositions)
}
TextIndexingOptions::Unindexed => None,
}
}
FieldType::U64(ref int_options) |
FieldType::I64(ref int_options) => {
if int_options.is_indexed() {
Some(SegmentPostingsOption::NoFreq)
} else {
None
}
}
}
}
/// Parses a field value from json, given the target FieldType.
///
/// Tantivy will not try to cast values.

View File

@@ -106,6 +106,7 @@ impl Term {
///
/// If you want to build a field for a given `str`,
/// you want to use `from_field_text`.
#[cfg(test)]
pub(crate) fn from_bytes(data: &[u8]) -> Term {
Term(Vec::from(data))
}

View File

@@ -58,7 +58,7 @@ mod tests {
let schema = write_lorem_ipsum_store(store_file, 1_000);
let field_title = schema.get_field("title").unwrap();
let store_source = directory.open_read(path).unwrap();
let store = StoreReader::from(store_source);
let store = StoreReader::from_source(store_source);
for i in 0..1_000 {
assert_eq!(*store.get(i).unwrap().get_first(field_title).unwrap().text(),
format!("Doc {}", i));
@@ -82,7 +82,7 @@ mod tests {
let path = Path::new("store");
write_lorem_ipsum_store(directory.open_write(path).unwrap(), 1_000);
let store_source = directory.open_read(path).unwrap();
let store = StoreReader::from(store_source);
let store = StoreReader::from_source(store_source);
b.iter(|| { store.get(12).unwrap(); });
}

View File

@@ -21,6 +21,17 @@ pub struct StoreReader {
}
impl StoreReader {
pub fn from_source(data: ReadOnlySource) -> StoreReader {
let (data_source, offset_index_source, max_doc) = split_source(data);
StoreReader {
data: data_source,
offset_index_source: offset_index_source,
current_block_offset: RefCell::new(usize::max_value()),
current_block: RefCell::new(Vec::new()),
max_doc: max_doc,
}
}
fn block_offset(&self, doc_id: DocId) -> (DocId, u64) {
SkipList::from(self.offset_index_source.as_slice())
.seek(doc_id + 1)
@@ -76,17 +87,3 @@ fn split_source(data: ReadOnlySource) -> (ReadOnlySource, ReadOnlySource, DocId)
drop(data);
res
}
impl From<ReadOnlySource> for StoreReader {
fn from(data: ReadOnlySource) -> StoreReader {
let (data_source, offset_index_source, max_doc) = split_source(data);
StoreReader {
data: data_source,
offset_index_source: offset_index_source,
current_block_offset: RefCell::new(usize::max_value()),
current_block: RefCell::new(Vec::new()),
max_doc: max_doc,
}
}
}

src/termdict/merger.rs (new file, 208 lines)
View File

@@ -0,0 +1,208 @@
use std::collections::BinaryHeap;
use core::SegmentReader;
use super::TermStreamer;
use common::BinarySerializable;
use postings::TermInfo;
use std::cmp::Ordering;
use fst::Streamer;
pub struct HeapItem<'a, V>
where V: 'a + BinarySerializable
{
pub streamer: TermStreamer<'a, V>,
pub segment_ord: usize,
}
impl<'a, V> PartialEq for HeapItem<'a, V>
where V: 'a + BinarySerializable
{
fn eq(&self, other: &Self) -> bool {
self.segment_ord == other.segment_ord
}
}
impl<'a, V> Eq for HeapItem<'a, V> where V: 'a + BinarySerializable {}
impl<'a, V> PartialOrd for HeapItem<'a, V>
where V: 'a + BinarySerializable
{
fn partial_cmp(&self, other: &HeapItem<'a, V>) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl<'a, V> Ord for HeapItem<'a, V>
where V: 'a + BinarySerializable
{
fn cmp(&self, other: &HeapItem<'a, V>) -> Ordering {
(&other.streamer.key(), &other.segment_ord).cmp(&(&self.streamer.key(), &self.segment_ord))
}
}
/// Given a list of sorted term streams,
/// returns an iterator over sorted unique terms.
///
/// The item yielded is actually a pair with
/// - the term
/// - a slice with the ordinals of the segments containing
///   the term.
pub struct TermMerger<'a, V>
where V: 'a + BinarySerializable
{
heap: BinaryHeap<HeapItem<'a, V>>,
current_streamers: Vec<HeapItem<'a, V>>,
}
impl<'a, V> TermMerger<'a, V>
where V: 'a + BinarySerializable
{
fn new(streams: Vec<TermStreamer<'a, V>>) -> TermMerger<'a, V> {
TermMerger {
heap: BinaryHeap::new(),
current_streamers: streams
.into_iter()
.enumerate()
.map(|(ord, streamer)| {
HeapItem {
streamer: streamer,
segment_ord: ord,
}
})
.collect(),
}
}
fn advance_segments(&mut self) {
let streamers = &mut self.current_streamers;
let heap = &mut self.heap;
for mut heap_item in streamers.drain(..) {
if heap_item.streamer.advance() {
heap.push(heap_item);
}
}
}
/// Advance the term iterator to the next term.
/// Returns `true` if there is indeed another term,
/// `false` if there is none.
#[allow(while_let_loop)]
pub fn advance(&mut self) -> bool {
self.advance_segments();
if let Some(head) = self.heap.pop() {
self.current_streamers.push(head);
loop {
if let Some(next_streamer) = self.heap.peek() {
if self.current_streamers[0].streamer.key() != next_streamer.streamer.key() {
break;
}
} else {
break;
} // no more streamer.
let next_heap_it = self.heap.pop().unwrap(); // safe : we peeked beforehand
self.current_streamers.push(next_heap_it);
}
true
} else {
false
}
}
/// Returns the current term.
///
/// This method may be called
/// iff advance() has been called before
/// and "true" was returned.
pub fn key(&self) -> &[u8] {
self.current_streamers[0].streamer.key()
}
/// Returns the list of `HeapItem`s (segment ordinal and positioned
/// streamer) for the segments that contain the current term.
///
/// This method may be called
/// iff advance() has been called before
/// and "true" was returned.
pub fn current_kvs(&self) -> &[HeapItem<'a, V>] {
&self.current_streamers[..]
}
}
impl<'a> From<&'a [SegmentReader]> for TermMerger<'a, TermInfo>
where TermInfo: BinarySerializable
{
fn from(segment_readers: &'a [SegmentReader]) -> TermMerger<'a, TermInfo> {
TermMerger::new(segment_readers
.iter()
.map(|reader| reader.terms().stream())
.collect())
}
}
impl<'a, V> Streamer<'a> for TermMerger<'a, V>
where V: BinarySerializable
{
type Item = &'a [u8];
fn next(&'a mut self) -> Option<Self::Item> {
if self.advance() {
Some(self.current_streamers[0].streamer.key())
} else {
None
}
}
}
#[cfg(test)]
mod tests {
use schema::{Term, SchemaBuilder, Document, TEXT};
use core::Index;
#[test]
fn test_term_iterator() {
let mut schema_builder = SchemaBuilder::default();
let text_field = schema_builder.add_text_field("text", TEXT);
let index = Index::create_in_ram(schema_builder.build());
{
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
{
{
let mut doc = Document::default();
doc.add_text(text_field, "a b d f");
index_writer.add_document(doc);
}
index_writer.commit().unwrap();
}
{
{
let mut doc = Document::default();
doc.add_text(text_field, "a b c d f");
index_writer.add_document(doc);
}
index_writer.commit().unwrap();
}
{
{
let mut doc = Document::default();
doc.add_text(text_field, "e f");
index_writer.add_document(doc);
}
index_writer.commit().unwrap();
}
}
index.load_searchers().unwrap();
let searcher = index.searcher();
let mut term_it = searcher.terms();
let mut term_string = String::new();
while term_it.advance() {
let term = Term::from_bytes(term_it.key());
term_string.push_str(term.text());
}
assert_eq!(&*term_string, "abcdef");
}
}
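
The reversed `Ord` implementation above is the standard trick for getting min-heap behavior out of `std::collections::BinaryHeap`, which is a max-heap. A stand-alone distillation:

```rust
use std::cmp::Ordering;
use std::collections::BinaryHeap;

#[derive(PartialEq, Eq)]
struct MinItem {
    key: Vec<u8>,
    segment_ord: usize,
}

impl PartialOrd for MinItem {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for MinItem {
    fn cmp(&self, other: &Self) -> Ordering {
        // Comparison with the operands swapped: the max-heap now pops
        // the smallest (key, segment_ord) pair first.
        (&other.key, other.segment_ord).cmp(&(&self.key, self.segment_ord))
    }
}

fn main() {
    let mut heap = BinaryHeap::new();
    heap.push(MinItem { key: b"b".to_vec(), segment_ord: 0 });
    heap.push(MinItem { key: b"a".to_vec(), segment_ord: 1 });
    // The lexicographically smallest key comes out first.
    assert_eq!(heap.pop().unwrap().key, b"a".to_vec());
}
```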

src/termdict/mod.rs (new file, 24 lines)
View File

@@ -0,0 +1,24 @@
/*!
The term dictionary contains all of the terms of a
`tantivy` index in a sorted manner.
It is implemented as a wrapper around the `fst` crate in order
to add a value type.
The finite state transducer itself associates
each term `&[u8]` with a `u64` that is in fact an address
in a buffer. The value is then accessible by
deserializing the data at this address.
Keys (`&[u8]`) in this datastructure are sorted.
*/
mod termdict;
mod streamer;
mod merger;
pub use self::termdict::TermDictionary;
pub use self::termdict::TermDictionaryBuilder;
pub use self::streamer::TermStreamer;
pub use self::streamer::TermStreamerBuilder;
pub use self::merger::TermMerger;
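
The layout described above (an FST mapping each term to an offset, plus a side buffer of serialized values) can be pictured with a toy version in which a `BTreeMap` stands in for the FST; this is a conceptual sketch only, not tantivy's format:

```rust
use std::collections::BTreeMap;

/// Toy term dictionary: keys map to byte offsets into a side buffer of
/// serialized values (fixed-width little-endian u32s for simplicity).
struct ToyTermDict {
    offsets: BTreeMap<Vec<u8>, u64>, // term -> address in `values`
    values: Vec<u8>,                 // serialized values, appended in insertion order
}

impl ToyTermDict {
    fn new() -> ToyTermDict {
        ToyTermDict { offsets: BTreeMap::new(), values: Vec::new() }
    }

    fn insert(&mut self, term: &[u8], value: u32) {
        self.offsets.insert(term.to_vec(), self.values.len() as u64);
        self.values.extend_from_slice(&value.to_le_bytes());
    }

    fn get(&self, term: &[u8]) -> Option<u32> {
        self.offsets.get(term).map(|&offset| {
            let start = offset as usize;
            let mut buf = [0u8; 4];
            buf.copy_from_slice(&self.values[start..start + 4]);
            u32::from_le_bytes(buf)
        })
    }
}

fn main() {
    let mut dict = ToyTermDict::new();
    dict.insert(b"abc", 34);
    dict.insert(b"abcd", 346);
    assert_eq!(dict.get(b"abc"), Some(34));
    assert_eq!(dict.get(b"abcd"), Some(346));
}
```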

src/termdict/streamer.rs (new file, 142 lines)
View File

@@ -0,0 +1,142 @@
use fst::{self, IntoStreamer, Streamer};
use fst::map::{StreamBuilder, Stream};
use common::BinarySerializable;
use super::TermDictionary;
/// `TermStreamerBuilder` is a helper object used to define
/// a range of terms that should be streamed.
pub struct TermStreamerBuilder<'a, V>
where V: 'a + BinarySerializable
{
fst_map: &'a TermDictionary<V>,
stream_builder: StreamBuilder<'a>,
}
impl<'a, V> TermStreamerBuilder<'a, V>
where V: 'a + BinarySerializable
{
/// Limit the range to terms greater or equal to the bound
pub fn ge<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
self.stream_builder = self.stream_builder.ge(bound);
self
}
/// Limit the range to terms strictly greater than the bound
pub fn gt<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
self.stream_builder = self.stream_builder.gt(bound);
self
}
/// Limit the range to terms lesser or equal to the bound
pub fn le<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
self.stream_builder = self.stream_builder.le(bound);
self
}
/// Limit the range to terms strictly lesser than the bound
pub fn lt<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
self.stream_builder = self.stream_builder.lt(bound);
self
}
/// Creates the stream corresponding to the range
/// of terms defined using the `TermStreamerBuilder`.
pub fn into_stream(self) -> TermStreamer<'a, V> {
TermStreamer {
fst_map: self.fst_map,
stream: self.stream_builder.into_stream(),
buffer: Vec::with_capacity(100),
offset: 0u64,
}
}
/// Creates a new `TermStreamerBuilder`
pub(crate) fn new(fst_map: &'a TermDictionary<V>,
stream_builder: StreamBuilder<'a>)
-> TermStreamerBuilder<'a, V> {
TermStreamerBuilder {
fst_map: fst_map,
stream_builder: stream_builder,
}
}
}
/// `TermStreamer` acts as a cursor over a range of terms of a segment.
/// Terms are guaranteed to be sorted.
pub struct TermStreamer<'a, V>
where V: 'a + BinarySerializable
{
fst_map: &'a TermDictionary<V>,
stream: Stream<'a>,
offset: u64,
buffer: Vec<u8>,
}
impl<'a, 'b, V> fst::Streamer<'b> for TermStreamer<'a, V>
where V: 'b + BinarySerializable
{
type Item = (&'b [u8], V);
fn next(&'b mut self) -> Option<(&'b [u8], V)> {
if self.advance() {
let v = self.value();
Some((&self.buffer, v))
} else {
None
}
}
}
impl<'a, V> TermStreamer<'a, V>
where V: 'a + BinarySerializable
{
/// Advances the stream to the next item.
/// Before the first call to `.advance()`, the stream
/// is in an uninitialized state.
pub fn advance(&mut self) -> bool {
if let Some((term, offset)) = self.stream.next() {
self.buffer.clear();
self.buffer.extend_from_slice(term);
self.offset = offset;
true
} else {
false
}
}
/// Accesses the current key.
///
/// `.key()` should return the key that was returned
/// by the `.next()` method.
///
/// If the end of the stream has been reached, and `.next()`
/// has been called and returned `None`, `.key()` keeps
/// the value of the last key encountered.
///
/// Before any call to `.next()`, `.key()` returns an empty array.
pub fn key(&self) -> &[u8] {
&self.buffer
}
/// Accesses the current value.
///
/// Values are accessed in a lazy manner, their data is fetched
/// and deserialized only at the moment of the call to `.value()`.
///
/// Calling `.value()` after the end of the stream will return the
/// last `.value()` encountered.
///
/// # Panics
///
/// Calling `.value()` before the first call to `.advance()` or `.next()`
/// is undefined behavior.
pub fn value(&self) -> V {
self.fst_map
.read_value(self.offset)
.expect("Fst data is corrupted. Failed to deserialize a value.")
}
}
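
A usage sketch of the range API above (hedged: it assumes a `TermDictionary<u32>` already built with `TermDictionaryBuilder`, as in the `termdict.rs` test further below, and the module re-export shown in `src/termdict/mod.rs`):

```rust
use tantivy::termdict::TermDictionary;

/// Collect every key in the half-open range ["abc", "abd").
fn keys_in_range(dict: &TermDictionary<u32>) -> Vec<Vec<u8>> {
    let mut stream = dict.range().ge("abc").lt("abd").into_stream();
    let mut keys = Vec::new();
    while stream.advance() {
        keys.push(stream.key().to_vec());
    }
    keys
}
```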

src/termdict/termdict.rs (new file, 201 lines)
View File

@@ -0,0 +1,201 @@
use std::io::{self, Write};
use fst;
use fst::raw::Fst;
use super::{TermStreamerBuilder, TermStreamer};
use directory::ReadOnlySource;
use common::BinarySerializable;
use std::marker::PhantomData;
use schema::{Field, Term};
use postings::TermInfo;
fn convert_fst_error(e: fst::Error) -> io::Error {
io::Error::new(io::ErrorKind::Other, e)
}
/// Builder for the new term dictionary.
///
/// Just like for the fst crate, all terms must be inserted in order.
pub struct TermDictionaryBuilder<W: Write, V = TermInfo>
where V: BinarySerializable
{
fst_builder: fst::MapBuilder<W>,
data: Vec<u8>,
_phantom_: PhantomData<V>,
}
impl<W: Write, V: BinarySerializable> TermDictionaryBuilder<W, V> {
/// Creates a new `TermDictionaryBuilder`
pub fn new(w: W) -> io::Result<TermDictionaryBuilder<W, V>> {
let fst_builder = fst::MapBuilder::new(w).map_err(convert_fst_error)?;
Ok(TermDictionaryBuilder {
fst_builder: fst_builder,
data: Vec::new(),
_phantom_: PhantomData,
})
}
/// # Warning
/// Horribly dangerous internal API
///
/// If used, it must be used by systematically alternating calls
/// to insert_key and insert_value.
///
/// Prefer using `.insert(key, value)`
pub(crate) fn insert_key(&mut self, key: &[u8]) -> io::Result<()> {
self.fst_builder
.insert(key, self.data.len() as u64)
.map_err(convert_fst_error)?;
Ok(())
}
/// # Warning
///
/// Horribly dangerous internal API. See `.insert_key(...)`.
pub(crate) fn insert_value(&mut self, value: &V) -> io::Result<()> {
value.serialize(&mut self.data)?;
Ok(())
}
/// Inserts a `(key, value)` pair in the term dictionary.
///
/// *Keys have to be inserted in order.*
pub fn insert(&mut self, key: &[u8], value: &V) -> io::Result<()> {
self.fst_builder
.insert(key, self.data.len() as u64)
.map_err(convert_fst_error)?;
value.serialize(&mut self.data)?;
Ok(())
}
/// Finalize writing the builder, and returns the underlying
/// `Write` object.
pub fn finish(self) -> io::Result<W> {
let mut file = self.fst_builder.into_inner().map_err(convert_fst_error)?;
let footer_size = self.data.len() as u32;
file.write_all(&self.data)?;
(footer_size as u32).serialize(&mut file)?;
file.flush()?;
Ok(file)
}
}
/// Datastructure to access the `terms` of a segment.
pub struct TermDictionary<V = TermInfo>
where V: BinarySerializable
{
fst_index: fst::Map,
values_mmap: ReadOnlySource,
_phantom_: PhantomData<V>,
}
fn open_fst_index(source: ReadOnlySource) -> io::Result<fst::Map> {
let fst = match source {
ReadOnlySource::Anonymous(data) => {
Fst::from_shared_bytes(data.data, data.start, data.len)
.map_err(convert_fst_error)?
}
ReadOnlySource::Mmap(mmap_readonly) => {
Fst::from_mmap(mmap_readonly).map_err(convert_fst_error)?
}
};
Ok(fst::Map::from(fst))
}
impl<V> TermDictionary<V>
where V: BinarySerializable
{
/// Opens a `TermDictionary` given a data source.
pub fn from_source(source: ReadOnlySource) -> io::Result<TermDictionary<V>> {
let total_len = source.len();
let length_offset = total_len - 4;
let mut split_len_buffer: &[u8] = &source.as_slice()[length_offset..];
let footer_size = u32::deserialize(&mut split_len_buffer)? as usize;
let split_len = length_offset - footer_size;
let fst_source = source.slice(0, split_len);
let values_source = source.slice(split_len, length_offset);
let fst_index = open_fst_index(fst_source)?;
Ok(TermDictionary {
fst_index: fst_index,
values_mmap: values_source,
_phantom_: PhantomData,
})
}
/// Deserialize and returns the value at address `offset`
pub(crate) fn read_value(&self, offset: u64) -> io::Result<V> {
let buffer = self.values_mmap.as_slice();
let mut cursor = &buffer[(offset as usize)..];
V::deserialize(&mut cursor)
}
/// Lookups the value corresponding to the key.
pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Option<V> {
self.fst_index
.get(key)
.map(|offset| {
self.read_value(offset)
.expect("The fst is corrupted. Failed to deserialize a value.")
})
}
/// A stream of all the sorted terms. [See also `.stream_field()`](#method.stream_field)
pub fn stream(&self) -> TermStreamer<V> {
self.range().into_stream()
}
/// A stream of all the sorted terms in the given field.
pub fn stream_field(&self, field: Field) -> TermStreamer<V> {
let start_term = Term::from_field_text(field, "");
let stop_term = Term::from_field_text(Field(field.0 + 1), "");
self.range()
.ge(start_term.as_slice())
.lt(stop_term.as_slice())
.into_stream()
}
/// Returns a range builder, to stream all of the terms
/// within an interval.
pub fn range(&self) -> TermStreamerBuilder<V> {
TermStreamerBuilder::new(self, self.fst_index.range())
}
}
#[cfg(test)]
mod tests {
use super::*;
use directory::{RAMDirectory, Directory};
use std::path::PathBuf;
use fst::Streamer;
#[test]
fn test_term_dictionary() {
let mut directory = RAMDirectory::create();
let path = PathBuf::from("TermDictionary");
{
let write = directory.open_write(&path).unwrap();
let mut term_dictionary_builder = TermDictionaryBuilder::new(write).unwrap();
term_dictionary_builder
.insert("abc".as_bytes(), &34u32)
.unwrap();
term_dictionary_builder
.insert("abcd".as_bytes(), &346u32)
.unwrap();
term_dictionary_builder.finish().unwrap();
}
let source = directory.open_read(&path).unwrap();
let term_dict: TermDictionary<u32> = TermDictionary::from_source(source).unwrap();
assert_eq!(term_dict.get("abc"), Some(34u32));
assert_eq!(term_dict.get("abcd"), Some(346u32));
let mut stream = term_dict.stream();
assert_eq!(stream.next().unwrap(), ("abc".as_bytes(), 34u32));
assert_eq!(stream.key(), "abc".as_bytes());
assert_eq!(stream.value(), 34u32);
assert_eq!(stream.next().unwrap(), ("abcd".as_bytes(), 346u32));
assert_eq!(stream.key(), "abcd".as_bytes());
assert_eq!(stream.value(), 346u32);
assert!(!stream.advance());
}
}