The different handles to `SegmentMeta` are closed before calling gc on
end_merge.
This commit is contained in:
Paul Masurel
2019-06-16 14:12:13 +09:00
committed by GitHub
parent 2a88094ec4
commit c23a7c992b
4 changed files with 77 additions and 31 deletions

View File

@@ -537,4 +537,35 @@ mod tests {
}
assert_eq!(count, 2);
}
#[test]
fn garbage_collect_works_as_intended() {
let directory = RAMDirectory::create();
let schema = throw_away_schema();
let field = schema.get_field("num_likes").unwrap();
let index = Index::create(directory.clone(), schema).unwrap();
let mut writer = index.writer_with_num_threads(8, 24_000_000).unwrap();
for i in 0u64..8_000u64 {
writer.add_document(doc!(field => i));
}
writer.commit().unwrap();
let mem_right_after_commit = directory.total_mem_usage();
thread::sleep(Duration::from_millis(1_000));
let reader = index
.reader_builder()
.reload_policy(ReloadPolicy::Manual)
.try_into()
.unwrap();
assert_eq!(reader.searcher().num_docs(), 8_000);
writer.wait_merging_threads().unwrap();
let mem_right_after_merge_finished = directory.total_mem_usage();
reader.reload().unwrap();
let searcher = reader.searcher();
assert_eq!(searcher.num_docs(), 8_000);
assert!(mem_right_after_merge_finished < mem_right_after_commit);
}
}

View File

@@ -103,6 +103,10 @@ impl InnerDirectory {
fn watch(&mut self, watch_handle: WatchCallback) -> WatchHandle {
self.watch_router.subscribe(watch_handle)
}
fn total_mem_usage(&self) -> usize {
self.fs.values().map(|f| f.len()).sum()
}
}
impl fmt::Debug for RAMDirectory {
@@ -126,6 +130,12 @@ impl RAMDirectory {
pub fn create() -> RAMDirectory {
Self::default()
}
/// Returns the sum of the size of the different files
/// in the RAMDirectory.
pub fn total_mem_usage(&self) -> usize {
self.fs.read().unwrap().total_mem_usage()
}
}
impl Directory for RAMDirectory {

View File

@@ -332,7 +332,8 @@ fn index_documents(
}
impl IndexWriter {
/// The index writer
/// If there are some merging threads, blocks until they all finish their work and
/// then drop the `IndexWriter`.
pub fn wait_merging_threads(mut self) -> Result<()> {
// this will stop the indexing thread,
// dropping the last reference to the segment_updater.
@@ -1205,7 +1206,7 @@ mod tests {
assert!(clear_tstamp < commit_tstamp);
// rollback
let rollback_tstamp = index_writer.rollback().unwrap();
let _rollback_tstamp = index_writer.rollback().unwrap();
// Find original docs in the index
let term_a = Term::from_field_text(text_field, "a");
// expect the document with that term to be in the index

View File

@@ -70,6 +70,7 @@ pub fn save_new_metas(schema: Schema, directory: &mut Directory) -> Result<()> {
///
/// This method is not part of tantivy's public API
fn save_metas(metas: &IndexMeta, directory: &mut Directory) -> Result<()> {
info!("save metas");
let mut buffer = serde_json::to_vec_pretty(metas)?;
// Just adding a new line at the end of the buffer.
writeln!(&mut buffer)?;
@@ -451,38 +452,41 @@ impl SegmentUpdater {
) -> Result<()> {
self.run_async(move |segment_updater| {
info!("End merge {:?}", after_merge_segment_entry.meta());
let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone();
if let Some(delete_operation) = delete_cursor.get() {
let committed_opstamp = segment_updater.load_metas().opstamp;
if delete_operation.opstamp < committed_opstamp {
let index = &segment_updater.0.index;
let segment = index.segment(after_merge_segment_entry.meta().clone());
if let Err(e) =
advance_deletes(segment, &mut after_merge_segment_entry, committed_opstamp)
{
error!(
"Merge of {:?} was cancelled (advancing deletes failed): {:?}",
merge_operation.segment_ids(),
e
);
if cfg!(test) {
panic!("Merge failed.");
{
let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone();
if let Some(delete_operation) = delete_cursor.get() {
let committed_opstamp = segment_updater.load_metas().opstamp;
if delete_operation.opstamp < committed_opstamp {
let index = &segment_updater.0.index;
let segment = index.segment(after_merge_segment_entry.meta().clone());
if let Err(e) = advance_deletes(
segment,
&mut after_merge_segment_entry,
committed_opstamp,
) {
error!(
"Merge of {:?} was cancelled (advancing deletes failed): {:?}",
merge_operation.segment_ids(),
e
);
if cfg!(test) {
panic!("Merge failed.");
}
// ... cancel merge
// `merge_operations` are tracked. As it is dropped, the
// the segment_ids will be available again for merge.
return;
}
// ... cancel merge
// `merge_operations` are tracked. As it is dropped, the
// the segment_ids will be available again for merge.
return;
}
}
}
segment_updater
.0
.segment_manager
.end_merge(merge_operation.segment_ids(), after_merge_segment_entry);
segment_updater.consider_merge_options();
info!("save metas");
let previous_metas = segment_updater.load_metas();
segment_updater.save_metas(previous_metas.opstamp, previous_metas.payload.clone());
let previous_metas = segment_updater.load_metas();
segment_updater
.0
.segment_manager
.end_merge(merge_operation.segment_ids(), after_merge_segment_entry);
segment_updater.consider_merge_options();
segment_updater.save_metas(previous_metas.opstamp, previous_metas.payload.clone());
} // we drop all possible handle to a now useless `SegmentMeta`.
segment_updater.garbage_collect_files_exec();
})
.wait()