From 9cc1e47a06647b7ac7d2e40f8dcdf06ee7b5872a Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Tue, 4 Oct 2016 10:41:10 +0900 Subject: [PATCH] bug/4 --- src/core/index.rs | 2 +- src/indexer/index_writer.rs | 88 ++++++++++++++++++------------ src/indexer/merge_policy.rs | 2 +- src/indexer/merger.rs | 2 +- src/indexer/segment_manager.rs | 2 - src/indexer/simple_merge_policy.rs | 2 +- 6 files changed, 57 insertions(+), 41 deletions(-) diff --git a/src/core/index.rs b/src/core/index.rs index d25c43faf..934e9657e 100644 --- a/src/core/index.rs +++ b/src/core/index.rs @@ -196,7 +196,7 @@ impl Index { /// Return a segment object given a segment_id /// /// The segment may or may not exist. - fn segment(&self, segment_id: SegmentId) -> Segment { + pub fn segment(&self, segment_id: SegmentId) -> Segment { create_segment(self.clone(), segment_id) } diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 54c09bdc4..e77e70f3d 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -6,6 +6,7 @@ use core::SerializableSegment; use core::Segment; use std::thread::JoinHandle; use indexer::SegmentWriter; +use indexer::MergeCandidate; use std::clone::Clone; use std::io; use indexer::MergePolicy; @@ -57,8 +58,6 @@ pub struct IndexWriter { num_threads: usize, docstamp: u64, - - merge_policy: Box, } @@ -92,7 +91,6 @@ fn index_documents(heap: &mut Heap, #[derive(Debug)] pub enum SegmentUpdate { AddSegment(SegmentMeta), - StartMerge(Vec), EndMerge(Vec, SegmentMeta), CancelGeneration, NewGeneration, @@ -106,7 +104,8 @@ fn process_segment_update( index: &Index, segment_manager: &SegmentManager, segment_update: SegmentUpdate, - is_cancelled_generation: &mut bool) -> Result { + is_cancelled_generation: &mut bool) -> bool { + info!("Segment update: {:?}", segment_update); match segment_update { SegmentUpdate::AddSegment(segment_meta) => { if !*is_cancelled_generation { @@ -115,54 +114,41 @@ fn process_segment_update( else { index.delete_segment(segment_meta.segment_id); } - Ok(true) - }, - SegmentUpdate::StartMerge(segment_ids) => { - if !*is_cancelled_generation { - segment_manager.start_merge(&segment_ids); - // TODO spawn a segment merge thread - } - Ok(false) + true }, SegmentUpdate::EndMerge(segment_ids, segment_meta) => { segment_manager.end_merge(&segment_ids, &segment_meta); for segment_id in segment_ids { index.delete_segment(segment_id); } - Ok(true) + true }, SegmentUpdate::CancelGeneration => { *is_cancelled_generation = true; - Ok(false) + false }, SegmentUpdate::NewGeneration => { *is_cancelled_generation = false; - Ok(false) + false } } } -fn consider_merge_options(index: &mut Index, merge_policy: &MergePolicy) { +fn consider_merge_options(index: &Index, merge_policy: &MergePolicy) -> Vec { let segment_manager = get_segment_manager(index); let (committed_segments, uncommitted_segments) = get_segment_ready_for_commit(&*segment_manager); // committed segments cannot be merged with uncommitted_segments. - let merge_candidates_committed = merge_policy.compute_merge_candidates(&committed_segments); - let merge_candidates_uncommitted = merge_policy.compute_merge_candidates(&uncommitted_segments); - merge_candidates_committed.into_iter().chain(merge_candidates_uncommitted) - .map(|merge_candidate| { - println!("{:?}", merge_candidate); - }); + let mut merge_candidates = merge_policy.compute_merge_candidates(&committed_segments); + merge_candidates.extend_from_slice(&merge_policy.compute_merge_candidates(&uncommitted_segments)[..]); + merge_candidates } -fn on_segment_change(index: &mut Index, - merge_policy: &MergePolicy) -> Result<()> { +fn on_segment_change(index: &mut Index) -> Result<()> { // saving the meta file. try!(index.save_metas()); // update the searcher so that they eventually will // use the new segments. try!(index.load_searchers()); - // consider merge options. - consider_merge_options(index, merge_policy); Ok(()) } @@ -175,26 +161,55 @@ fn on_segment_change(index: &mut Index, // trivial. fn process_segment_updates(mut index: Index, segment_manager: &SegmentManager, - segment_update_receiver: SegmentUpdateReceiver) -> Result<()> { + segment_update_receiver: SegmentUpdateReceiver, + segment_update_sender: SegmentUpdateSender) { let mut segment_update_it = segment_update_receiver.into_iter(); let mut is_cancelled_generation = false; let merge_policy = index.get_merge_policy(); loop { if let Some(segment_update) = segment_update_it.next() { - let has_changed = try!( - process_segment_update( + let has_changed = process_segment_update( &index, segment_manager, segment_update, - &mut is_cancelled_generation) - ); + &mut is_cancelled_generation); if has_changed { - on_segment_change(&mut index, &*merge_policy); + on_segment_change(&mut index); + + let segment_manager = get_segment_manager(&index); + + for MergeCandidate(segment_ids) in consider_merge_options(&index, &*merge_policy) { + segment_manager.start_merge(&segment_ids); + let index_clone = index.clone(); + let segment_update_sender_clone = segment_update_sender.clone(); + thread::spawn(move || { + info!("Start merge: {:?}", segment_ids); + let schema = index_clone.schema(); + let segments: Vec = segment_ids + .iter() + .map(|&segment_id| index_clone.segment(segment_id)) + .collect(); + // An IndexMerger is like a "view" of our merged segments. + // TODO unwrap + let merger: IndexMerger = IndexMerger::open(schema, &segments[..]).unwrap(); + let mut merged_segment = index_clone.new_segment(); + // ... we just serialize this index merger in our new segment + // to merge the two segments. + let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment).unwrap(); + let num_docs = merger.write(segment_serializer).unwrap(); + let segment_meta = SegmentMeta { + segment_id: merged_segment.id(), + num_docs: num_docs, + }; + let segment_update = SegmentUpdate::EndMerge(segment_ids, segment_meta); + segment_update_sender_clone.send(segment_update); + }); + } } } else { // somehow, the channel was dropped. - return Ok(()); + return; } } } @@ -267,8 +282,9 @@ impl IndexWriter { let segment_manager = get_segment_manager(index); let index_clone = index.clone(); + let segment_update_sender_clone = segment_update_sender.clone(); thread::spawn(move || { - process_segment_updates(index_clone, &*segment_manager, segment_update_receiver) + process_segment_updates(index_clone, &*segment_manager, segment_update_receiver, segment_update_sender_clone) }); let mut index_writer = IndexWriter { @@ -283,7 +299,6 @@ impl IndexWriter { workers_join_handle: Vec::new(), num_threads: num_threads, - merge_policy: index.get_merge_policy(), docstamp: index.docstamp(), }; try!(index_writer.start_workers()); @@ -383,6 +398,9 @@ impl IndexWriter { let rollbacked_segments = get_segment_manager(&self.index).rollback(); for segment_id in rollbacked_segments { + + // TODO all delete must happen after saving + // meta.json self.index.delete_segment(segment_id); } try!(self.on_change()); diff --git a/src/indexer/merge_policy.rs b/src/indexer/merge_policy.rs index 1e9040ac0..1d77c3fd7 100644 --- a/src/indexer/merge_policy.rs +++ b/src/indexer/merge_policy.rs @@ -2,7 +2,7 @@ use core::SegmentId; use core::SegmentMeta; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct MergeCandidate(pub Vec); pub trait MergePolicy { diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 52eafbb85..68e863c31 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -333,7 +333,7 @@ mod tests { } } { - let segments = index.searchable_segments().unwrap(); + let segments = index.searchable_segments(); let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap(); index_writer.merge(&segments).unwrap(); } diff --git a/src/indexer/segment_manager.rs b/src/indexer/segment_manager.rs index 8fde19119..4174e62c0 100644 --- a/src/indexer/segment_manager.rs +++ b/src/indexer/segment_manager.rs @@ -19,8 +19,6 @@ impl Default for SegmentRegisters { } - - /// The segment manager stores the list of segments /// as well as their state. /// diff --git a/src/indexer/simple_merge_policy.rs b/src/indexer/simple_merge_policy.rs index d36136b5f..90ac7fe51 100644 --- a/src/indexer/simple_merge_policy.rs +++ b/src/indexer/simple_merge_policy.rs @@ -11,7 +11,7 @@ impl MergePolicy for SimpleMergePolicy { let num_packs = segments.len() / PACK_LEN; (0..num_packs) .map(|i| { - let segment_ids = segments[i..i*PACK_LEN] + let segment_ids = segments[i*PACK_LEN..(i+1)*PACK_LEN] .iter() .map(|segment_meta| segment_meta.segment_id) .collect();