diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 0a60d16af..b0f967e2e 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -12,11 +12,12 @@ use std::thread; use indexer::merger::IndexMerger; use core::SegmentId; use datastruct::stacker::Heap; +use std::sync::Arc; use std::mem::swap; use chan; use core::SegmentMeta; -use indexer::SegmentAppender; use super::super::core::index::get_segment_manager; +use super::segment_manager::SegmentManager; use Result; use Error; @@ -34,8 +35,8 @@ const PIPELINE_MAX_SIZE_IN_DOCS: usize = 10_000; type DocumentSender = chan::Sender; type DocumentReceiver = chan::Receiver; -type NewSegmentSender = chan::Sender>; -type NewSegmentReceiver = chan::Receiver>; +type SegmentUpdateSender = chan::Sender; +type SegmentUpdateReceiver = chan::Receiver; /// `IndexWriter` is the user entry-point to add document to an index. /// @@ -46,10 +47,15 @@ type NewSegmentReceiver = chan::Receiver>; pub struct IndexWriter { index: Index, heap_size_in_bytes_per_thread: usize, + workers_join_handle: Vec>>, - segment_appender: SegmentAppender, + document_receiver: DocumentReceiver, document_sender: DocumentSender, + + segment_update_sender: SegmentUpdateSender, + segment_update_worker: JoinHandle>, + num_threads: usize, docstamp: u64, } @@ -59,7 +65,7 @@ fn index_documents(heap: &mut Heap, segment: Segment, schema: &Schema, document_iterator: &mut Iterator, - segment_appender: &mut SegmentAppender) -> Result { + segment_update_sender: &mut SegmentUpdateSender) -> Result<()> { heap.clear(); let segment_id = segment.id(); let mut segment_writer = try!(SegmentWriter::for_segment(heap, segment, &schema)); @@ -77,10 +83,95 @@ fn index_documents(heap: &mut Heap, }; try!(segment_writer.finalize()); - segment_appender.add_segment(segment_meta) + segment_update_sender.send(SegmentUpdate::AddSegment(segment_meta)); + Ok(()) } +#[derive(Debug)] +pub enum SegmentUpdate { + AddSegment(SegmentMeta), + StartMerge(Vec), + 
EndMerge(Vec, SegmentMeta), + CancelGeneration, + NewGeneration, +} + +// Process a single segment update. +// +// If the segment manager has been changed as a result, +// return true. (else return false) +fn process_segment_update(segment_manager: &SegmentManager, + segment_update: SegmentUpdate, + is_cancelled_generation: &mut bool) -> Result { + match segment_update { + SegmentUpdate::AddSegment(segment_meta) => { + if !*is_cancelled_generation { + try!(segment_manager.add_segment(segment_meta)); + } + Ok(true) + }, + SegmentUpdate::StartMerge(segment_ids) => { + if !*is_cancelled_generation { + try!(segment_manager.start_merge(&segment_ids)); + // TODO spawn a segment merge thread + } + Ok(false) + }, + SegmentUpdate::EndMerge(segment_ids, segment_meta) => { + try!(segment_manager.end_merge(&segment_ids, &segment_meta)); + Ok(true) + }, + SegmentUpdate::CancelGeneration => { + *is_cancelled_generation = true; + Ok(false) + }, + SegmentUpdate::NewGeneration => { + *is_cancelled_generation = false; + Ok(false) + } + } +} + + +fn on_segment_change() { + // TODO + // - save meta.json, + // - merge opportunities + // - update searchers +} + +// Consumes the `segment_update_receiver` channel +// for segment updates and applies them. +// +// Using a channel ensures that all of the updates +// happen in the same thread, and makes +// the implementation of rollback and commit +// trivial. +fn process_segment_updates(segment_manager: &SegmentManager, + segment_update_receiver: SegmentUpdateReceiver) -> Result<()> { + let mut segment_update_it = segment_update_receiver.into_iter(); + let mut is_cancelled_generation = false; + loop { + if let Some(segment_update) = segment_update_it.next() { + let has_changed = try!( + process_segment_update( + segment_manager, + segment_update, + &mut is_cancelled_generation) + ); + if has_changed { + on_segment_change(); + } + } + else { + // somehow, the channel was dropped. 
+ return Ok(()); + } + } + Ok(()) +} + impl IndexWriter { /// Spawns a new worker thread for indexing. @@ -89,9 +180,10 @@ impl IndexWriter { fn add_indexing_worker(&mut self,) -> Result<()> { let index = self.index.clone(); let schema = self.index.schema(); - let mut segment_appender_clone = self.segment_appender.clone(); - let document_receiver_clone = self.document_receiver.clone(); + let document_receiver_clone = self.document_receiver.clone(); + let mut segment_update_sender = self.segment_update_sender.clone(); + let mut heap = Heap::with_capacity(self.heap_size_in_bytes_per_thread); let join_handle: JoinHandle> = thread::spawn(move || { loop { @@ -104,15 +196,11 @@ impl IndexWriter { // creating a new segment's files // if no document are available. if document_iterator.peek().is_some() { - if !try!( - index_documents(&mut heap, - segment, - &schema, - &mut document_iterator, - &mut segment_appender_clone) - ) { - return Ok(()); - } + try!(index_documents(&mut heap, + segment, + &schema, + &mut document_iterator, + &mut segment_update_sender)); } else { // No more documents. 
@@ -123,6 +211,7 @@ impl IndexWriter { } }); self.workers_join_handle.push(join_handle); + Ok(()) } @@ -140,19 +229,28 @@ impl IndexWriter { pub fn open(index: &Index, num_threads: usize, heap_size_in_bytes_per_thread: usize) -> Result { - if heap_size_in_bytes_per_thread <= HEAP_SIZE_LIMIT as usize { + if heap_size_in_bytes_per_thread <= HEAP_SIZE_LIMIT as usize { panic!(format!("The heap size per thread needs to be at least {}.", HEAP_SIZE_LIMIT)); } let (document_sender, document_receiver): (DocumentSender, DocumentReceiver) = chan::sync(PIPELINE_MAX_SIZE_IN_DOCS); + let (segment_update_sender, segment_update_receiver): (SegmentUpdateSender, SegmentUpdateReceiver) = chan::sync(0); + let segment_manager = get_segment_manager(index); - let segment_appender = SegmentAppender::for_manager(segment_manager); + + let segment_update_worker = thread::spawn(move || { + process_segment_updates(&*segment_manager, segment_update_receiver) + }); let mut index_writer = IndexWriter { heap_size_in_bytes_per_thread: heap_size_in_bytes_per_thread, index: index.clone(), - segment_appender: segment_appender, + document_receiver: document_receiver, document_sender: document_sender, + + segment_update_sender: segment_update_sender, + segment_update_worker: segment_update_worker, + workers_join_handle: Vec::new(), num_threads: num_threads, docstamp: index.docstamp(), @@ -203,7 +301,6 @@ impl IndexWriter { swap(&mut self.document_sender, &mut document_sender); swap(&mut self.document_receiver, &mut document_receiver); let segment_manager = get_segment_manager(&self.index); - self.segment_appender = SegmentAppender::for_manager(segment_manager); document_receiver } @@ -217,11 +314,8 @@ impl IndexWriter { /// The docstamp at the last commit is returned. pub fn rollback(&mut self,) -> Result { - // the current generation is killed... - // if some pending segment are still in the pipe, - // they won't be added to our index. 
- self.segment_appender.close(); - + self.segment_update_sender.send(SegmentUpdate::CancelGeneration); + // we cannot drop segment ready receiver yet // as it would block the workers. let document_receiver = self.recreate_channel(); @@ -235,16 +329,29 @@ impl IndexWriter { // wait for all the worker to finish their work // (it should be fast since we consumed all pending documents) + let num_workers = former_workers_join_handle.len(); for worker_handle in former_workers_join_handle { + // we stop one worker at a time ... try!(try!( worker_handle .join() .map_err(|e| Error::ErrorInThread(format!("{:?}", e))) )); - // add a new worker for the next generation. + // ... and recreate a new one right away + // to work on the next generation. try!(self.add_indexing_worker()); } + // All of our indexing workers for the rolled-back generation have + // been terminated. + // Our document receiver pipe was drained. + // No new documents have been added in the meantime because `IndexWriter` + // is not shared by different threads. + // + // We can now open a new generation and reaccept segments + // from now on. 
+ self.segment_update_sender.send(SegmentUpdate::NewGeneration); + get_segment_manager(&self.index).rollback(); // reset the docstamp to what it was before diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index d8d773ab0..e707529b1 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -11,4 +11,4 @@ pub use self::segment_serializer::SegmentSerializer; pub use self::segment_writer::SegmentWriter; pub use self::index_writer::IndexWriter; pub use self::merge_policy::MergePolicy; -pub use self::segment_manager::{SegmentManager, SegmentAppender}; \ No newline at end of file +pub use self::segment_manager::SegmentManager; \ No newline at end of file diff --git a/src/indexer/segment_manager.rs b/src/indexer/segment_manager.rs index c11a770c1..4131a3804 100644 --- a/src/indexer/segment_manager.rs +++ b/src/indexer/segment_manager.rs @@ -3,8 +3,6 @@ use std::sync::RwLock; use core::SegmentMeta; use error::Result; use core::SegmentId; -use std::sync::Arc; -use std::sync::atomic::{AtomicBool, Ordering}; struct SegmentRegisters { uncommitted: SegmentRegister, @@ -22,6 +20,7 @@ impl Default for SegmentRegisters { + /// The segment manager stores the list of segments /// as well as their state. 
/// @@ -58,7 +57,7 @@ impl SegmentManager { Ok(()) } - fn add_segment(&self, segment_meta: SegmentMeta) -> Result<()> { + pub fn add_segment(&self, segment_meta: SegmentMeta) -> Result<()> { let mut registers_lock = try!(self.registers.write()); registers_lock.uncommitted.add_segment(segment_meta); Ok(()) @@ -107,37 +106,3 @@ impl SegmentManager { } } - -#[derive(Clone)] -pub struct SegmentAppender { - is_open: Arc, - manager: Arc, -} - -impl SegmentAppender { - - pub fn for_manager(manager: Arc) -> SegmentAppender { - SegmentAppender { - is_open: Arc::new(AtomicBool::new(true)), - manager: manager, - } - } - - pub fn is_open(&self,) -> bool { - self.is_open.load(Ordering::Acquire) - } - - pub fn add_segment(&mut self, segment_meta: SegmentMeta) -> Result { - if self.is_open() { - try!(self.manager.add_segment(segment_meta)); - Ok(true) - } - else { - Ok(false) - } - } - - pub fn close(&mut self,) { - self.is_open.store(false, Ordering::Release); - } -} \ No newline at end of file