diff --git a/src/mito2/src/engine/region_hook.rs b/src/mito2/src/engine/region_hook.rs index 3fda3e542e..0abf3fc110 100644 --- a/src/mito2/src/engine/region_hook.rs +++ b/src/mito2/src/engine/region_hook.rs @@ -64,8 +64,8 @@ //! //! `on_manifest_updated` is centralized in [`ManifestContext::update_manifest_with_state_check`], //! so it automatically covers all manifest write paths that go through `ManifestContext`. -//! The sole exception is [`MitoRegion::exit_staging_on_success`], which invokes the hook -//! inline because it receives a pre-acquired manifest write lock from the caller. +//! The sole exception is [`MitoRegion::exit_staging_on_success`], which returns the hook +//! payload to the caller so the hook can be invoked after dropping the manifest write lock. //! //! ## Future work //! diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index 80c3e41b5b..2e995e21d5 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -442,7 +442,7 @@ impl MitoRegion { self.manifest_ctx.manifest_manager.write().await; let current_state = self.state(); - match state { + let hook_payload: Option<(RegionMetaActionList, ManifestVersion)> = match state { SettableRegionRoleState::Leader => { // Exit staging mode and return to normal writable leader // Only allowed from staging state @@ -450,11 +450,12 @@ impl MitoRegion { RegionRoleState::Leader(RegionLeaderState::Staging) => { info!("Exiting staging mode for region {}", self.region_id); // Use the success exit path that merges all staged manifests - self.exit_staging_on_success(&mut manager).await?; + self.exit_staging_on_success(&mut manager).await? } RegionRoleState::Leader(RegionLeaderState::Writable) => { // Already in desired state - no-op info!("Region {} already in normal leader mode", self.region_id); + None } _ => { // Only staging -> leader transition is allowed @@ -489,6 +490,7 @@ impl MitoRegion { .build()); } } + None } SettableRegionRoleState::Follower => { @@ -511,6 +513,7 @@ impl MitoRegion { info!("Region {} already in follower mode", self.region_id); } } + None } SettableRegionRoleState::DowngradingLeader => { @@ -539,8 +542,9 @@ impl MitoRegion { ); } } + None } - } + }; // Hack(zhongzc): If we have just become leader (writable), persist any backfilled metadata. if self.state() == RegionRoleState::Leader(RegionLeaderState::Writable) { @@ -574,6 +578,14 @@ impl MitoRegion { drop(manager); + // Invoke the hook outside the manifest write lock to avoid deadlock. + if let Some((action_list, version)) = hook_payload + && let Some(hook) = self.manifest_ctx.hook() + { + hook.on_manifest_updated(self.region_id, &action_list, version) + .await; + } + Ok(()) } @@ -785,10 +797,18 @@ impl MitoRegion { } /// Exit staging mode successfully by merging all staged manifests and making them visible. + /// Merges staged manifest actions into the live manifest and exits staging mode. + /// + /// The caller must hold the manifest write lock and pass it via `manager`. + /// Returns `Ok(Some(action_list, version))` when staging manifests were merged, + /// or `Ok(None)` when there were no staged manifests to merge. + /// + /// **Important:** the returned hook payload must be invoked **after** the caller + /// drops the manifest write lock, to avoid deadlocking if the hook reads the manifest. pub(crate) async fn exit_staging_on_success( &self, manager: &mut RwLockWriteGuard<'_, RegionManifestManager>, - ) -> Result<()> { + ) -> Result> { let current_state = self.manifest_ctx.current_state(); ensure!( current_state == RegionRoleState::Leader(RegionLeaderState::Staging), @@ -809,7 +829,7 @@ impl MitoRegion { ); // Even if no manifests to merge, we still need to exit staging mode self.exit_staging()?; - return Ok(()); + return Ok(None); } }; let expect_change = merged_actions.actions.iter().any(|a| a.is_change()); @@ -855,10 +875,8 @@ impl MitoRegion { // Submit merged actions using the manifest manager's update method. // Pass `false` so it saves to normal directory, not staging. // - // Unlike the centralized `update_manifest_with_state_check` path, we invoke - // the hook inline here because the caller already holds the manifest write lock - // and we cannot release it mid-method. This is acceptable because staging exit - // is infrequent and callers manage the lock scope tightly. + // Clone the action list before it is consumed so we can return it to + // the caller for hook invocation after the lock is dropped. let hook = self.manifest_ctx.hook(); let action_list_for_hook = hook.as_ref().map(|_| merged_actions.clone()); let new_version = manager.update(merged_actions, false).await?; @@ -867,14 +885,6 @@ impl MitoRegion { self.region_id, new_version ); - // Invoke the on_manifest_updated hook for the merged staging actions. - if let Some(hook) = hook - && let Some(action_list) = action_list_for_hook - { - hook.on_manifest_updated(self.region_id, &action_list, new_version) - .await; - } - // Apply the merged changes to in-memory version control if let Some(change) = merged_partition_expr_change { let mut new_metadata = self.version().metadata.as_ref().clone(); @@ -893,7 +903,9 @@ impl MitoRegion { } self.exit_staging()?; - Ok(()) + // Return the hook payload; the caller invokes the hook after dropping the lock. + let hook_payload = action_list_for_hook.map(|action_list| (action_list, new_version)); + Ok(hook_payload) } /// Returns the partition expression string for this region. @@ -1852,7 +1864,7 @@ mod tests { .await .unwrap(); - region.exit_staging_on_success(&mut manager).await.unwrap(); + let _hook_payload = region.exit_staging_on_success(&mut manager).await.unwrap(); drop(manager); assert_eq!( @@ -1891,7 +1903,7 @@ mod tests { .await .unwrap(); - region.exit_staging_on_success(&mut manager).await.unwrap(); + let _hook_payload = region.exit_staging_on_success(&mut manager).await.unwrap(); drop(manager); assert_eq!( diff --git a/src/mito2/src/worker/handle_apply_staging.rs b/src/mito2/src/worker/handle_apply_staging.rs index dee66752cf..cefdda1fea 100644 --- a/src/mito2/src/worker/handle_apply_staging.rs +++ b/src/mito2/src/worker/handle_apply_staging.rs @@ -152,12 +152,22 @@ impl RegionWorkerLoop { return; }; let mut manager = region.manifest_ctx.manifest_manager.write().await; - match region.exit_staging_on_success(&mut manager).await { - Ok(()) => { - sender.send(Ok(0)); + let hook_payload = match region.exit_staging_on_success(&mut manager).await { + Ok(payload) => payload, + Err(e) => { + sender.send(Err(e)); + return; } - Err(e) => sender.send(Err(e)), + }; + // Drop the write lock before invoking the hook to avoid deadlock. + drop(manager); + if let Some((action_list, version)) = hook_payload + && let Some(hook) = region.manifest_ctx.hook() + { + hook.on_manifest_updated(region.region_id, &action_list, version) + .await; } + sender.send(Ok(0)); } else { sender.send( UnexpectedSnafu {