chore: add metrics for rds kv backend (#6201)

* chore: add metrics for rds kv backend

* chore: make clippy happy

* chore: remove useless rds timer

* chore: remove  in record_rds_sql_execute_elapsed macro

* chore: change some str literal to constant

* chore: fix import issue

* chore: remove impl Display for RangeTemplateType
This commit is contained in:
localhost
2025-05-28 21:11:11 +08:00
committed by GitHub
parent aca7ad82b1
commit dc3591655e
5 changed files with 122 additions and 16 deletions

View File

@@ -33,6 +33,12 @@ use crate::rpc::store::{
};
use crate::rpc::KeyValue;
const RDS_STORE_OP_BATCH_GET: &str = "batch_get";
const RDS_STORE_OP_BATCH_PUT: &str = "batch_put";
const RDS_STORE_OP_RANGE_QUERY: &str = "range_query";
const RDS_STORE_OP_RANGE_DELETE: &str = "range_delete";
const RDS_STORE_OP_BATCH_DELETE: &str = "batch_delete";
#[cfg(feature = "pg_kvbackend")]
mod postgres;
#[cfg(feature = "pg_kvbackend")]
@@ -560,3 +566,21 @@ fn check_txn_ops(txn_ops: &[TxnOp]) -> Result<bool> {
});
Ok(same)
}
#[macro_export]
macro_rules! record_rds_sql_execute_elapsed {
($result:expr, $label_store:expr,$label_op:expr,$label_type:expr) => {{
let timer = std::time::Instant::now();
$result
.inspect(|_| {
$crate::metrics::RDS_SQL_EXECUTE_ELAPSED
.with_label_values(&[$label_store, "success", $label_op, $label_type])
.observe(timer.elapsed().as_millis_f64())
})
.inspect_err(|_| {
$crate::metrics::RDS_SQL_EXECUTE_ELAPSED
.with_label_values(&[$label_store, "error", $label_op, $label_type])
.observe(timer.elapsed().as_millis_f64());
})
}};
}

View File

@@ -20,11 +20,13 @@ use snafu::ResultExt;
use sqlx::mysql::MySqlRow;
use sqlx::pool::Pool;
use sqlx::{MySql, MySqlPool, Row, Transaction as MySqlTransaction};
use strum::AsRefStr;
use crate::error::{CreateMySqlPoolSnafu, MySqlExecutionSnafu, MySqlTransactionSnafu, Result};
use crate::kv_backend::rds::{
Executor, ExecutorFactory, ExecutorImpl, KvQueryExecutor, RdsStore, Transaction,
RDS_STORE_TXN_RETRY_COUNT,
RDS_STORE_OP_BATCH_DELETE, RDS_STORE_OP_BATCH_GET, RDS_STORE_OP_BATCH_PUT,
RDS_STORE_OP_RANGE_DELETE, RDS_STORE_OP_RANGE_QUERY, RDS_STORE_TXN_RETRY_COUNT,
};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::{
@@ -33,6 +35,8 @@ use crate::rpc::store::{
};
use crate::rpc::KeyValue;
const MYSQL_STORE_NAME: &str = "mysql_store";
type MySqlClient = Arc<Pool<MySql>>;
pub struct MySqlTxnClient(MySqlTransaction<'static, MySql>);
@@ -47,7 +51,7 @@ fn key_value_from_row(row: MySqlRow) -> KeyValue {
const EMPTY: &[u8] = &[0];
/// Type of range template.
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, AsRefStr)]
enum RangeTemplateType {
Point,
Range,
@@ -58,6 +62,8 @@ enum RangeTemplateType {
/// Builds params for the given range template type.
impl RangeTemplateType {
/// Builds the parameters for the given range template type.
/// You can check out the conventions at [RangeRequest]
fn build_params(&self, mut key: Vec<u8>, range_end: Vec<u8>) -> Vec<Vec<u8>> {
match self {
RangeTemplateType::Point => vec![key],
@@ -343,7 +349,12 @@ impl KvQueryExecutor<MySqlClient> for MySqlStore {
RangeTemplate::with_limit(template, if req.limit == 0 { 0 } else { req.limit + 1 });
let limit = req.limit as usize;
debug!("query: {:?}, params: {:?}", query, params);
let mut kvs = query_executor.query(&query, &params_ref).await?;
let mut kvs = crate::record_rds_sql_execute_elapsed!(
query_executor.query(&query, &params_ref).await,
MYSQL_STORE_NAME,
RDS_STORE_OP_RANGE_QUERY,
template_type.as_ref()
)?;
if req.keys_only {
kvs.iter_mut().for_each(|kv| kv.value = vec![]);
}
@@ -381,7 +392,12 @@ impl KvQueryExecutor<MySqlClient> for MySqlStore {
// Fast path: if we don't need previous kvs, we can just upsert the keys.
if !req.prev_kv {
query_executor.execute(&update, &values_params).await?;
crate::record_rds_sql_execute_elapsed!(
query_executor.execute(&update, &values_params).await,
MYSQL_STORE_NAME,
RDS_STORE_OP_BATCH_PUT,
""
)?;
return Ok(BatchPutResponse::default());
}
// Should use transaction to ensure atomicity.
@@ -392,7 +408,12 @@ impl KvQueryExecutor<MySqlClient> for MySqlStore {
txn.commit().await?;
return res;
}
let prev_kvs = query_executor.query(&select, &in_params).await?;
let prev_kvs = crate::record_rds_sql_execute_elapsed!(
query_executor.query(&select, &in_params).await,
MYSQL_STORE_NAME,
RDS_STORE_OP_BATCH_PUT,
""
)?;
query_executor.execute(&update, &values_params).await?;
Ok(BatchPutResponse { prev_kvs })
}
@@ -409,7 +430,12 @@ impl KvQueryExecutor<MySqlClient> for MySqlStore {
.sql_template_set
.generate_batch_get_query(req.keys.len());
let params = req.keys.iter().map(|x| x as _).collect::<Vec<_>>();
let kvs = query_executor.query(&query, &params).await?;
let kvs = crate::record_rds_sql_execute_elapsed!(
query_executor.query(&query, &params).await,
MYSQL_STORE_NAME,
RDS_STORE_OP_BATCH_GET,
""
)?;
Ok(BatchGetResponse { kvs })
}
@@ -441,7 +467,12 @@ impl KvQueryExecutor<MySqlClient> for MySqlStore {
let template = self.sql_template_set.delete_template.get(template_type);
let params = template_type.build_params(req.key, req.range_end);
let params_ref = params.iter().map(|x| x as _).collect::<Vec<_>>();
query_executor.execute(template, &params_ref).await?;
crate::record_rds_sql_execute_elapsed!(
query_executor.execute(template, &params_ref).await,
MYSQL_STORE_NAME,
RDS_STORE_OP_RANGE_DELETE,
template_type.as_ref()
)?;
let mut resp = DeleteRangeResponse::new(prev_kvs.len() as i64);
if req.prev_kv {
resp.with_prev_kvs(prev_kvs);
@@ -463,7 +494,12 @@ impl KvQueryExecutor<MySqlClient> for MySqlStore {
let params = req.keys.iter().map(|x| x as _).collect::<Vec<_>>();
// Fast path: if we don't need previous kvs, we can just delete the keys.
if !req.prev_kv {
query_executor.execute(&query, &params).await?;
crate::record_rds_sql_execute_elapsed!(
query_executor.execute(&query, &params).await,
MYSQL_STORE_NAME,
RDS_STORE_OP_BATCH_DELETE,
""
)?;
return Ok(BatchDeleteResponse::default());
}
// Should use transaction to ensure atomicity.
@@ -483,7 +519,12 @@ impl KvQueryExecutor<MySqlClient> for MySqlStore {
.await?
.kvs;
// Pure `DELETE` has no return value, so we need to use `execute` instead of `query`.
query_executor.execute(&query, &params).await?;
crate::record_rds_sql_execute_elapsed!(
query_executor.execute(&query, &params).await,
MYSQL_STORE_NAME,
RDS_STORE_OP_BATCH_DELETE,
""
)?;
if req.prev_kv {
Ok(BatchDeleteResponse { prev_kvs })
} else {

View File

@@ -18,6 +18,7 @@ use std::sync::Arc;
use common_telemetry::debug;
use deadpool_postgres::{Config, Pool, Runtime};
use snafu::ResultExt;
use strum::AsRefStr;
use tokio_postgres::types::ToSql;
use tokio_postgres::{IsolationLevel, NoTls, Row};
@@ -27,7 +28,8 @@ use crate::error::{
};
use crate::kv_backend::rds::{
Executor, ExecutorFactory, ExecutorImpl, KvQueryExecutor, RdsStore, Transaction,
RDS_STORE_TXN_RETRY_COUNT,
RDS_STORE_OP_BATCH_DELETE, RDS_STORE_OP_BATCH_GET, RDS_STORE_OP_BATCH_PUT,
RDS_STORE_OP_RANGE_DELETE, RDS_STORE_OP_RANGE_QUERY, RDS_STORE_TXN_RETRY_COUNT,
};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::{
@@ -36,6 +38,8 @@ use crate::rpc::store::{
};
use crate::rpc::KeyValue;
const PG_STORE_NAME: &str = "pg_store";
pub struct PgClient(deadpool::managed::Object<deadpool_postgres::Manager>);
pub struct PgTxnClient<'a>(deadpool_postgres::Transaction<'a>);
@@ -50,7 +54,7 @@ fn key_value_from_row(r: Row) -> KeyValue {
const EMPTY: &[u8] = &[0];
/// Type of range template.
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, AsRefStr)]
enum RangeTemplateType {
Point,
Range,
@@ -61,6 +65,8 @@ enum RangeTemplateType {
/// Builds params for the given range template type.
impl RangeTemplateType {
/// Builds the parameters for the given range template type.
/// You can check out the conventions at [RangeRequest]
fn build_params(&self, mut key: Vec<u8>, range_end: Vec<u8>) -> Vec<Vec<u8>> {
match self {
RangeTemplateType::Point => vec![key],
@@ -358,7 +364,13 @@ impl KvQueryExecutor<PgClient> for PgStore {
RangeTemplate::with_limit(template, if req.limit == 0 { 0 } else { req.limit + 1 });
let limit = req.limit as usize;
debug!("query: {:?}, params: {:?}", query, params);
let mut kvs = query_executor.query(&query, &params_ref).await?;
let mut kvs = crate::record_rds_sql_execute_elapsed!(
query_executor.query(&query, &params_ref).await,
PG_STORE_NAME,
RDS_STORE_OP_RANGE_QUERY,
template_type.as_ref()
)?;
if req.keys_only {
kvs.iter_mut().for_each(|kv| kv.value = vec![]);
}
@@ -393,7 +405,13 @@ impl KvQueryExecutor<PgClient> for PgStore {
let query = self
.sql_template_set
.generate_batch_upsert_query(req.kvs.len());
let kvs = query_executor.query(&query, &params).await?;
let kvs = crate::record_rds_sql_execute_elapsed!(
query_executor.query(&query, &params).await,
PG_STORE_NAME,
RDS_STORE_OP_BATCH_PUT,
""
)?;
if req.prev_kv {
Ok(BatchPutResponse { prev_kvs: kvs })
} else {
@@ -414,7 +432,12 @@ impl KvQueryExecutor<PgClient> for PgStore {
.sql_template_set
.generate_batch_get_query(req.keys.len());
let params = req.keys.iter().map(|x| x as _).collect::<Vec<_>>();
let kvs = query_executor.query(&query, &params).await?;
let kvs = crate::record_rds_sql_execute_elapsed!(
query_executor.query(&query, &params).await,
PG_STORE_NAME,
RDS_STORE_OP_BATCH_GET,
""
)?;
Ok(BatchGetResponse { kvs })
}
@@ -427,7 +450,12 @@ impl KvQueryExecutor<PgClient> for PgStore {
let template = self.sql_template_set.delete_template.get(template_type);
let params = template_type.build_params(req.key, req.range_end);
let params_ref = params.iter().map(|x| x as _).collect::<Vec<_>>();
let kvs = query_executor.query(template, &params_ref).await?;
let kvs = crate::record_rds_sql_execute_elapsed!(
query_executor.query(template, &params_ref).await,
PG_STORE_NAME,
RDS_STORE_OP_RANGE_DELETE,
template_type.as_ref()
)?;
let mut resp = DeleteRangeResponse::new(kvs.len() as i64);
if req.prev_kv {
resp.with_prev_kvs(kvs);
@@ -447,7 +475,13 @@ impl KvQueryExecutor<PgClient> for PgStore {
.sql_template_set
.generate_batch_delete_query(req.keys.len());
let params = req.keys.iter().map(|x| x as _).collect::<Vec<_>>();
let kvs = query_executor.query(&query, &params).await?;
let kvs = crate::record_rds_sql_execute_elapsed!(
query_executor.query(&query, &params).await,
PG_STORE_NAME,
RDS_STORE_OP_BATCH_DELETE,
""
)?;
if req.prev_kv {
Ok(BatchDeleteResponse { prev_kvs: kvs })
} else {

View File

@@ -15,6 +15,7 @@
#![feature(assert_matches)]
#![feature(btree_extract_if)]
#![feature(let_chains)]
#![feature(duration_millis_float)]
pub mod cache;
pub mod cache_invalidator;

View File

@@ -108,4 +108,10 @@ lazy_static! {
&["name"]
)
.unwrap();
pub static ref RDS_SQL_EXECUTE_ELAPSED: HistogramVec = register_histogram_vec!(
"greptime_meta_rds_pg_sql_execute_elapsed_ms",
"rds pg sql execute elapsed",
&["backend", "result", "op", "type"]
)
.unwrap();
}