From dff022b30aff6bcd4df7e908f6fa2f86e551204b Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Mon, 28 Nov 2016 00:13:22 +0900 Subject: [PATCH] NOBUG Added setting merge policy. --- src/indexer/index_writer.rs | 35 +++++++++++++++++++++++++++++++-- src/indexer/log_merge_policy.rs | 11 +++++++++-- src/indexer/merge_policy.rs | 22 +++++++++++++++++++-- src/indexer/mod.rs | 2 ++ src/indexer/segment_updater.rs | 18 +++++++++-------- src/lib.rs | 10 ++++++++++ 6 files changed, 84 insertions(+), 14 deletions(-) diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index c9e64c9d2..a7aaaf9b0 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -5,6 +5,7 @@ use core::SerializableSegment; use core::Index; use core::Segment; use std::thread::JoinHandle; +use indexer::{MergePolicy, DefaultMergePolicy}; use indexer::SegmentWriter; use super::directory_lock::DirectoryLock; use std::clone::Clone; @@ -15,6 +16,7 @@ use indexer::merger::IndexMerger; use core::SegmentId; use datastruct::stacker::Heap; use std::mem::swap; +use std::sync::{Arc, Mutex}; use chan; use core::SegmentMeta; use super::segment_updater::{SegmentUpdater, SegmentUpdate, SegmentUpdateSender}; @@ -53,6 +55,8 @@ pub struct IndexWriter { // lifetime of the lock with that of the IndexWriter. 
_directory_lock: DirectoryLock, + _merge_policy: Arc>>, + index: Index, heap_size_in_bytes_per_thread: usize, @@ -204,12 +208,16 @@ impl IndexWriter { let (document_sender, document_receiver): (DocumentSender, DocumentReceiver) = chan::sync(PIPELINE_MAX_SIZE_IN_DOCS); - let (segment_update_sender, segment_update_thread) = SegmentUpdater::start_updater(index.clone()); + let merge_policy: Arc>> = Arc::new(Mutex::new(box DefaultMergePolicy::default())); + + let (segment_update_sender, segment_update_thread) = SegmentUpdater::start_updater(index.clone(), merge_policy.clone()); let mut index_writer = IndexWriter { _directory_lock: directory_lock, + _merge_policy: merge_policy, + heap_size_in_bytes_per_thread: heap_size_in_bytes_per_thread, index: index.clone(), @@ -229,7 +237,18 @@ impl IndexWriter { try!(index_writer.start_workers()); Ok(index_writer) } - + + + /// Returns a clone of the index_writer merge policy. + pub fn get_merge_policy(&self) -> Box { + self._merge_policy.lock().unwrap().box_clone() + } + + /// Set the merge policy. 
+ pub fn set_merge_policy(&self, merge_policy: Box) { + *self._merge_policy.lock().unwrap() = merge_policy; + } + fn start_workers(&mut self) -> Result<()> { for _ in 0..self.num_threads { try!(self.add_indexing_worker()); @@ -445,6 +464,7 @@ mod tests { use Index; use Term; use Error; + use indexer::NoMergePolicy; #[test] fn test_lockfile_stops_duplicates() { @@ -456,6 +476,17 @@ mod tests { _ => panic!("Expected FileAlreadyExists error"), } } + + #[test] + fn test_set_merge_policy() { + let schema_builder = schema::SchemaBuilder::default(); + let index = Index::create_in_ram(schema_builder.build()); + let index_writer = index.writer(40_000_000).unwrap(); + assert_eq!(format!("{:?}", index_writer.get_merge_policy()), "LogMergePolicy { min_merge_size: 8, min_layer_size: 10000, level_log_size: 0.75 }"); + let merge_policy = box NoMergePolicy::default(); + index_writer.set_merge_policy(merge_policy); + assert_eq!(format!("{:?}", index_writer.get_merge_policy()), "NoMergePolicy"); + } #[test] fn test_lockfile_released_on_drop() { diff --git a/src/indexer/log_merge_policy.rs b/src/indexer/log_merge_policy.rs index 1a1b62dbd..5ca049e4c 100644 --- a/src/indexer/log_merge_policy.rs +++ b/src/indexer/log_merge_policy.rs @@ -8,6 +8,10 @@ const DEFAULT_LEVEL_LOG_SIZE: f64 = 0.75; const DEFAULT_MIN_LAYER_SIZE: u32 = 10_000; const DEFAULT_MIN_MERGE_SIZE: usize = 8; + +/// LogMergePolicy tries to merge segments that have a similar number of +/// documents. +#[derive(Debug, Clone)] pub struct LogMergePolicy { min_merge_size: usize, min_layer_size: u32, @@ -20,7 +24,7 @@ impl LogMergePolicy { } /// Set the minimum number of segment that may be merge together. - pub fn set_min_merge_size(&mut self, min_merge_size: usize) { + pub fn set_min_merge_size(&mut self, min_merge_size: usize) { self.min_merge_size = min_merge_size; } @@ -30,7 +34,6 @@ impl LogMergePolicy { self.min_layer_size = min_layer_size; } - /// Set the ratio between two consecutive levels. 
/// /// Segment are group in levels according to their sizes. @@ -83,6 +86,10 @@ impl MergePolicy for LogMergePolicy { result } + + fn box_clone(&self) -> Box { + box self.clone() + } } impl Default for LogMergePolicy { diff --git a/src/indexer/merge_policy.rs b/src/indexer/merge_policy.rs index cbf55ec34..22a767042 100644 --- a/src/indexer/merge_policy.rs +++ b/src/indexer/merge_policy.rs @@ -1,15 +1,30 @@ use core::SegmentId; use core::SegmentMeta; use std::marker; +use std::fmt::Debug; +/// Set of segments suggested for a merge. #[derive(Debug, Clone)] pub struct MergeCandidate(pub Vec); -pub trait MergePolicy: marker::Send { + +/// The Merge policy defines which segments should be merged. +/// +/// Every time the list of segments changes, the segment updater +/// asks the merge policy if some segments should be merged. +pub trait MergePolicy: marker::Send + Debug { + /// Given the list of segment metas, returns the list of merge candidates. + /// + /// This call happens on the segment updater thread, and will block + /// other segment updates, so all implementations should happen rapidly. fn compute_merge_candidates(&self, segments: &[SegmentMeta]) -> Vec; + /// Returns a boxed clone of the MergePolicy. + fn box_clone(&self) -> Box; } +/// Never merge segments. +#[derive(Debug)] pub struct NoMergePolicy; impl Default for NoMergePolicy { @@ -18,10 +33,13 @@ impl Default for NoMergePolicy { } } - impl MergePolicy for NoMergePolicy { fn compute_merge_candidates(&self, _segments: &[SegmentMeta]) -> Vec { Vec::new() } + + fn box_clone(&self) -> Box { + box NoMergePolicy + } } diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index 804e4d8a5..c71c07fb4 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -17,4 +17,6 @@ pub use self::log_merge_policy::LogMergePolicy; pub use self::merge_policy::{NoMergePolicy, MergeCandidate, MergePolicy}; pub use self::segment_manager::SegmentManager; + +/// Alias for the default merge policy, which is the LogMergePolicy. 
pub type DefaultMergePolicy = LogMergePolicy; diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index 89b66f5e7..2a1b3e577 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -2,12 +2,13 @@ use chan; use core::Index; +use std::sync::Mutex; use core::Segment; use core::SegmentId; use core::SegmentMeta; use std::mem; use core::SerializableSegment; -use indexer::{DefaultMergePolicy, MergePolicy}; +use indexer::MergePolicy; use indexer::MergeCandidate; use indexer::merger::IndexMerger; use indexer::SegmentSerializer; @@ -135,7 +136,7 @@ pub struct SegmentUpdater { segment_update_receiver: SegmentUpdateReceiver, segment_update_sender: SegmentUpdateSender, segment_manager_arc: Arc, - merge_policy: Box, + merge_policy: Arc>>, merging_thread_id: usize, merging_threads: HashMap, SegmentMeta)> >, } @@ -143,12 +144,12 @@ pub struct SegmentUpdater { impl SegmentUpdater { - pub fn start_updater(index: Index) -> (SegmentUpdateSender, JoinHandle<()>) { - let segment_updater = SegmentUpdater::new(index); + pub fn start_updater(index: Index, merge_policy: Arc>>) -> (SegmentUpdateSender, JoinHandle<()>) { + let segment_updater = SegmentUpdater::new(index, merge_policy); (segment_updater.segment_update_sender.clone(), segment_updater.start()) } - fn new(index: Index) -> SegmentUpdater { + fn new(index: Index, merge_policy: Arc>>) -> SegmentUpdater { let segment_manager_arc = get_segment_manager(&index); let (segment_update_sender, segment_update_receiver): (SegmentUpdateSender, SegmentUpdateReceiver) = chan::async(); SegmentUpdater { @@ -157,7 +158,7 @@ impl SegmentUpdater { segment_update_sender: segment_update_sender, segment_update_receiver: segment_update_receiver, segment_manager_arc: segment_manager_arc, - merge_policy: Box::new(DefaultMergePolicy::default()), // TODO make that configurable + merge_policy: merge_policy, merging_thread_id: 0, merging_threads: HashMap::new(), } @@ -236,8 +237,9 @@ impl SegmentUpdater { 
let (committed_segments, uncommitted_segments) = get_segment_ready_for_commit(segment_manager); // Committed segments cannot be merged with uncommitted_segments. // We therefore consider merges using these two sets of segments independantly. - let mut merge_candidates = self.merge_policy.compute_merge_candidates(&uncommitted_segments); - let committed_merge_candidates = self.merge_policy.compute_merge_candidates(&committed_segments); + let merge_policy_lock = self.merge_policy.lock().unwrap(); + let mut merge_candidates = merge_policy_lock.compute_merge_candidates(&uncommitted_segments); + let committed_merge_candidates = merge_policy_lock.compute_merge_candidates(&committed_segments); merge_candidates.extend_from_slice(&committed_merge_candidates[..]); merge_candidates } diff --git a/src/lib.rs b/src/lib.rs index 24dbff083..8f6c78902 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -112,6 +112,16 @@ pub use postings::Postings; pub use postings::SegmentPostingsOption; + +/// Tantivy makes it possible to personalize when +/// the indexer should merge its segments. +pub mod merge_policy { + pub use indexer::MergePolicy; + pub use indexer::LogMergePolicy; + pub use indexer::NoMergePolicy; + pub use indexer::DefaultMergePolicy; +} + /// u32 identifying a document within a segment. /// Documents have their doc id assigned incrementally, /// as they are added in the segment.