From dc3591655e12a412c23dae54ea8f3511857087ca Mon Sep 17 00:00:00 2001 From: localhost Date: Wed, 28 May 2025 21:11:11 +0800 Subject: [PATCH] chore: add metrics for rds kv backend (#6201) * chore: add metrics for rds kv backend * chore: make clippy happy * chore: remove useless rds timer * chore: remove in record_rds_sql_execute_elapsed macro * chore: change some str literal to constant * chore: fix import issue * chore: remove impl Display for RangeTemplateType --- src/common/meta/src/kv_backend/rds.rs | 24 ++++++++ src/common/meta/src/kv_backend/rds/mysql.rs | 59 ++++++++++++++++--- .../meta/src/kv_backend/rds/postgres.rs | 48 ++++++++++++--- src/common/meta/src/lib.rs | 1 + src/common/meta/src/metrics.rs | 6 ++ 5 files changed, 122 insertions(+), 16 deletions(-) diff --git a/src/common/meta/src/kv_backend/rds.rs b/src/common/meta/src/kv_backend/rds.rs index dbd28c5d78..8e57964051 100644 --- a/src/common/meta/src/kv_backend/rds.rs +++ b/src/common/meta/src/kv_backend/rds.rs @@ -33,6 +33,12 @@ use crate::rpc::store::{ }; use crate::rpc::KeyValue; +const RDS_STORE_OP_BATCH_GET: &str = "batch_get"; +const RDS_STORE_OP_BATCH_PUT: &str = "batch_put"; +const RDS_STORE_OP_RANGE_QUERY: &str = "range_query"; +const RDS_STORE_OP_RANGE_DELETE: &str = "range_delete"; +const RDS_STORE_OP_BATCH_DELETE: &str = "batch_delete"; + #[cfg(feature = "pg_kvbackend")] mod postgres; #[cfg(feature = "pg_kvbackend")] @@ -560,3 +566,21 @@ fn check_txn_ops(txn_ops: &[TxnOp]) -> Result { }); Ok(same) } + +#[macro_export] +macro_rules! record_rds_sql_execute_elapsed { + ($result:expr, $label_store:expr,$label_op:expr,$label_type:expr) => {{ + let timer = std::time::Instant::now(); + $result + .inspect(|_| { + $crate::metrics::RDS_SQL_EXECUTE_ELAPSED + .with_label_values(&[$label_store, "success", $label_op, $label_type]) + .observe(timer.elapsed().as_millis_f64()) + }) + .inspect_err(|_| { + $crate::metrics::RDS_SQL_EXECUTE_ELAPSED + .with_label_values(&[$label_store, "error", $label_op, $label_type]) + .observe(timer.elapsed().as_millis_f64()); + }) + }}; +} diff --git a/src/common/meta/src/kv_backend/rds/mysql.rs b/src/common/meta/src/kv_backend/rds/mysql.rs index b6281a5490..c85c1278f7 100644 --- a/src/common/meta/src/kv_backend/rds/mysql.rs +++ b/src/common/meta/src/kv_backend/rds/mysql.rs @@ -20,11 +20,13 @@ use snafu::ResultExt; use sqlx::mysql::MySqlRow; use sqlx::pool::Pool; use sqlx::{MySql, MySqlPool, Row, Transaction as MySqlTransaction}; +use strum::AsRefStr; use crate::error::{CreateMySqlPoolSnafu, MySqlExecutionSnafu, MySqlTransactionSnafu, Result}; use crate::kv_backend::rds::{ Executor, ExecutorFactory, ExecutorImpl, KvQueryExecutor, RdsStore, Transaction, - RDS_STORE_TXN_RETRY_COUNT, + RDS_STORE_OP_BATCH_DELETE, RDS_STORE_OP_BATCH_GET, RDS_STORE_OP_BATCH_PUT, + RDS_STORE_OP_RANGE_DELETE, RDS_STORE_OP_RANGE_QUERY, RDS_STORE_TXN_RETRY_COUNT, }; use crate::kv_backend::KvBackendRef; use crate::rpc::store::{ @@ -33,6 +35,8 @@ use crate::rpc::store::{ }; use crate::rpc::KeyValue; +const MYSQL_STORE_NAME: &str = "mysql_store"; + type MySqlClient = Arc>; pub struct MySqlTxnClient(MySqlTransaction<'static, MySql>); @@ -47,7 +51,7 @@ fn key_value_from_row(row: MySqlRow) -> KeyValue { const EMPTY: &[u8] = &[0]; /// Type of range template. -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, AsRefStr)] enum RangeTemplateType { Point, Range, @@ -58,6 +62,8 @@ enum RangeTemplateType { /// Builds params for the given range template type. impl RangeTemplateType { + /// Builds the parameters for the given range template type. + /// You can check out the conventions at [RangeRequest] fn build_params(&self, mut key: Vec, range_end: Vec) -> Vec> { match self { RangeTemplateType::Point => vec![key], @@ -343,7 +349,12 @@ impl KvQueryExecutor for MySqlStore { RangeTemplate::with_limit(template, if req.limit == 0 { 0 } else { req.limit + 1 }); let limit = req.limit as usize; debug!("query: {:?}, params: {:?}", query, params); - let mut kvs = query_executor.query(&query, ¶ms_ref).await?; + let mut kvs = crate::record_rds_sql_execute_elapsed!( + query_executor.query(&query, ¶ms_ref).await, + MYSQL_STORE_NAME, + RDS_STORE_OP_RANGE_QUERY, + template_type.as_ref() + )?; if req.keys_only { kvs.iter_mut().for_each(|kv| kv.value = vec![]); } @@ -381,7 +392,12 @@ impl KvQueryExecutor for MySqlStore { // Fast path: if we don't need previous kvs, we can just upsert the keys. if !req.prev_kv { - query_executor.execute(&update, &values_params).await?; + crate::record_rds_sql_execute_elapsed!( + query_executor.execute(&update, &values_params).await, + MYSQL_STORE_NAME, + RDS_STORE_OP_BATCH_PUT, + "" + )?; return Ok(BatchPutResponse::default()); } // Should use transaction to ensure atomicity. @@ -392,7 +408,12 @@ impl KvQueryExecutor for MySqlStore { txn.commit().await?; return res; } - let prev_kvs = query_executor.query(&select, &in_params).await?; + let prev_kvs = crate::record_rds_sql_execute_elapsed!( + query_executor.query(&select, &in_params).await, + MYSQL_STORE_NAME, + RDS_STORE_OP_BATCH_PUT, + "" + )?; query_executor.execute(&update, &values_params).await?; Ok(BatchPutResponse { prev_kvs }) } @@ -409,7 +430,12 @@ impl KvQueryExecutor for MySqlStore { .sql_template_set .generate_batch_get_query(req.keys.len()); let params = req.keys.iter().map(|x| x as _).collect::>(); - let kvs = query_executor.query(&query, ¶ms).await?; + let kvs = crate::record_rds_sql_execute_elapsed!( + query_executor.query(&query, ¶ms).await, + MYSQL_STORE_NAME, + RDS_STORE_OP_BATCH_GET, + "" + )?; Ok(BatchGetResponse { kvs }) } @@ -441,7 +467,12 @@ impl KvQueryExecutor for MySqlStore { let template = self.sql_template_set.delete_template.get(template_type); let params = template_type.build_params(req.key, req.range_end); let params_ref = params.iter().map(|x| x as _).collect::>(); - query_executor.execute(template, ¶ms_ref).await?; + crate::record_rds_sql_execute_elapsed!( + query_executor.execute(template, ¶ms_ref).await, + MYSQL_STORE_NAME, + RDS_STORE_OP_RANGE_DELETE, + template_type.as_ref() + )?; let mut resp = DeleteRangeResponse::new(prev_kvs.len() as i64); if req.prev_kv { resp.with_prev_kvs(prev_kvs); @@ -463,7 +494,12 @@ impl KvQueryExecutor for MySqlStore { let params = req.keys.iter().map(|x| x as _).collect::>(); // Fast path: if we don't need previous kvs, we can just delete the keys. if !req.prev_kv { - query_executor.execute(&query, ¶ms).await?; + crate::record_rds_sql_execute_elapsed!( + query_executor.execute(&query, ¶ms).await, + MYSQL_STORE_NAME, + RDS_STORE_OP_BATCH_DELETE, + "" + )?; return Ok(BatchDeleteResponse::default()); } // Should use transaction to ensure atomicity. @@ -483,7 +519,12 @@ impl KvQueryExecutor for MySqlStore { .await? .kvs; // Pure `DELETE` has no return value, so we need to use `execute` instead of `query`. - query_executor.execute(&query, ¶ms).await?; + crate::record_rds_sql_execute_elapsed!( + query_executor.execute(&query, ¶ms).await, + MYSQL_STORE_NAME, + RDS_STORE_OP_BATCH_DELETE, + "" + )?; if req.prev_kv { Ok(BatchDeleteResponse { prev_kvs }) } else { diff --git a/src/common/meta/src/kv_backend/rds/postgres.rs b/src/common/meta/src/kv_backend/rds/postgres.rs index 349e233e63..0b96882694 100644 --- a/src/common/meta/src/kv_backend/rds/postgres.rs +++ b/src/common/meta/src/kv_backend/rds/postgres.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use common_telemetry::debug; use deadpool_postgres::{Config, Pool, Runtime}; use snafu::ResultExt; +use strum::AsRefStr; use tokio_postgres::types::ToSql; use tokio_postgres::{IsolationLevel, NoTls, Row}; @@ -27,7 +28,8 @@ use crate::error::{ }; use crate::kv_backend::rds::{ Executor, ExecutorFactory, ExecutorImpl, KvQueryExecutor, RdsStore, Transaction, - RDS_STORE_TXN_RETRY_COUNT, + RDS_STORE_OP_BATCH_DELETE, RDS_STORE_OP_BATCH_GET, RDS_STORE_OP_BATCH_PUT, + RDS_STORE_OP_RANGE_DELETE, RDS_STORE_OP_RANGE_QUERY, RDS_STORE_TXN_RETRY_COUNT, }; use crate::kv_backend::KvBackendRef; use crate::rpc::store::{ @@ -36,6 +38,8 @@ use crate::rpc::store::{ }; use crate::rpc::KeyValue; +const PG_STORE_NAME: &str = "pg_store"; + pub struct PgClient(deadpool::managed::Object); pub struct PgTxnClient<'a>(deadpool_postgres::Transaction<'a>); @@ -50,7 +54,7 @@ fn key_value_from_row(r: Row) -> KeyValue { const EMPTY: &[u8] = &[0]; /// Type of range template. -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, AsRefStr)] enum RangeTemplateType { Point, Range, @@ -61,6 +65,8 @@ enum RangeTemplateType { /// Builds params for the given range template type. impl RangeTemplateType { + /// Builds the parameters for the given range template type. + /// You can check out the conventions at [RangeRequest] fn build_params(&self, mut key: Vec, range_end: Vec) -> Vec> { match self { RangeTemplateType::Point => vec![key], @@ -358,7 +364,13 @@ impl KvQueryExecutor for PgStore { RangeTemplate::with_limit(template, if req.limit == 0 { 0 } else { req.limit + 1 }); let limit = req.limit as usize; debug!("query: {:?}, params: {:?}", query, params); - let mut kvs = query_executor.query(&query, ¶ms_ref).await?; + let mut kvs = crate::record_rds_sql_execute_elapsed!( + query_executor.query(&query, ¶ms_ref).await, + PG_STORE_NAME, + RDS_STORE_OP_RANGE_QUERY, + template_type.as_ref() + )?; + if req.keys_only { kvs.iter_mut().for_each(|kv| kv.value = vec![]); } @@ -393,7 +405,13 @@ impl KvQueryExecutor for PgStore { let query = self .sql_template_set .generate_batch_upsert_query(req.kvs.len()); - let kvs = query_executor.query(&query, ¶ms).await?; + + let kvs = crate::record_rds_sql_execute_elapsed!( + query_executor.query(&query, ¶ms).await, + PG_STORE_NAME, + RDS_STORE_OP_BATCH_PUT, + "" + )?; if req.prev_kv { Ok(BatchPutResponse { prev_kvs: kvs }) } else { @@ -414,7 +432,12 @@ impl KvQueryExecutor for PgStore { .sql_template_set .generate_batch_get_query(req.keys.len()); let params = req.keys.iter().map(|x| x as _).collect::>(); - let kvs = query_executor.query(&query, ¶ms).await?; + let kvs = crate::record_rds_sql_execute_elapsed!( + query_executor.query(&query, ¶ms).await, + PG_STORE_NAME, + RDS_STORE_OP_BATCH_GET, + "" + )?; Ok(BatchGetResponse { kvs }) } @@ -427,7 +450,12 @@ impl KvQueryExecutor for PgStore { let template = self.sql_template_set.delete_template.get(template_type); let params = template_type.build_params(req.key, req.range_end); let params_ref = params.iter().map(|x| x as _).collect::>(); - let kvs = query_executor.query(template, ¶ms_ref).await?; + let kvs = crate::record_rds_sql_execute_elapsed!( + query_executor.query(template, ¶ms_ref).await, + PG_STORE_NAME, + RDS_STORE_OP_RANGE_DELETE, + template_type.as_ref() + )?; let mut resp = DeleteRangeResponse::new(kvs.len() as i64); if req.prev_kv { resp.with_prev_kvs(kvs); @@ -447,7 +475,13 @@ impl KvQueryExecutor for PgStore { .sql_template_set .generate_batch_delete_query(req.keys.len()); let params = req.keys.iter().map(|x| x as _).collect::>(); - let kvs = query_executor.query(&query, ¶ms).await?; + + let kvs = crate::record_rds_sql_execute_elapsed!( + query_executor.query(&query, ¶ms).await, + PG_STORE_NAME, + RDS_STORE_OP_BATCH_DELETE, + "" + )?; if req.prev_kv { Ok(BatchDeleteResponse { prev_kvs: kvs }) } else { diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs index 8cdcf35c9d..50d3cdc8d3 100644 --- a/src/common/meta/src/lib.rs +++ b/src/common/meta/src/lib.rs @@ -15,6 +15,7 @@ #![feature(assert_matches)] #![feature(btree_extract_if)] #![feature(let_chains)] +#![feature(duration_millis_float)] pub mod cache; pub mod cache_invalidator; diff --git a/src/common/meta/src/metrics.rs b/src/common/meta/src/metrics.rs index 4626074f73..2df82c8aba 100644 --- a/src/common/meta/src/metrics.rs +++ b/src/common/meta/src/metrics.rs @@ -108,4 +108,10 @@ lazy_static! { &["name"] ) .unwrap(); + pub static ref RDS_SQL_EXECUTE_ELAPSED: HistogramVec = register_histogram_vec!( + "greptime_meta_rds_pg_sql_execute_elapsed_ms", + "rds pg sql execute elapsed", + &["backend", "result", "op", "type"] + ) + .unwrap(); }