issue/43 added delete_queue right in the segment updater

This commit is contained in:
Paul Masurel
2017-02-16 10:20:29 +09:00
parent 8b68f22be1
commit 1c03d98a11
3 changed files with 59 additions and 49 deletions

View File

@@ -16,7 +16,7 @@ impl InnerDeleteQueue {
self.last_chunk.push(delete_operation);
}
pub fn operations(&mut self,) -> ReadOnlyDeletes {
pub fn snapshot(&mut self,) -> ReadOnlyDeletes {
if self.last_chunk.len() > 0 {
let new_operations = vec!();
let new_ro_chunk = mem::replace(&mut self.last_chunk, new_operations);
@@ -24,6 +24,11 @@ impl InnerDeleteQueue {
}
self.ro_chunks.clone()
}
pub fn clear(&mut self) {
self.ro_chunks.clear();
self.last_chunk.clear();
}
}
#[derive(Default, Clone)]
@@ -39,6 +44,10 @@ impl ReadOnlyDeletes {
.iter()
.flat_map(|chunk| chunk.iter())
}
pub fn clear(&mut self) {
self.0.clear();
}
}
#[derive(Clone, Default)]
@@ -49,8 +58,12 @@ impl DeleteQueue {
self.0.write().unwrap().push(delete_operation);
}
pub fn operations(&self) -> ReadOnlyDeletes {
self.0.write().unwrap().operations()
pub fn snapshot(&self) -> ReadOnlyDeletes {
self.0.write().unwrap().snapshot()
}
pub fn clear(&self) {
self.0.write().unwrap().clear();
}
}
@@ -74,40 +87,35 @@ mod tests {
delete_queue.push(make_op(1));
delete_queue.push(make_op(2));
// TODO unit tests
// let mut delete_cursor_3 = delete_queue.cursor();
// let mut delete_cursor_3_b = delete_cursor_3.clone();
// assert!(delete_cursor_3.next().is_none());
// assert!(delete_cursor_3.peek().is_none());
// delete_queue.push_op(make_op(3));
// delete_queue.push_op(make_op(4));
// assert_eq!(delete_cursor_3_b.peek(), Some(make_op(3)));
// let mut delete_cursor_3_c = delete_cursor_3_b.clone();
// assert_eq!(delete_cursor_3_b.next(), Some(make_op(3)));
// let mut delete_cursor_4 = delete_cursor_3_b.clone();
// assert_eq!(delete_cursor_3_b.peek(), Some(make_op(4)));
// assert_eq!(delete_cursor_3_b.next(), Some(make_op(4)));
// assert_eq!(delete_cursor_3_c.next(), Some(make_op(3)));
// assert!(delete_cursor_3_b.next().is_none());
// assert_eq!(delete_cursor_3_c.next(), Some(make_op(4)));
// assert!(delete_cursor_3_c.next().is_none());
// assert_eq!(delete_cursor_3.peek(), Some(make_op(3)));
// assert_eq!(delete_cursor_3.next(), Some(make_op(3)));
// assert!(delete_cursor_3_b.next().is_none());
// assert_eq!(delete_cursor_4.next(), Some(make_op(4)));
// assert!(delete_cursor_4.next().is_none());
let snapshot = delete_queue.snapshot();
{
let mut operations_it = snapshot.iter();
assert_eq!(operations_it.next().unwrap().opstamp, 1);
assert_eq!(operations_it.next().unwrap().opstamp, 2);
assert!(operations_it.next().is_none());
}
{ // iterating does not consume results.
let mut operations_it = snapshot.iter();
assert_eq!(operations_it.next().unwrap().opstamp, 1);
assert_eq!(operations_it.next().unwrap().opstamp, 2);
assert!(operations_it.next().is_none());
}
// operations does not own a lock on the queue.
delete_queue.push(make_op(3));
let snapshot2 = delete_queue.snapshot();
{
// operations is not affected by
// the push that occurs after.
let mut operations_it = snapshot.iter();
let mut operations2_it = snapshot2.iter();
assert_eq!(operations_it.next().unwrap().opstamp, 1);
assert_eq!(operations2_it.next().unwrap().opstamp, 1);
assert_eq!(operations_it.next().unwrap().opstamp, 2);
assert_eq!(operations2_it.next().unwrap().opstamp, 2);
assert!(operations_it.next().is_none());
assert_eq!(operations2_it.next().unwrap().opstamp, 3);
assert!(operations2_it.next().is_none());
}
}
}

View File

@@ -130,7 +130,7 @@ pub fn advance_deletes(
let mut last_opstamp_opt: Option<u64> = None;
let delete_operations = delete_queue.operations();
let delete_operations = delete_queue.snapshot();
for delete_op in delete_operations.iter() {
// A delete operation should only affect
// document that were inserted after it.
@@ -319,7 +319,7 @@ impl IndexWriter {
let delete_queue = DeleteQueue::default();
let segment_updater = SegmentUpdater::new(index.clone())?;
let segment_updater = SegmentUpdater::new(index.clone(), delete_queue.clone())?;
let mut index_writer = IndexWriter {
@@ -438,7 +438,7 @@ impl IndexWriter {
Error::ErrorInThread("Error while waiting for rollback.".to_string())
)?;
self.delete_queue = DeleteQueue::default();
self.delete_queue.clear();
// reset the opstamp
self.uncommitted_opstamp = self.committed_opstamp;
@@ -489,16 +489,14 @@ impl IndexWriter {
// committed segments.
self.committed_opstamp = self.stamp();
let new_delete_queue = DeleteQueue::default();
// TODO remove clone
let future = self.segment_updater.commit(self.delete_queue.clone(), self.committed_opstamp);
let future = self.segment_updater.commit(self.committed_opstamp);
// wait for the segment update thread to have processed the info
// TODO remove unwrap
future.wait().unwrap();
self.delete_queue = new_delete_queue;
self.delete_queue.clear();
Ok(self.committed_opstamp)
}

View File

@@ -102,11 +102,12 @@ struct InnerSegmentUpdater {
merging_thread_id: AtomicUsize,
merging_threads: RwLock<HashMap<usize, JoinHandle<Result<SegmentEntry>>>>,
generation: AtomicUsize,
delete_queue: DeleteQueue,
}
impl SegmentUpdater {
pub fn new(index: Index) -> Result<SegmentUpdater>
pub fn new(index: Index, delete_queue: DeleteQueue) -> Result<SegmentUpdater>
{
let segments = index.segments()?;
let segment_manager = SegmentManager::from_segments(segments);
@@ -119,6 +120,7 @@ impl SegmentUpdater {
merging_thread_id: AtomicUsize::default(),
merging_threads: RwLock::new(HashMap::new()),
generation: AtomicUsize::default(),
delete_queue: delete_queue,
}))
)
}
@@ -163,21 +165,21 @@ impl SegmentUpdater {
}
}
fn purge_deletes(&self, delete_queue: &DeleteQueue) -> Result<Vec<SegmentMeta>> {
fn purge_deletes(&self) -> Result<Vec<SegmentMeta>> {
self.0.segment_manager
.segment_entries()
.into_iter()
.map(|segment_entry| {
let mut segment = self.0.index.segment(segment_entry.meta().clone());
advance_deletes(&mut segment, delete_queue, segment_entry.doc_to_opstamp())
advance_deletes(&mut segment, &self.0.delete_queue, segment_entry.doc_to_opstamp())
.map(|entry| entry.meta().clone())
})
.collect()
}
pub fn commit(&self, delete_queue: DeleteQueue, opstamp: u64) -> impl Future<Item=(), Error=&'static str> {
pub fn commit(&self, opstamp: u64) -> impl Future<Item=(), Error=&'static str> {
self.run_async(move |segment_updater| {
let segment_metas = segment_updater.purge_deletes(&delete_queue).expect("Failed purge deletes");
let segment_metas = segment_updater.purge_deletes().expect("Failed purge deletes");
let segment_entries = segment_metas
.into_iter()
.map(SegmentEntry::new)
@@ -210,6 +212,8 @@ impl SegmentUpdater {
let merging_join_handle = thread::spawn(move || {
// first we need to apply deletes to our segment.
info!("Start merge: {:?}", segment_ids_vec);
let ref index = segment_updater_clone.0.index;