From b53a0b86fb8b2f3cede177025c8a9ffb4bb40e3a Mon Sep 17 00:00:00 2001 From: LFC <990479+MichaelScofield@users.noreply.github.com> Date: Fri, 24 Oct 2025 10:16:49 +0800 Subject: [PATCH] feat: create table with new json datatype (#7128) * feat: create table with new json datatype Signed-off-by: luofucong * resolve PR comments Signed-off-by: luofucong --------- Signed-off-by: luofucong --- Cargo.lock | 1 + src/api/src/v1/column_def.rs | 12 ++++- src/common/test-util/src/recordbatch.rs | 2 +- src/datatypes/src/json.rs | 9 +++- src/datatypes/src/schema.rs | 5 +- src/datatypes/src/schema/column_schema.rs | 17 ++++++ src/datatypes/src/types/json_type.rs | 6 ++- src/query/src/error.rs | 14 +++-- src/query/src/sql/show_create_table.rs | 4 ++ src/sql/Cargo.toml | 1 + src/sql/src/error.rs | 12 ++++- src/sql/src/statements.rs | 16 +++++- src/sql/src/statements/create.rs | 44 ++++++++++++++- src/sql/src/statements/option_map.rs | 24 +++++++++ src/sql/src/util.rs | 17 ++++++ tests-integration/src/tests/instance_test.rs | 56 ++++++++++++++++++++ 16 files changed, 226 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f721f58369..231bd594ea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11964,6 +11964,7 @@ dependencies = [ "datafusion-physical-expr", "datafusion-sql", "datatypes", + "either", "hex", "humantime", "iso8601", diff --git a/src/api/src/v1/column_def.rs b/src/api/src/v1/column_def.rs index 5be3d5c196..912b7ee13e 100644 --- a/src/api/src/v1/column_def.rs +++ b/src/api/src/v1/column_def.rs @@ -16,8 +16,8 @@ use std::collections::HashMap; use datatypes::schema::{ COMMENT_KEY, ColumnDefaultConstraint, ColumnSchema, FULLTEXT_KEY, FulltextAnalyzer, - FulltextBackend, FulltextOptions, INVERTED_INDEX_KEY, SKIPPING_INDEX_KEY, SkippingIndexOptions, - SkippingIndexType, + FulltextBackend, FulltextOptions, INVERTED_INDEX_KEY, JSON_STRUCTURE_SETTINGS_KEY, + SKIPPING_INDEX_KEY, SkippingIndexOptions, SkippingIndexType, }; use greptime_proto::v1::{ Analyzer, FulltextBackend as PbFulltextBackend, SkippingIndexType as PbSkippingIndexType, @@ -68,6 +68,9 @@ pub fn try_as_column_schema(column_def: &ColumnDef) -> Result { if let Some(skipping_index) = options.options.get(SKIPPING_INDEX_GRPC_KEY) { metadata.insert(SKIPPING_INDEX_KEY.to_string(), skipping_index.to_owned()); } + if let Some(settings) = options.options.get(JSON_STRUCTURE_SETTINGS_KEY) { + metadata.insert(JSON_STRUCTURE_SETTINGS_KEY.to_string(), settings.clone()); + } } ColumnSchema::new(&column_def.name, data_type.into(), column_def.is_nullable) @@ -139,6 +142,11 @@ pub fn options_from_column_schema(column_schema: &ColumnSchema) -> Option unreachable!(), }; let pretty_print = recordbatches.pretty_print().unwrap(); - assert_eq!(pretty_print, expected, "actual: \n{}", pretty_print); + assert_eq!(pretty_print, expected.trim(), "actual: \n{}", pretty_print); } pub async fn execute_and_check_output(db: &Database, sql: &str, expected: ExpectedOutput<'_>) { diff --git a/src/datatypes/src/json.rs b/src/datatypes/src/json.rs index 380cc8ce06..902b84a131 100644 --- a/src/datatypes/src/json.rs +++ b/src/datatypes/src/json.rs @@ -24,6 +24,7 @@ use std::sync::Arc; use common_base::bytes::StringBytes; use ordered_float::OrderedFloat; +use serde::{Deserialize, Serialize}; use serde_json::{Map, Value as Json}; use snafu::{ResultExt, ensure}; @@ -45,7 +46,7 @@ use crate::value::{ListValue, StructValue, Value}; /// convert them to fully structured StructValue for user-facing APIs: the UI protocol and the UDF interface. /// /// **Important**: This settings only controls the internal form of JSON encoding. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub enum JsonStructureSettings { // TODO(sunng87): provide a limit Structured(Option), @@ -111,6 +112,12 @@ impl JsonStructureSettings { } } +impl Default for JsonStructureSettings { + fn default() -> Self { + Self::Structured(None) + } +} + impl<'a> JsonContext<'a> { /// Create a new context with an updated key path pub fn with_key(&self, key: &str) -> JsonContext<'a> { diff --git a/src/datatypes/src/schema.rs b/src/datatypes/src/schema.rs index 6bdf321137..9995072b7c 100644 --- a/src/datatypes/src/schema.rs +++ b/src/datatypes/src/schema.rs @@ -32,8 +32,9 @@ pub use crate::schema::column_schema::{ COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE, COLUMN_FULLTEXT_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE, COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY, ColumnExtType, ColumnSchema, FULLTEXT_KEY, - FulltextAnalyzer, FulltextBackend, FulltextOptions, INVERTED_INDEX_KEY, Metadata, - SKIPPING_INDEX_KEY, SkippingIndexOptions, SkippingIndexType, TIME_INDEX_KEY, + FulltextAnalyzer, FulltextBackend, FulltextOptions, INVERTED_INDEX_KEY, + JSON_STRUCTURE_SETTINGS_KEY, Metadata, SKIPPING_INDEX_KEY, SkippingIndexOptions, + SkippingIndexType, TIME_INDEX_KEY, }; pub use crate::schema::constraint::ColumnDefaultConstraint; pub use crate::schema::raw::RawSchema; diff --git a/src/datatypes/src/schema/column_schema.rs b/src/datatypes/src/schema/column_schema.rs index f176350b8c..627d898810 100644 --- a/src/datatypes/src/schema/column_schema.rs +++ b/src/datatypes/src/schema/column_schema.rs @@ -23,6 +23,7 @@ use sqlparser_derive::{Visit, VisitMut}; use crate::data_type::{ConcreteDataType, DataType}; use crate::error::{self, Error, InvalidFulltextOptionSnafu, ParseExtendedTypeSnafu, Result}; +use crate::json::JsonStructureSettings; use crate::schema::TYPE_KEY; use crate::schema::constraint::ColumnDefaultConstraint; use crate::value::Value; @@ -41,6 +42,7 @@ pub const FULLTEXT_KEY: &str = "greptime:fulltext"; pub const INVERTED_INDEX_KEY: &str = "greptime:inverted_index"; /// Key used to store skip options in arrow field's metadata. pub const SKIPPING_INDEX_KEY: &str = "greptime:skipping_index"; +pub const JSON_STRUCTURE_SETTINGS_KEY: &str = "greptime:json:structure_settings"; /// Keys used in fulltext options pub const COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE: &str = "enable"; @@ -391,6 +393,21 @@ impl ColumnSchema { self.metadata.remove(SKIPPING_INDEX_KEY); Ok(()) } + + pub fn json_structure_settings(&self) -> Result> { + self.metadata + .get(JSON_STRUCTURE_SETTINGS_KEY) + .map(|json| serde_json::from_str(json).context(error::DeserializeSnafu { json })) + .transpose() + } + + pub fn with_json_structure_settings(&mut self, settings: &JsonStructureSettings) -> Result<()> { + self.metadata.insert( + JSON_STRUCTURE_SETTINGS_KEY.to_string(), + serde_json::to_string(settings).context(error::SerializeSnafu)?, + ); + Ok(()) + } } /// Column extended type set in column schema's metadata. diff --git a/src/datatypes/src/types/json_type.rs b/src/datatypes/src/types/json_type.rs index 01ec81dd08..99dcf9c571 100644 --- a/src/datatypes/src/types/json_type.rs +++ b/src/datatypes/src/types/json_type.rs @@ -15,6 +15,7 @@ use std::str::FromStr; use arrow::datatypes::DataType as ArrowDataType; +use arrow_schema::Fields; use common_base::bytes::Bytes; use serde::{Deserialize, Serialize}; use snafu::ResultExt; @@ -63,7 +64,10 @@ impl DataType for JsonType { } fn as_arrow_type(&self) -> ArrowDataType { - ArrowDataType::Binary + match self.format { + JsonFormat::Jsonb => ArrowDataType::Binary, + JsonFormat::Native(_) => ArrowDataType::Struct(Fields::empty()), + } } fn create_mutable_vector(&self, capacity: usize) -> Box { diff --git a/src/query/src/error.rs b/src/query/src/error.rs index 8cf64dbffc..4649b7fe49 100644 --- a/src/query/src/error.rs +++ b/src/query/src/error.rs @@ -353,6 +353,13 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(transparent)] + Datatypes { + source: datatypes::error::Error, + #[snafu(implicit)] + location: Location, + }, } impl ErrorExt for Error { @@ -406,9 +413,10 @@ impl ErrorExt for Error { MissingTableMutationHandler { .. } => StatusCode::Unexpected, GetRegionMetadata { .. } => StatusCode::RegionNotReady, TableReadOnly { .. } => StatusCode::Unsupported, - GetFulltextOptions { source, .. } | GetSkippingIndexOptions { source, .. } => { - source.status_code() - } + + GetFulltextOptions { source, .. } + | GetSkippingIndexOptions { source, .. } + | Datatypes { source, .. } => source.status_code(), } } diff --git a/src/query/src/sql/show_create_table.rs b/src/query/src/sql/show_create_table.rs index 5466bb91e6..3b2d8aaceb 100644 --- a/src/query/src/sql/show_create_table.rs +++ b/src/query/src/sql/show_create_table.rs @@ -159,6 +159,10 @@ fn create_column(column_schema: &ColumnSchema, quote_style: char) -> Result StatusCode::InvalidArguments, - SerializeColumnDefaultConstraint { source, .. } => source.status_code(), + SerializeColumnDefaultConstraint { source, .. } + | SetJsonStructureSettings { source, .. } => source.status_code(), + ConvertToGrpcDataType { source, .. } => source.status_code(), SqlCommon { source, .. } => source.status_code(), ConvertToDfStatement { .. } => StatusCode::Internal, diff --git a/src/sql/src/statements.rs b/src/sql/src/statements.rs index b48e208043..823b123011 100644 --- a/src/sql/src/statements.rs +++ b/src/sql/src/statements.rs @@ -49,8 +49,8 @@ use crate::ast::{ }; use crate::error::{ self, ConvertToGrpcDataTypeSnafu, ConvertValueSnafu, Result, - SerializeColumnDefaultConstraintSnafu, SetFulltextOptionSnafu, SetSkippingIndexOptionSnafu, - SqlCommonSnafu, + SerializeColumnDefaultConstraintSnafu, SetFulltextOptionSnafu, SetJsonStructureSettingsSnafu, + SetSkippingIndexOptionSnafu, SqlCommonSnafu, }; use crate::statements::create::Column; pub use crate::statements::option_map::OptionMap; @@ -144,6 +144,18 @@ pub fn column_to_schema( column_schema.set_inverted_index(column.extensions.inverted_index_options.is_some()); + if matches!(column.data_type(), SqlDataType::JSON) { + let settings = column + .extensions + .build_json_structure_settings()? + .unwrap_or_default(); + column_schema + .with_json_structure_settings(&settings) + .with_context(|_| SetJsonStructureSettingsSnafu { + value: format!("{settings:?}"), + })?; + } + Ok(column_schema) } diff --git a/src/sql/src/statements/create.rs b/src/sql/src/statements/create.rs index 9d945e7c8d..3c7f6d1731 100644 --- a/src/sql/src/statements/create.rs +++ b/src/sql/src/statements/create.rs @@ -32,6 +32,7 @@ use crate::error::{ use crate::statements::OptionMap; use crate::statements::statement::Statement; use crate::statements::tql::Tql; +use crate::util::OptionValue; const LINE_SEP: &str = ",\n"; const COMMA_SEP: &str = ", "; @@ -166,7 +167,20 @@ impl Display for Column { return Ok(()); } - write!(f, "{}", self.column_def)?; + write!(f, "{} {}", self.column_def.name, self.column_def.data_type)?; + if let Some(options) = &self.extensions.json_datatype_options { + write!( + f, + "({})", + options + .entries() + .map(|(k, v)| format!("{k} = {v}")) + .join(COMMA_SEP) + )?; + } + for option in &self.column_def.options { + write!(f, " {option}")?; + } if let Some(fulltext_options) = &self.extensions.fulltext_index_options { if !fulltext_options.is_empty() { @@ -251,6 +265,34 @@ impl ColumnExtensions { }) .transpose() } + + pub fn set_json_structure_settings(&mut self, settings: JsonStructureSettings) { + let mut map = OptionMap::default(); + + let format = match settings { + JsonStructureSettings::Structured(_) => JSON_FORMAT_FULL_STRUCTURED, + JsonStructureSettings::PartialUnstructuredByKey { .. } => JSON_FORMAT_PARTIAL, + JsonStructureSettings::UnstructuredRaw => JSON_FORMAT_RAW, + }; + map.insert(JSON_OPT_FORMAT.to_string(), format.to_string()); + + if let JsonStructureSettings::PartialUnstructuredByKey { + fields: _, + unstructured_keys, + } = settings + { + let value = OptionValue::from( + unstructured_keys + .iter() + .map(|x| x.as_str()) + .sorted() + .collect::>(), + ); + map.insert_options(JSON_OPT_UNSTRUCTURED_KEYS, value); + } + + self.json_datatype_options = Some(map); + } } /// Partition on columns or values. diff --git a/src/sql/src/statements/option_map.rs b/src/sql/src/statements/option_map.rs index f67b0dc72a..d6bd4d7608 100644 --- a/src/sql/src/statements/option_map.rs +++ b/src/sql/src/statements/option_map.rs @@ -16,6 +16,7 @@ use std::collections::{BTreeMap, HashMap}; use std::ops::ControlFlow; use common_base::secrets::{ExposeSecret, ExposeSecretMut, SecretString}; +use either::Either; use serde::Serialize; use sqlparser::ast::{Visit, VisitMut, Visitor, VisitorMut}; @@ -56,6 +57,17 @@ impl OptionMap { } } + pub fn insert_options(&mut self, key: &str, value: OptionValue) { + if REDACTED_OPTIONS.contains(&key) { + self.secrets.insert( + key.to_string(), + SecretString::new(Box::new(value.to_string())), + ); + } else { + self.options.insert(key.to_string(), value); + } + } + pub fn get(&self, k: &str) -> Option<&str> { if let Some(value) = self.options.get(k) { value.as_string() @@ -130,6 +142,18 @@ impl OptionMap { } result } + + pub fn entries(&self) -> impl Iterator)> { + let options = self + .options + .iter() + .map(|(k, v)| (k.as_str(), Either::Left(v))); + let secrets = self + .secrets + .keys() + .map(|k| (k.as_str(), Either::Right("******"))); + std::iter::chain(options, secrets) + } } impl> From for OptionMap { diff --git a/src/sql/src/util.rs b/src/sql/src/util.rs index f71dfcc8d7..3b221d7642 100644 --- a/src/sql/src/util.rs +++ b/src/sql/src/util.rs @@ -15,6 +15,7 @@ use std::collections::HashSet; use std::fmt::{Display, Formatter}; +use itertools::Itertools; use serde::Serialize; use snafu::ensure; use sqlparser::ast::{ @@ -131,6 +132,22 @@ impl From> for OptionValue { } } +impl Display for OptionValue { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + if let Some(s) = self.as_string() { + write!(f, "'{s}'") + } else if let Some(s) = self.as_list() { + write!( + f, + "[{}]", + s.into_iter().map(|x| format!("'{x}'")).join(", ") + ) + } else { + write!(f, "'{}'", self.0) + } + } +} + pub fn parse_option_string(option: SqlOption) -> Result<(String, OptionValue)> { let SqlOption::KeyValue { key, value } = option else { return InvalidSqlSnafu { diff --git a/tests-integration/src/tests/instance_test.rs b/tests-integration/src/tests/instance_test.rs index 95664323ff..a29e468bf6 100644 --- a/tests-integration/src/tests/instance_test.rs +++ b/tests-integration/src/tests/instance_test.rs @@ -2338,3 +2338,59 @@ async fn test_copy_parquet_map_to_binary(instance: Arc) { +----+-----------------------------------------+"#; check_output_stream(output, expected).await; } + +#[apply(both_instances_cases)] +async fn test_create_table_with_json_datatype(instance: Arc) { + let instance = instance.frontend(); + + let sql = r#" +CREATE TABLE a ( + j JSON(format = "partial", unstructured_keys = ["foo", "foo.bar"]), + ts TIMESTAMP TIME INDEX, +)"#; + let output = execute_sql(&instance, sql).await.data; + assert!(matches!(output, OutputData::AffectedRows(0))); + + // "show create table" finds the information from table metadata. + // So if the output is expected, we know the options are really set. + let output = execute_sql(&instance, "SHOW CREATE TABLE a").await.data; + let expected = r#" ++-------+------------------------------------------------------------------------------+ +| Table | Create Table | ++-------+------------------------------------------------------------------------------+ +| a | CREATE TABLE IF NOT EXISTS "a" ( | +| | "j" JSON(format = 'partial', unstructured_keys = ['foo', 'foo.bar']) NULL, | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("ts") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------+------------------------------------------------------------------------------+"#; + check_output_stream(output, expected).await; + + // test the default options + let sql = r#" +CREATE TABLE b ( + j JSON, + ts TIMESTAMP TIME INDEX, +)"#; + let output = execute_sql(&instance, sql).await.data; + assert!(matches!(output, OutputData::AffectedRows(0))); + + let output = execute_sql(&instance, "SHOW CREATE TABLE b").await.data; + let expected = r#" ++-------+-----------------------------------------+ +| Table | Create Table | ++-------+-----------------------------------------+ +| b | CREATE TABLE IF NOT EXISTS "b" ( | +| | "j" JSON(format = 'structured') NULL, | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("ts") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------+-----------------------------------------+"#; + check_output_stream(output, expected).await; +}