|
|
|
|
@@ -6,6 +6,7 @@ use core::SerializableSegment;
|
|
|
|
|
use core::Segment;
|
|
|
|
|
use std::thread::JoinHandle;
|
|
|
|
|
use indexer::SegmentWriter;
|
|
|
|
|
use indexer::MergeCandidate;
|
|
|
|
|
use std::clone::Clone;
|
|
|
|
|
use std::io;
|
|
|
|
|
use indexer::MergePolicy;
|
|
|
|
|
@@ -57,8 +58,6 @@ pub struct IndexWriter {
|
|
|
|
|
|
|
|
|
|
num_threads: usize,
|
|
|
|
|
docstamp: u64,
|
|
|
|
|
|
|
|
|
|
merge_policy: Box<MergePolicy>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@@ -92,7 +91,6 @@ fn index_documents(heap: &mut Heap,
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
|
pub enum SegmentUpdate {
|
|
|
|
|
AddSegment(SegmentMeta),
|
|
|
|
|
StartMerge(Vec<SegmentId>),
|
|
|
|
|
EndMerge(Vec<SegmentId>, SegmentMeta),
|
|
|
|
|
CancelGeneration,
|
|
|
|
|
NewGeneration,
|
|
|
|
|
@@ -106,7 +104,8 @@ fn process_segment_update(
|
|
|
|
|
index: &Index,
|
|
|
|
|
segment_manager: &SegmentManager,
|
|
|
|
|
segment_update: SegmentUpdate,
|
|
|
|
|
is_cancelled_generation: &mut bool) -> Result<bool> {
|
|
|
|
|
is_cancelled_generation: &mut bool) -> bool {
|
|
|
|
|
info!("Segment update: {:?}", segment_update);
|
|
|
|
|
match segment_update {
|
|
|
|
|
SegmentUpdate::AddSegment(segment_meta) => {
|
|
|
|
|
if !*is_cancelled_generation {
|
|
|
|
|
@@ -115,54 +114,41 @@ fn process_segment_update(
|
|
|
|
|
else {
|
|
|
|
|
index.delete_segment(segment_meta.segment_id);
|
|
|
|
|
}
|
|
|
|
|
Ok(true)
|
|
|
|
|
},
|
|
|
|
|
SegmentUpdate::StartMerge(segment_ids) => {
|
|
|
|
|
if !*is_cancelled_generation {
|
|
|
|
|
segment_manager.start_merge(&segment_ids);
|
|
|
|
|
// TODO spawn a segment merge thread
|
|
|
|
|
}
|
|
|
|
|
Ok(false)
|
|
|
|
|
true
|
|
|
|
|
},
|
|
|
|
|
SegmentUpdate::EndMerge(segment_ids, segment_meta) => {
|
|
|
|
|
segment_manager.end_merge(&segment_ids, &segment_meta);
|
|
|
|
|
for segment_id in segment_ids {
|
|
|
|
|
index.delete_segment(segment_id);
|
|
|
|
|
}
|
|
|
|
|
Ok(true)
|
|
|
|
|
true
|
|
|
|
|
},
|
|
|
|
|
SegmentUpdate::CancelGeneration => {
|
|
|
|
|
*is_cancelled_generation = true;
|
|
|
|
|
Ok(false)
|
|
|
|
|
false
|
|
|
|
|
},
|
|
|
|
|
SegmentUpdate::NewGeneration => {
|
|
|
|
|
*is_cancelled_generation = false;
|
|
|
|
|
Ok(false)
|
|
|
|
|
false
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn consider_merge_options(index: &mut Index, merge_policy: &MergePolicy) {
|
|
|
|
|
fn consider_merge_options(index: &Index, merge_policy: &MergePolicy) -> Vec<MergeCandidate> {
|
|
|
|
|
let segment_manager = get_segment_manager(index);
|
|
|
|
|
let (committed_segments, uncommitted_segments) = get_segment_ready_for_commit(&*segment_manager);
|
|
|
|
|
// committed segments cannot be merged with uncommitted_segments.
|
|
|
|
|
let merge_candidates_committed = merge_policy.compute_merge_candidates(&committed_segments);
|
|
|
|
|
let merge_candidates_uncommitted = merge_policy.compute_merge_candidates(&uncommitted_segments);
|
|
|
|
|
merge_candidates_committed.into_iter().chain(merge_candidates_uncommitted)
|
|
|
|
|
.map(|merge_candidate| {
|
|
|
|
|
println!("{:?}", merge_candidate);
|
|
|
|
|
});
|
|
|
|
|
let mut merge_candidates = merge_policy.compute_merge_candidates(&committed_segments);
|
|
|
|
|
merge_candidates.extend_from_slice(&merge_policy.compute_merge_candidates(&uncommitted_segments)[..]);
|
|
|
|
|
merge_candidates
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn on_segment_change(index: &mut Index,
|
|
|
|
|
merge_policy: &MergePolicy) -> Result<()> {
|
|
|
|
|
fn on_segment_change(index: &mut Index) -> Result<()> {
|
|
|
|
|
// saving the meta file.
|
|
|
|
|
try!(index.save_metas());
|
|
|
|
|
// update the searcher so that they eventually will
|
|
|
|
|
// use the new segments.
|
|
|
|
|
try!(index.load_searchers());
|
|
|
|
|
// consider merge options.
|
|
|
|
|
consider_merge_options(index, merge_policy);
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -175,26 +161,55 @@ fn on_segment_change(index: &mut Index,
|
|
|
|
|
// trivial.
|
|
|
|
|
fn process_segment_updates(mut index: Index,
|
|
|
|
|
segment_manager: &SegmentManager,
|
|
|
|
|
segment_update_receiver: SegmentUpdateReceiver) -> Result<()> {
|
|
|
|
|
segment_update_receiver: SegmentUpdateReceiver,
|
|
|
|
|
segment_update_sender: SegmentUpdateSender) {
|
|
|
|
|
let mut segment_update_it = segment_update_receiver.into_iter();
|
|
|
|
|
let mut is_cancelled_generation = false;
|
|
|
|
|
let merge_policy = index.get_merge_policy();
|
|
|
|
|
loop {
|
|
|
|
|
if let Some(segment_update) = segment_update_it.next() {
|
|
|
|
|
let has_changed = try!(
|
|
|
|
|
process_segment_update(
|
|
|
|
|
let has_changed = process_segment_update(
|
|
|
|
|
&index,
|
|
|
|
|
segment_manager,
|
|
|
|
|
segment_update,
|
|
|
|
|
&mut is_cancelled_generation)
|
|
|
|
|
);
|
|
|
|
|
&mut is_cancelled_generation);
|
|
|
|
|
if has_changed {
|
|
|
|
|
on_segment_change(&mut index, &*merge_policy);
|
|
|
|
|
on_segment_change(&mut index);
|
|
|
|
|
|
|
|
|
|
let segment_manager = get_segment_manager(&index);
|
|
|
|
|
|
|
|
|
|
for MergeCandidate(segment_ids) in consider_merge_options(&index, &*merge_policy) {
|
|
|
|
|
segment_manager.start_merge(&segment_ids);
|
|
|
|
|
let index_clone = index.clone();
|
|
|
|
|
let segment_update_sender_clone = segment_update_sender.clone();
|
|
|
|
|
thread::spawn(move || {
|
|
|
|
|
info!("Start merge: {:?}", segment_ids);
|
|
|
|
|
let schema = index_clone.schema();
|
|
|
|
|
let segments: Vec<Segment> = segment_ids
|
|
|
|
|
.iter()
|
|
|
|
|
.map(|&segment_id| index_clone.segment(segment_id))
|
|
|
|
|
.collect();
|
|
|
|
|
// An IndexMerger is like a "view" of our merged segments.
|
|
|
|
|
// TODO unwrap
|
|
|
|
|
let merger: IndexMerger = IndexMerger::open(schema, &segments[..]).unwrap();
|
|
|
|
|
let mut merged_segment = index_clone.new_segment();
|
|
|
|
|
// ... we just serialize this index merger in our new segment
|
|
|
|
|
// to merge the two segments.
|
|
|
|
|
let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment).unwrap();
|
|
|
|
|
let num_docs = merger.write(segment_serializer).unwrap();
|
|
|
|
|
let segment_meta = SegmentMeta {
|
|
|
|
|
segment_id: merged_segment.id(),
|
|
|
|
|
num_docs: num_docs,
|
|
|
|
|
};
|
|
|
|
|
let segment_update = SegmentUpdate::EndMerge(segment_ids, segment_meta);
|
|
|
|
|
segment_update_sender_clone.send(segment_update);
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
// somehow, the channel was dropped.
|
|
|
|
|
return Ok(());
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@@ -267,8 +282,9 @@ impl IndexWriter {
|
|
|
|
|
let segment_manager = get_segment_manager(index);
|
|
|
|
|
|
|
|
|
|
let index_clone = index.clone();
|
|
|
|
|
let segment_update_sender_clone = segment_update_sender.clone();
|
|
|
|
|
thread::spawn(move || {
|
|
|
|
|
process_segment_updates(index_clone, &*segment_manager, segment_update_receiver)
|
|
|
|
|
process_segment_updates(index_clone, &*segment_manager, segment_update_receiver, segment_update_sender_clone)
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
let mut index_writer = IndexWriter {
|
|
|
|
|
@@ -283,7 +299,6 @@ impl IndexWriter {
|
|
|
|
|
workers_join_handle: Vec::new(),
|
|
|
|
|
num_threads: num_threads,
|
|
|
|
|
|
|
|
|
|
merge_policy: index.get_merge_policy(),
|
|
|
|
|
docstamp: index.docstamp(),
|
|
|
|
|
};
|
|
|
|
|
try!(index_writer.start_workers());
|
|
|
|
|
@@ -383,6 +398,9 @@ impl IndexWriter {
|
|
|
|
|
|
|
|
|
|
let rollbacked_segments = get_segment_manager(&self.index).rollback();
|
|
|
|
|
for segment_id in rollbacked_segments {
|
|
|
|
|
|
|
|
|
|
// TODO all delete must happen after saving
|
|
|
|
|
// meta.json
|
|
|
|
|
self.index.delete_segment(segment_id);
|
|
|
|
|
}
|
|
|
|
|
try!(self.on_change());
|
|
|
|
|
|