From c26f2f94c02e68d09cf1ae09bf8a60b41d9d3d4a Mon Sep 17 00:00:00 2001
From: Weny Xu
Date: Mon, 4 Dec 2023 11:51:30 +0900
Subject: [PATCH] chore: add logs and metrics (#2858)

* chore: add logs and metrics

* feat: add the timer to track heartbeat interval

* feat: add the gauge to track region leases

* refactor: use gauge instead of the timer

* chore: apply suggestions from CR

* feat: add hit rate and etcd txn metrics
---
 src/common/meta/src/kv_backend/etcd.rs        |  6 ++++
 src/datanode/src/alive_keeper.rs              |  6 +++-
 src/datanode/src/heartbeat.rs                 | 29 +++++++++++++++++--
 src/datanode/src/metrics.rs                   | 15 ++++++++++
 .../src/handler/region_lease_handler.rs       | 13 +++++++++
 src/meta-srv/src/metrics.rs                   | 15 ++++++++++
 src/meta-srv/src/region/lease_keeper.rs       | 22 ++++++++++++--
 src/meta-srv/src/service/store/cached_kv.rs   |  4 +++
 8 files changed, 105 insertions(+), 5 deletions(-)

diff --git a/src/common/meta/src/kv_backend/etcd.rs b/src/common/meta/src/kv_backend/etcd.rs
index 5becb4b332..806b90150b 100644
--- a/src/common/meta/src/kv_backend/etcd.rs
+++ b/src/common/meta/src/kv_backend/etcd.rs
@@ -68,6 +68,9 @@ impl EtcdStore {
     async fn do_multi_txn(&self, txn_ops: Vec<TxnOp>) -> Result<Vec<TxnResponse>> {
         if txn_ops.len() < MAX_TXN_SIZE {
             // fast path
+            let _timer = METRIC_META_TXN_REQUEST
+                .with_label_values(&["etcd", "txn"])
+                .start_timer();
             let txn = Txn::new().and_then(txn_ops);
             let txn_res = self
                 .client
@@ -81,6 +84,9 @@ impl EtcdStore {
         let txns = txn_ops
             .chunks(MAX_TXN_SIZE)
             .map(|part| async move {
+                let _timer = METRIC_META_TXN_REQUEST
+                    .with_label_values(&["etcd", "txn"])
+                    .start_timer();
                 let txn = Txn::new().and_then(part);
                 self.client.kv_client().txn(txn).await
             })
diff --git a/src/datanode/src/alive_keeper.rs b/src/datanode/src/alive_keeper.rs
index d8b0b7c50d..e8bb653d69 100644
--- a/src/datanode/src/alive_keeper.rs
+++ b/src/datanode/src/alive_keeper.rs
@@ -129,8 +129,12 @@ impl RegionAliveKeeper {
             let (role, region_id) = (region.role().into(), RegionId::from(region.region_id));
             if let Some(handle) = self.find_handle(region_id).await {
                 handle.reset_deadline(role, deadline).await;
+            } else {
+                warn!(
+                    "Trying to renew the lease for region {region_id}, the keeper handler is not found!"
+                );
+                // Else the region alive keeper might be triggered by lagging messages, we can safely ignore it.
             }
-            // Else the region alive keeper might be triggered by lagging messages, we can safely ignore it.
         }
     }
 
diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs
index 4e2242288c..5a4502beb5 100644
--- a/src/datanode/src/heartbeat.rs
+++ b/src/datanode/src/heartbeat.rs
@@ -37,6 +37,7 @@ use crate::alive_keeper::RegionAliveKeeper;
 use crate::config::DatanodeOptions;
 use crate::error::{self, MetaClientInitSnafu, Result};
 use crate::event_listener::RegionServerEventReceiver;
+use crate::metrics;
 use crate::region_server::RegionServer;
 
 pub(crate) mod handler;
@@ -72,9 +73,9 @@ impl HeartbeatTask {
             opts.heartbeat.interval.as_millis() as u64,
         ));
         let resp_handler_executor = Arc::new(HandlerGroupExecutor::new(vec![
+            region_alive_keeper.clone(),
             Arc::new(ParseMailboxMessageHandler),
             Arc::new(RegionHeartbeatResponseHandler::new(region_server.clone())),
-            region_alive_keeper.clone(),
         ]));
 
         Ok(Self {
@@ -101,8 +102,10 @@ impl HeartbeatTask {
         quit_signal: Arc<Notify>,
     ) -> Result<HeartbeatSender> {
         let client_id = meta_client.id();
-
         let (tx, mut rx) = meta_client.heartbeat().await.context(MetaClientInitSnafu)?;
+
+        let mut last_received_lease = Instant::now();
+
         let _handle = common_runtime::spawn_bg(async move {
             while let Some(res) = match rx.message().await {
                 Ok(m) => m,
@@ -114,6 +117,28 @@ impl HeartbeatTask {
                 if let Some(msg) = res.mailbox_message.as_ref() {
                     info!("Received mailbox message: {msg:?}, meta_client id: {client_id:?}");
                 }
+                if let Some(lease) = res.region_lease.as_ref() {
+                    metrics::LAST_RECEIVED_HEARTBEAT_ELAPSED
+                        .set(last_received_lease.elapsed().as_millis() as i64);
+                    // Resets the timer.
+                    last_received_lease = Instant::now();
+
+                    let mut leader_region_lease_count = 0;
+                    let mut follower_region_lease_count = 0;
+                    for lease in &lease.regions {
+                        match lease.role() {
+                            RegionRole::Leader => leader_region_lease_count += 1,
+                            RegionRole::Follower => follower_region_lease_count += 1,
+                        }
+                    }
+
+                    metrics::HEARTBEAT_REGION_LEASES
+                        .with_label_values(&["leader"])
+                        .set(leader_region_lease_count);
+                    metrics::HEARTBEAT_REGION_LEASES
+                        .with_label_values(&["follower"])
+                        .set(follower_region_lease_count);
+                }
                 let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), res);
                 if let Err(e) = Self::handle_response(ctx, handler_executor.clone()).await {
                     error!(e; "Error while handling heartbeat response");
diff --git a/src/datanode/src/metrics.rs b/src/datanode/src/metrics.rs
index f4a2621ecc..54e2619d78 100644
--- a/src/datanode/src/metrics.rs
+++ b/src/datanode/src/metrics.rs
@@ -18,6 +18,8 @@ use prometheus::*;
 /// Region request type label.
 pub const REGION_REQUEST_TYPE: &str = "datanode_region_request_type";
 
+pub const REGION_ROLE: &str = "region_role";
+
 lazy_static! {
     /// The elapsed time of handling a request in the region_server.
     pub static ref HANDLE_REGION_REQUEST_ELAPSED: HistogramVec = register_histogram_vec!(
@@ -26,4 +28,17 @@ lazy_static! {
         &[REGION_REQUEST_TYPE]
     )
     .unwrap();
+    /// The elapsed time since the last received heartbeat.
+    pub static ref LAST_RECEIVED_HEARTBEAT_ELAPSED: IntGauge = register_int_gauge!(
+        "last_received_heartbeat_lease_elapsed",
+        "last received heartbeat lease elapsed",
+    )
+    .unwrap();
+    /// The received region leases via heartbeat.
+    pub static ref HEARTBEAT_REGION_LEASES: IntGaugeVec = register_int_gauge_vec!(
+        "heartbeat_region_leases",
+        "received region leases via heartbeat",
+        &[REGION_ROLE]
+    )
+    .unwrap();
 }
diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs
index 67fcba4519..8e72ad098d 100644
--- a/src/meta-srv/src/handler/region_lease_handler.rs
+++ b/src/meta-srv/src/handler/region_lease_handler.rs
@@ -18,6 +18,7 @@ use std::sync::Arc;
 use api::v1::meta::{HeartbeatRequest, RegionLease, Role};
 use async_trait::async_trait;
 use common_meta::key::TableMetadataManagerRef;
+use common_telemetry::info;
 use store_api::region_engine::{GrantedRegion, RegionRole};
 use store_api::storage::RegionId;
 
@@ -123,6 +124,12 @@ impl HeartbeatHandler for RegionLeaseHandler {
             &leaders,
             RegionRole::Leader,
         );
+        if !closable.is_empty() {
+            info!(
+                "Granting region lease, found closable leader regions: {:?} on datanode {}",
+                closable, datanode_id
+            );
+        }
         inactive_regions.extend(closable);
 
         let followers = followers.into_iter().flatten().collect::<Vec<_>>();
@@ -144,6 +151,12 @@ impl HeartbeatHandler for RegionLeaseHandler {
             &followers,
             RegionRole::Follower,
         );
+        if !closable.is_empty() {
+            info!(
+                "Granting region lease, found closable follower regions {:?} on datanode {}",
+                closable, datanode_id
+            );
+        }
         inactive_regions.extend(closable);
 
         acc.inactive_region_ids = inactive_regions;
diff --git a/src/meta-srv/src/metrics.rs b/src/meta-srv/src/metrics.rs
index 8c73c9a7c1..493c2bcc74 100644
--- a/src/meta-srv/src/metrics.rs
+++ b/src/meta-srv/src/metrics.rs
@@ -34,4 +34,19 @@ lazy_static! {
     pub static ref METRIC_META_LEADER_CACHED_KV_LOAD: HistogramVec =
         register_histogram_vec!("meta_leader_cache_kv_load", "meta load cache", &["prefix"])
             .unwrap();
+    pub static ref METRIC_META_LOAD_FOLLOWER_METADATA: Histogram = register_histogram!(
+        "meta_load_follower_metadata",
+        "meta load follower regions metadata elapsed"
+    )
+    .unwrap();
+    pub static ref METRIC_META_LOAD_LEADER_METADATA: Histogram = register_histogram!(
+        "meta_load_leader_metadata",
+        "meta load leader regions metadata elapsed"
+    )
+    .unwrap();
+    pub static ref METRIC_META_KV_CACHE_BATCH_GET_HIT_RATE: Gauge = register_gauge!(
+        "meta_kv_cache_batch_get_hit_rate",
+        "meta kv cache batch get hit rate"
+    )
+    .unwrap();
 }
diff --git a/src/meta-srv/src/region/lease_keeper.rs b/src/meta-srv/src/region/lease_keeper.rs
index f9471f7e07..06bc4cf382 100644
--- a/src/meta-srv/src/region/lease_keeper.rs
+++ b/src/meta-srv/src/region/lease_keeper.rs
@@ -21,11 +21,13 @@ use std::sync::{Arc, RwLock};
 use common_meta::key::table_route::TableRouteValue;
 use common_meta::key::TableMetadataManagerRef;
 use common_meta::DatanodeId;
+use common_telemetry::warn;
 use snafu::ResultExt;
 use store_api::storage::{RegionId, TableId};
 
 use self::mito::find_staled_leader_regions;
 use crate::error::{self, Result};
+use crate::metrics;
 use crate::region::lease_keeper::utils::find_staled_follower_regions;
 
 pub type RegionLeaseKeeperRef = Arc<RegionLeaseKeeper>;
@@ -89,7 +91,11 @@ impl RegionLeaseKeeper {
     ) -> Result<(HashSet<RegionId>, HashSet<RegionId>)> {
         let tables = self.collect_tables(datanode_regions);
         let table_ids = tables.keys().copied().collect::<Vec<_>>();
-        let metadata_subset = self.collect_tables_metadata(&table_ids).await?;
+
+        let metadata_subset = {
+            let _timer = metrics::METRIC_META_LOAD_LEADER_METADATA.start_timer();
+            self.collect_tables_metadata(&table_ids).await?
+        };
 
         let mut closable_set = HashSet::new();
         let mut downgradable_set = HashSet::new();
@@ -104,6 +110,10 @@ impl RegionLeaseKeeper {
                 downgradable_set.extend(downgradable);
                 closable_set.extend(closable);
             } else {
+                warn!(
+                    "The table {} metadata is not found, appends closable leader regions: {:?}",
+                    table_id, regions
+                );
                 // If table metadata is not found.
                 closable_set.extend(regions);
             }
@@ -128,7 +138,11 @@ impl RegionLeaseKeeper {
     ) -> Result<(HashSet<RegionId>, HashSet<RegionId>)> {
         let tables = self.collect_tables(datanode_regions);
         let table_ids = tables.keys().copied().collect::<Vec<_>>();
-        let metadata_subset = self.collect_tables_metadata(&table_ids).await?;
+
+        let metadata_subset = {
+            let _timer = metrics::METRIC_META_LOAD_FOLLOWER_METADATA.start_timer();
+            self.collect_tables_metadata(&table_ids).await?
+        };
 
         let mut upgradable_set = HashSet::new();
         let mut closable_set = HashSet::new();
@@ -143,6 +157,10 @@ impl RegionLeaseKeeper {
                 upgradable_set.extend(upgradable);
                 closable_set.extend(closable);
             } else {
+                warn!(
+                    "The table {} metadata is not found, appends closable followers regions: {:?}",
+                    table_id, regions
+                );
                 // If table metadata is not found.
                 closable_set.extend(regions);
             }
diff --git a/src/meta-srv/src/service/store/cached_kv.rs b/src/meta-srv/src/service/store/cached_kv.rs
index 450a967df1..a1d251971c 100644
--- a/src/meta-srv/src/service/store/cached_kv.rs
+++ b/src/meta-srv/src/service/store/cached_kv.rs
@@ -260,6 +260,10 @@ impl KvBackend for LeaderCachedKvBackend {
             .iter()
             .map(|kv| kv.key.clone())
             .collect::<HashSet<_>>();
+
+        let hit_rate = hit_keys.len() as f64 / req.keys.len() as f64;
+        metrics::METRIC_META_KV_CACHE_BATCH_GET_HIT_RATE.set(hit_rate);
+
         let missed_keys = req
             .keys
            .iter()
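
All of the additions above follow the same prometheus-crate pattern: each metric is registered once inside a lazy_static! block, histogram timers are held in a _timer guard that observes the elapsed time when it is dropped at the end of the scope, and gauges are overwritten with the latest observed value (for example the per-role region lease counts). The following is a minimal, self-contained sketch of that pattern; it assumes the prometheus and lazy_static crates as dependencies, and the metric names, labels, and the handle_heartbeat function are illustrative only, not the identifiers registered by this patch.

use lazy_static::lazy_static;
use prometheus::{register_histogram_vec, register_int_gauge_vec, HistogramVec, IntGaugeVec};

lazy_static! {
    /// Elapsed time of a txn-like request, labelled by backend and operation.
    static ref EXAMPLE_TXN_ELAPSED: HistogramVec = register_histogram_vec!(
        "example_txn_elapsed",
        "elapsed time of example txn requests",
        &["backend", "op"]
    )
    .unwrap();
    /// Latest number of region leases seen, labelled by role.
    static ref EXAMPLE_REGION_LEASES: IntGaugeVec = register_int_gauge_vec!(
        "example_region_leases",
        "example region leases by role",
        &["region_role"]
    )
    .unwrap();
}

fn handle_heartbeat(leader_leases: i64, follower_leases: i64) {
    // The guard records the elapsed time into the histogram when it is dropped
    // at the end of this function.
    let _timer = EXAMPLE_TXN_ELAPSED
        .with_label_values(&["etcd", "txn"])
        .start_timer();

    // Gauges are simply overwritten with the most recent observation.
    EXAMPLE_REGION_LEASES
        .with_label_values(&["leader"])
        .set(leader_leases);
    EXAMPLE_REGION_LEASES
        .with_label_values(&["follower"])
        .set(follower_leases);
}

fn main() {
    handle_heartbeat(3, 1);
    // Registered metrics can then be gathered and exposed by an exporter.
    println!("{} metric families", prometheus::gather().len());
}

Binding the timer to _timer rather than _ matters here: let _ = ... drops the guard immediately and records a near-zero duration, while _timer keeps it alive until the end of the enclosing block, which is why the patch introduces the guard at the top of each timed scope.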