mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-27 16:32:54 +00:00
Compare commits
1 Commits
main
...
feat/impl-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
48c5a9cdaf |
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -2190,6 +2190,7 @@ dependencies = [
|
|||||||
"approx 0.5.1",
|
"approx 0.5.1",
|
||||||
"arc-swap",
|
"arc-swap",
|
||||||
"arrow",
|
"arrow",
|
||||||
|
"arrow-cast",
|
||||||
"arrow-schema",
|
"arrow-schema",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
"bincode",
|
"bincode",
|
||||||
@@ -2220,6 +2221,7 @@ dependencies = [
|
|||||||
"h3o",
|
"h3o",
|
||||||
"hyperloglogplus",
|
"hyperloglogplus",
|
||||||
"jsonb",
|
"jsonb",
|
||||||
|
"jsonpath-rust 0.7.5",
|
||||||
"memchr",
|
"memchr",
|
||||||
"mito-codec",
|
"mito-codec",
|
||||||
"nalgebra",
|
"nalgebra",
|
||||||
|
|||||||
@@ -103,6 +103,7 @@ aquamarine = "0.6"
|
|||||||
arrow = { version = "56.2", features = ["prettyprint"] }
|
arrow = { version = "56.2", features = ["prettyprint"] }
|
||||||
arrow-array = { version = "56.2", default-features = false, features = ["chrono-tz"] }
|
arrow-array = { version = "56.2", default-features = false, features = ["chrono-tz"] }
|
||||||
arrow-buffer = "56.2"
|
arrow-buffer = "56.2"
|
||||||
|
arrow-cast = "56.2"
|
||||||
arrow-flight = "56.2"
|
arrow-flight = "56.2"
|
||||||
arrow-ipc = { version = "56.2", default-features = false, features = ["lz4", "zstd"] }
|
arrow-ipc = { version = "56.2", default-features = false, features = ["lz4", "zstd"] }
|
||||||
arrow-schema = { version = "56.2", features = ["serde"] }
|
arrow-schema = { version = "56.2", features = ["serde"] }
|
||||||
|
|||||||
@@ -17,6 +17,7 @@ ahash.workspace = true
|
|||||||
api.workspace = true
|
api.workspace = true
|
||||||
arc-swap = "1.0"
|
arc-swap = "1.0"
|
||||||
arrow.workspace = true
|
arrow.workspace = true
|
||||||
|
arrow-cast.workspace = true
|
||||||
arrow-schema.workspace = true
|
arrow-schema.workspace = true
|
||||||
async-trait.workspace = true
|
async-trait.workspace = true
|
||||||
bincode = "=1.3.3"
|
bincode = "=1.3.3"
|
||||||
@@ -46,6 +47,7 @@ geohash = { version = "0.13", optional = true }
|
|||||||
h3o = { version = "0.6", optional = true }
|
h3o = { version = "0.6", optional = true }
|
||||||
hyperloglogplus = "0.4"
|
hyperloglogplus = "0.4"
|
||||||
jsonb.workspace = true
|
jsonb.workspace = true
|
||||||
|
jsonpath-rust = "0.7.5"
|
||||||
memchr = "2.7"
|
memchr = "2.7"
|
||||||
mito-codec.workspace = true
|
mito-codec.workspace = true
|
||||||
nalgebra.workspace = true
|
nalgebra.workspace = true
|
||||||
|
|||||||
@@ -13,17 +13,24 @@
|
|||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
use std::fmt::{self, Display};
|
use std::fmt::{self, Display};
|
||||||
|
use std::str::FromStr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use arrow::array::{ArrayRef, BinaryViewArray, StringViewArray, StructArray};
|
||||||
use arrow::compute;
|
use arrow::compute;
|
||||||
use datafusion_common::DataFusionError;
|
use arrow::datatypes::{Float64Type, Int64Type, UInt64Type};
|
||||||
use datafusion_common::arrow::array::{
|
use datafusion_common::arrow::array::{
|
||||||
Array, AsArray, BinaryViewBuilder, BooleanBuilder, Float64Builder, Int64Builder,
|
Array, AsArray, BinaryViewBuilder, BooleanBuilder, Float64Builder, Int64Builder,
|
||||||
StringViewBuilder,
|
StringViewBuilder,
|
||||||
};
|
};
|
||||||
use datafusion_common::arrow::datatypes::DataType;
|
use datafusion_common::arrow::datatypes::DataType;
|
||||||
|
use datafusion_common::{DataFusionError, Result};
|
||||||
use datafusion_expr::type_coercion::aggregates::STRINGS;
|
use datafusion_expr::type_coercion::aggregates::STRINGS;
|
||||||
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature};
|
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility};
|
||||||
|
use datatypes::arrow_array::string_array_value_at_index;
|
||||||
|
use datatypes::json::JsonStructureSettings;
|
||||||
|
use jsonpath_rust::JsonPath;
|
||||||
|
use serde_json::Value;
|
||||||
|
|
||||||
use crate::function::{Function, extract_args};
|
use crate::function::{Function, extract_args};
|
||||||
use crate::helper;
|
use crate::helper;
|
||||||
@@ -158,11 +165,7 @@ impl JsonGetString {
|
|||||||
impl Default for JsonGetString {
|
impl Default for JsonGetString {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self {
|
Self {
|
||||||
// TODO(LFC): Use a more clear type here instead of "Binary" for Json input, once we have a "Json" type.
|
signature: Signature::any(2, Volatility::Immutable),
|
||||||
signature: helper::one_of_sigs2(
|
|
||||||
vec![DataType::Binary, DataType::BinaryView],
|
|
||||||
vec![DataType::Utf8, DataType::Utf8View],
|
|
||||||
),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -172,7 +175,7 @@ impl Function for JsonGetString {
|
|||||||
Self::NAME
|
Self::NAME
|
||||||
}
|
}
|
||||||
|
|
||||||
fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
|
fn return_type(&self, _: &[DataType]) -> Result<DataType> {
|
||||||
Ok(DataType::Utf8View)
|
Ok(DataType::Utf8View)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -180,16 +183,36 @@ impl Function for JsonGetString {
|
|||||||
&self.signature
|
&self.signature
|
||||||
}
|
}
|
||||||
|
|
||||||
fn invoke_with_args(
|
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
|
||||||
&self,
|
|
||||||
args: ScalarFunctionArgs,
|
|
||||||
) -> datafusion_common::Result<ColumnarValue> {
|
|
||||||
let [arg0, arg1] = extract_args(self.name(), &args)?;
|
let [arg0, arg1] = extract_args(self.name(), &args)?;
|
||||||
let arg0 = compute::cast(&arg0, &DataType::BinaryView)?;
|
|
||||||
let jsons = arg0.as_binary_view();
|
|
||||||
let arg1 = compute::cast(&arg1, &DataType::Utf8View)?;
|
let arg1 = compute::cast(&arg1, &DataType::Utf8View)?;
|
||||||
let paths = arg1.as_string_view();
|
let paths = arg1.as_string_view();
|
||||||
|
|
||||||
|
let result = match arg0.data_type() {
|
||||||
|
DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
|
||||||
|
let arg0 = compute::cast(&arg0, &DataType::BinaryView)?;
|
||||||
|
let jsons = arg0.as_binary_view();
|
||||||
|
jsonb_get_string(jsons, paths)?
|
||||||
|
}
|
||||||
|
DataType::Struct(_) => {
|
||||||
|
let jsons = arg0.as_struct();
|
||||||
|
json_struct_get_string(jsons, paths)?
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
return Err(DataFusionError::Execution(format!(
|
||||||
|
"{} not supported argument type {}",
|
||||||
|
Self::NAME,
|
||||||
|
arg0.data_type(),
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(ColumnarValue::Array(result))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn jsonb_get_string(jsons: &BinaryViewArray, paths: &StringViewArray) -> Result<ArrayRef> {
|
||||||
let size = jsons.len();
|
let size = jsons.len();
|
||||||
let mut builder = StringViewBuilder::with_capacity(size);
|
let mut builder = StringViewBuilder::with_capacity(size);
|
||||||
|
|
||||||
@@ -205,8 +228,158 @@ impl Function for JsonGetString {
|
|||||||
builder.append_option(result);
|
builder.append_option(result);
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(ColumnarValue::Array(Arc::new(builder.finish())))
|
Ok(Arc::new(builder.finish()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn json_struct_get_string(jsons: &StructArray, paths: &StringViewArray) -> Result<ArrayRef> {
|
||||||
|
let size = jsons.len();
|
||||||
|
let mut builder = StringViewBuilder::with_capacity(size);
|
||||||
|
|
||||||
|
for i in 0..size {
|
||||||
|
if jsons.is_null(i) || paths.is_null(i) {
|
||||||
|
builder.append_null();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let path = paths.value(i);
|
||||||
|
|
||||||
|
// naively assume the JSON path is our kind of indexing to the field, by removing its "root"
|
||||||
|
let field_path = path.replace("$.", "");
|
||||||
|
let column = jsons.column_by_name(&field_path);
|
||||||
|
|
||||||
|
if let Some(column) = column {
|
||||||
|
if let Some(v) = string_array_value_at_index(column, i) {
|
||||||
|
builder.append_value(v);
|
||||||
|
} else {
|
||||||
|
builder.append_value(arrow_cast::display::array_value_to_string(column, i)?);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
let Some(raw) = jsons
|
||||||
|
.column_by_name(JsonStructureSettings::RAW_FIELD)
|
||||||
|
.and_then(|x| string_array_value_at_index(x, i))
|
||||||
|
else {
|
||||||
|
builder.append_null();
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
|
||||||
|
let path: JsonPath<Value> = JsonPath::try_from(path).map_err(|e| {
|
||||||
|
DataFusionError::Execution(format!("{path} is not a valid JSON path: {e}"))
|
||||||
|
})?;
|
||||||
|
// the wanted field is not retrievable from the JSON struct columns directly, we have
|
||||||
|
// to combine everything (columns and the "_raw") into a complete JSON value to find it
|
||||||
|
let value = json_struct_to_value(raw, jsons, i)?;
|
||||||
|
|
||||||
|
match path.find(&value) {
|
||||||
|
Value::Null => builder.append_null(),
|
||||||
|
Value::Array(values) => match values.as_slice() {
|
||||||
|
[] => builder.append_null(),
|
||||||
|
[x] => {
|
||||||
|
if let Some(s) = x.as_str() {
|
||||||
|
builder.append_value(s)
|
||||||
|
} else {
|
||||||
|
builder.append_value(x.to_string())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
x => builder.append_value(
|
||||||
|
x.iter()
|
||||||
|
.map(|v| v.to_string())
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.join(", "),
|
||||||
|
),
|
||||||
|
},
|
||||||
|
// Safety: guarded by the returns of `path.find` as documented
|
||||||
|
_ => unreachable!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(Arc::new(builder.finish()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn json_struct_to_value(raw: &str, jsons: &StructArray, i: usize) -> Result<Value> {
|
||||||
|
let Ok(mut json) = Value::from_str(raw) else {
|
||||||
|
return Err(DataFusionError::Internal(format!(
|
||||||
|
"inner field '{}' is not a valid JSON string",
|
||||||
|
JsonStructureSettings::RAW_FIELD
|
||||||
|
)));
|
||||||
|
};
|
||||||
|
|
||||||
|
for (column_name, column) in jsons.column_names().into_iter().zip(jsons.columns()) {
|
||||||
|
if column_name == JsonStructureSettings::RAW_FIELD {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let (json_pointer, field) = if let Some((json_object, field)) = column_name.rsplit_once(".")
|
||||||
|
{
|
||||||
|
let json_pointer = format!("/{}", json_object.replace(".", "/"));
|
||||||
|
(json_pointer, field)
|
||||||
|
} else {
|
||||||
|
("".to_string(), column_name)
|
||||||
|
};
|
||||||
|
let Some(json_object) = json
|
||||||
|
.pointer_mut(&json_pointer)
|
||||||
|
.and_then(|x| x.as_object_mut())
|
||||||
|
else {
|
||||||
|
return Err(DataFusionError::Internal(format!(
|
||||||
|
"value at JSON pointer '{}' is not an object",
|
||||||
|
json_pointer
|
||||||
|
)));
|
||||||
|
};
|
||||||
|
|
||||||
|
macro_rules! insert {
|
||||||
|
($column: ident, $i: ident, $json_object: ident, $field: ident) => {{
|
||||||
|
if let Some(value) = $column
|
||||||
|
.is_valid($i)
|
||||||
|
.then(|| serde_json::Value::from($column.value($i)))
|
||||||
|
{
|
||||||
|
$json_object.insert($field.to_string(), value);
|
||||||
|
}
|
||||||
|
}};
|
||||||
|
}
|
||||||
|
|
||||||
|
match column.data_type() {
|
||||||
|
// boolean => Value::Bool
|
||||||
|
DataType::Boolean => {
|
||||||
|
let column = column.as_boolean();
|
||||||
|
insert!(column, i, json_object, field);
|
||||||
|
}
|
||||||
|
// int => Value::Number
|
||||||
|
DataType::Int64 => {
|
||||||
|
let column = column.as_primitive::<Int64Type>();
|
||||||
|
insert!(column, i, json_object, field);
|
||||||
|
}
|
||||||
|
DataType::UInt64 => {
|
||||||
|
let column = column.as_primitive::<UInt64Type>();
|
||||||
|
insert!(column, i, json_object, field);
|
||||||
|
}
|
||||||
|
DataType::Float64 => {
|
||||||
|
let column = column.as_primitive::<Float64Type>();
|
||||||
|
insert!(column, i, json_object, field);
|
||||||
|
}
|
||||||
|
// string => Value::String
|
||||||
|
DataType::Utf8 => {
|
||||||
|
let column = column.as_string::<i32>();
|
||||||
|
insert!(column, i, json_object, field);
|
||||||
|
}
|
||||||
|
DataType::LargeUtf8 => {
|
||||||
|
let column = column.as_string::<i64>();
|
||||||
|
insert!(column, i, json_object, field);
|
||||||
|
}
|
||||||
|
DataType::Utf8View => {
|
||||||
|
let column = column.as_string_view();
|
||||||
|
insert!(column, i, json_object, field);
|
||||||
|
}
|
||||||
|
// other => Value::Array and Value::Object
|
||||||
|
_ => {
|
||||||
|
return Err(DataFusionError::NotImplemented(format!(
|
||||||
|
"{} is not yet supported to be executed with field {} of datatype {}",
|
||||||
|
JsonGetString::NAME,
|
||||||
|
column_name,
|
||||||
|
column.data_type()
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(json)
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Display for JsonGetString {
|
impl Display for JsonGetString {
|
||||||
@@ -296,11 +469,13 @@ impl Display for JsonGetObject {
|
|||||||
mod tests {
|
mod tests {
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use arrow::array::{Float64Array, Int64Array, StructArray};
|
||||||
use arrow_schema::Field;
|
use arrow_schema::Field;
|
||||||
use datafusion_common::ScalarValue;
|
use datafusion_common::ScalarValue;
|
||||||
use datafusion_common::arrow::array::{BinaryArray, BinaryViewArray, StringArray};
|
use datafusion_common::arrow::array::{BinaryArray, BinaryViewArray, StringArray};
|
||||||
use datafusion_common::arrow::datatypes::{Float64Type, Int64Type};
|
use datafusion_common::arrow::datatypes::{Float64Type, Int64Type};
|
||||||
use datatypes::types::parse_string_to_jsonb;
|
use datatypes::types::parse_string_to_jsonb;
|
||||||
|
use serde_json::json;
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
@@ -474,42 +649,123 @@ mod tests {
|
|||||||
r#"{"a": "d", "b": {"c": "e"}, "c": "f"}"#,
|
r#"{"a": "d", "b": {"c": "e"}, "c": "f"}"#,
|
||||||
r#"{"a": "g", "b": "h", "c": {"a": "g"}}"#,
|
r#"{"a": "g", "b": "h", "c": {"a": "g"}}"#,
|
||||||
];
|
];
|
||||||
let paths = vec!["$.a.b", "$.a", ""];
|
|
||||||
let results = [Some("a"), Some("d"), None];
|
|
||||||
|
|
||||||
let jsonbs = json_strings
|
// complete JSON is:
|
||||||
|
// {
|
||||||
|
// "kind": "foo",
|
||||||
|
// "payload": {
|
||||||
|
// "code": 404,
|
||||||
|
// "success": false,
|
||||||
|
// "result": {
|
||||||
|
// "error": "not found",
|
||||||
|
// "time_cost": 1.234
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
let json_struct: ArrayRef = Arc::new(StructArray::new(
|
||||||
|
vec![
|
||||||
|
Field::new("kind", DataType::Utf8, true),
|
||||||
|
Field::new("payload.code", DataType::Int64, true),
|
||||||
|
Field::new("payload.result.time_cost", DataType::Float64, true),
|
||||||
|
Field::new(JsonStructureSettings::RAW_FIELD, DataType::Utf8View, true),
|
||||||
|
]
|
||||||
|
.into(),
|
||||||
|
vec![
|
||||||
|
Arc::new(StringArray::from_iter([Some("foo")])) as ArrayRef,
|
||||||
|
Arc::new(Int64Array::from_iter([Some(404)])),
|
||||||
|
Arc::new(Float64Array::from_iter([Some(1.234)])),
|
||||||
|
Arc::new(StringViewArray::from_iter([Some(
|
||||||
|
json! ({
|
||||||
|
"payload": {
|
||||||
|
"success": false,
|
||||||
|
"result": {
|
||||||
|
"error": "not found"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.to_string(),
|
||||||
|
)])),
|
||||||
|
],
|
||||||
|
None,
|
||||||
|
));
|
||||||
|
|
||||||
|
let paths = vec![
|
||||||
|
"$.a.b",
|
||||||
|
"$.a",
|
||||||
|
"",
|
||||||
|
"$.kind",
|
||||||
|
"$.payload.code",
|
||||||
|
"$.payload.result.time_cost",
|
||||||
|
"$.payload",
|
||||||
|
"$.payload.success",
|
||||||
|
"$.payload.result",
|
||||||
|
"$.payload.result.error",
|
||||||
|
"$.payload.result.not-exists",
|
||||||
|
"$.payload.not-exists",
|
||||||
|
"$.not-exists",
|
||||||
|
"$",
|
||||||
|
];
|
||||||
|
let expects = [
|
||||||
|
Some("a"),
|
||||||
|
Some("d"),
|
||||||
|
None,
|
||||||
|
Some("foo"),
|
||||||
|
Some("404"),
|
||||||
|
Some("1.234"),
|
||||||
|
Some(
|
||||||
|
r#"{"code":404,"result":{"error":"not found","time_cost":1.234},"success":false}"#,
|
||||||
|
),
|
||||||
|
Some("false"),
|
||||||
|
Some(r#"{"error":"not found","time_cost":1.234}"#),
|
||||||
|
Some("not found"),
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
Some(
|
||||||
|
r#"{"kind":"foo","payload":{"code":404,"result":{"error":"not found","time_cost":1.234},"success":false}}"#,
|
||||||
|
),
|
||||||
|
];
|
||||||
|
|
||||||
|
let mut jsons = json_strings
|
||||||
.iter()
|
.iter()
|
||||||
.map(|s| {
|
.map(|s| {
|
||||||
let value = jsonb::parse_value(s.as_bytes()).unwrap();
|
let value = jsonb::parse_value(s.as_bytes()).unwrap();
|
||||||
value.to_vec()
|
Arc::new(BinaryArray::from_iter_values([value.to_vec()])) as ArrayRef
|
||||||
})
|
})
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
let json_struct_arrays =
|
||||||
|
std::iter::repeat_n(json_struct, expects.len() - jsons.len()).collect::<Vec<_>>();
|
||||||
|
jsons.extend(json_struct_arrays);
|
||||||
|
|
||||||
|
for i in 0..jsons.len() {
|
||||||
|
let json = &jsons[i];
|
||||||
|
let path = paths[i];
|
||||||
|
let expect = expects[i];
|
||||||
|
|
||||||
let args = ScalarFunctionArgs {
|
let args = ScalarFunctionArgs {
|
||||||
args: vec![
|
args: vec![
|
||||||
ColumnarValue::Array(Arc::new(BinaryArray::from_iter_values(jsonbs))),
|
ColumnarValue::Array(json.clone()),
|
||||||
ColumnarValue::Array(Arc::new(StringArray::from_iter_values(paths))),
|
ColumnarValue::Scalar(path.into()),
|
||||||
],
|
],
|
||||||
arg_fields: vec![],
|
arg_fields: vec![],
|
||||||
number_rows: 3,
|
number_rows: 1,
|
||||||
return_field: Arc::new(Field::new("x", DataType::Utf8View, false)),
|
return_field: Arc::new(Field::new("x", DataType::Utf8View, false)),
|
||||||
config_options: Arc::new(Default::default()),
|
config_options: Arc::new(Default::default()),
|
||||||
};
|
};
|
||||||
let result = json_get_string
|
let result = json_get_string
|
||||||
.invoke_with_args(args)
|
.invoke_with_args(args)
|
||||||
.and_then(|x| x.to_array(3))
|
.and_then(|x| x.to_array(1))
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let vector = result.as_string_view();
|
|
||||||
|
|
||||||
assert_eq!(3, vector.len());
|
let result = result.as_string_view();
|
||||||
for (i, gt) in results.iter().enumerate() {
|
assert_eq!(1, result.len());
|
||||||
let result = vector.is_valid(i).then(|| vector.value(i));
|
let actual = result.is_valid(0).then(|| result.value(0));
|
||||||
assert_eq!(*gt, result);
|
assert_eq!(actual, expect);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_json_get_object() -> datafusion_common::Result<()> {
|
fn test_json_get_object() -> Result<()> {
|
||||||
let udf = JsonGetObject::default();
|
let udf = JsonGetObject::default();
|
||||||
assert_eq!("json_get_object", udf.name());
|
assert_eq!("json_get_object", udf.name());
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
|
|||||||
@@ -19,6 +19,7 @@ use arrow::datatypes::{
|
|||||||
Time64NanosecondType, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType,
|
Time64NanosecondType, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType,
|
||||||
TimestampNanosecondType, TimestampSecondType,
|
TimestampNanosecondType, TimestampSecondType,
|
||||||
};
|
};
|
||||||
|
use arrow_array::Array;
|
||||||
use common_time::time::Time;
|
use common_time::time::Time;
|
||||||
use common_time::{Duration, Timestamp};
|
use common_time::{Duration, Timestamp};
|
||||||
|
|
||||||
@@ -126,3 +127,28 @@ pub fn duration_array_value(array: &ArrayRef, i: usize) -> Duration {
|
|||||||
};
|
};
|
||||||
Duration::new(v, time_unit.into())
|
Duration::new(v, time_unit.into())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get the string value at index `i` for `Utf8`, `LargeUtf8`, or `Utf8View` arrays.
|
||||||
|
///
|
||||||
|
/// Returns `None` when the array type is not a string type or the value is null.
|
||||||
|
///
|
||||||
|
/// # Panics
|
||||||
|
///
|
||||||
|
/// If index `i` is out of bounds.
|
||||||
|
pub fn string_array_value_at_index(array: &ArrayRef, i: usize) -> Option<&str> {
|
||||||
|
match array.data_type() {
|
||||||
|
DataType::Utf8 => {
|
||||||
|
let array = array.as_string::<i32>();
|
||||||
|
array.is_valid(i).then(|| array.value(i))
|
||||||
|
}
|
||||||
|
DataType::LargeUtf8 => {
|
||||||
|
let array = array.as_string::<i64>();
|
||||||
|
array.is_valid(i).then(|| array.value(i))
|
||||||
|
}
|
||||||
|
DataType::Utf8View => {
|
||||||
|
let array = array.as_string_view();
|
||||||
|
array.is_valid(i).then(|| array.value(i))
|
||||||
|
}
|
||||||
|
_ => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -56,6 +56,32 @@ async fn query_data(frontend: &Arc<Instance>) -> io::Result<()> {
|
|||||||
))?;
|
))?;
|
||||||
execute_sql_and_expect(frontend, sql, &expected).await;
|
execute_sql_and_expect(frontend, sql, &expected).await;
|
||||||
|
|
||||||
|
// query 1:
|
||||||
|
let sql = "SELECT json_get_string(data, '$.commit.collection') AS event, count() AS count FROM bluesky GROUP BY event ORDER BY count DESC";
|
||||||
|
let expected = r#"
|
||||||
|
+-----------------------+-------+
|
||||||
|
| event | count |
|
||||||
|
+-----------------------+-------+
|
||||||
|
| app.bsky.feed.post | 3 |
|
||||||
|
| app.bsky.feed.like | 3 |
|
||||||
|
| app.bsky.graph.follow | 3 |
|
||||||
|
| app.bsky.feed.repost | 1 |
|
||||||
|
+-----------------------+-------+"#;
|
||||||
|
execute_sql_and_expect(frontend, sql, expected).await;
|
||||||
|
|
||||||
|
// query 2:
|
||||||
|
let sql = "SELECT json_get_string(data, '$.commit.collection') AS event, count() AS count, count(DISTINCT json_get_string(data, '$.did')) AS users FROM bluesky WHERE (json_get_string(data, '$.kind') = 'commit') AND (json_get_string(data, '$.commit.operation') = 'create') GROUP BY event ORDER BY count DESC";
|
||||||
|
let expected = r#"
|
||||||
|
+-----------------------+-------+-------+
|
||||||
|
| event | count | users |
|
||||||
|
+-----------------------+-------+-------+
|
||||||
|
| app.bsky.feed.post | 3 | 3 |
|
||||||
|
| app.bsky.feed.like | 3 | 3 |
|
||||||
|
| app.bsky.graph.follow | 3 | 3 |
|
||||||
|
| app.bsky.feed.repost | 1 | 1 |
|
||||||
|
+-----------------------+-------+-------+"#;
|
||||||
|
execute_sql_and_expect(frontend, sql, expected).await;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user