mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2025-12-27 20:42:54 +00:00
Compare commits
1 Commits
check_bug
...
rayon-inde
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
85707d093b |
@@ -1,10 +1,9 @@
|
||||
use std::ops::Range;
|
||||
use std::sync::Arc;
|
||||
use std::thread;
|
||||
use std::thread::JoinHandle;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use common::BitSet;
|
||||
use crossbeam::channel;
|
||||
use rayon::{ThreadPool, ThreadPoolBuilder};
|
||||
use smallvec::smallvec;
|
||||
|
||||
use super::operation::{AddOperation, UserOperation};
|
||||
@@ -61,7 +60,7 @@ pub struct IndexWriter {
|
||||
|
||||
memory_arena_in_bytes_per_thread: usize,
|
||||
|
||||
workers_join_handle: Vec<JoinHandle<crate::Result<()>>>,
|
||||
workers_future_results: Vec<Mutex<FutureResult<()>>>,
|
||||
|
||||
index_writer_status: IndexWriterStatus,
|
||||
operation_sender: AddBatchSender,
|
||||
@@ -72,6 +71,8 @@ pub struct IndexWriter {
|
||||
|
||||
num_threads: usize,
|
||||
|
||||
pool: ThreadPool,
|
||||
|
||||
delete_queue: DeleteQueue,
|
||||
|
||||
stamper: Stamper,
|
||||
@@ -300,6 +301,15 @@ impl IndexWriter {
|
||||
let segment_updater =
|
||||
SegmentUpdater::create(index.clone(), stamper.clone(), &delete_queue.cursor())?;
|
||||
|
||||
let pool = ThreadPoolBuilder::new()
|
||||
.thread_name(|i| format!("indexing_worker_thread_{i}"))
|
||||
.num_threads(num_threads)
|
||||
.build()
|
||||
.map_err(|_| {
|
||||
crate::TantivyError::SystemError(
|
||||
"Failed to spawn indexing worker thread".to_string(),
|
||||
)
|
||||
})?;
|
||||
let mut index_writer = IndexWriter {
|
||||
_directory_lock: Some(directory_lock),
|
||||
|
||||
@@ -311,8 +321,9 @@ impl IndexWriter {
|
||||
|
||||
segment_updater,
|
||||
|
||||
workers_join_handle: vec![],
|
||||
workers_future_results: vec![],
|
||||
num_threads,
|
||||
pool,
|
||||
|
||||
delete_queue,
|
||||
|
||||
@@ -342,11 +353,12 @@ impl IndexWriter {
|
||||
// dropping the last reference to the segment_updater.
|
||||
self.drop_sender();
|
||||
|
||||
let former_workers_handles = std::mem::take(&mut self.workers_join_handle);
|
||||
for join_handle in former_workers_handles {
|
||||
join_handle
|
||||
.join()
|
||||
.map_err(|_| error_in_index_worker_thread("Worker thread panicked."))?
|
||||
let former_workers_future_results = std::mem::take(&mut self.workers_future_results);
|
||||
for future_result in former_workers_future_results.into_iter() {
|
||||
future_result
|
||||
.into_inner()
|
||||
.map_err(|_| TantivyError::Poisoned)?
|
||||
.wait()
|
||||
.map_err(|_| error_in_index_worker_thread("Worker thread failed."))?;
|
||||
}
|
||||
|
||||
@@ -407,43 +419,51 @@ impl IndexWriter {
|
||||
|
||||
let mem_budget = self.memory_arena_in_bytes_per_thread;
|
||||
let index = self.index.clone();
|
||||
let join_handle: JoinHandle<crate::Result<()>> = thread::Builder::new()
|
||||
.name(format!("thrd-tantivy-index{}", self.worker_id))
|
||||
.spawn(move || {
|
||||
loop {
|
||||
let mut document_iterator = document_receiver_clone
|
||||
.clone()
|
||||
.into_iter()
|
||||
.filter(|batch| !batch.is_empty())
|
||||
.peekable();
|
||||
let (scheduled_result, sender) = FutureResult::create(
|
||||
"A segment_updater future did not succeed. This should never happen.",
|
||||
);
|
||||
self.pool.spawn(move || {
|
||||
loop {
|
||||
let mut document_iterator = document_receiver_clone
|
||||
.clone()
|
||||
.into_iter()
|
||||
.filter(|batch| !batch.is_empty())
|
||||
.peekable();
|
||||
|
||||
// The peeking here is to avoid creating a new segment's files
|
||||
// if no documents are available.
|
||||
//
|
||||
// This is a valid guarantee as the peeked document now belongs to
|
||||
// our local iterator.
|
||||
if let Some(batch) = document_iterator.peek() {
|
||||
assert!(!batch.is_empty());
|
||||
delete_cursor.skip_to(batch[0].opstamp);
|
||||
} else {
|
||||
// No more documents.
|
||||
// It happens when there is a commit, or if the `IndexWriter`
|
||||
// was dropped.
|
||||
index_writer_bomb.defuse();
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
index_documents(
|
||||
mem_budget,
|
||||
index.new_segment(),
|
||||
&mut document_iterator,
|
||||
&mut segment_updater,
|
||||
delete_cursor.clone(),
|
||||
)?;
|
||||
// The peeking here is to avoid creating a new segment's files
|
||||
// if no documents are available.
|
||||
//
|
||||
// This is a valid guarantee as the peeked document now belongs to
|
||||
// our local iterator.
|
||||
if let Some(batch) = document_iterator.peek() {
|
||||
assert!(!batch.is_empty());
|
||||
delete_cursor.skip_to(batch[0].opstamp);
|
||||
} else {
|
||||
// No more documents.
|
||||
// It happens when there is a commit, or if the `IndexWriter`
|
||||
// was dropped.
|
||||
index_writer_bomb.defuse();
|
||||
let _send_result = sender.send(Ok(()));
|
||||
return;
|
||||
}
|
||||
})?;
|
||||
|
||||
let result = index_documents(
|
||||
mem_budget,
|
||||
index.new_segment(),
|
||||
&mut document_iterator,
|
||||
&mut segment_updater,
|
||||
delete_cursor.clone(),
|
||||
);
|
||||
|
||||
if result.is_err() {
|
||||
let _send_result = sender.send(result);
|
||||
return;
|
||||
}
|
||||
}
|
||||
});
|
||||
self.worker_id += 1;
|
||||
self.workers_join_handle.push(join_handle);
|
||||
self.workers_future_results
|
||||
.push(Mutex::new(scheduled_result));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -621,13 +641,13 @@ impl IndexWriter {
|
||||
// and recreate a new one.
|
||||
self.recreate_document_channel();
|
||||
|
||||
let former_workers_join_handle = std::mem::take(&mut self.workers_join_handle);
|
||||
let former_workers_future_results = std::mem::take(&mut self.workers_future_results);
|
||||
|
||||
for worker_handle in former_workers_join_handle {
|
||||
let indexing_worker_result = worker_handle
|
||||
.join()
|
||||
.map_err(|e| TantivyError::ErrorInThread(format!("{:?}", e)))?;
|
||||
indexing_worker_result?;
|
||||
for future_result in former_workers_future_results.into_iter() {
|
||||
future_result
|
||||
.into_inner()
|
||||
.map_err(|_| TantivyError::Poisoned)?
|
||||
.wait()?;
|
||||
self.add_indexing_worker()?;
|
||||
}
|
||||
|
||||
@@ -768,8 +788,10 @@ impl Drop for IndexWriter {
|
||||
fn drop(&mut self) {
|
||||
self.segment_updater.kill();
|
||||
self.drop_sender();
|
||||
for work in self.workers_join_handle.drain(..) {
|
||||
let _ = work.join();
|
||||
for future_result_mutex in self.workers_future_results.drain(..) {
|
||||
let _ = future_result_mutex
|
||||
.into_inner()
|
||||
.map(|future_result| future_result.wait());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user