From 49c6812e98d95143fcda994d5d249505304492a8 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Tue, 11 Nov 2025 17:44:27 +0800 Subject: [PATCH] fix: deregister failure detectors on rollback and improve timeout handling (#7212) Signed-off-by: WenyXu --- src/meta-srv/src/procedure/region_migration.rs | 3 +++ .../region_migration/open_candidate_region.rs | 12 ++++++++---- src/mito2/src/region/opener.rs | 16 ++++++++++++---- src/mito2/src/worker/handle_open.rs | 9 ++++++++- 4 files changed, 31 insertions(+), 9 deletions(-) diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index 49ed2834cd..935e59ba33 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -685,6 +685,9 @@ impl RegionMigrationProcedure { .with_context(|_| error::RetryLaterWithSourceSnafu { reason: format!("Failed to update the table route during the rollback downgraded leader region: {region_id}"), })?; + self.context + .deregister_failure_detectors_for_candidate_region() + .await; } self.context.register_failure_detectors().await; diff --git a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs index f3e767cfd9..111bd41fd2 100644 --- a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::any::Any; +use std::ops::Div; use std::time::Duration; use api::v1::meta::MailboxMessage; @@ -139,12 +140,15 @@ impl OpenCandidateRegion { input: open_instruction.to_string(), })?; + let operation_timeout = + ctx.next_operation_timeout() + .context(error::ExceededDeadlineSnafu { + operation: "Open candidate region", + })?; + let operation_timeout = operation_timeout.div(2).max(OPEN_CANDIDATE_REGION_TIMEOUT); let ch = Channel::Datanode(candidate.id); let now = Instant::now(); - let receiver = ctx - .mailbox - .send(&ch, msg, OPEN_CANDIDATE_REGION_TIMEOUT) - .await?; + let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?; match receiver.await { Ok(msg) => { diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 100443d322..a278e068af 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -401,6 +401,7 @@ impl RegionOpener { config: &MitoConfig, wal: &Wal, ) -> Result> { + let now = Instant::now(); let region_options = self.options.as_ref().unwrap().clone(); let region_manifest_options = Self::manifest_options( @@ -492,8 +493,12 @@ impl RegionOpener { .unwrap_or_default() .max(flushed_entry_id); info!( - "Start replaying memtable at replay_from_entry_id: {} for region {}, manifest version: {}, flushed entry id: {}", - replay_from_entry_id, region_id, manifest.manifest_version, flushed_entry_id + "Start replaying memtable at replay_from_entry_id: {} for region {}, manifest version: {}, flushed entry id: {}, elapsed: {:?}", + replay_from_entry_id, + region_id, + manifest.manifest_version, + flushed_entry_id, + now.elapsed() ); replay_memtable( &provider, @@ -515,8 +520,11 @@ impl RegionOpener { } } else { info!( - "Skip the WAL replay for region: {}, manifest version: {}, flushed_entry_id: {}", - region_id, manifest.manifest_version, flushed_entry_id + "Skip the WAL replay for region: {}, manifest version: {}, flushed_entry_id: {}, elapsed: {:?}", + region_id, + manifest.manifest_version, + flushed_entry_id, + now.elapsed() ); 0 diff --git a/src/mito2/src/worker/handle_open.rs b/src/mito2/src/worker/handle_open.rs index 420f8380db..e50e166d47 100644 --- a/src/mito2/src/worker/handle_open.rs +++ b/src/mito2/src/worker/handle_open.rs @@ -15,6 +15,7 @@ //! Handling open request. use std::sync::Arc; +use std::time::Instant; use common_telemetry::info; use object_store::util::join_path; @@ -119,6 +120,7 @@ impl RegionWorkerLoop { } }; + let now = Instant::now(); let regions = self.regions.clone(); let wal = self.wal.clone(); let config = self.config.clone(); @@ -129,7 +131,12 @@ impl RegionWorkerLoop { common_runtime::spawn_global(async move { match opener.open(&config, &wal).await { Ok(region) => { - info!("Region {} is opened, worker: {}", region_id, worker_id); + info!( + "Region {} is opened, worker: {}, elapsed: {:?}", + region_id, + worker_id, + now.elapsed() + ); region_count.inc(); // Insert the Region into the RegionMap.