diff --git a/Cargo.lock b/Cargo.lock index efe48899a1..b4b5bdd34c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4078,6 +4078,7 @@ dependencies = [ "partition", "pipeline", "prometheus", + "promql-parser", "prost 0.12.6", "query", "raft-engine", @@ -10269,6 +10270,7 @@ dependencies = [ "snafu 0.8.5", "snap", "sql", + "store-api", "strum 0.25.0", "table", "tempfile", diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 8d8ec43f16..b6aee42c5c 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -37,6 +37,7 @@ common-time.workspace = true common-version.workspace = true datafusion-expr.workspace = true datanode.workspace = true +datatypes.workspace = true humantime-serde.workspace = true lazy_static.workspace = true log-query.workspace = true @@ -47,6 +48,7 @@ operator.workspace = true partition.workspace = true pipeline.workspace = true prometheus.workspace = true +promql-parser.workspace = true prost.workspace = true query.workspace = true raft-engine.workspace = true diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index fc24bb17dc..a3455f25d8 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -169,6 +169,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to collect recordbatch"))] + CollectRecordbatch { + #[snafu(implicit)] + location: Location, + source: common_recordbatch::error::Error, + }, + #[snafu(display("Failed to plan statement"))] PlanStatement { #[snafu(implicit)] @@ -224,6 +231,13 @@ pub enum Error { source: servers::error::Error, }, + #[snafu(display("Failed to create logical plan for prometheus metric names query"))] + PrometheusMetricNamesQueryPlan { + #[snafu(implicit)] + location: Location, + source: servers::error::Error, + }, + #[snafu(display("Failed to describe schema for given statement"))] DescribeStatement { #[snafu(implicit)] @@ -349,8 +363,11 @@ impl ErrorExt for Error { Error::HandleHeartbeatResponse { source, .. 
} => source.status_code(), Error::PromStoreRemoteQueryPlan { source, .. } + | Error::PrometheusMetricNamesQueryPlan { source, .. } | Error::ExecutePromql { source, .. } => source.status_code(), + Error::CollectRecordbatch { .. } => StatusCode::EngineExecuteQuery, + Error::SqlExecIntercepted { source, .. } => source.status_code(), Error::StartServer { source, .. } => source.status_code(), Error::ShutdownServer { source, .. } => source.status_code(), diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index ba187277e9..d2b9102929 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -20,6 +20,7 @@ mod logs; mod opentsdb; mod otlp; mod prom_store; +mod promql; mod region_query; pub mod standalone; @@ -47,6 +48,7 @@ use operator::insert::InserterRef; use operator::statement::StatementExecutor; use pipeline::pipeline_operator::PipelineOperator; use prometheus::HistogramTimer; +use promql_parser::label::Matcher; use query::metrics::OnDone; use query::parser::{PromQuery, QueryLanguageParser, QueryStatement}; use query::query_engine::options::{validate_catalog_and_schema, QueryOptions}; @@ -450,6 +452,17 @@ impl PrometheusHandler for Instance { Ok(interceptor.post_execute(output, query_ctx)?) } + async fn query_metric_names( + &self, + matchers: Vec, + ctx: &QueryContextRef, + ) -> server_error::Result> { + self.handle_query_metric_names(matchers, ctx) + .await + .map_err(BoxedError::new) + .context(ExecuteQuerySnafu) + } + fn catalog_manager(&self) -> CatalogManagerRef { self.catalog_manager.clone() } diff --git a/src/frontend/src/instance/promql.rs b/src/frontend/src/instance/promql.rs new file mode 100644 index 0000000000..7ee675a49e --- /dev/null +++ b/src/frontend/src/instance/promql.rs @@ -0,0 +1,99 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use catalog::information_schema::TABLES; +use client::OutputData; +use common_catalog::consts::INFORMATION_SCHEMA_NAME; +use common_recordbatch::util; +use common_telemetry::tracing; +use datatypes::prelude::Value; +use promql_parser::label::Matcher; +use servers::prometheus; +use session::context::QueryContextRef; +use snafu::{OptionExt, ResultExt}; + +use crate::error::{ + CatalogSnafu, CollectRecordbatchSnafu, ExecLogicalPlanSnafu, + PrometheusMetricNamesQueryPlanSnafu, ReadTableSnafu, Result, TableNotFoundSnafu, +}; +use crate::instance::Instance; + +impl Instance { + /// Handles metric names query request, returns the names. + #[tracing::instrument(skip_all)] + pub(crate) async fn handle_query_metric_names( + &self, + matchers: Vec, + ctx: &QueryContextRef, + ) -> Result> { + let _timer = crate::metrics::PROMQL_QUERY_METRICS_ELAPSED + .with_label_values(&[ctx.get_db_string().as_str()]) + .start_timer(); + + let table = self + .catalog_manager + .table( + ctx.current_catalog(), + INFORMATION_SCHEMA_NAME, + TABLES, + Some(ctx), + ) + .await + .context(CatalogSnafu)? 
+ .with_context(|| TableNotFoundSnafu { + table_name: "greptime.information_schema.tables", + })?; + + let dataframe = self + .query_engine + .read_table(table) + .with_context(|_| ReadTableSnafu { + table_name: "greptime.information_schema.tables", + })?; + + let logical_plan = prometheus::metric_name_matchers_to_plan(dataframe, matchers, ctx) + .context(PrometheusMetricNamesQueryPlanSnafu)?; + + let results = self + .query_engine + .execute(logical_plan, ctx.clone()) + .await + .context(ExecLogicalPlanSnafu)?; + + let batches = match results.data { + OutputData::Stream(stream) => util::collect(stream) + .await + .context(CollectRecordbatchSnafu)?, + OutputData::RecordBatches(rbs) => rbs.take(), + _ => unreachable!("should not happen"), + }; + + let mut results = Vec::with_capacity(batches.iter().map(|b| b.num_rows()).sum()); + + for batch in batches { + // There is only one column in the results, ensured by `prometheus::metric_name_matchers_to_plan`. + let names = batch.column(0); + + for i in 0..names.len() { + let Value::String(name) = names.get(i) else { + unreachable!(); + }; + + results.push(name.into_string()); + } + } + + Ok(results) + } +} diff --git a/src/frontend/src/metrics.rs b/src/frontend/src/metrics.rs index d7d3953f25..5e1eb70f59 100644 --- a/src/frontend/src/metrics.rs +++ b/src/frontend/src/metrics.rs @@ -29,6 +29,13 @@ lazy_static! { pub static ref GRPC_HANDLE_PROMQL_ELAPSED: Histogram = GRPC_HANDLE_QUERY_ELAPSED .with_label_values(&["promql"]); + pub static ref PROMQL_QUERY_METRICS_ELAPSED: HistogramVec = register_histogram_vec!( + "greptime_frontend_promql_query_metrics_elapsed", + "frontend promql query metric names elapsed", + &["db"] + ) + .unwrap(); + /// The number of OpenTelemetry metrics send by frontend node. 
pub static ref OTLP_METRICS_ROWS: IntCounter = register_int_counter!( "greptime_frontend_otlp_metrics_rows", diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index f2341e246e..5a7d3968f8 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -102,6 +102,7 @@ session.workspace = true snafu.workspace = true snap = "1" sql.workspace = true +store-api.workspace = true strum.workspace = true table.workspace = true tokio.workspace = true diff --git a/src/servers/src/http/prometheus.rs b/src/servers/src/http/prometheus.rs index fa7f67f6ff..adbf49010b 100644 --- a/src/servers/src/http/prometheus.rs +++ b/src/servers/src/http/prometheus.rs @@ -24,14 +24,15 @@ use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_query::{Output, OutputData}; use common_recordbatch::RecordBatches; -use common_telemetry::tracing; +use common_telemetry::{debug, tracing}; use common_time::util::{current_time_rfc3339, yesterday_rfc3339}; use common_version::OwnedBuildInfo; use datatypes::prelude::ConcreteDataType; use datatypes::scalars::ScalarVector; use datatypes::vectors::{Float64Vector, StringVector}; +use futures::future::join_all; use futures::StreamExt; -use promql_parser::label::METRIC_NAME; +use promql_parser::label::{MatchOp, Matcher, Matchers, METRIC_NAME}; use promql_parser::parser::value::ValueType; use promql_parser::parser::{ AggregateExpr, BinaryExpr, Call, Expr as PromqlExpr, MatrixSelector, ParenExpr, SubqueryExpr, @@ -41,8 +42,11 @@ use query::parser::{PromQuery, DEFAULT_LOOKBACK_STRING}; use serde::de::{self, MapAccess, Visitor}; use serde::{Deserialize, Serialize}; use serde_json::Value; -use session::context::QueryContext; +use session::context::{QueryContext, QueryContextRef}; use snafu::{Location, OptionExt, ResultExt}; +use store_api::metric_engine_consts::{ + DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, +}; pub use super::result::prometheus_resp::PrometheusJsonResponse; use crate::error::{ @@ 
-104,6 +108,44 @@ pub enum PrometheusResponse { ParseResult(promql_parser::parser::Expr), } +impl PrometheusResponse { + /// Appends the other [`PrometheusResponse`] to this one. + /// # NOTE + /// Only matrix and vector results are appended; any other response is ignored. + fn append(&mut self, other: PrometheusResponse) { + match (self, other) { + ( + PrometheusResponse::PromData(PromData { + result: PromQueryResult::Matrix(lhs), + .. + }), + PrometheusResponse::PromData(PromData { + result: PromQueryResult::Matrix(rhs), + .. + }), + ) => { + lhs.extend(rhs); + } + + ( + PrometheusResponse::PromData(PromData { + result: PromQueryResult::Vector(lhs), + .. + }), + PrometheusResponse::PromData(PromData { + result: PromQueryResult::Vector(rhs), + .. + }), + ) => { + lhs.extend(rhs); + } + _ => { + // TODO(dennis): process other cases? + } + } + } +} + impl Default for PrometheusResponse { fn default() -> Self { PrometheusResponse::PromData(Default::default()) @@ -161,6 +203,20 @@ pub struct InstantQuery { db: Option, } +/// Helper macro which tries to evaluate the expression and returns its result. +/// If the evaluation fails, it returns an error `PrometheusJsonResponse` early. +macro_rules! 
try_call_return_response { + ($handle: expr) => { + match $handle { + Ok(res) => res, + Err(err) => { + let msg = err.to_string(); + return PrometheusJsonResponse::error(StatusCode::InvalidArguments, msg); + } + } + }; +} + #[axum_macros::debug_handler] #[tracing::instrument( skip_all, @@ -188,6 +244,8 @@ pub async fn instant_query( .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()), }; + let promql_expr = try_call_return_response!(promql_parser::parser::parse(&prom_query.query)); + // update catalog and schema in query context if necessary if let Some(db) = ¶ms.db { let (catalog, schema) = parse_catalog_and_schema_from_db_string(db); @@ -199,7 +257,64 @@ pub async fn instant_query( .with_label_values(&[query_ctx.get_db_string().as_str(), "instant_query"]) .start_timer(); - let result = handler.do_query(&prom_query, query_ctx).await; + if let Some(name_matchers) = find_metric_name_not_equal_matchers(&promql_expr) + && !name_matchers.is_empty() + { + debug!("Find metric name matchers: {:?}", name_matchers); + + let metric_names = + try_call_return_response!(handler.query_metric_names(name_matchers, &query_ctx).await); + + debug!("Find metric names: {:?}", metric_names); + + if metric_names.is_empty() { + let result_type = promql_expr.value_type(); + + return PrometheusJsonResponse::success(PrometheusResponse::PromData(PromData { + result_type: result_type.to_string(), + ..Default::default() + })); + } + + let responses = join_all(metric_names.into_iter().map(|metric| { + let mut prom_query = prom_query.clone(); + let mut promql_expr = promql_expr.clone(); + let query_ctx = query_ctx.clone(); + let handler = handler.clone(); + + async move { + update_metric_name_matcher(&mut promql_expr, &metric); + let new_query = promql_expr.to_string(); + debug!( + "Updated promql, before: {}, after: {}", + &prom_query.query, new_query + ); + prom_query.query = new_query; + + do_instant_query(&handler, &prom_query, query_ctx).await + } + })) + .await; + + responses + 
.into_iter() + .reduce(|mut acc, resp| { + acc.data.append(resp.data); + acc + }) + .unwrap() + } else { + do_instant_query(&handler, &prom_query, query_ctx).await + } +} + +/// Executes a single instant query and returns response +async fn do_instant_query( + handler: &PrometheusHandlerRef, + prom_query: &PromQuery, + query_ctx: QueryContextRef, +) -> PrometheusJsonResponse { + let result = handler.do_query(prom_query, query_ctx).await; let (metric_name, result_type) = match retrieve_metric_name_and_result_type(&prom_query.query) { Ok((metric_name, result_type)) => (metric_name.unwrap_or_default(), result_type), Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()), @@ -240,18 +355,75 @@ pub async fn range_query( .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()), }; + let promql_expr = try_call_return_response!(promql_parser::parser::parse(&prom_query.query)); + // update catalog and schema in query context if necessary if let Some(db) = ¶ms.db { let (catalog, schema) = parse_catalog_and_schema_from_db_string(db); try_update_catalog_schema(&mut query_ctx, &catalog, &schema); } let query_ctx = Arc::new(query_ctx); - let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED .with_label_values(&[query_ctx.get_db_string().as_str(), "range_query"]) .start_timer(); - let result = handler.do_query(&prom_query, query_ctx).await; + if let Some(name_matchers) = find_metric_name_not_equal_matchers(&promql_expr) + && !name_matchers.is_empty() + { + debug!("Find metric name matchers: {:?}", name_matchers); + + let metric_names = + try_call_return_response!(handler.query_metric_names(name_matchers, &query_ctx).await); + + debug!("Find metric names: {:?}", metric_names); + + if metric_names.is_empty() { + return PrometheusJsonResponse::success(PrometheusResponse::PromData(PromData { + result_type: ValueType::Matrix.to_string(), + ..Default::default() + })); + } + + let responses = join_all(metric_names.into_iter().map(|metric| { + 
let mut prom_query = prom_query.clone(); + let mut promql_expr = promql_expr.clone(); + let query_ctx = query_ctx.clone(); + let handler = handler.clone(); + + async move { + update_metric_name_matcher(&mut promql_expr, &metric); + let new_query = promql_expr.to_string(); + debug!( + "Updated promql, before: {}, after: {}", + &prom_query.query, new_query + ); + prom_query.query = new_query; + + do_range_query(&handler, &prom_query, query_ctx).await + } + })) + .await; + + // Safety: at least one responses, checked above + responses + .into_iter() + .reduce(|mut acc, resp| { + acc.data.append(resp.data); + acc + }) + .unwrap() + } else { + do_range_query(&handler, &prom_query, query_ctx).await + } +} + +/// Executes a single range query and returns response +async fn do_range_query( + handler: &PrometheusHandlerRef, + prom_query: &PromQuery, + query_ctx: QueryContextRef, +) -> PrometheusJsonResponse { + let result = handler.do_query(prom_query, query_ctx).await; let metric_name = match retrieve_metric_name_and_result_type(&prom_query.query) { Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()), Ok((metric_name, _)) => metric_name.unwrap_or_default(), @@ -414,7 +586,11 @@ async fn get_all_column_names( continue; }; for column in table.primary_key_columns() { - labels.insert(column.name); + if column.name != DATA_SCHEMA_TABLE_ID_COLUMN_NAME + && column.name != DATA_SCHEMA_TSID_COLUMN_NAME + { + labels.insert(column.name); + } } } @@ -629,35 +805,109 @@ pub(crate) fn try_update_catalog_schema(ctx: &mut QueryContext, catalog: &str, s } fn promql_expr_to_metric_name(expr: &PromqlExpr) -> Option { + find_metric_name_and_matchers(expr, |name, matchers| { + name.clone().or(matchers + .find_matchers(METRIC_NAME) + .into_iter() + .next() + .map(|m| m.value)) + }) +} + +fn find_metric_name_and_matchers(expr: &PromqlExpr, f: F) -> Option +where + F: Fn(&Option, &Matchers) -> Option + Clone, +{ match expr { - PromqlExpr::Aggregate(AggregateExpr 
{ expr, .. }) => promql_expr_to_metric_name(expr), - PromqlExpr::Unary(UnaryExpr { expr }) => promql_expr_to_metric_name(expr), + PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => find_metric_name_and_matchers(expr, f), + PromqlExpr::Unary(UnaryExpr { expr }) => find_metric_name_and_matchers(expr, f), PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => { - promql_expr_to_metric_name(lhs).or(promql_expr_to_metric_name(rhs)) + find_metric_name_and_matchers(lhs, f.clone()).or(find_metric_name_and_matchers(rhs, f)) } - PromqlExpr::Paren(ParenExpr { expr }) => promql_expr_to_metric_name(expr), - PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => promql_expr_to_metric_name(expr), - PromqlExpr::NumberLiteral(_) => Some(String::new()), - PromqlExpr::StringLiteral(_) => Some(String::new()), + PromqlExpr::Paren(ParenExpr { expr }) => find_metric_name_and_matchers(expr, f), + PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => find_metric_name_and_matchers(expr, f), + PromqlExpr::NumberLiteral(_) => None, + PromqlExpr::StringLiteral(_) => None, PromqlExpr::Extension(_) => None, + PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => f(name, matchers), + PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => { + let VectorSelector { name, matchers, .. } = vs; + + f(name, matchers) + } + PromqlExpr::Call(Call { args, .. }) => args + .args + .iter() + .find_map(|e| find_metric_name_and_matchers(e, f.clone())), + } +} + +/// Try to find the `__name__` matchers which op is not `MatchOp::Equal`. +fn find_metric_name_not_equal_matchers(expr: &PromqlExpr) -> Option> { + find_metric_name_and_matchers(expr, |name, matchers| { + // Has name, ignore the matchers + if name.is_some() { + return None; + } + + // FIXME(dennis): we don't consider the nested and `or` matchers yet. 
+ Some(matchers.find_matchers(METRIC_NAME)) + }) + .map(|matchers| { + matchers + .into_iter() + .filter(|m| !matches!(m.op, MatchOp::Equal)) + .collect::>() + }) +} + +/// Rewrites the `__name__` matchers in the expression into an equality match on `metric_name`. +/// The expression is updated in place; selectors that already have a name are left untouched. +fn update_metric_name_matcher(expr: &mut PromqlExpr, metric_name: &str) { + match expr { + PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => { + update_metric_name_matcher(expr, metric_name) + } + PromqlExpr::Unary(UnaryExpr { expr }) => update_metric_name_matcher(expr, metric_name), + PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => { + update_metric_name_matcher(lhs, metric_name); + update_metric_name_matcher(rhs, metric_name); + } + PromqlExpr::Paren(ParenExpr { expr }) => update_metric_name_matcher(expr, metric_name), + PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => { + update_metric_name_matcher(expr, metric_name) + } PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => { - name.clone().or(matchers - .find_matchers(METRIC_NAME) - .into_iter() - .next() - .map(|m| m.value)) + if name.is_some() { + return; + } + + for m in &mut matchers.matchers { + if m.name == METRIC_NAME { + m.op = MatchOp::Equal; + m.value = metric_name.to_string(); + } + } } PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => { let VectorSelector { name, matchers, .. } = vs; - name.clone().or(matchers - .find_matchers(METRIC_NAME) - .into_iter() - .next() - .map(|m| m.value)) + if name.is_some() { + return; + } + + for m in &mut matchers.matchers { + if m.name == METRIC_NAME { + m.op = MatchOp::Equal; + m.value = metric_name.to_string(); + } + } } PromqlExpr::Call(Call { args, .. 
}) => { - args.args.iter().find_map(|e| promql_expr_to_metric_name(e)) + args.args.iter_mut().for_each(|e| { + update_metric_name_matcher(e, metric_name); + }); } + PromqlExpr::NumberLiteral(_) | PromqlExpr::StringLiteral(_) | PromqlExpr::Extension(_) => {} } } @@ -880,7 +1130,6 @@ async fn retrieve_label_values_from_record_batch( /// Returns the metric name if a single metric is referenced, otherwise None. fn retrieve_metric_name_from_promql(query: &str) -> Option { let promql_expr = promql_parser::parser::parse(query).ok()?; - // promql_expr_to_metric_name(&promql_expr) struct MetricNameVisitor { metric_name: Option, @@ -1034,13 +1283,8 @@ pub async fn parse_query( Form(form_params): Form, ) -> PrometheusJsonResponse { if let Some(query) = params.query.or(form_params.query) { - match promql_parser::parser::parse(&query) { - Ok(ast) => PrometheusJsonResponse::success(PrometheusResponse::ParseResult(ast)), - Err(err) => { - let msg = err.to_string(); - PrometheusJsonResponse::error(StatusCode::InvalidArguments, msg) - } - } + let ast = try_call_return_response!(promql_parser::parser::parse(&query)); + PrometheusJsonResponse::success(PrometheusResponse::ParseResult(ast)) } else { PrometheusJsonResponse::error(StatusCode::InvalidArguments, "query is required") } diff --git a/src/servers/src/lib.rs b/src/servers/src/lib.rs index 417d264651..e95bdac752 100644 --- a/src/servers/src/lib.rs +++ b/src/servers/src/lib.rs @@ -40,6 +40,7 @@ pub mod otlp; pub mod postgres; mod prom_row_builder; pub mod prom_store; +pub mod prometheus; pub mod prometheus_handler; pub mod proto; pub mod query_handler; diff --git a/src/servers/src/prometheus.rs b/src/servers/src/prometheus.rs new file mode 100644 index 0000000000..7066eda94b --- /dev/null +++ b/src/servers/src/prometheus.rs @@ -0,0 +1,84 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use catalog::system_schema::information_schema::tables::{ + ENGINE as TABLE_ENGINE, TABLE_CATALOG, TABLE_NAME, TABLE_SCHEMA, +}; +use common_telemetry::tracing; +use datafusion::prelude::{col, lit, regexp_match, Expr}; +use datafusion_expr::LogicalPlan; +use promql_parser::label::{MatchOp, Matcher}; +use query::dataframe::DataFrame; +use session::context::QueryContextRef; +use snafu::ResultExt; + +use crate::error::{self, Result}; + +/// The maximum number of metrics at one time. +const MAX_METRICS_NUM: usize = 1024; + +/// Creates a logical plan from the DataFrame, filtered by the promql `__name__` matchers. +/// # Panics +/// Panics when the matchers are empty or contain `MatchOp::Equal`. 
+#[tracing::instrument(skip_all)] +pub fn metric_name_matchers_to_plan( + dataframe: DataFrame, + matchers: Vec, + ctx: &QueryContextRef, +) -> Result { + assert!(!matchers.is_empty()); + + let mut conditions = Vec::with_capacity(matchers.len() + 3); + + conditions.push(col(TABLE_CATALOG).eq(lit(ctx.current_catalog()))); + conditions.push(col(TABLE_SCHEMA).eq(lit(ctx.current_schema()))); + // Must be metric engine + conditions.push(col(TABLE_ENGINE).eq(lit("metric"))); + + for m in matchers { + let value = &m.value; + + match &m.op { + MatchOp::NotEqual => { + conditions.push(col(TABLE_NAME).not_eq(lit(value))); + } + // Case sensitive regexp match + MatchOp::Re(regex) => { + conditions.push( + regexp_match(col(TABLE_NAME), lit(regex.to_string()), None).is_not_null(), + ); + } + // Case sensitive regexp not match + MatchOp::NotRe(regex) => { + conditions + .push(regexp_match(col(TABLE_NAME), lit(regex.to_string()), None).is_null()); + } + _ => unreachable!("checked outside"), + } + } + + // Safety: conditions MUST not be empty, reduce always return Some(expr). + let conditions = conditions.into_iter().reduce(Expr::and).unwrap(); + + let DataFrame::DataFusion(dataframe) = dataframe; + let dataframe = dataframe + .filter(conditions) + .context(error::DataFrameSnafu)? + .select(vec![col(TABLE_NAME)]) + .context(error::DataFrameSnafu)? 
+ .limit(0, Some(MAX_METRICS_NUM)) + .context(error::DataFrameSnafu)?; + + Ok(dataframe.into_parts().1) +} diff --git a/src/servers/src/prometheus_handler.rs b/src/servers/src/prometheus_handler.rs index e6d1359edd..a89e449918 100644 --- a/src/servers/src/prometheus_handler.rs +++ b/src/servers/src/prometheus_handler.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use async_trait::async_trait; use catalog::CatalogManagerRef; use common_query::Output; +use promql_parser::label::Matcher; use query::parser::PromQuery; use session::context::QueryContextRef; @@ -32,5 +33,12 @@ pub type PrometheusHandlerRef = Arc; pub trait PrometheusHandler { async fn do_query(&self, query: &PromQuery, query_ctx: QueryContextRef) -> Result; + /// Query metric table names by the `__name__` matchers. + async fn query_metric_names( + &self, + matchers: Vec, + ctx: &QueryContextRef, + ) -> Result>; + fn catalog_manager(&self) -> CatalogManagerRef; } diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index c930fd0acb..7235833a3b 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -438,6 +438,11 @@ pub async fn setup_test_http_app_with_frontend_and_user_provider( (app, instance.guard) } +async fn run_sql(sql: &str, instance: &GreptimeDbStandalone) { + let result = instance.instance.do_query(sql, QueryContext::arc()).await; + let _ = result.first().unwrap().as_ref().unwrap(); +} + pub async fn setup_test_prom_app_with_frontend( store_type: StorageType, name: &str, @@ -446,11 +451,20 @@ pub async fn setup_test_prom_app_with_frontend( let instance = setup_standalone_instance(name, store_type).await; - create_test_table(instance.instance.as_ref(), "demo").await; - - let sql = "INSERT INTO demo VALUES ('host1', 1.1, 2.2, 0), ('host2', 2.1, 4.3, 600000)"; - let result = instance.instance.do_query(sql, QueryContext::arc()).await; - let _ = result.first().unwrap().as_ref().unwrap(); + // build physical table + let sql = "CREATE TABLE 
phy (ts timestamp time index, val double, host string primary key) engine=metric with ('physical_metric_table' = '')"; + run_sql(sql, &instance).await; + // build metric tables + let sql = "CREATE TABLE demo (ts timestamp time index, val double, host string primary key) engine=metric with ('on_physical_table' = 'phy')"; + run_sql(sql, &instance).await; + let sql = "CREATE TABLE demo_metrics (ts timestamp time index, val double, idc string primary key) engine=metric with ('on_physical_table' = 'phy')"; + run_sql(sql, &instance).await; + // insert rows + let sql = "INSERT INTO demo(host, val, ts) VALUES ('host1', 1.1, 0), ('host2', 2.1, 600000)"; + run_sql(sql, &instance).await; + let sql = + "INSERT INTO demo_metrics(idc, val, ts) VALUES ('idc1', 1.1, 0), ('idc2', 2.1, 600000)"; + run_sql(sql, &instance).await; let http_opts = HttpOptions { addr: format!("127.0.0.1:{}", ports::get_port()), diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 3d6aef33fc..6378a659bf 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -520,7 +520,7 @@ pub async fn test_prom_http_api(store_type: StorageType) { assert_eq!(body.status, "success"); assert_eq!( body.data, - serde_json::from_value::(json!(["__name__", "host", "number",])) + serde_json::from_value::(json!(["__name__", "host", "idc", "number",])) .unwrap() ); @@ -613,7 +613,7 @@ pub async fn test_prom_http_api(store_type: StorageType) { assert_eq!(body.status, "success"); assert_eq!( body.data, - serde_json::from_value::(json!(["cpu", "memory"])).unwrap() + serde_json::from_value::(json!(["val"])).unwrap() ); // query an empty database should return nothing @@ -698,6 +698,31 @@ pub async fn test_prom_http_api(store_type: StorageType) { let expected = "{\"status\":\"error\",\"data\":{\"resultType\":\"\",\"result\":[]},\"error\":\"invalid promql query\",\"errorType\":\"InvalidArguments\"}"; assert_eq!(expected, data); + // range_query with __name__ not-equal 
matcher + let res = client + .post("/v1/prometheus/api/v1/query_range?query=count by(__name__)({__name__=~'demo.*'})&start=1&end=100&step=5") + .header("Content-Type", "application/x-www-form-urlencoded") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + let data = res.text().await; + assert!( + data.contains("{\"__name__\":\"demo_metrics\"}") + && data.contains("{\"__name__\":\"demo\"}") + ); + + let res = client + .post("/v1/prometheus/api/v1/query_range?query=count by(__name__)({__name__=~'demo_metrics'})&start=1&end=100&step=5") + .header("Content-Type", "application/x-www-form-urlencoded") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + let data = res.text().await; + assert!( + data.contains("{\"__name__\":\"demo_metrics\"}") + && !data.contains("{\"__name__\":\"demo\"}") + ); + guard.remove_all().await; }