fix: do not add projection to cast timestamp in label_values (#6040)

* fix: do not add projection for cast

Use cast to build time filter directly instead of adding a projection,
which will cause column not found

* feat: cast before creating plan
This commit is contained in:
Yingwen
2025-05-07 07:47:41 +08:00
committed by GitHub
parent f298a110f9
commit 07e84a28a3
4 changed files with 77 additions and 50 deletions

View File

@@ -17,6 +17,7 @@ use std::any::Any;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_time::timestamp::TimeUnit;
use datafusion::error::DataFusionError;
use promql::error::Error as PromqlError;
use promql_parser::parser::token::TokenType;
@@ -192,6 +193,14 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Timestamp out of range: {} of {:?}", timestamp, unit))]
TimestampOutOfRange {
timestamp: i64,
unit: TimeUnit,
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {
@@ -211,7 +220,8 @@ impl ErrorExt for Error {
| UnsupportedVectorMatch { .. }
| CombineTableColumnMismatch { .. }
| UnexpectedPlanExpr { .. }
| UnsupportedMatcherOp { .. } => StatusCode::InvalidArguments,
| UnsupportedMatcherOp { .. }
| TimestampOutOfRange { .. } => StatusCode::InvalidArguments,
UnknownTable { .. } => StatusCode::Internal,

View File

@@ -14,80 +14,73 @@
use std::time::{SystemTime, UNIX_EPOCH};
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use datafusion_common::{Column, ScalarValue};
use datafusion_expr::expr::Alias;
use datafusion_expr::utils::conjunction;
use datafusion_expr::{col, Cast, Expr, LogicalPlan, LogicalPlanBuilder};
use datafusion_sql::TableReference;
use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
use datatypes::prelude::ConcreteDataType;
use datafusion_expr::{col, Expr, LogicalPlan, LogicalPlanBuilder};
use snafu::{OptionExt, ResultExt};
use table::TableRef;
use crate::promql::error::{DataFusionPlanningSnafu, Result, TimeIndexNotFoundSnafu};
use crate::promql::error::{
DataFusionPlanningSnafu, Result, TimeIndexNotFoundSnafu, TimestampOutOfRangeSnafu,
};
fn build_time_filter(time_index_expr: Expr, start: i64, end: i64) -> Expr {
fn build_time_filter(time_index_expr: Expr, start: Timestamp, end: Timestamp) -> Expr {
time_index_expr
.clone()
.gt_eq(Expr::Literal(ScalarValue::TimestampMillisecond(
Some(start),
None,
)))
.and(
time_index_expr.lt_eq(Expr::Literal(ScalarValue::TimestampMillisecond(
Some(end),
None,
))),
)
.gt_eq(Expr::Literal(timestamp_to_scalar_value(start)))
.and(time_index_expr.lt_eq(Expr::Literal(timestamp_to_scalar_value(end))))
}
fn timestamp_to_scalar_value(timestamp: Timestamp) -> ScalarValue {
let value = timestamp.value();
match timestamp.unit() {
TimeUnit::Second => ScalarValue::TimestampSecond(Some(value), None),
TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(Some(value), None),
TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(Some(value), None),
TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(value), None),
}
}
/// Rewrite label values query to DataFusion logical plan.
pub fn rewrite_label_values_query(
table: TableRef,
mut scan_plan: LogicalPlan,
scan_plan: LogicalPlan,
mut conditions: Vec<Expr>,
label_name: String,
start: SystemTime,
end: SystemTime,
) -> Result<LogicalPlan> {
let table_ref = TableReference::partial(
table.table_info().schema_name.as_str(),
table.table_info().name.as_str(),
);
let schema = table.schema();
let ts_column = schema
.timestamp_column()
.with_context(|| TimeIndexNotFoundSnafu {
table: table.table_info().full_table_name(),
})?;
let unit = ts_column
.data_type
.as_timestamp()
.map(|data_type| data_type.unit())
.with_context(|| TimeIndexNotFoundSnafu {
table: table.table_info().full_table_name(),
})?;
let is_time_index_ms =
ts_column.data_type == ConcreteDataType::timestamp_millisecond_datatype();
// We only support millisecond precision at most.
let start =
Timestamp::new_millisecond(start.duration_since(UNIX_EPOCH).unwrap().as_millis() as i64);
let start = start.convert_to(unit).context(TimestampOutOfRangeSnafu {
timestamp: start.value(),
unit,
})?;
let end =
Timestamp::new_millisecond(end.duration_since(UNIX_EPOCH).unwrap().as_millis() as i64);
let end = end.convert_to(unit).context(TimestampOutOfRangeSnafu {
timestamp: end.value(),
unit,
})?;
let time_index_expr = col(Column::from_name(ts_column.name.clone()));
if !is_time_index_ms {
// cast to ms if time_index not in Millisecond precision
let expr = vec![
col(Column::from_name(label_name.clone())),
Expr::Alias(Alias {
expr: Box::new(Expr::Cast(Cast {
expr: Box::new(time_index_expr.clone()),
data_type: ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None),
})),
relation: Some(table_ref),
name: ts_column.name.clone(),
}),
];
scan_plan = LogicalPlanBuilder::from(scan_plan)
.project(expr)
.context(DataFusionPlanningSnafu)?
.build()
.context(DataFusionPlanningSnafu)?;
};
let start = start.duration_since(UNIX_EPOCH).unwrap().as_millis() as i64;
let end = end.duration_since(UNIX_EPOCH).unwrap().as_millis() as i64;
conditions.push(build_time_filter(time_index_expr, start, end));
// Safety: `conditions` is not empty.
let filter = conjunction(conditions).unwrap();

View File

@@ -491,11 +491,15 @@ pub async fn setup_test_prom_app_with_frontend(
// build physical table
let sql = "CREATE TABLE phy (ts timestamp time index, val double, host string primary key) engine=metric with ('physical_metric_table' = '')";
run_sql(sql, &instance).await;
let sql = "CREATE TABLE phy_ns (ts timestamp(0) time index, val double, host string primary key) engine=metric with ('physical_metric_table' = '')";
run_sql(sql, &instance).await;
// build metric tables
let sql = "CREATE TABLE demo (ts timestamp time index, val double, host string primary key) engine=metric with ('on_physical_table' = 'phy')";
run_sql(sql, &instance).await;
let sql = "CREATE TABLE demo_metrics (ts timestamp time index, val double, idc string primary key) engine=metric with ('on_physical_table' = 'phy')";
run_sql(sql, &instance).await;
let sql = "CREATE TABLE multi_labels (ts timestamp(0) time index, val double, idc string, env string, host string, primary key (idc, env, host)) engine=metric with ('on_physical_table' = 'phy_ns')";
run_sql(sql, &instance).await;
// insert rows
let sql = "INSERT INTO demo(host, val, ts) VALUES ('host1', 1.1, 0), ('host2', 2.1, 600000)";
@@ -507,6 +511,10 @@ pub async fn setup_test_prom_app_with_frontend(
let sql = "INSERT INTO demo_metrics(val, ts) VALUES (1.1, 0)";
run_sql(sql, &instance).await;
// insert rows to multi_labels
let sql = "INSERT INTO multi_labels(idc, env, host, val, ts) VALUES ('idc1', 'dev', 'host1', 1.1, 0), ('idc1', 'dev', 'host2', 2.1, 0), ('idc2', 'dev', 'host1', 1.1, 0), ('idc2', 'test', 'host3', 2.1, 0)";
run_sql(sql, &instance).await;
// build physical table
let sql = "CREATE TABLE phy2 (ts timestamp(9) time index, val double, host string primary key) engine=metric with ('physical_metric_table' = '')";
run_sql(sql, &instance).await;

View File

@@ -603,8 +603,10 @@ pub async fn test_prom_http_api(store_type: StorageType) {
assert_eq!(body.status, "success");
assert_eq!(
body.data,
serde_json::from_value::<PrometheusResponse>(json!(["__name__", "host", "idc", "number",]))
.unwrap()
serde_json::from_value::<PrometheusResponse>(json!([
"__name__", "env", "host", "idc", "number",
]))
.unwrap()
);
// labels query with multiple match[] params
@@ -725,6 +727,19 @@ pub async fn test_prom_http_api(store_type: StorageType) {
serde_json::from_value::<PrometheusResponse>(json!(["idc1"])).unwrap()
);
// match labels.
let res = client
.get("/v1/prometheus/api/v1/label/host/values?match[]=multi_labels{idc=\"idc1\", env=\"dev\"}&start=0&end=600")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<PrometheusJsonResponse>(&res.text().await).unwrap();
assert_eq!(body.status, "success");
assert_eq!(
body.data,
serde_json::from_value::<PrometheusResponse>(json!(["host1", "host2"])).unwrap()
);
// search field name
let res = client
.get("/v1/prometheus/api/v1/label/__field__/values?match[]=demo")
@@ -814,6 +829,7 @@ pub async fn test_prom_http_api(store_type: StorageType) {
"demo_metrics_with_nanos".to_string(),
"logic_table".to_string(),
"mito".to_string(),
"multi_labels".to_string(),
"numbers".to_string()
])
);