diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs
index a3455f25d8..076ecf9943 100644
--- a/src/frontend/src/error.rs
+++ b/src/frontend/src/error.rs
@@ -238,6 +238,13 @@ pub enum Error {
         source: servers::error::Error,
     },
 
+    #[snafu(display("Failed to create logical plan for prometheus label values query"))]
+    PrometheusLabelValuesQueryPlan {
+        #[snafu(implicit)]
+        location: Location,
+        source: query::promql::error::Error,
+    },
+
     #[snafu(display("Failed to describe schema for given statement"))]
     DescribeStatement {
         #[snafu(implicit)]
@@ -366,6 +373,8 @@ impl ErrorExt for Error {
             | Error::PrometheusMetricNamesQueryPlan { source, .. }
             | Error::ExecutePromql { source, .. } => source.status_code(),
 
+            Error::PrometheusLabelValuesQueryPlan { source, .. } => source.status_code(),
+
            Error::CollectRecordbatch { .. } => StatusCode::EngineExecuteQuery,
 
            Error::SqlExecIntercepted { source, .. } => source.status_code(),
diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs
index 7cb6d24a75..d3dec49fa4 100644
--- a/src/frontend/src/instance.rs
+++ b/src/frontend/src/instance.rs
@@ -26,6 +26,7 @@ mod region_query;
 pub mod standalone;
 
 use std::sync::Arc;
+use std::time::SystemTime;
 
 use async_trait::async_trait;
 use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
@@ -471,6 +472,21 @@ impl PrometheusHandler for Instance {
             .context(ExecuteQuerySnafu)
     }
 
+    async fn query_label_values(
+        &self,
+        metric: String,
+        label_name: String,
+        matchers: Vec<Matcher>,
+        start: SystemTime,
+        end: SystemTime,
+        ctx: &QueryContextRef,
+    ) -> server_error::Result<Vec<String>> {
+        self.handle_query_label_values(metric, label_name, matchers, start, end, ctx)
+            .await
+            .map_err(BoxedError::new)
+            .context(ExecuteQuerySnafu)
+    }
+
     fn catalog_manager(&self) -> CatalogManagerRef {
         self.catalog_manager.clone()
     }
diff --git a/src/frontend/src/instance/promql.rs b/src/frontend/src/instance/promql.rs
index 7ee675a49e..1dca064982 100644
--- a/src/frontend/src/instance/promql.rs
+++ b/src/frontend/src/instance/promql.rs
@@ -12,20 +12,26 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::time::SystemTime;
+
 use catalog::information_schema::TABLES;
 use client::OutputData;
 use common_catalog::consts::INFORMATION_SCHEMA_NAME;
+use common_catalog::format_full_table_name;
 use common_recordbatch::util;
 use common_telemetry::tracing;
 use datatypes::prelude::Value;
-use promql_parser::label::Matcher;
+use promql_parser::label::{Matcher, Matchers};
+use query::promql;
+use query::promql::planner::PromPlanner;
 use servers::prometheus;
 use session::context::QueryContextRef;
 use snafu::{OptionExt, ResultExt};
 
 use crate::error::{
     CatalogSnafu, CollectRecordbatchSnafu, ExecLogicalPlanSnafu,
-    PrometheusMetricNamesQueryPlanSnafu, ReadTableSnafu, Result, TableNotFoundSnafu,
+    PrometheusLabelValuesQueryPlanSnafu, PrometheusMetricNamesQueryPlanSnafu, ReadTableSnafu,
+    Result, TableNotFoundSnafu,
 };
 use crate::instance::Instance;
 
@@ -96,4 +102,77 @@ impl Instance {
 
         Ok(results)
     }
+
+    /// Handles a label values query request and returns the matching label values.
+    #[tracing::instrument(skip_all)]
+    pub(crate) async fn handle_query_label_values(
+        &self,
+        metric: String,
+        label_name: String,
+        matchers: Vec<Matcher>,
+        start: SystemTime,
+        end: SystemTime,
+        ctx: &QueryContextRef,
+    ) -> Result<Vec<String>> {
+        let table_schema = ctx.current_schema();
+        let table = self
+            .catalog_manager
+            .table(ctx.current_catalog(), &table_schema, &metric, Some(ctx))
+            .await
+            .context(CatalogSnafu)?
+            .with_context(|| TableNotFoundSnafu {
+                table_name: format_full_table_name(ctx.current_catalog(), &table_schema, &metric),
+            })?;
+
+        let dataframe = self
+            .query_engine
+            .read_table(table.clone())
+            .with_context(|_| ReadTableSnafu {
+                table_name: format_full_table_name(ctx.current_catalog(), &table_schema, &metric),
+            })?;
+
+        let scan_plan = dataframe.into_logical_plan();
+        let filter_conditions =
+            PromPlanner::matchers_to_expr(Matchers::new(matchers), scan_plan.schema())
+                .context(PrometheusLabelValuesQueryPlanSnafu)?;
+        let logical_plan = promql::label_values::rewrite_label_values_query(
+            table,
+            scan_plan,
+            filter_conditions,
+            label_name,
+            start,
+            end,
+        )
+        .context(PrometheusLabelValuesQueryPlanSnafu)?;
+
+        let results = self
+            .query_engine
+            .execute(logical_plan, ctx.clone())
+            .await
+            .context(ExecLogicalPlanSnafu)?;
+
+        let batches = match results.data {
+            OutputData::Stream(stream) => util::collect(stream)
+                .await
+                .context(CollectRecordbatchSnafu)?,
+            OutputData::RecordBatches(rbs) => rbs.take(),
+            _ => unreachable!("should not happen"),
+        };
+
+        let mut results = Vec::with_capacity(batches.iter().map(|b| b.num_rows()).sum());
+        for batch in batches {
+            // The result has exactly one column, ensured by `promql::label_values::rewrite_label_values_query`.
+            let names = batch.column(0);
+
+            for i in 0..names.len() {
+                let Value::String(name) = names.get(i) else {
+                    unreachable!();
+                };
+
+                results.push(name.into_string());
+            }
+        }
+
+        Ok(results)
+    }
 }
diff --git a/src/query/src/dataframe.rs b/src/query/src/dataframe.rs
index 0e87f3e54a..ce630b99e7 100644
--- a/src/query/src/dataframe.rs
+++ b/src/query/src/dataframe.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 use datafusion::dataframe::DataFrame as DfDataFrame;
+use datafusion_expr::LogicalPlan;
 
 /// DataFrame represents a logical set of rows with the same named columns.
 /// Similar to a Pandas DataFrame or Spark DataFrame
@@ -20,3 +21,11 @@ use datafusion::dataframe::DataFrame as DfDataFrame;
 pub enum DataFrame {
     DataFusion(DfDataFrame),
 }
+
+impl DataFrame {
+    pub fn into_logical_plan(self) -> LogicalPlan {
+        match self {
+            Self::DataFusion(dataframe) => dataframe.into_parts().1,
+        }
+    }
+}
diff --git a/src/query/src/parser.rs b/src/query/src/parser.rs
index 7867cc38fc..82d5975e50 100644
--- a/src/query/src/parser.rs
+++ b/src/query/src/parser.rs
@@ -188,7 +188,7 @@ impl QueryLanguageParser {
         Ok(QueryStatement::Promql(eval_stmt))
     }
 
-    fn parse_promql_timestamp(timestamp: &str) -> Result<SystemTime> {
+    pub fn parse_promql_timestamp(timestamp: &str) -> Result<SystemTime> {
         // try rfc3339 format
         let rfc3339_result = DateTime::parse_from_rfc3339(timestamp)
             .context(ParseTimestampSnafu { raw: timestamp })
diff --git a/src/query/src/promql.rs b/src/query/src/promql.rs
index 06d2bbd21a..27b446e6e4 100644
--- a/src/query/src/promql.rs
+++ b/src/query/src/promql.rs
@@ -12,5 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-pub(crate) mod error;
+pub mod error;
+pub mod label_values;
 pub mod planner;
diff --git a/src/query/src/promql/label_values.rs b/src/query/src/promql/label_values.rs
new file mode 100644
index 0000000000..647cbb69e3
--- /dev/null
+++ b/src/query/src/promql/label_values.rs
@@ -0,0 +1,107 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::time::{SystemTime, UNIX_EPOCH};
+
+use datafusion_common::{Column, ScalarValue};
+use datafusion_expr::expr::Alias;
+use datafusion_expr::utils::conjunction;
+use datafusion_expr::{col, Cast, Expr, LogicalPlan, LogicalPlanBuilder};
+use datafusion_sql::TableReference;
+use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
+use datatypes::prelude::ConcreteDataType;
+use snafu::{OptionExt, ResultExt};
+use table::TableRef;
+
+use crate::promql::error::{DataFusionPlanningSnafu, Result, TimeIndexNotFoundSnafu};
+
+fn build_time_filter(time_index_expr: Expr, start: i64, end: i64) -> Expr {
+    time_index_expr
+        .clone()
+        .gt_eq(Expr::Literal(ScalarValue::TimestampMillisecond(
+            Some(start),
+            None,
+        )))
+        .and(
+            time_index_expr.lt_eq(Expr::Literal(ScalarValue::TimestampMillisecond(
+                Some(end),
+                None,
+            ))),
+        )
+}
+
+/// Rewrites a label values query into a DataFusion logical plan.
+pub fn rewrite_label_values_query(
+    table: TableRef,
+    mut scan_plan: LogicalPlan,
+    mut conditions: Vec<Expr>,
+    label_name: String,
+    start: SystemTime,
+    end: SystemTime,
+) -> Result<LogicalPlan> {
+    let table_ref = TableReference::partial(
+        table.table_info().schema_name.as_str(),
+        table.table_info().name.as_str(),
+    );
+    let schema = table.schema();
+    let ts_column = schema
+        .timestamp_column()
+        .with_context(|| TimeIndexNotFoundSnafu {
+            table: table.table_info().full_table_name(),
+        })?;
+
+    let is_time_index_ms =
+        ts_column.data_type == ConcreteDataType::timestamp_millisecond_datatype();
+    let time_index_expr = col(Column::from_name(ts_column.name.clone()));
+
+    if !is_time_index_ms {
+        // cast the time index to milliseconds if it is not already in millisecond precision
+        let expr = vec![
+            col(Column::from_name(label_name.clone())),
+            Expr::Alias(Alias {
+                expr: Box::new(Expr::Cast(Cast {
+                    expr: Box::new(time_index_expr.clone()),
+                    data_type: ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None),
+                })),
+                relation: Some(table_ref),
+                name: ts_column.name.clone(),
+            }),
+        ];
+        scan_plan = LogicalPlanBuilder::from(scan_plan)
+            .project(expr)
+            .context(DataFusionPlanningSnafu)?
+            .build()
+            .context(DataFusionPlanningSnafu)?;
+    };
+
+    let start = start.duration_since(UNIX_EPOCH).unwrap().as_millis() as i64;
+    let end = end.duration_since(UNIX_EPOCH).unwrap().as_millis() as i64;
+
+    conditions.push(build_time_filter(time_index_expr, start, end));
+    // Safety: `conditions` is not empty.
+    let filter = conjunction(conditions).unwrap();
+
+    // Apply the filters, then project and deduplicate the label column.
+    let logical_plan = LogicalPlanBuilder::from(scan_plan)
+        .filter(filter)
+        .context(DataFusionPlanningSnafu)?
+        .project(vec![col(Column::from_name(label_name))])
+        .context(DataFusionPlanningSnafu)?
+        .distinct()
+        .context(DataFusionPlanningSnafu)?
+        .build()
+        .context(DataFusionPlanningSnafu)?;
+
+    Ok(logical_plan)
+}
diff --git a/src/query/src/promql/planner.rs b/src/query/src/promql/planner.rs
index d86fd1c769..607cb98504 100644
--- a/src/query/src/promql/planner.rs
+++ b/src/query/src/promql/planner.rs
@@ -939,7 +939,7 @@ impl PromPlanner {
             Some(Offset::Neg(duration)) => -(duration.as_millis() as Millisecond),
             None => 0,
         };
-        let mut scan_filters = self.matchers_to_expr(label_matchers.clone(), table_schema)?;
+        let mut scan_filters = Self::matchers_to_expr(label_matchers.clone(), table_schema)?;
         if let Some(time_index_filter) = self.build_time_index_filter(offset_duration)? {
             scan_filters.push(time_index_filter);
         }
@@ -1135,8 +1135,7 @@ impl PromPlanner {
     }
 
     // TODO(ruihang): ignore `MetricNameLabel` (`__name__`) matcher
-    fn matchers_to_expr(
-        &self,
+    pub fn matchers_to_expr(
         label_matchers: Matchers,
         table_schema: &DFSchemaRef,
     ) -> Result<Vec<DfExpr>> {
diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs
index 7c5bc48909..6c76942d05 100644
--- a/src/servers/src/error.rs
+++ b/src/servers/src/error.rs
@@ -410,6 +410,15 @@ pub enum Error {
         source: query::error::Error,
     },
 
+    #[snafu(display("Failed to parse timestamp: {}", timestamp))]
+    ParseTimestamp {
+        timestamp: String,
+        #[snafu(implicit)]
+        location: Location,
+        #[snafu(source)]
+        error: query::error::Error,
+    },
+
     #[snafu(display("{}", reason))]
     UnexpectedResult {
         reason: String,
@@ -685,7 +694,8 @@ impl ErrorExt for Error {
             | PrepareStatementNotFound { .. }
             | FailedToParseQuery { .. }
             | InvalidElasticsearchInput { .. }
-            | InvalidJaegerQuery { .. } => StatusCode::InvalidArguments,
+            | InvalidJaegerQuery { .. }
+            | ParseTimestamp { .. } => StatusCode::InvalidArguments,
 
             Catalog { source, .. } => source.status_code(),
             RowWriter { source, .. } => source.status_code(),
diff --git a/src/servers/src/http/prometheus.rs b/src/servers/src/http/prometheus.rs
index 41dfc672ab..7ac2ccd3d2 100644
--- a/src/servers/src/http/prometheus.rs
+++ b/src/servers/src/http/prometheus.rs
@@ -29,7 +29,7 @@ use common_time::util::{current_time_rfc3339, yesterday_rfc3339};
 use common_version::OwnedBuildInfo;
 use datatypes::prelude::ConcreteDataType;
 use datatypes::scalars::ScalarVector;
-use datatypes::vectors::{Float64Vector, StringVector};
+use datatypes::vectors::Float64Vector;
 use futures::future::join_all;
 use futures::StreamExt;
 use promql_parser::label::{MatchOp, Matcher, Matchers, METRIC_NAME};
@@ -38,7 +38,7 @@ use promql_parser::parser::{
     AggregateExpr, BinaryExpr, Call, Expr as PromqlExpr, MatrixSelector, ParenExpr, SubqueryExpr,
     UnaryExpr, VectorSelector,
 };
-use query::parser::{PromQuery, DEFAULT_LOOKBACK_STRING};
+use query::parser::{PromQuery, QueryLanguageParser, DEFAULT_LOOKBACK_STRING};
 use query::promql::planner::normalize_matcher;
 use serde::de::{self, MapAccess, Visitor};
 use serde::{Deserialize, Serialize};
@@ -51,8 +51,8 @@ use store_api::metric_engine_consts::{
 
 pub use super::result::prometheus_resp::PrometheusJsonResponse;
 use crate::error::{
-    CatalogSnafu, CollectRecordbatchSnafu, Error, InvalidQuerySnafu, Result, TableNotFoundSnafu,
-    UnexpectedResultSnafu,
+    CatalogSnafu, CollectRecordbatchSnafu, Error, InvalidQuerySnafu, ParseTimestampSnafu, Result,
+    TableNotFoundSnafu, UnexpectedResultSnafu,
 };
 use crate::http::header::collect_plan_metrics;
 use crate::prom_store::{FIELD_NAME_LABEL, METRIC_NAME_LABEL};
@@ -994,44 +994,58 @@ pub async fn label_values_query(
     let start = params.start.unwrap_or_else(yesterday_rfc3339);
     let end = params.end.unwrap_or_else(current_time_rfc3339);
-    let lookback = params
-        .lookback
-        .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string());
 
     let mut label_values = HashSet::new();
-    let mut merge_map = HashMap::new();
+
+    let start = try_call_return_response!(QueryLanguageParser::parse_promql_timestamp(&start)
+        .context(ParseTimestampSnafu { timestamp: &start }));
+    let end = try_call_return_response!(QueryLanguageParser::parse_promql_timestamp(&end)
+        .context(ParseTimestampSnafu { timestamp: &end }));
+
     for query in queries {
-        let prom_query = PromQuery {
-            query,
-            start: start.clone(),
-            end: end.clone(),
-            step: DEFAULT_LOOKBACK_STRING.to_string(),
-            lookback: lookback.clone(),
+        let promql_expr = try_call_return_response!(promql_parser::parser::parse(&query));
+        let PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) = promql_expr else {
+            return PrometheusJsonResponse::error(
+                StatusCode::InvalidArguments,
+                "expected vector selector",
+            );
         };
-        let result = handler.do_query(&prom_query, query_ctx.clone()).await;
-        if let Err(err) =
-            retrieve_label_values(result, &label_name, &mut label_values, &mut merge_map).await
-        {
-            // Prometheus won't report error if querying nonexist label and metric
-            if err.status_code() != StatusCode::TableNotFound
-                && err.status_code() != StatusCode::TableColumnNotFound
-            {
-                return PrometheusJsonResponse::error(err.status_code(), err.output_msg());
+        let Some(name) = name else {
+            return PrometheusJsonResponse::error(
+                StatusCode::InvalidArguments,
+                "expected metric name",
+            );
+        };
+        // Only the AND matchers are used; OR matchers are ignored.
+        let matchers = matchers.matchers;
+        let result = handler
+            .query_label_values(
+                name,
+                label_name.to_string(),
+                matchers,
+                start,
+                end,
+                &query_ctx,
+            )
+            .await;
+
+        match result {
+            Ok(result) => {
+                label_values.extend(result.into_iter());
+            }
+            Err(err) => {
+                // Prometheus won't report an error when querying a nonexistent label or metric
+                if err.status_code() != StatusCode::TableNotFound
+                    && err.status_code() != StatusCode::TableColumnNotFound
+                {
+                    return PrometheusJsonResponse::error(err.status_code(), err.output_msg());
+                }
            }
        }
    }
 
-    let merge_map = merge_map
-        .into_iter()
-        .map(|(k, v)| (k, Value::from(v)))
-        .collect();
-
     let mut label_values: Vec<_> = label_values.into_iter().collect();
     label_values.sort_unstable();
-    let mut resp = PrometheusJsonResponse::success(PrometheusResponse::LabelValues(label_values));
-    resp.resp_metrics = merge_map;
-    resp
+
+    PrometheusJsonResponse::success(PrometheusResponse::LabelValues(label_values))
 }
 
 async fn retrieve_field_names(
@@ -1076,71 +1090,6 @@ async fn retrieve_field_names(
     Ok(field_columns)
 }
 
-async fn retrieve_label_values(
-    result: Result<Output>,
-    label_name: &str,
-    labels_values: &mut HashSet<String>,
-    metrics: &mut HashMap<String, u64>,
-) -> Result<()> {
-    let result = result?;
-    match result.data {
-        OutputData::RecordBatches(batches) => {
-            retrieve_label_values_from_record_batch(batches, label_name, labels_values).await
-        }
-        OutputData::Stream(stream) => {
-            let batches = RecordBatches::try_collect(stream)
-                .await
-                .context(CollectRecordbatchSnafu)?;
-            retrieve_label_values_from_record_batch(batches, label_name, labels_values).await
-        }
-        OutputData::AffectedRows(_) => UnexpectedResultSnafu {
-            reason: "expected data result, but got affected rows".to_string(),
-        }
-        .fail(),
-    }?;
-
-    if let Some(ref plan) = result.meta.plan {
-        collect_plan_metrics(plan, &mut [metrics]);
-    }
-
-    Ok(())
-}
-
-async fn retrieve_label_values_from_record_batch(
-    batches: RecordBatches,
-    label_name: &str,
-    labels_values: &mut HashSet<String>,
-) -> Result<()> {
-    let Some(label_col_idx) = batches.schema().column_index_by_name(label_name) else {
-        return Ok(());
-    };
-
-    // check whether label_name belongs to tag column
-    match batches
-        .schema()
-        .column_schema_by_name(label_name)
-        .unwrap()
-        .data_type
-    {
-        ConcreteDataType::String(_) => {}
-        _ => return Ok(()),
-    }
-    for batch in batches.iter() {
-        let label_column = batch
-            .column(label_col_idx)
-            .as_any()
-            .downcast_ref::<StringVector>()
-            .unwrap();
-        for row_index in 0..batch.num_rows() {
-            if let Some(label_value) = label_column.get_data(row_index) {
-                let _ = labels_values.insert(label_value.to_string());
-            }
-        }
-    }
-
-    Ok(())
-}
-
 /// Try to parse and extract the name of referenced metric from the promql query.
 ///
 /// Returns the metric name if a single metric is referenced, otherwise None.
diff --git a/src/servers/src/prometheus_handler.rs b/src/servers/src/prometheus_handler.rs
index a89e449918..85779827b3 100644
--- a/src/servers/src/prometheus_handler.rs
+++ b/src/servers/src/prometheus_handler.rs
@@ -15,6 +15,7 @@
 //! prom supply the prometheus HTTP API Server compliance
 
 use std::sync::Arc;
+use std::time::SystemTime;
 
 use async_trait::async_trait;
 use catalog::CatalogManagerRef;
@@ -40,5 +41,15 @@ pub trait PrometheusHandler {
         ctx: &QueryContextRef,
     ) -> Result<Vec<String>>;
 
+    async fn query_label_values(
+        &self,
+        metric: String,
+        label_name: String,
+        matchers: Vec<Matcher>,
+        start: SystemTime,
+        end: SystemTime,
+        ctx: &QueryContextRef,
+    ) -> Result<Vec<String>>;
+
     fn catalog_manager(&self) -> CatalogManagerRef;
 }
diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs
index bc7fa65cd6..c0abacb940 100644
--- a/tests-integration/src/test_util.rs
+++ b/tests-integration/src/test_util.rs
@@ -465,6 +465,7 @@ pub async fn setup_test_prom_app_with_frontend(
     run_sql(sql, &instance).await;
     let sql = "CREATE TABLE demo_metrics (ts timestamp time index, val double, idc string primary key) engine=metric with ('on_physical_table' = 'phy')";
     run_sql(sql, &instance).await;
+    // insert rows
     let sql = "INSERT INTO demo(host, val, ts) VALUES ('host1', 1.1, 0), ('host2', 2.1, 600000)";
     run_sql(sql, &instance).await;
     let sql =
@@ -472,6 +473,14 @@ pub async fn setup_test_prom_app_with_frontend(
         "INSERT INTO demo_metrics(idc, val, ts) VALUES ('idc1', 1.1, 0), ('idc2', 2.1, 600000)";
     run_sql(sql, &instance).await;
 
+    // build another physical table, this one with a nanosecond time index
+    let sql = "CREATE TABLE phy2 (ts timestamp(9) time index, val double, host string primary key) engine=metric with ('physical_metric_table' = '')";
+    run_sql(sql, &instance).await;
+    let sql = "CREATE TABLE demo_metrics_with_nanos(ts timestamp(9) time index, val double, idc string primary key) engine=metric with ('on_physical_table' = 'phy2')";
+    run_sql(sql, &instance).await;
+    let sql = "INSERT INTO demo_metrics_with_nanos(idc, val, ts) VALUES ('idc1', 1.1, 0)";
+    run_sql(sql, &instance).await;
+
     let http_opts = HttpOptions {
         addr: format!("127.0.0.1:{}", ports::get_port()),
         ..Default::default()
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index fb67fa6586..40a8547db0 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -666,6 +666,32 @@ pub async fn test_prom_http_api(store_type: StorageType) {
         serde_json::from_value::<PrometheusResponse>(json!(["host1", "host2"])).unwrap()
     );
 
+    // single match[]
+    let res = client
+        .get("/v1/prometheus/api/v1/label/host/values?match[]=demo&start=0&end=300")
+        .send()
+        .await;
+    assert_eq!(res.status(), StatusCode::OK);
+    let body = serde_json::from_str::<PrometheusJsonResponse>(&res.text().await).unwrap();
+    assert_eq!(body.status, "success");
+    assert_eq!(
+        body.data,
+        serde_json::from_value::<PrometheusResponse>(json!(["host1"])).unwrap()
+    );
+
+    // single match[] on a metric with a nanosecond time index
+    let res = client
+        .get("/v1/prometheus/api/v1/label/idc/values?match[]=demo_metrics_with_nanos&start=0&end=600")
+        .send()
+        .await;
+    assert_eq!(res.status(), StatusCode::OK);
+    let body = serde_json::from_str::<PrometheusJsonResponse>(&res.text().await).unwrap();
+    assert_eq!(body.status, "success");
+    assert_eq!(
+        body.data,
+        serde_json::from_value::<PrometheusResponse>(json!(["idc1"])).unwrap()
+    );
+
     // search field name
     let res = client
         .get("/v1/prometheus/api/v1/label/__field__/values?match[]=demo")
@@ -745,6 +771,7 @@ pub async fn test_prom_http_api(store_type: StorageType) {
         PrometheusResponse::Labels(vec![
             "demo".to_string(),
             "demo_metrics".to_string(),
+            "demo_metrics_with_nanos".to_string(),
             "logic_table".to_string(),
             "numbers".to_string()
         ])