Removed AtomicUsize.

This commit is contained in:
Paul Masurel
2022-05-04 16:43:04 +09:00
parent a1afc80600
commit be70804d17

View File

@@ -1,8 +1,8 @@
use std::sync::atomic::AtomicUsize;
use crossbeam::channel;
use rayon::{ThreadPool, ThreadPoolBuilder};
use crate::TantivyError;
/// Search executor whether search request are single thread or multithread.
///
/// We don't expose Rayon thread pool directly here for several reasons.
@@ -55,10 +55,13 @@ impl Executor {
let (fruit_sender, fruit_receiver) = channel::unbounded();
pool.scope(|scope| {
for (idx, arg) in args.into_iter().enumerate() {
let idx = AtomicUsize::new(idx);
scope.spawn(|_| {
let fruit = f(arg);
if let Err(err) = fruit_sender.send((idx.into_inner(), fruit)) {
// We name references for f and fruit_sender_ref because we do not
// want these two to be moved into the closure.
let f_ref = &f;
let fruit_sender_ref = &fruit_sender;
scope.spawn(move |_| {
let fruit = f_ref(arg);
if let Err(err) = fruit_sender_ref.send((idx, fruit)) {
error!(
"Failed to send search task. It probably means all search \
threads have panicked. {:?}",
@@ -73,18 +76,19 @@ impl Executor {
// This is important as it makes it possible for the fruit_receiver iteration to
// terminate.
};
// This is lame, but safe.
let mut results_with_position = Vec::with_capacity(num_fruits);
let mut result_placeholders: Vec<Option<R>> =
std::iter::repeat_with(|| None).take(num_fruits).collect();
for (pos, fruit_res) in fruit_receiver {
let fruit = fruit_res?;
results_with_position.push((pos, fruit));
result_placeholders[pos] = Some(fruit);
}
results_with_position.sort_by_key(|(pos, _)| *pos);
assert_eq!(results_with_position.len(), num_fruits);
Ok(results_with_position
.into_iter()
.map(|(_, fruit)| fruit)
.collect::<Vec<_>>())
let results: Vec<R> = result_placeholders.into_iter().flatten().collect();
if results.len() != num_fruits {
return Err(TantivyError::InternalError(
"One of the mapped execution failed.".to_string(),
));
}
Ok(results)
}
}
}