From 4cc3ac37d55631792bb4f33302c3677b4edf9af4 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 23 Feb 2023 20:31:07 +0800 Subject: [PATCH] feat: add DictionaryVector DataType (#1061) * fix stddev and stdvar. try build range function expr Signed-off-by: Ruihang Xia * feat: add dictionary data type Signed-off-by: Ruihang Xia * preserve timestamp column in range manipulator Signed-off-by: Ruihang Xia * plan range functions Signed-off-by: Ruihang Xia * update test result Signed-off-by: Ruihang Xia * fix typo Signed-off-by: Ruihang Xia * update test result Signed-off-by: Ruihang Xia * update test result Signed-off-by: Ruihang Xia * resolve CR comments Signed-off-by: Ruihang Xia * resolve CR comments Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- src/api/src/helper.rs | 4 +- src/common/grpc-expr/src/insert.rs | 5 +- src/common/grpc/src/select.rs | 2 +- src/common/substrait/src/types.rs | 4 +- src/datanode/src/tests/promql_test.rs | 51 +++--- src/datatypes/src/data_type.rs | 21 ++- src/datatypes/src/type_id.rs | 5 + src/datatypes/src/types.rs | 3 +- src/datatypes/src/types/dictionary_type.rs | 91 +++++++++++ src/datatypes/src/value.rs | 15 ++ src/datatypes/src/vectors/eq.rs | 2 +- .../src/extension_plan/range_manipulate.rs | 103 ++++++++++--- src/promql/src/functions/idelta.rs | 31 ++-- src/promql/src/functions/increase.rs | 8 +- src/promql/src/planner.rs | 145 +++++++++++++----- src/script/src/python/vector.rs | 1 + src/servers/src/postgres/handler.rs | 2 +- 17 files changed, 376 insertions(+), 117 deletions(-) create mode 100644 src/datatypes/src/types/dictionary_type.rs diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index 6eea325037..4409801edc 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -97,7 +97,9 @@ impl TryFrom for ColumnDataTypeWrapper { TimestampType::Microsecond(_) => ColumnDataType::TimestampMicrosecond, TimestampType::Nanosecond(_) => ColumnDataType::TimestampNanosecond, }, - ConcreteDataType::Null(_) | ConcreteDataType::List(_) => { + ConcreteDataType::Null(_) + | ConcreteDataType::List(_) + | ConcreteDataType::Dictionary(_) => { return error::IntoColumnDataTypeSnafu { from: datatype }.fail() } }); diff --git a/src/common/grpc-expr/src/insert.rs b/src/common/grpc-expr/src/insert.rs index 6053b8f996..559c3cc8b3 100644 --- a/src/common/grpc-expr/src/insert.rs +++ b/src/common/grpc-expr/src/insert.rs @@ -419,8 +419,9 @@ fn convert_values(data_type: &ConcreteDataType, values: Values) -> Vec { .into_iter() .map(|v| Value::Timestamp(Timestamp::new_millisecond(v))) .collect(), - ConcreteDataType::Null(_) => unreachable!(), - ConcreteDataType::List(_) => unreachable!(), + ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) => { + unreachable!() + } } } diff --git a/src/common/grpc/src/select.rs b/src/common/grpc/src/select.rs index 010630b635..55e15adf77 100644 --- a/src/common/grpc/src/select.rs +++ b/src/common/grpc/src/select.rs @@ -67,7 +67,7 @@ macro_rules! 
convert_arrow_array_to_grpc_vals { return Ok(vals); }, )+ - ConcreteDataType::Null(_) | ConcreteDataType::List(_) => unreachable!("Should not send {:?} in gRPC", $data_type), + ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) => unreachable!("Should not send {:?} in gRPC", $data_type), } }}; } diff --git a/src/common/substrait/src/types.rs b/src/common/substrait/src/types.rs index c390ad4eb9..35d7a5e39b 100644 --- a/src/common/substrait/src/types.rs +++ b/src/common/substrait/src/types.rs @@ -131,7 +131,9 @@ pub fn from_concrete_type(ty: ConcreteDataType, nullability: Option) -> Re ConcreteDataType::Timestamp(_) => { build_substrait_kind!(Timestamp, Timestamp, nullability, 0) } - ConcreteDataType::List(_) => UnsupportedConcreteTypeSnafu { ty }.fail()?, + ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) => { + UnsupportedConcreteTypeSnafu { ty }.fail()? + } }; Ok(SType { kind }) diff --git a/src/datanode/src/tests/promql_test.rs b/src/datanode/src/tests/promql_test.rs index b64b08896d..a62b21e7c8 100644 --- a/src/datanode/src/tests/promql_test.rs +++ b/src/datanode/src/tests/promql_test.rs @@ -103,21 +103,22 @@ async fn sql_insert_tql_query_ceil() { ('host1', 49, 3333.3, 99000); "#, "TQL EVAL (0,100,10) ceil(http_requests_total{host=\"host1\"})", - "+---------------------+-------------------------------+----------------------------------+-------+\ - \n| ts | ceil(http_requests_total.cpu) | ceil(http_requests_total.memory) | host |\ - \n+---------------------+-------------------------------+----------------------------------+-------+\ - \n| 1970-01-01T00:00:00 | 67 | 1024 | host1 |\ - \n| 1970-01-01T00:00:10 | 100 | 20480 | host1 |\ - \n| 1970-01-01T00:00:20 | 100 | 20480 | host1 |\ - \n| 1970-01-01T00:00:30 | 32 | 8192 | host1 |\ - \n| 1970-01-01T00:00:40 | 96 | 334 | host1 |\ - \n| 1970-01-01T00:00:50 | 12424 | 1334 | host1 |\ - \n| 1970-01-01T00:01:00 | 12424 | 1334 | host1 |\ - \n| 1970-01-01T00:01:10 | 12424 | 1334 | host1 |\ - \n| 1970-01-01T00:01:20 | 0 | 2334 | host1 |\ - \n| 1970-01-01T00:01:30 | 0 | 2334 | host1 |\ - \n| 1970-01-01T00:01:40 | 49 | 3334 | host1 |\ - \n+---------------------+-------------------------------+----------------------------------+-------+") + "+---------------------+-----------+--------------+-------+\ + \n| ts | ceil(cpu) | ceil(memory) | host |\ + \n+---------------------+-----------+--------------+-------+\ + \n| 1970-01-01T00:00:00 | 67 | 1024 | host1 |\ + \n| 1970-01-01T00:00:10 | 100 | 20480 | host1 |\ + \n| 1970-01-01T00:00:20 | 100 | 20480 | host1 |\ + \n| 1970-01-01T00:00:30 | 32 | 8192 | host1 |\ + \n| 1970-01-01T00:00:40 | 96 | 334 | host1 |\ + \n| 1970-01-01T00:00:50 | 12424 | 1334 | host1 |\ + \n| 1970-01-01T00:01:00 | 12424 | 1334 | host1 |\ + \n| 1970-01-01T00:01:10 | 12424 | 1334 | host1 |\ + \n| 1970-01-01T00:01:20 | 0 | 2334 | host1 |\ + \n| 1970-01-01T00:01:30 | 0 | 2334 | host1 |\ + \n| 1970-01-01T00:01:40 | 49 | 3334 | host1 |\ + \n+---------------------+-----------+--------------+-------+", + ) .await; } @@ -150,16 +151,16 @@ async fn sql_insert_promql_query_ceil() { UNIX_EPOCH.checked_add(Duration::from_secs(100)).unwrap(), Duration::from_secs(5), Duration::from_secs(1), - "+---------------------+-------------------------------+----------------------------------+-------+\ - \n| ts | ceil(http_requests_total.cpu) | ceil(http_requests_total.memory) | host |\ - \n+---------------------+-------------------------------+----------------------------------+-------+\ - \n| 1970-01-01T00:00:00 
| 67 | 1024 | host1 |\ - \n| 1970-01-01T00:00:05 | 67 | 4096 | host1 |\ - \n| 1970-01-01T00:00:10 | 100 | 20480 | host1 |\ - \n| 1970-01-01T00:01:20 | 0 | 2334 | host1 |\ - \n| 1970-01-01T00:01:40 | 49 | 3334 | host1 |\ - \n| 1970-01-01T00:00:50 | 12424 | 1334 | host1 |\ - \n+---------------------+-------------------------------+----------------------------------+-------+" + "+---------------------+-----------+--------------+-------+\ + \n| ts | ceil(cpu) | ceil(memory) | host |\ + \n+---------------------+-----------+--------------+-------+\ + \n| 1970-01-01T00:00:00 | 67 | 1024 | host1 |\ + \n| 1970-01-01T00:00:05 | 67 | 4096 | host1 |\ + \n| 1970-01-01T00:00:10 | 100 | 20480 | host1 |\ + \n| 1970-01-01T00:00:50 | 12424 | 1334 | host1 |\ + \n| 1970-01-01T00:01:20 | 0 | 2334 | host1 |\ + \n| 1970-01-01T00:01:40 | 49 | 3334 | host1 |\ + \n+---------------------+-----------+--------------+-------+", ) .await; } diff --git a/src/datatypes/src/data_type.rs b/src/datatypes/src/data_type.rs index 0f0f971894..33d74daefe 100644 --- a/src/datatypes/src/data_type.rs +++ b/src/datatypes/src/data_type.rs @@ -22,10 +22,10 @@ use serde::{Deserialize, Serialize}; use crate::error::{self, Error, Result}; use crate::type_id::LogicalTypeId; use crate::types::{ - BinaryType, BooleanType, DateTimeType, DateType, Float32Type, Float64Type, Int16Type, - Int32Type, Int64Type, Int8Type, ListType, NullType, StringType, TimestampMicrosecondType, - TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, TimestampType, - UInt16Type, UInt32Type, UInt64Type, UInt8Type, + BinaryType, BooleanType, DateTimeType, DateType, DictionaryType, Float32Type, Float64Type, + Int16Type, Int32Type, Int64Type, Int8Type, ListType, NullType, StringType, + TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, + TimestampSecondType, TimestampType, UInt16Type, UInt32Type, UInt64Type, UInt8Type, }; use crate::value::Value; use crate::vectors::MutableVector; @@ -59,6 +59,7 @@ pub enum ConcreteDataType { // Compound types: List(ListType), + Dictionary(DictionaryType), } // TODO(yingwen): Refactor these `is_xxx()` methods, such as adding a `properties()` method @@ -169,6 +170,11 @@ impl TryFrom<&ArrowDataType> for ConcreteDataType { ArrowDataType::List(field) => Self::List(ListType::new( ConcreteDataType::from_arrow_type(field.data_type()), )), + ArrowDataType::Dictionary(key_type, value_type) => { + let key_type = ConcreteDataType::from_arrow_type(key_type); + let value_type = ConcreteDataType::from_arrow_type(value_type); + Self::Dictionary(DictionaryType::new(key_type, value_type)) + } _ => { return error::UnsupportedArrowTypeSnafu { arrow_type: dt.clone(), @@ -243,6 +249,13 @@ impl ConcreteDataType { pub fn list_datatype(item_type: ConcreteDataType) -> ConcreteDataType { ConcreteDataType::List(ListType::new(item_type)) } + + pub fn dictionary_datatype( + key_type: ConcreteDataType, + value_type: ConcreteDataType, + ) -> ConcreteDataType { + ConcreteDataType::Dictionary(DictionaryType::new(key_type, value_type)) + } } /// Data type abstraction. 
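The data_type.rs hunks above are symmetric: `dictionary_datatype()` wraps a key/value pair into `ConcreteDataType::Dictionary`, `as_arrow_type()` lowers it to Arrow's `Dictionary(key, value)` physical type, and the new `TryFrom<&ArrowDataType>` arm lifts it back. A minimal round-trip sketch, not taken from the patch itself; it assumes the crate's usual `int64_datatype()`/`string_datatype()` constructors and an `arrow` dependency in scope:

    use arrow::datatypes::DataType as ArrowDataType;
    use datatypes::data_type::{ConcreteDataType, DataType};

    fn dictionary_round_trip() {
        // Build Dictionary<Int64, String> through the new constructor.
        let dict = ConcreteDataType::dictionary_datatype(
            ConcreteDataType::int64_datatype(),
            ConcreteDataType::string_datatype(),
        );
        // Lower to the Arrow physical type.
        let arrow_type = dict.as_arrow_type();
        assert!(matches!(arrow_type, ArrowDataType::Dictionary(_, _)));
        // Lift back through the TryFrom<&ArrowDataType> arm added above.
        let recovered = ConcreteDataType::try_from(&arrow_type).unwrap();
        assert_eq!(recovered, dict);
    }
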
diff --git a/src/datatypes/src/type_id.rs b/src/datatypes/src/type_id.rs index 5548d95147..77718af6c6 100644 --- a/src/datatypes/src/type_id.rs +++ b/src/datatypes/src/type_id.rs @@ -48,6 +48,7 @@ pub enum LogicalTypeId { TimestampNanosecond, List, + Dictionary, } impl LogicalTypeId { @@ -88,6 +89,10 @@ impl LogicalTypeId { LogicalTypeId::List => { ConcreteDataType::list_datatype(ConcreteDataType::null_datatype()) } + LogicalTypeId::Dictionary => ConcreteDataType::dictionary_datatype( + ConcreteDataType::null_datatype(), + ConcreteDataType::null_datatype(), + ), } } } diff --git a/src/datatypes/src/types.rs b/src/datatypes/src/types.rs index 953baa617a..dcb7a9fdb4 100644 --- a/src/datatypes/src/types.rs +++ b/src/datatypes/src/types.rs @@ -16,17 +16,18 @@ mod binary_type; mod boolean_type; mod date_type; mod datetime_type; +mod dictionary_type; mod list_type; mod null_type; mod primitive_type; mod string_type; - mod timestamp_type; pub use binary_type::BinaryType; pub use boolean_type::BooleanType; pub use date_type::DateType; pub use datetime_type::DateTimeType; +pub use dictionary_type::DictionaryType; pub use list_type::ListType; pub use null_type::NullType; pub use primitive_type::{ diff --git a/src/datatypes/src/types/dictionary_type.rs b/src/datatypes/src/types/dictionary_type.rs new file mode 100644 index 0000000000..89aed5e051 --- /dev/null +++ b/src/datatypes/src/types/dictionary_type.rs @@ -0,0 +1,91 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use arrow::datatypes::DataType as ArrowDataType; +use serde::{Deserialize, Serialize}; + +use crate::data_type::{ConcreteDataType, DataType}; +use crate::type_id::LogicalTypeId; +use crate::value::Value; +use crate::vectors::MutableVector; + +/// Used to represent the Dictionary datatype. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct DictionaryType { + // Use Box to avoid recursive dependency, as enum ConcreteDataType depends on DictionaryType. + /// The type of Dictionary key. + key_type: Box<ConcreteDataType>, + /// The type of Dictionary value. + value_type: Box<ConcreteDataType>, } + +impl Default for DictionaryType { + fn default() -> Self { + DictionaryType::new( + ConcreteDataType::null_datatype(), + ConcreteDataType::null_datatype(), + ) + } } + +impl DictionaryType { + /// Creates a new `DictionaryType` with the given key and value data types. + pub fn new(key_type: ConcreteDataType, value_type: ConcreteDataType) -> Self { + DictionaryType { + key_type: Box::new(key_type), + value_type: Box::new(value_type), + } + } + + /// Returns the key data type. + #[inline] + pub fn key_type(&self) -> &ConcreteDataType { + &self.key_type + } + + /// Returns the value data type.
+ #[inline] + pub fn value_type(&self) -> &ConcreteDataType { + &self.value_type + } +} + +impl DataType for DictionaryType { + fn name(&self) -> &str { + "Dictionary" + } + + fn logical_type_id(&self) -> LogicalTypeId { + LogicalTypeId::Dictionary + } + + fn default_value(&self) -> Value { + unimplemented!() + } + + fn as_arrow_type(&self) -> ArrowDataType { + ArrowDataType::Dictionary( + Box::new(self.key_type.as_arrow_type()), + Box::new(self.value_type.as_arrow_type()), + ) + } + + fn create_mutable_vector(&self, _capacity: usize) -> Box<dyn MutableVector> { + unimplemented!() + } + + fn is_timestamp_compatible(&self) -> bool { + false + } +} diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs index c1a3414f3c..7435fb6add 100644 --- a/src/datatypes/src/value.rs +++ b/src/datatypes/src/value.rs @@ -273,6 +273,10 @@ fn to_null_value(output_type: &ConcreteDataType) -> ScalarValue { ConcreteDataType::List(_) => { ScalarValue::List(None, Box::new(new_item_field(output_type.as_arrow_type()))) } + ConcreteDataType::Dictionary(dict) => ScalarValue::Dictionary( + Box::new(dict.key_type().as_arrow_type()), + Box::new(to_null_value(dict.value_type())), + ), } } @@ -513,6 +517,17 @@ impl Ord for ListValue { } } +// TODO(ruihang): Implement this type +/// Dictionary value. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct DictionaryValue { + /// Inner key and value data types + key_type: ConcreteDataType, + value_type: ConcreteDataType, +} + +impl Eq for DictionaryValue {} + impl TryFrom<ScalarValue> for Value { type Error = error::Error; diff --git a/src/datatypes/src/vectors/eq.rs b/src/datatypes/src/vectors/eq.rs index 5f532622a3..5c582ecf69 100644 --- a/src/datatypes/src/vectors/eq.rs +++ b/src/datatypes/src/vectors/eq.rs @@ -95,7 +95,7 @@ fn equal(lhs: &dyn Vector, rhs: &dyn Vector) -> bool { }, List(_) => is_vector_eq!(ListVector, lhs, rhs), UInt8(_) | UInt16(_) | UInt32(_) | UInt64(_) | Int8(_) | Int16(_) | Int32(_) | Int64(_) - | Float32(_) | Float64(_) => { + | Float32(_) | Float64(_) | Dictionary(_) => { with_match_primitive_type_id!(lhs_type.logical_type_id(), |$T| { let lhs = lhs.as_any().downcast_ref::<PrimitiveVector<$T>>().unwrap(); let rhs = rhs.as_any().downcast_ref::<PrimitiveVector<$T>>().unwrap(); diff --git a/src/promql/src/extension_plan/range_manipulate.rs b/src/promql/src/extension_plan/range_manipulate.rs index fdbbfb78c2..22b162d7fd 100644 --- a/src/promql/src/extension_plan/range_manipulate.rs +++ b/src/promql/src/extension_plan/range_manipulate.rs @@ -18,7 +18,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use datafusion::arrow::array::{Array, Int64Array, TimestampMillisecondArray}; +use datafusion::arrow::array::{Array, ArrayRef, Int64Array, TimestampMillisecondArray}; use datafusion::arrow::compute; use datafusion::arrow::datatypes::SchemaRef; use datafusion::arrow::error::ArrowError; @@ -80,6 +80,14 @@ impl RangeManipulate { }) } + pub fn range_timestamp_name(&self) -> String { + Self::build_timestamp_range_name(&self.time_index) + } + + pub fn build_timestamp_range_name(time_index: &str) -> String { + format!("{time_index}_range") + } + fn calculate_output_schema( input_schema: &DFSchemaRef, time_index: &str, @@ -88,8 +96,15 @@ impl RangeManipulate { let mut columns = input_schema.fields().clone(); // process time index column + // the raw timestamp field is preserved, and a new timestamp_range field is appended at the end.
let index = input_schema.index_of_column_by_name(None, time_index)?; - columns[index] = DFField::from(RangeArray::convert_field(columns[index].field())); + let timestamp_range_field = columns[index] + .field() + .clone() + .with_name(Self::build_timestamp_range_name(time_index)); + columns.push(DFField::from(RangeArray::convert_field( + &timestamp_range_field, + ))); // process value columns for name in value_columns { @@ -110,6 +125,7 @@ impl RangeManipulate { interval: self.interval, range: self.range, time_index_column: self.time_index.clone(), + time_range_column: self.range_timestamp_name(), value_columns: self.value_columns.clone(), input: exec_input, output_schema: SchemaRef::new(self.output_schema.as_ref().into()), @@ -170,6 +186,7 @@ pub struct RangeManipulateExec { interval: Millisecond, range: Millisecond, time_index_column: String, + time_range_column: String, value_columns: Vec<String>, input: Arc<dyn ExecutionPlan>, @@ -213,6 +230,7 @@ impl ExecutionPlan for RangeManipulateExec { interval: self.interval, range: self.range, time_index_column: self.time_index_column.clone(), + time_range_column: self.time_range_column.clone(), value_columns: self.value_columns.clone(), output_schema: self.output_schema.clone(), input: children[0].clone(), @@ -333,10 +351,11 @@ impl RangeManipulateStream { pub fn manipulate(&self, input: RecordBatch) -> ArrowResult<RecordBatch> { let mut other_columns = (0..input.columns().len()).collect::<HashSet<_>>(); // calculate the range - let ranges = self.calculate_range(&input); + let (aligned_ts, ranges) = self.calculate_range(&input); + // transform columns let mut new_columns = input.columns().to_vec(); - for index in self.value_columns.iter().chain([self.time_index].iter()) { + for index in self.value_columns.iter() { other_columns.remove(index); let column = input.column(*index); let new_column = Arc::new( @@ -347,26 +366,37 @@ impl RangeManipulateStream { new_columns[*index] = new_column; } + // push timestamp range column + let ts_range_column = + RangeArray::from_ranges(input.column(self.time_index).clone(), ranges.clone()) + .map_err(|e| ArrowError::InvalidArgumentError(e.to_string()))? + .into_dict(); + new_columns.push(Arc::new(ts_range_column)); + // truncate other columns let take_indices = Int64Array::from(vec![0; ranges.len()]); for index in other_columns.into_iter() { new_columns[index] = compute::take(&input.column(index), &take_indices, None)?; } + // replace timestamp with the aligned one + new_columns[self.time_index] = aligned_ts; RecordBatch::try_new(self.output_schema.clone(), new_columns) } - fn calculate_range(&self, input: &RecordBatch) -> Vec<(u32, u32)> { + fn calculate_range(&self, input: &RecordBatch) -> (ArrayRef, Vec<(u32, u32)>) { let ts_column = input .column(self.time_index) .as_any() .downcast_ref::<TimestampMillisecondArray>() .unwrap(); - let mut result = vec![]; + let mut aligned_ts = vec![]; + let mut ranges = vec![]; // calculate for every aligned timestamp (`curr_ts`), assuming the ts column is ordered.
for curr_ts in (self.start..=self.end).step_by(self.interval as _) { + aligned_ts.push(curr_ts); let mut range_start = ts_column.len(); let mut range_end = 0; for (index, ts) in ts_column.values().iter().enumerate() { @@ -380,13 +410,15 @@ impl RangeManipulateStream { } } if range_start > range_end { - result.push((0, 0)); + ranges.push((0, 0)); } else { - result.push((range_start as _, (range_end + 1 - range_start) as _)); + ranges.push((range_start as _, (range_end + 1 - range_start) as _)); } } - result + let aligned_ts_array = Arc::new(TimestampMillisecondArray::from(aligned_ts)) as _; + + (aligned_ts_array, ranges) } } @@ -461,6 +493,7 @@ mod test { range, value_columns, output_schema: manipulate_output_schema, + time_range_column: RangeManipulate::build_timestamp_range_name(&time_index), time_index_column: time_index, input: memory_exec, metric: ExecutionPlanMetricsSet::new(), @@ -499,32 +532,52 @@ mod test { #[tokio::test] async fn interval_30s_range_90s() { let expected = String::from( - "RangeArray { \ + "PrimitiveArray\n[\n \ + 1970-01-01T00:00:00,\n \ + 1970-01-01T00:00:30,\n \ + 1970-01-01T00:01:00,\n \ + 1970-01-01T00:01:30,\n \ + 1970-01-01T00:02:00,\n \ + 1970-01-01T00:02:30,\n \ + 1970-01-01T00:03:00,\n \ + 1970-01-01T00:03:30,\n \ + 1970-01-01T00:04:00,\n \ + 1970-01-01T00:04:30,\n \ + 1970-01-01T00:05:00,\n\ + ]\nRangeArray { \ + base array: PrimitiveArray\n[\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n], \ + ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \ + }\nRangeArray { \ + base array: PrimitiveArray\n[\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n], \ + ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \ + }\nStringArray\n[\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n]\n\ + RangeArray { \ base array: PrimitiveArray\n[\n 1970-01-01T00:00:00,\n 1970-01-01T00:00:30,\n 1970-01-01T00:01:00,\n 1970-01-01T00:01:30,\n 1970-01-01T00:02:00,\n 1970-01-01T00:03:00,\n 1970-01-01T00:04:00,\n 1970-01-01T00:04:01,\n 1970-01-01T00:04:31,\n 1970-01-01T00:04:51,\n], \ ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \ - }\nRangeArray { \ - base array: PrimitiveArray\n[\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n], \ - ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \ - }\nRangeArray { \ - base array: PrimitiveArray\n[\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n], \ - ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \ - }\nStringArray\n[\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n]"); + }", +); do_normalize_test(0, 310_000, 30_000, 90_000, expected).await; } #[tokio::test] async fn small_empty_range() { let expected = String::from( - "RangeArray { \ + "PrimitiveArray\n[\n \ + 1970-01-01T00:00:00.001,\n \ + 1970-01-01T00:00:03.001,\n \ + 1970-01-01T00:00:06.001,\n \ + 1970-01-01T00:00:09.001,\n\ + ]\nRangeArray { \ + base array: PrimitiveArray\n[\n 1.0,\n 1.0,\n 
1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n], \ + ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \ + }\nRangeArray { \ + base array: PrimitiveArray\n[\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n], \ + ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \ + }\nStringArray\n[\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n]\n\ + RangeArray { \ base array: PrimitiveArray\n[\n 1970-01-01T00:00:00,\n 1970-01-01T00:00:30,\n 1970-01-01T00:01:00,\n 1970-01-01T00:01:30,\n 1970-01-01T00:02:00,\n 1970-01-01T00:03:00,\n 1970-01-01T00:04:00,\n 1970-01-01T00:04:01,\n 1970-01-01T00:04:31,\n 1970-01-01T00:04:51,\n], \ ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \ - }\nRangeArray { \ - base array: PrimitiveArray\n[\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n], \ - ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \ - }\nRangeArray { \ - base array: PrimitiveArray\n[\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n], \ - ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \ - }\nStringArray\n[\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n]"); + }"); do_normalize_test(1, 10_001, 3_000, 1_000, expected).await; } } diff --git a/src/promql/src/functions/idelta.rs b/src/promql/src/functions/idelta.rs index 1a12cbff38..6d138dbb99 100644 --- a/src/promql/src/functions/idelta.rs +++ b/src/promql/src/functions/idelta.rs @@ -15,11 +15,12 @@ use std::fmt::Display; use std::sync::Arc; -use datafusion::arrow::array::{Float64Array, Int64Array}; +use datafusion::arrow::array::{Float64Array, TimestampMillisecondArray}; +use datafusion::arrow::datatypes::TimeUnit; use datafusion::common::DataFusionError; use datafusion::logical_expr::{ScalarUDF, Signature, TypeSignature, Volatility}; use datafusion::physical_plan::ColumnarValue; -use datatypes::arrow::array::{Array, PrimitiveArray}; +use datatypes::arrow::array::Array; use datatypes::arrow::datatypes::DataType; use crate::error; @@ -55,7 +56,7 @@ impl IDelta { // time index column and value column fn input_type() -> Vec { vec![ - RangeArray::convert_data_type(DataType::Int64), + RangeArray::convert_data_type(DataType::Timestamp(TimeUnit::Millisecond, None)), RangeArray::convert_data_type(DataType::Float64), ] } @@ -82,9 +83,9 @@ impl IDelta { )), )?; error::ensure( - ts_range.value_type() == DataType::Int64, + ts_range.value_type() == DataType::Timestamp(TimeUnit::Millisecond, None), DataFusionError::Execution(format!( - "{}: expect Int64 as time index array's type, found {}", + "{}: expect TimestampMillisecond as time index array's type, found {}", Self::name(), ts_range.value_type() )), @@ -92,7 +93,7 @@ impl IDelta { error::ensure( value_range.value_type() == DataType::Float64, DataFusionError::Execution(format!( - "{}: expect Int64 as time index array's type, found {}", + "{}: expect Float64 as value array's type, found {}", Self::name(), value_range.value_type() )), @@ -105,7 +106,7 @@ impl IDelta { let timestamps = ts_range.get(index).unwrap(); let timestamps = timestamps .as_any() - .downcast_ref::() + .downcast_ref::() .unwrap() .values(); @@ -127,13 +128,13 @@ impl IDelta { let len = timestamps.len(); if len < 2 { - result_array.push(0.0); + result_array.push(None); continue; } // if is delta if !IS_RATE { - result_array.push(values[len - 1] - values[len - 2]); + result_array.push(Some(values[len - 1] - values[len - 2])); continue; } @@ -150,10 +151,10 @@ impl IDelta { last_value - prev_value }; - result_array.push(result_value / 
sampled_interval as f64); + result_array.push(Some(result_value / sampled_interval as f64)); } - let result = ColumnarValue::Array(Arc::new(PrimitiveArray::from_iter(result_array))); + let result = ColumnarValue::Array(Arc::new(Float64Array::from_iter(result_array))); Ok(result) } } @@ -201,9 +202,11 @@ mod test { #[test] fn basic_idelta_and_irate() { - let ts_array = Arc::new(Int64Array::from_iter([ - 1000, 3000, 5000, 7000, 9000, 11000, 13000, 15000, 17000, - ])); + let ts_array = Arc::new(TimestampMillisecondArray::from_iter( + [1000i64, 3000, 5000, 7000, 9000, 11000, 13000, 15000, 17000] + .into_iter() + .map(Some), + )); let ts_ranges = [(0, 2), (0, 5), (1, 1), (3, 3), (8, 1), (9, 0)]; let values_array = Arc::new(Float64Array::from_iter([ diff --git a/src/promql/src/functions/increase.rs b/src/promql/src/functions/increase.rs index 1c3dede2ca..c0c087584e 100644 --- a/src/promql/src/functions/increase.rs +++ b/src/promql/src/functions/increase.rs @@ -19,7 +19,7 @@ use datafusion::arrow::array::Float64Array; use datafusion::common::DataFusionError; use datafusion::logical_expr::{ScalarUDF, Signature, TypeSignature, Volatility}; use datafusion::physical_plan::ColumnarValue; -use datatypes::arrow::array::{Array, PrimitiveArray}; +use datatypes::arrow::array::Array; use datatypes::arrow::datatypes::DataType; use crate::functions::extract_array; @@ -61,7 +61,7 @@ impl Increase { .values(); if range.len() < 2 { - result_array.push(0.0); + result_array.push(None); continue; } @@ -75,10 +75,10 @@ impl Increase { } } - result_array.push(result_value); + result_array.push(Some(result_value)); } - let result = ColumnarValue::Array(Arc::new(PrimitiveArray::from_iter(result_array))); + let result = ColumnarValue::Array(Arc::new(Float64Array::from_iter(result_array))); Ok(result) } diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs index 33ded08911..e42d8ef364 100644 --- a/src/promql/src/planner.rs +++ b/src/promql/src/planner.rs @@ -23,7 +23,7 @@ use datafusion::logical_expr::expr::AggregateFunction; use datafusion::logical_expr::expr_rewriter::normalize_cols; use datafusion::logical_expr::{ AggregateFunction as AggregateFunctionEnum, BinaryExpr, BuiltinScalarFunction, Cast, Extension, - LogicalPlan, LogicalPlanBuilder, Operator, + LogicalPlan, LogicalPlanBuilder, Operator, ScalarUDF, }; use datafusion::optimizer::utils; use datafusion::prelude::{Column, Expr as DfExpr, JoinType}; @@ -48,6 +48,7 @@ use crate::error::{ use crate::extension_plan::{ InstantManipulate, Millisecond, RangeManipulate, SeriesDivide, SeriesNormalize, }; +use crate::functions::{IDelta, Increase}; const LEFT_PLAN_JOIN_ALIAS: &str = "lhs"; @@ -603,8 +604,11 @@ impl PromPlanner { Ok(result) } + /// # Side Effects + /// + /// This method will update [PromPlannerContext]'s value fields. 
fn create_function_expr( - &self, + &mut self, func: &Function, mut other_input_exprs: Vec<DfExpr>, ) -> Result<Vec<DfExpr>> { @@ -612,26 +616,67 @@ impl PromPlanner { // TODO(ruihang): set this according to in-param list let value_column_pos = 0; - let scalar_func = BuiltinScalarFunction::from_str(func.name).map_err(|_| { - UnsupportedExprSnafu { - name: func.name.to_string(), - } - .build() - })?; + let scalar_func = match func.name { + "increase" => ScalarFunc::Udf(Increase::scalar_udf()), + "idelta" => ScalarFunc::Udf(IDelta::<false>::scalar_udf()), + "irate" => ScalarFunc::Udf(IDelta::<true>::scalar_udf()), + _ => ScalarFunc::DataFusionBuiltin( + BuiltinScalarFunction::from_str(func.name).map_err(|_| { + UnsupportedExprSnafu { + name: func.name.to_string(), + } + .build() + })?, + ), + }; // TODO(ruihang): handle those functions that don't require input let mut exprs = Vec::with_capacity(self.ctx.value_columns.len()); for value in &self.ctx.value_columns { let col_expr = DfExpr::Column(Column::from_name(value)); - other_input_exprs.insert(value_column_pos, col_expr); - let fn_expr = DfExpr::ScalarFunction { - fun: scalar_func.clone(), - args: other_input_exprs.clone(), - }; - exprs.push(fn_expr); - other_input_exprs.remove(value_column_pos); + + match scalar_func.clone() { + ScalarFunc::DataFusionBuiltin(fun) => { + other_input_exprs.insert(value_column_pos, col_expr); + let fn_expr = DfExpr::ScalarFunction { + fun, + args: other_input_exprs.clone(), + }; + exprs.push(fn_expr); + other_input_exprs.remove(value_column_pos); + } + ScalarFunc::Udf(fun) => { + let ts_range_expr = DfExpr::Column(Column::from_name( + RangeManipulate::build_timestamp_range_name( + self.ctx.time_index_column.as_ref().unwrap(), + ), + )); + other_input_exprs.insert(value_column_pos, ts_range_expr); + other_input_exprs.insert(value_column_pos + 1, col_expr); + let fn_expr = DfExpr::ScalarUDF { + fun: Arc::new(fun), + args: other_input_exprs.clone(), + }; + exprs.push(fn_expr); + other_input_exprs.remove(value_column_pos + 1); + other_input_exprs.remove(value_column_pos); + } + } } + // update value columns' names, and alias them to remove qualifiers + let mut new_value_columns = Vec::with_capacity(exprs.len()); + exprs = exprs + .into_iter() + .map(|expr| { + let display_name = expr.display_name()?; + new_value_columns.push(display_name.clone()); + Ok(expr.alias(display_name)) + }) + .collect::<Result<Vec<_>, _>>() + .context(DataFusionPlanningSnafu)?; + self.ctx.value_columns = new_value_columns; + Ok(exprs) } @@ -694,8 +739,8 @@ token::T_MIN => AggregateFunctionEnum::Min, token::T_MAX => AggregateFunctionEnum::Max, token::T_GROUP => AggregateFunctionEnum::Grouping, - token::T_STDDEV => AggregateFunctionEnum::Stddev, - token::T_STDVAR => AggregateFunctionEnum::Variance, + token::T_STDDEV => AggregateFunctionEnum::StddevPop, + token::T_STDVAR => AggregateFunctionEnum::VariancePop, token::T_TOPK | token::T_BOTTOMK | token::T_COUNT_VALUES | token::T_QUANTILE => { UnsupportedExprSnafu { name: format!("{op:?}"), @@ -919,6 +964,12 @@ struct FunctionArgs { literals: Vec<DfExpr>, } +#[derive(Debug, Clone)] +enum ScalarFunc { + DataFusionBuiltin(BuiltinScalarFunction), + Udf(ScalarUDF), +} + #[cfg(test)] mod test { use std::time::{Duration, UNIX_EPOCH}; @@ -1028,8 +1079,8 @@ mod test { let plan = PromPlanner::stmt_to_plan(eval_stmt, context_provider).unwrap(); let expected = String::from( - "Filter: some_metric.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, tag_0:Utf8]\ - \n Projection:
some_metric.timestamp, TEMPLATE(some_metric.field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, tag_0:Utf8]\ + "Filter: TEMPLATE(field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\ + \n Projection: some_metric.timestamp, TEMPLATE(some_metric.field_0) AS TEMPLATE(field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\ \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ @@ -1211,9 +1262,11 @@ mod test { // }, // }, // }, - async fn do_aggregate_expr_plan(name: &str) { - let prom_expr = - parser::parse(&format!("{name} by (tag_1)(some_metric{{tag_0!=\"bar\"}})",)).unwrap(); + async fn do_aggregate_expr_plan(fn_name: &str, plan_name: &str) { + let prom_expr = parser::parse(&format!( + "{fn_name} by (tag_1)(some_metric{{tag_0!=\"bar\"}})", + )) + .unwrap(); let mut eval_stmt = EvalStmt { expr: prom_expr, start: UNIX_EPOCH, @@ -1236,7 +1289,7 @@ mod test { \n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.tag_1 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ \n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ \n TableScan: some_metric, unsupported_filters=[tag_0 != Utf8(\"bar\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]" - ).replace("TEMPLATE", name); + ).replace("TEMPLATE", plan_name); assert_eq!( plan.display_indent_schema().to_string(), expected_no_without @@ -1259,75 +1312,74 @@ mod test { \n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.tag_1 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ \n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\ \n TableScan: some_metric, unsupported_filters=[tag_0 != Utf8(\"bar\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]" - ).replace("TEMPLATE", name); + ).replace("TEMPLATE", plan_name); assert_eq!(plan.display_indent_schema().to_string(), expected_without); } #[tokio::test] async fn aggregate_sum() { - do_aggregate_expr_plan("SUM").await; + do_aggregate_expr_plan("sum", "SUM").await; } #[tokio::test] async fn aggregate_avg() { - do_aggregate_expr_plan("AVG").await; + do_aggregate_expr_plan("avg", "AVG").await; } #[tokio::test] 
#[should_panic] // output type doesn't match async fn aggregate_count() { - do_aggregate_expr_plan("COUNT").await; + do_aggregate_expr_plan("count", "COUNT").await; } #[tokio::test] async fn aggregate_min() { - do_aggregate_expr_plan("MIN").await; + do_aggregate_expr_plan("min", "MIN").await; } #[tokio::test] async fn aggregate_max() { - do_aggregate_expr_plan("MAX").await; + do_aggregate_expr_plan("max", "MAX").await; } #[tokio::test] #[should_panic] // output type doesn't match async fn aggregate_group() { - do_aggregate_expr_plan("GROUPING").await; + do_aggregate_expr_plan("grouping", "GROUPING").await; } #[tokio::test] async fn aggregate_stddev() { - do_aggregate_expr_plan("STDDEV").await; + do_aggregate_expr_plan("stddev", "STDDEVPOP").await; } #[tokio::test] - #[should_panic] // schema doesn't match async fn aggregate_stdvar() { - do_aggregate_expr_plan("STDVAR").await; + do_aggregate_expr_plan("stdvar", "VARIANCEPOP").await; } #[tokio::test] #[should_panic] async fn aggregate_top_k() { - do_aggregate_expr_plan("").await; + do_aggregate_expr_plan("topk", "").await; } #[tokio::test] #[should_panic] async fn aggregate_bottom_k() { - do_aggregate_expr_plan("").await; + do_aggregate_expr_plan("bottomk", "").await; } #[tokio::test] #[should_panic] async fn aggregate_count_values() { - do_aggregate_expr_plan("").await; + do_aggregate_expr_plan("count_values", "").await; } #[tokio::test] #[should_panic] async fn aggregate_quantile() { - do_aggregate_expr_plan("").await; + do_aggregate_expr_plan("quantile", "").await; } // TODO(ruihang): add range fn tests once exprs are ready. @@ -1475,6 +1527,24 @@ mod test { indie_query_plan_compare(query, expected).await; } + #[tokio::test] + async fn increase_aggr() { + let query = "increase(some_metric[5m])"; + let expected = String::from( + "Filter: prom_increase(timestamp_range,field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0):Float64;N, tag_0:Utf8]\ + \n Projection: some_metric.timestamp, prom_increase(timestamp_range, field_0) AS prom_increase(timestamp_range,field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0):Float64;N, tag_0:Utf8]\ + \n PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\ + \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n TableScan: some_metric, unsupported_filters=[timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]" + + ); + + indie_query_plan_compare(query, expected).await; + } + #[tokio::test] async fn less_filter_on_value() { let query = "some_metric < 1.2345"; @@ -1486,6 +1556,7 @@ mod test { \n Sort: 
some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n TableScan: some_metric, unsupported_filters=[timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]" + ); indie_query_plan_compare(query, expected).await; diff --git a/src/script/src/python/vector.rs b/src/script/src/python/vector.rs index 0c12dbe447..c3c11970cc 100644 --- a/src/script/src/python/vector.rs +++ b/src/script/src/python/vector.rs @@ -783,6 +783,7 @@ pub fn pyobj_try_to_typed_val( } } ConcreteDataType::List(_) => unreachable!(), + ConcreteDataType::Dictionary(_) => unreachable!(), ConcreteDataType::Date(_) | ConcreteDataType::DateTime(_) | ConcreteDataType::Timestamp(_) => { diff --git a/src/servers/src/postgres/handler.rs b/src/servers/src/postgres/handler.rs index 8a991c00e5..a79ad20f67 100644 --- a/src/servers/src/postgres/handler.rs +++ b/src/servers/src/postgres/handler.rs @@ -285,7 +285,7 @@ fn type_gt_to_pg(origin: &ConcreteDataType) -> Result { &ConcreteDataType::Date(_) => Ok(Type::DATE), &ConcreteDataType::DateTime(_) => Ok(Type::TIMESTAMP), &ConcreteDataType::Timestamp(_) => Ok(Type::TIMESTAMP), - &ConcreteDataType::List(_) => error::InternalSnafu { + &ConcreteDataType::List(_) | &ConcreteDataType::Dictionary(_) => error::InternalSnafu { err_msg: format!("not implemented for column datatype {origin:?}"), } .fail(),
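
A note on the semantics behind the `prom_increase` and `IDelta` changes above: both UDFs now push `None` for windows with fewer than two samples instead of a misleading `0.0`, and `increase` accumulates deltas across the window. The counter-reset branch is elided in the increase.rs hunk; the sketch below assumes the standard Prometheus treatment, where a decrease means the counter restarted and the new sample's absolute value counts as the increase. The helper name `increase_over_window` is hypothetical, not the UDF's real entry point (the real one consumes `RangeArray` windows):

    /// Sketch of Prometheus-style `increase` over one window of counter samples.
    fn increase_over_window(values: &[f64]) -> Option<f64> {
        // Fewer than two samples: yield NULL instead of 0.0, as in the patch.
        if values.len() < 2 {
            return None;
        }
        let mut result = 0.0;
        for w in values.windows(2) {
            let delta = w[1] - w[0];
            if delta >= 0.0 {
                result += delta;
            } else {
                // Counter reset: count the new sample's value since the restart.
                result += w[1];
            }
        }
        Some(result)
    }

    fn main() {
        // One reset between 7.0 and 2.0: (5-1) + (7-5) + 2.0 + (4-2) = 10.0
        assert_eq!(increase_over_window(&[1.0, 5.0, 7.0, 2.0, 4.0]), Some(10.0));
        assert_eq!(increase_over_window(&[42.0]), None);
    }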