diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 61bee47122..97f98bc9b5 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -995,6 +995,7 @@ mod tests { Default::default(), state_store, poison_manager, + None, )); let _ = DdlManager::try_new( diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 6a50baf381..36becc09e2 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -100,8 +100,8 @@ pub mod catalog_name; pub mod datanode_table; pub mod flow; -pub mod maintenance; pub mod node_address; +pub mod runtime_switch; mod schema_metadata_manager; pub mod schema_name; pub mod table_info; @@ -164,7 +164,9 @@ use crate::state_store::PoisonValue; use crate::DatanodeId; pub const NAME_PATTERN: &str = r"[a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*"; -pub const MAINTENANCE_KEY: &str = "__maintenance"; +pub const LEGACY_MAINTENANCE_KEY: &str = "__maintenance"; +pub const MAINTENANCE_KEY: &str = "__switches/maintenance"; +pub const PAUSE_PROCEDURE_KEY: &str = "__switches/pause_procedure"; pub const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table"; pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info"; diff --git a/src/common/meta/src/key/maintenance.rs b/src/common/meta/src/key/maintenance.rs deleted file mode 100644 index c1cb93d76e..0000000000 --- a/src/common/meta/src/key/maintenance.rs +++ /dev/null @@ -1,86 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::Arc; - -use crate::error::Result; -use crate::key::MAINTENANCE_KEY; -use crate::kv_backend::KvBackendRef; -use crate::rpc::store::PutRequest; - -pub type MaintenanceModeManagerRef = Arc; - -/// The maintenance mode manager. -/// -/// Used to enable or disable maintenance mode. -#[derive(Clone)] -pub struct MaintenanceModeManager { - kv_backend: KvBackendRef, -} - -impl MaintenanceModeManager { - pub fn new(kv_backend: KvBackendRef) -> Self { - Self { kv_backend } - } - - /// Enables maintenance mode. - pub async fn set_maintenance_mode(&self) -> Result<()> { - let req = PutRequest { - key: Vec::from(MAINTENANCE_KEY), - value: vec![], - prev_kv: false, - }; - self.kv_backend.put(req).await?; - Ok(()) - } - - /// Unsets maintenance mode. - pub async fn unset_maintenance_mode(&self) -> Result<()> { - self.kv_backend - .delete(MAINTENANCE_KEY.as_bytes(), false) - .await?; - Ok(()) - } - - /// Returns true if maintenance mode is enabled. 
- pub async fn maintenance_mode(&self) -> Result { - self.kv_backend.exists(MAINTENANCE_KEY.as_bytes()).await - } -} - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use crate::key::maintenance::MaintenanceModeManager; - use crate::kv_backend::memory::MemoryKvBackend; - - #[tokio::test] - async fn test_maintenance_mode_manager() { - let maintenance_mode_manager = Arc::new(MaintenanceModeManager::new(Arc::new( - MemoryKvBackend::new(), - ))); - assert!(!maintenance_mode_manager.maintenance_mode().await.unwrap()); - maintenance_mode_manager - .set_maintenance_mode() - .await - .unwrap(); - assert!(maintenance_mode_manager.maintenance_mode().await.unwrap()); - maintenance_mode_manager - .unset_maintenance_mode() - .await - .unwrap(); - assert!(!maintenance_mode_manager.maintenance_mode().await.unwrap()); - } -} diff --git a/src/common/meta/src/key/runtime_switch.rs b/src/common/meta/src/key/runtime_switch.rs new file mode 100644 index 0000000000..3fdaa24e85 --- /dev/null +++ b/src/common/meta/src/key/runtime_switch.rs @@ -0,0 +1,224 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::sync::Arc; +use std::time::Duration; + +use common_error::ext::BoxedError; +use common_procedure::local::PauseAware; +use moka::future::Cache; +use snafu::ResultExt; + +use crate::error::{GetCacheSnafu, Result}; +use crate::key::{LEGACY_MAINTENANCE_KEY, MAINTENANCE_KEY, PAUSE_PROCEDURE_KEY}; +use crate::kv_backend::KvBackendRef; +use crate::rpc::store::{BatchDeleteRequest, PutRequest}; + +pub type RuntimeSwitchManagerRef = Arc; + +/// The runtime switch manager. +/// +/// Used to enable or disable runtime switches. +#[derive(Clone)] +pub struct RuntimeSwitchManager { + kv_backend: KvBackendRef, + cache: Cache, Option>>, +} + +#[async_trait::async_trait] +impl PauseAware for RuntimeSwitchManager { + async fn is_paused(&self) -> std::result::Result { + self.is_procedure_paused().await.map_err(BoxedError::new) + } +} + +const CACHE_TTL: Duration = Duration::from_secs(10); +const MAX_CAPACITY: u64 = 32; + +impl RuntimeSwitchManager { + pub fn new(kv_backend: KvBackendRef) -> Self { + let cache = Cache::builder() + .time_to_live(CACHE_TTL) + .max_capacity(MAX_CAPACITY) + .build(); + Self { kv_backend, cache } + } + + async fn put_key(&self, key: &str) -> Result<()> { + let req = PutRequest { + key: Vec::from(key), + value: vec![], + prev_kv: false, + }; + self.kv_backend.put(req).await?; + self.cache.invalidate(key.as_bytes()).await; + Ok(()) + } + + async fn delete_keys(&self, keys: &[&str]) -> Result<()> { + let req = BatchDeleteRequest::new() + .with_keys(keys.iter().map(|x| x.as_bytes().to_vec()).collect()); + self.kv_backend.batch_delete(req).await?; + for key in keys { + self.cache.invalidate(key.as_bytes()).await; + } + Ok(()) + } + + /// Returns true if the key exists. 
+ async fn exists(&self, key: &str) -> Result { + let key = key.as_bytes().to_vec(); + let kv_backend = self.kv_backend.clone(); + let value = self + .cache + .try_get_with(key.clone(), async move { + kv_backend.get(&key).await.map(|v| v.map(|v| v.value)) + }) + .await + .context(GetCacheSnafu)?; + + Ok(value.is_some()) + } + + /// Enables maintenance mode. + pub async fn set_maintenance_mode(&self) -> Result<()> { + self.put_key(MAINTENANCE_KEY).await + } + + /// Unsets maintenance mode. + pub async fn unset_maintenance_mode(&self) -> Result<()> { + self.delete_keys(&[MAINTENANCE_KEY, LEGACY_MAINTENANCE_KEY]) + .await + } + + /// Returns true if maintenance mode is enabled. + pub async fn maintenance_mode(&self) -> Result { + let exists = self.exists(MAINTENANCE_KEY).await?; + if exists { + return Ok(true); + } + + let exists = self.exists(LEGACY_MAINTENANCE_KEY).await?; + if exists { + return Ok(true); + } + + Ok(false) + } + + /// Pauses handling of incoming procedure requests. + pub async fn pasue_procedure(&self) -> Result<()> { + self.put_key(PAUSE_PROCEDURE_KEY).await + } + + /// Resumes processing of incoming procedure requests. + pub async fn resume_procedure(&self) -> Result<()> { + self.delete_keys(&[PAUSE_PROCEDURE_KEY]).await + } + + /// Returns true if the system is currently pausing incoming procedure requests. 
+ pub async fn is_procedure_paused(&self) -> Result { + self.exists(PAUSE_PROCEDURE_KEY).await + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use crate::key::runtime_switch::RuntimeSwitchManager; + use crate::key::{LEGACY_MAINTENANCE_KEY, MAINTENANCE_KEY}; + use crate::kv_backend::memory::MemoryKvBackend; + use crate::kv_backend::KvBackend; + use crate::rpc::store::PutRequest; + + #[tokio::test] + async fn test_runtime_switch_manager_basic() { + let runtime_switch_manager = + Arc::new(RuntimeSwitchManager::new(Arc::new(MemoryKvBackend::new()))); + runtime_switch_manager + .put_key(MAINTENANCE_KEY) + .await + .unwrap(); + let v = runtime_switch_manager + .cache + .get(MAINTENANCE_KEY.as_bytes()) + .await; + assert!(v.is_none()); + runtime_switch_manager + .exists(MAINTENANCE_KEY) + .await + .unwrap(); + let v = runtime_switch_manager + .cache + .get(MAINTENANCE_KEY.as_bytes()) + .await; + assert!(v.is_some()); + runtime_switch_manager + .delete_keys(&[MAINTENANCE_KEY]) + .await + .unwrap(); + let v = runtime_switch_manager + .cache + .get(MAINTENANCE_KEY.as_bytes()) + .await; + assert!(v.is_none()); + } + + #[tokio::test] + async fn test_runtime_switch_manager() { + let runtime_switch_manager = + Arc::new(RuntimeSwitchManager::new(Arc::new(MemoryKvBackend::new()))); + assert!(!runtime_switch_manager.maintenance_mode().await.unwrap()); + runtime_switch_manager.set_maintenance_mode().await.unwrap(); + assert!(runtime_switch_manager.maintenance_mode().await.unwrap()); + runtime_switch_manager + .unset_maintenance_mode() + .await + .unwrap(); + assert!(!runtime_switch_manager.maintenance_mode().await.unwrap()); + } + + #[tokio::test] + async fn test_runtime_switch_manager_with_legacy_key() { + let kv_backend = Arc::new(MemoryKvBackend::new()); + kv_backend + .put(PutRequest { + key: Vec::from(LEGACY_MAINTENANCE_KEY), + value: vec![], + prev_kv: false, + }) + .await + .unwrap(); + let runtime_switch_manager = 
Arc::new(RuntimeSwitchManager::new(kv_backend)); + assert!(runtime_switch_manager.maintenance_mode().await.unwrap()); + runtime_switch_manager + .unset_maintenance_mode() + .await + .unwrap(); + assert!(!runtime_switch_manager.maintenance_mode().await.unwrap()); + runtime_switch_manager.set_maintenance_mode().await.unwrap(); + assert!(runtime_switch_manager.maintenance_mode().await.unwrap()); + } + + #[tokio::test] + async fn test_pasue_procedure() { + let runtime_switch_manager = + Arc::new(RuntimeSwitchManager::new(Arc::new(MemoryKvBackend::new()))); + runtime_switch_manager.pasue_procedure().await.unwrap(); + assert!(runtime_switch_manager.is_procedure_paused().await.unwrap()); + runtime_switch_manager.resume_procedure().await.unwrap(); + assert!(!runtime_switch_manager.is_procedure_paused().await.unwrap()); + } +} diff --git a/src/common/procedure/src/error.rs b/src/common/procedure/src/error.rs index 90bf9dc5f6..128418d670 100644 --- a/src/common/procedure/src/error.rs +++ b/src/common/procedure/src/error.rs @@ -28,6 +28,19 @@ use crate::PoisonKey; #[snafu(visibility(pub))] #[stack_trace_debug] pub enum Error { + #[snafu(display("Failed to check procedure manager status"))] + CheckStatus { + source: BoxedError, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Manager is pasued"))] + ManagerPasued { + #[snafu(implicit)] + location: Location, + }, + #[snafu(display( "Failed to execute procedure due to external error, clean poisons: {}", clean_poisons @@ -246,7 +259,8 @@ impl ErrorExt for Error { | Error::ListState { source, .. } | Error::PutPoison { source, .. } | Error::DeletePoison { source, .. } - | Error::GetPoison { source, .. } => source.status_code(), + | Error::GetPoison { source, .. } + | Error::CheckStatus { source, .. } => source.status_code(), Error::ToJson { .. } | Error::DeleteState { .. } @@ -259,7 +273,8 @@ impl ErrorExt for Error { Error::RetryTimesExceeded { .. } | Error::RollbackTimesExceeded { .. 
} - | Error::ManagerNotStart { .. } => StatusCode::IllegalState, + | Error::ManagerNotStart { .. } + | Error::ManagerPasued { .. } => StatusCode::IllegalState, Error::RollbackNotSupported { .. } => StatusCode::Unsupported, Error::LoaderConflict { .. } | Error::DuplicateProcedure { .. } => { diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs index 811f858d92..d9aaa8ea8a 100644 --- a/src/common/procedure/src/local.rs +++ b/src/common/procedure/src/local.rs @@ -22,6 +22,7 @@ use std::time::{Duration, Instant}; use async_trait::async_trait; use backon::ExponentialBuilder; +use common_error::ext::BoxedError; use common_runtime::{RepeatedTask, TaskFunction}; use common_telemetry::tracing_context::{FutureExt, TracingContext}; use common_telemetry::{error, info, tracing}; @@ -30,9 +31,10 @@ use tokio::sync::watch::{self, Receiver, Sender}; use tokio::sync::{Mutex as TokioMutex, Notify}; use crate::error::{ - self, DuplicateProcedureSnafu, Error, LoaderConflictSnafu, ManagerNotStartSnafu, - PoisonKeyNotDefinedSnafu, ProcedureNotFoundSnafu, Result, StartRemoveOutdatedMetaTaskSnafu, - StopRemoveOutdatedMetaTaskSnafu, TooManyRunningProceduresSnafu, + self, CheckStatusSnafu, DuplicateProcedureSnafu, Error, LoaderConflictSnafu, + ManagerNotStartSnafu, ManagerPasuedSnafu, PoisonKeyNotDefinedSnafu, ProcedureNotFoundSnafu, + Result, StartRemoveOutdatedMetaTaskSnafu, StopRemoveOutdatedMetaTaskSnafu, + TooManyRunningProceduresSnafu, }; use crate::local::runner::Runner; use crate::procedure::{BoxedProcedureLoader, InitProcedureState, PoisonKeys, ProcedureInfo}; @@ -522,6 +524,14 @@ impl Default for ManagerConfig { } } +type PauseAwareRef = Arc; + +#[async_trait] +pub trait PauseAware: Send + Sync { + /// Returns true if the procedure manager is paused. + async fn is_paused(&self) -> std::result::Result; +} + /// A [ProcedureManager] that maintains procedure states locally. 
pub struct LocalManager { manager_ctx: Arc, @@ -531,6 +541,7 @@ pub struct LocalManager { /// GC task. remove_outdated_meta_task: TokioMutex>>, config: ManagerConfig, + pause_aware: Option, } impl LocalManager { @@ -539,6 +550,7 @@ impl LocalManager { config: ManagerConfig, state_store: StateStoreRef, poison_store: PoisonStoreRef, + pause_aware: Option, ) -> LocalManager { let manager_ctx = Arc::new(ManagerContext::new(poison_store)); @@ -549,6 +561,7 @@ impl LocalManager { retry_delay: config.retry_delay, remove_outdated_meta_task: TokioMutex::new(None), config, + pause_aware, } } @@ -719,6 +732,17 @@ impl LocalManager { let loaders = self.manager_ctx.loaders.lock().unwrap(); loaders.contains_key(name) } + + async fn check_status(&self) -> Result<()> { + if let Some(pause_aware) = self.pause_aware.as_ref() { + ensure!( + !pause_aware.is_paused().await.context(CheckStatusSnafu)?, + ManagerPasuedSnafu + ); + } + + Ok(()) + } } #[async_trait] @@ -774,6 +798,7 @@ impl ProcedureManager for LocalManager { !self.manager_ctx.contains_procedure(procedure_id), DuplicateProcedureSnafu { procedure_id } ); + self.check_status().await?; self.submit_root( procedure.id, @@ -979,7 +1004,7 @@ mod tests { }; let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); let poison_manager = Arc::new(InMemoryPoisonStore::new()); - let manager = LocalManager::new(config, state_store, poison_manager); + let manager = LocalManager::new(config, state_store, poison_manager, None); manager.manager_ctx.start(); manager @@ -1004,7 +1029,7 @@ mod tests { }; let state_store = Arc::new(ObjectStateStore::new(object_store.clone())); let poison_manager = Arc::new(InMemoryPoisonStore::new()); - let manager = LocalManager::new(config, state_store, poison_manager); + let manager = LocalManager::new(config, state_store, poison_manager, None); manager.manager_ctx.start(); manager @@ -1058,7 +1083,7 @@ mod tests { }; let state_store = 
Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); let poison_manager = Arc::new(InMemoryPoisonStore::new()); - let manager = LocalManager::new(config, state_store, poison_manager); + let manager = LocalManager::new(config, state_store, poison_manager, None); manager.manager_ctx.start(); let procedure_id = ProcedureId::random(); @@ -1110,7 +1135,7 @@ mod tests { }; let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); let poison_manager = Arc::new(InMemoryPoisonStore::new()); - let manager = LocalManager::new(config, state_store, poison_manager); + let manager = LocalManager::new(config, state_store, poison_manager, None); manager.manager_ctx.start(); #[derive(Debug)] @@ -1191,7 +1216,7 @@ mod tests { }; let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); let poison_manager = Arc::new(InMemoryPoisonStore::new()); - let manager = LocalManager::new(config, state_store, poison_manager); + let manager = LocalManager::new(config, state_store, poison_manager, None); let mut procedure = ProcedureToLoad::new("submit"); procedure.lock_key = LockKey::single_exclusive("test.submit"); @@ -1219,7 +1244,7 @@ mod tests { }; let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); let poison_manager = Arc::new(InMemoryPoisonStore::new()); - let manager = LocalManager::new(config, state_store, poison_manager); + let manager = LocalManager::new(config, state_store, poison_manager, None); manager.start().await.unwrap(); manager.stop().await.unwrap(); @@ -1256,7 +1281,7 @@ mod tests { }; let state_store = Arc::new(ObjectStateStore::new(object_store.clone())); let poison_manager = Arc::new(InMemoryPoisonStore::new()); - let manager = LocalManager::new(config, state_store, poison_manager); + let manager = LocalManager::new(config, state_store, poison_manager, None); manager.manager_ctx.set_running(); let mut procedure = ProcedureToLoad::new("submit"); @@ -1338,7 +1363,7 @@ mod 
tests { }; let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); let poison_manager = Arc::new(InMemoryPoisonStore::new()); - let manager = LocalManager::new(config, state_store, poison_manager); + let manager = LocalManager::new(config, state_store, poison_manager, None); manager.manager_ctx.set_running(); manager @@ -1463,7 +1488,7 @@ mod tests { }; let state_store = Arc::new(ObjectStateStore::new(object_store.clone())); let poison_manager = Arc::new(InMemoryPoisonStore::new()); - let manager = LocalManager::new(config, state_store, poison_manager); + let manager = LocalManager::new(config, state_store, poison_manager, None); manager.manager_ctx.start(); let notify = Arc::new(Notify::new()); diff --git a/src/common/procedure/src/watcher.rs b/src/common/procedure/src/watcher.rs index 99af9a2dc7..48d4c8559d 100644 --- a/src/common/procedure/src/watcher.rs +++ b/src/common/procedure/src/watcher.rs @@ -83,7 +83,7 @@ mod tests { }; let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); let poison_manager = Arc::new(InMemoryPoisonStore::default()); - let manager = LocalManager::new(config, state_store, poison_manager); + let manager = LocalManager::new(config, state_store, poison_manager, None); manager.start().await.unwrap(); #[derive(Debug)] diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index ddf728c1ba..a7dbd88969 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -37,6 +37,7 @@ use common_base::cancellation::CancellableFuture; use common_base::Plugins; use common_config::KvBackendConfig; use common_error::ext::{BoxedError, ErrorExt}; +use common_meta::key::runtime_switch::RuntimeSwitchManager; use common_meta::key::TableMetadataManagerRef; use common_meta::kv_backend::KvBackendRef; use common_meta::state_store::KvStateStore; @@ -125,10 +126,12 @@ impl Instance { max_running_procedures: procedure_config.max_running_procedures, ..Default::default() 
}; + let runtime_switch_manager = Arc::new(RuntimeSwitchManager::new(kv_backend.clone())); let procedure_manager = Arc::new(LocalManager::new( manager_config, kv_state_store.clone(), kv_state_store, + Some(runtime_switch_manager), )); Ok((kv_backend, procedure_manager)) diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 7abf5193f6..d5abdd6d05 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -687,8 +687,8 @@ pub enum Error { location: Location, }, - #[snafu(display("Maintenance mode manager error"))] - MaintenanceModeManager { + #[snafu(display("Runtime switch manager error"))] + RuntimeSwitchManager { source: common_meta::error::Error, #[snafu(implicit)] location: Location, @@ -1015,7 +1015,7 @@ impl ErrorExt for Error { Error::SubmitDdlTask { source, .. } => source.status_code(), Error::ConvertProtoData { source, .. } | Error::TableMetadataManager { source, .. } - | Error::MaintenanceModeManager { source, .. } + | Error::RuntimeSwitchManager { source, .. } | Error::KvBackend { source, .. } | Error::UnexpectedLogicalRouteTable { source, .. } | Error::UpdateTopicNameValue { source, .. 
} => source.status_code(), diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 91d0b22caf..c5cfaaf0e2 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -27,7 +27,7 @@ use common_greptimedb_telemetry::GreptimeDBTelemetryTask; use common_meta::cache_invalidator::CacheInvalidatorRef; use common_meta::ddl::ProcedureExecutorRef; use common_meta::distributed_time_constants; -use common_meta::key::maintenance::MaintenanceModeManagerRef; +use common_meta::key::runtime_switch::RuntimeSwitchManagerRef; use common_meta::key::TableMetadataManagerRef; use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef}; use common_meta::leadership_notifier::{ @@ -437,7 +437,7 @@ pub struct Metasrv { procedure_executor: ProcedureExecutorRef, wal_options_allocator: WalOptionsAllocatorRef, table_metadata_manager: TableMetadataManagerRef, - maintenance_mode_manager: MaintenanceModeManagerRef, + runtime_switch_manager: RuntimeSwitchManagerRef, memory_region_keeper: MemoryRegionKeeperRef, greptimedb_telemetry_task: Arc, region_migration_manager: RegionMigrationManagerRef, @@ -696,8 +696,8 @@ impl Metasrv { &self.table_metadata_manager } - pub fn maintenance_mode_manager(&self) -> &MaintenanceModeManagerRef { - &self.maintenance_mode_manager + pub fn runtime_switch_manager(&self) -> &RuntimeSwitchManagerRef { + &self.runtime_switch_manager } pub fn memory_region_keeper(&self) -> &MemoryRegionKeeperRef { diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 98d4b14709..262dff868f 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -29,7 +29,7 @@ use common_meta::ddl_manager::DdlManager; use common_meta::distributed_time_constants; use common_meta::key::flow::flow_state::FlowStateManager; use common_meta::key::flow::FlowMetadataManager; -use common_meta::key::maintenance::MaintenanceModeManager; +use 
common_meta::key::runtime_switch::{RuntimeSwitchManager, RuntimeSwitchManagerRef}; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef}; @@ -193,7 +193,9 @@ impl MetasrvBuilder { let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector::default())); let pushers = Pushers::default(); let mailbox = build_mailbox(&kv_backend, &pushers); - let procedure_manager = build_procedure_manager(&options, &kv_backend); + let runtime_switch_manager = Arc::new(RuntimeSwitchManager::new(kv_backend.clone())); + let procedure_manager = + build_procedure_manager(&options, &kv_backend, &runtime_switch_manager); let table_metadata_manager = Arc::new(TableMetadataManager::new( leader_cached_kv_backend.clone() as _, @@ -201,7 +203,7 @@ impl MetasrvBuilder { let flow_metadata_manager = Arc::new(FlowMetadataManager::new( leader_cached_kv_backend.clone() as _, )); - let maintenance_mode_manager = Arc::new(MaintenanceModeManager::new(kv_backend.clone())); + let selector_ctx = SelectorContext { server_addr: options.grpc.server_addr.clone(), datanode_lease_secs: distributed_time_constants::DATANODE_LEASE_SECS, @@ -341,7 +343,7 @@ impl MetasrvBuilder { selector_ctx.clone(), supervisor_selector, region_migration_manager.clone(), - maintenance_mode_manager.clone(), + runtime_switch_manager.clone(), peer_lookup_service.clone(), leader_cached_kv_backend.clone(), ); @@ -464,7 +466,7 @@ impl MetasrvBuilder { procedure_executor: ddl_manager, wal_options_allocator, table_metadata_manager, - maintenance_mode_manager, + runtime_switch_manager, greptimedb_telemetry_task: get_greptimedb_telemetry_task( Some(metasrv_home), meta_peer_client, @@ -507,6 +509,7 @@ fn build_mailbox(kv_backend: &KvBackendRef, pushers: &Pushers) -> MailboxRef { fn build_procedure_manager( options: &MetasrvOptions, kv_backend: &KvBackendRef, + runtime_switch_manager: &RuntimeSwitchManagerRef, ) -> 
ProcedureManagerRef { let manager_config = ManagerConfig { max_retry_times: options.procedure.max_retry_times, @@ -527,6 +530,7 @@ fn build_procedure_manager( manager_config, kv_state_store.clone(), kv_state_store, + Some(runtime_switch_manager.clone()), )) } diff --git a/src/meta-srv/src/procedure/region_migration/test_util.rs b/src/meta-srv/src/procedure/region_migration/test_util.rs index ae5b6736e4..a60a60743c 100644 --- a/src/meta-srv/src/procedure/region_migration/test_util.rs +++ b/src/meta-srv/src/procedure/region_migration/test_util.rs @@ -92,6 +92,7 @@ impl TestingEnv { ManagerConfig::default(), state_store, poison_manager, + None, )); Self { diff --git a/src/meta-srv/src/procedure/wal_prune/test_util.rs b/src/meta-srv/src/procedure/wal_prune/test_util.rs index baa9129d3c..53436d7bef 100644 --- a/src/meta-srv/src/procedure/wal_prune/test_util.rs +++ b/src/meta-srv/src/procedure/wal_prune/test_util.rs @@ -52,6 +52,7 @@ impl TestEnv { ManagerConfig::default(), state_store, poison_manager, + None, )); let mailbox_ctx = MailboxContext::new(mailbox_sequence); diff --git a/src/meta-srv/src/region/supervisor.rs b/src/meta-srv/src/region/supervisor.rs index 6d4e67cd28..f6017cc816 100644 --- a/src/meta-srv/src/region/supervisor.rs +++ b/src/meta-srv/src/region/supervisor.rs @@ -20,7 +20,7 @@ use std::time::{Duration, Instant}; use async_trait::async_trait; use common_meta::datanode::Stat; use common_meta::ddl::{DetectingRegion, RegionFailureDetectorController}; -use common_meta::key::maintenance::MaintenanceModeManagerRef; +use common_meta::key::runtime_switch::RuntimeSwitchManagerRef; use common_meta::key::table_route::{TableRouteKey, TableRouteValue}; use common_meta::key::{MetadataKey, MetadataValue}; use common_meta::kv_backend::KvBackendRef; @@ -280,7 +280,7 @@ pub struct RegionSupervisor { /// Region migration manager. region_migration_manager: RegionMigrationManagerRef, /// The maintenance mode manager. 
- maintenance_mode_manager: MaintenanceModeManagerRef, + runtime_switch_manager: RuntimeSwitchManagerRef, /// Peer lookup service peer_lookup: PeerLookupServiceRef, /// The kv backend. @@ -354,7 +354,7 @@ impl RegionSupervisor { selector_context: SelectorContext, selector: RegionSupervisorSelector, region_migration_manager: RegionMigrationManagerRef, - maintenance_mode_manager: MaintenanceModeManagerRef, + runtime_switch_manager: RuntimeSwitchManagerRef, peer_lookup: PeerLookupServiceRef, kv_backend: KvBackendRef, ) -> Self { @@ -365,7 +365,7 @@ impl RegionSupervisor { selector_context, selector, region_migration_manager, - maintenance_mode_manager, + runtime_switch_manager, peer_lookup, kv_backend, } @@ -559,10 +559,10 @@ impl RegionSupervisor { } pub(crate) async fn is_maintenance_mode_enabled(&self) -> Result { - self.maintenance_mode_manager + self.runtime_switch_manager .maintenance_mode() .await - .context(error::MaintenanceModeManagerSnafu) + .context(error::RuntimeSwitchManagerSnafu) } async fn select_peers( @@ -768,7 +768,7 @@ pub(crate) mod tests { use common_meta::key::table_route::{ LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteValue, }; - use common_meta::key::{maintenance, TableMetadataManager}; + use common_meta::key::{runtime_switch, TableMetadataManager}; use common_meta::peer::Peer; use common_meta::rpc::router::{Region, RegionRoute}; use common_meta::test_util::NoopPeerLookupService; @@ -798,8 +798,8 @@ pub(crate) mod tests { env.procedure_manager().clone(), context_factory, )); - let maintenance_mode_manager = - Arc::new(maintenance::MaintenanceModeManager::new(env.kv_backend())); + let runtime_switch_manager = + Arc::new(runtime_switch::RuntimeSwitchManager::new(env.kv_backend())); let peer_lookup = Arc::new(NoopPeerLookupService); let (tx, rx) = RegionSupervisor::channel(); let kv_backend = env.kv_backend(); @@ -811,7 +811,7 @@ pub(crate) mod tests { selector_context, RegionSupervisorSelector::NaiveSelector(selector), 
region_migration_manager, - maintenance_mode_manager, + runtime_switch_manager, peer_lookup, kv_backend, ), @@ -1008,7 +1008,7 @@ pub(crate) mod tests { let (mut supervisor, sender) = new_test_supervisor(); supervisor - .maintenance_mode_manager + .runtime_switch_manager .set_maintenance_mode() .await .unwrap(); diff --git a/src/meta-srv/src/service/admin.rs b/src/meta-srv/src/service/admin.rs index c2b7b59794..23a6034a4d 100644 --- a/src/meta-srv/src/service/admin.rs +++ b/src/meta-srv/src/service/admin.rs @@ -17,6 +17,7 @@ mod heartbeat; mod leader; mod maintenance; mod node_lease; +mod procedure; mod util; use std::collections::HashMap; @@ -56,10 +57,25 @@ pub fn make_admin_service(metasrv: Arc) -> Admin { }, ); - let router = router.route( - "/maintenance", + let router = router.routes( + &[ + "/maintenance", + "/maintenance/status", + "/maintenance/enable", + "/maintenance/disable", + ], maintenance::MaintenanceHandler { - manager: metasrv.maintenance_mode_manager().clone(), + manager: metasrv.runtime_switch_manager().clone(), + }, + ); + let router = router.routes( + &[ + "/procedure-manager/pause", + "/procedure-manager/resume", + "/procedure-manager/status", + ], + procedure::ProcedureManagerHandler { + manager: metasrv.runtime_switch_manager().clone(), }, ); let router = Router::nest("/admin", router); @@ -97,10 +113,7 @@ impl NamedService for Admin { const NAME: &'static str = "admin"; } -impl Service> for Admin -where - T: Send, -{ +impl Service> for Admin { type Response = http::Response; type Error = Infallible; type Future = BoxFuture; @@ -109,7 +122,7 @@ where Poll::Ready(Ok(())) } - fn call(&mut self, req: http::Request) -> Self::Future { + fn call(&mut self, req: http::Request) -> Self::Future { let router = self.router.clone(); let query_params = req .uri() @@ -128,7 +141,7 @@ where #[derive(Default)] pub struct Router { - handlers: HashMap>, + handlers: HashMap>, } impl Router { @@ -153,7 +166,17 @@ impl Router { pub fn route(mut self, path: 
&str, handler: impl HttpHandler + 'static) -> Self { check_path(path); - let _ = self.handlers.insert(path.to_owned(), Box::new(handler)); + let _ = self.handlers.insert(path.to_owned(), Arc::new(handler)); + + self + } + + pub fn routes(mut self, paths: &[&str], handler: impl HttpHandler + 'static) -> Self { + let handler = Arc::new(handler); + for path in paths { + check_path(path); + let _ = self.handlers.insert(path.to_string(), handler.clone()); + } self } @@ -204,7 +227,7 @@ fn boxed(body: String) -> BoxBody { mod tests { use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::kv_backend::KvBackendRef; - use tokio::io::{AsyncReadExt, AsyncWriteExt}; + use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream}; use super::*; use crate::metasrv::builder::MetasrvBuilder; @@ -325,6 +348,13 @@ mod tests { metasrv } + async fn send_request(client: &mut DuplexStream, request: &[u8]) -> String { + client.write_all(request).await.unwrap(); + let mut buf = vec![0; 1024]; + let n = client.read(&mut buf).await.unwrap(); + String::from_utf8_lossy(&buf[..n]).to_string() + } + #[tokio::test(flavor = "multi_thread")] async fn test_metasrv_maintenance_mode() { common_telemetry::init_default_ut_logging(); @@ -343,73 +373,188 @@ mod tests { }); // Get maintenance mode - let http_request = b"GET /admin/maintenance HTTP/1.1\r\nHost: localhost\r\n\r\n"; - client.write_all(http_request).await.unwrap(); - let mut buf = vec![0; 1024]; - let n = client.read(&mut buf).await.unwrap(); - let response = String::from_utf8_lossy(&buf[..n]); + let response = send_request( + &mut client, + b"GET /admin/maintenance HTTP/1.1\r\nHost: localhost\r\n\r\n", + ) + .await; assert!(response.contains(r#"{"enabled":false}"#)); assert!(response.contains("200 OK")); // Set maintenance mode to true - let http_post = b"POST /admin/maintenance?enable=true HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n"; - client.write_all(http_post).await.unwrap(); - let mut buf = vec![0; 1024]; - 
let n = client.read(&mut buf).await.unwrap(); - let response = String::from_utf8_lossy(&buf[..n]); + let response = send_request( + &mut client, + b"POST /admin/maintenance?enable=true HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n", + ) + .await; assert!(response.contains(r#"{"enabled":true}"#)); assert!(response.contains("200 OK")); let enabled = metasrv - .maintenance_mode_manager() + .runtime_switch_manager() .maintenance_mode() .await .unwrap(); assert!(enabled); // Get maintenance mode again - let http_request = b"GET /admin/maintenance HTTP/1.1\r\nHost: localhost\r\n\r\n"; - client.write_all(http_request).await.unwrap(); - let mut buf = vec![0; 1024]; - let n = client.read(&mut buf).await.unwrap(); - let response = String::from_utf8_lossy(&buf[..n]); + let response = send_request( + &mut client, + b"GET /admin/maintenance HTTP/1.1\r\nHost: localhost\r\n\r\n", + ) + .await; assert!(response.contains(r#"{"enabled":true}"#)); assert!(response.contains("200 OK")); // Set maintenance mode to false - let http_post = b"POST /admin/maintenance?enable=false HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n"; - client.write_all(http_post).await.unwrap(); - let mut buf = vec![0; 1024]; - let n = client.read(&mut buf).await.unwrap(); - let response = String::from_utf8_lossy(&buf[..n]); + let response = send_request( + &mut client, + b"POST /admin/maintenance?enable=false HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n", + ) + .await; assert!(response.contains(r#"{"enabled":false}"#)); assert!(response.contains("200 OK")); let enabled = metasrv - .maintenance_mode_manager() + .runtime_switch_manager() .maintenance_mode() .await .unwrap(); assert!(!enabled); // Set maintenance mode to true via GET request - let http_request = - b"GET /admin/maintenance?enable=true HTTP/1.1\r\nHost: localhost\r\n\r\n"; - client.write_all(http_request).await.unwrap(); - let mut buf = vec![0; 1024]; - let n = client.read(&mut buf).await.unwrap(); - let response = 
String::from_utf8_lossy(&buf[..n]); + let response = send_request( + &mut client, + b"GET /admin/maintenance?enable=true HTTP/1.1\r\nHost: localhost\r\n\r\n", + ) + .await; assert!(response.contains(r#"{"enabled":true}"#)); assert!(response.contains("200 OK")); // Set maintenance mode to false via GET request - let http_request = - b"PUT /admin/maintenance?enable=false HTTP/1.1\r\nHost: localhost\r\n\r\n"; - client.write_all(http_request).await.unwrap(); - let mut buf = vec![0; 1024]; - let n = client.read(&mut buf).await.unwrap(); - let response = String::from_utf8_lossy(&buf[..n]); + let response = send_request( + &mut client, + b"PUT /admin/maintenance?enable=false HTTP/1.1\r\nHost: localhost\r\n\r\n", + ) + .await; assert!(response.contains(r#"{"enabled":false}"#)); assert!(response.contains("200 OK")); + + // Get maintenance mode via status path + let response = send_request( + &mut client, + b"GET /admin/maintenance/status HTTP/1.1\r\nHost: localhost\r\n\r\n", + ) + .await; + assert!(response.contains(r#"{"enabled":false}"#)); + + // Set maintenance mode via enable path + let response = send_request( + &mut client, + b"POST /admin/maintenance/enable HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n", + ) + .await; + assert!(response.contains(r#"{"enabled":true}"#)); + + // Unset maintenance mode via disable path + let response = send_request( + &mut client, + b"POST /admin/maintenance/disable HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n", + ) + .await; + assert!(response.contains(r#"{"enabled":false}"#)); + + // send POST request to status path + let response = send_request( + &mut client, + b"POST /admin/maintenance/status HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n", + ) + .await; + assert!(response.contains("404 Not Found")); + + // send GET request to enable path + let response = send_request( + &mut client, + b"GET /admin/maintenance/enable HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n", + ) + .await; + 
assert!(response.contains("404 Not Found")); + + // send GET request to disable path + let response = send_request( + &mut client, + b"GET /admin/maintenance/disable HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n", + ) + .await; + assert!(response.contains("404 Not Found")); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_metasrv_procedure_manager_handler() { + common_telemetry::init_default_ut_logging(); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let metasrv = test_metasrv(kv_backend).await; + metasrv.try_start().await.unwrap(); + + let (mut client, server) = tokio::io::duplex(1024); + let metasrv = Arc::new(metasrv); + let service = metasrv.clone(); + let _handle = tokio::spawn(async move { + let router = bootstrap::router(service); + router + .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)])) + .await + }); + + // send GET request to procedure-manager/status path + let response = send_request( + &mut client, + b"GET /admin/procedure-manager/status HTTP/1.1\r\nHost: localhost\r\n\r\n", + ) + .await; + assert!(response.contains("200 OK")); + assert!( + response.contains(r#"{"status":"running"}"#), + "response: {}", + response + ); + + // send POST request to procedure-manager/pause path + let response = send_request( + &mut client, + b"POST /admin/procedure-manager/pause HTTP/1.1\r\nHost: localhost\r\n\r\n", + ) + .await; + assert!(response.contains("200 OK")); + assert!(response.contains(r#"{"status":"paused"}"#)); + + // send POST request to procedure-manager/resume path + let response = send_request( + &mut client, + b"POST /admin/procedure-manager/resume HTTP/1.1\r\nHost: localhost\r\n\r\n", + ) + .await; + assert!(response.contains("200 OK")); + assert!( + response.contains(r#"{"status":"running"}"#), + "response: {}", + response + ); + + // send GET request to procedure-manager/resume path + let response = send_request( + &mut client, + b"GET /admin/procedure-manager/resume 
HTTP/1.1\r\nHost: localhost\r\n\r\n", + ) + .await; + assert!(response.contains("404 Not Found")); + + // send GET request to procedure-manager/pause path + let response = send_request( + &mut client, + b"GET /admin/procedure-manager/pause HTTP/1.1\r\nHost: localhost\r\n\r\n", + ) + .await; + assert!(response.contains("404 Not Found")); } } diff --git a/src/meta-srv/src/service/admin/maintenance.rs b/src/meta-srv/src/service/admin/maintenance.rs index 0b012187b8..ed753c3bbe 100644 --- a/src/meta-srv/src/service/admin/maintenance.rs +++ b/src/meta-srv/src/service/admin/maintenance.rs @@ -14,7 +14,7 @@ use std::collections::HashMap; -use common_meta::key::maintenance::MaintenanceModeManagerRef; +use common_meta::key::runtime_switch::RuntimeSwitchManagerRef; use common_telemetry::{info, warn}; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; @@ -22,14 +22,15 @@ use tonic::codegen::http; use tonic::codegen::http::Response; use crate::error::{ - self, InvalidHttpBodySnafu, MaintenanceModeManagerSnafu, MissingRequiredParameterSnafu, - ParseBoolSnafu, Result, UnsupportedSnafu, + self, MissingRequiredParameterSnafu, ParseBoolSnafu, Result, RuntimeSwitchManagerSnafu, + UnsupportedSnafu, }; +use crate::service::admin::util::{to_json_response, to_not_found_response}; use crate::service::admin::HttpHandler; #[derive(Clone)] pub struct MaintenanceHandler { - pub manager: MaintenanceModeManagerRef, + pub manager: RuntimeSwitchManagerRef, } #[derive(Debug, Serialize, Deserialize)] @@ -48,79 +49,124 @@ impl TryFrom for String { } impl MaintenanceHandler { - async fn get_maintenance(&self) -> crate::Result> { + async fn get_maintenance(&self) -> crate::Result { let enabled = self .manager .maintenance_mode() .await - .context(MaintenanceModeManagerSnafu)?; - let response = MaintenanceResponse { enabled }.try_into()?; - http::Response::builder() - .status(http::StatusCode::OK) - .body(response) - .context(InvalidHttpBodySnafu) + 
.context(RuntimeSwitchManagerSnafu)?; + Ok(MaintenanceResponse { enabled }) } - async fn set_maintenance( + async fn set_maintenance(&self) -> crate::Result { + self.manager + .set_maintenance_mode() + .await + .context(RuntimeSwitchManagerSnafu)?; + // TODO(weny): Add a record to the system events. + info!("Enable the maintenance mode."); + Ok(MaintenanceResponse { enabled: true }) + } + + async fn unset_maintenance(&self) -> crate::Result { + self.manager + .unset_maintenance_mode() + .await + .context(RuntimeSwitchManagerSnafu)?; + // TODO(weny): Add a record to the system events. + info!("Disable the maintenance mode."); + Ok(MaintenanceResponse { enabled: false }) + } + + async fn handle_legacy_maintenance( &self, params: &HashMap, - ) -> crate::Result> { - let enable = params - .get("enable") - .map(|v| v.parse::()) - .context(MissingRequiredParameterSnafu { param: "enable" })? - .context(ParseBoolSnafu { - err_msg: "'enable' must be 'true' or 'false'", - })?; - + ) -> crate::Result { + let enable = get_enable_from_params(params)?; if enable { - self.manager - .set_maintenance_mode() - .await - .context(MaintenanceModeManagerSnafu)?; - info!("Enable the maintenance mode."); + self.set_maintenance().await } else { - self.manager - .unset_maintenance_mode() - .await - .context(MaintenanceModeManagerSnafu)?; - info!("Disable the maintenance mode."); - }; - - let response = MaintenanceResponse { enabled: enable }.try_into()?; - http::Response::builder() - .status(http::StatusCode::OK) - .body(response) - .context(InvalidHttpBodySnafu) + self.unset_maintenance().await + } } } +fn get_enable_from_params(params: &HashMap) -> crate::Result { + params + .get("enable") + .map(|v| v.parse::()) + .context(MissingRequiredParameterSnafu { param: "enable" })? 
+ .context(ParseBoolSnafu { + err_msg: "'enable' must be 'true' or 'false'", + }) +} + +const MAINTENANCE_PATH: &str = "maintenance"; +const ENABLE_SUFFIX: &str = "enable"; +const DISABLE_SUFFIX: &str = "disable"; +const STATUS_SUFFIX: &str = "status"; + #[async_trait::async_trait] impl HttpHandler for MaintenanceHandler { + // TODO(weny): Remove the legacy version of the maintenance API. + // However, we need to keep the legacy version for a while to avoid breaking the existing operators. async fn handle( &self, - _: &str, + path: &str, method: http::Method, params: &HashMap, ) -> crate::Result> { match method { http::Method::GET => { - if params.is_empty() { - self.get_maintenance().await - } else { + if path.ends_with(STATUS_SUFFIX) { + // Handle GET request to '/admin/maintenance/status' + let response = self.get_maintenance().await?; + to_json_response(response) + } else if path.ends_with(MAINTENANCE_PATH) && params.is_empty() { + // Handle GET request to '/admin/maintenance'. (The legacy version) + let response = self.get_maintenance().await?; + to_json_response(response) + } else if path.ends_with(MAINTENANCE_PATH) { + // Handle GET request to '/admin/maintenance' with URL parameters. (The legacy version) warn!( "Found URL parameters in '/admin/maintenance' request, it's deprecated, will be removed in the future" ); // The old version operator will send GET request with URL parameters, // so we need to support it. - self.set_maintenance(params).await + let response = self.handle_legacy_maintenance(params).await?; + to_json_response(response) + } else { + to_not_found_response() } } http::Method::PUT => { - warn!("Found PUT request to '/admin/maintenance', it's deprecated, will be removed in the future"); - self.set_maintenance(params).await + // Handle PUT request to '/admin/maintenance' with URL parameters. 
(The legacy version) + if path.ends_with(MAINTENANCE_PATH) { + warn!("Found PUT request to '/admin/maintenance', it's deprecated, will be removed in the future"); + let response = self.handle_legacy_maintenance(params).await?; + to_json_response(response) + } else { + to_not_found_response() + } + } + http::Method::POST => { + // Handle POST request to '/admin/maintenance/enable' + if path.ends_with(ENABLE_SUFFIX) { + let response = self.set_maintenance().await?; + to_json_response(response) + } else if path.ends_with(DISABLE_SUFFIX) { + // Handle POST request to '/admin/maintenance/disable' + let response = self.unset_maintenance().await?; + to_json_response(response) + } else if path.ends_with(MAINTENANCE_PATH) { + // Handle POST request to '/admin/maintenance' with URL parameters. (The legacy version) + warn!("Found PUT request to '/admin/maintenance', it's deprecated, will be removed in the future"); + let response = self.handle_legacy_maintenance(params).await?; + to_json_response(response) + } else { + to_not_found_response() + } } - http::Method::POST => self.set_maintenance(params).await, _ => UnsupportedSnafu { operation: format!("http method {method}"), } diff --git a/src/meta-srv/src/service/admin/procedure.rs b/src/meta-srv/src/service/admin/procedure.rs new file mode 100644 index 0000000000..232a93acbb --- /dev/null +++ b/src/meta-srv/src/service/admin/procedure.rs @@ -0,0 +1,119 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use common_meta::key::runtime_switch::RuntimeSwitchManagerRef; +use common_telemetry::info; +use serde::{Deserialize, Serialize}; +use snafu::ResultExt; +use tonic::codegen::http; +use tonic::codegen::http::Response; + +use crate::error::RuntimeSwitchManagerSnafu; +use crate::service::admin::util::{to_json_response, to_not_found_response}; +use crate::service::admin::HttpHandler; + +#[derive(Clone)] +pub struct ProcedureManagerHandler { + pub manager: RuntimeSwitchManagerRef, +} + +#[derive(Debug, Serialize, Deserialize)] +struct ProcedureManagerStatusResponse { + status: ProcedureManagerStatus, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +enum ProcedureManagerStatus { + Paused, + Running, +} + +impl ProcedureManagerHandler { + async fn pause_procedure_manager(&self) -> crate::Result { + self.manager + .pasue_procedure() + .await + .context(RuntimeSwitchManagerSnafu)?; + // TODO(weny): Add a record to the system events. + info!("Pause the procedure manager."); + Ok(ProcedureManagerStatusResponse { + status: ProcedureManagerStatus::Paused, + }) + } + + async fn resume_procedure_manager(&self) -> crate::Result { + self.manager + .resume_procedure() + .await + .context(RuntimeSwitchManagerSnafu)?; + // TODO(weny): Add a record to the system events. 
+ info!("Resume the procedure manager."); + Ok(ProcedureManagerStatusResponse { + status: ProcedureManagerStatus::Running, + }) + } + + async fn get_procedure_manager_status(&self) -> crate::Result { + let is_paused = self + .manager + .is_procedure_paused() + .await + .context(RuntimeSwitchManagerSnafu)?; + let response = ProcedureManagerStatusResponse { + status: if is_paused { + ProcedureManagerStatus::Paused + } else { + ProcedureManagerStatus::Running + }, + }; + + Ok(response) + } +} + +#[async_trait::async_trait] +impl HttpHandler for ProcedureManagerHandler { + async fn handle( + &self, + path: &str, + method: http::Method, + _: &HashMap, + ) -> crate::Result> { + match method { + http::Method::GET => { + if path.ends_with("status") { + let response = self.get_procedure_manager_status().await?; + to_json_response(response) + } else { + to_not_found_response() + } + } + http::Method::POST => { + if path.ends_with("pause") { + let response = self.pause_procedure_manager().await?; + to_json_response(response) + } else if path.ends_with("resume") { + let response = self.resume_procedure_manager().await?; + to_json_response(response) + } else { + to_not_found_response() + } + } + _ => to_not_found_response(), + } + } +} diff --git a/src/meta-srv/src/service/admin/util.rs b/src/meta-srv/src/service/admin/util.rs index cdabf38a63..13a3b5f89f 100644 --- a/src/meta-srv/src/service/admin/util.rs +++ b/src/meta-srv/src/service/admin/util.rs @@ -12,11 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::fmt::Debug; + +use serde::Serialize; use snafu::ResultExt; use tonic::codegen::http; use crate::error::{self, Result}; +/// Returns a 200 response with a text body. 
pub fn to_text_response(text: &str) -> Result> { http::Response::builder() .header("Content-Type", "text/plain") @@ -24,3 +28,26 @@ pub fn to_text_response(text: &str) -> Result> { .body(text.to_string()) .context(error::InvalidHttpBodySnafu) } + +/// Returns a 200 response with a JSON body. +pub fn to_json_response(response: T) -> Result> +where + T: Serialize + Debug, +{ + let response = serde_json::to_string(&response).context(error::SerializeToJsonSnafu { + input: format!("{response:?}"), + })?; + http::Response::builder() + .header("Content-Type", "application/json") + .status(http::StatusCode::OK) + .body(response) + .context(error::InvalidHttpBodySnafu) +} + +/// Returns a 404 response with an empty body. +pub fn to_not_found_response() -> Result> { + http::Response::builder() + .status(http::StatusCode::NOT_FOUND) + .body("".to_string()) + .context(error::InvalidHttpBodySnafu) +}