mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-01-04 16:22:55 +00:00
Compare commits
2 Commits
list_field
...
wasm-frien
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
89f91b1b58 | ||
|
|
19965c46bc |
@@ -26,6 +26,7 @@ snap = { version = "1.0.5", optional = true }
|
|||||||
tempfile = { version = "3.2", optional = true }
|
tempfile = { version = "3.2", optional = true }
|
||||||
log = "0.4.14"
|
log = "0.4.14"
|
||||||
serde = { version = "1.0.126", features = ["derive"] }
|
serde = { version = "1.0.126", features = ["derive"] }
|
||||||
|
serde_closure = "0.3"
|
||||||
serde_json = "1.0.64"
|
serde_json = "1.0.64"
|
||||||
num_cpus = "1.13"
|
num_cpus = "1.13"
|
||||||
fs2={ version = "0.4.3", optional = true }
|
fs2={ version = "0.4.3", optional = true }
|
||||||
@@ -50,11 +51,12 @@ fail = "0.4"
|
|||||||
murmurhash32 = "0.2"
|
murmurhash32 = "0.2"
|
||||||
chrono = "0.4.19"
|
chrono = "0.4.19"
|
||||||
smallvec = "1.6.1"
|
smallvec = "1.6.1"
|
||||||
rayon = "1.5"
|
|
||||||
lru = "0.7.0"
|
lru = "0.7.0"
|
||||||
fastdivide = "0.3"
|
fastdivide = "0.3"
|
||||||
itertools = "0.10.0"
|
itertools = "0.10.0"
|
||||||
measure_time = "0.7.0"
|
measure_time = "0.7.0"
|
||||||
|
wasm-mt = "0.1"
|
||||||
|
wasm-mt-pool = "0.1"
|
||||||
|
|
||||||
[target.'cfg(windows)'.dependencies]
|
[target.'cfg(windows)'.dependencies]
|
||||||
winapi = "0.3.9"
|
winapi = "0.3.9"
|
||||||
|
|||||||
@@ -1,5 +1,4 @@
|
|||||||
use crossbeam::channel;
|
use crossbeam::channel;
|
||||||
use rayon::{ThreadPool, ThreadPoolBuilder};
|
|
||||||
|
|
||||||
/// Search executor whether search request are single thread or multithread.
|
/// Search executor whether search request are single thread or multithread.
|
||||||
///
|
///
|
||||||
@@ -11,8 +10,6 @@ use rayon::{ThreadPool, ThreadPoolBuilder};
|
|||||||
pub enum Executor {
|
pub enum Executor {
|
||||||
/// Single thread variant of an Executor
|
/// Single thread variant of an Executor
|
||||||
SingleThread,
|
SingleThread,
|
||||||
/// Thread pool variant of an Executor
|
|
||||||
ThreadPool(ThreadPool),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Executor {
|
impl Executor {
|
||||||
@@ -21,15 +18,6 @@ impl Executor {
|
|||||||
Executor::SingleThread
|
Executor::SingleThread
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Creates an Executor that dispatches the tasks in a thread pool.
|
|
||||||
pub fn multi_thread(num_threads: usize, prefix: &'static str) -> crate::Result<Executor> {
|
|
||||||
let pool = ThreadPoolBuilder::new()
|
|
||||||
.num_threads(num_threads)
|
|
||||||
.thread_name(move |num| format!("{}{}", prefix, num))
|
|
||||||
.build()?;
|
|
||||||
Ok(Executor::ThreadPool(pool))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Perform a map in the thread pool.
|
/// Perform a map in the thread pool.
|
||||||
///
|
///
|
||||||
/// Regardless of the executor (`SingleThread` or `ThreadPool`), panics in the task
|
/// Regardless of the executor (`SingleThread` or `ThreadPool`), panics in the task
|
||||||
@@ -46,40 +34,6 @@ impl Executor {
|
|||||||
) -> crate::Result<Vec<R>> {
|
) -> crate::Result<Vec<R>> {
|
||||||
match self {
|
match self {
|
||||||
Executor::SingleThread => args.map(f).collect::<crate::Result<_>>(),
|
Executor::SingleThread => args.map(f).collect::<crate::Result<_>>(),
|
||||||
Executor::ThreadPool(pool) => {
|
|
||||||
let args_with_indices: Vec<(usize, A)> = args.enumerate().collect();
|
|
||||||
let num_fruits = args_with_indices.len();
|
|
||||||
let fruit_receiver = {
|
|
||||||
let (fruit_sender, fruit_receiver) = channel::unbounded();
|
|
||||||
pool.scope(|scope| {
|
|
||||||
for arg_with_idx in args_with_indices {
|
|
||||||
scope.spawn(|_| {
|
|
||||||
let (idx, arg) = arg_with_idx;
|
|
||||||
let fruit = f(arg);
|
|
||||||
if let Err(err) = fruit_sender.send((idx, fruit)) {
|
|
||||||
error!("Failed to send search task. It probably means all search threads have panicked. {:?}", err);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
});
|
|
||||||
fruit_receiver
|
|
||||||
// This ends the scope of fruit_sender.
|
|
||||||
// This is important as it makes it possible for the fruit_receiver iteration to
|
|
||||||
// terminate.
|
|
||||||
};
|
|
||||||
// This is lame, but safe.
|
|
||||||
let mut results_with_position = Vec::with_capacity(num_fruits);
|
|
||||||
for (pos, fruit_res) in fruit_receiver {
|
|
||||||
let fruit = fruit_res?;
|
|
||||||
results_with_position.push((pos, fruit));
|
|
||||||
}
|
|
||||||
results_with_position.sort_by_key(|(pos, _)| *pos);
|
|
||||||
assert_eq!(results_with_position.len(), num_fruits);
|
|
||||||
Ok(results_with_position
|
|
||||||
.into_iter()
|
|
||||||
.map(|(_, fruit)| fruit)
|
|
||||||
.collect::<Vec<_>>())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -29,9 +29,12 @@ use futures::executor::block_on;
|
|||||||
use futures::future::Future;
|
use futures::future::Future;
|
||||||
use smallvec::smallvec;
|
use smallvec::smallvec;
|
||||||
use smallvec::SmallVec;
|
use smallvec::SmallVec;
|
||||||
|
use wasm_mt_pool::pool_exec;
|
||||||
|
use wasm_mt::prelude::*;
|
||||||
use std::mem;
|
use std::mem;
|
||||||
use std::ops::Range;
|
use std::ops::Range;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use wasm_mt_pool::prelude::*;
|
||||||
use std::thread;
|
use std::thread;
|
||||||
use std::thread::JoinHandle;
|
use std::thread::JoinHandle;
|
||||||
|
|
||||||
@@ -75,7 +78,7 @@ pub struct IndexWriter {
|
|||||||
|
|
||||||
heap_size_in_bytes_per_thread: usize,
|
heap_size_in_bytes_per_thread: usize,
|
||||||
|
|
||||||
workers_join_handle: Vec<JoinHandle<crate::Result<()>>>,
|
workers_join_handle: Vec<JoinHandle<Result<JsValue, JsValue>>>,
|
||||||
|
|
||||||
operation_receiver: OperationReceiver,
|
operation_receiver: OperationReceiver,
|
||||||
operation_sender: OperationSender,
|
operation_sender: OperationSender,
|
||||||
@@ -90,6 +93,8 @@ pub struct IndexWriter {
|
|||||||
|
|
||||||
stamper: Stamper,
|
stamper: Stamper,
|
||||||
committed_opstamp: Opstamp,
|
committed_opstamp: Opstamp,
|
||||||
|
|
||||||
|
worker_pool: wasm_mt_pool::ThreadPool,
|
||||||
}
|
}
|
||||||
|
|
||||||
fn compute_deleted_bitset(
|
fn compute_deleted_bitset(
|
||||||
@@ -318,6 +323,7 @@ impl IndexWriter {
|
|||||||
let segment_updater =
|
let segment_updater =
|
||||||
SegmentUpdater::create(index.clone(), stamper.clone(), &delete_queue.cursor())?;
|
SegmentUpdater::create(index.clone(), stamper.clone(), &delete_queue.cursor())?;
|
||||||
|
|
||||||
|
let worker_pool = block_on(wasm_mt_pool::ThreadPool::new(num_threads, crate::PKG_JS).and_init()).unwrap();
|
||||||
let mut index_writer = IndexWriter {
|
let mut index_writer = IndexWriter {
|
||||||
_directory_lock: Some(directory_lock),
|
_directory_lock: Some(directory_lock),
|
||||||
|
|
||||||
@@ -338,6 +344,7 @@ impl IndexWriter {
|
|||||||
stamper,
|
stamper,
|
||||||
|
|
||||||
worker_id: 0,
|
worker_id: 0,
|
||||||
|
worker_pool,
|
||||||
};
|
};
|
||||||
index_writer.start_workers()?;
|
index_writer.start_workers()?;
|
||||||
Ok(index_writer)
|
Ok(index_writer)
|
||||||
@@ -411,9 +418,8 @@ impl IndexWriter {
|
|||||||
|
|
||||||
let mem_budget = self.heap_size_in_bytes_per_thread;
|
let mem_budget = self.heap_size_in_bytes_per_thread;
|
||||||
let index = self.index.clone();
|
let index = self.index.clone();
|
||||||
let join_handle: JoinHandle<crate::Result<()>> = thread::Builder::new()
|
let join_handle: JoinHandle<crate::Result<_>> = pool_exec!(self.worker_pool,
|
||||||
.name(format!("thrd-tantivy-index{}", self.worker_id))
|
move || {
|
||||||
.spawn(move || {
|
|
||||||
loop {
|
loop {
|
||||||
let mut document_iterator =
|
let mut document_iterator =
|
||||||
document_receiver_clone.clone().into_iter().peekable();
|
document_receiver_clone.clone().into_iter().peekable();
|
||||||
|
|||||||
@@ -11,6 +11,8 @@
|
|||||||
#![doc(test(attr(allow(unused_variables), deny(warnings))))]
|
#![doc(test(attr(allow(unused_variables), deny(warnings))))]
|
||||||
#![warn(missing_docs)]
|
#![warn(missing_docs)]
|
||||||
|
|
||||||
|
#![feature(async_closure)]
|
||||||
|
|
||||||
//! # `tantivy`
|
//! # `tantivy`
|
||||||
//!
|
//!
|
||||||
//! Tantivy is a search engine library.
|
//! Tantivy is a search engine library.
|
||||||
@@ -126,6 +128,8 @@ mod macros;
|
|||||||
pub use crate::error::TantivyError;
|
pub use crate::error::TantivyError;
|
||||||
pub use chrono;
|
pub use chrono;
|
||||||
|
|
||||||
|
pub const PKG_JS: &'static str = "./pkg/pool_exec.js"; // path to `wasm-bindgen`'s JS binding
|
||||||
|
|
||||||
/// Tantivy result.
|
/// Tantivy result.
|
||||||
///
|
///
|
||||||
/// Within tantivy, please avoid importing `Result` using `use crate::Result`
|
/// Within tantivy, please avoid importing `Result` using `use crate::Result`
|
||||||
|
|||||||
Reference in New Issue
Block a user