diff --git a/src/core/index.rs b/src/core/index.rs index 8b486e564..87590984d 100644 --- a/src/core/index.rs +++ b/src/core/index.rs @@ -537,4 +537,35 @@ mod tests { } assert_eq!(count, 2); } + + #[test] + fn garbage_collect_works_as_intended() { + let directory = RAMDirectory::create(); + let schema = throw_away_schema(); + let field = schema.get_field("num_likes").unwrap(); + let index = Index::create(directory.clone(), schema).unwrap(); + + let mut writer = index.writer_with_num_threads(8, 24_000_000).unwrap(); + for i in 0u64..8_000u64 { + writer.add_document(doc!(field => i)); + } + writer.commit().unwrap(); + let mem_right_after_commit = directory.total_mem_usage(); + thread::sleep(Duration::from_millis(1_000)); + let reader = index + .reader_builder() + .reload_policy(ReloadPolicy::Manual) + .try_into() + .unwrap(); + + assert_eq!(reader.searcher().num_docs(), 8_000); + writer.wait_merging_threads().unwrap(); + let mem_right_after_merge_finished = directory.total_mem_usage(); + + reader.reload().unwrap(); + let searcher = reader.searcher(); + assert_eq!(searcher.num_docs(), 8_000); + assert!(mem_right_after_merge_finished < mem_right_after_commit); + } + } diff --git a/src/directory/ram_directory.rs b/src/directory/ram_directory.rs index 804763e3c..38fc35cc4 100644 --- a/src/directory/ram_directory.rs +++ b/src/directory/ram_directory.rs @@ -103,6 +103,10 @@ impl InnerDirectory { fn watch(&mut self, watch_handle: WatchCallback) -> WatchHandle { self.watch_router.subscribe(watch_handle) } + + fn total_mem_usage(&self) -> usize { + self.fs.values().map(|f| f.len()).sum() + } } impl fmt::Debug for RAMDirectory { @@ -126,6 +130,12 @@ impl RAMDirectory { pub fn create() -> RAMDirectory { Self::default() } + + /// Returns the sum of the sizes of the different files + /// in the RAMDirectory. 
+ pub fn total_mem_usage(&self) -> usize { + self.fs.read().unwrap().total_mem_usage() + } } impl Directory for RAMDirectory { diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 17b807d9a..bd3d829bf 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -332,7 +332,8 @@ fn index_documents( } impl IndexWriter { - /// The index writer + /// If there are some merging threads, blocks until they all finish their work and + /// then drops the `IndexWriter`. pub fn wait_merging_threads(mut self) -> Result<()> { // this will stop the indexing thread, // dropping the last reference to the segment_updater. @@ -1205,7 +1206,7 @@ mod tests { assert!(clear_tstamp < commit_tstamp); // rollback - let rollback_tstamp = index_writer.rollback().unwrap(); + let _rollback_tstamp = index_writer.rollback().unwrap(); // Find original docs in the index let term_a = Term::from_field_text(text_field, "a"); // expect the document with that term to be in the index diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index a2e882276..b56842a1d 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -70,6 +70,7 @@ pub fn save_new_metas(schema: Schema, directory: &mut Directory) -> Result<()> { /// /// This method is not part of tantivy's public API fn save_metas(metas: &IndexMeta, directory: &mut Directory) -> Result<()> { + info!("save metas"); let mut buffer = serde_json::to_vec_pretty(metas)?; // Just adding a new line at the end of the buffer. 
writeln!(&mut buffer)?; @@ -451,38 +452,41 @@ impl SegmentUpdater { ) -> Result<()> { self.run_async(move |segment_updater| { info!("End merge {:?}", after_merge_segment_entry.meta()); - let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone(); - if let Some(delete_operation) = delete_cursor.get() { - let committed_opstamp = segment_updater.load_metas().opstamp; - if delete_operation.opstamp < committed_opstamp { - let index = &segment_updater.0.index; - let segment = index.segment(after_merge_segment_entry.meta().clone()); - if let Err(e) = - advance_deletes(segment, &mut after_merge_segment_entry, committed_opstamp) - { - error!( - "Merge of {:?} was cancelled (advancing deletes failed): {:?}", - merge_operation.segment_ids(), - e - ); - if cfg!(test) { - panic!("Merge failed."); + { + let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone(); + if let Some(delete_operation) = delete_cursor.get() { + let committed_opstamp = segment_updater.load_metas().opstamp; + if delete_operation.opstamp < committed_opstamp { + let index = &segment_updater.0.index; + let segment = index.segment(after_merge_segment_entry.meta().clone()); + if let Err(e) = advance_deletes( + segment, + &mut after_merge_segment_entry, + committed_opstamp, + ) { + error!( + "Merge of {:?} was cancelled (advancing deletes failed): {:?}", + merge_operation.segment_ids(), + e + ); + if cfg!(test) { + panic!("Merge failed."); + } + // ... cancel merge + // `merge_operations` are tracked. As it is dropped, the + // the segment_ids will be available again for merge. + return; } - // ... cancel merge - // `merge_operations` are tracked. As it is dropped, the - // the segment_ids will be available again for merge. 
- return; } } - } - segment_updater - .0 - .segment_manager - .end_merge(merge_operation.segment_ids(), after_merge_segment_entry); - segment_updater.consider_merge_options(); - info!("save metas"); - let previous_metas = segment_updater.load_metas(); - segment_updater.save_metas(previous_metas.opstamp, previous_metas.payload.clone()); + let previous_metas = segment_updater.load_metas(); + segment_updater + .0 + .segment_manager + .end_merge(merge_operation.segment_ids(), after_merge_segment_entry); + segment_updater.consider_merge_options(); + segment_updater.save_metas(previous_metas.opstamp, previous_metas.payload.clone()); + } // we drop all possible handle to a now useless `SegmentMeta`. segment_updater.garbage_collect_files_exec(); }) .wait()