mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-27 18:30:38 +00:00
feat: support Dictionary type (#7277)
* feat: support Dictionary type Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * fix format Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * update proto Signed-off-by: Ruihang Xia <waynestxia@gmail.com> --------- Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -5348,7 +5348,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "greptime-proto"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=fd699b21991e358201ee81ead4d319545c5df2ad#fd699b21991e358201ee81ead4d319545c5df2ad"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=437e6a1ef8139e3946dcc043ea22c7fe6877019d#437e6a1ef8139e3946dcc043ea22c7fe6877019d"
|
||||
dependencies = [
|
||||
"prost 0.13.5",
|
||||
"prost-types 0.13.5",
|
||||
|
||||
@@ -148,7 +148,7 @@ etcd-client = { git = "https://github.com/GreptimeTeam/etcd-client", rev = "f62d
|
||||
fst = "0.4.7"
|
||||
futures = "0.3"
|
||||
futures-util = "0.3"
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "fd699b21991e358201ee81ead4d319545c5df2ad" }
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "437e6a1ef8139e3946dcc043ea22c7fe6877019d" }
|
||||
hex = "0.4"
|
||||
http = "1"
|
||||
humantime = "2.1"
|
||||
|
||||
@@ -35,9 +35,9 @@ use greptime_proto::v1::greptime_request::Request;
|
||||
use greptime_proto::v1::query_request::Query;
|
||||
use greptime_proto::v1::value::ValueData;
|
||||
use greptime_proto::v1::{
|
||||
self, ColumnDataTypeExtension, DdlRequest, DecimalTypeExtension, JsonList,
|
||||
JsonNativeTypeExtension, JsonObject, JsonTypeExtension, ListTypeExtension, QueryRequest, Row,
|
||||
SemanticType, StructTypeExtension, VectorTypeExtension, json_value,
|
||||
self, ColumnDataTypeExtension, DdlRequest, DecimalTypeExtension, DictionaryTypeExtension,
|
||||
JsonList, JsonNativeTypeExtension, JsonObject, JsonTypeExtension, ListTypeExtension,
|
||||
QueryRequest, Row, SemanticType, StructTypeExtension, VectorTypeExtension, json_value,
|
||||
};
|
||||
use paste::paste;
|
||||
use snafu::prelude::*;
|
||||
@@ -216,6 +216,26 @@ impl From<ColumnDataTypeWrapper> for ConcreteDataType {
|
||||
ConcreteDataType::null_datatype()
|
||||
}
|
||||
}
|
||||
ColumnDataType::Dictionary => {
|
||||
if let Some(TypeExt::DictionaryType(d)) = datatype_wrapper
|
||||
.datatype_ext
|
||||
.as_ref()
|
||||
.and_then(|datatype_ext| datatype_ext.type_ext.as_ref())
|
||||
{
|
||||
let key_type = ColumnDataTypeWrapper {
|
||||
datatype: d.key_datatype(),
|
||||
datatype_ext: d.key_datatype_extension.clone().map(|ext| *ext),
|
||||
};
|
||||
let value_type = ColumnDataTypeWrapper {
|
||||
datatype: d.value_datatype(),
|
||||
datatype_ext: d.value_datatype_extension.clone().map(|ext| *ext),
|
||||
};
|
||||
ConcreteDataType::dictionary_datatype(key_type.into(), value_type.into())
|
||||
} else {
|
||||
// invalid state: type extension not found
|
||||
ConcreteDataType::null_datatype()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -339,6 +359,23 @@ impl ColumnDataTypeWrapper {
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn dictionary_datatype(
|
||||
key_type: ColumnDataTypeWrapper,
|
||||
value_type: ColumnDataTypeWrapper,
|
||||
) -> Self {
|
||||
ColumnDataTypeWrapper {
|
||||
datatype: ColumnDataType::Dictionary,
|
||||
datatype_ext: Some(ColumnDataTypeExtension {
|
||||
type_ext: Some(TypeExt::DictionaryType(Box::new(DictionaryTypeExtension {
|
||||
key_datatype: key_type.datatype().into(),
|
||||
key_datatype_extension: key_type.datatype_ext.map(Box::new),
|
||||
value_datatype: value_type.datatype().into(),
|
||||
value_datatype_extension: value_type.datatype_ext.map(Box::new),
|
||||
}))),
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<ConcreteDataType> for ColumnDataTypeWrapper {
|
||||
@@ -382,9 +419,8 @@ impl TryFrom<ConcreteDataType> for ColumnDataTypeWrapper {
|
||||
ConcreteDataType::Vector(_) => ColumnDataType::Vector,
|
||||
ConcreteDataType::List(_) => ColumnDataType::List,
|
||||
ConcreteDataType::Struct(_) => ColumnDataType::Struct,
|
||||
ConcreteDataType::Null(_)
|
||||
| ConcreteDataType::Dictionary(_)
|
||||
| ConcreteDataType::Duration(_) => {
|
||||
ConcreteDataType::Dictionary(_) => ColumnDataType::Dictionary,
|
||||
ConcreteDataType::Null(_) | ConcreteDataType::Duration(_) => {
|
||||
return error::IntoColumnDataTypeSnafu { from: datatype }.fail();
|
||||
}
|
||||
};
|
||||
@@ -464,6 +500,25 @@ impl TryFrom<ConcreteDataType> for ColumnDataTypeWrapper {
|
||||
None
|
||||
}
|
||||
}
|
||||
ColumnDataType::Dictionary => {
|
||||
if let ConcreteDataType::Dictionary(dict_type) = &datatype {
|
||||
let key_type = ColumnDataTypeWrapper::try_from(dict_type.key_type().clone())?;
|
||||
let value_type =
|
||||
ColumnDataTypeWrapper::try_from(dict_type.value_type().clone())?;
|
||||
Some(ColumnDataTypeExtension {
|
||||
type_ext: Some(TypeExt::DictionaryType(Box::new(
|
||||
DictionaryTypeExtension {
|
||||
key_datatype: key_type.datatype.into(),
|
||||
key_datatype_extension: key_type.datatype_ext.map(Box::new),
|
||||
value_datatype: value_type.datatype.into(),
|
||||
value_datatype_extension: value_type.datatype_ext.map(Box::new),
|
||||
},
|
||||
))),
|
||||
})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
_ => None,
|
||||
};
|
||||
Ok(Self {
|
||||
@@ -602,6 +657,9 @@ pub fn values_with_capacity(datatype: ColumnDataType, capacity: usize) -> Values
|
||||
struct_values: Vec::with_capacity(capacity),
|
||||
..Default::default()
|
||||
},
|
||||
ColumnDataType::Dictionary => Values {
|
||||
..Default::default()
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1305,6 +1363,9 @@ mod tests {
|
||||
let values = values_with_capacity(ColumnDataType::Json, 2);
|
||||
assert_eq!(2, values.json_values.capacity());
|
||||
assert_eq!(2, values.string_values.capacity());
|
||||
|
||||
let values = values_with_capacity(ColumnDataType::Dictionary, 2);
|
||||
assert!(values.bool_values.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -1401,6 +1462,17 @@ mod tests {
|
||||
ConcreteDataType::list_datatype(Arc::new(ConcreteDataType::string_datatype())),
|
||||
ColumnDataTypeWrapper::list_datatype(ColumnDataTypeWrapper::string_datatype()).into()
|
||||
);
|
||||
assert_eq!(
|
||||
ConcreteDataType::dictionary_datatype(
|
||||
ConcreteDataType::int32_datatype(),
|
||||
ConcreteDataType::string_datatype()
|
||||
),
|
||||
ColumnDataTypeWrapper::dictionary_datatype(
|
||||
ColumnDataTypeWrapper::int32_datatype(),
|
||||
ColumnDataTypeWrapper::string_datatype()
|
||||
)
|
||||
.into()
|
||||
);
|
||||
let struct_type = StructType::new(Arc::new(vec![
|
||||
StructField::new("id".to_string(), ConcreteDataType::int64_datatype(), true),
|
||||
StructField::new(
|
||||
@@ -1571,6 +1643,18 @@ mod tests {
|
||||
ColumnDataTypeWrapper::vector_datatype(3),
|
||||
ConcreteDataType::vector_datatype(3).try_into().unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ColumnDataTypeWrapper::dictionary_datatype(
|
||||
ColumnDataTypeWrapper::int32_datatype(),
|
||||
ColumnDataTypeWrapper::string_datatype()
|
||||
),
|
||||
ConcreteDataType::dictionary_datatype(
|
||||
ConcreteDataType::int32_datatype(),
|
||||
ConcreteDataType::string_datatype()
|
||||
)
|
||||
.try_into()
|
||||
.unwrap()
|
||||
);
|
||||
|
||||
let result: Result<ColumnDataTypeWrapper> = ConcreteDataType::null_datatype().try_into();
|
||||
assert!(result.is_err());
|
||||
|
||||
@@ -12,8 +12,9 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use greptime_proto::v1::ColumnDataTypeExtension;
|
||||
use greptime_proto::v1::column_data_type_extension::TypeExt;
|
||||
use proc_macro2::TokenStream as TokenStream2;
|
||||
use proc_macro2::{Span, TokenStream as TokenStream2};
|
||||
use quote::quote;
|
||||
use syn::spanned::Spanned;
|
||||
use syn::{DeriveInput, Result};
|
||||
@@ -69,57 +70,7 @@ fn impl_schema_method(fields: &[ParsedField<'_>]) -> Result<TokenStream2> {
|
||||
let semantic_type_val = convert_semantic_type_to_proto_semantic_type(column_attribute.semantic_type) as i32;
|
||||
let semantic_type = syn::LitInt::new(&semantic_type_val.to_string(), ident.span());
|
||||
let extension = match extension {
|
||||
Some(ext) => {
|
||||
match ext.type_ext {
|
||||
Some(TypeExt::DecimalType(ext)) => {
|
||||
let precision = syn::LitInt::new(&ext.precision.to_string(), ident.span());
|
||||
let scale = syn::LitInt::new(&ext.scale.to_string(), ident.span());
|
||||
quote! {
|
||||
Some(ColumnDataTypeExtension { type_ext: Some(TypeExt::DecimalType(DecimalTypeExtension { precision: #precision, scale: #scale })) })
|
||||
}
|
||||
}
|
||||
Some(TypeExt::JsonType(ext)) => {
|
||||
let json_type = syn::LitInt::new(&ext.to_string(), ident.span());
|
||||
quote! {
|
||||
Some(ColumnDataTypeExtension { type_ext: Some(TypeExt::JsonType(#json_type)) })
|
||||
}
|
||||
}
|
||||
Some(TypeExt::VectorType(ext)) => {
|
||||
let dim = syn::LitInt::new(&ext.dim.to_string(), ident.span());
|
||||
quote! {
|
||||
Some(ColumnDataTypeExtension { type_ext: Some(TypeExt::VectorType(VectorTypeExtension { dim: #dim })) })
|
||||
}
|
||||
}
|
||||
// TODO(sunng87): revisit all these implementations
|
||||
Some(TypeExt::ListType(ext)) => {
|
||||
let item_type = syn::Ident::new(&ext.datatype.to_string(), ident.span());
|
||||
quote! {
|
||||
Some(ColumnDataTypeExtension { type_ext: Some(TypeExt::ListType(ListTypeExtension { item_type: #item_type })) })
|
||||
}
|
||||
}
|
||||
Some(TypeExt::StructType(ext)) => {
|
||||
let fields = ext.fields.iter().map(|field| {
|
||||
let field_name = syn::Ident::new(&field.name.clone(), ident.span());
|
||||
let field_type = syn::Ident::new(&field.datatype.to_string(), ident.span());
|
||||
quote! {
|
||||
StructField { name: #field_name, type_: #field_type }
|
||||
}
|
||||
}).collect::<Vec<_>>();
|
||||
quote! {
|
||||
Some(ColumnDataTypeExtension { type_ext: Some(TypeExt::StructType(StructTypeExtension { fields: [#(#fields),*] })) })
|
||||
}
|
||||
}
|
||||
Some(TypeExt::JsonNativeType(ext)) => {
|
||||
let inner = syn::Ident::new(&ext.datatype.to_string(), ident.span());
|
||||
quote! {
|
||||
Some(ColumnDataTypeExtension { type_ext: Some(TypeExt::JsonNativeType(JsonNativeTypeExtension { datatype: #inner })) })
|
||||
}
|
||||
}
|
||||
None => {
|
||||
quote! { None }
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(ext) => column_data_type_extension_to_tokens(&ext, ident.span()),
|
||||
None => quote! { None },
|
||||
};
|
||||
|
||||
@@ -141,3 +92,125 @@ fn impl_schema_method(fields: &[ParsedField<'_>]) -> Result<TokenStream2> {
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn column_data_type_extension_to_tokens(
|
||||
extension: &ColumnDataTypeExtension,
|
||||
span: Span,
|
||||
) -> TokenStream2 {
|
||||
match extension.type_ext.as_ref() {
|
||||
Some(TypeExt::DecimalType(ext)) => {
|
||||
let precision = syn::LitInt::new(&ext.precision.to_string(), span);
|
||||
let scale = syn::LitInt::new(&ext.scale.to_string(), span);
|
||||
quote! {
|
||||
Some(ColumnDataTypeExtension {
|
||||
type_ext: Some(TypeExt::DecimalType(DecimalTypeExtension {
|
||||
precision: #precision,
|
||||
scale: #scale,
|
||||
})),
|
||||
})
|
||||
}
|
||||
}
|
||||
Some(TypeExt::JsonType(ext)) => {
|
||||
let json_type = syn::LitInt::new(&ext.to_string(), span);
|
||||
quote! {
|
||||
Some(ColumnDataTypeExtension {
|
||||
type_ext: Some(TypeExt::JsonType(#json_type)),
|
||||
})
|
||||
}
|
||||
}
|
||||
Some(TypeExt::VectorType(ext)) => {
|
||||
let dim = syn::LitInt::new(&ext.dim.to_string(), span);
|
||||
quote! {
|
||||
Some(ColumnDataTypeExtension {
|
||||
type_ext: Some(TypeExt::VectorType(VectorTypeExtension { dim: #dim })),
|
||||
})
|
||||
}
|
||||
}
|
||||
Some(TypeExt::ListType(ext)) => {
|
||||
let datatype = syn::LitInt::new(&ext.datatype.to_string(), span);
|
||||
let datatype_extension = ext
|
||||
.datatype_extension
|
||||
.as_deref()
|
||||
.map(|ext| column_data_type_extension_to_tokens(ext, span))
|
||||
.unwrap_or_else(|| quote! { None });
|
||||
quote! {
|
||||
Some(ColumnDataTypeExtension {
|
||||
type_ext: Some(TypeExt::ListType(Box::new(ListTypeExtension {
|
||||
datatype: #datatype,
|
||||
datatype_extension: #datatype_extension,
|
||||
}))),
|
||||
})
|
||||
}
|
||||
}
|
||||
Some(TypeExt::StructType(ext)) => {
|
||||
let fields = ext.fields.iter().map(|field| {
|
||||
let field_name = &field.name;
|
||||
let datatype = syn::LitInt::new(&field.datatype.to_string(), span);
|
||||
let datatype_extension = field
|
||||
.datatype_extension
|
||||
.as_ref()
|
||||
.map(|ext| column_data_type_extension_to_tokens(ext, span))
|
||||
.unwrap_or_else(|| quote! { None });
|
||||
quote! {
|
||||
greptime_proto::v1::StructField {
|
||||
name: #field_name.to_string(),
|
||||
datatype: #datatype,
|
||||
datatype_extension: #datatype_extension,
|
||||
}
|
||||
}
|
||||
});
|
||||
quote! {
|
||||
Some(ColumnDataTypeExtension {
|
||||
type_ext: Some(TypeExt::StructType(StructTypeExtension {
|
||||
fields: vec![#(#fields),*],
|
||||
})),
|
||||
})
|
||||
}
|
||||
}
|
||||
Some(TypeExt::JsonNativeType(ext)) => {
|
||||
let inner = syn::LitInt::new(&ext.datatype.to_string(), span);
|
||||
let datatype_extension = ext
|
||||
.datatype_extension
|
||||
.as_deref()
|
||||
.map(|ext| column_data_type_extension_to_tokens(ext, span))
|
||||
.unwrap_or_else(|| quote! { None });
|
||||
quote! {
|
||||
Some(ColumnDataTypeExtension {
|
||||
type_ext: Some(TypeExt::JsonNativeType(Box::new(
|
||||
JsonNativeTypeExtension {
|
||||
datatype: #inner,
|
||||
datatype_extension: #datatype_extension,
|
||||
},
|
||||
))),
|
||||
})
|
||||
}
|
||||
}
|
||||
Some(TypeExt::DictionaryType(ext)) => {
|
||||
let key_datatype = syn::LitInt::new(&ext.key_datatype.to_string(), span);
|
||||
let value_datatype = syn::LitInt::new(&ext.value_datatype.to_string(), span);
|
||||
let key_datatype_extension = ext
|
||||
.key_datatype_extension
|
||||
.as_deref()
|
||||
.map(|ext| column_data_type_extension_to_tokens(ext, span))
|
||||
.unwrap_or_else(|| quote! { None });
|
||||
let value_datatype_extension = ext
|
||||
.value_datatype_extension
|
||||
.as_deref()
|
||||
.map(|ext| column_data_type_extension_to_tokens(ext, span))
|
||||
.unwrap_or_else(|| quote! { None });
|
||||
quote! {
|
||||
Some(ColumnDataTypeExtension {
|
||||
type_ext: Some(TypeExt::DictionaryType(Box::new(
|
||||
DictionaryTypeExtension {
|
||||
key_datatype: #key_datatype,
|
||||
key_datatype_extension: #key_datatype_extension,
|
||||
value_datatype: #value_datatype,
|
||||
value_datatype_extension: #value_datatype_extension,
|
||||
},
|
||||
))),
|
||||
})
|
||||
}
|
||||
}
|
||||
None => quote! { None },
|
||||
}
|
||||
}
|
||||
|
||||
@@ -309,5 +309,8 @@ pub(crate) fn convert_column_data_type_to_value_data_ident(
|
||||
ColumnDataType::Vector => format_ident!("VectorValue"),
|
||||
ColumnDataType::List => format_ident!("ListValue"),
|
||||
ColumnDataType::Struct => format_ident!("StructValue"),
|
||||
ColumnDataType::Dictionary => {
|
||||
panic!("Dictionary data type is not supported in row macros yet")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -223,7 +223,15 @@ fn push_column_to_rows(column: Column, rows: &mut [Row]) -> Result<()> {
|
||||
}
|
||||
}
|
||||
|
||||
)* }}
|
||||
)* _ => {
|
||||
return InvalidInsertRequestSnafu {
|
||||
reason: format!(
|
||||
"Column '{}' with type {:?} is not supported in row inserts.",
|
||||
column.column_name, column_type
|
||||
),
|
||||
}
|
||||
.fail();
|
||||
} }}
|
||||
}
|
||||
|
||||
push_column_values_match_types!(
|
||||
|
||||
Reference in New Issue
Block a user