From ff3a46b1d0276d924663b6404f0ee4fdd32c25b3 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Thu, 24 Apr 2025 12:00:14 +0800 Subject: [PATCH] feat: improve observability of region migration procedure (#5967) * feat: improve observability of region migration procedure * chore: apply suggestions from CR * chore: observe non-zero value --- src/meta-srv/src/metrics.rs | 9 ++ .../src/procedure/region_migration.rs | 124 +++++++++++++++++- .../close_downgraded_region.rs | 8 +- .../downgrade_leader_region.rs | 8 +- .../region_migration/migration_abort.rs | 11 +- .../region_migration/open_candidate_region.rs | 5 +- .../upgrade_candidate_region.rs | 9 +- 7 files changed, 160 insertions(+), 14 deletions(-) diff --git a/src/meta-srv/src/metrics.rs b/src/meta-srv/src/metrics.rs index ffbe986d72..2984a91a1c 100644 --- a/src/meta-srv/src/metrics.rs +++ b/src/meta-srv/src/metrics.rs @@ -71,4 +71,13 @@ lazy_static! { /// The remote WAL prune execute counter. pub static ref METRIC_META_REMOTE_WAL_PRUNE_EXECUTE: IntCounterVec = register_int_counter_vec!("greptime_meta_remote_wal_prune_execute", "meta remote wal prune execute", &["topic_name"]).unwrap(); + /// The migration stage elapsed histogram. 
+ pub static ref METRIC_META_REGION_MIGRATION_STAGE_ELAPSED: HistogramVec = register_histogram_vec!( + "greptime_meta_region_migration_stage_elapsed", + "meta region migration stage elapsed", + &["stage"], + // 0.01 ~ 10000 + exponential_buckets(0.01, 10.0, 7).unwrap(), + ) + .unwrap(); } diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index b2f1eed711..43b444d3b1 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -25,7 +25,7 @@ pub(crate) mod update_metadata; pub(crate) mod upgrade_candidate_region; use std::any::Any; -use std::fmt::Debug; +use std::fmt::{Debug, Display}; use std::time::Duration; use common_error::ext::BoxedError; @@ -43,7 +43,7 @@ use common_procedure::error::{ Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu, }; use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status, StringKey}; -use common_telemetry::info; +use common_telemetry::{error, info}; use manager::RegionMigrationProcedureGuard; pub use manager::{ RegionMigrationManagerRef, RegionMigrationProcedureTask, RegionMigrationProcedureTracker, @@ -55,7 +55,10 @@ use tokio::time::Instant; use self::migration_start::RegionMigrationStart; use crate::error::{self, Result}; -use crate::metrics::{METRIC_META_REGION_MIGRATION_ERROR, METRIC_META_REGION_MIGRATION_EXECUTE}; +use crate::metrics::{ + METRIC_META_REGION_MIGRATION_ERROR, METRIC_META_REGION_MIGRATION_EXECUTE, + METRIC_META_REGION_MIGRATION_STAGE_ELAPSED, +}; use crate::service::mailbox::MailboxRef; /// The default timeout for region migration. @@ -103,6 +106,82 @@ impl PersistentContext { } } +/// Metrics of region migration. +#[derive(Debug, Clone, Default)] +pub struct Metrics { + /// Elapsed time of downgrading region and upgrading region. + operations_elapsed: Duration, + /// Elapsed time of downgrading leader region. 
+ downgrade_leader_region_elapsed: Duration, + /// Elapsed time of open candidate region. + open_candidate_region_elapsed: Duration, + /// Elapsed time of upgrade candidate region. + upgrade_candidate_region_elapsed: Duration, +} + +impl Display for Metrics { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "operations_elapsed: {:?}, downgrade_leader_region_elapsed: {:?}, open_candidate_region_elapsed: {:?}, upgrade_candidate_region_elapsed: {:?}", + self.operations_elapsed, + self.downgrade_leader_region_elapsed, + self.open_candidate_region_elapsed, + self.upgrade_candidate_region_elapsed + ) + } +} + +impl Metrics { + /// Updates the elapsed time of downgrading region and upgrading region. + pub fn update_operations_elapsed(&mut self, elapsed: Duration) { + self.operations_elapsed += elapsed; + } + + /// Updates the elapsed time of downgrading leader region. + pub fn update_downgrade_leader_region_elapsed(&mut self, elapsed: Duration) { + self.downgrade_leader_region_elapsed += elapsed; + } + + /// Updates the elapsed time of open candidate region. + pub fn update_open_candidate_region_elapsed(&mut self, elapsed: Duration) { + self.open_candidate_region_elapsed += elapsed; + } + + /// Updates the elapsed time of upgrade candidate region. 
+ pub fn update_upgrade_candidate_region_elapsed(&mut self, elapsed: Duration) { + self.upgrade_candidate_region_elapsed += elapsed; + } +} + +impl Drop for Metrics { + fn drop(&mut self) { + if !self.operations_elapsed.is_zero() { + METRIC_META_REGION_MIGRATION_STAGE_ELAPSED + .with_label_values(&["operations"]) + .observe(self.operations_elapsed.as_secs_f64()); + } + + if !self.downgrade_leader_region_elapsed.is_zero() { + METRIC_META_REGION_MIGRATION_STAGE_ELAPSED + .with_label_values(&["downgrade_leader_region"]) + .observe(self.downgrade_leader_region_elapsed.as_secs_f64()); + } + + if !self.open_candidate_region_elapsed.is_zero() { + METRIC_META_REGION_MIGRATION_STAGE_ELAPSED + .with_label_values(&["open_candidate_region"]) + .observe(self.open_candidate_region_elapsed.as_secs_f64()); + } + + if !self.upgrade_candidate_region_elapsed.is_zero() { + METRIC_META_REGION_MIGRATION_STAGE_ELAPSED + .with_label_values(&["upgrade_candidate_region"]) + .observe(self.upgrade_candidate_region_elapsed.as_secs_f64()); + } + } +} + /// It's shared in each step and available in executing (including retrying). /// /// It will be dropped if the procedure runner crashes. @@ -132,8 +211,8 @@ pub struct VolatileContext { leader_region_last_entry_id: Option, /// The last_entry_id of leader metadata region (Only used for metric engine). leader_region_metadata_last_entry_id: Option, - /// Elapsed time of downgrading region and upgrading region. - operations_elapsed: Duration, + /// Metrics of region migration. + metrics: Metrics, } impl VolatileContext { @@ -231,12 +310,35 @@ impl Context { pub fn next_operation_timeout(&self) -> Option { self.persistent_ctx .timeout - .checked_sub(self.volatile_ctx.operations_elapsed) + .checked_sub(self.volatile_ctx.metrics.operations_elapsed) } /// Updates operations elapsed. 
pub fn update_operations_elapsed(&mut self, instant: Instant) { - self.volatile_ctx.operations_elapsed += instant.elapsed(); + self.volatile_ctx + .metrics + .update_operations_elapsed(instant.elapsed()); + } + + /// Updates the elapsed time of downgrading leader region. + pub fn update_downgrade_leader_region_elapsed(&mut self, instant: Instant) { + self.volatile_ctx + .metrics + .update_downgrade_leader_region_elapsed(instant.elapsed()); + } + + /// Updates the elapsed time of open candidate region. + pub fn update_open_candidate_region_elapsed(&mut self, instant: Instant) { + self.volatile_ctx + .metrics + .update_open_candidate_region_elapsed(instant.elapsed()); + } + + /// Updates the elapsed time of upgrade candidate region. + pub fn update_upgrade_candidate_region_elapsed(&mut self, instant: Instant) { + self.volatile_ctx + .metrics + .update_upgrade_candidate_region_elapsed(instant.elapsed()); } /// Returns address of meta server. @@ -550,6 +652,14 @@ impl Procedure for RegionMigrationProcedure { .inc(); ProcedureError::retry_later(e) } else { + error!( + e; + "Region migration procedure failed, region_id: {}, from_peer: {}, to_peer: {}, {}", + self.context.region_id(), + self.context.persistent_ctx.from_peer, + self.context.persistent_ctx.to_peer, + self.context.volatile_ctx.metrics, + ); METRIC_META_REGION_MIGRATION_ERROR .with_label_values(&[name, "external"]) .inc(); diff --git a/src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs b/src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs index 94256ba5ec..ba13f7cdea 100644 --- a/src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs +++ b/src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs @@ -46,7 +46,13 @@ impl State for CloseDowngradedRegion { let region_id = ctx.region_id(); warn!(err; "Failed to close downgraded leader region: {region_id} on datanode {:?}", downgrade_leader_datanode); } - + info!( + "Region migration is finished: 
region_id: {}, from_peer: {}, to_peer: {}, {}", + ctx.region_id(), + ctx.persistent_ctx.from_peer, + ctx.persistent_ctx.to_peer, + ctx.volatile_ctx.metrics, + ); Ok((Box::new(RegionMigrationEnd), Status::done())) } diff --git a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs index 02b7216fe7..93481adc54 100644 --- a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs @@ -54,6 +54,7 @@ impl Default for DowngradeLeaderRegion { #[typetag::serde] impl State for DowngradeLeaderRegion { async fn next(&mut self, ctx: &mut Context) -> Result<(Box, Status)> { + let now = Instant::now(); // Ensures the `leader_region_lease_deadline` must exist after recovering. ctx.volatile_ctx .set_leader_region_lease_deadline(Duration::from_secs(REGION_LEASE_SECS)); @@ -77,6 +78,7 @@ impl State for DowngradeLeaderRegion { } } } + ctx.update_downgrade_leader_region_elapsed(now); Ok(( Box::new(UpgradeCandidateRegion::default()), @@ -348,7 +350,8 @@ mod tests { let env = TestingEnv::new(); let mut ctx = env.context_factory().new_context(persistent_context); prepare_table_metadata(&ctx, HashMap::default()).await; - ctx.volatile_ctx.operations_elapsed = ctx.persistent_ctx.timeout + Duration::from_secs(1); + ctx.volatile_ctx.metrics.operations_elapsed = + ctx.persistent_ctx.timeout + Duration::from_secs(1); let err = state.downgrade_region(&mut ctx).await.unwrap_err(); @@ -591,7 +594,8 @@ mod tests { let mut ctx = env.context_factory().new_context(persistent_context); let mailbox_ctx = env.mailbox_context(); let mailbox = mailbox_ctx.mailbox().clone(); - ctx.volatile_ctx.operations_elapsed = ctx.persistent_ctx.timeout + Duration::from_secs(1); + ctx.volatile_ctx.metrics.operations_elapsed = + ctx.persistent_ctx.timeout + Duration::from_secs(1); let (tx, rx) = tokio::sync::mpsc::channel(1); mailbox_ctx diff 
--git a/src/meta-srv/src/procedure/region_migration/migration_abort.rs b/src/meta-srv/src/procedure/region_migration/migration_abort.rs index af56843045..d364f0c8b9 100644 --- a/src/meta-srv/src/procedure/region_migration/migration_abort.rs +++ b/src/meta-srv/src/procedure/region_migration/migration_abort.rs @@ -15,6 +15,7 @@ use std::any::Any; use common_procedure::Status; +use common_telemetry::warn; use serde::{Deserialize, Serialize}; use crate::error::{self, Result}; @@ -37,7 +38,15 @@ impl RegionMigrationAbort { #[async_trait::async_trait] #[typetag::serde] impl State for RegionMigrationAbort { - async fn next(&mut self, _: &mut Context) -> Result<(Box, Status)> { + async fn next(&mut self, ctx: &mut Context) -> Result<(Box, Status)> { + warn!( + "Region migration is aborted: {}, region_id: {}, from_peer: {}, to_peer: {}, {}", + self.reason, + ctx.region_id(), + ctx.persistent_ctx.from_peer, + ctx.persistent_ctx.to_peer, + ctx.volatile_ctx.metrics, + ); error::MigrationAbortSnafu { reason: &self.reason, } diff --git a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs index 6cacf75063..6d1c81d3ed 100644 --- a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs @@ -13,7 +13,7 @@ // limitations under the License. 
use std::any::Any; -use std::time::{Duration, Instant}; +use std::time::Duration; use api::v1::meta::MailboxMessage; use common_meta::distributed_time_constants::REGION_LEASE_SECS; @@ -24,6 +24,7 @@ use common_procedure::Status; use common_telemetry::info; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; +use tokio::time::Instant; use crate::error::{self, Result}; use crate::handler::HeartbeatMailbox; @@ -42,7 +43,9 @@ pub struct OpenCandidateRegion; impl State for OpenCandidateRegion { async fn next(&mut self, ctx: &mut Context) -> Result<(Box, Status)> { let instruction = self.build_open_region_instruction(ctx).await?; + let now = Instant::now(); self.open_candidate_region(ctx, instruction).await?; + ctx.update_open_candidate_region_elapsed(now); Ok(( Box::new(UpdateMetadata::Downgrade), diff --git a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs index 552b9d3863..8f3741dbac 100644 --- a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs @@ -54,9 +54,12 @@ impl Default for UpgradeCandidateRegion { #[typetag::serde] impl State for UpgradeCandidateRegion { async fn next(&mut self, ctx: &mut Context) -> Result<(Box, Status)> { + let now = Instant::now(); if self.upgrade_region_with_retry(ctx).await { + ctx.update_upgrade_candidate_region_elapsed(now); Ok((Box::new(UpdateMetadata::Upgrade), Status::executing(false))) } else { + ctx.update_upgrade_candidate_region_elapsed(now); Ok((Box::new(UpdateMetadata::Rollback), Status::executing(false))) } } @@ -288,7 +291,8 @@ mod tests { let persistent_context = new_persistent_context(); let env = TestingEnv::new(); let mut ctx = env.context_factory().new_context(persistent_context); - ctx.volatile_ctx.operations_elapsed = ctx.persistent_ctx.timeout + Duration::from_secs(1); + 
ctx.volatile_ctx.metrics.operations_elapsed = + ctx.persistent_ctx.timeout + Duration::from_secs(1); let err = state.upgrade_region(&ctx).await.unwrap_err(); @@ -558,7 +562,8 @@ mod tests { let mut ctx = env.context_factory().new_context(persistent_context); let mailbox_ctx = env.mailbox_context(); let mailbox = mailbox_ctx.mailbox().clone(); - ctx.volatile_ctx.operations_elapsed = ctx.persistent_ctx.timeout + Duration::from_secs(1); + ctx.volatile_ctx.metrics.operations_elapsed = + ctx.persistent_ctx.timeout + Duration::from_secs(1); let (tx, rx) = tokio::sync::mpsc::channel(1); mailbox_ctx