feat: impl EmptyMetric plan and time() function (#1100)

* impl EmptyMetric plan

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add test cases

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* impl planner part

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* adapt new datafusion changes

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix typo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-03-02 11:15:55 +08:00
committed by GitHub
parent 4b8db408cf
commit e7b92f24e8
4 changed files with 407 additions and 3 deletions

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod empty_metric;
mod instant_manipulate;
mod normalize;
mod planner;
@@ -19,6 +20,7 @@ mod range_manipulate;
mod series_divide;
use datafusion::arrow::datatypes::{ArrowPrimitiveType, TimestampMillisecondType};
pub use empty_metric::{EmptyMetric, EmptyMetricExec, EmptyMetricStream};
pub use instant_manipulate::{InstantManipulate, InstantManipulateExec, InstantManipulateStream};
pub use normalize::{SeriesNormalize, SeriesNormalizeExec, SeriesNormalizeStream};
pub use planner::PromExtensionPlanner;

View File

@@ -0,0 +1,373 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use datafusion::arrow::array::Float64Array;
use datafusion::arrow::datatypes::{DataType, TimeUnit};
use datafusion::common::{DFField, DFSchema, DFSchemaRef, Result as DataFusionResult, Statistics};
use datafusion::error::DataFusionError;
use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNode};
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
};
use datafusion::prelude::Expr;
use datatypes::arrow::array::TimestampMillisecondArray;
use datatypes::arrow::datatypes::SchemaRef;
use datatypes::arrow::record_batch::RecordBatch;
use futures::Stream;
use crate::extension_plan::Millisecond;
#[derive(Debug, Clone)]
pub struct EmptyMetric {
start: Millisecond,
end: Millisecond,
interval: Millisecond,
schema: DFSchemaRef,
}
impl EmptyMetric {
pub fn new(
start: Millisecond,
end: Millisecond,
interval: Millisecond,
time_index_column_name: String,
value_column_name: String,
) -> DataFusionResult<Self> {
let schema = Arc::new(DFSchema::new_with_metadata(
vec![
DFField::new(
None,
&time_index_column_name,
DataType::Timestamp(TimeUnit::Millisecond, None),
false,
),
DFField::new(None, &value_column_name, DataType::Float64, true),
],
HashMap::new(),
)?);
Ok(Self {
start,
end,
interval,
schema,
})
}
pub fn to_execution_plan(&self) -> Arc<dyn ExecutionPlan> {
// let schema = self.schema.to
Arc::new(EmptyMetricExec {
start: self.start,
end: self.end,
interval: self.interval,
schema: Arc::new(self.schema.as_ref().into()),
metric: ExecutionPlanMetricsSet::new(),
})
}
}
impl UserDefinedLogicalNode for EmptyMetric {
fn as_any(&self) -> &dyn Any {
self as _
}
fn inputs(&self) -> Vec<&LogicalPlan> {
vec![]
}
fn schema(&self) -> &DFSchemaRef {
&self.schema
}
fn expressions(&self) -> Vec<Expr> {
vec![]
}
fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
f,
"EmptyMetric: range=[{}..{}], interval=[{}]",
self.start, self.end, self.interval,
)
}
fn from_template(
&self,
_exprs: &[datafusion::prelude::Expr],
_inputs: &[LogicalPlan],
) -> Arc<dyn UserDefinedLogicalNode> {
Arc::new(self.clone())
}
}
#[derive(Debug, Clone)]
pub struct EmptyMetricExec {
start: Millisecond,
end: Millisecond,
interval: Millisecond,
schema: SchemaRef,
metric: ExecutionPlanMetricsSet,
}
impl ExecutionPlan for EmptyMetricExec {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn output_partitioning(&self) -> Partitioning {
Partitioning::UnknownPartitioning(1)
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
None
}
fn maintains_input_order(&self) -> Vec<bool> {
vec![]
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![]
}
fn with_new_children(
self: Arc<Self>,
_children: Vec<Arc<dyn ExecutionPlan>>,
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(self.as_ref().clone()))
}
fn execute(
&self,
partition: usize,
_context: Arc<TaskContext>,
) -> DataFusionResult<SendableRecordBatchStream> {
let baseline_metric = BaselineMetrics::new(&self.metric, partition);
Ok(Box::pin(EmptyMetricStream {
start: self.start,
end: self.end,
interval: self.interval,
is_first_poll: true,
schema: self.schema.clone(),
metric: baseline_metric,
}))
}
fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match t {
DisplayFormatType::Default => write!(
f,
"EmptyMetric: range=[{}..{}], interval=[{}]",
self.start, self.end, self.interval,
),
}
}
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metric.clone_inner())
}
fn statistics(&self) -> Statistics {
let estimated_row_num = (self.end - self.start) as f64 / self.interval as f64;
let total_byte_size = estimated_row_num * std::mem::size_of::<Millisecond>() as f64;
Statistics {
num_rows: Some(estimated_row_num.floor() as _),
total_byte_size: Some(total_byte_size.floor() as _),
column_statistics: None,
is_exact: true,
}
}
}
pub struct EmptyMetricStream {
start: Millisecond,
end: Millisecond,
interval: Millisecond,
// only generate one record batch at the first poll
is_first_poll: bool,
schema: SchemaRef,
metric: BaselineMetrics,
}
impl RecordBatchStream for EmptyMetricStream {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}
impl Stream for EmptyMetricStream {
type Item = DataFusionResult<RecordBatch>;
fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let result = if self.is_first_poll {
self.is_first_poll = false;
let _timer = self.metric.elapsed_compute().timer();
let result_array = (self.start..=self.end)
.step_by(self.interval as _)
.collect::<Vec<_>>();
let float_array =
Float64Array::from_iter(result_array.iter().map(|v| *v as f64 / 1000.0));
let millisecond_array = TimestampMillisecondArray::from(result_array);
let batch = RecordBatch::try_new(
self.schema.clone(),
vec![Arc::new(millisecond_array), Arc::new(float_array)],
)
.map_err(DataFusionError::ArrowError);
Poll::Ready(Some(batch))
} else {
Poll::Ready(None)
};
self.metric.record_poll(result)
}
}
#[cfg(test)]
mod test {
use datafusion::prelude::SessionContext;
use super::*;
async fn do_empty_metric_test(
start: Millisecond,
end: Millisecond,
interval: Millisecond,
time_column_name: String,
value_column_name: String,
expected: String,
) {
let empty_metric =
EmptyMetric::new(start, end, interval, time_column_name, value_column_name).unwrap();
let empty_metric_exec = empty_metric.to_execution_plan();
let session_context = SessionContext::default();
let result =
datafusion::physical_plan::collect(empty_metric_exec, session_context.task_ctx())
.await
.unwrap();
let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
.unwrap()
.to_string();
assert_eq!(result_literal, expected);
}
#[tokio::test]
async fn normal_empty_metric_test() {
do_empty_metric_test(
0,
100,
10,
"time".to_string(),
"value".to_string(),
String::from(
"+-------------------------+-------+\
\n| time | value |\
\n+-------------------------+-------+\
\n| 1970-01-01T00:00:00 | 0.0 |\
\n| 1970-01-01T00:00:00.010 | 0.01 |\
\n| 1970-01-01T00:00:00.020 | 0.02 |\
\n| 1970-01-01T00:00:00.030 | 0.03 |\
\n| 1970-01-01T00:00:00.040 | 0.04 |\
\n| 1970-01-01T00:00:00.050 | 0.05 |\
\n| 1970-01-01T00:00:00.060 | 0.06 |\
\n| 1970-01-01T00:00:00.070 | 0.07 |\
\n| 1970-01-01T00:00:00.080 | 0.08 |\
\n| 1970-01-01T00:00:00.090 | 0.09 |\
\n| 1970-01-01T00:00:00.100 | 0.1 |\
\n+-------------------------+-------+",
),
)
.await
}
#[tokio::test]
async fn unaligned_empty_metric_test() {
do_empty_metric_test(
0,
100,
11,
"time".to_string(),
"value".to_string(),
String::from(
"+-------------------------+-------+\
\n| time | value |\
\n+-------------------------+-------+\
\n| 1970-01-01T00:00:00 | 0.0 |\
\n| 1970-01-01T00:00:00.011 | 0.011 |\
\n| 1970-01-01T00:00:00.022 | 0.022 |\
\n| 1970-01-01T00:00:00.033 | 0.033 |\
\n| 1970-01-01T00:00:00.044 | 0.044 |\
\n| 1970-01-01T00:00:00.055 | 0.055 |\
\n| 1970-01-01T00:00:00.066 | 0.066 |\
\n| 1970-01-01T00:00:00.077 | 0.077 |\
\n| 1970-01-01T00:00:00.088 | 0.088 |\
\n| 1970-01-01T00:00:00.099 | 0.099 |\
\n+-------------------------+-------+",
),
)
.await
}
#[tokio::test]
async fn one_row_empty_metric_test() {
do_empty_metric_test(
0,
100,
1000,
"time".to_string(),
"value".to_string(),
String::from(
"+---------------------+-------+\
\n| time | value |\
\n+---------------------+-------+\
\n| 1970-01-01T00:00:00 | 0.0 |\
\n+---------------------+-------+",
),
)
.await
}
#[tokio::test]
async fn negative_range_empty_metric_test() {
do_empty_metric_test(
1000,
-1000,
10,
"time".to_string(),
"value".to_string(),
String::from(
"+------+-------+\
\n| time | value |\
\n+------+-------+\
\n+------+-------+",
),
)
.await
}
}

View File

@@ -21,8 +21,9 @@ use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNode};
use datafusion::physical_plan::planner::ExtensionPlanner;
use datafusion::physical_plan::{ExecutionPlan, PhysicalPlanner};
use super::{InstantManipulate, RangeManipulate, SeriesDivide};
use crate::extension_plan::SeriesNormalize;
use crate::extension_plan::{
EmptyMetric, InstantManipulate, RangeManipulate, SeriesDivide, SeriesNormalize,
};
pub struct PromExtensionPlanner {}
@@ -44,6 +45,8 @@ impl ExtensionPlanner for PromExtensionPlanner {
Ok(Some(node.to_execution_plan(physical_inputs[0].clone())))
} else if let Some(node) = node.as_any().downcast_ref::<SeriesDivide>() {
Ok(Some(node.to_execution_plan(physical_inputs[0].clone())))
} else if let Some(node) = node.as_any().downcast_ref::<EmptyMetric>() {
Ok(Some(node.to_execution_plan()))
} else {
Ok(None)
}

View File

@@ -46,12 +46,18 @@ use crate::error::{
UnsupportedExprSnafu, ValueNotFoundSnafu,
};
use crate::extension_plan::{
InstantManipulate, Millisecond, RangeManipulate, SeriesDivide, SeriesNormalize,
EmptyMetric, InstantManipulate, Millisecond, RangeManipulate, SeriesDivide, SeriesNormalize,
};
use crate::functions::{IDelta, Increase};
const LEFT_PLAN_JOIN_ALIAS: &str = "lhs";
/// `time()` function in PromQL.
const SPECIAL_TIME_FUNCTION: &str = "time";
/// default value column name for empty metric
const DEFAULT_VALUE_COLUMN: &str = "value";
#[derive(Default, Debug, Clone)]
struct PromPlannerContext {
// query parameters
@@ -331,6 +337,26 @@ impl PromPlanner {
})
}
PromExpr::Call(Call { func, args }) => {
// TODO(ruihang): refactor this, transform the AST in advance to include an empty metric table.
if func.name == SPECIAL_TIME_FUNCTION {
self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
self.ctx.value_columns = vec![DEFAULT_VALUE_COLUMN.to_string()];
self.ctx.table_name = Some(String::new());
return Ok(LogicalPlan::Extension(Extension {
node: Arc::new(
EmptyMetric::new(
self.ctx.start,
self.ctx.end,
self.ctx.interval,
SPECIAL_TIME_FUNCTION.to_string(),
DEFAULT_VALUE_COLUMN.to_string(),
)
.context(DataFusionPlanningSnafu)?,
),
}));
}
let args = self.create_function_args(&args.args)?;
let input = self
.prom_expr_to_plan(args.input.with_context(|| ExpectExprSnafu {