mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-28 19:00:39 +00:00
fix: do not add projection to cast timestamp in label_values (#6040)
* fix: do not add projection for cast Use cast to build time filter directly instead of adding a projection, which will cause column not found * feat: cast before creating plan
This commit is contained in:
@@ -17,6 +17,7 @@ use std::any::Any;
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_macro::stack_trace_debug;
|
||||
use common_time::timestamp::TimeUnit;
|
||||
use datafusion::error::DataFusionError;
|
||||
use promql::error::Error as PromqlError;
|
||||
use promql_parser::parser::token::TokenType;
|
||||
@@ -192,6 +193,14 @@ pub enum Error {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Timestamp out of range: {} of {:?}", timestamp, unit))]
|
||||
TimestampOutOfRange {
|
||||
timestamp: i64,
|
||||
unit: TimeUnit,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
impl ErrorExt for Error {
|
||||
@@ -211,7 +220,8 @@ impl ErrorExt for Error {
|
||||
| UnsupportedVectorMatch { .. }
|
||||
| CombineTableColumnMismatch { .. }
|
||||
| UnexpectedPlanExpr { .. }
|
||||
| UnsupportedMatcherOp { .. } => StatusCode::InvalidArguments,
|
||||
| UnsupportedMatcherOp { .. }
|
||||
| TimestampOutOfRange { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
UnknownTable { .. } => StatusCode::Internal,
|
||||
|
||||
|
||||
@@ -14,80 +14,73 @@
|
||||
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
|
||||
use common_time::timestamp::TimeUnit;
|
||||
use common_time::Timestamp;
|
||||
use datafusion_common::{Column, ScalarValue};
|
||||
use datafusion_expr::expr::Alias;
|
||||
use datafusion_expr::utils::conjunction;
|
||||
use datafusion_expr::{col, Cast, Expr, LogicalPlan, LogicalPlanBuilder};
|
||||
use datafusion_sql::TableReference;
|
||||
use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datafusion_expr::{col, Expr, LogicalPlan, LogicalPlanBuilder};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use table::TableRef;
|
||||
|
||||
use crate::promql::error::{DataFusionPlanningSnafu, Result, TimeIndexNotFoundSnafu};
|
||||
use crate::promql::error::{
|
||||
DataFusionPlanningSnafu, Result, TimeIndexNotFoundSnafu, TimestampOutOfRangeSnafu,
|
||||
};
|
||||
|
||||
fn build_time_filter(time_index_expr: Expr, start: i64, end: i64) -> Expr {
|
||||
fn build_time_filter(time_index_expr: Expr, start: Timestamp, end: Timestamp) -> Expr {
|
||||
time_index_expr
|
||||
.clone()
|
||||
.gt_eq(Expr::Literal(ScalarValue::TimestampMillisecond(
|
||||
Some(start),
|
||||
None,
|
||||
)))
|
||||
.and(
|
||||
time_index_expr.lt_eq(Expr::Literal(ScalarValue::TimestampMillisecond(
|
||||
Some(end),
|
||||
None,
|
||||
))),
|
||||
)
|
||||
.gt_eq(Expr::Literal(timestamp_to_scalar_value(start)))
|
||||
.and(time_index_expr.lt_eq(Expr::Literal(timestamp_to_scalar_value(end))))
|
||||
}
|
||||
|
||||
fn timestamp_to_scalar_value(timestamp: Timestamp) -> ScalarValue {
|
||||
let value = timestamp.value();
|
||||
match timestamp.unit() {
|
||||
TimeUnit::Second => ScalarValue::TimestampSecond(Some(value), None),
|
||||
TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(Some(value), None),
|
||||
TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(Some(value), None),
|
||||
TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(value), None),
|
||||
}
|
||||
}
|
||||
|
||||
/// Rewrite label values query to DataFusion logical plan.
|
||||
pub fn rewrite_label_values_query(
|
||||
table: TableRef,
|
||||
mut scan_plan: LogicalPlan,
|
||||
scan_plan: LogicalPlan,
|
||||
mut conditions: Vec<Expr>,
|
||||
label_name: String,
|
||||
start: SystemTime,
|
||||
end: SystemTime,
|
||||
) -> Result<LogicalPlan> {
|
||||
let table_ref = TableReference::partial(
|
||||
table.table_info().schema_name.as_str(),
|
||||
table.table_info().name.as_str(),
|
||||
);
|
||||
let schema = table.schema();
|
||||
let ts_column = schema
|
||||
.timestamp_column()
|
||||
.with_context(|| TimeIndexNotFoundSnafu {
|
||||
table: table.table_info().full_table_name(),
|
||||
})?;
|
||||
let unit = ts_column
|
||||
.data_type
|
||||
.as_timestamp()
|
||||
.map(|data_type| data_type.unit())
|
||||
.with_context(|| TimeIndexNotFoundSnafu {
|
||||
table: table.table_info().full_table_name(),
|
||||
})?;
|
||||
|
||||
let is_time_index_ms =
|
||||
ts_column.data_type == ConcreteDataType::timestamp_millisecond_datatype();
|
||||
// We only support millisecond precision at most.
|
||||
let start =
|
||||
Timestamp::new_millisecond(start.duration_since(UNIX_EPOCH).unwrap().as_millis() as i64);
|
||||
let start = start.convert_to(unit).context(TimestampOutOfRangeSnafu {
|
||||
timestamp: start.value(),
|
||||
unit,
|
||||
})?;
|
||||
let end =
|
||||
Timestamp::new_millisecond(end.duration_since(UNIX_EPOCH).unwrap().as_millis() as i64);
|
||||
let end = end.convert_to(unit).context(TimestampOutOfRangeSnafu {
|
||||
timestamp: end.value(),
|
||||
unit,
|
||||
})?;
|
||||
let time_index_expr = col(Column::from_name(ts_column.name.clone()));
|
||||
|
||||
if !is_time_index_ms {
|
||||
// cast to ms if time_index not in Millisecond precision
|
||||
let expr = vec![
|
||||
col(Column::from_name(label_name.clone())),
|
||||
Expr::Alias(Alias {
|
||||
expr: Box::new(Expr::Cast(Cast {
|
||||
expr: Box::new(time_index_expr.clone()),
|
||||
data_type: ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None),
|
||||
})),
|
||||
relation: Some(table_ref),
|
||||
name: ts_column.name.clone(),
|
||||
}),
|
||||
];
|
||||
scan_plan = LogicalPlanBuilder::from(scan_plan)
|
||||
.project(expr)
|
||||
.context(DataFusionPlanningSnafu)?
|
||||
.build()
|
||||
.context(DataFusionPlanningSnafu)?;
|
||||
};
|
||||
|
||||
let start = start.duration_since(UNIX_EPOCH).unwrap().as_millis() as i64;
|
||||
let end = end.duration_since(UNIX_EPOCH).unwrap().as_millis() as i64;
|
||||
|
||||
conditions.push(build_time_filter(time_index_expr, start, end));
|
||||
// Safety: `conditions` is not empty.
|
||||
let filter = conjunction(conditions).unwrap();
|
||||
|
||||
Reference in New Issue
Block a user