Small refactoring

This commit is contained in:
Paul Masurel
2019-07-17 21:55:59 +09:00
parent 4e6dcf3cbe
commit 818a0abbee
2 changed files with 72 additions and 73 deletions

View File

@@ -12,7 +12,6 @@ use crate::directory::INDEX_WRITER_LOCK;
use crate::directory::{Directory, RAMDirectory};
use crate::error::DataCorruption;
use crate::error::TantivyError;
use crate::indexer::index_writer::open_index_writer;
use crate::indexer::index_writer::HEAP_SIZE_MIN;
use crate::indexer::segment_updater::save_new_metas;
use crate::reader::IndexReader;
@@ -265,7 +264,7 @@ impl Index {
)
})?;
let heap_size_in_bytes_per_thread = overall_heap_size_in_bytes / num_threads;
open_index_writer(
IndexWriter::new(
self,
num_threads,
heap_size_in_bytes_per_thread,

View File

@@ -88,75 +88,7 @@ pub struct IndexWriter {
committed_opstamp: Opstamp,
}
/// Open a new index writer. Attempts to acquire a lockfile.
///
/// The lockfile should be deleted on drop, but it is possible
/// that due to a panic or other error, a stale lockfile will be
/// left in the index directory. If you are sure that no other
/// `IndexWriter` on the system is accessing the index directory,
/// it is safe to manually delete the lockfile.
///
/// `num_threads` specifies the number of indexing workers that
/// should work at the same time.
/// # Errors
/// If the lockfile already exists, returns `Error::FileAlreadyExists`.
/// # Panics
/// If the heap size per thread is too small, panics.
pub fn open_index_writer(
index: &Index,
num_threads: usize,
heap_size_in_bytes_per_thread: usize,
directory_lock: DirectoryLock,
) -> Result<IndexWriter> {
if heap_size_in_bytes_per_thread < HEAP_SIZE_MIN {
let err_msg = format!(
"The heap size per thread needs to be at least {}.",
HEAP_SIZE_MIN
);
return Err(TantivyError::InvalidArgument(err_msg));
}
if heap_size_in_bytes_per_thread >= HEAP_SIZE_MAX {
let err_msg = format!("The heap size per thread cannot exceed {}", HEAP_SIZE_MAX);
return Err(TantivyError::InvalidArgument(err_msg));
}
let (document_sender, document_receiver): (OperationSender, OperationReceiver) =
channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
let delete_queue = DeleteQueue::new();
let current_opstamp = index.load_metas()?.opstamp;
let stamper = Stamper::new(current_opstamp);
let segment_updater =
SegmentUpdater::create(index.clone(), stamper.clone(), &delete_queue.cursor())?;
let mut index_writer = IndexWriter {
_directory_lock: Some(directory_lock),
heap_size_in_bytes_per_thread,
index: index.clone(),
operation_receiver: document_receiver,
operation_sender: document_sender,
segment_updater,
workers_join_handle: vec![],
num_threads,
delete_queue,
committed_opstamp: current_opstamp,
stamper,
worker_id: 0,
};
index_writer.start_workers()?;
Ok(index_writer)
}
pub fn compute_deleted_bitset(
fn compute_deleted_bitset(
delete_bitset: &mut BitSet,
segment_reader: &SegmentReader,
delete_cursor: &mut DeleteCursor,
@@ -195,7 +127,7 @@ pub fn compute_deleted_bitset(
/// Advance delete for the given segment up
/// to the target opstamp.
pub fn advance_deletes(
pub(crate) fn advance_deletes(
mut segment: Segment,
segment_entry: &mut SegmentEntry,
target_opstamp: Opstamp,
@@ -318,6 +250,74 @@ fn apply_deletes(
}
impl IndexWriter {
/// Create a new index writer. Attempts to acquire a lockfile.
///
/// The lockfile should be deleted on drop, but it is possible
/// that due to a panic or other error, a stale lockfile will be
/// left in the index directory. If you are sure that no other
/// `IndexWriter` on the system is accessing the index directory,
/// it is safe to manually delete the lockfile.
///
/// `num_threads` specifies the number of indexing workers that
/// should work at the same time.
/// # Errors
/// If the lockfile already exists, returns `Error::FileAlreadyExists`.
/// # Panics
/// If the heap size per thread is too small, panics.
pub(crate) fn new(
index: &Index,
num_threads: usize,
heap_size_in_bytes_per_thread: usize,
directory_lock: DirectoryLock,
) -> Result<IndexWriter> {
if heap_size_in_bytes_per_thread < HEAP_SIZE_MIN {
let err_msg = format!(
"The heap size per thread needs to be at least {}.",
HEAP_SIZE_MIN
);
return Err(TantivyError::InvalidArgument(err_msg));
}
if heap_size_in_bytes_per_thread >= HEAP_SIZE_MAX {
let err_msg = format!("The heap size per thread cannot exceed {}", HEAP_SIZE_MAX);
return Err(TantivyError::InvalidArgument(err_msg));
}
let (document_sender, document_receiver): (OperationSender, OperationReceiver) =
channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
let delete_queue = DeleteQueue::new();
let current_opstamp = index.load_metas()?.opstamp;
let stamper = Stamper::new(current_opstamp);
let segment_updater =
SegmentUpdater::create(index.clone(), stamper.clone(), &delete_queue.cursor())?;
let mut index_writer = IndexWriter {
_directory_lock: Some(directory_lock),
heap_size_in_bytes_per_thread,
index: index.clone(),
operation_receiver: document_receiver,
operation_sender: document_sender,
segment_updater,
workers_join_handle: vec![],
num_threads,
delete_queue,
committed_opstamp: current_opstamp,
stamper,
worker_id: 0,
};
index_writer.start_workers()?;
Ok(index_writer)
}
/// If there are some merging threads, blocks until they all finish their work and
/// then drop the `IndexWriter`.
pub fn wait_merging_threads(mut self) -> Result<()> {
@@ -538,7 +538,7 @@ impl IndexWriter {
.take()
.expect("The IndexWriter does not have any lock. This is a bug, please report.");
let new_index_writer: IndexWriter = open_index_writer(
let new_index_writer: IndexWriter = IndexWriter::new(
&self.index,
self.num_threads,
self.heap_size_in_bytes_per_thread,