mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
feat: improve observability of region migration procedure (#5967)
* feat: improve observability of region migration procedure * chore: apply suggestions from CR * chore: observe non-zero value
This commit is contained in:
@@ -71,4 +71,13 @@ lazy_static! {
|
||||
/// The remote WAL prune execute counter.
|
||||
pub static ref METRIC_META_REMOTE_WAL_PRUNE_EXECUTE: IntCounterVec =
|
||||
register_int_counter_vec!("greptime_meta_remote_wal_prune_execute", "meta remote wal prune execute", &["topic_name"]).unwrap();
|
||||
/// The migration stage elapsed histogram.
|
||||
pub static ref METRIC_META_REGION_MIGRATION_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
|
||||
"greptime_meta_region_migration_stage_elapsed",
|
||||
"meta region migration stage elapsed",
|
||||
&["stage"],
|
||||
// 0.01 ~ 1000
|
||||
exponential_buckets(0.01, 10.0, 7).unwrap(),
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
@@ -25,7 +25,7 @@ pub(crate) mod update_metadata;
|
||||
pub(crate) mod upgrade_candidate_region;
|
||||
|
||||
use std::any::Any;
|
||||
use std::fmt::Debug;
|
||||
use std::fmt::{Debug, Display};
|
||||
use std::time::Duration;
|
||||
|
||||
use common_error::ext::BoxedError;
|
||||
@@ -43,7 +43,7 @@ use common_procedure::error::{
|
||||
Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
|
||||
};
|
||||
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status, StringKey};
|
||||
use common_telemetry::info;
|
||||
use common_telemetry::{error, info};
|
||||
use manager::RegionMigrationProcedureGuard;
|
||||
pub use manager::{
|
||||
RegionMigrationManagerRef, RegionMigrationProcedureTask, RegionMigrationProcedureTracker,
|
||||
@@ -55,7 +55,10 @@ use tokio::time::Instant;
|
||||
|
||||
use self::migration_start::RegionMigrationStart;
|
||||
use crate::error::{self, Result};
|
||||
use crate::metrics::{METRIC_META_REGION_MIGRATION_ERROR, METRIC_META_REGION_MIGRATION_EXECUTE};
|
||||
use crate::metrics::{
|
||||
METRIC_META_REGION_MIGRATION_ERROR, METRIC_META_REGION_MIGRATION_EXECUTE,
|
||||
METRIC_META_REGION_MIGRATION_STAGE_ELAPSED,
|
||||
};
|
||||
use crate::service::mailbox::MailboxRef;
|
||||
|
||||
/// The default timeout for region migration.
|
||||
@@ -103,6 +106,82 @@ impl PersistentContext {
|
||||
}
|
||||
}
|
||||
|
||||
/// Metrics of region migration.
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct Metrics {
|
||||
/// Elapsed time of downgrading region and upgrading region.
|
||||
operations_elapsed: Duration,
|
||||
/// Elapsed time of downgrading leader region.
|
||||
downgrade_leader_region_elapsed: Duration,
|
||||
/// Elapsed time of open candidate region.
|
||||
open_candidate_region_elapsed: Duration,
|
||||
/// Elapsed time of upgrade candidate region.
|
||||
upgrade_candidate_region_elapsed: Duration,
|
||||
}
|
||||
|
||||
impl Display for Metrics {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"operations_elapsed: {:?}, downgrade_leader_region_elapsed: {:?}, open_candidate_region_elapsed: {:?}, upgrade_candidate_region_elapsed: {:?}",
|
||||
self.operations_elapsed,
|
||||
self.downgrade_leader_region_elapsed,
|
||||
self.open_candidate_region_elapsed,
|
||||
self.upgrade_candidate_region_elapsed
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl Metrics {
|
||||
/// Updates the elapsed time of downgrading region and upgrading region.
|
||||
pub fn update_operations_elapsed(&mut self, elapsed: Duration) {
|
||||
self.operations_elapsed += elapsed;
|
||||
}
|
||||
|
||||
/// Updates the elapsed time of downgrading leader region.
|
||||
pub fn update_downgrade_leader_region_elapsed(&mut self, elapsed: Duration) {
|
||||
self.downgrade_leader_region_elapsed += elapsed;
|
||||
}
|
||||
|
||||
/// Updates the elapsed time of open candidate region.
|
||||
pub fn update_open_candidate_region_elapsed(&mut self, elapsed: Duration) {
|
||||
self.open_candidate_region_elapsed += elapsed;
|
||||
}
|
||||
|
||||
/// Updates the elapsed time of upgrade candidate region.
|
||||
pub fn update_upgrade_candidate_region_elapsed(&mut self, elapsed: Duration) {
|
||||
self.upgrade_candidate_region_elapsed += elapsed;
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Metrics {
|
||||
fn drop(&mut self) {
|
||||
if !self.operations_elapsed.is_zero() {
|
||||
METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
|
||||
.with_label_values(&["operations"])
|
||||
.observe(self.operations_elapsed.as_secs_f64());
|
||||
}
|
||||
|
||||
if !self.downgrade_leader_region_elapsed.is_zero() {
|
||||
METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
|
||||
.with_label_values(&["downgrade_leader_region"])
|
||||
.observe(self.downgrade_leader_region_elapsed.as_secs_f64());
|
||||
}
|
||||
|
||||
if !self.open_candidate_region_elapsed.is_zero() {
|
||||
METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
|
||||
.with_label_values(&["open_candidate_region"])
|
||||
.observe(self.open_candidate_region_elapsed.as_secs_f64());
|
||||
}
|
||||
|
||||
if !self.upgrade_candidate_region_elapsed.is_zero() {
|
||||
METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
|
||||
.with_label_values(&["upgrade_candidate_region"])
|
||||
.observe(self.upgrade_candidate_region_elapsed.as_secs_f64());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// It's shared in each step and available in executing (including retrying).
|
||||
///
|
||||
/// It will be dropped if the procedure runner crashes.
|
||||
@@ -132,8 +211,8 @@ pub struct VolatileContext {
|
||||
leader_region_last_entry_id: Option<u64>,
|
||||
/// The last_entry_id of leader metadata region (Only used for metric engine).
|
||||
leader_region_metadata_last_entry_id: Option<u64>,
|
||||
/// Elapsed time of downgrading region and upgrading region.
|
||||
operations_elapsed: Duration,
|
||||
/// Metrics of region migration.
|
||||
metrics: Metrics,
|
||||
}
|
||||
|
||||
impl VolatileContext {
|
||||
@@ -231,12 +310,35 @@ impl Context {
|
||||
pub fn next_operation_timeout(&self) -> Option<Duration> {
|
||||
self.persistent_ctx
|
||||
.timeout
|
||||
.checked_sub(self.volatile_ctx.operations_elapsed)
|
||||
.checked_sub(self.volatile_ctx.metrics.operations_elapsed)
|
||||
}
|
||||
|
||||
/// Updates operations elapsed.
|
||||
pub fn update_operations_elapsed(&mut self, instant: Instant) {
|
||||
self.volatile_ctx.operations_elapsed += instant.elapsed();
|
||||
self.volatile_ctx
|
||||
.metrics
|
||||
.update_operations_elapsed(instant.elapsed());
|
||||
}
|
||||
|
||||
/// Updates the elapsed time of downgrading leader region.
|
||||
pub fn update_downgrade_leader_region_elapsed(&mut self, instant: Instant) {
|
||||
self.volatile_ctx
|
||||
.metrics
|
||||
.update_downgrade_leader_region_elapsed(instant.elapsed());
|
||||
}
|
||||
|
||||
/// Updates the elapsed time of open candidate region.
|
||||
pub fn update_open_candidate_region_elapsed(&mut self, instant: Instant) {
|
||||
self.volatile_ctx
|
||||
.metrics
|
||||
.update_open_candidate_region_elapsed(instant.elapsed());
|
||||
}
|
||||
|
||||
/// Updates the elapsed time of upgrade candidate region.
|
||||
pub fn update_upgrade_candidate_region_elapsed(&mut self, instant: Instant) {
|
||||
self.volatile_ctx
|
||||
.metrics
|
||||
.update_upgrade_candidate_region_elapsed(instant.elapsed());
|
||||
}
|
||||
|
||||
/// Returns address of meta server.
|
||||
@@ -550,6 +652,14 @@ impl Procedure for RegionMigrationProcedure {
|
||||
.inc();
|
||||
ProcedureError::retry_later(e)
|
||||
} else {
|
||||
error!(
|
||||
e;
|
||||
"Region migration procedure failed, region_id: {}, from_peer: {}, to_peer: {}, {}",
|
||||
self.context.region_id(),
|
||||
self.context.persistent_ctx.from_peer,
|
||||
self.context.persistent_ctx.to_peer,
|
||||
self.context.volatile_ctx.metrics,
|
||||
);
|
||||
METRIC_META_REGION_MIGRATION_ERROR
|
||||
.with_label_values(&[name, "external"])
|
||||
.inc();
|
||||
|
||||
@@ -46,7 +46,13 @@ impl State for CloseDowngradedRegion {
|
||||
let region_id = ctx.region_id();
|
||||
warn!(err; "Failed to close downgraded leader region: {region_id} on datanode {:?}", downgrade_leader_datanode);
|
||||
}
|
||||
|
||||
info!(
|
||||
"Region migration is finished: region_id: {}, from_peer: {}, to_peer: {}, {}",
|
||||
ctx.region_id(),
|
||||
ctx.persistent_ctx.from_peer,
|
||||
ctx.persistent_ctx.to_peer,
|
||||
ctx.volatile_ctx.metrics,
|
||||
);
|
||||
Ok((Box::new(RegionMigrationEnd), Status::done()))
|
||||
}
|
||||
|
||||
|
||||
@@ -54,6 +54,7 @@ impl Default for DowngradeLeaderRegion {
|
||||
#[typetag::serde]
|
||||
impl State for DowngradeLeaderRegion {
|
||||
async fn next(&mut self, ctx: &mut Context) -> Result<(Box<dyn State>, Status)> {
|
||||
let now = Instant::now();
|
||||
// Ensures the `leader_region_lease_deadline` must exist after recovering.
|
||||
ctx.volatile_ctx
|
||||
.set_leader_region_lease_deadline(Duration::from_secs(REGION_LEASE_SECS));
|
||||
@@ -77,6 +78,7 @@ impl State for DowngradeLeaderRegion {
|
||||
}
|
||||
}
|
||||
}
|
||||
ctx.update_downgrade_leader_region_elapsed(now);
|
||||
|
||||
Ok((
|
||||
Box::new(UpgradeCandidateRegion::default()),
|
||||
@@ -348,7 +350,8 @@ mod tests {
|
||||
let env = TestingEnv::new();
|
||||
let mut ctx = env.context_factory().new_context(persistent_context);
|
||||
prepare_table_metadata(&ctx, HashMap::default()).await;
|
||||
ctx.volatile_ctx.operations_elapsed = ctx.persistent_ctx.timeout + Duration::from_secs(1);
|
||||
ctx.volatile_ctx.metrics.operations_elapsed =
|
||||
ctx.persistent_ctx.timeout + Duration::from_secs(1);
|
||||
|
||||
let err = state.downgrade_region(&mut ctx).await.unwrap_err();
|
||||
|
||||
@@ -591,7 +594,8 @@ mod tests {
|
||||
let mut ctx = env.context_factory().new_context(persistent_context);
|
||||
let mailbox_ctx = env.mailbox_context();
|
||||
let mailbox = mailbox_ctx.mailbox().clone();
|
||||
ctx.volatile_ctx.operations_elapsed = ctx.persistent_ctx.timeout + Duration::from_secs(1);
|
||||
ctx.volatile_ctx.metrics.operations_elapsed =
|
||||
ctx.persistent_ctx.timeout + Duration::from_secs(1);
|
||||
|
||||
let (tx, rx) = tokio::sync::mpsc::channel(1);
|
||||
mailbox_ctx
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
use std::any::Any;
|
||||
|
||||
use common_procedure::Status;
|
||||
use common_telemetry::warn;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::error::{self, Result};
|
||||
@@ -37,7 +38,15 @@ impl RegionMigrationAbort {
|
||||
#[async_trait::async_trait]
|
||||
#[typetag::serde]
|
||||
impl State for RegionMigrationAbort {
|
||||
async fn next(&mut self, _: &mut Context) -> Result<(Box<dyn State>, Status)> {
|
||||
async fn next(&mut self, ctx: &mut Context) -> Result<(Box<dyn State>, Status)> {
|
||||
warn!(
|
||||
"Region migration is aborted: {}, region_id: {}, from_peer: {}, to_peer: {}, {}",
|
||||
self.reason,
|
||||
ctx.region_id(),
|
||||
ctx.persistent_ctx.from_peer,
|
||||
ctx.persistent_ctx.to_peer,
|
||||
ctx.volatile_ctx.metrics,
|
||||
);
|
||||
error::MigrationAbortSnafu {
|
||||
reason: &self.reason,
|
||||
}
|
||||
|
||||
@@ -13,7 +13,7 @@
|
||||
// limitations under the License.
|
||||
|
||||
use std::any::Any;
|
||||
use std::time::{Duration, Instant};
|
||||
use std::time::Duration;
|
||||
|
||||
use api::v1::meta::MailboxMessage;
|
||||
use common_meta::distributed_time_constants::REGION_LEASE_SECS;
|
||||
@@ -24,6 +24,7 @@ use common_procedure::Status;
|
||||
use common_telemetry::info;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use tokio::time::Instant;
|
||||
|
||||
use crate::error::{self, Result};
|
||||
use crate::handler::HeartbeatMailbox;
|
||||
@@ -42,7 +43,9 @@ pub struct OpenCandidateRegion;
|
||||
impl State for OpenCandidateRegion {
|
||||
async fn next(&mut self, ctx: &mut Context) -> Result<(Box<dyn State>, Status)> {
|
||||
let instruction = self.build_open_region_instruction(ctx).await?;
|
||||
let now = Instant::now();
|
||||
self.open_candidate_region(ctx, instruction).await?;
|
||||
ctx.update_open_candidate_region_elapsed(now);
|
||||
|
||||
Ok((
|
||||
Box::new(UpdateMetadata::Downgrade),
|
||||
|
||||
@@ -54,9 +54,12 @@ impl Default for UpgradeCandidateRegion {
|
||||
#[typetag::serde]
|
||||
impl State for UpgradeCandidateRegion {
|
||||
async fn next(&mut self, ctx: &mut Context) -> Result<(Box<dyn State>, Status)> {
|
||||
let now = Instant::now();
|
||||
if self.upgrade_region_with_retry(ctx).await {
|
||||
ctx.update_upgrade_candidate_region_elapsed(now);
|
||||
Ok((Box::new(UpdateMetadata::Upgrade), Status::executing(false)))
|
||||
} else {
|
||||
ctx.update_upgrade_candidate_region_elapsed(now);
|
||||
Ok((Box::new(UpdateMetadata::Rollback), Status::executing(false)))
|
||||
}
|
||||
}
|
||||
@@ -288,7 +291,8 @@ mod tests {
|
||||
let persistent_context = new_persistent_context();
|
||||
let env = TestingEnv::new();
|
||||
let mut ctx = env.context_factory().new_context(persistent_context);
|
||||
ctx.volatile_ctx.operations_elapsed = ctx.persistent_ctx.timeout + Duration::from_secs(1);
|
||||
ctx.volatile_ctx.metrics.operations_elapsed =
|
||||
ctx.persistent_ctx.timeout + Duration::from_secs(1);
|
||||
|
||||
let err = state.upgrade_region(&ctx).await.unwrap_err();
|
||||
|
||||
@@ -558,7 +562,8 @@ mod tests {
|
||||
let mut ctx = env.context_factory().new_context(persistent_context);
|
||||
let mailbox_ctx = env.mailbox_context();
|
||||
let mailbox = mailbox_ctx.mailbox().clone();
|
||||
ctx.volatile_ctx.operations_elapsed = ctx.persistent_ctx.timeout + Duration::from_secs(1);
|
||||
ctx.volatile_ctx.metrics.operations_elapsed =
|
||||
ctx.persistent_ctx.timeout + Duration::from_secs(1);
|
||||
|
||||
let (tx, rx) = tokio::sync::mpsc::channel(1);
|
||||
mailbox_ctx
|
||||
|
||||
Reference in New Issue
Block a user