issue/109 Remove futures from most of segment_updater API.

This commit is contained in:
Paul Masurel
2017-03-25 19:35:58 +09:00
parent daa19b770a
commit f50f557cfc
2 changed files with 28 additions and 28 deletions

View File

@@ -248,10 +248,7 @@ fn index_documents(heap: &mut Heap,
let mut segment_entry = SegmentEntry::new(segment_meta);
segment_entry.set_doc_to_opstamp(DocToOpstampMapping::from(doc_opstamps));
segment_updater
.add_segment(generation, segment_entry)
.wait()
.map_err(|_| Error::ErrorInThread("Could not add segment.".to_string()))
Ok(segment_updater.add_segment(generation, segment_entry))
}
@@ -420,7 +417,7 @@ impl IndexWriter {
// No new document have been added in the meanwhile because `IndexWriter`
// is not shared by different threads.
rollback_future.wait().map_err(|_|
rollback_future.map_err(|_|
Error::ErrorInThread("Error while waiting for rollback.".to_string())
)?;
@@ -480,8 +477,7 @@ impl IndexWriter {
// wait for the segment update thread to have processed the info
self.segment_updater
.commit(self.committed_opstamp)
.wait()?;
.commit(self.committed_opstamp)?;
self.delete_queue.clear();
Ok(self.committed_opstamp)

View File

@@ -9,8 +9,9 @@ use core::SegmentMeta;
use core::SerializableSegment;
use directory::Directory;
use Error;
use std::result;
use futures_cpupool::CpuPool;
use futures::{Future, future};
use futures::Future;
use futures::Canceled;
use futures::oneshot;
use indexer::{MergePolicy, DefaultMergePolicy};
@@ -21,6 +22,7 @@ use indexer::merger::IndexMerger;
use indexer::SegmentEntry;
use indexer::SegmentSerializer;
use Result;
use futures_cpupool::CpuFuture;
use rustc_serialize::json;
use schema::Schema;
use std::borrow::BorrowMut;
@@ -124,9 +126,7 @@ impl SegmentUpdater {
pub fn new_segment(&self) -> Segment {
let new_segment = self.0.index.new_segment();
let segment_id = new_segment.id();
self.run_async(move |segment_updater| {
segment_updater.0.segment_manager.write_segment(segment_id);
});
self.0.segment_manager.write_segment(segment_id);
new_segment
}
@@ -141,32 +141,32 @@ impl SegmentUpdater {
fn get_merging_thread_id(&self) -> usize {
self.0.merging_thread_id.fetch_add(1, Ordering::SeqCst)
}
fn run_async<T: 'static + Send, F: 'static + Send + FnOnce(SegmentUpdater) -> T>(&self, f: F) -> impl Future<Item=T, Error=Error> {
fn run_async<T: 'static + Send, F: 'static + Send + FnOnce(SegmentUpdater) -> T>(&self, f: F) -> CpuFuture<T, Error> {
let me_clone = self.clone();
self.0.pool.spawn_fn(move || {
Ok(f(me_clone))
})
}
pub fn rollback(&mut self, generation: usize) -> impl Future<Item=(), Error=Error> {
pub fn rollback(&mut self, generation: usize) -> result::Result<(), Error> {
self.0.generation.store(generation, Ordering::Release);
self.run_async(|segment_updater| {
segment_updater.0.segment_manager.rollback();
})
}).wait()
}
pub fn add_segment(&self, generation: usize, segment_entry: SegmentEntry) -> impl Future<Item=bool, Error=Error> {
pub fn add_segment(&self, generation: usize, segment_entry: SegmentEntry) -> bool {
if generation >= self.0.generation.load(Ordering::Acquire) {
future::Either::A(self.run_async(|segment_updater| {
self.run_async(|segment_updater| {
segment_updater.0.segment_manager.add_segment(segment_entry);
segment_updater.consider_merge_options();
true
}))
}).forget();
true
}
else {
future::Either::B(future::ok(false))
false
}
}
@@ -181,7 +181,7 @@ impl SegmentUpdater {
.collect()
}
pub fn commit(&self, opstamp: u64) -> impl Future<Item=(), Error=Error> {
pub fn commit(&self, opstamp: u64) -> Result<()> {
self.run_async(move |segment_updater| {
let segment_metas = segment_updater.purge_deletes().expect("Failed purge deletes");
segment_updater.0.segment_manager.commit(segment_metas);
@@ -197,8 +197,7 @@ impl SegmentUpdater {
let living_files = segment_updater.0.segment_manager.list_files();
index.directory_mut().garbage_collect(living_files);
segment_updater.consider_merge_options();
})
}).wait()
}
@@ -266,9 +265,15 @@ impl SegmentUpdater {
let segment_entry = SegmentEntry::new(segment_meta);
segment_updater_clone
.end_merge(segment_metas.clone(), segment_entry.clone())
.wait()
.unwrap();
merging_future_send.complete(segment_entry.clone());
// Send will fail if nobody is waiting for the result and
// the receiver side got destroyed.
//
// This is not a problem.
let _send_result = merging_future_send
.send(segment_entry.clone());
segment_updater_clone.0.merging_threads.write().unwrap().remove(&merging_thread_id);
Ok(segment_entry)
});
@@ -293,7 +298,7 @@ impl SegmentUpdater {
fn end_merge(&self,
merged_segment_metas: Vec<SegmentMeta>,
resulting_segment_entry: SegmentEntry) -> impl Future<Item=(), Error=Error> {
resulting_segment_entry: SegmentEntry) -> Result<()> {
self.run_async(move |segment_updater| {
segment_updater.0.segment_manager.end_merge(&merged_segment_metas, resulting_segment_entry);
@@ -304,8 +309,7 @@ impl SegmentUpdater {
segment_updater.0.index.schema(),
segment_updater.0.index.opstamp(),
directory.borrow_mut()).expect("Could not save metas.");
})
}).wait()
}
pub fn wait_merging_thread(&self) -> Result<()> {