From 3c24ca1a7ad92fc1a440ae41202f9fdb6f3928ca Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Wed, 13 Dec 2023 12:00:58 +0900 Subject: [PATCH] test: add more tests for region migration procedure (#2895) * test: add flow test for open candidate region with retryable error * test: add flow test for upgrade candidate retry failed * test: add flow test for upgrade candidate with retry --- .../src/procedure/region_migration.rs | 305 ++++++++++++++++++ .../region_migration/migration_start.rs | 2 +- .../region_migration/open_candidate_region.rs | 56 ++-- .../procedure/region_migration/test_util.rs | 106 +++++- 4 files changed, 427 insertions(+), 42 deletions(-) diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index 787462cecf..5e935d9328 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -374,6 +374,7 @@ mod tests { use super::update_metadata::UpdateMetadata; use super::*; use crate::handler::HeartbeatMailbox; + use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion; use crate::procedure::region_migration::test_util::*; use crate::service::mailbox::Channel; @@ -646,4 +647,308 @@ mod tests { runner.suite.verify_table_metadata().await; } + + #[tokio::test] + async fn test_procedure_flow_open_candidate_region_retryable_error() { + common_telemetry::init_default_ut_logging(); + + let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1)); + let state = Box::new(RegionMigrationStart); + + // The table metadata. + let to_peer_id = persistent_context.to_peer.id; + let from_peer = persistent_context.from_peer.clone(); + let region_id = persistent_context.region_id; + let table_info = new_test_table_info(1024, vec![1]).into(); + let region_routes = vec![RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(from_peer), + follower_peers: vec![], + ..Default::default() + }]; + + let suite = ProcedureMigrationTestSuite::new(persistent_context, state); + suite.init_table_metadata(table_info, region_routes).await; + + let steps = vec![ + // Migration Start + Step::next( + "Should be the open candidate region", + None, + Assertion::simple(assert_open_candidate_region, assert_need_persist), + ), + // OpenCandidateRegion + Step::next( + "Should be throwing a non-retry error", + Some(mock_datanode_reply( + to_peer_id, + Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()), + )), + Assertion::error(|error| assert!(error.is_retryable())), + ), + // OpenCandidateRegion + Step::next( + "Should be throwing a non-retry error again", + Some(mock_datanode_reply( + to_peer_id, + Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()), + )), + Assertion::error(|error| assert!(error.is_retryable())), + ), + ]; + + let setup_to_latest_persisted_state = Step::setup( + "Sets state to UpdateMetadata::Downgrade", + merge_before_test_fn(vec![ + setup_state(Arc::new(|| Box::new(OpenCandidateRegion))), + Arc::new(reset_volatile_ctx), + ]), + ); + + let steps = [ + steps.clone(), + // Mocks the volatile ctx lost(i.g., Meta leader restarts). + vec![setup_to_latest_persisted_state.clone()], + steps.clone()[1..].to_vec(), + vec![setup_to_latest_persisted_state], + steps.clone()[1..].to_vec(), + ] + .concat(); + + // Run the table tests. + let runner = ProcedureMigrationSuiteRunner::new(suite) + .steps(steps.clone()) + .run_once() + .await; + + let table_routes_version = runner + .env() + .table_metadata_manager() + .table_route_manager() + .get(region_id.table_id()) + .await + .unwrap() + .unwrap() + .version(); + // Should be unchanged. + assert_eq!(table_routes_version, 0); + } + + #[tokio::test] + async fn test_procedure_flow_upgrade_candidate_with_retry_and_failed() { + common_telemetry::init_default_ut_logging(); + + let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1)); + let state = Box::new(RegionMigrationStart); + + // The table metadata. + let from_peer_id = persistent_context.from_peer.id; + let to_peer_id = persistent_context.to_peer.id; + let from_peer = persistent_context.from_peer.clone(); + let to_peer = persistent_context.to_peer.clone(); + let region_id = persistent_context.region_id; + let table_info = new_test_table_info(1024, vec![1]).into(); + let region_routes = vec![RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(from_peer), + follower_peers: vec![to_peer], + ..Default::default() + }]; + + let suite = ProcedureMigrationTestSuite::new(persistent_context, state); + suite.init_table_metadata(table_info, region_routes).await; + + let steps = vec![ + // MigrationStart + Step::next( + "Should be the update metadata for downgrading", + None, + Assertion::simple(assert_update_metadata_downgrade, assert_need_persist), + ), + // UpdateMetadata::Downgrade + Step::next( + "Should be the downgrade leader region", + None, + Assertion::simple(assert_downgrade_leader_region, assert_no_persist), + ), + // Downgrade Candidate + Step::next( + "Should be the upgrade candidate region", + Some(mock_datanode_reply( + from_peer_id, + Arc::new(|id| Ok(new_downgrade_region_reply(id, None, true, None))), + )), + Assertion::simple(assert_upgrade_candidate_region, assert_no_persist), + ), + // Upgrade Candidate + Step::next( + "Should be the rollback metadata", + Some(mock_datanode_reply( + to_peer_id, + Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()), + )), + Assertion::simple(assert_update_metadata_rollback, assert_no_persist), + ), + // UpdateMetadata::Rollback + Step::next( + "Should be the region migration abort", + None, + Assertion::simple(assert_region_migration_abort, assert_no_persist), + ), + // RegionMigrationAbort + Step::next( + "Should throw an error", + None, + Assertion::error(|error| { + assert!(!error.is_retryable()); + assert_matches!(error, error::Error::MigrationAbort { .. }); + }), + ), + ]; + + let setup_to_latest_persisted_state = Step::setup( + "Sets state to UpdateMetadata::Downgrade", + merge_before_test_fn(vec![ + setup_state(Arc::new(|| Box::new(UpdateMetadata::Downgrade))), + Arc::new(reset_volatile_ctx), + ]), + ); + + let steps = [ + steps.clone(), + vec![setup_to_latest_persisted_state.clone()], + steps.clone()[1..].to_vec(), + vec![setup_to_latest_persisted_state], + steps.clone()[1..].to_vec(), + ] + .concat(); + + // Run the table tests. + ProcedureMigrationSuiteRunner::new(suite) + .steps(steps.clone()) + .run_once() + .await; + } + + #[tokio::test] + async fn test_procedure_flow_upgrade_candidate_with_retry() { + common_telemetry::init_default_ut_logging(); + + let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1)); + let state = Box::new(RegionMigrationStart); + + // The table metadata. + let to_peer_id = persistent_context.to_peer.id; + let from_peer_id = persistent_context.from_peer.id; + let from_peer = persistent_context.from_peer.clone(); + let region_id = persistent_context.region_id; + let table_info = new_test_table_info(1024, vec![1]).into(); + let region_routes = vec![RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(from_peer), + follower_peers: vec![], + ..Default::default() + }]; + + let suite = ProcedureMigrationTestSuite::new(persistent_context, state); + suite.init_table_metadata(table_info, region_routes).await; + + let steps = vec![ + // Migration Start + Step::next( + "Should be the open candidate region", + None, + Assertion::simple(assert_open_candidate_region, assert_need_persist), + ), + // OpenCandidateRegion + Step::next( + "Should be throwing a retryable error", + Some(mock_datanode_reply( + to_peer_id, + Arc::new(|id| Ok(new_open_region_reply(id, false, None))), + )), + Assertion::error(|error| assert!(error.is_retryable())), + ), + // OpenCandidateRegion + Step::next( + "Should be the update metadata for downgrading", + Some(mock_datanode_reply( + to_peer_id, + Arc::new(|id| Ok(new_open_region_reply(id, true, None))), + )), + Assertion::simple(assert_update_metadata_downgrade, assert_no_persist), + ), + // UpdateMetadata::Downgrade + Step::next( + "Should be the downgrade leader region", + None, + Assertion::simple(assert_downgrade_leader_region, assert_no_persist), + ), + // Downgrade Leader + Step::next( + "Should be the upgrade candidate region", + Some(mock_datanode_reply( + from_peer_id, + merge_mailbox_messages(vec![ + Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()), + Arc::new(|id| Ok(new_downgrade_region_reply(id, None, true, None))), + ]), + )), + Assertion::simple(assert_upgrade_candidate_region, assert_no_persist), + ), + // Upgrade Candidate + Step::next( + "Should be the update metadata for upgrading", + Some(mock_datanode_reply( + to_peer_id, + merge_mailbox_messages(vec![ + Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()), + Arc::new(|id| Ok(new_upgrade_region_reply(id, true, true, None))), + ]), + )), + Assertion::simple(assert_update_metadata_upgrade, assert_no_persist), + ), + // UpdateMetadata::Upgrade + Step::next( + "Should be the region migration end", + None, + Assertion::simple(assert_region_migration_end, assert_done), + ), + // RegionMigrationEnd + Step::next( + "Should be the region migration end again", + None, + Assertion::simple(assert_region_migration_end, assert_done), + ), + ]; + + let setup_to_latest_persisted_state = Step::setup( + "Sets state to OpenCandidateRegion", + merge_before_test_fn(vec![ + setup_state(Arc::new(|| Box::new(OpenCandidateRegion))), + Arc::new(reset_volatile_ctx), + ]), + ); + + let steps = [ + steps.clone(), + vec![setup_to_latest_persisted_state.clone()], + steps.clone()[1..].to_vec(), + vec![setup_to_latest_persisted_state], + steps.clone()[1..].to_vec(), + ] + .concat(); + + let timer = Instant::now(); + + // Run the table tests. + let runner = ProcedureMigrationSuiteRunner::new(suite) + .steps(steps.clone()) + .run_once() + .await; + + // Ensure it didn't run into the slow path. + assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS); + runner.suite.verify_table_metadata().await; + } } diff --git a/src/meta-srv/src/procedure/region_migration/migration_start.rs b/src/meta-srv/src/procedure/region_migration/migration_start.rs index 80475904e6..9800613929 100644 --- a/src/meta-srv/src/procedure/region_migration/migration_start.rs +++ b/src/meta-srv/src/procedure/region_migration/migration_start.rs @@ -201,7 +201,7 @@ mod tests { } #[tokio::test] - async fn test_next_downgrade_leader_region_state() { + async fn test_next_update_metadata_downgrade_state() { let mut state = Box::new(RegionMigrationStart); // from_peer: 1 // to_peer: 2 diff --git a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs index 450e9b7e44..830c4e22b8 100644 --- a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs @@ -25,9 +25,9 @@ use common_procedure::Status; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; +use super::update_metadata::UpdateMetadata; use crate::error::{self, Result}; use crate::handler::HeartbeatMailbox; -use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion; use crate::procedure::region_migration::{Context, State}; use crate::service::mailbox::Channel; @@ -44,7 +44,7 @@ impl State for OpenCandidateRegion { self.open_candidate_region(ctx, instruction).await?; Ok(( - Box::::default(), + Box::new(UpdateMetadata::Downgrade), Status::executing(false), )) } @@ -112,17 +112,19 @@ impl OpenCandidateRegion { let region_id = pc.region_id; let candidate = &pc.to_peer; - // Registers the opening region. - let guard = ctx - .opening_region_keeper - .register(candidate.id, region_id) - .context(error::RegionOpeningRaceSnafu { - peer_id: candidate.id, - region_id, - })?; - - debug_assert!(vc.opening_region_guard.is_none()); - vc.opening_region_guard = Some(guard); + // This method might be invoked multiple times. + // Only registers the guard if `opening_region_guard` is absent. + if vc.opening_region_guard.is_none() { + // Registers the opening region. + let guard = ctx + .opening_region_keeper + .register(candidate.id, region_id) + .context(error::RegionOpeningRaceSnafu { + peer_id: candidate.id, + region_id, + })?; + vc.opening_region_guard = Some(guard); + } let msg = MailboxMessage::json_message( &format!("Open candidate region: {}", region_id), @@ -180,20 +182,17 @@ impl OpenCandidateRegion { mod tests { use std::assert_matches::assert_matches; - use api::v1::meta::mailbox_message::Payload; use common_catalog::consts::MITO2_ENGINE; use common_meta::key::test_utils::new_test_table_info; use common_meta::peer::Peer; use common_meta::rpc::router::{Region, RegionRoute}; use common_meta::DatanodeId; - use common_time::util::current_time_millis; use store_api::storage::RegionId; use super::*; use crate::error::Error; - use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion; use crate::procedure::region_migration::test_util::{ - self, new_close_region_reply, send_mock_reply, TestingEnv, + self, new_close_region_reply, new_open_region_reply, send_mock_reply, TestingEnv, }; use crate::procedure::region_migration::{ContextFactory, PersistentContext}; @@ -215,20 +214,6 @@ mod tests { }) } - fn new_open_region_reply(id: u64, result: bool, error: Option) -> MailboxMessage { - MailboxMessage { - id, - subject: "mock".to_string(), - from: "datanode".to_string(), - to: "meta".to_string(), - timestamp_millis: current_time_millis(), - payload: Some(Payload::Json( - serde_json::to_string(&InstructionReply::OpenRegion(SimpleReply { result, error })) - .unwrap(), - )), - } - } - #[tokio::test] async fn test_table_info_is_not_found_error() { let state = OpenCandidateRegion; @@ -400,7 +385,7 @@ mod tests { } #[tokio::test] - async fn test_next_downgrade_leader_region_state() { + async fn test_next_update_metadata_downgrade_state() { let mut state = Box::new(OpenCandidateRegion); // from_peer: 1 // to_peer: 2 @@ -441,9 +426,8 @@ mod tests { (to_peer_id, region_id) ); - let _ = next - .as_any() - .downcast_ref::() - .unwrap(); + let update_metadata = next.as_any().downcast_ref::().unwrap(); + + assert_matches!(update_metadata, UpdateMetadata::Downgrade); } } diff --git a/src/meta-srv/src/procedure/region_migration/test_util.rs b/src/meta-srv/src/procedure/region_migration/test_util.rs index 43c3233723..616104fe82 100644 --- a/src/meta-srv/src/procedure/region_migration/test_util.rs +++ b/src/meta-srv/src/procedure/region_migration/test_util.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::assert_matches::assert_matches; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use api::v1::meta::mailbox_message::Payload; @@ -36,12 +37,14 @@ use store_api::storage::RegionId; use table::metadata::RawTableInfo; use tokio::sync::mpsc::{Receiver, Sender}; +use super::migration_abort::RegionMigrationAbort; use super::upgrade_candidate_region::UpgradeCandidateRegion; use super::{Context, ContextFactory, ContextFactoryImpl, State, VolatileContext}; -use crate::error::Result; +use crate::error::{self, Error, Result}; use crate::handler::{HeartbeatMailbox, Pusher, Pushers}; use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion; use crate::procedure::region_migration::migration_end::RegionMigrationEnd; +use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion; use crate::procedure::region_migration::update_metadata::UpdateMetadata; use crate::procedure::region_migration::PersistentContext; use crate::service::mailbox::{Channel, MailboxRef}; @@ -141,6 +144,25 @@ impl TestingEnv { } } +/// Generates a [InstructionReply::OpenRegion] reply. +pub(crate) fn new_open_region_reply( + id: u64, + result: bool, + error: Option, +) -> MailboxMessage { + MailboxMessage { + id, + subject: "mock".to_string(), + from: "datanode".to_string(), + to: "meta".to_string(), + timestamp_millis: current_time_millis(), + payload: Some(Payload::Json( + serde_json::to_string(&InstructionReply::OpenRegion(SimpleReply { result, error })) + .unwrap(), + )), + } +} + /// Generates a [InstructionReply::CloseRegion] reply. pub fn new_close_region_reply(id: u64) -> MailboxMessage { MailboxMessage { @@ -214,9 +236,10 @@ pub fn send_mock_reply( msg: impl Fn(u64) -> Result + Send + 'static, ) { common_runtime::spawn_bg(async move { - let resp = rx.recv().await.unwrap().unwrap(); - let reply_id = resp.mailbox_message.unwrap().id; - mailbox.on_recv(reply_id, msg(reply_id)).await.unwrap(); + while let Some(Ok(resp)) = rx.recv().await { + let reply_id = resp.mailbox_message.unwrap().id; + mailbox.on_recv(reply_id, msg(reply_id)).await.unwrap(); + } }); } @@ -257,12 +280,16 @@ pub(crate) type StateAssertion = Arc; /// Status assertion function. pub(crate) type StatusAssertion = Arc; +/// Error assertion function. +pub(crate) type ErrorAssertion = Arc; + // TODO(weny): Remove it. #[allow(dead_code)] /// The type of assertion. #[derive(Clone)] pub(crate) enum Assertion { Simple(StateAssertion, StatusAssertion), + Error(ErrorAssertion), Custom(CustomAssertion), } @@ -277,6 +304,11 @@ impl Assertion { ) -> Self { Self::Simple(Arc::new(state), Arc::new(status)) } + + /// Returns an [Assertion::Error]. + pub(crate) fn error(error_assert: T) -> Self { + Self::Error(Arc::new(error_assert)) + } } impl ProcedureMigrationTestSuite { @@ -315,8 +347,12 @@ impl ProcedureMigrationTestSuite { status_assert(status); self.state = next; } + Assertion::Error(error_assert) => { + let error = result.unwrap_err(); + error_assert(error); + } Assertion::Custom(assert_fn) => { - assert_fn(self, result); + assert_fn(self, result).await?; } } @@ -425,6 +461,11 @@ impl ProcedureMigrationSuiteRunner { self } + + /// Returns [TestingEnv] of [ProcedureMigrationTestSuite]. + pub(crate) fn env(&self) -> &TestingEnv { + &self.suite.env + } } /// Asserts the [Status] needs to be persistent. @@ -442,6 +483,11 @@ pub(crate) fn assert_done(status: Status) { assert_matches!(status, Status::Done) } +/// Asserts the [State] should be [OpenCandidateRegion]. +pub(crate) fn assert_open_candidate_region(next: &dyn State) { + let _ = next.as_any().downcast_ref::().unwrap(); +} + /// Asserts the [State] should be [UpdateMetadata::Downgrade]. pub(crate) fn assert_update_metadata_downgrade(next: &dyn State) { let state = next.as_any().downcast_ref::().unwrap(); @@ -454,11 +500,25 @@ pub(crate) fn assert_update_metadata_upgrade(next: &dyn State) { assert_matches!(state, UpdateMetadata::Upgrade); } +/// Asserts the [State] should be [UpdateMetadata::Rollback]. +pub(crate) fn assert_update_metadata_rollback(next: &dyn State) { + let state = next.as_any().downcast_ref::().unwrap(); + assert_matches!(state, UpdateMetadata::Rollback); +} + /// Asserts the [State] should be [RegionMigrationEnd]. pub(crate) fn assert_region_migration_end(next: &dyn State) { let _ = next.as_any().downcast_ref::().unwrap(); } +/// Asserts the [State] should be [RegionMigrationAbort]. +pub(crate) fn assert_region_migration_abort(next: &dyn State) { + let _ = next + .as_any() + .downcast_ref::() + .unwrap(); +} + /// Asserts the [State] should be [DowngradeLeaderRegion]. pub(crate) fn assert_downgrade_leader_region(next: &dyn State) { let _ = next @@ -526,3 +586,39 @@ pub(crate) fn merge_before_test_fn(hooks: Vec) -> BeforeTest { }) }) } + +/// The factory of [MailboxMessage]. +type MailboxMessageFactory = Arc Result + Send + Sync>; + +/// Merges the batch of [MailboxMessageFactory] and all [MailboxMessageFactory] only will be executed once. +pub(crate) fn merge_mailbox_messages(msgs: Vec) -> MailboxMessageFactory { + let counter = Arc::new(AtomicUsize::new(0)); + let l = msgs.len(); + + Arc::new(move |id| { + let cur = counter.fetch_add(1, Ordering::Relaxed) % l; + + debug!("Sending message id: {id} use message[{cur}]"); + msgs[cur](id) + }) +} + +#[test] +fn test_merge_mailbox_messages() { + let merged_factory = merge_mailbox_messages(vec![ + Arc::new(|_| error::UnexpectedSnafu { violated: "first" }.fail()), + Arc::new(|_| error::UnexpectedSnafu { violated: "second" }.fail()), + ]); + + if let error::Error::Unexpected { violated, .. } = merged_factory(0).unwrap_err() { + assert_eq!(violated, "first"); + } else { + unreachable!() + } + + if let error::Error::Unexpected { violated, .. } = merged_factory(0).unwrap_err() { + assert_eq!(violated, "second"); + } else { + unreachable!() + } +}