mirror of https://github.com/GreptimeTeam/greptimedb.git
fix: obsolete wal entries while opening a migrated region (#4993)

* fix: delete obsolete wal entries while opening a migrated region
* chore: add logs
* chore: rust fmt
* fix: fix fuzz test
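The WAL-side change itself is not part of the excerpt below, which only covers the fuzz-test rework. As a rough illustration of the fix described by the title, a minimal sketch in Rust, where `LogStore`, `RegionOpener`, `obsolete`, and `flushed_entry_id` are hypothetical stand-ins rather than GreptimeDB's actual API:

// Sketch only: all names below are illustrative stand-ins, not the real API.
trait LogStore {
    /// Marks every WAL entry of `region_id` up to and including `entry_id`
    /// as obsolete so the log store may purge them later.
    fn obsolete(&self, region_id: u64, entry_id: u64) -> Result<(), String>;
}

struct RegionOpener {
    region_id: u64,
    /// Last entry id known to be flushed to SSTs, recovered from the manifest.
    flushed_entry_id: u64,
}

impl RegionOpener {
    /// When a migrated region is opened on its new datanode, entries at or
    /// below the flushed entry id are stale and can be dropped from the WAL.
    fn open(&self, wal: &impl LogStore) -> Result<(), String> {
        wal.obsolete(self.region_id, self.flushed_entry_id)
    }
}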
@@ -80,10 +80,10 @@ impl Arbitrary<'_> for FuzzInput {
         let rows = rng.gen_range(128..1024);
+        let inserts = rng.gen_range(2..8);
         Ok(FuzzInput {
-            partitions,
-            columns,
-            rows,
             seed,
+            columns,
+            partitions,
+            rows,
+            inserts,
         })
     }
@@ -133,22 +133,15 @@ struct Migration {
     region_id: RegionId,
 }
 
-async fn execute_region_migration(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
-    info!("input: {:?}", input);
-    let mut rng = ChaChaRng::seed_from_u64(input.seed);
-
-    let create_expr = generate_create_expr(input, &mut rng)?;
-    let translator = CreateTableExprTranslator;
-    let sql = translator.translate(&create_expr)?;
-    let _result = sqlx::query(&sql)
-        .execute(&ctx.greptime)
-        .await
-        .context(error::ExecuteQuerySnafu { sql: &sql })?;
-
-    let table_ctx = Arc::new(TableContext::from(&create_expr));
+async fn insert_values<R: Rng + 'static>(
+    ctx: &FuzzContext,
+    input: FuzzInput,
+    table_ctx: &TableContextRef,
+    rng: &mut R,
+    insert_exprs: &[InsertIntoExpr],
+) -> Result<()> {
     // Inserts data into the table
-    let insert_exprs = generate_insert_exprs(input, &mut rng, table_ctx.clone())?;
-    for insert_expr in &insert_exprs {
+    for insert_expr in insert_exprs {
         let translator = InsertIntoExprTranslator;
         let sql = translator.translate(insert_expr)?;
         let result = ctx
@@ -168,12 +161,125 @@ async fn execute_region_migration(ctx: FuzzContext, input: FuzzInput) -> Result<
             }
         );
         if rng.gen_bool(0.2) {
-            flush_memtable(&ctx.greptime, &create_expr.table_name).await?;
+            flush_memtable(&ctx.greptime, &table_ctx.name).await?;
         }
         if rng.gen_bool(0.1) {
-            compact_table(&ctx.greptime, &create_expr.table_name).await?;
+            compact_table(&ctx.greptime, &table_ctx.name).await?;
         }
     }
+    Ok(())
+}
+
+async fn validate_insert_exprs(
+    ctx: &FuzzContext,
+    table_ctx: &TableContextRef,
+    insert_exprs: &[InsertIntoExpr],
+) -> Result<()> {
+    info!("Validating rows");
+    let ts_column = table_ctx.timestamp_column().unwrap();
+    for (idx, insert_expr) in insert_exprs[0..insert_exprs.len() - 1].iter().enumerate() {
+        let ts_column_idx = insert_expr.timestamp_column_idx().unwrap();
+        let ts_value = insert_expr.values_list[0][ts_column_idx].clone();
+        let next_batch_ts_column_idx = insert_exprs[idx + 1].timestamp_column_idx().unwrap();
+        let next_batch_ts = insert_exprs[idx + 1].values_list[0][next_batch_ts_column_idx].clone();
+
+        let primary_keys_idx = insert_expr.primary_key_column_idx();
+        let column_list = format_columns(&insert_expr.columns);
+        let primary_keys_column_list = format_columns(&insert_expr.primary_key_columns());
+        let select_sql = format!(
+            "SELECT {} FROM {} WHERE {} >= {} AND {} < {} ORDER BY {};",
+            column_list,
+            table_ctx.name,
+            ts_column.name,
+            ts_value,
+            ts_column.name,
+            next_batch_ts,
+            primary_keys_column_list
+        );
+        info!("Executing sql: {select_sql}");
+        let fetched_rows = ctx.greptime.fetch_all(select_sql.as_str()).await.unwrap();
+        let mut expected_rows = replace_default(&insert_expr.values_list, table_ctx, insert_expr);
+        sort_by_primary_keys(&mut expected_rows, primary_keys_idx);
+        validator::row::assert_eq::<MySql>(&insert_expr.columns, &fetched_rows, &expected_rows)?;
+    }
+    let insert_expr = insert_exprs.last().unwrap();
+    let ts_column_idx = insert_expr.timestamp_column_idx().unwrap();
+    let ts_value = insert_expr.values_list[0][ts_column_idx].clone();
+    let primary_keys_idx = insert_expr.primary_key_column_idx();
+    let column_list = format_columns(&insert_expr.columns);
+    let primary_keys_column_list = format_columns(&insert_expr.primary_key_columns());
+    let select_sql = format!(
+        "SELECT {} FROM {} WHERE {} >= {} ORDER BY {};",
+        column_list, table_ctx.name, ts_column.name, ts_value, primary_keys_column_list
+    );
+    info!("Executing sql: {select_sql}");
+    let fetched_rows = ctx.greptime.fetch_all(select_sql.as_str()).await.unwrap();
+    let mut expected_rows = replace_default(&insert_expr.values_list, table_ctx, insert_expr);
+    sort_by_primary_keys(&mut expected_rows, primary_keys_idx);
+    validator::row::assert_eq::<MySql>(&insert_expr.columns, &fetched_rows, &expected_rows)?;
+
+    Ok(())
+}
+
+async fn migrate_regions(ctx: &FuzzContext, migrations: &[Migration]) -> Result<()> {
+    let mut procedure_ids = Vec::with_capacity(migrations.len());
+
+    for Migration {
+        from_peer,
+        to_peer,
+        region_id,
+    } in migrations
+    {
+        let procedure_id =
+            migrate_region(&ctx.greptime, region_id.as_u64(), *from_peer, *to_peer, 240).await;
+        info!("Migrating region: {region_id} from {from_peer} to {to_peer}, procedure: {procedure_id}");
+        procedure_ids.push(procedure_id);
+    }
+
+    for (migration, procedure_id) in migrations.iter().zip(procedure_ids) {
+        info!("Waits for migration: {migration:?}");
+        let region_id = migration.region_id.as_u64();
+        wait_condition_fn(
+            Duration::from_secs(240),
+            || {
+                let greptime = ctx.greptime.clone();
+                let procedure_id = procedure_id.to_string();
+                Box::pin(async move {
+                    {
+                        let output = procedure_state(&greptime, &procedure_id).await;
+                        info!("Checking procedure: {procedure_id}, output: {output}");
+                        fetch_partition(&greptime, region_id).await.unwrap()
+                    }
+                })
+            },
+            |partition| {
+                info!("Region: {region_id}, datanode: {}", partition.datanode_id);
+                partition.datanode_id == migration.to_peer
+            },
+            Duration::from_secs(5),
+        )
+        .await;
+    }
+
+    Ok(())
+}
+
+async fn execute_region_migration(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
+    info!("input: {:?}", input);
+    let mut rng = ChaChaRng::seed_from_u64(input.seed);
+
+    let create_expr = generate_create_expr(input, &mut rng)?;
+    let translator = CreateTableExprTranslator;
+    let sql = translator.translate(&create_expr)?;
+    let _result = sqlx::query(&sql)
+        .execute(&ctx.greptime)
+        .await
+        .context(error::ExecuteQuerySnafu { sql: &sql })?;
+
+    let table_ctx = Arc::new(TableContext::from(&create_expr));
+    let mut insert_exprs = generate_insert_exprs(input, &mut rng, table_ctx.clone())?;
+    let remaining_insert_exprs = insert_exprs.split_off(insert_exprs.len() / 2);
+    insert_values(&ctx, input, &table_ctx, &mut rng, &insert_exprs).await?;
 
     // Fetches region distribution
     let partitions = fetch_partitions(&ctx.greptime, table_ctx.name.clone()).await?;
@@ -209,89 +315,32 @@ async fn execute_region_migration(ctx: FuzzContext, input: FuzzInput) -> Result<
         }
     }
-
-    let mut procedure_ids = Vec::with_capacity(migrations.len());
-    // Triggers region migrations
-    for Migration {
-        from_peer,
-        to_peer,
-        region_id,
-    } in &migrations
-    {
-        let procedure_id =
-            migrate_region(&ctx.greptime, region_id.as_u64(), *from_peer, *to_peer, 240).await;
-        info!("Migrating region: {region_id} from {from_peer} to {to_peer}, procedure: {procedure_id}");
-        procedure_ids.push(procedure_id);
-    }
     info!("Excepted new region distribution: {new_distribution:?}");
 
-    for (migration, procedure_id) in migrations.into_iter().zip(procedure_ids) {
-        info!("Waits for migration: {migration:?}");
-        let region_id = migration.region_id.as_u64();
-        wait_condition_fn(
-            Duration::from_secs(240),
-            || {
-                let greptime = ctx.greptime.clone();
-                let procedure_id = procedure_id.to_string();
-                Box::pin(async move {
-                    {
-                        let output = procedure_state(&greptime, &procedure_id).await;
-                        info!("Checking procedure: {procedure_id}, output: {output}");
-                        fetch_partition(&greptime, region_id).await.unwrap()
-                    }
-                })
-            },
-            |partition| {
-                info!("Region: {region_id}, datanode: {}", partition.datanode_id);
-                partition.datanode_id == migration.to_peer
-            },
-            Duration::from_secs(5),
-        )
-        .await;
-    }
-
+    // Triggers region migrations
+    migrate_regions(&ctx, &migrations).await?;
     // Values validation
-    info!("Validating rows");
-    let ts_column = table_ctx.timestamp_column().unwrap();
-    for (idx, insert_expr) in insert_exprs[0..insert_exprs.len() - 1].iter().enumerate() {
-        let ts_column_idx = insert_expr.timestamp_column_idx().unwrap();
-        let ts_value = insert_expr.values_list[0][ts_column_idx].clone();
-        let next_batch_ts_column_idx = insert_exprs[idx + 1].timestamp_column_idx().unwrap();
-        let next_batch_ts = insert_exprs[idx + 1].values_list[0][next_batch_ts_column_idx].clone();
+    validate_insert_exprs(&ctx, &table_ctx, &insert_exprs).await?;
 
-        let primary_keys_idx = insert_expr.primary_key_column_idx();
-        let column_list = format_columns(&insert_expr.columns);
-        let primary_keys_column_list = format_columns(&insert_expr.primary_key_columns());
-        let select_sql = format!(
-            "SELECT {} FROM {} WHERE {} >= {} AND {} < {} ORDER BY {};",
-            column_list,
-            create_expr.table_name,
-            ts_column.name,
-            ts_value,
-            ts_column.name,
-            next_batch_ts,
-            primary_keys_column_list
-        );
-        info!("Executing sql: {select_sql}");
-        let fetched_rows = ctx.greptime.fetch_all(select_sql.as_str()).await.unwrap();
-        let mut expected_rows = replace_default(&insert_expr.values_list, &table_ctx, insert_expr);
-        sort_by_primary_keys(&mut expected_rows, primary_keys_idx);
-        validator::row::assert_eq::<MySql>(&insert_expr.columns, &fetched_rows, &expected_rows)?;
-    }
-    let insert_expr = insert_exprs.last().unwrap();
-    let ts_column_idx = insert_expr.timestamp_column_idx().unwrap();
-    let ts_value = insert_expr.values_list[0][ts_column_idx].clone();
-    let primary_keys_idx = insert_expr.primary_key_column_idx();
-    let column_list = format_columns(&insert_expr.columns);
-    let primary_keys_column_list = format_columns(&insert_expr.primary_key_columns());
-    let select_sql = format!(
-        "SELECT {} FROM {} WHERE {} >= {} ORDER BY {};",
-        column_list, create_expr.table_name, ts_column.name, ts_value, primary_keys_column_list
-    );
-    info!("Executing sql: {select_sql}");
-    let fetched_rows = ctx.greptime.fetch_all(select_sql.as_str()).await.unwrap();
-    let mut expected_rows = replace_default(&insert_expr.values_list, &table_ctx, insert_expr);
-    sort_by_primary_keys(&mut expected_rows, primary_keys_idx);
-    validator::row::assert_eq::<MySql>(&insert_expr.columns, &fetched_rows, &expected_rows)?;
+    insert_values(&ctx, input, &table_ctx, &mut rng, &remaining_insert_exprs).await?;
+    // Recovers region distribution
+    let migrations = migrations
+        .into_iter()
+        .map(
+            |Migration {
+                 from_peer,
+                 to_peer,
+                 region_id,
+             }| Migration {
+                from_peer: to_peer,
+                to_peer: from_peer,
+                region_id,
+            },
+        )
+        .collect::<Vec<_>>();
+    // Triggers region migrations
+    migrate_regions(&ctx, &migrations).await?;
+    // Values validation
+    validate_insert_exprs(&ctx, &table_ctx, &remaining_insert_exprs).await?;
 
     // Cleans up
     let sql = format!("DROP TABLE {}", table_ctx.name);
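For orientation: the reworked test inserts half of the generated batches, migrates the regions, validates, then inserts the remaining half, migrates the regions back, and validates again. The halving relies on `Vec::split_off`; a small self-contained example of that one step (the numeric batches here are illustrative, not the test's real `InsertIntoExpr` values):

fn main() {
    // Mirrors the test's halving: the first half is inserted before the
    // migration, the second half after the regions are migrated back.
    let mut insert_batches: Vec<u32> = (0..7).collect();
    let remaining = insert_batches.split_off(insert_batches.len() / 2);
    assert_eq!(insert_batches, vec![0, 1, 2]); // inserted before migration
    assert_eq!(remaining, vec![3, 4, 5, 6]); // inserted after migrating back
    println!("{insert_batches:?} / {remaining:?}");
}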