feat: delete docs by (SegmentId, DocId) (#26)

This teaches tantivy how to "directly" delete a document in a segment.
    
Our use case from pg_search is that we already know the segment_id and doc_id so it's waaaaay more efficient for us to delete docs through our `ambulkdelete()` routine.

It avoids doing a search, and all the stuff around that, for each of our "ctid" terms that we want to delete.
This commit is contained in:
Eric Ridge
2025-01-27 16:41:41 -05:00
committed by Stu Hood
parent 1b88bb61f9
commit 0552dddeb9
5 changed files with 154 additions and 29 deletions

View File

@@ -189,7 +189,7 @@ impl DeleteCursor {
fn is_behind_opstamp(&mut self, target_opstamp: Opstamp) -> bool {
self.get()
.map(|operation| operation.opstamp < target_opstamp)
.map(|operation| operation.opstamp() < target_opstamp)
.unwrap_or(false)
}
@@ -263,7 +263,7 @@ mod tests {
fn test_deletequeue() {
let delete_queue = DeleteQueue::new();
let make_op = |i: usize| DeleteOperation {
let make_op = |i: usize| DeleteOperation::ByWeight {
opstamp: i as u64,
target: Box::new(DummyWeight),
};
@@ -274,9 +274,9 @@ mod tests {
let snapshot = delete_queue.cursor();
{
let mut operations_it = snapshot.clone();
assert_eq!(operations_it.get().unwrap().opstamp, 1);
assert_eq!(operations_it.get().unwrap().opstamp(), 1);
operations_it.advance();
assert_eq!(operations_it.get().unwrap().opstamp, 2);
assert_eq!(operations_it.get().unwrap().opstamp(), 2);
operations_it.advance();
assert!(operations_it.get().is_none());
operations_it.advance();
@@ -284,20 +284,20 @@ mod tests {
let mut snapshot2 = delete_queue.cursor();
assert!(snapshot2.get().is_none());
delete_queue.push(make_op(3));
assert_eq!(snapshot2.get().unwrap().opstamp, 3);
assert_eq!(operations_it.get().unwrap().opstamp, 3);
assert_eq!(operations_it.get().unwrap().opstamp, 3);
assert_eq!(snapshot2.get().unwrap().opstamp(), 3);
assert_eq!(operations_it.get().unwrap().opstamp(), 3);
assert_eq!(operations_it.get().unwrap().opstamp(), 3);
operations_it.advance();
assert!(operations_it.get().is_none());
operations_it.advance();
}
{
let mut operations_it = snapshot;
assert_eq!(operations_it.get().unwrap().opstamp, 1);
assert_eq!(operations_it.get().unwrap().opstamp(), 1);
operations_it.advance();
assert_eq!(operations_it.get().unwrap().opstamp, 2);
assert_eq!(operations_it.get().unwrap().opstamp(), 2);
operations_it.advance();
assert_eq!(operations_it.get().unwrap().opstamp, 3);
assert_eq!(operations_it.get().unwrap().opstamp(), 3);
operations_it.advance();
assert!(operations_it.get().is_none());
}

View File

@@ -22,7 +22,7 @@ use crate::indexer::{MergePolicy, SegmentEntry, SegmentWriter};
use crate::query::{EnableScoring, Query, TermQuery};
use crate::schema::document::Document;
use crate::schema::{IndexRecordOption, TantivyDocument, Term};
use crate::{FutureResult, Opstamp};
use crate::{DocId, FutureResult, Opstamp};
// Size of the margin for the `memory_arena`. A segment is closed when the remaining memory
// in the `memory_arena` goes below MARGIN_IN_BYTES.
@@ -101,24 +101,41 @@ fn compute_deleted_bitset(
) -> crate::Result<bool> {
let mut might_have_changed = false;
while let Some(delete_op) = delete_cursor.get() {
if delete_op.opstamp > target_opstamp {
if delete_op.opstamp() > target_opstamp {
break;
}
// A delete operation should only affect
// document that were inserted before it.
delete_op
.target
.for_each_no_score(segment_reader, &mut |docs_matching_delete_query| {
for doc_matching_delete_query in docs_matching_delete_query.iter().cloned() {
if doc_opstamps.is_deleted(doc_matching_delete_query, delete_op.opstamp) {
alive_bitset.remove(doc_matching_delete_query);
match delete_op {
DeleteOperation::ByWeight { opstamp, target } => {
// A delete operation should only affect
// document that were inserted before it.
target.for_each_no_score(segment_reader, &mut |docs_matching_delete_query| {
for doc_matching_delete_query in docs_matching_delete_query.iter().cloned() {
if doc_opstamps.is_deleted(doc_matching_delete_query, *opstamp) {
alive_bitset.remove(doc_matching_delete_query);
might_have_changed = true;
}
}
})?;
}
DeleteOperation::ByAddress {
opstamp,
segment_id,
doc_id,
} => {
if *segment_id == segment_reader.segment_id() {
if doc_opstamps.is_deleted(*doc_id, *opstamp) {
alive_bitset.remove(*doc_id);
might_have_changed = true;
}
}
})?;
}
}
delete_cursor.advance();
}
Ok(might_have_changed)
}
@@ -698,7 +715,7 @@ impl<D: Document> IndexWriter<D> {
pub fn delete_query(&self, query: Box<dyn Query>) -> crate::Result<Opstamp> {
let weight = query.weight(EnableScoring::disabled_from_schema(&self.index.schema()))?;
let opstamp = self.stamper.stamp();
let delete_operation = DeleteOperation {
let delete_operation = DeleteOperation::ByWeight {
opstamp,
target: weight,
};
@@ -706,6 +723,17 @@ impl<D: Document> IndexWriter<D> {
Ok(opstamp)
}
/// Delete a specific document by its already-known [`DocAddress`]
pub fn delete_by_address(&self, segment_id: SegmentId, doc_id: DocId) -> Opstamp {
let opstamp = self.stamper.stamp();
self.delete_queue.push(DeleteOperation::ByAddress {
opstamp,
segment_id,
doc_id,
});
opstamp
}
/// Returns the opstamp of the last successful commit.
///
/// This is, for instance, the opstamp the index will
@@ -779,7 +807,7 @@ impl<D: Document> IndexWriter<D> {
let query = TermQuery::new(term, IndexRecordOption::Basic);
let weight =
query.weight(EnableScoring::disabled_from_schema(&self.index.schema()))?;
let delete_operation = DeleteOperation {
let delete_operation = DeleteOperation::ByWeight {
opstamp,
target: weight,
};
@@ -789,6 +817,13 @@ impl<D: Document> IndexWriter<D> {
let add_operation = AddOperation { opstamp, document };
adds.push(add_operation);
}
UserOperation::DeleteByAddress(segment_id, doc_id) => {
self.delete_queue.push(DeleteOperation::ByAddress {
opstamp,
segment_id,
doc_id,
});
}
}
}
self.send_add_documents_batch(adds)?;

View File

@@ -1,12 +1,28 @@
use crate::index::SegmentId;
use crate::query::Weight;
use crate::schema::document::Document;
use crate::schema::{TantivyDocument, Term};
use crate::Opstamp;
use crate::{DocId, Opstamp};
/// Timestamped Delete operation.
pub struct DeleteOperation {
pub opstamp: Opstamp,
pub target: Box<dyn Weight>,
pub enum DeleteOperation {
ByWeight {
opstamp: Opstamp,
target: Box<dyn Weight>,
},
ByAddress {
opstamp: Opstamp,
segment_id: SegmentId,
doc_id: DocId,
},
}
impl DeleteOperation {
pub fn opstamp(&self) -> Opstamp {
match self {
DeleteOperation::ByWeight { opstamp, .. } => *opstamp,
DeleteOperation::ByAddress { opstamp, .. } => *opstamp,
}
}
}
/// Timestamped Add operation.
@@ -23,4 +39,7 @@ pub enum UserOperation<D: Document = TantivyDocument> {
Add(D),
/// Delete operation
Delete(Term),
/// Delete a document by its address
DeleteByAddress(SegmentId, DocId),
}

View File

@@ -672,7 +672,7 @@ impl SegmentUpdater {
let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone();
if let Some(delete_operation) = delete_cursor.get() {
let committed_opstamp = segment_updater.load_meta().opstamp;
if delete_operation.opstamp < committed_opstamp {
if delete_operation.opstamp() < committed_opstamp {
// We are not up to date! Let's create a new tombstone file for our
// freshly create split.
let index = &segment_updater.index;

View File

@@ -1167,6 +1167,77 @@ pub mod tests {
Ok(())
}
#[test]
fn test_delete_by_address() -> crate::Result<()> {
use crate::collector::Count;
use crate::index::SegmentId;
use crate::indexer::NoMergePolicy;
use crate::query::AllQuery;
const DOC_COUNT: u64 = 2u64;
let mut schema_builder = SchemaBuilder::default();
let id = schema_builder.add_u64_field("id", INDEXED);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let index_reader = index.reader()?;
let mut index_writer: IndexWriter = index.writer_for_tests()?;
index_writer.set_merge_policy(Box::new(NoMergePolicy));
for doc_id in 0u64..DOC_COUNT {
index_writer.add_document(doc!(id => doc_id))?;
}
index_writer.commit()?;
index_reader.reload()?;
let searcher = index_reader.searcher();
assert_eq!(
searcher.search(&AllQuery, &Count).unwrap(),
DOC_COUNT as usize
);
let segment_readers = searcher.segment_readers();
assert!(segment_readers.len() == 1);
let segment_id = segment_readers[0].segment_id();
// update the 10 elements by deleting and re-adding
for doc_id in 0u64..DOC_COUNT {
index_writer.delete_by_address(
segment_id,
doc_id
.try_into()
.expect("test doc_id should fit as a DocId"),
);
index_writer.commit()?;
index_reader.reload()?;
index_writer.add_document(doc!(id => doc_id))?;
index_writer.commit()?;
index_reader.reload()?;
let searcher = index_reader.searcher();
// The number of document should be stable.
assert_eq!(
searcher.search(&AllQuery, &Count).unwrap(),
DOC_COUNT as usize
);
}
index_reader.reload()?;
let searcher = index_reader.searcher();
let segment_ids: Vec<SegmentId> = searcher
.segment_readers()
.iter()
.map(|reader| reader.segment_id())
.collect();
index_writer.merge(&segment_ids).wait()?;
index_reader.reload()?;
let searcher = index_reader.searcher();
assert_eq!(searcher.search(&AllQuery, &Count)?, DOC_COUNT as usize);
Ok(())
}
#[test]
fn test_validate_checksum() -> crate::Result<()> {
let index_path = tempfile::tempdir().expect("dir");