diff --git a/src/indexer/delete_queue.rs b/src/indexer/delete_queue.rs index 3aa9f0d85..461db0ba8 100644 --- a/src/indexer/delete_queue.rs +++ b/src/indexer/delete_queue.rs @@ -189,7 +189,7 @@ impl DeleteCursor { fn is_behind_opstamp(&mut self, target_opstamp: Opstamp) -> bool { self.get() - .map(|operation| operation.opstamp < target_opstamp) + .map(|operation| operation.opstamp() < target_opstamp) .unwrap_or(false) } @@ -263,7 +263,7 @@ mod tests { fn test_deletequeue() { let delete_queue = DeleteQueue::new(); - let make_op = |i: usize| DeleteOperation { + let make_op = |i: usize| DeleteOperation::ByWeight { opstamp: i as u64, target: Box::new(DummyWeight), }; @@ -274,9 +274,9 @@ mod tests { let snapshot = delete_queue.cursor(); { let mut operations_it = snapshot.clone(); - assert_eq!(operations_it.get().unwrap().opstamp, 1); + assert_eq!(operations_it.get().unwrap().opstamp(), 1); operations_it.advance(); - assert_eq!(operations_it.get().unwrap().opstamp, 2); + assert_eq!(operations_it.get().unwrap().opstamp(), 2); operations_it.advance(); assert!(operations_it.get().is_none()); operations_it.advance(); @@ -284,20 +284,20 @@ mod tests { let mut snapshot2 = delete_queue.cursor(); assert!(snapshot2.get().is_none()); delete_queue.push(make_op(3)); - assert_eq!(snapshot2.get().unwrap().opstamp, 3); - assert_eq!(operations_it.get().unwrap().opstamp, 3); - assert_eq!(operations_it.get().unwrap().opstamp, 3); + assert_eq!(snapshot2.get().unwrap().opstamp(), 3); + assert_eq!(operations_it.get().unwrap().opstamp(), 3); + assert_eq!(operations_it.get().unwrap().opstamp(), 3); operations_it.advance(); assert!(operations_it.get().is_none()); operations_it.advance(); } { let mut operations_it = snapshot; - assert_eq!(operations_it.get().unwrap().opstamp, 1); + assert_eq!(operations_it.get().unwrap().opstamp(), 1); operations_it.advance(); - assert_eq!(operations_it.get().unwrap().opstamp, 2); + assert_eq!(operations_it.get().unwrap().opstamp(), 2); operations_it.advance(); - 
assert_eq!(operations_it.get().unwrap().opstamp, 3); + assert_eq!(operations_it.get().unwrap().opstamp(), 3); operations_it.advance(); assert!(operations_it.get().is_none()); } diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index af5626f9b..76f07de27 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -22,7 +22,7 @@ use crate::indexer::{MergePolicy, SegmentEntry, SegmentWriter}; use crate::query::{EnableScoring, Query, TermQuery}; use crate::schema::document::Document; use crate::schema::{IndexRecordOption, TantivyDocument, Term}; -use crate::{FutureResult, Opstamp}; +use crate::{DocId, FutureResult, Opstamp}; // Size of the margin for the `memory_arena`. A segment is closed when the remaining memory // in the `memory_arena` goes below MARGIN_IN_BYTES. @@ -101,24 +101,41 @@ fn compute_deleted_bitset( ) -> crate::Result { let mut might_have_changed = false; while let Some(delete_op) = delete_cursor.get() { - if delete_op.opstamp > target_opstamp { + if delete_op.opstamp() > target_opstamp { break; } - // A delete operation should only affect - // document that were inserted before it. - delete_op - .target - .for_each_no_score(segment_reader, &mut |docs_matching_delete_query| { - for doc_matching_delete_query in docs_matching_delete_query.iter().cloned() { - if doc_opstamps.is_deleted(doc_matching_delete_query, delete_op.opstamp) { - alive_bitset.remove(doc_matching_delete_query); + match delete_op { + DeleteOperation::ByWeight { opstamp, target } => { + // A delete operation should only affect + // document that were inserted before it. 
+ target.for_each_no_score(segment_reader, &mut |docs_matching_delete_query| { + for doc_matching_delete_query in docs_matching_delete_query.iter().cloned() { + if doc_opstamps.is_deleted(doc_matching_delete_query, *opstamp) { + alive_bitset.remove(doc_matching_delete_query); + might_have_changed = true; + } + } + })?; + } + + DeleteOperation::ByAddress { + opstamp, + segment_id, + doc_id, + } => { + if *segment_id == segment_reader.segment_id() { + if doc_opstamps.is_deleted(*doc_id, *opstamp) { + alive_bitset.remove(*doc_id); might_have_changed = true; } } - })?; + } + } + delete_cursor.advance(); } + Ok(might_have_changed) } @@ -698,7 +715,7 @@ impl IndexWriter { pub fn delete_query(&self, query: Box) -> crate::Result { let weight = query.weight(EnableScoring::disabled_from_schema(&self.index.schema()))?; let opstamp = self.stamper.stamp(); - let delete_operation = DeleteOperation { + let delete_operation = DeleteOperation::ByWeight { opstamp, target: weight, }; @@ -706,6 +723,17 @@ impl IndexWriter { Ok(opstamp) } + /// Delete a document by its known address: its segment's [`SegmentId`] and its [`DocId`]. + pub fn delete_by_address(&self, segment_id: SegmentId, doc_id: DocId) -> Opstamp { + let opstamp = self.stamper.stamp(); + self.delete_queue.push(DeleteOperation::ByAddress { + opstamp, + segment_id, + doc_id, + }); + opstamp + } + /// Returns the opstamp of the last successful commit. 
/// /// This is, for instance, the opstamp the index will @@ -779,7 +807,7 @@ impl IndexWriter { let query = TermQuery::new(term, IndexRecordOption::Basic); let weight = query.weight(EnableScoring::disabled_from_schema(&self.index.schema()))?; - let delete_operation = DeleteOperation { + let delete_operation = DeleteOperation::ByWeight { opstamp, target: weight, }; @@ -789,6 +817,13 @@ impl IndexWriter { let add_operation = AddOperation { opstamp, document }; adds.push(add_operation); } + UserOperation::DeleteByAddress(segment_id, doc_id) => { + self.delete_queue.push(DeleteOperation::ByAddress { + opstamp, + segment_id, + doc_id, + }); + } } } self.send_add_documents_batch(adds)?; diff --git a/src/indexer/operation.rs b/src/indexer/operation.rs index 69bffec17..9c7c96caa 100644 --- a/src/indexer/operation.rs +++ b/src/indexer/operation.rs @@ -1,12 +1,28 @@ +use crate::index::SegmentId; use crate::query::Weight; use crate::schema::document::Document; use crate::schema::{TantivyDocument, Term}; -use crate::Opstamp; +use crate::{DocId, Opstamp}; -/// Timestamped Delete operation. -pub struct DeleteOperation { - pub opstamp: Opstamp, - pub target: Box, +pub enum DeleteOperation { + ByWeight { + opstamp: Opstamp, + target: Box, + }, + ByAddress { + opstamp: Opstamp, + segment_id: SegmentId, + doc_id: DocId, + }, +} + +impl DeleteOperation { + pub fn opstamp(&self) -> Opstamp { + match self { + DeleteOperation::ByWeight { opstamp, .. } => *opstamp, + DeleteOperation::ByAddress { opstamp, .. } => *opstamp, + } + } } /// Timestamped Add operation. 
@@ -23,4 +39,7 @@ pub enum UserOperation { Add(D), /// Delete operation Delete(Term), + + /// Delete a document by its address + DeleteByAddress(SegmentId, DocId), } diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index 0b1c58657..e38aaea88 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -672,7 +672,7 @@ impl SegmentUpdater { let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone(); if let Some(delete_operation) = delete_cursor.get() { let committed_opstamp = segment_updater.load_meta().opstamp; - if delete_operation.opstamp < committed_opstamp { + if delete_operation.opstamp() < committed_opstamp { // We are not up to date! Let's create a new tombstone file for our // freshly create split. let index = &segment_updater.index; diff --git a/src/lib.rs b/src/lib.rs index b8e85b032..82e2d8acb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1167,6 +1167,77 @@ pub mod tests { Ok(()) } + #[test] + fn test_delete_by_address() -> crate::Result<()> { + use crate::collector::Count; + use crate::index::SegmentId; + use crate::indexer::NoMergePolicy; + use crate::query::AllQuery; + + const DOC_COUNT: u64 = 2u64; + + let mut schema_builder = SchemaBuilder::default(); + let id = schema_builder.add_u64_field("id", INDEXED); + let schema = schema_builder.build(); + + let index = Index::create_in_ram(schema); + let index_reader = index.reader()?; + + let mut index_writer: IndexWriter = index.writer_for_tests()?; + index_writer.set_merge_policy(Box::new(NoMergePolicy)); + + for doc_id in 0u64..DOC_COUNT { + index_writer.add_document(doc!(id => doc_id))?; + } + index_writer.commit()?; + + index_reader.reload()?; + let searcher = index_reader.searcher(); + + assert_eq!( + searcher.search(&AllQuery, &Count).unwrap(), + DOC_COUNT as usize + ); + + let segment_readers = searcher.segment_readers(); + assert!(segment_readers.len() == 1); + let segment_id = segment_readers[0].segment_id(); + + // update all 
elements by deleting and re-adding + for doc_id in 0u64..DOC_COUNT { + index_writer.delete_by_address( + segment_id, + doc_id + .try_into() + .expect("test doc_id should fit as a DocId"), + ); + index_writer.commit()?; + index_reader.reload()?; + index_writer.add_document(doc!(id => doc_id))?; + index_writer.commit()?; + index_reader.reload()?; + let searcher = index_reader.searcher(); + // The number of documents should be stable. + assert_eq!( + searcher.search(&AllQuery, &Count).unwrap(), + DOC_COUNT as usize + ); + } + + index_reader.reload()?; + let searcher = index_reader.searcher(); + let segment_ids: Vec = searcher + .segment_readers() + .iter() + .map(|reader| reader.segment_id()) + .collect(); + index_writer.merge(&segment_ids).wait()?; + index_reader.reload()?; + let searcher = index_reader.searcher(); + assert_eq!(searcher.search(&AllQuery, &Count)?, DOC_COUNT as usize); + Ok(()) + } + #[test] fn test_validate_checksum() -> crate::Result<()> { let index_path = tempfile::tempdir().expect("dir");