From 200422313f3b7c70c1665f280f84c1a2c71f16cb Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Wed, 27 Aug 2025 18:47:11 +0800 Subject: [PATCH] refactor(meta): refactor admin service to use modern axum handlers (#6833) * refactor(meta): refactor admin service to use modern axum handlers Signed-off-by: WenyXu * Update src/meta-srv/src/service/admin/health.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --------- Signed-off-by: WenyXu Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/meta-srv/src/service/admin.rs | 320 ++++-------------- src/meta-srv/src/service/admin/health.rs | 11 + src/meta-srv/src/service/admin/heartbeat.rs | 134 +++++++- src/meta-srv/src/service/admin/leader.rs | 27 +- src/meta-srv/src/service/admin/maintenance.rs | 41 ++- src/meta-srv/src/service/admin/node_lease.rs | 39 ++- src/meta-srv/src/service/admin/procedure.rs | 41 ++- src/meta-srv/src/service/admin/recovery.rs | 48 +-- src/meta-srv/src/service/admin/sequencer.rs | 10 +- src/meta-srv/src/service/admin/util.rs | 34 +- 10 files changed, 390 insertions(+), 315 deletions(-) diff --git a/src/meta-srv/src/service/admin.rs b/src/meta-srv/src/service/admin.rs index 608da1eb38..8b089d11aa 100644 --- a/src/meta-srv/src/service/admin.rs +++ b/src/meta-srv/src/service/admin.rs @@ -27,26 +27,26 @@ use std::convert::Infallible; use std::sync::Arc; use std::task::{Context, Poll}; -use axum::http::StatusCode; -use axum::response::IntoResponse; use axum::{routing, Router as AxumRouter}; use tonic::body::Body; use tonic::codegen::{http, BoxFuture, Service}; use tonic::server::NamedService; use crate::metasrv::Metasrv; -use crate::service::admin::health::HealthHandler; use crate::service::admin::heartbeat::HeartBeatHandler; use crate::service::admin::leader::LeaderHandler; use crate::service::admin::maintenance::MaintenanceHandler; use crate::service::admin::node_lease::NodeLeaseHandler; use crate::service::admin::procedure::ProcedureManagerHandler; -use crate::service::admin::recovery::{ - get_recovery_mode, set_recovery_mode, unset_recovery_mode, RecoveryHandler, -}; +use crate::service::admin::recovery::RecoveryHandler; use crate::service::admin::sequencer::TableIdSequenceHandler; -use crate::service::admin::util::{to_axum_json_response, to_axum_not_found_response}; +/// Expose admin http service on rpc port(3002). +/// +/// # Deprecated +/// +/// This function is deprecated and will be removed in the future. Please use +/// [`admin_axum_router`] instead. pub fn make_admin_service(metasrv: Arc) -> Admin { let router = Router::new().route("/health", health::HealthHandler); @@ -231,257 +231,81 @@ fn check_path(path: &str) { /// Expose admin HTTP endpoints as an Axum router for the main HTTP server. pub fn admin_axum_router(metasrv: Arc) -> AxumRouter { - let node_lease_handler = Arc::new(NodeLeaseHandler { + let node_lease_handler = NodeLeaseHandler { meta_peer_client: metasrv.meta_peer_client().clone(), - }); - let heartbeat_handler = Arc::new(HeartBeatHandler { + }; + let heartbeat_handler = HeartBeatHandler { meta_peer_client: metasrv.meta_peer_client().clone(), - }); - let leader_handler = Arc::new(LeaderHandler { + }; + let leader_handler = LeaderHandler { election: metasrv.election().cloned(), - }); - let maintenance_handler = Arc::new(MaintenanceHandler { + }; + let maintenance_handler = MaintenanceHandler { manager: metasrv.runtime_switch_manager().clone(), - }); - let procedure_handler = Arc::new(ProcedureManagerHandler { + }; + let procedure_handler = ProcedureManagerHandler { manager: metasrv.runtime_switch_manager().clone(), - }); - let recovery_handler = Arc::new(RecoveryHandler { + }; + let recovery_handler = RecoveryHandler { manager: metasrv.runtime_switch_manager().clone(), - }); - let table_id_sequence_handler = Arc::new(TableIdSequenceHandler { + }; + let table_id_sequence_handler = TableIdSequenceHandler { table_id_sequence: metasrv.table_id_sequence().clone(), runtime_switch_manager: metasrv.runtime_switch_manager().clone(), - }); - let sequence_router = AxumRouter::new().nest( - "/table", - AxumRouter::new() - .route("/next-id", routing::get(sequencer::get_next_table_id)) - .route("/set-next-id", routing::post(sequencer::set_next_table_id)) - .with_state(table_id_sequence_handler), - ); - - let health_router = AxumRouter::new().route( - "/", - routing::get({ - move || { - let handler = HealthHandler; - async move { - match handler - .handle("/health", http::Method::GET, &Default::default()) - .await - { - Ok(status) => status.body().clone().into_response(), - Err(e) => { - common_telemetry::error!(e; "Health handler failed"); - StatusCode::INTERNAL_SERVER_ERROR.into_response() - } - } - } - } - }), - ); - - let node_lease_router = AxumRouter::new().route( - "/", - routing::get({ - let handler = node_lease_handler.clone(); - move || async move { - match handler - .handle("/node-lease", http::Method::GET, &Default::default()) - .await - { - Ok(resp) => resp.body().clone().into_response(), - Err(e) => { - common_telemetry::error!(e; "Node lease handler failed"); - StatusCode::INTERNAL_SERVER_ERROR.into_response() - } - } - } - }), - ); - - let leader_router = AxumRouter::new().route( - "/", - routing::get({ - let handler = leader_handler.clone(); - move || async move { - match handler - .handle("/leader", http::Method::GET, &Default::default()) - .await - { - Ok(resp) => resp.body().clone().into_response(), - Err(e) => { - common_telemetry::error!(e; "Leader handler failed"); - StatusCode::INTERNAL_SERVER_ERROR.into_response() - } - } - } - }), - ); - - let heartbeat_router = AxumRouter::new() - .route( - "/", - routing::get({ - let handler = heartbeat_handler.clone(); - move || async move { - match handler - .handle("/heartbeat", http::Method::GET, &Default::default()) - .await - { - Ok(resp) => resp.body().clone().into_response(), - Err(e) => { - common_telemetry::error!(e; "Heartbeat handler failed"); - StatusCode::INTERNAL_SERVER_ERROR.into_response() - } - } - } - }), - ) - .route( - "/help", - routing::get({ - let handler = heartbeat_handler.clone(); - move || async move { - match handler - .handle("/heartbeat/help", http::Method::GET, &Default::default()) - .await - { - Ok(resp) => resp.body().clone().into_response(), - Err(e) => { - common_telemetry::error!(e; "Heartbeat help handler failed"); - StatusCode::INTERNAL_SERVER_ERROR.into_response() - } - } - } - }), - ); - - let maintenance_router = AxumRouter::new() - .route( - "/", - routing::get({ - let handler = maintenance_handler.clone(); - move || async move { - match handler.get_maintenance().await { - Ok(resp) => to_axum_json_response(resp), - Err(e) => { - common_telemetry::error!(e; "Maintenance handler failed"); - to_axum_not_found_response() - } - } - } - }), - ) - .route( - "/status", - routing::get({ - let handler = maintenance_handler.clone(); - move || async move { - match handler.get_maintenance().await { - Ok(resp) => to_axum_json_response(resp), - Err(e) => { - common_telemetry::error!(e; "Maintenance status handler failed"); - to_axum_not_found_response() - } - } - } - }), - ) - .route( - "/enable", - routing::post({ - let handler = maintenance_handler.clone(); - move || async move { - match handler.set_maintenance().await { - Ok(resp) => to_axum_json_response(resp), - Err(e) => { - common_telemetry::error!(e; "Maintenance enable handler failed"); - to_axum_not_found_response() - } - } - } - }), - ) - .route( - "/disable", - routing::post({ - let handler = maintenance_handler.clone(); - move || async move { - match handler.unset_maintenance().await { - Ok(resp) => to_axum_json_response(resp), - Err(e) => { - common_telemetry::error!(e; "Maintenance disable handler failed"); - to_axum_not_found_response() - } - } - } - }), - ); - - let procedure_router = AxumRouter::new() - .route( - "/status", - routing::get({ - let handler = procedure_handler.clone(); - move || async move { - match handler.get_procedure_manager_status().await { - Ok(resp) => to_axum_json_response(resp), - Err(e) => { - common_telemetry::error!(e; "Procedure manager status handler failed"); - to_axum_not_found_response() - } - } - } - }), - ) - .route( - "/pause", - routing::post({ - let handler = procedure_handler.clone(); - move || async move { - match handler.pause_procedure_manager().await { - Ok(resp) => to_axum_json_response(resp), - Err(e) => { - common_telemetry::error!(e; "Procedure manager pause handler failed"); - to_axum_not_found_response() - } - } - } - }), - ) - .route( - "/resume", - routing::post({ - let handler = procedure_handler.clone(); - move || async move { - match handler.resume_procedure_manager().await { - Ok(resp) => to_axum_json_response(resp), - Err(e) => { - common_telemetry::error!(e; "Procedure manager resume handler failed"); - to_axum_not_found_response() - } - } - } - }), - ); - - let recovery_router = AxumRouter::new() - .route("/enable", routing::post(set_recovery_mode)) - .route("/disable", routing::post(unset_recovery_mode)) - .route("/status", routing::get(get_recovery_mode)) - .with_state(recovery_handler); + }; let admin_router = AxumRouter::new() - .nest("/health", health_router) - .nest("/node-lease", node_lease_router) - .nest("/leader", leader_router) - .nest("/heartbeat", heartbeat_router) - .nest("/maintenance", maintenance_router) - .nest("/procedure-manager", procedure_router) - .nest("/recovery", recovery_router) - .nest("/sequence", sequence_router); + .route("/health", routing::get(health::health)) + .route( + "/node-lease", + routing::get(node_lease::get).with_state(node_lease_handler), + ) + .route( + "/leader", + routing::get(leader::get).with_state(leader_handler), + ) + .nest( + "/heartbeat", + AxumRouter::new() + .route("/", routing::get(heartbeat::get)) + .route("/help", routing::get(heartbeat::help)) + .with_state(heartbeat_handler), + ) + .nest( + "/maintenance", + AxumRouter::new() + .route("/", routing::get(maintenance::status)) + .route("/status", routing::get(maintenance::status)) + .route("/enable", routing::post(maintenance::set)) + .route("/disable", routing::post(maintenance::unset)) + .with_state(maintenance_handler), + ) + .nest( + "/procedure-manager", + AxumRouter::new() + .route("/status", routing::get(procedure::status)) + .route("/pause", routing::post(procedure::pause)) + .route("/resume", routing::post(procedure::resume)) + .with_state(procedure_handler), + ) + .nest( + "/recovery", + AxumRouter::new() + .route("/status", routing::get(recovery::status)) + .route("/enable", routing::post(recovery::set)) + .route("/disable", routing::post(recovery::unset)) + .with_state(recovery_handler), + ) + .nest( + "/sequence", + AxumRouter::new().nest( + "/table", + AxumRouter::new() + .route("/next-id", routing::get(sequencer::get_next_table_id)) + .route("/set-next-id", routing::post(sequencer::set_next_table_id)) + .with_state(table_id_sequence_handler.clone()), + ), + ); AxumRouter::new().nest("/admin", admin_router) } diff --git a/src/meta-srv/src/service/admin/health.rs b/src/meta-srv/src/service/admin/health.rs index 1394c389c8..23116f10ff 100644 --- a/src/meta-srv/src/service/admin/health.rs +++ b/src/meta-srv/src/service/admin/health.rs @@ -14,6 +14,7 @@ use std::collections::HashMap; +use axum::response::{IntoResponse, Response}; use tonic::codegen::http; use crate::error::Result; @@ -24,6 +25,16 @@ const HTTP_OK: &str = "OK\n"; #[derive(Clone)] pub struct HealthHandler; +/// Health check endpoint that returns HTTP 200 OK if the service is healthy. +#[axum_macros::debug_handler] +pub(crate) async fn health() -> Response { + http::Response::builder() + .status(http::StatusCode::OK) + .body(HTTP_OK.to_owned()) + .unwrap() + .into_response() +} + #[async_trait::async_trait] impl HttpHandler for HealthHandler { async fn handle( diff --git a/src/meta-srv/src/service/admin/heartbeat.rs b/src/meta-srv/src/service/admin/heartbeat.rs index 078d6cdc77..e351f76ffa 100644 --- a/src/meta-srv/src/service/admin/heartbeat.rs +++ b/src/meta-srv/src/service/admin/heartbeat.rs @@ -14,6 +14,9 @@ use std::collections::HashMap; +use axum::extract::{Query, State}; +use axum::response::{IntoResponse, Response}; +use axum::Json; use common_meta::datanode::DatanodeStatValue; use serde::{Deserialize, Serialize}; use snafu::ResultExt; @@ -21,6 +24,7 @@ use tonic::codegen::http; use crate::cluster::MetaPeerClientRef; use crate::error::{self, Result}; +use crate::service::admin::util::ErrorHandler; use crate::service::admin::{util, HttpHandler}; #[derive(Clone)] @@ -28,6 +32,47 @@ pub struct HeartBeatHandler { pub meta_peer_client: MetaPeerClientRef, } +impl HeartBeatHandler { + async fn get_heartbeat(&self, filter: Option<&str>) -> Result { + let stat_kvs = self.meta_peer_client.get_all_dn_stat_kvs().await?; + let mut stat_vals: Vec = stat_kvs.into_values().collect(); + + if let Some(addr) = filter { + stat_vals = filter_by_addr(stat_vals, addr); + } + Ok(StatValues { stat_vals }) + } +} + +fn find_addr_filter_from_params(params: &HashMap) -> Option<&str> { + params.get("addr").map(|s| s.as_str()) +} + +/// Get the heartbeat handler. +#[axum_macros::debug_handler] +pub(crate) async fn get( + State(handler): State, + Query(params): Query>, +) -> Response { + let filter = find_addr_filter_from_params(¶ms); + handler + .get_heartbeat(filter) + .await + .map_err(ErrorHandler::new) + .map(Json) + .into_response() +} + +/// Get the heartbeat help handler. +#[axum_macros::debug_handler] +pub(crate) async fn help() -> Response { + r#" + - GET /heartbeat + - GET /heartbeat?addr=127.0.0.1:3001 + "# + .into_response() +} + #[async_trait::async_trait] impl HttpHandler for HeartBeatHandler { async fn handle( @@ -45,13 +90,8 @@ impl HttpHandler for HeartBeatHandler { ); } - let stat_kvs = self.meta_peer_client.get_all_dn_stat_kvs().await?; - let mut stat_vals: Vec = stat_kvs.into_values().collect(); - - if let Some(addr) = params.get("addr") { - stat_vals = filter_by_addr(stat_vals, addr); - } - let result = StatValues { stat_vals }.try_into()?; + let filter = find_addr_filter_from_params(params); + let result = self.get_heartbeat(filter).await?.try_into()?; http::Response::builder() .status(http::StatusCode::OK) @@ -85,9 +125,18 @@ fn filter_by_addr(stat_vals: Vec, addr: &str) -> Vec String { + let body_bytes = to_bytes(resp.into_body(), usize::MAX).await.unwrap(); + String::from_utf8_lossy(&body_bytes).to_string() + } + + async fn put_stat_value(kv_backend: &KvBackendRef, node_id: u64, addr: &str) { + let value: Vec = DatanodeStatValue { + stats: vec![Stat { + addr: addr.to_string(), + timestamp_millis: 3, + ..Default::default() + }], + } + .try_into() + .unwrap(); + let put = PutRequest::new() + .with_key(DatanodeStatKey { node_id }) + .with_value(value); + kv_backend.put(put).await.unwrap(); + } + + #[tokio::test] + async fn test_get_heartbeat_with_filter() { + common_telemetry::init_default_ut_logging(); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let meta_peer_client = MetaPeerClientBuilder::default() + .election(None) + .in_memory(kv_backend.clone()) + .build() + .map(Arc::new) + .unwrap(); + let kv_backend = kv_backend as _; + put_stat_value(&kv_backend, 0, "127.0.0.1:3000").await; + put_stat_value(&kv_backend, 1, "127.0.0.1:3001").await; + put_stat_value(&kv_backend, 2, "127.0.0.1:3002").await; + + let handler = HeartBeatHandler { + meta_peer_client: meta_peer_client.clone(), + }; + let app = axum::Router::new() + .route("/", axum::routing::get(heartbeat::get)) + .with_state(handler); + + let req = Request::builder() + .uri("/?addr=127.0.0.1:3003") + .body(Body::empty()) + .unwrap(); + let resp = app.clone().oneshot(req).await.unwrap(); + let status = resp.status(); + let body = get_body_string(resp).await; + assert_eq!(status, http::StatusCode::OK); + assert_eq!(body, r#"[]"#); + + let req = Request::builder() + .uri("/?addr=127.0.0.1:3001") + .body(Body::empty()) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + let status = resp.status(); + let body = get_body_string(resp).await; + assert_eq!(status, http::StatusCode::OK); + assert_eq!( + body, + "[[{\"timestamp_millis\":3,\"id\":0,\"addr\":\"127.0.0.1:3001\",\"rcus\":0,\"wcus\":0,\"region_num\":0,\"region_stats\":[],\"topic_stats\":[],\"node_epoch\":0,\"datanode_workloads\":{\"types\":[]}}]]" + ); + } } diff --git a/src/meta-srv/src/service/admin/leader.rs b/src/meta-srv/src/service/admin/leader.rs index aa952083c6..9b72af0651 100644 --- a/src/meta-srv/src/service/admin/leader.rs +++ b/src/meta-srv/src/service/admin/leader.rs @@ -14,11 +14,14 @@ use std::collections::HashMap; +use axum::extract::State; +use axum::response::{IntoResponse, Response}; use snafu::ResultExt; use tonic::codegen::http; use crate::error::{self, Result}; use crate::metasrv::ElectionRef; +use crate::service::admin::util::ErrorHandler; use crate::service::admin::HttpHandler; #[derive(Clone)] @@ -26,6 +29,27 @@ pub struct LeaderHandler { pub election: Option, } +impl LeaderHandler { + async fn get_leader(&self) -> Result> { + if let Some(election) = &self.election { + let leader_addr = election.leader().await?.0; + return Ok(Some(leader_addr)); + } + Ok(None) + } +} + +/// Get the leader handler. +#[axum_macros::debug_handler] +pub(crate) async fn get(State(handler): State) -> Response { + handler + .get_leader() + .await + .map_err(ErrorHandler::new) + .map(|leader| leader.unwrap_or("election info is None".to_string())) + .into_response() +} + #[async_trait::async_trait] impl HttpHandler for LeaderHandler { async fn handle( @@ -34,8 +58,7 @@ impl HttpHandler for LeaderHandler { _: http::Method, _: &HashMap, ) -> Result> { - if let Some(election) = &self.election { - let leader_addr = election.leader().await?.0; + if let Some(leader_addr) = self.get_leader().await? { return http::Response::builder() .status(http::StatusCode::OK) .body(leader_addr) diff --git a/src/meta-srv/src/service/admin/maintenance.rs b/src/meta-srv/src/service/admin/maintenance.rs index e324a02f2b..8e2639ec60 100644 --- a/src/meta-srv/src/service/admin/maintenance.rs +++ b/src/meta-srv/src/service/admin/maintenance.rs @@ -14,18 +14,20 @@ use std::collections::HashMap; +use axum::extract::State; +use axum::response::{IntoResponse, Response}; +use axum::Json; use common_meta::key::runtime_switch::RuntimeSwitchManagerRef; use common_telemetry::{info, warn}; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; use tonic::codegen::http; -use tonic::codegen::http::Response; use crate::error::{ self, MissingRequiredParameterSnafu, ParseBoolSnafu, Result, RuntimeSwitchManagerSnafu, UnsupportedSnafu, }; -use crate::service::admin::util::{to_json_response, to_not_found_response}; +use crate::service::admin::util::{to_json_response, to_not_found_response, ErrorHandler}; use crate::service::admin::HttpHandler; #[derive(Clone)] @@ -38,6 +40,39 @@ pub(crate) struct MaintenanceResponse { enabled: bool, } +/// Get the maintenance mode. +#[axum_macros::debug_handler] +pub(crate) async fn status(State(handler): State) -> Response { + handler + .get_maintenance() + .await + .map(Json) + .map_err(ErrorHandler::new) + .into_response() +} + +/// Set the maintenance mode. +#[axum_macros::debug_handler] +pub(crate) async fn set(State(handler): State) -> Response { + handler + .set_maintenance() + .await + .map(Json) + .map_err(ErrorHandler::new) + .into_response() +} + +/// Unset the maintenance mode. +#[axum_macros::debug_handler] +pub(crate) async fn unset(State(handler): State) -> Response { + handler + .unset_maintenance() + .await + .map(Json) + .map_err(ErrorHandler::new) + .into_response() +} + impl TryFrom for String { type Error = error::Error; @@ -115,7 +150,7 @@ impl HttpHandler for MaintenanceHandler { path: &str, method: http::Method, params: &HashMap, - ) -> crate::Result> { + ) -> crate::Result> { match method { http::Method::GET => { if path.ends_with(STATUS_SUFFIX) { diff --git a/src/meta-srv/src/service/admin/node_lease.rs b/src/meta-srv/src/service/admin/node_lease.rs index f3216e1644..4384ce0835 100644 --- a/src/meta-srv/src/service/admin/node_lease.rs +++ b/src/meta-srv/src/service/admin/node_lease.rs @@ -15,6 +15,9 @@ use std::collections::HashMap; use std::time::Duration; +use axum::extract::State; +use axum::response::{IntoResponse, Response}; +use axum::Json; use serde::{Deserialize, Serialize}; use snafu::ResultExt; use tonic::codegen::http; @@ -23,6 +26,7 @@ use crate::cluster::MetaPeerClientRef; use crate::error::{self, Result}; use crate::key::{DatanodeLeaseKey, LeaseValue}; use crate::lease; +use crate::service::admin::util::ErrorHandler; use crate::service::admin::HttpHandler; #[derive(Clone)] @@ -30,14 +34,8 @@ pub struct NodeLeaseHandler { pub meta_peer_client: MetaPeerClientRef, } -#[async_trait::async_trait] -impl HttpHandler for NodeLeaseHandler { - async fn handle( - &self, - _: &str, - _: http::Method, - _: &HashMap, - ) -> Result> { +impl NodeLeaseHandler { + async fn get_node_lease(&self) -> Result { let leases = lease::alive_datanodes(&self.meta_peer_client, Duration::from_secs(u64::MAX)).await?; let leases = leases @@ -49,7 +47,30 @@ impl HttpHandler for NodeLeaseHandler { lease: v, }) .collect::>(); - let result = LeaseValues { leases }.try_into()?; + Ok(LeaseValues { leases }) + } +} + +/// Get the node lease handler. +#[axum_macros::debug_handler] +pub(crate) async fn get(State(handler): State) -> Response { + handler + .get_node_lease() + .await + .map_err(ErrorHandler::new) + .map(Json) + .into_response() +} + +#[async_trait::async_trait] +impl HttpHandler for NodeLeaseHandler { + async fn handle( + &self, + _: &str, + _: http::Method, + _: &HashMap, + ) -> Result> { + let result = self.get_node_lease().await?.try_into()?; http::Response::builder() .status(http::StatusCode::OK) diff --git a/src/meta-srv/src/service/admin/procedure.rs b/src/meta-srv/src/service/admin/procedure.rs index a0a6ee87e9..74c8594503 100644 --- a/src/meta-srv/src/service/admin/procedure.rs +++ b/src/meta-srv/src/service/admin/procedure.rs @@ -14,15 +14,17 @@ use std::collections::HashMap; +use axum::extract::State; +use axum::response::{IntoResponse, Response}; +use axum::Json; use common_meta::key::runtime_switch::RuntimeSwitchManagerRef; use common_telemetry::info; use serde::{Deserialize, Serialize}; use snafu::ResultExt; use tonic::codegen::http; -use tonic::codegen::http::Response; use crate::error::RuntimeSwitchManagerSnafu; -use crate::service::admin::util::{to_json_response, to_not_found_response}; +use crate::service::admin::util::{to_json_response, to_not_found_response, ErrorHandler}; use crate::service::admin::HttpHandler; #[derive(Clone)] @@ -42,6 +44,39 @@ enum ProcedureManagerStatus { Running, } +/// Get the procedure manager status. +#[axum_macros::debug_handler] +pub(crate) async fn status(State(handler): State) -> Response { + handler + .get_procedure_manager_status() + .await + .map(Json) + .map_err(ErrorHandler::new) + .into_response() +} + +/// Pause the procedure manager. +#[axum_macros::debug_handler] +pub(crate) async fn pause(State(handler): State) -> Response { + handler + .pause_procedure_manager() + .await + .map(Json) + .map_err(ErrorHandler::new) + .into_response() +} + +/// Resume the procedure manager. +#[axum_macros::debug_handler] +pub(crate) async fn resume(State(handler): State) -> Response { + handler + .resume_procedure_manager() + .await + .map(Json) + .map_err(ErrorHandler::new) + .into_response() +} + impl ProcedureManagerHandler { pub(crate) async fn pause_procedure_manager( &self, @@ -98,7 +133,7 @@ impl HttpHandler for ProcedureManagerHandler { path: &str, method: http::Method, _: &HashMap, - ) -> crate::Result> { + ) -> crate::Result> { match method { http::Method::GET => { if path.ends_with("status") { diff --git a/src/meta-srv/src/service/admin/recovery.rs b/src/meta-srv/src/service/admin/recovery.rs index d35d52450d..ebfcc05656 100644 --- a/src/meta-srv/src/service/admin/recovery.rs +++ b/src/meta-srv/src/service/admin/recovery.rs @@ -12,18 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - use axum::extract::State; -use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; use axum::Json; use common_meta::key::runtime_switch::RuntimeSwitchManagerRef; use serde::{Deserialize, Serialize}; -use servers::http::result::error_result::ErrorResponse; -pub(crate) type RecoveryHandlerRef = Arc; +use crate::service::admin::util::ErrorHandler; +#[derive(Clone)] pub(crate) struct RecoveryHandler { pub(crate) manager: RuntimeSwitchManagerRef, } @@ -35,29 +32,36 @@ pub(crate) struct RecoveryResponse { /// Get the recovery mode. #[axum_macros::debug_handler] -pub(crate) async fn get_recovery_mode(State(handler): State) -> Response { - let enabled = handler.manager.recovery_mode().await; - - match enabled { - Ok(enabled) => (StatusCode::OK, Json(RecoveryResponse { enabled })).into_response(), - Err(e) => ErrorResponse::from_error(e).into_response(), - } +pub(crate) async fn status(State(handler): State) -> Response { + handler + .manager + .recovery_mode() + .await + .map(|enabled| Json(RecoveryResponse { enabled })) + .map_err(ErrorHandler::new) + .into_response() } /// Set the recovery mode. #[axum_macros::debug_handler] -pub(crate) async fn set_recovery_mode(State(handler): State) -> Response { - match handler.manager.set_recovery_mode().await { - Ok(_) => (StatusCode::OK, Json(RecoveryResponse { enabled: true })).into_response(), - Err(e) => ErrorResponse::from_error(e).into_response(), - } +pub(crate) async fn set(State(handler): State) -> Response { + handler + .manager + .set_recovery_mode() + .await + .map(|_| Json(RecoveryResponse { enabled: true })) + .map_err(ErrorHandler::new) + .into_response() } /// Unset the recovery mode. #[axum_macros::debug_handler] -pub(crate) async fn unset_recovery_mode(State(handler): State) -> Response { - match handler.manager.unset_recovery_mode().await { - Ok(_) => (StatusCode::OK, Json(RecoveryResponse { enabled: false })).into_response(), - Err(e) => ErrorResponse::from_error(e).into_response(), - } +pub(crate) async fn unset(State(handler): State) -> Response { + handler + .manager + .unset_recovery_mode() + .await + .map(|_| Json(RecoveryResponse { enabled: false })) + .map_err(ErrorHandler::new) + .into_response() } diff --git a/src/meta-srv/src/service/admin/sequencer.rs b/src/meta-srv/src/service/admin/sequencer.rs index fa0558672e..db15998b05 100644 --- a/src/meta-srv/src/service/admin/sequencer.rs +++ b/src/meta-srv/src/service/admin/sequencer.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - use axum::extract::{self, State}; use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; @@ -28,8 +26,6 @@ use crate::error::{ PeekSequenceSnafu, Result, RuntimeSwitchManagerSnafu, SetNextSequenceSnafu, UnexpectedSnafu, }; -pub type TableIdSequenceHandlerRef = Arc; - #[derive(Clone)] pub(crate) struct TableIdSequenceHandler { pub(crate) table_id_sequence: SequenceRef, @@ -77,7 +73,7 @@ pub(crate) struct ResetTableIdRequest { /// Set the next table id. #[axum_macros::debug_handler] pub(crate) async fn set_next_table_id( - State(handler): State, + State(handler): State, extract::Json(ResetTableIdRequest { next_table_id }): extract::Json, ) -> Response { match handler.set_next_table_id(next_table_id).await { @@ -88,9 +84,7 @@ pub(crate) async fn set_next_table_id( /// Get the next table id without incrementing the sequence. #[axum_macros::debug_handler] -pub(crate) async fn get_next_table_id( - State(handler): State, -) -> Response { +pub(crate) async fn get_next_table_id(State(handler): State) -> Response { match handler.peek_table_id().await { Ok(next_table_id) => { (StatusCode::OK, Json(NextTableIdResponse { next_table_id })).into_response() diff --git a/src/meta-srv/src/service/admin/util.rs b/src/meta-srv/src/service/admin/util.rs index c47145656d..532678b72d 100644 --- a/src/meta-srv/src/service/admin/util.rs +++ b/src/meta-srv/src/service/admin/util.rs @@ -16,7 +16,6 @@ use std::fmt::Debug; use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; -use axum::Json; use serde::Serialize; use snafu::ResultExt; use tonic::codegen::http; @@ -47,16 +46,6 @@ where .context(error::InvalidHttpBodySnafu) } -/// Converts any serializable type to an Axum JSON response with status 200. -pub fn to_axum_json_response(value: T) -> Response { - (StatusCode::OK, Json(value)).into_response() -} - -/// Returns a 404 response with an empty body. -pub fn to_axum_not_found_response() -> Response { - (StatusCode::NOT_FOUND, "").into_response() -} - /// Returns a 404 response with an empty body. pub fn to_not_found_response() -> Result> { http::Response::builder() @@ -64,3 +53,26 @@ pub fn to_not_found_response() -> Result> { .body("".to_string()) .context(error::InvalidHttpBodySnafu) } + +/// A wrapper for error handling in admin services. +pub(crate) struct ErrorHandler(E) +where + E: std::error::Error + Send + Sync + Sized; + +impl ErrorHandler +where + E: std::error::Error + Send + Sync + Sized, +{ + pub(crate) fn new(error: E) -> Self { + Self(error) + } +} + +impl IntoResponse for ErrorHandler +where + E: std::error::Error + Send + Sync + Sized, +{ + fn into_response(self) -> Response { + (StatusCode::INTERNAL_SERVER_ERROR, self.0.to_string()).into_response() + } +}