diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 75f329186..6318a17c9 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -5,7 +5,7 @@ use DocId; use core::SerializableSegment; use schema::FieldValue; use indexer::SegmentSerializer; -use postings::PostingsSerializer; +use postings::InvertedIndexSerializer; use fastfield::U64FastFieldReader; use itertools::Itertools; use postings::Postings; @@ -192,7 +192,7 @@ impl IndexMerger { Ok(()) } - fn write_postings(&self, serializer: &mut PostingsSerializer) -> Result<()> { + fn write_postings(&self, serializer: &mut InvertedIndexSerializer) -> Result<()> { let mut delta_computer = DeltaComputer::new(); let mut merged_terms = TermMerger::from(&self.readers[..]); diff --git a/src/indexer/segment_serializer.rs b/src/indexer/segment_serializer.rs index b75663927..35d10ef8d 100644 --- a/src/indexer/segment_serializer.rs +++ b/src/indexer/segment_serializer.rs @@ -4,7 +4,7 @@ use core::Segment; use core::SegmentComponent; use fastfield::FastFieldSerializer; use store::StoreWriter; -use postings::PostingsSerializer; +use postings::InvertedIndexSerializer; /// Segment serializer is in charge of laying out on disk @@ -13,7 +13,7 @@ pub struct SegmentSerializer { store_writer: StoreWriter, fast_field_serializer: FastFieldSerializer, fieldnorms_serializer: FastFieldSerializer, - postings_serializer: PostingsSerializer, + postings_serializer: InvertedIndexSerializer, } impl SegmentSerializer { @@ -27,7 +27,7 @@ impl SegmentSerializer { let fieldnorms_write = try!(segment.open_write(SegmentComponent::FIELDNORMS)); let fieldnorms_serializer = try!(FastFieldSerializer::new(fieldnorms_write)); - let postings_serializer = try!(PostingsSerializer::open(segment)); + let postings_serializer = try!(InvertedIndexSerializer::open(segment)); Ok(SegmentSerializer { postings_serializer: postings_serializer, store_writer: StoreWriter::new(store_write), @@ -37,7 +37,7 @@ impl SegmentSerializer { } /// Accessor to the `PostingsSerializer`. - pub fn get_postings_serializer(&mut self) -> &mut PostingsSerializer { + pub fn get_postings_serializer(&mut self) -> &mut InvertedIndexSerializer { &mut self.postings_serializer } diff --git a/src/postings/mod.rs b/src/postings/mod.rs index 21cfa6777..06e893646 100644 --- a/src/postings/mod.rs +++ b/src/postings/mod.rs @@ -17,7 +17,7 @@ mod segment_postings_option; pub use self::docset::{SkipResult, DocSet}; use self::recorder::{Recorder, NothingRecorder, TermFrequencyRecorder, TFAndPositionRecorder}; -pub use self::serializer::PostingsSerializer; +pub use self::serializer::InvertedIndexSerializer; pub(crate) use self::postings_writer::MultiFieldPostingsWriter; pub use self::term_info::TermInfo; pub use self::postings::Postings; @@ -58,7 +58,7 @@ mod tests { let schema = schema_builder.build(); let index = Index::create_in_ram(schema); let mut segment = index.new_segment(); - let mut posting_serializer = PostingsSerializer::open(&mut segment).unwrap(); + let mut posting_serializer = InvertedIndexSerializer::open(&mut segment).unwrap(); posting_serializer.new_field(text_field); posting_serializer.new_term("abc".as_bytes()).unwrap(); for doc_id in 0u32..120u32 { diff --git a/src/postings/postings_writer.rs b/src/postings/postings_writer.rs index 67a8f9c5e..0a995a889 100644 --- a/src/postings/postings_writer.rs +++ b/src/postings/postings_writer.rs @@ -1,7 +1,7 @@ use DocId; use schema::Term; use schema::FieldValue; -use postings::PostingsSerializer; +use postings::InvertedIndexSerializer; use std::io; use postings::Recorder; use analyzer::SimpleTokenizer; @@ -78,7 +78,7 @@ impl<'a> MultiFieldPostingsWriter<'a> { /// It pushes all term, one field at a time, towards the /// postings serializer. #[allow(needless_range_loop)] - pub fn serialize(&self, serializer: &mut PostingsSerializer) -> Result<()> { + pub fn serialize(&self, serializer: &mut InvertedIndexSerializer) -> Result<()> { let mut term_offsets: Vec<(&[u8], u32)> = self.term_index.iter().collect(); term_offsets.sort_by_key(|&(k, _v)| k); @@ -138,7 +138,7 @@ pub trait PostingsWriter { fn serialize(&self, field: Field, term_addrs: &[(&[u8], u32)], - serializer: &mut PostingsSerializer, + serializer: &mut InvertedIndexSerializer, heap: &Heap) -> io::Result<()>; @@ -216,7 +216,7 @@ impl<'a, Rec: Recorder + 'static> PostingsWriter for SpecializedPostingsWriter<' fn serialize(&self, field: Field, term_addrs: &[(&[u8], u32)], - serializer: &mut PostingsSerializer, + serializer: &mut InvertedIndexSerializer, heap: &Heap) -> io::Result<()> { serializer.new_field(field); diff --git a/src/postings/recorder.rs b/src/postings/recorder.rs index c340d13fd..d7f91d35c 100644 --- a/src/postings/recorder.rs +++ b/src/postings/recorder.rs @@ -1,6 +1,6 @@ use DocId; use std::io; -use postings::PostingsSerializer; +use postings::InvertedIndexSerializer; use datastruct::stacker::{ExpUnrolledLinkedList, Heap, HeapAllocable}; const EMPTY_ARRAY: [u32; 0] = [0u32; 0]; @@ -29,7 +29,7 @@ pub trait Recorder: HeapAllocable { /// Pushes the postings information to the serializer. fn serialize(&self, self_addr: u32, - serializer: &mut PostingsSerializer, + serializer: &mut InvertedIndexSerializer, heap: &Heap) -> io::Result<()>; } @@ -66,7 +66,7 @@ impl Recorder for NothingRecorder { fn serialize(&self, self_addr: u32, - serializer: &mut PostingsSerializer, + serializer: &mut InvertedIndexSerializer, heap: &Heap) -> io::Result<()> { for doc in self.stack.iter(self_addr, heap) { @@ -118,7 +118,7 @@ impl Recorder for TermFrequencyRecorder { fn serialize(&self, self_addr: u32, - serializer: &mut PostingsSerializer, + serializer: &mut InvertedIndexSerializer, heap: &Heap) -> io::Result<()> { // the last document has not been closed... @@ -173,7 +173,7 @@ impl Recorder for TFAndPositionRecorder { fn serialize(&self, self_addr: u32, - serializer: &mut PostingsSerializer, + serializer: &mut InvertedIndexSerializer, heap: &Heap) -> io::Result<()> { let mut doc_positions = Vec::with_capacity(100); diff --git a/src/postings/serializer.rs b/src/postings/serializer.rs index 5c5e93a7d..c4b9c1146 100644 --- a/src/postings/serializer.rs +++ b/src/postings/serializer.rs @@ -47,29 +47,125 @@ use termdict::TermDictionaryBuilder; /// /// A description of the serialization format is /// [available here](https://fulmicoton.gitbooks.io/tantivy-doc/content/inverted-index.html). -pub struct PostingsSerializer { +pub struct InvertedIndexSerializer { terms_fst_builder: TermDictionaryBuilderImpl, + postings_serializer: PostingsSerializer, + positions_serializer: PositionSerializer, + schema: Schema, + + term_open: bool, + text_indexing_options: TextIndexingOptions, + + current_term_info: TermInfo, + +} + +struct PostingsSerializer { postings_write: CountingWriter, last_doc_id_encoded: u32, - positions_writer: PositionWriter, + block_encoder: BlockEncoder, doc_ids: Vec, term_freqs: Vec, - schema: Schema, - text_indexing_options: TextIndexingOptions, - term_open: bool, - current_term_info: TermInfo, + + termfreq_enabled: bool, } -struct PositionWriter { +impl PostingsSerializer { + fn new(write: WritePtr) -> PostingsSerializer { + PostingsSerializer { + postings_write: CountingWriter::wrap(write), + + block_encoder: BlockEncoder::new(), + doc_ids: vec!(), + term_freqs: vec!(), + + last_doc_id_encoded: 0u32, + termfreq_enabled: false, + } + } + + fn write_doc(&mut self, doc_id: DocId, term_freq: u32) -> io::Result<()> { + self.doc_ids.push(doc_id); + if self.termfreq_enabled { + self.term_freqs.push(term_freq as u32); + } + if self.doc_ids.len() == NUM_DOCS_PER_BLOCK { + { + // encode the doc ids + let block_encoded: &[u8] = + self.block_encoder + .compress_block_sorted(&self.doc_ids, self.last_doc_id_encoded); + self.last_doc_id_encoded = self.doc_ids[self.doc_ids.len() - 1]; + self.postings_write.write_all(block_encoded)?; + } + if self.termfreq_enabled { + // encode the term_freqs + let block_encoded: &[u8] = self.block_encoder + .compress_block_unsorted(&self.term_freqs); + self.postings_write.write_all(block_encoded)?; + self.term_freqs.clear(); + } + self.doc_ids.clear(); + } + Ok(()) + } + + fn set_termfreq_enabled(&mut self, termfreq_enabled: bool) { + self.termfreq_enabled = termfreq_enabled; + } + + fn close_term(&mut self) -> io::Result<()> { + if !self.doc_ids.is_empty() { + // we have doc ids waiting to be written + // this happens when the number of doc ids is + // not a perfect multiple of our block size. + // + // In that case, the remaining part is encoded + // using variable int encoding. + { + let block_encoded = + self.block_encoder + .compress_vint_sorted(&self.doc_ids, self.last_doc_id_encoded); + self.postings_write.write_all(block_encoded)?; + self.doc_ids.clear(); + } + // ... Idem for term frequencies + if self.termfreq_enabled { + let block_encoded = self.block_encoder + .compress_vint_unsorted(&self.term_freqs[..]); + self.postings_write.write_all(block_encoded)?; + self.term_freqs.clear(); + } + } + Ok(()) + } + + fn close(mut self) -> io::Result<()> { + self.postings_write.flush() + } + + + fn addr(&self) -> u32 { + self.postings_write.written_bytes() as u32 + } + + fn clear(&mut self) { + self.doc_ids.clear(); + self.term_freqs.clear(); + self.last_doc_id_encoded = 0; + } +} + +struct PositionSerializer { buffer: Vec, write: CountingWriter, block_encoder: BlockEncoder, } -impl PositionWriter { - fn new(write: WritePtr) -> PositionWriter { - PositionWriter { +impl PositionSerializer { + fn new(write: WritePtr) -> PositionSerializer { + PositionSerializer { buffer: Vec::with_capacity(NUM_DOCS_PER_BLOCK), write: CountingWriter::wrap(write), block_encoder: BlockEncoder::new(), @@ -108,37 +204,33 @@ impl PositionWriter { } } -impl PostingsSerializer { +impl InvertedIndexSerializer { /// Open a new `PostingsSerializer` for the given segment pub fn new(terms_write: WritePtr, postings_write: WritePtr, positions_write: WritePtr, schema: Schema) - -> Result { + -> Result { let terms_fst_builder = TermDictionaryBuilderImpl::new(terms_write)?; - Ok(PostingsSerializer { - terms_fst_builder: terms_fst_builder, - postings_write: CountingWriter::wrap(postings_write), - positions_writer: PositionWriter::new(positions_write), - last_doc_id_encoded: 0u32, - block_encoder: BlockEncoder::new(), - doc_ids: Vec::new(), - term_freqs: Vec::new(), - schema: schema, - text_indexing_options: TextIndexingOptions::Unindexed, - term_open: false, - current_term_info: TermInfo::default(), - }) + Ok(InvertedIndexSerializer { + terms_fst_builder: terms_fst_builder, + positions_serializer: PositionSerializer::new(positions_write), + postings_serializer: PostingsSerializer::new(postings_write), + schema: schema, + term_open: false, + current_term_info: TermInfo::default(), + text_indexing_options: TextIndexingOptions::Untokenized, + }) } /// Open a new `PostingsSerializer` for the given segment - pub fn open(segment: &mut Segment) -> Result { + pub fn open(segment: &mut Segment) -> Result { use SegmentComponent::{TERMS, POSTINGS, POSITIONS}; - PostingsSerializer::new(segment.open_write(TERMS)?, - segment.open_write(POSTINGS)?, - segment.open_write(POSITIONS)?, - segment.schema()) + InvertedIndexSerializer::new(segment.open_write(TERMS)?, + segment.open_write(POSTINGS)?, + segment.open_write(POSITIONS)?, + segment.schema()) } /// Must be called before starting pushing terms of @@ -158,6 +250,17 @@ impl PostingsSerializer { } } }; + self.postings_serializer.set_termfreq_enabled(self.text_indexing_options.is_termfreq_enabled()); + } + + fn current_term_info(&self) -> TermInfo { + let (filepos, offset) = self.positions_serializer.addr(); + TermInfo { + doc_freq: 0, + postings_offset: self.postings_serializer.addr(), + positions_offset: filepos, + positions_inner_offset: offset, + } } /// Starts the postings for a new term. @@ -169,16 +272,8 @@ impl PostingsSerializer { panic!("Called new_term, while the previous term was not closed."); } self.term_open = true; - self.doc_ids.clear(); - self.last_doc_id_encoded = 0; - self.term_freqs.clear(); - let (filepos, offset) = self.positions_writer.addr(); - self.current_term_info = TermInfo { - doc_freq: 0, - postings_offset: self.postings_write.written_bytes() as u32, - positions_offset: filepos, - positions_inner_offset: offset, - }; + self.postings_serializer.clear(); + self.current_term_info = self.current_term_info(); self.terms_fst_builder.insert_key(term) } @@ -188,32 +283,8 @@ impl PostingsSerializer { /// using `VInt` encoding. pub fn close_term(&mut self) -> io::Result<()> { if self.term_open { - - self.terms_fst_builder - .insert_value(&self.current_term_info)?; - - if !self.doc_ids.is_empty() { - // we have doc ids waiting to be written - // this happens when the number of doc ids is - // not a perfect multiple of our block size. - // - // In that case, the remaining part is encoded - // using variable int encoding. - { - let block_encoded = - self.block_encoder - .compress_vint_sorted(&self.doc_ids, self.last_doc_id_encoded); - self.postings_write.write_all(block_encoded)?; - self.doc_ids.clear(); - } - // ... Idem for term frequencies - if self.text_indexing_options.is_termfreq_enabled() { - let block_encoded = self.block_encoder - .compress_vint_unsorted(&self.term_freqs[..]); - self.postings_write.write_all(block_encoded)?; - self.term_freqs.clear(); - } - } + self.terms_fst_builder.insert_value(&self.current_term_info)?; + self.postings_serializer.close_term()?; self.term_open = false; } Ok(()) @@ -235,31 +306,11 @@ impl PostingsSerializer { position_deltas: &[u32]) -> io::Result<()> { self.current_term_info.doc_freq += 1; - self.doc_ids.push(doc_id); - if self.text_indexing_options.is_termfreq_enabled() { - self.term_freqs.push(term_freq as u32); - } + self.postings_serializer.write_doc(doc_id, term_freq)?; if self.text_indexing_options.is_position_enabled() { - self.positions_writer.write(position_deltas)?; - } - if self.doc_ids.len() == NUM_DOCS_PER_BLOCK { - { - // encode the doc ids - let block_encoded: &[u8] = - self.block_encoder - .compress_block_sorted(&self.doc_ids, self.last_doc_id_encoded); - self.last_doc_id_encoded = self.doc_ids[self.doc_ids.len() - 1]; - self.postings_write.write_all(block_encoded)?; - } - if self.text_indexing_options.is_termfreq_enabled() { - // encode the term_freqs - let block_encoded: &[u8] = self.block_encoder - .compress_block_unsorted(&self.term_freqs); - self.postings_write.write_all(block_encoded)?; - self.term_freqs.clear(); - } - self.doc_ids.clear(); + self.positions_serializer.write(position_deltas)?; } + Ok(()) } @@ -267,8 +318,8 @@ impl PostingsSerializer { pub fn close(mut self) -> io::Result<()> { self.close_term()?; self.terms_fst_builder.finish()?; - self.postings_write.flush()?; - self.positions_writer.close()?; + self.postings_serializer.close()?; + self.positions_serializer.close()?; Ok(()) } }