From 9b5185b8d5c6000699b281e44eec269646ae0b5d Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Mon, 10 Oct 2016 11:41:25 +0900 Subject: [PATCH] bug/4 Bugfix. Committing was throwing away the segment's state (InMerge or Ready). --- src/core/segment_id.rs | 4 ++ src/directory/mmap_directory.rs | 4 ++ src/indexer/index_writer.rs | 111 ++++++++++++++++++-------------- src/indexer/segment_manager.rs | 22 +++++-- src/indexer/segment_register.rs | 44 +++++++++++-- 5 files changed, 123 insertions(+), 62 deletions(-) diff --git a/src/core/segment_id.rs b/src/core/segment_id.rs index 54dbab8ca..3561f64d0 100644 --- a/src/core/segment_id.rs +++ b/src/core/segment_id.rs @@ -41,6 +41,10 @@ impl SegmentId { SegmentId(create_uuid()) } + pub fn short_uuid_string(&self,) -> String { + String::from(&self.0.to_simple_string()[..8]) + } + pub fn uuid_string(&self,) -> String { self.0.to_simple_string() } diff --git a/src/directory/mmap_directory.rs b/src/directory/mmap_directory.rs index d6906956c..a11a5e88b 100644 --- a/src/directory/mmap_directory.rs +++ b/src/directory/mmap_directory.rs @@ -136,6 +136,7 @@ impl Directory for MmapDirectory { fn open_read(&self, path: &Path) -> result::Result { + debug!("Open Read {:?}", path); let full_path = self.resolve_path(path); let mut mmap_cache = try!( @@ -176,6 +177,7 @@ impl Directory for MmapDirectory { } fn open_write(&mut self, path: &Path) -> Result { + debug!("Open Write {:?}", path); let full_path = self.resolve_path(path); let open_res = OpenOptions::new() @@ -206,6 +208,7 @@ impl Directory for MmapDirectory { } fn delete(&self, path: &Path) -> result::Result<(), FileError> { + debug!("Delete {:?}", path); let full_path = self.resolve_path(path); let mut mmap_cache = try!(self.mmap_cache .write() @@ -228,6 +231,7 @@ impl Directory for MmapDirectory { } fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()> { + debug!("Atomic Write {:?}", path); let full_path = self.resolve_path(path); let meta_file = atomicwrites::AtomicFile::new(full_path, atomicwrites::AllowOverwrite); try!(meta_file.write(|f| { diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index e77e70f3d..4eb6721b1 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -55,7 +55,10 @@ pub struct IndexWriter { document_sender: DocumentSender, segment_update_sender: SegmentUpdateSender, - + segment_update_thread: JoinHandle<()>, + + worker_id: usize, + num_threads: usize, docstamp: u64, } @@ -163,59 +166,63 @@ fn process_segment_updates(mut index: Index, segment_manager: &SegmentManager, segment_update_receiver: SegmentUpdateReceiver, segment_update_sender: SegmentUpdateSender) { - let mut segment_update_it = segment_update_receiver.into_iter(); let mut is_cancelled_generation = false; let merge_policy = index.get_merge_policy(); - loop { - if let Some(segment_update) = segment_update_it.next() { - let has_changed = process_segment_update( - &index, - segment_manager, - segment_update, - &mut is_cancelled_generation); - if has_changed { - on_segment_change(&mut index); + for segment_update in segment_update_receiver { + let has_changed = process_segment_update( + &index, + segment_manager, + segment_update, + &mut is_cancelled_generation); + if has_changed { + on_segment_change(&mut index); - let segment_manager = get_segment_manager(&index); + let segment_manager = get_segment_manager(&index); - for MergeCandidate(segment_ids) in consider_merge_options(&index, &*merge_policy) { - segment_manager.start_merge(&segment_ids); - let index_clone = index.clone(); - let segment_update_sender_clone = segment_update_sender.clone(); - thread::spawn(move || { - info!("Start merge: {:?}", segment_ids); - let schema = index_clone.schema(); - let segments: Vec = segment_ids - .iter() - .map(|&segment_id| index_clone.segment(segment_id)) - .collect(); - // An IndexMerger is like a "view" of our merged segments. - // TODO unwrap - let merger: IndexMerger = IndexMerger::open(schema, &segments[..]).unwrap(); - let mut merged_segment = index_clone.new_segment(); - // ... we just serialize this index merger in our new segment - // to merge the two segments. - let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment).unwrap(); - let num_docs = merger.write(segment_serializer).unwrap(); - let segment_meta = SegmentMeta { - segment_id: merged_segment.id(), - num_docs: num_docs, - }; - let segment_update = SegmentUpdate::EndMerge(segment_ids, segment_meta); - segment_update_sender_clone.send(segment_update); - }); - } + for MergeCandidate(segment_ids) in consider_merge_options(&index, &*merge_policy) { + segment_manager.start_merge(&segment_ids); + let index_clone = index.clone(); + let segment_update_sender_clone = segment_update_sender.clone(); + thread::Builder::new().name(format!("merge_thread_{:?}", segment_ids[0])).spawn(move || { + info!("Start merge: {:?}", segment_ids); + let schema = index_clone.schema(); + let segments: Vec = segment_ids + .iter() + .map(|&segment_id| index_clone.segment(segment_id)) + .collect(); + // An IndexMerger is like a "view" of our merged segments. + // TODO unwrap + let merger: IndexMerger = IndexMerger::open(schema, &segments[..]).unwrap(); + let mut merged_segment = index_clone.new_segment(); + // ... we just serialize this index merger in our new segment + // to merge the two segments. + let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment).unwrap(); + let num_docs = merger.write(segment_serializer).unwrap(); + let segment_meta = SegmentMeta { + segment_id: merged_segment.id(), + num_docs: num_docs, + }; + let segment_update = SegmentUpdate::EndMerge(segment_ids, segment_meta); + segment_update_sender_clone.send(segment_update); + }).expect("Failed to spawn merge thread"); } } - else { - // somehow, the channel was dropped. - return; - } } } impl IndexWriter { - + + pub fn wait_merging_threads(self) -> Result<()> { + drop(self.segment_update_sender); + info!("Joining update thread"); + self.segment_update_thread + .join() + .map_err(|err| { + error!("Error in the merging thread {:?}", err); + Error::ErrorInThread(format!("{:?}", err)) + }) + } + /// Spawns a new worker thread for indexing. /// The thread consumes documents from the pipeline. /// @@ -226,8 +233,11 @@ impl IndexWriter { let document_receiver_clone = self.document_receiver.clone(); let mut segment_update_sender = self.segment_update_sender.clone(); - let mut heap = Heap::with_capacity(self.heap_size_in_bytes_per_thread); - let join_handle: JoinHandle> = thread::spawn(move || { + let mut heap = Heap::with_capacity(self.heap_size_in_bytes_per_thread); + + let join_handle: JoinHandle> = try!(thread::Builder::new() + .name(format!("indexing_thread_{}", self.worker_id)) + .spawn(move || { loop { let segment = index.new_segment(); let mut document_iterator = document_receiver_clone @@ -254,7 +264,8 @@ impl IndexWriter { return Ok(()); } } - }); + })); + self.worker_id += 1; self.workers_join_handle.push(join_handle); Ok(()) @@ -283,9 +294,9 @@ impl IndexWriter { let index_clone = index.clone(); let segment_update_sender_clone = segment_update_sender.clone(); - thread::spawn(move || { + let segment_update_thread = try!(thread::Builder::new().name("segment_update".to_string()).spawn(move || { process_segment_updates(index_clone, &*segment_manager, segment_update_receiver, segment_update_sender_clone) - }); + })); let mut index_writer = IndexWriter { heap_size_in_bytes_per_thread: heap_size_in_bytes_per_thread, @@ -295,11 +306,13 @@ impl IndexWriter { document_sender: document_sender, segment_update_sender: segment_update_sender, + segment_update_thread: segment_update_thread, workers_join_handle: Vec::new(), num_threads: num_threads, docstamp: index.docstamp(), + worker_id: 0, }; try!(index_writer.start_workers()); Ok(index_writer) diff --git a/src/indexer/segment_manager.rs b/src/indexer/segment_manager.rs index 4174e62c0..049902fb8 100644 --- a/src/indexer/segment_manager.rs +++ b/src/indexer/segment_manager.rs @@ -3,6 +3,7 @@ use std::sync::RwLock; use core::SegmentMeta; use core::SegmentId; use std::sync::{RwLockReadGuard, RwLockWriteGuard}; +use std::fmt::{self, Debug, Formatter}; struct SegmentRegisters { uncommitted: SegmentRegister, @@ -28,16 +29,21 @@ pub struct SegmentManager { registers: RwLock, } +impl Debug for SegmentManager { + fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> { + let lock = self.read(); + write!(f, "{{ uncommitted: {:?}, committed: {:?} }}", lock.uncommitted, lock.committed) + } +} + + /// Returns the segment_metas for (committed segment, uncommitted segments). /// The result is consistent with other transactions. /// /// For instance, a segment will not appear in both committed and uncommitted /// segments pub fn get_segment_ready_for_commit(segment_manager: &SegmentManager,) -> (Vec, Vec) { - let registers_lock = segment_manager - .registers - .read() - .expect("Segment manager lock is poisoned"); + let registers_lock = segment_manager.read(); (registers_lock.committed.get_segment_ready_for_commit(), registers_lock.uncommitted.get_segment_ready_for_commit()) } @@ -75,9 +81,9 @@ impl SegmentManager { pub fn commit(&self,) { let mut registers_lock = self.write(); - let segment_metas = registers_lock.uncommitted.segment_metas(); - for segment_meta in segment_metas { - registers_lock.committed.add_segment(segment_meta.clone()); + let segment_entries = registers_lock.uncommitted.segment_entries(); + for segment_entry in segment_entries { + registers_lock.committed.add_segment_entry(segment_entry); } registers_lock.uncommitted.clear(); } @@ -114,6 +120,8 @@ impl SegmentManager { registers_lock.committed.remove_segment(segment_id); } registers_lock.committed.add_segment(merged_segment_meta.clone()); + } else { + warn!("couldn't find segment in SegmentManager"); } } diff --git a/src/indexer/segment_register.rs b/src/indexer/segment_register.rs index 3425963e6..7f4af4f4e 100644 --- a/src/indexer/segment_register.rs +++ b/src/indexer/segment_register.rs @@ -1,13 +1,24 @@ use core::SegmentId; use std::collections::HashMap; use core::SegmentMeta; +use std::fmt; +use std::fmt::{Debug, Formatter}; -#[derive(Clone, PartialEq, Eq, Debug)] +#[derive(Clone, PartialEq, Eq)] pub enum SegmentState { Ready, InMerge, } +impl SegmentState { + fn letter_code(&self,) -> char { + match *self { + SegmentState::InMerge => 'M', + SegmentState::Ready => 'R', + } + } +} + #[derive(Clone)] pub struct SegmentEntry { meta: SegmentMeta, @@ -38,6 +49,17 @@ pub struct SegmentRegister { segment_states: HashMap, } +impl Debug for SegmentRegister { + fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> { + try!(write!(f, "SegmentRegister(")); + for (ref k, ref v) in &self.segment_states { + try!(write!(f, "{}:{}, ", k.short_uuid_string(), v.state.letter_code())); + } + try!(write!(f, ")")); + Ok(()) + } +} + impl SegmentRegister { pub fn clear(&mut self,) { @@ -51,7 +73,14 @@ impl SegmentRegister { .map(|segment_entry| segment_entry.meta.clone()) .collect() } - + + pub fn segment_entries(&self,) -> Vec{ + self.segment_states + .values() + .cloned() + .collect() + } + pub fn segment_metas(&self,) -> Vec { let mut segment_ids: Vec = self.segment_states .values() @@ -82,13 +111,16 @@ impl SegmentRegister { .all(|segment_id| self.segment_states.contains_key(segment_id)) } + pub fn add_segment_entry(&mut self, segment_entry: SegmentEntry) { + let segment_id = segment_entry.meta.segment_id; + self.segment_states.insert(segment_id, segment_entry); + } + pub fn add_segment(&mut self, segment_meta: SegmentMeta) { - let segment_id = segment_meta.segment_id.clone(); - let segment_entry = SegmentEntry { + self.add_segment_entry(SegmentEntry { meta: segment_meta.clone(), state: SegmentState::Ready, - }; - self.segment_states.insert(segment_id, segment_entry); + }); } pub fn remove_segment(&mut self, segment_id: &SegmentId) {