Mirror of https://github.com/GreptimeTeam/greptimedb.git (synced 2025-12-22 22:20:02 +00:00)
fix range manipulate deserializer
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
@@ -37,7 +37,7 @@ impl fmt::Display for RateFunction {
 
 impl Function for RateFunction {
     fn name(&self) -> &str {
-        "prom_rate"
+        "rate"
     }
 
     fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
@@ -42,7 +42,7 @@ use greptime_proto::substrait_extension as pb;
 use prost::Message;
 use snafu::ResultExt;
 
-use crate::error::{DataFusionPlanningSnafu, DeserializeSnafu, Result};
+use crate::error::{DeserializeSnafu, Result};
 use crate::extension_plan::{Millisecond, METRIC_NUM_SERIES};
 use crate::metrics::PROMQL_SERIES_COUNT;
 use crate::range_array::RangeArray;
@@ -194,20 +194,26 @@ impl RangeManipulate {
 
     pub fn deserialize(bytes: &[u8]) -> Result<Self> {
         let pb_range_manipulate = pb::RangeManipulate::decode(bytes).context(DeserializeSnafu)?;
+        let empty_schema = Arc::new(DFSchema::empty());
         let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
             produce_one_row: false,
-            schema: Arc::new(DFSchema::empty()),
+            schema: empty_schema.clone(),
         });
-        Self::new(
-            pb_range_manipulate.start,
-            pb_range_manipulate.end,
-            pb_range_manipulate.interval,
-            pb_range_manipulate.range,
-            pb_range_manipulate.time_index,
-            pb_range_manipulate.tag_columns,
-            placeholder_plan,
-        )
-        .context(DataFusionPlanningSnafu)
+
+        // Unlike `Self::new()`, this method doesn't check the input schema as it will fail
+        // because the input schema is empty.
+        // But this is Ok since datafusion guarantees to call `with_exprs_and_inputs` for the
+        // deserialized plan.
+        Ok(Self {
+            start: pb_range_manipulate.start,
+            end: pb_range_manipulate.end,
+            interval: pb_range_manipulate.interval,
+            range: pb_range_manipulate.range,
+            time_index: pb_range_manipulate.time_index,
+            field_columns: pb_range_manipulate.tag_columns,
+            input: placeholder_plan,
+            output_schema: empty_schema,
+        })
     }
 }
 
@@ -270,14 +276,19 @@ impl UserDefinedLogicalNodeCore for RangeManipulate {
     fn with_exprs_and_inputs(
         &self,
         _exprs: Vec<Expr>,
-        inputs: Vec<LogicalPlan>,
+        mut inputs: Vec<LogicalPlan>,
     ) -> DataFusionResult<Self> {
-        if inputs.is_empty() {
+        if inputs.len() != 1 {
             return Err(DataFusionError::Internal(
-                "RangeManipulate should have at least one input".to_string(),
+                "RangeManipulate should have at exact one input".to_string(),
             ));
         }
 
+        let input: LogicalPlan = inputs.pop().unwrap();
+        let input_schema = input.schema();
+        let output_schema =
+            Self::calculate_output_schema(input_schema, &self.time_index, &self.field_columns)?;
+
         Ok(Self {
             start: self.start,
             end: self.end,
@@ -285,8 +296,8 @@ impl UserDefinedLogicalNodeCore for RangeManipulate {
             range: self.range,
             time_index: self.time_index.clone(),
             field_columns: self.field_columns.clone(),
-            input: inputs.into_iter().next().unwrap(),
-            output_schema: self.output_schema.clone(),
+            input,
+            output_schema,
         })
     }
 }
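For context, the fix replaces the `Self::new(...)` call (which validates the input schema) with direct struct construction: at deserialization time the input is only a placeholder `EmptyRelation` with an empty schema, and the real output schema is recomputed once DataFusion re-attaches the child plan through `with_exprs_and_inputs`. The standalone sketch below shows the same two-phase pattern in plain Rust, with no DataFusion or GreptimeDB types; names such as `PlanNode`, `Schema`, and `calculate_output_schema` are illustrative stand-ins, not the project's APIs.

// Hypothetical stand-ins for the real plan-node and schema types.
#[derive(Clone, Debug, Default, PartialEq)]
struct Schema(Vec<String>);

#[derive(Clone, Debug)]
struct PlanNode {
    time_index: String,
    field_columns: Vec<String>,
    input_schema: Schema,  // placeholder until the engine supplies the child plan
    output_schema: Schema, // placeholder until `with_inputs` recomputes it
}

impl PlanNode {
    /// Phase 1: decode the wire format without validating the (still unknown) input.
    fn deserialize(time_index: String, field_columns: Vec<String>) -> Self {
        Self {
            time_index,
            field_columns,
            input_schema: Schema::default(),
            output_schema: Schema::default(),
        }
    }

    /// Phase 2: the planner hands back the real input; only now can the
    /// output schema be validated and computed.
    fn with_inputs(&self, input_schema: Schema) -> Result<Self, String> {
        let output_schema =
            Self::calculate_output_schema(&input_schema, &self.time_index, &self.field_columns)?;
        Ok(Self {
            input_schema,
            output_schema,
            ..self.clone()
        })
    }

    fn calculate_output_schema(
        input: &Schema,
        time_index: &str,
        field_columns: &[String],
    ) -> Result<Schema, String> {
        if !input.0.iter().any(|c| c.as_str() == time_index) {
            return Err(format!("time index column `{time_index}` not found"));
        }
        for f in field_columns {
            if !input.0.iter().any(|c| c == f) {
                return Err(format!("field column `{f}` not found"));
            }
        }
        // Loosely mimics the extra range column (e.g. `j_range`) visible in the plans below.
        let mut columns = input.0.clone();
        columns.push(format!("{time_index}_range"));
        Ok(Schema(columns))
    }
}

fn main() {
    // Deserialization succeeds even though no input is attached yet ...
    let node = PlanNode::deserialize("j".to_string(), vec!["i".to_string()]);
    assert_eq!(node.output_schema, Schema::default());

    // ... and the schema check only runs once the real input arrives.
    let resolved = node
        .with_inputs(Schema(vec!["i".into(), "j".into(), "k".into()]))
        .expect("time index present");
    println!("{:?}", resolved.output_schema);
}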
tests/cases/standalone/common/tql/partition.result (new file, 164 lines)
@@ -0,0 +1,164 @@
-- no partition
create table t (
i double,
j timestamp time index,
k string primary key
);

Affected Rows: 0

-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
tql analyze (0, 10, '1s') 100 - (avg by (k) (irate(t[1m])) * 100);

+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_ProjectionExec: expr=[k@0 as k, j@1 as j, 100 - avg(prom_irate(j_range,i))@2 * 100 as Float64(100) - avg(prom_irate(j_range,i)) * Float64(100)] REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_SortPreservingMergeExec: [k@0 ASC NULLS LAST, j@1 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[k@0 ASC NULLS LAST, j@1 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[k@0 as k, j@1 as j], aggr=[avg(prom_irate(j_range,i))] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=Hash([k@0, j@1], 32), input_partitions=32 REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[k@2 as k, j@0 as j], aggr=[avg(prom_irate(j_range,i))] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: prom_irate(j_range,i)@1 IS NOT NULL REDACTED
|_|_|_ProjectionExec: expr=[j@1 as j, prom_irate(j_range@3, i@0) as prom_irate(j_range,i), k@2 as k] REDACTED
|_|_|_PromRangeManipulateExec: req range=[0..10000], interval=[1000], eval range=[60000], time index=[j] REDACTED
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [true] REDACTED
|_|_|_PromSeriesDivideExec: tags=["k"] REDACTED
|_|_|_SortExec: expr=[k@2 ASC], preserve_partitioning=[true] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=Hash([k@2], 32), input_partitions=1 REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|_|_|_|
|_|_| Total rows: 0_|
+-+-+-+

drop table t;

Affected Rows: 0

-- partition on tag
create table t (
i double,
j timestamp time index,
k string,
l string,
primary key (k, l)
) partition on columns (k, l) (k < 'a', k >= 'a');

Affected Rows: 0

-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
tql analyze (0, 10, '1s') 100 - (avg by (k) (irate(t[1m])) * 100);

+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_ProjectionExec: expr=[k@0 as k, j@1 as j, 100 - avg(prom_irate(j_range,i))@2 * 100 as Float64(100) - avg(prom_irate(j_range,i)) * Float64(100)] REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_SortPreservingMergeExec: [k@0 ASC NULLS LAST, j@1 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[k@0 ASC NULLS LAST, j@1 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[k@0 as k, j@1 as j], aggr=[avg(prom_irate(j_range,i))], ordering_mode=PartiallySorted([0]) REDACTED
|_|_|_SortExec: expr=[k@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=Hash([k@0, j@1], 32), input_partitions=32 REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[k@2 as k, j@0 as j], aggr=[avg(prom_irate(j_range,i))], ordering_mode=PartiallySorted([0]) REDACTED
|_|_|_ProjectionExec: expr=[j@0 as j, prom_irate(j_range,i)@1 as prom_irate(j_range,i), k@2 as k] REDACTED
|_|_|_SortExec: expr=[k@2 ASC, l@3 ASC, j@0 ASC], preserve_partitioning=[true] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: prom_irate(j_range,i)@1 IS NOT NULL REDACTED
|_|_|_ProjectionExec: expr=[j@1 as j, prom_irate(j_range@4, i@0) as prom_irate(j_range,i), k@2 as k, l@3 as l] REDACTED
|_|_|_PromRangeManipulateExec: req range=[0..10000], interval=[1000], eval range=[60000], time index=[j] REDACTED
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [true] REDACTED
|_|_|_PromSeriesDivideExec: tags=["k", "l"] REDACTED
|_|_|_SortExec: expr=[k@2 ASC, l@3 ASC], preserve_partitioning=[true] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=Hash([k@2, l@3], 32), input_partitions=1 REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|_|_|_|
| 1_| 1_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: prom_irate(j_range,i)@1 IS NOT NULL REDACTED
|_|_|_ProjectionExec: expr=[j@1 as j, prom_irate(j_range@4, i@0) as prom_irate(j_range,i), k@2 as k, l@3 as l] REDACTED
|_|_|_PromRangeManipulateExec: req range=[0..10000], interval=[1000], eval range=[60000], time index=[j] REDACTED
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [true] REDACTED
|_|_|_PromSeriesDivideExec: tags=["k", "l"] REDACTED
|_|_|_SortExec: expr=[k@2 ASC, l@3 ASC], preserve_partitioning=[true] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=Hash([k@2, l@3], 32), input_partitions=1 REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|_|_|_|
|_|_| Total rows: 0_|
+-+-+-+

drop table t;

Affected Rows: 0

-- partition on value
create table t (
i double,
j timestamp time index,
k string,
l string,
primary key (k, l)
) partition on columns (i) (i < 1.0, i >= 1.0);

Affected Rows: 0

-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
tql analyze (0, 10, '1s') 100 - (avg by (k) (irate(t[1m])) * 100);

+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_ProjectionExec: expr=[k@0 as k, j@1 as j, 100 - avg(prom_irate(j_range,i))@2 * 100 as Float64(100) - avg(prom_irate(j_range,i)) * Float64(100)] REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_SortPreservingMergeExec: [k@0 ASC NULLS LAST, j@1 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[k@0 ASC NULLS LAST, j@1 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[k@0 as k, j@1 as j], aggr=[avg(prom_irate(j_range,i))] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=Hash([k@0, j@1], 32), input_partitions=32 REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[k@2 as k, j@0 as j], aggr=[avg(prom_irate(j_range,i))] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: prom_irate(j_range,i)@1 IS NOT NULL REDACTED
|_|_|_ProjectionExec: expr=[j@1 as j, prom_irate(j_range@4, i@0) as prom_irate(j_range,i), k@2 as k] REDACTED
|_|_|_PromRangeManipulateExec: req range=[0..10000], interval=[1000], eval range=[60000], time index=[j] REDACTED
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [true] REDACTED
|_|_|_PromSeriesDivideExec: tags=["k", "l"] REDACTED
|_|_|_SortExec: expr=[k@2 ASC, l@3 ASC], preserve_partitioning=[true] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=Hash([k@2, l@3], 32), input_partitions=32 REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|_|_|_|
| 1_| 1_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges), distribution=PerSeries REDACTED
|_|_|_|
|_|_| Total rows: 0_|
+-+-+-+

drop table t;

Affected Rows: 0

tests/cases/standalone/common/tql/partition.sql (new file, 54 lines)
@@ -0,0 +1,54 @@
-- no partition
create table t (
i double,
j timestamp time index,
k string primary key
);

-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
tql analyze (0, 10, '1s') 100 - (avg by (k) (irate(t[1m])) * 100);

drop table t;

-- partition on tag
create table t (
i double,
j timestamp time index,
k string,
l string,
primary key (k, l)
) partition on columns (k, l) (k < 'a', k >= 'a');

-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
tql analyze (0, 10, '1s') 100 - (avg by (k) (irate(t[1m])) * 100);

drop table t;

-- partition on value
create table t (
i double,
j timestamp time index,
k string,
l string,
primary key (k, l)
) partition on columns (i) (i < 1.0, i >= 1.0);

-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
tql analyze (0, 10, '1s') 100 - (avg by (k) (irate(t[1m])) * 100);

drop table t;
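The `-- SQLNESS REPLACE <pattern> <replacement>` directives in the test files above are regex rewrites applied to the query output before it is compared against the expected .result file, which is why the recorded plans contain `REDACTED`, collapsed `-` runs, and `_` in place of column padding. The following standalone sketch (assuming the `regex` crate, `regex = "1"` in Cargo.toml) illustrates that effect; the sample plan line is made up for illustration and is not taken from a real run.

use regex::Regex;

// Apply the same patterns, in the same order, as the directives in partition.sql.
fn normalize(line: &str) -> String {
    let rules: [(&str, &str); 6] = [
        (r"metrics.*", "REDACTED"),
        (r"RoundRobinBatch.*", "REDACTED"),
        (r"-+", "-"),
        (r"\s\s+", "_"),
        (r"peers.*", "REDACTED"),
        (r"region=\d+\(\d+,\s+\d+\)", "region=REDACTED"),
    ];
    let mut out = line.to_string();
    for (pattern, replacement) in rules {
        out = Regex::new(pattern)
            .expect("pattern is valid")
            .replace_all(&out, replacement)
            .into_owned();
    }
    out
}

fn main() {
    // A fabricated raw plan line with volatile details (peer ids, metrics, padding).
    let raw = "| 0    | 0    |   MergeScanExec: peers=[5799(0, 1)] metrics=[output_rows: 0] |";
    // Prints: | 0_| 0_|_MergeScanExec: REDACTED
    println!("{}", normalize(raw));
}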