issue/96 code clean up, adding comments.wq

This commit is contained in:
Paul Masurel
2017-04-08 17:30:25 +09:00
parent 4bef6c99ee
commit a4ba20eea3
5 changed files with 77 additions and 23 deletions

View File

@@ -21,7 +21,7 @@ use std::ops::DerefMut;
#[derive(Default)]
struct InnerDeleteQueue {
writer: Vec<DeleteOperation>,
last_block: Option<Arc<Block>>, // TODO last block... is that ok.
last_block: Option<Arc<Block>>,
}
#[derive(Clone, Default)]
@@ -36,7 +36,7 @@ impl DeleteQueue {
pub fn new() -> DeleteQueue {
let delete_queue = DeleteQueue {
inner: Arc::new(RwLock::new(InnerDeleteQueue::default()))
inner: Arc::default(),
};
let next_block = NextBlock::from(delete_queue.clone());
@@ -61,7 +61,7 @@ impl DeleteQueue {
pub fn cursor(&self) -> DeleteCursor {
let last_block = self.inner
.read()
.unwrap()
.expect("Read lock poisoned when opening delete queue cursor")
.last_block
.clone()
.expect("Failed to unwrap last_block. This should never happen
@@ -253,7 +253,7 @@ impl DeleteCursor {
/// Get the current delete operation.
/// Calling `.get` does not advance the cursor.
pub fn get<'a>(&'a mut self) -> Option<&'a DeleteOperation> {
pub fn get(&mut self) -> Option<&DeleteOperation> {
if self.load_block_if_required() {
Some(&self.block.operations[self.pos])
}

View File

@@ -51,6 +51,11 @@ pub mod tests {
use core::SegmentId;
use core::SegmentMeta;
/// Merge policy useful for test purposes.
///
/// Everytime there is more than one segment,
/// it will suggest to merge them.
#[derive(Debug)]
pub struct MergeWheneverPossible;

View File

@@ -20,6 +20,20 @@ impl SegmentState {
}
}
/// A segment entry describes the state of
/// a given segment, at a given instant.
///
/// In addition to segment meta,
/// it contains a few transient states
/// - state expresses whether the segment is already in the
/// middle of a merge
/// - delete_bitset is a bitset describing
/// documents that were deleted during the commit
/// itself.
/// - Delete cursor, is the position in the delete queue.
/// Deletes happening before the cursor are reflected either
/// in the .del file or in the delete_bitset.
#[derive(Clone)]
pub struct SegmentEntry {
meta: SegmentMeta,
@@ -31,6 +45,8 @@ pub struct SegmentEntry {
impl SegmentEntry {
/// Create a new `SegmentEntry`
pub fn new(segment_meta: SegmentMeta,
delete_cursor: DeleteCursor,
delete_bitset: Option<BitSet>) -> SegmentEntry {
@@ -42,42 +58,65 @@ impl SegmentEntry {
}
}
/// Return a reference to the segment entry deleted bitset.
///
/// `DocId` in this bitset are flagged as deleted.
pub fn delete_bitset(&self,) -> Option<&BitSet> {
self.delete_bitset.as_ref()
}
/// Set the `SegmentMeta` for this segment.
pub fn set_meta(&mut self, segment_meta: SegmentMeta) {
self.meta = segment_meta;
}
/// Return a reference to the segment_entry's delete cursor
pub fn delete_cursor(&mut self) -> &mut DeleteCursor {
&mut self.delete_cursor
}
/// Return the `SegmentEntry`.
///
/// The state describes whether the segment is available for
/// a merge or not.
pub fn state(&self) -> SegmentState {
self.state
}
pub fn set_state(&mut self, state: SegmentState) {
self.state = state;
}
/// Returns the segment id.
pub fn segment_id(&self) -> SegmentId {
self.meta.id()
}
/// Accessor to the `SegmentMeta`
pub fn meta(&self) -> &SegmentMeta {
&self.meta
}
/// Mark the `SegmentEntry` as in merge.
///
/// Only segments that are not already
/// in a merge are elligible for future merge.
pub fn start_merge(&mut self,) {
self.state = SegmentState::InMerge;
}
/// Cancel a merge
///
/// If a merge fails, it is important to switch
/// the segment back to a idle state, so that it
/// may be elligible for future merges.
pub fn cancel_merge(&mut self,) {
self.state = SegmentState::Ready;
}
/// Returns true iff a segment should
/// be considered for a merge.
pub fn is_ready(&self,) -> bool {
self.state == SegmentState::Ready
}

View File

@@ -3,7 +3,7 @@ use std::sync::RwLock;
use core::SegmentMeta;
use core::{META_FILEPATH, LOCKFILE_FILEPATH};
use core::SegmentId;
use indexer::{SegmentEntry, SegmentState};
use indexer::SegmentEntry;
use std::path::PathBuf;
use std::collections::hash_set::HashSet;
use std::sync::{RwLockReadGuard, RwLockWriteGuard};
@@ -54,6 +54,7 @@ impl SegmentManager {
}
}
/// Returns all of the segment entries (committed or uncommitted)
pub fn segment_entries(&self,) -> Vec<SegmentEntry> {
let mut segment_entries = self.read()
.uncommitted
@@ -66,6 +67,7 @@ impl SegmentManager {
segment_entries
}
/// Returns the overall number of segments in the `SegmentManager`
pub fn num_segments(&self,) -> usize {
let registers_lock = self.read();
registers_lock.committed.len() + registers_lock.uncommitted.len()
@@ -95,11 +97,6 @@ impl SegmentManager {
files
}
pub fn segment_state(&self, segment_id: &SegmentId) -> Option<SegmentState> {
self.segment_entry(segment_id)
.map(|segment_entry| segment_entry.state())
}
pub fn segment_entry(&self, segment_id: &SegmentId) -> Option<SegmentEntry> {
let registers = self.read();
registers
@@ -119,15 +116,7 @@ impl SegmentManager {
self.registers.write().expect("Failed to acquire write lock on SegmentManager.")
}
pub fn commit(&self, mut segment_entries: Vec<SegmentEntry>) {
// TODO is still relevant!?
// restore the state of the segment_entries
for segment_entry in &mut segment_entries {
let segment_id = segment_entry.segment_id();
if let Some(state) = self.segment_state(&segment_id) {
segment_entry.set_state(state);
}
}
pub fn commit(&self, segment_entries: Vec<SegmentEntry>) {
let mut registers_lock = self.write();
registers_lock.committed.clear();
registers_lock.uncommitted.clear();

View File

@@ -237,6 +237,11 @@ impl SegmentUpdater {
!self.0.killed.load(Ordering::Acquire)
}
/// Apply deletes up to the target opstamp to all segments.
///
/// Tne method returns copies of the segment entries,
/// updated with the delete information.
fn purge_deletes(&self, target_opstamp: u64) -> Result<Vec<SegmentEntry>> {
let mut segment_entries = self.0.segment_manager.segment_entries();
for segment_entry in &mut segment_entries {
@@ -374,6 +379,22 @@ impl SegmentUpdater {
}).wait()
}
/// Wait for current merging threads.
///
/// Upon termination of the current merging threads,
/// merge opportunity may appear.
//
/// We keep waiting until the merge policy judges that
/// no opportunity is available.
///
/// Note that it is not required to call this
/// method in your application.
/// Terminating your application without letting
/// merge terminate is perfectly safe.
///
/// Obsolete files will eventually be cleaned up
/// by the directory garbage collector.
pub fn wait_merging_thread(&self) -> Result<()> {
let mut num_segments: usize;