From 497dfde90bcf73eae75acb6286111b4434376974 Mon Sep 17 00:00:00 2001 From: luofucong Date: Thu, 18 Dec 2025 20:07:44 +0800 Subject: [PATCH] ingest jsonbench data parse partial struct json datatype in create sql --- Cargo.lock | 6 +- Cargo.toml | 2 +- src/common/sql/Cargo.toml | 2 + src/common/sql/src/convert.rs | 568 +++++++----------- src/common/sql/src/default_constraint.rs | 14 +- .../src/req_convert/insert/stmt_to_region.rs | 3 +- src/operator/src/statement.rs | 17 +- src/operator/src/statement/admin.rs | 18 +- src/operator/src/statement/ddl.rs | 5 +- src/servers/src/mysql/helper.rs | 6 +- src/sql/src/dialect.rs | 4 + src/sql/src/error.rs | 8 +- src/sql/src/parsers/create_parser/json.rs | 42 +- src/sql/src/statements.rs | 14 +- src/sql/src/statements/create.rs | 59 +- src/sql/src/util.rs | 25 +- .../resources/jsonbench-head-10.ndjson | 10 + tests-integration/src/grpc.rs | 5 +- tests-integration/src/test_util.rs | 8 + tests-integration/src/tests.rs | 2 +- tests-integration/src/tests/gc.rs | 4 +- .../src/tests/instance_noop_wal_test.rs | 5 +- .../src/tests/reconcile_table.rs | 5 +- tests-integration/src/tests/test_util.rs | 14 - tests-integration/tests/jsonbench.rs | 94 +++ tests-integration/tests/main.rs | 1 + 26 files changed, 504 insertions(+), 437 deletions(-) create mode 100644 tests-integration/resources/jsonbench-head-10.ndjson create mode 100644 tests-integration/tests/jsonbench.rs diff --git a/Cargo.lock b/Cargo.lock index 4cf1583019..916e47b10d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2580,10 +2580,12 @@ dependencies = [ name = "common-sql" version = "1.0.0-beta.3" dependencies = [ + "arrow-schema", "common-base", "common-decimal", "common-error", "common-macro", + "common-telemetry", "common-time", "datafusion-sql", "datatypes", @@ -12227,7 +12229,7 @@ dependencies = [ [[package]] name = "sqlparser" version = "0.58.0" -source = "git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=4b519a5caa95472cc3988f5556813a583dd35af1#4b519a5caa95472cc3988f5556813a583dd35af1" +source = "git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=a0ce2bc6eb3e804532932f39833c32432f5c9a39#a0ce2bc6eb3e804532932f39833c32432f5c9a39" dependencies = [ "lazy_static", "log", @@ -12251,7 +12253,7 @@ dependencies = [ [[package]] name = "sqlparser_derive" version = "0.3.0" -source = "git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=4b519a5caa95472cc3988f5556813a583dd35af1#4b519a5caa95472cc3988f5556813a583dd35af1" +source = "git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=a0ce2bc6eb3e804532932f39833c32432f5c9a39#a0ce2bc6eb3e804532932f39833c32432f5c9a39" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index e94f27e4dc..c4d0832627 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -332,7 +332,7 @@ datafusion-physical-plan = { git = "https://github.com/GreptimeTeam/datafusion.g datafusion-datasource = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" } datafusion-sql = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" } datafusion-substrait = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" } -sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "4b519a5caa95472cc3988f5556813a583dd35af1" } # branch = "v0.58.x" +sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "a0ce2bc6eb3e804532932f39833c32432f5c9a39" } # branch = "v0.58.x" [profile.release] debug = 1 diff --git a/src/common/sql/Cargo.toml b/src/common/sql/Cargo.toml index 73454367e1..3a66ec2d04 100644 --- a/src/common/sql/Cargo.toml +++ b/src/common/sql/Cargo.toml @@ -5,10 +5,12 @@ edition.workspace = true license.workspace = true [dependencies] +arrow-schema.workspace = true common-base.workspace = true common-decimal.workspace = true common-error.workspace = true common-macro.workspace = true +common-telemetry.workspace = true common-time.workspace = true datafusion-sql.workspace = true datatypes.workspace = true diff --git a/src/common/sql/src/convert.rs b/src/common/sql/src/convert.rs index edb793baf6..bd9a1d0769 100644 --- a/src/common/sql/src/convert.rs +++ b/src/common/sql/src/convert.rs @@ -14,11 +14,12 @@ use std::str::FromStr; +use arrow_schema::extension::ExtensionType; use common_time::Timestamp; use common_time::timezone::Timezone; -use datatypes::json::JsonStructureSettings; +use datatypes::extension::json::JsonExtensionType; use datatypes::prelude::ConcreteDataType; -use datatypes::schema::ColumnDefaultConstraint; +use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema}; use datatypes::types::{JsonFormat, parse_string_to_jsonb, parse_string_to_vector_type_value}; use datatypes::value::{OrderedF32, OrderedF64, Value}; use snafu::{OptionExt, ResultExt, ensure}; @@ -124,13 +125,14 @@ pub(crate) fn sql_number_to_value(data_type: &ConcreteDataType, n: &str) -> Resu /// If `auto_string_to_numeric` is true, tries to cast the string value to numeric values, /// and returns error if the cast fails. pub fn sql_value_to_value( - column_name: &str, - data_type: &ConcreteDataType, + column_schema: &ColumnSchema, sql_val: &SqlValue, timezone: Option<&Timezone>, unary_op: Option, auto_string_to_numeric: bool, ) -> Result { + let column_name = &column_schema.name; + let data_type = &column_schema.data_type; let mut value = match sql_val { SqlValue::Number(n, _) => sql_number_to_value(data_type, n)?, SqlValue::Null => Value::Null, @@ -146,13 +148,9 @@ pub fn sql_value_to_value( (*b).into() } - SqlValue::DoubleQuotedString(s) | SqlValue::SingleQuotedString(s) => parse_string_to_value( - column_name, - s.clone(), - data_type, - timezone, - auto_string_to_numeric, - )?, + SqlValue::DoubleQuotedString(s) | SqlValue::SingleQuotedString(s) => { + parse_string_to_value(column_schema, s.clone(), timezone, auto_string_to_numeric)? + } SqlValue::HexStringLiteral(s) => { // Should not directly write binary into json column ensure!( @@ -244,12 +242,12 @@ pub fn sql_value_to_value( } pub(crate) fn parse_string_to_value( - column_name: &str, + column_schema: &ColumnSchema, s: String, - data_type: &ConcreteDataType, timezone: Option<&Timezone>, auto_string_to_numeric: bool, ) -> Result { + let data_type = &column_schema.data_type; if auto_string_to_numeric && let Some(value) = auto_cast_to_numeric(&s, data_type)? { return Ok(value); } @@ -257,7 +255,7 @@ pub(crate) fn parse_string_to_value( ensure!( data_type.is_stringifiable(), ColumnTypeMismatchSnafu { - column_name, + column_name: column_schema.name.clone(), expect: data_type.clone(), actual: ConcreteDataType::string_datatype(), } @@ -303,23 +301,21 @@ pub(crate) fn parse_string_to_value( } } ConcreteDataType::Binary(_) => Ok(Value::Binary(s.as_bytes().into())), - ConcreteDataType::Json(j) => { - match &j.format { - JsonFormat::Jsonb => { - let v = parse_string_to_jsonb(&s).context(DatatypeSnafu)?; - Ok(Value::Binary(v.into())) - } - JsonFormat::Native(_inner) => { - // Always use the structured version at this level. - let serde_json_value = - serde_json::from_str(&s).context(DeserializeSnafu { json: s })?; - let json_structure_settings = JsonStructureSettings::Structured(None); - json_structure_settings - .encode(serde_json_value) - .context(DatatypeSnafu) - } + ConcreteDataType::Json(j) => match &j.format { + JsonFormat::Jsonb => { + let v = parse_string_to_jsonb(&s).context(DatatypeSnafu)?; + Ok(Value::Binary(v.into())) } - } + JsonFormat::Native(_) => { + let extension_type: Option = + column_schema.extension_type().context(DatatypeSnafu)?; + let json_structure_settings = extension_type + .and_then(|x| x.metadata().json_structure_settings.clone()) + .unwrap_or_default(); + let v = serde_json::from_str(&s).context(DeserializeSnafu { json: s })?; + json_structure_settings.encode(v).context(DatatypeSnafu) + } + }, ConcreteDataType::Vector(d) => { let v = parse_string_to_vector_type_value(&s, Some(d.dim)).context(DatatypeSnafu)?; Ok(Value::Binary(v.into())) @@ -417,305 +413,265 @@ mod test { use super::*; + macro_rules! call_parse_string_to_value { + ($column_name: expr, $input: expr, $data_type: expr) => { + call_parse_string_to_value!($column_name, $input, $data_type, None) + }; + ($column_name: expr, $input: expr, $data_type: expr, timezone = $timezone: expr) => { + call_parse_string_to_value!($column_name, $input, $data_type, Some($timezone)) + }; + ($column_name: expr, $input: expr, $data_type: expr, $timezone: expr) => {{ + let column_schema = ColumnSchema::new($column_name, $data_type, true); + parse_string_to_value(&column_schema, $input, $timezone, true) + }}; + } + #[test] - fn test_string_to_value_auto_numeric() { + fn test_string_to_value_auto_numeric() -> Result<()> { // Test string to boolean with auto cast - let result = parse_string_to_value( + let result = call_parse_string_to_value!( "col", "true".to_string(), - &ConcreteDataType::boolean_datatype(), - None, - true, - ) - .unwrap(); + ConcreteDataType::boolean_datatype() + )?; assert_eq!(Value::Boolean(true), result); // Test invalid string to boolean with auto cast - let result = parse_string_to_value( + let result = call_parse_string_to_value!( "col", "not_a_boolean".to_string(), - &ConcreteDataType::boolean_datatype(), - None, - true, + ConcreteDataType::boolean_datatype() ); assert!(result.is_err()); // Test string to int8 - let result = parse_string_to_value( + let result = call_parse_string_to_value!( "col", "42".to_string(), - &ConcreteDataType::int8_datatype(), - None, - true, - ) - .unwrap(); + ConcreteDataType::int8_datatype() + )?; assert_eq!(Value::Int8(42), result); // Test invalid string to int8 with auto cast - let result = parse_string_to_value( + let result = call_parse_string_to_value!( "col", "not_an_int8".to_string(), - &ConcreteDataType::int8_datatype(), - None, - true, + ConcreteDataType::int8_datatype() ); assert!(result.is_err()); // Test string to int16 - let result = parse_string_to_value( + let result = call_parse_string_to_value!( "col", "1000".to_string(), - &ConcreteDataType::int16_datatype(), - None, - true, - ) - .unwrap(); + ConcreteDataType::int16_datatype() + )?; assert_eq!(Value::Int16(1000), result); // Test invalid string to int16 with auto cast - let result = parse_string_to_value( + let result = call_parse_string_to_value!( "col", "not_an_int16".to_string(), - &ConcreteDataType::int16_datatype(), - None, - true, + ConcreteDataType::int16_datatype() ); assert!(result.is_err()); // Test string to int32 - let result = parse_string_to_value( + let result = call_parse_string_to_value!( "col", "100000".to_string(), - &ConcreteDataType::int32_datatype(), - None, - true, - ) - .unwrap(); + ConcreteDataType::int32_datatype() + )?; assert_eq!(Value::Int32(100000), result); // Test invalid string to int32 with auto cast - let result = parse_string_to_value( + let result = call_parse_string_to_value!( "col", "not_an_int32".to_string(), - &ConcreteDataType::int32_datatype(), - None, - true, + ConcreteDataType::int32_datatype() ); assert!(result.is_err()); // Test string to int64 - let result = parse_string_to_value( + let result = call_parse_string_to_value!( "col", "1000000".to_string(), - &ConcreteDataType::int64_datatype(), - None, - true, - ) - .unwrap(); + ConcreteDataType::int64_datatype() + )?; assert_eq!(Value::Int64(1000000), result); // Test invalid string to int64 with auto cast - let result = parse_string_to_value( + let result = call_parse_string_to_value!( "col", "not_an_int64".to_string(), - &ConcreteDataType::int64_datatype(), - None, - true, + ConcreteDataType::int64_datatype() ); assert!(result.is_err()); // Test string to uint8 - let result = parse_string_to_value( + let result = call_parse_string_to_value!( "col", "200".to_string(), - &ConcreteDataType::uint8_datatype(), - None, - true, - ) - .unwrap(); + ConcreteDataType::uint8_datatype() + )?; assert_eq!(Value::UInt8(200), result); // Test invalid string to uint8 with auto cast - let result = parse_string_to_value( + let result = call_parse_string_to_value!( "col", "not_a_uint8".to_string(), - &ConcreteDataType::uint8_datatype(), - None, - true, + ConcreteDataType::uint8_datatype() ); assert!(result.is_err()); // Test string to uint16 - let result = parse_string_to_value( + let result = call_parse_string_to_value!( "col", "60000".to_string(), - &ConcreteDataType::uint16_datatype(), - None, - true, - ) - .unwrap(); + ConcreteDataType::uint16_datatype() + )?; assert_eq!(Value::UInt16(60000), result); // Test invalid string to uint16 with auto cast - let result = parse_string_to_value( + let result = call_parse_string_to_value!( "col", "not_a_uint16".to_string(), - &ConcreteDataType::uint16_datatype(), - None, - true, + ConcreteDataType::uint16_datatype() ); assert!(result.is_err()); // Test string to uint32 - let result = parse_string_to_value( + let result = call_parse_string_to_value!( "col", "4000000000".to_string(), - &ConcreteDataType::uint32_datatype(), - None, - true, - ) - .unwrap(); + ConcreteDataType::uint32_datatype() + )?; assert_eq!(Value::UInt32(4000000000), result); // Test invalid string to uint32 with auto cast - let result = parse_string_to_value( + let result = call_parse_string_to_value!( "col", "not_a_uint32".to_string(), - &ConcreteDataType::uint32_datatype(), - None, - true, + ConcreteDataType::uint32_datatype() ); assert!(result.is_err()); // Test string to uint64 - let result = parse_string_to_value( + let result = call_parse_string_to_value!( "col", "18446744073709551615".to_string(), - &ConcreteDataType::uint64_datatype(), - None, - true, - ) - .unwrap(); + ConcreteDataType::uint64_datatype() + )?; assert_eq!(Value::UInt64(18446744073709551615), result); // Test invalid string to uint64 with auto cast - let result = parse_string_to_value( + let result = call_parse_string_to_value!( "col", "not_a_uint64".to_string(), - &ConcreteDataType::uint64_datatype(), - None, - true, + ConcreteDataType::uint64_datatype() ); assert!(result.is_err()); // Test string to float32 - let result = parse_string_to_value( + let result = call_parse_string_to_value!( "col", "3.5".to_string(), - &ConcreteDataType::float32_datatype(), - None, - true, - ) - .unwrap(); + ConcreteDataType::float32_datatype() + )?; assert_eq!(Value::Float32(OrderedF32::from(3.5)), result); // Test invalid string to float32 with auto cast - let result = parse_string_to_value( + let result = call_parse_string_to_value!( "col", "not_a_float32".to_string(), - &ConcreteDataType::float32_datatype(), - None, - true, + ConcreteDataType::float32_datatype() ); assert!(result.is_err()); // Test string to float64 - let result = parse_string_to_value( + let result = call_parse_string_to_value!( "col", "3.5".to_string(), - &ConcreteDataType::float64_datatype(), - None, - true, - ) - .unwrap(); + ConcreteDataType::float64_datatype() + )?; assert_eq!(Value::Float64(OrderedF64::from(3.5)), result); // Test invalid string to float64 with auto cast - let result = parse_string_to_value( + let result = call_parse_string_to_value!( "col", "not_a_float64".to_string(), - &ConcreteDataType::float64_datatype(), - None, - true, + ConcreteDataType::float64_datatype() ); assert!(result.is_err()); + Ok(()) } - #[test] - fn test_sql_value_to_value() { - let sql_val = SqlValue::Null; - assert_eq!( - Value::Null, - sql_value_to_value( - "a", - &ConcreteDataType::float64_datatype(), - &sql_val, - None, + macro_rules! call_sql_value_to_value { + ($column_name: expr, $data_type: expr, $sql_value: expr) => { + call_sql_value_to_value!($column_name, $data_type, $sql_value, None, None, false) + }; + ($column_name: expr, $data_type: expr, $sql_value: expr, timezone = $timezone: expr) => { + call_sql_value_to_value!( + $column_name, + $data_type, + $sql_value, + Some($timezone), None, false ) - .unwrap() + }; + ($column_name: expr, $data_type: expr, $sql_value: expr, unary_op = $unary_op: expr) => { + call_sql_value_to_value!( + $column_name, + $data_type, + $sql_value, + None, + Some($unary_op), + false + ) + }; + ($column_name: expr, $data_type: expr, $sql_value: expr, auto_string_to_numeric) => { + call_sql_value_to_value!($column_name, $data_type, $sql_value, None, None, true) + }; + ($column_name: expr, $data_type: expr, $sql_value: expr, $timezone: expr, $unary_op: expr, $auto_string_to_numeric: expr) => {{ + let column_schema = ColumnSchema::new($column_name, $data_type, true); + sql_value_to_value( + &column_schema, + $sql_value, + $timezone, + $unary_op, + $auto_string_to_numeric, + ) + }}; + } + + #[test] + fn test_sql_value_to_value() -> Result<()> { + let sql_val = SqlValue::Null; + assert_eq!( + Value::Null, + call_sql_value_to_value!("a", ConcreteDataType::float64_datatype(), &sql_val)? ); let sql_val = SqlValue::Boolean(true); assert_eq!( Value::Boolean(true), - sql_value_to_value( - "a", - &ConcreteDataType::boolean_datatype(), - &sql_val, - None, - None, - false - ) - .unwrap() + call_sql_value_to_value!("a", ConcreteDataType::boolean_datatype(), &sql_val)? ); let sql_val = SqlValue::Number("3.0".to_string(), false); assert_eq!( Value::Float64(OrderedFloat(3.0)), - sql_value_to_value( - "a", - &ConcreteDataType::float64_datatype(), - &sql_val, - None, - None, - false - ) - .unwrap() + call_sql_value_to_value!("a", ConcreteDataType::float64_datatype(), &sql_val)? ); let sql_val = SqlValue::Number("3.0".to_string(), false); - let v = sql_value_to_value( - "a", - &ConcreteDataType::boolean_datatype(), - &sql_val, - None, - None, - false, - ); + let v = call_sql_value_to_value!("a", ConcreteDataType::boolean_datatype(), &sql_val); assert!(v.is_err()); assert!(format!("{v:?}").contains("Failed to parse number '3.0' to boolean column type")); let sql_val = SqlValue::Boolean(true); - let v = sql_value_to_value( - "a", - &ConcreteDataType::float64_datatype(), - &sql_val, - None, - None, - false, - ); + let v = call_sql_value_to_value!("a", ConcreteDataType::float64_datatype(), &sql_val); assert!(v.is_err()); assert!( format!("{v:?}").contains( @@ -725,41 +681,18 @@ mod test { ); let sql_val = SqlValue::HexStringLiteral("48656c6c6f20776f726c6421".to_string()); - let v = sql_value_to_value( - "a", - &ConcreteDataType::binary_datatype(), - &sql_val, - None, - None, - false, - ) - .unwrap(); + let v = call_sql_value_to_value!("a", ConcreteDataType::binary_datatype(), &sql_val)?; assert_eq!(Value::Binary(Bytes::from(b"Hello world!".as_slice())), v); let sql_val = SqlValue::DoubleQuotedString("MorningMyFriends".to_string()); - let v = sql_value_to_value( - "a", - &ConcreteDataType::binary_datatype(), - &sql_val, - None, - None, - false, - ) - .unwrap(); + let v = call_sql_value_to_value!("a", ConcreteDataType::binary_datatype(), &sql_val)?; assert_eq!( Value::Binary(Bytes::from(b"MorningMyFriends".as_slice())), v ); let sql_val = SqlValue::HexStringLiteral("9AF".to_string()); - let v = sql_value_to_value( - "a", - &ConcreteDataType::binary_datatype(), - &sql_val, - None, - None, - false, - ); + let v = call_sql_value_to_value!("a", ConcreteDataType::binary_datatype(), &sql_val); assert!(v.is_err()); assert!( format!("{v:?}").contains("odd number of digits"), @@ -767,38 +700,16 @@ mod test { ); let sql_val = SqlValue::HexStringLiteral("AG".to_string()); - let v = sql_value_to_value( - "a", - &ConcreteDataType::binary_datatype(), - &sql_val, - None, - None, - false, - ); + let v = call_sql_value_to_value!("a", ConcreteDataType::binary_datatype(), &sql_val); assert!(v.is_err()); assert!(format!("{v:?}").contains("invalid character"), "v is {v:?}",); let sql_val = SqlValue::DoubleQuotedString("MorningMyFriends".to_string()); - let v = sql_value_to_value( - "a", - &ConcreteDataType::json_datatype(), - &sql_val, - None, - None, - false, - ); + let v = call_sql_value_to_value!("a", ConcreteDataType::json_datatype(), &sql_val); assert!(v.is_err()); let sql_val = SqlValue::DoubleQuotedString(r#"{"a":"b"}"#.to_string()); - let v = sql_value_to_value( - "a", - &ConcreteDataType::json_datatype(), - &sql_val, - None, - None, - false, - ) - .unwrap(); + let v = call_sql_value_to_value!("a", ConcreteDataType::json_datatype(), &sql_val)?; assert_eq!( Value::Binary(Bytes::from( jsonb::parse_value(r#"{"a":"b"}"#.as_bytes()) @@ -808,16 +719,15 @@ mod test { )), v ); + Ok(()) } #[test] fn test_parse_json_to_jsonb() { - match parse_string_to_value( + match call_parse_string_to_value!( "json_col", r#"{"a": "b"}"#.to_string(), - &ConcreteDataType::json_datatype(), - None, - false, + ConcreteDataType::json_datatype() ) { Ok(Value::Binary(b)) => { assert_eq!( @@ -833,12 +743,10 @@ mod test { } assert!( - parse_string_to_value( + call_parse_string_to_value!( "json_col", r#"Nicola Kovac is the best rifler in the world"#.to_string(), - &ConcreteDataType::json_datatype(), - None, - false, + ConcreteDataType::json_datatype() ) .is_err() ) @@ -878,13 +786,10 @@ mod test { #[test] fn test_parse_date_literal() { - let value = sql_value_to_value( + let value = call_sql_value_to_value!( "date", - &ConcreteDataType::date_datatype(), - &SqlValue::DoubleQuotedString("2022-02-22".to_string()), - None, - None, - false, + ConcreteDataType::date_datatype(), + &SqlValue::DoubleQuotedString("2022-02-22".to_string()) ) .unwrap(); assert_eq!(ConcreteDataType::date_datatype(), value.data_type()); @@ -895,13 +800,11 @@ mod test { } // with timezone - let value = sql_value_to_value( + let value = call_sql_value_to_value!( "date", - &ConcreteDataType::date_datatype(), + ConcreteDataType::date_datatype(), &SqlValue::DoubleQuotedString("2022-02-22".to_string()), - Some(&Timezone::from_tz_string("+07:00").unwrap()), - None, - false, + timezone = &Timezone::from_tz_string("+07:00").unwrap() ) .unwrap(); assert_eq!(ConcreteDataType::date_datatype(), value.data_type()); @@ -913,16 +816,12 @@ mod test { } #[test] - fn test_parse_timestamp_literal() { - match parse_string_to_value( + fn test_parse_timestamp_literal() -> Result<()> { + match call_parse_string_to_value!( "timestamp_col", "2022-02-22T00:01:01+08:00".to_string(), - &ConcreteDataType::timestamp_millisecond_datatype(), - None, - false, - ) - .unwrap() - { + ConcreteDataType::timestamp_millisecond_datatype() + )? { Value::Timestamp(ts) => { assert_eq!(1645459261000, ts.value()); assert_eq!(TimeUnit::Millisecond, ts.unit()); @@ -932,15 +831,11 @@ mod test { } } - match parse_string_to_value( + match call_parse_string_to_value!( "timestamp_col", "2022-02-22T00:01:01+08:00".to_string(), - &ConcreteDataType::timestamp_datatype(TimeUnit::Second), - None, - false, - ) - .unwrap() - { + ConcreteDataType::timestamp_datatype(TimeUnit::Second) + )? { Value::Timestamp(ts) => { assert_eq!(1645459261, ts.value()); assert_eq!(TimeUnit::Second, ts.unit()); @@ -950,15 +845,11 @@ mod test { } } - match parse_string_to_value( + match call_parse_string_to_value!( "timestamp_col", "2022-02-22T00:01:01+08:00".to_string(), - &ConcreteDataType::timestamp_datatype(TimeUnit::Microsecond), - None, - false, - ) - .unwrap() - { + ConcreteDataType::timestamp_datatype(TimeUnit::Microsecond) + )? { Value::Timestamp(ts) => { assert_eq!(1645459261000000, ts.value()); assert_eq!(TimeUnit::Microsecond, ts.unit()); @@ -968,15 +859,11 @@ mod test { } } - match parse_string_to_value( + match call_parse_string_to_value!( "timestamp_col", "2022-02-22T00:01:01+08:00".to_string(), - &ConcreteDataType::timestamp_datatype(TimeUnit::Nanosecond), - None, - false, - ) - .unwrap() - { + ConcreteDataType::timestamp_datatype(TimeUnit::Nanosecond) + )? { Value::Timestamp(ts) => { assert_eq!(1645459261000000000, ts.value()); assert_eq!(TimeUnit::Nanosecond, ts.unit()); @@ -987,26 +874,21 @@ mod test { } assert!( - parse_string_to_value( + call_parse_string_to_value!( "timestamp_col", "2022-02-22T00:01:01+08".to_string(), - &ConcreteDataType::timestamp_datatype(TimeUnit::Nanosecond), - None, - false, + ConcreteDataType::timestamp_datatype(TimeUnit::Nanosecond) ) .is_err() ); // with timezone - match parse_string_to_value( + match call_parse_string_to_value!( "timestamp_col", "2022-02-22T00:01:01".to_string(), - &ConcreteDataType::timestamp_datatype(TimeUnit::Nanosecond), - Some(&Timezone::from_tz_string("Asia/Shanghai").unwrap()), - false, - ) - .unwrap() - { + ConcreteDataType::timestamp_datatype(TimeUnit::Nanosecond), + timezone = &Timezone::from_tz_string("Asia/Shanghai").unwrap() + )? { Value::Timestamp(ts) => { assert_eq!(1645459261000000000, ts.value()); assert_eq!("2022-02-21 16:01:01+0000", ts.to_iso8601_string()); @@ -1016,51 +898,42 @@ mod test { unreachable!() } } + Ok(()) } #[test] fn test_parse_placeholder_value() { assert!( - sql_value_to_value( + call_sql_value_to_value!( "test", - &ConcreteDataType::string_datatype(), + ConcreteDataType::string_datatype(), + &SqlValue::Placeholder("default".into()) + ) + .is_err() + ); + assert!( + call_sql_value_to_value!( + "test", + ConcreteDataType::string_datatype(), &SqlValue::Placeholder("default".into()), - None, - None, - false + unary_op = UnaryOperator::Minus ) .is_err() ); assert!( - sql_value_to_value( + call_sql_value_to_value!( "test", - &ConcreteDataType::string_datatype(), - &SqlValue::Placeholder("default".into()), - None, - Some(UnaryOperator::Minus), - false - ) - .is_err() - ); - assert!( - sql_value_to_value( - "test", - &ConcreteDataType::uint16_datatype(), + ConcreteDataType::uint16_datatype(), &SqlValue::Number("3".into(), false), - None, - Some(UnaryOperator::Minus), - false + unary_op = UnaryOperator::Minus ) .is_err() ); assert!( - sql_value_to_value( + call_sql_value_to_value!( "test", - &ConcreteDataType::uint16_datatype(), - &SqlValue::Number("3".into(), false), - None, - None, - false + ConcreteDataType::uint16_datatype(), + &SqlValue::Number("3".into(), false) ) .is_ok() ); @@ -1070,77 +943,60 @@ mod test { fn test_auto_string_to_numeric() { // Test with auto_string_to_numeric=true let sql_val = SqlValue::SingleQuotedString("123".to_string()); - let v = sql_value_to_value( + let v = call_sql_value_to_value!( "a", - &ConcreteDataType::int32_datatype(), + ConcreteDataType::int32_datatype(), &sql_val, - None, - None, - true, + auto_string_to_numeric ) .unwrap(); assert_eq!(Value::Int32(123), v); // Test with a float string let sql_val = SqlValue::SingleQuotedString("3.5".to_string()); - let v = sql_value_to_value( + let v = call_sql_value_to_value!( "a", - &ConcreteDataType::float64_datatype(), + ConcreteDataType::float64_datatype(), &sql_val, - None, - None, - true, + auto_string_to_numeric ) .unwrap(); assert_eq!(Value::Float64(OrderedFloat(3.5)), v); // Test with auto_string_to_numeric=false let sql_val = SqlValue::SingleQuotedString("123".to_string()); - let v = sql_value_to_value( - "a", - &ConcreteDataType::int32_datatype(), - &sql_val, - None, - None, - false, - ); + let v = call_sql_value_to_value!("a", ConcreteDataType::int32_datatype(), &sql_val); assert!(v.is_err()); // Test with an invalid numeric string but auto_string_to_numeric=true // Should return an error now with the new auto_cast_to_numeric behavior let sql_val = SqlValue::SingleQuotedString("not_a_number".to_string()); - let v = sql_value_to_value( + let v = call_sql_value_to_value!( "a", - &ConcreteDataType::int32_datatype(), + ConcreteDataType::int32_datatype(), &sql_val, - None, - None, - true, + auto_string_to_numeric ); assert!(v.is_err()); // Test with boolean type let sql_val = SqlValue::SingleQuotedString("true".to_string()); - let v = sql_value_to_value( + let v = call_sql_value_to_value!( "a", - &ConcreteDataType::boolean_datatype(), + ConcreteDataType::boolean_datatype(), &sql_val, - None, - None, - true, + auto_string_to_numeric ) .unwrap(); assert_eq!(Value::Boolean(true), v); // Non-numeric types should still be handled normally let sql_val = SqlValue::SingleQuotedString("hello".to_string()); - let v = sql_value_to_value( + let v = call_sql_value_to_value!( "a", - &ConcreteDataType::string_datatype(), + ConcreteDataType::string_datatype(), &sql_val, - None, - None, - true, + auto_string_to_numeric ); assert!(v.is_ok()); } diff --git a/src/common/sql/src/default_constraint.rs b/src/common/sql/src/default_constraint.rs index e2a57337a5..0084320835 100644 --- a/src/common/sql/src/default_constraint.rs +++ b/src/common/sql/src/default_constraint.rs @@ -14,8 +14,8 @@ use common_time::timezone::Timezone; use datatypes::prelude::ConcreteDataType; -use datatypes::schema::ColumnDefaultConstraint; use datatypes::schema::constraint::{CURRENT_TIMESTAMP, CURRENT_TIMESTAMP_FN}; +use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema}; use snafu::ensure; use sqlparser::ast::ValueWithSpan; pub use sqlparser::ast::{ @@ -47,9 +47,12 @@ pub fn parse_column_default_constraint( ); let default_constraint = match &opt.option { - ColumnOption::Default(Expr::Value(v)) => ColumnDefaultConstraint::Value( - sql_value_to_value(column_name, data_type, &v.value, timezone, None, false)?, - ), + ColumnOption::Default(Expr::Value(v)) => { + let schema = ColumnSchema::new(column_name, data_type.clone(), true); + ColumnDefaultConstraint::Value(sql_value_to_value( + &schema, &v.value, timezone, None, false, + )?) + } ColumnOption::Default(Expr::Function(func)) => { let mut func = format!("{func}").to_lowercase(); // normalize CURRENT_TIMESTAMP to CURRENT_TIMESTAMP() @@ -80,8 +83,7 @@ pub fn parse_column_default_constraint( if let Expr::Value(v) = &**expr { let value = sql_value_to_value( - column_name, - data_type, + &ColumnSchema::new(column_name, data_type.clone(), true), &v.value, timezone, Some(*op), diff --git a/src/operator/src/req_convert/insert/stmt_to_region.rs b/src/operator/src/req_convert/insert/stmt_to_region.rs index ef4e7cac8e..e2e0969035 100644 --- a/src/operator/src/req_convert/insert/stmt_to_region.rs +++ b/src/operator/src/req_convert/insert/stmt_to_region.rs @@ -410,8 +410,7 @@ fn sql_value_to_value( })? } else { common_sql::convert::sql_value_to_value( - column, - &column_schema.data_type, + column_schema, sql_val, timezone, None, diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs index 9c1b6a749e..54efd80369 100644 --- a/src/operator/src/statement.rs +++ b/src/operator/src/statement.rs @@ -52,6 +52,7 @@ use common_time::Timestamp; use common_time::range::TimestampRange; use datafusion_expr::LogicalPlan; use datatypes::prelude::ConcreteDataType; +use datatypes::schema::ColumnSchema; use humantime::format_duration; use itertools::Itertools; use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef}; @@ -644,11 +645,20 @@ impl StatementExecutor { })? .unit(); + let start_column = ColumnSchema::new( + "range_start", + ConcreteDataType::timestamp_datatype(time_unit), + false, + ); + let end_column = ColumnSchema::new( + "range_end", + ConcreteDataType::timestamp_datatype(time_unit), + false, + ); let mut time_ranges = Vec::with_capacity(sql_values_time_range.len()); for (start, end) in sql_values_time_range { let start = common_sql::convert::sql_value_to_value( - "range_start", - &ConcreteDataType::timestamp_datatype(time_unit), + &start_column, start, Some(&query_ctx.timezone()), None, @@ -667,8 +677,7 @@ impl StatementExecutor { })?; let end = common_sql::convert::sql_value_to_value( - "range_end", - &ConcreteDataType::timestamp_datatype(time_unit), + &end_column, end, Some(&query_ctx.timezone()), None, diff --git a/src/operator/src/statement/admin.rs b/src/operator/src/statement/admin.rs index f97e9e552e..e606f8a939 100644 --- a/src/operator/src/statement/admin.rs +++ b/src/operator/src/statement/admin.rs @@ -242,8 +242,12 @@ fn values_to_vectors_by_exact_types( args.iter() .zip(exact_types.iter()) .map(|(value, data_type)| { - let data_type = &ConcreteDataType::from_arrow_type(data_type); - let value = sql_value_to_value(DUMMY_COLUMN, data_type, value, tz, None, false) + let schema = ColumnSchema::new( + DUMMY_COLUMN, + ConcreteDataType::from_arrow_type(data_type), + true, + ); + let value = sql_value_to_value(&schema, value, tz, None, false) .context(error::SqlCommonSnafu)?; Ok(value_to_vector(value)) @@ -260,10 +264,12 @@ fn values_to_vectors_by_valid_types( args.iter() .map(|value| { for data_type in valid_types { - let data_type = &ConcreteDataType::from_arrow_type(data_type); - if let Ok(value) = - sql_value_to_value(DUMMY_COLUMN, data_type, value, tz, None, false) - { + let schema = ColumnSchema::new( + DUMMY_COLUMN, + ConcreteDataType::from_arrow_type(data_type), + true, + ); + if let Ok(value) = sql_value_to_value(&schema, value, tz, None, false) { return Ok(value_to_vector(value)); } } diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index 89ca7f2b78..dc2ca3e91d 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -50,7 +50,7 @@ use common_time::{Timestamp, Timezone}; use datafusion_common::tree_node::TreeNodeVisitor; use datafusion_expr::LogicalPlan; use datatypes::prelude::ConcreteDataType; -use datatypes::schema::{RawSchema, Schema}; +use datatypes::schema::{ColumnSchema, RawSchema, Schema}; use datatypes::value::Value; use partition::expr::{Operand, PartitionExpr, RestrictedOp}; use partition::multi_dim::MultiDimPartitionRule; @@ -2001,8 +2001,7 @@ fn convert_value( unary_op: Option, ) -> Result { sql_value_to_value( - "", - &data_type, + &ColumnSchema::new("", data_type, true), value, Some(timezone), unary_op, diff --git a/src/servers/src/mysql/helper.rs b/src/servers/src/mysql/helper.rs index f765fba2d4..4427ec4b2f 100644 --- a/src/servers/src/mysql/helper.rs +++ b/src/servers/src/mysql/helper.rs @@ -22,6 +22,7 @@ use common_time::{Date, Timestamp}; use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_expr::LogicalPlan; use datatypes::prelude::ConcreteDataType; +use datatypes::schema::ColumnSchema; use datatypes::types::TimestampType; use datatypes::value::{self, Value}; use itertools::Itertools; @@ -254,9 +255,10 @@ pub fn convert_value(param: &ParamValue, t: &ConcreteDataType) -> Result Result { + let column_schema = ColumnSchema::new("", t.clone(), true); match param { Expr::Value(v) => { - let v = sql_value_to_value("", t, &v.value, None, None, true); + let v = sql_value_to_value(&column_schema, &v.value, None, None, true); match v { Ok(v) => v .try_to_scalar_value(t) @@ -268,7 +270,7 @@ pub fn convert_expr_to_scalar_value(param: &Expr, t: &ConcreteDataType) -> Resul } } Expr::UnaryOp { op, expr } if let Expr::Value(v) = &**expr => { - let v = sql_value_to_value("", t, &v.value, None, Some(*op), true); + let v = sql_value_to_value(&column_schema, &v.value, None, Some(*op), true); match v { Ok(v) => v .try_to_scalar_value(t) diff --git a/src/sql/src/dialect.rs b/src/sql/src/dialect.rs index 7b303ee286..2793bd268d 100644 --- a/src/sql/src/dialect.rs +++ b/src/sql/src/dialect.rs @@ -40,4 +40,8 @@ impl Dialect for GreptimeDbDialect { fn supports_filter_during_aggregation(&self) -> bool { true } + + fn supports_struct_literal(&self) -> bool { + true + } } diff --git a/src/sql/src/error.rs b/src/sql/src/error.rs index cb7a71f0e4..1a9a8fa81a 100644 --- a/src/sql/src/error.rs +++ b/src/sql/src/error.rs @@ -208,9 +208,9 @@ pub enum Error { location: Location, }, - #[snafu(display("Invalid expr as option value, error: {error}"))] - InvalidExprAsOptionValue { - error: String, + #[snafu(display("Invalid JSON structure setting, reason: {reason}"))] + InvalidJsonStructureSetting { + reason: String, #[snafu(implicit)] location: Location, }, @@ -373,7 +373,7 @@ impl ErrorExt for Error { } InvalidColumnOption { .. } - | InvalidExprAsOptionValue { .. } + | InvalidJsonStructureSetting { .. } | InvalidDatabaseName { .. } | InvalidDatabaseOption { .. } | ColumnTypeMismatch { .. } diff --git a/src/sql/src/parsers/create_parser/json.rs b/src/sql/src/parsers/create_parser/json.rs index 649a91106a..30364546b3 100644 --- a/src/sql/src/parsers/create_parser/json.rs +++ b/src/sql/src/parsers/create_parser/json.rs @@ -40,16 +40,17 @@ pub(super) fn parse_json_datatype_options(parser: &mut Parser<'_>) -> Result), + ts TIMESTAMP TIME INDEX, +)"#; + let options = parse(sql).unwrap(); + assert_eq!(options.len(), 1); + let option = options.value(JSON_OPT_FIELDS); + let expected = Some(&OptionValue::new(Expr::Struct { + values: vec![], + fields: vec![ + StructField { + field_name: Some(Ident::new("i")), + field_type: DataType::Int(None), + options: None, + }, + StructField { + field_name: Some(Ident::with_quote('"', "o.a")), + field_type: DataType::String(None), + options: None, + }, + StructField { + field_name: Some(Ident::with_quote('"', "o.b")), + field_type: DataType::String(None), + options: None, + }, + StructField { + field_name: Some(Ident::with_quote('`', "x.y.z")), + field_type: DataType::Float64, + options: None, + }, + ], + })); + assert_eq!(option, expected); + + let sql = r#" +CREATE TABLE json_data ( my_json JSON(format = "partial", unstructured_keys = ["k", "foo.bar", "a.b.c"]), ts TIMESTAMP TIME INDEX, )"#; diff --git a/src/sql/src/statements.rs b/src/sql/src/statements.rs index a6ee60164a..eb3c6be336 100644 --- a/src/sql/src/statements.rs +++ b/src/sql/src/statements.rs @@ -40,6 +40,7 @@ use api::v1::SemanticType; use common_sql::default_constraint::parse_column_default_constraint; use common_time::timezone::Timezone; use datatypes::extension::json::{JsonExtensionType, JsonMetadata}; +use datatypes::json::JsonStructureSettings; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{COMMENT_KEY, ColumnDefaultConstraint, ColumnSchema}; use datatypes::types::json_type::JsonNativeType; @@ -281,8 +282,17 @@ pub fn sql_data_type_to_concrete_data_type( } }, SqlDataType::JSON => { - let format = if column_extensions.json_datatype_options.is_some() { - JsonFormat::Native(Box::new(JsonNativeType::Null)) + let format = if let Some(x) = column_extensions.build_json_structure_settings()? { + if let Some(fields) = match x { + JsonStructureSettings::Structured(fields) => fields, + JsonStructureSettings::UnstructuredRaw => None, + JsonStructureSettings::PartialUnstructuredByKey { fields, .. } => fields, + } { + let datatype = &ConcreteDataType::Struct(fields); + JsonFormat::Native(Box::new(datatype.into())) + } else { + JsonFormat::Native(Box::new(JsonNativeType::Null)) + } } else { JsonFormat::Jsonb }; diff --git a/src/sql/src/statements/create.rs b/src/sql/src/statements/create.rs index 3791effac0..2bfb05bc4b 100644 --- a/src/sql/src/statements/create.rs +++ b/src/sql/src/statements/create.rs @@ -14,27 +14,30 @@ use std::collections::{HashMap, HashSet}; use std::fmt::{Display, Formatter}; +use std::sync::Arc; use common_catalog::consts::FILE_ENGINE; +use datatypes::data_type::ConcreteDataType; use datatypes::json::JsonStructureSettings; use datatypes::schema::{ FulltextOptions, SkippingIndexOptions, VectorDistanceMetric, VectorIndexEngineType, VectorIndexOptions, }; +use datatypes::types::StructType; use itertools::Itertools; use serde::Serialize; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use sqlparser::ast::{ColumnOptionDef, DataType, Expr, Query}; use sqlparser_derive::{Visit, VisitMut}; use crate::ast::{ColumnDef, Ident, ObjectName, Value as SqlValue}; use crate::error::{ - InvalidFlowQuerySnafu, InvalidSqlSnafu, Result, SetFulltextOptionSnafu, - SetSkippingIndexOptionSnafu, + InvalidFlowQuerySnafu, InvalidJsonStructureSettingSnafu, InvalidSqlSnafu, Result, + SetFulltextOptionSnafu, SetSkippingIndexOptionSnafu, }; -use crate::statements::OptionMap; use crate::statements::statement::Statement; use crate::statements::tql::Tql; +use crate::statements::{OptionMap, sql_data_type_to_concrete_data_type}; use crate::util::OptionValue; const LINE_SEP: &str = ",\n"; @@ -44,6 +47,7 @@ pub const VECTOR_OPT_DIM: &str = "dim"; pub const JSON_OPT_UNSTRUCTURED_KEYS: &str = "unstructured_keys"; pub const JSON_OPT_FORMAT: &str = "format"; +pub(crate) const JSON_OPT_FIELDS: &str = "fields"; pub const JSON_FORMAT_FULL_STRUCTURED: &str = "structured"; pub const JSON_FORMAT_RAW: &str = "raw"; pub const JSON_FORMAT_PARTIAL: &str = "partial"; @@ -346,14 +350,51 @@ impl ColumnExtensions { }) .unwrap_or_default(); + let fields = if let Some(value) = options.value(JSON_OPT_FIELDS) { + let fields = value + .as_struct_fields() + .context(InvalidJsonStructureSettingSnafu { + reason: format!(r#"expect "{JSON_OPT_FIELDS}" a struct, actual: "{value}""#,), + })?; + let fields = fields + .iter() + .map(|field| { + let name = field.field_name.as_ref().map(|x| x.value.clone()).context( + InvalidJsonStructureSettingSnafu { + reason: format!(r#"missing field name in "{field}""#), + }, + )?; + let datatype = sql_data_type_to_concrete_data_type( + &field.field_type, + &Default::default(), + )?; + Ok(datatypes::types::StructField::new(name, datatype, true)) + }) + .collect::>()?; + Some(StructType::new(Arc::new(fields))) + } else { + None + }; + options .get(JSON_OPT_FORMAT) .map(|format| match format { - JSON_FORMAT_FULL_STRUCTURED => Ok(JsonStructureSettings::Structured(None)), - JSON_FORMAT_PARTIAL => Ok(JsonStructureSettings::PartialUnstructuredByKey { - fields: None, - unstructured_keys, - }), + JSON_FORMAT_FULL_STRUCTURED => Ok(JsonStructureSettings::Structured(fields)), + JSON_FORMAT_PARTIAL => { + let fields = fields.map(|fields| { + let mut fields = Arc::unwrap_or_clone(fields.fields()); + fields.push(datatypes::types::StructField::new( + JsonStructureSettings::RAW_FIELD.to_string(), + ConcreteDataType::string_datatype(), + true, + )); + StructType::new(Arc::new(fields)) + }); + Ok(JsonStructureSettings::PartialUnstructuredByKey { + fields, + unstructured_keys, + }) + } JSON_FORMAT_RAW => Ok(JsonStructureSettings::UnstructuredRaw), _ => InvalidSqlSnafu { msg: format!("unknown JSON datatype 'format': {format}"), diff --git a/src/sql/src/util.rs b/src/sql/src/util.rs index 3b221d7642..0d80c8496d 100644 --- a/src/sql/src/util.rs +++ b/src/sql/src/util.rs @@ -17,14 +17,14 @@ use std::fmt::{Display, Formatter}; use itertools::Itertools; use serde::Serialize; -use snafu::ensure; use sqlparser::ast::{ - Array, Expr, Ident, ObjectName, SetExpr, SqlOption, TableFactor, Value, ValueWithSpan, + Array, Expr, Ident, ObjectName, SetExpr, SqlOption, StructField, TableFactor, Value, + ValueWithSpan, }; use sqlparser_derive::{Visit, VisitMut}; use crate::ast::ObjectNamePartExt; -use crate::error::{InvalidExprAsOptionValueSnafu, InvalidSqlSnafu, Result}; +use crate::error::{InvalidSqlSnafu, Result}; use crate::statements::create::SqlOrTql; /// Format an [ObjectName] without any quote of its idents. @@ -52,14 +52,8 @@ pub fn format_raw_object_name(name: &ObjectName) -> String { pub struct OptionValue(Expr); impl OptionValue { - fn try_new(expr: Expr) -> Result { - ensure!( - matches!(expr, Expr::Value(_) | Expr::Identifier(_) | Expr::Array(_)), - InvalidExprAsOptionValueSnafu { - error: format!("{expr} not accepted") - } - ); - Ok(Self(expr)) + pub fn new(expr: Expr) -> Self { + Self(expr) } fn expr_as_string(expr: &Expr) -> Option<&str> { @@ -106,6 +100,13 @@ impl OptionValue { _ => None, } } + + pub(crate) fn as_struct_fields(&self) -> Option<&[StructField]> { + match &self.0 { + Expr::Struct { fields, .. } => Some(fields), + _ => None, + } + } } impl From for OptionValue { @@ -155,7 +156,7 @@ pub fn parse_option_string(option: SqlOption) -> Result<(String, OptionValue)> { } .fail(); }; - let v = OptionValue::try_new(value)?; + let v = OptionValue::new(value); let k = key.value.to_lowercase(); Ok((k, v)) } diff --git a/tests-integration/resources/jsonbench-head-10.ndjson b/tests-integration/resources/jsonbench-head-10.ndjson new file mode 100644 index 0000000000..44abab4f37 --- /dev/null +++ b/tests-integration/resources/jsonbench-head-10.ndjson @@ -0,0 +1,10 @@ +{"did":"did:plc:yj3sjq3blzpynh27cumnp5ks","time_us":1732206349000167,"kind":"commit","commit":{"rev":"3lbhtytnn2k2f","operation":"create","collection":"app.bsky.feed.post","rkey":"3lbhtyteurk2y","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:09:27.095Z","langs":["en"],"reply":{"parent":{"cid":"bafyreibfglofvqou2yiqvwzk4rcgkhhxrbunyemshdjledgwymimqkg24e","uri":"at://did:plc:6tr6tuzlx2db3rduzr2d6r24/app.bsky.feed.post/3lbhqo2rtys2z"},"root":{"cid":"bafyreibfglofvqou2yiqvwzk4rcgkhhxrbunyemshdjledgwymimqkg24e","uri":"at://did:plc:6tr6tuzlx2db3rduzr2d6r24/app.bsky.feed.post/3lbhqo2rtys2z"}},"text":"aaaaah. LIght shines in a corner of WTF...."},"cid":"bafyreidblutgvj75o4q4akzyyejedjj6l3it6hgqwee6jpwv2wqph5fsgm"}} +{"did":"did:plc:3i4xf2v4wcnyktgv6satke64","time_us":1732206349000644,"kind":"commit","commit":{"rev":"3lbhuvzds6d2a","operation":"create","collection":"app.bsky.feed.like","rkey":"3lbhuvzdked2a","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:25:46.221Z","subject":{"cid":"bafyreidjvrcmckkm765mct5fph36x7kupkfo35rjklbf2k76xkzwyiauge","uri":"at://did:plc:azrv4rcbws6kmcga4fsbphg2/app.bsky.feed.post/3lbgjdpbiec2l"}},"cid":"bafyreia5l5vrkh5oj4cjyhcqby2dprhyvcyofo2q5562tijlae2pzih23m"}} +{"did":"did:plc:gccfnqqizz4urhchsaie6jft","time_us":1732206349001108,"kind":"commit","commit":{"rev":"3lbhuvze3gi2u","operation":"create","collection":"app.bsky.graph.follow","rkey":"3lbhuvzdtmi2u","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:27:40.923Z","subject":"did:plc:r7cdh4sgzqbfdc6wcdxxti7c"},"cid":"bafyreiew2p6cgirfaj45qoenm4fgumib7xoloclrap3jgkz5es7g7kby3i"}} +{"did":"did:plc:msxqf3twq7abtdw7dbfskphk","time_us":1732206349001372,"kind":"commit","commit":{"rev":"3lbhueija5p22","operation":"create","collection":"app.bsky.feed.like","rkey":"3lbhueiizcx22","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:15:58.232Z","subject":{"cid":"bafyreiavpshyqzrlo5m7fqodjhs6jevweqnif4phasiwimv4a7mnsqi2fe","uri":"at://did:plc:fusulxqc52zbrc75fi6xrcof/app.bsky.feed.post/3lbhskq5zn22f"}},"cid":"bafyreidjix4dauj2afjlbzmhj3a7gwftcevvmmy6edww6vrjdbst26rkby"}} +{"did":"did:plc:l5o3qjrmfztir54cpwlv2eme","time_us":1732206349001905,"kind":"commit","commit":{"rev":"3lbhtytohxc2o","operation":"create","collection":"app.bsky.feed.post","rkey":"3lbhtytjqzk2q","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:09:27.254Z","langs":["en"],"reply":{"parent":{"cid":"bafyreih35fe2jj3gchmgk4amold4l6sfxd2sby5wrg3jrws5fkdypxrbg4","uri":"at://did:plc:6wx2gg5yqgvmlu35r6y3bk6d/app.bsky.feed.post/3lbhtj2eb4s2o"},"root":{"cid":"bafyreifipyt3vctd4ptuoicvio7rbr5xvjv4afwuggnd2prnmn55mu6luu","uri":"at://did:plc:474ldquxwzrlcvjhhbbk2wte/app.bsky.feed.post/3lbhdzrynik27"}},"text":"okay i take mine back because I hadn’t heard this one yet^^"},"cid":"bafyreigzdsdne3z2xxcakgisieyj7y47hj6eg7lj6v4q25ah5q2qotu5ku"}} +{"did":"did:plc:jkaaf5j2yb2pvpx3ualm3vbh","time_us":1732206349002758,"kind":"commit","commit":{"rev":"3lbhudfo3yi2w","operation":"create","collection":"app.bsky.graph.follow","rkey":"3lbhudfnw4y2w","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:15:21.495Z","subject":"did:plc:amsdn2tbjxo3xrwqneqhh4cm"},"cid":"bafyreiaa2vsdr4ckwjg4jq47zfd7mewidywfz3qh3dmglcd6ozi4xwdega"}} +{"did":"did:plc:tdwz2h4id5dxezvohftsmffu","time_us":1732206349003106,"kind":"commit","commit":{"rev":"3lbhujcp4ix2n","operation":"create","collection":"app.bsky.graph.follow","rkey":"3lbhujcoxmp2n","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:18:39.913Z","subject":"did:plc:gf3vum7insztt5rxrpxdz2id"},"cid":"bafyreihaatlpar3abtx6ck3kde2ksic6zzflk4ppduhf6dxurytqrv33ni"}} +{"did":"did:plc:cdsd346mwow7aj3tgfkwsct3","time_us":1732206349003461,"kind":"commit","commit":{"rev":"3lbhus5vior2t","operation":"create","collection":"app.bsky.feed.repost","rkey":"3lbhus5vbtz2t","record":{"$type":"app.bsky.feed.repost","createdAt":"2024-11-21T16:23:36.714Z","subject":{"cid":"bafyreieaacfiobnuqvjhhsndyi5s3fd6krbzdduxsyrzfv43kczpcmkl6y","uri":"at://did:plc:o5q6dynpme4ndolc3heztasm/app.bsky.feed.post/3lbfli3qsoc2o"}},"cid":"bafyreid5ycocp5zq2g7fcx2xxzxrbafuh7b5qhtwuwiomzo6vqila2cbpu"}} +{"did":"did:plc:s4bwqchfzm6gjqfeb6mexgbu","time_us":1732206349003907,"kind":"commit","commit":{"rev":"3lbhuvzeccx2w","operation":"create","collection":"app.bsky.feed.post","rkey":"3lbhuvxf4qs2m","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:25:44.376Z","langs":["en"],"reply":{"parent":{"cid":"bafyreiaev27cfcxxvn2pdhrwwquzwgclujnulzbcfnn4p4fwgb6migjhw4","uri":"at://did:plc:zec6cslvgc3hhdatrhk6pq5p/app.bsky.feed.post/3lbhujvds4c2b"},"root":{"cid":"bafyreif7qjxhvecwnhlynijj6pf47jwvtkahsz3zh2kaipwu2bw2dxwaqq","uri":"at://did:plc:s4bwqchfzm6gjqfeb6mexgbu/app.bsky.feed.post/3lbhug53kkk2m"}},"text":"ㅤ\nㅤㅤ⌜ Blinking. She hadn't realized she spoke out loud. ⌟\n\nㅤ‘ It was nothing like that — . I was only thinking . . . ’\n\nㅤㅤ⌜ Trailing off, her mind occupied. ⌟\nㅤ"},"cid":"bafyreibugobcike72y4zxvdyz2oopyt6ywwqfielcwojkb27p7s6rlomgm"}} +{"did":"did:plc:hbc74dlsxhq53kp5oxges6d7","time_us":1732206349004769,"kind":"commit","commit":{"rev":"3lbhuvzedg52j","operation":"create","collection":"app.bsky.feed.like","rkey":"3lbhuvzdyof2j","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:25:46.167Z","subject":{"cid":"bafyreiaumopip75nzx2xjbugtwemdppsyx54bd2odf6q45f3o7xkocgari","uri":"at://did:plc:ig2jv6gqup4t7gdq2pmanknw/app.bsky.feed.post/3lbhuvtlaec2c"}},"cid":"bafyreidjk2svg2fdjiiwohmfmvp3hdxhpb33ycnixzbkyib5m6cocindxq"}} diff --git a/tests-integration/src/grpc.rs b/tests-integration/src/grpc.rs index 8d4dc9c3ce..0bef63cfa5 100644 --- a/tests-integration/src/grpc.rs +++ b/tests-integration/src/grpc.rs @@ -76,11 +76,10 @@ mod test { use super::*; use crate::standalone::GreptimeDbStandaloneBuilder; + use crate::test_util::execute_sql; use crate::tests; use crate::tests::MockDistributedInstance; - use crate::tests::test_util::{ - MockInstance, both_instances_cases, distributed, execute_sql, standalone, - }; + use crate::tests::test_util::{MockInstance, both_instances_cases, distributed, standalone}; #[tokio::test(flavor = "multi_thread")] async fn test_distributed_handle_ddl_request() { diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 5a0619c1cc..7075dae36e 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -24,6 +24,7 @@ use common_base::Plugins; use common_config::Configurable; use common_meta::key::catalog_name::CatalogNameKey; use common_meta::key::schema_name::SchemaNameKey; +use common_query::Output; use common_runtime::runtime::BuilderBuild; use common_runtime::{Builder as RuntimeBuilder, Runtime}; use common_test_util::ports; @@ -747,3 +748,10 @@ pub(crate) async fn prepare_another_catalog_and_schema(instance: &Instance) { .await .unwrap(); } + +pub async fn execute_sql(instance: &Arc, sql: &str) -> Output { + SqlQueryHandler::do_query(instance.as_ref(), sql, QueryContext::arc()) + .await + .remove(0) + .unwrap() +} diff --git a/tests-integration/src/tests.rs b/tests-integration/src/tests.rs index 74d713d3ee..ddb9b1c386 100644 --- a/tests-integration/src/tests.rs +++ b/tests-integration/src/tests.rs @@ -18,7 +18,7 @@ mod instance_noop_wal_test; mod instance_test; mod promql_test; mod reconcile_table; -pub(crate) mod test_util; +pub mod test_util; use std::collections::HashMap; use std::sync::Arc; diff --git a/tests-integration/src/tests/gc.rs b/tests-integration/src/tests/gc.rs index c2b402eb1a..f627291c4a 100644 --- a/tests-integration/src/tests/gc.rs +++ b/tests-integration/src/tests/gc.rs @@ -27,8 +27,8 @@ use store_api::storage::RegionId; use table::metadata::TableId; use crate::cluster::GreptimeDbClusterBuilder; -use crate::test_util::{StorageType, TempDirGuard, get_test_store_config}; -use crate::tests::test_util::{MockInstanceBuilder, TestContext, execute_sql, wait_procedure}; +use crate::test_util::{StorageType, TempDirGuard, execute_sql, get_test_store_config}; +use crate::tests::test_util::{MockInstanceBuilder, TestContext, wait_procedure}; /// Helper function to get table route information for GC procedure async fn get_table_route( diff --git a/tests-integration/src/tests/instance_noop_wal_test.rs b/tests-integration/src/tests/instance_noop_wal_test.rs index dd9c46cb1d..73eeec2672 100644 --- a/tests-integration/src/tests/instance_noop_wal_test.rs +++ b/tests-integration/src/tests/instance_noop_wal_test.rs @@ -18,9 +18,8 @@ use common_test_util::recordbatch::check_output_stream; use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig}; use crate::cluster::GreptimeDbClusterBuilder; -use crate::tests::test_util::{ - MockInstanceBuilder, RebuildableMockInstance, TestContext, execute_sql, -}; +use crate::test_util::execute_sql; +use crate::tests::test_util::{MockInstanceBuilder, RebuildableMockInstance, TestContext}; pub(crate) async fn distributed_with_noop_wal() -> TestContext { common_telemetry::init_default_ut_logging(); diff --git a/tests-integration/src/tests/reconcile_table.rs b/tests-integration/src/tests/reconcile_table.rs index d1204cba20..acbfcf6079 100644 --- a/tests-integration/src/tests/reconcile_table.rs +++ b/tests-integration/src/tests/reconcile_table.rs @@ -23,9 +23,10 @@ use common_test_util::recordbatch::check_output_stream; use table::table_reference::TableReference; use crate::cluster::GreptimeDbClusterBuilder; +use crate::test_util::execute_sql; use crate::tests::test_util::{ - MockInstanceBuilder, RebuildableMockInstance, TestContext, dump_kvbackend, execute_sql, - restore_kvbackend, try_execute_sql, wait_procedure, + MockInstanceBuilder, RebuildableMockInstance, TestContext, dump_kvbackend, restore_kvbackend, + try_execute_sql, wait_procedure, }; const CREATE_MONITOR_TABLE_SQL: &str = r#" diff --git a/tests-integration/src/tests/test_util.rs b/tests-integration/src/tests/test_util.rs index eccca85305..7ceec77c55 100644 --- a/tests-integration/src/tests/test_util.rs +++ b/tests-integration/src/tests/test_util.rs @@ -439,10 +439,6 @@ pub fn find_testing_resource(path: &str) -> String { prepare_path(&p) } -pub async fn execute_sql(instance: &Arc, sql: &str) -> Output { - execute_sql_with(instance, sql, QueryContext::arc()).await -} - pub async fn try_execute_sql(instance: &Arc, sql: &str) -> Result { try_execute_sql_with(instance, sql, QueryContext::arc()).await } @@ -455,16 +451,6 @@ pub async fn try_execute_sql_with( instance.do_query(sql, query_ctx).await.remove(0) } -pub async fn execute_sql_with( - instance: &Arc, - sql: &str, - query_ctx: QueryContextRef, -) -> Output { - try_execute_sql_with(instance, sql, query_ctx) - .await - .unwrap_or_else(|e| panic!("Failed to execute sql: {sql}, error: {e:?}")) -} - /// Dump the kv backend to a vector of key-value pairs. pub async fn dump_kvbackend(kv_backend: &KvBackendRef) -> Vec<(Vec, Vec)> { let req = RangeRequest::new().with_range(vec![0], vec![0]); diff --git a/tests-integration/tests/jsonbench.rs b/tests-integration/tests/jsonbench.rs new file mode 100644 index 0000000000..8405846eb2 --- /dev/null +++ b/tests-integration/tests/jsonbench.rs @@ -0,0 +1,94 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::io::BufRead; +use std::sync::Arc; +use std::{fs, io}; + +use common_test_util::find_workspace_path; +use frontend::instance::Instance; +use tests_integration::standalone::GreptimeDbStandaloneBuilder; +use tests_integration::test_util::execute_sql; + +#[tokio::test] +async fn test_load_jsonbench_data() { + common_telemetry::init_default_ut_logging(); + + let instance = GreptimeDbStandaloneBuilder::new("test_load_jsonbench_data") + .build() + .await; + let frontend = instance.fe_instance(); + + create_table(frontend).await; + + desc_table(frontend).await; + + insert_data(frontend).await.unwrap(); +} + +async fn insert_data(frontend: &Arc) -> io::Result<()> { + let file = fs::File::open(find_workspace_path( + "tests-integration/resources/jsonbench-head-10.ndjson", + ))?; + let reader = io::BufReader::new(file); + for (i, line) in reader.lines().enumerate() { + let line = line?; + if line.is_empty() { + continue; + } + let sql = format!( + "INSERT INTO bluesky (ts, data) VALUES ({}, '{line}')", + i + 1, + ); + let output = execute_sql(frontend, &sql).await; + let output = output.data.pretty_print().await; + assert_eq!(output, "Affected Rows: 1"); + } + Ok(()) +} + +async fn desc_table(frontend: &Arc) { + let sql = "DESC TABLE bluesky"; + let expected = r#" ++--------+----------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+ +| Column | Type | Key | Null | Default | Semantic Type | ++--------+----------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+ +| data | Json | | YES | | FIELD | +| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP | ++--------+----------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+"#; + let output = execute_sql(frontend, sql).await; + let output = output.data.pretty_print().await; + assert_eq!(output, expected.trim()); +} + +async fn create_table(frontend: &Arc) { + let sql = r#" +CREATE TABLE bluesky ( + "data" JSON ( + format = "partial", + fields = Struct< + kind String, + "commit.operation" String, + "commit.collection" String, + did String, + time_us Bigint + >, + ), + ts Timestamp TIME INDEX, +) +"#; + let output = execute_sql(frontend, sql).await; + let output = output.data.pretty_print().await; + assert_eq!(output, "Affected Rows: 0"); +} diff --git a/tests-integration/tests/main.rs b/tests-integration/tests/main.rs index b30820517f..1069414917 100644 --- a/tests-integration/tests/main.rs +++ b/tests-integration/tests/main.rs @@ -16,6 +16,7 @@ mod grpc; #[macro_use] mod http; +mod jsonbench; #[macro_use] mod sql; #[macro_use]