feat: support not-equal matcher for PromQL metric names (#5385)

* feat: make instant_query and range_query to supports not-equal matchers

* feat: impl query_metric_names

* feat: forgot some files and refactor

* chore: test and docs

* fix: typo

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>

* refactor: parse_query

* chore: improve test

* fix: use current catalog to query information_schema

---------

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
dennis zhuang
2025-01-22 11:04:40 +08:00
committed by GitHub
parent d2f3f2e24d
commit 4259975be9
13 changed files with 557 additions and 40 deletions

2
Cargo.lock generated
View File

@@ -4078,6 +4078,7 @@ dependencies = [
"partition",
"pipeline",
"prometheus",
"promql-parser",
"prost 0.12.6",
"query",
"raft-engine",
@@ -10269,6 +10270,7 @@ dependencies = [
"snafu 0.8.5",
"snap",
"sql",
"store-api",
"strum 0.25.0",
"table",
"tempfile",

View File

@@ -37,6 +37,7 @@ common-time.workspace = true
common-version.workspace = true
datafusion-expr.workspace = true
datanode.workspace = true
datatypes.workspace = true
humantime-serde.workspace = true
lazy_static.workspace = true
log-query.workspace = true
@@ -47,6 +48,7 @@ operator.workspace = true
partition.workspace = true
pipeline.workspace = true
prometheus.workspace = true
promql-parser.workspace = true
prost.workspace = true
query.workspace = true
raft-engine.workspace = true

View File

@@ -169,6 +169,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to collect recordbatch"))]
CollectRecordbatch {
#[snafu(implicit)]
location: Location,
source: common_recordbatch::error::Error,
},
#[snafu(display("Failed to plan statement"))]
PlanStatement {
#[snafu(implicit)]
@@ -224,6 +231,13 @@ pub enum Error {
source: servers::error::Error,
},
#[snafu(display("Failed to create logical plan for prometheus metric names query"))]
PrometheusMetricNamesQueryPlan {
#[snafu(implicit)]
location: Location,
source: servers::error::Error,
},
#[snafu(display("Failed to describe schema for given statement"))]
DescribeStatement {
#[snafu(implicit)]
@@ -349,8 +363,11 @@ impl ErrorExt for Error {
Error::HandleHeartbeatResponse { source, .. } => source.status_code(),
Error::PromStoreRemoteQueryPlan { source, .. }
| Error::PrometheusMetricNamesQueryPlan { source, .. }
| Error::ExecutePromql { source, .. } => source.status_code(),
Error::CollectRecordbatch { .. } => StatusCode::EngineExecuteQuery,
Error::SqlExecIntercepted { source, .. } => source.status_code(),
Error::StartServer { source, .. } => source.status_code(),
Error::ShutdownServer { source, .. } => source.status_code(),

View File

@@ -20,6 +20,7 @@ mod logs;
mod opentsdb;
mod otlp;
mod prom_store;
mod promql;
mod region_query;
pub mod standalone;
@@ -47,6 +48,7 @@ use operator::insert::InserterRef;
use operator::statement::StatementExecutor;
use pipeline::pipeline_operator::PipelineOperator;
use prometheus::HistogramTimer;
use promql_parser::label::Matcher;
use query::metrics::OnDone;
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
use query::query_engine::options::{validate_catalog_and_schema, QueryOptions};
@@ -450,6 +452,17 @@ impl PrometheusHandler for Instance {
Ok(interceptor.post_execute(output, query_ctx)?)
}
async fn query_metric_names(
&self,
matchers: Vec<Matcher>,
ctx: &QueryContextRef,
) -> server_error::Result<Vec<String>> {
self.handle_query_metric_names(matchers, ctx)
.await
.map_err(BoxedError::new)
.context(ExecuteQuerySnafu)
}
fn catalog_manager(&self) -> CatalogManagerRef {
self.catalog_manager.clone()
}

View File

@@ -0,0 +1,99 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use catalog::information_schema::TABLES;
use client::OutputData;
use common_catalog::consts::INFORMATION_SCHEMA_NAME;
use common_recordbatch::util;
use common_telemetry::tracing;
use datatypes::prelude::Value;
use promql_parser::label::Matcher;
use servers::prometheus;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use crate::error::{
CatalogSnafu, CollectRecordbatchSnafu, ExecLogicalPlanSnafu,
PrometheusMetricNamesQueryPlanSnafu, ReadTableSnafu, Result, TableNotFoundSnafu,
};
use crate::instance::Instance;
impl Instance {
/// Handles metric names query request, returns the names.
#[tracing::instrument(skip_all)]
pub(crate) async fn handle_query_metric_names(
&self,
matchers: Vec<Matcher>,
ctx: &QueryContextRef,
) -> Result<Vec<String>> {
let _timer = crate::metrics::PROMQL_QUERY_METRICS_ELAPSED
.with_label_values(&[ctx.get_db_string().as_str()])
.start_timer();
let table = self
.catalog_manager
.table(
ctx.current_catalog(),
INFORMATION_SCHEMA_NAME,
TABLES,
Some(ctx),
)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: "greptime.information_schema.tables",
})?;
let dataframe = self
.query_engine
.read_table(table)
.with_context(|_| ReadTableSnafu {
table_name: "greptime.information_schema.tables",
})?;
let logical_plan = prometheus::metric_name_matchers_to_plan(dataframe, matchers, ctx)
.context(PrometheusMetricNamesQueryPlanSnafu)?;
let results = self
.query_engine
.execute(logical_plan, ctx.clone())
.await
.context(ExecLogicalPlanSnafu)?;
let batches = match results.data {
OutputData::Stream(stream) => util::collect(stream)
.await
.context(CollectRecordbatchSnafu)?,
OutputData::RecordBatches(rbs) => rbs.take(),
_ => unreachable!("should not happen"),
};
let mut results = Vec::with_capacity(batches.iter().map(|b| b.num_rows()).sum());
for batch in batches {
// Only one column the results, ensured by `prometheus::metric_name_matchers_to_plan`.
let names = batch.column(0);
for i in 0..names.len() {
let Value::String(name) = names.get(i) else {
unreachable!();
};
results.push(name.into_string());
}
}
Ok(results)
}
}

View File

@@ -29,6 +29,13 @@ lazy_static! {
pub static ref GRPC_HANDLE_PROMQL_ELAPSED: Histogram = GRPC_HANDLE_QUERY_ELAPSED
.with_label_values(&["promql"]);
pub static ref PROMQL_QUERY_METRICS_ELAPSED: HistogramVec = register_histogram_vec!(
"greptime_frontend_promql_query_metrics_elapsed",
"frontend promql query metric names elapsed",
&["db"]
)
.unwrap();
/// The number of OpenTelemetry metrics send by frontend node.
pub static ref OTLP_METRICS_ROWS: IntCounter = register_int_counter!(
"greptime_frontend_otlp_metrics_rows",

View File

@@ -102,6 +102,7 @@ session.workspace = true
snafu.workspace = true
snap = "1"
sql.workspace = true
store-api.workspace = true
strum.workspace = true
table.workspace = true
tokio.workspace = true

View File

@@ -24,14 +24,15 @@ use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_query::{Output, OutputData};
use common_recordbatch::RecordBatches;
use common_telemetry::tracing;
use common_telemetry::{debug, tracing};
use common_time::util::{current_time_rfc3339, yesterday_rfc3339};
use common_version::OwnedBuildInfo;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVector;
use datatypes::vectors::{Float64Vector, StringVector};
use futures::future::join_all;
use futures::StreamExt;
use promql_parser::label::METRIC_NAME;
use promql_parser::label::{MatchOp, Matcher, Matchers, METRIC_NAME};
use promql_parser::parser::value::ValueType;
use promql_parser::parser::{
AggregateExpr, BinaryExpr, Call, Expr as PromqlExpr, MatrixSelector, ParenExpr, SubqueryExpr,
@@ -41,8 +42,11 @@ use query::parser::{PromQuery, DEFAULT_LOOKBACK_STRING};
use serde::de::{self, MapAccess, Visitor};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use session::context::QueryContext;
use session::context::{QueryContext, QueryContextRef};
use snafu::{Location, OptionExt, ResultExt};
use store_api::metric_engine_consts::{
DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
};
pub use super::result::prometheus_resp::PrometheusJsonResponse;
use crate::error::{
@@ -104,6 +108,44 @@ pub enum PrometheusResponse {
ParseResult(promql_parser::parser::Expr),
}
impl PrometheusResponse {
/// Append the other [`PrometheusResponse]`.
/// # NOTE
/// Only append matrix and vector results, otherwise just ignore the other response.
fn append(&mut self, other: PrometheusResponse) {
match (self, other) {
(
PrometheusResponse::PromData(PromData {
result: PromQueryResult::Matrix(lhs),
..
}),
PrometheusResponse::PromData(PromData {
result: PromQueryResult::Matrix(rhs),
..
}),
) => {
lhs.extend(rhs);
}
(
PrometheusResponse::PromData(PromData {
result: PromQueryResult::Vector(lhs),
..
}),
PrometheusResponse::PromData(PromData {
result: PromQueryResult::Vector(rhs),
..
}),
) => {
lhs.extend(rhs);
}
_ => {
// TODO(dennis): process other cases?
}
}
}
}
impl Default for PrometheusResponse {
fn default() -> Self {
PrometheusResponse::PromData(Default::default())
@@ -161,6 +203,20 @@ pub struct InstantQuery {
db: Option<String>,
}
/// Helper macro which try to evaluate the expression and return its results.
/// If the evaluation fails, return a `PrometheusJsonResponse` early.
macro_rules! try_call_return_response {
($handle: expr) => {
match $handle {
Ok(res) => res,
Err(err) => {
let msg = err.to_string();
return PrometheusJsonResponse::error(StatusCode::InvalidArguments, msg);
}
}
};
}
#[axum_macros::debug_handler]
#[tracing::instrument(
skip_all,
@@ -188,6 +244,8 @@ pub async fn instant_query(
.unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
};
let promql_expr = try_call_return_response!(promql_parser::parser::parse(&prom_query.query));
// update catalog and schema in query context if necessary
if let Some(db) = &params.db {
let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
@@ -199,7 +257,64 @@ pub async fn instant_query(
.with_label_values(&[query_ctx.get_db_string().as_str(), "instant_query"])
.start_timer();
let result = handler.do_query(&prom_query, query_ctx).await;
if let Some(name_matchers) = find_metric_name_not_equal_matchers(&promql_expr)
&& !name_matchers.is_empty()
{
debug!("Find metric name matchers: {:?}", name_matchers);
let metric_names =
try_call_return_response!(handler.query_metric_names(name_matchers, &query_ctx).await);
debug!("Find metric names: {:?}", metric_names);
if metric_names.is_empty() {
let result_type = promql_expr.value_type();
return PrometheusJsonResponse::success(PrometheusResponse::PromData(PromData {
result_type: result_type.to_string(),
..Default::default()
}));
}
let responses = join_all(metric_names.into_iter().map(|metric| {
let mut prom_query = prom_query.clone();
let mut promql_expr = promql_expr.clone();
let query_ctx = query_ctx.clone();
let handler = handler.clone();
async move {
update_metric_name_matcher(&mut promql_expr, &metric);
let new_query = promql_expr.to_string();
debug!(
"Updated promql, before: {}, after: {}",
&prom_query.query, new_query
);
prom_query.query = new_query;
do_instant_query(&handler, &prom_query, query_ctx).await
}
}))
.await;
responses
.into_iter()
.reduce(|mut acc, resp| {
acc.data.append(resp.data);
acc
})
.unwrap()
} else {
do_instant_query(&handler, &prom_query, query_ctx).await
}
}
/// Executes a single instant query and returns response
async fn do_instant_query(
handler: &PrometheusHandlerRef,
prom_query: &PromQuery,
query_ctx: QueryContextRef,
) -> PrometheusJsonResponse {
let result = handler.do_query(prom_query, query_ctx).await;
let (metric_name, result_type) = match retrieve_metric_name_and_result_type(&prom_query.query) {
Ok((metric_name, result_type)) => (metric_name.unwrap_or_default(), result_type),
Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()),
@@ -240,18 +355,75 @@ pub async fn range_query(
.unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
};
let promql_expr = try_call_return_response!(promql_parser::parser::parse(&prom_query.query));
// update catalog and schema in query context if necessary
if let Some(db) = &params.db {
let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
}
let query_ctx = Arc::new(query_ctx);
let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
.with_label_values(&[query_ctx.get_db_string().as_str(), "range_query"])
.start_timer();
let result = handler.do_query(&prom_query, query_ctx).await;
if let Some(name_matchers) = find_metric_name_not_equal_matchers(&promql_expr)
&& !name_matchers.is_empty()
{
debug!("Find metric name matchers: {:?}", name_matchers);
let metric_names =
try_call_return_response!(handler.query_metric_names(name_matchers, &query_ctx).await);
debug!("Find metric names: {:?}", metric_names);
if metric_names.is_empty() {
return PrometheusJsonResponse::success(PrometheusResponse::PromData(PromData {
result_type: ValueType::Matrix.to_string(),
..Default::default()
}));
}
let responses = join_all(metric_names.into_iter().map(|metric| {
let mut prom_query = prom_query.clone();
let mut promql_expr = promql_expr.clone();
let query_ctx = query_ctx.clone();
let handler = handler.clone();
async move {
update_metric_name_matcher(&mut promql_expr, &metric);
let new_query = promql_expr.to_string();
debug!(
"Updated promql, before: {}, after: {}",
&prom_query.query, new_query
);
prom_query.query = new_query;
do_range_query(&handler, &prom_query, query_ctx).await
}
}))
.await;
// Safety: at least one responses, checked above
responses
.into_iter()
.reduce(|mut acc, resp| {
acc.data.append(resp.data);
acc
})
.unwrap()
} else {
do_range_query(&handler, &prom_query, query_ctx).await
}
}
/// Executes a single range query and returns response
async fn do_range_query(
handler: &PrometheusHandlerRef,
prom_query: &PromQuery,
query_ctx: QueryContextRef,
) -> PrometheusJsonResponse {
let result = handler.do_query(prom_query, query_ctx).await;
let metric_name = match retrieve_metric_name_and_result_type(&prom_query.query) {
Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()),
Ok((metric_name, _)) => metric_name.unwrap_or_default(),
@@ -414,7 +586,11 @@ async fn get_all_column_names(
continue;
};
for column in table.primary_key_columns() {
labels.insert(column.name);
if column.name != DATA_SCHEMA_TABLE_ID_COLUMN_NAME
&& column.name != DATA_SCHEMA_TSID_COLUMN_NAME
{
labels.insert(column.name);
}
}
}
@@ -629,35 +805,109 @@ pub(crate) fn try_update_catalog_schema(ctx: &mut QueryContext, catalog: &str, s
}
fn promql_expr_to_metric_name(expr: &PromqlExpr) -> Option<String> {
find_metric_name_and_matchers(expr, |name, matchers| {
name.clone().or(matchers
.find_matchers(METRIC_NAME)
.into_iter()
.next()
.map(|m| m.value))
})
}
fn find_metric_name_and_matchers<E, F>(expr: &PromqlExpr, f: F) -> Option<E>
where
F: Fn(&Option<String>, &Matchers) -> Option<E> + Clone,
{
match expr {
PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => promql_expr_to_metric_name(expr),
PromqlExpr::Unary(UnaryExpr { expr }) => promql_expr_to_metric_name(expr),
PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => find_metric_name_and_matchers(expr, f),
PromqlExpr::Unary(UnaryExpr { expr }) => find_metric_name_and_matchers(expr, f),
PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
promql_expr_to_metric_name(lhs).or(promql_expr_to_metric_name(rhs))
find_metric_name_and_matchers(lhs, f.clone()).or(find_metric_name_and_matchers(rhs, f))
}
PromqlExpr::Paren(ParenExpr { expr }) => promql_expr_to_metric_name(expr),
PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => promql_expr_to_metric_name(expr),
PromqlExpr::NumberLiteral(_) => Some(String::new()),
PromqlExpr::StringLiteral(_) => Some(String::new()),
PromqlExpr::Paren(ParenExpr { expr }) => find_metric_name_and_matchers(expr, f),
PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => find_metric_name_and_matchers(expr, f),
PromqlExpr::NumberLiteral(_) => None,
PromqlExpr::StringLiteral(_) => None,
PromqlExpr::Extension(_) => None,
PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => f(name, matchers),
PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
let VectorSelector { name, matchers, .. } = vs;
f(name, matchers)
}
PromqlExpr::Call(Call { args, .. }) => args
.args
.iter()
.find_map(|e| find_metric_name_and_matchers(e, f.clone())),
}
}
/// Try to find the `__name__` matchers which op is not `MatchOp::Equal`.
fn find_metric_name_not_equal_matchers(expr: &PromqlExpr) -> Option<Vec<Matcher>> {
find_metric_name_and_matchers(expr, |name, matchers| {
// Has name, ignore the matchers
if name.is_some() {
return None;
}
// FIXME(dennis): we don't consider the nested and `or` matchers yet.
Some(matchers.find_matchers(METRIC_NAME))
})
.map(|matchers| {
matchers
.into_iter()
.filter(|m| !matches!(m.op, MatchOp::Equal))
.collect::<Vec<_>>()
})
}
/// Update the `__name__` matchers in expression into special value
/// Returns the updated expression.
fn update_metric_name_matcher(expr: &mut PromqlExpr, metric_name: &str) {
match expr {
PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => {
update_metric_name_matcher(expr, metric_name)
}
PromqlExpr::Unary(UnaryExpr { expr }) => update_metric_name_matcher(expr, metric_name),
PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
update_metric_name_matcher(lhs, metric_name);
update_metric_name_matcher(rhs, metric_name);
}
PromqlExpr::Paren(ParenExpr { expr }) => update_metric_name_matcher(expr, metric_name),
PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => {
update_metric_name_matcher(expr, metric_name)
}
PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => {
name.clone().or(matchers
.find_matchers(METRIC_NAME)
.into_iter()
.next()
.map(|m| m.value))
if name.is_some() {
return;
}
for m in &mut matchers.matchers {
if m.name == METRIC_NAME {
m.op = MatchOp::Equal;
m.value = metric_name.to_string();
}
}
}
PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
let VectorSelector { name, matchers, .. } = vs;
name.clone().or(matchers
.find_matchers(METRIC_NAME)
.into_iter()
.next()
.map(|m| m.value))
if name.is_some() {
return;
}
for m in &mut matchers.matchers {
if m.name == METRIC_NAME {
m.op = MatchOp::Equal;
m.value = metric_name.to_string();
}
}
}
PromqlExpr::Call(Call { args, .. }) => {
args.args.iter().find_map(|e| promql_expr_to_metric_name(e))
args.args.iter_mut().for_each(|e| {
update_metric_name_matcher(e, metric_name);
});
}
PromqlExpr::NumberLiteral(_) | PromqlExpr::StringLiteral(_) | PromqlExpr::Extension(_) => {}
}
}
@@ -880,7 +1130,6 @@ async fn retrieve_label_values_from_record_batch(
/// Returns the metric name if a single metric is referenced, otherwise None.
fn retrieve_metric_name_from_promql(query: &str) -> Option<String> {
let promql_expr = promql_parser::parser::parse(query).ok()?;
// promql_expr_to_metric_name(&promql_expr)
struct MetricNameVisitor {
metric_name: Option<String>,
@@ -1034,13 +1283,8 @@ pub async fn parse_query(
Form(form_params): Form<ParseQuery>,
) -> PrometheusJsonResponse {
if let Some(query) = params.query.or(form_params.query) {
match promql_parser::parser::parse(&query) {
Ok(ast) => PrometheusJsonResponse::success(PrometheusResponse::ParseResult(ast)),
Err(err) => {
let msg = err.to_string();
PrometheusJsonResponse::error(StatusCode::InvalidArguments, msg)
}
}
let ast = try_call_return_response!(promql_parser::parser::parse(&query));
PrometheusJsonResponse::success(PrometheusResponse::ParseResult(ast))
} else {
PrometheusJsonResponse::error(StatusCode::InvalidArguments, "query is required")
}

View File

@@ -40,6 +40,7 @@ pub mod otlp;
pub mod postgres;
mod prom_row_builder;
pub mod prom_store;
pub mod prometheus;
pub mod prometheus_handler;
pub mod proto;
pub mod query_handler;

View File

@@ -0,0 +1,84 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use catalog::system_schema::information_schema::tables::{
ENGINE as TABLE_ENGINE, TABLE_CATALOG, TABLE_NAME, TABLE_SCHEMA,
};
use common_telemetry::tracing;
use datafusion::prelude::{col, lit, regexp_match, Expr};
use datafusion_expr::LogicalPlan;
use promql_parser::label::{MatchOp, Matcher};
use query::dataframe::DataFrame;
use session::context::QueryContextRef;
use snafu::ResultExt;
use crate::error::{self, Result};
/// The maximum number of metrics at one time.
const MAX_METRICS_NUM: usize = 1024;
/// Create a DataFrame from promql `__name__` matchers.
/// # Panics
/// Panic when the machers contains `MatchOp::Equal`.
#[tracing::instrument(skip_all)]
pub fn metric_name_matchers_to_plan(
dataframe: DataFrame,
matchers: Vec<Matcher>,
ctx: &QueryContextRef,
) -> Result<LogicalPlan> {
assert!(!matchers.is_empty());
let mut conditions = Vec::with_capacity(matchers.len() + 3);
conditions.push(col(TABLE_CATALOG).eq(lit(ctx.current_catalog())));
conditions.push(col(TABLE_SCHEMA).eq(lit(ctx.current_schema())));
// Must be metric engine
conditions.push(col(TABLE_ENGINE).eq(lit("metric")));
for m in matchers {
let value = &m.value;
match &m.op {
MatchOp::NotEqual => {
conditions.push(col(TABLE_NAME).not_eq(lit(value)));
}
// Case sensitive regexp match
MatchOp::Re(regex) => {
conditions.push(
regexp_match(col(TABLE_NAME), lit(regex.to_string()), None).is_not_null(),
);
}
// Case sensitive regexp not match
MatchOp::NotRe(regex) => {
conditions
.push(regexp_match(col(TABLE_NAME), lit(regex.to_string()), None).is_null());
}
_ => unreachable!("checked outside"),
}
}
// Safety: conditions MUST not be empty, reduce always return Some(expr).
let conditions = conditions.into_iter().reduce(Expr::and).unwrap();
let DataFrame::DataFusion(dataframe) = dataframe;
let dataframe = dataframe
.filter(conditions)
.context(error::DataFrameSnafu)?
.select(vec![col(TABLE_NAME)])
.context(error::DataFrameSnafu)?
.limit(0, Some(MAX_METRICS_NUM))
.context(error::DataFrameSnafu)?;
Ok(dataframe.into_parts().1)
}

View File

@@ -19,6 +19,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use catalog::CatalogManagerRef;
use common_query::Output;
use promql_parser::label::Matcher;
use query::parser::PromQuery;
use session::context::QueryContextRef;
@@ -32,5 +33,12 @@ pub type PrometheusHandlerRef = Arc<dyn PrometheusHandler + Send + Sync>;
pub trait PrometheusHandler {
async fn do_query(&self, query: &PromQuery, query_ctx: QueryContextRef) -> Result<Output>;
/// Query metric table names by the `__name__` matchers.
async fn query_metric_names(
&self,
matchers: Vec<Matcher>,
ctx: &QueryContextRef,
) -> Result<Vec<String>>;
fn catalog_manager(&self) -> CatalogManagerRef;
}

View File

@@ -438,6 +438,11 @@ pub async fn setup_test_http_app_with_frontend_and_user_provider(
(app, instance.guard)
}
async fn run_sql(sql: &str, instance: &GreptimeDbStandalone) {
let result = instance.instance.do_query(sql, QueryContext::arc()).await;
let _ = result.first().unwrap().as_ref().unwrap();
}
pub async fn setup_test_prom_app_with_frontend(
store_type: StorageType,
name: &str,
@@ -446,11 +451,20 @@ pub async fn setup_test_prom_app_with_frontend(
let instance = setup_standalone_instance(name, store_type).await;
create_test_table(instance.instance.as_ref(), "demo").await;
let sql = "INSERT INTO demo VALUES ('host1', 1.1, 2.2, 0), ('host2', 2.1, 4.3, 600000)";
let result = instance.instance.do_query(sql, QueryContext::arc()).await;
let _ = result.first().unwrap().as_ref().unwrap();
// build physical table
let sql = "CREATE TABLE phy (ts timestamp time index, val double, host string primary key) engine=metric with ('physical_metric_table' = '')";
run_sql(sql, &instance).await;
// build metric tables
let sql = "CREATE TABLE demo (ts timestamp time index, val double, host string primary key) engine=metric with ('on_physical_table' = 'phy')";
run_sql(sql, &instance).await;
let sql = "CREATE TABLE demo_metrics (ts timestamp time index, val double, idc string primary key) engine=metric with ('on_physical_table' = 'phy')";
run_sql(sql, &instance).await;
// insert rows
let sql = "INSERT INTO demo(host, val, ts) VALUES ('host1', 1.1, 0), ('host2', 2.1, 600000)";
run_sql(sql, &instance).await;
let sql =
"INSERT INTO demo_metrics(idc, val, ts) VALUES ('idc1', 1.1, 0), ('idc2', 2.1, 600000)";
run_sql(sql, &instance).await;
let http_opts = HttpOptions {
addr: format!("127.0.0.1:{}", ports::get_port()),

View File

@@ -520,7 +520,7 @@ pub async fn test_prom_http_api(store_type: StorageType) {
assert_eq!(body.status, "success");
assert_eq!(
body.data,
serde_json::from_value::<PrometheusResponse>(json!(["__name__", "host", "number",]))
serde_json::from_value::<PrometheusResponse>(json!(["__name__", "host", "idc", "number",]))
.unwrap()
);
@@ -613,7 +613,7 @@ pub async fn test_prom_http_api(store_type: StorageType) {
assert_eq!(body.status, "success");
assert_eq!(
body.data,
serde_json::from_value::<PrometheusResponse>(json!(["cpu", "memory"])).unwrap()
serde_json::from_value::<PrometheusResponse>(json!(["val"])).unwrap()
);
// query an empty database should return nothing
@@ -698,6 +698,31 @@ pub async fn test_prom_http_api(store_type: StorageType) {
let expected = "{\"status\":\"error\",\"data\":{\"resultType\":\"\",\"result\":[]},\"error\":\"invalid promql query\",\"errorType\":\"InvalidArguments\"}";
assert_eq!(expected, data);
// range_query with __name__ not-equal matcher
let res = client
.post("/v1/prometheus/api/v1/query_range?query=count by(__name__)({__name__=~'demo.*'})&start=1&end=100&step=5")
.header("Content-Type", "application/x-www-form-urlencoded")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let data = res.text().await;
assert!(
data.contains("{\"__name__\":\"demo_metrics\"}")
&& data.contains("{\"__name__\":\"demo\"}")
);
let res = client
.post("/v1/prometheus/api/v1/query_range?query=count by(__name__)({__name__=~'demo_metrics'})&start=1&end=100&step=5")
.header("Content-Type", "application/x-www-form-urlencoded")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let data = res.text().await;
assert!(
data.contains("{\"__name__\":\"demo_metrics\"}")
&& !data.contains("{\"__name__\":\"demo\"}")
);
guard.remove_all().await;
}