diff --git a/Cargo.lock b/Cargo.lock index b45790cdfc..c0f9ade4c6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3962,7 +3962,6 @@ dependencies = [ "arrow", "arrow-array", "arrow-schema", - "base64 0.22.1", "common-base", "common-decimal", "common-error", @@ -3971,7 +3970,6 @@ dependencies = [ "common-time", "datafusion-common", "enum_dispatch", - "greptime-proto", "jsonb", "num", "num-traits", diff --git a/src/datatypes/Cargo.toml b/src/datatypes/Cargo.toml index e64402afd5..8eb642f5d6 100644 --- a/src/datatypes/Cargo.toml +++ b/src/datatypes/Cargo.toml @@ -15,7 +15,6 @@ workspace = true arrow.workspace = true arrow-array.workspace = true arrow-schema.workspace = true -base64.workspace = true common-base.workspace = true common-decimal.workspace = true common-error.workspace = true @@ -24,7 +23,6 @@ common-telemetry.workspace = true common-time.workspace = true datafusion-common.workspace = true enum_dispatch = "0.3" -greptime-proto.workspace = true jsonb.workspace = true num = "0.4" num-traits = "0.2" diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs index bc2ffba4e0..d2eb41a015 100644 --- a/src/datatypes/src/value.rs +++ b/src/datatypes/src/value.rs @@ -18,8 +18,6 @@ use std::sync::Arc; use arrow::datatypes::{DataType as ArrowDataType, Field}; use arrow_array::{Array, ListArray}; -use base64::Engine as _; -use base64::engine::general_purpose::URL_SAFE; use common_base::bytes::{Bytes, StringBytes}; use common_decimal::Decimal128; use common_telemetry::error; @@ -30,10 +28,8 @@ use common_time::timestamp::{TimeUnit, Timestamp}; use common_time::{Duration, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timezone}; use datafusion_common::ScalarValue; use datafusion_common::scalar::ScalarStructBuilder; -use greptime_proto::v1::value::ValueData; pub use ordered_float::OrderedFloat; use serde::{Deserialize, Serialize, Serializer}; -use serde_json::{Number, Value as JsonValue}; use snafu::{ResultExt, ensure}; use crate::error::{self, ConvertArrowArrayToScalarsSnafu, Error, Result, TryFromValueSnafu}; @@ -1402,186 +1398,15 @@ impl ValueRef<'_> { } } -pub fn column_data_to_json(data: ValueData) -> JsonValue { - match data { - ValueData::BinaryValue(b) => JsonValue::String(URL_SAFE.encode(b)), - ValueData::BoolValue(b) => JsonValue::Bool(b), - ValueData::U8Value(i) => JsonValue::Number(i.into()), - ValueData::U16Value(i) => JsonValue::Number(i.into()), - ValueData::U32Value(i) => JsonValue::Number(i.into()), - ValueData::U64Value(i) => JsonValue::Number(i.into()), - ValueData::I8Value(i) => JsonValue::Number(i.into()), - ValueData::I16Value(i) => JsonValue::Number(i.into()), - ValueData::I32Value(i) => JsonValue::Number(i.into()), - ValueData::I64Value(i) => JsonValue::Number(i.into()), - ValueData::F32Value(f) => Number::from_f64(f as f64) - .map(JsonValue::Number) - .unwrap_or(JsonValue::Null), - ValueData::F64Value(f) => Number::from_f64(f) - .map(JsonValue::Number) - .unwrap_or(JsonValue::Null), - ValueData::StringValue(s) => JsonValue::String(s), - ValueData::DateValue(d) => JsonValue::String(Date::from(d).to_string()), - ValueData::DatetimeValue(d) => { - JsonValue::String(Timestamp::new_microsecond(d).to_iso8601_string()) - } - ValueData::TimeSecondValue(d) => JsonValue::String(Time::new_second(d).to_iso8601_string()), - ValueData::TimeMillisecondValue(d) => { - JsonValue::String(Time::new_millisecond(d).to_iso8601_string()) - } - ValueData::TimeMicrosecondValue(d) => { - JsonValue::String(Time::new_microsecond(d).to_iso8601_string()) - } - ValueData::TimeNanosecondValue(d) => { - JsonValue::String(Time::new_nanosecond(d).to_iso8601_string()) - } - ValueData::TimestampMicrosecondValue(d) => { - JsonValue::String(Timestamp::new_microsecond(d).to_iso8601_string()) - } - ValueData::TimestampMillisecondValue(d) => { - JsonValue::String(Timestamp::new_millisecond(d).to_iso8601_string()) - } - ValueData::TimestampNanosecondValue(d) => { - JsonValue::String(Timestamp::new_nanosecond(d).to_iso8601_string()) - } - ValueData::TimestampSecondValue(d) => { - JsonValue::String(Timestamp::new_second(d).to_iso8601_string()) - } - ValueData::IntervalYearMonthValue(d) => JsonValue::String(format!("interval year [{}]", d)), - ValueData::IntervalMonthDayNanoValue(d) => JsonValue::String(format!( - "interval month [{}][{}][{}]", - d.months, d.days, d.nanoseconds - )), - ValueData::IntervalDayTimeValue(d) => JsonValue::String(format!("interval day [{}]", d)), - ValueData::Decimal128Value(d) => { - JsonValue::String(format!("decimal128 [{}][{}]", d.hi, d.lo)) - } - } -} - #[cfg(test)] mod tests { use arrow::datatypes::DataType as ArrowDataType; use common_time::timezone::set_default_timezone; - use greptime_proto::v1::{ - Decimal128 as ProtoDecimal128, IntervalMonthDayNano as ProtoIntervalMonthDayNano, - }; use num_traits::Float; use super::*; use crate::vectors::ListVectorBuilder; - #[test] - fn test_column_data_to_json() { - set_default_timezone(Some("Asia/Shanghai")).unwrap(); - assert_eq!( - column_data_to_json(ValueData::BinaryValue(b"hello".to_vec())), - JsonValue::String("aGVsbG8=".to_string()) - ); - assert_eq!( - column_data_to_json(ValueData::BoolValue(true)), - JsonValue::Bool(true) - ); - assert_eq!( - column_data_to_json(ValueData::U8Value(1)), - JsonValue::Number(1.into()) - ); - assert_eq!( - column_data_to_json(ValueData::U16Value(2)), - JsonValue::Number(2.into()) - ); - assert_eq!( - column_data_to_json(ValueData::U32Value(3)), - JsonValue::Number(3.into()) - ); - assert_eq!( - column_data_to_json(ValueData::U64Value(4)), - JsonValue::Number(4.into()) - ); - assert_eq!( - column_data_to_json(ValueData::I8Value(5)), - JsonValue::Number(5.into()) - ); - assert_eq!( - column_data_to_json(ValueData::I16Value(6)), - JsonValue::Number(6.into()) - ); - assert_eq!( - column_data_to_json(ValueData::I32Value(7)), - JsonValue::Number(7.into()) - ); - assert_eq!( - column_data_to_json(ValueData::I64Value(8)), - JsonValue::Number(8.into()) - ); - assert_eq!( - column_data_to_json(ValueData::F32Value(9.0)), - JsonValue::Number(Number::from_f64(9.0_f64).unwrap()) - ); - assert_eq!( - column_data_to_json(ValueData::F64Value(10.0)), - JsonValue::Number(Number::from_f64(10.0_f64).unwrap()) - ); - assert_eq!( - column_data_to_json(ValueData::StringValue("hello".to_string())), - JsonValue::String("hello".to_string()) - ); - assert_eq!( - column_data_to_json(ValueData::DateValue(123)), - JsonValue::String("1970-05-04".to_string()) - ); - assert_eq!( - column_data_to_json(ValueData::DatetimeValue(456)), - JsonValue::String("1970-01-01 08:00:00.000456+0800".to_string()) - ); - assert_eq!( - column_data_to_json(ValueData::TimeSecondValue(789)), - JsonValue::String("08:13:09+0800".to_string()) - ); - assert_eq!( - column_data_to_json(ValueData::TimeMillisecondValue(789)), - JsonValue::String("08:00:00.789+0800".to_string()) - ); - assert_eq!( - column_data_to_json(ValueData::TimeMicrosecondValue(789)), - JsonValue::String("08:00:00.000789+0800".to_string()) - ); - assert_eq!( - column_data_to_json(ValueData::TimestampMillisecondValue(1234567890)), - JsonValue::String("1970-01-15 14:56:07.890+0800".to_string()) - ); - assert_eq!( - column_data_to_json(ValueData::TimestampNanosecondValue(1234567890123456789)), - JsonValue::String("2009-02-14 07:31:30.123456789+0800".to_string()) - ); - assert_eq!( - column_data_to_json(ValueData::TimestampSecondValue(1234567890)), - JsonValue::String("2009-02-14 07:31:30+0800".to_string()) - ); - assert_eq!( - column_data_to_json(ValueData::IntervalYearMonthValue(12)), - JsonValue::String("interval year [12]".to_string()) - ); - assert_eq!( - column_data_to_json(ValueData::IntervalMonthDayNanoValue( - ProtoIntervalMonthDayNano { - months: 1, - days: 2, - nanoseconds: 3, - } - )), - JsonValue::String("interval month [1][2][3]".to_string()) - ); - assert_eq!( - column_data_to_json(ValueData::IntervalDayTimeValue(4)), - JsonValue::String("interval day [4]".to_string()) - ); - assert_eq!( - column_data_to_json(ValueData::Decimal128Value(ProtoDecimal128 { hi: 5, lo: 6 })), - JsonValue::String("decimal128 [5][6]".to_string()) - ); - } - #[test] fn test_try_from_scalar_value() { assert_eq!( diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs index ec74d0c386..0341232b1f 100644 --- a/src/servers/src/http/event.rs +++ b/src/servers/src/http/event.rs @@ -19,6 +19,7 @@ use std::str::FromStr; use std::sync::Arc; use std::time::Instant; +use api::helper::pb_value_to_value_ref; use async_trait::async_trait; use axum::body::Bytes; use axum::extract::{FromRequest, Multipart, Path, Query, Request, State}; @@ -31,7 +32,6 @@ use common_catalog::consts::default_engine; use common_error::ext::{BoxedError, ErrorExt}; use common_query::{Output, OutputData}; use common_telemetry::{error, warn}; -use datatypes::value::column_data_to_json; use headers::ContentType; use lazy_static::lazy_static; use mime_guess::mime; @@ -455,9 +455,9 @@ async fn dryrun_pipeline_inner( .filter_map(|row| { if let Some(rows) = row.rows { let table_name = row.table_name; - let schema = rows.schema; + let result_schema = rows.schema; - let schema = schema + let schema = result_schema .iter() .map(|cs| { let mut map = Map::new(); @@ -493,25 +493,25 @@ async fn dryrun_pipeline_inner( .into_iter() .enumerate() .map(|(idx, v)| { - v.value_data - .map(|d| { - let mut map = Map::new(); - map.insert("value".to_string(), column_data_to_json(d)); - map.insert( - "key".to_string(), - schema[idx][name_key].clone(), - ); - map.insert( - "semantic_type".to_string(), - schema[idx][column_type_key].clone(), - ); - map.insert( - "data_type".to_string(), - schema[idx][data_type_key].clone(), - ); - JsonValue::Object(map) - }) - .unwrap_or(JsonValue::Null) + let mut map = Map::new(); + let value_ref = pb_value_to_value_ref( + &v, + &result_schema[idx].datatype_extension, + ); + let greptime_value: datatypes::value::Value = value_ref.into(); + let serde_json_value = + serde_json::Value::try_from(greptime_value).unwrap(); + map.insert("value".to_string(), serde_json_value); + map.insert("key".to_string(), schema[idx][name_key].clone()); + map.insert( + "semantic_type".to_string(), + schema[idx][column_type_key].clone(), + ); + map.insert( + "data_type".to_string(), + schema[idx][data_type_key].clone(), + ); + JsonValue::Object(map) }) .collect() }) diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index bba04439bd..c3b0a86885 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -3591,7 +3591,7 @@ transform: "data_type": "TIMESTAMP_NANOSECOND", "key": "time", "semantic_type": "TIMESTAMP", - "value": "2024-05-25 20:16:37.217+0000" + "value": 1716668197217000000i64 } ], [ @@ -3629,7 +3629,7 @@ transform: "data_type": "TIMESTAMP_NANOSECOND", "key": "time", "semantic_type": "TIMESTAMP", - "value": "2024-05-25 20:16:38.217+0000" + "value": 1716668198217000000i64 } ] ]);