fix(fuzz): ensure all regions leases are renewed (#5294)

* fix(fuzz): ensure all regions leases are renewed

* fix: fix clippy
This commit is contained in:
Weny Xu
2025-01-06 14:21:41 +08:00
committed by GitHub
parent 2ad50332cb
commit b229c94fba
6 changed files with 68 additions and 66 deletions

1
Cargo.lock generated
View File

@@ -12336,6 +12336,7 @@ dependencies = [
"common-base",
"common-error",
"common-macro",
"common-meta",
"common-query",
"common-runtime",
"common-telemetry",

View File

@@ -21,6 +21,7 @@ chrono = { workspace = true }
common-base = { workspace = true }
common-error = { workspace = true }
common-macro = { workspace = true }
common-meta = { workspace = true }
common-query = { workspace = true }
common-runtime = { workspace = true }
common-telemetry = { workspace = true }

View File

@@ -19,6 +19,7 @@ use std::sync::Arc;
use std::time::Duration;
use arbitrary::{Arbitrary, Unstructured};
use common_meta::distributed_time_constants;
use common_telemetry::info;
use libfuzzer_sys::fuzz_target;
use rand::{Rng, SeedableRng};
@@ -251,6 +252,10 @@ async fn execute_failover(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
recover_pod_failure(ctx.kube.clone(), &ctx.namespace, &chaos_name).await?;
wait_for_all_datanode_online(ctx.greptime.clone(), Duration::from_secs(60)).await;
tokio::time::sleep(Duration::from_secs(
distributed_time_constants::REGION_LEASE_SECS,
))
.await;
// Validates value rows
info!("Validates num of rows");

View File

@@ -19,6 +19,7 @@ use std::sync::Arc;
use std::time::Duration;
use arbitrary::{Arbitrary, Unstructured};
use common_meta::distributed_time_constants;
use common_telemetry::info;
use common_time::util::current_time_millis;
use futures::future::try_join_all;
@@ -318,6 +319,10 @@ async fn execute_failover(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
recover_pod_failure(ctx.kube.clone(), &ctx.namespace, &chaos_name).await?;
wait_for_all_datanode_online(ctx.greptime.clone(), Duration::from_secs(60)).await;
tokio::time::sleep(Duration::from_secs(
distributed_time_constants::REGION_LEASE_SECS,
))
.await;
// Validates value rows
info!("Validates num of rows");
for (table_ctx, expected_rows) in table_ctxs.iter().zip(affected_rows) {

View File

@@ -19,6 +19,7 @@ use std::sync::Arc;
use std::time::Duration;
use arbitrary::{Arbitrary, Unstructured};
use common_meta::distributed_time_constants;
use common_telemetry::info;
use libfuzzer_sys::fuzz_target;
use rand::{Rng, SeedableRng};
@@ -252,6 +253,50 @@ async fn wait_for_migration(ctx: &FuzzContext, migration: &Migration, procedure_
.await;
}
async fn migrate_regions(ctx: &FuzzContext, migrations: &[Migration]) -> Result<()> {
let mut procedure_ids = Vec::with_capacity(migrations.len());
// Triggers region migrations
for Migration {
from_peer,
to_peer,
region_id,
} in migrations
{
let procedure_id =
migrate_region(&ctx.greptime, region_id.as_u64(), *from_peer, *to_peer, 120).await;
info!("Migrating region: {region_id} from {from_peer} to {to_peer}, procedure: {procedure_id}");
procedure_ids.push(procedure_id);
}
for (migration, procedure_id) in migrations.iter().zip(procedure_ids) {
wait_for_migration(ctx, migration, &procedure_id).await;
}
tokio::time::sleep(Duration::from_secs(
distributed_time_constants::REGION_LEASE_SECS,
))
.await;
Ok(())
}
async fn validate_rows(
ctx: &FuzzContext,
tables: &HashMap<Ident, (TableContextRef, InsertIntoExpr)>,
) -> Result<()> {
info!("Validates num of rows");
for (table_ctx, insert_expr) in tables.values() {
let sql = format!("select count(1) as count from {}", table_ctx.name);
let values = count_values(&ctx.greptime, &sql).await?;
let expected_rows = insert_expr.values_list.len() as u64;
assert_eq!(
values.count as u64, expected_rows,
"Expected rows: {}, got: {}, table: {}",
expected_rows, values.count, table_ctx.name
);
}
Ok(())
}
async fn execute_migration(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
let mut rng = ChaCha20Rng::seed_from_u64(input.seed);
// Creates a physical table.
@@ -305,36 +350,11 @@ async fn execute_migration(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
}
}
let mut procedure_ids = Vec::with_capacity(migrations.len());
// Triggers region migrations
for Migration {
from_peer,
to_peer,
region_id,
} in &migrations
{
let procedure_id =
migrate_region(&ctx.greptime, region_id.as_u64(), *from_peer, *to_peer, 120).await;
info!("Migrating region: {region_id} from {from_peer} to {to_peer}, procedure: {procedure_id}");
procedure_ids.push(procedure_id);
}
info!("Excepted new region distribution: {new_distribution:?}");
for (migration, procedure_id) in migrations.clone().into_iter().zip(procedure_ids) {
wait_for_migration(&ctx, &migration, &procedure_id).await;
}
migrate_regions(&ctx, &migrations).await?;
// Validates value rows
info!("Validates num of rows");
for (table_ctx, insert_expr) in tables.values() {
let sql = format!("select count(1) as count from {}", table_ctx.name);
let values = count_values(&ctx.greptime, &sql).await?;
let expected_rows = insert_expr.values_list.len() as u64;
assert_eq!(
values.count as u64, expected_rows,
"Expected rows: {}, got: {}, table: {}",
expected_rows, values.count, table_ctx.name
);
}
validate_rows(&ctx, &tables).await?;
// Creates more logical tables and inserts values
create_logical_table_and_insert_values(
@@ -348,17 +368,7 @@ async fn execute_migration(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
.await?;
// Validates value rows
info!("Validates num of rows");
for (table_ctx, insert_expr) in tables.values() {
let sql = format!("select count(1) as count from {}", table_ctx.name);
let values = count_values(&ctx.greptime, &sql).await?;
let expected_rows = insert_expr.values_list.len() as u64;
assert_eq!(
values.count as u64, expected_rows,
"Expected rows: {}, got: {}, table: {}",
expected_rows, values.count, table_ctx.name
);
}
validate_rows(&ctx, &tables).await?;
// Recovers region distribution
let migrations = migrations
@@ -376,23 +386,7 @@ async fn execute_migration(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
)
.collect::<Vec<_>>();
let mut procedure_ids = Vec::with_capacity(migrations.len());
// Triggers region migrations
for Migration {
from_peer,
to_peer,
region_id,
} in &migrations
{
let procedure_id =
migrate_region(&ctx.greptime, region_id.as_u64(), *from_peer, *to_peer, 120).await;
info!("Migrating region: {region_id} from {from_peer} to {to_peer}, procedure: {procedure_id}");
procedure_ids.push(procedure_id);
}
info!("Excepted new region distribution: {new_distribution:?}");
for (migration, procedure_id) in migrations.clone().into_iter().zip(procedure_ids) {
wait_for_migration(&ctx, &migration, &procedure_id).await;
}
migrate_regions(&ctx, &migrations).await?;
// Creates more logical tables and inserts values
create_logical_table_and_insert_values(
@@ -406,17 +400,7 @@ async fn execute_migration(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
.await?;
// Validates value rows
info!("Validates num of rows");
for (table_ctx, insert_expr) in tables.values() {
let sql = format!("select count(1) as count from {}", table_ctx.name);
let values = count_values(&ctx.greptime, &sql).await?;
let expected_rows = insert_expr.values_list.len() as u64;
assert_eq!(
values.count as u64, expected_rows,
"Expected rows: {}, got: {}, table: {}",
expected_rows, values.count, table_ctx.name
);
}
validate_rows(&ctx, &tables).await?;
// Clean up
for (table_ctx, _) in tables.values() {

View File

@@ -19,6 +19,7 @@ use std::sync::Arc;
use std::time::Duration;
use arbitrary::{Arbitrary, Unstructured};
use common_meta::distributed_time_constants;
use common_telemetry::info;
use libfuzzer_sys::fuzz_target;
use rand::{Rng, SeedableRng};
@@ -261,6 +262,11 @@ async fn migrate_regions(ctx: &FuzzContext, migrations: &[Migration]) -> Result<
.await;
}
tokio::time::sleep(Duration::from_secs(
distributed_time_constants::REGION_LEASE_SECS,
))
.await;
Ok(())
}