diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 6f63356540..aa88aa935d 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -90,6 +90,7 @@ pub mod catalog_name; pub mod datanode_table; pub mod flow; +pub mod maintenance; pub mod node_address; mod schema_metadata_manager; pub mod schema_name; diff --git a/src/common/meta/src/key/maintenance.rs b/src/common/meta/src/key/maintenance.rs new file mode 100644 index 0000000000..b8ee760eb8 --- /dev/null +++ b/src/common/meta/src/key/maintenance.rs @@ -0,0 +1,63 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use crate::error::Result; +use crate::key::MAINTENANCE_KEY; +use crate::kv_backend::KvBackendRef; +use crate::rpc::store::PutRequest; + +pub type MaintenanceModeManagerRef = Arc; + +/// The maintenance mode manager. +/// +/// Used to enable or disable maintenance mode. +#[derive(Clone)] +pub struct MaintenanceModeManager { + kv_backend: KvBackendRef, +} + +impl MaintenanceModeManager { + pub fn new(kv_backend: KvBackendRef) -> Self { + Self { kv_backend } + } + + /// Enables maintenance mode. + pub async fn set_maintenance_mode(&self) -> Result<()> { + let req = PutRequest { + key: Vec::from(MAINTENANCE_KEY), + value: vec![], + prev_kv: false, + }; + self.kv_backend.put(req).await?; + Ok(()) + } + + /// Unsets maintenance mode. + pub async fn unset_maintenance_mode(&self) -> Result<()> { + let req = PutRequest { + key: Vec::from(MAINTENANCE_KEY), + value: vec![], + prev_kv: false, + }; + self.kv_backend.put(req).await?; + Ok(()) + } + + /// Returns true if maintenance mode is enabled. + pub async fn maintenance_mode(&self) -> Result { + self.kv_backend.exists(MAINTENANCE_KEY.as_bytes()).await + } +} diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index d7db60e8b1..705f31ac49 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -622,6 +622,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Maintenance mode manager error"))] + MaintenanceModeManager { + source: common_meta::error::Error, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Keyvalue backend error"))] KvBackend { source: common_meta::error::Error, @@ -815,6 +822,7 @@ impl ErrorExt for Error { Error::SubmitDdlTask { source, .. } => source.status_code(), Error::ConvertProtoData { source, .. } | Error::TableMetadataManager { source, .. } + | Error::MaintenanceModeManager { source, .. } | Error::KvBackend { source, .. } | Error::UnexpectedLogicalRouteTable { source, .. } => source.status_code(), diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 11c41a58da..9de0487d01 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -26,6 +26,7 @@ use common_config::Configurable; use common_greptimedb_telemetry::GreptimeDBTelemetryTask; use common_meta::cache_invalidator::CacheInvalidatorRef; use common_meta::ddl::ProcedureExecutorRef; +use common_meta::key::maintenance::MaintenanceModeManagerRef; use common_meta::key::TableMetadataManagerRef; use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef}; use common_meta::leadership_notifier::{ @@ -340,6 +341,7 @@ pub struct Metasrv { procedure_executor: ProcedureExecutorRef, wal_options_allocator: WalOptionsAllocatorRef, table_metadata_manager: TableMetadataManagerRef, + maintenance_mode_manager: MaintenanceModeManagerRef, memory_region_keeper: MemoryRegionKeeperRef, greptimedb_telemetry_task: Arc, region_migration_manager: RegionMigrationManagerRef, @@ -572,6 +574,10 @@ impl Metasrv { &self.table_metadata_manager } + pub fn maintenance_mode_manager(&self) -> &MaintenanceModeManagerRef { + &self.maintenance_mode_manager + } + pub fn memory_region_keeper(&self) -> &MemoryRegionKeeperRef { &self.memory_region_keeper } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index b21978a397..05344b482b 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -27,6 +27,7 @@ use common_meta::ddl::{ use common_meta::ddl_manager::DdlManager; use common_meta::distributed_time_constants; use common_meta::key::flow::FlowMetadataManager; +use common_meta::key::maintenance::MaintenanceModeManager; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef}; @@ -195,6 +196,7 @@ impl MetasrvBuilder { let flow_metadata_manager = Arc::new(FlowMetadataManager::new( leader_cached_kv_backend.clone() as _, )); + let maintenance_mode_manager = Arc::new(MaintenanceModeManager::new(kv_backend.clone())); let selector_ctx = SelectorContext { server_addr: options.server_addr.clone(), datanode_lease_secs: distributed_time_constants::DATANODE_LEASE_SECS, @@ -304,7 +306,7 @@ impl MetasrvBuilder { selector_ctx.clone(), selector.clone(), region_migration_manager.clone(), - leader_cached_kv_backend.clone() as _, + maintenance_mode_manager.clone(), peer_lookup_service.clone(), ); @@ -375,6 +377,7 @@ impl MetasrvBuilder { procedure_executor: ddl_manager, wal_options_allocator, table_metadata_manager, + maintenance_mode_manager, greptimedb_telemetry_task: get_greptimedb_telemetry_task( Some(metasrv_home), meta_peer_client, diff --git a/src/meta-srv/src/region/supervisor.rs b/src/meta-srv/src/region/supervisor.rs index c6db67c761..dfc2bd45fc 100644 --- a/src/meta-srv/src/region/supervisor.rs +++ b/src/meta-srv/src/region/supervisor.rs @@ -19,8 +19,7 @@ use std::time::Duration; use async_trait::async_trait; use common_meta::datanode::Stat; use common_meta::ddl::{DetectingRegion, RegionFailureDetectorController}; -use common_meta::key::MAINTENANCE_KEY; -use common_meta::kv_backend::KvBackendRef; +use common_meta::key::maintenance::MaintenanceModeManagerRef; use common_meta::leadership_notifier::LeadershipChangeListener; use common_meta::peer::PeerLookupServiceRef; use common_meta::{ClusterId, DatanodeId}; @@ -216,8 +215,8 @@ pub struct RegionSupervisor { selector: SelectorRef, /// Region migration manager. region_migration_manager: RegionMigrationManagerRef, - // TODO(weny): find a better way - kv_backend: KvBackendRef, + /// The maintenance mode manager. + maintenance_mode_manager: MaintenanceModeManagerRef, /// Peer lookup service peer_lookup: PeerLookupServiceRef, } @@ -288,7 +287,7 @@ impl RegionSupervisor { selector_context: SelectorContext, selector: SelectorRef, region_migration_manager: RegionMigrationManagerRef, - kv_backend: KvBackendRef, + maintenance_mode_manager: MaintenanceModeManagerRef, peer_lookup: PeerLookupServiceRef, ) -> Self { Self { @@ -297,7 +296,7 @@ impl RegionSupervisor { selector_context, selector, region_migration_manager, - kv_backend, + maintenance_mode_manager, peer_lookup, } } @@ -346,7 +345,7 @@ impl RegionSupervisor { if regions.is_empty() { return; } - match self.is_maintenance_mode().await { + match self.is_maintenance_mode_enabled().await { Ok(false) => {} Ok(true) => { info!("Maintenance mode is enabled, skip failover"); @@ -382,11 +381,11 @@ impl RegionSupervisor { } } - pub(crate) async fn is_maintenance_mode(&self) -> Result { - self.kv_backend - .exists(MAINTENANCE_KEY.as_bytes()) + pub(crate) async fn is_maintenance_mode_enabled(&self) -> Result { + self.maintenance_mode_manager + .maintenance_mode() .await - .context(error::KvBackendSnafu) + .context(error::MaintenanceModeManagerSnafu) } async fn do_failover( @@ -479,6 +478,7 @@ pub(crate) mod tests { use std::time::Duration; use common_meta::ddl::RegionFailureDetectorController; + use common_meta::key::maintenance; use common_meta::peer::Peer; use common_meta::test_util::NoopPeerLookupService; use common_time::util::current_time_millis; @@ -505,7 +505,8 @@ pub(crate) mod tests { env.procedure_manager().clone(), context_factory, )); - let kv_backend = env.kv_backend(); + let maintenance_mode_manager = + Arc::new(maintenance::MaintenanceModeManager::new(env.kv_backend())); let peer_lookup = Arc::new(NoopPeerLookupService); let (tx, rx) = RegionSupervisor::channel(); @@ -516,7 +517,7 @@ pub(crate) mod tests { selector_context, selector, region_migration_manager, - kv_backend, + maintenance_mode_manager, peer_lookup, ), tx, diff --git a/src/meta-srv/src/service/admin.rs b/src/meta-srv/src/service/admin.rs index fca8cf3295..772c3bdba5 100644 --- a/src/meta-srv/src/service/admin.rs +++ b/src/meta-srv/src/service/admin.rs @@ -57,7 +57,7 @@ pub fn make_admin_service(metasrv: Arc) -> Admin { let router = router.route( "/maintenance", maintenance::MaintenanceHandler { - kv_backend: metasrv.kv_backend().clone(), + manager: metasrv.maintenance_mode_manager().clone(), }, ); let router = Router::nest("/admin", router); diff --git a/src/meta-srv/src/service/admin/maintenance.rs b/src/meta-srv/src/service/admin/maintenance.rs index b207fecd9f..266702576b 100644 --- a/src/meta-srv/src/service/admin/maintenance.rs +++ b/src/meta-srv/src/service/admin/maintenance.rs @@ -14,31 +14,29 @@ use std::collections::HashMap; -use common_meta::key::MAINTENANCE_KEY; -use common_meta::kv_backend::KvBackendRef; -use common_meta::rpc::store::PutRequest; +use common_meta::key::maintenance::MaintenanceModeManagerRef; use snafu::{OptionExt, ResultExt}; use tonic::codegen::http; use tonic::codegen::http::Response; use crate::error::{ - InvalidHttpBodySnafu, KvBackendSnafu, MissingRequiredParameterSnafu, ParseBoolSnafu, - UnsupportedSnafu, + InvalidHttpBodySnafu, MaintenanceModeManagerSnafu, MissingRequiredParameterSnafu, + ParseBoolSnafu, UnsupportedSnafu, }; use crate::service::admin::HttpHandler; #[derive(Clone)] pub struct MaintenanceHandler { - pub kv_backend: KvBackendRef, + pub manager: MaintenanceModeManagerRef, } impl MaintenanceHandler { async fn get_maintenance(&self) -> crate::Result> { let enabled = self - .kv_backend - .exists(MAINTENANCE_KEY.as_bytes()) + .manager + .maintenance_mode() .await - .context(KvBackendSnafu)?; + .context(MaintenanceModeManagerSnafu)?; let response = if enabled { "Maintenance mode is enabled" } else { @@ -63,21 +61,16 @@ impl MaintenanceHandler { })?; let response = if enable { - let req = PutRequest { - key: Vec::from(MAINTENANCE_KEY), - value: vec![], - prev_kv: false, - }; - self.kv_backend - .put(req.clone()) + self.manager + .set_maintenance_mode() .await - .context(KvBackendSnafu)?; + .context(MaintenanceModeManagerSnafu)?; "Maintenance mode enabled" } else { - self.kv_backend - .delete(MAINTENANCE_KEY.as_bytes(), false) + self.manager + .unset_maintenance_mode() .await - .context(KvBackendSnafu)?; + .context(MaintenanceModeManagerSnafu)?; "Maintenance mode disabled" };