Add support for deleting all documents matching query (#1535)

* add support for deleting all documents matching query

#1494
This commit is contained in:
trinity-1686a
2022-09-22 14:26:09 +02:00
committed by GitHub
parent 75aafeeb9b
commit fa3d786a2f
4 changed files with 147 additions and 38 deletions

View File

@@ -472,6 +472,8 @@ mod tests {
// There are more tests in directory/mod.rs
// The following tests are specific to the MmapDirectory
use std::time::Duration;
use common::HasLen;
use super::*;
@@ -610,7 +612,14 @@ mod tests {
mmap_directory.get_cache_info().mmapped.len()
);
}
assert!(mmap_directory.get_cache_info().mmapped.is_empty());
Ok(())
// This test failed on CI. The last Mmap is dropped from the merging thread so there might
// be a race condition indeed.
for _ in 0..10 {
if mmap_directory.get_cache_info().mmapped.is_empty() {
return Ok(());
}
std::thread::sleep(Duration::from_millis(200));
}
panic!("The cache still contains information. One of the Mmap has not been dropped.");
}
}

View File

@@ -246,18 +246,27 @@ impl DeleteCursor {
mod tests {
use super::{DeleteOperation, DeleteQueue};
use crate::schema::{Field, Term};
use crate::query::{Explanation, Scorer, Weight};
use crate::{DocId, Score, SegmentReader};
struct DummyWeight;
impl Weight for DummyWeight {
fn scorer(&self, _reader: &SegmentReader, _boost: Score) -> crate::Result<Box<dyn Scorer>> {
Err(crate::TantivyError::InternalError("dummy impl".to_owned()))
}
fn explain(&self, _reader: &SegmentReader, _doc: DocId) -> crate::Result<Explanation> {
Err(crate::TantivyError::InternalError("dummy impl".to_owned()))
}
}
#[test]
fn test_deletequeue() {
let delete_queue = DeleteQueue::new();
let make_op = |i: usize| {
let field = Field::from_field_id(1u32);
DeleteOperation {
opstamp: i as u64,
term: Term::from_field_u64(field, i as u64),
}
let make_op = |i: usize| DeleteOperation {
opstamp: i as u64,
target: Box::new(DummyWeight),
};
delete_queue.push(make_op(1));

View File

@@ -11,7 +11,6 @@ use super::segment_updater::SegmentUpdater;
use super::{AddBatch, AddBatchReceiver, AddBatchSender, PreparedCommit};
use crate::core::{Index, Segment, SegmentComponent, SegmentId, SegmentMeta, SegmentReader};
use crate::directory::{DirectoryLock, GarbageCollectionResult, TerminatingWrite};
use crate::docset::{DocSet, TERMINATED};
use crate::error::TantivyError;
use crate::fastfield::write_alive_bitset;
use crate::indexer::delete_queue::{DeleteCursor, DeleteQueue};
@@ -20,8 +19,9 @@ use crate::indexer::index_writer_status::IndexWriterStatus;
use crate::indexer::operation::DeleteOperation;
use crate::indexer::stamper::Stamper;
use crate::indexer::{MergePolicy, SegmentEntry, SegmentWriter};
use crate::query::{Query, TermQuery};
use crate::schema::{Document, IndexRecordOption, Term};
use crate::{FutureResult, Opstamp};
use crate::{FutureResult, IndexReader, Opstamp};
// Size of the margin for the `memory_arena`. A segment is closed when the remaining memory
// in the `memory_arena` goes below MARGIN_IN_BYTES.
@@ -57,6 +57,7 @@ pub struct IndexWriter {
_directory_lock: Option<DirectoryLock>,
index: Index,
index_reader: IndexReader,
memory_arena_in_bytes_per_thread: usize,
@@ -92,19 +93,14 @@ fn compute_deleted_bitset(
// A delete operation should only affect
// document that were inserted before it.
let inverted_index = segment_reader.inverted_index(delete_op.term.field())?;
if let Some(mut docset) =
inverted_index.read_postings(&delete_op.term, IndexRecordOption::Basic)?
{
let mut doc_matching_deleted_term = docset.doc();
while doc_matching_deleted_term != TERMINATED {
if doc_opstamps.is_deleted(doc_matching_deleted_term, delete_op.opstamp) {
alive_bitset.remove(doc_matching_deleted_term);
delete_op
.target
.for_each(segment_reader, &mut |doc_matching_delete_query, _| {
if doc_opstamps.is_deleted(doc_matching_delete_query, delete_op.opstamp) {
alive_bitset.remove(doc_matching_delete_query);
might_have_changed = true;
}
doc_matching_deleted_term = docset.advance();
}
}
})?;
delete_cursor.advance();
}
Ok(might_have_changed)
@@ -302,6 +298,7 @@ impl IndexWriter {
memory_arena_in_bytes_per_thread,
index: index.clone(),
index_reader: index.reader()?,
index_writer_status: IndexWriterStatus::from(document_receiver),
operation_sender: document_sender,
@@ -666,10 +663,33 @@ impl IndexWriter {
/// Like adds, the deletion itself will be visible
/// only after calling `commit()`.
pub fn delete_term(&self, term: Term) -> Opstamp {
let query = TermQuery::new(term, IndexRecordOption::Basic);
// For backward compatibility, if Term is invalid for the index, do nothing but return an
// Opstamp
self.delete_query(Box::new(query))
.unwrap_or_else(|_| self.stamper.stamp())
}
/// Delete all documents matching a given query.
/// Returns an `Err` if the query can't be executed.
///
/// Delete operation only affects documents that
/// were added in previous commits, and documents
/// that were added previously in the same commit.
///
/// Like adds, the deletion itself will be visible
/// only after calling `commit()`.
#[doc(hidden)]
pub fn delete_query(&self, query: Box<dyn Query>) -> crate::Result<Opstamp> {
let weight = query.weight(&self.index_reader.searcher(), false)?;
let opstamp = self.stamper.stamp();
let delete_operation = DeleteOperation { opstamp, term };
let delete_operation = DeleteOperation {
opstamp,
target: weight,
};
self.delete_queue.push(delete_operation);
opstamp
Ok(opstamp)
}
/// Returns the opstamp of the last successful commit.
@@ -738,10 +758,17 @@ impl IndexWriter {
let (batch_opstamp, stamps) = self.get_batch_opstamps(count);
let mut adds = AddBatch::default();
for (user_op, opstamp) in user_operations_it.zip(stamps) {
match user_op {
UserOperation::Delete(term) => {
let delete_operation = DeleteOperation { opstamp, term };
let query = TermQuery::new(term, IndexRecordOption::Basic);
let weight = query.weight(&self.index_reader.searcher(), false)?;
let delete_operation = DeleteOperation {
opstamp,
target: weight,
};
self.delete_queue.push(delete_operation);
}
UserOperation::Add(document) => {
@@ -786,7 +813,7 @@ mod tests {
use crate::directory::error::LockError;
use crate::error::*;
use crate::indexer::NoMergePolicy;
use crate::query::{QueryParser, TermQuery};
use crate::query::{BooleanQuery, Occur, Query, QueryParser, TermQuery};
use crate::schema::{
self, Cardinality, Facet, FacetOptions, IndexRecordOption, NumericOptions,
TextFieldIndexing, TextOptions, FAST, INDEXED, STORED, STRING, TEXT,
@@ -1418,10 +1445,72 @@ mod tests {
Ok(())
}
#[test]
fn test_delete_query_with_sort_by_field() -> crate::Result<()> {
let mut schema_builder = schema::Schema::builder();
let id_field =
schema_builder.add_u64_field("id", schema::INDEXED | schema::STORED | schema::FAST);
let schema = schema_builder.build();
let settings = IndexSettings {
sort_by_field: Some(IndexSortByField {
field: "id".to_string(),
order: Order::Desc,
}),
..Default::default()
};
let index = Index::builder()
.schema(schema)
.settings(settings)
.create_in_ram()?;
let index_reader = index.reader()?;
let mut index_writer = index.writer_for_tests()?;
// create and delete docs in same commit
for id in 0u64..5u64 {
index_writer.add_document(doc!(id_field => id))?;
}
for id in 1u64..4u64 {
let term = Term::from_field_u64(id_field, id);
let not_term = Term::from_field_u64(id_field, 2);
let term = Box::new(TermQuery::new(term, Default::default()));
let not_term = Box::new(TermQuery::new(not_term, Default::default()));
let query: BooleanQuery = vec![
(Occur::Must, term as Box<dyn Query>),
(Occur::MustNot, not_term as Box<dyn Query>),
]
.into();
index_writer.delete_query(Box::new(query))?;
}
for id in 5u64..10u64 {
index_writer.add_document(doc!(id_field => id))?;
}
index_writer.commit()?;
index_reader.reload()?;
let searcher = index_reader.searcher();
assert_eq!(searcher.segment_readers().len(), 1);
let segment_reader = searcher.segment_reader(0);
assert_eq!(segment_reader.num_docs(), 8);
assert_eq!(segment_reader.max_doc(), 10);
let fast_field_reader = segment_reader.fast_fields().u64(id_field)?;
let in_order_alive_ids: Vec<u64> = segment_reader
.doc_ids_alive()
.map(|doc| fast_field_reader.get_val(doc as u64))
.collect();
assert_eq!(&in_order_alive_ids[..], &[9, 8, 7, 6, 5, 4, 2, 0]);
Ok(())
}
#[derive(Debug, Clone, Copy)]
enum IndexingOp {
AddDoc { id: u64 },
DeleteDoc { id: u64 },
DeleteDocQuery { id: u64 },
Commit,
Merge,
}
@@ -1429,6 +1518,7 @@ mod tests {
fn balanced_operation_strategy() -> impl Strategy<Value = IndexingOp> {
prop_oneof![
(0u64..20u64).prop_map(|id| IndexingOp::DeleteDoc { id }),
(0u64..20u64).prop_map(|id| IndexingOp::DeleteDocQuery { id }),
(0u64..20u64).prop_map(|id| IndexingOp::AddDoc { id }),
(0u64..1u64).prop_map(|_| IndexingOp::Commit),
(0u64..1u64).prop_map(|_| IndexingOp::Merge),
@@ -1437,7 +1527,8 @@ mod tests {
fn adding_operation_strategy() -> impl Strategy<Value = IndexingOp> {
prop_oneof![
10 => (0u64..100u64).prop_map(|id| IndexingOp::DeleteDoc { id }),
5 => (0u64..100u64).prop_map(|id| IndexingOp::DeleteDoc { id }),
5 => (0u64..100u64).prop_map(|id| IndexingOp::DeleteDocQuery { id }),
50 => (0u64..100u64).prop_map(|id| IndexingOp::AddDoc { id }),
2 => (0u64..1u64).prop_map(|_| IndexingOp::Commit),
1 => (0u64..1u64).prop_map(|_| IndexingOp::Merge),
@@ -1457,6 +1548,10 @@ mod tests {
existing_ids.remove(&id);
deleted_ids.insert(id);
}
IndexingOp::DeleteDocQuery { id } => {
existing_ids.remove(&id);
deleted_ids.insert(id);
}
_ => {}
}
}
@@ -1539,6 +1634,11 @@ mod tests {
IndexingOp::DeleteDoc { id } => {
index_writer.delete_term(Term::from_field_u64(id_field, id));
}
IndexingOp::DeleteDocQuery { id } => {
let term = Term::from_field_u64(id_field, id);
let query = TermQuery::new(term, Default::default());
index_writer.delete_query(Box::new(query))?;
}
IndexingOp::Commit => {
index_writer.commit()?;
}

View File

@@ -1,20 +1,11 @@
use crate::query::Weight;
use crate::schema::{Document, Term};
use crate::Opstamp;
/// Timestamped Delete operation.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct DeleteOperation {
pub opstamp: Opstamp,
pub term: Term,
}
impl Default for DeleteOperation {
fn default() -> Self {
DeleteOperation {
opstamp: 0u64,
term: Term::new(),
}
}
pub target: Box<dyn Weight>,
}
/// Timestamped Add operation.