Compare commits

...

2 Commits

Author SHA1 Message Date
Paul Masurel
89f91b1b58 first stab 2021-10-06 12:10:16 +09:00
Paul Masurel
19965c46bc Added wasm-mt 2021-10-06 10:45:17 +09:00
4 changed files with 17 additions and 51 deletions

View File

@@ -26,6 +26,7 @@ snap = { version = "1.0.5", optional = true }
tempfile = { version = "3.2", optional = true }
log = "0.4.14"
serde = { version = "1.0.126", features = ["derive"] }
serde_closure = "0.3"
serde_json = "1.0.64"
num_cpus = "1.13"
fs2={ version = "0.4.3", optional = true }
@@ -50,11 +51,12 @@ fail = "0.4"
murmurhash32 = "0.2"
chrono = "0.4.19"
smallvec = "1.6.1"
rayon = "1.5"
lru = "0.7.0"
fastdivide = "0.3"
itertools = "0.10.0"
measure_time = "0.7.0"
wasm-mt = "0.1"
wasm-mt-pool = "0.1"
[target.'cfg(windows)'.dependencies]
winapi = "0.3.9"

View File

@@ -1,5 +1,4 @@
use crossbeam::channel;
use rayon::{ThreadPool, ThreadPoolBuilder};
/// Search executor whether search request are single thread or multithread.
///
@@ -11,8 +10,6 @@ use rayon::{ThreadPool, ThreadPoolBuilder};
pub enum Executor {
/// Single thread variant of an Executor
SingleThread,
/// Thread pool variant of an Executor
ThreadPool(ThreadPool),
}
impl Executor {
@@ -21,15 +18,6 @@ impl Executor {
Executor::SingleThread
}
/// Creates an Executor that dispatches the tasks in a thread pool.
pub fn multi_thread(num_threads: usize, prefix: &'static str) -> crate::Result<Executor> {
let pool = ThreadPoolBuilder::new()
.num_threads(num_threads)
.thread_name(move |num| format!("{}{}", prefix, num))
.build()?;
Ok(Executor::ThreadPool(pool))
}
/// Perform a map in the thread pool.
///
/// Regardless of the executor (`SingleThread` or `ThreadPool`), panics in the task
@@ -46,40 +34,6 @@ impl Executor {
) -> crate::Result<Vec<R>> {
match self {
Executor::SingleThread => args.map(f).collect::<crate::Result<_>>(),
Executor::ThreadPool(pool) => {
let args_with_indices: Vec<(usize, A)> = args.enumerate().collect();
let num_fruits = args_with_indices.len();
let fruit_receiver = {
let (fruit_sender, fruit_receiver) = channel::unbounded();
pool.scope(|scope| {
for arg_with_idx in args_with_indices {
scope.spawn(|_| {
let (idx, arg) = arg_with_idx;
let fruit = f(arg);
if let Err(err) = fruit_sender.send((idx, fruit)) {
error!("Failed to send search task. It probably means all search threads have panicked. {:?}", err);
}
});
}
});
fruit_receiver
// This ends the scope of fruit_sender.
// This is important as it makes it possible for the fruit_receiver iteration to
// terminate.
};
// This is lame, but safe.
let mut results_with_position = Vec::with_capacity(num_fruits);
for (pos, fruit_res) in fruit_receiver {
let fruit = fruit_res?;
results_with_position.push((pos, fruit));
}
results_with_position.sort_by_key(|(pos, _)| *pos);
assert_eq!(results_with_position.len(), num_fruits);
Ok(results_with_position
.into_iter()
.map(|(_, fruit)| fruit)
.collect::<Vec<_>>())
}
}
}
}

View File

@@ -29,9 +29,12 @@ use futures::executor::block_on;
use futures::future::Future;
use smallvec::smallvec;
use smallvec::SmallVec;
use wasm_mt_pool::pool_exec;
use wasm_mt::prelude::*;
use std::mem;
use std::ops::Range;
use std::sync::Arc;
use wasm_mt_pool::prelude::*;
use std::thread;
use std::thread::JoinHandle;
@@ -75,7 +78,7 @@ pub struct IndexWriter {
heap_size_in_bytes_per_thread: usize,
workers_join_handle: Vec<JoinHandle<crate::Result<()>>>,
workers_join_handle: Vec<JoinHandle<Result<JsValue, JsValue>>>,
operation_receiver: OperationReceiver,
operation_sender: OperationSender,
@@ -90,6 +93,8 @@ pub struct IndexWriter {
stamper: Stamper,
committed_opstamp: Opstamp,
worker_pool: wasm_mt_pool::ThreadPool,
}
fn compute_deleted_bitset(
@@ -318,6 +323,7 @@ impl IndexWriter {
let segment_updater =
SegmentUpdater::create(index.clone(), stamper.clone(), &delete_queue.cursor())?;
let worker_pool = block_on(wasm_mt_pool::ThreadPool::new(num_threads, crate::PKG_JS).and_init()).unwrap();
let mut index_writer = IndexWriter {
_directory_lock: Some(directory_lock),
@@ -338,6 +344,7 @@ impl IndexWriter {
stamper,
worker_id: 0,
worker_pool,
};
index_writer.start_workers()?;
Ok(index_writer)
@@ -411,9 +418,8 @@ impl IndexWriter {
let mem_budget = self.heap_size_in_bytes_per_thread;
let index = self.index.clone();
let join_handle: JoinHandle<crate::Result<()>> = thread::Builder::new()
.name(format!("thrd-tantivy-index{}", self.worker_id))
.spawn(move || {
let join_handle: JoinHandle<crate::Result<_>> = pool_exec!(self.worker_pool,
move || {
loop {
let mut document_iterator =
document_receiver_clone.clone().into_iter().peekable();

View File

@@ -11,6 +11,8 @@
#![doc(test(attr(allow(unused_variables), deny(warnings))))]
#![warn(missing_docs)]
#![feature(async_closure)]
//! # `tantivy`
//!
//! Tantivy is a search engine library.
@@ -126,6 +128,8 @@ mod macros;
pub use crate::error::TantivyError;
pub use chrono;
pub const PKG_JS: &'static str = "./pkg/pool_exec.js"; // path to `wasm-bindgen`'s JS binding
/// Tantivy result.
///
/// Within tantivy, please avoid importing `Result` using `use crate::Result`