diff --git a/src/core/mod.rs b/src/core/mod.rs
index 6f29254d9..dca8b5ccd 100644
--- a/src/core/mod.rs
+++ b/src/core/mod.rs
@@ -7,7 +7,6 @@ mod segment;
 mod index_meta;
 mod pool;
 mod segment_meta;
-mod term_iterator;
 
 pub use self::searcher::Searcher;
 pub use self::segment_component::SegmentComponent;
@@ -18,7 +17,6 @@ pub use self::segment::SerializableSegment;
 pub use self::index::Index;
 pub use self::segment_meta::SegmentMeta;
 pub use self::index_meta::IndexMeta;
-pub use self::term_iterator::TermIterator;
 
 use std::path::PathBuf;
diff --git a/src/core/searcher.rs b/src/core/searcher.rs
index a869fa69c..8f6d36b82 100644
--- a/src/core/searcher.rs
+++ b/src/core/searcher.rs
@@ -7,8 +7,9 @@ use query::Query;
 use DocId;
 use DocAddress;
 use schema::Term;
-use core::TermIterator;
+use termdict::TermMerger;
 use std::fmt;
+use postings::TermInfo;
 
 /// Holds a list of `SegmentReader`s ready for search.
@@ -49,18 +50,6 @@ impl Searcher {
             .fold(0u32, |acc, val| acc + val)
     }
 
-    /// Returns a Stream over all of the sorted unique terms of
-    /// the searcher.
-    ///
-    /// This includes all of the fields from all of the segment_readers.
-    /// See [TermIterator](struct.TermIterator.html).
-    ///
-    /// # Warning
-    /// This API is very likely to change in the future.
-    pub fn terms(&self) -> TermIterator {
-        TermIterator::from(self.segment_readers())
-    }
-
     /// Return the list of segment readers
    pub fn segment_readers(&self) -> &[SegmentReader] {
         &self.segment_readers
@@ -75,6 +64,18 @@ impl Searcher {
     pub fn search<C: Collector>(&self, query: &Query, collector: &mut C) -> Result<TimerTree> {
         query.search(self, collector)
     }
+
+    /// Returns a stream over all of the sorted unique terms of
+    /// the searcher.
+    ///
+    /// This includes all of the fields from all of the segment_readers.
+    /// See `TermMerger`.
+    ///
+    /// # Warning
+    /// This API is very likely to change in the future.
+    pub fn terms(&self) -> TermMerger<TermInfo> {
+        TermMerger::from(self.segment_readers())
+    }
 }
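Note on the new `terms()` API: the searcher now hands back a `TermMerger<TermInfo>` that walks raw `&[u8]` keys rather than `Term`s. A minimal sketch of how a caller might enumerate every unique term after this patch (hypothetical helper, assuming the crate-root re-exports shown in `src/lib.rs`):

```rust
use tantivy::{Index, Result};

// Hypothetical helper, not part of the diff.
fn print_all_terms(index: &Index) -> Result<()> {
    index.load_searchers()?;
    let searcher = index.searcher();
    let mut terms = searcher.terms();
    while terms.advance() {
        // `key()` yields the raw `&[u8]` key of the current term;
        // `Term::from_bytes` becomes `#[cfg(test)]` in this patch,
        // so callers outside tests work with the bytes directly.
        println!("{:?}", terms.key());
    }
    Ok(())
}
```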
diff --git a/src/core/segment_reader.rs b/src/core/segment_reader.rs
index 33b8f5a06..ba05fb632 100644
--- a/src/core/segment_reader.rs
+++ b/src/core/segment_reader.rs
@@ -12,8 +12,9 @@ use schema::Document;
 use directory::ReadOnlySource;
 use DocId;
 use std::str;
+use std::cmp;
 use postings::TermInfo;
-use datastruct::FstMap;
+use termdict::TermDictionary;
 use std::sync::Arc;
 use std::fmt;
 use schema::Field;
@@ -42,7 +43,7 @@ use schema::TextIndexingOptions;
 pub struct SegmentReader {
     segment_id: SegmentId,
     segment_meta: SegmentMeta,
-    term_infos: Arc<FstMap<TermInfo>>,
+    terms: Arc<TermDictionary<TermInfo>>,
     postings_data: ReadOnlySource,
     store_reader: StoreReader,
     fast_fields_reader: Arc<FastFieldsReader>,
@@ -133,16 +134,19 @@ impl SegmentReader {
     /// Open a new segment for reading.
     pub fn open(segment: Segment) -> Result<SegmentReader> {
-        let source = try!(segment.open_read(SegmentComponent::TERMS));
-        let term_infos = try!(FstMap::from_source(source));
-        let store_reader = StoreReader::from(try!(segment.open_read(SegmentComponent::STORE)));
-        let postings_shared_mmap = try!(segment.open_read(SegmentComponent::POSTINGS));
+        let source = segment.open_read(SegmentComponent::TERMS)?;
+        let terms = TermDictionary::from_source(source)?;
 
-        let fast_field_data = try!(segment.open_read(SegmentComponent::FASTFIELDS));
-        let fast_fields_reader = try!(FastFieldsReader::open(fast_field_data));
+        let store_source = segment.open_read(SegmentComponent::STORE)?;
+        let store_reader = StoreReader::from_source(store_source);
 
-        let fieldnorms_data = try!(segment.open_read(SegmentComponent::FIELDNORMS));
-        let fieldnorms_reader = try!(FastFieldsReader::open(fieldnorms_data));
+        let postings_shared_mmap = segment.open_read(SegmentComponent::POSTINGS)?;
+
+        let fast_field_data = segment.open_read(SegmentComponent::FASTFIELDS)?;
+        let fast_fields_reader = FastFieldsReader::from_source(fast_field_data)?;
+
+        let fieldnorms_data = segment.open_read(SegmentComponent::FIELDNORMS)?;
+        let fieldnorms_reader = FastFieldsReader::from_source(fieldnorms_data)?;
 
         let positions_data = segment
             .open_read(SegmentComponent::POSITIONS)
@@ -159,7 +163,7 @@ impl SegmentReader {
         Ok(SegmentReader {
                segment_meta: segment.meta().clone(),
                postings_data: postings_shared_mmap,
-               term_infos: Arc::new(term_infos),
+               terms: Arc::new(terms),
               segment_id: segment.id(),
               store_reader: store_reader,
               fast_fields_reader: Arc::new(fast_fields_reader),
@@ -171,8 +175,8 @@ impl SegmentReader {
     }
 
     /// Return the term dictionary datastructure.
-    pub fn term_infos(&self) -> &FstMap<TermInfo> {
-        &self.term_infos
+    pub fn terms(&self) -> &TermDictionary<TermInfo> {
+        &self.terms
     }
 
     /// Returns the document (or to be accurate, its stored field)
@@ -201,39 +205,35 @@ impl SegmentReader {
         let field = term.field();
         let field_entry = self.schema.get_field_entry(field);
         let term_info = get!(self.get_term_info(term));
+        let maximum_option = get!(field_entry.field_type().get_segment_postings_option());
+        let best_effort_option = cmp::min(maximum_option, option);
+        Some(self.read_postings_from_terminfo(&term_info, best_effort_option))
+    }
+
+
+    /// Returns a posting object given a `term_info`.
+    /// This method is for advanced usage only.
+    ///
+    /// Most users should prefer using `read_postings` instead.
+ pub fn read_postings_from_terminfo(&self, + term_info: &TermInfo, + option: SegmentPostingsOption) + -> SegmentPostings { let offset = term_info.postings_offset as usize; let postings_data = &self.postings_data[offset..]; - let freq_handler = match *field_entry.field_type() { - FieldType::Str(ref options) => { - let indexing_options = options.get_indexing_options(); - match option { - SegmentPostingsOption::NoFreq => FreqHandler::new_without_freq(), - SegmentPostingsOption::Freq => { - if indexing_options.is_termfreq_enabled() { - FreqHandler::new_with_freq() - } else { - FreqHandler::new_without_freq() - } - } - SegmentPostingsOption::FreqAndPositions => { - if indexing_options == TextIndexingOptions::TokenizedWithFreqAndPosition { - let offset = term_info.positions_offset as usize; - let offseted_position_data = &self.positions_data[offset..]; - FreqHandler::new_with_freq_and_position(offseted_position_data) - } else if indexing_options.is_termfreq_enabled() { - FreqHandler::new_with_freq() - } else { - FreqHandler::new_without_freq() - } - } - } + let freq_handler = match option { + SegmentPostingsOption::NoFreq => FreqHandler::new_without_freq(), + SegmentPostingsOption::Freq => FreqHandler::new_with_freq(), + SegmentPostingsOption::FreqAndPositions => { + let offset = term_info.positions_offset as usize; + let offseted_position_data = &self.positions_data[offset..]; + FreqHandler::new_with_freq_and_position(offseted_position_data) } - _ => FreqHandler::new_without_freq(), }; - Some(SegmentPostings::from_data(term_info.doc_freq, - postings_data, - &self.delete_bitset, - freq_handler)) + SegmentPostings::from_data(term_info.doc_freq, + postings_data, + &self.delete_bitset, + freq_handler) } @@ -262,7 +262,7 @@ impl SegmentReader { /// Returns the term info associated with the term. pub fn get_term_info(&self, term: &Term) -> Option { - self.term_infos.get(term.as_slice()) + self.terms.get(term.as_slice()) } /// Returns the segment id diff --git a/src/core/term_iterator.rs b/src/core/term_iterator.rs deleted file mode 100644 index 143e8aa6b..000000000 --- a/src/core/term_iterator.rs +++ /dev/null @@ -1,182 +0,0 @@ -use fst::Streamer; -use std::mem; -use std::collections::BinaryHeap; -use fst::map::Keys; -use schema::Field; -use schema::Term; -use core::SegmentReader; -use std::cmp::Ordering; - - -#[derive(PartialEq, Eq, Debug)] -struct HeapItem { - term: Term, - segment_ord: usize, -} - -impl PartialOrd for HeapItem { - fn partial_cmp(&self, other: &HeapItem) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for HeapItem { - fn cmp(&self, other: &HeapItem) -> Ordering { - (&other.term, &other.segment_ord).cmp(&(&self.term, &self.segment_ord)) - } -} - -/// Given a list of sorted term streams, -/// returns an iterator over sorted unique terms. -/// -/// The item yield is actually a pair with -/// - the term -/// - a slice with the ordinal of the segments containing -/// the terms. -pub struct TermIterator<'a> { - key_streams: Vec>, - heap: BinaryHeap, - // Buffer hosting the list of segment ordinals containing - // the current term. - current_term: Term, - current_segment_ords: Vec, -} - -impl<'a> TermIterator<'a> { - fn new(key_streams: Vec>) -> TermIterator<'a> { - let key_streams_len = key_streams.len(); - TermIterator { - key_streams: key_streams, - heap: BinaryHeap::new(), - current_term: Term::from_field_text(Field(0), ""), - current_segment_ords: (0..key_streams_len).collect(), - } - } - - /// Advance the term iterator to the next term. 
- /// Returns true if there is indeed another term - /// False if there is none. - pub fn advance(&mut self) -> bool { - self.advance_segments(); - if let Some(mut head) = self.heap.pop() { - mem::swap(&mut self.current_term, &mut head.term); - self.current_segment_ords.push(head.segment_ord); - loop { - match self.heap.peek() { - Some(&ref next_heap_it) if next_heap_it.term == self.current_term => {} - _ => { - break; - } - } - let next_heap_it = self.heap.pop().unwrap(); // safe : we peeked beforehand - self.current_segment_ords.push(next_heap_it.segment_ord); - } - true - } else { - false - } - } - - - /// Returns the current term. - /// - /// This method may be called - /// iff advance() has been called before - /// and "true" was returned. - pub fn term(&self) -> &Term { - &self.current_term - } - - /// Returns the sorted list of segment ordinals - /// that include the current term. - /// - /// This method may be called - /// iff advance() has been called before - /// and "true" was returned. - pub fn segment_ords(&self) -> &[usize] { - &self.current_segment_ords[..] - } - - fn advance_segments(&mut self) { - for segment_ord in self.current_segment_ords.drain(..) { - if let Some(term) = self.key_streams[segment_ord].next() { - self.heap - .push(HeapItem { - term: Term::from_bytes(term), - segment_ord: segment_ord, - }); - } - } - } -} - -impl<'a, 'f> Streamer<'a> for TermIterator<'f> { - type Item = &'a Term; - - fn next(&'a mut self) -> Option { - if self.advance() { - Some(&self.current_term) - } else { - None - } - } -} - -impl<'a> From<&'a [SegmentReader]> for TermIterator<'a> { - fn from(segment_readers: &'a [SegmentReader]) -> TermIterator<'a> { - TermIterator::new(segment_readers - .iter() - .map(|reader| reader.term_infos().keys()) - .collect()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use schema::{SchemaBuilder, Document, TEXT}; - use core::Index; - - #[test] - fn test_term_iterator() { - let mut schema_builder = SchemaBuilder::default(); - let text_field = schema_builder.add_text_field("text", TEXT); - let index = Index::create_in_ram(schema_builder.build()); - { - let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap(); - { - { - let mut doc = Document::default(); - doc.add_text(text_field, "a b d f"); - index_writer.add_document(doc); - } - index_writer.commit().unwrap(); - } - { - { - let mut doc = Document::default(); - doc.add_text(text_field, "a b c d f"); - index_writer.add_document(doc); - } - index_writer.commit().unwrap(); - } - { - { - let mut doc = Document::default(); - doc.add_text(text_field, "e f"); - index_writer.add_document(doc); - } - index_writer.commit().unwrap(); - } - } - index.load_searchers().unwrap(); - let searcher = index.searcher(); - let mut term_it = searcher.terms(); - let mut terms = String::new(); - while let Some(term) = term_it.next() { - terms.push_str(term.text()); - } - assert_eq!(terms, "abcdef"); - } - -} diff --git a/src/datastruct/fstmap.rs b/src/datastruct/fstmap.rs deleted file mode 100644 index 79c35a4e2..000000000 --- a/src/datastruct/fstmap.rs +++ /dev/null @@ -1,150 +0,0 @@ -use std::io; -use std::io::Write; -use fst; -use fst::raw::Fst; - -use directory::ReadOnlySource; -use common::BinarySerializable; -use std::marker::PhantomData; - -fn convert_fst_error(e: fst::Error) -> io::Error { - io::Error::new(io::ErrorKind::Other, e) -} - -pub struct FstMapBuilder { - fst_builder: fst::MapBuilder, - data: Vec, - _phantom_: PhantomData, -} - -impl FstMapBuilder { - pub fn new(w: W) -> io::Result> { 
- let fst_builder = try!(fst::MapBuilder::new(w).map_err(convert_fst_error)); - Ok(FstMapBuilder { - fst_builder: fst_builder, - data: Vec::new(), - _phantom_: PhantomData, - }) - } - - /// Horribly unsafe, nobody should ever do that... except me :) - /// - /// If used, it must be used by systematically alternating calls - /// to insert_key and insert_value. - /// - /// TODO see if I can bend Rust typesystem to enforce that - /// in a nice way. - pub fn insert_key(&mut self, key: &[u8]) -> io::Result<()> { - try!(self.fst_builder - .insert(key, self.data.len() as u64) - .map_err(convert_fst_error)); - Ok(()) - } - - /// Horribly unsafe, nobody should ever do that... except me :) - pub fn insert_value(&mut self, value: &V) -> io::Result<()> { - try!(value.serialize(&mut self.data)); - Ok(()) - } - - #[cfg(test)] - pub fn insert(&mut self, key: &[u8], value: &V) -> io::Result<()> { - try!(self.fst_builder - .insert(key, self.data.len() as u64) - .map_err(convert_fst_error)); - try!(value.serialize(&mut self.data)); - Ok(()) - } - - pub fn finish(self) -> io::Result { - let mut file = try!(self.fst_builder.into_inner().map_err(convert_fst_error)); - let footer_size = self.data.len() as u32; - try!(file.write_all(&self.data)); - try!((footer_size as u32).serialize(&mut file)); - try!(file.flush()); - Ok(file) - } -} - -pub struct FstMap { - fst_index: fst::Map, - values_mmap: ReadOnlySource, - _phantom_: PhantomData, -} - - -fn open_fst_index(source: ReadOnlySource) -> io::Result { - Ok(fst::Map::from(match source { - ReadOnlySource::Anonymous(data) => { - try!(Fst::from_shared_bytes(data.data, data.start, data.len) - .map_err(convert_fst_error)) - } - ReadOnlySource::Mmap(mmap_readonly) => { - try!(Fst::from_mmap(mmap_readonly).map_err(convert_fst_error)) - } - })) -} - -impl FstMap { - pub fn keys(&self) -> fst::map::Keys { - self.fst_index.keys() - } - - pub fn from_source(source: ReadOnlySource) -> io::Result> { - let total_len = source.len(); - let length_offset = total_len - 4; - let mut split_len_buffer: &[u8] = &source.as_slice()[length_offset..]; - let footer_size = try!(u32::deserialize(&mut split_len_buffer)) as usize; - let split_len = length_offset - footer_size; - let fst_source = source.slice(0, split_len); - let values_source = source.slice(split_len, length_offset); - let fst_index = try!(open_fst_index(fst_source)); - Ok(FstMap { - fst_index: fst_index, - values_mmap: values_source, - _phantom_: PhantomData, - }) - } - - fn read_value(&self, offset: u64) -> V { - let buffer = self.values_mmap.as_slice(); - let mut cursor = &buffer[(offset as usize)..]; - V::deserialize(&mut cursor).expect("Data in FST is corrupted") - } - - pub fn get>(&self, key: K) -> Option { - self.fst_index - .get(key) - .map(|offset| self.read_value(offset)) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use directory::{RAMDirectory, Directory}; - use std::path::PathBuf; - use fst::Streamer; - - #[test] - fn test_fstmap() { - let mut directory = RAMDirectory::create(); - let path = PathBuf::from("fstmap"); - { - let write = directory.open_write(&path).unwrap(); - let mut fstmap_builder = FstMapBuilder::new(write).unwrap(); - fstmap_builder.insert("abc".as_bytes(), &34u32).unwrap(); - fstmap_builder.insert("abcd".as_bytes(), &346u32).unwrap(); - fstmap_builder.finish().unwrap(); - } - let source = directory.open_read(&path).unwrap(); - let fstmap: FstMap = FstMap::from_source(source).unwrap(); - assert_eq!(fstmap.get("abc"), Some(34u32)); - assert_eq!(fstmap.get("abcd"), Some(346u32)); - let mut 
keys = fstmap.keys();
-        assert_eq!(keys.next().unwrap(), "abc".as_bytes());
-        assert_eq!(keys.next().unwrap(), "abcd".as_bytes());
-        assert_eq!(keys.next(), None);
-    }
-
-}
diff --git a/src/datastruct/mod.rs b/src/datastruct/mod.rs
index f026afa7f..5ff10f7c7 100644
--- a/src/datastruct/mod.rs
+++ b/src/datastruct/mod.rs
@@ -1,7 +1,4 @@
-mod fstmap;
 mod skip;
 pub mod stacker;
 
-pub use self::fstmap::FstMapBuilder;
-pub use self::fstmap::FstMap;
 pub use self::skip::{SkipListBuilder, SkipList};
diff --git a/src/directory/mod.rs b/src/directory/mod.rs
index f47cfdcbb..b107d78c5 100644
--- a/src/directory/mod.rs
+++ b/src/directory/mod.rs
@@ -1,3 +1,8 @@
+/*!
+
+WORM directory abstraction.
+
+*/
 mod mmap_directory;
 mod ram_directory;
 mod directory;
diff --git a/src/fastfield/mod.rs b/src/fastfield/mod.rs
index 061ed3910..ad2988ce4 100644
--- a/src/fastfield/mod.rs
+++ b/src/fastfield/mod.rs
@@ -1,25 +1,27 @@
-//! # Fast fields
-//!
-//! Fast fields are the equivalent of `DocValues` in `Lucene`.
-//! Fast fields is a non-compressed column-oriented fashion storage
-//! of `tantivy`.
-//!
-//! It is designed for the fast random access of some document
-//! fields given a document id.
-//!
-//! `FastField` are useful when a field is required for all or most of
-//! the `DocSet` : for instance for scoring, grouping, filtering, or facetting.
-//!
-//!
-//! Fields have to be declared as `FAST` in the schema.
-//! Currently only 64-bits integers (signed or unsigned) are
-//! supported.
-//!
-//! They are stored in a bitpacked fashion so that their
-//! memory usage is directly linear with the amplitude of the
-//! values stored.
-//!
-//! Read access performance is comparable to that of an array lookup.
+/*!
+Fast fields are `tantivy`'s column-oriented storage.
+
+They are the equivalent of `Lucene`'s `DocValues`.
+
+Fast fields are designed for the fast random access of some document
+fields given a document id.
+
+`FastField`s are useful when a field is required for all or most of
+the `DocSet`: for instance for scoring, grouping, filtering, or faceting.
+
+Fields have to be declared as `FAST` in the schema.
+Currently only 64-bit integers (signed or unsigned) are
+supported.
+
+They are stored in a bitpacked fashion so that their
+memory usage is directly linear with the amplitude of the
+values stored.
+
+Read access performance is comparable to that of an array lookup.
+*/ mod reader; mod writer; @@ -95,7 +97,7 @@ mod tests { assert_eq!(source.len(), 38 as usize); } { - let fast_field_readers = FastFieldsReader::open(source).unwrap(); + let fast_field_readers = FastFieldsReader::from_source(source).unwrap(); let fast_field_reader: U64FastFieldReader = fast_field_readers.open_reader(*FIELD).unwrap(); assert_eq!(fast_field_reader.get(0), 13u64); @@ -129,7 +131,7 @@ mod tests { assert_eq!(source.len(), 63 as usize); } { - let fast_field_readers = FastFieldsReader::open(source).unwrap(); + let fast_field_readers = FastFieldsReader::from_source(source).unwrap(); let fast_field_reader: U64FastFieldReader = fast_field_readers.open_reader(*FIELD).unwrap(); assert_eq!(fast_field_reader.get(0), 4u64); @@ -165,7 +167,7 @@ mod tests { assert_eq!(source.len(), 36 as usize); } { - let fast_field_readers = FastFieldsReader::open(source).unwrap(); + let fast_field_readers = FastFieldsReader::from_source(source).unwrap(); let fast_field_reader: U64FastFieldReader = fast_field_readers.open_reader(*FIELD).unwrap(); for doc in 0..10_000 { @@ -198,7 +200,7 @@ mod tests { assert_eq!(source.len(), 80044 as usize); } { - let fast_field_readers = FastFieldsReader::open(source).unwrap(); + let fast_field_readers = FastFieldsReader::from_source(source).unwrap(); let fast_field_reader: U64FastFieldReader = fast_field_readers.open_reader(*FIELD).unwrap(); assert_eq!(fast_field_reader.get(0), 0u64); @@ -235,7 +237,7 @@ mod tests { assert_eq!(source.len(), 17711 as usize); } { - let fast_field_readers = FastFieldsReader::open(source).unwrap(); + let fast_field_readers = FastFieldsReader::from_source(source).unwrap(); let fast_field_reader: I64FastFieldReader = fast_field_readers.open_reader(i64_field).unwrap(); assert_eq!(fast_field_reader.min_value(), -100i64); @@ -266,7 +268,7 @@ mod tests { let source = directory.open_read(&path).unwrap(); { - let fast_field_readers = FastFieldsReader::open(source).unwrap(); + let fast_field_readers = FastFieldsReader::from_source(source).unwrap(); let fast_field_reader: I64FastFieldReader = fast_field_readers.open_reader(i64_field).unwrap(); assert_eq!(fast_field_reader.get(0u32), 0i64); @@ -299,7 +301,7 @@ mod tests { } let source = directory.open_read(&path).unwrap(); { - let fast_field_readers = FastFieldsReader::open(source).unwrap(); + let fast_field_readers = FastFieldsReader::from_source(source).unwrap(); let fast_field_reader: U64FastFieldReader = fast_field_readers.open_reader(*FIELD).unwrap(); let mut a = 0u64; @@ -357,7 +359,7 @@ mod tests { } let source = directory.open_read(&path).unwrap(); { - let fast_field_readers = FastFieldsReader::open(source).unwrap(); + let fast_field_readers = FastFieldsReader::from_source(source).unwrap(); let fast_field_reader: U64FastFieldReader = fast_field_readers.open_reader(*FIELD).unwrap(); b.iter(|| { @@ -388,7 +390,7 @@ mod tests { } let source = directory.open_read(&path).unwrap(); { - let fast_field_readers = FastFieldsReader::open(source).unwrap(); + let fast_field_readers = FastFieldsReader::from_source(source).unwrap(); let fast_field_reader: U64FastFieldReader = fast_field_readers.open_reader(*FIELD).unwrap(); b.iter(|| { diff --git a/src/fastfield/reader.rs b/src/fastfield/reader.rs index 7f4684fda..aa3050632 100644 --- a/src/fastfield/reader.rs +++ b/src/fastfield/reader.rs @@ -128,7 +128,7 @@ impl From> for U64FastFieldReader { serializer.close().unwrap(); } let source = directory.open_read(path).unwrap(); - let fast_field_readers = FastFieldsReader::open(source).unwrap(); + let 
fast_field_readers = FastFieldsReader::from_source(source).unwrap(); fast_field_readers.open_reader(field).unwrap() } } @@ -194,12 +194,12 @@ pub struct FastFieldsReader { } impl FastFieldsReader { - /// Opens the `FastFieldsReader` file + /// Opens a `FastFieldsReader` /// /// When opening the fast field reader, the /// the list of the offset is read (as a footer of the /// data file). - pub fn open(source: ReadOnlySource) -> io::Result { + pub fn from_source(source: ReadOnlySource) -> io::Result { let header_offset; let field_offsets: Vec<(Field, u32)>; { diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 6a038812c..0a774a160 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -273,7 +273,7 @@ fn index_documents(heap: &mut Heap, // // Tantivy does not resize its hashtable. When it reaches // capacity, we just stop indexing new document. - if segment_writer.is_termdic_saturated() { + if segment_writer.is_term_saturated() { info!("Term dic saturated, flushing segment with maxdoc={}.", segment_writer.max_doc()); break; diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 8890c6056..8d5a41eef 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -10,13 +10,15 @@ use fastfield::U64FastFieldReader; use itertools::Itertools; use postings::Postings; use postings::DocSet; -use core::TermIterator; use fastfield::DeleteBitSet; use schema::{Schema, Field}; +use termdict::TermMerger; use fastfield::FastFieldSerializer; use fastfield::FastFieldReader; use store::StoreWriter; use std::cmp::{min, max}; +use schema; +use postings::SegmentPostingsOption; pub struct IndexMerger { schema: Schema, @@ -161,7 +163,6 @@ impl IndexMerger { return Err(Error::SchemaError(error_msg)); } } - } if u64_readers.is_empty() { @@ -172,24 +173,25 @@ impl IndexMerger { assert!(min_val <= max_val); - try!(fast_field_serializer.new_u64_fast_field(field, min_val, max_val)); + fast_field_serializer + .new_u64_fast_field(field, min_val, max_val)?; for (max_doc, u64_reader, delete_bitset) in u64_readers { for doc_id in 0..max_doc { if !delete_bitset.is_deleted(doc_id) { let val = u64_reader.get(doc_id); - try!(fast_field_serializer.add_val(val)); + fast_field_serializer.add_val(val)?; } } } - try!(fast_field_serializer.close_field()); + fast_field_serializer.close_field()?; } Ok(()) } - fn write_postings(&self, postings_serializer: &mut PostingsSerializer) -> Result<()> { + fn write_postings(&self, serializer: &mut PostingsSerializer) -> Result<()> { - let mut merged_terms = TermIterator::from(&self.readers[..]); + let mut merged_terms = TermMerger::from(&self.readers[..]); let mut delta_position_computer = DeltaPositionComputer::new(); let mut max_doc = 0; @@ -212,6 +214,8 @@ impl IndexMerger { let mut last_field: Option = None; + let mut segment_postings_option = SegmentPostingsOption::FreqAndPositions; + while merged_terms.advance() { // Create the total list of doc ids // by stacking the doc ids from the different segment. @@ -223,59 +227,85 @@ impl IndexMerger { // - Segment 2's doc ids become [seg0.max_doc + seg1.max_doc, // seg0.max_doc + seg1.max_doc + seg2.max_doc] // ... - let term = merged_terms.term(); - let mut term_written = false; - let segment_postings = merged_terms - .segment_ords() + let term_bytes = merged_terms.key(); + let current_field = schema::extract_field_from_term_bytes(term_bytes); + + if last_field != Some(current_field) { + // we reached a new field. + let field_entry = self.schema.get_field_entry(current_field); + // ... 
set the segment postings option for the new field.
+                segment_postings_option = field_entry
+                    .field_type()
+                    .get_segment_postings_option()
+                    .expect("Encountered a field that is not supposed to be
+                             indexed. Have you modified the index?");
+                last_field = Some(current_field);
+
+                // it is perfectly safe to call `.new_field`
+                // even if there are no postings associated with it.
+                serializer.new_field(current_field);
+            }
+
+            // Let's compute the list of non-empty posting lists
+            let segment_postings: Vec<_> = merged_terms
+                .current_kvs()
                 .iter()
-                .cloned()
-                .flat_map(|segment_ord| {
-                    self.readers[segment_ord]
-                        .read_postings_all_info(term)
-                        .map(|segment_postings| (segment_ord, segment_postings))
-                })
-                .collect::<Vec<_>>();
-
-            // We can remove the term if all documents which
-            // contained it have been deleted.
-            if !segment_postings.is_empty() {
-
-                // We can now serialize this postings, by pushing each document to the
-                // postings serializer.
-
-                for (segment_ord, mut segment_postings) in segment_postings {
-                    let old_to_new_doc_id = &merged_doc_id_map[segment_ord];
-                    while segment_postings.advance() {
-                        if let Some(remapped_doc_id) =
-                            old_to_new_doc_id[segment_postings.doc() as usize] {
-                            if !term_written {
-                                let current_field = term.field();
-                                if last_field != Some(current_field) {
-                                    postings_serializer.new_field(current_field);
-                                    last_field = Some(current_field);
-                                }
-
-                                // we make sure to only write the term iff
-                                // there is at least one document.
-                                postings_serializer.new_term(term.as_slice())?;
-
-                                term_written = true;
-                            }
-                            let delta_positions: &[u32] =
-                                delta_position_computer
-                                    .compute_delta_positions(segment_postings.positions());
-                            try!(postings_serializer.write_doc(remapped_doc_id,
-                                                               segment_postings.term_freq(),
-                                                               delta_positions));
-                        }
+                .flat_map(|heap_item| {
+                    let segment_ord = heap_item.segment_ord;
+                    let term_info = heap_item.streamer.value();
+                    let segment_reader = &self.readers[heap_item.segment_ord];
+                    let mut segment_postings =
+                        segment_reader
+                            .read_postings_from_terminfo(&term_info, segment_postings_option);
+                    if segment_postings.advance() {
+                        Some((segment_ord, segment_postings))
+                    } else {
+                        None
                     }
-                }
+                })
+                .collect();
 
-                if term_written {
-                    try!(postings_serializer.close_term());
+            // At this point, `segment_postings` contains the posting lists
+            // of all of the segments containing the given term.
+            //
+            // These segments are non-empty and `.advance()` has already been called.
+
+            if segment_postings.is_empty() {
+                // by continuing here, the `term` will be entirely removed.
+                continue;
+            }
+
+            // We know that there is at least one document containing
+            // the term, so we add it.
+            serializer.new_term(term_bytes)?;
+
+            // We can now serialize these postings, by pushing each document to the
+            // postings serializer.
+
+            for (segment_ord, mut segment_postings) in segment_postings {
+                let old_to_new_doc_id = &merged_doc_id_map[segment_ord];
+                loop {
+                    // `.advance()` has been called once before the loop.
+                    // Hence we cannot use a `while segment_postings.advance()` loop.
+                    if let Some(remapped_doc_id) =
+                        old_to_new_doc_id[segment_postings.doc() as usize] {
+                        let delta_positions: &[u32] =
+                            delta_position_computer
+                                .compute_delta_positions(segment_postings.positions());
+                        let term_freq = segment_postings.term_freq();
+                        serializer
+                            .write_doc(remapped_doc_id, term_freq, delta_positions)?;
+                    }
+                    if !segment_postings.advance() {
+                        break;
+                    }
                 }
             }
+            // closing the term.
+ serializer.close_term()?; } Ok(()) } diff --git a/src/indexer/segment_serializer.rs b/src/indexer/segment_serializer.rs index 7d800f708..b75663927 100644 --- a/src/indexer/segment_serializer.rs +++ b/src/indexer/segment_serializer.rs @@ -58,10 +58,10 @@ impl SegmentSerializer { /// Finalize the segment serialization. pub fn close(self) -> Result<()> { - try!(self.fast_field_serializer.close()); - try!(self.postings_serializer.close()); - try!(self.store_writer.close()); - try!(self.fieldnorms_serializer.close()); + self.fast_field_serializer.close()?; + self.postings_serializer.close()?; + self.store_writer.close()?; + self.fieldnorms_serializer.close()?; Ok(()) } } diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs index ffaae2cca..c5d4d6662 100644 --- a/src/indexer/segment_writer.rs +++ b/src/indexer/segment_writer.rs @@ -98,8 +98,8 @@ impl<'a> SegmentWriter<'a> { /// Return true if the term dictionary hashmap is reaching capacity. /// It is one of the condition that triggers a `SegmentWriter` to /// be finalized. - pub(crate) fn is_termdic_saturated(&self) -> bool { - self.multifield_postings.is_termdic_saturated() + pub(crate) fn is_term_saturated(&self) -> bool { + self.multifield_postings.is_term_saturated() } diff --git a/src/lib.rs b/src/lib.rs index 96369739d..bfd098a96 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,6 +12,8 @@ #![doc(test(attr(allow(unused_variables), deny(warnings))))] +#![allow(unknown_lints)] + #![warn(missing_docs)] //! # `tantivy` @@ -109,9 +111,10 @@ mod datastruct; +pub mod termdict; + /// Query module pub mod query; -/// Directory module pub mod directory; /// Collector module pub mod collector; @@ -134,8 +137,6 @@ pub use postings::DocSet; pub use postings::Postings; pub use postings::SegmentPostingsOption; -pub use core::TermIterator; - /// Expose the current version of tantivy, as well /// whether it was compiled with the simd compression. @@ -147,8 +148,7 @@ pub fn version() -> &'static str { } } -/// Tantivy's makes it possible to personalize when -/// the indexer should merge its segments +/// Defines tantivy's merging strategy pub mod merge_policy { pub use indexer::MergePolicy; pub use indexer::LogMergePolicy; diff --git a/src/postings/postings_writer.rs b/src/postings/postings_writer.rs index 018e22daf..65fba2f20 100644 --- a/src/postings/postings_writer.rs +++ b/src/postings/postings_writer.rs @@ -78,6 +78,7 @@ impl<'a> MultiFieldPostingsWriter<'a> { /// Serialize the inverted index. /// It pushes all term, one field at a time, towards the /// postings serializer. + #[allow(needless_range_loop)] pub fn serialize(&self, serializer: &mut PostingsSerializer) -> Result<()> { let mut term_offsets: Vec<(&[u8], u32)> = self.term_index.iter().collect(); term_offsets.sort_by_key(|&(k, _v)| k); @@ -108,7 +109,7 @@ impl<'a> MultiFieldPostingsWriter<'a> { } /// Return true iff the term dictionary is saturated. - pub fn is_termdic_saturated(&self) -> bool { + pub fn is_term_saturated(&self) -> bool { self.term_index.is_saturated() } } diff --git a/src/postings/segment_postings_option.rs b/src/postings/segment_postings_option.rs index 53aac366a..51a07bb0b 100644 --- a/src/postings/segment_postings_option.rs +++ b/src/postings/segment_postings_option.rs @@ -6,7 +6,7 @@ /// avoid this extra cost when the information is not required. /// For instance, positions are useful when running phrase queries /// but useless in other queries. 
-#[derive(Clone, Copy, Debug)]
+#[derive(Clone, Copy, Debug, PartialEq, PartialOrd, Ord, Eq)]
 pub enum SegmentPostingsOption {
     /// Only the doc ids are decoded
     NoFreq,
@@ -15,3 +15,15 @@
     /// DocIds, term frequencies and positions will be decoded.
     FreqAndPositions,
 }
+
+#[cfg(test)]
+mod tests {
+
+    use super::SegmentPostingsOption;
+
+    #[test]
+    fn test_cmp_segment_postings_option() {
+        assert!(SegmentPostingsOption::FreqAndPositions > SegmentPostingsOption::Freq);
+        assert!(SegmentPostingsOption::Freq > SegmentPostingsOption::NoFreq);
+    }
+}
diff --git a/src/postings/serializer.rs b/src/postings/serializer.rs
index aa6d10bb5..1313ad445 100644
--- a/src/postings/serializer.rs
+++ b/src/postings/serializer.rs
@@ -1,5 +1,5 @@
 use Result;
-use datastruct::FstMapBuilder;
+use termdict::TermDictionaryBuilder;
 use super::TermInfo;
 use schema::Field;
 use schema::FieldEntry;
@@ -50,7 +50,7 @@ use common::BinarySerializable;
 /// A description of the serialization format is
 /// [available here](https://fulmicoton.gitbooks.io/tantivy-doc/content/inverted-index.html).
 pub struct PostingsSerializer {
-    terms_fst_builder: FstMapBuilder<WritePtr, TermInfo>,
+    terms_fst_builder: TermDictionaryBuilder<WritePtr, TermInfo>,
     postings_write: WritePtr,
     positions_write: WritePtr,
     written_bytes_postings: usize,
@@ -74,7 +74,7 @@ impl PostingsSerializer {
                positions_write: WritePtr,
                schema: Schema)
               -> Result<PostingsSerializer> {
-        let terms_fst_builder = try!(FstMapBuilder::new(terms_write));
+        let terms_fst_builder = try!(TermDictionaryBuilder::new(terms_write));
         Ok(PostingsSerializer {
                terms_fst_builder: terms_fst_builder,
                postings_write: postings_write,
diff --git a/src/schema/field_type.rs b/src/schema/field_type.rs
index 67a0d42ff..fcaaf8013 100644
--- a/src/schema/field_type.rs
+++ b/src/schema/field_type.rs
@@ -2,7 +2,8 @@ use schema::{TextOptions, IntOptions};
 use serde_json::Value as JsonValue;
 use schema::Value;
-
+use postings::SegmentPostingsOption;
+use schema::TextIndexingOptions;
 
 /// Possible error that may occur while parsing a field value
 /// At this point the JSON is known to be valid.
@@ -39,6 +40,34 @@ impl FieldType {
         }
     }
 
+    /// Given a field configuration, returns the maximal possible
+    /// `SegmentPostingsOption` available.
+    ///
+    /// If the field is not indexed, then returns `None`.
+    pub fn get_segment_postings_option(&self) -> Option<SegmentPostingsOption> {
+        match *self {
+            FieldType::Str(ref text_options) => {
+                match text_options.get_indexing_options() {
+                    TextIndexingOptions::Untokenized |
+                    TextIndexingOptions::TokenizedNoFreq => Some(SegmentPostingsOption::NoFreq),
+                    TextIndexingOptions::TokenizedWithFreq => Some(SegmentPostingsOption::Freq),
+                    TextIndexingOptions::TokenizedWithFreqAndPosition => {
+                        Some(SegmentPostingsOption::FreqAndPositions)
+                    }
+                    TextIndexingOptions::Unindexed => None,
+                }
+            }
+            FieldType::U64(ref int_options) |
+            FieldType::I64(ref int_options) => {
+                if int_options.is_indexed() {
+                    Some(SegmentPostingsOption::NoFreq)
+                } else {
+                    None
+                }
+            }
+        }
+    }
+
     /// Parses a field value from json, given the target FieldType.
     ///
     /// Tantivy will not try to cast values.
diff --git a/src/schema/term.rs b/src/schema/term.rs
index 0051adb48..06d8ffbd9 100644
--- a/src/schema/term.rs
+++ b/src/schema/term.rs
@@ -106,6 +106,7 @@ impl Term {
     ///
     /// If you want to build a field for a given `str`,
     /// you want to use `from_field_text`.
+    #[cfg(test)]
     pub(crate) fn from_bytes(data: &[u8]) -> Term {
         Term(Vec::from(data))
     }
diff --git a/src/store/mod.rs b/src/store/mod.rs
index dd7fa9390..8e2431d03 100644
--- a/src/store/mod.rs
+++ b/src/store/mod.rs
@@ -58,7 +58,7 @@ mod tests {
         let schema = write_lorem_ipsum_store(store_file, 1_000);
         let field_title = schema.get_field("title").unwrap();
         let store_source = directory.open_read(path).unwrap();
-        let store = StoreReader::from(store_source);
+        let store = StoreReader::from_source(store_source);
         for i in 0..1_000 {
             assert_eq!(*store.get(i).unwrap().get_first(field_title).unwrap().text(),
                        format!("Doc {}", i));
@@ -82,7 +82,7 @@ mod tests {
         let path = Path::new("store");
         write_lorem_ipsum_store(directory.open_write(path).unwrap(), 1_000);
         let store_source = directory.open_read(path).unwrap();
-        let store = StoreReader::from(store_source);
+        let store = StoreReader::from_source(store_source);
         b.iter(|| { store.get(12).unwrap(); });
     }
diff --git a/src/store/reader.rs b/src/store/reader.rs
index 5569a11a5..329ce5ae7 100644
--- a/src/store/reader.rs
+++ b/src/store/reader.rs
@@ -21,6 +21,17 @@ pub struct StoreReader {
 }
 
 impl StoreReader {
+    pub fn from_source(data: ReadOnlySource) -> StoreReader {
+        let (data_source, offset_index_source, max_doc) = split_source(data);
+        StoreReader {
+            data: data_source,
+            offset_index_source: offset_index_source,
+            current_block_offset: RefCell::new(usize::max_value()),
+            current_block: RefCell::new(Vec::new()),
+            max_doc: max_doc,
+        }
+    }
+
     fn block_offset(&self, doc_id: DocId) -> (DocId, u64) {
         SkipList::from(self.offset_index_source.as_slice())
             .seek(doc_id + 1)
@@ -76,17 +87,3 @@ fn split_source(data: ReadOnlySource) -> (ReadOnlySource, ReadOnlySource, DocId)
     drop(data);
     res
 }
-
-
-impl From<ReadOnlySource> for StoreReader {
-    fn from(data: ReadOnlySource) -> StoreReader {
-        let (data_source, offset_index_source, max_doc) = split_source(data);
-        StoreReader {
-            data: data_source,
-            offset_index_source: offset_index_source,
-            current_block_offset: RefCell::new(usize::max_value()),
-            current_block: RefCell::new(Vec::new()),
-            max_doc: max_doc,
-        }
-    }
-}
diff --git a/src/termdict/merger.rs b/src/termdict/merger.rs
new file mode 100644
index 000000000..dbbc5f6e2
--- /dev/null
+++ b/src/termdict/merger.rs
@@ -0,0 +1,208 @@
+use std::collections::BinaryHeap;
+use core::SegmentReader;
+use super::TermStreamer;
+use common::BinarySerializable;
+use postings::TermInfo;
+use std::cmp::Ordering;
+use fst::Streamer;
+
+pub struct HeapItem<'a, V>
+    where V: 'a + BinarySerializable
+{
+    pub streamer: TermStreamer<'a, V>,
+    pub segment_ord: usize,
+}
+
+impl<'a, V> PartialEq for HeapItem<'a, V>
+    where V: 'a + BinarySerializable
+{
+    fn eq(&self, other: &Self) -> bool {
+        self.segment_ord == other.segment_ord
+    }
+}
+
+impl<'a, V> Eq for HeapItem<'a, V> where V: 'a + BinarySerializable {}
+
+impl<'a, V> PartialOrd for HeapItem<'a, V>
+    where V: 'a + BinarySerializable
+{
+    fn partial_cmp(&self, other: &HeapItem<'a, V>) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl<'a, V> Ord for HeapItem<'a, V>
+    where V: 'a + BinarySerializable
+{
+    fn cmp(&self, other: &HeapItem<'a, V>) -> Ordering {
+        (&other.streamer.key(), &other.segment_ord).cmp(&(&self.streamer.key(), &self.segment_ord))
+    }
+}
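Note: `HeapItem::cmp` above intentionally swaps `self` and `other`. `std::collections::BinaryHeap` is a max-heap, so reversing the comparison makes it pop the smallest `(key, segment_ord)` pair first. A standalone illustration of the trick (hypothetical `Item` type, `std` only):

```rust
use std::cmp::Ordering;
use std::collections::BinaryHeap;

#[derive(PartialEq, Eq)]
struct Item(&'static [u8], usize); // (key, segment_ord)

impl Ord for Item {
    fn cmp(&self, other: &Item) -> Ordering {
        // arguments swapped on purpose, exactly as in `HeapItem::cmp`
        (other.0, other.1).cmp(&(self.0, self.1))
    }
}

impl PartialOrd for Item {
    fn partial_cmp(&self, other: &Item) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

fn main() {
    let mut heap = BinaryHeap::new();
    heap.push(Item(b"b", 0));
    heap.push(Item(b"a", 1));
    heap.push(Item(b"a", 0));
    // smallest key pops first; the segment ordinal breaks ties
    assert_eq!(heap.pop().map(|i| i.1), Some(0)); // "a" from segment 0
    assert_eq!(heap.pop().map(|i| i.1), Some(1)); // "a" from segment 1
}
```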
+/// Given a list of sorted term streams,
+/// returns an iterator over sorted unique terms.
+///
+/// The item yielded is actually a pair of:
+/// - the term
+/// - a slice with the ordinals of the segments containing
+///   the term.
+pub struct TermMerger<'a, V>
+    where V: 'a + BinarySerializable
+{
+    heap: BinaryHeap<HeapItem<'a, V>>,
+    current_streamers: Vec<HeapItem<'a, V>>,
+}
+
+impl<'a, V> TermMerger<'a, V>
+    where V: 'a + BinarySerializable
+{
+    fn new(streams: Vec<TermStreamer<'a, V>>) -> TermMerger<'a, V> {
+        TermMerger {
+            heap: BinaryHeap::new(),
+            current_streamers: streams
+                .into_iter()
+                .enumerate()
+                .map(|(ord, streamer)| {
+                         HeapItem {
+                             streamer: streamer,
+                             segment_ord: ord,
+                         }
+                     })
+                .collect(),
+        }
+    }
+
+    fn advance_segments(&mut self) {
+        let streamers = &mut self.current_streamers;
+        let heap = &mut self.heap;
+        for mut heap_item in streamers.drain(..) {
+            if heap_item.streamer.advance() {
+                heap.push(heap_item);
+            }
+        }
+    }
+
+
+    /// Advances the term iterator to the next term.
+    /// Returns `true` if there is another term,
+    /// `false` if there is none.
+    #[allow(while_let_loop)]
+    pub fn advance(&mut self) -> bool {
+        self.advance_segments();
+        if let Some(head) = self.heap.pop() {
+            self.current_streamers.push(head);
+            loop {
+                if let Some(next_streamer) = self.heap.peek() {
+                    if self.current_streamers[0].streamer.key() != next_streamer.streamer.key() {
+                        break;
+                    }
+                } else {
+                    break; // no more streamers.
+                }
+                let next_heap_it = self.heap.pop().unwrap(); // safe: we peeked beforehand
+                self.current_streamers.push(next_heap_it);
+            }
+            true
+        } else {
+            false
+        }
+    }
+
+    /// Returns the current term.
+    ///
+    /// This method may only be called
+    /// after `advance()` has been called
+    /// and returned `true`.
+    pub fn key(&self) -> &[u8] {
+        self.current_streamers[0].streamer.key()
+    }
+
+    /// Returns the sorted list of segment ordinals
+    /// that include the current term.
+    ///
+    /// This method may only be called
+    /// after `advance()` has been called
+    /// and returned `true`.
+    pub fn current_kvs(&self) -> &[HeapItem<'a, V>] {
+        &self.current_streamers[..]
+    }
+}
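Note: the intended consumption pattern is one `advance()` per unique key, after which `current_kvs()` exposes the streamer of every segment positioned on that key; this mirrors `IndexMerger::write_postings` above. A sketch (hypothetical helper; assumes `SegmentReader` is re-exported at the crate root):

```rust
use tantivy::SegmentReader;
use tantivy::termdict::TermMerger;

// Hypothetical helper, not part of the diff.
fn print_doc_freqs(readers: &[SegmentReader]) {
    let mut merged_terms = TermMerger::from(readers);
    while merged_terms.advance() {
        // Sum the doc frequency of the current term over every
        // segment that contains it.
        let doc_freq: u32 = merged_terms
            .current_kvs()
            .iter()
            .map(|heap_item| heap_item.streamer.value().doc_freq)
            .sum();
        println!("{:?}: {} docs", merged_terms.key(), doc_freq);
    }
}
```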
+ } +} + + + +impl<'a> From<&'a [SegmentReader]> for TermMerger<'a, TermInfo> + where TermInfo: BinarySerializable +{ + fn from(segment_readers: &'a [SegmentReader]) -> TermMerger<'a, TermInfo> { + TermMerger::new(segment_readers + .iter() + .map(|reader| reader.terms().stream()) + .collect()) + } +} + +impl<'a, V> Streamer<'a> for TermMerger<'a, V> + where V: BinarySerializable +{ + type Item = &'a [u8]; + + fn next(&'a mut self) -> Option { + if self.advance() { + Some(self.current_streamers[0].streamer.key()) + } else { + None + } + + } +} + +#[cfg(test)] +mod tests { + + use schema::{Term, SchemaBuilder, Document, TEXT}; + use core::Index; + + #[test] + fn test_term_iterator() { + let mut schema_builder = SchemaBuilder::default(); + let text_field = schema_builder.add_text_field("text", TEXT); + let index = Index::create_in_ram(schema_builder.build()); + { + let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap(); + { + { + let mut doc = Document::default(); + doc.add_text(text_field, "a b d f"); + index_writer.add_document(doc); + } + index_writer.commit().unwrap(); + } + { + { + let mut doc = Document::default(); + doc.add_text(text_field, "a b c d f"); + index_writer.add_document(doc); + } + index_writer.commit().unwrap(); + } + { + { + let mut doc = Document::default(); + doc.add_text(text_field, "e f"); + index_writer.add_document(doc); + } + index_writer.commit().unwrap(); + } + } + index.load_searchers().unwrap(); + let searcher = index.searcher(); + let mut term_it = searcher.terms(); + let mut term_string = String::new(); + while term_it.advance() { + let term = Term::from_bytes(term_it.key()); + term_string.push_str(term.text()); + } + assert_eq!(&*term_string, "abcdef"); + } + +} diff --git a/src/termdict/mod.rs b/src/termdict/mod.rs new file mode 100644 index 000000000..7e9db1d76 --- /dev/null +++ b/src/termdict/mod.rs @@ -0,0 +1,24 @@ +/*! +The term dictionary contains all of the terms in +`tantivy index` in a sorted manner. + +It is implemented as a wrapper of the `Fst` crate in order +to add a value type. + +A finite state transducer itself associates +each term `&[u8]` to a `u64` that is in fact an address +in a buffer. The value is then accessible via +deserializing the value at this address. + +Keys (`&[u8]`) in this datastructure are sorted. +*/ + +mod termdict; +mod streamer; +mod merger; + +pub use self::termdict::TermDictionary; +pub use self::termdict::TermDictionaryBuilder; +pub use self::streamer::TermStreamer; +pub use self::streamer::TermStreamerBuilder; +pub use self::merger::TermMerger; diff --git a/src/termdict/streamer.rs b/src/termdict/streamer.rs new file mode 100644 index 000000000..13327172b --- /dev/null +++ b/src/termdict/streamer.rs @@ -0,0 +1,142 @@ +use fst::{self, IntoStreamer, Streamer}; +use fst::map::{StreamBuilder, Stream}; +use common::BinarySerializable; +use super::TermDictionary; + +/// `TermStreamerBuilder` is an helper object used to define +/// a range of terms that should be streamed. 
diff --git a/src/termdict/streamer.rs b/src/termdict/streamer.rs
new file mode 100644
index 000000000..13327172b
--- /dev/null
+++ b/src/termdict/streamer.rs
@@ -0,0 +1,142 @@
+use fst::{self, IntoStreamer, Streamer};
+use fst::map::{StreamBuilder, Stream};
+use common::BinarySerializable;
+use super::TermDictionary;
+
+/// `TermStreamerBuilder` is a helper object used to define
+/// a range of terms that should be streamed.
+pub struct TermStreamerBuilder<'a, V>
+    where V: 'a + BinarySerializable
+{
+    fst_map: &'a TermDictionary<V>,
+    stream_builder: StreamBuilder<'a>,
+}
+
+impl<'a, V> TermStreamerBuilder<'a, V>
+    where V: 'a + BinarySerializable
+{
+    /// Limit the range to terms greater than or equal to the bound
+    pub fn ge<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
+        self.stream_builder = self.stream_builder.ge(bound);
+        self
+    }
+
+    /// Limit the range to terms strictly greater than the bound
+    pub fn gt<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
+        self.stream_builder = self.stream_builder.gt(bound);
+        self
+    }
+
+    /// Limit the range to terms less than or equal to the bound
+    pub fn le<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
+        self.stream_builder = self.stream_builder.le(bound);
+        self
+    }
+
+    /// Limit the range to terms strictly less than the bound
+    pub fn lt<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
+        self.stream_builder = self.stream_builder.lt(bound);
+        self
+    }
+
+    /// Creates the stream corresponding to the range
+    /// of terms defined using the `TermStreamerBuilder`.
+    pub fn into_stream(self) -> TermStreamer<'a, V> {
+        TermStreamer {
+            fst_map: self.fst_map,
+            stream: self.stream_builder.into_stream(),
+            buffer: Vec::with_capacity(100),
+            offset: 0u64,
+        }
+    }
+
+    /// Creates a new `TermStreamerBuilder`
+    pub(crate) fn new(fst_map: &'a TermDictionary<V>,
+                      stream_builder: StreamBuilder<'a>)
+                      -> TermStreamerBuilder<'a, V> {
+        TermStreamerBuilder {
+            fst_map: fst_map,
+            stream_builder: stream_builder,
+        }
+    }
+}
+
+
+/// `TermStreamer` acts as a cursor over a range of terms of a segment.
+/// Terms are guaranteed to be sorted.
+pub struct TermStreamer<'a, V>
+    where V: 'a + BinarySerializable
+{
+    fst_map: &'a TermDictionary<V>,
+    stream: Stream<'a>,
+    offset: u64,
+    buffer: Vec<u8>,
+}
+
+
+impl<'a, 'b, V> fst::Streamer<'b> for TermStreamer<'a, V>
+    where V: 'b + BinarySerializable
+{
+    type Item = (&'b [u8], V);
+
+    fn next(&'b mut self) -> Option<(&'b [u8], V)> {
+        if self.advance() {
+            let v = self.value();
+            Some((&self.buffer, v))
+        } else {
+            None
+        }
+    }
+}
+
+impl<'a, V> TermStreamer<'a, V>
+    where V: 'a + BinarySerializable
+{
+    /// Advances the stream to the next item.
+    /// Before the first call to `.advance()`, the stream
+    /// is in an uninitialized state.
+    pub fn advance(&mut self) -> bool {
+        if let Some((term, offset)) = self.stream.next() {
+            self.buffer.clear();
+            self.buffer.extend_from_slice(term);
+            self.offset = offset;
+            true
+        } else {
+            false
+        }
+    }
+
+    /// Accesses the current key.
+    ///
+    /// `.key()` returns the key that was returned
+    /// by the last call to the `.next()` method.
+    ///
+    /// If the end of the stream has been reached, and `.next()`
+    /// has been called and returned `None`, `.key()` remains
+    /// the value of the last key encountered.
+    ///
+    /// Before any call to `.next()`, `.key()` returns an empty array.
+    pub fn key(&self) -> &[u8] {
+        &self.buffer
+    }
+
+    /// Accesses the current value.
+    ///
+    /// Values are accessed in a lazy manner: their data is fetched
+    /// and deserialized only at the moment of the call to `.value()`.
+    ///
+    /// Calling `.value()` after the end of the stream will return the
+    /// last `.value()` encountered.
+    ///
+    /// # Warning
+    ///
+    /// Calling `.value()` before the first call to `.advance()` or `.next()`
+    /// returns an unspecified value.
+    pub fn value(&self) -> V {
+        self.fst_map
+            .read_value(self.offset)
+            .expect("Fst data is corrupted. Failed to deserialize a value.")
+    }
+}
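Note: the builder above makes range and prefix-style scans cheap. A sketch of restricting a stream to a key range (hypothetical helper; `dict` is assumed to be a `TermDictionary<u32>` already opened with `from_source`):

```rust
use tantivy::termdict::TermDictionary;

// Hypothetical helper, not part of the diff.
fn print_range(dict: &TermDictionary<u32>) {
    let mut stream = dict.range()
        .ge(b"abc") // keys >= "abc"
        .lt(b"abd") // keys strictly < "abd"
        .into_stream();
    while stream.advance() {
        println!("{:?} => {}", stream.key(), stream.value());
    }
}
```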
diff --git a/src/termdict/termdict.rs b/src/termdict/termdict.rs
new file mode 100644
index 000000000..e713a24d7
--- /dev/null
+++ b/src/termdict/termdict.rs
@@ -0,0 +1,201 @@
+use std::io::{self, Write};
+use fst;
+use fst::raw::Fst;
+use super::{TermStreamerBuilder, TermStreamer};
+use directory::ReadOnlySource;
+use common::BinarySerializable;
+use std::marker::PhantomData;
+use schema::{Field, Term};
+use postings::TermInfo;
+
+
+fn convert_fst_error(e: fst::Error) -> io::Error {
+    io::Error::new(io::ErrorKind::Other, e)
+}
+
+
+/// Builder for the new term dictionary.
+///
+/// Just like for the fst crate, all terms must be inserted in order.
+pub struct TermDictionaryBuilder<W, V>
+    where V: BinarySerializable
+{
+    fst_builder: fst::MapBuilder<W>,
+    data: Vec<u8>,
+    _phantom_: PhantomData<V>,
+}
+
+impl<W: Write, V: BinarySerializable> TermDictionaryBuilder<W, V> {
+    /// Creates a new `TermDictionaryBuilder`
+    pub fn new(w: W) -> io::Result<TermDictionaryBuilder<W, V>> {
+        let fst_builder = fst::MapBuilder::new(w).map_err(convert_fst_error)?;
+        Ok(TermDictionaryBuilder {
+               fst_builder: fst_builder,
+               data: Vec::new(),
+               _phantom_: PhantomData,
+           })
+    }
+
+    /// # Warning
+    /// Horribly dangerous internal API
+    ///
+    /// If used, it must be used by systematically alternating calls
+    /// to insert_key and insert_value.
+    ///
+    /// Prefer using `.insert(key, value)`
+    pub(crate) fn insert_key(&mut self, key: &[u8]) -> io::Result<()> {
+        self.fst_builder
+            .insert(key, self.data.len() as u64)
+            .map_err(convert_fst_error)?;
+        Ok(())
+    }
+
+    /// # Warning
+    ///
+    /// Horribly dangerous internal API. See `.insert_key(...)`.
+    pub(crate) fn insert_value(&mut self, value: &V) -> io::Result<()> {
+        value.serialize(&mut self.data)?;
+        Ok(())
+    }
+
+    /// Inserts a `(key, value)` pair in the term dictionary.
+    ///
+    /// *Keys have to be inserted in order.*
+    pub fn insert(&mut self, key: &[u8], value: &V) -> io::Result<()> {
+        self.fst_builder
+            .insert(key, self.data.len() as u64)
+            .map_err(convert_fst_error)?;
+        value.serialize(&mut self.data)?;
+        Ok(())
+    }
+
+    /// Finalizes the writing of the builder, and returns the underlying
+    /// `Write` object.
+    pub fn finish(self) -> io::Result<W> {
+        let mut file = self.fst_builder.into_inner().map_err(convert_fst_error)?;
+        let footer_size = self.data.len() as u32;
+        file.write_all(&self.data)?;
+        (footer_size as u32).serialize(&mut file)?;
+        file.flush()?;
+        Ok(file)
+    }
+}
+
+/// Datastructure to access the `terms` of a segment.
+pub struct TermDictionary<V>
+    where V: BinarySerializable
+{
+    fst_index: fst::Map,
+    values_mmap: ReadOnlySource,
+    _phantom_: PhantomData<V>,
+}
+
+fn open_fst_index(source: ReadOnlySource) -> io::Result<fst::Map> {
+    let fst = match source {
+        ReadOnlySource::Anonymous(data) => {
+            Fst::from_shared_bytes(data.data, data.start, data.len)
+                .map_err(convert_fst_error)?
+        }
+        ReadOnlySource::Mmap(mmap_readonly) => {
+            Fst::from_mmap(mmap_readonly).map_err(convert_fst_error)?
+        }
+    };
+    Ok(fst::Map::from(fst))
+}
+
+impl<V> TermDictionary<V>
+    where V: BinarySerializable
+{
+    /// Opens a `TermDictionary` given a data source.
+    pub fn from_source(source: ReadOnlySource) -> io::Result<TermDictionary<V>> {
+        let total_len = source.len();
+        let length_offset = total_len - 4;
+        let mut split_len_buffer: &[u8] = &source.as_slice()[length_offset..];
+        let footer_size = u32::deserialize(&mut split_len_buffer)? as usize;
+        let split_len = length_offset - footer_size;
+        let fst_source = source.slice(0, split_len);
+        let values_source = source.slice(split_len, length_offset);
+        let fst_index = open_fst_index(fst_source)?;
+        Ok(TermDictionary {
+               fst_index: fst_index,
+               values_mmap: values_source,
+               _phantom_: PhantomData,
+           })
+    }
+
+    /// Deserializes and returns the value at address `offset`
+    pub(crate) fn read_value(&self, offset: u64) -> io::Result<V> {
+        let buffer = self.values_mmap.as_slice();
+        let mut cursor = &buffer[(offset as usize)..];
+        V::deserialize(&mut cursor)
+    }
+
+    /// Looks up the value corresponding to the key.
+    pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Option<V> {
+        self.fst_index
+            .get(key)
+            .map(|offset| {
+                     self.read_value(offset)
+                         .expect("The fst is corrupted. Failed to deserialize a value.")
+                 })
+    }
+
+    /// A stream of all the sorted terms. [See also `.stream_field()`](#method.stream_field)
+    pub fn stream(&self) -> TermStreamer<V> {
+        self.range().into_stream()
+    }
+
+    /// A stream of all the sorted terms in the given field.
+    pub fn stream_field(&self, field: Field) -> TermStreamer<V> {
+        let start_term = Term::from_field_text(field, "");
+        let stop_term = Term::from_field_text(Field(field.0 + 1), "");
+        self.range()
+            .ge(start_term.as_slice())
+            .lt(stop_term.as_slice())
+            .into_stream()
+    }
+
+    /// Returns a range builder, to stream all of the terms
+    /// within an interval.
+    pub fn range(&self) -> TermStreamerBuilder<V> {
+        TermStreamerBuilder::new(self, self.fst_index.range())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use directory::{RAMDirectory, Directory};
+    use std::path::PathBuf;
+    use fst::Streamer;
+
+    #[test]
+    fn test_term_dictionary() {
+        let mut directory = RAMDirectory::create();
+        let path = PathBuf::from("TermDictionary");
+        {
+            let write = directory.open_write(&path).unwrap();
+            let mut term_dictionary_builder = TermDictionaryBuilder::new(write).unwrap();
+            term_dictionary_builder
+                .insert("abc".as_bytes(), &34u32)
+                .unwrap();
+            term_dictionary_builder
+                .insert("abcd".as_bytes(), &346u32)
+                .unwrap();
+            term_dictionary_builder.finish().unwrap();
+        }
+        let source = directory.open_read(&path).unwrap();
+        let term_dict: TermDictionary<u32> = TermDictionary::from_source(source).unwrap();
+        assert_eq!(term_dict.get("abc"), Some(34u32));
+        assert_eq!(term_dict.get("abcd"), Some(346u32));
+        let mut stream = term_dict.stream();
+        assert_eq!(stream.next().unwrap(), ("abc".as_bytes(), 34u32));
+        assert_eq!(stream.key(), "abc".as_bytes());
+        assert_eq!(stream.value(), 34u32);
+        assert_eq!(stream.next().unwrap(), ("abcd".as_bytes(), 346u32));
+        assert_eq!(stream.key(), "abcd".as_bytes());
+        assert_eq!(stream.value(), 346u32);
+        assert!(!stream.advance());
+    }
+
+}
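Closing note on the `SegmentPostingsOption` ordering added in `src/postings/segment_postings_option.rs`: deriving `Ord` is what lets `SegmentReader::read_postings` clamp a caller's request against what the field was actually indexed with (`cmp::min(maximum_option, option)`), degrading gracefully instead of trying to decode data that is not there. A small sketch of that clamping (hypothetical helper):

```rust
use std::cmp;
use tantivy::SegmentPostingsOption;

// Hypothetical helper mirroring the `best_effort_option` computation.
fn best_effort(maximum: SegmentPostingsOption,
               requested: SegmentPostingsOption)
               -> SegmentPostingsOption {
    cmp::min(maximum, requested)
}

fn main() {
    // Asking for positions on a field indexed with frequencies only
    // silently degrades to frequencies.
    let clamped = best_effort(SegmentPostingsOption::Freq,
                              SegmentPostingsOption::FreqAndPositions);
    assert_eq!(clamped, SegmentPostingsOption::Freq);
}
```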