diff --git a/tests-fuzz/src/utils/partition.rs b/tests-fuzz/src/utils/partition.rs index db55aec7c1..26b95e68f9 100644 --- a/tests-fuzz/src/utils/partition.rs +++ b/tests-fuzz/src/utils/partition.rs @@ -55,6 +55,25 @@ where .remove(0)) } +/// Returns the [Partition] of the specific `region_id` +pub async fn fetch_partition<'a, DB, E>(e: E, region_id: u64) -> Result +where + DB: Database, + >::Arguments: IntoArguments<'a, DB>, + for<'c> E: 'a + Executor<'c, Database = DB>, + for<'c> u64: Decode<'c, DB> + Type, + for<'c> String: Decode<'c, DB> + Type, + for<'c> u64: Encode<'c, DB> + Type, + for<'c> &'c str: ColumnIndex<::Row>, +{ + let sql = "select region_id, peer_id as datanode_id from information_schema.region_peers where region_id = ?;"; + sqlx::query_as::<_, Partition>(sql) + .bind(region_id) + .fetch_one(e) + .await + .context(error::ExecuteQuerySnafu { sql }) +} + /// Returns all [Partition] of the specific `table` pub async fn fetch_partitions<'a, DB, E>(e: E, table_name: Ident) -> Result> where diff --git a/tests-fuzz/src/utils/procedure.rs b/tests-fuzz/src/utils/procedure.rs index 3eab6fd9eb..0b5e50adb9 100644 --- a/tests-fuzz/src/utils/procedure.rs +++ b/tests-fuzz/src/utils/procedure.rs @@ -22,7 +22,7 @@ use super::wait::wait_condition_fn; use crate::error; /// Fetches the state of a procedure. -pub async fn procedure_state(e: &Pool, procedure_id: String) -> String { +pub async fn procedure_state(e: &Pool, procedure_id: &str) -> String { let sql = format!("select procedure_state(\"{procedure_id}\");"); let result = sqlx::query(&sql) .fetch_one(e) @@ -43,7 +43,7 @@ pub async fn wait_for_procedure_finish( || { let greptime = greptime.clone(); let procedure_id = procedure_id.clone(); - Box::pin(async move { procedure_state(&greptime, procedure_id).await }) + Box::pin(async move { procedure_state(&greptime, &procedure_id).await }) }, |output| { info!("Procedure({procedure_id}) state: {:?}", output); diff --git a/tests-fuzz/targets/migration/fuzz_migrate_mito_regions.rs b/tests-fuzz/targets/migration/fuzz_migrate_mito_regions.rs index eaad2f87b1..bbc4b7297d 100644 --- a/tests-fuzz/targets/migration/fuzz_migrate_mito_regions.rs +++ b/tests-fuzz/targets/migration/fuzz_migrate_mito_regions.rs @@ -43,9 +43,10 @@ use tests_fuzz::translator::mysql::create_expr::CreateTableExprTranslator; use tests_fuzz::translator::mysql::insert_expr::InsertIntoExprTranslator; use tests_fuzz::translator::DslTranslator; use tests_fuzz::utils::cluster_info::{fetch_nodes, PEER_TYPE_DATANODE}; -use tests_fuzz::utils::migration::{migrate_region, wait_for_region_distribution}; -use tests_fuzz::utils::partition::{fetch_partitions, region_distribution}; -use tests_fuzz::utils::procedure::wait_for_procedure_finish; +use tests_fuzz::utils::migration::migrate_region; +use tests_fuzz::utils::partition::{fetch_partition, fetch_partitions, region_distribution}; +use tests_fuzz::utils::procedure::procedure_state; +use tests_fuzz::utils::wait::wait_condition_fn; use tests_fuzz::utils::{ compact_table, flush_memtable, init_greptime_connections_via_env, Connections, }; @@ -125,6 +126,7 @@ fn generate_insert_exprs( .collect::>>() } +#[derive(Debug)] struct Migration { from_peer: u64, to_peer: u64, @@ -222,19 +224,31 @@ async fn execute_region_migration(ctx: FuzzContext, input: FuzzInput) -> Result< } info!("Excepted new region distribution: {new_distribution:?}"); - for procedure_id in procedure_ids { - wait_for_procedure_finish(&ctx.greptime, Duration::from_secs(120), procedure_id).await; + for (migration, procedure_id) in migrations.into_iter().zip(procedure_ids) { + info!("Waits for migration: {migration:?}"); + let region_id = migration.region_id.as_u64(); + wait_condition_fn( + Duration::from_secs(120), + || { + let greptime = ctx.greptime.clone(); + let procedure_id = procedure_id.to_string(); + Box::pin(async move { + { + let output = procedure_state(&greptime, &procedure_id).await; + info!("Checking procedure: {procedure_id}, output: {output}"); + fetch_partition(&greptime, region_id).await.unwrap() + } + }) + }, + |partition| { + info!("Region: {region_id}, datanode: {}", partition.datanode_id); + partition.datanode_id == migration.to_peer + }, + Duration::from_secs(5), + ) + .await; } - // Waits for all region migrated - wait_for_region_distribution( - &ctx.greptime, - Duration::from_secs(60), - table_ctx.name.clone(), - new_distribution, - ) - .await; - // Values validation info!("Validating rows"); let ts_column = table_ctx.timestamp_column().unwrap();