From 1c03d98a11e4ef8cd8f34efd0314d76469c9d15a Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Thu, 16 Feb 2017 10:20:29 +0900 Subject: [PATCH] issue/43 added delete_queue right in the segment updater --- src/indexer/delete_queue.rs | 82 +++++++++++++++++++--------------- src/indexer/index_writer.rs | 12 +++-- src/indexer/segment_updater.rs | 14 +++--- 3 files changed, 59 insertions(+), 49 deletions(-) diff --git a/src/indexer/delete_queue.rs b/src/indexer/delete_queue.rs index 5ae89e9b5..425d290ce 100644 --- a/src/indexer/delete_queue.rs +++ b/src/indexer/delete_queue.rs @@ -16,7 +16,7 @@ impl InnerDeleteQueue { self.last_chunk.push(delete_operation); } - pub fn operations(&mut self,) -> ReadOnlyDeletes { + pub fn snapshot(&mut self,) -> ReadOnlyDeletes { if self.last_chunk.len() > 0 { let new_operations = vec!(); let new_ro_chunk = mem::replace(&mut self.last_chunk, new_operations); @@ -24,6 +24,11 @@ impl InnerDeleteQueue { } self.ro_chunks.clone() } + + pub fn clear(&mut self) { + self.ro_chunks.clear(); + self.last_chunk.clear(); + } } #[derive(Default, Clone)] @@ -39,6 +44,10 @@ impl ReadOnlyDeletes { .iter() .flat_map(|chunk| chunk.iter()) } + + pub fn clear(&mut self) { + self.0.clear(); + } } #[derive(Clone, Default)] @@ -49,8 +58,12 @@ impl DeleteQueue { self.0.write().unwrap().push(delete_operation); } - pub fn operations(&self) -> ReadOnlyDeletes { - self.0.write().unwrap().operations() + pub fn snapshot(&self) -> ReadOnlyDeletes { + self.0.write().unwrap().snapshot() + } + + pub fn clear(&self) { + self.0.write().unwrap().clear(); } } @@ -74,40 +87,35 @@ mod tests { delete_queue.push(make_op(1)); delete_queue.push(make_op(2)); - - // TODO unit tests - // let mut delete_cursor_3 = delete_queue.cursor(); - // let mut delete_cursor_3_b = delete_cursor_3.clone(); - - // assert!(delete_cursor_3.next().is_none()); - // assert!(delete_cursor_3.peek().is_none()); - - // delete_queue.push_op(make_op(3)); - // delete_queue.push_op(make_op(4)); - - // assert_eq!(delete_cursor_3_b.peek(), Some(make_op(3))); - // let mut delete_cursor_3_c = delete_cursor_3_b.clone(); - - // assert_eq!(delete_cursor_3_b.next(), Some(make_op(3))); - // let mut delete_cursor_4 = delete_cursor_3_b.clone(); - - // assert_eq!(delete_cursor_3_b.peek(), Some(make_op(4))); - // assert_eq!(delete_cursor_3_b.next(), Some(make_op(4))); - - // assert_eq!(delete_cursor_3_c.next(), Some(make_op(3))); - - // assert!(delete_cursor_3_b.next().is_none()); - // assert_eq!(delete_cursor_3_c.next(), Some(make_op(4))); - // assert!(delete_cursor_3_c.next().is_none()); - - // assert_eq!(delete_cursor_3.peek(), Some(make_op(3))); - // assert_eq!(delete_cursor_3.next(), Some(make_op(3))); - // assert!(delete_cursor_3_b.next().is_none()); - - // assert_eq!(delete_cursor_4.next(), Some(make_op(4))); - // assert!(delete_cursor_4.next().is_none()); - - + let snapshot = delete_queue.snapshot(); + { + let mut operations_it = snapshot.iter(); + assert_eq!(operations_it.next().unwrap().opstamp, 1); + assert_eq!(operations_it.next().unwrap().opstamp, 2); + assert!(operations_it.next().is_none()); + } + { // iterating does not consume results. + let mut operations_it = snapshot.iter(); + assert_eq!(operations_it.next().unwrap().opstamp, 1); + assert_eq!(operations_it.next().unwrap().opstamp, 2); + assert!(operations_it.next().is_none()); + } + // operations does not own a lock on the queue. + delete_queue.push(make_op(3)); + let snapshot2 = delete_queue.snapshot(); + { + // operations is not affected by + // the push that occurs after. + let mut operations_it = snapshot.iter(); + let mut operations2_it = snapshot2.iter(); + assert_eq!(operations_it.next().unwrap().opstamp, 1); + assert_eq!(operations2_it.next().unwrap().opstamp, 1); + assert_eq!(operations_it.next().unwrap().opstamp, 2); + assert_eq!(operations2_it.next().unwrap().opstamp, 2); + assert!(operations_it.next().is_none()); + assert_eq!(operations2_it.next().unwrap().opstamp, 3); + assert!(operations2_it.next().is_none()); + } } } \ No newline at end of file diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 15a25ca3d..03b936229 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -130,7 +130,7 @@ pub fn advance_deletes( let mut last_opstamp_opt: Option = None; - let delete_operations = delete_queue.operations(); + let delete_operations = delete_queue.snapshot(); for delete_op in delete_operations.iter() { // A delete operation should only affect // document that were inserted after it. @@ -319,7 +319,7 @@ impl IndexWriter { let delete_queue = DeleteQueue::default(); - let segment_updater = SegmentUpdater::new(index.clone())?; + let segment_updater = SegmentUpdater::new(index.clone(), delete_queue.clone())?; let mut index_writer = IndexWriter { @@ -438,7 +438,7 @@ impl IndexWriter { Error::ErrorInThread("Error while waiting for rollback.".to_string()) )?; - self.delete_queue = DeleteQueue::default(); + self.delete_queue.clear(); // reset the opstamp self.uncommitted_opstamp = self.committed_opstamp; @@ -489,16 +489,14 @@ impl IndexWriter { // committed segments. self.committed_opstamp = self.stamp(); - let new_delete_queue = DeleteQueue::default(); - // TODO remove clone - let future = self.segment_updater.commit(self.delete_queue.clone(), self.committed_opstamp); + let future = self.segment_updater.commit(self.committed_opstamp); // wait for the segment update thread to have processed the info // TODO remove unwrap future.wait().unwrap(); - self.delete_queue = new_delete_queue; + self.delete_queue.clear(); Ok(self.committed_opstamp) } diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index 28c9b0c84..d72efd313 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -102,11 +102,12 @@ struct InnerSegmentUpdater { merging_thread_id: AtomicUsize, merging_threads: RwLock>>>, generation: AtomicUsize, + delete_queue: DeleteQueue, } impl SegmentUpdater { - pub fn new(index: Index) -> Result + pub fn new(index: Index, delete_queue: DeleteQueue) -> Result { let segments = index.segments()?; let segment_manager = SegmentManager::from_segments(segments); @@ -119,6 +120,7 @@ impl SegmentUpdater { merging_thread_id: AtomicUsize::default(), merging_threads: RwLock::new(HashMap::new()), generation: AtomicUsize::default(), + delete_queue: delete_queue, })) ) } @@ -163,21 +165,21 @@ impl SegmentUpdater { } } - fn purge_deletes(&self, delete_queue: &DeleteQueue) -> Result> { + fn purge_deletes(&self) -> Result> { self.0.segment_manager .segment_entries() .into_iter() .map(|segment_entry| { let mut segment = self.0.index.segment(segment_entry.meta().clone()); - advance_deletes(&mut segment, delete_queue, segment_entry.doc_to_opstamp()) + advance_deletes(&mut segment, &self.0.delete_queue, segment_entry.doc_to_opstamp()) .map(|entry| entry.meta().clone()) }) .collect() } - pub fn commit(&self, delete_queue: DeleteQueue, opstamp: u64) -> impl Future { + pub fn commit(&self, opstamp: u64) -> impl Future { self.run_async(move |segment_updater| { - let segment_metas = segment_updater.purge_deletes(&delete_queue).expect("Failed purge deletes"); + let segment_metas = segment_updater.purge_deletes().expect("Failed purge deletes"); let segment_entries = segment_metas .into_iter() .map(SegmentEntry::new) @@ -210,6 +212,8 @@ impl SegmentUpdater { let merging_join_handle = thread::spawn(move || { + + // first we need to apply deletes to our segment. info!("Start merge: {:?}", segment_ids_vec); let ref index = segment_updater_clone.0.index;