diff --git a/src/promql/src/extension_plan.rs b/src/promql/src/extension_plan.rs
index 9363ff73d4..3bf8cab70c 100644
--- a/src/promql/src/extension_plan.rs
+++ b/src/promql/src/extension_plan.rs
@@ -16,11 +16,13 @@
 mod instant_manipulate;
 mod normalize;
 mod planner;
 mod range_manipulate;
+mod series_divide;
 
 use datafusion::arrow::datatypes::{ArrowPrimitiveType, TimestampMillisecondType};
 pub use instant_manipulate::{InstantManipulate, InstantManipulateExec, InstantManipulateStream};
 pub use normalize::{SeriesNormalize, SeriesNormalizeExec, SeriesNormalizeStream};
 pub use planner::PromExtensionPlanner;
 pub use range_manipulate::{RangeManipulate, RangeManipulateExec, RangeManipulateStream};
+pub use series_divide::{SeriesDivide, SeriesDivideExec, SeriesDivideStream};
 
 pub(crate) type Millisecond = <TimestampMillisecondType as ArrowPrimitiveType>::Native;
diff --git a/src/promql/src/extension_plan/planner.rs b/src/promql/src/extension_plan/planner.rs
index 5ad9bacf2d..03366c0fb2 100644
--- a/src/promql/src/extension_plan/planner.rs
+++ b/src/promql/src/extension_plan/planner.rs
@@ -21,7 +21,7 @@
 use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNode};
 use datafusion::physical_plan::planner::ExtensionPlanner;
 use datafusion::physical_plan::{ExecutionPlan, PhysicalPlanner};
 
-use super::{InstantManipulate, RangeManipulate};
+use super::{InstantManipulate, RangeManipulate, SeriesDivide};
 use crate::extension_plan::SeriesNormalize;
 
 pub struct PromExtensionPlanner {}
@@ -42,6 +42,8 @@ impl ExtensionPlanner for PromExtensionPlanner {
             Ok(Some(node.to_execution_plan(physical_inputs[0].clone())))
         } else if let Some(node) = node.as_any().downcast_ref::<RangeManipulate>() {
             Ok(Some(node.to_execution_plan(physical_inputs[0].clone())))
+        } else if let Some(node) = node.as_any().downcast_ref::<SeriesDivide>() {
+            Ok(Some(node.to_execution_plan(physical_inputs[0].clone())))
         } else {
             Ok(None)
         }
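The planner change above follows DataFusion's usual extension dispatch: the type-erased `UserDefinedLogicalNode` is probed with `as_any().downcast_ref` until a known node matches, and the matching node builds its own `ExecutionPlan`. A minimal, self-contained sketch of that pattern — the `PlanNode` trait and `SeriesDivideNode` type are illustrative stand-ins, not the actual DataFusion API:

```rust
use std::any::Any;

// Hypothetical stand-in for UserDefinedLogicalNode.
trait PlanNode: Any {
    fn as_any(&self) -> &dyn Any;
}

struct SeriesDivideNode;
impl PlanNode for SeriesDivideNode {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

// The planner only sees `&dyn PlanNode`; downcasting recovers the concrete
// type so the matching execution plan can be built.
fn plan(node: &dyn PlanNode) -> Option<&'static str> {
    if node.as_any().downcast_ref::<SeriesDivideNode>().is_some() {
        Some("SeriesDivideExec")
    } else {
        None // unknown extension node: let other planners try
    }
}

fn main() {
    assert_eq!(plan(&SeriesDivideNode), Some("SeriesDivideExec"));
}
```

Returning `Ok(None)` for an unrecognized node is deliberate: it lets any other registered `ExtensionPlanner` take a turn instead of treating the node as an error.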
diff --git a/src/promql/src/extension_plan/series_divide.rs b/src/promql/src/extension_plan/series_divide.rs
new file mode 100644
index 0000000000..9e5097b0b4
--- /dev/null
+++ b/src/promql/src/extension_plan/series_divide.rs
@@ -0,0 +1,458 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::any::Any;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use datafusion::arrow::array::{Array, StringArray};
+use datafusion::arrow::datatypes::SchemaRef;
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::common::DFSchemaRef;
+use datafusion::error::Result as DataFusionResult;
+use datafusion::execution::context::TaskContext;
+use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNode};
+use datafusion::physical_expr::PhysicalSortExpr;
+use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
+use datafusion::physical_plan::{
+    DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
+    Statistics,
+};
+use datatypes::arrow::compute;
+use datatypes::arrow::error::Result as ArrowResult;
+use futures::{ready, Stream, StreamExt};
+
+/// Logical node that divides its sorted input into one batch per time
+/// series, grouped by the given tag columns.
+#[derive(Debug)]
+pub struct SeriesDivide {
+    tag_columns: Vec<String>,
+    input: LogicalPlan,
+}
+
+impl UserDefinedLogicalNode for SeriesDivide {
+    fn as_any(&self) -> &dyn Any {
+        self as _
+    }
+
+    fn inputs(&self) -> Vec<&LogicalPlan> {
+        vec![&self.input]
+    }
+
+    fn schema(&self) -> &DFSchemaRef {
+        self.input.schema()
+    }
+
+    fn expressions(&self) -> Vec<Expr> {
+        vec![]
+    }
+
+    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "PromSeriesDivide: tags={:?}", self.tag_columns)
+    }
+
+    fn from_template(
+        &self,
+        _exprs: &[Expr],
+        inputs: &[LogicalPlan],
+    ) -> Arc<dyn UserDefinedLogicalNode> {
+        assert!(!inputs.is_empty());
+
+        Arc::new(Self {
+            tag_columns: self.tag_columns.clone(),
+            input: inputs[0].clone(),
+        })
+    }
+}
+
+impl SeriesDivide {
+    pub fn new(tag_columns: Vec<String>, input: LogicalPlan) -> Self {
+        Self { tag_columns, input }
+    }
+
+    pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
+        Arc::new(SeriesDivideExec {
+            tag_columns: self.tag_columns.clone(),
+            input: exec_input,
+            metric: ExecutionPlanMetricsSet::new(),
+        })
+    }
+}
+
+#[derive(Debug)]
+pub struct SeriesDivideExec {
+    tag_columns: Vec<String>,
+    input: Arc<dyn ExecutionPlan>,
+    metric: ExecutionPlanMetricsSet,
+}
+
+impl ExecutionPlan for SeriesDivideExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.input.schema()
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        self.input.output_partitioning()
+    }
+
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+        self.input.output_ordering()
+    }
+
+    fn maintains_input_order(&self) -> bool {
+        true
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.input.clone()]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
+        assert!(!children.is_empty());
+        Ok(Arc::new(Self {
+            tag_columns: self.tag_columns.clone(),
+            input: children[0].clone(),
+            metric: self.metric.clone(),
+        }))
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> DataFusionResult<SendableRecordBatchStream> {
+        let baseline_metric = BaselineMetrics::new(&self.metric, partition);
+
+        let input = self.input.execute(partition, context)?;
+        let schema = input.schema();
+        let tag_indices = self
+            .tag_columns
+            .iter()
+            .map(|tag| {
+                schema
+                    .column_with_name(tag)
+                    .unwrap_or_else(|| panic!("tag column not found {tag}"))
+                    .0
+            })
+            .collect();
+        Ok(Box::pin(SeriesDivideStream {
+            tag_indices,
+            buffer: None,
+            schema,
+            input,
+            metric: baseline_metric,
+        }))
+    }
+
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(f, "PromSeriesDivideExec: tags={:?}", self.tag_columns)
+            }
+        }
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metric.clone_inner())
+    }
+
+    fn statistics(&self) -> Statistics {
+        Statistics {
+            num_rows: None,
+            total_byte_size: None,
+            // TODO(ruihang): support column statistics
+            column_statistics: None,
+            is_exact: false,
+        }
+    }
+}
+
+/// Assumes the input stream is ordered on the tag columns.
+pub struct SeriesDivideStream {
+    tag_indices: Vec<usize>,
+    buffer: Option<RecordBatch>,
+    schema: SchemaRef,
+    input: SendableRecordBatchStream,
+    metric: BaselineMetrics,
+}
+
+impl RecordBatchStream for SeriesDivideStream {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+
+impl Stream for SeriesDivideStream {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        loop {
+            if let Some(batch) = self.buffer.clone() {
+                let same_length = self.find_first_diff_row(&batch) + 1;
+                if same_length == batch.num_rows() {
+                    // The whole buffer belongs to one series; fetch more input
+                    // before emitting it, in case the series continues.
+                    let next_batch = match ready!(self.as_mut().fetch_next_batch(cx)) {
+                        Some(Ok(batch)) => batch,
+                        None => {
+                            self.buffer = None;
+                            return Poll::Ready(Some(Ok(batch)));
+                        }
+                        error => return Poll::Ready(error),
+                    };
+                    let new_batch =
+                        compute::concat_batches(&batch.schema(), &[batch.clone(), next_batch])?;
+                    self.buffer = Some(new_batch);
+                    continue;
+                } else {
+                    // A series boundary lies inside the buffer: emit the first
+                    // series and keep the remainder buffered.
+                    let result_batch = batch.slice(0, same_length);
+                    let remaining_batch = batch.slice(same_length, batch.num_rows() - same_length);
+                    self.buffer = Some(remaining_batch);
+                    return Poll::Ready(Some(Ok(result_batch)));
+                }
+            } else {
+                let batch = match ready!(self.as_mut().fetch_next_batch(cx)) {
+                    Some(Ok(batch)) => batch,
+                    None => return Poll::Ready(None),
+                    error => return Poll::Ready(error),
+                };
+                self.buffer = Some(batch);
+                continue;
+            }
+        }
+    }
+}
+
+impl SeriesDivideStream {
+    fn fetch_next_batch(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<ArrowResult<RecordBatch>>> {
+        let poll = match self.input.poll_next_unpin(cx) {
+            Poll::Ready(batch) => {
+                let _timer = self.metric.elapsed_compute().timer();
+                Poll::Ready(batch)
+            }
+            Poll::Pending => Poll::Pending,
+        };
+        self.metric.record_poll(poll)
+    }
+
+    /// Returns the index of the last row of the first series in `batch`.
+    fn find_first_diff_row(&self, batch: &RecordBatch) -> usize {
+        let num_rows = batch.num_rows();
+        let mut result = num_rows;
+
+        for index in &self.tag_indices {
+            let array = batch.column(*index);
+            let string_array = array.as_any().downcast_ref::<StringArray>().unwrap();
+            // the first row index whose value differs from the next row's
+            let mut same_until = 0;
+            while same_until < num_rows - 1 {
+                if string_array.value(same_until) != string_array.value(same_until + 1) {
+                    break;
+                }
+                same_until += 1;
+            }
+            result = result.min(same_until);
+        }
+
+        result
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use datafusion::arrow::datatypes::{DataType, Field, Schema};
+    use datafusion::from_slice::FromSlice;
+    use datafusion::physical_plan::memory::MemoryExec;
+    use datafusion::prelude::SessionContext;
+
+    use super::*;
+
+    fn prepare_test_data() -> MemoryExec {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("host", DataType::Utf8, true),
+            Field::new("path", DataType::Utf8, true),
+        ]));
+
+        let path_column_1 = Arc::new(StringArray::from_slice([
+            "foo", "foo", "foo", "bar", "bar", "bar", "bar", "bar", "bar", "bla", "bla", "bla",
+        ])) as _;
+        let host_column_1 = Arc::new(StringArray::from_slice([
+            "000", "000", "001", "002", "002", "002", "002", "002", "003", "005", "005", "005",
+        ])) as _;
+
+        let path_column_2 = Arc::new(StringArray::from_slice(["bla", "bla", "bla"])) as _;
+        let host_column_2 = Arc::new(StringArray::from_slice(["005", "005", "005"])) as _;
+
+        let path_column_3 = Arc::new(StringArray::from_slice([
+            "bla", "🥺", "🥺", "🥺", "🥺", "🥺", "🫠", "🫠",
+        ])) as _;
+        let host_column_3 = Arc::new(StringArray::from_slice([
+            "005", "001", "001", "001", "001", "001", "001", "001",
+        ])) as _;
+
+        let data_1 =
+            RecordBatch::try_new(schema.clone(), vec![path_column_1, host_column_1]).unwrap();
+        let data_2 =
+            RecordBatch::try_new(schema.clone(), vec![path_column_2, host_column_2]).unwrap();
+        let data_3 =
+            RecordBatch::try_new(schema.clone(), vec![path_column_3, host_column_3]).unwrap();
+
+        MemoryExec::try_new(&[vec![data_1, data_2, data_3]], schema, None).unwrap()
+    }
+
+    #[tokio::test]
+    async fn overall_data() {
+        let memory_exec = Arc::new(prepare_test_data());
+        let divide_exec = Arc::new(SeriesDivideExec {
+            tag_columns: vec!["host".to_string(), "path".to_string()],
+            input: memory_exec,
+            metric: ExecutionPlanMetricsSet::new(),
+        });
+        let session_context = SessionContext::default();
+        let result = datafusion::physical_plan::collect(divide_exec, session_context.task_ctx())
+            .await
+            .unwrap();
+        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
+            .unwrap()
+            .to_string();
+
+        let expected = String::from(
+            "+------+------+\
+            \n| host | path |\
+            \n+------+------+\
+            \n| foo  | 000  |\
+            \n| foo  | 000  |\
+            \n| foo  | 001  |\
+            \n| bar  | 002  |\
+            \n| bar  | 002  |\
+            \n| bar  | 002  |\
+            \n| bar  | 002  |\
+            \n| bar  | 002  |\
+            \n| bar  | 003  |\
+            \n| bla  | 005  |\
+            \n| bla  | 005  |\
+            \n| bla  | 005  |\
+            \n| bla  | 005  |\
+            \n| bla  | 005  |\
+            \n| bla  | 005  |\
+            \n| bla  | 005  |\
+            \n| 🥺   | 001  |\
+            \n| 🥺   | 001  |\
+            \n| 🥺   | 001  |\
+            \n| 🥺   | 001  |\
+            \n| 🥺   | 001  |\
+            \n| 🫠   | 001  |\
+            \n| 🫠   | 001  |\
+            \n+------+------+",
+        );
+        assert_eq!(result_literal, expected);
+    }
+
+    #[tokio::test]
+    async fn per_batch_data() {
+        let memory_exec = Arc::new(prepare_test_data());
+        let divide_exec = Arc::new(SeriesDivideExec {
+            tag_columns: vec!["host".to_string(), "path".to_string()],
+            input: memory_exec,
+            metric: ExecutionPlanMetricsSet::new(),
+        });
+        let mut divide_stream = divide_exec
+            .execute(0, SessionContext::default().task_ctx())
+            .unwrap();
+
+        let mut expectations = vec![
+            String::from(
+                "+------+------+\
+                \n| host | path |\
+                \n+------+------+\
+                \n| foo  | 000  |\
+                \n| foo  | 000  |\
+                \n+------+------+",
+            ),
+            String::from(
+                "+------+------+\
+                \n| host | path |\
+                \n+------+------+\
+                \n| foo  | 001  |\
+                \n+------+------+",
+            ),
+            String::from(
+                "+------+------+\
+                \n| host | path |\
+                \n+------+------+\
+                \n| bar  | 002  |\
+                \n| bar  | 002  |\
+                \n| bar  | 002  |\
+                \n| bar  | 002  |\
+                \n| bar  | 002  |\
+                \n+------+------+",
+            ),
+            String::from(
+                "+------+------+\
+                \n| host | path |\
+                \n+------+------+\
+                \n| bar  | 003  |\
+                \n+------+------+",
+            ),
+            String::from(
+                "+------+------+\
+                \n| host | path |\
+                \n+------+------+\
+                \n| bla  | 005  |\
+                \n| bla  | 005  |\
+                \n| bla  | 005  |\
+                \n| bla  | 005  |\
+                \n| bla  | 005  |\
+                \n| bla  | 005  |\
+                \n| bla  | 005  |\
+                \n+------+------+",
+            ),
+            String::from(
+                "+------+------+\
+                \n| host | path |\
+                \n+------+------+\
+                \n| 🥺   | 001  |\
+                \n| 🥺   | 001  |\
+                \n| 🥺   | 001  |\
+                \n| 🥺   | 001  |\
+                \n| 🥺   | 001  |\
+                \n+------+------+",
+            ),
+            String::from(
+                "+------+------+\
+                \n| host | path |\
+                \n+------+------+\
+                \n| 🫠   | 001  |\
+                \n| 🫠   | 001  |\
+                \n+------+------+",
+            ),
+        ];
+        expectations.reverse();
+
+        while let Some(batch) = divide_stream.next().await {
+            let formatted =
+                datatypes::arrow::util::pretty::pretty_format_batches(&[batch.unwrap()])
+                    .unwrap()
+                    .to_string();
+            let expected = expectations.pop().unwrap();
+            assert_eq!(formatted, expected);
+        }
+    }
+}
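Before the planner changes below, it is worth spelling out the invariant the stream relies on: because the input is sorted on the tag columns, rows of one series are always adjacent, so `find_first_diff_row` only has to scan each tag column for the first adjacent pair of values that differs and take the minimum across columns. A dependency-free sketch of that scan, with plain string slices standing in for the arrow `StringArray`s:

```rust
/// Index of the last row of the first group, assuming every tag column is
/// sorted so that equal values are adjacent. Mirrors `find_first_diff_row`.
fn first_group_end(tag_columns: &[&[&str]], num_rows: usize) -> usize {
    let mut result = num_rows;
    for column in tag_columns {
        // Scan for the first adjacent pair of values that differs.
        let mut same_until = 0;
        while same_until + 1 < num_rows && column[same_until] == column[same_until + 1] {
            same_until += 1;
        }
        result = result.min(same_until);
    }
    result
}

fn main() {
    let host: &[&str] = &["foo", "foo", "foo", "bar"];
    let path: &[&str] = &["000", "000", "001", "002"];
    // Rows 0 and 1 share (foo, 000); row 2 starts a new series,
    // so the first series ends at index 1.
    assert_eq!(first_group_end(&[host, path], 4), 1);
    // The stream adds 1 to turn this boundary index into the slice
    // length of the first per-series output batch.
}
```

When the whole buffered batch turns out to be one series, the stream keeps concatenating input batches until it either sees a boundary or the input ends, so a series spanning several input batches is still emitted as a single batch.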
diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs
index c8d2213929..1b749e9142 100644
--- a/src/promql/src/planner.rs
+++ b/src/promql/src/planner.rs
@@ -22,7 +22,7 @@ use datafusion::datasource::DefaultTableSource;
 use datafusion::logical_expr::expr::AggregateFunction;
 use datafusion::logical_expr::{
     AggregateFunction as AggregateFunctionEnum, BinaryExpr, BuiltinScalarFunction, Extension,
-    Filter, LogicalPlan, LogicalPlanBuilder, Operator,
+    LogicalPlan, LogicalPlanBuilder, Operator,
 };
 use datafusion::optimizer::utils;
 use datafusion::prelude::{Column, Expr as DfExpr, JoinType};
@@ -43,7 +43,9 @@ use crate::error::{
     TableNameNotFoundSnafu, TableNotFoundSnafu, TimeIndexNotFoundSnafu, UnexpectedTokenSnafu,
     UnknownTableSnafu, UnsupportedExprSnafu, ValueNotFoundSnafu,
 };
-use crate::extension_plan::{InstantManipulate, Millisecond, RangeManipulate, SeriesNormalize};
+use crate::extension_plan::{
+    InstantManipulate, Millisecond, RangeManipulate, SeriesDivide, SeriesNormalize,
+};
 
 const LEFT_PLAN_JOIN_ALIAS: &str = "lhs";
 
@@ -59,6 +61,7 @@ struct PromPlannerContext {
     table_name: Option<String>,
     time_index_column: Option<String>,
     value_columns: Vec<String>,
+    tag_columns: Vec<String>,
 }
 
 impl PromPlannerContext {
@@ -292,15 +295,19 @@
         // make table scan with filter exprs
         let table_scan = self.create_table_scan_plan(&table_name, filters.clone())?;
 
-        // make filter plan
-        let filter_plan = LogicalPlan::Filter(
-            Filter::try_new(
-                // safety: at least there are two exprs that filter timestamp column.
-                utils::conjunction(filters.into_iter()).unwrap(),
-                Arc::new(table_scan),
-            )
-            .context(DataFusionPlanningSnafu)?,
-        );
+        // make filter and sort plan
+        let sort_plan = LogicalPlanBuilder::from(table_scan)
+            .filter(utils::conjunction(filters.into_iter()).unwrap())
+            .context(DataFusionPlanningSnafu)?
+            .sort(self.create_tag_and_time_index_column_sort_exprs()?)
+            .context(DataFusionPlanningSnafu)?
+            .build()
+            .context(DataFusionPlanningSnafu)?;
+
+        // make divide plan
+        let divide_plan = LogicalPlan::Extension(Extension {
+            node: Arc::new(SeriesDivide::new(self.ctx.tag_columns.clone(), sort_plan)),
+        });
 
         // make series_normalize plan
         let offset_duration = match offset {
@@ -314,7 +321,7 @@
                 .time_index_column
                 .clone()
                 .with_context(|| TimeIndexNotFoundSnafu { table: table_name })?,
-            filter_plan,
+            divide_plan,
         );
         let logical_plan = LogicalPlan::Extension(Extension {
             node: Arc::new(series_normalize),
@@ -455,16 +462,24 @@
             .clone();
         self.ctx.time_index_column = Some(time_index);
 
-        // set values column
+        // set values columns
         let values = table
             .table_info()
             .meta
            .value_column_names()
             .cloned()
             .collect();
-        self.ctx.value_columns = values;
+        self.ctx.value_columns = values;
+
+        // set primary key (tag) columns
+        let tags = table
+            .table_info()
+            .meta
+            .row_key_column_names()
+            .cloned()
+            .collect();
+        self.ctx.tag_columns = tags;
+
         Ok(())
     }
@@ -542,6 +557,17 @@
         )))
     }
 
+    fn create_tag_and_time_index_column_sort_exprs(&self) -> Result<Vec<DfExpr>> {
+        let mut result = self
+            .ctx
+            .tag_columns
+            .iter()
+            .map(|col| DfExpr::Column(Column::from_name(col)).sort(false, false))
+            .collect::<Vec<_>>();
+        result.push(self.create_time_index_column_expr()?.sort(false, false));
+        Ok(result)
+    }
+
     fn create_empty_values_filter_expr(&self) -> Result<DfExpr> {
         let mut exprs = Vec::with_capacity(self.ctx.value_columns.len());
         for value in &self.ctx.value_columns {
@@ -836,13 +862,15 @@ mod test {
         let context_provider = build_test_context_provider("some_metric".to_string(), 1, 1).await;
         let plan = PromPlanner::stmt_to_plan(eval_stmt, context_provider).unwrap();
 
-        let  expected = String::from(
+        let expected = String::from(
             "Filter: some_metric.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N]\
             \n  Projection: some_metric.timestamp, TEMPLATE(some_metric.field_0) [timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N]\
             \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
             \n      PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
-            \n        Filter: tag_0 != Utf8(\"bar\") AND timestamp >= TimestampMillisecond(0, None) AND timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
-            \n          TableScan: some_metric, unsupported_filters=[tag_0 != Utf8(\"bar\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
+            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n          Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n            Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(0, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n              TableScan: some_metric, unsupported_filters=[tag_0 != Utf8(\"bar\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
+        ).replace("TEMPLATE", plan_name);
 
         assert_eq!(plan.display_indent_schema().to_string(), expected);
@@ -1062,9 +1090,11 @@
             "Aggregate: groupBy=[[some_metric.tag_1]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_1:Utf8, TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
             \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
             \n    PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
-            \n      Filter: tag_0 != Utf8(\"bar\") AND timestamp >= TimestampMillisecond(0, None) AND timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
-            \n        TableScan: some_metric, unsupported_filters=[tag_0 != Utf8(\"bar\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]")
-            .replace("TEMPLATE", name);
+            \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
+            \n        Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.tag_1 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
+            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(0, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
+            \n            TableScan: some_metric, unsupported_filters=[tag_0 != Utf8(\"bar\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
+        ).replace("TEMPLATE", name);
         assert_eq!(
             plan.display_indent_schema().to_string(),
             expected_no_without
         );
@@ -1080,9 +1110,11 @@
             "Aggregate: groupBy=[[some_metric.tag_0]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_0:Utf8, TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
             \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
             \n    PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
-            \n      Filter: tag_0 != Utf8(\"bar\") AND timestamp >= TimestampMillisecond(0, None) AND timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
-            \n        TableScan: some_metric, unsupported_filters=[tag_0 != Utf8(\"bar\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]")
-            .replace("TEMPLATE", name);
+            \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
+            \n        Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.tag_1 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
+            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(0, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
+            \n            TableScan: some_metric, unsupported_filters=[tag_0 != Utf8(\"bar\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
+        ).replace("TEMPLATE", name);
         assert_eq!(plan.display_indent_schema().to_string(), expected_without);
     }
@@ -1248,12 +1280,16 @@
             \n    SubqueryAlias: lhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
             \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
             \n        PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
-            \n          Filter: tag_0 = Utf8(\"foo\") AND timestamp >= TimestampMillisecond(0, None) AND timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
-            \n            TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"foo\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n          PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n            Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n              Filter: some_metric.tag_0 = Utf8(\"foo\") AND some_metric.timestamp >= TimestampMillisecond(0, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n                TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"foo\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
             \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
             \n      PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
-            \n        Filter: tag_0 = Utf8(\"bar\") AND timestamp >= TimestampMillisecond(0, None) AND timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
-            \n          TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"bar\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
+            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n          Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n            Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(0, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n              TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"bar\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
         );
 
         assert_eq!(plan.display_indent_schema().to_string(), expected);
@@ -1309,8 +1345,10 @@
             "Projection: Float64(1) + some_metric.field_0 [Float64(1) + some_metric.field_0:Float64;N]\
             \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
             \n    PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
-            \n      Filter: tag_0 = Utf8(\"bar\") AND timestamp >= TimestampMillisecond(0, None) AND timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
-            \n        TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"bar\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
+            \n      PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n        Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n          Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(0, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n            TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"bar\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
         );
 
         assert_eq!(plan.display_indent_schema().to_string(), expected);
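One detail that explains the `DESC NULLS LAST` in the expected plans: DataFusion's `Expr::sort` takes `(asc, nulls_first)` flags, so the `sort(false, false)` calls in `create_tag_and_time_index_column_sort_exprs` produce a descending, nulls-last ordering on each tag column and on the time index. A small sketch of the mapping (API details may vary across DataFusion versions):

```rust
use datafusion::prelude::col;

fn main() {
    // Expr::sort(asc, nulls_first): (false, false) renders as
    // "tag_0 DESC NULLS LAST", matching the Sort nodes asserted above.
    let sort_expr = col("tag_0").sort(false, false);
    println!("{sort_expr:?}");
}
```

Any total order on the tag columns would satisfy `SeriesDivide`'s adjacency requirement; what matters is that the Sort node groups equal tag values together before the divide runs.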