Compare commits

...

1 Commits

Author SHA1 Message Date
François Massot
85707d093b Use rayon for indexing workers. 2022-03-27 20:36:24 +02:00

View File

@@ -1,10 +1,9 @@
use std::ops::Range;
use std::sync::Arc;
use std::thread;
use std::thread::JoinHandle;
use std::sync::{Arc, Mutex};
use common::BitSet;
use crossbeam::channel;
use rayon::{ThreadPool, ThreadPoolBuilder};
use smallvec::smallvec;
use super::operation::{AddOperation, UserOperation};
@@ -61,7 +60,7 @@ pub struct IndexWriter {
memory_arena_in_bytes_per_thread: usize,
workers_join_handle: Vec<JoinHandle<crate::Result<()>>>,
workers_future_results: Vec<Mutex<FutureResult<()>>>,
index_writer_status: IndexWriterStatus,
operation_sender: AddBatchSender,
@@ -72,6 +71,8 @@ pub struct IndexWriter {
num_threads: usize,
pool: ThreadPool,
delete_queue: DeleteQueue,
stamper: Stamper,
@@ -300,6 +301,15 @@ impl IndexWriter {
let segment_updater =
SegmentUpdater::create(index.clone(), stamper.clone(), &delete_queue.cursor())?;
let pool = ThreadPoolBuilder::new()
.thread_name(|i| format!("indexing_worker_thread_{i}"))
.num_threads(num_threads)
.build()
.map_err(|_| {
crate::TantivyError::SystemError(
"Failed to spawn indexing worker thread".to_string(),
)
})?;
let mut index_writer = IndexWriter {
_directory_lock: Some(directory_lock),
@@ -311,8 +321,9 @@ impl IndexWriter {
segment_updater,
workers_join_handle: vec![],
workers_future_results: vec![],
num_threads,
pool,
delete_queue,
@@ -342,11 +353,12 @@ impl IndexWriter {
// dropping the last reference to the segment_updater.
self.drop_sender();
let former_workers_handles = std::mem::take(&mut self.workers_join_handle);
for join_handle in former_workers_handles {
join_handle
.join()
.map_err(|_| error_in_index_worker_thread("Worker thread panicked."))?
let former_workers_future_results = std::mem::take(&mut self.workers_future_results);
for future_result in former_workers_future_results.into_iter() {
future_result
.into_inner()
.map_err(|_| TantivyError::Poisoned)?
.wait()
.map_err(|_| error_in_index_worker_thread("Worker thread failed."))?;
}
@@ -407,43 +419,51 @@ impl IndexWriter {
let mem_budget = self.memory_arena_in_bytes_per_thread;
let index = self.index.clone();
let join_handle: JoinHandle<crate::Result<()>> = thread::Builder::new()
.name(format!("thrd-tantivy-index{}", self.worker_id))
.spawn(move || {
loop {
let mut document_iterator = document_receiver_clone
.clone()
.into_iter()
.filter(|batch| !batch.is_empty())
.peekable();
let (scheduled_result, sender) = FutureResult::create(
"A segment_updater future did not succeed. This should never happen.",
);
self.pool.spawn(move || {
loop {
let mut document_iterator = document_receiver_clone
.clone()
.into_iter()
.filter(|batch| !batch.is_empty())
.peekable();
// The peeking here is to avoid creating a new segment's files
// if no document are available.
//
// This is a valid guarantee as the peeked document now belongs to
// our local iterator.
if let Some(batch) = document_iterator.peek() {
assert!(!batch.is_empty());
delete_cursor.skip_to(batch[0].opstamp);
} else {
// No more documents.
// It happens when there is a commit, or if the `IndexWriter`
// was dropped.
index_writer_bomb.defuse();
return Ok(());
}
index_documents(
mem_budget,
index.new_segment(),
&mut document_iterator,
&mut segment_updater,
delete_cursor.clone(),
)?;
// The peeking here is to avoid creating a new segment's files
// if no document are available.
//
// This is a valid guarantee as the peeked document now belongs to
// our local iterator.
if let Some(batch) = document_iterator.peek() {
assert!(!batch.is_empty());
delete_cursor.skip_to(batch[0].opstamp);
} else {
// No more documents.
// It happens when there is a commit, or if the `IndexWriter`
// was dropped.
index_writer_bomb.defuse();
let _send_result = sender.send(Ok(()));
return;
}
})?;
let result = index_documents(
mem_budget,
index.new_segment(),
&mut document_iterator,
&mut segment_updater,
delete_cursor.clone(),
);
if result.is_err() {
let _send_result = sender.send(result);
return;
}
}
});
self.worker_id += 1;
self.workers_join_handle.push(join_handle);
self.workers_future_results
.push(Mutex::new(scheduled_result));
Ok(())
}
@@ -621,13 +641,13 @@ impl IndexWriter {
// and recreate a new one.
self.recreate_document_channel();
let former_workers_join_handle = std::mem::take(&mut self.workers_join_handle);
let former_workers_future_results = std::mem::take(&mut self.workers_future_results);
for worker_handle in former_workers_join_handle {
let indexing_worker_result = worker_handle
.join()
.map_err(|e| TantivyError::ErrorInThread(format!("{:?}", e)))?;
indexing_worker_result?;
for future_result in former_workers_future_results.into_iter() {
future_result
.into_inner()
.map_err(|_| TantivyError::Poisoned)?
.wait()?;
self.add_indexing_worker()?;
}
@@ -768,8 +788,10 @@ impl Drop for IndexWriter {
fn drop(&mut self) {
self.segment_updater.kill();
self.drop_sender();
for work in self.workers_join_handle.drain(..) {
let _ = work.join();
for future_result_mutex in self.workers_future_results.drain(..) {
let _ = future_result_mutex
.into_inner()
.map(|future_result| future_result.wait());
}
}
}