diff --git a/Cargo.lock b/Cargo.lock index b87a5d162f..c52b7cfec3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12336,6 +12336,7 @@ dependencies = [ "common-base", "common-error", "common-macro", + "common-meta", "common-query", "common-runtime", "common-telemetry", diff --git a/tests-fuzz/Cargo.toml b/tests-fuzz/Cargo.toml index c408992bd5..3b7b41e7a5 100644 --- a/tests-fuzz/Cargo.toml +++ b/tests-fuzz/Cargo.toml @@ -21,6 +21,7 @@ chrono = { workspace = true } common-base = { workspace = true } common-error = { workspace = true } common-macro = { workspace = true } +common-meta = { workspace = true } common-query = { workspace = true } common-runtime = { workspace = true } common-telemetry = { workspace = true } diff --git a/tests-fuzz/targets/failover/fuzz_failover_metric_regions.rs b/tests-fuzz/targets/failover/fuzz_failover_metric_regions.rs index 147c3ead1e..821a9cfe9e 100644 --- a/tests-fuzz/targets/failover/fuzz_failover_metric_regions.rs +++ b/tests-fuzz/targets/failover/fuzz_failover_metric_regions.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use std::time::Duration; use arbitrary::{Arbitrary, Unstructured}; +use common_meta::distributed_time_constants; use common_telemetry::info; use libfuzzer_sys::fuzz_target; use rand::{Rng, SeedableRng}; @@ -251,6 +252,10 @@ async fn execute_failover(ctx: FuzzContext, input: FuzzInput) -> Result<()> { recover_pod_failure(ctx.kube.clone(), &ctx.namespace, &chaos_name).await?; wait_for_all_datanode_online(ctx.greptime.clone(), Duration::from_secs(60)).await; + tokio::time::sleep(Duration::from_secs( + distributed_time_constants::REGION_LEASE_SECS, + )) + .await; // Validates value rows info!("Validates num of rows"); diff --git a/tests-fuzz/targets/failover/fuzz_failover_mito_regions.rs b/tests-fuzz/targets/failover/fuzz_failover_mito_regions.rs index f456550b3f..158ffed2c1 100644 --- a/tests-fuzz/targets/failover/fuzz_failover_mito_regions.rs +++ b/tests-fuzz/targets/failover/fuzz_failover_mito_regions.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use std::time::Duration; use arbitrary::{Arbitrary, Unstructured}; +use common_meta::distributed_time_constants; use common_telemetry::info; use common_time::util::current_time_millis; use futures::future::try_join_all; @@ -318,6 +319,10 @@ async fn execute_failover(ctx: FuzzContext, input: FuzzInput) -> Result<()> { recover_pod_failure(ctx.kube.clone(), &ctx.namespace, &chaos_name).await?; wait_for_all_datanode_online(ctx.greptime.clone(), Duration::from_secs(60)).await; + tokio::time::sleep(Duration::from_secs( + distributed_time_constants::REGION_LEASE_SECS, + )) + .await; // Validates value rows info!("Validates num of rows"); for (table_ctx, expected_rows) in table_ctxs.iter().zip(affected_rows) { diff --git a/tests-fuzz/targets/migration/fuzz_migrate_metric_regions.rs b/tests-fuzz/targets/migration/fuzz_migrate_metric_regions.rs index 5bcddea53a..e79b893b12 100644 --- a/tests-fuzz/targets/migration/fuzz_migrate_metric_regions.rs +++ b/tests-fuzz/targets/migration/fuzz_migrate_metric_regions.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use std::time::Duration; use arbitrary::{Arbitrary, Unstructured}; +use common_meta::distributed_time_constants; use common_telemetry::info; use libfuzzer_sys::fuzz_target; use rand::{Rng, SeedableRng}; @@ -252,6 +253,50 @@ async fn wait_for_migration(ctx: &FuzzContext, migration: &Migration, procedure_ .await; } +async fn migrate_regions(ctx: &FuzzContext, migrations: &[Migration]) -> Result<()> { + let mut procedure_ids = Vec::with_capacity(migrations.len()); + // Triggers region migrations + for Migration { + from_peer, + to_peer, + region_id, + } in migrations + { + let procedure_id = + migrate_region(&ctx.greptime, region_id.as_u64(), *from_peer, *to_peer, 120).await; + info!("Migrating region: {region_id} from {from_peer} to {to_peer}, procedure: {procedure_id}"); + procedure_ids.push(procedure_id); + } + for (migration, procedure_id) in migrations.iter().zip(procedure_ids) { + wait_for_migration(ctx, migration, &procedure_id).await; + } + + tokio::time::sleep(Duration::from_secs( + distributed_time_constants::REGION_LEASE_SECS, + )) + .await; + + Ok(()) +} + +async fn validate_rows( + ctx: &FuzzContext, + tables: &HashMap, +) -> Result<()> { + info!("Validates num of rows"); + for (table_ctx, insert_expr) in tables.values() { + let sql = format!("select count(1) as count from {}", table_ctx.name); + let values = count_values(&ctx.greptime, &sql).await?; + let expected_rows = insert_expr.values_list.len() as u64; + assert_eq!( + values.count as u64, expected_rows, + "Expected rows: {}, got: {}, table: {}", + expected_rows, values.count, table_ctx.name + ); + } + Ok(()) +} + async fn execute_migration(ctx: FuzzContext, input: FuzzInput) -> Result<()> { let mut rng = ChaCha20Rng::seed_from_u64(input.seed); // Creates a physical table. @@ -305,36 +350,11 @@ async fn execute_migration(ctx: FuzzContext, input: FuzzInput) -> Result<()> { } } - let mut procedure_ids = Vec::with_capacity(migrations.len()); - // Triggers region migrations - for Migration { - from_peer, - to_peer, - region_id, - } in &migrations - { - let procedure_id = - migrate_region(&ctx.greptime, region_id.as_u64(), *from_peer, *to_peer, 120).await; - info!("Migrating region: {region_id} from {from_peer} to {to_peer}, procedure: {procedure_id}"); - procedure_ids.push(procedure_id); - } info!("Excepted new region distribution: {new_distribution:?}"); - for (migration, procedure_id) in migrations.clone().into_iter().zip(procedure_ids) { - wait_for_migration(&ctx, &migration, &procedure_id).await; - } + migrate_regions(&ctx, &migrations).await?; // Validates value rows - info!("Validates num of rows"); - for (table_ctx, insert_expr) in tables.values() { - let sql = format!("select count(1) as count from {}", table_ctx.name); - let values = count_values(&ctx.greptime, &sql).await?; - let expected_rows = insert_expr.values_list.len() as u64; - assert_eq!( - values.count as u64, expected_rows, - "Expected rows: {}, got: {}, table: {}", - expected_rows, values.count, table_ctx.name - ); - } + validate_rows(&ctx, &tables).await?; // Creates more logical tables and inserts values create_logical_table_and_insert_values( @@ -348,17 +368,7 @@ async fn execute_migration(ctx: FuzzContext, input: FuzzInput) -> Result<()> { .await?; // Validates value rows - info!("Validates num of rows"); - for (table_ctx, insert_expr) in tables.values() { - let sql = format!("select count(1) as count from {}", table_ctx.name); - let values = count_values(&ctx.greptime, &sql).await?; - let expected_rows = insert_expr.values_list.len() as u64; - assert_eq!( - values.count as u64, expected_rows, - "Expected rows: {}, got: {}, table: {}", - expected_rows, values.count, table_ctx.name - ); - } + validate_rows(&ctx, &tables).await?; // Recovers region distribution let migrations = migrations @@ -376,23 +386,7 @@ async fn execute_migration(ctx: FuzzContext, input: FuzzInput) -> Result<()> { ) .collect::>(); - let mut procedure_ids = Vec::with_capacity(migrations.len()); - // Triggers region migrations - for Migration { - from_peer, - to_peer, - region_id, - } in &migrations - { - let procedure_id = - migrate_region(&ctx.greptime, region_id.as_u64(), *from_peer, *to_peer, 120).await; - info!("Migrating region: {region_id} from {from_peer} to {to_peer}, procedure: {procedure_id}"); - procedure_ids.push(procedure_id); - } - info!("Excepted new region distribution: {new_distribution:?}"); - for (migration, procedure_id) in migrations.clone().into_iter().zip(procedure_ids) { - wait_for_migration(&ctx, &migration, &procedure_id).await; - } + migrate_regions(&ctx, &migrations).await?; // Creates more logical tables and inserts values create_logical_table_and_insert_values( @@ -406,17 +400,7 @@ async fn execute_migration(ctx: FuzzContext, input: FuzzInput) -> Result<()> { .await?; // Validates value rows - info!("Validates num of rows"); - for (table_ctx, insert_expr) in tables.values() { - let sql = format!("select count(1) as count from {}", table_ctx.name); - let values = count_values(&ctx.greptime, &sql).await?; - let expected_rows = insert_expr.values_list.len() as u64; - assert_eq!( - values.count as u64, expected_rows, - "Expected rows: {}, got: {}, table: {}", - expected_rows, values.count, table_ctx.name - ); - } + validate_rows(&ctx, &tables).await?; // Clean up for (table_ctx, _) in tables.values() { diff --git a/tests-fuzz/targets/migration/fuzz_migrate_mito_regions.rs b/tests-fuzz/targets/migration/fuzz_migrate_mito_regions.rs index 12c4cdae49..b1a9c735ed 100644 --- a/tests-fuzz/targets/migration/fuzz_migrate_mito_regions.rs +++ b/tests-fuzz/targets/migration/fuzz_migrate_mito_regions.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use std::time::Duration; use arbitrary::{Arbitrary, Unstructured}; +use common_meta::distributed_time_constants; use common_telemetry::info; use libfuzzer_sys::fuzz_target; use rand::{Rng, SeedableRng}; @@ -261,6 +262,11 @@ async fn migrate_regions(ctx: &FuzzContext, migrations: &[Migration]) -> Result< .await; } + tokio::time::sleep(Duration::from_secs( + distributed_time_constants::REGION_LEASE_SECS, + )) + .await; + Ok(()) }