feat: Add Directory::wants_cancel() function (#31)

This adds a function named `wants_cancel() -> bool` to the `Directory` trait.  It allows a Directory implementation to indicate that it would like Tantivy to cancel an operation.

Right now, querying this function only happens during key points of index merging, but _could_ be used in other places.  Technically, segment merging is the only "black box" in tantivy that users don't otherwise have the direct ability to control.

The default implementation of `wants_cancel()` returns false, so there's no fear of default tantivy spuriously cancelling a merge.

The cancels happen "cleanly" such that if `wants_cancel()` returns true an `Err(TantivyError::Cancelled)` is returned from the calling function at that point, and the error result will be propagated up the stack.  No panics are raised.
This commit is contained in:
Eric Ridge
2025-02-28 10:07:48 -05:00
committed by Stu Hood
parent eabe589814
commit 8b7db36c99
7 changed files with 136 additions and 54 deletions

View File

@@ -4,6 +4,7 @@ mod term_merger;
use std::collections::{BTreeMap, HashSet};
use std::io;
use std::io::ErrorKind;
use std::net::Ipv6Addr;
use std::sync::Arc;
@@ -78,6 +79,7 @@ pub fn merge_columnar(
required_columns: &[(String, ColumnType)],
merge_row_order: MergeRowOrder,
output: &mut impl io::Write,
cancel: impl Fn() -> bool,
) -> io::Result<()> {
let mut serializer = ColumnarSerializer::new(output);
let num_docs_per_columnar = columnar_readers
@@ -87,6 +89,9 @@ pub fn merge_columnar(
let columns_to_merge = group_columns_for_merge(columnar_readers, required_columns)?;
for res in columns_to_merge {
if cancel() {
return Err(io::Error::new(ErrorKind::Interrupted, "Merge cancelled"));
}
let ((column_name, _column_type_category), grouped_columns) = res;
let grouped_columns = grouped_columns.open(&merge_row_order)?;
if grouped_columns.is_empty() {

View File

@@ -294,6 +294,13 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
fn panic_handler(&self) -> Option<DirectoryPanicHandler> {
None
}
/// Returns true if this directory is in a position of requiring that tantivy cancel
/// whatever operation(s) it might be doing. Typically this is just for the background
/// merge processes, but could be used for anything.
///
/// The default implementation always returns `false`, so stock tantivy never
/// spuriously cancels an operation.
fn wants_cancel(&self) -> bool {
    false
}
}
/// DirectoryClone

View File

@@ -374,6 +374,10 @@ impl Directory for ManagedDirectory {
fn panic_handler(&self) -> Option<DirectoryPanicHandler> {
self.directory.panic_handler()
}
/// Delegates the cancellation query to the wrapped inner directory.
fn wants_cancel(&self) -> bool {
    self.directory.wants_cancel()
}
}
impl Clone for ManagedDirectory {

View File

@@ -110,6 +110,9 @@ pub enum TantivyError {
#[error("Deserialize error: {0}")]
/// An error occurred while attempting to deserialize a document.
DeserializeError(DeserializeError),
/// The user requested the current operation be cancelled
#[error("User requested cancel")]
Cancelled,
}
impl From<io::Error> for TantivyError {

View File

@@ -332,6 +332,10 @@ impl<D: Document> IndexWriter<D> {
&delete_queue.cursor(),
options.num_merge_threads,
index.directory().panic_handler(),
{
let index = index.clone();
move || index.directory().wants_cancel()
},
)?;
let mut index_writer = Self {

View File

@@ -14,6 +14,7 @@ use crate::fastfield::AliveBitSet;
use crate::fieldnorm::{FieldNormReader, FieldNormReaders, FieldNormsSerializer, FieldNormsWriter};
use crate::index::{Segment, SegmentComponent, SegmentReader};
use crate::indexer::doc_id_mapping::{MappingType, SegmentDocIdMapping};
use crate::indexer::segment_updater::CancelSentinel;
use crate::indexer::SegmentSerializer;
use crate::postings::{InvertedIndexSerializer, Postings, SegmentPostings};
use crate::schema::{value_type_to_column_type, Field, FieldType, Schema};
@@ -80,6 +81,7 @@ pub struct IndexMerger {
schema: Schema,
pub(crate) readers: Vec<SegmentReader>,
max_doc: u32,
cancel: Box<dyn CancelSentinel>,
}
struct DeltaComputer {
@@ -145,9 +147,13 @@ fn extract_fast_field_required_columns(schema: &Schema) -> Vec<(String, ColumnTy
}
impl IndexMerger {
pub fn open(schema: Schema, segments: &[Segment]) -> crate::Result<IndexMerger> {
pub fn open(
schema: Schema,
segments: &[Segment],
cancel: Box<dyn CancelSentinel>,
) -> crate::Result<IndexMerger> {
let alive_bitset = segments.iter().map(|_| None).collect_vec();
Self::open_with_custom_alive_set(schema, segments, alive_bitset)
Self::open_with_custom_alive_set(schema, segments, alive_bitset, cancel)
}
// Create merge with a custom delete set.
@@ -166,6 +172,7 @@ impl IndexMerger {
schema: Schema,
segments: &[Segment],
alive_bitset_opt: Vec<Option<AliveBitSet>>,
cancel: Box<dyn CancelSentinel>,
) -> crate::Result<IndexMerger> {
let mut readers = vec![];
for (segment, new_alive_bitset_opt) in segments.iter().zip(alive_bitset_opt) {
@@ -189,6 +196,7 @@ impl IndexMerger {
schema,
readers,
max_doc,
cancel,
})
}
@@ -200,6 +208,9 @@ impl IndexMerger {
let fields = FieldNormsWriter::fields_with_fieldnorm(&self.schema);
let mut fieldnorms_data = Vec::with_capacity(self.max_doc as usize);
for field in fields {
if self.cancel.wants_cancel() {
return Err(crate::TantivyError::Cancelled);
}
fieldnorms_data.clear();
let fieldnorms_readers: Vec<FieldNormReader> = self
.readers
@@ -235,6 +246,7 @@ impl IndexMerger {
&required_columns,
merge_row_order,
fast_field_wrt,
|| self.cancel.wants_cancel(),
)?;
Ok(())
}
@@ -358,6 +370,9 @@ impl IndexMerger {
let mut segment_postings_containing_the_term: Vec<(usize, SegmentPostings)> = vec![];
while merged_terms.advance() {
if self.cancel.wants_cancel() {
return Err(crate::TantivyError::Cancelled);
}
segment_postings_containing_the_term.clear();
let term_bytes: &[u8] = merged_terms.key();
@@ -436,6 +451,9 @@ impl IndexMerger {
let mut doc = segment_postings.doc();
while doc != TERMINATED {
if self.cancel.wants_cancel() {
return Err(crate::TantivyError::Cancelled);
}
// deleted doc are skipped as they do not have a `remapped_doc_id`.
if let Some(remapped_doc_id) = old_to_new_doc_id[doc as usize] {
// we make sure to only write the term if
@@ -472,6 +490,9 @@ impl IndexMerger {
doc_id_mapping: &SegmentDocIdMapping,
) -> crate::Result<()> {
for (field, field_entry) in self.schema.fields() {
if self.cancel.wants_cancel() {
return Err(crate::TantivyError::Cancelled);
}
let fieldnorm_reader = fieldnorm_readers.get_field(field)?;
if field_entry.is_indexed() {
self.write_postings_for_field(
@@ -510,6 +531,9 @@ impl IndexMerger {
|| store_reader.decompressor() != store_writer.compressor().into()
{
for doc_bytes_res in store_reader.iter_raw(reader.alive_bitset()) {
if self.cancel.wants_cancel() {
return Err(crate::TantivyError::Cancelled);
}
let doc_bytes = doc_bytes_res?;
store_writer.store_bytes(&doc_bytes)?;
}

View File

@@ -63,6 +63,26 @@ pub(crate) fn save_metas(
}
}
/// Describes a routine for allowing an operation in tantivy to be cleanly cancelled.
///
/// A blanket implementation is provided for any cloneable `Fn() -> bool` closure.
pub trait CancelSentinel: Send + Sync + 'static {
    /// Produces an owned copy of this sentinel behind a fresh trait object.
    ///
    /// Needed because `Box<dyn CancelSentinel>` cannot implement `Clone` directly.
    fn box_clone(&self) -> Box<dyn CancelSentinel>;

    /// Returns `true` when the in-flight operation should be abandoned.
    fn wants_cancel(&self) -> bool;
}

impl<F> CancelSentinel for F
where
    F: Fn() -> bool + Clone + Send + Sync + 'static,
{
    fn box_clone(&self) -> Box<dyn CancelSentinel> {
        Box::new(self.clone())
    }

    fn wants_cancel(&self) -> bool {
        self()
    }
}
// The segment update runner is in charge of processing all
// of the `SegmentUpdate`s.
//
@@ -71,15 +91,26 @@ pub(crate) fn save_metas(
//
// We voluntarily pass a merge_operation ref to guarantee that
// the merge_operation is alive during the process
#[derive(Clone)]
pub(crate) struct SegmentUpdater(Arc<InnerSegmentUpdater>);
/// Shared handle to the updater state, paired with a cancellation sentinel.
pub(crate) struct SegmentUpdater {
    inner: Arc<InnerSegmentUpdater>,
    cancel: Box<dyn CancelSentinel>,
}

impl Clone for SegmentUpdater {
    // Hand-written because `Box<dyn CancelSentinel>` is not `Clone`; we go
    // through `box_clone` while the `Arc` clone is just a refcount bump.
    fn clone(&self) -> Self {
        SegmentUpdater {
            inner: Arc::clone(&self.inner),
            cancel: self.cancel.box_clone(),
        }
    }
}
impl Deref for SegmentUpdater {
type Target = InnerSegmentUpdater;
#[inline]
fn deref(&self) -> &Self::Target {
&self.0
&self.inner
}
}
@@ -99,6 +130,7 @@ fn merge(
index: &Index,
mut segment_entries: Vec<SegmentEntry>,
target_opstamp: Opstamp,
cancel: Box<dyn CancelSentinel>,
) -> crate::Result<Option<SegmentEntry>> {
let num_docs = segment_entries
.iter()
@@ -125,7 +157,7 @@ fn merge(
.collect();
// An IndexMerger is like a "view" of our merged segments.
let merger: IndexMerger = IndexMerger::open(index.schema(), &segments[..])?;
let merger = IndexMerger::open(index.schema(), &segments[..], cancel)?;
// ... we just serialize this index merger in our new segment to merge the segments.
let segment_serializer = SegmentSerializer::for_segment(merged_segment.clone())?;
@@ -153,6 +185,7 @@ fn merge(
pub fn merge_indices<T: Into<Box<dyn Directory>>>(
indices: &[Index],
output_directory: T,
cancel: Box<dyn CancelSentinel>,
) -> crate::Result<Index> {
if indices.is_empty() {
// If there are no indices to merge, there is no need to do anything.
@@ -180,7 +213,13 @@ pub fn merge_indices<T: Into<Box<dyn Directory>>>(
}
let non_filter = segments.iter().map(|_| None).collect::<Vec<_>>();
merge_filtered_segments(&segments, target_settings, non_filter, output_directory)
merge_filtered_segments(
&segments,
target_settings,
non_filter,
output_directory,
cancel,
)
}
/// Advanced: Merges a list of segments from different indices in a new index.
@@ -201,6 +240,7 @@ pub fn merge_filtered_segments<T: Into<Box<dyn Directory>>>(
target_settings: IndexSettings,
filter_doc_ids: Vec<Option<AliveBitSet>>,
output_directory: T,
cancel: Box<dyn CancelSentinel>,
) -> crate::Result<Index> {
if segments.is_empty() {
// If there are no indices to merge, there is no need to do anything.
@@ -229,8 +269,12 @@ pub fn merge_filtered_segments<T: Into<Box<dyn Directory>>>(
)?;
let merged_segment = merged_index.new_segment();
let merged_segment_id = merged_segment.id();
let merger: IndexMerger =
IndexMerger::open_with_custom_alive_set(merged_index.schema(), segments, filter_doc_ids)?;
let merger = IndexMerger::open_with_custom_alive_set(
merged_index.schema(),
segments,
filter_doc_ids,
cancel,
)?;
let segment_serializer = SegmentSerializer::for_segment(merged_segment)?;
let num_docs = merger.write(segment_serializer)?;
@@ -299,6 +343,7 @@ impl SegmentUpdater {
delete_cursor: &DeleteCursor,
num_merge_threads: usize,
panic_handler: Option<DirectoryPanicHandler>,
cancel: impl Fn() -> bool + 'static + Send + Sync + Clone,
) -> crate::Result<SegmentUpdater> {
let segments = index.searchable_segment_metas()?;
let segment_manager = SegmentManager::from_segments(segments, delete_cursor);
@@ -330,17 +375,20 @@ impl SegmentUpdater {
crate::TantivyError::SystemError("Failed to spawn segment merging thread".to_string())
})?;
let index_meta = index.load_metas()?;
Ok(SegmentUpdater(Arc::new(InnerSegmentUpdater {
active_index_meta: RwLock::new(Arc::new(index_meta)),
pool,
merge_thread_pool,
index,
segment_manager,
merge_policy: RwLock::new(Arc::new(DefaultMergePolicy::default())),
killed: AtomicBool::new(false),
stamper,
merge_operations: Default::default(),
})))
Ok(SegmentUpdater {
inner: Arc::new(InnerSegmentUpdater {
active_index_meta: RwLock::new(Arc::new(index_meta)),
pool,
merge_thread_pool,
index,
segment_manager,
merge_policy: RwLock::new(Arc::new(DefaultMergePolicy::default())),
killed: AtomicBool::new(false),
stamper,
merge_operations: Default::default(),
}),
cancel: Box::new(cancel),
})
}
pub fn get_merge_policy(&self) -> Arc<dyn MergePolicy> {
@@ -473,7 +521,7 @@ impl SegmentUpdater {
opstamp: Opstamp,
payload: Option<String>,
) -> FutureResult<Opstamp> {
let segment_updater: SegmentUpdater = self.clone();
let segment_updater = self.clone();
self.schedule_task(move || {
let segment_entries = segment_updater.purge_deletes(opstamp)?;
let previous_metas = segment_updater.load_meta();
@@ -552,39 +600,18 @@ impl SegmentUpdater {
let (scheduled_result, merging_future_send) =
FutureResult::create("Merge operation failed.");
let cancel = self.cancel.box_clone();
self.merge_thread_pool.spawn(move || {
// The fact that `merge_operation` is moved here is important.
// Its lifetime is used to track how many merging thread are currently running,
// as well as which segment is currently in merge and therefore should not be
// candidate for another merge.
let merge_panic_res = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
merge(
&segment_updater.index,
segment_entries,
merge_operation.target_opstamp(),
)
}));
let merge_res = match merge_panic_res {
Ok(merge_res) => merge_res,
Err(panic_err) => {
let panic_str = if let Some(msg) = panic_err.downcast_ref::<&str>() {
*msg
} else if let Some(msg) = panic_err.downcast_ref::<String>() {
msg.as_str()
} else {
"UNKNOWN"
};
let _send_result = merging_future_send.send(Err(TantivyError::SystemError(
format!("Merge thread panicked: {panic_str}"),
)));
// Resume unwinding because we forced unwind safety with
// `std::panic::AssertUnwindSafe` Use a specific message so
// the panic_handler can double check that we properly caught the panic.
let boxed_panic_message: Box<dyn Any + Send> = Box::new(PANIC_CAUGHT);
std::panic::resume_unwind(boxed_panic_message);
}
};
match merge_res {
match merge(
&segment_updater.index,
segment_entries,
merge_operation.target_opstamp(),
cancel,
) {
Ok(after_merge_segment_entry) => {
let res = segment_updater.end_merge(merge_operation, after_merge_segment_entry);
let _send_result = merging_future_send.send(res);
@@ -950,7 +977,7 @@ mod tests {
assert_eq!(indices.len(), 3);
let output_directory: Box<dyn Directory> = Box::<RamDirectory>::default();
let index = merge_indices(&indices, output_directory)?;
let index = merge_indices(&indices, output_directory, Box::new(|| false))?;
assert_eq!(index.schema(), schema);
let segments = index.searchable_segments()?;
@@ -964,7 +991,7 @@ mod tests {
#[test]
fn test_merge_empty_indices_array() {
let merge_result = merge_indices(&[], RamDirectory::default());
let merge_result = merge_indices(&[], RamDirectory::default(), Box::new(|| false));
assert!(merge_result.is_err());
}
@@ -991,7 +1018,11 @@ mod tests {
};
// mismatched schema index list
let result = merge_indices(&[first_index, second_index], RamDirectory::default());
let result = merge_indices(
&[first_index, second_index],
RamDirectory::default(),
Box::new(|| false),
);
assert!(result.is_err());
Ok(())
@@ -1039,6 +1070,7 @@ mod tests {
target_settings,
filter_segments,
RamDirectory::default(),
Box::new(|| false),
)?;
let segments = merged_index.searchable_segments()?;
@@ -1084,6 +1116,7 @@ mod tests {
target_settings,
filter_segments,
RamDirectory::default(),
Box::new(|| false),
)?;
let segments = index.searchable_segments()?;
@@ -1147,10 +1180,11 @@ mod tests {
target_schema,
target_settings.clone(),
)?;
let merger: IndexMerger = IndexMerger::open_with_custom_alive_set(
let merger = IndexMerger::open_with_custom_alive_set(
merged_index.schema(),
&segments[..],
filter_segments,
Box::new(|| false),
)?;
let doc_ids_alive: Vec<_> = merger.readers[0].doc_ids_alive().collect();
@@ -1162,10 +1196,11 @@ mod tests {
let target_schema = segments[0].schema();
let merged_index =
Index::create(RamDirectory::default(), target_schema, target_settings)?;
let merger: IndexMerger = IndexMerger::open_with_custom_alive_set(
let merger = IndexMerger::open_with_custom_alive_set(
merged_index.schema(),
&segments[..],
filter_segments,
Box::new(|| false),
)?;
let doc_ids_alive: Vec<_> = merger.readers[0].doc_ids_alive().collect();