Executor made sorted

This commit is contained in:
Paul Masurel
2018-11-30 22:52:26 +09:00
parent 07d87e154b
commit 6af0488dbe
2 changed files with 32 additions and 22 deletions

View File

@@ -31,19 +31,22 @@ impl Executor {
//
// Regardless of the executor (`SingleThread` or `ThreadPool`), panics in the task
// will propagate to the caller.
pub fn map_unsorted<A: Send, R: Send, AIterator: Iterator<Item=A>, F: Sized + Sync + Fn(A) -> Result<R>>(&self, f: F, args: AIterator) -> Result<Vec<R>> {
pub fn map<A: Send, R: Send, AIterator: Iterator<Item=A>, F: Sized + Sync + Fn(A) -> Result<R>>(&self, f: F, args: AIterator) -> Result<Vec<R>> {
match self {
Executor::SingleThread => {
args.map(f).collect::<Result<_>>()
}
Executor::ThreadPool(pool) => {
let args_with_indices: Vec<(usize, A)> = args.enumerate().collect();
let num_fruits = args_with_indices.len();
let fruit_receiver = {
let (fruit_sender, fruit_receiver) = channel::unbounded();
pool.scoped(|scope| {
for arg in args {
for arg_with_idx in args_with_indices {
scope.execute(|| {
let (idx, arg) = arg_with_idx;
let fruit = f(arg);
if let Err(err) = fruit_sender.send(fruit) {
if let Err(err) = fruit_sender.send((idx, fruit)) {
error!("Failed to send search task. It probably means all search threads have panicked. {:?}", err);
}
});
@@ -54,9 +57,17 @@ impl Executor {
// This is important as it makes it possible for the fruit_receiver iteration to
// terminate.
};
fruit_receiver
.into_iter()
.collect::<Result<_>>()
let mut results = Vec::with_capacity(num_fruits);
unsafe {results.set_len(num_fruits)};
let mut num_items = 0;
for (pos, fruit_res) in fruit_receiver {
results[pos] = fruit_res?;
num_items += 1;
}
// this checks ensures that we filled of this
// uninitialized memory.
assert_eq!(num_items, results.len());
Ok(results)
}
}
}
@@ -70,35 +81,34 @@ mod tests {
#[test]
#[should_panic(expected="panic should propagate")]
fn test_panic_propagates_single_thread() {
let _result: Vec<usize> = Executor::single_thread().map_unsorted(|_| {panic!("panic should propagate"); }, vec![0].into_iter()).unwrap();
let _result: Vec<usize> = Executor::single_thread().map(|_| {panic!("panic should propagate"); }, vec![0].into_iter()).unwrap();
}
#[test]
#[should_panic] //< unfortunately the panic message is not propagated
fn test_panic_propagates_multi_thread() {
let _result: Vec<usize> = Executor::multi_thread(1, "search-test")
.map_unsorted(|_| {panic!("panic should propagate"); }, vec![0].into_iter()).unwrap();
.map(|_| {panic!("panic should propagate"); }, vec![0].into_iter()).unwrap();
}
#[test]
fn test_map_singlethread() {
let result: Vec<usize> = Executor::single_thread()
.map_unsorted(|i| { Ok(i * 2) }, 0..1_000).unwrap();
.map(|i| { Ok(i * 2) }, 0..1_000).unwrap();
assert_eq!(result.len(), 1_000);
for i in 0..1_000 {
assert_eq!(result[i], i * 2);
}
}
#[test]
fn test_map_multithread() {
let mut result: Vec<usize> = Executor::multi_thread(3, "search-test")
.map_unsorted(|i| Ok(i * 2), 0..10).unwrap();
assert_eq!(result.len(), 10);
result.sort();
for i in 0..10 {
assert_eq!(result[i], i * 2);
}
}
}
}
#[test]
fn test_map_multithread() {
let result: Vec<usize> = Executor::multi_thread(3, "search-test")
.map(|i| Ok(i * 2), 0..10).unwrap();
assert_eq!(result.len(), 10);
for i in 0..10 {
assert_eq!(result[i], i * 2);
}
}

View File

@@ -152,10 +152,10 @@ impl Searcher {
let weight = query.weight(self, scoring_enabled)?;
let segment_readers = self.segment_readers();
let fruits = executor
.map_unsorted(|(segment_ord, segment_reader)| {
.map(|(segment_ord, segment_reader)| {
collect_segment(collector, weight.as_ref(), segment_ord as u32, segment_reader)
},
segment_readers.iter().enumerate())?;
segment_readers.iter().enumerate())?;
collector.merge_fruits(fruits)
}