feat: add DictionaryVector DataType (#1061)

* fix stddev and stdvar; try building range function expr

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* feat: add dictionary data type

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* preserve timestamp column in range manipulator

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* plan range functions

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update test result

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix typo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update test result

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update test result

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* resolve CR comments

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* resolve CR comments

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Author: Ruihang Xia
Date: 2023-02-23 20:31:07 +08:00
Committed by: GitHub
Parent: b48c851b96
Commit: 4cc3ac37d5
17 changed files with 376 additions and 117 deletions

View File

@@ -97,7 +97,9 @@ impl TryFrom<ConcreteDataType> for ColumnDataTypeWrapper {
TimestampType::Microsecond(_) => ColumnDataType::TimestampMicrosecond,
TimestampType::Nanosecond(_) => ColumnDataType::TimestampNanosecond,
},
ConcreteDataType::Null(_) | ConcreteDataType::List(_) => {
ConcreteDataType::Null(_)
| ConcreteDataType::List(_)
| ConcreteDataType::Dictionary(_) => {
return error::IntoColumnDataTypeSnafu { from: datatype }.fail()
}
});

View File

@@ -419,8 +419,9 @@ fn convert_values(data_type: &ConcreteDataType, values: Values) -> Vec<Value> {
.into_iter()
.map(|v| Value::Timestamp(Timestamp::new_millisecond(v)))
.collect(),
ConcreteDataType::Null(_) => unreachable!(),
ConcreteDataType::List(_) => unreachable!(),
ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) => {
unreachable!()
}
}
}

View File

@@ -67,7 +67,7 @@ macro_rules! convert_arrow_array_to_grpc_vals {
return Ok(vals);
},
)+
ConcreteDataType::Null(_) | ConcreteDataType::List(_) => unreachable!("Should not send {:?} in gRPC", $data_type),
ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) => unreachable!("Should not send {:?} in gRPC", $data_type),
}
}};
}

View File

@@ -131,7 +131,9 @@ pub fn from_concrete_type(ty: ConcreteDataType, nullability: Option<bool>) -> Re
ConcreteDataType::Timestamp(_) => {
build_substrait_kind!(Timestamp, Timestamp, nullability, 0)
}
ConcreteDataType::List(_) => UnsupportedConcreteTypeSnafu { ty }.fail()?,
ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) => {
UnsupportedConcreteTypeSnafu { ty }.fail()?
}
};
Ok(SType { kind })

View File

@@ -103,21 +103,22 @@ async fn sql_insert_tql_query_ceil() {
('host1', 49, 3333.3, 99000);
"#,
"TQL EVAL (0,100,10) ceil(http_requests_total{host=\"host1\"})",
"+---------------------+-------------------------------+----------------------------------+-------+\
\n| ts | ceil(http_requests_total.cpu) | ceil(http_requests_total.memory) | host |\
\n+---------------------+-------------------------------+----------------------------------+-------+\
\n| 1970-01-01T00:00:00 | 67 | 1024 | host1 |\
\n| 1970-01-01T00:00:10 | 100 | 20480 | host1 |\
\n| 1970-01-01T00:00:20 | 100 | 20480 | host1 |\
\n| 1970-01-01T00:00:30 | 32 | 8192 | host1 |\
\n| 1970-01-01T00:00:40 | 96 | 334 | host1 |\
\n| 1970-01-01T00:00:50 | 12424 | 1334 | host1 |\
\n| 1970-01-01T00:01:00 | 12424 | 1334 | host1 |\
\n| 1970-01-01T00:01:10 | 12424 | 1334 | host1 |\
\n| 1970-01-01T00:01:20 | 0 | 2334 | host1 |\
\n| 1970-01-01T00:01:30 | 0 | 2334 | host1 |\
\n| 1970-01-01T00:01:40 | 49 | 3334 | host1 |\
\n+---------------------+-------------------------------+----------------------------------+-------+")
"+---------------------+-----------+--------------+-------+\
\n| ts | ceil(cpu) | ceil(memory) | host |\
\n+---------------------+-----------+--------------+-------+\
\n| 1970-01-01T00:00:00 | 67 | 1024 | host1 |\
\n| 1970-01-01T00:00:10 | 100 | 20480 | host1 |\
\n| 1970-01-01T00:00:20 | 100 | 20480 | host1 |\
\n| 1970-01-01T00:00:30 | 32 | 8192 | host1 |\
\n| 1970-01-01T00:00:40 | 96 | 334 | host1 |\
\n| 1970-01-01T00:00:50 | 12424 | 1334 | host1 |\
\n| 1970-01-01T00:01:00 | 12424 | 1334 | host1 |\
\n| 1970-01-01T00:01:10 | 12424 | 1334 | host1 |\
\n| 1970-01-01T00:01:20 | 0 | 2334 | host1 |\
\n| 1970-01-01T00:01:30 | 0 | 2334 | host1 |\
\n| 1970-01-01T00:01:40 | 49 | 3334 | host1 |\
\n+---------------------+-----------+--------------+-------+",
)
.await;
}
@@ -150,16 +151,16 @@ async fn sql_insert_promql_query_ceil() {
UNIX_EPOCH.checked_add(Duration::from_secs(100)).unwrap(),
Duration::from_secs(5),
Duration::from_secs(1),
"+---------------------+-------------------------------+----------------------------------+-------+\
\n| ts | ceil(http_requests_total.cpu) | ceil(http_requests_total.memory) | host |\
\n+---------------------+-------------------------------+----------------------------------+-------+\
\n| 1970-01-01T00:00:00 | 67 | 1024 | host1 |\
\n| 1970-01-01T00:00:05 | 67 | 4096 | host1 |\
\n| 1970-01-01T00:00:10 | 100 | 20480 | host1 |\
\n| 1970-01-01T00:01:20 | 0 | 2334 | host1 |\
\n| 1970-01-01T00:01:40 | 49 | 3334 | host1 |\
\n| 1970-01-01T00:00:50 | 12424 | 1334 | host1 |\
\n+---------------------+-------------------------------+----------------------------------+-------+"
"+---------------------+-----------+--------------+-------+\
\n| ts | ceil(cpu) | ceil(memory) | host |\
\n+---------------------+-----------+--------------+-------+\
\n| 1970-01-01T00:00:00 | 67 | 1024 | host1 |\
\n| 1970-01-01T00:00:05 | 67 | 4096 | host1 |\
\n| 1970-01-01T00:00:10 | 100 | 20480 | host1 |\
\n| 1970-01-01T00:00:50 | 12424 | 1334 | host1 |\
\n| 1970-01-01T00:01:20 | 0 | 2334 | host1 |\
\n| 1970-01-01T00:01:40 | 49 | 3334 | host1 |\
\n+---------------------+-----------+--------------+-------+",
)
.await;
}

View File

@@ -22,10 +22,10 @@ use serde::{Deserialize, Serialize};
use crate::error::{self, Error, Result};
use crate::type_id::LogicalTypeId;
use crate::types::{
BinaryType, BooleanType, DateTimeType, DateType, Float32Type, Float64Type, Int16Type,
Int32Type, Int64Type, Int8Type, ListType, NullType, StringType, TimestampMicrosecondType,
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, TimestampType,
UInt16Type, UInt32Type, UInt64Type, UInt8Type,
BinaryType, BooleanType, DateTimeType, DateType, DictionaryType, Float32Type, Float64Type,
Int16Type, Int32Type, Int64Type, Int8Type, ListType, NullType, StringType,
TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
TimestampSecondType, TimestampType, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
};
use crate::value::Value;
use crate::vectors::MutableVector;
@@ -59,6 +59,7 @@ pub enum ConcreteDataType {
// Compound types:
List(ListType),
Dictionary(DictionaryType),
}
// TODO(yingwen): Refactor these `is_xxx()` methods, such as adding a `properties()` method
@@ -169,6 +170,11 @@ impl TryFrom<&ArrowDataType> for ConcreteDataType {
ArrowDataType::List(field) => Self::List(ListType::new(
ConcreteDataType::from_arrow_type(field.data_type()),
)),
ArrowDataType::Dictionary(key_type, value_type) => {
let key_type = ConcreteDataType::from_arrow_type(key_type);
let value_type = ConcreteDataType::from_arrow_type(value_type);
Self::Dictionary(DictionaryType::new(key_type, value_type))
}
_ => {
return error::UnsupportedArrowTypeSnafu {
arrow_type: dt.clone(),
@@ -243,6 +249,13 @@ impl ConcreteDataType {
pub fn list_datatype(item_type: ConcreteDataType) -> ConcreteDataType {
ConcreteDataType::List(ListType::new(item_type))
}
pub fn dictionary_datatype(
key_type: ConcreteDataType,
value_type: ConcreteDataType,
) -> ConcreteDataType {
ConcreteDataType::Dictionary(DictionaryType::new(key_type, value_type))
}
}
/// Data type abstraction.
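
An aside, not part of the diff: with the constructor and the TryFrom arm added above, a dictionary logical type can be built directly and recovered from its Arrow counterpart. A minimal sketch — the import paths and the int64_datatype()/float64_datatype() helpers are assumed here, not shown in this commit:

    // Sketch only: build Dictionary<Int64, Float64> and round-trip it through Arrow.
    use datatypes::data_type::{ConcreteDataType, DataType as _};

    fn main() {
        let dict = ConcreteDataType::dictionary_datatype(
            ConcreteDataType::int64_datatype(),
            ConcreteDataType::float64_datatype(),
        );
        // as_arrow_type() yields ArrowDataType::Dictionary(Int64, Float64);
        // the TryFrom<&ArrowDataType> arm above maps it back to the same value.
        let arrow_type = dict.as_arrow_type();
        assert_eq!(ConcreteDataType::try_from(&arrow_type).unwrap(), dict);
    }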

View File

@@ -48,6 +48,7 @@ pub enum LogicalTypeId {
TimestampNanosecond,
List,
Dictionary,
}
impl LogicalTypeId {
@@ -88,6 +89,10 @@ impl LogicalTypeId {
LogicalTypeId::List => {
ConcreteDataType::list_datatype(ConcreteDataType::null_datatype())
}
LogicalTypeId::Dictionary => ConcreteDataType::dictionary_datatype(
ConcreteDataType::null_datatype(),
ConcreteDataType::null_datatype(),
),
}
}
}

View File

@@ -16,17 +16,18 @@ mod binary_type;
mod boolean_type;
mod date_type;
mod datetime_type;
mod dictionary_type;
mod list_type;
mod null_type;
mod primitive_type;
mod string_type;
mod timestamp_type;
pub use binary_type::BinaryType;
pub use boolean_type::BooleanType;
pub use date_type::DateType;
pub use datetime_type::DateTimeType;
pub use dictionary_type::DictionaryType;
pub use list_type::ListType;
pub use null_type::NullType;
pub use primitive_type::{

View File

@@ -0,0 +1,91 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use arrow::datatypes::DataType as ArrowDataType;
use serde::{Deserialize, Serialize};
use crate::data_type::{ConcreteDataType, DataType};
use crate::type_id::LogicalTypeId;
use crate::value::Value;
use crate::vectors::MutableVector;
/// Used to represent the Dictionary datatype.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct DictionaryType {
// Use Box to avoid recursive dependency, as enum ConcreteDataType depends on DictionaryType.
/// The type of Dictionary key.
key_type: Box<ConcreteDataType>,
/// The type of Dictionary value.
value_type: Box<ConcreteDataType>,
}
impl Default for DictionaryType {
fn default() -> Self {
DictionaryType::new(
ConcreteDataType::null_datatype(),
ConcreteDataType::null_datatype(),
)
}
}
impl DictionaryType {
/// Create a new `DictionaryType` whose item's data type is `item_type`.
pub fn new(key_type: ConcreteDataType, value_type: ConcreteDataType) -> Self {
DictionaryType {
key_type: Box::new(key_type),
value_type: Box::new(value_type),
}
}
/// Returns the key data type.
#[inline]
pub fn key_type(&self) -> &ConcreteDataType {
&self.key_type
}
/// Returns the value data type.
#[inline]
pub fn value_type(&self) -> &ConcreteDataType {
&self.value_type
}
}
impl DataType for DictionaryType {
fn name(&self) -> &str {
"Dictionary"
}
fn logical_type_id(&self) -> LogicalTypeId {
LogicalTypeId::Dictionary
}
fn default_value(&self) -> Value {
unimplemented!()
}
fn as_arrow_type(&self) -> ArrowDataType {
ArrowDataType::Dictionary(
Box::new(self.key_type.as_arrow_type()),
Box::new(self.value_type.as_arrow_type()),
)
}
fn create_mutable_vector(&self, _capacity: usize) -> Box<dyn MutableVector> {
unimplemented!()
}
fn is_timestamp_compatible(&self) -> bool {
false
}
}
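
For orientation, not part of the commit: DictionaryType is schema-only for now (default_value() and create_mutable_vector() are unimplemented!()), but the accessors already work. A hypothetical construction, assuming the int32_datatype()/string_datatype() helpers and the datatypes::types re-export exist:

    use datatypes::data_type::ConcreteDataType;
    use datatypes::types::DictionaryType;

    fn main() {
        let dict = DictionaryType::new(
            ConcreteDataType::int32_datatype(),
            ConcreteDataType::string_datatype(),
        );
        // key_type()/value_type() hand back the wrapped ConcreteDataTypes.
        assert_eq!(*dict.key_type(), ConcreteDataType::int32_datatype());
        assert_eq!(*dict.value_type(), ConcreteDataType::string_datatype());
    }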

View File

@@ -273,6 +273,10 @@ fn to_null_value(output_type: &ConcreteDataType) -> ScalarValue {
ConcreteDataType::List(_) => {
ScalarValue::List(None, Box::new(new_item_field(output_type.as_arrow_type())))
}
ConcreteDataType::Dictionary(dict) => ScalarValue::Dictionary(
Box::new(dict.key_type().as_arrow_type()),
Box::new(to_null_value(dict.value_type())),
),
}
}
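
In other words, the NULL literal of a dictionary column keeps the dictionary's key type and nests the null of its value type. Illustration only, assuming to_null_value of a Float64 type is ScalarValue::Float64(None):

    use datafusion::arrow::datatypes::DataType as ArrowDataType;
    use datafusion::scalar::ScalarValue;

    fn main() {
        // NULL scalar for a Dictionary(Int64, Float64) output column.
        let _null_dict = ScalarValue::Dictionary(
            Box::new(ArrowDataType::Int64),
            Box::new(ScalarValue::Float64(None)),
        );
    }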
@@ -513,6 +517,17 @@ impl Ord for ListValue {
}
}
// TODO(ruihang): Implement this type
/// Dictionary value.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct DictionaryValue {
/// Inner values datatypes
key_type: ConcreteDataType,
value_type: ConcreteDataType,
}
impl Eq for DictionaryValue {}
impl TryFrom<ScalarValue> for Value {
type Error = error::Error;

View File

@@ -95,7 +95,7 @@ fn equal(lhs: &dyn Vector, rhs: &dyn Vector) -> bool {
},
List(_) => is_vector_eq!(ListVector, lhs, rhs),
UInt8(_) | UInt16(_) | UInt32(_) | UInt64(_) | Int8(_) | Int16(_) | Int32(_) | Int64(_)
| Float32(_) | Float64(_) => {
| Float32(_) | Float64(_) | Dictionary(_) => {
with_match_primitive_type_id!(lhs_type.logical_type_id(), |$T| {
let lhs = lhs.as_any().downcast_ref::<PrimitiveVector<$T>>().unwrap();
let rhs = rhs.as_any().downcast_ref::<PrimitiveVector<$T>>().unwrap();

View File

@@ -18,7 +18,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use datafusion::arrow::array::{Array, Int64Array, TimestampMillisecondArray};
use datafusion::arrow::array::{Array, ArrayRef, Int64Array, TimestampMillisecondArray};
use datafusion::arrow::compute;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::error::ArrowError;
@@ -80,6 +80,14 @@ impl RangeManipulate {
})
}
pub fn range_timestamp_name(&self) -> String {
Self::build_timestamp_range_name(&self.time_index)
}
pub fn build_timestamp_range_name(time_index: &str) -> String {
format!("{time_index}_range")
}
fn calculate_output_schema(
input_schema: &DFSchemaRef,
time_index: &str,
@@ -88,8 +96,15 @@ impl RangeManipulate {
let mut columns = input_schema.fields().clone();
// process time index column
// the raw timestamp field is preserved. And a new timestamp_range field is appended to the last.
let index = input_schema.index_of_column_by_name(None, time_index)?;
columns[index] = DFField::from(RangeArray::convert_field(columns[index].field()));
let timestamp_range_field = columns[index]
.field()
.clone()
.with_name(Self::build_timestamp_range_name(time_index));
columns.push(DFField::from(RangeArray::convert_field(
&timestamp_range_field,
)));
// process value columns
for name in value_columns {
@@ -110,6 +125,7 @@ impl RangeManipulate {
interval: self.interval,
range: self.range,
time_index_column: self.time_index.clone(),
time_range_column: self.range_timestamp_name(),
value_columns: self.value_columns.clone(),
input: exec_input,
output_schema: SchemaRef::new(self.output_schema.as_ref().into()),
@@ -170,6 +186,7 @@ pub struct RangeManipulateExec {
interval: Millisecond,
range: Millisecond,
time_index_column: String,
time_range_column: String,
value_columns: Vec<String>,
input: Arc<dyn ExecutionPlan>,
@@ -213,6 +230,7 @@ impl ExecutionPlan for RangeManipulateExec {
interval: self.interval,
range: self.range,
time_index_column: self.time_index_column.clone(),
time_range_column: self.time_range_column.clone(),
value_columns: self.value_columns.clone(),
output_schema: self.output_schema.clone(),
input: children[0].clone(),
@@ -333,10 +351,11 @@ impl RangeManipulateStream {
pub fn manipulate(&self, input: RecordBatch) -> ArrowResult<RecordBatch> {
let mut other_columns = (0..input.columns().len()).collect::<HashSet<_>>();
// calculate the range
let ranges = self.calculate_range(&input);
let (aligned_ts, ranges) = self.calculate_range(&input);
// transform columns
let mut new_columns = input.columns().to_vec();
for index in self.value_columns.iter().chain([self.time_index].iter()) {
for index in self.value_columns.iter() {
other_columns.remove(index);
let column = input.column(*index);
let new_column = Arc::new(
@@ -347,26 +366,37 @@ impl RangeManipulateStream {
new_columns[*index] = new_column;
}
// push timestamp range column
let ts_range_column =
RangeArray::from_ranges(input.column(self.time_index).clone(), ranges.clone())
.map_err(|e| ArrowError::InvalidArgumentError(e.to_string()))?
.into_dict();
new_columns.push(Arc::new(ts_range_column));
// truncate other columns
let take_indices = Int64Array::from(vec![0; ranges.len()]);
for index in other_columns.into_iter() {
new_columns[index] = compute::take(&input.column(index), &take_indices, None)?;
}
// replace timestamp with the aligned one
new_columns[self.time_index] = aligned_ts;
RecordBatch::try_new(self.output_schema.clone(), new_columns)
}
fn calculate_range(&self, input: &RecordBatch) -> Vec<(u32, u32)> {
fn calculate_range(&self, input: &RecordBatch) -> (ArrayRef, Vec<(u32, u32)>) {
let ts_column = input
.column(self.time_index)
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap();
let mut result = vec![];
let mut aligned_ts = vec![];
let mut ranges = vec![];
// calculate for every aligned timestamp (`curr_ts`), assume the ts column is ordered.
for curr_ts in (self.start..=self.end).step_by(self.interval as _) {
aligned_ts.push(curr_ts);
let mut range_start = ts_column.len();
let mut range_end = 0;
for (index, ts) in ts_column.values().iter().enumerate() {
@@ -380,13 +410,15 @@ impl RangeManipulateStream {
}
}
if range_start > range_end {
result.push((0, 0));
ranges.push((0, 0));
} else {
result.push((range_start as _, (range_end + 1 - range_start) as _));
ranges.push((range_start as _, (range_end + 1 - range_start) as _));
}
}
result
let aligned_ts_array = Arc::new(TimestampMillisecondArray::from(aligned_ts)) as _;
(aligned_ts_array, ranges)
}
}
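
To restate the naming rule introduced here as a standalone sketch (not the crate's code): the time index column keeps its name, and the appended range column is named `<time_index>_range`, which the PromQL planner later uses to locate the column when wiring range-function arguments:

    // Standalone restatement of RangeManipulate::build_timestamp_range_name.
    fn build_timestamp_range_name(time_index: &str) -> String {
        format!("{time_index}_range")
    }

    fn main() {
        assert_eq!(build_timestamp_range_name("timestamp"), "timestamp_range");
    }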
@@ -461,6 +493,7 @@ mod test {
range,
value_columns,
output_schema: manipulate_output_schema,
time_range_column: RangeManipulate::build_timestamp_range_name(&time_index),
time_index_column: time_index,
input: memory_exec,
metric: ExecutionPlanMetricsSet::new(),
@@ -499,32 +532,52 @@ mod test {
#[tokio::test]
async fn interval_30s_range_90s() {
let expected = String::from(
"RangeArray { \
"PrimitiveArray<Timestamp(Millisecond, None)>\n[\n \
1970-01-01T00:00:00,\n \
1970-01-01T00:00:30,\n \
1970-01-01T00:01:00,\n \
1970-01-01T00:01:30,\n \
1970-01-01T00:02:00,\n \
1970-01-01T00:02:30,\n \
1970-01-01T00:03:00,\n \
1970-01-01T00:03:30,\n \
1970-01-01T00:04:00,\n \
1970-01-01T00:04:30,\n \
1970-01-01T00:05:00,\n\
]\nRangeArray { \
base array: PrimitiveArray<Float64>\n[\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n], \
ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \
}\nRangeArray { \
base array: PrimitiveArray<Float64>\n[\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n], \
ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \
}\nStringArray\n[\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n]\n\
RangeArray { \
base array: PrimitiveArray<Timestamp(Millisecond, None)>\n[\n 1970-01-01T00:00:00,\n 1970-01-01T00:00:30,\n 1970-01-01T00:01:00,\n 1970-01-01T00:01:30,\n 1970-01-01T00:02:00,\n 1970-01-01T00:03:00,\n 1970-01-01T00:04:00,\n 1970-01-01T00:04:01,\n 1970-01-01T00:04:31,\n 1970-01-01T00:04:51,\n], \
ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \
}\nRangeArray { \
base array: PrimitiveArray<Float64>\n[\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n], \
ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \
}\nRangeArray { \
base array: PrimitiveArray<Float64>\n[\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n], \
ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \
}\nStringArray\n[\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n]");
}",
);
do_normalize_test(0, 310_000, 30_000, 90_000, expected).await;
}
#[tokio::test]
async fn small_empty_range() {
let expected = String::from(
"RangeArray { \
"PrimitiveArray<Timestamp(Millisecond, None)>\n[\n \
1970-01-01T00:00:00.001,\n \
1970-01-01T00:00:03.001,\n \
1970-01-01T00:00:06.001,\n \
1970-01-01T00:00:09.001,\n\
]\nRangeArray { \
base array: PrimitiveArray<Float64>\n[\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n], \
ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \
}\nRangeArray { \
base array: PrimitiveArray<Float64>\n[\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n], \
ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \
}\nStringArray\n[\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n]\n\
RangeArray { \
base array: PrimitiveArray<Timestamp(Millisecond, None)>\n[\n 1970-01-01T00:00:00,\n 1970-01-01T00:00:30,\n 1970-01-01T00:01:00,\n 1970-01-01T00:01:30,\n 1970-01-01T00:02:00,\n 1970-01-01T00:03:00,\n 1970-01-01T00:04:00,\n 1970-01-01T00:04:01,\n 1970-01-01T00:04:31,\n 1970-01-01T00:04:51,\n], \
ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \
}\nRangeArray { \
base array: PrimitiveArray<Float64>\n[\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n], \
ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \
}\nRangeArray { \
base array: PrimitiveArray<Float64>\n[\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n], \
ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \
}\nStringArray\n[\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n]");
}");
do_normalize_test(1, 10_001, 3_000, 1_000, expected).await;
}
}

View File

@@ -15,11 +15,12 @@
use std::fmt::Display;
use std::sync::Arc;
use datafusion::arrow::array::{Float64Array, Int64Array};
use datafusion::arrow::array::{Float64Array, TimestampMillisecondArray};
use datafusion::arrow::datatypes::TimeUnit;
use datafusion::common::DataFusionError;
use datafusion::logical_expr::{ScalarUDF, Signature, TypeSignature, Volatility};
use datafusion::physical_plan::ColumnarValue;
use datatypes::arrow::array::{Array, PrimitiveArray};
use datatypes::arrow::array::Array;
use datatypes::arrow::datatypes::DataType;
use crate::error;
@@ -55,7 +56,7 @@ impl<const IS_RATE: bool> IDelta<IS_RATE> {
// time index column and value column
fn input_type() -> Vec<DataType> {
vec![
RangeArray::convert_data_type(DataType::Int64),
RangeArray::convert_data_type(DataType::Timestamp(TimeUnit::Millisecond, None)),
RangeArray::convert_data_type(DataType::Float64),
]
}
@@ -82,9 +83,9 @@ impl<const IS_RATE: bool> IDelta<IS_RATE> {
)),
)?;
error::ensure(
ts_range.value_type() == DataType::Int64,
ts_range.value_type() == DataType::Timestamp(TimeUnit::Millisecond, None),
DataFusionError::Execution(format!(
"{}: expect Int64 as time index array's type, found {}",
"{}: expect TimestampMillisecond as time index array's type, found {}",
Self::name(),
ts_range.value_type()
)),
@@ -92,7 +93,7 @@ impl<const IS_RATE: bool> IDelta<IS_RATE> {
error::ensure(
value_range.value_type() == DataType::Float64,
DataFusionError::Execution(format!(
"{}: expect Int64 as time index array's type, found {}",
"{}: expect Float64 as value array's type, found {}",
Self::name(),
value_range.value_type()
)),
@@ -105,7 +106,7 @@ impl<const IS_RATE: bool> IDelta<IS_RATE> {
let timestamps = ts_range.get(index).unwrap();
let timestamps = timestamps
.as_any()
.downcast_ref::<Int64Array>()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap()
.values();
@@ -127,13 +128,13 @@ impl<const IS_RATE: bool> IDelta<IS_RATE> {
let len = timestamps.len();
if len < 2 {
result_array.push(0.0);
result_array.push(None);
continue;
}
// if is delta
if !IS_RATE {
result_array.push(values[len - 1] - values[len - 2]);
result_array.push(Some(values[len - 1] - values[len - 2]));
continue;
}
@@ -150,10 +151,10 @@ impl<const IS_RATE: bool> IDelta<IS_RATE> {
last_value - prev_value
};
result_array.push(result_value / sampled_interval as f64);
result_array.push(Some(result_value / sampled_interval as f64));
}
let result = ColumnarValue::Array(Arc::new(PrimitiveArray::from_iter(result_array)));
let result = ColumnarValue::Array(Arc::new(Float64Array::from_iter(result_array)));
Ok(result)
}
}
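
Put differently, a window with fewer than two samples now yields NULL rather than 0.0, and idelta is the difference of the last two samples. A minimal standalone restatement of that per-range rule (not the crate's code; the rate variant's interval handling is omitted):

    // Per-range idelta: None (NULL) when there are fewer than two samples,
    // otherwise the difference between the last two values.
    fn idelta(values: &[f64]) -> Option<f64> {
        if values.len() < 2 {
            return None;
        }
        Some(values[values.len() - 1] - values[values.len() - 2])
    }

    fn main() {
        assert_eq!(idelta(&[1.0]), None);
        assert_eq!(idelta(&[1.0, 4.0, 9.0]), Some(5.0));
    }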
@@ -201,9 +202,11 @@ mod test {
#[test]
fn basic_idelta_and_irate() {
let ts_array = Arc::new(Int64Array::from_iter([
1000, 3000, 5000, 7000, 9000, 11000, 13000, 15000, 17000,
]));
let ts_array = Arc::new(TimestampMillisecondArray::from_iter(
[1000i64, 3000, 5000, 7000, 9000, 11000, 13000, 15000, 17000]
.into_iter()
.map(Some),
));
let ts_ranges = [(0, 2), (0, 5), (1, 1), (3, 3), (8, 1), (9, 0)];
let values_array = Arc::new(Float64Array::from_iter([

View File

@@ -19,7 +19,7 @@ use datafusion::arrow::array::Float64Array;
use datafusion::common::DataFusionError;
use datafusion::logical_expr::{ScalarUDF, Signature, TypeSignature, Volatility};
use datafusion::physical_plan::ColumnarValue;
use datatypes::arrow::array::{Array, PrimitiveArray};
use datatypes::arrow::array::Array;
use datatypes::arrow::datatypes::DataType;
use crate::functions::extract_array;
@@ -61,7 +61,7 @@ impl Increase {
.values();
if range.len() < 2 {
result_array.push(0.0);
result_array.push(None);
continue;
}
@@ -75,10 +75,10 @@ impl Increase {
}
}
result_array.push(result_value);
result_array.push(Some(result_value));
}
let result = ColumnarValue::Array(Arc::new(PrimitiveArray::from_iter(result_array)));
let result = ColumnarValue::Array(Arc::new(Float64Array::from_iter(result_array)));
Ok(result)
}

View File

@@ -23,7 +23,7 @@ use datafusion::logical_expr::expr::AggregateFunction;
use datafusion::logical_expr::expr_rewriter::normalize_cols;
use datafusion::logical_expr::{
AggregateFunction as AggregateFunctionEnum, BinaryExpr, BuiltinScalarFunction, Cast, Extension,
LogicalPlan, LogicalPlanBuilder, Operator,
LogicalPlan, LogicalPlanBuilder, Operator, ScalarUDF,
};
use datafusion::optimizer::utils;
use datafusion::prelude::{Column, Expr as DfExpr, JoinType};
@@ -48,6 +48,7 @@ use crate::error::{
use crate::extension_plan::{
InstantManipulate, Millisecond, RangeManipulate, SeriesDivide, SeriesNormalize,
};
use crate::functions::{IDelta, Increase};
const LEFT_PLAN_JOIN_ALIAS: &str = "lhs";
@@ -603,8 +604,11 @@ impl<S: ContextProvider> PromPlanner<S> {
Ok(result)
}
/// # Side Effects
///
/// This method will update [PromPlannerContext]'s value fields.
fn create_function_expr(
&self,
&mut self,
func: &Function,
mut other_input_exprs: Vec<DfExpr>,
) -> Result<Vec<DfExpr>> {
@@ -612,26 +616,67 @@ impl<S: ContextProvider> PromPlanner<S> {
// TODO(ruihang): set this according to in-param list
let value_column_pos = 0;
let scalar_func = BuiltinScalarFunction::from_str(func.name).map_err(|_| {
UnsupportedExprSnafu {
name: func.name.to_string(),
}
.build()
})?;
let scalar_func = match func.name {
"increase" => ScalarFunc::Udf(Increase::scalar_udf()),
"idelta" => ScalarFunc::Udf(IDelta::<false>::scalar_udf()),
"irate" => ScalarFunc::Udf(IDelta::<true>::scalar_udf()),
_ => ScalarFunc::DataFusionBuiltin(
BuiltinScalarFunction::from_str(func.name).map_err(|_| {
UnsupportedExprSnafu {
name: func.name.to_string(),
}
.build()
})?,
),
};
// TODO(ruihang): handle those functions doesn't require input
let mut exprs = Vec::with_capacity(self.ctx.value_columns.len());
for value in &self.ctx.value_columns {
let col_expr = DfExpr::Column(Column::from_name(value));
other_input_exprs.insert(value_column_pos, col_expr);
let fn_expr = DfExpr::ScalarFunction {
fun: scalar_func.clone(),
args: other_input_exprs.clone(),
};
exprs.push(fn_expr);
other_input_exprs.remove(value_column_pos);
match scalar_func.clone() {
ScalarFunc::DataFusionBuiltin(fun) => {
other_input_exprs.insert(value_column_pos, col_expr);
let fn_expr = DfExpr::ScalarFunction {
fun,
args: other_input_exprs.clone(),
};
exprs.push(fn_expr);
other_input_exprs.remove(value_column_pos);
}
ScalarFunc::Udf(fun) => {
let ts_range_expr = DfExpr::Column(Column::from_name(
RangeManipulate::build_timestamp_range_name(
self.ctx.time_index_column.as_ref().unwrap(),
),
));
other_input_exprs.insert(value_column_pos, ts_range_expr);
other_input_exprs.insert(value_column_pos + 1, col_expr);
let fn_expr = DfExpr::ScalarUDF {
fun: Arc::new(fun),
args: other_input_exprs.clone(),
};
exprs.push(fn_expr);
other_input_exprs.remove(value_column_pos + 1);
other_input_exprs.remove(value_column_pos);
}
}
}
// update value columns' name, and alias them to remove qualifiers
let mut new_value_columns = Vec::with_capacity(exprs.len());
exprs = exprs
.into_iter()
.map(|expr| {
let display_name = expr.display_name()?;
new_value_columns.push(display_name.clone());
Ok(expr.alias(display_name))
})
.collect::<std::result::Result<Vec<_>, _>>()
.context(DataFusionPlanningSnafu)?;
self.ctx.value_columns = new_value_columns;
Ok(exprs)
}
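
So for the UDF branch the call ends up as udf(timestamp_range, value_column, ...remaining args), matching IDelta::input_type() shown earlier. A toy model of the insert/remove bookkeeping above, with strings standing in for the DataFusion expressions:

    fn main() {
        let mut other_input_exprs = vec!["lit_arg".to_string()];
        let value_column_pos = 0;
        // Insert the timestamp-range column, then the value column, in front.
        other_input_exprs.insert(value_column_pos, "timestamp_range".to_string());
        other_input_exprs.insert(value_column_pos + 1, "field_0".to_string());
        assert_eq!(other_input_exprs, ["timestamp_range", "field_0", "lit_arg"]);
        // After building the call expression, both inserts are undone again.
        other_input_exprs.remove(value_column_pos + 1);
        other_input_exprs.remove(value_column_pos);
        assert_eq!(other_input_exprs, ["lit_arg"]);
    }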
@@ -694,8 +739,8 @@ impl<S: ContextProvider> PromPlanner<S> {
token::T_MIN => AggregateFunctionEnum::Min,
token::T_MAX => AggregateFunctionEnum::Max,
token::T_GROUP => AggregateFunctionEnum::Grouping,
token::T_STDDEV => AggregateFunctionEnum::Stddev,
token::T_STDVAR => AggregateFunctionEnum::Variance,
token::T_STDDEV => AggregateFunctionEnum::StddevPop,
token::T_STDVAR => AggregateFunctionEnum::VariancePop,
token::T_TOPK | token::T_BOTTOMK | token::T_COUNT_VALUES | token::T_QUANTILE => {
UnsupportedExprSnafu {
name: format!("{op:?}"),
@@ -919,6 +964,12 @@ struct FunctionArgs {
literals: Vec<DfExpr>,
}
#[derive(Debug, Clone)]
enum ScalarFunc {
DataFusionBuiltin(BuiltinScalarFunction),
Udf(ScalarUDF),
}
#[cfg(test)]
mod test {
use std::time::{Duration, UNIX_EPOCH};
@@ -1028,8 +1079,8 @@ mod test {
let plan = PromPlanner::stmt_to_plan(eval_stmt, context_provider).unwrap();
let expected = String::from(
"Filter: some_metric.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, tag_0:Utf8]\
\n Projection: some_metric.timestamp, TEMPLATE(some_metric.field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, tag_0:Utf8]\
"Filter: TEMPLATE(field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
\n Projection: some_metric.timestamp, TEMPLATE(some_metric.field_0) AS TEMPLATE(field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
@@ -1211,9 +1262,11 @@ mod test {
// },
// },
// },
async fn do_aggregate_expr_plan(name: &str) {
let prom_expr =
parser::parse(&format!("{name} by (tag_1)(some_metric{{tag_0!=\"bar\"}})",)).unwrap();
async fn do_aggregate_expr_plan(fn_name: &str, plan_name: &str) {
let prom_expr = parser::parse(&format!(
"{fn_name} by (tag_1)(some_metric{{tag_0!=\"bar\"}})",
))
.unwrap();
let mut eval_stmt = EvalStmt {
expr: prom_expr,
start: UNIX_EPOCH,
@@ -1236,7 +1289,7 @@ mod test {
\n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.tag_1 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
\n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
\n TableScan: some_metric, unsupported_filters=[tag_0 != Utf8(\"bar\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
).replace("TEMPLATE", name);
).replace("TEMPLATE", plan_name);
assert_eq!(
plan.display_indent_schema().to_string(),
expected_no_without
@@ -1259,75 +1312,74 @@ mod test {
\n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.tag_1 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
\n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
\n TableScan: some_metric, unsupported_filters=[tag_0 != Utf8(\"bar\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
).replace("TEMPLATE", name);
).replace("TEMPLATE", plan_name);
assert_eq!(plan.display_indent_schema().to_string(), expected_without);
}
#[tokio::test]
async fn aggregate_sum() {
do_aggregate_expr_plan("SUM").await;
do_aggregate_expr_plan("sum", "SUM").await;
}
#[tokio::test]
async fn aggregate_avg() {
do_aggregate_expr_plan("AVG").await;
do_aggregate_expr_plan("avg", "AVG").await;
}
#[tokio::test]
#[should_panic] // output type doesn't match
async fn aggregate_count() {
do_aggregate_expr_plan("COUNT").await;
do_aggregate_expr_plan("count", "COUNT").await;
}
#[tokio::test]
async fn aggregate_min() {
do_aggregate_expr_plan("MIN").await;
do_aggregate_expr_plan("min", "MIN").await;
}
#[tokio::test]
async fn aggregate_max() {
do_aggregate_expr_plan("MAX").await;
do_aggregate_expr_plan("max", "MAX").await;
}
#[tokio::test]
#[should_panic] // output type doesn't match
async fn aggregate_group() {
do_aggregate_expr_plan("GROUPING").await;
do_aggregate_expr_plan("grouping", "GROUPING").await;
}
#[tokio::test]
async fn aggregate_stddev() {
do_aggregate_expr_plan("STDDEV").await;
do_aggregate_expr_plan("stddev", "STDDEVPOP").await;
}
#[tokio::test]
#[should_panic] // schema doesn't match
async fn aggregate_stdvar() {
do_aggregate_expr_plan("STDVAR").await;
do_aggregate_expr_plan("stdvar", "VARIANCEPOP").await;
}
#[tokio::test]
#[should_panic]
async fn aggregate_top_k() {
do_aggregate_expr_plan("").await;
do_aggregate_expr_plan("topk", "").await;
}
#[tokio::test]
#[should_panic]
async fn aggregate_bottom_k() {
do_aggregate_expr_plan("").await;
do_aggregate_expr_plan("bottomk", "").await;
}
#[tokio::test]
#[should_panic]
async fn aggregate_count_values() {
do_aggregate_expr_plan("").await;
do_aggregate_expr_plan("count_values", "").await;
}
#[tokio::test]
#[should_panic]
async fn aggregate_quantile() {
do_aggregate_expr_plan("").await;
do_aggregate_expr_plan("quantile", "").await;
}
// TODO(ruihang): add range fn tests once exprs are ready.
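
Background for the stddev/stdvar mapping changed above (not stated in the diff): PromQL's stddev and stdvar are population aggregations, so DataFusion's StddevPop/VariancePop are the matching functions. A minimal sketch of the population variant, which divides by n rather than n - 1:

    // Population variance: divide by n, not n - 1 (the sample variant).
    fn variance_pop(xs: &[f64]) -> f64 {
        let n = xs.len() as f64;
        let mean = xs.iter().sum::<f64>() / n;
        xs.iter().map(|x| (x - mean).powi(2)).sum::<f64>() / n
    }

    fn main() {
        let v = variance_pop(&[1.0, 2.0, 3.0, 4.0]);
        assert!((v - 1.25).abs() < 1e-9); // the sample variance would be ~1.667
    }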
@@ -1475,6 +1527,24 @@ mod test {
indie_query_plan_compare(query, expected).await;
}
#[tokio::test]
async fn increase_aggr() {
let query = "increase(some_metric[5m])";
let expected = String::from(
"Filter: prom_increase(timestamp_range,field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
\n Projection: some_metric.timestamp, prom_increase(timestamp_range, field_0) AS prom_increase(timestamp_range,field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
\n PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
\n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: some_metric, unsupported_filters=[timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
);
indie_query_plan_compare(query, expected).await;
}
#[tokio::test]
async fn less_filter_on_value() {
let query = "some_metric < 1.2345";
@@ -1486,6 +1556,7 @@ mod test {
\n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: some_metric, unsupported_filters=[timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
);
indie_query_plan_compare(query, expected).await;

View File

@@ -783,6 +783,7 @@ pub fn pyobj_try_to_typed_val(
}
}
ConcreteDataType::List(_) => unreachable!(),
ConcreteDataType::Dictionary(_) => unreachable!(),
ConcreteDataType::Date(_)
| ConcreteDataType::DateTime(_)
| ConcreteDataType::Timestamp(_) => {

View File

@@ -285,7 +285,7 @@ fn type_gt_to_pg(origin: &ConcreteDataType) -> Result<Type> {
&ConcreteDataType::Date(_) => Ok(Type::DATE),
&ConcreteDataType::DateTime(_) => Ok(Type::TIMESTAMP),
&ConcreteDataType::Timestamp(_) => Ok(Type::TIMESTAMP),
&ConcreteDataType::List(_) => error::InternalSnafu {
&ConcreteDataType::List(_) | &ConcreteDataType::Dictionary(_) => error::InternalSnafu {
err_msg: format!("not implemented for column datatype {origin:?}"),
}
.fail(),