Compare commits

...

1 Commits

Author SHA1 Message Date
Paul Masurel
52c24e1c06 first stab 2019-11-01 09:05:35 +09:00
4 changed files with 16 additions and 1 deletions

View File

@@ -732,7 +732,7 @@ impl IndexWriter {
}
UserOperation::Add(document) => {
let add_operation = AddOperation { opstamp, document };
adds.push(add_operation);
adds.push(add_operpation);
}
}
}

View File

@@ -7,6 +7,10 @@ use std::collections::HashSet;
pub struct MergeOperationInventory(Inventory<InnerMergeOperation>);
impl MergeOperationInventory {
pub fn num_merge_operations(&self) -> usize {
self.0.list().len()
}
pub fn segment_in_merge(&self) -> HashSet<SegmentId> {
let mut segment_in_merge = HashSet::default();
for merge_op in self.0.list() {

View File

@@ -12,6 +12,10 @@ pub struct MergeCandidate(pub Vec<SegmentId>);
/// Every time a the list of segments changes, the segment updater
/// asks the merge policy if some segments should be merged.
pub trait MergePolicy: marker::Send + marker::Sync + Debug {
fn maximum_num_threads(&self) -> Option<usize> {
None
}
/// Given the list of segment metas, returns the list of merge candidates.
///
/// This call happens on the segment updater thread, and will block

View File

@@ -39,6 +39,7 @@ use std::sync::Arc;
use std::sync::RwLock;
use std::thread;
use std::thread::JoinHandle;
use std::time::Duration;
/// Save the index meta file.
/// This operation is atomic :
@@ -200,6 +201,12 @@ impl SegmentUpdater {
}
pub fn add_segment(&self, segment_entry: SegmentEntry) -> bool {
let max_num_threads_opt = self.0.merge_policy.read().unwrap().maximum_num_threads();
if let Some(max_num_threads) = max_num_threads_opt {
while self.0.merge_operations.num_merge_operations() >= max_num_threads_opt {
std::thread::sleep(Duration::from_secs(1u64));
}
}
self.run_async(|segment_updater| {
segment_updater.0.segment_manager.add_segment(segment_entry);
segment_updater.consider_merge_options();