From 4b263ef1ccaf5deba02b14f8a13bb8fb143af23a Mon Sep 17 00:00:00 2001
From: Weny Xu
Date: Fri, 15 Nov 2024 18:55:22 +0800
Subject: [PATCH] fix: obsolete wal entries while opening a migrated region
 (#4993)

* fix: delete obsolete wal entries while opening a migrated region

* chore: add logs

* chore: rust fmt

* fix: fix fuzz test
---
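Note for reviewers: when a datanode re-opens a region it already holds local
state for, the catch-up path may skip memtable replay entirely (empty mutable
memtable and no manifest update). Before this patch that branch returned
without telling the WAL anything, so entries up to the flushed entry id were
never marked obsolete and could resurface on a later replay. A minimal,
self-contained sketch of the fixed control flow follows; every type in it is
a stand-in, not the real mito2 API:

    // Stand-in types; the real change is in src/mito2/src/worker/handle_catchup.rs.
    type RegionId = u64;
    type EntryId = u64;

    struct Provider;
    struct Wal;

    impl Wal {
        // Stand-in for `Wal::on_region_opened`: returns a callback that marks
        // WAL entries up to the flushed entry id as obsolete for the region.
        fn on_region_opened(&self) -> impl Fn(RegionId, EntryId, &Provider) {
            |region_id: RegionId, flushed_entry_id: EntryId, _provider: &Provider| {
                println!("obsolete WAL entries <= {flushed_entry_id} for region {region_id}");
            }
        }
    }

    fn main() {
        let wal = Wal;
        let provider = Provider;
        let (region_id, flushed_entry_id) = (1, 42);
        let (is_mutable_empty, has_update) = (true, false);

        if !is_mutable_empty || has_update {
            // Reopening the region and replaying the memtable already reports
            // the flushed entry id to the WAL on this path.
        } else {
            // Previously this branch only logged a warning and returned; the
            // fix invokes the callback so stale entries are purged here too.
            let on_region_opened = wal.on_region_opened();
            on_region_opened(region_id, flushed_entry_id, &provider);
        }
    }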
 .../src/heartbeat/handler/downgrade_region.rs |   4 +
 .../downgrade_leader_region.rs                |   1 +
 .../region_migration/open_candidate_region.rs |   2 +
 src/mito2/src/worker/handle_catchup.rs        |   6 +-
 .../migration/fuzz_migrate_mito_regions.rs    | 249 +++++++++++-------
 5 files changed, 161 insertions(+), 101 deletions(-)

diff --git a/src/datanode/src/heartbeat/handler/downgrade_region.rs b/src/datanode/src/heartbeat/handler/downgrade_region.rs
index fd85c75ba2..bc7df1a171 100644
--- a/src/datanode/src/heartbeat/handler/downgrade_region.rs
+++ b/src/datanode/src/heartbeat/handler/downgrade_region.rs
@@ -74,6 +74,10 @@ impl HandlerContext {
 
         // Ignores flush request
         if !writable {
+            warn!(
+                "Region: {region_id} is not writable, flush_timeout: {:?}",
+                flush_timeout
+            );
             return self.downgrade_to_follower_gracefully(region_id).await;
         }
 
diff --git a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs
index ec5114b9eb..d2bf6685f1 100644
--- a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs
+++ b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs
@@ -165,6 +165,7 @@ impl DowngradeLeaderRegion {
         match receiver.await? {
             Ok(msg) => {
                 let reply = HeartbeatMailbox::json_reply(&msg)?;
+                info!("Downgrade region reply: {:?}", reply);
                 let InstructionReply::DowngradeRegion(DowngradeRegionReply {
                     last_entry_id,
                     exists,
diff --git a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs
index 3ef485c09a..22b64b0142 100644
--- a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs
+++ b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs
@@ -21,6 +21,7 @@
 use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
 use common_meta::key::datanode_table::RegionInfo;
 use common_meta::RegionIdent;
 use common_procedure::Status;
+use common_telemetry::info;
 use serde::{Deserialize, Serialize};
 use snafu::{OptionExt, ResultExt};
@@ -144,6 +145,7 @@ impl OpenCandidateRegion {
         match receiver.await? {
             Ok(msg) => {
                 let reply = HeartbeatMailbox::json_reply(&msg)?;
+                info!("Received open region reply: {:?}", reply);
                 let InstructionReply::OpenRegion(SimpleReply { result, error }) = reply else {
                     return error::UnexpectedInstructionReplySnafu {
                         mailbox_message: msg.to_string(),
diff --git a/src/mito2/src/worker/handle_catchup.rs b/src/mito2/src/worker/handle_catchup.rs
index cacd563ed7..f0fd6b0550 100644
--- a/src/mito2/src/worker/handle_catchup.rs
+++ b/src/mito2/src/worker/handle_catchup.rs
@@ -49,7 +49,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
         // Utilizes the short circuit evaluation.
         let region = if !is_mutable_empty || region.manifest_ctx.has_update().await? {
             let manifest_version = region.manifest_ctx.manifest_version().await;
-            info!("Reopening the region: {region_id}, empty mutable: {is_mutable_empty}, manifest version: {manifest_version}");
+            let flushed_entry_id = region.version_control.current().last_entry_id;
+            info!("Reopening the region: {region_id}, empty mutable: {is_mutable_empty}, manifest version: {manifest_version}, flushed entry id: {flushed_entry_id}");
             let reopened_region = Arc::new(
                 RegionOpener::new(
                     region_id,
@@ -111,6 +112,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
             }
         } else {
             warn!("Skips to replay memtable for region: {}", region.region_id);
+            let flushed_entry_id = region.version_control.current().last_entry_id;
+            let on_region_opened = self.wal.on_region_opened();
+            on_region_opened(region_id, flushed_entry_id, &region.provider).await?;
         }
 
         if request.set_writable {
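Note for reviewers: the fuzz-target rework below splits the generated inserts
in half, migrates the regions and validates the first half, then inserts the
second half, reverses every migration, and validates again. Migrating back
re-opens each region on a datanode that previously held it, which is exactly
the path that used to leave obsolete WAL entries behind. A rough sketch of
that round-trip shape, using stand-in helpers rather than the real test code:

    // `Migration` mirrors the struct in the fuzz target; `migrate_regions`
    // stands in for the async helper that drives the meta-srv procedures.
    #[derive(Debug, Clone)]
    struct Migration {
        from_peer: u64,
        to_peer: u64,
        region_id: u64,
    }

    fn migrate_regions(migrations: &[Migration]) {
        for m in migrations {
            println!(
                "migrate region {} from {} to {}",
                m.region_id, m.from_peer, m.to_peer
            );
        }
    }

    fn main() {
        let migrations = vec![Migration {
            from_peer: 1,
            to_peer: 2,
            region_id: 100,
        }];

        // Insert the first half of the data, migrate away, validate (elided).
        migrate_regions(&migrations);

        // Insert the second half, then reverse every migration so each region
        // is re-opened on its original datanode; validation after this step
        // would fail if stale WAL entries were replayed there.
        let reversed: Vec<Migration> = migrations
            .iter()
            .map(|m| Migration {
                from_peer: m.to_peer,
                to_peer: m.from_peer,
                region_id: m.region_id,
            })
            .collect();
        migrate_regions(&reversed);
    }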
diff --git a/tests-fuzz/targets/migration/fuzz_migrate_mito_regions.rs b/tests-fuzz/targets/migration/fuzz_migrate_mito_regions.rs
index d8fc102da4..3f15e859c4 100644
--- a/tests-fuzz/targets/migration/fuzz_migrate_mito_regions.rs
+++ b/tests-fuzz/targets/migration/fuzz_migrate_mito_regions.rs
@@ -80,10 +80,10 @@ impl Arbitrary<'_> for FuzzInput {
         let rows = rng.gen_range(128..1024);
         let inserts = rng.gen_range(2..8);
         Ok(FuzzInput {
-            partitions,
-            columns,
-            rows,
             seed,
+            columns,
+            partitions,
+            rows,
             inserts,
         })
     }
@@ -133,22 +133,15 @@ struct Migration {
     region_id: RegionId,
 }
 
-async fn execute_region_migration(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
-    info!("input: {:?}", input);
-    let mut rng = ChaChaRng::seed_from_u64(input.seed);
-
-    let create_expr = generate_create_expr(input, &mut rng)?;
-    let translator = CreateTableExprTranslator;
-    let sql = translator.translate(&create_expr)?;
-    let _result = sqlx::query(&sql)
-        .execute(&ctx.greptime)
-        .await
-        .context(error::ExecuteQuerySnafu { sql: &sql })?;
-
-    let table_ctx = Arc::new(TableContext::from(&create_expr));
+async fn insert_values<R: Rng>(
+    ctx: &FuzzContext,
+    input: FuzzInput,
+    table_ctx: &TableContextRef,
+    rng: &mut R,
+    insert_exprs: &[InsertIntoExpr],
+) -> Result<()> {
     // Inserts data into the table
-    let insert_exprs = generate_insert_exprs(input, &mut rng, table_ctx.clone())?;
-    for insert_expr in &insert_exprs {
+    for insert_expr in insert_exprs {
         let translator = InsertIntoExprTranslator;
         let sql = translator.translate(insert_expr)?;
         let result = ctx
@@ -168,12 +161,125 @@ async fn execute_region_migration(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
             }
         );
         if rng.gen_bool(0.2) {
-            flush_memtable(&ctx.greptime, &create_expr.table_name).await?;
+            flush_memtable(&ctx.greptime, &table_ctx.name).await?;
         }
         if rng.gen_bool(0.1) {
-            compact_table(&ctx.greptime, &create_expr.table_name).await?;
+            compact_table(&ctx.greptime, &table_ctx.name).await?;
         }
     }
+    Ok(())
+}
+
+async fn validate_insert_exprs(
+    ctx: &FuzzContext,
+    table_ctx: &TableContextRef,
+    insert_exprs: &[InsertIntoExpr],
+) -> Result<()> {
+    info!("Validating rows");
+    let ts_column = table_ctx.timestamp_column().unwrap();
+    for (idx, insert_expr) in insert_exprs[0..insert_exprs.len() - 1].iter().enumerate() {
+        let ts_column_idx = insert_expr.timestamp_column_idx().unwrap();
+        let ts_value = insert_expr.values_list[0][ts_column_idx].clone();
+        let next_batch_ts_column_idx = insert_exprs[idx + 1].timestamp_column_idx().unwrap();
+        let next_batch_ts = insert_exprs[idx + 1].values_list[0][next_batch_ts_column_idx].clone();
+
+        let primary_keys_idx = insert_expr.primary_key_column_idx();
+        let column_list = format_columns(&insert_expr.columns);
+        let primary_keys_column_list = format_columns(&insert_expr.primary_key_columns());
+        let select_sql = format!(
+            "SELECT {} FROM {} WHERE {} >= {} AND {} < {} ORDER BY {};",
+            column_list,
+            table_ctx.name,
+            ts_column.name,
+            ts_value,
+            ts_column.name,
+            next_batch_ts,
+            primary_keys_column_list
+        );
+        info!("Executing sql: {select_sql}");
+        let fetched_rows = ctx.greptime.fetch_all(select_sql.as_str()).await.unwrap();
+        let mut expected_rows = replace_default(&insert_expr.values_list, table_ctx, insert_expr);
+        sort_by_primary_keys(&mut expected_rows, primary_keys_idx);
+        validator::row::assert_eq::<MySql>(&insert_expr.columns, &fetched_rows, &expected_rows)?;
+    }
+    let insert_expr = insert_exprs.last().unwrap();
+    let ts_column_idx = insert_expr.timestamp_column_idx().unwrap();
+    let ts_value = insert_expr.values_list[0][ts_column_idx].clone();
+    let primary_keys_idx = insert_expr.primary_key_column_idx();
+    let column_list = format_columns(&insert_expr.columns);
+    let primary_keys_column_list = format_columns(&insert_expr.primary_key_columns());
+    let select_sql = format!(
+        "SELECT {} FROM {} WHERE {} >= {} ORDER BY {};",
+        column_list, table_ctx.name, ts_column.name, ts_value, primary_keys_column_list
+    );
+    info!("Executing sql: {select_sql}");
+    let fetched_rows = ctx.greptime.fetch_all(select_sql.as_str()).await.unwrap();
+    let mut expected_rows = replace_default(&insert_expr.values_list, table_ctx, insert_expr);
+    sort_by_primary_keys(&mut expected_rows, primary_keys_idx);
+    validator::row::assert_eq::<MySql>(&insert_expr.columns, &fetched_rows, &expected_rows)?;
+
+    Ok(())
+}
+
+async fn migrate_regions(ctx: &FuzzContext, migrations: &[Migration]) -> Result<()> {
+    let mut procedure_ids = Vec::with_capacity(migrations.len());
+
+    for Migration {
+        from_peer,
+        to_peer,
+        region_id,
+    } in migrations
+    {
+        let procedure_id =
+            migrate_region(&ctx.greptime, region_id.as_u64(), *from_peer, *to_peer, 240).await;
+        info!("Migrating region: {region_id} from {from_peer} to {to_peer}, procedure: {procedure_id}");
+        procedure_ids.push(procedure_id);
+    }
+
+    for (migration, procedure_id) in migrations.iter().zip(procedure_ids) {
+        info!("Waits for migration: {migration:?}");
+        let region_id = migration.region_id.as_u64();
+        wait_condition_fn(
+            Duration::from_secs(240),
+            || {
+                let greptime = ctx.greptime.clone();
+                let procedure_id = procedure_id.to_string();
+                Box::pin(async move {
+                    {
+                        let output = procedure_state(&greptime, &procedure_id).await;
+                        info!("Checking procedure: {procedure_id}, output: {output}");
+                        fetch_partition(&greptime, region_id).await.unwrap()
+                    }
+                })
+            },
+            |partition| {
+                info!("Region: {region_id}, datanode: {}", partition.datanode_id);
+                partition.datanode_id == migration.to_peer
+            },
+            Duration::from_secs(5),
+        )
+        .await;
+    }
+
+    Ok(())
+}
+
+async fn execute_region_migration(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
+    info!("input: {:?}", input);
+    let mut rng = ChaChaRng::seed_from_u64(input.seed);
+
+    let create_expr = generate_create_expr(input, &mut rng)?;
+    let translator = CreateTableExprTranslator;
+    let sql = translator.translate(&create_expr)?;
+    let _result = sqlx::query(&sql)
+        .execute(&ctx.greptime)
+        .await
+        .context(error::ExecuteQuerySnafu { sql: &sql })?;
+
+    let table_ctx = Arc::new(TableContext::from(&create_expr));
+    let mut insert_exprs = generate_insert_exprs(input, &mut rng, table_ctx.clone())?;
+    let remaining_insert_exprs = insert_exprs.split_off(insert_exprs.len() / 2);
+    insert_values(&ctx, input, &table_ctx, &mut rng, &insert_exprs).await?;
 
     // Fetches region distribution
     let partitions = fetch_partitions(&ctx.greptime, table_ctx.name.clone()).await?;
@@ -209,89 +315,32 @@ async fn execute_region_migration(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
         }
     }
 
-    let mut procedure_ids = Vec::with_capacity(migrations.len());
-    // Triggers region migrations
-    for Migration {
-        from_peer,
-        to_peer,
-        region_id,
-    } in &migrations
-    {
-        let procedure_id =
-            migrate_region(&ctx.greptime, region_id.as_u64(), *from_peer, *to_peer, 240).await;
-        info!("Migrating region: {region_id} from {from_peer} to {to_peer}, procedure: {procedure_id}");
-        procedure_ids.push(procedure_id);
-    }
     info!("Expected new region distribution: {new_distribution:?}");
-
-    for (migration, procedure_id) in migrations.into_iter().zip(procedure_ids) {
-        info!("Waits for migration: {migration:?}");
-        let region_id = migration.region_id.as_u64();
-        wait_condition_fn(
-            Duration::from_secs(240),
-            || {
-                let greptime = ctx.greptime.clone();
-                let procedure_id = procedure_id.to_string();
-                Box::pin(async move {
-                    {
-                        let output = procedure_state(&greptime, &procedure_id).await;
-                        info!("Checking procedure: {procedure_id}, output: {output}");
-                        fetch_partition(&greptime, region_id).await.unwrap()
-                    }
-                })
-            },
-            |partition| {
-                info!("Region: {region_id}, datanode: {}", partition.datanode_id);
-                partition.datanode_id == migration.to_peer
-            },
-            Duration::from_secs(5),
-        )
-        .await;
-    }
-
+    // Triggers region migrations
+    migrate_regions(&ctx, &migrations).await?;
     // Values validation
-    info!("Validating rows");
-    let ts_column = table_ctx.timestamp_column().unwrap();
-    for (idx, insert_expr) in insert_exprs[0..insert_exprs.len() - 1].iter().enumerate() {
-        let ts_column_idx = insert_expr.timestamp_column_idx().unwrap();
-        let ts_value = insert_expr.values_list[0][ts_column_idx].clone();
-        let next_batch_ts_column_idx = insert_exprs[idx + 1].timestamp_column_idx().unwrap();
-        let next_batch_ts = insert_exprs[idx + 1].values_list[0][next_batch_ts_column_idx].clone();
+    validate_insert_exprs(&ctx, &table_ctx, &insert_exprs).await?;
 
-        let primary_keys_idx = insert_expr.primary_key_column_idx();
-        let column_list = format_columns(&insert_expr.columns);
-        let primary_keys_column_list = format_columns(&insert_expr.primary_key_columns());
-        let select_sql = format!(
-            "SELECT {} FROM {} WHERE {} >= {} AND {} < {} ORDER BY {};",
-            column_list,
-            create_expr.table_name,
-            ts_column.name,
-            ts_value,
-            ts_column.name,
-            next_batch_ts,
-            primary_keys_column_list
-        );
-        info!("Executing sql: {select_sql}");
-        let fetched_rows = ctx.greptime.fetch_all(select_sql.as_str()).await.unwrap();
-        let mut expected_rows = replace_default(&insert_expr.values_list, &table_ctx, insert_expr);
-        sort_by_primary_keys(&mut expected_rows, primary_keys_idx);
-        validator::row::assert_eq::<MySql>(&insert_expr.columns, &fetched_rows, &expected_rows)?;
-    }
-    let insert_expr = insert_exprs.last().unwrap();
-    let ts_column_idx = insert_expr.timestamp_column_idx().unwrap();
-    let ts_value = insert_expr.values_list[0][ts_column_idx].clone();
-    let primary_keys_idx = insert_expr.primary_key_column_idx();
-    let column_list = format_columns(&insert_expr.columns);
-    let primary_keys_column_list = format_columns(&insert_expr.primary_key_columns());
-    let select_sql = format!(
-        "SELECT {} FROM {} WHERE {} >= {} ORDER BY {};",
-        column_list, create_expr.table_name, ts_column.name, ts_value, primary_keys_column_list
-    );
-    info!("Executing sql: {select_sql}");
-    let fetched_rows = ctx.greptime.fetch_all(select_sql.as_str()).await.unwrap();
-    let mut expected_rows = replace_default(&insert_expr.values_list, &table_ctx, insert_expr);
-    sort_by_primary_keys(&mut expected_rows, primary_keys_idx);
-    validator::row::assert_eq::<MySql>(&insert_expr.columns, &fetched_rows, &expected_rows)?;
+    insert_values(&ctx, input, &table_ctx, &mut rng, &remaining_insert_exprs).await?;
+    // Recovers region distribution
+    let migrations = migrations
+        .into_iter()
+        .map(
+            |Migration {
+                 from_peer,
+                 to_peer,
+                 region_id,
+             }| Migration {
+                from_peer: to_peer,
+                to_peer: from_peer,
+                region_id,
+            },
+        )
+        .collect::<Vec<_>>();
+    // Triggers region migrations
+    migrate_regions(&ctx, &migrations).await?;
+    // Values validation
+    validate_insert_exprs(&ctx, &table_ctx, &remaining_insert_exprs).await?;
 
     // Cleans up
     let sql = format!("DROP TABLE {}", table_ctx.name);