From 89f91b1b580331dbeac40567d63e45145eb8bd51 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Wed, 6 Oct 2021 12:10:16 +0900 Subject: [PATCH] first stab --- Cargo.toml | 3 ++- src/core/executor.rs | 46 ------------------------------------- src/indexer/index_writer.rs | 14 +++++++---- src/lib.rs | 4 ++++ 4 files changed, 16 insertions(+), 51 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c5f6bf39b..5cf7449bb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ snap = { version = "1.0.5", optional = true } tempfile = { version = "3.2", optional = true } log = "0.4.14" serde = { version = "1.0.126", features = ["derive"] } +serde_closure = "0.3" serde_json = "1.0.64" num_cpus = "1.13" fs2={ version = "0.4.3", optional = true } @@ -50,12 +51,12 @@ fail = "0.4" murmurhash32 = "0.2" chrono = "0.4.19" smallvec = "1.6.1" -rayon = "1.5" lru = "0.7.0" fastdivide = "0.3" itertools = "0.10.0" measure_time = "0.7.0" wasm-mt = "0.1" +wasm-mt-pool = "0.1" [target.'cfg(windows)'.dependencies] winapi = "0.3.9" diff --git a/src/core/executor.rs b/src/core/executor.rs index 8ac39a7eb..472336c00 100644 --- a/src/core/executor.rs +++ b/src/core/executor.rs @@ -1,5 +1,4 @@ use crossbeam::channel; -use rayon::{ThreadPool, ThreadPoolBuilder}; /// Search executor whether search request are single thread or multithread. /// @@ -11,8 +10,6 @@ use rayon::{ThreadPool, ThreadPoolBuilder}; pub enum Executor { /// Single thread variant of an Executor SingleThread, - /// Thread pool variant of an Executor - ThreadPool(ThreadPool), } impl Executor { @@ -21,15 +18,6 @@ impl Executor { Executor::SingleThread } - /// Creates an Executor that dispatches the tasks in a thread pool. - pub fn multi_thread(num_threads: usize, prefix: &'static str) -> crate::Result { - let pool = ThreadPoolBuilder::new() - .num_threads(num_threads) - .thread_name(move |num| format!("{}{}", prefix, num)) - .build()?; - Ok(Executor::ThreadPool(pool)) - } - /// Perform a map in the thread pool. /// /// Regardless of the executor (`SingleThread` or `ThreadPool`), panics in the task @@ -46,40 +34,6 @@ impl Executor { ) -> crate::Result> { match self { Executor::SingleThread => args.map(f).collect::>(), - Executor::ThreadPool(pool) => { - let args_with_indices: Vec<(usize, A)> = args.enumerate().collect(); - let num_fruits = args_with_indices.len(); - let fruit_receiver = { - let (fruit_sender, fruit_receiver) = channel::unbounded(); - pool.scope(|scope| { - for arg_with_idx in args_with_indices { - scope.spawn(|_| { - let (idx, arg) = arg_with_idx; - let fruit = f(arg); - if let Err(err) = fruit_sender.send((idx, fruit)) { - error!("Failed to send search task. It probably means all search threads have panicked. {:?}", err); - } - }); - } - }); - fruit_receiver - // This ends the scope of fruit_sender. - // This is important as it makes it possible for the fruit_receiver iteration to - // terminate. - }; - // This is lame, but safe. - let mut results_with_position = Vec::with_capacity(num_fruits); - for (pos, fruit_res) in fruit_receiver { - let fruit = fruit_res?; - results_with_position.push((pos, fruit)); - } - results_with_position.sort_by_key(|(pos, _)| *pos); - assert_eq!(results_with_position.len(), num_fruits); - Ok(results_with_position - .into_iter() - .map(|(_, fruit)| fruit) - .collect::>()) - } } } } diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index cef8380e6..a92f3ef35 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -29,9 +29,12 @@ use futures::executor::block_on; use futures::future::Future; use smallvec::smallvec; use smallvec::SmallVec; +use wasm_mt_pool::pool_exec; +use wasm_mt::prelude::*; use std::mem; use std::ops::Range; use std::sync::Arc; +use wasm_mt_pool::prelude::*; use std::thread; use std::thread::JoinHandle; @@ -75,7 +78,7 @@ pub struct IndexWriter { heap_size_in_bytes_per_thread: usize, - workers_join_handle: Vec>>, + workers_join_handle: Vec>>, operation_receiver: OperationReceiver, operation_sender: OperationSender, @@ -90,6 +93,8 @@ pub struct IndexWriter { stamper: Stamper, committed_opstamp: Opstamp, + + worker_pool: wasm_mt_pool::ThreadPool, } fn compute_deleted_bitset( @@ -318,6 +323,7 @@ impl IndexWriter { let segment_updater = SegmentUpdater::create(index.clone(), stamper.clone(), &delete_queue.cursor())?; + let worker_pool = block_on(wasm_mt_pool::ThreadPool::new(num_threads, crate::PKG_JS).and_init()).unwrap(); let mut index_writer = IndexWriter { _directory_lock: Some(directory_lock), @@ -338,6 +344,7 @@ impl IndexWriter { stamper, worker_id: 0, + worker_pool, }; index_writer.start_workers()?; Ok(index_writer) @@ -411,9 +418,8 @@ impl IndexWriter { let mem_budget = self.heap_size_in_bytes_per_thread; let index = self.index.clone(); - let join_handle: JoinHandle> = thread::Builder::new() - .name(format!("thrd-tantivy-index{}", self.worker_id)) - .spawn(move || { + let join_handle: JoinHandle> = pool_exec!(self.worker_pool, + move || { loop { let mut document_iterator = document_receiver_clone.clone().into_iter().peekable(); diff --git a/src/lib.rs b/src/lib.rs index 314ebb93a..d6d58b97c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,6 +11,8 @@ #![doc(test(attr(allow(unused_variables), deny(warnings))))] #![warn(missing_docs)] +#![feature(async_closure)] + //! # `tantivy` //! //! Tantivy is a search engine library. @@ -126,6 +128,8 @@ mod macros; pub use crate::error::TantivyError; pub use chrono; +pub const PKG_JS: &'static str = "./pkg/pool_exec.js"; // path to `wasm-bindgen`'s JS binding + /// Tantivy result. /// /// Within tantivy, please avoid importing `Result` using `use crate::Result`