From 9b5b6bacc055bec35b7ce1f15860dff071752f43 Mon Sep 17 00:00:00 2001 From: paomian Date: Thu, 29 May 2025 16:23:22 +0800 Subject: [PATCH] chore: fix rds kv backend test --- src/common/meta/src/kv_backend/rds/mysql.rs | 115 ++++++++++-------- .../meta/src/kv_backend/rds/postgres.rs | 112 +++++++++-------- src/meta-srv/src/election/mysql.rs | 72 +++++++++-- src/meta-srv/src/election/postgres.rs | 68 +++++++++-- 4 files changed, 235 insertions(+), 132 deletions(-) diff --git a/src/common/meta/src/kv_backend/rds/mysql.rs b/src/common/meta/src/kv_backend/rds/mysql.rs index c85c1278f7..c556d99ca0 100644 --- a/src/common/meta/src/kv_backend/rds/mysql.rs +++ b/src/common/meta/src/kv_backend/rds/mysql.rs @@ -609,95 +609,102 @@ mod tests { #[tokio::test] async fn test_mysql_put() { - let kv_backend = build_mysql_kv_backend("put_test").await.unwrap(); - let prefix = b"put/"; - prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; - test_kv_put_with_prefix(&kv_backend, prefix.to_vec()).await; - unprepare_kv(&kv_backend, prefix).await; + if let Some(kv_backend) = build_mysql_kv_backend("put_test").await { + let prefix = b"put/"; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_kv_put_with_prefix(&kv_backend, prefix.to_vec()).await; + unprepare_kv(&kv_backend, prefix).await; + } } #[tokio::test] async fn test_mysql_range() { - let kv_backend = build_mysql_kv_backend("range_test").await.unwrap(); - let prefix = b"range/"; - prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; - test_kv_range_with_prefix(&kv_backend, prefix.to_vec()).await; - unprepare_kv(&kv_backend, prefix).await; + if let Some(kv_backend) = build_mysql_kv_backend("range_test").await { + let prefix = b"range/"; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_kv_range_with_prefix(&kv_backend, prefix.to_vec()).await; + unprepare_kv(&kv_backend, prefix).await; + } } #[tokio::test] async fn test_mysql_range_2() { - let kv_backend = build_mysql_kv_backend("range2_test").await.unwrap(); - let prefix = b"range2/"; - test_kv_range_2_with_prefix(&kv_backend, prefix.to_vec()).await; - unprepare_kv(&kv_backend, prefix).await; + if let Some(kv_backend) = build_mysql_kv_backend("range2_test").await { + let prefix = b"range2/"; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_kv_range_2_with_prefix(&kv_backend, prefix.to_vec()).await; + unprepare_kv(&kv_backend, prefix).await; + } } #[tokio::test] async fn test_mysql_all_range() { - let kv_backend = build_mysql_kv_backend("simple_range_test").await.unwrap(); - let prefix = b""; - prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; - test_simple_kv_range(&kv_backend).await; - unprepare_kv(&kv_backend, prefix).await; + if let Some(kv_backend) = build_mysql_kv_backend("all_range_test").await { + let prefix = b""; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_simple_kv_range(&kv_backend).await; + unprepare_kv(&kv_backend, prefix).await; + } } #[tokio::test] async fn test_mysql_batch_get() { - let kv_backend = build_mysql_kv_backend("batch_get_test").await.unwrap(); - let prefix = b"batch_get/"; - prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; - test_kv_batch_get_with_prefix(&kv_backend, prefix.to_vec()).await; - unprepare_kv(&kv_backend, prefix).await; + if let Some(kv_backend) = build_mysql_kv_backend("batch_get_test").await { + let prefix = b"batch_get/"; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_kv_batch_get_with_prefix(&kv_backend, prefix.to_vec()).await; + unprepare_kv(&kv_backend, prefix).await; + } } #[tokio::test] async fn test_mysql_batch_delete() { - let kv_backend = build_mysql_kv_backend("batch_delete_test").await.unwrap(); - let prefix = b"batch_delete/"; - prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; - test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await; - unprepare_kv(&kv_backend, prefix).await; + if let Some(kv_backend) = build_mysql_kv_backend("batch_delete_test").await { + let prefix = b"batch_delete/"; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await; + unprepare_kv(&kv_backend, prefix).await; + } } #[tokio::test] async fn test_mysql_batch_delete_with_prefix() { - let kv_backend = build_mysql_kv_backend("batch_delete_with_prefix_test") - .await - .unwrap(); - let prefix = b"batch_delete/"; - prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; - test_kv_batch_delete_with_prefix(&kv_backend, prefix.to_vec()).await; - unprepare_kv(&kv_backend, prefix).await; + if let Some(kv_backend) = build_mysql_kv_backend("batch_delete_with_prefix_test").await { + let prefix = b"batch_delete/"; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_kv_batch_delete_with_prefix(&kv_backend, prefix.to_vec()).await; + unprepare_kv(&kv_backend, prefix).await; + } } #[tokio::test] async fn test_mysql_delete_range() { - let kv_backend = build_mysql_kv_backend("delete_range_test").await.unwrap(); - let prefix = b"delete_range/"; - prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; - test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await; - unprepare_kv(&kv_backend, prefix).await; + if let Some(kv_backend) = build_mysql_kv_backend("delete_range_test").await { + let prefix = b"delete_range/"; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await; + unprepare_kv(&kv_backend, prefix).await; + } } #[tokio::test] async fn test_mysql_compare_and_put() { - let kv_backend = build_mysql_kv_backend("compare_and_put_test") - .await - .unwrap(); - let prefix = b"compare_and_put/"; - let kv_backend = Arc::new(kv_backend); - test_kv_compare_and_put_with_prefix(kv_backend.clone(), prefix.to_vec()).await; + if let Some(kv_backend) = build_mysql_kv_backend("compare_and_put_test").await { + let prefix = b"compare_and_put/"; + let kv_backend = Arc::new(kv_backend); + test_kv_compare_and_put_with_prefix(kv_backend.clone(), prefix.to_vec()).await; + } } #[tokio::test] async fn test_mysql_txn() { - let kv_backend = build_mysql_kv_backend("txn_test").await.unwrap(); - test_txn_one_compare_op(&kv_backend).await; - text_txn_multi_compare_op(&kv_backend).await; - test_txn_compare_equal(&kv_backend).await; - test_txn_compare_greater(&kv_backend).await; - test_txn_compare_less(&kv_backend).await; - test_txn_compare_not_equal(&kv_backend).await; + if let Some(kv_backend) = build_mysql_kv_backend("txn_test").await { + test_txn_one_compare_op(&kv_backend).await; + text_txn_multi_compare_op(&kv_backend).await; + test_txn_compare_equal(&kv_backend).await; + test_txn_compare_greater(&kv_backend).await; + test_txn_compare_less(&kv_backend).await; + test_txn_compare_not_equal(&kv_backend).await; + } } } diff --git a/src/common/meta/src/kv_backend/rds/postgres.rs b/src/common/meta/src/kv_backend/rds/postgres.rs index 0b96882694..098ebdb0a9 100644 --- a/src/common/meta/src/kv_backend/rds/postgres.rs +++ b/src/common/meta/src/kv_backend/rds/postgres.rs @@ -583,93 +583,101 @@ mod tests { #[tokio::test] async fn test_pg_put() { - let kv_backend = build_pg_kv_backend("put_test").await.unwrap(); - let prefix = b"put/"; - prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; - test_kv_put_with_prefix(&kv_backend, prefix.to_vec()).await; - unprepare_kv(&kv_backend, prefix).await; + if let Some(kv_backend) = build_pg_kv_backend("put_test").await { + let prefix = b"put/"; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_kv_put_with_prefix(&kv_backend, prefix.to_vec()).await; + unprepare_kv(&kv_backend, prefix).await; + } } #[tokio::test] async fn test_pg_range() { - let kv_backend = build_pg_kv_backend("range_test").await.unwrap(); - let prefix = b"range/"; - prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; - test_kv_range_with_prefix(&kv_backend, prefix.to_vec()).await; - unprepare_kv(&kv_backend, prefix).await; + if let Some(kv_backend) = build_pg_kv_backend("range_test").await { + let prefix = b"range/"; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_kv_range_with_prefix(&kv_backend, prefix.to_vec()).await; + unprepare_kv(&kv_backend, prefix).await; + } } #[tokio::test] async fn test_pg_range_2() { - let kv_backend = build_pg_kv_backend("range2_test").await.unwrap(); - let prefix = b"range2/"; - test_kv_range_2_with_prefix(&kv_backend, prefix.to_vec()).await; - unprepare_kv(&kv_backend, prefix).await; + if let Some(kv_backend) = build_pg_kv_backend("range2_test").await { + let prefix = b"range2/"; + test_kv_range_2_with_prefix(&kv_backend, prefix.to_vec()).await; + unprepare_kv(&kv_backend, prefix).await; + } } #[tokio::test] async fn test_pg_all_range() { - let kv_backend = build_pg_kv_backend("simple_range_test").await.unwrap(); - let prefix = b""; - prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; - test_simple_kv_range(&kv_backend).await; - unprepare_kv(&kv_backend, prefix).await; + if let Some(kv_backend) = build_pg_kv_backend("simple_range_test").await { + let prefix = b""; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_simple_kv_range(&kv_backend).await; + unprepare_kv(&kv_backend, prefix).await; + } } #[tokio::test] async fn test_pg_batch_get() { - let kv_backend = build_pg_kv_backend("batch_get_test").await.unwrap(); - let prefix = b"batch_get/"; - prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; - test_kv_batch_get_with_prefix(&kv_backend, prefix.to_vec()).await; - unprepare_kv(&kv_backend, prefix).await; + if let Some(kv_backend) = build_pg_kv_backend("batch_get_test").await { + let prefix = b"batch_get/"; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_kv_batch_get_with_prefix(&kv_backend, prefix.to_vec()).await; + unprepare_kv(&kv_backend, prefix).await; + } } #[tokio::test] async fn test_pg_batch_delete() { - let kv_backend = build_pg_kv_backend("batch_delete_test").await.unwrap(); - let prefix = b"batch_delete/"; - prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; - test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await; - unprepare_kv(&kv_backend, prefix).await; + if let Some(kv_backend) = build_pg_kv_backend("batch_delete_test").await { + let prefix = b"batch_delete/"; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await; + unprepare_kv(&kv_backend, prefix).await; + } } #[tokio::test] async fn test_pg_batch_delete_with_prefix() { - let kv_backend = build_pg_kv_backend("batch_delete_with_prefix_test") - .await - .unwrap(); - let prefix = b"batch_delete/"; - prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; - test_kv_batch_delete_with_prefix(&kv_backend, prefix.to_vec()).await; - unprepare_kv(&kv_backend, prefix).await; + if let Some(kv_backend) = build_pg_kv_backend("batch_delete_with_prefix_test").await { + let prefix = b"batch_delete/"; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_kv_batch_delete_with_prefix(&kv_backend, prefix.to_vec()).await; + unprepare_kv(&kv_backend, prefix).await; + } } #[tokio::test] async fn test_pg_delete_range() { - let kv_backend = build_pg_kv_backend("delete_range_test").await.unwrap(); - let prefix = b"delete_range/"; - prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; - test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await; - unprepare_kv(&kv_backend, prefix).await; + if let Some(kv_backend) = build_pg_kv_backend("delete_range_test").await { + let prefix = b"delete_range/"; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await; + unprepare_kv(&kv_backend, prefix).await; + } } #[tokio::test] async fn test_pg_compare_and_put() { - let kv_backend = build_pg_kv_backend("compare_and_put_test").await.unwrap(); - let prefix = b"compare_and_put/"; - let kv_backend = Arc::new(kv_backend); - test_kv_compare_and_put_with_prefix(kv_backend.clone(), prefix.to_vec()).await; + if let Some(kv_backend) = build_pg_kv_backend("compare_and_put_test").await { + let prefix = b"compare_and_put/"; + let kv_backend = Arc::new(kv_backend); + test_kv_compare_and_put_with_prefix(kv_backend.clone(), prefix.to_vec()).await; + } } #[tokio::test] async fn test_pg_txn() { - let kv_backend = build_pg_kv_backend("txn_test").await.unwrap(); - test_txn_one_compare_op(&kv_backend).await; - text_txn_multi_compare_op(&kv_backend).await; - test_txn_compare_equal(&kv_backend).await; - test_txn_compare_greater(&kv_backend).await; - test_txn_compare_less(&kv_backend).await; - test_txn_compare_not_equal(&kv_backend).await; + if let Some(kv_backend) = build_pg_kv_backend("txn_test").await { + test_txn_one_compare_op(&kv_backend).await; + text_txn_multi_compare_op(&kv_backend).await; + test_txn_compare_equal(&kv_backend).await; + test_txn_compare_greater(&kv_backend).await; + test_txn_compare_less(&kv_backend).await; + test_txn_compare_not_equal(&kv_backend).await; + } } } diff --git a/src/meta-srv/src/election/mysql.rs b/src/meta-srv/src/election/mysql.rs index b25a376b97..d8155864f9 100644 --- a/src/meta-srv/src/election/mysql.rs +++ b/src/meta-srv/src/election/mysql.rs @@ -848,14 +848,13 @@ mod tests { use super::*; use crate::error::MySqlExecutionSnafu; - async fn create_mysql_client(table_name: Option<&str>) -> Result> { + async fn create_mysql_client( + table_name: Option<&str>, + ) -> Result>> { init_default_ut_logging(); let endpoint = env::var("GT_MYSQL_ENDPOINTS").unwrap_or_default(); if endpoint.is_empty() { - return UnexpectedSnafu { - violated: "MySQL endpoint is empty".to_string(), - } - .fail(); + return Ok(None); } let mut client = MySqlConnection::connect(&endpoint).await.unwrap(); if let Some(table_name) = table_name { @@ -870,7 +869,7 @@ mod tests { sql: create_table_sql, })?; } - Ok(Mutex::new(client)) + Ok(Some(Mutex::new(client))) } async fn drop_table(client: &Mutex, table_name: &str) { @@ -890,7 +889,14 @@ mod tests { let uuid = uuid::Uuid::new_v4().to_string(); let table_name = "test_mysql_crud_greptime_metakv"; - let client = create_mysql_client(Some(table_name)).await.unwrap(); + let client_result = create_mysql_client(Some(table_name)).await.unwrap(); + let client: Mutex; + if let Some(c) = client_result { + client = c; + } else { + // If the client creation failed, skip the test. + return; + } { let mut a = client.lock().await; @@ -988,7 +994,14 @@ mod tests { store_key_prefix: String, table_name: String, ) { - let client = create_mysql_client(None).await.unwrap(); + let client_result = create_mysql_client(None).await.unwrap(); + let client: Mutex; + if let Some(c) = client_result { + client = c; + } else { + // If the client creation failed, skip the test. + return; + } let (tx, _) = broadcast::channel(100); let mysql_election = MySqlElection { @@ -1019,7 +1032,14 @@ mod tests { let uuid = uuid::Uuid::new_v4().to_string(); let table_name = "test_candidate_registration_greptime_metakv"; let mut handles = vec![]; - let client = create_mysql_client(Some(table_name)).await.unwrap(); + + let client_result = create_mysql_client(Some(table_name)).await.unwrap(); + let client: Mutex; + if let Some(c) = client_result { + client = c; + } else { + return; + } for i in 0..10 { let leader_value = format!("{}{}", leader_value_prefix, i); @@ -1103,7 +1123,14 @@ mod tests { let candidate_lease_ttl_secs = 1; let uuid = uuid::Uuid::new_v4().to_string(); let table_name = "test_elected_and_step_down_greptime_metakv"; - let client = create_mysql_client(Some(table_name)).await.unwrap(); + + let client_result = create_mysql_client(Some(table_name)).await.unwrap(); + let client: Mutex; + if let Some(c) = client_result { + client = c; + } else { + return; + } let (tx, mut rx) = broadcast::channel(100); let leader_mysql_election = MySqlElection { @@ -1187,7 +1214,14 @@ mod tests { let table_name = "test_leader_action_greptime_metakv"; let candidate_lease_ttl_secs = 5; let meta_lease_ttl_secs = 1; - let client = create_mysql_client(Some(table_name)).await.unwrap(); + + let client_result = create_mysql_client(Some(table_name)).await.unwrap(); + let client: Mutex; + if let Some(c) = client_result { + client = c; + } else { + return; + } let (tx, mut rx) = broadcast::channel(100); let leader_mysql_election = MySqlElection { @@ -1369,7 +1403,13 @@ mod tests { let uuid = uuid::Uuid::new_v4().to_string(); let table_name = "test_follower_action_greptime_metakv"; - let follower_client = create_mysql_client(Some(table_name)).await.unwrap(); + let client_result = create_mysql_client(Some(table_name)).await.unwrap(); + let follower_client: Mutex; + if let Some(c) = client_result { + follower_client = c; + } else { + return; + } let (tx, mut rx) = broadcast::channel(100); let follower_mysql_election = MySqlElection { leader_value: "test_follower".to_string(), @@ -1383,7 +1423,13 @@ mod tests { sql_set: ElectionSqlFactory::new(table_name, 1).build(), }; - let leader_client = create_mysql_client(Some(table_name)).await.unwrap(); + let client_result = create_mysql_client(Some(table_name)).await.unwrap(); + let leader_client: Mutex; + if let Some(c) = client_result { + leader_client = c; + } else { + return; + } let (tx, _) = broadcast::channel(100); let leader_mysql_election = MySqlElection { leader_value: "test_leader".to_string(), diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs index efe3f15cb0..d2e0c5828d 100644 --- a/src/meta-srv/src/election/postgres.rs +++ b/src/meta-srv/src/election/postgres.rs @@ -741,13 +741,10 @@ mod tests { use super::*; use crate::error::PostgresExecutionSnafu; - async fn create_postgres_client(table_name: Option<&str>) -> Result { + async fn create_postgres_client(table_name: Option<&str>) -> Result> { let endpoint = env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default(); if endpoint.is_empty() { - return UnexpectedSnafu { - violated: "Postgres endpoint is empty".to_string(), - } - .fail(); + return Ok(None); } let (client, connection) = tokio_postgres::connect(&endpoint, NoTls) .await @@ -762,7 +759,7 @@ mod tests { ); client.execute(&create_table_sql, &[]).await.unwrap(); } - Ok(client) + Ok(Some(client)) } async fn drop_table(client: &Client, table_name: &str) { @@ -777,7 +774,13 @@ mod tests { let uuid = uuid::Uuid::new_v4().to_string(); let table_name = "test_postgres_crud_greptime_metakv"; - let client = create_postgres_client(Some(table_name)).await.unwrap(); + let client_result = create_postgres_client(Some(table_name)).await.unwrap(); + let client; + if let Some(c) = client_result { + client = c; + } else { + return; + } let (tx, _) = broadcast::channel(100); let pg_election = PgElection { @@ -855,7 +858,13 @@ mod tests { store_key_prefix: String, table_name: String, ) { - let client = create_postgres_client(None).await.unwrap(); + let client_result = create_postgres_client(None).await.unwrap(); + let client; + if let Some(c) = client_result { + client = c; + } else { + return; + } let (tx, _) = broadcast::channel(100); let pg_election = PgElection { @@ -886,7 +895,14 @@ mod tests { let uuid = uuid::Uuid::new_v4().to_string(); let table_name = "test_candidate_registration_greptime_metakv"; let mut handles = vec![]; - let client = create_postgres_client(Some(table_name)).await.unwrap(); + + let client_result = create_postgres_client(Some(table_name)).await.unwrap(); + let client; + if let Some(c) = client_result { + client = c; + } else { + return; + } for i in 0..10 { let leader_value = format!("{}{}", leader_value_prefix, i); @@ -943,7 +959,14 @@ mod tests { let candidate_lease_ttl_secs = 5; let uuid = uuid::Uuid::new_v4().to_string(); let table_name = "test_elected_and_step_down_greptime_metakv"; - let client = create_postgres_client(Some(table_name)).await.unwrap(); + + let client_result = create_postgres_client(Some(table_name)).await.unwrap(); + let client; + if let Some(c) = client_result { + client = c; + } else { + return; + } let (tx, mut rx) = broadcast::channel(100); let leader_pg_election = PgElection { @@ -1056,7 +1079,14 @@ mod tests { let uuid = uuid::Uuid::new_v4().to_string(); let table_name = "test_leader_action_greptime_metakv"; let candidate_lease_ttl_secs = 5; - let client = create_postgres_client(Some(table_name)).await.unwrap(); + + let client_result = create_postgres_client(Some(table_name)).await.unwrap(); + let client; + if let Some(c) = client_result { + client = c; + } else { + return; + } let (tx, mut rx) = broadcast::channel(100); let leader_pg_election = PgElection { @@ -1292,7 +1322,13 @@ mod tests { let uuid = uuid::Uuid::new_v4().to_string(); let table_name = "test_follower_action_greptime_metakv"; - let follower_client = create_postgres_client(Some(table_name)).await.unwrap(); + let client_result = create_postgres_client(Some(table_name)).await.unwrap(); + let follower_client; + if let Some(c) = client_result { + follower_client = c; + } else { + return; + } let (tx, mut rx) = broadcast::channel(100); let follower_pg_election = PgElection { leader_value: "test_follower".to_string(), @@ -1306,7 +1342,13 @@ mod tests { sql_set: ElectionSqlFactory::new(28322, table_name, 2).build(), }; - let leader_client = create_postgres_client(Some(table_name)).await.unwrap(); + let client_result = create_postgres_client(Some(table_name)).await.unwrap(); + let leader_client; + if let Some(c) = client_result { + leader_client = c; + } else { + return; + } let (tx, _) = broadcast::channel(100); let leader_pg_election = PgElection { leader_value: "test_leader".to_string(),