mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-07-03 20:40:37 +00:00
fix(ci): retry repartition chaos row validation (#8271)
fix: retry repartition chaos row validation

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
@@ -55,6 +55,7 @@ use tests_fuzz::utils::network_chaos::{
|
||||
inject_datanode_metasrv_network_partition, recover_network_chaos,
|
||||
};
|
||||
use tests_fuzz::utils::procedure::procedure_state as fetch_procedure_state_json;
|
||||
use tests_fuzz::utils::retry::retry_with_backoff;
|
||||
use tests_fuzz::utils::{
|
||||
Connections, GT_FUZZ_CLUSTER_NAME, GT_FUZZ_CLUSTER_NAMESPACE, get_fuzz_override,
|
||||
get_gt_fuzz_input_max_rows, init_greptime_connections_via_env,
|
||||
@@ -72,6 +73,9 @@ struct FuzzContext {
|
||||
|
||||
const PROCEDURE_TIMEOUT: Duration = Duration::from_secs(300);
|
||||
const NETWORK_CHAOS_DURATION_SECS: usize = 360;
|
||||
const ROW_VALIDATION_MAX_ATTEMPTS: usize = 3;
|
||||
const ROW_VALIDATION_INITIAL_BACKOFF: Duration = Duration::from_millis(500);
|
||||
const ROW_VALIDATION_MAX_BACKOFF: Duration = Duration::from_secs(2);
|
||||
|
||||
impl FuzzContext {
|
||||
async fn close(self) {
|
||||
@@ -232,8 +236,23 @@ async fn validate_table_rows(
|
||||
let count_sql = format!("SELECT COUNT(1) AS count FROM {}", table_ctx.name);
|
||||
let counts = count_values_all(&ctx.greptime, &count_sql).await?;
|
||||
info!("Validate table row count: sql={count_sql}, expected={inserted_rows}, counts={counts:?}");
|
||||
assert_eq!(counts.len(), 1, "count query must return exactly one row");
|
||||
assert_eq!(counts[0].count as u64, inserted_rows);
|
||||
if counts.len() != 1 {
|
||||
return error::AssertSnafu {
|
||||
reason: format!(
|
||||
"count query must return exactly one row, expected={inserted_rows}, counts={counts:?}"
|
||||
),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
if counts[0].count as u64 != inserted_rows {
|
||||
return error::AssertSnafu {
|
||||
reason: format!(
|
||||
"count mismatch, expected={inserted_rows}, actual={}, counts={counts:?}",
|
||||
counts[0].count
|
||||
),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
|
||||
let timestamp_column_name = table_ctx.timestamp_column().unwrap().name.clone();
|
||||
let distinct_count_sql = format!(
|
||||
@@ -244,12 +263,23 @@ async fn validate_table_rows(
|
||||
info!(
|
||||
"Validate table distinct row count: sql={distinct_count_sql}, expected={inserted_rows}, counts={distinct_counts:?}"
|
||||
);
|
||||
assert_eq!(
|
||||
distinct_counts.len(),
|
||||
1,
|
||||
"distinct count query must return exactly one row"
|
||||
);
|
||||
assert_eq!(distinct_counts[0].count as u64, inserted_rows);
|
||||
if distinct_counts.len() != 1 {
|
||||
return error::AssertSnafu {
|
||||
reason: format!(
|
||||
"distinct count query must return exactly one row, expected={inserted_rows}, counts={distinct_counts:?}"
|
||||
),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
if distinct_counts[0].count as u64 != inserted_rows {
|
||||
return error::AssertSnafu {
|
||||
reason: format!(
|
||||
"distinct count mismatch, expected={inserted_rows}, actual={}, counts={distinct_counts:?}",
|
||||
distinct_counts[0].count
|
||||
),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -482,7 +512,17 @@ async fn execute_repartition_chaos(ctx: FuzzContext, input: FuzzInput) -> Result
|
||||
&after_table_ctx,
|
||||
)
|
||||
.await?;
|
||||
validate_table_rows(&ctx, &before_table_ctx, inserted_rows).await?;
|
||||
// Wait for 1 second to ensure the frontend cache is invalidated after procedure completion.
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
// Repartition completion and frontend catalog cache invalidation are not synchronized.
|
||||
// Retry until the query path observes a consistent view of table routes and partition metadata.
|
||||
retry_with_backoff(
|
||||
|| validate_table_rows(&ctx, &before_table_ctx, inserted_rows),
|
||||
ROW_VALIDATION_MAX_ATTEMPTS,
|
||||
ROW_VALIDATION_INITIAL_BACKOFF,
|
||||
ROW_VALIDATION_MAX_BACKOFF,
|
||||
)
|
||||
.await?;
|
||||
|
||||
drop_table(&ctx, &before_table_ctx.name).await?;
|
||||
ctx.close().await;
|
||||
|
||||
Reference in New Issue
Block a user