mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-01-06 17:22:54 +00:00
Compare commits
1 Commits
check_bug
...
rayon-inde
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
85707d093b |
@@ -1,10 +1,9 @@
|
|||||||
use std::ops::Range;
|
use std::ops::Range;
|
||||||
use std::sync::Arc;
|
use std::sync::{Arc, Mutex};
|
||||||
use std::thread;
|
|
||||||
use std::thread::JoinHandle;
|
|
||||||
|
|
||||||
use common::BitSet;
|
use common::BitSet;
|
||||||
use crossbeam::channel;
|
use crossbeam::channel;
|
||||||
|
use rayon::{ThreadPool, ThreadPoolBuilder};
|
||||||
use smallvec::smallvec;
|
use smallvec::smallvec;
|
||||||
|
|
||||||
use super::operation::{AddOperation, UserOperation};
|
use super::operation::{AddOperation, UserOperation};
|
||||||
@@ -61,7 +60,7 @@ pub struct IndexWriter {
|
|||||||
|
|
||||||
memory_arena_in_bytes_per_thread: usize,
|
memory_arena_in_bytes_per_thread: usize,
|
||||||
|
|
||||||
workers_join_handle: Vec<JoinHandle<crate::Result<()>>>,
|
workers_future_results: Vec<Mutex<FutureResult<()>>>,
|
||||||
|
|
||||||
index_writer_status: IndexWriterStatus,
|
index_writer_status: IndexWriterStatus,
|
||||||
operation_sender: AddBatchSender,
|
operation_sender: AddBatchSender,
|
||||||
@@ -72,6 +71,8 @@ pub struct IndexWriter {
|
|||||||
|
|
||||||
num_threads: usize,
|
num_threads: usize,
|
||||||
|
|
||||||
|
pool: ThreadPool,
|
||||||
|
|
||||||
delete_queue: DeleteQueue,
|
delete_queue: DeleteQueue,
|
||||||
|
|
||||||
stamper: Stamper,
|
stamper: Stamper,
|
||||||
@@ -300,6 +301,15 @@ impl IndexWriter {
|
|||||||
let segment_updater =
|
let segment_updater =
|
||||||
SegmentUpdater::create(index.clone(), stamper.clone(), &delete_queue.cursor())?;
|
SegmentUpdater::create(index.clone(), stamper.clone(), &delete_queue.cursor())?;
|
||||||
|
|
||||||
|
let pool = ThreadPoolBuilder::new()
|
||||||
|
.thread_name(|i| format!("indexing_worker_thread_{i}"))
|
||||||
|
.num_threads(num_threads)
|
||||||
|
.build()
|
||||||
|
.map_err(|_| {
|
||||||
|
crate::TantivyError::SystemError(
|
||||||
|
"Failed to spawn indexing worker thread".to_string(),
|
||||||
|
)
|
||||||
|
})?;
|
||||||
let mut index_writer = IndexWriter {
|
let mut index_writer = IndexWriter {
|
||||||
_directory_lock: Some(directory_lock),
|
_directory_lock: Some(directory_lock),
|
||||||
|
|
||||||
@@ -311,8 +321,9 @@ impl IndexWriter {
|
|||||||
|
|
||||||
segment_updater,
|
segment_updater,
|
||||||
|
|
||||||
workers_join_handle: vec![],
|
workers_future_results: vec![],
|
||||||
num_threads,
|
num_threads,
|
||||||
|
pool,
|
||||||
|
|
||||||
delete_queue,
|
delete_queue,
|
||||||
|
|
||||||
@@ -342,11 +353,12 @@ impl IndexWriter {
|
|||||||
// dropping the last reference to the segment_updater.
|
// dropping the last reference to the segment_updater.
|
||||||
self.drop_sender();
|
self.drop_sender();
|
||||||
|
|
||||||
let former_workers_handles = std::mem::take(&mut self.workers_join_handle);
|
let former_workers_future_results = std::mem::take(&mut self.workers_future_results);
|
||||||
for join_handle in former_workers_handles {
|
for future_result in former_workers_future_results.into_iter() {
|
||||||
join_handle
|
future_result
|
||||||
.join()
|
.into_inner()
|
||||||
.map_err(|_| error_in_index_worker_thread("Worker thread panicked."))?
|
.map_err(|_| TantivyError::Poisoned)?
|
||||||
|
.wait()
|
||||||
.map_err(|_| error_in_index_worker_thread("Worker thread failed."))?;
|
.map_err(|_| error_in_index_worker_thread("Worker thread failed."))?;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -407,43 +419,51 @@ impl IndexWriter {
|
|||||||
|
|
||||||
let mem_budget = self.memory_arena_in_bytes_per_thread;
|
let mem_budget = self.memory_arena_in_bytes_per_thread;
|
||||||
let index = self.index.clone();
|
let index = self.index.clone();
|
||||||
let join_handle: JoinHandle<crate::Result<()>> = thread::Builder::new()
|
let (scheduled_result, sender) = FutureResult::create(
|
||||||
.name(format!("thrd-tantivy-index{}", self.worker_id))
|
"A segment_updater future did not succeed. This should never happen.",
|
||||||
.spawn(move || {
|
);
|
||||||
loop {
|
self.pool.spawn(move || {
|
||||||
let mut document_iterator = document_receiver_clone
|
loop {
|
||||||
.clone()
|
let mut document_iterator = document_receiver_clone
|
||||||
.into_iter()
|
.clone()
|
||||||
.filter(|batch| !batch.is_empty())
|
.into_iter()
|
||||||
.peekable();
|
.filter(|batch| !batch.is_empty())
|
||||||
|
.peekable();
|
||||||
|
|
||||||
// The peeking here is to avoid creating a new segment's files
|
// The peeking here is to avoid creating a new segment's files
|
||||||
// if no document are available.
|
// if no document are available.
|
||||||
//
|
//
|
||||||
// This is a valid guarantee as the peeked document now belongs to
|
// This is a valid guarantee as the peeked document now belongs to
|
||||||
// our local iterator.
|
// our local iterator.
|
||||||
if let Some(batch) = document_iterator.peek() {
|
if let Some(batch) = document_iterator.peek() {
|
||||||
assert!(!batch.is_empty());
|
assert!(!batch.is_empty());
|
||||||
delete_cursor.skip_to(batch[0].opstamp);
|
delete_cursor.skip_to(batch[0].opstamp);
|
||||||
} else {
|
} else {
|
||||||
// No more documents.
|
// No more documents.
|
||||||
// It happens when there is a commit, or if the `IndexWriter`
|
// It happens when there is a commit, or if the `IndexWriter`
|
||||||
// was dropped.
|
// was dropped.
|
||||||
index_writer_bomb.defuse();
|
index_writer_bomb.defuse();
|
||||||
return Ok(());
|
let _send_result = sender.send(Ok(()));
|
||||||
}
|
return;
|
||||||
|
|
||||||
index_documents(
|
|
||||||
mem_budget,
|
|
||||||
index.new_segment(),
|
|
||||||
&mut document_iterator,
|
|
||||||
&mut segment_updater,
|
|
||||||
delete_cursor.clone(),
|
|
||||||
)?;
|
|
||||||
}
|
}
|
||||||
})?;
|
|
||||||
|
let result = index_documents(
|
||||||
|
mem_budget,
|
||||||
|
index.new_segment(),
|
||||||
|
&mut document_iterator,
|
||||||
|
&mut segment_updater,
|
||||||
|
delete_cursor.clone(),
|
||||||
|
);
|
||||||
|
|
||||||
|
if result.is_err() {
|
||||||
|
let _send_result = sender.send(result);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
self.worker_id += 1;
|
self.worker_id += 1;
|
||||||
self.workers_join_handle.push(join_handle);
|
self.workers_future_results
|
||||||
|
.push(Mutex::new(scheduled_result));
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -621,13 +641,13 @@ impl IndexWriter {
|
|||||||
// and recreate a new one.
|
// and recreate a new one.
|
||||||
self.recreate_document_channel();
|
self.recreate_document_channel();
|
||||||
|
|
||||||
let former_workers_join_handle = std::mem::take(&mut self.workers_join_handle);
|
let former_workers_future_results = std::mem::take(&mut self.workers_future_results);
|
||||||
|
|
||||||
for worker_handle in former_workers_join_handle {
|
for future_result in former_workers_future_results.into_iter() {
|
||||||
let indexing_worker_result = worker_handle
|
future_result
|
||||||
.join()
|
.into_inner()
|
||||||
.map_err(|e| TantivyError::ErrorInThread(format!("{:?}", e)))?;
|
.map_err(|_| TantivyError::Poisoned)?
|
||||||
indexing_worker_result?;
|
.wait()?;
|
||||||
self.add_indexing_worker()?;
|
self.add_indexing_worker()?;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -768,8 +788,10 @@ impl Drop for IndexWriter {
|
|||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
self.segment_updater.kill();
|
self.segment_updater.kill();
|
||||||
self.drop_sender();
|
self.drop_sender();
|
||||||
for work in self.workers_join_handle.drain(..) {
|
for future_result_mutex in self.workers_future_results.drain(..) {
|
||||||
let _ = work.join();
|
let _ = future_result_mutex
|
||||||
|
.into_inner()
|
||||||
|
.map(|future_result| future_result.wait());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user