From a4ba20eea3a0edec6a28bdbba914d0c6ac34415e Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Sat, 8 Apr 2017 17:30:25 +0900 Subject: [PATCH] issue/96 code clean up, adding comments.wq --- src/indexer/delete_queue.rs | 8 +++--- src/indexer/merge_policy.rs | 5 ++++ src/indexer/segment_entry.rs | 47 +++++++++++++++++++++++++++++++--- src/indexer/segment_manager.rs | 19 +++----------- src/indexer/segment_updater.rs | 21 +++++++++++++++ 5 files changed, 77 insertions(+), 23 deletions(-) diff --git a/src/indexer/delete_queue.rs b/src/indexer/delete_queue.rs index 1f476dad6..a031c63d3 100644 --- a/src/indexer/delete_queue.rs +++ b/src/indexer/delete_queue.rs @@ -21,7 +21,7 @@ use std::ops::DerefMut; #[derive(Default)] struct InnerDeleteQueue { writer: Vec, - last_block: Option>, // TODO last block... is that ok. + last_block: Option>, } #[derive(Clone, Default)] @@ -36,7 +36,7 @@ impl DeleteQueue { pub fn new() -> DeleteQueue { let delete_queue = DeleteQueue { - inner: Arc::new(RwLock::new(InnerDeleteQueue::default())) + inner: Arc::default(), }; let next_block = NextBlock::from(delete_queue.clone()); @@ -61,7 +61,7 @@ impl DeleteQueue { pub fn cursor(&self) -> DeleteCursor { let last_block = self.inner .read() - .unwrap() + .expect("Read lock poisoned when opening delete queue cursor") .last_block .clone() .expect("Failed to unwrap last_block. This should never happen @@ -253,7 +253,7 @@ impl DeleteCursor { /// Get the current delete operation. /// Calling `.get` does not advance the cursor. - pub fn get<'a>(&'a mut self) -> Option<&'a DeleteOperation> { + pub fn get(&mut self) -> Option<&DeleteOperation> { if self.load_block_if_required() { Some(&self.block.operations[self.pos]) } diff --git a/src/indexer/merge_policy.rs b/src/indexer/merge_policy.rs index dfd9dfcec..ecab510d7 100644 --- a/src/indexer/merge_policy.rs +++ b/src/indexer/merge_policy.rs @@ -51,6 +51,11 @@ pub mod tests { use core::SegmentId; use core::SegmentMeta; + + /// Merge policy useful for test purposes. + /// + /// Everytime there is more than one segment, + /// it will suggest to merge them. #[derive(Debug)] pub struct MergeWheneverPossible; diff --git a/src/indexer/segment_entry.rs b/src/indexer/segment_entry.rs index c93bcfe64..86673d517 100644 --- a/src/indexer/segment_entry.rs +++ b/src/indexer/segment_entry.rs @@ -20,6 +20,20 @@ impl SegmentState { } } + +/// A segment entry describes the state of +/// a given segment, at a given instant. +/// +/// In addition to segment meta, +/// it contains a few transient states +/// - state expresses whether the segment is already in the +/// middle of a merge +/// - delete_bitset is a bitset describing +/// documents that were deleted during the commit +/// itself. +/// - Delete cursor, is the position in the delete queue. +/// Deletes happening before the cursor are reflected either +/// in the .del file or in the delete_bitset. #[derive(Clone)] pub struct SegmentEntry { meta: SegmentMeta, @@ -31,6 +45,8 @@ pub struct SegmentEntry { impl SegmentEntry { + + /// Create a new `SegmentEntry` pub fn new(segment_meta: SegmentMeta, delete_cursor: DeleteCursor, delete_bitset: Option) -> SegmentEntry { @@ -42,42 +58,65 @@ impl SegmentEntry { } } + + /// Return a reference to the segment entry deleted bitset. + /// + /// `DocId` in this bitset are flagged as deleted. pub fn delete_bitset(&self,) -> Option<&BitSet> { self.delete_bitset.as_ref() } + /// Set the `SegmentMeta` for this segment. pub fn set_meta(&mut self, segment_meta: SegmentMeta) { self.meta = segment_meta; } + + /// Return a reference to the segment_entry's delete cursor pub fn delete_cursor(&mut self) -> &mut DeleteCursor { &mut self.delete_cursor } + /// Return the `SegmentEntry`. + /// + /// The state describes whether the segment is available for + /// a merge or not. pub fn state(&self) -> SegmentState { self.state } - pub fn set_state(&mut self, state: SegmentState) { - self.state = state; - } - + /// Returns the segment id. pub fn segment_id(&self) -> SegmentId { self.meta.id() } + + /// Accessor to the `SegmentMeta` pub fn meta(&self) -> &SegmentMeta { &self.meta } + + /// Mark the `SegmentEntry` as in merge. + /// + /// Only segments that are not already + /// in a merge are elligible for future merge. pub fn start_merge(&mut self,) { self.state = SegmentState::InMerge; } + /// Cancel a merge + /// + /// If a merge fails, it is important to switch + /// the segment back to a idle state, so that it + /// may be elligible for future merges. pub fn cancel_merge(&mut self,) { self.state = SegmentState::Ready; } + + /// Returns true iff a segment should + /// be considered for a merge. pub fn is_ready(&self,) -> bool { self.state == SegmentState::Ready } diff --git a/src/indexer/segment_manager.rs b/src/indexer/segment_manager.rs index 4961cb148..66f6fe569 100644 --- a/src/indexer/segment_manager.rs +++ b/src/indexer/segment_manager.rs @@ -3,7 +3,7 @@ use std::sync::RwLock; use core::SegmentMeta; use core::{META_FILEPATH, LOCKFILE_FILEPATH}; use core::SegmentId; -use indexer::{SegmentEntry, SegmentState}; +use indexer::SegmentEntry; use std::path::PathBuf; use std::collections::hash_set::HashSet; use std::sync::{RwLockReadGuard, RwLockWriteGuard}; @@ -54,6 +54,7 @@ impl SegmentManager { } } + /// Returns all of the segment entries (committed or uncommitted) pub fn segment_entries(&self,) -> Vec { let mut segment_entries = self.read() .uncommitted @@ -66,6 +67,7 @@ impl SegmentManager { segment_entries } + /// Returns the overall number of segments in the `SegmentManager` pub fn num_segments(&self,) -> usize { let registers_lock = self.read(); registers_lock.committed.len() + registers_lock.uncommitted.len() @@ -95,11 +97,6 @@ impl SegmentManager { files } - pub fn segment_state(&self, segment_id: &SegmentId) -> Option { - self.segment_entry(segment_id) - .map(|segment_entry| segment_entry.state()) - } - pub fn segment_entry(&self, segment_id: &SegmentId) -> Option { let registers = self.read(); registers @@ -119,15 +116,7 @@ impl SegmentManager { self.registers.write().expect("Failed to acquire write lock on SegmentManager.") } - pub fn commit(&self, mut segment_entries: Vec) { - // TODO is still relevant!? - // restore the state of the segment_entries - for segment_entry in &mut segment_entries { - let segment_id = segment_entry.segment_id(); - if let Some(state) = self.segment_state(&segment_id) { - segment_entry.set_state(state); - } - } + pub fn commit(&self, segment_entries: Vec) { let mut registers_lock = self.write(); registers_lock.committed.clear(); registers_lock.uncommitted.clear(); diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index 8fb027042..4ac7ddea7 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -237,6 +237,11 @@ impl SegmentUpdater { !self.0.killed.load(Ordering::Acquire) } + + /// Apply deletes up to the target opstamp to all segments. + /// + /// Tne method returns copies of the segment entries, + /// updated with the delete information. fn purge_deletes(&self, target_opstamp: u64) -> Result> { let mut segment_entries = self.0.segment_manager.segment_entries(); for segment_entry in &mut segment_entries { @@ -374,6 +379,22 @@ impl SegmentUpdater { }).wait() } + + /// Wait for current merging threads. + /// + /// Upon termination of the current merging threads, + /// merge opportunity may appear. + // + /// We keep waiting until the merge policy judges that + /// no opportunity is available. + /// + /// Note that it is not required to call this + /// method in your application. + /// Terminating your application without letting + /// merge terminate is perfectly safe. + /// + /// Obsolete files will eventually be cleaned up + /// by the directory garbage collector. pub fn wait_merging_thread(&self) -> Result<()> { let mut num_segments: usize;