mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2025-12-27 20:42:54 +00:00
Compare commits
2 Commits
list_field
...
wasm-frien
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
89f91b1b58 | ||
|
|
19965c46bc |
@@ -26,6 +26,7 @@ snap = { version = "1.0.5", optional = true }
|
||||
tempfile = { version = "3.2", optional = true }
|
||||
log = "0.4.14"
|
||||
serde = { version = "1.0.126", features = ["derive"] }
|
||||
serde_closure = "0.3"
|
||||
serde_json = "1.0.64"
|
||||
num_cpus = "1.13"
|
||||
fs2={ version = "0.4.3", optional = true }
|
||||
@@ -50,11 +51,12 @@ fail = "0.4"
|
||||
murmurhash32 = "0.2"
|
||||
chrono = "0.4.19"
|
||||
smallvec = "1.6.1"
|
||||
rayon = "1.5"
|
||||
lru = "0.7.0"
|
||||
fastdivide = "0.3"
|
||||
itertools = "0.10.0"
|
||||
measure_time = "0.7.0"
|
||||
wasm-mt = "0.1"
|
||||
wasm-mt-pool = "0.1"
|
||||
|
||||
[target.'cfg(windows)'.dependencies]
|
||||
winapi = "0.3.9"
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
use crossbeam::channel;
|
||||
use rayon::{ThreadPool, ThreadPoolBuilder};
|
||||
|
||||
/// Search executor whether search request are single thread or multithread.
|
||||
///
|
||||
@@ -11,8 +10,6 @@ use rayon::{ThreadPool, ThreadPoolBuilder};
|
||||
pub enum Executor {
|
||||
/// Single thread variant of an Executor
|
||||
SingleThread,
|
||||
/// Thread pool variant of an Executor
|
||||
ThreadPool(ThreadPool),
|
||||
}
|
||||
|
||||
impl Executor {
|
||||
@@ -21,15 +18,6 @@ impl Executor {
|
||||
Executor::SingleThread
|
||||
}
|
||||
|
||||
/// Creates an Executor that dispatches the tasks in a thread pool.
|
||||
pub fn multi_thread(num_threads: usize, prefix: &'static str) -> crate::Result<Executor> {
|
||||
let pool = ThreadPoolBuilder::new()
|
||||
.num_threads(num_threads)
|
||||
.thread_name(move |num| format!("{}{}", prefix, num))
|
||||
.build()?;
|
||||
Ok(Executor::ThreadPool(pool))
|
||||
}
|
||||
|
||||
/// Perform a map in the thread pool.
|
||||
///
|
||||
/// Regardless of the executor (`SingleThread` or `ThreadPool`), panics in the task
|
||||
@@ -46,40 +34,6 @@ impl Executor {
|
||||
) -> crate::Result<Vec<R>> {
|
||||
match self {
|
||||
Executor::SingleThread => args.map(f).collect::<crate::Result<_>>(),
|
||||
Executor::ThreadPool(pool) => {
|
||||
let args_with_indices: Vec<(usize, A)> = args.enumerate().collect();
|
||||
let num_fruits = args_with_indices.len();
|
||||
let fruit_receiver = {
|
||||
let (fruit_sender, fruit_receiver) = channel::unbounded();
|
||||
pool.scope(|scope| {
|
||||
for arg_with_idx in args_with_indices {
|
||||
scope.spawn(|_| {
|
||||
let (idx, arg) = arg_with_idx;
|
||||
let fruit = f(arg);
|
||||
if let Err(err) = fruit_sender.send((idx, fruit)) {
|
||||
error!("Failed to send search task. It probably means all search threads have panicked. {:?}", err);
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
fruit_receiver
|
||||
// This ends the scope of fruit_sender.
|
||||
// This is important as it makes it possible for the fruit_receiver iteration to
|
||||
// terminate.
|
||||
};
|
||||
// This is lame, but safe.
|
||||
let mut results_with_position = Vec::with_capacity(num_fruits);
|
||||
for (pos, fruit_res) in fruit_receiver {
|
||||
let fruit = fruit_res?;
|
||||
results_with_position.push((pos, fruit));
|
||||
}
|
||||
results_with_position.sort_by_key(|(pos, _)| *pos);
|
||||
assert_eq!(results_with_position.len(), num_fruits);
|
||||
Ok(results_with_position
|
||||
.into_iter()
|
||||
.map(|(_, fruit)| fruit)
|
||||
.collect::<Vec<_>>())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,9 +29,12 @@ use futures::executor::block_on;
|
||||
use futures::future::Future;
|
||||
use smallvec::smallvec;
|
||||
use smallvec::SmallVec;
|
||||
use wasm_mt_pool::pool_exec;
|
||||
use wasm_mt::prelude::*;
|
||||
use std::mem;
|
||||
use std::ops::Range;
|
||||
use std::sync::Arc;
|
||||
use wasm_mt_pool::prelude::*;
|
||||
use std::thread;
|
||||
use std::thread::JoinHandle;
|
||||
|
||||
@@ -75,7 +78,7 @@ pub struct IndexWriter {
|
||||
|
||||
heap_size_in_bytes_per_thread: usize,
|
||||
|
||||
workers_join_handle: Vec<JoinHandle<crate::Result<()>>>,
|
||||
workers_join_handle: Vec<JoinHandle<Result<JsValue, JsValue>>>,
|
||||
|
||||
operation_receiver: OperationReceiver,
|
||||
operation_sender: OperationSender,
|
||||
@@ -90,6 +93,8 @@ pub struct IndexWriter {
|
||||
|
||||
stamper: Stamper,
|
||||
committed_opstamp: Opstamp,
|
||||
|
||||
worker_pool: wasm_mt_pool::ThreadPool,
|
||||
}
|
||||
|
||||
fn compute_deleted_bitset(
|
||||
@@ -318,6 +323,7 @@ impl IndexWriter {
|
||||
let segment_updater =
|
||||
SegmentUpdater::create(index.clone(), stamper.clone(), &delete_queue.cursor())?;
|
||||
|
||||
let worker_pool = block_on(wasm_mt_pool::ThreadPool::new(num_threads, crate::PKG_JS).and_init()).unwrap();
|
||||
let mut index_writer = IndexWriter {
|
||||
_directory_lock: Some(directory_lock),
|
||||
|
||||
@@ -338,6 +344,7 @@ impl IndexWriter {
|
||||
stamper,
|
||||
|
||||
worker_id: 0,
|
||||
worker_pool,
|
||||
};
|
||||
index_writer.start_workers()?;
|
||||
Ok(index_writer)
|
||||
@@ -411,9 +418,8 @@ impl IndexWriter {
|
||||
|
||||
let mem_budget = self.heap_size_in_bytes_per_thread;
|
||||
let index = self.index.clone();
|
||||
let join_handle: JoinHandle<crate::Result<()>> = thread::Builder::new()
|
||||
.name(format!("thrd-tantivy-index{}", self.worker_id))
|
||||
.spawn(move || {
|
||||
let join_handle: JoinHandle<crate::Result<_>> = pool_exec!(self.worker_pool,
|
||||
move || {
|
||||
loop {
|
||||
let mut document_iterator =
|
||||
document_receiver_clone.clone().into_iter().peekable();
|
||||
|
||||
@@ -11,6 +11,8 @@
|
||||
#![doc(test(attr(allow(unused_variables), deny(warnings))))]
|
||||
#![warn(missing_docs)]
|
||||
|
||||
#![feature(async_closure)]
|
||||
|
||||
//! # `tantivy`
|
||||
//!
|
||||
//! Tantivy is a search engine library.
|
||||
@@ -126,6 +128,8 @@ mod macros;
|
||||
pub use crate::error::TantivyError;
|
||||
pub use chrono;
|
||||
|
||||
pub const PKG_JS: &'static str = "./pkg/pool_exec.js"; // path to `wasm-bindgen`'s JS binding
|
||||
|
||||
/// Tantivy result.
|
||||
///
|
||||
/// Within tantivy, please avoid importing `Result` using `use crate::Result`
|
||||
|
||||
Reference in New Issue
Block a user