From e7b92f24e8057c16a616da57d8ad8e70fa90fba9 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 2 Mar 2023 11:15:55 +0800 Subject: [PATCH] feat: impl `EmptyMetric` plan and `time()` function (#1100) * impl EmptyMetric plan Signed-off-by: Ruihang Xia * add test cases Signed-off-by: Ruihang Xia * impl planner part Signed-off-by: Ruihang Xia * adapt new datafusion changes Signed-off-by: Ruihang Xia * fix typo Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- src/promql/src/extension_plan.rs | 2 + src/promql/src/extension_plan/empty_metric.rs | 373 ++++++++++++++++++ src/promql/src/extension_plan/planner.rs | 7 +- src/promql/src/planner.rs | 28 +- 4 files changed, 407 insertions(+), 3 deletions(-) create mode 100644 src/promql/src/extension_plan/empty_metric.rs diff --git a/src/promql/src/extension_plan.rs b/src/promql/src/extension_plan.rs index 3bf8cab70c..5c1a3cbbb3 100644 --- a/src/promql/src/extension_plan.rs +++ b/src/promql/src/extension_plan.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod empty_metric; mod instant_manipulate; mod normalize; mod planner; @@ -19,6 +20,7 @@ mod range_manipulate; mod series_divide; use datafusion::arrow::datatypes::{ArrowPrimitiveType, TimestampMillisecondType}; +pub use empty_metric::{EmptyMetric, EmptyMetricExec, EmptyMetricStream}; pub use instant_manipulate::{InstantManipulate, InstantManipulateExec, InstantManipulateStream}; pub use normalize::{SeriesNormalize, SeriesNormalizeExec, SeriesNormalizeStream}; pub use planner::PromExtensionPlanner; diff --git a/src/promql/src/extension_plan/empty_metric.rs b/src/promql/src/extension_plan/empty_metric.rs new file mode 100644 index 0000000000..26ed6d508c --- /dev/null +++ b/src/promql/src/extension_plan/empty_metric.rs @@ -0,0 +1,373 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::collections::HashMap; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use datafusion::arrow::array::Float64Array; +use datafusion::arrow::datatypes::{DataType, TimeUnit}; +use datafusion::common::{DFField, DFSchema, DFSchemaRef, Result as DataFusionResult, Statistics}; +use datafusion::error::DataFusionError; +use datafusion::execution::context::TaskContext; +use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNode}; +use datafusion::physical_expr::PhysicalSortExpr; +use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; +use datafusion::physical_plan::{ + DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, +}; +use datafusion::prelude::Expr; +use datatypes::arrow::array::TimestampMillisecondArray; +use datatypes::arrow::datatypes::SchemaRef; +use datatypes::arrow::record_batch::RecordBatch; +use futures::Stream; + +use crate::extension_plan::Millisecond; + +#[derive(Debug, Clone)] +pub struct EmptyMetric { + start: Millisecond, + end: Millisecond, + interval: Millisecond, + schema: DFSchemaRef, +} + +impl EmptyMetric { + pub fn new( + start: Millisecond, + end: Millisecond, + interval: Millisecond, + time_index_column_name: String, + value_column_name: String, + ) -> DataFusionResult { + let schema = Arc::new(DFSchema::new_with_metadata( + vec![ + DFField::new( + None, + &time_index_column_name, + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + ), + DFField::new(None, &value_column_name, DataType::Float64, true), + ], + HashMap::new(), + )?); + + Ok(Self { + start, + end, + interval, + schema, + }) + } + + pub fn to_execution_plan(&self) -> Arc { + // let schema = self.schema.to + Arc::new(EmptyMetricExec { + start: self.start, + end: self.end, + interval: self.interval, + schema: Arc::new(self.schema.as_ref().into()), + metric: ExecutionPlanMetricsSet::new(), + }) + } +} + +impl UserDefinedLogicalNode for EmptyMetric { + fn as_any(&self) -> &dyn Any { + self as _ + } + + fn inputs(&self) -> Vec<&LogicalPlan> { + vec![] + } + + fn schema(&self) -> &DFSchemaRef { + &self.schema + } + + fn expressions(&self) -> Vec { + vec![] + } + + fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!( + f, + "EmptyMetric: range=[{}..{}], interval=[{}]", + self.start, self.end, self.interval, + ) + } + + fn from_template( + &self, + _exprs: &[datafusion::prelude::Expr], + _inputs: &[LogicalPlan], + ) -> Arc { + Arc::new(self.clone()) + } +} + +#[derive(Debug, Clone)] +pub struct EmptyMetricExec { + start: Millisecond, + end: Millisecond, + interval: Millisecond, + schema: SchemaRef, + + metric: ExecutionPlanMetricsSet, +} + +impl ExecutionPlan for EmptyMetricExec { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn output_partitioning(&self) -> Partitioning { + Partitioning::UnknownPartitioning(1) + } + + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + + fn maintains_input_order(&self) -> Vec { + vec![] + } + + fn children(&self) -> Vec> { + vec![] + } + + fn with_new_children( + self: Arc, + _children: Vec>, + ) -> DataFusionResult> { + Ok(Arc::new(self.as_ref().clone())) + } + + fn execute( + &self, + partition: usize, + _context: Arc, + ) -> DataFusionResult { + let baseline_metric = BaselineMetrics::new(&self.metric, partition); + Ok(Box::pin(EmptyMetricStream { + start: self.start, + end: self.end, + interval: self.interval, + is_first_poll: true, + schema: self.schema.clone(), + metric: baseline_metric, + })) + } + + fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match t { + DisplayFormatType::Default => write!( + f, + "EmptyMetric: range=[{}..{}], interval=[{}]", + self.start, self.end, self.interval, + ), + } + } + + fn metrics(&self) -> Option { + Some(self.metric.clone_inner()) + } + + fn statistics(&self) -> Statistics { + let estimated_row_num = (self.end - self.start) as f64 / self.interval as f64; + let total_byte_size = estimated_row_num * std::mem::size_of::() as f64; + + Statistics { + num_rows: Some(estimated_row_num.floor() as _), + total_byte_size: Some(total_byte_size.floor() as _), + column_statistics: None, + is_exact: true, + } + } +} + +pub struct EmptyMetricStream { + start: Millisecond, + end: Millisecond, + interval: Millisecond, + // only generate one record batch at the first poll + is_first_poll: bool, + schema: SchemaRef, + metric: BaselineMetrics, +} + +impl RecordBatchStream for EmptyMetricStream { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} + +impl Stream for EmptyMetricStream { + type Item = DataFusionResult; + + fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + let result = if self.is_first_poll { + self.is_first_poll = false; + let _timer = self.metric.elapsed_compute().timer(); + let result_array = (self.start..=self.end) + .step_by(self.interval as _) + .collect::>(); + let float_array = + Float64Array::from_iter(result_array.iter().map(|v| *v as f64 / 1000.0)); + let millisecond_array = TimestampMillisecondArray::from(result_array); + let batch = RecordBatch::try_new( + self.schema.clone(), + vec![Arc::new(millisecond_array), Arc::new(float_array)], + ) + .map_err(DataFusionError::ArrowError); + Poll::Ready(Some(batch)) + } else { + Poll::Ready(None) + }; + self.metric.record_poll(result) + } +} + +#[cfg(test)] +mod test { + use datafusion::prelude::SessionContext; + + use super::*; + + async fn do_empty_metric_test( + start: Millisecond, + end: Millisecond, + interval: Millisecond, + time_column_name: String, + value_column_name: String, + expected: String, + ) { + let empty_metric = + EmptyMetric::new(start, end, interval, time_column_name, value_column_name).unwrap(); + let empty_metric_exec = empty_metric.to_execution_plan(); + + let session_context = SessionContext::default(); + let result = + datafusion::physical_plan::collect(empty_metric_exec, session_context.task_ctx()) + .await + .unwrap(); + let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result) + .unwrap() + .to_string(); + + assert_eq!(result_literal, expected); + } + + #[tokio::test] + async fn normal_empty_metric_test() { + do_empty_metric_test( + 0, + 100, + 10, + "time".to_string(), + "value".to_string(), + String::from( + "+-------------------------+-------+\ + \n| time | value |\ + \n+-------------------------+-------+\ + \n| 1970-01-01T00:00:00 | 0.0 |\ + \n| 1970-01-01T00:00:00.010 | 0.01 |\ + \n| 1970-01-01T00:00:00.020 | 0.02 |\ + \n| 1970-01-01T00:00:00.030 | 0.03 |\ + \n| 1970-01-01T00:00:00.040 | 0.04 |\ + \n| 1970-01-01T00:00:00.050 | 0.05 |\ + \n| 1970-01-01T00:00:00.060 | 0.06 |\ + \n| 1970-01-01T00:00:00.070 | 0.07 |\ + \n| 1970-01-01T00:00:00.080 | 0.08 |\ + \n| 1970-01-01T00:00:00.090 | 0.09 |\ + \n| 1970-01-01T00:00:00.100 | 0.1 |\ + \n+-------------------------+-------+", + ), + ) + .await + } + + #[tokio::test] + async fn unaligned_empty_metric_test() { + do_empty_metric_test( + 0, + 100, + 11, + "time".to_string(), + "value".to_string(), + String::from( + "+-------------------------+-------+\ + \n| time | value |\ + \n+-------------------------+-------+\ + \n| 1970-01-01T00:00:00 | 0.0 |\ + \n| 1970-01-01T00:00:00.011 | 0.011 |\ + \n| 1970-01-01T00:00:00.022 | 0.022 |\ + \n| 1970-01-01T00:00:00.033 | 0.033 |\ + \n| 1970-01-01T00:00:00.044 | 0.044 |\ + \n| 1970-01-01T00:00:00.055 | 0.055 |\ + \n| 1970-01-01T00:00:00.066 | 0.066 |\ + \n| 1970-01-01T00:00:00.077 | 0.077 |\ + \n| 1970-01-01T00:00:00.088 | 0.088 |\ + \n| 1970-01-01T00:00:00.099 | 0.099 |\ + \n+-------------------------+-------+", + ), + ) + .await + } + + #[tokio::test] + async fn one_row_empty_metric_test() { + do_empty_metric_test( + 0, + 100, + 1000, + "time".to_string(), + "value".to_string(), + String::from( + "+---------------------+-------+\ + \n| time | value |\ + \n+---------------------+-------+\ + \n| 1970-01-01T00:00:00 | 0.0 |\ + \n+---------------------+-------+", + ), + ) + .await + } + + #[tokio::test] + async fn negative_range_empty_metric_test() { + do_empty_metric_test( + 1000, + -1000, + 10, + "time".to_string(), + "value".to_string(), + String::from( + "+------+-------+\ + \n| time | value |\ + \n+------+-------+\ + \n+------+-------+", + ), + ) + .await + } +} diff --git a/src/promql/src/extension_plan/planner.rs b/src/promql/src/extension_plan/planner.rs index 03366c0fb2..3beedf3d2c 100644 --- a/src/promql/src/extension_plan/planner.rs +++ b/src/promql/src/extension_plan/planner.rs @@ -21,8 +21,9 @@ use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNode}; use datafusion::physical_plan::planner::ExtensionPlanner; use datafusion::physical_plan::{ExecutionPlan, PhysicalPlanner}; -use super::{InstantManipulate, RangeManipulate, SeriesDivide}; -use crate::extension_plan::SeriesNormalize; +use crate::extension_plan::{ + EmptyMetric, InstantManipulate, RangeManipulate, SeriesDivide, SeriesNormalize, +}; pub struct PromExtensionPlanner {} @@ -44,6 +45,8 @@ impl ExtensionPlanner for PromExtensionPlanner { Ok(Some(node.to_execution_plan(physical_inputs[0].clone()))) } else if let Some(node) = node.as_any().downcast_ref::() { Ok(Some(node.to_execution_plan(physical_inputs[0].clone()))) + } else if let Some(node) = node.as_any().downcast_ref::() { + Ok(Some(node.to_execution_plan())) } else { Ok(None) } diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs index 9cd5e16783..11ccbcdcb8 100644 --- a/src/promql/src/planner.rs +++ b/src/promql/src/planner.rs @@ -46,12 +46,18 @@ use crate::error::{ UnsupportedExprSnafu, ValueNotFoundSnafu, }; use crate::extension_plan::{ - InstantManipulate, Millisecond, RangeManipulate, SeriesDivide, SeriesNormalize, + EmptyMetric, InstantManipulate, Millisecond, RangeManipulate, SeriesDivide, SeriesNormalize, }; use crate::functions::{IDelta, Increase}; const LEFT_PLAN_JOIN_ALIAS: &str = "lhs"; +/// `time()` function in PromQL. +const SPECIAL_TIME_FUNCTION: &str = "time"; + +/// default value column name for empty metric +const DEFAULT_VALUE_COLUMN: &str = "value"; + #[derive(Default, Debug, Clone)] struct PromPlannerContext { // query parameters @@ -331,6 +337,26 @@ impl PromPlanner { }) } PromExpr::Call(Call { func, args }) => { + // TODO(ruihang): refactor this, transform the AST in advance to include an empty metric table. + if func.name == SPECIAL_TIME_FUNCTION { + self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string()); + self.ctx.value_columns = vec![DEFAULT_VALUE_COLUMN.to_string()]; + self.ctx.table_name = Some(String::new()); + + return Ok(LogicalPlan::Extension(Extension { + node: Arc::new( + EmptyMetric::new( + self.ctx.start, + self.ctx.end, + self.ctx.interval, + SPECIAL_TIME_FUNCTION.to_string(), + DEFAULT_VALUE_COLUMN.to_string(), + ) + .context(DataFusionPlanningSnafu)?, + ), + })); + } + let args = self.create_function_args(&args.args)?; let input = self .prom_expr_to_plan(args.input.with_context(|| ExpectExprSnafu {