diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 0ba779ec3..e75861d35 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -248,10 +248,7 @@ fn index_documents(heap: &mut Heap, let mut segment_entry = SegmentEntry::new(segment_meta); segment_entry.set_doc_to_opstamp(DocToOpstampMapping::from(doc_opstamps)); - segment_updater - .add_segment(generation, segment_entry) - .wait() - .map_err(|_| Error::ErrorInThread("Could not add segment.".to_string())) + Ok(segment_updater.add_segment(generation, segment_entry)) } @@ -420,7 +417,7 @@ impl IndexWriter { // No new document have been added in the meanwhile because `IndexWriter` // is not shared by different threads. - rollback_future.wait().map_err(|_| + rollback_future.map_err(|_| Error::ErrorInThread("Error while waiting for rollback.".to_string()) )?; @@ -480,8 +477,7 @@ impl IndexWriter { // wait for the segment update thread to have processed the info self.segment_updater - .commit(self.committed_opstamp) - .wait()?; + .commit(self.committed_opstamp)?; self.delete_queue.clear(); Ok(self.committed_opstamp) diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index eb9f11578..5a4de3ed1 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -9,8 +9,9 @@ use core::SegmentMeta; use core::SerializableSegment; use directory::Directory; use Error; +use std::result; use futures_cpupool::CpuPool; -use futures::{Future, future}; +use futures::Future; use futures::Canceled; use futures::oneshot; use indexer::{MergePolicy, DefaultMergePolicy}; @@ -21,6 +22,7 @@ use indexer::merger::IndexMerger; use indexer::SegmentEntry; use indexer::SegmentSerializer; use Result; +use futures_cpupool::CpuFuture; use rustc_serialize::json; use schema::Schema; use std::borrow::BorrowMut; @@ -124,9 +126,7 @@ impl SegmentUpdater { pub fn new_segment(&self) -> Segment { let new_segment = self.0.index.new_segment(); let segment_id = new_segment.id(); - self.run_async(move |segment_updater| { - segment_updater.0.segment_manager.write_segment(segment_id); - }); + self.0.segment_manager.write_segment(segment_id); new_segment } @@ -141,32 +141,32 @@ impl SegmentUpdater { fn get_merging_thread_id(&self) -> usize { self.0.merging_thread_id.fetch_add(1, Ordering::SeqCst) } - - - fn run_async T>(&self, f: F) -> impl Future { + + fn run_async T>(&self, f: F) -> CpuFuture { let me_clone = self.clone(); self.0.pool.spawn_fn(move || { Ok(f(me_clone)) }) } - pub fn rollback(&mut self, generation: usize) -> impl Future { + pub fn rollback(&mut self, generation: usize) -> result::Result<(), Error> { self.0.generation.store(generation, Ordering::Release); self.run_async(|segment_updater| { segment_updater.0.segment_manager.rollback(); - }) + }).wait() } - pub fn add_segment(&self, generation: usize, segment_entry: SegmentEntry) -> impl Future { + pub fn add_segment(&self, generation: usize, segment_entry: SegmentEntry) -> bool { if generation >= self.0.generation.load(Ordering::Acquire) { - future::Either::A(self.run_async(|segment_updater| { + self.run_async(|segment_updater| { segment_updater.0.segment_manager.add_segment(segment_entry); segment_updater.consider_merge_options(); true - })) + }).forget(); + true } else { - future::Either::B(future::ok(false)) + false } } @@ -181,7 +181,7 @@ impl SegmentUpdater { .collect() } - pub fn commit(&self, opstamp: u64) -> impl Future { + pub fn commit(&self, opstamp: u64) -> Result<()> { self.run_async(move |segment_updater| { let segment_metas = segment_updater.purge_deletes().expect("Failed purge deletes"); segment_updater.0.segment_manager.commit(segment_metas); @@ -197,8 +197,7 @@ impl SegmentUpdater { let living_files = segment_updater.0.segment_manager.list_files(); index.directory_mut().garbage_collect(living_files); segment_updater.consider_merge_options(); - - }) + }).wait() } @@ -266,9 +265,15 @@ impl SegmentUpdater { let segment_entry = SegmentEntry::new(segment_meta); segment_updater_clone .end_merge(segment_metas.clone(), segment_entry.clone()) - .wait() .unwrap(); - merging_future_send.complete(segment_entry.clone()); + + // Send will fail if nobody is waiting for the result and + // the receiver side got destroyed. + // + // This is not a problem. + let _send_result = merging_future_send + .send(segment_entry.clone()); + segment_updater_clone.0.merging_threads.write().unwrap().remove(&merging_thread_id); Ok(segment_entry) }); @@ -293,7 +298,7 @@ impl SegmentUpdater { fn end_merge(&self, merged_segment_metas: Vec, - resulting_segment_entry: SegmentEntry) -> impl Future { + resulting_segment_entry: SegmentEntry) -> Result<()> { self.run_async(move |segment_updater| { segment_updater.0.segment_manager.end_merge(&merged_segment_metas, resulting_segment_entry); @@ -304,8 +309,7 @@ impl SegmentUpdater { segment_updater.0.index.schema(), segment_updater.0.index.opstamp(), directory.borrow_mut()).expect("Could not save metas."); - }) - + }).wait() } pub fn wait_merging_thread(&self) -> Result<()> {