feat: support PromQL scalar (#3693)

This commit is contained in:
WU Jingdi
2024-04-19 17:56:09 +08:00
committed by GitHub
parent cfed466fcd
commit d077892e1c
8 changed files with 1104 additions and 12 deletions

View File

@@ -19,7 +19,7 @@ use datafusion::execution::registry::SerializerRegistry;
use datafusion_common::DataFusionError;
use datafusion_expr::UserDefinedLogicalNode;
use promql::extension_plan::{
EmptyMetric, InstantManipulate, RangeManipulate, SeriesDivide, SeriesNormalize,
EmptyMetric, InstantManipulate, RangeManipulate, ScalarCalculate, SeriesDivide, SeriesNormalize,
};
pub struct ExtensionSerializer;
@@ -50,6 +50,13 @@ impl SerializerRegistry for ExtensionSerializer {
.expect("Failed to downcast to RangeManipulate");
Ok(range_manipulate.serialize())
}
name if name == ScalarCalculate::name() => {
let scalar_calculate = node
.as_any()
.downcast_ref::<ScalarCalculate>()
.expect("Failed to downcast to ScalarCalculate");
Ok(scalar_calculate.serialize())
}
name if name == SeriesDivide::name() => {
let series_divide = node
.as_any()
@@ -92,6 +99,10 @@ impl SerializerRegistry for ExtensionSerializer {
let series_divide = SeriesDivide::deserialize(bytes)?;
Ok(Arc::new(series_divide))
}
name if name == ScalarCalculate::name() => {
let scalar_calculate = ScalarCalculate::deserialize(bytes)?;
Ok(Arc::new(scalar_calculate))
}
name if name == EmptyMetric::name() => Err(DataFusionError::Substrait(
"EmptyMetric should not be deserialized".to_string(),
)),

View File

@@ -18,6 +18,7 @@ mod instant_manipulate;
mod normalize;
mod planner;
mod range_manipulate;
mod scalar_calculate;
mod series_divide;
#[cfg(test)]
mod test_util;
@@ -30,6 +31,7 @@ pub use instant_manipulate::{InstantManipulate, InstantManipulateExec, InstantMa
pub use normalize::{SeriesNormalize, SeriesNormalizeExec, SeriesNormalizeStream};
pub use planner::PromExtensionPlanner;
pub use range_manipulate::{RangeManipulate, RangeManipulateExec, RangeManipulateStream};
pub use scalar_calculate::ScalarCalculate;
pub use series_divide::{SeriesDivide, SeriesDivideExec, SeriesDivideStream};
pub use union_distinct_on::{UnionDistinctOn, UnionDistinctOnExec, UnionDistinctOnStream};

View File

@@ -21,7 +21,7 @@ use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNode};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
use super::{HistogramFold, UnionDistinctOn};
use super::{HistogramFold, ScalarCalculate, UnionDistinctOn};
use crate::extension_plan::{
EmptyMetric, InstantManipulate, RangeManipulate, SeriesDivide, SeriesNormalize,
};
@@ -48,6 +48,8 @@ impl ExtensionPlanner for PromExtensionPlanner {
Ok(Some(node.to_execution_plan(physical_inputs[0].clone())))
} else if let Some(node) = node.as_any().downcast_ref::<EmptyMetric>() {
Ok(Some(node.to_execution_plan(session_state, planner)?))
} else if let Some(node) = node.as_any().downcast_ref::<ScalarCalculate>() {
Ok(Some(node.to_execution_plan(physical_inputs[0].clone())?))
} else if let Some(node) = node.as_any().downcast_ref::<HistogramFold>() {
Ok(Some(node.to_execution_plan(physical_inputs[0].clone())))
} else if let Some(node) = node.as_any().downcast_ref::<UnionDistinctOn>() {

View File

@@ -0,0 +1,602 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use datafusion::common::stats::Precision;
use datafusion::common::{DFSchema, DFSchemaRef, Result as DataFusionResult, Statistics};
use datafusion::error::DataFusionError;
use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::{EmptyRelation, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PlanProperties,
RecordBatchStream, SendableRecordBatchStream,
};
use datafusion::prelude::Expr;
use datafusion::sql::TableReference;
use datatypes::arrow::array::{Array, Float64Array, StringArray, TimestampMillisecondArray};
use datatypes::arrow::compute::{cast_with_options, concat_batches, CastOptions};
use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
use datatypes::arrow::record_batch::RecordBatch;
use futures::{ready, Stream, StreamExt};
use greptime_proto::substrait_extension as pb;
use prost::Message;
use snafu::ResultExt;
use super::Millisecond;
use crate::error::{ColumnNotFoundSnafu, DataFusionPlanningSnafu, DeserializeSnafu, Result};
/// `ScalarCalculate` is the custom logical plan that calculates
/// [`scalar`](https://prometheus.io/docs/prometheus/latest/querying/functions/#scalar)
/// in PromQL.
///
/// It returns NaN when the input has multiple time series, and returns the
/// values of the time series as the scalar value when the input has only one.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ScalarCalculate {
// `start`, `end` and `interval` are forwarded unchanged to the execution plan
// and are also part of the serialized form of this node.
start: Millisecond,
end: Millisecond,
interval: Millisecond,
// Name of the time index column; looked up in the input schema by its unqualified name.
time_index: String,
// Names of the tag columns of the input.
tag_columns: Vec<String>,
// Name of the input field column the scalar value is taken from.
field_column: String,
// The single child plan of this node.
input: LogicalPlan,
// Two-column schema: the time index field and a nullable `Float64` field
// named `scalar(<field_column>)`.
output_schema: DFSchemaRef,
}
impl ScalarCalculate {
    /// Builds a `ScalarCalculate` node on top of `input`.
    ///
    /// The output schema consists of the time index field copied from the input schema and
    /// a nullable `Float64` field named `scalar(<field_column>)`. Both are qualified with
    /// `table_name` when it is given, and the input schema's metadata is carried over.
    ///
    /// Fails with a column-not-found error when `time_index` cannot be resolved in the
    /// input schema by its unqualified name.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        start: Millisecond,
        end: Millisecond,
        interval: Millisecond,
        input: LogicalPlan,
        time_index: &str,
        tag_colunms: &[String],
        field_column: &str,
        table_name: Option<&str>,
    ) -> Result<Self> {
        let input_schema = input.schema();
        let ts_field = match input_schema.field_with_unqualified_name(time_index) {
            Ok(field) => field.clone(),
            Err(_) => return ColumnNotFoundSnafu { col: time_index }.fail(),
        };
        let scalar_name = format!("scalar({})", field_column);
        let val_field = Field::new(scalar_name, DataType::Float64, true);

        let qualifier = table_name.map(TableReference::bare);
        let qualified_fields = vec![
            (qualifier.clone(), Arc::new(ts_field)),
            (qualifier, Arc::new(val_field)),
        ];
        let metadata = input_schema.metadata().clone();
        let output_schema = DFSchema::new_with_metadata(qualified_fields, metadata)
            .context(DataFusionPlanningSnafu)?;

        Ok(Self {
            start,
            end,
            interval,
            time_index: time_index.to_string(),
            tag_columns: tag_colunms.to_vec(),
            field_column: field_column.to_string(),
            input,
            output_schema: Arc::new(output_schema),
        })
    }

    /// The name of this custom plan
    pub const fn name() -> &'static str {
        "ScalarCalculate"
    }

    /// Creates the physical plan for this node on top of `exec_input`.
    ///
    /// The time index and field columns are resolved to positions in the schema of
    /// `exec_input`; a failed lookup is returned as an Arrow error.
    pub fn to_execution_plan(
        &self,
        exec_input: Arc<dyn ExecutionPlan>,
    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
        // Rebuild plain Arrow fields from the output schema.
        let arrow_fields = self
            .output_schema
            .fields()
            .iter()
            .map(|f| Field::new(f.name(), f.data_type().clone(), f.is_nullable()))
            .collect::<Vec<_>>();

        let input_schema = exec_input.schema();
        let index_of = |column: &str| {
            input_schema
                .index_of(column)
                .map_err(|e| DataFusionError::ArrowError(e, None))
        };
        let ts_index = index_of(self.time_index.as_str())?;
        let val_index = index_of(self.field_column.as_str())?;

        let schema = Arc::new(Schema::new(arrow_fields));
        let properties = PlanProperties::new(
            EquivalenceProperties::new(schema.clone()),
            Partitioning::UnknownPartitioning(1),
            exec_input.properties().execution_mode,
        );

        let exec = ScalarCalculateExec {
            start: self.start,
            end: self.end,
            interval: self.interval,
            schema,
            input: exec_input,
            project_index: (ts_index, val_index),
            tag_columns: self.tag_columns.clone(),
            metric: ExecutionPlanMetricsSet::new(),
            properties,
        };
        Ok(Arc::new(exec))
    }

    /// Encodes this node as a protobuf `ScalarCalculate` message.
    ///
    /// The input plan and the output schema are not part of the encoded message.
    pub fn serialize(&self) -> Vec<u8> {
        let message = pb::ScalarCalculate {
            start: self.start,
            end: self.end,
            interval: self.interval,
            time_index: self.time_index.clone(),
            tag_columns: self.tag_columns.clone(),
            field_column: self.field_column.clone(),
        };
        message.encode_to_vec()
    }

    /// Decodes a node from the bytes produced by [`Self::serialize`].
    ///
    /// The decoded node gets an empty relation as a placeholder input, and an unqualified
    /// output schema with a millisecond timestamp column and the `scalar(..)` column.
    pub fn deserialize(bytes: &[u8]) -> Result<Self> {
        let pb::ScalarCalculate {
            start,
            end,
            interval,
            time_index,
            tag_columns,
            field_column,
        } = pb::ScalarCalculate::decode(bytes).context(DeserializeSnafu)?;

        let input = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        });

        // TODO(Taylor-lagrange): Supports timestamps of different precisions
        let ts_field = Field::new(
            &time_index,
            DataType::Timestamp(TimeUnit::Millisecond, None),
            true,
        );
        let val_field = Field::new(
            format!("scalar({})", field_column),
            DataType::Float64,
            true,
        );

        // TODO(Taylor-lagrange): missing tablename in pb
        let output_schema = DFSchema::new_with_metadata(
            vec![(None, Arc::new(ts_field)), (None, Arc::new(val_field))],
            HashMap::new(),
        )
        .context(DataFusionPlanningSnafu)?;

        Ok(Self {
            start,
            end,
            interval,
            time_index,
            tag_columns,
            field_column,
            output_schema: Arc::new(output_schema),
            input,
        })
    }
}
impl UserDefinedLogicalNodeCore for ScalarCalculate {
fn name(&self) -> &str {
Self::name()
}
fn inputs(&self) -> Vec<&LogicalPlan> {
vec![&self.input]
}
fn schema(&self) -> &DFSchemaRef {
&self.output_schema
}
fn expressions(&self) -> Vec<Expr> {
vec![]
}
fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "ScalarCalculate: tags={:?}", self.tag_columns)
}
fn from_template(&self, _expr: &[Expr], inputs: &[LogicalPlan]) -> Self {
assert!(!inputs.is_empty());
ScalarCalculate {
start: self.start,
end: self.end,
interval: self.interval,
time_index: self.time_index.clone(),
tag_columns: self.tag_columns.clone(),
field_column: self.field_column.clone(),
input: inputs[0].clone(),
output_schema: self.output_schema.clone(),
}
}
}
/// Physical plan node created by `ScalarCalculate::to_execution_plan`.
#[derive(Debug, Clone)]
struct ScalarCalculateExec {
// `start`, `end` and `interval` are handed to the output stream, which uses them
// to generate the timestamps of the all-NaN result; they also drive the row estimate.
start: Millisecond,
end: Millisecond,
interval: Millisecond,
// Output schema of this node.
schema: SchemaRef,
// `(time index position, field position)` in the input schema.
project_index: (usize, usize),
// The single child plan.
input: Arc<dyn ExecutionPlan>,
// Names of the tag columns; resolved to positions in the input stream schema on `execute`.
tag_columns: Vec<String>,
metric: ExecutionPlanMetricsSet,
properties: PlanProperties,
}
impl ExecutionPlan for ScalarCalculateExec {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn properties(&self) -> &PlanProperties {
&self.properties
}
fn maintains_input_order(&self) -> Vec<bool> {
vec![true; self.children().len()]
}
fn required_input_distribution(&self) -> Vec<Distribution> {
vec![Distribution::SinglePartition]
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![self.input.clone()]
}
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(ScalarCalculateExec {
start: self.start,
end: self.end,
interval: self.interval,
schema: self.schema.clone(),
project_index: self.project_index,
tag_columns: self.tag_columns.clone(),
input: children[0].clone(),
metric: self.metric.clone(),
properties: self.properties.clone(),
}))
}
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> DataFusionResult<SendableRecordBatchStream> {
let baseline_metric = BaselineMetrics::new(&self.metric, partition);
let input = self.input.execute(partition, context)?;
let schema = input.schema();
let tag_indices = self
.tag_columns
.iter()
.map(|tag| {
schema
.column_with_name(tag)
.unwrap_or_else(|| panic!("tag column not found {tag}"))
.0
})
.collect();
Ok(Box::pin(ScalarCalculateStream {
start: self.start,
end: self.end,
interval: self.interval,
schema: self.schema.clone(),
project_index: self.project_index,
metric: baseline_metric,
tag_indices,
input,
have_multi_series: false,
done: false,
batch: None,
tag_value: None,
}))
}
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metric.clone_inner())
}
fn statistics(&self) -> DataFusionResult<Statistics> {
let input_stats = self.input.statistics()?;
let estimated_row_num = (self.end - self.start) as f64 / self.interval as f64;
let estimated_total_bytes = input_stats
.total_byte_size
.get_value()
.zip(input_stats.num_rows.get_value())
.map(|(size, rows)| {
Precision::Inexact(((*size as f64 / *rows as f64) * estimated_row_num).floor() as _)
})
.unwrap_or_default();
Ok(Statistics {
num_rows: Precision::Inexact(estimated_row_num as _),
total_byte_size: estimated_total_bytes,
// TODO(ruihang): support this column statistics
column_statistics: Statistics::unknown_column(&self.schema()),
})
}
}
impl DisplayAs for ScalarCalculateExec {
    /// Renders the node for plan display; both display formats print the same line.
    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => f.write_fmt(
                format_args!("ScalarCalculateExec: tags={:?}", self.tag_columns),
            ),
        }
    }
}
/// Stream that consumes the whole input and then emits a single output batch:
/// either the accumulated `(time index, value)` rows, or an all-NaN batch.
struct ScalarCalculateStream {
// `start`, `end` and `interval` define the timestamps of the all-NaN batch.
start: Millisecond,
end: Millisecond,
interval: Millisecond,
// Output schema of the stream.
schema: SchemaRef,
input: SendableRecordBatchStream,
metric: BaselineMetrics,
// Positions of the tag columns in the input batches.
tag_indices: Vec<usize>,
/// with format `(ts_index, field_index)`
project_index: (usize, usize),
// Set once a batch is seen whose tag values differ from the remembered ones.
have_multi_series: bool,
// Set once the input is exhausted and the output batch has been produced.
done: bool,
// Projected input rows accumulated so far.
batch: Option<RecordBatch>,
// Tag values taken from the first row of the first checked batch.
tag_value: Option<Vec<String>>,
}
impl RecordBatchStream for ScalarCalculateStream {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}
impl ScalarCalculateStream {
fn update_batch(&mut self, batch: RecordBatch) -> DataFusionResult<()> {
let _timer = self.metric.elapsed_compute();
// if have multi time series, scalar will return NaN
if self.have_multi_series {
return Ok(());
}
// fast path: no tag columns means all data belongs to the same series.
if self.tag_indices.is_empty() {
self.append_batch(batch)?;
return Ok(());
}
let all_same = |val: Option<&str>, array: &StringArray| -> bool {
if let Some(v) = val {
array.iter().all(|s| s == Some(v))
} else {
array.is_empty() || array.iter().skip(1).all(|s| s == Some(array.value(0)))
}
};
// assert the entire batch belong to the same series
let all_tag_columns_same = if let Some(tags) = &self.tag_value {
tags.iter()
.zip(self.tag_indices.iter())
.all(|(value, index)| {
let array = batch.column(*index);
let string_array = array.as_any().downcast_ref::<StringArray>().unwrap();
all_same(Some(value), string_array)
})
} else {
let mut tag_values = Vec::with_capacity(self.tag_indices.len());
let is_same = self.tag_indices.iter().all(|index| {
let array = batch.column(*index);
let string_array = array.as_any().downcast_ref::<StringArray>().unwrap();
tag_values.push(string_array.value(0).to_string());
all_same(None, string_array)
});
self.tag_value = Some(tag_values);
is_same
};
if all_tag_columns_same {
self.append_batch(batch)?;
} else {
self.have_multi_series = true;
}
Ok(())
}
fn append_batch(&mut self, input_batch: RecordBatch) -> DataFusionResult<()> {
let ts_column = input_batch.column(self.project_index.0).clone();
let val_column = cast_with_options(
input_batch.column(self.project_index.1),
&DataType::Float64,
&CastOptions::default(),
)?;
let input_batch = RecordBatch::try_new(self.schema.clone(), vec![ts_column, val_column])?;
if let Some(batch) = &self.batch {
self.batch = Some(concat_batches(&self.schema, vec![batch, &input_batch])?);
} else {
self.batch = Some(input_batch);
}
Ok(())
}
}
impl Stream for ScalarCalculateStream {
type Item = DataFusionResult<RecordBatch>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
if self.done {
return Poll::Ready(None);
}
match ready!(self.input.poll_next_unpin(cx)) {
Some(Ok(batch)) => {
self.update_batch(batch)?;
}
// inner had error, return to caller
Some(Err(e)) => return Poll::Ready(Some(Err(e))),
// inner is done, producing output
None => {
self.done = true;
return match self.batch.take() {
Some(batch) if !self.have_multi_series => Poll::Ready(Some(Ok(batch))),
_ => {
let time_array = (self.start..=self.end)
.step_by(self.interval as _)
.collect::<Vec<_>>();
let nums = time_array.len();
let nan_batch = RecordBatch::try_new(
self.schema.clone(),
vec![
Arc::new(TimestampMillisecondArray::from(time_array)),
Arc::new(Float64Array::from(vec![f64::NAN; nums])),
],
)?;
Poll::Ready(Some(Ok(nan_batch)))
}
};
}
};
}
}
}
#[cfg(test)]
mod test {
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::physical_plan::memory::MemoryExec;
    use datafusion::physical_plan::ExecutionMode;
    use datafusion::prelude::SessionContext;
    use datatypes::arrow::array::{Float64Array, TimestampMillisecondArray};
    use datatypes::arrow::datatypes::TimeUnit;

    use super::*;

    /// Builds an in-memory input of two batches with the columns `ts`, `tag1`, `tag2`, `val`.
    ///
    /// With `diff_series` set, the last row of the second batch carries a different `tag2`
    /// value, so the input holds more than one series.
    fn prepare_test_data(diff_series: bool) -> MemoryExec {
        let schema = Arc::new(Schema::new(vec![
            Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true),
            Field::new("tag1", DataType::Utf8, true),
            Field::new("tag2", DataType::Utf8, true),
            Field::new("val", DataType::Float64, true),
        ]));

        // Both batches share `tag1`; only timestamps, `tag2` and values vary.
        let make_batch = |ts: Vec<i64>, tag2: Vec<&str>, val: Vec<f64>| {
            RecordBatch::try_new(
                schema.clone(),
                vec![
                    Arc::new(TimestampMillisecondArray::from(ts)),
                    Arc::new(StringArray::from(vec!["foo", "foo"])),
                    Arc::new(StringArray::from(tag2)),
                    Arc::new(Float64Array::from(val)),
                ],
            )
            .unwrap()
        };

        let batch_1 = make_batch(vec![0, 5_000], vec!["🥺", "🥺"], vec![1.0, 2.0]);
        let second_tag2 = if diff_series {
            vec!["🥺", "😝"]
        } else {
            vec!["🥺", "🥺"]
        };
        let batch_2 = make_batch(vec![10_000, 15_000], second_tag2, vec![3.0, 4.0]);

        MemoryExec::try_new(&[vec![batch_1, batch_2]], schema, None).unwrap()
    }

    /// Runs `ScalarCalculateExec` over the prepared data and compares the pretty-printed
    /// result with `expected`.
    async fn run_test(diff_series: bool, expected: &str) {
        let input = Arc::new(prepare_test_data(diff_series));
        let output_schema = Arc::new(Schema::new(vec![
            Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true),
            Field::new("val", DataType::Float64, true),
        ]));
        let properties = PlanProperties::new(
            EquivalenceProperties::new(output_schema.clone()),
            Partitioning::UnknownPartitioning(1),
            ExecutionMode::Bounded,
        );
        let scalar_exec = Arc::new(ScalarCalculateExec {
            start: 0,
            end: 15_000,
            interval: 5000,
            tag_columns: vec!["tag1".to_string(), "tag2".to_string()],
            input,
            schema: output_schema,
            project_index: (0, 3),
            metric: ExecutionPlanMetricsSet::new(),
            properties,
        });

        let ctx = SessionContext::default();
        let batches = datafusion::physical_plan::collect(scalar_exec, ctx.task_ctx())
            .await
            .unwrap();
        let rendered = datatypes::arrow::util::pretty::pretty_format_batches(&batches)
            .unwrap()
            .to_string();
        assert_eq!(rendered, expected);
    }

    /// A single series is passed through as the scalar value.
    #[tokio::test]
    async fn same_series() {
        let expected = "+---------------------+-----+\
            \n| ts | val |\
            \n+---------------------+-----+\
            \n| 1970-01-01T00:00:00 | 1.0 |\
            \n| 1970-01-01T00:00:05 | 2.0 |\
            \n| 1970-01-01T00:00:10 | 3.0 |\
            \n| 1970-01-01T00:00:15 | 4.0 |\
            \n+---------------------+-----+";
        run_test(false, expected).await
    }

    /// Multiple series produce NaN for every step of the query range.
    #[tokio::test]
    async fn diff_series() {
        let expected = "+---------------------+-----+\
            \n| ts | val |\
            \n+---------------------+-----+\
            \n| 1970-01-01T00:00:00 | NaN |\
            \n| 1970-01-01T00:00:05 | NaN |\
            \n| 1970-01-01T00:00:10 | NaN |\
            \n| 1970-01-01T00:00:15 | NaN |\
            \n+---------------------+-----+";
        run_test(true, expected).await
    }
}

View File

@@ -57,7 +57,7 @@ use crate::error::{
};
use crate::extension_plan::{
build_special_time_expr, EmptyMetric, HistogramFold, InstantManipulate, Millisecond,
RangeManipulate, SeriesDivide, SeriesNormalize, UnionDistinctOn,
RangeManipulate, ScalarCalculate, SeriesDivide, SeriesNormalize, UnionDistinctOn,
};
use crate::functions::{
AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, HoltWinters, IDelta,
@@ -67,6 +67,8 @@ use crate::functions::{
/// `time()` function in PromQL.
const SPECIAL_TIME_FUNCTION: &str = "time";
/// `scalar()` function in PromQL.
const SCALAR_FUNCTION: &str = "scalar";
/// `histogram_quantile` function in PromQL
const SPECIAL_HISTOGRAM_QUANTILE: &str = "histogram_quantile";
/// `vector` function in PromQL
@@ -337,7 +339,17 @@ impl PromPlanner {
// rename table references to avoid ambiguity
left_table_ref = TableReference::bare("lhs");
right_table_ref = TableReference::bare("rhs");
self.ctx.table_name = Some("lhs".to_string());
// At this point `self.ctx` holds the context of the right plan. If the right plan
// has no tag columns (e.g. `host + scalar(...)`), use the left plan's context for
// the subsequent calculations instead: the tag columns of the `host` table must be
// preserved in the following projection, and they only appear in the left plan's context.
if self.ctx.tag_columns.is_empty() {
self.ctx = left_context.clone();
self.ctx.table_name = Some("lhs".to_string());
} else {
self.ctx.table_name = Some("rhs".to_string());
}
}
let mut field_columns =
left_field_columns.iter().zip(right_field_columns.iter());
@@ -346,6 +358,10 @@ impl PromPlanner {
right_input,
left_table_ref.clone(),
right_table_ref.clone(),
// If either the left or the right plan has no tag columns, this is a case like
// `scalar(...) + host` or `host + scalar(...)`; in that case we only join on the time index.
left_context.tag_columns.is_empty()
|| right_context.tag_columns.is_empty(),
)?;
let join_plan_schema = join_plan.schema().clone();
@@ -494,6 +510,7 @@ impl PromPlanner {
match func.name {
SPECIAL_HISTOGRAM_QUANTILE => return self.create_histogram_plan(args).await,
SPECIAL_VECTOR_FUNCTION => return self.create_vector_plan(args).await,
SCALAR_FUNCTION => return self.create_scalar_plan(args).await,
_ => {}
}
@@ -1434,6 +1451,44 @@ impl PromPlanner {
}))
}
/// Create a [SCALAR_FUNCTION] plan.
///
/// Plans the single argument, wraps it in a [`ScalarCalculate`] extension node and then
/// updates the planner context: the tag columns are cleared and the only field column
/// becomes the second field of the scalar plan's schema.
async fn create_scalar_plan(&mut self, args: &PromFunctionArgs) -> Result<LogicalPlan> {
    ensure!(
        args.len() == 1,
        FunctionInvalidArgumentSnafu {
            fn_name: SCALAR_FUNCTION
        }
    );
    let input = self
        .prom_expr_to_plan(args.args[0].as_ref().clone())
        .await?;
    ensure!(
        self.ctx.field_columns.len() == 1,
        MultiFieldsNotSupportedSnafu {
            operator: SCALAR_FUNCTION
        },
    );

    let scalar_node = ScalarCalculate::new(
        self.ctx.start,
        self.ctx.end,
        self.ctx.interval,
        input,
        self.ctx.time_index_column.as_ref().unwrap(),
        &self.ctx.tag_columns,
        &self.ctx.field_columns[0],
        self.ctx.table_name.as_deref(),
    )?;
    let scalar_plan = LogicalPlan::Extension(Extension {
        node: Arc::new(scalar_node),
    });

    // scalar plan have no tag columns
    let scalar_field = scalar_plan.schema().field(1).name().clone();
    self.ctx.tag_columns.clear();
    self.ctx.field_columns.clear();
    self.ctx.field_columns.push(scalar_field);
    Ok(scalar_plan)
}
/// Try to build a DataFusion Literal Expression from PromQL Expr, return
/// `None` if the input is not a literal expression.
fn try_build_literal_expr(expr: &PromExpr) -> Option<DfExpr> {
@@ -1583,19 +1638,24 @@ impl PromPlanner {
}
/// Build a inner join on time index column and tag columns to concat two logical plans.
/// When `only_join_time_index == true` we only join on the time index, because these two plan may not have the same tag columns
fn join_on_non_field_columns(
&self,
left: LogicalPlan,
right: LogicalPlan,
left_table_ref: TableReference,
right_table_ref: TableReference,
only_join_time_index: bool,
) -> Result<LogicalPlan> {
let mut tag_columns = self
.ctx
.tag_columns
.iter()
.map(Column::from_name)
.collect::<Vec<_>>();
let mut tag_columns = if only_join_time_index {
vec![]
} else {
self.ctx
.tag_columns
.iter()
.map(Column::from_name)
.collect::<Vec<_>>()
};
// push time index column if it exist
if let Some(time_index_column) = &self.ctx.time_index_column {
@@ -2543,7 +2603,7 @@ mod test {
.unwrap();
let expected = String::from(
"Projection: lhs.tag_0, lhs.timestamp, lhs.field_0 + rhs.field_0 AS lhs.field_0 + rhs.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), lhs.field_0 + rhs.field_0:Float64;N]\
"Projection: rhs.tag_0, rhs.timestamp, lhs.field_0 + rhs.field_0 AS lhs.field_0 + rhs.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), lhs.field_0 + rhs.field_0:Float64;N]\
\n Inner Join: lhs.tag_0 = rhs.tag_0, lhs.timestamp = rhs.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n SubqueryAlias: lhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\

View File

@@ -457,7 +457,7 @@ async fn aggregators_complex_combined_aggrs(instance: Arc<dyn MockInstance>) {
Duration::from_secs(60),
Duration::from_secs(0),
"+------------+---------------------+---------------------------------------------------------------------------------------------------------------------------------------------+\
\n| job | ts | lhs.lhs.lhs.SUM(http_requests.value) + rhs.MIN(http_requests.value) + http_requests.MAX(http_requests.value) + rhs.AVG(http_requests.value) |\
\n| job | ts | lhs.rhs.lhs.SUM(http_requests.value) + rhs.MIN(http_requests.value) + http_requests.MAX(http_requests.value) + rhs.AVG(http_requests.value) |\
\n+------------+---------------------+---------------------------------------------------------------------------------------------------------------------------------------------+\
\n| api-server | 1970-01-01T00:00:00 | 1750.0 |\
\n| app-server | 1970-01-01T00:00:00 | 4550.0 |\

View File

@@ -0,0 +1,320 @@
CREATE TABLE host (
ts timestamp(3) time index,
host STRING PRIMARY KEY,
val BIGINT,
);
Affected Rows: 0
INSERT INTO TABLE host VALUES
(0, 'host1', 1),
(0, 'host2', 2),
(5000, 'host1', 3),
(5000, 'host2', 4),
(10000, 'host1', 5),
(10000, 'host2', 6),
(15000, 'host1', 7),
(15000, 'host2', 8);
Affected Rows: 8
-- case only have one time series, scalar return value
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') scalar(host{host="host1"});
+---------------------+-------------+
| ts | scalar(val) |
+---------------------+-------------+
| 1970-01-01T00:00:00 | 1.0 |
| 1970-01-01T00:00:05 | 3.0 |
| 1970-01-01T00:00:10 | 5.0 |
| 1970-01-01T00:00:15 | 7.0 |
+---------------------+-------------+
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') scalar(host{host="host1"}) + 1;
+---------------------+--------------------------+
| ts | scalar(val) + Float64(1) |
+---------------------+--------------------------+
| 1970-01-01T00:00:00 | 2.0 |
| 1970-01-01T00:00:05 | 4.0 |
| 1970-01-01T00:00:10 | 6.0 |
| 1970-01-01T00:00:15 | 8.0 |
+---------------------+--------------------------+
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') 1 + scalar(host{host="host1"});
+---------------------+--------------------------+
| ts | Float64(1) + scalar(val) |
+---------------------+--------------------------+
| 1970-01-01T00:00:00 | 2.0 |
| 1970-01-01T00:00:05 | 4.0 |
| 1970-01-01T00:00:10 | 6.0 |
| 1970-01-01T00:00:15 | 8.0 |
+---------------------+--------------------------+
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') scalar(host{host="host1"}) + scalar(host{host="host2"});
+---------------------+-----------------------------------+
| ts | lhs.scalar(val) + rhs.scalar(val) |
+---------------------+-----------------------------------+
| 1970-01-01T00:00:00 | 3.0 |
| 1970-01-01T00:00:05 | 7.0 |
| 1970-01-01T00:00:10 | 11.0 |
| 1970-01-01T00:00:15 | 15.0 |
+---------------------+-----------------------------------+
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') host{host="host1"} + scalar(host{host="host2"});
+-------+---------------------+---------------------------+
| host | ts | lhs.val + rhs.scalar(val) |
+-------+---------------------+---------------------------+
| host1 | 1970-01-01T00:00:00 | 3.0 |
| host1 | 1970-01-01T00:00:05 | 7.0 |
| host1 | 1970-01-01T00:00:10 | 11.0 |
| host1 | 1970-01-01T00:00:15 | 15.0 |
+-------+---------------------+---------------------------+
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') scalar(host{host="host1"}) + host{host="host2"};
+-------+---------------------+---------------------------+
| host | ts | lhs.scalar(val) + rhs.val |
+-------+---------------------+---------------------------+
| host2 | 1970-01-01T00:00:00 | 3.0 |
| host2 | 1970-01-01T00:00:05 | 7.0 |
| host2 | 1970-01-01T00:00:10 | 11.0 |
| host2 | 1970-01-01T00:00:15 | 15.0 |
+-------+---------------------+---------------------------+
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') host + scalar(host{host="host2"});
+-------+---------------------+---------------------------+
| host | ts | lhs.val + rhs.scalar(val) |
+-------+---------------------+---------------------------+
| host1 | 1970-01-01T00:00:00 | 3.0 |
| host1 | 1970-01-01T00:00:05 | 7.0 |
| host1 | 1970-01-01T00:00:10 | 11.0 |
| host1 | 1970-01-01T00:00:15 | 15.0 |
| host2 | 1970-01-01T00:00:00 | 4.0 |
| host2 | 1970-01-01T00:00:05 | 8.0 |
| host2 | 1970-01-01T00:00:10 | 12.0 |
| host2 | 1970-01-01T00:00:15 | 16.0 |
+-------+---------------------+---------------------------+
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') scalar(host{host="host1"}) + host;
+-------+---------------------+---------------------------+
| host | ts | lhs.scalar(val) + rhs.val |
+-------+---------------------+---------------------------+
| host1 | 1970-01-01T00:00:00 | 2.0 |
| host1 | 1970-01-01T00:00:05 | 6.0 |
| host1 | 1970-01-01T00:00:10 | 10.0 |
| host1 | 1970-01-01T00:00:15 | 14.0 |
| host2 | 1970-01-01T00:00:00 | 3.0 |
| host2 | 1970-01-01T00:00:05 | 7.0 |
| host2 | 1970-01-01T00:00:10 | 11.0 |
| host2 | 1970-01-01T00:00:15 | 15.0 |
+-------+---------------------+---------------------------+
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') scalar(count(count(host) by (host)));
+---------------------+--------------------------------+
| ts | scalar(COUNT(COUNT(host.val))) |
+---------------------+--------------------------------+
| 1970-01-01T00:00:00 | 2.0 |
| 1970-01-01T00:00:05 | 2.0 |
| 1970-01-01T00:00:10 | 2.0 |
| 1970-01-01T00:00:15 | 2.0 |
+---------------------+--------------------------------+
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') scalar(host{host="host1"} + scalar(host{host="host2"}));
+---------------------+-----------------------------------+
| ts | scalar(lhs.val + rhs.scalar(val)) |
+---------------------+-----------------------------------+
| 1970-01-01T00:00:00 | 3.0 |
| 1970-01-01T00:00:05 | 7.0 |
| 1970-01-01T00:00:10 | 11.0 |
| 1970-01-01T00:00:15 | 15.0 |
+---------------------+-----------------------------------+
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') scalar(scalar(host{host="host2"}) + host{host="host1"});
+---------------------+-----------------------------------+
| ts | scalar(lhs.scalar(val) + rhs.val) |
+---------------------+-----------------------------------+
| 1970-01-01T00:00:00 | 3.0 |
| 1970-01-01T00:00:05 | 7.0 |
| 1970-01-01T00:00:10 | 11.0 |
| 1970-01-01T00:00:15 | 15.0 |
+---------------------+-----------------------------------+
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') scalar(host + scalar(host{host="host2"}));
+---------------------+-----------------------------------+
| ts | scalar(lhs.val + rhs.scalar(val)) |
+---------------------+-----------------------------------+
| 1970-01-01T00:00:00 | NaN |
| 1970-01-01T00:00:05 | NaN |
| 1970-01-01T00:00:10 | NaN |
| 1970-01-01T00:00:15 | NaN |
+---------------------+-----------------------------------+
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') scalar(scalar(host{host="host2"}) + host);
+---------------------+-----------------------------------+
| ts | scalar(lhs.scalar(val) + rhs.val) |
+---------------------+-----------------------------------+
| 1970-01-01T00:00:00 | NaN |
| 1970-01-01T00:00:05 | NaN |
| 1970-01-01T00:00:10 | NaN |
| 1970-01-01T00:00:15 | NaN |
+---------------------+-----------------------------------+
-- case have multiple time series, scalar return NaN
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') scalar(host);
+---------------------+-------------+
| ts | scalar(val) |
+---------------------+-------------+
| 1970-01-01T00:00:00 | NaN |
| 1970-01-01T00:00:05 | NaN |
| 1970-01-01T00:00:10 | NaN |
| 1970-01-01T00:00:15 | NaN |
+---------------------+-------------+
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') scalar(host) + 1;
+---------------------+--------------------------+
| ts | scalar(val) + Float64(1) |
+---------------------+--------------------------+
| 1970-01-01T00:00:00 | NaN |
| 1970-01-01T00:00:05 | NaN |
| 1970-01-01T00:00:10 | NaN |
| 1970-01-01T00:00:15 | NaN |
+---------------------+--------------------------+
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') 1 + scalar(host);
+---------------------+--------------------------+
| ts | Float64(1) + scalar(val) |
+---------------------+--------------------------+
| 1970-01-01T00:00:00 | NaN |
| 1970-01-01T00:00:05 | NaN |
| 1970-01-01T00:00:10 | NaN |
| 1970-01-01T00:00:15 | NaN |
+---------------------+--------------------------+
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') scalar(host) + scalar(host);
+---------------------+-----------------------------------+
| ts | lhs.scalar(val) + rhs.scalar(val) |
+---------------------+-----------------------------------+
| 1970-01-01T00:00:00 | NaN |
| 1970-01-01T00:00:05 | NaN |
| 1970-01-01T00:00:10 | NaN |
| 1970-01-01T00:00:15 | NaN |
+---------------------+-----------------------------------+
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') host + scalar(host);
+-------+---------------------+---------------------------+
| host | ts | lhs.val + rhs.scalar(val) |
+-------+---------------------+---------------------------+
| host1 | 1970-01-01T00:00:00 | NaN |
| host1 | 1970-01-01T00:00:05 | NaN |
| host1 | 1970-01-01T00:00:10 | NaN |
| host1 | 1970-01-01T00:00:15 | NaN |
| host2 | 1970-01-01T00:00:00 | NaN |
| host2 | 1970-01-01T00:00:05 | NaN |
| host2 | 1970-01-01T00:00:10 | NaN |
| host2 | 1970-01-01T00:00:15 | NaN |
+-------+---------------------+---------------------------+
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') scalar(host) + host;
+-------+---------------------+---------------------------+
| host | ts | lhs.scalar(val) + rhs.val |
+-------+---------------------+---------------------------+
| host1 | 1970-01-01T00:00:00 | NaN |
| host1 | 1970-01-01T00:00:05 | NaN |
| host1 | 1970-01-01T00:00:10 | NaN |
| host1 | 1970-01-01T00:00:15 | NaN |
| host2 | 1970-01-01T00:00:00 | NaN |
| host2 | 1970-01-01T00:00:05 | NaN |
| host2 | 1970-01-01T00:00:10 | NaN |
| host2 | 1970-01-01T00:00:15 | NaN |
+-------+---------------------+---------------------------+
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') host{host="host2"} + scalar(host);
+-------+---------------------+---------------------------+
| host | ts | lhs.val + rhs.scalar(val) |
+-------+---------------------+---------------------------+
| host2 | 1970-01-01T00:00:00 | NaN |
| host2 | 1970-01-01T00:00:05 | NaN |
| host2 | 1970-01-01T00:00:10 | NaN |
| host2 | 1970-01-01T00:00:15 | NaN |
+-------+---------------------+---------------------------+
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') scalar(host) + host{host="host2"};
+-------+---------------------+---------------------------+
| host | ts | lhs.scalar(val) + rhs.val |
+-------+---------------------+---------------------------+
| host2 | 1970-01-01T00:00:00 | NaN |
| host2 | 1970-01-01T00:00:05 | NaN |
| host2 | 1970-01-01T00:00:10 | NaN |
| host2 | 1970-01-01T00:00:15 | NaN |
+-------+---------------------+---------------------------+
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') scalar(host{host="host1"} + scalar(host));
+---------------------+-----------------------------------+
| ts | scalar(lhs.val + rhs.scalar(val)) |
+---------------------+-----------------------------------+
| 1970-01-01T00:00:00 | NaN |
| 1970-01-01T00:00:05 | NaN |
| 1970-01-01T00:00:10 | NaN |
| 1970-01-01T00:00:15 | NaN |
+---------------------+-----------------------------------+
-- error case
TQL EVAL (0, 15, '5s') scalar(1 + scalar(host{host="host2"}));
Error: 2000(InvalidSyntax), expected type vector in call to function 'scalar', got scalar
TQL EVAL (0, 15, '5s') scalar(scalar(host{host="host2"}) + 1);
Error: 2000(InvalidSyntax), expected type vector in call to function 'scalar', got scalar
TQL EVAL (0, 15, '5s') scalar(scalar(host{host="host1"}) + scalar(host{host="host2"}));
Error: 2000(InvalidSyntax), expected type vector in call to function 'scalar', got scalar
Drop table host;
Affected Rows: 0

View File

@@ -0,0 +1,95 @@
-- Tests for the PromQL `scalar()` function evaluated through `TQL EVAL`.
-- NOTE(review): comment lines appear to be echoed verbatim into the expected-output
-- (.result) file, so regenerate that file after changing any comment here; confirm.
-- NOTE(review): `SORT_RESULT 3 1` presumably sorts the output rows while skipping the
-- 3 table-header lines and the 1 trailing border line; verify against the sqlness docs.
-- Fixture table: one value column (`val`) keyed by `host`, with `ts` as the time index.
CREATE TABLE host (
ts timestamp(3) time index,
host STRING PRIMARY KEY,
val BIGINT,
);
-- Two series sampled at 0s, 5s, 10s and 15s (timestamps are given in milliseconds):
-- host1 = 1, 3, 5, 7 and host2 = 2, 4, 6, 8.
INSERT INTO TABLE host VALUES
(0, 'host1', 1),
(0, 'host2', 2),
(5000, 'host1', 3),
(5000, 'host2', 4),
(10000, 'host1', 5),
(10000, 'host2', 6),
(15000, 'host1', 7),
(15000, 'host2', 8);
-- Case 1: the argument of scalar() holds exactly one time series, so scalar()
-- returns that series' value at every evaluation step.
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') scalar(host{host="host1"});
-- scalar() combined with a number literal, on either side of `+`.
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') scalar(host{host="host1"}) + 1;
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') 1 + scalar(host{host="host1"});
-- scalar() on both sides of the binary operator.
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') scalar(host{host="host1"}) + scalar(host{host="host2"});
-- A single-series vector combined with scalar(), on either side of `+`.
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') host{host="host1"} + scalar(host{host="host2"});
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') scalar(host{host="host1"}) + host{host="host2"};
-- The unfiltered vector (both series) combined with a single-series scalar().
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') host + scalar(host{host="host2"});
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') scalar(host{host="host1"}) + host;
-- scalar() over an aggregation: the outer count() has no grouping, so scalar()
-- is expected to receive a single series.
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') scalar(count(count(host) by (host)));
-- Nested scalar(): the outer argument is a single-series vector expression that
-- itself contains a scalar() call.
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') scalar(host{host="host1"} + scalar(host{host="host2"}));
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') scalar(scalar(host{host="host2"}) + host{host="host1"});
-- Nested scalar() where the outer argument uses the unfiltered `host` vector (two
-- series): the expected output is NaN although the inner scalar() is single-series.
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') scalar(host + scalar(host{host="host2"}));
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') scalar(scalar(host{host="host2"}) + host);
-- Case 2: the argument of scalar() holds multiple time series, so scalar()
-- returns NaN at every evaluation step.
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') scalar(host);
-- The NaN from scalar() combined with a number literal, on either side of `+`.
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') scalar(host) + 1;
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') 1 + scalar(host);
-- Multi-series scalar() on both sides of the binary operator.
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') scalar(host) + scalar(host);
-- The unfiltered vector combined with a multi-series scalar(): every output series is NaN.
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') host + scalar(host);
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') scalar(host) + host;
-- A single-series vector combined with a multi-series scalar().
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') host{host="host2"} + scalar(host);
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') scalar(host) + host{host="host2"};
-- Nested scalar() whose inner call sees multiple series: the expected output is NaN.
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') scalar(host{host="host1"} + scalar(host));
-- Error cases: the argument of the outer scalar() is itself scalar-typed, which is
-- rejected with "expected type vector in call to function 'scalar', got scalar".
TQL EVAL (0, 15, '5s') scalar(1 + scalar(host{host="host2"}));
TQL EVAL (0, 15, '5s') scalar(scalar(host{host="host2"}) + 1);
TQL EVAL (0, 15, '5s') scalar(scalar(host{host="host1"}) + scalar(host{host="host2"}));
-- Clean up the fixture table.
Drop table host;