test passing

This commit is contained in:
Paul Masurel
2017-03-13 10:00:19 +09:00
parent 202dda98ba
commit 5932278e00
5 changed files with 46 additions and 29 deletions

View File

@@ -60,7 +60,12 @@ pub mod tests {
.iter()
.map(|segment_meta| segment_meta.id())
.collect::<Vec<SegmentId>>();
vec!(MergeCandidate(segment_ids))
if segment_ids.len() > 1 {
vec!(MergeCandidate(segment_ids))
}
else {
vec!()
}
}
fn box_clone(&self) -> Box<MergePolicy> {

View File

@@ -175,9 +175,9 @@ impl IndexMerger {
Ok(())
}
fn write_postings(&self,
postings_serializer: &mut PostingsSerializer) -> Result<()> {
fn write_postings(
&self,
postings_serializer: &mut PostingsSerializer) -> Result<()> {
let mut merged_terms = TermIterator::from(&self.readers[..]);
let mut delta_position_computer = DeltaPositionComputer::new();
@@ -493,9 +493,7 @@ mod tests {
index_writer.commit().expect("committed");
index.load_searchers().unwrap();
let searcher = index.searcher();
for segment_reader in searcher.segment_readers() {
println!("segment reader {}", segment_reader.num_docs());
}
assert_eq!(searcher.segment_readers().len(), 2);
assert_eq!(searcher.num_docs(), 3);
assert_eq!(searcher.segment_readers()[0].num_docs(), 1);

View File

@@ -42,10 +42,16 @@ impl Debug for SegmentManager {
///
/// For instance, a segment will not appear in both committed and uncommitted
/// segments
pub fn get_segments(segment_manager: &SegmentManager,) -> (Vec<SegmentMeta>, Vec<SegmentMeta>) {
pub fn get_all_segments(segment_manager: &SegmentManager,) -> (Vec<SegmentMeta>, Vec<SegmentMeta>) {
let registers_lock = segment_manager.read();
(registers_lock.committed.get_segments(),
registers_lock.uncommitted.get_segments())
(registers_lock.committed.get_all_segments(),
registers_lock.uncommitted.get_all_segments())
}
pub fn get_mergeable_segments(segment_manager: &SegmentManager,) -> (Vec<SegmentMeta>, Vec<SegmentMeta>) {
let registers_lock = segment_manager.read();
(registers_lock.committed.get_mergeable_segments(),
registers_lock.uncommitted.get_mergeable_segments())
}
impl SegmentManager {
@@ -78,18 +84,18 @@ impl SegmentManager {
files.insert(META_FILEPATH.clone());
files.insert(LOCKFILE_FILEPATH.clone());
let segment_metas =
let segment_metas: Vec<SegmentMeta> =
registers_lock.committed
.get_segments()
.get_all_segments()
.into_iter()
.chain(registers_lock.uncommitted
.get_segments()
.get_all_segments()
.into_iter())
.chain(registers_lock.writing
.iter()
.cloned()
.map(SegmentMeta::new));
.map(SegmentMeta::new))
.collect();
for segment_meta in segment_metas {
files.extend(segment_meta.list_files());
}

View File

@@ -36,8 +36,15 @@ impl SegmentRegister {
pub fn clear(&mut self,) {
self.segment_states.clear();
}
pub fn get_all_segments(&self,) -> Vec<SegmentMeta> {
self.segment_states
.values()
.map(|segment_entry| segment_entry.meta().clone())
.collect()
}
pub fn get_segments(&self,) -> Vec<SegmentMeta> {
pub fn get_mergeable_segments(&self,) -> Vec<SegmentMeta> {
self.segment_states
.values()
.filter(|segment_entry| segment_entry.is_ready())

View File

@@ -35,7 +35,7 @@ use std::sync::atomic::Ordering;
use std::sync::RwLock;
use std::thread;
use std::thread::JoinHandle;
use super::segment_manager::{SegmentManager, get_segments};
use super::segment_manager::{SegmentManager, get_mergeable_segments};
/// Save the index meta file.
@@ -129,9 +129,7 @@ impl SegmentUpdater {
pub fn new_segment(&self) -> Segment {
let new_segment = self.0.index.new_segment();
let segment_id = new_segment.id();
self.run_async(move |segment_updater| {
segment_updater.0.segment_manager.write_segment(segment_id);
});
self.0.segment_manager.write_segment(segment_id);
new_segment
}
@@ -147,7 +145,8 @@ impl SegmentUpdater {
self.0.merging_thread_id.fetch_add(1, Ordering::SeqCst)
}
/// TODO check that we use this correctly taking
/// the laziness in account.
fn run_async<T: 'static + Send, F: 'static + Send + FnOnce(SegmentUpdater) -> T>(&self, f: F) -> impl Future<Item=T, Error=Error> {
let me_clone = self.clone();
self.0.pool.spawn_fn(move || {
@@ -157,11 +156,13 @@ impl SegmentUpdater {
pub fn add_segment(&self, generation: usize, segment_entry: SegmentEntry) -> impl Future<Item=bool, Error=Error> {
if self.is_alive() && generation >= self.0.generation.load(Ordering::Acquire) {
future::Either::A(self.run_async(|segment_updater| {
segment_updater.0.segment_manager.add_segment(segment_entry);
segment_updater.consider_merge_options();
true
}))
future::Either::A({
self.run_async(|segment_updater| {
segment_updater.0.segment_manager.add_segment(segment_entry);
segment_updater.consider_merge_options();
true
})
})
}
else {
future::Either::B(future::ok(false))
@@ -306,7 +307,7 @@ impl SegmentUpdater {
fn consider_merge_options(&self) {
let (committed_segments, uncommitted_segments) = get_segments(&self.0.segment_manager);
let (committed_segments, uncommitted_segments) = get_mergeable_segments(&self.0.segment_manager);
// Committed segments cannot be merged with uncommitted_segments.
// We therefore consider merges using these two sets of segments independently.
let merge_policy = self.get_merge_policy();
@@ -405,7 +406,7 @@ mod tests {
}
index.load_searchers().unwrap();
assert_eq!(index.searcher().segment_readers().len(), 3);
assert_eq!(index.searcher().segment_readers().len(), 2);
assert_eq!(index.searcher().num_docs(), 302);
{
@@ -414,7 +415,7 @@ mod tests {
}
index.load_searchers().unwrap();
assert_eq!(index.searcher().segment_readers().len(), 2);
assert_eq!(index.searcher().segment_readers().len(), 1);
assert_eq!(index.searcher().num_docs(), 302);
}
}