diff --git a/src/indexer/delete_queue.rs b/src/indexer/delete_queue.rs index 634a81dc6..beb026654 100644 --- a/src/indexer/delete_queue.rs +++ b/src/indexer/delete_queue.rs @@ -15,6 +15,13 @@ pub struct DeleteCursor { operations: InnerDeleteQueue, } +impl DeleteCursor { + pub fn go_to_tail(&mut self,) { + let read = self.operations.read().unwrap(); + self.cursor = read.len(); + } +} + // TODO remove copy impl Iterator for DeleteCursor { @@ -37,6 +44,11 @@ impl Iterator for DeleteCursor { pub struct DeleteQueue(InnerDeleteQueue); impl DeleteQueue { + + pub fn new() -> DeleteQueue { + DeleteQueue::default() + } + pub fn push(&self, delete_operation: DeleteOperation) { self.0.write().unwrap().push(delete_operation); } @@ -61,7 +73,7 @@ mod tests { #[test] fn test_deletequeue() { - let delete_queue = DeleteQueue::default(); + let delete_queue = DeleteQueue::new(); let make_op = |i: usize| { let field = Field(1u8); diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 732b49bcb..3b77cbd13 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -10,10 +10,9 @@ use datastruct::stacker::Heap; use Error; use Directory; use fastfield::delete::write_delete_bitset; -use indexer::delete_queue::DeleteCursor; +use indexer::delete_queue::{DeleteCursor, DeleteQueue}; use futures::Canceled; use futures::Future; -use indexer::delete_queue::DeleteQueue; use indexer::doc_opstamp_mapping::DocToOpstampMapping; use indexer::MergePolicy; use indexer::operation::DeleteOperation; @@ -117,9 +116,9 @@ pub fn open_index_writer(index: &Index, chan::sync(PIPELINE_MAX_SIZE_IN_DOCS); - let delete_queue = DeleteQueue::default(); + let delete_queue = DeleteQueue::new(); - let segment_updater = SegmentUpdater::new(index.clone())?; + let segment_updater = SegmentUpdater::new(index.clone(), delete_queue.cursor())?; let mut index_writer = IndexWriter { @@ -156,12 +155,12 @@ pub fn open_index_writer(index: &Index, // TODO skip delete operation before teh // last delete opstamp -pub fn advance_deletes( - segment: &mut Segment, - delete_cursor: DeleteCursor, - doc_opstamps: &DocToOpstampMapping) -> Result { +pub fn advance_deletes(mut segment: Segment, segment_entry: &mut SegmentEntry) -> Result<()> { + + { + let doc_opstamps = segment_entry.reset_doc_to_stamp(); + let delete_cursor = segment_entry.delete_cursor(); - let segment_reader = SegmentReader::open(segment.clone())?; let mut delete_bitset = BitSet::with_capacity(segment_reader.max_doc() as usize); @@ -172,6 +171,8 @@ pub fn advance_deletes( for delete_op in delete_cursor { + println!("opstamp {:?}", delete_op.opstamp); + // let's skip operations that have already been deleted.0u32 if let Some(previous_delete_opstamp) = previous_delete_opstamp_opt { if delete_op.opstamp <= previous_delete_opstamp { @@ -211,8 +212,10 @@ pub fn advance_deletes( let mut delete_file = segment.open_write(SegmentComponent::DELETE)?; write_delete_bitset(&delete_bitset, &mut delete_file)?; } + } + segment_entry.set_meta(segment.meta().clone()); - Ok(segment.meta().clone()) + Ok(()) } fn index_documents(heap: &mut Heap, @@ -220,7 +223,8 @@ fn index_documents(heap: &mut Heap, schema: &Schema, generation: usize, document_iterator: &mut Iterator, - segment_updater: &mut SegmentUpdater) + segment_updater: &mut SegmentUpdater, + delete_cursor: DeleteCursor) -> Result { heap.clear(); let segment_id = segment.id(); @@ -245,9 +249,9 @@ fn index_documents(heap: &mut Heap, let mut segment_meta = SegmentMeta::new(segment_id); segment_meta.set_max_doc(num_docs); - let mut segment_entry = SegmentEntry::new(segment_meta); + let mut segment_entry = SegmentEntry::new(segment_meta, delete_cursor); segment_entry.set_doc_to_opstamp(DocToOpstampMapping::from(doc_opstamps)); - + segment_updater .add_segment(generation, segment_entry) .wait() @@ -292,6 +296,8 @@ impl IndexWriter { let mut heap = Heap::with_capacity(self.heap_size_in_bytes_per_thread); let generation = self.generation; + + let mut delete_cursor = self.delete_queue.cursor(); let join_handle: JoinHandle> = thread::Builder::new() @@ -299,9 +305,14 @@ impl IndexWriter { .spawn(move || { loop { + + let mut document_iterator = document_receiver_clone.clone() .into_iter() .peekable(); + + // we consume all previous delete operations. + delete_cursor.go_to_tail(); // the peeking here is to avoid // creating a new segment's files @@ -317,7 +328,8 @@ impl IndexWriter { &schema, generation, &mut document_iterator, - &mut segment_updater)?; + &mut segment_updater, + delete_cursor.clone())?; } else { // No more documents. @@ -480,7 +492,7 @@ impl IndexWriter { // wait for the segment update thread to have processed the info self.segment_updater - .commit(self.committed_opstamp, self.delete_queue.cursor()) + .commit(self.committed_opstamp) .wait()?; self.delete_queue.clear(); diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index b7284af96..0a4c21342 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -494,6 +494,7 @@ mod tests { let searcher = index.searcher(); assert_eq!(searcher.segment_readers().len(), 2); assert_eq!(searcher.num_docs(), 3); + assert_eq!(searcher.segment_readers()[0].num_docs(), 1); assert_eq!(searcher.segment_readers()[0].max_doc(), 3); assert_eq!(searcher.segment_readers()[1].num_docs(), 2); diff --git a/src/indexer/segment_entry.rs b/src/indexer/segment_entry.rs index 0c12e72ba..e18e84d47 100644 --- a/src/indexer/segment_entry.rs +++ b/src/indexer/segment_entry.rs @@ -1,7 +1,9 @@ use indexer::doc_opstamp_mapping::DocToOpstampMapping; use core::SegmentMeta; +use indexer::delete_queue::DeleteCursor; use core::SegmentId; use std::fmt; +use std::mem; #[derive(Clone, Copy, PartialEq, Eq, Debug)] pub enum SegmentState { @@ -23,20 +25,32 @@ pub struct SegmentEntry { meta: SegmentMeta, state: SegmentState, doc_to_opstamp: DocToOpstampMapping, + delete_cursor: DeleteCursor, + } impl SegmentEntry { - pub fn new(segment_meta: SegmentMeta) -> SegmentEntry { + pub fn new(segment_meta: SegmentMeta, + delete_cursor: DeleteCursor) -> SegmentEntry { SegmentEntry { meta: segment_meta, state: SegmentState::Ready, doc_to_opstamp: DocToOpstampMapping::None, + delete_cursor: delete_cursor, } } - pub fn doc_to_opstamp(&self) -> &DocToOpstampMapping { - &self.doc_to_opstamp + pub fn reset_doc_to_stamp(&mut self,) -> DocToOpstampMapping { + mem::replace(&mut self.doc_to_opstamp, DocToOpstampMapping::None) + } + + pub fn set_meta(&mut self, segment_meta: SegmentMeta) { + self.meta = segment_meta; + } + + pub fn delete_cursor(&mut self) -> &mut DeleteCursor { + &mut self.delete_cursor } pub fn state(&self) -> SegmentState { diff --git a/src/indexer/segment_manager.rs b/src/indexer/segment_manager.rs index 1991ebda4..7ac353fb5 100644 --- a/src/indexer/segment_manager.rs +++ b/src/indexer/segment_manager.rs @@ -8,6 +8,7 @@ use std::path::PathBuf; use std::collections::hash_set::HashSet; use std::sync::{RwLockReadGuard, RwLockWriteGuard}; use std::fmt::{self, Debug, Formatter}; +use indexer::delete_queue::DeleteCursor; #[derive(Default)] struct SegmentRegisters { @@ -49,11 +50,11 @@ pub fn get_segments(segment_manager: &SegmentManager,) -> (Vec, Vec impl SegmentManager { - pub fn from_segments(segment_metas: Vec) -> SegmentManager { + pub fn from_segments(segment_metas: Vec, delete_cursor: DeleteCursor) -> SegmentManager { SegmentManager { registers: RwLock::new(SegmentRegisters { uncommitted: SegmentRegister::default(), - committed: SegmentRegister::new(segment_metas), + committed: SegmentRegister::new(segment_metas, delete_cursor), writing: HashSet::new(), }), } @@ -127,22 +128,19 @@ impl SegmentManager { segment_ids } - pub fn commit(&self, segment_metas: Vec) { - let committed_segment_entries = segment_metas - .into_iter() - .map(|segment_meta| { - let segment_id = segment_meta.id(); - let mut segment_entry = SegmentEntry::new(segment_meta); - if let Some(state) = self.segment_state(&segment_id) { - segment_entry.set_state(state); - } - segment_entry - }) - .collect::>(); + pub fn commit(&self, mut segment_entries: Vec) { + // TODO is still relevant!? + // restore the state of the segment_entries + for segment_entry in &mut segment_entries { + let segment_id = segment_entry.segment_id(); + if let Some(state) = self.segment_state(&segment_id) { + segment_entry.set_state(state); + } + } let mut registers_lock = self.write(); registers_lock.committed.clear(); registers_lock.uncommitted.clear(); - for segment_entry in committed_segment_entries { + for segment_entry in segment_entries { registers_lock.committed.add_segment_entry(segment_entry); } } @@ -175,21 +173,23 @@ impl SegmentManager { registers_lock.uncommitted.add_segment_entry(segment_entry); } - pub fn end_merge(&self, merged_segment_metas: &[SegmentMeta], merged_segment_meta: SegmentMeta) { + pub fn end_merge(&self, + before_merge_segment_ids: &[SegmentId], + after_merge_segment_entry: SegmentEntry) { + let mut registers_lock = self.write(); - let merged_segment_ids: Vec = merged_segment_metas.iter().map(|meta| meta.id()).collect(); - let merged_segment_entry = SegmentEntry::new(merged_segment_meta); - if registers_lock.uncommitted.contains_all(&merged_segment_ids) { - for segment_id in &merged_segment_ids { + + if registers_lock.uncommitted.contains_all(&before_merge_segment_ids) { + for segment_id in before_merge_segment_ids { registers_lock.uncommitted.remove_segment(segment_id); } - registers_lock.uncommitted.add_segment_entry(merged_segment_entry); + registers_lock.uncommitted.add_segment_entry(after_merge_segment_entry); } - else if registers_lock.committed.contains_all(&merged_segment_ids) { - for segment_id in &merged_segment_ids { + else if registers_lock.committed.contains_all(&before_merge_segment_ids) { + for segment_id in before_merge_segment_ids { registers_lock.committed.remove_segment(segment_id); } - registers_lock.committed.add_segment_entry(merged_segment_entry); + registers_lock.committed.add_segment_entry(after_merge_segment_entry); } else { warn!("couldn't find segment in SegmentManager"); } diff --git a/src/indexer/segment_register.rs b/src/indexer/segment_register.rs index b618cade2..367babbb8 100644 --- a/src/indexer/segment_register.rs +++ b/src/indexer/segment_register.rs @@ -4,6 +4,7 @@ use core::SegmentMeta; use std::fmt; use std::fmt::{Debug, Formatter}; use indexer::segment_entry::SegmentEntry; +use indexer::delete_queue::DeleteCursor; /// The segment register keeps track /// of the list of segment, their size as well @@ -95,16 +96,15 @@ impl SegmentRegister { .start_merge(); } - pub fn new(segment_metas: Vec) -> SegmentRegister { + pub fn new(segment_metas: Vec, delete_cursor: DeleteCursor) -> SegmentRegister { + let mut segment_states = HashMap::new(); + for segment_meta in segment_metas { + let segment_id = segment_meta.id(); + let segment_entry = SegmentEntry::new(segment_meta, delete_cursor.clone()); + segment_states.insert(segment_id, segment_entry); + } SegmentRegister { - segment_states: segment_metas - .into_iter() - .map(|segment_meta| { - let segment_id = segment_meta.id(); - let segment_entry = SegmentEntry::new(segment_meta ); - (segment_id, segment_entry) - }) - .collect(), + segment_states: segment_states } } } @@ -115,10 +115,13 @@ mod tests { use indexer::SegmentState; use core::SegmentId; use core::SegmentMeta; + use indexer::delete_queue::*; use super::*; #[test] fn test_segment_register() { + let delete_queue = DeleteQueue::new(); + let mut segment_register = SegmentRegister::default(); let segment_id_a = SegmentId::generate_random(); let segment_id_b = SegmentId::generate_random(); @@ -126,14 +129,14 @@ mod tests { { let segment_meta = SegmentMeta::new(segment_id_a); - let segment_entry = SegmentEntry::new(segment_meta); + let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor()); segment_register.add_segment_entry(segment_entry); } assert_eq!(segment_register.segment_entry(&segment_id_a).unwrap().state(), SegmentState::Ready); assert_eq!(segment_register.segment_ids(), vec!(segment_id_a)); { let segment_meta = SegmentMeta::new(segment_id_b); - let segment_entry = SegmentEntry::new(segment_meta); + let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor()); segment_register.add_segment_entry(segment_entry); } assert_eq!(segment_register.segment_entry(&segment_id_b).unwrap().state(), SegmentState::Ready); @@ -145,7 +148,7 @@ mod tests { segment_register.remove_segment(&segment_id_b); { let segment_meta_merged = SegmentMeta::new(segment_id_merged); - let segment_entry = SegmentEntry::new(segment_meta_merged); + let segment_entry = SegmentEntry::new(segment_meta_merged, delete_queue.cursor()); segment_register.add_segment_entry(segment_entry); } assert_eq!(segment_register.segment_ids(), vec!(segment_id_merged)); diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index e57c0fece..3984b8e86 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -103,9 +103,9 @@ struct InnerSegmentUpdater { impl SegmentUpdater { - pub fn new(index: Index) -> Result { + pub fn new(index: Index, delete_cursor: DeleteCursor) -> Result { let segments = index.segments()?; - let segment_manager = SegmentManager::from_segments(segments); + let segment_manager = SegmentManager::from_segments(segments, delete_cursor); Ok( SegmentUpdater(Arc::new(InnerSegmentUpdater { pool: CpuPool::new(1), @@ -168,23 +168,22 @@ impl SegmentUpdater { } } - fn purge_deletes(&self, delete_cursor: DeleteCursor) -> Result> { - let mut segment_metas = vec!(); - for segment_entry in self.0.segment_manager.segment_entries() { - let mut segment = self.0.index.segment(segment_entry.meta().clone()); - let delete_cursor = delete_cursor.clone(); - // TODO delete cursor skip... - let segment_meta = advance_deletes(&mut segment, delete_cursor, segment_entry.doc_to_opstamp())?; - segment_metas.push(segment_meta); + fn purge_deletes(&self) -> Result> { + let mut segment_entries = self.0.segment_manager.segment_entries(); + for segment_entry in &mut segment_entries { + let segment = self.0.index.segment(segment_entry.meta().clone()); + advance_deletes(segment, segment_entry)?; } - Ok(segment_metas) + Ok(segment_entries) } - pub fn commit(&self, opstamp: u64, delete_cursor: DeleteCursor) -> impl Future { + pub fn commit(&self, opstamp: u64) -> impl Future { self.run_async(move |segment_updater| { - let segment_metas = segment_updater.purge_deletes(delete_cursor).expect("Failed purge deletes"); - segment_updater.0.segment_manager.commit(segment_metas); + let segment_entries = segment_updater + .purge_deletes() + .expect("Failed purge deletes"); + segment_updater.0.segment_manager.commit(segment_entries); let mut index = segment_updater.0.index.clone(); { let directory = index.directory(); @@ -226,22 +225,20 @@ impl SegmentUpdater { let ref index = segment_updater_clone.0.index; let schema = index.schema(); - let mut segment_metas = vec!(); + let mut segment_entries = vec!(); + for segment_id in &segment_ids_vec { - if let Some(segment_entry) = segment_updater_clone.0 + if let Some(mut segment_entry) = segment_updater_clone.0 .segment_manager .segment_entry(segment_id) { // TODOS make sure that the segment are in the same // position with regard to deletes. - // let mut segment = index.segment(segment_entry.meta().clone()); - // let segment_meta = advance_deletes( - // &mut segment, - // &delete_operations, - // segment_entry.doc_to_opstamp())?; - let segment_meta = segment_entry.meta().clone(); - segment_metas.push(segment_meta); + let segment = index.segment(segment_entry.meta().clone()); + advance_deletes(segment, &mut segment_entry)?; + + segment_entries.push(segment_entry); } else { error!("Error, had to abort merge as some of the segment is not managed anymore.a"); @@ -249,10 +246,13 @@ impl SegmentUpdater { } } - let segments: Vec = segment_metas + let delete_cursor = segment_entries[0].delete_cursor().clone(); + + let segments: Vec = segment_entries .iter() - .cloned() - .map(|segment_meta| index.segment(segment_meta)) + .map(|segment_entry| { + index.segment(segment_entry.meta().clone()) + }) .collect(); // An IndexMerger is like a "view" of our merged segments. @@ -262,16 +262,27 @@ impl SegmentUpdater { // ... we just serialize this index merger in our new segment // to merge the two segments. - let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment).expect("Creating index serializer failed"); + let segment_serializer = + SegmentSerializer + ::for_segment(&mut merged_segment) + .expect("Creating index serializer failed"); let num_docs = merger.write(segment_serializer).expect("Serializing merged index failed"); let mut segment_meta = SegmentMeta::new(merged_segment.id()); segment_meta.set_max_doc(num_docs); + let before_merged_segment_ids = segment_entries + .iter() + .map(|segment_entry| segment_entry.segment_id()) + .collect::>(); + + let after_merge_segment_entry = SegmentEntry::new(segment_meta.clone(), delete_cursor); + segment_updater_clone - .end_merge(segment_metas.clone(), segment_meta.clone()) + .end_merge(before_merged_segment_ids, after_merge_segment_entry) .wait() .unwrap(); + merging_future_send.complete(segment_meta); segment_updater_clone.0.merging_threads.write().unwrap().remove(&merging_thread_id); Ok(()) @@ -296,11 +307,11 @@ impl SegmentUpdater { fn end_merge(&self, - merged_segment_metas: Vec, - segment_meta: SegmentMeta) -> impl Future { + before_merge_segment_ids: Vec, + after_merge_segment_entry: SegmentEntry) -> impl Future { self.run_async(move |segment_updater| { - segment_updater.0.segment_manager.end_merge(&merged_segment_metas, segment_meta); + segment_updater.0.segment_manager.end_merge(&before_merge_segment_ids, after_merge_segment_entry); let mut directory = segment_updater.0.index.directory().box_clone(); let segment_metas = segment_updater.0.segment_manager.committed_segment_metas(); save_metas(