From e12fc4bb09ddf52b0f6b3da561a18fccd1a23752 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Sun, 5 Feb 2017 19:01:06 +0900 Subject: [PATCH] issue/43 deletes merge not working only updating uncommitted --- src/core/index.rs | 4 +- src/core/segment.rs | 21 +++++-- src/core/segment_component.rs | 10 +--- src/core/segment_id.rs | 5 -- src/core/segment_reader.rs | 6 +- src/fastfield/mod.rs | 3 - src/indexer/delete_queue.rs | 38 ++++++------ src/indexer/document_receiver.rs | 5 -- src/indexer/index_writer.rs | 100 ++++++++++++++++++++++++------- src/indexer/mod.rs | 1 - src/indexer/segment_manager.rs | 17 ++++-- src/indexer/segment_register.rs | 4 ++ src/indexer/segment_updater.rs | 21 +++++++ src/indexer/segment_writer.rs | 100 ++----------------------------- src/lib.rs | 34 ++++++----- src/postings/postings_writer.rs | 15 +---- src/postings/recorder.rs | 42 ------------- 17 files changed, 185 insertions(+), 241 deletions(-) delete mode 100644 src/indexer/document_receiver.rs diff --git a/src/core/index.rs b/src/core/index.rs index 85005c6e9..50740281f 100644 --- a/src/core/index.rs +++ b/src/core/index.rs @@ -208,8 +208,8 @@ impl Index { /// Return a segment object given a `segment_id` /// /// The segment may or may not exist. - pub fn segment(&self, segment_id: SegmentId, commit_opstamp: u64) -> Segment { - create_segment(self.clone(), segment_id, commit_opstamp) + pub fn segment(&self, segment_id: SegmentId, opstamp: u64) -> Segment { + create_segment(self.clone(), segment_id, opstamp) } /// Return a reference to the index directory. diff --git a/src/core/segment.rs b/src/core/segment.rs index 57e87f7e8..82891e6e8 100644 --- a/src/core/segment.rs +++ b/src/core/segment.rs @@ -16,7 +16,7 @@ use directory::error::{FileError, OpenWriteError}; pub struct Segment { index: Index, segment_id: SegmentId, - commit_opstamp: u64, + opstamp: u64, } impl fmt::Debug for Segment { @@ -28,11 +28,11 @@ impl fmt::Debug for Segment { /// Creates a new segment given an `Index` and a `SegmentId` /// /// The function is here to make it private outside `tantivy`. -pub fn create_segment(index: Index, segment_id: SegmentId, commit_opstamp: u64) -> Segment { +pub fn create_segment(index: Index, segment_id: SegmentId, opstamp: u64) -> Segment { Segment { index: index, segment_id: segment_id, - commit_opstamp: commit_opstamp, + opstamp: opstamp, } } @@ -43,8 +43,8 @@ impl Segment { self.index.schema() } - pub fn commit_opstamp(&self) -> u64 { - self.commit_opstamp + pub fn opstamp(&self) -> u64 { + self.opstamp } /// Returns the segment's id. @@ -52,12 +52,21 @@ impl Segment { self.segment_id } + pub fn with_opstamp(&self, opstamp: u64) -> Segment { + Segment { + index: self.index.clone(), + segment_id: self.segment_id.clone(), + opstamp: opstamp, + } + } + /// Returns the relative path of a component of our segment. /// /// It just joins the segment id with the extension /// associated to a segment component. pub fn relative_path(&self, component: SegmentComponent) -> PathBuf { - self.segment_id.relative_path(component) + let path_suffix = component.path_suffix(self.opstamp); + PathBuf::from(self.segment_id.uuid_string() + &*path_suffix) } /// Open one of the component file for read. diff --git a/src/core/segment_component.rs b/src/core/segment_component.rs index 57994ddf9..9b7bea9be 100644 --- a/src/core/segment_component.rs +++ b/src/core/segment_component.rs @@ -7,14 +7,12 @@ pub enum SegmentComponent { FIELDNORMS, TERMS, STORE, - DELETE(u64), //< The argument here is an opstamp. - // All of the deletes with an opstamp smaller or equal - // to this opstamp have been taken in account. + DELETE } impl SegmentComponent { - pub fn path_suffix(&self)-> String { + pub fn path_suffix(&self, opstamp: u64)-> String { match *self { SegmentComponent::POSITIONS => ".pos".to_string(), SegmentComponent::INFO => ".info".to_string(), @@ -23,9 +21,7 @@ impl SegmentComponent { SegmentComponent::STORE => ".store".to_string(), SegmentComponent::FASTFIELDS => ".fast".to_string(), SegmentComponent::FIELDNORMS => ".fieldnorm".to_string(), - SegmentComponent::DELETE(opstamp) => { - format!(".{}.del", opstamp) - } + SegmentComponent::DELETE => {format!(".{}.del", opstamp)}, } } } diff --git a/src/core/segment_id.rs b/src/core/segment_id.rs index a9916cb83..263d94c58 100644 --- a/src/core/segment_id.rs +++ b/src/core/segment_id.rs @@ -48,11 +48,6 @@ impl SegmentId { pub fn uuid_string(&self,) -> String { self.0.simple().to_string() } - - pub fn relative_path(&self, component: SegmentComponent) -> PathBuf { - let filename = self.uuid_string() + &*component.path_suffix(); - PathBuf::from(filename) - } } impl Encodable for SegmentId { diff --git a/src/core/segment_reader.rs b/src/core/segment_reader.rs index c1e44e754..f2ed4e7a4 100644 --- a/src/core/segment_reader.rs +++ b/src/core/segment_reader.rs @@ -147,7 +147,7 @@ impl SegmentReader { .unwrap_or_else(|_| ReadOnlySource::empty()); // TODO 0u64 - let delete_data_res = segment.open_read(SegmentComponent::DELETE(segment.commit_opstamp())); + let delete_data_res = segment.open_read(SegmentComponent::DELETE); let delete_bitset; if let Err(FileError::FileDoesNotExist(_)) = delete_data_res { delete_bitset = DeleteBitSet::empty(); @@ -262,6 +262,10 @@ impl SegmentReader { pub fn segment_id(&self) -> SegmentId { self.segment_id } + + pub fn is_deleted(&self, doc: DocId) -> bool { + self.delete_bitset.is_deleted(doc) + } } diff --git a/src/fastfield/mod.rs b/src/fastfield/mod.rs index 00de208b9..ae3d83f88 100644 --- a/src/fastfield/mod.rs +++ b/src/fastfield/mod.rs @@ -54,9 +54,6 @@ mod tests { #[test] pub fn test_fastfield() { let test_fastfield = U32FastFieldReader::from(vec!(100,200,300)); - println!("{}", test_fastfield.get(0)); - println!("{}", test_fastfield.get(1)); - println!("{}", test_fastfield.get(2)); assert_eq!(test_fastfield.get(0), 100); assert_eq!(test_fastfield.get(1), 200); assert_eq!(test_fastfield.get(2), 300); diff --git a/src/indexer/delete_queue.rs b/src/indexer/delete_queue.rs index 6ee4980f3..fd1e4f59b 100644 --- a/src/indexer/delete_queue.rs +++ b/src/indexer/delete_queue.rs @@ -97,7 +97,7 @@ impl DeleteQueueCursor { return true; } else { - self.consume(); + self.next(); } } return false; @@ -128,15 +128,17 @@ impl DeleteQueueCursor { } } +} + +impl Iterator for DeleteQueueCursor { + type Item = DeleteOperation; + /// Returns a delete operation if an operation is available, /// None if the queue is empty. /// - /// (We are voluntarily not using the `Iterator` trait - /// as a call to `consume` may return None once, and return - /// `Some(...)` ulteriorily. While this is officially - /// compatible with the `Iterator` specification, we judge - /// this confusing.) - pub fn consume(&mut self) -> Option { + /// This iterator may return None once, and return + /// `Some(...)` ulteriorily. + fn next(&mut self) -> Option { let delete_position = self.peek(); if delete_position.is_some() { self.pos += 1; @@ -197,7 +199,7 @@ mod tests { let mut delete_cursor_3 = delete_queue.cursor(); let mut delete_cursor_3_b = delete_cursor_3.clone(); - assert!(delete_cursor_3.consume().is_none()); + assert!(delete_cursor_3.next().is_none()); assert!(delete_cursor_3.peek().is_none()); delete_queue.push_op(make_op(3)); @@ -206,24 +208,24 @@ mod tests { assert_eq!(delete_cursor_3_b.peek(), Some(make_op(3))); let mut delete_cursor_3_c = delete_cursor_3_b.clone(); - assert_eq!(delete_cursor_3_b.consume(), Some(make_op(3))); + assert_eq!(delete_cursor_3_b.next(), Some(make_op(3))); let mut delete_cursor_4 = delete_cursor_3_b.clone(); assert_eq!(delete_cursor_3_b.peek(), Some(make_op(4))); - assert_eq!(delete_cursor_3_b.consume(), Some(make_op(4))); + assert_eq!(delete_cursor_3_b.next(), Some(make_op(4))); - assert_eq!(delete_cursor_3_c.consume(), Some(make_op(3))); + assert_eq!(delete_cursor_3_c.next(), Some(make_op(3))); - assert!(delete_cursor_3_b.consume().is_none()); - assert_eq!(delete_cursor_3_c.consume(), Some(make_op(4))); - assert!(delete_cursor_3_c.consume().is_none()); + assert!(delete_cursor_3_b.next().is_none()); + assert_eq!(delete_cursor_3_c.next(), Some(make_op(4))); + assert!(delete_cursor_3_c.next().is_none()); assert_eq!(delete_cursor_3.peek(), Some(make_op(3))); - assert_eq!(delete_cursor_3.consume(), Some(make_op(3))); - assert!(delete_cursor_3_b.consume().is_none()); + assert_eq!(delete_cursor_3.next(), Some(make_op(3))); + assert!(delete_cursor_3_b.next().is_none()); - assert_eq!(delete_cursor_4.consume(), Some(make_op(4))); - assert!(delete_cursor_4.consume().is_none()); + assert_eq!(delete_cursor_4.next(), Some(make_op(4))); + assert!(delete_cursor_4.next().is_none()); } diff --git a/src/indexer/document_receiver.rs b/src/indexer/document_receiver.rs deleted file mode 100644 index 73bb7b4ec..000000000 --- a/src/indexer/document_receiver.rs +++ /dev/null @@ -1,5 +0,0 @@ -use DocId; - -pub trait DocumentReceiver { - fn receive(&mut self, doc: DocId); -} \ No newline at end of file diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index f33ba5773..dcc9126af 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -9,6 +9,11 @@ use indexer::SegmentEntry; use std::thread::JoinHandle; use indexer::MergePolicy; use indexer::SegmentWriter; +use DocId; +use bit_set::BitSet; +use fastfield::delete::write_delete_bitset; +use postings::SegmentPostingsOption; +use postings::DocSet; use core::SegmentComponent; use super::directory_lock::DirectoryLock; use futures::Future; @@ -19,6 +24,7 @@ use std::thread; use futures::Canceled; use std::mem; use datastruct::stacker::Heap; +use core::SegmentReader; use std::mem::swap; use chan; use core::SegmentMeta; @@ -84,6 +90,64 @@ impl !Send for IndexWriter {} impl !Sync for IndexWriter {} +pub enum DocToOpstampMapping { + WithMap(Vec), + None +} + +impl DocToOpstampMapping { + fn compute_doc_limit(&self, opstamp: u64) -> DocId { + match *self { + DocToOpstampMapping::WithMap(ref doc_opstamps) => { + match doc_opstamps.binary_search(&opstamp) { + Ok(doc_id) => doc_id as DocId, + Err(doc_id) => doc_id as DocId, + } + } + DocToOpstampMapping::None => DocId::max_value(), + } + } +} + + + +/// TODO +/// work on SegmentMeta +pub fn advance_deletes( + segment: &Segment, + delete_cursor: &mut DeleteQueueCursor, + doc_opstamps: DocToOpstampMapping) -> Result<(u64, BitSet)> { + let segment_reader = SegmentReader::open(segment.clone())?; + let mut delete_bitset = BitSet::new(); + for doc in 0u32..segment_reader.max_doc() { + if segment_reader.is_deleted(doc) { + delete_bitset.insert(doc as usize); + } + } + let mut has_changed = false; + let mut last_opstamp = segment.opstamp();//segment + for delete_op in delete_cursor { + // A delete operation should only affect + // document that were inserted after it. + // + // Limit doc helps identify the first document + // that may be affected by the delete operation. + let limit_doc = doc_opstamps.compute_doc_limit(delete_op.opstamp); + if let Some(mut docset) = segment_reader.read_postings(&delete_op.term, SegmentPostingsOption::NoFreq) { + while docset.advance() { + has_changed = true; + let deleted_doc = docset.doc(); + if deleted_doc < limit_doc { + has_changed = true; + delete_bitset.insert(deleted_doc as usize); + } + } + } + last_opstamp = delete_op.opstamp; + } + Ok((last_opstamp, delete_bitset)) +} + fn index_documents(heap: &mut Heap, mut segment: Segment, schema: &Schema, @@ -106,32 +170,28 @@ fn index_documents(heap: &mut Heap, let num_docs = segment_writer.max_doc(); assert!(num_docs > 0); - let deleted_docset_opt = segment_writer.compute_deleted_bitset(delete_cursor); let last_opstamp = segment_writer.last_opstamp(); + + let doc_opstamps: Vec = segment_writer.finalize()?; - let num_deleted_docs; + let (last_opstamp_after_deletes, deleted_docset) = advance_deletes(&segment, delete_cursor, DocToOpstampMapping::WithMap(doc_opstamps))?; - if let Some(deleted_docset) = deleted_docset_opt { - let mut delete_write = segment.open_write(SegmentComponent::DELETE(last_opstamp))?; - delete::write_delete_bitset(&deleted_docset, &mut delete_write)?; - num_deleted_docs = deleted_docset.len(); - } - else { - num_deleted_docs = 0; + { + let mut delete_file = segment.with_opstamp(last_opstamp_after_deletes).open_write(SegmentComponent::DELETE)?; + write_delete_bitset(&deleted_docset, &mut delete_file)?; } + let num_deleted_docs = deleted_docset.len() as DocId; let segment_meta = SegmentMeta { segment_id: segment_id, num_docs: num_docs, - num_deleted_docs: num_deleted_docs as u32, - opstamp: last_opstamp, + num_deleted_docs: num_deleted_docs, + opstamp: last_opstamp_after_deletes, }; let segment_entry = SegmentEntry::new(segment_meta, delete_cursor.clone()); - - try!(segment_writer.finalize()); - + segment_updater .add_segment(generation, segment_entry) .wait() @@ -143,8 +203,6 @@ fn index_documents(heap: &mut Heap, impl IndexWriter { /// The index writer pub fn wait_merging_threads(mut self) -> Result<()> { - - // let future = self.segment_updater.terminate(); // this will stop the indexing thread, // dropping the last reference to the segment_updater. @@ -165,9 +223,7 @@ impl IndexWriter { .wait_merging_thread() .map_err(|_| Error::ErrorInThread("Failed to join merging thread.".to_string()) - )?; - // future.wait().unwrap(); // TODO do something with the result. - Ok(()) + ) } /// Spawns a new worker thread for indexing. @@ -384,7 +440,6 @@ impl IndexWriter { Ok(self.committed_opstamp) } - /// Commits all of the pending changes /// /// A call to commit blocks. @@ -408,7 +463,7 @@ impl IndexWriter { let mut former_workers_join_handle = Vec::new(); swap(&mut former_workers_join_handle, &mut self.workers_join_handle); - + for worker_handle in former_workers_join_handle { let indexing_worker_result = try!(worker_handle.join() .map_err(|e| Error::ErrorInThread(format!("{:?}", e)))); @@ -416,6 +471,7 @@ impl IndexWriter { // add a new worker for the next generation. try!(self.add_indexing_worker()); } + // here, because we join all of the worker threads, // all of the segment update for this commit have been // sent. @@ -426,8 +482,8 @@ impl IndexWriter { // This will move uncommitted segments to the state of // committed segments. - self.committed_opstamp = self.stamp(); + let future = self.segment_updater.commit(self.committed_opstamp); // wait for the segment update thread to have processed the info diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index 15e2035f7..aea0965f5 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -10,7 +10,6 @@ mod segment_manager; pub mod delete_queue; pub mod segment_updater; mod directory_lock; -pub mod document_receiver; pub mod operation; pub use self::segment_register::SegmentEntry; diff --git a/src/indexer/segment_manager.rs b/src/indexer/segment_manager.rs index 41d0ded70..10ee89486 100644 --- a/src/indexer/segment_manager.rs +++ b/src/indexer/segment_manager.rs @@ -8,7 +8,6 @@ use std::sync::{RwLockReadGuard, RwLockWriteGuard}; use std::fmt::{self, Debug, Formatter}; struct SegmentRegisters { - docstamp: u64, uncommitted: SegmentRegister, committed: SegmentRegister, } @@ -16,7 +15,6 @@ struct SegmentRegisters { impl Default for SegmentRegisters { fn default() -> SegmentRegisters { SegmentRegisters { - docstamp: 0u64, uncommitted: SegmentRegister::default(), committed: SegmentRegister::default() } @@ -57,12 +55,23 @@ impl SegmentManager { pub fn from_segments(segment_metas: Vec, delete_cursor: DeleteQueueCursor) -> SegmentManager { SegmentManager { registers: RwLock::new( SegmentRegisters { - docstamp: 0u64, // TODO put the actual value uncommitted: SegmentRegister::default(), committed: SegmentRegister::new(segment_metas, delete_cursor), }), } } + + pub fn segment_entries(&self,) -> Vec { + let mut segment_entries = self.read() + .uncommitted + .segment_entries(); + segment_entries.extend( + self.read() + .committed + .segment_entries() + ); + segment_entries + } pub fn segment_entry(&self, segment_id: &SegmentId) -> Option { let registers = self.read(); @@ -98,7 +107,6 @@ impl SegmentManager { for segment_entry in segment_entries { registers_lock.committed.add_segment_entry(segment_entry); } - registers_lock.docstamp = docstamp; registers_lock.uncommitted.clear(); } @@ -151,7 +159,6 @@ impl Default for SegmentManager { fn default() -> SegmentManager { SegmentManager { registers: RwLock::new( SegmentRegisters { - docstamp: 0u64, // TODO put the actual value uncommitted: SegmentRegister::default(), committed: SegmentRegister::default(), }), diff --git a/src/indexer/segment_register.rs b/src/indexer/segment_register.rs index 982e320c0..4c838bf63 100644 --- a/src/indexer/segment_register.rs +++ b/src/indexer/segment_register.rs @@ -32,6 +32,10 @@ impl SegmentEntry { pub fn segment_id(&self) -> SegmentId { self.meta.segment_id } + + pub fn delete_cursor(&mut self) -> &mut DeleteQueueCursor { + &mut self.delete_cursor + } pub fn meta(&self) -> &SegmentMeta { &self.meta diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index bb79b2721..9482f9aaa 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -10,9 +10,11 @@ use std::mem; use std::sync::atomic::Ordering; use std::ops::DerefMut; use futures::{Future, future}; +use fastfield::delete::write_delete_bitset; use futures::oneshot; use futures::Canceled; use std::thread; +use core::SegmentComponent; use std::sync::atomic::AtomicUsize; use std::sync::RwLock; use core::SerializableSegment; @@ -22,6 +24,7 @@ use std::borrow::BorrowMut; use indexer::SegmentSerializer; use indexer::SegmentEntry; use schema::Schema; +use indexer::index_writer::{advance_deletes, DocToOpstampMapping}; use directory::Directory; use std::thread::JoinHandle; use std::sync::Arc; @@ -171,8 +174,26 @@ impl SegmentUpdater { } } + fn purge_deletes(&self, target_opstamp: u64) -> Result<()> { + let uncommitted = self.0.segment_manager.segment_entries(); + for mut segment_entry in uncommitted { + let mut segment = self.0.index.segment(segment_entry.meta().segment_id, segment_entry.meta().opstamp); + let (_, deleted_docset) = advance_deletes( + &segment, + segment_entry.delete_cursor(), + DocToOpstampMapping::None).unwrap(); + { + let mut delete_file = segment.with_opstamp(target_opstamp).open_write(SegmentComponent::DELETE)?; + write_delete_bitset(&deleted_docset, &mut delete_file)?; + } + + } + Ok(()) + } + pub fn commit(&self, opstamp: u64) -> impl Future { self.run_async(move |segment_updater| { + segment_updater.purge_deletes(opstamp).expect("Failed purge deletes"); segment_updater.0.segment_manager.commit(opstamp); let mut directory = segment_updater.0.index.directory().box_clone(); save_metas( diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs index d64bd77bd..d2de2b946 100644 --- a/src/indexer/segment_writer.rs +++ b/src/indexer/segment_writer.rs @@ -17,58 +17,10 @@ use postings::SpecializedPostingsWriter; use postings::{NothingRecorder, TermFrequencyRecorder, TFAndPositionRecorder}; use indexer::segment_serializer::SegmentSerializer; use datastruct::stacker::Heap; -use super::delete_queue::DeleteQueueCursor; use indexer::index_writer::MARGIN_IN_BYTES; use super::operation::AddOperation; -use bit_set::BitSet; -use indexer::document_receiver::DocumentReceiver; -use core::SegmentReader; -use postings::SegmentPostingsOption; -use postings::DocSet; - -fn update_deleted_bitset( - segment_reader: &SegmentReader, - bitset: &mut BitSet, - delete_cursor: &mut DeleteQueueCursor, - limit_opstamp_opt: Option) -> bool { - let mut has_changed = false; - let limit_opstamp = limit_opstamp_opt.unwrap_or(u64::max_value()); - loop { - if let Some(delete_op) = delete_cursor.peek() { - if delete_op.opstamp > limit_opstamp { - break; - } - if let Some(mut docset) = segment_reader.read_postings(&delete_op.term, SegmentPostingsOption::NoFreq) { - while docset.advance() { - has_changed = true; - let deleted_doc = docset.doc(); - bitset.insert(deleted_doc as usize); - } - } - } - else { - break; - } - delete_cursor.consume(); - } - has_changed -} - -struct DocumentDeleter<'a> { - limit_doc_id: DocId, - deleted_docs: &'a mut BitSet, -} - -impl<'a> DocumentReceiver for DocumentDeleter<'a> { - fn receive(&mut self, doc: DocId) { - if doc < self.limit_doc_id { - self.deleted_docs.insert(doc as usize); - } - } -} - /// A `SegmentWriter` is in charge of creating segment index from a /// documents. /// @@ -154,19 +106,18 @@ impl<'a> SegmentWriter<'a> { /// /// Finalize consumes the `SegmentWriter`, so that it cannot /// be used afterwards. - pub fn finalize(mut self) -> Result<()> { + pub fn finalize(mut self) -> Result> { let segment_info = self.segment_info(); for per_field_postings_writer in &mut self.per_field_postings_writers { per_field_postings_writer.close(self.heap); } - try!(write( - &self.per_field_postings_writers, + write(&self.per_field_postings_writers, &self.fast_field_writers, &self.fieldnorms_writer, segment_info, self.segment_serializer, - self.heap)); - Ok(()) + self.heap)?; + Ok(self.doc_opstamps) } /// Returns true iff the segment writer's buffer has reached capacity. @@ -180,14 +131,6 @@ impl<'a> SegmentWriter<'a> { self.heap.num_free_bytes() <= MARGIN_IN_BYTES } - fn compute_doc_limit(&self, opstamp: u64) -> DocId { - let doc_id = match self.doc_opstamps.binary_search(&opstamp) { - Ok(doc_id) => doc_id, - Err(doc_id) => doc_id, - }; - doc_id as DocId - } - // pub fn compute_doc_mapping_after_delete(&self, mut delete_queue_cursor: DeleteQueueCursor) -> Vec> { // let delete_docs = self.compute_delete_mask(&mut delete_queue_cursor); // let max_doc: usize = self.max_doc as usize; @@ -211,41 +154,6 @@ impl<'a> SegmentWriter<'a> { .last() .expect("Last doc opstamp called on an empty segment writer")) } - - /// TODO compute the bitset using the segment reader directly. - pub fn compute_deleted_bitset(&self, delete_queue_cursor: &mut DeleteQueueCursor) -> Option { - if let Some(first_opstamp) = self.doc_opstamps.first() { - if !delete_queue_cursor.skip_to(*first_opstamp) { - return None; - } - } - else { - return None; - } - let last_opstamp = *self.doc_opstamps.last().unwrap(); - let mut deleted_docs = BitSet::with_capacity(self.max_doc as usize); - while let Some(delete_operation) = delete_queue_cursor.peek() { - if delete_operation.opstamp > last_opstamp { - break; - } - // We can skip computing delete operations that - // are older than our oldest document. - // - // They don't belong to this document anyway. - let delete_term = delete_operation.term; - let Field(field_id) = delete_term.field(); - let postings_writer: &Box = &self.per_field_postings_writers[field_id as usize]; - let limit_doc_id = self.compute_doc_limit(delete_operation.opstamp); - let mut document_deleter = DocumentDeleter { - limit_doc_id: limit_doc_id, - deleted_docs: &mut deleted_docs - }; - postings_writer.push_documents(delete_term.value(), &mut document_deleter); - delete_queue_cursor.consume(); - } - Some(deleted_docs) - } - /// Indexes a new document /// diff --git a/src/lib.rs b/src/lib.rs index b2b3b36d8..09219195c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -300,19 +300,19 @@ mod tests { { // writing the segment let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap(); - { + { // 0 let doc = doc!(text_field=>"a b"); index_writer.add_document(doc).unwrap(); } - { + { // 1 let doc = doc!(text_field=>" a c"); index_writer.add_document(doc).unwrap(); } - { + { // 2 let doc = doc!(text_field=>" b c"); index_writer.add_document(doc).unwrap(); } - { + { // 3 let doc = doc!(text_field=>" b d"); index_writer.add_document(doc).unwrap(); } @@ -322,11 +322,11 @@ mod tests { { index_writer.delete_term(Term::from_field_text(text_field, "a")); } - { + { // 4 let doc = doc!(text_field=>" b c"); index_writer.add_document(doc).unwrap(); } - { + { // 5 let doc = doc!(text_field=>" a"); index_writer.add_document(doc).unwrap(); } @@ -337,14 +337,20 @@ mod tests { let searcher = index.searcher(); let reader = searcher.segment_reader(0); assert!(reader.read_postings_all_info(&Term::from_field_text(text_field, "abcd")).is_none()); - let mut postings = reader.read_postings_all_info(&Term::from_field_text(text_field, "a")).unwrap(); - assert!(postings.advance()); - assert_eq!(postings.doc(), 2); - assert!(postings.advance()); - assert_eq!(postings.doc(), 3); - assert!(postings.advance()); - assert_eq!(postings.doc(), 5); - assert!(!postings.advance()); + { + let mut postings = reader.read_postings_all_info(&Term::from_field_text(text_field, "a")).unwrap(); + assert!(postings.advance()); + assert_eq!(postings.doc(), 5); + assert!(!postings.advance()); + } + { + let mut postings = reader.read_postings_all_info(&Term::from_field_text(text_field, "b")).unwrap(); + assert!(postings.advance()); + assert_eq!(postings.doc(), 3); + assert!(postings.advance()); + assert_eq!(postings.doc(), 4); + assert!(!postings.advance()); + } } } diff --git a/src/postings/postings_writer.rs b/src/postings/postings_writer.rs index c69218b39..a3c0194f1 100644 --- a/src/postings/postings_writer.rs +++ b/src/postings/postings_writer.rs @@ -7,8 +7,7 @@ use postings::Recorder; use analyzer::SimpleTokenizer; use schema::Field; use analyzer::StreamingIterator; -use indexer::document_receiver::DocumentReceiver; -use datastruct::stacker::{HashMap, Entry, Heap}; +use datastruct::stacker::{HashMap, Heap}; /// The `PostingsWriter` is in charge of receiving documenting /// and building a `Segment` in anonymous memory. @@ -28,10 +27,6 @@ pub trait PostingsWriter { /// The actual serialization format is handled by the `PostingsSerializer`. fn serialize(&self, serializer: &mut PostingsSerializer, heap: &Heap) -> io::Result<()>; - /// Push all documents associated with a given term to a - /// given DocumentLister. - fn push_documents(&self, term_val: &[u8], document_listener: &mut DocumentReceiver); - /// Closes all of the currently open `Recorder`'s. fn close(&mut self, heap: &Heap); @@ -105,14 +100,6 @@ impl<'a, Rec: Recorder + 'static> PostingsWriter for SpecializedPostingsWriter<' } - fn push_documents(&self, term_val: &[u8], document_receiver: &mut DocumentReceiver) { - if let Entry::Occupied(addr) = self.term_index.lookup(term_val) { - let heap = self.term_index.heap(); - let recorder: &Rec = heap.get_ref(addr); - recorder.push_documents(addr, document_receiver, heap); - } - } - #[inline] fn suscribe(&mut self, doc: DocId, position: u32, term: &Term, heap: &Heap) { let mut recorder = self.term_index.get_or_create(term); diff --git a/src/postings/recorder.rs b/src/postings/recorder.rs index 970b6f071..94173720b 100644 --- a/src/postings/recorder.rs +++ b/src/postings/recorder.rs @@ -2,7 +2,6 @@ use DocId; use std::io; use postings::PostingsSerializer; use datastruct::stacker::{ExpUnrolledLinkedList, Heap, HeapAllocable}; -use indexer::document_receiver::DocumentReceiver; const EMPTY_ARRAY: [u32; 0] = [0u32; 0]; const POSITION_END: u32 = 4294967295; @@ -29,11 +28,6 @@ pub trait Recorder: HeapAllocable { fn close_doc(&mut self, heap: &Heap); /// Returns the number of document that have been seen so far fn doc_freq(&self) -> u32; - /// Push all documents to a given DocumentLister. - fn push_documents(&self, - self_addr: u32, - document_receiver: &mut DocumentReceiver, - heap: &Heap); /// Pushes the postings information to the serializer. fn serialize(&self, self_addr: u32, @@ -79,15 +73,6 @@ impl Recorder for NothingRecorder { self.doc_freq } - fn push_documents(&self, - self_addr: u32, - document_receiver: &mut DocumentReceiver, - heap: &Heap) { - for doc in self.stack.iter(self_addr, heap) { - document_receiver.receive(doc); - } - } - fn serialize(&self, self_addr: u32, serializer: &mut PostingsSerializer, @@ -145,17 +130,6 @@ impl Recorder for TermFrequencyRecorder { self.doc_freq } - fn push_documents(&self, - self_addr: u32, - document_receiver: &mut DocumentReceiver, - heap: &Heap) { - let mut doc_iter = self.stack.iter(self_addr, heap); - while let Some(doc) = doc_iter.next() { - doc_iter.next().expect("Panicked while trying to read a frequency"); - document_receiver.receive(doc); - } - } - fn serialize(&self, self_addr: u32, serializer: &mut PostingsSerializer, @@ -216,22 +190,6 @@ impl Recorder for TFAndPositionRecorder { self.doc_freq } - fn push_documents(&self, - self_addr: u32, - document_receiver: &mut DocumentReceiver, - heap: &Heap) { - let mut positions_iter = self.stack.iter(self_addr, heap); - while let Some(doc) = positions_iter.next() { - document_receiver.receive(doc); - loop { - let position = positions_iter.next().expect("This should never happen. Pleasee report the bug."); - if position == POSITION_END { - break; - } - } - } - } - fn serialize(&self, self_addr: u32, serializer: &mut PostingsSerializer,