test: add more tests for region migration procedure (#2895)

* test: add flow test for open candidate region with retryable error

* test: add flow test for upgrade candidate retry failed

* test: add flow test for upgrade candidate with retry
This commit is contained in:
Weny Xu
2023-12-13 12:00:58 +09:00
committed by GitHub
parent 9531469660
commit 3c24ca1a7a
4 changed files with 427 additions and 42 deletions

View File

@@ -374,6 +374,7 @@ mod tests {
use super::update_metadata::UpdateMetadata;
use super::*;
use crate::handler::HeartbeatMailbox;
use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion;
use crate::procedure::region_migration::test_util::*;
use crate::service::mailbox::Channel;
@@ -646,4 +647,308 @@ mod tests {
runner.suite.verify_table_metadata().await;
}
#[tokio::test]
async fn test_procedure_flow_open_candidate_region_retryable_error() {
common_telemetry::init_default_ut_logging();
let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
let state = Box::new(RegionMigrationStart);
// The table metadata.
let to_peer_id = persistent_context.to_peer.id;
let from_peer = persistent_context.from_peer.clone();
let region_id = persistent_context.region_id;
let table_info = new_test_table_info(1024, vec![1]).into();
let region_routes = vec![RegionRoute {
region: Region::new_test(region_id),
leader_peer: Some(from_peer),
follower_peers: vec![],
..Default::default()
}];
let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
suite.init_table_metadata(table_info, region_routes).await;
let steps = vec![
// Migration Start
Step::next(
"Should be the open candidate region",
None,
Assertion::simple(assert_open_candidate_region, assert_need_persist),
),
// OpenCandidateRegion
Step::next(
"Should be throwing a non-retry error",
Some(mock_datanode_reply(
to_peer_id,
Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
)),
Assertion::error(|error| assert!(error.is_retryable())),
),
// OpenCandidateRegion
Step::next(
"Should be throwing a non-retry error again",
Some(mock_datanode_reply(
to_peer_id,
Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
)),
Assertion::error(|error| assert!(error.is_retryable())),
),
];
let setup_to_latest_persisted_state = Step::setup(
"Sets state to UpdateMetadata::Downgrade",
merge_before_test_fn(vec![
setup_state(Arc::new(|| Box::new(OpenCandidateRegion))),
Arc::new(reset_volatile_ctx),
]),
);
let steps = [
steps.clone(),
// Mocks the volatile ctx lost(i.g., Meta leader restarts).
vec![setup_to_latest_persisted_state.clone()],
steps.clone()[1..].to_vec(),
vec![setup_to_latest_persisted_state],
steps.clone()[1..].to_vec(),
]
.concat();
// Run the table tests.
let runner = ProcedureMigrationSuiteRunner::new(suite)
.steps(steps.clone())
.run_once()
.await;
let table_routes_version = runner
.env()
.table_metadata_manager()
.table_route_manager()
.get(region_id.table_id())
.await
.unwrap()
.unwrap()
.version();
// Should be unchanged.
assert_eq!(table_routes_version, 0);
}
#[tokio::test]
async fn test_procedure_flow_upgrade_candidate_with_retry_and_failed() {
common_telemetry::init_default_ut_logging();
let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
let state = Box::new(RegionMigrationStart);
// The table metadata.
let from_peer_id = persistent_context.from_peer.id;
let to_peer_id = persistent_context.to_peer.id;
let from_peer = persistent_context.from_peer.clone();
let to_peer = persistent_context.to_peer.clone();
let region_id = persistent_context.region_id;
let table_info = new_test_table_info(1024, vec![1]).into();
let region_routes = vec![RegionRoute {
region: Region::new_test(region_id),
leader_peer: Some(from_peer),
follower_peers: vec![to_peer],
..Default::default()
}];
let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
suite.init_table_metadata(table_info, region_routes).await;
let steps = vec![
// MigrationStart
Step::next(
"Should be the update metadata for downgrading",
None,
Assertion::simple(assert_update_metadata_downgrade, assert_need_persist),
),
// UpdateMetadata::Downgrade
Step::next(
"Should be the downgrade leader region",
None,
Assertion::simple(assert_downgrade_leader_region, assert_no_persist),
),
// Downgrade Candidate
Step::next(
"Should be the upgrade candidate region",
Some(mock_datanode_reply(
from_peer_id,
Arc::new(|id| Ok(new_downgrade_region_reply(id, None, true, None))),
)),
Assertion::simple(assert_upgrade_candidate_region, assert_no_persist),
),
// Upgrade Candidate
Step::next(
"Should be the rollback metadata",
Some(mock_datanode_reply(
to_peer_id,
Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
)),
Assertion::simple(assert_update_metadata_rollback, assert_no_persist),
),
// UpdateMetadata::Rollback
Step::next(
"Should be the region migration abort",
None,
Assertion::simple(assert_region_migration_abort, assert_no_persist),
),
// RegionMigrationAbort
Step::next(
"Should throw an error",
None,
Assertion::error(|error| {
assert!(!error.is_retryable());
assert_matches!(error, error::Error::MigrationAbort { .. });
}),
),
];
let setup_to_latest_persisted_state = Step::setup(
"Sets state to UpdateMetadata::Downgrade",
merge_before_test_fn(vec![
setup_state(Arc::new(|| Box::new(UpdateMetadata::Downgrade))),
Arc::new(reset_volatile_ctx),
]),
);
let steps = [
steps.clone(),
vec![setup_to_latest_persisted_state.clone()],
steps.clone()[1..].to_vec(),
vec![setup_to_latest_persisted_state],
steps.clone()[1..].to_vec(),
]
.concat();
// Run the table tests.
ProcedureMigrationSuiteRunner::new(suite)
.steps(steps.clone())
.run_once()
.await;
}
#[tokio::test]
async fn test_procedure_flow_upgrade_candidate_with_retry() {
common_telemetry::init_default_ut_logging();
let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
let state = Box::new(RegionMigrationStart);
// The table metadata.
let to_peer_id = persistent_context.to_peer.id;
let from_peer_id = persistent_context.from_peer.id;
let from_peer = persistent_context.from_peer.clone();
let region_id = persistent_context.region_id;
let table_info = new_test_table_info(1024, vec![1]).into();
let region_routes = vec![RegionRoute {
region: Region::new_test(region_id),
leader_peer: Some(from_peer),
follower_peers: vec![],
..Default::default()
}];
let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
suite.init_table_metadata(table_info, region_routes).await;
let steps = vec![
// Migration Start
Step::next(
"Should be the open candidate region",
None,
Assertion::simple(assert_open_candidate_region, assert_need_persist),
),
// OpenCandidateRegion
Step::next(
"Should be throwing a retryable error",
Some(mock_datanode_reply(
to_peer_id,
Arc::new(|id| Ok(new_open_region_reply(id, false, None))),
)),
Assertion::error(|error| assert!(error.is_retryable())),
),
// OpenCandidateRegion
Step::next(
"Should be the update metadata for downgrading",
Some(mock_datanode_reply(
to_peer_id,
Arc::new(|id| Ok(new_open_region_reply(id, true, None))),
)),
Assertion::simple(assert_update_metadata_downgrade, assert_no_persist),
),
// UpdateMetadata::Downgrade
Step::next(
"Should be the downgrade leader region",
None,
Assertion::simple(assert_downgrade_leader_region, assert_no_persist),
),
// Downgrade Leader
Step::next(
"Should be the upgrade candidate region",
Some(mock_datanode_reply(
from_peer_id,
merge_mailbox_messages(vec![
Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
Arc::new(|id| Ok(new_downgrade_region_reply(id, None, true, None))),
]),
)),
Assertion::simple(assert_upgrade_candidate_region, assert_no_persist),
),
// Upgrade Candidate
Step::next(
"Should be the update metadata for upgrading",
Some(mock_datanode_reply(
to_peer_id,
merge_mailbox_messages(vec![
Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
Arc::new(|id| Ok(new_upgrade_region_reply(id, true, true, None))),
]),
)),
Assertion::simple(assert_update_metadata_upgrade, assert_no_persist),
),
// UpdateMetadata::Upgrade
Step::next(
"Should be the region migration end",
None,
Assertion::simple(assert_region_migration_end, assert_done),
),
// RegionMigrationEnd
Step::next(
"Should be the region migration end again",
None,
Assertion::simple(assert_region_migration_end, assert_done),
),
];
let setup_to_latest_persisted_state = Step::setup(
"Sets state to OpenCandidateRegion",
merge_before_test_fn(vec![
setup_state(Arc::new(|| Box::new(OpenCandidateRegion))),
Arc::new(reset_volatile_ctx),
]),
);
let steps = [
steps.clone(),
vec![setup_to_latest_persisted_state.clone()],
steps.clone()[1..].to_vec(),
vec![setup_to_latest_persisted_state],
steps.clone()[1..].to_vec(),
]
.concat();
let timer = Instant::now();
// Run the table tests.
let runner = ProcedureMigrationSuiteRunner::new(suite)
.steps(steps.clone())
.run_once()
.await;
// Ensure it didn't run into the slow path.
assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS);
runner.suite.verify_table_metadata().await;
}
}

View File

@@ -201,7 +201,7 @@ mod tests {
}
#[tokio::test]
async fn test_next_downgrade_leader_region_state() {
async fn test_next_update_metadata_downgrade_state() {
let mut state = Box::new(RegionMigrationStart);
// from_peer: 1
// to_peer: 2

View File

@@ -25,9 +25,9 @@ use common_procedure::Status;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use super::update_metadata::UpdateMetadata;
use crate::error::{self, Result};
use crate::handler::HeartbeatMailbox;
use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion;
use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel;
@@ -44,7 +44,7 @@ impl State for OpenCandidateRegion {
self.open_candidate_region(ctx, instruction).await?;
Ok((
Box::<DowngradeLeaderRegion>::default(),
Box::new(UpdateMetadata::Downgrade),
Status::executing(false),
))
}
@@ -112,17 +112,19 @@ impl OpenCandidateRegion {
let region_id = pc.region_id;
let candidate = &pc.to_peer;
// Registers the opening region.
let guard = ctx
.opening_region_keeper
.register(candidate.id, region_id)
.context(error::RegionOpeningRaceSnafu {
peer_id: candidate.id,
region_id,
})?;
debug_assert!(vc.opening_region_guard.is_none());
vc.opening_region_guard = Some(guard);
// This method might be invoked multiple times.
// Only registers the guard if `opening_region_guard` is absent.
if vc.opening_region_guard.is_none() {
// Registers the opening region.
let guard = ctx
.opening_region_keeper
.register(candidate.id, region_id)
.context(error::RegionOpeningRaceSnafu {
peer_id: candidate.id,
region_id,
})?;
vc.opening_region_guard = Some(guard);
}
let msg = MailboxMessage::json_message(
&format!("Open candidate region: {}", region_id),
@@ -180,20 +182,17 @@ impl OpenCandidateRegion {
mod tests {
use std::assert_matches::assert_matches;
use api::v1::meta::mailbox_message::Payload;
use common_catalog::consts::MITO2_ENGINE;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use common_meta::DatanodeId;
use common_time::util::current_time_millis;
use store_api::storage::RegionId;
use super::*;
use crate::error::Error;
use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion;
use crate::procedure::region_migration::test_util::{
self, new_close_region_reply, send_mock_reply, TestingEnv,
self, new_close_region_reply, new_open_region_reply, send_mock_reply, TestingEnv,
};
use crate::procedure::region_migration::{ContextFactory, PersistentContext};
@@ -215,20 +214,6 @@ mod tests {
})
}
fn new_open_region_reply(id: u64, result: bool, error: Option<String>) -> MailboxMessage {
MailboxMessage {
id,
subject: "mock".to_string(),
from: "datanode".to_string(),
to: "meta".to_string(),
timestamp_millis: current_time_millis(),
payload: Some(Payload::Json(
serde_json::to_string(&InstructionReply::OpenRegion(SimpleReply { result, error }))
.unwrap(),
)),
}
}
#[tokio::test]
async fn test_table_info_is_not_found_error() {
let state = OpenCandidateRegion;
@@ -400,7 +385,7 @@ mod tests {
}
#[tokio::test]
async fn test_next_downgrade_leader_region_state() {
async fn test_next_update_metadata_downgrade_state() {
let mut state = Box::new(OpenCandidateRegion);
// from_peer: 1
// to_peer: 2
@@ -441,9 +426,8 @@ mod tests {
(to_peer_id, region_id)
);
let _ = next
.as_any()
.downcast_ref::<DowngradeLeaderRegion>()
.unwrap();
let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
assert_matches!(update_metadata, UpdateMetadata::Downgrade);
}
}

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use std::assert_matches::assert_matches;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use api::v1::meta::mailbox_message::Payload;
@@ -36,12 +37,14 @@ use store_api::storage::RegionId;
use table::metadata::RawTableInfo;
use tokio::sync::mpsc::{Receiver, Sender};
use super::migration_abort::RegionMigrationAbort;
use super::upgrade_candidate_region::UpgradeCandidateRegion;
use super::{Context, ContextFactory, ContextFactoryImpl, State, VolatileContext};
use crate::error::Result;
use crate::error::{self, Error, Result};
use crate::handler::{HeartbeatMailbox, Pusher, Pushers};
use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion;
use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::PersistentContext;
use crate::service::mailbox::{Channel, MailboxRef};
@@ -141,6 +144,25 @@ impl TestingEnv {
}
}
/// Generates a [InstructionReply::OpenRegion] reply.
pub(crate) fn new_open_region_reply(
id: u64,
result: bool,
error: Option<String>,
) -> MailboxMessage {
MailboxMessage {
id,
subject: "mock".to_string(),
from: "datanode".to_string(),
to: "meta".to_string(),
timestamp_millis: current_time_millis(),
payload: Some(Payload::Json(
serde_json::to_string(&InstructionReply::OpenRegion(SimpleReply { result, error }))
.unwrap(),
)),
}
}
/// Generates a [InstructionReply::CloseRegion] reply.
pub fn new_close_region_reply(id: u64) -> MailboxMessage {
MailboxMessage {
@@ -214,9 +236,10 @@ pub fn send_mock_reply(
msg: impl Fn(u64) -> Result<MailboxMessage> + Send + 'static,
) {
common_runtime::spawn_bg(async move {
let resp = rx.recv().await.unwrap().unwrap();
let reply_id = resp.mailbox_message.unwrap().id;
mailbox.on_recv(reply_id, msg(reply_id)).await.unwrap();
while let Some(Ok(resp)) = rx.recv().await {
let reply_id = resp.mailbox_message.unwrap().id;
mailbox.on_recv(reply_id, msg(reply_id)).await.unwrap();
}
});
}
@@ -257,12 +280,16 @@ pub(crate) type StateAssertion = Arc<dyn Fn(&dyn State) + Send + Sync>;
/// Status assertion function.
pub(crate) type StatusAssertion = Arc<dyn Fn(Status) + Send + Sync>;
/// Error assertion function.
pub(crate) type ErrorAssertion = Arc<dyn Fn(Error) + Send + Sync>;
// TODO(weny): Remove it.
#[allow(dead_code)]
/// The type of assertion.
#[derive(Clone)]
pub(crate) enum Assertion {
Simple(StateAssertion, StatusAssertion),
Error(ErrorAssertion),
Custom(CustomAssertion),
}
@@ -277,6 +304,11 @@ impl Assertion {
) -> Self {
Self::Simple(Arc::new(state), Arc::new(status))
}
/// Returns an [Assertion::Error].
pub(crate) fn error<T: Fn(Error) + Send + Sync + 'static>(error_assert: T) -> Self {
Self::Error(Arc::new(error_assert))
}
}
impl ProcedureMigrationTestSuite {
@@ -315,8 +347,12 @@ impl ProcedureMigrationTestSuite {
status_assert(status);
self.state = next;
}
Assertion::Error(error_assert) => {
let error = result.unwrap_err();
error_assert(error);
}
Assertion::Custom(assert_fn) => {
assert_fn(self, result);
assert_fn(self, result).await?;
}
}
@@ -425,6 +461,11 @@ impl ProcedureMigrationSuiteRunner {
self
}
/// Returns [TestingEnv] of [ProcedureMigrationTestSuite].
pub(crate) fn env(&self) -> &TestingEnv {
&self.suite.env
}
}
/// Asserts the [Status] needs to be persistent.
@@ -442,6 +483,11 @@ pub(crate) fn assert_done(status: Status) {
assert_matches!(status, Status::Done)
}
/// Asserts the [State] should be [OpenCandidateRegion].
pub(crate) fn assert_open_candidate_region(next: &dyn State) {
let _ = next.as_any().downcast_ref::<OpenCandidateRegion>().unwrap();
}
/// Asserts the [State] should be [UpdateMetadata::Downgrade].
pub(crate) fn assert_update_metadata_downgrade(next: &dyn State) {
let state = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
@@ -454,11 +500,25 @@ pub(crate) fn assert_update_metadata_upgrade(next: &dyn State) {
assert_matches!(state, UpdateMetadata::Upgrade);
}
/// Asserts the [State] should be [UpdateMetadata::Rollback].
pub(crate) fn assert_update_metadata_rollback(next: &dyn State) {
let state = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
assert_matches!(state, UpdateMetadata::Rollback);
}
/// Asserts the [State] should be [RegionMigrationEnd].
pub(crate) fn assert_region_migration_end(next: &dyn State) {
let _ = next.as_any().downcast_ref::<RegionMigrationEnd>().unwrap();
}
/// Asserts the [State] should be [RegionMigrationAbort].
pub(crate) fn assert_region_migration_abort(next: &dyn State) {
let _ = next
.as_any()
.downcast_ref::<RegionMigrationAbort>()
.unwrap();
}
/// Asserts the [State] should be [DowngradeLeaderRegion].
pub(crate) fn assert_downgrade_leader_region(next: &dyn State) {
let _ = next
@@ -526,3 +586,39 @@ pub(crate) fn merge_before_test_fn(hooks: Vec<BeforeTest>) -> BeforeTest {
})
})
}
/// The factory of [MailboxMessage].
type MailboxMessageFactory = Arc<dyn Fn(u64) -> Result<MailboxMessage> + Send + Sync>;
/// Merges the batch of [MailboxMessageFactory] and all [MailboxMessageFactory] only will be executed once.
pub(crate) fn merge_mailbox_messages(msgs: Vec<MailboxMessageFactory>) -> MailboxMessageFactory {
let counter = Arc::new(AtomicUsize::new(0));
let l = msgs.len();
Arc::new(move |id| {
let cur = counter.fetch_add(1, Ordering::Relaxed) % l;
debug!("Sending message id: {id} use message[{cur}]");
msgs[cur](id)
})
}
#[test]
fn test_merge_mailbox_messages() {
let merged_factory = merge_mailbox_messages(vec![
Arc::new(|_| error::UnexpectedSnafu { violated: "first" }.fail()),
Arc::new(|_| error::UnexpectedSnafu { violated: "second" }.fail()),
]);
if let error::Error::Unexpected { violated, .. } = merged_factory(0).unwrap_err() {
assert_eq!(violated, "first");
} else {
unreachable!()
}
if let error::Error::Unexpected { violated, .. } = merged_factory(0).unwrap_err() {
assert_eq!(violated, "second");
} else {
unreachable!()
}
}