feat: expose region migration http endpoint (#3032)

* feat: add region migration endpoint

* feat: implement naive peer registry

* chore: apply suggestions from CR

* chore: rename `ContextFactoryImpl` to `DefaultContextFactory`

* chore: rename unregister to deregister

* refactor: use lease-based alive datanode checking
This commit is contained in:
Weny Xu
2023-12-29 15:57:00 +09:00
committed by GitHub
parent b526d159c3
commit d22072f68b
12 changed files with 185 additions and 38 deletions

View File

@@ -79,6 +79,17 @@ impl MetaPeerClient {
to_stat_kv_map(kvs)
}
// Fetch a single kv entry from the leader's in_mem kv store.
pub async fn get(&self, key: Vec<u8>) -> Result<Option<KeyValue>> {
    let mut kvs = self.range(key, vec![], false).await?;
    match kvs.len() {
        0 => Ok(None),
        len => {
            // A point lookup must never match more than one key.
            debug_assert_eq!(len, 1);
            Ok(Some(kvs.remove(0)))
        }
    }
}
// Range kv information from the leader's in_mem kv store
pub async fn range(
&self,
@@ -228,7 +239,7 @@ impl MetaPeerClient {
// Check if the meta node is a leader node.
// Note: when self.election is None, we also consider the meta node is leader
fn is_leader(&self) -> bool {
pub(crate) fn is_leader(&self) -> bool {
self.election
.as_ref()
.map(|election| election.is_leader())

View File

@@ -32,6 +32,9 @@ use crate::pubsub::Message;
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("The target peer is unavailable temporally: {}", peer_id))]
PeerUnavailable { location: Location, peer_id: u64 },
#[snafu(display("Another migration procedure is running for region: {}", region_id))]
MigrationRunning {
location: Location,
@@ -650,7 +653,8 @@ impl ErrorExt for Error {
| Error::Join { .. }
| Error::WeightArray { .. }
| Error::NotSetWeightArray { .. }
| Error::Unsupported { .. } => StatusCode::Internal,
| Error::Unsupported { .. }
| Error::PeerUnavailable { .. } => StatusCode::Internal,
Error::TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
Error::EmptyKey { .. }
| Error::MissingRequiredParameter { .. }

View File

@@ -229,7 +229,7 @@ impl HeartbeatHandlerGroup {
let _ = self.pushers.insert(key.to_string(), pusher).await;
}
pub async fn unregister(&self, key: impl AsRef<str>) -> Option<Pusher> {
pub async fn deregister(&self, key: impl AsRef<str>) -> Option<Pusher> {
let key = key.as_ref();
METRIC_META_HEARTBEAT_CONNECTION_NUM.dec();
info!("Pusher unregister: {}", key);

View File

@@ -14,27 +14,57 @@
use std::collections::HashMap;
use common_meta::util;
use common_meta::peer::Peer;
use common_meta::{util, ClusterId};
use common_time::util as time_util;
use crate::cluster::MetaPeerClientRef;
use crate::error::Result;
use crate::keys::{LeaseKey, LeaseValue, DN_LEASE_PREFIX};
/// Returns a predicate that is true while a datanode lease is still alive,
/// i.e. its last reported heartbeat is within `lease_secs` of now.
fn build_lease_filter(lease_secs: u64) -> impl Fn(&LeaseKey, &LeaseValue) -> bool {
    move |_: &LeaseKey, v: &LeaseValue| {
        let elapsed_millis = (time_util::current_time_millis() - v.timestamp_millis) as u64;
        elapsed_millis < lease_secs * 1000
    }
}
/// Looks up the datanode peer with `datanode_id`, returning `Some(Peer)` only
/// while the datanode's lease is still alive (heartbeat within `lease_secs`).
pub async fn lookup_alive_datanode_peer(
    cluster_id: ClusterId,
    datanode_id: u64,
    meta_peer_client: &MetaPeerClientRef,
    lease_secs: u64,
) -> Result<Option<Peer>> {
    let lease_key = LeaseKey {
        cluster_id,
        node_id: datanode_id,
    };
    // No lease entry at all: the datanode never registered (or the key expired).
    let kv = match meta_peer_client.get(lease_key.clone().try_into()?).await? {
        Some(kv) => kv,
        None => return Ok(None),
    };
    let lease_value: LeaseValue = kv.value.try_into()?;
    let is_alive = build_lease_filter(lease_secs)(&lease_key, &lease_value);
    if is_alive {
        Ok(Some(Peer {
            id: lease_key.node_id,
            addr: lease_value.node_addr,
        }))
    } else {
        // The lease exists but has expired; treat the peer as unavailable.
        Ok(None)
    }
}
/// Returns all datanodes whose lease has not expired, i.e. whose latest
/// heartbeat is within `lease_secs` of now.
///
/// NOTE(review): this span interleaved the pre- and post-change diff lines
/// (duplicate `cluster_id` parameter, old inline closure next to the
/// `build_lease_filter` call), which is not valid Rust; reconstructed the
/// coherent post-change version.
pub async fn alive_datanodes(
    cluster_id: ClusterId,
    meta_peer_client: &MetaPeerClientRef,
    lease_secs: u64,
) -> Result<HashMap<LeaseKey, LeaseValue>> {
    let lease_filter = build_lease_filter(lease_secs);
    filter_datanodes(cluster_id, meta_peer_client, lease_filter).await
}
pub async fn filter_datanodes<P>(
cluster_id: u64,
cluster_id: ClusterId,
meta_peer_client: &MetaPeerClientRef,
predicate: P,
) -> Result<HashMap<LeaseKey, LeaseValue>>

View File

@@ -48,6 +48,7 @@ use crate::error::{
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
use crate::handler::HeartbeatHandlerGroup;
use crate::lock::DistLockRef;
use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
use crate::pubsub::{PublishRef, SubscribeManagerRef};
use crate::selector::{Selector, SelectorType};
use crate::service::mailbox::MailboxRef;
@@ -249,6 +250,7 @@ pub struct MetaSrv {
table_metadata_manager: TableMetadataManagerRef,
memory_region_keeper: MemoryRegionKeeperRef,
greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
region_migration_manager: RegionMigrationManagerRef,
plugins: Plugins,
}
@@ -411,6 +413,10 @@ impl MetaSrv {
&self.memory_region_keeper
}
/// Returns a reference to the region migration manager of this meta server.
pub fn region_migration_manager(&self) -> &RegionMigrationManagerRef {
    &self.region_migration_manager
}
/// Returns the publish plugin if one is registered in `plugins`.
pub fn publish(&self) -> Option<PublishRef> {
    self.plugins.get::<PublishRef>()
}

View File

@@ -57,6 +57,8 @@ use crate::metasrv::{
ElectionRef, MetaSrv, MetaSrvOptions, MetasrvInfo, SelectorContext, SelectorRef, TABLE_ID_SEQ,
};
use crate::procedure::region_failover::RegionFailoverManager;
use crate::procedure::region_migration::manager::RegionMigrationManager;
use crate::procedure::region_migration::DefaultContextFactory;
use crate::pubsub::PublishRef;
use crate::selector::lease_based::LeaseBasedSelector;
use crate::service::mailbox::MailboxRef;
@@ -236,6 +238,17 @@ impl MetaSrvBuilder {
&opening_region_keeper,
)?;
let region_migration_manager = Arc::new(RegionMigrationManager::new(
procedure_manager.clone(),
DefaultContextFactory::new(
table_metadata_manager.clone(),
opening_region_keeper.clone(),
mailbox.clone(),
options.server_addr.clone(),
),
));
region_migration_manager.try_start()?;
let handler_group = match handler_group {
Some(handler_group) => handler_group,
None => {
@@ -323,6 +336,7 @@ impl MetaSrvBuilder {
.await,
plugins: plugins.unwrap_or_else(Plugins::default),
memory_region_keeper: opening_region_keeper,
region_migration_manager,
})
}
}

View File

@@ -127,7 +127,7 @@ pub trait ContextFactory {
/// Default implementation.
#[derive(Clone)]
pub struct ContextFactoryImpl {
pub struct DefaultContextFactory {
volatile_ctx: VolatileContext,
table_metadata_manager: TableMetadataManagerRef,
opening_region_keeper: MemoryRegionKeeperRef,
@@ -135,7 +135,25 @@ pub struct ContextFactoryImpl {
server_addr: String,
}
impl ContextFactory for ContextFactoryImpl {
impl DefaultContextFactory {
    /// Returns a [`DefaultContextFactory`] with an empty volatile context.
    ///
    /// (Fixed stale doc link: it still referred to the old name
    /// `ContextFactoryImpl` after the rename.)
    pub fn new(
        table_metadata_manager: TableMetadataManagerRef,
        opening_region_keeper: MemoryRegionKeeperRef,
        mailbox: MailboxRef,
        server_addr: String,
    ) -> Self {
        Self {
            volatile_ctx: VolatileContext::default(),
            table_metadata_manager,
            opening_region_keeper,
            mailbox,
            server_addr,
        }
    }
}
impl ContextFactory for DefaultContextFactory {
fn new_context(self, persistent_ctx: PersistentContext) -> Context {
Context {
persistent_ctx,

View File

@@ -21,21 +21,23 @@ use common_meta::key::table_route::TableRouteValue;
use common_meta::peer::Peer;
use common_meta::rpc::router::RegionRoute;
use common_meta::ClusterId;
use common_procedure::{watcher, ProcedureManagerRef, ProcedureWithId};
use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId};
use common_telemetry::{error, info};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::RegionId;
use crate::error::{self, Result};
use crate::procedure::region_migration::{
ContextFactoryImpl, PersistentContext, RegionMigrationProcedure,
DefaultContextFactory, PersistentContext, RegionMigrationProcedure,
};
pub type RegionMigrationManagerRef = Arc<RegionMigrationManager>;
/// Manager of region migration procedure.
pub(crate) struct RegionMigrationManager {
pub struct RegionMigrationManager {
procedure_manager: ProcedureManagerRef,
running_procedures: Arc<RwLock<HashMap<RegionId, RegionMigrationProcedureTask>>>,
context_factory: ContextFactoryImpl,
context_factory: DefaultContextFactory,
}
/// The guard of running [RegionMigrationProcedureTask].
@@ -55,10 +57,10 @@ impl Drop for RegionMigrationProcedureGuard {
#[derive(Debug, Clone)]
pub(crate) struct RegionMigrationProcedureTask {
cluster_id: ClusterId,
region_id: RegionId,
from_peer: Peer,
to_peer: Peer,
pub(crate) cluster_id: ClusterId,
pub(crate) region_id: RegionId,
pub(crate) from_peer: Peer,
pub(crate) to_peer: Peer,
}
impl Display for RegionMigrationProcedureTask {
@@ -93,7 +95,7 @@ impl RegionMigrationManager {
/// Returns new [RegionMigrationManager]
pub(crate) fn new(
procedure_manager: ProcedureManagerRef,
context_factory: ContextFactoryImpl,
context_factory: DefaultContextFactory,
) -> Self {
Self {
procedure_manager,
@@ -221,7 +223,10 @@ impl RegionMigrationManager {
}
/// Submits a new region migration procedure.
pub(crate) async fn submit_procedure(&self, task: RegionMigrationProcedureTask) -> Result<()> {
pub(crate) async fn submit_procedure(
&self,
task: RegionMigrationProcedureTask,
) -> Result<Option<ProcedureId>> {
let Some(guard) = self.insert_running_procedure(&task) else {
return error::MigrationRunningSnafu {
region_id: task.region_id,
@@ -243,7 +248,7 @@ impl RegionMigrationManager {
if self.has_migrated(&region_route, &task)? {
info!("Skipping region migration task: {task}");
return Ok(());
return Ok(None);
}
self.verify_region_leader_peer(&region_route, &task)?;
@@ -274,7 +279,7 @@ impl RegionMigrationManager {
info!("Region migration procedure {procedure_id} for {task} is finished successfully!");
});
Ok(())
Ok(Some(procedure_id))
}
}

View File

@@ -43,7 +43,7 @@ use tokio::sync::mpsc::{Receiver, Sender};
use super::migration_abort::RegionMigrationAbort;
use super::upgrade_candidate_region::UpgradeCandidateRegion;
use super::{Context, ContextFactory, ContextFactoryImpl, State, VolatileContext};
use super::{Context, ContextFactory, DefaultContextFactory, State, VolatileContext};
use crate::error::{self, Error, Result};
use crate::handler::{HeartbeatMailbox, Pusher, Pushers};
use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion;
@@ -120,8 +120,8 @@ impl TestingEnv {
}
/// Returns a context of region migration procedure.
pub fn context_factory(&self) -> ContextFactoryImpl {
ContextFactoryImpl {
pub fn context_factory(&self) -> DefaultContextFactory {
DefaultContextFactory {
table_metadata_manager: self.table_metadata_manager.clone(),
opening_region_keeper: self.opening_region_keeper.clone(),
volatile_ctx: Default::default(),

View File

@@ -93,6 +93,12 @@ pub fn make_admin_service(meta_srv: MetaSrv) -> Admin {
.route("/route", handler.clone())
.route("/route/help", handler);
let handler = region_migration::SubmitRegionMigrationTaskHandler {
region_migration_manager: meta_srv.region_migration_manager().clone(),
meta_peer_client: meta_srv.meta_peer_client().clone(),
};
let router = router.route("/region-migration", handler);
let router = Router::nest("/admin", router);
Admin::new(router)

View File

@@ -17,22 +17,24 @@ use std::num::ParseIntError;
use std::str::FromStr;
use common_meta::peer::Peer;
use common_meta::ClusterId;
use common_meta::{distributed_time_constants, ClusterId};
use serde::Serialize;
use snafu::ResultExt;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::RegionId;
use tonic::codegen::http;
use super::HttpHandler;
use crate::cluster::MetaPeerClientRef;
use crate::error::{self, Error, Result};
pub trait PeerLookup: Send + Sync {
fn peer(&self, peer_id: u64) -> Option<Peer>;
}
use crate::lease::lookup_alive_datanode_peer;
use crate::procedure::region_migration::manager::{
RegionMigrationManagerRef, RegionMigrationProcedureTask,
};
/// The handler of submitting migration task.
pub struct SubmitRegionMigrationTaskHandler {
// TODO(weny): waits for https://github.com/GreptimeTeam/greptimedb/pull/3014
pub region_migration_manager: RegionMigrationManagerRef,
pub meta_peer_client: MetaPeerClientRef,
}
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -45,7 +47,8 @@ struct SubmitRegionMigrationTaskRequest {
#[derive(Debug, Serialize)]
struct SubmitRegionMigrationTaskResponse {
procedure_id: String,
/// The `None` stands region has been migrated.
procedure_id: Option<String>,
}
fn parse_num_parameter_with_default<T, F>(
@@ -96,13 +99,63 @@ impl TryFrom<&HashMap<String, String>> for SubmitRegionMigrationTaskRequest {
}
impl SubmitRegionMigrationTaskHandler {
/// Returns true if this meta server is currently the leader.
/// Delegates to `MetaPeerClient::is_leader`, which also treats a node
/// without an election configured as the leader.
fn is_leader(&self) -> bool {
    self.meta_peer_client.is_leader()
}
/// Checks the peer is available: returns `Some(Peer)` only while the
/// datanode's lease is still alive within the standard datanode lease period.
async fn lookup_peer(&self, cluster_id: ClusterId, peer_id: u64) -> Result<Option<Peer>> {
    lookup_alive_datanode_peer(
        cluster_id,
        peer_id,
        &self.meta_peer_client,
        distributed_time_constants::DATANODE_LEASE_SECS,
    )
    .await
}
/// Submits a region migration task, returning the id of the spawned
/// procedure (`None` when the region has already been migrated).
///
/// NOTE(review): this span interleaved the removed lines (`_task` parameter,
/// TODO comment, `todo!()`) with the added implementation; reconstructed the
/// coherent post-change body.
async fn handle_submit(
    &self,
    task: SubmitRegionMigrationTaskRequest,
) -> Result<SubmitRegionMigrationTaskResponse> {
    // Region migration procedures must be driven by the leader meta server.
    ensure!(
        self.is_leader(),
        error::UnexpectedSnafu {
            violated: "Trying to submit a region migration procedure to non-leader meta server"
        }
    );

    let SubmitRegionMigrationTaskRequest {
        cluster_id,
        region_id,
        from_peer_id,
        to_peer_id,
    } = task;

    // Reject the task early if either peer's lease has expired.
    let from_peer = self.lookup_peer(cluster_id, from_peer_id).await?.context(
        error::PeerUnavailableSnafu {
            peer_id: from_peer_id,
        },
    )?;
    let to_peer = self.lookup_peer(cluster_id, to_peer_id).await?.context(
        error::PeerUnavailableSnafu {
            peer_id: to_peer_id,
        },
    )?;

    let procedure_id = self
        .region_migration_manager
        .submit_procedure(RegionMigrationProcedureTask {
            cluster_id,
            region_id,
            from_peer,
            to_peer,
        })
        .await?;

    Ok(SubmitRegionMigrationTaskResponse {
        procedure_id: procedure_id.map(|id| id.to_string()),
    })
}
}

View File

@@ -113,7 +113,7 @@ impl heartbeat_server::Heartbeat for MetaSrv {
);
if let Some(key) = pusher_key {
let _ = handler_group.unregister(&key).await;
let _ = handler_group.deregister(&key).await;
}
});