test: add tests for region migration procedure (#2857)

* feat: add backward compatibility test for persistent ctx

* refactor: refactor State of region migration

* feat: add test utils for region migration tests

* test: add simple region migration tests

* chore: apply suggestions from CR
This commit is contained in:
Weny Xu
2023-12-08 17:47:09 +09:00
committed by GitHub
parent 7cf9945161
commit 5a99f098c5
13 changed files with 774 additions and 128 deletions

View File

@@ -56,6 +56,14 @@ impl TableRouteValue {
version: self.version + 1,
}
}
/// Returns the version.
///
/// For test purpose.
#[cfg(any(tets, feature = "testing"))]
pub fn version(&self) -> u64 {
self.version
}
}
impl TableMetaKey for TableRouteKey {

View File

@@ -53,7 +53,7 @@ use crate::service::mailbox::{BroadcastChannel, MailboxRef};
/// It will only be updated/stored after the Red node has succeeded.
///
/// **Notes: Stores with too large data in the context might incur replication overhead.**
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PersistentContext {
/// The Id of the cluster.
cluster_id: ClusterId,
@@ -263,14 +263,9 @@ impl Context {
#[async_trait::async_trait]
#[typetag::serde(tag = "region_migration_state")]
trait State: Sync + Send + Debug {
/// Yields the next state.
async fn next(&mut self, ctx: &mut Context) -> Result<Box<dyn State>>;
/// Indicates the procedure execution status of the `State`.
fn status(&self) -> Status {
Status::Executing { persist: true }
}
pub(crate) trait State: Sync + Send + Debug {
/// Yields the next [State] and [Status].
async fn next(&mut self, ctx: &mut Context) -> Result<(Box<dyn State>, Status)>;
/// Returns as [Any](std::any::Any).
fn as_any(&self) -> &dyn Any;
@@ -340,14 +335,16 @@ impl Procedure for RegionMigrationProcedure {
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &mut self.state;
*state = state.next(&mut self.context).await.map_err(|e| {
let (next, status) = state.next(&mut self.context).await.map_err(|e| {
if matches!(e, Error::RetryLater { .. }) {
ProcedureError::retry_later(e)
} else {
ProcedureError::external(e)
}
})?;
Ok(state.status())
*state = next;
Ok(status)
}
fn dump(&self) -> ProcedureResult<String> {
@@ -367,20 +364,21 @@ impl Procedure for RegionMigrationProcedure {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::sync::Arc;
use common_meta::distributed_time_constants::REGION_LEASE_SECS;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::rpc::router::{Region, RegionRoute};
use super::migration_end::RegionMigrationEnd;
use super::update_metadata::UpdateMetadata;
use super::*;
use crate::handler::HeartbeatMailbox;
use crate::procedure::region_migration::test_util::TestingEnv;
use crate::procedure::region_migration::test_util::*;
use crate::service::mailbox::Channel;
fn new_persistent_context() -> PersistentContext {
PersistentContext {
from_peer: Peer::empty(1),
to_peer: Peer::empty(2),
region_id: RegionId::new(1024, 1),
cluster_id: 0,
}
test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
}
#[test]
@@ -414,20 +412,30 @@ mod tests {
assert_eq!(expected, serialized);
}
#[test]
fn test_backward_compatibility() {
let persistent_ctx = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
// NOTES: Changes it will break backward compatibility.
let serialized = r#"{"cluster_id":0,"from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105}"#;
let deserialized: PersistentContext = serde_json::from_str(serialized).unwrap();
assert_eq!(persistent_ctx, deserialized);
}
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct MockState;
#[async_trait::async_trait]
#[typetag::serde]
impl State for MockState {
async fn next(&mut self, ctx: &mut Context) -> Result<Box<dyn State>> {
async fn next(&mut self, ctx: &mut Context) -> Result<(Box<dyn State>, Status)> {
let pc = &mut ctx.persistent_ctx;
if pc.cluster_id == 2 {
Ok(Box::new(RegionMigrationEnd))
Ok((Box::new(RegionMigrationEnd), Status::Done))
} else {
pc.cluster_id += 1;
Ok(Box::new(MockState))
Ok((Box::new(MockState), Status::executing(false)))
}
}
@@ -497,4 +505,145 @@ mod tests {
let instruction = HeartbeatMailbox::json_instruction(&msg).unwrap();
assert_matches!(instruction, Instruction::InvalidateTableIdCache(1024));
}
fn procedure_flow_steps(from_peer_id: u64, to_peer_id: u64) -> Vec<Step> {
vec![
// MigrationStart
Step::next(
"Should be the update metadata for downgrading",
None,
Assertion::simple(assert_update_metadata_downgrade, assert_need_persist),
),
// UpdateMetadata::Downgrade
Step::next(
"Should be the downgrade leader region",
None,
Assertion::simple(assert_downgrade_leader_region, assert_no_persist),
),
// Downgrade Candidate
Step::next(
"Should be the upgrade candidate region",
Some(mock_datanode_reply(
from_peer_id,
Arc::new(|id| Ok(new_downgrade_region_reply(id, None, true, None))),
)),
Assertion::simple(assert_upgrade_candidate_region, assert_no_persist),
),
// Upgrade Candidate
Step::next(
"Should be the update metadata for upgrading",
Some(mock_datanode_reply(
to_peer_id,
Arc::new(|id| Ok(new_upgrade_region_reply(id, true, true, None))),
)),
Assertion::simple(assert_update_metadata_upgrade, assert_no_persist),
),
// UpdateMetadata::Upgrade
Step::next(
"Should be the region migration end",
None,
Assertion::simple(assert_region_migration_end, assert_done),
),
// RegionMigrationEnd
Step::next(
"Should be the region migration end again",
None,
Assertion::simple(assert_region_migration_end, assert_done),
),
]
}
#[tokio::test]
async fn test_procedure_flow() {
common_telemetry::init_default_ut_logging();
let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
let state = Box::new(RegionMigrationStart);
// The table metadata.
let from_peer_id = persistent_context.from_peer.id;
let to_peer_id = persistent_context.to_peer.id;
let from_peer = persistent_context.from_peer.clone();
let to_peer = persistent_context.to_peer.clone();
let region_id = persistent_context.region_id;
let table_info = new_test_table_info(1024, vec![1]).into();
let region_routes = vec![RegionRoute {
region: Region::new_test(region_id),
leader_peer: Some(from_peer),
follower_peers: vec![to_peer],
..Default::default()
}];
let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
suite.init_table_metadata(table_info, region_routes).await;
let steps = procedure_flow_steps(from_peer_id, to_peer_id);
let timer = Instant::now();
// Run the table tests.
let runner = ProcedureMigrationSuiteRunner::new(suite)
.steps(steps)
.run_once()
.await;
// Ensure it didn't run into the slow path.
assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS / 2);
runner.suite.verify_table_metadata().await;
}
#[tokio::test]
async fn test_procedure_flow_idempotent() {
common_telemetry::init_default_ut_logging();
let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
let state = Box::new(RegionMigrationStart);
// The table metadata.
let from_peer_id = persistent_context.from_peer.id;
let to_peer_id = persistent_context.to_peer.id;
let from_peer = persistent_context.from_peer.clone();
let to_peer = persistent_context.to_peer.clone();
let region_id = persistent_context.region_id;
let table_info = new_test_table_info(1024, vec![1]).into();
let region_routes = vec![RegionRoute {
region: Region::new_test(region_id),
leader_peer: Some(from_peer),
follower_peers: vec![to_peer],
..Default::default()
}];
let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
suite.init_table_metadata(table_info, region_routes).await;
let steps = procedure_flow_steps(from_peer_id, to_peer_id);
let setup_to_latest_persisted_state = Step::setup(
"Sets state to UpdateMetadata::Downgrade",
merge_before_test_fn(vec![
setup_state(Arc::new(|| Box::new(UpdateMetadata::Downgrade))),
Arc::new(reset_volatile_ctx),
]),
);
let steps = [
steps.clone(),
vec![setup_to_latest_persisted_state.clone()],
steps.clone()[1..].to_vec(),
vec![setup_to_latest_persisted_state],
steps.clone()[1..].to_vec(),
]
.concat();
let timer = Instant::now();
// Run the table tests.
let runner = ProcedureMigrationSuiteRunner::new(suite)
.steps(steps.clone())
.run_once()
.await;
// Ensure it didn't run into the slow path.
assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS / 2);
runner.suite.verify_table_metadata().await;
}
}

View File

@@ -20,7 +20,8 @@ use common_meta::distributed_time_constants::{MAILBOX_RTT_SECS, REGION_LEASE_SEC
use common_meta::instruction::{
DowngradeRegion, DowngradeRegionReply, Instruction, InstructionReply,
};
use common_telemetry::warn;
use common_procedure::Status;
use common_telemetry::{info, warn};
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use tokio::time::sleep;
@@ -53,18 +54,24 @@ impl Default for DowngradeLeaderRegion {
#[async_trait::async_trait]
#[typetag::serde]
impl State for DowngradeLeaderRegion {
async fn next(&mut self, ctx: &mut Context) -> Result<Box<dyn State>> {
async fn next(&mut self, ctx: &mut Context) -> Result<(Box<dyn State>, Status)> {
// Ensures the `leader_region_lease_deadline` must exist after recovering.
ctx.volatile_ctx
.set_leader_region_lease_deadline(Duration::from_secs(REGION_LEASE_SECS));
self.downgrade_region_with_retry(ctx).await;
// Safety: must exist.
if let Some(deadline) = ctx.volatile_ctx.leader_region_lease_deadline.as_ref() {
info!(
"Running into the downgrade leader slow path, sleep until {:?}",
deadline
);
tokio::time::sleep_until(*deadline).await;
}
Ok(Box::<UpgradeCandidateRegion>::default())
Ok((
Box::<UpgradeCandidateRegion>::default(),
Status::executing(false),
))
}
fn as_any(&self) -> &dyn Any {
@@ -202,16 +209,14 @@ impl DowngradeLeaderRegion {
mod tests {
use std::assert_matches::assert_matches;
use api::v1::meta::mailbox_message::Payload;
use common_meta::peer::Peer;
use common_time::util::current_time_millis;
use store_api::storage::RegionId;
use tokio::time::Instant;
use super::*;
use crate::error::Error;
use crate::procedure::region_migration::test_util::{
new_close_region_reply, send_mock_reply, TestingEnv,
new_close_region_reply, new_downgrade_region_reply, send_mock_reply, TestingEnv,
};
use crate::procedure::region_migration::{ContextFactory, PersistentContext};
@@ -224,29 +229,6 @@ mod tests {
}
}
fn new_downgrade_region_reply(
id: u64,
last_entry_id: Option<u64>,
exist: bool,
error: Option<String>,
) -> MailboxMessage {
MailboxMessage {
id,
subject: "mock".to_string(),
from: "datanode".to_string(),
to: "meta".to_string(),
timestamp_millis: current_time_millis(),
payload: Some(Payload::Json(
serde_json::to_string(&InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id,
exists: exist,
error,
}))
.unwrap(),
)),
}
}
#[tokio::test]
async fn test_datanode_is_unreachable() {
let state = DowngradeLeaderRegion::default();
@@ -504,7 +486,7 @@ mod tests {
});
let timer = Instant::now();
let next = state.next(&mut ctx).await.unwrap();
let (next, _) = state.next(&mut ctx).await.unwrap();
let elapsed = timer.elapsed().as_secs();
assert!(elapsed < REGION_LEASE_SECS / 2);
assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, Some(1));

View File

@@ -37,17 +37,13 @@ impl RegionMigrationAbort {
#[async_trait::async_trait]
#[typetag::serde]
impl State for RegionMigrationAbort {
async fn next(&mut self, _: &mut Context) -> Result<Box<dyn State>> {
async fn next(&mut self, _: &mut Context) -> Result<(Box<dyn State>, Status)> {
error::MigrationAbortSnafu {
reason: &self.reason,
}
.fail()
}
fn status(&self) -> Status {
Status::Done
}
fn as_any(&self) -> &dyn Any {
self
}

View File

@@ -26,12 +26,8 @@ pub struct RegionMigrationEnd;
#[async_trait::async_trait]
#[typetag::serde]
impl State for RegionMigrationEnd {
async fn next(&mut self, _: &mut Context) -> Result<Box<dyn State>> {
Ok(Box::new(RegionMigrationEnd))
}
fn status(&self) -> Status {
Status::Done
async fn next(&mut self, _: &mut Context) -> Result<(Box<dyn State>, Status)> {
Ok((Box::new(RegionMigrationEnd), Status::Done))
}
fn as_any(&self) -> &dyn Any {

View File

@@ -16,16 +16,24 @@ use std::any::Any;
use common_meta::peer::Peer;
use common_meta::rpc::router::RegionRoute;
use common_procedure::Status;
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use store_api::storage::RegionId;
use super::downgrade_leader_region::DowngradeLeaderRegion;
use super::migration_end::RegionMigrationEnd;
use super::open_candidate_region::OpenCandidateRegion;
use super::update_metadata::UpdateMetadata;
use crate::error::{self, Result};
use crate::procedure::region_migration::{Context, State};
/// The behaviors:
///
/// If the expected leader region has been opened on `to_peer`, go to the [RegionMigrationEnd] state.
///
/// If the candidate region has been opened on `to_peer`, go to the [UpdateMetadata::Downgrade] state.
///
/// Otherwise go to the [OpenCandidateRegion] state.
#[derive(Debug, Serialize, Deserialize)]
pub struct RegionMigrationStart;
@@ -34,22 +42,22 @@ pub struct RegionMigrationStart;
impl State for RegionMigrationStart {
/// Yields next [State].
///
/// If the expected leader region has been opened on `to_peer`, go to the MigrationEnd state.
/// If the expected leader region has been opened on `to_peer`, go to the [RegionMigrationEnd] state.
///
/// If the candidate region has been opened on `to_peer`, go to the DowngradeLeader state.
/// If the candidate region has been opened on `to_peer`, go to the [UpdateMetadata::Downgrade] state.
///
/// Otherwise go to the OpenCandidateRegion state.
async fn next(&mut self, ctx: &mut Context) -> Result<Box<dyn State>> {
/// Otherwise go to the [OpenCandidateRegion] state.
async fn next(&mut self, ctx: &mut Context) -> Result<(Box<dyn State>, Status)> {
let region_id = ctx.persistent_ctx.region_id;
let region_route = self.retrieve_region_route(ctx, region_id).await?;
let to_peer = &ctx.persistent_ctx.to_peer;
if self.check_leader_region_on_peer(&region_route, to_peer)? {
Ok(Box::new(RegionMigrationEnd))
Ok((Box::new(RegionMigrationEnd), Status::Done))
} else if self.check_candidate_region_on_peer(&region_route, to_peer) {
Ok(Box::<DowngradeLeaderRegion>::default())
Ok((Box::new(UpdateMetadata::Downgrade), Status::executing(true)))
} else {
Ok(Box::new(OpenCandidateRegion))
Ok((Box::new(OpenCandidateRegion), Status::executing(true)))
}
}
@@ -138,6 +146,7 @@ mod tests {
use super::*;
use crate::error::Error;
use crate::procedure::region_migration::test_util::{self, TestingEnv};
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::{ContextFactory, PersistentContext};
fn new_persistent_context() -> PersistentContext {
@@ -216,12 +225,11 @@ mod tests {
.await
.unwrap();
let next = state.next(&mut ctx).await.unwrap();
let (next, _) = state.next(&mut ctx).await.unwrap();
let _ = next
.as_any()
.downcast_ref::<DowngradeLeaderRegion>()
.unwrap();
let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
assert_matches!(update_metadata, UpdateMetadata::Downgrade);
}
#[tokio::test]
@@ -250,7 +258,7 @@ mod tests {
.await
.unwrap();
let next = state.next(&mut ctx).await.unwrap();
let (next, _) = state.next(&mut ctx).await.unwrap();
let _ = next.as_any().downcast_ref::<RegionMigrationEnd>().unwrap();
}
@@ -277,7 +285,7 @@ mod tests {
.await
.unwrap();
let next = state.next(&mut ctx).await.unwrap();
let (next, _) = state.next(&mut ctx).await.unwrap();
let _ = next.as_any().downcast_ref::<OpenCandidateRegion>().unwrap();
}

View File

@@ -21,6 +21,7 @@ use common_meta::ddl::utils::region_storage_path;
use common_meta::distributed_time_constants::MAILBOX_RTT_SECS;
use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
use common_meta::RegionIdent;
use common_procedure::Status;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
@@ -38,11 +39,14 @@ pub struct OpenCandidateRegion;
#[async_trait::async_trait]
#[typetag::serde]
impl State for OpenCandidateRegion {
async fn next(&mut self, ctx: &mut Context) -> Result<Box<dyn State>> {
async fn next(&mut self, ctx: &mut Context) -> Result<(Box<dyn State>, Status)> {
let instruction = self.build_open_region_instruction(ctx).await?;
self.open_candidate_region(ctx, instruction).await?;
Ok(Box::<DowngradeLeaderRegion>::default())
Ok((
Box::<DowngradeLeaderRegion>::default(),
Status::executing(false),
))
}
fn as_any(&self) -> &dyn Any {
@@ -430,7 +434,7 @@ mod tests {
send_mock_reply(mailbox, rx, |id| Ok(new_open_region_reply(id, true, None)));
let next = state.next(&mut ctx).await.unwrap();
let (next, _) = state.next(&mut ctx).await.unwrap();
let vc = ctx.volatile_ctx;
assert_eq!(
vc.opening_region_guard.unwrap().info(),

View File

@@ -12,24 +12,36 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::assert_matches::assert_matches;
use std::sync::Arc;
use api::v1::meta::mailbox_message::Payload;
use api::v1::meta::{HeartbeatResponse, MailboxMessage, RequestHeader};
use common_meta::instruction::{InstructionReply, SimpleReply};
use common_meta::instruction::{
DowngradeRegionReply, InstructionReply, SimpleReply, UpgradeRegionReply,
};
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::peer::Peer;
use common_meta::rpc::router::RegionRoute;
use common_meta::sequence::Sequence;
use common_procedure::{Context as ProcedureContext, ProcedureId};
use common_meta::DatanodeId;
use common_procedure::{Context as ProcedureContext, ProcedureId, Status};
use common_procedure_test::MockContextProvider;
use common_telemetry::debug;
use common_time::util::current_time_millis;
use futures::future::BoxFuture;
use store_api::storage::RegionId;
use table::metadata::RawTableInfo;
use tokio::sync::mpsc::{Receiver, Sender};
use super::ContextFactoryImpl;
use super::upgrade_candidate_region::UpgradeCandidateRegion;
use super::{Context, ContextFactory, ContextFactoryImpl, State, VolatileContext};
use crate::error::Result;
use crate::handler::{HeartbeatMailbox, Pusher, Pushers};
use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion;
use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::PersistentContext;
use crate::region::lease_keeper::{OpeningRegionKeeper, OpeningRegionKeeperRef};
use crate::service::mailbox::{Channel, MailboxRef};
@@ -147,11 +159,59 @@ pub fn new_close_region_reply(id: u64) -> MailboxMessage {
}
}
/// Generates a [InstructionReply::DowngradeRegion] reply.
pub fn new_downgrade_region_reply(
id: u64,
last_entry_id: Option<u64>,
exist: bool,
error: Option<String>,
) -> MailboxMessage {
MailboxMessage {
id,
subject: "mock".to_string(),
from: "datanode".to_string(),
to: "meta".to_string(),
timestamp_millis: current_time_millis(),
payload: Some(Payload::Json(
serde_json::to_string(&InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id,
exists: exist,
error,
}))
.unwrap(),
)),
}
}
/// Generates a [InstructionReply::UpgradeRegion] reply.
pub fn new_upgrade_region_reply(
id: u64,
ready: bool,
exists: bool,
error: Option<String>,
) -> MailboxMessage {
MailboxMessage {
id,
subject: "mock".to_string(),
from: "datanode".to_string(),
to: "meta".to_string(),
timestamp_millis: current_time_millis(),
payload: Some(Payload::Json(
serde_json::to_string(&InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready,
exists,
error,
}))
.unwrap(),
)),
}
}
/// Sends a mock reply.
pub fn send_mock_reply(
mailbox: MailboxRef,
mut rx: MockHeartbeatReceiver,
msg: impl FnOnce(u64) -> Result<MailboxMessage> + Send + 'static,
msg: impl Fn(u64) -> Result<MailboxMessage> + Send + 'static,
) {
common_runtime::spawn_bg(async move {
let resp = rx.recv().await.unwrap().unwrap();
@@ -169,3 +229,300 @@ pub fn new_persistent_context(from: u64, to: u64, region_id: RegionId) -> Persis
cluster_id: 0,
}
}
/// The test suite for region migration procedure.
pub(crate) struct ProcedureMigrationTestSuite {
pub(crate) env: TestingEnv,
context: Context,
state: Box<dyn State>,
}
/// The hook is called before the test starts.
pub(crate) type BeforeTest =
Arc<dyn Fn(&mut ProcedureMigrationTestSuite) -> BoxFuture<'_, ()> + Send + Sync>;
/// Custom assertion.
pub(crate) type CustomAssertion = Arc<
dyn Fn(
&mut ProcedureMigrationTestSuite,
Result<(Box<dyn State>, Status)>,
) -> BoxFuture<'_, Result<()>>
+ Send
+ Sync,
>;
/// State assertion function.
pub(crate) type StateAssertion = Arc<dyn Fn(&dyn State) + Send + Sync>;
/// Status assertion function.
pub(crate) type StatusAssertion = Arc<dyn Fn(Status) + Send + Sync>;
// TODO(weny): Remove it.
#[allow(dead_code)]
/// The type of assertion.
#[derive(Clone)]
pub(crate) enum Assertion {
Simple(StateAssertion, StatusAssertion),
Custom(CustomAssertion),
}
impl Assertion {
/// Returns an [Assertion::Simple].
pub(crate) fn simple<
T: Fn(&dyn State) + Send + Sync + 'static,
U: Fn(Status) + Send + Sync + 'static,
>(
state: T,
status: U,
) -> Self {
Self::Simple(Arc::new(state), Arc::new(status))
}
}
impl ProcedureMigrationTestSuite {
/// Returns a [ProcedureMigrationTestSuite].
pub(crate) fn new(persistent_ctx: PersistentContext, start: Box<dyn State>) -> Self {
let env = TestingEnv::new();
let context = env.context_factory().new_context(persistent_ctx);
Self {
env,
context,
state: start,
}
}
/// Mocks the `next` of [State] is called.
pub(crate) async fn next(
&mut self,
name: &str,
before: Option<BeforeTest>,
assertion: Assertion,
) -> Result<()> {
debug!("suite test: {name}");
if let Some(before) = before {
before(self).await;
}
debug!("suite test: {name} invoking next");
let result = self.state.next(&mut self.context).await;
match assertion {
Assertion::Simple(state_assert, status_assert) => {
let (next, status) = result?;
state_assert(&*next);
status_assert(status);
self.state = next;
}
Assertion::Custom(assert_fn) => {
assert_fn(self, result);
}
}
Ok(())
}
/// Initializes table metadata.
pub(crate) async fn init_table_metadata(
&self,
table_info: RawTableInfo,
region_routes: Vec<RegionRoute>,
) {
self.env
.table_metadata_manager()
.create_table_metadata(table_info, region_routes)
.await
.unwrap();
}
/// Verifies table metadata after region migration.
pub(crate) async fn verify_table_metadata(&self) {
let region_id = self.context.persistent_ctx.region_id;
let region_routes = self
.env
.table_metadata_manager
.table_route_manager()
.get(region_id.table_id())
.await
.unwrap()
.unwrap()
.into_inner()
.region_routes;
let expected_leader_id = self.context.persistent_ctx.to_peer.id;
let removed_follower_id = self.context.persistent_ctx.from_peer.id;
let region_route = region_routes
.into_iter()
.find(|route| route.region.id == region_id)
.unwrap();
assert!(!region_route.is_leader_downgraded());
assert_eq!(region_route.leader_peer.unwrap().id, expected_leader_id);
assert!(!region_route
.follower_peers
.into_iter()
.any(|route| route.id == removed_follower_id))
}
}
/// The step of test.
#[derive(Clone)]
pub enum Step {
Setup((String, BeforeTest)),
Next((String, Option<BeforeTest>, Assertion)),
}
impl Step {
/// Returns the [Step::Setup].
pub(crate) fn setup(name: &str, before: BeforeTest) -> Self {
Self::Setup((name.to_string(), before))
}
/// Returns the [Step::Next].
pub(crate) fn next(name: &str, before: Option<BeforeTest>, assertion: Assertion) -> Self {
Self::Next((name.to_string(), before, assertion))
}
}
/// The test runner of [ProcedureMigrationTestSuite].
pub(crate) struct ProcedureMigrationSuiteRunner {
pub(crate) suite: ProcedureMigrationTestSuite,
steps: Vec<Step>,
}
impl ProcedureMigrationSuiteRunner {
/// Returns the [ProcedureMigrationSuiteRunner]
pub(crate) fn new(suite: ProcedureMigrationTestSuite) -> Self {
Self {
suite,
steps: vec![],
}
}
/// Sets [Step]s .
pub(crate) fn steps(self, steps: Vec<Step>) -> Self {
Self {
suite: self.suite,
steps,
}
}
/// Consumes all steps and runs once.
pub(crate) async fn run_once(mut self) -> Self {
for step in self.steps.drain(..) {
match step {
Step::Setup((name, before)) => {
debug!("Running the before hook: {name}");
before(&mut self.suite).await;
}
Step::Next((name, before, assertion)) => {
self.suite.next(&name, before, assertion).await.unwrap();
}
}
}
self
}
}
/// Asserts the [Status] needs to be persistent.
pub(crate) fn assert_need_persist(status: Status) {
assert!(status.need_persist());
}
/// Asserts the [Status] doesn't need to be persistent.
pub(crate) fn assert_no_persist(status: Status) {
assert!(!status.need_persist());
}
/// Asserts the [Status] should be [Status::Done].
pub(crate) fn assert_done(status: Status) {
assert_matches!(status, Status::Done)
}
/// Asserts the [State] should be [UpdateMetadata::Downgrade].
pub(crate) fn assert_update_metadata_downgrade(next: &dyn State) {
let state = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
assert_matches!(state, UpdateMetadata::Downgrade);
}
/// Asserts the [State] should be [UpdateMetadata::Upgrade].
pub(crate) fn assert_update_metadata_upgrade(next: &dyn State) {
let state = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
assert_matches!(state, UpdateMetadata::Upgrade);
}
/// Asserts the [State] should be [RegionMigrationEnd].
pub(crate) fn assert_region_migration_end(next: &dyn State) {
let _ = next.as_any().downcast_ref::<RegionMigrationEnd>().unwrap();
}
/// Asserts the [State] should be [DowngradeLeaderRegion].
pub(crate) fn assert_downgrade_leader_region(next: &dyn State) {
let _ = next
.as_any()
.downcast_ref::<DowngradeLeaderRegion>()
.unwrap();
}
/// Asserts the [State] should be [UpgradeCandidateRegion].
pub(crate) fn assert_upgrade_candidate_region(next: &dyn State) {
let _ = next
.as_any()
.downcast_ref::<UpgradeCandidateRegion>()
.unwrap();
}
/// Mocks the reply from the datanode.
pub(crate) fn mock_datanode_reply(
peer_id: DatanodeId,
msg: Arc<dyn Fn(u64) -> Result<MailboxMessage> + Send + Sync>,
) -> BeforeTest {
Arc::new(move |suite| {
let msg_moved = msg.clone();
Box::pin(async move {
let mailbox_ctx = suite.env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone();
let (tx, rx) = tokio::sync::mpsc::channel(1);
mailbox_ctx
.insert_heartbeat_response_receiver(Channel::Datanode(peer_id), tx)
.await;
send_mock_reply(mailbox, rx, move |id| msg_moved(id));
})
})
}
/// Setups the [State] of the [ProcedureMigrationTestSuite].
pub(crate) fn setup_state(
state_factory: Arc<dyn Fn() -> Box<dyn State> + Send + Sync>,
) -> BeforeTest {
Arc::new(move |suite| {
let factory_moved = state_factory.clone();
Box::pin(async move {
suite.state = factory_moved();
})
})
}
/// Setups the [VolatileContext] of the [Context].
pub(crate) fn reset_volatile_ctx(suite: &mut ProcedureMigrationTestSuite) -> BoxFuture<'_, ()> {
Box::pin(async {
suite.context.volatile_ctx = VolatileContext::default();
})
}
/// Merges the batch of [BeforeTest].
pub(crate) fn merge_before_test_fn(hooks: Vec<BeforeTest>) -> BeforeTest {
Arc::new(move |suite| {
let hooks_moved = hooks.clone();
Box::pin(async move {
for hook in hooks_moved {
hook(suite).await;
}
})
})
}

View File

@@ -18,6 +18,7 @@ pub(crate) mod upgrade_candidate_region;
use std::any::Any;
use common_procedure::Status;
use common_telemetry::warn;
use serde::{Deserialize, Serialize};
@@ -41,12 +42,15 @@ pub enum UpdateMetadata {
#[async_trait::async_trait]
#[typetag::serde]
impl State for UpdateMetadata {
async fn next(&mut self, ctx: &mut Context) -> Result<Box<dyn State>> {
async fn next(&mut self, ctx: &mut Context) -> Result<(Box<dyn State>, Status)> {
match self {
UpdateMetadata::Downgrade => {
self.downgrade_leader_region(ctx).await?;
Ok(Box::<DowngradeLeaderRegion>::default())
Ok((
Box::<DowngradeLeaderRegion>::default(),
Status::executing(false),
))
}
UpdateMetadata::Upgrade => {
self.upgrade_candidate_region(ctx).await?;
@@ -54,7 +58,7 @@ impl State for UpdateMetadata {
if let Err(err) = ctx.invalidate_table_cache().await {
warn!("Failed to broadcast the invalidate table cache message during the upgrade candidate, error: {err:?}");
};
Ok(Box::new(RegionMigrationEnd))
Ok((Box::new(RegionMigrationEnd), Status::Done))
}
UpdateMetadata::Rollback => {
self.rollback_downgraded_region(ctx).await?;
@@ -62,9 +66,12 @@ impl State for UpdateMetadata {
if let Err(err) = ctx.invalidate_table_cache().await {
warn!("Failed to broadcast the invalidate table cache message during the rollback, error: {err:?}");
};
Ok(Box::new(RegionMigrationAbort::new(
"Failed to upgrade the candidate region.",
)))
Ok((
Box::new(RegionMigrationAbort::new(
"Failed to upgrade the candidate region.",
)),
Status::executing(false),
))
}
}
}

View File

@@ -38,13 +38,19 @@ impl UpdateMetadata {
/// - There is no other DDL procedure executed concurrently for the current table.
pub async fn downgrade_leader_region(&self, ctx: &mut Context) -> Result<()> {
let table_metadata_manager = ctx.table_metadata_manager.clone();
let from_peer_id = ctx.persistent_ctx.from_peer.id;
let region_id = ctx.region_id();
let table_id = region_id.table_id();
let current_table_route_value = ctx.get_table_route_value().await?;
if let Err(err) = table_metadata_manager
.update_leader_region_status(table_id, current_table_route_value, |route| {
if route.region.id == region_id {
if route.region.id == region_id
&& route
.leader_peer
.as_ref()
.is_some_and(|leader_peer| leader_peer.id == from_peer_id)
{
Some(Some(RegionStatus::Downgraded))
} else {
None
@@ -167,6 +173,48 @@ mod tests {
assert!(err.to_string().contains("Failed to update the table route"));
}
#[tokio::test]
async fn test_only_downgrade_from_peer() {
let mut state = Box::new(UpdateMetadata::Downgrade);
let persistent_context = new_persistent_context();
let env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let table_id = ctx.region_id().table_id();
let table_info = new_test_table_info(1024, vec![1, 2]).into();
let region_routes = vec![RegionRoute {
region: Region::new_test(RegionId::new(1024, 1)),
leader_peer: Some(Peer::empty(1024)),
..Default::default()
}];
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes)
.await
.unwrap();
let (next, _) = state.next(&mut ctx).await.unwrap();
let _ = next
.as_any()
.downcast_ref::<DowngradeLeaderRegion>()
.unwrap();
let latest_table_route = table_metadata_manager
.table_route_manager()
.get(table_id)
.await
.unwrap()
.unwrap();
// It should remain unchanged.
assert_eq!(latest_table_route.version(), 0);
assert!(!latest_table_route.region_routes[0].is_leader_downgraded());
assert!(ctx.volatile_ctx.table_route.is_none());
}
#[tokio::test]
async fn test_next_downgrade_leader_region_state() {
let mut state = Box::new(UpdateMetadata::Downgrade);
@@ -190,7 +238,7 @@ mod tests {
.await
.unwrap();
let next = state.next(&mut ctx).await.unwrap();
let (next, _) = state.next(&mut ctx).await.unwrap();
let _ = next
.as_any()

View File

@@ -219,7 +219,7 @@ mod tests {
.await
.unwrap();
let next = state.next(&mut ctx).await.unwrap();
let (next, _) = state.next(&mut ctx).await.unwrap();
let _ = next
.as_any()

View File

@@ -76,6 +76,40 @@ impl UpdateMetadata {
Ok(region_routes)
}
/// Returns true if region metadata has been updated.
async fn check_metadata_updated(&self, ctx: &mut Context) -> Result<bool> {
let region_id = ctx.region_id();
let table_route_value = ctx.get_table_route_value().await?.clone();
let region_routes = table_route_value.region_routes.clone();
let region_route = region_routes
.into_iter()
.find(|route| route.region.id == region_id)
.context(error::RegionRouteNotFoundSnafu { region_id })?;
let leader_peer = region_route
.leader_peer
.as_ref()
.context(error::UnexpectedSnafu {
violated: format!("The leader peer of region {region_id} is not found during the update metadata for upgrading"),
})?;
let candidate_peer_id = ctx.persistent_ctx.to_peer.id;
if leader_peer.id == candidate_peer_id {
ensure!(
!region_route.is_leader_downgraded(),
error::UnexpectedSnafu {
violated: format!("Unexpected intermediate state is found during the update metadata for upgrading region {region_id}"),
}
);
Ok(true)
} else {
Ok(false)
}
}
/// Upgrades the candidate region.
///
/// Abort(non-retry):
@@ -89,6 +123,10 @@ impl UpdateMetadata {
let region_id = ctx.region_id();
let table_metadata_manager = ctx.table_metadata_manager.clone();
if self.check_metadata_updated(ctx).await? {
return Ok(());
}
let region_routes = self.build_upgrade_candidate_region_metadata(ctx).await?;
let table_info_value = ctx.get_table_info_value().await?;
@@ -325,6 +363,85 @@ mod tests {
assert!(err.to_string().contains("Failed to update the table route"));
}
#[tokio::test]
async fn test_check_metadata() {
let state = UpdateMetadata::Upgrade;
let env = TestingEnv::new();
let persistent_context = new_persistent_context();
let leader_peer = persistent_context.from_peer.clone();
let mut ctx = env.context_factory().new_context(persistent_context);
let table_info = new_test_table_info(1024, vec![1]).into();
let region_routes = vec![RegionRoute {
region: Region::new_test(RegionId::new(1024, 1)),
leader_peer: Some(leader_peer),
follower_peers: vec![Peer::empty(2), Peer::empty(3)],
leader_status: None,
}];
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes)
.await
.unwrap();
let updated = state.check_metadata_updated(&mut ctx).await.unwrap();
assert!(!updated);
}
#[tokio::test]
async fn test_check_metadata_updated() {
let state = UpdateMetadata::Upgrade;
let env = TestingEnv::new();
let persistent_context = new_persistent_context();
let candidate_peer = persistent_context.to_peer.clone();
let mut ctx = env.context_factory().new_context(persistent_context);
let table_info = new_test_table_info(1024, vec![1]).into();
let region_routes = vec![RegionRoute {
region: Region::new_test(RegionId::new(1024, 1)),
leader_peer: Some(candidate_peer),
follower_peers: vec![Peer::empty(2), Peer::empty(3)],
leader_status: None,
}];
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes)
.await
.unwrap();
let updated = state.check_metadata_updated(&mut ctx).await.unwrap();
assert!(updated);
}
#[tokio::test]
async fn test_check_metadata_intermediate_state() {
let state = UpdateMetadata::Upgrade;
let env = TestingEnv::new();
let persistent_context = new_persistent_context();
let candidate_peer = persistent_context.to_peer.clone();
let mut ctx = env.context_factory().new_context(persistent_context);
let table_info = new_test_table_info(1024, vec![1]).into();
let region_routes = vec![RegionRoute {
region: Region::new_test(RegionId::new(1024, 1)),
leader_peer: Some(candidate_peer),
follower_peers: vec![Peer::empty(2), Peer::empty(3)],
leader_status: Some(RegionStatus::Downgraded),
}];
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes)
.await
.unwrap();
let err = state.check_metadata_updated(&mut ctx).await.unwrap_err();
assert_matches!(err, Error::Unexpected { .. });
assert!(err.to_string().contains("intermediate state"));
}
#[tokio::test]
async fn test_next_migration_end_state() {
let mut state = Box::new(UpdateMetadata::Upgrade);
@@ -353,7 +470,7 @@ mod tests {
.await
.unwrap();
let next = state.next(&mut ctx).await.unwrap();
let (next, _) = state.next(&mut ctx).await.unwrap();
let _ = next.as_any().downcast_ref::<RegionMigrationEnd>().unwrap();

View File

@@ -18,6 +18,7 @@ use std::time::Duration;
use api::v1::meta::MailboxMessage;
use common_meta::distributed_time_constants::MAILBOX_RTT_SECS;
use common_meta::instruction::{Instruction, InstructionReply, UpgradeRegion, UpgradeRegionReply};
use common_procedure::Status;
use common_telemetry::warn;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
@@ -56,11 +57,11 @@ impl Default for UpgradeCandidateRegion {
#[async_trait::async_trait]
#[typetag::serde]
impl State for UpgradeCandidateRegion {
async fn next(&mut self, ctx: &mut Context) -> Result<Box<dyn State>> {
async fn next(&mut self, ctx: &mut Context) -> Result<(Box<dyn State>, Status)> {
if self.upgrade_region_with_retry(ctx).await {
Ok(Box::new(UpdateMetadata::Upgrade))
Ok((Box::new(UpdateMetadata::Upgrade), Status::executing(false)))
} else {
Ok(Box::new(UpdateMetadata::Rollback))
Ok((Box::new(UpdateMetadata::Rollback), Status::executing(false)))
}
}
@@ -219,15 +220,13 @@ impl UpgradeCandidateRegion {
mod tests {
use std::assert_matches::assert_matches;
use api::v1::meta::mailbox_message::Payload;
use common_meta::peer::Peer;
use common_time::util::current_time_millis;
use store_api::storage::RegionId;
use super::*;
use crate::error::Error;
use crate::procedure::region_migration::test_util::{
new_close_region_reply, send_mock_reply, TestingEnv,
new_close_region_reply, new_upgrade_region_reply, send_mock_reply, TestingEnv,
};
use crate::procedure::region_migration::{ContextFactory, PersistentContext};
@@ -240,31 +239,6 @@ mod tests {
}
}
fn new_upgrade_region_reply(
id: u64,
ready: bool,
exists: bool,
error: Option<String>,
) -> MailboxMessage {
MailboxMessage {
id,
subject: "mock".to_string(),
from: "datanode".to_string(),
to: "meta".to_string(),
timestamp_millis: current_time_millis(),
payload: Some(Payload::Json(
serde_json::to_string(&InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready,
exists,
error,
}))
.unwrap(),
)),
}
}
#[tokio::test]
async fn test_datanode_is_unreachable() {
let state = UpgradeCandidateRegion::default();
@@ -495,7 +469,7 @@ mod tests {
.unwrap();
});
let next = state.next(&mut ctx).await.unwrap();
let (next, _) = state.next(&mut ctx).await.unwrap();
let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
@@ -554,7 +528,7 @@ mod tests {
.unwrap();
});
let next = state.next(&mut ctx).await.unwrap();
let (next, _) = state.next(&mut ctx).await.unwrap();
let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
assert_matches!(update_metadata, UpdateMetadata::Rollback);