fix: provide plugins for open region of remote compactor; catch manifest update in exit staging

This commit is contained in:
Ning Sun
2026-06-09 20:41:20 -07:00
parent 396be1defc
commit 8fcd28b376
4 changed files with 36 additions and 7 deletions

View File

@@ -114,13 +114,15 @@ pub struct CompactionRegion {
}
/// OpenCompactionRegionRequest represents the request to open a compaction region.
///
/// It is read by `open_compaction_region`, which resolves an optional `RegionHookRef`
/// from `plugins` and copies `max_parallelism` and `plugins` into the opened
/// compaction region.
#[derive(Debug, Clone)]
#[derive(Clone)]
pub struct OpenCompactionRegionRequest {
/// Id of the region to open for compaction.
pub region_id: RegionId,
/// Table directory of the region.
/// NOTE(review): presumably combined with `path_type` to locate the region's files — confirm.
pub table_dir: String,
/// Path type of the region (e.g. `PathType::Bare`).
/// NOTE(review): exact effect on path resolution is not visible here — confirm.
pub path_type: PathType,
/// Options of the region to open.
pub region_options: RegionOptions,
/// Maximum parallelism, copied as-is into the opened compaction region.
pub max_parallelism: usize,
/// Plugins for the compaction region, used to look up the [`RegionHook`](crate::engine::region_hook::RegionHook).
/// The lookup yields an `Option`, so a `Plugins` without a registered hook results in no hook.
pub plugins: Plugins,
}
/// Open a compaction region from a compaction request.
@@ -179,10 +181,11 @@ pub async fn open_compaction_region(
let manifest = manifest_manager.manifest();
let region_metadata = manifest.metadata.clone();
let hook: Option<RegionHookRef> = req.plugins.get();
let manifest_ctx = Arc::new(ManifestContext::new(
manifest_manager,
RegionRoleState::Leader(RegionLeaderState::Writable),
None,
hook,
));
let file_purger = {
@@ -236,7 +239,7 @@ pub async fn open_compaction_region(
file_purger: Some(file_purger),
ttl: Some(ttl),
max_parallelism: req.max_parallelism,
plugins: Plugins::new(),
plugins: req.plugins.clone(),
})
}

View File

@@ -18,6 +18,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use api::v1::Rows;
use common_base::Plugins;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_recordbatch::RecordBatches;
@@ -619,6 +620,7 @@ async fn test_open_compaction_region_with_format(flat_format: bool) {
path_type: PathType::Bare,
region_options: RegionOptions::default(),
max_parallelism: 1,
plugins: Plugins::new(),
};
let compaction_region = open_compaction_region(

View File

@@ -37,15 +37,22 @@
//! |------------------------------|:----------------------:|:---------------------:|
//! | Flush (memtable → SST) | ✅ Yes | ✅ Yes |
//! | Local compaction | ✅ Yes | ✅ Yes |
//! | Remote compaction | ✅ (on compactor node) | ✅ (on compactor node) |
//! | Remote compaction | ✅ (compactor node) ¹ | ✅ (compactor node) ¹ |
//! | RegionEdit / bulk ingestion | ❌ (files pre-written) | ✅ Yes |
//! | Copy region | ❌ (object-store copy) | ✅ Yes |
//! | Apply staging | ❌ (delegates to edit) | ✅ Yes |
//! | Apply staging | ❌ (delegates to edit) | ✅ Yes ² |
//! | Alter (schema change) | ❌ (no SST files) | ✅ Yes |
//! | Truncate | ❌ (removes files) | ✅ Yes |
//! | Enter staging | ❌ (no SST files) | ✅ Yes |
//! | Async index build | ❌ (index files only) | ✅ Yes |
//!
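//! ¹ Remote compaction runs on a dedicated compactor node via `open_compaction_region()`.
//! Hooks fire there only if the caller supplies them: the hook (`RegionHookRef`) is looked
//! up from the `plugins` field of `OpenCompactionRegionRequest`, so a request whose plugins
//! contain no hook runs without hooks on the compactor node.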
//! ² Apply staging fires `on_manifest_updated` twice: once when the staging SST files are
//! committed via `RegionEdit`, and once when `exit_staging_on_success` merges all staged
//! manifest actions into the live manifest.
//!
//! The following paths do **not** trigger any hook:
//! - Follower region sync / catchup (manifest read-only; followers don't author changes)
//! - GC / checkpoint / drop / remap (internal bookkeeping, not logical state changes)
@@ -57,6 +64,8 @@
//!
//! `on_manifest_updated` is centralized in [`ManifestContext::update_manifest_with_state_check`],
//! so it automatically covers all manifest write paths that go through `ManifestContext`.
//! The sole exception is [`MitoRegion::exit_staging_on_success`], which invokes the hook
//! inline because it receives a pre-acquired manifest write lock from the caller.
//!
//! ## Future work
//!

View File

@@ -852,14 +852,29 @@ impl MitoRegion {
);
}
// Submit merged actions using the manifest manager's update method
// Pass the `false` so it saves to normal directory, not staging
// Submit merged actions using the manifest manager's update method.
// Pass `false` so it saves to normal directory, not staging.
//
// Unlike the centralized `update_manifest_with_state_check` path, we invoke
// the hook inline here because the caller already holds the manifest write lock
// and we cannot release it mid-method. This is acceptable because staging exit
// is infrequent and callers manage the lock scope tightly.
let hook = self.manifest_ctx.hook();
let action_list_for_hook = hook.as_ref().map(|_| merged_actions.clone());
let new_version = manager.update(merged_actions, false).await?;
info!(
"Successfully submitted merged staged manifests for region {}, new version: {}",
self.region_id, new_version
);
// Invoke the on_manifest_updated hook for the merged staging actions.
if let Some(hook) = hook
&& let Some(action_list) = action_list_for_hook
{
hook.on_manifest_updated(self.region_id, &action_list, new_version)
.await;
}
// Apply the merged changes to in-memory version control
if let Some(change) = merged_partition_expr_change {
let mut new_metadata = self.version().metadata.as_ref().clone();