From c152a45d4455466b78eb10caed2235361e41b10f Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 21 Nov 2025 19:21:32 +0800 Subject: [PATCH] feat: support Dictionary type (#7277) * feat: support Dictionary type Signed-off-by: Ruihang Xia * fix format Signed-off-by: Ruihang Xia * update proto Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/api/src/helper.rs | 96 +++++++++++++- src/common/macro/src/row/schema.rs | 177 +++++++++++++++++-------- src/common/macro/src/row/utils.rs | 3 + src/operator/src/req_convert/common.rs | 10 +- 6 files changed, 229 insertions(+), 61 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2efaab5bfa..42ad5cd83b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5348,7 +5348,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=fd699b21991e358201ee81ead4d319545c5df2ad#fd699b21991e358201ee81ead4d319545c5df2ad" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=437e6a1ef8139e3946dcc043ea22c7fe6877019d#437e6a1ef8139e3946dcc043ea22c7fe6877019d" dependencies = [ "prost 0.13.5", "prost-types 0.13.5", diff --git a/Cargo.toml b/Cargo.toml index 5ea3dd2d1d..e97e3360ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -148,7 +148,7 @@ etcd-client = { git = "https://github.com/GreptimeTeam/etcd-client", rev = "f62d fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "fd699b21991e358201ee81ead4d319545c5df2ad" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "437e6a1ef8139e3946dcc043ea22c7fe6877019d" } hex = "0.4" http = "1" humantime = "2.1" diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index 468ffb9cc2..6751476e8f 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -35,9 +35,9 @@ use greptime_proto::v1::greptime_request::Request; use greptime_proto::v1::query_request::Query; use greptime_proto::v1::value::ValueData; use greptime_proto::v1::{ - self, ColumnDataTypeExtension, DdlRequest, DecimalTypeExtension, JsonList, - JsonNativeTypeExtension, JsonObject, JsonTypeExtension, ListTypeExtension, QueryRequest, Row, - SemanticType, StructTypeExtension, VectorTypeExtension, json_value, + self, ColumnDataTypeExtension, DdlRequest, DecimalTypeExtension, DictionaryTypeExtension, + JsonList, JsonNativeTypeExtension, JsonObject, JsonTypeExtension, ListTypeExtension, + QueryRequest, Row, SemanticType, StructTypeExtension, VectorTypeExtension, json_value, }; use paste::paste; use snafu::prelude::*; @@ -216,6 +216,26 @@ impl From for ConcreteDataType { ConcreteDataType::null_datatype() } } + ColumnDataType::Dictionary => { + if let Some(TypeExt::DictionaryType(d)) = datatype_wrapper + .datatype_ext + .as_ref() + .and_then(|datatype_ext| datatype_ext.type_ext.as_ref()) + { + let key_type = ColumnDataTypeWrapper { + datatype: d.key_datatype(), + datatype_ext: d.key_datatype_extension.clone().map(|ext| *ext), + }; + let value_type = ColumnDataTypeWrapper { + datatype: d.value_datatype(), + datatype_ext: d.value_datatype_extension.clone().map(|ext| *ext), + }; + ConcreteDataType::dictionary_datatype(key_type.into(), value_type.into()) + } else { + // invalid state: type extension not found + ConcreteDataType::null_datatype() + } + } } } } @@ -339,6 +359,23 @@ impl ColumnDataTypeWrapper { }), } } + + pub fn dictionary_datatype( + key_type: ColumnDataTypeWrapper, + value_type: ColumnDataTypeWrapper, + ) -> Self { + ColumnDataTypeWrapper { + datatype: ColumnDataType::Dictionary, + datatype_ext: Some(ColumnDataTypeExtension { + type_ext: Some(TypeExt::DictionaryType(Box::new(DictionaryTypeExtension { + key_datatype: key_type.datatype().into(), + key_datatype_extension: key_type.datatype_ext.map(Box::new), + value_datatype: value_type.datatype().into(), + value_datatype_extension: value_type.datatype_ext.map(Box::new), + }))), + }), + } + } } impl TryFrom for ColumnDataTypeWrapper { @@ -382,9 +419,8 @@ impl TryFrom for ColumnDataTypeWrapper { ConcreteDataType::Vector(_) => ColumnDataType::Vector, ConcreteDataType::List(_) => ColumnDataType::List, ConcreteDataType::Struct(_) => ColumnDataType::Struct, - ConcreteDataType::Null(_) - | ConcreteDataType::Dictionary(_) - | ConcreteDataType::Duration(_) => { + ConcreteDataType::Dictionary(_) => ColumnDataType::Dictionary, + ConcreteDataType::Null(_) | ConcreteDataType::Duration(_) => { return error::IntoColumnDataTypeSnafu { from: datatype }.fail(); } }; @@ -464,6 +500,25 @@ impl TryFrom for ColumnDataTypeWrapper { None } } + ColumnDataType::Dictionary => { + if let ConcreteDataType::Dictionary(dict_type) = &datatype { + let key_type = ColumnDataTypeWrapper::try_from(dict_type.key_type().clone())?; + let value_type = + ColumnDataTypeWrapper::try_from(dict_type.value_type().clone())?; + Some(ColumnDataTypeExtension { + type_ext: Some(TypeExt::DictionaryType(Box::new( + DictionaryTypeExtension { + key_datatype: key_type.datatype.into(), + key_datatype_extension: key_type.datatype_ext.map(Box::new), + value_datatype: value_type.datatype.into(), + value_datatype_extension: value_type.datatype_ext.map(Box::new), + }, + ))), + }) + } else { + None + } + } _ => None, }; Ok(Self { @@ -602,6 +657,9 @@ pub fn values_with_capacity(datatype: ColumnDataType, capacity: usize) -> Values struct_values: Vec::with_capacity(capacity), ..Default::default() }, + ColumnDataType::Dictionary => Values { + ..Default::default() + }, } } @@ -1305,6 +1363,9 @@ mod tests { let values = values_with_capacity(ColumnDataType::Json, 2); assert_eq!(2, values.json_values.capacity()); assert_eq!(2, values.string_values.capacity()); + + let values = values_with_capacity(ColumnDataType::Dictionary, 2); + assert!(values.bool_values.is_empty()); } #[test] @@ -1401,6 +1462,17 @@ mod tests { ConcreteDataType::list_datatype(Arc::new(ConcreteDataType::string_datatype())), ColumnDataTypeWrapper::list_datatype(ColumnDataTypeWrapper::string_datatype()).into() ); + assert_eq!( + ConcreteDataType::dictionary_datatype( + ConcreteDataType::int32_datatype(), + ConcreteDataType::string_datatype() + ), + ColumnDataTypeWrapper::dictionary_datatype( + ColumnDataTypeWrapper::int32_datatype(), + ColumnDataTypeWrapper::string_datatype() + ) + .into() + ); let struct_type = StructType::new(Arc::new(vec![ StructField::new("id".to_string(), ConcreteDataType::int64_datatype(), true), StructField::new( @@ -1571,6 +1643,18 @@ mod tests { ColumnDataTypeWrapper::vector_datatype(3), ConcreteDataType::vector_datatype(3).try_into().unwrap() ); + assert_eq!( + ColumnDataTypeWrapper::dictionary_datatype( + ColumnDataTypeWrapper::int32_datatype(), + ColumnDataTypeWrapper::string_datatype() + ), + ConcreteDataType::dictionary_datatype( + ConcreteDataType::int32_datatype(), + ConcreteDataType::string_datatype() + ) + .try_into() + .unwrap() + ); let result: Result = ConcreteDataType::null_datatype().try_into(); assert!(result.is_err()); diff --git a/src/common/macro/src/row/schema.rs b/src/common/macro/src/row/schema.rs index 67848a36a0..82296655f9 100644 --- a/src/common/macro/src/row/schema.rs +++ b/src/common/macro/src/row/schema.rs @@ -12,8 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +use greptime_proto::v1::ColumnDataTypeExtension; use greptime_proto::v1::column_data_type_extension::TypeExt; -use proc_macro2::TokenStream as TokenStream2; +use proc_macro2::{Span, TokenStream as TokenStream2}; use quote::quote; use syn::spanned::Spanned; use syn::{DeriveInput, Result}; @@ -69,57 +70,7 @@ fn impl_schema_method(fields: &[ParsedField<'_>]) -> Result { let semantic_type_val = convert_semantic_type_to_proto_semantic_type(column_attribute.semantic_type) as i32; let semantic_type = syn::LitInt::new(&semantic_type_val.to_string(), ident.span()); let extension = match extension { - Some(ext) => { - match ext.type_ext { - Some(TypeExt::DecimalType(ext)) => { - let precision = syn::LitInt::new(&ext.precision.to_string(), ident.span()); - let scale = syn::LitInt::new(&ext.scale.to_string(), ident.span()); - quote! { - Some(ColumnDataTypeExtension { type_ext: Some(TypeExt::DecimalType(DecimalTypeExtension { precision: #precision, scale: #scale })) }) - } - } - Some(TypeExt::JsonType(ext)) => { - let json_type = syn::LitInt::new(&ext.to_string(), ident.span()); - quote! { - Some(ColumnDataTypeExtension { type_ext: Some(TypeExt::JsonType(#json_type)) }) - } - } - Some(TypeExt::VectorType(ext)) => { - let dim = syn::LitInt::new(&ext.dim.to_string(), ident.span()); - quote! { - Some(ColumnDataTypeExtension { type_ext: Some(TypeExt::VectorType(VectorTypeExtension { dim: #dim })) }) - } - } - // TODO(sunng87): revisit all these implementations - Some(TypeExt::ListType(ext)) => { - let item_type = syn::Ident::new(&ext.datatype.to_string(), ident.span()); - quote! { - Some(ColumnDataTypeExtension { type_ext: Some(TypeExt::ListType(ListTypeExtension { item_type: #item_type })) }) - } - } - Some(TypeExt::StructType(ext)) => { - let fields = ext.fields.iter().map(|field| { - let field_name = syn::Ident::new(&field.name.clone(), ident.span()); - let field_type = syn::Ident::new(&field.datatype.to_string(), ident.span()); - quote! { - StructField { name: #field_name, type_: #field_type } - } - }).collect::>(); - quote! { - Some(ColumnDataTypeExtension { type_ext: Some(TypeExt::StructType(StructTypeExtension { fields: [#(#fields),*] })) }) - } - } - Some(TypeExt::JsonNativeType(ext)) => { - let inner = syn::Ident::new(&ext.datatype.to_string(), ident.span()); - quote! { - Some(ColumnDataTypeExtension { type_ext: Some(TypeExt::JsonNativeType(JsonNativeTypeExtension { datatype: #inner })) }) - } - } - None => { - quote! { None } - } - } - } + Some(ext) => column_data_type_extension_to_tokens(&ext, ident.span()), None => quote! { None }, }; @@ -141,3 +92,125 @@ fn impl_schema_method(fields: &[ParsedField<'_>]) -> Result { } }) } + +fn column_data_type_extension_to_tokens( + extension: &ColumnDataTypeExtension, + span: Span, +) -> TokenStream2 { + match extension.type_ext.as_ref() { + Some(TypeExt::DecimalType(ext)) => { + let precision = syn::LitInt::new(&ext.precision.to_string(), span); + let scale = syn::LitInt::new(&ext.scale.to_string(), span); + quote! { + Some(ColumnDataTypeExtension { + type_ext: Some(TypeExt::DecimalType(DecimalTypeExtension { + precision: #precision, + scale: #scale, + })), + }) + } + } + Some(TypeExt::JsonType(ext)) => { + let json_type = syn::LitInt::new(&ext.to_string(), span); + quote! { + Some(ColumnDataTypeExtension { + type_ext: Some(TypeExt::JsonType(#json_type)), + }) + } + } + Some(TypeExt::VectorType(ext)) => { + let dim = syn::LitInt::new(&ext.dim.to_string(), span); + quote! { + Some(ColumnDataTypeExtension { + type_ext: Some(TypeExt::VectorType(VectorTypeExtension { dim: #dim })), + }) + } + } + Some(TypeExt::ListType(ext)) => { + let datatype = syn::LitInt::new(&ext.datatype.to_string(), span); + let datatype_extension = ext + .datatype_extension + .as_deref() + .map(|ext| column_data_type_extension_to_tokens(ext, span)) + .unwrap_or_else(|| quote! { None }); + quote! { + Some(ColumnDataTypeExtension { + type_ext: Some(TypeExt::ListType(Box::new(ListTypeExtension { + datatype: #datatype, + datatype_extension: #datatype_extension, + }))), + }) + } + } + Some(TypeExt::StructType(ext)) => { + let fields = ext.fields.iter().map(|field| { + let field_name = &field.name; + let datatype = syn::LitInt::new(&field.datatype.to_string(), span); + let datatype_extension = field + .datatype_extension + .as_ref() + .map(|ext| column_data_type_extension_to_tokens(ext, span)) + .unwrap_or_else(|| quote! { None }); + quote! { + greptime_proto::v1::StructField { + name: #field_name.to_string(), + datatype: #datatype, + datatype_extension: #datatype_extension, + } + } + }); + quote! { + Some(ColumnDataTypeExtension { + type_ext: Some(TypeExt::StructType(StructTypeExtension { + fields: vec![#(#fields),*], + })), + }) + } + } + Some(TypeExt::JsonNativeType(ext)) => { + let inner = syn::LitInt::new(&ext.datatype.to_string(), span); + let datatype_extension = ext + .datatype_extension + .as_deref() + .map(|ext| column_data_type_extension_to_tokens(ext, span)) + .unwrap_or_else(|| quote! { None }); + quote! { + Some(ColumnDataTypeExtension { + type_ext: Some(TypeExt::JsonNativeType(Box::new( + JsonNativeTypeExtension { + datatype: #inner, + datatype_extension: #datatype_extension, + }, + ))), + }) + } + } + Some(TypeExt::DictionaryType(ext)) => { + let key_datatype = syn::LitInt::new(&ext.key_datatype.to_string(), span); + let value_datatype = syn::LitInt::new(&ext.value_datatype.to_string(), span); + let key_datatype_extension = ext + .key_datatype_extension + .as_deref() + .map(|ext| column_data_type_extension_to_tokens(ext, span)) + .unwrap_or_else(|| quote! { None }); + let value_datatype_extension = ext + .value_datatype_extension + .as_deref() + .map(|ext| column_data_type_extension_to_tokens(ext, span)) + .unwrap_or_else(|| quote! { None }); + quote! { + Some(ColumnDataTypeExtension { + type_ext: Some(TypeExt::DictionaryType(Box::new( + DictionaryTypeExtension { + key_datatype: #key_datatype, + key_datatype_extension: #key_datatype_extension, + value_datatype: #value_datatype, + value_datatype_extension: #value_datatype_extension, + }, + ))), + }) + } + } + None => quote! { None }, + } +} diff --git a/src/common/macro/src/row/utils.rs b/src/common/macro/src/row/utils.rs index 40f990a40a..1768b2747a 100644 --- a/src/common/macro/src/row/utils.rs +++ b/src/common/macro/src/row/utils.rs @@ -309,5 +309,8 @@ pub(crate) fn convert_column_data_type_to_value_data_ident( ColumnDataType::Vector => format_ident!("VectorValue"), ColumnDataType::List => format_ident!("ListValue"), ColumnDataType::Struct => format_ident!("StructValue"), + ColumnDataType::Dictionary => { + panic!("Dictionary data type is not supported in row macros yet") + } } } diff --git a/src/operator/src/req_convert/common.rs b/src/operator/src/req_convert/common.rs index 37529d55c6..63226ef8e4 100644 --- a/src/operator/src/req_convert/common.rs +++ b/src/operator/src/req_convert/common.rs @@ -223,7 +223,15 @@ fn push_column_to_rows(column: Column, rows: &mut [Row]) -> Result<()> { } } - )* }} + )* _ => { + return InvalidInsertRequestSnafu { + reason: format!( + "Column '{}' with type {:?} is not supported in row inserts.", + column.column_name, column_type + ), + } + .fail(); + } }} } push_column_values_match_types!(