refactor: remove duplicated valueref to json (#7062)

This commit is contained in:
Ning Sun
2025-10-10 15:08:26 +08:00
committed by GitHub
parent 779865d389
commit af213be403
5 changed files with 45 additions and 56 deletions

View File

@@ -238,6 +238,12 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to process JSONB value"))]
InvalidJsonb {
error: jsonb::Error,
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {
@@ -257,6 +263,7 @@ impl ErrorExt for Error {
| InvalidTimestampPrecision { .. }
| InvalidPrecisionOrScale { .. }
| InvalidJson { .. }
| InvalidJsonb { .. }
| InvalidVector { .. }
| InvalidFulltextOption { .. }
| InvalidSkippingIndexOption { .. } => StatusCode::InvalidArguments,

View File

@@ -44,7 +44,8 @@ pub use interval_type::{
IntervalDayTimeType, IntervalMonthDayNanoType, IntervalType, IntervalYearMonthType,
};
pub use json_type::{
JSON_TYPE_NAME, JsonType, json_type_value_to_string, parse_string_to_json_type_value,
JSON_TYPE_NAME, JsonType, json_type_value_to_serde_json, json_type_value_to_string,
parse_string_to_json_type_value,
};
pub use list_type::ListType;
pub use null_type::NullType;

View File

@@ -12,12 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::str::FromStr;
use arrow::datatypes::DataType as ArrowDataType;
use common_base::bytes::Bytes;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use crate::data_type::DataType;
use crate::error::{InvalidJsonSnafu, Result};
use crate::error::{DeserializeSnafu, InvalidJsonSnafu, InvalidJsonbSnafu, Result};
use crate::scalars::ScalarVectorBuilder;
use crate::type_id::LogicalTypeId;
use crate::value::Value;
@@ -83,7 +86,24 @@ impl DataType for JsonType {
/// Converts a json type value to string
pub fn json_type_value_to_string(val: &[u8], format: &JsonFormat) -> Result<String> {
match format {
JsonFormat::Jsonb => Ok(jsonb::to_string(val)),
JsonFormat::Jsonb => match jsonb::from_slice(val) {
Ok(jsonb_value) => {
let serialized = jsonb_value.to_string();
Ok(serialized)
}
Err(e) => InvalidJsonbSnafu { error: e }.fail(),
},
}
}
/// Converts a json type value to serde_json::Value
pub fn json_type_value_to_serde_json(val: &[u8], format: &JsonFormat) -> Result<serde_json::Value> {
match format {
JsonFormat::Jsonb => {
let json_string = json_type_value_to_string(val, format)?;
serde_json::Value::from_str(json_string.as_str())
.context(DeserializeSnafu { json: json_string })
}
}
}

View File

@@ -38,7 +38,6 @@ use snafu::{ResultExt, ensure};
use crate::error::{self, ConvertArrowArrayToScalarsSnafu, Error, Result, TryFromValueSnafu};
use crate::prelude::*;
use crate::schema::ColumnSchema;
use crate::type_id::LogicalTypeId;
use crate::types::{IntervalType, ListType};
use crate::vectors::ListVector;
@@ -1303,53 +1302,6 @@ impl<'a> From<Option<ListValueRef<'a>>> for ValueRef<'a> {
}
}
/// transform a [ValueRef] to a [serde_json::Value].
/// The json type will be handled specially
pub fn transform_value_ref_to_json_value<'a>(
value: ValueRef<'a>,
schema: &'a ColumnSchema,
) -> serde_json::Result<serde_json::Value> {
let json_value = match value {
ValueRef::Null => serde_json::Value::Null,
ValueRef::Boolean(v) => serde_json::Value::Bool(v),
ValueRef::UInt8(v) => serde_json::Value::from(v),
ValueRef::UInt16(v) => serde_json::Value::from(v),
ValueRef::UInt32(v) => serde_json::Value::from(v),
ValueRef::UInt64(v) => serde_json::Value::from(v),
ValueRef::Int8(v) => serde_json::Value::from(v),
ValueRef::Int16(v) => serde_json::Value::from(v),
ValueRef::Int32(v) => serde_json::Value::from(v),
ValueRef::Int64(v) => serde_json::Value::from(v),
ValueRef::Float32(v) => serde_json::Value::from(v.0),
ValueRef::Float64(v) => serde_json::Value::from(v.0),
ValueRef::String(bytes) => serde_json::Value::String(bytes.to_string()),
ValueRef::Binary(bytes) => {
if let ConcreteDataType::Json(_) = schema.data_type {
match jsonb::from_slice(bytes) {
Ok(json) => json.into(),
Err(e) => {
error!(e; "Failed to parse jsonb");
serde_json::Value::Null
}
}
} else {
serde_json::to_value(bytes)?
}
}
ValueRef::Date(v) => serde_json::Value::Number(v.val().into()),
ValueRef::List(v) => serde_json::to_value(v)?,
ValueRef::Timestamp(v) => serde_json::to_value(v.value())?,
ValueRef::Time(v) => serde_json::to_value(v.value())?,
ValueRef::IntervalYearMonth(v) => serde_json::Value::from(v),
ValueRef::IntervalDayTime(v) => serde_json::Value::from(v),
ValueRef::IntervalMonthDayNano(v) => serde_json::Value::from(v),
ValueRef::Duration(v) => serde_json::to_value(v.value())?,
ValueRef::Decimal128(v) => serde_json::to_value(v.to_string())?,
};
Ok(json_value)
}
/// Reference to a [ListValue].
///
/// Now comparison still requires some allocation (call of `to_value()`) and

View File

@@ -32,8 +32,9 @@ use common_telemetry::{debug, error, info};
use common_time::Timestamp;
use common_time::timestamp::TimeUnit;
use datatypes::data_type::DataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::SchemaRef;
use datatypes::value::transform_value_ref_to_json_value;
use datatypes::types::json_type_value_to_serde_json;
use event::{LogState, LogValidatorRef};
use futures::FutureExt;
use http::{HeaderValue, Method};
@@ -54,8 +55,8 @@ use self::result::table_result::TableResponse;
use crate::configurator::ConfiguratorRef;
use crate::elasticsearch;
use crate::error::{
AddressBindSnafu, AlreadyStartedSnafu, Error, InternalIoSnafu, InvalidHeaderValueSnafu, Result,
ToJsonSnafu,
AddressBindSnafu, AlreadyStartedSnafu, ConvertSqlValueSnafu, Error, InternalIoSnafu,
InvalidHeaderValueSnafu, Result, ToJsonSnafu,
};
use crate::http::influxdb::{influxdb_health, influxdb_ping, influxdb_write_v1, influxdb_write_v2};
use crate::http::otlp::OtlpState;
@@ -299,8 +300,16 @@ impl HttpRecordsOutput {
// safety here: schemas length is equal to the number of columns in the recordbatch
let schema = &schemas[col_idx];
for row_idx in 0..recordbatch.num_rows() {
let value = transform_value_ref_to_json_value(col.get_ref(row_idx), schema)
.context(ToJsonSnafu)?;
let value = col.get(row_idx);
let value = if let ConcreteDataType::Json(json_type) = schema.data_type
&& let datatypes::value::Value::Binary(bytes) = value
{
json_type_value_to_serde_json(bytes.as_ref(), &json_type.format)
.context(ConvertSqlValueSnafu)?
} else {
serde_json::Value::try_from(col.get(row_idx)).context(ToJsonSnafu)?
};
rows[row_idx + finished_row_cursor].push(value);
}
}