diff --git a/src/query/src/promql/error.rs b/src/query/src/promql/error.rs index 532f598b77..cdf47053c5 100644 --- a/src/query/src/promql/error.rs +++ b/src/query/src/promql/error.rs @@ -17,6 +17,7 @@ use std::any::Any; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; +use common_time::timestamp::TimeUnit; use datafusion::error::DataFusionError; use promql::error::Error as PromqlError; use promql_parser::parser::token::TokenType; @@ -192,6 +193,14 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Timestamp out of range: {} of {:?}", timestamp, unit))] + TimestampOutOfRange { + timestamp: i64, + unit: TimeUnit, + #[snafu(implicit)] + location: Location, + }, } impl ErrorExt for Error { @@ -211,7 +220,8 @@ impl ErrorExt for Error { | UnsupportedVectorMatch { .. } | CombineTableColumnMismatch { .. } | UnexpectedPlanExpr { .. } - | UnsupportedMatcherOp { .. } => StatusCode::InvalidArguments, + | UnsupportedMatcherOp { .. } + | TimestampOutOfRange { .. } => StatusCode::InvalidArguments, UnknownTable { .. } => StatusCode::Internal, diff --git a/src/query/src/promql/label_values.rs b/src/query/src/promql/label_values.rs index 647cbb69e3..e1dcdf968b 100644 --- a/src/query/src/promql/label_values.rs +++ b/src/query/src/promql/label_values.rs @@ -14,80 +14,73 @@ use std::time::{SystemTime, UNIX_EPOCH}; +use common_time::timestamp::TimeUnit; +use common_time::Timestamp; use datafusion_common::{Column, ScalarValue}; -use datafusion_expr::expr::Alias; use datafusion_expr::utils::conjunction; -use datafusion_expr::{col, Cast, Expr, LogicalPlan, LogicalPlanBuilder}; -use datafusion_sql::TableReference; -use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit}; -use datatypes::prelude::ConcreteDataType; +use datafusion_expr::{col, Expr, LogicalPlan, LogicalPlanBuilder}; use snafu::{OptionExt, ResultExt}; use table::TableRef; -use crate::promql::error::{DataFusionPlanningSnafu, Result, TimeIndexNotFoundSnafu}; +use crate::promql::error::{ + DataFusionPlanningSnafu, Result, TimeIndexNotFoundSnafu, TimestampOutOfRangeSnafu, +}; -fn build_time_filter(time_index_expr: Expr, start: i64, end: i64) -> Expr { +fn build_time_filter(time_index_expr: Expr, start: Timestamp, end: Timestamp) -> Expr { time_index_expr .clone() - .gt_eq(Expr::Literal(ScalarValue::TimestampMillisecond( - Some(start), - None, - ))) - .and( - time_index_expr.lt_eq(Expr::Literal(ScalarValue::TimestampMillisecond( - Some(end), - None, - ))), - ) + .gt_eq(Expr::Literal(timestamp_to_scalar_value(start))) + .and(time_index_expr.lt_eq(Expr::Literal(timestamp_to_scalar_value(end)))) +} + +fn timestamp_to_scalar_value(timestamp: Timestamp) -> ScalarValue { + let value = timestamp.value(); + match timestamp.unit() { + TimeUnit::Second => ScalarValue::TimestampSecond(Some(value), None), + TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(Some(value), None), + TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(Some(value), None), + TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(value), None), + } } /// Rewrite label values query to DataFusion logical plan. pub fn rewrite_label_values_query( table: TableRef, - mut scan_plan: LogicalPlan, + scan_plan: LogicalPlan, mut conditions: Vec, label_name: String, start: SystemTime, end: SystemTime, ) -> Result { - let table_ref = TableReference::partial( - table.table_info().schema_name.as_str(), - table.table_info().name.as_str(), - ); let schema = table.schema(); let ts_column = schema .timestamp_column() .with_context(|| TimeIndexNotFoundSnafu { table: table.table_info().full_table_name(), })?; + let unit = ts_column + .data_type + .as_timestamp() + .map(|data_type| data_type.unit()) + .with_context(|| TimeIndexNotFoundSnafu { + table: table.table_info().full_table_name(), + })?; - let is_time_index_ms = - ts_column.data_type == ConcreteDataType::timestamp_millisecond_datatype(); + // We only support millisecond precision at most. + let start = + Timestamp::new_millisecond(start.duration_since(UNIX_EPOCH).unwrap().as_millis() as i64); + let start = start.convert_to(unit).context(TimestampOutOfRangeSnafu { + timestamp: start.value(), + unit, + })?; + let end = + Timestamp::new_millisecond(end.duration_since(UNIX_EPOCH).unwrap().as_millis() as i64); + let end = end.convert_to(unit).context(TimestampOutOfRangeSnafu { + timestamp: end.value(), + unit, + })?; let time_index_expr = col(Column::from_name(ts_column.name.clone())); - if !is_time_index_ms { - // cast to ms if time_index not in Millisecond precision - let expr = vec![ - col(Column::from_name(label_name.clone())), - Expr::Alias(Alias { - expr: Box::new(Expr::Cast(Cast { - expr: Box::new(time_index_expr.clone()), - data_type: ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None), - })), - relation: Some(table_ref), - name: ts_column.name.clone(), - }), - ]; - scan_plan = LogicalPlanBuilder::from(scan_plan) - .project(expr) - .context(DataFusionPlanningSnafu)? - .build() - .context(DataFusionPlanningSnafu)?; - }; - - let start = start.duration_since(UNIX_EPOCH).unwrap().as_millis() as i64; - let end = end.duration_since(UNIX_EPOCH).unwrap().as_millis() as i64; - conditions.push(build_time_filter(time_index_expr, start, end)); // Safety: `conditions` is not empty. let filter = conjunction(conditions).unwrap(); diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index ee68b8d715..0e1d5cc261 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -491,11 +491,15 @@ pub async fn setup_test_prom_app_with_frontend( // build physical table let sql = "CREATE TABLE phy (ts timestamp time index, val double, host string primary key) engine=metric with ('physical_metric_table' = '')"; run_sql(sql, &instance).await; + let sql = "CREATE TABLE phy_ns (ts timestamp(0) time index, val double, host string primary key) engine=metric with ('physical_metric_table' = '')"; + run_sql(sql, &instance).await; // build metric tables let sql = "CREATE TABLE demo (ts timestamp time index, val double, host string primary key) engine=metric with ('on_physical_table' = 'phy')"; run_sql(sql, &instance).await; let sql = "CREATE TABLE demo_metrics (ts timestamp time index, val double, idc string primary key) engine=metric with ('on_physical_table' = 'phy')"; run_sql(sql, &instance).await; + let sql = "CREATE TABLE multi_labels (ts timestamp(0) time index, val double, idc string, env string, host string, primary key (idc, env, host)) engine=metric with ('on_physical_table' = 'phy_ns')"; + run_sql(sql, &instance).await; // insert rows let sql = "INSERT INTO demo(host, val, ts) VALUES ('host1', 1.1, 0), ('host2', 2.1, 600000)"; @@ -507,6 +511,10 @@ pub async fn setup_test_prom_app_with_frontend( let sql = "INSERT INTO demo_metrics(val, ts) VALUES (1.1, 0)"; run_sql(sql, &instance).await; + // insert rows to multi_labels + let sql = "INSERT INTO multi_labels(idc, env, host, val, ts) VALUES ('idc1', 'dev', 'host1', 1.1, 0), ('idc1', 'dev', 'host2', 2.1, 0), ('idc2', 'dev', 'host1', 1.1, 0), ('idc2', 'test', 'host3', 2.1, 0)"; + run_sql(sql, &instance).await; + // build physical table let sql = "CREATE TABLE phy2 (ts timestamp(9) time index, val double, host string primary key) engine=metric with ('physical_metric_table' = '')"; run_sql(sql, &instance).await; diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 43b76fcedf..8e5223fad5 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -603,8 +603,10 @@ pub async fn test_prom_http_api(store_type: StorageType) { assert_eq!(body.status, "success"); assert_eq!( body.data, - serde_json::from_value::(json!(["__name__", "host", "idc", "number",])) - .unwrap() + serde_json::from_value::(json!([ + "__name__", "env", "host", "idc", "number", + ])) + .unwrap() ); // labels query with multiple match[] params @@ -725,6 +727,19 @@ pub async fn test_prom_http_api(store_type: StorageType) { serde_json::from_value::(json!(["idc1"])).unwrap() ); + // match labels. + let res = client + .get("/v1/prometheus/api/v1/label/host/values?match[]=multi_labels{idc=\"idc1\", env=\"dev\"}&start=0&end=600") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + let body = serde_json::from_str::(&res.text().await).unwrap(); + assert_eq!(body.status, "success"); + assert_eq!( + body.data, + serde_json::from_value::(json!(["host1", "host2"])).unwrap() + ); + // search field name let res = client .get("/v1/prometheus/api/v1/label/__field__/values?match[]=demo") @@ -814,6 +829,7 @@ pub async fn test_prom_http_api(store_type: StorageType) { "demo_metrics_with_nanos".to_string(), "logic_table".to_string(), "mito".to_string(), + "multi_labels".to_string(), "numbers".to_string() ]) );