diff --git a/Cargo.lock b/Cargo.lock index 507bef7d5b..e223d17c4c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7156,6 +7156,9 @@ version = "0.15.4" dependencies = [ "api", "async-trait", + "axum 0.8.1", + "axum-extra", + "axum-macros", "bytes", "chrono", "clap 4.5.19", @@ -7188,6 +7191,7 @@ dependencies = [ "http-body-util", "humantime", "humantime-serde", + "hyper 0.14.30", "hyper-util", "itertools 0.14.0", "lazy_static", @@ -7215,6 +7219,7 @@ dependencies = [ "toml 0.8.19", "tonic 0.12.3", "tower 0.5.2", + "tower-http 0.6.2", "tracing", "tracing-subscriber", "typetag", diff --git a/Cargo.toml b/Cargo.toml index 5f93c30b35..a5e3d141be 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -220,6 +220,8 @@ tokio-util = { version = "0.7", features = ["io-util", "compat"] } toml = "0.8.8" tonic = { version = "0.12", features = ["tls", "gzip", "zstd"] } tower = "0.5" +tower-http = "0.6" +tracing = "0.1" tracing-appender = "0.2" tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "fmt"] } typetag = "0.2" diff --git a/src/cmd/src/metasrv.rs b/src/cmd/src/metasrv.rs index 4e26a71cde..ea2262e243 100644 --- a/src/cmd/src/metasrv.rs +++ b/src/cmd/src/metasrv.rs @@ -341,7 +341,7 @@ impl StartCommand { .context(error::BuildMetaServerSnafu)?; let metasrv = builder.build().await.context(error::BuildMetaServerSnafu)?; - let instance = MetasrvInstance::new(opts, plugins, metasrv) + let instance = MetasrvInstance::new(metasrv) .await .context(error::BuildMetaServerSnafu)?; diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index c2b042059e..955b47f32b 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -20,6 +20,9 @@ local-ip-address.workspace = true [dependencies] api.workspace = true async-trait.workspace = true +axum.workspace = true +axum-extra.workspace = true +axum-macros.workspace = true bytes.workspace = true chrono.workspace = true clap.workspace = true @@ -76,6 +79,7 @@ tokio-stream = { workspace = true, features = ["net"] } toml.workspace = true tonic.workspace = true tower.workspace = true +tower-http.workspace = true typetag.workspace = true url = "2.3" uuid.workspace = true @@ -86,6 +90,7 @@ client = { workspace = true, features = ["testing"] } common-meta = { workspace = true, features = ["testing"] } common-procedure-test.workspace = true common-wal = { workspace = true, features = ["testing"] } +hyper = "0.14" session.workspace = true tracing = "0.1" tracing-subscriber.workspace = true diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index c9affc41ad..0a042cd484 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -71,6 +71,7 @@ use crate::selector::round_robin::RoundRobinSelector; use crate::selector::weight_compute::RegionNumsBasedWeightCompute; use crate::selector::SelectorType; use crate::service::admin; +use crate::service::admin::admin_axum_router; use crate::{error, Result}; pub struct MetasrvInstance { @@ -94,17 +95,20 @@ pub struct MetasrvInstance { } impl MetasrvInstance { - pub async fn new( - opts: MetasrvOptions, - plugins: Plugins, - metasrv: Metasrv, - ) -> Result { + pub async fn new(metasrv: Metasrv) -> Result { + let opts = metasrv.options().clone(); + let plugins = metasrv.plugins().clone(); + let metasrv = Arc::new(metasrv); + + // Wire up the admin_axum_router as an extra router + let extra_routers = admin_axum_router(metasrv.clone()); + let http_server = HttpServerBuilder::new(opts.http.clone()) .with_metrics_handler(MetricsHandler) .with_greptime_config_options(opts.to_toml().context(error::TomlFormatSnafu)?) + .with_extra_router(extra_routers) .build(); - let metasrv = Arc::new(metasrv); // put metasrv into plugins for later use plugins.insert::>(metasrv.clone()); let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins)) @@ -132,6 +136,7 @@ impl MetasrvInstance { self.signal_sender = Some(tx); + // Start gRPC server with admin services for backward compatibility let mut router = router(self.metasrv.clone()); if let Some(configurator) = self.metasrv.plugins().get::() { router = configurator.config_grpc(router); diff --git a/src/meta-srv/src/service/admin.rs b/src/meta-srv/src/service/admin.rs index 23a6034a4d..23fc7f89c0 100644 --- a/src/meta-srv/src/service/admin.rs +++ b/src/meta-srv/src/service/admin.rs @@ -12,12 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod health; -mod heartbeat; -mod leader; -mod maintenance; -mod node_lease; -mod procedure; +pub(crate) mod health; +pub(crate) mod heartbeat; +pub(crate) mod leader; +pub(crate) mod maintenance; +pub(crate) mod node_lease; +pub(crate) mod procedure; mod util; use std::collections::HashMap; @@ -25,6 +25,9 @@ use std::convert::Infallible; use std::sync::Arc; use std::task::{Context, Poll}; +use axum::http::StatusCode; +use axum::response::IntoResponse; +use axum::{routing, Router as AxumRouter}; use bytes::Bytes; use http_body_util::{BodyExt, Full}; use tonic::body::BoxBody; @@ -32,6 +35,13 @@ use tonic::codegen::{empty_body, http, BoxFuture, Service}; use tonic::server::NamedService; use crate::metasrv::Metasrv; +use crate::service::admin::health::HealthHandler; +use crate::service::admin::heartbeat::HeartBeatHandler; +use crate::service::admin::leader::LeaderHandler; +use crate::service::admin::maintenance::MaintenanceHandler; +use crate::service::admin::node_lease::NodeLeaseHandler; +use crate::service::admin::procedure::ProcedureManagerHandler; +use crate::service::admin::util::{to_axum_json_response, to_axum_not_found_response}; pub fn make_admin_service(metasrv: Arc) -> Admin { let router = Router::new().route("/health", health::HealthHandler); @@ -223,6 +233,241 @@ fn boxed(body: String) -> BoxBody { .boxed_unsync() } +/// Expose admin HTTP endpoints as an Axum router for the main HTTP server. +pub fn admin_axum_router(metasrv: Arc) -> AxumRouter { + let node_lease_handler = Arc::new(NodeLeaseHandler { + meta_peer_client: metasrv.meta_peer_client().clone(), + }); + let heartbeat_handler = Arc::new(HeartBeatHandler { + meta_peer_client: metasrv.meta_peer_client().clone(), + }); + let leader_handler = Arc::new(LeaderHandler { + election: metasrv.election().cloned(), + }); + let maintenance_handler = Arc::new(MaintenanceHandler { + manager: metasrv.runtime_switch_manager().clone(), + }); + let procedure_handler = Arc::new(ProcedureManagerHandler { + manager: metasrv.runtime_switch_manager().clone(), + }); + + let health_router = AxumRouter::new().route( + "/", + routing::get({ + move || { + let handler = HealthHandler; + async move { + match handler + .handle("/health", http::Method::GET, &Default::default()) + .await + { + Ok(status) => status.body().clone().into_response(), + Err(e) => { + common_telemetry::error!(e; "Health handler failed"); + StatusCode::INTERNAL_SERVER_ERROR.into_response() + } + } + } + } + }), + ); + + let node_lease_router = AxumRouter::new().route( + "/", + routing::get({ + let handler = node_lease_handler.clone(); + move || async move { + match handler + .handle("/node-lease", http::Method::GET, &Default::default()) + .await + { + Ok(resp) => resp.body().clone().into_response(), + Err(e) => { + common_telemetry::error!(e; "Node lease handler failed"); + StatusCode::INTERNAL_SERVER_ERROR.into_response() + } + } + } + }), + ); + + let leader_router = AxumRouter::new().route( + "/", + routing::get({ + let handler = leader_handler.clone(); + move || async move { + match handler + .handle("/leader", http::Method::GET, &Default::default()) + .await + { + Ok(resp) => resp.body().clone().into_response(), + Err(e) => { + common_telemetry::error!(e; "Leader handler failed"); + StatusCode::INTERNAL_SERVER_ERROR.into_response() + } + } + } + }), + ); + + let heartbeat_router = AxumRouter::new() + .route( + "/", + routing::get({ + let handler = heartbeat_handler.clone(); + move || async move { + match handler + .handle("/heartbeat", http::Method::GET, &Default::default()) + .await + { + Ok(resp) => resp.body().clone().into_response(), + Err(e) => { + common_telemetry::error!(e; "Heartbeat handler failed"); + StatusCode::INTERNAL_SERVER_ERROR.into_response() + } + } + } + }), + ) + .route( + "/help", + routing::get({ + let handler = heartbeat_handler.clone(); + move || async move { + match handler + .handle("/heartbeat/help", http::Method::GET, &Default::default()) + .await + { + Ok(resp) => resp.body().clone().into_response(), + Err(e) => { + common_telemetry::error!(e; "Heartbeat help handler failed"); + StatusCode::INTERNAL_SERVER_ERROR.into_response() + } + } + } + }), + ); + + let maintenance_router = AxumRouter::new() + .route( + "/", + routing::get({ + let handler = maintenance_handler.clone(); + move || async move { + match handler.get_maintenance().await { + Ok(resp) => to_axum_json_response(resp), + Err(e) => { + common_telemetry::error!(e; "Maintenance handler failed"); + to_axum_not_found_response() + } + } + } + }), + ) + .route( + "/status", + routing::get({ + let handler = maintenance_handler.clone(); + move || async move { + match handler.get_maintenance().await { + Ok(resp) => to_axum_json_response(resp), + Err(e) => { + common_telemetry::error!(e; "Maintenance status handler failed"); + to_axum_not_found_response() + } + } + } + }), + ) + .route( + "/enable", + routing::post({ + let handler = maintenance_handler.clone(); + move || async move { + match handler.set_maintenance().await { + Ok(resp) => to_axum_json_response(resp), + Err(e) => { + common_telemetry::error!(e; "Maintenance enable handler failed"); + to_axum_not_found_response() + } + } + } + }), + ) + .route( + "/disable", + routing::post({ + let handler = maintenance_handler.clone(); + move || async move { + match handler.unset_maintenance().await { + Ok(resp) => to_axum_json_response(resp), + Err(e) => { + common_telemetry::error!(e; "Maintenance disable handler failed"); + to_axum_not_found_response() + } + } + } + }), + ); + + let procedure_router = AxumRouter::new() + .route( + "/status", + routing::get({ + let handler = procedure_handler.clone(); + move || async move { + match handler.get_procedure_manager_status().await { + Ok(resp) => to_axum_json_response(resp), + Err(e) => { + common_telemetry::error!(e; "Procedure manager status handler failed"); + to_axum_not_found_response() + } + } + } + }), + ) + .route( + "/pause", + routing::post({ + let handler = procedure_handler.clone(); + move || async move { + match handler.pause_procedure_manager().await { + Ok(resp) => to_axum_json_response(resp), + Err(e) => { + common_telemetry::error!(e; "Procedure manager pause handler failed"); + to_axum_not_found_response() + } + } + } + }), + ) + .route( + "/resume", + routing::post({ + let handler = procedure_handler.clone(); + move || async move { + match handler.resume_procedure_manager().await { + Ok(resp) => to_axum_json_response(resp), + Err(e) => { + common_telemetry::error!(e; "Procedure manager resume handler failed"); + to_axum_not_found_response() + } + } + } + }), + ); + + let admin_router = AxumRouter::new() + .nest("/health", health_router) + .nest("/node-lease", node_lease_router) + .nest("/leader", leader_router) + .nest("/heartbeat", heartbeat_router) + .nest("/maintenance", maintenance_router) + .nest("/procedure-manager", procedure_router); + + AxumRouter::new().nest("/admin", admin_router) +} + #[cfg(test)] mod tests { use common_meta::kv_backend::memory::MemoryKvBackend; @@ -558,3 +803,230 @@ mod tests { assert!(response.contains("404 Not Found")); } } + +#[cfg(test)] +mod axum_admin_tests { + use std::sync::Arc; + + use axum::body::{to_bytes, Body}; + use axum::http::{Method, Request, StatusCode}; + use common_meta::kv_backend::memory::MemoryKvBackend; + use tower::ServiceExt; // for `oneshot` + + use super::*; + use crate::metasrv::builder::MetasrvBuilder; + use crate::metasrv::MetasrvOptions; + + async fn setup_axum_app() -> AxumRouter { + let kv_backend = Arc::new(MemoryKvBackend::new()); + let metasrv = MetasrvBuilder::new() + .options(MetasrvOptions::default()) + .kv_backend(kv_backend) + .build() + .await + .unwrap(); + let metasrv = Arc::new(metasrv); + admin_axum_router(metasrv) + } + + async fn get_body_string(resp: axum::response::Response) -> String { + let body_bytes = to_bytes(resp.into_body(), usize::MAX).await.unwrap(); + String::from_utf8_lossy(&body_bytes).to_string() + } + + #[tokio::test] + async fn test_admin_health() { + let app = setup_axum_app().await; + let response = app + .oneshot( + Request::builder() + .uri("/admin/health") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + let body = get_body_string(response).await; + assert!(body.to_lowercase().contains("ok")); + } + + #[tokio::test] + async fn test_admin_node_lease() { + let app = setup_axum_app().await; + let response = app + .oneshot( + Request::builder() + .uri("/admin/node-lease") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + } + + #[tokio::test] + async fn test_admin_heartbeat() { + let app = setup_axum_app().await; + let response = app + .oneshot( + Request::builder() + .uri("/admin/heartbeat") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + } + + #[tokio::test] + async fn test_admin_heartbeat_help() { + let app = setup_axum_app().await; + let response = app + .oneshot( + Request::builder() + .uri("/admin/heartbeat/help") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + } + + #[tokio::test] + async fn test_admin_leader() { + let app = setup_axum_app().await; + let response = app + .oneshot( + Request::builder() + .uri("/admin/leader") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + } + + #[tokio::test] + async fn test_admin_maintenance() { + let app = setup_axum_app().await; + let response = app + .oneshot( + Request::builder() + .uri("/admin/maintenance") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + let body = get_body_string(response).await; + assert!(body.contains("enabled")); + } + + #[tokio::test] + async fn test_admin_maintenance_status() { + let app = setup_axum_app().await; + let response = app + .oneshot( + Request::builder() + .uri("/admin/maintenance/status") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + let body = get_body_string(response).await; + assert!(body.contains("enabled")); + } + + #[tokio::test] + async fn test_admin_maintenance_enable_disable() { + // Enable maintenance + let response = setup_axum_app() + .await + .oneshot( + Request::builder() + .method(Method::POST) + .uri("/admin/maintenance/enable") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + let body = get_body_string(response).await; + assert!(body.contains("enabled")); + // Disable maintenance + let response = setup_axum_app() + .await + .oneshot( + Request::builder() + .method(Method::POST) + .uri("/admin/maintenance/disable") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + let body = get_body_string(response).await; + assert!(body.contains("enabled")); + } + + #[tokio::test] + async fn test_admin_procedure_manager_status() { + let app = setup_axum_app().await; + let response = app + .oneshot( + Request::builder() + .uri("/admin/procedure-manager/status") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + let body = get_body_string(response).await; + assert!(body.contains("status")); + } + + #[tokio::test] + async fn test_admin_procedure_manager_pause_resume() { + // Pause + let response = setup_axum_app() + .await + .oneshot( + Request::builder() + .method(Method::POST) + .uri("/admin/procedure-manager/pause") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + let body = get_body_string(response).await; + assert!(body.contains("paused")); + // Resume + let response = setup_axum_app() + .await + .oneshot( + Request::builder() + .method(Method::POST) + .uri("/admin/procedure-manager/resume") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + let body = get_body_string(response).await; + assert!(body.contains("running")); + } +} diff --git a/src/meta-srv/src/service/admin/health.rs b/src/meta-srv/src/service/admin/health.rs index 76f79fc8ad..1394c389c8 100644 --- a/src/meta-srv/src/service/admin/health.rs +++ b/src/meta-srv/src/service/admin/health.rs @@ -21,6 +21,7 @@ use crate::service::admin::HttpHandler; const HTTP_OK: &str = "OK\n"; +#[derive(Clone)] pub struct HealthHandler; #[async_trait::async_trait] diff --git a/src/meta-srv/src/service/admin/leader.rs b/src/meta-srv/src/service/admin/leader.rs index 207176e6af..aa952083c6 100644 --- a/src/meta-srv/src/service/admin/leader.rs +++ b/src/meta-srv/src/service/admin/leader.rs @@ -21,6 +21,7 @@ use crate::error::{self, Result}; use crate::metasrv::ElectionRef; use crate::service::admin::HttpHandler; +#[derive(Clone)] pub struct LeaderHandler { pub election: Option, } diff --git a/src/meta-srv/src/service/admin/maintenance.rs b/src/meta-srv/src/service/admin/maintenance.rs index ed753c3bbe..e324a02f2b 100644 --- a/src/meta-srv/src/service/admin/maintenance.rs +++ b/src/meta-srv/src/service/admin/maintenance.rs @@ -34,7 +34,7 @@ pub struct MaintenanceHandler { } #[derive(Debug, Serialize, Deserialize)] -struct MaintenanceResponse { +pub(crate) struct MaintenanceResponse { enabled: bool, } @@ -49,7 +49,7 @@ impl TryFrom for String { } impl MaintenanceHandler { - async fn get_maintenance(&self) -> crate::Result { + pub(crate) async fn get_maintenance(&self) -> crate::Result { let enabled = self .manager .maintenance_mode() @@ -58,7 +58,7 @@ impl MaintenanceHandler { Ok(MaintenanceResponse { enabled }) } - async fn set_maintenance(&self) -> crate::Result { + pub(crate) async fn set_maintenance(&self) -> crate::Result { self.manager .set_maintenance_mode() .await @@ -68,7 +68,7 @@ impl MaintenanceHandler { Ok(MaintenanceResponse { enabled: true }) } - async fn unset_maintenance(&self) -> crate::Result { + pub(crate) async fn unset_maintenance(&self) -> crate::Result { self.manager .unset_maintenance_mode() .await diff --git a/src/meta-srv/src/service/admin/node_lease.rs b/src/meta-srv/src/service/admin/node_lease.rs index 26089450d2..bce3591054 100644 --- a/src/meta-srv/src/service/admin/node_lease.rs +++ b/src/meta-srv/src/service/admin/node_lease.rs @@ -24,6 +24,7 @@ use crate::key::{DatanodeLeaseKey, LeaseValue}; use crate::lease; use crate::service::admin::HttpHandler; +#[derive(Clone)] pub struct NodeLeaseHandler { pub meta_peer_client: MetaPeerClientRef, } diff --git a/src/meta-srv/src/service/admin/procedure.rs b/src/meta-srv/src/service/admin/procedure.rs index 232a93acbb..a0a6ee87e9 100644 --- a/src/meta-srv/src/service/admin/procedure.rs +++ b/src/meta-srv/src/service/admin/procedure.rs @@ -31,7 +31,7 @@ pub struct ProcedureManagerHandler { } #[derive(Debug, Serialize, Deserialize)] -struct ProcedureManagerStatusResponse { +pub(crate) struct ProcedureManagerStatusResponse { status: ProcedureManagerStatus, } @@ -43,7 +43,9 @@ enum ProcedureManagerStatus { } impl ProcedureManagerHandler { - async fn pause_procedure_manager(&self) -> crate::Result { + pub(crate) async fn pause_procedure_manager( + &self, + ) -> crate::Result { self.manager .pasue_procedure() .await @@ -55,7 +57,9 @@ impl ProcedureManagerHandler { }) } - async fn resume_procedure_manager(&self) -> crate::Result { + pub(crate) async fn resume_procedure_manager( + &self, + ) -> crate::Result { self.manager .resume_procedure() .await @@ -67,7 +71,9 @@ impl ProcedureManagerHandler { }) } - async fn get_procedure_manager_status(&self) -> crate::Result { + pub(crate) async fn get_procedure_manager_status( + &self, + ) -> crate::Result { let is_paused = self .manager .is_procedure_paused() diff --git a/src/meta-srv/src/service/admin/util.rs b/src/meta-srv/src/service/admin/util.rs index 13a3b5f89f..c47145656d 100644 --- a/src/meta-srv/src/service/admin/util.rs +++ b/src/meta-srv/src/service/admin/util.rs @@ -14,6 +14,9 @@ use std::fmt::Debug; +use axum::http::StatusCode; +use axum::response::{IntoResponse, Response}; +use axum::Json; use serde::Serialize; use snafu::ResultExt; use tonic::codegen::http; @@ -44,6 +47,16 @@ where .context(error::InvalidHttpBodySnafu) } +/// Converts any serializable type to an Axum JSON response with status 200. +pub fn to_axum_json_response(value: T) -> Response { + (StatusCode::OK, Json(value)).into_response() +} + +/// Returns a 404 response with an empty body. +pub fn to_axum_not_found_response() -> Response { + (StatusCode::NOT_FOUND, "").into_response() +} + /// Returns a 404 response with an empty body. pub fn to_not_found_response() -> Result> { http::Response::builder()