feat: improve Prometheus compliance (#1022)

* initial impl

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* minor (useless) refactor

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* retrieve metric name

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add time index column to group by columns
* filter out NaN in normalize
* remove NULL in instant manipulator
* accept form data as HTTP params
* correct API URL
* accept second literal as step param

* happy clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update test result

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Ruihang Xia authored on 2023-02-20 15:29:43 +08:00, committed by GitHub
parent 6e9964ac97, commit 68b231987c
9 changed files with 332 additions and 195 deletions

Cargo.lock (generated)

@@ -6712,6 +6712,7 @@ dependencies = [
"pgwire",
"pin-project",
"postgres-types",
"promql-parser",
"prost 0.11.6",
"query",
"rand 0.8.5",


@@ -82,16 +82,16 @@ async fn sql_insert_promql_query_ceil() {
UNIX_EPOCH.checked_add(Duration::from_secs(100)).unwrap(),
Duration::from_secs(5),
Duration::from_secs(1),
"+---------------------+-------------------------------+----------------------------------+\
\n| ts | ceil(http_requests_total.cpu) | ceil(http_requests_total.memory) |\
\n+---------------------+-------------------------------+----------------------------------+\
\n| 1970-01-01T00:00:00 | 67 | 1024 |\
\n| 1970-01-01T00:00:05 | 67 | 4096 |\
\n| 1970-01-01T00:00:10 | 100 | 20480 |\
\n| 1970-01-01T00:00:50 | 12424 | 1334 |\
\n| 1970-01-01T00:01:20 | 0 | 2334 |\
\n| 1970-01-01T00:01:40 | 49 | 3334 |\
\n+---------------------+-------------------------------+----------------------------------+",
"+---------------------+-------------------------------+----------------------------------+-------+\
\n| ts | ceil(http_requests_total.cpu) | ceil(http_requests_total.memory) | host |\
\n+---------------------+-------------------------------+----------------------------------+-------+\
\n| 1970-01-01T00:00:00 | 67 | 1024 | host1 |\
\n| 1970-01-01T00:00:05 | 67 | 4096 | host1 |\
\n| 1970-01-01T00:00:10 | 100 | 20480 | host1 |\
\n| 1970-01-01T00:01:20 | 0 | 2334 | host1 |\
\n| 1970-01-01T00:01:40 | 49 | 3334 | host1 |\
\n| 1970-01-01T00:00:50 | 12424 | 1334 | host1 |\
\n+---------------------+-------------------------------+----------------------------------+-------+"
)
.await;
}
@@ -142,13 +142,12 @@ async fn aggregators_simple_sum() {
unix_epoch_plus_100s(),
Duration::from_secs(60),
Duration::from_secs(0),
"+------------+--------------------------+\
\n| group | SUM(http_requests.value) |\
\n+------------+--------------------------+\
\n| | |\
\n| canary | 700 |\
\n| production | 300 |\
\n+------------+--------------------------+",
"+------------+---------------------+--------------------------+\
\n| group | ts | SUM(http_requests.value) |\
\n+------------+---------------------+--------------------------+\
\n| production | 1970-01-01T00:00:00 | 300 |\
\n| canary | 1970-01-01T00:00:00 | 700 |\
\n+------------+---------------------+--------------------------+",
)
.await;
}
@@ -167,13 +166,12 @@ async fn aggregators_simple_avg() {
unix_epoch_plus_100s(),
Duration::from_secs(60),
Duration::from_secs(0),
"+------------+--------------------------+\
\n| group | AVG(http_requests.value) |\
\n+------------+--------------------------+\
\n| | 0 |\
\n| production | 150 |\
\n| canary | 350 |\
\n+------------+--------------------------+",
"+------------+---------------------+--------------------------+\
\n| group | ts | AVG(http_requests.value) |\
\n+------------+---------------------+--------------------------+\
\n| production | 1970-01-01T00:00:00 | 150 |\
\n| canary | 1970-01-01T00:00:00 | 350 |\
\n+------------+---------------------+--------------------------+",
)
.await;
}
@@ -192,13 +190,12 @@ async fn aggregators_simple_count() {
unix_epoch_plus_100s(),
Duration::from_secs(60),
Duration::from_secs(0),
"+------------+----------------------------+\
\n| group | COUNT(http_requests.value) |\
\n+------------+----------------------------+\
\n| | 0 |\
\n| canary | 2 |\
\n| production | 2 |\
\n+------------+----------------------------+",
"+------------+---------------------+----------------------------+\
\n| group | ts | COUNT(http_requests.value) |\
\n+------------+---------------------+----------------------------+\
\n| canary | 1970-01-01T00:00:00 | 2 |\
\n| production | 1970-01-01T00:00:00 | 2 |\
\n+------------+---------------------+----------------------------+",
)
.await;
}
@@ -217,13 +214,12 @@ async fn aggregators_simple_without() {
unix_epoch_plus_100s(),
Duration::from_secs(60),
Duration::from_secs(0),
"+------------+------------+--------------------------+\
\n| group | job | SUM(http_requests.value) |\
\n+------------+------------+--------------------------+\
\n| | | |\
\n| canary | api-server | 700 |\
\n| production | api-server | 300 |\
\n+------------+------------+--------------------------+",
"+------------+------------+---------------------+--------------------------+\
\n| group | job | ts | SUM(http_requests.value) |\
\n+------------+------------+---------------------+--------------------------+\
\n| production | api-server | 1970-01-01T00:00:00 | 300 |\
\n| canary | api-server | 1970-01-01T00:00:00 | 700 |\
\n+------------+------------+---------------------+--------------------------+",
)
.await;
}
@@ -241,11 +237,11 @@ async fn aggregators_empty_by() {
unix_epoch_plus_100s(),
Duration::from_secs(60),
Duration::from_secs(0),
"+--------------------------+\
\n| SUM(http_requests.value) |\
\n+--------------------------+\
\n| 1000 |\
\n+--------------------------+",
"+---------------------+--------------------------+\
\n| ts | SUM(http_requests.value) |\
\n+---------------------+--------------------------+\
\n| 1970-01-01T00:00:00 | 1000 |\
\n+---------------------+--------------------------+",
)
.await;
}
@@ -263,11 +259,11 @@ async fn aggregators_no_by_without() {
unix_epoch_plus_100s(),
Duration::from_secs(60),
Duration::from_secs(0),
"+--------------------------+\
\n| SUM(http_requests.value) |\
\n+--------------------------+\
\n| 1000 |\
\n+--------------------------+",
"+---------------------+--------------------------+\
\n| ts | SUM(http_requests.value) |\
\n+---------------------+--------------------------+\
\n| 1970-01-01T00:00:00 | 1000 |\
\n+---------------------+--------------------------+",
)
.await;
}
@@ -286,13 +282,12 @@ async fn aggregators_empty_without() {
unix_epoch_plus_100s(),
Duration::from_secs(60),
Duration::from_secs(0),
"+------------+----------+------------+--------------------------+\
\n| group | instance | job | SUM(http_requests.value) |\
\n+------------+----------+------------+--------------------------+\
\n| | | | |\
\n| production | 0 | api-server | 100 |\
\n| production | 1 | api-server | 200 |\
\n+------------+----------+------------+--------------------------+",
"+------------+----------+------------+---------------------+--------------------------+\
\n| group | instance | job | ts | SUM(http_requests.value) |\
\n+------------+----------+------------+---------------------+--------------------------+\
\n| production | 0 | api-server | 1970-01-01T00:00:00 | 100 |\
\n| production | 1 | api-server | 1970-01-01T00:00:00 | 200 |\
\n+------------+----------+------------+---------------------+--------------------------+",
)
.await;
}
@@ -356,11 +351,12 @@ async fn stddev_by_label() {
unix_epoch_plus_100s(),
Duration::from_secs(60),
Duration::from_secs(0),
"+----------+-----------------------------+\
\n| instance | STDDEV(http_requests.value) |\
\n+----------+-----------------------------+\
\n| 0 | 258.19888974716116 |\
\n+----------+-----------------------------+",
"+----------+---------------------+-----------------------------+\
\n| instance | ts | STDDEV(http_requests.value) |\
\n+----------+---------------------+-----------------------------+\
\n| 0 | 1970-01-01T00:00:00 | 258.19888974716116 |\
\n| 1 | 1970-01-01T00:00:00 | 258.19888974716116 |\
\n+----------+---------------------+-----------------------------+",
)
.await;
}


@@ -41,8 +41,7 @@ use crate::extension_plan::Millisecond;
///
/// This plan will try to align the input time series, for every timestamp between
/// `start` and `end` with step `interval`. Find in the `lookback` range if data
/// is missing at the given timestamp. If data is absent in some timestamp, all columns
/// except the time index will left blank.
/// is missing at the given timestamp.
#[derive(Debug)]
pub struct InstantManipulate {
start: Millisecond,
@@ -341,6 +340,17 @@ impl InstantManipulateStream {
take_indices: Vec<Option<u64>>,
aligned_ts: Vec<Millisecond>,
) -> ArrowResult<RecordBatch> {
let aligned_ts = aligned_ts
.into_iter()
.zip(take_indices.iter())
.filter_map(|(ts, i)| i.map(|_| ts))
.collect::<Vec<_>>();
let take_indices = take_indices
.iter()
.filter(|i| i.is_some())
.copied()
.collect::<Vec<_>>();
let indices_array = UInt64Array::from(take_indices);
let mut arrays = record_batch
.columns()
@@ -349,7 +359,8 @@ impl InstantManipulateStream {
.collect::<ArrowResult<Vec<_>>>()?;
arrays[self.time_index] = Arc::new(TimestampMillisecondArray::from(aligned_ts));
RecordBatch::try_new(record_batch.schema(), arrays)
let result = RecordBatch::try_new(record_batch.schema(), arrays)?;
Ok(result)
}
}
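
The pruning above replaces the old NULL-padding behavior: timestamps whose lookback search found no sample are now dropped together with their take indices. A minimal standalone sketch of that pairing logic, written against plain Vecs instead of the arrow arrays used by the real stream (the sample values are made up):

#[test]
fn prune_unmatched_rows_sketch() {
    // Three aligned timestamps; the middle one found no sample in the lookback range.
    let take_indices: Vec<Option<u64>> = vec![Some(0), None, Some(1)];
    let aligned_ts: Vec<i64> = vec![0, 5_000, 10_000];

    // Keep a timestamp only when its take index is Some, and keep the indices in step.
    let kept_ts: Vec<i64> = aligned_ts
        .iter()
        .zip(take_indices.iter())
        .filter_map(|(ts, idx)| idx.map(|_| *ts))
        .collect();
    let kept_indices: Vec<u64> = take_indices.iter().filter_map(|idx| *idx).collect();

    assert_eq!(kept_ts, vec![0, 10_000]);
    assert_eq!(kept_indices, vec![0, 1]);
}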
@@ -430,11 +441,8 @@ mod test {
\n| 1970-01-01T00:01:00 | 1 | foo |\
\n| 1970-01-01T00:01:30 | 1 | foo |\
\n| 1970-01-01T00:02:00 | 1 | foo |\
\n| 1970-01-01T00:02:30 | | |\
\n| 1970-01-01T00:03:00 | 1 | foo |\
\n| 1970-01-01T00:03:30 | | |\
\n| 1970-01-01T00:04:00 | 1 | foo |\
\n| 1970-01-01T00:04:30 | | |\
\n| 1970-01-01T00:05:00 | 1 | foo |\
\n+---------------------+-------+------+",
);
@@ -449,34 +457,19 @@ mod test {
\n+---------------------+-------+------+\
\n| 1970-01-01T00:00:00 | 1 | foo |\
\n| 1970-01-01T00:00:10 | 1 | foo |\
\n| 1970-01-01T00:00:20 | | |\
\n| 1970-01-01T00:00:30 | 1 | foo |\
\n| 1970-01-01T00:00:40 | 1 | foo |\
\n| 1970-01-01T00:00:50 | | |\
\n| 1970-01-01T00:01:00 | 1 | foo |\
\n| 1970-01-01T00:01:10 | 1 | foo |\
\n| 1970-01-01T00:01:20 | | |\
\n| 1970-01-01T00:01:30 | 1 | foo |\
\n| 1970-01-01T00:01:40 | 1 | foo |\
\n| 1970-01-01T00:01:50 | | |\
\n| 1970-01-01T00:02:00 | 1 | foo |\
\n| 1970-01-01T00:02:10 | 1 | foo |\
\n| 1970-01-01T00:02:20 | | |\
\n| 1970-01-01T00:02:30 | | |\
\n| 1970-01-01T00:02:40 | | |\
\n| 1970-01-01T00:02:50 | | |\
\n| 1970-01-01T00:03:00 | 1 | foo |\
\n| 1970-01-01T00:03:10 | 1 | foo |\
\n| 1970-01-01T00:03:20 | | |\
\n| 1970-01-01T00:03:30 | | |\
\n| 1970-01-01T00:03:40 | | |\
\n| 1970-01-01T00:03:50 | | |\
\n| 1970-01-01T00:04:00 | 1 | foo |\
\n| 1970-01-01T00:04:10 | 1 | foo |\
\n| 1970-01-01T00:04:20 | | |\
\n| 1970-01-01T00:04:30 | | |\
\n| 1970-01-01T00:04:40 | 1 | foo |\
\n| 1970-01-01T00:04:50 | | |\
\n| 1970-01-01T00:05:00 | 1 | foo |\
\n+---------------------+-------+------+",
);
@@ -527,14 +520,10 @@ mod test {
\n| 1970-01-01T00:02:10 | 1 | foo |\
\n| 1970-01-01T00:02:20 | 1 | foo |\
\n| 1970-01-01T00:02:30 | 1 | foo |\
\n| 1970-01-01T00:02:40 | | |\
\n| 1970-01-01T00:02:50 | | |\
\n| 1970-01-01T00:03:00 | 1 | foo |\
\n| 1970-01-01T00:03:10 | 1 | foo |\
\n| 1970-01-01T00:03:20 | 1 | foo |\
\n| 1970-01-01T00:03:30 | 1 | foo |\
\n| 1970-01-01T00:03:40 | | |\
\n| 1970-01-01T00:03:50 | | |\
\n| 1970-01-01T00:04:00 | 1 | foo |\
\n| 1970-01-01T00:04:10 | 1 | foo |\
\n| 1970-01-01T00:04:20 | 1 | foo |\
@@ -617,22 +606,8 @@ mod test {
"+---------------------+-------+------+\
\n| timestamp | value | path |\
\n+---------------------+-------+------+\
\n| 1970-01-01T00:03:50 | | |\
\n| 1970-01-01T00:03:51 | | |\
\n| 1970-01-01T00:03:52 | | |\
\n| 1970-01-01T00:03:53 | | |\
\n| 1970-01-01T00:03:54 | | |\
\n| 1970-01-01T00:03:55 | | |\
\n| 1970-01-01T00:03:56 | | |\
\n| 1970-01-01T00:03:57 | | |\
\n| 1970-01-01T00:03:58 | | |\
\n| 1970-01-01T00:03:59 | | |\
\n| 1970-01-01T00:04:00 | 1 | foo |\
\n| 1970-01-01T00:04:01 | 1 | foo |\
\n| 1970-01-01T00:04:02 | | |\
\n| 1970-01-01T00:04:03 | | |\
\n| 1970-01-01T00:04:04 | | |\
\n| 1970-01-01T00:04:05 | | |\
\n+---------------------+-------+------+",
);
do_normalize_test(230_000, 245_000, 0, 1_000, expected).await;
@@ -646,7 +621,6 @@ mod test {
\n+---------------------+-------+------+\
\n| 1970-01-01T00:00:00 | 1 | foo |\
\n| 1970-01-01T00:00:10 | 1 | foo |\
\n| 1970-01-01T00:00:20 | | |\
\n| 1970-01-01T00:00:30 | 1 | foo |\
\n+---------------------+-------+------+",
);
@@ -659,37 +633,12 @@ mod test {
"+---------------------+-------+------+\
\n| timestamp | value | path |\
\n+---------------------+-------+------+\
\n| 1969-12-31T23:45:00 | | |\
\n| 1969-12-31T23:46:00 | | |\
\n| 1969-12-31T23:47:00 | | |\
\n| 1969-12-31T23:48:00 | | |\
\n| 1969-12-31T23:49:00 | | |\
\n| 1969-12-31T23:50:00 | | |\
\n| 1969-12-31T23:51:00 | | |\
\n| 1969-12-31T23:52:00 | | |\
\n| 1969-12-31T23:53:00 | | |\
\n| 1969-12-31T23:54:00 | | |\
\n| 1969-12-31T23:55:00 | | |\
\n| 1969-12-31T23:56:00 | | |\
\n| 1969-12-31T23:57:00 | | |\
\n| 1969-12-31T23:58:00 | | |\
\n| 1969-12-31T23:59:00 | | |\
\n| 1970-01-01T00:00:00 | 1 | foo |\
\n| 1970-01-01T00:01:00 | 1 | foo |\
\n| 1970-01-01T00:02:00 | 1 | foo |\
\n| 1970-01-01T00:03:00 | 1 | foo |\
\n| 1970-01-01T00:04:00 | 1 | foo |\
\n| 1970-01-01T00:05:00 | 1 | foo |\
\n| 1970-01-01T00:06:00 | | |\
\n| 1970-01-01T00:07:00 | | |\
\n| 1970-01-01T00:08:00 | | |\
\n| 1970-01-01T00:09:00 | | |\
\n| 1970-01-01T00:10:00 | | |\
\n| 1970-01-01T00:11:00 | | |\
\n| 1970-01-01T00:12:00 | | |\
\n| 1970-01-01T00:13:00 | | |\
\n| 1970-01-01T00:14:00 | | |\
\n| 1970-01-01T00:15:00 | | |\
\n+---------------------+-------+------+",
);
do_normalize_test(-900_000, 900_000, 30_000, 60_000, expected).await;
@@ -704,8 +653,6 @@ mod test {
\n| 1970-01-01T00:03:10 | 1 | foo |\
\n| 1970-01-01T00:03:20 | 1 | foo |\
\n| 1970-01-01T00:03:30 | 1 | foo |\
\n| 1970-01-01T00:03:40 | | |\
\n| 1970-01-01T00:03:50 | | |\
\n| 1970-01-01T00:04:00 | 1 | foo |\
\n| 1970-01-01T00:04:10 | 1 | foo |\
\n| 1970-01-01T00:04:20 | 1 | foo |\


@@ -17,6 +17,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use datafusion::arrow::array::{BooleanArray, Float64Array};
use datafusion::arrow::compute;
use datafusion::common::{DFSchemaRef, Result as DataFusionResult, Statistics};
use datafusion::execution::context::TaskContext;
@@ -40,6 +41,7 @@ use crate::extension_plan::Millisecond;
/// Roughly speaking, this method does these things:
/// - bias sample's timestamp by offset
/// - sort the record batch based on timestamp column
/// - remove NaN values
#[derive(Debug)]
pub struct SeriesNormalize {
offset: Millisecond,
@@ -237,7 +239,23 @@ impl SeriesNormalizeStream {
.iter()
.map(|array| compute::take(array, &ordered_indices, None))
.collect::<ArrowResult<Vec<_>>>()?;
RecordBatch::try_new(input.schema(), ordered_columns)
let ordered_batch = RecordBatch::try_new(input.schema(), ordered_columns)?;
// TODO(ruihang): consider the "special NaN"
// filter out NaN
let mut filter = vec![true; input.num_rows()];
for column in ordered_batch.columns() {
if let Some(float_column) = column.as_any().downcast_ref::<Float64Array>() {
for (i, flag) in filter.iter_mut().enumerate() {
if float_column.value(i).is_nan() {
*flag = false;
}
}
}
}
let result = compute::filter_record_batch(&ordered_batch, &BooleanArray::from(filter))?;
Ok(result)
}
}
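
The new NaN filter can be exercised in isolation. Below is a minimal sketch against the arrow crate directly (the repository reaches these types through datafusion's re-exports, so the crate paths and versions here are assumptions); it shows rows whose Float64 value is NaN being removed:

use std::sync::Arc;

use arrow::array::{ArrayRef, BooleanArray, Float64Array};
use arrow::compute::filter_record_batch;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn drop_nan_rows(batch: &RecordBatch) -> arrow::error::Result<RecordBatch> {
    // Mark a row for removal if any Float64 column holds NaN at that row.
    let mut keep = vec![true; batch.num_rows()];
    for column in batch.columns() {
        if let Some(floats) = column.as_any().downcast_ref::<Float64Array>() {
            for (i, flag) in keep.iter_mut().enumerate() {
                if floats.value(i).is_nan() {
                    *flag = false;
                }
            }
        }
    }
    filter_record_batch(batch, &BooleanArray::from(keep))
}

fn main() -> arrow::error::Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("value", DataType::Float64, true)]));
    let values: Vec<ArrayRef> = vec![Arc::new(Float64Array::from(vec![1.0, f64::NAN, 3.0]))];
    let batch = RecordBatch::try_new(schema, values)?;
    assert_eq!(drop_nan_rows(&batch)?.num_rows(), 2); // the NaN row is gone
    Ok(())
}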


@@ -103,9 +103,12 @@ impl<S: ContextProvider> PromPlanner<S> {
let input = self.prom_expr_to_plan(*expr.clone())?;
// calculate columns to group by
let group_exprs = modifier.as_ref().map_or(Ok(Vec::new()), |m| {
self.agg_modifier_to_col(input.schema(), m)
})?;
// Need to append time index column into group by columns
let group_exprs = modifier
.as_ref()
.map_or(Ok(vec![self.create_time_index_column_expr()?]), |m| {
self.agg_modifier_to_col(input.schema(), m)
})?;
// convert op and value columns to aggregate exprs
let aggr_exprs = self.create_aggregate_exprs(*op, &input)?;
@@ -114,9 +117,15 @@ impl<S: ContextProvider> PromPlanner<S> {
self.ctx.time_index_column = None;
// create plan
let group_sort_expr = group_exprs
.clone()
.into_iter()
.map(|expr| expr.sort(true, false));
LogicalPlanBuilder::from(input)
.aggregate(group_exprs, aggr_exprs)
.context(DataFusionPlanningSnafu)?
.sort(group_sort_expr)
.context(DataFusionPlanningSnafu)?
.build()
.context(DataFusionPlanningSnafu)?
}
@@ -267,6 +276,7 @@ impl<S: ContextProvider> PromPlanner<S> {
})?)?;
let mut func_exprs = self.create_function_expr(func, args.literals)?;
func_exprs.insert(0, self.create_time_index_column_expr()?);
func_exprs.extend_from_slice(&self.create_tag_column_exprs()?);
LogicalPlanBuilder::from(input)
.project(func_exprs)
.context(DataFusionPlanningSnafu)?
@@ -302,12 +312,20 @@ impl<S: ContextProvider> PromPlanner<S> {
let table_name = self.ctx.table_name.clone().unwrap();
// make filter exprs
let offset_duration = -match offset {
Some(Offset::Pos(duration)) => duration.as_millis() as Millisecond,
Some(Offset::Neg(duration)) => -(duration.as_millis() as Millisecond),
None => 0,
};
let mut filters = self.matchers_to_expr(label_matchers)?;
filters.push(self.create_time_index_column_expr()?.gt_eq(DfExpr::Literal(
ScalarValue::TimestampMillisecond(Some(self.ctx.start), None),
ScalarValue::TimestampMillisecond(
Some(self.ctx.start - offset_duration - self.ctx.lookback_delta),
None,
),
)));
filters.push(self.create_time_index_column_expr()?.lt_eq(DfExpr::Literal(
ScalarValue::TimestampMillisecond(Some(self.ctx.end), None),
ScalarValue::TimestampMillisecond(Some(self.ctx.end - offset_duration), None),
)));
// make table scan with filter exprs
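
To make the lookback/offset handling concrete, here is the bound arithmetic with the values used by the updated plan expectations further down (start = 0 ms, end = 100000000 ms, lookback_delta = 1000 ms, no offset modifier); the lower bound evaluates to -1000 ms, which is why the expected Filter and TableScan lines now show TimestampMillisecond(-1000, None):

let start: i64 = 0;
let end: i64 = 100_000_000;
let lookback_delta: i64 = 1_000;
let offset_duration: i64 = 0; // the query has no `offset` modifier

let lower_bound = start - offset_duration - lookback_delta;
let upper_bound = end - offset_duration;

assert_eq!(lower_bound, -1_000);
assert_eq!(upper_bound, 100_000_000);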
@@ -328,11 +346,6 @@ impl<S: ContextProvider> PromPlanner<S> {
});
// make series_normalize plan
let offset_duration = match offset {
Some(Offset::Pos(duration)) => duration.as_millis() as Millisecond,
Some(Offset::Neg(duration)) => -(duration.as_millis() as Millisecond),
None => 0,
};
let series_normalize = SeriesNormalize::new(
offset_duration,
self.ctx
@@ -344,10 +357,12 @@ impl<S: ContextProvider> PromPlanner<S> {
let logical_plan = LogicalPlan::Extension(Extension {
node: Arc::new(series_normalize),
});
Ok(logical_plan)
}
/// Convert [AggModifier] to [Column] exprs for aggregation.
/// Timestamp column and tag columns will be included.
///
/// # Side effect
///
@@ -380,6 +395,9 @@ impl<S: ContextProvider> PromPlanner<S> {
// change the tag columns in context
self.ctx.tag_columns = labels.iter().cloned().collect();
// add timestamp column
exprs.push(self.create_time_index_column_expr()?);
Ok(exprs)
}
AggModifier::Without(labels) => {
@@ -415,10 +433,14 @@ impl<S: ContextProvider> PromPlanner<S> {
self.ctx.tag_columns = all_fields.iter().map(|col| (*col).clone()).collect();
// collect remaining fields and convert to col expr
let exprs = all_fields
let mut exprs = all_fields
.into_iter()
.map(|c| DfExpr::Column(Column::from(c)))
.collect::<Vec<_>>();
// add timestamp column
exprs.push(self.create_time_index_column_expr()?);
Ok(exprs)
}
}
@@ -587,6 +609,15 @@ impl<S: ContextProvider> PromPlanner<S> {
)))
}
fn create_tag_column_exprs(&self) -> Result<Vec<DfExpr>> {
let mut result = Vec::with_capacity(self.ctx.tag_columns.len());
for tag in &self.ctx.tag_columns {
let expr = DfExpr::Column(Column::from_name(tag));
result.push(expr);
}
Ok(result)
}
fn create_tag_and_time_index_column_sort_exprs(&self) -> Result<Vec<DfExpr>> {
let mut result = self
.ctx
@@ -923,14 +954,14 @@ mod test {
let plan = PromPlanner::stmt_to_plan(eval_stmt, context_provider).unwrap();
let expected = String::from(
"Filter: some_metric.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N]\
\n Projection: some_metric.timestamp, TEMPLATE(some_metric.field_0) [timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N]\
"Filter: some_metric.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, tag_0:Utf8]\
\n Projection: some_metric.timestamp, TEMPLATE(some_metric.field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, tag_0:Utf8]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(0, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: some_metric, unsupported_filters=[tag_0 != Utf8(\"bar\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
\n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: some_metric, unsupported_filters=[tag_0 != Utf8(\"bar\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
).replace("TEMPLATE", plan_name);
assert_eq!(plan.display_indent_schema().to_string(), expected);
@@ -1123,13 +1154,14 @@ mod test {
let context_provider = build_test_context_provider("some_metric".to_string(), 2, 2).await;
let plan = PromPlanner::stmt_to_plan(eval_stmt.clone(), context_provider).unwrap();
let expected_no_without = String::from(
"Aggregate: groupBy=[[some_metric.tag_1]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_1:Utf8, TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
\n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
\n PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
\n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.tag_1 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
\n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(0, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
\n TableScan: some_metric, unsupported_filters=[tag_0 != Utf8(\"bar\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
"Sort: some_metric.tag_1 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
\n Aggregate: groupBy=[[some_metric.tag_1, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
\n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
\n PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
\n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.tag_1 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
\n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
\n TableScan: some_metric, unsupported_filters=[tag_0 != Utf8(\"bar\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
).replace("TEMPLATE", name);
assert_eq!(
plan.display_indent_schema().to_string(),
@@ -1145,13 +1177,14 @@ mod test {
let context_provider = build_test_context_provider("some_metric".to_string(), 2, 2).await;
let plan = PromPlanner::stmt_to_plan(eval_stmt, context_provider).unwrap();
let expected_without = String::from(
"Aggregate: groupBy=[[some_metric.tag_0]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_0:Utf8, TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
\n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
\n PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
\n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.tag_1 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
\n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(0, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
\n TableScan: some_metric, unsupported_filters=[tag_0 != Utf8(\"bar\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
"Sort: some_metric.tag_0 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
\n Aggregate: groupBy=[[some_metric.tag_0, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
\n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
\n PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
\n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.tag_1 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
\n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
\n TableScan: some_metric, unsupported_filters=[tag_0 != Utf8(\"bar\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
).replace("TEMPLATE", name);
assert_eq!(plan.display_indent_schema().to_string(), expected_without);
}
@@ -1271,14 +1304,14 @@ mod test {
\n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Filter: some_metric.tag_0 = Utf8(\"foo\") AND some_metric.timestamp >= TimestampMillisecond(0, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"foo\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Filter: some_metric.tag_0 = Utf8(\"foo\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"foo\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(0, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"bar\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
\n Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"bar\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
);
assert_eq!(plan.display_indent_schema().to_string(), expected);
@@ -1306,8 +1339,8 @@ mod test {
\n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(0, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"bar\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
\n Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"bar\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
);
assert_eq!(plan.display_indent_schema().to_string(), expected);


@@ -88,7 +88,11 @@ impl QueryLanguageParser {
query: &query.query,
})?;
let step = promql_parser::util::parse_duration(&query.step)
let step = query
.step
.parse::<u64>()
.map(Duration::from_secs)
.or_else(|_| promql_parser::util::parse_duration(&query.step))
.map_err(|msg| BoxedError::new(PlainError::new(msg, StatusCode::InvalidArguments)))
.context(QueryParseSnafu {
query: &query.query,
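
The effect of this change is that a bare integer step is read as seconds (as the Prometheus API allows) before falling back to promql-parser's duration syntax. A hedged sketch of that fallback, assuming promql_parser::util::parse_duration returns Result<Duration, String> as the error handling above suggests:

use std::time::Duration;

fn parse_step(step: &str) -> Result<Duration, String> {
    step.parse::<u64>()
        .map(Duration::from_secs)
        .or_else(|_| promql_parser::util::parse_duration(step))
}

// parse_step("30")  -> Ok(30s)   bare second literal
// parse_step("5m")  -> Ok(300s)  Prometheus duration string
// parse_step("abc") -> Err(..)   rejected by both parsers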


@@ -42,6 +42,7 @@ opensrv-mysql = { git = "https://github.com/datafuselabs/opensrv", rev = "b44c9d
pgwire = "0.10"
pin-project = "1.0"
postgres-types = { version = "0.2", features = ["with-chrono-0_4"] }
promql-parser = "0.1.0"
prost.workspace = true
query = { path = "../query" }
rand = "0.8"


@@ -19,24 +19,35 @@ use std::sync::Arc;
use async_trait::async_trait;
use axum::body::BoxBody;
use axum::extract::{Query, State};
use axum::{routing, Json, Router};
use axum::{routing, Form, Json, Router};
use common_error::prelude::ErrorExt;
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::info;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVector;
use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector};
use futures::FutureExt;
use promql_parser::label::METRIC_NAME;
use promql_parser::parser::{
AggregateExpr, BinaryExpr, Call, Expr as PromqlExpr, MatrixSelector, ParenExpr, SubqueryExpr,
UnaryExpr, VectorSelector,
};
use query::parser::PromQuery;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::oneshot::Sender;
use tokio::sync::{oneshot, Mutex};
use tower::ServiceBuilder;
use tower_http::auth::AsyncRequireAuthorizationLayer;
use tower_http::compression::CompressionLayer;
use tower_http::trace::TraceLayer;
use crate::auth::UserProviderRef;
use crate::error::{AlreadyStartedSnafu, CollectRecordbatchSnafu, Result, StartHttpSnafu};
use crate::error::{
AlreadyStartedSnafu, CollectRecordbatchSnafu, InternalSnafu, Result, StartHttpSnafu,
};
use crate::http::authorize::HttpAuth;
use crate::server::Server;
@@ -74,15 +85,16 @@ impl PromqlServer {
let router = Router::new()
.route("/query", routing::post(instant_query).get(instant_query))
.route("/range_query", routing::post(range_query).get(range_query))
.route("/query_range", routing::post(range_query).get(range_query))
.with_state(self.query_handler.clone());
Router::new()
.nest(&format!("/{PROMQL_API_VERSION}"), router)
.nest(&format!("/api/{PROMQL_API_VERSION}"), router)
// middlewares
.layer(
ServiceBuilder::new()
.layer(TraceLayer::new_for_http())
.layer(CompressionLayer::new())
// custom layer
.layer(AsyncRequireAuthorizationLayer::new(
HttpAuth::<BoxBody>::new(self.user_provider.clone()),
@@ -134,7 +146,7 @@ impl Server for PromqlServer {
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
pub struct PromqlSeries {
metric: HashMap<String, String>,
values: Vec<(i64, String)>,
values: Vec<(f64, String)>,
}
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
@@ -148,8 +160,12 @@ pub struct PromqlData {
pub struct PromqlJsonResponse {
status: String,
data: PromqlData,
#[serde(skip_serializing_if = "Option::is_none")]
error: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(rename = "errorType")]
error_type: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
warnings: Option<Vec<String>>,
}
@@ -179,17 +195,17 @@ impl PromqlJsonResponse {
}
/// Convert from `Result<Output>`
pub async fn from_query_result(result: Result<Output>) -> Json<Self> {
pub async fn from_query_result(result: Result<Output>, metric_name: String) -> Json<Self> {
let response: Result<Json<Self>> = try {
let json = match result? {
Output::RecordBatches(batches) => {
Self::success(Self::record_batches_to_data(batches)?)
Self::success(Self::record_batches_to_data(batches, metric_name)?)
}
Output::Stream(stream) => {
let record_batches = RecordBatches::try_collect(stream)
.await
.context(CollectRecordbatchSnafu)?;
Self::success(Self::record_batches_to_data(record_batches)?)
Self::success(Self::record_batches_to_data(record_batches, metric_name)?)
}
Output::AffectedRows(_) => Self::error(
"unexpected result",
@@ -203,16 +219,102 @@ impl PromqlJsonResponse {
response.unwrap_or_else(|err| Self::error(err.status_code().to_string(), err.to_string()))
}
/// TODO(ruihang): implement this conversion method
fn record_batches_to_data(_: RecordBatches) -> Result<PromqlData> {
fn record_batches_to_data(batches: RecordBatches, metric_name: String) -> Result<PromqlData> {
// infer semantic type of each column from schema.
// TODO(ruihang): wish there is a better way to do this.
let mut timestamp_column_index = None;
let mut tag_column_indices = Vec::new();
let mut first_value_column_index = None;
for (i, column) in batches.schema().column_schemas().iter().enumerate() {
match column.data_type {
ConcreteDataType::Timestamp(datatypes::types::TimestampType::Millisecond(_)) => {
if timestamp_column_index.is_none() {
timestamp_column_index = Some(i);
}
}
ConcreteDataType::Float64(_) => {
if first_value_column_index.is_none() {
first_value_column_index = Some(i);
}
}
ConcreteDataType::String(_) => {
tag_column_indices.push(i);
}
_ => {}
}
}
let timestamp_column_index = timestamp_column_index.context(InternalSnafu {
err_msg: "no timestamp column found".to_string(),
})?;
let first_value_column_index = first_value_column_index.context(InternalSnafu {
err_msg: "no value column found".to_string(),
})?;
let metric_name = (METRIC_NAME.to_string(), metric_name);
let mut buffer = HashMap::<Vec<(String, String)>, Vec<(f64, String)>>::new();
for batch in batches.iter() {
// prepare things...
let tag_columns = tag_column_indices
.iter()
.map(|i| {
batch
.column(*i)
.as_any()
.downcast_ref::<StringVector>()
.unwrap()
})
.collect::<Vec<_>>();
let tag_names = tag_column_indices
.iter()
.map(|c| batches.schema().column_name_by_index(*c).to_string())
.collect::<Vec<_>>();
let timestamp_column = batch
.column(timestamp_column_index)
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap();
let value_column = batch
.column(first_value_column_index)
.as_any()
.downcast_ref::<Float64Vector>()
.unwrap();
// assemble rows
for row_index in 0..batch.num_rows() {
// retrieve tags
// TODO(ruihang): push table name `__metric__`
let mut tags = vec![metric_name.clone()];
for (tag_column, tag_name) in tag_columns.iter().zip(tag_names.iter()) {
let tag_value = tag_column.get_data(row_index).unwrap().to_string();
tags.push((tag_name.to_string(), tag_value));
}
// retrieve timestamp
let timestamp_millis: i64 = timestamp_column.get_data(row_index).unwrap().into();
let timestamp = timestamp_millis as f64 / 1000.0;
// retrieve value
let value =
Into::<f64>::into(value_column.get_data(row_index).unwrap()).to_string();
buffer.entry(tags).or_default().push((timestamp, value));
}
}
let result = buffer
.into_iter()
.map(|(tags, values)| PromqlSeries {
metric: tags.into_iter().collect(),
values,
})
.collect();
let data = PromqlData {
result_type: "matrix".to_string(),
result: vec![PromqlSeries {
metric: vec![("__name__".to_string(), "foo".to_string())]
.into_iter()
.collect(),
values: vec![(1, "123.45".to_string())],
}],
result,
};
Ok(data)
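
For reference, this conversion is meant to emit the Prometheus range-query "matrix" payload. A hypothetical example of that shape built with serde_json (metric and tag names are invented, and the exact serde renames of PromqlData are not visible in this diff):

use serde_json::json;

let example = json!({
    "status": "success",
    "data": {
        "resultType": "matrix",
        "result": [{
            "metric": { "__name__": "http_requests_total", "host": "host1" },
            // each point is [timestamp in seconds (f64), value rendered as a string]
            "values": [[0.0, "67"], [5.0, "67"], [10.0, "100"]]
        }]
    }
});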
@@ -233,16 +335,16 @@ pub async fn instant_query(
) -> Json<PromqlJsonResponse> {
PromqlJsonResponse::error(
"not implemented",
"instant query api `/query` is not implemented. Use `/range_query` instead.",
"instant query api `/query` is not implemented. Use `/query_range` instead.",
)
}
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
pub struct RangeQuery {
query: String,
start: String,
end: String,
step: String,
query: Option<String>,
start: Option<String>,
end: Option<String>,
step: Option<String>,
timeout: Option<String>,
}
@@ -250,13 +352,47 @@ pub struct RangeQuery {
pub async fn range_query(
State(handler): State<PromqlHandlerRef>,
Query(params): Query<RangeQuery>,
Form(form_params): Form<RangeQuery>,
) -> Json<PromqlJsonResponse> {
let prom_query = PromQuery {
query: params.query,
start: params.start,
end: params.end,
step: params.step,
query: params.query.or(form_params.query).unwrap_or_default(),
start: params.start.or(form_params.start).unwrap_or_default(),
end: params.end.or(form_params.end).unwrap_or_default(),
step: params.step.or(form_params.step).unwrap_or_default(),
};
let result = handler.do_query(&prom_query).await;
PromqlJsonResponse::from_query_result(result).await
let metric_name = retrieve_metric_name(&prom_query.query).unwrap_or_default();
PromqlJsonResponse::from_query_result(result, metric_name).await
}
fn retrieve_metric_name(promql: &str) -> Option<String> {
let promql_expr = promql_parser::parser::parse(promql).ok()?;
promql_expr_to_metric_name(promql_expr)
}
fn promql_expr_to_metric_name(expr: PromqlExpr) -> Option<String> {
match expr {
PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => promql_expr_to_metric_name(*expr),
PromqlExpr::Unary(UnaryExpr { expr }) => promql_expr_to_metric_name(*expr),
PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
promql_expr_to_metric_name(*lhs).or(promql_expr_to_metric_name(*rhs))
}
PromqlExpr::Paren(ParenExpr { expr }) => promql_expr_to_metric_name(*expr),
PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => promql_expr_to_metric_name(*expr),
PromqlExpr::NumberLiteral(_) => None,
PromqlExpr::StringLiteral(_) => None,
PromqlExpr::VectorSelector(VectorSelector { matchers, .. }) => {
matchers.find_matchers(METRIC_NAME).pop().cloned()
}
PromqlExpr::MatrixSelector(MatrixSelector {
vector_selector, ..
}) => {
let VectorSelector { matchers, .. } = vector_selector;
matchers.find_matchers(METRIC_NAME).pop().cloned()
}
PromqlExpr::Call(Call { args, .. }) => args
.args
.into_iter()
.find_map(|e| promql_expr_to_metric_name(*e)),
}
}
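
A usage sketch for the helper above, with invented queries; it assumes promql-parser surfaces the metric name as a __name__ matcher, which is what find_matchers(METRIC_NAME) relies on:

// The name is recovered from inside nested aggregations and function calls.
assert_eq!(
    retrieve_metric_name("sum by (host) (rate(http_requests_total[5m]))"),
    Some("http_requests_total".to_string())
);
// Pure literal expressions carry no metric name.
assert_eq!(retrieve_metric_name("1 + 2"), None);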


@@ -290,18 +290,19 @@ pub async fn test_promql_http_api(store_type: StorageType) {
let client = TestClient::new(app);
// instant query
let res = client.get("/v1/query?query=up").send().await;
let res = client.get("/api/v1/query?query=up").send().await;
assert_eq!(res.status(), StatusCode::OK);
let res = client.post("/v1/query?query=up").send().await;
let res = client.post("/api/v1/query?query=up").send().await;
assert_eq!(res.status(), StatusCode::OK);
let res = client
.get("/v1/range_query?query=up&start=1&end=100&step=5")
.get("/api/v1/query_range?query=up&start=1&end=100&step=5")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let res = client
.post("/v1/range_query?query=up&start=1&end=100&step=5")
.post("/api/v1/query_range?query=up&start=1&end=100&step=5")
.header("Content-Type", "application/x-www-form-urlencoded")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
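
With the Form extractor and the /api/v1/query_range route in place, clients can send parameters either in the query string or as a form body. A hypothetical client-side sketch using reqwest (not a dependency shown in this diff; host and port are placeholders), posting the same parameters as the requests above as form data:

// Assumed Cargo deps: reqwest and tokio.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let resp = reqwest::Client::new()
        .post("http://127.0.0.1:4000/api/v1/query_range")
        .form(&[("query", "up"), ("start", "1"), ("end", "100"), ("step", "5")])
        .send()
        .await?;
    println!("{}", resp.text().await?);
    Ok(())
}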