diff --git a/src/datastruct/stacker/hashmap.rs b/src/datastruct/stacker/hashmap.rs index 55a6dc12c..c70c879fc 100644 --- a/src/datastruct/stacker/hashmap.rs +++ b/src/datastruct/stacker/hashmap.rs @@ -125,6 +125,10 @@ impl<'a, V> HashMap<'a, V> where V: HeapAllocable { .map(move |addr: u32| heap.get_mut_ref::(addr)) } + pub fn heap(&self) -> &Heap { + &self.heap + } + pub fn get_or_create>(&mut self, key: S) -> &mut V { let entry = self.lookup(key.as_ref()); match entry { diff --git a/src/datastruct/stacker/heap.rs b/src/datastruct/stacker/heap.rs index 9a43de897..c3b8d0a27 100644 --- a/src/datastruct/stacker/heap.rs +++ b/src/datastruct/stacker/heap.rs @@ -41,7 +41,6 @@ impl Heap { self.inner().clear(); } - /// Return the heap capacity. pub fn capacity(&self,) -> u32 { self.inner().capacity() @@ -91,6 +90,10 @@ impl Heap { pub fn get_mut_ref(&self, addr: u32) -> &mut Item { self.inner().get_mut_ref(addr) } + + pub fn get_ref(&self, addr: u32) -> &Item { + self.inner().get_mut_ref(addr) + } } diff --git a/src/indexer/delete_queue.rs b/src/indexer/delete_queue.rs index 23b8e162c..6ee4980f3 100644 --- a/src/indexer/delete_queue.rs +++ b/src/indexer/delete_queue.rs @@ -85,6 +85,24 @@ pub struct DeleteQueueCursor { impl DeleteQueueCursor { + /// Skips to the first delete operation which has + /// a timestamp that is greater or equal to opstamp. + /// + /// Returns false if the DeleteQueue reaches its end before + /// meeting such an element. 
+ pub fn skip_to(&mut self, opstamp: u64) -> bool { + // TODO optimize + while let Some(delete_operation) = self.peek() { + if delete_operation.opstamp >= opstamp { + return true; + } + else { + self.consume(); + } + } + return false; + } + pub fn peek(&mut self) -> Option { if self.pos >= BLOCK_SIZE { self.pos = 0; diff --git a/src/indexer/document_receiver.rs b/src/indexer/document_receiver.rs new file mode 100644 index 000000000..73bb7b4ec --- /dev/null +++ b/src/indexer/document_receiver.rs @@ -0,0 +1,5 @@ +use DocId; + +pub trait DocumentReceiver { + fn receive(&mut self, doc: DocId); +} \ No newline at end of file diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index 9bcfae1c5..1f970f72f 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -10,6 +10,7 @@ mod segment_manager; pub mod delete_queue; pub mod segment_updater; mod directory_lock; +pub mod document_receiver; pub mod operation; pub use self::segment_serializer::SegmentSerializer; diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs index c4ba34602..bb8e19fb3 100644 --- a/src/indexer/segment_writer.rs +++ b/src/indexer/segment_writer.rs @@ -22,6 +22,21 @@ use super::delete_queue::DeleteQueueCursor; use indexer::index_writer::MARGIN_IN_BYTES; use super::operation::{AddOperation, DeleteOperation}; use bit_set::BitSet; +use indexer::document_receiver::DocumentReceiver; + + +struct DocumentDeleter<'a> { + limit_doc_id: DocId, + deleted_docs: &'a mut BitSet, +} + +impl<'a> DocumentReceiver for DocumentDeleter<'a> { + fn receive(&mut self, doc: DocId) { + if doc < self.limit_doc_id { + self.deleted_docs.insert(doc as usize); + } + } +} /// A `SegmentWriter` is in charge of creating segment index from a /// documents. 
@@ -36,7 +51,7 @@ pub struct SegmentWriter<'a> { fast_field_writers: U32FastFieldsWriter, fieldnorms_writer: U32FastFieldsWriter, delete_queue_cursor: &'a mut DeleteQueueCursor, - docstamps: Vec, + doc_opstamps: Vec, } @@ -103,7 +118,7 @@ impl<'a> SegmentWriter<'a> { segment_serializer: segment_serializer, fast_field_writers: U32FastFieldsWriter::from_schema(schema), delete_queue_cursor: delete_queue_cursor, - docstamps: Vec::with_capacity(1_000), + doc_opstamps: Vec::with_capacity(1_000), }) } @@ -136,23 +151,41 @@ impl<'a> SegmentWriter<'a> { pub fn is_buffer_full(&self,) -> bool { self.heap.num_free_bytes() <= MARGIN_IN_BYTES } - + + fn compute_doc_limit(&self, opstamp: u64) -> DocId { + let doc_id = match self.doc_opstamps.binary_search(&opstamp) { + Ok(doc_id) => doc_id, + Err(doc_id) => doc_id, + }; + doc_id as DocId + } + fn compute_delete_mask(&mut self) -> BitSet { - let delete_docs = BitSet::with_capacity(self.max_doc as usize); - loop { - if let Some(delete_operation) = self.delete_queue_cursor.peek() { - let delete_term = delete_operation.term; - let Field(field_id) = delete_term.field(); - let postings_writer = &self.per_field_postings_writers[field_id as usize]; - // TODO add the associated posting list with the correct cut-off. - self.delete_queue_cursor.consume(); + if let Some(min_opstamp) = self.doc_opstamps.first() { + if !self.delete_queue_cursor.skip_to(*min_opstamp) { + return BitSet::new(); } - else { - break; - } - } - delete_docs + else { + return BitSet::new(); + } + let mut deleted_docs = BitSet::with_capacity(self.max_doc as usize); + while let Some(delete_operation) = self.delete_queue_cursor.consume() { + // We can skip computing delete operations that + // are older than our oldest document. + // + // They don't belong to this document anyway. 
+ let delete_term = delete_operation.term; + let Field(field_id) = delete_term.field(); + let postings_writer: &Box = &self.per_field_postings_writers[field_id as usize]; + let limit_doc_id = self.compute_doc_limit(delete_operation.opstamp); + let mut document_deleter = DocumentDeleter { + limit_doc_id: limit_doc_id, + deleted_docs: &mut deleted_docs + }; + postings_writer.push_documents(delete_term.value(), &mut document_deleter); + } + deleted_docs } @@ -162,7 +195,7 @@ impl<'a> SegmentWriter<'a> { pub fn add_document(&mut self, add_operation: &AddOperation, schema: &Schema) -> io::Result<()> { let doc_id = self.max_doc; let doc = &add_operation.document; - self.docstamps.push(add_operation.opstamp); + self.doc_opstamps.push(add_operation.opstamp); for (field, field_values) in doc.get_sorted_field_values() { let field_posting_writer: &mut Box = &mut self.per_field_postings_writers[field.0 as usize]; let field_options = schema.get_field_entry(field); diff --git a/src/postings/postings_writer.rs b/src/postings/postings_writer.rs index c3d1f997f..c69218b39 100644 --- a/src/postings/postings_writer.rs +++ b/src/postings/postings_writer.rs @@ -7,7 +7,8 @@ use postings::Recorder; use analyzer::SimpleTokenizer; use schema::Field; use analyzer::StreamingIterator; -use datastruct::stacker::{HashMap, Heap}; +use indexer::document_receiver::DocumentReceiver; +use datastruct::stacker::{HashMap, Entry, Heap}; /// The `PostingsWriter` is in charge of receiving documenting /// and building a `Segment` in anonymous memory. @@ -22,11 +23,15 @@ pub trait PostingsWriter { /// * heap - heap used to store the postings informations as well as the terms /// in the hashmap. fn suscribe(&mut self, doc: DocId, pos: u32, term: &Term, heap: &Heap); - + /// Serializes the postings on disk. /// The actual serialization format is handled by the `PostingsSerializer`. 
fn serialize(&self, serializer: &mut PostingsSerializer, heap: &Heap) -> io::Result<()>; + /// Push all documents associated with a given term to a + /// given DocumentReceiver. + fn push_documents(&self, term_val: &[u8], document_listener: &mut DocumentReceiver); + /// Closes all of the currently open `Recorder`'s. fn close(&mut self, heap: &Heap); @@ -99,6 +104,15 @@ impl<'a, Rec: Recorder + 'static> PostingsWriter for SpecializedPostingsWriter<' } } + + fn push_documents(&self, term_val: &[u8], document_receiver: &mut DocumentReceiver) { + if let Entry::Occupied(addr) = self.term_index.lookup(term_val) { + let heap = self.term_index.heap(); + let recorder: &Rec = heap.get_ref(addr); + recorder.push_documents(addr, document_receiver, heap); + } + } + #[inline] fn suscribe(&mut self, doc: DocId, position: u32, term: &Term, heap: &Heap) { let mut recorder = self.term_index.get_or_create(term); diff --git a/src/postings/recorder.rs b/src/postings/recorder.rs index 94173720b..970b6f071 100644 --- a/src/postings/recorder.rs +++ b/src/postings/recorder.rs @@ -2,6 +2,7 @@ use DocId; use std::io; use postings::PostingsSerializer; use datastruct::stacker::{ExpUnrolledLinkedList, Heap, HeapAllocable}; +use indexer::document_receiver::DocumentReceiver; const EMPTY_ARRAY: [u32; 0] = [0u32; 0]; const POSITION_END: u32 = 4294967295; @@ -28,6 +29,11 @@ pub trait Recorder: HeapAllocable { fn close_doc(&mut self, heap: &Heap); /// Returns the number of document that have been seen so far fn doc_freq(&self) -> u32; + /// Push all documents to a given DocumentReceiver. + fn push_documents(&self, + self_addr: u32, + document_receiver: &mut DocumentReceiver, + heap: &Heap); /// Pushes the postings information to the serializer. 
fn serialize(&self, self_addr: u32, @@ -73,6 +79,15 @@ impl Recorder for NothingRecorder { self.doc_freq } + fn push_documents(&self, + self_addr: u32, + document_receiver: &mut DocumentReceiver, + heap: &Heap) { + for doc in self.stack.iter(self_addr, heap) { + document_receiver.receive(doc); + } + } + fn serialize(&self, self_addr: u32, serializer: &mut PostingsSerializer, @@ -130,6 +145,17 @@ impl Recorder for TermFrequencyRecorder { self.doc_freq } + fn push_documents(&self, + self_addr: u32, + document_receiver: &mut DocumentReceiver, + heap: &Heap) { + let mut doc_iter = self.stack.iter(self_addr, heap); + while let Some(doc) = doc_iter.next() { + doc_iter.next().expect("Panicked while trying to read a frequency"); + document_receiver.receive(doc); + } + } + fn serialize(&self, self_addr: u32, serializer: &mut PostingsSerializer, @@ -190,6 +216,22 @@ impl Recorder for TFAndPositionRecorder { self.doc_freq } + fn push_documents(&self, + self_addr: u32, + document_receiver: &mut DocumentReceiver, + heap: &Heap) { + let mut positions_iter = self.stack.iter(self_addr, heap); + while let Some(doc) = positions_iter.next() { + document_receiver.receive(doc); + loop { + let position = positions_iter.next().expect("This should never happen. Pleasee report the bug."); + if position == POSITION_END { + break; + } + } + } + } + fn serialize(&self, self_addr: u32, serializer: &mut PostingsSerializer,