fix: add sqlness tests for some promql functions (#1838)

* correct range manipulate exec fmt text

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix partition requirement

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix udf signature

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* finalise

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* ignore unstable ordered result

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add nan value test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-06-27 19:05:26 +08:00
committed by GitHub
parent 99f0479bd2
commit b737a240de
12 changed files with 390 additions and 30 deletions

View File

@@ -326,7 +326,7 @@ impl ExecutionPlan for RangeManipulateExec {
DisplayFormatType::Default => {
write!(
f,
"PromInstantManipulateExec: req range=[{}..{}], interval=[{}], eval range=[{}], time index=[{}]",
"PromRangeManipulateExec: req range=[{}..{}], interval=[{}], eval range=[{}], time index=[{}]",
self.start, self.end, self.interval, self.range, self.time_index_column
)
}

View File

@@ -27,8 +27,8 @@ use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogi
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
Statistics,
DisplayFormatType, Distribution, ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream, Statistics,
};
use datatypes::arrow::compute;
use futures::{ready, Stream, StreamExt};
@@ -129,9 +129,15 @@ impl ExecutionPlan for SeriesDivideExec {
}
fn output_partitioning(&self) -> Partitioning {
self.input.output_partitioning()
Partitioning::UnknownPartitioning(1)
}
fn required_input_distribution(&self) -> Vec<Distribution> {
vec![Distribution::SinglePartition]
}
// TODO(ruihang): specify required input ordering
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
self.input.output_ordering()
}
@@ -229,7 +235,7 @@ impl Stream for SeriesDivideStream {
loop {
if let Some(batch) = self.buffer.clone() {
let same_length = self.find_first_diff_row(&batch) + 1;
if same_length == batch.num_rows() {
if same_length >= batch.num_rows() {
let next_batch = match ready!(self.as_mut().fetch_next_batch(cx)) {
Some(Ok(batch)) => batch,
None => {
@@ -277,6 +283,11 @@ impl SeriesDivideStream {
}
fn find_first_diff_row(&self, batch: &RecordBatch) -> usize {
// fast path: no tag columns means all data belongs to the same series.
if self.tag_indices.is_empty() {
return batch.num_rows();
}
let num_rows = batch.num_rows();
let mut result = num_rows;

View File

@@ -65,8 +65,7 @@ impl QuantileOverTime {
fn calc(&self, input: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
// construct matrix from input.
// The third one is quantile param, which is included in fields.
assert_eq!(input.len(), 3);
assert_eq!(input.len(), 2);
let ts_array = extract_array(&input[0])?;
let value_array = extract_array(&input[1])?;

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{BTreeSet, HashSet};
use std::collections::{BTreeSet, HashSet, VecDeque};
use std::str::FromStr;
use std::sync::Arc;
use std::time::UNIX_EPOCH;
@@ -833,9 +833,10 @@ impl PromPlanner {
fn create_function_expr(
&mut self,
func: &Function,
mut other_input_exprs: Vec<DfExpr>,
other_input_exprs: Vec<DfExpr>,
) -> Result<Vec<DfExpr>> {
// TODO(ruihang): check function args list
let mut other_input_exprs: VecDeque<DfExpr> = other_input_exprs.into();
// TODO(ruihang): set this according to in-param list
let field_column_pos = 0;
@@ -865,8 +866,8 @@ impl PromPlanner {
"stddev_over_time" => ScalarFunc::Udf(StddevOverTime::scalar_udf()),
"stdvar_over_time" => ScalarFunc::Udf(StdvarOverTime::scalar_udf()),
"quantile_over_time" => {
let quantile_expr = match other_input_exprs.get(0) {
Some(DfExpr::Literal(ScalarValue::Float64(Some(quantile)))) => *quantile,
let quantile_expr = match other_input_exprs.pop_front() {
Some(DfExpr::Literal(ScalarValue::Float64(Some(quantile)))) => quantile,
other => UnexpectedPlanExprSnafu {
desc: format!("expect f64 literal as quantile, but found {:?}", other),
}
@@ -875,8 +876,8 @@ impl PromPlanner {
ScalarFunc::Udf(QuantileOverTime::scalar_udf(quantile_expr))
}
"predict_linear" => {
let t_expr = match other_input_exprs.get(0) {
Some(DfExpr::Literal(ScalarValue::Time64Microsecond(Some(t)))) => *t,
let t_expr = match other_input_exprs.pop_front() {
Some(DfExpr::Literal(ScalarValue::Time64Microsecond(Some(t)))) => t,
other => UnexpectedPlanExprSnafu {
desc: format!("expect i64 literal as t, but found {:?}", other),
}
@@ -885,8 +886,8 @@ impl PromPlanner {
ScalarFunc::Udf(PredictLinear::scalar_udf(t_expr))
}
"holt_winters" => {
let sf_exp = match other_input_exprs.get(0) {
Some(DfExpr::Literal(ScalarValue::Float64(Some(sf)))) => *sf,
let sf_exp = match other_input_exprs.pop_front() {
Some(DfExpr::Literal(ScalarValue::Float64(Some(sf)))) => sf,
other => UnexpectedPlanExprSnafu {
desc: format!(
"expect f64 literal as smoothing factor, but found {:?}",
@@ -895,8 +896,8 @@ impl PromPlanner {
}
.fail()?,
};
let tf_exp = match other_input_exprs.get(1) {
Some(DfExpr::Literal(ScalarValue::Float64(Some(tf)))) => *tf,
let tf_exp = match other_input_exprs.pop_front() {
Some(DfExpr::Literal(ScalarValue::Float64(Some(tf)))) => tf,
other => UnexpectedPlanExprSnafu {
desc: format!("expect f64 literal as trend factor, but found {:?}", other),
}
@@ -924,7 +925,7 @@ impl PromPlanner {
other_input_exprs.insert(field_column_pos, col_expr);
let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
fun,
args: other_input_exprs.clone(),
args: other_input_exprs.clone().into(),
});
exprs.push(fn_expr);
let _ = other_input_exprs.remove(field_column_pos);
@@ -939,7 +940,7 @@ impl PromPlanner {
other_input_exprs.insert(field_column_pos + 1, col_expr);
let fn_expr = DfExpr::ScalarUDF(ScalarUDF {
fun: Arc::new(fun),
args: other_input_exprs.clone(),
args: other_input_exprs.clone().into(),
});
exprs.push(fn_expr);
let _ = other_input_exprs.remove(field_column_pos + 1);
@@ -957,7 +958,7 @@ impl PromPlanner {
.insert(field_column_pos + 2, self.create_time_index_column_expr()?);
let fn_expr = DfExpr::ScalarUDF(ScalarUDF {
fun: Arc::new(fun),
args: other_input_exprs.clone(),
args: other_input_exprs.clone().into(),
});
exprs.push(fn_expr);
let _ = other_input_exprs.remove(field_column_pos + 2);

View File

@@ -19,9 +19,9 @@ TQL ANALYZE (0, 10, '5s') test;
| Plan with Metrics | CoalescePartitionsExec, REDACTED
|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j], REDACTED
|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false], REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_PromSeriesDivideExec: tags=["k"], REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_MergeScanExec: peers=[REDACTED
|_|_|
+-+-+

View File

@@ -21,8 +21,8 @@ TQL EXPLAIN (0, 10, '5s') test;
| | TableScan: test projection=[i, j, k], partial_filters=[j >= TimestampMillisecond(-300000, None), j <= TimestampMillisecond(300000, None)] |
| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j] |
| | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] |
| | PromSeriesDivideExec: tags=["k"] |
| | RepartitionExec: partitioning=REDACTED
| | RepartitionExec: partitioning=REDACTED
| | PromSeriesDivideExec: tags=["k"] |
| | MergeScanExec: peers=[REDACTED
| | |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+
@@ -58,8 +58,8 @@ TQL EXPLAIN host_load1{__field__="val"};
| | TableScan: host_load1 projection=[ts, collector, host, val], partial_filters=[ts >= TimestampMillisecond(-300000, None), ts <= TimestampMillisecond(300000, None)] |
| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[ts] |
| | PromSeriesNormalizeExec: offset=[0], time index=[ts], filter NaN: [false] |
| | PromSeriesDivideExec: tags=["collector", "host"] |
| | RepartitionExec: partitioning=REDACTED
| | RepartitionExec: partitioning=REDACTED
| | PromSeriesDivideExec: tags=["collector", "host"] |
| | ProjectionExec: expr=[val@3 as val, collector@1 as collector, host@2 as host, ts@0 as ts] |
| | MergeScanExec: peers=[REDACTED
| | |

View File

@@ -0,0 +1,31 @@
create table data (ts timestamp(3) time index, val double);
Affected Rows: 0
insert into data values
(0, 'infinity'::double),
(1, '-infinity'::double),
(2, 'nan'::double),
(3, 'NaN'::double);
Affected Rows: 4
select * from data;
+-------------------------+------+
| ts | val |
+-------------------------+------+
| 1970-01-01T00:00:00 | inf |
| 1970-01-01T00:00:00.001 | -inf |
| 1970-01-01T00:00:00.002 | NaN |
| 1970-01-01T00:00:00.003 | NaN |
+-------------------------+------+
insert into data values (4, 'infinityyyy'::double);
Error: 3001(EngineExecuteQuery), Cast error: Cannot cast string 'infinityyyy' to value of Float64 type
drop table data;
Affected Rows: 1

View File

@@ -0,0 +1,13 @@
create table data (ts timestamp(3) time index, val double);
insert into data values
(0, 'infinity'::double),
(1, '-infinity'::double),
(2, 'nan'::double),
(3, 'NaN'::double);
select * from data;
insert into data values (4, 'infinityyyy'::double);
drop table data;

View File

@@ -0,0 +1,132 @@
create table metric (ts timestamp(3) time index, val double);
Affected Rows: 0
insert into metric values
(0,0),
(10000,8),
(20000,8),
(30000,2),
(40000,3);
Affected Rows: 5
select * from metric;
+---------------------+-----+
| ts | val |
+---------------------+-----+
| 1970-01-01T00:00:00 | 0.0 |
| 1970-01-01T00:00:10 | 8.0 |
| 1970-01-01T00:00:20 | 8.0 |
| 1970-01-01T00:00:30 | 2.0 |
| 1970-01-01T00:00:40 | 3.0 |
+---------------------+-----+
tql eval (60, 61, '10s') stdvar_over_time(metric[1m]);
+---------------------+-------------------------------------+
| ts | prom_stdvar_over_time(ts_range,val) |
+---------------------+-------------------------------------+
| 1970-01-01T00:01:00 | 10.559999999999999 |
+---------------------+-------------------------------------+
tql eval (60, 60, '1s') stddev_over_time(metric[1m]);
+---------------------+-------------------------------------+
| ts | prom_stddev_over_time(ts_range,val) |
+---------------------+-------------------------------------+
| 1970-01-01T00:01:00 | 3.249615361854384 |
+---------------------+-------------------------------------+
tql eval (60, 60, '1s') stddev_over_time((metric[1m]));
+---------------------+-------------------------------------+
| ts | prom_stddev_over_time(ts_range,val) |
+---------------------+-------------------------------------+
| 1970-01-01T00:01:00 | 3.249615361854384 |
+---------------------+-------------------------------------+
drop table metric;
Affected Rows: 1
create table metric (ts timestamp(3) time index, val double);
Affected Rows: 0
insert into metric values
(0,0),
(10000,1.5990505637277868),
(20000,1.5990505637277868),
(30000,1.5990505637277868);
Affected Rows: 4
tql eval (60, 60, '1s') stdvar_over_time(metric[1m]);
+---------------------+-------------------------------------+
| ts | prom_stdvar_over_time(ts_range,val) |
+---------------------+-------------------------------------+
| 1970-01-01T00:01:00 | 0.47943050725465364 |
+---------------------+-------------------------------------+
tql eval (60, 60, '1s') stddev_over_time(metric[1m]);
+---------------------+-------------------------------------+
| ts | prom_stddev_over_time(ts_range,val) |
+---------------------+-------------------------------------+
| 1970-01-01T00:01:00 | 0.6924092050620454 |
+---------------------+-------------------------------------+
drop table metric;
Affected Rows: 1
create table data (ts timestamp(3) time index, val double, test string primary key);
Affected Rows: 0
insert into data values
(0, 0, "two samples"),
(10000, 1, "two samples"),
(0, 0, "three samples"),
(10000, 1, "three samples"),
(20000, 2, "three samples"),
(0, 0, "uneven samples"),
(10000, 1, "uneven samples"),
(20000, 4, "uneven samples");
Affected Rows: 8
drop table data;
Affected Rows: 1
create table data (ts timestamp(3) time index, val double, ty string primary key);
Affected Rows: 0
insert into data values
(0, 2::double, 'numbers'),
(10000, 0::double, 'numbers'),
(20000, 3::double, 'numbers'),
(0, 2::double, 'some_nan'),
(10000, 0::double, 'some_nan'),
(20000, 'NaN'::double, 'some_nan'),
(0, 2::double, 'some_nan2'),
(10000, 'NaN'::double, 'some_nan2'),
(20000, 1::double, 'some_nan2'),
(0, 'NaN'::double, 'some_nan3'),
(10000, 0::double, 'some_nan3'),
(20000, 1::double, 'some_nan3'),
(0, 'NaN'::double, 'only_nan'),
(10000, 'NaN'::double, 'only_nan'),
(20000, 'NaN'::double, 'only_nan');
Affected Rows: 15
drop table data;
Affected Rows: 1

View File

@@ -0,0 +1,173 @@
-- Port from functions.test L607 - L630, commit 001ee2620e094970e5657ce39275b2fccdbd1359
-- Include stddev/stdvar over time
-- load 10s
-- metric 0 8 8 2 3
create table metric (ts timestamp(3) time index, val double);
insert into metric values
(0,0),
(10000,8),
(20000,8),
(30000,2),
(40000,3);
select * from metric;
-- eval instant at 1m stdvar_over_time(metric[1m])
-- {} 10.56
tql eval (60, 61, '10s') stdvar_over_time(metric[1m]);
-- eval instant at 1m stddev_over_time(metric[1m])
-- {} 3.249615
tql eval (60, 60, '1s') stddev_over_time(metric[1m]);
-- eval instant at 1m stddev_over_time((metric[1m]))
-- {} 3.249615
tql eval (60, 60, '1s') stddev_over_time((metric[1m]));
drop table metric;
-- load 10s
-- metric 1.5990505637277868 1.5990505637277868 1.5990505637277868
create table metric (ts timestamp(3) time index, val double);
insert into metric values
(0,0),
(10000,1.5990505637277868),
(20000,1.5990505637277868),
(30000,1.5990505637277868);
-- eval instant at 1m stdvar_over_time(metric[1m])
-- {} 0
tql eval (60, 60, '1s') stdvar_over_time(metric[1m]);
-- eval instant at 1m stddev_over_time(metric[1m])
-- {} 0
tql eval (60, 60, '1s') stddev_over_time(metric[1m]);
drop table metric;
-- Port from functions.test L632 - L680, commit 001ee2620e094970e5657ce39275b2fccdbd1359
-- Include quantile over time
-- load 10s
-- data{test="two samples"} 0 1
-- data{test="three samples"} 0 1 2
-- data{test="uneven samples"} 0 1 4
create table data (ts timestamp(3) time index, val double, test string primary key);
insert into data values
(0, 0, "two samples"),
(10000, 1, "two samples"),
(0, 0, "three samples"),
(10000, 1, "three samples"),
(20000, 2, "three samples"),
(0, 0, "uneven samples"),
(10000, 1, "uneven samples"),
(20000, 4, "uneven samples");
-- eval instant at 1m quantile_over_time(0, data[1m])
-- {test="two samples"} 0
-- {test="three samples"} 0
-- {test="uneven samples"} 0
-- tql eval (60, 60, '1s') quantile_over_time(0, data[1m]);
-- eval instant at 1m quantile_over_time(0.5, data[1m])
-- {test="two samples"} 0.5
-- {test="three samples"} 1
-- {test="uneven samples"} 1
-- tql eval (60, 60, '1s') quantile_over_time(0.5, data[1m]);
-- eval instant at 1m quantile_over_time(0.75, data[1m])
-- {test="two samples"} 0.75
-- {test="three samples"} 1.5
-- {test="uneven samples"} 2.5
-- tql eval (60, 60, '1s') quantile_over_time(0.75, data[1m]);
-- eval instant at 1m quantile_over_time(0.8, data[1m])
-- {test="two samples"} 0.8
-- {test="three samples"} 1.6
-- {test="uneven samples"} 2.8
-- tql eval (60, 60, '1s') quantile_over_time(0.8, data[1m]);
-- eval instant at 1m quantile_over_time(1, data[1m])
-- {test="two samples"} 1
-- {test="three samples"} 2
-- {test="uneven samples"} 4
-- tql eval (60, 60, '1s') quantile_over_time(1, data[1m]);
-- eval instant at 1m quantile_over_time(-1, data[1m])
-- {test="two samples"} -Inf
-- {test="three samples"} -Inf
-- {test="uneven samples"} -Inf
-- tql eval (60, 60, '1s') quantile_over_time(-1, data[1m]);
-- eval instant at 1m quantile_over_time(2, data[1m])
-- {test="two samples"} +Inf
-- {test="three samples"} +Inf
-- {test="uneven samples"} +Inf
-- tql eval (60, 60, '1s') quantile_over_time(2, data[1m]);
-- eval instant at 1m (quantile_over_time(2, (data[1m])))
-- {test="two samples"} +Inf
-- {test="three samples"} +Inf
-- {test="uneven samples"} +Inf
-- tql eval (60, 60, '1s') (quantile_over_time(2, (data[1m])));
drop table data;
-- Port from functions.test L773 - L802, commit 001ee2620e094970e5657ce39275b2fccdbd1359
-- Include max/min/last over time
-- load 10s
-- data{type="numbers"} 2 0 3
-- data{type="some_nan"} 2 0 NaN
-- data{type="some_nan2"} 2 NaN 1
-- data{type="some_nan3"} NaN 0 1
-- data{type="only_nan"} NaN NaN NaN
create table data (ts timestamp(3) time index, val double, ty string primary key);
insert into data values
(0, 2::double, 'numbers'),
(10000, 0::double, 'numbers'),
(20000, 3::double, 'numbers'),
(0, 2::double, 'some_nan'),
(10000, 0::double, 'some_nan'),
(20000, 'NaN'::double, 'some_nan'),
(0, 2::double, 'some_nan2'),
(10000, 'NaN'::double, 'some_nan2'),
(20000, 1::double, 'some_nan2'),
(0, 'NaN'::double, 'some_nan3'),
(10000, 0::double, 'some_nan3'),
(20000, 1::double, 'some_nan3'),
(0, 'NaN'::double, 'only_nan'),
(10000, 'NaN'::double, 'only_nan'),
(20000, 'NaN'::double, 'only_nan');
-- eval instant at 1m min_over_time(data[1m])
-- {type="numbers"} 0
-- {type="some_nan"} 0
-- {type="some_nan2"} 1
-- {type="some_nan3"} 0
-- {type="only_nan"} NaN
-- tql eval (60, 60, '1s') min_over_time(data[1m]);
-- eval instant at 1m max_over_time(data[1m])
-- {type="numbers"} 3
-- {type="some_nan"} 2
-- {type="some_nan2"} 2
-- {type="some_nan3"} 1
-- {type="only_nan"} NaN
-- tql eval (60, 60, '1s') max_over_time(data[1m]);
-- eval instant at 1m last_over_time(data[1m])
-- data{type="numbers"} 3
-- data{type="some_nan"} NaN
-- data{type="some_nan2"} 1
-- data{type="some_nan3"} 1
-- data{type="only_nan"} NaN
-- tql eval (60, 60, '1s') last_over_time(data[1m]);
drop table data;

View File

@@ -18,9 +18,9 @@ TQL ANALYZE (0, 10, '5s') test;
| Plan with Metrics | CoalescePartitionsExec, REDACTED
|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j], REDACTED
|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false], REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_PromSeriesDivideExec: tags=["k"], REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_ExecutionPlan(PlaceHolder), REDACTED
|_|_|
+-+-+

View File

@@ -19,8 +19,8 @@ TQL EXPLAIN (0, 10, '5s') test;
| | TableScan: test projection=[i, j, k], partial_filters=[j >= TimestampMillisecond(-300000, None), j <= TimestampMillisecond(300000, None)] |
| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j] |
| | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] |
| | PromSeriesDivideExec: tags=["k"] |
| | RepartitionExec: partitioning=REDACTED
| | RepartitionExec: partitioning=REDACTED
| | PromSeriesDivideExec: tags=["k"] |
| | ExecutionPlan(PlaceHolder) |
| | |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------+