Various fixes.

This commit is contained in:
Paul Masurel
2017-03-13 22:01:55 +09:00
parent 5932278e00
commit da10fe3b4d
5 changed files with 154 additions and 91 deletions

View File

@@ -504,6 +504,13 @@ impl IndexWriter {
opstamp
}
/// Returns the opstamp of the last successful commit.
///
/// This is, for instance, the opstamp the index will
/// rollback to if there is a failure like a power surge.
///
/// This is also the opstamp of the commit that is currently
/// available for searchers.
pub fn commit_opstamp(&self) -> u64 {
self.committed_opstamp
}

View File

@@ -77,6 +77,10 @@ impl SegmentEntry {
self.state = SegmentState::InMerge;
}
pub fn cancel_merge(&mut self,) {
self.state = SegmentState::Ready;
}
pub fn is_ready(&self,) -> bool {
self.state == SegmentState::Ready
}

View File

@@ -36,18 +36,6 @@ impl Debug for SegmentManager {
}
}
/// Returns the `SegmentMeta`s for (committed segment, uncommitted segments).
/// The result is consistent with other transactions.
///
/// For instance, a segment will not appear in both committed and uncommitted
/// segments
pub fn get_all_segments(segment_manager: &SegmentManager,) -> (Vec<SegmentMeta>, Vec<SegmentMeta>) {
let registers_lock = segment_manager.read();
(registers_lock.committed.get_all_segments(),
registers_lock.uncommitted.get_all_segments())
}
pub fn get_mergeable_segments(segment_manager: &SegmentManager,) -> (Vec<SegmentMeta>, Vec<SegmentMeta>) {
let registers_lock = segment_manager.read();
(registers_lock.committed.get_mergeable_segments(),
@@ -160,6 +148,40 @@ impl SegmentManager {
}
}
pub fn cancel_merge(&self,
before_merge_segment_ids: &[SegmentId],
after_merge_segment_id: SegmentId) {
let mut registers_lock = self.write();
// we mark all segments are ready for merge.
{
let target_segment_register: &mut SegmentRegister;
target_segment_register = {
if registers_lock.uncommitted.contains_all(&before_merge_segment_ids) {
&mut registers_lock.uncommitted
}
else if registers_lock.committed.contains_all(&before_merge_segment_ids) {
&mut registers_lock.committed
}
else {
warn!("couldn't find segment in SegmentManager");
return;
}
};
for segment_id in before_merge_segment_ids {
target_segment_register.cancel_merge(segment_id);
}
}
// ... and we make sure the target segment entry
// can be garbage collected.
registers_lock.writing.remove(&after_merge_segment_id);
}
pub fn write_segment(&self, segment_id: SegmentId) {
let mut registers_lock = self.write();
registers_lock.writing.insert(segment_id);
@@ -176,21 +198,27 @@ impl SegmentManager {
after_merge_segment_entry: SegmentEntry) {
let mut registers_lock = self.write();
registers_lock.writing.remove(&after_merge_segment_entry.segment_id());
if registers_lock.uncommitted.contains_all(&before_merge_segment_ids) {
for segment_id in before_merge_segment_ids {
registers_lock.uncommitted.remove_segment(segment_id);
let mut target_register: &mut SegmentRegister = {
if registers_lock.uncommitted.contains_all(&before_merge_segment_ids) {
&mut registers_lock.uncommitted
}
registers_lock.uncommitted.add_segment_entry(after_merge_segment_entry);
}
else if registers_lock.committed.contains_all(&before_merge_segment_ids) {
for segment_id in before_merge_segment_ids {
registers_lock.committed.remove_segment(segment_id);
else if registers_lock.committed.contains_all(&before_merge_segment_ids) {
&mut registers_lock.committed
} else {
warn!("couldn't find segment in SegmentManager");
return;
}
registers_lock.committed.add_segment_entry(after_merge_segment_entry);
} else {
warn!("couldn't find segment in SegmentManager");
};
for segment_id in before_merge_segment_ids {
target_register.remove_segment(segment_id);
}
target_register.add_segment_entry(after_merge_segment_entry);
}
pub fn committed_segment_metas(&self,) -> Vec<SegmentMeta> {

View File

@@ -68,13 +68,6 @@ impl SegmentRegister {
segment_ids
}
pub fn segment_ids(&self,) -> Vec<SegmentId> {
self.segment_metas()
.into_iter()
.map(|segment_meta| segment_meta.id())
.collect()
}
pub fn segment_entry(&self, segment_id: &SegmentId) -> Option<SegmentEntry> {
self.segment_states
.get(&segment_id)
@@ -96,6 +89,13 @@ impl SegmentRegister {
self.segment_states.remove(segment_id);
}
pub fn cancel_merge(&mut self, segment_id: &SegmentId) {
self.segment_states
.get_mut(segment_id)
.expect("Received a merge notification for a segment that is not registered")
.cancel_merge();
}
pub fn start_merge(&mut self, segment_id: &SegmentId) {
self.segment_states
.get_mut(segment_id)
@@ -124,6 +124,14 @@ mod tests {
use core::SegmentMeta;
use indexer::delete_queue::*;
use super::*;
fn segment_ids(segment_register: &SegmentRegister) -> Vec<SegmentId> {
segment_register
.segment_metas()
.into_iter()
.map(|segment_meta| segment_meta.id())
.collect()
}
#[test]
fn test_segment_register() {
@@ -140,7 +148,7 @@ mod tests {
segment_register.add_segment_entry(segment_entry);
}
assert_eq!(segment_register.segment_entry(&segment_id_a).unwrap().state(), SegmentState::Ready);
assert_eq!(segment_register.segment_ids(), vec!(segment_id_a));
assert_eq!(segment_ids(&segment_register), vec!(segment_id_a));
{
let segment_meta = SegmentMeta::new(segment_id_b);
let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor());
@@ -158,7 +166,7 @@ mod tests {
let segment_entry = SegmentEntry::new(segment_meta_merged, delete_queue.cursor());
segment_register.add_segment_entry(segment_entry);
}
assert_eq!(segment_register.segment_ids(), vec!(segment_id_merged));
assert_eq!(segment_ids(&segment_register), vec!(segment_id_merged));
}
}

View File

@@ -92,6 +92,59 @@ pub fn save_metas(segment_metas: Vec<SegmentMeta>,
pub struct SegmentUpdater(Arc<InnerSegmentUpdater>);
fn perform_merge(segment_ids: &[SegmentId],
segment_updater: &SegmentUpdater,
mut merged_segment: Segment,
target_opstamp: u64) -> Result<SegmentEntry> {
// first we need to apply deletes to our segment.
info!("Start merge: {:?}", segment_ids);
let ref index = segment_updater.0.index;
let schema = index.schema();
let mut segment_entries = vec!();
for segment_id in segment_ids {
if let Some(mut segment_entry) = segment_updater.0
.segment_manager
.segment_entry(segment_id) {
let segment = index.segment(segment_entry.meta().clone());
advance_deletes(segment, &mut segment_entry, target_opstamp)?;
segment_entries.push(segment_entry);
}
else {
error!("Error, had to abort merge as some of the segment is not managed anymore.a");
return Err(Error::InvalidArgument(format!("Segment {:?} requested for merge is not managed.", segment_id)));
}
}
let delete_cursor = segment_entries[0].delete_cursor().clone();
let segments: Vec<Segment> = segment_entries
.iter()
.map(|segment_entry| {
index.segment(segment_entry.meta().clone())
})
.collect();
// An IndexMerger is like a "view" of our merged segments.
let merger: IndexMerger = IndexMerger::open(schema, &segments[..])?;
// ... we just serialize this index merger in our new segment
// to merge the two segments.
let segment_serializer =
SegmentSerializer::for_segment(&mut merged_segment)
.expect("Creating index serializer failed");
let num_docs = merger.write(segment_serializer).expect("Serializing merged index failed");
let mut segment_meta = SegmentMeta::new(merged_segment.id());
segment_meta.set_max_doc(num_docs);
let after_merge_segment_entry = SegmentEntry::new(segment_meta.clone(), delete_cursor);
Ok(after_merge_segment_entry)
}
struct InnerSegmentUpdater {
pool: CpuPool,
index: Index,
@@ -145,8 +198,6 @@ impl SegmentUpdater {
self.0.merging_thread_id.fetch_add(1, Ordering::SeqCst)
}
/// TODO check that we use this correctly taking
/// the laziness in account.
fn run_async<T: 'static + Send, F: 'static + Send + FnOnce(SegmentUpdater) -> T>(&self, f: F) -> impl Future<Item=T, Error=Error> {
let me_clone = self.clone();
self.0.pool.spawn_fn(move || {
@@ -238,66 +289,26 @@ impl SegmentUpdater {
// first we need to apply deletes to our segment.
info!("Start merge: {:?}", segment_ids_vec);
let ref index = segment_updater_clone.0.index;
let schema = index.schema();
let merged_segment = segment_updater_clone.new_segment();
let merged_segment_id = merged_segment.id();
let merge_result = perform_merge(&segment_ids_vec, &segment_updater_clone, merged_segment, target_opstamp);
let mut segment_entries = vec!();
for segment_id in &segment_ids_vec {
if let Some(mut segment_entry) = segment_updater_clone.0
.segment_manager
.segment_entry(segment_id) {
let segment = index.segment(segment_entry.meta().clone());
// TODO unwrap
advance_deletes(segment, &mut segment_entry, target_opstamp).unwrap();
segment_entries.push(segment_entry);
match merge_result {
Ok(after_merge_segment_entry) => {
let merged_segment_meta = after_merge_segment_entry.meta().clone();
segment_updater_clone
.end_merge(segment_ids_vec, after_merge_segment_entry)
.wait()
.expect("Segment updater thread is corrupted.");
merging_future_send.complete(merged_segment_meta);
}
else {
error!("Error, had to abort merge as some of the segment is not managed anymore.a");
return Err(Error::InvalidArgument(format!("Segment {:?} requested for merge is not managed.", segment_id)));
Err(_) => {
// ... cancel merge
warn!("Merge of {:?} was cancelled", segment_ids_vec);
segment_updater_clone.cancel_merge(&segment_ids_vec, merged_segment_id);
// merging_future_send will be dropped, sending an error to the future.
}
}
let delete_cursor = segment_entries[0].delete_cursor().clone();
let segments: Vec<Segment> = segment_entries
.iter()
.map(|segment_entry| {
index.segment(segment_entry.meta().clone())
})
.collect();
// An IndexMerger is like a "view" of our merged segments.
let merger: IndexMerger = IndexMerger::open(schema, &segments[..])?;
let mut merged_segment = index.new_segment();
// ... we just serialize this index merger in our new segment
// to merge the two segments.
let segment_serializer =
SegmentSerializer
::for_segment(&mut merged_segment)
.expect("Creating index serializer failed");
let num_docs = merger.write(segment_serializer).expect("Serializing merged index failed");
let mut segment_meta = SegmentMeta::new(merged_segment.id());
segment_meta.set_max_doc(num_docs);
let before_merged_segment_ids = segment_entries
.iter()
.map(|segment_entry| segment_entry.segment_id())
.collect::<Vec<_>>();
let after_merge_segment_entry = SegmentEntry::new(segment_meta.clone(), delete_cursor);
segment_updater_clone
.end_merge(before_merged_segment_ids, after_merge_segment_entry)
.wait()
.unwrap();
merging_future_send.complete(segment_meta);
segment_updater_clone.0.merging_threads.write().unwrap().remove(&merging_thread_id);
Ok(())
});
@@ -319,6 +330,12 @@ impl SegmentUpdater {
}
}
fn cancel_merge(&self,
before_merge_segment_ids: &[SegmentId],
after_merge_segment_entry: SegmentId) {
self.0.segment_manager.cancel_merge(&before_merge_segment_ids, after_merge_segment_entry);
}
fn end_merge(&self,
before_merge_segment_ids: Vec<SegmentId>,
@@ -336,7 +353,6 @@ impl SegmentUpdater {
segment_updater.0.segment_manager.end_merge(&before_merge_segment_ids, after_merge_segment_entry);
segment_updater.save_metas(segment_updater.0.index.opstamp());
})
}
pub fn wait_merging_thread(&self) -> thread::Result<()> {