From ca5f3e1d4635ba263831bced5db47470bf482178 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Fri, 16 Dec 2016 23:20:05 +0100 Subject: [PATCH] issue/67 First stab. Iterator working. --- src/core/mod.rs | 4 +- src/core/searcher.rs | 12 +- src/core/term_iterator.rs | 191 ++++++++++++++++++++++++++++++ src/datastruct/fstmap.rs | 23 +--- src/datastruct/mod.rs | 1 - src/indexer/merger.rs | 237 ++++++++++++-------------------------- src/lib.rs | 3 +- src/schema/field.rs | 3 +- src/schema/term.rs | 10 ++ 9 files changed, 296 insertions(+), 188 deletions(-) create mode 100644 src/core/term_iterator.rs diff --git a/src/core/mod.rs b/src/core/mod.rs index f16911302..2dfac69d1 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -7,7 +7,7 @@ mod segment_component; mod segment; mod index_meta; mod pool; - +mod term_iterator; use std::path::PathBuf; @@ -19,7 +19,7 @@ pub use self::segment::SegmentInfo; pub use self::segment::SerializableSegment; pub use self::index::Index; pub use self::index_meta::{IndexMeta, SegmentMeta}; - +pub use self::term_iterator::TermIterator; lazy_static! { pub static ref META_FILEPATH: PathBuf = PathBuf::from("meta.json"); diff --git a/src/core/searcher.rs b/src/core/searcher.rs index b1b54a0e9..548ffc2cb 100644 --- a/src/core/searcher.rs +++ b/src/core/searcher.rs @@ -7,6 +7,7 @@ use query::Query; use DocId; use DocAddress; use schema::Term; +use core::TermIterator; /// Holds a list of `SegmentReader`s ready for search. @@ -47,9 +48,18 @@ impl Searcher { .map(|segment_reader| segment_reader.doc_freq(term)) .fold(0u32, |acc, val| acc + val) } + + /// Returns a Stream over all of the sorted unique terms of + /// the searcher. + /// + /// This includes all of the fields from all of the segment_readers. + /// See [TermIterator](struct.TermIterator.html). + pub fn terms<'a>(&'a self) -> TermIterator<'a> { + TermIterator::from(self.segment_readers()) + } /// Return the list of segment readers - pub fn segment_readers(&self,) -> &Vec { + pub fn segment_readers(&self,) -> &[SegmentReader] { &self.segment_readers } diff --git a/src/core/term_iterator.rs b/src/core/term_iterator.rs new file mode 100644 index 000000000..108bc03b6 --- /dev/null +++ b/src/core/term_iterator.rs @@ -0,0 +1,191 @@ +use fst::Streamer; +use std::mem; +use std::collections::BinaryHeap; +use fst::map::Keys; +use schema::Term; +use core::SegmentReader; +use std::cmp::Ordering; + + +static EMPTY: [u8; 0] = []; + +#[derive(PartialEq, Eq, Debug)] +struct HeapItem { + term: Term, + segment_ord: usize, +} + +impl PartialOrd for HeapItem { + fn partial_cmp(&self, other: &HeapItem) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for HeapItem { + fn cmp(&self, other: &HeapItem) -> Ordering { + (&other.term, &other.segment_ord).cmp(&(&self.term, &self.segment_ord)) + } +} + +/// Given a list of sorted term streams, +/// returns an iterator over sorted unique terms. +/// +/// The item yield is actually a pair with +/// - the term +/// - a slice with the ordinal of the segments containing +/// the terms. +pub struct TermIterator<'a> { + key_streams: Vec>, + heap: BinaryHeap, + // Buffer hosting the list of segment ordinals containing + // the current term. 
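+    // For instance, when the stream is positioned on a term that appears
+    // in segments 0 and 2, this buffer holds `[0, 2]`.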
+ current_term: Term, + current_segment_ords: Vec, +} + +impl<'a> TermIterator<'a> { + fn new(key_streams: Vec>) -> TermIterator<'a> { + let key_streams_len = key_streams.len(); + let mut term_iterator = TermIterator { + key_streams: key_streams, + heap: BinaryHeap::new(), + current_term: Term::from(&EMPTY[..]), + current_segment_ords: vec![], + }; + for segment_ord in 0..key_streams_len { + term_iterator.push_next_segment_el(segment_ord); + } + term_iterator + } + + fn push_next_segment_el(&mut self, segment_ord: usize) { + self.current_segment_ords.push(segment_ord); + if let Some(term) = self.key_streams[segment_ord].next() { + self.heap.push(HeapItem { + term: Term::from(term), + segment_ord: segment_ord, + }); + } + } +} + +impl<'a, 'f> Streamer<'a> for TermIterator<'f> { + type Item = (&'a Term, &'a [usize]); + + fn next(&'a mut self) -> Option { + self.current_segment_ords.clear(); + self.heap + .pop() + .map(move |mut head| { + mem::swap(&mut self.current_term, &mut head.term); + self.push_next_segment_el(head.segment_ord); + loop { + match self.heap.peek() { + Some(&ref next_heap_it) if next_heap_it.term == self.current_term => {} + _ => { + break; + } + } + let next_heap_it = self.heap + .pop() + .expect("This is only reached if an element was \ + peeked beforehand."); + self.push_next_segment_el(next_heap_it.segment_ord); + } + (&self.current_term, self.current_segment_ords.as_slice()) + }) + } +} + +impl<'a> From<&'a [SegmentReader]> for TermIterator<'a> { + fn from(segment_readers: &'a [SegmentReader]) -> TermIterator<'a> { + TermIterator::new(segment_readers.iter() + .map(|reader| reader.term_infos().keys()) + .collect()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use schema::{SchemaBuilder, Document, TEXT}; + use core::Index; + + #[test] + fn test_term_iterator() { + let mut schema_builder = SchemaBuilder::default(); + let text_field = schema_builder.add_text_field("text", TEXT); + let index = Index::create_in_ram(schema_builder.build()); + { + let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap(); + { + // writing the segment + { + let mut doc = Document::default(); + doc.add_text(text_field, "a b d f"); + index_writer.add_document(doc).unwrap(); + } + index_writer.commit().unwrap(); + } + { + // writing the segment + { + let mut doc = Document::default(); + doc.add_text(text_field, "a b c d f"); + index_writer.add_document(doc).unwrap(); + } + index_writer.commit().unwrap(); + } + { + // writing the segment + { + let mut doc = Document::default(); + doc.add_text(text_field, "e f"); + index_writer.add_document(doc).unwrap(); + } + index_writer.commit().unwrap(); + } + } + let searcher = index.searcher(); + let mut term_it = searcher.terms(); + { + + let (term, segments) = term_it.next().unwrap(); + assert_eq!(term.value(), "a".as_bytes()); + let expected_segments = [0, 1]; + assert_eq!(segments, &expected_segments); + + } + { + let (term, segments): (&Term, &[usize]) = term_it.next().unwrap(); + assert_eq!(term.value(), "b".as_bytes()); + let expected_segments = [0, 1]; + assert_eq!(segments, &expected_segments); + } + { + let (ref term, ref segments) = term_it.next().unwrap(); + assert_eq!(term.value(), "c".as_bytes()); + let expected_segments = [1]; + assert_eq!(segments, &expected_segments); + } + { + let (term, segments) = term_it.next().unwrap(); + assert_eq!(term.value(), "d".as_bytes()); + let expected_segments = [0, 1]; + assert_eq!(segments, &expected_segments); + } + { + let (term, segments) = term_it.next().unwrap(); + 
assert_eq!(term.value(), "e".as_bytes()); + let expected_segments = [2]; + assert_eq!(segments, &expected_segments); + } + { + let (term, segments) = term_it.next().unwrap(); + assert_eq!(term.value(), "f".as_bytes()); + let expected_segments = [0, 1, 2]; + assert_eq!(segments, &expected_segments); + } + } + +} \ No newline at end of file diff --git a/src/datastruct/fstmap.rs b/src/datastruct/fstmap.rs index 096ed8bed..1d4a420ed 100644 --- a/src/datastruct/fstmap.rs +++ b/src/datastruct/fstmap.rs @@ -4,7 +4,6 @@ use std::io; use std::io::Write; use fst; use fst::raw::Fst; -use fst::Streamer; use directory::ReadOnlySource; use common::BinarySerializable; @@ -66,27 +65,10 @@ fn open_fst_index(source: ReadOnlySource) -> io::Result { })) } -pub struct FstKeyIter<'a, V: 'static + BinarySerializable> { - streamer: fst::map::Stream<'a>, - __phantom__: PhantomData -} - -impl<'a, V: 'static + BinarySerializable> FstKeyIter<'a, V> { - pub fn next(&mut self) -> Option<(&[u8])> { - self.streamer - .next() - .map(|(k, _)| k) - } -} - - impl FstMap { - pub fn keys(&self,) -> FstKeyIter { - FstKeyIter { - streamer: self.fst_index.stream(), - __phantom__: PhantomData, - } + pub fn keys(&self,) -> fst::map::Keys { + self.fst_index.keys() } pub fn from_source(source: ReadOnlySource) -> io::Result> { @@ -123,6 +105,7 @@ mod tests { use super::*; use directory::{RAMDirectory, Directory}; use std::path::PathBuf; + use fst::Streamer; #[test] fn test_fstmap() { diff --git a/src/datastruct/mod.rs b/src/datastruct/mod.rs index 85489973b..f026afa7f 100644 --- a/src/datastruct/mod.rs +++ b/src/datastruct/mod.rs @@ -4,5 +4,4 @@ pub mod stacker; pub use self::fstmap::FstMapBuilder; pub use self::fstmap::FstMap; -pub use self::fstmap::FstKeyIter; pub use self::skip::{SkipListBuilder, SkipList}; diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index e2d0cb3e1..ed575ecd2 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -3,130 +3,22 @@ use core::SegmentReader; use core::Segment; use DocId; use core::SerializableSegment; - use indexer::SegmentSerializer; use postings::PostingsSerializer; -use postings::TermInfo; use postings::Postings; use postings::DocSet; -use std::collections::BinaryHeap; -use datastruct::FstKeyIter; -use schema::{Term, Schema, Field}; +use core::TermIterator; +use fst::Streamer; +use schema::{Schema, Field}; use fastfield::FastFieldSerializer; use store::StoreWriter; use postings::ChainedPostings; use postings::HasLen; use postings::OffsetPostings; use core::SegmentInfo; -use std::cmp::{min, max, Ordering}; +use std::cmp::{min, max}; use std::iter; - -struct PostingsMerger<'a> { - doc_offsets: Vec, - heap: BinaryHeap, - term_streams: Vec>, - readers: &'a [SegmentReader], -} - -#[derive(PartialEq, Eq, Debug)] -struct HeapItem { - term: Term, - segment_ord: usize, -} - -impl PartialOrd for HeapItem { - fn partial_cmp(&self, other: &HeapItem) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for HeapItem { - fn cmp(&self, other: &HeapItem) -> Ordering { - (&other.term, &other.segment_ord).cmp(&(&self.term, &self.segment_ord)) - } -} - -impl<'a> PostingsMerger<'a> { - fn new(readers: &'a [SegmentReader]) -> PostingsMerger<'a> { - let mut doc_offsets: Vec = Vec::new(); - let mut max_doc = 0; - for reader in readers { - doc_offsets.push(max_doc); - max_doc += reader.max_doc(); - }; - let term_streams = readers - .iter() - .map(|reader| reader.term_infos().keys()) - .collect(); - let mut postings_merger = PostingsMerger { - heap: BinaryHeap::new(), - term_streams: term_streams, 
- doc_offsets: doc_offsets, - readers: readers, - }; - for segment_ord in 0..readers.len() { - postings_merger.push_next_segment_el(segment_ord); - } - postings_merger - } - - // pushes the term_reader associated with the given segment ordinal - // into the heap. - fn push_next_segment_el(&mut self, segment_ord: usize) { - if let Some(term) = self.term_streams[segment_ord].next() { - let it = HeapItem { - term: Term::from(term), - segment_ord: segment_ord, - }; - self.heap.push(it); - } - } - - fn append_segment(&mut self, - heap_item: &HeapItem, - segment_postings_list: &mut Vec>) { - { - - let offset = self.doc_offsets[heap_item.segment_ord]; - let reader = &self.readers[heap_item.segment_ord]; - if let Some(segment_postings) = reader.read_postings_all_info(&heap_item.term) { - let offset_postings = OffsetPostings::new(segment_postings, offset); - segment_postings_list.push(offset_postings); - } - } - self.push_next_segment_el(heap_item.segment_ord); - } - -} - -impl<'a> Iterator for PostingsMerger<'a> { - - type Item = (Term, ChainedPostings<'a>); - - fn next(&mut self,) -> Option<(Term, ChainedPostings<'a>)> { - // TODO remove the Vec allocations - match self.heap.pop() { - Some(heap_it) => { - let mut segment_postings_list = Vec::new(); - self.append_segment(&heap_it, &mut segment_postings_list); - loop { - match self.heap.peek() { - Some(&ref next_heap_it) if next_heap_it.term == heap_it.term => {}, - _ => { break; } - } - let next_heap_it = self.heap.pop().expect("This is only reached if an element was peeked beforehand."); - self.append_segment(&next_heap_it, &mut segment_postings_list); - } - let chained_posting = ChainedPostings::from(segment_postings_list); - Some((heap_it.term, chained_posting)) - }, - None => None - } - } -} - - pub struct IndexMerger { schema: Schema, readers: Vec, @@ -135,17 +27,15 @@ pub struct IndexMerger { struct DeltaPositionComputer { - buffer: Vec + buffer: Vec, } impl DeltaPositionComputer { fn new() -> DeltaPositionComputer { - DeltaPositionComputer { - buffer: iter::repeat(0u32).take(512).collect::>(), - } + DeltaPositionComputer { buffer: iter::repeat(0u32).take(512).collect::>() } } - - fn compute_delta_positions(&mut self, positions: &[u32],) -> &[u32] { + + fn compute_delta_positions(&mut self, positions: &[u32]) -> &[u32] { if positions.len() > self.buffer.len() { self.buffer.resize(positions.len(), 0u32); } @@ -158,8 +48,6 @@ impl DeltaPositionComputer { } } - - impl IndexMerger { pub fn open(schema: Schema, segments: &[Segment]) -> Result { let mut readers = Vec::new(); @@ -172,20 +60,19 @@ impl IndexMerger { Ok(IndexMerger { schema: schema, readers: readers, - segment_info: SegmentInfo { - max_doc: max_doc - }, + segment_info: SegmentInfo { max_doc: max_doc }, }) } - + fn write_fieldnorms(&self, fast_field_serializer: &mut FastFieldSerializer) -> Result<()> { // TODO make sure that works even if the field is never here. 
- for field in self.schema.fields() - .iter() - .enumerate() - .filter(|&(_, field_entry)| field_entry.is_indexed()) - .map(|(field_id, _)| Field(field_id as u8)) { + for field in self.schema + .fields() + .iter() + .enumerate() + .filter(|&(_, field_entry)| field_entry.is_indexed()) + .map(|(field_id, _)| Field(field_id as u8)) { let mut u32_readers = Vec::new(); let mut min_val = u32::min_value(); let mut max_val = 0; @@ -208,11 +95,12 @@ impl IndexMerger { } fn write_fast_fields(&self, fast_field_serializer: &mut FastFieldSerializer) -> Result<()> { - for field in self.schema.fields() - .iter() - .enumerate() - .filter(|&(_, field_entry)| field_entry.is_u32_fast()) - .map(|(field_id, _)| Field(field_id as u8)) { + for field in self.schema + .fields() + .iter() + .enumerate() + .filter(|&(_, field_entry)| field_entry.is_u32_fast()) + .map(|(field_id, _)| Field(field_id as u8)) { let mut u32_readers = Vec::new(); let mut min_val = u32::min_value(); let mut max_val = 0; @@ -235,16 +123,51 @@ impl IndexMerger { } fn write_postings(&self, postings_serializer: &mut PostingsSerializer) -> Result<()> { - let postings_merger = PostingsMerger::new(&self.readers); + let mut merged_terms = TermIterator::from(&self.readers[..]); let mut delta_position_computer = DeltaPositionComputer::new(); - for (term, mut merged_doc_ids) in postings_merger { - try!(postings_serializer.new_term(&term, merged_doc_ids.len() as DocId)); - while merged_doc_ids.advance() { - let delta_positions: &[u32] = delta_position_computer.compute_delta_positions(merged_doc_ids.positions()); - try!(postings_serializer.write_doc(merged_doc_ids.doc(), merged_doc_ids.term_freq(), delta_positions)); + let mut offsets: Vec = Vec::new(); + let mut max_doc = 0; + for reader in &self.readers { + offsets.push(max_doc); + max_doc += reader.max_doc(); + } + + while let Some((term, segment_ords)) = merged_terms.next() { + // Create the total list of doc ids + // by stacking the doc ids from the different segment. + // + // In the new segments, the doc id from the different + // segment are stacked so that : + // - Segment 0's doc ids become doc id [0, seg.max_doc] + // - Segment 1's doc ids become [seg0.max_doc, seg0.max_doc + seg.max_doc] + // - Segment 2's doc ids become [seg0.max_doc + seg1.max_doc, seg0.max_doc + seg1.max_doc + seg2.max_doc] + // ... + let mut merged_postings = + ChainedPostings::from(segment_ords.iter() + .flat_map(|segment_ord| { + let offset = offsets[*segment_ord]; + self.readers[*segment_ord] + .read_postings_all_info(&term) + .map(|segment_postings| { + OffsetPostings::new(segment_postings, + offset) + }) + }) + .collect::>()); + + // We can now serialize this postings, by pushing each document to the + // postings serializer. 
+ try!(postings_serializer.new_term(&term, merged_postings.len() as DocId)); + while merged_postings.advance() { + let delta_positions: &[u32] = + delta_position_computer.compute_delta_positions(merged_postings.positions()); + try!(postings_serializer.write_doc(merged_postings.doc(), + merged_postings.term_freq(), + delta_positions)); } try!(postings_serializer.close_term()); } + Ok(()) } @@ -284,7 +207,9 @@ mod tests { #[test] fn test_index_merger() { let mut schema_builder = schema::SchemaBuilder::default(); - let text_fieldtype = schema::TextOptions::default().set_indexing_options(TextIndexingOptions::TokenizedWithFreq).set_stored(); + let text_fieldtype = schema::TextOptions::default() + .set_indexing_options(TextIndexingOptions::TokenizedWithFreq) + .set_stored(); let text_field = schema_builder.add_text_field("text", text_fieldtype); let score_fieldtype = schema::U32Options::default().set_fast(); let score_field = schema_builder.add_u32_field("score", score_fieldtype); @@ -346,22 +271,14 @@ mod tests { collector.docs() }; { - assert_eq!( - get_doc_ids(vec!(Term::from_field_text(text_field, "a"))), - vec!(1, 2, 4,) - ); - assert_eq!( - get_doc_ids(vec!(Term::from_field_text(text_field, "af"))), - vec!(0, 3,) - ); - assert_eq!( - get_doc_ids(vec!(Term::from_field_text(text_field, "g"))), - vec!(4,) - ); - assert_eq!( - get_doc_ids(vec!(Term::from_field_text(text_field, "b"))), - vec!(0, 1, 2, 3, 4,) - ); + assert_eq!(get_doc_ids(vec![Term::from_field_text(text_field, "a")]), + vec!(1, 2, 4,)); + assert_eq!(get_doc_ids(vec![Term::from_field_text(text_field, "af")]), + vec!(0, 3,)); + assert_eq!(get_doc_ids(vec![Term::from_field_text(text_field, "g")]), + vec!(4,)); + assert_eq!(get_doc_ids(vec![Term::from_field_text(text_field, "b")]), + vec!(0, 1, 2, 3, 4,)); } { let doc = searcher.doc(&DocAddress(0, 0)).unwrap(); @@ -390,10 +307,8 @@ mod tests { assert!(searcher.search(&query, &mut collector).is_ok()); collector.vals().clone() }; - assert_eq!( - get_fast_vals(vec!(Term::from_field_text(text_field, "a"))), - vec!(5, 7, 13,) - ); + assert_eq!(get_fast_vals(vec![Term::from_field_text(text_field, "a")]), + vec!(5, 7, 13,)); } } } diff --git a/src/lib.rs b/src/lib.rs index 8f6c78902..e64a5189d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -110,8 +110,7 @@ pub use self::common::TimerTree; pub use postings::DocSet; pub use postings::Postings; pub use postings::SegmentPostingsOption; - - +pub use core::TermIterator; /// Tantivy's makes it possible to personalize when /// the indexer should merge its segments diff --git a/src/schema/field.rs b/src/schema/field.rs index b9bca998d..d3af27441 100644 --- a/src/schema/field.rs +++ b/src/schema/field.rs @@ -8,7 +8,8 @@ use common::BinarySerializable; /// The schema is in charge of holding mapping between field names /// to `Field` objects. /// -/// Because the field id is a `u8`, tantivy can only have at most `256` fields +/// Because the field id is a `u8`, tantivy can only have at most `255` fields. +/// Value 255 is reserved. #[derive(Copy,Clone,Debug,PartialEq,PartialOrd,Eq,Ord,Hash, RustcEncodable, RustcDecodable)] pub struct Field(pub u8); diff --git a/src/schema/term.rs b/src/schema/term.rs index 777e3044c..4cb2adab4 100644 --- a/src/schema/term.rs +++ b/src/schema/term.rs @@ -8,9 +8,11 @@ use super::Field; /// Term represents the value that the token can take. /// /// It actually wraps a `Vec`. +/// TODO remove pub #[derive(Clone, PartialEq, PartialOrd, Ord, Eq, Hash)] pub struct Term(Vec); + impl Term { /// Pre-allocate a term buffer. 
@@ -63,6 +65,14 @@ impl Term {
         Term(buffer)
     }
 
+    /// Returns the serialized value associated with the field.
+    /// If the term is a string, its value is utf-8 encoded.
+    /// If the term is a u32, its value is encoded according
+    /// to `byteorder::LittleEndian`.
+    pub fn value(&self) -> &[u8] {
+        &self.0[1..]
+    }
+
     /// Set the texts only, keeping the field untouched.
     pub fn set_text(&mut self, text: &str) {
         self.0.resize(1, 0u8);
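For reference, a minimal sketch of how the new `Searcher::terms()` stream introduced by this patch would be consumed. It mirrors the unit test added in src/core/term_iterator.rs; the crate-root re-exports (`tantivy::Index`, `tantivy::schema::*`) and an external `fst` dependency providing the `Streamer` trait are assumed, and the field name and document contents are arbitrary examples.

extern crate fst;
extern crate tantivy;

use fst::Streamer;
use tantivy::Index;
use tantivy::schema::{Document, SchemaBuilder, TEXT};

fn main() {
    let mut schema_builder = SchemaBuilder::default();
    let text_field = schema_builder.add_text_field("text", TEXT);
    let index = Index::create_in_ram(schema_builder.build());

    // One commit per document, so that every document lands in its own segment.
    let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
    for text in &["a b d f", "a b c d f", "e f"] {
        let mut doc = Document::default();
        doc.add_text(text_field, text);
        index_writer.add_document(doc).unwrap();
        index_writer.commit().unwrap();
    }

    let searcher = index.searcher();
    // `terms()` returns a `TermIterator`, an `fst::Streamer` over the sorted,
    // deduplicated terms of all segments. Each item pairs a term with the
    // ordinals of the segments containing it, e.g. "f" -> [0, 1, 2].
    let mut term_it = searcher.terms();
    while let Some((term, segment_ords)) = term_it.next() {
        println!("{:?} -> segments {:?}", term.value(), segment_ords);
    }
}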