refactor: rewrite some UDFs to DataFusion style (final part) (#7023)

Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
LFC
2025-09-26 17:24:29 +08:00
committed by GitHub
parent aca8b690d1
commit 243dbde3d5
28 changed files with 940 additions and 982 deletions

View File

@@ -96,6 +96,41 @@ pub fn get_string_from_params<'a>(
Ok(s)
}
/// Dispatches on a timestamp `DataType` and expands `$body` once per time unit, with a
/// caller-chosen identifier bound to the matching Arrow timestamp type.
///
/// Invocation shape (the closure-like `|$S|` names the type placeholder used inside the body):
///
/// ```ignore
/// with_match_timestamp_types!(data_type, |$S| {
///     let array = input.as_primitive::<$S>();
///     // ...
/// })?;
/// ```
///
/// The whole macro evaluates to a `Result`:
/// - `Ok(<value of $body>)` when `$data_type` matches `DataType::Timestamp(unit, _)` for
///   `Second`, `Millisecond`, `Microsecond` or `Nanosecond` (the timezone slot is ignored);
/// - `Err(DataFusionError::Execution("not expected data type: '...'"))` for anything else.
///
/// Notes for callers:
/// - `$data_type` is expanded twice (once as the `match` scrutinee, once in the error
///   message), so pass a plain variable rather than an expression with side effects.
/// - `DataType` is not imported by the macro itself; it is presumably expected to already be
///   in scope at the call site.
macro_rules! with_match_timestamp_types {
($data_type:expr, | $_t:tt $T:ident | $body:tt) => {{
// Helper macro generated per invocation. `$_t` captures the caller's literal `$` token and
// `$T` the placeholder name, so the pattern below becomes `( $<name>:ident )` and `$body`
// may refer to `$<name>` as the concrete timestamp type.
macro_rules! __with_ty__ {
( $_t $T:ident ) => {
$body
};
}
// Names needed by the match arms below, imported locally in the expansion block.
use datafusion_common::DataFusionError;
use datafusion_common::arrow::datatypes::{
TimeUnit, TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
TimestampSecondType,
};
// One arm per time unit; each instantiates the body with the corresponding Arrow type and
// wraps its value in `Ok`.
match $data_type {
DataType::Timestamp(TimeUnit::Second, _) => Ok(__with_ty__! { TimestampSecondType }),
DataType::Timestamp(TimeUnit::Millisecond, _) => {
Ok(__with_ty__! { TimestampMillisecondType })
}
DataType::Timestamp(TimeUnit::Microsecond, _) => {
Ok(__with_ty__! { TimestampMicrosecondType })
}
DataType::Timestamp(TimeUnit::Nanosecond, _) => {
Ok(__with_ty__! { TimestampNanosecondType })
}
// Any non-timestamp type is reported as an execution error instead of panicking.
_ => Err(DataFusionError::Execution(format!(
"not expected data type: '{}'",
$data_type
))),
}
}};
}
pub(crate) use with_match_timestamp_types;
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -13,17 +13,20 @@
// limitations under the License.
use std::fmt;
use std::sync::Arc;
use common_error::ext::BoxedError;
use common_query::error::{self, InvalidFuncArgsSnafu, Result, UnsupportedInputDataTypeSnafu};
use datafusion_expr::Signature;
use datatypes::arrow::datatypes::{DataType, TimeUnit};
use datatypes::prelude::{ConcreteDataType, MutableVector, ScalarVectorBuilder};
use datatypes::vectors::{StringVectorBuilder, VectorRef};
use snafu::{ResultExt, ensure};
use common_query::error::{self, Result};
use common_time::{Date, Timestamp};
use datafusion_common::DataFusionError;
use datafusion_common::arrow::array::{Array, AsArray, StringViewBuilder};
use datafusion_common::arrow::datatypes::{ArrowTimestampType, DataType, Date32Type, TimeUnit};
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature};
use snafu::ResultExt;
use crate::function::{Function, FunctionContext};
use crate::function::{Function, extract_args, find_function_context};
use crate::helper;
use crate::helper::with_match_timestamp_types;
/// A function that formats timestamp/date/datetime into string by the format
#[derive(Clone, Debug, Default)]
@@ -37,7 +40,7 @@ impl Function for DateFormatFunction {
}
fn return_type(&self, _: &[DataType]) -> Result<DataType> {
Ok(DataType::Utf8)
Ok(DataType::Utf8View)
}
fn signature(&self) -> Signature {
@@ -53,68 +56,65 @@ impl Function for DateFormatFunction {
)
}
fn eval(&self, func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 2,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect 2, have: {}",
columns.len()
),
}
);
fn invoke_with_args(
&self,
args: ScalarFunctionArgs,
) -> datafusion_common::Result<ColumnarValue> {
let ctx = find_function_context(&args)?;
let timezone = &ctx.query_ctx.timezone();
let left = &columns[0];
let formats = &columns[1];
let [left, arg1] = extract_args(self.name(), &args)?;
let formats = arg1.as_string::<i32>();
let size = left.len();
let left_datatype = columns[0].data_type();
let mut results = StringVectorBuilder::with_capacity(size);
let left_datatype = left.data_type();
let mut builder = StringViewBuilder::with_capacity(size);
match left_datatype {
ConcreteDataType::Timestamp(_) => {
for i in 0..size {
let ts = left.get(i).as_timestamp();
let format = formats.get(i).as_string();
let result = match (ts, format) {
(Some(ts), Some(fmt)) => Some(
ts.as_formatted_string(&fmt, Some(&func_ctx.query_ctx.timezone()))
.map_err(BoxedError::new)
.context(error::ExecuteSnafu)?,
),
_ => None,
};
results.push(result.as_deref());
}
DataType::Timestamp(_, _) => {
with_match_timestamp_types!(left_datatype, |$S| {
let array = left.as_primitive::<$S>();
for (date, format) in array.iter().zip(formats.iter()) {
let result = match (date, format) {
(Some(date), Some(format)) => {
let ts = Timestamp::new(date, $S::UNIT.into());
let x = ts.as_formatted_string(&format, Some(timezone))
.map_err(|e| DataFusionError::Execution(format!(
"cannot format {ts:?} as '{format}': {e}"
)))?;
Some(x)
}
_ => None
};
builder.append_option(result.as_deref());
}
})?;
}
ConcreteDataType::Date(_) => {
DataType::Date32 => {
let left = left.as_primitive::<Date32Type>();
for i in 0..size {
let date = left.get(i).as_date();
let format = formats.get(i).as_string();
let date = left.is_valid(i).then(|| Date::from(left.value(i)));
let format = formats.is_valid(i).then(|| formats.value(i));
let result = match (date, format) {
(Some(date), Some(fmt)) => date
.as_formatted_string(&fmt, Some(&func_ctx.query_ctx.timezone()))
.as_formatted_string(fmt, Some(timezone))
.map_err(BoxedError::new)
.context(error::ExecuteSnafu)?,
_ => None,
};
results.push(result.as_deref());
builder.append_option(result.as_deref());
}
}
_ => {
return UnsupportedInputDataTypeSnafu {
function: NAME,
datatypes: columns.iter().map(|c| c.data_type()).collect::<Vec<_>>(),
}
.fail();
x => {
return Err(DataFusionError::Execution(format!(
"unsupported input data type {x}"
)));
}
}
Ok(results.to_vector())
Ok(ColumnarValue::Array(Arc::new(builder.finish())))
}
}
@@ -128,28 +128,32 @@ impl fmt::Display for DateFormatFunction {
mod tests {
use std::sync::Arc;
use arrow_schema::Field;
use datafusion_common::arrow::array::{Date32Array, StringArray, TimestampSecondArray};
use datafusion_common::config::ConfigOptions;
use datafusion_expr::{TypeSignature, Volatility};
use datatypes::prelude::ScalarVector;
use datatypes::value::Value;
use datatypes::vectors::{DateVector, StringVector, TimestampSecondVector};
use super::{DateFormatFunction, *};
use crate::function::FunctionContext;
#[test]
fn test_date_format_misc() {
let f = DateFormatFunction;
assert_eq!("date_format", f.name());
assert_eq!(
DataType::Utf8,
DataType::Utf8View,
f.return_type(&[DataType::Timestamp(TimeUnit::Microsecond, None)])
.unwrap()
);
assert_eq!(
DataType::Utf8,
DataType::Utf8View,
f.return_type(&[DataType::Timestamp(TimeUnit::Second, None)])
.unwrap()
);
assert_eq!(DataType::Utf8, f.return_type(&[DataType::Date32]).unwrap());
assert_eq!(
DataType::Utf8View,
f.return_type(&[DataType::Date32]).unwrap()
);
assert!(matches!(f.signature(),
Signature {
type_signature: TypeSignature::OneOf(sigs),
@@ -175,26 +179,29 @@ mod tests {
None,
];
let time_vector = TimestampSecondVector::from(times.clone());
let interval_vector = StringVector::from_vec(formats);
let args: Vec<VectorRef> = vec![Arc::new(time_vector), Arc::new(interval_vector)];
let vector = f.eval(&FunctionContext::default(), &args).unwrap();
let mut config_options = ConfigOptions::default();
config_options.extensions.insert(FunctionContext::default());
let config_options = Arc::new(config_options);
let args = ScalarFunctionArgs {
args: vec![
ColumnarValue::Array(Arc::new(TimestampSecondArray::from(times))),
ColumnarValue::Array(Arc::new(StringArray::from_iter_values(formats))),
],
arg_fields: vec![],
number_rows: 4,
return_field: Arc::new(Field::new("x", DataType::Utf8View, false)),
config_options,
};
let result = f
.invoke_with_args(args)
.and_then(|x| x.to_array(4))
.unwrap();
let vector = result.as_string_view();
assert_eq!(4, vector.len());
for (i, _t) in times.iter().enumerate() {
let v = vector.get(i);
let result = results.get(i).unwrap();
if result.is_none() {
assert_eq!(Value::Null, v);
continue;
}
match v {
Value::String(s) => {
assert_eq!(s.as_utf8(), result.unwrap());
}
_ => unreachable!(),
}
for (actual, expect) in vector.iter().zip(results) {
assert_eq!(actual, expect);
}
}
@@ -216,26 +223,29 @@ mod tests {
None,
];
let date_vector = DateVector::from(dates.clone());
let interval_vector = StringVector::from_vec(formats);
let args: Vec<VectorRef> = vec![Arc::new(date_vector), Arc::new(interval_vector)];
let vector = f.eval(&FunctionContext::default(), &args).unwrap();
let mut config_options = ConfigOptions::default();
config_options.extensions.insert(FunctionContext::default());
let config_options = Arc::new(config_options);
let args = ScalarFunctionArgs {
args: vec![
ColumnarValue::Array(Arc::new(Date32Array::from(dates))),
ColumnarValue::Array(Arc::new(StringArray::from_iter_values(formats))),
],
arg_fields: vec![],
number_rows: 4,
return_field: Arc::new(Field::new("x", DataType::Utf8View, false)),
config_options,
};
let result = f
.invoke_with_args(args)
.and_then(|x| x.to_array(4))
.unwrap();
let vector = result.as_string_view();
assert_eq!(4, vector.len());
for (i, _t) in dates.iter().enumerate() {
let v = vector.get(i);
let result = results.get(i).unwrap();
if result.is_none() {
assert_eq!(Value::Null, v);
continue;
}
match v {
Value::String(s) => {
assert_eq!(s.as_utf8(), result.unwrap());
}
_ => unreachable!(),
}
for (actual, expect) in vector.iter().zip(results) {
assert_eq!(actual, expect);
}
}
}

View File

@@ -13,20 +13,18 @@
// limitations under the License.
use std::fmt::{self, Display};
use std::sync::Arc;
use common_query::error::{InvalidFuncArgsSnafu, Result, UnsupportedInputDataTypeSnafu};
use datafusion_expr::{Signature, Volatility};
use datatypes::arrow::datatypes::DataType;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::VectorRef;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::{
BooleanVectorBuilder, Float64VectorBuilder, Int64VectorBuilder, MutableVector,
StringVectorBuilder,
use arrow::compute;
use common_query::error::Result;
use datafusion_common::arrow::array::{
Array, AsArray, BooleanBuilder, Float64Builder, Int64Builder, StringViewBuilder,
};
use snafu::ensure;
use datafusion_common::arrow::datatypes::DataType;
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature};
use crate::function::{Function, FunctionContext};
use crate::function::{Function, extract_args};
use crate::helper;
fn get_json_by_path(json: &[u8], path: &str) -> Option<Vec<u8>> {
let json_path = jsonb::jsonpath::parse_json_path(path.as_bytes());
@@ -64,59 +62,40 @@ macro_rules! json_get {
fn signature(&self) -> Signature {
// TODO(LFC): Use a more clear type here instead of "Binary" for Json input, once we have a "Json" type.
Signature::exact(
vec![DataType::Binary, DataType::Utf8],
Volatility::Immutable,
helper::one_of_sigs2(
vec![DataType::Binary, DataType::BinaryView],
vec![DataType::Utf8, DataType::Utf8View],
)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 2,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly two, have: {}",
columns.len()
),
}
);
let jsons = &columns[0];
let paths = &columns[1];
fn invoke_with_args(
&self,
args: ScalarFunctionArgs,
) -> datafusion_common::Result<ColumnarValue> {
let [arg0, arg1] = extract_args(self.name(), &args)?;
let arg0 = compute::cast(&arg0, &DataType::BinaryView)?;
let jsons = arg0.as_binary_view();
let arg1 = compute::cast(&arg1, &DataType::Utf8View)?;
let paths = arg1.as_string_view();
let size = jsons.len();
let datatype = jsons.data_type();
let mut results = [<$type VectorBuilder>]::with_capacity(size);
let mut builder = [<$type Builder>]::with_capacity(size);
match datatype {
// JSON data type uses binary vector
ConcreteDataType::Binary(_) => {
for i in 0..size {
let json = jsons.get_ref(i);
let path = paths.get_ref(i);
let json = json.as_binary();
let path = path.as_string();
let result = match (json, path) {
(Ok(Some(json)), Ok(Some(path))) => {
get_json_by_path(json, path)
.and_then(|json| { jsonb::[<to_ $rust_type>](&json).ok() })
}
_ => None,
};
results.push(result);
for i in 0..size {
let json = jsons.is_valid(i).then(|| jsons.value(i));
let path = paths.is_valid(i).then(|| paths.value(i));
let result = match (json, path) {
(Some(json), Some(path)) => {
get_json_by_path(json, path)
.and_then(|json| { jsonb::[<to_ $rust_type>](&json).ok() })
}
}
_ => {
return UnsupportedInputDataTypeSnafu {
function: stringify!([<$name:snake>]),
datatypes: columns.iter().map(|c| c.data_type()).collect::<Vec<_>>(),
}
.fail();
}
_ => None,
};
builder.append_option(result);
}
Ok(results.to_vector())
Ok(ColumnarValue::Array(Arc::new(builder.finish())))
}
}
@@ -160,63 +139,43 @@ impl Function for JsonGetString {
}
fn return_type(&self, _: &[DataType]) -> Result<DataType> {
Ok(DataType::Utf8)
Ok(DataType::Utf8View)
}
fn signature(&self) -> Signature {
// TODO(LFC): Use a more clear type here instead of "Binary" for Json input, once we have a "Json" type.
Signature::exact(
vec![DataType::Binary, DataType::Utf8],
Volatility::Immutable,
helper::one_of_sigs2(
vec![DataType::Binary, DataType::BinaryView],
vec![DataType::Utf8, DataType::Utf8View],
)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 2,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly two, have: {}",
columns.len()
),
}
);
let jsons = &columns[0];
let paths = &columns[1];
fn invoke_with_args(
&self,
args: ScalarFunctionArgs,
) -> datafusion_common::Result<ColumnarValue> {
let [arg0, arg1] = extract_args(self.name(), &args)?;
let arg0 = compute::cast(&arg0, &DataType::BinaryView)?;
let jsons = arg0.as_binary_view();
let arg1 = compute::cast(&arg1, &DataType::Utf8View)?;
let paths = arg1.as_string_view();
let size = jsons.len();
let datatype = jsons.data_type();
let mut results = StringVectorBuilder::with_capacity(size);
let mut builder = StringViewBuilder::with_capacity(size);
match datatype {
// JSON data type uses binary vector
ConcreteDataType::Binary(_) => {
for i in 0..size {
let json = jsons.get_ref(i);
let path = paths.get_ref(i);
let json = json.as_binary();
let path = path.as_string();
let result = match (json, path) {
(Ok(Some(json)), Ok(Some(path))) => {
get_json_by_path(json, path).and_then(|json| jsonb::to_str(&json).ok())
}
_ => None,
};
results.push(result.as_deref());
for i in 0..size {
let json = jsons.is_valid(i).then(|| jsons.value(i));
let path = paths.is_valid(i).then(|| paths.value(i));
let result = match (json, path) {
(Some(json), Some(path)) => {
get_json_by_path(json, path).and_then(|json| jsonb::to_str(&json).ok())
}
}
_ => {
return UnsupportedInputDataTypeSnafu {
function: "json_get_string",
datatypes: columns.iter().map(|c| c.data_type()).collect::<Vec<_>>(),
}
.fail();
}
_ => None,
};
builder.append_option(result);
}
Ok(results.to_vector())
Ok(ColumnarValue::Array(Arc::new(builder.finish())))
}
}
@@ -230,9 +189,9 @@ impl Display for JsonGetString {
mod tests {
use std::sync::Arc;
use datafusion_expr::TypeSignature;
use datatypes::scalars::ScalarVector;
use datatypes::vectors::{BinaryVector, StringVector};
use arrow_schema::Field;
use datafusion_common::arrow::array::{BinaryArray, StringArray};
use datafusion_common::arrow::datatypes::{Float64Type, Int64Type};
use super::*;
@@ -248,13 +207,6 @@ mod tests {
.unwrap()
);
assert!(matches!(json_get_int.signature(),
Signature {
type_signature: TypeSignature::Exact(valid_types),
volatility: Volatility::Immutable
} if valid_types == vec![DataType::Binary, DataType::Utf8]
));
let json_strings = [
r#"{"a": {"b": 2}, "b": 2, "c": 3}"#,
r#"{"a": 4, "b": {"c": 6}, "c": 6}"#,
@@ -271,17 +223,25 @@ mod tests {
})
.collect::<Vec<_>>();
let json_vector = BinaryVector::from_vec(jsonbs);
let path_vector = StringVector::from_vec(paths);
let args: Vec<VectorRef> = vec![Arc::new(json_vector), Arc::new(path_vector)];
let vector = json_get_int
.eval(&FunctionContext::default(), &args)
let args = ScalarFunctionArgs {
args: vec![
ColumnarValue::Array(Arc::new(BinaryArray::from_iter_values(jsonbs))),
ColumnarValue::Array(Arc::new(StringArray::from_iter_values(paths))),
],
arg_fields: vec![],
number_rows: 3,
return_field: Arc::new(Field::new("x", DataType::Int64, false)),
config_options: Arc::new(Default::default()),
};
let result = json_get_int
.invoke_with_args(args)
.and_then(|x| x.to_array(3))
.unwrap();
let vector = result.as_primitive::<Int64Type>();
assert_eq!(3, vector.len());
for (i, gt) in results.iter().enumerate() {
let result = vector.get_ref(i);
let result = result.as_i64().unwrap();
let result = vector.is_valid(i).then(|| vector.value(i));
assert_eq!(*gt, result);
}
}
@@ -298,13 +258,6 @@ mod tests {
.unwrap()
);
assert!(matches!(json_get_float.signature(),
Signature {
type_signature: TypeSignature::Exact(valid_types),
volatility: Volatility::Immutable
} if valid_types == vec![DataType::Binary, DataType::Utf8]
));
let json_strings = [
r#"{"a": {"b": 2.1}, "b": 2.2, "c": 3.3}"#,
r#"{"a": 4.4, "b": {"c": 6.6}, "c": 6.6}"#,
@@ -321,17 +274,25 @@ mod tests {
})
.collect::<Vec<_>>();
let json_vector = BinaryVector::from_vec(jsonbs);
let path_vector = StringVector::from_vec(paths);
let args: Vec<VectorRef> = vec![Arc::new(json_vector), Arc::new(path_vector)];
let vector = json_get_float
.eval(&FunctionContext::default(), &args)
let args = ScalarFunctionArgs {
args: vec![
ColumnarValue::Array(Arc::new(BinaryArray::from_iter_values(jsonbs))),
ColumnarValue::Array(Arc::new(StringArray::from_iter_values(paths))),
],
arg_fields: vec![],
number_rows: 3,
return_field: Arc::new(Field::new("x", DataType::Float64, false)),
config_options: Arc::new(Default::default()),
};
let result = json_get_float
.invoke_with_args(args)
.and_then(|x| x.to_array(3))
.unwrap();
let vector = result.as_primitive::<Float64Type>();
assert_eq!(3, vector.len());
for (i, gt) in results.iter().enumerate() {
let result = vector.get_ref(i);
let result = result.as_f64().unwrap();
let result = vector.is_valid(i).then(|| vector.value(i));
assert_eq!(*gt, result);
}
}
@@ -348,13 +309,6 @@ mod tests {
.unwrap()
);
assert!(matches!(json_get_bool.signature(),
Signature {
type_signature: TypeSignature::Exact(valid_types),
volatility: Volatility::Immutable
} if valid_types == vec![DataType::Binary, DataType::Utf8]
));
let json_strings = [
r#"{"a": {"b": true}, "b": false, "c": true}"#,
r#"{"a": false, "b": {"c": true}, "c": false}"#,
@@ -371,17 +325,25 @@ mod tests {
})
.collect::<Vec<_>>();
let json_vector = BinaryVector::from_vec(jsonbs);
let path_vector = StringVector::from_vec(paths);
let args: Vec<VectorRef> = vec![Arc::new(json_vector), Arc::new(path_vector)];
let vector = json_get_bool
.eval(&FunctionContext::default(), &args)
let args = ScalarFunctionArgs {
args: vec![
ColumnarValue::Array(Arc::new(BinaryArray::from_iter_values(jsonbs))),
ColumnarValue::Array(Arc::new(StringArray::from_iter_values(paths))),
],
arg_fields: vec![],
number_rows: 3,
return_field: Arc::new(Field::new("x", DataType::Boolean, false)),
config_options: Arc::new(Default::default()),
};
let result = json_get_bool
.invoke_with_args(args)
.and_then(|x| x.to_array(3))
.unwrap();
let vector = result.as_boolean();
assert_eq!(3, vector.len());
for (i, gt) in results.iter().enumerate() {
let result = vector.get_ref(i);
let result = result.as_boolean().unwrap();
let result = vector.is_valid(i).then(|| vector.value(i));
assert_eq!(*gt, result);
}
}
@@ -392,19 +354,12 @@ mod tests {
assert_eq!("json_get_string", json_get_string.name());
assert_eq!(
DataType::Utf8,
DataType::Utf8View,
json_get_string
.return_type(&[DataType::Binary, DataType::Utf8])
.unwrap()
);
assert!(matches!(json_get_string.signature(),
Signature {
type_signature: TypeSignature::Exact(valid_types),
volatility: Volatility::Immutable
} if valid_types == vec![DataType::Binary, DataType::Utf8]
));
let json_strings = [
r#"{"a": {"b": "a"}, "b": "b", "c": "c"}"#,
r#"{"a": "d", "b": {"c": "e"}, "c": "f"}"#,
@@ -421,17 +376,25 @@ mod tests {
})
.collect::<Vec<_>>();
let json_vector = BinaryVector::from_vec(jsonbs);
let path_vector = StringVector::from_vec(paths);
let args: Vec<VectorRef> = vec![Arc::new(json_vector), Arc::new(path_vector)];
let vector = json_get_string
.eval(&FunctionContext::default(), &args)
let args = ScalarFunctionArgs {
args: vec![
ColumnarValue::Array(Arc::new(BinaryArray::from_iter_values(jsonbs))),
ColumnarValue::Array(Arc::new(StringArray::from_iter_values(paths))),
],
arg_fields: vec![],
number_rows: 3,
return_field: Arc::new(Field::new("x", DataType::Utf8View, false)),
config_options: Arc::new(Default::default()),
};
let result = json_get_string
.invoke_with_args(args)
.and_then(|x| x.to_array(3))
.unwrap();
let vector = result.as_string_view();
assert_eq!(3, vector.len());
for (i, gt) in results.iter().enumerate() {
let result = vector.get_ref(i);
let result = result.as_string().unwrap();
let result = vector.is_valid(i).then(|| vector.value(i));
assert_eq!(*gt, result);
}
}

View File

@@ -13,17 +13,15 @@
// limitations under the License.
use std::fmt::{self, Display};
use std::sync::Arc;
use common_query::error::{InvalidFuncArgsSnafu, Result, UnsupportedInputDataTypeSnafu};
use datafusion_expr::{Signature, Volatility};
use datatypes::arrow::datatypes::DataType;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::VectorRef;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::{BooleanVectorBuilder, MutableVector};
use snafu::ensure;
use common_query::error::Result;
use datafusion_common::arrow::array::{Array, AsArray, BooleanBuilder};
use datafusion_common::arrow::compute;
use datafusion_common::arrow::datatypes::DataType;
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility};
use crate::function::{Function, FunctionContext};
use crate::function::{Function, extract_args};
/// Checks if the input is a JSON object of the given type.
macro_rules! json_is {
@@ -43,50 +41,36 @@ macro_rules! json_is {
fn signature(&self) -> Signature {
// TODO(LFC): Use a more clear type here instead of "Binary" for Json input, once we have a "Json" type.
Signature::exact(vec![DataType::Binary], Volatility::Immutable)
Signature::uniform(
1,
vec![DataType::Binary, DataType::BinaryView],
Volatility::Immutable,
)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 1,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly one, have: {}",
columns.len()
),
}
);
fn invoke_with_args(
&self,
args: ScalarFunctionArgs,
) -> datafusion_common::Result<ColumnarValue> {
let [arg0] = extract_args(self.name(), &args)?;
let jsons = &columns[0];
let arg0 = compute::cast(&arg0, &DataType::BinaryView)?;
let jsons = arg0.as_binary_view();
let size = jsons.len();
let datatype = jsons.data_type();
let mut results = BooleanVectorBuilder::with_capacity(size);
let mut builder = BooleanBuilder::with_capacity(size);
match datatype {
// JSON data type uses binary vector
ConcreteDataType::Binary(_) => {
for i in 0..size {
let json = jsons.get_ref(i);
let json = json.as_binary();
let result = match json {
Ok(Some(json)) => {
Some(jsonb::[<is_ $json_type>](json))
}
_ => None,
};
results.push(result);
for i in 0..size {
let json = jsons.is_valid(i).then(|| jsons.value(i));
let result = match json {
Some(json) => {
Some(jsonb::[<is_ $json_type>](json))
}
}
_ => {
return UnsupportedInputDataTypeSnafu {
function: stringify!([<$name:snake>]),
datatypes: columns.iter().map(|c| c.data_type()).collect::<Vec<_>>(),
}
.fail();
}
_ => None,
};
builder.append_option(result);
}
Ok(results.to_vector())
Ok(ColumnarValue::Array(Arc::new(builder.finish())))
}
}
@@ -96,7 +80,7 @@ macro_rules! json_is {
}
}
}
}
};
}
json_is!(JsonIsNull, null, "Checks if the input JSONB is null");
@@ -135,8 +119,8 @@ json_is!(
mod tests {
use std::sync::Arc;
use datatypes::scalars::ScalarVector;
use datatypes::vectors::BinaryVector;
use arrow_schema::Field;
use datafusion_common::arrow::array::{AsArray, BinaryArray};
use super::*;
@@ -166,7 +150,11 @@ mod tests {
);
assert_eq!(
func.signature(),
Signature::exact(vec![DataType::Binary], Volatility::Immutable)
Signature::uniform(
1,
vec![DataType::Binary, DataType::BinaryView],
Volatility::Immutable
)
);
}
@@ -195,16 +183,26 @@ mod tests {
value.to_vec()
})
.collect::<Vec<_>>();
let json_vector = BinaryVector::from_vec(jsonbs);
let args: Vec<VectorRef> = vec![Arc::new(json_vector)];
let args = ScalarFunctionArgs {
args: vec![ColumnarValue::Array(Arc::new(
BinaryArray::from_iter_values(jsonbs),
))],
arg_fields: vec![],
number_rows: 6,
return_field: Arc::new(Field::new("", DataType::Boolean, false)),
config_options: Arc::new(Default::default()),
};
for (func, expected_result) in json_is_functions.iter().zip(expected_results.iter()) {
let vector = func.eval(&FunctionContext::default(), &args).unwrap();
let result = func
.invoke_with_args(args.clone())
.and_then(|x| x.to_array(6))
.unwrap();
let vector = result.as_boolean();
assert_eq!(vector.len(), json_strings.len());
for (i, expected) in expected_result.iter().enumerate() {
let result = vector.get_ref(i);
let result = result.as_boolean().unwrap().unwrap();
let result = vector.value(i);
assert_eq!(result, *expected);
}
}

View File

@@ -13,17 +13,17 @@
// limitations under the License.
use std::fmt::{self, Display};
use std::sync::Arc;
use common_query::error::{InvalidFuncArgsSnafu, Result, UnsupportedInputDataTypeSnafu};
use datafusion_expr::{Signature, TypeSignature, Volatility};
use datatypes::arrow::datatypes::DataType;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::VectorRef;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::{BooleanVectorBuilder, MutableVector};
use snafu::ensure;
use arrow::compute;
use common_query::error::Result;
use datafusion_common::DataFusionError;
use datafusion_common::arrow::array::{Array, AsArray, BooleanBuilder};
use datafusion_common::arrow::datatypes::DataType;
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature};
use crate::function::{Function, FunctionContext};
use crate::function::{Function, extract_args};
use crate::helper;
/// Check if the given JSON data contains the given JSON path.
#[derive(Clone, Debug, Default)]
@@ -42,48 +42,41 @@ impl Function for JsonPathExistsFunction {
fn signature(&self) -> Signature {
// TODO(LFC): Use a more clear type here instead of "Binary" for Json input, once we have a "Json" type.
Signature::one_of(
vec![
TypeSignature::Exact(vec![DataType::Binary, DataType::Utf8]),
TypeSignature::Exact(vec![DataType::Null, DataType::Utf8]),
TypeSignature::Exact(vec![DataType::Binary, DataType::Null]),
TypeSignature::Exact(vec![DataType::Null, DataType::Null]),
],
Volatility::Immutable,
helper::one_of_sigs2(
vec![DataType::Binary, DataType::BinaryView, DataType::Null],
vec![DataType::Utf8, DataType::Utf8View, DataType::Null],
)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 2,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly two, have: {}",
columns.len()
),
}
);
let jsons = &columns[0];
let paths = &columns[1];
fn invoke_with_args(
&self,
args: ScalarFunctionArgs,
) -> datafusion_common::Result<ColumnarValue> {
let [jsons, paths] = extract_args(self.name(), &args)?;
let size = jsons.len();
let mut results = BooleanVectorBuilder::with_capacity(size);
let mut builder = BooleanBuilder::with_capacity(size);
match (jsons.data_type(), paths.data_type()) {
(ConcreteDataType::Binary(_), ConcreteDataType::String(_)) => {
(DataType::Null, _) | (_, DataType::Null) => builder.append_nulls(size),
_ => {
let jsons = compute::cast(&jsons, &DataType::BinaryView)?;
let jsons = jsons.as_binary_view();
let paths = compute::cast(&paths, &DataType::Utf8View)?;
let paths = paths.as_string_view();
for i in 0..size {
let result = match (jsons.get_ref(i).as_binary(), paths.get_ref(i).as_string())
{
(Ok(Some(json)), Ok(Some(path))) => {
let json = jsons.is_valid(i).then(|| jsons.value(i));
let path = paths.is_valid(i).then(|| paths.value(i));
let result = match (json, path) {
(Some(json), Some(path)) => {
// Get `JsonPath`.
let json_path = match jsonb::jsonpath::parse_json_path(path.as_bytes())
{
Ok(json_path) => json_path,
Err(_) => {
return InvalidFuncArgsSnafu {
err_msg: format!("Illegal json path: {:?}", path),
}
.fail();
Err(e) => {
return Err(DataFusionError::Execution(format!(
"invalid json path '{path}': {e}"
)));
}
};
jsonb::path_exists(json, json_path).ok()
@@ -91,25 +84,12 @@ impl Function for JsonPathExistsFunction {
_ => None,
};
results.push(result);
builder.append_option(result);
}
}
// Any null args existence causes the result to be NULL.
(ConcreteDataType::Null(_), ConcreteDataType::String(_)) => results.push_nulls(size),
(ConcreteDataType::Binary(_), ConcreteDataType::Null(_)) => results.push_nulls(size),
(ConcreteDataType::Null(_), ConcreteDataType::Null(_)) => results.push_nulls(size),
_ => {
return UnsupportedInputDataTypeSnafu {
function: NAME,
datatypes: columns.iter().map(|c| c.data_type()).collect::<Vec<_>>(),
}
.fail();
}
}
Ok(results.to_vector())
Ok(ColumnarValue::Array(Arc::new(builder.finish())))
}
}
@@ -123,8 +103,8 @@ impl Display for JsonPathExistsFunction {
mod tests {
use std::sync::Arc;
use datatypes::prelude::ScalarVector;
use datatypes::vectors::{BinaryVector, NullVector, StringVector};
use arrow_schema::Field;
use datafusion_common::arrow::array::{BinaryArray, NullArray, StringArray};
use super::*;
@@ -138,31 +118,6 @@ mod tests {
json_path_exists.return_type(&[DataType::Binary]).unwrap()
);
assert!(matches!(json_path_exists.signature(),
Signature {
type_signature: TypeSignature::OneOf(valid_types),
volatility: Volatility::Immutable
} if valid_types ==
vec![
TypeSignature::Exact(vec![
DataType::Binary,
DataType::Utf8,
]),
TypeSignature::Exact(vec![
DataType::Null,
DataType::Utf8,
]),
TypeSignature::Exact(vec![
DataType::Binary,
DataType::Null,
]),
TypeSignature::Exact(vec![
DataType::Null,
DataType::Null,
]),
],
));
let json_strings = [
r#"{"a": {"b": 2}, "b": 2, "c": 3}"#,
r#"{"a": 4, "b": {"c": 6}, "c": 6}"#,
@@ -186,51 +141,83 @@ mod tests {
})
.collect::<Vec<_>>();
let json_vector = BinaryVector::from_vec(jsonbs);
let path_vector = StringVector::from_vec(paths);
let args: Vec<VectorRef> = vec![Arc::new(json_vector), Arc::new(path_vector)];
let vector = json_path_exists
.eval(&FunctionContext::default(), &args)
let args = ScalarFunctionArgs {
args: vec![
ColumnarValue::Array(Arc::new(BinaryArray::from_iter_values(jsonbs))),
ColumnarValue::Array(Arc::new(StringArray::from_iter_values(paths))),
],
arg_fields: vec![],
number_rows: 8,
return_field: Arc::new(Field::new("x", DataType::Boolean, false)),
config_options: Arc::new(Default::default()),
};
let result = json_path_exists
.invoke_with_args(args)
.and_then(|x| x.to_array(8))
.unwrap();
let vector = result.as_boolean();
// Test for non-nulls.
assert_eq!(8, vector.len());
for (i, real) in expected.iter().enumerate() {
let result = vector.get_ref(i);
assert!(!result.is_null());
let val = result.as_boolean().unwrap().unwrap();
let val = vector.value(i);
assert_eq!(val, *real);
}
// Test for path error.
let json_bytes = jsonb::parse_value("{}".as_bytes()).unwrap().to_vec();
let json = BinaryVector::from_vec(vec![json_bytes]);
let illegal_path = StringVector::from_vec(vec!["$..a"]);
let illegal_path = "$..a";
let args: Vec<VectorRef> = vec![Arc::new(json), Arc::new(illegal_path)];
let err = json_path_exists.eval(&FunctionContext::default(), &args);
let args = ScalarFunctionArgs {
args: vec![
ColumnarValue::Array(Arc::new(BinaryArray::from_iter_values(vec![json_bytes]))),
ColumnarValue::Array(Arc::new(StringArray::from_iter_values(vec![illegal_path]))),
],
arg_fields: vec![],
number_rows: 1,
return_field: Arc::new(Field::new("x", DataType::Boolean, false)),
config_options: Arc::new(Default::default()),
};
let err = json_path_exists.invoke_with_args(args);
assert!(err.is_err());
// Test for nulls.
let json_bytes = jsonb::parse_value("{}".as_bytes()).unwrap().to_vec();
let json = BinaryVector::from_vec(vec![json_bytes]);
let null_json = NullVector::new(1);
let json = Arc::new(BinaryArray::from_iter_values(vec![json_bytes]));
let null_json = Arc::new(NullArray::new(1));
let path = StringVector::from_vec(vec!["$.a"]);
let null_path = NullVector::new(1);
let path = Arc::new(StringArray::from_iter_values(vec!["$.a"]));
let null_path = Arc::new(NullArray::new(1));
let args: Vec<VectorRef> = vec![Arc::new(null_json), Arc::new(path)];
let result1 = json_path_exists
.eval(&FunctionContext::default(), &args)
let args = ScalarFunctionArgs {
args: vec![ColumnarValue::Array(null_json), ColumnarValue::Array(path)],
arg_fields: vec![],
number_rows: 1,
return_field: Arc::new(Field::new("x", DataType::Boolean, false)),
config_options: Arc::new(Default::default()),
};
let result = json_path_exists
.invoke_with_args(args)
.and_then(|x| x.to_array(1))
.unwrap();
let args: Vec<VectorRef> = vec![Arc::new(json), Arc::new(null_path)];
let result2 = json_path_exists
.eval(&FunctionContext::default(), &args)
let result1 = result.as_boolean();
let args = ScalarFunctionArgs {
args: vec![ColumnarValue::Array(json), ColumnarValue::Array(null_path)],
arg_fields: vec![],
number_rows: 1,
return_field: Arc::new(Field::new("x", DataType::Boolean, false)),
config_options: Arc::new(Default::default()),
};
let result = json_path_exists
.invoke_with_args(args)
.and_then(|x| x.to_array(1))
.unwrap();
let result2 = result.as_boolean();
assert_eq!(result1.len(), 1);
assert!(result1.get_ref(0).is_null());
assert!(result1.is_null(0));
assert_eq!(result2.len(), 1);
assert!(result2.get_ref(0).is_null());
assert!(result2.is_null(0));
}
}

View File

@@ -13,17 +13,16 @@
// limitations under the License.
use std::fmt::{self, Display};
use std::sync::Arc;
use common_query::error::{InvalidFuncArgsSnafu, Result, UnsupportedInputDataTypeSnafu};
use datafusion_expr::{Signature, Volatility};
use datatypes::arrow::datatypes::DataType;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::VectorRef;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::{BooleanVectorBuilder, MutableVector};
use snafu::ensure;
use arrow::compute;
use common_query::error::Result;
use datafusion_common::arrow::array::{Array, AsArray, BooleanBuilder};
use datafusion_common::arrow::datatypes::DataType;
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature};
use crate::function::{Function, FunctionContext};
use crate::function::{Function, extract_args};
use crate::helper;
/// Check if the given JSON data match the given JSON path's predicate.
#[derive(Clone, Debug, Default)]
@@ -42,66 +41,47 @@ impl Function for JsonPathMatchFunction {
fn signature(&self) -> Signature {
// TODO(LFC): Use a more clear type here instead of "Binary" for Json input, once we have a "Json" type.
Signature::exact(
vec![DataType::Binary, DataType::Utf8],
Volatility::Immutable,
helper::one_of_sigs2(
vec![DataType::Binary, DataType::BinaryView],
vec![DataType::Utf8, DataType::Utf8View],
)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 2,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly two, have: {}",
columns.len()
),
}
);
let jsons = &columns[0];
let paths = &columns[1];
fn invoke_with_args(
&self,
args: ScalarFunctionArgs,
) -> datafusion_common::Result<ColumnarValue> {
let [arg0, arg1] = extract_args(self.name(), &args)?;
let arg0 = compute::cast(&arg0, &DataType::BinaryView)?;
let jsons = arg0.as_binary_view();
let arg1 = compute::cast(&arg1, &DataType::Utf8View)?;
let paths = arg1.as_string_view();
let size = jsons.len();
let mut results = BooleanVectorBuilder::with_capacity(size);
let mut builder = BooleanBuilder::with_capacity(size);
for i in 0..size {
let json = jsons.get_ref(i);
let path = paths.get_ref(i);
let json = jsons.is_valid(i).then(|| jsons.value(i));
let path = paths.is_valid(i).then(|| paths.value(i));
match json.data_type() {
// JSON data type uses binary vector
ConcreteDataType::Binary(_) => {
let json = json.as_binary();
let path = path.as_string();
let result = match (json, path) {
(Ok(Some(json)), Ok(Some(path))) => {
if !jsonb::is_null(json) {
let json_path = jsonb::jsonpath::parse_json_path(path.as_bytes());
match json_path {
Ok(json_path) => jsonb::path_match(json, json_path).ok(),
Err(_) => None,
}
} else {
None
}
let result = match (json, path) {
(Some(json), Some(path)) => {
if !jsonb::is_null(json) {
let json_path = jsonb::jsonpath::parse_json_path(path.as_bytes());
match json_path {
Ok(json_path) => jsonb::path_match(json, json_path).ok(),
Err(_) => None,
}
_ => None,
};
results.push(result);
}
_ => {
return UnsupportedInputDataTypeSnafu {
function: NAME,
datatypes: columns.iter().map(|c| c.data_type()).collect::<Vec<_>>(),
} else {
None
}
.fail();
}
}
_ => None,
};
builder.append_option(result);
}
Ok(results.to_vector())
Ok(ColumnarValue::Array(Arc::new(builder.finish())))
}
}
@@ -115,8 +95,8 @@ impl Display for JsonPathMatchFunction {
mod tests {
use std::sync::Arc;
use datafusion_expr::TypeSignature;
use datatypes::vectors::{BinaryVector, StringVector};
use arrow_schema::Field;
use datafusion_common::arrow::array::{BinaryArray, StringArray};
use super::*;
@@ -130,13 +110,6 @@ mod tests {
json_path_match.return_type(&[DataType::Binary]).unwrap()
);
assert!(matches!(json_path_match.signature(),
Signature {
type_signature: TypeSignature::Exact(valid_types),
volatility: Volatility::Immutable
} if valid_types == vec![DataType::Binary, DataType::Utf8],
));
let json_strings = [
Some(r#"{"a": {"b": 2}, "b": 2, "c": 3}"#.to_string()),
Some(r#"{"a": 1, "b": [1,2,3]}"#.to_string()),
@@ -172,27 +145,25 @@ mod tests {
.map(|s| s.map(|json| jsonb::parse_value(json.as_bytes()).unwrap().to_vec()))
.collect::<Vec<_>>();
let json_vector = BinaryVector::from(jsonbs);
let path_vector = StringVector::from(paths);
let args: Vec<VectorRef> = vec![Arc::new(json_vector), Arc::new(path_vector)];
let vector = json_path_match
.eval(&FunctionContext::default(), &args)
let args = ScalarFunctionArgs {
args: vec![
ColumnarValue::Array(Arc::new(BinaryArray::from_iter(jsonbs))),
ColumnarValue::Array(Arc::new(StringArray::from_iter(paths))),
],
arg_fields: vec![],
number_rows: 7,
return_field: Arc::new(Field::new("x", DataType::Boolean, false)),
config_options: Arc::new(Default::default()),
};
let result = json_path_match
.invoke_with_args(args)
.and_then(|x| x.to_array(7))
.unwrap();
let vector = result.as_boolean();
assert_eq!(7, vector.len());
for (i, expected) in results.iter().enumerate() {
let result = vector.get_ref(i);
match expected {
Some(expected_value) => {
assert!(!result.is_null());
let result_value = result.as_boolean().unwrap().unwrap();
assert_eq!(*expected_value, result_value);
}
None => {
assert!(result.is_null());
}
}
for (actual, expected) in vector.iter().zip(results) {
assert_eq!(actual, expected);
}
}
}

View File

@@ -13,17 +13,15 @@
// limitations under the License.
use std::fmt::{self, Display};
use std::sync::Arc;
use common_query::error::{InvalidFuncArgsSnafu, Result, UnsupportedInputDataTypeSnafu};
use datafusion_expr::{Signature, Volatility};
use datatypes::arrow::datatypes::DataType;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::VectorRef;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::{MutableVector, StringVectorBuilder};
use snafu::ensure;
use common_query::error::Result;
use datafusion_common::DataFusionError;
use datafusion_common::arrow::array::{Array, AsArray, StringViewBuilder};
use datafusion_common::arrow::datatypes::DataType;
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility};
use crate::function::{Function, FunctionContext};
use crate::function::{Function, extract_args};
/// Converts the `JSONB` into `String`. It's useful for displaying JSONB content.
#[derive(Clone, Debug, Default)]
@@ -37,7 +35,7 @@ impl Function for JsonToStringFunction {
}
/// The function always produces `Utf8View` strings, independent of the input types.
fn return_type(&self, _: &[DataType]) -> Result<DataType> {
    Ok(DataType::Utf8View)
}
fn signature(&self) -> Signature {
@@ -45,58 +43,27 @@ impl Function for JsonToStringFunction {
Signature::exact(vec![DataType::Binary], Volatility::Immutable)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 1,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly one, have: {}",
columns.len()
),
}
);
let jsons = &columns[0];
fn invoke_with_args(
&self,
args: ScalarFunctionArgs,
) -> datafusion_common::Result<ColumnarValue> {
let [arg0] = extract_args(self.name(), &args)?;
let jsons = arg0.as_binary::<i32>();
let size = jsons.len();
let datatype = jsons.data_type();
let mut results = StringVectorBuilder::with_capacity(size);
let mut builder = StringViewBuilder::with_capacity(size);
match datatype {
// JSON data type uses binary vector
ConcreteDataType::Binary(_) => {
for i in 0..size {
let json = jsons.get_ref(i);
for i in 0..size {
let json = jsons.is_valid(i).then(|| jsons.value(i));
let result = json
.map(|json| jsonb::from_slice(json).map(|x| x.to_string()))
.transpose()
.map_err(|e| DataFusionError::Execution(format!("invalid json binary: {e}")))?;
let json = json.as_binary();
let result = match json {
Ok(Some(json)) => match jsonb::from_slice(json) {
Ok(json) => {
let json = json.to_string();
Some(json)
}
Err(_) => {
return InvalidFuncArgsSnafu {
err_msg: format!("Illegal json binary: {:?}", json),
}
.fail();
}
},
_ => None,
};
results.push(result.as_deref());
}
}
_ => {
return UnsupportedInputDataTypeSnafu {
function: NAME,
datatypes: columns.iter().map(|c| c.data_type()).collect::<Vec<_>>(),
}
.fail();
}
builder.append_option(result.as_deref());
}
Ok(results.to_vector())
Ok(ColumnarValue::Array(Arc::new(builder.finish())))
}
}
@@ -110,9 +77,9 @@ impl Display for JsonToStringFunction {
mod tests {
use std::sync::Arc;
use arrow_schema::Field;
use datafusion_common::arrow::array::BinaryArray;
use datafusion_expr::TypeSignature;
use datatypes::scalars::ScalarVector;
use datatypes::vectors::BinaryVector;
use super::*;
@@ -122,7 +89,7 @@ mod tests {
assert_eq!("json_to_string", json_to_string.name());
assert_eq!(
DataType::Utf8,
DataType::Utf8View,
json_to_string.return_type(&[DataType::Binary]).unwrap()
);
@@ -147,24 +114,39 @@ mod tests {
})
.collect::<Vec<_>>();
let json_vector = BinaryVector::from_vec(jsonbs);
let args: Vec<VectorRef> = vec![Arc::new(json_vector)];
let vector = json_to_string
.eval(&FunctionContext::default(), &args)
let args = ScalarFunctionArgs {
args: vec![ColumnarValue::Array(Arc::new(
BinaryArray::from_iter_values(jsonbs),
))],
arg_fields: vec![],
number_rows: 3,
return_field: Arc::new(Field::new("x", DataType::Utf8View, false)),
config_options: Arc::new(Default::default()),
};
let result = json_to_string
.invoke_with_args(args)
.and_then(|x| x.to_array(1))
.unwrap();
let vector = result.as_string_view();
assert_eq!(3, vector.len());
for (i, gt) in json_strings.iter().enumerate() {
let result = vector.get_ref(i);
let result = result.as_string().unwrap().unwrap();
let result = vector.value(i);
// remove whitespaces
assert_eq!(gt.replace(" ", ""), result);
}
let invalid_jsonb = vec![b"invalid json"];
let invalid_json_vector = BinaryVector::from_vec(invalid_jsonb);
let args: Vec<VectorRef> = vec![Arc::new(invalid_json_vector)];
let vector = json_to_string.eval(&FunctionContext::default(), &args);
let args = ScalarFunctionArgs {
args: vec![ColumnarValue::Array(Arc::new(
BinaryArray::from_iter_values(invalid_jsonb),
))],
arg_fields: vec![],
number_rows: 1,
return_field: Arc::new(Field::new("x", DataType::Utf8View, false)),
config_options: Arc::new(Default::default()),
};
let vector = json_to_string.invoke_with_args(args);
assert!(vector.is_err());
}
}

View File

@@ -13,17 +13,16 @@
// limitations under the License.
use std::fmt::{self, Display};
use std::sync::Arc;
use common_query::error::{InvalidFuncArgsSnafu, Result, UnsupportedInputDataTypeSnafu};
use datafusion::arrow::datatypes::DataType;
use datafusion_expr::{Signature, Volatility};
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::VectorRef;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::{BinaryVectorBuilder, MutableVector};
use snafu::ensure;
use common_query::error::Result;
use datafusion_common::DataFusionError;
use datafusion_common::arrow::array::{Array, AsArray, BinaryViewBuilder};
use datafusion_common::arrow::compute;
use datafusion_common::arrow::datatypes::DataType;
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility};
use crate::function::{Function, FunctionContext};
use crate::function::{Function, extract_args};
/// Parses the `String` into `JSONB`.
#[derive(Clone, Debug, Default)]
@@ -37,64 +36,37 @@ impl Function for ParseJsonFunction {
}
/// The parsed JSONB payload is always returned as `BinaryView`, independent of the input types.
fn return_type(&self, _: &[DataType]) -> Result<DataType> {
    Ok(DataType::BinaryView)
}
/// Declares a single string argument (`Signature::string(1, ..)`) with immutable volatility.
fn signature(&self) -> Signature {
Signature::string(1, Volatility::Immutable)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 1,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly one, have: {}",
columns.len()
),
}
);
let json_strings = &columns[0];
fn invoke_with_args(
&self,
args: ScalarFunctionArgs,
) -> datafusion_common::Result<ColumnarValue> {
let [arg0] = extract_args(self.name(), &args)?;
let arg0 = compute::cast(&arg0, &DataType::Utf8View)?;
let json_strings = arg0.as_string_view();
let size = json_strings.len();
let datatype = json_strings.data_type();
let mut results = BinaryVectorBuilder::with_capacity(size);
let mut builder = BinaryViewBuilder::with_capacity(size);
match datatype {
ConcreteDataType::String(_) => {
for i in 0..size {
let json_string = json_strings.get_ref(i);
let json_string = json_string.as_string();
let result = match json_string {
Ok(Some(json_string)) => match jsonb::parse_value(json_string.as_bytes()) {
Ok(json) => Some(json.to_vec()),
Err(_) => {
return InvalidFuncArgsSnafu {
err_msg: format!(
"Cannot convert the string to json, have: {}",
json_string
),
}
.fail();
}
},
_ => None,
};
results.push(result.as_deref());
}
}
_ => {
return UnsupportedInputDataTypeSnafu {
function: NAME,
datatypes: columns.iter().map(|c| c.data_type()).collect::<Vec<_>>(),
}
.fail();
}
for i in 0..size {
let s = json_strings.is_valid(i).then(|| json_strings.value(i));
let result = s
.map(|s| {
jsonb::parse_value(s.as_bytes())
.map(|x| x.to_vec())
.map_err(|e| DataFusionError::Execution(format!("cannot parse '{s}': {e}")))
})
.transpose()?;
builder.append_option(result.as_deref());
}
Ok(results.to_vector())
Ok(ColumnarValue::Array(Arc::new(builder.finish())))
}
}
@@ -108,8 +80,8 @@ impl Display for ParseJsonFunction {
mod tests {
use std::sync::Arc;
use datatypes::scalars::ScalarVector;
use datatypes::vectors::StringVector;
use arrow_schema::Field;
use datafusion_common::arrow::array::StringViewArray;
use super::*;
@@ -119,7 +91,7 @@ mod tests {
assert_eq!("parse_json", parse_json.name());
assert_eq!(
DataType::Binary,
DataType::BinaryView,
parse_json.return_type(&[DataType::Binary]).unwrap()
);
@@ -137,14 +109,24 @@ mod tests {
})
.collect::<Vec<_>>();
let json_string_vector = StringVector::from_vec(json_strings.to_vec());
let args: Vec<VectorRef> = vec![Arc::new(json_string_vector)];
let vector = parse_json.eval(&FunctionContext::default(), &args).unwrap();
let args = ScalarFunctionArgs {
args: vec![ColumnarValue::Array(Arc::new(
StringViewArray::from_iter_values(json_strings),
))],
arg_fields: vec![],
number_rows: 3,
return_field: Arc::new(Field::new("x", DataType::BinaryView, false)),
config_options: Arc::new(Default::default()),
};
let result = parse_json
.invoke_with_args(args)
.and_then(|x| x.to_array(3))
.unwrap();
let vector = result.as_binary_view();
assert_eq!(3, vector.len());
for (i, gt) in jsonbs.iter().enumerate() {
let result = vector.get_ref(i);
let result = result.as_binary().unwrap().unwrap();
let result = vector.value(i);
assert_eq!(gt, result);
}
}

View File

@@ -13,18 +13,17 @@
// limitations under the License.
use std::fmt;
use std::iter::repeat_n;
use std::sync::Arc;
use common_query::error::{InvalidFuncArgsSnafu, Result};
use datafusion::arrow::datatypes::DataType;
use datafusion_expr::{Signature, Volatility};
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::{BooleanVector, BooleanVectorBuilder, MutableVector, VectorRef};
use common_query::error::Result;
use datafusion_common::arrow::array::{Array, AsArray, BooleanArray, BooleanBuilder};
use datafusion_common::arrow::compute;
use datafusion_common::arrow::datatypes::DataType;
use datafusion_common::{DataFusionError, ScalarValue};
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility};
use memchr::memmem;
use snafu::ensure;
use crate::function::{Function, FunctionContext};
use crate::function::Function;
use crate::function_registry::FunctionRegistry;
/// Exact term/phrase matching function for text columns.
@@ -100,64 +99,94 @@ impl Function for MatchesTermFunction {
Signature::string(2, Volatility::Immutable)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 2,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly 2, have: {}",
columns.len()
),
}
);
fn invoke_with_args(
&self,
args: ScalarFunctionArgs,
) -> datafusion_common::Result<ColumnarValue> {
let [arg0, arg1] = datafusion_common::utils::take_function_args(self.name(), &args.args)?;
let text_column = &columns[0];
if text_column.is_empty() {
return Ok(Arc::new(BooleanVector::from(Vec::<bool>::with_capacity(0))));
fn as_str(v: &ScalarValue) -> Option<&str> {
match v {
ScalarValue::Utf8View(Some(x))
| ScalarValue::Utf8(Some(x))
| ScalarValue::LargeUtf8(Some(x)) => Some(x.as_str()),
_ => None,
}
}
let term_column = &columns[1];
let compiled_finder = if term_column.is_const() {
let term = term_column.get_ref(0).as_string().unwrap();
match term {
None => {
return Ok(Arc::new(BooleanVector::from_iter(repeat_n(
None,
text_column.len(),
))));
}
Some(term) => Some(MatchesTermFinder::new(term)),
}
} else {
None
if let (ColumnarValue::Scalar(text), ColumnarValue::Scalar(term)) = (arg0, arg1) {
let text = as_str(text);
let term = as_str(term);
let result = match (text, term) {
(Some(text), Some(term)) => Some(MatchesTermFinder::new(term).find(text)),
_ => None,
};
return Ok(ColumnarValue::Scalar(ScalarValue::Boolean(result)));
};
let len = text_column.len();
let mut result = BooleanVectorBuilder::with_capacity(len);
for i in 0..len {
let text = text_column.get_ref(i).as_string().unwrap();
let Some(text) = text else {
result.push_null();
continue;
};
let v = match (arg0, arg1) {
(ColumnarValue::Scalar(_), ColumnarValue::Scalar(_)) => {
// Unreachable because we have checked this case above and returned if matched.
unreachable!()
}
(ColumnarValue::Scalar(text), ColumnarValue::Array(terms)) => {
let text = as_str(text);
if let Some(text) = text {
let terms = compute::cast(terms, &DataType::Utf8View)?;
let terms = terms.as_string_view();
let contains = match &compiled_finder {
Some(finder) => finder.find(text),
None => {
let term = match term_column.get_ref(i).as_string().unwrap() {
None => {
result.push_null();
continue;
}
Some(term) => term,
};
MatchesTermFinder::new(term).find(text)
let mut builder = BooleanBuilder::with_capacity(terms.len());
terms.iter().for_each(|term| {
builder.append_option(term.map(|x| MatchesTermFinder::new(x).find(text)))
});
ColumnarValue::Array(Arc::new(builder.finish()))
} else {
ColumnarValue::Array(Arc::new(BooleanArray::new_null(terms.len())))
}
};
result.push(Some(contains));
}
}
(ColumnarValue::Array(texts), ColumnarValue::Scalar(term)) => {
let term = as_str(term);
if let Some(term) = term {
let finder = MatchesTermFinder::new(term);
Ok(result.to_vector())
let texts = compute::cast(texts, &DataType::Utf8View)?;
let texts = texts.as_string_view();
let mut builder = BooleanBuilder::with_capacity(texts.len());
texts
.iter()
.for_each(|text| builder.append_option(text.map(|x| finder.find(x))));
ColumnarValue::Array(Arc::new(builder.finish()))
} else {
ColumnarValue::Array(Arc::new(BooleanArray::new_null(texts.len())))
}
}
(ColumnarValue::Array(texts), ColumnarValue::Array(terms)) => {
let terms = compute::cast(terms, &DataType::Utf8View)?;
let terms = terms.as_string_view();
let texts = compute::cast(texts, &DataType::Utf8View)?;
let texts = texts.as_string_view();
let len = texts.len();
if terms.len() != len {
return Err(DataFusionError::Internal(format!(
"input arrays have different lengths: {len}, {}",
terms.len()
)));
}
let mut builder = BooleanBuilder::with_capacity(len);
for (text, term) in texts.iter().zip(terms.iter()) {
let result = match (text, term) {
(Some(text), Some(term)) => Some(MatchesTermFinder::new(term).find(text)),
_ => None,
};
builder.append_option(result);
}
ColumnarValue::Array(Arc::new(builder.finish()))
}
};
Ok(v)
}
}

View File

@@ -16,14 +16,13 @@ use std::fmt;
use common_query::error::{self, Result};
use datafusion::arrow::compute::kernels::numeric;
use datafusion_common::arrow::compute::kernels::cast;
use datafusion_common::arrow::datatypes::DataType;
use datafusion_expr::type_coercion::aggregates::NUMERICS;
use datafusion_expr::{Signature, Volatility};
use datatypes::arrow::compute::kernels::cast;
use datatypes::arrow::datatypes::DataType;
use datatypes::vectors::{Helper, VectorRef};
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility};
use snafu::ResultExt;
use crate::function::{Function, FunctionContext};
use crate::function::{Function, extract_args};
/// generates rates from a sequence of adjacent data points.
#[derive(Clone, Debug, Default)]
@@ -48,12 +47,14 @@ impl Function for RateFunction {
Signature::uniform(2, NUMERICS.to_vec(), Volatility::Immutable)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
let val = &columns[0].to_arrow_array();
fn invoke_with_args(
&self,
args: ScalarFunctionArgs,
) -> datafusion_common::Result<ColumnarValue> {
let [val, ts] = extract_args(self.name(), &args)?;
let val_0 = val.slice(0, val.len() - 1);
let val_1 = val.slice(1, val.len() - 1);
let dv = numeric::sub(&val_1, &val_0).context(error::ArrowComputeSnafu)?;
let ts = &columns[1].to_arrow_array();
let ts_0 = ts.slice(0, ts.len() - 1);
let ts_1 = ts.slice(1, ts.len() - 1);
let dt = numeric::sub(&ts_1, &ts_0).context(error::ArrowComputeSnafu)?;
@@ -65,9 +66,8 @@ impl Function for RateFunction {
typ: DataType::Float64,
})?;
let rate = numeric::div(&dv, &dt).context(error::ArrowComputeSnafu)?;
let v = Helper::try_into_vector(&rate).context(error::FromArrowArraySnafu)?;
Ok(v)
Ok(ColumnarValue::Array(rate))
}
}
@@ -75,8 +75,10 @@ impl Function for RateFunction {
mod tests {
use std::sync::Arc;
use arrow_schema::Field;
use datafusion_common::arrow::array::{AsArray, Float32Array, Float64Array, Int64Array};
use datafusion_common::arrow::datatypes::Float64Type;
use datafusion_expr::TypeSignature;
use datatypes::vectors::{Float32Vector, Float64Vector, Int64Vector};
use super::*;
#[test]
@@ -93,12 +95,22 @@ mod tests {
let values = vec![1.0, 3.0, 6.0];
let ts = vec![0, 1, 2];
let args: Vec<VectorRef> = vec![
Arc::new(Float32Vector::from_vec(values)),
Arc::new(Int64Vector::from_vec(ts)),
];
let vector = rate.eval(&FunctionContext::default(), &args).unwrap();
let expect: VectorRef = Arc::new(Float64Vector::from_vec(vec![2.0, 3.0]));
let args = ScalarFunctionArgs {
args: vec![
ColumnarValue::Array(Arc::new(Float32Array::from(values))),
ColumnarValue::Array(Arc::new(Int64Array::from(ts))),
],
arg_fields: vec![],
number_rows: 3,
return_field: Arc::new(Field::new("x", DataType::Float64, false)),
config_options: Arc::new(Default::default()),
};
let result = rate
.invoke_with_args(args)
.and_then(|x| x.to_array(2))
.unwrap();
let vector = result.as_primitive::<Float64Type>();
let expect = &Float64Array::from(vec![2.0, 3.0]);
assert_eq!(expect, vector);
}
}

View File

@@ -15,15 +15,18 @@
use std::fmt;
use std::sync::Arc;
use common_query::error::{InvalidFuncArgsSnafu, Result, UnsupportedInputDataTypeSnafu};
use common_query::error::Result;
use common_time::{Date, Timestamp};
use datafusion_expr::{Signature, Volatility};
use datatypes::arrow::datatypes::{DataType, TimeUnit};
use datatypes::prelude::ConcreteDataType;
use datatypes::vectors::{Int64Vector, VectorRef};
use snafu::ensure;
use datafusion_common::DataFusionError;
use datafusion_common::arrow::array::{
Array, ArrayRef, AsArray, Date32Array, Int64Array, Int64Builder,
};
use datafusion_common::arrow::compute;
use datafusion_common::arrow::datatypes::{ArrowTimestampType, DataType, Date32Type, TimeUnit};
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility};
use crate::function::{Function, FunctionContext};
use crate::function::{Function, FunctionContext, extract_args, find_function_context};
use crate::helper::with_match_timestamp_types;
/// A function to convert the column into the unix timestamp in seconds.
#[derive(Clone, Debug, Default)]
@@ -44,15 +47,23 @@ fn convert_to_seconds(arg: &str, func_ctx: &FunctionContext) -> Option<i64> {
None
}
fn convert_timestamps_to_seconds(vector: &VectorRef) -> Vec<Option<i64>> {
(0..vector.len())
.map(|i| vector.get(i).as_timestamp().map(|ts| ts.split().0))
.collect::<Vec<Option<i64>>>()
fn convert_timestamps_to_seconds(array: &ArrayRef) -> datafusion_common::Result<Vec<Option<i64>>> {
with_match_timestamp_types!(array.data_type(), |$S| {
let array = array.as_primitive::<$S>();
array
.iter()
.map(|x| x.map(|i| Timestamp::new(i, $S::UNIT.into()).split().0))
.collect::<Vec<_>>()
})
}
fn convert_dates_to_seconds(vector: &VectorRef) -> Vec<Option<i64>> {
fn convert_dates_to_seconds(vector: &Date32Array) -> Vec<Option<i64>> {
(0..vector.len())
.map(|i| vector.get(i).as_date().map(|dt| dt.to_secs()))
.map(|i| {
vector
.is_valid(i)
.then(|| Date::from(vector.value(i)).to_secs())
})
.collect::<Vec<Option<i64>>>()
}
@@ -82,43 +93,43 @@ impl Function for ToUnixtimeFunction {
)
}
fn eval(&self, ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 1,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly one, have: {}",
columns.len()
),
fn invoke_with_args(
&self,
args: ScalarFunctionArgs,
) -> datafusion_common::Result<ColumnarValue> {
let ctx = find_function_context(&args)?;
let [arg0] = extract_args(self.name(), &args)?;
let result: ArrayRef = match arg0.data_type() {
DataType::Utf8 => {
let arg0 = arg0.as_string::<i32>();
let mut builder = Int64Builder::with_capacity(arg0.len());
for i in 0..arg0.len() {
builder.append_option(
arg0.is_valid(i)
.then(|| convert_to_seconds(arg0.value(i), ctx))
.flatten(),
);
}
Arc::new(builder.finish())
}
);
let vector = &columns[0];
match columns[0].data_type() {
ConcreteDataType::String(_) => Ok(Arc::new(Int64Vector::from(
(0..vector.len())
.map(|i| convert_to_seconds(&vector.get(i).to_string(), ctx))
.collect::<Vec<_>>(),
))),
ConcreteDataType::Int64(_) | ConcreteDataType::Int32(_) => {
// Safety: cast always successfully at here
Ok(vector.cast(&ConcreteDataType::int64_datatype()).unwrap())
}
ConcreteDataType::Date(_) => {
DataType::Int64 | DataType::Int32 => compute::cast(&arg0, &DataType::Int64)?,
DataType::Date32 => {
let vector = arg0.as_primitive::<Date32Type>();
let seconds = convert_dates_to_seconds(vector);
Ok(Arc::new(Int64Vector::from(seconds)))
Arc::new(Int64Array::from(seconds))
}
ConcreteDataType::Timestamp(_) => {
let seconds = convert_timestamps_to_seconds(vector);
Ok(Arc::new(Int64Vector::from(seconds)))
DataType::Timestamp(_, _) => {
let seconds = convert_timestamps_to_seconds(&arg0)?;
Arc::new(Int64Array::from(seconds))
}
_ => UnsupportedInputDataTypeSnafu {
function: NAME,
datatypes: columns.iter().map(|c| c.data_type()).collect::<Vec<_>>(),
x => {
return Err(DataFusionError::Execution(format!(
"{}: unsupported input data type {x}",
self.name()
)));
}
.fail(),
}
};
Ok(ColumnarValue::Array(result))
}
}
@@ -130,14 +141,41 @@ impl fmt::Display for ToUnixtimeFunction {
#[cfg(test)]
mod tests {
use datafusion_expr::TypeSignature;
use datatypes::value::Value;
use datatypes::vectors::{
DateVector, StringVector, TimestampMillisecondVector, TimestampSecondVector,
use arrow_schema::Field;
use datafusion_common::arrow::array::{
StringArray, TimestampMillisecondArray, TimestampSecondArray,
};
use datafusion_common::arrow::datatypes::Int64Type;
use datafusion_common::config::ConfigOptions;
use datafusion_expr::TypeSignature;
use super::{ToUnixtimeFunction, *};
/// Test helper: invokes `ToUnixtimeFunction` on one array argument, with a default
/// `FunctionContext` registered in the config extensions, and asserts that the
/// resulting `Int64` values equal `expects`.
fn test_invoke(arg0: ArrayRef, expects: &[Option<i64>]) {
    let number_rows = arg0.len();

    let mut options = ConfigOptions::default();
    options.extensions.insert(FunctionContext::default());

    let args = ScalarFunctionArgs {
        args: vec![ColumnarValue::Array(arg0)],
        arg_fields: vec![],
        number_rows,
        return_field: Arc::new(Field::new("", DataType::Int64, true)),
        config_options: Arc::new(options),
    };

    let output = ToUnixtimeFunction
        .invoke_with_args(args)
        .and_then(|x| x.to_array(number_rows))
        .unwrap();
    let output = output.as_primitive::<Int64Type>();

    assert_eq!(output.len(), expects.len());
    output
        .iter()
        .zip(expects)
        .for_each(|(actual, expect)| assert_eq!(&actual, expect));
}
#[test]
fn test_string_to_unixtime() {
let f = ToUnixtimeFunction;
@@ -167,115 +205,36 @@ mod tests {
Some("invalid_time_stamp"),
];
let results = [Some(1677652502), None, Some(1656633600), None];
let args: Vec<VectorRef> = vec![Arc::new(StringVector::from(times.clone()))];
let vector = f.eval(&FunctionContext::default(), &args).unwrap();
assert_eq!(4, vector.len());
for (i, _t) in times.iter().enumerate() {
let v = vector.get(i);
if i == 1 || i == 3 {
assert_eq!(Value::Null, v);
continue;
}
match v {
Value::Int64(ts) => {
assert_eq!(ts, (*results.get(i).unwrap()).unwrap());
}
_ => unreachable!(),
}
}
let arg0 = Arc::new(StringArray::from(times));
test_invoke(arg0, &results);
}
/// Integer inputs are passed through unchanged; NULLs stay NULL.
#[test]
fn test_int_to_unixtime() {
    let times = vec![Some(3_i64), None, Some(5_i64), None];
    let results = [Some(3), None, Some(5), None];
    let arg0 = Arc::new(Int64Array::from(times));
    test_invoke(arg0, &results);
}
/// `Date32` inputs are converted to seconds (days * 86400); NULLs stay NULL.
#[test]
fn test_date_to_unixtime() {
    let times = vec![Some(123), None, Some(42), None];
    let results = [Some(10627200), None, Some(3628800), None];
    let arg0 = Arc::new(Date32Array::from(times));
    test_invoke(arg0, &results);
}
/// Timestamp inputs are reduced to whole seconds for both second and millisecond units.
#[test]
fn test_timestamp_to_unixtime() {
    // Second precision: values are already in seconds.
    let times = vec![Some(123), None, Some(42), None];
    let results = [Some(123), None, Some(42), None];
    let arg0 = Arc::new(TimestampSecondArray::from(times));
    test_invoke(arg0, &results);

    // Millisecond precision: values are scaled down to seconds.
    let times = vec![Some(123000), None, Some(42000), None];
    let results = [Some(123), None, Some(42), None];
    let arg0 = Arc::new(TimestampMillisecondArray::from(times));
    test_invoke(arg0, &results);
}
}

View File

@@ -13,16 +13,17 @@
// limitations under the License.
use std::fmt::Display;
use std::sync::Arc;
use common_query::error::{InvalidFuncArgsSnafu, InvalidVectorStringSnafu, Result};
use datafusion::arrow::datatypes::DataType;
use datafusion_expr::{Signature, Volatility};
use datatypes::scalars::ScalarVectorBuilder;
use common_query::error::{InvalidVectorStringSnafu, Result};
use datafusion_common::arrow::array::{Array, AsArray, BinaryViewBuilder};
use datafusion_common::arrow::compute;
use datafusion_common::arrow::datatypes::DataType;
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility};
use datatypes::types::parse_string_to_vector_type_value;
use datatypes::vectors::{BinaryVectorBuilder, MutableVector, VectorRef};
use snafu::{ResultExt, ensure};
use snafu::ResultExt;
use crate::function::{Function, FunctionContext};
use crate::function::{Function, extract_args};
const NAME: &str = "parse_vec";
@@ -35,40 +36,36 @@ impl Function for ParseVectorFunction {
}
// Reports the output type of `parse_vec`; the argument types are ignored.
// NOTE(review): two `Ok(..)` lines appear because this span is a rendered diff without +/-
// markers; `Binary` is presumably the removed line and `BinaryView` the added one — confirm.
fn return_type(&self, _: &[DataType]) -> Result<DataType> {
Ok(DataType::Binary)
Ok(DataType::BinaryView)
}
// Declares `parse_vec` as taking one string argument, with immutable volatility.
fn signature(&self) -> Signature {
Signature::string(1, Volatility::Immutable)
}
// Parses every string row of the single argument with `parse_string_to_vector_type_value`
// and collects the parsed values into a binary column. Null rows stay null; the first row
// that fails to parse aborts the call with an `InvalidVectorString` error carrying the
// offending string.
// NOTE(review): this span is a rendered diff without +/- markers. The `eval` header, the
// `ensure!` arity check and the `BinaryVectorBuilder`/`result.*` lines look like the removed
// implementation; `invoke_with_args` and the `BinaryViewBuilder`/`builder.*` lines look like
// the added one — confirm against the real file.
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 1,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly one, have: {}",
columns.len()
),
}
);
fn invoke_with_args(
&self,
args: ScalarFunctionArgs,
) -> datafusion_common::Result<ColumnarValue> {
// Destructure exactly one argument; `extract_args` is defined outside this view.
let [arg0] = extract_args(self.name(), &args)?;
// Cast to `Utf8View` so the `as_string_view` accessor below applies.
let arg0 = compute::cast(&arg0, &DataType::Utf8View)?;
let column = arg0.as_string_view();
let column = &columns[0];
let size = column.len();
let mut result = BinaryVectorBuilder::with_capacity(size);
let mut builder = BinaryViewBuilder::with_capacity(size);
for i in 0..size {
let value = column.get(i).as_string();
// `None` for null rows, `Some(&str)` otherwise.
let value = column.is_valid(i).then(|| column.value(i));
if let Some(value) = value {
let res = parse_string_to_vector_type_value(&value, None)
.context(InvalidVectorStringSnafu { vec_str: &value })?;
result.push(Some(&res));
let result = parse_string_to_vector_type_value(value, None)
.context(InvalidVectorStringSnafu { vec_str: value })?;
builder.append_value(result);
} else {
result.push_null();
builder.append_null();
}
}
Ok(result.to_vector())
Ok(ColumnarValue::Array(Arc::new(builder.finish())))
}
}
@@ -82,9 +79,9 @@ impl Display for ParseVectorFunction {
mod tests {
use std::sync::Arc;
use arrow_schema::Field;
use common_base::bytes::Bytes;
use datatypes::value::Value;
use datatypes::vectors::StringVector;
use datafusion_common::arrow::array::StringViewArray;
use super::*;
@@ -92,66 +89,84 @@ mod tests {
// Happy-path test for `parse_vec`: two well-formed vector strings and one null row.
// The expected bytes are the little-endian encoding of each `f32` element, concatenated.
// NOTE(review): rendered diff without +/- markers; the `StringVector`/`func.eval`/`result.get`
// lines presumably are the removed side and the `StringViewArray`/`invoke_with_args`/
// `result.value` lines the added side — confirm.
fn test_parse_vector() {
let func = ParseVectorFunction;
let input = Arc::new(StringVector::from(vec![
let arg0 = Arc::new(StringViewArray::from_iter([
Some("[1.0,2.0,3.0]".to_string()),
Some("[4.0,5.0,6.0]".to_string()),
None,
]));
let args = ScalarFunctionArgs {
args: vec![ColumnarValue::Array(arg0)],
arg_fields: vec![],
number_rows: 3,
return_field: Arc::new(Field::new("", DataType::BinaryView, false)),
config_options: Arc::new(Default::default()),
};
let result = func.eval(&FunctionContext::default(), &[input]).unwrap();
// Materialize the result as a 3-row array before downcasting to a binary view array.
let result = func
.invoke_with_args(args)
.and_then(|x| x.to_array(3))
.unwrap();
let result = result.as_binary_view();
let result = result.as_ref();
assert_eq!(result.len(), 3);
assert_eq!(
result.get(0),
Value::Binary(Bytes::from(
result.value(0),
&Bytes::from(
[1.0f32, 2.0, 3.0]
.iter()
.flat_map(|e| e.to_le_bytes())
.collect::<Vec<u8>>()
))
)
);
assert_eq!(
result.get(1),
Value::Binary(Bytes::from(
result.value(1),
&Bytes::from(
[4.0f32, 5.0, 6.0]
.iter()
.flat_map(|e| e.to_le_bytes())
.collect::<Vec<u8>>()
))
)
);
// The null input row must stay null.
assert!(result.get(2).is_null());
assert!(result.is_null(2));
}
// Error-path test for `parse_vec`. Each input has a malformed third row: a missing closing
// bracket, a missing opening bracket, and a non-numeric element.
// NOTE(review): rendered diff without +/- markers; the three `StringVector` + `func.eval` +
// `assert!(result.is_err())` groups presumably are the removed side, and the `inputs` /
// `expected` arrays with the final loop the added side — confirm.
#[test]
fn test_parse_vector_error() {
let func = ParseVectorFunction;
let input = Arc::new(StringVector::from(vec![
Some("[1.0,2.0,3.0]".to_string()),
Some("[4.0,5.0,6.0]".to_string()),
Some("[7.0,8.0,9.0".to_string()),
]));
let inputs = [
StringViewArray::from_iter([
Some("[1.0,2.0,3.0]".to_string()),
Some("[4.0,5.0,6.0]".to_string()),
Some("[7.0,8.0,9.0".to_string()),
]),
StringViewArray::from_iter([
Some("[1.0,2.0,3.0]".to_string()),
Some("[4.0,5.0,6.0]".to_string()),
Some("7.0,8.0,9.0]".to_string()),
]),
StringViewArray::from_iter([
Some("[1.0,2.0,3.0]".to_string()),
Some("[4.0,5.0,6.0]".to_string()),
Some("[7.0,hello,9.0]".to_string()),
]),
];
// Expected error text for each entry of `inputs`, in the same order.
let expected = [
"External error: Invalid vector string: [7.0,8.0,9.0",
"External error: Invalid vector string: 7.0,8.0,9.0]",
"External error: Invalid vector string: [7.0,hello,9.0]",
];
let result = func.eval(&FunctionContext::default(), &[input]);
assert!(result.is_err());
let input = Arc::new(StringVector::from(vec![
Some("[1.0,2.0,3.0]".to_string()),
Some("[4.0,5.0,6.0]".to_string()),
Some("7.0,8.0,9.0]".to_string()),
]));
let result = func.eval(&FunctionContext::default(), &[input]);
assert!(result.is_err());
let input = Arc::new(StringVector::from(vec![
Some("[1.0,2.0,3.0]".to_string()),
Some("[4.0,5.0,6.0]".to_string()),
Some("[7.0,hello,9.0]".to_string()),
]));
let result = func.eval(&FunctionContext::default(), &[input]);
assert!(result.is_err());
// Pair each malformed input with its expected message and compare the rendered error.
for (input, expected) in inputs.into_iter().zip(expected.into_iter()) {
let args = ScalarFunctionArgs {
args: vec![ColumnarValue::Array(Arc::new(input))],
arg_fields: vec![],
number_rows: 3,
return_field: Arc::new(Field::new("", DataType::BinaryView, false)),
config_options: Arc::new(Default::default()),
};
let result = func.invoke_with_args(args);
assert_eq!(result.unwrap_err().to_string(), expected);
}
}
}

View File

@@ -13,18 +13,18 @@
// limitations under the License.
use std::fmt::Display;
use std::sync::Arc;
use common_query::error::{InvalidFuncArgsSnafu, Result};
use datafusion::arrow::datatypes::DataType;
use common_query::error::Result;
use datafusion_common::DataFusionError;
use datafusion_common::arrow::array::{Array, AsArray, StringViewBuilder};
use datafusion_common::arrow::compute;
use datafusion_common::arrow::datatypes::DataType;
use datafusion_expr::type_coercion::aggregates::BINARYS;
use datafusion_expr::{Signature, TypeSignature, Volatility};
use datatypes::scalars::ScalarVectorBuilder;
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, TypeSignature, Volatility};
use datatypes::types::vector_type_value_to_string;
use datatypes::value::Value;
use datatypes::vectors::{MutableVector, StringVectorBuilder, VectorRef};
use snafu::ensure;
use crate::function::{Function, FunctionContext};
use crate::function::{Function, extract_args};
const NAME: &str = "vec_to_string";
@@ -37,7 +37,7 @@ impl Function for VectorToStringFunction {
}
// Reports the output type of `vec_to_string`; the argument types are ignored.
// NOTE(review): two `Ok(..)` lines appear because this span is a rendered diff without +/-
// markers; `Utf8` is presumably the removed line and `Utf8View` the added one — confirm.
fn return_type(&self, _: &[DataType]) -> Result<DataType> {
Ok(DataType::Utf8)
Ok(DataType::Utf8View)
}
fn signature(&self) -> Signature {
@@ -50,51 +50,40 @@ impl Function for VectorToStringFunction {
)
}
// Renders every binary row of the single argument as a vector string via
// `vector_type_value_to_string`. Null rows stay null. A row whose byte length is not a
// multiple of `size_of::<f32>()` aborts the call with an "Invalid binary length of vector"
// error.
// NOTE(review): this span is a rendered diff without +/- markers. The `eval` header, the
// `ensure!` arity check, the `Value::*` match arms and the `StringVectorBuilder`/`result.*`
// lines look like the removed implementation; `invoke_with_args`, the `Some`/`None` arms and
// the `StringViewBuilder`/`builder.*` lines look like the added one — confirm.
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 1,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly one, have: {}",
columns.len()
),
}
);
fn invoke_with_args(
&self,
args: ScalarFunctionArgs,
) -> datafusion_common::Result<ColumnarValue> {
// Destructure exactly one argument; `extract_args` is defined outside this view.
let [arg0] = extract_args(self.name(), &args)?;
// Cast to `BinaryView` so the `as_binary_view` accessor below applies.
let arg0 = compute::cast(&arg0, &DataType::BinaryView)?;
let column = arg0.as_binary_view();
let column = &columns[0];
let size = column.len();
let mut result = StringVectorBuilder::with_capacity(size);
let mut builder = StringViewBuilder::with_capacity(size);
for i in 0..size {
let value = column.get(i);
// `None` for null rows, `Some(&[u8])` otherwise.
let value = column.is_valid(i).then(|| column.value(i));
match value {
Value::Binary(bytes) => {
Some(bytes) => {
let len = bytes.len();
// The payload is treated as a sequence of `f32` values, so its length must divide evenly.
if len % std::mem::size_of::<f32>() != 0 {
return InvalidFuncArgsSnafu {
err_msg: format!("Invalid binary length of vector: {}", len),
}
.fail();
return Err(DataFusionError::Execution(format!(
"Invalid binary length of vector: {len}"
)));
}
let dim = len / std::mem::size_of::<f32>();
// Safety: `dim` is calculated from the length of `bytes` and is guaranteed to be valid
let res = vector_type_value_to_string(&bytes, dim as _).unwrap();
result.push(Some(&res));
let result = vector_type_value_to_string(bytes, dim as _).unwrap();
builder.append_value(result);
}
Value::Null => {
result.push_null();
}
_ => {
return InvalidFuncArgsSnafu {
err_msg: format!("Invalid value type: {:?}", value.data_type()),
}
.fail();
None => {
builder.append_null();
}
}
}
Ok(result.to_vector())
Ok(ColumnarValue::Array(Arc::new(builder.finish())))
}
}
@@ -106,8 +95,8 @@ impl Display for VectorToStringFunction {
#[cfg(test)]
mod tests {
use datatypes::value::Value;
use datatypes::vectors::BinaryVectorBuilder;
use arrow_schema::Field;
use datafusion_common::arrow::array::BinaryViewBuilder;
use super::*;
@@ -115,29 +104,39 @@ mod tests {
// Happy-path test for `vec_to_string`: two rows holding the little-endian bytes of three
// `f32` values each, plus one null row. Expected output is "[1,2,3]", "[4,5,6]" and null.
// NOTE(review): rendered diff without +/- markers; the `BinaryVectorBuilder`/`push`/
// `func.eval`/`result.get` lines presumably are the removed side and the
// `BinaryViewBuilder`/`append_*`/`invoke_with_args`/`result.value` lines the added side —
// confirm.
fn test_vector_to_string() {
let func = VectorToStringFunction;
let mut builder = BinaryVectorBuilder::with_capacity(3);
builder.push(Some(
let mut builder = BinaryViewBuilder::with_capacity(3);
builder.append_option(Some(
[1.0f32, 2.0, 3.0]
.iter()
.flat_map(|e| e.to_le_bytes())
.collect::<Vec<_>>()
.as_slice(),
));
builder.push(Some(
builder.append_option(Some(
[4.0f32, 5.0, 6.0]
.iter()
.flat_map(|e| e.to_le_bytes())
.collect::<Vec<_>>()
.as_slice(),
));
builder.push_null();
let vector = builder.to_vector();
builder.append_null();
let args = ScalarFunctionArgs {
args: vec![ColumnarValue::Array(Arc::new(builder.finish()))],
arg_fields: vec![],
number_rows: 3,
return_field: Arc::new(Field::new("", DataType::Utf8View, false)),
config_options: Arc::new(Default::default()),
};
let result = func.eval(&FunctionContext::default(), &[vector]).unwrap();
// Materialize the result as a 3-row array before downcasting to a string view array.
let result = func
.invoke_with_args(args)
.and_then(|x| x.to_array(3))
.unwrap();
let result = result.as_string_view();
assert_eq!(result.len(), 3);
assert_eq!(result.get(0), Value::String("[1,2,3]".to_string().into()));
assert_eq!(result.get(1), Value::String("[4,5,6]".to_string().into()));
assert_eq!(result.get(2), Value::Null);
assert_eq!(result.value(0), "[1,2,3]");
assert_eq!(result.value(1), "[4,5,6]");
assert!(result.is_null(2));
}
}

View File

@@ -46,8 +46,18 @@ macro_rules! define_distance_function {
// Declares the accepted types for the two arguments of a distance function.
// Presumably `helper::one_of_sigs2` accepts any pairing of a type from the first list with a
// type from the second — TODO confirm against the helper, which is outside this view.
// NOTE(review): rendered diff without +/- markers; the two one-line `vec![..]` lists are
// presumably the removed side and the two four-element lists the added side — confirm.
fn signature(&self) -> Signature {
helper::one_of_sigs2(
vec![DataType::Utf8, DataType::Binary],
vec![DataType::Utf8, DataType::Binary],
vec![
DataType::Utf8,
DataType::Utf8View,
DataType::Binary,
DataType::BinaryView,
],
vec![
DataType::Utf8,
DataType::Utf8View,
DataType::Binary,
DataType::BinaryView,
],
)
}

View File

@@ -57,6 +57,7 @@ impl Function for ElemProductFunction {
vec![
TypeSignature::Uniform(1, STRINGS.to_vec()),
TypeSignature::Uniform(1, BINARYS.to_vec()),
TypeSignature::Uniform(1, vec![DataType::BinaryView]),
],
Volatility::Immutable,
)

View File

@@ -44,6 +44,7 @@ impl Function for ElemSumFunction {
vec![
TypeSignature::Uniform(1, STRINGS.to_vec()),
TypeSignature::Uniform(1, BINARYS.to_vec()),
TypeSignature::Uniform(1, vec![DataType::BinaryView]),
],
Volatility::Immutable,
)

View File

@@ -20,7 +20,9 @@ use datafusion_common::ScalarValue;
/// Convert a string or binary literal to a vector literal.
pub fn as_veclit(arg: &ScalarValue) -> Result<Option<Cow<'_, [f32]>>> {
match arg {
ScalarValue::Binary(b) => b.as_ref().map(|x| binlit_as_veclit(x)).transpose(),
ScalarValue::Binary(b) | ScalarValue::BinaryView(b) => {
b.as_ref().map(|x| binlit_as_veclit(x)).transpose()
}
ScalarValue::Utf8(s) | ScalarValue::Utf8View(s) => s
.as_ref()
.map(|x| parse_veclit_from_strlit(x).map(Cow::Owned))

View File

@@ -65,7 +65,7 @@ impl Function for ScalarAddFunction {
// Declares the accepted argument types: a `Float64` first argument and a string or binary
// second argument.
// Presumably `helper::one_of_sigs2` accepts any pairing of a type from the first list with a
// type from the second — TODO confirm against the helper, which is outside this view.
// NOTE(review): rendered diff without +/- markers; the two-element list is presumably the
// removed line and the three-element list the added one. The `ScalarMulFunction` signature
// in the same change set also lists `DataType::Utf8View`, which is absent here — confirm
// whether that difference is intentional.
fn signature(&self) -> Signature {
helper::one_of_sigs2(
vec![DataType::Float64],
vec![DataType::Utf8, DataType::Binary],
vec![DataType::Utf8, DataType::Binary, DataType::BinaryView],
)
}

View File

@@ -65,7 +65,12 @@ impl Function for ScalarMulFunction {
// Declares the accepted argument types: a `Float64` first argument and a string or binary
// second argument (plain or view encoded).
// Presumably `helper::one_of_sigs2` accepts any pairing of a type from the first list with a
// type from the second — TODO confirm against the helper, which is outside this view.
// NOTE(review): rendered diff without +/- markers; the one-line two-element list is
// presumably the removed line and the four-element list the added one — confirm.
fn signature(&self) -> Signature {
helper::one_of_sigs2(
vec![DataType::Float64],
vec![DataType::Utf8, DataType::Binary],
vec![
DataType::Utf8,
DataType::Utf8View,
DataType::Binary,
DataType::BinaryView,
],
)
}

View File

@@ -56,8 +56,8 @@ impl Function for VectorAddFunction {
// Declares the accepted types for the two vector arguments (string or binary encodings).
// Presumably `helper::one_of_sigs2` accepts any pairing of a type from the first list with a
// type from the second — TODO confirm against the helper, which is outside this view.
// NOTE(review): rendered diff without +/- markers; the two-element lists are presumably the
// removed lines and the three-element lists the added ones. `DataType::Utf8View` is not
// listed here although the distance-function signature in the same change set lists it —
// confirm whether that is intentional.
fn signature(&self) -> Signature {
helper::one_of_sigs2(
vec![DataType::Utf8, DataType::Binary],
vec![DataType::Utf8, DataType::Binary],
vec![DataType::Utf8, DataType::Binary, DataType::BinaryView],
vec![DataType::Utf8, DataType::Binary, DataType::BinaryView],
)
}

View File

@@ -57,8 +57,8 @@ impl Function for VectorDivFunction {
// Declares the accepted types for the two vector arguments (string or binary encodings).
// Presumably `helper::one_of_sigs2` accepts any pairing of a type from the first list with a
// type from the second — TODO confirm against the helper, which is outside this view.
// NOTE(review): rendered diff without +/- markers; the two-element lists are presumably the
// removed lines and the three-element lists the added ones. `DataType::Utf8View` is not
// listed here although the distance-function signature in the same change set lists it —
// confirm whether that is intentional.
fn signature(&self) -> Signature {
helper::one_of_sigs2(
vec![DataType::Utf8, DataType::Binary],
vec![DataType::Utf8, DataType::Binary],
vec![DataType::Utf8, DataType::Binary, DataType::BinaryView],
vec![DataType::Utf8, DataType::Binary, DataType::BinaryView],
)
}

View File

@@ -57,8 +57,8 @@ impl Function for VectorMulFunction {
// Declares the accepted types for the two vector arguments (string or binary encodings).
// Presumably `helper::one_of_sigs2` accepts any pairing of a type from the first list with a
// type from the second — TODO confirm against the helper, which is outside this view.
// NOTE(review): rendered diff without +/- markers; the two-element lists are presumably the
// removed lines and the three-element lists the added ones. `DataType::Utf8View` is not
// listed here although the distance-function signature in the same change set lists it —
// confirm whether that is intentional.
fn signature(&self) -> Signature {
helper::one_of_sigs2(
vec![DataType::Utf8, DataType::Binary],
vec![DataType::Utf8, DataType::Binary],
vec![DataType::Utf8, DataType::Binary, DataType::BinaryView],
vec![DataType::Utf8, DataType::Binary, DataType::BinaryView],
)
}

View File

@@ -60,6 +60,7 @@ impl Function for VectorNormFunction {
vec![
TypeSignature::Uniform(1, STRINGS.to_vec()),
TypeSignature::Uniform(1, BINARYS.to_vec()),
TypeSignature::Uniform(1, vec![DataType::BinaryView]),
],
Volatility::Immutable,
)

View File

@@ -56,8 +56,8 @@ impl Function for VectorSubFunction {
// Declares the accepted types for the two vector arguments (string or binary encodings).
// Presumably `helper::one_of_sigs2` accepts any pairing of a type from the first list with a
// type from the second — TODO confirm against the helper, which is outside this view.
// NOTE(review): rendered diff without +/- markers; the two-element lists are presumably the
// removed lines and the three-element lists the added ones. `DataType::Utf8View` is not
// listed here although the distance-function signature in the same change set lists it —
// confirm whether that is intentional.
fn signature(&self) -> Signature {
helper::one_of_sigs2(
vec![DataType::Utf8, DataType::Binary],
vec![DataType::Utf8, DataType::Binary],
vec![DataType::Utf8, DataType::Binary, DataType::BinaryView],
vec![DataType::Utf8, DataType::Binary, DataType::BinaryView],
)
}

View File

@@ -13,14 +13,13 @@
// limitations under the License.
use std::fmt;
use std::sync::Arc;
use common_query::error::Result;
use datafusion::arrow::datatypes::DataType;
use datafusion_expr::{Signature, Volatility};
use datatypes::vectors::{StringVector, VectorRef};
use datafusion_common::ScalarValue;
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility};
use crate::function::{Function, FunctionContext};
use crate::function::Function;
#[derive(Clone, Debug, Default)]
pub(crate) struct PGVersionFunction;
@@ -37,18 +36,17 @@ impl Function for PGVersionFunction {
}
// Reports the output type of the function; the argument types are ignored.
// NOTE(review): two `Ok(..)` lines appear because this span is a rendered diff without +/-
// markers; `Utf8` is presumably the removed line and `Utf8View` the added one — confirm.
fn return_type(&self, _: &[DataType]) -> Result<DataType> {
Ok(DataType::Utf8)
Ok(DataType::Utf8View)
}
// Declares a function that takes no arguments (empty exact type list), immutable volatility.
fn signature(&self) -> Signature {
Signature::exact(vec![], Volatility::Immutable)
}
// Returns the version banner "PostgreSQL 16.3 GreptimeDB <version>", where <version> comes
// from `common_version::version()`. The arguments are ignored.
// NOTE(review): rendered diff without +/- markers; the `eval` header with the
// `StringVector` lines presumably is the removed implementation (a one-element vector) and
// `invoke_with_args` with `ColumnarValue::Scalar` the added one (a single scalar) — confirm.
fn eval(&self, _func_ctx: &FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
let result = StringVector::from(vec![format!(
fn invoke_with_args(&self, _: ScalarFunctionArgs) -> datafusion_common::Result<ColumnarValue> {
Ok(ColumnarValue::Scalar(ScalarValue::Utf8View(Some(format!(
"PostgreSQL 16.3 GreptimeDB {}",
common_version::version()
)]);
Ok(Arc::new(result))
)))))
}
}

View File

@@ -44,13 +44,6 @@ pub enum Error {
source: DataTypeError,
},
#[snafu(display("Failed to cast arrow array into vector"))]
FromArrowArray {
#[snafu(implicit)]
location: Location,
source: DataTypeError,
},
#[snafu(display("Failed to cast arrow array into vector: {:?}", data_type))]
IntoVector {
#[snafu(implicit)]
@@ -225,7 +218,6 @@ impl ErrorExt for Error {
Error::InvalidInputType { source, .. }
| Error::IntoVector { source, .. }
| Error::FromScalarValue { source, .. }
| Error::FromArrowArray { source, .. }
| Error::InvalidVectorString { source, .. } => source.status_code(),
Error::MissingTableMutationHandler { .. }

View File

@@ -579,6 +579,12 @@ impl QueryEngine for DatafusionQueryEngine {
state: self.engine_state().function_state(),
});
let config_options = state.config_options().clone();
let _ = state
.execution_props_mut()
.config_options
.insert(config_options);
QueryEngineContext::new(state, query_ctx)
}

View File

@@ -119,11 +119,11 @@ Affected Rows: 25
INSERT INTO jsons VALUES(parse_json('{"a":1, "b":2, "c":3'), 4);
Error: 3001(EngineExecuteQuery), Invalid function args: Cannot convert the string to json, have: {"a":1, "b":2, "c":3
Error: 3001(EngineExecuteQuery), Execution error: cannot parse '{"a":1, "b":2, "c":3': EOF while parsing a value, pos 20
INSERT INTO jsons VALUES(parse_json('Morning my friends, have a nice day :)'), 5);
Error: 3001(EngineExecuteQuery), Invalid function args: Cannot convert the string to json, have: Morning my friends, have a nice day :)
Error: 3001(EngineExecuteQuery), Execution error: cannot parse 'Morning my friends, have a nice day :)': expected value, pos 1
SELECT json_to_string(j), t FROM jsons;