diff --git a/src/common/function/src/scalars/math/rate.rs b/src/common/function/src/scalars/math/rate.rs index e296fb9496..7e5568bff4 100644 --- a/src/common/function/src/scalars/math/rate.rs +++ b/src/common/function/src/scalars/math/rate.rs @@ -37,7 +37,7 @@ impl fmt::Display for RateFunction { impl Function for RateFunction { fn name(&self) -> &str { - "prom_rate" + "rate" } fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result { diff --git a/src/promql/src/extension_plan/range_manipulate.rs b/src/promql/src/extension_plan/range_manipulate.rs index 1e1cac3555..7af75b0458 100644 --- a/src/promql/src/extension_plan/range_manipulate.rs +++ b/src/promql/src/extension_plan/range_manipulate.rs @@ -42,7 +42,7 @@ use greptime_proto::substrait_extension as pb; use prost::Message; use snafu::ResultExt; -use crate::error::{DataFusionPlanningSnafu, DeserializeSnafu, Result}; +use crate::error::{DeserializeSnafu, Result}; use crate::extension_plan::{Millisecond, METRIC_NUM_SERIES}; use crate::metrics::PROMQL_SERIES_COUNT; use crate::range_array::RangeArray; @@ -194,20 +194,26 @@ impl RangeManipulate { pub fn deserialize(bytes: &[u8]) -> Result { let pb_range_manipulate = pb::RangeManipulate::decode(bytes).context(DeserializeSnafu)?; + let empty_schema = Arc::new(DFSchema::empty()); let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, - schema: Arc::new(DFSchema::empty()), + schema: empty_schema.clone(), }); - Self::new( - pb_range_manipulate.start, - pb_range_manipulate.end, - pb_range_manipulate.interval, - pb_range_manipulate.range, - pb_range_manipulate.time_index, - pb_range_manipulate.tag_columns, - placeholder_plan, - ) - .context(DataFusionPlanningSnafu) + + // Unlike `Self::new()`, this method doesn't check the input schema as it will fail + // because the input schema is empty. + // But this is Ok since datafusion guarantees to call `with_exprs_and_inputs` for the + // deserialized plan. 
+ Ok(Self { + start: pb_range_manipulate.start, + end: pb_range_manipulate.end, + interval: pb_range_manipulate.interval, + range: pb_range_manipulate.range, + time_index: pb_range_manipulate.time_index, + field_columns: pb_range_manipulate.tag_columns, + input: placeholder_plan, + output_schema: empty_schema, + }) } } @@ -270,14 +276,19 @@ impl UserDefinedLogicalNodeCore for RangeManipulate { fn with_exprs_and_inputs( &self, _exprs: Vec, - inputs: Vec, + mut inputs: Vec, ) -> DataFusionResult { - if inputs.is_empty() { + if inputs.len() != 1 { return Err(DataFusionError::Internal( - "RangeManipulate should have at least one input".to_string(), + "RangeManipulate should have exactly one input".to_string(), )); } + let input: LogicalPlan = inputs.pop().unwrap(); + let input_schema = input.schema(); + let output_schema = + Self::calculate_output_schema(input_schema, &self.time_index, &self.field_columns)?; + Ok(Self { start: self.start, end: self.end, @@ -285,8 +296,8 @@ impl UserDefinedLogicalNodeCore for RangeManipulate { range: self.range, time_index: self.time_index.clone(), field_columns: self.field_columns.clone(), - input: inputs.into_iter().next().unwrap(), - output_schema: self.output_schema.clone(), + input, + output_schema, }) } } diff --git a/tests/cases/standalone/common/tql/partition.result b/tests/cases/standalone/common/tql/partition.result new file mode 100644 index 0000000000..ebaf11129f --- /dev/null +++ b/tests/cases/standalone/common/tql/partition.result @@ -0,0 +1,164 @@ +-- no partition +create table t ( + i double, + j timestamp time index, + k string primary key +); + +Affected Rows: 0 + +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +tql analyze (0, 10, '1s') 100 - (avg by (k) (irate(t[1m])) * 100); + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ 
+| 0_| 0_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_ProjectionExec: expr=[k@0 as k, j@1 as j, 100 - avg(prom_irate(j_range,i))@2 * 100 as Float64(100) - avg(prom_irate(j_range,i)) * Float64(100)] REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED +|_|_|_SortPreservingMergeExec: [k@0 ASC NULLS LAST, j@1 ASC NULLS LAST] REDACTED +|_|_|_SortExec: expr=[k@0 ASC NULLS LAST, j@1 ASC NULLS LAST], preserve_partitioning=[true] REDACTED +|_|_|_AggregateExec: mode=FinalPartitioned, gby=[k@0 as k, j@1 as j], aggr=[avg(prom_irate(j_range,i))] REDACTED +|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_RepartitionExec: partitioning=Hash([k@0, j@1], 32), input_partitions=32 REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[k@2 as k, j@0 as j], aggr=[avg(prom_irate(j_range,i))] REDACTED +|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_FilterExec: prom_irate(j_range,i)@1 IS NOT NULL REDACTED +|_|_|_ProjectionExec: expr=[j@1 as j, prom_irate(j_range@3, i@0) as prom_irate(j_range,i), k@2 as k] REDACTED +|_|_|_PromRangeManipulateExec: req range=[0..10000], interval=[1000], eval range=[60000], time index=[j] REDACTED +|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [true] REDACTED +|_|_|_PromSeriesDivideExec: tags=["k"] REDACTED +|_|_|_SortExec: expr=[k@2 ASC], preserve_partitioning=[true] REDACTED +|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_RepartitionExec: partitioning=Hash([k@2], 32), input_partitions=1 REDACTED +|_|_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED +|_|_|_| +|_|_| Total rows: 0_| ++-+-+-+ + +drop table t; + +Affected Rows: 0 + +-- partition on tag +create table t ( + i double, + j timestamp time index, + k string, + l string, + primary key (k, l) +) partition on columns (k, l) (k < 'a', k >= 'a'); + +Affected Rows: 0 + +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- 
SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +tql analyze (0, 10, '1s') 100 - (avg by (k) (irate(t[1m])) * 100); + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_ProjectionExec: expr=[k@0 as k, j@1 as j, 100 - avg(prom_irate(j_range,i))@2 * 100 as Float64(100) - avg(prom_irate(j_range,i)) * Float64(100)] REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED +|_|_|_SortPreservingMergeExec: [k@0 ASC NULLS LAST, j@1 ASC NULLS LAST] REDACTED +|_|_|_SortExec: expr=[k@0 ASC NULLS LAST, j@1 ASC NULLS LAST], preserve_partitioning=[true] REDACTED +|_|_|_AggregateExec: mode=FinalPartitioned, gby=[k@0 as k, j@1 as j], aggr=[avg(prom_irate(j_range,i))], ordering_mode=PartiallySorted([0]) REDACTED +|_|_|_SortExec: expr=[k@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED +|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_RepartitionExec: partitioning=Hash([k@0, j@1], 32), input_partitions=32 REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[k@2 as k, j@0 as j], aggr=[avg(prom_irate(j_range,i))], ordering_mode=PartiallySorted([0]) REDACTED +|_|_|_ProjectionExec: expr=[j@0 as j, prom_irate(j_range,i)@1 as prom_irate(j_range,i), k@2 as k] REDACTED +|_|_|_SortExec: expr=[k@2 ASC, l@3 ASC, j@0 ASC], preserve_partitioning=[true] REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_FilterExec: prom_irate(j_range,i)@1 IS NOT NULL REDACTED +|_|_|_ProjectionExec: expr=[j@1 as j, prom_irate(j_range@4, i@0) as prom_irate(j_range,i), k@2 as k, l@3 as l] REDACTED +|_|_|_PromRangeManipulateExec: req range=[0..10000], interval=[1000], eval range=[60000], time index=[j] REDACTED +|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [true] REDACTED +|_|_|_PromSeriesDivideExec: tags=["k", "l"] REDACTED +|_|_|_SortExec: expr=[k@2 ASC, l@3 ASC], preserve_partitioning=[true] 
REDACTED +|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_RepartitionExec: partitioning=Hash([k@2, l@3], 32), input_partitions=1 REDACTED +|_|_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED +|_|_|_| +| 1_| 1_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_FilterExec: prom_irate(j_range,i)@1 IS NOT NULL REDACTED +|_|_|_ProjectionExec: expr=[j@1 as j, prom_irate(j_range@4, i@0) as prom_irate(j_range,i), k@2 as k, l@3 as l] REDACTED +|_|_|_PromRangeManipulateExec: req range=[0..10000], interval=[1000], eval range=[60000], time index=[j] REDACTED +|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [true] REDACTED +|_|_|_PromSeriesDivideExec: tags=["k", "l"] REDACTED +|_|_|_SortExec: expr=[k@2 ASC, l@3 ASC], preserve_partitioning=[true] REDACTED +|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_RepartitionExec: partitioning=Hash([k@2, l@3], 32), input_partitions=1 REDACTED +|_|_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED +|_|_|_| +|_|_| Total rows: 0_| ++-+-+-+ + +drop table t; + +Affected Rows: 0 + +-- partition on value +create table t ( + i double, + j timestamp time index, + k string, + l string, + primary key (k, l) +) partition on columns (i) (i < 1.0, i >= 1.0); + +Affected Rows: 0 + +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +tql analyze (0, 10, '1s') 100 - (avg by (k) (irate(t[1m])) * 100); + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_ProjectionExec: expr=[k@0 as k, j@1 as j, 100 - avg(prom_irate(j_range,i))@2 * 100 as Float64(100) - avg(prom_irate(j_range,i)) * Float64(100)] REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED 
+|_|_|_SortPreservingMergeExec: [k@0 ASC NULLS LAST, j@1 ASC NULLS LAST] REDACTED +|_|_|_SortExec: expr=[k@0 ASC NULLS LAST, j@1 ASC NULLS LAST], preserve_partitioning=[true] REDACTED +|_|_|_AggregateExec: mode=FinalPartitioned, gby=[k@0 as k, j@1 as j], aggr=[avg(prom_irate(j_range,i))] REDACTED +|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_RepartitionExec: partitioning=Hash([k@0, j@1], 32), input_partitions=32 REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[k@2 as k, j@0 as j], aggr=[avg(prom_irate(j_range,i))] REDACTED +|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_FilterExec: prom_irate(j_range,i)@1 IS NOT NULL REDACTED +|_|_|_ProjectionExec: expr=[j@1 as j, prom_irate(j_range@4, i@0) as prom_irate(j_range,i), k@2 as k] REDACTED +|_|_|_PromRangeManipulateExec: req range=[0..10000], interval=[1000], eval range=[60000], time index=[j] REDACTED +|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [true] REDACTED +|_|_|_PromSeriesDivideExec: tags=["k", "l"] REDACTED +|_|_|_SortExec: expr=[k@2 ASC, l@3 ASC], preserve_partitioning=[true] REDACTED +|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_RepartitionExec: partitioning=Hash([k@2, l@3], 32), input_partitions=32 REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED +|_|_|_| +| 1_| 1_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED +|_|_|_| +|_|_| Total rows: 0_| ++-+-+-+ + +drop table t; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/tql/partition.sql b/tests/cases/standalone/common/tql/partition.sql new file mode 100644 index 0000000000..442c0a34c8 --- /dev/null +++ b/tests/cases/standalone/common/tql/partition.sql @@ -0,0 +1,54 @@ +-- no partition +create table t ( + i double, + j timestamp time index, + k string primary key +); + 
+-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +tql analyze (0, 10, '1s') 100 - (avg by (k) (irate(t[1m])) * 100); + +drop table t; + +-- partition on tag +create table t ( + i double, + j timestamp time index, + k string, + l string, + primary key (k, l) +) partition on columns (k, l) (k < 'a', k >= 'a'); + +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +tql analyze (0, 10, '1s') 100 - (avg by (k) (irate(t[1m])) * 100); + +drop table t; + +-- partition on value +create table t ( + i double, + j timestamp time index, + k string, + l string, + primary key (k, l) +) partition on columns (i) (i < 1.0, i >= 1.0); + +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +tql analyze (0, 10, '1s') 100 - (avg by (k) (irate(t[1m])) * 100); + +drop table t;