From 715e1a321fc21ae546042dfd65d6b211cce26752 Mon Sep 17 00:00:00 2001 From: Huaijin Date: Wed, 17 May 2023 11:56:22 +0800 Subject: [PATCH] feat: implement /api/v1/labels for prometheus (#1580) * feat: implement /api/v1/labels for prometheus * fix: only gather match[] * chore: fix typo * chore: fix typo * chore: change style * fix: suggestion * fix: suggestion * chore: typo * fix: fmt * fix: add more test --- src/common/time/src/util.rs | 7 + src/query/src/parser.rs | 1 + src/servers/src/error.rs | 5 + src/servers/src/prom.rs | 236 +++++++++++++++++++++++++++++--- src/servers/src/prometheus.rs | 4 +- tests-integration/tests/grpc.rs | 10 +- tests-integration/tests/http.rs | 22 +++ 7 files changed, 260 insertions(+), 25 deletions(-) diff --git a/src/common/time/src/util.rs b/src/common/time/src/util.rs index f565914182..f2d1feeee1 100644 --- a/src/common/time/src/util.rs +++ b/src/common/time/src/util.rs @@ -22,6 +22,13 @@ pub fn current_time_rfc3339() -> String { chrono::Utc::now().to_rfc3339() } +/// Returns the yesterday time in rfc3339 format. +pub fn yesterday_rfc3339() -> String { + let now = chrono::Utc::now(); + let day_before = now - chrono::Duration::days(1); + day_before.to_rfc3339() +} + /// Port of rust unstable features `int_roundings`. 
pub(crate) fn div_ceil(this: i64, rhs: i64) -> i64 { let d = this / rhs; diff --git a/src/query/src/parser.rs b/src/query/src/parser.rs index 49971771e8..05eefa14da 100644 --- a/src/query/src/parser.rs +++ b/src/query/src/parser.rs @@ -37,6 +37,7 @@ use crate::error::{ use crate::metrics::{METRIC_PARSE_PROMQL_ELAPSED, METRIC_PARSE_SQL_ELAPSED}; const DEFAULT_LOOKBACK: u64 = 5 * 60; // 5m +pub const DEFAULT_LOOKBACK_STRING: &str = "5m"; pub const EXPLAIN_NODE_NAME: &str = "EXPLAIN"; pub const ANALYZE_NODE_NAME: &str = "ANALYZE"; diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 101ac1f44b..dccebf12f9 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -281,6 +281,9 @@ pub enum Error { #[snafu(backtrace)] source: query::error::Error, }, + + #[snafu(display("{}", reason))] + UnexpectedResult { reason: String, location: Location }, } pub type Result = std::result::Result; @@ -348,6 +351,8 @@ impl ErrorExt for Error { InvalidFlushArgument { .. } => StatusCode::InvalidArguments, ParsePromQL { source, .. } => source.status_code(), + + UnexpectedResult { .. } => StatusCode::Unexpected, } } diff --git a/src/servers/src/prom.rs b/src/servers/src/prom.rs index eaee673de6..14256374ab 100644 --- a/src/servers/src/prom.rs +++ b/src/servers/src/prom.rs @@ -13,7 +13,7 @@ // limitations under the License. //! 
prom supply the prometheus HTTP API Server compliance -use std::collections::{BTreeMap, HashMap}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::net::SocketAddr; use std::sync::Arc; @@ -27,7 +27,7 @@ use common_error::status_code::StatusCode; use common_query::Output; use common_recordbatch::RecordBatches; use common_telemetry::info; -use common_time::util::current_time_rfc3339; +use common_time::util::{current_time_rfc3339, yesterday_rfc3339}; use datatypes::prelude::ConcreteDataType; use datatypes::scalars::ScalarVector; use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector}; @@ -37,11 +37,12 @@ use promql_parser::parser::{ AggregateExpr, BinaryExpr, Call, Expr as PromqlExpr, MatrixSelector, ParenExpr, SubqueryExpr, UnaryExpr, ValueType, VectorSelector, }; -use query::parser::PromQuery; +use query::parser::{PromQuery, DEFAULT_LOOKBACK_STRING}; use schemars::JsonSchema; +use serde::de::{self, MapAccess, Visitor}; use serde::{Deserialize, Serialize}; use session::context::{QueryContext, QueryContextRef}; -use snafu::{ensure, OptionExt, ResultExt}; +use snafu::{ensure, Location, OptionExt, ResultExt}; use tokio::sync::oneshot::Sender; use tokio::sync::{oneshot, Mutex}; use tower::ServiceBuilder; @@ -51,10 +52,11 @@ use tower_http::trace::TraceLayer; use crate::auth::UserProviderRef; use crate::error::{ - AlreadyStartedSnafu, CollectRecordbatchSnafu, InternalSnafu, NotSupportedSnafu, Result, + AlreadyStartedSnafu, CollectRecordbatchSnafu, Error, InternalSnafu, NotSupportedSnafu, Result, StartHttpSnafu, }; use crate::http::authorize::HttpAuth; +use crate::prometheus::{FIELD_COLUMN_NAME, TIMESTAMP_COLUMN_NAME}; use crate::server::Server; pub const PROM_API_VERSION: &str = "v1"; @@ -88,11 +90,12 @@ impl PromServer { } pub fn make_app(&self) -> Router { - // TODO(ruihang): implement format_query, series, labels, values, query_examplars and targets methods + // TODO(ruihang): implement format_query, series, values, query_exemplars 
and targets methods let router = Router::new() .route("/query", routing::post(instant_query).get(instant_query)) .route("/query_range", routing::post(range_query).get(range_query)) + .route("/labels", routing::post(labels_query).get(labels_query)) .with_state(self.query_handler.clone()); Router::new() @@ -175,10 +178,23 @@ pub struct PromData { pub result: Vec, } +#[derive(Debug, Serialize, Deserialize, JsonSchema, PartialEq)] +#[serde(untagged)] +pub enum PromResponse { + PromData(PromData), + Labels(Vec), +} + +impl Default for PromResponse { + fn default() -> Self { + PromResponse::PromData(Default::default()) + } +} + #[derive(Debug, Default, Serialize, Deserialize, JsonSchema, PartialEq)] pub struct PromJsonResponse { pub status: String, - pub data: PromData, + pub data: PromResponse, #[serde(skip_serializing_if = "Option::is_none")] pub error: Option, #[serde(skip_serializing_if = "Option::is_none")] @@ -196,14 +212,14 @@ impl PromJsonResponse { { Json(PromJsonResponse { status: "error".to_string(), - data: PromData::default(), + data: PromResponse::default(), error: Some(reason.into()), error_type: Some(error_type.into()), warnings: None, }) } - pub fn success(data: PromData) -> Json { + pub fn success(data: PromResponse) -> Json { Json(PromJsonResponse { status: "success".to_string(), data, @@ -236,10 +252,9 @@ impl PromJsonResponse { result_type, )?) 
} - Output::AffectedRows(_) => Self::error( - "unexpected result", - "expected data result, but got affected rows", - ), + Output::AffectedRows(_) => { + Self::error("Unexpected", "expected data result, but got affected rows") + } }; json @@ -254,10 +269,10 @@ impl PromJsonResponse { if err.status_code() == StatusCode::TableNotFound || err.status_code() == StatusCode::TableColumnNotFound { - Self::success(PromData { + Self::success(PromResponse::PromData(PromData { result_type: result_type_string, ..Default::default() - }) + })) } else { Self::error(err.status_code().to_string(), err.to_string()) } @@ -270,7 +285,7 @@ impl PromJsonResponse { batches: RecordBatches, metric_name: String, result_type: Option, - ) -> Result { + ) -> Result { // infer semantic type of each column from schema. // TODO(ruihang): wish there is a better way to do this. let mut timestamp_column_index = None; @@ -383,10 +398,10 @@ impl PromJsonResponse { .collect::>>()?; let result_type_string = result_type.map(|t| t.to_string()).unwrap_or_default(); - let data = PromData { + let data = PromResponse::PromData(PromData { result_type: result_type_string, result, - }; + }); Ok(data) } @@ -463,6 +478,191 @@ pub async fn range_query( PromJsonResponse::from_query_result(result, metric_name, Some(ValueType::Matrix)).await } +#[derive(Debug, Default, Serialize, JsonSchema)] +struct Matches(Vec); + +#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] +pub struct LabelsQuery { + start: Option, + end: Option, + #[serde(flatten)] + matches: Matches, + db: Option, +} + +// Custom Deserialize method to support parsing repeated match[] +impl<'de> Deserialize<'de> for Matches { + fn deserialize(deserializer: D) -> std::result::Result + where + D: de::Deserializer<'de>, + { + struct MatchesVisitor; + + impl<'d> Visitor<'d> for MatchesVisitor { + type Value = Vec; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("a string") + } + + fn 
visit_map(self, mut access: M) -> std::result::Result + where + M: MapAccess<'d>, + { + let mut matches = Vec::new(); + while let Some((key, value)) = access.next_entry::()? { + if key == "match[]" { + matches.push(value); + } + } + Ok(matches) + } + } + Ok(Matches(deserializer.deserialize_map(MatchesVisitor)?)) + } +} + +#[axum_macros::debug_handler] +pub async fn labels_query( + State(handler): State, + Query(params): Query, + Form(form_params): Form, +) -> Json { + let mut queries: Vec = params.matches.0; + if queries.is_empty() { + queries = form_params.matches.0; + } + if queries.is_empty() { + return PromJsonResponse::error("Unsupported", "match[] parameter is required"); + } + + let start = params + .start + .or(form_params.start) + .unwrap_or_else(yesterday_rfc3339); + let end = params + .end + .or(form_params.end) + .unwrap_or_else(current_time_rfc3339); + + let db = &params.db.unwrap_or(DEFAULT_SCHEMA_NAME.to_string()); + let (catalog, schema) = super::parse_catalog_and_schema_from_client_database_name(db); + let query_ctx = Arc::new(QueryContext::with(catalog, schema)); + + let mut labels: HashSet = HashSet::new(); + labels.insert(METRIC_NAME.to_string()); + + for query in queries { + let prom_query = PromQuery { + query, + start: start.clone(), + end: end.clone(), + // TODO: find a better value for step + step: DEFAULT_LOOKBACK_STRING.to_string(), + }; + + let result = handler.do_query(&prom_query, query_ctx.clone()).await; + + let response = retrieve_labels_name_from_query_result(result, &mut labels).await; + + if let Err(err) = response { + // Prometheus won't report error if querying a nonexistent label or metric + if err.status_code() != StatusCode::TableNotFound + && err.status_code() != StatusCode::TableColumnNotFound + { + return PromJsonResponse::error(err.status_code().to_string(), err.to_string()); + } + } + } + + labels.remove(TIMESTAMP_COLUMN_NAME); + labels.remove(FIELD_COLUMN_NAME); + + let mut sorted_labels: Vec = labels.into_iter().collect(); + 
sorted_labels.sort(); + PromJsonResponse::success(PromResponse::Labels(sorted_labels)) +} + +/// Retrieve labels name from query result +async fn retrieve_labels_name_from_query_result( + result: Result, + labels: &mut HashSet, +) -> Result<()> { + match result? { + Output::RecordBatches(batches) => { + record_batches_to_labels_name(batches, labels)?; + Ok(()) + } + Output::Stream(stream) => { + let batches = RecordBatches::try_collect(stream) + .await + .context(CollectRecordbatchSnafu)?; + record_batches_to_labels_name(batches, labels)?; + Ok(()) + } + Output::AffectedRows(_) => Err(Error::UnexpectedResult { + reason: "expected data result, but got affected rows".to_string(), + location: Location::default(), + }), + } +} + +/// Retrieve labels name from record batches +fn record_batches_to_labels_name( + batches: RecordBatches, + labels: &mut HashSet, +) -> Result<()> { + let mut column_indices = Vec::new(); + let mut field_column_indices = Vec::new(); + for (i, column) in batches.schema().column_schemas().iter().enumerate() { + if let ConcreteDataType::Float64(_) = column.data_type { + field_column_indices.push(i); + } + column_indices.push(i); + } + + if field_column_indices.is_empty() { + return Err(Error::Internal { + err_msg: "no value column found".to_string(), + }); + } + + for batch in batches.iter() { + let names = column_indices + .iter() + .map(|c| batches.schema().column_name_by_index(*c).to_string()) + .collect::>(); + + let field_columns = field_column_indices + .iter() + .map(|i| { + batch + .column(*i) + .as_any() + .downcast_ref::() + .unwrap() + }) + .collect::>(); + + for row_index in 0..batch.num_rows() { + // if all field columns are null, skip this row + if field_columns + .iter() + .all(|c| c.get_data(row_index).is_none()) + { + continue; + } + + // if a field is not null, record the tag name and return + names.iter().for_each(|name| { + labels.insert(name.to_string()); + }); + return Ok(()); + } + } + Ok(()) +} + pub(crate) fn 
retrieve_metric_name_and_result_type( promql: &str, ) -> Option<(String, Option)> { diff --git a/src/servers/src/prometheus.rs b/src/servers/src/prometheus.rs index 1725d1149c..848a714ec3 100644 --- a/src/servers/src/prometheus.rs +++ b/src/servers/src/prometheus.rs @@ -31,8 +31,8 @@ use snap::raw::{Decoder, Encoder}; use crate::error::{self, Result}; -const TIMESTAMP_COLUMN_NAME: &str = "greptime_timestamp"; -const FIELD_COLUMN_NAME: &str = "greptime_value"; +pub const TIMESTAMP_COLUMN_NAME: &str = "greptime_timestamp"; +pub const FIELD_COLUMN_NAME: &str = "greptime_value"; pub const METRIC_NAME_LABEL: &str = "__name__"; /// Metrics for push gateway protocol diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs index 9b620b241f..60c79c9c98 100644 --- a/tests-integration/tests/grpc.rs +++ b/tests-integration/tests/grpc.rs @@ -22,7 +22,7 @@ use api::v1::{ use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_catalog::consts::{MIN_USER_TABLE_ID, MITO_ENGINE}; use common_query::Output; -use servers::prom::{PromData, PromJsonResponse, PromSeries}; +use servers::prom::{PromData, PromJsonResponse, PromResponse, PromSeries}; use servers::server::Server; use tests_integration::test_util::{setup_grpc_server, StorageType}; @@ -366,7 +366,7 @@ pub async fn test_prom_gateway_query(store_type: StorageType) { let instant_query_result = serde_json::from_slice::(&json_bytes).unwrap(); let expected = PromJsonResponse { status: "success".to_string(), - data: PromData { + data: PromResponse::PromData(PromData { result_type: "vector".to_string(), result: vec![ PromSeries { @@ -390,7 +390,7 @@ pub async fn test_prom_gateway_query(store_type: StorageType) { ..Default::default() }, ], - }, + }), error: None, error_type: None, warnings: None, @@ -417,7 +417,7 @@ pub async fn test_prom_gateway_query(store_type: StorageType) { let range_query_result = serde_json::from_slice::(&json_bytes).unwrap(); let expected = PromJsonResponse { 
status: "success".to_string(), - data: PromData { + data: PromResponse::PromData(PromData { result_type: "matrix".to_string(), result: vec![ PromSeries { @@ -441,7 +441,7 @@ pub async fn test_prom_gateway_query(store_type: StorageType) { ..Default::default() }, ], - }, + }), error: None, error_type: None, warnings: None, diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 8ee4093ac1..f7b76544d4 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -313,6 +313,28 @@ pub async fn test_prom_http_api(store_type: StorageType) { .await; assert_eq!(res.status(), StatusCode::OK); + // labels + let res = client.get("/api/v1/labels?match[]=up").send().await; + assert_eq!(res.status(), StatusCode::OK); + let res = client + .post("/api/v1/labels?match[]=up") + .header("Content-Type", "application/x-www-form-urlencoded") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + // labels query with multiple match[] params + let res = client + .get("/api/v1/labels?match[]=up&match[]=down") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + let res = client + .post("/api/v1/labels?match[]=up&match[]=down") + .header("Content-Type", "application/x-www-form-urlencoded") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + guard.remove_all().await; }