refactor!: remove pb_value to json conversion, keep json output consistent (#7063)

* refactor: remove pb_value to json

* chore: remove unused module
This commit is contained in:
Ning Sun
2025-10-10 15:09:20 +08:00
committed by GitHub
parent af213be403
commit aa84642afc
5 changed files with 24 additions and 203 deletions

2
Cargo.lock generated
View File

@@ -3962,7 +3962,6 @@ dependencies = [
"arrow",
"arrow-array",
"arrow-schema",
"base64 0.22.1",
"common-base",
"common-decimal",
"common-error",
@@ -3971,7 +3970,6 @@ dependencies = [
"common-time",
"datafusion-common",
"enum_dispatch",
"greptime-proto",
"jsonb",
"num",
"num-traits",

View File

@@ -15,7 +15,6 @@ workspace = true
arrow.workspace = true
arrow-array.workspace = true
arrow-schema.workspace = true
base64.workspace = true
common-base.workspace = true
common-decimal.workspace = true
common-error.workspace = true
@@ -24,7 +23,6 @@ common-telemetry.workspace = true
common-time.workspace = true
datafusion-common.workspace = true
enum_dispatch = "0.3"
greptime-proto.workspace = true
jsonb.workspace = true
num = "0.4"
num-traits = "0.2"

View File

@@ -18,8 +18,6 @@ use std::sync::Arc;
use arrow::datatypes::{DataType as ArrowDataType, Field};
use arrow_array::{Array, ListArray};
use base64::Engine as _;
use base64::engine::general_purpose::URL_SAFE;
use common_base::bytes::{Bytes, StringBytes};
use common_decimal::Decimal128;
use common_telemetry::error;
@@ -30,10 +28,8 @@ use common_time::timestamp::{TimeUnit, Timestamp};
use common_time::{Duration, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timezone};
use datafusion_common::ScalarValue;
use datafusion_common::scalar::ScalarStructBuilder;
use greptime_proto::v1::value::ValueData;
pub use ordered_float::OrderedFloat;
use serde::{Deserialize, Serialize, Serializer};
use serde_json::{Number, Value as JsonValue};
use snafu::{ResultExt, ensure};
use crate::error::{self, ConvertArrowArrayToScalarsSnafu, Error, Result, TryFromValueSnafu};
@@ -1402,186 +1398,15 @@ impl ValueRef<'_> {
}
}
pub fn column_data_to_json(data: ValueData) -> JsonValue {
match data {
ValueData::BinaryValue(b) => JsonValue::String(URL_SAFE.encode(b)),
ValueData::BoolValue(b) => JsonValue::Bool(b),
ValueData::U8Value(i) => JsonValue::Number(i.into()),
ValueData::U16Value(i) => JsonValue::Number(i.into()),
ValueData::U32Value(i) => JsonValue::Number(i.into()),
ValueData::U64Value(i) => JsonValue::Number(i.into()),
ValueData::I8Value(i) => JsonValue::Number(i.into()),
ValueData::I16Value(i) => JsonValue::Number(i.into()),
ValueData::I32Value(i) => JsonValue::Number(i.into()),
ValueData::I64Value(i) => JsonValue::Number(i.into()),
ValueData::F32Value(f) => Number::from_f64(f as f64)
.map(JsonValue::Number)
.unwrap_or(JsonValue::Null),
ValueData::F64Value(f) => Number::from_f64(f)
.map(JsonValue::Number)
.unwrap_or(JsonValue::Null),
ValueData::StringValue(s) => JsonValue::String(s),
ValueData::DateValue(d) => JsonValue::String(Date::from(d).to_string()),
ValueData::DatetimeValue(d) => {
JsonValue::String(Timestamp::new_microsecond(d).to_iso8601_string())
}
ValueData::TimeSecondValue(d) => JsonValue::String(Time::new_second(d).to_iso8601_string()),
ValueData::TimeMillisecondValue(d) => {
JsonValue::String(Time::new_millisecond(d).to_iso8601_string())
}
ValueData::TimeMicrosecondValue(d) => {
JsonValue::String(Time::new_microsecond(d).to_iso8601_string())
}
ValueData::TimeNanosecondValue(d) => {
JsonValue::String(Time::new_nanosecond(d).to_iso8601_string())
}
ValueData::TimestampMicrosecondValue(d) => {
JsonValue::String(Timestamp::new_microsecond(d).to_iso8601_string())
}
ValueData::TimestampMillisecondValue(d) => {
JsonValue::String(Timestamp::new_millisecond(d).to_iso8601_string())
}
ValueData::TimestampNanosecondValue(d) => {
JsonValue::String(Timestamp::new_nanosecond(d).to_iso8601_string())
}
ValueData::TimestampSecondValue(d) => {
JsonValue::String(Timestamp::new_second(d).to_iso8601_string())
}
ValueData::IntervalYearMonthValue(d) => JsonValue::String(format!("interval year [{}]", d)),
ValueData::IntervalMonthDayNanoValue(d) => JsonValue::String(format!(
"interval month [{}][{}][{}]",
d.months, d.days, d.nanoseconds
)),
ValueData::IntervalDayTimeValue(d) => JsonValue::String(format!("interval day [{}]", d)),
ValueData::Decimal128Value(d) => {
JsonValue::String(format!("decimal128 [{}][{}]", d.hi, d.lo))
}
}
}
#[cfg(test)]
mod tests {
use arrow::datatypes::DataType as ArrowDataType;
use common_time::timezone::set_default_timezone;
use greptime_proto::v1::{
Decimal128 as ProtoDecimal128, IntervalMonthDayNano as ProtoIntervalMonthDayNano,
};
use num_traits::Float;
use super::*;
use crate::vectors::ListVectorBuilder;
#[test]
fn test_column_data_to_json() {
// The expected strings below encode a fixed +08:00 offset; pin the
// timezone explicitly so the test is deterministic on any host.
set_default_timezone(Some("Asia/Shanghai")).unwrap();
// Binary values are URL-safe base64 ("hello" -> "aGVsbG8=").
assert_eq!(
column_data_to_json(ValueData::BinaryValue(b"hello".to_vec())),
JsonValue::String("aGVsbG8=".to_string())
);
assert_eq!(
column_data_to_json(ValueData::BoolValue(true)),
JsonValue::Bool(true)
);
// All integer widths (u8..u64, i8..i64) map straight to JSON numbers.
assert_eq!(
column_data_to_json(ValueData::U8Value(1)),
JsonValue::Number(1.into())
);
assert_eq!(
column_data_to_json(ValueData::U16Value(2)),
JsonValue::Number(2.into())
);
assert_eq!(
column_data_to_json(ValueData::U32Value(3)),
JsonValue::Number(3.into())
);
assert_eq!(
column_data_to_json(ValueData::U64Value(4)),
JsonValue::Number(4.into())
);
assert_eq!(
column_data_to_json(ValueData::I8Value(5)),
JsonValue::Number(5.into())
);
assert_eq!(
column_data_to_json(ValueData::I16Value(6)),
JsonValue::Number(6.into())
);
assert_eq!(
column_data_to_json(ValueData::I32Value(7)),
JsonValue::Number(7.into())
);
assert_eq!(
column_data_to_json(ValueData::I64Value(8)),
JsonValue::Number(8.into())
);
// Finite floats become JSON numbers (f32 is widened to f64 first).
assert_eq!(
column_data_to_json(ValueData::F32Value(9.0)),
JsonValue::Number(Number::from_f64(9.0_f64).unwrap())
);
assert_eq!(
column_data_to_json(ValueData::F64Value(10.0)),
JsonValue::Number(Number::from_f64(10.0_f64).unwrap())
);
assert_eq!(
column_data_to_json(ValueData::StringValue("hello".to_string())),
JsonValue::String("hello".to_string())
);
// DateValue is days since the epoch: day 123 -> 1970-05-04.
assert_eq!(
column_data_to_json(ValueData::DateValue(123)),
JsonValue::String("1970-05-04".to_string())
);
// DatetimeValue is microseconds since the epoch, rendered in +08:00.
assert_eq!(
column_data_to_json(ValueData::DatetimeValue(456)),
JsonValue::String("1970-01-01 08:00:00.000456+0800".to_string())
);
// Time* variants: the raw value is seconds/millis/micros since midnight UTC.
assert_eq!(
column_data_to_json(ValueData::TimeSecondValue(789)),
JsonValue::String("08:13:09+0800".to_string())
);
assert_eq!(
column_data_to_json(ValueData::TimeMillisecondValue(789)),
JsonValue::String("08:00:00.789+0800".to_string())
);
assert_eq!(
column_data_to_json(ValueData::TimeMicrosecondValue(789)),
JsonValue::String("08:00:00.000789+0800".to_string())
);
// Timestamp* variants at millisecond, nanosecond, and second precision.
assert_eq!(
column_data_to_json(ValueData::TimestampMillisecondValue(1234567890)),
JsonValue::String("1970-01-15 14:56:07.890+0800".to_string())
);
assert_eq!(
column_data_to_json(ValueData::TimestampNanosecondValue(1234567890123456789)),
JsonValue::String("2009-02-14 07:31:30.123456789+0800".to_string())
);
assert_eq!(
column_data_to_json(ValueData::TimestampSecondValue(1234567890)),
JsonValue::String("2009-02-14 07:31:30+0800".to_string())
);
// Interval and decimal variants use the bracketed string rendering.
assert_eq!(
column_data_to_json(ValueData::IntervalYearMonthValue(12)),
JsonValue::String("interval year [12]".to_string())
);
assert_eq!(
column_data_to_json(ValueData::IntervalMonthDayNanoValue(
ProtoIntervalMonthDayNano {
months: 1,
days: 2,
nanoseconds: 3,
}
)),
JsonValue::String("interval month [1][2][3]".to_string())
);
assert_eq!(
column_data_to_json(ValueData::IntervalDayTimeValue(4)),
JsonValue::String("interval day [4]".to_string())
);
// Decimal128 is reported as its raw hi/lo halves, not a decimal string.
assert_eq!(
column_data_to_json(ValueData::Decimal128Value(ProtoDecimal128 { hi: 5, lo: 6 })),
JsonValue::String("decimal128 [5][6]".to_string())
);
}
#[test]
fn test_try_from_scalar_value() {
assert_eq!(

View File

@@ -19,6 +19,7 @@ use std::str::FromStr;
use std::sync::Arc;
use std::time::Instant;
use api::helper::pb_value_to_value_ref;
use async_trait::async_trait;
use axum::body::Bytes;
use axum::extract::{FromRequest, Multipart, Path, Query, Request, State};
@@ -31,7 +32,6 @@ use common_catalog::consts::default_engine;
use common_error::ext::{BoxedError, ErrorExt};
use common_query::{Output, OutputData};
use common_telemetry::{error, warn};
use datatypes::value::column_data_to_json;
use headers::ContentType;
use lazy_static::lazy_static;
use mime_guess::mime;
@@ -455,9 +455,9 @@ async fn dryrun_pipeline_inner(
.filter_map(|row| {
if let Some(rows) = row.rows {
let table_name = row.table_name;
let schema = rows.schema;
let result_schema = rows.schema;
let schema = schema
let schema = result_schema
.iter()
.map(|cs| {
let mut map = Map::new();
@@ -493,25 +493,25 @@ async fn dryrun_pipeline_inner(
.into_iter()
.enumerate()
.map(|(idx, v)| {
v.value_data
.map(|d| {
let mut map = Map::new();
map.insert("value".to_string(), column_data_to_json(d));
map.insert(
"key".to_string(),
schema[idx][name_key].clone(),
);
map.insert(
"semantic_type".to_string(),
schema[idx][column_type_key].clone(),
);
map.insert(
"data_type".to_string(),
schema[idx][data_type_key].clone(),
);
JsonValue::Object(map)
})
.unwrap_or(JsonValue::Null)
let mut map = Map::new();
let value_ref = pb_value_to_value_ref(
&v,
&result_schema[idx].datatype_extension,
);
let greptime_value: datatypes::value::Value = value_ref.into();
let serde_json_value =
serde_json::Value::try_from(greptime_value).unwrap();
map.insert("value".to_string(), serde_json_value);
map.insert("key".to_string(), schema[idx][name_key].clone());
map.insert(
"semantic_type".to_string(),
schema[idx][column_type_key].clone(),
);
map.insert(
"data_type".to_string(),
schema[idx][data_type_key].clone(),
);
JsonValue::Object(map)
})
.collect()
})

View File

@@ -3591,7 +3591,7 @@ transform:
"data_type": "TIMESTAMP_NANOSECOND",
"key": "time",
"semantic_type": "TIMESTAMP",
"value": "2024-05-25 20:16:37.217+0000"
"value": 1716668197217000000i64
}
],
[
@@ -3629,7 +3629,7 @@ transform:
"data_type": "TIMESTAMP_NANOSECOND",
"key": "time",
"semantic_type": "TIMESTAMP",
"value": "2024-05-25 20:16:38.217+0000"
"value": 1716668198217000000i64
}
]
]);