feat: Add request type and result code to grpc metrics (#1664)

This commit is contained in:
Yingwen
2023-05-30 09:51:08 +08:00
committed by GitHub
parent 51b23664f7
commit 563ce59071
6 changed files with 214 additions and 26 deletions

View File

@@ -18,6 +18,10 @@ use datatypes::prelude::ConcreteDataType;
use datatypes::types::TimestampType;
use datatypes::value::Value;
use datatypes::vectors::VectorRef;
use greptime_proto::v1::ddl_request::Expr;
use greptime_proto::v1::greptime_request::Request;
use greptime_proto::v1::query_request::Query;
use greptime_proto::v1::{DdlRequest, QueryRequest};
use snafu::prelude::*;
use crate::error::{self, Result};
@@ -224,6 +228,38 @@ pub fn push_vals(column: &mut Column, origin_count: usize, vector: VectorRef) {
column.null_mask = null_mask.into_vec();
}
/// Returns the type name of the [Request].
pub fn request_type(request: &Request) -> &'static str {
match request {
Request::Insert(_) => "insert",
Request::Query(query_req) => query_request_type(query_req),
Request::Ddl(ddl_req) => ddl_request_type(ddl_req),
Request::Delete(_) => "delete",
}
}
/// Returns the type name of the [QueryRequest].
fn query_request_type(request: &QueryRequest) -> &'static str {
match request.query {
Some(Query::Sql(_)) => "query.sql",
Some(Query::LogicalPlan(_)) => "query.logical_plan",
Some(Query::PromRangeQuery(_)) => "query.prom_range",
None => "query.empty",
}
}
/// Returns the type name of the [DdlRequest].
fn ddl_request_type(request: &DdlRequest) -> &'static str {
match request.expr {
Some(Expr::CreateDatabase(_)) => "ddl.create_database",
Some(Expr::CreateTable(_)) => "ddl.create_table",
Some(Expr::Alter(_)) => "ddl.alter",
Some(Expr::DropTable(_)) => "ddl.drop_table",
Some(Expr::FlushTable(_)) => "ddl.flush_table",
None => "ddl.empty",
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;

View File

@@ -33,6 +33,8 @@ pub enum StatusCode {
Internal = 1003,
/// Invalid arguments.
InvalidArguments = 1004,
/// The task is cancelled.
Cancelled = 1005,
// ====== End of common status code ================
// ====== Begin of SQL related status code =========
@@ -100,6 +102,7 @@ impl StatusCode {
| StatusCode::Unsupported
| StatusCode::Unexpected
| StatusCode::InvalidArguments
| StatusCode::Cancelled
| StatusCode::InvalidSyntax
| StatusCode::PlanQuery
| StatusCode::EngineExecuteQuery
@@ -125,6 +128,7 @@ impl StatusCode {
| StatusCode::Unsupported
| StatusCode::Unexpected
| StatusCode::Internal
| StatusCode::Cancelled
| StatusCode::PlanQuery
| StatusCode::EngineExecuteQuery
| StatusCode::StorageUnavailable

View File

@@ -254,6 +254,12 @@ pub enum Error {
source: BoxedError,
location: Location,
},
#[snafu(display("Failed to join task, source: {}", source))]
JoinTask {
source: tokio::task::JoinError,
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -317,6 +323,16 @@ impl ErrorExt for Error {
Other { source, .. } => source.status_code(),
UnexpectedResult { .. } => StatusCode::Unexpected,
JoinTask { source, .. } => {
if source.is_cancelled() {
StatusCode::Cancelled
} else if source.is_panic() {
StatusCode::Unexpected
} else {
StatusCode::Unknown
}
}
}
}
@@ -325,13 +341,41 @@ impl ErrorExt for Error {
}
}
/// Returns the tonic [Code] of a [StatusCode].
fn status_to_tonic_code(status_code: StatusCode) -> Code {
match status_code {
StatusCode::Success => Code::Ok,
StatusCode::Unknown => Code::Unknown,
StatusCode::Unsupported => Code::Unimplemented,
StatusCode::Unexpected
| StatusCode::Internal
| StatusCode::PlanQuery
| StatusCode::EngineExecuteQuery => Code::Internal,
StatusCode::InvalidArguments | StatusCode::InvalidSyntax => Code::InvalidArgument,
StatusCode::Cancelled => Code::Cancelled,
StatusCode::TableAlreadyExists | StatusCode::TableColumnExists => Code::AlreadyExists,
StatusCode::TableNotFound
| StatusCode::TableColumnNotFound
| StatusCode::DatabaseNotFound
| StatusCode::UserNotFound => Code::NotFound,
StatusCode::StorageUnavailable => Code::Unavailable,
StatusCode::RuntimeResourcesExhausted => Code::ResourceExhausted,
StatusCode::UnsupportedPasswordType
| StatusCode::UserPasswordMismatch
| StatusCode::AuthHeaderNotFound
| StatusCode::InvalidAuthHeader => Code::Unauthenticated,
StatusCode::AccessDenied => Code::PermissionDenied,
}
}
impl From<Error> for tonic::Status {
fn from(err: Error) -> Self {
let mut headers = HeaderMap::<HeaderValue>::with_capacity(2);
// If either of the status_code or error msg cannot convert to valid HTTP header value
// (which is a very rare case), just ignore. Client will use Tonic status code and message.
if let Ok(code) = HeaderValue::from_bytes(err.status_code().to_string().as_bytes()) {
let status_code = err.status_code();
if let Ok(code) = HeaderValue::from_bytes(status_code.to_string().as_bytes()) {
headers.insert(INNER_ERROR_CODE, code);
}
let root_error = err.iter_chain().last().unwrap();
@@ -340,7 +384,7 @@ impl From<Error> for tonic::Status {
}
let metadata = MetadataMap::from_headers(headers);
tonic::Status::with_metadata(Code::Internal, err.to_string(), metadata)
tonic::Status::with_metadata(status_to_tonic_code(status_code), err.to_string(), metadata)
}
}

View File

@@ -13,22 +13,29 @@
// limitations under the License.
use std::sync::Arc;
use std::time::Instant;
use api::helper::request_type;
use api::v1::auth_header::AuthScheme;
use api::v1::{Basic, GreptimeRequest, RequestHeader};
use common_error::prelude::ErrorExt;
use common_error::status_code::StatusCode;
use common_query::Output;
use common_runtime::Runtime;
use common_telemetry::{logging, timer};
use metrics::increment_counter;
use common_telemetry::logging;
use metrics::{histogram, increment_counter};
use session::context::{QueryContext, QueryContextRef};
use snafu::OptionExt;
use snafu::{OptionExt, ResultExt};
use tonic::Status;
use crate::auth::{Identity, Password, UserProviderRef};
use crate::error::Error::{Auth, UnsupportedAuthScheme};
use crate::error::{InvalidQuerySnafu, NotFoundAuthHeaderSnafu};
use crate::error::{InvalidQuerySnafu, JoinTaskSnafu, NotFoundAuthHeaderSnafu};
use crate::grpc::TonicResult;
use crate::metrics::{
METRIC_AUTH_FAILURE, METRIC_CODE_LABEL, METRIC_SERVER_GRPC_DB_REQUEST_TIMER,
METRIC_STATUS_LABEL, METRIC_TYPE_LABEL,
};
use crate::query_handler::grpc::ServerGrpcQueryHandlerRef;
pub struct GreptimeRequestHandler {
@@ -60,11 +67,10 @@ impl GreptimeRequestHandler {
self.auth(header, &query_ctx).await?;
let _timer = timer!(
crate::metrics::METRIC_SERVER_GRPC_DB_REQUEST_TIMER,
&[(crate::metrics::METRIC_DB_LABEL, query_ctx.get_db_string())]
);
let handler = self.handler.clone();
let request_type = request_type(&query);
let db = query_ctx.get_db_string();
let timer = RequestTimer::new(db.clone(), request_type);
// Executes requests in another runtime to
// 1. prevent the execution from being cancelled unexpected by Tonic runtime;
@@ -85,17 +91,9 @@ impl GreptimeRequestHandler {
})
});
let output = handle.await.map_err(|e| {
// logs the runtime join error.
logging::error!("Failed to join handle, err: {}", e);
if e.is_cancelled() {
Status::cancelled(e.to_string())
} else if e.is_panic() {
Status::internal(format!("{:?}", e.into_panic()))
} else {
Status::unknown(e.to_string())
}
let output = handle.await.context(JoinTaskSnafu).map_err(|e| {
timer.record(e.status_code());
e
})??;
Ok(output)
}
@@ -132,11 +130,8 @@ impl GreptimeRequestHandler {
}
.map_err(|e| {
increment_counter!(
crate::metrics::METRIC_AUTH_FAILURE,
&[(
crate::metrics::METRIC_CODE_LABEL,
format!("{}", e.status_code())
)]
METRIC_AUTH_FAILURE,
&[(METRIC_CODE_LABEL, format!("{}", e.status_code()))]
);
Status::unauthenticated(e.to_string())
})?;
@@ -165,3 +160,44 @@ pub(crate) fn create_query_context(header: Option<&RequestHeader>) -> QueryConte
};
ctx
}
/// Histogram timer for handling gRPC request.
///
/// The timer records the elapsed time with [StatusCode::Success] on drop.
struct RequestTimer {
start: Instant,
db: String,
request_type: &'static str,
status_code: StatusCode,
}
impl RequestTimer {
/// Returns a new timer.
fn new(db: String, request_type: &'static str) -> RequestTimer {
RequestTimer {
start: Instant::now(),
db,
request_type,
status_code: StatusCode::Success,
}
}
/// Consumes the timer and record the elapsed time with specific `status_code`.
fn record(mut self, status_code: StatusCode) {
self.status_code = status_code;
}
}
impl Drop for RequestTimer {
fn drop(&mut self) {
histogram!(
METRIC_SERVER_GRPC_DB_REQUEST_TIMER,
self.start.elapsed(),
&[
(METRIC_CODE_LABEL, std::mem::take(&mut self.db)),
(METRIC_TYPE_LABEL, self.request_type.to_string()),
(METRIC_STATUS_LABEL, self.status_code.to_string())
]
);
}
}

View File

@@ -614,6 +614,7 @@ impl HttpServer {
/// A middleware to record metrics for HTTP.
// Based on https://github.com/tokio-rs/axum/blob/axum-v0.6.16/examples/prometheus-metrics/src/main.rs
pub(crate) async fn track_metrics<B>(req: Request<B>, next: Next<B>) -> impl IntoResponse {
let _timer = common_telemetry::timer!("http_track_metrics", &[("tag", "value")]);
let start = Instant::now();
let path = if let Some(matched_path) = req.extensions().get::<MatchedPath>() {
matched_path.as_str().to_owned()

View File

@@ -12,11 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::task::{Context, Poll};
use std::time::Instant;
use hyper::Body;
use metrics_process::Collector;
use once_cell::sync::Lazy;
use tonic::body::BoxBody;
use tower::{Layer, Service};
pub(crate) const METRIC_DB_LABEL: &str = "db";
pub(crate) const METRIC_CODE_LABEL: &str = "code";
pub(crate) const METRIC_TYPE_LABEL: &str = "type";
pub(crate) const METRIC_HTTP_SQL_ELAPSED: &str = "servers.http_sql_elapsed";
pub(crate) const METRIC_HTTP_PROMQL_ELAPSED: &str = "servers.http_promql_elapsed";
@@ -47,6 +54,8 @@ pub(crate) const METRIC_SERVER_GRPC_PROM_REQUEST_TIMER: &str = "servers.grpc.pro
pub(crate) const METRIC_HTTP_REQUESTS_TOTAL: &str = "servers.http_requests_total";
pub(crate) const METRIC_HTTP_REQUESTS_ELAPSED: &str = "servers.http_requests_elapsed";
pub(crate) const METRIC_GRPC_REQUESTS_TOTAL: &str = "servers.grpc_requests_total";
pub(crate) const METRIC_GRPC_REQUESTS_ELAPSED: &str = "servers.grpc_requests_elapsed";
pub(crate) const METRIC_METHOD_LABEL: &str = "method";
pub(crate) const METRIC_PATH_LABEL: &str = "path";
pub(crate) const METRIC_STATUS_LABEL: &str = "status";
@@ -58,3 +67,61 @@ pub(crate) static PROCESS_COLLECTOR: Lazy<Collector> = Lazy::new(|| {
collector.describe();
collector
});
// Based on https://github.com/hyperium/tonic/blob/master/examples/src/tower/server.rs
// See https://github.com/hyperium/tonic/issues/242
/// A metrics middleware.
#[derive(Debug, Clone, Default)]
pub(crate) struct MetricsMiddlewareLayer;
impl<S> Layer<S> for MetricsMiddlewareLayer {
type Service = MetricsMiddleware<S>;
fn layer(&self, service: S) -> Self::Service {
MetricsMiddleware { inner: service }
}
}
#[derive(Debug, Clone)]
pub(crate) struct MetricsMiddleware<S> {
inner: S,
}
impl<S> Service<hyper::Request<Body>> for MetricsMiddleware<S>
where
S: Service<hyper::Request<Body>, Response = hyper::Response<BoxBody>> + Clone + Send + 'static,
S::Future: Send + 'static,
{
type Response = S::Response;
type Error = S::Error;
type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, req: hyper::Request<Body>) -> Self::Future {
// This is necessary because tonic internally uses `tower::buffer::Buffer`.
// See https://github.com/tower-rs/tower/issues/547#issuecomment-767629149
// for details on why this is necessary
let clone = self.inner.clone();
let mut inner = std::mem::replace(&mut self.inner, clone);
Box::pin(async move {
let start = Instant::now();
let path = req.uri().path().to_string();
// Do extra async work here...
let response = inner.call(req).await?;
let latency = start.elapsed().as_secs_f64();
let status = response.status().as_u16().to_string();
let labels = [(METRIC_PATH_LABEL, path), (METRIC_STATUS_LABEL, status)];
metrics::increment_counter!(METRIC_GRPC_REQUESTS_TOTAL, &labels);
metrics::histogram!(METRIC_GRPC_REQUESTS_ELAPSED, latency, &labels);
Ok(response)
})
}
}