From fb92e4d0b2ebc1d4b81aacd95ac12eaf61baec77 Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Thu, 6 Nov 2025 02:34:57 +0800 Subject: [PATCH] feat: add greptime's arrow json extension type (#7168) * feat: add arrow json extension type * feat: add json structure settings to extension type * refactor: store json structure settings as extension metadata * chore: make binary an acceptable type for extension --- Cargo.lock | 1 + src/api/Cargo.toml | 1 + src/api/src/v1/column_def.rs | 25 ++++-- src/datatypes/src/error.rs | 11 ++- src/datatypes/src/extension.rs | 15 ++++ src/datatypes/src/extension/json.rs | 104 ++++++++++++++++++++++ src/datatypes/src/lib.rs | 1 + src/datatypes/src/schema.rs | 5 +- src/datatypes/src/schema/column_schema.rs | 49 +++++++--- src/query/src/sql/show_create_table.rs | 9 +- src/sql/src/statements.rs | 8 +- 11 files changed, 204 insertions(+), 25 deletions(-) create mode 100644 src/datatypes/src/extension.rs create mode 100644 src/datatypes/src/extension/json.rs diff --git a/Cargo.lock b/Cargo.lock index 198f009fea..57e5e7c6f3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -214,6 +214,7 @@ checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c" name = "api" version = "0.18.0" dependencies = [ + "arrow-schema", "common-base", "common-decimal", "common-error", diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index f5826d01a7..3515b788b5 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -8,6 +8,7 @@ license.workspace = true workspace = true [dependencies] +arrow-schema.workspace = true common-base.workspace = true common-decimal.workspace = true common-error.workspace = true diff --git a/src/api/src/v1/column_def.rs b/src/api/src/v1/column_def.rs index 912b7ee13e..88ee0c5749 100644 --- a/src/api/src/v1/column_def.rs +++ b/src/api/src/v1/column_def.rs @@ -14,10 +14,11 @@ use std::collections::HashMap; +use arrow_schema::extension::{EXTENSION_TYPE_METADATA_KEY, EXTENSION_TYPE_NAME_KEY}; use datatypes::schema::{ 
COMMENT_KEY, ColumnDefaultConstraint, ColumnSchema, FULLTEXT_KEY, FulltextAnalyzer, - FulltextBackend, FulltextOptions, INVERTED_INDEX_KEY, JSON_STRUCTURE_SETTINGS_KEY, - SKIPPING_INDEX_KEY, SkippingIndexOptions, SkippingIndexType, + FulltextBackend, FulltextOptions, INVERTED_INDEX_KEY, SKIPPING_INDEX_KEY, SkippingIndexOptions, + SkippingIndexType, }; use greptime_proto::v1::{ Analyzer, FulltextBackend as PbFulltextBackend, SkippingIndexType as PbSkippingIndexType, @@ -68,8 +69,14 @@ pub fn try_as_column_schema(column_def: &ColumnDef) -> Result { if let Some(skipping_index) = options.options.get(SKIPPING_INDEX_GRPC_KEY) { metadata.insert(SKIPPING_INDEX_KEY.to_string(), skipping_index.to_owned()); } - if let Some(settings) = options.options.get(JSON_STRUCTURE_SETTINGS_KEY) { - metadata.insert(JSON_STRUCTURE_SETTINGS_KEY.to_string(), settings.clone()); + if let Some(extension_name) = options.options.get(EXTENSION_TYPE_NAME_KEY) { + metadata.insert(EXTENSION_TYPE_NAME_KEY.to_string(), extension_name.clone()); + } + if let Some(extension_metadata) = options.options.get(EXTENSION_TYPE_METADATA_KEY) { + metadata.insert( + EXTENSION_TYPE_METADATA_KEY.to_string(), + extension_metadata.clone(), + ); } } @@ -142,10 +149,16 @@ pub fn options_from_column_schema(column_schema: &ColumnSchema) -> Option StatusCode::Internal, + | InconsistentStructFieldsAndItems { .. } + | ArrowMetadata { .. } => StatusCode::Internal, } } diff --git a/src/datatypes/src/extension.rs b/src/datatypes/src/extension.rs new file mode 100644 index 0000000000..83776cdcc1 --- /dev/null +++ b/src/datatypes/src/extension.rs @@ -0,0 +1,15 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod json; diff --git a/src/datatypes/src/extension/json.rs b/src/datatypes/src/extension/json.rs new file mode 100644 index 0000000000..bd3bd94712 --- /dev/null +++ b/src/datatypes/src/extension/json.rs @@ -0,0 +1,104 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use arrow_schema::extension::ExtensionType; +use arrow_schema::{ArrowError, DataType}; +use serde::{Deserialize, Serialize}; + +use crate::json::JsonStructureSettings; + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct JsonMetadata { + /// Indicates how the JSON value is stored in the underlying data type. + /// + /// This field can be `None` when the data is converted to a completely structured in-memory form. 
+ pub json_structure_settings: Option<JsonStructureSettings>, +} + +#[derive(Debug, Clone)] +pub struct JsonExtensionType(Arc<JsonMetadata>); + +impl JsonExtensionType { + pub fn new(metadata: Arc<JsonMetadata>) -> Self { + JsonExtensionType(metadata) + } +} + +impl ExtensionType for JsonExtensionType { + const NAME: &'static str = "greptime.json"; + type Metadata = Arc<JsonMetadata>; + + fn metadata(&self) -> &Self::Metadata { + &self.0 + } + + fn serialize_metadata(&self) -> Option<String> { + serde_json::to_string(self.metadata()).ok() + } + + fn deserialize_metadata(metadata: Option<&str>) -> Result<Self::Metadata, ArrowError> { + if let Some(metadata) = metadata { + let metadata = serde_json::from_str(metadata).map_err(|e| { + ArrowError::ParseError(format!("Failed to deserialize JSON metadata: {}", e)) + })?; + Ok(Arc::new(metadata)) + } else { + Ok(Arc::new(JsonMetadata::default())) + } + } + + fn supports_data_type(&self, data_type: &DataType) -> Result<(), ArrowError> { + match data_type { + // object + DataType::Struct(_) + // array + | DataType::List(_) + | DataType::ListView(_) + | DataType::LargeList(_) + | DataType::LargeListView(_) + // string + | DataType::Utf8 + | DataType::Utf8View + | DataType::LargeUtf8 + // number + | DataType::Int8 + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::UInt8 + | DataType::UInt16 + | DataType::UInt32 + | DataType::UInt64 + | DataType::Float32 + | DataType::Float64 + // boolean + | DataType::Boolean + // null + | DataType::Null + // legacy json type + | DataType::Binary => Ok(()), + dt => Err(ArrowError::SchemaError(format!( + "Unexpected data type {dt}" + ))), + } + } + + fn try_new(data_type: &DataType, metadata: Self::Metadata) -> Result<Self, ArrowError> { + let json = Self(metadata); + json.supports_data_type(data_type)?; + Ok(json) + } +} diff --git a/src/datatypes/src/lib.rs b/src/datatypes/src/lib.rs index 56be63b229..2c3d4c23bf 100644 --- a/src/datatypes/src/lib.rs +++ b/src/datatypes/src/lib.rs @@ -19,6 +19,7 @@ pub mod arrow_array; pub mod data_type; pub mod duration; pub mod error; +pub mod 
extension; pub mod interval; pub mod json; pub mod macros; diff --git a/src/datatypes/src/schema.rs b/src/datatypes/src/schema.rs index 9995072b7c..6bdf321137 100644 --- a/src/datatypes/src/schema.rs +++ b/src/datatypes/src/schema.rs @@ -32,9 +32,8 @@ pub use crate::schema::column_schema::{ COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE, COLUMN_FULLTEXT_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE, COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY, ColumnExtType, ColumnSchema, FULLTEXT_KEY, - FulltextAnalyzer, FulltextBackend, FulltextOptions, INVERTED_INDEX_KEY, - JSON_STRUCTURE_SETTINGS_KEY, Metadata, SKIPPING_INDEX_KEY, SkippingIndexOptions, - SkippingIndexType, TIME_INDEX_KEY, + FulltextAnalyzer, FulltextBackend, FulltextOptions, INVERTED_INDEX_KEY, Metadata, + SKIPPING_INDEX_KEY, SkippingIndexOptions, SkippingIndexType, TIME_INDEX_KEY, }; pub use crate::schema::constraint::ColumnDefaultConstraint; pub use crate::schema::raw::RawSchema; diff --git a/src/datatypes/src/schema/column_schema.rs b/src/datatypes/src/schema/column_schema.rs index 627d898810..2ba7beb701 100644 --- a/src/datatypes/src/schema/column_schema.rs +++ b/src/datatypes/src/schema/column_schema.rs @@ -17,13 +17,17 @@ use std::fmt; use std::str::FromStr; use arrow::datatypes::Field; +use arrow_schema::extension::{ + EXTENSION_TYPE_METADATA_KEY, EXTENSION_TYPE_NAME_KEY, ExtensionType, +}; use serde::{Deserialize, Serialize}; use snafu::{ResultExt, ensure}; use sqlparser_derive::{Visit, VisitMut}; use crate::data_type::{ConcreteDataType, DataType}; -use crate::error::{self, Error, InvalidFulltextOptionSnafu, ParseExtendedTypeSnafu, Result}; -use crate::json::JsonStructureSettings; +use crate::error::{ + self, ArrowMetadataSnafu, Error, InvalidFulltextOptionSnafu, ParseExtendedTypeSnafu, Result, +}; use crate::schema::TYPE_KEY; use crate::schema::constraint::ColumnDefaultConstraint; use crate::value::Value; @@ -42,7 +46,6 @@ pub const 
FULLTEXT_KEY: &str = "greptime:fulltext"; pub const INVERTED_INDEX_KEY: &str = "greptime:inverted_index"; /// Key used to store skip options in arrow field's metadata. pub const SKIPPING_INDEX_KEY: &str = "greptime:skipping_index"; -pub const JSON_STRUCTURE_SETTINGS_KEY: &str = "greptime:json:structure_settings"; /// Keys used in fulltext options pub const COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE: &str = "enable"; @@ -394,18 +397,38 @@ impl ColumnSchema { Ok(()) } - pub fn json_structure_settings(&self) -> Result<Option<JsonStructureSettings>> { - self.metadata - .get(JSON_STRUCTURE_SETTINGS_KEY) - .map(|json| serde_json::from_str(json).context(error::DeserializeSnafu { json })) - .transpose() + pub fn extension_type<E>(&self) -> Result<Option<E>> + where + E: ExtensionType, + { + let extension_type_name = self.metadata.get(EXTENSION_TYPE_NAME_KEY); + + if extension_type_name.map(|s| s.as_str()) == Some(E::NAME) { + let extension_metadata = self.metadata.get(EXTENSION_TYPE_METADATA_KEY); + let extension_metadata = + E::deserialize_metadata(extension_metadata.map(|s| s.as_str())) + .context(ArrowMetadataSnafu)?; + + let extension = E::try_new(&self.data_type.as_arrow_type(), extension_metadata) + .context(ArrowMetadataSnafu)?; + Ok(Some(extension)) + } else { + Ok(None) + } } - pub fn with_json_structure_settings(&mut self, settings: &JsonStructureSettings) -> Result<()> { - self.metadata.insert( - JSON_STRUCTURE_SETTINGS_KEY.to_string(), - serde_json::to_string(settings).context(error::SerializeSnafu)?, - ); + pub fn with_extension_type<E>(&mut self, extension_type: &E) -> Result<()> + where + E: ExtensionType, + { + self.metadata + .insert(EXTENSION_TYPE_NAME_KEY.to_string(), E::NAME.to_string()); + + if let Some(extension_metadata) = extension_type.serialize_metadata() { + self.metadata + .insert(EXTENSION_TYPE_METADATA_KEY.to_string(), extension_metadata); + } + Ok(()) } } diff --git a/src/query/src/sql/show_create_table.rs b/src/query/src/sql/show_create_table.rs index 3b2d8aaceb..2c566af508 100644 --- 
a/src/query/src/sql/show_create_table.rs +++ b/src/query/src/sql/show_create_table.rs @@ -16,7 +16,9 @@ use std::collections::HashMap; +use arrow_schema::extension::ExtensionType; use common_meta::SchemaOptions; +use datatypes::extension::json::JsonExtensionType; use datatypes::schema::{ COLUMN_FULLTEXT_OPT_KEY_ANALYZER, COLUMN_FULLTEXT_OPT_KEY_BACKEND, COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE, @@ -159,7 +161,12 @@ fn create_column(column_schema: &ColumnSchema, quote_style: char) -> Result()? { + let settings = json_extension + .metadata() + .json_structure_settings + .clone() + .unwrap_or_default(); extensions.set_json_structure_settings(settings); } diff --git a/src/sql/src/statements.rs b/src/sql/src/statements.rs index 823b123011..b493375373 100644 --- a/src/sql/src/statements.rs +++ b/src/sql/src/statements.rs @@ -32,10 +32,13 @@ pub mod tql; pub(crate) mod transform; pub mod truncate; +use std::sync::Arc; + use api::helper::ColumnDataTypeWrapper; use api::v1::SemanticType; use common_sql::default_constraint::parse_column_default_constraint; use common_time::timezone::Timezone; +use datatypes::extension::json::{JsonExtensionType, JsonMetadata}; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{COMMENT_KEY, ColumnDefaultConstraint, ColumnSchema}; use datatypes::types::TimestampType; @@ -149,8 +152,11 @@ pub fn column_to_schema( .extensions .build_json_structure_settings()? .unwrap_or_default(); + let extension = JsonExtensionType::new(Arc::new(JsonMetadata { + json_structure_settings: Some(settings.clone()), + })); column_schema - .with_json_structure_settings(&settings) + .with_extension_type(&extension) .with_context(|_| SetJsonStructureSettingsSnafu { value: format!("{settings:?}"), })?;