From 8b68f22be1249bc08d1bffe0d24d3dd2bfbbb92c Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Wed, 15 Feb 2017 11:52:31 +0900 Subject: [PATCH] issue/43 made the delete queue shareable --- src/indexer/delete_queue.rs | 70 ++++++++++++++++++++++++++----------- src/indexer/index_writer.rs | 11 +++--- src/lib.rs | 2 -- 3 files changed, 55 insertions(+), 28 deletions(-) diff --git a/src/indexer/delete_queue.rs b/src/indexer/delete_queue.rs index 2a2d88abd..5ae89e9b5 100644 --- a/src/indexer/delete_queue.rs +++ b/src/indexer/delete_queue.rs @@ -1,31 +1,59 @@ use super::operation::DeleteOperation; +use std::sync::{Arc, RwLock}; +use std::mem; -// TODO remove clone -#[derive(Clone)] -pub struct DeleteQueue { - delete_operations: Vec, +/// This implementation assumes that we +/// have a lot more write operation than read operations. + +#[derive(Default)] +struct InnerDeleteQueue { + ro_chunks: ReadOnlyDeletes, + last_chunk: Vec, } +impl InnerDeleteQueue { + pub fn push(&mut self, delete_operation: DeleteOperation) { + self.last_chunk.push(delete_operation); + } + + pub fn operations(&mut self,) -> ReadOnlyDeletes { + if self.last_chunk.len() > 0 { + let new_operations = vec!(); + let new_ro_chunk = mem::replace(&mut self.last_chunk, new_operations); + self.ro_chunks.push(new_ro_chunk) + } + self.ro_chunks.clone() + } +} + +#[derive(Default, Clone)] +pub struct ReadOnlyDeletes(Vec>>); + +impl ReadOnlyDeletes { + fn push(&mut self, operations: Vec) { + self.0.push(Arc::new(operations)); + } + + pub fn iter<'a>(&'a self) -> impl Iterator { + self.0 + .iter() + .flat_map(|chunk| chunk.iter()) + } +} + +#[derive(Clone, Default)] +pub struct DeleteQueue(Arc>); + impl DeleteQueue { - - pub fn new() -> DeleteQueue { - DeleteQueue { - delete_operations: vec!(), - } - } - - pub fn push_op(&mut self, delete_operation: DeleteOperation) { - self.delete_operations.push(delete_operation); + pub fn push(&self, delete_operation: DeleteOperation) { + self.0.write().unwrap().push(delete_operation); } - pub fn operations(&self,) -> impl Iterator { - // TODO fix iterator - self.delete_operations.clone().into_iter() + pub fn operations(&self) -> ReadOnlyDeletes { + self.0.write().unwrap().operations() } } - - #[cfg(test)] mod tests { @@ -34,7 +62,7 @@ mod tests { #[test] fn test_deletequeue() { - let mut delete_queue = DeleteQueue::new(); + let delete_queue = DeleteQueue::default(); let make_op = |i: usize| { let field = Field(1u8); @@ -44,8 +72,8 @@ mod tests { } }; - delete_queue.push_op(make_op(1)); - delete_queue.push_op(make_op(2)); + delete_queue.push(make_op(1)); + delete_queue.push(make_op(2)); // TODO unit tests diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index bdbe39f29..15a25ca3d 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -130,7 +130,8 @@ pub fn advance_deletes( let mut last_opstamp_opt: Option = None; - for delete_op in delete_queue.operations() { + let delete_operations = delete_queue.operations(); + for delete_op in delete_operations.iter() { // A delete operation should only affect // document that were inserted after it. // @@ -316,7 +317,7 @@ impl IndexWriter { chan::sync(PIPELINE_MAX_SIZE_IN_DOCS); - let delete_queue = DeleteQueue::new(); + let delete_queue = DeleteQueue::default(); let segment_updater = SegmentUpdater::new(index.clone())?; @@ -437,7 +438,7 @@ impl IndexWriter { Error::ErrorInThread("Error while waiting for rollback.".to_string()) )?; - self.delete_queue = DeleteQueue::new(); + self.delete_queue = DeleteQueue::default(); // reset the opstamp self.uncommitted_opstamp = self.committed_opstamp; @@ -488,7 +489,7 @@ impl IndexWriter { // committed segments. self.committed_opstamp = self.stamp(); - let new_delete_queue = DeleteQueue::new(); + let new_delete_queue = DeleteQueue::default(); // TODO remove clone let future = self.segment_updater.commit(self.delete_queue.clone(), self.committed_opstamp); @@ -508,7 +509,7 @@ impl IndexWriter { opstamp: opstamp, term: term, }; - self.delete_queue.push_op(delete_operation); + self.delete_queue.push(delete_operation); } fn stamp(&mut self) -> u64 { diff --git a/src/lib.rs b/src/lib.rs index acb4ecba1..288d95e7c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,8 +28,6 @@ extern crate log; #[macro_use] extern crate version; - -#[macro_use] extern crate fst; extern crate byteorder; extern crate memmap;