diff --git a/tests-integration/src/tests/promql_test.rs b/tests-integration/src/tests/promql_test.rs index 7fbce91ea6..ede4663118 100644 --- a/tests-integration/src/tests/promql_test.rs +++ b/tests-integration/src/tests/promql_test.rs @@ -15,7 +15,9 @@ use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; -use common_query::Output; +use common_query::{Output, OutputData}; +use common_recordbatch::util::collect_batches; +use datatypes::arrow::array::{Float64Array, Int64Array}; use frontend::instance::Instance; use query::parser::{PromQuery, QueryLanguageParser, QueryStatement}; use rstest::rstest; @@ -151,6 +153,103 @@ async fn create_insert_tql_assert( check_unordered_output_stream(query_output, expected).await; } +async fn execute_all(instance: &Arc, sql: &str, query_ctx: Arc) { + instance + .do_query(sql, query_ctx) + .await + .into_iter() + .for_each(|v| { + let _ = v.unwrap(); + }); +} + +#[allow(clippy::too_many_arguments)] +async fn promql_query_as_batches( + ins: Arc, + promql: &str, + alias: Option, + query_ctx: Arc, + start: SystemTime, + end: SystemTime, + interval: Duration, + lookback: Duration, +) -> common_recordbatch::RecordBatches { + let output = promql_query( + ins, promql, alias, query_ctx, start, end, interval, lookback, + ) + .await + .unwrap(); + match output.data { + OutputData::Stream(stream) => collect_batches(stream).await.unwrap(), + OutputData::RecordBatches(recordbatches) => recordbatches, + _ => unreachable!(), + } +} + +const ANON_PROMQL_RATIO_REPRO_DB: &str = "repro_db"; + +const ANON_PROMQL_RATIO_REPRO_CREATE: &str = r#" +CREATE TABLE phy ( + t TIMESTAMP TIME INDEX, + v DOUBLE +) ENGINE=metric WITH ("physical_metric_table" = ""); + +CREATE TABLE metric_a ( + l1 STRING NULL, + l2 STRING NULL, + l3 STRING NULL, + l4 STRING NULL, + l5 STRING NULL, + t TIMESTAMP NOT NULL, + v DOUBLE NULL, + TIME INDEX (t), + PRIMARY KEY (l1, l2, l3, l4, l5) +) ENGINE=metric WITH (on_physical_table = 'phy'); + +CREATE TABLE metric_b ( + l6 STRING NULL, + l1 STRING NULL, + l2 STRING NULL, + l3 STRING NULL, + l4 STRING NULL, + t TIMESTAMP NOT NULL, + v DOUBLE NULL, + TIME INDEX (t), + PRIMARY KEY (l6, l1, l2, l3, l4) +) ENGINE=metric WITH (on_physical_table = 'phy'); +"#; + +const ANON_PROMQL_RATIO_REPRO_INSERT: &str = r#" +INSERT INTO metric_a (l1, l2, l3, l4, l5, t, v) VALUES + ('v1', 'v2', 'v3', 'v4a', 'v5a', 1, 0), + ('v1', 'v2', 'v3', 'v4a', 'v5a', 180000, 120), + ('v1', 'v2', 'v3', 'v4a', 'v5a', 360000, 240), + ('v1', 'v2', 'v3', 'v4a', 'v5b', 1, 0), + ('v1', 'v2', 'v3', 'v4a', 'v5b', 180000, 30), + ('v1', 'v2', 'v3', 'v4a', 'v5b', 360000, 60), + ('v1', 'v2', 'v3-b', 'v4b', 'v5c', 1, 0), + ('v1', 'v2', 'v3-b', 'v4b', 'v5c', 180000, 60), + ('v1', 'v2', 'v3-b', 'v4b', 'v5c', 360000, 120); + +INSERT INTO metric_b (l6, l1, l2, l3, l4, t, v) VALUES + ('v6', 'v1', 'v2', 'v3', 'v4a', 1, 1), + ('v6', 'v1', 'v2', 'v3', 'v4a', 180000, 1), + ('v6', 'v1', 'v2', 'v3', 'v4a', 360000, 1), + ('v6', 'v1', 'v2', 'v3-b', 'v4b', 1, 2), + ('v6', 'v1', 'v2', 'v3-b', 'v4b', 180000, 2), + ('v6', 'v1', 'v2', 'v3-b', 'v4b', 360000, 2); +"#; + +const ANON_PROMQL_RATIO_REPRO_NUMERATOR: &str = r#"count(((rate(metric_a{l1="v1",l2="v2",l3=~"v3(|-a|-b)",__schema__="repro_db"}[3m]) / on(l3,l4) group_left metric_b{l6="v6",l1="v1",l2="v2",l3=~"v3(|-a|-b)",__schema__="repro_db"}) > 0.50))"#; + +const ANON_PROMQL_RATIO_REPRO_DENOMINATOR: &str = + r#"count(rate(metric_a{l1="v1",l2="v2",l3=~"v3(|-a|-b)",__schema__="repro_db"}[3m]))"#; + +const ANON_PROMQL_RATIO_REPRO_WHOLE: &str = r#"(count(((rate(metric_a{l1="v1",l2="v2",l3=~"v3(|-a|-b)",__schema__="repro_db"}[3m]) / on(l3,l4) group_left metric_b{l6="v6",l1="v1",l2="v2",l3=~"v3(|-a|-b)",__schema__="repro_db"}) > 0.50)) / count(rate(metric_a{l1="v1",l2="v2",l3=~"v3(|-a|-b)",__schema__="repro_db"}[3m]))) * 100"#; + +const ANON_PROMQL_RATIO_REPRO_SCALAR_DIV: &str = + r#"count(rate(metric_a{l1="v1",l2="v2",l3=~"v3(|-a|-b)",__schema__="repro_db"}[3m])) / 2"#; + #[apply(both_instances_cases)] async fn sql_insert_tql_query_ceil(instance: Arc) { let instance = instance.frontend(); @@ -709,3 +808,140 @@ async fn cross_schema_query(instance: Arc) { check_unordered_output_stream(query_output, expected).await; } + +#[apply(both_instances_cases)] +async fn anon_promql_ratio_repro(instance: Arc) { + let ins = instance.frontend(); + + execute_all( + &ins, + &format!("CREATE DATABASE {ANON_PROMQL_RATIO_REPRO_DB}"), + QueryContext::arc(), + ) + .await; + + let repro_ctx: Arc = + QueryContext::with_db_name(Some(ANON_PROMQL_RATIO_REPRO_DB)).into(); + execute_all(&ins, ANON_PROMQL_RATIO_REPRO_CREATE, repro_ctx.clone()).await; + execute_all(&ins, ANON_PROMQL_RATIO_REPRO_INSERT, repro_ctx).await; + + let start = UNIX_EPOCH.checked_add(Duration::from_secs(180)).unwrap(); + let end = UNIX_EPOCH.checked_add(Duration::from_secs(360)).unwrap(); + let interval = Duration::from_secs(180); + let lookback = Duration::from_secs(1); + + let numerator = promql_query_as_batches( + ins.clone(), + ANON_PROMQL_RATIO_REPRO_NUMERATOR, + Some("num".to_string()), + QueryContext::arc(), + start, + end, + interval, + lookback, + ) + .await; + let denominator = promql_query_as_batches( + ins.clone(), + ANON_PROMQL_RATIO_REPRO_DENOMINATOR, + Some("den".to_string()), + QueryContext::arc(), + start, + end, + interval, + lookback, + ) + .await; + let whole = promql_query_as_batches( + ins.clone(), + ANON_PROMQL_RATIO_REPRO_WHOLE, + Some("pct".to_string()), + QueryContext::arc(), + start, + end, + interval, + lookback, + ) + .await; + let scalar_div = promql_query_as_batches( + ins, + ANON_PROMQL_RATIO_REPRO_SCALAR_DIV, + Some("half_den".to_string()), + QueryContext::arc(), + start, + end, + interval, + lookback, + ) + .await; + + let numerator = numerator.iter().collect::>(); + let denominator = denominator.iter().collect::>(); + let whole = whole.iter().collect::>(); + let scalar_div = scalar_div.iter().collect::>(); + + let numerator_values = numerator[0] + .column_by_name("num") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let denominator_values = denominator[0] + .column_by_name("den") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let percentage_values = whole[0] + .column_by_name("pct") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let scalar_div_values = scalar_div[0] + .column_by_name("half_den") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + + assert_eq!(numerator_values.len(), 1, "{}", numerator[0].pretty_print()); + assert_eq!( + denominator_values.len(), + 1, + "{}", + denominator[0].pretty_print() + ); + assert_eq!(percentage_values.len(), 1, "{}", whole[0].pretty_print()); + assert_eq!( + scalar_div_values.len(), + 1, + "{}", + scalar_div[0].pretty_print() + ); + + assert_eq!( + numerator_values.value(0), + 1, + "{}", + numerator[0].pretty_print() + ); + assert_eq!( + denominator_values.value(0), + 3, + "{}", + denominator[0].pretty_print() + ); + assert!( + (scalar_div_values.value(0) - 1.5).abs() < 1e-9, + "{}", + scalar_div[0].pretty_print() + ); + + let expected = 100.0 / 3.0; + assert!( + (percentage_values.value(0) - expected).abs() < 1e-9, + "{}", + whole[0].pretty_print() + ); +}