feat: implement pause/resume functionality for procedure manager (#6393)

* feat: implement pause/resume functionality for procedure manager

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2025-06-26 09:57:12 +08:00
committed by GitHub
parent 55f5e09885
commit 4737285275
18 changed files with 744 additions and 217 deletions

View File

@@ -995,6 +995,7 @@ mod tests {
Default::default(),
state_store,
poison_manager,
None,
));
let _ = DdlManager::try_new(

View File

@@ -100,8 +100,8 @@
pub mod catalog_name;
pub mod datanode_table;
pub mod flow;
pub mod maintenance;
pub mod node_address;
pub mod runtime_switch;
mod schema_metadata_manager;
pub mod schema_name;
pub mod table_info;
@@ -164,7 +164,9 @@ use crate::state_store::PoisonValue;
use crate::DatanodeId;
pub const NAME_PATTERN: &str = r"[a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*";
pub const MAINTENANCE_KEY: &str = "__maintenance";
pub const LEGACY_MAINTENANCE_KEY: &str = "__maintenance";
pub const MAINTENANCE_KEY: &str = "__switches/maintenance";
pub const PAUSE_PROCEDURE_KEY: &str = "__switches/pause_procedure";
pub const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table";
pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info";

View File

@@ -1,86 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use crate::error::Result;
use crate::key::MAINTENANCE_KEY;
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::PutRequest;
pub type MaintenanceModeManagerRef = Arc<MaintenanceModeManager>;
/// The maintenance mode manager.
///
/// Used to enable or disable maintenance mode.
#[derive(Clone)]
pub struct MaintenanceModeManager {
kv_backend: KvBackendRef,
}
impl MaintenanceModeManager {
pub fn new(kv_backend: KvBackendRef) -> Self {
Self { kv_backend }
}
/// Enables maintenance mode.
pub async fn set_maintenance_mode(&self) -> Result<()> {
let req = PutRequest {
key: Vec::from(MAINTENANCE_KEY),
value: vec![],
prev_kv: false,
};
self.kv_backend.put(req).await?;
Ok(())
}
/// Unsets maintenance mode.
pub async fn unset_maintenance_mode(&self) -> Result<()> {
self.kv_backend
.delete(MAINTENANCE_KEY.as_bytes(), false)
.await?;
Ok(())
}
/// Returns true if maintenance mode is enabled.
pub async fn maintenance_mode(&self) -> Result<bool> {
self.kv_backend.exists(MAINTENANCE_KEY.as_bytes()).await
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use crate::key::maintenance::MaintenanceModeManager;
use crate::kv_backend::memory::MemoryKvBackend;
#[tokio::test]
async fn test_maintenance_mode_manager() {
let maintenance_mode_manager = Arc::new(MaintenanceModeManager::new(Arc::new(
MemoryKvBackend::new(),
)));
assert!(!maintenance_mode_manager.maintenance_mode().await.unwrap());
maintenance_mode_manager
.set_maintenance_mode()
.await
.unwrap();
assert!(maintenance_mode_manager.maintenance_mode().await.unwrap());
maintenance_mode_manager
.unset_maintenance_mode()
.await
.unwrap();
assert!(!maintenance_mode_manager.maintenance_mode().await.unwrap());
}
}

View File

@@ -0,0 +1,224 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use std::time::Duration;
use common_error::ext::BoxedError;
use common_procedure::local::PauseAware;
use moka::future::Cache;
use snafu::ResultExt;
use crate::error::{GetCacheSnafu, Result};
use crate::key::{LEGACY_MAINTENANCE_KEY, MAINTENANCE_KEY, PAUSE_PROCEDURE_KEY};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::{BatchDeleteRequest, PutRequest};
pub type RuntimeSwitchManagerRef = Arc<RuntimeSwitchManager>;
/// The runtime switch manager.
///
/// Used to enable or disable runtime switches.
#[derive(Clone)]
pub struct RuntimeSwitchManager {
kv_backend: KvBackendRef,
cache: Cache<Vec<u8>, Option<Vec<u8>>>,
}
#[async_trait::async_trait]
impl PauseAware for RuntimeSwitchManager {
async fn is_paused(&self) -> std::result::Result<bool, BoxedError> {
self.is_procedure_paused().await.map_err(BoxedError::new)
}
}
const CACHE_TTL: Duration = Duration::from_secs(10);
const MAX_CAPACITY: u64 = 32;
impl RuntimeSwitchManager {
pub fn new(kv_backend: KvBackendRef) -> Self {
let cache = Cache::builder()
.time_to_live(CACHE_TTL)
.max_capacity(MAX_CAPACITY)
.build();
Self { kv_backend, cache }
}
async fn put_key(&self, key: &str) -> Result<()> {
let req = PutRequest {
key: Vec::from(key),
value: vec![],
prev_kv: false,
};
self.kv_backend.put(req).await?;
self.cache.invalidate(key.as_bytes()).await;
Ok(())
}
async fn delete_keys(&self, keys: &[&str]) -> Result<()> {
let req = BatchDeleteRequest::new()
.with_keys(keys.iter().map(|x| x.as_bytes().to_vec()).collect());
self.kv_backend.batch_delete(req).await?;
for key in keys {
self.cache.invalidate(key.as_bytes()).await;
}
Ok(())
}
/// Returns true if the key exists.
async fn exists(&self, key: &str) -> Result<bool> {
let key = key.as_bytes().to_vec();
let kv_backend = self.kv_backend.clone();
let value = self
.cache
.try_get_with(key.clone(), async move {
kv_backend.get(&key).await.map(|v| v.map(|v| v.value))
})
.await
.context(GetCacheSnafu)?;
Ok(value.is_some())
}
/// Enables maintenance mode.
pub async fn set_maintenance_mode(&self) -> Result<()> {
self.put_key(MAINTENANCE_KEY).await
}
/// Unsets maintenance mode.
pub async fn unset_maintenance_mode(&self) -> Result<()> {
self.delete_keys(&[MAINTENANCE_KEY, LEGACY_MAINTENANCE_KEY])
.await
}
/// Returns true if maintenance mode is enabled.
pub async fn maintenance_mode(&self) -> Result<bool> {
let exists = self.exists(MAINTENANCE_KEY).await?;
if exists {
return Ok(true);
}
let exists = self.exists(LEGACY_MAINTENANCE_KEY).await?;
if exists {
return Ok(true);
}
Ok(false)
}
// Pauses handling of incoming procedure requests.
pub async fn pasue_procedure(&self) -> Result<()> {
self.put_key(PAUSE_PROCEDURE_KEY).await
}
/// Resumes processing of incoming procedure requests.
pub async fn resume_procedure(&self) -> Result<()> {
self.delete_keys(&[PAUSE_PROCEDURE_KEY]).await
}
/// Returns true if the system is currently pausing incoming procedure requests.
pub async fn is_procedure_paused(&self) -> Result<bool> {
self.exists(PAUSE_PROCEDURE_KEY).await
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use crate::key::runtime_switch::RuntimeSwitchManager;
use crate::key::{LEGACY_MAINTENANCE_KEY, MAINTENANCE_KEY};
use crate::kv_backend::memory::MemoryKvBackend;
use crate::kv_backend::KvBackend;
use crate::rpc::store::PutRequest;
#[tokio::test]
async fn test_runtime_switch_manager_basic() {
let runtime_switch_manager =
Arc::new(RuntimeSwitchManager::new(Arc::new(MemoryKvBackend::new())));
runtime_switch_manager
.put_key(MAINTENANCE_KEY)
.await
.unwrap();
let v = runtime_switch_manager
.cache
.get(MAINTENANCE_KEY.as_bytes())
.await;
assert!(v.is_none());
runtime_switch_manager
.exists(MAINTENANCE_KEY)
.await
.unwrap();
let v = runtime_switch_manager
.cache
.get(MAINTENANCE_KEY.as_bytes())
.await;
assert!(v.is_some());
runtime_switch_manager
.delete_keys(&[MAINTENANCE_KEY])
.await
.unwrap();
let v = runtime_switch_manager
.cache
.get(MAINTENANCE_KEY.as_bytes())
.await;
assert!(v.is_none());
}
#[tokio::test]
async fn test_runtime_switch_manager() {
let runtime_switch_manager =
Arc::new(RuntimeSwitchManager::new(Arc::new(MemoryKvBackend::new())));
assert!(!runtime_switch_manager.maintenance_mode().await.unwrap());
runtime_switch_manager.set_maintenance_mode().await.unwrap();
assert!(runtime_switch_manager.maintenance_mode().await.unwrap());
runtime_switch_manager
.unset_maintenance_mode()
.await
.unwrap();
assert!(!runtime_switch_manager.maintenance_mode().await.unwrap());
}
#[tokio::test]
async fn test_runtime_switch_manager_with_legacy_key() {
let kv_backend = Arc::new(MemoryKvBackend::new());
kv_backend
.put(PutRequest {
key: Vec::from(LEGACY_MAINTENANCE_KEY),
value: vec![],
prev_kv: false,
})
.await
.unwrap();
let runtime_switch_manager = Arc::new(RuntimeSwitchManager::new(kv_backend));
assert!(runtime_switch_manager.maintenance_mode().await.unwrap());
runtime_switch_manager
.unset_maintenance_mode()
.await
.unwrap();
assert!(!runtime_switch_manager.maintenance_mode().await.unwrap());
runtime_switch_manager.set_maintenance_mode().await.unwrap();
assert!(runtime_switch_manager.maintenance_mode().await.unwrap());
}
#[tokio::test]
async fn test_pasue_procedure() {
let runtime_switch_manager =
Arc::new(RuntimeSwitchManager::new(Arc::new(MemoryKvBackend::new())));
runtime_switch_manager.pasue_procedure().await.unwrap();
assert!(runtime_switch_manager.is_procedure_paused().await.unwrap());
runtime_switch_manager.resume_procedure().await.unwrap();
assert!(!runtime_switch_manager.is_procedure_paused().await.unwrap());
}
}

View File

@@ -28,6 +28,19 @@ use crate::PoisonKey;
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("Failed to check procedure manager status"))]
CheckStatus {
source: BoxedError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Manager is pasued"))]
ManagerPasued {
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Failed to execute procedure due to external error, clean poisons: {}",
clean_poisons
@@ -246,7 +259,8 @@ impl ErrorExt for Error {
| Error::ListState { source, .. }
| Error::PutPoison { source, .. }
| Error::DeletePoison { source, .. }
| Error::GetPoison { source, .. } => source.status_code(),
| Error::GetPoison { source, .. }
| Error::CheckStatus { source, .. } => source.status_code(),
Error::ToJson { .. }
| Error::DeleteState { .. }
@@ -259,7 +273,8 @@ impl ErrorExt for Error {
Error::RetryTimesExceeded { .. }
| Error::RollbackTimesExceeded { .. }
| Error::ManagerNotStart { .. } => StatusCode::IllegalState,
| Error::ManagerNotStart { .. }
| Error::ManagerPasued { .. } => StatusCode::IllegalState,
Error::RollbackNotSupported { .. } => StatusCode::Unsupported,
Error::LoaderConflict { .. } | Error::DuplicateProcedure { .. } => {

View File

@@ -22,6 +22,7 @@ use std::time::{Duration, Instant};
use async_trait::async_trait;
use backon::ExponentialBuilder;
use common_error::ext::BoxedError;
use common_runtime::{RepeatedTask, TaskFunction};
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::{error, info, tracing};
@@ -30,9 +31,10 @@ use tokio::sync::watch::{self, Receiver, Sender};
use tokio::sync::{Mutex as TokioMutex, Notify};
use crate::error::{
self, DuplicateProcedureSnafu, Error, LoaderConflictSnafu, ManagerNotStartSnafu,
PoisonKeyNotDefinedSnafu, ProcedureNotFoundSnafu, Result, StartRemoveOutdatedMetaTaskSnafu,
StopRemoveOutdatedMetaTaskSnafu, TooManyRunningProceduresSnafu,
self, CheckStatusSnafu, DuplicateProcedureSnafu, Error, LoaderConflictSnafu,
ManagerNotStartSnafu, ManagerPasuedSnafu, PoisonKeyNotDefinedSnafu, ProcedureNotFoundSnafu,
Result, StartRemoveOutdatedMetaTaskSnafu, StopRemoveOutdatedMetaTaskSnafu,
TooManyRunningProceduresSnafu,
};
use crate::local::runner::Runner;
use crate::procedure::{BoxedProcedureLoader, InitProcedureState, PoisonKeys, ProcedureInfo};
@@ -522,6 +524,14 @@ impl Default for ManagerConfig {
}
}
type PauseAwareRef = Arc<dyn PauseAware>;
#[async_trait]
pub trait PauseAware: Send + Sync {
/// Returns true if the procedure manager is paused.
async fn is_paused(&self) -> std::result::Result<bool, BoxedError>;
}
/// A [ProcedureManager] that maintains procedure states locally.
pub struct LocalManager {
manager_ctx: Arc<ManagerContext>,
@@ -531,6 +541,7 @@ pub struct LocalManager {
/// GC task.
remove_outdated_meta_task: TokioMutex<Option<RepeatedTask<Error>>>,
config: ManagerConfig,
pause_aware: Option<PauseAwareRef>,
}
impl LocalManager {
@@ -539,6 +550,7 @@ impl LocalManager {
config: ManagerConfig,
state_store: StateStoreRef,
poison_store: PoisonStoreRef,
pause_aware: Option<PauseAwareRef>,
) -> LocalManager {
let manager_ctx = Arc::new(ManagerContext::new(poison_store));
@@ -549,6 +561,7 @@ impl LocalManager {
retry_delay: config.retry_delay,
remove_outdated_meta_task: TokioMutex::new(None),
config,
pause_aware,
}
}
@@ -719,6 +732,17 @@ impl LocalManager {
let loaders = self.manager_ctx.loaders.lock().unwrap();
loaders.contains_key(name)
}
async fn check_status(&self) -> Result<()> {
if let Some(pause_aware) = self.pause_aware.as_ref() {
ensure!(
!pause_aware.is_paused().await.context(CheckStatusSnafu)?,
ManagerPasuedSnafu
);
}
Ok(())
}
}
#[async_trait]
@@ -774,6 +798,7 @@ impl ProcedureManager for LocalManager {
!self.manager_ctx.contains_procedure(procedure_id),
DuplicateProcedureSnafu { procedure_id }
);
self.check_status().await?;
self.submit_root(
procedure.id,
@@ -979,7 +1004,7 @@ mod tests {
};
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let poison_manager = Arc::new(InMemoryPoisonStore::new());
let manager = LocalManager::new(config, state_store, poison_manager);
let manager = LocalManager::new(config, state_store, poison_manager, None);
manager.manager_ctx.start();
manager
@@ -1004,7 +1029,7 @@ mod tests {
};
let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
let poison_manager = Arc::new(InMemoryPoisonStore::new());
let manager = LocalManager::new(config, state_store, poison_manager);
let manager = LocalManager::new(config, state_store, poison_manager, None);
manager.manager_ctx.start();
manager
@@ -1058,7 +1083,7 @@ mod tests {
};
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let poison_manager = Arc::new(InMemoryPoisonStore::new());
let manager = LocalManager::new(config, state_store, poison_manager);
let manager = LocalManager::new(config, state_store, poison_manager, None);
manager.manager_ctx.start();
let procedure_id = ProcedureId::random();
@@ -1110,7 +1135,7 @@ mod tests {
};
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let poison_manager = Arc::new(InMemoryPoisonStore::new());
let manager = LocalManager::new(config, state_store, poison_manager);
let manager = LocalManager::new(config, state_store, poison_manager, None);
manager.manager_ctx.start();
#[derive(Debug)]
@@ -1191,7 +1216,7 @@ mod tests {
};
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let poison_manager = Arc::new(InMemoryPoisonStore::new());
let manager = LocalManager::new(config, state_store, poison_manager);
let manager = LocalManager::new(config, state_store, poison_manager, None);
let mut procedure = ProcedureToLoad::new("submit");
procedure.lock_key = LockKey::single_exclusive("test.submit");
@@ -1219,7 +1244,7 @@ mod tests {
};
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let poison_manager = Arc::new(InMemoryPoisonStore::new());
let manager = LocalManager::new(config, state_store, poison_manager);
let manager = LocalManager::new(config, state_store, poison_manager, None);
manager.start().await.unwrap();
manager.stop().await.unwrap();
@@ -1256,7 +1281,7 @@ mod tests {
};
let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
let poison_manager = Arc::new(InMemoryPoisonStore::new());
let manager = LocalManager::new(config, state_store, poison_manager);
let manager = LocalManager::new(config, state_store, poison_manager, None);
manager.manager_ctx.set_running();
let mut procedure = ProcedureToLoad::new("submit");
@@ -1338,7 +1363,7 @@ mod tests {
};
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let poison_manager = Arc::new(InMemoryPoisonStore::new());
let manager = LocalManager::new(config, state_store, poison_manager);
let manager = LocalManager::new(config, state_store, poison_manager, None);
manager.manager_ctx.set_running();
manager
@@ -1463,7 +1488,7 @@ mod tests {
};
let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
let poison_manager = Arc::new(InMemoryPoisonStore::new());
let manager = LocalManager::new(config, state_store, poison_manager);
let manager = LocalManager::new(config, state_store, poison_manager, None);
manager.manager_ctx.start();
let notify = Arc::new(Notify::new());

View File

@@ -83,7 +83,7 @@ mod tests {
};
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let poison_manager = Arc::new(InMemoryPoisonStore::default());
let manager = LocalManager::new(config, state_store, poison_manager);
let manager = LocalManager::new(config, state_store, poison_manager, None);
manager.start().await.unwrap();
#[derive(Debug)]

View File

@@ -37,6 +37,7 @@ use common_base::cancellation::CancellableFuture;
use common_base::Plugins;
use common_config::KvBackendConfig;
use common_error::ext::{BoxedError, ErrorExt};
use common_meta::key::runtime_switch::RuntimeSwitchManager;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::KvBackendRef;
use common_meta::state_store::KvStateStore;
@@ -125,10 +126,12 @@ impl Instance {
max_running_procedures: procedure_config.max_running_procedures,
..Default::default()
};
let runtime_switch_manager = Arc::new(RuntimeSwitchManager::new(kv_backend.clone()));
let procedure_manager = Arc::new(LocalManager::new(
manager_config,
kv_state_store.clone(),
kv_state_store,
Some(runtime_switch_manager),
));
Ok((kv_backend, procedure_manager))

View File

@@ -687,8 +687,8 @@ pub enum Error {
location: Location,
},
#[snafu(display("Maintenance mode manager error"))]
MaintenanceModeManager {
#[snafu(display("Runtime switch manager error"))]
RuntimeSwitchManager {
source: common_meta::error::Error,
#[snafu(implicit)]
location: Location,
@@ -1015,7 +1015,7 @@ impl ErrorExt for Error {
Error::SubmitDdlTask { source, .. } => source.status_code(),
Error::ConvertProtoData { source, .. }
| Error::TableMetadataManager { source, .. }
| Error::MaintenanceModeManager { source, .. }
| Error::RuntimeSwitchManager { source, .. }
| Error::KvBackend { source, .. }
| Error::UnexpectedLogicalRouteTable { source, .. }
| Error::UpdateTopicNameValue { source, .. } => source.status_code(),

View File

@@ -27,7 +27,7 @@ use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl::ProcedureExecutorRef;
use common_meta::distributed_time_constants;
use common_meta::key::maintenance::MaintenanceModeManagerRef;
use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};
use common_meta::leadership_notifier::{
@@ -437,7 +437,7 @@ pub struct Metasrv {
procedure_executor: ProcedureExecutorRef,
wal_options_allocator: WalOptionsAllocatorRef,
table_metadata_manager: TableMetadataManagerRef,
maintenance_mode_manager: MaintenanceModeManagerRef,
runtime_switch_manager: RuntimeSwitchManagerRef,
memory_region_keeper: MemoryRegionKeeperRef,
greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
region_migration_manager: RegionMigrationManagerRef,
@@ -696,8 +696,8 @@ impl Metasrv {
&self.table_metadata_manager
}
pub fn maintenance_mode_manager(&self) -> &MaintenanceModeManagerRef {
&self.maintenance_mode_manager
pub fn runtime_switch_manager(&self) -> &RuntimeSwitchManagerRef {
&self.runtime_switch_manager
}
pub fn memory_region_keeper(&self) -> &MemoryRegionKeeperRef {

View File

@@ -29,7 +29,7 @@ use common_meta::ddl_manager::DdlManager;
use common_meta::distributed_time_constants;
use common_meta::key::flow::flow_state::FlowStateManager;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::maintenance::MaintenanceModeManager;
use common_meta::key::runtime_switch::{RuntimeSwitchManager, RuntimeSwitchManagerRef};
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
@@ -193,7 +193,9 @@ impl MetasrvBuilder {
let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector::default()));
let pushers = Pushers::default();
let mailbox = build_mailbox(&kv_backend, &pushers);
let procedure_manager = build_procedure_manager(&options, &kv_backend);
let runtime_switch_manager = Arc::new(RuntimeSwitchManager::new(kv_backend.clone()));
let procedure_manager =
build_procedure_manager(&options, &kv_backend, &runtime_switch_manager);
let table_metadata_manager = Arc::new(TableMetadataManager::new(
leader_cached_kv_backend.clone() as _,
@@ -201,7 +203,7 @@ impl MetasrvBuilder {
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(
leader_cached_kv_backend.clone() as _,
));
let maintenance_mode_manager = Arc::new(MaintenanceModeManager::new(kv_backend.clone()));
let selector_ctx = SelectorContext {
server_addr: options.grpc.server_addr.clone(),
datanode_lease_secs: distributed_time_constants::DATANODE_LEASE_SECS,
@@ -341,7 +343,7 @@ impl MetasrvBuilder {
selector_ctx.clone(),
supervisor_selector,
region_migration_manager.clone(),
maintenance_mode_manager.clone(),
runtime_switch_manager.clone(),
peer_lookup_service.clone(),
leader_cached_kv_backend.clone(),
);
@@ -464,7 +466,7 @@ impl MetasrvBuilder {
procedure_executor: ddl_manager,
wal_options_allocator,
table_metadata_manager,
maintenance_mode_manager,
runtime_switch_manager,
greptimedb_telemetry_task: get_greptimedb_telemetry_task(
Some(metasrv_home),
meta_peer_client,
@@ -507,6 +509,7 @@ fn build_mailbox(kv_backend: &KvBackendRef, pushers: &Pushers) -> MailboxRef {
fn build_procedure_manager(
options: &MetasrvOptions,
kv_backend: &KvBackendRef,
runtime_switch_manager: &RuntimeSwitchManagerRef,
) -> ProcedureManagerRef {
let manager_config = ManagerConfig {
max_retry_times: options.procedure.max_retry_times,
@@ -527,6 +530,7 @@ fn build_procedure_manager(
manager_config,
kv_state_store.clone(),
kv_state_store,
Some(runtime_switch_manager.clone()),
))
}

View File

@@ -92,6 +92,7 @@ impl TestingEnv {
ManagerConfig::default(),
state_store,
poison_manager,
None,
));
Self {

View File

@@ -52,6 +52,7 @@ impl TestEnv {
ManagerConfig::default(),
state_store,
poison_manager,
None,
));
let mailbox_ctx = MailboxContext::new(mailbox_sequence);

View File

@@ -20,7 +20,7 @@ use std::time::{Duration, Instant};
use async_trait::async_trait;
use common_meta::datanode::Stat;
use common_meta::ddl::{DetectingRegion, RegionFailureDetectorController};
use common_meta::key::maintenance::MaintenanceModeManagerRef;
use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
use common_meta::key::table_route::{TableRouteKey, TableRouteValue};
use common_meta::key::{MetadataKey, MetadataValue};
use common_meta::kv_backend::KvBackendRef;
@@ -280,7 +280,7 @@ pub struct RegionSupervisor {
/// Region migration manager.
region_migration_manager: RegionMigrationManagerRef,
/// The maintenance mode manager.
maintenance_mode_manager: MaintenanceModeManagerRef,
runtime_switch_manager: RuntimeSwitchManagerRef,
/// Peer lookup service
peer_lookup: PeerLookupServiceRef,
/// The kv backend.
@@ -354,7 +354,7 @@ impl RegionSupervisor {
selector_context: SelectorContext,
selector: RegionSupervisorSelector,
region_migration_manager: RegionMigrationManagerRef,
maintenance_mode_manager: MaintenanceModeManagerRef,
runtime_switch_manager: RuntimeSwitchManagerRef,
peer_lookup: PeerLookupServiceRef,
kv_backend: KvBackendRef,
) -> Self {
@@ -365,7 +365,7 @@ impl RegionSupervisor {
selector_context,
selector,
region_migration_manager,
maintenance_mode_manager,
runtime_switch_manager,
peer_lookup,
kv_backend,
}
@@ -559,10 +559,10 @@ impl RegionSupervisor {
}
pub(crate) async fn is_maintenance_mode_enabled(&self) -> Result<bool> {
self.maintenance_mode_manager
self.runtime_switch_manager
.maintenance_mode()
.await
.context(error::MaintenanceModeManagerSnafu)
.context(error::RuntimeSwitchManagerSnafu)
}
async fn select_peers(
@@ -768,7 +768,7 @@ pub(crate) mod tests {
use common_meta::key::table_route::{
LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteValue,
};
use common_meta::key::{maintenance, TableMetadataManager};
use common_meta::key::{runtime_switch, TableMetadataManager};
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use common_meta::test_util::NoopPeerLookupService;
@@ -798,8 +798,8 @@ pub(crate) mod tests {
env.procedure_manager().clone(),
context_factory,
));
let maintenance_mode_manager =
Arc::new(maintenance::MaintenanceModeManager::new(env.kv_backend()));
let runtime_switch_manager =
Arc::new(runtime_switch::RuntimeSwitchManager::new(env.kv_backend()));
let peer_lookup = Arc::new(NoopPeerLookupService);
let (tx, rx) = RegionSupervisor::channel();
let kv_backend = env.kv_backend();
@@ -811,7 +811,7 @@ pub(crate) mod tests {
selector_context,
RegionSupervisorSelector::NaiveSelector(selector),
region_migration_manager,
maintenance_mode_manager,
runtime_switch_manager,
peer_lookup,
kv_backend,
),
@@ -1008,7 +1008,7 @@ pub(crate) mod tests {
let (mut supervisor, sender) = new_test_supervisor();
supervisor
.maintenance_mode_manager
.runtime_switch_manager
.set_maintenance_mode()
.await
.unwrap();

View File

@@ -17,6 +17,7 @@ mod heartbeat;
mod leader;
mod maintenance;
mod node_lease;
mod procedure;
mod util;
use std::collections::HashMap;
@@ -56,10 +57,25 @@ pub fn make_admin_service(metasrv: Arc<Metasrv>) -> Admin {
},
);
let router = router.route(
"/maintenance",
let router = router.routes(
&[
"/maintenance",
"/maintenance/status",
"/maintenance/enable",
"/maintenance/disable",
],
maintenance::MaintenanceHandler {
manager: metasrv.maintenance_mode_manager().clone(),
manager: metasrv.runtime_switch_manager().clone(),
},
);
let router = router.routes(
&[
"/procedure-manager/pause",
"/procedure-manager/resume",
"/procedure-manager/status",
],
procedure::ProcedureManagerHandler {
manager: metasrv.runtime_switch_manager().clone(),
},
);
let router = Router::nest("/admin", router);
@@ -97,10 +113,7 @@ impl NamedService for Admin {
const NAME: &'static str = "admin";
}
impl<T> Service<http::Request<T>> for Admin
where
T: Send,
{
impl Service<http::Request<BoxBody>> for Admin {
type Response = http::Response<BoxBody>;
type Error = Infallible;
type Future = BoxFuture<Self::Response, Self::Error>;
@@ -109,7 +122,7 @@ where
Poll::Ready(Ok(()))
}
fn call(&mut self, req: http::Request<T>) -> Self::Future {
fn call(&mut self, req: http::Request<BoxBody>) -> Self::Future {
let router = self.router.clone();
let query_params = req
.uri()
@@ -128,7 +141,7 @@ where
#[derive(Default)]
pub struct Router {
handlers: HashMap<String, Box<dyn HttpHandler>>,
handlers: HashMap<String, Arc<dyn HttpHandler>>,
}
impl Router {
@@ -153,7 +166,17 @@ impl Router {
pub fn route(mut self, path: &str, handler: impl HttpHandler + 'static) -> Self {
check_path(path);
let _ = self.handlers.insert(path.to_owned(), Box::new(handler));
let _ = self.handlers.insert(path.to_owned(), Arc::new(handler));
self
}
pub fn routes(mut self, paths: &[&str], handler: impl HttpHandler + 'static) -> Self {
let handler = Arc::new(handler);
for path in paths {
check_path(path);
let _ = self.handlers.insert(path.to_string(), handler.clone());
}
self
}
@@ -204,7 +227,7 @@ fn boxed(body: String) -> BoxBody {
mod tests {
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream};
use super::*;
use crate::metasrv::builder::MetasrvBuilder;
@@ -325,6 +348,13 @@ mod tests {
metasrv
}
async fn send_request(client: &mut DuplexStream, request: &[u8]) -> String {
client.write_all(request).await.unwrap();
let mut buf = vec![0; 1024];
let n = client.read(&mut buf).await.unwrap();
String::from_utf8_lossy(&buf[..n]).to_string()
}
#[tokio::test(flavor = "multi_thread")]
async fn test_metasrv_maintenance_mode() {
common_telemetry::init_default_ut_logging();
@@ -343,73 +373,188 @@ mod tests {
});
// Get maintenance mode
let http_request = b"GET /admin/maintenance HTTP/1.1\r\nHost: localhost\r\n\r\n";
client.write_all(http_request).await.unwrap();
let mut buf = vec![0; 1024];
let n = client.read(&mut buf).await.unwrap();
let response = String::from_utf8_lossy(&buf[..n]);
let response = send_request(
&mut client,
b"GET /admin/maintenance HTTP/1.1\r\nHost: localhost\r\n\r\n",
)
.await;
assert!(response.contains(r#"{"enabled":false}"#));
assert!(response.contains("200 OK"));
// Set maintenance mode to true
let http_post = b"POST /admin/maintenance?enable=true HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n";
client.write_all(http_post).await.unwrap();
let mut buf = vec![0; 1024];
let n = client.read(&mut buf).await.unwrap();
let response = String::from_utf8_lossy(&buf[..n]);
let response = send_request(
&mut client,
b"POST /admin/maintenance?enable=true HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
)
.await;
assert!(response.contains(r#"{"enabled":true}"#));
assert!(response.contains("200 OK"));
let enabled = metasrv
.maintenance_mode_manager()
.runtime_switch_manager()
.maintenance_mode()
.await
.unwrap();
assert!(enabled);
// Get maintenance mode again
let http_request = b"GET /admin/maintenance HTTP/1.1\r\nHost: localhost\r\n\r\n";
client.write_all(http_request).await.unwrap();
let mut buf = vec![0; 1024];
let n = client.read(&mut buf).await.unwrap();
let response = String::from_utf8_lossy(&buf[..n]);
let response = send_request(
&mut client,
b"GET /admin/maintenance HTTP/1.1\r\nHost: localhost\r\n\r\n",
)
.await;
assert!(response.contains(r#"{"enabled":true}"#));
assert!(response.contains("200 OK"));
// Set maintenance mode to false
let http_post = b"POST /admin/maintenance?enable=false HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n";
client.write_all(http_post).await.unwrap();
let mut buf = vec![0; 1024];
let n = client.read(&mut buf).await.unwrap();
let response = String::from_utf8_lossy(&buf[..n]);
let response = send_request(
&mut client,
b"POST /admin/maintenance?enable=false HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
)
.await;
assert!(response.contains(r#"{"enabled":false}"#));
assert!(response.contains("200 OK"));
let enabled = metasrv
.maintenance_mode_manager()
.runtime_switch_manager()
.maintenance_mode()
.await
.unwrap();
assert!(!enabled);
// Set maintenance mode to true via GET request
let http_request =
b"GET /admin/maintenance?enable=true HTTP/1.1\r\nHost: localhost\r\n\r\n";
client.write_all(http_request).await.unwrap();
let mut buf = vec![0; 1024];
let n = client.read(&mut buf).await.unwrap();
let response = String::from_utf8_lossy(&buf[..n]);
let response = send_request(
&mut client,
b"GET /admin/maintenance?enable=true HTTP/1.1\r\nHost: localhost\r\n\r\n",
)
.await;
assert!(response.contains(r#"{"enabled":true}"#));
assert!(response.contains("200 OK"));
// Set maintenance mode to false via GET request
let http_request =
b"PUT /admin/maintenance?enable=false HTTP/1.1\r\nHost: localhost\r\n\r\n";
client.write_all(http_request).await.unwrap();
let mut buf = vec![0; 1024];
let n = client.read(&mut buf).await.unwrap();
let response = String::from_utf8_lossy(&buf[..n]);
let response = send_request(
&mut client,
b"PUT /admin/maintenance?enable=false HTTP/1.1\r\nHost: localhost\r\n\r\n",
)
.await;
assert!(response.contains(r#"{"enabled":false}"#));
assert!(response.contains("200 OK"));
// Get maintenance mode via status path
let response = send_request(
&mut client,
b"GET /admin/maintenance/status HTTP/1.1\r\nHost: localhost\r\n\r\n",
)
.await;
assert!(response.contains(r#"{"enabled":false}"#));
// Set maintenance mode via enable path
let response = send_request(
&mut client,
b"POST /admin/maintenance/enable HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
)
.await;
assert!(response.contains(r#"{"enabled":true}"#));
// Unset maintenance mode via disable path
let response = send_request(
&mut client,
b"POST /admin/maintenance/disable HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
)
.await;
assert!(response.contains(r#"{"enabled":false}"#));
// send POST request to status path
let response = send_request(
&mut client,
b"POST /admin/maintenance/status HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
)
.await;
assert!(response.contains("404 Not Found"));
// send GET request to enable path
let response = send_request(
&mut client,
b"GET /admin/maintenance/enable HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
)
.await;
assert!(response.contains("404 Not Found"));
// send GET request to disable path
let response = send_request(
&mut client,
b"GET /admin/maintenance/disable HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
)
.await;
assert!(response.contains("404 Not Found"));
}
#[tokio::test(flavor = "multi_thread")]
async fn test_metasrv_procedure_manager_handler() {
common_telemetry::init_default_ut_logging();
let kv_backend = Arc::new(MemoryKvBackend::new());
let metasrv = test_metasrv(kv_backend).await;
metasrv.try_start().await.unwrap();
let (mut client, server) = tokio::io::duplex(1024);
let metasrv = Arc::new(metasrv);
let service = metasrv.clone();
let _handle = tokio::spawn(async move {
let router = bootstrap::router(service);
router
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
});
// send GET request to procedure-manager/status path
let response = send_request(
&mut client,
b"GET /admin/procedure-manager/status HTTP/1.1\r\nHost: localhost\r\n\r\n",
)
.await;
assert!(response.contains("200 OK"));
assert!(
response.contains(r#"{"status":"running"}"#),
"response: {}",
response
);
// send POST request to procedure-manager/pause path
let response = send_request(
&mut client,
b"POST /admin/procedure-manager/pause HTTP/1.1\r\nHost: localhost\r\n\r\n",
)
.await;
assert!(response.contains("200 OK"));
assert!(response.contains(r#"{"status":"paused"}"#));
// send POST request to procedure-manager/resume path
let response = send_request(
&mut client,
b"POST /admin/procedure-manager/resume HTTP/1.1\r\nHost: localhost\r\n\r\n",
)
.await;
assert!(response.contains("200 OK"));
assert!(
response.contains(r#"{"status":"running"}"#),
"response: {}",
response
);
// send GET request to procedure-manager/resume path
let response = send_request(
&mut client,
b"GET /admin/procedure-manager/resume HTTP/1.1\r\nHost: localhost\r\n\r\n",
)
.await;
assert!(response.contains("404 Not Found"));
// send GET request to procedure-manager/pause path
let response = send_request(
&mut client,
b"GET /admin/procedure-manager/pause HTTP/1.1\r\nHost: localhost\r\n\r\n",
)
.await;
assert!(response.contains("404 Not Found"));
}
}

View File

@@ -14,7 +14,7 @@
use std::collections::HashMap;
use common_meta::key::maintenance::MaintenanceModeManagerRef;
use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
use common_telemetry::{info, warn};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
@@ -22,14 +22,15 @@ use tonic::codegen::http;
use tonic::codegen::http::Response;
use crate::error::{
self, InvalidHttpBodySnafu, MaintenanceModeManagerSnafu, MissingRequiredParameterSnafu,
ParseBoolSnafu, Result, UnsupportedSnafu,
self, MissingRequiredParameterSnafu, ParseBoolSnafu, Result, RuntimeSwitchManagerSnafu,
UnsupportedSnafu,
};
use crate::service::admin::util::{to_json_response, to_not_found_response};
use crate::service::admin::HttpHandler;
#[derive(Clone)]
pub struct MaintenanceHandler {
pub manager: MaintenanceModeManagerRef,
pub manager: RuntimeSwitchManagerRef,
}
#[derive(Debug, Serialize, Deserialize)]
@@ -48,79 +49,124 @@ impl TryFrom<MaintenanceResponse> for String {
}
impl MaintenanceHandler {
async fn get_maintenance(&self) -> crate::Result<Response<String>> {
async fn get_maintenance(&self) -> crate::Result<MaintenanceResponse> {
let enabled = self
.manager
.maintenance_mode()
.await
.context(MaintenanceModeManagerSnafu)?;
let response = MaintenanceResponse { enabled }.try_into()?;
http::Response::builder()
.status(http::StatusCode::OK)
.body(response)
.context(InvalidHttpBodySnafu)
.context(RuntimeSwitchManagerSnafu)?;
Ok(MaintenanceResponse { enabled })
}
async fn set_maintenance(
async fn set_maintenance(&self) -> crate::Result<MaintenanceResponse> {
self.manager
.set_maintenance_mode()
.await
.context(RuntimeSwitchManagerSnafu)?;
// TODO(weny): Add a record to the system events.
info!("Enable the maintenance mode.");
Ok(MaintenanceResponse { enabled: true })
}
async fn unset_maintenance(&self) -> crate::Result<MaintenanceResponse> {
self.manager
.unset_maintenance_mode()
.await
.context(RuntimeSwitchManagerSnafu)?;
// TODO(weny): Add a record to the system events.
info!("Disable the maintenance mode.");
Ok(MaintenanceResponse { enabled: false })
}
async fn handle_legacy_maintenance(
&self,
params: &HashMap<String, String>,
) -> crate::Result<Response<String>> {
let enable = params
.get("enable")
.map(|v| v.parse::<bool>())
.context(MissingRequiredParameterSnafu { param: "enable" })?
.context(ParseBoolSnafu {
err_msg: "'enable' must be 'true' or 'false'",
})?;
) -> crate::Result<MaintenanceResponse> {
let enable = get_enable_from_params(params)?;
if enable {
self.manager
.set_maintenance_mode()
.await
.context(MaintenanceModeManagerSnafu)?;
info!("Enable the maintenance mode.");
self.set_maintenance().await
} else {
self.manager
.unset_maintenance_mode()
.await
.context(MaintenanceModeManagerSnafu)?;
info!("Disable the maintenance mode.");
};
let response = MaintenanceResponse { enabled: enable }.try_into()?;
http::Response::builder()
.status(http::StatusCode::OK)
.body(response)
.context(InvalidHttpBodySnafu)
self.unset_maintenance().await
}
}
}
fn get_enable_from_params(params: &HashMap<String, String>) -> crate::Result<bool> {
params
.get("enable")
.map(|v| v.parse::<bool>())
.context(MissingRequiredParameterSnafu { param: "enable" })?
.context(ParseBoolSnafu {
err_msg: "'enable' must be 'true' or 'false'",
})
}
const MAINTENANCE_PATH: &str = "maintenance";
const ENABLE_SUFFIX: &str = "enable";
const DISABLE_SUFFIX: &str = "disable";
const STATUS_SUFFIX: &str = "status";
#[async_trait::async_trait]
impl HttpHandler for MaintenanceHandler {
// TODO(weny): Remove the legacy version of the maintenance API.
// However, we need to keep the legacy version for a while to avoid breaking the existing operators.
async fn handle(
&self,
_: &str,
path: &str,
method: http::Method,
params: &HashMap<String, String>,
) -> crate::Result<Response<String>> {
match method {
http::Method::GET => {
if params.is_empty() {
self.get_maintenance().await
} else {
if path.ends_with(STATUS_SUFFIX) {
// Handle GET request to '/admin/maintenance/status'
let response = self.get_maintenance().await?;
to_json_response(response)
} else if path.ends_with(MAINTENANCE_PATH) && params.is_empty() {
// Handle GET request to '/admin/maintenance'. (The legacy version)
let response = self.get_maintenance().await?;
to_json_response(response)
} else if path.ends_with(MAINTENANCE_PATH) {
// Handle GET request to '/admin/maintenance' with URL parameters. (The legacy version)
warn!(
"Found URL parameters in '/admin/maintenance' request, it's deprecated, will be removed in the future"
);
// The old version operator will send GET request with URL parameters,
// so we need to support it.
self.set_maintenance(params).await
let response = self.handle_legacy_maintenance(params).await?;
to_json_response(response)
} else {
to_not_found_response()
}
}
http::Method::PUT => {
warn!("Found PUT request to '/admin/maintenance', it's deprecated, will be removed in the future");
self.set_maintenance(params).await
// Handle PUT request to '/admin/maintenance' with URL parameters. (The legacy version)
if path.ends_with(MAINTENANCE_PATH) {
warn!("Found PUT request to '/admin/maintenance', it's deprecated, will be removed in the future");
let response = self.handle_legacy_maintenance(params).await?;
to_json_response(response)
} else {
to_not_found_response()
}
}
http::Method::POST => {
// Handle POST request to '/admin/maintenance/enable'
if path.ends_with(ENABLE_SUFFIX) {
let response = self.set_maintenance().await?;
to_json_response(response)
} else if path.ends_with(DISABLE_SUFFIX) {
// Handle POST request to '/admin/maintenance/disable'
let response = self.unset_maintenance().await?;
to_json_response(response)
} else if path.ends_with(MAINTENANCE_PATH) {
// Handle POST request to '/admin/maintenance' with URL parameters. (The legacy version)
warn!("Found PUT request to '/admin/maintenance', it's deprecated, will be removed in the future");
let response = self.handle_legacy_maintenance(params).await?;
to_json_response(response)
} else {
to_not_found_response()
}
}
http::Method::POST => self.set_maintenance(params).await,
_ => UnsupportedSnafu {
operation: format!("http method {method}"),
}

View File

@@ -0,0 +1,119 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
use common_telemetry::info;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use tonic::codegen::http;
use tonic::codegen::http::Response;
use crate::error::RuntimeSwitchManagerSnafu;
use crate::service::admin::util::{to_json_response, to_not_found_response};
use crate::service::admin::HttpHandler;
#[derive(Clone)]
pub struct ProcedureManagerHandler {
pub manager: RuntimeSwitchManagerRef,
}
#[derive(Debug, Serialize, Deserialize)]
struct ProcedureManagerStatusResponse {
status: ProcedureManagerStatus,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
enum ProcedureManagerStatus {
Paused,
Running,
}
impl ProcedureManagerHandler {
async fn pause_procedure_manager(&self) -> crate::Result<ProcedureManagerStatusResponse> {
self.manager
.pasue_procedure()
.await
.context(RuntimeSwitchManagerSnafu)?;
// TODO(weny): Add a record to the system events.
info!("Pause the procedure manager.");
Ok(ProcedureManagerStatusResponse {
status: ProcedureManagerStatus::Paused,
})
}
async fn resume_procedure_manager(&self) -> crate::Result<ProcedureManagerStatusResponse> {
self.manager
.resume_procedure()
.await
.context(RuntimeSwitchManagerSnafu)?;
// TODO(weny): Add a record to the system events.
info!("Resume the procedure manager.");
Ok(ProcedureManagerStatusResponse {
status: ProcedureManagerStatus::Running,
})
}
async fn get_procedure_manager_status(&self) -> crate::Result<ProcedureManagerStatusResponse> {
let is_paused = self
.manager
.is_procedure_paused()
.await
.context(RuntimeSwitchManagerSnafu)?;
let response = ProcedureManagerStatusResponse {
status: if is_paused {
ProcedureManagerStatus::Paused
} else {
ProcedureManagerStatus::Running
},
};
Ok(response)
}
}
#[async_trait::async_trait]
impl HttpHandler for ProcedureManagerHandler {
async fn handle(
&self,
path: &str,
method: http::Method,
_: &HashMap<String, String>,
) -> crate::Result<Response<String>> {
match method {
http::Method::GET => {
if path.ends_with("status") {
let response = self.get_procedure_manager_status().await?;
to_json_response(response)
} else {
to_not_found_response()
}
}
http::Method::POST => {
if path.ends_with("pause") {
let response = self.pause_procedure_manager().await?;
to_json_response(response)
} else if path.ends_with("resume") {
let response = self.resume_procedure_manager().await?;
to_json_response(response)
} else {
to_not_found_response()
}
}
_ => to_not_found_response(),
}
}
}

View File

@@ -12,11 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Debug;
use serde::Serialize;
use snafu::ResultExt;
use tonic::codegen::http;
use crate::error::{self, Result};
/// Returns a 200 response with a text body.
pub fn to_text_response(text: &str) -> Result<http::Response<String>> {
http::Response::builder()
.header("Content-Type", "text/plain")
@@ -24,3 +28,26 @@ pub fn to_text_response(text: &str) -> Result<http::Response<String>> {
.body(text.to_string())
.context(error::InvalidHttpBodySnafu)
}
/// Returns a 200 response with a JSON body.
pub fn to_json_response<T>(response: T) -> Result<http::Response<String>>
where
T: Serialize + Debug,
{
let response = serde_json::to_string(&response).context(error::SerializeToJsonSnafu {
input: format!("{response:?}"),
})?;
http::Response::builder()
.header("Content-Type", "application/json")
.status(http::StatusCode::OK)
.body(response)
.context(error::InvalidHttpBodySnafu)
}
/// Returns a 404 response with an empty body.
pub fn to_not_found_response() -> Result<http::Response<String>> {
http::Response::builder()
.status(http::StatusCode::NOT_FOUND)
.body("".to_string())
.context(error::InvalidHttpBodySnafu)
}