issue/43 made the delete queue shareable

This commit is contained in:
Paul Masurel
2017-02-15 11:52:31 +09:00
parent d007cf3435
commit 8b68f22be1
3 changed files with 55 additions and 28 deletions

View File

@@ -1,31 +1,59 @@
use super::operation::DeleteOperation;
use std::sync::{Arc, RwLock};
use std::mem;
// TODO remove clone
#[derive(Clone)]
pub struct DeleteQueue {
delete_operations: Vec<DeleteOperation>,
/// This implementation assumes that we
/// have a lot more write operation than read operations.
#[derive(Default)]
struct InnerDeleteQueue {
ro_chunks: ReadOnlyDeletes,
last_chunk: Vec<DeleteOperation>,
}
impl InnerDeleteQueue {
pub fn push(&mut self, delete_operation: DeleteOperation) {
self.last_chunk.push(delete_operation);
}
pub fn operations(&mut self,) -> ReadOnlyDeletes {
if self.last_chunk.len() > 0 {
let new_operations = vec!();
let new_ro_chunk = mem::replace(&mut self.last_chunk, new_operations);
self.ro_chunks.push(new_ro_chunk)
}
self.ro_chunks.clone()
}
}
#[derive(Default, Clone)]
pub struct ReadOnlyDeletes(Vec<Arc<Vec<DeleteOperation>>>);
impl ReadOnlyDeletes {
fn push(&mut self, operations: Vec<DeleteOperation>) {
self.0.push(Arc::new(operations));
}
pub fn iter<'a>(&'a self) -> impl Iterator<Item=&'a DeleteOperation> {
self.0
.iter()
.flat_map(|chunk| chunk.iter())
}
}
#[derive(Clone, Default)]
pub struct DeleteQueue(Arc<RwLock<InnerDeleteQueue>>);
impl DeleteQueue {
pub fn new() -> DeleteQueue {
DeleteQueue {
delete_operations: vec!(),
}
}
pub fn push_op(&mut self, delete_operation: DeleteOperation) {
self.delete_operations.push(delete_operation);
pub fn push(&self, delete_operation: DeleteOperation) {
self.0.write().unwrap().push(delete_operation);
}
pub fn operations(&self,) -> impl Iterator<Item=DeleteOperation> {
// TODO fix iterator
self.delete_operations.clone().into_iter()
pub fn operations(&self) -> ReadOnlyDeletes {
self.0.write().unwrap().operations()
}
}
#[cfg(test)]
mod tests {
@@ -34,7 +62,7 @@ mod tests {
#[test]
fn test_deletequeue() {
let mut delete_queue = DeleteQueue::new();
let delete_queue = DeleteQueue::default();
let make_op = |i: usize| {
let field = Field(1u8);
@@ -44,8 +72,8 @@ mod tests {
}
};
delete_queue.push_op(make_op(1));
delete_queue.push_op(make_op(2));
delete_queue.push(make_op(1));
delete_queue.push(make_op(2));
// TODO unit tests

View File

@@ -130,7 +130,8 @@ pub fn advance_deletes(
let mut last_opstamp_opt: Option<u64> = None;
for delete_op in delete_queue.operations() {
let delete_operations = delete_queue.operations();
for delete_op in delete_operations.iter() {
// A delete operation should only affect
// document that were inserted after it.
//
@@ -316,7 +317,7 @@ impl IndexWriter {
chan::sync(PIPELINE_MAX_SIZE_IN_DOCS);
let delete_queue = DeleteQueue::new();
let delete_queue = DeleteQueue::default();
let segment_updater = SegmentUpdater::new(index.clone())?;
@@ -437,7 +438,7 @@ impl IndexWriter {
Error::ErrorInThread("Error while waiting for rollback.".to_string())
)?;
self.delete_queue = DeleteQueue::new();
self.delete_queue = DeleteQueue::default();
// reset the opstamp
self.uncommitted_opstamp = self.committed_opstamp;
@@ -488,7 +489,7 @@ impl IndexWriter {
// committed segments.
self.committed_opstamp = self.stamp();
let new_delete_queue = DeleteQueue::new();
let new_delete_queue = DeleteQueue::default();
// TODO remove clone
let future = self.segment_updater.commit(self.delete_queue.clone(), self.committed_opstamp);
@@ -508,7 +509,7 @@ impl IndexWriter {
opstamp: opstamp,
term: term,
};
self.delete_queue.push_op(delete_operation);
self.delete_queue.push(delete_operation);
}
fn stamp(&mut self) -> u64 {

View File

@@ -28,8 +28,6 @@ extern crate log;
#[macro_use]
extern crate version;
#[macro_use]
extern crate fst;
extern crate byteorder;
extern crate memmap;