mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-01-11 03:22:55 +00:00
NOBUG Garbage collection after end merge.
This commit is contained in:
@@ -144,8 +144,7 @@ impl InnerHeap {
|
||||
addr
|
||||
} else {
|
||||
if self.next_heap.is_none() {
|
||||
info!(r#"Exceeded heap size.
|
||||
The segment will be committed right after indexing this document."#,);
|
||||
info!(r#"Exceeded heap size. The segment will be committed right after indexing this document."#,);
|
||||
self.next_heap = Some(Box::new(InnerHeap::with_capacity(self.buffer_len as usize)));
|
||||
}
|
||||
self.next_heap.as_mut().unwrap().allocate_space(num_bytes) + self.buffer_len
|
||||
|
||||
@@ -117,6 +117,7 @@ impl ManagedDirectory {
|
||||
/// an error is simply logged, and the file remains in the list of managed
|
||||
/// files.
|
||||
pub fn garbage_collect(&mut self, living_files: HashSet<PathBuf>) {
|
||||
info!("Garbage collect");
|
||||
let mut files_to_delete = vec![];
|
||||
{
|
||||
// releasing the lock as .delete() will use it too.
|
||||
|
||||
@@ -258,11 +258,14 @@ impl SegmentUpdater {
|
||||
}
|
||||
|
||||
pub fn garbage_collect_files(&self) -> Result<()> {
|
||||
self.run_async(move |segment_updater| { segment_updater.garbage_collect_files_exec(); })
|
||||
.wait()
|
||||
self.run_async(move |segment_updater| {
|
||||
segment_updater.garbage_collect_files_exec();
|
||||
})
|
||||
.wait()
|
||||
}
|
||||
|
||||
fn garbage_collect_files_exec(&self) {
|
||||
info!("Running garbage collection");
|
||||
let living_files = self.0.segment_manager.list_files();
|
||||
let mut index = self.0.index.clone();
|
||||
index.directory_mut().garbage_collect(living_files);
|
||||
@@ -270,14 +273,14 @@ impl SegmentUpdater {
|
||||
|
||||
pub fn commit(&self, opstamp: u64) -> Result<()> {
|
||||
self.run_async(move |segment_updater| if segment_updater.is_alive() {
|
||||
let segment_entries = segment_updater
|
||||
.purge_deletes(opstamp)
|
||||
.expect("Failed purge deletes");
|
||||
segment_updater.0.segment_manager.commit(segment_entries);
|
||||
segment_updater.save_metas(opstamp);
|
||||
segment_updater.garbage_collect_files_exec();
|
||||
segment_updater.consider_merge_options();
|
||||
})
|
||||
let segment_entries = segment_updater
|
||||
.purge_deletes(opstamp)
|
||||
.expect("Failed purge deletes");
|
||||
segment_updater.0.segment_manager.commit(segment_entries);
|
||||
segment_updater.save_metas(opstamp);
|
||||
segment_updater.garbage_collect_files_exec();
|
||||
segment_updater.consider_merge_options();
|
||||
})
|
||||
.wait()
|
||||
}
|
||||
|
||||
@@ -379,7 +382,7 @@ impl SegmentUpdater {
|
||||
-> Result<()> {
|
||||
|
||||
self.run_async(move |segment_updater| {
|
||||
debug!("End merge {:?}", after_merge_segment_entry.meta());
|
||||
info!("End merge {:?}", after_merge_segment_entry.meta());
|
||||
let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone();
|
||||
let mut _file_protection_opt = None;
|
||||
if let Some(delete_operation) = delete_cursor.get() {
|
||||
@@ -410,7 +413,9 @@ impl SegmentUpdater {
|
||||
segment_updater.0.segment_manager.end_merge(&before_merge_segment_ids,
|
||||
after_merge_segment_entry);
|
||||
segment_updater.consider_merge_options();
|
||||
info!("save metas");
|
||||
segment_updater.save_metas(segment_updater.0.index.opstamp());
|
||||
segment_updater.garbage_collect_files_exec();
|
||||
}).wait()
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user