From da10fe3b4dfdaa03450fbcb966f278306c7ef4bf Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Mon, 13 Mar 2017 22:01:55 +0900 Subject: [PATCH] Various fixes. --- src/indexer/index_writer.rs | 7 ++ src/indexer/segment_entry.rs | 4 + src/indexer/segment_manager.rs | 74 ++++++++++++------ src/indexer/segment_register.rs | 26 ++++--- src/indexer/segment_updater.rs | 134 ++++++++++++++++++-------------- 5 files changed, 154 insertions(+), 91 deletions(-) diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 56e9a41f8..daa97ffa7 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -504,6 +504,13 @@ impl IndexWriter { opstamp } + /// Returns the opstamp of the last successful commit. + /// + /// This is, for instance, the opstamp the index will + /// rollback to if there is a failure like a power surge. + /// + /// This is also the opstamp of the commit that is currently + /// available for searchers. pub fn commit_opstamp(&self) -> u64 { self.committed_opstamp } diff --git a/src/indexer/segment_entry.rs b/src/indexer/segment_entry.rs index e18e84d47..74b45f7d8 100644 --- a/src/indexer/segment_entry.rs +++ b/src/indexer/segment_entry.rs @@ -77,6 +77,10 @@ impl SegmentEntry { self.state = SegmentState::InMerge; } + pub fn cancel_merge(&mut self,) { + self.state = SegmentState::Ready; + } + pub fn is_ready(&self,) -> bool { self.state == SegmentState::Ready } diff --git a/src/indexer/segment_manager.rs b/src/indexer/segment_manager.rs index 4528a412f..22302d942 100644 --- a/src/indexer/segment_manager.rs +++ b/src/indexer/segment_manager.rs @@ -36,18 +36,6 @@ impl Debug for SegmentManager { } } - -/// Returns the `SegmentMeta`s for (committed segment, uncommitted segments). -/// The result is consistent with other transactions. -/// -/// For instance, a segment will not appear in both committed and uncommitted -/// segments -pub fn get_all_segments(segment_manager: &SegmentManager,) -> (Vec, Vec) { - let registers_lock = segment_manager.read(); - (registers_lock.committed.get_all_segments(), - registers_lock.uncommitted.get_all_segments()) -} - pub fn get_mergeable_segments(segment_manager: &SegmentManager,) -> (Vec, Vec) { let registers_lock = segment_manager.read(); (registers_lock.committed.get_mergeable_segments(), @@ -160,6 +148,40 @@ impl SegmentManager { } } + + pub fn cancel_merge(&self, + before_merge_segment_ids: &[SegmentId], + after_merge_segment_id: SegmentId) { + + let mut registers_lock = self.write(); + + // we mark all segments are ready for merge. + { + let target_segment_register: &mut SegmentRegister; + target_segment_register = { + if registers_lock.uncommitted.contains_all(&before_merge_segment_ids) { + &mut registers_lock.uncommitted + } + else if registers_lock.committed.contains_all(&before_merge_segment_ids) { + &mut registers_lock.committed + } + else { + warn!("couldn't find segment in SegmentManager"); + return; + } + }; + for segment_id in before_merge_segment_ids { + target_segment_register.cancel_merge(segment_id); + } + } + + // ... and we make sure the target segment entry + // can be garbage collected. + registers_lock.writing.remove(&after_merge_segment_id); + + } + + pub fn write_segment(&self, segment_id: SegmentId) { let mut registers_lock = self.write(); registers_lock.writing.insert(segment_id); @@ -176,21 +198,27 @@ impl SegmentManager { after_merge_segment_entry: SegmentEntry) { let mut registers_lock = self.write(); + registers_lock.writing.remove(&after_merge_segment_entry.segment_id()); - if registers_lock.uncommitted.contains_all(&before_merge_segment_ids) { - for segment_id in before_merge_segment_ids { - registers_lock.uncommitted.remove_segment(segment_id); + let mut target_register: &mut SegmentRegister = { + if registers_lock.uncommitted.contains_all(&before_merge_segment_ids) { + &mut registers_lock.uncommitted } - registers_lock.uncommitted.add_segment_entry(after_merge_segment_entry); - } - else if registers_lock.committed.contains_all(&before_merge_segment_ids) { - for segment_id in before_merge_segment_ids { - registers_lock.committed.remove_segment(segment_id); + else if registers_lock.committed.contains_all(&before_merge_segment_ids) { + &mut registers_lock.committed + } else { + warn!("couldn't find segment in SegmentManager"); + return; } - registers_lock.committed.add_segment_entry(after_merge_segment_entry); - } else { - warn!("couldn't find segment in SegmentManager"); + }; + for segment_id in before_merge_segment_ids { + target_register.remove_segment(segment_id); } + target_register.add_segment_entry(after_merge_segment_entry); + + + + } pub fn committed_segment_metas(&self,) -> Vec { diff --git a/src/indexer/segment_register.rs b/src/indexer/segment_register.rs index 76f200735..9c6eec651 100644 --- a/src/indexer/segment_register.rs +++ b/src/indexer/segment_register.rs @@ -68,13 +68,6 @@ impl SegmentRegister { segment_ids } - pub fn segment_ids(&self,) -> Vec { - self.segment_metas() - .into_iter() - .map(|segment_meta| segment_meta.id()) - .collect() - } - pub fn segment_entry(&self, segment_id: &SegmentId) -> Option { self.segment_states .get(&segment_id) @@ -96,6 +89,13 @@ impl SegmentRegister { self.segment_states.remove(segment_id); } + pub fn cancel_merge(&mut self, segment_id: &SegmentId) { + self.segment_states + .get_mut(segment_id) + .expect("Received a merge notification for a segment that is not registered") + .cancel_merge(); + } + pub fn start_merge(&mut self, segment_id: &SegmentId) { self.segment_states .get_mut(segment_id) @@ -124,6 +124,14 @@ mod tests { use core::SegmentMeta; use indexer::delete_queue::*; use super::*; + + fn segment_ids(segment_register: &SegmentRegister) -> Vec { + segment_register + .segment_metas() + .into_iter() + .map(|segment_meta| segment_meta.id()) + .collect() + } #[test] fn test_segment_register() { @@ -140,7 +148,7 @@ mod tests { segment_register.add_segment_entry(segment_entry); } assert_eq!(segment_register.segment_entry(&segment_id_a).unwrap().state(), SegmentState::Ready); - assert_eq!(segment_register.segment_ids(), vec!(segment_id_a)); + assert_eq!(segment_ids(&segment_register), vec!(segment_id_a)); { let segment_meta = SegmentMeta::new(segment_id_b); let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor()); @@ -158,7 +166,7 @@ mod tests { let segment_entry = SegmentEntry::new(segment_meta_merged, delete_queue.cursor()); segment_register.add_segment_entry(segment_entry); } - assert_eq!(segment_register.segment_ids(), vec!(segment_id_merged)); + assert_eq!(segment_ids(&segment_register), vec!(segment_id_merged)); } } \ No newline at end of file diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index cb1033521..9eb1aa329 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -92,6 +92,59 @@ pub fn save_metas(segment_metas: Vec, pub struct SegmentUpdater(Arc); + +fn perform_merge(segment_ids: &[SegmentId], + segment_updater: &SegmentUpdater, + mut merged_segment: Segment, + target_opstamp: u64) -> Result { + // first we need to apply deletes to our segment. + info!("Start merge: {:?}", segment_ids); + + let ref index = segment_updater.0.index; + let schema = index.schema(); + let mut segment_entries = vec!(); + for segment_id in segment_ids { + if let Some(mut segment_entry) = segment_updater.0 + .segment_manager + .segment_entry(segment_id) { + let segment = index.segment(segment_entry.meta().clone()); + advance_deletes(segment, &mut segment_entry, target_opstamp)?; + segment_entries.push(segment_entry); + } + else { + error!("Error, had to abort merge as some of the segment is not managed anymore.a"); + return Err(Error::InvalidArgument(format!("Segment {:?} requested for merge is not managed.", segment_id))); + } + } + + let delete_cursor = segment_entries[0].delete_cursor().clone(); + + let segments: Vec = segment_entries + .iter() + .map(|segment_entry| { + index.segment(segment_entry.meta().clone()) + }) + .collect(); + + // An IndexMerger is like a "view" of our merged segments. + let merger: IndexMerger = IndexMerger::open(schema, &segments[..])?; + + // ... we just serialize this index merger in our new segment + // to merge the two segments. + + let segment_serializer = + SegmentSerializer::for_segment(&mut merged_segment) + .expect("Creating index serializer failed"); + + let num_docs = merger.write(segment_serializer).expect("Serializing merged index failed"); + let mut segment_meta = SegmentMeta::new(merged_segment.id()); + segment_meta.set_max_doc(num_docs); + + let after_merge_segment_entry = SegmentEntry::new(segment_meta.clone(), delete_cursor); + Ok(after_merge_segment_entry) +} + + struct InnerSegmentUpdater { pool: CpuPool, index: Index, @@ -145,8 +198,6 @@ impl SegmentUpdater { self.0.merging_thread_id.fetch_add(1, Ordering::SeqCst) } - /// TODO check that we use this correctly taking - /// the laziness in account. fn run_async T>(&self, f: F) -> impl Future { let me_clone = self.clone(); self.0.pool.spawn_fn(move || { @@ -238,66 +289,26 @@ impl SegmentUpdater { // first we need to apply deletes to our segment. info!("Start merge: {:?}", segment_ids_vec); - let ref index = segment_updater_clone.0.index; - let schema = index.schema(); + let merged_segment = segment_updater_clone.new_segment(); + let merged_segment_id = merged_segment.id(); + let merge_result = perform_merge(&segment_ids_vec, &segment_updater_clone, merged_segment, target_opstamp); - let mut segment_entries = vec!(); - - - for segment_id in &segment_ids_vec { - if let Some(mut segment_entry) = segment_updater_clone.0 - .segment_manager - .segment_entry(segment_id) { - - let segment = index.segment(segment_entry.meta().clone()); - // TODO unwrap - advance_deletes(segment, &mut segment_entry, target_opstamp).unwrap(); - segment_entries.push(segment_entry); + match merge_result { + Ok(after_merge_segment_entry) => { + let merged_segment_meta = after_merge_segment_entry.meta().clone(); + segment_updater_clone + .end_merge(segment_ids_vec, after_merge_segment_entry) + .wait() + .expect("Segment updater thread is corrupted."); + merging_future_send.complete(merged_segment_meta); } - else { - error!("Error, had to abort merge as some of the segment is not managed anymore.a"); - return Err(Error::InvalidArgument(format!("Segment {:?} requested for merge is not managed.", segment_id))); + Err(_) => { + // ... cancel merge + warn!("Merge of {:?} was cancelled", segment_ids_vec); + segment_updater_clone.cancel_merge(&segment_ids_vec, merged_segment_id); + // merging_future_send will be dropped, sending an error to the future. } } - - let delete_cursor = segment_entries[0].delete_cursor().clone(); - - let segments: Vec = segment_entries - .iter() - .map(|segment_entry| { - index.segment(segment_entry.meta().clone()) - }) - .collect(); - - // An IndexMerger is like a "view" of our merged segments. - let merger: IndexMerger = IndexMerger::open(schema, &segments[..])?; - let mut merged_segment = index.new_segment(); - - // ... we just serialize this index merger in our new segment - // to merge the two segments. - - let segment_serializer = - SegmentSerializer - ::for_segment(&mut merged_segment) - .expect("Creating index serializer failed"); - - let num_docs = merger.write(segment_serializer).expect("Serializing merged index failed"); - let mut segment_meta = SegmentMeta::new(merged_segment.id()); - segment_meta.set_max_doc(num_docs); - - let before_merged_segment_ids = segment_entries - .iter() - .map(|segment_entry| segment_entry.segment_id()) - .collect::>(); - - let after_merge_segment_entry = SegmentEntry::new(segment_meta.clone(), delete_cursor); - - segment_updater_clone - .end_merge(before_merged_segment_ids, after_merge_segment_entry) - .wait() - .unwrap(); - - merging_future_send.complete(segment_meta); segment_updater_clone.0.merging_threads.write().unwrap().remove(&merging_thread_id); Ok(()) }); @@ -319,6 +330,12 @@ impl SegmentUpdater { } } + fn cancel_merge(&self, + before_merge_segment_ids: &[SegmentId], + after_merge_segment_entry: SegmentId) { + self.0.segment_manager.cancel_merge(&before_merge_segment_ids, after_merge_segment_entry); + } + fn end_merge(&self, before_merge_segment_ids: Vec, @@ -336,7 +353,6 @@ impl SegmentUpdater { segment_updater.0.segment_manager.end_merge(&before_merge_segment_ids, after_merge_segment_entry); segment_updater.save_metas(segment_updater.0.index.opstamp()); }) - } pub fn wait_merging_thread(&self) -> thread::Result<()> {