feat: Implement OR for PromQL (#3024)

* with anit-join

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* impl UnionDistinctOn

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* unify schema

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add sqlness case

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add UTs

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Update src/promql/src/planner.rs

Co-authored-by: dennis zhuang <killme2008@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: dennis zhuang <killme2008@gmail.com>
This commit is contained in:
Ruihang Xia
2023-12-28 14:56:17 +08:00
committed by GitHub
parent 1d80a0f2d6
commit b58296de22
11 changed files with 951 additions and 64 deletions

1
Cargo.lock generated
View File

@@ -6541,6 +6541,7 @@ dependencies = [
name = "promql"
version = "0.5.0"
dependencies = [
"ahash 0.8.6",
"async-recursion",
"async-trait",
"bytemuck",

View File

@@ -5,6 +5,7 @@ edition.workspace = true
license.workspace = true
[dependencies]
ahash.workspace = true
async-recursion = "1.0"
async-trait.workspace = true
bytemuck.workspace = true

View File

@@ -19,6 +19,9 @@ mod normalize;
mod planner;
mod range_manipulate;
mod series_divide;
#[cfg(test)]
mod test_util;
mod union_distinct_on;
use datafusion::arrow::datatypes::{ArrowPrimitiveType, TimestampMillisecondType};
pub use empty_metric::{build_special_time_expr, EmptyMetric, EmptyMetricExec, EmptyMetricStream};
@@ -28,5 +31,6 @@ pub use normalize::{SeriesNormalize, SeriesNormalizeExec, SeriesNormalizeStream}
pub use planner::PromExtensionPlanner;
pub use range_manipulate::{RangeManipulate, RangeManipulateExec, RangeManipulateStream};
pub use series_divide::{SeriesDivide, SeriesDivideExec, SeriesDivideStream};
pub use union_distinct_on::{UnionDistinctOn, UnionDistinctOnExec, UnionDistinctOnStream};
pub(crate) type Millisecond = <TimestampMillisecondType as ArrowPrimitiveType>::Native;

View File

@@ -445,40 +445,12 @@ impl InstantManipulateStream {
#[cfg(test)]
mod test {
use datafusion::arrow::array::Float64Array;
use datafusion::arrow::datatypes::{
ArrowPrimitiveType, DataType, Field, Schema, TimestampMillisecondType,
};
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::prelude::SessionContext;
use datatypes::arrow::array::TimestampMillisecondArray;
use datatypes::arrow_array::StringArray;
use super::*;
const TIME_INDEX_COLUMN: &str = "timestamp";
fn prepare_test_data() -> MemoryExec {
let schema = Arc::new(Schema::new(vec![
Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
Field::new("value", DataType::Float64, true),
Field::new("path", DataType::Utf8, true),
]));
let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![
0, 30_000, 60_000, 90_000, 120_000, // every 30s
180_000, 240_000, // every 60s
241_000, 271_000, 291_000, // others
])) as _;
let field_column = Arc::new(Float64Array::from(vec![1.0; 10])) as _;
let path_column = Arc::new(StringArray::from(vec!["foo"; 10])) as _;
let data = RecordBatch::try_new(
schema.clone(),
vec![timestamp_column, field_column, path_column],
)
.unwrap();
MemoryExec::try_new(&[vec![data]], schema, None).unwrap()
}
use crate::extension_plan::test_util::{
prepare_test_data, prepare_test_data_with_nan, TIME_INDEX_COLUMN,
};
async fn do_normalize_test(
start: Millisecond,
@@ -749,22 +721,6 @@ mod test {
do_normalize_test(190_000, 300_000, 30_000, 10_000, expected, false).await;
}
fn prepare_test_data_with_nan() -> MemoryExec {
let schema = Arc::new(Schema::new(vec![
Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
Field::new("value", DataType::Float64, true),
]));
let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![
0, 30_000, 60_000, 90_000, 120_000, // every 30s
])) as _;
let field_column =
Arc::new(Float64Array::from(vec![0.0, f64::NAN, 6.0, f64::NAN, 12.0])) as _;
let data =
RecordBatch::try_new(schema.clone(), vec![timestamp_column, field_column]).unwrap();
MemoryExec::try_new(&[vec![data]], schema, None).unwrap()
}
#[tokio::test]
async fn lookback_10s_interval_10s_with_nan() {
let expected = String::from(

View File

@@ -21,7 +21,7 @@ use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNode};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
use super::HistogramFold;
use super::{HistogramFold, UnionDistinctOn};
use crate::extension_plan::{
EmptyMetric, InstantManipulate, RangeManipulate, SeriesDivide, SeriesNormalize,
};
@@ -50,6 +50,11 @@ impl ExtensionPlanner for PromExtensionPlanner {
Ok(Some(node.to_execution_plan(session_state, planner)?))
} else if let Some(node) = node.as_any().downcast_ref::<HistogramFold>() {
Ok(Some(node.to_execution_plan(physical_inputs[0].clone())))
} else if let Some(node) = node.as_any().downcast_ref::<UnionDistinctOn>() {
Ok(Some(node.to_execution_plan(
physical_inputs[0].clone(),
physical_inputs[1].clone(),
)))
} else {
Ok(None)
}

View File

@@ -0,0 +1,64 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Utils for testing extension plan
use std::sync::Arc;
use common_recordbatch::DfRecordBatch as RecordBatch;
use datafusion::arrow::array::Float64Array;
use datafusion::arrow::datatypes::{
ArrowPrimitiveType, DataType, Field, Schema, TimestampMillisecondType,
};
use datafusion::physical_plan::memory::MemoryExec;
use datatypes::arrow::array::TimestampMillisecondArray;
use datatypes::arrow_array::StringArray;
pub(crate) const TIME_INDEX_COLUMN: &str = "timestamp";
pub(crate) fn prepare_test_data() -> MemoryExec {
let schema = Arc::new(Schema::new(vec![
Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
Field::new("value", DataType::Float64, true),
Field::new("path", DataType::Utf8, true),
]));
let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![
0, 30_000, 60_000, 90_000, 120_000, // every 30s
180_000, 240_000, // every 60s
241_000, 271_000, 291_000, // others
])) as _;
let field_column = Arc::new(Float64Array::from(vec![1.0; 10])) as _;
let path_column = Arc::new(StringArray::from(vec!["foo"; 10])) as _;
let data = RecordBatch::try_new(
schema.clone(),
vec![timestamp_column, field_column, path_column],
)
.unwrap();
MemoryExec::try_new(&[vec![data]], schema, None).unwrap()
}
pub(crate) fn prepare_test_data_with_nan() -> MemoryExec {
let schema = Arc::new(Schema::new(vec![
Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
Field::new("value", DataType::Float64, true),
]));
let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![
0, 30_000, 60_000, 90_000, 120_000, // every 30s
])) as _;
let field_column = Arc::new(Float64Array::from(vec![0.0, f64::NAN, 6.0, f64::NAN, 12.0])) as _;
let data = RecordBatch::try_new(schema.clone(), vec![timestamp_column, field_column]).unwrap();
MemoryExec::try_new(&[vec![data]], schema, None).unwrap()
}

View File

@@ -0,0 +1,576 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use ahash::{HashMap, RandomState};
use datafusion::arrow::array::UInt64Array;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::DFSchemaRef;
use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
hash_utils, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
RecordBatchStream, SendableRecordBatchStream, Statistics,
};
use datatypes::arrow::compute;
use futures::future::BoxFuture;
use futures::{ready, Stream, StreamExt, TryStreamExt};
/// A special kind of `UNION`(`OR` in PromQL) operator, for PromQL specific use case.
///
/// This operator is similar to `UNION` from SQL, but it only accepts two inputs. The
/// most different part is that it treat left child and right child differently:
/// - All columns from left child will be outputted.
/// - Only check collisions (when not distinct) on the columns specified by `compare_keys`.
/// - When there is a collision:
/// - If the collision is from right child itself, only the first observed row will be
/// preserved. All others are discarded.
/// - If the collision is from left child, the row in right child will be discarded.
/// - The output order is not maintained. This plan will output left child first, then right child.
/// - The output schema contains all columns from left or right child plans.
///
/// From the implementation perspective, this operator is similar to `HashJoin`, but the
/// probe side is the right child, and the build side is the left child. Another difference
/// is that the probe is opting-out.
///
/// This plan will exhaust the right child first to build probe hash table, then streaming
/// on left side, and use the left side to "mask" the hash table.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct UnionDistinctOn {
left: LogicalPlan,
right: LogicalPlan,
/// The columns to compare for equality.
/// TIME INDEX is included.
compare_keys: Vec<String>,
ts_col: String,
output_schema: DFSchemaRef,
}
impl UnionDistinctOn {
pub fn name() -> &'static str {
"UnionDistinctOn"
}
pub fn new(
left: LogicalPlan,
right: LogicalPlan,
compare_keys: Vec<String>,
ts_col: String,
output_schema: DFSchemaRef,
) -> Self {
Self {
left,
right,
compare_keys,
ts_col,
output_schema,
}
}
pub fn to_execution_plan(
&self,
left_exec: Arc<dyn ExecutionPlan>,
right_exec: Arc<dyn ExecutionPlan>,
) -> Arc<dyn ExecutionPlan> {
Arc::new(UnionDistinctOnExec {
left: left_exec,
right: right_exec,
compare_keys: self.compare_keys.clone(),
ts_col: self.ts_col.clone(),
output_schema: Arc::new(self.output_schema.as_ref().into()),
metric: ExecutionPlanMetricsSet::new(),
random_state: RandomState::new(),
})
}
}
impl UserDefinedLogicalNodeCore for UnionDistinctOn {
fn name(&self) -> &str {
Self::name()
}
fn inputs(&self) -> Vec<&LogicalPlan> {
vec![&self.left, &self.right]
}
fn schema(&self) -> &DFSchemaRef {
&self.output_schema
}
fn expressions(&self) -> Vec<Expr> {
vec![]
}
fn fmt_for_explain(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"UnionDistinctOn: on col=[{:?}], ts_col=[{}]",
self.compare_keys, self.ts_col
)
}
fn from_template(&self, _exprs: &[Expr], inputs: &[LogicalPlan]) -> Self {
assert_eq!(inputs.len(), 2);
let left = inputs[0].clone();
let right = inputs[1].clone();
Self {
left,
right,
compare_keys: self.compare_keys.clone(),
ts_col: self.ts_col.clone(),
output_schema: self.output_schema.clone(),
}
}
}
#[derive(Debug)]
pub struct UnionDistinctOnExec {
left: Arc<dyn ExecutionPlan>,
right: Arc<dyn ExecutionPlan>,
compare_keys: Vec<String>,
ts_col: String,
output_schema: SchemaRef,
metric: ExecutionPlanMetricsSet,
/// Shared the `RandomState` for the hashing algorithm
random_state: RandomState,
}
impl ExecutionPlan for UnionDistinctOnExec {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
self.output_schema.clone()
}
fn required_input_distribution(&self) -> Vec<Distribution> {
vec![Distribution::SinglePartition, Distribution::SinglePartition]
}
fn output_partitioning(&self) -> Partitioning {
Partitioning::UnknownPartitioning(1)
}
/// [UnionDistinctOnExec] will output left first, then right.
/// So the order of the output is not maintained.
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
None
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![self.left.clone(), self.right.clone()]
}
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
assert_eq!(children.len(), 2);
let left = children[0].clone();
let right = children[1].clone();
Ok(Arc::new(UnionDistinctOnExec {
left,
right,
compare_keys: self.compare_keys.clone(),
ts_col: self.ts_col.clone(),
output_schema: self.output_schema.clone(),
metric: self.metric.clone(),
random_state: self.random_state.clone(),
}))
}
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> DataFusionResult<SendableRecordBatchStream> {
let left_stream = self.left.execute(partition, context.clone())?;
let right_stream = self.right.execute(partition, context.clone())?;
// Convert column name to column index. Add one for the time column.
let mut key_indices = Vec::with_capacity(self.compare_keys.len() + 1);
for key in &self.compare_keys {
let index = self
.output_schema
.column_with_name(key)
.map(|(i, _)| i)
.ok_or_else(|| DataFusionError::Internal(format!("Column {} not found", key)))?;
key_indices.push(index);
}
let ts_index = self
.output_schema
.column_with_name(&self.ts_col)
.map(|(i, _)| i)
.ok_or_else(|| {
DataFusionError::Internal(format!("Column {} not found", self.ts_col))
})?;
key_indices.push(ts_index);
// Build right hash table future.
let hashed_data_future = HashedDataFut::Pending(Box::pin(HashedData::new(
right_stream,
self.random_state.clone(),
key_indices.clone(),
)));
let baseline_metric = BaselineMetrics::new(&self.metric, partition);
Ok(Box::pin(UnionDistinctOnStream {
left: left_stream,
right: hashed_data_future,
compare_keys: key_indices,
output_schema: self.output_schema.clone(),
metric: baseline_metric,
}))
}
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metric.clone_inner())
}
fn statistics(&self) -> Statistics {
Statistics::default()
}
}
impl DisplayAs for UnionDistinctOnExec {
fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(
f,
"UnionDistinctOnExec: on col=[{:?}], ts_col=[{}]",
self.compare_keys, self.ts_col
)
}
}
}
}
// TODO(ruihang): some unused fields are for metrics, which will be implemented later.
#[allow(dead_code)]
pub struct UnionDistinctOnStream {
left: SendableRecordBatchStream,
right: HashedDataFut,
/// Include time index
compare_keys: Vec<usize>,
output_schema: SchemaRef,
metric: BaselineMetrics,
}
impl UnionDistinctOnStream {
fn poll_impl(&mut self, cx: &mut Context<'_>) -> Poll<Option<<Self as Stream>::Item>> {
// resolve the right stream
let right = match self.right {
HashedDataFut::Pending(ref mut fut) => {
let right = ready!(fut.as_mut().poll(cx))?;
self.right = HashedDataFut::Ready(right);
let HashedDataFut::Ready(right_ref) = &mut self.right else {
unreachable!()
};
right_ref
}
HashedDataFut::Ready(ref mut right) => right,
HashedDataFut::Empty => return Poll::Ready(None),
};
// poll left and probe with right
let next_left = ready!(self.left.poll_next_unpin(cx));
match next_left {
Some(Ok(left)) => {
// observe left batch and return it
right.update_map(&left)?;
Poll::Ready(Some(Ok(left)))
}
Some(Err(e)) => Poll::Ready(Some(Err(e))),
None => {
// left stream is exhausted, so we can send the right part
let right = std::mem::replace(&mut self.right, HashedDataFut::Empty);
let HashedDataFut::Ready(data) = right else {
unreachable!()
};
Poll::Ready(Some(data.finish()))
}
}
}
}
impl RecordBatchStream for UnionDistinctOnStream {
fn schema(&self) -> SchemaRef {
self.output_schema.clone()
}
}
impl Stream for UnionDistinctOnStream {
type Item = DataFusionResult<RecordBatch>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.poll_impl(cx)
}
}
/// Simple future state for [HashedData]
enum HashedDataFut {
/// The result is not ready
Pending(BoxFuture<'static, DataFusionResult<HashedData>>),
/// The result is ready
Ready(HashedData),
/// The result is taken
Empty,
}
/// ALL input batches and its hash table
struct HashedData {
// TODO(ruihang): use `JoinHashMap` instead after upgrading to DF 34.0
/// Hash table for all input batches. The key is hash value, and the value
/// is the index of `bathc`.
hash_map: HashMap<u64, usize>,
/// Output batch.
batch: RecordBatch,
/// The indices of the columns to be hashed.
hash_key_indices: Vec<usize>,
random_state: RandomState,
}
impl HashedData {
pub async fn new(
input: SendableRecordBatchStream,
random_state: RandomState,
hash_key_indices: Vec<usize>,
) -> DataFusionResult<Self> {
// Collect all batches from the input stream
let initial = (Vec::new(), 0);
let (batches, _num_rows) = input
.try_fold(initial, |mut acc, batch| async {
// Update rowcount
acc.1 += batch.num_rows();
// Push batch to output
acc.0.push(batch);
Ok(acc)
})
.await?;
// Create hash for each batch
let mut hash_map = HashMap::default();
let mut hashes_buffer = Vec::new();
let mut interleave_indices = Vec::new();
for (batch_number, batch) in batches.iter().enumerate() {
hashes_buffer.resize(batch.num_rows(), 0);
// get columns for hashing
let arrays = hash_key_indices
.iter()
.map(|i| batch.column(*i).clone())
.collect::<Vec<_>>();
// compute hash
let hash_values =
hash_utils::create_hashes(&arrays, &random_state, &mut hashes_buffer)?;
for (row_number, hash_value) in hash_values.iter().enumerate() {
// Only keeps the first observed row for each hash value
if hash_map
.try_insert(*hash_value, interleave_indices.len())
.is_ok()
{
interleave_indices.push((batch_number, row_number));
}
}
}
// Finilize the hash map
let batch = interleave_batches(batches, interleave_indices)?;
Ok(Self {
hash_map,
batch,
hash_key_indices,
random_state,
})
}
/// Remove rows that hash value present in the input
/// record batch from the hash map.
pub fn update_map(&mut self, input: &RecordBatch) -> DataFusionResult<()> {
// get columns for hashing
let mut hashes_buffer = Vec::new();
let arrays = self
.hash_key_indices
.iter()
.map(|i| input.column(*i).clone())
.collect::<Vec<_>>();
// compute hash
hashes_buffer.resize(input.num_rows(), 0);
let hash_values =
hash_utils::create_hashes(&arrays, &self.random_state, &mut hashes_buffer)?;
// remove those hashes
for hash in hash_values {
self.hash_map.remove(hash);
}
Ok(())
}
pub fn finish(self) -> DataFusionResult<RecordBatch> {
let valid_indices = self.hash_map.values().copied().collect::<Vec<_>>();
let result = take_batch(&self.batch, &valid_indices)?;
Ok(result)
}
}
/// Utility function to interleave batches. Based on [interleave](datafusion::arrow::compute::interleave)
fn interleave_batches(
batches: Vec<RecordBatch>,
indices: Vec<(usize, usize)>,
) -> DataFusionResult<RecordBatch> {
let schema = batches[0].schema();
// transform batches into arrays
let mut arrays = vec![vec![]; schema.fields().len()];
for batch in &batches {
for (i, array) in batch.columns().iter().enumerate() {
arrays[i].push(array.as_ref());
}
}
// interleave arrays
let mut interleaved_arrays = Vec::with_capacity(arrays.len());
for array in arrays {
interleaved_arrays.push(compute::interleave(&array, &indices)?);
}
// assemble new record batch
RecordBatch::try_new(schema.clone(), interleaved_arrays).map_err(DataFusionError::ArrowError)
}
/// Utility function to take rows from a record batch. Based on [take](datafusion::arrow::compute::take)
fn take_batch(batch: &RecordBatch, indices: &[usize]) -> DataFusionResult<RecordBatch> {
// fast path
if batch.num_rows() == indices.len() {
return Ok(batch.clone());
}
let schema = batch.schema();
let indices_array = UInt64Array::from_iter(indices.iter().map(|i| *i as u64));
let arrays = batch
.columns()
.iter()
.map(|array| compute::take(array, &indices_array, None))
.collect::<std::result::Result<Vec<_>, _>>()
.map_err(DataFusionError::ArrowError)?;
let result = RecordBatch::try_new(schema, arrays).map_err(DataFusionError::ArrowError)?;
Ok(result)
}
#[cfg(test)]
mod test {
use datafusion::arrow::array::Int32Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use super::*;
#[test]
fn test_interleave_batches() {
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
]);
let batch1 = RecordBatch::try_new(
Arc::new(schema.clone()),
vec![
Arc::new(Int32Array::from(vec![1, 2, 3])),
Arc::new(Int32Array::from(vec![4, 5, 6])),
],
)
.unwrap();
let batch2 = RecordBatch::try_new(
Arc::new(schema.clone()),
vec![
Arc::new(Int32Array::from(vec![7, 8, 9])),
Arc::new(Int32Array::from(vec![10, 11, 12])),
],
)
.unwrap();
let batch3 = RecordBatch::try_new(
Arc::new(schema.clone()),
vec![
Arc::new(Int32Array::from(vec![13, 14, 15])),
Arc::new(Int32Array::from(vec![16, 17, 18])),
],
)
.unwrap();
let batches = vec![batch1, batch2, batch3];
let indices = vec![(0, 0), (1, 0), (2, 0), (0, 1), (1, 1), (2, 1)];
let result = interleave_batches(batches, indices).unwrap();
let expected = RecordBatch::try_new(
Arc::new(schema),
vec![
Arc::new(Int32Array::from(vec![1, 7, 13, 2, 8, 14])),
Arc::new(Int32Array::from(vec![4, 10, 16, 5, 11, 17])),
],
)
.unwrap();
assert_eq!(result, expected);
}
#[test]
fn test_take_batch() {
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
]);
let batch = RecordBatch::try_new(
Arc::new(schema.clone()),
vec![
Arc::new(Int32Array::from(vec![1, 2, 3])),
Arc::new(Int32Array::from(vec![4, 5, 6])),
],
)
.unwrap();
let indices = vec![0, 2];
let result = take_batch(&batch, &indices).unwrap();
let expected = RecordBatch::try_new(
Arc::new(schema),
vec![
Arc::new(Int32Array::from(vec![1, 3])),
Arc::new(Int32Array::from(vec![4, 6])),
],
)
.unwrap();
assert_eq!(result, expected);
}
}

View File

@@ -14,6 +14,7 @@
#![feature(option_get_or_insert_default)]
#![feature(let_chains)]
#![feature(map_try_insert)]
pub mod error;
pub mod extension_plan;

View File

@@ -51,7 +51,7 @@ use crate::error::{
};
use crate::extension_plan::{
build_special_time_expr, EmptyMetric, HistogramFold, InstantManipulate, Millisecond,
RangeManipulate, SeriesDivide, SeriesNormalize,
RangeManipulate, SeriesDivide, SeriesNormalize, UnionDistinctOn,
};
use crate::functions::{
AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, HoltWinters, IDelta,
@@ -1489,6 +1489,7 @@ impl PromPlanner {
.context(DataFusionPlanningSnafu)
}
/// Build a set operator (AND/OR/UNLESS)
fn set_op_on_non_field_columns(
&self,
left: LogicalPlan,
@@ -1501,6 +1502,10 @@ impl PromPlanner {
let mut left_tag_col_set = left_tag_cols.into_iter().collect::<HashSet<_>>();
let mut right_tag_col_set = right_tag_cols.into_iter().collect::<HashSet<_>>();
if matches!(op.id(), token::T_LOR) {
return self.or_operator(left, right, left_tag_col_set, right_tag_col_set, modifier);
}
// apply modifier
if let Some(modifier) = modifier {
// one-to-many and many-to-one are not supported
@@ -1545,7 +1550,8 @@ impl PromPlanner {
)
};
let join_keys = left_tag_col_set
.into_iter()
.iter()
.cloned()
.chain([self.ctx.time_index_column.clone().unwrap()])
.collect::<Vec<_>>();
@@ -1579,17 +1585,122 @@ impl PromPlanner {
.build()
.context(DataFusionPlanningSnafu),
token::T_LOR => {
// `OR` can not be expressed by `UNION` precisely.
// it will generate unexpceted result when schemas don't match
UnsupportedExprSnafu {
name: "set operation `OR`",
}
.fail()
self.or_operator(left, right, left_tag_col_set, right_tag_col_set, modifier)
}
_ => UnexpectedTokenSnafu { token: op }.fail(),
}
}
// TODO(ruihang): change function name
fn or_operator(
&self,
left: LogicalPlan,
right: LogicalPlan,
left_tag_cols_set: HashSet<String>,
right_tag_cols_set: HashSet<String>,
modifier: &Option<BinModifier>,
) -> Result<LogicalPlan> {
// prepare hash sets
let all_tags = left_tag_cols_set
.union(&right_tag_cols_set)
.cloned()
.collect::<HashSet<_>>();
let tags_not_in_left = all_tags
.difference(&left_tag_cols_set)
.cloned()
.collect::<Vec<_>>();
let tags_not_in_right = all_tags
.difference(&right_tag_cols_set)
.cloned()
.collect::<Vec<_>>();
let left_qualifier = left.schema().field(0).qualifier().cloned();
let right_qualifier = right.schema().field(0).qualifier().cloned();
let left_qualifier_string = left_qualifier
.as_ref()
.map(|l| l.to_string())
.unwrap_or_default();
let right_qualifier_string = right_qualifier
.as_ref()
.map(|r| r.to_string())
.unwrap_or_default();
// step 0: fill all columns in output schema
let all_columns_set = left
.schema()
.fields()
.iter()
.chain(right.schema().fields().iter())
.map(|field| field.name().clone())
.collect::<HashSet<_>>();
let mut all_columns = all_columns_set.into_iter().collect::<Vec<_>>();
// sort to ensure the generated schema is not volatile
all_columns.sort_unstable();
// step 1: align schema using project, fill non-exist columns with null
let left_proj_exprs = all_columns.iter().map(|col| {
if tags_not_in_left.contains(col) {
DfExpr::Literal(ScalarValue::Utf8(None)).alias(col.to_string())
} else {
DfExpr::Column(Column::new(left_qualifier.clone(), col))
}
});
let right_proj_exprs = all_columns.iter().map(|col| {
if tags_not_in_right.contains(col) {
DfExpr::Literal(ScalarValue::Utf8(None)).alias(col.to_string())
} else {
DfExpr::Column(Column::new(right_qualifier.clone(), col))
}
});
let left_projected = LogicalPlanBuilder::from(left)
.project(left_proj_exprs)
.context(DataFusionPlanningSnafu)?
.alias(left_qualifier_string.clone())
.context(DataFusionPlanningSnafu)?
.build()
.context(DataFusionPlanningSnafu)?;
let right_projected = LogicalPlanBuilder::from(right)
.project(right_proj_exprs)
.context(DataFusionPlanningSnafu)?
.alias(right_qualifier_string.clone())
.context(DataFusionPlanningSnafu)?
.build()
.context(DataFusionPlanningSnafu)?;
// step 2: compute match columns
let mut match_columns = if let Some(modifier) = modifier
&& let Some(matching) = &modifier.matching
{
match matching {
// keeps columns mentioned in `on`
LabelModifier::Include(on) => on.labels.clone(),
// removes columns memtioned in `ignoring`
LabelModifier::Exclude(ignoring) => {
let ignoring = ignoring.labels.iter().cloned().collect::<HashSet<_>>();
all_tags.difference(&ignoring).cloned().collect()
}
}
} else {
all_tags.iter().cloned().collect()
};
// sort to ensure the generated plan is not volatile
match_columns.sort_unstable();
// step 3: build `UnionDistinctOn` plan
let schema = left_projected.schema().clone();
let union_distinct_on = UnionDistinctOn::new(
left_projected,
right_projected,
match_columns,
self.ctx.time_index_column.clone().unwrap(),
schema,
);
let result = LogicalPlan::Extension(Extension {
node: Arc::new(union_distinct_on),
});
Ok(result)
}
/// Build a projection that project and perform operation expr for every value columns.
/// Non-value columns (tag and timestamp) will be preserved in the projection.
///

View File

@@ -130,10 +130,21 @@ tql eval (3000, 3000, '1s') (http_requests{g="canary"} + 1) and ignoring(g, job)
-- http_requests{group="production", instance="0", job="app-server"} 500
-- http_requests{group="production", instance="1", job="api-server"} 200
-- http_requests{group="production", instance="1", job="app-server"} 600
-- NOT SUPPORTED: `or`
-- SQLNESS SORT_RESULT 3 1
tql eval (3000, 3000, '1s') http_requests{g="canary"} or http_requests{g="production"};
Error: 1004(InvalidArguments), Unsupported expr type: set operation `OR`
+------------+----------+-----+---------------------+-------+
| g | instance | job | ts | val |
+------------+----------+-----+---------------------+-------+
| canary | 0 | api | 1970-01-01T00:50:00 | 300.0 |
| canary | 0 | app | 1970-01-01T00:50:00 | 700.0 |
| canary | 1 | api | 1970-01-01T00:50:00 | 400.0 |
| canary | 1 | app | 1970-01-01T00:50:00 | 800.0 |
| production | 0 | api | 1970-01-01T00:50:00 | 100.0 |
| production | 0 | app | 1970-01-01T00:50:00 | 500.0 |
| production | 1 | api | 1970-01-01T00:50:00 | 200.0 |
| production | 1 | app | 1970-01-01T00:50:00 | 600.0 |
+------------+----------+-----+---------------------+-------+
-- # On overlap the rhs samples must be dropped.
-- eval instant at 50m (http_requests{group="canary"} + 1) or http_requests{instance="1"}
@@ -143,10 +154,10 @@ Error: 1004(InvalidArguments), Unsupported expr type: set operation `OR`
-- {group="canary", instance="1", job="app-server"} 801
-- http_requests{group="production", instance="1", job="api-server"} 200
-- http_requests{group="production", instance="1", job="app-server"} 600
-- NOT SUPPORTED: `or`
-- SQLNESS SORT_RESULT 3 1
tql eval (3000, 3000, '1s') (http_requests{g="canary"} + 1) or http_requests{instance="1"};
Error: 1004(InvalidArguments), Unsupported expr type: set operation `OR`
Error: 1004(InvalidArguments), Internal error during building DataFusion plan: No field named http_requests.val. Valid fields are http_requests.job, http_requests.instance, http_requests.g, http_requests.ts, "val + Float64(1)".
-- # Matching only on instance excludes everything that has instance=0/1 but includes
-- # entries without the instance label.
@@ -161,7 +172,7 @@ Error: 1004(InvalidArguments), Unsupported expr type: set operation `OR`
-- NOT SUPPORTED: `or`
tql eval (3000, 3000, '1s') (http_requests{g="canary"} + 1) or on(instance) (http_requests or cpu_count or vector_matching_a);
Error: 1004(InvalidArguments), Unsupported expr type: set operation `OR`
Error: 1004(InvalidArguments), Internal error during building DataFusion plan: No field named cpu_count.val. Valid fields are cpu_count.ts.
-- eval instant at 50m (http_requests{group="canary"} + 1) or ignoring(l, group, job) (http_requests or cpu_count or vector_matching_a)
-- {group="canary", instance="0", job="api-server"} 301
@@ -174,7 +185,7 @@ Error: 1004(InvalidArguments), Unsupported expr type: set operation `OR`
-- NOT SUPPORTED: `or`
tql eval (3000, 3000, '1s') (http_requests{g="canary"} + 1) or ignoring(l, g, job) (http_requests or cpu_count or vector_matching_a);
Error: 1004(InvalidArguments), Unsupported expr type: set operation `OR`
Error: 1004(InvalidArguments), Internal error during building DataFusion plan: No field named cpu_count.val. Valid fields are cpu_count.ts.
-- eval instant at 50m http_requests{group="canary"} unless http_requests{instance="0"}
-- http_requests{group="canary", instance="1", job="api-server"} 400
@@ -268,3 +279,128 @@ drop table vector_matching_a;
Affected Rows: 0
-- the following cases are not from Prometheus.
create table t1 (ts timestamp time index, job string primary key, val double);
Affected Rows: 0
insert into t1 values (0, "a", 1.0), (500000, "b", 2.0), (1000000, "a", 3.0), (1500000, "c", 4.0);
Affected Rows: 4
create table t2 (ts timestamp time index, val double);
Affected Rows: 0
insert into t2 values (0, 0), (300000, 0), (600000, 0), (900000, 0), (1200000, 0), (1500000, 0), (1800000, 0);
Affected Rows: 7
-- SQLNESS SORT_RESULT 3 1
tql eval (0, 2000, '400') t1 or t2;
+-----+---------------------+-----+
| job | ts | val |
+-----+---------------------+-----+
| | 1970-01-01T00:00:00 | 0.0 |
| | 1970-01-01T00:06:40 | 0.0 |
| | 1970-01-01T00:13:20 | 0.0 |
| | 1970-01-01T00:20:00 | 0.0 |
| | 1970-01-01T00:26:40 | 0.0 |
| | 1970-01-01T00:33:20 | 0.0 |
| a | 1970-01-01T00:00:00 | 1.0 |
| a | 1970-01-01T00:20:00 | 3.0 |
| b | 1970-01-01T00:13:20 | 2.0 |
| c | 1970-01-01T00:26:40 | 4.0 |
+-----+---------------------+-----+
-- SQLNESS SORT_RESULT 3 1
tql eval (0, 2000, '400') t1 or on () t2;
+-----+---------------------+-----+
| job | ts | val |
+-----+---------------------+-----+
| | 1970-01-01T00:06:40 | 0.0 |
| | 1970-01-01T00:33:20 | 0.0 |
| a | 1970-01-01T00:00:00 | 1.0 |
| a | 1970-01-01T00:20:00 | 3.0 |
| b | 1970-01-01T00:13:20 | 2.0 |
| c | 1970-01-01T00:26:40 | 4.0 |
+-----+---------------------+-----+
-- SQLNESS SORT_RESULT 3 1
tql eval (0, 2000, '400') t1 or on (job) t2;
+-----+---------------------+-----+
| job | ts | val |
+-----+---------------------+-----+
| | 1970-01-01T00:00:00 | 0.0 |
| | 1970-01-01T00:06:40 | 0.0 |
| | 1970-01-01T00:13:20 | 0.0 |
| | 1970-01-01T00:20:00 | 0.0 |
| | 1970-01-01T00:26:40 | 0.0 |
| | 1970-01-01T00:33:20 | 0.0 |
| a | 1970-01-01T00:00:00 | 1.0 |
| a | 1970-01-01T00:20:00 | 3.0 |
| b | 1970-01-01T00:13:20 | 2.0 |
| c | 1970-01-01T00:26:40 | 4.0 |
+-----+---------------------+-----+
-- SQLNESS SORT_RESULT 3 1
tql eval (0, 2000, '400') t2 or t1;
+-----+---------------------+-----+
| job | ts | val |
+-----+---------------------+-----+
| | 1970-01-01T00:00:00 | 0.0 |
| | 1970-01-01T00:06:40 | 0.0 |
| | 1970-01-01T00:13:20 | 0.0 |
| | 1970-01-01T00:20:00 | 0.0 |
| | 1970-01-01T00:26:40 | 0.0 |
| | 1970-01-01T00:33:20 | 0.0 |
| a | 1970-01-01T00:00:00 | 1.0 |
| a | 1970-01-01T00:20:00 | 3.0 |
| b | 1970-01-01T00:13:20 | 2.0 |
| c | 1970-01-01T00:26:40 | 4.0 |
+-----+---------------------+-----+
-- SQLNESS SORT_RESULT 3 1
tql eval (0, 2000, '400') t2 or on () t1;
+-----+---------------------+-----+
| job | ts | val |
+-----+---------------------+-----+
| | 1970-01-01T00:00:00 | 0.0 |
| | 1970-01-01T00:06:40 | 0.0 |
| | 1970-01-01T00:13:20 | 0.0 |
| | 1970-01-01T00:20:00 | 0.0 |
| | 1970-01-01T00:26:40 | 0.0 |
| | 1970-01-01T00:33:20 | 0.0 |
+-----+---------------------+-----+
-- SQLNESS SORT_RESULT 3 1
tql eval (0, 2000, '400') t2 or on(job) t1;
+-----+---------------------+-----+
| job | ts | val |
+-----+---------------------+-----+
| | 1970-01-01T00:00:00 | 0.0 |
| | 1970-01-01T00:06:40 | 0.0 |
| | 1970-01-01T00:13:20 | 0.0 |
| | 1970-01-01T00:20:00 | 0.0 |
| | 1970-01-01T00:26:40 | 0.0 |
| | 1970-01-01T00:33:20 | 0.0 |
| a | 1970-01-01T00:00:00 | 1.0 |
| a | 1970-01-01T00:20:00 | 3.0 |
| b | 1970-01-01T00:13:20 | 2.0 |
| c | 1970-01-01T00:26:40 | 4.0 |
+-----+---------------------+-----+
drop table t1;
Affected Rows: 0
drop table t2;
Affected Rows: 0

View File

@@ -79,7 +79,7 @@ tql eval (3000, 3000, '1s') (http_requests{g="canary"} + 1) and ignoring(g, job)
-- http_requests{group="production", instance="0", job="app-server"} 500
-- http_requests{group="production", instance="1", job="api-server"} 200
-- http_requests{group="production", instance="1", job="app-server"} 600
-- NOT SUPPORTED: `or`
-- SQLNESS SORT_RESULT 3 1
tql eval (3000, 3000, '1s') http_requests{g="canary"} or http_requests{g="production"};
-- # On overlap the rhs samples must be dropped.
@@ -90,7 +90,7 @@ tql eval (3000, 3000, '1s') http_requests{g="canary"} or http_requests{g="produc
-- {group="canary", instance="1", job="app-server"} 801
-- http_requests{group="production", instance="1", job="api-server"} 200
-- http_requests{group="production", instance="1", job="app-server"} 600
-- NOT SUPPORTED: `or`
-- SQLNESS SORT_RESULT 3 1
tql eval (3000, 3000, '1s') (http_requests{g="canary"} + 1) or http_requests{instance="1"};
@@ -173,3 +173,35 @@ drop table http_requests;
drop table cpu_count;
drop table vector_matching_a;
-- the following cases are not from Prometheus.
create table t1 (ts timestamp time index, job string primary key, val double);
insert into t1 values (0, "a", 1.0), (500000, "b", 2.0), (1000000, "a", 3.0), (1500000, "c", 4.0);
create table t2 (ts timestamp time index, val double);
insert into t2 values (0, 0), (300000, 0), (600000, 0), (900000, 0), (1200000, 0), (1500000, 0), (1800000, 0);
-- SQLNESS SORT_RESULT 3 1
tql eval (0, 2000, '400') t1 or t2;
-- SQLNESS SORT_RESULT 3 1
tql eval (0, 2000, '400') t1 or on () t2;
-- SQLNESS SORT_RESULT 3 1
tql eval (0, 2000, '400') t1 or on (job) t2;
-- SQLNESS SORT_RESULT 3 1
tql eval (0, 2000, '400') t2 or t1;
-- SQLNESS SORT_RESULT 3 1
tql eval (0, 2000, '400') t2 or on () t1;
-- SQLNESS SORT_RESULT 3 1
tql eval (0, 2000, '400') t2 or on(job) t1;
drop table t1;
drop table t2;