diff --git a/src/core/executor.rs b/src/core/executor.rs index 490d68630..eda83707b 100644 --- a/src/core/executor.rs +++ b/src/core/executor.rs @@ -31,19 +31,22 @@ impl Executor { // // Regardless of the executor (`SingleThread` or `ThreadPool`), panics in the task // will propagate to the caller. - pub fn map_unsorted, F: Sized + Sync + Fn(A) -> Result>(&self, f: F, args: AIterator) -> Result> { + pub fn map, F: Sized + Sync + Fn(A) -> Result>(&self, f: F, args: AIterator) -> Result> { match self { Executor::SingleThread => { args.map(f).collect::>() } Executor::ThreadPool(pool) => { + let args_with_indices: Vec<(usize, A)> = args.enumerate().collect(); + let num_fruits = args_with_indices.len(); let fruit_receiver = { let (fruit_sender, fruit_receiver) = channel::unbounded(); pool.scoped(|scope| { - for arg in args { + for arg_with_idx in args_with_indices { scope.execute(|| { + let (idx, arg) = arg_with_idx; let fruit = f(arg); - if let Err(err) = fruit_sender.send(fruit) { + if let Err(err) = fruit_sender.send((idx, fruit)) { error!("Failed to send search task. It probably means all search threads have panicked. {:?}", err); } }); @@ -54,9 +57,17 @@ impl Executor { // This is important as it makes it possible for the fruit_receiver iteration to // terminate. }; - fruit_receiver - .into_iter() - .collect::>() + let mut results = Vec::with_capacity(num_fruits); + unsafe {results.set_len(num_fruits)}; + let mut num_items = 0; + for (pos, fruit_res) in fruit_receiver { + results[pos] = fruit_res?; + num_items += 1; + } + // this checks ensures that we filled of this + // uninitialized memory. + assert_eq!(num_items, results.len()); + Ok(results) } } } @@ -70,35 +81,34 @@ mod tests { #[test] #[should_panic(expected="panic should propagate")] fn test_panic_propagates_single_thread() { - let _result: Vec = Executor::single_thread().map_unsorted(|_| {panic!("panic should propagate"); }, vec![0].into_iter()).unwrap(); + let _result: Vec = Executor::single_thread().map(|_| {panic!("panic should propagate"); }, vec![0].into_iter()).unwrap(); } #[test] #[should_panic] //< unfortunately the panic message is not propagated fn test_panic_propagates_multi_thread() { let _result: Vec = Executor::multi_thread(1, "search-test") - .map_unsorted(|_| {panic!("panic should propagate"); }, vec![0].into_iter()).unwrap(); + .map(|_| {panic!("panic should propagate"); }, vec![0].into_iter()).unwrap(); } #[test] fn test_map_singlethread() { let result: Vec = Executor::single_thread() - .map_unsorted(|i| { Ok(i * 2) }, 0..1_000).unwrap(); + .map(|i| { Ok(i * 2) }, 0..1_000).unwrap(); assert_eq!(result.len(), 1_000); for i in 0..1_000 { assert_eq!(result[i], i * 2); } } - #[test] - fn test_map_multithread() { - let mut result: Vec = Executor::multi_thread(3, "search-test") - .map_unsorted(|i| Ok(i * 2), 0..10).unwrap(); - assert_eq!(result.len(), 10); - result.sort(); - for i in 0..10 { - assert_eq!(result[i], i * 2); - } - } +} -} \ No newline at end of file +#[test] +fn test_map_multithread() { + let result: Vec = Executor::multi_thread(3, "search-test") + .map(|i| Ok(i * 2), 0..10).unwrap(); + assert_eq!(result.len(), 10); + for i in 0..10 { + assert_eq!(result[i], i * 2); + } +} diff --git a/src/core/searcher.rs b/src/core/searcher.rs index acece093f..89346390e 100644 --- a/src/core/searcher.rs +++ b/src/core/searcher.rs @@ -152,10 +152,10 @@ impl Searcher { let weight = query.weight(self, scoring_enabled)?; let segment_readers = self.segment_readers(); let fruits = executor - .map_unsorted(|(segment_ord, segment_reader)| { + .map(|(segment_ord, segment_reader)| { collect_segment(collector, weight.as_ref(), segment_ord as u32, segment_reader) }, - segment_readers.iter().enumerate())?; + segment_readers.iter().enumerate())?; collector.merge_fruits(fruits) }