feat: meta metrics (#1538)

* chore: from_etcd_kv (better name)

* feat: kv request metric

* feat: router metric

* feat: connections metric
This commit is contained in:
JeremyHi
2023-05-08 17:50:21 +08:00
committed by GitHub
parent c48067f88d
commit 610651fa8f
6 changed files with 173 additions and 20 deletions

View File

@@ -26,6 +26,7 @@ use common_telemetry::{info, warn};
use dashmap::DashMap;
pub use failure_handler::RegionFailureHandler;
pub use keep_lease_handler::KeepLeaseHandler;
use metrics::{decrement_gauge, increment_gauge};
pub use on_leader_start::OnLeaderStartHandler;
pub use persist_stats_handler::PersistStatsHandler;
pub use response_header_handler::ResponseHeaderHandler;
@@ -37,6 +38,7 @@ use self::instruction::Instruction;
use self::node_stat::Stat;
use crate::error::{self, Result};
use crate::metasrv::Context;
use crate::metrics::METRIC_META_HEARTBEAT_CONNECTION_NUM;
use crate::sequence::Sequence;
use crate::service::mailbox::{Channel, Mailbox, MailboxReceiver, MailboxRef, MessageId};
@@ -127,6 +129,7 @@ impl HeartbeatHandlerGroup {
pub async fn register(&self, key: impl AsRef<str>, pusher: Pusher) {
let mut pushers = self.pushers.write().await;
let key = key.as_ref();
increment_gauge!(METRIC_META_HEARTBEAT_CONNECTION_NUM, 1.0);
info!("Pusher register: {}", key);
pushers.insert(key.into(), pusher);
}
@@ -134,6 +137,7 @@ impl HeartbeatHandlerGroup {
pub async fn unregister(&self, key: impl AsRef<str>) -> Option<Pusher> {
let mut pushers = self.pushers.write().await;
let key = key.as_ref();
decrement_gauge!(METRIC_META_HEARTBEAT_CONNECTION_NUM, 1.0);
info!("Pusher unregister: {}", key);
pushers.remove(key)
}

View File

@@ -14,3 +14,6 @@
pub(crate) const METRIC_META_CREATE_CATALOG: &str = "meta.create_catalog";
pub(crate) const METRIC_META_CREATE_SCHEMA: &str = "meta.create_schema";
pub(crate) const METRIC_META_KV_REQUEST: &str = "meta.kv_request";
pub(crate) const METRIC_META_ROUTE_REQUEST: &str = "meta.route_request";
pub(crate) const METRIC_META_HEARTBEAT_CONNECTION_NUM: &str = "meta.heartbeat_connection_num";

View File

@@ -92,7 +92,7 @@ impl heartbeat_server::Heartbeat for MetaSrv {
}
info!(
"Heartbeat stream broken: {:?}",
pusher_key.as_ref().unwrap_or(&"unknow".to_string())
pusher_key.as_ref().unwrap_or(&"unknown".to_string())
);
if let Some(key) = pusher_key {
let _ = handler_group.unregister(&key).await;

View File

@@ -20,7 +20,7 @@ use api::v1::meta::{
RouteResponse, Table, TableName, TableRoute, TableRouteValue,
};
use catalog::helper::{TableGlobalKey, TableGlobalValue};
use common_telemetry::warn;
use common_telemetry::{timer, warn};
use snafu::{OptionExt, ResultExt};
use table::metadata::RawTableInfo;
use tonic::{Request, Response};
@@ -29,6 +29,7 @@ use crate::error;
use crate::error::Result;
use crate::keys::TableRouteKey;
use crate::metasrv::{Context, MetaSrv, SelectorRef};
use crate::metrics::METRIC_META_ROUTE_REQUEST;
use crate::sequence::SequenceRef;
use crate::service::store::ext::KvStoreExt;
use crate::service::store::kv::KvStoreRef;
@@ -39,7 +40,16 @@ impl router_server::Router for MetaSrv {
async fn create(&self, req: Request<CreateRequest>) -> GrpcResult<RouteResponse> {
let req = req.into_inner();
let CreateRequest { table_name, .. } = &req;
let CreateRequest {
header, table_name, ..
} = &req;
let cluster_id = header.as_ref().map_or(0, |h| h.cluster_id);
let _timer = timer!(
METRIC_META_ROUTE_REQUEST,
&[("op", "create"), ("cluster_id", &cluster_id.to_string())]
);
let table_name = table_name.clone().context(error::EmptyTableNameSnafu)?;
let ctx = self.create_ctx(table_name);
@@ -53,6 +63,13 @@ impl router_server::Router for MetaSrv {
async fn route(&self, req: Request<RouteRequest>) -> GrpcResult<RouteResponse> {
let req = req.into_inner();
let cluster_id = req.header.as_ref().map_or(0, |h| h.cluster_id);
let _timer = timer!(
METRIC_META_ROUTE_REQUEST,
&[("op", "route"), ("cluster_id", &cluster_id.to_string())]
);
let ctx = self.new_ctx();
let res = handle_route(req, ctx).await?;
@@ -61,6 +78,13 @@ impl router_server::Router for MetaSrv {
async fn delete(&self, req: Request<DeleteRequest>) -> GrpcResult<RouteResponse> {
let req = req.into_inner();
let cluster_id = req.header.as_ref().map_or(0, |h| h.cluster_id);
let _timer = timer!(
METRIC_META_ROUTE_REQUEST,
&[("op", "delete"), ("cluster_id", &cluster_id.to_string())]
);
let ctx = self.new_ctx();
let res = handle_delete(req, ctx).await?;

View File

@@ -21,13 +21,14 @@ use api::v1::meta::{
RangeRequest, RangeResponse, ResponseHeader,
};
use common_error::prelude::*;
use common_telemetry::warn;
use common_telemetry::{timer, warn};
use etcd_client::{
Client, Compare, CompareOp, DeleteOptions, GetOptions, PutOptions, Txn, TxnOp, TxnOpResponse,
};
use crate::error;
use crate::error::Result;
use crate::metrics::METRIC_META_KV_REQUEST;
use crate::service::store::kv::{KvStore, KvStoreRef};
pub struct EtcdStore {
@@ -61,6 +62,15 @@ impl KvStore for EtcdStore {
options,
} = req.try_into()?;
let _timer = timer!(
METRIC_META_KV_REQUEST,
&[
("target", "etcd"),
("op", "range"),
("cluster_id", &cluster_id.to_string())
]
);
let res = self
.client
.kv_client()
@@ -68,7 +78,11 @@ impl KvStore for EtcdStore {
.await
.context(error::EtcdFailedSnafu)?;
let kvs = res.kvs().iter().map(KvPair::to_kv).collect::<Vec<_>>();
let kvs = res
.kvs()
.iter()
.map(KvPair::from_etcd_kv)
.collect::<Vec<_>>();
let header = Some(ResponseHeader::success(cluster_id));
Ok(RangeResponse {
@@ -86,6 +100,15 @@ impl KvStore for EtcdStore {
options,
} = req.try_into()?;
let _timer = timer!(
METRIC_META_KV_REQUEST,
&[
("target", "etcd"),
("op", "put"),
("cluster_id", &cluster_id.to_string())
]
);
let res = self
.client
.kv_client()
@@ -93,7 +116,7 @@ impl KvStore for EtcdStore {
.await
.context(error::EtcdFailedSnafu)?;
let prev_kv = res.prev_key().map(KvPair::to_kv);
let prev_kv = res.prev_key().map(KvPair::from_etcd_kv);
let header = Some(ResponseHeader::success(cluster_id));
Ok(PutResponse { header, prev_kv })
@@ -106,6 +129,15 @@ impl KvStore for EtcdStore {
options,
} = req.try_into()?;
let _timer = timer!(
METRIC_META_KV_REQUEST,
&[
("target", "etcd"),
("op", "batch_get"),
("cluster_id", &cluster_id.to_string())
]
);
let get_ops: Vec<_> = keys
.into_iter()
.map(|k| TxnOp::get(k, options.clone()))
@@ -126,7 +158,7 @@ impl KvStore for EtcdStore {
_ => unreachable!(),
};
kvs.extend(get_res.kvs().iter().map(KvPair::to_kv));
kvs.extend(get_res.kvs().iter().map(KvPair::from_etcd_kv));
}
let header = Some(ResponseHeader::success(cluster_id));
@@ -140,6 +172,15 @@ impl KvStore for EtcdStore {
options,
} = req.try_into()?;
let _timer = timer!(
METRIC_META_KV_REQUEST,
&[
("target", "etcd"),
("op", "batch_put"),
("cluster_id", &cluster_id.to_string())
]
);
let put_ops = kvs
.into_iter()
.map(|kv| (TxnOp::put(kv.key, kv.value, options.clone())))
@@ -158,7 +199,7 @@ impl KvStore for EtcdStore {
match op_res {
TxnOpResponse::Put(put_res) => {
if let Some(prev_kv) = put_res.prev_key() {
prev_kvs.push(KvPair::to_kv(prev_kv));
prev_kvs.push(KvPair::from_etcd_kv(prev_kv));
}
}
_ => unreachable!(), // never get here
@@ -176,6 +217,15 @@ impl KvStore for EtcdStore {
options,
} = req.try_into()?;
let _timer = timer!(
METRIC_META_KV_REQUEST,
&[
("target", "etcd"),
("op", "batch_delete"),
("cluster_id", &cluster_id.to_string())
]
);
let mut prev_kvs = Vec::with_capacity(keys.len());
let delete_ops = keys
@@ -195,7 +245,7 @@ impl KvStore for EtcdStore {
match op_res {
TxnOpResponse::Delete(delete_res) => {
delete_res.prev_kvs().iter().for_each(|kv| {
prev_kvs.push(KvPair::to_kv(kv));
prev_kvs.push(KvPair::from_etcd_kv(kv));
});
}
_ => unreachable!(), // never get here
@@ -216,6 +266,15 @@ impl KvStore for EtcdStore {
put_options,
} = req.try_into()?;
let _timer = timer!(
METRIC_META_KV_REQUEST,
&[
("target", "etcd"),
("op", "compare_and_put"),
("cluster_id", &cluster_id.to_string())
]
);
let compare = if expect.is_empty() {
// create if absent
// revision 0 means key was not exist
@@ -247,8 +306,8 @@ impl KvStore for EtcdStore {
})?;
let prev_kv = match op_res {
TxnOpResponse::Put(res) => res.prev_key().map(KvPair::to_kv),
TxnOpResponse::Get(res) => res.kvs().first().map(KvPair::to_kv),
TxnOpResponse::Put(res) => res.prev_key().map(KvPair::from_etcd_kv),
TxnOpResponse::Get(res) => res.kvs().first().map(KvPair::from_etcd_kv),
_ => unreachable!(), // never get here
};
@@ -267,6 +326,15 @@ impl KvStore for EtcdStore {
options,
} = req.try_into()?;
let _timer = timer!(
METRIC_META_KV_REQUEST,
&[
("target", "etcd"),
("op", "delete_range"),
("cluster_id", &cluster_id.to_string())
]
);
let res = self
.client
.kv_client()
@@ -274,7 +342,11 @@ impl KvStore for EtcdStore {
.await
.context(error::EtcdFailedSnafu)?;
let prev_kvs = res.prev_kvs().iter().map(KvPair::to_kv).collect::<Vec<_>>();
let prev_kvs = res
.prev_kvs()
.iter()
.map(KvPair::from_etcd_kv)
.collect::<Vec<_>>();
let header = Some(ResponseHeader::success(cluster_id));
Ok(DeleteRangeResponse {
@@ -292,6 +364,15 @@ impl KvStore for EtcdStore {
delete_options,
} = req.try_into()?;
let _timer = timer!(
METRIC_META_KV_REQUEST,
&[
("target", "etcd"),
("op", "move_value"),
("cluster_id", &cluster_id.to_string())
]
);
let mut client = self.client.kv_client();
let header = Some(ResponseHeader::success(cluster_id));
@@ -341,13 +422,13 @@ impl KvStore for EtcdStore {
TxnOpResponse::Get(res) => {
return Ok(MoveValueResponse {
header,
kv: res.kvs().first().map(KvPair::to_kv),
kv: res.kvs().first().map(KvPair::from_etcd_kv),
});
}
TxnOpResponse::Delete(res) => {
return Ok(MoveValueResponse {
header,
kv: res.prev_kvs().first().map(KvPair::to_kv),
kv: res.prev_kvs().first().map(KvPair::from_etcd_kv),
});
}
_ => {}
@@ -613,7 +694,7 @@ impl<'a> KvPair<'a> {
}
#[inline]
fn to_kv(kv: &etcd_client::KeyValue) -> KeyValue {
fn from_etcd_kv(kv: &etcd_client::KeyValue) -> KeyValue {
KeyValue::from(KvPair::new(kv))
}
}

View File

@@ -22,10 +22,12 @@ use api::v1::meta::{
DeleteRangeResponse, KeyValue, MoveValueRequest, MoveValueResponse, PutRequest, PutResponse,
RangeRequest, RangeResponse, ResponseHeader,
};
use common_telemetry::timer;
use parking_lot::RwLock;
use super::ext::KvStoreExt;
use crate::error::Result;
use crate::metrics::METRIC_META_KV_REQUEST;
use crate::service::store::kv::{KvStore, ResettableKvStore};
pub struct MemStore {
@@ -55,6 +57,11 @@ impl ResettableKvStore for MemStore {
#[async_trait::async_trait]
impl KvStore for MemStore {
async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
let _timer = timer!(
METRIC_META_KV_REQUEST,
&[("target", "memory"), ("op", "range"),]
);
let RangeRequest {
header,
key,
@@ -99,6 +106,11 @@ impl KvStore for MemStore {
}
async fn put(&self, req: PutRequest) -> Result<PutResponse> {
let _timer = timer!(
METRIC_META_KV_REQUEST,
&[("target", "memory"), ("op", "put"),]
);
let PutRequest {
header,
key,
@@ -120,7 +132,12 @@ impl KvStore for MemStore {
}
async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
let keys = req.keys;
let _timer = timer!(
METRIC_META_KV_REQUEST,
&[("target", "memory"), ("op", "batch_get"),]
);
let BatchGetRequest { header, keys } = req;
let mut kvs = Vec::with_capacity(keys.len());
for key in keys {
@@ -129,13 +146,17 @@ impl KvStore for MemStore {
}
}
Ok(BatchGetResponse {
kvs,
..Default::default()
})
let cluster_id = header.map_or(0, |h| h.cluster_id);
let header = Some(ResponseHeader::success(cluster_id));
Ok(BatchGetResponse { header, kvs })
}
async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
let _timer = timer!(
METRIC_META_KV_REQUEST,
&[("target", "memory"), ("op", "batch_put"),]
);
let BatchPutRequest {
header,
kvs,
@@ -165,6 +186,11 @@ impl KvStore for MemStore {
}
async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
let _timer = timer!(
METRIC_META_KV_REQUEST,
&[("target", "memory"), ("op", "batch_delete"),]
);
let BatchDeleteRequest {
header,
keys,
@@ -188,6 +214,11 @@ impl KvStore for MemStore {
}
async fn compare_and_put(&self, req: CompareAndPutRequest) -> Result<CompareAndPutResponse> {
let _timer = timer!(
METRIC_META_KV_REQUEST,
&[("target", "memory"), ("op", "compare_and_put"),]
);
let CompareAndPutRequest {
header,
key,
@@ -228,6 +259,11 @@ impl KvStore for MemStore {
}
async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
let _timer = timer!(
METRIC_META_KV_REQUEST,
&[("target", "memory"), ("op", "deleteL_range"),]
);
let DeleteRangeRequest {
header,
key,
@@ -265,6 +301,11 @@ impl KvStore for MemStore {
}
async fn move_value(&self, req: MoveValueRequest) -> Result<MoveValueResponse> {
let _timer = timer!(
METRIC_META_KV_REQUEST,
&[("target", "memory"), ("op", "move_value"),]
);
let MoveValueRequest {
header,
from_key,