From ca5f3e1d4635ba263831bced5db47470bf482178 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Fri, 16 Dec 2016 23:20:05 +0100 Subject: [PATCH 1/4] issue/67 First stab. Iterator working. --- src/core/mod.rs | 4 +- src/core/searcher.rs | 12 +- src/core/term_iterator.rs | 191 ++++++++++++++++++++++++++++++ src/datastruct/fstmap.rs | 23 +--- src/datastruct/mod.rs | 1 - src/indexer/merger.rs | 237 ++++++++++++-------------------------- src/lib.rs | 3 +- src/schema/field.rs | 3 +- src/schema/term.rs | 10 ++ 9 files changed, 296 insertions(+), 188 deletions(-) create mode 100644 src/core/term_iterator.rs diff --git a/src/core/mod.rs b/src/core/mod.rs index f16911302..2dfac69d1 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -7,7 +7,7 @@ mod segment_component; mod segment; mod index_meta; mod pool; - +mod term_iterator; use std::path::PathBuf; @@ -19,7 +19,7 @@ pub use self::segment::SegmentInfo; pub use self::segment::SerializableSegment; pub use self::index::Index; pub use self::index_meta::{IndexMeta, SegmentMeta}; - +pub use self::term_iterator::TermIterator; lazy_static! { pub static ref META_FILEPATH: PathBuf = PathBuf::from("meta.json"); diff --git a/src/core/searcher.rs b/src/core/searcher.rs index b1b54a0e9..548ffc2cb 100644 --- a/src/core/searcher.rs +++ b/src/core/searcher.rs @@ -7,6 +7,7 @@ use query::Query; use DocId; use DocAddress; use schema::Term; +use core::TermIterator; /// Holds a list of `SegmentReader`s ready for search. @@ -47,9 +48,18 @@ impl Searcher { .map(|segment_reader| segment_reader.doc_freq(term)) .fold(0u32, |acc, val| acc + val) } + + /// Returns a Stream over all of the sorted unique terms of + /// the searcher. + /// + /// This includes all of the fields from all of the segment_readers. + /// See [TermIterator](struct.TermIterator.html). + pub fn terms<'a>(&'a self) -> TermIterator<'a> { + TermIterator::from(self.segment_readers()) + } /// Return the list of segment readers - pub fn segment_readers(&self,) -> &Vec { + pub fn segment_readers(&self,) -> &[SegmentReader] { &self.segment_readers } diff --git a/src/core/term_iterator.rs b/src/core/term_iterator.rs new file mode 100644 index 000000000..108bc03b6 --- /dev/null +++ b/src/core/term_iterator.rs @@ -0,0 +1,191 @@ +use fst::Streamer; +use std::mem; +use std::collections::BinaryHeap; +use fst::map::Keys; +use schema::Term; +use core::SegmentReader; +use std::cmp::Ordering; + + +static EMPTY: [u8; 0] = []; + +#[derive(PartialEq, Eq, Debug)] +struct HeapItem { + term: Term, + segment_ord: usize, +} + +impl PartialOrd for HeapItem { + fn partial_cmp(&self, other: &HeapItem) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for HeapItem { + fn cmp(&self, other: &HeapItem) -> Ordering { + (&other.term, &other.segment_ord).cmp(&(&self.term, &self.segment_ord)) + } +} + +/// Given a list of sorted term streams, +/// returns an iterator over sorted unique terms. +/// +/// The item yield is actually a pair with +/// - the term +/// - a slice with the ordinal of the segments containing +/// the terms. +pub struct TermIterator<'a> { + key_streams: Vec>, + heap: BinaryHeap, + // Buffer hosting the list of segment ordinals containing + // the current term. + current_term: Term, + current_segment_ords: Vec, +} + +impl<'a> TermIterator<'a> { + fn new(key_streams: Vec>) -> TermIterator<'a> { + let key_streams_len = key_streams.len(); + let mut term_iterator = TermIterator { + key_streams: key_streams, + heap: BinaryHeap::new(), + current_term: Term::from(&EMPTY[..]), + current_segment_ords: vec![], + }; + for segment_ord in 0..key_streams_len { + term_iterator.push_next_segment_el(segment_ord); + } + term_iterator + } + + fn push_next_segment_el(&mut self, segment_ord: usize) { + self.current_segment_ords.push(segment_ord); + if let Some(term) = self.key_streams[segment_ord].next() { + self.heap.push(HeapItem { + term: Term::from(term), + segment_ord: segment_ord, + }); + } + } +} + +impl<'a, 'f> Streamer<'a> for TermIterator<'f> { + type Item = (&'a Term, &'a [usize]); + + fn next(&'a mut self) -> Option { + self.current_segment_ords.clear(); + self.heap + .pop() + .map(move |mut head| { + mem::swap(&mut self.current_term, &mut head.term); + self.push_next_segment_el(head.segment_ord); + loop { + match self.heap.peek() { + Some(&ref next_heap_it) if next_heap_it.term == self.current_term => {} + _ => { + break; + } + } + let next_heap_it = self.heap + .pop() + .expect("This is only reached if an element was \ + peeked beforehand."); + self.push_next_segment_el(next_heap_it.segment_ord); + } + (&self.current_term, self.current_segment_ords.as_slice()) + }) + } +} + +impl<'a> From<&'a [SegmentReader]> for TermIterator<'a> { + fn from(segment_readers: &'a [SegmentReader]) -> TermIterator<'a> { + TermIterator::new(segment_readers.iter() + .map(|reader| reader.term_infos().keys()) + .collect()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use schema::{SchemaBuilder, Document, TEXT}; + use core::Index; + + #[test] + fn test_term_iterator() { + let mut schema_builder = SchemaBuilder::default(); + let text_field = schema_builder.add_text_field("text", TEXT); + let index = Index::create_in_ram(schema_builder.build()); + { + let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap(); + { + // writing the segment + { + let mut doc = Document::default(); + doc.add_text(text_field, "a b d f"); + index_writer.add_document(doc).unwrap(); + } + index_writer.commit().unwrap(); + } + { + // writing the segment + { + let mut doc = Document::default(); + doc.add_text(text_field, "a b c d f"); + index_writer.add_document(doc).unwrap(); + } + index_writer.commit().unwrap(); + } + { + // writing the segment + { + let mut doc = Document::default(); + doc.add_text(text_field, "e f"); + index_writer.add_document(doc).unwrap(); + } + index_writer.commit().unwrap(); + } + } + let searcher = index.searcher(); + let mut term_it = searcher.terms(); + { + + let (term, segments) = term_it.next().unwrap(); + assert_eq!(term.value(), "a".as_bytes()); + let expected_segments = [0, 1]; + assert_eq!(segments, &expected_segments); + + } + { + let (term, segments): (&Term, &[usize]) = term_it.next().unwrap(); + assert_eq!(term.value(), "b".as_bytes()); + let expected_segments = [0, 1]; + assert_eq!(segments, &expected_segments); + } + { + let (ref term, ref segments) = term_it.next().unwrap(); + assert_eq!(term.value(), "c".as_bytes()); + let expected_segments = [1]; + assert_eq!(segments, &expected_segments); + } + { + let (term, segments) = term_it.next().unwrap(); + assert_eq!(term.value(), "d".as_bytes()); + let expected_segments = [0, 1]; + assert_eq!(segments, &expected_segments); + } + { + let (term, segments) = term_it.next().unwrap(); + assert_eq!(term.value(), "e".as_bytes()); + let expected_segments = [2]; + assert_eq!(segments, &expected_segments); + } + { + let (term, segments) = term_it.next().unwrap(); + assert_eq!(term.value(), "f".as_bytes()); + let expected_segments = [0, 1, 2]; + assert_eq!(segments, &expected_segments); + } + } + +} \ No newline at end of file diff --git a/src/datastruct/fstmap.rs b/src/datastruct/fstmap.rs index 096ed8bed..1d4a420ed 100644 --- a/src/datastruct/fstmap.rs +++ b/src/datastruct/fstmap.rs @@ -4,7 +4,6 @@ use std::io; use std::io::Write; use fst; use fst::raw::Fst; -use fst::Streamer; use directory::ReadOnlySource; use common::BinarySerializable; @@ -66,27 +65,10 @@ fn open_fst_index(source: ReadOnlySource) -> io::Result { })) } -pub struct FstKeyIter<'a, V: 'static + BinarySerializable> { - streamer: fst::map::Stream<'a>, - __phantom__: PhantomData -} - -impl<'a, V: 'static + BinarySerializable> FstKeyIter<'a, V> { - pub fn next(&mut self) -> Option<(&[u8])> { - self.streamer - .next() - .map(|(k, _)| k) - } -} - - impl FstMap { - pub fn keys(&self,) -> FstKeyIter { - FstKeyIter { - streamer: self.fst_index.stream(), - __phantom__: PhantomData, - } + pub fn keys(&self,) -> fst::map::Keys { + self.fst_index.keys() } pub fn from_source(source: ReadOnlySource) -> io::Result> { @@ -123,6 +105,7 @@ mod tests { use super::*; use directory::{RAMDirectory, Directory}; use std::path::PathBuf; + use fst::Streamer; #[test] fn test_fstmap() { diff --git a/src/datastruct/mod.rs b/src/datastruct/mod.rs index 85489973b..f026afa7f 100644 --- a/src/datastruct/mod.rs +++ b/src/datastruct/mod.rs @@ -4,5 +4,4 @@ pub mod stacker; pub use self::fstmap::FstMapBuilder; pub use self::fstmap::FstMap; -pub use self::fstmap::FstKeyIter; pub use self::skip::{SkipListBuilder, SkipList}; diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index e2d0cb3e1..ed575ecd2 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -3,130 +3,22 @@ use core::SegmentReader; use core::Segment; use DocId; use core::SerializableSegment; - use indexer::SegmentSerializer; use postings::PostingsSerializer; -use postings::TermInfo; use postings::Postings; use postings::DocSet; -use std::collections::BinaryHeap; -use datastruct::FstKeyIter; -use schema::{Term, Schema, Field}; +use core::TermIterator; +use fst::Streamer; +use schema::{Schema, Field}; use fastfield::FastFieldSerializer; use store::StoreWriter; use postings::ChainedPostings; use postings::HasLen; use postings::OffsetPostings; use core::SegmentInfo; -use std::cmp::{min, max, Ordering}; +use std::cmp::{min, max}; use std::iter; - -struct PostingsMerger<'a> { - doc_offsets: Vec, - heap: BinaryHeap, - term_streams: Vec>, - readers: &'a [SegmentReader], -} - -#[derive(PartialEq, Eq, Debug)] -struct HeapItem { - term: Term, - segment_ord: usize, -} - -impl PartialOrd for HeapItem { - fn partial_cmp(&self, other: &HeapItem) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for HeapItem { - fn cmp(&self, other: &HeapItem) -> Ordering { - (&other.term, &other.segment_ord).cmp(&(&self.term, &self.segment_ord)) - } -} - -impl<'a> PostingsMerger<'a> { - fn new(readers: &'a [SegmentReader]) -> PostingsMerger<'a> { - let mut doc_offsets: Vec = Vec::new(); - let mut max_doc = 0; - for reader in readers { - doc_offsets.push(max_doc); - max_doc += reader.max_doc(); - }; - let term_streams = readers - .iter() - .map(|reader| reader.term_infos().keys()) - .collect(); - let mut postings_merger = PostingsMerger { - heap: BinaryHeap::new(), - term_streams: term_streams, - doc_offsets: doc_offsets, - readers: readers, - }; - for segment_ord in 0..readers.len() { - postings_merger.push_next_segment_el(segment_ord); - } - postings_merger - } - - // pushes the term_reader associated with the given segment ordinal - // into the heap. - fn push_next_segment_el(&mut self, segment_ord: usize) { - if let Some(term) = self.term_streams[segment_ord].next() { - let it = HeapItem { - term: Term::from(term), - segment_ord: segment_ord, - }; - self.heap.push(it); - } - } - - fn append_segment(&mut self, - heap_item: &HeapItem, - segment_postings_list: &mut Vec>) { - { - - let offset = self.doc_offsets[heap_item.segment_ord]; - let reader = &self.readers[heap_item.segment_ord]; - if let Some(segment_postings) = reader.read_postings_all_info(&heap_item.term) { - let offset_postings = OffsetPostings::new(segment_postings, offset); - segment_postings_list.push(offset_postings); - } - } - self.push_next_segment_el(heap_item.segment_ord); - } - -} - -impl<'a> Iterator for PostingsMerger<'a> { - - type Item = (Term, ChainedPostings<'a>); - - fn next(&mut self,) -> Option<(Term, ChainedPostings<'a>)> { - // TODO remove the Vec allocations - match self.heap.pop() { - Some(heap_it) => { - let mut segment_postings_list = Vec::new(); - self.append_segment(&heap_it, &mut segment_postings_list); - loop { - match self.heap.peek() { - Some(&ref next_heap_it) if next_heap_it.term == heap_it.term => {}, - _ => { break; } - } - let next_heap_it = self.heap.pop().expect("This is only reached if an element was peeked beforehand."); - self.append_segment(&next_heap_it, &mut segment_postings_list); - } - let chained_posting = ChainedPostings::from(segment_postings_list); - Some((heap_it.term, chained_posting)) - }, - None => None - } - } -} - - pub struct IndexMerger { schema: Schema, readers: Vec, @@ -135,17 +27,15 @@ pub struct IndexMerger { struct DeltaPositionComputer { - buffer: Vec + buffer: Vec, } impl DeltaPositionComputer { fn new() -> DeltaPositionComputer { - DeltaPositionComputer { - buffer: iter::repeat(0u32).take(512).collect::>(), - } + DeltaPositionComputer { buffer: iter::repeat(0u32).take(512).collect::>() } } - - fn compute_delta_positions(&mut self, positions: &[u32],) -> &[u32] { + + fn compute_delta_positions(&mut self, positions: &[u32]) -> &[u32] { if positions.len() > self.buffer.len() { self.buffer.resize(positions.len(), 0u32); } @@ -158,8 +48,6 @@ impl DeltaPositionComputer { } } - - impl IndexMerger { pub fn open(schema: Schema, segments: &[Segment]) -> Result { let mut readers = Vec::new(); @@ -172,20 +60,19 @@ impl IndexMerger { Ok(IndexMerger { schema: schema, readers: readers, - segment_info: SegmentInfo { - max_doc: max_doc - }, + segment_info: SegmentInfo { max_doc: max_doc }, }) } - + fn write_fieldnorms(&self, fast_field_serializer: &mut FastFieldSerializer) -> Result<()> { // TODO make sure that works even if the field is never here. - for field in self.schema.fields() - .iter() - .enumerate() - .filter(|&(_, field_entry)| field_entry.is_indexed()) - .map(|(field_id, _)| Field(field_id as u8)) { + for field in self.schema + .fields() + .iter() + .enumerate() + .filter(|&(_, field_entry)| field_entry.is_indexed()) + .map(|(field_id, _)| Field(field_id as u8)) { let mut u32_readers = Vec::new(); let mut min_val = u32::min_value(); let mut max_val = 0; @@ -208,11 +95,12 @@ impl IndexMerger { } fn write_fast_fields(&self, fast_field_serializer: &mut FastFieldSerializer) -> Result<()> { - for field in self.schema.fields() - .iter() - .enumerate() - .filter(|&(_, field_entry)| field_entry.is_u32_fast()) - .map(|(field_id, _)| Field(field_id as u8)) { + for field in self.schema + .fields() + .iter() + .enumerate() + .filter(|&(_, field_entry)| field_entry.is_u32_fast()) + .map(|(field_id, _)| Field(field_id as u8)) { let mut u32_readers = Vec::new(); let mut min_val = u32::min_value(); let mut max_val = 0; @@ -235,16 +123,51 @@ impl IndexMerger { } fn write_postings(&self, postings_serializer: &mut PostingsSerializer) -> Result<()> { - let postings_merger = PostingsMerger::new(&self.readers); + let mut merged_terms = TermIterator::from(&self.readers[..]); let mut delta_position_computer = DeltaPositionComputer::new(); - for (term, mut merged_doc_ids) in postings_merger { - try!(postings_serializer.new_term(&term, merged_doc_ids.len() as DocId)); - while merged_doc_ids.advance() { - let delta_positions: &[u32] = delta_position_computer.compute_delta_positions(merged_doc_ids.positions()); - try!(postings_serializer.write_doc(merged_doc_ids.doc(), merged_doc_ids.term_freq(), delta_positions)); + let mut offsets: Vec = Vec::new(); + let mut max_doc = 0; + for reader in &self.readers { + offsets.push(max_doc); + max_doc += reader.max_doc(); + } + + while let Some((term, segment_ords)) = merged_terms.next() { + // Create the total list of doc ids + // by stacking the doc ids from the different segment. + // + // In the new segments, the doc id from the different + // segment are stacked so that : + // - Segment 0's doc ids become doc id [0, seg.max_doc] + // - Segment 1's doc ids become [seg0.max_doc, seg0.max_doc + seg.max_doc] + // - Segment 2's doc ids become [seg0.max_doc + seg1.max_doc, seg0.max_doc + seg1.max_doc + seg2.max_doc] + // ... + let mut merged_postings = + ChainedPostings::from(segment_ords.iter() + .flat_map(|segment_ord| { + let offset = offsets[*segment_ord]; + self.readers[*segment_ord] + .read_postings_all_info(&term) + .map(|segment_postings| { + OffsetPostings::new(segment_postings, + offset) + }) + }) + .collect::>()); + + // We can now serialize this postings, by pushing each document to the + // postings serializer. + try!(postings_serializer.new_term(&term, merged_postings.len() as DocId)); + while merged_postings.advance() { + let delta_positions: &[u32] = + delta_position_computer.compute_delta_positions(merged_postings.positions()); + try!(postings_serializer.write_doc(merged_postings.doc(), + merged_postings.term_freq(), + delta_positions)); } try!(postings_serializer.close_term()); } + Ok(()) } @@ -284,7 +207,9 @@ mod tests { #[test] fn test_index_merger() { let mut schema_builder = schema::SchemaBuilder::default(); - let text_fieldtype = schema::TextOptions::default().set_indexing_options(TextIndexingOptions::TokenizedWithFreq).set_stored(); + let text_fieldtype = schema::TextOptions::default() + .set_indexing_options(TextIndexingOptions::TokenizedWithFreq) + .set_stored(); let text_field = schema_builder.add_text_field("text", text_fieldtype); let score_fieldtype = schema::U32Options::default().set_fast(); let score_field = schema_builder.add_u32_field("score", score_fieldtype); @@ -346,22 +271,14 @@ mod tests { collector.docs() }; { - assert_eq!( - get_doc_ids(vec!(Term::from_field_text(text_field, "a"))), - vec!(1, 2, 4,) - ); - assert_eq!( - get_doc_ids(vec!(Term::from_field_text(text_field, "af"))), - vec!(0, 3,) - ); - assert_eq!( - get_doc_ids(vec!(Term::from_field_text(text_field, "g"))), - vec!(4,) - ); - assert_eq!( - get_doc_ids(vec!(Term::from_field_text(text_field, "b"))), - vec!(0, 1, 2, 3, 4,) - ); + assert_eq!(get_doc_ids(vec![Term::from_field_text(text_field, "a")]), + vec!(1, 2, 4,)); + assert_eq!(get_doc_ids(vec![Term::from_field_text(text_field, "af")]), + vec!(0, 3,)); + assert_eq!(get_doc_ids(vec![Term::from_field_text(text_field, "g")]), + vec!(4,)); + assert_eq!(get_doc_ids(vec![Term::from_field_text(text_field, "b")]), + vec!(0, 1, 2, 3, 4,)); } { let doc = searcher.doc(&DocAddress(0, 0)).unwrap(); @@ -390,10 +307,8 @@ mod tests { assert!(searcher.search(&query, &mut collector).is_ok()); collector.vals().clone() }; - assert_eq!( - get_fast_vals(vec!(Term::from_field_text(text_field, "a"))), - vec!(5, 7, 13,) - ); + assert_eq!(get_fast_vals(vec![Term::from_field_text(text_field, "a")]), + vec!(5, 7, 13,)); } } } diff --git a/src/lib.rs b/src/lib.rs index 8f6c78902..e64a5189d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -110,8 +110,7 @@ pub use self::common::TimerTree; pub use postings::DocSet; pub use postings::Postings; pub use postings::SegmentPostingsOption; - - +pub use core::TermIterator; /// Tantivy's makes it possible to personalize when /// the indexer should merge its segments diff --git a/src/schema/field.rs b/src/schema/field.rs index b9bca998d..d3af27441 100644 --- a/src/schema/field.rs +++ b/src/schema/field.rs @@ -8,7 +8,8 @@ use common::BinarySerializable; /// The schema is in charge of holding mapping between field names /// to `Field` objects. /// -/// Because the field id is a `u8`, tantivy can only have at most `256` fields +/// Because the field id is a `u8`, tantivy can only have at most `255` fields. +/// Value 255 is reserved. #[derive(Copy,Clone,Debug,PartialEq,PartialOrd,Eq,Ord,Hash, RustcEncodable, RustcDecodable)] pub struct Field(pub u8); diff --git a/src/schema/term.rs b/src/schema/term.rs index 777e3044c..4cb2adab4 100644 --- a/src/schema/term.rs +++ b/src/schema/term.rs @@ -8,9 +8,11 @@ use super::Field; /// Term represents the value that the token can take. /// /// It actually wraps a `Vec`. +/// TODO remove pub #[derive(Clone, PartialEq, PartialOrd, Ord, Eq, Hash)] pub struct Term(Vec); + impl Term { /// Pre-allocate a term buffer. @@ -63,6 +65,14 @@ impl Term { Term(buffer) } + /// Returns the serialized value associated to the field. + /// If the term is a string, its value is utf-8 encoded. + /// If the term is a u32, its value is encoded according + /// to `byteorder::LittleEndian`. + pub fn value(&self) -> &[u8] { + &self.0[1..] + } + /// Set the texts only, keeping the field untouched. pub fn set_text(&mut self, text: &str) { self.0.resize(1, 0u8); From 4d7d201f21e993eaf8e266ef56f8d6fb7be85f62 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Sat, 17 Dec 2016 09:38:57 +0100 Subject: [PATCH 2/4] Issue #67 - Removed segment ord array from term iteration. This was probably an early optimization. --- src/core/searcher.rs | 5 ++- src/core/term_iterator.rs | 69 ++++++++++----------------------------- src/indexer/merger.rs | 25 +++++++------- src/schema/term.rs | 16 +++++++-- 4 files changed, 49 insertions(+), 66 deletions(-) diff --git a/src/core/searcher.rs b/src/core/searcher.rs index 548ffc2cb..0d99a2897 100644 --- a/src/core/searcher.rs +++ b/src/core/searcher.rs @@ -54,10 +54,13 @@ impl Searcher { /// /// This includes all of the fields from all of the segment_readers. /// See [TermIterator](struct.TermIterator.html). + /// + /// # Warning + /// This API is very likely to change in the future. pub fn terms<'a>(&'a self) -> TermIterator<'a> { TermIterator::from(self.segment_readers()) } - + /// Return the list of segment readers pub fn segment_readers(&self,) -> &[SegmentReader] { &self.segment_readers diff --git a/src/core/term_iterator.rs b/src/core/term_iterator.rs index 108bc03b6..3a9adf2d5 100644 --- a/src/core/term_iterator.rs +++ b/src/core/term_iterator.rs @@ -2,12 +2,12 @@ use fst::Streamer; use std::mem; use std::collections::BinaryHeap; use fst::map::Keys; +use schema::Field; use schema::Term; use core::SegmentReader; use std::cmp::Ordering; -static EMPTY: [u8; 0] = []; #[derive(PartialEq, Eq, Debug)] struct HeapItem { @@ -49,7 +49,7 @@ impl<'a> TermIterator<'a> { let mut term_iterator = TermIterator { key_streams: key_streams, heap: BinaryHeap::new(), - current_term: Term::from(&EMPTY[..]), + current_term: Term::from_field_text(Field(0), ""), current_segment_ords: vec![], }; for segment_ord in 0..key_streams_len { @@ -70,7 +70,7 @@ impl<'a> TermIterator<'a> { } impl<'a, 'f> Streamer<'a> for TermIterator<'f> { - type Item = (&'a Term, &'a [usize]); + type Item = &'a Term; fn next(&'a mut self) -> Option { self.current_segment_ords.clear(); @@ -82,26 +82,24 @@ impl<'a, 'f> Streamer<'a> for TermIterator<'f> { loop { match self.heap.peek() { Some(&ref next_heap_it) if next_heap_it.term == self.current_term => {} - _ => { - break; - } + _ => { break; } } - let next_heap_it = self.heap - .pop() - .expect("This is only reached if an element was \ - peeked beforehand."); + let next_heap_it = self.heap.pop().unwrap(); // safe : we peeked beforehand self.push_next_segment_el(next_heap_it.segment_ord); } - (&self.current_term, self.current_segment_ords.as_slice()) + &self.current_term }) } } impl<'a> From<&'a [SegmentReader]> for TermIterator<'a> { fn from(segment_readers: &'a [SegmentReader]) -> TermIterator<'a> { - TermIterator::new(segment_readers.iter() - .map(|reader| reader.term_infos().keys()) - .collect()) + TermIterator::new( + segment_readers + .iter() + .map(|reader| reader.term_infos().keys()) + .collect() + ) } } @@ -148,44 +146,13 @@ mod tests { } let searcher = index.searcher(); let mut term_it = searcher.terms(); - { - - let (term, segments) = term_it.next().unwrap(); - assert_eq!(term.value(), "a".as_bytes()); - let expected_segments = [0, 1]; - assert_eq!(segments, &expected_segments); - - } - { - let (term, segments): (&Term, &[usize]) = term_it.next().unwrap(); - assert_eq!(term.value(), "b".as_bytes()); - let expected_segments = [0, 1]; - assert_eq!(segments, &expected_segments); - } - { - let (ref term, ref segments) = term_it.next().unwrap(); - assert_eq!(term.value(), "c".as_bytes()); - let expected_segments = [1]; - assert_eq!(segments, &expected_segments); - } - { - let (term, segments) = term_it.next().unwrap(); - assert_eq!(term.value(), "d".as_bytes()); - let expected_segments = [0, 1]; - assert_eq!(segments, &expected_segments); - } - { - let (term, segments) = term_it.next().unwrap(); - assert_eq!(term.value(), "e".as_bytes()); - let expected_segments = [2]; - assert_eq!(segments, &expected_segments); - } - { - let (term, segments) = term_it.next().unwrap(); - assert_eq!(term.value(), "f".as_bytes()); - let expected_segments = [0, 1, 2]; - assert_eq!(segments, &expected_segments); + let mut terms = String::new(); + while let Some(term) = term_it.next() { + unsafe { + terms.push_str(term.text()); + } } + assert_eq!(terms, "abcdef"); } } \ No newline at end of file diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index ed575ecd2..4bc7e049e 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -132,7 +132,7 @@ impl IndexMerger { max_doc += reader.max_doc(); } - while let Some((term, segment_ords)) = merged_terms.next() { + while let Some(term) = merged_terms.next() { // Create the total list of doc ids // by stacking the doc ids from the different segment. // @@ -143,17 +143,18 @@ impl IndexMerger { // - Segment 2's doc ids become [seg0.max_doc + seg1.max_doc, seg0.max_doc + seg1.max_doc + seg2.max_doc] // ... let mut merged_postings = - ChainedPostings::from(segment_ords.iter() - .flat_map(|segment_ord| { - let offset = offsets[*segment_ord]; - self.readers[*segment_ord] - .read_postings_all_info(&term) - .map(|segment_postings| { - OffsetPostings::new(segment_postings, - offset) - }) - }) - .collect::>()); + ChainedPostings::from( + self.readers + .iter() + .enumerate() + .flat_map(|(segment_ord, reader)| { + let offset = offsets[segment_ord]; + reader + .read_postings_all_info(&term) + .map(|segment_postings| OffsetPostings::new(segment_postings, offset)) + }) + .collect::>() + ); // We can now serialize this postings, by pushing each document to the // postings serializer. diff --git a/src/schema/term.rs b/src/schema/term.rs index 4cb2adab4..c0c5009fe 100644 --- a/src/schema/term.rs +++ b/src/schema/term.rs @@ -2,7 +2,7 @@ use std::fmt; use common::BinarySerializable; use super::Field; - +use std::str; /// Term represents the value that the token can take. @@ -65,7 +65,9 @@ impl Term { Term(buffer) } - /// Returns the serialized value associated to the field. + /// Returns the serialized value of the term. + /// (this does not include the field.) + /// /// If the term is a string, its value is utf-8 encoded. /// If the term is a u32, its value is encoded according /// to `byteorder::LittleEndian`. @@ -73,6 +75,16 @@ impl Term { &self.0[1..] } + /// Returns the text associated with the term. + /// + /// # Panics + /// If the value is not valid utf-8. This may happen + /// if the index is corrupted or if you try to + /// call this method on a non-string type. + pub unsafe fn text(&self) -> &str { + str::from_utf8_unchecked(self.value()) + } + /// Set the texts only, keeping the field untouched. pub fn set_text(&mut self, text: &str) { self.0.resize(1, 0u8); From 98cdc834289812744e1a87aba09b1191e3e723ef Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Sun, 18 Dec 2016 11:57:28 +0100 Subject: [PATCH 3/4] Issue #67 Removing afterwards. --- src/core/term_iterator.rs | 33 +++++++++++++-------------------- 1 file changed, 13 insertions(+), 20 deletions(-) diff --git a/src/core/term_iterator.rs b/src/core/term_iterator.rs index 3a9adf2d5..fb2d70517 100644 --- a/src/core/term_iterator.rs +++ b/src/core/term_iterator.rs @@ -8,7 +8,6 @@ use core::SegmentReader; use std::cmp::Ordering; - #[derive(PartialEq, Eq, Debug)] struct HeapItem { term: Term, @@ -46,25 +45,22 @@ pub struct TermIterator<'a> { impl<'a> TermIterator<'a> { fn new(key_streams: Vec>) -> TermIterator<'a> { let key_streams_len = key_streams.len(); - let mut term_iterator = TermIterator { + TermIterator { key_streams: key_streams, heap: BinaryHeap::new(), current_term: Term::from_field_text(Field(0), ""), - current_segment_ords: vec![], - }; - for segment_ord in 0..key_streams_len { - term_iterator.push_next_segment_el(segment_ord); + current_segment_ords: (0..key_streams_len).collect(), } - term_iterator } - fn push_next_segment_el(&mut self, segment_ord: usize) { - self.current_segment_ords.push(segment_ord); - if let Some(term) = self.key_streams[segment_ord].next() { - self.heap.push(HeapItem { - term: Term::from(term), - segment_ord: segment_ord, - }); + fn advance_segments(&mut self) { + for segment_ord in self.current_segment_ords.drain(..) { + if let Some(term) = self.key_streams[segment_ord].next() { + self.heap.push(HeapItem { + term: Term::from(term), + segment_ord: segment_ord, + }); + } } } } @@ -73,19 +69,19 @@ impl<'a, 'f> Streamer<'a> for TermIterator<'f> { type Item = &'a Term; fn next(&'a mut self) -> Option { - self.current_segment_ords.clear(); + self.advance_segments(); self.heap .pop() .map(move |mut head| { mem::swap(&mut self.current_term, &mut head.term); - self.push_next_segment_el(head.segment_ord); + self.current_segment_ords.push(head.segment_ord); loop { match self.heap.peek() { Some(&ref next_heap_it) if next_heap_it.term == self.current_term => {} _ => { break; } } let next_heap_it = self.heap.pop().unwrap(); // safe : we peeked beforehand - self.push_next_segment_el(next_heap_it.segment_ord); + self.current_segment_ords.push(next_heap_it.segment_ord); } &self.current_term }) @@ -117,7 +113,6 @@ mod tests { { let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap(); { - // writing the segment { let mut doc = Document::default(); doc.add_text(text_field, "a b d f"); @@ -126,7 +121,6 @@ mod tests { index_writer.commit().unwrap(); } { - // writing the segment { let mut doc = Document::default(); doc.add_text(text_field, "a b c d f"); @@ -135,7 +129,6 @@ mod tests { index_writer.commit().unwrap(); } { - // writing the segment { let mut doc = Document::default(); doc.add_text(text_field, "e f"); From d3d34be1676c932b7ef3ef9caaf377cae1420144 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Tue, 20 Dec 2016 11:22:34 +0100 Subject: [PATCH 4/4] issue/67 Added a advance interface to the term iterator --- src/core/term_iterator.rs | 65 +++++++++++++++++++++++++++++---------- src/indexer/merger.rs | 13 ++++---- src/schema/term.rs | 1 - 3 files changed, 56 insertions(+), 23 deletions(-) diff --git a/src/core/term_iterator.rs b/src/core/term_iterator.rs index fb2d70517..3a5e259f7 100644 --- a/src/core/term_iterator.rs +++ b/src/core/term_iterator.rs @@ -53,6 +53,49 @@ impl<'a> TermIterator<'a> { } } + /// Advance the term iterator to the next term. + /// Returns true if there is indeed another term + /// False if there is none. + pub fn advance(&mut self) -> bool { + self.advance_segments(); + if let Some(mut head) = self.heap.pop() { + mem::swap(&mut self.current_term, &mut head.term); + self.current_segment_ords.push(head.segment_ord); + loop { + match self.heap.peek() { + Some(&ref next_heap_it) if next_heap_it.term == self.current_term => {} + _ => { break; } + } + let next_heap_it = self.heap.pop().unwrap(); // safe : we peeked beforehand + self.current_segment_ords.push(next_heap_it.segment_ord); + } + true + } + else { + false + } + } + + + /// Returns the current term. + /// + /// This method may be called + /// iff advance() has been called before + /// and "true" was returned. + pub fn term(&self) -> &Term { + &self.current_term + } + + /// Returns the sorted list of segment ordinals + /// that include the current term. + /// + /// This method may be called + /// iff advance() has been called before + /// and "true" was returned. + pub fn segment_ords(&self) -> &[usize]{ + &self.current_segment_ords[..] + } + fn advance_segments(&mut self) { for segment_ord in self.current_segment_ords.drain(..) { if let Some(term) = self.key_streams[segment_ord].next() { @@ -69,22 +112,12 @@ impl<'a, 'f> Streamer<'a> for TermIterator<'f> { type Item = &'a Term; fn next(&'a mut self) -> Option { - self.advance_segments(); - self.heap - .pop() - .map(move |mut head| { - mem::swap(&mut self.current_term, &mut head.term); - self.current_segment_ords.push(head.segment_ord); - loop { - match self.heap.peek() { - Some(&ref next_heap_it) if next_heap_it.term == self.current_term => {} - _ => { break; } - } - let next_heap_it = self.heap.pop().unwrap(); // safe : we peeked beforehand - self.current_segment_ords.push(next_heap_it.segment_ord); - } - &self.current_term - }) + if self.advance() { + Some(&self.current_term) + } + else { + None + } } } diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 4bc7e049e..4e616d5f0 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -8,7 +8,6 @@ use postings::PostingsSerializer; use postings::Postings; use postings::DocSet; use core::TermIterator; -use fst::Streamer; use schema::{Schema, Field}; use fastfield::FastFieldSerializer; use store::StoreWriter; @@ -132,7 +131,7 @@ impl IndexMerger { max_doc += reader.max_doc(); } - while let Some(term) = merged_terms.next() { + while merged_terms.advance() { // Create the total list of doc ids // by stacking the doc ids from the different segment. // @@ -142,14 +141,16 @@ impl IndexMerger { // - Segment 1's doc ids become [seg0.max_doc, seg0.max_doc + seg.max_doc] // - Segment 2's doc ids become [seg0.max_doc + seg1.max_doc, seg0.max_doc + seg1.max_doc + seg2.max_doc] // ... + let term = merged_terms.term(); let mut merged_postings = ChainedPostings::from( - self.readers + merged_terms + .segment_ords() .iter() - .enumerate() - .flat_map(|(segment_ord, reader)| { + .cloned() + .flat_map(|segment_ord| { let offset = offsets[segment_ord]; - reader + self.readers[segment_ord] .read_postings_all_info(&term) .map(|segment_postings| OffsetPostings::new(segment_postings, offset)) }) diff --git a/src/schema/term.rs b/src/schema/term.rs index c0c5009fe..305aac6b6 100644 --- a/src/schema/term.rs +++ b/src/schema/term.rs @@ -8,7 +8,6 @@ use std::str; /// Term represents the value that the token can take. /// /// It actually wraps a `Vec`. -/// TODO remove pub #[derive(Clone, PartialEq, PartialOrd, Ord, Eq, Hash)] pub struct Term(Vec);