mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-27 18:30:38 +00:00
refactor(meta): refactor admin service to use modern axum handlers (#6833)
* refactor(meta): refactor admin service to use modern axum handlers Signed-off-by: WenyXu <wenymedia@gmail.com> * Update src/meta-srv/src/service/admin/health.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --------- Signed-off-by: WenyXu <wenymedia@gmail.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
@@ -27,26 +27,26 @@ use std::convert::Infallible;
|
||||
use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use axum::http::StatusCode;
|
||||
use axum::response::IntoResponse;
|
||||
use axum::{routing, Router as AxumRouter};
|
||||
use tonic::body::Body;
|
||||
use tonic::codegen::{http, BoxFuture, Service};
|
||||
use tonic::server::NamedService;
|
||||
|
||||
use crate::metasrv::Metasrv;
|
||||
use crate::service::admin::health::HealthHandler;
|
||||
use crate::service::admin::heartbeat::HeartBeatHandler;
|
||||
use crate::service::admin::leader::LeaderHandler;
|
||||
use crate::service::admin::maintenance::MaintenanceHandler;
|
||||
use crate::service::admin::node_lease::NodeLeaseHandler;
|
||||
use crate::service::admin::procedure::ProcedureManagerHandler;
|
||||
use crate::service::admin::recovery::{
|
||||
get_recovery_mode, set_recovery_mode, unset_recovery_mode, RecoveryHandler,
|
||||
};
|
||||
use crate::service::admin::recovery::RecoveryHandler;
|
||||
use crate::service::admin::sequencer::TableIdSequenceHandler;
|
||||
use crate::service::admin::util::{to_axum_json_response, to_axum_not_found_response};
|
||||
|
||||
/// Expose admin http service on rpc port(3002).
|
||||
///
|
||||
/// # Deprecated
|
||||
///
|
||||
/// This function is deprecated and will be removed in the future. Please use
|
||||
/// [`admin_axum_router`] instead.
|
||||
pub fn make_admin_service(metasrv: Arc<Metasrv>) -> Admin {
|
||||
let router = Router::new().route("/health", health::HealthHandler);
|
||||
|
||||
@@ -231,257 +231,81 @@ fn check_path(path: &str) {
|
||||
|
||||
/// Expose admin HTTP endpoints as an Axum router for the main HTTP server.
|
||||
pub fn admin_axum_router(metasrv: Arc<Metasrv>) -> AxumRouter {
|
||||
let node_lease_handler = Arc::new(NodeLeaseHandler {
|
||||
let node_lease_handler = NodeLeaseHandler {
|
||||
meta_peer_client: metasrv.meta_peer_client().clone(),
|
||||
});
|
||||
let heartbeat_handler = Arc::new(HeartBeatHandler {
|
||||
};
|
||||
let heartbeat_handler = HeartBeatHandler {
|
||||
meta_peer_client: metasrv.meta_peer_client().clone(),
|
||||
});
|
||||
let leader_handler = Arc::new(LeaderHandler {
|
||||
};
|
||||
let leader_handler = LeaderHandler {
|
||||
election: metasrv.election().cloned(),
|
||||
});
|
||||
let maintenance_handler = Arc::new(MaintenanceHandler {
|
||||
};
|
||||
let maintenance_handler = MaintenanceHandler {
|
||||
manager: metasrv.runtime_switch_manager().clone(),
|
||||
});
|
||||
let procedure_handler = Arc::new(ProcedureManagerHandler {
|
||||
};
|
||||
let procedure_handler = ProcedureManagerHandler {
|
||||
manager: metasrv.runtime_switch_manager().clone(),
|
||||
});
|
||||
let recovery_handler = Arc::new(RecoveryHandler {
|
||||
};
|
||||
let recovery_handler = RecoveryHandler {
|
||||
manager: metasrv.runtime_switch_manager().clone(),
|
||||
});
|
||||
let table_id_sequence_handler = Arc::new(TableIdSequenceHandler {
|
||||
};
|
||||
let table_id_sequence_handler = TableIdSequenceHandler {
|
||||
table_id_sequence: metasrv.table_id_sequence().clone(),
|
||||
runtime_switch_manager: metasrv.runtime_switch_manager().clone(),
|
||||
});
|
||||
let sequence_router = AxumRouter::new().nest(
|
||||
"/table",
|
||||
AxumRouter::new()
|
||||
.route("/next-id", routing::get(sequencer::get_next_table_id))
|
||||
.route("/set-next-id", routing::post(sequencer::set_next_table_id))
|
||||
.with_state(table_id_sequence_handler),
|
||||
);
|
||||
|
||||
let health_router = AxumRouter::new().route(
|
||||
"/",
|
||||
routing::get({
|
||||
move || {
|
||||
let handler = HealthHandler;
|
||||
async move {
|
||||
match handler
|
||||
.handle("/health", http::Method::GET, &Default::default())
|
||||
.await
|
||||
{
|
||||
Ok(status) => status.body().clone().into_response(),
|
||||
Err(e) => {
|
||||
common_telemetry::error!(e; "Health handler failed");
|
||||
StatusCode::INTERNAL_SERVER_ERROR.into_response()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}),
|
||||
);
|
||||
|
||||
let node_lease_router = AxumRouter::new().route(
|
||||
"/",
|
||||
routing::get({
|
||||
let handler = node_lease_handler.clone();
|
||||
move || async move {
|
||||
match handler
|
||||
.handle("/node-lease", http::Method::GET, &Default::default())
|
||||
.await
|
||||
{
|
||||
Ok(resp) => resp.body().clone().into_response(),
|
||||
Err(e) => {
|
||||
common_telemetry::error!(e; "Node lease handler failed");
|
||||
StatusCode::INTERNAL_SERVER_ERROR.into_response()
|
||||
}
|
||||
}
|
||||
}
|
||||
}),
|
||||
);
|
||||
|
||||
let leader_router = AxumRouter::new().route(
|
||||
"/",
|
||||
routing::get({
|
||||
let handler = leader_handler.clone();
|
||||
move || async move {
|
||||
match handler
|
||||
.handle("/leader", http::Method::GET, &Default::default())
|
||||
.await
|
||||
{
|
||||
Ok(resp) => resp.body().clone().into_response(),
|
||||
Err(e) => {
|
||||
common_telemetry::error!(e; "Leader handler failed");
|
||||
StatusCode::INTERNAL_SERVER_ERROR.into_response()
|
||||
}
|
||||
}
|
||||
}
|
||||
}),
|
||||
);
|
||||
|
||||
let heartbeat_router = AxumRouter::new()
|
||||
.route(
|
||||
"/",
|
||||
routing::get({
|
||||
let handler = heartbeat_handler.clone();
|
||||
move || async move {
|
||||
match handler
|
||||
.handle("/heartbeat", http::Method::GET, &Default::default())
|
||||
.await
|
||||
{
|
||||
Ok(resp) => resp.body().clone().into_response(),
|
||||
Err(e) => {
|
||||
common_telemetry::error!(e; "Heartbeat handler failed");
|
||||
StatusCode::INTERNAL_SERVER_ERROR.into_response()
|
||||
}
|
||||
}
|
||||
}
|
||||
}),
|
||||
)
|
||||
.route(
|
||||
"/help",
|
||||
routing::get({
|
||||
let handler = heartbeat_handler.clone();
|
||||
move || async move {
|
||||
match handler
|
||||
.handle("/heartbeat/help", http::Method::GET, &Default::default())
|
||||
.await
|
||||
{
|
||||
Ok(resp) => resp.body().clone().into_response(),
|
||||
Err(e) => {
|
||||
common_telemetry::error!(e; "Heartbeat help handler failed");
|
||||
StatusCode::INTERNAL_SERVER_ERROR.into_response()
|
||||
}
|
||||
}
|
||||
}
|
||||
}),
|
||||
);
|
||||
|
||||
let maintenance_router = AxumRouter::new()
|
||||
.route(
|
||||
"/",
|
||||
routing::get({
|
||||
let handler = maintenance_handler.clone();
|
||||
move || async move {
|
||||
match handler.get_maintenance().await {
|
||||
Ok(resp) => to_axum_json_response(resp),
|
||||
Err(e) => {
|
||||
common_telemetry::error!(e; "Maintenance handler failed");
|
||||
to_axum_not_found_response()
|
||||
}
|
||||
}
|
||||
}
|
||||
}),
|
||||
)
|
||||
.route(
|
||||
"/status",
|
||||
routing::get({
|
||||
let handler = maintenance_handler.clone();
|
||||
move || async move {
|
||||
match handler.get_maintenance().await {
|
||||
Ok(resp) => to_axum_json_response(resp),
|
||||
Err(e) => {
|
||||
common_telemetry::error!(e; "Maintenance status handler failed");
|
||||
to_axum_not_found_response()
|
||||
}
|
||||
}
|
||||
}
|
||||
}),
|
||||
)
|
||||
.route(
|
||||
"/enable",
|
||||
routing::post({
|
||||
let handler = maintenance_handler.clone();
|
||||
move || async move {
|
||||
match handler.set_maintenance().await {
|
||||
Ok(resp) => to_axum_json_response(resp),
|
||||
Err(e) => {
|
||||
common_telemetry::error!(e; "Maintenance enable handler failed");
|
||||
to_axum_not_found_response()
|
||||
}
|
||||
}
|
||||
}
|
||||
}),
|
||||
)
|
||||
.route(
|
||||
"/disable",
|
||||
routing::post({
|
||||
let handler = maintenance_handler.clone();
|
||||
move || async move {
|
||||
match handler.unset_maintenance().await {
|
||||
Ok(resp) => to_axum_json_response(resp),
|
||||
Err(e) => {
|
||||
common_telemetry::error!(e; "Maintenance disable handler failed");
|
||||
to_axum_not_found_response()
|
||||
}
|
||||
}
|
||||
}
|
||||
}),
|
||||
);
|
||||
|
||||
let procedure_router = AxumRouter::new()
|
||||
.route(
|
||||
"/status",
|
||||
routing::get({
|
||||
let handler = procedure_handler.clone();
|
||||
move || async move {
|
||||
match handler.get_procedure_manager_status().await {
|
||||
Ok(resp) => to_axum_json_response(resp),
|
||||
Err(e) => {
|
||||
common_telemetry::error!(e; "Procedure manager status handler failed");
|
||||
to_axum_not_found_response()
|
||||
}
|
||||
}
|
||||
}
|
||||
}),
|
||||
)
|
||||
.route(
|
||||
"/pause",
|
||||
routing::post({
|
||||
let handler = procedure_handler.clone();
|
||||
move || async move {
|
||||
match handler.pause_procedure_manager().await {
|
||||
Ok(resp) => to_axum_json_response(resp),
|
||||
Err(e) => {
|
||||
common_telemetry::error!(e; "Procedure manager pause handler failed");
|
||||
to_axum_not_found_response()
|
||||
}
|
||||
}
|
||||
}
|
||||
}),
|
||||
)
|
||||
.route(
|
||||
"/resume",
|
||||
routing::post({
|
||||
let handler = procedure_handler.clone();
|
||||
move || async move {
|
||||
match handler.resume_procedure_manager().await {
|
||||
Ok(resp) => to_axum_json_response(resp),
|
||||
Err(e) => {
|
||||
common_telemetry::error!(e; "Procedure manager resume handler failed");
|
||||
to_axum_not_found_response()
|
||||
}
|
||||
}
|
||||
}
|
||||
}),
|
||||
);
|
||||
|
||||
let recovery_router = AxumRouter::new()
|
||||
.route("/enable", routing::post(set_recovery_mode))
|
||||
.route("/disable", routing::post(unset_recovery_mode))
|
||||
.route("/status", routing::get(get_recovery_mode))
|
||||
.with_state(recovery_handler);
|
||||
};
|
||||
|
||||
let admin_router = AxumRouter::new()
|
||||
.nest("/health", health_router)
|
||||
.nest("/node-lease", node_lease_router)
|
||||
.nest("/leader", leader_router)
|
||||
.nest("/heartbeat", heartbeat_router)
|
||||
.nest("/maintenance", maintenance_router)
|
||||
.nest("/procedure-manager", procedure_router)
|
||||
.nest("/recovery", recovery_router)
|
||||
.nest("/sequence", sequence_router);
|
||||
.route("/health", routing::get(health::health))
|
||||
.route(
|
||||
"/node-lease",
|
||||
routing::get(node_lease::get).with_state(node_lease_handler),
|
||||
)
|
||||
.route(
|
||||
"/leader",
|
||||
routing::get(leader::get).with_state(leader_handler),
|
||||
)
|
||||
.nest(
|
||||
"/heartbeat",
|
||||
AxumRouter::new()
|
||||
.route("/", routing::get(heartbeat::get))
|
||||
.route("/help", routing::get(heartbeat::help))
|
||||
.with_state(heartbeat_handler),
|
||||
)
|
||||
.nest(
|
||||
"/maintenance",
|
||||
AxumRouter::new()
|
||||
.route("/", routing::get(maintenance::status))
|
||||
.route("/status", routing::get(maintenance::status))
|
||||
.route("/enable", routing::post(maintenance::set))
|
||||
.route("/disable", routing::post(maintenance::unset))
|
||||
.with_state(maintenance_handler),
|
||||
)
|
||||
.nest(
|
||||
"/procedure-manager",
|
||||
AxumRouter::new()
|
||||
.route("/status", routing::get(procedure::status))
|
||||
.route("/pause", routing::post(procedure::pause))
|
||||
.route("/resume", routing::post(procedure::resume))
|
||||
.with_state(procedure_handler),
|
||||
)
|
||||
.nest(
|
||||
"/recovery",
|
||||
AxumRouter::new()
|
||||
.route("/status", routing::get(recovery::status))
|
||||
.route("/enable", routing::post(recovery::set))
|
||||
.route("/disable", routing::post(recovery::unset))
|
||||
.with_state(recovery_handler),
|
||||
)
|
||||
.nest(
|
||||
"/sequence",
|
||||
AxumRouter::new().nest(
|
||||
"/table",
|
||||
AxumRouter::new()
|
||||
.route("/next-id", routing::get(sequencer::get_next_table_id))
|
||||
.route("/set-next-id", routing::post(sequencer::set_next_table_id))
|
||||
.with_state(table_id_sequence_handler.clone()),
|
||||
),
|
||||
);
|
||||
|
||||
AxumRouter::new().nest("/admin", admin_router)
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use axum::response::{IntoResponse, Response};
|
||||
use tonic::codegen::http;
|
||||
|
||||
use crate::error::Result;
|
||||
@@ -24,6 +25,16 @@ const HTTP_OK: &str = "OK\n";
|
||||
#[derive(Clone)]
|
||||
pub struct HealthHandler;
|
||||
|
||||
/// Health check endpoint that returns HTTP 200 OK if the service is healthy.
|
||||
#[axum_macros::debug_handler]
|
||||
pub(crate) async fn health() -> Response {
|
||||
http::Response::builder()
|
||||
.status(http::StatusCode::OK)
|
||||
.body(HTTP_OK.to_owned())
|
||||
.unwrap()
|
||||
.into_response()
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl HttpHandler for HealthHandler {
|
||||
async fn handle(
|
||||
|
||||
@@ -14,6 +14,9 @@
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use axum::extract::{Query, State};
|
||||
use axum::response::{IntoResponse, Response};
|
||||
use axum::Json;
|
||||
use common_meta::datanode::DatanodeStatValue;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::ResultExt;
|
||||
@@ -21,6 +24,7 @@ use tonic::codegen::http;
|
||||
|
||||
use crate::cluster::MetaPeerClientRef;
|
||||
use crate::error::{self, Result};
|
||||
use crate::service::admin::util::ErrorHandler;
|
||||
use crate::service::admin::{util, HttpHandler};
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -28,6 +32,47 @@ pub struct HeartBeatHandler {
|
||||
pub meta_peer_client: MetaPeerClientRef,
|
||||
}
|
||||
|
||||
impl HeartBeatHandler {
|
||||
async fn get_heartbeat(&self, filter: Option<&str>) -> Result<StatValues> {
|
||||
let stat_kvs = self.meta_peer_client.get_all_dn_stat_kvs().await?;
|
||||
let mut stat_vals: Vec<DatanodeStatValue> = stat_kvs.into_values().collect();
|
||||
|
||||
if let Some(addr) = filter {
|
||||
stat_vals = filter_by_addr(stat_vals, addr);
|
||||
}
|
||||
Ok(StatValues { stat_vals })
|
||||
}
|
||||
}
|
||||
|
||||
fn find_addr_filter_from_params(params: &HashMap<String, String>) -> Option<&str> {
|
||||
params.get("addr").map(|s| s.as_str())
|
||||
}
|
||||
|
||||
/// Get the heartbeat handler.
|
||||
#[axum_macros::debug_handler]
|
||||
pub(crate) async fn get(
|
||||
State(handler): State<HeartBeatHandler>,
|
||||
Query(params): Query<HashMap<String, String>>,
|
||||
) -> Response {
|
||||
let filter = find_addr_filter_from_params(¶ms);
|
||||
handler
|
||||
.get_heartbeat(filter)
|
||||
.await
|
||||
.map_err(ErrorHandler::new)
|
||||
.map(Json)
|
||||
.into_response()
|
||||
}
|
||||
|
||||
/// Get the heartbeat help handler.
|
||||
#[axum_macros::debug_handler]
|
||||
pub(crate) async fn help() -> Response {
|
||||
r#"
|
||||
- GET /heartbeat
|
||||
- GET /heartbeat?addr=127.0.0.1:3001
|
||||
"#
|
||||
.into_response()
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl HttpHandler for HeartBeatHandler {
|
||||
async fn handle(
|
||||
@@ -45,13 +90,8 @@ impl HttpHandler for HeartBeatHandler {
|
||||
);
|
||||
}
|
||||
|
||||
let stat_kvs = self.meta_peer_client.get_all_dn_stat_kvs().await?;
|
||||
let mut stat_vals: Vec<DatanodeStatValue> = stat_kvs.into_values().collect();
|
||||
|
||||
if let Some(addr) = params.get("addr") {
|
||||
stat_vals = filter_by_addr(stat_vals, addr);
|
||||
}
|
||||
let result = StatValues { stat_vals }.try_into()?;
|
||||
let filter = find_addr_filter_from_params(params);
|
||||
let result = self.get_heartbeat(filter).await?.try_into()?;
|
||||
|
||||
http::Response::builder()
|
||||
.status(http::StatusCode::OK)
|
||||
@@ -85,9 +125,18 @@ fn filter_by_addr(stat_vals: Vec<DatanodeStatValue>, addr: &str) -> Vec<Datanode
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use common_meta::datanode::{DatanodeStatValue, Stat};
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::service::admin::heartbeat::filter_by_addr;
|
||||
use axum::body::{to_bytes, Body};
|
||||
use axum::http::{self, Request};
|
||||
use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, Stat};
|
||||
use common_meta::kv_backend::memory::MemoryKvBackend;
|
||||
use common_meta::kv_backend::KvBackendRef;
|
||||
use common_meta::rpc::store::PutRequest;
|
||||
use tower::ServiceExt;
|
||||
|
||||
use crate::cluster::MetaPeerClientBuilder;
|
||||
use crate::service::admin::heartbeat::{self, filter_by_addr, HeartBeatHandler};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_filter_by_addr() {
|
||||
@@ -141,4 +190,71 @@ mod tests {
|
||||
3
|
||||
);
|
||||
}
|
||||
|
||||
async fn get_body_string(resp: axum::response::Response) -> String {
|
||||
let body_bytes = to_bytes(resp.into_body(), usize::MAX).await.unwrap();
|
||||
String::from_utf8_lossy(&body_bytes).to_string()
|
||||
}
|
||||
|
||||
async fn put_stat_value(kv_backend: &KvBackendRef, node_id: u64, addr: &str) {
|
||||
let value: Vec<u8> = DatanodeStatValue {
|
||||
stats: vec![Stat {
|
||||
addr: addr.to_string(),
|
||||
timestamp_millis: 3,
|
||||
..Default::default()
|
||||
}],
|
||||
}
|
||||
.try_into()
|
||||
.unwrap();
|
||||
let put = PutRequest::new()
|
||||
.with_key(DatanodeStatKey { node_id })
|
||||
.with_value(value);
|
||||
kv_backend.put(put).await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_get_heartbeat_with_filter() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let kv_backend = Arc::new(MemoryKvBackend::new());
|
||||
let meta_peer_client = MetaPeerClientBuilder::default()
|
||||
.election(None)
|
||||
.in_memory(kv_backend.clone())
|
||||
.build()
|
||||
.map(Arc::new)
|
||||
.unwrap();
|
||||
let kv_backend = kv_backend as _;
|
||||
put_stat_value(&kv_backend, 0, "127.0.0.1:3000").await;
|
||||
put_stat_value(&kv_backend, 1, "127.0.0.1:3001").await;
|
||||
put_stat_value(&kv_backend, 2, "127.0.0.1:3002").await;
|
||||
|
||||
let handler = HeartBeatHandler {
|
||||
meta_peer_client: meta_peer_client.clone(),
|
||||
};
|
||||
let app = axum::Router::new()
|
||||
.route("/", axum::routing::get(heartbeat::get))
|
||||
.with_state(handler);
|
||||
|
||||
let req = Request::builder()
|
||||
.uri("/?addr=127.0.0.1:3003")
|
||||
.body(Body::empty())
|
||||
.unwrap();
|
||||
let resp = app.clone().oneshot(req).await.unwrap();
|
||||
let status = resp.status();
|
||||
let body = get_body_string(resp).await;
|
||||
assert_eq!(status, http::StatusCode::OK);
|
||||
assert_eq!(body, r#"[]"#);
|
||||
|
||||
let req = Request::builder()
|
||||
.uri("/?addr=127.0.0.1:3001")
|
||||
.body(Body::empty())
|
||||
.unwrap();
|
||||
let resp = app.oneshot(req).await.unwrap();
|
||||
let status = resp.status();
|
||||
let body = get_body_string(resp).await;
|
||||
assert_eq!(status, http::StatusCode::OK);
|
||||
assert_eq!(
|
||||
body,
|
||||
"[[{\"timestamp_millis\":3,\"id\":0,\"addr\":\"127.0.0.1:3001\",\"rcus\":0,\"wcus\":0,\"region_num\":0,\"region_stats\":[],\"topic_stats\":[],\"node_epoch\":0,\"datanode_workloads\":{\"types\":[]}}]]"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,11 +14,14 @@
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use axum::extract::State;
|
||||
use axum::response::{IntoResponse, Response};
|
||||
use snafu::ResultExt;
|
||||
use tonic::codegen::http;
|
||||
|
||||
use crate::error::{self, Result};
|
||||
use crate::metasrv::ElectionRef;
|
||||
use crate::service::admin::util::ErrorHandler;
|
||||
use crate::service::admin::HttpHandler;
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -26,6 +29,27 @@ pub struct LeaderHandler {
|
||||
pub election: Option<ElectionRef>,
|
||||
}
|
||||
|
||||
impl LeaderHandler {
|
||||
async fn get_leader(&self) -> Result<Option<String>> {
|
||||
if let Some(election) = &self.election {
|
||||
let leader_addr = election.leader().await?.0;
|
||||
return Ok(Some(leader_addr));
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the leader handler.
|
||||
#[axum_macros::debug_handler]
|
||||
pub(crate) async fn get(State(handler): State<LeaderHandler>) -> Response {
|
||||
handler
|
||||
.get_leader()
|
||||
.await
|
||||
.map_err(ErrorHandler::new)
|
||||
.map(|leader| leader.unwrap_or("election info is None".to_string()))
|
||||
.into_response()
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl HttpHandler for LeaderHandler {
|
||||
async fn handle(
|
||||
@@ -34,8 +58,7 @@ impl HttpHandler for LeaderHandler {
|
||||
_: http::Method,
|
||||
_: &HashMap<String, String>,
|
||||
) -> Result<http::Response<String>> {
|
||||
if let Some(election) = &self.election {
|
||||
let leader_addr = election.leader().await?.0;
|
||||
if let Some(leader_addr) = self.get_leader().await? {
|
||||
return http::Response::builder()
|
||||
.status(http::StatusCode::OK)
|
||||
.body(leader_addr)
|
||||
|
||||
@@ -14,18 +14,20 @@
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use axum::extract::State;
|
||||
use axum::response::{IntoResponse, Response};
|
||||
use axum::Json;
|
||||
use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
|
||||
use common_telemetry::{info, warn};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use tonic::codegen::http;
|
||||
use tonic::codegen::http::Response;
|
||||
|
||||
use crate::error::{
|
||||
self, MissingRequiredParameterSnafu, ParseBoolSnafu, Result, RuntimeSwitchManagerSnafu,
|
||||
UnsupportedSnafu,
|
||||
};
|
||||
use crate::service::admin::util::{to_json_response, to_not_found_response};
|
||||
use crate::service::admin::util::{to_json_response, to_not_found_response, ErrorHandler};
|
||||
use crate::service::admin::HttpHandler;
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -38,6 +40,39 @@ pub(crate) struct MaintenanceResponse {
|
||||
enabled: bool,
|
||||
}
|
||||
|
||||
/// Get the maintenance mode.
|
||||
#[axum_macros::debug_handler]
|
||||
pub(crate) async fn status(State(handler): State<MaintenanceHandler>) -> Response {
|
||||
handler
|
||||
.get_maintenance()
|
||||
.await
|
||||
.map(Json)
|
||||
.map_err(ErrorHandler::new)
|
||||
.into_response()
|
||||
}
|
||||
|
||||
/// Set the maintenance mode.
|
||||
#[axum_macros::debug_handler]
|
||||
pub(crate) async fn set(State(handler): State<MaintenanceHandler>) -> Response {
|
||||
handler
|
||||
.set_maintenance()
|
||||
.await
|
||||
.map(Json)
|
||||
.map_err(ErrorHandler::new)
|
||||
.into_response()
|
||||
}
|
||||
|
||||
/// Unset the maintenance mode.
|
||||
#[axum_macros::debug_handler]
|
||||
pub(crate) async fn unset(State(handler): State<MaintenanceHandler>) -> Response {
|
||||
handler
|
||||
.unset_maintenance()
|
||||
.await
|
||||
.map(Json)
|
||||
.map_err(ErrorHandler::new)
|
||||
.into_response()
|
||||
}
|
||||
|
||||
impl TryFrom<MaintenanceResponse> for String {
|
||||
type Error = error::Error;
|
||||
|
||||
@@ -115,7 +150,7 @@ impl HttpHandler for MaintenanceHandler {
|
||||
path: &str,
|
||||
method: http::Method,
|
||||
params: &HashMap<String, String>,
|
||||
) -> crate::Result<Response<String>> {
|
||||
) -> crate::Result<http::Response<String>> {
|
||||
match method {
|
||||
http::Method::GET => {
|
||||
if path.ends_with(STATUS_SUFFIX) {
|
||||
|
||||
@@ -15,6 +15,9 @@
|
||||
use std::collections::HashMap;
|
||||
use std::time::Duration;
|
||||
|
||||
use axum::extract::State;
|
||||
use axum::response::{IntoResponse, Response};
|
||||
use axum::Json;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::ResultExt;
|
||||
use tonic::codegen::http;
|
||||
@@ -23,6 +26,7 @@ use crate::cluster::MetaPeerClientRef;
|
||||
use crate::error::{self, Result};
|
||||
use crate::key::{DatanodeLeaseKey, LeaseValue};
|
||||
use crate::lease;
|
||||
use crate::service::admin::util::ErrorHandler;
|
||||
use crate::service::admin::HttpHandler;
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -30,14 +34,8 @@ pub struct NodeLeaseHandler {
|
||||
pub meta_peer_client: MetaPeerClientRef,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl HttpHandler for NodeLeaseHandler {
|
||||
async fn handle(
|
||||
&self,
|
||||
_: &str,
|
||||
_: http::Method,
|
||||
_: &HashMap<String, String>,
|
||||
) -> Result<http::Response<String>> {
|
||||
impl NodeLeaseHandler {
|
||||
async fn get_node_lease(&self) -> Result<LeaseValues> {
|
||||
let leases =
|
||||
lease::alive_datanodes(&self.meta_peer_client, Duration::from_secs(u64::MAX)).await?;
|
||||
let leases = leases
|
||||
@@ -49,7 +47,30 @@ impl HttpHandler for NodeLeaseHandler {
|
||||
lease: v,
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
let result = LeaseValues { leases }.try_into()?;
|
||||
Ok(LeaseValues { leases })
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the node lease handler.
|
||||
#[axum_macros::debug_handler]
|
||||
pub(crate) async fn get(State(handler): State<NodeLeaseHandler>) -> Response {
|
||||
handler
|
||||
.get_node_lease()
|
||||
.await
|
||||
.map_err(ErrorHandler::new)
|
||||
.map(Json)
|
||||
.into_response()
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl HttpHandler for NodeLeaseHandler {
|
||||
async fn handle(
|
||||
&self,
|
||||
_: &str,
|
||||
_: http::Method,
|
||||
_: &HashMap<String, String>,
|
||||
) -> Result<http::Response<String>> {
|
||||
let result = self.get_node_lease().await?.try_into()?;
|
||||
|
||||
http::Response::builder()
|
||||
.status(http::StatusCode::OK)
|
||||
|
||||
@@ -14,15 +14,17 @@
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use axum::extract::State;
|
||||
use axum::response::{IntoResponse, Response};
|
||||
use axum::Json;
|
||||
use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
|
||||
use common_telemetry::info;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::ResultExt;
|
||||
use tonic::codegen::http;
|
||||
use tonic::codegen::http::Response;
|
||||
|
||||
use crate::error::RuntimeSwitchManagerSnafu;
|
||||
use crate::service::admin::util::{to_json_response, to_not_found_response};
|
||||
use crate::service::admin::util::{to_json_response, to_not_found_response, ErrorHandler};
|
||||
use crate::service::admin::HttpHandler;
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -42,6 +44,39 @@ enum ProcedureManagerStatus {
|
||||
Running,
|
||||
}
|
||||
|
||||
/// Get the procedure manager status.
|
||||
#[axum_macros::debug_handler]
|
||||
pub(crate) async fn status(State(handler): State<ProcedureManagerHandler>) -> Response {
|
||||
handler
|
||||
.get_procedure_manager_status()
|
||||
.await
|
||||
.map(Json)
|
||||
.map_err(ErrorHandler::new)
|
||||
.into_response()
|
||||
}
|
||||
|
||||
/// Pause the procedure manager.
|
||||
#[axum_macros::debug_handler]
|
||||
pub(crate) async fn pause(State(handler): State<ProcedureManagerHandler>) -> Response {
|
||||
handler
|
||||
.pause_procedure_manager()
|
||||
.await
|
||||
.map(Json)
|
||||
.map_err(ErrorHandler::new)
|
||||
.into_response()
|
||||
}
|
||||
|
||||
/// Resume the procedure manager.
|
||||
#[axum_macros::debug_handler]
|
||||
pub(crate) async fn resume(State(handler): State<ProcedureManagerHandler>) -> Response {
|
||||
handler
|
||||
.resume_procedure_manager()
|
||||
.await
|
||||
.map(Json)
|
||||
.map_err(ErrorHandler::new)
|
||||
.into_response()
|
||||
}
|
||||
|
||||
impl ProcedureManagerHandler {
|
||||
pub(crate) async fn pause_procedure_manager(
|
||||
&self,
|
||||
@@ -98,7 +133,7 @@ impl HttpHandler for ProcedureManagerHandler {
|
||||
path: &str,
|
||||
method: http::Method,
|
||||
_: &HashMap<String, String>,
|
||||
) -> crate::Result<Response<String>> {
|
||||
) -> crate::Result<http::Response<String>> {
|
||||
match method {
|
||||
http::Method::GET => {
|
||||
if path.ends_with("status") {
|
||||
|
||||
@@ -12,18 +12,15 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use axum::extract::State;
|
||||
use axum::http::StatusCode;
|
||||
use axum::response::{IntoResponse, Response};
|
||||
use axum::Json;
|
||||
use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use servers::http::result::error_result::ErrorResponse;
|
||||
|
||||
pub(crate) type RecoveryHandlerRef = Arc<RecoveryHandler>;
|
||||
use crate::service::admin::util::ErrorHandler;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct RecoveryHandler {
|
||||
pub(crate) manager: RuntimeSwitchManagerRef,
|
||||
}
|
||||
@@ -35,29 +32,36 @@ pub(crate) struct RecoveryResponse {
|
||||
|
||||
/// Get the recovery mode.
|
||||
#[axum_macros::debug_handler]
|
||||
pub(crate) async fn get_recovery_mode(State(handler): State<RecoveryHandlerRef>) -> Response {
|
||||
let enabled = handler.manager.recovery_mode().await;
|
||||
|
||||
match enabled {
|
||||
Ok(enabled) => (StatusCode::OK, Json(RecoveryResponse { enabled })).into_response(),
|
||||
Err(e) => ErrorResponse::from_error(e).into_response(),
|
||||
}
|
||||
pub(crate) async fn status(State(handler): State<RecoveryHandler>) -> Response {
|
||||
handler
|
||||
.manager
|
||||
.recovery_mode()
|
||||
.await
|
||||
.map(|enabled| Json(RecoveryResponse { enabled }))
|
||||
.map_err(ErrorHandler::new)
|
||||
.into_response()
|
||||
}
|
||||
|
||||
/// Set the recovery mode.
|
||||
#[axum_macros::debug_handler]
|
||||
pub(crate) async fn set_recovery_mode(State(handler): State<RecoveryHandlerRef>) -> Response {
|
||||
match handler.manager.set_recovery_mode().await {
|
||||
Ok(_) => (StatusCode::OK, Json(RecoveryResponse { enabled: true })).into_response(),
|
||||
Err(e) => ErrorResponse::from_error(e).into_response(),
|
||||
}
|
||||
pub(crate) async fn set(State(handler): State<RecoveryHandler>) -> Response {
|
||||
handler
|
||||
.manager
|
||||
.set_recovery_mode()
|
||||
.await
|
||||
.map(|_| Json(RecoveryResponse { enabled: true }))
|
||||
.map_err(ErrorHandler::new)
|
||||
.into_response()
|
||||
}
|
||||
|
||||
/// Unset the recovery mode.
|
||||
#[axum_macros::debug_handler]
|
||||
pub(crate) async fn unset_recovery_mode(State(handler): State<RecoveryHandlerRef>) -> Response {
|
||||
match handler.manager.unset_recovery_mode().await {
|
||||
Ok(_) => (StatusCode::OK, Json(RecoveryResponse { enabled: false })).into_response(),
|
||||
Err(e) => ErrorResponse::from_error(e).into_response(),
|
||||
}
|
||||
pub(crate) async fn unset(State(handler): State<RecoveryHandler>) -> Response {
|
||||
handler
|
||||
.manager
|
||||
.unset_recovery_mode()
|
||||
.await
|
||||
.map(|_| Json(RecoveryResponse { enabled: false }))
|
||||
.map_err(ErrorHandler::new)
|
||||
.into_response()
|
||||
}
|
||||
|
||||
@@ -12,8 +12,6 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use axum::extract::{self, State};
|
||||
use axum::http::StatusCode;
|
||||
use axum::response::{IntoResponse, Response};
|
||||
@@ -28,8 +26,6 @@ use crate::error::{
|
||||
PeekSequenceSnafu, Result, RuntimeSwitchManagerSnafu, SetNextSequenceSnafu, UnexpectedSnafu,
|
||||
};
|
||||
|
||||
pub type TableIdSequenceHandlerRef = Arc<TableIdSequenceHandler>;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct TableIdSequenceHandler {
|
||||
pub(crate) table_id_sequence: SequenceRef,
|
||||
@@ -77,7 +73,7 @@ pub(crate) struct ResetTableIdRequest {
|
||||
/// Set the next table id.
|
||||
#[axum_macros::debug_handler]
|
||||
pub(crate) async fn set_next_table_id(
|
||||
State(handler): State<TableIdSequenceHandlerRef>,
|
||||
State(handler): State<TableIdSequenceHandler>,
|
||||
extract::Json(ResetTableIdRequest { next_table_id }): extract::Json<ResetTableIdRequest>,
|
||||
) -> Response {
|
||||
match handler.set_next_table_id(next_table_id).await {
|
||||
@@ -88,9 +84,7 @@ pub(crate) async fn set_next_table_id(
|
||||
|
||||
/// Get the next table id without incrementing the sequence.
|
||||
#[axum_macros::debug_handler]
|
||||
pub(crate) async fn get_next_table_id(
|
||||
State(handler): State<TableIdSequenceHandlerRef>,
|
||||
) -> Response {
|
||||
pub(crate) async fn get_next_table_id(State(handler): State<TableIdSequenceHandler>) -> Response {
|
||||
match handler.peek_table_id().await {
|
||||
Ok(next_table_id) => {
|
||||
(StatusCode::OK, Json(NextTableIdResponse { next_table_id })).into_response()
|
||||
|
||||
@@ -16,7 +16,6 @@ use std::fmt::Debug;
|
||||
|
||||
use axum::http::StatusCode;
|
||||
use axum::response::{IntoResponse, Response};
|
||||
use axum::Json;
|
||||
use serde::Serialize;
|
||||
use snafu::ResultExt;
|
||||
use tonic::codegen::http;
|
||||
@@ -47,16 +46,6 @@ where
|
||||
.context(error::InvalidHttpBodySnafu)
|
||||
}
|
||||
|
||||
/// Converts any serializable type to an Axum JSON response with status 200.
|
||||
pub fn to_axum_json_response<T: Serialize>(value: T) -> Response {
|
||||
(StatusCode::OK, Json(value)).into_response()
|
||||
}
|
||||
|
||||
/// Returns a 404 response with an empty body.
|
||||
pub fn to_axum_not_found_response() -> Response {
|
||||
(StatusCode::NOT_FOUND, "").into_response()
|
||||
}
|
||||
|
||||
/// Returns a 404 response with an empty body.
|
||||
pub fn to_not_found_response() -> Result<http::Response<String>> {
|
||||
http::Response::builder()
|
||||
@@ -64,3 +53,26 @@ pub fn to_not_found_response() -> Result<http::Response<String>> {
|
||||
.body("".to_string())
|
||||
.context(error::InvalidHttpBodySnafu)
|
||||
}
|
||||
|
||||
/// A wrapper for error handling in admin services.
|
||||
pub(crate) struct ErrorHandler<E>(E)
|
||||
where
|
||||
E: std::error::Error + Send + Sync + Sized;
|
||||
|
||||
impl<E> ErrorHandler<E>
|
||||
where
|
||||
E: std::error::Error + Send + Sync + Sized,
|
||||
{
|
||||
pub(crate) fn new(error: E) -> Self {
|
||||
Self(error)
|
||||
}
|
||||
}
|
||||
|
||||
impl<E> IntoResponse for ErrorHandler<E>
|
||||
where
|
||||
E: std::error::Error + Send + Sync + Sized,
|
||||
{
|
||||
fn into_response(self) -> Response {
|
||||
(StatusCode::INTERNAL_SERVER_ERROR, self.0.to_string()).into_response()
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user