From e2da92fcb588b465c337ab0f465741a8696c0756 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Wed, 12 Jun 2019 09:40:03 +0900 Subject: [PATCH] Petr tik n510 clear index (#566) * Enables clearing the index Closes #510 * Adds an examples to clear and rebuild index * Addressing code review Moved the example from examples/ to docstring above `clear` * Corrected minor typos and missed/duplicate words * Added stamper.revert method to be used for rollback Added type alias for Opstamp Moved to AtomicU64 on stable rust (since 1.34) * Change the method name and doc-string * Remove rollback from delete_all_documents test_add_then_delete_all_documents fails with --test-threads 2 * Passes all the tests with any number of test-threads (ran locally 5 times) * Addressed code review Deleted comments with debug info changed ReloadPolicy to Manual * Removing useless garbage_collect call and updated CHANGELOG --- CHANGELOG.md | 2 + src/core/segment_reader.rs | 2 +- src/directory/error.rs | 2 +- src/indexer/index_writer.rs | 197 ++++++++++++++++++++++++++++++++- src/indexer/segment_manager.rs | 6 + src/indexer/segment_updater.rs | 38 ++++++- src/indexer/segment_writer.rs | 2 +- src/indexer/stamper.rs | 19 ++++ src/store/writer.rs | 2 +- 9 files changed, 259 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f32a4d56c..c0b43241b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,8 @@ Tantivy 0.10.0 - Added an ASCII folding filter (@drusellers) - Bugfix in `query.count` in presence of deletes (@pmasurel) - Added `.explain(...)` in `Query` and `Weight` to (@pmasurel) +- Added an efficient way to `delete_all_documents` in `IndexWriter` (@petr-tik). + All segments are simply removed. Minor --------- diff --git a/src/core/segment_reader.rs b/src/core/segment_reader.rs index 2dbb5ad28..42c1ded64 100644 --- a/src/core/segment_reader.rs +++ b/src/core/segment_reader.rs @@ -246,7 +246,7 @@ impl SegmentReader { let termdict_source = self .termdict_composite .open_read(field) - .expect("Failed to open field term dictionary in composite file. Is the field indexed"); + .expect("Failed to open field term dictionary in composite file. Is the field indexed?"); let positions_source = self .positions_composite diff --git a/src/directory/error.rs b/src/directory/error.rs index e56971029..a57ae1371 100644 --- a/src/directory/error.rs +++ b/src/directory/error.rs @@ -6,7 +6,7 @@ use std::path::PathBuf; /// Error while trying to acquire a directory lock. #[derive(Debug, Fail)] pub enum LockError { - /// Failed to acquired a lock as it is already hold by another + /// Failed to acquired a lock as it is already held by another /// client. /// - In the context of a blocking lock, this means the lock was not released within some `timeout` period. /// - In the context of a non-blocking lock, this means the lock was busy at the moment of the call. diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index d5bb06479..17b807d9a 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -383,7 +383,6 @@ impl IndexWriter { /// Spawns a new worker thread for indexing. /// The thread consumes documents from the pipeline. - /// fn add_indexing_worker(&mut self) -> Result<()> { let document_receiver_clone = self.operation_receiver.clone(); let mut segment_updater = self.segment_updater.clone(); @@ -462,6 +461,52 @@ impl IndexWriter { self.segment_updater.garbage_collect_files() } + /// Deletes all documents from the index + /// + /// Requires `commit`ing + /// Enables users to rebuild the index, + /// by clearing and resubmitting necessary documents + /// + /// ```rust + /// #[macro_use] + /// extern crate tantivy; + /// use tantivy::query::QueryParser; + /// use tantivy::collector::TopDocs; + /// use tantivy::schema::*; + /// use tantivy::Index; + /// + /// fn main() -> tantivy::Result<()> { + /// let mut schema_builder = Schema::builder(); + /// let title = schema_builder.add_text_field("title", TEXT | STORED); + /// let schema = schema_builder.build(); + /// + /// let index = Index::create_in_ram(schema.clone()); + /// + /// let mut index_writer = index.writer_with_num_threads(1, 50_000_000)?; + /// index_writer.add_document(doc!(title => "The modern Promotheus")); + /// index_writer.commit()?; + /// + /// let clear_res = index_writer.delete_all_documents().unwrap(); + /// // have to commit, otherwise deleted terms remain available + /// index_writer.commit()?; + /// + /// let searcher = index.reader()?.searcher(); + /// let query_parser = QueryParser::for_index(&index, vec![title]); + /// let query_promo = query_parser.parse_query("Promotheus")?; + /// let top_docs_promo = searcher.search(&query_promo, &TopDocs::with_limit(1))?; + /// + /// assert!(top_docs_promo.is_empty()); + /// Ok(()) + /// } + /// ``` + pub fn delete_all_documents(&mut self) -> Result { + // Delete segments + self.segment_updater.remove_all_segments(); + // Return new stamp - reverted stamp + self.stamper.revert(self.committed_opstamp); + Ok(self.committed_opstamp) + } + /// Merges a given list of segments /// /// `segment_ids` is required to be non-empty. @@ -489,19 +534,22 @@ impl IndexWriter { /// Rollback to the last commit /// - /// This cancels all of the update that - /// happened before after the last commit. + /// This cancels all of the updates that + /// happened after the last commit. /// After calling rollback, the index is in the same /// state as it was after the last commit. /// /// The opstamp at the last commit is returned. pub fn rollback(&mut self) -> Result { info!("Rolling back to opstamp {}", self.committed_opstamp); + self.rollback_impl() + } + /// Private, implementation of rollback + fn rollback_impl(&mut self) -> Result { // marks the segment updater as killed. From now on, all // segment updates will be ignored. self.segment_updater.kill(); - let document_receiver = self.operation_receiver.clone(); // take the directory lock to create a new index_writer. @@ -1049,4 +1097,145 @@ mod tests { assert_eq!(num_docs_containing("b"), 0); fail::cfg("RAMDirectory::atomic_write", "off").unwrap(); } + + #[test] + fn test_add_then_delete_all_documents() { + let mut schema_builder = schema::Schema::builder(); + let text_field = schema_builder.add_text_field("text", schema::TEXT); + let index = Index::create_in_ram(schema_builder.build()); + let reader = index + .reader_builder() + .reload_policy(ReloadPolicy::Manual) + .try_into() + .unwrap(); + let num_docs_containing = |s: &str| { + reader.reload().unwrap(); + let searcher = reader.searcher(); + let term = Term::from_field_text(text_field, s); + searcher.doc_freq(&term) + }; + let mut index_writer = index.writer_with_num_threads(4, 12_000_000).unwrap(); + + let add_tstamp = index_writer.add_document(doc!(text_field => "a")); + let commit_tstamp = index_writer.commit().unwrap(); + assert!(commit_tstamp > add_tstamp); + index_writer.delete_all_documents().unwrap(); + index_writer.commit().unwrap(); + + // Search for documents with the same term that we added + assert_eq!(num_docs_containing("a"), 0); + } + + #[test] + fn test_delete_all_documents_rollback_correct_stamp() { + let mut schema_builder = schema::Schema::builder(); + let text_field = schema_builder.add_text_field("text", schema::TEXT); + let index = Index::create_in_ram(schema_builder.build()); + let mut index_writer = index.writer_with_num_threads(4, 12_000_000).unwrap(); + + let add_tstamp = index_writer.add_document(doc!(text_field => "a")); + + // commit documents - they are now available + let first_commit = index_writer.commit(); + assert!(first_commit.is_ok()); + let first_commit_tstamp = first_commit.unwrap(); + assert!(first_commit_tstamp > add_tstamp); + + // delete_all_documents the index + let clear_tstamp = index_writer.delete_all_documents().unwrap(); + assert_eq!(clear_tstamp, add_tstamp); + + // commit the clear command - now documents aren't available + let second_commit = index_writer.commit(); + assert!(second_commit.is_ok()); + let second_commit_tstamp = second_commit.unwrap(); + + // add new documents again + for _ in 0..100 { + index_writer.add_document(doc!(text_field => "b")); + } + + // rollback to last commit, when index was empty + let rollback = index_writer.rollback(); + assert!(rollback.is_ok()); + let rollback_tstamp = rollback.unwrap(); + assert_eq!(rollback_tstamp, second_commit_tstamp); + + // working with an empty index == no documents + let term_b = Term::from_field_text(text_field, "b"); + assert_eq!(index.reader().unwrap().searcher().doc_freq(&term_b), 0); + } + + #[test] + fn test_delete_all_documents_then_add() { + let mut schema_builder = schema::Schema::builder(); + let text_field = schema_builder.add_text_field("text", schema::TEXT); + let index = Index::create_in_ram(schema_builder.build()); + // writing the segment + let mut index_writer = index.writer_with_num_threads(4, 12_000_000).unwrap(); + let res = index_writer.delete_all_documents(); + assert!(res.is_ok()); + + assert!(index_writer.commit().is_ok()); + // add one simple doc + index_writer.add_document(doc!(text_field => "a")); + assert!(index_writer.commit().is_ok()); + + let term_a = Term::from_field_text(text_field, "a"); + // expect the document with that term to be in the index + assert_eq!(index.reader().unwrap().searcher().doc_freq(&term_a), 1); + } + + #[test] + fn test_delete_all_documents_and_rollback() { + let mut schema_builder = schema::Schema::builder(); + let text_field = schema_builder.add_text_field("text", schema::TEXT); + let index = Index::create_in_ram(schema_builder.build()); + let mut index_writer = index.writer_with_num_threads(4, 12_000_000).unwrap(); + + // add one simple doc + index_writer.add_document(doc!(text_field => "a")); + let comm = index_writer.commit(); + assert!(comm.is_ok()); + let commit_tstamp = comm.unwrap(); + + // clear but don't commit! + let clear_tstamp = index_writer.delete_all_documents().unwrap(); + // clear_tstamp should reset to before the last commit + assert!(clear_tstamp < commit_tstamp); + + // rollback + let rollback_tstamp = index_writer.rollback().unwrap(); + // Find original docs in the index + let term_a = Term::from_field_text(text_field, "a"); + // expect the document with that term to be in the index + assert_eq!(index.reader().unwrap().searcher().doc_freq(&term_a), 1); + } + + #[test] + fn test_delete_all_documents_empty_index() { + let schema_builder = schema::Schema::builder(); + let index = Index::create_in_ram(schema_builder.build()); + let mut index_writer = index.writer_with_num_threads(4, 12_000_000).unwrap(); + let clear = index_writer.delete_all_documents(); + let commit = index_writer.commit(); + assert!(clear.is_ok()); + assert!(commit.is_ok()); + } + + #[test] + fn test_delete_all_documents_index_twice() { + let schema_builder = schema::Schema::builder(); + let index = Index::create_in_ram(schema_builder.build()); + let mut index_writer = index.writer_with_num_threads(4, 12_000_000).unwrap(); + let clear = index_writer.delete_all_documents(); + let commit = index_writer.commit(); + assert!(clear.is_ok()); + assert!(commit.is_ok()); + let clear_again = index_writer.delete_all_documents(); + let commit_again = index_writer.commit(); + assert!(clear_again.is_ok()); + assert!(commit_again.is_ok()); + } + } diff --git a/src/indexer/segment_manager.rs b/src/indexer/segment_manager.rs index 4e3a7e7e4..0087ac12c 100644 --- a/src/indexer/segment_manager.rs +++ b/src/indexer/segment_manager.rs @@ -118,6 +118,12 @@ impl SegmentManager { }); } + pub(crate) fn remove_all_segments(&self) { + let mut registers_lock = self.write(); + registers_lock.committed.clear(); + registers_lock.uncommitted.clear(); + } + pub fn commit(&self, segment_entries: Vec) { let mut registers_lock = self.write(); registers_lock.committed.clear(); diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index 4dd7a6804..a2e882276 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -42,9 +42,9 @@ use Result; /// Save the index meta file. /// This operation is atomic : /// Either -// - it fails, in which case an error is returned, +/// - it fails, in which case an error is returned, /// and the `meta.json` remains untouched, -/// - it success, and `meta.json` is written +/// - it succeeds, and `meta.json` is written /// and flushed. /// /// This method is not part of tantivy's public API @@ -213,6 +213,11 @@ impl SegmentUpdater { } } + /// Orders `SegmentManager` to remove all segments + pub(crate) fn remove_all_segments(&self) { + self.0.segment_manager.remove_all_segments(); + } + pub fn kill(&mut self) { self.0.killed.store(true, Ordering::Release); } @@ -223,7 +228,7 @@ impl SegmentUpdater { /// Apply deletes up to the target opstamp to all segments. /// - /// Tne method returns copies of the segment entries, + /// The method returns copies of the segment entries, /// updated with the delete information. fn purge_deletes(&self, target_opstamp: Opstamp) -> Result> { let mut segment_entries = self.0.segment_manager.segment_entries(); @@ -651,4 +656,31 @@ mod tests { assert!(index.searchable_segment_metas().unwrap().is_empty()); assert!(reader.searcher().segment_readers().is_empty()); } + + #[test] + fn test_remove_all_segments() { + let mut schema_builder = Schema::builder(); + let text_field = schema_builder.add_text_field("text", TEXT); + let schema = schema_builder.build(); + + let index = Index::create_in_ram(schema); + + // writing the segment + let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap(); + + { + for _ in 0..100 { + index_writer.add_document(doc!(text_field=>"a")); + index_writer.add_document(doc!(text_field=>"b")); + } + assert!(index_writer.commit().is_ok()); + } + index_writer.segment_updater().remove_all_segments(); + let seg_vec = index_writer + .segment_updater() + .0 + .segment_manager + .segment_entries(); + assert!(seg_vec.is_empty()); + } } diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs index a7da06fd1..0af1a74fd 100644 --- a/src/indexer/segment_writer.rs +++ b/src/indexer/segment_writer.rs @@ -20,7 +20,7 @@ use Opstamp; use Result; /// A `SegmentWriter` is in charge of creating segment index from a -/// documents. +/// set of documents. /// /// They creates the postings list in anonymous memory. /// The segment is layed on disk when the segment gets `finalized`. diff --git a/src/indexer/stamper.rs b/src/indexer/stamper.rs index 631cbb527..9385c098c 100644 --- a/src/indexer/stamper.rs +++ b/src/indexer/stamper.rs @@ -28,6 +28,12 @@ impl Stamper { end: start + n, } } + + /// Reverts the stamper to a given `Opstamp` value and returns it + pub fn revert(&self, to_opstamp: Opstamp) -> Opstamp { + self.0.store(to_opstamp, Ordering::SeqCst); + to_opstamp + } } #[cfg(test)] @@ -50,4 +56,17 @@ mod test { assert_eq!(stamper.stamp(), 15u64); } + #[test] + fn test_stamper_revert() { + let stamper = Stamper::new(7u64); + assert_eq!(stamper.stamp(), 7u64); + assert_eq!(stamper.stamp(), 8u64); + + let stamper_clone = stamper.clone(); + assert_eq!(stamper_clone.stamp(), 9u64); + + stamper.revert(6); + assert_eq!(stamper.stamp(), 6); + assert_eq!(stamper_clone.stamp(), 7); + } } diff --git a/src/store/writer.rs b/src/store/writer.rs index 3fbdee074..4693b8900 100644 --- a/src/store/writer.rs +++ b/src/store/writer.rs @@ -16,7 +16,7 @@ const BLOCK_SIZE: usize = 16_384; /// the store is written to disc as document as being added, /// as opposed to when the segment is getting finalized. /// -/// The skip list index on the other hand, is build in memory. +/// The skip list index on the other hand, is built in memory. /// pub struct StoreWriter { doc: DocId,