diff --git a/src/core/index.rs b/src/core/index.rs
index ad55c53bf..d22d39df2 100644
--- a/src/core/index.rs
+++ b/src/core/index.rs
@@ -12,7 +12,6 @@ use crate::directory::INDEX_WRITER_LOCK;
 use crate::directory::{Directory, RAMDirectory};
 use crate::error::DataCorruption;
 use crate::error::TantivyError;
-use crate::indexer::index_writer::open_index_writer;
 use crate::indexer::index_writer::HEAP_SIZE_MIN;
 use crate::indexer::segment_updater::save_new_metas;
 use crate::reader::IndexReader;
@@ -265,7 +264,7 @@ impl Index {
                 )
             })?;
         let heap_size_in_bytes_per_thread = overall_heap_size_in_bytes / num_threads;
-        open_index_writer(
+        IndexWriter::new(
             self,
             num_threads,
             heap_size_in_bytes_per_thread,
diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs
index c642efe7d..df0917f02 100644
--- a/src/indexer/index_writer.rs
+++ b/src/indexer/index_writer.rs
@@ -88,75 +88,7 @@ pub struct IndexWriter {
     committed_opstamp: Opstamp,
 }
 
-/// Open a new index writer. Attempts to acquire a lockfile.
-///
-/// The lockfile should be deleted on drop, but it is possible
-/// that due to a panic or other error, a stale lockfile will be
-/// left in the index directory. If you are sure that no other
-/// `IndexWriter` on the system is accessing the index directory,
-/// it is safe to manually delete the lockfile.
-///
-/// `num_threads` specifies the number of indexing workers that
-/// should work at the same time.
-/// # Errors
-/// If the lockfile already exists, returns `Error::FileAlreadyExists`.
-/// # Panics
-/// If the heap size per thread is too small, panics.
-pub fn open_index_writer(
-    index: &Index,
-    num_threads: usize,
-    heap_size_in_bytes_per_thread: usize,
-    directory_lock: DirectoryLock,
-) -> Result<IndexWriter> {
-    if heap_size_in_bytes_per_thread < HEAP_SIZE_MIN {
-        let err_msg = format!(
-            "The heap size per thread needs to be at least {}.",
-            HEAP_SIZE_MIN
-        );
-        return Err(TantivyError::InvalidArgument(err_msg));
-    }
-    if heap_size_in_bytes_per_thread >= HEAP_SIZE_MAX {
-        let err_msg = format!("The heap size per thread cannot exceed {}", HEAP_SIZE_MAX);
-        return Err(TantivyError::InvalidArgument(err_msg));
-    }
-    let (document_sender, document_receiver): (OperationSender, OperationReceiver) =
-        channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
-
-    let delete_queue = DeleteQueue::new();
-
-    let current_opstamp = index.load_metas()?.opstamp;
-
-    let stamper = Stamper::new(current_opstamp);
-
-    let segment_updater =
-        SegmentUpdater::create(index.clone(), stamper.clone(), &delete_queue.cursor())?;
-
-    let mut index_writer = IndexWriter {
-        _directory_lock: Some(directory_lock),
-
-        heap_size_in_bytes_per_thread,
-        index: index.clone(),
-
-        operation_receiver: document_receiver,
-        operation_sender: document_sender,
-
-        segment_updater,
-
-        workers_join_handle: vec![],
-        num_threads,
-
-        delete_queue,
-
-        committed_opstamp: current_opstamp,
-        stamper,
-
-        worker_id: 0,
-    };
-    index_writer.start_workers()?;
-    Ok(index_writer)
-}
-
-pub fn compute_deleted_bitset(
+fn compute_deleted_bitset(
     delete_bitset: &mut BitSet,
     segment_reader: &SegmentReader,
     delete_cursor: &mut DeleteCursor,
@@ -195,7 +127,7 @@ pub fn compute_deleted_bitset(
 
 /// Advance delete for the given segment up
 /// to the target opstamp.
-pub fn advance_deletes(
+pub(crate) fn advance_deletes(
     mut segment: Segment,
     segment_entry: &mut SegmentEntry,
     target_opstamp: Opstamp,
@@ -318,6 +250,74 @@ fn apply_deletes(
 }
 
 impl IndexWriter {
+    /// Create a new index writer. Attempts to acquire a lockfile.
+    ///
+    /// The lockfile should be deleted on drop, but it is possible
+    /// that due to a panic or other error, a stale lockfile will be
+    /// left in the index directory. If you are sure that no other
+    /// `IndexWriter` on the system is accessing the index directory,
+    /// it is safe to manually delete the lockfile.
+    ///
+    /// `num_threads` specifies the number of indexing workers that
+    /// should work at the same time.
+    /// # Errors
+    /// If the lockfile already exists, returns `Error::FileAlreadyExists`.
+    /// Returns `TantivyError::InvalidArgument` if the heap size per thread is
+    /// below `HEAP_SIZE_MIN`, or is `HEAP_SIZE_MAX` or more.
+    pub(crate) fn new(
+        index: &Index,
+        num_threads: usize,
+        heap_size_in_bytes_per_thread: usize,
+        directory_lock: DirectoryLock,
+    ) -> Result<IndexWriter> {
+        if heap_size_in_bytes_per_thread < HEAP_SIZE_MIN {
+            let err_msg = format!(
+                "The heap size per thread needs to be at least {}.",
+                HEAP_SIZE_MIN
+            );
+            return Err(TantivyError::InvalidArgument(err_msg));
+        }
+        if heap_size_in_bytes_per_thread >= HEAP_SIZE_MAX {
+            let err_msg = format!("The heap size per thread cannot exceed {}", HEAP_SIZE_MAX);
+            return Err(TantivyError::InvalidArgument(err_msg));
+        }
+        let (document_sender, document_receiver): (OperationSender, OperationReceiver) =
+            channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
+
+        let delete_queue = DeleteQueue::new();
+
+        let current_opstamp = index.load_metas()?.opstamp;
+
+        let stamper = Stamper::new(current_opstamp);
+
+        let segment_updater =
+            SegmentUpdater::create(index.clone(), stamper.clone(), &delete_queue.cursor())?;
+
+        let mut index_writer = IndexWriter {
+            _directory_lock: Some(directory_lock),
+
+            heap_size_in_bytes_per_thread,
+            index: index.clone(),
+
+            operation_receiver: document_receiver,
+            operation_sender: document_sender,
+
+            segment_updater,
+
+            workers_join_handle: vec![],
+            num_threads,
+
+            delete_queue,
+
+            committed_opstamp: current_opstamp,
+            stamper,
+
+            worker_id: 0,
+        };
+        index_writer.start_workers()?;
+        Ok(index_writer)
+    }
+
     /// If there are some merging threads, blocks until they all finish their work and
     /// then drop the `IndexWriter`.
     pub fn wait_merging_threads(mut self) -> Result<()> {
@@ -538,7 +538,7 @@ impl IndexWriter {
             .take()
             .expect("The IndexWriter does not have any lock. This is a bug, please report.");
 
-        let new_index_writer: IndexWriter = open_index_writer(
+        let new_index_writer: IndexWriter = IndexWriter::new(
             &self.index,
             self.num_threads,
             self.heap_size_in_bytes_per_thread,