mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-05 21:02:58 +00:00
feat: add migration start step (#2756)
* feat: add migration start state * refactor: move PersistentContext and VolatileContext into Context * chore: apply suggestions from CR
This commit is contained in:
@@ -551,6 +551,13 @@ pub enum Error {
|
||||
},
|
||||
}
|
||||
|
||||
impl Error {
|
||||
/// Returns `true` if the error is retryable.
|
||||
pub fn is_retryable(&self) -> bool {
|
||||
matches!(self, Error::RetryLater { .. })
|
||||
}
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
define_into_tonic_status!(Error);
|
||||
|
||||
@@ -12,12 +12,19 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
pub(crate) mod downgrade_leader_region;
|
||||
pub(crate) mod migration_end;
|
||||
pub(crate) mod migration_start;
|
||||
pub(crate) mod open_candidate_region;
|
||||
#[cfg(test)]
|
||||
pub(crate) mod test_util;
|
||||
|
||||
use std::any::Any;
|
||||
use std::fmt::Debug;
|
||||
|
||||
use common_meta::key::TableMetadataManagerRef;
|
||||
use common_meta::peer::Peer;
|
||||
use common_meta::ClusterId;
|
||||
use common_procedure::error::{
|
||||
Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
|
||||
};
|
||||
@@ -37,10 +44,12 @@ use crate::procedure::utils::region_lock_key;
|
||||
/// **Notes: Stores with too large data in the context might incur replication overhead.**
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct PersistentContext {
|
||||
/// The Id of the cluster.
|
||||
cluster_id: ClusterId,
|
||||
/// The [Peer] of migration source.
|
||||
from_peer: Peer,
|
||||
/// The [Peer] of migration destination.
|
||||
to_peer: Option<Peer>,
|
||||
to_peer: Peer,
|
||||
/// The [RegionId] of migration region.
|
||||
region_id: RegionId,
|
||||
}
|
||||
@@ -59,39 +68,75 @@ impl PersistentContext {
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct VolatileContext {}
|
||||
|
||||
/// Used to generate new [Context].
|
||||
pub trait ContextFactory {
|
||||
fn new_context(self, persistent_ctx: PersistentContext) -> Context;
|
||||
}
|
||||
|
||||
/// Default implementation.
|
||||
pub struct ContextFactoryImpl {
|
||||
volatile_ctx: VolatileContext,
|
||||
table_metadata_manager: TableMetadataManagerRef,
|
||||
}
|
||||
|
||||
impl ContextFactory for ContextFactoryImpl {
|
||||
fn new_context(self, persistent_ctx: PersistentContext) -> Context {
|
||||
Context {
|
||||
persistent_ctx,
|
||||
volatile_ctx: self.volatile_ctx,
|
||||
table_metadata_manager: self.table_metadata_manager,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(weny): remove it.
|
||||
#[allow(dead_code)]
|
||||
/// The context of procedure execution.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Context {}
|
||||
pub struct Context {
|
||||
persistent_ctx: PersistentContext,
|
||||
volatile_ctx: VolatileContext,
|
||||
table_metadata_manager: TableMetadataManagerRef,
|
||||
}
|
||||
|
||||
impl Context {
|
||||
/// Returns address of meta server.
|
||||
pub fn server_addr(&self) -> &str {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
#[typetag::serde(tag = "region_migration_state")]
|
||||
trait State: Sync + Send + Debug {
|
||||
/// Yields the next state.
|
||||
async fn next(
|
||||
&mut self,
|
||||
ctx: &Context,
|
||||
pc: &mut PersistentContext,
|
||||
vc: &mut VolatileContext,
|
||||
) -> Result<Box<dyn State>>;
|
||||
async fn next(&mut self, ctx: &mut Context) -> Result<Box<dyn State>>;
|
||||
|
||||
/// Indicates the procedure execution status of the `State`.
|
||||
fn status(&self) -> Status {
|
||||
Status::Executing { persist: true }
|
||||
}
|
||||
|
||||
/// Returns as [Any](std::any::Any).
|
||||
fn as_any(&self) -> &dyn Any;
|
||||
}
|
||||
|
||||
/// Persistent data of [RegionMigrationProcedure].
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct RegionMigrationData {
|
||||
context: PersistentContext,
|
||||
pub struct RegionMigrationDataOwned {
|
||||
persistent_ctx: PersistentContext,
|
||||
state: Box<dyn State>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
/// Persistent data of [RegionMigrationProcedure].
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct RegionMigrationData<'a> {
|
||||
persistent_ctx: &'a PersistentContext,
|
||||
state: &'a dyn State,
|
||||
}
|
||||
|
||||
pub struct RegionMigrationProcedure {
|
||||
data: RegionMigrationData,
|
||||
state: Box<dyn State>,
|
||||
context: Context,
|
||||
volatile_context: VolatileContext,
|
||||
}
|
||||
|
||||
// TODO(weny): remove it.
|
||||
@@ -99,34 +144,34 @@ pub struct RegionMigrationProcedure {
|
||||
impl RegionMigrationProcedure {
|
||||
const TYPE_NAME: &str = "metasrv-procedure::RegionMigration";
|
||||
|
||||
pub fn new(persistent_context: PersistentContext, context: Context) -> Self {
|
||||
pub fn new(
|
||||
persistent_context: PersistentContext,
|
||||
context_factory: impl ContextFactory,
|
||||
) -> Self {
|
||||
let state = Box::new(RegionMigrationStart {});
|
||||
Self::new_inner(state, persistent_context, context)
|
||||
Self::new_inner(state, persistent_context, context_factory)
|
||||
}
|
||||
|
||||
fn new_inner(
|
||||
state: Box<dyn State>,
|
||||
persistent_context: PersistentContext,
|
||||
context: Context,
|
||||
context_factory: impl ContextFactory,
|
||||
) -> Self {
|
||||
Self {
|
||||
data: RegionMigrationData {
|
||||
context: persistent_context,
|
||||
state,
|
||||
},
|
||||
context,
|
||||
volatile_context: VolatileContext::default(),
|
||||
state,
|
||||
context: context_factory.new_context(persistent_context),
|
||||
}
|
||||
}
|
||||
|
||||
fn from_json(json: &str, context: Context) -> ProcedureResult<Self> {
|
||||
let data: RegionMigrationData = serde_json::from_str(json).context(FromJsonSnafu)?;
|
||||
fn from_json(json: &str, context_factory: impl ContextFactory) -> ProcedureResult<Self> {
|
||||
let RegionMigrationDataOwned {
|
||||
persistent_ctx,
|
||||
state,
|
||||
} = serde_json::from_str(json).context(FromJsonSnafu)?;
|
||||
|
||||
Ok(Self {
|
||||
data,
|
||||
context,
|
||||
volatile_context: VolatileContext::default(),
|
||||
})
|
||||
let context = context_factory.new_context(persistent_ctx);
|
||||
|
||||
Ok(Self { state, context })
|
||||
}
|
||||
}
|
||||
|
||||
@@ -137,69 +182,56 @@ impl Procedure for RegionMigrationProcedure {
|
||||
}
|
||||
|
||||
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
|
||||
let data = &mut self.data;
|
||||
let state = &mut data.state;
|
||||
let persistent_context = &mut data.context;
|
||||
let volatile_context = &mut self.volatile_context;
|
||||
let state = &mut self.state;
|
||||
|
||||
*state = state
|
||||
.next(&self.context, persistent_context, volatile_context)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
if matches!(e, Error::RetryLater { .. }) {
|
||||
ProcedureError::retry_later(e)
|
||||
} else {
|
||||
ProcedureError::external(e)
|
||||
}
|
||||
})?;
|
||||
*state = state.next(&mut self.context).await.map_err(|e| {
|
||||
if matches!(e, Error::RetryLater { .. }) {
|
||||
ProcedureError::retry_later(e)
|
||||
} else {
|
||||
ProcedureError::external(e)
|
||||
}
|
||||
})?;
|
||||
Ok(state.status())
|
||||
}
|
||||
|
||||
fn dump(&self) -> ProcedureResult<String> {
|
||||
serde_json::to_string(&self.data).context(ToJsonSnafu)
|
||||
let data = RegionMigrationData {
|
||||
state: self.state.as_ref(),
|
||||
persistent_ctx: &self.context.persistent_ctx,
|
||||
};
|
||||
serde_json::to_string(&data).context(ToJsonSnafu)
|
||||
}
|
||||
|
||||
fn lock_key(&self) -> LockKey {
|
||||
LockKey::single(self.data.context.lock_key())
|
||||
let key = self.context.persistent_ctx.lock_key();
|
||||
LockKey::single(key)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::assert_matches::assert_matches;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_procedure::ProcedureId;
|
||||
use common_procedure_test::MockContextProvider;
|
||||
|
||||
use super::migration_end::RegionMigrationEnd;
|
||||
use super::*;
|
||||
use crate::procedure::region_migration::test_util::TestingEnv;
|
||||
|
||||
fn persistent_context_factory() -> PersistentContext {
|
||||
fn new_persistent_context() -> PersistentContext {
|
||||
PersistentContext {
|
||||
from_peer: Peer::empty(1),
|
||||
to_peer: None,
|
||||
to_peer: Peer::empty(2),
|
||||
region_id: RegionId::new(1024, 1),
|
||||
}
|
||||
}
|
||||
|
||||
fn context_factory() -> Context {
|
||||
Context {}
|
||||
}
|
||||
|
||||
fn procedure_context_factory() -> ProcedureContext {
|
||||
ProcedureContext {
|
||||
procedure_id: ProcedureId::random(),
|
||||
provider: Arc::new(MockContextProvider::default()),
|
||||
cluster_id: 0,
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_lock_key() {
|
||||
let persistent_context = persistent_context_factory();
|
||||
let persistent_context = new_persistent_context();
|
||||
let expected_key = persistent_context.lock_key();
|
||||
|
||||
let context = context_factory();
|
||||
let env = TestingEnv::new();
|
||||
let context = env.context_factory();
|
||||
|
||||
let procedure = RegionMigrationProcedure::new(persistent_context, context);
|
||||
|
||||
@@ -211,72 +243,75 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_data_serialization() {
|
||||
let persistent_context = persistent_context_factory();
|
||||
let persistent_context = new_persistent_context();
|
||||
|
||||
let context = context_factory();
|
||||
let env = TestingEnv::new();
|
||||
let context = env.context_factory();
|
||||
|
||||
let procedure = RegionMigrationProcedure::new(persistent_context, context);
|
||||
|
||||
let serialized = procedure.dump().unwrap();
|
||||
|
||||
let expected = r#"{"context":{"from_peer":{"id":1,"addr":""},"to_peer":null,"region_id":4398046511105},"state":{"region_migration_state":"RegionMigrationStart"}}"#;
|
||||
let expected = r#"{"persistent_ctx":{"cluster_id":0,"from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105},"state":{"region_migration_state":"RegionMigrationStart"}}"#;
|
||||
assert_eq!(expected, serialized);
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Default)]
|
||||
pub struct MockState {
|
||||
count: usize,
|
||||
}
|
||||
pub struct MockState;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
#[typetag::serde]
|
||||
impl State for MockState {
|
||||
async fn next(
|
||||
&mut self,
|
||||
_: &Context,
|
||||
_: &mut PersistentContext,
|
||||
_: &mut VolatileContext,
|
||||
) -> Result<Box<dyn State>> {
|
||||
if self.count == 2 {
|
||||
async fn next(&mut self, ctx: &mut Context) -> Result<Box<dyn State>> {
|
||||
let pc = &mut ctx.persistent_ctx;
|
||||
|
||||
if pc.cluster_id == 2 {
|
||||
Ok(Box::new(RegionMigrationEnd))
|
||||
} else {
|
||||
Ok(Box::new(MockState {
|
||||
count: self.count + 1,
|
||||
}))
|
||||
pc.cluster_id += 1;
|
||||
Ok(Box::new(MockState))
|
||||
}
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_execution_after_deserialized() {
|
||||
fn new_mock_procedure() -> RegionMigrationProcedure {
|
||||
let persistent_context = persistent_context_factory();
|
||||
let context = context_factory();
|
||||
let env = TestingEnv::new();
|
||||
|
||||
fn new_mock_procedure(env: &TestingEnv) -> RegionMigrationProcedure {
|
||||
let persistent_context = new_persistent_context();
|
||||
let context_factory = env.context_factory();
|
||||
let state = Box::<MockState>::default();
|
||||
RegionMigrationProcedure::new_inner(state, persistent_context, context)
|
||||
RegionMigrationProcedure::new_inner(state, persistent_context, context_factory)
|
||||
}
|
||||
|
||||
let ctx = procedure_context_factory();
|
||||
let mut procedure = new_mock_procedure();
|
||||
let ctx = TestingEnv::procedure_context();
|
||||
let mut procedure = new_mock_procedure(&env);
|
||||
let mut status = None;
|
||||
for _ in 0..3 {
|
||||
status = Some(procedure.execute(&ctx).await.unwrap());
|
||||
}
|
||||
assert_matches!(status.unwrap(), Status::Done);
|
||||
|
||||
let ctx = procedure_context_factory();
|
||||
let mut procedure = new_mock_procedure();
|
||||
let ctx = TestingEnv::procedure_context();
|
||||
let mut procedure = new_mock_procedure(&env);
|
||||
|
||||
status = Some(procedure.execute(&ctx).await.unwrap());
|
||||
|
||||
let serialized = procedure.dump().unwrap();
|
||||
|
||||
let context = context_factory();
|
||||
let mut procedure = RegionMigrationProcedure::from_json(&serialized, context).unwrap();
|
||||
let context_factory = env.context_factory();
|
||||
let mut procedure =
|
||||
RegionMigrationProcedure::from_json(&serialized, context_factory).unwrap();
|
||||
|
||||
for _ in 1..3 {
|
||||
status = Some(procedure.execute(&ctx).await.unwrap());
|
||||
}
|
||||
assert_eq!(procedure.context.persistent_ctx.cluster_id, 2);
|
||||
assert_matches!(status.unwrap(), Status::Done);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,35 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::any::Any;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::procedure::region_migration::{Context, State};
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct DowngradeLeaderRegion;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
#[typetag::serde]
|
||||
impl State for DowngradeLeaderRegion {
|
||||
async fn next(&mut self, _ctx: &mut Context) -> Result<Box<dyn State>> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
}
|
||||
@@ -12,11 +12,13 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::any::Any;
|
||||
|
||||
use common_procedure::Status;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::procedure::region_migration::{Context, PersistentContext, State, VolatileContext};
|
||||
use crate::procedure::region_migration::{Context, State};
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct RegionMigrationEnd;
|
||||
@@ -24,16 +26,15 @@ pub struct RegionMigrationEnd;
|
||||
#[async_trait::async_trait]
|
||||
#[typetag::serde]
|
||||
impl State for RegionMigrationEnd {
|
||||
async fn next(
|
||||
&mut self,
|
||||
_: &Context,
|
||||
_: &mut PersistentContext,
|
||||
_: &mut VolatileContext,
|
||||
) -> Result<Box<dyn State>> {
|
||||
async fn next(&mut self, _: &mut Context) -> Result<Box<dyn State>> {
|
||||
Ok(Box::new(RegionMigrationEnd))
|
||||
}
|
||||
|
||||
fn status(&self) -> Status {
|
||||
Status::Done
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,24 +12,289 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::any::Any;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::procedure::region_migration::{Context, PersistentContext, State, VolatileContext};
|
||||
use common_meta::peer::Peer;
|
||||
use common_meta::rpc::router::RegionRoute;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::{location, Location, OptionExt, ResultExt};
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use super::downgrade_leader_region::DowngradeLeaderRegion;
|
||||
use super::migration_end::RegionMigrationEnd;
|
||||
use super::open_candidate_region::OpenCandidateRegion;
|
||||
use crate::error::{self, Result};
|
||||
use crate::procedure::region_migration::{Context, State};
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct RegionMigrationStart {}
|
||||
pub struct RegionMigrationStart;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
#[typetag::serde]
|
||||
impl State for RegionMigrationStart {
|
||||
async fn next(
|
||||
&mut self,
|
||||
_ctx: &Context,
|
||||
_pc: &mut PersistentContext,
|
||||
_vc: &mut VolatileContext,
|
||||
) -> Result<Box<dyn State>> {
|
||||
// TODO(weny): It will be added in the following PRs.
|
||||
todo!()
|
||||
/// Yields next [State].
|
||||
///
|
||||
/// If the expected leader region has been opened on `to_peer`, go to the MigrationEnd state.
|
||||
///
|
||||
/// If the candidate region has been opened on `to_peer`, go to the DowngradeLeader state.
|
||||
///
|
||||
/// Otherwise go to the OpenCandidateRegion state.
|
||||
async fn next(&mut self, ctx: &mut Context) -> Result<Box<dyn State>> {
|
||||
let region_id = ctx.persistent_ctx.region_id;
|
||||
let to_peer = &ctx.persistent_ctx.to_peer;
|
||||
|
||||
let region_route = self.retrieve_region_route(ctx, region_id).await?;
|
||||
|
||||
if self.check_leader_region_on_peer(®ion_route, to_peer)? {
|
||||
Ok(Box::new(RegionMigrationEnd))
|
||||
} else if self.check_candidate_region_on_peer(®ion_route, to_peer) {
|
||||
Ok(Box::new(DowngradeLeaderRegion))
|
||||
} else {
|
||||
Ok(Box::new(OpenCandidateRegion))
|
||||
}
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl RegionMigrationStart {
|
||||
/// Retrieves region route.
|
||||
///
|
||||
/// Abort(non-retry):
|
||||
/// - TableRoute is not found.
|
||||
/// - RegionRoute is not found.
|
||||
///
|
||||
/// Retry:
|
||||
/// - Failed to retrieve the metadata of table.
|
||||
async fn retrieve_region_route(
|
||||
&self,
|
||||
ctx: &Context,
|
||||
region_id: RegionId,
|
||||
) -> Result<RegionRoute> {
|
||||
let table_id = region_id.table_id();
|
||||
let table_route = ctx
|
||||
.table_metadata_manager
|
||||
.table_route_manager()
|
||||
.get(table_id)
|
||||
.await
|
||||
.context(error::TableMetadataManagerSnafu)
|
||||
.map_err(|e| error::Error::RetryLater {
|
||||
reason: e.to_string(),
|
||||
location: location!(),
|
||||
})?
|
||||
.context(error::TableRouteNotFoundSnafu { table_id })?;
|
||||
|
||||
let region_route = table_route
|
||||
.region_routes
|
||||
.iter()
|
||||
.find(|route| route.region.id == region_id)
|
||||
.cloned()
|
||||
.context(error::UnexpectedSnafu {
|
||||
violated: format!(
|
||||
"RegionRoute({}) is not found in TableRoute({})",
|
||||
region_id, table_id
|
||||
),
|
||||
})?;
|
||||
|
||||
Ok(region_route)
|
||||
}
|
||||
|
||||
/// Checks whether the candidate region on region has been opened.
|
||||
/// Returns true if it's been opened.
|
||||
fn check_candidate_region_on_peer(&self, region_route: &RegionRoute, to_peer: &Peer) -> bool {
|
||||
let region_opened = region_route
|
||||
.follower_peers
|
||||
.iter()
|
||||
.any(|peer| peer.id == to_peer.id);
|
||||
|
||||
region_opened
|
||||
}
|
||||
|
||||
/// Checks whether the leader region on region has been opened.
|
||||
/// Returns true if it's been opened.
|
||||
///
|
||||
/// Abort(non-retry):
|
||||
/// - Leader peer of RegionRoute is not found.
|
||||
fn check_leader_region_on_peer(
|
||||
&self,
|
||||
region_route: &RegionRoute,
|
||||
to_peer: &Peer,
|
||||
) -> Result<bool> {
|
||||
let region_id = region_route.region.id;
|
||||
|
||||
let region_opened = region_route
|
||||
.leader_peer
|
||||
.as_ref()
|
||||
.context(error::UnexpectedSnafu {
|
||||
violated: format!("Leader peer is not found in TableRoute({})", region_id),
|
||||
})?
|
||||
.id
|
||||
== to_peer.id;
|
||||
|
||||
Ok(region_opened)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::assert_matches::assert_matches;
|
||||
|
||||
use common_meta::key::test_utils::new_test_table_info;
|
||||
use common_meta::peer::Peer;
|
||||
use common_meta::rpc::router::{Region, RegionRoute};
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use super::*;
|
||||
use crate::error::Error;
|
||||
use crate::procedure::region_migration::test_util::TestingEnv;
|
||||
use crate::procedure::region_migration::{ContextFactory, PersistentContext};
|
||||
|
||||
fn new_persistent_context() -> PersistentContext {
|
||||
PersistentContext {
|
||||
from_peer: Peer::empty(1),
|
||||
to_peer: Peer::empty(2),
|
||||
region_id: RegionId::new(1024, 1),
|
||||
cluster_id: 0,
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_table_route_is_not_found_error() {
|
||||
let state = RegionMigrationStart;
|
||||
let env = TestingEnv::new();
|
||||
let persistent_context = new_persistent_context();
|
||||
let ctx = env.context_factory().new_context(persistent_context);
|
||||
|
||||
let err = state
|
||||
.retrieve_region_route(&ctx, RegionId::new(1024, 1))
|
||||
.await
|
||||
.unwrap_err();
|
||||
|
||||
assert_matches!(err, Error::TableRouteNotFound { .. });
|
||||
|
||||
assert!(!err.is_retryable());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_region_route_is_not_found_error() {
|
||||
let state = RegionMigrationStart;
|
||||
let persistent_context = new_persistent_context();
|
||||
let from_peer = persistent_context.from_peer.clone();
|
||||
|
||||
let env = TestingEnv::new();
|
||||
let ctx = env.context_factory().new_context(persistent_context);
|
||||
|
||||
let table_info = new_test_table_info(1024, vec![1]).into();
|
||||
let region_route = RegionRoute {
|
||||
region: Region::new_test(RegionId::new(1024, 1)),
|
||||
leader_peer: Some(from_peer.clone()),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
env.table_metadata_manager()
|
||||
.create_table_metadata(table_info, vec![region_route])
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let err = state
|
||||
.retrieve_region_route(&ctx, RegionId::new(1024, 3))
|
||||
.await
|
||||
.unwrap_err();
|
||||
|
||||
assert_matches!(err, Error::Unexpected { .. });
|
||||
assert!(!err.is_retryable());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_next_downgrade_leader_region_state() {
|
||||
let mut state = Box::new(RegionMigrationStart);
|
||||
// from_peer: 1
|
||||
// to_peer: 2
|
||||
let persistent_context = new_persistent_context();
|
||||
let to_peer = persistent_context.to_peer.clone();
|
||||
let region_id = persistent_context.region_id;
|
||||
|
||||
let env = TestingEnv::new();
|
||||
let mut ctx = env.context_factory().new_context(persistent_context);
|
||||
|
||||
let table_info = new_test_table_info(1024, vec![1]).into();
|
||||
let region_routes = vec![RegionRoute {
|
||||
region: Region::new_test(region_id),
|
||||
leader_peer: Some(Peer::empty(3)),
|
||||
follower_peers: vec![to_peer],
|
||||
..Default::default()
|
||||
}];
|
||||
|
||||
env.table_metadata_manager()
|
||||
.create_table_metadata(table_info, region_routes)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let next = state.next(&mut ctx).await.unwrap();
|
||||
|
||||
let _ = next
|
||||
.as_any()
|
||||
.downcast_ref::<DowngradeLeaderRegion>()
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_next_migration_end_state() {
|
||||
let mut state = Box::new(RegionMigrationStart);
|
||||
// from_peer: 1
|
||||
// to_peer: 2
|
||||
let persistent_context = new_persistent_context();
|
||||
let to_peer = persistent_context.to_peer.clone();
|
||||
let from_peer = persistent_context.from_peer.clone();
|
||||
let region_id = persistent_context.region_id;
|
||||
|
||||
let env = TestingEnv::new();
|
||||
let mut ctx = env.context_factory().new_context(persistent_context);
|
||||
|
||||
let table_info = new_test_table_info(1024, vec![1]).into();
|
||||
let region_routes = vec![RegionRoute {
|
||||
region: Region::new_test(region_id),
|
||||
leader_peer: Some(to_peer),
|
||||
follower_peers: vec![from_peer],
|
||||
..Default::default()
|
||||
}];
|
||||
|
||||
env.table_metadata_manager()
|
||||
.create_table_metadata(table_info, region_routes)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let next = state.next(&mut ctx).await.unwrap();
|
||||
|
||||
let _ = next.as_any().downcast_ref::<RegionMigrationEnd>().unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_next_open_candidate_region_state() {
|
||||
let mut state = Box::new(RegionMigrationStart);
|
||||
// from_peer: 1
|
||||
// to_peer: 2
|
||||
let persistent_context = new_persistent_context();
|
||||
let region_id = persistent_context.region_id;
|
||||
let env = TestingEnv::new();
|
||||
let mut ctx = env.context_factory().new_context(persistent_context);
|
||||
|
||||
let table_info = new_test_table_info(1024, vec![1]).into();
|
||||
let region_routes = vec![RegionRoute {
|
||||
region: Region::new_test(region_id),
|
||||
leader_peer: Some(Peer::empty(3)),
|
||||
..Default::default()
|
||||
}];
|
||||
|
||||
env.table_metadata_manager()
|
||||
.create_table_metadata(table_info, region_routes)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let next = state.next(&mut ctx).await.unwrap();
|
||||
|
||||
let _ = next.as_any().downcast_ref::<OpenCandidateRegion>().unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,35 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::any::Any;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::procedure::region_migration::{Context, State};
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct OpenCandidateRegion;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
#[typetag::serde]
|
||||
impl State for OpenCandidateRegion {
|
||||
async fn next(&mut self, _ctx: &mut Context) -> Result<Box<dyn State>> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
}
|
||||
59
src/meta-srv/src/procedure/region_migration/test_util.rs
Normal file
59
src/meta-srv/src/procedure/region_migration/test_util.rs
Normal file
@@ -0,0 +1,59 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
|
||||
use common_meta::kv_backend::memory::MemoryKvBackend;
|
||||
use common_procedure::{Context as ProcedureContext, ProcedureId};
|
||||
use common_procedure_test::MockContextProvider;
|
||||
|
||||
use super::ContextFactoryImpl;
|
||||
|
||||
/// `TestingEnv` provides components during the tests.
|
||||
pub struct TestingEnv {
|
||||
table_metadata_manager: TableMetadataManagerRef,
|
||||
}
|
||||
|
||||
impl TestingEnv {
|
||||
/// Returns an empty [TestingEnv].
|
||||
pub fn new() -> Self {
|
||||
let kv_backend = Arc::new(MemoryKvBackend::new());
|
||||
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
|
||||
|
||||
Self {
|
||||
table_metadata_manager,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a context of region migration procedure.
|
||||
pub fn context_factory(&self) -> ContextFactoryImpl {
|
||||
ContextFactoryImpl {
|
||||
table_metadata_manager: self.table_metadata_manager.clone(),
|
||||
volatile_ctx: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
|
||||
&self.table_metadata_manager
|
||||
}
|
||||
|
||||
/// Returns a [ProcedureContext] with a random [ProcedureId] and a [MockContextProvider].
|
||||
pub fn procedure_context() -> ProcedureContext {
|
||||
ProcedureContext {
|
||||
procedure_id: ProcedureId::random(),
|
||||
provider: Arc::new(MockContextProvider::default()),
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user