diff --git a/Cargo.toml b/Cargo.toml index e5ad7af1f..a63ac511a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,10 +50,10 @@ owned-read = "0.4" failure = "0.1" htmlescape = "0.3.1" fail = "0.3" -scoped-pool = "1.0" murmurhash32 = "0.2" chrono = "0.4" smallvec = "1.0" +rayon = "1" [target.'cfg(windows)'.dependencies] winapi = "0.3" @@ -64,6 +64,10 @@ maplit = "1" matches = "0.1.8" time = "0.1.42" +[dev-dependencies.fail] +version = "0.3" +features = ["failpoints"] + [profile.release] opt-level = 3 debug = false @@ -87,10 +91,6 @@ members = ["query-grammar"] [badges] travis-ci = { repository = "tantivy-search/tantivy" } -[dev-dependencies.fail] -version = "0.3" -features = ["failpoints"] - # Following the "fail" crate best practises, we isolate # tests that define specific behavior in fail check points # in a different binary. diff --git a/src/core/executor.rs b/src/core/executor.rs index c90d0b8a6..8e058e932 100644 --- a/src/core/executor.rs +++ b/src/core/executor.rs @@ -1,6 +1,6 @@ use crate::Result; use crossbeam::channel; -use scoped_pool::{Pool, ThreadConfig}; +use rayon::{ThreadPool, ThreadPoolBuilder}; /// Search executor whether search request are single thread or multithread. /// @@ -11,7 +11,7 @@ use scoped_pool::{Pool, ThreadConfig}; /// used by the client. Second, we may stop using rayon in the future. pub enum Executor { SingleThread, - ThreadPool(Pool), + ThreadPool(ThreadPool), } impl Executor { @@ -21,10 +21,12 @@ impl Executor { } // Creates an Executor that dispatches the tasks in a thread pool. - pub fn multi_thread(num_threads: usize, prefix: &'static str) -> Executor { - let thread_config = ThreadConfig::new().prefix(prefix); - let pool = Pool::with_thread_config(num_threads, thread_config); - Executor::ThreadPool(pool) + pub fn multi_thread(num_threads: usize, prefix: &'static str) -> Result { + let pool = ThreadPoolBuilder::new() + .num_threads(num_threads) + .thread_name(move |num| format!("{}{}", prefix, num)) + .build()?; + Ok(Executor::ThreadPool(pool)) } // Perform a map in the thread pool. @@ -48,9 +50,9 @@ impl Executor { let num_fruits = args_with_indices.len(); let fruit_receiver = { let (fruit_sender, fruit_receiver) = channel::unbounded(); - pool.scoped(|scope| { + pool.scope(|scope| { for arg_with_idx in args_with_indices { - scope.execute(|| { + scope.spawn(|_| { let (idx, arg) = arg_with_idx; let fruit = f(arg); if let Err(err) = fruit_sender.send((idx, fruit)) { @@ -103,6 +105,7 @@ mod tests { #[should_panic] //< unfortunately the panic message is not propagated fn test_panic_propagates_multi_thread() { let _result: Vec = Executor::multi_thread(1, "search-test") + .unwrap() .map( |_| { panic!("panic should propagate"); @@ -126,6 +129,7 @@ mod tests { #[test] fn test_map_multithread() { let result: Vec = Executor::multi_thread(3, "search-test") + .unwrap() .map(|i| Ok(i * 2), 0..10) .unwrap(); assert_eq!(result.len(), 10); diff --git a/src/core/index.rs b/src/core/index.rs index 365a119a8..e908dc953 100644 --- a/src/core/index.rs +++ b/src/core/index.rs @@ -73,15 +73,16 @@ impl Index { /// Replace the default single thread search executor pool /// by a thread pool with a given number of threads. - pub fn set_multithread_executor(&mut self, num_threads: usize) { - self.executor = Arc::new(Executor::multi_thread(num_threads, "thrd-tantivy-search-")); + pub fn set_multithread_executor(&mut self, num_threads: usize) -> Result<()> { + self.executor = Arc::new(Executor::multi_thread(num_threads, "thrd-tantivy-search-")?); + Ok(()) } /// Replace the default single thread search executor pool /// by a thread pool with a given number of threads. - pub fn set_default_multithread_executor(&mut self) { + pub fn set_default_multithread_executor(&mut self) -> Result<()> { let default_num_threads = num_cpus::get(); - self.set_multithread_executor(default_num_threads); + self.set_multithread_executor(default_num_threads) } /// Creates a new index using the `RAMDirectory`. diff --git a/src/error.rs b/src/error.rs index 10c6ca782..ac6d96216 100644 --- a/src/error.rs +++ b/src/error.rs @@ -170,3 +170,9 @@ impl From for TantivyError { TantivyError::IOError(io_err.into()) } } + +impl From for TantivyError { + fn from(error: rayon::ThreadPoolBuildError) -> TantivyError { + TantivyError::SystemError(error.to_string()) + } +}