diff --git a/Cargo.toml b/Cargo.toml index 406b0bda8..998d5b48c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,7 +37,10 @@ uuid = { version = "0.4", features = ["v4", "rustc-serialize"] } chan = "0.1" version = "2" crossbeam = "0.2" + eventual = "0.1.7" +futures = "0.1.9" +futures-cpupool = "0.1.2" [dev-dependencies] rand = "0.3" diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 52907f778..4a4377e74 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -5,6 +5,7 @@ use indexer::SegmentSerializer; use core::SerializableSegment; use core::Index; use core::Segment; +use core::SegmentId; use schema::Term; use indexer::SegmentEntry; use std::thread::JoinHandle; @@ -63,7 +64,6 @@ pub struct IndexWriter { _merge_policy: Arc>>, index: Index, - segment_manager: Arc, heap_size_in_bytes_per_thread: usize, @@ -72,7 +72,7 @@ pub struct IndexWriter { document_receiver: DocumentReceiver, document_sender: DocumentSender, - segment_update_manager: SegmentUpdater, + segment_updater: SegmentUpdater, worker_id: usize, @@ -93,7 +93,7 @@ fn index_documents(heap: &mut Heap, mut segment: Segment, schema: &Schema, document_iterator: &mut Iterator, - segment_update_manager: &mut SegmentUpdater, + segment_updater: &mut SegmentUpdater, delete_cursor: &mut DeleteQueueCursor) -> Result<()> { heap.clear(); @@ -134,7 +134,7 @@ fn index_documents(heap: &mut Heap, let segment_entry = SegmentEntry::new(segment_meta, delete_cursor.clone()); try!(segment_writer.finalize()); - segment_update_manager.send(SegmentUpdate::AddSegment(segment_entry)); + segment_updater.add_segment(segment_entry); Ok(()) } @@ -144,10 +144,10 @@ impl IndexWriter { /// The index writer pub fn wait_merging_threads(mut self) -> Result<()> { - let future = self.segment_update_manager.send(SegmentUpdate::Terminate); + let future = self.segment_updater.terminate(); // this will stop the indexing thread, - // dropping the last reference to the segment_update_manager. + // dropping the last reference to the segment_updater. drop(self.document_sender); let mut v = Vec::new(); @@ -172,7 +172,7 @@ impl IndexWriter { let index = self.index.clone(); let schema = self.index.schema(); let document_receiver_clone = self.document_receiver.clone(); - let mut segment_update_manager = self.segment_update_manager.clone(); + let mut segment_updater = self.segment_updater.clone(); let mut heap = Heap::with_capacity(self.heap_size_in_bytes_per_thread); // TODO fix this. the cursor might be too advanced @@ -201,7 +201,7 @@ impl IndexWriter { segment, &schema, &mut document_iterator, - &mut segment_update_manager, + &mut segment_updater, &mut delete_cursor_clone)); } else { // No more documents. @@ -250,11 +250,8 @@ impl IndexWriter { let delete_queue = DeleteQueue::default(); - let committed_segments = index.committed_segments()?; - let segment_manager = Arc::new(SegmentManager::from_segments(committed_segments, delete_queue.cursor())); - - let segment_update_manager = SegmentUpdater::new(index.clone(), segment_manager.clone(), merge_policy.clone()); + let segment_updater = SegmentUpdater::create(index.clone(), delete_queue.cursor(), merge_policy.clone())?; let mut index_writer = IndexWriter { @@ -264,12 +261,11 @@ impl IndexWriter { heap_size_in_bytes_per_thread: heap_size_in_bytes_per_thread, index: index.clone(), - segment_manager: segment_manager, document_receiver: document_receiver, document_sender: document_sender, - segment_update_manager: segment_update_manager, + segment_updater: segment_updater, workers_join_handle: Vec::new(), num_threads: num_threads, @@ -303,72 +299,8 @@ impl IndexWriter { } /// Merges a given list of segments - pub fn merge(&mut self, segments: &[Segment]) -> Result<()> { - - if segments.len() < 2 { - // no segments or one segment? nothing to do. - return Ok(()); - } - - let ref segment_manager = self.segment_manager; - { - // let's check that all these segments are in the same - // committed/uncommited state. - let first_commit_state = segment_manager.is_committed(segments[0].id()); - - for segment in segments { - let commit_state = segment_manager.is_committed(segment.id()); - if commit_state == CommitState::Missing { - return Err(Error::InvalidArgument(format!("Segment {:?} is not in the index", - segments[0].id()))); - } - if commit_state != first_commit_state { - return Err(Error::InvalidArgument(String::from("You may not merge segments \ - that are heterogenously in \ - committed and uncommited."))); - } - } - } - - let schema = self.index.schema(); - - // An IndexMerger is like a "view" of our merged segments. - let merger = try!(IndexMerger::open(schema, segments)); - let mut merged_segment = self.index.new_segment(); - - // ... we just serialize this index merger in our new segment - // to merge the two segments. - let segment_serializer = try!(SegmentSerializer::for_segment(&mut merged_segment)); - let num_docs = try!(merger.write(segment_serializer)); - let merged_segment_ids = segments - .iter() - .map(|segment| segment.id()) - .collect::>(); - - let segment_meta = SegmentMeta { - segment_id: merged_segment.id(), - num_docs: num_docs, - num_deleted_docs: 0, - }; - - // TODO fix this!!! - let delete_queue = DeleteQueue::default(); - let delete_cursor = delete_queue.cursor(); - - let segment_entry = SegmentEntry::new(segment_meta, delete_cursor); - - let segment_update = SegmentUpdate::EndMerge( - None, - merged_segment_ids, - segment_entry - ); - - self.segment_update_manager.send(segment_update); - - // self.segment_updater.(segment_ids, segment_entry); - //segment_manager.end_merge(&merged_segment_ids, segment_entry); - - Ok(()) + pub fn merge(&mut self, segments: &[SegmentId]) -> impl Async { + self.segment_updater.start_merge(segments.to_vec()) } /// Closes the current document channel send. @@ -397,7 +329,7 @@ impl IndexWriter { /// The docstamp at the last commit is returned. pub fn rollback(&mut self) -> Result { - self.segment_update_manager.send(SegmentUpdate::CancelGeneration); + self.segment_updater.cancel_generation(); // we cannot drop segment ready receiver yet // as it would block the workers. @@ -430,15 +362,20 @@ impl IndexWriter { // // We can now open a new generation and reaccept segments // from now on. - self.segment_update_manager.send(SegmentUpdate::NewGeneration); + self.segment_updater.new_generation(); + + + // TODO Send rollback. + //let rollbacked_segments = self.segment_manager.rollback(); - let rollbacked_segments = self.segment_manager.rollback(); - for segment_id in rollbacked_segments { + // for segment_id in rollbacked_segments { + // // TODO all delete must happen after saving + // // meta.json + // self.index.delete_segment(segment_id); + // } + + panic!("aaaa"); - // TODO all delete must happen after saving - // meta.json - self.index.delete_segment(segment_id); - } // reset the docstamp self.uncommitted_docstamp = self.committed_docstamp; @@ -490,7 +427,7 @@ impl IndexWriter { // This will move uncommitted segments to the state of // committed segments. - let future = self.segment_update_manager.send(SegmentUpdate::Commit(self.committed_docstamp)); + let future = self.segment_updater.commit(self.committed_docstamp); // wait for the segment update thread to have processed the info // TODO remove unwrap diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index f3d286434..a41f211f0 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -1,6 +1,7 @@ use Result; use core::SegmentReader; use core::Segment; +use core::SegmentId; use DocId; use core::SerializableSegment; use indexer::SegmentSerializer; @@ -205,6 +206,7 @@ mod tests { use collector::tests::TestCollector; use query::BooleanQuery; use schema::TextIndexingOptions; + use eventual::Async; #[test] fn test_index_merger() { @@ -260,9 +262,11 @@ mod tests { } } { - let segments = index.searchable_segments().expect("Searchable segments failed."); + let segment_ids = index.searchable_segment_ids().expect("Searchable segments failed."); let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap(); - index_writer.merge(&segments).expect("Merging failed"); + index_writer.merge(&segment_ids) + .await() + .expect("Merging failed"); index_writer.wait_merging_threads().unwrap(); } { diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index 89a318a09..53ab821c6 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -20,7 +20,7 @@ use std::thread::JoinHandle; use std::sync::Arc; use std::collections::HashMap; use rustc_serialize::json; -use indexer::delete_queue::DeleteQueue; +use indexer::delete_queue::{DeleteQueueCursor, DeleteQueue}; use Result; use core::IndexMeta; use core::META_FILEPATH; @@ -91,6 +91,9 @@ pub enum SegmentUpdate { /// Created by the indexing worker thread AddSegment(SegmentEntry), + + StartMerge(Vec), + /// A merge is ended. /// Remove the merged segment and record the new /// large merged segment. @@ -119,8 +122,6 @@ pub enum SegmentUpdate { } - - // TODO Rename #[derive(Clone)] pub struct SegmentUpdater { @@ -130,21 +131,56 @@ pub struct SegmentUpdater { impl SegmentUpdater { - pub fn new( + pub fn create( index: Index, - segment_manager: Arc, - merge_policy: Arc>>) -> SegmentUpdater { + delete_cursor: DeleteQueueCursor, + merge_policy: Arc>>) -> Result { let (segment_update_sender, segment_update_receiver): (SegmentUpdateSender, SegmentUpdateReceiver) = chan::async(); - let segment_update_manager = SegmentUpdater { + let segment_updater = SegmentUpdater { channel: segment_update_sender, }; + let committed_segments = index.committed_segments()?; + let segment_manager = SegmentManager::from_segments(committed_segments, delete_cursor); SegmentUpdateRunner::new( index, segment_manager, merge_policy, - segment_update_manager.clone(), + segment_updater.clone(), segment_update_receiver).start(); - segment_update_manager + Ok(segment_updater) + } + + pub fn add_segment(&self, segment_entry: SegmentEntry) -> impl Async { + self.send(SegmentUpdate::AddSegment(segment_entry)) + } + + pub fn commit(&self, committed_docstamp: u64) -> impl Async { + self.send(SegmentUpdate::Commit(committed_docstamp)) + } + + pub fn start_merge(&self, segment_ids: Vec) -> impl Async { + self.send(SegmentUpdate::StartMerge(segment_ids)) + } + + pub fn new_generation(&self) -> impl Async { + self.send(SegmentUpdate::NewGeneration) + } + + pub fn cancel_generation(&self) -> impl Async { + self.send(SegmentUpdate::CancelGeneration) + } + + + pub fn end_merge(&self, + merge_thread_id: Option, + merged_segment_ids: Vec, + resulting_segment_entry: SegmentEntry) -> impl Async { + let segment_update = SegmentUpdate::EndMerge(merge_thread_id, merged_segment_ids, resulting_segment_entry); + self.send(segment_update) + } + + pub fn terminate(&self) -> impl Async { + self.send(SegmentUpdate::Terminate) } pub fn send(&self, segment_update: SegmentUpdate) -> impl Async { @@ -170,8 +206,8 @@ pub struct SegmentUpdateRunner { index: Index, is_cancelled_generation: bool, segment_update_receiver: SegmentUpdateReceiver, - segment_update_manager: SegmentUpdater, - segment_manager: Arc, + segment_updater: SegmentUpdater, + segment_manager: SegmentManager, merge_policy: Arc>>, merging_thread_id: usize, merging_threads: HashMap, SegmentEntry)> >, @@ -180,14 +216,14 @@ pub struct SegmentUpdateRunner { impl SegmentUpdateRunner { fn new(index: Index, - segment_manager: Arc, + segment_manager: SegmentManager, merge_policy: Arc>>, - segment_update_manager: SegmentUpdater, + segment_updater: SegmentUpdater, segment_update_receiver: SegmentUpdateReceiver) -> SegmentUpdateRunner { SegmentUpdateRunner { index: index, is_cancelled_generation: false, - segment_update_manager: segment_update_manager, + segment_updater: segment_updater, segment_update_receiver: segment_update_receiver, segment_manager: segment_manager, merge_policy: merge_policy, @@ -206,10 +242,10 @@ impl SegmentUpdateRunner { &mut self, segment_ids: Vec, segment_entry: SegmentEntry) { - let segment_manager = self.segment_manager.clone(); - segment_manager.end_merge(&segment_ids, segment_entry); + + self.segment_manager.end_merge(&segment_ids, segment_entry); save_metas( - &*segment_manager, + &self.segment_manager, self.index.schema(), self.index.docstamp(), self.index.directory_mut()).expect("Could not save metas."); @@ -221,60 +257,64 @@ impl SegmentUpdateRunner { self.index.load_searchers().unwrap(); } + + fn start_merge(&mut self, segment_ids: Vec, complete_opt: Option>) { + + let merging_thread_id = self.new_merging_thread_id(); + self.segment_manager.start_merge(&segment_ids); + + let index_clone = self.index.clone(); + let segment_updater_clone = self.segment_updater.clone(); + + let merge_thread_handle = thread::Builder::new() + .name(format!("merge_thread_{:?}", merging_thread_id)) + .spawn(move || { + info!("Start merge: {:?}", segment_ids); + let schema = index_clone.schema(); + let segments: Vec = segment_ids + .iter() + .map(|&segment_id| index_clone.segment(segment_id)) + .collect(); + // An IndexMerger is like a "view" of our merged segments. + // TODO unwrap + let merger: IndexMerger = IndexMerger::open(schema, &segments[..]).expect("Creating index merger failed"); + let mut merged_segment = index_clone.new_segment(); + // ... we just serialize this index merger in our new segment + // to merge the two segments. + let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment).expect("Creating index serializer failed"); + let num_docs = merger.write(segment_serializer).expect("Serializing merged index failed"); + let segment_meta = SegmentMeta { + segment_id: merged_segment.id(), + num_docs: num_docs, + num_deleted_docs: 0u32, + }; + + // TODO fix delete cursor + let delete_queue = DeleteQueue::default(); + + let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor()); + + let segment_update = SegmentUpdate::EndMerge(Some(merging_thread_id), segment_ids.clone(), segment_entry.clone()); + segment_updater_clone.send(segment_update.clone()); + if let Some(complete) = complete_opt { + complete.complete(()); + } + (segment_ids, segment_entry) + }) + .expect("Failed to spawn merge thread"); + + self.merging_threads.insert(merging_thread_id, merge_thread_handle); + } + fn start_merges(&mut self) { - let merge_candidates = self.consider_merge_options(); - for MergeCandidate(segment_ids) in merge_candidates { - - let merging_thread_id = self.new_merging_thread_id(); - - self.segment_manager().start_merge(&segment_ids); - - let index_clone = self.index.clone(); - let segment_update_manager_clone = self.segment_update_manager.clone(); - - let merge_thread_handle = thread::Builder::new() - .name(format!("merge_thread_{:?}", merging_thread_id)) - .spawn(move || { - info!("Start merge: {:?}", segment_ids); - let schema = index_clone.schema(); - let segments: Vec = segment_ids - .iter() - .map(|&segment_id| index_clone.segment(segment_id)) - .collect(); - // An IndexMerger is like a "view" of our merged segments. - // TODO unwrap - let merger: IndexMerger = IndexMerger::open(schema, &segments[..]).expect("Creating index merger failed"); - let mut merged_segment = index_clone.new_segment(); - // ... we just serialize this index merger in our new segment - // to merge the two segments. - let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment).expect("Creating index serializer failed"); - let num_docs = merger.write(segment_serializer).expect("Serializing merged index failed"); - let segment_meta = SegmentMeta { - segment_id: merged_segment.id(), - num_docs: num_docs, - num_deleted_docs: 0u32, - }; - - // TODO fix delete cursor - let delete_queue = DeleteQueue::default(); - - let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor()); - - let segment_update = SegmentUpdate::EndMerge(Some(merging_thread_id), segment_ids.clone(), segment_entry.clone()); - segment_update_manager_clone.send(segment_update.clone()); - (segment_ids, segment_entry) - }) - .expect("Failed to spawn merge thread"); - - self.merging_threads.insert(merging_thread_id, merge_thread_handle); + self.start_merge(segment_ids, None); } } fn consider_merge_options(&self,) -> Vec { - let segment_manager = self.segment_manager(); - let (committed_segments, uncommitted_segments) = get_segment_ready_for_commit(segment_manager); + let (committed_segments, uncommitted_segments) = get_segment_ready_for_commit(&self.segment_manager); // Committed segments cannot be merged with uncommitted_segments. // We therefore consider merges using these two sets of segments independantly. let merge_policy_lock = self.merge_policy.lock().unwrap(); @@ -284,11 +324,6 @@ impl SegmentUpdateRunner { merge_candidates } - - fn segment_manager(&self,) -> &SegmentManager { - &*self.segment_manager - } - pub fn start(self) -> JoinHandle<()> { thread::Builder::new() .name("segment_update".to_string()) @@ -299,9 +334,7 @@ impl SegmentUpdateRunner { } fn process(mut self) { - - let segment_manager = self.segment_manager.clone(); - + let mut complete_option = None; for (complete, segment_update) in self.segment_update_receiver.clone() { @@ -316,40 +349,46 @@ impl SegmentUpdateRunner { // dirty-bit. If the value is different // to our generation, then the segment_manager has // been update updated. - let generation_before_update = segment_manager.generation(); + let generation_before_update = self.segment_manager.generation(); - self.process_one(segment_update); - - if generation_before_update != segment_manager.generation() { - // The segment manager has changed, we need to - // - save meta.json - save_metas( - &*segment_manager, - self.index.schema(), - self.index.docstamp(), - self.index.directory_mut()).expect("Could not save metas."); + if let SegmentUpdate::StartMerge(segment_ids) = segment_update { + self.start_merge(segment_ids, Some(complete)); + } + else { + self.process_one(segment_update); + if generation_before_update != self.segment_manager.generation() { + // The segment manager has changed, we need to + // - save meta.json + save_metas( + &self.segment_manager, + self.index.schema(), + self.index.docstamp(), + self.index.directory_mut()).expect("Could not save metas."); - // - update the searchers - - // update the searchers so that they eventually will - // use the new segments. - // TODO eventually have this work through watching meta.json - // so that an external process stays up to date as well. - match self.index.load_searchers() { - Ok(()) => { - } - Err(e) => { - error!("Failure while loading new searchers {:?}", e); - panic!(format!("Failure while loading new searchers {:?}", e)); + // - update the searchers + + // update the searchers so that they eventually will + // use the new segments. + // TODO eventually have this work through watching meta.json + // so that an external process stays up to date as well. + match self.index.load_searchers() { + Ok(()) => {} + Err(e) => { + error!("Failure while loading new searchers {:?}", e); + panic!(format!("Failure while loading new searchers {:?}", e)); + } } + + // - start merges if required + self.start_merges(); } - - // - start merges if required - self.start_merges(); + complete.complete(()); } - complete.complete(()); + + + } @@ -381,12 +420,12 @@ impl SegmentUpdateRunner { segment_update: SegmentUpdate) { info!("Segment update: {:?}", segment_update); - + use self::SegmentUpdate::*; match segment_update { AddSegment(segment_entry) => { if !self.is_cancelled_generation { - self.segment_manager().add_segment(segment_entry); + self.segment_manager.add_segment(segment_entry); } else { // rollback has been called and this @@ -397,6 +436,9 @@ impl SegmentUpdateRunner { self.index.delete_segment(segment_entry.segment_id()); } } + StartMerge(segment_ids) => { + panic!("this should have been handled somewhere else"); + } EndMerge(merging_thread_id_opt, segment_ids, segment_entry) => { self.end_merge( segment_ids, @@ -417,7 +459,7 @@ impl SegmentUpdateRunner { self.is_cancelled_generation = false; } Commit(docstamp) => { - self.segment_manager().commit(docstamp); + self.segment_manager.commit(docstamp); } Terminate => { panic!("We should have left the loop before processing it."); diff --git a/src/lib.rs b/src/lib.rs index 9ba528862..ed0d726fa 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -50,6 +50,8 @@ extern crate crossbeam; extern crate bit_set; extern crate notify; extern crate eventual; +extern crate futures; +extern crate futures_cpupool; #[cfg(feature="simdcompression")] extern crate libc;