From 85707d093b15e3c5722bf11ecd5bb7fa2eec7912 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Massot?= Date: Sun, 27 Mar 2022 20:22:50 +0200 Subject: [PATCH] Use rayon for indexing workers. --- src/indexer/index_writer.rs | 126 +++++++++++++++++++++--------------- 1 file changed, 74 insertions(+), 52 deletions(-) diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index ff71742d4..3e2943c1d 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -1,10 +1,9 @@ use std::ops::Range; -use std::sync::Arc; -use std::thread; -use std::thread::JoinHandle; +use std::sync::{Arc, Mutex}; use common::BitSet; use crossbeam::channel; +use rayon::{ThreadPool, ThreadPoolBuilder}; use smallvec::smallvec; use super::operation::{AddOperation, UserOperation}; @@ -61,7 +60,7 @@ pub struct IndexWriter { memory_arena_in_bytes_per_thread: usize, - workers_join_handle: Vec>>, + workers_future_results: Vec>>, index_writer_status: IndexWriterStatus, operation_sender: AddBatchSender, @@ -72,6 +71,8 @@ pub struct IndexWriter { num_threads: usize, + pool: ThreadPool, + delete_queue: DeleteQueue, stamper: Stamper, @@ -300,6 +301,15 @@ impl IndexWriter { let segment_updater = SegmentUpdater::create(index.clone(), stamper.clone(), &delete_queue.cursor())?; + let pool = ThreadPoolBuilder::new() + .thread_name(|i| format!("indexing_worker_thread_{i}")) + .num_threads(num_threads) + .build() + .map_err(|_| { + crate::TantivyError::SystemError( + "Failed to spawn indexing worker thread".to_string(), + ) + })?; let mut index_writer = IndexWriter { _directory_lock: Some(directory_lock), @@ -311,8 +321,9 @@ impl IndexWriter { segment_updater, - workers_join_handle: vec![], + workers_future_results: vec![], num_threads, + pool, delete_queue, @@ -342,11 +353,12 @@ impl IndexWriter { // dropping the last reference to the segment_updater. self.drop_sender(); - let former_workers_handles = std::mem::take(&mut self.workers_join_handle); - for join_handle in former_workers_handles { - join_handle - .join() - .map_err(|_| error_in_index_worker_thread("Worker thread panicked."))? + let former_workers_future_results = std::mem::take(&mut self.workers_future_results); + for future_result in former_workers_future_results.into_iter() { + future_result + .into_inner() + .map_err(|_| TantivyError::Poisoned)? + .wait() .map_err(|_| error_in_index_worker_thread("Worker thread failed."))?; } @@ -407,43 +419,51 @@ impl IndexWriter { let mem_budget = self.memory_arena_in_bytes_per_thread; let index = self.index.clone(); - let join_handle: JoinHandle> = thread::Builder::new() - .name(format!("thrd-tantivy-index{}", self.worker_id)) - .spawn(move || { - loop { - let mut document_iterator = document_receiver_clone - .clone() - .into_iter() - .filter(|batch| !batch.is_empty()) - .peekable(); + let (scheduled_result, sender) = FutureResult::create( + "A segment_updater future did not succeed. This should never happen.", + ); + self.pool.spawn(move || { + loop { + let mut document_iterator = document_receiver_clone + .clone() + .into_iter() + .filter(|batch| !batch.is_empty()) + .peekable(); - // The peeking here is to avoid creating a new segment's files - // if no document are available. - // - // This is a valid guarantee as the peeked document now belongs to - // our local iterator. - if let Some(batch) = document_iterator.peek() { - assert!(!batch.is_empty()); - delete_cursor.skip_to(batch[0].opstamp); - } else { - // No more documents. - // It happens when there is a commit, or if the `IndexWriter` - // was dropped. - index_writer_bomb.defuse(); - return Ok(()); - } - - index_documents( - mem_budget, - index.new_segment(), - &mut document_iterator, - &mut segment_updater, - delete_cursor.clone(), - )?; + // The peeking here is to avoid creating a new segment's files + // if no document are available. + // + // This is a valid guarantee as the peeked document now belongs to + // our local iterator. + if let Some(batch) = document_iterator.peek() { + assert!(!batch.is_empty()); + delete_cursor.skip_to(batch[0].opstamp); + } else { + // No more documents. + // It happens when there is a commit, or if the `IndexWriter` + // was dropped. + index_writer_bomb.defuse(); + let _send_result = sender.send(Ok(())); + return; } - })?; + + let result = index_documents( + mem_budget, + index.new_segment(), + &mut document_iterator, + &mut segment_updater, + delete_cursor.clone(), + ); + + if result.is_err() { + let _send_result = sender.send(result); + return; + } + } + }); self.worker_id += 1; - self.workers_join_handle.push(join_handle); + self.workers_future_results + .push(Mutex::new(scheduled_result)); Ok(()) } @@ -621,13 +641,13 @@ impl IndexWriter { // and recreate a new one. self.recreate_document_channel(); - let former_workers_join_handle = std::mem::take(&mut self.workers_join_handle); + let former_workers_future_results = std::mem::take(&mut self.workers_future_results); - for worker_handle in former_workers_join_handle { - let indexing_worker_result = worker_handle - .join() - .map_err(|e| TantivyError::ErrorInThread(format!("{:?}", e)))?; - indexing_worker_result?; + for future_result in former_workers_future_results.into_iter() { + future_result + .into_inner() + .map_err(|_| TantivyError::Poisoned)? + .wait()?; self.add_indexing_worker()?; } @@ -768,8 +788,10 @@ impl Drop for IndexWriter { fn drop(&mut self) { self.segment_updater.kill(); self.drop_sender(); - for work in self.workers_join_handle.drain(..) { - let _ = work.join(); + for future_result_mutex in self.workers_future_results.drain(..) { + let _ = future_result_mutex + .into_inner() + .map(|future_result| future_result.wait()); } } }