diff --git a/Cargo.lock b/Cargo.lock index c0f9ade4c6..2607814b16 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5325,7 +5325,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=3e821d0d405e6733690a4e4352812ba2ff780a3e#3e821d0d405e6733690a4e4352812ba2ff780a3e" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=d75496d5d09dedcd0edcade57ccf0a522f4393ae#d75496d5d09dedcd0edcade57ccf0a522f4393ae" dependencies = [ "prost 0.13.5", "prost-types 0.13.5", diff --git a/Cargo.toml b/Cargo.toml index ea77ea92f5..1185c9b2d5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -147,7 +147,7 @@ etcd-client = { git = "https://github.com/GreptimeTeam/etcd-client", rev = "f62d fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "3e821d0d405e6733690a4e4352812ba2ff780a3e" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "d75496d5d09dedcd0edcade57ccf0a522f4393ae" } hex = "0.4" http = "1" humantime = "2.1" diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index 88b34381f0..dca0f40b6b 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -15,7 +15,6 @@ use std::collections::HashSet; use std::sync::Arc; -use common_base::BitVec; use common_decimal::Decimal128; use common_decimal::decimal128::{DECIMAL128_DEFAULT_SCALE, DECIMAL128_MAX_PRECISION}; use common_time::time::Time; @@ -24,9 +23,12 @@ use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth use datatypes::prelude::{ConcreteDataType, ValueRef}; use datatypes::scalars::ScalarVector; use datatypes::types::{ - Int8Type, Int16Type, IntervalType, TimeType, TimestampType, UInt8Type, UInt16Type, + Int8Type, Int16Type, IntervalType, StructField, StructType, TimeType, TimestampType, UInt8Type, + UInt16Type, +}; +use datatypes::value::{ + ListValue, ListValueRef, OrderedF32, OrderedF64, StructValue, StructValueRef, Value, }; -use datatypes::value::{OrderedF32, OrderedF64, Value}; use datatypes::vectors::{ BinaryVector, BooleanVector, DateVector, Decimal128Vector, Float32Vector, Float64Vector, Int32Vector, Int64Vector, IntervalDayTimeVector, IntervalMonthDayNanoVector, @@ -42,14 +44,14 @@ use greptime_proto::v1::query_request::Query; use greptime_proto::v1::value::ValueData; use greptime_proto::v1::{ self, ColumnDataTypeExtension, DdlRequest, DecimalTypeExtension, JsonTypeExtension, - QueryRequest, Row, SemanticType, VectorTypeExtension, + ListTypeExtension, QueryRequest, Row, SemanticType, StructTypeExtension, VectorTypeExtension, }; use paste::paste; use snafu::prelude::*; use crate::error::{self, InconsistentTimeUnitSnafu, InvalidTimeUnitSnafu, Result}; use crate::v1::column::Values; -use crate::v1::{Column, ColumnDataType, Value as GrpcValue}; +use crate::v1::{ColumnDataType, Value as GrpcValue}; /// ColumnDataTypeWrapper is a wrapper of ColumnDataType and ColumnDataTypeExtension. /// It could be used to convert with ConcreteDataType. @@ -85,7 +87,7 @@ impl ColumnDataTypeWrapper { /// Get a tuple of ColumnDataType and ColumnDataTypeExtension. pub fn to_parts(&self) -> (ColumnDataType, Option) { - (self.datatype, self.datatype_ext) + (self.datatype, self.datatype_ext.clone()) } } @@ -159,6 +161,45 @@ impl From for ConcreteDataType { ConcreteDataType::vector_default_datatype() } } + ColumnDataType::List => { + if let Some(TypeExt::ListType(d)) = datatype_wrapper + .datatype_ext + .as_ref() + .and_then(|datatype_ext| datatype_ext.type_ext.as_ref()) + { + let item_type = ColumnDataTypeWrapper { + datatype: d.datatype(), + datatype_ext: d.datatype_extension.clone().map(|d| *d), + }; + ConcreteDataType::list_datatype(item_type.into()) + } else { + // invalid state: type extension not found + ConcreteDataType::null_datatype() + } + } + ColumnDataType::Struct => { + if let Some(TypeExt::StructType(d)) = datatype_wrapper + .datatype_ext + .as_ref() + .and_then(|datatype_ext| datatype_ext.type_ext.as_ref()) + { + let fields = d + .fields + .iter() + .map(|f| { + let field_type = ColumnDataTypeWrapper { + datatype: f.datatype(), + datatype_ext: f.datatype_extension.clone(), + }; + StructField::new(f.name.to_string(), field_type.into(), true) + }) + .collect::>(); + ConcreteDataType::struct_datatype(StructType::from(fields)) + } else { + // invalid state: type extension not found + ConcreteDataType::null_datatype() + } + } } } } @@ -249,6 +290,39 @@ impl ColumnDataTypeWrapper { }), } } + + /// Create a list datatype with the given item type. + pub fn list_datatype(item_type: ColumnDataTypeWrapper) -> Self { + ColumnDataTypeWrapper { + datatype: ColumnDataType::List, + datatype_ext: Some(ColumnDataTypeExtension { + type_ext: Some(TypeExt::ListType(Box::new(ListTypeExtension { + datatype: item_type.datatype() as i32, + datatype_extension: item_type.datatype_ext.map(Box::new), + }))), + }), + } + } + + /// Create a struct datatype with the given field tuples (name, datatype). + pub fn struct_datatype(fields: Vec<(String, ColumnDataTypeWrapper)>) -> Self { + let struct_fields = fields + .into_iter() + .map(|(name, datatype)| greptime_proto::v1::StructField { + name, + datatype: datatype.datatype() as i32, + datatype_extension: datatype.datatype_ext, + }) + .collect(); + ColumnDataTypeWrapper { + datatype: ColumnDataType::Struct, + datatype_ext: Some(ColumnDataTypeExtension { + type_ext: Some(TypeExt::StructType(StructTypeExtension { + fields: struct_fields, + })), + }), + } + } } impl TryFrom for ColumnDataTypeWrapper { @@ -290,9 +364,9 @@ impl TryFrom for ColumnDataTypeWrapper { ConcreteDataType::Decimal128(_) => ColumnDataType::Decimal128, ConcreteDataType::Json(_) => ColumnDataType::Json, ConcreteDataType::Vector(_) => ColumnDataType::Vector, + ConcreteDataType::List(_) => ColumnDataType::List, + ConcreteDataType::Struct(_) => ColumnDataType::Struct, ConcreteDataType::Null(_) - | ConcreteDataType::List(_) - | ConcreteDataType::Struct(_) | ConcreteDataType::Dictionary(_) | ConcreteDataType::Duration(_) => { return error::IntoColumnDataTypeSnafu { from: datatype }.fail(); @@ -321,6 +395,40 @@ impl TryFrom for ColumnDataTypeWrapper { })), }) } + ColumnDataType::List => { + if let Some(list_type) = datatype.as_list() { + let list_item_type = + ColumnDataTypeWrapper::try_from(list_type.item_type().clone())?; + Some(ColumnDataTypeExtension { + type_ext: Some(TypeExt::ListType(Box::new(ListTypeExtension { + datatype: list_item_type.datatype.into(), + datatype_extension: list_item_type.datatype_ext.map(Box::new), + }))), + }) + } else { + None + } + } + ColumnDataType::Struct => { + if let Some(struct_type) = datatype.as_struct() { + let mut fields = Vec::with_capacity(struct_type.fields().len()); + for field in struct_type.fields() { + let field_type = + ColumnDataTypeWrapper::try_from(field.data_type().clone())?; + let proto_field = crate::v1::StructField { + name: field.name().to_string(), + datatype: field_type.datatype.into(), + datatype_extension: field_type.datatype_ext, + }; + fields.push(proto_field); + } + Some(ColumnDataTypeExtension { + type_ext: Some(TypeExt::StructType(StructTypeExtension { fields })), + }) + } else { + None + } + } _ => None, }; Ok(Self { @@ -448,56 +556,17 @@ pub fn values_with_capacity(datatype: ColumnDataType, capacity: usize) -> Values binary_values: Vec::with_capacity(capacity), ..Default::default() }, + ColumnDataType::List => Values { + list_values: Vec::with_capacity(capacity), + ..Default::default() + }, + ColumnDataType::Struct => Values { + struct_values: Vec::with_capacity(capacity), + ..Default::default() + }, } } -// The type of vals must be same. -pub fn push_vals(column: &mut Column, origin_count: usize, vector: VectorRef) { - let values = column.values.get_or_insert_with(Values::default); - let mut null_mask = BitVec::from_slice(&column.null_mask); - let len = vector.len(); - null_mask.reserve_exact(origin_count + len); - null_mask.extend(BitVec::repeat(false, len)); - - (0..len).for_each(|idx| match vector.get(idx) { - Value::Null => null_mask.set(idx + origin_count, true), - Value::Boolean(val) => values.bool_values.push(val), - Value::UInt8(val) => values.u8_values.push(val.into()), - Value::UInt16(val) => values.u16_values.push(val.into()), - Value::UInt32(val) => values.u32_values.push(val), - Value::UInt64(val) => values.u64_values.push(val), - Value::Int8(val) => values.i8_values.push(val.into()), - Value::Int16(val) => values.i16_values.push(val.into()), - Value::Int32(val) => values.i32_values.push(val), - Value::Int64(val) => values.i64_values.push(val), - Value::Float32(val) => values.f32_values.push(*val), - Value::Float64(val) => values.f64_values.push(*val), - Value::String(val) => values.string_values.push(val.as_utf8().to_string()), - Value::Binary(val) => values.binary_values.push(val.to_vec()), - Value::Date(val) => values.date_values.push(val.val()), - Value::Timestamp(val) => match val.unit() { - TimeUnit::Second => values.timestamp_second_values.push(val.value()), - TimeUnit::Millisecond => values.timestamp_millisecond_values.push(val.value()), - TimeUnit::Microsecond => values.timestamp_microsecond_values.push(val.value()), - TimeUnit::Nanosecond => values.timestamp_nanosecond_values.push(val.value()), - }, - Value::Time(val) => match val.unit() { - TimeUnit::Second => values.time_second_values.push(val.value()), - TimeUnit::Millisecond => values.time_millisecond_values.push(val.value()), - TimeUnit::Microsecond => values.time_microsecond_values.push(val.value()), - TimeUnit::Nanosecond => values.time_nanosecond_values.push(val.value()), - }, - Value::IntervalYearMonth(val) => values.interval_year_month_values.push(val.to_i32()), - Value::IntervalDayTime(val) => values.interval_day_time_values.push(val.to_i64()), - Value::IntervalMonthDayNano(val) => values - .interval_month_day_nano_values - .push(convert_month_day_nano_to_pb(val)), - Value::Decimal128(val) => values.decimal128_values.push(convert_to_pb_decimal128(val)), - Value::List(_) | Value::Duration(_) => unreachable!(), - }); - column.null_mask = null_mask.into_vec(); -} - /// Returns the type name of the [Request]. pub fn request_type(request: &Request) -> &'static str { match request { @@ -555,7 +624,7 @@ pub fn convert_to_pb_decimal128(v: Decimal128) -> v1::Decimal128 { pub fn pb_value_to_value_ref<'a>( value: &'a v1::Value, - datatype_ext: &'a Option, + datatype_ext: Option<&'a ColumnDataTypeExtension>, ) -> ValueRef<'a> { let Some(value) = &value.value_data else { return ValueRef::Null; @@ -622,6 +691,77 @@ pub fn pb_value_to_value_ref<'a>( )) } } + ValueData::ListValue(list) => { + let list_datatype_ext = datatype_ext + .as_ref() + .and_then(|ext| { + if let Some(TypeExt::ListType(l)) = &ext.type_ext { + Some(l) + } else { + None + } + }) + .expect("list must contain datatype ext"); + let item_type = ConcreteDataType::from(ColumnDataTypeWrapper::new( + list_datatype_ext.datatype(), + list_datatype_ext + .datatype_extension + .as_ref() + .map(|ext| *ext.clone()), + )); + let items = list + .items + .iter() + .map(|item| { + pb_value_to_value_ref(item, list_datatype_ext.datatype_extension.as_deref()) + }) + .collect::>(); + + let list_value = ListValueRef::RefList { + val: items, + item_datatype: item_type.clone(), + }; + ValueRef::List(list_value) + } + + ValueData::StructValue(struct_value) => { + let struct_datatype_ext = datatype_ext + .as_ref() + .and_then(|ext| { + if let Some(TypeExt::StructType(s)) = &ext.type_ext { + Some(s) + } else { + None + } + }) + .expect("struct must contain datatype ext"); + + let struct_fields = struct_datatype_ext + .fields + .iter() + .map(|field| { + let field_type = ConcreteDataType::from(ColumnDataTypeWrapper::new( + field.datatype(), + field.datatype_extension.clone(), + )); + let field_name = field.name.to_string(); + StructField::new(field_name, field_type, true) + }) + .collect::>(); + + let items = struct_value + .items + .iter() + .zip(struct_datatype_ext.fields.iter()) + .map(|(item, field)| pb_value_to_value_ref(item, field.datatype_extension.as_ref())) + .collect::>(); + + let struct_value_ref = StructValueRef::RefList { + val: items, + fields: StructType::new(struct_fields), + }; + ValueRef::Struct(struct_value_ref) + } } } @@ -896,8 +1036,8 @@ pub fn is_column_type_value_eq( } /// Convert value into proto's value. -pub fn to_proto_value(value: Value) -> Option { - let proto_value = match value { +pub fn to_proto_value(value: Value) -> v1::Value { + match value { Value::Null => v1::Value { value_data: None }, Value::Boolean(v) => v1::Value { value_data: Some(ValueData::BoolValue(v)), @@ -983,10 +1123,34 @@ pub fn to_proto_value(value: Value) -> Option { Value::Decimal128(v) => v1::Value { value_data: Some(ValueData::Decimal128Value(convert_to_pb_decimal128(v))), }, - Value::List(_) | Value::Duration(_) => return None, - }; + Value::List(list_value) => v1::Value { + value_data: Some(ValueData::ListValue(v1::ListValue { + items: convert_list_to_pb_values(list_value), + })), + }, + Value::Struct(struct_value) => v1::Value { + value_data: Some(ValueData::StructValue(v1::StructValue { + items: convert_struct_to_pb_values(struct_value), + })), + }, + Value::Duration(_) => v1::Value { value_data: None }, + } +} - Some(proto_value) +fn convert_list_to_pb_values(list_value: ListValue) -> Vec { + list_value + .take_items() + .into_iter() + .map(to_proto_value) + .collect() +} + +fn convert_struct_to_pb_values(struct_value: StructValue) -> Vec { + struct_value + .take_items() + .into_iter() + .map(to_proto_value) + .collect() } /// Returns the [ColumnDataTypeWrapper] of the value. @@ -1021,6 +1185,8 @@ pub fn proto_value_type(value: &v1::Value) -> Option { ValueData::IntervalDayTimeValue(_) => ColumnDataType::IntervalDayTime, ValueData::IntervalMonthDayNanoValue(_) => ColumnDataType::IntervalMonthDayNano, ValueData::Decimal128Value(_) => ColumnDataType::Decimal128, + ValueData::ListValue(_) => ColumnDataType::List, + ValueData::StructValue(_) => ColumnDataType::Struct, }; Some(value_type) } @@ -1075,7 +1241,23 @@ pub fn value_to_grpc_value(value: Value) -> GrpcValue { convert_month_day_nano_to_pb(v), )), Value::Decimal128(v) => Some(ValueData::Decimal128Value(convert_to_pb_decimal128(v))), - Value::List(_) | Value::Duration(_) => unreachable!(), + Value::List(list_value) => { + let items = list_value + .take_items() + .into_iter() + .map(value_to_grpc_value) + .collect(); + Some(ValueData::ListValue(v1::ListValue { items })) + } + Value::Struct(struct_value) => { + let items = struct_value + .take_items() + .into_iter() + .map(value_to_grpc_value) + .collect(); + Some(ValueData::StructValue(v1::StructValue { items })) + } + Value::Duration(_) => unreachable!(), }, } } @@ -1173,15 +1355,11 @@ mod tests { TimeMillisecondType, TimeSecondType, TimestampMillisecondType, TimestampSecondType, UInt32Type, }; - use datatypes::vectors::{ - BooleanVector, IntervalDayTimeVector, IntervalMonthDayNanoVector, IntervalYearMonthVector, - TimeMicrosecondVector, TimeMillisecondVector, TimeNanosecondVector, TimeSecondVector, - TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector, - TimestampSecondVector, Vector, - }; + use datatypes::vectors::BooleanVector; use paste::paste; use super::*; + use crate::v1::Column; #[test] fn test_values_with_capacity() { @@ -1260,6 +1438,14 @@ mod tests { let values = values_with_capacity(ColumnDataType::Vector, 2); let values = values.binary_values; assert_eq!(2, values.capacity()); + + let values = values_with_capacity(ColumnDataType::List, 2); + let values = values.list_values; + assert_eq!(2, values.capacity()); + + let values = values_with_capacity(ColumnDataType::Struct, 2); + let values = values.struct_values; + assert_eq!(2, values.capacity()); } #[test] @@ -1352,6 +1538,37 @@ mod tests { ConcreteDataType::vector_datatype(3), ColumnDataTypeWrapper::vector_datatype(3).into() ); + assert_eq!( + ConcreteDataType::list_datatype(ConcreteDataType::string_datatype()), + ColumnDataTypeWrapper::list_datatype(ColumnDataTypeWrapper::string_datatype()).into() + ); + let struct_type = StructType::new(vec![ + StructField::new("id".to_string(), ConcreteDataType::int64_datatype(), true), + StructField::new( + "name".to_string(), + ConcreteDataType::string_datatype(), + true, + ), + StructField::new("age".to_string(), ConcreteDataType::int32_datatype(), true), + StructField::new( + "address".to_string(), + ConcreteDataType::string_datatype(), + true, + ), + ]); + assert_eq!( + ConcreteDataType::struct_datatype(struct_type.clone()), + ColumnDataTypeWrapper::struct_datatype(vec![ + ("id".to_string(), ColumnDataTypeWrapper::int64_datatype()), + ("name".to_string(), ColumnDataTypeWrapper::string_datatype()), + ("age".to_string(), ColumnDataTypeWrapper::int32_datatype()), + ( + "address".to_string(), + ColumnDataTypeWrapper::string_datatype() + ) + ]) + .into() + ); } #[test] @@ -1455,176 +1672,29 @@ mod tests { "Failed to create column datatype from Null(NullType)" ); - let result: Result = - ConcreteDataType::list_datatype(ConcreteDataType::boolean_datatype()).try_into(); - assert!(result.is_err()); assert_eq!( - result.unwrap_err().to_string(), - "Failed to create column datatype from List(ListType { item_type: Boolean(BooleanType) })" - ); - } - - #[test] - fn test_column_put_timestamp_values() { - let mut column = Column { - column_name: "test".to_string(), - semantic_type: 0, - values: Some(Values { - ..Default::default() - }), - null_mask: vec![], - datatype: 0, - ..Default::default() - }; - - let vector = Arc::new(TimestampNanosecondVector::from_vec(vec![1, 2, 3])); - push_vals(&mut column, 3, vector); - assert_eq!( - vec![1, 2, 3], - column.values.as_ref().unwrap().timestamp_nanosecond_values + ColumnDataTypeWrapper::list_datatype(ColumnDataTypeWrapper::int16_datatype()), + ConcreteDataType::list_datatype(ConcreteDataType::int16_datatype()) + .try_into() + .expect("Failed to create column datatype from List(ListType { item_type: Int16(Int16Type) })") ); - let vector = Arc::new(TimestampMillisecondVector::from_vec(vec![4, 5, 6])); - push_vals(&mut column, 3, vector); assert_eq!( - vec![4, 5, 6], - column.values.as_ref().unwrap().timestamp_millisecond_values - ); - - let vector = Arc::new(TimestampMicrosecondVector::from_vec(vec![7, 8, 9])); - push_vals(&mut column, 3, vector); - assert_eq!( - vec![7, 8, 9], - column.values.as_ref().unwrap().timestamp_microsecond_values - ); - - let vector = Arc::new(TimestampSecondVector::from_vec(vec![10, 11, 12])); - push_vals(&mut column, 3, vector); - assert_eq!( - vec![10, 11, 12], - column.values.as_ref().unwrap().timestamp_second_values - ); - } - - #[test] - fn test_column_put_time_values() { - let mut column = Column { - column_name: "test".to_string(), - semantic_type: 0, - values: Some(Values { - ..Default::default() - }), - null_mask: vec![], - datatype: 0, - ..Default::default() - }; - - let vector = Arc::new(TimeNanosecondVector::from_vec(vec![1, 2, 3])); - push_vals(&mut column, 3, vector); - assert_eq!( - vec![1, 2, 3], - column.values.as_ref().unwrap().time_nanosecond_values - ); - - let vector = Arc::new(TimeMillisecondVector::from_vec(vec![4, 5, 6])); - push_vals(&mut column, 3, vector); - assert_eq!( - vec![4, 5, 6], - column.values.as_ref().unwrap().time_millisecond_values - ); - - let vector = Arc::new(TimeMicrosecondVector::from_vec(vec![7, 8, 9])); - push_vals(&mut column, 3, vector); - assert_eq!( - vec![7, 8, 9], - column.values.as_ref().unwrap().time_microsecond_values - ); - - let vector = Arc::new(TimeSecondVector::from_vec(vec![10, 11, 12])); - push_vals(&mut column, 3, vector); - assert_eq!( - vec![10, 11, 12], - column.values.as_ref().unwrap().time_second_values - ); - } - - #[test] - fn test_column_put_interval_values() { - let mut column = Column { - column_name: "test".to_string(), - semantic_type: 0, - values: Some(Values { - ..Default::default() - }), - null_mask: vec![], - datatype: 0, - ..Default::default() - }; - - let vector = Arc::new(IntervalYearMonthVector::from_vec(vec![1, 2, 3])); - push_vals(&mut column, 3, vector); - assert_eq!( - vec![1, 2, 3], - column.values.as_ref().unwrap().interval_year_month_values - ); - - let vector = Arc::new(IntervalDayTimeVector::from_vec(vec![ - IntervalDayTime::new(0, 4).into(), - IntervalDayTime::new(0, 5).into(), - IntervalDayTime::new(0, 6).into(), - ])); - push_vals(&mut column, 3, vector); - assert_eq!( - vec![4, 5, 6], - column.values.as_ref().unwrap().interval_day_time_values - ); - - let vector = Arc::new(IntervalMonthDayNanoVector::from_vec(vec![ - IntervalMonthDayNano::new(0, 0, 7).into(), - IntervalMonthDayNano::new(0, 0, 8).into(), - IntervalMonthDayNano::new(0, 0, 9).into(), - ])); - let len = vector.len(); - push_vals(&mut column, 3, vector); - (0..len).for_each(|i| { - assert_eq!( - 7 + i as i64, - column - .values - .as_ref() - .unwrap() - .interval_month_day_nano_values - .get(i) - .unwrap() - .nanoseconds - ); - }); - } - - #[test] - fn test_column_put_vector() { - use crate::v1::SemanticType; - // Some(false), None, Some(true), Some(true) - let mut column = Column { - column_name: "test".to_string(), - semantic_type: SemanticType::Field as i32, - values: Some(Values { - bool_values: vec![false, true, true], - ..Default::default() - }), - null_mask: vec![2], - datatype: ColumnDataType::Boolean as i32, - ..Default::default() - }; - let row_count = 4; - - let vector = Arc::new(BooleanVector::from(vec![Some(true), None, Some(false)])); - push_vals(&mut column, row_count, vector); - // Some(false), None, Some(true), Some(true), Some(true), None, Some(false) - let bool_values = column.values.unwrap().bool_values; - assert_eq!(vec![false, true, true, true, false], bool_values); - let null_mask = column.null_mask; - assert_eq!(34, null_mask[0]); + ColumnDataTypeWrapper::struct_datatype(vec![ + ("a".to_string(), ColumnDataTypeWrapper::int64_datatype()), + ( + "a.a".to_string(), + ColumnDataTypeWrapper::list_datatype(ColumnDataTypeWrapper::string_datatype()) + ) + ]), + ConcreteDataType::struct_datatype(StructType::new(vec![ + StructField::new("a".to_string(), ConcreteDataType::int64_datatype(), true), + StructField::new( + "a.a".to_string(), + ConcreteDataType::list_datatype(ConcreteDataType::string_datatype()), true + ) + ])).try_into().expect("Failed to create column datatype from Struct(StructType { fields: [StructField { name: \"a\", data_type: Int64(Int64Type) }, StructField { name: \"a.a\", data_type: List(ListType { item_type: String(StringType) }) }] })") + ) } #[test] @@ -2000,4 +2070,50 @@ mod tests { assert_eq!(pb_decimal.lo, 123); assert_eq!(pb_decimal.hi, 0); } + + #[test] + fn test_list_to_pb_value() { + let value = Value::List(ListValue::new( + vec![Value::Boolean(true)], + ConcreteDataType::boolean_datatype(), + )); + + let pb_value = to_proto_value(value); + + match pb_value.value_data.unwrap() { + ValueData::ListValue(pb_list_value) => { + assert_eq!(pb_list_value.items.len(), 1); + } + _ => panic!("Unexpected value type"), + } + } + + #[test] + fn test_struct_to_pb_value() { + let items = vec![Value::Boolean(true), Value::String("tom".into())]; + + let value = Value::Struct( + StructValue::try_new( + items, + StructType::new(vec![ + StructField::new( + "a.a".to_string(), + ConcreteDataType::boolean_datatype(), + true, + ), + StructField::new("a.b".to_string(), ConcreteDataType::string_datatype(), true), + ]), + ) + .unwrap(), + ); + + let pb_value = to_proto_value(value); + + match pb_value.value_data.unwrap() { + ValueData::StructValue(pb_struct_value) => { + assert_eq!(pb_struct_value.items.len(), 2); + } + _ => panic!("Unexpected value type"), + } + } } diff --git a/src/api/src/v1/column_def.rs b/src/api/src/v1/column_def.rs index ec3d7da3bf..5be3d5c196 100644 --- a/src/api/src/v1/column_def.rs +++ b/src/api/src/v1/column_def.rs @@ -37,8 +37,10 @@ const SKIPPING_INDEX_GRPC_KEY: &str = "skipping_index"; /// Tries to construct a `ColumnSchema` from the given `ColumnDef`. pub fn try_as_column_schema(column_def: &ColumnDef) -> Result { - let data_type = - ColumnDataTypeWrapper::try_new(column_def.data_type, column_def.datatype_extension)?; + let data_type = ColumnDataTypeWrapper::try_new( + column_def.data_type, + column_def.datatype_extension.clone(), + )?; let constraint = if column_def.default_constraint.is_empty() { None diff --git a/src/common/function/src/helper.rs b/src/common/function/src/helper.rs index 086755e738..6e643443ec 100644 --- a/src/common/function/src/helper.rs +++ b/src/common/function/src/helper.rs @@ -38,7 +38,7 @@ pub(crate) fn one_of_sigs2(args1: Vec, args2: Vec) -> Signat /// Cast a [`ValueRef`] to u64, returns `None` if fails pub fn cast_u64(value: &ValueRef) -> Result> { - cast((*value).into(), &ConcreteDataType::uint64_datatype()) + cast(value.clone().into(), &ConcreteDataType::uint64_datatype()) .context(InvalidInputTypeSnafu { err_msg: format!( "Failed to cast input into uint64, actual type: {:#?}", @@ -50,7 +50,7 @@ pub fn cast_u64(value: &ValueRef) -> Result> { /// Cast a [`ValueRef`] to u32, returns `None` if fails pub fn cast_u32(value: &ValueRef) -> Result> { - cast((*value).into(), &ConcreteDataType::uint32_datatype()) + cast(value.clone().into(), &ConcreteDataType::uint32_datatype()) .context(InvalidInputTypeSnafu { err_msg: format!( "Failed to cast input into uint32, actual type: {:#?}", diff --git a/src/common/function/src/scalars/expression/binary.rs b/src/common/function/src/scalars/expression/binary.rs index 9ef67a2b0f..4d9f6268a8 100644 --- a/src/common/function/src/scalars/expression/binary.rs +++ b/src/common/function/src/scalars/expression/binary.rs @@ -41,7 +41,7 @@ where let right: &::VectorType = unsafe { Helper::static_cast(right.inner()) }; let b = right.get_data(0); - let it = left.iter_data().map(|a| f(a, b, ctx)); + let it = left.iter_data().map(|a| f(a, b.clone(), ctx)); ::VectorType::from_owned_iterator(it) } @@ -62,7 +62,7 @@ where let a = left.get_data(0); let right: &::VectorType = unsafe { Helper::static_cast(r) }; - let it = right.iter_data().map(|b| f(a, b, ctx)); + let it = right.iter_data().map(|b| f(a.clone(), b, ctx)); ::VectorType::from_owned_iterator(it) } diff --git a/src/common/grpc-expr/src/insert.rs b/src/common/grpc-expr/src/insert.rs index 08857faf7b..d1e360d148 100644 --- a/src/common/grpc-expr/src/insert.rs +++ b/src/common/grpc-expr/src/insert.rs @@ -46,7 +46,7 @@ pub(crate) fn add_values_to_builder( Some(true) => builder.push_null(), _ => { builder - .try_push_value_ref(values[idx_of_values].as_value_ref()) + .try_push_value_ref(&values[idx_of_values].as_value_ref()) .context(CreateVectorSnafu)?; idx_of_values += 1 } diff --git a/src/common/grpc-expr/src/util.rs b/src/common/grpc-expr/src/util.rs index b807b211b6..84e268b687 100644 --- a/src/common/grpc-expr/src/util.rs +++ b/src/common/grpc-expr/src/util.rs @@ -167,7 +167,7 @@ pub fn build_create_table_expr( default_constraint: vec![], semantic_type, comment: String::new(), - datatype_extension: *datatype_extension, + datatype_extension: datatype_extension.clone(), options: options.clone(), }); } @@ -209,7 +209,7 @@ pub fn extract_new_columns( default_constraint: vec![], semantic_type: expr.semantic_type, comment: String::new(), - datatype_extension: *expr.datatype_extension, + datatype_extension: expr.datatype_extension.clone(), options: expr.options.clone(), }); AddColumn { @@ -425,7 +425,7 @@ mod tests { ConcreteDataType::from( ColumnDataTypeWrapper::try_new( decimal_column.data_type, - decimal_column.datatype_extension, + decimal_column.datatype_extension.clone(), ) .unwrap() ) @@ -520,6 +520,7 @@ mod tests { .as_ref() .unwrap() .datatype_extension + .clone() ) .unwrap() ) diff --git a/src/common/macro/src/admin_fn.rs b/src/common/macro/src/admin_fn.rs index 023ff977b5..ca97e5468f 100644 --- a/src/common/macro/src/admin_fn.rs +++ b/src/common/macro/src/admin_fn.rs @@ -325,7 +325,7 @@ fn build_struct( let result = #fn_name(handler, query_ctx, &[]).await .map_err(|e| datafusion_common::DataFusionError::Execution(format!("Function execution error: {}", e.output_msg())))?; - builder.push_value_ref(result.as_value_ref()); + builder.push_value_ref(&result.as_value_ref()); } else { for i in 0..rows_num { let args: Vec<_> = columns.iter() @@ -335,7 +335,7 @@ fn build_struct( let result = #fn_name(handler, query_ctx, &args).await .map_err(|e| datafusion_common::DataFusionError::Execution(format!("Function execution error: {}", e.output_msg())))?; - builder.push_value_ref(result.as_value_ref()); + builder.push_value_ref(&result.as_value_ref()); } } diff --git a/src/common/macro/src/row/schema.rs b/src/common/macro/src/row/schema.rs index 8a033b5aad..4c9d538dd3 100644 --- a/src/common/macro/src/row/schema.rs +++ b/src/common/macro/src/row/schema.rs @@ -90,6 +90,24 @@ fn impl_schema_method(fields: &[ParsedField<'_>]) -> Result { Some(ColumnDataTypeExtension { type_ext: Some(TypeExt::VectorType(VectorTypeExtension { dim: #dim })) }) } } + Some(TypeExt::ListType(ext)) => { + let item_type = syn::Ident::new(&ext.datatype.to_string(), ident.span()); + quote! { + Some(ColumnDataTypeExtension { type_ext: Some(TypeExt::ListType(ListTypeExtension { item_type: #item_type })) }) + } + } + Some(TypeExt::StructType(ext)) => { + let fields = ext.fields.iter().map(|field| { + let field_name = syn::Ident::new(&field.name.to_string(), ident.span()); + let field_type = syn::Ident::new(&field.datatype.to_string(), ident.span()); + quote! { + StructField { name: #field_name, type_: #field_type } + } + }).collect::>(); + quote! { + Some(ColumnDataTypeExtension { type_ext: Some(TypeExt::StructType(StructTypeExtension { fields: [#(#fields),*] })) }) + } + } None => { quote! { None } } diff --git a/src/common/macro/src/row/utils.rs b/src/common/macro/src/row/utils.rs index 7b6547847a..40f990a40a 100644 --- a/src/common/macro/src/row/utils.rs +++ b/src/common/macro/src/row/utils.rs @@ -184,7 +184,7 @@ pub(crate) fn convert_semantic_type_to_proto_semantic_type( } } -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone)] pub(crate) struct ColumnDataTypeWithExtension { pub(crate) data_type: ColumnDataType, pub(crate) extension: Option, @@ -260,7 +260,10 @@ pub(crate) fn get_column_data_type( infer_column_data_type: &Option, attribute: &ColumnAttribute, ) -> Option { - attribute.datatype.or(*infer_column_data_type) + attribute + .datatype + .clone() + .or_else(|| infer_column_data_type.clone()) } /// Convert a column data type to a value data ident. @@ -304,5 +307,7 @@ pub(crate) fn convert_column_data_type_to_value_data_ident( // Json is a special case, it is actually a string column. ColumnDataType::Json => format_ident!("StringValue"), ColumnDataType::Vector => format_ident!("VectorValue"), + ColumnDataType::List => format_ident!("ListValue"), + ColumnDataType::Struct => format_ident!("StructValue"), } } diff --git a/src/common/meta/src/ddl/create_table_template.rs b/src/common/meta/src/ddl/create_table_template.rs index df745e2f48..53cc00a460 100644 --- a/src/common/meta/src/ddl/create_table_template.rs +++ b/src/common/meta/src/ddl/create_table_template.rs @@ -90,7 +90,7 @@ pub(crate) fn build_template(create_table_expr: &CreateTableExpr) -> Result return InvalidUnaryOpSnafu { unary_op, value }.fail(), }, - Value::String(_) | Value::Binary(_) | Value::List(_) => { + Value::String(_) | Value::Binary(_) | Value::List(_) | Value::Struct(_) => { return InvalidUnaryOpSnafu { unary_op, value }.fail(); } } diff --git a/src/datatypes/src/data_type.rs b/src/datatypes/src/data_type.rs index fb03869e41..8b97399284 100644 --- a/src/datatypes/src/data_type.rs +++ b/src/datatypes/src/data_type.rs @@ -285,6 +285,13 @@ impl ConcreteDataType { } } + pub fn as_struct(&self) -> Option<&StructType> { + match self { + ConcreteDataType::Struct(s) => Some(s), + _ => None, + } + } + /// Try to cast data type as a [`TimestampType`]. pub fn as_timestamp(&self) -> Option { match self { diff --git a/src/datatypes/src/error.rs b/src/datatypes/src/error.rs index 565daae4cf..85e78ce1eb 100644 --- a/src/datatypes/src/error.rs +++ b/src/datatypes/src/error.rs @@ -220,6 +220,14 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to convert scalar value to Arrow array"))] + ConvertScalarToArrowArray { + #[snafu(source)] + error: datafusion_common::DataFusionError, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to parse extended type in metadata: {}", value))] ParseExtendedType { value: String, @@ -238,6 +246,13 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + #[snafu(display("Inconsistent struct field count {field_len} and item count {item_len}"))] + InconsistentStructFieldsAndItems { + field_len: usize, + item_len: usize, + #[snafu(implicit)] + location: Location, + }, #[snafu(display("Failed to process JSONB value"))] InvalidJsonb { error: jsonb::Error, @@ -282,7 +297,9 @@ impl ErrorExt for Error { | ToScalarValue { .. } | TryFromValue { .. } | ConvertArrowArrayToScalars { .. } - | ParseExtendedType { .. } => StatusCode::Internal, + | ConvertScalarToArrowArray { .. } + | ParseExtendedType { .. } + | InconsistentStructFieldsAndItems { .. } => StatusCode::Internal, } } diff --git a/src/datatypes/src/scalars.rs b/src/datatypes/src/scalars.rs index 389888a9dd..2032f6bc56 100644 --- a/src/datatypes/src/scalars.rs +++ b/src/datatypes/src/scalars.rs @@ -21,10 +21,10 @@ use crate::types::{ Float32Type, Float64Type, Int8Type, Int16Type, Int32Type, Int64Type, UInt8Type, UInt16Type, UInt32Type, UInt64Type, }; -use crate::value::{ListValue, ListValueRef, Value}; +use crate::value::{ListValue, ListValueRef, StructValue, StructValueRef, Value}; use crate::vectors::{ BinaryVector, BooleanVector, DateVector, Decimal128Vector, ListVector, MutableVector, - PrimitiveVector, StringVector, Vector, + NullVector, PrimitiveVector, StringVector, StructVector, Vector, }; fn get_iter_capacity>(iter: &I) -> usize { @@ -52,7 +52,7 @@ where fn upcast_gat<'short, 'long: 'short>(long: Self::RefType<'long>) -> Self::RefType<'short>; } -pub trait ScalarRef<'a>: std::fmt::Debug + Clone + Copy + Send + 'a { +pub trait ScalarRef<'a>: std::fmt::Debug + Clone + Send + 'a { /// The corresponding [`Scalar`] type. type ScalarType: Scalar = Self>; @@ -95,7 +95,7 @@ where fn from_slice(data: &[Self::RefItem<'_>]) -> Self { let mut builder = Self::Builder::with_capacity(data.len()); for item in data { - builder.push(Some(*item)); + builder.push(Some(item.clone())); } builder.finish() } @@ -152,13 +152,11 @@ macro_rules! impl_scalar_for_native { type VectorType = PrimitiveVector<$DataType>; type RefType<'a> = $Native; - #[inline] fn as_scalar_ref(&self) -> $Native { *self } #[allow(clippy::needless_lifetimes)] - #[inline] fn upcast_gat<'short, 'long: 'short>(long: $Native) -> $Native { long } @@ -168,7 +166,6 @@ macro_rules! impl_scalar_for_native { impl<'a> ScalarRef<'a> for $Native { type ScalarType = $Native; - #[inline] fn to_owned_scalar(&self) -> $Native { *self } @@ -187,17 +184,33 @@ impl_scalar_for_native!(i64, Int64Type); impl_scalar_for_native!(f32, Float32Type); impl_scalar_for_native!(f64, Float64Type); +impl Scalar for () { + type VectorType = NullVector; + type RefType<'a> = (); + + fn as_scalar_ref(&self) {} + + #[allow(clippy::needless_lifetimes)] + fn upcast_gat<'short, 'long: 'short>(long: ()) { + long + } +} + +impl ScalarRef<'_> for () { + type ScalarType = (); + + fn to_owned_scalar(&self) {} +} + impl Scalar for bool { type VectorType = BooleanVector; type RefType<'a> = bool; - #[inline] fn as_scalar_ref(&self) -> bool { *self } #[allow(clippy::needless_lifetimes)] - #[inline] fn upcast_gat<'short, 'long: 'short>(long: bool) -> bool { long } @@ -206,7 +219,6 @@ impl Scalar for bool { impl ScalarRef<'_> for bool { type ScalarType = bool; - #[inline] fn to_owned_scalar(&self) -> bool { *self } @@ -216,12 +228,10 @@ impl Scalar for String { type VectorType = StringVector; type RefType<'a> = &'a str; - #[inline] fn as_scalar_ref(&self) -> &str { self } - #[inline] fn upcast_gat<'short, 'long: 'short>(long: &'long str) -> &'short str { long } @@ -230,7 +240,6 @@ impl Scalar for String { impl<'a> ScalarRef<'a> for &'a str { type ScalarType = String; - #[inline] fn to_owned_scalar(&self) -> String { self.to_string() } @@ -240,12 +249,10 @@ impl Scalar for Vec { type VectorType = BinaryVector; type RefType<'a> = &'a [u8]; - #[inline] fn as_scalar_ref(&self) -> &[u8] { self } - #[inline] fn upcast_gat<'short, 'long: 'short>(long: &'long [u8]) -> &'short [u8] { long } @@ -254,7 +261,6 @@ impl Scalar for Vec { impl<'a> ScalarRef<'a> for &'a [u8] { type ScalarType = Vec; - #[inline] fn to_owned_scalar(&self) -> Vec { self.to_vec() } @@ -332,6 +338,42 @@ impl<'a> ScalarRef<'a> for ListValueRef<'a> { _ => unreachable!(), }, ListValueRef::Ref { val } => (*val).clone(), + ListValueRef::RefList { val, item_datatype } => ListValue::new( + val.iter().map(|v| Value::from(v.clone())).collect(), + item_datatype.clone(), + ), + } + } +} + +impl Scalar for StructValue { + type VectorType = StructVector; + type RefType<'a> = StructValueRef<'a>; + + fn as_scalar_ref(&self) -> Self::RefType<'_> { + StructValueRef::Ref(self) + } + + fn upcast_gat<'short, 'long: 'short>(long: Self::RefType<'long>) -> Self::RefType<'short> { + long + } +} + +impl<'a> ScalarRef<'a> for StructValueRef<'a> { + type ScalarType = StructValue; + + fn to_owned_scalar(&self) -> Self::ScalarType { + match self { + Self::Indexed { vector, idx } => match vector.get(*idx) { + Value::Null => StructValue::default(), + Value::Struct(v) => v, + _ => unreachable!(), + }, + StructValueRef::Ref(val) => (*val).clone(), + StructValueRef::RefList { val, fields } => { + let items = val.iter().map(|v| Value::from(v.clone())).collect(); + StructValue::try_new(items, fields.clone()).unwrap() + } } } } @@ -346,7 +388,7 @@ mod tests { fn build_vector_from_slice(items: &[Option>]) -> T { let mut builder = T::Builder::with_capacity(items.len()); for item in items { - builder.push(*item); + builder.push(item.clone()); } builder.finish() } diff --git a/src/datatypes/src/schema/column_schema.rs b/src/datatypes/src/schema/column_schema.rs index 24ab30b61b..f176350b8c 100644 --- a/src/datatypes/src/schema/column_schema.rs +++ b/src/datatypes/src/schema/column_schema.rs @@ -296,7 +296,7 @@ impl ColumnSchema { let value_ref = padding_value.as_value_ref(); let mut mutable_vector = self.data_type.create_mutable_vector(num_rows); for _ in 0..num_rows { - mutable_vector.push_value_ref(value_ref); + mutable_vector.push_value_ref(&value_ref); } mutable_vector.to_vector() } diff --git a/src/datatypes/src/schema/constraint.rs b/src/datatypes/src/schema/constraint.rs index 01928dc3b5..60025be68e 100644 --- a/src/datatypes/src/schema/constraint.rs +++ b/src/datatypes/src/schema/constraint.rs @@ -157,7 +157,7 @@ impl ColumnDefaultConstraint { // attempt to downcast the vector fail if they don't check whether the vector is const // first. let mut mutable_vector = data_type.create_mutable_vector(1); - mutable_vector.try_push_value_ref(v.as_value_ref())?; + mutable_vector.try_push_value_ref(&v.as_value_ref())?; let base_vector = mutable_vector.to_vector(); Ok(base_vector.replicate(&[num_rows])) } diff --git a/src/datatypes/src/types/date_type.rs b/src/datatypes/src/types/date_type.rs index a78da1d65b..a34bc6069b 100644 --- a/src/datatypes/src/types/date_type.rs +++ b/src/datatypes/src/types/date_type.rs @@ -83,10 +83,10 @@ impl LogicalPrimitiveType for DateType { }) } - fn cast_value_ref(value: ValueRef) -> Result> { + fn cast_value_ref(value: &ValueRef) -> Result> { match value { ValueRef::Null => Ok(None), - ValueRef::Date(v) => Ok(Some(v)), + ValueRef::Date(v) => Ok(Some(*v)), other => error::CastTypeSnafu { msg: format!("Failed to cast value {other:?} to Date"), } diff --git a/src/datatypes/src/types/duration_type.rs b/src/datatypes/src/types/duration_type.rs index e5e5c66be5..961be38182 100644 --- a/src/datatypes/src/types/duration_type.rs +++ b/src/datatypes/src/types/duration_type.rs @@ -133,11 +133,11 @@ macro_rules! impl_data_type_for_duration { }) } - fn cast_value_ref(value: ValueRef) -> crate::Result> { + fn cast_value_ref(value: &ValueRef) -> crate::Result> { match value { ValueRef::Null => Ok(None), ValueRef::Duration(t) => match t.unit() { - TimeUnit::$unit => Ok(Some([](t))), + TimeUnit::$unit => Ok(Some([](*t))), other => error::CastTypeSnafu { msg: format!( "Failed to cast Duration value with different unit {:?} to {}", diff --git a/src/datatypes/src/types/interval_type.rs b/src/datatypes/src/types/interval_type.rs index e6b6544d0d..8687c714c6 100644 --- a/src/datatypes/src/types/interval_type.rs +++ b/src/datatypes/src/types/interval_type.rs @@ -123,10 +123,10 @@ macro_rules! impl_data_type_for_interval { }) } - fn cast_value_ref(value: ValueRef) -> crate::Result> { + fn cast_value_ref(value: &ValueRef) -> crate::Result> { match value { ValueRef::Null => Ok(None), - ValueRef::[](t) => Ok(Some(t)), + ValueRef::[](t) => Ok(Some(*t)), other => error::CastTypeSnafu { msg: format!("Failed to cast value {:?} to {}", other, stringify!([])), } diff --git a/src/datatypes/src/types/list_type.rs b/src/datatypes/src/types/list_type.rs index 91f4c8654f..acdb443c7e 100644 --- a/src/datatypes/src/types/list_type.rs +++ b/src/datatypes/src/types/list_type.rs @@ -65,7 +65,11 @@ impl DataType for ListType { } fn as_arrow_type(&self) -> ArrowDataType { - let field = Arc::new(Field::new("item", self.item_type.as_arrow_type(), true)); + let field = Arc::new(Field::new( + Field::LIST_FIELD_DEFAULT_NAME, + self.item_type.as_arrow_type(), + true, + )); ArrowDataType::List(field) } diff --git a/src/datatypes/src/types/primitive_type.rs b/src/datatypes/src/types/primitive_type.rs index bcd228df35..b861b4c158 100644 --- a/src/datatypes/src/types/primitive_type.rs +++ b/src/datatypes/src/types/primitive_type.rs @@ -80,7 +80,7 @@ pub trait LogicalPrimitiveType: 'static + Sized { fn cast_vector(vector: &dyn Vector) -> Result<&PrimitiveVector>; /// Cast value ref to the primitive type. - fn cast_value_ref(value: ValueRef) -> Result>; + fn cast_value_ref(value: &ValueRef) -> Result>; } /// A new type for [WrapperType], complement the `Ord` feature for it. @@ -194,10 +194,10 @@ macro_rules! define_logical_primitive_type { }) } - fn cast_value_ref(value: ValueRef) -> Result> { + fn cast_value_ref(value: &ValueRef) -> Result> { match value { ValueRef::Null => Ok(None), - ValueRef::$TypeId(v) => Ok(Some(v.into())), + ValueRef::$TypeId(v) => Ok(Some((*v).into())), other => error::CastTypeSnafu { msg: format!( "Failed to cast value {:?} to primitive type {}", diff --git a/src/datatypes/src/types/string_type.rs b/src/datatypes/src/types/string_type.rs index 4cf0522b1f..fdca963707 100644 --- a/src/datatypes/src/types/string_type.rs +++ b/src/datatypes/src/types/string_type.rs @@ -89,8 +89,8 @@ impl DataType for StringType { Value::Duration(v) => Some(Value::String(StringBytes::from(v.to_string()))), Value::Decimal128(v) => Some(Value::String(StringBytes::from(v.to_string()))), - // StringBytes is only support for utf-8, Value::Binary is not allowed. - Value::Binary(_) | Value::List(_) => None, + // StringBytes is only support for utf-8, Value::Binary and collections are not allowed. + Value::Binary(_) | Value::List(_) | Value::Struct(_) => None, } } } diff --git a/src/datatypes/src/types/struct_type.rs b/src/datatypes/src/types/struct_type.rs index 2b59a0d603..cbd3a549ef 100644 --- a/src/datatypes/src/types/struct_type.rs +++ b/src/datatypes/src/types/struct_type.rs @@ -18,6 +18,7 @@ use serde::{Deserialize, Serialize}; use crate::prelude::{ConcreteDataType, DataType, LogicalTypeId}; use crate::value::Value; +use crate::vectors::StructVectorBuilder; #[derive(Clone, Debug, Default, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)] pub struct StructType { @@ -68,16 +69,15 @@ impl DataType for StructType { } fn as_arrow_type(&self) -> ArrowDataType { - let fields = self - .fields - .iter() - .map(|f| Field::new(f.name.clone(), f.data_type.as_arrow_type(), f.nullable)) - .collect(); + let fields = self.as_arrow_fields(); ArrowDataType::Struct(fields) } - fn create_mutable_vector(&self, _capacity: usize) -> Box { - unimplemented!("What is the mutable vector for StructVector?"); + fn create_mutable_vector(&self, capacity: usize) -> Box { + Box::new(StructVectorBuilder::with_type_and_capacity( + self.clone(), + capacity, + )) } fn try_cast(&self, _from: Value) -> Option { @@ -94,6 +94,13 @@ impl StructType { pub fn fields(&self) -> &[StructField] { &self.fields } + + pub fn as_arrow_fields(&self) -> Fields { + self.fields + .iter() + .map(|f| Field::new(f.name.clone(), f.data_type.as_arrow_type(), f.nullable)) + .collect() + } } #[derive(Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)] diff --git a/src/datatypes/src/types/time_type.rs b/src/datatypes/src/types/time_type.rs index bb360f5a08..2f9408b472 100644 --- a/src/datatypes/src/types/time_type.rs +++ b/src/datatypes/src/types/time_type.rs @@ -149,14 +149,14 @@ macro_rules! impl_data_type_for_time { }) } - fn cast_value_ref(value: ValueRef) -> crate::Result> { + fn cast_value_ref(value: &ValueRef) -> crate::Result> { match value { ValueRef::Null => Ok(None), ValueRef::Int64(v) =>{ - Ok(Some([