From 8fcd28b37656d8eae0edd7762885e8de75a8d2fc Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Tue, 9 Jun 2026 20:41:20 -0700 Subject: [PATCH] fix: provide plugins for open region of remote compactor; catch manifest update in exit staging --- src/mito2/src/compaction/compactor.rs | 9 ++++++--- src/mito2/src/engine/open_test.rs | 2 ++ src/mito2/src/engine/region_hook.rs | 13 +++++++++++-- src/mito2/src/region.rs | 19 +++++++++++++++++-- 4 files changed, 36 insertions(+), 7 deletions(-) diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index d20270ea48..b712b229ae 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -114,13 +114,15 @@ pub struct CompactionRegion { } /// OpenCompactionRegionRequest represents the request to open a compaction region. -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct OpenCompactionRegionRequest { pub region_id: RegionId, pub table_dir: String, pub path_type: PathType, pub region_options: RegionOptions, pub max_parallelism: usize, + /// Plugins for the compaction region, used to look up the [`RegionHook`](crate::engine::region_hook::RegionHook). + pub plugins: Plugins, } /// Open a compaction region from a compaction request. @@ -179,10 +181,11 @@ pub async fn open_compaction_region( let manifest = manifest_manager.manifest(); let region_metadata = manifest.metadata.clone(); + let hook: Option = req.plugins.get(); let manifest_ctx = Arc::new(ManifestContext::new( manifest_manager, RegionRoleState::Leader(RegionLeaderState::Writable), - None, + hook, )); let file_purger = { @@ -236,7 +239,7 @@ pub async fn open_compaction_region( file_purger: Some(file_purger), ttl: Some(ttl), max_parallelism: req.max_parallelism, - plugins: Plugins::new(), + plugins: req.plugins.clone(), }) } diff --git a/src/mito2/src/engine/open_test.rs b/src/mito2/src/engine/open_test.rs index 11279954a9..b22e17dd01 100644 --- a/src/mito2/src/engine/open_test.rs +++ b/src/mito2/src/engine/open_test.rs @@ -18,6 +18,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::Duration; use api::v1::Rows; +use common_base::Plugins; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_recordbatch::RecordBatches; @@ -619,6 +620,7 @@ async fn test_open_compaction_region_with_format(flat_format: bool) { path_type: PathType::Bare, region_options: RegionOptions::default(), max_parallelism: 1, + plugins: Plugins::new(), }; let compaction_region = open_compaction_region( diff --git a/src/mito2/src/engine/region_hook.rs b/src/mito2/src/engine/region_hook.rs index c5ae12651d..3fda3e542e 100644 --- a/src/mito2/src/engine/region_hook.rs +++ b/src/mito2/src/engine/region_hook.rs @@ -37,15 +37,22 @@ //! |------------------------------|:----------------------:|:---------------------:| //! | Flush (memtable → SST) | ✅ Yes | ✅ Yes | //! | Local compaction | ✅ Yes | ✅ Yes | -//! | Remote compaction | ✅ (on compactor node) | ✅ (on compactor node) | +//! | Remote compaction | ✅ (compactor node) ¹ | ✅ (compactor node) ¹ | //! | RegionEdit / bulk ingestion | ❌ (files pre-written) | ✅ Yes | //! | Copy region | ❌ (object-store copy) | ✅ Yes | -//! | Apply staging | ❌ (delegates to edit) | ✅ Yes | +//! | Apply staging | ❌ (delegates to edit) | ✅ Yes ² | //! | Alter (schema change) | ❌ (no SST files) | ✅ Yes | //! | Truncate | ❌ (removes files) | ✅ Yes | //! | Enter staging | ❌ (no SST files) | ✅ Yes | //! | Async index build | ❌ (index files only) | ✅ Yes | //! +//! ¹ Remote compaction runs on a dedicated compactor node via `open_compaction_region()`. +//! The caller must pass plugins via `OpenCompactionRegionRequest` to enable hooks on the +//! compactor node. +//! ² Apply staging fires `on_manifest_updated` twice: once when the staging SST files are +//! committed via `RegionEdit`, and once when `exit_staging_on_success` merges all staged +//! manifest actions into the live manifest. +//! //! The following paths do **not** trigger any hook: //! - Follower region sync / catchup (manifest read-only; followers don't author changes) //! - GC / checkpoint / drop / remap (internal bookkeeping, not logical state changes) @@ -57,6 +64,8 @@ //! //! `on_manifest_updated` is centralized in [`ManifestContext::update_manifest_with_state_check`], //! so it automatically covers all manifest write paths that go through `ManifestContext`. +//! The sole exception is [`MitoRegion::exit_staging_on_success`], which invokes the hook +//! inline because it receives a pre-acquired manifest write lock from the caller. //! //! ## Future work //! diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index 663ee7e8eb..80c3e41b5b 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -852,14 +852,29 @@ impl MitoRegion { ); } - // Submit merged actions using the manifest manager's update method - // Pass the `false` so it saves to normal directory, not staging + // Submit merged actions using the manifest manager's update method. + // Pass `false` so it saves to normal directory, not staging. + // + // Unlike the centralized `update_manifest_with_state_check` path, we invoke + // the hook inline here because the caller already holds the manifest write lock + // and we cannot release it mid-method. This is acceptable because staging exit + // is infrequent and callers manage the lock scope tightly. + let hook = self.manifest_ctx.hook(); + let action_list_for_hook = hook.as_ref().map(|_| merged_actions.clone()); let new_version = manager.update(merged_actions, false).await?; info!( "Successfully submitted merged staged manifests for region {}, new version: {}", self.region_id, new_version ); + // Invoke the on_manifest_updated hook for the merged staging actions. + if let Some(hook) = hook + && let Some(action_list) = action_list_for_hook + { + hook.on_manifest_updated(self.region_id, &action_list, new_version) + .await; + } + // Apply the merged changes to in-memory version control if let Some(change) = merged_partition_expr_change { let mut new_metadata = self.version().metadata.as_ref().clone();