feat: SeriesDivide plan for PromQL (#960)
* implement SeriesDivide plan
* planner part
* fix clippy and typo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
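The new plan divides one input stream into a batch per time series: the input must be sorted on the tag columns, so consecutive rows with equal tag values form one series, and each such run is emitted as its own batch. A minimal standalone sketch of that run-splitting idea (illustrative only; split_runs is a hypothetical helper, not part of this commit):

// Illustrative sketch, not part of this commit: split a slice that is
// already sorted by its key into contiguous runs of equal keys. This is
// the same contract SeriesDivideStream relies on for its tag columns.
fn split_runs<'a>(sorted: &'a [&'a str]) -> Vec<&'a [&'a str]> {
    let mut runs = Vec::new();
    let mut start = 0;
    for i in 1..=sorted.len() {
        if i == sorted.len() || sorted[i] != sorted[start] {
            runs.push(&sorted[start..i]);
            start = i;
        }
    }
    runs
}

fn main() {
    let tags = ["foo", "foo", "bar", "bar", "bar", "bla"];
    // Prints [["foo", "foo"], ["bar", "bar", "bar"], ["bla"]]
    println!("{:?}", split_runs(&tags));
}

The real implementation below does the same over arrow StringArray tag columns, buffering and concatenating input batches until it sees a series boundary.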
src/promql/src/extension_plan/mod.rs
@@ -16,11 +16,13 @@ mod instant_manipulate;
 mod normalize;
 mod planner;
 mod range_manipulate;
+mod series_divide;
 
 use datafusion::arrow::datatypes::{ArrowPrimitiveType, TimestampMillisecondType};
 pub use instant_manipulate::{InstantManipulate, InstantManipulateExec, InstantManipulateStream};
 pub use normalize::{SeriesNormalize, SeriesNormalizeExec, SeriesNormalizeStream};
 pub use planner::PromExtensionPlanner;
 pub use range_manipulate::{RangeManipulate, RangeManipulateExec, RangeManipulateStream};
+pub use series_divide::{SeriesDivide, SeriesDivideExec, SeriesDivideStream};
 
 pub(crate) type Millisecond = <TimestampMillisecondType as ArrowPrimitiveType>::Native;
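As an aside, the Millisecond alias above resolves to arrow's native i64 timestamp representation. A tiny illustrative check (standalone sketch assuming the stock arrow crate):

use arrow::datatypes::{ArrowPrimitiveType, TimestampMillisecondType};

// Same alias as in the hunk above: the native value type of a
// millisecond-precision arrow timestamp, which is i64.
type Millisecond = <TimestampMillisecondType as ArrowPrimitiveType>::Native;

fn main() {
    let ts: Millisecond = 1_672_531_200_000; // 2023-01-01T00:00:00Z in ms
    let _: i64 = ts; // compiles because Native = i64
    println!("{ts}");
}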
src/promql/src/extension_plan/planner.rs
@@ -21,7 +21,7 @@ use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNode};
 use datafusion::physical_plan::planner::ExtensionPlanner;
 use datafusion::physical_plan::{ExecutionPlan, PhysicalPlanner};
 
-use super::{InstantManipulate, RangeManipulate};
+use super::{InstantManipulate, RangeManipulate, SeriesDivide};
 use crate::extension_plan::SeriesNormalize;
 
 pub struct PromExtensionPlanner {}
@@ -42,6 +42,8 @@ impl ExtensionPlanner for PromExtensionPlanner {
             Ok(Some(node.to_execution_plan(physical_inputs[0].clone())))
         } else if let Some(node) = node.as_any().downcast_ref::<RangeManipulate>() {
             Ok(Some(node.to_execution_plan(physical_inputs[0].clone())))
+        } else if let Some(node) = node.as_any().downcast_ref::<SeriesDivide>() {
+            Ok(Some(node.to_execution_plan(physical_inputs[0].clone())))
         } else {
             Ok(None)
         }
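The planner recognizes each custom logical node by downcasting the trait object through Any, as the hunk above shows. A minimal self-contained sketch of that dispatch pattern (the Node trait here is a hypothetical stand-in, not DataFusion's actual UserDefinedLogicalNode):

use std::any::Any;

// Hypothetical stand-in for UserDefinedLogicalNode.
trait Node {
    fn as_any(&self) -> &dyn Any;
}

struct SeriesDivideNode;

impl Node for SeriesDivideNode {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

// Mirrors the if/else-if chain above: try each concrete type in turn
// and return None for nodes this planner does not recognize.
fn plan(node: &dyn Node) -> Option<&'static str> {
    if node.as_any().downcast_ref::<SeriesDivideNode>().is_some() {
        Some("SeriesDivideExec")
    } else {
        None
    }
}

fn main() {
    assert_eq!(plan(&SeriesDivideNode), Some("SeriesDivideExec"));
}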
src/promql/src/extension_plan/series_divide.rs (new file, 458 lines)
@@ -0,0 +1,458 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use datafusion::arrow::array::{Array, StringArray};
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::DFSchemaRef;
use datafusion::error::Result as DataFusionResult;
use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNode};
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
    DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
    Statistics,
};
use datatypes::arrow::compute;
use datatypes::arrow::error::Result as ArrowResult;
use futures::{ready, Stream, StreamExt};

#[derive(Debug)]
pub struct SeriesDivide {
    tag_columns: Vec<String>,
    input: LogicalPlan,
}

impl UserDefinedLogicalNode for SeriesDivide {
    fn as_any(&self) -> &dyn Any {
        self as _
    }

    fn inputs(&self) -> Vec<&LogicalPlan> {
        vec![&self.input]
    }

    fn schema(&self) -> &DFSchemaRef {
        self.input.schema()
    }

    fn expressions(&self) -> Vec<Expr> {
        vec![]
    }

    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "PromSeriesDivide: tags={:?}", self.tag_columns)
    }

    fn from_template(
        &self,
        _exprs: &[Expr],
        inputs: &[LogicalPlan],
    ) -> Arc<dyn UserDefinedLogicalNode> {
        assert!(!inputs.is_empty());

        Arc::new(Self {
            tag_columns: self.tag_columns.clone(),
            input: inputs[0].clone(),
        })
    }
}

impl SeriesDivide {
    pub fn new(tag_columns: Vec<String>, input: LogicalPlan) -> Self {
        Self { tag_columns, input }
    }

    pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
        Arc::new(SeriesDivideExec {
            tag_columns: self.tag_columns.clone(),
            input: exec_input,
            metric: ExecutionPlanMetricsSet::new(),
        })
    }
}

#[derive(Debug)]
pub struct SeriesDivideExec {
    tag_columns: Vec<String>,
    input: Arc<dyn ExecutionPlan>,
    metric: ExecutionPlanMetricsSet,
}

impl ExecutionPlan for SeriesDivideExec {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.input.schema()
    }

    fn output_partitioning(&self) -> Partitioning {
        self.input.output_partitioning()
    }

    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
        self.input.output_ordering()
    }

    fn maintains_input_order(&self) -> bool {
        true
    }

    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
        vec![self.input.clone()]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
        assert!(!children.is_empty());
        Ok(Arc::new(Self {
            tag_columns: self.tag_columns.clone(),
            input: children[0].clone(),
            metric: self.metric.clone(),
        }))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DataFusionResult<SendableRecordBatchStream> {
        let baseline_metric = BaselineMetrics::new(&self.metric, partition);

        let input = self.input.execute(partition, context)?;
        let schema = input.schema();
        // Resolve tag column names to indices once, up front.
        let tag_indices = self
            .tag_columns
            .iter()
            .map(|tag| {
                schema
                    .column_with_name(tag)
                    .unwrap_or_else(|| panic!("tag column not found {tag}"))
                    .0
            })
            .collect();
        Ok(Box::pin(SeriesDivideStream {
            tag_indices,
            buffer: None,
            schema,
            input,
            metric: baseline_metric,
        }))
    }

    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default => {
                write!(f, "PromSeriesDivideExec: tags={:?}", self.tag_columns)
            }
        }
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metric.clone_inner())
    }

    fn statistics(&self) -> Statistics {
        Statistics {
            num_rows: None,
            total_byte_size: None,
            // TODO(ruihang): support column statistics
            column_statistics: None,
            is_exact: false,
        }
    }
}

/// Divides the input into one output batch per time series.
/// Assumes the input stream is ordered on the tag columns.
pub struct SeriesDivideStream {
    tag_indices: Vec<usize>,
    buffer: Option<RecordBatch>,
    schema: SchemaRef,
    input: SendableRecordBatchStream,
    metric: BaselineMetrics,
}

impl RecordBatchStream for SeriesDivideStream {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

impl Stream for SeriesDivideStream {
    type Item = ArrowResult<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            if let Some(batch) = self.buffer.clone() {
                let same_length = self.find_first_diff_row(&batch) + 1;
                if same_length == batch.num_rows() {
                    // The whole buffered batch belongs to one series: keep
                    // pulling and concatenating input until a series boundary
                    // shows up or the input is exhausted.
                    let next_batch = match ready!(self.as_mut().fetch_next_batch(cx)) {
                        Some(Ok(batch)) => batch,
                        None => {
                            self.buffer = None;
                            return Poll::Ready(Some(Ok(batch)));
                        }
                        error => return Poll::Ready(error),
                    };
                    let new_batch =
                        compute::concat_batches(&batch.schema(), &[batch.clone(), next_batch])?;
                    self.buffer = Some(new_batch);
                    continue;
                } else {
                    // A boundary was found: emit the leading run of rows that
                    // share the same tag values and buffer the remainder.
                    let result_batch = batch.slice(0, same_length);
                    let remaining_batch = batch.slice(same_length, batch.num_rows() - same_length);
                    self.buffer = Some(remaining_batch);
                    return Poll::Ready(Some(Ok(result_batch)));
                }
            } else {
                let batch = match ready!(self.as_mut().fetch_next_batch(cx)) {
                    Some(Ok(batch)) => batch,
                    None => return Poll::Ready(None),
                    error => return Poll::Ready(error),
                };
                self.buffer = Some(batch);
                continue;
            }
        }
    }
}

impl SeriesDivideStream {
    fn fetch_next_batch(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<ArrowResult<RecordBatch>>> {
        let poll = match self.input.poll_next_unpin(cx) {
            Poll::Ready(batch) => {
                let _timer = self.metric.elapsed_compute().timer();
                Poll::Ready(batch)
            }
            Poll::Pending => Poll::Pending,
        };
        self.metric.record_poll(poll)
    }

    /// Returns the index of the first row whose tag values differ from the
    /// next row's, minimized over all tag columns; the leading single-series
    /// run therefore has `find_first_diff_row(batch) + 1` rows.
    fn find_first_diff_row(&self, batch: &RecordBatch) -> usize {
        let num_rows = batch.num_rows();
        let mut result = num_rows;

        for index in &self.tag_indices {
            let array = batch.column(*index);
            let string_array = array.as_any().downcast_ref::<StringArray>().unwrap();
            // The first row index whose value is not equal to the next row's.
            let mut same_until = 0;
            while same_until < num_rows - 1 {
                if string_array.value(same_until) != string_array.value(same_until + 1) {
                    break;
                }
                same_until += 1;
            }
            result = result.min(same_until);
        }

        result
    }
}

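With several tag columns, the series boundary is the earliest change in any of them: find_first_diff_row takes the minimum across columns. A standalone sketch over plain string slices (illustrative only, mirroring the loop above):

// Illustrative re-implementation over plain slices, not part of this commit.
fn find_first_diff_row(columns: &[&[&str]]) -> usize {
    let num_rows = columns[0].len();
    let mut result = num_rows;
    for col in columns {
        let mut same_until = 0;
        while same_until < num_rows - 1 && col[same_until] == col[same_until + 1] {
            same_until += 1;
        }
        result = result.min(same_until);
    }
    result
}

fn main() {
    let host = ["a", "a", "a", "b"];
    let path = ["x", "y", "y", "y"];
    // `path` already differs between rows 0 and 1, so the split point is 0
    // and the first emitted series batch has 0 + 1 = 1 row.
    assert_eq!(find_first_diff_row(&[&host[..], &path[..]]), 0);
}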
#[cfg(test)]
mod test {
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::from_slice::FromSlice;
    use datafusion::physical_plan::memory::MemoryExec;
    use datafusion::prelude::SessionContext;

    use super::*;

    fn prepare_test_data() -> MemoryExec {
        let schema = Arc::new(Schema::new(vec![
            Field::new("host", DataType::Utf8, true),
            Field::new("path", DataType::Utf8, true),
        ]));

        let path_column_1 = Arc::new(StringArray::from_slice([
            "foo", "foo", "foo", "bar", "bar", "bar", "bar", "bar", "bar", "bla", "bla", "bla",
        ])) as _;
        let host_column_1 = Arc::new(StringArray::from_slice([
            "000", "000", "001", "002", "002", "002", "002", "002", "003", "005", "005", "005",
        ])) as _;

        let path_column_2 = Arc::new(StringArray::from_slice(["bla", "bla", "bla"])) as _;
        let host_column_2 = Arc::new(StringArray::from_slice(["005", "005", "005"])) as _;

        let path_column_3 = Arc::new(StringArray::from_slice([
            "bla", "🥺", "🥺", "🥺", "🥺", "🥺", "🫠", "🫠",
        ])) as _;
        let host_column_3 = Arc::new(StringArray::from_slice([
            "005", "001", "001", "001", "001", "001", "001", "001",
        ])) as _;

        let data_1 =
            RecordBatch::try_new(schema.clone(), vec![path_column_1, host_column_1]).unwrap();
        let data_2 =
            RecordBatch::try_new(schema.clone(), vec![path_column_2, host_column_2]).unwrap();
        let data_3 =
            RecordBatch::try_new(schema.clone(), vec![path_column_3, host_column_3]).unwrap();

        MemoryExec::try_new(&[vec![data_1, data_2, data_3]], schema, None).unwrap()
    }

    #[tokio::test]
    async fn overall_data() {
        let memory_exec = Arc::new(prepare_test_data());
        let divide_exec = Arc::new(SeriesDivideExec {
            tag_columns: vec!["host".to_string(), "path".to_string()],
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
        });
        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(divide_exec, session_context.task_ctx())
            .await
            .unwrap();
        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
            .unwrap()
            .to_string();

        let expected = String::from(
            "+------+------+\
            \n| host | path |\
            \n+------+------+\
            \n| foo | 000 |\
            \n| foo | 000 |\
            \n| foo | 001 |\
            \n| bar | 002 |\
            \n| bar | 002 |\
            \n| bar | 002 |\
            \n| bar | 002 |\
            \n| bar | 002 |\
            \n| bar | 003 |\
            \n| bla | 005 |\
            \n| bla | 005 |\
            \n| bla | 005 |\
            \n| bla | 005 |\
            \n| bla | 005 |\
            \n| bla | 005 |\
            \n| bla | 005 |\
            \n| 🥺 | 001 |\
            \n| 🥺 | 001 |\
            \n| 🥺 | 001 |\
            \n| 🥺 | 001 |\
            \n| 🥺 | 001 |\
            \n| 🫠 | 001 |\
            \n| 🫠 | 001 |\
            \n+------+------+",
        );
        assert_eq!(result_literal, expected);
    }

    #[tokio::test]
    async fn per_batch_data() {
        let memory_exec = Arc::new(prepare_test_data());
        let divide_exec = Arc::new(SeriesDivideExec {
            tag_columns: vec!["host".to_string(), "path".to_string()],
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
        });
        let mut divide_stream = divide_exec
            .execute(0, SessionContext::default().task_ctx())
            .unwrap();

        let mut expectations = vec![
            String::from(
                "+------+------+\
                \n| host | path |\
                \n+------+------+\
                \n| foo | 000 |\
                \n| foo | 000 |\
                \n+------+------+",
            ),
            String::from(
                "+------+------+\
                \n| host | path |\
                \n+------+------+\
                \n| foo | 001 |\
                \n+------+------+",
            ),
            String::from(
                "+------+------+\
                \n| host | path |\
                \n+------+------+\
                \n| bar | 002 |\
                \n| bar | 002 |\
                \n| bar | 002 |\
                \n| bar | 002 |\
                \n| bar | 002 |\
                \n+------+------+",
            ),
            String::from(
                "+------+------+\
                \n| host | path |\
                \n+------+------+\
                \n| bar | 003 |\
                \n+------+------+",
            ),
            String::from(
                "+------+------+\
                \n| host | path |\
                \n+------+------+\
                \n| bla | 005 |\
                \n| bla | 005 |\
                \n| bla | 005 |\
                \n| bla | 005 |\
                \n| bla | 005 |\
                \n| bla | 005 |\
                \n| bla | 005 |\
                \n+------+------+",
            ),
            String::from(
                "+------+------+\
                \n| host | path |\
                \n+------+------+\
                \n| 🥺 | 001 |\
                \n| 🥺 | 001 |\
                \n| 🥺 | 001 |\
                \n| 🥺 | 001 |\
                \n| 🥺 | 001 |\
                \n+------+------+",
            ),
            String::from(
                "+------+------+\
                \n| host | path |\
                \n+------+------+\
                \n| 🫠 | 001 |\
                \n| 🫠 | 001 |\
                \n+------+------+",
            ),
        ];
        expectations.reverse();

        while let Some(batch) = divide_stream.next().await {
            let formatted =
                datatypes::arrow::util::pretty::pretty_format_batches(&[batch.unwrap()])
                    .unwrap()
                    .to_string();
            let expected = expectations.pop().unwrap();
            assert_eq!(formatted, expected);
        }
    }
}
src/promql/src/planner.rs
@@ -22,7 +22,7 @@ use datafusion::datasource::DefaultTableSource;
 use datafusion::logical_expr::expr::AggregateFunction;
 use datafusion::logical_expr::{
     AggregateFunction as AggregateFunctionEnum, BinaryExpr, BuiltinScalarFunction, Extension,
-    Filter, LogicalPlan, LogicalPlanBuilder, Operator,
+    LogicalPlan, LogicalPlanBuilder, Operator,
 };
 use datafusion::optimizer::utils;
 use datafusion::prelude::{Column, Expr as DfExpr, JoinType};
@@ -43,7 +43,9 @@ use crate::error::{
     TableNameNotFoundSnafu, TableNotFoundSnafu, TimeIndexNotFoundSnafu, UnexpectedTokenSnafu,
     UnknownTableSnafu, UnsupportedExprSnafu, ValueNotFoundSnafu,
 };
-use crate::extension_plan::{InstantManipulate, Millisecond, RangeManipulate, SeriesNormalize};
+use crate::extension_plan::{
+    InstantManipulate, Millisecond, RangeManipulate, SeriesDivide, SeriesNormalize,
+};
 
 const LEFT_PLAN_JOIN_ALIAS: &str = "lhs";
@@ -59,6 +61,7 @@ struct PromPlannerContext {
     table_name: Option<String>,
     time_index_column: Option<String>,
     value_columns: Vec<String>,
+    tag_columns: Vec<String>,
 }
 
 impl PromPlannerContext {
@@ -292,15 +295,19 @@ impl<S: ContextProvider> PromPlanner<S> {
         // make table scan with filter exprs
         let table_scan = self.create_table_scan_plan(&table_name, filters.clone())?;
 
-        // make filter plan
-        let filter_plan = LogicalPlan::Filter(
-            Filter::try_new(
-                // safety: at least there are two exprs that filter timestamp column.
-                utils::conjunction(filters.into_iter()).unwrap(),
-                Arc::new(table_scan),
-            )
-            .context(DataFusionPlanningSnafu)?,
-        );
+        // make filter and sort plan
+        let sort_plan = LogicalPlanBuilder::from(table_scan)
+            .filter(utils::conjunction(filters.into_iter()).unwrap())
+            .context(DataFusionPlanningSnafu)?
+            .sort(self.create_tag_and_time_index_column_sort_exprs()?)
+            .context(DataFusionPlanningSnafu)?
+            .build()
+            .context(DataFusionPlanningSnafu)?;
+
+        // make divide plan
+        let divide_plan = LogicalPlan::Extension(Extension {
+            node: Arc::new(SeriesDivide::new(self.ctx.tag_columns.clone(), sort_plan)),
+        });
 
         // make series_normalize plan
         let offset_duration = match offset {
@@ -314,7 +321,7 @@ impl<S: ContextProvider> PromPlanner<S> {
                 .time_index_column
                 .clone()
                 .with_context(|| TimeIndexNotFoundSnafu { table: table_name })?,
-            filter_plan,
+            divide_plan,
         );
         let logical_plan = LogicalPlan::Extension(Extension {
             node: Arc::new(series_normalize),
@@ -455,16 +462,24 @@ impl<S: ContextProvider> PromPlanner<S> {
             .clone();
         self.ctx.time_index_column = Some(time_index);
 
-        // set values column
+        // set values columns
         let values = table
             .table_info()
             .meta
             .value_column_names()
             .cloned()
             .collect();
         self.ctx.value_columns = values;
 
+        // set primary key (tag) columns
+        let tags = table
+            .table_info()
+            .meta
+            .row_key_column_names()
+            .cloned()
+            .collect();
+        self.ctx.tag_columns = tags;
+
         Ok(())
     }
@@ -542,6 +557,17 @@ impl<S: ContextProvider> PromPlanner<S> {
         )))
     }
 
+    fn create_tag_and_time_index_column_sort_exprs(&self) -> Result<Vec<DfExpr>> {
+        let mut result = self
+            .ctx
+            .tag_columns
+            .iter()
+            .map(|col| DfExpr::Column(Column::from_name(col)).sort(false, false))
+            .collect::<Vec<_>>();
+        result.push(self.create_time_index_column_expr()?.sort(false, false));
+        Ok(result)
+    }
+
     fn create_empty_values_filter_expr(&self) -> Result<DfExpr> {
         let mut exprs = Vec::with_capacity(self.ctx.value_columns.len());
         for value in &self.ctx.value_columns {
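The new helper builds one sort expression per tag column plus the time index; sort(false, false) means descending with nulls last, which is why the expected plans below print DESC NULLS LAST. A small sketch (assuming the Expr::sort(asc, nulls_first) builder of the DataFusion version used here):

use datafusion::prelude::col;

fn main() {
    // sort(asc = false, nulls_first = false) renders as "tag_0 DESC NULLS LAST"
    let sort_expr = col("tag_0").sort(false, false);
    println!("{sort_expr:?}");
}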
@@ -836,13 +862,15 @@ mod test {
         let context_provider = build_test_context_provider("some_metric".to_string(), 1, 1).await;
         let plan = PromPlanner::stmt_to_plan(eval_stmt, context_provider).unwrap();
 
         let expected = String::from(
             "Filter: some_metric.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N]\
             \n Projection: some_metric.timestamp, TEMPLATE(some_metric.field_0) [timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N]\
             \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
             \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
-            \n Filter: tag_0 != Utf8(\"bar\") AND timestamp >= TimestampMillisecond(0, None) AND timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
-            \n TableScan: some_metric, unsupported_filters=[tag_0 != Utf8(\"bar\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
+            \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(0, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n TableScan: some_metric, unsupported_filters=[tag_0 != Utf8(\"bar\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
         ).replace("TEMPLATE", plan_name);
 
         assert_eq!(plan.display_indent_schema().to_string(), expected);
@@ -1062,9 +1090,11 @@ mod test {
             "Aggregate: groupBy=[[some_metric.tag_1]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_1:Utf8, TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
             \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
             \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
-            \n Filter: tag_0 != Utf8(\"bar\") AND timestamp >= TimestampMillisecond(0, None) AND timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
-            \n TableScan: some_metric, unsupported_filters=[tag_0 != Utf8(\"bar\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]")
-        .replace("TEMPLATE", name);
+            \n PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
+            \n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.tag_1 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
+            \n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(0, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
+            \n TableScan: some_metric, unsupported_filters=[tag_0 != Utf8(\"bar\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
+        ).replace("TEMPLATE", name);
         assert_eq!(
             plan.display_indent_schema().to_string(),
             expected_no_without
@@ -1080,9 +1110,11 @@ mod test {
             "Aggregate: groupBy=[[some_metric.tag_0]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_0:Utf8, TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
             \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
             \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
-            \n Filter: tag_0 != Utf8(\"bar\") AND timestamp >= TimestampMillisecond(0, None) AND timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
-            \n TableScan: some_metric, unsupported_filters=[tag_0 != Utf8(\"bar\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]")
-        .replace("TEMPLATE", name);
+            \n PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
+            \n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.tag_1 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
+            \n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(0, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
+            \n TableScan: some_metric, unsupported_filters=[tag_0 != Utf8(\"bar\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
+        ).replace("TEMPLATE", name);
         assert_eq!(plan.display_indent_schema().to_string(), expected_without);
     }
@@ -1248,12 +1280,16 @@ mod test {
             \n SubqueryAlias: lhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
             \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
             \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
-            \n Filter: tag_0 = Utf8(\"foo\") AND timestamp >= TimestampMillisecond(0, None) AND timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
-            \n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"foo\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n Filter: some_metric.tag_0 = Utf8(\"foo\") AND some_metric.timestamp >= TimestampMillisecond(0, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"foo\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
             \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
             \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
-            \n Filter: tag_0 = Utf8(\"bar\") AND timestamp >= TimestampMillisecond(0, None) AND timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
-            \n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"bar\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
+            \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(0, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"bar\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
         );
 
         assert_eq!(plan.display_indent_schema().to_string(), expected);
@@ -1309,8 +1345,10 @@ mod test {
             "Projection: Float64(1) + some_metric.field_0 [Float64(1) + some_metric.field_0:Float64;N]\
             \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
             \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
-            \n Filter: tag_0 = Utf8(\"bar\") AND timestamp >= TimestampMillisecond(0, None) AND timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
-            \n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"bar\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
+            \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(0, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
+            \n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"bar\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
         );
 
         assert_eq!(plan.display_indent_schema().to_string(), expected);