From 563ce59071e80fff7e21f2e7269169debb5174cc Mon Sep 17 00:00:00 2001 From: Yingwen Date: Tue, 30 May 2023 09:51:08 +0800 Subject: [PATCH] feat: Add request type and result code to grpc metrics (#1664) --- src/api/src/helper.rs | 36 +++++++++++++ src/common/error/src/status_code.rs | 4 ++ src/servers/src/error.rs | 48 ++++++++++++++++- src/servers/src/grpc/handler.rs | 84 ++++++++++++++++++++--------- src/servers/src/http.rs | 1 + src/servers/src/metrics.rs | 67 +++++++++++++++++++++++ 6 files changed, 214 insertions(+), 26 deletions(-) diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index 4409801edc..a48be91ab0 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -18,6 +18,10 @@ use datatypes::prelude::ConcreteDataType; use datatypes::types::TimestampType; use datatypes::value::Value; use datatypes::vectors::VectorRef; +use greptime_proto::v1::ddl_request::Expr; +use greptime_proto::v1::greptime_request::Request; +use greptime_proto::v1::query_request::Query; +use greptime_proto::v1::{DdlRequest, QueryRequest}; use snafu::prelude::*; use crate::error::{self, Result}; @@ -224,6 +228,38 @@ pub fn push_vals(column: &mut Column, origin_count: usize, vector: VectorRef) { column.null_mask = null_mask.into_vec(); } +/// Returns the type name of the [Request]. +pub fn request_type(request: &Request) -> &'static str { + match request { + Request::Insert(_) => "insert", + Request::Query(query_req) => query_request_type(query_req), + Request::Ddl(ddl_req) => ddl_request_type(ddl_req), + Request::Delete(_) => "delete", + } +} + +/// Returns the type name of the [QueryRequest]. +fn query_request_type(request: &QueryRequest) -> &'static str { + match request.query { + Some(Query::Sql(_)) => "query.sql", + Some(Query::LogicalPlan(_)) => "query.logical_plan", + Some(Query::PromRangeQuery(_)) => "query.prom_range", + None => "query.empty", + } +} + +/// Returns the type name of the [DdlRequest]. +fn ddl_request_type(request: &DdlRequest) -> &'static str { + match request.expr { + Some(Expr::CreateDatabase(_)) => "ddl.create_database", + Some(Expr::CreateTable(_)) => "ddl.create_table", + Some(Expr::Alter(_)) => "ddl.alter", + Some(Expr::DropTable(_)) => "ddl.drop_table", + Some(Expr::FlushTable(_)) => "ddl.flush_table", + None => "ddl.empty", + } +} + #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/src/common/error/src/status_code.rs b/src/common/error/src/status_code.rs index 747514ba44..2798dd595b 100644 --- a/src/common/error/src/status_code.rs +++ b/src/common/error/src/status_code.rs @@ -33,6 +33,8 @@ pub enum StatusCode { Internal = 1003, /// Invalid arguments. InvalidArguments = 1004, + /// The task is cancelled. + Cancelled = 1005, // ====== End of common status code ================ // ====== Begin of SQL related status code ========= @@ -100,6 +102,7 @@ impl StatusCode { | StatusCode::Unsupported | StatusCode::Unexpected | StatusCode::InvalidArguments + | StatusCode::Cancelled | StatusCode::InvalidSyntax | StatusCode::PlanQuery | StatusCode::EngineExecuteQuery @@ -125,6 +128,7 @@ impl StatusCode { | StatusCode::Unsupported | StatusCode::Unexpected | StatusCode::Internal + | StatusCode::Cancelled | StatusCode::PlanQuery | StatusCode::EngineExecuteQuery | StatusCode::StorageUnavailable diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index f7e9b04402..247dbd3fcb 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -254,6 +254,12 @@ pub enum Error { source: BoxedError, location: Location, }, + + #[snafu(display("Failed to join task, source: {}", source))] + JoinTask { + source: tokio::task::JoinError, + location: Location, + }, } pub type Result = std::result::Result; @@ -317,6 +323,16 @@ impl ErrorExt for Error { Other { source, .. } => source.status_code(), UnexpectedResult { .. } => StatusCode::Unexpected, + + JoinTask { source, .. } => { + if source.is_cancelled() { + StatusCode::Cancelled + } else if source.is_panic() { + StatusCode::Unexpected + } else { + StatusCode::Unknown + } + } } } @@ -325,13 +341,41 @@ impl ErrorExt for Error { } } +/// Returns the tonic [Code] of a [StatusCode]. +fn status_to_tonic_code(status_code: StatusCode) -> Code { + match status_code { + StatusCode::Success => Code::Ok, + StatusCode::Unknown => Code::Unknown, + StatusCode::Unsupported => Code::Unimplemented, + StatusCode::Unexpected + | StatusCode::Internal + | StatusCode::PlanQuery + | StatusCode::EngineExecuteQuery => Code::Internal, + StatusCode::InvalidArguments | StatusCode::InvalidSyntax => Code::InvalidArgument, + StatusCode::Cancelled => Code::Cancelled, + StatusCode::TableAlreadyExists | StatusCode::TableColumnExists => Code::AlreadyExists, + StatusCode::TableNotFound + | StatusCode::TableColumnNotFound + | StatusCode::DatabaseNotFound + | StatusCode::UserNotFound => Code::NotFound, + StatusCode::StorageUnavailable => Code::Unavailable, + StatusCode::RuntimeResourcesExhausted => Code::ResourceExhausted, + StatusCode::UnsupportedPasswordType + | StatusCode::UserPasswordMismatch + | StatusCode::AuthHeaderNotFound + | StatusCode::InvalidAuthHeader => Code::Unauthenticated, + StatusCode::AccessDenied => Code::PermissionDenied, + } +} + impl From for tonic::Status { fn from(err: Error) -> Self { let mut headers = HeaderMap::::with_capacity(2); // If either of the status_code or error msg cannot convert to valid HTTP header value // (which is a very rare case), just ignore. Client will use Tonic status code and message. - if let Ok(code) = HeaderValue::from_bytes(err.status_code().to_string().as_bytes()) { + let status_code = err.status_code(); + if let Ok(code) = HeaderValue::from_bytes(status_code.to_string().as_bytes()) { headers.insert(INNER_ERROR_CODE, code); } let root_error = err.iter_chain().last().unwrap(); @@ -340,7 +384,7 @@ impl From for tonic::Status { } let metadata = MetadataMap::from_headers(headers); - tonic::Status::with_metadata(Code::Internal, err.to_string(), metadata) + tonic::Status::with_metadata(status_to_tonic_code(status_code), err.to_string(), metadata) } } diff --git a/src/servers/src/grpc/handler.rs b/src/servers/src/grpc/handler.rs index 88c1bbe15b..c01535d489 100644 --- a/src/servers/src/grpc/handler.rs +++ b/src/servers/src/grpc/handler.rs @@ -13,22 +13,29 @@ // limitations under the License. use std::sync::Arc; +use std::time::Instant; +use api::helper::request_type; use api::v1::auth_header::AuthScheme; use api::v1::{Basic, GreptimeRequest, RequestHeader}; use common_error::prelude::ErrorExt; +use common_error::status_code::StatusCode; use common_query::Output; use common_runtime::Runtime; -use common_telemetry::{logging, timer}; -use metrics::increment_counter; +use common_telemetry::logging; +use metrics::{histogram, increment_counter}; use session::context::{QueryContext, QueryContextRef}; -use snafu::OptionExt; +use snafu::{OptionExt, ResultExt}; use tonic::Status; use crate::auth::{Identity, Password, UserProviderRef}; use crate::error::Error::{Auth, UnsupportedAuthScheme}; -use crate::error::{InvalidQuerySnafu, NotFoundAuthHeaderSnafu}; +use crate::error::{InvalidQuerySnafu, JoinTaskSnafu, NotFoundAuthHeaderSnafu}; use crate::grpc::TonicResult; +use crate::metrics::{ + METRIC_AUTH_FAILURE, METRIC_CODE_LABEL, METRIC_SERVER_GRPC_DB_REQUEST_TIMER, + METRIC_STATUS_LABEL, METRIC_TYPE_LABEL, +}; use crate::query_handler::grpc::ServerGrpcQueryHandlerRef; pub struct GreptimeRequestHandler { @@ -60,11 +67,10 @@ impl GreptimeRequestHandler { self.auth(header, &query_ctx).await?; - let _timer = timer!( - crate::metrics::METRIC_SERVER_GRPC_DB_REQUEST_TIMER, - &[(crate::metrics::METRIC_DB_LABEL, query_ctx.get_db_string())] - ); let handler = self.handler.clone(); + let request_type = request_type(&query); + let db = query_ctx.get_db_string(); + let timer = RequestTimer::new(db.clone(), request_type); // Executes requests in another runtime to // 1. prevent the execution from being cancelled unexpected by Tonic runtime; @@ -85,17 +91,9 @@ impl GreptimeRequestHandler { }) }); - let output = handle.await.map_err(|e| { - // logs the runtime join error. - logging::error!("Failed to join handle, err: {}", e); - - if e.is_cancelled() { - Status::cancelled(e.to_string()) - } else if e.is_panic() { - Status::internal(format!("{:?}", e.into_panic())) - } else { - Status::unknown(e.to_string()) - } + let output = handle.await.context(JoinTaskSnafu).map_err(|e| { + timer.record(e.status_code()); + e })??; Ok(output) } @@ -132,11 +130,8 @@ impl GreptimeRequestHandler { } .map_err(|e| { increment_counter!( - crate::metrics::METRIC_AUTH_FAILURE, - &[( - crate::metrics::METRIC_CODE_LABEL, - format!("{}", e.status_code()) - )] + METRIC_AUTH_FAILURE, + &[(METRIC_CODE_LABEL, format!("{}", e.status_code()))] ); Status::unauthenticated(e.to_string()) })?; @@ -165,3 +160,44 @@ pub(crate) fn create_query_context(header: Option<&RequestHeader>) -> QueryConte }; ctx } + +/// Histogram timer for handling gRPC request. +/// +/// The timer records the elapsed time with [StatusCode::Success] on drop. +struct RequestTimer { + start: Instant, + db: String, + request_type: &'static str, + status_code: StatusCode, +} + +impl RequestTimer { + /// Returns a new timer. + fn new(db: String, request_type: &'static str) -> RequestTimer { + RequestTimer { + start: Instant::now(), + db, + request_type, + status_code: StatusCode::Success, + } + } + + /// Consumes the timer and record the elapsed time with specific `status_code`. + fn record(mut self, status_code: StatusCode) { + self.status_code = status_code; + } +} + +impl Drop for RequestTimer { + fn drop(&mut self) { + histogram!( + METRIC_SERVER_GRPC_DB_REQUEST_TIMER, + self.start.elapsed(), + &[ + (METRIC_CODE_LABEL, std::mem::take(&mut self.db)), + (METRIC_TYPE_LABEL, self.request_type.to_string()), + (METRIC_STATUS_LABEL, self.status_code.to_string()) + ] + ); + } +} diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index a4efd8b751..2674e25c48 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -614,6 +614,7 @@ impl HttpServer { /// A middleware to record metrics for HTTP. // Based on https://github.com/tokio-rs/axum/blob/axum-v0.6.16/examples/prometheus-metrics/src/main.rs pub(crate) async fn track_metrics(req: Request, next: Next) -> impl IntoResponse { + let _timer = common_telemetry::timer!("http_track_metrics", &[("tag", "value")]); let start = Instant::now(); let path = if let Some(matched_path) = req.extensions().get::() { matched_path.as_str().to_owned() diff --git a/src/servers/src/metrics.rs b/src/servers/src/metrics.rs index b0f171e5c5..2ea3d0f3e7 100644 --- a/src/servers/src/metrics.rs +++ b/src/servers/src/metrics.rs @@ -12,11 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::task::{Context, Poll}; +use std::time::Instant; + +use hyper::Body; use metrics_process::Collector; use once_cell::sync::Lazy; +use tonic::body::BoxBody; +use tower::{Layer, Service}; pub(crate) const METRIC_DB_LABEL: &str = "db"; pub(crate) const METRIC_CODE_LABEL: &str = "code"; +pub(crate) const METRIC_TYPE_LABEL: &str = "type"; pub(crate) const METRIC_HTTP_SQL_ELAPSED: &str = "servers.http_sql_elapsed"; pub(crate) const METRIC_HTTP_PROMQL_ELAPSED: &str = "servers.http_promql_elapsed"; @@ -47,6 +54,8 @@ pub(crate) const METRIC_SERVER_GRPC_PROM_REQUEST_TIMER: &str = "servers.grpc.pro pub(crate) const METRIC_HTTP_REQUESTS_TOTAL: &str = "servers.http_requests_total"; pub(crate) const METRIC_HTTP_REQUESTS_ELAPSED: &str = "servers.http_requests_elapsed"; +pub(crate) const METRIC_GRPC_REQUESTS_TOTAL: &str = "servers.grpc_requests_total"; +pub(crate) const METRIC_GRPC_REQUESTS_ELAPSED: &str = "servers.grpc_requests_elapsed"; pub(crate) const METRIC_METHOD_LABEL: &str = "method"; pub(crate) const METRIC_PATH_LABEL: &str = "path"; pub(crate) const METRIC_STATUS_LABEL: &str = "status"; @@ -58,3 +67,61 @@ pub(crate) static PROCESS_COLLECTOR: Lazy = Lazy::new(|| { collector.describe(); collector }); + +// Based on https://github.com/hyperium/tonic/blob/master/examples/src/tower/server.rs +// See https://github.com/hyperium/tonic/issues/242 +/// A metrics middleware. +#[derive(Debug, Clone, Default)] +pub(crate) struct MetricsMiddlewareLayer; + +impl Layer for MetricsMiddlewareLayer { + type Service = MetricsMiddleware; + + fn layer(&self, service: S) -> Self::Service { + MetricsMiddleware { inner: service } + } +} + +#[derive(Debug, Clone)] +pub(crate) struct MetricsMiddleware { + inner: S, +} + +impl Service> for MetricsMiddleware +where + S: Service, Response = hyper::Response> + Clone + Send + 'static, + S::Future: Send + 'static, +{ + type Response = S::Response; + type Error = S::Error; + type Future = futures::future::BoxFuture<'static, Result>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, req: hyper::Request) -> Self::Future { + // This is necessary because tonic internally uses `tower::buffer::Buffer`. + // See https://github.com/tower-rs/tower/issues/547#issuecomment-767629149 + // for details on why this is necessary + let clone = self.inner.clone(); + let mut inner = std::mem::replace(&mut self.inner, clone); + + Box::pin(async move { + let start = Instant::now(); + let path = req.uri().path().to_string(); + + // Do extra async work here... + let response = inner.call(req).await?; + + let latency = start.elapsed().as_secs_f64(); + let status = response.status().as_u16().to_string(); + + let labels = [(METRIC_PATH_LABEL, path), (METRIC_STATUS_LABEL, status)]; + metrics::increment_counter!(METRIC_GRPC_REQUESTS_TOTAL, &labels); + metrics::histogram!(METRIC_GRPC_REQUESTS_ELAPSED, latency, &labels); + + Ok(response) + }) + } +}