refactor: try best to make sure hook is called on manifest update (#8329)

* refactor: try best to make sure hook is called on manifest update

* chore: minimize comments

* chore: make sure hooks are merged for the same region
This commit is contained in:
Ning Sun
2026-06-22 11:34:28 +08:00
committed by GitHub
parent b365e0dd95
commit 11543b016f
3 changed files with 155 additions and 62 deletions

View File

@@ -62,10 +62,17 @@
//! `on_sst_files_written` is invoked at the SST write site (flush task or compaction task),
//! immediately after SST files are written but **before** the manifest is committed.
//!
//! `on_manifest_updated` is centralized in [`ManifestContext::update_manifest_with_state_check`],
//! so it automatically covers all manifest write paths that go through `ManifestContext`.
//! The sole exception is [`MitoRegion::exit_staging_on_success`], which returns the hook
//! payload to the caller so the hook can be invoked after dropping the manifest write lock.
//! `on_manifest_updated` is funneled through [`ManifestContext::update_locked`],
//! the sole caller of the low-level [`RegionManifestManager::update`], which
//! packages each successful write into a [`PendingManifestHook`]. The caller
//! owns the write lock, drops it, and *then* fires the receipt — the hook must
//! never run under the lock. [`ManifestContext::update_manifest`] is the common
//! case: it acquires the lock, delegates to `update_locked`, and fires the
//! receipt in one go. Multi-step sequences (staging-exit, role-state backfill)
//! call `update_locked` directly under their own held guard.
//!
//! Non-logical writes (GC, staging bookkeeping) call the manager's own methods
//! directly and intentionally do not fire the hook.
//!
//! ## Future work
//!
@@ -74,6 +81,9 @@
//!
//! [`on_sst_files_written`]: RegionHook::on_sst_files_written
//! [`on_manifest_updated`]: RegionHook::on_manifest_updated
//! [`RegionManifestManager::update`]: crate::manifest::manager::RegionManifestManager::update
//! [`ManifestContext::update_locked`]: crate::region::ManifestContext::update_locked
//! [`ManifestContext::update_manifest`]: crate::region::ManifestContext::update_manifest
use std::fmt::Debug;
use std::sync::Arc;
@@ -87,6 +97,76 @@ use crate::manifest::action::RegionMetaActionList;
use crate::sst::file::FileMeta;
use crate::sst::parquet::SstInfo;
/// A deferred [`RegionHook::on_manifest_updated`] notification produced by a
/// logical manifest write via [`ManifestContext::update_locked`](crate::region::ManifestContext::update_locked).
///
/// Must be [`fire`](Self::fire)d **after** the manifest write lock is released
/// (the hook may read the manifest). `#[must_use]` so a forgotten receipt warns.
///
/// The notification is only delivered when both `hook` and `action_list` are
/// present; otherwise firing does nothing.
#[must_use = "the region hook must be fired after releasing the manifest write lock"]
pub(crate) struct PendingManifestHook {
/// Region id passed to the hook as the first argument.
region_id: RegionId,
/// Actions passed to the hook by reference.
/// `None` when no hook is registered (fire becomes a no-op).
action_list: Option<RegionMetaActionList>,
/// Manifest version produced by the write; passed to the hook and exposed
/// through [`version`](Self::version).
version: ManifestVersion,
/// Hook to notify, if any.
hook: Option<RegionHookRef>,
}
impl PendingManifestHook {
    /// Builds a receipt from its parts.
    ///
    /// The values are stored as given; [`fire`](Self::fire) notifies only when
    /// both `action_list` and `hook` are `Some`.
    pub(crate) fn new(
        region_id: RegionId,
        action_list: Option<RegionMetaActionList>,
        version: ManifestVersion,
        hook: Option<RegionHookRef>,
    ) -> Self {
        PendingManifestHook {
            region_id,
            action_list,
            version,
            hook,
        }
    }

    /// Returns the manifest version recorded for this write.
    pub(crate) fn version(&self) -> ManifestVersion {
        self.version
    }

    /// Consumes the receipt and invokes `on_manifest_updated` on the hook.
    ///
    /// Calling this unconditionally is fine: when either the hook or the
    /// action list is missing, it returns without doing anything.
    pub(crate) async fn fire(self) {
        let PendingManifestHook {
            region_id,
            action_list,
            version,
            hook,
        } = self;

        // Nothing to deliver unless there is both a listener and a payload.
        let (Some(hook), Some(action_list)) = (hook, action_list) else {
            return;
        };

        hook.on_manifest_updated(region_id, &action_list, version)
            .await;
    }

    /// Combines two receipts so the hook is notified once for both writes.
    ///
    /// The resulting action list is `self`'s actions followed by `other`'s
    /// (or whichever side has one), the version is the larger of the two, and
    /// the hook is `self`'s if present, otherwise `other`'s. The result keeps
    /// `self`'s region id; in debug builds a mismatch between the two region
    /// ids triggers an assertion failure.
    pub(crate) fn merge(self, other: PendingManifestHook) -> PendingManifestHook {
        let PendingManifestHook {
            region_id,
            action_list: first_actions,
            version: first_version,
            hook: first_hook,
        } = self;
        let PendingManifestHook {
            region_id: other_region_id,
            action_list: second_actions,
            version: second_version,
            hook: second_hook,
        } = other;

        debug_assert_eq!(
            region_id, other_region_id,
            "Cannot merge pending hooks of different regions: {:?} and {:?}",
            region_id, other_region_id
        );

        let action_list = match (first_actions, second_actions) {
            // Both sides carry actions: append in write order.
            (Some(mut combined), Some(tail)) => {
                combined.actions.extend(tail.actions);
                Some(combined)
            }
            // At most one side carries actions: keep it (or `None`).
            (first, second) => first.or(second),
        };

        PendingManifestHook {
            region_id,
            action_list,
            version: first_version.max(second_version),
            hook: first_hook.or(second_hook),
        }
    }
}
/// Information about a single SST data file written during flush or compaction.
pub struct SstFileInfo<'a> {
pub sst_info_ref: &'a SstInfo,

View File

@@ -46,7 +46,7 @@ use tokio::sync::RwLockWriteGuard;
pub use utils::*;
use crate::access_layer::AccessLayerRef;
use crate::engine::region_hook::RegionHookRef;
use crate::engine::region_hook::{PendingManifestHook, RegionHookRef};
use crate::error::{
FlushableRegionStateSnafu, InvalidPartitionExprSnafu, RegionNotFoundSnafu, RegionStateSnafu,
RegionTruncatedSnafu, Result, UnexpectedSnafu, UpdateManifestSnafu,
@@ -450,7 +450,7 @@ impl MitoRegion {
self.manifest_ctx.manifest_manager.write().await;
let current_state = self.state();
let hook_payload: Option<(RegionMetaActionList, ManifestVersion)> = match state {
let hook_payload: Option<PendingManifestHook> = match state {
SettableRegionRoleState::Leader => {
// Exit staging mode and return to normal writable leader
// Only allowed from staging state
@@ -555,7 +555,7 @@ impl MitoRegion {
};
// Hack(zhongzc): If we have just become leader (writable), persist any backfilled metadata.
let mut backfill_hook_payload: Option<(RegionMetaActionList, ManifestVersion)> = None;
let mut backfill_hook_payload: Option<PendingManifestHook> = None;
if self.state() == RegionRoleState::Leader(RegionLeaderState::Writable) {
// Persist backfilled metadata if manifest is missing fields (e.g., partition_expr)
let manifest_meta = &manager.manifest().metadata;
@@ -568,17 +568,18 @@ impl MitoRegion {
append_mode: None,
});
let action_list = RegionMetaActionList::with_action(action);
let hook = self.manifest_ctx.hook();
let al_for_hook = hook.as_ref().map(|_| action_list.clone());
let result = manager.update(action_list, false).await;
match result {
Ok(version) => {
match self
.manifest_ctx
.update_locked(&mut manager, action_list, false)
.await
{
Ok(pending) => {
info!(
"Successfully persisted backfilled metadata for region {}, version: {}",
self.region_id, version
self.region_id,
pending.version()
);
backfill_hook_payload = al_for_hook.map(|al| (al, version));
backfill_hook_payload = Some(pending);
}
Err(e) => {
warn!(e; "Failed to persist backfilled metadata for region {}", self.region_id);
@@ -589,25 +590,17 @@ impl MitoRegion {
drop(manager);
// Merge both payloads into a single hook invocation so consumers see
// the complete set of actions and the latest manifest version.
// Merge both payloads so consumers see the complete set of actions in
// one notification. The lock is released, so it's safe to fire.
let merged = match (hook_payload, backfill_hook_payload) {
(Some((al, _v)), Some((backfill_al, backfill_v))) => {
// Combine action lists; use the later version (backfill happens after staging merge).
let mut combined = al;
combined.actions.extend(backfill_al.actions);
Some((combined, backfill_v))
}
(Some(staging), Some(backfill)) => Some(staging.merge(backfill)),
(Some(payload), None) => Some(payload),
(None, Some(payload)) => Some(payload),
(None, None) => None,
};
if let Some((action_list, version)) = merged
&& let Some(hook) = self.manifest_ctx.hook()
{
hook.on_manifest_updated(self.region_id, &action_list, version)
.await;
if let Some(pending) = merged {
pending.fire().await;
}
Ok(())
@@ -824,15 +817,15 @@ impl MitoRegion {
/// Merges staged manifest actions into the live manifest and exits staging mode.
///
/// The caller must hold the manifest write lock and pass it via `manager`.
/// Returns `Ok(Some(action_list, version))` when staging manifests were merged,
/// or `Ok(None)` when there were no staged manifests to merge.
/// Returns `Ok(Some(pending))` when staging manifests were merged, or
/// `Ok(None)` when there were no staged manifests to merge.
///
/// **Important:** the returned hook payload must be invoked **after** the caller
/// drops the manifest write lock, to avoid deadlocking if the hook reads the manifest.
/// **Important:** [`fire`](PendingManifestHook::fire) the receipt only after
/// dropping the lock — the hook may read the manifest and deadlock otherwise.
pub(crate) async fn exit_staging_on_success(
&self,
manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
) -> Result<Option<(RegionMetaActionList, ManifestVersion)>> {
) -> Result<Option<PendingManifestHook>> {
let current_state = self.manifest_ctx.current_state();
ensure!(
current_state == RegionRoleState::Leader(RegionLeaderState::Staging),
@@ -898,12 +891,11 @@ impl MitoRegion {
// Submit merged actions using the manifest manager's update method.
// Pass `false` so it saves to normal directory, not staging.
//
// Clone the action list before it is consumed so we can return it to
// the caller for hook invocation after the lock is dropped.
let hook = self.manifest_ctx.hook();
let action_list_for_hook = hook.as_ref().map(|_| merged_actions.clone());
let new_version = manager.update(merged_actions, false).await?;
let pending = self
.manifest_ctx
.update_locked(manager, merged_actions, false)
.await?;
let new_version = pending.version();
info!(
"Successfully submitted merged staged manifests for region {}, new version: {}",
self.region_id, new_version
@@ -928,8 +920,7 @@ impl MitoRegion {
self.exit_staging()?;
// Return the hook payload; the caller invokes the hook after dropping the lock.
let hook_payload = action_list_for_hook.map(|action_list| (action_list, new_version));
Ok(hook_payload)
Ok(Some(pending))
}
/// Returns the partition expression string for this region.
@@ -995,7 +986,9 @@ impl Drop for MitoRegion {
/// Context to update the region manifest.
#[derive(Debug)]
pub(crate) struct ManifestContext {
/// Manager to maintain manifest for this region.
/// Manager to maintain manifest for this region. Logical writes go through
/// [`update_locked`](Self::update_locked) (or an [`update_manifest`](Self::update_manifest)
/// variant) so they produce a [`PendingManifestHook`].
pub(crate) manifest_manager: tokio::sync::RwLock<RegionManifestManager>,
/// The state of the region. The region checks the state before updating
/// manifest.
@@ -1178,6 +1171,38 @@ impl ManifestContext {
.await
}
/// Writes `action_list` to the manifest through an already-locked `manager`
/// and hands back a [`PendingManifestHook`] describing the write.
///
/// The caller holds the write lock, so the returned receipt must be
/// [`fire`](PendingManifestHook::fire)d only after that lock is dropped. This
/// is the sole caller of [`RegionManifestManager::update`], which makes it the
/// funnel through which all logical manifest writes notify the hook.
///
/// No state validation or edit-applicability check happens here — the caller
/// is responsible for those while it still holds the lock (see
/// `update_manifest_with_state_check` and
/// `MitoRegion::exit_staging_on_success`).
///
/// # Errors
///
/// Logs and propagates the error returned by the manager's `update`.
pub(crate) async fn update_locked(
    &self,
    manager: &mut RegionManifestManager,
    action_list: RegionMetaActionList,
    is_staging: bool,
) -> Result<PendingManifestHook> {
    let region_id = manager.manifest().metadata.region_id;

    // `update` consumes `action_list`, so keep a copy for the hook — but only
    // pay for the clone when a hook is actually registered.
    let hook_actions = self.hook.is_some().then(|| action_list.clone());

    let update_result = manager.update(action_list, is_staging).await;
    let version = update_result
        .inspect_err(|e| error!(e; "Failed to update manifest, region_id: {}", region_id))?;

    let pending = PendingManifestHook::new(region_id, hook_actions, version, self.hook.clone());
    Ok(pending)
}
async fn update_manifest_with_state_check(
&self,
action_list: RegionMetaActionList,
@@ -1243,15 +1268,12 @@ impl ManifestContext {
}
}
// If a region hook is registered, clone the action list before it is consumed
// so we can pass a reference to the hook after a successful update.
let hook = self.hook.clone();
let action_list_for_hook = hook.as_ref().map(|_| action_list.clone());
// `update_locked` returns a `PendingManifestHook` we fire after releasing the lock.
let region_id = manifest.metadata.region_id;
let version = manager
.update(action_list, is_staging)
.await
.inspect_err(|e| error!(e; "Failed to update manifest, region_id: {}", region_id))?;
let pending = self
.update_locked(&mut manager, action_list, is_staging)
.await?;
let version = pending.version();
// Drop the write lock before invoking the hook. Hook implementations may
// read the manifest or send region requests that acquire this lock;
@@ -1265,12 +1287,7 @@ impl ManifestContext {
);
}
if let Some(hook) = hook
&& let Some(action_list) = action_list_for_hook
{
hook.on_manifest_updated(region_id, &action_list, version)
.await;
}
pending.fire().await;
Ok(version)
}

View File

@@ -159,13 +159,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
return;
}
};
// Drop the write lock before invoking the hook to avoid deadlock.
drop(manager);
if let Some((action_list, version)) = hook_payload
&& let Some(hook) = region.manifest_ctx.hook()
{
hook.on_manifest_updated(region.region_id, &action_list, version)
.await;
if let Some(pending) = hook_payload {
pending.fire().await;
}
sender.send(Ok(0));
} else {