feat: implement region migration manager (#3014)

* feat: implement region migration manager

* Update src/meta-srv/src/procedure/region_migration/manager.rs

Co-authored-by: JeremyHi <jiachun_feng@proton.me>

* chore: apply suggestions from CR

---------

Co-authored-by: JeremyHi <jiachun_feng@proton.me>
This commit is contained in:
Weny Xu
2023-12-27 19:50:10 +09:00
committed by GitHub
parent 840e94630d
commit abeb32e042
6 changed files with 483 additions and 2 deletions

View File

@@ -87,6 +87,11 @@ impl TableRouteValue {
.cloned()
}
/// Returns true if it's [TableRouteValue::Physical].
pub fn is_physical(&self) -> bool {
matches!(self, TableRouteValue::Physical(_))
}
/// Gets the [RegionRoute]s of this [TableRouteValue::Physical].
///
/// # Panics

View File

@@ -32,6 +32,12 @@ use crate::pubsub::Message;
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("Another migration procedure is running for region: {}", region_id))]
MigrationRunning {
location: Location,
region_id: RegionId,
},
#[snafu(display("The region migration procedure aborted, reason: {}", reason))]
MigrationAbort { location: Location, reason: String },
@@ -675,7 +681,8 @@ impl ErrorExt for Error {
| Error::TableIdChanged { .. }
| Error::RegionOpeningRace { .. }
| Error::RegionRouteNotFound { .. }
| Error::MigrationAbort { .. } => StatusCode::Unexpected,
| Error::MigrationAbort { .. }
| Error::MigrationRunning { .. } => StatusCode::Unexpected,
Error::TableNotFound { .. } => StatusCode::TableNotFound,
Error::InvalidateTableCache { source, .. } => source.status_code(),
Error::RequestDatanode { source, .. } => source.status_code(),

View File

@@ -13,6 +13,9 @@
// limitations under the License.
pub(crate) mod downgrade_leader_region;
// TODO(weny): remove it.
#[allow(dead_code)]
pub(crate) mod manager;
pub(crate) mod migration_abort;
pub(crate) mod migration_end;
pub(crate) mod migration_start;
@@ -123,6 +126,7 @@ pub trait ContextFactory {
}
/// Default implementation.
#[derive(Clone)]
pub struct ContextFactoryImpl {
volatile_ctx: VolatileContext,
table_metadata_manager: TableMetadataManagerRef,

View File

@@ -0,0 +1,452 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt::Display;
use std::sync::{Arc, RwLock};
use common_meta::key::table_route::TableRouteValue;
use common_meta::peer::Peer;
use common_meta::rpc::router::RegionRoute;
use common_meta::ClusterId;
use common_procedure::{watcher, ProcedureManagerRef, ProcedureWithId};
use common_telemetry::{error, info};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::RegionId;
use crate::error::{self, Result};
use crate::procedure::region_migration::{
ContextFactoryImpl, PersistentContext, RegionMigrationProcedure,
};
/// Manager of region migration procedure.
pub(crate) struct RegionMigrationManager {
procedure_manager: ProcedureManagerRef,
running_procedures: Arc<RwLock<HashMap<RegionId, RegionMigrationProcedureTask>>>,
context_factory: ContextFactoryImpl,
}
/// The guard of running [RegionMigrationProcedureTask].
pub(crate) struct RegionMigrationProcedureGuard {
region_id: RegionId,
running_procedures: Arc<RwLock<HashMap<RegionId, RegionMigrationProcedureTask>>>,
}
impl Drop for RegionMigrationProcedureGuard {
fn drop(&mut self) {
self.running_procedures
.write()
.unwrap()
.remove(&self.region_id);
}
}
#[derive(Debug, Clone)]
pub(crate) struct RegionMigrationProcedureTask {
cluster_id: ClusterId,
region_id: RegionId,
from_peer: Peer,
to_peer: Peer,
}
impl Display for RegionMigrationProcedureTask {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"cluster: {}, region: {}, from_peer: {}, to_peer: {}",
self.cluster_id, self.region_id, self.from_peer, self.to_peer
)
}
}
impl From<RegionMigrationProcedureTask> for PersistentContext {
fn from(
RegionMigrationProcedureTask {
cluster_id,
region_id,
from_peer,
to_peer,
}: RegionMigrationProcedureTask,
) -> Self {
PersistentContext {
cluster_id,
from_peer,
to_peer,
region_id,
}
}
}
impl RegionMigrationManager {
/// Returns new [RegionMigrationManager]
pub(crate) fn new(
procedure_manager: ProcedureManagerRef,
context_factory: ContextFactoryImpl,
) -> Self {
Self {
procedure_manager,
running_procedures: Arc::new(RwLock::new(HashMap::new())),
context_factory,
}
}
/// Registers the loader of [RegionMigrationProcedure] to the `ProcedureManager`.
pub(crate) fn try_start(&self) -> Result<()> {
let context_factory = self.context_factory.clone();
self.procedure_manager
.register_loader(
RegionMigrationProcedure::TYPE_NAME,
Box::new(move |json| {
let context_factory = context_factory.clone();
RegionMigrationProcedure::from_json(json, context_factory)
.map(|p| Box::new(p) as _)
}),
)
.context(error::RegisterProcedureLoaderSnafu {
type_name: RegionMigrationProcedure::TYPE_NAME,
})
}
fn insert_running_procedure(
&self,
task: &RegionMigrationProcedureTask,
) -> Option<RegionMigrationProcedureGuard> {
let mut procedures = self.running_procedures.write().unwrap();
match procedures.entry(task.region_id) {
Entry::Occupied(_) => None,
Entry::Vacant(v) => {
v.insert(task.clone());
Some(RegionMigrationProcedureGuard {
region_id: task.region_id,
running_procedures: self.running_procedures.clone(),
})
}
}
}
fn verify_task(&self, task: &RegionMigrationProcedureTask) -> Result<()> {
if task.to_peer.id == task.from_peer.id {
return error::InvalidArgumentsSnafu {
err_msg: "The `from_peer_id` can't equal `to_peer_id`",
}
.fail();
}
Ok(())
}
async fn retrieve_table_route(&self, region_id: RegionId) -> Result<TableRouteValue> {
let table_route = self
.context_factory
.table_metadata_manager
.table_route_manager()
.get(region_id.table_id())
.await
.context(error::TableMetadataManagerSnafu)?
.context(error::TableRouteNotFoundSnafu {
table_id: region_id.table_id(),
})?;
Ok(table_route.into_inner())
}
/// Verifies the type of region migration table route.
fn verify_table_route(
&self,
table_route: &TableRouteValue,
task: &RegionMigrationProcedureTask,
) -> Result<()> {
if !table_route.is_physical() {
return error::UnexpectedSnafu {
violated: format!(
"Trying to execute region migration on the logical table, task {task}"
),
}
.fail();
}
Ok(())
}
/// Returns true if the region has been migrated.
fn has_migrated(
&self,
region_route: &RegionRoute,
task: &RegionMigrationProcedureTask,
) -> Result<bool> {
let leader_peer = region_route
.leader_peer
.as_ref()
.context(error::UnexpectedSnafu {
violated: "Region route leader peer is not found",
})?;
Ok(leader_peer.id == task.to_peer.id)
}
/// Throws an error if `leader_peer` is not the `from_peer`.
fn verify_region_leader_peer(
&self,
region_route: &RegionRoute,
task: &RegionMigrationProcedureTask,
) -> Result<()> {
let leader_peer = region_route
.leader_peer
.as_ref()
.context(error::UnexpectedSnafu {
violated: "Region route leader peer is not found",
})?;
ensure!(
leader_peer.id == task.from_peer.id,
error::InvalidArgumentsSnafu {
err_msg: "Invalid region migration `from_peer` argument"
}
);
Ok(())
}
/// Submits a new region migration procedure.
pub(crate) async fn submit_procedure(&self, task: RegionMigrationProcedureTask) -> Result<()> {
let Some(guard) = self.insert_running_procedure(&task) else {
return error::MigrationRunningSnafu {
region_id: task.region_id,
}
.fail();
};
self.verify_task(&task)?;
let region_id = task.region_id;
let table_route = self.retrieve_table_route(region_id).await?;
self.verify_table_route(&table_route, &task)?;
// Safety: checked before.
let region_route = table_route
.region_route(region_id)
.context(error::RegionRouteNotFoundSnafu { region_id })?;
if self.has_migrated(&region_route, &task)? {
info!("Skipping region migration task: {task}");
return Ok(());
}
self.verify_region_leader_peer(&region_route, &task)?;
let procedure =
RegionMigrationProcedure::new(task.clone().into(), self.context_factory.clone());
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
let procedure_id = procedure_with_id.id;
info!("Starting region migration procedure {procedure_id} for {task}");
let procedure_manager = self.procedure_manager.clone();
common_runtime::spawn_bg(async move {
let _ = guard;
let watcher = &mut match procedure_manager.submit(procedure_with_id).await {
Ok(watcher) => watcher,
Err(e) => {
error!(e; "Failed to submit region migration procedure {procedure_id} for {task}");
return;
}
};
if let Err(e) = watcher::wait(watcher).await {
error!(e; "Failed to wait region migration procedure {procedure_id} for {task}");
return;
}
info!("Region migration procedure {procedure_id} for {task} is finished successfully!");
});
Ok(())
}
}
#[cfg(test)]
mod test {
use std::assert_matches::assert_matches;
use common_meta::key::table_route::LogicalTableRouteValue;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::rpc::router::Region;
use super::*;
use crate::procedure::region_migration::test_util::TestingEnv;
#[tokio::test]
async fn test_insert_running_procedure() {
let env = TestingEnv::new();
let context_factory = env.context_factory();
let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
let region_id = RegionId::new(1024, 1);
let task = RegionMigrationProcedureTask {
cluster_id: 1,
region_id,
from_peer: Peer::empty(2),
to_peer: Peer::empty(1),
};
// Inserts one
manager
.running_procedures
.write()
.unwrap()
.insert(region_id, task.clone());
let err = manager.submit_procedure(task).await.unwrap_err();
assert_matches!(err, error::Error::MigrationRunning { .. });
}
#[tokio::test]
async fn test_submit_procedure_invalid_task() {
let env = TestingEnv::new();
let context_factory = env.context_factory();
let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
let region_id = RegionId::new(1024, 1);
let task = RegionMigrationProcedureTask {
cluster_id: 1,
region_id,
from_peer: Peer::empty(1),
to_peer: Peer::empty(1),
};
let err = manager.submit_procedure(task).await.unwrap_err();
assert_matches!(err, error::Error::InvalidArguments { .. });
}
#[tokio::test]
async fn test_submit_procedure_table_not_found() {
let env = TestingEnv::new();
let context_factory = env.context_factory();
let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
let region_id = RegionId::new(1024, 1);
let task = RegionMigrationProcedureTask {
cluster_id: 1,
region_id,
from_peer: Peer::empty(1),
to_peer: Peer::empty(2),
};
let err = manager.submit_procedure(task).await.unwrap_err();
assert_matches!(err, error::Error::TableRouteNotFound { .. });
}
#[tokio::test]
async fn test_submit_procedure_region_route_not_found() {
let env = TestingEnv::new();
let context_factory = env.context_factory();
let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
let region_id = RegionId::new(1024, 1);
let task = RegionMigrationProcedureTask {
cluster_id: 1,
region_id,
from_peer: Peer::empty(1),
to_peer: Peer::empty(2),
};
let table_info = new_test_table_info(1024, vec![1]).into();
let region_routes = vec![RegionRoute {
region: Region::new_test(RegionId::new(1024, 2)),
leader_peer: Some(Peer::empty(3)),
..Default::default()
}];
env.create_physical_table_metadata(table_info, region_routes)
.await;
let err = manager.submit_procedure(task).await.unwrap_err();
assert_matches!(err, error::Error::RegionRouteNotFound { .. });
}
#[tokio::test]
async fn test_submit_procedure_incorrect_from_peer() {
let env = TestingEnv::new();
let context_factory = env.context_factory();
let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
let region_id = RegionId::new(1024, 1);
let task = RegionMigrationProcedureTask {
cluster_id: 1,
region_id,
from_peer: Peer::empty(1),
to_peer: Peer::empty(2),
};
let table_info = new_test_table_info(1024, vec![1]).into();
let region_routes = vec![RegionRoute {
region: Region::new_test(RegionId::new(1024, 1)),
leader_peer: Some(Peer::empty(3)),
..Default::default()
}];
env.create_physical_table_metadata(table_info, region_routes)
.await;
let err = manager.submit_procedure(task).await.unwrap_err();
assert_matches!(err, error::Error::InvalidArguments { .. });
assert!(err
.to_string()
.contains("Invalid region migration `from_peer` argument"));
}
#[tokio::test]
async fn test_submit_procedure_has_migrated() {
common_telemetry::init_default_ut_logging();
let env = TestingEnv::new();
let context_factory = env.context_factory();
let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
let region_id = RegionId::new(1024, 1);
let task = RegionMigrationProcedureTask {
cluster_id: 1,
region_id,
from_peer: Peer::empty(1),
to_peer: Peer::empty(2),
};
let table_info = new_test_table_info(1024, vec![1]).into();
let region_routes = vec![RegionRoute {
region: Region::new_test(RegionId::new(1024, 1)),
leader_peer: Some(Peer::empty(2)),
..Default::default()
}];
env.create_physical_table_metadata(table_info, region_routes)
.await;
manager.submit_procedure(task).await.unwrap();
}
#[tokio::test]
async fn test_verify_table_route_error() {
let env = TestingEnv::new();
let context_factory = env.context_factory();
let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
let region_id = RegionId::new(1024, 1);
let task = RegionMigrationProcedureTask {
cluster_id: 1,
region_id,
from_peer: Peer::empty(1),
to_peer: Peer::empty(2),
};
let err = manager
.verify_table_route(&TableRouteValue::Logical(LogicalTableRouteValue {}), &task)
.unwrap_err();
assert_matches!(err, error::Error::Unexpected { .. });
}
}

View File

@@ -29,8 +29,10 @@ use common_meta::peer::Peer;
use common_meta::region_keeper::{MemoryRegionKeeper, MemoryRegionKeeperRef};
use common_meta::rpc::router::RegionRoute;
use common_meta::sequence::{Sequence, SequenceBuilder};
use common_meta::state_store::KvStateStore;
use common_meta::DatanodeId;
use common_procedure::{Context as ProcedureContext, ProcedureId, Status};
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::{Context as ProcedureContext, ProcedureId, ProcedureManagerRef, Status};
use common_procedure_test::MockContextProvider;
use common_telemetry::debug;
use common_time::util::current_time_millis;
@@ -90,6 +92,7 @@ pub struct TestingEnv {
mailbox_ctx: MailboxContext,
opening_region_keeper: MemoryRegionKeeperRef,
server_addr: String,
procedure_manager: ProcedureManagerRef,
}
impl TestingEnv {
@@ -104,11 +107,15 @@ impl TestingEnv {
let mailbox_ctx = MailboxContext::new(mailbox_sequence);
let opening_region_keeper = Arc::new(MemoryRegionKeeper::default());
let state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
let procedure_manager = Arc::new(LocalManager::new(ManagerConfig::default(), state_store));
Self {
table_metadata_manager,
opening_region_keeper,
mailbox_ctx,
server_addr: "localhost".to_string(),
procedure_manager,
}
}
@@ -146,6 +153,11 @@ impl TestingEnv {
}
}
/// Returns the [ProcedureManagerRef].
pub fn procedure_manager(&self) -> &ProcedureManagerRef {
&self.procedure_manager
}
// Creates a table metadata with the physical table route.
pub async fn create_physical_table_metadata(
&self,

View File

@@ -43,6 +43,7 @@ impl UpdateMetadata {
let table_id = region_id.table_id();
let current_table_route_value = ctx.get_table_route_value().await?;
// TODO(weny): ensures the leader region peer is the `from_peer`.
if let Err(err) = table_metadata_manager
.update_leader_region_status(table_id, current_table_route_value, |route| {
if route.region.id == region_id