mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-27 00:19:58 +00:00
Compare commits
1 Commits
main
...
feat/impl-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
48c5a9cdaf |
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -2190,6 +2190,7 @@ dependencies = [
|
||||
"approx 0.5.1",
|
||||
"arc-swap",
|
||||
"arrow",
|
||||
"arrow-cast",
|
||||
"arrow-schema",
|
||||
"async-trait",
|
||||
"bincode",
|
||||
@@ -2220,6 +2221,7 @@ dependencies = [
|
||||
"h3o",
|
||||
"hyperloglogplus",
|
||||
"jsonb",
|
||||
"jsonpath-rust 0.7.5",
|
||||
"memchr",
|
||||
"mito-codec",
|
||||
"nalgebra",
|
||||
|
||||
@@ -103,6 +103,7 @@ aquamarine = "0.6"
|
||||
arrow = { version = "56.2", features = ["prettyprint"] }
|
||||
arrow-array = { version = "56.2", default-features = false, features = ["chrono-tz"] }
|
||||
arrow-buffer = "56.2"
|
||||
arrow-cast = "56.2"
|
||||
arrow-flight = "56.2"
|
||||
arrow-ipc = { version = "56.2", default-features = false, features = ["lz4", "zstd"] }
|
||||
arrow-schema = { version = "56.2", features = ["serde"] }
|
||||
|
||||
@@ -17,6 +17,7 @@ ahash.workspace = true
|
||||
api.workspace = true
|
||||
arc-swap = "1.0"
|
||||
arrow.workspace = true
|
||||
arrow-cast.workspace = true
|
||||
arrow-schema.workspace = true
|
||||
async-trait.workspace = true
|
||||
bincode = "=1.3.3"
|
||||
@@ -46,6 +47,7 @@ geohash = { version = "0.13", optional = true }
|
||||
h3o = { version = "0.6", optional = true }
|
||||
hyperloglogplus = "0.4"
|
||||
jsonb.workspace = true
|
||||
jsonpath-rust = "0.7.5"
|
||||
memchr = "2.7"
|
||||
mito-codec.workspace = true
|
||||
nalgebra.workspace = true
|
||||
|
||||
@@ -13,17 +13,24 @@
|
||||
// limitations under the License.
|
||||
|
||||
use std::fmt::{self, Display};
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow::array::{ArrayRef, BinaryViewArray, StringViewArray, StructArray};
|
||||
use arrow::compute;
|
||||
use datafusion_common::DataFusionError;
|
||||
use arrow::datatypes::{Float64Type, Int64Type, UInt64Type};
|
||||
use datafusion_common::arrow::array::{
|
||||
Array, AsArray, BinaryViewBuilder, BooleanBuilder, Float64Builder, Int64Builder,
|
||||
StringViewBuilder,
|
||||
};
|
||||
use datafusion_common::arrow::datatypes::DataType;
|
||||
use datafusion_common::{DataFusionError, Result};
|
||||
use datafusion_expr::type_coercion::aggregates::STRINGS;
|
||||
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature};
|
||||
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility};
|
||||
use datatypes::arrow_array::string_array_value_at_index;
|
||||
use datatypes::json::JsonStructureSettings;
|
||||
use jsonpath_rust::JsonPath;
|
||||
use serde_json::Value;
|
||||
|
||||
use crate::function::{Function, extract_args};
|
||||
use crate::helper;
|
||||
@@ -158,11 +165,7 @@ impl JsonGetString {
|
||||
impl Default for JsonGetString {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
// TODO(LFC): Use a more clear type here instead of "Binary" for Json input, once we have a "Json" type.
|
||||
signature: helper::one_of_sigs2(
|
||||
vec![DataType::Binary, DataType::BinaryView],
|
||||
vec![DataType::Utf8, DataType::Utf8View],
|
||||
),
|
||||
signature: Signature::any(2, Volatility::Immutable),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -172,7 +175,7 @@ impl Function for JsonGetString {
|
||||
Self::NAME
|
||||
}
|
||||
|
||||
fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
|
||||
fn return_type(&self, _: &[DataType]) -> Result<DataType> {
|
||||
Ok(DataType::Utf8View)
|
||||
}
|
||||
|
||||
@@ -180,33 +183,203 @@ impl Function for JsonGetString {
|
||||
&self.signature
|
||||
}
|
||||
|
||||
fn invoke_with_args(
|
||||
&self,
|
||||
args: ScalarFunctionArgs,
|
||||
) -> datafusion_common::Result<ColumnarValue> {
|
||||
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
|
||||
let [arg0, arg1] = extract_args(self.name(), &args)?;
|
||||
let arg0 = compute::cast(&arg0, &DataType::BinaryView)?;
|
||||
let jsons = arg0.as_binary_view();
|
||||
|
||||
let arg1 = compute::cast(&arg1, &DataType::Utf8View)?;
|
||||
let paths = arg1.as_string_view();
|
||||
|
||||
let size = jsons.len();
|
||||
let mut builder = StringViewBuilder::with_capacity(size);
|
||||
let result = match arg0.data_type() {
|
||||
DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
|
||||
let arg0 = compute::cast(&arg0, &DataType::BinaryView)?;
|
||||
let jsons = arg0.as_binary_view();
|
||||
jsonb_get_string(jsons, paths)?
|
||||
}
|
||||
DataType::Struct(_) => {
|
||||
let jsons = arg0.as_struct();
|
||||
json_struct_get_string(jsons, paths)?
|
||||
}
|
||||
_ => {
|
||||
return Err(DataFusionError::Execution(format!(
|
||||
"{} not supported argument type {}",
|
||||
Self::NAME,
|
||||
arg0.data_type(),
|
||||
)));
|
||||
}
|
||||
};
|
||||
|
||||
for i in 0..size {
|
||||
let json = jsons.is_valid(i).then(|| jsons.value(i));
|
||||
let path = paths.is_valid(i).then(|| paths.value(i));
|
||||
let result = match (json, path) {
|
||||
(Some(json), Some(path)) => {
|
||||
get_json_by_path(json, path).and_then(|json| jsonb::to_str(&json).ok())
|
||||
}
|
||||
_ => None,
|
||||
Ok(ColumnarValue::Array(result))
|
||||
}
|
||||
}
|
||||
|
||||
fn jsonb_get_string(jsons: &BinaryViewArray, paths: &StringViewArray) -> Result<ArrayRef> {
|
||||
let size = jsons.len();
|
||||
let mut builder = StringViewBuilder::with_capacity(size);
|
||||
|
||||
for i in 0..size {
|
||||
let json = jsons.is_valid(i).then(|| jsons.value(i));
|
||||
let path = paths.is_valid(i).then(|| paths.value(i));
|
||||
let result = match (json, path) {
|
||||
(Some(json), Some(path)) => {
|
||||
get_json_by_path(json, path).and_then(|json| jsonb::to_str(&json).ok())
|
||||
}
|
||||
_ => None,
|
||||
};
|
||||
builder.append_option(result);
|
||||
}
|
||||
|
||||
Ok(Arc::new(builder.finish()))
|
||||
}
|
||||
|
||||
fn json_struct_get_string(jsons: &StructArray, paths: &StringViewArray) -> Result<ArrayRef> {
|
||||
let size = jsons.len();
|
||||
let mut builder = StringViewBuilder::with_capacity(size);
|
||||
|
||||
for i in 0..size {
|
||||
if jsons.is_null(i) || paths.is_null(i) {
|
||||
builder.append_null();
|
||||
continue;
|
||||
}
|
||||
let path = paths.value(i);
|
||||
|
||||
// naively assume the JSON path is our kind of indexing to the field, by removing its "root"
|
||||
let field_path = path.replace("$.", "");
|
||||
let column = jsons.column_by_name(&field_path);
|
||||
|
||||
if let Some(column) = column {
|
||||
if let Some(v) = string_array_value_at_index(column, i) {
|
||||
builder.append_value(v);
|
||||
} else {
|
||||
builder.append_value(arrow_cast::display::array_value_to_string(column, i)?);
|
||||
}
|
||||
} else {
|
||||
let Some(raw) = jsons
|
||||
.column_by_name(JsonStructureSettings::RAW_FIELD)
|
||||
.and_then(|x| string_array_value_at_index(x, i))
|
||||
else {
|
||||
builder.append_null();
|
||||
continue;
|
||||
};
|
||||
builder.append_option(result);
|
||||
|
||||
let path: JsonPath<Value> = JsonPath::try_from(path).map_err(|e| {
|
||||
DataFusionError::Execution(format!("{path} is not a valid JSON path: {e}"))
|
||||
})?;
|
||||
// the wanted field is not retrievable from the JSON struct columns directly, we have
|
||||
// to combine everything (columns and the "_raw") into a complete JSON value to find it
|
||||
let value = json_struct_to_value(raw, jsons, i)?;
|
||||
|
||||
match path.find(&value) {
|
||||
Value::Null => builder.append_null(),
|
||||
Value::Array(values) => match values.as_slice() {
|
||||
[] => builder.append_null(),
|
||||
[x] => {
|
||||
if let Some(s) = x.as_str() {
|
||||
builder.append_value(s)
|
||||
} else {
|
||||
builder.append_value(x.to_string())
|
||||
}
|
||||
}
|
||||
x => builder.append_value(
|
||||
x.iter()
|
||||
.map(|v| v.to_string())
|
||||
.collect::<Vec<_>>()
|
||||
.join(", "),
|
||||
),
|
||||
},
|
||||
// Safety: guarded by the returns of `path.find` as documented
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Arc::new(builder.finish()))
|
||||
}
|
||||
|
||||
fn json_struct_to_value(raw: &str, jsons: &StructArray, i: usize) -> Result<Value> {
|
||||
let Ok(mut json) = Value::from_str(raw) else {
|
||||
return Err(DataFusionError::Internal(format!(
|
||||
"inner field '{}' is not a valid JSON string",
|
||||
JsonStructureSettings::RAW_FIELD
|
||||
)));
|
||||
};
|
||||
|
||||
for (column_name, column) in jsons.column_names().into_iter().zip(jsons.columns()) {
|
||||
if column_name == JsonStructureSettings::RAW_FIELD {
|
||||
continue;
|
||||
}
|
||||
|
||||
Ok(ColumnarValue::Array(Arc::new(builder.finish())))
|
||||
let (json_pointer, field) = if let Some((json_object, field)) = column_name.rsplit_once(".")
|
||||
{
|
||||
let json_pointer = format!("/{}", json_object.replace(".", "/"));
|
||||
(json_pointer, field)
|
||||
} else {
|
||||
("".to_string(), column_name)
|
||||
};
|
||||
let Some(json_object) = json
|
||||
.pointer_mut(&json_pointer)
|
||||
.and_then(|x| x.as_object_mut())
|
||||
else {
|
||||
return Err(DataFusionError::Internal(format!(
|
||||
"value at JSON pointer '{}' is not an object",
|
||||
json_pointer
|
||||
)));
|
||||
};
|
||||
|
||||
macro_rules! insert {
|
||||
($column: ident, $i: ident, $json_object: ident, $field: ident) => {{
|
||||
if let Some(value) = $column
|
||||
.is_valid($i)
|
||||
.then(|| serde_json::Value::from($column.value($i)))
|
||||
{
|
||||
$json_object.insert($field.to_string(), value);
|
||||
}
|
||||
}};
|
||||
}
|
||||
|
||||
match column.data_type() {
|
||||
// boolean => Value::Bool
|
||||
DataType::Boolean => {
|
||||
let column = column.as_boolean();
|
||||
insert!(column, i, json_object, field);
|
||||
}
|
||||
// int => Value::Number
|
||||
DataType::Int64 => {
|
||||
let column = column.as_primitive::<Int64Type>();
|
||||
insert!(column, i, json_object, field);
|
||||
}
|
||||
DataType::UInt64 => {
|
||||
let column = column.as_primitive::<UInt64Type>();
|
||||
insert!(column, i, json_object, field);
|
||||
}
|
||||
DataType::Float64 => {
|
||||
let column = column.as_primitive::<Float64Type>();
|
||||
insert!(column, i, json_object, field);
|
||||
}
|
||||
// string => Value::String
|
||||
DataType::Utf8 => {
|
||||
let column = column.as_string::<i32>();
|
||||
insert!(column, i, json_object, field);
|
||||
}
|
||||
DataType::LargeUtf8 => {
|
||||
let column = column.as_string::<i64>();
|
||||
insert!(column, i, json_object, field);
|
||||
}
|
||||
DataType::Utf8View => {
|
||||
let column = column.as_string_view();
|
||||
insert!(column, i, json_object, field);
|
||||
}
|
||||
// other => Value::Array and Value::Object
|
||||
_ => {
|
||||
return Err(DataFusionError::NotImplemented(format!(
|
||||
"{} is not yet supported to be executed with field {} of datatype {}",
|
||||
JsonGetString::NAME,
|
||||
column_name,
|
||||
column.data_type()
|
||||
)));
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(json)
|
||||
}
|
||||
|
||||
impl Display for JsonGetString {
|
||||
@@ -296,11 +469,13 @@ impl Display for JsonGetObject {
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow::array::{Float64Array, Int64Array, StructArray};
|
||||
use arrow_schema::Field;
|
||||
use datafusion_common::ScalarValue;
|
||||
use datafusion_common::arrow::array::{BinaryArray, BinaryViewArray, StringArray};
|
||||
use datafusion_common::arrow::datatypes::{Float64Type, Int64Type};
|
||||
use datatypes::types::parse_string_to_jsonb;
|
||||
use serde_json::json;
|
||||
|
||||
use super::*;
|
||||
|
||||
@@ -474,42 +649,123 @@ mod tests {
|
||||
r#"{"a": "d", "b": {"c": "e"}, "c": "f"}"#,
|
||||
r#"{"a": "g", "b": "h", "c": {"a": "g"}}"#,
|
||||
];
|
||||
let paths = vec!["$.a.b", "$.a", ""];
|
||||
let results = [Some("a"), Some("d"), None];
|
||||
|
||||
let jsonbs = json_strings
|
||||
// complete JSON is:
|
||||
// {
|
||||
// "kind": "foo",
|
||||
// "payload": {
|
||||
// "code": 404,
|
||||
// "success": false,
|
||||
// "result": {
|
||||
// "error": "not found",
|
||||
// "time_cost": 1.234
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
let json_struct: ArrayRef = Arc::new(StructArray::new(
|
||||
vec![
|
||||
Field::new("kind", DataType::Utf8, true),
|
||||
Field::new("payload.code", DataType::Int64, true),
|
||||
Field::new("payload.result.time_cost", DataType::Float64, true),
|
||||
Field::new(JsonStructureSettings::RAW_FIELD, DataType::Utf8View, true),
|
||||
]
|
||||
.into(),
|
||||
vec![
|
||||
Arc::new(StringArray::from_iter([Some("foo")])) as ArrayRef,
|
||||
Arc::new(Int64Array::from_iter([Some(404)])),
|
||||
Arc::new(Float64Array::from_iter([Some(1.234)])),
|
||||
Arc::new(StringViewArray::from_iter([Some(
|
||||
json! ({
|
||||
"payload": {
|
||||
"success": false,
|
||||
"result": {
|
||||
"error": "not found"
|
||||
}
|
||||
}
|
||||
})
|
||||
.to_string(),
|
||||
)])),
|
||||
],
|
||||
None,
|
||||
));
|
||||
|
||||
let paths = vec![
|
||||
"$.a.b",
|
||||
"$.a",
|
||||
"",
|
||||
"$.kind",
|
||||
"$.payload.code",
|
||||
"$.payload.result.time_cost",
|
||||
"$.payload",
|
||||
"$.payload.success",
|
||||
"$.payload.result",
|
||||
"$.payload.result.error",
|
||||
"$.payload.result.not-exists",
|
||||
"$.payload.not-exists",
|
||||
"$.not-exists",
|
||||
"$",
|
||||
];
|
||||
let expects = [
|
||||
Some("a"),
|
||||
Some("d"),
|
||||
None,
|
||||
Some("foo"),
|
||||
Some("404"),
|
||||
Some("1.234"),
|
||||
Some(
|
||||
r#"{"code":404,"result":{"error":"not found","time_cost":1.234},"success":false}"#,
|
||||
),
|
||||
Some("false"),
|
||||
Some(r#"{"error":"not found","time_cost":1.234}"#),
|
||||
Some("not found"),
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
Some(
|
||||
r#"{"kind":"foo","payload":{"code":404,"result":{"error":"not found","time_cost":1.234},"success":false}}"#,
|
||||
),
|
||||
];
|
||||
|
||||
let mut jsons = json_strings
|
||||
.iter()
|
||||
.map(|s| {
|
||||
let value = jsonb::parse_value(s.as_bytes()).unwrap();
|
||||
value.to_vec()
|
||||
Arc::new(BinaryArray::from_iter_values([value.to_vec()])) as ArrayRef
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
let json_struct_arrays =
|
||||
std::iter::repeat_n(json_struct, expects.len() - jsons.len()).collect::<Vec<_>>();
|
||||
jsons.extend(json_struct_arrays);
|
||||
|
||||
let args = ScalarFunctionArgs {
|
||||
args: vec![
|
||||
ColumnarValue::Array(Arc::new(BinaryArray::from_iter_values(jsonbs))),
|
||||
ColumnarValue::Array(Arc::new(StringArray::from_iter_values(paths))),
|
||||
],
|
||||
arg_fields: vec![],
|
||||
number_rows: 3,
|
||||
return_field: Arc::new(Field::new("x", DataType::Utf8View, false)),
|
||||
config_options: Arc::new(Default::default()),
|
||||
};
|
||||
let result = json_get_string
|
||||
.invoke_with_args(args)
|
||||
.and_then(|x| x.to_array(3))
|
||||
.unwrap();
|
||||
let vector = result.as_string_view();
|
||||
for i in 0..jsons.len() {
|
||||
let json = &jsons[i];
|
||||
let path = paths[i];
|
||||
let expect = expects[i];
|
||||
|
||||
assert_eq!(3, vector.len());
|
||||
for (i, gt) in results.iter().enumerate() {
|
||||
let result = vector.is_valid(i).then(|| vector.value(i));
|
||||
assert_eq!(*gt, result);
|
||||
let args = ScalarFunctionArgs {
|
||||
args: vec![
|
||||
ColumnarValue::Array(json.clone()),
|
||||
ColumnarValue::Scalar(path.into()),
|
||||
],
|
||||
arg_fields: vec![],
|
||||
number_rows: 1,
|
||||
return_field: Arc::new(Field::new("x", DataType::Utf8View, false)),
|
||||
config_options: Arc::new(Default::default()),
|
||||
};
|
||||
let result = json_get_string
|
||||
.invoke_with_args(args)
|
||||
.and_then(|x| x.to_array(1))
|
||||
.unwrap();
|
||||
|
||||
let result = result.as_string_view();
|
||||
assert_eq!(1, result.len());
|
||||
let actual = result.is_valid(0).then(|| result.value(0));
|
||||
assert_eq!(actual, expect);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_json_get_object() -> datafusion_common::Result<()> {
|
||||
fn test_json_get_object() -> Result<()> {
|
||||
let udf = JsonGetObject::default();
|
||||
assert_eq!("json_get_object", udf.name());
|
||||
assert_eq!(
|
||||
|
||||
@@ -19,6 +19,7 @@ use arrow::datatypes::{
|
||||
Time64NanosecondType, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType,
|
||||
TimestampNanosecondType, TimestampSecondType,
|
||||
};
|
||||
use arrow_array::Array;
|
||||
use common_time::time::Time;
|
||||
use common_time::{Duration, Timestamp};
|
||||
|
||||
@@ -126,3 +127,28 @@ pub fn duration_array_value(array: &ArrayRef, i: usize) -> Duration {
|
||||
};
|
||||
Duration::new(v, time_unit.into())
|
||||
}
|
||||
|
||||
/// Get the string value at index `i` for `Utf8`, `LargeUtf8`, or `Utf8View` arrays.
|
||||
///
|
||||
/// Returns `None` when the array type is not a string type or the value is null.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// If index `i` is out of bounds.
|
||||
pub fn string_array_value_at_index(array: &ArrayRef, i: usize) -> Option<&str> {
|
||||
match array.data_type() {
|
||||
DataType::Utf8 => {
|
||||
let array = array.as_string::<i32>();
|
||||
array.is_valid(i).then(|| array.value(i))
|
||||
}
|
||||
DataType::LargeUtf8 => {
|
||||
let array = array.as_string::<i64>();
|
||||
array.is_valid(i).then(|| array.value(i))
|
||||
}
|
||||
DataType::Utf8View => {
|
||||
let array = array.as_string_view();
|
||||
array.is_valid(i).then(|| array.value(i))
|
||||
}
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -56,6 +56,32 @@ async fn query_data(frontend: &Arc<Instance>) -> io::Result<()> {
|
||||
))?;
|
||||
execute_sql_and_expect(frontend, sql, &expected).await;
|
||||
|
||||
// query 1:
|
||||
let sql = "SELECT json_get_string(data, '$.commit.collection') AS event, count() AS count FROM bluesky GROUP BY event ORDER BY count DESC";
|
||||
let expected = r#"
|
||||
+-----------------------+-------+
|
||||
| event | count |
|
||||
+-----------------------+-------+
|
||||
| app.bsky.feed.post | 3 |
|
||||
| app.bsky.feed.like | 3 |
|
||||
| app.bsky.graph.follow | 3 |
|
||||
| app.bsky.feed.repost | 1 |
|
||||
+-----------------------+-------+"#;
|
||||
execute_sql_and_expect(frontend, sql, expected).await;
|
||||
|
||||
// query 2:
|
||||
let sql = "SELECT json_get_string(data, '$.commit.collection') AS event, count() AS count, count(DISTINCT json_get_string(data, '$.did')) AS users FROM bluesky WHERE (json_get_string(data, '$.kind') = 'commit') AND (json_get_string(data, '$.commit.operation') = 'create') GROUP BY event ORDER BY count DESC";
|
||||
let expected = r#"
|
||||
+-----------------------+-------+-------+
|
||||
| event | count | users |
|
||||
+-----------------------+-------+-------+
|
||||
| app.bsky.feed.post | 3 | 3 |
|
||||
| app.bsky.feed.like | 3 | 3 |
|
||||
| app.bsky.graph.follow | 3 | 3 |
|
||||
| app.bsky.feed.repost | 1 | 1 |
|
||||
+-----------------------+-------+-------+"#;
|
||||
execute_sql_and_expect(frontend, sql, expected).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user