mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-03 20:02:54 +00:00
chore: add metrics for log ingestion (#4411)
* chore: add metrics for log ingestion * chore: record result as well
This commit is contained in:
@@ -24,6 +24,7 @@ use axum::http::header::CONTENT_TYPE;
|
||||
use axum::http::{Request, StatusCode};
|
||||
use axum::response::{IntoResponse, Response};
|
||||
use axum::{async_trait, BoxError, Extension, TypedHeader};
|
||||
use common_query::{Output, OutputData};
|
||||
use common_telemetry::{error, warn};
|
||||
use pipeline::error::{CastTypeSnafu, PipelineTransformSnafu};
|
||||
use pipeline::util::to_pipeline_version;
|
||||
@@ -40,6 +41,10 @@ use crate::error::{
|
||||
use crate::http::greptime_manage_resp::GreptimedbManageResponse;
|
||||
use crate::http::greptime_result_v1::GreptimedbV1Response;
|
||||
use crate::http::HttpResponse;
|
||||
use crate::metrics::{
|
||||
METRIC_FAILURE_VALUE, METRIC_HTTP_LOGS_INGESTION_COUNTER, METRIC_HTTP_LOGS_INGESTION_ELAPSED,
|
||||
METRIC_HTTP_LOGS_TRANSFORM_ELAPSED, METRIC_SUCCESS_VALUE,
|
||||
};
|
||||
use crate::query_handler::LogHandlerRef;
|
||||
|
||||
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
|
||||
@@ -298,14 +303,27 @@ async fn ingest_logs_inner(
|
||||
pipeline_data: PipelineValue,
|
||||
query_ctx: QueryContextRef,
|
||||
) -> Result<HttpResponse> {
|
||||
let start = std::time::Instant::now();
|
||||
let db = query_ctx.get_db_string();
|
||||
let exec_timer = std::time::Instant::now();
|
||||
|
||||
let pipeline = state
|
||||
.get_pipeline(&pipeline_name, version, query_ctx.clone())
|
||||
.await?;
|
||||
|
||||
let transform_timer = std::time::Instant::now();
|
||||
let transformed_data: Rows = pipeline
|
||||
.exec(pipeline_data)
|
||||
.map_err(|reason| PipelineTransformSnafu { reason }.build())
|
||||
.inspect(|_| {
|
||||
METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
|
||||
.with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE])
|
||||
.observe(transform_timer.elapsed().as_secs_f64());
|
||||
})
|
||||
.map_err(|reason| {
|
||||
METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
|
||||
.with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
|
||||
.observe(transform_timer.elapsed().as_secs_f64());
|
||||
PipelineTransformSnafu { reason }.build()
|
||||
})
|
||||
.context(PipelineSnafu)?;
|
||||
|
||||
let insert_request = RowInsertRequest {
|
||||
@@ -317,9 +335,26 @@ async fn ingest_logs_inner(
|
||||
};
|
||||
let output = state.insert_logs(insert_requests, query_ctx).await;
|
||||
|
||||
if let Ok(Output {
|
||||
data: OutputData::AffectedRows(rows),
|
||||
meta: _,
|
||||
}) = &output
|
||||
{
|
||||
METRIC_HTTP_LOGS_INGESTION_COUNTER
|
||||
.with_label_values(&[db.as_str()])
|
||||
.inc_by(*rows as u64);
|
||||
METRIC_HTTP_LOGS_INGESTION_ELAPSED
|
||||
.with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE])
|
||||
.observe(exec_timer.elapsed().as_secs_f64());
|
||||
} else {
|
||||
METRIC_HTTP_LOGS_INGESTION_ELAPSED
|
||||
.with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
|
||||
.observe(exec_timer.elapsed().as_secs_f64());
|
||||
}
|
||||
|
||||
let response = GreptimedbV1Response::from_output(vec![output])
|
||||
.await
|
||||
.with_execution_time(start.elapsed().as_millis() as u64);
|
||||
.with_execution_time(exec_timer.elapsed().as_millis() as u64);
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
|
||||
@@ -44,6 +44,10 @@ pub(crate) const METRIC_POSTGRES_SIMPLE_QUERY: &str = "simple";
|
||||
pub(crate) const METRIC_POSTGRES_EXTENDED_QUERY: &str = "extended";
|
||||
pub(crate) const METRIC_METHOD_LABEL: &str = "method";
|
||||
pub(crate) const METRIC_PATH_LABEL: &str = "path";
|
||||
pub(crate) const METRIC_RESULT_LABEL: &str = "result";
|
||||
|
||||
pub(crate) const METRIC_SUCCESS_VALUE: &str = "success";
|
||||
pub(crate) const METRIC_FAILURE_VALUE: &str = "failure";
|
||||
|
||||
lazy_static! {
|
||||
pub static ref METRIC_ERROR_COUNTER: IntCounterVec = register_int_counter_vec!(
|
||||
@@ -130,6 +134,26 @@ lazy_static! {
|
||||
&[METRIC_DB_LABEL]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_HTTP_LOGS_INGESTION_COUNTER: IntCounterVec = register_int_counter_vec!(
|
||||
"greptime_servers_http_logs_ingestion_counter",
|
||||
"servers http logs ingestion counter",
|
||||
&[METRIC_DB_LABEL]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_HTTP_LOGS_INGESTION_ELAPSED: HistogramVec =
|
||||
register_histogram_vec!(
|
||||
"greptime_servers_http_logs_ingestion_elapsed",
|
||||
"servers http logs ingestion elapsed",
|
||||
&[METRIC_DB_LABEL, METRIC_RESULT_LABEL]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_HTTP_LOGS_TRANSFORM_ELAPSED: HistogramVec =
|
||||
register_histogram_vec!(
|
||||
"greptime_servers_http_logs_transform_elapsed",
|
||||
"servers http logs transform elapsed",
|
||||
&[METRIC_DB_LABEL, METRIC_RESULT_LABEL]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_MYSQL_CONNECTIONS: IntGauge = register_int_gauge!(
|
||||
"greptime_servers_mysql_connection_count",
|
||||
"servers mysql connection count"
|
||||
|
||||
Reference in New Issue
Block a user