From e5bf41c1f656e658d570a9999df085a75547923a Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Wed, 25 Mar 2020 10:05:01 +0900 Subject: [PATCH] moving merge inventory to the segment_manager --- src/common/composite_file.rs | 2 +- src/core/segment.rs | 2 +- src/directory/mod.rs | 2 + src/directory/persistor.rs | 42 ++++++++++++++++++++ src/directory/ram_directory.rs | 10 +++-- src/fastfield/delete.rs | 2 +- src/indexer/index_writer.rs | 11 ++--- src/indexer/index_writer_config.rs | 9 +++++ src/indexer/merge_operation.rs | 4 +- src/indexer/mod.rs | 1 + src/indexer/resource_manager.rs | 64 +++++++++++++++++++++++++----- src/indexer/segment_manager.rs | 50 +++++++++++++++-------- src/indexer/segment_updater.rs | 19 ++++----- 13 files changed, 163 insertions(+), 55 deletions(-) create mode 100644 src/directory/persistor.rs diff --git a/src/common/composite_file.rs b/src/common/composite_file.rs index ec0295014..17341d1f2 100644 --- a/src/common/composite_file.rs +++ b/src/common/composite_file.rs @@ -190,7 +190,7 @@ mod test { use crate::schema::Field; use std::io::Write; use std::path::Path; - use crate::indexer::ResourceManager; + #[test] fn test_composite_file() { diff --git a/src/core/segment.rs b/src/core/segment.rs index 41a8c5445..f58f4e52b 100644 --- a/src/core/segment.rs +++ b/src/core/segment.rs @@ -11,7 +11,7 @@ use crate::Opstamp; use std::fmt; use std::ops::{Deref, DerefMut}; use std::path::PathBuf; -use crate::indexer::{Allocation, ResourceManager}; +use crate::indexer::{ResourceManager}; #[derive(Clone)] pub(crate) enum SegmentDirectory { diff --git a/src/directory/mod.rs b/src/directory/mod.rs index ee85c908b..c82a2fe23 100644 --- a/src/directory/mod.rs +++ b/src/directory/mod.rs @@ -15,6 +15,8 @@ mod ram_directory; mod read_only_source; mod spilling_writer; mod watch_event_router; +mod persistor; + /// Errors specific to the directory module. pub mod error; diff --git a/src/directory/persistor.rs b/src/directory/persistor.rs new file mode 100644 index 000000000..830374a50 --- /dev/null +++ b/src/directory/persistor.rs @@ -0,0 +1,42 @@ +use crate::indexer::{SegmentManager, ResourceManager, MergeOperationInventory}; +use std::thread::JoinHandle; +use crate::{IndexWriterConfig, SegmentId}; +use std::collections::HashSet; + +pub(crate) struct Persistor { + segment_manager: SegmentManager, + memory_manager: ResourceManager, + thread_handle: JoinHandle<()>, +} + +impl Persistor { + pub(crate) fn create_and_start(segment_manager: SegmentManager, + memory_manager: ResourceManager, + merge_operations: MergeOperationInventory, + config: IndexWriterConfig) -> crate::Result { + let memory_manager_clone = memory_manager.clone(); + let thread_handle = std::thread::Builder::new() + .name("persistor-thread".to_string()) + .spawn(move || { + while let Ok(_) = memory_manager_clone.wait_until_in_range(config.persist_low..) { + let merge_segment_ids: HashSet = merge_operations.segment_in_merge(); + + } + }).map_err(|_err| crate::TantivyError::ErrorInThread("Failed to start persistor thread.".to_string()))?; + Ok(Persistor { + segment_manager, + memory_manager, + thread_handle + }) + } + + /// Stop the persisting thread. + /// + /// The memory manager will be terminated, which will unlock the thread from any waiting + /// position. + /// This method blocks for a short amount of tim until the persistor thread has terminated. + pub fn stop(self) { + self.memory_manager.terminate(); + let _ = self.thread_handle.join(); + } +} \ No newline at end of file diff --git a/src/directory/ram_directory.rs b/src/directory/ram_directory.rs index 978a70db7..dbadae771 100644 --- a/src/directory/ram_directory.rs +++ b/src/directory/ram_directory.rs @@ -114,8 +114,8 @@ impl InnerDirectory { self.watch_router.subscribe(watch_handle) } - fn total_mem_usage(&self) -> usize { - self.fs.values().map(|f| f.len()).sum() + fn total_mem_usage(&self) -> u64 { + self.fs.values().map(|source| source.len() as u64).sum() } } @@ -137,6 +137,10 @@ pub struct RAMDirectory { impl RAMDirectory { + /// Creates a new RAMDirectory. + /// + /// Check `.create_with_memory_manager(..)` if you want to associate an external memory + /// manager to your RAMDirectory. pub fn create() -> RAMDirectory { RAMDirectory::default() } @@ -155,7 +159,7 @@ impl RAMDirectory { /// Returns the sum of the size of the different files /// in the RAMDirectory. - pub fn total_mem_usage(&self) -> usize { + pub fn total_mem_usage(&self) -> u64 { self.fs.read().unwrap().total_mem_usage() } diff --git a/src/fastfield/delete.rs b/src/fastfield/delete.rs index 7be1f911e..5db81d47e 100644 --- a/src/fastfield/delete.rs +++ b/src/fastfield/delete.rs @@ -89,7 +89,7 @@ mod tests { fn test_delete_bitset_helper(bitset: &BitSet, max_doc: u32) { let test_path = PathBuf::from("test"); - let mut directory = RAMDirectory::create(); + let mut directory = RAMDirectory::default(); { let mut writer = directory.open_write(&*test_path).unwrap(); write_delete_bitset(bitset, max_doc, &mut writer).unwrap(); diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 1dcfe2be3..c7da993ef 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -371,16 +371,11 @@ impl IndexWriter { })?; } - let result = self + self .segment_updater - .wait_merging_thread() - .map_err(|_| TantivyError::ErrorInThread("Failed to join merging thread.".into())); + .wait_merging_thread(); - if let Err(ref e) = result { - error!("Some merging thread failed {:?}", e); - } - - result + Ok(()) } /// Creates a new segment. diff --git a/src/indexer/index_writer_config.rs b/src/indexer/index_writer_config.rs index 8229d3405..97494191f 100644 --- a/src/indexer/index_writer_config.rs +++ b/src/indexer/index_writer_config.rs @@ -13,6 +13,9 @@ pub struct IndexWriterConfig { pub max_indexing_threads: usize, pub max_merging_threads: usize, pub memory_budget: u64, + pub store_flush_num_bytes: u64, + pub persist_low: u64, + pub persist_high: u64, } impl Default for IndexWriterConfig { @@ -21,6 +24,9 @@ impl Default for IndexWriterConfig { max_indexing_threads: 1, max_merging_threads: 3, memory_budget: 50_000_000u64, + store_flush_num_bytes: 10_000_000u64, + persist_low: 10_000_000u64, + persist_high: 50_000_000u64 } } } @@ -32,6 +38,9 @@ impl IndexWriterConfig { max_indexing_threads: 1, max_merging_threads: 5, memory_budget: 4_000_000u64, + store_flush_num_bytes: 500_000u64, + persist_low: 2_000_000u64, + persist_high: 3_000_000u64, } } diff --git a/src/indexer/merge_operation.rs b/src/indexer/merge_operation.rs index 5f5603027..820aacae4 100644 --- a/src/indexer/merge_operation.rs +++ b/src/indexer/merge_operation.rs @@ -6,7 +6,7 @@ use std::collections::HashSet; use std::fmt; use std::ops::Deref; -#[derive(Default)] +#[derive(Default, Clone)] pub(crate) struct MergeOperationInventory { inventory: Inventory, num_merge_watcher: ResourceManager, @@ -32,7 +32,7 @@ impl MergeOperationInventory { } pub fn wait_until_empty(&self) { - self.num_merge_watcher.wait_until_in_range(0..1); + let _ = self.num_merge_watcher.wait_until_in_range(0..1); } } diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index 180e9c9f9..6bca0f3b9 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -19,6 +19,7 @@ mod segment_writer; mod stamper; pub(crate) use self::resource_manager::{Allocation, ResourceManager}; +pub(crate) use self::merge_operation::MergeOperationInventory; pub use self::index_writer::IndexWriter; pub use self::index_writer_config::IndexWriterConfig; pub use self::log_merge_policy::LogMergePolicy; diff --git a/src/indexer/resource_manager.rs b/src/indexer/resource_manager.rs index 8e1532992..d5244972b 100644 --- a/src/indexer/resource_manager.rs +++ b/src/indexer/resource_manager.rs @@ -1,12 +1,27 @@ use std::ops::RangeBounds; use std::sync::{Arc, Condvar, Mutex, MutexGuard, RwLock}; +struct LockedData { + count: u64, + enabled: bool +} + +impl Default for LockedData { + fn default() -> Self { + LockedData { + count: 0u64, + enabled: true + } + } +} + #[derive(Default)] struct Inner { - resource_level: Mutex, + resource_level: Mutex, convdvar: Condvar, } + /// The resource manager makes it possible to track the amount of level of a given resource. /// There is no magic here : it is to the description of the user to declare how much /// of the resource is being held. @@ -29,10 +44,10 @@ pub struct ResourceManager { impl ResourceManager { /// Return the total amount of reousrce allocated pub fn total_amount(&self) -> u64 { - *self.lock() + self.lock().count } - fn lock(&self) -> MutexGuard { + fn lock(&self) -> MutexGuard { self.inner .resource_level .lock() @@ -44,8 +59,8 @@ impl ResourceManager { return; } let mut lock = self.lock(); - let new_val = (*lock as i64) + delta; - *lock = new_val as u64; + let new_val = lock.count as i64 + delta; + lock.count = new_val as u64; self.inner.convdvar.notify_all(); } @@ -61,17 +76,32 @@ impl ResourceManager { } } + /// Stops the resource manager. + /// + /// If any thread is waiting via `.wait_until_in_range(...)`, the method will stop + /// being blocking and will return an error. + pub fn terminate(&self) { + self.lock().enabled = false; + self.inner.convdvar.notify_all(); + } + /// Blocks the current thread until the resource level reaches the given range, /// in a cpu-efficient way. /// /// This method does not necessarily wakes up the current thread at every transition /// into the targetted range, but any durable entry in the range will be detected. - pub fn wait_until_in_range>(&self, range: R) -> u64 { + pub fn wait_until_in_range>(&self, range: R) -> Result { let mut levels = self.lock(); - while !range.contains(&*levels) { - levels = self.inner.convdvar.wait(levels).unwrap(); + if !levels.enabled { + return Err(levels.count) } - *levels + while !range.contains(&levels.count) { + levels = self.inner.convdvar.wait(levels).unwrap(); + if !levels.enabled { + return Err(levels.count) + } + } + Ok(levels.count) } } @@ -149,7 +179,7 @@ mod tests { std::mem::drop(_allocation3); assert!(block_on(recv).is_ok()); }); - assert_eq!(memory.wait_until_in_range(5u64..8u64), 5u64); + assert_eq!(memory.wait_until_in_range(5u64..8u64), Ok(5u64)); assert!(send.send(()).is_ok()); } @@ -166,4 +196,18 @@ mod tests { assert_eq!(alloc.amount(), 14u64); assert_eq!(memory.total_amount(), 14u64 + 3u64) } + + #[test] + fn test_stop_resource_manager() { + let resource_manager = ResourceManager::default(); + let resource_manager_clone = resource_manager.clone(); + let (sender, recv) = oneshot::channel(); + let join_handle = thread::spawn(move || { + assert!(sender.send(()).is_ok()); + resource_manager_clone.wait_until_in_range(10..20) + }); + let _ = block_on(recv); + resource_manager.terminate(); + assert_eq!(join_handle.join().unwrap(), Err(0u64)); + } } diff --git a/src/indexer/segment_manager.rs b/src/indexer/segment_manager.rs index 0671b04eb..eb9fb97a1 100644 --- a/src/indexer/segment_manager.rs +++ b/src/indexer/segment_manager.rs @@ -2,8 +2,8 @@ use super::segment_register::SegmentRegister; use crate::core::SegmentId; use crate::core::SegmentMeta; use crate::error::TantivyError; -use crate::indexer::SegmentEntry; -use crate::Segment; +use crate::indexer::{SegmentEntry, MergeOperationInventory, MergeCandidate, MergeOperation}; +use crate::{Segment, Opstamp}; use std::collections::hash_set::HashSet; use std::fmt::{self, Debug, Formatter}; use std::sync::{Arc, RwLock}; @@ -56,6 +56,7 @@ impl SegmentRegisters { #[derive(Default)] pub struct SegmentManager { registers: Arc>, + merge_operations: MergeOperationInventory, } impl Debug for SegmentManager { @@ -69,24 +70,26 @@ impl Debug for SegmentManager { } } -pub fn get_mergeable_segments( - in_merge_segment_ids: &HashSet, - segment_manager: &SegmentManager, -) -> (Vec, Vec) { - let registers_lock = segment_manager.read(); - ( - registers_lock - .committed - .get_mergeable_segments(in_merge_segment_ids), - registers_lock - .uncommitted - .get_mergeable_segments(in_merge_segment_ids), - ) -} impl SegmentManager { pub(crate) fn new(registers: Arc>) -> SegmentManager { - SegmentManager { registers } + SegmentManager { + registers, + merge_operations: Default::default() + } + } + + pub fn new_merge_operation(&self, opstamp: Opstamp, merge_candidate: MergeCandidate) -> MergeOperation { + MergeOperation::new( + &self.merge_operations, + opstamp, + merge_candidate.0 + + ) + } + + pub fn wait_merging_thread(&self) { + self.merge_operations.wait_until_empty() } /// Returns all of the segment entries (committed or uncommitted) @@ -97,6 +100,19 @@ impl SegmentManager { segment_entries } + pub fn get_mergeable_segments(&self) -> (Vec, Vec) { + let in_merge_segment_ids: HashSet = self.merge_operations.segment_in_merge(); + let registers_lock = self.read(); + ( + registers_lock + .committed + .get_mergeable_segments(&in_merge_segment_ids), + registers_lock + .uncommitted + .get_mergeable_segments(&in_merge_segment_ids), + ) + } + // Lock poisoning should never happen : // The lock is acquired and released within this class, // and the operations cannot panic. diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index c42e06c53..0be1e8abd 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -1,4 +1,4 @@ -use super::segment_manager::{get_mergeable_segments, SegmentManager}; +use super::segment_manager::SegmentManager; use crate::core::Index; use crate::core::IndexMeta; use crate::core::Segment; @@ -8,7 +8,6 @@ use crate::core::SerializableSegment; use crate::core::META_FILEPATH; use crate::directory::{Directory, DirectoryClone, GarbageCollectionResult}; use crate::indexer::index_writer::advance_deletes; -use crate::indexer::merge_operation::MergeOperationInventory; use crate::indexer::merger::IndexMerger; use crate::indexer::segment_manager::{SegmentRegisters, SegmentsStatus}; use crate::indexer::stamper::Stamper; @@ -160,7 +159,6 @@ pub(crate) struct InnerSegmentUpdater { merge_policy: RwLock>>, killed: AtomicBool, stamper: Stamper, - merge_operations: MergeOperationInventory, } impl SegmentUpdater { @@ -198,7 +196,6 @@ impl SegmentUpdater { merge_policy: RwLock::new(Arc::new(Box::new(DefaultMergePolicy::default()))), killed: AtomicBool::new(false), stamper, - merge_operations: Default::default(), }))) } @@ -363,7 +360,7 @@ impl SegmentUpdater { pub(crate) fn make_merge_operation(&self, segment_ids: &[SegmentId]) -> MergeOperation { let commit_opstamp = self.load_metas().opstamp; - MergeOperation::new(&self.merge_operations, commit_opstamp, segment_ids.to_vec()) + self.segment_manager.new_merge_operation(commit_opstamp, MergeCandidate(segment_ids.to_vec())) } // Starts a merge operation. This function will block until the merge operation is effectively @@ -437,9 +434,8 @@ impl SegmentUpdater { } async fn consider_merge_options(&self) { - let merge_segment_ids: HashSet = self.merge_operations.segment_in_merge(); let (committed_segments, uncommitted_segments) = - get_mergeable_segments(&merge_segment_ids, &self.segment_manager); + self.segment_manager.get_mergeable_segments(); // Committed segments cannot be merged with uncommitted_segments. // We therefore consider merges using these two sets of segments independently. @@ -450,7 +446,7 @@ impl SegmentUpdater { .compute_merge_candidates(&uncommitted_segments) .into_iter() .map(|merge_candidate| { - MergeOperation::new(&self.merge_operations, current_opstamp, merge_candidate.0) + self.segment_manager.new_merge_operation(current_opstamp, merge_candidate) }) .collect(); @@ -459,7 +455,7 @@ impl SegmentUpdater { .compute_merge_candidates(&committed_segments) .into_iter() .map(|merge_candidate: MergeCandidate| { - MergeOperation::new(&self.merge_operations, commit_opstamp, merge_candidate.0) + self.segment_manager.new_merge_operation(commit_opstamp, merge_candidate) }) .collect::>(); merge_candidates.extend(committed_merge_candidates.into_iter()); @@ -539,9 +535,8 @@ impl SegmentUpdater { /// /// Obsolete files will eventually be cleaned up /// by the directory garbage collector. - pub fn wait_merging_thread(&self) -> crate::Result<()> { - self.merge_operations.wait_until_empty(); - Ok(()) + pub fn wait_merging_thread(&self) { + self.segment_manager.wait_merging_thread() } }