From 21532abf940b7cc10ddca9c1aa61e911ad2088bc Mon Sep 17 00:00:00 2001 From: LFC <990479+MichaelScofield@users.noreply.github.com> Date: Fri, 17 Oct 2025 13:22:29 +0800 Subject: [PATCH] feat: new create table syntax for new json datatype (#7103) * feat: new create table syntax for new json datatype Signed-off-by: luofucong * refactor: extract consts * refactor: remove unused error variant * fix tests Signed-off-by: luofucong * fix sqlness Signed-off-by: luofucong --------- Signed-off-by: luofucong Co-authored-by: Ning Sun --- Cargo.lock | 1 + src/operator/src/statement.rs | 4 +- src/sql/Cargo.toml | 1 + src/sql/src/error.rs | 11 +- src/sql/src/parsers/alter_parser.rs | 13 +- src/sql/src/parsers/alter_parser/trigger.rs | 4 +- src/sql/src/parsers/copy_parser.rs | 63 +++----- src/sql/src/parsers/create_parser.rs | 76 +++++----- src/sql/src/parsers/create_parser/json.rs | 138 ++++++++++++++++++ src/sql/src/parsers/create_parser/trigger.rs | 15 +- src/sql/src/statements.rs | 1 + src/sql/src/statements/copy.rs | 10 +- src/sql/src/statements/create.rs | 46 +++++- src/sql/src/statements/option_map.rs | 48 +++++- src/sql/src/util.rs | 112 +++++++++++--- src/table/src/error.rs | 6 +- .../alter/change_col_skipping_options.result | 2 +- 17 files changed, 408 insertions(+), 143 deletions(-) create mode 100644 src/sql/src/parsers/create_parser/json.rs diff --git a/Cargo.lock b/Cargo.lock index 67c193856c..a002ea91da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12005,6 +12005,7 @@ dependencies = [ "common-macro", "common-query", "common-sql", + "common-telemetry", "common-time", "datafusion", "datafusion-common", diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs index 47a8994985..5dd39681b6 100644 --- a/src/operator/src/statement.rs +++ b/src/operator/src/statement.rs @@ -691,7 +691,7 @@ fn verify_time_related_format(with: &OptionMap) -> Result<()> { time_format.is_none() && date_format.is_none() && timestamp_format.is_none(), error::TimestampFormatNotSupportedSnafu { format: "".to_string(), - file_format: file_format.cloned().unwrap_or_default(), + file_format: file_format.unwrap_or_default(), } ); } @@ -742,7 +742,7 @@ fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result< let pattern = with .get(common_datasource::file_format::FILE_PATTERN) - .cloned(); + .map(|x| x.to_string()); Ok(CopyTableRequest { catalog_name, diff --git a/src/sql/Cargo.toml b/src/sql/Cargo.toml index 8459bb375d..fbfb8c480a 100644 --- a/src/sql/Cargo.toml +++ b/src/sql/Cargo.toml @@ -21,6 +21,7 @@ common-error.workspace = true common-macro.workspace = true common-query.workspace = true common-sql.workspace = true +common-telemetry.workspace = true common-time.workspace = true datafusion.workspace = true datafusion-common.workspace = true diff --git a/src/sql/src/error.rs b/src/sql/src/error.rs index 1748d3b9de..4caad26656 100644 --- a/src/sql/src/error.rs +++ b/src/sql/src/error.rs @@ -20,10 +20,8 @@ use common_macro::stack_trace_debug; use datafusion_common::DataFusionError; use datatypes::prelude::{ConcreteDataType, Value}; use snafu::{Location, Snafu}; -use sqlparser::ast::Ident; use sqlparser::parser::ParserError; -use crate::ast::Expr; use crate::parsers::error::TQLError; pub type Result = std::result::Result; @@ -210,10 +208,9 @@ pub enum Error { location: Location, }, - #[snafu(display("Unrecognized table option key: {}, value: {}", key, value))] - InvalidTableOptionValue { - key: Ident, - value: Expr, + #[snafu(display("Invalid expr as option value, error: {error}"))] + InvalidExprAsOptionValue { + error: String, #[snafu(implicit)] location: Location, }, @@ -361,7 +358,7 @@ impl ErrorExt for Error { } InvalidColumnOption { .. } - | InvalidTableOptionValue { .. } + | InvalidExprAsOptionValue { .. } | InvalidDatabaseName { .. } | InvalidDatabaseOption { .. } | ColumnTypeMismatch { .. } diff --git a/src/sql/src/parsers/alter_parser.rs b/src/sql/src/parsers/alter_parser.rs index 5f31ed8dc5..68f246f26a 100644 --- a/src/sql/src/parsers/alter_parser.rs +++ b/src/sql/src/parsers/alter_parser.rs @@ -32,13 +32,14 @@ use crate::parsers::create_parser::INVERTED; use crate::parsers::utils::{ validate_column_fulltext_create_option, validate_column_skipping_index_create_option, }; +use crate::statements::OptionMap; use crate::statements::alter::{ AddColumn, AlterDatabase, AlterDatabaseOperation, AlterTable, AlterTableOperation, DropDefaultsOperation, KeyValueOption, RepartitionOperation, SetDefaultsOperation, SetIndexOperation, UnsetIndexOperation, }; use crate::statements::statement::Statement; -use crate::util::parse_option_string; +use crate::util::{OptionValue, parse_option_string}; impl ParserContext<'_> { pub(crate) fn parse_alter(&mut self) -> Result { @@ -455,7 +456,7 @@ impl ParserContext<'_> { .context(error::SyntaxSnafu)? .into_iter() .map(parse_option_string) - .collect::>>()?; + .collect::>>()?; for key in options.keys() { ensure!( @@ -469,9 +470,10 @@ impl ParserContext<'_> { options.insert( COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE.to_string(), - "true".to_string(), + "true".to_string().into(), ); + let options = OptionMap::new(options).into_map(); Ok(AlterTableOperation::SetIndex { options: SetIndexOperation::Fulltext { column_name, @@ -487,9 +489,9 @@ impl ParserContext<'_> { .context(error::SyntaxSnafu)? .into_iter() .map(parse_option_string) - .collect::>>()?; + .collect::>>()?; - for key in options.keys() { + for (key, _) in options.iter() { ensure!( validate_column_skipping_index_create_option(key), InvalidColumnOptionSnafu { @@ -499,6 +501,7 @@ impl ParserContext<'_> { ); } + let options = OptionMap::new(options).into_map(); Ok(AlterTableOperation::SetIndex { options: SetIndexOperation::Skipping { column_name, diff --git a/src/sql/src/parsers/alter_parser/trigger.rs b/src/sql/src/parsers/alter_parser/trigger.rs index 87ba9cc610..73e9275acd 100644 --- a/src/sql/src/parsers/alter_parser/trigger.rs +++ b/src/sql/src/parsers/alter_parser/trigger.rs @@ -625,8 +625,8 @@ mod tests { }; assert_eq!(labels.len(), 2); - assert_eq!(labels.get("key1"), Some(&"value1".to_string())); - assert_eq!(labels.get("key2"), Some(&"VALUE2".to_string())); + assert_eq!(labels.get("key1"), Some("value1")); + assert_eq!(labels.get("key2"), Some("VALUE2")); // Passed case: multiple ADD/DROP/MODIFY LABELS. let sql = r#"test_trigger ADD LABELS (key1='value1') MODIFY LABELS (key2='value2') DROP LABELS ('key3')"#; diff --git a/src/sql/src/parsers/copy_parser.rs b/src/sql/src/parsers/copy_parser.rs index 185aa4dbcf..528c0de076 100644 --- a/src/sql/src/parsers/copy_parser.rs +++ b/src/sql/src/parsers/copy_parser.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; - use snafu::ResultExt; use sqlparser::keywords::Keyword; use sqlparser::tokenizer::Token; @@ -21,6 +19,7 @@ use sqlparser::tokenizer::Token::Word; use crate::error::{self, Result}; use crate::parser::ParserContext; +use crate::statements::OptionMap; use crate::statements::copy::{ CopyDatabase, CopyDatabaseArgument, CopyQueryTo, CopyQueryToArgument, CopyTable, CopyTableArgument, @@ -28,9 +27,6 @@ use crate::statements::copy::{ use crate::statements::statement::Statement; use crate::util::parse_option_string; -pub type With = HashMap; -pub type Connection = HashMap; - // COPY tbl TO 'output.parquet'; impl ParserContext<'_> { pub(crate) fn parse_copy(&mut self) -> Result { @@ -73,8 +69,8 @@ impl ParserContext<'_> { let argument = CopyDatabaseArgument { database_name, - with: with.into(), - connection: connection.into(), + with, + connection, location, }; CopyDatabase::To(argument) @@ -92,8 +88,8 @@ impl ParserContext<'_> { let argument = CopyDatabaseArgument { database_name, - with: with.into(), - connection: connection.into(), + with, + connection, location, }; CopyDatabase::From(argument) @@ -114,8 +110,8 @@ impl ParserContext<'_> { let (with, connection, location, limit) = self.parse_copy_parameters()?; Ok(CopyTable::To(CopyTableArgument { table_name, - with: with.into(), - connection: connection.into(), + with, + connection, location, limit, })) @@ -126,8 +122,8 @@ impl ParserContext<'_> { let (with, connection, location, limit) = self.parse_copy_parameters()?; Ok(CopyTable::From(CopyTableArgument { table_name, - with: with.into(), - connection: connection.into(), + with, + connection, location, limit, })) @@ -161,14 +157,14 @@ impl ParserContext<'_> { Ok(CopyQueryTo { query: Box::new(query), arg: CopyQueryToArgument { - with: with.into(), - connection: connection.into(), + with, + connection, location, }, }) } - fn parse_copy_parameters(&mut self) -> Result<(With, Connection, String, Option)> { + fn parse_copy_parameters(&mut self) -> Result<(OptionMap, OptionMap, String, Option)> { let location = self.parser .parse_literal_string() @@ -185,7 +181,8 @@ impl ParserContext<'_> { let with = options .into_iter() .map(parse_option_string) - .collect::>()?; + .collect::>>()?; + let with = OptionMap::new(with); let connection_options = self .parser @@ -195,7 +192,8 @@ impl ParserContext<'_> { let connection = connection_options .into_iter() .map(parse_option_string) - .collect::>()?; + .collect::>>()?; + let connection = OptionMap::new(connection); let limit = if self.parser.parse_keyword(Keyword::LIMIT) { Some( @@ -309,7 +307,7 @@ mod tests { struct Test<'a> { sql: &'a str, expected_pattern: Option, - expected_connection: HashMap, + expected_connection: HashMap<&'a str, &'a str>, } let tests = [ @@ -321,10 +319,7 @@ mod tests { Test { sql: "COPY catalog0.schema0.tbl FROM 'tbl_file.parquet' WITH (PATTERN = 'demo.*') CONNECTION (FOO='Bar', ONE='two')", expected_pattern: Some("demo.*".into()), - expected_connection: [("foo", "Bar"), ("one", "two")] - .into_iter() - .map(|(k, v)| (k.to_string(), v.to_string())) - .collect(), + expected_connection: HashMap::from([("foo", "Bar"), ("one", "two")]), }, ]; @@ -346,10 +341,7 @@ mod tests { if let Some(expected_pattern) = test.expected_pattern { assert_eq!(copy_table.pattern().unwrap(), expected_pattern); } - assert_eq!( - copy_table.connection.clone(), - test.expected_connection.into() - ); + assert_eq!(copy_table.connection.to_str_map(), test.expected_connection); } _ => unreachable!(), } @@ -360,7 +352,7 @@ mod tests { fn test_parse_copy_table_to() { struct Test<'a> { sql: &'a str, - expected_connection: HashMap, + expected_connection: HashMap<&'a str, &'a str>, } let tests = [ @@ -370,17 +362,11 @@ mod tests { }, Test { sql: "COPY catalog0.schema0.tbl TO 'tbl_file.parquet' CONNECTION (FOO='Bar', ONE='two')", - expected_connection: [("foo", "Bar"), ("one", "two")] - .into_iter() - .map(|(k, v)| (k.to_string(), v.to_string())) - .collect(), + expected_connection: HashMap::from([("foo", "Bar"), ("one", "two")]), }, Test { sql: "COPY catalog0.schema0.tbl TO 'tbl_file.parquet' WITH (FORMAT = 'parquet') CONNECTION (FOO='Bar', ONE='two')", - expected_connection: [("foo", "Bar"), ("one", "two")] - .into_iter() - .map(|(k, v)| (k.to_string(), v.to_string())) - .collect(), + expected_connection: HashMap::from([("foo", "Bar"), ("one", "two")]), }, ]; @@ -399,10 +385,7 @@ mod tests { Statement::Copy(crate::statements::copy::Copy::CopyTable(CopyTable::To( copy_table, ))) => { - assert_eq!( - copy_table.connection.clone(), - test.expected_connection.into() - ); + assert_eq!(copy_table.connection.to_str_map(), test.expected_connection); } _ => unreachable!(), } diff --git a/src/sql/src/parsers/create_parser.rs b/src/sql/src/parsers/create_parser.rs index 30a087088a..df6592b187 100644 --- a/src/sql/src/parsers/create_parser.rs +++ b/src/sql/src/parsers/create_parser.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod json; #[cfg(feature = "enterprise")] pub mod trigger; @@ -50,7 +51,7 @@ use crate::statements::create::{ use crate::statements::statement::Statement; use crate::statements::transform::type_alias::get_data_type_by_alias_name; use crate::statements::{OptionMap, sql_data_type_to_concrete_data_type}; -use crate::util::{location_to_index, parse_option_string}; +use crate::util::{OptionValue, location_to_index, parse_option_string}; pub const ENGINE: &str = "ENGINE"; pub const MAXVALUE: &str = "MAXVALUE"; @@ -203,7 +204,7 @@ impl<'a> ParserContext<'a> { .context(SyntaxSnafu)? .into_iter() .map(parse_option_string) - .collect::>>()?; + .collect::>>()?; for key in options.keys() { ensure!( @@ -211,7 +212,7 @@ impl<'a> ParserContext<'a> { InvalidDatabaseOptionSnafu { key: key.clone() } ); } - if let Some(append_mode) = options.get("append_mode") + if let Some(append_mode) = options.get("append_mode").and_then(|x| x.as_string()) && append_mode == "true" && options.contains_key("merge_mode") { @@ -224,7 +225,7 @@ impl<'a> ParserContext<'a> { Ok(Statement::CreateDatabase(CreateDatabase { name: database_name, if_not_exists, - options: options.into(), + options: OptionMap::new(options), })) } @@ -450,11 +451,11 @@ impl<'a> ParserContext<'a> { .context(SyntaxSnafu)? .into_iter() .map(parse_option_string) - .collect::>>()?; + .collect::>>()?; for key in options.keys() { ensure!(validate_table_option(key), InvalidTableOptionSnafu { key }); } - Ok(options.into()) + Ok(OptionMap::new(options)) } /// "PARTITION ON COLUMNS (...)" clause @@ -662,9 +663,17 @@ impl<'a> ParserContext<'a> { } ); - let data_type = parser.parse_data_type().context(SyntaxSnafu)?; - let mut options = vec![]; let mut extensions = ColumnExtensions::default(); + + let data_type = parser.parse_data_type().context(SyntaxSnafu)?; + // Must immediately parse the JSON datatype format because it is closely after the "JSON" + // datatype, like this: "JSON(format = ...)". + if matches!(data_type, DataType::JSON) { + let options = json::parse_json_datatype_options(parser)?; + extensions.json_datatype_options = Some(options); + } + + let mut options = vec![]; loop { if parser.parse_keyword(Keyword::CONSTRAINT) { let name = Some(parser.parse_identifier().context(SyntaxSnafu)?); @@ -810,9 +819,9 @@ impl<'a> ParserContext<'a> { .context(error::SyntaxSnafu)? .into_iter() .map(parse_option_string) - .collect::>>()?; + .collect::>>()?; - for key in options.keys() { + for (key, _) in options.iter() { ensure!( validate_column_skipping_index_create_option(key), InvalidColumnOptionSnafu { @@ -822,7 +831,8 @@ impl<'a> ParserContext<'a> { ); } - column_extensions.skipping_index_options = Some(options.into()); + let options = OptionMap::new(options); + column_extensions.skipping_index_options = Some(options); is_index_declared |= true; } @@ -860,9 +870,9 @@ impl<'a> ParserContext<'a> { .context(error::SyntaxSnafu)? .into_iter() .map(parse_option_string) - .collect::>>()?; + .collect::>>()?; - for key in options.keys() { + for (key, _) in options.iter() { ensure!( validate_column_fulltext_create_option(key), InvalidColumnOptionSnafu { @@ -872,7 +882,8 @@ impl<'a> ParserContext<'a> { ); } - column_extensions.fulltext_index_options = Some(options.into()); + let options = OptionMap::new(options); + column_extensions.fulltext_index_options = Some(options); is_index_declared |= true; } @@ -1203,7 +1214,7 @@ mod tests { struct Test<'a> { sql: &'a str, expected_table_name: &'a str, - expected_options: HashMap, + expected_options: HashMap<&'a str, &'a str>, expected_engine: &'a str, expected_if_not_exist: bool, } @@ -1213,8 +1224,8 @@ mod tests { sql: "CREATE EXTERNAL TABLE city with(location='/var/data/city.csv',format='csv');", expected_table_name: "city", expected_options: HashMap::from([ - ("location".to_string(), "/var/data/city.csv".to_string()), - ("format".to_string(), "csv".to_string()), + ("location", "/var/data/city.csv"), + ("format", "csv"), ]), expected_engine: FILE_ENGINE, expected_if_not_exist: false, @@ -1223,8 +1234,8 @@ mod tests { sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv');", expected_table_name: "city", expected_options: HashMap::from([ - ("location".to_string(), "/var/data/city.csv".to_string()), - ("format".to_string(), "csv".to_string()), + ("location", "/var/data/city.csv"), + ("format", "csv"), ]), expected_engine: "foo", expected_if_not_exist: true, @@ -1233,9 +1244,9 @@ mod tests { sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv','compaction.type'='bar');", expected_table_name: "city", expected_options: HashMap::from([ - ("location".to_string(), "/var/data/city.csv".to_string()), - ("format".to_string(), "csv".to_string()), - ("compaction.type".to_string(), "bar".to_string()), + ("location", "/var/data/city.csv"), + ("format", "csv"), + ("compaction.type", "bar"), ]), expected_engine: "foo", expected_if_not_exist: true, @@ -1253,7 +1264,7 @@ mod tests { match &stmts[0] { Statement::CreateExternalTable(c) => { assert_eq!(c.name.to_string(), test.expected_table_name.to_string()); - assert_eq!(c.options, test.expected_options.into()); + assert_eq!(c.options.to_str_map(), test.expected_options); assert_eq!(c.if_not_exists, test.expected_if_not_exist); assert_eq!(c.engine, test.expected_engine); } @@ -1273,10 +1284,7 @@ mod tests { PRIMARY KEY(ts, host), ) with(location='/var/data/city.csv',format='csv');"; - let options = HashMap::from([ - ("location".to_string(), "/var/data/city.csv".to_string()), - ("format".to_string(), "csv".to_string()), - ]); + let options = HashMap::from([("location", "/var/data/city.csv"), ("format", "csv")]); let stmts = ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) @@ -1285,7 +1293,7 @@ mod tests { match &stmts[0] { Statement::CreateExternalTable(c) => { assert_eq!(c.name.to_string(), "city"); - assert_eq!(c.options, options.into()); + assert_eq!(c.options.to_str_map(), options); let columns = &c.columns; assert_column_def(&columns[0].column_def, "host", "STRING"); @@ -2723,7 +2731,7 @@ CREATE TABLE log ( assert!(result.is_ok()); assert!(extensions.vector_options.is_some()); let vector_options = extensions.vector_options.unwrap(); - assert_eq!(vector_options.get(VECTOR_OPT_DIM), Some(&"128".to_string())); + assert_eq!(vector_options.get(VECTOR_OPT_DIM), Some("128")); } #[test] @@ -2783,14 +2791,8 @@ CREATE TABLE log ( assert!(result.unwrap()); assert!(extensions.fulltext_index_options.is_some()); let fulltext_options = extensions.fulltext_index_options.unwrap(); - assert_eq!( - fulltext_options.get("analyzer"), - Some(&"English".to_string()) - ); - assert_eq!( - fulltext_options.get("case_sensitive"), - Some(&"true".to_string()) - ); + assert_eq!(fulltext_options.get("analyzer"), Some("English")); + assert_eq!(fulltext_options.get("case_sensitive"), Some("true")); } // Test fulltext index with invalid type (should fail) diff --git a/src/sql/src/parsers/create_parser/json.rs b/src/sql/src/parsers/create_parser/json.rs new file mode 100644 index 0000000000..1556205fef --- /dev/null +++ b/src/sql/src/parsers/create_parser/json.rs @@ -0,0 +1,138 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use snafu::ResultExt; +use sqlparser::parser::Parser; +use sqlparser::tokenizer::Token; + +use crate::error::{Result, SyntaxSnafu}; +use crate::statements::OptionMap; +use crate::util; + +pub(super) fn parse_json_datatype_options(parser: &mut Parser<'_>) -> Result { + if parser.consume_token(&Token::LParen) { + let result = parser + .parse_comma_separated0(Parser::parse_sql_option, Token::RParen) + .context(SyntaxSnafu) + .and_then(|options| { + options + .into_iter() + .map(util::parse_option_string) + .collect::>>() + })?; + parser.expect_token(&Token::RParen).context(SyntaxSnafu)?; + Ok(OptionMap::new(result)) + } else { + Ok(OptionMap::default()) + } +} + +#[cfg(test)] +mod tests { + use sqlparser::ast::DataType; + + use crate::dialect::GreptimeDbDialect; + use crate::parser::{ParseOptions, ParserContext}; + use crate::statements::OptionMap; + use crate::statements::create::{ + Column, JSON_FORMAT_FULL_STRUCTURED, JSON_FORMAT_PARTIAL, JSON_FORMAT_RAW, JSON_OPT_FORMAT, + JSON_OPT_UNSTRUCTURED_KEYS, + }; + use crate::statements::statement::Statement; + + #[test] + fn test_parse_json_datatype_options() { + fn parse(sql: &str) -> OptionMap { + let Statement::CreateTable(mut create_table) = ParserContext::create_with_dialect( + sql, + &GreptimeDbDialect {}, + ParseOptions::default(), + ) + .unwrap() + .remove(0) else { + unreachable!() + }; + + let Column { + column_def, + extensions, + } = create_table.columns.remove(0); + assert_eq!(column_def.name.to_string(), "my_json"); + assert_eq!(column_def.data_type, DataType::JSON); + assert!(column_def.options.is_empty()); + + assert!(extensions.json_datatype_options.is_some()); + extensions.json_datatype_options.unwrap() + } + + let sql = r#" +CREATE TABLE json_data ( + my_json JSON(format = "partial", unstructured_keys = ["k", "foo.bar", "a.b.c"]), + ts TIMESTAMP TIME INDEX, +)"#; + let options = parse(sql); + assert_eq!(options.len(), 2); + assert_eq!( + options.value(JSON_OPT_FORMAT).and_then(|x| x.as_string()), + Some(JSON_FORMAT_PARTIAL) + ); + let expected = vec!["k", "foo.bar", "a.b.c"]; + assert_eq!( + options + .value(JSON_OPT_UNSTRUCTURED_KEYS) + .and_then(|x| x.as_list()), + Some(expected) + ); + + let sql = r#" +CREATE TABLE json_data ( + my_json JSON(format = "structured"), + ts TIMESTAMP TIME INDEX, +)"#; + let options = parse(sql); + assert_eq!(options.len(), 1); + assert_eq!( + options.value(JSON_OPT_FORMAT).and_then(|x| x.as_string()), + Some(JSON_FORMAT_FULL_STRUCTURED) + ); + + let sql = r#" +CREATE TABLE json_data ( + my_json JSON(format = "raw"), + ts TIMESTAMP TIME INDEX, +)"#; + let options = parse(sql); + assert_eq!(options.len(), 1); + assert_eq!( + options.value(JSON_OPT_FORMAT).and_then(|x| x.as_string()), + Some(JSON_FORMAT_RAW) + ); + + let sql = r#" +CREATE TABLE json_data ( + my_json JSON(), + ts TIMESTAMP TIME INDEX, +)"#; + let options = parse(sql); + assert!(options.is_empty()); + + let sql = r#" +CREATE TABLE json_data ( + my_json JSON, + ts TIMESTAMP TIME INDEX, +)"#; + let options = parse(sql); + assert!(options.is_empty()); + } +} diff --git a/src/sql/src/parsers/create_parser/trigger.rs b/src/sql/src/parsers/create_parser/trigger.rs index d1b1cee3fa..6b2b0c1eb7 100644 --- a/src/sql/src/parsers/create_parser/trigger.rs +++ b/src/sql/src/parsers/create_parser/trigger.rs @@ -1,4 +1,3 @@ -use std::collections::HashMap; use std::time::Duration; use snafu::{OptionExt, ResultExt, ensure}; @@ -303,11 +302,11 @@ impl<'a> ParserContext<'a> { .context(error::SyntaxSnafu)? .into_iter() .map(parse_option_string) - .collect::>>()?; + .collect::>>()?; self.parser .expect_token(&Token::RParen) .context(error::SyntaxSnafu)?; - Ok(options.into()) + Ok(OptionMap::new(options)) } /// The SQL format as follows: @@ -344,11 +343,11 @@ impl<'a> ParserContext<'a> { .context(error::SyntaxSnafu)? .into_iter() .map(parse_option_string) - .collect::>>()?; + .collect::>>()?; self.parser .expect_token(&Token::RParen) .context(error::SyntaxSnafu)?; - Ok(options.into()) + Ok(OptionMap::new(options)) } /// The SQL format as follows: @@ -467,9 +466,9 @@ impl<'a> ParserContext<'a> { .context(error::SyntaxSnafu)? .into_iter() .map(parse_option_string) - .collect::>>()?; + .collect::>>()?; - for key in options.keys() { + for (key, _) in options.iter() { ensure!( validate_webhook_option(key), error::InvalidTriggerWebhookOptionSnafu { key: key.clone() } @@ -478,7 +477,7 @@ impl<'a> ParserContext<'a> { let webhook = AlertManagerWebhook { url, - options: options.into(), + options: OptionMap::new(options), }; Ok(NotifyChannel { diff --git a/src/sql/src/statements.rs b/src/sql/src/statements.rs index 051368f12c..b48e208043 100644 --- a/src/sql/src/statements.rs +++ b/src/sql/src/statements.rs @@ -679,6 +679,7 @@ mod tests { vector_options: None, skipping_index_options: None, inverted_index_options: None, + json_datatype_options: None, }, }; diff --git a/src/sql/src/statements/copy.rs b/src/sql/src/statements/copy.rs index 8c18a836f9..7aa099c53c 100644 --- a/src/sql/src/statements/copy.rs +++ b/src/sql/src/statements/copy.rs @@ -151,20 +151,14 @@ impl CopyTableArgument { pub fn format(&self) -> Option { self.with .get(common_datasource::file_format::FORMAT_TYPE) - .cloned() + .map(|v| v.to_string()) .or_else(|| Some("PARQUET".to_string())) } pub fn pattern(&self) -> Option { self.with .get(common_datasource::file_format::FILE_PATTERN) - .cloned() - } - - pub fn timestamp_pattern(&self) -> Option { - self.with - .get(common_datasource::file_format::TIMESTAMP_FORMAT) - .cloned() + .map(|v| v.to_string()) } } diff --git a/src/sql/src/statements/create.rs b/src/sql/src/statements/create.rs index c47c01543c..9d945e7c8d 100644 --- a/src/sql/src/statements/create.rs +++ b/src/sql/src/statements/create.rs @@ -12,10 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::fmt::{Display, Formatter}; use common_catalog::consts::FILE_ENGINE; +use datatypes::json::JsonStructureSettings; use datatypes::schema::{FulltextOptions, SkippingIndexOptions}; use itertools::Itertools; use serde::Serialize; @@ -25,7 +26,8 @@ use sqlparser_derive::{Visit, VisitMut}; use crate::ast::{ColumnDef, Ident, ObjectName, Value as SqlValue}; use crate::error::{ - InvalidFlowQuerySnafu, Result, SetFulltextOptionSnafu, SetSkippingIndexOptionSnafu, + InvalidFlowQuerySnafu, InvalidSqlSnafu, Result, SetFulltextOptionSnafu, + SetSkippingIndexOptionSnafu, }; use crate::statements::OptionMap; use crate::statements::statement::Statement; @@ -36,6 +38,12 @@ const COMMA_SEP: &str = ", "; const INDENT: usize = 2; pub const VECTOR_OPT_DIM: &str = "dim"; +pub const JSON_OPT_UNSTRUCTURED_KEYS: &str = "unstructured_keys"; +pub const JSON_OPT_FORMAT: &str = "format"; +pub const JSON_FORMAT_FULL_STRUCTURED: &str = "structured"; +pub const JSON_FORMAT_RAW: &str = "raw"; +pub const JSON_FORMAT_PARTIAL: &str = "partial"; + macro_rules! format_indent { ($fmt: expr, $arg: expr) => { format!($fmt, format_args!("{: >1$}", "", INDENT), $arg) @@ -124,6 +132,7 @@ pub struct ColumnExtensions { /// /// Inverted index doesn't have options at present. There won't be any options in that map. pub inverted_index_options: Option, + pub json_datatype_options: Option, } impl Column { @@ -209,6 +218,39 @@ impl ColumnExtensions { options.try_into().context(SetSkippingIndexOptionSnafu)?, )) } + + pub fn build_json_structure_settings(&self) -> Result> { + let Some(options) = self.json_datatype_options.as_ref() else { + return Ok(None); + }; + + let unstructured_keys = options + .value(JSON_OPT_UNSTRUCTURED_KEYS) + .and_then(|v| { + v.as_list().map(|x| { + x.into_iter() + .map(|x| x.to_string()) + .collect::>() + }) + }) + .unwrap_or_default(); + + options + .get(JSON_OPT_FORMAT) + .map(|format| match format { + JSON_FORMAT_FULL_STRUCTURED => Ok(JsonStructureSettings::Structured(None)), + JSON_FORMAT_PARTIAL => Ok(JsonStructureSettings::PartialUnstructuredByKey { + fields: None, + unstructured_keys, + }), + JSON_FORMAT_RAW => Ok(JsonStructureSettings::UnstructuredRaw), + _ => InvalidSqlSnafu { + msg: format!("unknown JSON datatype 'format': {format}"), + } + .fail(), + }) + .transpose() + } } /// Partition on columns or values. diff --git a/src/sql/src/statements/option_map.rs b/src/sql/src/statements/option_map.rs index 864bb3b3f9..f67b0dc72a 100644 --- a/src/sql/src/statements/option_map.rs +++ b/src/sql/src/statements/option_map.rs @@ -19,28 +19,46 @@ use common_base::secrets::{ExposeSecret, ExposeSecretMut, SecretString}; use serde::Serialize; use sqlparser::ast::{Visit, VisitMut, Visitor, VisitorMut}; +use crate::util::OptionValue; + const REDACTED_OPTIONS: [&str; 2] = ["access_key_id", "secret_access_key"]; /// Options hashmap. #[derive(Clone, Debug, Default, Serialize)] pub struct OptionMap { - options: BTreeMap, + options: BTreeMap, #[serde(skip_serializing)] secrets: BTreeMap, } impl OptionMap { + pub fn new>(options: I) -> Self { + let (secrets, options): (Vec<_>, Vec<_>) = options + .into_iter() + .partition(|(k, _)| REDACTED_OPTIONS.contains(&k.as_str())); + Self { + options: options.into_iter().collect(), + secrets: secrets + .into_iter() + .filter_map(|(k, v)| { + v.as_string() + .map(|v| (k, SecretString::new(Box::new(v.to_string())))) + }) + .collect(), + } + } + pub fn insert(&mut self, k: String, v: String) { if REDACTED_OPTIONS.contains(&k.as_str()) { self.secrets.insert(k, SecretString::new(Box::new(v))); } else { - self.options.insert(k, v); + self.options.insert(k, v.into()); } } - pub fn get(&self, k: &str) -> Option<&String> { + pub fn get(&self, k: &str) -> Option<&str> { if let Some(value) = self.options.get(k) { - Some(value) + value.as_string() } else if let Some(value) = self.secrets.get(k) { Some(value.expose_secret()) } else { @@ -48,6 +66,10 @@ impl OptionMap { } } + pub fn value(&self, k: &str) -> Option<&OptionValue> { + self.options.get(k) + } + pub fn is_empty(&self) -> bool { self.options.is_empty() && self.secrets.is_empty() } @@ -58,7 +80,11 @@ impl OptionMap { pub fn to_str_map(&self) -> HashMap<&str, &str> { let mut map = HashMap::with_capacity(self.len()); - map.extend(self.options.iter().map(|(k, v)| (k.as_str(), v.as_str()))); + map.extend( + self.options + .iter() + .filter_map(|(k, v)| v.as_string().map(|v| (k.as_str(), v))), + ); map.extend( self.secrets .iter() @@ -69,7 +95,11 @@ impl OptionMap { pub fn into_map(self) -> HashMap { let mut map = HashMap::with_capacity(self.len()); - map.extend(self.options); + map.extend( + self.options + .into_iter() + .filter_map(|(k, v)| v.as_string().map(|v| (k, v.to_string()))), + ); map.extend( self.secrets .into_iter() @@ -80,7 +110,11 @@ impl OptionMap { pub fn kv_pairs(&self) -> Vec { let mut result = Vec::with_capacity(self.options.len() + self.secrets.len()); - for (k, v) in self.options.iter() { + for (k, v) in self + .options + .iter() + .filter_map(|(k, v)| v.as_string().map(|v| (k, v))) + { if k.contains(".") { result.push(format!("'{k}' = '{}'", v.escape_debug())); } else { diff --git a/src/sql/src/util.rs b/src/sql/src/util.rs index 7de02ca1fa..54555aa59d 100644 --- a/src/sql/src/util.rs +++ b/src/sql/src/util.rs @@ -15,10 +15,15 @@ use std::collections::HashSet; use std::fmt::{Display, Formatter}; -use sqlparser::ast::{Expr, ObjectName, SetExpr, SqlOption, TableFactor, Value, ValueWithSpan}; +use serde::Serialize; +use snafu::ensure; +use sqlparser::ast::{ + Array, Expr, Ident, ObjectName, SetExpr, SqlOption, TableFactor, Value, ValueWithSpan, +}; +use sqlparser_derive::{Visit, VisitMut}; use crate::ast::ObjectNamePartExt; -use crate::error::{InvalidSqlSnafu, InvalidTableOptionValueSnafu, Result}; +use crate::error::{InvalidExprAsOptionValueSnafu, InvalidSqlSnafu, Result}; use crate::statements::create::SqlOrTql; /// Format an [ObjectName] without any quote of its idents. @@ -42,29 +47,98 @@ pub fn format_raw_object_name(name: &ObjectName) -> String { format!("{}", Inner { name }) } -pub fn parse_option_string(option: SqlOption) -> Result<(String, String)> { +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Visit, VisitMut)] +pub struct OptionValue(Expr); + +impl OptionValue { + fn try_new(expr: Expr) -> Result { + ensure!( + matches!(expr, Expr::Value(_) | Expr::Identifier(_) | Expr::Array(_)), + InvalidExprAsOptionValueSnafu { + error: format!("{expr} not accepted") + } + ); + Ok(Self(expr)) + } + + fn expr_as_string(expr: &Expr) -> Option<&str> { + match expr { + Expr::Value(ValueWithSpan { value, .. }) => match value { + Value::SingleQuotedString(s) + | Value::DoubleQuotedString(s) + | Value::TripleSingleQuotedString(s) + | Value::TripleDoubleQuotedString(s) + | Value::SingleQuotedByteStringLiteral(s) + | Value::DoubleQuotedByteStringLiteral(s) + | Value::TripleSingleQuotedByteStringLiteral(s) + | Value::TripleDoubleQuotedByteStringLiteral(s) + | Value::SingleQuotedRawStringLiteral(s) + | Value::DoubleQuotedRawStringLiteral(s) + | Value::TripleSingleQuotedRawStringLiteral(s) + | Value::TripleDoubleQuotedRawStringLiteral(s) + | Value::EscapedStringLiteral(s) + | Value::UnicodeStringLiteral(s) + | Value::NationalStringLiteral(s) + | Value::HexStringLiteral(s) => Some(s), + Value::DollarQuotedString(s) => Some(&s.value), + Value::Number(s, _) => Some(s), + _ => None, + }, + Expr::Identifier(ident) => Some(&ident.value), + _ => None, + } + } + + pub fn as_string(&self) -> Option<&str> { + Self::expr_as_string(&self.0) + } + + pub fn as_list(&self) -> Option> { + let expr = &self.0; + match expr { + Expr::Value(_) | Expr::Identifier(_) => self.as_string().map(|s| vec![s]), + Expr::Array(array) => array + .elem + .iter() + .map(Self::expr_as_string) + .collect::>>(), + _ => None, + } + } +} + +impl From for OptionValue { + fn from(value: String) -> Self { + Self(Expr::Identifier(Ident::new(value))) + } +} + +impl From<&str> for OptionValue { + fn from(value: &str) -> Self { + Self(Expr::Identifier(Ident::new(value))) + } +} + +impl From> for OptionValue { + fn from(value: Vec<&str>) -> Self { + Self(Expr::Array(Array { + elem: value + .into_iter() + .map(|x| Expr::Identifier(Ident::new(x))) + .collect(), + named: false, + })) + } +} + +pub fn parse_option_string(option: SqlOption) -> Result<(String, OptionValue)> { let SqlOption::KeyValue { key, value } = option else { return InvalidSqlSnafu { msg: "Expecting a key-value pair in the option", } .fail(); }; - let v = match value { - Expr::Value(ValueWithSpan { - value: Value::SingleQuotedString(v), - .. - }) - | Expr::Value(ValueWithSpan { - value: Value::DoubleQuotedString(v), - .. - }) => v, - Expr::Identifier(v) => v.value, - Expr::Value(ValueWithSpan { - value: Value::Number(v, _), - .. - }) => v.clone(), - value => return InvalidTableOptionValueSnafu { key, value }.fail(), - }; + let v = OptionValue::try_new(value)?; let k = key.value.to_lowercase(); Ok((k, v)) } diff --git a/src/table/src/error.rs b/src/table/src/error.rs index 431b5a4db4..7fd04b26bf 100644 --- a/src/table/src/error.rs +++ b/src/table/src/error.rs @@ -150,9 +150,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Table options value is not valid, key: `{}`, value: `{}`", key, value))] - InvalidTableOptionValue { key: String, value: String }, - #[snafu(display("Invalid column option, column name: {}, error: {}", column_name, msg))] InvalidColumnOption { column_name: String, @@ -228,8 +225,7 @@ impl ErrorExt for Error { Error::Unsupported { .. } => StatusCode::Unsupported, Error::ParseTableOption { .. } => StatusCode::InvalidArguments, Error::MissingTimeIndexColumn { .. } => StatusCode::IllegalState, - Error::InvalidTableOptionValue { .. } - | Error::SetSkippingOptions { .. } + Error::SetSkippingOptions { .. } | Error::UnsetSkippingOptions { .. } | Error::InvalidTableName { .. } => StatusCode::InvalidArguments, } diff --git a/tests/cases/standalone/common/alter/change_col_skipping_options.result b/tests/cases/standalone/common/alter/change_col_skipping_options.result index b3b90e2359..1b2895bd09 100644 --- a/tests/cases/standalone/common/alter/change_col_skipping_options.result +++ b/tests/cases/standalone/common/alter/change_col_skipping_options.result @@ -323,7 +323,7 @@ Error: 1002(Unexpected), Invalid skipping index option: Invalid false positive r ALTER TABLE test MODIFY COLUMN value SET SKIPPING INDEX WITH(granularity = 1024, type = 'BLOOM', false_positive_rate = -0.01); -Error: 1004(InvalidArguments), Unrecognized table option key: false_positive_rate, value: -0.01 +Error: 1004(InvalidArguments), Invalid expr as option value, error: -0.01 not accepted ALTER TABLE test MODIFY COLUMN value SET SKIPPING INDEX WITH(granularity = 1024, type = 'BLOOM', false_positive_rate = 2);