Baby step1

Paul Masurel
2017-03-11 14:20:46 +09:00
parent b7f026bab9
commit 77c61ddab2
3 changed files with 83 additions and 93 deletions

View File

@@ -1,71 +1,55 @@
 use super::operation::DeleteOperation;
 use std::sync::{Arc, RwLock};
-use std::mem;
-/// This implementation assumes that we
-/// have a lot more write operation than read operations.
-#[derive(Default)]
-struct InnerDeleteQueue {
-    ro_chunks: DeleteQueueSnapshot,
-    last_chunk: Vec<DeleteOperation>,
+type InnerDeleteQueue = Arc<RwLock<Vec<DeleteOperation>>>;
+// TODO very inefficient.
+// fix this once the refactoring/bugfix is done
+#[derive(Clone)]
+pub struct DeleteCursor {
+    cursor: usize,
+    operations: InnerDeleteQueue,
 }
-impl InnerDeleteQueue {
-    pub fn push(&mut self, delete_operation: DeleteOperation) {
-        self.last_chunk.push(delete_operation);
-    }
-    pub fn snapshot(&mut self,) -> DeleteQueueSnapshot {
-        if self.last_chunk.len() > 0 {
-            let new_operations = vec!();
-            let new_ro_chunk = mem::replace(&mut self.last_chunk, new_operations);
-            self.ro_chunks.push(new_ro_chunk)
+// TODO remove copy
+impl Iterator for DeleteCursor {
+    type Item=DeleteOperation;
+    fn next(&mut self) -> Option<DeleteOperation >{
+        let read = self.operations.read().unwrap();
+        if self.cursor >= read.len() {
+            None
+        }
+        else {
+            let operation = read[self.cursor].clone();
+            self.cursor += 1;
+            Some(operation)
         }
-        self.ro_chunks.clone()
     }
-    pub fn clear(&mut self) {
-        self.ro_chunks.clear();
-        self.last_chunk.clear();
-    }
 }
-#[derive(Default, Clone)]
-pub struct DeleteQueueSnapshot(Vec<Arc<Vec<DeleteOperation>>>);
-impl DeleteQueueSnapshot {
-    fn push(&mut self, operations: Vec<DeleteOperation>) {
-        self.0.push(Arc::new(operations));
-    }
-    pub fn iter<'a>(&'a self) -> impl Iterator<Item=&'a DeleteOperation> {
-        self.0
-            .iter()
-            .flat_map(|chunk| chunk.iter())
-    }
-    pub fn clear(&mut self) {
-        self.0.clear();
-    }
-}
 #[derive(Clone, Default)]
-pub struct DeleteQueue(Arc<RwLock<InnerDeleteQueue>>);
+pub struct DeleteQueue(InnerDeleteQueue);
 impl DeleteQueue {
     pub fn push(&self, delete_operation: DeleteOperation) {
         self.0.write().unwrap().push(delete_operation);
     }
-    pub fn snapshot(&self) -> DeleteQueueSnapshot {
-        self.0.write().unwrap().snapshot()
+    pub fn clear(&mut self) {
+        self.0.write().unwrap().clear();
     }
-    pub fn clear(&self) {
-        self.0.write().unwrap().clear();
+    pub fn cursor(&self) -> DeleteCursor {
+        DeleteCursor {
+            cursor: 0,
+            operations: self.0.clone(),
+        }
     }
 }
@@ -90,34 +74,35 @@ mod tests {
         delete_queue.push(make_op(1));
         delete_queue.push(make_op(2));
-        let snapshot = delete_queue.snapshot();
+        let snapshot = delete_queue.cursor();
         {
-            let mut operations_it = snapshot.iter();
+            let mut operations_it = snapshot.clone();
             assert_eq!(operations_it.next().unwrap().opstamp, 1);
             assert_eq!(operations_it.next().unwrap().opstamp, 2);
             assert!(operations_it.next().is_none());
         }
-        { // iterating does not consume results.
-            let mut operations_it = snapshot.iter();
+        {
+            let mut operations_it = snapshot.clone();
             assert_eq!(operations_it.next().unwrap().opstamp, 1);
             assert_eq!(operations_it.next().unwrap().opstamp, 2);
             assert!(operations_it.next().is_none());
         }
-        // operations does not own a lock on the queue.
-        delete_queue.push(make_op(3));
-        let snapshot2 = delete_queue.snapshot();
-        {
-            // operations is not affected by
-            // the push that occurs after.
-            let mut operations_it = snapshot.iter();
-            let mut operations2_it = snapshot2.iter();
-            assert_eq!(operations_it.next().unwrap().opstamp, 1);
-            assert_eq!(operations2_it.next().unwrap().opstamp, 1);
-            assert_eq!(operations_it.next().unwrap().opstamp, 2);
-            assert_eq!(operations2_it.next().unwrap().opstamp, 2);
-            assert!(operations_it.next().is_none());
-            assert_eq!(operations2_it.next().unwrap().opstamp, 3);
-            assert!(operations2_it.next().is_none());
-        }
+        // // operations does not own a lock on the queue.
+        // delete_queue.push(make_op(3));
+        // let snapshot2 = delete_queue.snapshot();
+        // {
+        // // operations is not affected by
+        // // the push that occurs after.
+        // let mut operations_it = snapshot.iter();
+        // let mut operations2_it = snapshot2.iter();
+        // assert_eq!(operations_it.next().unwrap().opstamp, 1);
+        // assert_eq!(operations2_it.next().unwrap().opstamp, 1);
+        // assert_eq!(operations_it.next().unwrap().opstamp, 2);
+        // assert_eq!(operations2_it.next().unwrap().opstamp, 2);
+        // assert!(operations_it.next().is_none());
+        // assert_eq!(operations2_it.next().unwrap().opstamp, 3);
+        // assert!(operations2_it.next().is_none());
+        // }
     }
 }
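
For illustration, a minimal usage sketch of the new cursor-based API, assuming the DeleteQueue / DeleteCursor above and the tests' make_op helper. Unlike the removed DeleteQueueSnapshot, a DeleteCursor re-reads the shared Arc<RwLock<Vec<DeleteOperation>>> on every next() call, so operations pushed after a cursor was created are still visible to it; this is why the snapshot-isolation assertions in the test are now commented out rather than ported.

// Sketch only: assumes the DeleteQueue / DeleteCursor above and the tests' make_op helper.
let delete_queue = DeleteQueue::default();
delete_queue.push(make_op(1));
delete_queue.push(make_op(2));

// Cloning a cursor copies only its position and the Arc, not the operations themselves.
let cursor = delete_queue.cursor();
let opstamps: Vec<_> = cursor.clone().map(|op| op.opstamp).collect();
assert_eq!(opstamps, vec![1, 2]);

// A push made after the cursor was created is still visible, because next()
// reads through the shared Vec rather than a frozen snapshot.
delete_queue.push(make_op(3));
let opstamps: Vec<_> = cursor.map(|op| op.opstamp).collect();
assert_eq!(opstamps, vec![1, 2, 3]);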

View File

@@ -10,7 +10,7 @@ use datastruct::stacker::Heap;
 use Error;
 use Directory;
 use fastfield::delete::write_delete_bitset;
-use indexer::delete_queue::DeleteQueueSnapshot;
+use indexer::delete_queue::DeleteCursor;
 use futures::Canceled;
 use futures::Future;
 use indexer::delete_queue::DeleteQueue;
@@ -119,7 +119,7 @@ pub fn open_index_writer(index: &Index,
     let delete_queue = DeleteQueue::default();
-    let segment_updater = SegmentUpdater::new(index.clone(), delete_queue.clone())?;
+    let segment_updater = SegmentUpdater::new(index.clone())?;
     let mut index_writer = IndexWriter {
@@ -158,7 +158,7 @@ pub fn open_index_writer(index: &Index,
 pub fn advance_deletes(
     segment: &mut Segment,
-    delete_operations: &DeleteQueueSnapshot,
+    delete_cursor: DeleteCursor,
     doc_opstamps: &DocToOpstampMapping) -> Result<SegmentMeta> {
@@ -170,7 +170,7 @@ pub fn advance_deletes(
     let previous_delete_opstamp_opt = segment.meta().delete_opstamp();
-    for delete_op in delete_operations.iter() {
+    for delete_op in delete_cursor {
         // let's skip operations that have already been deleted.0u32
         if let Some(previous_delete_opstamp) = previous_delete_opstamp_opt {
@@ -480,7 +480,7 @@ impl IndexWriter {
         // wait for the segment update thread to have processed the info
         self.segment_updater
-            .commit(self.committed_opstamp)
+            .commit(self.committed_opstamp, self.delete_queue.cursor())
             .wait()?;
         self.delete_queue.clear();
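
As a side note, a rough sketch of how the by-value DeleteCursor can be consumed in advance_deletes-style code. pending_deletes is a hypothetical helper, not part of this commit, and the comparison assumes u64 opstamps and the "skip operations that have already been deleted" rule suggested by the comment above.

// Hypothetical helper, not in the commit: collect the delete operations a segment still
// has to apply, skipping those at or below its previously recorded delete opstamp.
fn pending_deletes(delete_cursor: DeleteCursor,
                   previous_delete_opstamp_opt: Option<u64>) -> Vec<DeleteOperation> {
    delete_cursor
        .filter(|delete_op| match previous_delete_opstamp_opt {
            Some(previous_delete_opstamp) => delete_op.opstamp > previous_delete_opstamp,
            None => true,
        })
        .collect()
}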

View File

@@ -14,7 +14,6 @@ use futures::{Future, future};
 use futures::Canceled;
 use futures::oneshot;
 use indexer::{MergePolicy, DefaultMergePolicy};
-use indexer::delete_queue::DeleteQueue;
 use indexer::index_writer::advance_deletes;
 use indexer::MergeCandidate;
 use indexer::merger::IndexMerger;
@@ -22,6 +21,7 @@ use indexer::SegmentEntry;
 use indexer::SegmentSerializer;
 use Result;
 use rustc_serialize::json;
+use indexer::delete_queue::DeleteCursor;
 use schema::Schema;
 use std::borrow::BorrowMut;
 use std::collections::HashMap;
@@ -99,12 +99,11 @@ struct InnerSegmentUpdater {
     merging_thread_id: AtomicUsize,
     merging_threads: RwLock<HashMap<usize, JoinHandle<Result<()>>>>,
     generation: AtomicUsize,
-    delete_queue: DeleteQueue,
 }
 impl SegmentUpdater {
-    pub fn new(index: Index, delete_queue: DeleteQueue) -> Result<SegmentUpdater> {
+    pub fn new(index: Index) -> Result<SegmentUpdater> {
         let segments = index.segments()?;
         let segment_manager = SegmentManager::from_segments(segments);
         Ok(
@@ -116,7 +115,6 @@ impl SegmentUpdater {
                 merging_thread_id: AtomicUsize::default(),
                 merging_threads: RwLock::new(HashMap::new()),
                 generation: AtomicUsize::default(),
-                delete_queue: delete_queue,
             }))
         )
     }
@@ -170,20 +168,22 @@ impl SegmentUpdater {
         }
     }
-    fn purge_deletes(&self) -> Result<Vec<SegmentMeta>> {
-        self.0.segment_manager
-            .segment_entries()
-            .into_iter()
-            .map(|segment_entry| {
-                let mut segment = self.0.index.segment(segment_entry.meta().clone());
-                advance_deletes(&mut segment, &self.0.delete_queue.snapshot(), segment_entry.doc_to_opstamp())
-            })
-            .collect()
+    fn purge_deletes(&self, delete_cursor: DeleteCursor) -> Result<Vec<SegmentMeta>> {
+        let mut segment_metas = vec!();
+        for segment_entry in self.0.segment_manager.segment_entries() {
+            let mut segment = self.0.index.segment(segment_entry.meta().clone());
+            let delete_cursor = delete_cursor.clone();
+            // TODO delete cursor skip...
+            let segment_meta = advance_deletes(&mut segment, delete_cursor, segment_entry.doc_to_opstamp())?;
+            segment_metas.push(segment_meta);
+        }
+        Ok(segment_metas)
     }
-    pub fn commit(&self, opstamp: u64) -> impl Future<Item=(), Error=Error> {
+    pub fn commit(&self, opstamp: u64, delete_cursor: DeleteCursor) -> impl Future<Item=(), Error=Error> {
         self.run_async(move |segment_updater| {
-            let segment_metas = segment_updater.purge_deletes().expect("Failed purge deletes");
+            let segment_metas = segment_updater.purge_deletes(delete_cursor).expect("Failed purge deletes");
             segment_updater.0.segment_manager.commit(segment_metas);
             let mut index = segment_updater.0.index.clone();
             {
@@ -212,7 +212,7 @@ impl SegmentUpdater {
         let merging_thread_id = self.get_merging_thread_id();
         let (merging_future_send, merging_future_recv) = oneshot();
-        let delete_operations = self.0.delete_queue.snapshot();
+        // let delete_operations = self.0.delete_queue.snapshot();
         if segment_ids.is_empty() {
             return merging_future_recv;
@@ -231,11 +231,16 @@ impl SegmentUpdater {
             if let Some(segment_entry) = segment_updater_clone.0
                 .segment_manager
                 .segment_entry(segment_id) {
-                let mut segment = index.segment(segment_entry.meta().clone());
-                let segment_meta = advance_deletes(
-                    &mut segment,
-                    &delete_operations,
-                    segment_entry.doc_to_opstamp())?;
+                // TODOS make sure that the segment are in the same
+                // position with regard to deletes.
+                // let mut segment = index.segment(segment_entry.meta().clone());
+                // let segment_meta = advance_deletes(
+                // &mut segment,
+                // &delete_operations,
+                // segment_entry.doc_to_opstamp())?;
+                let segment_meta = segment_entry.meta().clone();
                 segment_metas.push(segment_meta);
             }
             else {