From d077892e1c4246ea6fee49bc84352e63286815fe Mon Sep 17 00:00:00 2001 From: WU Jingdi Date: Fri, 19 Apr 2024 17:56:09 +0800 Subject: [PATCH] feat: support PromQL scalar (#3693) --- .../substrait/src/extension_serializer.rs | 13 +- src/promql/src/extension_plan.rs | 2 + src/promql/src/extension_plan/planner.rs | 4 +- .../src/extension_plan/scalar_calculate.rs | 602 ++++++++++++++++++ src/promql/src/planner.rs | 78 ++- tests-integration/src/tests/promql_test.rs | 2 +- .../standalone/common/promql/scalar.result | 320 ++++++++++ .../cases/standalone/common/promql/scalar.sql | 95 +++ 8 files changed, 1104 insertions(+), 12 deletions(-) create mode 100644 src/promql/src/extension_plan/scalar_calculate.rs create mode 100644 tests/cases/standalone/common/promql/scalar.result create mode 100644 tests/cases/standalone/common/promql/scalar.sql diff --git a/src/common/substrait/src/extension_serializer.rs b/src/common/substrait/src/extension_serializer.rs index 813c525843..89944db508 100644 --- a/src/common/substrait/src/extension_serializer.rs +++ b/src/common/substrait/src/extension_serializer.rs @@ -19,7 +19,7 @@ use datafusion::execution::registry::SerializerRegistry; use datafusion_common::DataFusionError; use datafusion_expr::UserDefinedLogicalNode; use promql::extension_plan::{ - EmptyMetric, InstantManipulate, RangeManipulate, SeriesDivide, SeriesNormalize, + EmptyMetric, InstantManipulate, RangeManipulate, ScalarCalculate, SeriesDivide, SeriesNormalize, }; pub struct ExtensionSerializer; @@ -50,6 +50,13 @@ impl SerializerRegistry for ExtensionSerializer { .expect("Failed to downcast to RangeManipulate"); Ok(range_manipulate.serialize()) } + name if name == ScalarCalculate::name() => { + let scalar_calculate = node + .as_any() + .downcast_ref::() + .expect("Failed to downcast to ScalarCalculate"); + Ok(scalar_calculate.serialize()) + } name if name == SeriesDivide::name() => { let series_divide = node .as_any() @@ -92,6 +99,10 @@ impl SerializerRegistry for ExtensionSerializer { let series_divide = SeriesDivide::deserialize(bytes)?; Ok(Arc::new(series_divide)) } + name if name == ScalarCalculate::name() => { + let scalar_calculate = ScalarCalculate::deserialize(bytes)?; + Ok(Arc::new(scalar_calculate)) + } name if name == EmptyMetric::name() => Err(DataFusionError::Substrait( "EmptyMetric should not be deserialized".to_string(), )), diff --git a/src/promql/src/extension_plan.rs b/src/promql/src/extension_plan.rs index ff2195e532..f8e32fc4dc 100644 --- a/src/promql/src/extension_plan.rs +++ b/src/promql/src/extension_plan.rs @@ -18,6 +18,7 @@ mod instant_manipulate; mod normalize; mod planner; mod range_manipulate; +mod scalar_calculate; mod series_divide; #[cfg(test)] mod test_util; @@ -30,6 +31,7 @@ pub use instant_manipulate::{InstantManipulate, InstantManipulateExec, InstantMa pub use normalize::{SeriesNormalize, SeriesNormalizeExec, SeriesNormalizeStream}; pub use planner::PromExtensionPlanner; pub use range_manipulate::{RangeManipulate, RangeManipulateExec, RangeManipulateStream}; +pub use scalar_calculate::ScalarCalculate; pub use series_divide::{SeriesDivide, SeriesDivideExec, SeriesDivideStream}; pub use union_distinct_on::{UnionDistinctOn, UnionDistinctOnExec, UnionDistinctOnStream}; diff --git a/src/promql/src/extension_plan/planner.rs b/src/promql/src/extension_plan/planner.rs index 80cd565bd2..10b6efd438 100644 --- a/src/promql/src/extension_plan/planner.rs +++ b/src/promql/src/extension_plan/planner.rs @@ -21,7 +21,7 @@ use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNode}; use datafusion::physical_plan::ExecutionPlan; use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner}; -use super::{HistogramFold, UnionDistinctOn}; +use super::{HistogramFold, ScalarCalculate, UnionDistinctOn}; use crate::extension_plan::{ EmptyMetric, InstantManipulate, RangeManipulate, SeriesDivide, SeriesNormalize, }; @@ -48,6 +48,8 @@ impl ExtensionPlanner for PromExtensionPlanner { Ok(Some(node.to_execution_plan(physical_inputs[0].clone()))) } else if let Some(node) = node.as_any().downcast_ref::() { Ok(Some(node.to_execution_plan(session_state, planner)?)) + } else if let Some(node) = node.as_any().downcast_ref::() { + Ok(Some(node.to_execution_plan(physical_inputs[0].clone())?)) } else if let Some(node) = node.as_any().downcast_ref::() { Ok(Some(node.to_execution_plan(physical_inputs[0].clone()))) } else if let Some(node) = node.as_any().downcast_ref::() { diff --git a/src/promql/src/extension_plan/scalar_calculate.rs b/src/promql/src/extension_plan/scalar_calculate.rs new file mode 100644 index 0000000000..19dfd8283c --- /dev/null +++ b/src/promql/src/extension_plan/scalar_calculate.rs @@ -0,0 +1,602 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::collections::HashMap; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use datafusion::common::stats::Precision; +use datafusion::common::{DFSchema, DFSchemaRef, Result as DataFusionResult, Statistics}; +use datafusion::error::DataFusionError; +use datafusion::execution::context::TaskContext; +use datafusion::logical_expr::{EmptyRelation, LogicalPlan, UserDefinedLogicalNodeCore}; +use datafusion::physical_expr::EquivalenceProperties; +use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; +use datafusion::physical_plan::{ + DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PlanProperties, + RecordBatchStream, SendableRecordBatchStream, +}; +use datafusion::prelude::Expr; +use datafusion::sql::TableReference; +use datatypes::arrow::array::{Array, Float64Array, StringArray, TimestampMillisecondArray}; +use datatypes::arrow::compute::{cast_with_options, concat_batches, CastOptions}; +use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit}; +use datatypes::arrow::record_batch::RecordBatch; +use futures::{ready, Stream, StreamExt}; +use greptime_proto::substrait_extension as pb; +use prost::Message; +use snafu::ResultExt; + +use super::Millisecond; +use crate::error::{ColumnNotFoundSnafu, DataFusionPlanningSnafu, DeserializeSnafu, Result}; + +/// `ScalarCalculate` is the custom logical plan to calculate +/// [`scalar`](https://prometheus.io/docs/prometheus/latest/querying/functions/#scalar) +/// in PromQL, return NaN when have multiple time series. +/// return the time series as scalar value when only have one time series. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct ScalarCalculate { + start: Millisecond, + end: Millisecond, + interval: Millisecond, + + time_index: String, + tag_columns: Vec, + field_column: String, + input: LogicalPlan, + output_schema: DFSchemaRef, +} + +impl ScalarCalculate { + /// create a new `ScalarCalculate` plan + #[allow(clippy::too_many_arguments)] + pub fn new( + start: Millisecond, + end: Millisecond, + interval: Millisecond, + input: LogicalPlan, + time_index: &str, + tag_colunms: &[String], + field_column: &str, + table_name: Option<&str>, + ) -> Result { + let input_schema = input.schema(); + let Ok(ts_field) = input_schema + .field_with_unqualified_name(time_index) + .cloned() + else { + return ColumnNotFoundSnafu { col: time_index }.fail(); + }; + let val_field = Field::new(format!("scalar({})", field_column), DataType::Float64, true); + let qualifier = table_name.map(TableReference::bare); + let schema = DFSchema::new_with_metadata( + vec![ + (qualifier.clone(), Arc::new(ts_field)), + (qualifier, Arc::new(val_field)), + ], + input_schema.metadata().clone(), + ) + .context(DataFusionPlanningSnafu)?; + + Ok(Self { + start, + end, + interval, + time_index: time_index.to_string(), + tag_columns: tag_colunms.to_vec(), + field_column: field_column.to_string(), + input, + output_schema: Arc::new(schema), + }) + } + + /// The name of this custom plan + pub const fn name() -> &'static str { + "ScalarCalculate" + } + + /// Create a new execution plan from ScalarCalculate + pub fn to_execution_plan( + &self, + exec_input: Arc, + ) -> DataFusionResult> { + let fields: Vec<_> = self + .output_schema + .fields() + .iter() + .map(|field| Field::new(field.name(), field.data_type().clone(), field.is_nullable())) + .collect(); + let input_schema = exec_input.schema(); + let ts_index = input_schema + .index_of(&self.time_index) + .map_err(|e| DataFusionError::ArrowError(e, None))?; + let val_index = input_schema + .index_of(&self.field_column) + .map_err(|e| DataFusionError::ArrowError(e, None))?; + let schema = Arc::new(Schema::new(fields)); + let properties = PlanProperties::new( + EquivalenceProperties::new(schema.clone()), + Partitioning::UnknownPartitioning(1), + exec_input.properties().execution_mode, + ); + Ok(Arc::new(ScalarCalculateExec { + start: self.start, + end: self.end, + interval: self.interval, + schema, + input: exec_input, + project_index: (ts_index, val_index), + tag_columns: self.tag_columns.clone(), + metric: ExecutionPlanMetricsSet::new(), + properties, + })) + } + + pub fn serialize(&self) -> Vec { + pb::ScalarCalculate { + start: self.start, + end: self.end, + interval: self.interval, + time_index: self.time_index.clone(), + tag_columns: self.tag_columns.clone(), + field_column: self.field_column.clone(), + } + .encode_to_vec() + } + + pub fn deserialize(bytes: &[u8]) -> Result { + let pb_scalar_calculate = pb::ScalarCalculate::decode(bytes).context(DeserializeSnafu)?; + let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: Arc::new(DFSchema::empty()), + }); + // TODO(Taylor-lagrange): Supports timestamps of different precisions + let ts_field = Field::new( + &pb_scalar_calculate.time_index, + DataType::Timestamp(TimeUnit::Millisecond, None), + true, + ); + let val_field = Field::new( + format!("scalar({})", pb_scalar_calculate.field_column), + DataType::Float64, + true, + ); + // TODO(Taylor-lagrange): missing tablename in pb + let schema = DFSchema::new_with_metadata( + vec![(None, Arc::new(ts_field)), (None, Arc::new(val_field))], + HashMap::new(), + ) + .context(DataFusionPlanningSnafu)?; + + Ok(Self { + start: pb_scalar_calculate.start, + end: pb_scalar_calculate.end, + interval: pb_scalar_calculate.interval, + time_index: pb_scalar_calculate.time_index, + tag_columns: pb_scalar_calculate.tag_columns, + field_column: pb_scalar_calculate.field_column, + output_schema: Arc::new(schema), + input: placeholder_plan, + }) + } +} + +impl UserDefinedLogicalNodeCore for ScalarCalculate { + fn name(&self) -> &str { + Self::name() + } + + fn inputs(&self) -> Vec<&LogicalPlan> { + vec![&self.input] + } + + fn schema(&self) -> &DFSchemaRef { + &self.output_schema + } + + fn expressions(&self) -> Vec { + vec![] + } + + fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "ScalarCalculate: tags={:?}", self.tag_columns) + } + + fn from_template(&self, _expr: &[Expr], inputs: &[LogicalPlan]) -> Self { + assert!(!inputs.is_empty()); + ScalarCalculate { + start: self.start, + end: self.end, + interval: self.interval, + time_index: self.time_index.clone(), + tag_columns: self.tag_columns.clone(), + field_column: self.field_column.clone(), + input: inputs[0].clone(), + output_schema: self.output_schema.clone(), + } + } +} + +#[derive(Debug, Clone)] +struct ScalarCalculateExec { + start: Millisecond, + end: Millisecond, + interval: Millisecond, + schema: SchemaRef, + project_index: (usize, usize), + input: Arc, + tag_columns: Vec, + metric: ExecutionPlanMetricsSet, + properties: PlanProperties, +} + +impl ExecutionPlan for ScalarCalculateExec { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn properties(&self) -> &PlanProperties { + &self.properties + } + + fn maintains_input_order(&self) -> Vec { + vec![true; self.children().len()] + } + + fn required_input_distribution(&self) -> Vec { + vec![Distribution::SinglePartition] + } + + fn children(&self) -> Vec> { + vec![self.input.clone()] + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> DataFusionResult> { + Ok(Arc::new(ScalarCalculateExec { + start: self.start, + end: self.end, + interval: self.interval, + schema: self.schema.clone(), + project_index: self.project_index, + tag_columns: self.tag_columns.clone(), + input: children[0].clone(), + metric: self.metric.clone(), + properties: self.properties.clone(), + })) + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> DataFusionResult { + let baseline_metric = BaselineMetrics::new(&self.metric, partition); + let input = self.input.execute(partition, context)?; + let schema = input.schema(); + let tag_indices = self + .tag_columns + .iter() + .map(|tag| { + schema + .column_with_name(tag) + .unwrap_or_else(|| panic!("tag column not found {tag}")) + .0 + }) + .collect(); + + Ok(Box::pin(ScalarCalculateStream { + start: self.start, + end: self.end, + interval: self.interval, + schema: self.schema.clone(), + project_index: self.project_index, + metric: baseline_metric, + tag_indices, + input, + have_multi_series: false, + done: false, + batch: None, + tag_value: None, + })) + } + + fn metrics(&self) -> Option { + Some(self.metric.clone_inner()) + } + + fn statistics(&self) -> DataFusionResult { + let input_stats = self.input.statistics()?; + + let estimated_row_num = (self.end - self.start) as f64 / self.interval as f64; + let estimated_total_bytes = input_stats + .total_byte_size + .get_value() + .zip(input_stats.num_rows.get_value()) + .map(|(size, rows)| { + Precision::Inexact(((*size as f64 / *rows as f64) * estimated_row_num).floor() as _) + }) + .unwrap_or_default(); + + Ok(Statistics { + num_rows: Precision::Inexact(estimated_row_num as _), + total_byte_size: estimated_total_bytes, + // TODO(ruihang): support this column statistics + column_statistics: Statistics::unknown_column(&self.schema()), + }) + } +} + +impl DisplayAs for ScalarCalculateExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "ScalarCalculateExec: tags={:?}", self.tag_columns) + } + } + } +} + +struct ScalarCalculateStream { + start: Millisecond, + end: Millisecond, + interval: Millisecond, + schema: SchemaRef, + input: SendableRecordBatchStream, + metric: BaselineMetrics, + tag_indices: Vec, + /// with format `(ts_index, field_index)` + project_index: (usize, usize), + have_multi_series: bool, + done: bool, + batch: Option, + tag_value: Option>, +} + +impl RecordBatchStream for ScalarCalculateStream { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} + +impl ScalarCalculateStream { + fn update_batch(&mut self, batch: RecordBatch) -> DataFusionResult<()> { + let _timer = self.metric.elapsed_compute(); + // if have multi time series, scalar will return NaN + if self.have_multi_series { + return Ok(()); + } + // fast path: no tag columns means all data belongs to the same series. + if self.tag_indices.is_empty() { + self.append_batch(batch)?; + return Ok(()); + } + let all_same = |val: Option<&str>, array: &StringArray| -> bool { + if let Some(v) = val { + array.iter().all(|s| s == Some(v)) + } else { + array.is_empty() || array.iter().skip(1).all(|s| s == Some(array.value(0))) + } + }; + // assert the entire batch belong to the same series + let all_tag_columns_same = if let Some(tags) = &self.tag_value { + tags.iter() + .zip(self.tag_indices.iter()) + .all(|(value, index)| { + let array = batch.column(*index); + let string_array = array.as_any().downcast_ref::().unwrap(); + all_same(Some(value), string_array) + }) + } else { + let mut tag_values = Vec::with_capacity(self.tag_indices.len()); + let is_same = self.tag_indices.iter().all(|index| { + let array = batch.column(*index); + let string_array = array.as_any().downcast_ref::().unwrap(); + tag_values.push(string_array.value(0).to_string()); + all_same(None, string_array) + }); + self.tag_value = Some(tag_values); + is_same + }; + if all_tag_columns_same { + self.append_batch(batch)?; + } else { + self.have_multi_series = true; + } + Ok(()) + } + + fn append_batch(&mut self, input_batch: RecordBatch) -> DataFusionResult<()> { + let ts_column = input_batch.column(self.project_index.0).clone(); + let val_column = cast_with_options( + input_batch.column(self.project_index.1), + &DataType::Float64, + &CastOptions::default(), + )?; + let input_batch = RecordBatch::try_new(self.schema.clone(), vec![ts_column, val_column])?; + if let Some(batch) = &self.batch { + self.batch = Some(concat_batches(&self.schema, vec![batch, &input_batch])?); + } else { + self.batch = Some(input_batch); + } + Ok(()) + } +} + +impl Stream for ScalarCalculateStream { + type Item = DataFusionResult; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + if self.done { + return Poll::Ready(None); + } + match ready!(self.input.poll_next_unpin(cx)) { + Some(Ok(batch)) => { + self.update_batch(batch)?; + } + // inner had error, return to caller + Some(Err(e)) => return Poll::Ready(Some(Err(e))), + // inner is done, producing output + None => { + self.done = true; + return match self.batch.take() { + Some(batch) if !self.have_multi_series => Poll::Ready(Some(Ok(batch))), + _ => { + let time_array = (self.start..=self.end) + .step_by(self.interval as _) + .collect::>(); + let nums = time_array.len(); + let nan_batch = RecordBatch::try_new( + self.schema.clone(), + vec![ + Arc::new(TimestampMillisecondArray::from(time_array)), + Arc::new(Float64Array::from(vec![f64::NAN; nums])), + ], + )?; + Poll::Ready(Some(Ok(nan_batch))) + } + }; + } + }; + } + } +} + +#[cfg(test)] +mod test { + use datafusion::arrow::datatypes::{DataType, Field, Schema}; + use datafusion::physical_plan::memory::MemoryExec; + use datafusion::physical_plan::ExecutionMode; + use datafusion::prelude::SessionContext; + use datatypes::arrow::array::{Float64Array, TimestampMillisecondArray}; + use datatypes::arrow::datatypes::TimeUnit; + + use super::*; + + fn prepare_test_data(diff_series: bool) -> MemoryExec { + let schema = Arc::new(Schema::new(vec![ + Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true), + Field::new("tag1", DataType::Utf8, true), + Field::new("tag2", DataType::Utf8, true), + Field::new("val", DataType::Float64, true), + ])); + let batch_1 = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(TimestampMillisecondArray::from(vec![0, 5_000])), + Arc::new(StringArray::from(vec!["foo", "foo"])), + Arc::new(StringArray::from(vec!["🥺", "🥺"])), + Arc::new(Float64Array::from(vec![1.0, 2.0])), + ], + ) + .unwrap(); + let batch_2 = if diff_series { + RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(TimestampMillisecondArray::from(vec![10_000, 15_000])), + Arc::new(StringArray::from(vec!["foo", "foo"])), + Arc::new(StringArray::from(vec!["🥺", "😝"])), + Arc::new(Float64Array::from(vec![3.0, 4.0])), + ], + ) + .unwrap() + } else { + RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(TimestampMillisecondArray::from(vec![10_000, 15_000])), + Arc::new(StringArray::from(vec!["foo", "foo"])), + Arc::new(StringArray::from(vec!["🥺", "🥺"])), + Arc::new(Float64Array::from(vec![3.0, 4.0])), + ], + ) + .unwrap() + }; + MemoryExec::try_new(&[vec![batch_1, batch_2]], schema, None).unwrap() + } + + async fn run_test(diff_series: bool, expected: &str) { + let memory_exec = Arc::new(prepare_test_data(diff_series)); + let schema = Arc::new(Schema::new(vec![ + Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true), + Field::new("val", DataType::Float64, true), + ])); + let properties = PlanProperties::new( + EquivalenceProperties::new(schema.clone()), + Partitioning::UnknownPartitioning(1), + ExecutionMode::Bounded, + ); + let scalar_exec = Arc::new(ScalarCalculateExec { + start: 0, + end: 15_000, + interval: 5000, + tag_columns: vec!["tag1".to_string(), "tag2".to_string()], + input: memory_exec, + schema, + project_index: (0, 3), + metric: ExecutionPlanMetricsSet::new(), + properties, + }); + let session_context = SessionContext::default(); + let result = datafusion::physical_plan::collect(scalar_exec, session_context.task_ctx()) + .await + .unwrap(); + let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result) + .unwrap() + .to_string(); + assert_eq!(result_literal, expected); + } + + #[tokio::test] + async fn same_series() { + run_test( + false, + "+---------------------+-----+\ + \n| ts | val |\ + \n+---------------------+-----+\ + \n| 1970-01-01T00:00:00 | 1.0 |\ + \n| 1970-01-01T00:00:05 | 2.0 |\ + \n| 1970-01-01T00:00:10 | 3.0 |\ + \n| 1970-01-01T00:00:15 | 4.0 |\ + \n+---------------------+-----+", + ) + .await + } + + #[tokio::test] + async fn diff_series() { + run_test( + true, + "+---------------------+-----+\ + \n| ts | val |\ + \n+---------------------+-----+\ + \n| 1970-01-01T00:00:00 | NaN |\ + \n| 1970-01-01T00:00:05 | NaN |\ + \n| 1970-01-01T00:00:10 | NaN |\ + \n| 1970-01-01T00:00:15 | NaN |\ + \n+---------------------+-----+", + ) + .await + } +} diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs index 67d5614093..0af5308838 100644 --- a/src/promql/src/planner.rs +++ b/src/promql/src/planner.rs @@ -57,7 +57,7 @@ use crate::error::{ }; use crate::extension_plan::{ build_special_time_expr, EmptyMetric, HistogramFold, InstantManipulate, Millisecond, - RangeManipulate, SeriesDivide, SeriesNormalize, UnionDistinctOn, + RangeManipulate, ScalarCalculate, SeriesDivide, SeriesNormalize, UnionDistinctOn, }; use crate::functions::{ AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, HoltWinters, IDelta, @@ -67,6 +67,8 @@ use crate::functions::{ /// `time()` function in PromQL. const SPECIAL_TIME_FUNCTION: &str = "time"; +/// `scalar()` function in PromQL. +const SCALAR_FUNCTION: &str = "scalar"; /// `histogram_quantile` function in PromQL const SPECIAL_HISTOGRAM_QUANTILE: &str = "histogram_quantile"; /// `vector` function in PromQL @@ -337,7 +339,17 @@ impl PromPlanner { // rename table references to avoid ambiguity left_table_ref = TableReference::bare("lhs"); right_table_ref = TableReference::bare("rhs"); - self.ctx.table_name = Some("lhs".to_string()); + // `self.ctx` have ctx in right plan, if right plan have no tag, + // we use left plan ctx as the ctx for subsequent calculations, + // to avoid case like `host + scalar(...)` + // we need preserve tag column on `host` table in subsequent projection, + // which only show in left plan ctx. + if self.ctx.tag_columns.is_empty() { + self.ctx = left_context.clone(); + self.ctx.table_name = Some("lhs".to_string()); + } else { + self.ctx.table_name = Some("rhs".to_string()); + } } let mut field_columns = left_field_columns.iter().zip(right_field_columns.iter()); @@ -346,6 +358,10 @@ impl PromPlanner { right_input, left_table_ref.clone(), right_table_ref.clone(), + // if left plan or right plan tag is empty, means case like `scalar(...) + host` or `host + scalar(...)` + // under this case we only join on time index + left_context.tag_columns.is_empty() + || right_context.tag_columns.is_empty(), )?; let join_plan_schema = join_plan.schema().clone(); @@ -494,6 +510,7 @@ impl PromPlanner { match func.name { SPECIAL_HISTOGRAM_QUANTILE => return self.create_histogram_plan(args).await, SPECIAL_VECTOR_FUNCTION => return self.create_vector_plan(args).await, + SCALAR_FUNCTION => return self.create_scalar_plan(args).await, _ => {} } @@ -1434,6 +1451,44 @@ impl PromPlanner { })) } + /// Create a [SCALAR_FUNCTION] plan + async fn create_scalar_plan(&mut self, args: &PromFunctionArgs) -> Result { + ensure!( + args.len() == 1, + FunctionInvalidArgumentSnafu { + fn_name: SCALAR_FUNCTION + } + ); + let input = self + .prom_expr_to_plan(args.args[0].as_ref().clone()) + .await?; + ensure!( + self.ctx.field_columns.len() == 1, + MultiFieldsNotSupportedSnafu { + operator: SCALAR_FUNCTION + }, + ); + let scalar_plan = LogicalPlan::Extension(Extension { + node: Arc::new(ScalarCalculate::new( + self.ctx.start, + self.ctx.end, + self.ctx.interval, + input, + self.ctx.time_index_column.as_ref().unwrap(), + &self.ctx.tag_columns, + &self.ctx.field_columns[0], + self.ctx.table_name.as_deref(), + )?), + }); + // scalar plan have no tag columns + self.ctx.tag_columns.clear(); + self.ctx.field_columns.clear(); + self.ctx + .field_columns + .push(scalar_plan.schema().field(1).name().clone()); + Ok(scalar_plan) + } + /// Try to build a DataFusion Literal Expression from PromQL Expr, return /// `None` if the input is not a literal expression. fn try_build_literal_expr(expr: &PromExpr) -> Option { @@ -1583,19 +1638,24 @@ impl PromPlanner { } /// Build a inner join on time index column and tag columns to concat two logical plans. + /// When `only_join_time_index == true` we only join on the time index, because these two plan may not have the same tag columns fn join_on_non_field_columns( &self, left: LogicalPlan, right: LogicalPlan, left_table_ref: TableReference, right_table_ref: TableReference, + only_join_time_index: bool, ) -> Result { - let mut tag_columns = self - .ctx - .tag_columns - .iter() - .map(Column::from_name) - .collect::>(); + let mut tag_columns = if only_join_time_index { + vec![] + } else { + self.ctx + .tag_columns + .iter() + .map(Column::from_name) + .collect::>() + }; // push time index column if it exist if let Some(time_index_column) = &self.ctx.time_index_column { @@ -2543,7 +2603,7 @@ mod test { .unwrap(); let expected = String::from( - "Projection: lhs.tag_0, lhs.timestamp, lhs.field_0 + rhs.field_0 AS lhs.field_0 + rhs.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), lhs.field_0 + rhs.field_0:Float64;N]\ + "Projection: rhs.tag_0, rhs.timestamp, lhs.field_0 + rhs.field_0 AS lhs.field_0 + rhs.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), lhs.field_0 + rhs.field_0:Float64;N]\ \n Inner Join: lhs.tag_0 = rhs.tag_0, lhs.timestamp = rhs.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n SubqueryAlias: lhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ diff --git a/tests-integration/src/tests/promql_test.rs b/tests-integration/src/tests/promql_test.rs index 8810f5d946..8f5c440fa7 100644 --- a/tests-integration/src/tests/promql_test.rs +++ b/tests-integration/src/tests/promql_test.rs @@ -457,7 +457,7 @@ async fn aggregators_complex_combined_aggrs(instance: Arc) { Duration::from_secs(60), Duration::from_secs(0), "+------------+---------------------+---------------------------------------------------------------------------------------------------------------------------------------------+\ - \n| job | ts | lhs.lhs.lhs.SUM(http_requests.value) + rhs.MIN(http_requests.value) + http_requests.MAX(http_requests.value) + rhs.AVG(http_requests.value) |\ + \n| job | ts | lhs.rhs.lhs.SUM(http_requests.value) + rhs.MIN(http_requests.value) + http_requests.MAX(http_requests.value) + rhs.AVG(http_requests.value) |\ \n+------------+---------------------+---------------------------------------------------------------------------------------------------------------------------------------------+\ \n| api-server | 1970-01-01T00:00:00 | 1750.0 |\ \n| app-server | 1970-01-01T00:00:00 | 4550.0 |\ diff --git a/tests/cases/standalone/common/promql/scalar.result b/tests/cases/standalone/common/promql/scalar.result new file mode 100644 index 0000000000..0a3d5be95a --- /dev/null +++ b/tests/cases/standalone/common/promql/scalar.result @@ -0,0 +1,320 @@ +CREATE TABLE host ( + ts timestamp(3) time index, + host STRING PRIMARY KEY, + val BIGINT, +); + +Affected Rows: 0 + +INSERT INTO TABLE host VALUES + (0, 'host1', 1), + (0, 'host2', 2), + (5000, 'host1', 3), + (5000, 'host2', 4), + (10000, 'host1', 5), + (10000, 'host2', 6), + (15000, 'host1', 7), + (15000, 'host2', 8); + +Affected Rows: 8 + +-- case only have one time series, scalar return value +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') scalar(host{host="host1"}); + ++---------------------+-------------+ +| ts | scalar(val) | ++---------------------+-------------+ +| 1970-01-01T00:00:00 | 1.0 | +| 1970-01-01T00:00:05 | 3.0 | +| 1970-01-01T00:00:10 | 5.0 | +| 1970-01-01T00:00:15 | 7.0 | ++---------------------+-------------+ + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') scalar(host{host="host1"}) + 1; + ++---------------------+--------------------------+ +| ts | scalar(val) + Float64(1) | ++---------------------+--------------------------+ +| 1970-01-01T00:00:00 | 2.0 | +| 1970-01-01T00:00:05 | 4.0 | +| 1970-01-01T00:00:10 | 6.0 | +| 1970-01-01T00:00:15 | 8.0 | ++---------------------+--------------------------+ + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') 1 + scalar(host{host="host1"}); + ++---------------------+--------------------------+ +| ts | Float64(1) + scalar(val) | ++---------------------+--------------------------+ +| 1970-01-01T00:00:00 | 2.0 | +| 1970-01-01T00:00:05 | 4.0 | +| 1970-01-01T00:00:10 | 6.0 | +| 1970-01-01T00:00:15 | 8.0 | ++---------------------+--------------------------+ + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') scalar(host{host="host1"}) + scalar(host{host="host2"}); + ++---------------------+-----------------------------------+ +| ts | lhs.scalar(val) + rhs.scalar(val) | ++---------------------+-----------------------------------+ +| 1970-01-01T00:00:00 | 3.0 | +| 1970-01-01T00:00:05 | 7.0 | +| 1970-01-01T00:00:10 | 11.0 | +| 1970-01-01T00:00:15 | 15.0 | ++---------------------+-----------------------------------+ + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') host{host="host1"} + scalar(host{host="host2"}); + ++-------+---------------------+---------------------------+ +| host | ts | lhs.val + rhs.scalar(val) | ++-------+---------------------+---------------------------+ +| host1 | 1970-01-01T00:00:00 | 3.0 | +| host1 | 1970-01-01T00:00:05 | 7.0 | +| host1 | 1970-01-01T00:00:10 | 11.0 | +| host1 | 1970-01-01T00:00:15 | 15.0 | ++-------+---------------------+---------------------------+ + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') scalar(host{host="host1"}) + host{host="host2"}; + ++-------+---------------------+---------------------------+ +| host | ts | lhs.scalar(val) + rhs.val | ++-------+---------------------+---------------------------+ +| host2 | 1970-01-01T00:00:00 | 3.0 | +| host2 | 1970-01-01T00:00:05 | 7.0 | +| host2 | 1970-01-01T00:00:10 | 11.0 | +| host2 | 1970-01-01T00:00:15 | 15.0 | ++-------+---------------------+---------------------------+ + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') host + scalar(host{host="host2"}); + ++-------+---------------------+---------------------------+ +| host | ts | lhs.val + rhs.scalar(val) | ++-------+---------------------+---------------------------+ +| host1 | 1970-01-01T00:00:00 | 3.0 | +| host1 | 1970-01-01T00:00:05 | 7.0 | +| host1 | 1970-01-01T00:00:10 | 11.0 | +| host1 | 1970-01-01T00:00:15 | 15.0 | +| host2 | 1970-01-01T00:00:00 | 4.0 | +| host2 | 1970-01-01T00:00:05 | 8.0 | +| host2 | 1970-01-01T00:00:10 | 12.0 | +| host2 | 1970-01-01T00:00:15 | 16.0 | ++-------+---------------------+---------------------------+ + +-- SQLNESS SORT_RESULT 3 1 + +TQL EVAL (0, 15, '5s') scalar(host{host="host1"}) + host; + ++-------+---------------------+---------------------------+ +| host | ts | lhs.scalar(val) + rhs.val | ++-------+---------------------+---------------------------+ +| host1 | 1970-01-01T00:00:00 | 2.0 | +| host1 | 1970-01-01T00:00:05 | 6.0 | +| host1 | 1970-01-01T00:00:10 | 10.0 | +| host1 | 1970-01-01T00:00:15 | 14.0 | +| host2 | 1970-01-01T00:00:00 | 3.0 | +| host2 | 1970-01-01T00:00:05 | 7.0 | +| host2 | 1970-01-01T00:00:10 | 11.0 | +| host2 | 1970-01-01T00:00:15 | 15.0 | ++-------+---------------------+---------------------------+ + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') scalar(count(count(host) by (host))); + ++---------------------+--------------------------------+ +| ts | scalar(COUNT(COUNT(host.val))) | ++---------------------+--------------------------------+ +| 1970-01-01T00:00:00 | 2.0 | +| 1970-01-01T00:00:05 | 2.0 | +| 1970-01-01T00:00:10 | 2.0 | +| 1970-01-01T00:00:15 | 2.0 | ++---------------------+--------------------------------+ + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') scalar(host{host="host1"} + scalar(host{host="host2"})); + ++---------------------+-----------------------------------+ +| ts | scalar(lhs.val + rhs.scalar(val)) | ++---------------------+-----------------------------------+ +| 1970-01-01T00:00:00 | 3.0 | +| 1970-01-01T00:00:05 | 7.0 | +| 1970-01-01T00:00:10 | 11.0 | +| 1970-01-01T00:00:15 | 15.0 | ++---------------------+-----------------------------------+ + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') scalar(scalar(host{host="host2"}) + host{host="host1"}); + ++---------------------+-----------------------------------+ +| ts | scalar(lhs.scalar(val) + rhs.val) | ++---------------------+-----------------------------------+ +| 1970-01-01T00:00:00 | 3.0 | +| 1970-01-01T00:00:05 | 7.0 | +| 1970-01-01T00:00:10 | 11.0 | +| 1970-01-01T00:00:15 | 15.0 | ++---------------------+-----------------------------------+ + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') scalar(host + scalar(host{host="host2"})); + ++---------------------+-----------------------------------+ +| ts | scalar(lhs.val + rhs.scalar(val)) | ++---------------------+-----------------------------------+ +| 1970-01-01T00:00:00 | NaN | +| 1970-01-01T00:00:05 | NaN | +| 1970-01-01T00:00:10 | NaN | +| 1970-01-01T00:00:15 | NaN | ++---------------------+-----------------------------------+ + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') scalar(scalar(host{host="host2"}) + host); + ++---------------------+-----------------------------------+ +| ts | scalar(lhs.scalar(val) + rhs.val) | ++---------------------+-----------------------------------+ +| 1970-01-01T00:00:00 | NaN | +| 1970-01-01T00:00:05 | NaN | +| 1970-01-01T00:00:10 | NaN | +| 1970-01-01T00:00:15 | NaN | ++---------------------+-----------------------------------+ + +-- case have multiple time series, scalar return NaN +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') scalar(host); + ++---------------------+-------------+ +| ts | scalar(val) | ++---------------------+-------------+ +| 1970-01-01T00:00:00 | NaN | +| 1970-01-01T00:00:05 | NaN | +| 1970-01-01T00:00:10 | NaN | +| 1970-01-01T00:00:15 | NaN | ++---------------------+-------------+ + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') scalar(host) + 1; + ++---------------------+--------------------------+ +| ts | scalar(val) + Float64(1) | ++---------------------+--------------------------+ +| 1970-01-01T00:00:00 | NaN | +| 1970-01-01T00:00:05 | NaN | +| 1970-01-01T00:00:10 | NaN | +| 1970-01-01T00:00:15 | NaN | ++---------------------+--------------------------+ + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') 1 + scalar(host); + ++---------------------+--------------------------+ +| ts | Float64(1) + scalar(val) | ++---------------------+--------------------------+ +| 1970-01-01T00:00:00 | NaN | +| 1970-01-01T00:00:05 | NaN | +| 1970-01-01T00:00:10 | NaN | +| 1970-01-01T00:00:15 | NaN | ++---------------------+--------------------------+ + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') scalar(host) + scalar(host); + ++---------------------+-----------------------------------+ +| ts | lhs.scalar(val) + rhs.scalar(val) | ++---------------------+-----------------------------------+ +| 1970-01-01T00:00:00 | NaN | +| 1970-01-01T00:00:05 | NaN | +| 1970-01-01T00:00:10 | NaN | +| 1970-01-01T00:00:15 | NaN | ++---------------------+-----------------------------------+ + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') host + scalar(host); + ++-------+---------------------+---------------------------+ +| host | ts | lhs.val + rhs.scalar(val) | ++-------+---------------------+---------------------------+ +| host1 | 1970-01-01T00:00:00 | NaN | +| host1 | 1970-01-01T00:00:05 | NaN | +| host1 | 1970-01-01T00:00:10 | NaN | +| host1 | 1970-01-01T00:00:15 | NaN | +| host2 | 1970-01-01T00:00:00 | NaN | +| host2 | 1970-01-01T00:00:05 | NaN | +| host2 | 1970-01-01T00:00:10 | NaN | +| host2 | 1970-01-01T00:00:15 | NaN | ++-------+---------------------+---------------------------+ + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') scalar(host) + host; + ++-------+---------------------+---------------------------+ +| host | ts | lhs.scalar(val) + rhs.val | ++-------+---------------------+---------------------------+ +| host1 | 1970-01-01T00:00:00 | NaN | +| host1 | 1970-01-01T00:00:05 | NaN | +| host1 | 1970-01-01T00:00:10 | NaN | +| host1 | 1970-01-01T00:00:15 | NaN | +| host2 | 1970-01-01T00:00:00 | NaN | +| host2 | 1970-01-01T00:00:05 | NaN | +| host2 | 1970-01-01T00:00:10 | NaN | +| host2 | 1970-01-01T00:00:15 | NaN | ++-------+---------------------+---------------------------+ + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') host{host="host2"} + scalar(host); + ++-------+---------------------+---------------------------+ +| host | ts | lhs.val + rhs.scalar(val) | ++-------+---------------------+---------------------------+ +| host2 | 1970-01-01T00:00:00 | NaN | +| host2 | 1970-01-01T00:00:05 | NaN | +| host2 | 1970-01-01T00:00:10 | NaN | +| host2 | 1970-01-01T00:00:15 | NaN | ++-------+---------------------+---------------------------+ + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') scalar(host) + host{host="host2"}; + ++-------+---------------------+---------------------------+ +| host | ts | lhs.scalar(val) + rhs.val | ++-------+---------------------+---------------------------+ +| host2 | 1970-01-01T00:00:00 | NaN | +| host2 | 1970-01-01T00:00:05 | NaN | +| host2 | 1970-01-01T00:00:10 | NaN | +| host2 | 1970-01-01T00:00:15 | NaN | ++-------+---------------------+---------------------------+ + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') scalar(host{host="host1"} + scalar(host)); + ++---------------------+-----------------------------------+ +| ts | scalar(lhs.val + rhs.scalar(val)) | ++---------------------+-----------------------------------+ +| 1970-01-01T00:00:00 | NaN | +| 1970-01-01T00:00:05 | NaN | +| 1970-01-01T00:00:10 | NaN | +| 1970-01-01T00:00:15 | NaN | ++---------------------+-----------------------------------+ + +-- error case +TQL EVAL (0, 15, '5s') scalar(1 + scalar(host{host="host2"})); + +Error: 2000(InvalidSyntax), expected type vector in call to function 'scalar', got scalar + +TQL EVAL (0, 15, '5s') scalar(scalar(host{host="host2"}) + 1); + +Error: 2000(InvalidSyntax), expected type vector in call to function 'scalar', got scalar + +TQL EVAL (0, 15, '5s') scalar(scalar(host{host="host1"}) + scalar(host{host="host2"})); + +Error: 2000(InvalidSyntax), expected type vector in call to function 'scalar', got scalar + +Drop table host; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/promql/scalar.sql b/tests/cases/standalone/common/promql/scalar.sql new file mode 100644 index 0000000000..3c45164226 --- /dev/null +++ b/tests/cases/standalone/common/promql/scalar.sql @@ -0,0 +1,95 @@ +CREATE TABLE host ( + ts timestamp(3) time index, + host STRING PRIMARY KEY, + val BIGINT, +); + +INSERT INTO TABLE host VALUES + (0, 'host1', 1), + (0, 'host2', 2), + (5000, 'host1', 3), + (5000, 'host2', 4), + (10000, 'host1', 5), + (10000, 'host2', 6), + (15000, 'host1', 7), + (15000, 'host2', 8); + +-- case only have one time series, scalar return value + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') scalar(host{host="host1"}); + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') scalar(host{host="host1"}) + 1; + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') 1 + scalar(host{host="host1"}); + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') scalar(host{host="host1"}) + scalar(host{host="host2"}); + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') host{host="host1"} + scalar(host{host="host2"}); + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') scalar(host{host="host1"}) + host{host="host2"}; + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') host + scalar(host{host="host2"}); + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') scalar(host{host="host1"}) + host; + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') scalar(count(count(host) by (host))); + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') scalar(host{host="host1"} + scalar(host{host="host2"})); + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') scalar(scalar(host{host="host2"}) + host{host="host1"}); + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') scalar(host + scalar(host{host="host2"})); + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') scalar(scalar(host{host="host2"}) + host); + +-- case have multiple time series, scalar return NaN + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') scalar(host); + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') scalar(host) + 1; + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') 1 + scalar(host); + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') scalar(host) + scalar(host); + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') host + scalar(host); + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') scalar(host) + host; + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') host{host="host2"} + scalar(host); + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') scalar(host) + host{host="host2"}; + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') scalar(host{host="host1"} + scalar(host)); + +-- error case + +TQL EVAL (0, 15, '5s') scalar(1 + scalar(host{host="host2"})); + +TQL EVAL (0, 15, '5s') scalar(scalar(host{host="host2"}) + 1); + +TQL EVAL (0, 15, '5s') scalar(scalar(host{host="host1"}) + scalar(host{host="host2"})); + +Drop table host;