From 8b7db36c99dad5dc599782341ec03bff242567e8 Mon Sep 17 00:00:00 2001
From: Eric Ridge
Date: Fri, 28 Feb 2025 10:07:48 -0500
Subject: [PATCH] feat: Add `Directory::wants_cancel()` function (#31)

This adds a function named `wants_cancel() -> bool` to the `Directory`
trait. It allows a `Directory` implementation to indicate that it would
like Tantivy to cancel an operation. Right now, this function is only
queried at key points during index merging, but it _could_ be used in
other places. Technically, segment merging is the only "black box" in
tantivy that users don't otherwise have the direct ability to control.

The default implementation of `wants_cancel()` returns false, so there's
no fear of default tantivy spuriously cancelling a merge.

Cancellation happens "cleanly": if `wants_cancel()` returns true, an
`Err(TantivyError::Cancelled)` is returned from the calling function at
that point, and the error result is propagated up the stack. No panics
are raised.
---
 columnar/src/columnar/merge/mod.rs |   5 ++
 src/directory/directory.rs         |   7 ++
 src/directory/managed_directory.rs |   4 +
 src/error.rs                       |   3 +
 src/indexer/index_writer.rs        |   4 +
 src/indexer/merger.rs              |  28 +++++-
 src/indexer/segment_updater.rs     | 139 ++++++++++++++++++-----------
 7 files changed, 136 insertions(+), 54 deletions(-)

diff --git a/columnar/src/columnar/merge/mod.rs b/columnar/src/columnar/merge/mod.rs
index 70ace6b78..664b0759a 100644
--- a/columnar/src/columnar/merge/mod.rs
+++ b/columnar/src/columnar/merge/mod.rs
@@ -4,6 +4,7 @@ mod term_merger;
 
 use std::collections::{BTreeMap, HashSet};
 use std::io;
+use std::io::ErrorKind;
 use std::net::Ipv6Addr;
 use std::sync::Arc;
 
@@ -78,6 +79,7 @@ pub fn merge_columnar(
     required_columns: &[(String, ColumnType)],
     merge_row_order: MergeRowOrder,
     output: &mut impl io::Write,
+    cancel: impl Fn() -> bool,
 ) -> io::Result<()> {
     let mut serializer = ColumnarSerializer::new(output);
     let num_docs_per_columnar = columnar_readers
@@ -87,6 +89,9 @@ pub fn merge_columnar(
     let columns_to_merge = group_columns_for_merge(columnar_readers, required_columns)?;
     for res in columns_to_merge {
+        if cancel() {
+            return Err(io::Error::new(ErrorKind::Interrupted, "Merge cancelled"));
+        }
         let ((column_name, _column_type_category), grouped_columns) = res;
         let grouped_columns = grouped_columns.open(&merge_row_order)?;
         if grouped_columns.is_empty() {
diff --git a/src/directory/directory.rs b/src/directory/directory.rs
index 4cbc70e22..fa4da2811 100644
--- a/src/directory/directory.rs
+++ b/src/directory/directory.rs
@@ -294,6 +294,13 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
     fn panic_handler(&self) -> Option<DirectoryPanicHandler> {
         None
     }
+
+    /// Returns true if this directory is in a position of requiring that tantivy
+    /// cancel whatever operation(s) it might be doing. Typically this is just for
+    /// the background merge processes, but it could be used for anything.
+    fn wants_cancel(&self) -> bool {
+        false
+    }
 }
 
 /// DirectoryClone
diff --git a/src/directory/managed_directory.rs b/src/directory/managed_directory.rs
index f4c8d2579..7178934a9 100644
--- a/src/directory/managed_directory.rs
+++ b/src/directory/managed_directory.rs
@@ -374,6 +374,10 @@ impl Directory for ManagedDirectory {
     fn panic_handler(&self) -> Option<DirectoryPanicHandler> {
         self.directory.panic_handler()
     }
+
+    fn wants_cancel(&self) -> bool {
+        self.directory.wants_cancel()
+    }
 }
 
 impl Clone for ManagedDirectory {
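A usage sketch for reviewers (illustrative only, not part of the patch): a
directory that wants to abort in-flight merges can surface a shared flag
through `wants_cancel()`. The sketch models the hook with a stand-in trait,
since a real `Directory` impl must also provide the file-handling methods;
`CancellableDirectory` is a hypothetical name.

    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    /// Stand-in for tantivy's `Directory` trait, reduced to the new hook.
    trait Directory {
        /// Default mirrors the patch: ordinary directories never request a cancel.
        fn wants_cancel(&self) -> bool {
            false
        }
    }

    /// A directory whose owner can flip a shared flag to abort in-flight merges.
    struct CancellableDirectory {
        cancelled: Arc<AtomicBool>,
    }

    impl Directory for CancellableDirectory {
        fn wants_cancel(&self) -> bool {
            self.cancelled.load(Ordering::Relaxed)
        }
    }

    fn main() {
        let flag = Arc::new(AtomicBool::new(false));
        let dir = CancellableDirectory { cancelled: Arc::clone(&flag) };
        assert!(!dir.wants_cancel());
        flag.store(true, Ordering::Relaxed); // e.g. from a shutdown hook
        assert!(dir.wants_cancel());
    }

Because `ManagedDirectory` forwards `wants_cancel()` to the directory it
wraps (above), such a flag stays visible to merges regardless of how tantivy
wraps the directory.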
diff --git a/src/error.rs b/src/error.rs
index 39fee20d7..7b80b9cb2 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -110,6 +110,9 @@ pub enum TantivyError {
     #[error("Deserialize error: {0}")]
     /// An error occurred while attempting to deserialize a document.
     DeserializeError(DeserializeError),
+    /// The user requested that the current operation be cancelled.
+    #[error("User requested cancel")]
+    Cancelled,
 }
 
 impl From for TantivyError {
diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs
index 0403a13dd..943dc6a84 100644
--- a/src/indexer/index_writer.rs
+++ b/src/indexer/index_writer.rs
@@ -332,6 +332,10 @@ impl IndexWriter {
             &delete_queue.cursor(),
             options.num_merge_threads,
             index.directory().panic_handler(),
+            {
+                let index = index.clone();
+                move || index.directory().wants_cancel()
+            },
         )?;
 
         let mut index_writer = Self {
diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs
index 0a497b7d7..babc16693 100644
--- a/src/indexer/merger.rs
+++ b/src/indexer/merger.rs
@@ -14,6 +14,7 @@ use crate::fastfield::AliveBitSet;
 use crate::fieldnorm::{FieldNormReader, FieldNormReaders, FieldNormsSerializer, FieldNormsWriter};
 use crate::index::{Segment, SegmentComponent, SegmentReader};
 use crate::indexer::doc_id_mapping::{MappingType, SegmentDocIdMapping};
+use crate::indexer::segment_updater::CancelSentinel;
 use crate::indexer::SegmentSerializer;
 use crate::postings::{InvertedIndexSerializer, Postings, SegmentPostings};
 use crate::schema::{value_type_to_column_type, Field, FieldType, Schema};
@@ -80,6 +81,7 @@ pub struct IndexMerger {
     schema: Schema,
     pub(crate) readers: Vec<SegmentReader>,
     max_doc: u32,
+    cancel: Box<dyn CancelSentinel>,
 }
 
 struct DeltaComputer {
@@ -145,9 +147,13 @@ fn extract_fast_field_required_columns(schema: &Schema) -> Vec<(String, ColumnType)> {
 }
 
 impl IndexMerger {
-    pub fn open(schema: Schema, segments: &[Segment]) -> crate::Result<IndexMerger> {
+    pub fn open(
+        schema: Schema,
+        segments: &[Segment],
+        cancel: Box<dyn CancelSentinel>,
+    ) -> crate::Result<IndexMerger> {
         let alive_bitset = segments.iter().map(|_| None).collect_vec();
-        Self::open_with_custom_alive_set(schema, segments, alive_bitset)
+        Self::open_with_custom_alive_set(schema, segments, alive_bitset, cancel)
     }
 
     // Create merge with a custom delete set.
@@ -166,6 +172,7 @@ impl IndexMerger {
         schema: Schema,
         segments: &[Segment],
         alive_bitset_opt: Vec<Option<AliveBitSet>>,
+        cancel: Box<dyn CancelSentinel>,
     ) -> crate::Result<IndexMerger> {
         let mut readers = vec![];
         for (segment, new_alive_bitset_opt) in segments.iter().zip(alive_bitset_opt) {
@@ -189,6 +196,7 @@ impl IndexMerger {
             schema,
             readers,
             max_doc,
+            cancel,
         })
     }
 
@@ -200,6 +208,9 @@ impl IndexMerger {
         let fields = FieldNormsWriter::fields_with_fieldnorm(&self.schema);
         let mut fieldnorms_data = Vec::with_capacity(self.max_doc as usize);
         for field in fields {
+            if self.cancel.wants_cancel() {
+                return Err(crate::TantivyError::Cancelled);
+            }
             fieldnorms_data.clear();
             let fieldnorms_readers: Vec<FieldNormReader> = self
                 .readers
@@ -235,6 +246,7 @@ impl IndexMerger {
             &required_columns,
             merge_row_order,
             fast_field_wrt,
+            || self.cancel.wants_cancel(),
         )?;
         Ok(())
     }
@@ -358,6 +370,9 @@ impl IndexMerger {
         let mut segment_postings_containing_the_term: Vec<(usize, SegmentPostings)> = vec![];
 
         while merged_terms.advance() {
+            if self.cancel.wants_cancel() {
+                return Err(crate::TantivyError::Cancelled);
+            }
             segment_postings_containing_the_term.clear();
             let term_bytes: &[u8] = merged_terms.key();
 
@@ -436,6 +451,9 @@ impl IndexMerger {
             let mut doc = segment_postings.doc();
             while doc != TERMINATED {
+                if self.cancel.wants_cancel() {
+                    return Err(crate::TantivyError::Cancelled);
+                }
                 // deleted doc are skipped as they do not have a `remapped_doc_id`.
                 if let Some(remapped_doc_id) = old_to_new_doc_id[doc as usize] {
                     // we make sure to only write the term if
@@ -472,6 +490,9 @@
         doc_id_mapping: &SegmentDocIdMapping,
     ) -> crate::Result<()> {
         for (field, field_entry) in self.schema.fields() {
+            if self.cancel.wants_cancel() {
+                return Err(crate::TantivyError::Cancelled);
+            }
             let fieldnorm_reader = fieldnorm_readers.get_field(field)?;
             if field_entry.is_indexed() {
                 self.write_postings_for_field(
@@ -510,6 +531,9 @@
             || store_reader.decompressor() != store_writer.compressor().into()
         {
             for doc_bytes_res in store_reader.iter_raw(reader.alive_bitset()) {
+                if self.cancel.wants_cancel() {
+                    return Err(crate::TantivyError::Cancelled);
+                }
                 let doc_bytes = doc_bytes_res?;
                 store_writer.store_bytes(&doc_bytes)?;
             }
diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs
index 1ef607560..88220f75c 100644
--- a/src/indexer/segment_updater.rs
+++ b/src/indexer/segment_updater.rs
@@ -63,6 +63,26 @@ pub(crate) fn save_metas(
     }
 }
 
+/// Describes a routine for allowing an operation in tantivy to be cleanly cancelled.
+///
+/// We provide an implementation for `Fn() -> bool`.
+pub trait CancelSentinel: Send + Sync + 'static {
+    fn box_clone(&self) -> Box<dyn CancelSentinel>;
+    fn wants_cancel(&self) -> bool;
+}
+
+impl<F: Fn() -> bool + Send + Sync + 'static> CancelSentinel for F
+where F: Clone
+{
+    fn box_clone(&self) -> Box<dyn CancelSentinel> {
+        Box::new(self.clone())
+    }
+
+    fn wants_cancel(&self) -> bool {
+        self()
+    }
+}
+
 // The segment update runner is in charge of processing all
 // of the `SegmentUpdate`s.
 //
@@ -71,15 +91,26 @@ pub(crate) fn save_metas(
 //
 // We voluntarily pass a merge_operation ref to guarantee that
 // the merge_operation is alive during the process
-#[derive(Clone)]
-pub(crate) struct SegmentUpdater(Arc<InnerSegmentUpdater>);
+pub(crate) struct SegmentUpdater {
+    inner: Arc<InnerSegmentUpdater>,
+    cancel: Box<dyn CancelSentinel>,
+}
+
+impl Clone for SegmentUpdater {
+    fn clone(&self) -> Self {
+        Self {
+            inner: self.inner.clone(),
+            cancel: self.cancel.box_clone(),
+        }
+    }
+}
 
 impl Deref for SegmentUpdater {
     type Target = InnerSegmentUpdater;
 
     #[inline]
     fn deref(&self) -> &Self::Target {
-        &self.0
+        &self.inner
     }
 }
@@ -99,6 +130,7 @@ fn merge(
     index: &Index,
     mut segment_entries: Vec<SegmentEntry>,
     target_opstamp: Opstamp,
+    cancel: Box<dyn CancelSentinel>,
 ) -> crate::Result<Option<SegmentEntry>> {
     let num_docs = segment_entries
         .iter()
@@ -125,7 +157,7 @@ fn merge(
         .collect();
 
     // An IndexMerger is like a "view" of our merged segments.
-    let merger: IndexMerger = IndexMerger::open(index.schema(), &segments[..])?;
+    let merger = IndexMerger::open(index.schema(), &segments[..], cancel)?;
 
     // ... we just serialize this index merger in our new segment to merge the segments.
     let segment_serializer = SegmentSerializer::for_segment(merged_segment.clone())?;
@@ -153,6 +185,7 @@
 pub fn merge_indices<T: Into<Box<dyn Directory>>>(
     indices: &[Index],
     output_directory: T,
+    cancel: Box<dyn CancelSentinel>,
 ) -> crate::Result<Index> {
     if indices.is_empty() {
         // If there are no indices to merge, there is no need to do anything.
@@ -180,7 +213,13 @@
     }
 
     let non_filter = segments.iter().map(|_| None).collect::<Vec<_>>();
-    merge_filtered_segments(&segments, target_settings, non_filter, output_directory)
+    merge_filtered_segments(
+        &segments,
+        target_settings,
+        non_filter,
+        output_directory,
+        cancel,
+    )
 }
 
 /// Advanced: Merges a list of segments from different indices in a new index.
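The `CancelSentinel` plumbing above can be exercised in isolation.
`box_clone` exists because `Clone` is not object-safe, so `SegmentUpdater`
clones its boxed sentinel through the trait instead; the blanket impl lets
any `Clone + Send + Sync` closure act as a sentinel. A self-contained sketch
(illustrative only) mirroring the trait as added in this patch:

    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    // Mirror of the trait added in this patch.
    trait CancelSentinel: Send + Sync + 'static {
        fn box_clone(&self) -> Box<dyn CancelSentinel>;
        fn wants_cancel(&self) -> bool;
    }

    // Blanket impl: any cloneable `Fn() -> bool` closure is a sentinel.
    impl<F: Fn() -> bool + Clone + Send + Sync + 'static> CancelSentinel for F {
        fn box_clone(&self) -> Box<dyn CancelSentinel> {
            Box::new(self.clone())
        }

        fn wants_cancel(&self) -> bool {
            self()
        }
    }

    fn main() {
        let stop = Arc::new(AtomicBool::new(false));
        let sentinel: Box<dyn CancelSentinel> = Box::new({
            let stop = Arc::clone(&stop);
            move || stop.load(Ordering::Relaxed)
        });
        // Each merge thread receives its own handle via `box_clone`.
        let for_merge_thread = sentinel.box_clone();
        stop.store(true, Ordering::Relaxed);
        assert!(for_merge_thread.wants_cancel());
    }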
@@ -201,6 +240,7 @@ pub fn merge_filtered_segments<T: Into<Box<dyn Directory>>>(
     target_settings: IndexSettings,
     filter_doc_ids: Vec<Option<AliveBitSet>>,
     output_directory: T,
+    cancel: Box<dyn CancelSentinel>,
 ) -> crate::Result<Index> {
     if segments.is_empty() {
         // If there are no indices to merge, there is no need to do anything.
@@ -229,8 +269,12 @@
     )?;
     let merged_segment = merged_index.new_segment();
     let merged_segment_id = merged_segment.id();
-    let merger: IndexMerger =
-        IndexMerger::open_with_custom_alive_set(merged_index.schema(), segments, filter_doc_ids)?;
+    let merger = IndexMerger::open_with_custom_alive_set(
+        merged_index.schema(),
+        segments,
+        filter_doc_ids,
+        cancel,
+    )?;
     let segment_serializer = SegmentSerializer::for_segment(merged_segment)?;
     let num_docs = merger.write(segment_serializer)?;
 
@@ -299,6 +343,7 @@ impl SegmentUpdater {
         delete_cursor: &DeleteCursor,
         num_merge_threads: usize,
         panic_handler: Option<DirectoryPanicHandler>,
+        cancel: impl Fn() -> bool + 'static + Send + Sync + Clone,
     ) -> crate::Result<SegmentUpdater> {
         let segments = index.searchable_segment_metas()?;
         let segment_manager = SegmentManager::from_segments(segments, delete_cursor);
@@ -330,17 +375,20 @@
             crate::TantivyError::SystemError("Failed to spawn segment merging thread".to_string())
         })?;
         let index_meta = index.load_metas()?;
-        Ok(SegmentUpdater(Arc::new(InnerSegmentUpdater {
-            active_index_meta: RwLock::new(Arc::new(index_meta)),
-            pool,
-            merge_thread_pool,
-            index,
-            segment_manager,
-            merge_policy: RwLock::new(Arc::new(DefaultMergePolicy::default())),
-            killed: AtomicBool::new(false),
-            stamper,
-            merge_operations: Default::default(),
-        })))
+        Ok(SegmentUpdater {
+            inner: Arc::new(InnerSegmentUpdater {
+                active_index_meta: RwLock::new(Arc::new(index_meta)),
+                pool,
+                merge_thread_pool,
+                index,
+                segment_manager,
+                merge_policy: RwLock::new(Arc::new(DefaultMergePolicy::default())),
+                killed: AtomicBool::new(false),
+                stamper,
+                merge_operations: Default::default(),
+            }),
+            cancel: Box::new(cancel),
+        })
     }
 
     pub fn get_merge_policy(&self) -> Arc<dyn MergePolicy> {
@@ -473,7 +521,7 @@
         opstamp: Opstamp,
         payload: Option<String>,
     ) -> FutureResult<Opstamp> {
-        let segment_updater: SegmentUpdater = self.clone();
+        let segment_updater = self.clone();
         self.schedule_task(move || {
             let segment_entries = segment_updater.purge_deletes(opstamp)?;
             let previous_metas = segment_updater.load_meta();
@@ -552,39 +600,18 @@
         let (scheduled_result, merging_future_send) =
             FutureResult::create("Merge operation failed.");
 
+        let cancel = self.cancel.box_clone();
         self.merge_thread_pool.spawn(move || {
             // The fact that `merge_operation` is moved here is important.
             // Its lifetime is used to track how many merging thread are currently running,
             // as well as which segment is currently in merge and therefore should not be
             // candidate for another merge.
-            let merge_panic_res = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
-                merge(
-                    &segment_updater.index,
-                    segment_entries,
-                    merge_operation.target_opstamp(),
-                )
-            }));
-            let merge_res = match merge_panic_res {
-                Ok(merge_res) => merge_res,
-                Err(panic_err) => {
-                    let panic_str = if let Some(msg) = panic_err.downcast_ref::<&str>() {
-                        *msg
-                    } else if let Some(msg) = panic_err.downcast_ref::<String>() {
-                        msg.as_str()
-                    } else {
-                        "UNKNOWN"
-                    };
-                    let _send_result = merging_future_send.send(Err(TantivyError::SystemError(
-                        format!("Merge thread panicked: {panic_str}"),
-                    )));
-                    // Resume unwinding because we forced unwind safety with
-                    // `std::panic::AssertUnwindSafe` Use a specific message so
-                    // the panic_handler can double check that we properly caught the panic.
-                    let boxed_panic_message: Box<dyn Any + Send> = Box::new(PANIC_CAUGHT);
-                    std::panic::resume_unwind(boxed_panic_message);
-                }
-            };
-            match merge_res {
+            match merge(
+                &segment_updater.index,
+                segment_entries,
+                merge_operation.target_opstamp(),
+                cancel,
+            ) {
                 Ok(after_merge_segment_entry) => {
                     let res = segment_updater.end_merge(merge_operation, after_merge_segment_entry);
                     let _send_result = merging_future_send.send(res);
@@ -950,7 +977,7 @@ mod tests {
         assert_eq!(indices.len(), 3);
 
         let output_directory: Box<dyn Directory> = Box::<RamDirectory>::default();
-        let index = merge_indices(&indices, output_directory)?;
+        let index = merge_indices(&indices, output_directory, Box::new(|| false))?;
         assert_eq!(index.schema(), schema);
 
         let segments = index.searchable_segments()?;
@@ -964,7 +991,7 @@
 
     #[test]
     fn test_merge_empty_indices_array() {
-        let merge_result = merge_indices(&[], RamDirectory::default());
+        let merge_result = merge_indices(&[], RamDirectory::default(), Box::new(|| false));
         assert!(merge_result.is_err());
     }
 
@@ -991,7 +1018,11 @@
         };
 
         // mismatched schema index list
-        let result = merge_indices(&[first_index, second_index], RamDirectory::default());
+        let result = merge_indices(
+            &[first_index, second_index],
+            RamDirectory::default(),
+            Box::new(|| false),
+        );
         assert!(result.is_err());
 
         Ok(())
@@ -1039,6 +1070,7 @@
             target_settings,
             filter_segments,
             RamDirectory::default(),
+            Box::new(|| false),
         )?;
 
         let segments = merged_index.searchable_segments()?;
@@ -1084,6 +1116,7 @@
             target_settings,
             filter_segments,
             RamDirectory::default(),
+            Box::new(|| false),
         )?;
 
         let segments = index.searchable_segments()?;
@@ -1147,10 +1180,11 @@
             target_schema,
             target_settings.clone(),
         )?;
-        let merger: IndexMerger = IndexMerger::open_with_custom_alive_set(
+        let merger = IndexMerger::open_with_custom_alive_set(
             merged_index.schema(),
             &segments[..],
             filter_segments,
+            Box::new(|| false),
         )?;
         let doc_ids_alive: Vec<_> = merger.readers[0].doc_ids_alive().collect();
@@ -1162,10 +1196,11 @@
         let target_schema = segments[0].schema();
         let merged_index =
             Index::create(RamDirectory::default(), target_schema, target_settings)?;
-        let merger: IndexMerger = IndexMerger::open_with_custom_alive_set(
+        let merger = IndexMerger::open_with_custom_alive_set(
             merged_index.schema(),
            &segments[..],
            filter_segments,
+           Box::new(|| false),
        )?;
        let doc_ids_alive: Vec<_> = merger.readers[0].doc_ids_alive().collect();
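End to end, callers can thread a cancellation callback through
`merge_indices` the same way the tests above do, except with a flag that
another thread may flip to make an in-flight merge stop early with an error
instead of completing. A sketch, assuming `merge_indices` stays re-exported
at `tantivy::indexer::merge_indices` as in upstream tantivy:

    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    use tantivy::directory::RamDirectory;
    use tantivy::indexer::merge_indices; // assumed re-export path
    use tantivy::Index;

    fn merge_unless_cancelled(
        indices: &[Index],
        stop: Arc<AtomicBool>,
    ) -> tantivy::Result<Index> {
        // The closure captures an `Arc`, so it is `Clone + Send + Sync` and
        // satisfies `CancelSentinel` through the blanket impl above.
        let cancel = move || stop.load(Ordering::Relaxed);
        merge_indices(indices, RamDirectory::default(), Box::new(cancel))
    }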