diff --git a/Cargo.lock b/Cargo.lock
index d15f70b631..14f8089da7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5328,7 +5328,7 @@ dependencies = [
 [[package]]
 name = "greptime-proto"
 version = "0.1.0"
-source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=69a6089933daa573c96808ec4bbc48f447ec6e8c#69a6089933daa573c96808ec4bbc48f447ec6e8c"
+source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=72a0d22e0f5f716b2ee21bca091f87a88c36e5ca#72a0d22e0f5f716b2ee21bca091f87a88c36e5ca"
 dependencies = [
  "prost 0.13.5",
  "prost-types 0.13.5",
diff --git a/Cargo.toml b/Cargo.toml
index f500f70b0e..50e286195c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -147,7 +147,7 @@ etcd-client = { git = "https://github.com/GreptimeTeam/etcd-client", rev = "f62d
 fst = "0.4.7"
 futures = "0.3"
 futures-util = "0.3"
-greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "69a6089933daa573c96808ec4bbc48f447ec6e8c" }
+greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "72a0d22e0f5f716b2ee21bca091f87a88c36e5ca" }
 hex = "0.4"
 http = "1"
 humantime = "2.1"
diff --git a/src/promql/src/extension_plan.rs b/src/promql/src/extension_plan.rs
index 7aaeb83484..a72ddbeaaf 100644
--- a/src/promql/src/extension_plan.rs
+++ b/src/promql/src/extension_plan.rs
@@ -27,6 +27,8 @@ mod union_distinct_on;
 pub use absent::{Absent, AbsentExec, AbsentStream};
 use datafusion::arrow::datatypes::{ArrowPrimitiveType, TimestampMillisecondType};
+use datafusion::common::DFSchemaRef;
+use datafusion::error::{DataFusionError, Result as DataFusionResult};
 pub use empty_metric::{EmptyMetric, EmptyMetricExec, EmptyMetricStream, build_special_time_expr};
 pub use histogram_fold::{HistogramFold, HistogramFoldExec, HistogramFoldStream};
 pub use instant_manipulate::{InstantManipulate, InstantManipulateExec, InstantManipulateStream};
@@ -40,3 +42,44 @@ pub use union_distinct_on::{UnionDistinctOn, UnionDistinctOnExec, UnionDistinctOnStream};
 pub type Millisecond = <TimestampMillisecondType as ArrowPrimitiveType>::Native;
 
 const METRIC_NUM_SERIES: &str = "num_series";
+
+// Utilities for handling the "unfix" logic in extension plans.
+/// Convert a column name to its index for serialization.
+pub fn serialize_column_index(schema: &DFSchemaRef, column_name: &str) -> u64 {
+    schema
+        .index_of_column_by_name(None, column_name)
+        .map(|idx| idx as u64)
+        .unwrap_or(u64::MAX) // if the column is not found, this surfaces as an error during deserialization
+}
+
+/// Convert an index back to a column name for deserialization.
+pub fn resolve_column_name(
+    index: u64,
+    schema: &DFSchemaRef,
+    context: &str,
+    column_type: &str,
+) -> DataFusionResult<String> {
+    let columns = schema.columns();
+    columns
+        .get(index as usize)
+        .ok_or_else(|| {
+            DataFusionError::Internal(format!(
+                "Failed to get {} column at idx {} during unfixing {} with columns: {:?}",
+                column_type, index, context, columns
+            ))
+        })
+        .map(|field| field.name().to_string())
+}
+
+/// Batch-resolve multiple column indices to names.
+pub fn resolve_column_names(
+    indices: &[u64],
+    schema: &DFSchemaRef,
+    context: &str,
+    column_type: &str,
+) -> DataFusionResult<Vec<String>> {
+    indices
+        .iter()
+        .map(|idx| resolve_column_name(*idx, schema, context, column_type))
+        .collect()
+}
diff --git a/src/promql/src/extension_plan/absent.rs b/src/promql/src/extension_plan/absent.rs
index 843f9a468f..2c01a6f570 100644
--- a/src/promql/src/extension_plan/absent.rs
+++ b/src/promql/src/extension_plan/absent.rs
@@ -47,7 +47,7 @@ use prost::Message;
 use snafu::ResultExt;
 
 use crate::error::DeserializeSnafu;
-use crate::extension_plan::Millisecond;
+use crate::extension_plan::{Millisecond, resolve_column_name, serialize_column_index};
 
 /// Maximum number of rows per output batch
 const ABSENT_BATCH_SIZE: usize = 8192;
@@ -62,6 +62,13 @@ pub struct Absent {
     fake_labels: Vec<(String, String)>,
     input: LogicalPlan,
     output_schema: DFSchemaRef,
+    unfix: Option<UnfixIndices>,
+}
+
+#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
+struct UnfixIndices {
+    pub time_index_column_idx: u64,
+    pub value_column_idx: u64,
 }
 
 impl PartialOrd for Absent {
@@ -122,16 +129,44 @@ impl UserDefinedLogicalNodeCore for Absent {
             ));
         }
 
-        Ok(Self {
-            start: self.start,
-            end: self.end,
-            step: self.step,
-            time_index_column: self.time_index_column.clone(),
-            value_column: self.value_column.clone(),
-            fake_labels: self.fake_labels.clone(),
-            input: inputs[0].clone(),
-            output_schema: self.output_schema.clone(),
-        })
+        let input: LogicalPlan = inputs[0].clone();
+        let input_schema = input.schema();
+
+        if let Some(unfix) = &self.unfix {
+            // transform indices to names
+            let time_index_column = resolve_column_name(
+                unfix.time_index_column_idx,
+                input_schema,
+                "Absent",
+                "time index",
+            )?;
+
+            let value_column =
+                resolve_column_name(unfix.value_column_idx, input_schema, "Absent", "value")?;
+
+            // Recreate output schema with actual field names
+            Self::try_new(
+                self.start,
+                self.end,
+                self.step,
+                time_index_column,
+                value_column,
+                self.fake_labels.clone(),
+                input,
+            )
+        } else {
+            Ok(Self {
+                start: self.start,
+                end: self.end,
+                step: self.step,
+                time_index_column: self.time_index_column.clone(),
+                value_column: self.value_column.clone(),
+                fake_labels: self.fake_labels.clone(),
+                input,
+                output_schema: self.output_schema.clone(),
+                unfix: None,
+            })
+        }
     }
 }
@@ -179,6 +214,7 @@ impl Absent {
             fake_labels,
             input,
             output_schema,
+            unfix: None,
         })
     }
@@ -209,12 +245,17 @@ impl Absent {
     }
 
     pub fn serialize(&self) -> Vec<u8> {
+        let time_index_column_idx =
+            serialize_column_index(self.input.schema(), &self.time_index_column);
+
+        let value_column_idx = serialize_column_index(self.input.schema(), &self.value_column);
+
         pb::Absent {
             start: self.start,
             end: self.end,
             step: self.step,
-            time_index_column: self.time_index_column.clone(),
-            value_column: self.value_column.clone(),
+            time_index_column_idx,
+            value_column_idx,
             fake_labels: self
                 .fake_labels
                 .iter()
                 .map(|(key, value)| pb::FakeLabel {
                     key: key.clone(),
                     value: value.clone(),
                 })
                 .collect(),
+            ..Default::default()
         }
         .encode_to_vec()
     }
@@ -233,19 +275,27 @@ impl Absent {
             produce_one_row: false,
             schema: Arc::new(DFSchema::empty()),
         });
-        Self::try_new(
-            pb_absent.start,
-            pb_absent.end,
-            pb_absent.step,
-            pb_absent.time_index_column,
-            pb_absent.value_column,
-            pb_absent
+
+        let unfix = UnfixIndices {
+            time_index_column_idx: pb_absent.time_index_column_idx,
+            value_column_idx: pb_absent.value_column_idx,
+        };
+
+        Ok(Self {
+            start: pb_absent.start,
+            end: pb_absent.end,
+            step: pb_absent.step,
+            time_index_column: String::new(),
+            value_column: String::new(),
+            fake_labels: pb_absent
                 .fake_labels
                 .iter()
                 .map(|label| (label.key.clone(), label.value.clone()))
                 .collect(),
-            placeholder_plan,
-        )
+            input: placeholder_plan,
+            output_schema: Arc::new(DFSchema::empty()),
+            unfix: Some(unfix),
+        })
     }
 }
diff --git a/src/promql/src/extension_plan/instant_manipulate.rs b/src/promql/src/extension_plan/instant_manipulate.rs
index 110a926399..aa4cd6d184 100644
--- a/src/promql/src/extension_plan/instant_manipulate.rs
+++ b/src/promql/src/extension_plan/instant_manipulate.rs
@@ -41,7 +41,9 @@ use prost::Message;
 use snafu::ResultExt;
 
 use crate::error::{DeserializeSnafu, Result};
-use crate::extension_plan::{METRIC_NUM_SERIES, Millisecond};
+use crate::extension_plan::{
+    METRIC_NUM_SERIES, Millisecond, resolve_column_name, serialize_column_index,
+};
 use crate::metrics::PROMQL_SERIES_COUNT;
 
 /// Manipulate the input record batch to make it suitable for Instant Operator.
@@ -59,6 +61,13 @@ pub struct InstantManipulate {
     /// A optional column for validating staleness
     field_column: Option<String>,
     input: LogicalPlan,
+    unfix: Option<UnfixIndices>,
+}
+
+#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
+struct UnfixIndices {
+    pub time_index_idx: u64,
+    pub field_index_idx: u64,
 }
 
 impl UserDefinedLogicalNodeCore for InstantManipulate {
@@ -97,15 +106,51 @@ impl UserDefinedLogicalNodeCore for InstantManipulate {
             ));
         }
 
-        Ok(Self {
-            start: self.start,
-            end: self.end,
-            lookback_delta: self.lookback_delta,
-            interval: self.interval,
-            time_index_column: self.time_index_column.clone(),
-            field_column: self.field_column.clone(),
-            input: inputs.into_iter().next().unwrap(),
-        })
+        let input: LogicalPlan = inputs.into_iter().next().unwrap();
+        let input_schema = input.schema();
+
+        if let Some(unfix) = &self.unfix {
+            // transform indices to names
+            let time_index_column = resolve_column_name(
+                unfix.time_index_idx,
+                input_schema,
+                "InstantManipulate",
+                "time index",
+            )?;
+
+            let field_column = if unfix.field_index_idx == u64::MAX {
+                None
+            } else {
+                Some(resolve_column_name(
+                    unfix.field_index_idx,
+                    input_schema,
+                    "InstantManipulate",
+                    "field",
+                )?)
+            };
+
+            Ok(Self {
+                start: self.start,
+                end: self.end,
+                lookback_delta: self.lookback_delta,
+                interval: self.interval,
+                time_index_column,
+                field_column,
+                input,
+                unfix: None,
+            })
+        } else {
+            Ok(Self {
+                start: self.start,
+                end: self.end,
+                lookback_delta: self.lookback_delta,
+                interval: self.interval,
+                time_index_column: self.time_index_column.clone(),
+                field_column: self.field_column.clone(),
+                input,
+                unfix: None,
+            })
+        }
     }
 }
@@ -127,6 +172,7 @@ impl InstantManipulate {
             time_index_column,
             field_column,
             input,
+            unfix: None,
         }
     }
@@ -148,13 +194,22 @@ impl InstantManipulate {
     }
 
     pub fn serialize(&self) -> Vec<u8> {
+        let time_index_idx = serialize_column_index(self.input.schema(), &self.time_index_column);
+
+        let field_index_idx = self
+            .field_column
+            .as_ref()
+            .map(|name| serialize_column_index(self.input.schema(), name))
+            .unwrap_or(u64::MAX);
+
         pb::InstantManipulate {
             start: self.start,
             end: self.end,
             interval: self.interval,
             lookback_delta: self.lookback_delta,
-            time_index: self.time_index_column.clone(),
-            field_index: self.field_column.clone().unwrap_or_default(),
+            time_index_idx,
+            field_index_idx,
+            ..Default::default()
         }
         .encode_to_vec()
     }
@@ -166,19 +221,21 @@ impl InstantManipulate {
             produce_one_row: false,
             schema: Arc::new(DFSchema::empty()),
         });
-        let field_column = if pb_instant_manipulate.field_index.is_empty() {
-            None
-        } else {
-            Some(pb_instant_manipulate.field_index)
+
+        let unfix = UnfixIndices {
+            time_index_idx: pb_instant_manipulate.time_index_idx,
+            field_index_idx: pb_instant_manipulate.field_index_idx,
         };
+
         Ok(Self {
             start: pb_instant_manipulate.start,
             end: pb_instant_manipulate.end,
             lookback_delta: pb_instant_manipulate.lookback_delta,
             interval: pb_instant_manipulate.interval,
-            time_index_column: pb_instant_manipulate.time_index,
-            field_column,
+            time_index_column: String::new(),
+            field_column: None,
             input: placeholder_plan,
+            unfix: Some(unfix),
         })
     }
 }
diff --git a/src/promql/src/extension_plan/normalize.rs b/src/promql/src/extension_plan/normalize.rs
index eddb60f000..ccd21a9cd7 100644
--- a/src/promql/src/extension_plan/normalize.rs
+++ b/src/promql/src/extension_plan/normalize.rs
@@ -40,7 +40,9 @@ use prost::Message;
 use snafu::ResultExt;
 
 use crate::error::{DeserializeSnafu, Result};
-use crate::extension_plan::{METRIC_NUM_SERIES, Millisecond};
+use crate::extension_plan::{
+    METRIC_NUM_SERIES, Millisecond, resolve_column_name, serialize_column_index,
+};
 use crate::metrics::PROMQL_SERIES_COUNT;
 
 /// Normalize the input record batch. Notice that for simplicity, this method assumes
@@ -58,6 +60,13 @@ pub struct SeriesNormalize {
     tag_columns: Vec<String>,
 
     input: LogicalPlan,
+    unfix: Option<UnfixIndices>,
+}
+
+#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
+struct UnfixIndices {
+    pub time_index_idx: u64,
+    pub tag_column_indices: Vec<u64>,
 }
 
 impl UserDefinedLogicalNodeCore for SeriesNormalize {
@@ -96,13 +105,42 @@ impl UserDefinedLogicalNodeCore for SeriesNormalize {
             ));
         }
 
-        Ok(Self {
-            offset: self.offset,
-            time_index_column_name: self.time_index_column_name.clone(),
-            need_filter_out_nan: self.need_filter_out_nan,
-            input: inputs.into_iter().next().unwrap(),
-            tag_columns: self.tag_columns.clone(),
-        })
+        let input: LogicalPlan = inputs.into_iter().next().unwrap();
+        let input_schema = input.schema();
+
+        if let Some(unfix) = &self.unfix {
+            // transform indices to names
+            let time_index_column_name = resolve_column_name(
+                unfix.time_index_idx,
+                input_schema,
+                "SeriesNormalize",
+                "time index",
+            )?;
+
+            let tag_columns = unfix
+                .tag_column_indices
+                .iter()
+                .map(|idx| resolve_column_name(*idx, input_schema, "SeriesNormalize", "tag"))
+                .collect::<DataFusionResult<Vec<String>>>()?;
+
+            Ok(Self {
+                offset: self.offset,
+                time_index_column_name,
+                need_filter_out_nan: self.need_filter_out_nan,
+                tag_columns,
+                input,
+                unfix: None,
+            })
+        } else {
+            Ok(Self {
+                offset: self.offset,
+                time_index_column_name: self.time_index_column_name.clone(),
+                need_filter_out_nan: self.need_filter_out_nan,
+                tag_columns: self.tag_columns.clone(),
+                input,
+                unfix: None,
+            })
+        }
     }
 }
@@ -120,6 +158,7 @@ impl SeriesNormalize {
             need_filter_out_nan,
             tag_columns,
             input,
+            unfix: None,
         }
     }
@@ -139,11 +178,21 @@ impl SeriesNormalize {
     }
 
     pub fn serialize(&self) -> Vec<u8> {
+        let time_index_idx =
+            serialize_column_index(self.input.schema(), &self.time_index_column_name);
+
+        let tag_column_indices = self
+            .tag_columns
+            .iter()
+            .map(|name| serialize_column_index(self.input.schema(), name))
+            .collect::<Vec<_>>();
+
         pb::SeriesNormalize {
             offset: self.offset,
-            time_index: self.time_index_column_name.clone(),
+            time_index_idx,
             filter_nan: self.need_filter_out_nan,
-            tag_columns: self.tag_columns.clone(),
+            tag_column_indices,
+            ..Default::default()
         }
         .encode_to_vec()
     }
@@ -154,13 +203,20 @@ impl SeriesNormalize {
             produce_one_row: false,
             schema: Arc::new(DFSchema::empty()),
         });
-        Ok(Self::new(
-            pb_normalize.offset,
-            pb_normalize.time_index,
-            pb_normalize.filter_nan,
-            pb_normalize.tag_columns,
-            placeholder_plan,
-        ))
+
+        let unfix = UnfixIndices {
+            time_index_idx: pb_normalize.time_index_idx,
+            tag_column_indices: pb_normalize.tag_column_indices.clone(),
+        };
+
+        Ok(Self {
+            offset: pb_normalize.offset,
+            time_index_column_name: String::new(),
+            need_filter_out_nan: pb_normalize.filter_nan,
+            tag_columns: Vec::new(),
+            input: placeholder_plan,
+            unfix: Some(unfix),
+        })
     }
 }
diff --git a/src/promql/src/extension_plan/range_manipulate.rs b/src/promql/src/extension_plan/range_manipulate.rs
index 1e18e34cd1..540fa4c174 100644
--- a/src/promql/src/extension_plan/range_manipulate.rs
+++ b/src/promql/src/extension_plan/range_manipulate.rs
@@ -18,6 +18,7 @@ use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
+use common_telemetry::debug;
 use datafusion::arrow::array::{Array, ArrayRef, Int64Array, TimestampMillisecondArray};
 use datafusion::arrow::compute;
 use datafusion::arrow::datatypes::{Field, SchemaRef};
@@ -43,7 +44,9 @@ use prost::Message;
 use snafu::ResultExt;
 
 use crate::error::{DeserializeSnafu, Result};
-use crate::extension_plan::{METRIC_NUM_SERIES, Millisecond};
+use crate::extension_plan::{
+    METRIC_NUM_SERIES, Millisecond, resolve_column_name, serialize_column_index,
+};
 use crate::metrics::PROMQL_SERIES_COUNT;
 use crate::range_array::RangeArray;
 
@@ -62,11 +65,18 @@ pub struct RangeManipulate {
     start: Millisecond,
     end: Millisecond,
     interval: Millisecond,
     range: Millisecond,
     time_index: String,
     field_columns: Vec<String>,
 
     input: LogicalPlan,
     output_schema: DFSchemaRef,
+    unfix: Option<UnfixIndices>,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+struct UnfixIndices {
+    pub time_index_idx: u64,
+    pub tag_column_indices: Vec<u64>,
 }
 
 impl RangeManipulate {
@@ -90,6 +99,7 @@ impl RangeManipulate {
             field_columns,
             input,
             output_schema,
+            unfix: None,
         })
     }
@@ -181,13 +191,22 @@ impl RangeManipulate {
     }
 
     pub fn serialize(&self) -> Vec<u8> {
+        let time_index_idx = serialize_column_index(self.input.schema(), &self.time_index);
+
+        let tag_column_indices = self
+            .field_columns
+            .iter()
+            .map(|name| serialize_column_index(self.input.schema(), name))
+            .collect::<Vec<_>>();
+
         pb::RangeManipulate {
             start: self.start,
             end: self.end,
             interval: self.interval,
             range: self.range,
-            time_index: self.time_index.clone(),
-            tag_columns: self.field_columns.clone(),
+            time_index_idx,
+            tag_column_indices,
+            ..Default::default()
         }
         .encode_to_vec()
     }
@@ -200,6 +219,12 @@ impl RangeManipulate {
             schema: empty_schema.clone(),
         });
 
+        let unfix = UnfixIndices {
+            time_index_idx: pb_range_manipulate.time_index_idx,
+            tag_column_indices: pb_range_manipulate.tag_column_indices.clone(),
+        };
+        debug!("RangeManipulate deserialize unfix: {:?}", unfix);
+
         // Unlike `Self::new()`, this method doesn't check the input schema as it will fail
         // because the input schema is empty.
         // But this is Ok since datafusion guarantees to call `with_exprs_and_inputs` for the
@@ -209,10 +234,11 @@ impl RangeManipulate {
         Ok(Self {
             start: pb_range_manipulate.start,
             end: pb_range_manipulate.end,
             interval: pb_range_manipulate.interval,
             range: pb_range_manipulate.range,
-            time_index: pb_range_manipulate.time_index,
-            field_columns: pb_range_manipulate.tag_columns,
+            time_index: String::new(),
+            field_columns: Vec::new(),
             input: placeholder_plan,
             output_schema: empty_schema,
+            unfix: Some(unfix),
         })
     }
 }
@@ -286,19 +312,52 @@ impl UserDefinedLogicalNodeCore for RangeManipulate {
         let input: LogicalPlan = inputs.pop().unwrap();
         let input_schema = input.schema();
 
-        let output_schema =
-            Self::calculate_output_schema(input_schema, &self.time_index, &self.field_columns)?;
-        Ok(Self {
-            start: self.start,
-            end: self.end,
-            interval: self.interval,
-            range: self.range,
-            time_index: self.time_index.clone(),
-            field_columns: self.field_columns.clone(),
-            input,
-            output_schema,
-        })
+        if let Some(unfix) = &self.unfix {
+            // transform indices to names
+            let time_index = resolve_column_name(
+                unfix.time_index_idx,
+                input_schema,
+                "RangeManipulate",
+                "time index",
+            )?;
+
+            let field_columns = unfix
+                .tag_column_indices
+                .iter()
+                .map(|idx| resolve_column_name(*idx, input_schema, "RangeManipulate", "tag"))
+                .collect::<DataFusionResult<Vec<String>>>()?;
+
+            let output_schema =
+                Self::calculate_output_schema(input_schema, &time_index, &field_columns)?;
+
+            Ok(Self {
+                start: self.start,
+                end: self.end,
+                interval: self.interval,
+                range: self.range,
+                time_index,
+                field_columns,
+                input,
+                output_schema,
+                unfix: None,
+            })
+        } else {
+            let output_schema =
+                Self::calculate_output_schema(input_schema, &self.time_index, &self.field_columns)?;
+
+            Ok(Self {
+                start: self.start,
+                end: self.end,
+                interval: self.interval,
+                range: self.range,
+                time_index: self.time_index.clone(),
+                field_columns: self.field_columns.clone(),
+                input,
+                output_schema,
+                unfix: None,
+            })
+        }
     }
 }
diff --git a/src/promql/src/extension_plan/scalar_calculate.rs b/src/promql/src/extension_plan/scalar_calculate.rs
index 04768053fb..8619e79387 100644
--- a/src/promql/src/extension_plan/scalar_calculate.rs
+++ b/src/promql/src/extension_plan/scalar_calculate.rs
@@ -41,7 +41,7 @@ use prost::Message;
 use snafu::ResultExt;
 
 use crate::error::{ColumnNotFoundSnafu, DataFusionPlanningSnafu, DeserializeSnafu, Result};
-use crate::extension_plan::Millisecond;
+use crate::extension_plan::{Millisecond, resolve_column_name, serialize_column_index};
 
 /// `ScalarCalculate` is the custom logical plan to calculate
 /// [`scalar`](https://prometheus.io/docs/prometheus/latest/querying/functions/#scalar)
@@ -59,6 +59,14 @@ pub struct ScalarCalculate {
     field_column: String,
     input: LogicalPlan,
     output_schema: DFSchemaRef,
+    unfix: Option<UnfixIndices>,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd)]
+struct UnfixIndices {
+    pub time_index_idx: u64,
+    pub tag_column_indices: Vec<u64>,
+    pub field_column_idx: u64,
 }
 
 impl ScalarCalculate {
@@ -101,6 +109,7 @@ impl ScalarCalculate {
             field_column: field_column.to_string(),
             input,
             output_schema: Arc::new(schema),
+            unfix: None,
         })
     }
@@ -149,13 +158,24 @@ impl ScalarCalculate {
     }
 
     pub fn serialize(&self) -> Vec<u8> {
+        let time_index_idx = serialize_column_index(self.input.schema(), &self.time_index);
+
+        let tag_column_indices = self
+            .tag_columns
+            .iter()
+            .map(|name| serialize_column_index(self.input.schema(), name))
+            .collect::<Vec<_>>();
+
+        let field_column_idx = serialize_column_index(self.input.schema(), &self.field_column);
+
         pb::ScalarCalculate {
             start: self.start,
             end: self.end,
             interval: self.interval,
-            time_index: self.time_index.clone(),
-            tag_columns: self.tag_columns.clone(),
-            field_column: self.field_column.clone(),
+            time_index_idx,
+            tag_column_indices,
+            field_column_idx,
+            ..Default::default()
         }
         .encode_to_vec()
     }
@@ -166,17 +186,20 @@ impl ScalarCalculate {
             produce_one_row: false,
             schema: Arc::new(DFSchema::empty()),
         });
+
+        let unfix = UnfixIndices {
+            time_index_idx: pb_scalar_calculate.time_index_idx,
+            tag_column_indices: pb_scalar_calculate.tag_column_indices.clone(),
+            field_column_idx: pb_scalar_calculate.field_column_idx,
+        };
+
         // TODO(Taylor-lagrange): Supports timestamps of different precisions
         let ts_field = Field::new(
-            &pb_scalar_calculate.time_index,
+            "placeholder_time_index",
             DataType::Timestamp(TimeUnit::Millisecond, None),
             true,
         );
-        let val_field = Field::new(
-            format!("scalar({})", pb_scalar_calculate.field_column),
-            DataType::Float64,
-            true,
-        );
+        let val_field = Field::new("placeholder_field", DataType::Float64, true);
         // TODO(Taylor-lagrange): missing tablename in pb
         let schema = DFSchema::new_with_metadata(
             vec![(None, Arc::new(ts_field)), (None, Arc::new(val_field))],
@@ -188,11 +211,12 @@ impl ScalarCalculate {
             start: pb_scalar_calculate.start,
             end: pb_scalar_calculate.end,
             interval: pb_scalar_calculate.interval,
-            time_index: pb_scalar_calculate.time_index,
-            tag_columns: pb_scalar_calculate.tag_columns,
-            field_column: pb_scalar_calculate.field_column,
+            time_index: String::new(),
+            tag_columns: Vec::new(),
+            field_column: String::new(),
             output_schema: Arc::new(schema),
             input: placeholder_plan,
+            unfix: Some(unfix),
         })
     }
 }
@@ -259,16 +283,70 @@ impl UserDefinedLogicalNodeCore for ScalarCalculate {
                 "ScalarCalculate should not have any expressions".to_string(),
             ));
         }
-        Ok(ScalarCalculate {
-            start: self.start,
-            end: self.end,
-            interval: self.interval,
-            time_index: self.time_index.clone(),
-            tag_columns: self.tag_columns.clone(),
-            field_column: self.field_column.clone(),
-            input: inputs.into_iter().next().unwrap(),
-            output_schema: self.output_schema.clone(),
-        })
+
+        let input: LogicalPlan = inputs.into_iter().next().unwrap();
+        let input_schema = input.schema();
+
+        if let Some(unfix) = &self.unfix {
+            // transform indices to names
+            let time_index = resolve_column_name(
+                unfix.time_index_idx,
+                input_schema,
+                "ScalarCalculate",
+                "time index",
+            )?;
+
+            let tag_columns = unfix
+                .tag_column_indices
+                .iter()
+                .map(|idx| resolve_column_name(*idx, input_schema, "ScalarCalculate", "tag"))
+                .collect::<DataFusionResult<Vec<String>>>()?;
+
+            let field_column = resolve_column_name(
+                unfix.field_column_idx,
+                input_schema,
+                "ScalarCalculate",
+                "field",
+            )?;
+
+            // Recreate output schema with actual field names
+            let ts_field = Field::new(
+                &time_index,
+                DataType::Timestamp(TimeUnit::Millisecond, None),
+                true,
+            );
+            let val_field =
+                Field::new(format!("scalar({})", field_column), DataType::Float64, true);
+            let schema = DFSchema::new_with_metadata(
+                vec![(None, Arc::new(ts_field)), (None, Arc::new(val_field))],
+                HashMap::new(),
+            )
+            .context(DataFusionPlanningSnafu)?;
+
+            Ok(ScalarCalculate {
+                start: self.start,
+                end: self.end,
+                interval: self.interval,
+                time_index,
+                tag_columns,
+                field_column,
+                input,
+                output_schema: Arc::new(schema),
+                unfix: None,
+            })
+        } else {
+            Ok(ScalarCalculate {
+                start: self.start,
+                end: self.end,
+                interval: self.interval,
+                time_index: self.time_index.clone(),
+                tag_columns: self.tag_columns.clone(),
+                field_column: self.field_column.clone(),
+                input,
+                output_schema: self.output_schema.clone(),
+                unfix: None,
+            })
+        }
     }
 }
diff --git a/src/promql/src/extension_plan/series_divide.rs b/src/promql/src/extension_plan/series_divide.rs
index ece5263741..8e50da113b 100644
--- a/src/promql/src/extension_plan/series_divide.rs
+++ b/src/promql/src/extension_plan/series_divide.rs
@@ -41,7 +41,7 @@ use prost::Message;
 use snafu::ResultExt;
 
 use crate::error::{DeserializeSnafu, Result};
-use crate::extension_plan::METRIC_NUM_SERIES;
+use crate::extension_plan::{METRIC_NUM_SERIES, resolve_column_name, serialize_column_index};
 use crate::metrics::PROMQL_SERIES_COUNT;
 
 #[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
@@ -53,6 +53,13 @@ pub struct SeriesDivide {
     /// here can avoid unnecessary sort in follow on plans.
     time_index_column: String,
     input: LogicalPlan,
+    unfix: Option<UnfixIndices>,
+}
+
+#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
+struct UnfixIndices {
+    pub tag_column_indices: Vec<u64>,
+    pub time_index_column_idx: u64,
 }
 
 impl UserDefinedLogicalNodeCore for SeriesDivide {
@@ -87,11 +94,38 @@ impl UserDefinedLogicalNodeCore for SeriesDivide {
             ));
         }
 
-        Ok(Self {
-            tag_columns: self.tag_columns.clone(),
-            time_index_column: self.time_index_column.clone(),
-            input: inputs[0].clone(),
-        })
+        let input: LogicalPlan = inputs[0].clone();
+        let input_schema = input.schema();
+
+        if let Some(unfix) = &self.unfix {
+            // transform indices to names
+            let tag_columns = unfix
+                .tag_column_indices
+                .iter()
+                .map(|idx| resolve_column_name(*idx, input_schema, "SeriesDivide", "tag"))
+                .collect::<DataFusionResult<Vec<String>>>()?;
+
+            let time_index_column = resolve_column_name(
+                unfix.time_index_column_idx,
+                input_schema,
+                "SeriesDivide",
+                "time index",
+            )?;
+
+            Ok(Self {
+                tag_columns,
+                time_index_column,
+                input,
+                unfix: None,
+            })
+        } else {
+            Ok(Self {
+                tag_columns: self.tag_columns.clone(),
+                time_index_column: self.time_index_column.clone(),
+                input,
+                unfix: None,
+            })
+        }
     }
 }
@@ -101,6 +135,7 @@ impl SeriesDivide {
             tag_columns,
             time_index_column,
             input,
+            unfix: None,
         }
     }
@@ -122,9 +157,19 @@ impl SeriesDivide {
     }
 
     pub fn serialize(&self) -> Vec<u8> {
+        let tag_column_indices = self
+            .tag_columns
+            .iter()
+            .map(|name| serialize_column_index(self.input.schema(), name))
+            .collect::<Vec<_>>();
+
+        let time_index_column_idx =
+            serialize_column_index(self.input.schema(), &self.time_index_column);
+
         pb::SeriesDivide {
-            tag_columns: self.tag_columns.clone(),
-            time_index_column: self.time_index_column.clone(),
+            tag_column_indices,
+            time_index_column_idx,
+            ..Default::default()
         }
         .encode_to_vec()
     }
@@ -135,10 +180,17 @@ impl SeriesDivide {
             produce_one_row: false,
             schema: Arc::new(DFSchema::empty()),
         });
+
+        let unfix = UnfixIndices {
+            tag_column_indices: pb_series_divide.tag_column_indices.clone(),
+            time_index_column_idx: pb_series_divide.time_index_column_idx,
+        };
+
         Ok(Self {
-            tag_columns: pb_series_divide.tag_columns,
-            time_index_column: pb_series_divide.time_index_column,
+            tag_columns: Vec::new(),
+            time_index_column: String::new(),
             input: placeholder_plan,
+            unfix: Some(unfix),
         })
     }
 }
diff --git a/tests/cases/standalone/common/promql/encode_substrait.result b/tests/cases/standalone/common/promql/encode_substrait.result
new file mode 100644
index 0000000000..802a2308bb
--- /dev/null
+++ b/tests/cases/standalone/common/promql/encode_substrait.result
@@ -0,0 +1,49 @@
+create table count_total (
+  ts timestamp time index,
+  tag_a string,
+  tag_b string,
+  val double,
+  primary key (tag_a, tag_b),
+);
+
+Affected Rows: 0
+
+-- if `RangeManipulate` can be encoded/decoded correctly in substrait, the following queries should pass
+-- SQLNESS REPLACE (peers.*) REDACTED
+-- SQLNESS REPLACE (partitioning.*) REDACTED
+tql explain (0, 100, '1s')
+ increase(count_total{
+ tag_a="ffa",
+ }[1h])[12h:1h];
+
++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| plan_type     | plan                                                                                                                                                                                |
++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| logical_plan  | MergeScan [is_placeholder=false, remote_input=[ |
+|               | PromRangeManipulate: req range=[0..0], interval=[300000], eval range=[43200000], time index=[ts], values=["prom_increase(ts_range,val,ts,Int64(3600000))"] |
+|               |   Filter: prom_increase(ts_range,val,ts,Int64(3600000)) IS NOT NULL |
+|               |     Projection: count_total.ts, prom_increase(ts_range, val, count_total.ts, Int64(3600000)) AS prom_increase(ts_range,val,ts,Int64(3600000)), count_total.tag_a, count_total.tag_b |
+|               |       PromRangeManipulate: req range=[-39600000..0], interval=[3600000], eval range=[3600000], time index=[ts], values=["val"] |
+|               |         PromSeriesNormalize: offset=[0], time index=[ts], filter NaN: [true] |
+|               |           PromSeriesDivide: tags=["tag_a", "tag_b"] |
+|               |             Sort: count_total.tag_a ASC NULLS FIRST, count_total.tag_b ASC NULLS FIRST, count_total.ts ASC NULLS FIRST |
+|               |               Filter: count_total.tag_a = Utf8("ffa") AND count_total.ts >= TimestampMillisecond(-43500000, None) AND count_total.ts <= TimestampMillisecond(300000, None) |
+|               |                 TableScan: count_total |
+|               | ]] |
+| physical_plan | CooperativeExec |
+|               | MergeScanExec: REDACTED
+|               | |
++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+
+tql eval (0, 100, '1s')
+ increase(count_total{
+ tag_a="ffa",
+ }[1h])[12h:1h];
+
+++
+++
+
+drop table count_total;
+
+Affected Rows: 0
+
diff --git a/tests/cases/standalone/common/promql/encode_substrait.sql b/tests/cases/standalone/common/promql/encode_substrait.sql
new file mode 100644
index 0000000000..195c383ebd
--- /dev/null
+++ b/tests/cases/standalone/common/promql/encode_substrait.sql
@@ -0,0 +1,22 @@
+create table count_total (
+  ts timestamp time index,
+  tag_a string,
+  tag_b string,
+  val double,
+  primary key (tag_a, tag_b),
+);
+
+-- if `RangeManipulate` can be encoded/decoded correctly in substrait, the following queries should pass
+-- SQLNESS REPLACE (peers.*) REDACTED
+-- SQLNESS REPLACE (partitioning.*) REDACTED
+tql explain (0, 100, '1s')
+ increase(count_total{
+ tag_a="ffa",
+ }[1h])[12h:1h];
+
+tql eval (0, 100, '1s')
+ increase(count_total{
+ tag_a="ffa",
+ }[1h])[12h:1h];
+
+drop table count_total;
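
---

Reviewer note: for anyone who wants to poke at the new name-to-index round-trip outside the engine, here is a minimal, self-contained sketch. The two helpers are copied from the `src/promql/src/extension_plan.rs` hunk above; the three-column schema (shaped like the `count_total` test table) and the `main` harness are invented for illustration, and the snippet assumes only a recent `datafusion` dependency where `index_of_column_by_name` returns an `Option<usize>`:

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion::common::{DFSchema, DFSchemaRef};
use datafusion::error::{DataFusionError, Result as DataFusionResult};

// Copied from the new helpers in src/promql/src/extension_plan.rs.
fn serialize_column_index(schema: &DFSchemaRef, column_name: &str) -> u64 {
    schema
        .index_of_column_by_name(None, column_name)
        .map(|idx| idx as u64)
        .unwrap_or(u64::MAX) // sentinel: a missing column only errors at unfix time
}

fn resolve_column_name(
    index: u64,
    schema: &DFSchemaRef,
    context: &str,
    column_type: &str,
) -> DataFusionResult<String> {
    let columns = schema.columns();
    columns
        .get(index as usize)
        .ok_or_else(|| {
            DataFusionError::Internal(format!(
                "Failed to get {} column at idx {} during unfixing {} with columns: {:?}",
                column_type, index, context, columns
            ))
        })
        .map(|column| column.name().to_string())
}

fn main() -> DataFusionResult<()> {
    // Hypothetical input schema shaped like the `count_total` test table.
    let schema: DFSchemaRef = Arc::new(DFSchema::try_from(Schema::new(vec![
        Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), false),
        Field::new("tag_a", DataType::Utf8, true),
        Field::new("val", DataType::Float64, true),
    ]))?);

    // Serialize: the plan node stores the position of "ts", not the string itself.
    let idx = serialize_column_index(&schema, "ts");
    assert_eq!(idx, 0);

    // Deserialize ("unfix"): once with_exprs_and_inputs hands the node its real
    // input schema, the stored index is resolved back to a concrete column name.
    let name = resolve_column_name(idx, &schema, "Example", "time index")?;
    assert_eq!(name, "ts");

    // A column that was never found serializes to u64::MAX and only fails here.
    let missing = serialize_column_index(&schema, "no_such_column");
    assert!(resolve_column_name(missing, &schema, "Example", "field").is_err());
    Ok(())
}
```

The `u64::MAX` sentinel is what lets `serialize` stay infallible: a name that cannot be found at encode time only surfaces as a `DataFusionError::Internal` when the receiving node tries to unfix the plan, which is the same point at which every other index is validated against the real input schema.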