Compare commits

...

3 Commits

Author SHA1 Message Date
luofucong
497dfde90b ingest jsonbench data
parse partial struct json datatype in create sql
2025-12-22 19:32:03 +08:00
Lei, HUANG
a8b512dded chore: expose symbols (#7451)
* chore/expose-symbols:
 ### Commit Message

 Enhance `merge_and_dedup` Functionality in `flush.rs`

 - **Function Signature Update**: Modified the `merge_and_dedup` function to accept `append_mode` and `merge_mode` as separate parameters instead of using `options`.
 - **Function Accessibility**: Changed the visibility of `merge_and_dedup` to `pub` to allow external access.
 - **Function Calls Update**: Updated calls to `merge_and_dedup` within `memtable_flat_sources` to align with the new function signature, passing `options.append_mode` and `options.merge_mode()` directly.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* chore/expose-symbols:
 ### Add Merge and Deduplication Functionality

 - **File**: `src/mito2/src/flush.rs`
   - Introduced `merge_and_dedup` function to merge multiple record batch iterators and apply deduplication based on specified modes.
   - Added detailed documentation for the function, explaining its arguments, behavior, and usage examples.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-12-22 05:39:03 +00:00
Ning Sun
bd8ffd3db9 feat: pgwire 0.37 (#7443) 2025-12-22 05:13:39 +00:00
31 changed files with 615 additions and 455 deletions

41
Cargo.lock generated
View File

@@ -2580,10 +2580,12 @@ dependencies = [
name = "common-sql"
version = "1.0.0-beta.3"
dependencies = [
"arrow-schema",
"common-base",
"common-decimal",
"common-error",
"common-macro",
"common-telemetry",
"common-time",
"datafusion-sql",
"datatypes",
@@ -9320,9 +9322,9 @@ dependencies = [
[[package]]
name = "pgwire"
version = "0.36.3"
version = "0.37.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70a2bcdcc4b20a88e0648778ecf00415bbd5b447742275439c22176835056f99"
checksum = "02d86d57e732d40382ceb9bfea80901d839bae8571aa11c06af9177aed9dfb6c"
dependencies = [
"async-trait",
"base64 0.22.1",
@@ -9341,6 +9343,7 @@ dependencies = [
"ryu",
"serde",
"serde_json",
"smol_str",
"stringprep",
"thiserror 2.0.17",
"tokio",
@@ -11505,10 +11508,11 @@ checksum = "1bc711410fbe7399f390ca1c3b60ad0f53f80e95c5eb935e52268a0e2cd49acc"
[[package]]
name = "serde"
version = "1.0.219"
version = "1.0.228"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6"
checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e"
dependencies = [
"serde_core",
"serde_derive",
]
@@ -11523,10 +11527,19 @@ dependencies = [
]
[[package]]
name = "serde_derive"
version = "1.0.219"
name = "serde_core"
version = "1.0.228"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00"
checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.228"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79"
dependencies = [
"proc-macro2",
"quote",
@@ -12001,6 +12014,16 @@ dependencies = [
"serde",
]
[[package]]
name = "smol_str"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3498b0a27f93ef1402f20eefacfaa1691272ac4eca1cdc8c596cb0a245d6cbf5"
dependencies = [
"borsh",
"serde_core",
]
[[package]]
name = "snafu"
version = "0.7.5"
@@ -12206,7 +12229,7 @@ dependencies = [
[[package]]
name = "sqlparser"
version = "0.58.0"
source = "git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=4b519a5caa95472cc3988f5556813a583dd35af1#4b519a5caa95472cc3988f5556813a583dd35af1"
source = "git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=a0ce2bc6eb3e804532932f39833c32432f5c9a39#a0ce2bc6eb3e804532932f39833c32432f5c9a39"
dependencies = [
"lazy_static",
"log",
@@ -12230,7 +12253,7 @@ dependencies = [
[[package]]
name = "sqlparser_derive"
version = "0.3.0"
source = "git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=4b519a5caa95472cc3988f5556813a583dd35af1#4b519a5caa95472cc3988f5556813a583dd35af1"
source = "git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=a0ce2bc6eb3e804532932f39833c32432f5c9a39#a0ce2bc6eb3e804532932f39833c32432f5c9a39"
dependencies = [
"proc-macro2",
"quote",

View File

@@ -332,7 +332,7 @@ datafusion-physical-plan = { git = "https://github.com/GreptimeTeam/datafusion.g
datafusion-datasource = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-sql = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-substrait = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "4b519a5caa95472cc3988f5556813a583dd35af1" } # branch = "v0.58.x"
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "a0ce2bc6eb3e804532932f39833c32432f5c9a39" } # branch = "v0.58.x"
[profile.release]
debug = 1

View File

@@ -5,10 +5,12 @@ edition.workspace = true
license.workspace = true
[dependencies]
arrow-schema.workspace = true
common-base.workspace = true
common-decimal.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
datafusion-sql.workspace = true
datatypes.workspace = true

View File

@@ -14,11 +14,12 @@
use std::str::FromStr;
use arrow_schema::extension::ExtensionType;
use common_time::Timestamp;
use common_time::timezone::Timezone;
use datatypes::json::JsonStructureSettings;
use datatypes::extension::json::JsonExtensionType;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnDefaultConstraint;
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema};
use datatypes::types::{JsonFormat, parse_string_to_jsonb, parse_string_to_vector_type_value};
use datatypes::value::{OrderedF32, OrderedF64, Value};
use snafu::{OptionExt, ResultExt, ensure};
@@ -124,13 +125,14 @@ pub(crate) fn sql_number_to_value(data_type: &ConcreteDataType, n: &str) -> Resu
/// If `auto_string_to_numeric` is true, tries to cast the string value to numeric values,
/// and returns error if the cast fails.
pub fn sql_value_to_value(
column_name: &str,
data_type: &ConcreteDataType,
column_schema: &ColumnSchema,
sql_val: &SqlValue,
timezone: Option<&Timezone>,
unary_op: Option<UnaryOperator>,
auto_string_to_numeric: bool,
) -> Result<Value> {
let column_name = &column_schema.name;
let data_type = &column_schema.data_type;
let mut value = match sql_val {
SqlValue::Number(n, _) => sql_number_to_value(data_type, n)?,
SqlValue::Null => Value::Null,
@@ -146,13 +148,9 @@ pub fn sql_value_to_value(
(*b).into()
}
SqlValue::DoubleQuotedString(s) | SqlValue::SingleQuotedString(s) => parse_string_to_value(
column_name,
s.clone(),
data_type,
timezone,
auto_string_to_numeric,
)?,
SqlValue::DoubleQuotedString(s) | SqlValue::SingleQuotedString(s) => {
parse_string_to_value(column_schema, s.clone(), timezone, auto_string_to_numeric)?
}
SqlValue::HexStringLiteral(s) => {
// Should not directly write binary into json column
ensure!(
@@ -244,12 +242,12 @@ pub fn sql_value_to_value(
}
pub(crate) fn parse_string_to_value(
column_name: &str,
column_schema: &ColumnSchema,
s: String,
data_type: &ConcreteDataType,
timezone: Option<&Timezone>,
auto_string_to_numeric: bool,
) -> Result<Value> {
let data_type = &column_schema.data_type;
if auto_string_to_numeric && let Some(value) = auto_cast_to_numeric(&s, data_type)? {
return Ok(value);
}
@@ -257,7 +255,7 @@ pub(crate) fn parse_string_to_value(
ensure!(
data_type.is_stringifiable(),
ColumnTypeMismatchSnafu {
column_name,
column_name: column_schema.name.clone(),
expect: data_type.clone(),
actual: ConcreteDataType::string_datatype(),
}
@@ -303,23 +301,21 @@ pub(crate) fn parse_string_to_value(
}
}
ConcreteDataType::Binary(_) => Ok(Value::Binary(s.as_bytes().into())),
ConcreteDataType::Json(j) => {
match &j.format {
JsonFormat::Jsonb => {
let v = parse_string_to_jsonb(&s).context(DatatypeSnafu)?;
Ok(Value::Binary(v.into()))
}
JsonFormat::Native(_inner) => {
// Always use the structured version at this level.
let serde_json_value =
serde_json::from_str(&s).context(DeserializeSnafu { json: s })?;
let json_structure_settings = JsonStructureSettings::Structured(None);
json_structure_settings
.encode(serde_json_value)
.context(DatatypeSnafu)
}
ConcreteDataType::Json(j) => match &j.format {
JsonFormat::Jsonb => {
let v = parse_string_to_jsonb(&s).context(DatatypeSnafu)?;
Ok(Value::Binary(v.into()))
}
}
JsonFormat::Native(_) => {
let extension_type: Option<JsonExtensionType> =
column_schema.extension_type().context(DatatypeSnafu)?;
let json_structure_settings = extension_type
.and_then(|x| x.metadata().json_structure_settings.clone())
.unwrap_or_default();
let v = serde_json::from_str(&s).context(DeserializeSnafu { json: s })?;
json_structure_settings.encode(v).context(DatatypeSnafu)
}
},
ConcreteDataType::Vector(d) => {
let v = parse_string_to_vector_type_value(&s, Some(d.dim)).context(DatatypeSnafu)?;
Ok(Value::Binary(v.into()))
@@ -417,305 +413,265 @@ mod test {
use super::*;
macro_rules! call_parse_string_to_value {
($column_name: expr, $input: expr, $data_type: expr) => {
call_parse_string_to_value!($column_name, $input, $data_type, None)
};
($column_name: expr, $input: expr, $data_type: expr, timezone = $timezone: expr) => {
call_parse_string_to_value!($column_name, $input, $data_type, Some($timezone))
};
($column_name: expr, $input: expr, $data_type: expr, $timezone: expr) => {{
let column_schema = ColumnSchema::new($column_name, $data_type, true);
parse_string_to_value(&column_schema, $input, $timezone, true)
}};
}
#[test]
fn test_string_to_value_auto_numeric() {
fn test_string_to_value_auto_numeric() -> Result<()> {
// Test string to boolean with auto cast
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"true".to_string(),
&ConcreteDataType::boolean_datatype(),
None,
true,
)
.unwrap();
ConcreteDataType::boolean_datatype()
)?;
assert_eq!(Value::Boolean(true), result);
// Test invalid string to boolean with auto cast
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"not_a_boolean".to_string(),
&ConcreteDataType::boolean_datatype(),
None,
true,
ConcreteDataType::boolean_datatype()
);
assert!(result.is_err());
// Test string to int8
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"42".to_string(),
&ConcreteDataType::int8_datatype(),
None,
true,
)
.unwrap();
ConcreteDataType::int8_datatype()
)?;
assert_eq!(Value::Int8(42), result);
// Test invalid string to int8 with auto cast
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"not_an_int8".to_string(),
&ConcreteDataType::int8_datatype(),
None,
true,
ConcreteDataType::int8_datatype()
);
assert!(result.is_err());
// Test string to int16
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"1000".to_string(),
&ConcreteDataType::int16_datatype(),
None,
true,
)
.unwrap();
ConcreteDataType::int16_datatype()
)?;
assert_eq!(Value::Int16(1000), result);
// Test invalid string to int16 with auto cast
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"not_an_int16".to_string(),
&ConcreteDataType::int16_datatype(),
None,
true,
ConcreteDataType::int16_datatype()
);
assert!(result.is_err());
// Test string to int32
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"100000".to_string(),
&ConcreteDataType::int32_datatype(),
None,
true,
)
.unwrap();
ConcreteDataType::int32_datatype()
)?;
assert_eq!(Value::Int32(100000), result);
// Test invalid string to int32 with auto cast
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"not_an_int32".to_string(),
&ConcreteDataType::int32_datatype(),
None,
true,
ConcreteDataType::int32_datatype()
);
assert!(result.is_err());
// Test string to int64
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"1000000".to_string(),
&ConcreteDataType::int64_datatype(),
None,
true,
)
.unwrap();
ConcreteDataType::int64_datatype()
)?;
assert_eq!(Value::Int64(1000000), result);
// Test invalid string to int64 with auto cast
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"not_an_int64".to_string(),
&ConcreteDataType::int64_datatype(),
None,
true,
ConcreteDataType::int64_datatype()
);
assert!(result.is_err());
// Test string to uint8
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"200".to_string(),
&ConcreteDataType::uint8_datatype(),
None,
true,
)
.unwrap();
ConcreteDataType::uint8_datatype()
)?;
assert_eq!(Value::UInt8(200), result);
// Test invalid string to uint8 with auto cast
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"not_a_uint8".to_string(),
&ConcreteDataType::uint8_datatype(),
None,
true,
ConcreteDataType::uint8_datatype()
);
assert!(result.is_err());
// Test string to uint16
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"60000".to_string(),
&ConcreteDataType::uint16_datatype(),
None,
true,
)
.unwrap();
ConcreteDataType::uint16_datatype()
)?;
assert_eq!(Value::UInt16(60000), result);
// Test invalid string to uint16 with auto cast
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"not_a_uint16".to_string(),
&ConcreteDataType::uint16_datatype(),
None,
true,
ConcreteDataType::uint16_datatype()
);
assert!(result.is_err());
// Test string to uint32
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"4000000000".to_string(),
&ConcreteDataType::uint32_datatype(),
None,
true,
)
.unwrap();
ConcreteDataType::uint32_datatype()
)?;
assert_eq!(Value::UInt32(4000000000), result);
// Test invalid string to uint32 with auto cast
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"not_a_uint32".to_string(),
&ConcreteDataType::uint32_datatype(),
None,
true,
ConcreteDataType::uint32_datatype()
);
assert!(result.is_err());
// Test string to uint64
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"18446744073709551615".to_string(),
&ConcreteDataType::uint64_datatype(),
None,
true,
)
.unwrap();
ConcreteDataType::uint64_datatype()
)?;
assert_eq!(Value::UInt64(18446744073709551615), result);
// Test invalid string to uint64 with auto cast
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"not_a_uint64".to_string(),
&ConcreteDataType::uint64_datatype(),
None,
true,
ConcreteDataType::uint64_datatype()
);
assert!(result.is_err());
// Test string to float32
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"3.5".to_string(),
&ConcreteDataType::float32_datatype(),
None,
true,
)
.unwrap();
ConcreteDataType::float32_datatype()
)?;
assert_eq!(Value::Float32(OrderedF32::from(3.5)), result);
// Test invalid string to float32 with auto cast
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"not_a_float32".to_string(),
&ConcreteDataType::float32_datatype(),
None,
true,
ConcreteDataType::float32_datatype()
);
assert!(result.is_err());
// Test string to float64
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"3.5".to_string(),
&ConcreteDataType::float64_datatype(),
None,
true,
)
.unwrap();
ConcreteDataType::float64_datatype()
)?;
assert_eq!(Value::Float64(OrderedF64::from(3.5)), result);
// Test invalid string to float64 with auto cast
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"not_a_float64".to_string(),
&ConcreteDataType::float64_datatype(),
None,
true,
ConcreteDataType::float64_datatype()
);
assert!(result.is_err());
Ok(())
}
#[test]
fn test_sql_value_to_value() {
let sql_val = SqlValue::Null;
assert_eq!(
Value::Null,
sql_value_to_value(
"a",
&ConcreteDataType::float64_datatype(),
&sql_val,
None,
macro_rules! call_sql_value_to_value {
($column_name: expr, $data_type: expr, $sql_value: expr) => {
call_sql_value_to_value!($column_name, $data_type, $sql_value, None, None, false)
};
($column_name: expr, $data_type: expr, $sql_value: expr, timezone = $timezone: expr) => {
call_sql_value_to_value!(
$column_name,
$data_type,
$sql_value,
Some($timezone),
None,
false
)
.unwrap()
};
($column_name: expr, $data_type: expr, $sql_value: expr, unary_op = $unary_op: expr) => {
call_sql_value_to_value!(
$column_name,
$data_type,
$sql_value,
None,
Some($unary_op),
false
)
};
($column_name: expr, $data_type: expr, $sql_value: expr, auto_string_to_numeric) => {
call_sql_value_to_value!($column_name, $data_type, $sql_value, None, None, true)
};
($column_name: expr, $data_type: expr, $sql_value: expr, $timezone: expr, $unary_op: expr, $auto_string_to_numeric: expr) => {{
let column_schema = ColumnSchema::new($column_name, $data_type, true);
sql_value_to_value(
&column_schema,
$sql_value,
$timezone,
$unary_op,
$auto_string_to_numeric,
)
}};
}
#[test]
fn test_sql_value_to_value() -> Result<()> {
let sql_val = SqlValue::Null;
assert_eq!(
Value::Null,
call_sql_value_to_value!("a", ConcreteDataType::float64_datatype(), &sql_val)?
);
let sql_val = SqlValue::Boolean(true);
assert_eq!(
Value::Boolean(true),
sql_value_to_value(
"a",
&ConcreteDataType::boolean_datatype(),
&sql_val,
None,
None,
false
)
.unwrap()
call_sql_value_to_value!("a", ConcreteDataType::boolean_datatype(), &sql_val)?
);
let sql_val = SqlValue::Number("3.0".to_string(), false);
assert_eq!(
Value::Float64(OrderedFloat(3.0)),
sql_value_to_value(
"a",
&ConcreteDataType::float64_datatype(),
&sql_val,
None,
None,
false
)
.unwrap()
call_sql_value_to_value!("a", ConcreteDataType::float64_datatype(), &sql_val)?
);
let sql_val = SqlValue::Number("3.0".to_string(), false);
let v = sql_value_to_value(
"a",
&ConcreteDataType::boolean_datatype(),
&sql_val,
None,
None,
false,
);
let v = call_sql_value_to_value!("a", ConcreteDataType::boolean_datatype(), &sql_val);
assert!(v.is_err());
assert!(format!("{v:?}").contains("Failed to parse number '3.0' to boolean column type"));
let sql_val = SqlValue::Boolean(true);
let v = sql_value_to_value(
"a",
&ConcreteDataType::float64_datatype(),
&sql_val,
None,
None,
false,
);
let v = call_sql_value_to_value!("a", ConcreteDataType::float64_datatype(), &sql_val);
assert!(v.is_err());
assert!(
format!("{v:?}").contains(
@@ -725,41 +681,18 @@ mod test {
);
let sql_val = SqlValue::HexStringLiteral("48656c6c6f20776f726c6421".to_string());
let v = sql_value_to_value(
"a",
&ConcreteDataType::binary_datatype(),
&sql_val,
None,
None,
false,
)
.unwrap();
let v = call_sql_value_to_value!("a", ConcreteDataType::binary_datatype(), &sql_val)?;
assert_eq!(Value::Binary(Bytes::from(b"Hello world!".as_slice())), v);
let sql_val = SqlValue::DoubleQuotedString("MorningMyFriends".to_string());
let v = sql_value_to_value(
"a",
&ConcreteDataType::binary_datatype(),
&sql_val,
None,
None,
false,
)
.unwrap();
let v = call_sql_value_to_value!("a", ConcreteDataType::binary_datatype(), &sql_val)?;
assert_eq!(
Value::Binary(Bytes::from(b"MorningMyFriends".as_slice())),
v
);
let sql_val = SqlValue::HexStringLiteral("9AF".to_string());
let v = sql_value_to_value(
"a",
&ConcreteDataType::binary_datatype(),
&sql_val,
None,
None,
false,
);
let v = call_sql_value_to_value!("a", ConcreteDataType::binary_datatype(), &sql_val);
assert!(v.is_err());
assert!(
format!("{v:?}").contains("odd number of digits"),
@@ -767,38 +700,16 @@ mod test {
);
let sql_val = SqlValue::HexStringLiteral("AG".to_string());
let v = sql_value_to_value(
"a",
&ConcreteDataType::binary_datatype(),
&sql_val,
None,
None,
false,
);
let v = call_sql_value_to_value!("a", ConcreteDataType::binary_datatype(), &sql_val);
assert!(v.is_err());
assert!(format!("{v:?}").contains("invalid character"), "v is {v:?}",);
let sql_val = SqlValue::DoubleQuotedString("MorningMyFriends".to_string());
let v = sql_value_to_value(
"a",
&ConcreteDataType::json_datatype(),
&sql_val,
None,
None,
false,
);
let v = call_sql_value_to_value!("a", ConcreteDataType::json_datatype(), &sql_val);
assert!(v.is_err());
let sql_val = SqlValue::DoubleQuotedString(r#"{"a":"b"}"#.to_string());
let v = sql_value_to_value(
"a",
&ConcreteDataType::json_datatype(),
&sql_val,
None,
None,
false,
)
.unwrap();
let v = call_sql_value_to_value!("a", ConcreteDataType::json_datatype(), &sql_val)?;
assert_eq!(
Value::Binary(Bytes::from(
jsonb::parse_value(r#"{"a":"b"}"#.as_bytes())
@@ -808,16 +719,15 @@ mod test {
)),
v
);
Ok(())
}
#[test]
fn test_parse_json_to_jsonb() {
match parse_string_to_value(
match call_parse_string_to_value!(
"json_col",
r#"{"a": "b"}"#.to_string(),
&ConcreteDataType::json_datatype(),
None,
false,
ConcreteDataType::json_datatype()
) {
Ok(Value::Binary(b)) => {
assert_eq!(
@@ -833,12 +743,10 @@ mod test {
}
assert!(
parse_string_to_value(
call_parse_string_to_value!(
"json_col",
r#"Nicola Kovac is the best rifler in the world"#.to_string(),
&ConcreteDataType::json_datatype(),
None,
false,
ConcreteDataType::json_datatype()
)
.is_err()
)
@@ -878,13 +786,10 @@ mod test {
#[test]
fn test_parse_date_literal() {
let value = sql_value_to_value(
let value = call_sql_value_to_value!(
"date",
&ConcreteDataType::date_datatype(),
&SqlValue::DoubleQuotedString("2022-02-22".to_string()),
None,
None,
false,
ConcreteDataType::date_datatype(),
&SqlValue::DoubleQuotedString("2022-02-22".to_string())
)
.unwrap();
assert_eq!(ConcreteDataType::date_datatype(), value.data_type());
@@ -895,13 +800,11 @@ mod test {
}
// with timezone
let value = sql_value_to_value(
let value = call_sql_value_to_value!(
"date",
&ConcreteDataType::date_datatype(),
ConcreteDataType::date_datatype(),
&SqlValue::DoubleQuotedString("2022-02-22".to_string()),
Some(&Timezone::from_tz_string("+07:00").unwrap()),
None,
false,
timezone = &Timezone::from_tz_string("+07:00").unwrap()
)
.unwrap();
assert_eq!(ConcreteDataType::date_datatype(), value.data_type());
@@ -913,16 +816,12 @@ mod test {
}
#[test]
fn test_parse_timestamp_literal() {
match parse_string_to_value(
fn test_parse_timestamp_literal() -> Result<()> {
match call_parse_string_to_value!(
"timestamp_col",
"2022-02-22T00:01:01+08:00".to_string(),
&ConcreteDataType::timestamp_millisecond_datatype(),
None,
false,
)
.unwrap()
{
ConcreteDataType::timestamp_millisecond_datatype()
)? {
Value::Timestamp(ts) => {
assert_eq!(1645459261000, ts.value());
assert_eq!(TimeUnit::Millisecond, ts.unit());
@@ -932,15 +831,11 @@ mod test {
}
}
match parse_string_to_value(
match call_parse_string_to_value!(
"timestamp_col",
"2022-02-22T00:01:01+08:00".to_string(),
&ConcreteDataType::timestamp_datatype(TimeUnit::Second),
None,
false,
)
.unwrap()
{
ConcreteDataType::timestamp_datatype(TimeUnit::Second)
)? {
Value::Timestamp(ts) => {
assert_eq!(1645459261, ts.value());
assert_eq!(TimeUnit::Second, ts.unit());
@@ -950,15 +845,11 @@ mod test {
}
}
match parse_string_to_value(
match call_parse_string_to_value!(
"timestamp_col",
"2022-02-22T00:01:01+08:00".to_string(),
&ConcreteDataType::timestamp_datatype(TimeUnit::Microsecond),
None,
false,
)
.unwrap()
{
ConcreteDataType::timestamp_datatype(TimeUnit::Microsecond)
)? {
Value::Timestamp(ts) => {
assert_eq!(1645459261000000, ts.value());
assert_eq!(TimeUnit::Microsecond, ts.unit());
@@ -968,15 +859,11 @@ mod test {
}
}
match parse_string_to_value(
match call_parse_string_to_value!(
"timestamp_col",
"2022-02-22T00:01:01+08:00".to_string(),
&ConcreteDataType::timestamp_datatype(TimeUnit::Nanosecond),
None,
false,
)
.unwrap()
{
ConcreteDataType::timestamp_datatype(TimeUnit::Nanosecond)
)? {
Value::Timestamp(ts) => {
assert_eq!(1645459261000000000, ts.value());
assert_eq!(TimeUnit::Nanosecond, ts.unit());
@@ -987,26 +874,21 @@ mod test {
}
assert!(
parse_string_to_value(
call_parse_string_to_value!(
"timestamp_col",
"2022-02-22T00:01:01+08".to_string(),
&ConcreteDataType::timestamp_datatype(TimeUnit::Nanosecond),
None,
false,
ConcreteDataType::timestamp_datatype(TimeUnit::Nanosecond)
)
.is_err()
);
// with timezone
match parse_string_to_value(
match call_parse_string_to_value!(
"timestamp_col",
"2022-02-22T00:01:01".to_string(),
&ConcreteDataType::timestamp_datatype(TimeUnit::Nanosecond),
Some(&Timezone::from_tz_string("Asia/Shanghai").unwrap()),
false,
)
.unwrap()
{
ConcreteDataType::timestamp_datatype(TimeUnit::Nanosecond),
timezone = &Timezone::from_tz_string("Asia/Shanghai").unwrap()
)? {
Value::Timestamp(ts) => {
assert_eq!(1645459261000000000, ts.value());
assert_eq!("2022-02-21 16:01:01+0000", ts.to_iso8601_string());
@@ -1016,51 +898,42 @@ mod test {
unreachable!()
}
}
Ok(())
}
#[test]
fn test_parse_placeholder_value() {
assert!(
sql_value_to_value(
call_sql_value_to_value!(
"test",
&ConcreteDataType::string_datatype(),
ConcreteDataType::string_datatype(),
&SqlValue::Placeholder("default".into())
)
.is_err()
);
assert!(
call_sql_value_to_value!(
"test",
ConcreteDataType::string_datatype(),
&SqlValue::Placeholder("default".into()),
None,
None,
false
unary_op = UnaryOperator::Minus
)
.is_err()
);
assert!(
sql_value_to_value(
call_sql_value_to_value!(
"test",
&ConcreteDataType::string_datatype(),
&SqlValue::Placeholder("default".into()),
None,
Some(UnaryOperator::Minus),
false
)
.is_err()
);
assert!(
sql_value_to_value(
"test",
&ConcreteDataType::uint16_datatype(),
ConcreteDataType::uint16_datatype(),
&SqlValue::Number("3".into(), false),
None,
Some(UnaryOperator::Minus),
false
unary_op = UnaryOperator::Minus
)
.is_err()
);
assert!(
sql_value_to_value(
call_sql_value_to_value!(
"test",
&ConcreteDataType::uint16_datatype(),
&SqlValue::Number("3".into(), false),
None,
None,
false
ConcreteDataType::uint16_datatype(),
&SqlValue::Number("3".into(), false)
)
.is_ok()
);
@@ -1070,77 +943,60 @@ mod test {
fn test_auto_string_to_numeric() {
// Test with auto_string_to_numeric=true
let sql_val = SqlValue::SingleQuotedString("123".to_string());
let v = sql_value_to_value(
let v = call_sql_value_to_value!(
"a",
&ConcreteDataType::int32_datatype(),
ConcreteDataType::int32_datatype(),
&sql_val,
None,
None,
true,
auto_string_to_numeric
)
.unwrap();
assert_eq!(Value::Int32(123), v);
// Test with a float string
let sql_val = SqlValue::SingleQuotedString("3.5".to_string());
let v = sql_value_to_value(
let v = call_sql_value_to_value!(
"a",
&ConcreteDataType::float64_datatype(),
ConcreteDataType::float64_datatype(),
&sql_val,
None,
None,
true,
auto_string_to_numeric
)
.unwrap();
assert_eq!(Value::Float64(OrderedFloat(3.5)), v);
// Test with auto_string_to_numeric=false
let sql_val = SqlValue::SingleQuotedString("123".to_string());
let v = sql_value_to_value(
"a",
&ConcreteDataType::int32_datatype(),
&sql_val,
None,
None,
false,
);
let v = call_sql_value_to_value!("a", ConcreteDataType::int32_datatype(), &sql_val);
assert!(v.is_err());
// Test with an invalid numeric string but auto_string_to_numeric=true
// Should return an error now with the new auto_cast_to_numeric behavior
let sql_val = SqlValue::SingleQuotedString("not_a_number".to_string());
let v = sql_value_to_value(
let v = call_sql_value_to_value!(
"a",
&ConcreteDataType::int32_datatype(),
ConcreteDataType::int32_datatype(),
&sql_val,
None,
None,
true,
auto_string_to_numeric
);
assert!(v.is_err());
// Test with boolean type
let sql_val = SqlValue::SingleQuotedString("true".to_string());
let v = sql_value_to_value(
let v = call_sql_value_to_value!(
"a",
&ConcreteDataType::boolean_datatype(),
ConcreteDataType::boolean_datatype(),
&sql_val,
None,
None,
true,
auto_string_to_numeric
)
.unwrap();
assert_eq!(Value::Boolean(true), v);
// Non-numeric types should still be handled normally
let sql_val = SqlValue::SingleQuotedString("hello".to_string());
let v = sql_value_to_value(
let v = call_sql_value_to_value!(
"a",
&ConcreteDataType::string_datatype(),
ConcreteDataType::string_datatype(),
&sql_val,
None,
None,
true,
auto_string_to_numeric
);
assert!(v.is_ok());
}

View File

@@ -14,8 +14,8 @@
use common_time::timezone::Timezone;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnDefaultConstraint;
use datatypes::schema::constraint::{CURRENT_TIMESTAMP, CURRENT_TIMESTAMP_FN};
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema};
use snafu::ensure;
use sqlparser::ast::ValueWithSpan;
pub use sqlparser::ast::{
@@ -47,9 +47,12 @@ pub fn parse_column_default_constraint(
);
let default_constraint = match &opt.option {
ColumnOption::Default(Expr::Value(v)) => ColumnDefaultConstraint::Value(
sql_value_to_value(column_name, data_type, &v.value, timezone, None, false)?,
),
ColumnOption::Default(Expr::Value(v)) => {
let schema = ColumnSchema::new(column_name, data_type.clone(), true);
ColumnDefaultConstraint::Value(sql_value_to_value(
&schema, &v.value, timezone, None, false,
)?)
}
ColumnOption::Default(Expr::Function(func)) => {
let mut func = format!("{func}").to_lowercase();
// normalize CURRENT_TIMESTAMP to CURRENT_TIMESTAMP()
@@ -80,8 +83,7 @@ pub fn parse_column_default_constraint(
if let Expr::Value(v) = &**expr {
let value = sql_value_to_value(
column_name,
data_type,
&ColumnSchema::new(column_name, data_type.clone(), true),
&v.value,
timezone,
Some(*op),

View File

@@ -801,7 +801,8 @@ fn memtable_flat_sources(
if last_iter_rows > min_flush_rows {
let maybe_dedup = merge_and_dedup(
&schema,
options,
options.append_mode,
options.merge_mode(),
field_column_start,
std::mem::replace(&mut input_iters, Vec::with_capacity(num_ranges)),
)?;
@@ -813,7 +814,13 @@ fn memtable_flat_sources(
// Handle remaining iters.
if !input_iters.is_empty() {
let maybe_dedup = merge_and_dedup(&schema, options, field_column_start, input_iters)?;
let maybe_dedup = merge_and_dedup(
&schema,
options.append_mode,
options.merge_mode(),
field_column_start,
input_iters,
)?;
flat_sources.sources.push(FlatSource::Iter(maybe_dedup));
}
@@ -822,19 +829,64 @@ fn memtable_flat_sources(
Ok(flat_sources)
}
fn merge_and_dedup(
/// Merges multiple record batch iterators and applies deduplication based on the specified mode.
///
/// This function is used during the flush process to combine data from multiple memtable ranges
/// into a single stream while handling duplicate records according to the configured merge strategy.
///
/// # Arguments
///
/// * `schema` - The Arrow schema reference that defines the structure of the record batches
/// * `append_mode` - When true, no deduplication is performed and all records are preserved.
/// This is used for append-only workloads where duplicate handling is not required.
/// * `merge_mode` - The strategy used for deduplication when not in append mode:
/// - `MergeMode::LastRow`: Keeps the last record for each primary key
/// - `MergeMode::LastNonNull`: Keeps the last non-null values for each field
/// * `field_column_start` - The starting column index for fields in the record batch.
/// Used when `MergeMode::LastNonNull` to identify which columns
/// contain field values versus primary key columns.
/// * `input_iters` - A vector of record batch iterators to be merged and deduplicated
///
/// # Returns
///
/// Returns a boxed record batch iterator that yields the merged and potentially deduplicated
/// record batches.
///
/// # Behavior
///
/// 1. Creates a `FlatMergeIterator` to merge all input iterators in sorted order based on
/// primary key and timestamp
/// 2. If `append_mode` is true, returns the merge iterator directly without deduplication
/// 3. If `append_mode` is false, wraps the merge iterator with a `FlatDedupIterator` that
/// applies the specified merge mode:
/// - `LastRow`: Removes duplicate rows, keeping only the last one
/// - `LastNonNull`: Removes duplicates but preserves the last non-null value for each field
///
/// # Examples
///
/// ```ignore
/// let merged_iter = merge_and_dedup(
/// &schema,
/// false, // not append mode, apply dedup
/// MergeMode::LastRow,
/// 2, // fields start at column 2 after primary key columns
/// vec![iter1, iter2, iter3],
/// )?;
/// ```
pub fn merge_and_dedup(
schema: &SchemaRef,
options: &RegionOptions,
append_mode: bool,
merge_mode: MergeMode,
field_column_start: usize,
input_iters: Vec<BoxedRecordBatchIterator>,
) -> Result<BoxedRecordBatchIterator> {
let merge_iter = FlatMergeIterator::new(schema.clone(), input_iters, DEFAULT_READ_BATCH_SIZE)?;
let maybe_dedup = if options.append_mode {
let maybe_dedup = if append_mode {
// No dedup in append mode
Box::new(merge_iter) as _
} else {
// Dedup according to merge mode.
match options.merge_mode() {
match merge_mode {
MergeMode::LastRow => {
Box::new(FlatDedupIterator::new(merge_iter, FlatLastRow::new(false))) as _
}

View File

@@ -410,8 +410,7 @@ fn sql_value_to_value(
})?
} else {
common_sql::convert::sql_value_to_value(
column,
&column_schema.data_type,
column_schema,
sql_val,
timezone,
None,

View File

@@ -52,6 +52,7 @@ use common_time::Timestamp;
use common_time::range::TimestampRange;
use datafusion_expr::LogicalPlan;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use humantime::format_duration;
use itertools::Itertools;
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
@@ -644,11 +645,20 @@ impl StatementExecutor {
})?
.unit();
let start_column = ColumnSchema::new(
"range_start",
ConcreteDataType::timestamp_datatype(time_unit),
false,
);
let end_column = ColumnSchema::new(
"range_end",
ConcreteDataType::timestamp_datatype(time_unit),
false,
);
let mut time_ranges = Vec::with_capacity(sql_values_time_range.len());
for (start, end) in sql_values_time_range {
let start = common_sql::convert::sql_value_to_value(
"range_start",
&ConcreteDataType::timestamp_datatype(time_unit),
&start_column,
start,
Some(&query_ctx.timezone()),
None,
@@ -667,8 +677,7 @@ impl StatementExecutor {
})?;
let end = common_sql::convert::sql_value_to_value(
"range_end",
&ConcreteDataType::timestamp_datatype(time_unit),
&end_column,
end,
Some(&query_ctx.timezone()),
None,

View File

@@ -242,8 +242,12 @@ fn values_to_vectors_by_exact_types(
args.iter()
.zip(exact_types.iter())
.map(|(value, data_type)| {
let data_type = &ConcreteDataType::from_arrow_type(data_type);
let value = sql_value_to_value(DUMMY_COLUMN, data_type, value, tz, None, false)
let schema = ColumnSchema::new(
DUMMY_COLUMN,
ConcreteDataType::from_arrow_type(data_type),
true,
);
let value = sql_value_to_value(&schema, value, tz, None, false)
.context(error::SqlCommonSnafu)?;
Ok(value_to_vector(value))
@@ -260,10 +264,12 @@ fn values_to_vectors_by_valid_types(
args.iter()
.map(|value| {
for data_type in valid_types {
let data_type = &ConcreteDataType::from_arrow_type(data_type);
if let Ok(value) =
sql_value_to_value(DUMMY_COLUMN, data_type, value, tz, None, false)
{
let schema = ColumnSchema::new(
DUMMY_COLUMN,
ConcreteDataType::from_arrow_type(data_type),
true,
);
if let Ok(value) = sql_value_to_value(&schema, value, tz, None, false) {
return Ok(value_to_vector(value));
}
}

View File

@@ -50,7 +50,7 @@ use common_time::{Timestamp, Timezone};
use datafusion_common::tree_node::TreeNodeVisitor;
use datafusion_expr::LogicalPlan;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{RawSchema, Schema};
use datatypes::schema::{ColumnSchema, RawSchema, Schema};
use datatypes::value::Value;
use partition::expr::{Operand, PartitionExpr, RestrictedOp};
use partition::multi_dim::MultiDimPartitionRule;
@@ -2001,8 +2001,7 @@ fn convert_value(
unary_op: Option<UnaryOperator>,
) -> Result<Value> {
sql_value_to_value(
"<NONAME>",
&data_type,
&ColumnSchema::new("<NONAME>", data_type, true),
value,
Some(timezone),
unary_op,

View File

@@ -87,7 +87,7 @@ operator.workspace = true
otel-arrow-rust.workspace = true
parking_lot.workspace = true
pg_interval = "0.4"
pgwire = { version = "0.36.3", default-features = false, features = [
pgwire = { version = "0.37", default-features = false, features = [
"server-api-ring",
"pg-ext-types",
] }

View File

@@ -22,6 +22,7 @@ use common_time::{Date, Timestamp};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_expr::LogicalPlan;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::types::TimestampType;
use datatypes::value::{self, Value};
use itertools::Itertools;
@@ -254,9 +255,10 @@ pub fn convert_value(param: &ParamValue, t: &ConcreteDataType) -> Result<ScalarV
/// Convert an MySQL expression to a scalar value.
/// It automatically handles the conversion of strings to numeric values.
pub fn convert_expr_to_scalar_value(param: &Expr, t: &ConcreteDataType) -> Result<ScalarValue> {
let column_schema = ColumnSchema::new("", t.clone(), true);
match param {
Expr::Value(v) => {
let v = sql_value_to_value("", t, &v.value, None, None, true);
let v = sql_value_to_value(&column_schema, &v.value, None, None, true);
match v {
Ok(v) => v
.try_to_scalar_value(t)
@@ -268,7 +270,7 @@ pub fn convert_expr_to_scalar_value(param: &Expr, t: &ConcreteDataType) -> Resul
}
}
Expr::UnaryOp { op, expr } if let Expr::Value(v) = &**expr => {
let v = sql_value_to_value("", t, &v.value, None, Some(*op), true);
let v = sql_value_to_value(&column_schema, &v.value, None, Some(*op), true);
match v {
Ok(v) => v
.try_to_scalar_value(t)

View File

@@ -28,13 +28,13 @@ fn build_string_data_rows(
schema: Arc<Vec<FieldInfo>>,
rows: Vec<Vec<String>>,
) -> Vec<PgWireResult<DataRow>> {
let mut encoder = DataRowEncoder::new(schema.clone());
rows.iter()
.map(|row| {
let mut encoder = DataRowEncoder::new(schema.clone());
for value in row {
encoder.encode_field(&Some(value))?;
}
encoder.finish()
Ok(encoder.take_row())
})
.collect()
}

View File

@@ -262,6 +262,26 @@ impl QueryParser for DefaultQueryParser {
})
}
}
fn get_parameter_types(&self, _stmt: &Self::Statement) -> PgWireResult<Vec<Type>> {
// we have our own implementation of describes in ExtendedQueryHandler
// so we don't use these methods
Err(PgWireError::ApiError(
"get_parameter_types is not expected to be called".into(),
))
}
fn get_result_schema(
&self,
_stmt: &Self::Statement,
_column_format: Option<&Format>,
) -> PgWireResult<Vec<FieldInfo>> {
// we have our own implementation of describes in ExtendedQueryHandler
// so we don't use these methods
Err(PgWireError::ApiError(
"get_result_schema is not expected to be called".into(),
))
}
}
#[async_trait]

View File

@@ -395,13 +395,13 @@ impl Iterator for RecordBatchRowIterator {
type Item = PgWireResult<DataRow>;
fn next(&mut self) -> Option<Self::Item> {
let mut encoder = DataRowEncoder::new(self.pg_schema.clone());
if self.i < self.record_batch.num_rows() {
let mut encoder = DataRowEncoder::new(self.pg_schema.clone());
if let Err(e) = self.encode_row(self.i, &mut encoder) {
return Some(Err(e));
}
self.i += 1;
Some(encoder.finish())
Some(Ok(encoder.take_row()))
} else {
None
}

View File

@@ -40,4 +40,8 @@ impl Dialect for GreptimeDbDialect {
fn supports_filter_during_aggregation(&self) -> bool {
true
}
fn supports_struct_literal(&self) -> bool {
true
}
}

View File

@@ -208,9 +208,9 @@ pub enum Error {
location: Location,
},
#[snafu(display("Invalid expr as option value, error: {error}"))]
InvalidExprAsOptionValue {
error: String,
#[snafu(display("Invalid JSON structure setting, reason: {reason}"))]
InvalidJsonStructureSetting {
reason: String,
#[snafu(implicit)]
location: Location,
},
@@ -373,7 +373,7 @@ impl ErrorExt for Error {
}
InvalidColumnOption { .. }
| InvalidExprAsOptionValue { .. }
| InvalidJsonStructureSetting { .. }
| InvalidDatabaseName { .. }
| InvalidDatabaseOption { .. }
| ColumnTypeMismatch { .. }

View File

@@ -40,16 +40,17 @@ pub(super) fn parse_json_datatype_options(parser: &mut Parser<'_>) -> Result<Opt
#[cfg(test)]
mod tests {
use sqlparser::ast::DataType;
use sqlparser::ast::{DataType, Expr, Ident, StructField};
use crate::dialect::GreptimeDbDialect;
use crate::parser::{ParseOptions, ParserContext};
use crate::statements::OptionMap;
use crate::statements::create::{
Column, JSON_FORMAT_FULL_STRUCTURED, JSON_FORMAT_PARTIAL, JSON_FORMAT_RAW, JSON_OPT_FORMAT,
JSON_OPT_UNSTRUCTURED_KEYS,
Column, JSON_FORMAT_FULL_STRUCTURED, JSON_FORMAT_PARTIAL, JSON_FORMAT_RAW, JSON_OPT_FIELDS,
JSON_OPT_FORMAT, JSON_OPT_UNSTRUCTURED_KEYS,
};
use crate::statements::statement::Statement;
use crate::util::OptionValue;
#[test]
fn test_parse_json_datatype_options() {
@@ -77,6 +78,41 @@ mod tests {
let sql = r#"
CREATE TABLE json_data (
my_json JSON(format = "partial", fields = Struct<i Int, "o.a" String, "o.b" String, `x.y.z` Float64>),
ts TIMESTAMP TIME INDEX,
)"#;
let options = parse(sql).unwrap();
assert_eq!(options.len(), 1);
let option = options.value(JSON_OPT_FIELDS);
let expected = Some(&OptionValue::new(Expr::Struct {
values: vec![],
fields: vec![
StructField {
field_name: Some(Ident::new("i")),
field_type: DataType::Int(None),
options: None,
},
StructField {
field_name: Some(Ident::with_quote('"', "o.a")),
field_type: DataType::String(None),
options: None,
},
StructField {
field_name: Some(Ident::with_quote('"', "o.b")),
field_type: DataType::String(None),
options: None,
},
StructField {
field_name: Some(Ident::with_quote('`', "x.y.z")),
field_type: DataType::Float64,
options: None,
},
],
}));
assert_eq!(option, expected);
let sql = r#"
CREATE TABLE json_data (
my_json JSON(format = "partial", unstructured_keys = ["k", "foo.bar", "a.b.c"]),
ts TIMESTAMP TIME INDEX,
)"#;

View File

@@ -40,6 +40,7 @@ use api::v1::SemanticType;
use common_sql::default_constraint::parse_column_default_constraint;
use common_time::timezone::Timezone;
use datatypes::extension::json::{JsonExtensionType, JsonMetadata};
use datatypes::json::JsonStructureSettings;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{COMMENT_KEY, ColumnDefaultConstraint, ColumnSchema};
use datatypes::types::json_type::JsonNativeType;
@@ -281,8 +282,17 @@ pub fn sql_data_type_to_concrete_data_type(
}
},
SqlDataType::JSON => {
let format = if column_extensions.json_datatype_options.is_some() {
JsonFormat::Native(Box::new(JsonNativeType::Null))
let format = if let Some(x) = column_extensions.build_json_structure_settings()? {
if let Some(fields) = match x {
JsonStructureSettings::Structured(fields) => fields,
JsonStructureSettings::UnstructuredRaw => None,
JsonStructureSettings::PartialUnstructuredByKey { fields, .. } => fields,
} {
let datatype = &ConcreteDataType::Struct(fields);
JsonFormat::Native(Box::new(datatype.into()))
} else {
JsonFormat::Native(Box::new(JsonNativeType::Null))
}
} else {
JsonFormat::Jsonb
};

View File

@@ -14,27 +14,30 @@
use std::collections::{HashMap, HashSet};
use std::fmt::{Display, Formatter};
use std::sync::Arc;
use common_catalog::consts::FILE_ENGINE;
use datatypes::data_type::ConcreteDataType;
use datatypes::json::JsonStructureSettings;
use datatypes::schema::{
FulltextOptions, SkippingIndexOptions, VectorDistanceMetric, VectorIndexEngineType,
VectorIndexOptions,
};
use datatypes::types::StructType;
use itertools::Itertools;
use serde::Serialize;
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use sqlparser::ast::{ColumnOptionDef, DataType, Expr, Query};
use sqlparser_derive::{Visit, VisitMut};
use crate::ast::{ColumnDef, Ident, ObjectName, Value as SqlValue};
use crate::error::{
InvalidFlowQuerySnafu, InvalidSqlSnafu, Result, SetFulltextOptionSnafu,
SetSkippingIndexOptionSnafu,
InvalidFlowQuerySnafu, InvalidJsonStructureSettingSnafu, InvalidSqlSnafu, Result,
SetFulltextOptionSnafu, SetSkippingIndexOptionSnafu,
};
use crate::statements::OptionMap;
use crate::statements::statement::Statement;
use crate::statements::tql::Tql;
use crate::statements::{OptionMap, sql_data_type_to_concrete_data_type};
use crate::util::OptionValue;
const LINE_SEP: &str = ",\n";
@@ -44,6 +47,7 @@ pub const VECTOR_OPT_DIM: &str = "dim";
pub const JSON_OPT_UNSTRUCTURED_KEYS: &str = "unstructured_keys";
pub const JSON_OPT_FORMAT: &str = "format";
pub(crate) const JSON_OPT_FIELDS: &str = "fields";
pub const JSON_FORMAT_FULL_STRUCTURED: &str = "structured";
pub const JSON_FORMAT_RAW: &str = "raw";
pub const JSON_FORMAT_PARTIAL: &str = "partial";
@@ -346,14 +350,51 @@ impl ColumnExtensions {
})
.unwrap_or_default();
let fields = if let Some(value) = options.value(JSON_OPT_FIELDS) {
let fields = value
.as_struct_fields()
.context(InvalidJsonStructureSettingSnafu {
reason: format!(r#"expect "{JSON_OPT_FIELDS}" a struct, actual: "{value}""#,),
})?;
let fields = fields
.iter()
.map(|field| {
let name = field.field_name.as_ref().map(|x| x.value.clone()).context(
InvalidJsonStructureSettingSnafu {
reason: format!(r#"missing field name in "{field}""#),
},
)?;
let datatype = sql_data_type_to_concrete_data_type(
&field.field_type,
&Default::default(),
)?;
Ok(datatypes::types::StructField::new(name, datatype, true))
})
.collect::<Result<_>>()?;
Some(StructType::new(Arc::new(fields)))
} else {
None
};
options
.get(JSON_OPT_FORMAT)
.map(|format| match format {
JSON_FORMAT_FULL_STRUCTURED => Ok(JsonStructureSettings::Structured(None)),
JSON_FORMAT_PARTIAL => Ok(JsonStructureSettings::PartialUnstructuredByKey {
fields: None,
unstructured_keys,
}),
JSON_FORMAT_FULL_STRUCTURED => Ok(JsonStructureSettings::Structured(fields)),
JSON_FORMAT_PARTIAL => {
let fields = fields.map(|fields| {
let mut fields = Arc::unwrap_or_clone(fields.fields());
fields.push(datatypes::types::StructField::new(
JsonStructureSettings::RAW_FIELD.to_string(),
ConcreteDataType::string_datatype(),
true,
));
StructType::new(Arc::new(fields))
});
Ok(JsonStructureSettings::PartialUnstructuredByKey {
fields,
unstructured_keys,
})
}
JSON_FORMAT_RAW => Ok(JsonStructureSettings::UnstructuredRaw),
_ => InvalidSqlSnafu {
msg: format!("unknown JSON datatype 'format': {format}"),

View File

@@ -17,14 +17,14 @@ use std::fmt::{Display, Formatter};
use itertools::Itertools;
use serde::Serialize;
use snafu::ensure;
use sqlparser::ast::{
Array, Expr, Ident, ObjectName, SetExpr, SqlOption, TableFactor, Value, ValueWithSpan,
Array, Expr, Ident, ObjectName, SetExpr, SqlOption, StructField, TableFactor, Value,
ValueWithSpan,
};
use sqlparser_derive::{Visit, VisitMut};
use crate::ast::ObjectNamePartExt;
use crate::error::{InvalidExprAsOptionValueSnafu, InvalidSqlSnafu, Result};
use crate::error::{InvalidSqlSnafu, Result};
use crate::statements::create::SqlOrTql;
/// Format an [ObjectName] without any quote of its idents.
@@ -52,14 +52,8 @@ pub fn format_raw_object_name(name: &ObjectName) -> String {
pub struct OptionValue(Expr);
impl OptionValue {
fn try_new(expr: Expr) -> Result<Self> {
ensure!(
matches!(expr, Expr::Value(_) | Expr::Identifier(_) | Expr::Array(_)),
InvalidExprAsOptionValueSnafu {
error: format!("{expr} not accepted")
}
);
Ok(Self(expr))
pub fn new(expr: Expr) -> Self {
Self(expr)
}
fn expr_as_string(expr: &Expr) -> Option<&str> {
@@ -106,6 +100,13 @@ impl OptionValue {
_ => None,
}
}
pub(crate) fn as_struct_fields(&self) -> Option<&[StructField]> {
match &self.0 {
Expr::Struct { fields, .. } => Some(fields),
_ => None,
}
}
}
impl From<String> for OptionValue {
@@ -155,7 +156,7 @@ pub fn parse_option_string(option: SqlOption) -> Result<(String, OptionValue)> {
}
.fail();
};
let v = OptionValue::try_new(value)?;
let v = OptionValue::new(value);
let k = key.value.to_lowercase();
Ok((k, v))
}

View File

@@ -0,0 +1,10 @@
{"did":"did:plc:yj3sjq3blzpynh27cumnp5ks","time_us":1732206349000167,"kind":"commit","commit":{"rev":"3lbhtytnn2k2f","operation":"create","collection":"app.bsky.feed.post","rkey":"3lbhtyteurk2y","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:09:27.095Z","langs":["en"],"reply":{"parent":{"cid":"bafyreibfglofvqou2yiqvwzk4rcgkhhxrbunyemshdjledgwymimqkg24e","uri":"at://did:plc:6tr6tuzlx2db3rduzr2d6r24/app.bsky.feed.post/3lbhqo2rtys2z"},"root":{"cid":"bafyreibfglofvqou2yiqvwzk4rcgkhhxrbunyemshdjledgwymimqkg24e","uri":"at://did:plc:6tr6tuzlx2db3rduzr2d6r24/app.bsky.feed.post/3lbhqo2rtys2z"}},"text":"aaaaah. LIght shines in a corner of WTF...."},"cid":"bafyreidblutgvj75o4q4akzyyejedjj6l3it6hgqwee6jpwv2wqph5fsgm"}}
{"did":"did:plc:3i4xf2v4wcnyktgv6satke64","time_us":1732206349000644,"kind":"commit","commit":{"rev":"3lbhuvzds6d2a","operation":"create","collection":"app.bsky.feed.like","rkey":"3lbhuvzdked2a","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:25:46.221Z","subject":{"cid":"bafyreidjvrcmckkm765mct5fph36x7kupkfo35rjklbf2k76xkzwyiauge","uri":"at://did:plc:azrv4rcbws6kmcga4fsbphg2/app.bsky.feed.post/3lbgjdpbiec2l"}},"cid":"bafyreia5l5vrkh5oj4cjyhcqby2dprhyvcyofo2q5562tijlae2pzih23m"}}
{"did":"did:plc:gccfnqqizz4urhchsaie6jft","time_us":1732206349001108,"kind":"commit","commit":{"rev":"3lbhuvze3gi2u","operation":"create","collection":"app.bsky.graph.follow","rkey":"3lbhuvzdtmi2u","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:27:40.923Z","subject":"did:plc:r7cdh4sgzqbfdc6wcdxxti7c"},"cid":"bafyreiew2p6cgirfaj45qoenm4fgumib7xoloclrap3jgkz5es7g7kby3i"}}
{"did":"did:plc:msxqf3twq7abtdw7dbfskphk","time_us":1732206349001372,"kind":"commit","commit":{"rev":"3lbhueija5p22","operation":"create","collection":"app.bsky.feed.like","rkey":"3lbhueiizcx22","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:15:58.232Z","subject":{"cid":"bafyreiavpshyqzrlo5m7fqodjhs6jevweqnif4phasiwimv4a7mnsqi2fe","uri":"at://did:plc:fusulxqc52zbrc75fi6xrcof/app.bsky.feed.post/3lbhskq5zn22f"}},"cid":"bafyreidjix4dauj2afjlbzmhj3a7gwftcevvmmy6edww6vrjdbst26rkby"}}
{"did":"did:plc:l5o3qjrmfztir54cpwlv2eme","time_us":1732206349001905,"kind":"commit","commit":{"rev":"3lbhtytohxc2o","operation":"create","collection":"app.bsky.feed.post","rkey":"3lbhtytjqzk2q","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:09:27.254Z","langs":["en"],"reply":{"parent":{"cid":"bafyreih35fe2jj3gchmgk4amold4l6sfxd2sby5wrg3jrws5fkdypxrbg4","uri":"at://did:plc:6wx2gg5yqgvmlu35r6y3bk6d/app.bsky.feed.post/3lbhtj2eb4s2o"},"root":{"cid":"bafyreifipyt3vctd4ptuoicvio7rbr5xvjv4afwuggnd2prnmn55mu6luu","uri":"at://did:plc:474ldquxwzrlcvjhhbbk2wte/app.bsky.feed.post/3lbhdzrynik27"}},"text":"okay i take mine back because I hadnt heard this one yet^^"},"cid":"bafyreigzdsdne3z2xxcakgisieyj7y47hj6eg7lj6v4q25ah5q2qotu5ku"}}
{"did":"did:plc:jkaaf5j2yb2pvpx3ualm3vbh","time_us":1732206349002758,"kind":"commit","commit":{"rev":"3lbhudfo3yi2w","operation":"create","collection":"app.bsky.graph.follow","rkey":"3lbhudfnw4y2w","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:15:21.495Z","subject":"did:plc:amsdn2tbjxo3xrwqneqhh4cm"},"cid":"bafyreiaa2vsdr4ckwjg4jq47zfd7mewidywfz3qh3dmglcd6ozi4xwdega"}}
{"did":"did:plc:tdwz2h4id5dxezvohftsmffu","time_us":1732206349003106,"kind":"commit","commit":{"rev":"3lbhujcp4ix2n","operation":"create","collection":"app.bsky.graph.follow","rkey":"3lbhujcoxmp2n","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:18:39.913Z","subject":"did:plc:gf3vum7insztt5rxrpxdz2id"},"cid":"bafyreihaatlpar3abtx6ck3kde2ksic6zzflk4ppduhf6dxurytqrv33ni"}}
{"did":"did:plc:cdsd346mwow7aj3tgfkwsct3","time_us":1732206349003461,"kind":"commit","commit":{"rev":"3lbhus5vior2t","operation":"create","collection":"app.bsky.feed.repost","rkey":"3lbhus5vbtz2t","record":{"$type":"app.bsky.feed.repost","createdAt":"2024-11-21T16:23:36.714Z","subject":{"cid":"bafyreieaacfiobnuqvjhhsndyi5s3fd6krbzdduxsyrzfv43kczpcmkl6y","uri":"at://did:plc:o5q6dynpme4ndolc3heztasm/app.bsky.feed.post/3lbfli3qsoc2o"}},"cid":"bafyreid5ycocp5zq2g7fcx2xxzxrbafuh7b5qhtwuwiomzo6vqila2cbpu"}}
{"did":"did:plc:s4bwqchfzm6gjqfeb6mexgbu","time_us":1732206349003907,"kind":"commit","commit":{"rev":"3lbhuvzeccx2w","operation":"create","collection":"app.bsky.feed.post","rkey":"3lbhuvxf4qs2m","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:25:44.376Z","langs":["en"],"reply":{"parent":{"cid":"bafyreiaev27cfcxxvn2pdhrwwquzwgclujnulzbcfnn4p4fwgb6migjhw4","uri":"at://did:plc:zec6cslvgc3hhdatrhk6pq5p/app.bsky.feed.post/3lbhujvds4c2b"},"root":{"cid":"bafyreif7qjxhvecwnhlynijj6pf47jwvtkahsz3zh2kaipwu2bw2dxwaqq","uri":"at://did:plc:s4bwqchfzm6gjqfeb6mexgbu/app.bsky.feed.post/3lbhug53kkk2m"}},"text":"\n⌜ Blinking. She hadn't realized she spoke out loud. ⌟\n\n It was nothing like that — . I was only thinking . . . \n\n⌜ Trailing off, her mind occupied. ⌟\n"},"cid":"bafyreibugobcike72y4zxvdyz2oopyt6ywwqfielcwojkb27p7s6rlomgm"}}
{"did":"did:plc:hbc74dlsxhq53kp5oxges6d7","time_us":1732206349004769,"kind":"commit","commit":{"rev":"3lbhuvzedg52j","operation":"create","collection":"app.bsky.feed.like","rkey":"3lbhuvzdyof2j","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:25:46.167Z","subject":{"cid":"bafyreiaumopip75nzx2xjbugtwemdppsyx54bd2odf6q45f3o7xkocgari","uri":"at://did:plc:ig2jv6gqup4t7gdq2pmanknw/app.bsky.feed.post/3lbhuvtlaec2c"}},"cid":"bafyreidjk2svg2fdjiiwohmfmvp3hdxhpb33ycnixzbkyib5m6cocindxq"}}

View File

@@ -76,11 +76,10 @@ mod test {
use super::*;
use crate::standalone::GreptimeDbStandaloneBuilder;
use crate::test_util::execute_sql;
use crate::tests;
use crate::tests::MockDistributedInstance;
use crate::tests::test_util::{
MockInstance, both_instances_cases, distributed, execute_sql, standalone,
};
use crate::tests::test_util::{MockInstance, both_instances_cases, distributed, standalone};
#[tokio::test(flavor = "multi_thread")]
async fn test_distributed_handle_ddl_request() {

View File

@@ -24,6 +24,7 @@ use common_base::Plugins;
use common_config::Configurable;
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::schema_name::SchemaNameKey;
use common_query::Output;
use common_runtime::runtime::BuilderBuild;
use common_runtime::{Builder as RuntimeBuilder, Runtime};
use common_test_util::ports;
@@ -747,3 +748,10 @@ pub(crate) async fn prepare_another_catalog_and_schema(instance: &Instance) {
.await
.unwrap();
}
pub async fn execute_sql(instance: &Arc<Instance>, sql: &str) -> Output {
SqlQueryHandler::do_query(instance.as_ref(), sql, QueryContext::arc())
.await
.remove(0)
.unwrap()
}

View File

@@ -18,7 +18,7 @@ mod instance_noop_wal_test;
mod instance_test;
mod promql_test;
mod reconcile_table;
pub(crate) mod test_util;
pub mod test_util;
use std::collections::HashMap;
use std::sync::Arc;

View File

@@ -27,8 +27,8 @@ use store_api::storage::RegionId;
use table::metadata::TableId;
use crate::cluster::GreptimeDbClusterBuilder;
use crate::test_util::{StorageType, TempDirGuard, get_test_store_config};
use crate::tests::test_util::{MockInstanceBuilder, TestContext, execute_sql, wait_procedure};
use crate::test_util::{StorageType, TempDirGuard, execute_sql, get_test_store_config};
use crate::tests::test_util::{MockInstanceBuilder, TestContext, wait_procedure};
/// Helper function to get table route information for GC procedure
async fn get_table_route(

View File

@@ -18,9 +18,8 @@ use common_test_util::recordbatch::check_output_stream;
use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
use crate::cluster::GreptimeDbClusterBuilder;
use crate::tests::test_util::{
MockInstanceBuilder, RebuildableMockInstance, TestContext, execute_sql,
};
use crate::test_util::execute_sql;
use crate::tests::test_util::{MockInstanceBuilder, RebuildableMockInstance, TestContext};
pub(crate) async fn distributed_with_noop_wal() -> TestContext {
common_telemetry::init_default_ut_logging();

View File

@@ -23,9 +23,10 @@ use common_test_util::recordbatch::check_output_stream;
use table::table_reference::TableReference;
use crate::cluster::GreptimeDbClusterBuilder;
use crate::test_util::execute_sql;
use crate::tests::test_util::{
MockInstanceBuilder, RebuildableMockInstance, TestContext, dump_kvbackend, execute_sql,
restore_kvbackend, try_execute_sql, wait_procedure,
MockInstanceBuilder, RebuildableMockInstance, TestContext, dump_kvbackend, restore_kvbackend,
try_execute_sql, wait_procedure,
};
const CREATE_MONITOR_TABLE_SQL: &str = r#"

View File

@@ -439,10 +439,6 @@ pub fn find_testing_resource(path: &str) -> String {
prepare_path(&p)
}
pub async fn execute_sql(instance: &Arc<Instance>, sql: &str) -> Output {
execute_sql_with(instance, sql, QueryContext::arc()).await
}
pub async fn try_execute_sql(instance: &Arc<Instance>, sql: &str) -> Result<Output> {
try_execute_sql_with(instance, sql, QueryContext::arc()).await
}
@@ -455,16 +451,6 @@ pub async fn try_execute_sql_with(
instance.do_query(sql, query_ctx).await.remove(0)
}
pub async fn execute_sql_with(
instance: &Arc<Instance>,
sql: &str,
query_ctx: QueryContextRef,
) -> Output {
try_execute_sql_with(instance, sql, query_ctx)
.await
.unwrap_or_else(|e| panic!("Failed to execute sql: {sql}, error: {e:?}"))
}
/// Dump the kv backend to a vector of key-value pairs.
pub async fn dump_kvbackend(kv_backend: &KvBackendRef) -> Vec<(Vec<u8>, Vec<u8>)> {
let req = RangeRequest::new().with_range(vec![0], vec![0]);

View File

@@ -0,0 +1,94 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::io::BufRead;
use std::sync::Arc;
use std::{fs, io};
use common_test_util::find_workspace_path;
use frontend::instance::Instance;
use tests_integration::standalone::GreptimeDbStandaloneBuilder;
use tests_integration::test_util::execute_sql;
#[tokio::test]
async fn test_load_jsonbench_data() {
common_telemetry::init_default_ut_logging();
let instance = GreptimeDbStandaloneBuilder::new("test_load_jsonbench_data")
.build()
.await;
let frontend = instance.fe_instance();
create_table(frontend).await;
desc_table(frontend).await;
insert_data(frontend).await.unwrap();
}
async fn insert_data(frontend: &Arc<Instance>) -> io::Result<()> {
let file = fs::File::open(find_workspace_path(
"tests-integration/resources/jsonbench-head-10.ndjson",
))?;
let reader = io::BufReader::new(file);
for (i, line) in reader.lines().enumerate() {
let line = line?;
if line.is_empty() {
continue;
}
let sql = format!(
"INSERT INTO bluesky (ts, data) VALUES ({}, '{line}')",
i + 1,
);
let output = execute_sql(frontend, &sql).await;
let output = output.data.pretty_print().await;
assert_eq!(output, "Affected Rows: 1");
}
Ok(())
}
async fn desc_table(frontend: &Arc<Instance>) {
let sql = "DESC TABLE bluesky";
let expected = r#"
+--------+----------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
+--------+----------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
| data | Json<Object{"_raw": String, "commit.collection": String, "commit.operation": String, "did": String, "kind": String, "time_us": Number(I64)}> | | YES | | FIELD |
| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
+--------+----------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+"#;
let output = execute_sql(frontend, sql).await;
let output = output.data.pretty_print().await;
assert_eq!(output, expected.trim());
}
async fn create_table(frontend: &Arc<Instance>) {
let sql = r#"
CREATE TABLE bluesky (
"data" JSON (
format = "partial",
fields = Struct<
kind String,
"commit.operation" String,
"commit.collection" String,
did String,
time_us Bigint
>,
),
ts Timestamp TIME INDEX,
)
"#;
let output = execute_sql(frontend, sql).await;
let output = output.data.pretty_print().await;
assert_eq!(output, "Affected Rows: 0");
}

View File

@@ -16,6 +16,7 @@
mod grpc;
#[macro_use]
mod http;
mod jsonbench;
#[macro_use]
mod sql;
#[macro_use]