From 11543b016f2194ec0c39eeec03b8f1bbe20c2dfd Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Mon, 22 Jun 2026 11:34:28 +0800 Subject: [PATCH] refactor: try best to make sure hook is called on manifest update (#8329) * refactor: try best to make sure hook is called on manifest update * chore: minimize comments * chore: make sure hook are merged for same region --- src/mito2/src/engine/region_hook.rs | 88 +++++++++++++- src/mito2/src/region.rs | 121 +++++++++++-------- src/mito2/src/worker/handle_apply_staging.rs | 8 +- 3 files changed, 155 insertions(+), 62 deletions(-) diff --git a/src/mito2/src/engine/region_hook.rs b/src/mito2/src/engine/region_hook.rs index 0abf3fc110..eca254b842 100644 --- a/src/mito2/src/engine/region_hook.rs +++ b/src/mito2/src/engine/region_hook.rs @@ -62,10 +62,17 @@ //! `on_sst_files_written` is invoked at the SST write site (flush task or compaction task), //! immediately after SST files are written but **before** the manifest is committed. //! -//! `on_manifest_updated` is centralized in [`ManifestContext::update_manifest_with_state_check`], -//! so it automatically covers all manifest write paths that go through `ManifestContext`. -//! The sole exception is [`MitoRegion::exit_staging_on_success`], which returns the hook -//! payload to the caller so the hook can be invoked after dropping the manifest write lock. +//! `on_manifest_updated` is funneled through [`ManifestContext::update_locked`], +//! the sole caller of the low-level [`RegionManifestManager::update`], which +//! packages each successful write into a [`PendingManifestHook`]. The caller +//! owns the write lock, drops it, and *then* fires the receipt — the hook must +//! never run under the lock. [`ManifestContext::update_manifest`] is the common +//! case: it acquires the lock, delegates to `update_locked`, and fires the +//! receipt in one go. Multi-step sequences (staging-exit, role-state backfill) +//! call `update_locked` directly under their own held guard. +//! +//! Non-logical writes (GC, staging bookkeeping) call the manager's own methods +//! directly and intentionally do not fire the hook. //! //! ## Future work //! @@ -74,6 +81,9 @@ //! //! [`on_sst_files_written`]: RegionHook::on_sst_files_written //! [`on_manifest_updated`]: RegionHook::on_manifest_updated +//! [`RegionManifestManager::update`]: crate::manifest::manager::RegionManifestManager::update +//! [`ManifestContext::update_locked`]: crate::region::ManifestContext::update_locked +//! [`ManifestContext::update_manifest`]: crate::region::ManifestContext::update_manifest use std::fmt::Debug; use std::sync::Arc; @@ -87,6 +97,76 @@ use crate::manifest::action::RegionMetaActionList; use crate::sst::file::FileMeta; use crate::sst::parquet::SstInfo; +/// A deferred [`RegionHook::on_manifest_updated`] notification produced by a +/// logical manifest write via [`ManifestContext::update_locked`](crate::region::ManifestContext::update_locked). +/// +/// Must be [`fire`](Self::fire)d **after** the manifest write lock is released +/// (the hook may read the manifest). `#[must_use]` so a forgotten receipt warns. +#[must_use = "the region hook must be fired after releasing the manifest write lock"] +pub(crate) struct PendingManifestHook { + region_id: RegionId, + /// `None` when no hook is registered (fire becomes a no-op). + action_list: Option, + version: ManifestVersion, + hook: Option, +} + +impl PendingManifestHook { + pub(crate) fn new( + region_id: RegionId, + action_list: Option, + version: ManifestVersion, + hook: Option, + ) -> Self { + Self { + region_id, + action_list, + version, + hook, + } + } + + /// The manifest version produced by the write. + pub(crate) fn version(&self) -> ManifestVersion { + self.version + } + + /// Fires the hook if one is registered. Safe to call unconditionally: it is + /// a no-op when no hook is registered. + pub(crate) async fn fire(self) { + if let (Some(hook), Some(action_list)) = (self.hook, self.action_list) { + hook.on_manifest_updated(self.region_id, &action_list, self.version) + .await; + } + } + + /// Merges two pending notifications into one so consumers observe a single + /// `on_manifest_updated` call covering all actions. The combined action list + /// keeps `self`'s actions followed by `other`'s, and the *later* manifest + /// version wins. Used when a sequence of writes (e.g. staging-exit followed + /// by metadata backfill) should notify the hook exactly once. + pub(crate) fn merge(self, other: PendingManifestHook) -> PendingManifestHook { + debug_assert_eq!( + self.region_id, other.region_id, + "Cannot merge pending hooks of different regions: {:?} and {:?}", + self.region_id, other.region_id + ); + PendingManifestHook { + region_id: self.region_id, + action_list: match (self.action_list, other.action_list) { + (Some(mut a), Some(b)) => { + a.actions.extend(b.actions); + Some(a) + } + (a, None) => a, + (None, b) => b, + }, + version: self.version.max(other.version), + hook: self.hook.or(other.hook), + } + } +} + /// Information about a single SST data file written during flush or compaction. pub struct SstFileInfo<'a> { pub sst_info_ref: &'a SstInfo, diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index 72c269bd0e..23d3347a76 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -46,7 +46,7 @@ use tokio::sync::RwLockWriteGuard; pub use utils::*; use crate::access_layer::AccessLayerRef; -use crate::engine::region_hook::RegionHookRef; +use crate::engine::region_hook::{PendingManifestHook, RegionHookRef}; use crate::error::{ FlushableRegionStateSnafu, InvalidPartitionExprSnafu, RegionNotFoundSnafu, RegionStateSnafu, RegionTruncatedSnafu, Result, UnexpectedSnafu, UpdateManifestSnafu, @@ -450,7 +450,7 @@ impl MitoRegion { self.manifest_ctx.manifest_manager.write().await; let current_state = self.state(); - let hook_payload: Option<(RegionMetaActionList, ManifestVersion)> = match state { + let hook_payload: Option = match state { SettableRegionRoleState::Leader => { // Exit staging mode and return to normal writable leader // Only allowed from staging state @@ -555,7 +555,7 @@ impl MitoRegion { }; // Hack(zhongzc): If we have just become leader (writable), persist any backfilled metadata. - let mut backfill_hook_payload: Option<(RegionMetaActionList, ManifestVersion)> = None; + let mut backfill_hook_payload: Option = None; if self.state() == RegionRoleState::Leader(RegionLeaderState::Writable) { // Persist backfilled metadata if manifest is missing fields (e.g., partition_expr) let manifest_meta = &manager.manifest().metadata; @@ -568,17 +568,18 @@ impl MitoRegion { append_mode: None, }); let action_list = RegionMetaActionList::with_action(action); - let hook = self.manifest_ctx.hook(); - let al_for_hook = hook.as_ref().map(|_| action_list.clone()); - let result = manager.update(action_list, false).await; - - match result { - Ok(version) => { + match self + .manifest_ctx + .update_locked(&mut manager, action_list, false) + .await + { + Ok(pending) => { info!( "Successfully persisted backfilled metadata for region {}, version: {}", - self.region_id, version + self.region_id, + pending.version() ); - backfill_hook_payload = al_for_hook.map(|al| (al, version)); + backfill_hook_payload = Some(pending); } Err(e) => { warn!(e; "Failed to persist backfilled metadata for region {}", self.region_id); @@ -589,25 +590,17 @@ impl MitoRegion { drop(manager); - // Merge both payloads into a single hook invocation so consumers see - // the complete set of actions and the latest manifest version. + // Merge both payloads so consumers see the complete set of actions in + // one notification. The lock is released, so it's safe to fire. let merged = match (hook_payload, backfill_hook_payload) { - (Some((al, _v)), Some((backfill_al, backfill_v))) => { - // Combine action lists; use the later version (backfill happens after staging merge). - let mut combined = al; - combined.actions.extend(backfill_al.actions); - Some((combined, backfill_v)) - } + (Some(staging), Some(backfill)) => Some(staging.merge(backfill)), (Some(payload), None) => Some(payload), (None, Some(payload)) => Some(payload), (None, None) => None, }; - if let Some((action_list, version)) = merged - && let Some(hook) = self.manifest_ctx.hook() - { - hook.on_manifest_updated(self.region_id, &action_list, version) - .await; + if let Some(pending) = merged { + pending.fire().await; } Ok(()) @@ -824,15 +817,15 @@ impl MitoRegion { /// Merges staged manifest actions into the live manifest and exits staging mode. /// /// The caller must hold the manifest write lock and pass it via `manager`. - /// Returns `Ok(Some(action_list, version))` when staging manifests were merged, - /// or `Ok(None)` when there were no staged manifests to merge. + /// Returns `Ok(Some(pending))` when staging manifests were merged, or + /// `Ok(None)` when there were no staged manifests to merge. /// - /// **Important:** the returned hook payload must be invoked **after** the caller - /// drops the manifest write lock, to avoid deadlocking if the hook reads the manifest. + /// **Important:** [`fire`](PendingManifestHook::fire) the receipt only after + /// dropping the lock — the hook may read the manifest and deadlock otherwise. pub(crate) async fn exit_staging_on_success( &self, manager: &mut RwLockWriteGuard<'_, RegionManifestManager>, - ) -> Result> { + ) -> Result> { let current_state = self.manifest_ctx.current_state(); ensure!( current_state == RegionRoleState::Leader(RegionLeaderState::Staging), @@ -898,12 +891,11 @@ impl MitoRegion { // Submit merged actions using the manifest manager's update method. // Pass `false` so it saves to normal directory, not staging. - // - // Clone the action list before it is consumed so we can return it to - // the caller for hook invocation after the lock is dropped. - let hook = self.manifest_ctx.hook(); - let action_list_for_hook = hook.as_ref().map(|_| merged_actions.clone()); - let new_version = manager.update(merged_actions, false).await?; + let pending = self + .manifest_ctx + .update_locked(manager, merged_actions, false) + .await?; + let new_version = pending.version(); info!( "Successfully submitted merged staged manifests for region {}, new version: {}", self.region_id, new_version @@ -928,8 +920,7 @@ impl MitoRegion { self.exit_staging()?; // Return the hook payload; the caller invokes the hook after dropping the lock. - let hook_payload = action_list_for_hook.map(|action_list| (action_list, new_version)); - Ok(hook_payload) + Ok(Some(pending)) } /// Returns the partition expression string for this region. @@ -995,7 +986,9 @@ impl Drop for MitoRegion { /// Context to update the region manifest. #[derive(Debug)] pub(crate) struct ManifestContext { - /// Manager to maintain manifest for this region. + /// Manager to maintain manifest for this region. Logical writes go through + /// [`update_locked`](Self::update_locked) (or an [`update_manifest`](Self::update_manifest) + /// variant) so they produce a [`PendingManifestHook`]. pub(crate) manifest_manager: tokio::sync::RwLock, /// The state of the region. The region checks the state before updating /// manifest. @@ -1178,6 +1171,38 @@ impl ManifestContext { .await } + /// Performs a manifest write under a caller-held write lock and returns a + /// [`PendingManifestHook`] to [`fire`](PendingManifestHook::fire) after + /// dropping the lock. This is the sole caller of + /// [`RegionManifestManager::update`], so it is the funnel through which all + /// logical manifest writes notify the hook. + /// + /// Does not validate state or edit applicability — the caller must do so + /// while still holding the lock (see `update_manifest_with_state_check` + /// and `MitoRegion::exit_staging_on_success`). + pub(crate) async fn update_locked( + &self, + manager: &mut RegionManifestManager, + action_list: RegionMetaActionList, + is_staging: bool, + ) -> Result { + let region_id = manager.manifest().metadata.region_id; + // Clone before `action_list` is moved into `update` so the hook still + // sees what was written. + let action_list_for_hook = self.hook.as_ref().map(|_| action_list.clone()); + let version = manager + .update(action_list, is_staging) + .await + .inspect_err(|e| error!(e; "Failed to update manifest, region_id: {}", region_id))?; + + Ok(PendingManifestHook::new( + region_id, + action_list_for_hook, + version, + self.hook.clone(), + )) + } + async fn update_manifest_with_state_check( &self, action_list: RegionMetaActionList, @@ -1243,15 +1268,12 @@ impl ManifestContext { } } - // If a region hook is registered, clone the action list before it is consumed - // so we can pass a reference to the hook after a successful update. - let hook = self.hook.clone(); - let action_list_for_hook = hook.as_ref().map(|_| action_list.clone()); + // `update_locked` returns a `PendingManifestHook` we fire after releasing the lock. let region_id = manifest.metadata.region_id; - let version = manager - .update(action_list, is_staging) - .await - .inspect_err(|e| error!(e; "Failed to update manifest, region_id: {}", region_id))?; + let pending = self + .update_locked(&mut manager, action_list, is_staging) + .await?; + let version = pending.version(); // Drop the write lock before invoking the hook. Hook implementations may // read the manifest or send region requests that acquire this lock; @@ -1265,12 +1287,7 @@ impl ManifestContext { ); } - if let Some(hook) = hook - && let Some(action_list) = action_list_for_hook - { - hook.on_manifest_updated(region_id, &action_list, version) - .await; - } + pending.fire().await; Ok(version) } diff --git a/src/mito2/src/worker/handle_apply_staging.rs b/src/mito2/src/worker/handle_apply_staging.rs index cefdda1fea..4ac8a8990f 100644 --- a/src/mito2/src/worker/handle_apply_staging.rs +++ b/src/mito2/src/worker/handle_apply_staging.rs @@ -159,13 +159,9 @@ impl RegionWorkerLoop { return; } }; - // Drop the write lock before invoking the hook to avoid deadlock. drop(manager); - if let Some((action_list, version)) = hook_payload - && let Some(hook) = region.manifest_ctx.hook() - { - hook.on_manifest_updated(region.region_id, &action_list, version) - .await; + if let Some(pending) = hook_payload { + pending.fire().await; } sender.send(Ok(0)); } else {