feat: impl timestamp function for promql (#6556)

* feat: impl timestamp function for promql

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: style and typo

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* fix: test

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* docs: update comments

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: comment

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

---------

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>
This commit is contained in:
dennis zhuang
2025-07-21 11:46:41 +08:00
committed by GitHub
parent 1c8e8b96c1
commit 78b1c6c554
3 changed files with 291 additions and 5 deletions

View File

@@ -191,18 +191,38 @@ impl PromPlanner {
planner.prom_expr_to_plan(&stmt.expr, session_state).await
}
/// Plans a PromQL expression into a [`LogicalPlan`].
///
/// This is the public entry point. It delegates to `prom_expr_to_plan_inner`
/// with the `timestamp_fn` flag switched off, i.e. the expression is planned
/// as a regular expression rather than as the argument of `timestamp()`.
#[async_recursion]
pub async fn prom_expr_to_plan(
    &mut self,
    prom_expr: &PromExpr,
    session_state: &SessionState,
) -> Result<LogicalPlan> {
    // A top-level expression is never evaluated in `timestamp()` context.
    let timestamp_fn = false;
    self.prom_expr_to_plan_inner(prom_expr, timestamp_fn, session_state)
        .await
}
/**
Converts a PromQL expression to a logical plan.
NOTE:
The `timestamp_fn` indicates whether the PromQL `timestamp()` function is being evaluated in the current context.
If `true`, the planner generates a logical plan that projects the timestamp (time index) column
as the value column for each input row, implementing the PromQL `timestamp()` function semantics.
If `false`, the planner generates the standard logical plan for the given PromQL expression.
*/
#[async_recursion]
async fn prom_expr_to_plan_inner(
&mut self,
prom_expr: &PromExpr,
timestamp_fn: bool,
session_state: &SessionState,
) -> Result<LogicalPlan> {
let res = match prom_expr {
PromExpr::Aggregate(expr) => self.prom_aggr_expr_to_plan(session_state, expr).await?,
PromExpr::Unary(expr) => self.prom_unary_expr_to_plan(session_state, expr).await?,
PromExpr::Binary(expr) => self.prom_binary_expr_to_plan(session_state, expr).await?,
PromExpr::Paren(ParenExpr { expr }) => {
self.prom_expr_to_plan(expr, session_state).await?
self.prom_expr_to_plan_inner(expr, timestamp_fn, session_state)
.await?
}
PromExpr::Subquery(expr) => {
self.prom_subquery_expr_to_plan(session_state, expr).await?
@@ -210,7 +230,8 @@ impl PromPlanner {
PromExpr::NumberLiteral(lit) => self.prom_number_lit_to_plan(lit)?,
PromExpr::StringLiteral(lit) => self.prom_string_lit_to_plan(lit)?,
PromExpr::VectorSelector(selector) => {
self.prom_vector_selector_to_plan(selector).await?
self.prom_vector_selector_to_plan(selector, timestamp_fn)
.await?
}
PromExpr::MatrixSelector(selector) => {
self.prom_matrix_selector_to_plan(selector).await?
@@ -673,6 +694,7 @@ impl PromPlanner {
async fn prom_vector_selector_to_plan(
&mut self,
vector_selector: &VectorSelector,
timestamp_fn: bool,
) -> Result<LogicalPlan> {
let VectorSelector {
name,
@@ -687,6 +709,15 @@ impl PromPlanner {
let normalize = self
.selector_to_series_normalize_plan(offset, matchers, false)
.await?;
let normalize = if timestamp_fn {
// If evaluating the PromQL `timestamp()` function, project the time index column as the value column
// before wrapping with [`InstantManipulate`], so the output matches PromQL's `timestamp()` semantics.
self.create_timestamp_func_plan(normalize)?
} else {
normalize
};
let manipulate = InstantManipulate::new(
self.ctx.start,
self.ctx.end,
@@ -704,6 +735,43 @@ impl PromPlanner {
}))
}
/// Rewrites a normalized series plan so it can answer the PromQL `timestamp()` function.
///
/// The resulting projection emits, in this order:
/// 1. the time index column,
/// 2. an expression built from the time index by `build_special_time_expr`, aliased to
///    `DEFAULT_FIELD_COLUMN`,
/// 3. every tag column.
///
/// # `timestamp()` vs. `time()`
///
/// - `timestamp()` reports, for each sample, that sample's own timestamp as its value.
/// - `time()` yields the evaluation time of the query as a scalar.
///
/// # Arguments
/// * `normalize` - the [`LogicalPlan`] producing the normalized series to project over.
///
/// # Errors
/// Propagates failures from building the time index / tag column expressions, and reports
/// DataFusion projection or build failures through `DataFusionPlanningSnafu`.
///
/// # Panics
/// Panics if the planner context has no time index column set.
///
/// # Side Effects
/// Replaces `self.ctx.field_columns` with a single entry: the schema name of the aliased
/// expression. This happens before the fallible steps below, so the context stays modified
/// even when an error is returned.
fn create_timestamp_func_plan(&mut self, normalize: LogicalPlan) -> Result<LogicalPlan> {
    // NOTE(review): presumably the time index is always resolved while `normalize` is
    // built, which would make this `unwrap` safe — confirm against the callers.
    let time_index = self.ctx.time_index_column.as_ref().unwrap();
    let timestamp_value = build_special_time_expr(time_index).alias(DEFAULT_FIELD_COLUMN);

    // Record the projected expression's schema name as the sole field column.
    self.ctx.field_columns = vec![timestamp_value.schema_name().to_string()];

    let time_index_expr = self.create_time_index_column_expr()?;
    let tag_exprs = self.create_tag_column_exprs()?;

    // Time index + value + one entry per tag.
    let mut projection = Vec::with_capacity(self.ctx.tag_columns.len() + 2);
    projection.push(time_index_expr);
    projection.push(timestamp_value);
    projection.extend(tag_exprs);

    let projected = LogicalPlanBuilder::from(normalize)
        .project(projection)
        .context(DataFusionPlanningSnafu)?;
    projected.build().context(DataFusionPlanningSnafu)
}
async fn prom_matrix_selector_to_plan(
&mut self,
matrix_selector: &MatrixSelector,
@@ -768,7 +836,8 @@ impl PromPlanner {
// transform function arguments
let args = self.create_function_args(&args.args)?;
let input = if let Some(prom_expr) = &args.input {
self.prom_expr_to_plan(prom_expr, session_state).await?
self.prom_expr_to_plan_inner(prom_expr, func.name == "timestamp", session_state)
.await?
} else {
self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
self.ctx.reset_table_name_and_schema();
@@ -1654,7 +1723,7 @@ impl PromPlanner {
ScalarFunc::GeneratedExpr
}
"sort" | "sort_desc" | "sort_by_label" | "sort_by_label_desc" => {
"sort" | "sort_desc" | "sort_by_label" | "sort_by_label_desc" | "timestamp" => {
// These functions are not expression but a part of plan,
// they are processed by `prom_call_expr_to_plan`.
for value in &self.ctx.field_columns {

View File

@@ -0,0 +1,160 @@
-- Test `timestamp()` function
-- timestamp() returns the timestamp of each sample as seconds since Unix epoch
create table timestamp_test (ts timestamp time index, val double);
Affected Rows: 0
insert into timestamp_test values
(0, 1.0),
(1000, 2.0),
(60000, 3.0),
(3600000, 4.0),
-- 2021-01-01 00:00:00
(1609459200000, 5.0),
-- 2021-01-01 00:01:00
(1609459260000, 6.0);
Affected Rows: 6
-- Test timestamp() with time series
tql eval (0, 3600, '30s') timestamp(timestamp_test);
+---------------------+--------+
| ts | value |
+---------------------+--------+
| 1970-01-01T00:00:00 | 0.0 |
| 1970-01-01T00:00:30 | 1.0 |
| 1970-01-01T00:01:00 | 60.0 |
| 1970-01-01T00:01:30 | 60.0 |
| 1970-01-01T00:02:00 | 60.0 |
| 1970-01-01T00:02:30 | 60.0 |
| 1970-01-01T00:03:00 | 60.0 |
| 1970-01-01T00:03:30 | 60.0 |
| 1970-01-01T00:04:00 | 60.0 |
| 1970-01-01T00:04:30 | 60.0 |
| 1970-01-01T00:05:00 | 60.0 |
| 1970-01-01T00:05:30 | 60.0 |
| 1970-01-01T00:06:00 | 60.0 |
| 1970-01-01T01:00:00 | 3600.0 |
+---------------------+--------+
-- Test timestamp() with specific time range
tql eval (0, 60, '30s') timestamp(timestamp_test);
+---------------------+-------+
| ts | value |
+---------------------+-------+
| 1970-01-01T00:00:00 | 0.0 |
| 1970-01-01T00:00:30 | 1.0 |
| 1970-01-01T00:01:00 | 60.0 |
+---------------------+-------+
tql eval (0, 60, '30s') -timestamp(timestamp_test);
+---------------------+-----------+
| ts | (- value) |
+---------------------+-----------+
| 1970-01-01T00:00:00 | -0.0 |
| 1970-01-01T00:00:30 | -1.0 |
| 1970-01-01T00:01:00 | -60.0 |
+---------------------+-----------+
-- Test timestamp() with 2021 data
tql eval (1609459200, 1609459260, '30s') timestamp(timestamp_test);
+---------------------+--------------+
| ts | value |
+---------------------+--------------+
| 2021-01-01T00:00:00 | 1609459200.0 |
| 2021-01-01T00:00:30 | 1609459200.0 |
| 2021-01-01T00:01:00 | 1609459260.0 |
+---------------------+--------------+
-- Test timestamp() with arithmetic operations
tql eval (0, 60, '30s') timestamp(timestamp_test) + 1;
+---------------------+--------------------+
| ts | value + Float64(1) |
+---------------------+--------------------+
| 1970-01-01T00:00:00 | 1.0 |
| 1970-01-01T00:00:30 | 2.0 |
| 1970-01-01T00:01:00 | 61.0 |
+---------------------+--------------------+
-- Test timestamp() with boolean operations
tql eval (0, 60, '30s') timestamp(timestamp_test) > bool 30;
+---------------------+---------------------+
| ts | value > Float64(30) |
+---------------------+---------------------+
| 1970-01-01T00:00:00 | 0.0 |
| 1970-01-01T00:00:30 | 0.0 |
| 1970-01-01T00:01:00 | 1.0 |
+---------------------+---------------------+
-- Test timestamp() with time functions
tql eval (0, 60, '30s') timestamp(timestamp_test) - time();
+---------------------+----------------------------+
| ts | value - ts / Float64(1000) |
+---------------------+----------------------------+
| 1970-01-01T00:00:00 | 0.0 |
| 1970-01-01T00:00:30 | -29.0 |
| 1970-01-01T00:01:00 | 0.0 |
+---------------------+----------------------------+
-- Test timestamp() with other functions
tql eval (0, 60, '30s') abs(timestamp(timestamp_test) - avg(timestamp(timestamp_test))) > 20;
Error: 1004(InvalidArguments), Invalid function argument for unknown
tql eval (0, 60, '30s') timestamp(timestamp_test) == 60;
+---------------------+-------+
| ts | value |
+---------------------+-------+
| 1970-01-01T00:01:00 | 60.0 |
+---------------------+-------+
-- Test timestamp() with multiple metrics
create table timestamp_test2 (ts timestamp time index, val double);
Affected Rows: 0
insert into timestamp_test2 values
(0, 10.0),
(1000, 20.0),
(60000, 30.0);
Affected Rows: 3
-- SQLNESS SORT_RESULT 3 1
tql eval (0, 60, '30s') timestamp(timestamp_test) + timestamp(timestamp_test2);
+---------------------+----------------------------------------------+
| ts | timestamp_test.value + timestamp_test2.value |
+---------------------+----------------------------------------------+
| 1970-01-01T00:00:00 | 0.0 |
| 1970-01-01T00:00:30 | 2.0 |
| 1970-01-01T00:01:00 | 120.0 |
+---------------------+----------------------------------------------+
-- SQLNESS SORT_RESULT 3 1
tql eval (0, 60, '30s') timestamp(timestamp_test) == timestamp(timestamp_test2);
+---------------------+-------+---------------------+-------+
| ts | value | ts | value |
+---------------------+-------+---------------------+-------+
| 1970-01-01T00:00:00 | 0.0 | 1970-01-01T00:00:00 | 0.0 |
| 1970-01-01T00:00:30 | 1.0 | 1970-01-01T00:00:30 | 1.0 |
| 1970-01-01T00:01:00 | 60.0 | 1970-01-01T00:01:00 | 60.0 |
+---------------------+-------+---------------------+-------+
drop table timestamp_test;
Affected Rows: 0
drop table timestamp_test2;
Affected Rows: 0

View File

@@ -0,0 +1,57 @@
-- Test `timestamp()` function
-- timestamp() returns the timestamp of each sample as seconds since Unix epoch
create table timestamp_test (ts timestamp time index, val double);
insert into timestamp_test values
(0, 1.0),
(1000, 2.0),
(60000, 3.0),
(3600000, 4.0),
-- 2021-01-01 00:00:00
(1609459200000, 5.0),
-- 2021-01-01 00:01:00
(1609459260000, 6.0);
-- Test timestamp() with time series
tql eval (0, 3600, '30s') timestamp(timestamp_test);
-- Test timestamp() with specific time range
tql eval (0, 60, '30s') timestamp(timestamp_test);
tql eval (0, 60, '30s') -timestamp(timestamp_test);
-- Test timestamp() with 2021 data
tql eval (1609459200, 1609459260, '30s') timestamp(timestamp_test);
-- Test timestamp() with arithmetic operations
tql eval (0, 60, '30s') timestamp(timestamp_test) + 1;
-- Test timestamp() with boolean operations
tql eval (0, 60, '30s') timestamp(timestamp_test) > bool 30;
-- Test timestamp() with time functions
tql eval (0, 60, '30s') timestamp(timestamp_test) - time();
-- Test timestamp() with other functions
tql eval (0, 60, '30s') abs(timestamp(timestamp_test) - avg(timestamp(timestamp_test))) > 20;
tql eval (0, 60, '30s') timestamp(timestamp_test) == 60;
-- Test timestamp() with multiple metrics
create table timestamp_test2 (ts timestamp time index, val double);
insert into timestamp_test2 values
(0, 10.0),
(1000, 20.0),
(60000, 30.0);
-- SQLNESS SORT_RESULT 3 1
tql eval (0, 60, '30s') timestamp(timestamp_test) + timestamp(timestamp_test2);
-- SQLNESS SORT_RESULT 3 1
tql eval (0, 60, '30s') timestamp(timestamp_test) == timestamp(timestamp_test2);
drop table timestamp_test;
drop table timestamp_test2;