From 63ee7c0831f79c747e7c985eb35cf6edb66d91b6 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Mon, 12 Jan 2026 15:03:55 +0800 Subject: [PATCH] fix: fix SQL table identifier quoting for election (#7541) (#7556) fix: fix SQL table identifier quoting for election and RDS kv-backend - Quote MySQL table names with backticks and PostgreSQL tables with double quotes in election and RDS kv-backend SQL - Update related tests to use quoted identifiers and cover hyphenated table names - Ensure dynamic SQL using table names is safe for special characters in identifiers Signed-off-by: WenyXu Signed-off-by: evenyag Co-authored-by: Weny Xu --- src/common/meta/src/kv_backend/rds/mysql.rs | 20 +++++++------- .../meta/src/kv_backend/rds/postgres.rs | 20 +++++++------- src/meta-srv/src/election/rds/mysql.rs | 26 +++++++++---------- 3 files changed, 33 insertions(+), 33 deletions(-) diff --git a/src/common/meta/src/kv_backend/rds/mysql.rs b/src/common/meta/src/kv_backend/rds/mysql.rs index cb54d2cc7a..ecfc99cd45 100644 --- a/src/common/meta/src/kv_backend/rds/mysql.rs +++ b/src/common/meta/src/kv_backend/rds/mysql.rs @@ -611,7 +611,7 @@ mod tests { #[tokio::test] async fn test_mysql_put() { maybe_skip_mysql_integration_test!(); - let kv_backend = build_mysql_kv_backend("put_test").await.unwrap(); + let kv_backend = build_mysql_kv_backend("put-test").await.unwrap(); let prefix = b"put/"; prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; test_kv_put_with_prefix(&kv_backend, prefix.to_vec()).await; @@ -621,7 +621,7 @@ mod tests { #[tokio::test] async fn test_mysql_range() { maybe_skip_mysql_integration_test!(); - let kv_backend = build_mysql_kv_backend("range_test").await.unwrap(); + let kv_backend = build_mysql_kv_backend("range-test").await.unwrap(); let prefix = b"range/"; prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; test_kv_range_with_prefix(&kv_backend, prefix.to_vec()).await; @@ -631,7 +631,7 @@ mod tests { #[tokio::test] async fn test_mysql_range_2() { maybe_skip_mysql_integration_test!(); - let kv_backend = build_mysql_kv_backend("range2_test").await.unwrap(); + let kv_backend = build_mysql_kv_backend("range2-test").await.unwrap(); let prefix = b"range2/"; test_kv_range_2_with_prefix(&kv_backend, prefix.to_vec()).await; unprepare_kv(&kv_backend, prefix).await; @@ -640,7 +640,7 @@ mod tests { #[tokio::test] async fn test_mysql_all_range() { maybe_skip_mysql_integration_test!(); - let kv_backend = build_mysql_kv_backend("simple_range_test").await.unwrap(); + let kv_backend = build_mysql_kv_backend("simple_range-test").await.unwrap(); let prefix = b""; prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; test_simple_kv_range(&kv_backend).await; @@ -650,7 +650,7 @@ mod tests { #[tokio::test] async fn test_mysql_batch_get() { maybe_skip_mysql_integration_test!(); - let kv_backend = build_mysql_kv_backend("batch_get_test").await.unwrap(); + let kv_backend = build_mysql_kv_backend("batch_get-test").await.unwrap(); let prefix = b"batch_get/"; prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; test_kv_batch_get_with_prefix(&kv_backend, prefix.to_vec()).await; @@ -660,7 +660,7 @@ mod tests { #[tokio::test] async fn test_mysql_batch_delete() { maybe_skip_mysql_integration_test!(); - let kv_backend = build_mysql_kv_backend("batch_delete_test").await.unwrap(); + let kv_backend = build_mysql_kv_backend("batch_delete-test").await.unwrap(); let prefix = b"batch_delete/"; prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await; @@ -670,7 +670,7 @@ mod tests { #[tokio::test] async fn test_mysql_batch_delete_with_prefix() { maybe_skip_mysql_integration_test!(); - let kv_backend = build_mysql_kv_backend("batch_delete_with_prefix_test") + let kv_backend = build_mysql_kv_backend("batch_delete_with_prefix-test") .await .unwrap(); let prefix = b"batch_delete/"; @@ -682,7 +682,7 @@ mod tests { #[tokio::test] async fn test_mysql_delete_range() { maybe_skip_mysql_integration_test!(); - let kv_backend = build_mysql_kv_backend("delete_range_test").await.unwrap(); + let kv_backend = build_mysql_kv_backend("delete_range-test").await.unwrap(); let prefix = b"delete_range/"; prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await; @@ -692,7 +692,7 @@ mod tests { #[tokio::test] async fn test_mysql_compare_and_put() { maybe_skip_mysql_integration_test!(); - let kv_backend = build_mysql_kv_backend("compare_and_put_test") + let kv_backend = build_mysql_kv_backend("compare_and_put-test") .await .unwrap(); let prefix = b"compare_and_put/"; @@ -703,7 +703,7 @@ mod tests { #[tokio::test] async fn test_mysql_txn() { maybe_skip_mysql_integration_test!(); - let kv_backend = build_mysql_kv_backend("txn_test").await.unwrap(); + let kv_backend = build_mysql_kv_backend("txn-test").await.unwrap(); test_txn_one_compare_op(&kv_backend).await; text_txn_multi_compare_op(&kv_backend).await; test_txn_compare_equal(&kv_backend).await; diff --git a/src/common/meta/src/kv_backend/rds/postgres.rs b/src/common/meta/src/kv_backend/rds/postgres.rs index e0661f6f7a..bf130bbc2d 100644 --- a/src/common/meta/src/kv_backend/rds/postgres.rs +++ b/src/common/meta/src/kv_backend/rds/postgres.rs @@ -585,7 +585,7 @@ mod tests { #[tokio::test] async fn test_pg_put() { maybe_skip_postgres_integration_test!(); - let kv_backend = build_pg_kv_backend("put_test").await.unwrap(); + let kv_backend = build_pg_kv_backend("put-test").await.unwrap(); let prefix = b"put/"; prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; test_kv_put_with_prefix(&kv_backend, prefix.to_vec()).await; @@ -595,7 +595,7 @@ mod tests { #[tokio::test] async fn test_pg_range() { maybe_skip_postgres_integration_test!(); - let kv_backend = build_pg_kv_backend("range_test").await.unwrap(); + let kv_backend = build_pg_kv_backend("range-test").await.unwrap(); let prefix = b"range/"; prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; test_kv_range_with_prefix(&kv_backend, prefix.to_vec()).await; @@ -605,7 +605,7 @@ mod tests { #[tokio::test] async fn test_pg_range_2() { maybe_skip_postgres_integration_test!(); - let kv_backend = build_pg_kv_backend("range2_test").await.unwrap(); + let kv_backend = build_pg_kv_backend("range2-test").await.unwrap(); let prefix = b"range2/"; test_kv_range_2_with_prefix(&kv_backend, prefix.to_vec()).await; unprepare_kv(&kv_backend, prefix).await; @@ -614,7 +614,7 @@ mod tests { #[tokio::test] async fn test_pg_all_range() { maybe_skip_postgres_integration_test!(); - let kv_backend = build_pg_kv_backend("simple_range_test").await.unwrap(); + let kv_backend = build_pg_kv_backend("simple_range-test").await.unwrap(); let prefix = b""; prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; test_simple_kv_range(&kv_backend).await; @@ -624,7 +624,7 @@ mod tests { #[tokio::test] async fn test_pg_batch_get() { maybe_skip_postgres_integration_test!(); - let kv_backend = build_pg_kv_backend("batch_get_test").await.unwrap(); + let kv_backend = build_pg_kv_backend("batch_get-test").await.unwrap(); let prefix = b"batch_get/"; prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; test_kv_batch_get_with_prefix(&kv_backend, prefix.to_vec()).await; @@ -634,7 +634,7 @@ mod tests { #[tokio::test] async fn test_pg_batch_delete() { maybe_skip_postgres_integration_test!(); - let kv_backend = build_pg_kv_backend("batch_delete_test").await.unwrap(); + let kv_backend = build_pg_kv_backend("batch_delete-test").await.unwrap(); let prefix = b"batch_delete/"; prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await; @@ -644,7 +644,7 @@ mod tests { #[tokio::test] async fn test_pg_batch_delete_with_prefix() { maybe_skip_postgres_integration_test!(); - let kv_backend = build_pg_kv_backend("batch_delete_with_prefix_test") + let kv_backend = build_pg_kv_backend("batch_delete_with_prefix-test") .await .unwrap(); let prefix = b"batch_delete/"; @@ -656,7 +656,7 @@ mod tests { #[tokio::test] async fn test_pg_delete_range() { maybe_skip_postgres_integration_test!(); - let kv_backend = build_pg_kv_backend("delete_range_test").await.unwrap(); + let kv_backend = build_pg_kv_backend("delete_range-test").await.unwrap(); let prefix = b"delete_range/"; prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await; @@ -666,7 +666,7 @@ mod tests { #[tokio::test] async fn test_pg_compare_and_put() { maybe_skip_postgres_integration_test!(); - let kv_backend = build_pg_kv_backend("compare_and_put_test").await.unwrap(); + let kv_backend = build_pg_kv_backend("compare_and_put-test").await.unwrap(); let prefix = b"compare_and_put/"; let kv_backend = Arc::new(kv_backend); test_kv_compare_and_put_with_prefix(kv_backend.clone(), prefix.to_vec()).await; @@ -675,7 +675,7 @@ mod tests { #[tokio::test] async fn test_pg_txn() { maybe_skip_postgres_integration_test!(); - let kv_backend = build_pg_kv_backend("txn_test").await.unwrap(); + let kv_backend = build_pg_kv_backend("txn-test").await.unwrap(); test_txn_one_compare_op(&kv_backend).await; text_txn_multi_compare_op(&kv_backend).await; test_txn_compare_equal(&kv_backend).await; diff --git a/src/meta-srv/src/election/rds/mysql.rs b/src/meta-srv/src/election/rds/mysql.rs index de531adc38..e07c89f010 100644 --- a/src/meta-srv/src/election/rds/mysql.rs +++ b/src/meta-srv/src/election/rds/mysql.rs @@ -149,7 +149,7 @@ impl<'a> ElectionSqlFactory<'a> { } fn delete_value_sql(&self) -> String { - format!("DELETE FROM {} WHERE k = ?;", self.table_name) + format!("DELETE FROM `{}` WHERE k = ?;", self.table_name) } } @@ -1014,7 +1014,7 @@ mod tests { execution_timeout, Duration::from_secs(1), wait_timeout, - table_name.unwrap_or("default_greptime_metakv_election"), + table_name.unwrap_or("default_greptime_metakv-election"), ); client.maybe_init_client().await?; if table_name.is_some() { @@ -1025,7 +1025,7 @@ mod tests { async fn drop_table(client: &Mutex, table_name: &str) { let mut client = client.lock().await; - let sql = format!("DROP TABLE IF EXISTS {};", table_name); + let sql = format!("DROP TABLE IF EXISTS `{}`;", table_name); client.execute(sqlx::query(&sql), &sql).await.unwrap(); } @@ -1036,7 +1036,7 @@ mod tests { let value = "test_value".to_string(); let uuid = uuid::Uuid::new_v4().to_string(); - let table_name = "test_mysql_crud_greptime_metakv"; + let table_name = "test_mysql_crud_greptime-metakv"; let candidate_lease_ttl = Duration::from_secs(10); let meta_lease_ttl = Duration::from_secs(2); @@ -1050,7 +1050,7 @@ mod tests { let mut a = client.lock().await; let txn = a.transaction().await.unwrap(); let mut executor = Executor::Txn(txn); - let raw_query = format!("SELECT * FROM {} FOR UPDATE;", table_name); + let raw_query = format!("SELECT * FROM `{}` FOR UPDATE;", table_name); let query = sqlx::query(&raw_query); let _ = executor.query(query, &raw_query).await.unwrap(); } @@ -1181,7 +1181,7 @@ mod tests { let meta_lease_ttl = Duration::from_secs(2); let idle_session_timeout = Duration::from_secs(0); let uuid = uuid::Uuid::new_v4().to_string(); - let table_name = "test_candidate_registration_greptime_metakv"; + let table_name = "test_candidate_registration_greptime-metakv"; let mut handles = vec![]; let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout) .await @@ -1251,7 +1251,7 @@ mod tests { let mut client = election.client.lock().await; let txn = client.transaction().await.unwrap(); let mut executor = Executor::Txn(txn); - let raw_query = format!("SELECT * FROM {} FOR UPDATE;", table_name); + let raw_query = format!("SELECT * FROM `{}` FOR UPDATE;", table_name); let query = sqlx::query(&raw_query); let _ = executor.query(query, &raw_query).await.unwrap(); election.elected(executor, expected_lease).await @@ -1275,7 +1275,7 @@ mod tests { let execution_timeout = Duration::from_secs(10); let idle_session_timeout = Duration::from_secs(0); let uuid = uuid::Uuid::new_v4().to_string(); - let table_name = "test_elected_failed_greptime_metakv"; + let table_name = "test_elected_failed_greptime-metakv"; let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout) .await .unwrap(); @@ -1308,7 +1308,7 @@ mod tests { maybe_skip_mysql_integration_test!(); let leader_value = "test_leader".to_string(); let uuid = uuid::Uuid::new_v4().to_string(); - let table_name = "test_reelection_greptime_metakv"; + let table_name = "test_reelection_greptime-metakv"; let candidate_lease_ttl = Duration::from_secs(5); let meta_lease_ttl = Duration::from_secs(5); let execution_timeout = Duration::from_secs(10); @@ -1376,7 +1376,7 @@ mod tests { let execution_timeout = Duration::from_secs(10); let idle_session_timeout = Duration::from_secs(0); let uuid = uuid::Uuid::new_v4().to_string(); - let table_name = "test_elected_and_step_down_greptime_metakv"; + let table_name = "test_elected_and_step_down_greptime-metakv"; let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout) .await .unwrap(); @@ -1465,7 +1465,7 @@ mod tests { maybe_skip_mysql_integration_test!(); let leader_value = "test_leader".to_string(); let uuid = uuid::Uuid::new_v4().to_string(); - let table_name = "test_leader_action_greptime_metakv"; + let table_name = "test_leader_action_greptime-metakv"; let candidate_lease_ttl = Duration::from_secs(5); let meta_lease_ttl = Duration::from_secs(2); let execution_timeout = Duration::from_secs(10); @@ -1652,7 +1652,7 @@ mod tests { common_telemetry::init_default_ut_logging(); let leader_value = "test_leader".to_string(); let uuid = uuid::Uuid::new_v4().to_string(); - let table_name = "test_reset_campaign_greptime_metakv"; + let table_name = "test_reset_campaign_greptime-metakv"; let candidate_lease_ttl = Duration::from_secs(5); let meta_lease_ttl = Duration::from_secs(2); let execution_timeout = Duration::from_secs(10); @@ -1690,7 +1690,7 @@ mod tests { let execution_timeout = Duration::from_secs(10); let idle_session_timeout = Duration::from_secs(0); let uuid = uuid::Uuid::new_v4().to_string(); - let table_name = "test_follower_action_greptime_metakv"; + let table_name = "test_follower_action_greptime-metakv"; let follower_client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout)