diff --git a/tests-fuzz/targets/ddl/fuzz_repartition_table_chaos.rs b/tests-fuzz/targets/ddl/fuzz_repartition_table_chaos.rs index a41d4175ec..c1e570f698 100644 --- a/tests-fuzz/targets/ddl/fuzz_repartition_table_chaos.rs +++ b/tests-fuzz/targets/ddl/fuzz_repartition_table_chaos.rs @@ -55,6 +55,7 @@ use tests_fuzz::utils::network_chaos::{ inject_datanode_metasrv_network_partition, recover_network_chaos, }; use tests_fuzz::utils::procedure::procedure_state as fetch_procedure_state_json; +use tests_fuzz::utils::retry::retry_with_backoff; use tests_fuzz::utils::{ Connections, GT_FUZZ_CLUSTER_NAME, GT_FUZZ_CLUSTER_NAMESPACE, get_fuzz_override, get_gt_fuzz_input_max_rows, init_greptime_connections_via_env, @@ -72,6 +73,9 @@ struct FuzzContext { const PROCEDURE_TIMEOUT: Duration = Duration::from_secs(300); const NETWORK_CHAOS_DURATION_SECS: usize = 360; +const ROW_VALIDATION_MAX_ATTEMPTS: usize = 3; +const ROW_VALIDATION_INITIAL_BACKOFF: Duration = Duration::from_millis(500); +const ROW_VALIDATION_MAX_BACKOFF: Duration = Duration::from_secs(2); impl FuzzContext { async fn close(self) { @@ -232,8 +236,23 @@ async fn validate_table_rows( let count_sql = format!("SELECT COUNT(1) AS count FROM {}", table_ctx.name); let counts = count_values_all(&ctx.greptime, &count_sql).await?; info!("Validate table row count: sql={count_sql}, expected={inserted_rows}, counts={counts:?}"); - assert_eq!(counts.len(), 1, "count query must return exactly one row"); - assert_eq!(counts[0].count as u64, inserted_rows); + if counts.len() != 1 { + return error::AssertSnafu { + reason: format!( + "count query must return exactly one row, expected={inserted_rows}, counts={counts:?}" + ), + } + .fail(); + } + if counts[0].count as u64 != inserted_rows { + return error::AssertSnafu { + reason: format!( + "count mismatch, expected={inserted_rows}, actual={}, counts={counts:?}", + counts[0].count + ), + } + .fail(); + } let timestamp_column_name = table_ctx.timestamp_column().unwrap().name.clone(); let distinct_count_sql = format!( @@ -244,12 +263,23 @@ async fn validate_table_rows( info!( "Validate table distinct row count: sql={distinct_count_sql}, expected={inserted_rows}, counts={distinct_counts:?}" ); - assert_eq!( - distinct_counts.len(), - 1, - "distinct count query must return exactly one row" - ); - assert_eq!(distinct_counts[0].count as u64, inserted_rows); + if distinct_counts.len() != 1 { + return error::AssertSnafu { + reason: format!( + "distinct count query must return exactly one row, expected={inserted_rows}, counts={distinct_counts:?}" + ), + } + .fail(); + } + if distinct_counts[0].count as u64 != inserted_rows { + return error::AssertSnafu { + reason: format!( + "distinct count mismatch, expected={inserted_rows}, actual={}, counts={distinct_counts:?}", + distinct_counts[0].count + ), + } + .fail(); + } Ok(()) } @@ -482,7 +512,17 @@ async fn execute_repartition_chaos(ctx: FuzzContext, input: FuzzInput) -> Result &after_table_ctx, ) .await?; - validate_table_rows(&ctx, &before_table_ctx, inserted_rows).await?; + // Wait for 1 second to ensure the frontend cache is invalidated after procedure completion. + tokio::time::sleep(Duration::from_secs(1)).await; + // Repartition completion and frontend catalog cache invalidation are not synchronized. + // Retry until the query path observes a consistent view of table routes and partition metadata. + retry_with_backoff( + || validate_table_rows(&ctx, &before_table_ctx, inserted_rows), + ROW_VALIDATION_MAX_ATTEMPTS, + ROW_VALIDATION_INITIAL_BACKOFF, + ROW_VALIDATION_MAX_BACKOFF, + ) + .await?; drop_table(&ctx, &before_table_ctx.name).await?; ctx.close().await;