issue/96 Changed datastruct for the delete queue.

This commit is contained in:
Paul Masurel
2017-04-01 21:01:10 +09:00
parent 4fc7bc5f09
commit afd08a7bbc
5 changed files with 276 additions and 300 deletions

View File

@@ -1,90 +1,273 @@
use super::operation::DeleteOperation;
use std::sync::{Arc, RwLock};
use std::mem;
use std::ops::DerefMut;
// The DeleteQueue is conceptually similar to a multiple-
// consumer, single-producer broadcast channel.
//
// All consumers will receive all messages.
//
// Consumers of the delete queue hold a `DeleteCursor`,
// which points to a specific position in the `DeleteQueue`.
//
// New consumers can be created in two ways:
// - calling `delete_queue.cursor()` returns a cursor that
// will include all future delete operations (and no past operations).
// - cloning an existing cursor returns a new cursor at
// the exact same position, which can then advance independently
// of the original cursor.
#[derive(Default)]
struct InnerDeleteQueue {
writer: Vec<DeleteOperation>,
last_block: Option<Arc<Block>>, // TODO last block... is that ok.
}
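// Editor's sketch (not part of this commit), illustrating the intended
// cursor semantics; `op1` is a hypothetical `DeleteOperation`:
//
//     let queue = DeleteQueue::new();
//     let cursor_a = queue.cursor();       // sees only operations pushed from now on
//     queue.push(op1);
//     let mut cursor_b = cursor_a.clone(); // starts at the same position...
//     cursor_b.advance();                  // ...but advances independently of `cursor_a`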
#[derive(Clone)]
pub struct DeleteQueue {
inner: Arc<RwLock<InnerDeleteQueue>>,
}
impl DeleteQueue {
// Creates a new delete queue.
pub fn new() -> DeleteQueue {
let delete_queue = DeleteQueue {
inner: Arc::new(RwLock::new(InnerDeleteQueue::default()))
};
let next_block = NextBlock::from(delete_queue.clone());
{
let mut delete_queue_wlock = delete_queue.inner.write().unwrap();
delete_queue_wlock.last_block = Some(
Arc::new(Block {
operations: Arc::default(),
next: next_block,
})
);
}
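// Editor's note: `last_block` is an `Option` only so that
// `InnerDeleteQueue::default()` can exist before this initial empty
// block (whose `next` points back at the queue) has been installed.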
delete_queue
}
// Creates a new cursor that makes it possible to
// consume future delete operations.
//
// Past delete operations are not accessible.
pub fn cursor(&self) -> DeleteCursor {
let last_block = self.inner
.read()
.unwrap()
.last_block
.clone()
.expect("Failed to unwrap last_block. This should never happen
as the Option<> is only here to make
initialization possible");
let operations_len = last_block.operations.len();
DeleteCursor {
block: last_block,
pos: operations_len,
}
}
// Appends a new delete operation.
pub fn push(&self, delete_operation: DeleteOperation) {
self.inner
.write()
.expect("Failed to acquire write lock on delete queue writer")
.writer
.push(delete_operation);
}
// The DeleteQueue is a linked list of blocks of
// delete operations.
//
// Writing happens by simply appending to a vec.
// `.flush()` takes the pending delete operations vec,
// creates a new read-only block from it,
// and appends it to the linked list.
//
// `.flush()` happens when, for instance,
// a consumer reaches the end of the last read-only block.
// It then asks the delete queue whether there happen to
// be some unflushed operations.
//
fn flush(&self) -> Option<Arc<Block>> {
let mut self_wlock = self
.inner
.write()
.expect("Failed to acquire write lock on delete queue writer");
let delete_operations;
{
let writer: &mut Vec<DeleteOperation> = &mut self_wlock.writer;
if writer.is_empty() {
return None;
}
delete_operations = mem::replace(writer, vec!());
}
let next_block = NextBlock::from(self.clone());
{
self_wlock.last_block = Some(
Arc::new(Block {
operations: Arc::new(delete_operations),
next: next_block,
})
);
}
self_wlock.last_block.clone()
}
}
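// Editor's illustration (not in the original): after one flush, the queue
// looks roughly like
//
//     writer: []   (pending operations, drained by `flush`)
//     last_block ──> Block { operations: [op1 .. opN], next: Writer(queue) }
//
// Cursors hold an `Arc` to one block plus an offset into it, so a flushed
// block stays alive for as long as any cursor still points into it.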
enum InnerNextBlock {
Writer(DeleteQueue),
Closed(Arc<Block>),
}
struct NextBlock(RwLock<InnerNextBlock>);
impl From<DeleteQueue> for NextBlock {
fn from(delete_queue: DeleteQueue) -> NextBlock {
NextBlock(RwLock::new(InnerNextBlock::Writer(delete_queue)))
}
}
impl NextBlock {
fn next_block(&self) -> Option<Arc<Block>> {
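// Fast path (editor's note): if the next block has already been
// closed, return it while holding only the read lock.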
{
let next_read_lock = self.0
.read()
.expect("Failed to acquire write lock in delete queue");
match *next_read_lock {
InnerNextBlock::Closed(ref block) => {
return Some(block.clone());
}
_ => {}
}
}
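// Slow path (editor's note): take the write lock, re-check in case
// another thread closed the block in the meantime, then flush the
// writer and close the block ourselves.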
let next_block;
{
let mut next_write_lock = self.0
.write()
.expect("Failed to acquire write lock in delete queue");
match *next_write_lock {
InnerNextBlock::Closed(ref block) => {
return Some(block.clone());
}
InnerNextBlock::Writer(ref writer) => {
match writer.flush() {
Some(flushed_next_block) => {
next_block = flushed_next_block;
}
None => {
return None;
}
}
}
}
*next_write_lock.deref_mut() = InnerNextBlock::Closed(next_block.clone()); // TODO fix
return Some(next_block)
}
}
}
struct Block {
operations: Arc<Vec<DeleteOperation>>,
next: NextBlock,
}
#[derive(Clone)]
pub struct DeleteCursor {
block: Arc<Block>,
pos: usize,
}
impl DeleteCursor {
/// Skips operations and positions the cursor so that
/// - either all of the delete operations currently in the
/// queue are consumed and the next `get` will return `None`,
/// - or the next `get` will return the first operation with
/// `opstamp >= target_opstamp`.
pub fn skip_to(&mut self, target_opstamp: u64) {
// TODO Can be optimized since we work with blocks.
loop {
if let Some(operation) = self.get() {
if operation.opstamp >= target_opstamp {
break;
}
}
else {
break;
}
self.advance();
}
}
/// If the current block has been entirely
/// consumed, try to load the next one.
///
/// Returns `true` if, after this attempt, the cursor
/// is on a block that has not been entirely consumed,
/// and `false` if we have reached the end of the queue.
fn load_block_if_required(&mut self) -> bool {
if self.pos >= self.block.operations.len() {
// we have consumed our operations entirely.
// let's ask our writer if he has more for us.
// self.go_next_block();
match self.block.next.next_block() {
Some(block) => {
self.block = block;
self.pos = 0;
true
}
None => {
false
}
}
}
else {
true
}
}
/// Advance to the next delete operation.
/// Returns true iff there is such an operation.
pub fn advance(&mut self) -> bool {
if self.load_block_if_required() {
self.pos += 1;
true
}
else {
false
}
}
/// Get the current delete operation.
/// Calling `.get` does not advance the cursor.
pub fn get<'a>(&'a mut self) -> Option<&'a DeleteOperation> {
if self.load_block_if_required() {
Some(&self.block.operations[self.pos])
}
else {
None
}
}
}
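// Editor's sketch (not part of this commit): draining a cursor with the
// `skip_to`/`get`/`advance` API; `delete_queue` and `target_opstamp: u64`
// are assumed to be in scope:
//
//     let mut cursor = delete_queue.cursor();
//     cursor.skip_to(target_opstamp);
//     while let Some(op) = cursor.get().cloned() {
//         // ... apply `op` ...
//         cursor.advance();
//     }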
#[cfg(test)]
mod tests {
@@ -110,32 +293,33 @@ mod tests {
let snapshot = delete_queue.cursor();
{
let mut operations_it = snapshot.clone();
assert_eq!(operations_it.get().unwrap().opstamp, 1);
operations_it.advance();
assert_eq!(operations_it.get().unwrap().opstamp, 2);
operations_it.advance();
assert!(operations_it.get().is_none());
operations_it.advance();
let mut snapshot2 = delete_queue.cursor();
assert!(snapshot2.get().is_none());
delete_queue.push(make_op(3));
assert_eq!(snapshot2.get().unwrap().opstamp, 3);
assert_eq!(operations_it.get().unwrap().opstamp, 3);
assert_eq!(operations_it.get().unwrap().opstamp, 3);
operations_it.advance();
assert!(operations_it.get().is_none());
operations_it.advance();
}
{
let mut operations_it = snapshot.clone();
assert_eq!(operations_it.get().unwrap().opstamp, 1);
operations_it.advance();
assert_eq!(operations_it.get().unwrap().opstamp, 2);
operations_it.advance();
assert_eq!(operations_it.get().unwrap().opstamp, 3);
operations_it.advance();
assert!(operations_it.get().is_none());
}
// // operations does not own a lock on the queue.
// delete_queue.push(make_op(3));
// let snapshot2 = delete_queue.snapshot();
// {
// // operations is not affected by
// // the push that occurs after.
// let mut operations_it = snapshot.iter();
// let mut operations2_it = snapshot2.iter();
// assert_eq!(operations_it.next().unwrap().opstamp, 1);
// assert_eq!(operations2_it.next().unwrap().opstamp, 1);
// assert_eq!(operations_it.next().unwrap().opstamp, 2);
// assert_eq!(operations2_it.next().unwrap().opstamp, 2);
// assert!(operations_it.next().is_none());
// assert_eq!(operations2_it.next().unwrap().opstamp, 3);
// assert!(operations2_it.next().is_none());
// }
}
}

View File

@@ -1,206 +0,0 @@
use super::operation::DeleteOperation;
use std::sync::{Arc, RwLock};
use std::mem;
use std::ops::DerefMut;
#[derive(Clone, Default)]
struct DeleteQueue {
writer: Arc<RwLock<Vec<DeleteOperation>>>,
next_block: Option<NextBlock>,
}
impl DeleteQueue {
pub fn new() -> Arc<DeleteQueue> {
let mut delete_queue = Arc::new(DeleteQueue::default());
delete_queue.next_block = Some(
NextBlock::from(delete_queue)
);
delete_queue
}
pub fn cursor(&self) -> Cursor {
Cursor {
current_block: Arc<Block>,
pos: 0,
}
}
pub fn push(&self, delete_operation: DeleteOperation) {
let mut write_lock = self.writer
.write()
.expect("Failed to acquire write lock on delete queue writer")
.push(delete_operation);
}
fn flush(&self) -> Option<Vec<DeleteOperation>> {
let mut write_lock = self
.writer
.write()
.expect("Failed to acquire write lock on delete queue writer");
if write_lock.is_empty() {
return None;
}
Some(mem::replace(write_lock.deref_mut(), vec!()))
}
}
enum InnerNextBlock {
Writer(Arc<DeleteQueue>),
Closed(Arc<Block>),
Terminated,
}
struct NextBlock(RwLock<InnerNextBlock>);
impl From<Arc<DeleteQueue>> for NextBlock {
fn from(writer_arc: Arc<DeleteQueue>) -> NextBlock {
NextBlock(RwLock::new(InnerNextBlock::Writer(writer_arc)))
}
}
impl NextBlock {
pub fn next_block(&self) -> Option<Arc<Block>> {
{
let next_read_lock = self.0
.read()
.expect("Failed to acquire write lock in delete queue");
match *next_read_lock {
InnerNextBlock::Terminated => {
return None;
}
InnerNextBlock::Closed(ref block) => {
return Some(block.clone());
}
_ => {}
}
}
let delete_operations;
let writer_arc;
{
let mut next_write_lock = self.0
.write()
.expect("Failed to acquire write lock in delete queue");
match *next_write_lock {
InnerNextBlock::Terminated => {
return None;
}
InnerNextBlock::Closed(ref block) => {
return Some(block.clone());
}
InnerNextBlock::Writer(ref writer) => {
match writer.flush() {
Some(flushed_delete_operations) => {
delete_operations = flushed_delete_operations;
}
None => {
return None;
}
}
writer_arc = writer.clone();
}
}
let next_block = Arc::new(Block {
operations: Arc::new(delete_operations),
next: NextBlock::from(writer_arc),
});
*next_write_lock.deref_mut() = InnerNextBlock::Closed(next_block.clone()); // TODO fix
return Some(next_block)
}
}
}
struct Block {
operations: Arc<Vec<DeleteOperation>>,
next: NextBlock,
}
#[derive(Clone)]
struct Cursor {
current_block: Arc<Block>,
pos: usize,
}
impl Cursor {
fn next<'a>(&'a mut self) -> Option<&'a DeleteOperation> {
if self.pos >= self.current_block.operations.len() {
// we have consumed our operations entirely.
// let's ask our writer if he has more for us.
// self.go_next_block();
match self.current_block.next.next_block() {
Some(block) => {
self.current_block = block;
self.pos = 0;
}
None => {
return None;
}
}
}
let operation = &self.current_block.operations[self.pos];
self.pos += 1;
return Some(operation);
}
}
#[cfg(test)]
mod tests {
use super::{DeleteQueue, DeleteOperation};
use schema::{Term, Field};
#[test]
fn test_deletequeue() {
let delete_queue = DeleteQueue::new();
let make_op = |i: usize| {
let field = Field(1u8);
DeleteOperation {
opstamp: i as u64,
term: Term::from_field_u32(field, i as u32)
}
};
delete_queue.push(make_op(1));
delete_queue.push(make_op(2));
let snapshot = delete_queue.cursor();
{
let mut operations_it = snapshot.clone();
assert_eq!(operations_it.next().unwrap().opstamp, 1);
assert_eq!(operations_it.next().unwrap().opstamp, 2);
assert!(operations_it.next().is_none());
}
{
let mut operations_it = snapshot.clone();
assert_eq!(operations_it.next().unwrap().opstamp, 1);
assert_eq!(operations_it.next().unwrap().opstamp, 2);
assert!(operations_it.next().is_none());
}
// // operations does not own a lock on the queue.
// delete_queue.push(make_op(3));
// let snapshot2 = delete_queue.snapshot();
// {
// // operations is not affected by
// // the push that occurs after.
// let mut operations_it = snapshot.iter();
// let mut operations2_it = snapshot2.iter();
// assert_eq!(operations_it.next().unwrap().opstamp, 1);
// assert_eq!(operations2_it.next().unwrap().opstamp, 1);
// assert_eq!(operations_it.next().unwrap().opstamp, 2);
// assert_eq!(operations2_it.next().unwrap().opstamp, 2);
// assert!(operations_it.next().is_none());
// assert_eq!(operations2_it.next().unwrap().opstamp, 3);
// assert!(operations2_it.next().is_none());
// }
}
}

View File

@@ -165,7 +165,7 @@ pub fn compute_deleted_bitset(
let mut might_have_changed = false;
loop {
if let Some(delete_op) = delete_cursor.get() {
if delete_op.opstamp > target_opstamp {
break;
}

View File

@@ -14,9 +14,6 @@ mod doc_opstamp_mapping;
pub mod operation;
mod stamper;
// TODO avoid exposing SegmentState / SegmentEntry if it does not have to be public API
pub use self::segment_entry::{SegmentEntry, SegmentState};
pub use self::segment_serializer::SegmentSerializer;
pub use self::segment_writer::SegmentWriter;

View File

@@ -359,7 +359,8 @@ impl SegmentUpdater {
self.run_async(move |segment_updater| {
debug!("End merge {:?}", after_merge_segment_entry.meta());
let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone();
if let Some(delete_operation) = delete_cursor.get() {
let committed_opstamp = segment_updater.0.index.opstamp();
if delete_operation.opstamp < committed_opstamp {
let segment = segment_updater.0.index.segment(after_merge_segment_entry.meta().clone());