mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-07-05 05:20:38 +00:00
fix: avoid lock when calling hook
This commit is contained in:
@@ -64,8 +64,8 @@
|
||||
//!
|
||||
//! `on_manifest_updated` is centralized in [`ManifestContext::update_manifest_with_state_check`],
|
||||
//! so it automatically covers all manifest write paths that go through `ManifestContext`.
|
||||
//! The sole exception is [`MitoRegion::exit_staging_on_success`], which invokes the hook
|
||||
//! inline because it receives a pre-acquired manifest write lock from the caller.
|
||||
//! The sole exception is [`MitoRegion::exit_staging_on_success`], which returns the hook
|
||||
//! payload to the caller so the hook can be invoked after dropping the manifest write lock.
|
||||
//!
|
||||
//! ## Future work
|
||||
//!
|
||||
|
||||
@@ -442,7 +442,7 @@ impl MitoRegion {
|
||||
self.manifest_ctx.manifest_manager.write().await;
|
||||
let current_state = self.state();
|
||||
|
||||
match state {
|
||||
let hook_payload: Option<(RegionMetaActionList, ManifestVersion)> = match state {
|
||||
SettableRegionRoleState::Leader => {
|
||||
// Exit staging mode and return to normal writable leader
|
||||
// Only allowed from staging state
|
||||
@@ -450,11 +450,12 @@ impl MitoRegion {
|
||||
RegionRoleState::Leader(RegionLeaderState::Staging) => {
|
||||
info!("Exiting staging mode for region {}", self.region_id);
|
||||
// Use the success exit path that merges all staged manifests
|
||||
self.exit_staging_on_success(&mut manager).await?;
|
||||
self.exit_staging_on_success(&mut manager).await?
|
||||
}
|
||||
RegionRoleState::Leader(RegionLeaderState::Writable) => {
|
||||
// Already in desired state - no-op
|
||||
info!("Region {} already in normal leader mode", self.region_id);
|
||||
None
|
||||
}
|
||||
_ => {
|
||||
// Only staging -> leader transition is allowed
|
||||
@@ -489,6 +490,7 @@ impl MitoRegion {
|
||||
.build());
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
SettableRegionRoleState::Follower => {
|
||||
@@ -511,6 +513,7 @@ impl MitoRegion {
|
||||
info!("Region {} already in follower mode", self.region_id);
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
SettableRegionRoleState::DowngradingLeader => {
|
||||
@@ -539,8 +542,9 @@ impl MitoRegion {
|
||||
);
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Hack(zhongzc): If we have just become leader (writable), persist any backfilled metadata.
|
||||
if self.state() == RegionRoleState::Leader(RegionLeaderState::Writable) {
|
||||
@@ -574,6 +578,14 @@ impl MitoRegion {
|
||||
|
||||
drop(manager);
|
||||
|
||||
// Invoke the hook outside the manifest write lock to avoid deadlock.
|
||||
if let Some((action_list, version)) = hook_payload
|
||||
&& let Some(hook) = self.manifest_ctx.hook()
|
||||
{
|
||||
hook.on_manifest_updated(self.region_id, &action_list, version)
|
||||
.await;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -785,10 +797,18 @@ impl MitoRegion {
|
||||
}
|
||||
|
||||
/// Exit staging mode successfully by merging all staged manifests and making them visible.
|
||||
/// Merges staged manifest actions into the live manifest and exits staging mode.
|
||||
///
|
||||
/// The caller must hold the manifest write lock and pass it via `manager`.
|
||||
/// Returns `Ok(Some(action_list, version))` when staging manifests were merged,
|
||||
/// or `Ok(None)` when there were no staged manifests to merge.
|
||||
///
|
||||
/// **Important:** the returned hook payload must be invoked **after** the caller
|
||||
/// drops the manifest write lock, to avoid deadlocking if the hook reads the manifest.
|
||||
pub(crate) async fn exit_staging_on_success(
|
||||
&self,
|
||||
manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
|
||||
) -> Result<()> {
|
||||
) -> Result<Option<(RegionMetaActionList, ManifestVersion)>> {
|
||||
let current_state = self.manifest_ctx.current_state();
|
||||
ensure!(
|
||||
current_state == RegionRoleState::Leader(RegionLeaderState::Staging),
|
||||
@@ -809,7 +829,7 @@ impl MitoRegion {
|
||||
);
|
||||
// Even if no manifests to merge, we still need to exit staging mode
|
||||
self.exit_staging()?;
|
||||
return Ok(());
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
let expect_change = merged_actions.actions.iter().any(|a| a.is_change());
|
||||
@@ -855,10 +875,8 @@ impl MitoRegion {
|
||||
// Submit merged actions using the manifest manager's update method.
|
||||
// Pass `false` so it saves to normal directory, not staging.
|
||||
//
|
||||
// Unlike the centralized `update_manifest_with_state_check` path, we invoke
|
||||
// the hook inline here because the caller already holds the manifest write lock
|
||||
// and we cannot release it mid-method. This is acceptable because staging exit
|
||||
// is infrequent and callers manage the lock scope tightly.
|
||||
// Clone the action list before it is consumed so we can return it to
|
||||
// the caller for hook invocation after the lock is dropped.
|
||||
let hook = self.manifest_ctx.hook();
|
||||
let action_list_for_hook = hook.as_ref().map(|_| merged_actions.clone());
|
||||
let new_version = manager.update(merged_actions, false).await?;
|
||||
@@ -867,14 +885,6 @@ impl MitoRegion {
|
||||
self.region_id, new_version
|
||||
);
|
||||
|
||||
// Invoke the on_manifest_updated hook for the merged staging actions.
|
||||
if let Some(hook) = hook
|
||||
&& let Some(action_list) = action_list_for_hook
|
||||
{
|
||||
hook.on_manifest_updated(self.region_id, &action_list, new_version)
|
||||
.await;
|
||||
}
|
||||
|
||||
// Apply the merged changes to in-memory version control
|
||||
if let Some(change) = merged_partition_expr_change {
|
||||
let mut new_metadata = self.version().metadata.as_ref().clone();
|
||||
@@ -893,7 +903,9 @@ impl MitoRegion {
|
||||
}
|
||||
self.exit_staging()?;
|
||||
|
||||
Ok(())
|
||||
// Return the hook payload; the caller invokes the hook after dropping the lock.
|
||||
let hook_payload = action_list_for_hook.map(|action_list| (action_list, new_version));
|
||||
Ok(hook_payload)
|
||||
}
|
||||
|
||||
/// Returns the partition expression string for this region.
|
||||
@@ -1852,7 +1864,7 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
region.exit_staging_on_success(&mut manager).await.unwrap();
|
||||
let _hook_payload = region.exit_staging_on_success(&mut manager).await.unwrap();
|
||||
drop(manager);
|
||||
|
||||
assert_eq!(
|
||||
@@ -1891,7 +1903,7 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
region.exit_staging_on_success(&mut manager).await.unwrap();
|
||||
let _hook_payload = region.exit_staging_on_success(&mut manager).await.unwrap();
|
||||
drop(manager);
|
||||
|
||||
assert_eq!(
|
||||
|
||||
@@ -152,12 +152,22 @@ impl<S: LogStore> RegionWorkerLoop<S> {
|
||||
return;
|
||||
};
|
||||
let mut manager = region.manifest_ctx.manifest_manager.write().await;
|
||||
match region.exit_staging_on_success(&mut manager).await {
|
||||
Ok(()) => {
|
||||
sender.send(Ok(0));
|
||||
let hook_payload = match region.exit_staging_on_success(&mut manager).await {
|
||||
Ok(payload) => payload,
|
||||
Err(e) => {
|
||||
sender.send(Err(e));
|
||||
return;
|
||||
}
|
||||
Err(e) => sender.send(Err(e)),
|
||||
};
|
||||
// Drop the write lock before invoking the hook to avoid deadlock.
|
||||
drop(manager);
|
||||
if let Some((action_list, version)) = hook_payload
|
||||
&& let Some(hook) = region.manifest_ctx.hook()
|
||||
{
|
||||
hook.on_manifest_updated(region.region_id, &action_list, version)
|
||||
.await;
|
||||
}
|
||||
sender.send(Ok(0));
|
||||
} else {
|
||||
sender.send(
|
||||
UnexpectedSnafu {
|
||||
|
||||
Reference in New Issue
Block a user