chore: fix rds kv backend test

This commit is contained in:
paomian
2025-05-29 16:23:22 +08:00
parent 9afc61f778
commit 9b5b6bacc0
4 changed files with 235 additions and 132 deletions

View File

@@ -609,95 +609,102 @@ mod tests {
#[tokio::test]
async fn test_mysql_put() {
let kv_backend = build_mysql_kv_backend("put_test").await.unwrap();
let prefix = b"put/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_put_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
if let Some(kv_backend) = build_mysql_kv_backend("put_test").await {
let prefix = b"put/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_put_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
}
}
#[tokio::test]
async fn test_mysql_range() {
let kv_backend = build_mysql_kv_backend("range_test").await.unwrap();
let prefix = b"range/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_range_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
if let Some(kv_backend) = build_mysql_kv_backend("range_test").await {
let prefix = b"range/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_range_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
}
}
#[tokio::test]
async fn test_mysql_range_2() {
let kv_backend = build_mysql_kv_backend("range2_test").await.unwrap();
let prefix = b"range2/";
test_kv_range_2_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
if let Some(kv_backend) = build_mysql_kv_backend("range2_test").await {
let prefix = b"range2/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_range_2_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
}
}
#[tokio::test]
async fn test_mysql_all_range() {
let kv_backend = build_mysql_kv_backend("simple_range_test").await.unwrap();
let prefix = b"";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_simple_kv_range(&kv_backend).await;
unprepare_kv(&kv_backend, prefix).await;
if let Some(kv_backend) = build_mysql_kv_backend("all_range_test").await {
let prefix = b"";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_simple_kv_range(&kv_backend).await;
unprepare_kv(&kv_backend, prefix).await;
}
}
#[tokio::test]
async fn test_mysql_batch_get() {
let kv_backend = build_mysql_kv_backend("batch_get_test").await.unwrap();
let prefix = b"batch_get/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_batch_get_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
if let Some(kv_backend) = build_mysql_kv_backend("batch_get_test").await {
let prefix = b"batch_get/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_batch_get_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
}
}
#[tokio::test]
async fn test_mysql_batch_delete() {
let kv_backend = build_mysql_kv_backend("batch_delete_test").await.unwrap();
let prefix = b"batch_delete/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
if let Some(kv_backend) = build_mysql_kv_backend("batch_delete_test").await {
let prefix = b"batch_delete/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
}
}
#[tokio::test]
async fn test_mysql_batch_delete_with_prefix() {
let kv_backend = build_mysql_kv_backend("batch_delete_with_prefix_test")
.await
.unwrap();
let prefix = b"batch_delete/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_batch_delete_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
if let Some(kv_backend) = build_mysql_kv_backend("batch_delete_with_prefix_test").await {
let prefix = b"batch_delete/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_batch_delete_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
}
}
#[tokio::test]
async fn test_mysql_delete_range() {
let kv_backend = build_mysql_kv_backend("delete_range_test").await.unwrap();
let prefix = b"delete_range/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
if let Some(kv_backend) = build_mysql_kv_backend("delete_range_test").await {
let prefix = b"delete_range/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
}
}
#[tokio::test]
async fn test_mysql_compare_and_put() {
let kv_backend = build_mysql_kv_backend("compare_and_put_test")
.await
.unwrap();
let prefix = b"compare_and_put/";
let kv_backend = Arc::new(kv_backend);
test_kv_compare_and_put_with_prefix(kv_backend.clone(), prefix.to_vec()).await;
if let Some(kv_backend) = build_mysql_kv_backend("compare_and_put_test").await {
let prefix = b"compare_and_put/";
let kv_backend = Arc::new(kv_backend);
test_kv_compare_and_put_with_prefix(kv_backend.clone(), prefix.to_vec()).await;
}
}
#[tokio::test]
async fn test_mysql_txn() {
let kv_backend = build_mysql_kv_backend("txn_test").await.unwrap();
test_txn_one_compare_op(&kv_backend).await;
text_txn_multi_compare_op(&kv_backend).await;
test_txn_compare_equal(&kv_backend).await;
test_txn_compare_greater(&kv_backend).await;
test_txn_compare_less(&kv_backend).await;
test_txn_compare_not_equal(&kv_backend).await;
if let Some(kv_backend) = build_mysql_kv_backend("txn_test").await {
test_txn_one_compare_op(&kv_backend).await;
text_txn_multi_compare_op(&kv_backend).await;
test_txn_compare_equal(&kv_backend).await;
test_txn_compare_greater(&kv_backend).await;
test_txn_compare_less(&kv_backend).await;
test_txn_compare_not_equal(&kv_backend).await;
}
}
}

View File

@@ -583,93 +583,101 @@ mod tests {
#[tokio::test]
async fn test_pg_put() {
let kv_backend = build_pg_kv_backend("put_test").await.unwrap();
let prefix = b"put/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_put_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
if let Some(kv_backend) = build_pg_kv_backend("put_test").await {
let prefix = b"put/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_put_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
}
}
#[tokio::test]
async fn test_pg_range() {
let kv_backend = build_pg_kv_backend("range_test").await.unwrap();
let prefix = b"range/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_range_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
if let Some(kv_backend) = build_pg_kv_backend("range_test").await {
let prefix = b"range/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_range_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
}
}
#[tokio::test]
async fn test_pg_range_2() {
let kv_backend = build_pg_kv_backend("range2_test").await.unwrap();
let prefix = b"range2/";
test_kv_range_2_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
if let Some(kv_backend) = build_pg_kv_backend("range2_test").await {
let prefix = b"range2/";
test_kv_range_2_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
}
}
#[tokio::test]
async fn test_pg_all_range() {
let kv_backend = build_pg_kv_backend("simple_range_test").await.unwrap();
let prefix = b"";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_simple_kv_range(&kv_backend).await;
unprepare_kv(&kv_backend, prefix).await;
if let Some(kv_backend) = build_pg_kv_backend("simple_range_test").await {
let prefix = b"";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_simple_kv_range(&kv_backend).await;
unprepare_kv(&kv_backend, prefix).await;
}
}
#[tokio::test]
async fn test_pg_batch_get() {
let kv_backend = build_pg_kv_backend("batch_get_test").await.unwrap();
let prefix = b"batch_get/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_batch_get_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
if let Some(kv_backend) = build_pg_kv_backend("batch_get_test").await {
let prefix = b"batch_get/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_batch_get_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
}
}
#[tokio::test]
async fn test_pg_batch_delete() {
let kv_backend = build_pg_kv_backend("batch_delete_test").await.unwrap();
let prefix = b"batch_delete/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
if let Some(kv_backend) = build_pg_kv_backend("batch_delete_test").await {
let prefix = b"batch_delete/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
}
}
#[tokio::test]
async fn test_pg_batch_delete_with_prefix() {
let kv_backend = build_pg_kv_backend("batch_delete_with_prefix_test")
.await
.unwrap();
let prefix = b"batch_delete/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_batch_delete_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
if let Some(kv_backend) = build_pg_kv_backend("batch_delete_with_prefix_test").await {
let prefix = b"batch_delete/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_batch_delete_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
}
}
#[tokio::test]
async fn test_pg_delete_range() {
let kv_backend = build_pg_kv_backend("delete_range_test").await.unwrap();
let prefix = b"delete_range/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
if let Some(kv_backend) = build_pg_kv_backend("delete_range_test").await {
let prefix = b"delete_range/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
}
}
#[tokio::test]
async fn test_pg_compare_and_put() {
let kv_backend = build_pg_kv_backend("compare_and_put_test").await.unwrap();
let prefix = b"compare_and_put/";
let kv_backend = Arc::new(kv_backend);
test_kv_compare_and_put_with_prefix(kv_backend.clone(), prefix.to_vec()).await;
if let Some(kv_backend) = build_pg_kv_backend("compare_and_put_test").await {
let prefix = b"compare_and_put/";
let kv_backend = Arc::new(kv_backend);
test_kv_compare_and_put_with_prefix(kv_backend.clone(), prefix.to_vec()).await;
}
}
#[tokio::test]
async fn test_pg_txn() {
let kv_backend = build_pg_kv_backend("txn_test").await.unwrap();
test_txn_one_compare_op(&kv_backend).await;
text_txn_multi_compare_op(&kv_backend).await;
test_txn_compare_equal(&kv_backend).await;
test_txn_compare_greater(&kv_backend).await;
test_txn_compare_less(&kv_backend).await;
test_txn_compare_not_equal(&kv_backend).await;
if let Some(kv_backend) = build_pg_kv_backend("txn_test").await {
test_txn_one_compare_op(&kv_backend).await;
text_txn_multi_compare_op(&kv_backend).await;
test_txn_compare_equal(&kv_backend).await;
test_txn_compare_greater(&kv_backend).await;
test_txn_compare_less(&kv_backend).await;
test_txn_compare_not_equal(&kv_backend).await;
}
}
}

View File

@@ -848,14 +848,13 @@ mod tests {
use super::*;
use crate::error::MySqlExecutionSnafu;
async fn create_mysql_client(table_name: Option<&str>) -> Result<Mutex<MySqlConnection>> {
async fn create_mysql_client(
table_name: Option<&str>,
) -> Result<Option<Mutex<MySqlConnection>>> {
init_default_ut_logging();
let endpoint = env::var("GT_MYSQL_ENDPOINTS").unwrap_or_default();
if endpoint.is_empty() {
return UnexpectedSnafu {
violated: "MySQL endpoint is empty".to_string(),
}
.fail();
return Ok(None);
}
let mut client = MySqlConnection::connect(&endpoint).await.unwrap();
if let Some(table_name) = table_name {
@@ -870,7 +869,7 @@ mod tests {
sql: create_table_sql,
})?;
}
Ok(Mutex::new(client))
Ok(Some(Mutex::new(client)))
}
async fn drop_table(client: &Mutex<MySqlConnection>, table_name: &str) {
@@ -890,7 +889,14 @@ mod tests {
let uuid = uuid::Uuid::new_v4().to_string();
let table_name = "test_mysql_crud_greptime_metakv";
let client = create_mysql_client(Some(table_name)).await.unwrap();
let client_result = create_mysql_client(Some(table_name)).await.unwrap();
let client: Mutex<MySqlConnection>;
if let Some(c) = client_result {
client = c;
} else {
// If the client creation failed, skip the test.
return;
}
{
let mut a = client.lock().await;
@@ -988,7 +994,14 @@ mod tests {
store_key_prefix: String,
table_name: String,
) {
let client = create_mysql_client(None).await.unwrap();
let client_result = create_mysql_client(None).await.unwrap();
let client: Mutex<MySqlConnection>;
if let Some(c) = client_result {
client = c;
} else {
// If the client creation failed, skip the test.
return;
}
let (tx, _) = broadcast::channel(100);
let mysql_election = MySqlElection {
@@ -1019,7 +1032,14 @@ mod tests {
let uuid = uuid::Uuid::new_v4().to_string();
let table_name = "test_candidate_registration_greptime_metakv";
let mut handles = vec![];
let client = create_mysql_client(Some(table_name)).await.unwrap();
let client_result = create_mysql_client(Some(table_name)).await.unwrap();
let client: Mutex<MySqlConnection>;
if let Some(c) = client_result {
client = c;
} else {
return;
}
for i in 0..10 {
let leader_value = format!("{}{}", leader_value_prefix, i);
@@ -1103,7 +1123,14 @@ mod tests {
let candidate_lease_ttl_secs = 1;
let uuid = uuid::Uuid::new_v4().to_string();
let table_name = "test_elected_and_step_down_greptime_metakv";
let client = create_mysql_client(Some(table_name)).await.unwrap();
let client_result = create_mysql_client(Some(table_name)).await.unwrap();
let client: Mutex<MySqlConnection>;
if let Some(c) = client_result {
client = c;
} else {
return;
}
let (tx, mut rx) = broadcast::channel(100);
let leader_mysql_election = MySqlElection {
@@ -1187,7 +1214,14 @@ mod tests {
let table_name = "test_leader_action_greptime_metakv";
let candidate_lease_ttl_secs = 5;
let meta_lease_ttl_secs = 1;
let client = create_mysql_client(Some(table_name)).await.unwrap();
let client_result = create_mysql_client(Some(table_name)).await.unwrap();
let client: Mutex<MySqlConnection>;
if let Some(c) = client_result {
client = c;
} else {
return;
}
let (tx, mut rx) = broadcast::channel(100);
let leader_mysql_election = MySqlElection {
@@ -1369,7 +1403,13 @@ mod tests {
let uuid = uuid::Uuid::new_v4().to_string();
let table_name = "test_follower_action_greptime_metakv";
let follower_client = create_mysql_client(Some(table_name)).await.unwrap();
let client_result = create_mysql_client(Some(table_name)).await.unwrap();
let follower_client: Mutex<MySqlConnection>;
if let Some(c) = client_result {
follower_client = c;
} else {
return;
}
let (tx, mut rx) = broadcast::channel(100);
let follower_mysql_election = MySqlElection {
leader_value: "test_follower".to_string(),
@@ -1383,7 +1423,13 @@ mod tests {
sql_set: ElectionSqlFactory::new(table_name, 1).build(),
};
let leader_client = create_mysql_client(Some(table_name)).await.unwrap();
let client_result = create_mysql_client(Some(table_name)).await.unwrap();
let leader_client: Mutex<MySqlConnection>;
if let Some(c) = client_result {
leader_client = c;
} else {
return;
}
let (tx, _) = broadcast::channel(100);
let leader_mysql_election = MySqlElection {
leader_value: "test_leader".to_string(),

View File

@@ -741,13 +741,10 @@ mod tests {
use super::*;
use crate::error::PostgresExecutionSnafu;
async fn create_postgres_client(table_name: Option<&str>) -> Result<Client> {
async fn create_postgres_client(table_name: Option<&str>) -> Result<Option<Client>> {
let endpoint = env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default();
if endpoint.is_empty() {
return UnexpectedSnafu {
violated: "Postgres endpoint is empty".to_string(),
}
.fail();
return Ok(None);
}
let (client, connection) = tokio_postgres::connect(&endpoint, NoTls)
.await
@@ -762,7 +759,7 @@ mod tests {
);
client.execute(&create_table_sql, &[]).await.unwrap();
}
Ok(client)
Ok(Some(client))
}
async fn drop_table(client: &Client, table_name: &str) {
@@ -777,7 +774,13 @@ mod tests {
let uuid = uuid::Uuid::new_v4().to_string();
let table_name = "test_postgres_crud_greptime_metakv";
let client = create_postgres_client(Some(table_name)).await.unwrap();
let client_result = create_postgres_client(Some(table_name)).await.unwrap();
let client;
if let Some(c) = client_result {
client = c;
} else {
return;
}
let (tx, _) = broadcast::channel(100);
let pg_election = PgElection {
@@ -855,7 +858,13 @@ mod tests {
store_key_prefix: String,
table_name: String,
) {
let client = create_postgres_client(None).await.unwrap();
let client_result = create_postgres_client(None).await.unwrap();
let client;
if let Some(c) = client_result {
client = c;
} else {
return;
}
let (tx, _) = broadcast::channel(100);
let pg_election = PgElection {
@@ -886,7 +895,14 @@ mod tests {
let uuid = uuid::Uuid::new_v4().to_string();
let table_name = "test_candidate_registration_greptime_metakv";
let mut handles = vec![];
let client = create_postgres_client(Some(table_name)).await.unwrap();
let client_result = create_postgres_client(Some(table_name)).await.unwrap();
let client;
if let Some(c) = client_result {
client = c;
} else {
return;
}
for i in 0..10 {
let leader_value = format!("{}{}", leader_value_prefix, i);
@@ -943,7 +959,14 @@ mod tests {
let candidate_lease_ttl_secs = 5;
let uuid = uuid::Uuid::new_v4().to_string();
let table_name = "test_elected_and_step_down_greptime_metakv";
let client = create_postgres_client(Some(table_name)).await.unwrap();
let client_result = create_postgres_client(Some(table_name)).await.unwrap();
let client;
if let Some(c) = client_result {
client = c;
} else {
return;
}
let (tx, mut rx) = broadcast::channel(100);
let leader_pg_election = PgElection {
@@ -1056,7 +1079,14 @@ mod tests {
let uuid = uuid::Uuid::new_v4().to_string();
let table_name = "test_leader_action_greptime_metakv";
let candidate_lease_ttl_secs = 5;
let client = create_postgres_client(Some(table_name)).await.unwrap();
let client_result = create_postgres_client(Some(table_name)).await.unwrap();
let client;
if let Some(c) = client_result {
client = c;
} else {
return;
}
let (tx, mut rx) = broadcast::channel(100);
let leader_pg_election = PgElection {
@@ -1292,7 +1322,13 @@ mod tests {
let uuid = uuid::Uuid::new_v4().to_string();
let table_name = "test_follower_action_greptime_metakv";
let follower_client = create_postgres_client(Some(table_name)).await.unwrap();
let client_result = create_postgres_client(Some(table_name)).await.unwrap();
let follower_client;
if let Some(c) = client_result {
follower_client = c;
} else {
return;
}
let (tx, mut rx) = broadcast::channel(100);
let follower_pg_election = PgElection {
leader_value: "test_follower".to_string(),
@@ -1306,7 +1342,13 @@ mod tests {
sql_set: ElectionSqlFactory::new(28322, table_name, 2).build(),
};
let leader_client = create_postgres_client(Some(table_name)).await.unwrap();
let client_result = create_postgres_client(Some(table_name)).await.unwrap();
let leader_client;
if let Some(c) = client_result {
leader_client = c;
} else {
return;
}
let (tx, _) = broadcast::channel(100);
let leader_pg_election = PgElection {
leader_value: "test_leader".to_string(),