From b58296de22ec774c48e420d86d5d2bb80c82e06f Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 28 Dec 2023 14:56:17 +0800 Subject: [PATCH] feat: Implement OR for PromQL (#3024) * with anit-join Signed-off-by: Ruihang Xia * impl UnionDistinctOn Signed-off-by: Ruihang Xia * unify schema Signed-off-by: Ruihang Xia * fix clippy Signed-off-by: Ruihang Xia * add sqlness case Signed-off-by: Ruihang Xia * add UTs Signed-off-by: Ruihang Xia * Update src/promql/src/planner.rs Co-authored-by: dennis zhuang --------- Signed-off-by: Ruihang Xia Co-authored-by: dennis zhuang --- Cargo.lock | 1 + src/promql/Cargo.toml | 1 + src/promql/src/extension_plan.rs | 4 + .../src/extension_plan/instant_manipulate.rs | 50 +- src/promql/src/extension_plan/planner.rs | 7 +- src/promql/src/extension_plan/test_util.rs | 64 ++ .../src/extension_plan/union_distinct_on.rs | 576 ++++++++++++++++++ src/promql/src/lib.rs | 1 + src/promql/src/planner.rs | 127 +++- .../common/promql/set_operation.result | 148 ++++- .../common/promql/set_operation.sql | 36 +- 11 files changed, 951 insertions(+), 64 deletions(-) create mode 100644 src/promql/src/extension_plan/test_util.rs create mode 100644 src/promql/src/extension_plan/union_distinct_on.rs diff --git a/Cargo.lock b/Cargo.lock index 97e7f48e92..abe0acb612 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6541,6 +6541,7 @@ dependencies = [ name = "promql" version = "0.5.0" dependencies = [ + "ahash 0.8.6", "async-recursion", "async-trait", "bytemuck", diff --git a/src/promql/Cargo.toml b/src/promql/Cargo.toml index a10973d4eb..6be12de4e3 100644 --- a/src/promql/Cargo.toml +++ b/src/promql/Cargo.toml @@ -5,6 +5,7 @@ edition.workspace = true license.workspace = true [dependencies] +ahash.workspace = true async-recursion = "1.0" async-trait.workspace = true bytemuck.workspace = true diff --git a/src/promql/src/extension_plan.rs b/src/promql/src/extension_plan.rs index 49a9199bf0..ff2195e532 100644 --- a/src/promql/src/extension_plan.rs +++ b/src/promql/src/extension_plan.rs @@ -19,6 +19,9 @@ mod normalize; mod planner; mod range_manipulate; mod series_divide; +#[cfg(test)] +mod test_util; +mod union_distinct_on; use datafusion::arrow::datatypes::{ArrowPrimitiveType, TimestampMillisecondType}; pub use empty_metric::{build_special_time_expr, EmptyMetric, EmptyMetricExec, EmptyMetricStream}; @@ -28,5 +31,6 @@ pub use normalize::{SeriesNormalize, SeriesNormalizeExec, SeriesNormalizeStream} pub use planner::PromExtensionPlanner; pub use range_manipulate::{RangeManipulate, RangeManipulateExec, RangeManipulateStream}; pub use series_divide::{SeriesDivide, SeriesDivideExec, SeriesDivideStream}; +pub use union_distinct_on::{UnionDistinctOn, UnionDistinctOnExec, UnionDistinctOnStream}; pub(crate) type Millisecond = ::Native; diff --git a/src/promql/src/extension_plan/instant_manipulate.rs b/src/promql/src/extension_plan/instant_manipulate.rs index ba155627d2..e65592bb37 100644 --- a/src/promql/src/extension_plan/instant_manipulate.rs +++ b/src/promql/src/extension_plan/instant_manipulate.rs @@ -445,40 +445,12 @@ impl InstantManipulateStream { #[cfg(test)] mod test { - use datafusion::arrow::array::Float64Array; - use datafusion::arrow::datatypes::{ - ArrowPrimitiveType, DataType, Field, Schema, TimestampMillisecondType, - }; - use datafusion::physical_plan::memory::MemoryExec; use datafusion::prelude::SessionContext; - use datatypes::arrow::array::TimestampMillisecondArray; - use datatypes::arrow_array::StringArray; use super::*; - - const TIME_INDEX_COLUMN: &str = "timestamp"; - - fn prepare_test_data() -> MemoryExec { - let schema = Arc::new(Schema::new(vec![ - Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true), - Field::new("value", DataType::Float64, true), - Field::new("path", DataType::Utf8, true), - ])); - let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![ - 0, 30_000, 60_000, 90_000, 120_000, // every 30s - 180_000, 240_000, // every 60s - 241_000, 271_000, 291_000, // others - ])) as _; - let field_column = Arc::new(Float64Array::from(vec![1.0; 10])) as _; - let path_column = Arc::new(StringArray::from(vec!["foo"; 10])) as _; - let data = RecordBatch::try_new( - schema.clone(), - vec![timestamp_column, field_column, path_column], - ) - .unwrap(); - - MemoryExec::try_new(&[vec![data]], schema, None).unwrap() - } + use crate::extension_plan::test_util::{ + prepare_test_data, prepare_test_data_with_nan, TIME_INDEX_COLUMN, + }; async fn do_normalize_test( start: Millisecond, @@ -749,22 +721,6 @@ mod test { do_normalize_test(190_000, 300_000, 30_000, 10_000, expected, false).await; } - fn prepare_test_data_with_nan() -> MemoryExec { - let schema = Arc::new(Schema::new(vec![ - Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true), - Field::new("value", DataType::Float64, true), - ])); - let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![ - 0, 30_000, 60_000, 90_000, 120_000, // every 30s - ])) as _; - let field_column = - Arc::new(Float64Array::from(vec![0.0, f64::NAN, 6.0, f64::NAN, 12.0])) as _; - let data = - RecordBatch::try_new(schema.clone(), vec![timestamp_column, field_column]).unwrap(); - - MemoryExec::try_new(&[vec![data]], schema, None).unwrap() - } - #[tokio::test] async fn lookback_10s_interval_10s_with_nan() { let expected = String::from( diff --git a/src/promql/src/extension_plan/planner.rs b/src/promql/src/extension_plan/planner.rs index 7798c9b321..80cd565bd2 100644 --- a/src/promql/src/extension_plan/planner.rs +++ b/src/promql/src/extension_plan/planner.rs @@ -21,7 +21,7 @@ use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNode}; use datafusion::physical_plan::ExecutionPlan; use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner}; -use super::HistogramFold; +use super::{HistogramFold, UnionDistinctOn}; use crate::extension_plan::{ EmptyMetric, InstantManipulate, RangeManipulate, SeriesDivide, SeriesNormalize, }; @@ -50,6 +50,11 @@ impl ExtensionPlanner for PromExtensionPlanner { Ok(Some(node.to_execution_plan(session_state, planner)?)) } else if let Some(node) = node.as_any().downcast_ref::() { Ok(Some(node.to_execution_plan(physical_inputs[0].clone()))) + } else if let Some(node) = node.as_any().downcast_ref::() { + Ok(Some(node.to_execution_plan( + physical_inputs[0].clone(), + physical_inputs[1].clone(), + ))) } else { Ok(None) } diff --git a/src/promql/src/extension_plan/test_util.rs b/src/promql/src/extension_plan/test_util.rs new file mode 100644 index 0000000000..f751cb9fa8 --- /dev/null +++ b/src/promql/src/extension_plan/test_util.rs @@ -0,0 +1,64 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Utils for testing extension plan + +use std::sync::Arc; + +use common_recordbatch::DfRecordBatch as RecordBatch; +use datafusion::arrow::array::Float64Array; +use datafusion::arrow::datatypes::{ + ArrowPrimitiveType, DataType, Field, Schema, TimestampMillisecondType, +}; +use datafusion::physical_plan::memory::MemoryExec; +use datatypes::arrow::array::TimestampMillisecondArray; +use datatypes::arrow_array::StringArray; + +pub(crate) const TIME_INDEX_COLUMN: &str = "timestamp"; + +pub(crate) fn prepare_test_data() -> MemoryExec { + let schema = Arc::new(Schema::new(vec![ + Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true), + Field::new("value", DataType::Float64, true), + Field::new("path", DataType::Utf8, true), + ])); + let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![ + 0, 30_000, 60_000, 90_000, 120_000, // every 30s + 180_000, 240_000, // every 60s + 241_000, 271_000, 291_000, // others + ])) as _; + let field_column = Arc::new(Float64Array::from(vec![1.0; 10])) as _; + let path_column = Arc::new(StringArray::from(vec!["foo"; 10])) as _; + let data = RecordBatch::try_new( + schema.clone(), + vec![timestamp_column, field_column, path_column], + ) + .unwrap(); + + MemoryExec::try_new(&[vec![data]], schema, None).unwrap() +} + +pub(crate) fn prepare_test_data_with_nan() -> MemoryExec { + let schema = Arc::new(Schema::new(vec![ + Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true), + Field::new("value", DataType::Float64, true), + ])); + let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![ + 0, 30_000, 60_000, 90_000, 120_000, // every 30s + ])) as _; + let field_column = Arc::new(Float64Array::from(vec![0.0, f64::NAN, 6.0, f64::NAN, 12.0])) as _; + let data = RecordBatch::try_new(schema.clone(), vec![timestamp_column, field_column]).unwrap(); + + MemoryExec::try_new(&[vec![data]], schema, None).unwrap() +} diff --git a/src/promql/src/extension_plan/union_distinct_on.rs b/src/promql/src/extension_plan/union_distinct_on.rs new file mode 100644 index 0000000000..22551b73f8 --- /dev/null +++ b/src/promql/src/extension_plan/union_distinct_on.rs @@ -0,0 +1,576 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use ahash::{HashMap, RandomState}; +use datafusion::arrow::array::UInt64Array; +use datafusion::arrow::datatypes::SchemaRef; +use datafusion::arrow::record_batch::RecordBatch; +use datafusion::common::DFSchemaRef; +use datafusion::error::{DataFusionError, Result as DataFusionResult}; +use datafusion::execution::context::TaskContext; +use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore}; +use datafusion::physical_expr::PhysicalSortExpr; +use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; +use datafusion::physical_plan::{ + hash_utils, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, + RecordBatchStream, SendableRecordBatchStream, Statistics, +}; +use datatypes::arrow::compute; +use futures::future::BoxFuture; +use futures::{ready, Stream, StreamExt, TryStreamExt}; + +/// A special kind of `UNION`(`OR` in PromQL) operator, for PromQL specific use case. +/// +/// This operator is similar to `UNION` from SQL, but it only accepts two inputs. The +/// most different part is that it treat left child and right child differently: +/// - All columns from left child will be outputted. +/// - Only check collisions (when not distinct) on the columns specified by `compare_keys`. +/// - When there is a collision: +/// - If the collision is from right child itself, only the first observed row will be +/// preserved. All others are discarded. +/// - If the collision is from left child, the row in right child will be discarded. +/// - The output order is not maintained. This plan will output left child first, then right child. +/// - The output schema contains all columns from left or right child plans. +/// +/// From the implementation perspective, this operator is similar to `HashJoin`, but the +/// probe side is the right child, and the build side is the left child. Another difference +/// is that the probe is opting-out. +/// +/// This plan will exhaust the right child first to build probe hash table, then streaming +/// on left side, and use the left side to "mask" the hash table. +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct UnionDistinctOn { + left: LogicalPlan, + right: LogicalPlan, + /// The columns to compare for equality. + /// TIME INDEX is included. + compare_keys: Vec, + ts_col: String, + output_schema: DFSchemaRef, +} + +impl UnionDistinctOn { + pub fn name() -> &'static str { + "UnionDistinctOn" + } + + pub fn new( + left: LogicalPlan, + right: LogicalPlan, + compare_keys: Vec, + ts_col: String, + output_schema: DFSchemaRef, + ) -> Self { + Self { + left, + right, + compare_keys, + ts_col, + output_schema, + } + } + + pub fn to_execution_plan( + &self, + left_exec: Arc, + right_exec: Arc, + ) -> Arc { + Arc::new(UnionDistinctOnExec { + left: left_exec, + right: right_exec, + compare_keys: self.compare_keys.clone(), + ts_col: self.ts_col.clone(), + output_schema: Arc::new(self.output_schema.as_ref().into()), + metric: ExecutionPlanMetricsSet::new(), + random_state: RandomState::new(), + }) + } +} + +impl UserDefinedLogicalNodeCore for UnionDistinctOn { + fn name(&self) -> &str { + Self::name() + } + + fn inputs(&self) -> Vec<&LogicalPlan> { + vec![&self.left, &self.right] + } + + fn schema(&self) -> &DFSchemaRef { + &self.output_schema + } + + fn expressions(&self) -> Vec { + vec![] + } + + fn fmt_for_explain(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "UnionDistinctOn: on col=[{:?}], ts_col=[{}]", + self.compare_keys, self.ts_col + ) + } + + fn from_template(&self, _exprs: &[Expr], inputs: &[LogicalPlan]) -> Self { + assert_eq!(inputs.len(), 2); + + let left = inputs[0].clone(); + let right = inputs[1].clone(); + Self { + left, + right, + compare_keys: self.compare_keys.clone(), + ts_col: self.ts_col.clone(), + output_schema: self.output_schema.clone(), + } + } +} + +#[derive(Debug)] +pub struct UnionDistinctOnExec { + left: Arc, + right: Arc, + compare_keys: Vec, + ts_col: String, + output_schema: SchemaRef, + metric: ExecutionPlanMetricsSet, + + /// Shared the `RandomState` for the hashing algorithm + random_state: RandomState, +} + +impl ExecutionPlan for UnionDistinctOnExec { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.output_schema.clone() + } + + fn required_input_distribution(&self) -> Vec { + vec![Distribution::SinglePartition, Distribution::SinglePartition] + } + + fn output_partitioning(&self) -> Partitioning { + Partitioning::UnknownPartitioning(1) + } + + /// [UnionDistinctOnExec] will output left first, then right. + /// So the order of the output is not maintained. + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + + fn children(&self) -> Vec> { + vec![self.left.clone(), self.right.clone()] + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> DataFusionResult> { + assert_eq!(children.len(), 2); + + let left = children[0].clone(); + let right = children[1].clone(); + Ok(Arc::new(UnionDistinctOnExec { + left, + right, + compare_keys: self.compare_keys.clone(), + ts_col: self.ts_col.clone(), + output_schema: self.output_schema.clone(), + metric: self.metric.clone(), + random_state: self.random_state.clone(), + })) + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> DataFusionResult { + let left_stream = self.left.execute(partition, context.clone())?; + let right_stream = self.right.execute(partition, context.clone())?; + + // Convert column name to column index. Add one for the time column. + let mut key_indices = Vec::with_capacity(self.compare_keys.len() + 1); + for key in &self.compare_keys { + let index = self + .output_schema + .column_with_name(key) + .map(|(i, _)| i) + .ok_or_else(|| DataFusionError::Internal(format!("Column {} not found", key)))?; + key_indices.push(index); + } + let ts_index = self + .output_schema + .column_with_name(&self.ts_col) + .map(|(i, _)| i) + .ok_or_else(|| { + DataFusionError::Internal(format!("Column {} not found", self.ts_col)) + })?; + key_indices.push(ts_index); + + // Build right hash table future. + let hashed_data_future = HashedDataFut::Pending(Box::pin(HashedData::new( + right_stream, + self.random_state.clone(), + key_indices.clone(), + ))); + + let baseline_metric = BaselineMetrics::new(&self.metric, partition); + Ok(Box::pin(UnionDistinctOnStream { + left: left_stream, + right: hashed_data_future, + compare_keys: key_indices, + output_schema: self.output_schema.clone(), + metric: baseline_metric, + })) + } + + fn metrics(&self) -> Option { + Some(self.metric.clone_inner()) + } + + fn statistics(&self) -> Statistics { + Statistics::default() + } +} + +impl DisplayAs for UnionDistinctOnExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!( + f, + "UnionDistinctOnExec: on col=[{:?}], ts_col=[{}]", + self.compare_keys, self.ts_col + ) + } + } + } +} + +// TODO(ruihang): some unused fields are for metrics, which will be implemented later. +#[allow(dead_code)] +pub struct UnionDistinctOnStream { + left: SendableRecordBatchStream, + right: HashedDataFut, + /// Include time index + compare_keys: Vec, + output_schema: SchemaRef, + metric: BaselineMetrics, +} + +impl UnionDistinctOnStream { + fn poll_impl(&mut self, cx: &mut Context<'_>) -> Poll::Item>> { + // resolve the right stream + let right = match self.right { + HashedDataFut::Pending(ref mut fut) => { + let right = ready!(fut.as_mut().poll(cx))?; + self.right = HashedDataFut::Ready(right); + let HashedDataFut::Ready(right_ref) = &mut self.right else { + unreachable!() + }; + right_ref + } + HashedDataFut::Ready(ref mut right) => right, + HashedDataFut::Empty => return Poll::Ready(None), + }; + + // poll left and probe with right + let next_left = ready!(self.left.poll_next_unpin(cx)); + match next_left { + Some(Ok(left)) => { + // observe left batch and return it + right.update_map(&left)?; + Poll::Ready(Some(Ok(left))) + } + Some(Err(e)) => Poll::Ready(Some(Err(e))), + None => { + // left stream is exhausted, so we can send the right part + let right = std::mem::replace(&mut self.right, HashedDataFut::Empty); + let HashedDataFut::Ready(data) = right else { + unreachable!() + }; + Poll::Ready(Some(data.finish())) + } + } + } +} + +impl RecordBatchStream for UnionDistinctOnStream { + fn schema(&self) -> SchemaRef { + self.output_schema.clone() + } +} + +impl Stream for UnionDistinctOnStream { + type Item = DataFusionResult; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.poll_impl(cx) + } +} + +/// Simple future state for [HashedData] +enum HashedDataFut { + /// The result is not ready + Pending(BoxFuture<'static, DataFusionResult>), + /// The result is ready + Ready(HashedData), + /// The result is taken + Empty, +} + +/// ALL input batches and its hash table +struct HashedData { + // TODO(ruihang): use `JoinHashMap` instead after upgrading to DF 34.0 + /// Hash table for all input batches. The key is hash value, and the value + /// is the index of `bathc`. + hash_map: HashMap, + /// Output batch. + batch: RecordBatch, + /// The indices of the columns to be hashed. + hash_key_indices: Vec, + random_state: RandomState, +} + +impl HashedData { + pub async fn new( + input: SendableRecordBatchStream, + random_state: RandomState, + hash_key_indices: Vec, + ) -> DataFusionResult { + // Collect all batches from the input stream + let initial = (Vec::new(), 0); + let (batches, _num_rows) = input + .try_fold(initial, |mut acc, batch| async { + // Update rowcount + acc.1 += batch.num_rows(); + // Push batch to output + acc.0.push(batch); + Ok(acc) + }) + .await?; + + // Create hash for each batch + let mut hash_map = HashMap::default(); + let mut hashes_buffer = Vec::new(); + let mut interleave_indices = Vec::new(); + for (batch_number, batch) in batches.iter().enumerate() { + hashes_buffer.resize(batch.num_rows(), 0); + // get columns for hashing + let arrays = hash_key_indices + .iter() + .map(|i| batch.column(*i).clone()) + .collect::>(); + + // compute hash + let hash_values = + hash_utils::create_hashes(&arrays, &random_state, &mut hashes_buffer)?; + for (row_number, hash_value) in hash_values.iter().enumerate() { + // Only keeps the first observed row for each hash value + if hash_map + .try_insert(*hash_value, interleave_indices.len()) + .is_ok() + { + interleave_indices.push((batch_number, row_number)); + } + } + } + + // Finilize the hash map + let batch = interleave_batches(batches, interleave_indices)?; + + Ok(Self { + hash_map, + batch, + hash_key_indices, + random_state, + }) + } + + /// Remove rows that hash value present in the input + /// record batch from the hash map. + pub fn update_map(&mut self, input: &RecordBatch) -> DataFusionResult<()> { + // get columns for hashing + let mut hashes_buffer = Vec::new(); + let arrays = self + .hash_key_indices + .iter() + .map(|i| input.column(*i).clone()) + .collect::>(); + + // compute hash + hashes_buffer.resize(input.num_rows(), 0); + let hash_values = + hash_utils::create_hashes(&arrays, &self.random_state, &mut hashes_buffer)?; + + // remove those hashes + for hash in hash_values { + self.hash_map.remove(hash); + } + + Ok(()) + } + + pub fn finish(self) -> DataFusionResult { + let valid_indices = self.hash_map.values().copied().collect::>(); + let result = take_batch(&self.batch, &valid_indices)?; + Ok(result) + } +} + +/// Utility function to interleave batches. Based on [interleave](datafusion::arrow::compute::interleave) +fn interleave_batches( + batches: Vec, + indices: Vec<(usize, usize)>, +) -> DataFusionResult { + let schema = batches[0].schema(); + + // transform batches into arrays + let mut arrays = vec![vec![]; schema.fields().len()]; + for batch in &batches { + for (i, array) in batch.columns().iter().enumerate() { + arrays[i].push(array.as_ref()); + } + } + + // interleave arrays + let mut interleaved_arrays = Vec::with_capacity(arrays.len()); + for array in arrays { + interleaved_arrays.push(compute::interleave(&array, &indices)?); + } + + // assemble new record batch + RecordBatch::try_new(schema.clone(), interleaved_arrays).map_err(DataFusionError::ArrowError) +} + +/// Utility function to take rows from a record batch. Based on [take](datafusion::arrow::compute::take) +fn take_batch(batch: &RecordBatch, indices: &[usize]) -> DataFusionResult { + // fast path + if batch.num_rows() == indices.len() { + return Ok(batch.clone()); + } + + let schema = batch.schema(); + + let indices_array = UInt64Array::from_iter(indices.iter().map(|i| *i as u64)); + let arrays = batch + .columns() + .iter() + .map(|array| compute::take(array, &indices_array, None)) + .collect::, _>>() + .map_err(DataFusionError::ArrowError)?; + + let result = RecordBatch::try_new(schema, arrays).map_err(DataFusionError::ArrowError)?; + Ok(result) +} + +#[cfg(test)] +mod test { + use datafusion::arrow::array::Int32Array; + use datafusion::arrow::datatypes::{DataType, Field, Schema}; + + use super::*; + + #[test] + fn test_interleave_batches() { + let schema = Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ]); + + let batch1 = RecordBatch::try_new( + Arc::new(schema.clone()), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(Int32Array::from(vec![4, 5, 6])), + ], + ) + .unwrap(); + + let batch2 = RecordBatch::try_new( + Arc::new(schema.clone()), + vec![ + Arc::new(Int32Array::from(vec![7, 8, 9])), + Arc::new(Int32Array::from(vec![10, 11, 12])), + ], + ) + .unwrap(); + + let batch3 = RecordBatch::try_new( + Arc::new(schema.clone()), + vec![ + Arc::new(Int32Array::from(vec![13, 14, 15])), + Arc::new(Int32Array::from(vec![16, 17, 18])), + ], + ) + .unwrap(); + + let batches = vec![batch1, batch2, batch3]; + let indices = vec![(0, 0), (1, 0), (2, 0), (0, 1), (1, 1), (2, 1)]; + let result = interleave_batches(batches, indices).unwrap(); + + let expected = RecordBatch::try_new( + Arc::new(schema), + vec![ + Arc::new(Int32Array::from(vec![1, 7, 13, 2, 8, 14])), + Arc::new(Int32Array::from(vec![4, 10, 16, 5, 11, 17])), + ], + ) + .unwrap(); + + assert_eq!(result, expected); + } + + #[test] + fn test_take_batch() { + let schema = Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ]); + + let batch = RecordBatch::try_new( + Arc::new(schema.clone()), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(Int32Array::from(vec![4, 5, 6])), + ], + ) + .unwrap(); + + let indices = vec![0, 2]; + let result = take_batch(&batch, &indices).unwrap(); + + let expected = RecordBatch::try_new( + Arc::new(schema), + vec![ + Arc::new(Int32Array::from(vec![1, 3])), + Arc::new(Int32Array::from(vec![4, 6])), + ], + ) + .unwrap(); + + assert_eq!(result, expected); + } +} diff --git a/src/promql/src/lib.rs b/src/promql/src/lib.rs index 9514a01538..127bf45d5f 100644 --- a/src/promql/src/lib.rs +++ b/src/promql/src/lib.rs @@ -14,6 +14,7 @@ #![feature(option_get_or_insert_default)] #![feature(let_chains)] +#![feature(map_try_insert)] pub mod error; pub mod extension_plan; diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs index 137035755b..7c8176d7b9 100644 --- a/src/promql/src/planner.rs +++ b/src/promql/src/planner.rs @@ -51,7 +51,7 @@ use crate::error::{ }; use crate::extension_plan::{ build_special_time_expr, EmptyMetric, HistogramFold, InstantManipulate, Millisecond, - RangeManipulate, SeriesDivide, SeriesNormalize, + RangeManipulate, SeriesDivide, SeriesNormalize, UnionDistinctOn, }; use crate::functions::{ AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, HoltWinters, IDelta, @@ -1489,6 +1489,7 @@ impl PromPlanner { .context(DataFusionPlanningSnafu) } + /// Build a set operator (AND/OR/UNLESS) fn set_op_on_non_field_columns( &self, left: LogicalPlan, @@ -1501,6 +1502,10 @@ impl PromPlanner { let mut left_tag_col_set = left_tag_cols.into_iter().collect::>(); let mut right_tag_col_set = right_tag_cols.into_iter().collect::>(); + if matches!(op.id(), token::T_LOR) { + return self.or_operator(left, right, left_tag_col_set, right_tag_col_set, modifier); + } + // apply modifier if let Some(modifier) = modifier { // one-to-many and many-to-one are not supported @@ -1545,7 +1550,8 @@ impl PromPlanner { ) }; let join_keys = left_tag_col_set - .into_iter() + .iter() + .cloned() .chain([self.ctx.time_index_column.clone().unwrap()]) .collect::>(); @@ -1579,17 +1585,122 @@ impl PromPlanner { .build() .context(DataFusionPlanningSnafu), token::T_LOR => { - // `OR` can not be expressed by `UNION` precisely. - // it will generate unexpceted result when schemas don't match - UnsupportedExprSnafu { - name: "set operation `OR`", - } - .fail() + self.or_operator(left, right, left_tag_col_set, right_tag_col_set, modifier) } _ => UnexpectedTokenSnafu { token: op }.fail(), } } + // TODO(ruihang): change function name + fn or_operator( + &self, + left: LogicalPlan, + right: LogicalPlan, + left_tag_cols_set: HashSet, + right_tag_cols_set: HashSet, + modifier: &Option, + ) -> Result { + // prepare hash sets + let all_tags = left_tag_cols_set + .union(&right_tag_cols_set) + .cloned() + .collect::>(); + let tags_not_in_left = all_tags + .difference(&left_tag_cols_set) + .cloned() + .collect::>(); + let tags_not_in_right = all_tags + .difference(&right_tag_cols_set) + .cloned() + .collect::>(); + let left_qualifier = left.schema().field(0).qualifier().cloned(); + let right_qualifier = right.schema().field(0).qualifier().cloned(); + let left_qualifier_string = left_qualifier + .as_ref() + .map(|l| l.to_string()) + .unwrap_or_default(); + let right_qualifier_string = right_qualifier + .as_ref() + .map(|r| r.to_string()) + .unwrap_or_default(); + + // step 0: fill all columns in output schema + let all_columns_set = left + .schema() + .fields() + .iter() + .chain(right.schema().fields().iter()) + .map(|field| field.name().clone()) + .collect::>(); + let mut all_columns = all_columns_set.into_iter().collect::>(); + // sort to ensure the generated schema is not volatile + all_columns.sort_unstable(); + + // step 1: align schema using project, fill non-exist columns with null + let left_proj_exprs = all_columns.iter().map(|col| { + if tags_not_in_left.contains(col) { + DfExpr::Literal(ScalarValue::Utf8(None)).alias(col.to_string()) + } else { + DfExpr::Column(Column::new(left_qualifier.clone(), col)) + } + }); + let right_proj_exprs = all_columns.iter().map(|col| { + if tags_not_in_right.contains(col) { + DfExpr::Literal(ScalarValue::Utf8(None)).alias(col.to_string()) + } else { + DfExpr::Column(Column::new(right_qualifier.clone(), col)) + } + }); + + let left_projected = LogicalPlanBuilder::from(left) + .project(left_proj_exprs) + .context(DataFusionPlanningSnafu)? + .alias(left_qualifier_string.clone()) + .context(DataFusionPlanningSnafu)? + .build() + .context(DataFusionPlanningSnafu)?; + let right_projected = LogicalPlanBuilder::from(right) + .project(right_proj_exprs) + .context(DataFusionPlanningSnafu)? + .alias(right_qualifier_string.clone()) + .context(DataFusionPlanningSnafu)? + .build() + .context(DataFusionPlanningSnafu)?; + + // step 2: compute match columns + let mut match_columns = if let Some(modifier) = modifier + && let Some(matching) = &modifier.matching + { + match matching { + // keeps columns mentioned in `on` + LabelModifier::Include(on) => on.labels.clone(), + // removes columns memtioned in `ignoring` + LabelModifier::Exclude(ignoring) => { + let ignoring = ignoring.labels.iter().cloned().collect::>(); + all_tags.difference(&ignoring).cloned().collect() + } + } + } else { + all_tags.iter().cloned().collect() + }; + // sort to ensure the generated plan is not volatile + match_columns.sort_unstable(); + // step 3: build `UnionDistinctOn` plan + let schema = left_projected.schema().clone(); + let union_distinct_on = UnionDistinctOn::new( + left_projected, + right_projected, + match_columns, + self.ctx.time_index_column.clone().unwrap(), + schema, + ); + let result = LogicalPlan::Extension(Extension { + node: Arc::new(union_distinct_on), + }); + + Ok(result) + } + /// Build a projection that project and perform operation expr for every value columns. /// Non-value columns (tag and timestamp) will be preserved in the projection. /// diff --git a/tests/cases/standalone/common/promql/set_operation.result b/tests/cases/standalone/common/promql/set_operation.result index d14b6fe88b..15a7a865a3 100644 --- a/tests/cases/standalone/common/promql/set_operation.result +++ b/tests/cases/standalone/common/promql/set_operation.result @@ -130,10 +130,21 @@ tql eval (3000, 3000, '1s') (http_requests{g="canary"} + 1) and ignoring(g, job) -- http_requests{group="production", instance="0", job="app-server"} 500 -- http_requests{group="production", instance="1", job="api-server"} 200 -- http_requests{group="production", instance="1", job="app-server"} 600 --- NOT SUPPORTED: `or` +-- SQLNESS SORT_RESULT 3 1 tql eval (3000, 3000, '1s') http_requests{g="canary"} or http_requests{g="production"}; -Error: 1004(InvalidArguments), Unsupported expr type: set operation `OR` ++------------+----------+-----+---------------------+-------+ +| g | instance | job | ts | val | ++------------+----------+-----+---------------------+-------+ +| canary | 0 | api | 1970-01-01T00:50:00 | 300.0 | +| canary | 0 | app | 1970-01-01T00:50:00 | 700.0 | +| canary | 1 | api | 1970-01-01T00:50:00 | 400.0 | +| canary | 1 | app | 1970-01-01T00:50:00 | 800.0 | +| production | 0 | api | 1970-01-01T00:50:00 | 100.0 | +| production | 0 | app | 1970-01-01T00:50:00 | 500.0 | +| production | 1 | api | 1970-01-01T00:50:00 | 200.0 | +| production | 1 | app | 1970-01-01T00:50:00 | 600.0 | ++------------+----------+-----+---------------------+-------+ -- # On overlap the rhs samples must be dropped. -- eval instant at 50m (http_requests{group="canary"} + 1) or http_requests{instance="1"} @@ -143,10 +154,10 @@ Error: 1004(InvalidArguments), Unsupported expr type: set operation `OR` -- {group="canary", instance="1", job="app-server"} 801 -- http_requests{group="production", instance="1", job="api-server"} 200 -- http_requests{group="production", instance="1", job="app-server"} 600 --- NOT SUPPORTED: `or` +-- SQLNESS SORT_RESULT 3 1 tql eval (3000, 3000, '1s') (http_requests{g="canary"} + 1) or http_requests{instance="1"}; -Error: 1004(InvalidArguments), Unsupported expr type: set operation `OR` +Error: 1004(InvalidArguments), Internal error during building DataFusion plan: No field named http_requests.val. Valid fields are http_requests.job, http_requests.instance, http_requests.g, http_requests.ts, "val + Float64(1)". -- # Matching only on instance excludes everything that has instance=0/1 but includes -- # entries without the instance label. @@ -161,7 +172,7 @@ Error: 1004(InvalidArguments), Unsupported expr type: set operation `OR` -- NOT SUPPORTED: `or` tql eval (3000, 3000, '1s') (http_requests{g="canary"} + 1) or on(instance) (http_requests or cpu_count or vector_matching_a); -Error: 1004(InvalidArguments), Unsupported expr type: set operation `OR` +Error: 1004(InvalidArguments), Internal error during building DataFusion plan: No field named cpu_count.val. Valid fields are cpu_count.ts. -- eval instant at 50m (http_requests{group="canary"} + 1) or ignoring(l, group, job) (http_requests or cpu_count or vector_matching_a) -- {group="canary", instance="0", job="api-server"} 301 @@ -174,7 +185,7 @@ Error: 1004(InvalidArguments), Unsupported expr type: set operation `OR` -- NOT SUPPORTED: `or` tql eval (3000, 3000, '1s') (http_requests{g="canary"} + 1) or ignoring(l, g, job) (http_requests or cpu_count or vector_matching_a); -Error: 1004(InvalidArguments), Unsupported expr type: set operation `OR` +Error: 1004(InvalidArguments), Internal error during building DataFusion plan: No field named cpu_count.val. Valid fields are cpu_count.ts. -- eval instant at 50m http_requests{group="canary"} unless http_requests{instance="0"} -- http_requests{group="canary", instance="1", job="api-server"} 400 @@ -268,3 +279,128 @@ drop table vector_matching_a; Affected Rows: 0 +-- the following cases are not from Prometheus. +create table t1 (ts timestamp time index, job string primary key, val double); + +Affected Rows: 0 + +insert into t1 values (0, "a", 1.0), (500000, "b", 2.0), (1000000, "a", 3.0), (1500000, "c", 4.0); + +Affected Rows: 4 + +create table t2 (ts timestamp time index, val double); + +Affected Rows: 0 + +insert into t2 values (0, 0), (300000, 0), (600000, 0), (900000, 0), (1200000, 0), (1500000, 0), (1800000, 0); + +Affected Rows: 7 + +-- SQLNESS SORT_RESULT 3 1 +tql eval (0, 2000, '400') t1 or t2; + ++-----+---------------------+-----+ +| job | ts | val | ++-----+---------------------+-----+ +| | 1970-01-01T00:00:00 | 0.0 | +| | 1970-01-01T00:06:40 | 0.0 | +| | 1970-01-01T00:13:20 | 0.0 | +| | 1970-01-01T00:20:00 | 0.0 | +| | 1970-01-01T00:26:40 | 0.0 | +| | 1970-01-01T00:33:20 | 0.0 | +| a | 1970-01-01T00:00:00 | 1.0 | +| a | 1970-01-01T00:20:00 | 3.0 | +| b | 1970-01-01T00:13:20 | 2.0 | +| c | 1970-01-01T00:26:40 | 4.0 | ++-----+---------------------+-----+ + +-- SQLNESS SORT_RESULT 3 1 +tql eval (0, 2000, '400') t1 or on () t2; + ++-----+---------------------+-----+ +| job | ts | val | ++-----+---------------------+-----+ +| | 1970-01-01T00:06:40 | 0.0 | +| | 1970-01-01T00:33:20 | 0.0 | +| a | 1970-01-01T00:00:00 | 1.0 | +| a | 1970-01-01T00:20:00 | 3.0 | +| b | 1970-01-01T00:13:20 | 2.0 | +| c | 1970-01-01T00:26:40 | 4.0 | ++-----+---------------------+-----+ + +-- SQLNESS SORT_RESULT 3 1 +tql eval (0, 2000, '400') t1 or on (job) t2; + ++-----+---------------------+-----+ +| job | ts | val | ++-----+---------------------+-----+ +| | 1970-01-01T00:00:00 | 0.0 | +| | 1970-01-01T00:06:40 | 0.0 | +| | 1970-01-01T00:13:20 | 0.0 | +| | 1970-01-01T00:20:00 | 0.0 | +| | 1970-01-01T00:26:40 | 0.0 | +| | 1970-01-01T00:33:20 | 0.0 | +| a | 1970-01-01T00:00:00 | 1.0 | +| a | 1970-01-01T00:20:00 | 3.0 | +| b | 1970-01-01T00:13:20 | 2.0 | +| c | 1970-01-01T00:26:40 | 4.0 | ++-----+---------------------+-----+ + +-- SQLNESS SORT_RESULT 3 1 +tql eval (0, 2000, '400') t2 or t1; + ++-----+---------------------+-----+ +| job | ts | val | ++-----+---------------------+-----+ +| | 1970-01-01T00:00:00 | 0.0 | +| | 1970-01-01T00:06:40 | 0.0 | +| | 1970-01-01T00:13:20 | 0.0 | +| | 1970-01-01T00:20:00 | 0.0 | +| | 1970-01-01T00:26:40 | 0.0 | +| | 1970-01-01T00:33:20 | 0.0 | +| a | 1970-01-01T00:00:00 | 1.0 | +| a | 1970-01-01T00:20:00 | 3.0 | +| b | 1970-01-01T00:13:20 | 2.0 | +| c | 1970-01-01T00:26:40 | 4.0 | ++-----+---------------------+-----+ + +-- SQLNESS SORT_RESULT 3 1 +tql eval (0, 2000, '400') t2 or on () t1; + ++-----+---------------------+-----+ +| job | ts | val | ++-----+---------------------+-----+ +| | 1970-01-01T00:00:00 | 0.0 | +| | 1970-01-01T00:06:40 | 0.0 | +| | 1970-01-01T00:13:20 | 0.0 | +| | 1970-01-01T00:20:00 | 0.0 | +| | 1970-01-01T00:26:40 | 0.0 | +| | 1970-01-01T00:33:20 | 0.0 | ++-----+---------------------+-----+ + +-- SQLNESS SORT_RESULT 3 1 +tql eval (0, 2000, '400') t2 or on(job) t1; + ++-----+---------------------+-----+ +| job | ts | val | ++-----+---------------------+-----+ +| | 1970-01-01T00:00:00 | 0.0 | +| | 1970-01-01T00:06:40 | 0.0 | +| | 1970-01-01T00:13:20 | 0.0 | +| | 1970-01-01T00:20:00 | 0.0 | +| | 1970-01-01T00:26:40 | 0.0 | +| | 1970-01-01T00:33:20 | 0.0 | +| a | 1970-01-01T00:00:00 | 1.0 | +| a | 1970-01-01T00:20:00 | 3.0 | +| b | 1970-01-01T00:13:20 | 2.0 | +| c | 1970-01-01T00:26:40 | 4.0 | ++-----+---------------------+-----+ + +drop table t1; + +Affected Rows: 0 + +drop table t2; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/promql/set_operation.sql b/tests/cases/standalone/common/promql/set_operation.sql index e91460df34..6a71711bd8 100644 --- a/tests/cases/standalone/common/promql/set_operation.sql +++ b/tests/cases/standalone/common/promql/set_operation.sql @@ -79,7 +79,7 @@ tql eval (3000, 3000, '1s') (http_requests{g="canary"} + 1) and ignoring(g, job) -- http_requests{group="production", instance="0", job="app-server"} 500 -- http_requests{group="production", instance="1", job="api-server"} 200 -- http_requests{group="production", instance="1", job="app-server"} 600 --- NOT SUPPORTED: `or` +-- SQLNESS SORT_RESULT 3 1 tql eval (3000, 3000, '1s') http_requests{g="canary"} or http_requests{g="production"}; -- # On overlap the rhs samples must be dropped. @@ -90,7 +90,7 @@ tql eval (3000, 3000, '1s') http_requests{g="canary"} or http_requests{g="produc -- {group="canary", instance="1", job="app-server"} 801 -- http_requests{group="production", instance="1", job="api-server"} 200 -- http_requests{group="production", instance="1", job="app-server"} 600 --- NOT SUPPORTED: `or` +-- SQLNESS SORT_RESULT 3 1 tql eval (3000, 3000, '1s') (http_requests{g="canary"} + 1) or http_requests{instance="1"}; @@ -173,3 +173,35 @@ drop table http_requests; drop table cpu_count; drop table vector_matching_a; + +-- the following cases are not from Prometheus. + +create table t1 (ts timestamp time index, job string primary key, val double); + +insert into t1 values (0, "a", 1.0), (500000, "b", 2.0), (1000000, "a", 3.0), (1500000, "c", 4.0); + +create table t2 (ts timestamp time index, val double); + +insert into t2 values (0, 0), (300000, 0), (600000, 0), (900000, 0), (1200000, 0), (1500000, 0), (1800000, 0); + +-- SQLNESS SORT_RESULT 3 1 +tql eval (0, 2000, '400') t1 or t2; + +-- SQLNESS SORT_RESULT 3 1 +tql eval (0, 2000, '400') t1 or on () t2; + +-- SQLNESS SORT_RESULT 3 1 +tql eval (0, 2000, '400') t1 or on (job) t2; + +-- SQLNESS SORT_RESULT 3 1 +tql eval (0, 2000, '400') t2 or t1; + +-- SQLNESS SORT_RESULT 3 1 +tql eval (0, 2000, '400') t2 or on () t1; + +-- SQLNESS SORT_RESULT 3 1 +tql eval (0, 2000, '400') t2 or on(job) t1; + +drop table t1; + +drop table t2;