From bf94fd77db8068b8257ae3bb2e10b3aa5ff4b601 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Sun, 27 Jan 2019 12:18:59 +0900 Subject: [PATCH] Issue/471 (#481) * Closes 471 Removing writing_segments in the segment manager as it is now useless. Removing the target merged segment id as it is useless as well. * RAII for tracking which segment is in merge. Closes #471 * fmt * Using Inventory::default(). --- Cargo.toml | 2 +- src/directory/managed_directory.rs | 3 + src/indexer/index_writer.rs | 17 ++- src/indexer/merge_operation.rs | 64 +++++++++++ src/indexer/merge_policy.rs | 17 +-- src/indexer/mod.rs | 5 +- src/indexer/segment_entry.rs | 50 +-------- src/indexer/segment_manager.rs | 72 ++---------- src/indexer/segment_register.rs | 70 ++---------- src/indexer/segment_updater.rs | 172 +++++++++++++---------------- src/tokenizer/lower_caser.rs | 1 - 11 files changed, 182 insertions(+), 291 deletions(-) create mode 100644 src/indexer/merge_operation.rs diff --git a/Cargo.toml b/Cargo.toml index 11c2af204..98a53b462 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,7 +42,7 @@ rust-stemmers = "1" downcast = { version="0.9" } matches = "0.1" bitpacking = "0.5" -census = "0.1" +census = "0.2" fnv = "1.0.6" owned-read = "0.4" failure = "0.1" diff --git a/src/directory/managed_directory.rs b/src/directory/managed_directory.rs index a32e4bed8..19e76556e 100644 --- a/src/directory/managed_directory.rs +++ b/src/directory/managed_directory.rs @@ -92,6 +92,9 @@ impl ManagedDirectory { /// /// * `living_files` - List of files that are still used by the index. /// + /// The use of a callback ensures that the list of living_files is computed + /// while we hold the lock on meta. + /// + /// This method does not panick nor returns errors. 
/// If a file cannot be deleted (for permission reasons for instance) /// an error is simply logged, and the file remains in the list of managed diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 125e56a9f..e07bce772 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -25,6 +25,7 @@ use schema::Document; use schema::IndexRecordOption; use schema::Term; use std::mem; +use std::sync::Arc; use std::thread; use std::thread::JoinHandle; use Result; @@ -366,13 +367,16 @@ impl IndexWriter { .add_segment(self.generation, segment_entry); } - /// *Experimental & Advanced API* Creates a new segment. - /// and marks it as currently in write. + /// Creates a new segment. /// /// This method is useful only for users trying to do complex /// operations, like converting an index format to another. + /// + /// It is safe to start writing files associated with the new `Segment`. + /// These will not be garbage collected as long as an instance of the + /// `SegmentMeta` object associated with the new `Segment` is "alive". pub fn new_segment(&self) -> Segment { - self.segment_updater.new_segment() + self.index.new_segment() } /// Spawns a new worker thread for indexing. @@ -387,6 +391,7 @@ impl IndexWriter { let mut delete_cursor = self.delete_queue.cursor(); let mem_budget = self.heap_size_in_bytes_per_thread; + let index = self.index.clone(); let join_handle: JoinHandle> = thread::Builder::new() .name(format!( "thrd-tantivy-index{}-gen{}", @@ -412,7 +417,7 @@ impl IndexWriter { // was dropped. return Ok(()); } - let segment = segment_updater.new_segment(); + let segment = index.new_segment(); index_documents( mem_budget, &segment, @@ -429,7 +434,7 @@ impl IndexWriter { } /// Accessor to the merge policy. 
- pub fn get_merge_policy(&self) -> Box { + pub fn get_merge_policy(&self) -> Arc> { self.segment_updater.get_merge_policy() } @@ -733,7 +738,7 @@ mod tests { index_writer.add_document(doc!(text_field=>"b")); index_writer.add_document(doc!(text_field=>"c")); } - assert_eq!(index_writer.commit().unwrap(), 3u64); + assert!(index_writer.commit().is_ok()); index.load_searchers().unwrap(); assert_eq!(num_docs_containing("a"), 0); assert_eq!(num_docs_containing("b"), 1); diff --git a/src/indexer/merge_operation.rs b/src/indexer/merge_operation.rs new file mode 100644 index 000000000..9d7bcbca6 --- /dev/null +++ b/src/indexer/merge_operation.rs @@ -0,0 +1,64 @@ +use census::{Inventory, TrackedObject}; +use std::collections::HashSet; +use SegmentId; + +#[derive(Default)] +pub struct MergeOperationInventory(Inventory); + +impl MergeOperationInventory { + pub fn segment_in_merge(&self) -> HashSet { + let mut segment_in_merge = HashSet::default(); + for merge_op in self.0.list() { + for &segment_id in &merge_op.segment_ids { + segment_in_merge.insert(segment_id); + } + } + segment_in_merge + } +} + +/// A `MergeOperation` has two roles. +/// It carries all of the information required to describe a merge: +/// - `target_opstamp` is the opstamp up to which we want to consume the +/// delete queue and reflect their deletes. +/// - `segment_ids` is the list of segments to be merged. +/// +/// The second role is to keep track of the fact that these +/// segments are in merge and avoid starting a merge operation that +/// may conflict with this one. +/// +/// This works by tracking merge operations. When considering computing +/// merge candidates, we simply list tracked merge operations and remove +/// their segments from possible merge candidates. 
+pub struct MergeOperation { + inner: TrackedObject, +} + +struct InnerMergeOperation { + target_opstamp: u64, + segment_ids: Vec, +} + +impl MergeOperation { + pub fn new( + inventory: &MergeOperationInventory, + target_opstamp: u64, + segment_ids: Vec, + ) -> MergeOperation { + let inner_merge_operation = InnerMergeOperation { + target_opstamp, + segment_ids, + }; + MergeOperation { + inner: inventory.0.track(inner_merge_operation), + } + } + + pub fn target_opstamp(&self) -> u64 { + self.inner.target_opstamp + } + + pub fn segment_ids(&self) -> &[SegmentId] { + &self.inner.segment_ids[..] + } +} diff --git a/src/indexer/merge_policy.rs b/src/indexer/merge_policy.rs index 407cb94bb..8dac58c9f 100644 --- a/src/indexer/merge_policy.rs +++ b/src/indexer/merge_policy.rs @@ -11,7 +11,7 @@ pub struct MergeCandidate(pub Vec); /// /// Every time a the list of segments changes, the segment updater /// asks the merge policy if some segments should be merged. -pub trait MergePolicy: MergePolicyClone + marker::Send + marker::Sync + Debug { +pub trait MergePolicy: marker::Send + marker::Sync + Debug { /// Given the list of segment metas, returns the list of merge candidates. /// /// This call happens on the segment updater thread, and will block @@ -19,21 +19,6 @@ pub trait MergePolicy: MergePolicyClone + marker::Send + marker::Sync + Debug { fn compute_merge_candidates(&self, segments: &[SegmentMeta]) -> Vec; } -/// MergePolicyClone -pub trait MergePolicyClone { - /// Returns a boxed clone of the MergePolicy. - fn box_clone(&self) -> Box; -} - -impl MergePolicyClone for T -where - T: 'static + MergePolicy + Clone, -{ - fn box_clone(&self) -> Box { - Box::new(self.clone()) - } -} - /// Never merge segments. 
#[derive(Debug, Clone)] pub struct NoMergePolicy; diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index 3d29b38c0..2669d0df6 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -3,6 +3,7 @@ mod directory_lock; mod doc_opstamp_mapping; pub mod index_writer; mod log_merge_policy; +mod merge_operation; pub mod merge_policy; pub mod merger; pub mod operation; @@ -17,12 +18,12 @@ mod stamper; pub(crate) use self::directory_lock::DirectoryLock; pub use self::directory_lock::LockType; - pub use self::index_writer::IndexWriter; pub use self::log_merge_policy::LogMergePolicy; +pub use self::merge_operation::{MergeOperation, MergeOperationInventory}; pub use self::merge_policy::{MergeCandidate, MergePolicy, NoMergePolicy}; pub use self::prepared_commit::PreparedCommit; -pub use self::segment_entry::{SegmentEntry, SegmentState}; +pub use self::segment_entry::SegmentEntry; pub use self::segment_manager::SegmentManager; pub use self::segment_serializer::SegmentSerializer; pub use self::segment_writer::SegmentWriter; diff --git a/src/indexer/segment_entry.rs b/src/indexer/segment_entry.rs index 7e23940d5..34bbaf8c2 100644 --- a/src/indexer/segment_entry.rs +++ b/src/indexer/segment_entry.rs @@ -4,21 +4,6 @@ use core::SegmentMeta; use indexer::delete_queue::DeleteCursor; use std::fmt; -#[derive(Clone, Copy, PartialEq, Eq, Debug)] -pub enum SegmentState { - Ready, - InMerge, -} - -impl SegmentState { - pub fn letter_code(self) -> char { - match self { - SegmentState::InMerge => 'M', - SegmentState::Ready => 'R', - } - } -} - /// A segment entry describes the state of /// a given segment, at a given instant. 
/// @@ -35,7 +20,6 @@ impl SegmentState { #[derive(Clone)] pub struct SegmentEntry { meta: SegmentMeta, - state: SegmentState, delete_bitset: Option, delete_cursor: DeleteCursor, } @@ -49,7 +33,6 @@ impl SegmentEntry { ) -> SegmentEntry { SegmentEntry { meta: segment_meta, - state: SegmentState::Ready, delete_bitset, delete_cursor, } @@ -72,14 +55,6 @@ impl SegmentEntry { &mut self.delete_cursor } - /// Return the `SegmentEntry`. - /// - /// The state describes whether the segment is available for - /// a merge or not. - pub fn state(&self) -> SegmentState { - self.state - } - /// Returns the segment id. pub fn segment_id(&self) -> SegmentId { self.meta.id() @@ -89,33 +64,10 @@ impl SegmentEntry { pub fn meta(&self) -> &SegmentMeta { &self.meta } - - /// Mark the `SegmentEntry` as in merge. - /// - /// Only segments that are not already - /// in a merge are elligible for future merge. - pub fn start_merge(&mut self) { - self.state = SegmentState::InMerge; - } - - /// Cancel a merge - /// - /// If a merge fails, it is important to switch - /// the segment back to a idle state, so that it - /// may be elligible for future merges. - pub fn cancel_merge(&mut self) { - self.state = SegmentState::Ready; - } - - /// Returns true iff a segment should - /// be considered for a merge. 
- pub fn is_ready(&self) -> bool { - self.state == SegmentState::Ready - } } impl fmt::Debug for SegmentEntry { fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result { - write!(formatter, "SegmentEntry({:?}, {:?})", self.meta, self.state) + write!(formatter, "SegmentEntry({:?})", self.meta) } } diff --git a/src/indexer/segment_manager.rs b/src/indexer/segment_manager.rs index c0089b262..4e3a7e7e4 100644 --- a/src/indexer/segment_manager.rs +++ b/src/indexer/segment_manager.rs @@ -16,7 +16,6 @@ use Result as TantivyResult; struct SegmentRegisters { uncommitted: SegmentRegister, committed: SegmentRegister, - writing: HashSet, } /// The segment manager stores the list of segments @@ -41,12 +40,17 @@ impl Debug for SegmentManager { } pub fn get_mergeable_segments( + in_merge_segment_ids: &HashSet, segment_manager: &SegmentManager, ) -> (Vec, Vec) { let registers_lock = segment_manager.read(); ( - registers_lock.committed.get_mergeable_segments(), - registers_lock.uncommitted.get_mergeable_segments(), + registers_lock + .committed + .get_mergeable_segments(in_merge_segment_ids), + registers_lock + .uncommitted + .get_mergeable_segments(in_merge_segment_ids), ) } @@ -59,7 +63,6 @@ impl SegmentManager { registers: RwLock::new(SegmentRegisters { uncommitted: SegmentRegister::default(), committed: SegmentRegister::new(segment_metas, delete_cursor), - writing: HashSet::new(), }), } } @@ -72,12 +75,6 @@ impl SegmentManager { segment_entries } - /// Returns the overall number of segments in the `SegmentManager` - pub fn num_segments(&self) -> usize { - let registers_lock = self.read(); - registers_lock.committed.len() + registers_lock.uncommitted.len() - } - /// List the files that are useful to the index. /// /// This does not include lock files, or files that are obsolete @@ -136,25 +133,22 @@ impl SegmentManager { /// the `segment_ids` are not either all committed or all /// uncommitted. 
pub fn start_merge(&self, segment_ids: &[SegmentId]) -> TantivyResult> { - let mut registers_lock = self.write(); + let registers_lock = self.read(); let mut segment_entries = vec![]; if registers_lock.uncommitted.contains_all(segment_ids) { for segment_id in segment_ids { let segment_entry = registers_lock.uncommitted - .start_merge(segment_id) + .get(segment_id) .expect("Segment id not found {}. Should never happen because of the contains all if-block."); segment_entries.push(segment_entry); } } else if registers_lock.committed.contains_all(segment_ids) { for segment_id in segment_ids { let segment_entry = registers_lock.committed - .start_merge(segment_id) + .get(segment_id) .expect("Segment id not found {}. Should never happen because of the contains all if-block."); segment_entries.push(segment_entry); } - for segment_id in segment_ids { - registers_lock.committed.start_merge(segment_id); - } } else { let error_msg = "Merge operation sent for segments that are not \ all uncommited or commited." @@ -164,50 +158,8 @@ impl SegmentManager { Ok(segment_entries) } - pub fn cancel_merge( - &self, - before_merge_segment_ids: &[SegmentId], - after_merge_segment_id: SegmentId, - ) { - let mut registers_lock = self.write(); - - // we mark all segments are ready for merge. - { - let target_segment_register: &mut SegmentRegister; - target_segment_register = { - if registers_lock - .uncommitted - .contains_all(before_merge_segment_ids) - { - &mut registers_lock.uncommitted - } else if registers_lock - .committed - .contains_all(before_merge_segment_ids) - { - &mut registers_lock.committed - } else { - warn!("couldn't find segment in SegmentManager"); - return; - } - }; - for segment_id in before_merge_segment_ids { - target_segment_register.cancel_merge(segment_id); - } - } - - // ... and we make sure the target segment entry - // can be garbage collected. 
- registers_lock.writing.remove(&after_merge_segment_id); - } - - pub fn write_segment(&self, segment_id: SegmentId) { - let mut registers_lock = self.write(); - registers_lock.writing.insert(segment_id); - } - pub fn add_segment(&self, segment_entry: SegmentEntry) { let mut registers_lock = self.write(); - registers_lock.writing.remove(&segment_entry.segment_id()); registers_lock.uncommitted.add_segment_entry(segment_entry); } @@ -217,10 +169,6 @@ impl SegmentManager { after_merge_segment_entry: SegmentEntry, ) { let mut registers_lock = self.write(); - registers_lock - .writing - .remove(&after_merge_segment_entry.segment_id()); - let target_register: &mut SegmentRegister = { if registers_lock .uncommitted diff --git a/src/indexer/segment_register.rs b/src/indexer/segment_register.rs index c0c883e15..2133989ce 100644 --- a/src/indexer/segment_register.rs +++ b/src/indexer/segment_register.rs @@ -3,6 +3,7 @@ use core::SegmentMeta; use indexer::delete_queue::DeleteCursor; use indexer::segment_entry::SegmentEntry; use std::collections::HashMap; +use std::collections::HashSet; use std::fmt::{self, Debug, Formatter}; /// The segment register keeps track @@ -21,8 +22,8 @@ pub struct SegmentRegister { impl Debug for SegmentRegister { fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> { write!(f, "SegmentRegister(")?; - for (k, v) in &self.segment_states { - write!(f, "{}:{}, ", k.short_uuid_string(), v.state().letter_code())?; + for k in self.segment_states.keys() { + write!(f, "{}, ", k.short_uuid_string())?; } write!(f, ")")?; Ok(()) @@ -34,14 +35,13 @@ impl SegmentRegister { self.segment_states.clear(); } - pub fn len(&self) -> usize { - self.segment_states.len() - } - - pub fn get_mergeable_segments(&self) -> Vec { + pub fn get_mergeable_segments( + &self, + in_merge_segment_ids: &HashSet, + ) -> Vec { self.segment_states .values() - .filter(|segment_entry| segment_entry.is_ready()) + .filter(|segment_entry| 
!in_merge_segment_ids.contains(&segment_entry.segment_id())) .map(|segment_entry| segment_entry.meta().clone()) .collect() } @@ -60,7 +60,7 @@ impl SegmentRegister { segment_ids } - pub fn contains_all(&mut self, segment_ids: &[SegmentId]) -> bool { + pub fn contains_all(&self, segment_ids: &[SegmentId]) -> bool { segment_ids .iter() .all(|segment_id| self.segment_states.contains_key(segment_id)) @@ -75,20 +75,10 @@ impl SegmentRegister { self.segment_states.remove(segment_id); } - pub fn cancel_merge(&mut self, segment_id: &SegmentId) { + pub fn get(&self, segment_id: &SegmentId) -> Option { self.segment_states - .get_mut(segment_id) - .expect("Received a merge notification for a segment that is not registered") - .cancel_merge(); - } - - pub fn start_merge(&mut self, segment_id: &SegmentId) -> Option { - if let Some(segment_entry) = self.segment_states.get_mut(segment_id) { - segment_entry.start_merge(); - Some(segment_entry.clone()) - } else { - None - } + .get(segment_id) + .map(|segment_entry| segment_entry.clone()) } pub fn new(segment_metas: Vec, delete_cursor: &DeleteCursor) -> SegmentRegister { @@ -100,11 +90,6 @@ impl SegmentRegister { } SegmentRegister { segment_states } } - - #[cfg(test)] - pub fn segment_entry(&self, segment_id: &SegmentId) -> Option { - self.segment_states.get(segment_id).cloned() - } } #[cfg(test)] @@ -113,7 +98,6 @@ mod tests { use core::SegmentId; use core::SegmentMeta; use indexer::delete_queue::*; - use indexer::SegmentState; fn segment_ids(segment_register: &SegmentRegister) -> Vec { segment_register @@ -137,42 +121,12 @@ mod tests { let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor(), None); segment_register.add_segment_entry(segment_entry); } - assert_eq!( - segment_register - .segment_entry(&segment_id_a) - .unwrap() - .state(), - SegmentState::Ready - ); assert_eq!(segment_ids(&segment_register), vec![segment_id_a]); { let segment_meta = SegmentMeta::new(segment_id_b, 0u32); let segment_entry = 
SegmentEntry::new(segment_meta, delete_queue.cursor(), None); segment_register.add_segment_entry(segment_entry); } - assert_eq!( - segment_register - .segment_entry(&segment_id_b) - .unwrap() - .state(), - SegmentState::Ready - ); - segment_register.start_merge(&segment_id_a); - segment_register.start_merge(&segment_id_b); - assert_eq!( - segment_register - .segment_entry(&segment_id_a) - .unwrap() - .state(), - SegmentState::InMerge - ); - assert_eq!( - segment_register - .segment_entry(&segment_id_b) - .unwrap() - .state(), - SegmentState::InMerge - ); segment_register.remove_segment(&segment_id_a); segment_register.remove_segment(&segment_id_b); { diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index defd5d5be..9f8e9577f 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -16,8 +16,10 @@ use futures_cpupool::CpuFuture; use futures_cpupool::CpuPool; use indexer::delete_queue::DeleteCursor; use indexer::index_writer::advance_deletes; +use indexer::merge_operation::MergeOperationInventory; use indexer::merger::IndexMerger; use indexer::stamper::Stamper; +use indexer::MergeOperation; use indexer::SegmentEntry; use indexer::SegmentSerializer; use indexer::{DefaultMergePolicy, MergePolicy}; @@ -25,6 +27,7 @@ use schema::Schema; use serde_json; use std::borrow::BorrowMut; use std::collections::HashMap; +use std::collections::HashSet; use std::io::Write; use std::mem; use std::ops::DerefMut; @@ -66,13 +69,8 @@ pub fn save_new_metas(schema: Schema, directory: &mut Directory) -> Result<()> { /// /// This method is not part of tantivy's public API fn save_metas(metas: &IndexMeta, directory: &mut Directory) -> Result<()> { - // let metas = IndexMeta { - // segments: segment_metas, - // schema, - // opstamp, - // payload, - // }; let mut buffer = serde_json::to_vec_pretty(metas)?; + // Just adding a new line at the end of the buffer. 
writeln!(&mut buffer)?; directory.atomic_write(&META_FILEPATH, &buffer[..])?; debug!("Saved metas {:?}", serde_json::to_string_pretty(&metas)); @@ -84,21 +82,21 @@ fn save_metas(metas: &IndexMeta, directory: &mut Directory) -> Result<()> { // // All this processing happens on a single thread // consuming a common queue. +// +// We voluntarily pass a merge_operation ref to guarantee that +// the merge_operation is alive during the process #[derive(Clone)] pub struct SegmentUpdater(Arc); -struct MergeOperation { - pub target_opstamp: u64, - pub segment_ids: Vec, -} - fn perform_merge( + merge_operation: &MergeOperation, index: &Index, mut segment_entries: Vec, - mut merged_segment: Segment, - target_opstamp: u64, ) -> Result { + let target_opstamp = merge_operation.target_opstamp(); + // first we need to apply deletes to our segment. + let mut merged_segment = index.new_segment(); // TODO add logging let schema = index.schema(); @@ -142,12 +140,13 @@ struct InnerSegmentUpdater { pool: CpuPool, index: Index, segment_manager: SegmentManager, - merge_policy: RwLock>, + merge_policy: RwLock>>, merging_thread_id: AtomicUsize, merging_threads: RwLock>>>, generation: AtomicUsize, killed: AtomicBool, stamper: Stamper, + merge_operations: MergeOperationInventory, } impl SegmentUpdater { @@ -168,28 +167,23 @@ impl SegmentUpdater { pool, index, segment_manager, - merge_policy: RwLock::new(Box::new(DefaultMergePolicy::default())), + merge_policy: RwLock::new(Arc::new(Box::new(DefaultMergePolicy::default()))), merging_thread_id: AtomicUsize::default(), merging_threads: RwLock::new(HashMap::new()), generation: AtomicUsize::default(), killed: AtomicBool::new(false), stamper, + merge_operations: Default::default(), }))) } - pub fn new_segment(&self) -> Segment { - let new_segment = self.0.index.new_segment(); - let segment_id = new_segment.id(); - self.0.segment_manager.write_segment(segment_id); - new_segment - } - - pub fn get_merge_policy(&self) -> Box { - 
self.0.merge_policy.read().unwrap().box_clone() + pub fn get_merge_policy(&self) -> Arc> { + self.0.merge_policy.read().unwrap().clone() } pub fn set_merge_policy(&self, merge_policy: Box) { - *self.0.merge_policy.write().unwrap() = merge_policy; + let arc_merge_policy = Arc::new(merge_policy); + *self.0.merge_policy.write().unwrap() = arc_merge_policy; } fn get_merging_thread_id(&self) -> usize { @@ -302,12 +296,14 @@ impl SegmentUpdater { } pub fn start_merge(&self, segment_ids: &[SegmentId]) -> Result> { - let segment_ids_vec = segment_ids.to_vec(); let commit_opstamp = self.load_metas().opstamp; - self.run_async(move |segment_updater| { - segment_updater.start_merge_impl(&segment_ids_vec[..], commit_opstamp) - }) - .wait()? + let merge_operation = MergeOperation::new( + &self.0.merge_operations, + commit_opstamp, + segment_ids.to_vec(), + ); + self.run_async(move |segment_updater| segment_updater.start_merge_impl(merge_operation)) + .wait()? } fn store_meta(&self, index_meta: &IndexMeta) { @@ -318,22 +314,25 @@ impl SegmentUpdater { } // `segment_ids` is required to be non-empty. - fn start_merge_impl( - &self, - segment_ids: &[SegmentId], - target_opstamp: u64, - ) -> Result> { - assert!(!segment_ids.is_empty(), "Segment_ids cannot be empty."); + fn start_merge_impl(&self, merge_operation: MergeOperation) -> Result> { + assert!( + !merge_operation.segment_ids().is_empty(), + "Segment_ids cannot be empty." 
+ ); let segment_updater_clone = self.clone(); - let segment_entries: Vec = self.0.segment_manager.start_merge(segment_ids)?; + let segment_entries: Vec = self + .0 + .segment_manager + .start_merge(merge_operation.segment_ids())?; - let segment_ids_vec = segment_ids.to_vec(); + // let segment_ids_vec = merge_operation.segment_ids.to_vec(); let merging_thread_id = self.get_merging_thread_id(); info!( "Starting merge thread #{} - {:?}", - merging_thread_id, segment_ids + merging_thread_id, + merge_operation.segment_ids() ); let (merging_future_send, merging_future_recv) = oneshot(); @@ -342,20 +341,17 @@ impl SegmentUpdater { .name(format!("mergingthread-{}", merging_thread_id)) .spawn(move || { // first we need to apply deletes to our segment. - let merged_segment = segment_updater_clone.new_segment(); - let merged_segment_id = merged_segment.id(); let merge_result = perform_merge( + &merge_operation, &segment_updater_clone.0.index, segment_entries, - merged_segment, - target_opstamp, ); match merge_result { Ok(after_merge_segment_entry) => { let merged_segment_meta = after_merge_segment_entry.meta().clone(); segment_updater_clone - .end_merge(segment_ids_vec, after_merge_segment_entry) + .end_merge(merge_operation, after_merge_segment_entry) .expect("Segment updater thread is corrupted."); // the future may fail if the listener of the oneshot future @@ -366,13 +362,18 @@ impl SegmentUpdater { let _merging_future_res = merging_future_send.send(merged_segment_meta); } Err(e) => { - warn!("Merge of {:?} was cancelled: {:?}", segment_ids_vec, e); + warn!( + "Merge of {:?} was cancelled: {:?}", + merge_operation.segment_ids(), + e + ); // ... cancel merge if cfg!(test) { panic!("Merge failed."); } - segment_updater_clone.cancel_merge(&segment_ids_vec, merged_segment_id); - // merging_future_send will be dropped, sending an error to the future. + // As `merge_operation` will be dropped, the segment in merge state will + // be available for merge again. 
+ // `merging_future_send` will be dropped, sending an error to the future. } } segment_updater_clone @@ -393,37 +394,34 @@ impl SegmentUpdater { } fn consider_merge_options(&self) { + let merge_segment_ids: HashSet = self.0.merge_operations.segment_in_merge(); let (committed_segments, uncommitted_segments) = - get_mergeable_segments(&self.0.segment_manager); + get_mergeable_segments(&merge_segment_ids, &self.0.segment_manager); + // Committed segments cannot be merged with uncommitted_segments. // We therefore consider merges using these two sets of segments independently. let merge_policy = self.get_merge_policy(); let current_opstamp = self.0.stamper.stamp(); - let mut merge_candidates = merge_policy + let mut merge_candidates: Vec = merge_policy .compute_merge_candidates(&uncommitted_segments) .into_iter() - .map(|merge_candidate| MergeOperation { - target_opstamp: current_opstamp, - segment_ids: merge_candidate.0, + .map(|merge_candidate| { + MergeOperation::new(&self.0.merge_operations, current_opstamp, merge_candidate.0) }) - .collect::>(); + .collect(); + let commit_opstamp = self.load_metas().opstamp; let committed_merge_candidates = merge_policy .compute_merge_candidates(&committed_segments) .into_iter() - .map(|merge_candidate| MergeOperation { - target_opstamp: commit_opstamp, - segment_ids: merge_candidate.0, + .map(|merge_candidate| { + MergeOperation::new(&self.0.merge_operations, commit_opstamp, merge_candidate.0) }) .collect::>(); merge_candidates.extend(committed_merge_candidates.into_iter()); - for MergeOperation { - target_opstamp, - segment_ids, - } in merge_candidates - { - match self.start_merge_impl(&segment_ids, target_opstamp) { + for merge_operation in merge_candidates { + match self.start_merge_impl(merge_operation) { Ok(merge_future) => { if let Err(e) = merge_future.fuse().poll() { error!("The merge task failed quickly after starting: {:?}", e); @@ -439,19 +437,9 @@ impl SegmentUpdater { } } - fn cancel_merge( - &self, - 
before_merge_segment_ids: &[SegmentId], after_merge_segment_entry: SegmentId, ) { self.0 .segment_manager .cancel_merge(before_merge_segment_ids, after_merge_segment_entry); } - fn end_merge( &self, - before_merge_segment_ids: Vec, + merge_operation: MergeOperation, mut after_merge_segment_entry: SegmentEntry, ) -> Result<()> { self.run_async(move |segment_updater| { @@ -467,16 +455,15 @@ { error!( "Merge of {:?} was cancelled (advancing deletes failed): {:?}", - before_merge_segment_ids, e + merge_operation.segment_ids(), + e ); - // ... cancel merge if cfg!(test) { panic!("Merge failed."); } - segment_updater.cancel_merge( - &before_merge_segment_ids, - after_merge_segment_entry.segment_id(), - ); + // ... cancel merge + // `merge_operations` are tracked. As it is dropped, + // the segment_ids will be available again for merge. return; } } @@ -484,7 +471,7 @@ segment_updater .0 .segment_manager - .end_merge(&before_merge_segment_ids, after_merge_segment_entry); + .end_merge(merge_operation.segment_ids(), after_merge_segment_entry); segment_updater.consider_merge_options(); info!("save metas"); let previous_metas = segment_updater.load_metas(); @@ -510,32 +497,25 @@ /// Obsolete files will eventually be cleaned up /// by the directory garbage collector. 
pub fn wait_merging_thread(&self) -> Result<()> { - let mut num_segments: usize; loop { - num_segments = self.0.segment_manager.num_segments(); - - let mut new_merging_threads = HashMap::new(); - { + let merging_threads: HashMap>> = { let mut merging_threads = self.0.merging_threads.write().unwrap(); - mem::swap(&mut new_merging_threads, merging_threads.deref_mut()); + mem::replace(merging_threads.deref_mut(), HashMap::new()) + }; + if merging_threads.is_empty() { + return Ok(()); } - debug!("wait merging thread {}", new_merging_threads.len()); - for (_, merging_thread_handle) in new_merging_threads { + debug!("wait merging thread {}", merging_threads.len()); + for (_, merging_thread_handle) in merging_threads { merging_thread_handle .join() .map(|_| ()) .map_err(|_| TantivyError::ErrorInThread("Merging thread failed.".into()))?; } - // Our merging thread may have queued their completed + // Our merging thread may have queued their completed merged segment. + // Let's wait for that too. self.run_async(move |_| {}).wait()?; - - let new_num_segments = self.0.segment_manager.num_segments(); - - if new_num_segments >= num_segments { - break; - } } - Ok(()) } } diff --git a/src/tokenizer/lower_caser.rs b/src/tokenizer/lower_caser.rs index 38fa782fc..d1f70cb30 100644 --- a/src/tokenizer/lower_caser.rs +++ b/src/tokenizer/lower_caser.rs @@ -50,7 +50,6 @@ where self.token_mut().text.make_ascii_lowercase(); } else { to_lowercase_unicode(&mut self.tail.token_mut().text, &mut self.buffer); - mem::swap(&mut self.tail.token_mut().text, &mut self.buffer); } true