Compare commits

...

2 Commits

Author SHA1 Message Date
Paul Masurel
89f91b1b58 first stab 2021-10-06 12:10:16 +09:00
Paul Masurel
19965c46bc Added wasm-mt 2021-10-06 10:45:17 +09:00
4 changed files with 17 additions and 51 deletions

View File

@@ -26,6 +26,7 @@ snap = { version = "1.0.5", optional = true }
tempfile = { version = "3.2", optional = true } tempfile = { version = "3.2", optional = true }
log = "0.4.14" log = "0.4.14"
serde = { version = "1.0.126", features = ["derive"] } serde = { version = "1.0.126", features = ["derive"] }
serde_closure = "0.3"
serde_json = "1.0.64" serde_json = "1.0.64"
num_cpus = "1.13" num_cpus = "1.13"
fs2={ version = "0.4.3", optional = true } fs2={ version = "0.4.3", optional = true }
@@ -50,11 +51,12 @@ fail = "0.4"
murmurhash32 = "0.2" murmurhash32 = "0.2"
chrono = "0.4.19" chrono = "0.4.19"
smallvec = "1.6.1" smallvec = "1.6.1"
rayon = "1.5"
lru = "0.7.0" lru = "0.7.0"
fastdivide = "0.3" fastdivide = "0.3"
itertools = "0.10.0" itertools = "0.10.0"
measure_time = "0.7.0" measure_time = "0.7.0"
wasm-mt = "0.1"
wasm-mt-pool = "0.1"
[target.'cfg(windows)'.dependencies] [target.'cfg(windows)'.dependencies]
winapi = "0.3.9" winapi = "0.3.9"

View File

@@ -1,5 +1,4 @@
use crossbeam::channel; use crossbeam::channel;
use rayon::{ThreadPool, ThreadPoolBuilder};
/// Search executor whether search request are single thread or multithread. /// Search executor whether search request are single thread or multithread.
/// ///
@@ -11,8 +10,6 @@ use rayon::{ThreadPool, ThreadPoolBuilder};
pub enum Executor { pub enum Executor {
/// Single thread variant of an Executor /// Single thread variant of an Executor
SingleThread, SingleThread,
/// Thread pool variant of an Executor
ThreadPool(ThreadPool),
} }
impl Executor { impl Executor {
@@ -21,15 +18,6 @@ impl Executor {
Executor::SingleThread Executor::SingleThread
} }
/// Creates an Executor that dispatches the tasks in a thread pool.
pub fn multi_thread(num_threads: usize, prefix: &'static str) -> crate::Result<Executor> {
let pool = ThreadPoolBuilder::new()
.num_threads(num_threads)
.thread_name(move |num| format!("{}{}", prefix, num))
.build()?;
Ok(Executor::ThreadPool(pool))
}
/// Perform a map in the thread pool. /// Perform a map in the thread pool.
/// ///
/// Regardless of the executor (`SingleThread` or `ThreadPool`), panics in the task /// Regardless of the executor (`SingleThread` or `ThreadPool`), panics in the task
@@ -46,40 +34,6 @@ impl Executor {
) -> crate::Result<Vec<R>> { ) -> crate::Result<Vec<R>> {
match self { match self {
Executor::SingleThread => args.map(f).collect::<crate::Result<_>>(), Executor::SingleThread => args.map(f).collect::<crate::Result<_>>(),
Executor::ThreadPool(pool) => {
let args_with_indices: Vec<(usize, A)> = args.enumerate().collect();
let num_fruits = args_with_indices.len();
let fruit_receiver = {
let (fruit_sender, fruit_receiver) = channel::unbounded();
pool.scope(|scope| {
for arg_with_idx in args_with_indices {
scope.spawn(|_| {
let (idx, arg) = arg_with_idx;
let fruit = f(arg);
if let Err(err) = fruit_sender.send((idx, fruit)) {
error!("Failed to send search task. It probably means all search threads have panicked. {:?}", err);
}
});
}
});
fruit_receiver
// This ends the scope of fruit_sender.
// This is important as it makes it possible for the fruit_receiver iteration to
// terminate.
};
// This is lame, but safe.
let mut results_with_position = Vec::with_capacity(num_fruits);
for (pos, fruit_res) in fruit_receiver {
let fruit = fruit_res?;
results_with_position.push((pos, fruit));
}
results_with_position.sort_by_key(|(pos, _)| *pos);
assert_eq!(results_with_position.len(), num_fruits);
Ok(results_with_position
.into_iter()
.map(|(_, fruit)| fruit)
.collect::<Vec<_>>())
}
} }
} }
} }

View File

@@ -29,9 +29,12 @@ use futures::executor::block_on;
use futures::future::Future; use futures::future::Future;
use smallvec::smallvec; use smallvec::smallvec;
use smallvec::SmallVec; use smallvec::SmallVec;
use wasm_mt_pool::pool_exec;
use wasm_mt::prelude::*;
use std::mem; use std::mem;
use std::ops::Range; use std::ops::Range;
use std::sync::Arc; use std::sync::Arc;
use wasm_mt_pool::prelude::*;
use std::thread; use std::thread;
use std::thread::JoinHandle; use std::thread::JoinHandle;
@@ -75,7 +78,7 @@ pub struct IndexWriter {
heap_size_in_bytes_per_thread: usize, heap_size_in_bytes_per_thread: usize,
workers_join_handle: Vec<JoinHandle<crate::Result<()>>>, workers_join_handle: Vec<JoinHandle<Result<JsValue, JsValue>>>,
operation_receiver: OperationReceiver, operation_receiver: OperationReceiver,
operation_sender: OperationSender, operation_sender: OperationSender,
@@ -90,6 +93,8 @@ pub struct IndexWriter {
stamper: Stamper, stamper: Stamper,
committed_opstamp: Opstamp, committed_opstamp: Opstamp,
worker_pool: wasm_mt_pool::ThreadPool,
} }
fn compute_deleted_bitset( fn compute_deleted_bitset(
@@ -318,6 +323,7 @@ impl IndexWriter {
let segment_updater = let segment_updater =
SegmentUpdater::create(index.clone(), stamper.clone(), &delete_queue.cursor())?; SegmentUpdater::create(index.clone(), stamper.clone(), &delete_queue.cursor())?;
let worker_pool = block_on(wasm_mt_pool::ThreadPool::new(num_threads, crate::PKG_JS).and_init()).unwrap();
let mut index_writer = IndexWriter { let mut index_writer = IndexWriter {
_directory_lock: Some(directory_lock), _directory_lock: Some(directory_lock),
@@ -338,6 +344,7 @@ impl IndexWriter {
stamper, stamper,
worker_id: 0, worker_id: 0,
worker_pool,
}; };
index_writer.start_workers()?; index_writer.start_workers()?;
Ok(index_writer) Ok(index_writer)
@@ -411,9 +418,8 @@ impl IndexWriter {
let mem_budget = self.heap_size_in_bytes_per_thread; let mem_budget = self.heap_size_in_bytes_per_thread;
let index = self.index.clone(); let index = self.index.clone();
let join_handle: JoinHandle<crate::Result<()>> = thread::Builder::new() let join_handle: JoinHandle<crate::Result<_>> = pool_exec!(self.worker_pool,
.name(format!("thrd-tantivy-index{}", self.worker_id)) move || {
.spawn(move || {
loop { loop {
let mut document_iterator = let mut document_iterator =
document_receiver_clone.clone().into_iter().peekable(); document_receiver_clone.clone().into_iter().peekable();

View File

@@ -11,6 +11,8 @@
#![doc(test(attr(allow(unused_variables), deny(warnings))))] #![doc(test(attr(allow(unused_variables), deny(warnings))))]
#![warn(missing_docs)] #![warn(missing_docs)]
#![feature(async_closure)]
//! # `tantivy` //! # `tantivy`
//! //!
//! Tantivy is a search engine library. //! Tantivy is a search engine library.
@@ -126,6 +128,8 @@ mod macros;
pub use crate::error::TantivyError; pub use crate::error::TantivyError;
pub use chrono; pub use chrono;
pub const PKG_JS: &'static str = "./pkg/pool_exec.js"; // path to `wasm-bindgen`'s JS binding
/// Tantivy result. /// Tantivy result.
/// ///
/// Within tantivy, please avoid importing `Result` using `use crate::Result` /// Within tantivy, please avoid importing `Result` using `use crate::Result`