moving merge inventory to the segment_manager

This commit is contained in:
Paul Masurel
2020-03-25 10:05:01 +09:00
parent 58d40ebf95
commit e5bf41c1f6
13 changed files with 163 additions and 55 deletions

View File

@@ -190,7 +190,7 @@ mod test {
use crate::schema::Field;
use std::io::Write;
use std::path::Path;
use crate::indexer::ResourceManager;
#[test]
fn test_composite_file() {

View File

@@ -11,7 +11,7 @@ use crate::Opstamp;
use std::fmt;
use std::ops::{Deref, DerefMut};
use std::path::PathBuf;
use crate::indexer::{Allocation, ResourceManager};
use crate::indexer::{ResourceManager};
#[derive(Clone)]
pub(crate) enum SegmentDirectory {

View File

@@ -15,6 +15,8 @@ mod ram_directory;
mod read_only_source;
mod spilling_writer;
mod watch_event_router;
mod persistor;
/// Errors specific to the directory module.
pub mod error;

View File

@@ -0,0 +1,42 @@
use crate::indexer::{SegmentManager, ResourceManager, MergeOperationInventory};
use std::thread::JoinHandle;
use crate::{IndexWriterConfig, SegmentId};
use std::collections::HashSet;
pub(crate) struct Persistor {
segment_manager: SegmentManager,
memory_manager: ResourceManager,
thread_handle: JoinHandle<()>,
}
impl Persistor {
pub(crate) fn create_and_start(segment_manager: SegmentManager,
memory_manager: ResourceManager,
merge_operations: MergeOperationInventory,
config: IndexWriterConfig) -> crate::Result<Persistor> {
let memory_manager_clone = memory_manager.clone();
let thread_handle = std::thread::Builder::new()
.name("persistor-thread".to_string())
.spawn(move || {
while let Ok(_) = memory_manager_clone.wait_until_in_range(config.persist_low..) {
let merge_segment_ids: HashSet<SegmentId> = merge_operations.segment_in_merge();
}
}).map_err(|_err| crate::TantivyError::ErrorInThread("Failed to start persistor thread.".to_string()))?;
Ok(Persistor {
segment_manager,
memory_manager,
thread_handle
})
}
/// Stop the persisting thread.
///
/// The memory manager will be terminated, which will unlock the thread from any waiting
/// position.
/// This method blocks for a short amount of tim until the persistor thread has terminated.
pub fn stop(self) {
self.memory_manager.terminate();
let _ = self.thread_handle.join();
}
}

View File

@@ -114,8 +114,8 @@ impl InnerDirectory {
self.watch_router.subscribe(watch_handle)
}
fn total_mem_usage(&self) -> usize {
self.fs.values().map(|f| f.len()).sum()
fn total_mem_usage(&self) -> u64 {
self.fs.values().map(|source| source.len() as u64).sum()
}
}
@@ -137,6 +137,10 @@ pub struct RAMDirectory {
impl RAMDirectory {
/// Creates a new RAMDirectory.
///
/// Check `.create_with_memory_manager(..)` if you want to associate an external memory
/// manager to your RAMDirectory.
pub fn create() -> RAMDirectory {
RAMDirectory::default()
}
@@ -155,7 +159,7 @@ impl RAMDirectory {
/// Returns the sum of the size of the different files
/// in the RAMDirectory.
pub fn total_mem_usage(&self) -> usize {
pub fn total_mem_usage(&self) -> u64 {
self.fs.read().unwrap().total_mem_usage()
}

View File

@@ -89,7 +89,7 @@ mod tests {
fn test_delete_bitset_helper(bitset: &BitSet, max_doc: u32) {
let test_path = PathBuf::from("test");
let mut directory = RAMDirectory::create();
let mut directory = RAMDirectory::default();
{
let mut writer = directory.open_write(&*test_path).unwrap();
write_delete_bitset(bitset, max_doc, &mut writer).unwrap();

View File

@@ -371,16 +371,11 @@ impl IndexWriter {
})?;
}
let result = self
self
.segment_updater
.wait_merging_thread()
.map_err(|_| TantivyError::ErrorInThread("Failed to join merging thread.".into()));
.wait_merging_thread();
if let Err(ref e) = result {
error!("Some merging thread failed {:?}", e);
}
result
Ok(())
}
/// Creates a new segment.

View File

@@ -13,6 +13,9 @@ pub struct IndexWriterConfig {
pub max_indexing_threads: usize,
pub max_merging_threads: usize,
pub memory_budget: u64,
pub store_flush_num_bytes: u64,
pub persist_low: u64,
pub persist_high: u64,
}
impl Default for IndexWriterConfig {
@@ -21,6 +24,9 @@ impl Default for IndexWriterConfig {
max_indexing_threads: 1,
max_merging_threads: 3,
memory_budget: 50_000_000u64,
store_flush_num_bytes: 10_000_000u64,
persist_low: 10_000_000u64,
persist_high: 50_000_000u64
}
}
}
@@ -32,6 +38,9 @@ impl IndexWriterConfig {
max_indexing_threads: 1,
max_merging_threads: 5,
memory_budget: 4_000_000u64,
store_flush_num_bytes: 500_000u64,
persist_low: 2_000_000u64,
persist_high: 3_000_000u64,
}
}

View File

@@ -6,7 +6,7 @@ use std::collections::HashSet;
use std::fmt;
use std::ops::Deref;
#[derive(Default)]
#[derive(Default, Clone)]
pub(crate) struct MergeOperationInventory {
inventory: Inventory<InnerMergeOperation>,
num_merge_watcher: ResourceManager,
@@ -32,7 +32,7 @@ impl MergeOperationInventory {
}
pub fn wait_until_empty(&self) {
self.num_merge_watcher.wait_until_in_range(0..1);
let _ = self.num_merge_watcher.wait_until_in_range(0..1);
}
}

View File

@@ -19,6 +19,7 @@ mod segment_writer;
mod stamper;
pub(crate) use self::resource_manager::{Allocation, ResourceManager};
pub(crate) use self::merge_operation::MergeOperationInventory;
pub use self::index_writer::IndexWriter;
pub use self::index_writer_config::IndexWriterConfig;
pub use self::log_merge_policy::LogMergePolicy;

View File

@@ -1,12 +1,27 @@
use std::ops::RangeBounds;
use std::sync::{Arc, Condvar, Mutex, MutexGuard, RwLock};
struct LockedData {
count: u64,
enabled: bool
}
impl Default for LockedData {
fn default() -> Self {
LockedData {
count: 0u64,
enabled: true
}
}
}
#[derive(Default)]
struct Inner {
resource_level: Mutex<u64>,
resource_level: Mutex<LockedData>,
convdvar: Condvar,
}
/// The resource manager makes it possible to track the amount of level of a given resource.
/// There is no magic here : it is to the description of the user to declare how much
/// of the resource is being held.
@@ -29,10 +44,10 @@ pub struct ResourceManager {
impl ResourceManager {
/// Return the total amount of reousrce allocated
pub fn total_amount(&self) -> u64 {
*self.lock()
self.lock().count
}
fn lock(&self) -> MutexGuard<u64> {
fn lock(&self) -> MutexGuard<LockedData> {
self.inner
.resource_level
.lock()
@@ -44,8 +59,8 @@ impl ResourceManager {
return;
}
let mut lock = self.lock();
let new_val = (*lock as i64) + delta;
*lock = new_val as u64;
let new_val = lock.count as i64 + delta;
lock.count = new_val as u64;
self.inner.convdvar.notify_all();
}
@@ -61,17 +76,32 @@ impl ResourceManager {
}
}
/// Stops the resource manager.
///
/// If any thread is waiting via `.wait_until_in_range(...)`, the method will stop
/// being blocking and will return an error.
pub fn terminate(&self) {
self.lock().enabled = false;
self.inner.convdvar.notify_all();
}
/// Blocks the current thread until the resource level reaches the given range,
/// in a cpu-efficient way.
///
/// This method does not necessarily wakes up the current thread at every transition
/// into the targetted range, but any durable entry in the range will be detected.
pub fn wait_until_in_range<R: RangeBounds<u64>>(&self, range: R) -> u64 {
pub fn wait_until_in_range<R: RangeBounds<u64>>(&self, range: R) -> Result<u64, u64> {
let mut levels = self.lock();
while !range.contains(&*levels) {
levels = self.inner.convdvar.wait(levels).unwrap();
if !levels.enabled {
return Err(levels.count)
}
*levels
while !range.contains(&levels.count) {
levels = self.inner.convdvar.wait(levels).unwrap();
if !levels.enabled {
return Err(levels.count)
}
}
Ok(levels.count)
}
}
@@ -149,7 +179,7 @@ mod tests {
std::mem::drop(_allocation3);
assert!(block_on(recv).is_ok());
});
assert_eq!(memory.wait_until_in_range(5u64..8u64), 5u64);
assert_eq!(memory.wait_until_in_range(5u64..8u64), Ok(5u64));
assert!(send.send(()).is_ok());
}
@@ -166,4 +196,18 @@ mod tests {
assert_eq!(alloc.amount(), 14u64);
assert_eq!(memory.total_amount(), 14u64 + 3u64)
}
#[test]
fn test_stop_resource_manager() {
let resource_manager = ResourceManager::default();
let resource_manager_clone = resource_manager.clone();
let (sender, recv) = oneshot::channel();
let join_handle = thread::spawn(move || {
assert!(sender.send(()).is_ok());
resource_manager_clone.wait_until_in_range(10..20)
});
let _ = block_on(recv);
resource_manager.terminate();
assert_eq!(join_handle.join().unwrap(), Err(0u64));
}
}

View File

@@ -2,8 +2,8 @@ use super::segment_register::SegmentRegister;
use crate::core::SegmentId;
use crate::core::SegmentMeta;
use crate::error::TantivyError;
use crate::indexer::SegmentEntry;
use crate::Segment;
use crate::indexer::{SegmentEntry, MergeOperationInventory, MergeCandidate, MergeOperation};
use crate::{Segment, Opstamp};
use std::collections::hash_set::HashSet;
use std::fmt::{self, Debug, Formatter};
use std::sync::{Arc, RwLock};
@@ -56,6 +56,7 @@ impl SegmentRegisters {
#[derive(Default)]
pub struct SegmentManager {
registers: Arc<RwLock<SegmentRegisters>>,
merge_operations: MergeOperationInventory,
}
impl Debug for SegmentManager {
@@ -69,24 +70,26 @@ impl Debug for SegmentManager {
}
}
pub fn get_mergeable_segments(
in_merge_segment_ids: &HashSet<SegmentId>,
segment_manager: &SegmentManager,
) -> (Vec<SegmentMeta>, Vec<SegmentMeta>) {
let registers_lock = segment_manager.read();
(
registers_lock
.committed
.get_mergeable_segments(in_merge_segment_ids),
registers_lock
.uncommitted
.get_mergeable_segments(in_merge_segment_ids),
)
}
impl SegmentManager {
pub(crate) fn new(registers: Arc<RwLock<SegmentRegisters>>) -> SegmentManager {
SegmentManager { registers }
SegmentManager {
registers,
merge_operations: Default::default()
}
}
pub fn new_merge_operation(&self, opstamp: Opstamp, merge_candidate: MergeCandidate) -> MergeOperation {
MergeOperation::new(
&self.merge_operations,
opstamp,
merge_candidate.0
)
}
pub fn wait_merging_thread(&self) {
self.merge_operations.wait_until_empty()
}
/// Returns all of the segment entries (committed or uncommitted)
@@ -97,6 +100,19 @@ impl SegmentManager {
segment_entries
}
pub fn get_mergeable_segments(&self) -> (Vec<SegmentMeta>, Vec<SegmentMeta>) {
let in_merge_segment_ids: HashSet<SegmentId> = self.merge_operations.segment_in_merge();
let registers_lock = self.read();
(
registers_lock
.committed
.get_mergeable_segments(&in_merge_segment_ids),
registers_lock
.uncommitted
.get_mergeable_segments(&in_merge_segment_ids),
)
}
// Lock poisoning should never happen :
// The lock is acquired and released within this class,
// and the operations cannot panic.

View File

@@ -1,4 +1,4 @@
use super::segment_manager::{get_mergeable_segments, SegmentManager};
use super::segment_manager::SegmentManager;
use crate::core::Index;
use crate::core::IndexMeta;
use crate::core::Segment;
@@ -8,7 +8,6 @@ use crate::core::SerializableSegment;
use crate::core::META_FILEPATH;
use crate::directory::{Directory, DirectoryClone, GarbageCollectionResult};
use crate::indexer::index_writer::advance_deletes;
use crate::indexer::merge_operation::MergeOperationInventory;
use crate::indexer::merger::IndexMerger;
use crate::indexer::segment_manager::{SegmentRegisters, SegmentsStatus};
use crate::indexer::stamper::Stamper;
@@ -160,7 +159,6 @@ pub(crate) struct InnerSegmentUpdater {
merge_policy: RwLock<Arc<Box<dyn MergePolicy>>>,
killed: AtomicBool,
stamper: Stamper,
merge_operations: MergeOperationInventory,
}
impl SegmentUpdater {
@@ -198,7 +196,6 @@ impl SegmentUpdater {
merge_policy: RwLock::new(Arc::new(Box::new(DefaultMergePolicy::default()))),
killed: AtomicBool::new(false),
stamper,
merge_operations: Default::default(),
})))
}
@@ -363,7 +360,7 @@ impl SegmentUpdater {
pub(crate) fn make_merge_operation(&self, segment_ids: &[SegmentId]) -> MergeOperation {
let commit_opstamp = self.load_metas().opstamp;
MergeOperation::new(&self.merge_operations, commit_opstamp, segment_ids.to_vec())
self.segment_manager.new_merge_operation(commit_opstamp, MergeCandidate(segment_ids.to_vec()))
}
// Starts a merge operation. This function will block until the merge operation is effectively
@@ -437,9 +434,8 @@ impl SegmentUpdater {
}
async fn consider_merge_options(&self) {
let merge_segment_ids: HashSet<SegmentId> = self.merge_operations.segment_in_merge();
let (committed_segments, uncommitted_segments) =
get_mergeable_segments(&merge_segment_ids, &self.segment_manager);
self.segment_manager.get_mergeable_segments();
// Committed segments cannot be merged with uncommitted_segments.
// We therefore consider merges using these two sets of segments independently.
@@ -450,7 +446,7 @@ impl SegmentUpdater {
.compute_merge_candidates(&uncommitted_segments)
.into_iter()
.map(|merge_candidate| {
MergeOperation::new(&self.merge_operations, current_opstamp, merge_candidate.0)
self.segment_manager.new_merge_operation(current_opstamp, merge_candidate)
})
.collect();
@@ -459,7 +455,7 @@ impl SegmentUpdater {
.compute_merge_candidates(&committed_segments)
.into_iter()
.map(|merge_candidate: MergeCandidate| {
MergeOperation::new(&self.merge_operations, commit_opstamp, merge_candidate.0)
self.segment_manager.new_merge_operation(commit_opstamp, merge_candidate)
})
.collect::<Vec<_>>();
merge_candidates.extend(committed_merge_candidates.into_iter());
@@ -539,9 +535,8 @@ impl SegmentUpdater {
///
/// Obsolete files will eventually be cleaned up
/// by the directory garbage collector.
pub fn wait_merging_thread(&self) -> crate::Result<()> {
self.merge_operations.wait_until_empty();
Ok(())
pub fn wait_merging_thread(&self) {
self.segment_manager.wait_merging_thread()
}
}