From 68b231987c9893c1d487054d4e1bcecb682e8f3d Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Mon, 20 Feb 2023 15:29:43 +0800 Subject: [PATCH] feat: improve Prometheus compliance (#1022) * initial impl Signed-off-by: Ruihang Xia * minor (useless) refactor Signed-off-by: Ruihang Xia * retrieve metric name Signed-off-by: Ruihang Xia * add time index column to group by columns filter out NaN in normalize remove NULL in instant manipulator accept form data as HTTP params correct API URL accept second literal as step param * happy clippy Signed-off-by: Ruihang Xia * update test result Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- Cargo.lock | 1 + src/datanode/src/tests/promql_test.rs | 116 ++++++----- .../src/extension_plan/instant_manipulate.rs | 81 ++------ src/promql/src/extension_plan/normalize.rs | 20 +- src/promql/src/planner.rs | 103 ++++++---- src/query/src/parser.rs | 6 +- src/servers/Cargo.toml | 1 + src/servers/src/promql.rs | 190 +++++++++++++++--- tests-integration/tests/http.rs | 9 +- 9 files changed, 332 insertions(+), 195 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8d3f5a56c4..116cb2fea7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6712,6 +6712,7 @@ dependencies = [ "pgwire", "pin-project", "postgres-types", + "promql-parser", "prost 0.11.6", "query", "rand 0.8.5", diff --git a/src/datanode/src/tests/promql_test.rs b/src/datanode/src/tests/promql_test.rs index c26f8f584d..cfd31ca81b 100644 --- a/src/datanode/src/tests/promql_test.rs +++ b/src/datanode/src/tests/promql_test.rs @@ -82,16 +82,16 @@ async fn sql_insert_promql_query_ceil() { UNIX_EPOCH.checked_add(Duration::from_secs(100)).unwrap(), Duration::from_secs(5), Duration::from_secs(1), - "+---------------------+-------------------------------+----------------------------------+\ - \n| ts | ceil(http_requests_total.cpu) | ceil(http_requests_total.memory) |\ - \n+---------------------+-------------------------------+----------------------------------+\ - \n| 1970-01-01T00:00:00 | 67 | 1024 |\ - \n| 1970-01-01T00:00:05 | 67 | 4096 |\ - \n| 1970-01-01T00:00:10 | 100 | 20480 |\ - \n| 1970-01-01T00:00:50 | 12424 | 1334 |\ - \n| 1970-01-01T00:01:20 | 0 | 2334 |\ - \n| 1970-01-01T00:01:40 | 49 | 3334 |\ - \n+---------------------+-------------------------------+----------------------------------+", + "+---------------------+-------------------------------+----------------------------------+-------+\ + \n| ts | ceil(http_requests_total.cpu) | ceil(http_requests_total.memory) | host |\ + \n+---------------------+-------------------------------+----------------------------------+-------+\ + \n| 1970-01-01T00:00:00 | 67 | 1024 | host1 |\ + \n| 1970-01-01T00:00:05 | 67 | 4096 | host1 |\ + \n| 1970-01-01T00:00:10 | 100 | 20480 | host1 |\ + \n| 1970-01-01T00:01:20 | 0 | 2334 | host1 |\ + \n| 1970-01-01T00:01:40 | 49 | 3334 | host1 |\ + \n| 1970-01-01T00:00:50 | 12424 | 1334 | host1 |\ + \n+---------------------+-------------------------------+----------------------------------+-------+" ) .await; } @@ -142,13 +142,12 @@ async fn aggregators_simple_sum() { unix_epoch_plus_100s(), Duration::from_secs(60), Duration::from_secs(0), - "+------------+--------------------------+\ - \n| group | SUM(http_requests.value) |\ - \n+------------+--------------------------+\ - \n| | |\ - \n| canary | 700 |\ - \n| production | 300 |\ - \n+------------+--------------------------+", + "+------------+---------------------+--------------------------+\ + \n| group | ts | SUM(http_requests.value) |\ + 
\n+------------+---------------------+--------------------------+\ + \n| production | 1970-01-01T00:00:00 | 300 |\ + \n| canary | 1970-01-01T00:00:00 | 700 |\ + \n+------------+---------------------+--------------------------+", ) .await; } @@ -167,13 +166,12 @@ async fn aggregators_simple_avg() { unix_epoch_plus_100s(), Duration::from_secs(60), Duration::from_secs(0), - "+------------+--------------------------+\ - \n| group | AVG(http_requests.value) |\ - \n+------------+--------------------------+\ - \n| | 0 |\ - \n| production | 150 |\ - \n| canary | 350 |\ - \n+------------+--------------------------+", + "+------------+---------------------+--------------------------+\ + \n| group | ts | AVG(http_requests.value) |\ + \n+------------+---------------------+--------------------------+\ + \n| production | 1970-01-01T00:00:00 | 150 |\ + \n| canary | 1970-01-01T00:00:00 | 350 |\ + \n+------------+---------------------+--------------------------+", ) .await; } @@ -192,13 +190,12 @@ async fn aggregators_simple_count() { unix_epoch_plus_100s(), Duration::from_secs(60), Duration::from_secs(0), - "+------------+----------------------------+\ - \n| group | COUNT(http_requests.value) |\ - \n+------------+----------------------------+\ - \n| | 0 |\ - \n| canary | 2 |\ - \n| production | 2 |\ - \n+------------+----------------------------+", + "+------------+---------------------+----------------------------+\ + \n| group | ts | COUNT(http_requests.value) |\ + \n+------------+---------------------+----------------------------+\ + \n| canary | 1970-01-01T00:00:00 | 2 |\ + \n| production | 1970-01-01T00:00:00 | 2 |\ + \n+------------+---------------------+----------------------------+", ) .await; } @@ -217,13 +214,12 @@ async fn aggregators_simple_without() { unix_epoch_plus_100s(), Duration::from_secs(60), Duration::from_secs(0), - "+------------+------------+--------------------------+\ - \n| group | job | SUM(http_requests.value) |\ - \n+------------+------------+--------------------------+\ - \n| | | |\ - \n| canary | api-server | 700 |\ - \n| production | api-server | 300 |\ - \n+------------+------------+--------------------------+", + "+------------+------------+---------------------+--------------------------+\ + \n| group | job | ts | SUM(http_requests.value) |\ + \n+------------+------------+---------------------+--------------------------+\ + \n| production | api-server | 1970-01-01T00:00:00 | 300 |\ + \n| canary | api-server | 1970-01-01T00:00:00 | 700 |\ + \n+------------+------------+---------------------+--------------------------+", ) .await; } @@ -241,11 +237,11 @@ async fn aggregators_empty_by() { unix_epoch_plus_100s(), Duration::from_secs(60), Duration::from_secs(0), - "+--------------------------+\ - \n| SUM(http_requests.value) |\ - \n+--------------------------+\ - \n| 1000 |\ - \n+--------------------------+", + "+---------------------+--------------------------+\ + \n| ts | SUM(http_requests.value) |\ + \n+---------------------+--------------------------+\ + \n| 1970-01-01T00:00:00 | 1000 |\ + \n+---------------------+--------------------------+", ) .await; } @@ -263,11 +259,11 @@ async fn aggregators_no_by_without() { unix_epoch_plus_100s(), Duration::from_secs(60), Duration::from_secs(0), - "+--------------------------+\ - \n| SUM(http_requests.value) |\ - \n+--------------------------+\ - \n| 1000 |\ - \n+--------------------------+", + "+---------------------+--------------------------+\ + \n| ts | SUM(http_requests.value) |\ + 
\n+---------------------+--------------------------+\ + \n| 1970-01-01T00:00:00 | 1000 |\ + \n+---------------------+--------------------------+", ) .await; } @@ -286,13 +282,12 @@ async fn aggregators_empty_without() { unix_epoch_plus_100s(), Duration::from_secs(60), Duration::from_secs(0), - "+------------+----------+------------+--------------------------+\ - \n| group | instance | job | SUM(http_requests.value) |\ - \n+------------+----------+------------+--------------------------+\ - \n| | | | |\ - \n| production | 0 | api-server | 100 |\ - \n| production | 1 | api-server | 200 |\ - \n+------------+----------+------------+--------------------------+", + "+------------+----------+------------+---------------------+--------------------------+\ + \n| group | instance | job | ts | SUM(http_requests.value) |\ + \n+------------+----------+------------+---------------------+--------------------------+\ + \n| production | 0 | api-server | 1970-01-01T00:00:00 | 100 |\ + \n| production | 1 | api-server | 1970-01-01T00:00:00 | 200 |\ + \n+------------+----------+------------+---------------------+--------------------------+", ) .await; } @@ -356,11 +351,12 @@ async fn stddev_by_label() { unix_epoch_plus_100s(), Duration::from_secs(60), Duration::from_secs(0), - "+----------+-----------------------------+\ - \n| instance | STDDEV(http_requests.value) |\ - \n+----------+-----------------------------+\ - \n| 0 | 258.19888974716116 |\ - \n+----------+-----------------------------+", + "+----------+---------------------+-----------------------------+\ + \n| instance | ts | STDDEV(http_requests.value) |\ + \n+----------+---------------------+-----------------------------+\ + \n| 0 | 1970-01-01T00:00:00 | 258.19888974716116 |\ + \n| 1 | 1970-01-01T00:00:00 | 258.19888974716116 |\ + \n+----------+---------------------+-----------------------------+", ) .await; } diff --git a/src/promql/src/extension_plan/instant_manipulate.rs b/src/promql/src/extension_plan/instant_manipulate.rs index a096447b5e..e735bd4a94 100644 --- a/src/promql/src/extension_plan/instant_manipulate.rs +++ b/src/promql/src/extension_plan/instant_manipulate.rs @@ -41,8 +41,7 @@ use crate::extension_plan::Millisecond; /// /// This plan will try to align the input time series, for every timestamp between /// `start` and `end` with step `interval`. Find in the `lookback` range if data -/// is missing at the given timestamp. If data is absent in some timestamp, all columns -/// except the time index will left blank. +/// is missing at the given timestamp. 
#[derive(Debug)] pub struct InstantManipulate { start: Millisecond, @@ -341,6 +340,17 @@ impl InstantManipulateStream { take_indices: Vec>, aligned_ts: Vec, ) -> ArrowResult { + let aligned_ts = aligned_ts + .into_iter() + .zip(take_indices.iter()) + .filter_map(|(ts, i)| i.map(|_| ts)) + .collect::>(); + let take_indices = take_indices + .iter() + .filter(|i| i.is_some()) + .copied() + .collect::>(); + let indices_array = UInt64Array::from(take_indices); let mut arrays = record_batch .columns() @@ -349,7 +359,8 @@ impl InstantManipulateStream { .collect::>>()?; arrays[self.time_index] = Arc::new(TimestampMillisecondArray::from(aligned_ts)); - RecordBatch::try_new(record_batch.schema(), arrays) + let result = RecordBatch::try_new(record_batch.schema(), arrays)?; + Ok(result) } } @@ -430,11 +441,8 @@ mod test { \n| 1970-01-01T00:01:00 | 1 | foo |\ \n| 1970-01-01T00:01:30 | 1 | foo |\ \n| 1970-01-01T00:02:00 | 1 | foo |\ - \n| 1970-01-01T00:02:30 | | |\ \n| 1970-01-01T00:03:00 | 1 | foo |\ - \n| 1970-01-01T00:03:30 | | |\ \n| 1970-01-01T00:04:00 | 1 | foo |\ - \n| 1970-01-01T00:04:30 | | |\ \n| 1970-01-01T00:05:00 | 1 | foo |\ \n+---------------------+-------+------+", ); @@ -449,34 +457,19 @@ mod test { \n+---------------------+-------+------+\ \n| 1970-01-01T00:00:00 | 1 | foo |\ \n| 1970-01-01T00:00:10 | 1 | foo |\ - \n| 1970-01-01T00:00:20 | | |\ \n| 1970-01-01T00:00:30 | 1 | foo |\ \n| 1970-01-01T00:00:40 | 1 | foo |\ - \n| 1970-01-01T00:00:50 | | |\ \n| 1970-01-01T00:01:00 | 1 | foo |\ \n| 1970-01-01T00:01:10 | 1 | foo |\ - \n| 1970-01-01T00:01:20 | | |\ \n| 1970-01-01T00:01:30 | 1 | foo |\ \n| 1970-01-01T00:01:40 | 1 | foo |\ - \n| 1970-01-01T00:01:50 | | |\ \n| 1970-01-01T00:02:00 | 1 | foo |\ \n| 1970-01-01T00:02:10 | 1 | foo |\ - \n| 1970-01-01T00:02:20 | | |\ - \n| 1970-01-01T00:02:30 | | |\ - \n| 1970-01-01T00:02:40 | | |\ - \n| 1970-01-01T00:02:50 | | |\ \n| 1970-01-01T00:03:00 | 1 | foo |\ \n| 1970-01-01T00:03:10 | 1 | foo |\ - \n| 1970-01-01T00:03:20 | | |\ - \n| 1970-01-01T00:03:30 | | |\ - \n| 1970-01-01T00:03:40 | | |\ - \n| 1970-01-01T00:03:50 | | |\ \n| 1970-01-01T00:04:00 | 1 | foo |\ \n| 1970-01-01T00:04:10 | 1 | foo |\ - \n| 1970-01-01T00:04:20 | | |\ - \n| 1970-01-01T00:04:30 | | |\ \n| 1970-01-01T00:04:40 | 1 | foo |\ - \n| 1970-01-01T00:04:50 | | |\ \n| 1970-01-01T00:05:00 | 1 | foo |\ \n+---------------------+-------+------+", ); @@ -527,14 +520,10 @@ mod test { \n| 1970-01-01T00:02:10 | 1 | foo |\ \n| 1970-01-01T00:02:20 | 1 | foo |\ \n| 1970-01-01T00:02:30 | 1 | foo |\ - \n| 1970-01-01T00:02:40 | | |\ - \n| 1970-01-01T00:02:50 | | |\ \n| 1970-01-01T00:03:00 | 1 | foo |\ \n| 1970-01-01T00:03:10 | 1 | foo |\ \n| 1970-01-01T00:03:20 | 1 | foo |\ \n| 1970-01-01T00:03:30 | 1 | foo |\ - \n| 1970-01-01T00:03:40 | | |\ - \n| 1970-01-01T00:03:50 | | |\ \n| 1970-01-01T00:04:00 | 1 | foo |\ \n| 1970-01-01T00:04:10 | 1 | foo |\ \n| 1970-01-01T00:04:20 | 1 | foo |\ @@ -617,22 +606,8 @@ mod test { "+---------------------+-------+------+\ \n| timestamp | value | path |\ \n+---------------------+-------+------+\ - \n| 1970-01-01T00:03:50 | | |\ - \n| 1970-01-01T00:03:51 | | |\ - \n| 1970-01-01T00:03:52 | | |\ - \n| 1970-01-01T00:03:53 | | |\ - \n| 1970-01-01T00:03:54 | | |\ - \n| 1970-01-01T00:03:55 | | |\ - \n| 1970-01-01T00:03:56 | | |\ - \n| 1970-01-01T00:03:57 | | |\ - \n| 1970-01-01T00:03:58 | | |\ - \n| 1970-01-01T00:03:59 | | |\ \n| 1970-01-01T00:04:00 | 1 | foo |\ \n| 1970-01-01T00:04:01 | 1 | foo |\ - \n| 1970-01-01T00:04:02 | | |\ - \n| 1970-01-01T00:04:03 | | |\ - \n| 
1970-01-01T00:04:04 | | |\ - \n| 1970-01-01T00:04:05 | | |\ \n+---------------------+-------+------+", ); do_normalize_test(230_000, 245_000, 0, 1_000, expected).await; @@ -646,7 +621,6 @@ mod test { \n+---------------------+-------+------+\ \n| 1970-01-01T00:00:00 | 1 | foo |\ \n| 1970-01-01T00:00:10 | 1 | foo |\ - \n| 1970-01-01T00:00:20 | | |\ \n| 1970-01-01T00:00:30 | 1 | foo |\ \n+---------------------+-------+------+", ); @@ -659,37 +633,12 @@ mod test { "+---------------------+-------+------+\ \n| timestamp | value | path |\ \n+---------------------+-------+------+\ - \n| 1969-12-31T23:45:00 | | |\ - \n| 1969-12-31T23:46:00 | | |\ - \n| 1969-12-31T23:47:00 | | |\ - \n| 1969-12-31T23:48:00 | | |\ - \n| 1969-12-31T23:49:00 | | |\ - \n| 1969-12-31T23:50:00 | | |\ - \n| 1969-12-31T23:51:00 | | |\ - \n| 1969-12-31T23:52:00 | | |\ - \n| 1969-12-31T23:53:00 | | |\ - \n| 1969-12-31T23:54:00 | | |\ - \n| 1969-12-31T23:55:00 | | |\ - \n| 1969-12-31T23:56:00 | | |\ - \n| 1969-12-31T23:57:00 | | |\ - \n| 1969-12-31T23:58:00 | | |\ - \n| 1969-12-31T23:59:00 | | |\ \n| 1970-01-01T00:00:00 | 1 | foo |\ \n| 1970-01-01T00:01:00 | 1 | foo |\ \n| 1970-01-01T00:02:00 | 1 | foo |\ \n| 1970-01-01T00:03:00 | 1 | foo |\ \n| 1970-01-01T00:04:00 | 1 | foo |\ \n| 1970-01-01T00:05:00 | 1 | foo |\ - \n| 1970-01-01T00:06:00 | | |\ - \n| 1970-01-01T00:07:00 | | |\ - \n| 1970-01-01T00:08:00 | | |\ - \n| 1970-01-01T00:09:00 | | |\ - \n| 1970-01-01T00:10:00 | | |\ - \n| 1970-01-01T00:11:00 | | |\ - \n| 1970-01-01T00:12:00 | | |\ - \n| 1970-01-01T00:13:00 | | |\ - \n| 1970-01-01T00:14:00 | | |\ - \n| 1970-01-01T00:15:00 | | |\ \n+---------------------+-------+------+", ); do_normalize_test(-900_000, 900_000, 30_000, 60_000, expected).await; @@ -704,8 +653,6 @@ mod test { \n| 1970-01-01T00:03:10 | 1 | foo |\ \n| 1970-01-01T00:03:20 | 1 | foo |\ \n| 1970-01-01T00:03:30 | 1 | foo |\ - \n| 1970-01-01T00:03:40 | | |\ - \n| 1970-01-01T00:03:50 | | |\ \n| 1970-01-01T00:04:00 | 1 | foo |\ \n| 1970-01-01T00:04:10 | 1 | foo |\ \n| 1970-01-01T00:04:20 | 1 | foo |\ diff --git a/src/promql/src/extension_plan/normalize.rs b/src/promql/src/extension_plan/normalize.rs index 34b84d0c9f..9623a0ef09 100644 --- a/src/promql/src/extension_plan/normalize.rs +++ b/src/promql/src/extension_plan/normalize.rs @@ -17,6 +17,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; +use datafusion::arrow::array::{BooleanArray, Float64Array}; use datafusion::arrow::compute; use datafusion::common::{DFSchemaRef, Result as DataFusionResult, Statistics}; use datafusion::execution::context::TaskContext; @@ -40,6 +41,7 @@ use crate::extension_plan::Millisecond; /// Roughly speaking, this method does these things: /// - bias sample's timestamp by offset /// - sort the record batch based on timestamp column +/// - remove NaN values #[derive(Debug)] pub struct SeriesNormalize { offset: Millisecond, @@ -237,7 +239,23 @@ impl SeriesNormalizeStream { .iter() .map(|array| compute::take(array, &ordered_indices, None)) .collect::>>()?; - RecordBatch::try_new(input.schema(), ordered_columns) + let ordered_batch = RecordBatch::try_new(input.schema(), ordered_columns)?; + + // TODO(ruihang): consider the "special NaN" + // filter out NaN + let mut filter = vec![true; input.num_rows()]; + for column in ordered_batch.columns() { + if let Some(float_column) = column.as_any().downcast_ref::() { + for (i, flag) in filter.iter_mut().enumerate() { + if float_column.value(i).is_nan() { + *flag = false; + } + } + } + } + + let result = 
compute::filter_record_batch(&ordered_batch, &BooleanArray::from(filter))?; + Ok(result) } } diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs index 7ec70e38ad..2f6fec3eeb 100644 --- a/src/promql/src/planner.rs +++ b/src/promql/src/planner.rs @@ -103,9 +103,12 @@ impl PromPlanner { let input = self.prom_expr_to_plan(*expr.clone())?; // calculate columns to group by - let group_exprs = modifier.as_ref().map_or(Ok(Vec::new()), |m| { - self.agg_modifier_to_col(input.schema(), m) - })?; + // Need to append time index column into group by columns + let group_exprs = modifier + .as_ref() + .map_or(Ok(vec![self.create_time_index_column_expr()?]), |m| { + self.agg_modifier_to_col(input.schema(), m) + })?; // convert op and value columns to aggregate exprs let aggr_exprs = self.create_aggregate_exprs(*op, &input)?; @@ -114,9 +117,15 @@ impl PromPlanner { self.ctx.time_index_column = None; // create plan + let group_sort_expr = group_exprs + .clone() + .into_iter() + .map(|expr| expr.sort(true, false)); LogicalPlanBuilder::from(input) .aggregate(group_exprs, aggr_exprs) .context(DataFusionPlanningSnafu)? + .sort(group_sort_expr) + .context(DataFusionPlanningSnafu)? .build() .context(DataFusionPlanningSnafu)? } @@ -267,6 +276,7 @@ impl PromPlanner { })?)?; let mut func_exprs = self.create_function_expr(func, args.literals)?; func_exprs.insert(0, self.create_time_index_column_expr()?); + func_exprs.extend_from_slice(&self.create_tag_column_exprs()?); LogicalPlanBuilder::from(input) .project(func_exprs) .context(DataFusionPlanningSnafu)? @@ -302,12 +312,20 @@ impl PromPlanner { let table_name = self.ctx.table_name.clone().unwrap(); // make filter exprs + let offset_duration = -match offset { + Some(Offset::Pos(duration)) => duration.as_millis() as Millisecond, + Some(Offset::Neg(duration)) => -(duration.as_millis() as Millisecond), + None => 0, + }; let mut filters = self.matchers_to_expr(label_matchers)?; filters.push(self.create_time_index_column_expr()?.gt_eq(DfExpr::Literal( - ScalarValue::TimestampMillisecond(Some(self.ctx.start), None), + ScalarValue::TimestampMillisecond( + Some(self.ctx.start - offset_duration - self.ctx.lookback_delta), + None, + ), ))); filters.push(self.create_time_index_column_expr()?.lt_eq(DfExpr::Literal( - ScalarValue::TimestampMillisecond(Some(self.ctx.end), None), + ScalarValue::TimestampMillisecond(Some(self.ctx.end - offset_duration), None), ))); // make table scan with filter exprs @@ -328,11 +346,6 @@ impl PromPlanner { }); // make series_normalize plan - let offset_duration = match offset { - Some(Offset::Pos(duration)) => duration.as_millis() as Millisecond, - Some(Offset::Neg(duration)) => -(duration.as_millis() as Millisecond), - None => 0, - }; let series_normalize = SeriesNormalize::new( offset_duration, self.ctx @@ -344,10 +357,12 @@ impl PromPlanner { let logical_plan = LogicalPlan::Extension(Extension { node: Arc::new(series_normalize), }); + Ok(logical_plan) } /// Convert [AggModifier] to [Column] exprs for aggregation. + /// Timestamp column and tag columns will be included. 
/// /// # Side effect /// @@ -380,6 +395,9 @@ impl PromPlanner { // change the tag columns in context self.ctx.tag_columns = labels.iter().cloned().collect(); + // add timestamp column + exprs.push(self.create_time_index_column_expr()?); + Ok(exprs) } AggModifier::Without(labels) => { @@ -415,10 +433,14 @@ impl PromPlanner { self.ctx.tag_columns = all_fields.iter().map(|col| (*col).clone()).collect(); // collect remaining fields and convert to col expr - let exprs = all_fields + let mut exprs = all_fields .into_iter() .map(|c| DfExpr::Column(Column::from(c))) .collect::>(); + + // add timestamp column + exprs.push(self.create_time_index_column_expr()?); + Ok(exprs) } } @@ -587,6 +609,15 @@ impl PromPlanner { ))) } + fn create_tag_column_exprs(&self) -> Result> { + let mut result = Vec::with_capacity(self.ctx.tag_columns.len()); + for tag in &self.ctx.tag_columns { + let expr = DfExpr::Column(Column::from_name(tag)); + result.push(expr); + } + Ok(result) + } + fn create_tag_and_time_index_column_sort_exprs(&self) -> Result> { let mut result = self .ctx @@ -923,14 +954,14 @@ mod test { let plan = PromPlanner::stmt_to_plan(eval_stmt, context_provider).unwrap(); let expected = String::from( - "Filter: some_metric.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N]\ - \n Projection: some_metric.timestamp, TEMPLATE(some_metric.field_0) [timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N]\ + "Filter: some_metric.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, tag_0:Utf8]\ + \n Projection: some_metric.timestamp, TEMPLATE(some_metric.field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, tag_0:Utf8]\ \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ - \n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(0, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ - \n TableScan: some_metric, unsupported_filters=[tag_0 != Utf8(\"bar\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]" + \n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n TableScan: some_metric, unsupported_filters=[tag_0 != Utf8(\"bar\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]" ).replace("TEMPLATE", plan_name); assert_eq!(plan.display_indent_schema().to_string(), expected); @@ -1123,13 +1154,14 @@ mod test { let context_provider = 
build_test_context_provider("some_metric".to_string(), 2, 2).await; let plan = PromPlanner::stmt_to_plan(eval_stmt.clone(), context_provider).unwrap(); let expected_no_without = String::from( - "Aggregate: groupBy=[[some_metric.tag_1]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_1:Utf8, TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\ - \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ - \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ - \n PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ - \n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.tag_1 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ - \n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(0, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ - \n TableScan: some_metric, unsupported_filters=[tag_0 != Utf8(\"bar\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]" + "Sort: some_metric.tag_1 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\ + \n Aggregate: groupBy=[[some_metric.tag_1, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\ + \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ + \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ + \n PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ + \n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.tag_1 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ + \n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ + \n TableScan: some_metric, unsupported_filters=[tag_0 != Utf8(\"bar\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]" ).replace("TEMPLATE", name); 
assert_eq!( plan.display_indent_schema().to_string(), @@ -1145,13 +1177,14 @@ mod test { let context_provider = build_test_context_provider("some_metric".to_string(), 2, 2).await; let plan = PromPlanner::stmt_to_plan(eval_stmt, context_provider).unwrap(); let expected_without = String::from( - "Aggregate: groupBy=[[some_metric.tag_0]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_0:Utf8, TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\ - \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ - \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ - \n PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ - \n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.tag_1 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ - \n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(0, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ - \n TableScan: some_metric, unsupported_filters=[tag_0 != Utf8(\"bar\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]" + "Sort: some_metric.tag_0 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\ + \n Aggregate: groupBy=[[some_metric.tag_0, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\ + \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ + \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ + \n PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ + \n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.tag_1 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ + \n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ + \n TableScan: some_metric, unsupported_filters=[tag_0 != Utf8(\"bar\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, tag_1:Utf8, 
timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]" ).replace("TEMPLATE", name); assert_eq!(plan.display_indent_schema().to_string(), expected_without); } @@ -1271,14 +1304,14 @@ mod test { \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ - \n Filter: some_metric.tag_0 = Utf8(\"foo\") AND some_metric.timestamp >= TimestampMillisecond(0, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ - \n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"foo\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n Filter: some_metric.tag_0 = Utf8(\"foo\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"foo\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ - \n Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(0, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ - \n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"bar\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]" + \n Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"bar\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]" ); assert_eq!(plan.display_indent_schema().to_string(), expected); @@ -1306,8 +1339,8 @@ mod test { \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n Sort: some_metric.tag_0 DESC NULLS LAST, 
some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ - \n Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(0, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ - \n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"bar\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]" + \n Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"bar\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]" ); assert_eq!(plan.display_indent_schema().to_string(), expected); diff --git a/src/query/src/parser.rs b/src/query/src/parser.rs index a34dc5b155..1baa111f1c 100644 --- a/src/query/src/parser.rs +++ b/src/query/src/parser.rs @@ -88,7 +88,11 @@ impl QueryLanguageParser { query: &query.query, })?; - let step = promql_parser::util::parse_duration(&query.step) + let step = query + .step + .parse::() + .map(Duration::from_secs) + .or_else(|_| promql_parser::util::parse_duration(&query.step)) .map_err(|msg| BoxedError::new(PlainError::new(msg, StatusCode::InvalidArguments))) .context(QueryParseSnafu { query: &query.query, diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index b0dd12fa0d..611a207731 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -42,6 +42,7 @@ opensrv-mysql = { git = "https://github.com/datafuselabs/opensrv", rev = "b44c9d pgwire = "0.10" pin-project = "1.0" postgres-types = { version = "0.2", features = ["with-chrono-0_4"] } +promql-parser = "0.1.0" prost.workspace = true query = { path = "../query" } rand = "0.8" diff --git a/src/servers/src/promql.rs b/src/servers/src/promql.rs index b85df2a4dd..7c2379192a 100644 --- a/src/servers/src/promql.rs +++ b/src/servers/src/promql.rs @@ -19,24 +19,35 @@ use std::sync::Arc; use async_trait::async_trait; use axum::body::BoxBody; use axum::extract::{Query, State}; -use axum::{routing, Json, Router}; +use axum::{routing, Form, Json, Router}; use common_error::prelude::ErrorExt; use common_query::Output; use common_recordbatch::RecordBatches; use common_telemetry::info; +use datatypes::prelude::ConcreteDataType; +use datatypes::scalars::ScalarVector; +use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector}; use futures::FutureExt; +use promql_parser::label::METRIC_NAME; +use promql_parser::parser::{ + AggregateExpr, BinaryExpr, Call, Expr as PromqlExpr, MatrixSelector, ParenExpr, SubqueryExpr, + UnaryExpr, VectorSelector, +}; use query::parser::PromQuery; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use snafu::{ensure, ResultExt}; +use snafu::{ensure, OptionExt, ResultExt}; use tokio::sync::oneshot::Sender; use tokio::sync::{oneshot, Mutex}; use tower::ServiceBuilder; use tower_http::auth::AsyncRequireAuthorizationLayer; +use tower_http::compression::CompressionLayer; use tower_http::trace::TraceLayer; use crate::auth::UserProviderRef; -use crate::error::{AlreadyStartedSnafu, 
CollectRecordbatchSnafu, Result, StartHttpSnafu}; +use crate::error::{ + AlreadyStartedSnafu, CollectRecordbatchSnafu, InternalSnafu, Result, StartHttpSnafu, +}; use crate::http::authorize::HttpAuth; use crate::server::Server; @@ -74,15 +85,16 @@ impl PromqlServer { let router = Router::new() .route("/query", routing::post(instant_query).get(instant_query)) - .route("/range_query", routing::post(range_query).get(range_query)) + .route("/query_range", routing::post(range_query).get(range_query)) .with_state(self.query_handler.clone()); Router::new() - .nest(&format!("/{PROMQL_API_VERSION}"), router) + .nest(&format!("/api/{PROMQL_API_VERSION}"), router) // middlewares .layer( ServiceBuilder::new() .layer(TraceLayer::new_for_http()) + .layer(CompressionLayer::new()) // custom layer .layer(AsyncRequireAuthorizationLayer::new( HttpAuth::::new(self.user_provider.clone()), @@ -134,7 +146,7 @@ impl Server for PromqlServer { #[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] pub struct PromqlSeries { metric: HashMap, - values: Vec<(i64, String)>, + values: Vec<(f64, String)>, } #[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] @@ -148,8 +160,12 @@ pub struct PromqlData { pub struct PromqlJsonResponse { status: String, data: PromqlData, + #[serde(skip_serializing_if = "Option::is_none")] error: Option, + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(rename = "errorType")] error_type: Option, + #[serde(skip_serializing_if = "Option::is_none")] warnings: Option>, } @@ -179,17 +195,17 @@ impl PromqlJsonResponse { } /// Convert from `Result` - pub async fn from_query_result(result: Result) -> Json { + pub async fn from_query_result(result: Result, metric_name: String) -> Json { let response: Result> = try { let json = match result? { Output::RecordBatches(batches) => { - Self::success(Self::record_batches_to_data(batches)?) + Self::success(Self::record_batches_to_data(batches, metric_name)?) } Output::Stream(stream) => { let record_batches = RecordBatches::try_collect(stream) .await .context(CollectRecordbatchSnafu)?; - Self::success(Self::record_batches_to_data(record_batches)?) + Self::success(Self::record_batches_to_data(record_batches, metric_name)?) } Output::AffectedRows(_) => Self::error( "unexpected result", @@ -203,16 +219,102 @@ impl PromqlJsonResponse { response.unwrap_or_else(|err| Self::error(err.status_code().to_string(), err.to_string())) } - /// TODO(ruihang): implement this conversion method - fn record_batches_to_data(_: RecordBatches) -> Result { + fn record_batches_to_data(batches: RecordBatches, metric_name: String) -> Result { + // infer semantic type of each column from schema. + // TODO(ruihang): wish there is a better way to do this. 
+ let mut timestamp_column_index = None; + let mut tag_column_indices = Vec::new(); + let mut first_value_column_index = None; + + for (i, column) in batches.schema().column_schemas().iter().enumerate() { + match column.data_type { + ConcreteDataType::Timestamp(datatypes::types::TimestampType::Millisecond(_)) => { + if timestamp_column_index.is_none() { + timestamp_column_index = Some(i); + } + } + ConcreteDataType::Float64(_) => { + if first_value_column_index.is_none() { + first_value_column_index = Some(i); + } + } + ConcreteDataType::String(_) => { + tag_column_indices.push(i); + } + _ => {} + } + } + + let timestamp_column_index = timestamp_column_index.context(InternalSnafu { + err_msg: "no timestamp column found".to_string(), + })?; + let first_value_column_index = first_value_column_index.context(InternalSnafu { + err_msg: "no value column found".to_string(), + })?; + + let metric_name = (METRIC_NAME.to_string(), metric_name); + let mut buffer = HashMap::, Vec<(f64, String)>>::new(); + + for batch in batches.iter() { + // prepare things... + let tag_columns = tag_column_indices + .iter() + .map(|i| { + batch + .column(*i) + .as_any() + .downcast_ref::() + .unwrap() + }) + .collect::>(); + let tag_names = tag_column_indices + .iter() + .map(|c| batches.schema().column_name_by_index(*c).to_string()) + .collect::>(); + let timestamp_column = batch + .column(timestamp_column_index) + .as_any() + .downcast_ref::() + .unwrap(); + let value_column = batch + .column(first_value_column_index) + .as_any() + .downcast_ref::() + .unwrap(); + + // assemble rows + for row_index in 0..batch.num_rows() { + // retrieve tags + // TODO(ruihang): push table name `__metric__` + let mut tags = vec![metric_name.clone()]; + for (tag_column, tag_name) in tag_columns.iter().zip(tag_names.iter()) { + let tag_value = tag_column.get_data(row_index).unwrap().to_string(); + tags.push((tag_name.to_string(), tag_value)); + } + + // retrieve timestamp + let timestamp_millis: i64 = timestamp_column.get_data(row_index).unwrap().into(); + let timestamp = timestamp_millis as f64 / 1000.0; + + // retrieve value + let value = + Into::::into(value_column.get_data(row_index).unwrap()).to_string(); + + buffer.entry(tags).or_default().push((timestamp, value)); + } + } + + let result = buffer + .into_iter() + .map(|(tags, values)| PromqlSeries { + metric: tags.into_iter().collect(), + values, + }) + .collect(); + let data = PromqlData { result_type: "matrix".to_string(), - result: vec![PromqlSeries { - metric: vec![("__name__".to_string(), "foo".to_string())] - .into_iter() - .collect(), - values: vec![(1, "123.45".to_string())], - }], + result, }; Ok(data) @@ -233,16 +335,16 @@ pub async fn instant_query( ) -> Json { PromqlJsonResponse::error( "not implemented", - "instant query api `/query` is not implemented. Use `/range_query` instead.", + "instant query api `/query` is not implemented. 
Use `/query_range` instead.", ) } #[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] pub struct RangeQuery { - query: String, - start: String, - end: String, - step: String, + query: Option, + start: Option, + end: Option, + step: Option, timeout: Option, } @@ -250,13 +352,47 @@ pub struct RangeQuery { pub async fn range_query( State(handler): State, Query(params): Query, + Form(form_params): Form, ) -> Json { let prom_query = PromQuery { - query: params.query, - start: params.start, - end: params.end, - step: params.step, + query: params.query.or(form_params.query).unwrap_or_default(), + start: params.start.or(form_params.start).unwrap_or_default(), + end: params.end.or(form_params.end).unwrap_or_default(), + step: params.step.or(form_params.step).unwrap_or_default(), }; let result = handler.do_query(&prom_query).await; - PromqlJsonResponse::from_query_result(result).await + let metric_name = retrieve_metric_name(&prom_query.query).unwrap_or_default(); + PromqlJsonResponse::from_query_result(result, metric_name).await +} + +fn retrieve_metric_name(promql: &str) -> Option { + let promql_expr = promql_parser::parser::parse(promql).ok()?; + promql_expr_to_metric_name(promql_expr) +} + +fn promql_expr_to_metric_name(expr: PromqlExpr) -> Option { + match expr { + PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => promql_expr_to_metric_name(*expr), + PromqlExpr::Unary(UnaryExpr { expr }) => promql_expr_to_metric_name(*expr), + PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => { + promql_expr_to_metric_name(*lhs).or(promql_expr_to_metric_name(*rhs)) + } + PromqlExpr::Paren(ParenExpr { expr }) => promql_expr_to_metric_name(*expr), + PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => promql_expr_to_metric_name(*expr), + PromqlExpr::NumberLiteral(_) => None, + PromqlExpr::StringLiteral(_) => None, + PromqlExpr::VectorSelector(VectorSelector { matchers, .. }) => { + matchers.find_matchers(METRIC_NAME).pop().cloned() + } + PromqlExpr::MatrixSelector(MatrixSelector { + vector_selector, .. + }) => { + let VectorSelector { matchers, .. } = vector_selector; + matchers.find_matchers(METRIC_NAME).pop().cloned() + } + PromqlExpr::Call(Call { args, .. }) => args + .args + .into_iter() + .find_map(|e| promql_expr_to_metric_name(*e)), + } } diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 64b8e3715b..340b03efcc 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -290,18 +290,19 @@ pub async fn test_promql_http_api(store_type: StorageType) { let client = TestClient::new(app); // instant query - let res = client.get("/v1/query?query=up").send().await; + let res = client.get("/api/v1/query?query=up").send().await; assert_eq!(res.status(), StatusCode::OK); - let res = client.post("/v1/query?query=up").send().await; + let res = client.post("/api/v1/query?query=up").send().await; assert_eq!(res.status(), StatusCode::OK); let res = client - .get("/v1/range_query?query=up&start=1&end=100&step=5") + .get("/api/v1/query_range?query=up&start=1&end=100&step=5") .send() .await; assert_eq!(res.status(), StatusCode::OK); let res = client - .post("/v1/range_query?query=up&start=1&end=100&step=5") + .post("/api/v1/query_range?query=up&start=1&end=100&step=5") + .header("Content-Type", "application/x-www-form-urlencoded") .send() .await; assert_eq!(res.status(), StatusCode::OK);
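
As a rough illustration of what the integration test above exercises, here is a minimal, hypothetical client-side sketch (not part of the patch): after this change the Prometheus-compatible route lives under `/api/v1/query_range`, parameters may be passed either in the query string or as a form-encoded HTTP body, and `step` accepts a bare seconds literal (e.g. `5`) in addition to a duration string (e.g. `30s`). The host, port, the `up` metric, and the use of `reqwest`/`tokio` are assumptions made for the example only.

use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Hypothetical setup: a server exposing the PromQL HTTP API on 127.0.0.1:4004
    // and an existing `up` metric; adjust both to the actual deployment.
    let client = reqwest::Client::new();

    // GET with query-string parameters; `step=5` is now parsed as 5 seconds.
    let resp = client
        .get("http://127.0.0.1:4004/api/v1/query_range?query=up&start=1&end=100&step=5")
        .send()
        .await?;
    println!("GET  status: {}", resp.status());

    // POST with a form-encoded body, which is how Prometheus' own clients send parameters.
    let resp = client
        .post("http://127.0.0.1:4004/api/v1/query_range")
        .form(&[("query", "up"), ("start", "1"), ("end", "100"), ("step", "30s")])
        .send()
        .await?;
    println!("POST status: {}", resp.status());
    println!("body: {}", resp.text().await?);

    Ok(())
}

The response follows the shape assembled by `PromqlJsonResponse` above: a JSON object with `status` and `data`, where `data.resultType` is `"matrix"` and each entry of `data.result` carries a `metric` label map (including the metric name inferred from the query) and `values` as `[timestamp, value]` pairs.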