From 610651fa8f39cf9da0a6b7505cfdd1bae4b02f6b Mon Sep 17 00:00:00 2001 From: JeremyHi Date: Mon, 8 May 2023 17:50:21 +0800 Subject: [PATCH] feat: meta metrics (#1538) * chore: from_etcd_kv (better name) * feat: kv request metric * feat: router metric * feat: connections metric --- src/meta-srv/src/handler.rs | 4 + src/meta-srv/src/metrics.rs | 3 + src/meta-srv/src/service/heartbeat.rs | 2 +- src/meta-srv/src/service/router.rs | 28 +++++- src/meta-srv/src/service/store/etcd.rs | 105 ++++++++++++++++++++--- src/meta-srv/src/service/store/memory.rs | 51 +++++++++-- 6 files changed, 173 insertions(+), 20 deletions(-) diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index 685d059ec6..98fbca803d 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -26,6 +26,7 @@ use common_telemetry::{info, warn}; use dashmap::DashMap; pub use failure_handler::RegionFailureHandler; pub use keep_lease_handler::KeepLeaseHandler; +use metrics::{decrement_gauge, increment_gauge}; pub use on_leader_start::OnLeaderStartHandler; pub use persist_stats_handler::PersistStatsHandler; pub use response_header_handler::ResponseHeaderHandler; @@ -37,6 +38,7 @@ use self::instruction::Instruction; use self::node_stat::Stat; use crate::error::{self, Result}; use crate::metasrv::Context; +use crate::metrics::METRIC_META_HEARTBEAT_CONNECTION_NUM; use crate::sequence::Sequence; use crate::service::mailbox::{Channel, Mailbox, MailboxReceiver, MailboxRef, MessageId}; @@ -127,6 +129,7 @@ impl HeartbeatHandlerGroup { pub async fn register(&self, key: impl AsRef, pusher: Pusher) { let mut pushers = self.pushers.write().await; let key = key.as_ref(); + increment_gauge!(METRIC_META_HEARTBEAT_CONNECTION_NUM, 1.0); info!("Pusher register: {}", key); pushers.insert(key.into(), pusher); } @@ -134,6 +137,7 @@ impl HeartbeatHandlerGroup { pub async fn unregister(&self, key: impl AsRef) -> Option { let mut pushers = self.pushers.write().await; let key = key.as_ref(); + decrement_gauge!(METRIC_META_HEARTBEAT_CONNECTION_NUM, 1.0); info!("Pusher unregister: {}", key); pushers.remove(key) } diff --git a/src/meta-srv/src/metrics.rs b/src/meta-srv/src/metrics.rs index 3d59e163a1..f468c4fef5 100644 --- a/src/meta-srv/src/metrics.rs +++ b/src/meta-srv/src/metrics.rs @@ -14,3 +14,6 @@ pub(crate) const METRIC_META_CREATE_CATALOG: &str = "meta.create_catalog"; pub(crate) const METRIC_META_CREATE_SCHEMA: &str = "meta.create_schema"; +pub(crate) const METRIC_META_KV_REQUEST: &str = "meta.kv_request"; +pub(crate) const METRIC_META_ROUTE_REQUEST: &str = "meta.route_request"; +pub(crate) const METRIC_META_HEARTBEAT_CONNECTION_NUM: &str = "meta.heartbeat_connection_num"; diff --git a/src/meta-srv/src/service/heartbeat.rs b/src/meta-srv/src/service/heartbeat.rs index c7e7984975..bc9e42fdc7 100644 --- a/src/meta-srv/src/service/heartbeat.rs +++ b/src/meta-srv/src/service/heartbeat.rs @@ -92,7 +92,7 @@ impl heartbeat_server::Heartbeat for MetaSrv { } info!( "Heartbeat stream broken: {:?}", - pusher_key.as_ref().unwrap_or(&"unknow".to_string()) + pusher_key.as_ref().unwrap_or(&"unknown".to_string()) ); if let Some(key) = pusher_key { let _ = handler_group.unregister(&key).await; diff --git a/src/meta-srv/src/service/router.rs b/src/meta-srv/src/service/router.rs index f32e7f1b3a..9d7a6674da 100644 --- a/src/meta-srv/src/service/router.rs +++ b/src/meta-srv/src/service/router.rs @@ -20,7 +20,7 @@ use api::v1::meta::{ RouteResponse, Table, TableName, TableRoute, TableRouteValue, }; use catalog::helper::{TableGlobalKey, TableGlobalValue}; -use common_telemetry::warn; +use common_telemetry::{timer, warn}; use snafu::{OptionExt, ResultExt}; use table::metadata::RawTableInfo; use tonic::{Request, Response}; @@ -29,6 +29,7 @@ use crate::error; use crate::error::Result; use crate::keys::TableRouteKey; use crate::metasrv::{Context, MetaSrv, SelectorRef}; +use crate::metrics::METRIC_META_ROUTE_REQUEST; use crate::sequence::SequenceRef; use crate::service::store::ext::KvStoreExt; use crate::service::store::kv::KvStoreRef; @@ -39,7 +40,16 @@ impl router_server::Router for MetaSrv { async fn create(&self, req: Request) -> GrpcResult { let req = req.into_inner(); - let CreateRequest { table_name, .. } = &req; + let CreateRequest { + header, table_name, .. + } = &req; + let cluster_id = header.as_ref().map_or(0, |h| h.cluster_id); + + let _timer = timer!( + METRIC_META_ROUTE_REQUEST, + &[("op", "create"), ("cluster_id", &cluster_id.to_string())] + ); + let table_name = table_name.clone().context(error::EmptyTableNameSnafu)?; let ctx = self.create_ctx(table_name); @@ -53,6 +63,13 @@ impl router_server::Router for MetaSrv { async fn route(&self, req: Request) -> GrpcResult { let req = req.into_inner(); + let cluster_id = req.header.as_ref().map_or(0, |h| h.cluster_id); + + let _timer = timer!( + METRIC_META_ROUTE_REQUEST, + &[("op", "route"), ("cluster_id", &cluster_id.to_string())] + ); + let ctx = self.new_ctx(); let res = handle_route(req, ctx).await?; @@ -61,6 +78,13 @@ impl router_server::Router for MetaSrv { async fn delete(&self, req: Request) -> GrpcResult { let req = req.into_inner(); + let cluster_id = req.header.as_ref().map_or(0, |h| h.cluster_id); + + let _timer = timer!( + METRIC_META_ROUTE_REQUEST, + &[("op", "delete"), ("cluster_id", &cluster_id.to_string())] + ); + let ctx = self.new_ctx(); let res = handle_delete(req, ctx).await?; diff --git a/src/meta-srv/src/service/store/etcd.rs b/src/meta-srv/src/service/store/etcd.rs index c77d24de75..5b1d7213a0 100644 --- a/src/meta-srv/src/service/store/etcd.rs +++ b/src/meta-srv/src/service/store/etcd.rs @@ -21,13 +21,14 @@ use api::v1::meta::{ RangeRequest, RangeResponse, ResponseHeader, }; use common_error::prelude::*; -use common_telemetry::warn; +use common_telemetry::{timer, warn}; use etcd_client::{ Client, Compare, CompareOp, DeleteOptions, GetOptions, PutOptions, Txn, TxnOp, TxnOpResponse, }; use crate::error; use crate::error::Result; +use crate::metrics::METRIC_META_KV_REQUEST; use crate::service::store::kv::{KvStore, KvStoreRef}; pub struct EtcdStore { @@ -61,6 +62,15 @@ impl KvStore for EtcdStore { options, } = req.try_into()?; + let _timer = timer!( + METRIC_META_KV_REQUEST, + &[ + ("target", "etcd"), + ("op", "range"), + ("cluster_id", &cluster_id.to_string()) + ] + ); + let res = self .client .kv_client() @@ -68,7 +78,11 @@ impl KvStore for EtcdStore { .await .context(error::EtcdFailedSnafu)?; - let kvs = res.kvs().iter().map(KvPair::to_kv).collect::>(); + let kvs = res + .kvs() + .iter() + .map(KvPair::from_etcd_kv) + .collect::>(); let header = Some(ResponseHeader::success(cluster_id)); Ok(RangeResponse { @@ -86,6 +100,15 @@ impl KvStore for EtcdStore { options, } = req.try_into()?; + let _timer = timer!( + METRIC_META_KV_REQUEST, + &[ + ("target", "etcd"), + ("op", "put"), + ("cluster_id", &cluster_id.to_string()) + ] + ); + let res = self .client .kv_client() @@ -93,7 +116,7 @@ impl KvStore for EtcdStore { .await .context(error::EtcdFailedSnafu)?; - let prev_kv = res.prev_key().map(KvPair::to_kv); + let prev_kv = res.prev_key().map(KvPair::from_etcd_kv); let header = Some(ResponseHeader::success(cluster_id)); Ok(PutResponse { header, prev_kv }) @@ -106,6 +129,15 @@ impl KvStore for EtcdStore { options, } = req.try_into()?; + let _timer = timer!( + METRIC_META_KV_REQUEST, + &[ + ("target", "etcd"), + ("op", "batch_get"), + ("cluster_id", &cluster_id.to_string()) + ] + ); + let get_ops: Vec<_> = keys .into_iter() .map(|k| TxnOp::get(k, options.clone())) @@ -126,7 +158,7 @@ impl KvStore for EtcdStore { _ => unreachable!(), }; - kvs.extend(get_res.kvs().iter().map(KvPair::to_kv)); + kvs.extend(get_res.kvs().iter().map(KvPair::from_etcd_kv)); } let header = Some(ResponseHeader::success(cluster_id)); @@ -140,6 +172,15 @@ impl KvStore for EtcdStore { options, } = req.try_into()?; + let _timer = timer!( + METRIC_META_KV_REQUEST, + &[ + ("target", "etcd"), + ("op", "batch_put"), + ("cluster_id", &cluster_id.to_string()) + ] + ); + let put_ops = kvs .into_iter() .map(|kv| (TxnOp::put(kv.key, kv.value, options.clone()))) @@ -158,7 +199,7 @@ impl KvStore for EtcdStore { match op_res { TxnOpResponse::Put(put_res) => { if let Some(prev_kv) = put_res.prev_key() { - prev_kvs.push(KvPair::to_kv(prev_kv)); + prev_kvs.push(KvPair::from_etcd_kv(prev_kv)); } } _ => unreachable!(), // never get here @@ -176,6 +217,15 @@ impl KvStore for EtcdStore { options, } = req.try_into()?; + let _timer = timer!( + METRIC_META_KV_REQUEST, + &[ + ("target", "etcd"), + ("op", "batch_delete"), + ("cluster_id", &cluster_id.to_string()) + ] + ); + let mut prev_kvs = Vec::with_capacity(keys.len()); let delete_ops = keys @@ -195,7 +245,7 @@ impl KvStore for EtcdStore { match op_res { TxnOpResponse::Delete(delete_res) => { delete_res.prev_kvs().iter().for_each(|kv| { - prev_kvs.push(KvPair::to_kv(kv)); + prev_kvs.push(KvPair::from_etcd_kv(kv)); }); } _ => unreachable!(), // never get here @@ -216,6 +266,15 @@ impl KvStore for EtcdStore { put_options, } = req.try_into()?; + let _timer = timer!( + METRIC_META_KV_REQUEST, + &[ + ("target", "etcd"), + ("op", "compare_and_put"), + ("cluster_id", &cluster_id.to_string()) + ] + ); + let compare = if expect.is_empty() { // create if absent // revision 0 means key was not exist @@ -247,8 +306,8 @@ impl KvStore for EtcdStore { })?; let prev_kv = match op_res { - TxnOpResponse::Put(res) => res.prev_key().map(KvPair::to_kv), - TxnOpResponse::Get(res) => res.kvs().first().map(KvPair::to_kv), + TxnOpResponse::Put(res) => res.prev_key().map(KvPair::from_etcd_kv), + TxnOpResponse::Get(res) => res.kvs().first().map(KvPair::from_etcd_kv), _ => unreachable!(), // never get here }; @@ -267,6 +326,15 @@ impl KvStore for EtcdStore { options, } = req.try_into()?; + let _timer = timer!( + METRIC_META_KV_REQUEST, + &[ + ("target", "etcd"), + ("op", "delete_range"), + ("cluster_id", &cluster_id.to_string()) + ] + ); + let res = self .client .kv_client() @@ -274,7 +342,11 @@ impl KvStore for EtcdStore { .await .context(error::EtcdFailedSnafu)?; - let prev_kvs = res.prev_kvs().iter().map(KvPair::to_kv).collect::>(); + let prev_kvs = res + .prev_kvs() + .iter() + .map(KvPair::from_etcd_kv) + .collect::>(); let header = Some(ResponseHeader::success(cluster_id)); Ok(DeleteRangeResponse { @@ -292,6 +364,15 @@ impl KvStore for EtcdStore { delete_options, } = req.try_into()?; + let _timer = timer!( + METRIC_META_KV_REQUEST, + &[ + ("target", "etcd"), + ("op", "move_value"), + ("cluster_id", &cluster_id.to_string()) + ] + ); + let mut client = self.client.kv_client(); let header = Some(ResponseHeader::success(cluster_id)); @@ -341,13 +422,13 @@ impl KvStore for EtcdStore { TxnOpResponse::Get(res) => { return Ok(MoveValueResponse { header, - kv: res.kvs().first().map(KvPair::to_kv), + kv: res.kvs().first().map(KvPair::from_etcd_kv), }); } TxnOpResponse::Delete(res) => { return Ok(MoveValueResponse { header, - kv: res.prev_kvs().first().map(KvPair::to_kv), + kv: res.prev_kvs().first().map(KvPair::from_etcd_kv), }); } _ => {} @@ -613,7 +694,7 @@ impl<'a> KvPair<'a> { } #[inline] - fn to_kv(kv: &etcd_client::KeyValue) -> KeyValue { + fn from_etcd_kv(kv: &etcd_client::KeyValue) -> KeyValue { KeyValue::from(KvPair::new(kv)) } } diff --git a/src/meta-srv/src/service/store/memory.rs b/src/meta-srv/src/service/store/memory.rs index a26147011f..a0d2ed0920 100644 --- a/src/meta-srv/src/service/store/memory.rs +++ b/src/meta-srv/src/service/store/memory.rs @@ -22,10 +22,12 @@ use api::v1::meta::{ DeleteRangeResponse, KeyValue, MoveValueRequest, MoveValueResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, ResponseHeader, }; +use common_telemetry::timer; use parking_lot::RwLock; use super::ext::KvStoreExt; use crate::error::Result; +use crate::metrics::METRIC_META_KV_REQUEST; use crate::service::store::kv::{KvStore, ResettableKvStore}; pub struct MemStore { @@ -55,6 +57,11 @@ impl ResettableKvStore for MemStore { #[async_trait::async_trait] impl KvStore for MemStore { async fn range(&self, req: RangeRequest) -> Result { + let _timer = timer!( + METRIC_META_KV_REQUEST, + &[("target", "memory"), ("op", "range"),] + ); + let RangeRequest { header, key, @@ -99,6 +106,11 @@ impl KvStore for MemStore { } async fn put(&self, req: PutRequest) -> Result { + let _timer = timer!( + METRIC_META_KV_REQUEST, + &[("target", "memory"), ("op", "put"),] + ); + let PutRequest { header, key, @@ -120,7 +132,12 @@ impl KvStore for MemStore { } async fn batch_get(&self, req: BatchGetRequest) -> Result { - let keys = req.keys; + let _timer = timer!( + METRIC_META_KV_REQUEST, + &[("target", "memory"), ("op", "batch_get"),] + ); + + let BatchGetRequest { header, keys } = req; let mut kvs = Vec::with_capacity(keys.len()); for key in keys { @@ -129,13 +146,17 @@ impl KvStore for MemStore { } } - Ok(BatchGetResponse { - kvs, - ..Default::default() - }) + let cluster_id = header.map_or(0, |h| h.cluster_id); + let header = Some(ResponseHeader::success(cluster_id)); + Ok(BatchGetResponse { header, kvs }) } async fn batch_put(&self, req: BatchPutRequest) -> Result { + let _timer = timer!( + METRIC_META_KV_REQUEST, + &[("target", "memory"), ("op", "batch_put"),] + ); + let BatchPutRequest { header, kvs, @@ -165,6 +186,11 @@ impl KvStore for MemStore { } async fn batch_delete(&self, req: BatchDeleteRequest) -> Result { + let _timer = timer!( + METRIC_META_KV_REQUEST, + &[("target", "memory"), ("op", "batch_delete"),] + ); + let BatchDeleteRequest { header, keys, @@ -188,6 +214,11 @@ impl KvStore for MemStore { } async fn compare_and_put(&self, req: CompareAndPutRequest) -> Result { + let _timer = timer!( + METRIC_META_KV_REQUEST, + &[("target", "memory"), ("op", "compare_and_put"),] + ); + let CompareAndPutRequest { header, key, @@ -228,6 +259,11 @@ impl KvStore for MemStore { } async fn delete_range(&self, req: DeleteRangeRequest) -> Result { + let _timer = timer!( + METRIC_META_KV_REQUEST, + &[("target", "memory"), ("op", "deleteL_range"),] + ); + let DeleteRangeRequest { header, key, @@ -265,6 +301,11 @@ impl KvStore for MemStore { } async fn move_value(&self, req: MoveValueRequest) -> Result { + let _timer = timer!( + METRIC_META_KV_REQUEST, + &[("target", "memory"), ("op", "move_value"),] + ); + let MoveValueRequest { header, from_key,