Compare commits

..

1 Commits

Author SHA1 Message Date
luofucong
1bb541112f ingest jsonbench data
Signed-off-by: luofucong <luofc@foxmail.com>
2025-12-23 19:28:21 +08:00
32 changed files with 795 additions and 505 deletions

View File

@@ -102,30 +102,6 @@ like `feat`/`fix`/`docs`, with a concise summary of code change following. AVOID
All commit messages SHOULD adhere to the [Conventional Commits specification](https://conventionalcommits.org/).
## AI-Assisted contributions
We has the following policy for AI-assisted PRs:
- The PR author should **understand the core ideas** behind the implementation **end-to-end**, and be able to justify the design and code during review.
- **Calls out unknowns and assumptions**. It's okay to not fully understand some bits of AI generated code. You should comment on these cases and point them out to reviewers so that they can use their knowledge of the codebase to clear up any concerns. For example, you might comment "calling this function here seems to work but I'm not familiar with how it works internally, I wonder if there's a race condition if it is called concurrently".
### Why fully AI-generated PRs without understanding are not helpful
Today, AI tools cannot reliably make complex changes to DataFusion on their own, which is why we rely on pull requests and code review.
The purposes of code review are:
1. Finish the intended task.
2. Share knowledge between authors and reviewers, as a long-term investment in the project. For this reason, even if someone familiar with the codebase can finish a task quickly, we're still happy to help a new contributor work on it even if it takes longer.
An AI dump for an issue doesnt meet these purposes. Maintainers could finish the task faster by using AI directly, and the submitters gain little knowledge if they act only as a pass through AI proxy without understanding.
Please understand the reviewing capacity is **very limited** for the project, so large PRs which appear to not have the requisite understanding might not get reviewed, and eventually closed or redirected.
### Better ways to contribute than an “AI dump”
It's recommended to write a high-quality issue with a clear problem statement and a minimal, reproducible example. This can make it easier for others to contribute.
## Getting Help
There are many ways to get help when you're stuck. It is recommended to ask for help by opening an issue, with a detailed description

6
Cargo.lock generated
View File

@@ -2580,10 +2580,12 @@ dependencies = [
name = "common-sql"
version = "1.0.0-beta.3"
dependencies = [
"arrow-schema",
"common-base",
"common-decimal",
"common-error",
"common-macro",
"common-telemetry",
"common-time",
"datafusion-sql",
"datatypes",
@@ -12229,7 +12231,7 @@ dependencies = [
[[package]]
name = "sqlparser"
version = "0.58.0"
source = "git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=4b519a5caa95472cc3988f5556813a583dd35af1#4b519a5caa95472cc3988f5556813a583dd35af1"
source = "git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=a0ce2bc6eb3e804532932f39833c32432f5c9a39#a0ce2bc6eb3e804532932f39833c32432f5c9a39"
dependencies = [
"lazy_static",
"log",
@@ -12253,7 +12255,7 @@ dependencies = [
[[package]]
name = "sqlparser_derive"
version = "0.3.0"
source = "git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=4b519a5caa95472cc3988f5556813a583dd35af1#4b519a5caa95472cc3988f5556813a583dd35af1"
source = "git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=a0ce2bc6eb3e804532932f39833c32432f5c9a39#a0ce2bc6eb3e804532932f39833c32432f5c9a39"
dependencies = [
"proc-macro2",
"quote",

View File

@@ -332,7 +332,7 @@ datafusion-physical-plan = { git = "https://github.com/GreptimeTeam/datafusion.g
datafusion-datasource = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-sql = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-substrait = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "4b519a5caa95472cc3988f5556813a583dd35af1" } # branch = "v0.58.x"
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "a0ce2bc6eb3e804532932f39833c32432f5c9a39" } # branch = "v0.58.x"
[profile.release]
debug = 1

View File

@@ -5,10 +5,12 @@ edition.workspace = true
license.workspace = true
[dependencies]
arrow-schema.workspace = true
common-base.workspace = true
common-decimal.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
datafusion-sql.workspace = true
datatypes.workspace = true

View File

@@ -14,11 +14,12 @@
use std::str::FromStr;
use arrow_schema::extension::ExtensionType;
use common_time::Timestamp;
use common_time::timezone::Timezone;
use datatypes::json::JsonStructureSettings;
use datatypes::extension::json::JsonExtensionType;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnDefaultConstraint;
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema};
use datatypes::types::{JsonFormat, parse_string_to_jsonb, parse_string_to_vector_type_value};
use datatypes::value::{OrderedF32, OrderedF64, Value};
use snafu::{OptionExt, ResultExt, ensure};
@@ -124,13 +125,14 @@ pub(crate) fn sql_number_to_value(data_type: &ConcreteDataType, n: &str) -> Resu
/// If `auto_string_to_numeric` is true, tries to cast the string value to numeric values,
/// and returns error if the cast fails.
pub fn sql_value_to_value(
column_name: &str,
data_type: &ConcreteDataType,
column_schema: &ColumnSchema,
sql_val: &SqlValue,
timezone: Option<&Timezone>,
unary_op: Option<UnaryOperator>,
auto_string_to_numeric: bool,
) -> Result<Value> {
let column_name = &column_schema.name;
let data_type = &column_schema.data_type;
let mut value = match sql_val {
SqlValue::Number(n, _) => sql_number_to_value(data_type, n)?,
SqlValue::Null => Value::Null,
@@ -146,13 +148,9 @@ pub fn sql_value_to_value(
(*b).into()
}
SqlValue::DoubleQuotedString(s) | SqlValue::SingleQuotedString(s) => parse_string_to_value(
column_name,
s.clone(),
data_type,
timezone,
auto_string_to_numeric,
)?,
SqlValue::DoubleQuotedString(s) | SqlValue::SingleQuotedString(s) => {
parse_string_to_value(column_schema, s.clone(), timezone, auto_string_to_numeric)?
}
SqlValue::HexStringLiteral(s) => {
// Should not directly write binary into json column
ensure!(
@@ -244,12 +242,12 @@ pub fn sql_value_to_value(
}
pub(crate) fn parse_string_to_value(
column_name: &str,
column_schema: &ColumnSchema,
s: String,
data_type: &ConcreteDataType,
timezone: Option<&Timezone>,
auto_string_to_numeric: bool,
) -> Result<Value> {
let data_type = &column_schema.data_type;
if auto_string_to_numeric && let Some(value) = auto_cast_to_numeric(&s, data_type)? {
return Ok(value);
}
@@ -257,7 +255,7 @@ pub(crate) fn parse_string_to_value(
ensure!(
data_type.is_stringifiable(),
ColumnTypeMismatchSnafu {
column_name,
column_name: column_schema.name.clone(),
expect: data_type.clone(),
actual: ConcreteDataType::string_datatype(),
}
@@ -303,23 +301,21 @@ pub(crate) fn parse_string_to_value(
}
}
ConcreteDataType::Binary(_) => Ok(Value::Binary(s.as_bytes().into())),
ConcreteDataType::Json(j) => {
match &j.format {
JsonFormat::Jsonb => {
let v = parse_string_to_jsonb(&s).context(DatatypeSnafu)?;
Ok(Value::Binary(v.into()))
}
JsonFormat::Native(_inner) => {
// Always use the structured version at this level.
let serde_json_value =
serde_json::from_str(&s).context(DeserializeSnafu { json: s })?;
let json_structure_settings = JsonStructureSettings::Structured(None);
json_structure_settings
.encode(serde_json_value)
.context(DatatypeSnafu)
}
ConcreteDataType::Json(j) => match &j.format {
JsonFormat::Jsonb => {
let v = parse_string_to_jsonb(&s).context(DatatypeSnafu)?;
Ok(Value::Binary(v.into()))
}
}
JsonFormat::Native(_) => {
let extension_type: Option<JsonExtensionType> =
column_schema.extension_type().context(DatatypeSnafu)?;
let json_structure_settings = extension_type
.and_then(|x| x.metadata().json_structure_settings.clone())
.unwrap_or_default();
let v = serde_json::from_str(&s).context(DeserializeSnafu { json: s })?;
json_structure_settings.encode(v).context(DatatypeSnafu)
}
},
ConcreteDataType::Vector(d) => {
let v = parse_string_to_vector_type_value(&s, Some(d.dim)).context(DatatypeSnafu)?;
Ok(Value::Binary(v.into()))
@@ -417,305 +413,265 @@ mod test {
use super::*;
macro_rules! call_parse_string_to_value {
($column_name: expr, $input: expr, $data_type: expr) => {
call_parse_string_to_value!($column_name, $input, $data_type, None)
};
($column_name: expr, $input: expr, $data_type: expr, timezone = $timezone: expr) => {
call_parse_string_to_value!($column_name, $input, $data_type, Some($timezone))
};
($column_name: expr, $input: expr, $data_type: expr, $timezone: expr) => {{
let column_schema = ColumnSchema::new($column_name, $data_type, true);
parse_string_to_value(&column_schema, $input, $timezone, true)
}};
}
#[test]
fn test_string_to_value_auto_numeric() {
fn test_string_to_value_auto_numeric() -> Result<()> {
// Test string to boolean with auto cast
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"true".to_string(),
&ConcreteDataType::boolean_datatype(),
None,
true,
)
.unwrap();
ConcreteDataType::boolean_datatype()
)?;
assert_eq!(Value::Boolean(true), result);
// Test invalid string to boolean with auto cast
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"not_a_boolean".to_string(),
&ConcreteDataType::boolean_datatype(),
None,
true,
ConcreteDataType::boolean_datatype()
);
assert!(result.is_err());
// Test string to int8
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"42".to_string(),
&ConcreteDataType::int8_datatype(),
None,
true,
)
.unwrap();
ConcreteDataType::int8_datatype()
)?;
assert_eq!(Value::Int8(42), result);
// Test invalid string to int8 with auto cast
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"not_an_int8".to_string(),
&ConcreteDataType::int8_datatype(),
None,
true,
ConcreteDataType::int8_datatype()
);
assert!(result.is_err());
// Test string to int16
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"1000".to_string(),
&ConcreteDataType::int16_datatype(),
None,
true,
)
.unwrap();
ConcreteDataType::int16_datatype()
)?;
assert_eq!(Value::Int16(1000), result);
// Test invalid string to int16 with auto cast
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"not_an_int16".to_string(),
&ConcreteDataType::int16_datatype(),
None,
true,
ConcreteDataType::int16_datatype()
);
assert!(result.is_err());
// Test string to int32
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"100000".to_string(),
&ConcreteDataType::int32_datatype(),
None,
true,
)
.unwrap();
ConcreteDataType::int32_datatype()
)?;
assert_eq!(Value::Int32(100000), result);
// Test invalid string to int32 with auto cast
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"not_an_int32".to_string(),
&ConcreteDataType::int32_datatype(),
None,
true,
ConcreteDataType::int32_datatype()
);
assert!(result.is_err());
// Test string to int64
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"1000000".to_string(),
&ConcreteDataType::int64_datatype(),
None,
true,
)
.unwrap();
ConcreteDataType::int64_datatype()
)?;
assert_eq!(Value::Int64(1000000), result);
// Test invalid string to int64 with auto cast
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"not_an_int64".to_string(),
&ConcreteDataType::int64_datatype(),
None,
true,
ConcreteDataType::int64_datatype()
);
assert!(result.is_err());
// Test string to uint8
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"200".to_string(),
&ConcreteDataType::uint8_datatype(),
None,
true,
)
.unwrap();
ConcreteDataType::uint8_datatype()
)?;
assert_eq!(Value::UInt8(200), result);
// Test invalid string to uint8 with auto cast
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"not_a_uint8".to_string(),
&ConcreteDataType::uint8_datatype(),
None,
true,
ConcreteDataType::uint8_datatype()
);
assert!(result.is_err());
// Test string to uint16
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"60000".to_string(),
&ConcreteDataType::uint16_datatype(),
None,
true,
)
.unwrap();
ConcreteDataType::uint16_datatype()
)?;
assert_eq!(Value::UInt16(60000), result);
// Test invalid string to uint16 with auto cast
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"not_a_uint16".to_string(),
&ConcreteDataType::uint16_datatype(),
None,
true,
ConcreteDataType::uint16_datatype()
);
assert!(result.is_err());
// Test string to uint32
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"4000000000".to_string(),
&ConcreteDataType::uint32_datatype(),
None,
true,
)
.unwrap();
ConcreteDataType::uint32_datatype()
)?;
assert_eq!(Value::UInt32(4000000000), result);
// Test invalid string to uint32 with auto cast
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"not_a_uint32".to_string(),
&ConcreteDataType::uint32_datatype(),
None,
true,
ConcreteDataType::uint32_datatype()
);
assert!(result.is_err());
// Test string to uint64
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"18446744073709551615".to_string(),
&ConcreteDataType::uint64_datatype(),
None,
true,
)
.unwrap();
ConcreteDataType::uint64_datatype()
)?;
assert_eq!(Value::UInt64(18446744073709551615), result);
// Test invalid string to uint64 with auto cast
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"not_a_uint64".to_string(),
&ConcreteDataType::uint64_datatype(),
None,
true,
ConcreteDataType::uint64_datatype()
);
assert!(result.is_err());
// Test string to float32
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"3.5".to_string(),
&ConcreteDataType::float32_datatype(),
None,
true,
)
.unwrap();
ConcreteDataType::float32_datatype()
)?;
assert_eq!(Value::Float32(OrderedF32::from(3.5)), result);
// Test invalid string to float32 with auto cast
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"not_a_float32".to_string(),
&ConcreteDataType::float32_datatype(),
None,
true,
ConcreteDataType::float32_datatype()
);
assert!(result.is_err());
// Test string to float64
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"3.5".to_string(),
&ConcreteDataType::float64_datatype(),
None,
true,
)
.unwrap();
ConcreteDataType::float64_datatype()
)?;
assert_eq!(Value::Float64(OrderedF64::from(3.5)), result);
// Test invalid string to float64 with auto cast
let result = parse_string_to_value(
let result = call_parse_string_to_value!(
"col",
"not_a_float64".to_string(),
&ConcreteDataType::float64_datatype(),
None,
true,
ConcreteDataType::float64_datatype()
);
assert!(result.is_err());
Ok(())
}
#[test]
fn test_sql_value_to_value() {
let sql_val = SqlValue::Null;
assert_eq!(
Value::Null,
sql_value_to_value(
"a",
&ConcreteDataType::float64_datatype(),
&sql_val,
None,
macro_rules! call_sql_value_to_value {
($column_name: expr, $data_type: expr, $sql_value: expr) => {
call_sql_value_to_value!($column_name, $data_type, $sql_value, None, None, false)
};
($column_name: expr, $data_type: expr, $sql_value: expr, timezone = $timezone: expr) => {
call_sql_value_to_value!(
$column_name,
$data_type,
$sql_value,
Some($timezone),
None,
false
)
.unwrap()
};
($column_name: expr, $data_type: expr, $sql_value: expr, unary_op = $unary_op: expr) => {
call_sql_value_to_value!(
$column_name,
$data_type,
$sql_value,
None,
Some($unary_op),
false
)
};
($column_name: expr, $data_type: expr, $sql_value: expr, auto_string_to_numeric) => {
call_sql_value_to_value!($column_name, $data_type, $sql_value, None, None, true)
};
($column_name: expr, $data_type: expr, $sql_value: expr, $timezone: expr, $unary_op: expr, $auto_string_to_numeric: expr) => {{
let column_schema = ColumnSchema::new($column_name, $data_type, true);
sql_value_to_value(
&column_schema,
$sql_value,
$timezone,
$unary_op,
$auto_string_to_numeric,
)
}};
}
#[test]
fn test_sql_value_to_value() -> Result<()> {
let sql_val = SqlValue::Null;
assert_eq!(
Value::Null,
call_sql_value_to_value!("a", ConcreteDataType::float64_datatype(), &sql_val)?
);
let sql_val = SqlValue::Boolean(true);
assert_eq!(
Value::Boolean(true),
sql_value_to_value(
"a",
&ConcreteDataType::boolean_datatype(),
&sql_val,
None,
None,
false
)
.unwrap()
call_sql_value_to_value!("a", ConcreteDataType::boolean_datatype(), &sql_val)?
);
let sql_val = SqlValue::Number("3.0".to_string(), false);
assert_eq!(
Value::Float64(OrderedFloat(3.0)),
sql_value_to_value(
"a",
&ConcreteDataType::float64_datatype(),
&sql_val,
None,
None,
false
)
.unwrap()
call_sql_value_to_value!("a", ConcreteDataType::float64_datatype(), &sql_val)?
);
let sql_val = SqlValue::Number("3.0".to_string(), false);
let v = sql_value_to_value(
"a",
&ConcreteDataType::boolean_datatype(),
&sql_val,
None,
None,
false,
);
let v = call_sql_value_to_value!("a", ConcreteDataType::boolean_datatype(), &sql_val);
assert!(v.is_err());
assert!(format!("{v:?}").contains("Failed to parse number '3.0' to boolean column type"));
let sql_val = SqlValue::Boolean(true);
let v = sql_value_to_value(
"a",
&ConcreteDataType::float64_datatype(),
&sql_val,
None,
None,
false,
);
let v = call_sql_value_to_value!("a", ConcreteDataType::float64_datatype(), &sql_val);
assert!(v.is_err());
assert!(
format!("{v:?}").contains(
@@ -725,41 +681,18 @@ mod test {
);
let sql_val = SqlValue::HexStringLiteral("48656c6c6f20776f726c6421".to_string());
let v = sql_value_to_value(
"a",
&ConcreteDataType::binary_datatype(),
&sql_val,
None,
None,
false,
)
.unwrap();
let v = call_sql_value_to_value!("a", ConcreteDataType::binary_datatype(), &sql_val)?;
assert_eq!(Value::Binary(Bytes::from(b"Hello world!".as_slice())), v);
let sql_val = SqlValue::DoubleQuotedString("MorningMyFriends".to_string());
let v = sql_value_to_value(
"a",
&ConcreteDataType::binary_datatype(),
&sql_val,
None,
None,
false,
)
.unwrap();
let v = call_sql_value_to_value!("a", ConcreteDataType::binary_datatype(), &sql_val)?;
assert_eq!(
Value::Binary(Bytes::from(b"MorningMyFriends".as_slice())),
v
);
let sql_val = SqlValue::HexStringLiteral("9AF".to_string());
let v = sql_value_to_value(
"a",
&ConcreteDataType::binary_datatype(),
&sql_val,
None,
None,
false,
);
let v = call_sql_value_to_value!("a", ConcreteDataType::binary_datatype(), &sql_val);
assert!(v.is_err());
assert!(
format!("{v:?}").contains("odd number of digits"),
@@ -767,38 +700,16 @@ mod test {
);
let sql_val = SqlValue::HexStringLiteral("AG".to_string());
let v = sql_value_to_value(
"a",
&ConcreteDataType::binary_datatype(),
&sql_val,
None,
None,
false,
);
let v = call_sql_value_to_value!("a", ConcreteDataType::binary_datatype(), &sql_val);
assert!(v.is_err());
assert!(format!("{v:?}").contains("invalid character"), "v is {v:?}",);
let sql_val = SqlValue::DoubleQuotedString("MorningMyFriends".to_string());
let v = sql_value_to_value(
"a",
&ConcreteDataType::json_datatype(),
&sql_val,
None,
None,
false,
);
let v = call_sql_value_to_value!("a", ConcreteDataType::json_datatype(), &sql_val);
assert!(v.is_err());
let sql_val = SqlValue::DoubleQuotedString(r#"{"a":"b"}"#.to_string());
let v = sql_value_to_value(
"a",
&ConcreteDataType::json_datatype(),
&sql_val,
None,
None,
false,
)
.unwrap();
let v = call_sql_value_to_value!("a", ConcreteDataType::json_datatype(), &sql_val)?;
assert_eq!(
Value::Binary(Bytes::from(
jsonb::parse_value(r#"{"a":"b"}"#.as_bytes())
@@ -808,16 +719,15 @@ mod test {
)),
v
);
Ok(())
}
#[test]
fn test_parse_json_to_jsonb() {
match parse_string_to_value(
match call_parse_string_to_value!(
"json_col",
r#"{"a": "b"}"#.to_string(),
&ConcreteDataType::json_datatype(),
None,
false,
ConcreteDataType::json_datatype()
) {
Ok(Value::Binary(b)) => {
assert_eq!(
@@ -833,12 +743,10 @@ mod test {
}
assert!(
parse_string_to_value(
call_parse_string_to_value!(
"json_col",
r#"Nicola Kovac is the best rifler in the world"#.to_string(),
&ConcreteDataType::json_datatype(),
None,
false,
ConcreteDataType::json_datatype()
)
.is_err()
)
@@ -878,13 +786,10 @@ mod test {
#[test]
fn test_parse_date_literal() {
let value = sql_value_to_value(
let value = call_sql_value_to_value!(
"date",
&ConcreteDataType::date_datatype(),
&SqlValue::DoubleQuotedString("2022-02-22".to_string()),
None,
None,
false,
ConcreteDataType::date_datatype(),
&SqlValue::DoubleQuotedString("2022-02-22".to_string())
)
.unwrap();
assert_eq!(ConcreteDataType::date_datatype(), value.data_type());
@@ -895,13 +800,11 @@ mod test {
}
// with timezone
let value = sql_value_to_value(
let value = call_sql_value_to_value!(
"date",
&ConcreteDataType::date_datatype(),
ConcreteDataType::date_datatype(),
&SqlValue::DoubleQuotedString("2022-02-22".to_string()),
Some(&Timezone::from_tz_string("+07:00").unwrap()),
None,
false,
timezone = &Timezone::from_tz_string("+07:00").unwrap()
)
.unwrap();
assert_eq!(ConcreteDataType::date_datatype(), value.data_type());
@@ -913,16 +816,12 @@ mod test {
}
#[test]
fn test_parse_timestamp_literal() {
match parse_string_to_value(
fn test_parse_timestamp_literal() -> Result<()> {
match call_parse_string_to_value!(
"timestamp_col",
"2022-02-22T00:01:01+08:00".to_string(),
&ConcreteDataType::timestamp_millisecond_datatype(),
None,
false,
)
.unwrap()
{
ConcreteDataType::timestamp_millisecond_datatype()
)? {
Value::Timestamp(ts) => {
assert_eq!(1645459261000, ts.value());
assert_eq!(TimeUnit::Millisecond, ts.unit());
@@ -932,15 +831,11 @@ mod test {
}
}
match parse_string_to_value(
match call_parse_string_to_value!(
"timestamp_col",
"2022-02-22T00:01:01+08:00".to_string(),
&ConcreteDataType::timestamp_datatype(TimeUnit::Second),
None,
false,
)
.unwrap()
{
ConcreteDataType::timestamp_datatype(TimeUnit::Second)
)? {
Value::Timestamp(ts) => {
assert_eq!(1645459261, ts.value());
assert_eq!(TimeUnit::Second, ts.unit());
@@ -950,15 +845,11 @@ mod test {
}
}
match parse_string_to_value(
match call_parse_string_to_value!(
"timestamp_col",
"2022-02-22T00:01:01+08:00".to_string(),
&ConcreteDataType::timestamp_datatype(TimeUnit::Microsecond),
None,
false,
)
.unwrap()
{
ConcreteDataType::timestamp_datatype(TimeUnit::Microsecond)
)? {
Value::Timestamp(ts) => {
assert_eq!(1645459261000000, ts.value());
assert_eq!(TimeUnit::Microsecond, ts.unit());
@@ -968,15 +859,11 @@ mod test {
}
}
match parse_string_to_value(
match call_parse_string_to_value!(
"timestamp_col",
"2022-02-22T00:01:01+08:00".to_string(),
&ConcreteDataType::timestamp_datatype(TimeUnit::Nanosecond),
None,
false,
)
.unwrap()
{
ConcreteDataType::timestamp_datatype(TimeUnit::Nanosecond)
)? {
Value::Timestamp(ts) => {
assert_eq!(1645459261000000000, ts.value());
assert_eq!(TimeUnit::Nanosecond, ts.unit());
@@ -987,26 +874,21 @@ mod test {
}
assert!(
parse_string_to_value(
call_parse_string_to_value!(
"timestamp_col",
"2022-02-22T00:01:01+08".to_string(),
&ConcreteDataType::timestamp_datatype(TimeUnit::Nanosecond),
None,
false,
ConcreteDataType::timestamp_datatype(TimeUnit::Nanosecond)
)
.is_err()
);
// with timezone
match parse_string_to_value(
match call_parse_string_to_value!(
"timestamp_col",
"2022-02-22T00:01:01".to_string(),
&ConcreteDataType::timestamp_datatype(TimeUnit::Nanosecond),
Some(&Timezone::from_tz_string("Asia/Shanghai").unwrap()),
false,
)
.unwrap()
{
ConcreteDataType::timestamp_datatype(TimeUnit::Nanosecond),
timezone = &Timezone::from_tz_string("Asia/Shanghai").unwrap()
)? {
Value::Timestamp(ts) => {
assert_eq!(1645459261000000000, ts.value());
assert_eq!("2022-02-21 16:01:01+0000", ts.to_iso8601_string());
@@ -1016,51 +898,42 @@ mod test {
unreachable!()
}
}
Ok(())
}
#[test]
fn test_parse_placeholder_value() {
assert!(
sql_value_to_value(
call_sql_value_to_value!(
"test",
&ConcreteDataType::string_datatype(),
ConcreteDataType::string_datatype(),
&SqlValue::Placeholder("default".into())
)
.is_err()
);
assert!(
call_sql_value_to_value!(
"test",
ConcreteDataType::string_datatype(),
&SqlValue::Placeholder("default".into()),
None,
None,
false
unary_op = UnaryOperator::Minus
)
.is_err()
);
assert!(
sql_value_to_value(
call_sql_value_to_value!(
"test",
&ConcreteDataType::string_datatype(),
&SqlValue::Placeholder("default".into()),
None,
Some(UnaryOperator::Minus),
false
)
.is_err()
);
assert!(
sql_value_to_value(
"test",
&ConcreteDataType::uint16_datatype(),
ConcreteDataType::uint16_datatype(),
&SqlValue::Number("3".into(), false),
None,
Some(UnaryOperator::Minus),
false
unary_op = UnaryOperator::Minus
)
.is_err()
);
assert!(
sql_value_to_value(
call_sql_value_to_value!(
"test",
&ConcreteDataType::uint16_datatype(),
&SqlValue::Number("3".into(), false),
None,
None,
false
ConcreteDataType::uint16_datatype(),
&SqlValue::Number("3".into(), false)
)
.is_ok()
);
@@ -1070,77 +943,60 @@ mod test {
fn test_auto_string_to_numeric() {
// Test with auto_string_to_numeric=true
let sql_val = SqlValue::SingleQuotedString("123".to_string());
let v = sql_value_to_value(
let v = call_sql_value_to_value!(
"a",
&ConcreteDataType::int32_datatype(),
ConcreteDataType::int32_datatype(),
&sql_val,
None,
None,
true,
auto_string_to_numeric
)
.unwrap();
assert_eq!(Value::Int32(123), v);
// Test with a float string
let sql_val = SqlValue::SingleQuotedString("3.5".to_string());
let v = sql_value_to_value(
let v = call_sql_value_to_value!(
"a",
&ConcreteDataType::float64_datatype(),
ConcreteDataType::float64_datatype(),
&sql_val,
None,
None,
true,
auto_string_to_numeric
)
.unwrap();
assert_eq!(Value::Float64(OrderedFloat(3.5)), v);
// Test with auto_string_to_numeric=false
let sql_val = SqlValue::SingleQuotedString("123".to_string());
let v = sql_value_to_value(
"a",
&ConcreteDataType::int32_datatype(),
&sql_val,
None,
None,
false,
);
let v = call_sql_value_to_value!("a", ConcreteDataType::int32_datatype(), &sql_val);
assert!(v.is_err());
// Test with an invalid numeric string but auto_string_to_numeric=true
// Should return an error now with the new auto_cast_to_numeric behavior
let sql_val = SqlValue::SingleQuotedString("not_a_number".to_string());
let v = sql_value_to_value(
let v = call_sql_value_to_value!(
"a",
&ConcreteDataType::int32_datatype(),
ConcreteDataType::int32_datatype(),
&sql_val,
None,
None,
true,
auto_string_to_numeric
);
assert!(v.is_err());
// Test with boolean type
let sql_val = SqlValue::SingleQuotedString("true".to_string());
let v = sql_value_to_value(
let v = call_sql_value_to_value!(
"a",
&ConcreteDataType::boolean_datatype(),
ConcreteDataType::boolean_datatype(),
&sql_val,
None,
None,
true,
auto_string_to_numeric
)
.unwrap();
assert_eq!(Value::Boolean(true), v);
// Non-numeric types should still be handled normally
let sql_val = SqlValue::SingleQuotedString("hello".to_string());
let v = sql_value_to_value(
let v = call_sql_value_to_value!(
"a",
&ConcreteDataType::string_datatype(),
ConcreteDataType::string_datatype(),
&sql_val,
None,
None,
true,
auto_string_to_numeric
);
assert!(v.is_ok());
}

View File

@@ -14,8 +14,8 @@
use common_time::timezone::Timezone;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnDefaultConstraint;
use datatypes::schema::constraint::{CURRENT_TIMESTAMP, CURRENT_TIMESTAMP_FN};
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema};
use snafu::ensure;
use sqlparser::ast::ValueWithSpan;
pub use sqlparser::ast::{
@@ -47,9 +47,12 @@ pub fn parse_column_default_constraint(
);
let default_constraint = match &opt.option {
ColumnOption::Default(Expr::Value(v)) => ColumnDefaultConstraint::Value(
sql_value_to_value(column_name, data_type, &v.value, timezone, None, false)?,
),
ColumnOption::Default(Expr::Value(v)) => {
let schema = ColumnSchema::new(column_name, data_type.clone(), true);
ColumnDefaultConstraint::Value(sql_value_to_value(
&schema, &v.value, timezone, None, false,
)?)
}
ColumnOption::Default(Expr::Function(func)) => {
let mut func = format!("{func}").to_lowercase();
// normalize CURRENT_TIMESTAMP to CURRENT_TIMESTAMP()
@@ -80,8 +83,7 @@ pub fn parse_column_default_constraint(
if let Expr::Value(v) = &**expr {
let value = sql_value_to_value(
column_name,
data_type,
&ColumnSchema::new(column_name, data_type.clone(), true),
&v.value,
timezone,
Some(*op),

View File

@@ -26,9 +26,9 @@ use std::sync::Arc;
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value as Json};
use snafu::{ResultExt, ensure};
use snafu::{OptionExt, ResultExt, ensure};
use crate::error::{self, Error};
use crate::error::{self, InvalidJsonSnafu, Result, SerializeSnafu};
use crate::json::value::{JsonValue, JsonVariant};
use crate::types::json_type::{JsonNativeType, JsonNumberType, JsonObjectType};
use crate::types::{StructField, StructType};
@@ -71,7 +71,7 @@ impl JsonStructureSettings {
pub const RAW_FIELD: &'static str = "_raw";
/// Decode an encoded StructValue back into a serde_json::Value.
pub fn decode(&self, value: Value) -> Result<Json, Error> {
pub fn decode(&self, value: Value) -> Result<Json> {
let context = JsonContext {
key_path: String::new(),
settings: self,
@@ -82,7 +82,7 @@ impl JsonStructureSettings {
/// Decode a StructValue that was encoded with current settings back into a fully structured StructValue.
/// This is useful for reconstructing the original structure from encoded data, especially when
/// unstructured encoding was used for some fields.
pub fn decode_struct(&self, struct_value: StructValue) -> Result<StructValue, Error> {
pub fn decode_struct(&self, struct_value: StructValue) -> Result<StructValue> {
let context = JsonContext {
key_path: String::new(),
settings: self,
@@ -91,7 +91,11 @@ impl JsonStructureSettings {
}
/// Encode a serde_json::Value into a Value::Json using current settings.
pub fn encode(&self, json: Json) -> Result<Value, Error> {
pub fn encode(&self, json: Json) -> Result<Value> {
if let Some(json_struct) = self.json_struct() {
return encode_by_struct(json_struct, json);
}
let context = JsonContext {
key_path: String::new(),
settings: self,
@@ -104,13 +108,21 @@ impl JsonStructureSettings {
&self,
json: Json,
data_type: Option<&JsonNativeType>,
) -> Result<Value, Error> {
) -> Result<Value> {
let context = JsonContext {
key_path: String::new(),
settings: self,
};
encode_json_with_context(json, data_type, &context).map(|v| Value::Json(Box::new(v)))
}
fn json_struct(&self) -> Option<&StructType> {
match &self {
JsonStructureSettings::Structured(fields) => fields.as_ref(),
JsonStructureSettings::PartialUnstructuredByKey { fields, .. } => fields.as_ref(),
_ => None,
}
}
}
impl Default for JsonStructureSettings {
@@ -144,12 +156,54 @@ impl<'a> JsonContext<'a> {
}
}
fn encode_by_struct(json_struct: &StructType, mut json: Json) -> Result<Value> {
let Some(json_object) = json.as_object_mut() else {
return InvalidJsonSnafu {
value: "expect JSON object when struct is provided",
}
.fail();
};
let mut encoded = BTreeMap::new();
fn extract_field(json_object: &mut Map<String, Json>, field: &str) -> Result<Option<Json>> {
let (first, rest) = field.split_once('.').unwrap_or((field, ""));
if rest.is_empty() {
Ok(json_object.remove(first))
} else {
let Some(value) = json_object.get_mut(first) else {
return Ok(None);
};
let json_object = value.as_object_mut().with_context(|| InvalidJsonSnafu {
value: format!(r#"expect "{}" an object"#, first),
})?;
extract_field(json_object, rest)
}
}
let fields = json_struct.fields();
for field in fields.iter() {
let Some(field_value) = extract_field(json_object, field.name())? else {
continue;
};
let field_type: JsonNativeType = field.data_type().into();
let field_value = try_convert_to_expected_type(field_value, &field_type)?;
encoded.insert(field.name().to_string(), field_value);
}
let rest = serde_json::to_string(json_object).context(SerializeSnafu)?;
encoded.insert(JsonStructureSettings::RAW_FIELD.to_string(), rest.into());
let value: JsonValue = encoded.into();
Ok(Value::Json(Box::new(value)))
}
/// Main encoding function with key path tracking
pub fn encode_json_with_context<'a>(
json: Json,
data_type: Option<&JsonNativeType>,
context: &JsonContext<'a>,
) -> Result<JsonValue, Error> {
) -> Result<JsonValue> {
// Check if the entire encoding should be unstructured
if matches!(context.settings, JsonStructureSettings::UnstructuredRaw) {
let json_string = json.to_string();
@@ -215,7 +269,7 @@ fn encode_json_object_with_context<'a>(
mut json_object: Map<String, Json>,
fields: Option<&JsonObjectType>,
context: &JsonContext<'a>,
) -> Result<JsonValue, Error> {
) -> Result<JsonValue> {
let mut object = BTreeMap::new();
// First, process fields from the provided schema in their original order
if let Some(fields) = fields {
@@ -248,7 +302,7 @@ fn encode_json_array_with_context<'a>(
json_array: Vec<Json>,
item_type: Option<&JsonNativeType>,
context: &JsonContext<'a>,
) -> Result<JsonValue, Error> {
) -> Result<JsonValue> {
let json_array_len = json_array.len();
let mut items = Vec::with_capacity(json_array_len);
let mut element_type = item_type.cloned();
@@ -286,7 +340,7 @@ fn encode_json_value_with_context<'a>(
json: Json,
expected_type: Option<&JsonNativeType>,
context: &JsonContext<'a>,
) -> Result<JsonValue, Error> {
) -> Result<JsonValue> {
// Check if current key should be treated as unstructured
if context.is_unstructured_key() {
return Ok(json.to_string().into());
@@ -301,7 +355,7 @@ fn encode_json_value_with_context<'a>(
if let Some(expected) = expected_type
&& let Ok(value) = try_convert_to_expected_type(i, expected)
{
return Ok(value);
return Ok(value.into());
}
Ok(i.into())
} else if let Some(u) = n.as_u64() {
@@ -309,7 +363,7 @@ fn encode_json_value_with_context<'a>(
if let Some(expected) = expected_type
&& let Ok(value) = try_convert_to_expected_type(u, expected)
{
return Ok(value);
return Ok(value.into());
}
if u <= i64::MAX as u64 {
Ok((u as i64).into())
@@ -321,7 +375,7 @@ fn encode_json_value_with_context<'a>(
if let Some(expected) = expected_type
&& let Ok(value) = try_convert_to_expected_type(f, expected)
{
return Ok(value);
return Ok(value.into());
}
// Default to f64 for floating point numbers
@@ -335,7 +389,7 @@ fn encode_json_value_with_context<'a>(
if let Some(expected) = expected_type
&& let Ok(value) = try_convert_to_expected_type(s.as_str(), expected)
{
return Ok(value);
return Ok(value.into());
}
Ok(s.into())
}
@@ -345,10 +399,7 @@ fn encode_json_value_with_context<'a>(
}
/// Main decoding function with key path tracking
pub fn decode_value_with_context<'a>(
value: Value,
context: &JsonContext<'a>,
) -> Result<Json, Error> {
pub fn decode_value_with_context(value: Value, context: &JsonContext) -> Result<Json> {
// Check if the entire decoding should be unstructured
if matches!(context.settings, JsonStructureSettings::UnstructuredRaw) {
return decode_unstructured_value(value);
@@ -370,7 +421,7 @@ pub fn decode_value_with_context<'a>(
fn decode_struct_with_context<'a>(
struct_value: StructValue,
context: &JsonContext<'a>,
) -> Result<Json, Error> {
) -> Result<Json> {
let mut json_object = Map::with_capacity(struct_value.len());
let (items, fields) = struct_value.into_parts();
@@ -385,10 +436,7 @@ fn decode_struct_with_context<'a>(
}
/// Decode a list value to JSON array
fn decode_list_with_context<'a>(
list_value: ListValue,
context: &JsonContext<'a>,
) -> Result<Json, Error> {
fn decode_list_with_context(list_value: ListValue, context: &JsonContext) -> Result<Json> {
let mut json_array = Vec::with_capacity(list_value.len());
let data_items = list_value.take_items();
@@ -403,7 +451,7 @@ fn decode_list_with_context<'a>(
}
/// Decode unstructured value (stored as string)
fn decode_unstructured_value(value: Value) -> Result<Json, Error> {
fn decode_unstructured_value(value: Value) -> Result<Json> {
match value {
// Handle expected format: StructValue with single _raw field
Value::Struct(struct_value) => {
@@ -443,7 +491,7 @@ fn decode_unstructured_value(value: Value) -> Result<Json, Error> {
}
/// Decode primitive value to JSON
fn decode_primitive_value(value: Value) -> Result<Json, Error> {
fn decode_primitive_value(value: Value) -> Result<Json> {
match value {
Value::Null => Ok(Json::Null),
Value::Boolean(b) => Ok(Json::Bool(b)),
@@ -487,7 +535,7 @@ fn decode_primitive_value(value: Value) -> Result<Json, Error> {
fn decode_struct_with_settings<'a>(
struct_value: StructValue,
context: &JsonContext<'a>,
) -> Result<StructValue, Error> {
) -> Result<StructValue> {
// Check if we can return the struct directly (Structured case)
if matches!(context.settings, JsonStructureSettings::Structured(_)) {
return Ok(struct_value);
@@ -567,7 +615,7 @@ fn decode_struct_with_settings<'a>(
fn decode_list_with_settings<'a>(
list_value: ListValue,
context: &JsonContext<'a>,
) -> Result<ListValue, Error> {
) -> Result<ListValue> {
let mut items = Vec::with_capacity(list_value.len());
let (data_items, datatype) = list_value.into_parts();
@@ -592,7 +640,7 @@ fn decode_list_with_settings<'a>(
}
/// Helper function to decode a struct that was encoded with UnstructuredRaw settings
fn decode_unstructured_raw_struct(struct_value: StructValue) -> Result<StructValue, Error> {
fn decode_unstructured_raw_struct(struct_value: StructValue) -> Result<StructValue> {
// For UnstructuredRaw, the struct must have exactly one field named "_raw"
if struct_value.struct_type().fields().len() == 1 {
let field = &struct_value.struct_type().fields()[0];
@@ -636,12 +684,9 @@ fn decode_unstructured_raw_struct(struct_value: StructValue) -> Result<StructVal
}
/// Helper function to try converting a value to an expected type
fn try_convert_to_expected_type<T>(
value: T,
expected_type: &JsonNativeType,
) -> Result<JsonValue, Error>
fn try_convert_to_expected_type<T>(value: T, expected_type: &JsonNativeType) -> Result<JsonVariant>
where
T: Into<JsonValue>,
T: Into<JsonVariant>,
{
let value = value.into();
let cast_error = || {
@@ -650,7 +695,7 @@ where
}
.fail()
};
let actual_type = value.json_type().native_type();
let actual_type = &value.native_type();
match (actual_type, expected_type) {
(x, y) if x == y => Ok(value),
(JsonNativeType::Number(x), JsonNativeType::Number(y)) => match (x, y) {
@@ -691,6 +736,107 @@ mod tests {
use crate::data_type::ConcreteDataType;
use crate::types::ListType;
#[test]
fn test_encode_by_struct() {
let json_struct: StructType = [
StructField::new("s", ConcreteDataType::string_datatype(), true),
StructField::new("foo.i", ConcreteDataType::int64_datatype(), true),
StructField::new("x.y.z", ConcreteDataType::boolean_datatype(), true),
]
.into();
let json = json!({
"s": "hello",
"t": "world",
"foo": {
"i": 1,
"j": 2
},
"x": {
"y": {
"z": true
}
}
});
let value = encode_by_struct(&json_struct, json).unwrap();
assert_eq!(
value.to_string(),
r#"Json({ _raw: {"foo":{"j":2},"t":"world","x":{"y":{}}}, foo.i: 1, s: hello, x.y.z: true })"#
);
let json = json!({
"t": "world",
"foo": {
"i": 1,
"j": 2
},
"x": {
"y": {
"z": true
}
}
});
let value = encode_by_struct(&json_struct, json).unwrap();
assert_eq!(
value.to_string(),
r#"Json({ _raw: {"foo":{"j":2},"t":"world","x":{"y":{}}}, foo.i: 1, x.y.z: true })"#
);
let json = json!({
"s": 1234,
"foo": {
"i": 1,
"j": 2
},
"x": {
"y": {
"z": true
}
}
});
let value = encode_by_struct(&json_struct, json).unwrap();
assert_eq!(
value.to_string(),
r#"Json({ _raw: {"foo":{"j":2},"x":{"y":{}}}, foo.i: 1, s: 1234, x.y.z: true })"#
);
let json = json!({
"s": "hello",
"t": "world",
"foo": {
"i": "bar",
"j": 2
},
"x": {
"y": {
"z": true
}
}
});
let result = encode_by_struct(&json_struct, json);
assert_eq!(
result.unwrap_err().to_string(),
"Cannot cast value bar to Number(I64)"
);
let json = json!({
"s": "hello",
"t": "world",
"foo": {
"i": 1,
"j": 2
},
"x": {
"y": "z"
}
});
let result = encode_by_struct(&json_struct, json);
assert_eq!(
result.unwrap_err().to_string(),
r#"Invalid JSON: expect "y" an object"#
);
}
#[test]
fn test_encode_json_null() {
let json = Json::Null;

View File

@@ -82,6 +82,18 @@ impl From<f64> for JsonNumber {
}
}
impl From<Number> for JsonNumber {
fn from(n: Number) -> Self {
if let Some(i) = n.as_i64() {
i.into()
} else if let Some(i) = n.as_u64() {
i.into()
} else {
n.as_f64().unwrap_or(f64::NAN).into()
}
}
}
impl Display for JsonNumber {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
@@ -109,7 +121,28 @@ pub enum JsonVariant {
}
impl JsonVariant {
fn native_type(&self) -> JsonNativeType {
pub(crate) fn as_i64(&self) -> Option<i64> {
match self {
JsonVariant::Number(n) => n.as_i64(),
_ => None,
}
}
pub(crate) fn as_u64(&self) -> Option<u64> {
match self {
JsonVariant::Number(n) => n.as_u64(),
_ => None,
}
}
pub(crate) fn as_f64(&self) -> Option<f64> {
match self {
JsonVariant::Number(n) => Some(n.as_f64()),
_ => None,
}
}
pub(crate) fn native_type(&self) -> JsonNativeType {
match self {
JsonVariant::Null => JsonNativeType::Null,
JsonVariant::Bool(_) => JsonNativeType::Bool,
@@ -205,6 +238,32 @@ impl<K: Into<String>, V: Into<JsonVariant>, const N: usize> From<[(K, V); N]> fo
}
}
impl From<serde_json::Value> for JsonVariant {
fn from(v: serde_json::Value) -> Self {
fn helper(v: serde_json::Value) -> JsonVariant {
match v {
serde_json::Value::Null => JsonVariant::Null,
serde_json::Value::Bool(b) => b.into(),
serde_json::Value::Number(n) => n.into(),
serde_json::Value::String(s) => s.into(),
serde_json::Value::Array(array) => {
JsonVariant::Array(array.into_iter().map(helper).collect())
}
serde_json::Value::Object(object) => {
JsonVariant::Object(object.into_iter().map(|(k, v)| (k, helper(v))).collect())
}
}
}
helper(v)
}
}
impl From<BTreeMap<String, JsonVariant>> for JsonVariant {
fn from(v: BTreeMap<String, JsonVariant>) -> Self {
Self::Object(v)
}
}
impl Display for JsonVariant {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
@@ -277,24 +336,11 @@ impl JsonValue {
}
pub(crate) fn as_i64(&self) -> Option<i64> {
match self.json_variant {
JsonVariant::Number(n) => n.as_i64(),
_ => None,
}
self.json_variant.as_i64()
}
pub(crate) fn as_u64(&self) -> Option<u64> {
match self.json_variant {
JsonVariant::Number(n) => n.as_u64(),
_ => None,
}
}
pub(crate) fn as_f64(&self) -> Option<f64> {
match self.json_variant {
JsonVariant::Number(n) => Some(n.as_f64()),
_ => None,
}
self.json_variant.as_u64()
}
pub(crate) fn as_f64_lossy(&self) -> Option<f64> {

View File

@@ -122,9 +122,9 @@ pub struct StructField {
}
impl StructField {
pub fn new(name: String, data_type: ConcreteDataType, nullable: bool) -> Self {
pub fn new<T: Into<String>>(name: T, data_type: ConcreteDataType, nullable: bool) -> Self {
StructField {
name,
name: name.into(),
data_type,
nullable,
metadata: BTreeMap::new(),

View File

@@ -410,8 +410,7 @@ fn sql_value_to_value(
})?
} else {
common_sql::convert::sql_value_to_value(
column,
&column_schema.data_type,
column_schema,
sql_val,
timezone,
None,

View File

@@ -52,6 +52,7 @@ use common_time::Timestamp;
use common_time::range::TimestampRange;
use datafusion_expr::LogicalPlan;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use humantime::format_duration;
use itertools::Itertools;
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
@@ -644,11 +645,20 @@ impl StatementExecutor {
})?
.unit();
let start_column = ColumnSchema::new(
"range_start",
ConcreteDataType::timestamp_datatype(time_unit),
false,
);
let end_column = ColumnSchema::new(
"range_end",
ConcreteDataType::timestamp_datatype(time_unit),
false,
);
let mut time_ranges = Vec::with_capacity(sql_values_time_range.len());
for (start, end) in sql_values_time_range {
let start = common_sql::convert::sql_value_to_value(
"range_start",
&ConcreteDataType::timestamp_datatype(time_unit),
&start_column,
start,
Some(&query_ctx.timezone()),
None,
@@ -667,8 +677,7 @@ impl StatementExecutor {
})?;
let end = common_sql::convert::sql_value_to_value(
"range_end",
&ConcreteDataType::timestamp_datatype(time_unit),
&end_column,
end,
Some(&query_ctx.timezone()),
None,

View File

@@ -242,8 +242,12 @@ fn values_to_vectors_by_exact_types(
args.iter()
.zip(exact_types.iter())
.map(|(value, data_type)| {
let data_type = &ConcreteDataType::from_arrow_type(data_type);
let value = sql_value_to_value(DUMMY_COLUMN, data_type, value, tz, None, false)
let schema = ColumnSchema::new(
DUMMY_COLUMN,
ConcreteDataType::from_arrow_type(data_type),
true,
);
let value = sql_value_to_value(&schema, value, tz, None, false)
.context(error::SqlCommonSnafu)?;
Ok(value_to_vector(value))
@@ -260,10 +264,12 @@ fn values_to_vectors_by_valid_types(
args.iter()
.map(|value| {
for data_type in valid_types {
let data_type = &ConcreteDataType::from_arrow_type(data_type);
if let Ok(value) =
sql_value_to_value(DUMMY_COLUMN, data_type, value, tz, None, false)
{
let schema = ColumnSchema::new(
DUMMY_COLUMN,
ConcreteDataType::from_arrow_type(data_type),
true,
);
if let Ok(value) = sql_value_to_value(&schema, value, tz, None, false) {
return Ok(value_to_vector(value));
}
}

View File

@@ -50,7 +50,7 @@ use common_time::{Timestamp, Timezone};
use datafusion_common::tree_node::TreeNodeVisitor;
use datafusion_expr::LogicalPlan;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{RawSchema, Schema};
use datatypes::schema::{ColumnSchema, RawSchema, Schema};
use datatypes::value::Value;
use partition::expr::{Operand, PartitionExpr, RestrictedOp};
use partition::multi_dim::MultiDimPartitionRule;
@@ -2001,8 +2001,7 @@ fn convert_value(
unary_op: Option<UnaryOperator>,
) -> Result<Value> {
sql_value_to_value(
"<NONAME>",
&data_type,
&ColumnSchema::new("<NONAME>", data_type, true),
value,
Some(timezone),
unary_op,

View File

@@ -22,6 +22,7 @@ use common_time::{Date, Timestamp};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_expr::LogicalPlan;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::types::TimestampType;
use datatypes::value::{self, Value};
use itertools::Itertools;
@@ -254,9 +255,10 @@ pub fn convert_value(param: &ParamValue, t: &ConcreteDataType) -> Result<ScalarV
/// Convert an MySQL expression to a scalar value.
/// It automatically handles the conversion of strings to numeric values.
pub fn convert_expr_to_scalar_value(param: &Expr, t: &ConcreteDataType) -> Result<ScalarValue> {
let column_schema = ColumnSchema::new("", t.clone(), true);
match param {
Expr::Value(v) => {
let v = sql_value_to_value("", t, &v.value, None, None, true);
let v = sql_value_to_value(&column_schema, &v.value, None, None, true);
match v {
Ok(v) => v
.try_to_scalar_value(t)
@@ -268,7 +270,7 @@ pub fn convert_expr_to_scalar_value(param: &Expr, t: &ConcreteDataType) -> Resul
}
}
Expr::UnaryOp { op, expr } if let Expr::Value(v) = &**expr => {
let v = sql_value_to_value("", t, &v.value, None, Some(*op), true);
let v = sql_value_to_value(&column_schema, &v.value, None, Some(*op), true);
match v {
Ok(v) => v
.try_to_scalar_value(t)

View File

@@ -40,4 +40,8 @@ impl Dialect for GreptimeDbDialect {
fn supports_filter_during_aggregation(&self) -> bool {
true
}
fn supports_struct_literal(&self) -> bool {
true
}
}

View File

@@ -215,6 +215,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Invalid JSON structure setting, reason: {reason}"))]
InvalidJsonStructureSetting {
reason: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to serialize column default constraint"))]
SerializeColumnDefaultConstraint {
#[snafu(implicit)]
@@ -374,6 +381,7 @@ impl ErrorExt for Error {
InvalidColumnOption { .. }
| InvalidExprAsOptionValue { .. }
| InvalidJsonStructureSetting { .. }
| InvalidDatabaseName { .. }
| InvalidDatabaseOption { .. }
| ColumnTypeMismatch { .. }

View File

@@ -40,16 +40,17 @@ pub(super) fn parse_json_datatype_options(parser: &mut Parser<'_>) -> Result<Opt
#[cfg(test)]
mod tests {
use sqlparser::ast::DataType;
use sqlparser::ast::{DataType, Expr, Ident, StructField};
use crate::dialect::GreptimeDbDialect;
use crate::parser::{ParseOptions, ParserContext};
use crate::statements::OptionMap;
use crate::statements::create::{
Column, JSON_FORMAT_FULL_STRUCTURED, JSON_FORMAT_PARTIAL, JSON_FORMAT_RAW, JSON_OPT_FORMAT,
JSON_OPT_UNSTRUCTURED_KEYS,
Column, JSON_FORMAT_FULL_STRUCTURED, JSON_FORMAT_PARTIAL, JSON_FORMAT_RAW, JSON_OPT_FIELDS,
JSON_OPT_FORMAT, JSON_OPT_UNSTRUCTURED_KEYS,
};
use crate::statements::statement::Statement;
use crate::util::OptionValue;
#[test]
fn test_parse_json_datatype_options() {
@@ -77,6 +78,42 @@ mod tests {
let sql = r#"
CREATE TABLE json_data (
my_json JSON(format = "partial", fields = Struct<i Int, "o.a" String, "o.b" String, `x.y.z` Float64>),
ts TIMESTAMP TIME INDEX,
)"#;
let options = parse(sql).unwrap();
assert_eq!(options.len(), 2);
let option = options.value(JSON_OPT_FIELDS);
let expected = OptionValue::try_new(Expr::Struct {
values: vec![],
fields: vec![
StructField {
field_name: Some(Ident::new("i")),
field_type: DataType::Int(None),
options: None,
},
StructField {
field_name: Some(Ident::with_quote('"', "o.a")),
field_type: DataType::String(None),
options: None,
},
StructField {
field_name: Some(Ident::with_quote('"', "o.b")),
field_type: DataType::String(None),
options: None,
},
StructField {
field_name: Some(Ident::with_quote('`', "x.y.z")),
field_type: DataType::Float64,
options: None,
},
],
})
.ok();
assert_eq!(option, expected.as_ref());
let sql = r#"
CREATE TABLE json_data (
my_json JSON(format = "partial", unstructured_keys = ["k", "foo.bar", "a.b.c"]),
ts TIMESTAMP TIME INDEX,
)"#;

View File

@@ -40,6 +40,7 @@ use api::v1::SemanticType;
use common_sql::default_constraint::parse_column_default_constraint;
use common_time::timezone::Timezone;
use datatypes::extension::json::{JsonExtensionType, JsonMetadata};
use datatypes::json::JsonStructureSettings;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{COMMENT_KEY, ColumnDefaultConstraint, ColumnSchema};
use datatypes::types::json_type::JsonNativeType;
@@ -281,8 +282,17 @@ pub fn sql_data_type_to_concrete_data_type(
}
},
SqlDataType::JSON => {
let format = if column_extensions.json_datatype_options.is_some() {
JsonFormat::Native(Box::new(JsonNativeType::Null))
let format = if let Some(x) = column_extensions.build_json_structure_settings()? {
if let Some(fields) = match x {
JsonStructureSettings::Structured(fields) => fields,
JsonStructureSettings::UnstructuredRaw => None,
JsonStructureSettings::PartialUnstructuredByKey { fields, .. } => fields,
} {
let datatype = &ConcreteDataType::Struct(fields);
JsonFormat::Native(Box::new(datatype.into()))
} else {
JsonFormat::Native(Box::new(JsonNativeType::Null))
}
} else {
JsonFormat::Jsonb
};

View File

@@ -14,27 +14,30 @@
use std::collections::{HashMap, HashSet};
use std::fmt::{Display, Formatter};
use std::sync::Arc;
use common_catalog::consts::FILE_ENGINE;
use datatypes::data_type::ConcreteDataType;
use datatypes::json::JsonStructureSettings;
use datatypes::schema::{
FulltextOptions, SkippingIndexOptions, VectorDistanceMetric, VectorIndexEngineType,
VectorIndexOptions,
};
use datatypes::types::StructType;
use itertools::Itertools;
use serde::Serialize;
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use sqlparser::ast::{ColumnOptionDef, DataType, Expr, Query};
use sqlparser_derive::{Visit, VisitMut};
use crate::ast::{ColumnDef, Ident, ObjectName, Value as SqlValue};
use crate::error::{
InvalidFlowQuerySnafu, InvalidSqlSnafu, Result, SetFulltextOptionSnafu,
SetSkippingIndexOptionSnafu,
InvalidFlowQuerySnafu, InvalidJsonStructureSettingSnafu, InvalidSqlSnafu, Result,
SetFulltextOptionSnafu, SetSkippingIndexOptionSnafu,
};
use crate::statements::OptionMap;
use crate::statements::statement::Statement;
use crate::statements::tql::Tql;
use crate::statements::{OptionMap, sql_data_type_to_concrete_data_type};
use crate::util::OptionValue;
const LINE_SEP: &str = ",\n";
@@ -44,6 +47,7 @@ pub const VECTOR_OPT_DIM: &str = "dim";
pub const JSON_OPT_UNSTRUCTURED_KEYS: &str = "unstructured_keys";
pub const JSON_OPT_FORMAT: &str = "format";
pub(crate) const JSON_OPT_FIELDS: &str = "fields";
pub const JSON_FORMAT_FULL_STRUCTURED: &str = "structured";
pub const JSON_FORMAT_RAW: &str = "raw";
pub const JSON_FORMAT_PARTIAL: &str = "partial";
@@ -346,14 +350,51 @@ impl ColumnExtensions {
})
.unwrap_or_default();
let fields = if let Some(value) = options.value(JSON_OPT_FIELDS) {
let fields = value
.as_struct_fields()
.context(InvalidJsonStructureSettingSnafu {
reason: format!(r#"expect "{JSON_OPT_FIELDS}" a struct, actual: "{value}""#,),
})?;
let fields = fields
.iter()
.map(|field| {
let name = field.field_name.as_ref().map(|x| x.value.clone()).context(
InvalidJsonStructureSettingSnafu {
reason: format!(r#"missing field name in "{field}""#),
},
)?;
let datatype = sql_data_type_to_concrete_data_type(
&field.field_type,
&Default::default(),
)?;
Ok(datatypes::types::StructField::new(name, datatype, true))
})
.collect::<Result<_>>()?;
Some(StructType::new(Arc::new(fields)))
} else {
None
};
options
.get(JSON_OPT_FORMAT)
.map(|format| match format {
JSON_FORMAT_FULL_STRUCTURED => Ok(JsonStructureSettings::Structured(None)),
JSON_FORMAT_PARTIAL => Ok(JsonStructureSettings::PartialUnstructuredByKey {
fields: None,
unstructured_keys,
}),
JSON_FORMAT_FULL_STRUCTURED => Ok(JsonStructureSettings::Structured(fields)),
JSON_FORMAT_PARTIAL => {
let fields = fields.map(|fields| {
let mut fields = Arc::unwrap_or_clone(fields.fields());
fields.push(datatypes::types::StructField::new(
JsonStructureSettings::RAW_FIELD.to_string(),
ConcreteDataType::string_datatype(),
true,
));
StructType::new(Arc::new(fields))
});
Ok(JsonStructureSettings::PartialUnstructuredByKey {
fields,
unstructured_keys,
})
}
JSON_FORMAT_RAW => Ok(JsonStructureSettings::UnstructuredRaw),
_ => InvalidSqlSnafu {
msg: format!("unknown JSON datatype 'format': {format}"),

View File

@@ -19,7 +19,8 @@ use itertools::Itertools;
use serde::Serialize;
use snafu::ensure;
use sqlparser::ast::{
Array, Expr, Ident, ObjectName, SetExpr, SqlOption, TableFactor, Value, ValueWithSpan,
Array, Expr, Ident, ObjectName, SetExpr, SqlOption, StructField, TableFactor, Value,
ValueWithSpan,
};
use sqlparser_derive::{Visit, VisitMut};
@@ -52,9 +53,12 @@ pub fn format_raw_object_name(name: &ObjectName) -> String {
pub struct OptionValue(Expr);
impl OptionValue {
fn try_new(expr: Expr) -> Result<Self> {
pub(crate) fn try_new(expr: Expr) -> Result<Self> {
ensure!(
matches!(expr, Expr::Value(_) | Expr::Identifier(_) | Expr::Array(_)),
matches!(
expr,
Expr::Value(_) | Expr::Identifier(_) | Expr::Array(_) | Expr::Struct { .. }
),
InvalidExprAsOptionValueSnafu {
error: format!("{expr} not accepted")
}
@@ -106,6 +110,13 @@ impl OptionValue {
_ => None,
}
}
pub(crate) fn as_struct_fields(&self) -> Option<&[StructField]> {
match &self.0 {
Expr::Struct { fields, .. } => Some(fields),
_ => None,
}
}
}
impl From<String> for OptionValue {

View File

@@ -0,0 +1,10 @@
{"did":"did:plc:yj3sjq3blzpynh27cumnp5ks","time_us":1732206349000167,"kind":"commit","commit":{"rev":"3lbhtytnn2k2f","operation":"create","collection":"app.bsky.feed.post","rkey":"3lbhtyteurk2y","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:09:27.095Z","langs":["en"],"reply":{"parent":{"cid":"bafyreibfglofvqou2yiqvwzk4rcgkhhxrbunyemshdjledgwymimqkg24e","uri":"at://did:plc:6tr6tuzlx2db3rduzr2d6r24/app.bsky.feed.post/3lbhqo2rtys2z"},"root":{"cid":"bafyreibfglofvqou2yiqvwzk4rcgkhhxrbunyemshdjledgwymimqkg24e","uri":"at://did:plc:6tr6tuzlx2db3rduzr2d6r24/app.bsky.feed.post/3lbhqo2rtys2z"}},"text":"aaaaah. LIght shines in a corner of WTF...."},"cid":"bafyreidblutgvj75o4q4akzyyejedjj6l3it6hgqwee6jpwv2wqph5fsgm"}}
{"did":"did:plc:3i4xf2v4wcnyktgv6satke64","time_us":1732206349000644,"kind":"commit","commit":{"rev":"3lbhuvzds6d2a","operation":"create","collection":"app.bsky.feed.like","rkey":"3lbhuvzdked2a","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:25:46.221Z","subject":{"cid":"bafyreidjvrcmckkm765mct5fph36x7kupkfo35rjklbf2k76xkzwyiauge","uri":"at://did:plc:azrv4rcbws6kmcga4fsbphg2/app.bsky.feed.post/3lbgjdpbiec2l"}},"cid":"bafyreia5l5vrkh5oj4cjyhcqby2dprhyvcyofo2q5562tijlae2pzih23m"}}
{"did":"did:plc:gccfnqqizz4urhchsaie6jft","time_us":1732206349001108,"kind":"commit","commit":{"rev":"3lbhuvze3gi2u","operation":"create","collection":"app.bsky.graph.follow","rkey":"3lbhuvzdtmi2u","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:27:40.923Z","subject":"did:plc:r7cdh4sgzqbfdc6wcdxxti7c"},"cid":"bafyreiew2p6cgirfaj45qoenm4fgumib7xoloclrap3jgkz5es7g7kby3i"}}
{"did":"did:plc:msxqf3twq7abtdw7dbfskphk","time_us":1732206349001372,"kind":"commit","commit":{"rev":"3lbhueija5p22","operation":"create","collection":"app.bsky.feed.like","rkey":"3lbhueiizcx22","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:15:58.232Z","subject":{"cid":"bafyreiavpshyqzrlo5m7fqodjhs6jevweqnif4phasiwimv4a7mnsqi2fe","uri":"at://did:plc:fusulxqc52zbrc75fi6xrcof/app.bsky.feed.post/3lbhskq5zn22f"}},"cid":"bafyreidjix4dauj2afjlbzmhj3a7gwftcevvmmy6edww6vrjdbst26rkby"}}
{"did":"did:plc:l5o3qjrmfztir54cpwlv2eme","time_us":1732206349001905,"kind":"commit","commit":{"rev":"3lbhtytohxc2o","operation":"create","collection":"app.bsky.feed.post","rkey":"3lbhtytjqzk2q","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:09:27.254Z","langs":["en"],"reply":{"parent":{"cid":"bafyreih35fe2jj3gchmgk4amold4l6sfxd2sby5wrg3jrws5fkdypxrbg4","uri":"at://did:plc:6wx2gg5yqgvmlu35r6y3bk6d/app.bsky.feed.post/3lbhtj2eb4s2o"},"root":{"cid":"bafyreifipyt3vctd4ptuoicvio7rbr5xvjv4afwuggnd2prnmn55mu6luu","uri":"at://did:plc:474ldquxwzrlcvjhhbbk2wte/app.bsky.feed.post/3lbhdzrynik27"}},"text":"okay i take mine back because I hadnt heard this one yet^^"},"cid":"bafyreigzdsdne3z2xxcakgisieyj7y47hj6eg7lj6v4q25ah5q2qotu5ku"}}
{"did":"did:plc:jkaaf5j2yb2pvpx3ualm3vbh","time_us":1732206349002758,"kind":"commit","commit":{"rev":"3lbhudfo3yi2w","operation":"create","collection":"app.bsky.graph.follow","rkey":"3lbhudfnw4y2w","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:15:21.495Z","subject":"did:plc:amsdn2tbjxo3xrwqneqhh4cm"},"cid":"bafyreiaa2vsdr4ckwjg4jq47zfd7mewidywfz3qh3dmglcd6ozi4xwdega"}}
{"did":"did:plc:tdwz2h4id5dxezvohftsmffu","time_us":1732206349003106,"kind":"commit","commit":{"rev":"3lbhujcp4ix2n","operation":"create","collection":"app.bsky.graph.follow","rkey":"3lbhujcoxmp2n","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:18:39.913Z","subject":"did:plc:gf3vum7insztt5rxrpxdz2id"},"cid":"bafyreihaatlpar3abtx6ck3kde2ksic6zzflk4ppduhf6dxurytqrv33ni"}}
{"did":"did:plc:cdsd346mwow7aj3tgfkwsct3","time_us":1732206349003461,"kind":"commit","commit":{"rev":"3lbhus5vior2t","operation":"create","collection":"app.bsky.feed.repost","rkey":"3lbhus5vbtz2t","record":{"$type":"app.bsky.feed.repost","createdAt":"2024-11-21T16:23:36.714Z","subject":{"cid":"bafyreieaacfiobnuqvjhhsndyi5s3fd6krbzdduxsyrzfv43kczpcmkl6y","uri":"at://did:plc:o5q6dynpme4ndolc3heztasm/app.bsky.feed.post/3lbfli3qsoc2o"}},"cid":"bafyreid5ycocp5zq2g7fcx2xxzxrbafuh7b5qhtwuwiomzo6vqila2cbpu"}}
{"did":"did:plc:s4bwqchfzm6gjqfeb6mexgbu","time_us":1732206349003907,"kind":"commit","commit":{"rev":"3lbhuvzeccx2w","operation":"create","collection":"app.bsky.feed.post","rkey":"3lbhuvxf4qs2m","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:25:44.376Z","langs":["en"],"reply":{"parent":{"cid":"bafyreiaev27cfcxxvn2pdhrwwquzwgclujnulzbcfnn4p4fwgb6migjhw4","uri":"at://did:plc:zec6cslvgc3hhdatrhk6pq5p/app.bsky.feed.post/3lbhujvds4c2b"},"root":{"cid":"bafyreif7qjxhvecwnhlynijj6pf47jwvtkahsz3zh2kaipwu2bw2dxwaqq","uri":"at://did:plc:s4bwqchfzm6gjqfeb6mexgbu/app.bsky.feed.post/3lbhug53kkk2m"}},"text":"\n⌜ Blinking. She hadn't realized she spoke out loud. ⌟\n\n It was nothing like that — . I was only thinking . . . \n\n⌜ Trailing off, her mind occupied. ⌟\n"},"cid":"bafyreibugobcike72y4zxvdyz2oopyt6ywwqfielcwojkb27p7s6rlomgm"}}
{"did":"did:plc:hbc74dlsxhq53kp5oxges6d7","time_us":1732206349004769,"kind":"commit","commit":{"rev":"3lbhuvzedg52j","operation":"create","collection":"app.bsky.feed.like","rkey":"3lbhuvzdyof2j","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:25:46.167Z","subject":{"cid":"bafyreiaumopip75nzx2xjbugtwemdppsyx54bd2odf6q45f3o7xkocgari","uri":"at://did:plc:ig2jv6gqup4t7gdq2pmanknw/app.bsky.feed.post/3lbhuvtlaec2c"}},"cid":"bafyreidjk2svg2fdjiiwohmfmvp3hdxhpb33ycnixzbkyib5m6cocindxq"}}

View File

@@ -0,0 +1,14 @@
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------+
| data | ts |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------+
| {_raw: {"commit":{"cid":"bafyreidblutgvj75o4q4akzyyejedjj6l3it6hgqwee6jpwv2wqph5fsgm","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:09:27.095Z","langs":["en"],"reply":{"parent":{"cid":"bafyreibfglofvqou2yiqvwzk4rcgkhhxrbunyemshdjledgwymimqkg24e","uri":"at://did:plc:6tr6tuzlx2db3rduzr2d6r24/app.bsky.feed.post/3lbhqo2rtys2z"},"root":{"cid":"bafyreibfglofvqou2yiqvwzk4rcgkhhxrbunyemshdjledgwymimqkg24e","uri":"at://did:plc:6tr6tuzlx2db3rduzr2d6r24/app.bsky.feed.post/3lbhqo2rtys2z"}},"text":"aaaaah. LIght shines in a corner of WTF...."},"rev":"3lbhtytnn2k2f","rkey":"3lbhtyteurk2y"}}, commit.collection: app.bsky.feed.post, commit.operation: create, did: did:plc:yj3sjq3blzpynh27cumnp5ks, kind: commit, time_us: 1732206349000167} | 1970-01-01T00:00:00.001 |
| {_raw: {"commit":{"cid":"bafyreia5l5vrkh5oj4cjyhcqby2dprhyvcyofo2q5562tijlae2pzih23m","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:25:46.221Z","subject":{"cid":"bafyreidjvrcmckkm765mct5fph36x7kupkfo35rjklbf2k76xkzwyiauge","uri":"at://did:plc:azrv4rcbws6kmcga4fsbphg2/app.bsky.feed.post/3lbgjdpbiec2l"}},"rev":"3lbhuvzds6d2a","rkey":"3lbhuvzdked2a"}}, commit.collection: app.bsky.feed.like, commit.operation: create, did: did:plc:3i4xf2v4wcnyktgv6satke64, kind: commit, time_us: 1732206349000644} | 1970-01-01T00:00:00.002 |
| {_raw: {"commit":{"cid":"bafyreiew2p6cgirfaj45qoenm4fgumib7xoloclrap3jgkz5es7g7kby3i","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:27:40.923Z","subject":"did:plc:r7cdh4sgzqbfdc6wcdxxti7c"},"rev":"3lbhuvze3gi2u","rkey":"3lbhuvzdtmi2u"}}, commit.collection: app.bsky.graph.follow, commit.operation: create, did: did:plc:gccfnqqizz4urhchsaie6jft, kind: commit, time_us: 1732206349001108} | 1970-01-01T00:00:00.003 |
| {_raw: {"commit":{"cid":"bafyreidjix4dauj2afjlbzmhj3a7gwftcevvmmy6edww6vrjdbst26rkby","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:15:58.232Z","subject":{"cid":"bafyreiavpshyqzrlo5m7fqodjhs6jevweqnif4phasiwimv4a7mnsqi2fe","uri":"at://did:plc:fusulxqc52zbrc75fi6xrcof/app.bsky.feed.post/3lbhskq5zn22f"}},"rev":"3lbhueija5p22","rkey":"3lbhueiizcx22"}}, commit.collection: app.bsky.feed.like, commit.operation: create, did: did:plc:msxqf3twq7abtdw7dbfskphk, kind: commit, time_us: 1732206349001372} | 1970-01-01T00:00:00.004 |
| {_raw: {"commit":{"cid":"bafyreigzdsdne3z2xxcakgisieyj7y47hj6eg7lj6v4q25ah5q2qotu5ku","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:09:27.254Z","langs":["en"],"reply":{"parent":{"cid":"bafyreih35fe2jj3gchmgk4amold4l6sfxd2sby5wrg3jrws5fkdypxrbg4","uri":"at://did:plc:6wx2gg5yqgvmlu35r6y3bk6d/app.bsky.feed.post/3lbhtj2eb4s2o"},"root":{"cid":"bafyreifipyt3vctd4ptuoicvio7rbr5xvjv4afwuggnd2prnmn55mu6luu","uri":"at://did:plc:474ldquxwzrlcvjhhbbk2wte/app.bsky.feed.post/3lbhdzrynik27"}},"text":"okay i take mine back because I hadnt heard this one yet^^"},"rev":"3lbhtytohxc2o","rkey":"3lbhtytjqzk2q"}}, commit.collection: app.bsky.feed.post, commit.operation: create, did: did:plc:l5o3qjrmfztir54cpwlv2eme, kind: commit, time_us: 1732206349001905} | 1970-01-01T00:00:00.005 |
| {_raw: {"commit":{"cid":"bafyreiaa2vsdr4ckwjg4jq47zfd7mewidywfz3qh3dmglcd6ozi4xwdega","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:15:21.495Z","subject":"did:plc:amsdn2tbjxo3xrwqneqhh4cm"},"rev":"3lbhudfo3yi2w","rkey":"3lbhudfnw4y2w"}}, commit.collection: app.bsky.graph.follow, commit.operation: create, did: did:plc:jkaaf5j2yb2pvpx3ualm3vbh, kind: commit, time_us: 1732206349002758} | 1970-01-01T00:00:00.006 |
| {_raw: {"commit":{"cid":"bafyreihaatlpar3abtx6ck3kde2ksic6zzflk4ppduhf6dxurytqrv33ni","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:18:39.913Z","subject":"did:plc:gf3vum7insztt5rxrpxdz2id"},"rev":"3lbhujcp4ix2n","rkey":"3lbhujcoxmp2n"}}, commit.collection: app.bsky.graph.follow, commit.operation: create, did: did:plc:tdwz2h4id5dxezvohftsmffu, kind: commit, time_us: 1732206349003106} | 1970-01-01T00:00:00.007 |
| {_raw: {"commit":{"cid":"bafyreid5ycocp5zq2g7fcx2xxzxrbafuh7b5qhtwuwiomzo6vqila2cbpu","record":{"$type":"app.bsky.feed.repost","createdAt":"2024-11-21T16:23:36.714Z","subject":{"cid":"bafyreieaacfiobnuqvjhhsndyi5s3fd6krbzdduxsyrzfv43kczpcmkl6y","uri":"at://did:plc:o5q6dynpme4ndolc3heztasm/app.bsky.feed.post/3lbfli3qsoc2o"}},"rev":"3lbhus5vior2t","rkey":"3lbhus5vbtz2t"}}, commit.collection: app.bsky.feed.repost, commit.operation: create, did: did:plc:cdsd346mwow7aj3tgfkwsct3, kind: commit, time_us: 1732206349003461} | 1970-01-01T00:00:00.008 |
| {_raw: {"commit":{"cid":"bafyreibugobcike72y4zxvdyz2oopyt6ywwqfielcwojkb27p7s6rlomgm","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:25:44.376Z","langs":["en"],"reply":{"parent":{"cid":"bafyreiaev27cfcxxvn2pdhrwwquzwgclujnulzbcfnn4p4fwgb6migjhw4","uri":"at://did:plc:zec6cslvgc3hhdatrhk6pq5p/app.bsky.feed.post/3lbhujvds4c2b"},"root":{"cid":"bafyreif7qjxhvecwnhlynijj6pf47jwvtkahsz3zh2kaipwu2bw2dxwaqq","uri":"at://did:plc:s4bwqchfzm6gjqfeb6mexgbu/app.bsky.feed.post/3lbhug53kkk2m"}},"text":"\n⌜ Blinking. She hadn't realized she spoke out loud. ⌟\n\n It was nothing like that — . I was only thinking . . . \n\n⌜ Trailing off, her mind occupied. ⌟\n"},"rev":"3lbhuvzeccx2w","rkey":"3lbhuvxf4qs2m"}}, commit.collection: app.bsky.feed.post, commit.operation: create, did: did:plc:s4bwqchfzm6gjqfeb6mexgbu, kind: commit, time_us: 1732206349003907} | 1970-01-01T00:00:00.009 |
| {_raw: {"commit":{"cid":"bafyreidjk2svg2fdjiiwohmfmvp3hdxhpb33ycnixzbkyib5m6cocindxq","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:25:46.167Z","subject":{"cid":"bafyreiaumopip75nzx2xjbugtwemdppsyx54bd2odf6q45f3o7xkocgari","uri":"at://did:plc:ig2jv6gqup4t7gdq2pmanknw/app.bsky.feed.post/3lbhuvtlaec2c"}},"rev":"3lbhuvzedg52j","rkey":"3lbhuvzdyof2j"}}, commit.collection: app.bsky.feed.like, commit.operation: create, did: did:plc:hbc74dlsxhq53kp5oxges6d7, kind: commit, time_us: 1732206349004769} | 1970-01-01T00:00:00.010 |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------+

View File

@@ -62,7 +62,6 @@ mod test {
use common_meta::rpc::router::region_distribution;
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_test_util::recordbatch::check_output_stream;
use frontend::instance::Instance;
use query::parser::QueryLanguageParser;
use query::query_engine::DefaultSerializer;
@@ -76,11 +75,10 @@ mod test {
use super::*;
use crate::standalone::GreptimeDbStandaloneBuilder;
use crate::test_util::execute_sql_and_expect;
use crate::tests;
use crate::tests::MockDistributedInstance;
use crate::tests::test_util::{
MockInstance, both_instances_cases, distributed, execute_sql, standalone,
};
use crate::tests::test_util::{MockInstance, both_instances_cases, distributed, standalone};
#[tokio::test(flavor = "multi_thread")]
async fn test_distributed_handle_ddl_request() {
@@ -1242,8 +1240,7 @@ CREATE TABLE {table_name} (
.unwrap();
assert!(matches!(output.data, OutputData::AffectedRows(3)));
let output = execute_sql(&frontend, "show create table auto_created_table").await;
let sql = "show create table auto_created_table";
let expected = r#"+--------------------+---------------------------------------------------+
| Table | Create Table |
+--------------------+---------------------------------------------------+
@@ -1261,6 +1258,6 @@ CREATE TABLE {table_name} (
| | 'compaction.type' = 'twcs' |
| | ) |
+--------------------+---------------------------------------------------+"#;
check_output_stream(output.data, expected).await;
execute_sql_and_expect(&frontend, sql, expected).await;
}
}

View File

@@ -24,6 +24,7 @@ use common_base::Plugins;
use common_config::Configurable;
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::schema_name::SchemaNameKey;
use common_query::Output;
use common_runtime::runtime::BuilderBuild;
use common_runtime::{Builder as RuntimeBuilder, Runtime};
use common_test_util::ports;
@@ -774,3 +775,16 @@ pub(crate) async fn prepare_another_catalog_and_schema(instance: &Instance) {
.await
.unwrap();
}
pub async fn execute_sql(instance: &Arc<Instance>, sql: &str) -> Output {
SqlQueryHandler::do_query(instance.as_ref(), sql, QueryContext::arc())
.await
.remove(0)
.unwrap()
}
pub async fn execute_sql_and_expect(instance: &Arc<Instance>, sql: &str, expected: &str) {
let output = execute_sql(instance, sql).await;
let output = output.data.pretty_print().await;
assert_eq!(output, expected.trim());
}

View File

@@ -18,7 +18,7 @@ mod instance_noop_wal_test;
mod instance_test;
mod promql_test;
mod reconcile_table;
pub(crate) mod test_util;
pub mod test_util;
use std::collections::HashMap;
use std::sync::Arc;

View File

@@ -27,8 +27,8 @@ use store_api::storage::RegionId;
use table::metadata::TableId;
use crate::cluster::GreptimeDbClusterBuilder;
use crate::test_util::{StorageType, TempDirGuard, get_test_store_config};
use crate::tests::test_util::{MockInstanceBuilder, TestContext, execute_sql, wait_procedure};
use crate::test_util::{StorageType, TempDirGuard, execute_sql, get_test_store_config};
use crate::tests::test_util::{MockInstanceBuilder, TestContext, wait_procedure};
/// Helper function to get table route information for GC procedure
async fn get_table_route(

View File

@@ -18,9 +18,8 @@ use common_test_util::recordbatch::check_output_stream;
use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
use crate::cluster::GreptimeDbClusterBuilder;
use crate::tests::test_util::{
MockInstanceBuilder, RebuildableMockInstance, TestContext, execute_sql,
};
use crate::test_util::execute_sql;
use crate::tests::test_util::{MockInstanceBuilder, RebuildableMockInstance, TestContext};
pub(crate) async fn distributed_with_noop_wal() -> TestContext {
common_telemetry::init_default_ut_logging();

View File

@@ -23,9 +23,10 @@ use common_test_util::recordbatch::check_output_stream;
use table::table_reference::TableReference;
use crate::cluster::GreptimeDbClusterBuilder;
use crate::test_util::execute_sql;
use crate::tests::test_util::{
MockInstanceBuilder, RebuildableMockInstance, TestContext, dump_kvbackend, execute_sql,
restore_kvbackend, try_execute_sql, wait_procedure,
MockInstanceBuilder, RebuildableMockInstance, TestContext, dump_kvbackend, restore_kvbackend,
try_execute_sql, wait_procedure,
};
const CREATE_MONITOR_TABLE_SQL: &str = r#"

View File

@@ -439,10 +439,6 @@ pub fn find_testing_resource(path: &str) -> String {
prepare_path(&p)
}
pub async fn execute_sql(instance: &Arc<Instance>, sql: &str) -> Output {
execute_sql_with(instance, sql, QueryContext::arc()).await
}
pub async fn try_execute_sql(instance: &Arc<Instance>, sql: &str) -> Result<Output> {
try_execute_sql_with(instance, sql, QueryContext::arc()).await
}
@@ -455,16 +451,6 @@ pub async fn try_execute_sql_with(
instance.do_query(sql, query_ctx).await.remove(0)
}
pub async fn execute_sql_with(
instance: &Arc<Instance>,
sql: &str,
query_ctx: QueryContextRef,
) -> Output {
try_execute_sql_with(instance, sql, query_ctx)
.await
.unwrap_or_else(|e| panic!("Failed to execute sql: {sql}, error: {e:?}"))
}
/// Dump the kv backend to a vector of key-value pairs.
pub async fn dump_kvbackend(kv_backend: &KvBackendRef) -> Vec<(Vec<u8>, Vec<u8>)> {
let req = RangeRequest::new().with_range(vec![0], vec![0]);

View File

@@ -0,0 +1,111 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::io::BufRead;
use std::sync::Arc;
use std::{fs, io};
use common_test_util::find_workspace_path;
use frontend::instance::Instance;
use tests_integration::standalone::GreptimeDbStandaloneBuilder;
use tests_integration::test_util::execute_sql_and_expect;
#[tokio::test]
async fn test_load_jsonbench_data() {
common_telemetry::init_default_ut_logging();
let instance = GreptimeDbStandaloneBuilder::new("test_load_jsonbench_data")
.build()
.await;
let frontend = instance.fe_instance();
create_table(frontend).await;
desc_table(frontend).await;
insert_data(frontend).await.unwrap();
query_data(frontend).await.unwrap();
}
async fn query_data(frontend: &Arc<Instance>) -> io::Result<()> {
let sql = "SELECT count(*) FROM bluesky";
let expected = r#"
+----------+
| count(*) |
+----------+
| 10 |
+----------+
"#;
execute_sql_and_expect(frontend, sql, expected).await;
let sql = "SELECT * FROM bluesky ORDER BY ts";
let expected = fs::read_to_string(find_workspace_path(
"tests-integration/resources/jsonbench-select-all.txt",
))?;
execute_sql_and_expect(frontend, sql, &expected).await;
Ok(())
}
async fn insert_data(frontend: &Arc<Instance>) -> io::Result<()> {
let file = fs::File::open(find_workspace_path(
"tests-integration/resources/jsonbench-head-10.ndjson",
))?;
let reader = io::BufReader::new(file);
for (i, line) in reader.lines().enumerate() {
let line = line?;
if line.is_empty() {
continue;
}
let sql = format!(
"INSERT INTO bluesky (ts, data) VALUES ({}, '{}')",
i + 1,
line.replace("'", "''"), // standard method to escape the single quote
);
execute_sql_and_expect(frontend, &sql, "Affected Rows: 1").await;
}
Ok(())
}
async fn desc_table(frontend: &Arc<Instance>) {
let sql = "DESC TABLE bluesky";
let expected = r#"
+--------+----------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
+--------+----------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
| data | Json<Object{"_raw": String, "commit.collection": String, "commit.operation": String, "did": String, "kind": String, "time_us": Number(I64)}> | | YES | | FIELD |
| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
+--------+----------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+"#;
execute_sql_and_expect(frontend, sql, expected).await;
}
async fn create_table(frontend: &Arc<Instance>) {
let sql = r#"
CREATE TABLE bluesky (
"data" JSON (
format = "partial",
fields = Struct<
kind String,
"commit.operation" String,
"commit.collection" String,
did String,
time_us Bigint
>,
),
ts Timestamp TIME INDEX,
)
"#;
execute_sql_and_expect(frontend, sql, "Affected Rows: 0").await;
}

View File

@@ -16,6 +16,7 @@
mod grpc;
#[macro_use]
mod http;
mod jsonbench;
#[macro_use]
mod sql;
#[macro_use]

View File

@@ -15,5 +15,6 @@ extend-exclude = [
"*.sql",
"*.result",
"src/pipeline/benches/data.log",
"cyborg/pnpm-lock.yaml"
"cyborg/pnpm-lock.yaml",
"tests-integration/resources/*"
]