diff --git a/Cargo.lock b/Cargo.lock index 8d3f5a56c4..116cb2fea7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6712,6 +6712,7 @@ dependencies = [ "pgwire", "pin-project", "postgres-types", + "promql-parser", "prost 0.11.6", "query", "rand 0.8.5", diff --git a/src/datanode/src/tests/promql_test.rs b/src/datanode/src/tests/promql_test.rs index c26f8f584d..cfd31ca81b 100644 --- a/src/datanode/src/tests/promql_test.rs +++ b/src/datanode/src/tests/promql_test.rs @@ -82,16 +82,16 @@ async fn sql_insert_promql_query_ceil() { UNIX_EPOCH.checked_add(Duration::from_secs(100)).unwrap(), Duration::from_secs(5), Duration::from_secs(1), - "+---------------------+-------------------------------+----------------------------------+\ - \n| ts | ceil(http_requests_total.cpu) | ceil(http_requests_total.memory) |\ - \n+---------------------+-------------------------------+----------------------------------+\ - \n| 1970-01-01T00:00:00 | 67 | 1024 |\ - \n| 1970-01-01T00:00:05 | 67 | 4096 |\ - \n| 1970-01-01T00:00:10 | 100 | 20480 |\ - \n| 1970-01-01T00:00:50 | 12424 | 1334 |\ - \n| 1970-01-01T00:01:20 | 0 | 2334 |\ - \n| 1970-01-01T00:01:40 | 49 | 3334 |\ - \n+---------------------+-------------------------------+----------------------------------+", + "+---------------------+-------------------------------+----------------------------------+-------+\ + \n| ts | ceil(http_requests_total.cpu) | ceil(http_requests_total.memory) | host |\ + \n+---------------------+-------------------------------+----------------------------------+-------+\ + \n| 1970-01-01T00:00:00 | 67 | 1024 | host1 |\ + \n| 1970-01-01T00:00:05 | 67 | 4096 | host1 |\ + \n| 1970-01-01T00:00:10 | 100 | 20480 | host1 |\ + \n| 1970-01-01T00:01:20 | 0 | 2334 | host1 |\ + \n| 1970-01-01T00:01:40 | 49 | 3334 | host1 |\ + \n| 1970-01-01T00:00:50 | 12424 | 1334 | host1 |\ + \n+---------------------+-------------------------------+----------------------------------+-------+" ) .await; } @@ -142,13 +142,12 @@ async fn aggregators_simple_sum() { unix_epoch_plus_100s(), Duration::from_secs(60), Duration::from_secs(0), - "+------------+--------------------------+\ - \n| group | SUM(http_requests.value) |\ - \n+------------+--------------------------+\ - \n| | |\ - \n| canary | 700 |\ - \n| production | 300 |\ - \n+------------+--------------------------+", + "+------------+---------------------+--------------------------+\ + \n| group | ts | SUM(http_requests.value) |\ + \n+------------+---------------------+--------------------------+\ + \n| production | 1970-01-01T00:00:00 | 300 |\ + \n| canary | 1970-01-01T00:00:00 | 700 |\ + \n+------------+---------------------+--------------------------+", ) .await; } @@ -167,13 +166,12 @@ async fn aggregators_simple_avg() { unix_epoch_plus_100s(), Duration::from_secs(60), Duration::from_secs(0), - "+------------+--------------------------+\ - \n| group | AVG(http_requests.value) |\ - \n+------------+--------------------------+\ - \n| | 0 |\ - \n| production | 150 |\ - \n| canary | 350 |\ - \n+------------+--------------------------+", + "+------------+---------------------+--------------------------+\ + \n| group | ts | AVG(http_requests.value) |\ + \n+------------+---------------------+--------------------------+\ + \n| production | 1970-01-01T00:00:00 | 150 |\ + \n| canary | 1970-01-01T00:00:00 | 350 |\ + \n+------------+---------------------+--------------------------+", ) .await; } @@ -192,13 +190,12 @@ async fn aggregators_simple_count() { unix_epoch_plus_100s(), Duration::from_secs(60), Duration::from_secs(0), - 
"+------------+----------------------------+\ - \n| group | COUNT(http_requests.value) |\ - \n+------------+----------------------------+\ - \n| | 0 |\ - \n| canary | 2 |\ - \n| production | 2 |\ - \n+------------+----------------------------+", + "+------------+---------------------+----------------------------+\ + \n| group | ts | COUNT(http_requests.value) |\ + \n+------------+---------------------+----------------------------+\ + \n| canary | 1970-01-01T00:00:00 | 2 |\ + \n| production | 1970-01-01T00:00:00 | 2 |\ + \n+------------+---------------------+----------------------------+", ) .await; } @@ -217,13 +214,12 @@ async fn aggregators_simple_without() { unix_epoch_plus_100s(), Duration::from_secs(60), Duration::from_secs(0), - "+------------+------------+--------------------------+\ - \n| group | job | SUM(http_requests.value) |\ - \n+------------+------------+--------------------------+\ - \n| | | |\ - \n| canary | api-server | 700 |\ - \n| production | api-server | 300 |\ - \n+------------+------------+--------------------------+", + "+------------+------------+---------------------+--------------------------+\ + \n| group | job | ts | SUM(http_requests.value) |\ + \n+------------+------------+---------------------+--------------------------+\ + \n| production | api-server | 1970-01-01T00:00:00 | 300 |\ + \n| canary | api-server | 1970-01-01T00:00:00 | 700 |\ + \n+------------+------------+---------------------+--------------------------+", ) .await; } @@ -241,11 +237,11 @@ async fn aggregators_empty_by() { unix_epoch_plus_100s(), Duration::from_secs(60), Duration::from_secs(0), - "+--------------------------+\ - \n| SUM(http_requests.value) |\ - \n+--------------------------+\ - \n| 1000 |\ - \n+--------------------------+", + "+---------------------+--------------------------+\ + \n| ts | SUM(http_requests.value) |\ + \n+---------------------+--------------------------+\ + \n| 1970-01-01T00:00:00 | 1000 |\ + \n+---------------------+--------------------------+", ) .await; } @@ -263,11 +259,11 @@ async fn aggregators_no_by_without() { unix_epoch_plus_100s(), Duration::from_secs(60), Duration::from_secs(0), - "+--------------------------+\ - \n| SUM(http_requests.value) |\ - \n+--------------------------+\ - \n| 1000 |\ - \n+--------------------------+", + "+---------------------+--------------------------+\ + \n| ts | SUM(http_requests.value) |\ + \n+---------------------+--------------------------+\ + \n| 1970-01-01T00:00:00 | 1000 |\ + \n+---------------------+--------------------------+", ) .await; } @@ -286,13 +282,12 @@ async fn aggregators_empty_without() { unix_epoch_plus_100s(), Duration::from_secs(60), Duration::from_secs(0), - "+------------+----------+------------+--------------------------+\ - \n| group | instance | job | SUM(http_requests.value) |\ - \n+------------+----------+------------+--------------------------+\ - \n| | | | |\ - \n| production | 0 | api-server | 100 |\ - \n| production | 1 | api-server | 200 |\ - \n+------------+----------+------------+--------------------------+", + "+------------+----------+------------+---------------------+--------------------------+\ + \n| group | instance | job | ts | SUM(http_requests.value) |\ + \n+------------+----------+------------+---------------------+--------------------------+\ + \n| production | 0 | api-server | 1970-01-01T00:00:00 | 100 |\ + \n| production | 1 | api-server | 1970-01-01T00:00:00 | 200 |\ + \n+------------+----------+------------+---------------------+--------------------------+", ) .await; } @@ 
-356,11 +351,12 @@ async fn stddev_by_label() { unix_epoch_plus_100s(), Duration::from_secs(60), Duration::from_secs(0), - "+----------+-----------------------------+\ - \n| instance | STDDEV(http_requests.value) |\ - \n+----------+-----------------------------+\ - \n| 0 | 258.19888974716116 |\ - \n+----------+-----------------------------+", + "+----------+---------------------+-----------------------------+\ + \n| instance | ts | STDDEV(http_requests.value) |\ + \n+----------+---------------------+-----------------------------+\ + \n| 0 | 1970-01-01T00:00:00 | 258.19888974716116 |\ + \n| 1 | 1970-01-01T00:00:00 | 258.19888974716116 |\ + \n+----------+---------------------+-----------------------------+", ) .await; } diff --git a/src/promql/src/extension_plan/instant_manipulate.rs b/src/promql/src/extension_plan/instant_manipulate.rs index a096447b5e..e735bd4a94 100644 --- a/src/promql/src/extension_plan/instant_manipulate.rs +++ b/src/promql/src/extension_plan/instant_manipulate.rs @@ -41,8 +41,7 @@ use crate::extension_plan::Millisecond; /// /// This plan will try to align the input time series, for every timestamp between /// `start` and `end` with step `interval`. Find in the `lookback` range if data -/// is missing at the given timestamp. If data is absent in some timestamp, all columns -/// except the time index will left blank. +/// is missing at the given timestamp. #[derive(Debug)] pub struct InstantManipulate { start: Millisecond, @@ -341,6 +340,17 @@ impl InstantManipulateStream { take_indices: Vec>, aligned_ts: Vec, ) -> ArrowResult { + let aligned_ts = aligned_ts + .into_iter() + .zip(take_indices.iter()) + .filter_map(|(ts, i)| i.map(|_| ts)) + .collect::>(); + let take_indices = take_indices + .iter() + .filter(|i| i.is_some()) + .copied() + .collect::>(); + let indices_array = UInt64Array::from(take_indices); let mut arrays = record_batch .columns() @@ -349,7 +359,8 @@ impl InstantManipulateStream { .collect::>>()?; arrays[self.time_index] = Arc::new(TimestampMillisecondArray::from(aligned_ts)); - RecordBatch::try_new(record_batch.schema(), arrays) + let result = RecordBatch::try_new(record_batch.schema(), arrays)?; + Ok(result) } } @@ -430,11 +441,8 @@ mod test { \n| 1970-01-01T00:01:00 | 1 | foo |\ \n| 1970-01-01T00:01:30 | 1 | foo |\ \n| 1970-01-01T00:02:00 | 1 | foo |\ - \n| 1970-01-01T00:02:30 | | |\ \n| 1970-01-01T00:03:00 | 1 | foo |\ - \n| 1970-01-01T00:03:30 | | |\ \n| 1970-01-01T00:04:00 | 1 | foo |\ - \n| 1970-01-01T00:04:30 | | |\ \n| 1970-01-01T00:05:00 | 1 | foo |\ \n+---------------------+-------+------+", ); @@ -449,34 +457,19 @@ mod test { \n+---------------------+-------+------+\ \n| 1970-01-01T00:00:00 | 1 | foo |\ \n| 1970-01-01T00:00:10 | 1 | foo |\ - \n| 1970-01-01T00:00:20 | | |\ \n| 1970-01-01T00:00:30 | 1 | foo |\ \n| 1970-01-01T00:00:40 | 1 | foo |\ - \n| 1970-01-01T00:00:50 | | |\ \n| 1970-01-01T00:01:00 | 1 | foo |\ \n| 1970-01-01T00:01:10 | 1 | foo |\ - \n| 1970-01-01T00:01:20 | | |\ \n| 1970-01-01T00:01:30 | 1 | foo |\ \n| 1970-01-01T00:01:40 | 1 | foo |\ - \n| 1970-01-01T00:01:50 | | |\ \n| 1970-01-01T00:02:00 | 1 | foo |\ \n| 1970-01-01T00:02:10 | 1 | foo |\ - \n| 1970-01-01T00:02:20 | | |\ - \n| 1970-01-01T00:02:30 | | |\ - \n| 1970-01-01T00:02:40 | | |\ - \n| 1970-01-01T00:02:50 | | |\ \n| 1970-01-01T00:03:00 | 1 | foo |\ \n| 1970-01-01T00:03:10 | 1 | foo |\ - \n| 1970-01-01T00:03:20 | | |\ - \n| 1970-01-01T00:03:30 | | |\ - \n| 1970-01-01T00:03:40 | | |\ - \n| 1970-01-01T00:03:50 | | |\ \n| 1970-01-01T00:04:00 | 1 | foo |\ \n| 
1970-01-01T00:04:10 | 1 | foo |\ - \n| 1970-01-01T00:04:20 | | |\ - \n| 1970-01-01T00:04:30 | | |\ \n| 1970-01-01T00:04:40 | 1 | foo |\ - \n| 1970-01-01T00:04:50 | | |\ \n| 1970-01-01T00:05:00 | 1 | foo |\ \n+---------------------+-------+------+", ); @@ -527,14 +520,10 @@ mod test { \n| 1970-01-01T00:02:10 | 1 | foo |\ \n| 1970-01-01T00:02:20 | 1 | foo |\ \n| 1970-01-01T00:02:30 | 1 | foo |\ - \n| 1970-01-01T00:02:40 | | |\ - \n| 1970-01-01T00:02:50 | | |\ \n| 1970-01-01T00:03:00 | 1 | foo |\ \n| 1970-01-01T00:03:10 | 1 | foo |\ \n| 1970-01-01T00:03:20 | 1 | foo |\ \n| 1970-01-01T00:03:30 | 1 | foo |\ - \n| 1970-01-01T00:03:40 | | |\ - \n| 1970-01-01T00:03:50 | | |\ \n| 1970-01-01T00:04:00 | 1 | foo |\ \n| 1970-01-01T00:04:10 | 1 | foo |\ \n| 1970-01-01T00:04:20 | 1 | foo |\ @@ -617,22 +606,8 @@ mod test { "+---------------------+-------+------+\ \n| timestamp | value | path |\ \n+---------------------+-------+------+\ - \n| 1970-01-01T00:03:50 | | |\ - \n| 1970-01-01T00:03:51 | | |\ - \n| 1970-01-01T00:03:52 | | |\ - \n| 1970-01-01T00:03:53 | | |\ - \n| 1970-01-01T00:03:54 | | |\ - \n| 1970-01-01T00:03:55 | | |\ - \n| 1970-01-01T00:03:56 | | |\ - \n| 1970-01-01T00:03:57 | | |\ - \n| 1970-01-01T00:03:58 | | |\ - \n| 1970-01-01T00:03:59 | | |\ \n| 1970-01-01T00:04:00 | 1 | foo |\ \n| 1970-01-01T00:04:01 | 1 | foo |\ - \n| 1970-01-01T00:04:02 | | |\ - \n| 1970-01-01T00:04:03 | | |\ - \n| 1970-01-01T00:04:04 | | |\ - \n| 1970-01-01T00:04:05 | | |\ \n+---------------------+-------+------+", ); do_normalize_test(230_000, 245_000, 0, 1_000, expected).await; @@ -646,7 +621,6 @@ mod test { \n+---------------------+-------+------+\ \n| 1970-01-01T00:00:00 | 1 | foo |\ \n| 1970-01-01T00:00:10 | 1 | foo |\ - \n| 1970-01-01T00:00:20 | | |\ \n| 1970-01-01T00:00:30 | 1 | foo |\ \n+---------------------+-------+------+", ); @@ -659,37 +633,12 @@ mod test { "+---------------------+-------+------+\ \n| timestamp | value | path |\ \n+---------------------+-------+------+\ - \n| 1969-12-31T23:45:00 | | |\ - \n| 1969-12-31T23:46:00 | | |\ - \n| 1969-12-31T23:47:00 | | |\ - \n| 1969-12-31T23:48:00 | | |\ - \n| 1969-12-31T23:49:00 | | |\ - \n| 1969-12-31T23:50:00 | | |\ - \n| 1969-12-31T23:51:00 | | |\ - \n| 1969-12-31T23:52:00 | | |\ - \n| 1969-12-31T23:53:00 | | |\ - \n| 1969-12-31T23:54:00 | | |\ - \n| 1969-12-31T23:55:00 | | |\ - \n| 1969-12-31T23:56:00 | | |\ - \n| 1969-12-31T23:57:00 | | |\ - \n| 1969-12-31T23:58:00 | | |\ - \n| 1969-12-31T23:59:00 | | |\ \n| 1970-01-01T00:00:00 | 1 | foo |\ \n| 1970-01-01T00:01:00 | 1 | foo |\ \n| 1970-01-01T00:02:00 | 1 | foo |\ \n| 1970-01-01T00:03:00 | 1 | foo |\ \n| 1970-01-01T00:04:00 | 1 | foo |\ \n| 1970-01-01T00:05:00 | 1 | foo |\ - \n| 1970-01-01T00:06:00 | | |\ - \n| 1970-01-01T00:07:00 | | |\ - \n| 1970-01-01T00:08:00 | | |\ - \n| 1970-01-01T00:09:00 | | |\ - \n| 1970-01-01T00:10:00 | | |\ - \n| 1970-01-01T00:11:00 | | |\ - \n| 1970-01-01T00:12:00 | | |\ - \n| 1970-01-01T00:13:00 | | |\ - \n| 1970-01-01T00:14:00 | | |\ - \n| 1970-01-01T00:15:00 | | |\ \n+---------------------+-------+------+", ); do_normalize_test(-900_000, 900_000, 30_000, 60_000, expected).await; @@ -704,8 +653,6 @@ mod test { \n| 1970-01-01T00:03:10 | 1 | foo |\ \n| 1970-01-01T00:03:20 | 1 | foo |\ \n| 1970-01-01T00:03:30 | 1 | foo |\ - \n| 1970-01-01T00:03:40 | | |\ - \n| 1970-01-01T00:03:50 | | |\ \n| 1970-01-01T00:04:00 | 1 | foo |\ \n| 1970-01-01T00:04:10 | 1 | foo |\ \n| 1970-01-01T00:04:20 | 1 | foo |\ diff --git a/src/promql/src/extension_plan/normalize.rs 
b/src/promql/src/extension_plan/normalize.rs index 34b84d0c9f..9623a0ef09 100644 --- a/src/promql/src/extension_plan/normalize.rs +++ b/src/promql/src/extension_plan/normalize.rs @@ -17,6 +17,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; +use datafusion::arrow::array::{BooleanArray, Float64Array}; use datafusion::arrow::compute; use datafusion::common::{DFSchemaRef, Result as DataFusionResult, Statistics}; use datafusion::execution::context::TaskContext; @@ -40,6 +41,7 @@ use crate::extension_plan::Millisecond; /// Roughly speaking, this method does these things: /// - bias sample's timestamp by offset /// - sort the record batch based on timestamp column +/// - remove NaN values #[derive(Debug)] pub struct SeriesNormalize { offset: Millisecond, @@ -237,7 +239,23 @@ impl SeriesNormalizeStream { .iter() .map(|array| compute::take(array, &ordered_indices, None)) .collect::>>()?; - RecordBatch::try_new(input.schema(), ordered_columns) + let ordered_batch = RecordBatch::try_new(input.schema(), ordered_columns)?; + + // TODO(ruihang): consider the "special NaN" + // filter out NaN + let mut filter = vec![true; input.num_rows()]; + for column in ordered_batch.columns() { + if let Some(float_column) = column.as_any().downcast_ref::() { + for (i, flag) in filter.iter_mut().enumerate() { + if float_column.value(i).is_nan() { + *flag = false; + } + } + } + } + + let result = compute::filter_record_batch(&ordered_batch, &BooleanArray::from(filter))?; + Ok(result) } } diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs index 7ec70e38ad..2f6fec3eeb 100644 --- a/src/promql/src/planner.rs +++ b/src/promql/src/planner.rs @@ -103,9 +103,12 @@ impl PromPlanner { let input = self.prom_expr_to_plan(*expr.clone())?; // calculate columns to group by - let group_exprs = modifier.as_ref().map_or(Ok(Vec::new()), |m| { - self.agg_modifier_to_col(input.schema(), m) - })?; + // Need to append time index column into group by columns + let group_exprs = modifier + .as_ref() + .map_or(Ok(vec![self.create_time_index_column_expr()?]), |m| { + self.agg_modifier_to_col(input.schema(), m) + })?; // convert op and value columns to aggregate exprs let aggr_exprs = self.create_aggregate_exprs(*op, &input)?; @@ -114,9 +117,15 @@ impl PromPlanner { self.ctx.time_index_column = None; // create plan + let group_sort_expr = group_exprs + .clone() + .into_iter() + .map(|expr| expr.sort(true, false)); LogicalPlanBuilder::from(input) .aggregate(group_exprs, aggr_exprs) .context(DataFusionPlanningSnafu)? + .sort(group_sort_expr) + .context(DataFusionPlanningSnafu)? .build() .context(DataFusionPlanningSnafu)? } @@ -267,6 +276,7 @@ impl PromPlanner { })?)?; let mut func_exprs = self.create_function_expr(func, args.literals)?; func_exprs.insert(0, self.create_time_index_column_expr()?); + func_exprs.extend_from_slice(&self.create_tag_column_exprs()?); LogicalPlanBuilder::from(input) .project(func_exprs) .context(DataFusionPlanningSnafu)? 
@@ -302,12 +312,20 @@ impl PromPlanner { let table_name = self.ctx.table_name.clone().unwrap(); // make filter exprs + let offset_duration = -match offset { + Some(Offset::Pos(duration)) => duration.as_millis() as Millisecond, + Some(Offset::Neg(duration)) => -(duration.as_millis() as Millisecond), + None => 0, + }; let mut filters = self.matchers_to_expr(label_matchers)?; filters.push(self.create_time_index_column_expr()?.gt_eq(DfExpr::Literal( - ScalarValue::TimestampMillisecond(Some(self.ctx.start), None), + ScalarValue::TimestampMillisecond( + Some(self.ctx.start - offset_duration - self.ctx.lookback_delta), + None, + ), ))); filters.push(self.create_time_index_column_expr()?.lt_eq(DfExpr::Literal( - ScalarValue::TimestampMillisecond(Some(self.ctx.end), None), + ScalarValue::TimestampMillisecond(Some(self.ctx.end - offset_duration), None), ))); // make table scan with filter exprs @@ -328,11 +346,6 @@ impl PromPlanner { }); // make series_normalize plan - let offset_duration = match offset { - Some(Offset::Pos(duration)) => duration.as_millis() as Millisecond, - Some(Offset::Neg(duration)) => -(duration.as_millis() as Millisecond), - None => 0, - }; let series_normalize = SeriesNormalize::new( offset_duration, self.ctx @@ -344,10 +357,12 @@ impl PromPlanner { let logical_plan = LogicalPlan::Extension(Extension { node: Arc::new(series_normalize), }); + Ok(logical_plan) } /// Convert [AggModifier] to [Column] exprs for aggregation. + /// Timestamp column and tag columns will be included. /// /// # Side effect /// @@ -380,6 +395,9 @@ impl PromPlanner { // change the tag columns in context self.ctx.tag_columns = labels.iter().cloned().collect(); + // add timestamp column + exprs.push(self.create_time_index_column_expr()?); + Ok(exprs) } AggModifier::Without(labels) => { @@ -415,10 +433,14 @@ impl PromPlanner { self.ctx.tag_columns = all_fields.iter().map(|col| (*col).clone()).collect(); // collect remaining fields and convert to col expr - let exprs = all_fields + let mut exprs = all_fields .into_iter() .map(|c| DfExpr::Column(Column::from(c))) .collect::>(); + + // add timestamp column + exprs.push(self.create_time_index_column_expr()?); + Ok(exprs) } } @@ -587,6 +609,15 @@ impl PromPlanner { ))) } + fn create_tag_column_exprs(&self) -> Result> { + let mut result = Vec::with_capacity(self.ctx.tag_columns.len()); + for tag in &self.ctx.tag_columns { + let expr = DfExpr::Column(Column::from_name(tag)); + result.push(expr); + } + Ok(result) + } + fn create_tag_and_time_index_column_sort_exprs(&self) -> Result> { let mut result = self .ctx @@ -923,14 +954,14 @@ mod test { let plan = PromPlanner::stmt_to_plan(eval_stmt, context_provider).unwrap(); let expected = String::from( - "Filter: some_metric.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N]\ - \n Projection: some_metric.timestamp, TEMPLATE(some_metric.field_0) [timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N]\ + "Filter: some_metric.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, tag_0:Utf8]\ + \n Projection: some_metric.timestamp, TEMPLATE(some_metric.field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, tag_0:Utf8]\ \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n PromSeriesNormalize: offset=[0], time 
index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ - \n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(0, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ - \n TableScan: some_metric, unsupported_filters=[tag_0 != Utf8(\"bar\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]" + \n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n TableScan: some_metric, unsupported_filters=[tag_0 != Utf8(\"bar\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]" ).replace("TEMPLATE", plan_name); assert_eq!(plan.display_indent_schema().to_string(), expected); @@ -1123,13 +1154,14 @@ mod test { let context_provider = build_test_context_provider("some_metric".to_string(), 2, 2).await; let plan = PromPlanner::stmt_to_plan(eval_stmt.clone(), context_provider).unwrap(); let expected_no_without = String::from( - "Aggregate: groupBy=[[some_metric.tag_1]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_1:Utf8, TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\ - \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ - \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ - \n PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ - \n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.tag_1 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ - \n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(0, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ - \n TableScan: some_metric, unsupported_filters=[tag_0 != Utf8(\"bar\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]" + "Sort: some_metric.tag_1 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\ + \n Aggregate: groupBy=[[some_metric.tag_1, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), 
TEMPLATE(some_metric.field_1)]] [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\ + \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ + \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ + \n PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ + \n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.tag_1 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ + \n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ + \n TableScan: some_metric, unsupported_filters=[tag_0 != Utf8(\"bar\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]" ).replace("TEMPLATE", name); assert_eq!( plan.display_indent_schema().to_string(), @@ -1145,13 +1177,14 @@ mod test { let context_provider = build_test_context_provider("some_metric".to_string(), 2, 2).await; let plan = PromPlanner::stmt_to_plan(eval_stmt, context_provider).unwrap(); let expected_without = String::from( - "Aggregate: groupBy=[[some_metric.tag_0]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_0:Utf8, TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\ - \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ - \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ - \n PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ - \n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.tag_1 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ - \n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(0, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ - \n TableScan: some_metric, unsupported_filters=[tag_0 != Utf8(\"bar\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]" + "Sort: some_metric.tag_0 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\ + \n Aggregate: 
groupBy=[[some_metric.tag_0, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\ + \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ + \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ + \n PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ + \n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.tag_1 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ + \n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ + \n TableScan: some_metric, unsupported_filters=[tag_0 != Utf8(\"bar\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]" ).replace("TEMPLATE", name); assert_eq!(plan.display_indent_schema().to_string(), expected_without); } @@ -1271,14 +1304,14 @@ mod test { \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ - \n Filter: some_metric.tag_0 = Utf8(\"foo\") AND some_metric.timestamp >= TimestampMillisecond(0, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ - \n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"foo\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n Filter: some_metric.tag_0 = Utf8(\"foo\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"foo\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n Sort: some_metric.tag_0 DESC NULLS 
LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ - \n Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(0, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ - \n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"bar\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]" + \n Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"bar\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]" ); assert_eq!(plan.display_indent_schema().to_string(), expected); @@ -1306,8 +1339,8 @@ mod test { \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ - \n Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(0, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ - \n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"bar\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]" + \n Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"bar\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]" ); assert_eq!(plan.display_indent_schema().to_string(), expected); diff --git a/src/query/src/parser.rs b/src/query/src/parser.rs index a34dc5b155..1baa111f1c 100644 --- a/src/query/src/parser.rs +++ b/src/query/src/parser.rs @@ -88,7 +88,11 @@ impl QueryLanguageParser { query: &query.query, })?; - let step = promql_parser::util::parse_duration(&query.step) + let step = query + .step + .parse::() + .map(Duration::from_secs) + .or_else(|_| promql_parser::util::parse_duration(&query.step)) .map_err(|msg| BoxedError::new(PlainError::new(msg, StatusCode::InvalidArguments))) .context(QueryParseSnafu { query: &query.query, diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index b0dd12fa0d..611a207731 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -42,6 +42,7 @@ opensrv-mysql = { git = "https://github.com/datafuselabs/opensrv", rev = "b44c9d pgwire = "0.10" pin-project = "1.0" postgres-types = { version = "0.2", features = 
["with-chrono-0_4"] } +promql-parser = "0.1.0" prost.workspace = true query = { path = "../query" } rand = "0.8" diff --git a/src/servers/src/promql.rs b/src/servers/src/promql.rs index b85df2a4dd..7c2379192a 100644 --- a/src/servers/src/promql.rs +++ b/src/servers/src/promql.rs @@ -19,24 +19,35 @@ use std::sync::Arc; use async_trait::async_trait; use axum::body::BoxBody; use axum::extract::{Query, State}; -use axum::{routing, Json, Router}; +use axum::{routing, Form, Json, Router}; use common_error::prelude::ErrorExt; use common_query::Output; use common_recordbatch::RecordBatches; use common_telemetry::info; +use datatypes::prelude::ConcreteDataType; +use datatypes::scalars::ScalarVector; +use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector}; use futures::FutureExt; +use promql_parser::label::METRIC_NAME; +use promql_parser::parser::{ + AggregateExpr, BinaryExpr, Call, Expr as PromqlExpr, MatrixSelector, ParenExpr, SubqueryExpr, + UnaryExpr, VectorSelector, +}; use query::parser::PromQuery; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use snafu::{ensure, ResultExt}; +use snafu::{ensure, OptionExt, ResultExt}; use tokio::sync::oneshot::Sender; use tokio::sync::{oneshot, Mutex}; use tower::ServiceBuilder; use tower_http::auth::AsyncRequireAuthorizationLayer; +use tower_http::compression::CompressionLayer; use tower_http::trace::TraceLayer; use crate::auth::UserProviderRef; -use crate::error::{AlreadyStartedSnafu, CollectRecordbatchSnafu, Result, StartHttpSnafu}; +use crate::error::{ + AlreadyStartedSnafu, CollectRecordbatchSnafu, InternalSnafu, Result, StartHttpSnafu, +}; use crate::http::authorize::HttpAuth; use crate::server::Server; @@ -74,15 +85,16 @@ impl PromqlServer { let router = Router::new() .route("/query", routing::post(instant_query).get(instant_query)) - .route("/range_query", routing::post(range_query).get(range_query)) + .route("/query_range", routing::post(range_query).get(range_query)) .with_state(self.query_handler.clone()); Router::new() - .nest(&format!("/{PROMQL_API_VERSION}"), router) + .nest(&format!("/api/{PROMQL_API_VERSION}"), router) // middlewares .layer( ServiceBuilder::new() .layer(TraceLayer::new_for_http()) + .layer(CompressionLayer::new()) // custom layer .layer(AsyncRequireAuthorizationLayer::new( HttpAuth::::new(self.user_provider.clone()), @@ -134,7 +146,7 @@ impl Server for PromqlServer { #[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] pub struct PromqlSeries { metric: HashMap, - values: Vec<(i64, String)>, + values: Vec<(f64, String)>, } #[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] @@ -148,8 +160,12 @@ pub struct PromqlData { pub struct PromqlJsonResponse { status: String, data: PromqlData, + #[serde(skip_serializing_if = "Option::is_none")] error: Option, + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(rename = "errorType")] error_type: Option, + #[serde(skip_serializing_if = "Option::is_none")] warnings: Option>, } @@ -179,17 +195,17 @@ impl PromqlJsonResponse { } /// Convert from `Result` - pub async fn from_query_result(result: Result) -> Json { + pub async fn from_query_result(result: Result, metric_name: String) -> Json { let response: Result> = try { let json = match result? { Output::RecordBatches(batches) => { - Self::success(Self::record_batches_to_data(batches)?) + Self::success(Self::record_batches_to_data(batches, metric_name)?) 
} Output::Stream(stream) => { let record_batches = RecordBatches::try_collect(stream) .await .context(CollectRecordbatchSnafu)?; - Self::success(Self::record_batches_to_data(record_batches)?) + Self::success(Self::record_batches_to_data(record_batches, metric_name)?) } Output::AffectedRows(_) => Self::error( "unexpected result", @@ -203,16 +219,102 @@ impl PromqlJsonResponse { response.unwrap_or_else(|err| Self::error(err.status_code().to_string(), err.to_string())) } - /// TODO(ruihang): implement this conversion method - fn record_batches_to_data(_: RecordBatches) -> Result { + fn record_batches_to_data(batches: RecordBatches, metric_name: String) -> Result { + // infer semantic type of each column from schema. + // TODO(ruihang): wish there is a better way to do this. + let mut timestamp_column_index = None; + let mut tag_column_indices = Vec::new(); + let mut first_value_column_index = None; + + for (i, column) in batches.schema().column_schemas().iter().enumerate() { + match column.data_type { + ConcreteDataType::Timestamp(datatypes::types::TimestampType::Millisecond(_)) => { + if timestamp_column_index.is_none() { + timestamp_column_index = Some(i); + } + } + ConcreteDataType::Float64(_) => { + if first_value_column_index.is_none() { + first_value_column_index = Some(i); + } + } + ConcreteDataType::String(_) => { + tag_column_indices.push(i); + } + _ => {} + } + } + + let timestamp_column_index = timestamp_column_index.context(InternalSnafu { + err_msg: "no timestamp column found".to_string(), + })?; + let first_value_column_index = first_value_column_index.context(InternalSnafu { + err_msg: "no value column found".to_string(), + })?; + + let metric_name = (METRIC_NAME.to_string(), metric_name); + let mut buffer = HashMap::, Vec<(f64, String)>>::new(); + + for batch in batches.iter() { + // prepare things... 
+ let tag_columns = tag_column_indices + .iter() + .map(|i| { + batch + .column(*i) + .as_any() + .downcast_ref::() + .unwrap() + }) + .collect::>(); + let tag_names = tag_column_indices + .iter() + .map(|c| batches.schema().column_name_by_index(*c).to_string()) + .collect::>(); + let timestamp_column = batch + .column(timestamp_column_index) + .as_any() + .downcast_ref::() + .unwrap(); + let value_column = batch + .column(first_value_column_index) + .as_any() + .downcast_ref::() + .unwrap(); + + // assemble rows + for row_index in 0..batch.num_rows() { + // retrieve tags + // TODO(ruihang): push table name `__metric__` + let mut tags = vec![metric_name.clone()]; + for (tag_column, tag_name) in tag_columns.iter().zip(tag_names.iter()) { + let tag_value = tag_column.get_data(row_index).unwrap().to_string(); + tags.push((tag_name.to_string(), tag_value)); + } + + // retrieve timestamp + let timestamp_millis: i64 = timestamp_column.get_data(row_index).unwrap().into(); + let timestamp = timestamp_millis as f64 / 1000.0; + + // retrieve value + let value = + Into::::into(value_column.get_data(row_index).unwrap()).to_string(); + + buffer.entry(tags).or_default().push((timestamp, value)); + } + } + + let result = buffer + .into_iter() + .map(|(tags, values)| PromqlSeries { + metric: tags.into_iter().collect(), + values, + }) + .collect(); + let data = PromqlData { result_type: "matrix".to_string(), - result: vec![PromqlSeries { - metric: vec![("__name__".to_string(), "foo".to_string())] - .into_iter() - .collect(), - values: vec![(1, "123.45".to_string())], - }], + result, }; Ok(data) @@ -233,16 +335,16 @@ pub async fn instant_query( ) -> Json { PromqlJsonResponse::error( "not implemented", - "instant query api `/query` is not implemented. Use `/range_query` instead.", + "instant query api `/query` is not implemented. Use `/query_range` instead.", ) } #[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] pub struct RangeQuery { - query: String, - start: String, - end: String, - step: String, + query: Option, + start: Option, + end: Option, + step: Option, timeout: Option, } @@ -250,13 +352,47 @@ pub struct RangeQuery { pub async fn range_query( State(handler): State, Query(params): Query, + Form(form_params): Form, ) -> Json { let prom_query = PromQuery { - query: params.query, - start: params.start, - end: params.end, - step: params.step, + query: params.query.or(form_params.query).unwrap_or_default(), + start: params.start.or(form_params.start).unwrap_or_default(), + end: params.end.or(form_params.end).unwrap_or_default(), + step: params.step.or(form_params.step).unwrap_or_default(), }; let result = handler.do_query(&prom_query).await; - PromqlJsonResponse::from_query_result(result).await + let metric_name = retrieve_metric_name(&prom_query.query).unwrap_or_default(); + PromqlJsonResponse::from_query_result(result, metric_name).await +} + +fn retrieve_metric_name(promql: &str) -> Option { + let promql_expr = promql_parser::parser::parse(promql).ok()?; + promql_expr_to_metric_name(promql_expr) +} + +fn promql_expr_to_metric_name(expr: PromqlExpr) -> Option { + match expr { + PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => promql_expr_to_metric_name(*expr), + PromqlExpr::Unary(UnaryExpr { expr }) => promql_expr_to_metric_name(*expr), + PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. 
}) => { + promql_expr_to_metric_name(*lhs).or(promql_expr_to_metric_name(*rhs)) + } + PromqlExpr::Paren(ParenExpr { expr }) => promql_expr_to_metric_name(*expr), + PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => promql_expr_to_metric_name(*expr), + PromqlExpr::NumberLiteral(_) => None, + PromqlExpr::StringLiteral(_) => None, + PromqlExpr::VectorSelector(VectorSelector { matchers, .. }) => { + matchers.find_matchers(METRIC_NAME).pop().cloned() + } + PromqlExpr::MatrixSelector(MatrixSelector { + vector_selector, .. + }) => { + let VectorSelector { matchers, .. } = vector_selector; + matchers.find_matchers(METRIC_NAME).pop().cloned() + } + PromqlExpr::Call(Call { args, .. }) => args + .args + .into_iter() + .find_map(|e| promql_expr_to_metric_name(*e)), + } } diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 64b8e3715b..340b03efcc 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -290,18 +290,19 @@ pub async fn test_promql_http_api(store_type: StorageType) { let client = TestClient::new(app); // instant query - let res = client.get("/v1/query?query=up").send().await; + let res = client.get("/api/v1/query?query=up").send().await; assert_eq!(res.status(), StatusCode::OK); - let res = client.post("/v1/query?query=up").send().await; + let res = client.post("/api/v1/query?query=up").send().await; assert_eq!(res.status(), StatusCode::OK); let res = client - .get("/v1/range_query?query=up&start=1&end=100&step=5") + .get("/api/v1/query_range?query=up&start=1&end=100&step=5") .send() .await; assert_eq!(res.status(), StatusCode::OK); let res = client - .post("/v1/range_query?query=up&start=1&end=100&step=5") + .post("/api/v1/query_range?query=up&start=1&end=100&step=5") + .header("Content-Type", "application/x-www-form-urlencoded") .send() .await; assert_eq!(res.status(), StatusCode::OK);
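
---

Review notes with illustrative sketches (not part of the patch):

The `SeriesNormalize` change above drops rows whose float columns hold NaN before further processing. Below is a minimal, self-contained sketch of that same mask-and-filter approach written against the `arrow` crate directly; the function name and the tiny two-column schema are invented for illustration only, and, as the TODO in that hunk notes, it ignores PromQL's special stale-marker NaN.

```rust
use std::sync::Arc;

use arrow::array::{
    Array, ArrayRef, BooleanArray, Float64Array, TimestampMillisecondArray,
};
use arrow::compute;
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Drop every row whose float columns contain a NaN, mirroring the
/// mask-and-filter logic added to `SeriesNormalizeStream`.
fn filter_nan_rows(batch: &RecordBatch) -> Result<RecordBatch, ArrowError> {
    // Start with every row kept, then clear the flag for rows holding a NaN.
    let mut keep = vec![true; batch.num_rows()];
    for column in batch.columns() {
        if let Some(floats) = column.as_any().downcast_ref::<Float64Array>() {
            for (i, flag) in keep.iter_mut().enumerate() {
                if floats.is_valid(i) && floats.value(i).is_nan() {
                    *flag = false;
                }
            }
        }
    }
    compute::filter_record_batch(batch, &BooleanArray::from(keep))
}

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), false),
        Field::new("value", DataType::Float64, true),
    ]));
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(TimestampMillisecondArray::from(vec![0, 5_000, 10_000])) as ArrayRef,
            Arc::new(Float64Array::from(vec![1.0, f64::NAN, 3.0])) as ArrayRef,
        ],
    )?;
    let filtered = filter_nan_rows(&batch)?;
    assert_eq!(filtered.num_rows(), 2); // the NaN row at t=5s is removed
    Ok(())
}
```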
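The planner hunks widen the table-scan time filter so that the lookback window and any `offset` modifier stay inside the scanned range, which is why the expected plans now read `timestamp >= TimestampMillisecond(-1000, None)`. A hedged sketch of that arithmetic follows; the helper name and signature are invented for illustration, and `offset_ms` stands for the already-negated value the planner derives from the PromQL `offset` modifier.

```rust
type Millisecond = i64;

/// Widen the scan bounds by the lookback window and shift them by the offset,
/// mirroring the filter expressions built in `PromPlanner`.
fn scan_bounds(
    start: Millisecond,
    end: Millisecond,
    lookback_delta: Millisecond,
    offset_ms: Millisecond,
) -> (Millisecond, Millisecond) {
    // The lower bound is pushed back by `lookback_delta` so the first
    // evaluation step can still find a sample inside its lookback window.
    (start - offset_ms - lookback_delta, end - offset_ms)
}

fn main() {
    // The planner tests use start=0, end=100_000_000 ms, lookback_delta=1000 ms
    // and no offset, which yields the `-1000` lower bound seen in the plans.
    assert_eq!(scan_bounds(0, 100_000_000, 1_000, 0), (-1_000, 100_000_000));
}
```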
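The `parser.rs` hunk accepts `step` either as a bare number of seconds or as a PromQL duration string, so range queries like `step=5` and `step=5m` both parse. The sketch below mirrors that fallback chain; `parse_promql_duration` is a deliberately tiny, hypothetical stand-in for `promql_parser::util::parse_duration` and only understands single-unit inputs.

```rust
use std::time::Duration;

/// Try a plain integer number of seconds first, then fall back to duration syntax,
/// mirroring the new `QueryLanguageParser` behavior.
fn parse_step(step: &str) -> Result<Duration, String> {
    step.parse::<u64>()
        .map(Duration::from_secs)
        .or_else(|_| parse_promql_duration(step))
}

/// Illustrative subset of PromQL duration syntax ("30s", "5m", "1h" only);
/// the real code delegates this to `promql_parser`.
fn parse_promql_duration(s: &str) -> Result<Duration, String> {
    let (num, unit) = s.split_at(s.len().saturating_sub(1));
    let n: u64 = num.parse().map_err(|_| format!("invalid duration: {s}"))?;
    match unit {
        "s" => Ok(Duration::from_secs(n)),
        "m" => Ok(Duration::from_secs(n * 60)),
        "h" => Ok(Duration::from_secs(n * 3600)),
        _ => Err(format!("unsupported unit in duration: {s}")),
    }
}

fn main() {
    assert_eq!(parse_step("5").unwrap(), Duration::from_secs(5));
    assert_eq!(parse_step("5m").unwrap(), Duration::from_secs(300));
}
```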