From 8a7998cd25d3a0f0ed2bd42068d7c7261ecf6e57 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Thu, 25 May 2023 16:31:48 +0800 Subject: [PATCH] feat(servers): Add metrics based on axum's example (#1638) Log on error --- src/servers/src/error.rs | 7 +++++- src/servers/src/http.rs | 47 +++++++++++++++++++++++++++++++++++--- src/servers/src/metrics.rs | 6 +++++ src/servers/src/prom.rs | 6 ++++- 4 files changed, 61 insertions(+), 5 deletions(-) diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 00feeaa5a7..f7e9b04402 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -22,6 +22,7 @@ use axum::{http, Json}; use base64::DecodeError; use catalog; use common_error::prelude::*; +use common_telemetry::logging; use query::parser::PromQuery; use serde_json::json; use snafu::Location; @@ -367,7 +368,11 @@ impl IntoResponse for Error { | Error::InvalidPromRemoteRequest { .. } | Error::InvalidQuery { .. } | Error::TimePrecision { .. } => (HttpStatusCode::BAD_REQUEST, self.to_string()), - _ => (HttpStatusCode::INTERNAL_SERVER_ERROR, self.to_string()), + _ => { + logging::error!(self; "Failed to handle HTTP request"); + + (HttpStatusCode::INTERNAL_SERVER_ERROR, self.to_string()) + } }; let body = Json(json!({ "error": error_message, diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index dc48d28d0f..a4efd8b751 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -27,20 +27,23 @@ pub mod mem_prof; use std::net::SocketAddr; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use aide::axum::{routing as apirouting, ApiRouter, IntoApiResponse}; use aide::openapi::{Info, OpenApi, Server as OpenAPIServer}; use async_trait::async_trait; use axum::body::BoxBody; use axum::error_handling::HandleErrorLayer; -use axum::response::{Html, Json}; +use axum::extract::MatchedPath; +use axum::http::Request; +use axum::middleware::{self, Next}; +use axum::response::{Html, IntoResponse, Json}; use 
axum::{routing, BoxError, Extension, Router}; use common_error::prelude::ErrorExt; use common_error::status_code::StatusCode; use common_query::Output; use common_recordbatch::{util, RecordBatch}; -use common_telemetry::logging::info; +use common_telemetry::logging::{self, info}; use datatypes::data_type::DataType; use futures::FutureExt; use schemars::JsonSchema; @@ -61,6 +64,10 @@ use crate::auth::UserProviderRef; use crate::configurator::ConfiguratorRef; use crate::error::{AlreadyStartedSnafu, Result, StartHttpSnafu}; use crate::http::admin::flush; +use crate::metrics::{ + METRIC_HTTP_REQUESTS_ELAPSED, METRIC_HTTP_REQUESTS_TOTAL, METRIC_METHOD_LABEL, + METRIC_PATH_LABEL, METRIC_STATUS_LABEL, +}; use crate::metrics_handler::MetricsHandler; use crate::query_handler::grpc::ServerGrpcQueryHandlerRef; use crate::query_handler::sql::ServerSqlQueryHandlerRef; @@ -529,6 +536,10 @@ impl HttpServer { ); } } + + // Add a layer to collect HTTP metrics for axum. + router = router.route_layer(middleware::from_fn(track_metrics)); + router } @@ -600,6 +611,34 @@ impl HttpServer { } } +/// A middleware to record metrics for HTTP. 
+// Based on https://github.com/tokio-rs/axum/blob/axum-v0.6.16/examples/prometheus-metrics/src/main.rs +pub(crate) async fn track_metrics<B>(req: Request<B>, next: Next<B>) -> impl IntoResponse { + let start = Instant::now(); + let path = if let Some(matched_path) = req.extensions().get::<MatchedPath>() { + matched_path.as_str().to_owned() + } else { + req.uri().path().to_owned() + }; + let method = req.method().clone(); + + let response = next.run(req).await; + + let latency = start.elapsed().as_secs_f64(); + let status = response.status().as_u16().to_string(); + + let labels = [ + (METRIC_METHOD_LABEL, method.to_string()), + (METRIC_PATH_LABEL, path), + (METRIC_STATUS_LABEL, status), + ]; + + metrics::increment_counter!(METRIC_HTTP_REQUESTS_TOTAL, &labels); + metrics::histogram!(METRIC_HTTP_REQUESTS_ELAPSED, latency, &labels); + + response +} + pub const HTTP_SERVER: &str = "HTTP_SERVER"; #[async_trait] @@ -652,6 +691,8 @@ impl Server for HttpServer { /// handle error middleware async fn handle_error(err: BoxError) -> Json<JsonResponse> { + logging::error!("Unhandled internal error: {}", err); + Json(JsonResponse::with_error( format!("Unhandled internal error: {err}"), StatusCode::Unexpected, diff --git a/src/servers/src/metrics.rs b/src/servers/src/metrics.rs index 0542d724ef..69f26f3a77 100644 --- a/src/servers/src/metrics.rs +++ b/src/servers/src/metrics.rs @@ -41,3 +41,9 @@ pub(crate) const METRIC_POSTGRES_PREPARED_COUNT: &str = "servers.postgres_prepar pub(crate) const METRIC_SERVER_GRPC_DB_REQUEST_TIMER: &str = "servers.grpc.db_request_elapsed"; pub(crate) const METRIC_SERVER_GRPC_PROM_REQUEST_TIMER: &str = "servers.grpc.prom_request_elapsed"; + +pub(crate) const METRIC_HTTP_REQUESTS_TOTAL: &str = "servers.http_requests_total"; +pub(crate) const METRIC_HTTP_REQUESTS_ELAPSED: &str = "servers.http_requests_elapsed"; +pub(crate) const METRIC_METHOD_LABEL: &str = "method"; +pub(crate) const METRIC_PATH_LABEL: &str = "path"; +pub(crate) const METRIC_STATUS_LABEL: &str = "status"; diff --git 
a/src/servers/src/prom.rs b/src/servers/src/prom.rs index 4669a36fe2..1a1c835bec 100644 --- a/src/servers/src/prom.rs +++ b/src/servers/src/prom.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use async_trait::async_trait; use axum::body::BoxBody; use axum::extract::{Path, Query, State}; -use axum::{routing, Form, Json, Router}; +use axum::{middleware, routing, Form, Json, Router}; use common_catalog::consts::DEFAULT_SCHEMA_NAME; use common_error::prelude::ErrorExt; use common_error::status_code::StatusCode; @@ -56,6 +56,7 @@ use crate::error::{ StartHttpSnafu, UnexpectedResultSnafu, }; use crate::http::authorize::HttpAuth; +use crate::http::track_metrics; use crate::prometheus::{FIELD_COLUMN_NAME, TIMESTAMP_COLUMN_NAME}; use crate::server::Server; @@ -114,6 +115,9 @@ impl PromServer { HttpAuth::<BoxBody>::new(self.user_provider.clone()), )), ) + // We need to register the metrics layer again since start a new http server + // for the PromServer. + .route_layer(middleware::from_fn(track_metrics)) } }