mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
feat(fuzz): enhance condition check of region migration finish (#4283)
This commit is contained in:
@@ -55,6 +55,25 @@ where
|
||||
.remove(0))
|
||||
}
|
||||
|
||||
/// Returns the [Partition] of the specific `region_id`
|
||||
pub async fn fetch_partition<'a, DB, E>(e: E, region_id: u64) -> Result<Partition>
|
||||
where
|
||||
DB: Database,
|
||||
<DB as HasArguments<'a>>::Arguments: IntoArguments<'a, DB>,
|
||||
for<'c> E: 'a + Executor<'c, Database = DB>,
|
||||
for<'c> u64: Decode<'c, DB> + Type<DB>,
|
||||
for<'c> String: Decode<'c, DB> + Type<DB>,
|
||||
for<'c> u64: Encode<'c, DB> + Type<DB>,
|
||||
for<'c> &'c str: ColumnIndex<<DB as Database>::Row>,
|
||||
{
|
||||
let sql = "select region_id, peer_id as datanode_id from information_schema.region_peers where region_id = ?;";
|
||||
sqlx::query_as::<_, Partition>(sql)
|
||||
.bind(region_id)
|
||||
.fetch_one(e)
|
||||
.await
|
||||
.context(error::ExecuteQuerySnafu { sql })
|
||||
}
|
||||
|
||||
/// Returns all [Partition] of the specific `table`
|
||||
pub async fn fetch_partitions<'a, DB, E>(e: E, table_name: Ident) -> Result<Vec<Partition>>
|
||||
where
|
||||
|
||||
@@ -22,7 +22,7 @@ use super::wait::wait_condition_fn;
|
||||
use crate::error;
|
||||
|
||||
/// Fetches the state of a procedure.
|
||||
pub async fn procedure_state(e: &Pool<MySql>, procedure_id: String) -> String {
|
||||
pub async fn procedure_state(e: &Pool<MySql>, procedure_id: &str) -> String {
|
||||
let sql = format!("select procedure_state(\"{procedure_id}\");");
|
||||
let result = sqlx::query(&sql)
|
||||
.fetch_one(e)
|
||||
@@ -43,7 +43,7 @@ pub async fn wait_for_procedure_finish(
|
||||
|| {
|
||||
let greptime = greptime.clone();
|
||||
let procedure_id = procedure_id.clone();
|
||||
Box::pin(async move { procedure_state(&greptime, procedure_id).await })
|
||||
Box::pin(async move { procedure_state(&greptime, &procedure_id).await })
|
||||
},
|
||||
|output| {
|
||||
info!("Procedure({procedure_id}) state: {:?}", output);
|
||||
|
||||
@@ -43,9 +43,10 @@ use tests_fuzz::translator::mysql::create_expr::CreateTableExprTranslator;
|
||||
use tests_fuzz::translator::mysql::insert_expr::InsertIntoExprTranslator;
|
||||
use tests_fuzz::translator::DslTranslator;
|
||||
use tests_fuzz::utils::cluster_info::{fetch_nodes, PEER_TYPE_DATANODE};
|
||||
use tests_fuzz::utils::migration::{migrate_region, wait_for_region_distribution};
|
||||
use tests_fuzz::utils::partition::{fetch_partitions, region_distribution};
|
||||
use tests_fuzz::utils::procedure::wait_for_procedure_finish;
|
||||
use tests_fuzz::utils::migration::migrate_region;
|
||||
use tests_fuzz::utils::partition::{fetch_partition, fetch_partitions, region_distribution};
|
||||
use tests_fuzz::utils::procedure::procedure_state;
|
||||
use tests_fuzz::utils::wait::wait_condition_fn;
|
||||
use tests_fuzz::utils::{
|
||||
compact_table, flush_memtable, init_greptime_connections_via_env, Connections,
|
||||
};
|
||||
@@ -125,6 +126,7 @@ fn generate_insert_exprs<R: Rng + 'static>(
|
||||
.collect::<Result<Vec<_>>>()
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Migration {
|
||||
from_peer: u64,
|
||||
to_peer: u64,
|
||||
@@ -222,19 +224,31 @@ async fn execute_region_migration(ctx: FuzzContext, input: FuzzInput) -> Result<
|
||||
}
|
||||
info!("Excepted new region distribution: {new_distribution:?}");
|
||||
|
||||
for procedure_id in procedure_ids {
|
||||
wait_for_procedure_finish(&ctx.greptime, Duration::from_secs(120), procedure_id).await;
|
||||
for (migration, procedure_id) in migrations.into_iter().zip(procedure_ids) {
|
||||
info!("Waits for migration: {migration:?}");
|
||||
let region_id = migration.region_id.as_u64();
|
||||
wait_condition_fn(
|
||||
Duration::from_secs(120),
|
||||
|| {
|
||||
let greptime = ctx.greptime.clone();
|
||||
let procedure_id = procedure_id.to_string();
|
||||
Box::pin(async move {
|
||||
{
|
||||
let output = procedure_state(&greptime, &procedure_id).await;
|
||||
info!("Checking procedure: {procedure_id}, output: {output}");
|
||||
fetch_partition(&greptime, region_id).await.unwrap()
|
||||
}
|
||||
})
|
||||
},
|
||||
|partition| {
|
||||
info!("Region: {region_id}, datanode: {}", partition.datanode_id);
|
||||
partition.datanode_id == migration.to_peer
|
||||
},
|
||||
Duration::from_secs(5),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
// Waits for all region migrated
|
||||
wait_for_region_distribution(
|
||||
&ctx.greptime,
|
||||
Duration::from_secs(60),
|
||||
table_ctx.name.clone(),
|
||||
new_distribution,
|
||||
)
|
||||
.await;
|
||||
|
||||
// Values validation
|
||||
info!("Validating rows");
|
||||
let ts_column = table_ctx.timestamp_column().unwrap();
|
||||
|
||||
Reference in New Issue
Block a user