refactor: introduce MaintenanceModeManager (#4994)

* refactor: introduce MaintenanceModeManager

* chore: apply suggestions from CR

* chore: apply suggestions from CR

* chore: apply suggestions from CR
This commit is contained in:
Weny Xu
2024-11-15 15:15:22 +08:00
committed by GitHub
parent a2852affeb
commit 08f59008cc
8 changed files with 110 additions and 35 deletions

View File

@@ -90,6 +90,7 @@
pub mod catalog_name;
pub mod datanode_table;
pub mod flow;
pub mod maintenance;
pub mod node_address;
mod schema_metadata_manager;
pub mod schema_name;

View File

@@ -0,0 +1,63 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use crate::error::Result;
use crate::key::MAINTENANCE_KEY;
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::PutRequest;
pub type MaintenanceModeManagerRef = Arc<MaintenanceModeManager>;
/// The maintenance mode manager.
///
/// Used to enable or disable maintenance mode.
#[derive(Clone)]
pub struct MaintenanceModeManager {
kv_backend: KvBackendRef,
}
impl MaintenanceModeManager {
pub fn new(kv_backend: KvBackendRef) -> Self {
Self { kv_backend }
}
/// Enables maintenance mode.
pub async fn set_maintenance_mode(&self) -> Result<()> {
let req = PutRequest {
key: Vec::from(MAINTENANCE_KEY),
value: vec![],
prev_kv: false,
};
self.kv_backend.put(req).await?;
Ok(())
}
/// Unsets maintenance mode.
pub async fn unset_maintenance_mode(&self) -> Result<()> {
let req = PutRequest {
key: Vec::from(MAINTENANCE_KEY),
value: vec![],
prev_kv: false,
};
self.kv_backend.put(req).await?;
Ok(())
}
/// Returns true if maintenance mode is enabled.
pub async fn maintenance_mode(&self) -> Result<bool> {
self.kv_backend.exists(MAINTENANCE_KEY.as_bytes()).await
}
}

View File

@@ -622,6 +622,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Maintenance mode manager error"))]
MaintenanceModeManager {
source: common_meta::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Keyvalue backend error"))]
KvBackend {
source: common_meta::error::Error,
@@ -815,6 +822,7 @@ impl ErrorExt for Error {
Error::SubmitDdlTask { source, .. } => source.status_code(),
Error::ConvertProtoData { source, .. }
| Error::TableMetadataManager { source, .. }
| Error::MaintenanceModeManager { source, .. }
| Error::KvBackend { source, .. }
| Error::UnexpectedLogicalRouteTable { source, .. } => source.status_code(),

View File

@@ -26,6 +26,7 @@ use common_config::Configurable;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl::ProcedureExecutorRef;
use common_meta::key::maintenance::MaintenanceModeManagerRef;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};
use common_meta::leadership_notifier::{
@@ -340,6 +341,7 @@ pub struct Metasrv {
procedure_executor: ProcedureExecutorRef,
wal_options_allocator: WalOptionsAllocatorRef,
table_metadata_manager: TableMetadataManagerRef,
maintenance_mode_manager: MaintenanceModeManagerRef,
memory_region_keeper: MemoryRegionKeeperRef,
greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
region_migration_manager: RegionMigrationManagerRef,
@@ -572,6 +574,10 @@ impl Metasrv {
&self.table_metadata_manager
}
pub fn maintenance_mode_manager(&self) -> &MaintenanceModeManagerRef {
&self.maintenance_mode_manager
}
pub fn memory_region_keeper(&self) -> &MemoryRegionKeeperRef {
&self.memory_region_keeper
}

View File

@@ -27,6 +27,7 @@ use common_meta::ddl::{
use common_meta::ddl_manager::DdlManager;
use common_meta::distributed_time_constants;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::maintenance::MaintenanceModeManager;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
@@ -195,6 +196,7 @@ impl MetasrvBuilder {
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(
leader_cached_kv_backend.clone() as _,
));
let maintenance_mode_manager = Arc::new(MaintenanceModeManager::new(kv_backend.clone()));
let selector_ctx = SelectorContext {
server_addr: options.server_addr.clone(),
datanode_lease_secs: distributed_time_constants::DATANODE_LEASE_SECS,
@@ -304,7 +306,7 @@ impl MetasrvBuilder {
selector_ctx.clone(),
selector.clone(),
region_migration_manager.clone(),
leader_cached_kv_backend.clone() as _,
maintenance_mode_manager.clone(),
peer_lookup_service.clone(),
);
@@ -375,6 +377,7 @@ impl MetasrvBuilder {
procedure_executor: ddl_manager,
wal_options_allocator,
table_metadata_manager,
maintenance_mode_manager,
greptimedb_telemetry_task: get_greptimedb_telemetry_task(
Some(metasrv_home),
meta_peer_client,

View File

@@ -19,8 +19,7 @@ use std::time::Duration;
use async_trait::async_trait;
use common_meta::datanode::Stat;
use common_meta::ddl::{DetectingRegion, RegionFailureDetectorController};
use common_meta::key::MAINTENANCE_KEY;
use common_meta::kv_backend::KvBackendRef;
use common_meta::key::maintenance::MaintenanceModeManagerRef;
use common_meta::leadership_notifier::LeadershipChangeListener;
use common_meta::peer::PeerLookupServiceRef;
use common_meta::{ClusterId, DatanodeId};
@@ -216,8 +215,8 @@ pub struct RegionSupervisor {
selector: SelectorRef,
/// Region migration manager.
region_migration_manager: RegionMigrationManagerRef,
// TODO(weny): find a better way
kv_backend: KvBackendRef,
/// The maintenance mode manager.
maintenance_mode_manager: MaintenanceModeManagerRef,
/// Peer lookup service
peer_lookup: PeerLookupServiceRef,
}
@@ -288,7 +287,7 @@ impl RegionSupervisor {
selector_context: SelectorContext,
selector: SelectorRef,
region_migration_manager: RegionMigrationManagerRef,
kv_backend: KvBackendRef,
maintenance_mode_manager: MaintenanceModeManagerRef,
peer_lookup: PeerLookupServiceRef,
) -> Self {
Self {
@@ -297,7 +296,7 @@ impl RegionSupervisor {
selector_context,
selector,
region_migration_manager,
kv_backend,
maintenance_mode_manager,
peer_lookup,
}
}
@@ -346,7 +345,7 @@ impl RegionSupervisor {
if regions.is_empty() {
return;
}
match self.is_maintenance_mode().await {
match self.is_maintenance_mode_enabled().await {
Ok(false) => {}
Ok(true) => {
info!("Maintenance mode is enabled, skip failover");
@@ -382,11 +381,11 @@ impl RegionSupervisor {
}
}
pub(crate) async fn is_maintenance_mode(&self) -> Result<bool> {
self.kv_backend
.exists(MAINTENANCE_KEY.as_bytes())
pub(crate) async fn is_maintenance_mode_enabled(&self) -> Result<bool> {
self.maintenance_mode_manager
.maintenance_mode()
.await
.context(error::KvBackendSnafu)
.context(error::MaintenanceModeManagerSnafu)
}
async fn do_failover(
@@ -479,6 +478,7 @@ pub(crate) mod tests {
use std::time::Duration;
use common_meta::ddl::RegionFailureDetectorController;
use common_meta::key::maintenance;
use common_meta::peer::Peer;
use common_meta::test_util::NoopPeerLookupService;
use common_time::util::current_time_millis;
@@ -505,7 +505,8 @@ pub(crate) mod tests {
env.procedure_manager().clone(),
context_factory,
));
let kv_backend = env.kv_backend();
let maintenance_mode_manager =
Arc::new(maintenance::MaintenanceModeManager::new(env.kv_backend()));
let peer_lookup = Arc::new(NoopPeerLookupService);
let (tx, rx) = RegionSupervisor::channel();
@@ -516,7 +517,7 @@ pub(crate) mod tests {
selector_context,
selector,
region_migration_manager,
kv_backend,
maintenance_mode_manager,
peer_lookup,
),
tx,

View File

@@ -57,7 +57,7 @@ pub fn make_admin_service(metasrv: Arc<Metasrv>) -> Admin {
let router = router.route(
"/maintenance",
maintenance::MaintenanceHandler {
kv_backend: metasrv.kv_backend().clone(),
manager: metasrv.maintenance_mode_manager().clone(),
},
);
let router = Router::nest("/admin", router);

View File

@@ -14,31 +14,29 @@
use std::collections::HashMap;
use common_meta::key::MAINTENANCE_KEY;
use common_meta::kv_backend::KvBackendRef;
use common_meta::rpc::store::PutRequest;
use common_meta::key::maintenance::MaintenanceModeManagerRef;
use snafu::{OptionExt, ResultExt};
use tonic::codegen::http;
use tonic::codegen::http::Response;
use crate::error::{
InvalidHttpBodySnafu, KvBackendSnafu, MissingRequiredParameterSnafu, ParseBoolSnafu,
UnsupportedSnafu,
InvalidHttpBodySnafu, MaintenanceModeManagerSnafu, MissingRequiredParameterSnafu,
ParseBoolSnafu, UnsupportedSnafu,
};
use crate::service::admin::HttpHandler;
#[derive(Clone)]
pub struct MaintenanceHandler {
pub kv_backend: KvBackendRef,
pub manager: MaintenanceModeManagerRef,
}
impl MaintenanceHandler {
async fn get_maintenance(&self) -> crate::Result<Response<String>> {
let enabled = self
.kv_backend
.exists(MAINTENANCE_KEY.as_bytes())
.manager
.maintenance_mode()
.await
.context(KvBackendSnafu)?;
.context(MaintenanceModeManagerSnafu)?;
let response = if enabled {
"Maintenance mode is enabled"
} else {
@@ -63,21 +61,16 @@ impl MaintenanceHandler {
})?;
let response = if enable {
let req = PutRequest {
key: Vec::from(MAINTENANCE_KEY),
value: vec![],
prev_kv: false,
};
self.kv_backend
.put(req.clone())
self.manager
.set_maintenance_mode()
.await
.context(KvBackendSnafu)?;
.context(MaintenanceModeManagerSnafu)?;
"Maintenance mode enabled"
} else {
self.kv_backend
.delete(MAINTENANCE_KEY.as_bytes(), false)
self.manager
.unset_maintenance_mode()
.await
.context(KvBackendSnafu)?;
.context(MaintenanceModeManagerSnafu)?;
"Maintenance mode disabled"
};