diff --git a/src/indexer/delete_queue.rs b/src/indexer/delete_queue.rs
index 0286454c7..1f476dad6 100644
--- a/src/indexer/delete_queue.rs
+++ b/src/indexer/delete_queue.rs
@@ -1,90 +1,273 @@
 use super::operation::DeleteOperation;
 use std::sync::{Arc, RwLock};
-
-/// This implementation assumes that we
-/// have a lot more write operations than read operations.
+use std::mem;
+use std::ops::DerefMut;

-type InnerDeleteQueue = Arc<RwLock<Vec<DeleteOperation>>>;
-
-// TODO very inefficient.
-// fix this once the refactoring/bugfix is done
-#[derive(Clone)]
-pub struct DeleteCursor {
-    cursor: usize,
-    operations: InnerDeleteQueue,
+// The DeleteQueue is conceptually similar to a
+// multiple-consumer, single-producer broadcast channel.
+//
+// All consumers receive all messages.
+//
+// Consumers of the delete queue hold a `DeleteCursor`,
+// which points to a specific position in the `DeleteQueue`.
+//
+// New consumers can be created in two ways:
+// - calling `delete_queue.cursor()` returns a cursor that
+//   will include all future delete operations (and no past operations).
+// - cloning an existing cursor returns a new cursor that
+//   is at the exact same position, and can advance independently
+//   of the original cursor.
+#[derive(Default)]
+struct InnerDeleteQueue {
+    writer: Vec<DeleteOperation>,
+    last_block: Option<Arc<Block>>, // TODO last block... is that ok.
 }

-impl DeleteCursor {
-
-    pub fn skip_to(&mut self, target_opstamp: u64) {
-        while let Some(operation) = self.peek() {
-            if operation.opstamp >= target_opstamp {
-                break;
-            }
-            self.advance()
-        }
-    }
-
-    pub fn advance(&mut self) {
-        let read = self.operations.read().unwrap();
-        if self.cursor < read.len() {
-            self.cursor += 1;
-        }
-    }
-
-    pub fn peek(&self) -> Option<DeleteOperation> {
-        let read = self.operations.read().unwrap();
-        if self.cursor >= read.len() {
-            None
-        }
-        else {
-            let operation = read[self.cursor].clone();
-            Some(operation)
-        }
-    }
-}
-
-// TODO remove copy
-impl Iterator for DeleteCursor {
-
-    type Item = DeleteOperation;
-
-    fn next(&mut self) -> Option<DeleteOperation> {
-        let read = self.operations.read().unwrap();
-        if self.cursor >= read.len() {
-            None
-        }
-        else {
-            let operation = read[self.cursor].clone();
-            self.cursor += 1;
-            Some(operation)
-        }
-    }
-}
-
-
 #[derive(Clone, Default)]
-pub struct DeleteQueue(InnerDeleteQueue);
+pub struct DeleteQueue {
+    inner: Arc<RwLock<InnerDeleteQueue>>,
+}
+
 impl DeleteQueue {
+
+    // Creates a new delete queue.
     pub fn new() -> DeleteQueue {
-        DeleteQueue::default()
-    }
+
+        let delete_queue = DeleteQueue {
+            inner: Arc::new(RwLock::new(InnerDeleteQueue::default()))
+        };
+
+        let next_block = NextBlock::from(delete_queue.clone());
+        {
+            let mut delete_queue_wlock = delete_queue.inner.write().unwrap();
+            delete_queue_wlock.last_block = Some(
+                Arc::new(Block {
+                    operations: Arc::default(),
+                    next: next_block,
+                })
+            );
+        }

-    pub fn push(&self, delete_operation: DeleteOperation) {
-        self.0.write().unwrap().push(delete_operation);
+        delete_queue
     }

+    // Creates a new cursor that makes it possible to
+    // consume future delete operations.
+    //
+    // Past delete operations are not accessible.
     pub fn cursor(&self) -> DeleteCursor {
+        let last_block = self.inner
+            .read()
+            .unwrap()
+            .last_block
+            .clone()
+            .expect("Failed to unwrap last_block. This should never happen
+                     as the Option<> is only here to make
+                     initialization possible");
+        let operations_len = last_block.operations.len();
         DeleteCursor {
-            cursor: 0,
-            operations: self.0.clone(),
+            block: last_block,
+            pos: operations_len,
+        }
+    }
+
+    // Appends a new delete operation.
+    pub fn push(&self, delete_operation: DeleteOperation) {
+        self.inner
+            .write()
+            .expect("Failed to acquire write lock on delete queue writer")
+            .writer
+            .push(delete_operation);
+    }
+
+    // The DeleteQueue is a linked list of blocks of
+    // delete operations.
+    //
+    // Writing happens by simply appending to a vec.
+    // `.flush()` takes the pending delete operations vec,
+    // creates a new read-only block from it,
+    // and appends it to the linked list.
+    //
+    // `.flush()` happens when, for instance,
+    // a consumer reaches the end of the last read-only block.
+    // It then asks the delete queue whether there are
+    // any unflushed operations.
+    //
+    fn flush(&self) -> Option<Arc<Block>> {
+        let mut self_wlock = self
+            .inner
+            .write()
+            .expect("Failed to acquire write lock on delete queue writer");
+
+        let delete_operations;
+        {
+            let writer: &mut Vec<DeleteOperation> = &mut self_wlock.writer;
+            if writer.is_empty() {
+                return None;
+            }
+            delete_operations = mem::replace(writer, vec!());
+        }
+
+        let next_block = NextBlock::from(self.clone());
+        {
+            self_wlock.last_block = Some(
+                Arc::new(Block {
+                    operations: Arc::new(delete_operations),
+                    next: next_block,
+                })
+            );
+        }
+        self_wlock.last_block.clone()
+    }
+}
+
+enum InnerNextBlock {
+    Writer(DeleteQueue),
+    Closed(Arc<Block>),
+}
+
+struct NextBlock(RwLock<InnerNextBlock>);
+
+impl From<DeleteQueue> for NextBlock {
+    fn from(delete_queue: DeleteQueue) -> NextBlock {
+        NextBlock(RwLock::new(InnerNextBlock::Writer(delete_queue)))
+    }
+}
+
+impl NextBlock {
+    fn next_block(&self) -> Option<Arc<Block>> {
+        {
+            let next_read_lock = self.0
+                .read()
+                .expect("Failed to acquire read lock in delete queue");
+            match *next_read_lock {
+                InnerNextBlock::Closed(ref block) => {
+                    return Some(block.clone());
+                }
+                _ => {}
+            }
+        }
+        let next_block;
+        {
+            let mut next_write_lock = self.0
+                .write()
+                .expect("Failed to acquire write lock in delete queue");
+            match *next_write_lock {
+                InnerNextBlock::Closed(ref block) => {
+                    return Some(block.clone());
+                }
+                InnerNextBlock::Writer(ref writer) => {
+                    match writer.flush() {
+                        Some(flushed_next_block) => {
+                            next_block = flushed_next_block;
+                        }
+                        None => {
+                            return None;
+                        }
+                    }
+                }
+            }
+            *next_write_lock.deref_mut() = InnerNextBlock::Closed(next_block.clone()); // TODO fix
+            return Some(next_block)
+        }
+    }
+}
+
+struct Block {
+    operations: Arc<Vec<DeleteOperation>>,
+    next: NextBlock,
+}
+
+
+#[derive(Clone)]
+pub struct DeleteCursor {
+    block: Arc<Block>,
+    pos: usize,
+}
+
+
+impl DeleteCursor {
+
+    /// Skips operations and positions the cursor so that
+    /// - either all of the delete operations currently in the
+    ///   queue are consumed and the next `.get()` will return `None`,
+    /// - or the next `.get()` will return the first operation with an
+    ///   `opstamp >= target_opstamp`.
+    pub fn skip_to(&mut self, target_opstamp: u64) {
+        // TODO Could be optimized, as we work with blocks.
+        loop {
+            if let Some(operation) = self.get() {
+                if operation.opstamp >= target_opstamp {
+                    break;
+                }
+            }
+            else {
+                break;
+            }
+            self.advance();
+        }
+    }
+
+    /// If the current block has been entirely
+    /// consumed, try to load the next one.
+    ///
+    /// Returns `true` if, after this attempt,
+    /// the cursor is on a block that has not
+    /// been entirely consumed.
+    /// Returns `false` if we have reached the end of the queue.
+    fn load_block_if_required(&mut self) -> bool {
+        if self.pos >= self.block.operations.len() {
+            // We have consumed our operations entirely.
+            // Let's ask the writer if it has more for us.
+            // self.go_next_block();
+            match self.block.next.next_block() {
+                Some(block) => {
+                    self.block = block;
+                    self.pos = 0;
+                    true
+                }
+                None => {
+                    false
+                }
+            }
+        }
+        else {
+            true
+        }
+    }
+
+    /// Advance to the next delete operation.
+    /// Returns `true` iff there is such an operation.
+    pub fn advance(&mut self) -> bool {
+        if self.load_block_if_required() {
+            self.pos += 1;
+            true
+        }
+        else {
+            false
+        }
+    }
+
+    /// Get the current delete operation.
+    /// Calling `.get` does not advance the cursor.
+    pub fn get<'a>(&'a mut self) -> Option<&'a DeleteOperation> {
+        if self.load_block_if_required() {
+            Some(&self.block.operations[self.pos])
+        }
+        else {
+            None
+        }
+    }
+
+}
+
+
+
+
+
 #[cfg(test)]
 mod tests {
@@ -110,32 +293,33 @@ mod tests {
         let snapshot = delete_queue.cursor();
         {
             let mut operations_it = snapshot.clone();
-            assert_eq!(operations_it.next().unwrap().opstamp, 1);
-            assert_eq!(operations_it.next().unwrap().opstamp, 2);
-            assert!(operations_it.next().is_none());
+            assert_eq!(operations_it.get().unwrap().opstamp, 1);
+            operations_it.advance();
+            assert_eq!(operations_it.get().unwrap().opstamp, 2);
+            operations_it.advance();
+            assert!(operations_it.get().is_none());
+            operations_it.advance();
+
+            let mut snapshot2 = delete_queue.cursor();
+            assert!(snapshot2.get().is_none());
+            delete_queue.push(make_op(3));
+            assert_eq!(snapshot2.get().unwrap().opstamp, 3);
+            assert_eq!(operations_it.get().unwrap().opstamp, 3);
+            assert_eq!(operations_it.get().unwrap().opstamp, 3);
+            operations_it.advance();
+            assert!(operations_it.get().is_none());
+            operations_it.advance();
         }
         {
             let mut operations_it = snapshot.clone();
-            assert_eq!(operations_it.next().unwrap().opstamp, 1);
-            assert_eq!(operations_it.next().unwrap().opstamp, 2);
-            assert!(operations_it.next().is_none());
+            assert_eq!(operations_it.get().unwrap().opstamp, 1);
+            operations_it.advance();
+            assert_eq!(operations_it.get().unwrap().opstamp, 2);
+            operations_it.advance();
+            assert_eq!(operations_it.get().unwrap().opstamp, 3);
+            operations_it.advance();
+            assert!(operations_it.get().is_none());
         }
-
-        // // operations does not own a lock on the queue.
-        // delete_queue.push(make_op(3));
-        // let snapshot2 = delete_queue.snapshot();
-        // {
-        //     // operations is not affected by
-        //     // the push that occurs after.
-        //     let mut operations_it = snapshot.iter();
-        //     let mut operations2_it = snapshot2.iter();
-        //     assert_eq!(operations_it.next().unwrap().opstamp, 1);
-        //     assert_eq!(operations2_it.next().unwrap().opstamp, 1);
-        //     assert_eq!(operations_it.next().unwrap().opstamp, 2);
-        //     assert_eq!(operations2_it.next().unwrap().opstamp, 2);
-        //     assert!(operations_it.next().is_none());
-        //     assert_eq!(operations2_it.next().unwrap().opstamp, 3);
-        //     assert!(operations2_it.next().is_none());
-        // }
     }
 }
\ No newline at end of file
diff --git a/src/indexer/delete_queue2.rs b/src/indexer/delete_queue2.rs
deleted file mode 100644
index e2b8230d1..000000000
--- a/src/indexer/delete_queue2.rs
+++ /dev/null
@@ -1,206 +0,0 @@
-use super::operation::DeleteOperation;
-use std::sync::{Arc, RwLock};
-use std::mem;
-use std::ops::DerefMut;
-
-
-#[derive(Clone, Default)]
-struct DeleteQueue {
-    writer: Arc<RwLock<Vec<DeleteOperation>>>,
-    next_block: Option<NextBlock>,
-}
-
-impl DeleteQueue {
-
-    pub fn new() -> Arc<DeleteQueue> {
-        let mut delete_queue = Arc::new(DeleteQueue::default());
-        delete_queue.next_block = Some(
-            NextBlock::from(delete_queue)
-        );
-        delete_queue
-    }
-
-    pub fn cursor(&self) -> Cursor {
-
-        Cursor {
-            current_block: Arc,
-            pos: 0,
-        }
-    }
-
-    pub fn push(&self, delete_operation: DeleteOperation) {
-        let mut write_lock = self.writer
-            .write()
-            .expect("Failed to acquire write lock on delete queue writer")
-            .push(delete_operation);
-    }
-
-    fn flush(&self) -> Option<Vec<DeleteOperation>> {
-        let mut write_lock = self
-            .writer
-            .write()
-            .expect("Failed to acquire write lock on delete queue writer");
-        if write_lock.is_empty() {
-            return None;
-        }
-        Some(mem::replace(write_lock.deref_mut(), vec!()))
-    }
-}
-
-enum InnerNextBlock {
-    Writer(Arc<DeleteQueue>),
-    Closed(Arc<Block>),
-    Terminated,
-}
-
-struct NextBlock(RwLock<InnerNextBlock>);
-
-impl From<Arc<DeleteQueue>> for NextBlock {
-    fn from(writer_arc: Arc<DeleteQueue>) -> NextBlock {
-        NextBlock(RwLock::new(InnerNextBlock::Writer(writer_arc)))
-    }
-}
-
-impl NextBlock {
-    pub fn next_block(&self) -> Option<Arc<Block>> {
-        {
-            let next_read_lock = self.0
-                .read()
-                .expect("Failed to acquire write lock in delete queue");
-            match *next_read_lock {
-                InnerNextBlock::Terminated => {
-                    return None;
-                }
-                InnerNextBlock::Closed(ref block) => {
-                    return Some(block.clone());
-                }
-                _ => {}
-            }
-        }
-        let delete_operations;
-        let writer_arc;
-        {
-            let mut next_write_lock = self.0
-                .write()
-                .expect("Failed to acquire write lock in delete queue");
-            match *next_write_lock {
-                InnerNextBlock::Terminated => {
-                    return None;
-                }
-                InnerNextBlock::Closed(ref block) => {
-                    return Some(block.clone());
-                }
-                InnerNextBlock::Writer(ref writer) => {
-                    match writer.flush() {
-                        Some(flushed_delete_operations) => {
-                            delete_operations = flushed_delete_operations;
-                        }
-                        None => {
-                            return None;
-                        }
-                    }
-                    writer_arc = writer.clone();
-                }
-            }
-            let next_block = Arc::new(Block {
-                operations: Arc::new(delete_operations),
-                next: NextBlock::from(writer_arc),
-            });
-            *next_write_lock.deref_mut() = InnerNextBlock::Closed(next_block.clone()); // TODO fix
-            return Some(next_block)
-        }
-    }
-}
-
-struct Block {
-    operations: Arc<Vec<DeleteOperation>>,
-    next: NextBlock,
-}
-
-
-#[derive(Clone)]
-struct Cursor {
-    current_block: Arc<Block>,
-    pos: usize,
-}
-
-impl Cursor {
-    fn next<'a>(&'a mut self) -> Option<&'a DeleteOperation> {
-        if self.pos >= self.current_block.operations.len() {
-            // we have consumed our operations entirely.
-            // let's ask our writer if he has more for us.
-            // self.go_next_block();
-            match self.current_block.next.next_block() {
-                Some(block) => {
-                    self.current_block = block;
-                    self.pos = 0;
-                }
-                None => {
-                    return None;
-                }
-            }
-        }
-        let operation = &self.current_block.operations[self.pos];
-        self.pos += 1;
-        return Some(operation);
-    }
-}
-
-
-
-
-
-
-#[cfg(test)]
-mod tests {
-
-    use super::{DeleteQueue, DeleteOperation};
-    use schema::{Term, Field};
-
-    #[test]
-    fn test_deletequeue() {
-        let delete_queue = DeleteQueue::new();
-
-        let make_op = |i: usize| {
-            let field = Field(1u8);
-            DeleteOperation {
-                opstamp: i as u64,
-                term: Term::from_field_u32(field, i as u32)
-            }
-        };
-
-        delete_queue.push(make_op(1));
-        delete_queue.push(make_op(2));
-
-        let snapshot = delete_queue.cursor();
-        {
-            let mut operations_it = snapshot.clone();
-            assert_eq!(operations_it.next().unwrap().opstamp, 1);
-            assert_eq!(operations_it.next().unwrap().opstamp, 2);
-            assert!(operations_it.next().is_none());
-        }
-        {
-            let mut operations_it = snapshot.clone();
-            assert_eq!(operations_it.next().unwrap().opstamp, 1);
-            assert_eq!(operations_it.next().unwrap().opstamp, 2);
-            assert!(operations_it.next().is_none());
-        }
-
-        // // operations does not own a lock on the queue.
-        // delete_queue.push(make_op(3));
-        // let snapshot2 = delete_queue.snapshot();
-        // {
-        //     // operations is not affected by
-        //     // the push that occurs after.
-        //     let mut operations_it = snapshot.iter();
-        //     let mut operations2_it = snapshot2.iter();
-        //     assert_eq!(operations_it.next().unwrap().opstamp, 1);
-        //     assert_eq!(operations2_it.next().unwrap().opstamp, 1);
-        //     assert_eq!(operations_it.next().unwrap().opstamp, 2);
-        //     assert_eq!(operations2_it.next().unwrap().opstamp, 2);
-        //     assert!(operations_it.next().is_none());
-        //     assert_eq!(operations2_it.next().unwrap().opstamp, 3);
-        //     assert!(operations2_it.next().is_none());
-        // }
-    }
-}
\ No newline at end of file
diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs
index 33afe7108..0ee8d716c 100644
--- a/src/indexer/index_writer.rs
+++ b/src/indexer/index_writer.rs
@@ -165,7 +165,7 @@ pub fn compute_deleted_bitset(
     let mut might_have_changed = false;

     loop {
-        if let Some(delete_op) = delete_cursor.peek() {
+        if let Some(delete_op) = delete_cursor.get() {
             if delete_op.opstamp > target_opstamp {
                 break;
             }
diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs
index af6e6a21d..4dcd9fe12 100644
--- a/src/indexer/mod.rs
+++ b/src/indexer/mod.rs
@@ -14,9 +14,6 @@ mod doc_opstamp_mapping;
 pub mod operation;
 mod stamper;

-
-// TODO avoid exposing SegmentState / SegmentEntry if it does not have to be public API
-
 pub use self::segment_entry::{SegmentEntry, SegmentState};
 pub use self::segment_serializer::SegmentSerializer;
 pub use self::segment_writer::SegmentWriter;
diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs
index 74ea6ca2f..f00923e37 100644
--- a/src/indexer/segment_updater.rs
+++ b/src/indexer/segment_updater.rs
@@ -359,7 +359,8 @@ impl SegmentUpdater {
         self.run_async(move |segment_updater| {
             debug!("End merge {:?}", after_merge_segment_entry.meta());
-            if let Some(delete_operation) = after_merge_segment_entry.delete_cursor().peek() {
+            let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone();
+            if let Some(delete_operation) = delete_cursor.get() {
                 let committed_opstamp = segment_updater.0.index.opstamp();
                 if delete_operation.opstamp < committed_opstamp {
                     let segment = segment_updater.0.index.segment(after_merge_segment_entry.meta().clone());
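Taken together, the new `DeleteQueue` is a single-producer broadcast queue built as a linked list of read-only blocks: the writer appends to a pending vec, a flush seals that vec into an immutable block linked after the previous one, and each `DeleteCursor` walks the chain at its own pace. The following is a minimal, self-contained Rust sketch of that idea, not the patch's code: the names (`Queue`, `Inner`, `Block`, `Cursor`) are illustrative stand-ins, `u64` opstamps replace `DeleteOperation`, and flushing is made explicit here, whereas the patch flushes lazily when a cursor reaches the end of the last block.

use std::sync::{Arc, Mutex};

// A sealed, read-only batch of operations. `next` stays None until
// the writer flushes the following batch.
struct Block {
    ops: Vec<u64>,
    next: Mutex<Option<Arc<Block>>>,
}

struct Inner {
    pending: Vec<u64>, // operations pushed but not yet sealed
    last: Arc<Block>,  // the block every new cursor starts from
}

#[derive(Clone)]
struct Queue {
    inner: Arc<Mutex<Inner>>,
}

impl Queue {
    fn new() -> Queue {
        // An empty sentinel block so cursors always have somewhere to point.
        let sentinel = Arc::new(Block { ops: Vec::new(), next: Mutex::new(None) });
        Queue { inner: Arc::new(Mutex::new(Inner { pending: Vec::new(), last: sentinel })) }
    }

    fn push(&self, op: u64) {
        self.inner.lock().unwrap().pending.push(op);
    }

    // Seal the pending operations into a new read-only block and
    // link it after the current last block.
    fn flush(&self) {
        let mut inner = self.inner.lock().unwrap();
        if inner.pending.is_empty() {
            return;
        }
        let ops = std::mem::take(&mut inner.pending);
        let block = Arc::new(Block { ops, next: Mutex::new(None) });
        *inner.last.next.lock().unwrap() = Some(block.clone());
        inner.last = block;
    }

    // A cursor positioned at the end of the last block: it will only
    // see operations flushed after this call.
    fn cursor(&self) -> Cursor {
        let inner = self.inner.lock().unwrap();
        let pos = inner.last.ops.len();
        Cursor { block: inner.last.clone(), pos }
    }
}

#[derive(Clone)]
struct Cursor {
    block: Arc<Block>,
    pos: usize,
}

impl Cursor {
    fn next(&mut self) -> Option<u64> {
        // If the current block is consumed, hop to the next one (if any).
        while self.pos >= self.block.ops.len() {
            let next = self.block.next.lock().unwrap().clone()?;
            self.block = next;
            self.pos = 0;
        }
        let op = self.block.ops[self.pos];
        self.pos += 1;
        Some(op)
    }
}

fn main() {
    let queue = Queue::new();
    let mut cursor_a = queue.cursor();
    queue.push(1);
    queue.push(2);
    queue.flush();
    let mut cursor_b = cursor_a.clone(); // same position, advances independently
    assert_eq!(cursor_a.next(), Some(1));
    assert_eq!(cursor_a.next(), Some(2));
    assert_eq!(cursor_a.next(), None);
    assert_eq!(cursor_b.next(), Some(1)); // the clone replays from its own position
}

Because a block is immutable once sealed and only reachable forward through `next`, cursor clones share blocks via `Arc` and consume them independently, which is the property the updated unit test exercises when two clones of the same snapshot each replay opstamps 1, 2, and 3.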