feat: implement /api/v1/labels for prometheus (#1580)

* feat: implement /api/v1/labels for prometheus

* fix: only gather match[]

* chore: fix typo

* chore: fix typo

* chore: change style

* fix: suggestion

* fix: suggestion

* chore: typo

* fix: fmt

* fix: add more test
This commit is contained in:
Huaijin
2023-05-17 11:56:22 +08:00
committed by GitHub
parent a6ec79ee30
commit 715e1a321f
7 changed files with 260 additions and 25 deletions

View File

@@ -22,6 +22,13 @@ pub fn current_time_rfc3339() -> String {
chrono::Utc::now().to_rfc3339()
}
/// Returns the yesterday time in rfc3339 format.
/// Returns the time 24 hours ago, formatted as an RFC 3339 string.
pub fn yesterday_rfc3339() -> String {
    // "Yesterday" here means exactly one day before the current UTC instant.
    (chrono::Utc::now() - chrono::Duration::days(1)).to_rfc3339()
}
/// Port of rust unstable features `int_roundings`.
pub(crate) fn div_ceil(this: i64, rhs: i64) -> i64 {
let d = this / rhs;

View File

@@ -37,6 +37,7 @@ use crate::error::{
use crate::metrics::{METRIC_PARSE_PROMQL_ELAPSED, METRIC_PARSE_SQL_ELAPSED};
const DEFAULT_LOOKBACK: u64 = 5 * 60; // 5m
pub const DEFAULT_LOOKBACK_STRING: &str = "5m";
pub const EXPLAIN_NODE_NAME: &str = "EXPLAIN";
pub const ANALYZE_NODE_NAME: &str = "ANALYZE";

View File

@@ -281,6 +281,9 @@ pub enum Error {
#[snafu(backtrace)]
source: query::error::Error,
},
#[snafu(display("{}", reason))]
UnexpectedResult { reason: String, location: Location },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -348,6 +351,8 @@ impl ErrorExt for Error {
InvalidFlushArgument { .. } => StatusCode::InvalidArguments,
ParsePromQL { source, .. } => source.status_code(),
UnexpectedResult { .. } => StatusCode::Unexpected,
}
}

View File

@@ -13,7 +13,7 @@
// limitations under the License.
//! prom provides compliance with the Prometheus HTTP API server.
use std::collections::{BTreeMap, HashMap};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::net::SocketAddr;
use std::sync::Arc;
@@ -27,7 +27,7 @@ use common_error::status_code::StatusCode;
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::info;
use common_time::util::current_time_rfc3339;
use common_time::util::{current_time_rfc3339, yesterday_rfc3339};
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVector;
use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector};
@@ -37,11 +37,12 @@ use promql_parser::parser::{
AggregateExpr, BinaryExpr, Call, Expr as PromqlExpr, MatrixSelector, ParenExpr, SubqueryExpr,
UnaryExpr, ValueType, VectorSelector,
};
use query::parser::PromQuery;
use query::parser::{PromQuery, DEFAULT_LOOKBACK_STRING};
use schemars::JsonSchema;
use serde::de::{self, MapAccess, Visitor};
use serde::{Deserialize, Serialize};
use session::context::{QueryContext, QueryContextRef};
use snafu::{ensure, OptionExt, ResultExt};
use snafu::{ensure, Location, OptionExt, ResultExt};
use tokio::sync::oneshot::Sender;
use tokio::sync::{oneshot, Mutex};
use tower::ServiceBuilder;
@@ -51,10 +52,11 @@ use tower_http::trace::TraceLayer;
use crate::auth::UserProviderRef;
use crate::error::{
AlreadyStartedSnafu, CollectRecordbatchSnafu, InternalSnafu, NotSupportedSnafu, Result,
AlreadyStartedSnafu, CollectRecordbatchSnafu, Error, InternalSnafu, NotSupportedSnafu, Result,
StartHttpSnafu,
};
use crate::http::authorize::HttpAuth;
use crate::prometheus::{FIELD_COLUMN_NAME, TIMESTAMP_COLUMN_NAME};
use crate::server::Server;
pub const PROM_API_VERSION: &str = "v1";
@@ -88,11 +90,12 @@ impl PromServer {
}
pub fn make_app(&self) -> Router {
// TODO(ruihang): implement format_query, series, labels, values, query_examplars and targets methods
// TODO(ruihang): implement format_query, series, values, query_examplars and targets methods
let router = Router::new()
.route("/query", routing::post(instant_query).get(instant_query))
.route("/query_range", routing::post(range_query).get(range_query))
.route("/labels", routing::post(labels_query).get(labels_query))
.with_state(self.query_handler.clone());
Router::new()
@@ -175,10 +178,23 @@ pub struct PromData {
pub result: Vec<PromSeries>,
}
/// Payload of a Prometheus API response.
///
/// `#[serde(untagged)]` makes the variant serialize as its bare content, so the
/// JSON `data` field is either a query-result object or a plain array of label
/// names, matching the Prometheus HTTP API wire format.
#[derive(Debug, Serialize, Deserialize, JsonSchema, PartialEq)]
#[serde(untagged)]
pub enum PromResponse {
    /// Query result for `/query` and `/query_range` (result type + series).
    PromData(PromData),
    /// Sorted label names for `/labels`.
    Labels(Vec<String>),
}
// Default to an empty query result so existing callers (e.g. error responses)
// that previously used `PromData::default()` keep the same JSON shape.
impl Default for PromResponse {
    fn default() -> Self {
        PromResponse::PromData(Default::default())
    }
}
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema, PartialEq)]
pub struct PromJsonResponse {
pub status: String,
pub data: PromData,
pub data: PromResponse,
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
@@ -196,14 +212,14 @@ impl PromJsonResponse {
{
Json(PromJsonResponse {
status: "error".to_string(),
data: PromData::default(),
data: PromResponse::default(),
error: Some(reason.into()),
error_type: Some(error_type.into()),
warnings: None,
})
}
pub fn success(data: PromData) -> Json<Self> {
pub fn success(data: PromResponse) -> Json<Self> {
Json(PromJsonResponse {
status: "success".to_string(),
data,
@@ -236,10 +252,9 @@ impl PromJsonResponse {
result_type,
)?)
}
Output::AffectedRows(_) => Self::error(
"unexpected result",
"expected data result, but got affected rows",
),
Output::AffectedRows(_) => {
Self::error("Unexpected", "expected data result, but got affected rows")
}
};
json
@@ -254,10 +269,10 @@ impl PromJsonResponse {
if err.status_code() == StatusCode::TableNotFound
|| err.status_code() == StatusCode::TableColumnNotFound
{
Self::success(PromData {
Self::success(PromResponse::PromData(PromData {
result_type: result_type_string,
..Default::default()
})
}))
} else {
Self::error(err.status_code().to_string(), err.to_string())
}
@@ -270,7 +285,7 @@ impl PromJsonResponse {
batches: RecordBatches,
metric_name: String,
result_type: Option<ValueType>,
) -> Result<PromData> {
) -> Result<PromResponse> {
// infer semantic type of each column from schema.
// TODO(ruihang): wish there is a better way to do this.
let mut timestamp_column_index = None;
@@ -383,10 +398,10 @@ impl PromJsonResponse {
.collect::<Result<Vec<_>>>()?;
let result_type_string = result_type.map(|t| t.to_string()).unwrap_or_default();
let data = PromData {
let data = PromResponse::PromData(PromData {
result_type: result_type_string,
result,
};
});
Ok(data)
}
@@ -463,6 +478,191 @@ pub async fn range_query(
PromJsonResponse::from_query_result(result, metric_name, Some(ValueType::Matrix)).await
}
/// Collected values of the repeated `match[]` parameter.
/// Deserialization is hand-written (see the `Deserialize` impl below) because
/// serde's derive cannot gather repeated keys from form/query maps.
#[derive(Debug, Default, Serialize, JsonSchema)]
struct Matches(Vec<String>);

/// Parameters accepted by the Prometheus `/api/v1/labels` endpoint,
/// from either the URL query string or a form-encoded body.
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
pub struct LabelsQuery {
    // Optional time range, RFC 3339; defaults are filled in by the handler.
    start: Option<String>,
    end: Option<String>,
    // `flatten` hands the whole parameter map to `Matches::deserialize` so the
    // repeated `match[]` keys can be collected.
    #[serde(flatten)]
    matches: Matches,
    // Client database name; resolved to catalog/schema by the handler.
    db: Option<String>,
}
// Custom Deserialize method to support parsing repeated match[]
// Custom Deserialize method to support parsing repeated match[].
//
// serde's derive cannot collect a repeated key from a query-string/form map,
// so we walk the map ourselves and keep every value under the `match[]` key.
// Entries under any other key are ignored here (they are consumed by the
// sibling fields of `LabelsQuery` via `#[serde(flatten)]`).
impl<'de> Deserialize<'de> for Matches {
    fn deserialize<D>(deserializer: D) -> std::result::Result<Matches, D::Error>
    where
        D: de::Deserializer<'de>,
    {
        struct MatchesVisitor;

        impl<'d> Visitor<'d> for MatchesVisitor {
            type Value = Vec<String>;

            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
                // This visitor consumes a map, not a string; report that in
                // deserialization errors.
                formatter.write_str("a map of query parameters")
            }

            fn visit_map<M>(self, mut access: M) -> std::result::Result<Self::Value, M::Error>
            where
                M: MapAccess<'d>,
            {
                let mut matches = Vec::new();
                while let Some((key, value)) = access.next_entry::<String, String>()? {
                    // Only gather the repeated `match[]` parameter.
                    if key == "match[]" {
                        matches.push(value);
                    }
                }
                Ok(matches)
            }
        }
        Ok(Matches(deserializer.deserialize_map(MatchesVisitor)?))
    }
}
#[axum_macros::debug_handler]
pub async fn labels_query(
State(handler): State<PromHandlerRef>,
Query(params): Query<LabelsQuery>,
Form(form_params): Form<LabelsQuery>,
) -> Json<PromJsonResponse> {
let mut queries: Vec<String> = params.matches.0;
if queries.is_empty() {
queries = form_params.matches.0;
}
if queries.is_empty() {
return PromJsonResponse::error("Unsupported", "match[] parameter is required");
}
let start = params
.start
.or(form_params.start)
.unwrap_or_else(yesterday_rfc3339);
let end = params
.end
.or(form_params.end)
.unwrap_or_else(current_time_rfc3339);
let db = &params.db.unwrap_or(DEFAULT_SCHEMA_NAME.to_string());
let (catalog, schema) = super::parse_catalog_and_schema_from_client_database_name(db);
let query_ctx = Arc::new(QueryContext::with(catalog, schema));
let mut labels: HashSet<String> = HashSet::new();
labels.insert(METRIC_NAME.to_string());
for query in queries {
let prom_query = PromQuery {
query,
start: start.clone(),
end: end.clone(),
// TODO: find a better value for step
step: DEFAULT_LOOKBACK_STRING.to_string(),
};
let result = handler.do_query(&prom_query, query_ctx.clone()).await;
let response = retrieve_labels_name_from_query_result(result, &mut labels).await;
if let Err(err) = response {
// Prometheus won't report error if querying nonexist label and metric
if err.status_code() != StatusCode::TableNotFound
&& err.status_code() != StatusCode::TableColumnNotFound
{
return PromJsonResponse::error(err.status_code().to_string(), err.to_string());
}
}
}
labels.remove(TIMESTAMP_COLUMN_NAME);
labels.remove(FIELD_COLUMN_NAME);
let mut sorted_labels: Vec<String> = labels.into_iter().collect();
sorted_labels.sort();
PromJsonResponse::success(PromResponse::Labels(sorted_labels))
}
/// Retrieve labels name from query result
async fn retrieve_labels_name_from_query_result(
result: Result<Output>,
labels: &mut HashSet<String>,
) -> Result<()> {
match result? {
Output::RecordBatches(batches) => {
record_batches_to_labels_name(batches, labels)?;
Ok(())
}
Output::Stream(stream) => {
let batches = RecordBatches::try_collect(stream)
.await
.context(CollectRecordbatchSnafu)?;
record_batches_to_labels_name(batches, labels)?;
Ok(())
}
Output::AffectedRows(_) => Err(Error::UnexpectedResult {
reason: "expected data result, but got affected rows".to_string(),
location: Location::default(),
}),
}
}
/// Retrieve labels name from record batches
fn record_batches_to_labels_name(
batches: RecordBatches,
labels: &mut HashSet<String>,
) -> Result<()> {
let mut column_indices = Vec::new();
let mut field_column_indices = Vec::new();
for (i, column) in batches.schema().column_schemas().iter().enumerate() {
if let ConcreteDataType::Float64(_) = column.data_type {
field_column_indices.push(i);
}
column_indices.push(i);
}
if field_column_indices.is_empty() {
return Err(Error::Internal {
err_msg: "no value column found".to_string(),
});
}
for batch in batches.iter() {
let names = column_indices
.iter()
.map(|c| batches.schema().column_name_by_index(*c).to_string())
.collect::<Vec<_>>();
let field_columns = field_column_indices
.iter()
.map(|i| {
batch
.column(*i)
.as_any()
.downcast_ref::<Float64Vector>()
.unwrap()
})
.collect::<Vec<_>>();
for row_index in 0..batch.num_rows() {
// if all field columns are null, skip this row
if field_columns
.iter()
.all(|c| c.get_data(row_index).is_none())
{
continue;
}
// if a field is not null, record the tag name and return
names.iter().for_each(|name| {
labels.insert(name.to_string());
});
return Ok(());
}
}
Ok(())
}
pub(crate) fn retrieve_metric_name_and_result_type(
promql: &str,
) -> Option<(String, Option<ValueType>)> {

View File

@@ -31,8 +31,8 @@ use snap::raw::{Decoder, Encoder};
use crate::error::{self, Result};
const TIMESTAMP_COLUMN_NAME: &str = "greptime_timestamp";
const FIELD_COLUMN_NAME: &str = "greptime_value";
pub const TIMESTAMP_COLUMN_NAME: &str = "greptime_timestamp";
pub const FIELD_COLUMN_NAME: &str = "greptime_value";
pub const METRIC_NAME_LABEL: &str = "__name__";
/// Metrics for push gateway protocol

View File

@@ -22,7 +22,7 @@ use api::v1::{
use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::consts::{MIN_USER_TABLE_ID, MITO_ENGINE};
use common_query::Output;
use servers::prom::{PromData, PromJsonResponse, PromSeries};
use servers::prom::{PromData, PromJsonResponse, PromResponse, PromSeries};
use servers::server::Server;
use tests_integration::test_util::{setup_grpc_server, StorageType};
@@ -366,7 +366,7 @@ pub async fn test_prom_gateway_query(store_type: StorageType) {
let instant_query_result = serde_json::from_slice::<PromJsonResponse>(&json_bytes).unwrap();
let expected = PromJsonResponse {
status: "success".to_string(),
data: PromData {
data: PromResponse::PromData(PromData {
result_type: "vector".to_string(),
result: vec![
PromSeries {
@@ -390,7 +390,7 @@ pub async fn test_prom_gateway_query(store_type: StorageType) {
..Default::default()
},
],
},
}),
error: None,
error_type: None,
warnings: None,
@@ -417,7 +417,7 @@ pub async fn test_prom_gateway_query(store_type: StorageType) {
let range_query_result = serde_json::from_slice::<PromJsonResponse>(&json_bytes).unwrap();
let expected = PromJsonResponse {
status: "success".to_string(),
data: PromData {
data: PromResponse::PromData(PromData {
result_type: "matrix".to_string(),
result: vec![
PromSeries {
@@ -441,7 +441,7 @@ pub async fn test_prom_gateway_query(store_type: StorageType) {
..Default::default()
},
],
},
}),
error: None,
error_type: None,
warnings: None,

View File

@@ -313,6 +313,28 @@ pub async fn test_prom_http_api(store_type: StorageType) {
.await;
assert_eq!(res.status(), StatusCode::OK);
// labels
let res = client.get("/api/v1/labels?match[]=up").send().await;
assert_eq!(res.status(), StatusCode::OK);
let res = client
.post("/api/v1/labels?match[]=up")
.header("Content-Type", "application/x-www-form-urlencoded")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
// labels query with multiple match[] params
let res = client
.get("/api/v1/labels?match[]=up&match[]=down")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let res = client
.post("/api/v1/labels?match[]=up&match[]=down")
.header("Content-Type", "application/x-www-form-urlencoded")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
guard.remove_all().await;
}