feat: support querying field column names in Prometheus HTTP API (#3880)

* feat: support querying field column names in Prometheus HTTP API

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* use tables stream API

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2024-05-08 20:18:20 +08:00
committed by GitHub
parent d99746385b
commit a6a702de4e
5 changed files with 97 additions and 3 deletions

View File

@@ -310,14 +310,14 @@ pub enum Error {
location: Location,
},
#[snafu(display("Cannot find requested database: {}-{}", catalog, schema))]
#[snafu(display("Cannot find requested database: {}.{}", catalog, schema))]
DatabaseNotFound {
catalog: String,
schema: String,
location: Location,
},
#[snafu(display("Cannot find requested table: {}-{}-{}", catalog, schema, table))]
#[snafu(display("Cannot find requested table: {}.{}.{}", catalog, schema, table))]
TableNotFound {
catalog: String,
schema: String,

View File

@@ -30,6 +30,7 @@ use common_version::BuildInfo;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVector;
use datatypes::vectors::{Float64Vector, StringVector};
use futures::StreamExt;
use promql_parser::label::METRIC_NAME;
use promql_parser::parser::{
AggregateExpr, BinaryExpr, Call, Expr as PromqlExpr, MatrixSelector, ParenExpr, SubqueryExpr,
@@ -49,7 +50,7 @@ use crate::error::{
UnexpectedResultSnafu,
};
use crate::http::header::collect_plan_metrics;
use crate::prom_store::METRIC_NAME_LABEL;
use crate::prom_store::{FIELD_NAME_LABEL, METRIC_NAME_LABEL};
use crate::prometheus_handler::PrometheusHandlerRef;
/// For [ValueType::Vector] result type
@@ -690,6 +691,23 @@ pub async fn label_values_query(
};
table_names.sort_unstable();
return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(table_names));
} else if label_name == FIELD_NAME_LABEL {
let field_columns =
match retrieve_field_names(&query_ctx, handler.catalog_manager(), params.matches.0)
.await
{
Ok(table_names) => table_names,
Err(e) => {
return PrometheusJsonResponse::error(
e.status_code().to_string(),
e.output_msg(),
);
}
};
return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(
field_columns.into_iter().collect(),
));
}
let queries = params.matches.0;
@@ -742,6 +760,44 @@ pub async fn label_values_query(
resp
}
async fn retrieve_field_names(
query_ctx: &QueryContext,
manager: CatalogManagerRef,
matches: Vec<String>,
) -> Result<HashSet<String>> {
let mut field_columns = HashSet::new();
let catalog = query_ctx.current_catalog();
let schema = query_ctx.current_schema();
if matches.is_empty() {
// query all tables if no matcher is provided
while let Some(table) = manager.tables(catalog, schema).await.next().await {
let table = table.context(CatalogSnafu)?;
for column in table.field_columns() {
field_columns.insert(column.name);
}
}
return Ok(field_columns);
}
for table_name in matches {
let table = manager
.table(catalog, schema, &table_name)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
catalog: catalog.to_string(),
schema: schema.to_string(),
table: table_name.to_string(),
})?;
for column in table.field_columns() {
field_columns.insert(column.name);
}
}
Ok(field_columns)
}
async fn retrieve_label_values(
result: Result<Output>,
label_name: &str,

View File

@@ -42,6 +42,9 @@ pub const METRIC_NAME_LABEL: &str = "__name__";
pub const METRIC_NAME_LABEL_BYTES: &[u8] = b"__name__";
/// The same as `FIELD_COLUMN_MATCHER` in `promql` crate
pub const FIELD_NAME_LABEL: &str = "__field__";
/// Metrics for push gateway protocol
pub struct Metrics {
pub exposition: MetricsExposition<PrometheusType, PrometheusValue>,

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use std::sync::Arc;
use common_query::logical_plan::Expr;
@@ -90,4 +91,25 @@ impl Table {
.iter()
.map(|i| self.table_info.meta.schema.column_schemas()[*i].clone())
}
/// Get field columns in the definition order.
pub fn field_columns(&self) -> impl Iterator<Item = ColumnSchema> + '_ {
// `value_indices` in TableMeta is not reliable. Do a filter here.
let primary_keys = self
.table_info
.meta
.primary_key_indices
.iter()
.copied()
.collect::<HashSet<_>>();
self.table_info
.meta
.schema
.column_schemas()
.iter()
.enumerate()
.filter(move |(i, c)| !primary_keys.contains(i) && !c.is_time_index())
.map(|(_, c)| c.clone())
}
}

View File

@@ -544,6 +544,19 @@ pub async fn test_prom_http_api(store_type: StorageType) {
serde_json::from_value::<PrometheusResponse>(json!(["host1", "host2"])).unwrap()
);
// search field name
let res = client
.get("/v1/prometheus/api/v1/label/__field__/values?match[]=demo")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<PrometheusJsonResponse>(&res.text().await).unwrap();
assert_eq!(body.status, "success");
assert_eq!(
body.data,
serde_json::from_value::<PrometheusResponse>(json!(["cpu", "memory"])).unwrap()
);
// query an empty database should return nothing
let res = client
.get("/v1/prometheus/api/v1/label/host/values?match[]=demo&start=0&end=600")