diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index 98539e8f05..37a535902c 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -291,6 +291,7 @@ impl TryFrom for ColumnDataTypeWrapper { ConcreteDataType::Vector(_) => ColumnDataType::Vector, ConcreteDataType::Null(_) | ConcreteDataType::List(_) + | ConcreteDataType::Struct(_) | ConcreteDataType::Dictionary(_) | ConcreteDataType::Duration(_) => { return error::IntoColumnDataTypeSnafu { from: datatype }.fail() @@ -703,6 +704,7 @@ pub fn pb_values_to_vector_ref(data_type: &ConcreteDataType, values: Values) -> ConcreteDataType::Vector(_) => Arc::new(BinaryVector::from_vec(values.binary_values)), ConcreteDataType::Null(_) | ConcreteDataType::List(_) + | ConcreteDataType::Struct(_) | ConcreteDataType::Dictionary(_) | ConcreteDataType::Duration(_) | ConcreteDataType::Json(_) => { @@ -864,6 +866,7 @@ pub fn pb_values_to_values(data_type: &ConcreteDataType, values: Values) -> Vec< ConcreteDataType::Vector(_) => values.binary_values.into_iter().map(|v| v.into()).collect(), ConcreteDataType::Null(_) | ConcreteDataType::List(_) + | ConcreteDataType::Struct(_) | ConcreteDataType::Dictionary(_) | ConcreteDataType::Duration(_) | ConcreteDataType::Json(_) => { diff --git a/src/common/grpc/src/select.rs b/src/common/grpc/src/select.rs index 1cca03dc0f..8886c90f67 100644 --- a/src/common/grpc/src/select.rs +++ b/src/common/grpc/src/select.rs @@ -70,7 +70,7 @@ macro_rules! convert_arrow_array_to_grpc_vals { return Ok(vals); }, )+ - ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) | ConcreteDataType::Duration(_) | ConcreteDataType::Json(_) => unreachable!("Should not send {:?} in gRPC", $data_type), + ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Struct(_) | ConcreteDataType::Dictionary(_) | ConcreteDataType::Duration(_) | ConcreteDataType::Json(_) => unreachable!("Should not send {:?} in gRPC", $data_type), } }}; } diff --git a/src/datatypes/src/data_type.rs b/src/datatypes/src/data_type.rs index b954a1f2e0..f51d5c28a6 100644 --- a/src/datatypes/src/data_type.rs +++ b/src/datatypes/src/data_type.rs @@ -34,9 +34,9 @@ use crate::types::{ DurationMillisecondType, DurationNanosecondType, DurationSecondType, DurationType, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalType, IntervalYearMonthType, JsonType, ListType, NullType, - StringType, TimeMillisecondType, TimeType, TimestampMicrosecondType, TimestampMillisecondType, - TimestampNanosecondType, TimestampSecondType, TimestampType, UInt16Type, UInt32Type, - UInt64Type, UInt8Type, VectorType, + StringType, StructType, TimeMillisecondType, TimeType, TimestampMicrosecondType, + TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, TimestampType, + UInt16Type, UInt32Type, UInt64Type, UInt8Type, VectorType, }; use crate::value::Value; use crate::vectors::MutableVector; @@ -80,6 +80,7 @@ pub enum ConcreteDataType { // Compound types: List(ListType), Dictionary(DictionaryType), + Struct(StructType), // JSON type: Json(JsonType), @@ -131,6 +132,7 @@ impl fmt::Display for ConcreteDataType { }, ConcreteDataType::Decimal128(v) => write!(f, "{}", v.name()), ConcreteDataType::List(v) => write!(f, "{}", v.name()), + ConcreteDataType::Struct(v) => write!(f, "{}", v.name()), ConcreteDataType::Dictionary(v) => write!(f, "{}", v.name()), ConcreteDataType::Json(v) => write!(f, "{}", v.name()), ConcreteDataType::Vector(v) => write!(f, "{}", v.name()), @@ -406,9 +408,12 @@ impl ConcreteDataType { &ConcreteDataType::Duration(_) | &ConcreteDataType::Dictionary(_) | &ConcreteDataType::Vector(_) - | &ConcreteDataType::List(_) => "UNKNOWN", + | &ConcreteDataType::List(_) + | &ConcreteDataType::Struct(_) => "UNKNOWN", }, - &ConcreteDataType::Duration(_) | &ConcreteDataType::Dictionary(_) => "UNKNOWN", + &ConcreteDataType::Duration(_) + | &ConcreteDataType::Dictionary(_) + | &ConcreteDataType::Struct(_) => "UNKNOWN", } } } @@ -457,7 +462,20 @@ impl TryFrom<&ArrowDataType> for ConcreteDataType { ArrowDataType::Decimal128(precision, scale) => { ConcreteDataType::decimal128_datatype(*precision, *scale) } - _ => { + ArrowDataType::Struct(fields) => ConcreteDataType::Struct(fields.try_into()?), + ArrowDataType::Float16 + | ArrowDataType::Date64 + | ArrowDataType::FixedSizeBinary(_) + | ArrowDataType::BinaryView + | ArrowDataType::Utf8View + | ArrowDataType::ListView(_) + | ArrowDataType::FixedSizeList(_, _) + | ArrowDataType::LargeList(_) + | ArrowDataType::LargeListView(_) + | ArrowDataType::Union(_, _) + | ArrowDataType::Decimal256(_, _) + | ArrowDataType::Map(_, _) + | ArrowDataType::RunEndEncoded(_, _) => { return error::UnsupportedArrowTypeSnafu { arrow_type: dt.clone(), } @@ -613,6 +631,10 @@ impl ConcreteDataType { ConcreteDataType::List(ListType::new(item_type)) } + pub fn struct_datatype(fields: StructType) -> ConcreteDataType { + ConcreteDataType::Struct(fields) + } + pub fn dictionary_datatype( key_type: ConcreteDataType, value_type: ConcreteDataType, diff --git a/src/datatypes/src/type_id.rs b/src/datatypes/src/type_id.rs index 3fa3bb9b9e..52e09d8751 100644 --- a/src/datatypes/src/type_id.rs +++ b/src/datatypes/src/type_id.rs @@ -65,6 +65,7 @@ pub enum LogicalTypeId { List, Dictionary, + Struct, Json, @@ -108,6 +109,7 @@ impl LogicalTypeId { LogicalTypeId::List => { ConcreteDataType::list_datatype(ConcreteDataType::null_datatype()) } + LogicalTypeId::Struct => ConcreteDataType::struct_datatype(vec![].into()), LogicalTypeId::Dictionary => ConcreteDataType::dictionary_datatype( ConcreteDataType::null_datatype(), ConcreteDataType::null_datatype(), diff --git a/src/datatypes/src/types.rs b/src/datatypes/src/types.rs index 6f7213be49..d22151908a 100644 --- a/src/datatypes/src/types.rs +++ b/src/datatypes/src/types.rs @@ -25,6 +25,7 @@ mod list_type; mod null_type; mod primitive_type; mod string_type; +mod struct_type; mod time_type; mod timestamp_type; mod vector_type; @@ -52,6 +53,7 @@ pub use primitive_type::{ OrdPrimitive, UInt16Type, UInt32Type, UInt64Type, UInt8Type, WrapperType, }; pub use string_type::StringType; +pub use struct_type::{StructField, StructType}; pub use time_type::{ TimeMicrosecondType, TimeMillisecondType, TimeNanosecondType, TimeSecondType, TimeType, }; diff --git a/src/datatypes/src/types/struct_type.rs b/src/datatypes/src/types/struct_type.rs new file mode 100644 index 0000000000..2b59a0d603 --- /dev/null +++ b/src/datatypes/src/types/struct_type.rs @@ -0,0 +1,134 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use arrow::datatypes::{DataType as ArrowDataType, Field}; +use arrow_schema::Fields; +use serde::{Deserialize, Serialize}; + +use crate::prelude::{ConcreteDataType, DataType, LogicalTypeId}; +use crate::value::Value; + +#[derive(Clone, Debug, Default, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)] +pub struct StructType { + fields: Vec, +} + +impl TryFrom<&Fields> for StructType { + type Error = crate::error::Error; + fn try_from(value: &Fields) -> Result { + let fields = value + .iter() + .map(|field| { + Ok(StructField::new( + field.name().to_string(), + ConcreteDataType::try_from(field.data_type())?, + field.is_nullable(), + )) + }) + .collect::, Self::Error>>()?; + Ok(StructType { fields }) + } +} + +impl From> for StructType { + fn from(fields: Vec) -> Self { + StructType { fields } + } +} + +impl DataType for StructType { + fn name(&self) -> String { + format!( + "Struct<{}>", + self.fields + .iter() + .map(|f| f.name()) + .collect::>() + .join(", ") + ) + } + + fn logical_type_id(&self) -> LogicalTypeId { + LogicalTypeId::Struct + } + + fn default_value(&self) -> Value { + Value::Null + } + + fn as_arrow_type(&self) -> ArrowDataType { + let fields = self + .fields + .iter() + .map(|f| Field::new(f.name.clone(), f.data_type.as_arrow_type(), f.nullable)) + .collect(); + ArrowDataType::Struct(fields) + } + + fn create_mutable_vector(&self, _capacity: usize) -> Box { + unimplemented!("What is the mutable vector for StructVector?"); + } + + fn try_cast(&self, _from: Value) -> Option { + // TODO(discord9): what is the meaning of casting from Value to StructFields? + None + } +} + +impl StructType { + pub fn new(fields: Vec) -> Self { + StructType { fields } + } + + pub fn fields(&self) -> &[StructField] { + &self.fields + } +} + +#[derive(Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)] +pub struct StructField { + name: String, + data_type: ConcreteDataType, + nullable: bool, +} + +impl StructField { + pub fn new(name: String, data_type: ConcreteDataType, nullable: bool) -> Self { + StructField { + name, + data_type, + nullable, + } + } + + pub fn name(&self) -> &str { + &self.name + } + + pub fn data_type(&self) -> &ConcreteDataType { + &self.data_type + } + + pub fn is_nullable(&self) -> bool { + self.nullable + } + + pub fn to_df_field(&self) -> Field { + Field::new( + self.name.clone(), + self.data_type.as_arrow_type(), + self.nullable, + ) + } +} diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs index ba630d4e64..d417d3ccbb 100644 --- a/src/datatypes/src/value.rs +++ b/src/datatypes/src/value.rs @@ -28,6 +28,7 @@ use common_time::interval::IntervalUnit; use common_time::time::Time; use common_time::timestamp::{TimeUnit, Timestamp}; use common_time::{Duration, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timezone}; +use datafusion_common::scalar::ScalarStructBuilder; use datafusion_common::ScalarValue; use greptime_proto::v1::value::ValueData; pub use ordered_float::OrderedFloat; @@ -540,6 +541,14 @@ pub fn to_null_scalar_value(output_type: &ConcreteDataType) -> Result { + let fields = fields + .fields() + .iter() + .map(|f| f.to_df_field()) + .collect::>(); + ScalarStructBuilder::new_null(fields) + } ConcreteDataType::Dictionary(dict) => ScalarValue::Dictionary( Box::new(dict.key_type().as_arrow_type()), Box::new(to_null_scalar_value(dict.value_type())?), diff --git a/src/datatypes/src/vectors.rs b/src/datatypes/src/vectors.rs index 7d334d8638..b11c8a3f7a 100644 --- a/src/datatypes/src/vectors.rs +++ b/src/datatypes/src/vectors.rs @@ -40,6 +40,7 @@ mod null; pub(crate) mod operations; mod primitive; mod string; +mod struct_vector; mod time; mod timestamp; mod validity; diff --git a/src/datatypes/src/vectors/eq.rs b/src/datatypes/src/vectors/eq.rs index 7e642c4e58..ac01cd3538 100644 --- a/src/datatypes/src/vectors/eq.rs +++ b/src/datatypes/src/vectors/eq.rs @@ -19,6 +19,7 @@ use common_time::interval::IntervalUnit; use crate::data_type::DataType; use crate::types::{DurationType, TimeType, TimestampType}; use crate::vectors::constant::ConstantVector; +use crate::vectors::struct_vector::StructVector; use crate::vectors::{ BinaryVector, BooleanVector, DateVector, Decimal128Vector, DurationMicrosecondVector, DurationMillisecondVector, DurationNanosecondVector, DurationSecondVector, @@ -109,6 +110,7 @@ fn equal(lhs: &dyn Vector, rhs: &dyn Vector) -> bool { } }, List(_) => is_vector_eq!(ListVector, lhs, rhs), + Struct(_) => is_vector_eq!(StructVector, lhs, rhs), UInt8(_) | UInt16(_) | UInt32(_) | UInt64(_) | Int8(_) | Int16(_) | Int32(_) | Int64(_) | Float32(_) | Float64(_) | Dictionary(_) => { with_match_primitive_type_id!(lhs_type.logical_type_id(), |$T| { diff --git a/src/datatypes/src/vectors/helper.rs b/src/datatypes/src/vectors/helper.rs index 4e23d56809..4e8dd607de 100644 --- a/src/datatypes/src/vectors/helper.rs +++ b/src/datatypes/src/vectors/helper.rs @@ -21,7 +21,7 @@ use arrow::array::{Array, ArrayRef, StringArray}; use arrow::compute; use arrow::compute::kernels::comparison; use arrow::datatypes::{DataType as ArrowDataType, Int64Type, TimeUnit}; -use arrow_array::DictionaryArray; +use arrow_array::{DictionaryArray, StructArray}; use arrow_schema::IntervalUnit; use datafusion_common::ScalarValue; use snafu::{OptionExt, ResultExt}; @@ -31,6 +31,7 @@ use crate::error::{self, ConvertArrowArrayToScalarsSnafu, Result}; use crate::prelude::DataType; use crate::scalars::{Scalar, ScalarVectorBuilder}; use crate::value::{ListValue, ListValueRef, Value}; +use crate::vectors::struct_vector::StructVector; use crate::vectors::{ BinaryVector, BooleanVector, ConstantVector, DateVector, Decimal128Vector, DictionaryVector, DurationMicrosecondVector, DurationMillisecondVector, DurationNanosecondVector, @@ -359,10 +360,18 @@ impl Helper { ConcreteDataType::try_from(value.as_ref())?, )?) } + + ArrowDataType::Struct(_fields) => { + let array = array + .as_ref() + .as_any() + .downcast_ref::() + .unwrap(); + Arc::new(StructVector::new(array.clone())?) + } ArrowDataType::Float16 | ArrowDataType::LargeList(_) | ArrowDataType::FixedSizeList(_, _) - | ArrowDataType::Struct(_) | ArrowDataType::Union(_, _) | ArrowDataType::Dictionary(_, _) | ArrowDataType::Decimal256(_, _) diff --git a/src/datatypes/src/vectors/struct_vector.rs b/src/datatypes/src/vectors/struct_vector.rs new file mode 100644 index 0000000000..85445eb24d --- /dev/null +++ b/src/datatypes/src/vectors/struct_vector.rs @@ -0,0 +1,181 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use arrow::compute::TakeOptions; +use arrow_array::{Array, ArrayRef, StructArray}; +use serde_json::Value as JsonValue; +use snafu::ResultExt; + +use crate::error::{self, ArrowComputeSnafu, Result, UnsupportedOperationSnafu}; +use crate::prelude::ConcreteDataType; +use crate::serialize::Serializable; +use crate::value::{Value, ValueRef}; +use crate::vectors::operations::VectorOp; +use crate::vectors::{self, Helper, Validity, Vector, VectorRef}; + +/// A simple wrapper around `StructArray` to represent a vector of structs in GreptimeDB. +#[derive(Debug, PartialEq)] +pub struct StructVector { + array: StructArray, + data_type: ConcreteDataType, +} + +#[allow(unused)] +impl StructVector { + pub fn new(array: StructArray) -> Result { + let fields = array.fields(); + let data_type = ConcreteDataType::Struct(fields.try_into()?); + Ok(StructVector { array, data_type }) + } + + pub fn array(&self) -> &StructArray { + &self.array + } + + pub fn as_arrow(&self) -> &dyn Array { + &self.array + } +} + +impl Vector for StructVector { + fn data_type(&self) -> ConcreteDataType { + self.data_type.clone() + } + + fn vector_type_name(&self) -> String { + "StructVector".to_string() + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn len(&self) -> usize { + self.array.len() + } + + fn to_arrow_array(&self) -> ArrayRef { + Arc::new(self.array.clone()) + } + + fn to_boxed_arrow_array(&self) -> Box { + Box::new(self.array.clone()) + } + + fn validity(&self) -> Validity { + vectors::impl_validity_for_vector!(self.array) + } + + fn memory_size(&self) -> usize { + self.array.get_buffer_memory_size() + } + + fn null_count(&self) -> usize { + self.array.null_count() + } + + fn is_null(&self, row: usize) -> bool { + self.array.is_null(row) + } + + fn slice(&self, offset: usize, length: usize) -> VectorRef { + Arc::new(StructVector { + array: self.array.slice(offset, length), + data_type: self.data_type.clone(), + }) + } + + fn get(&self, _: usize) -> Value { + unimplemented!("StructValue not supported yet") + } + + fn get_ref(&self, _: usize) -> ValueRef { + unimplemented!("StructValue not supported yet") + } +} + +impl VectorOp for StructVector { + fn replicate(&self, offsets: &[usize]) -> VectorRef { + let column_arrays = self + .array + .columns() + .iter() + .map(|col| { + let vector = Helper::try_into_vector(col) + .expect("Failed to replicate struct vector columns"); + vector.replicate(offsets).to_arrow_array() + }) + .collect::>(); + let replicated_array = StructArray::new( + self.array.fields().clone(), + column_arrays, + self.array.nulls().cloned(), + ); + Arc::new( + StructVector::new(replicated_array).expect("Failed to create replicated StructVector"), + ) + } + + fn cast(&self, _to_type: &ConcreteDataType) -> Result { + UnsupportedOperationSnafu { + op: "cast", + vector_type: self.vector_type_name(), + } + .fail() + } + + fn filter(&self, filter: &vectors::BooleanVector) -> Result { + let filtered = + datafusion_common::arrow::compute::filter(&self.array, filter.as_boolean_array()) + .context(ArrowComputeSnafu) + .and_then(Helper::try_into_vector)?; + Ok(filtered) + } + + fn take(&self, indices: &vectors::UInt32Vector) -> Result { + let take_result = datafusion_common::arrow::compute::take( + &self.array, + indices.as_arrow(), + Some(TakeOptions { check_bounds: true }), + ) + .context(ArrowComputeSnafu) + .and_then(Helper::try_into_vector)?; + Ok(take_result) + } +} + +impl Serializable for StructVector { + fn serialize_to_json(&self) -> Result> { + let mut result = serde_json::Map::new(); + for (field, value) in self.array.fields().iter().zip(self.array.columns().iter()) { + let value_vector = Helper::try_into_vector(value)?; + + let field_value = value_vector.serialize_to_json()?; + result.insert(field.name().clone(), JsonValue::Array(field_value)); + } + let fields = JsonValue::Object(result); + let data_type = serde_json::to_value(&self.data_type).context(error::SerializeSnafu)?; + Ok(vec![JsonValue::Object( + [ + ("fields".to_string(), fields), + ("data_type".to_string(), data_type), + ] + .iter() + .cloned() + .collect(), + )]) + } +} diff --git a/src/mito-codec/src/row_converter/dense.rs b/src/mito-codec/src/row_converter/dense.rs index 391529a8e4..6bd1bdde8f 100644 --- a/src/mito-codec/src/row_converter/dense.rs +++ b/src/mito-codec/src/row_converter/dense.rs @@ -78,6 +78,7 @@ impl SortField { ConcreteDataType::Decimal128(_) => 19, ConcreteDataType::Null(_) | ConcreteDataType::List(_) + | ConcreteDataType::Struct(_) | ConcreteDataType::Dictionary(_) => 0, } } @@ -134,6 +135,7 @@ impl SortField { .context(SerializeFieldSnafu)?; } ConcreteDataType::List(_) | + ConcreteDataType::Struct(_) | ConcreteDataType::Dictionary(_) | ConcreteDataType::Null(_) => { return error::NotSupportedFieldSnafu { @@ -218,6 +220,10 @@ impl SortField { data_type: ConcreteDataType::List(l.clone()), } .fail(), + ConcreteDataType::Struct(f) => NotSupportedFieldSnafu { + data_type: ConcreteDataType::Struct(f.clone()), + } + .fail(), ConcreteDataType::Dictionary(d) => NotSupportedFieldSnafu { data_type: ConcreteDataType::Dictionary(d.clone()), } @@ -301,6 +307,7 @@ impl SortField { ConcreteDataType::Decimal128(_) => 19, ConcreteDataType::Null(_) | ConcreteDataType::List(_) + | ConcreteDataType::Struct(_) | ConcreteDataType::Dictionary(_) => 0, }; deserializer.advance(to_skip); diff --git a/src/servers/src/postgres/types.rs b/src/servers/src/postgres/types.rs index f3c2781c29..397445bfc6 100644 --- a/src/servers/src/postgres/types.rs +++ b/src/servers/src/postgres/types.rs @@ -469,7 +469,8 @@ pub(super) fn type_gt_to_pg(origin: &ConcreteDataType) -> Result { &ConcreteDataType::Duration(_) => Ok(Type::INTERVAL_ARRAY), &ConcreteDataType::Dictionary(_) | &ConcreteDataType::Vector(_) - | &ConcreteDataType::List(_) => server_error::UnsupportedDataTypeSnafu { + | &ConcreteDataType::List(_) + | &ConcreteDataType::Struct(_) => server_error::UnsupportedDataTypeSnafu { data_type: origin, reason: "not implemented", } @@ -481,6 +482,11 @@ pub(super) fn type_gt_to_pg(origin: &ConcreteDataType) -> Result { } .fail(), &ConcreteDataType::Duration(_) => Ok(Type::INTERVAL), + &ConcreteDataType::Struct(_) => server_error::UnsupportedDataTypeSnafu { + data_type: origin, + reason: "not implemented", + } + .fail(), } } diff --git a/src/sql/src/statements.rs b/src/sql/src/statements.rs index 9cad86ca63..14646732c0 100644 --- a/src/sql/src/statements.rs +++ b/src/sql/src/statements.rs @@ -308,6 +308,7 @@ pub fn concrete_data_type_to_sql_data_type(data_type: &ConcreteDataType) -> Resu ConcreteDataType::Duration(_) | ConcreteDataType::Null(_) | ConcreteDataType::List(_) + | ConcreteDataType::Struct(_) | ConcreteDataType::Dictionary(_) => error::ConcreteTypeNotSupportedSnafu { t: data_type.clone(), }