mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-27 02:10:38 +00:00
feat(metasrv): implement maintenance (#3527)
* feat(metasrv): implement maintenance Signed-off-by: tison <wander4096@gmail.com> * fixup and test Signed-off-by: tison <wander4096@gmail.com> * Add coauthors Co-authored-by: Yingwen <realevenyag@gmail.com> Co-authored-by: xifyang <595482900@qq.com> * tidy code Signed-off-by: tison <wander4096@gmail.com> * Apply suggestions from code review Co-authored-by: LFC <990479+MichaelScofield@users.noreply.github.com> * always read kv_backend maintenance state Signed-off-by: tison <wander4096@gmail.com> --------- Signed-off-by: tison <wander4096@gmail.com> Co-authored-by: Yingwen <realevenyag@gmail.com> Co-authored-by: xifyang <595482900@qq.com> Co-authored-by: LFC <990479+MichaelScofield@users.noreply.github.com>
This commit is contained in:
@@ -90,13 +90,13 @@ use crate::kv_backend::KvBackendRef;
|
||||
use crate::rpc::router::{region_distribution, RegionRoute, RegionStatus};
|
||||
use crate::DatanodeId;
|
||||
|
||||
pub const REMOVED_PREFIX: &str = "__removed";
|
||||
|
||||
pub const NAME_PATTERN: &str = r"[a-zA-Z_:-][a-zA-Z0-9_:\-\.]*";
|
||||
pub const MAINTENANCE_KEY: &str = "maintenance";
|
||||
|
||||
const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table";
|
||||
const TABLE_REGION_KEY_PREFIX: &str = "__table_region";
|
||||
|
||||
pub const REMOVED_PREFIX: &str = "__removed";
|
||||
pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info";
|
||||
pub const TABLE_NAME_KEY_PREFIX: &str = "__table_name";
|
||||
pub const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name";
|
||||
|
||||
@@ -301,6 +301,14 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to parse bool: {}", err_msg))]
|
||||
ParseBool {
|
||||
err_msg: String,
|
||||
#[snafu(source)]
|
||||
error: std::str::ParseBoolError,
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid arguments: {}", err_msg))]
|
||||
InvalidArguments { err_msg: String, location: Location },
|
||||
|
||||
@@ -709,6 +717,7 @@ impl ErrorExt for Error {
|
||||
| Error::InvalidStatKey { .. }
|
||||
| Error::InvalidInactiveRegionKey { .. }
|
||||
| Error::ParseNum { .. }
|
||||
| Error::ParseBool { .. }
|
||||
| Error::ParseAddr { .. }
|
||||
| Error::ParseDuration { .. }
|
||||
| Error::UnsupportedSelectorType { .. }
|
||||
|
||||
@@ -107,6 +107,9 @@ impl HeartbeatHandler for RegionFailureHandler {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::assert_matches::assert_matches;
|
||||
|
||||
use common_meta::key::MAINTENANCE_KEY;
|
||||
use store_api::region_engine::RegionRole;
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
@@ -163,4 +166,37 @@ mod tests {
|
||||
let dump = handler.failure_detect_runner.dump().await;
|
||||
assert_eq!(dump.iter().collect::<Vec<_>>().len(), 0);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_maintenance_mode() {
|
||||
let region_failover_manager = create_region_failover_manager();
|
||||
let kv_backend = region_failover_manager.create_context().kv_backend.clone();
|
||||
let _handler = RegionFailureHandler::try_new(
|
||||
None,
|
||||
region_failover_manager.clone(),
|
||||
PhiAccrualFailureDetectorOptions::default(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let kv_req = common_meta::rpc::store::PutRequest {
|
||||
key: Vec::from(MAINTENANCE_KEY),
|
||||
value: vec![],
|
||||
prev_kv: false,
|
||||
};
|
||||
let _ = kv_backend.put(kv_req.clone()).await.unwrap();
|
||||
assert_matches!(
|
||||
region_failover_manager.is_maintenance_mode().await,
|
||||
Ok(true)
|
||||
);
|
||||
|
||||
let _ = kv_backend
|
||||
.delete(MAINTENANCE_KEY.as_bytes(), false)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_matches!(
|
||||
region_failover_manager.is_maintenance_mode().await,
|
||||
Ok(false)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -140,40 +140,59 @@ impl FailureDetectRunner {
|
||||
let election = self.election.clone();
|
||||
let region_failover_manager = self.region_failover_manager.clone();
|
||||
let runner_handle = common_runtime::spawn_bg(async move {
|
||||
async fn maybe_region_failover(
|
||||
failure_detectors: &Arc<FailureDetectorContainer>,
|
||||
region_failover_manager: &Arc<RegionFailoverManager>,
|
||||
) {
|
||||
match region_failover_manager.is_maintenance_mode().await {
|
||||
Ok(false) => {}
|
||||
Ok(true) => {
|
||||
info!("Maintenance mode is enabled, skip failover");
|
||||
return;
|
||||
}
|
||||
Err(err) => {
|
||||
error!(err; "Failed to check maintenance mode");
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
let failed_regions = failure_detectors
|
||||
.iter()
|
||||
.filter_map(|e| {
|
||||
// Intentionally not place `current_time_millis()` out of the iteration.
|
||||
// The failure detection determination should be happened "just in time",
|
||||
// i.e., failed or not has to be compared with the most recent "now".
|
||||
// Besides, it might reduce the false positive of failure detection,
|
||||
// because during the iteration, heartbeats are coming in as usual,
|
||||
// and the `phi`s are still updating.
|
||||
if !e.failure_detector().is_available(current_time_millis()) {
|
||||
Some(e.region_ident().clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect::<Vec<RegionIdent>>();
|
||||
|
||||
for r in failed_regions {
|
||||
if let Err(e) = region_failover_manager.do_region_failover(&r).await {
|
||||
error!(e; "Failed to do region failover for {r}");
|
||||
} else {
|
||||
// Now that we know the region is starting to do failover, remove it
|
||||
// from the failure detectors, avoiding the failover procedure to be
|
||||
// triggered again.
|
||||
// If the region is back alive (the failover procedure runs successfully),
|
||||
// it will be added back to the failure detectors again.
|
||||
failure_detectors.remove(&r);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
loop {
|
||||
let start = Instant::now();
|
||||
|
||||
let is_leader = election.as_ref().map(|x| x.is_leader()).unwrap_or(true);
|
||||
if is_leader {
|
||||
let failed_regions = failure_detectors
|
||||
.iter()
|
||||
.filter_map(|e| {
|
||||
// Intentionally not place `current_time_millis()` out of the iteration.
|
||||
// The failure detection determination should be happened "just in time",
|
||||
// i.e., failed or not has to be compared with the most recent "now".
|
||||
// Besides, it might reduce the false positive of failure detection,
|
||||
// because during the iteration, heartbeats are coming in as usual,
|
||||
// and the `phi`s are still updating.
|
||||
if !e.failure_detector().is_available(current_time_millis()) {
|
||||
Some(e.region_ident().clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect::<Vec<RegionIdent>>();
|
||||
|
||||
for r in failed_regions {
|
||||
if let Err(e) = region_failover_manager.do_region_failover(&r).await {
|
||||
error!(e; "Failed to do region failover for {r}");
|
||||
} else {
|
||||
// Now that we know the region is starting to do failover, remove it
|
||||
// from the failure detectors, avoiding the failover procedure to be
|
||||
// triggered again.
|
||||
// If the region is back alive (the failover procedure runs successfully),
|
||||
// it will be added back to the failure detectors again.
|
||||
failure_detectors.remove(&r);
|
||||
}
|
||||
}
|
||||
maybe_region_failover(&failure_detectors, ®ion_failover_manager).await;
|
||||
}
|
||||
|
||||
let elapsed = Instant::now().duration_since(start);
|
||||
|
||||
@@ -43,7 +43,7 @@ use tokio::sync::broadcast::error::RecvError;
|
||||
use crate::cluster::MetaPeerClientRef;
|
||||
use crate::election::{Election, LeaderChangeMessage};
|
||||
use crate::error::{
|
||||
self, InitMetadataSnafu, Result, StartProcedureManagerSnafu, StartTelemetryTaskSnafu,
|
||||
InitMetadataSnafu, KvBackendSnafu, Result, StartProcedureManagerSnafu, StartTelemetryTaskSnafu,
|
||||
StopProcedureManagerSnafu,
|
||||
};
|
||||
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
|
||||
@@ -357,7 +357,7 @@ impl MetaSrv {
|
||||
self.leader_cached_kv_backend
|
||||
.load()
|
||||
.await
|
||||
.context(error::KvBackendSnafu)?;
|
||||
.context(KvBackendSnafu)?;
|
||||
self.procedure_manager
|
||||
.start()
|
||||
.await
|
||||
|
||||
@@ -260,6 +260,7 @@ impl MetaSrvBuilder {
|
||||
let region_failover_manager = Arc::new(RegionFailoverManager::new(
|
||||
distributed_time_constants::REGION_LEASE_SECS,
|
||||
in_memory.clone(),
|
||||
kv_backend.clone(),
|
||||
mailbox.clone(),
|
||||
procedure_manager.clone(),
|
||||
(selector.clone(), selector_ctx.clone()),
|
||||
|
||||
@@ -26,8 +26,8 @@ use std::time::Duration;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use common_meta::key::datanode_table::DatanodeTableKey;
|
||||
use common_meta::key::TableMetadataManagerRef;
|
||||
use common_meta::kv_backend::ResettableKvBackendRef;
|
||||
use common_meta::key::{TableMetadataManagerRef, MAINTENANCE_KEY};
|
||||
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
|
||||
use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock, TableLock};
|
||||
use common_meta::table_name::TableName;
|
||||
use common_meta::{ClusterId, RegionIdent};
|
||||
@@ -45,7 +45,9 @@ use snafu::ResultExt;
|
||||
use store_api::storage::{RegionId, RegionNumber};
|
||||
use table::metadata::TableId;
|
||||
|
||||
use crate::error::{self, RegisterProcedureLoaderSnafu, Result, TableMetadataManagerSnafu};
|
||||
use crate::error::{
|
||||
self, KvBackendSnafu, RegisterProcedureLoaderSnafu, Result, TableMetadataManagerSnafu,
|
||||
};
|
||||
use crate::lock::DistLockRef;
|
||||
use crate::metasrv::{SelectorContext, SelectorRef};
|
||||
use crate::service::mailbox::MailboxRef;
|
||||
@@ -73,6 +75,7 @@ impl From<RegionIdent> for RegionFailoverKey {
|
||||
pub(crate) struct RegionFailoverManager {
|
||||
region_lease_secs: u64,
|
||||
in_memory: ResettableKvBackendRef,
|
||||
kv_backend: KvBackendRef,
|
||||
mailbox: MailboxRef,
|
||||
procedure_manager: ProcedureManagerRef,
|
||||
selector: SelectorRef,
|
||||
@@ -94,9 +97,11 @@ impl Drop for FailoverProcedureGuard {
|
||||
}
|
||||
|
||||
impl RegionFailoverManager {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) fn new(
|
||||
region_lease_secs: u64,
|
||||
in_memory: ResettableKvBackendRef,
|
||||
kv_backend: KvBackendRef,
|
||||
mailbox: MailboxRef,
|
||||
procedure_manager: ProcedureManagerRef,
|
||||
(selector, selector_ctx): (SelectorRef, SelectorContext),
|
||||
@@ -106,6 +111,7 @@ impl RegionFailoverManager {
|
||||
Self {
|
||||
region_lease_secs,
|
||||
in_memory,
|
||||
kv_backend,
|
||||
mailbox,
|
||||
procedure_manager,
|
||||
selector,
|
||||
@@ -120,6 +126,7 @@ impl RegionFailoverManager {
|
||||
RegionFailoverContext {
|
||||
region_lease_secs: self.region_lease_secs,
|
||||
in_memory: self.in_memory.clone(),
|
||||
kv_backend: self.kv_backend.clone(),
|
||||
mailbox: self.mailbox.clone(),
|
||||
selector: self.selector.clone(),
|
||||
selector_ctx: self.selector_ctx.clone(),
|
||||
@@ -159,6 +166,13 @@ impl RegionFailoverManager {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn is_maintenance_mode(&self) -> Result<bool> {
|
||||
self.kv_backend
|
||||
.exists(MAINTENANCE_KEY.as_bytes())
|
||||
.await
|
||||
.context(KvBackendSnafu)
|
||||
}
|
||||
|
||||
pub(crate) async fn do_region_failover(&self, failed_region: &RegionIdent) -> Result<()> {
|
||||
let Some(guard) = self.insert_running_procedures(failed_region) else {
|
||||
warn!("Region failover procedure for region {failed_region} is already running!");
|
||||
@@ -264,6 +278,7 @@ struct Node {
|
||||
pub struct RegionFailoverContext {
|
||||
pub region_lease_secs: u64,
|
||||
pub in_memory: ResettableKvBackendRef,
|
||||
pub kv_backend: KvBackendRef,
|
||||
pub mailbox: MailboxRef,
|
||||
pub selector: SelectorRef,
|
||||
pub selector_ctx: SelectorContext,
|
||||
@@ -569,6 +584,7 @@ mod tests {
|
||||
context: RegionFailoverContext {
|
||||
region_lease_secs: 10,
|
||||
in_memory,
|
||||
kv_backend,
|
||||
mailbox,
|
||||
selector,
|
||||
selector_ctx,
|
||||
|
||||
@@ -15,10 +15,9 @@
|
||||
mod health;
|
||||
mod heartbeat;
|
||||
mod leader;
|
||||
mod maintenance;
|
||||
mod meta;
|
||||
// TODO(weny): removes it.
|
||||
mod node_lease;
|
||||
#[allow(dead_code)]
|
||||
mod region_migration;
|
||||
mod route;
|
||||
mod util;
|
||||
@@ -99,6 +98,13 @@ pub fn make_admin_service(meta_srv: MetaSrv) -> Admin {
|
||||
};
|
||||
let router = router.route("/region-migration", handler);
|
||||
|
||||
let handler = maintenance::MaintenanceHandler {
|
||||
kv_backend: meta_srv.kv_backend().clone(),
|
||||
};
|
||||
let router = router
|
||||
.route("/maintenance", handler.clone())
|
||||
.route("/maintenance/set", handler);
|
||||
|
||||
let router = Router::nest("/admin", router);
|
||||
|
||||
Admin::new(router)
|
||||
|
||||
103
src/meta-srv/src/service/admin/maintenance.rs
Normal file
103
src/meta-srv/src/service/admin/maintenance.rs
Normal file
@@ -0,0 +1,103 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use common_meta::key::MAINTENANCE_KEY;
|
||||
use common_meta::kv_backend::KvBackendRef;
|
||||
use common_meta::rpc::store::PutRequest;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use tonic::codegen::http;
|
||||
use tonic::codegen::http::Response;
|
||||
|
||||
use crate::error::{
|
||||
InvalidHttpBodySnafu, KvBackendSnafu, MissingRequiredParameterSnafu, ParseBoolSnafu,
|
||||
};
|
||||
use crate::service::admin::HttpHandler;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct MaintenanceHandler {
|
||||
pub kv_backend: KvBackendRef,
|
||||
}
|
||||
|
||||
impl MaintenanceHandler {
|
||||
async fn get_maintenance(&self) -> crate::Result<Response<String>> {
|
||||
let enabled = self
|
||||
.kv_backend
|
||||
.exists(MAINTENANCE_KEY.as_bytes())
|
||||
.await
|
||||
.context(KvBackendSnafu)?;
|
||||
let response = if enabled {
|
||||
"Maintenance mode is enabled"
|
||||
} else {
|
||||
"Maintenance mode is disabled"
|
||||
};
|
||||
http::Response::builder()
|
||||
.status(http::StatusCode::OK)
|
||||
.body(response.into())
|
||||
.context(InvalidHttpBodySnafu)
|
||||
}
|
||||
|
||||
async fn set_maintenance(
|
||||
&self,
|
||||
params: &HashMap<String, String>,
|
||||
) -> crate::Result<Response<String>> {
|
||||
let enable = params
|
||||
.get("enable")
|
||||
.map(|v| v.parse::<bool>())
|
||||
.context(MissingRequiredParameterSnafu { param: "enable" })?
|
||||
.context(ParseBoolSnafu {
|
||||
err_msg: "'enable' must be 'true' or 'false'",
|
||||
})?;
|
||||
|
||||
let response = if enable {
|
||||
let req = PutRequest {
|
||||
key: Vec::from(MAINTENANCE_KEY),
|
||||
value: vec![],
|
||||
prev_kv: false,
|
||||
};
|
||||
self.kv_backend
|
||||
.put(req.clone())
|
||||
.await
|
||||
.context(KvBackendSnafu)?;
|
||||
"Maintenance mode enabled"
|
||||
} else {
|
||||
self.kv_backend
|
||||
.delete(MAINTENANCE_KEY.as_bytes(), false)
|
||||
.await
|
||||
.context(KvBackendSnafu)?;
|
||||
"Maintenance mode disabled"
|
||||
};
|
||||
|
||||
http::Response::builder()
|
||||
.status(http::StatusCode::OK)
|
||||
.body(response.into())
|
||||
.context(InvalidHttpBodySnafu)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl HttpHandler for MaintenanceHandler {
|
||||
async fn handle(
|
||||
&self,
|
||||
path: &str,
|
||||
params: &HashMap<String, String>,
|
||||
) -> crate::Result<Response<String>> {
|
||||
if path.ends_with("/set") {
|
||||
self.set_maintenance(params).await
|
||||
} else {
|
||||
self.get_maintenance().await
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -86,6 +86,7 @@ pub(crate) fn create_region_failover_manager() -> Arc<RegionFailoverManager> {
|
||||
Arc::new(RegionFailoverManager::new(
|
||||
10,
|
||||
in_memory,
|
||||
kv_backend.clone(),
|
||||
mailbox,
|
||||
procedure_manager,
|
||||
(selector, selector_ctx),
|
||||
|
||||
@@ -352,6 +352,7 @@ async fn run_region_failover_procedure(
|
||||
RegionFailoverContext {
|
||||
region_lease_secs: 10,
|
||||
in_memory: meta_srv.in_memory().clone(),
|
||||
kv_backend: meta_srv.kv_backend().clone(),
|
||||
mailbox: meta_srv.mailbox().clone(),
|
||||
selector,
|
||||
selector_ctx: SelectorContext {
|
||||
|
||||
Reference in New Issue
Block a user