mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-15 04:20:39 +00:00
refactor: make json_get adapted with JSON2
Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -2390,7 +2390,6 @@ dependencies = [
|
||||
"hyperloglogplus",
|
||||
"icu_properties",
|
||||
"jsonb",
|
||||
"jsonpath-rust 0.7.5",
|
||||
"memchr",
|
||||
"mito-codec",
|
||||
"nalgebra",
|
||||
|
||||
@@ -49,7 +49,6 @@ h3o = { version = "0.6", optional = true }
|
||||
hyperloglogplus = "0.4"
|
||||
icu_properties.workspace = true
|
||||
jsonb.workspace = true
|
||||
jsonpath-rust = "0.7.5"
|
||||
memchr = "2.7"
|
||||
mito-codec.workspace = true
|
||||
nalgebra.workspace = true
|
||||
|
||||
@@ -12,24 +12,22 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow::array::{ArrayRef, BinaryViewArray, StringViewArray, StructArray};
|
||||
use arrow::array::{ArrayRef, BinaryViewArray, new_null_array};
|
||||
use arrow::compute;
|
||||
use arrow::datatypes::{Float64Type, Int64Type, UInt64Type};
|
||||
use arrow::datatypes::Float64Type;
|
||||
use arrow_schema::Field;
|
||||
use datafusion_common::arrow::array::{
|
||||
Array, AsArray, BinaryViewBuilder, BooleanBuilder, Float64Builder, Int64Builder,
|
||||
StringViewBuilder,
|
||||
};
|
||||
use datafusion_common::arrow::datatypes::DataType;
|
||||
use datafusion_common::{DataFusionError, Result};
|
||||
use datafusion_common::{DataFusionError, Result, ScalarValue, exec_datafusion_err, exec_err};
|
||||
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility};
|
||||
use datatypes::arrow_array::{int_array_value_at_index, string_array_value_at_index};
|
||||
use datatypes::json::JsonStructureSettings;
|
||||
use datatypes::vectors::json::array::JsonArray;
|
||||
use derive_more::Display;
|
||||
use jsonpath_rust::JsonPath;
|
||||
use serde_json::Value;
|
||||
|
||||
use crate::function::{Function, extract_args};
|
||||
@@ -52,6 +50,7 @@ fn get_json_by_path(json: &[u8], path: &str) -> Option<Vec<u8>> {
|
||||
|
||||
enum JsonResultValue<'a> {
|
||||
Jsonb(Vec<u8>),
|
||||
#[expect(unused)]
|
||||
JsonStructByColumn(&'a ArrayRef, usize),
|
||||
JsonStructByValue(&'a Value),
|
||||
}
|
||||
@@ -64,57 +63,27 @@ trait JsonGetResultBuilder {
|
||||
fn build(&mut self) -> ArrayRef;
|
||||
}
|
||||
|
||||
/// Common implementation for JSON get scalar functions.
|
||||
///
|
||||
/// `JsonGet` encapsulates the logic for extracting values from JSON inputs
|
||||
/// based on a path expression. Different JSON get functions reuse this
|
||||
/// implementation by supplying their own `JsonGetResultBuilder` to control
|
||||
/// how the resulting values are materialized into an Arrow array.
|
||||
#[derive(Debug)]
|
||||
struct JsonGet {
|
||||
signature: Signature,
|
||||
}
|
||||
|
||||
impl JsonGet {
|
||||
fn invoke<F, B>(&self, args: ScalarFunctionArgs, builder_factory: F) -> Result<ColumnarValue>
|
||||
where
|
||||
F: Fn(usize) -> B,
|
||||
B: JsonGetResultBuilder,
|
||||
{
|
||||
let [arg0, arg1] = extract_args("JSON_GET", &args)?;
|
||||
|
||||
let arg1 = compute::cast(&arg1, &DataType::Utf8View)?;
|
||||
let paths = arg1.as_string_view();
|
||||
|
||||
let mut builder = (builder_factory)(arg0.len());
|
||||
match arg0.data_type() {
|
||||
DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
|
||||
let arg0 = compute::cast(&arg0, &DataType::BinaryView)?;
|
||||
let jsons = arg0.as_binary_view();
|
||||
jsonb_get(jsons, paths, &mut builder)?;
|
||||
fn result_builder(
|
||||
len: usize,
|
||||
with_type: Option<&DataType>,
|
||||
) -> Result<Box<dyn JsonGetResultBuilder>> {
|
||||
let builder = if let Some(t) = with_type {
|
||||
match t {
|
||||
DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
|
||||
Box::new(StringResultBuilder(StringViewBuilder::with_capacity(len)))
|
||||
as Box<dyn JsonGetResultBuilder>
|
||||
}
|
||||
DataType::Struct(_) => {
|
||||
let jsons = arg0.as_struct();
|
||||
json_struct_get(jsons, paths, &mut builder)?
|
||||
DataType::Int64 => Box::new(IntResultBuilder(Int64Builder::with_capacity(len))),
|
||||
DataType::Float64 => Box::new(FloatResultBuilder(Float64Builder::with_capacity(len))),
|
||||
DataType::Boolean => Box::new(BoolResultBuilder(BooleanBuilder::with_capacity(len))),
|
||||
t => {
|
||||
return exec_err!("json_get with unknown type {t}");
|
||||
}
|
||||
_ => {
|
||||
return Err(DataFusionError::Execution(format!(
|
||||
"JSON_GET not supported argument type {}",
|
||||
arg0.data_type(),
|
||||
)));
|
||||
}
|
||||
};
|
||||
|
||||
Ok(ColumnarValue::Array(builder.build()))
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for JsonGet {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
signature: Signature::any(2, Volatility::Immutable),
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Box::new(StringResultBuilder(StringViewBuilder::with_capacity(len)))
|
||||
};
|
||||
Ok(builder)
|
||||
}
|
||||
|
||||
// TODO: refactor this to StringLikeArrayBuilder from Arrow 57
|
||||
@@ -154,7 +123,7 @@ impl JsonGetResultBuilder for StringResultBuilder {
|
||||
|
||||
#[derive(Default, Display, Debug)]
|
||||
#[display("{}", Self::NAME.to_ascii_uppercase())]
|
||||
pub struct JsonGetString(JsonGet);
|
||||
pub struct JsonGetString(JsonGetWithType);
|
||||
|
||||
impl JsonGetString {
|
||||
pub const NAME: &'static str = "json_get_string";
|
||||
@@ -173,10 +142,10 @@ impl Function for JsonGetString {
|
||||
&self.0.signature
|
||||
}
|
||||
|
||||
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
|
||||
self.0.invoke(args, |len: usize| {
|
||||
StringResultBuilder(StringViewBuilder::with_capacity(len))
|
||||
})
|
||||
fn invoke_with_args(&self, mut args: ScalarFunctionArgs) -> Result<ColumnarValue> {
|
||||
args.args
|
||||
.push(ColumnarValue::Scalar(ScalarValue::Utf8View(None)));
|
||||
self.0.invoke_with_args(args)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -205,7 +174,7 @@ impl JsonGetResultBuilder for IntResultBuilder {
|
||||
|
||||
#[derive(Default, Display, Debug)]
|
||||
#[display("{}", Self::NAME.to_ascii_uppercase())]
|
||||
pub struct JsonGetInt(JsonGet);
|
||||
pub struct JsonGetInt(JsonGetWithType);
|
||||
|
||||
impl JsonGetInt {
|
||||
pub const NAME: &'static str = "json_get_int";
|
||||
@@ -224,10 +193,10 @@ impl Function for JsonGetInt {
|
||||
&self.0.signature
|
||||
}
|
||||
|
||||
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
|
||||
self.0.invoke(args, |len: usize| {
|
||||
IntResultBuilder(Int64Builder::with_capacity(len))
|
||||
})
|
||||
fn invoke_with_args(&self, mut args: ScalarFunctionArgs) -> Result<ColumnarValue> {
|
||||
args.args
|
||||
.push(ColumnarValue::Scalar(ScalarValue::Int64(None)));
|
||||
self.0.invoke_with_args(args)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -264,7 +233,7 @@ impl JsonGetResultBuilder for FloatResultBuilder {
|
||||
|
||||
#[derive(Default, Display, Debug)]
|
||||
#[display("{}", Self::NAME.to_ascii_uppercase())]
|
||||
pub struct JsonGetFloat(JsonGet);
|
||||
pub struct JsonGetFloat(JsonGetWithType);
|
||||
|
||||
impl JsonGetFloat {
|
||||
pub const NAME: &'static str = "json_get_float";
|
||||
@@ -283,10 +252,10 @@ impl Function for JsonGetFloat {
|
||||
&self.0.signature
|
||||
}
|
||||
|
||||
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
|
||||
self.0.invoke(args, |len: usize| {
|
||||
FloatResultBuilder(Float64Builder::with_capacity(len))
|
||||
})
|
||||
fn invoke_with_args(&self, mut args: ScalarFunctionArgs) -> Result<ColumnarValue> {
|
||||
args.args
|
||||
.push(ColumnarValue::Scalar(ScalarValue::Float64(None)));
|
||||
self.0.invoke_with_args(args)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -323,7 +292,7 @@ impl JsonGetResultBuilder for BoolResultBuilder {
|
||||
|
||||
#[derive(Default, Display, Debug)]
|
||||
#[display("{}", Self::NAME.to_ascii_uppercase())]
|
||||
pub struct JsonGetBool(JsonGet);
|
||||
pub struct JsonGetBool(JsonGetWithType);
|
||||
|
||||
impl JsonGetBool {
|
||||
pub const NAME: &'static str = "json_get_bool";
|
||||
@@ -342,24 +311,23 @@ impl Function for JsonGetBool {
|
||||
&self.0.signature
|
||||
}
|
||||
|
||||
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
|
||||
self.0.invoke(args, |len: usize| {
|
||||
BoolResultBuilder(BooleanBuilder::with_capacity(len))
|
||||
})
|
||||
fn invoke_with_args(&self, mut args: ScalarFunctionArgs) -> Result<ColumnarValue> {
|
||||
args.args
|
||||
.push(ColumnarValue::Scalar(ScalarValue::Boolean(None)));
|
||||
self.0.invoke_with_args(args)
|
||||
}
|
||||
}
|
||||
|
||||
fn jsonb_get(
|
||||
jsons: &BinaryViewArray,
|
||||
paths: &StringViewArray,
|
||||
path: &str,
|
||||
builder: &mut dyn JsonGetResultBuilder,
|
||||
) -> Result<()> {
|
||||
let size = jsons.len();
|
||||
for i in 0..size {
|
||||
let json = jsons.is_valid(i).then(|| jsons.value(i));
|
||||
let path = paths.is_valid(i).then(|| paths.value(i));
|
||||
let result = match (json, path) {
|
||||
(Some(json), Some(path)) => get_json_by_path(json, path),
|
||||
let result = match json {
|
||||
Some(json) => get_json_by_path(json, path),
|
||||
_ => None,
|
||||
};
|
||||
if let Some(v) = result {
|
||||
@@ -371,141 +339,99 @@ fn jsonb_get(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn json_struct_get(
|
||||
jsons: &StructArray,
|
||||
paths: &StringViewArray,
|
||||
builder: &mut dyn JsonGetResultBuilder,
|
||||
) -> Result<()> {
|
||||
let size = jsons.len();
|
||||
for i in 0..size {
|
||||
if jsons.is_null(i) || paths.is_null(i) {
|
||||
builder.append_null();
|
||||
continue;
|
||||
fn json_struct_get(array: &ArrayRef, path: &str, with_type: Option<&DataType>) -> Result<ArrayRef> {
|
||||
let path = path.trim_start_matches("$");
|
||||
|
||||
// Fast path: if the JSON array fields can be directly indexed into by the `path`, simply get
|
||||
// the sub-array (`column_by_name`).
|
||||
let mut direct = true;
|
||||
let mut current = array;
|
||||
for segment in path.split(".").filter(|s| !s.is_empty()) {
|
||||
if matches!(current.data_type(), DataType::Binary) {
|
||||
direct = false;
|
||||
break;
|
||||
}
|
||||
let path = paths.value(i);
|
||||
|
||||
// naively assume the JSON path is our kind of indexing to the field, by removing its "root"
|
||||
let field_path = path.trim().replace("$.", "");
|
||||
let column = jsons.column_by_name(&field_path);
|
||||
let Some(json) = current.as_struct_opt() else {
|
||||
return exec_err!("unknown JSON array datatype: {}", current.data_type());
|
||||
};
|
||||
let Some(sub_json) = json.column_by_name(segment) else {
|
||||
return Ok(new_null_array(
|
||||
with_type.unwrap_or(&DataType::Null),
|
||||
array.len(),
|
||||
));
|
||||
};
|
||||
current = sub_json;
|
||||
}
|
||||
|
||||
if let Some(column) = column {
|
||||
builder.append_value(JsonResultValue::JsonStructByColumn(column, i))?;
|
||||
} else {
|
||||
let Some(raw) = jsons
|
||||
.column_by_name(JsonStructureSettings::RAW_FIELD)
|
||||
.and_then(|x| string_array_value_at_index(x, i))
|
||||
else {
|
||||
// Build the result array with optional value mapper.
|
||||
fn build_with<F>(
|
||||
input: &ArrayRef,
|
||||
with_type: Option<&DataType>,
|
||||
value_mapper: F,
|
||||
) -> Result<ArrayRef>
|
||||
where
|
||||
for<'a> F: Fn(&'a Value) -> Option<&'a Value>,
|
||||
{
|
||||
let json_array = JsonArray::from(input);
|
||||
|
||||
let mut builder = result_builder(input.len(), with_type)?;
|
||||
for i in 0..input.len() {
|
||||
if input.is_null(i) {
|
||||
builder.append_null();
|
||||
continue;
|
||||
};
|
||||
}
|
||||
|
||||
let path: JsonPath<Value> = JsonPath::try_from(path).map_err(|e| {
|
||||
DataFusionError::Execution(format!("{path} is not a valid JSON path: {e}"))
|
||||
})?;
|
||||
// the wanted field is not retrievable from the JSON struct columns directly, we have
|
||||
// to combine everything (columns and the "_raw") into a complete JSON value to find it
|
||||
let value = json_struct_to_value(raw, jsons, i)?;
|
||||
let value = json_array
|
||||
.try_get_value(i)
|
||||
.map_err(|e| exec_datafusion_err!("{e}"))?;
|
||||
let value = value_mapper(&value);
|
||||
|
||||
match path.find(&value) {
|
||||
Value::Null => builder.append_null(),
|
||||
Value::Array(values) => match values.as_slice() {
|
||||
[] => builder.append_null(),
|
||||
[x] => builder.append_value(JsonResultValue::JsonStructByValue(x))?,
|
||||
_ => builder.append_value(JsonResultValue::JsonStructByValue(&value))?,
|
||||
},
|
||||
value => builder.append_value(JsonResultValue::JsonStructByValue(&value))?,
|
||||
if let Some(value) = value {
|
||||
builder.append_value(JsonResultValue::JsonStructByValue(value))?;
|
||||
} else {
|
||||
builder.append_null();
|
||||
}
|
||||
}
|
||||
Ok(builder.build())
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn json_struct_to_value(raw: &str, jsons: &StructArray, i: usize) -> Result<Value> {
|
||||
let Ok(mut json) = Value::from_str(raw) else {
|
||||
return Err(DataFusionError::Internal(format!(
|
||||
"inner field '{}' is not a valid JSON string",
|
||||
JsonStructureSettings::RAW_FIELD
|
||||
)));
|
||||
};
|
||||
|
||||
for (column_name, column) in jsons.column_names().into_iter().zip(jsons.columns()) {
|
||||
if column_name == JsonStructureSettings::RAW_FIELD {
|
||||
continue;
|
||||
}
|
||||
|
||||
let (json_pointer, field) = if let Some((json_object, field)) = column_name.rsplit_once(".")
|
||||
if direct {
|
||||
let casted = if let Some(with_type) = with_type
|
||||
&& current.data_type() != with_type
|
||||
{
|
||||
let json_pointer = format!("/{}", json_object.replace(".", "/"));
|
||||
(json_pointer, field)
|
||||
} else {
|
||||
("".to_string(), column_name)
|
||||
};
|
||||
let Some(json_object) = json
|
||||
.pointer_mut(&json_pointer)
|
||||
.and_then(|x| x.as_object_mut())
|
||||
else {
|
||||
return Err(DataFusionError::Internal(format!(
|
||||
"value at JSON pointer '{}' is not an object",
|
||||
json_pointer
|
||||
)));
|
||||
};
|
||||
|
||||
macro_rules! insert {
|
||||
($column: ident, $i: ident, $json_object: ident, $field: ident) => {{
|
||||
if let Some(value) = $column
|
||||
.is_valid($i)
|
||||
.then(|| serde_json::Value::from($column.value($i)))
|
||||
{
|
||||
$json_object.insert($field.to_string(), value);
|
||||
match (current.data_type(), with_type) {
|
||||
(DataType::Binary, _) => {
|
||||
// Fall back to the slow path if the found JSON sub-array is serialized to bytes
|
||||
// (because of JSON type conflicting)
|
||||
build_with(current, Some(with_type), |v| Some(v))?
|
||||
}
|
||||
}};
|
||||
}
|
||||
|
||||
match column.data_type() {
|
||||
// boolean => Value::Bool
|
||||
DataType::Boolean => {
|
||||
let column = column.as_boolean();
|
||||
insert!(column, i, json_object, field);
|
||||
(DataType::List(_) | DataType::Struct(_), with_type) if with_type.is_string() => {
|
||||
// Special handle for wanted array is string (Arrow cast is not working here if
|
||||
// the datatype is list or struct), because it could be used in displaying the
|
||||
// result.
|
||||
build_with(current, Some(with_type), |v| Some(v))?
|
||||
}
|
||||
(_, with_type) if with_type.is_string() => {
|
||||
// Same special handle for wanted array is string as above, except for simply
|
||||
// casting by Arrow is more desirable.
|
||||
arrow_cast::cast(current.as_ref(), with_type)?
|
||||
}
|
||||
_ => new_null_array(with_type, current.len()),
|
||||
}
|
||||
// int => Value::Number
|
||||
DataType::Int64 => {
|
||||
let column = column.as_primitive::<Int64Type>();
|
||||
insert!(column, i, json_object, field);
|
||||
}
|
||||
DataType::UInt64 => {
|
||||
let column = column.as_primitive::<UInt64Type>();
|
||||
insert!(column, i, json_object, field);
|
||||
}
|
||||
DataType::Float64 => {
|
||||
let column = column.as_primitive::<Float64Type>();
|
||||
insert!(column, i, json_object, field);
|
||||
}
|
||||
// string => Value::String
|
||||
DataType::Utf8 => {
|
||||
let column = column.as_string::<i32>();
|
||||
insert!(column, i, json_object, field);
|
||||
}
|
||||
DataType::LargeUtf8 => {
|
||||
let column = column.as_string::<i64>();
|
||||
insert!(column, i, json_object, field);
|
||||
}
|
||||
DataType::Utf8View => {
|
||||
let column = column.as_string_view();
|
||||
insert!(column, i, json_object, field);
|
||||
}
|
||||
// other => Value::Array and Value::Object
|
||||
_ => {
|
||||
return Err(DataFusionError::NotImplemented(format!(
|
||||
"{} is not yet supported to be executed with field {} of datatype {}",
|
||||
JsonGetString::NAME,
|
||||
column_name,
|
||||
column.data_type()
|
||||
)));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
current.clone()
|
||||
};
|
||||
return Ok(casted);
|
||||
}
|
||||
Ok(json)
|
||||
|
||||
// Slow path: reconstruct the JSON array from serialized representation of conflicting JSON
|
||||
// values: `serde_json::Value`.
|
||||
let mut pointer = path.replace(".", "/");
|
||||
if !pointer.starts_with("/") {
|
||||
pointer = format!("/{}", pointer);
|
||||
}
|
||||
build_with(array, with_type, |value| value.pointer(&pointer))
|
||||
}
|
||||
|
||||
/// This function is mostly called as `json_get(value, 'attr')::type` and rewritten by
|
||||
@@ -565,38 +491,36 @@ impl Function for JsonGetWithType {
|
||||
&self,
|
||||
args: ScalarFunctionArgs,
|
||||
) -> datafusion_common::Result<ColumnarValue> {
|
||||
let [arg0, arg1, _] = extract_args(self.name(), &args)?;
|
||||
let args_len = args.args.len();
|
||||
if args_len != 2 && args_len != 3 {
|
||||
return exec_err!("json_get expects 2 or 3 arguments, got {args_len}");
|
||||
}
|
||||
|
||||
let arg0 = args.args[0].to_array(args.number_rows)?;
|
||||
let len = arg0.len();
|
||||
|
||||
let arg1 = compute::cast(&arg1, &DataType::Utf8View)?;
|
||||
let paths = arg1.as_string_view();
|
||||
|
||||
// mapping datatypes returned from return_field_from_args
|
||||
let mut builder: Box<dyn JsonGetResultBuilder> = match args.return_field.data_type() {
|
||||
DataType::Utf8View => {
|
||||
Box::new(StringResultBuilder(StringViewBuilder::with_capacity(len)))
|
||||
}
|
||||
DataType::Int64 => Box::new(IntResultBuilder(Int64Builder::with_capacity(len))),
|
||||
DataType::Float64 => Box::new(FloatResultBuilder(Float64Builder::with_capacity(len))),
|
||||
DataType::Boolean => Box::new(BoolResultBuilder(BooleanBuilder::with_capacity(len))),
|
||||
_type => {
|
||||
return Err(DataFusionError::Internal(format!(
|
||||
"Unsupported return type {}",
|
||||
_type
|
||||
)));
|
||||
}
|
||||
let path = if let ColumnarValue::Scalar(path) = &args.args[1]
|
||||
&& let Some(Some(path)) = path.try_as_str()
|
||||
{
|
||||
path
|
||||
} else {
|
||||
return exec_err!(
|
||||
r#"json_get expects a string literal "path" argument, got {}"#,
|
||||
args.args[1]
|
||||
);
|
||||
};
|
||||
|
||||
match arg0.data_type() {
|
||||
let with_type = args.args.get(2).map(|x| x.data_type());
|
||||
let result = match arg0.data_type() {
|
||||
DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
|
||||
let arg0 = compute::cast(&arg0, &DataType::BinaryView)?;
|
||||
let jsons = arg0.as_binary_view();
|
||||
jsonb_get(jsons, paths, builder.as_mut())?;
|
||||
}
|
||||
DataType::Struct(_) => {
|
||||
let jsons = arg0.as_struct();
|
||||
json_struct_get(jsons, paths, builder.as_mut())?;
|
||||
|
||||
let mut builder = result_builder(len, with_type.as_ref())?;
|
||||
jsonb_get(jsons, path, builder.as_mut())?;
|
||||
builder.build()
|
||||
}
|
||||
DataType::Struct(_) => json_struct_get(&arg0, path, with_type.as_ref())?,
|
||||
_ => {
|
||||
return Err(DataFusionError::Execution(format!(
|
||||
"JSON_GET not supported argument type {}",
|
||||
@@ -605,7 +529,7 @@ impl Function for JsonGetWithType {
|
||||
}
|
||||
};
|
||||
|
||||
Ok(ColumnarValue::Array(builder.build()))
|
||||
Ok(ColumnarValue::Array(result))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -686,8 +610,8 @@ impl Function for JsonGetObject {
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow::array::{Float64Array, Int64Array, StructArray};
|
||||
use arrow_schema::Field;
|
||||
use arrow::array::{BooleanArray, Int64Array, StructArray};
|
||||
use arrow_schema::{Field, Fields};
|
||||
use datafusion_common::ScalarValue;
|
||||
use datafusion_common::arrow::array::{BinaryArray, BinaryViewArray, StringArray};
|
||||
use datafusion_common::arrow::datatypes::{Float64Type, Int64Type};
|
||||
@@ -712,29 +636,35 @@ mod tests {
|
||||
/// }
|
||||
/// ```
|
||||
fn test_json_struct() -> ArrayRef {
|
||||
let payload_fields = Fields::from(vec![
|
||||
Field::new("code", DataType::Int64, true),
|
||||
Field::new("success", DataType::Boolean, true),
|
||||
Field::new("result", DataType::Binary, true),
|
||||
]);
|
||||
Arc::new(StructArray::new(
|
||||
vec![
|
||||
Field::new("kind", DataType::Utf8, true),
|
||||
Field::new("payload.code", DataType::Int64, true),
|
||||
Field::new("payload.result.time_cost", DataType::Float64, true),
|
||||
Field::new(JsonStructureSettings::RAW_FIELD, DataType::Utf8View, true),
|
||||
Field::new("payload", DataType::Struct(payload_fields.clone()), true),
|
||||
]
|
||||
.into(),
|
||||
vec![
|
||||
Arc::new(StringArray::from_iter([Some("foo")])) as ArrayRef,
|
||||
Arc::new(Int64Array::from_iter([Some(404)])),
|
||||
Arc::new(Float64Array::from_iter([Some(1.234)])),
|
||||
Arc::new(StringViewArray::from_iter([Some(
|
||||
json! ({
|
||||
"payload": {
|
||||
"success": false,
|
||||
"result": {
|
||||
"error": "not found"
|
||||
}
|
||||
}
|
||||
})
|
||||
.to_string(),
|
||||
)])),
|
||||
Arc::new(StructArray::new(
|
||||
payload_fields,
|
||||
vec![
|
||||
Arc::new(Int64Array::from_iter([Some(404)])) as ArrayRef,
|
||||
Arc::new(BooleanArray::from_iter([Some(false)])),
|
||||
Arc::new(BinaryArray::from_iter([Some(
|
||||
json!({
|
||||
"error": "not found",
|
||||
"time_cost": 1.234
|
||||
})
|
||||
.to_string()
|
||||
.as_bytes(),
|
||||
)])),
|
||||
],
|
||||
None,
|
||||
)),
|
||||
],
|
||||
None,
|
||||
))
|
||||
@@ -1156,7 +1086,7 @@ mod tests {
|
||||
args: vec![
|
||||
ColumnarValue::Array(json.clone()),
|
||||
ColumnarValue::Scalar(path.into()),
|
||||
ColumnarValue::Scalar(ScalarValue::Utf8(Some("string".to_string()))),
|
||||
ColumnarValue::Scalar(ScalarValue::Utf8View(None)),
|
||||
],
|
||||
arg_fields: vec![],
|
||||
number_rows: 1,
|
||||
@@ -1194,7 +1124,7 @@ mod tests {
|
||||
args: vec![
|
||||
ColumnarValue::Array(json),
|
||||
ColumnarValue::Scalar((*path).into()),
|
||||
ColumnarValue::Scalar(ScalarValue::Utf8(Some("int".to_string()))),
|
||||
ColumnarValue::Scalar(ScalarValue::Int64(None)),
|
||||
],
|
||||
arg_fields: vec![],
|
||||
number_rows: 1,
|
||||
@@ -1232,7 +1162,7 @@ mod tests {
|
||||
args: vec![
|
||||
ColumnarValue::Array(json),
|
||||
ColumnarValue::Scalar((*path).into()),
|
||||
ColumnarValue::Scalar(ScalarValue::Utf8(Some("float".to_string()))),
|
||||
ColumnarValue::Scalar(ScalarValue::Float64(None)),
|
||||
],
|
||||
arg_fields: vec![],
|
||||
number_rows: 1,
|
||||
@@ -1270,7 +1200,7 @@ mod tests {
|
||||
args: vec![
|
||||
ColumnarValue::Array(json),
|
||||
ColumnarValue::Scalar((*path).into()),
|
||||
ColumnarValue::Scalar(ScalarValue::Utf8(Some("bool".to_string()))),
|
||||
ColumnarValue::Scalar(ScalarValue::Boolean(None)),
|
||||
],
|
||||
arg_fields: vec![],
|
||||
number_rows: 1,
|
||||
|
||||
@@ -18,18 +18,78 @@ use std::sync::Arc;
|
||||
use arrow::compute;
|
||||
use arrow::util::display::{ArrayFormatter, FormatOptions};
|
||||
use arrow_array::cast::AsArray;
|
||||
use arrow_array::types::{Float64Type, Int64Type, UInt64Type};
|
||||
use arrow_array::{Array, ArrayRef, GenericListArray, ListArray, StructArray, new_null_array};
|
||||
use arrow_schema::{DataType, FieldRef};
|
||||
use serde_json::Value;
|
||||
use snafu::{OptionExt, ResultExt, ensure};
|
||||
|
||||
use crate::arrow_array::StringArray;
|
||||
use crate::error::{AlignJsonArraySnafu, ArrowComputeSnafu, Result};
|
||||
use crate::arrow_array::{StringArray, binary_array_value, string_array_value};
|
||||
use crate::error::{
|
||||
AlignJsonArraySnafu, ArrowComputeSnafu, DeserializeSnafu, InvalidJsonSnafu, Result,
|
||||
};
|
||||
|
||||
pub struct JsonArray<'a> {
|
||||
inner: &'a ArrayRef,
|
||||
}
|
||||
|
||||
impl JsonArray<'_> {
|
||||
/// Try to get the value (as a [Value]) at the index `i`.
|
||||
pub fn try_get_value(&self, i: usize) -> Result<Value> {
|
||||
let array = self.inner;
|
||||
if array.is_null(i) {
|
||||
return Ok(Value::Null);
|
||||
}
|
||||
|
||||
let value = match array.data_type() {
|
||||
DataType::Null => Value::Null,
|
||||
DataType::Boolean => Value::Bool(array.as_boolean().value(i)),
|
||||
DataType::Int64 => Value::from(array.as_primitive::<Int64Type>().value(i)),
|
||||
DataType::UInt64 => Value::from(array.as_primitive::<UInt64Type>().value(i)),
|
||||
DataType::Float64 => Value::from(array.as_primitive::<Float64Type>().value(i)),
|
||||
DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
|
||||
Value::String(string_array_value(array, i).to_string())
|
||||
}
|
||||
DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
|
||||
let bytes = binary_array_value(array, i);
|
||||
serde_json::from_slice(bytes).with_context(|_| DeserializeSnafu {
|
||||
json: String::from_utf8_lossy(bytes),
|
||||
})?
|
||||
}
|
||||
DataType::Struct(_) => {
|
||||
let structs = array.as_struct();
|
||||
let object = structs
|
||||
.fields()
|
||||
.iter()
|
||||
.zip(structs.columns())
|
||||
.map(|(field, column)| {
|
||||
JsonArray::from(column)
|
||||
.try_get_value(i)
|
||||
.map(|v| (field.name().clone(), v))
|
||||
})
|
||||
.collect::<Result<_>>()?;
|
||||
Value::Object(object)
|
||||
}
|
||||
DataType::List(_) => {
|
||||
let lists = array.as_list::<i32>();
|
||||
let list = lists.value(i);
|
||||
let list = JsonArray::from(&list);
|
||||
let mut values = Vec::with_capacity(list.inner.len());
|
||||
for i in 0..list.inner.len() {
|
||||
values.push(list.try_get_value(i)?);
|
||||
}
|
||||
Value::Array(values)
|
||||
}
|
||||
t => {
|
||||
return InvalidJsonSnafu {
|
||||
value: format!("unknown JSON type {t}"),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
};
|
||||
Ok(value)
|
||||
}
|
||||
|
||||
/// Align a JSON array to the `expect` data type. The `expect` data type is often the "largest"
|
||||
/// JSON type after some insertions in the table schema, while the JSON array previously
|
||||
/// written in the SST could be lagged behind it. So it's important to "align" the JSON array by
|
||||
@@ -189,11 +249,90 @@ impl<'a> From<&'a ArrayRef> for JsonArray<'a> {
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use arrow_array::types::Int64Type;
|
||||
use arrow_array::{BooleanArray, Float64Array, Int64Array, ListArray};
|
||||
use arrow_array::{BinaryArray, BooleanArray, Float64Array, Int32Array, Int64Array, ListArray};
|
||||
use arrow_schema::{Field, Fields};
|
||||
use serde_json::json;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_try_get_value() -> Result<()> {
|
||||
let nulls = new_null_array(&DataType::Null, 2);
|
||||
assert_eq!(JsonArray::from(&nulls).try_get_value(0)?, Value::Null);
|
||||
|
||||
let bools: ArrayRef = Arc::new(BooleanArray::from(vec![Some(true), None]));
|
||||
assert_eq!(JsonArray::from(&bools).try_get_value(0)?, json!(true));
|
||||
assert_eq!(JsonArray::from(&bools).try_get_value(1)?, Value::Null);
|
||||
|
||||
let ints: ArrayRef = Arc::new(Int64Array::from(vec![Some(-7), None]));
|
||||
assert_eq!(JsonArray::from(&ints).try_get_value(0)?, json!(-7));
|
||||
assert_eq!(JsonArray::from(&ints).try_get_value(1)?, Value::Null);
|
||||
|
||||
let floats: ArrayRef = Arc::new(Float64Array::from(vec![Some(1.5)]));
|
||||
assert_eq!(JsonArray::from(&floats).try_get_value(0)?, json!(1.5));
|
||||
|
||||
let strings: ArrayRef = Arc::new(StringArray::from(vec![Some("hello"), None]));
|
||||
assert_eq!(JsonArray::from(&strings).try_get_value(0)?, json!("hello"));
|
||||
assert_eq!(JsonArray::from(&strings).try_get_value(1)?, Value::Null);
|
||||
|
||||
let binaries: ArrayRef = Arc::new(BinaryArray::from(vec![
|
||||
br#"{"nested":[1,null,"x"]}"#.as_slice(),
|
||||
b"null".as_slice(),
|
||||
]));
|
||||
assert_eq!(
|
||||
JsonArray::from(&binaries).try_get_value(0)?,
|
||||
json!({"nested": [1, null, "x"]})
|
||||
);
|
||||
assert_eq!(JsonArray::from(&binaries).try_get_value(1)?, Value::Null);
|
||||
|
||||
let lists: ArrayRef = Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
|
||||
Some(vec![Some(1), None, Some(3)]),
|
||||
None,
|
||||
]));
|
||||
assert_eq!(
|
||||
JsonArray::from(&lists).try_get_value(0)?,
|
||||
json!([1, null, 3])
|
||||
);
|
||||
assert_eq!(JsonArray::from(&lists).try_get_value(1)?, Value::Null);
|
||||
|
||||
let structs: ArrayRef = Arc::new(StructArray::from(vec![
|
||||
(
|
||||
Arc::new(Field::new("flag", DataType::Boolean, true)),
|
||||
Arc::new(BooleanArray::from(vec![Some(true), None])) as ArrayRef,
|
||||
),
|
||||
(
|
||||
Arc::new(Field::new_list(
|
||||
"items",
|
||||
Field::new_list_field(DataType::Int64, true),
|
||||
true,
|
||||
)),
|
||||
Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
|
||||
Some(vec![Some(1), None]),
|
||||
Some(vec![Some(2)]),
|
||||
])) as ArrayRef,
|
||||
),
|
||||
]));
|
||||
assert_eq!(
|
||||
JsonArray::from(&structs).try_get_value(0)?,
|
||||
json!({"flag": true, "items": [1, null]})
|
||||
);
|
||||
assert_eq!(
|
||||
JsonArray::from(&structs).try_get_value(1)?,
|
||||
json!({"flag": null, "items": [2]})
|
||||
);
|
||||
|
||||
let unsupported: ArrayRef = Arc::new(Int32Array::from(vec![1]));
|
||||
assert_eq!(
|
||||
JsonArray::from(&unsupported)
|
||||
.try_get_value(0)
|
||||
.unwrap_err()
|
||||
.to_string(),
|
||||
"Invalid JSON: unknown JSON type Int32"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_align_json_array() -> Result<()> {
|
||||
struct TestCase {
|
||||
|
||||
Reference in New Issue
Block a user