feat: add metrics for protocol interfaces (#1495)

* feat: add metrics for various interfaces

* feat: add db label for protocols

* feat: add postgres protocol metrics

* feat: add metrics for grpc apis

* feat: add auth failure counter for mysql/pg

* fix: add db label to grpc prometheus interface

* Apply suggestions from code review

Co-authored-by: dennis zhuang <killme2008@gmail.com>

* feat: add error code for auth failure counter

* fix: use schema as dbname when catalog is default

---------

Co-authored-by: dennis zhuang <killme2008@gmail.com>
Author: Ning Sun
Date: 2023-04-28 15:42:35 +00:00 (committed via GitHub)
Parent: 0b0b5a10da
Commit: 7dbac89000
14 changed files with 244 additions and 22 deletions
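
Taken together, the hunks below apply one small pattern per protocol: a request timer labelled with the target database (`db`), an auth-failure counter labelled with the error's status code (`code`), and a plain gauge for live connections. Below is a minimal standalone sketch of that pattern, assuming the `metrics` facade crate whose macros the diff imports; the metric names and the hard-coded code value are illustrative only, and in the real code the timing is done by the RAII `common_telemetry::timer!` macro rather than by hand.

```rust
use std::time::Instant;

use metrics::{decrement_gauge, histogram, increment_counter, increment_gauge};

fn handle_one_query(db: &str, authed: bool) {
    // Live-connection gauge, as added to the MySQL/Postgres servers below.
    increment_gauge!("servers.example_connection_count", 1.0);
    let start = Instant::now();

    if !authed {
        // Auth failures are counted with the error's status code as a label;
        // "7003" here is just a placeholder value.
        increment_counter!("servers.auth_failure_count", "code" => "7003");
    } else {
        // ... execute the query ...
    }

    // Per-request latency, labelled by database. The diff uses the RAII
    // `timer!` macro for this; recording a histogram by hand is the same idea.
    histogram!(
        "servers.example_query_elapsed",
        start.elapsed().as_secs_f64(),
        "db" => db.to_string()
    );

    decrement_gauge!("servers.example_connection_count", 1.0);
}

fn main() {
    // Without a recorder installed the macros are no-ops, so this runs as-is;
    // a recorder/exporter would be installed at server startup.
    handle_one_query("public", true);
    handle_one_query("public", false);
}
```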

View File

@@ -16,8 +16,11 @@ use std::sync::Arc;
use api::v1::auth_header::AuthScheme;
use api::v1::{Basic, GreptimeRequest, RequestHeader};
use common_error::prelude::ErrorExt;
use common_query::Output;
use common_runtime::Runtime;
use common_telemetry::timer;
use metrics::increment_counter;
use session::context::{QueryContext, QueryContextRef};
use snafu::OptionExt;
use tonic::Status;
@@ -57,6 +60,10 @@ impl GreptimeRequestHandler {
self.auth(header, &query_ctx).await?;
let _timer = timer!(
crate::metrics::METRIC_SERVER_GRPC_DB_REQUEST_TIMER,
&[(crate::metrics::METRIC_DB_LABEL, &query_ctx.get_db_string())]
);
let handler = self.handler.clone();
// Executes requests in another runtime to
@@ -112,7 +119,16 @@ impl GreptimeRequestHandler {
name: "Token AuthScheme".to_string(),
}),
}
-.map_err(|e| Status::unauthenticated(e.to_string()))?;
.map_err(|e| {
increment_counter!(
crate::metrics::METRIC_AUTH_FAILURE,
&[(
crate::metrics::METRIC_CODE_LABEL,
format!("{}", e.status_code())
)]
);
Status::unauthenticated(e.to_string())
})?;
Ok(())
}
}

View File

@@ -19,6 +19,7 @@ use api::v1::prometheus_gateway_server::PrometheusGateway;
use api::v1::promql_request::Promql;
use api::v1::{PromqlRequest, PromqlResponse, ResponseHeader};
use async_trait::async_trait;
use common_telemetry::timer;
use common_time::util::current_time_rfc3339;
use query::parser::PromQuery;
use snafu::OptionExt;
@@ -62,6 +63,13 @@ impl PrometheusGateway for PrometheusGatewayService {
};
let query_context = create_query_context(inner.header.as_ref());
let _timer = timer!(
crate::metrics::METRIC_SERVER_GRPC_PROM_REQUEST_TIMER,
&[(
crate::metrics::METRIC_DB_LABEL,
&query_context.get_db_string()
)]
);
let result = self.handler.do_query(&prom_query, query_context).await;
let (metric_name, result_type) =
retrieve_metric_name_and_result_type(&prom_query.query).unwrap_or_default();

View File

@@ -16,9 +16,11 @@ use std::marker::PhantomData;
use axum::http::{self, Request, StatusCode};
use axum::response::Response;
-use common_telemetry::error;
use common_error::prelude::ErrorExt;
use common_telemetry::warn;
use futures::future::BoxFuture;
use http_body::Body;
use metrics::increment_counter;
use session::context::UserInfo;
use snafu::{ensure, OptionExt, ResultExt};
use tower_http::auth::AsyncAuthorizeRequest;
@@ -80,7 +82,14 @@ where
let (username, password) = match extract_username_and_password(&request) {
Ok((username, password)) => (username, password),
Err(e) => {
error!("extract username and password failed: {}", e);
warn!("extract username and password failed: {}", e);
increment_counter!(
crate::metrics::METRIC_AUTH_FAILURE,
&[(
crate::metrics::METRIC_CODE_LABEL,
format!("{}", e.status_code())
)]
);
return Err(unauthorized_resp());
}
};
@@ -88,7 +97,14 @@ where
let (catalog, schema) = match extract_catalog_and_schema(&request) {
Ok((catalog, schema)) => (catalog, schema),
Err(e) => {
error!("extract catalog and schema failed: {}", e);
warn!("extract catalog and schema failed: {}", e);
increment_counter!(
crate::metrics::METRIC_AUTH_FAILURE,
&[(
crate::metrics::METRIC_CODE_LABEL,
format!("{}", e.status_code())
)]
);
return Err(unauthorized_resp());
}
};
@@ -107,7 +123,14 @@ where
Ok(request)
}
Err(e) => {
error!("authenticate failed: {}", e);
warn!("authenticate failed: {}", e);
increment_counter!(
crate::metrics::METRIC_AUTH_FAILURE,
&[(
crate::metrics::METRIC_CODE_LABEL,
format!("{}", e.status_code())
)]
);
Err(unauthorized_resp())
}
}
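
The same counter bump appears in all three error branches of this middleware. One way to keep the call sites shorter is a small helper over the `ErrorExt` trait the diff already imports; this is a sketch only, not something this commit does:

```rust
use common_error::prelude::ErrorExt;
use metrics::increment_counter;

/// Counts an authentication failure, labelled with the error's status code.
fn count_auth_failure(e: &impl ErrorExt) {
    increment_counter!(
        crate::metrics::METRIC_AUTH_FAILURE,
        &[(
            crate::metrics::METRIC_CODE_LABEL,
            format!("{}", e.status_code())
        )]
    );
}
```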

View File

@@ -43,13 +43,15 @@ pub async fn sql(
_user_info: Extension<UserInfo>,
Form(form_params): Form<SqlQuery>,
) -> Json<JsonResponse> {
-let _timer = timer!(crate::metrics::METRIC_HTTP_SQL_ELAPSED);
let sql_handler = &state.sql_handler;
let start = Instant::now();
let sql = query_params.sql.or(form_params.sql);
let db = query_params.db.or(form_params.db);
let _timer = timer!(
crate::metrics::METRIC_HTTP_SQL_ELAPSED,
&[(crate::metrics::METRIC_DB_LABEL, db.as_deref().unwrap_or(""))]
);
let resp = if let Some(sql) = &sql {
match crate::http::query_context_from_db(sql_handler.clone(), db).await {
@@ -96,11 +98,14 @@ pub async fn promql(
// TODO(fys): pass _user_info into query context
_user_info: Extension<UserInfo>,
) -> Json<JsonResponse> {
-let _timer = timer!(crate::metrics::METRIC_HTTP_PROMQL_ELAPSED);
let sql_handler = &state.sql_handler;
let exec_start = Instant::now();
let db = params.db.clone();
let _timer = timer!(
crate::metrics::METRIC_HTTP_PROMQL_ELAPSED,
&[(crate::metrics::METRIC_DB_LABEL, db.as_deref().unwrap_or(""))]
);
let prom_query = params.into();
let resp = match super::query_context_from_db(sql_handler.clone(), db).await {
Ok(query_ctx) => {

View File

@@ -20,6 +20,7 @@ use axum::http::StatusCode;
use axum::response::IntoResponse;
use common_catalog::consts::DEFAULT_SCHEMA_NAME;
use common_grpc::writer::Precision;
use common_telemetry::timer;
use session::context::QueryContext;
use crate::error::{Result, TimePrecisionSnafu};
@@ -48,6 +49,10 @@ pub async fn influxdb_write(
let db = params
.remove("db")
.unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string());
let _timer = timer!(
crate::metrics::METRIC_HTTP_INFLUXDB_WRITE_ELAPSED,
&[(crate::metrics::METRIC_DB_LABEL, &db)]
);
let (catalog, schema) = parse_catalog_and_schema_from_client_database_name(&db);
let ctx = Arc::new(QueryContext::with(catalog, schema));

View File

@@ -19,6 +19,7 @@ use axum::extract::{Query, RawBody, State};
use axum::http::{header, StatusCode};
use axum::response::IntoResponse;
use common_catalog::consts::DEFAULT_SCHEMA_NAME;
use common_telemetry::timer;
use hyper::Body;
use prost::Message;
use schemars::JsonSchema;
@@ -52,6 +53,13 @@ pub async fn remote_write(
) -> Result<(StatusCode, ())> {
let request = decode_remote_write_request(body).await?;
let _timer = timer!(
crate::metrics::METRIC_HTTP_PROMETHEUS_WRITE_ELAPSED,
&[(
crate::metrics::METRIC_DB_LABEL,
params.db.as_deref().unwrap_or("")
)]
);
let ctx = if let Some(db) = params.db {
let (catalog, schema) = parse_catalog_and_schema_from_client_database_name(&db);
Arc::new(QueryContext::with(catalog, schema))
@@ -85,6 +93,13 @@ pub async fn remote_read(
) -> Result<PrometheusResponse> {
let request = decode_remote_read_request(body).await?;
let _timer = timer!(
crate::metrics::METRIC_HTTP_PROMETHEUS_READ_ELAPSED,
&[(
crate::metrics::METRIC_DB_LABEL,
params.db.as_deref().unwrap_or("")
)]
);
let ctx = if let Some(db) = params.db {
let (catalog, schema) = parse_catalog_and_schema_from_client_database_name(&db);
Arc::new(QueryContext::with(catalog, schema))

View File

@@ -12,5 +12,32 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) const METRIC_DB_LABEL: &str = "db";
pub(crate) const METRIC_CODE_LABEL: &str = "code";
pub(crate) const METRIC_HTTP_SQL_ELAPSED: &str = "servers.http_sql_elapsed";
pub(crate) const METRIC_HTTP_PROMQL_ELAPSED: &str = "servers.http_promql_elapsed";
pub(crate) const METRIC_AUTH_FAILURE: &str = "servers.auth_failure_count";
pub(crate) const METRIC_HTTP_INFLUXDB_WRITE_ELAPSED: &str = "servers.http_influxdb_write_elapsed";
pub(crate) const METRIC_HTTP_PROMETHEUS_WRITE_ELAPSED: &str =
"servers.http_prometheus_write_elapsed";
pub(crate) const METRIC_HTTP_PROMETHEUS_READ_ELAPSED: &str = "servers.http_prometheus_read_elapsed";
pub(crate) const METRIC_TCP_OPENTSDB_LINE_WRITE_ELAPSED: &str =
"servers.opentsdb_line_write_elapsed";
pub(crate) const METRIC_MYSQL_CONNECTIONS: &str = "servers.mysql_connection_count";
pub(crate) const METRIC_MYSQL_QUERY_TIMER: &str = "servers.mysql_query_elapsed";
pub(crate) const METRIC_MYSQL_SUBPROTOCOL_LABEL: &str = "subprotocol";
pub(crate) const METRIC_MYSQL_BINQUERY: &str = "binquery";
pub(crate) const METRIC_MYSQL_TEXTQUERY: &str = "textquery";
pub(crate) const METRIC_MYSQL_PREPARED_COUNT: &str = "servers.mysql_prepared_count";
pub(crate) const METRIC_POSTGRES_CONNECTIONS: &str = "servers.postgres_connection_count";
pub(crate) const METRIC_POSTGRES_QUERY_TIMER: &str = "servers.postgres_query_elapsed";
pub(crate) const METRIC_POSTGRES_SUBPROTOCOL_LABEL: &str = "subprotocol";
pub(crate) const METRIC_POSTGRES_SIMPLE_QUERY: &str = "simple";
pub(crate) const METRIC_POSTGRES_EXTENDED_QUERY: &str = "extended";
pub(crate) const METRIC_POSTGRES_PREPARED_COUNT: &str = "servers.postgres_prepared_count";
pub(crate) const METRIC_SERVER_GRPC_DB_REQUEST_TIMER: &str = "servers.grpc.db_request_elapsed";
pub(crate) const METRIC_SERVER_GRPC_PROM_REQUEST_TIMER: &str = "servers.grpc.prom_request_elapsed";
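
These constants are the single place where the metric names and label keys are spelled out; every handler in the other files references them. How the series become scrape-able is outside this commit. As a rough illustration, assuming the `metrics-exporter-prometheus` crate (an assumption for this sketch, not necessarily what GreptimeDB wires up), installing a recorder is enough for the series to appear, with dots in names typically sanitized to underscores (`servers.mysql_query_elapsed` becomes `servers_mysql_query_elapsed`):

```rust
use metrics_exporter_prometheus::PrometheusBuilder;

fn main() {
    // Hypothetical setup, not part of this commit: install a global recorder
    // plus an HTTP scrape endpoint with the builder's defaults.
    PrometheusBuilder::new()
        .install()
        .expect("failed to install Prometheus recorder");

    // Any of the names above can then be recorded as usual, e.g.:
    metrics::increment_counter!("servers.mysql_prepared_count", "db" => "public");
}
```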

View File

@@ -20,9 +20,11 @@ use std::time::{Duration, Instant};
use async_trait::async_trait;
use chrono::{NaiveDate, NaiveDateTime};
use common_error::prelude::ErrorExt;
use common_query::Output;
use common_telemetry::tracing::log;
-use common_telemetry::{error, trace};
use common_telemetry::{error, timer, trace, warn};
use metrics::increment_counter;
use opensrv_mysql::{
AsyncMysqlShim, Column, ColumnFlags, ColumnType, ErrorKind, InitWriter, ParamParser,
ParamValue, QueryResultWriter, StatementMetaWriter, ValueInner,
@@ -154,7 +156,8 @@ impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for MysqlInstanceShi
user_info = Some(userinfo);
}
Err(e) => {
error!("Failed to auth, err: {:?}", e);
increment_counter!(crate::metrics::METRIC_AUTH_FAILURE);
warn!("Failed to auth, err: {:?}", e);
return false;
}
};
@@ -182,6 +185,13 @@ impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for MysqlInstanceShi
let params = dummy_params(param_num);
w.reply(stmt_id, &params, &[]).await?;
increment_counter!(
crate::metrics::METRIC_MYSQL_PREPARED_COUNT,
&[(
crate::metrics::METRIC_DB_LABEL,
self.session.context().get_db_string()
)]
);
return Ok(());
}
@@ -191,6 +201,19 @@ impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for MysqlInstanceShi
p: ParamParser<'a>,
w: QueryResultWriter<'a, W>,
) -> Result<()> {
let _timer = timer!(
crate::metrics::METRIC_MYSQL_QUERY_TIMER,
&[
(
crate::metrics::METRIC_MYSQL_SUBPROTOCOL_LABEL,
crate::metrics::METRIC_MYSQL_BINQUERY
),
(
crate::metrics::METRIC_DB_LABEL,
&self.session.context().get_db_string()
)
]
);
let params: Vec<ParamValue> = p.into_iter().collect();
let query = match self.query(stmt_id) {
None => {
@@ -226,6 +249,19 @@ impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for MysqlInstanceShi
query: &'a str,
writer: QueryResultWriter<'a, W>,
) -> Result<()> {
let _timer = timer!(
crate::metrics::METRIC_MYSQL_QUERY_TIMER,
&[
(
crate::metrics::METRIC_MYSQL_SUBPROTOCOL_LABEL,
crate::metrics::METRIC_MYSQL_TEXTQUERY
),
(
crate::metrics::METRIC_DB_LABEL,
&self.session.context().get_db_string()
)
]
);
let outputs = self.do_query(query).await;
writer::write_output(writer, query, outputs).await?;
Ok(())
@@ -242,6 +278,13 @@ impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for MysqlInstanceShi
if let Some(schema_validator) = &self.user_provider {
if let Err(e) = schema_validator.authorize(catalog, schema, user_info).await {
increment_counter!(
crate::metrics::METRIC_AUTH_FAILURE,
&[(
crate::metrics::METRIC_CODE_LABEL,
format!("{}", e.status_code())
)]
);
return w
.error(
ErrorKind::ER_DBACCESS_DENIED_ERROR,

View File

@@ -20,6 +20,7 @@ use async_trait::async_trait;
use common_runtime::Runtime;
use common_telemetry::logging::{info, warn};
use futures::StreamExt;
use metrics::{decrement_gauge, increment_gauge};
use opensrv_mysql::{
plain_run_with_options, secure_run_with_options, AsyncMysqlIntermediary, IntermediaryOptions,
};
@@ -155,12 +156,14 @@ impl MysqlServer {
) -> Result<()> {
info!("MySQL connection coming from: {}", stream.peer_addr()?);
io_runtime.spawn(async move {
increment_gauge!(crate::metrics::METRIC_MYSQL_CONNECTIONS, 1.0);
// TODO(LFC): Use `output_stream` to write large MySQL ResultSet to client.
if let Err(e) = Self::do_handle(stream, spawn_ref, spawn_config).await {
// TODO(LFC): Write this error to client as well, in MySQL text protocol.
// Looks like we have to expose opensrv-mysql's `PacketWriter`?
warn!("Internal error occurred during query exec, server actively close the channel to let client try next time: {}.", e)
}
decrement_gauge!(crate::metrics::METRIC_MYSQL_CONNECTIONS, 1.0);
});
Ok(())
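
Here the gauge is incremented at the top of the spawned task and decremented after `do_handle` returns. A drop-guard variant of the same idea (a sketch, not part of this commit) also covers early returns from the task body:

```rust
use metrics::{decrement_gauge, increment_gauge};

/// Bumps the MySQL connection gauge on creation and drops it back on `Drop`.
struct MysqlConnectionGauge;

impl MysqlConnectionGauge {
    fn open() -> Self {
        increment_gauge!("servers.mysql_connection_count", 1.0);
        MysqlConnectionGauge
    }
}

impl Drop for MysqlConnectionGauge {
    fn drop(&mut self) {
        decrement_gauge!("servers.mysql_connection_count", 1.0);
    }
}

fn main() {
    let _conn = MysqlConnectionGauge::open();
    // ... handle the connection; the gauge is decremented when `_conn` drops.
}
```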

View File

@@ -14,6 +14,7 @@
//! Modified from Tokio's mini-redis example.
use common_telemetry::timer;
use session::context::QueryContext;
use tokio::io::{AsyncRead, AsyncWrite};
@@ -91,6 +92,7 @@ impl<S: AsyncWrite + AsyncRead + Unpin> Handler<S> {
match DataPoint::try_create(&line) {
Ok(data_point) => {
let _timer = timer!(crate::metrics::METRIC_TCP_OPENTSDB_LINE_WRITE_ELAPSED);
let result = self.query_handler.exec(&data_point, ctx.clone()).await;
if let Err(e) = result {
self.connection.write_line(e.to_string()).await?;

View File

@@ -15,7 +15,9 @@
use std::fmt::Debug;
use async_trait::async_trait;
use common_error::prelude::ErrorExt;
use futures::{Sink, SinkExt};
use metrics::increment_counter;
use pgwire::api::auth::StartupHandler;
use pgwire::api::{auth, ClientInfo, PgWireConnectionState};
use pgwire::error::{ErrorInfo, PgWireError, PgWireResult};
@@ -87,16 +89,26 @@ impl PgLoginVerifier {
None => return Ok(false),
};
-let _user_info = user_provider
if let Err(e) = user_provider
.auth(
Identity::UserId(user_name, None),
Password::PlainText(password),
catalog,
schema,
)
-.await?;
-Ok(true)
.await
{
increment_counter!(
crate::metrics::METRIC_AUTH_FAILURE,
&[(
crate::metrics::METRIC_CODE_LABEL,
format!("{}", e.status_code())
)]
);
Err(e.into())
} else {
Ok(true)
}
}
}

View File

@@ -19,9 +19,11 @@ use async_trait::async_trait;
use common_query::Output;
use common_recordbatch::error::Result as RecordBatchResult;
use common_recordbatch::RecordBatch;
use common_telemetry::timer;
use datatypes::prelude::{ConcreteDataType, Value};
use datatypes::schema::{Schema, SchemaRef};
use futures::{future, stream, Stream, StreamExt};
use metrics::increment_counter;
use pgwire::api::portal::{Format, Portal};
use pgwire::api::query::{ExtendedQueryHandler, SimpleQueryHandler, StatementOrPortal};
use pgwire::api::results::{
@@ -44,6 +46,19 @@ impl SimpleQueryHandler for PostgresServerHandler {
where
C: ClientInfo + Unpin + Send + Sync,
{
let _timer = timer!(
crate::metrics::METRIC_POSTGRES_QUERY_TIMER,
&[
(
crate::metrics::METRIC_POSTGRES_SUBPROTOCOL_LABEL,
crate::metrics::METRIC_POSTGRES_SIMPLE_QUERY
),
(
crate::metrics::METRIC_DB_LABEL,
&self.query_ctx.get_db_string()
)
]
);
let outputs = self
.query_handler
.do_query(query, self.query_ctx.clone())
@@ -244,6 +259,7 @@ impl QueryParser for POCQueryParser {
type Statement = (Statement, String);
fn parse_sql(&self, sql: &str, types: &[Type]) -> PgWireResult<Self::Statement> {
increment_counter!(crate::metrics::METRIC_POSTGRES_PREPARED_COUNT);
let mut stmts = ParserContext::create_with_dialect(sql, &GenericDialect {})
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;
if stmts.len() != 1 {
@@ -336,9 +352,23 @@ impl ExtendedQueryHandler for PostgresServerHandler {
where
C: ClientInfo + Unpin + Send + Sync,
{
let _timer = timer!(
crate::metrics::METRIC_POSTGRES_QUERY_TIMER,
&[
(
crate::metrics::METRIC_POSTGRES_SUBPROTOCOL_LABEL,
crate::metrics::METRIC_POSTGRES_EXTENDED_QUERY
),
(
crate::metrics::METRIC_DB_LABEL,
&self.query_ctx.get_db_string()
)
]
);
let (_, sql) = portal.statement().statement();
// manually replace variables in prepared statement
// FIXME(sunng87)
let mut sql = sql.clone();
for i in 0..portal.parameter_len() {
sql = sql.replace(&format!("${}", i + 1), &parameter_to_string(portal, i)?);

View File

@@ -21,6 +21,7 @@ use common_runtime::Runtime;
use common_telemetry::logging::error;
use common_telemetry::{debug, warn};
use futures::StreamExt;
use metrics::{decrement_gauge, increment_gauge};
use pgwire::api::MakeHandler;
use pgwire::tokio::process_socket;
use tokio;
@@ -83,13 +84,19 @@ impl PostgresServer {
Err(e) => warn!("Failed to get PostgreSQL client addr, err: {}", e),
}
-io_runtime.spawn(process_socket(
-io_stream,
-tls_acceptor.clone(),
-handler.clone(),
-handler.clone(),
-handler,
-));
io_runtime.spawn(async move {
increment_gauge!(crate::metrics::METRIC_POSTGRES_CONNECTIONS, 1.0);
let r = process_socket(
io_stream,
tls_acceptor.clone(),
handler.clone(),
handler.clone(),
handler,
)
.await;
decrement_gauge!(crate::metrics::METRIC_POSTGRES_CONNECTIONS, 1.0);
r
});
}
};
}

View File

@@ -92,6 +92,16 @@ impl QueryContext {
)
}
}
pub fn get_db_string(&self) -> String {
let catalog = self.current_catalog();
let schema = self.current_schema();
if catalog == DEFAULT_CATALOG_NAME {
schema
} else {
format!("{catalog}-{schema}")
}
}
}
pub const DEFAULT_USERNAME: &str = "greptime";
@@ -148,6 +158,7 @@ pub enum Channel {
#[cfg(test)]
mod test {
use super::*;
use crate::context::{Channel, UserInfo};
use crate::Session;
@@ -167,4 +178,19 @@ mod test {
);
assert_eq!(session.conn_info().client_host.port(), 9000);
}
#[test]
fn test_context_db_string() {
let context = QueryContext::new();
context.set_current_catalog("a0b1c2d3");
context.set_current_schema("test");
assert_eq!("a0b1c2d3-test", context.get_db_string());
context.set_current_catalog(DEFAULT_CATALOG_NAME);
context.set_current_schema("test");
assert_eq!("test", context.get_db_string());
}
}