From 5fb417ec7caf8ba7c2a6499c6fb042391244c376 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 10 Jan 2023 16:27:09 +0800 Subject: [PATCH] feat: implement RangeManipulate (#843) * basic impl Signed-off-by: Ruihang Xia * impl constructor Signed-off-by: Ruihang Xia * test printout Signed-off-by: Ruihang Xia * truncate tag columns Signed-off-by: Ruihang Xia * doc this plan Signed-off-by: Ruihang Xia * fix empty range Signed-off-by: Ruihang Xia * fix clippy Signed-off-by: Ruihang Xia * document behavior Signed-off-by: Ruihang Xia Signed-off-by: Ruihang Xia --- src/promql/src/extension_plan.rs | 2 + .../src/extension_plan/range_manipulate.rs | 530 ++++++++++++++++++ src/promql/src/range_array.rs | 31 +- 3 files changed, 560 insertions(+), 3 deletions(-) create mode 100644 src/promql/src/extension_plan/range_manipulate.rs diff --git a/src/promql/src/extension_plan.rs b/src/promql/src/extension_plan.rs index d815b08351..820f29a994 100644 --- a/src/promql/src/extension_plan.rs +++ b/src/promql/src/extension_plan.rs @@ -14,9 +14,11 @@ mod instant_manipulate; mod normalize; +mod range_manipulate; use datafusion::arrow::datatypes::{ArrowPrimitiveType, TimestampMillisecondType}; pub use instant_manipulate::{InstantManipulate, InstantManipulateExec, InstantManipulateStream}; pub use normalize::{SeriesNormalize, SeriesNormalizeExec, SeriesNormalizeStream}; +pub use range_manipulate::{RangeManipulate, RangeManipulateExec, RangeManipulateStream}; pub(crate) type Millisecond = ::Native; diff --git a/src/promql/src/extension_plan/range_manipulate.rs b/src/promql/src/extension_plan/range_manipulate.rs new file mode 100644 index 0000000000..fdbbfb78c2 --- /dev/null +++ b/src/promql/src/extension_plan/range_manipulate.rs @@ -0,0 +1,530 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::collections::{HashMap, HashSet}; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use datafusion::arrow::array::{Array, Int64Array, TimestampMillisecondArray}; +use datafusion::arrow::compute; +use datafusion::arrow::datatypes::SchemaRef; +use datafusion::arrow::error::ArrowError; +use datafusion::arrow::record_batch::RecordBatch; +use datafusion::common::{DFField, DFSchema, DFSchemaRef}; +use datafusion::error::Result as DataFusionResult; +use datafusion::execution::context::TaskContext; +use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNode}; +use datafusion::physical_expr::PhysicalSortExpr; +use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; +use datafusion::physical_plan::{ + DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, + Statistics, +}; +use datatypes::arrow::error::Result as ArrowResult; +use futures::{Stream, StreamExt}; + +use crate::extension_plan::Millisecond; +use crate::range_array::RangeArray; + +/// Time series manipulator for range function. +/// +/// This plan will "fold" time index and value columns into [RangeArray]s, and truncate +/// other columns to the same length with the "folded" [RangeArray] column. 
+#[derive(Debug)] +pub struct RangeManipulate { + start: Millisecond, + end: Millisecond, + interval: Millisecond, + range: Millisecond, + + time_index: String, + value_columns: Vec, + input: LogicalPlan, + output_schema: DFSchemaRef, +} + +impl RangeManipulate { + pub fn new( + start: Millisecond, + end: Millisecond, + interval: Millisecond, + range: Millisecond, + time_index: String, + value_columns: Vec, + input: LogicalPlan, + ) -> DataFusionResult { + let output_schema = + Self::calculate_output_schema(input.schema(), &time_index, &value_columns)?; + Ok(Self { + start, + end, + interval, + range, + time_index, + value_columns, + input, + output_schema, + }) + } + + fn calculate_output_schema( + input_schema: &DFSchemaRef, + time_index: &str, + value_columns: &[String], + ) -> DataFusionResult { + let mut columns = input_schema.fields().clone(); + + // process time index column + let index = input_schema.index_of_column_by_name(None, time_index)?; + columns[index] = DFField::from(RangeArray::convert_field(columns[index].field())); + + // process value columns + for name in value_columns { + let index = input_schema.index_of_column_by_name(None, name)?; + columns[index] = DFField::from(RangeArray::convert_field(columns[index].field())); + } + + Ok(Arc::new(DFSchema::new_with_metadata( + columns, + HashMap::new(), + )?)) + } + + pub fn to_execution_plan(&self, exec_input: Arc) -> Arc { + Arc::new(RangeManipulateExec { + start: self.start, + end: self.end, + interval: self.interval, + range: self.range, + time_index_column: self.time_index.clone(), + value_columns: self.value_columns.clone(), + input: exec_input, + output_schema: SchemaRef::new(self.output_schema.as_ref().into()), + metric: ExecutionPlanMetricsSet::new(), + }) + } +} + +impl UserDefinedLogicalNode for RangeManipulate { + fn as_any(&self) -> &dyn Any { + self as _ + } + + fn inputs(&self) -> Vec<&LogicalPlan> { + vec![&self.input] + } + + fn schema(&self) -> &DFSchemaRef { + &self.output_schema + } 
+ + fn expressions(&self) -> Vec { + vec![] + } + + fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!( + f, + "PromRangeManipulate: req range=[{}..{}], interval=[{}], eval range=[{}], time index=[{}], values={:?}", + self.start, self.end, self.interval, self.range, self.time_index, self.value_columns + ) + } + + fn from_template( + &self, + _exprs: &[Expr], + inputs: &[LogicalPlan], + ) -> Arc { + assert!(!inputs.is_empty()); + + Arc::new(Self { + start: self.start, + end: self.end, + interval: self.interval, + range: self.range, + time_index: self.time_index.clone(), + value_columns: self.value_columns.clone(), + input: inputs[0].clone(), + output_schema: self.output_schema.clone(), + }) + } +} + +#[derive(Debug)] +pub struct RangeManipulateExec { + start: Millisecond, + end: Millisecond, + interval: Millisecond, + range: Millisecond, + time_index_column: String, + value_columns: Vec, + + input: Arc, + output_schema: SchemaRef, + metric: ExecutionPlanMetricsSet, +} + +impl ExecutionPlan for RangeManipulateExec { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.output_schema.clone() + } + + fn output_partitioning(&self) -> Partitioning { + self.input.output_partitioning() + } + + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + self.input.output_ordering() + } + + fn maintains_input_order(&self) -> bool { + true + } + + fn children(&self) -> Vec> { + vec![self.input.clone()] + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> DataFusionResult> { + assert!(!children.is_empty()); + Ok(Arc::new(Self { + start: self.start, + end: self.end, + interval: self.interval, + range: self.range, + time_index_column: self.time_index_column.clone(), + value_columns: self.value_columns.clone(), + output_schema: self.output_schema.clone(), + input: children[0].clone(), + metric: self.metric.clone(), + })) + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> 
DataFusionResult { + let baseline_metric = BaselineMetrics::new(&self.metric, partition); + + let input = self.input.execute(partition, context)?; + let schema = input.schema(); + let time_index = schema + .column_with_name(&self.time_index_column) + .unwrap_or_else(|| panic!("time index column {} not found", self.time_index_column)) + .0; + let value_columns = self + .value_columns + .iter() + .map(|value_col| { + schema + .column_with_name(value_col) + .unwrap_or_else(|| panic!("value column {value_col} not found",)) + .0 + }) + .collect(); + Ok(Box::pin(RangeManipulateStream { + start: self.start, + end: self.end, + interval: self.interval, + range: self.range, + time_index, + value_columns, + output_schema: self.output_schema.clone(), + input, + metric: baseline_metric, + })) + } + + fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + write!( + f, + "PromRangeManipulateExec: req range=[{}..{}], interval=[{}], eval range=[{}], time index=[{}]", + self.start, self.end, self.interval, self.range, self.time_index_column + ) + } + } + } + + fn metrics(&self) -> Option { + Some(self.metric.clone_inner()) + } + + fn statistics(&self) -> Statistics { + let input_stats = self.input.statistics(); + + let estimated_row_num = (self.end - self.start) as f64 / self.interval as f64; + let estimated_total_bytes = input_stats + .total_byte_size + .zip(input_stats.num_rows) + .map(|(size, rows)| (size as f64 / rows as f64) * estimated_row_num) + .map(|size| size.floor() as _); + + Statistics { + num_rows: Some(estimated_row_num.floor() as _), + total_byte_size: estimated_total_bytes, + // TODO(ruihang): support this column statistics + column_statistics: None, + is_exact: false, + } + } +} + +pub struct RangeManipulateStream { + start: Millisecond, + end: Millisecond, + interval: Millisecond, + range: Millisecond, + time_index: usize, + value_columns: Vec, + + output_schema: SchemaRef, +
input: SendableRecordBatchStream, + metric: BaselineMetrics, +} + +impl RecordBatchStream for RangeManipulateStream { + fn schema(&self) -> SchemaRef { + self.output_schema.clone() + } +} + +impl Stream for RangeManipulateStream { + type Item = ArrowResult; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let poll = match self.input.poll_next_unpin(cx) { + Poll::Ready(batch) => { + let _timer = self.metric.elapsed_compute().timer(); + Poll::Ready(batch.map(|batch| batch.and_then(|batch| self.manipulate(batch)))) + } + Poll::Pending => Poll::Pending, + }; + self.metric.record_poll(poll) + } +} + +impl RangeManipulateStream { + // Prometheus: https://github.com/prometheus/prometheus/blob/e934d0f01158a1d55fa0ebb035346b195fcc1260/promql/engine.go#L1113-L1198 + // But they are not exactly the same, because we don't eagerly evaluate the data in this plan. + // Also, the generated timestamp is not aligned to the step. That is expected to be done later. + pub fn manipulate(&self, input: RecordBatch) -> ArrowResult { + let mut other_columns = (0..input.columns().len()).collect::>(); + // calculate the range + let ranges = self.calculate_range(&input); + // transform columns + let mut new_columns = input.columns().to_vec(); + for index in self.value_columns.iter().chain([self.time_index].iter()) { + other_columns.remove(index); + let column = input.column(*index); + let new_column = Arc::new( + RangeArray::from_ranges(column.clone(), ranges.clone()) + .map_err(|e| ArrowError::InvalidArgumentError(e.to_string()))?
+ .into_dict(), + ); + new_columns[*index] = new_column; + } + + // truncate other columns + let take_indices = Int64Array::from(vec![0; ranges.len()]); + for index in other_columns.into_iter() { + new_columns[index] = compute::take(&input.column(index), &take_indices, None)?; + } + + RecordBatch::try_new(self.output_schema.clone(), new_columns) + } + + fn calculate_range(&self, input: &RecordBatch) -> Vec<(u32, u32)> { + let ts_column = input + .column(self.time_index) + .as_any() + .downcast_ref::() + .unwrap(); + + let mut result = vec![]; + + // calculate for every aligned timestamp (`curr_ts`), assume the ts column is ordered. + for curr_ts in (self.start..=self.end).step_by(self.interval as _) { + let mut range_start = ts_column.len(); + let mut range_end = 0; + for (index, ts) in ts_column.values().iter().enumerate() { + if ts + self.range >= curr_ts { + range_start = range_start.min(index); + } + if *ts <= curr_ts { + range_end = range_end.max(index); + } else { + break; + } + } + if range_start > range_end { + result.push((0, 0)); + } else { + result.push((range_start as _, (range_end + 1 - range_start) as _)); + } + } + + result + } +} + +#[cfg(test)] +mod test { + use datafusion::arrow::array::{ArrayRef, DictionaryArray, Float64Array, StringArray}; + use datafusion::arrow::datatypes::{ + ArrowPrimitiveType, DataType, Field, Int64Type, Schema, TimestampMillisecondType, + }; + use datafusion::common::ToDFSchema; + use datafusion::from_slice::FromSlice; + use datafusion::physical_plan::memory::MemoryExec; + use datafusion::prelude::SessionContext; + use datatypes::arrow::array::TimestampMillisecondArray; + + use super::*; + + const TIME_INDEX_COLUMN: &str = "timestamp"; + + fn prepare_test_data() -> MemoryExec { + let schema = Arc::new(Schema::new(vec![ + Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true), + Field::new("value_1", DataType::Float64, true), + Field::new("value_2", DataType::Float64, true), + Field::new("path", 
DataType::Utf8, true), + ])); + let timestamp_column = Arc::new(TimestampMillisecondArray::from_slice([ + 0, 30_000, 60_000, 90_000, 120_000, // every 30s + 180_000, 240_000, // every 60s + 241_000, 271_000, 291_000, // others + ])) as _; + let value_column: ArrayRef = Arc::new(Float64Array::from_slice([1.0; 10])) as _; + let path_column = Arc::new(StringArray::from_slice(["foo"; 10])) as _; + let data = RecordBatch::try_new( + schema.clone(), + vec![ + timestamp_column, + value_column.clone(), + value_column, + path_column, + ], + ) + .unwrap(); + + MemoryExec::try_new(&[vec![data]], schema, None).unwrap() + } + + async fn do_normalize_test( + start: Millisecond, + end: Millisecond, + interval: Millisecond, + range: Millisecond, + expected: String, + ) { + let memory_exec = Arc::new(prepare_test_data()); + let time_index = TIME_INDEX_COLUMN.to_string(); + let value_columns = vec!["value_1".to_string(), "value_2".to_string()]; + let manipulate_output_schema = SchemaRef::new( + RangeManipulate::calculate_output_schema( + &memory_exec.schema().to_dfschema_ref().unwrap(), + &time_index, + &value_columns, + ) + .unwrap() + .as_ref() + .into(), + ); + let normalize_exec = Arc::new(RangeManipulateExec { + start, + end, + interval, + range, + value_columns, + output_schema: manipulate_output_schema, + time_index_column: time_index, + input: memory_exec, + metric: ExecutionPlanMetricsSet::new(), + }); + let session_context = SessionContext::default(); + let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx()) + .await + .unwrap(); + // The DictionaryArray produced from a RangeArray cannot be printed like a normal array.
+ let result_literal: String = result + .into_iter() + .filter_map(|batch| { + batch + .columns() + .iter() + .map(|array| { + if matches!(array.data_type(), &DataType::Dictionary(..)) { + let dict_array = array + .as_any() + .downcast_ref::>() + .unwrap() + .clone(); + format!("{:?}", RangeArray::try_new(dict_array).unwrap()) + } else { + format!("{array:?}") + } + }) + .reduce(|lhs, rhs| lhs + "\n" + &rhs) + }) + .reduce(|lhs, rhs| lhs + "\n\n" + &rhs) + .unwrap(); + + assert_eq!(result_literal, expected); + } + + #[tokio::test] + async fn interval_30s_range_90s() { + let expected = String::from( + "RangeArray { \ + base array: PrimitiveArray\n[\n 1970-01-01T00:00:00,\n 1970-01-01T00:00:30,\n 1970-01-01T00:01:00,\n 1970-01-01T00:01:30,\n 1970-01-01T00:02:00,\n 1970-01-01T00:03:00,\n 1970-01-01T00:04:00,\n 1970-01-01T00:04:01,\n 1970-01-01T00:04:31,\n 1970-01-01T00:04:51,\n], \ + ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \ + }\nRangeArray { \ + base array: PrimitiveArray\n[\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n], \ + ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \ + }\nRangeArray { \ + base array: PrimitiveArray\n[\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n], \ + ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \ + }\nStringArray\n[\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n]"); + do_normalize_test(0, 310_000, 30_000, 90_000, expected).await; + } + + #[tokio::test] + async fn small_empty_range() { + let expected = String::from( + "RangeArray { \ + base array: PrimitiveArray\n[\n 1970-01-01T00:00:00,\n 1970-01-01T00:00:30,\n 
1970-01-01T00:01:00,\n 1970-01-01T00:01:30,\n 1970-01-01T00:02:00,\n 1970-01-01T00:03:00,\n 1970-01-01T00:04:00,\n 1970-01-01T00:04:01,\n 1970-01-01T00:04:31,\n 1970-01-01T00:04:51,\n], \ + ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \ + }\nRangeArray { \ + base array: PrimitiveArray\n[\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n], \ + ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \ + }\nRangeArray { \ + base array: PrimitiveArray\n[\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n], \ + ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \ + }\nStringArray\n[\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n]"); + do_normalize_test(1, 10_001, 3_000, 1_000, expected).await; + } +} diff --git a/src/promql/src/range_array.rs b/src/promql/src/range_array.rs index 7784b61360..e5a4fe7d8f 100644 --- a/src/promql/src/range_array.rs +++ b/src/promql/src/range_array.rs @@ -14,12 +14,15 @@ //!An extended "array" based on [DictionaryArray]. +use datafusion::arrow::datatypes::Field; use datatypes::arrow::array::{Array, ArrayData, ArrayRef, DictionaryArray, Int64Array}; use datatypes::arrow::datatypes::{DataType, Int64Type}; use snafu::{ensure, OptionExt}; use crate::error::{EmptyRangeSnafu, IllegalRangeSnafu, Result}; +pub type RangeTuple = (u32, u32); + /// An compound logical "array" type. Represent serval ranges (slices) of one array. /// It's useful to use case like compute sliding window, or range selector from promql. 
/// @@ -82,7 +85,7 @@ impl RangeArray { pub fn from_ranges(values: ArrayRef, ranges: R) -> Result where - R: IntoIterator + Clone, + R: IntoIterator + Clone, { Self::check_ranges(values.len(), ranges.clone())?; @@ -99,7 +102,7 @@ impl RangeArray { /// [`from_ranges`]: crate::range_array::RangeArray#method.from_ranges pub unsafe fn from_ranges_unchecked(values: ArrayRef, ranges: R) -> Self where - R: IntoIterator, + R: IntoIterator, { let key_array = Int64Array::from_iter( ranges @@ -161,7 +164,7 @@ impl RangeArray { fn check_ranges(value_len: usize, ranges: R) -> Result<()> where - R: IntoIterator, + R: IntoIterator, { for (offset, length) in ranges.into_iter() { ensure!( @@ -175,6 +178,28 @@ impl RangeArray { } Ok(()) } + + /// Change the field's datatype to the type it has after being processed by [RangeArray]. + /// For example, `Utf8` will become `Dictionary<Int64, Utf8>`. + pub fn convert_field(field: &Field) -> Field { + let value_type = Box::new(field.data_type().clone()); + Field::new( + field.name(), + DataType::Dictionary(Box::new(Self::key_type()), value_type), + field.is_nullable(), + ) + } + + pub fn values(&self) -> &ArrayRef { + self.array.values() + } + + pub fn ranges(&self) -> impl Iterator> + '_ { + self.array + .keys() + .into_iter() + .map(|compound| compound.map(unpack)) + } } impl Array for RangeArray {