fix: prom cast to f64 (#7840)

* fix: cast to f64

Signed-off-by: discord9 <discord9@163.com>

* test: div case

Signed-off-by: discord9 <discord9@163.com>

* test: int test

Signed-off-by: discord9 <discord9@163.com>

* chore: sqlness update

Signed-off-by: discord9 <discord9@163.com>

* chore: test

Signed-off-by: discord9 <discord9@163.com>

* chore: update test

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-03-24 14:24:52 +08:00
committed by GitHub
parent 9bd983ea40
commit 30e895abbe
6 changed files with 494 additions and 62 deletions

View File

@@ -3323,28 +3323,55 @@ impl PromPlanner {
fn prom_token_to_binary_expr_builder(
token: TokenType,
) -> Result<Box<dyn Fn(DfExpr, DfExpr) -> Result<DfExpr>>> {
let cast_float = |expr| {
if matches!(
&expr,
DfExpr::Cast(Cast {
data_type: ArrowDataType::Float64,
..
})
) || matches!(&expr, DfExpr::Literal(ScalarValue::Float64(_), _))
{
expr
} else {
DfExpr::Cast(Cast {
expr: Box::new(expr),
data_type: ArrowDataType::Float64,
})
}
};
match token.id() {
token::T_ADD => Ok(Box::new(|lhs, rhs| Ok(lhs + rhs))),
token::T_SUB => Ok(Box::new(|lhs, rhs| Ok(lhs - rhs))),
token::T_MUL => Ok(Box::new(|lhs, rhs| Ok(lhs * rhs))),
token::T_DIV => Ok(Box::new(|lhs, rhs| Ok(lhs / rhs))),
token::T_MOD => Ok(Box::new(|lhs: DfExpr, rhs| Ok(lhs % rhs))),
token::T_ADD => Ok(Box::new(move |lhs, rhs| {
Ok(cast_float(lhs) + cast_float(rhs))
})),
token::T_SUB => Ok(Box::new(move |lhs, rhs| {
Ok(cast_float(lhs) - cast_float(rhs))
})),
token::T_MUL => Ok(Box::new(move |lhs, rhs| {
Ok(cast_float(lhs) * cast_float(rhs))
})),
token::T_DIV => Ok(Box::new(move |lhs, rhs| {
Ok(cast_float(lhs) / cast_float(rhs))
})),
token::T_MOD => Ok(Box::new(move |lhs: DfExpr, rhs| {
Ok(cast_float(lhs) % cast_float(rhs))
})),
token::T_EQLC => Ok(Box::new(|lhs, rhs| Ok(lhs.eq(rhs)))),
token::T_NEQ => Ok(Box::new(|lhs, rhs| Ok(lhs.not_eq(rhs)))),
token::T_GTR => Ok(Box::new(|lhs, rhs| Ok(lhs.gt(rhs)))),
token::T_LSS => Ok(Box::new(|lhs, rhs| Ok(lhs.lt(rhs)))),
token::T_GTE => Ok(Box::new(|lhs, rhs| Ok(lhs.gt_eq(rhs)))),
token::T_LTE => Ok(Box::new(|lhs, rhs| Ok(lhs.lt_eq(rhs)))),
token::T_POW => Ok(Box::new(|lhs, rhs| {
token::T_POW => Ok(Box::new(move |lhs, rhs| {
Ok(DfExpr::ScalarFunction(ScalarFunction {
func: datafusion_functions::math::power(),
args: vec![lhs, rhs],
args: vec![cast_float(lhs), cast_float(rhs)],
}))
})),
token::T_ATAN2 => Ok(Box::new(|lhs, rhs| {
token::T_ATAN2 => Ok(Box::new(move |lhs, rhs| {
Ok(DfExpr::ScalarFunction(ScalarFunction {
func: datafusion_functions::math::atan2(),
args: vec![lhs, rhs],
args: vec![cast_float(lhs), cast_float(rhs)],
}))
})),
_ => UnexpectedTokenSnafu { token }.fail(),
@@ -5169,7 +5196,7 @@ mod test {
.unwrap();
let expected = String::from(
"Projection: rhs.tag_0, rhs.timestamp, lhs.field_0 + rhs.field_0 AS lhs.field_0 + rhs.field_0 [tag_0:Utf8, timestamp:Timestamp(ms), lhs.field_0 + rhs.field_0:Float64;N]\
"Projection: rhs.tag_0, rhs.timestamp, CAST(lhs.field_0 AS Float64) + CAST(rhs.field_0 AS Float64) AS lhs.field_0 + rhs.field_0 [tag_0:Utf8, timestamp:Timestamp(ms), lhs.field_0 + rhs.field_0:Float64;N]\
\n Inner Join: lhs.tag_0 = rhs.tag_0, lhs.timestamp = rhs.timestamp [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
\n SubqueryAlias: lhs [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
@@ -5224,7 +5251,7 @@ mod test {
async fn binary_op_literal_column() {
let query = r#"1 + some_metric{tag_0="bar"}"#;
let expected = String::from(
"Projection: some_metric.tag_0, some_metric.timestamp, Float64(1) + some_metric.field_0 AS Float64(1) + field_0 [tag_0:Utf8, timestamp:Timestamp(ms), Float64(1) + field_0:Float64;N]\
"Projection: some_metric.tag_0, some_metric.timestamp, Float64(1) + CAST(some_metric.field_0 AS Float64) AS Float64(1) + field_0 [tag_0:Utf8, timestamp:Timestamp(ms), Float64(1) + field_0:Float64;N]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
\n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
@@ -5262,7 +5289,7 @@ mod test {
async fn bool_with_additional_arithmetic() {
let query = "some_metric + (1 == bool 2)";
let expected = String::from(
"Projection: some_metric.tag_0, some_metric.timestamp, some_metric.field_0 + CAST(Float64(1) = Float64(2) AS Float64) AS field_0 + Float64(1) = Float64(2) [tag_0:Utf8, timestamp:Timestamp(ms), field_0 + Float64(1) = Float64(2):Float64;N]\
"Projection: some_metric.tag_0, some_metric.timestamp, CAST(some_metric.field_0 AS Float64) + CAST(Float64(1) = Float64(2) AS Float64) AS field_0 + Float64(1) = Float64(2) [tag_0:Utf8, timestamp:Timestamp(ms), field_0 + Float64(1) = Float64(2):Float64;N]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
\n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
@@ -5372,7 +5399,7 @@ mod test {
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
.await
.unwrap();
let expected = "Projection: http_server_requests_seconds_count.uri, http_server_requests_seconds_count.kubernetes_namespace, http_server_requests_seconds_count.kubernetes_pod_name, http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value AS http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value\
let expected = "Projection: http_server_requests_seconds_count.uri, http_server_requests_seconds_count.kubernetes_namespace, http_server_requests_seconds_count.kubernetes_pod_name, http_server_requests_seconds_count.greptime_timestamp, CAST(http_server_requests_seconds_sum.greptime_value AS Float64) / CAST(http_server_requests_seconds_count.greptime_value AS Float64) AS http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value\
\n Inner Join: http_server_requests_seconds_sum.greptime_timestamp = http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.uri = http_server_requests_seconds_count.uri\
\n SubqueryAlias: http_server_requests_seconds_sum\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
@@ -5763,7 +5790,7 @@ mod test {
let query = "some_alt_metric{__schema__=\"greptime_private\"} / some_metric";
let expected = String::from(
"Projection: some_metric.tag_0, some_metric.timestamp, greptime_private.some_alt_metric.field_0 / some_metric.field_0 AS greptime_private.some_alt_metric.field_0 / some_metric.field_0 [tag_0:Utf8, timestamp:Timestamp(ms), greptime_private.some_alt_metric.field_0 / some_metric.field_0:Float64;N]\
"Projection: some_metric.tag_0, some_metric.timestamp, CAST(greptime_private.some_alt_metric.field_0 AS Float64) / CAST(some_metric.field_0 AS Float64) AS greptime_private.some_alt_metric.field_0 / some_metric.field_0 [tag_0:Utf8, timestamp:Timestamp(ms), greptime_private.some_alt_metric.field_0 / some_metric.field_0:Float64;N]\
\n Inner Join: greptime_private.some_alt_metric.tag_0 = some_metric.tag_0, greptime_private.some_alt_metric.timestamp = some_metric.timestamp [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
\n SubqueryAlias: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\

View File

@@ -15,7 +15,9 @@
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use common_query::Output;
use common_query::{Output, OutputData};
use common_recordbatch::util::collect_batches;
use datatypes::arrow::array::{Float64Array, Int64Array};
use frontend::instance::Instance;
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
use rstest::rstest;
@@ -151,6 +153,103 @@ async fn create_insert_tql_assert(
check_unordered_output_stream(query_output, expected).await;
}
/// Runs every statement in `sql` against the frontend `instance` under
/// `query_ctx`, panicking (via `unwrap`) on the first failing statement.
///
/// Test-only helper for multi-statement setup scripts (CREATE / INSERT)
/// where only success matters and the returned output is discarded.
async fn execute_all(instance: &Arc<Instance>, sql: &str, query_ctx: Arc<QueryContext>) {
    instance
        .do_query(sql, query_ctx)
        .await
        .into_iter()
        // `do_query` yields one result per statement; unwrap each so a
        // failure in any statement fails the test immediately.
        .for_each(|v| {
            let _ = v.unwrap();
        });
}
/// Executes `promql` as a range query against `ins` over `[start, end]`
/// with the given step `interval` and `lookback` delta, optionally aliasing
/// the value column to `alias`, and collects the output into in-memory
/// `RecordBatches` for assertions.
///
/// Panics if the query fails or returns something other than a stream or
/// record batches (e.g. an affected-rows result, which a query never is).
#[allow(clippy::too_many_arguments)]
async fn promql_query_as_batches(
    ins: Arc<Instance>,
    promql: &str,
    alias: Option<String>,
    query_ctx: Arc<QueryContext>,
    start: SystemTime,
    end: SystemTime,
    interval: Duration,
    lookback: Duration,
) -> common_recordbatch::RecordBatches {
    let output = promql_query(
        ins, promql, alias, query_ctx, start, end, interval, lookback,
    )
    .await
    .unwrap();
    match output.data {
        // Streaming result: drain the stream into concrete batches.
        OutputData::Stream(stream) => collect_batches(stream).await.unwrap(),
        OutputData::RecordBatches(recordbatches) => recordbatches,
        // A query cannot produce an affected-rows output.
        _ => unreachable!(),
    }
}
/// Dedicated database name for the anonymized promql-ratio reproduction test,
/// so its tables don't collide with other test cases.
const ANON_PROMQL_RATIO_REPRO_DB: &str = "repro_db";

/// Schema setup: one physical metric table (`phy`) plus two logical metric
/// tables (`metric_a`, `metric_b`) sharing it via the metric engine.
const ANON_PROMQL_RATIO_REPRO_CREATE: &str = r#"
CREATE TABLE phy (
t TIMESTAMP TIME INDEX,
v DOUBLE
) ENGINE=metric WITH ("physical_metric_table" = "");
CREATE TABLE metric_a (
l1 STRING NULL,
l2 STRING NULL,
l3 STRING NULL,
l4 STRING NULL,
l5 STRING NULL,
t TIMESTAMP NOT NULL,
v DOUBLE NULL,
TIME INDEX (t),
PRIMARY KEY (l1, l2, l3, l4, l5)
) ENGINE=metric WITH (on_physical_table = 'phy');
CREATE TABLE metric_b (
l6 STRING NULL,
l1 STRING NULL,
l2 STRING NULL,
l3 STRING NULL,
l4 STRING NULL,
t TIMESTAMP NOT NULL,
v DOUBLE NULL,
TIME INDEX (t),
PRIMARY KEY (l6, l1, l2, l3, l4)
) ENGINE=metric WITH (on_physical_table = 'phy');
"#;

/// Fixture data: three series in `metric_a` (timestamps 1ms, 180s, 360s) and
/// two matching series in `metric_b` used as the group_left join target.
const ANON_PROMQL_RATIO_REPRO_INSERT: &str = r#"
INSERT INTO metric_a (l1, l2, l3, l4, l5, t, v) VALUES
('v1', 'v2', 'v3', 'v4a', 'v5a', 1, 0),
('v1', 'v2', 'v3', 'v4a', 'v5a', 180000, 120),
('v1', 'v2', 'v3', 'v4a', 'v5a', 360000, 240),
('v1', 'v2', 'v3', 'v4a', 'v5b', 1, 0),
('v1', 'v2', 'v3', 'v4a', 'v5b', 180000, 30),
('v1', 'v2', 'v3', 'v4a', 'v5b', 360000, 60),
('v1', 'v2', 'v3-b', 'v4b', 'v5c', 1, 0),
('v1', 'v2', 'v3-b', 'v4b', 'v5c', 180000, 60),
('v1', 'v2', 'v3-b', 'v4b', 'v5c', 360000, 120);
INSERT INTO metric_b (l6, l1, l2, l3, l4, t, v) VALUES
('v6', 'v1', 'v2', 'v3', 'v4a', 1, 1),
('v6', 'v1', 'v2', 'v3', 'v4a', 180000, 1),
('v6', 'v1', 'v2', 'v3', 'v4a', 360000, 1),
('v6', 'v1', 'v2', 'v3-b', 'v4b', 1, 2),
('v6', 'v1', 'v2', 'v3-b', 'v4b', 180000, 2),
('v6', 'v1', 'v2', 'v3-b', 'v4b', 360000, 2);
"#;

/// Numerator of the ratio: count of series whose rate/metric_b ratio is > 0.5.
const ANON_PROMQL_RATIO_REPRO_NUMERATOR: &str = r#"count(((rate(metric_a{l1="v1",l2="v2",l3=~"v3(|-a|-b)",__schema__="repro_db"}[3m]) / on(l3,l4) group_left metric_b{l6="v6",l1="v1",l2="v2",l3=~"v3(|-a|-b)",__schema__="repro_db"}) > 0.50))"#;

/// Denominator of the ratio: count of all rated `metric_a` series.
const ANON_PROMQL_RATIO_REPRO_DENOMINATOR: &str =
r#"count(rate(metric_a{l1="v1",l2="v2",l3=~"v3(|-a|-b)",__schema__="repro_db"}[3m]))"#;

/// Full expression: (numerator / denominator) * 100 — the division of two
/// count() results is the case that must be evaluated in Float64.
const ANON_PROMQL_RATIO_REPRO_WHOLE: &str = r#"(count(((rate(metric_a{l1="v1",l2="v2",l3=~"v3(|-a|-b)",__schema__="repro_db"}[3m]) / on(l3,l4) group_left metric_b{l6="v6",l1="v1",l2="v2",l3=~"v3(|-a|-b)",__schema__="repro_db"}) > 0.50)) / count(rate(metric_a{l1="v1",l2="v2",l3=~"v3(|-a|-b)",__schema__="repro_db"}[3m]))) * 100"#;

/// count() divided by a scalar literal — checks a count divided by 2
/// yields a fractional Float64 (1.5) rather than a truncated integer.
const ANON_PROMQL_RATIO_REPRO_SCALAR_DIV: &str =
r#"count(rate(metric_a{l1="v1",l2="v2",l3=~"v3(|-a|-b)",__schema__="repro_db"}[3m])) / 2"#;
#[apply(both_instances_cases)]
async fn sql_insert_tql_query_ceil(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();
@@ -709,3 +808,140 @@ async fn cross_schema_query(instance: Arc<dyn MockInstance>) {
check_unordered_output_stream(query_output, expected).await;
}
/// Regression test for PromQL integer division: dividing two `count()`
/// results (Int64) must be evaluated as Float64, not integer division.
///
/// Sets up a dedicated database with the anonymized fixture, then runs four
/// queries — numerator, denominator, the full ratio * 100, and a
/// count-divided-by-scalar — and checks the ratio comes out as 100/3
/// (fractional) instead of a truncated integer result.
#[apply(both_instances_cases)]
async fn anon_promql_ratio_repro(instance: Arc<dyn MockInstance>) {
    let ins = instance.frontend();
    // Create the isolated database, then run the schema + data scripts in it.
    execute_all(
        &ins,
        &format!("CREATE DATABASE {ANON_PROMQL_RATIO_REPRO_DB}"),
        QueryContext::arc(),
    )
    .await;
    let repro_ctx: Arc<QueryContext> =
        QueryContext::with_db_name(Some(ANON_PROMQL_RATIO_REPRO_DB)).into();
    execute_all(&ins, ANON_PROMQL_RATIO_REPRO_CREATE, repro_ctx.clone()).await;
    execute_all(&ins, ANON_PROMQL_RATIO_REPRO_INSERT, repro_ctx).await;
    // Range query window chosen so each query evaluates to a single point.
    let start = UNIX_EPOCH.checked_add(Duration::from_secs(180)).unwrap();
    let end = UNIX_EPOCH.checked_add(Duration::from_secs(360)).unwrap();
    let interval = Duration::from_secs(180);
    let lookback = Duration::from_secs(1);
    let numerator = promql_query_as_batches(
        ins.clone(),
        ANON_PROMQL_RATIO_REPRO_NUMERATOR,
        Some("num".to_string()),
        QueryContext::arc(),
        start,
        end,
        interval,
        lookback,
    )
    .await;
    let denominator = promql_query_as_batches(
        ins.clone(),
        ANON_PROMQL_RATIO_REPRO_DENOMINATOR,
        Some("den".to_string()),
        QueryContext::arc(),
        start,
        end,
        interval,
        lookback,
    )
    .await;
    let whole = promql_query_as_batches(
        ins.clone(),
        ANON_PROMQL_RATIO_REPRO_WHOLE,
        Some("pct".to_string()),
        QueryContext::arc(),
        start,
        end,
        interval,
        lookback,
    )
    .await;
    let scalar_div = promql_query_as_batches(
        ins,
        ANON_PROMQL_RATIO_REPRO_SCALAR_DIV,
        Some("half_den".to_string()),
        QueryContext::arc(),
        start,
        end,
        interval,
        lookback,
    )
    .await;
    let numerator = numerator.iter().collect::<Vec<_>>();
    let denominator = denominator.iter().collect::<Vec<_>>();
    let whole = whole.iter().collect::<Vec<_>>();
    let scalar_div = scalar_div.iter().collect::<Vec<_>>();
    // count() outputs are Int64; division results must come back as Float64.
    let numerator_values = numerator[0]
        .column_by_name("num")
        .unwrap()
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap();
    let denominator_values = denominator[0]
        .column_by_name("den")
        .unwrap()
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap();
    let percentage_values = whole[0]
        .column_by_name("pct")
        .unwrap()
        .as_any()
        .downcast_ref::<Float64Array>()
        .unwrap();
    let scalar_div_values = scalar_div[0]
        .column_by_name("half_den")
        .unwrap()
        .as_any()
        .downcast_ref::<Float64Array>()
        .unwrap();
    // Each query should produce exactly one row in the chosen window.
    assert_eq!(numerator_values.len(), 1, "{}", numerator[0].pretty_print());
    assert_eq!(
        denominator_values.len(),
        1,
        "{}",
        denominator[0].pretty_print()
    );
    assert_eq!(percentage_values.len(), 1, "{}", whole[0].pretty_print());
    assert_eq!(
        scalar_div_values.len(),
        1,
        "{}",
        scalar_div[0].pretty_print()
    );
    // 1 of the 3 rated series exceeds the 0.5 ratio threshold.
    assert_eq!(
        numerator_values.value(0),
        1,
        "{}",
        numerator[0].pretty_print()
    );
    assert_eq!(
        denominator_values.value(0),
        3,
        "{}",
        denominator[0].pretty_print()
    );
    // 3 / 2 must be 1.5, not integer-truncated to 1.
    assert!(
        (scalar_div_values.value(0) - 1.5).abs() < 1e-9,
        "{}",
        scalar_div[0].pretty_print()
    );
    // 1/3 * 100 must be ~33.33, not integer-truncated to 0 or 33.
    let expected = 100.0 / 3.0;
    assert!(
        (percentage_values.value(0) - expected).abs() < 1e-9,
        "{}",
        whole[0].pretty_print()
    );
}

View File

@@ -442,54 +442,54 @@ Affected Rows: 0
-- SQLNESS REPLACE (Hash.*) REDACTED
tql explain (1752591864, 1752592164, '30s') sum by (a, b, c) (rate(aggr_optimize_not [2m])) / sum by (a, b, c) (rate(aggr_optimize_not_count [2m]));
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: aggr_optimize_not_count.a, aggr_optimize_not_count.b, aggr_optimize_not_count.c, aggr_optimize_not_count.greptime_timestamp, aggr_optimize_not.sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000))) / aggr_optimize_not_count.sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000))) AS aggr_optimize_not.sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000))) / aggr_optimize_not_count.sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000))) |
| | Inner Join: aggr_optimize_not.a = aggr_optimize_not_count.a, aggr_optimize_not.b = aggr_optimize_not_count.b, aggr_optimize_not.c = aggr_optimize_not_count.c, aggr_optimize_not.greptime_timestamp = aggr_optimize_not_count.greptime_timestamp |
| | MergeSort: aggr_optimize_not.a ASC NULLS LAST, aggr_optimize_not.b ASC NULLS LAST, aggr_optimize_not.c ASC NULLS LAST, aggr_optimize_not.greptime_timestamp ASC NULLS LAST |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | SubqueryAlias: aggr_optimize_not |
| | Sort: aggr_optimize_not.a ASC NULLS LAST, aggr_optimize_not.b ASC NULLS LAST, aggr_optimize_not.c ASC NULLS LAST, aggr_optimize_not.greptime_timestamp ASC NULLS LAST |
| | Aggregate: groupBy=[[aggr_optimize_not.a, aggr_optimize_not.b, aggr_optimize_not.c, aggr_optimize_not.greptime_timestamp]], aggr=[[sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)))]] |
| | Filter: prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)) IS NOT NULL |
| | Projection: aggr_optimize_not.greptime_timestamp, prom_rate(greptime_timestamp_range, greptime_value, aggr_optimize_not.greptime_timestamp, Int64(120000)) AS prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)), aggr_optimize_not.a, aggr_optimize_not.b, aggr_optimize_not.c, aggr_optimize_not.d |
| | PromRangeManipulate: req range=[1752591864000..1752592164000], interval=[30000], eval range=[120000], time index=[greptime_timestamp], values=["greptime_value"] |
| | PromSeriesNormalize: offset=[0], time index=[greptime_timestamp], filter NaN: [true] |
| | PromSeriesDivide: tags=["a", "b", "c", "d"] |
| | Sort: aggr_optimize_not.a ASC NULLS FIRST, aggr_optimize_not.b ASC NULLS FIRST, aggr_optimize_not.c ASC NULLS FIRST, aggr_optimize_not.d ASC NULLS FIRST, aggr_optimize_not.greptime_timestamp ASC NULLS FIRST |
| | Filter: aggr_optimize_not.greptime_timestamp >= TimestampMillisecond(1752591744001, None) AND aggr_optimize_not.greptime_timestamp <= TimestampMillisecond(1752592164000, None) |
| | TableScan: aggr_optimize_not |
| | ]] |
| | SubqueryAlias: aggr_optimize_not_count |
| | Sort: aggr_optimize_not_count.a ASC NULLS LAST, aggr_optimize_not_count.b ASC NULLS LAST, aggr_optimize_not_count.c ASC NULLS LAST, aggr_optimize_not_count.greptime_timestamp ASC NULLS LAST |
| | Aggregate: groupBy=[[aggr_optimize_not_count.a, aggr_optimize_not_count.b, aggr_optimize_not_count.c, aggr_optimize_not_count.greptime_timestamp]], aggr=[[sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)))]] |
| | Filter: prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)) IS NOT NULL |
| | Projection: aggr_optimize_not_count.greptime_timestamp, prom_rate(greptime_timestamp_range, greptime_value, aggr_optimize_not_count.greptime_timestamp, Int64(120000)) AS prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)), aggr_optimize_not_count.a, aggr_optimize_not_count.b, aggr_optimize_not_count.c |
| | PromRangeManipulate: req range=[1752591864000..1752592164000], interval=[30000], eval range=[120000], time index=[greptime_timestamp], values=["greptime_value"] |
| | PromSeriesNormalize: offset=[0], time index=[greptime_timestamp], filter NaN: [true] |
| | PromSeriesDivide: tags=["a", "b", "c", "d"] |
| | Sort: aggr_optimize_not_count.a ASC NULLS FIRST, aggr_optimize_not_count.b ASC NULLS FIRST, aggr_optimize_not_count.c ASC NULLS FIRST, aggr_optimize_not_count.d ASC NULLS FIRST, aggr_optimize_not_count.greptime_timestamp ASC NULLS FIRST |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | Filter: aggr_optimize_not_count.greptime_timestamp >= TimestampMillisecond(1752591744001, None) AND aggr_optimize_not_count.greptime_timestamp <= TimestampMillisecond(1752592164000, None) |
| | TableScan: aggr_optimize_not_count |
| | ]] |
| physical_plan | ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c, greptime_timestamp@3 as greptime_timestamp, sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)))@5 / sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)))@4 as aggr_optimize_not.sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000))) / aggr_optimize_not_count.sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)))] |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: aggr_optimize_not_count.a, aggr_optimize_not_count.b, aggr_optimize_not_count.c, aggr_optimize_not_count.greptime_timestamp, CAST(aggr_optimize_not.sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000))) AS Float64) / CAST(aggr_optimize_not_count.sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000))) AS Float64) AS aggr_optimize_not.sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000))) / aggr_optimize_not_count.sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000))) |
| | Inner Join: aggr_optimize_not.a = aggr_optimize_not_count.a, aggr_optimize_not.b = aggr_optimize_not_count.b, aggr_optimize_not.c = aggr_optimize_not_count.c, aggr_optimize_not.greptime_timestamp = aggr_optimize_not_count.greptime_timestamp |
| | MergeSort: aggr_optimize_not.a ASC NULLS LAST, aggr_optimize_not.b ASC NULLS LAST, aggr_optimize_not.c ASC NULLS LAST, aggr_optimize_not.greptime_timestamp ASC NULLS LAST |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | SubqueryAlias: aggr_optimize_not |
| | Sort: aggr_optimize_not.a ASC NULLS LAST, aggr_optimize_not.b ASC NULLS LAST, aggr_optimize_not.c ASC NULLS LAST, aggr_optimize_not.greptime_timestamp ASC NULLS LAST |
| | Aggregate: groupBy=[[aggr_optimize_not.a, aggr_optimize_not.b, aggr_optimize_not.c, aggr_optimize_not.greptime_timestamp]], aggr=[[sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)))]] |
| | Filter: prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)) IS NOT NULL |
| | Projection: aggr_optimize_not.greptime_timestamp, prom_rate(greptime_timestamp_range, greptime_value, aggr_optimize_not.greptime_timestamp, Int64(120000)) AS prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)), aggr_optimize_not.a, aggr_optimize_not.b, aggr_optimize_not.c, aggr_optimize_not.d |
| | PromRangeManipulate: req range=[1752591864000..1752592164000], interval=[30000], eval range=[120000], time index=[greptime_timestamp], values=["greptime_value"] |
| | PromSeriesNormalize: offset=[0], time index=[greptime_timestamp], filter NaN: [true] |
| | PromSeriesDivide: tags=["a", "b", "c", "d"] |
| | Sort: aggr_optimize_not.a ASC NULLS FIRST, aggr_optimize_not.b ASC NULLS FIRST, aggr_optimize_not.c ASC NULLS FIRST, aggr_optimize_not.d ASC NULLS FIRST, aggr_optimize_not.greptime_timestamp ASC NULLS FIRST |
| | Filter: aggr_optimize_not.greptime_timestamp >= TimestampMillisecond(1752591744001, None) AND aggr_optimize_not.greptime_timestamp <= TimestampMillisecond(1752592164000, None) |
| | TableScan: aggr_optimize_not |
| | ]] |
| | SubqueryAlias: aggr_optimize_not_count |
| | Sort: aggr_optimize_not_count.a ASC NULLS LAST, aggr_optimize_not_count.b ASC NULLS LAST, aggr_optimize_not_count.c ASC NULLS LAST, aggr_optimize_not_count.greptime_timestamp ASC NULLS LAST |
| | Aggregate: groupBy=[[aggr_optimize_not_count.a, aggr_optimize_not_count.b, aggr_optimize_not_count.c, aggr_optimize_not_count.greptime_timestamp]], aggr=[[sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)))]] |
| | Filter: prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)) IS NOT NULL |
| | Projection: aggr_optimize_not_count.greptime_timestamp, prom_rate(greptime_timestamp_range, greptime_value, aggr_optimize_not_count.greptime_timestamp, Int64(120000)) AS prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)), aggr_optimize_not_count.a, aggr_optimize_not_count.b, aggr_optimize_not_count.c |
| | PromRangeManipulate: req range=[1752591864000..1752592164000], interval=[30000], eval range=[120000], time index=[greptime_timestamp], values=["greptime_value"] |
| | PromSeriesNormalize: offset=[0], time index=[greptime_timestamp], filter NaN: [true] |
| | PromSeriesDivide: tags=["a", "b", "c", "d"] |
| | Sort: aggr_optimize_not_count.a ASC NULLS FIRST, aggr_optimize_not_count.b ASC NULLS FIRST, aggr_optimize_not_count.c ASC NULLS FIRST, aggr_optimize_not_count.d ASC NULLS FIRST, aggr_optimize_not_count.greptime_timestamp ASC NULLS FIRST |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | Filter: aggr_optimize_not_count.greptime_timestamp >= TimestampMillisecond(1752591744001, None) AND aggr_optimize_not_count.greptime_timestamp <= TimestampMillisecond(1752592164000, None) |
| | TableScan: aggr_optimize_not_count |
| | ]] |
| physical_plan | ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c, greptime_timestamp@3 as greptime_timestamp, sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)))@5 / sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)))@4 as aggr_optimize_not.sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000))) / aggr_optimize_not_count.sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)))] |
| | REDACTED
| | CoalescePartitionsExec |
| | AggregateExec: mode=SinglePartitioned, gby=[a@2 as a, b@3 as b, c@4 as c, greptime_timestamp@0 as greptime_timestamp], aggr=[sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)))] |
| | FilterExec: prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000))@1 IS NOT NULL |
| | ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, prom_rate(greptime_timestamp_range@6, greptime_value@5, greptime_timestamp@4, 120000) as prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)), a@0 as a, b@1 as b, c@2 as c] |
| | PromRangeManipulateExec: req range=[1752591864000..1752592164000], interval=[30000], eval range=[120000], time index=[greptime_timestamp] |
| | PromSeriesNormalizeExec: offset=[0], time index=[greptime_timestamp], filter NaN: [true] |
| | PromSeriesDivideExec: tags=["a", "b", "c", "d"] |
| | SortExec: expr=[a@0 ASC, b@1 ASC, c@2 ASC, d@3 ASC, greptime_timestamp@4 ASC], preserve_partitioning=[true] |
| | CoalescePartitionsExec |
| | AggregateExec: mode=SinglePartitioned, gby=[a@2 as a, b@3 as b, c@4 as c, greptime_timestamp@0 as greptime_timestamp], aggr=[sum(prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)))] |
| | FilterExec: prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000))@1 IS NOT NULL |
| | ProjectionExec: expr=[greptime_timestamp@4 as greptime_timestamp, prom_rate(greptime_timestamp_range@6, greptime_value@5, greptime_timestamp@4, 120000) as prom_rate(greptime_timestamp_range,greptime_value,greptime_timestamp,Int64(120000)), a@0 as a, b@1 as b, c@2 as c] |
| | PromRangeManipulateExec: req range=[1752591864000..1752592164000], interval=[30000], eval range=[120000], time index=[greptime_timestamp] |
| | PromSeriesNormalizeExec: offset=[0], time index=[greptime_timestamp], filter NaN: [true] |
| | PromSeriesDivideExec: tags=["a", "b", "c", "d"] |
| | SortExec: expr=[a@0 ASC, b@1 ASC, c@2 ASC, d@3 ASC, greptime_timestamp@4 ASC], preserve_partitioning=[true] |
| | MergeScanExec: REDACTED
| | SortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, greptime_timestamp@3 ASC NULLS LAST], preserve_partitioning=[true] |
| | CooperativeExec |
| | SortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, greptime_timestamp@3 ASC NULLS LAST], preserve_partitioning=[true] |
| | CooperativeExec |
| | MergeScanExec: REDACTED
| | |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| | |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED

View File

@@ -0,0 +1,106 @@
CREATE TABLE phy (
t TIMESTAMP TIME INDEX,
v DOUBLE
) ENGINE=metric WITH ("physical_metric_table" = "");
Affected Rows: 0
CREATE TABLE metric_a (
l1 STRING NULL,
l2 STRING NULL,
l3 STRING NULL,
l4 STRING NULL,
l5 STRING NULL,
t TIMESTAMP NOT NULL,
v DOUBLE NULL,
TIME INDEX (t),
PRIMARY KEY (l1, l2, l3, l4, l5)
) ENGINE=metric WITH (on_physical_table = 'phy');
Affected Rows: 0
CREATE TABLE metric_b (
l6 STRING NULL,
l1 STRING NULL,
l2 STRING NULL,
l3 STRING NULL,
l4 STRING NULL,
t TIMESTAMP NOT NULL,
v DOUBLE NULL,
TIME INDEX (t),
PRIMARY KEY (l6, l1, l2, l3, l4)
) ENGINE=metric WITH (on_physical_table = 'phy');
Affected Rows: 0
INSERT INTO metric_a (l1, l2, l3, l4, l5, t, v) VALUES
('v1', 'v2', 'v3', 'v4a', 'v5a', 1, 0),
('v1', 'v2', 'v3', 'v4a', 'v5a', 180000, 120),
('v1', 'v2', 'v3', 'v4a', 'v5a', 360000, 240),
('v1', 'v2', 'v3', 'v4a', 'v5b', 1, 0),
('v1', 'v2', 'v3', 'v4a', 'v5b', 180000, 30),
('v1', 'v2', 'v3', 'v4a', 'v5b', 360000, 60),
('v1', 'v2', 'v3-b', 'v4b', 'v5c', 1, 0),
('v1', 'v2', 'v3-b', 'v4b', 'v5c', 180000, 60),
('v1', 'v2', 'v3-b', 'v4b', 'v5c', 360000, 120);
Affected Rows: 9
INSERT INTO metric_b (l6, l1, l2, l3, l4, t, v) VALUES
('v6', 'v1', 'v2', 'v3', 'v4a', 1, 1),
('v6', 'v1', 'v2', 'v3', 'v4a', 180000, 1),
('v6', 'v1', 'v2', 'v3', 'v4a', 360000, 1),
('v6', 'v1', 'v2', 'v3-b', 'v4b', 1, 2),
('v6', 'v1', 'v2', 'v3-b', 'v4b', 180000, 2),
('v6', 'v1', 'v2', 'v3-b', 'v4b', 360000, 2);
Affected Rows: 6
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (180, 360, '180s') count(((rate(metric_a{l1="v1",l2="v2",l3=~"v3(|-a|-b)"}[3m]) / on(l3,l4) group_left metric_b{l6="v6",l1="v1",l2="v2",l3=~"v3(|-a|-b)"}) > 0.50));
+---------------------+-------------------------------------------------------------------+
| t | count(metric_a.prom_rate(t_range,v,t,Int64(180000)) / metric_b.v) |
+---------------------+-------------------------------------------------------------------+
| 1970-01-01T00:03:00 | 1 |
+---------------------+-------------------------------------------------------------------+
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (180, 360, '180s') count(rate(metric_a{l1="v1",l2="v2",l3=~"v3(|-a|-b)"}[3m]));
+---------------------+---------------------------------------------+
| t | count(prom_rate(t_range,v,t,Int64(180000))) |
+---------------------+---------------------------------------------+
| 1970-01-01T00:03:00 | 3 |
+---------------------+---------------------------------------------+
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (180, 360, '180s') count(rate(metric_a{l1="v1",l2="v2",l3=~"v3(|-a|-b)"}[3m])) / 2;
+---------------------+----------------------------------------------------------+
| t | count(prom_rate(t_range,v,t,Int64(180000))) / Float64(2) |
+---------------------+----------------------------------------------------------+
| 1970-01-01T00:03:00 | 1.5 |
+---------------------+----------------------------------------------------------+
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (180, 360, '180s') (count(((rate(metric_a{l1="v1",l2="v2",l3=~"v3(|-a|-b)"}[3m]) / on(l3,l4) group_left metric_b{l6="v6",l1="v1",l2="v2",l3=~"v3(|-a|-b)"}) > 0.50)) / count(rate(metric_a{l1="v1",l2="v2",l3=~"v3(|-a|-b)"}[3m]))) * 100;
+---------------------+--------------------------------------------------------------------------------------------------------------------------------------------------+
| t | metric_b.count(metric_a.prom_rate(t_range,v,t,Int64(180000)) / metric_b.v) / metric_a.count(prom_rate(t_range,v,t,Int64(180000))) * Float64(100) |
+---------------------+--------------------------------------------------------------------------------------------------------------------------------------------------+
| 1970-01-01T00:03:00 | 33.33333333333333 |
+---------------------+--------------------------------------------------------------------------------------------------------------------------------------------------+
DROP TABLE metric_a;
Affected Rows: 0
DROP TABLE metric_b;
Affected Rows: 0
DROP TABLE phy;
Affected Rows: 0

View File

@@ -0,0 +1,63 @@
CREATE TABLE phy (
t TIMESTAMP TIME INDEX,
v DOUBLE
) ENGINE=metric WITH ("physical_metric_table" = "");
CREATE TABLE metric_a (
l1 STRING NULL,
l2 STRING NULL,
l3 STRING NULL,
l4 STRING NULL,
l5 STRING NULL,
t TIMESTAMP NOT NULL,
v DOUBLE NULL,
TIME INDEX (t),
PRIMARY KEY (l1, l2, l3, l4, l5)
) ENGINE=metric WITH (on_physical_table = 'phy');
CREATE TABLE metric_b (
l6 STRING NULL,
l1 STRING NULL,
l2 STRING NULL,
l3 STRING NULL,
l4 STRING NULL,
t TIMESTAMP NOT NULL,
v DOUBLE NULL,
TIME INDEX (t),
PRIMARY KEY (l6, l1, l2, l3, l4)
) ENGINE=metric WITH (on_physical_table = 'phy');
INSERT INTO metric_a (l1, l2, l3, l4, l5, t, v) VALUES
('v1', 'v2', 'v3', 'v4a', 'v5a', 1, 0),
('v1', 'v2', 'v3', 'v4a', 'v5a', 180000, 120),
('v1', 'v2', 'v3', 'v4a', 'v5a', 360000, 240),
('v1', 'v2', 'v3', 'v4a', 'v5b', 1, 0),
('v1', 'v2', 'v3', 'v4a', 'v5b', 180000, 30),
('v1', 'v2', 'v3', 'v4a', 'v5b', 360000, 60),
('v1', 'v2', 'v3-b', 'v4b', 'v5c', 1, 0),
('v1', 'v2', 'v3-b', 'v4b', 'v5c', 180000, 60),
('v1', 'v2', 'v3-b', 'v4b', 'v5c', 360000, 120);
INSERT INTO metric_b (l6, l1, l2, l3, l4, t, v) VALUES
('v6', 'v1', 'v2', 'v3', 'v4a', 1, 1),
('v6', 'v1', 'v2', 'v3', 'v4a', 180000, 1),
('v6', 'v1', 'v2', 'v3', 'v4a', 360000, 1),
('v6', 'v1', 'v2', 'v3-b', 'v4b', 1, 2),
('v6', 'v1', 'v2', 'v3-b', 'v4b', 180000, 2),
('v6', 'v1', 'v2', 'v3-b', 'v4b', 360000, 2);
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (180, 360, '180s') count(((rate(metric_a{l1="v1",l2="v2",l3=~"v3(|-a|-b)"}[3m]) / on(l3,l4) group_left metric_b{l6="v6",l1="v1",l2="v2",l3=~"v3(|-a|-b)"}) > 0.50));
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (180, 360, '180s') count(rate(metric_a{l1="v1",l2="v2",l3=~"v3(|-a|-b)"}[3m]));
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (180, 360, '180s') count(rate(metric_a{l1="v1",l2="v2",l3=~"v3(|-a|-b)"}[3m])) / 2;
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (180, 360, '180s') (count(((rate(metric_a{l1="v1",l2="v2",l3=~"v3(|-a|-b)"}[3m]) / on(l3,l4) group_left metric_b{l6="v6",l1="v1",l2="v2",l3=~"v3(|-a|-b)"}) > 0.50)) / count(rate(metric_a{l1="v1",l2="v2",l3=~"v3(|-a|-b)"}[3m]))) * 100;
DROP TABLE metric_a;
DROP TABLE metric_b;
DROP TABLE phy;

View File

@@ -427,8 +427,8 @@ SELECT min(val) as min_computed, max(val) as max_computed FROM computed;
| | Aggregate: groupBy=[[]], aggr=[[min(computed.val), max(computed.val)]] |
| | SubqueryAlias: computed |
| | Projection: metric.ts AS ts, val * Float64(2) + Float64(1) AS val |
| | Projection: metric.ts, val * Float64(2) + Float64(1) AS val * Float64(2) + Float64(1) |
| | Projection: metric.ts, metric.val * Float64(2) AS val * Float64(2) |
| | Projection: metric.ts, CAST(val * Float64(2) AS Float64) + Float64(1) AS val * Float64(2) + Float64(1) |
| | Projection: metric.ts, CAST(metric.val AS Float64) * Float64(2) AS val * Float64(2) |
| | PromInstantManipulate: range=[0..40000], lookback=[300000], interval=[10000], time index=[ts] |
| | PromSeriesDivide: tags=[] |
| | Filter: metric.ts >= TimestampMillisecond(-299999, None) AND metric.ts <= TimestampMillisecond(40000, None) |