feat: move metasrv admin to http server while keep tonic for backward compatibility (#6466)

* feat: move metasrv admin to http server while keep tonic for backward compatibility

Signed-off-by: lyang24 <lanqingy93@gmail.com>

* refactor with nest method

Signed-off-by: lyang24 <lanqingy93@gmail.com>

---------

Signed-off-by: lyang24 <lanqingy93@gmail.com>
Co-authored-by: lyang24 <lanqingy@usc.edu>
This commit is contained in:
Lanqing Yang
2025-07-24 02:11:04 -07:00
committed by GitHub
parent 5908febd6c
commit b01be5efc5
11 changed files with 527 additions and 16 deletions

5
Cargo.lock generated
View File

@@ -7221,6 +7221,9 @@ version = "0.16.0"
dependencies = [
"api",
"async-trait",
"axum 0.8.1",
"axum-extra",
"axum-macros",
"bytes",
"chrono",
"clap 4.5.19",
@@ -7254,6 +7257,7 @@ dependencies = [
"http-body-util",
"humantime",
"humantime-serde",
"hyper 0.14.30",
"hyper-util",
"itertools 0.14.0",
"lazy_static",
@@ -7281,6 +7285,7 @@ dependencies = [
"toml 0.8.19",
"tonic 0.12.3",
"tower 0.5.2",
"tower-http 0.6.2",
"tracing",
"tracing-subscriber",
"typetag",

View File

@@ -225,6 +225,7 @@ tokio-util = { version = "0.7", features = ["io-util", "compat"] }
toml = "0.8.8"
tonic = { version = "0.12", features = ["tls", "gzip", "zstd"] }
tower = "0.5"
tower-http = "0.6"
tracing = "0.1"
tracing-appender = "0.2"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "fmt"] }

View File

@@ -20,6 +20,9 @@ local-ip-address.workspace = true
[dependencies]
api.workspace = true
async-trait.workspace = true
axum.workspace = true
axum-extra.workspace = true
axum-macros.workspace = true
bytes.workspace = true
chrono.workspace = true
clap.workspace = true
@@ -77,6 +80,7 @@ tokio-stream = { workspace = true, features = ["net"] }
toml.workspace = true
tonic.workspace = true
tower.workspace = true
tower-http.workspace = true
typetag.workspace = true
url = "2.3"
uuid.workspace = true
@@ -87,6 +91,7 @@ client = { workspace = true, features = ["testing"] }
common-meta = { workspace = true, features = ["testing"] }
common-procedure-test.workspace = true
common-wal = { workspace = true, features = ["testing"] }
hyper = "0.14"
session.workspace = true
tracing = "0.1"
tracing-subscriber.workspace = true

View File

@@ -72,6 +72,7 @@ use crate::selector::round_robin::RoundRobinSelector;
use crate::selector::weight_compute::RegionNumsBasedWeightCompute;
use crate::selector::SelectorType;
use crate::service::admin;
use crate::service::admin::admin_axum_router;
use crate::{error, Result};
pub struct MetasrvInstance {
@@ -98,12 +99,16 @@ impl MetasrvInstance {
pub async fn new(metasrv: Metasrv) -> Result<MetasrvInstance> {
let opts = metasrv.options().clone();
let plugins = metasrv.plugins().clone();
let metasrv = Arc::new(metasrv);
let builder = HttpServerBuilder::new(opts.http.clone())
// Wire up the admin_axum_router as an extra router
let extra_routers = admin_axum_router(metasrv.clone());
let mut builder = HttpServerBuilder::new(opts.http.clone())
.with_metrics_handler(MetricsHandler)
.with_greptime_config_options(opts.to_toml().context(error::TomlFormatSnafu)?);
builder = builder.with_extra_router(extra_routers);
let metasrv = Arc::new(metasrv);
// put metasrv into plugins for later use
plugins.insert::<Arc<Metasrv>>(metasrv.clone());
let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
@@ -150,6 +155,7 @@ impl MetasrvInstance {
self.signal_sender = Some(tx);
// Start gRPC server with admin services for backward compatibility
let mut router = router(self.metasrv.clone());
if let Some(configurator) = self.metasrv.plugins().get::<ConfiguratorRef>() {
router = configurator.config_grpc(router);

View File

@@ -12,12 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod health;
mod heartbeat;
mod leader;
mod maintenance;
mod node_lease;
mod procedure;
pub(crate) mod health;
pub(crate) mod heartbeat;
pub(crate) mod leader;
pub(crate) mod maintenance;
pub(crate) mod node_lease;
pub(crate) mod procedure;
mod util;
use std::collections::HashMap;
@@ -25,6 +25,9 @@ use std::convert::Infallible;
use std::sync::Arc;
use std::task::{Context, Poll};
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::{routing, Router as AxumRouter};
use bytes::Bytes;
use http_body_util::{BodyExt, Full};
use tonic::body::BoxBody;
@@ -32,6 +35,13 @@ use tonic::codegen::{empty_body, http, BoxFuture, Service};
use tonic::server::NamedService;
use crate::metasrv::Metasrv;
use crate::service::admin::health::HealthHandler;
use crate::service::admin::heartbeat::HeartBeatHandler;
use crate::service::admin::leader::LeaderHandler;
use crate::service::admin::maintenance::MaintenanceHandler;
use crate::service::admin::node_lease::NodeLeaseHandler;
use crate::service::admin::procedure::ProcedureManagerHandler;
use crate::service::admin::util::{to_axum_json_response, to_axum_not_found_response};
pub fn make_admin_service(metasrv: Arc<Metasrv>) -> Admin {
let router = Router::new().route("/health", health::HealthHandler);
@@ -223,6 +233,241 @@ fn boxed(body: String) -> BoxBody {
.boxed_unsync()
}
/// Expose admin HTTP endpoints as an Axum router for the main HTTP server.
pub fn admin_axum_router(metasrv: Arc<Metasrv>) -> AxumRouter {
let node_lease_handler = Arc::new(NodeLeaseHandler {
meta_peer_client: metasrv.meta_peer_client().clone(),
});
let heartbeat_handler = Arc::new(HeartBeatHandler {
meta_peer_client: metasrv.meta_peer_client().clone(),
});
let leader_handler = Arc::new(LeaderHandler {
election: metasrv.election().cloned(),
});
let maintenance_handler = Arc::new(MaintenanceHandler {
manager: metasrv.runtime_switch_manager().clone(),
});
let procedure_handler = Arc::new(ProcedureManagerHandler {
manager: metasrv.runtime_switch_manager().clone(),
});
let health_router = AxumRouter::new().route(
"/",
routing::get({
move || {
let handler = HealthHandler;
async move {
match handler
.handle("/health", http::Method::GET, &Default::default())
.await
{
Ok(status) => status.body().clone().into_response(),
Err(e) => {
common_telemetry::error!(e; "Health handler failed");
StatusCode::INTERNAL_SERVER_ERROR.into_response()
}
}
}
}
}),
);
let node_lease_router = AxumRouter::new().route(
"/",
routing::get({
let handler = node_lease_handler.clone();
move || async move {
match handler
.handle("/node-lease", http::Method::GET, &Default::default())
.await
{
Ok(resp) => resp.body().clone().into_response(),
Err(e) => {
common_telemetry::error!(e; "Node lease handler failed");
StatusCode::INTERNAL_SERVER_ERROR.into_response()
}
}
}
}),
);
let leader_router = AxumRouter::new().route(
"/",
routing::get({
let handler = leader_handler.clone();
move || async move {
match handler
.handle("/leader", http::Method::GET, &Default::default())
.await
{
Ok(resp) => resp.body().clone().into_response(),
Err(e) => {
common_telemetry::error!(e; "Leader handler failed");
StatusCode::INTERNAL_SERVER_ERROR.into_response()
}
}
}
}),
);
let heartbeat_router = AxumRouter::new()
.route(
"/",
routing::get({
let handler = heartbeat_handler.clone();
move || async move {
match handler
.handle("/heartbeat", http::Method::GET, &Default::default())
.await
{
Ok(resp) => resp.body().clone().into_response(),
Err(e) => {
common_telemetry::error!(e; "Heartbeat handler failed");
StatusCode::INTERNAL_SERVER_ERROR.into_response()
}
}
}
}),
)
.route(
"/help",
routing::get({
let handler = heartbeat_handler.clone();
move || async move {
match handler
.handle("/heartbeat/help", http::Method::GET, &Default::default())
.await
{
Ok(resp) => resp.body().clone().into_response(),
Err(e) => {
common_telemetry::error!(e; "Heartbeat help handler failed");
StatusCode::INTERNAL_SERVER_ERROR.into_response()
}
}
}
}),
);
let maintenance_router = AxumRouter::new()
.route(
"/",
routing::get({
let handler = maintenance_handler.clone();
move || async move {
match handler.get_maintenance().await {
Ok(resp) => to_axum_json_response(resp),
Err(e) => {
common_telemetry::error!(e; "Maintenance handler failed");
to_axum_not_found_response()
}
}
}
}),
)
.route(
"/status",
routing::get({
let handler = maintenance_handler.clone();
move || async move {
match handler.get_maintenance().await {
Ok(resp) => to_axum_json_response(resp),
Err(e) => {
common_telemetry::error!(e; "Maintenance status handler failed");
to_axum_not_found_response()
}
}
}
}),
)
.route(
"/enable",
routing::post({
let handler = maintenance_handler.clone();
move || async move {
match handler.set_maintenance().await {
Ok(resp) => to_axum_json_response(resp),
Err(e) => {
common_telemetry::error!(e; "Maintenance enable handler failed");
to_axum_not_found_response()
}
}
}
}),
)
.route(
"/disable",
routing::post({
let handler = maintenance_handler.clone();
move || async move {
match handler.unset_maintenance().await {
Ok(resp) => to_axum_json_response(resp),
Err(e) => {
common_telemetry::error!(e; "Maintenance disable handler failed");
to_axum_not_found_response()
}
}
}
}),
);
let procedure_router = AxumRouter::new()
.route(
"/status",
routing::get({
let handler = procedure_handler.clone();
move || async move {
match handler.get_procedure_manager_status().await {
Ok(resp) => to_axum_json_response(resp),
Err(e) => {
common_telemetry::error!(e; "Procedure manager status handler failed");
to_axum_not_found_response()
}
}
}
}),
)
.route(
"/pause",
routing::post({
let handler = procedure_handler.clone();
move || async move {
match handler.pause_procedure_manager().await {
Ok(resp) => to_axum_json_response(resp),
Err(e) => {
common_telemetry::error!(e; "Procedure manager pause handler failed");
to_axum_not_found_response()
}
}
}
}),
)
.route(
"/resume",
routing::post({
let handler = procedure_handler.clone();
move || async move {
match handler.resume_procedure_manager().await {
Ok(resp) => to_axum_json_response(resp),
Err(e) => {
common_telemetry::error!(e; "Procedure manager resume handler failed");
to_axum_not_found_response()
}
}
}
}),
);
let admin_router = AxumRouter::new()
.nest("/health", health_router)
.nest("/node-lease", node_lease_router)
.nest("/leader", leader_router)
.nest("/heartbeat", heartbeat_router)
.nest("/maintenance", maintenance_router)
.nest("/procedure-manager", procedure_router);
AxumRouter::new().nest("/admin", admin_router)
}
#[cfg(test)]
mod tests {
use common_meta::kv_backend::memory::MemoryKvBackend;
@@ -558,3 +803,230 @@ mod tests {
assert!(response.contains("404 Not Found"));
}
}
#[cfg(test)]
mod axum_admin_tests {
use std::sync::Arc;
use axum::body::{to_bytes, Body};
use axum::http::{Method, Request, StatusCode};
use common_meta::kv_backend::memory::MemoryKvBackend;
use tower::ServiceExt; // for `oneshot`
use super::*;
use crate::metasrv::builder::MetasrvBuilder;
use crate::metasrv::MetasrvOptions;
async fn setup_axum_app() -> AxumRouter {
let kv_backend = Arc::new(MemoryKvBackend::new());
let metasrv = MetasrvBuilder::new()
.options(MetasrvOptions::default())
.kv_backend(kv_backend)
.build()
.await
.unwrap();
let metasrv = Arc::new(metasrv);
admin_axum_router(metasrv)
}
async fn get_body_string(resp: axum::response::Response) -> String {
let body_bytes = to_bytes(resp.into_body(), usize::MAX).await.unwrap();
String::from_utf8_lossy(&body_bytes).to_string()
}
#[tokio::test]
async fn test_admin_health() {
let app = setup_axum_app().await;
let response = app
.oneshot(
Request::builder()
.uri("/admin/health")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = get_body_string(response).await;
assert!(body.to_lowercase().contains("ok"));
}
#[tokio::test]
async fn test_admin_node_lease() {
let app = setup_axum_app().await;
let response = app
.oneshot(
Request::builder()
.uri("/admin/node-lease")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
}
#[tokio::test]
async fn test_admin_heartbeat() {
let app = setup_axum_app().await;
let response = app
.oneshot(
Request::builder()
.uri("/admin/heartbeat")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
}
#[tokio::test]
async fn test_admin_heartbeat_help() {
let app = setup_axum_app().await;
let response = app
.oneshot(
Request::builder()
.uri("/admin/heartbeat/help")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
}
#[tokio::test]
async fn test_admin_leader() {
let app = setup_axum_app().await;
let response = app
.oneshot(
Request::builder()
.uri("/admin/leader")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
}
#[tokio::test]
async fn test_admin_maintenance() {
let app = setup_axum_app().await;
let response = app
.oneshot(
Request::builder()
.uri("/admin/maintenance")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = get_body_string(response).await;
assert!(body.contains("enabled"));
}
#[tokio::test]
async fn test_admin_maintenance_status() {
let app = setup_axum_app().await;
let response = app
.oneshot(
Request::builder()
.uri("/admin/maintenance/status")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = get_body_string(response).await;
assert!(body.contains("enabled"));
}
#[tokio::test]
async fn test_admin_maintenance_enable_disable() {
// Enable maintenance
let response = setup_axum_app()
.await
.oneshot(
Request::builder()
.method(Method::POST)
.uri("/admin/maintenance/enable")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = get_body_string(response).await;
assert!(body.contains("enabled"));
// Disable maintenance
let response = setup_axum_app()
.await
.oneshot(
Request::builder()
.method(Method::POST)
.uri("/admin/maintenance/disable")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = get_body_string(response).await;
assert!(body.contains("enabled"));
}
#[tokio::test]
async fn test_admin_procedure_manager_status() {
let app = setup_axum_app().await;
let response = app
.oneshot(
Request::builder()
.uri("/admin/procedure-manager/status")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = get_body_string(response).await;
assert!(body.contains("status"));
}
#[tokio::test]
async fn test_admin_procedure_manager_pause_resume() {
// Pause
let response = setup_axum_app()
.await
.oneshot(
Request::builder()
.method(Method::POST)
.uri("/admin/procedure-manager/pause")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = get_body_string(response).await;
assert!(body.contains("paused"));
// Resume
let response = setup_axum_app()
.await
.oneshot(
Request::builder()
.method(Method::POST)
.uri("/admin/procedure-manager/resume")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = get_body_string(response).await;
assert!(body.contains("running"));
}
}

View File

@@ -21,6 +21,7 @@ use crate::service::admin::HttpHandler;
const HTTP_OK: &str = "OK\n";
#[derive(Clone)]
pub struct HealthHandler;
#[async_trait::async_trait]

View File

@@ -21,6 +21,7 @@ use crate::error::{self, Result};
use crate::metasrv::ElectionRef;
use crate::service::admin::HttpHandler;
#[derive(Clone)]
pub struct LeaderHandler {
pub election: Option<ElectionRef>,
}

View File

@@ -34,7 +34,7 @@ pub struct MaintenanceHandler {
}
#[derive(Debug, Serialize, Deserialize)]
struct MaintenanceResponse {
pub(crate) struct MaintenanceResponse {
enabled: bool,
}
@@ -49,7 +49,7 @@ impl TryFrom<MaintenanceResponse> for String {
}
impl MaintenanceHandler {
async fn get_maintenance(&self) -> crate::Result<MaintenanceResponse> {
pub(crate) async fn get_maintenance(&self) -> crate::Result<MaintenanceResponse> {
let enabled = self
.manager
.maintenance_mode()
@@ -58,7 +58,7 @@ impl MaintenanceHandler {
Ok(MaintenanceResponse { enabled })
}
async fn set_maintenance(&self) -> crate::Result<MaintenanceResponse> {
pub(crate) async fn set_maintenance(&self) -> crate::Result<MaintenanceResponse> {
self.manager
.set_maintenance_mode()
.await
@@ -68,7 +68,7 @@ impl MaintenanceHandler {
Ok(MaintenanceResponse { enabled: true })
}
async fn unset_maintenance(&self) -> crate::Result<MaintenanceResponse> {
pub(crate) async fn unset_maintenance(&self) -> crate::Result<MaintenanceResponse> {
self.manager
.unset_maintenance_mode()
.await

View File

@@ -24,6 +24,7 @@ use crate::key::{DatanodeLeaseKey, LeaseValue};
use crate::lease;
use crate::service::admin::HttpHandler;
#[derive(Clone)]
pub struct NodeLeaseHandler {
pub meta_peer_client: MetaPeerClientRef,
}

View File

@@ -31,7 +31,7 @@ pub struct ProcedureManagerHandler {
}
#[derive(Debug, Serialize, Deserialize)]
struct ProcedureManagerStatusResponse {
pub(crate) struct ProcedureManagerStatusResponse {
status: ProcedureManagerStatus,
}
@@ -43,7 +43,9 @@ enum ProcedureManagerStatus {
}
impl ProcedureManagerHandler {
async fn pause_procedure_manager(&self) -> crate::Result<ProcedureManagerStatusResponse> {
pub(crate) async fn pause_procedure_manager(
&self,
) -> crate::Result<ProcedureManagerStatusResponse> {
self.manager
.pasue_procedure()
.await
@@ -55,7 +57,9 @@ impl ProcedureManagerHandler {
})
}
async fn resume_procedure_manager(&self) -> crate::Result<ProcedureManagerStatusResponse> {
pub(crate) async fn resume_procedure_manager(
&self,
) -> crate::Result<ProcedureManagerStatusResponse> {
self.manager
.resume_procedure()
.await
@@ -67,7 +71,9 @@ impl ProcedureManagerHandler {
})
}
async fn get_procedure_manager_status(&self) -> crate::Result<ProcedureManagerStatusResponse> {
pub(crate) async fn get_procedure_manager_status(
&self,
) -> crate::Result<ProcedureManagerStatusResponse> {
let is_paused = self
.manager
.is_procedure_paused()

View File

@@ -14,6 +14,9 @@
use std::fmt::Debug;
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use axum::Json;
use serde::Serialize;
use snafu::ResultExt;
use tonic::codegen::http;
@@ -44,6 +47,16 @@ where
.context(error::InvalidHttpBodySnafu)
}
/// Converts any serializable type to an Axum JSON response with status 200.
pub fn to_axum_json_response<T: Serialize>(value: T) -> Response {
(StatusCode::OK, Json(value)).into_response()
}
/// Returns a 404 response with an empty body.
pub fn to_axum_not_found_response() -> Response {
(StatusCode::NOT_FOUND, "").into_response()
}
/// Returns a 404 response with an empty body.
pub fn to_not_found_response() -> Result<http::Response<String>> {
http::Response::builder()