From 243dbde3d5186cd9280249491889fedca7c9fb9f Mon Sep 17 00:00:00 2001 From: LFC <990479+MichaelScofield@users.noreply.github.com> Date: Fri, 26 Sep 2025 17:24:29 +0800 Subject: [PATCH] refactor: rewrite some UDFs to DataFusion style (final part) (#7023) Signed-off-by: luofucong --- src/common/function/src/helper.rs | 35 +++ .../function/src/scalars/date/date_format.rs | 194 ++++++------ .../function/src/scalars/json/json_get.rs | 281 ++++++++---------- .../function/src/scalars/json/json_is.rs | 106 ++++--- .../src/scalars/json/json_path_exists.rs | 197 ++++++------ .../src/scalars/json/json_path_match.rs | 137 ++++----- .../src/scalars/json/json_to_string.rs | 118 ++++---- .../function/src/scalars/json/parse_json.rs | 110 +++---- .../function/src/scalars/matches_term.rs | 145 +++++---- src/common/function/src/scalars/math/rate.rs | 46 +-- .../src/scalars/timestamp/to_unixtime.rs | 235 ++++++--------- .../scalars/vector/convert/parse_vector.rs | 141 +++++---- .../vector/convert/vector_to_string.rs | 97 +++--- .../function/src/scalars/vector/distance.rs | 14 +- .../src/scalars/vector/elem_product.rs | 1 + .../function/src/scalars/vector/elem_sum.rs | 1 + .../function/src/scalars/vector/impl_conv.rs | 4 +- .../function/src/scalars/vector/scalar_add.rs | 2 +- .../function/src/scalars/vector/scalar_mul.rs | 7 +- .../function/src/scalars/vector/vector_add.rs | 4 +- .../function/src/scalars/vector/vector_div.rs | 4 +- .../function/src/scalars/vector/vector_mul.rs | 4 +- .../src/scalars/vector/vector_norm.rs | 1 + .../function/src/scalars/vector/vector_sub.rs | 4 +- .../function/src/system/pg_catalog/version.rs | 16 +- src/common/query/src/error.rs | 8 - src/query/src/datafusion.rs | 6 + .../standalone/common/types/json/json.result | 4 +- 28 files changed, 940 insertions(+), 982 deletions(-) diff --git a/src/common/function/src/helper.rs b/src/common/function/src/helper.rs index 7e2b3cfc98..086755e738 100644 --- a/src/common/function/src/helper.rs +++ b/src/common/function/src/helper.rs @@ -96,6 +96,41 @@ pub fn get_string_from_params<'a>( Ok(s) } +macro_rules! with_match_timestamp_types { + ($data_type:expr, | $_t:tt $T:ident | $body:tt) => {{ + macro_rules! __with_ty__ { + ( $_t $T:ident ) => { + $body + }; + } + + use datafusion_common::DataFusionError; + use datafusion_common::arrow::datatypes::{ + TimeUnit, TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, + TimestampSecondType, + }; + + match $data_type { + DataType::Timestamp(TimeUnit::Second, _) => Ok(__with_ty__! { TimestampSecondType }), + DataType::Timestamp(TimeUnit::Millisecond, _) => { + Ok(__with_ty__! { TimestampMillisecondType }) + } + DataType::Timestamp(TimeUnit::Microsecond, _) => { + Ok(__with_ty__! { TimestampMicrosecondType }) + } + DataType::Timestamp(TimeUnit::Nanosecond, _) => { + Ok(__with_ty__! { TimestampNanosecondType }) + } + _ => Err(DataFusionError::Execution(format!( + "not expected data type: '{}'", + $data_type + ))), + } + }}; +} + +pub(crate) use with_match_timestamp_types; + #[cfg(test)] mod tests { use super::*; diff --git a/src/common/function/src/scalars/date/date_format.rs b/src/common/function/src/scalars/date/date_format.rs index 278f8ccc0b..b4f267d32e 100644 --- a/src/common/function/src/scalars/date/date_format.rs +++ b/src/common/function/src/scalars/date/date_format.rs @@ -13,17 +13,20 @@ // limitations under the License. use std::fmt; +use std::sync::Arc; use common_error::ext::BoxedError; -use common_query::error::{self, InvalidFuncArgsSnafu, Result, UnsupportedInputDataTypeSnafu}; -use datafusion_expr::Signature; -use datatypes::arrow::datatypes::{DataType, TimeUnit}; -use datatypes::prelude::{ConcreteDataType, MutableVector, ScalarVectorBuilder}; -use datatypes::vectors::{StringVectorBuilder, VectorRef}; -use snafu::{ResultExt, ensure}; +use common_query::error::{self, Result}; +use common_time::{Date, Timestamp}; +use datafusion_common::DataFusionError; +use datafusion_common::arrow::array::{Array, AsArray, StringViewBuilder}; +use datafusion_common::arrow::datatypes::{ArrowTimestampType, DataType, Date32Type, TimeUnit}; +use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature}; +use snafu::ResultExt; -use crate::function::{Function, FunctionContext}; +use crate::function::{Function, extract_args, find_function_context}; use crate::helper; +use crate::helper::with_match_timestamp_types; /// A function that formats timestamp/date/datetime into string by the format #[derive(Clone, Debug, Default)] @@ -37,7 +40,7 @@ impl Function for DateFormatFunction { } fn return_type(&self, _: &[DataType]) -> Result { - Ok(DataType::Utf8) + Ok(DataType::Utf8View) } fn signature(&self) -> Signature { @@ -53,68 +56,65 @@ impl Function for DateFormatFunction { ) } - fn eval(&self, func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result { - ensure!( - columns.len() == 2, - InvalidFuncArgsSnafu { - err_msg: format!( - "The length of the args is not correct, expect 2, have: {}", - columns.len() - ), - } - ); + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> datafusion_common::Result { + let ctx = find_function_context(&args)?; + let timezone = &ctx.query_ctx.timezone(); - let left = &columns[0]; - let formats = &columns[1]; + let [left, arg1] = extract_args(self.name(), &args)?; + let formats = arg1.as_string::(); let size = left.len(); - let left_datatype = columns[0].data_type(); - let mut results = StringVectorBuilder::with_capacity(size); + let left_datatype = left.data_type(); + let mut builder = StringViewBuilder::with_capacity(size); match left_datatype { - ConcreteDataType::Timestamp(_) => { - for i in 0..size { - let ts = left.get(i).as_timestamp(); - let format = formats.get(i).as_string(); - - let result = match (ts, format) { - (Some(ts), Some(fmt)) => Some( - ts.as_formatted_string(&fmt, Some(&func_ctx.query_ctx.timezone())) - .map_err(BoxedError::new) - .context(error::ExecuteSnafu)?, - ), - _ => None, - }; - - results.push(result.as_deref()); - } + DataType::Timestamp(_, _) => { + with_match_timestamp_types!(left_datatype, |$S| { + let array = left.as_primitive::<$S>(); + for (date, format) in array.iter().zip(formats.iter()) { + let result = match (date, format) { + (Some(date), Some(format)) => { + let ts = Timestamp::new(date, $S::UNIT.into()); + let x = ts.as_formatted_string(&format, Some(timezone)) + .map_err(|e| DataFusionError::Execution(format!( + "cannot format {ts:?} as '{format}': {e}" + )))?; + Some(x) + } + _ => None + }; + builder.append_option(result.as_deref()); + } + })?; } - ConcreteDataType::Date(_) => { + DataType::Date32 => { + let left = left.as_primitive::(); for i in 0..size { - let date = left.get(i).as_date(); - let format = formats.get(i).as_string(); + let date = left.is_valid(i).then(|| Date::from(left.value(i))); + let format = formats.is_valid(i).then(|| formats.value(i)); let result = match (date, format) { (Some(date), Some(fmt)) => date - .as_formatted_string(&fmt, Some(&func_ctx.query_ctx.timezone())) + .as_formatted_string(fmt, Some(timezone)) .map_err(BoxedError::new) .context(error::ExecuteSnafu)?, _ => None, }; - results.push(result.as_deref()); + builder.append_option(result.as_deref()); } } - _ => { - return UnsupportedInputDataTypeSnafu { - function: NAME, - datatypes: columns.iter().map(|c| c.data_type()).collect::>(), - } - .fail(); + x => { + return Err(DataFusionError::Execution(format!( + "unsupported input data type {x}" + ))); } } - Ok(results.to_vector()) + Ok(ColumnarValue::Array(Arc::new(builder.finish()))) } } @@ -128,28 +128,32 @@ impl fmt::Display for DateFormatFunction { mod tests { use std::sync::Arc; + use arrow_schema::Field; + use datafusion_common::arrow::array::{Date32Array, StringArray, TimestampSecondArray}; + use datafusion_common::config::ConfigOptions; use datafusion_expr::{TypeSignature, Volatility}; - use datatypes::prelude::ScalarVector; - use datatypes::value::Value; - use datatypes::vectors::{DateVector, StringVector, TimestampSecondVector}; use super::{DateFormatFunction, *}; + use crate::function::FunctionContext; #[test] fn test_date_format_misc() { let f = DateFormatFunction; assert_eq!("date_format", f.name()); assert_eq!( - DataType::Utf8, + DataType::Utf8View, f.return_type(&[DataType::Timestamp(TimeUnit::Microsecond, None)]) .unwrap() ); assert_eq!( - DataType::Utf8, + DataType::Utf8View, f.return_type(&[DataType::Timestamp(TimeUnit::Second, None)]) .unwrap() ); - assert_eq!(DataType::Utf8, f.return_type(&[DataType::Date32]).unwrap()); + assert_eq!( + DataType::Utf8View, + f.return_type(&[DataType::Date32]).unwrap() + ); assert!(matches!(f.signature(), Signature { type_signature: TypeSignature::OneOf(sigs), @@ -175,26 +179,29 @@ mod tests { None, ]; - let time_vector = TimestampSecondVector::from(times.clone()); - let interval_vector = StringVector::from_vec(formats); - let args: Vec = vec![Arc::new(time_vector), Arc::new(interval_vector)]; - let vector = f.eval(&FunctionContext::default(), &args).unwrap(); + let mut config_options = ConfigOptions::default(); + config_options.extensions.insert(FunctionContext::default()); + let config_options = Arc::new(config_options); + + let args = ScalarFunctionArgs { + args: vec![ + ColumnarValue::Array(Arc::new(TimestampSecondArray::from(times))), + ColumnarValue::Array(Arc::new(StringArray::from_iter_values(formats))), + ], + arg_fields: vec![], + number_rows: 4, + return_field: Arc::new(Field::new("x", DataType::Utf8View, false)), + config_options, + }; + let result = f + .invoke_with_args(args) + .and_then(|x| x.to_array(4)) + .unwrap(); + let vector = result.as_string_view(); assert_eq!(4, vector.len()); - for (i, _t) in times.iter().enumerate() { - let v = vector.get(i); - let result = results.get(i).unwrap(); - - if result.is_none() { - assert_eq!(Value::Null, v); - continue; - } - match v { - Value::String(s) => { - assert_eq!(s.as_utf8(), result.unwrap()); - } - _ => unreachable!(), - } + for (actual, expect) in vector.iter().zip(results) { + assert_eq!(actual, expect); } } @@ -216,26 +223,29 @@ mod tests { None, ]; - let date_vector = DateVector::from(dates.clone()); - let interval_vector = StringVector::from_vec(formats); - let args: Vec = vec![Arc::new(date_vector), Arc::new(interval_vector)]; - let vector = f.eval(&FunctionContext::default(), &args).unwrap(); + let mut config_options = ConfigOptions::default(); + config_options.extensions.insert(FunctionContext::default()); + let config_options = Arc::new(config_options); + + let args = ScalarFunctionArgs { + args: vec![ + ColumnarValue::Array(Arc::new(Date32Array::from(dates))), + ColumnarValue::Array(Arc::new(StringArray::from_iter_values(formats))), + ], + arg_fields: vec![], + number_rows: 4, + return_field: Arc::new(Field::new("x", DataType::Utf8View, false)), + config_options, + }; + let result = f + .invoke_with_args(args) + .and_then(|x| x.to_array(4)) + .unwrap(); + let vector = result.as_string_view(); assert_eq!(4, vector.len()); - for (i, _t) in dates.iter().enumerate() { - let v = vector.get(i); - let result = results.get(i).unwrap(); - - if result.is_none() { - assert_eq!(Value::Null, v); - continue; - } - match v { - Value::String(s) => { - assert_eq!(s.as_utf8(), result.unwrap()); - } - _ => unreachable!(), - } + for (actual, expect) in vector.iter().zip(results) { + assert_eq!(actual, expect); } } } diff --git a/src/common/function/src/scalars/json/json_get.rs b/src/common/function/src/scalars/json/json_get.rs index 66858d5470..da340829a0 100644 --- a/src/common/function/src/scalars/json/json_get.rs +++ b/src/common/function/src/scalars/json/json_get.rs @@ -13,20 +13,18 @@ // limitations under the License. use std::fmt::{self, Display}; +use std::sync::Arc; -use common_query::error::{InvalidFuncArgsSnafu, Result, UnsupportedInputDataTypeSnafu}; -use datafusion_expr::{Signature, Volatility}; -use datatypes::arrow::datatypes::DataType; -use datatypes::data_type::ConcreteDataType; -use datatypes::prelude::VectorRef; -use datatypes::scalars::ScalarVectorBuilder; -use datatypes::vectors::{ - BooleanVectorBuilder, Float64VectorBuilder, Int64VectorBuilder, MutableVector, - StringVectorBuilder, +use arrow::compute; +use common_query::error::Result; +use datafusion_common::arrow::array::{ + Array, AsArray, BooleanBuilder, Float64Builder, Int64Builder, StringViewBuilder, }; -use snafu::ensure; +use datafusion_common::arrow::datatypes::DataType; +use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature}; -use crate::function::{Function, FunctionContext}; +use crate::function::{Function, extract_args}; +use crate::helper; fn get_json_by_path(json: &[u8], path: &str) -> Option> { let json_path = jsonb::jsonpath::parse_json_path(path.as_bytes()); @@ -64,59 +62,40 @@ macro_rules! json_get { fn signature(&self) -> Signature { // TODO(LFC): Use a more clear type here instead of "Binary" for Json input, once we have a "Json" type. - Signature::exact( - vec![DataType::Binary, DataType::Utf8], - Volatility::Immutable, + helper::one_of_sigs2( + vec![DataType::Binary, DataType::BinaryView], + vec![DataType::Utf8, DataType::Utf8View], ) } - fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result { - ensure!( - columns.len() == 2, - InvalidFuncArgsSnafu { - err_msg: format!( - "The length of the args is not correct, expect exactly two, have: {}", - columns.len() - ), - } - ); - let jsons = &columns[0]; - let paths = &columns[1]; + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> datafusion_common::Result { + let [arg0, arg1] = extract_args(self.name(), &args)?; + let arg0 = compute::cast(&arg0, &DataType::BinaryView)?; + let jsons = arg0.as_binary_view(); + let arg1 = compute::cast(&arg1, &DataType::Utf8View)?; + let paths = arg1.as_string_view(); let size = jsons.len(); - let datatype = jsons.data_type(); - let mut results = [<$type VectorBuilder>]::with_capacity(size); + let mut builder = [<$type Builder>]::with_capacity(size); - match datatype { - // JSON data type uses binary vector - ConcreteDataType::Binary(_) => { - for i in 0..size { - let json = jsons.get_ref(i); - let path = paths.get_ref(i); - - let json = json.as_binary(); - let path = path.as_string(); - let result = match (json, path) { - (Ok(Some(json)), Ok(Some(path))) => { - get_json_by_path(json, path) - .and_then(|json| { jsonb::[](&json).ok() }) - } - _ => None, - }; - - results.push(result); + for i in 0..size { + let json = jsons.is_valid(i).then(|| jsons.value(i)); + let path = paths.is_valid(i).then(|| paths.value(i)); + let result = match (json, path) { + (Some(json), Some(path)) => { + get_json_by_path(json, path) + .and_then(|json| { jsonb::[](&json).ok() }) } - } - _ => { - return UnsupportedInputDataTypeSnafu { - function: stringify!([<$name:snake>]), - datatypes: columns.iter().map(|c| c.data_type()).collect::>(), - } - .fail(); - } + _ => None, + }; + + builder.append_option(result); } - Ok(results.to_vector()) + Ok(ColumnarValue::Array(Arc::new(builder.finish()))) } } @@ -160,63 +139,43 @@ impl Function for JsonGetString { } fn return_type(&self, _: &[DataType]) -> Result { - Ok(DataType::Utf8) + Ok(DataType::Utf8View) } fn signature(&self) -> Signature { // TODO(LFC): Use a more clear type here instead of "Binary" for Json input, once we have a "Json" type. - Signature::exact( - vec![DataType::Binary, DataType::Utf8], - Volatility::Immutable, + helper::one_of_sigs2( + vec![DataType::Binary, DataType::BinaryView], + vec![DataType::Utf8, DataType::Utf8View], ) } - fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result { - ensure!( - columns.len() == 2, - InvalidFuncArgsSnafu { - err_msg: format!( - "The length of the args is not correct, expect exactly two, have: {}", - columns.len() - ), - } - ); - let jsons = &columns[0]; - let paths = &columns[1]; + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> datafusion_common::Result { + let [arg0, arg1] = extract_args(self.name(), &args)?; + let arg0 = compute::cast(&arg0, &DataType::BinaryView)?; + let jsons = arg0.as_binary_view(); + let arg1 = compute::cast(&arg1, &DataType::Utf8View)?; + let paths = arg1.as_string_view(); let size = jsons.len(); - let datatype = jsons.data_type(); - let mut results = StringVectorBuilder::with_capacity(size); + let mut builder = StringViewBuilder::with_capacity(size); - match datatype { - // JSON data type uses binary vector - ConcreteDataType::Binary(_) => { - for i in 0..size { - let json = jsons.get_ref(i); - let path = paths.get_ref(i); - - let json = json.as_binary(); - let path = path.as_string(); - let result = match (json, path) { - (Ok(Some(json)), Ok(Some(path))) => { - get_json_by_path(json, path).and_then(|json| jsonb::to_str(&json).ok()) - } - _ => None, - }; - - results.push(result.as_deref()); + for i in 0..size { + let json = jsons.is_valid(i).then(|| jsons.value(i)); + let path = paths.is_valid(i).then(|| paths.value(i)); + let result = match (json, path) { + (Some(json), Some(path)) => { + get_json_by_path(json, path).and_then(|json| jsonb::to_str(&json).ok()) } - } - _ => { - return UnsupportedInputDataTypeSnafu { - function: "json_get_string", - datatypes: columns.iter().map(|c| c.data_type()).collect::>(), - } - .fail(); - } + _ => None, + }; + builder.append_option(result); } - Ok(results.to_vector()) + Ok(ColumnarValue::Array(Arc::new(builder.finish()))) } } @@ -230,9 +189,9 @@ impl Display for JsonGetString { mod tests { use std::sync::Arc; - use datafusion_expr::TypeSignature; - use datatypes::scalars::ScalarVector; - use datatypes::vectors::{BinaryVector, StringVector}; + use arrow_schema::Field; + use datafusion_common::arrow::array::{BinaryArray, StringArray}; + use datafusion_common::arrow::datatypes::{Float64Type, Int64Type}; use super::*; @@ -248,13 +207,6 @@ mod tests { .unwrap() ); - assert!(matches!(json_get_int.signature(), - Signature { - type_signature: TypeSignature::Exact(valid_types), - volatility: Volatility::Immutable - } if valid_types == vec![DataType::Binary, DataType::Utf8] - )); - let json_strings = [ r#"{"a": {"b": 2}, "b": 2, "c": 3}"#, r#"{"a": 4, "b": {"c": 6}, "c": 6}"#, @@ -271,17 +223,25 @@ mod tests { }) .collect::>(); - let json_vector = BinaryVector::from_vec(jsonbs); - let path_vector = StringVector::from_vec(paths); - let args: Vec = vec![Arc::new(json_vector), Arc::new(path_vector)]; - let vector = json_get_int - .eval(&FunctionContext::default(), &args) + let args = ScalarFunctionArgs { + args: vec![ + ColumnarValue::Array(Arc::new(BinaryArray::from_iter_values(jsonbs))), + ColumnarValue::Array(Arc::new(StringArray::from_iter_values(paths))), + ], + arg_fields: vec![], + number_rows: 3, + return_field: Arc::new(Field::new("x", DataType::Int64, false)), + config_options: Arc::new(Default::default()), + }; + let result = json_get_int + .invoke_with_args(args) + .and_then(|x| x.to_array(3)) .unwrap(); + let vector = result.as_primitive::(); assert_eq!(3, vector.len()); for (i, gt) in results.iter().enumerate() { - let result = vector.get_ref(i); - let result = result.as_i64().unwrap(); + let result = vector.is_valid(i).then(|| vector.value(i)); assert_eq!(*gt, result); } } @@ -298,13 +258,6 @@ mod tests { .unwrap() ); - assert!(matches!(json_get_float.signature(), - Signature { - type_signature: TypeSignature::Exact(valid_types), - volatility: Volatility::Immutable - } if valid_types == vec![DataType::Binary, DataType::Utf8] - )); - let json_strings = [ r#"{"a": {"b": 2.1}, "b": 2.2, "c": 3.3}"#, r#"{"a": 4.4, "b": {"c": 6.6}, "c": 6.6}"#, @@ -321,17 +274,25 @@ mod tests { }) .collect::>(); - let json_vector = BinaryVector::from_vec(jsonbs); - let path_vector = StringVector::from_vec(paths); - let args: Vec = vec![Arc::new(json_vector), Arc::new(path_vector)]; - let vector = json_get_float - .eval(&FunctionContext::default(), &args) + let args = ScalarFunctionArgs { + args: vec![ + ColumnarValue::Array(Arc::new(BinaryArray::from_iter_values(jsonbs))), + ColumnarValue::Array(Arc::new(StringArray::from_iter_values(paths))), + ], + arg_fields: vec![], + number_rows: 3, + return_field: Arc::new(Field::new("x", DataType::Float64, false)), + config_options: Arc::new(Default::default()), + }; + let result = json_get_float + .invoke_with_args(args) + .and_then(|x| x.to_array(3)) .unwrap(); + let vector = result.as_primitive::(); assert_eq!(3, vector.len()); for (i, gt) in results.iter().enumerate() { - let result = vector.get_ref(i); - let result = result.as_f64().unwrap(); + let result = vector.is_valid(i).then(|| vector.value(i)); assert_eq!(*gt, result); } } @@ -348,13 +309,6 @@ mod tests { .unwrap() ); - assert!(matches!(json_get_bool.signature(), - Signature { - type_signature: TypeSignature::Exact(valid_types), - volatility: Volatility::Immutable - } if valid_types == vec![DataType::Binary, DataType::Utf8] - )); - let json_strings = [ r#"{"a": {"b": true}, "b": false, "c": true}"#, r#"{"a": false, "b": {"c": true}, "c": false}"#, @@ -371,17 +325,25 @@ mod tests { }) .collect::>(); - let json_vector = BinaryVector::from_vec(jsonbs); - let path_vector = StringVector::from_vec(paths); - let args: Vec = vec![Arc::new(json_vector), Arc::new(path_vector)]; - let vector = json_get_bool - .eval(&FunctionContext::default(), &args) + let args = ScalarFunctionArgs { + args: vec![ + ColumnarValue::Array(Arc::new(BinaryArray::from_iter_values(jsonbs))), + ColumnarValue::Array(Arc::new(StringArray::from_iter_values(paths))), + ], + arg_fields: vec![], + number_rows: 3, + return_field: Arc::new(Field::new("x", DataType::Boolean, false)), + config_options: Arc::new(Default::default()), + }; + let result = json_get_bool + .invoke_with_args(args) + .and_then(|x| x.to_array(3)) .unwrap(); + let vector = result.as_boolean(); assert_eq!(3, vector.len()); for (i, gt) in results.iter().enumerate() { - let result = vector.get_ref(i); - let result = result.as_boolean().unwrap(); + let result = vector.is_valid(i).then(|| vector.value(i)); assert_eq!(*gt, result); } } @@ -392,19 +354,12 @@ mod tests { assert_eq!("json_get_string", json_get_string.name()); assert_eq!( - DataType::Utf8, + DataType::Utf8View, json_get_string .return_type(&[DataType::Binary, DataType::Utf8]) .unwrap() ); - assert!(matches!(json_get_string.signature(), - Signature { - type_signature: TypeSignature::Exact(valid_types), - volatility: Volatility::Immutable - } if valid_types == vec![DataType::Binary, DataType::Utf8] - )); - let json_strings = [ r#"{"a": {"b": "a"}, "b": "b", "c": "c"}"#, r#"{"a": "d", "b": {"c": "e"}, "c": "f"}"#, @@ -421,17 +376,25 @@ mod tests { }) .collect::>(); - let json_vector = BinaryVector::from_vec(jsonbs); - let path_vector = StringVector::from_vec(paths); - let args: Vec = vec![Arc::new(json_vector), Arc::new(path_vector)]; - let vector = json_get_string - .eval(&FunctionContext::default(), &args) + let args = ScalarFunctionArgs { + args: vec![ + ColumnarValue::Array(Arc::new(BinaryArray::from_iter_values(jsonbs))), + ColumnarValue::Array(Arc::new(StringArray::from_iter_values(paths))), + ], + arg_fields: vec![], + number_rows: 3, + return_field: Arc::new(Field::new("x", DataType::Utf8View, false)), + config_options: Arc::new(Default::default()), + }; + let result = json_get_string + .invoke_with_args(args) + .and_then(|x| x.to_array(3)) .unwrap(); + let vector = result.as_string_view(); assert_eq!(3, vector.len()); for (i, gt) in results.iter().enumerate() { - let result = vector.get_ref(i); - let result = result.as_string().unwrap(); + let result = vector.is_valid(i).then(|| vector.value(i)); assert_eq!(*gt, result); } } diff --git a/src/common/function/src/scalars/json/json_is.rs b/src/common/function/src/scalars/json/json_is.rs index de2f55b53b..29d9937bd1 100644 --- a/src/common/function/src/scalars/json/json_is.rs +++ b/src/common/function/src/scalars/json/json_is.rs @@ -13,17 +13,15 @@ // limitations under the License. use std::fmt::{self, Display}; +use std::sync::Arc; -use common_query::error::{InvalidFuncArgsSnafu, Result, UnsupportedInputDataTypeSnafu}; -use datafusion_expr::{Signature, Volatility}; -use datatypes::arrow::datatypes::DataType; -use datatypes::data_type::ConcreteDataType; -use datatypes::prelude::VectorRef; -use datatypes::scalars::ScalarVectorBuilder; -use datatypes::vectors::{BooleanVectorBuilder, MutableVector}; -use snafu::ensure; +use common_query::error::Result; +use datafusion_common::arrow::array::{Array, AsArray, BooleanBuilder}; +use datafusion_common::arrow::compute; +use datafusion_common::arrow::datatypes::DataType; +use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility}; -use crate::function::{Function, FunctionContext}; +use crate::function::{Function, extract_args}; /// Checks if the input is a JSON object of the given type. macro_rules! json_is { @@ -43,50 +41,36 @@ macro_rules! json_is { fn signature(&self) -> Signature { // TODO(LFC): Use a more clear type here instead of "Binary" for Json input, once we have a "Json" type. - Signature::exact(vec![DataType::Binary], Volatility::Immutable) + Signature::uniform( + 1, + vec![DataType::Binary, DataType::BinaryView], + Volatility::Immutable, + ) } - fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result { - ensure!( - columns.len() == 1, - InvalidFuncArgsSnafu { - err_msg: format!( - "The length of the args is not correct, expect exactly one, have: {}", - columns.len() - ), - } - ); + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> datafusion_common::Result { + let [arg0] = extract_args(self.name(), &args)?; - let jsons = &columns[0]; + let arg0 = compute::cast(&arg0, &DataType::BinaryView)?; + let jsons = arg0.as_binary_view(); let size = jsons.len(); - let datatype = jsons.data_type(); - let mut results = BooleanVectorBuilder::with_capacity(size); + let mut builder = BooleanBuilder::with_capacity(size); - match datatype { - // JSON data type uses binary vector - ConcreteDataType::Binary(_) => { - for i in 0..size { - let json = jsons.get_ref(i); - let json = json.as_binary(); - let result = match json { - Ok(Some(json)) => { - Some(jsonb::[](json)) - } - _ => None, - }; - results.push(result); + for i in 0..size { + let json = jsons.is_valid(i).then(|| jsons.value(i)); + let result = match json { + Some(json) => { + Some(jsonb::[](json)) } - } - _ => { - return UnsupportedInputDataTypeSnafu { - function: stringify!([<$name:snake>]), - datatypes: columns.iter().map(|c| c.data_type()).collect::>(), - } - .fail(); - } + _ => None, + }; + builder.append_option(result); } - Ok(results.to_vector()) + Ok(ColumnarValue::Array(Arc::new(builder.finish()))) } } @@ -96,7 +80,7 @@ macro_rules! json_is { } } } - } + }; } json_is!(JsonIsNull, null, "Checks if the input JSONB is null"); @@ -135,8 +119,8 @@ json_is!( mod tests { use std::sync::Arc; - use datatypes::scalars::ScalarVector; - use datatypes::vectors::BinaryVector; + use arrow_schema::Field; + use datafusion_common::arrow::array::{AsArray, BinaryArray}; use super::*; @@ -166,7 +150,11 @@ mod tests { ); assert_eq!( func.signature(), - Signature::exact(vec![DataType::Binary], Volatility::Immutable) + Signature::uniform( + 1, + vec![DataType::Binary, DataType::BinaryView], + Volatility::Immutable + ) ); } @@ -195,16 +183,26 @@ mod tests { value.to_vec() }) .collect::>(); - let json_vector = BinaryVector::from_vec(jsonbs); - let args: Vec = vec![Arc::new(json_vector)]; + let args = ScalarFunctionArgs { + args: vec![ColumnarValue::Array(Arc::new( + BinaryArray::from_iter_values(jsonbs), + ))], + arg_fields: vec![], + number_rows: 6, + return_field: Arc::new(Field::new("", DataType::Boolean, false)), + config_options: Arc::new(Default::default()), + }; for (func, expected_result) in json_is_functions.iter().zip(expected_results.iter()) { - let vector = func.eval(&FunctionContext::default(), &args).unwrap(); + let result = func + .invoke_with_args(args.clone()) + .and_then(|x| x.to_array(6)) + .unwrap(); + let vector = result.as_boolean(); assert_eq!(vector.len(), json_strings.len()); for (i, expected) in expected_result.iter().enumerate() { - let result = vector.get_ref(i); - let result = result.as_boolean().unwrap().unwrap(); + let result = vector.value(i); assert_eq!(result, *expected); } } diff --git a/src/common/function/src/scalars/json/json_path_exists.rs b/src/common/function/src/scalars/json/json_path_exists.rs index a5446ec614..0f0373950d 100644 --- a/src/common/function/src/scalars/json/json_path_exists.rs +++ b/src/common/function/src/scalars/json/json_path_exists.rs @@ -13,17 +13,17 @@ // limitations under the License. use std::fmt::{self, Display}; +use std::sync::Arc; -use common_query::error::{InvalidFuncArgsSnafu, Result, UnsupportedInputDataTypeSnafu}; -use datafusion_expr::{Signature, TypeSignature, Volatility}; -use datatypes::arrow::datatypes::DataType; -use datatypes::data_type::ConcreteDataType; -use datatypes::prelude::VectorRef; -use datatypes::scalars::ScalarVectorBuilder; -use datatypes::vectors::{BooleanVectorBuilder, MutableVector}; -use snafu::ensure; +use arrow::compute; +use common_query::error::Result; +use datafusion_common::DataFusionError; +use datafusion_common::arrow::array::{Array, AsArray, BooleanBuilder}; +use datafusion_common::arrow::datatypes::DataType; +use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature}; -use crate::function::{Function, FunctionContext}; +use crate::function::{Function, extract_args}; +use crate::helper; /// Check if the given JSON data contains the given JSON path. #[derive(Clone, Debug, Default)] @@ -42,48 +42,41 @@ impl Function for JsonPathExistsFunction { fn signature(&self) -> Signature { // TODO(LFC): Use a more clear type here instead of "Binary" for Json input, once we have a "Json" type. - Signature::one_of( - vec![ - TypeSignature::Exact(vec![DataType::Binary, DataType::Utf8]), - TypeSignature::Exact(vec![DataType::Null, DataType::Utf8]), - TypeSignature::Exact(vec![DataType::Binary, DataType::Null]), - TypeSignature::Exact(vec![DataType::Null, DataType::Null]), - ], - Volatility::Immutable, + helper::one_of_sigs2( + vec![DataType::Binary, DataType::BinaryView, DataType::Null], + vec![DataType::Utf8, DataType::Utf8View, DataType::Null], ) } - fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result { - ensure!( - columns.len() == 2, - InvalidFuncArgsSnafu { - err_msg: format!( - "The length of the args is not correct, expect exactly two, have: {}", - columns.len() - ), - } - ); - let jsons = &columns[0]; - let paths = &columns[1]; + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> datafusion_common::Result { + let [jsons, paths] = extract_args(self.name(), &args)?; let size = jsons.len(); - let mut results = BooleanVectorBuilder::with_capacity(size); + let mut builder = BooleanBuilder::with_capacity(size); match (jsons.data_type(), paths.data_type()) { - (ConcreteDataType::Binary(_), ConcreteDataType::String(_)) => { + (DataType::Null, _) | (_, DataType::Null) => builder.append_nulls(size), + _ => { + let jsons = compute::cast(&jsons, &DataType::BinaryView)?; + let jsons = jsons.as_binary_view(); + let paths = compute::cast(&paths, &DataType::Utf8View)?; + let paths = paths.as_string_view(); for i in 0..size { - let result = match (jsons.get_ref(i).as_binary(), paths.get_ref(i).as_string()) - { - (Ok(Some(json)), Ok(Some(path))) => { + let json = jsons.is_valid(i).then(|| jsons.value(i)); + let path = paths.is_valid(i).then(|| paths.value(i)); + let result = match (json, path) { + (Some(json), Some(path)) => { // Get `JsonPath`. let json_path = match jsonb::jsonpath::parse_json_path(path.as_bytes()) { Ok(json_path) => json_path, - Err(_) => { - return InvalidFuncArgsSnafu { - err_msg: format!("Illegal json path: {:?}", path), - } - .fail(); + Err(e) => { + return Err(DataFusionError::Execution(format!( + "invalid json path '{path}': {e}" + ))); } }; jsonb::path_exists(json, json_path).ok() @@ -91,25 +84,12 @@ impl Function for JsonPathExistsFunction { _ => None, }; - results.push(result); + builder.append_option(result); } } - - // Any null args existence causes the result to be NULL. - (ConcreteDataType::Null(_), ConcreteDataType::String(_)) => results.push_nulls(size), - (ConcreteDataType::Binary(_), ConcreteDataType::Null(_)) => results.push_nulls(size), - (ConcreteDataType::Null(_), ConcreteDataType::Null(_)) => results.push_nulls(size), - - _ => { - return UnsupportedInputDataTypeSnafu { - function: NAME, - datatypes: columns.iter().map(|c| c.data_type()).collect::>(), - } - .fail(); - } } - Ok(results.to_vector()) + Ok(ColumnarValue::Array(Arc::new(builder.finish()))) } } @@ -123,8 +103,8 @@ impl Display for JsonPathExistsFunction { mod tests { use std::sync::Arc; - use datatypes::prelude::ScalarVector; - use datatypes::vectors::{BinaryVector, NullVector, StringVector}; + use arrow_schema::Field; + use datafusion_common::arrow::array::{BinaryArray, NullArray, StringArray}; use super::*; @@ -138,31 +118,6 @@ mod tests { json_path_exists.return_type(&[DataType::Binary]).unwrap() ); - assert!(matches!(json_path_exists.signature(), - Signature { - type_signature: TypeSignature::OneOf(valid_types), - volatility: Volatility::Immutable - } if valid_types == - vec![ - TypeSignature::Exact(vec![ - DataType::Binary, - DataType::Utf8, - ]), - TypeSignature::Exact(vec![ - DataType::Null, - DataType::Utf8, - ]), - TypeSignature::Exact(vec![ - DataType::Binary, - DataType::Null, - ]), - TypeSignature::Exact(vec![ - DataType::Null, - DataType::Null, - ]), - ], - )); - let json_strings = [ r#"{"a": {"b": 2}, "b": 2, "c": 3}"#, r#"{"a": 4, "b": {"c": 6}, "c": 6}"#, @@ -186,51 +141,83 @@ mod tests { }) .collect::>(); - let json_vector = BinaryVector::from_vec(jsonbs); - let path_vector = StringVector::from_vec(paths); - let args: Vec = vec![Arc::new(json_vector), Arc::new(path_vector)]; - let vector = json_path_exists - .eval(&FunctionContext::default(), &args) + let args = ScalarFunctionArgs { + args: vec![ + ColumnarValue::Array(Arc::new(BinaryArray::from_iter_values(jsonbs))), + ColumnarValue::Array(Arc::new(StringArray::from_iter_values(paths))), + ], + arg_fields: vec![], + number_rows: 8, + return_field: Arc::new(Field::new("x", DataType::Boolean, false)), + config_options: Arc::new(Default::default()), + }; + let result = json_path_exists + .invoke_with_args(args) + .and_then(|x| x.to_array(8)) .unwrap(); + let vector = result.as_boolean(); // Test for non-nulls. assert_eq!(8, vector.len()); for (i, real) in expected.iter().enumerate() { - let result = vector.get_ref(i); - assert!(!result.is_null()); - let val = result.as_boolean().unwrap().unwrap(); + let val = vector.value(i); assert_eq!(val, *real); } // Test for path error. let json_bytes = jsonb::parse_value("{}".as_bytes()).unwrap().to_vec(); - let json = BinaryVector::from_vec(vec![json_bytes]); - let illegal_path = StringVector::from_vec(vec!["$..a"]); + let illegal_path = "$..a"; - let args: Vec = vec![Arc::new(json), Arc::new(illegal_path)]; - let err = json_path_exists.eval(&FunctionContext::default(), &args); + let args = ScalarFunctionArgs { + args: vec![ + ColumnarValue::Array(Arc::new(BinaryArray::from_iter_values(vec![json_bytes]))), + ColumnarValue::Array(Arc::new(StringArray::from_iter_values(vec![illegal_path]))), + ], + arg_fields: vec![], + number_rows: 1, + return_field: Arc::new(Field::new("x", DataType::Boolean, false)), + config_options: Arc::new(Default::default()), + }; + let err = json_path_exists.invoke_with_args(args); assert!(err.is_err()); // Test for nulls. let json_bytes = jsonb::parse_value("{}".as_bytes()).unwrap().to_vec(); - let json = BinaryVector::from_vec(vec![json_bytes]); - let null_json = NullVector::new(1); + let json = Arc::new(BinaryArray::from_iter_values(vec![json_bytes])); + let null_json = Arc::new(NullArray::new(1)); - let path = StringVector::from_vec(vec!["$.a"]); - let null_path = NullVector::new(1); + let path = Arc::new(StringArray::from_iter_values(vec!["$.a"])); + let null_path = Arc::new(NullArray::new(1)); - let args: Vec = vec![Arc::new(null_json), Arc::new(path)]; - let result1 = json_path_exists - .eval(&FunctionContext::default(), &args) + let args = ScalarFunctionArgs { + args: vec![ColumnarValue::Array(null_json), ColumnarValue::Array(path)], + arg_fields: vec![], + number_rows: 1, + return_field: Arc::new(Field::new("x", DataType::Boolean, false)), + config_options: Arc::new(Default::default()), + }; + let result = json_path_exists + .invoke_with_args(args) + .and_then(|x| x.to_array(1)) .unwrap(); - let args: Vec = vec![Arc::new(json), Arc::new(null_path)]; - let result2 = json_path_exists - .eval(&FunctionContext::default(), &args) + let result1 = result.as_boolean(); + + let args = ScalarFunctionArgs { + args: vec![ColumnarValue::Array(json), ColumnarValue::Array(null_path)], + arg_fields: vec![], + number_rows: 1, + return_field: Arc::new(Field::new("x", DataType::Boolean, false)), + config_options: Arc::new(Default::default()), + }; + let result = json_path_exists + .invoke_with_args(args) + .and_then(|x| x.to_array(1)) .unwrap(); + let result2 = result.as_boolean(); assert_eq!(result1.len(), 1); - assert!(result1.get_ref(0).is_null()); + assert!(result1.is_null(0)); assert_eq!(result2.len(), 1); - assert!(result2.get_ref(0).is_null()); + assert!(result2.is_null(0)); } } diff --git a/src/common/function/src/scalars/json/json_path_match.rs b/src/common/function/src/scalars/json/json_path_match.rs index 3adbe7eab9..26370100e1 100644 --- a/src/common/function/src/scalars/json/json_path_match.rs +++ b/src/common/function/src/scalars/json/json_path_match.rs @@ -13,17 +13,16 @@ // limitations under the License. use std::fmt::{self, Display}; +use std::sync::Arc; -use common_query::error::{InvalidFuncArgsSnafu, Result, UnsupportedInputDataTypeSnafu}; -use datafusion_expr::{Signature, Volatility}; -use datatypes::arrow::datatypes::DataType; -use datatypes::data_type::ConcreteDataType; -use datatypes::prelude::VectorRef; -use datatypes::scalars::ScalarVectorBuilder; -use datatypes::vectors::{BooleanVectorBuilder, MutableVector}; -use snafu::ensure; +use arrow::compute; +use common_query::error::Result; +use datafusion_common::arrow::array::{Array, AsArray, BooleanBuilder}; +use datafusion_common::arrow::datatypes::DataType; +use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature}; -use crate::function::{Function, FunctionContext}; +use crate::function::{Function, extract_args}; +use crate::helper; /// Check if the given JSON data match the given JSON path's predicate. #[derive(Clone, Debug, Default)] @@ -42,66 +41,47 @@ impl Function for JsonPathMatchFunction { fn signature(&self) -> Signature { // TODO(LFC): Use a more clear type here instead of "Binary" for Json input, once we have a "Json" type. - Signature::exact( - vec![DataType::Binary, DataType::Utf8], - Volatility::Immutable, + helper::one_of_sigs2( + vec![DataType::Binary, DataType::BinaryView], + vec![DataType::Utf8, DataType::Utf8View], ) } - fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result { - ensure!( - columns.len() == 2, - InvalidFuncArgsSnafu { - err_msg: format!( - "The length of the args is not correct, expect exactly two, have: {}", - columns.len() - ), - } - ); - let jsons = &columns[0]; - let paths = &columns[1]; + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> datafusion_common::Result { + let [arg0, arg1] = extract_args(self.name(), &args)?; + let arg0 = compute::cast(&arg0, &DataType::BinaryView)?; + let jsons = arg0.as_binary_view(); + let arg1 = compute::cast(&arg1, &DataType::Utf8View)?; + let paths = arg1.as_string_view(); let size = jsons.len(); - let mut results = BooleanVectorBuilder::with_capacity(size); + let mut builder = BooleanBuilder::with_capacity(size); for i in 0..size { - let json = jsons.get_ref(i); - let path = paths.get_ref(i); + let json = jsons.is_valid(i).then(|| jsons.value(i)); + let path = paths.is_valid(i).then(|| paths.value(i)); - match json.data_type() { - // JSON data type uses binary vector - ConcreteDataType::Binary(_) => { - let json = json.as_binary(); - let path = path.as_string(); - let result = match (json, path) { - (Ok(Some(json)), Ok(Some(path))) => { - if !jsonb::is_null(json) { - let json_path = jsonb::jsonpath::parse_json_path(path.as_bytes()); - match json_path { - Ok(json_path) => jsonb::path_match(json, json_path).ok(), - Err(_) => None, - } - } else { - None - } + let result = match (json, path) { + (Some(json), Some(path)) => { + if !jsonb::is_null(json) { + let json_path = jsonb::jsonpath::parse_json_path(path.as_bytes()); + match json_path { + Ok(json_path) => jsonb::path_match(json, json_path).ok(), + Err(_) => None, } - _ => None, - }; - - results.push(result); - } - - _ => { - return UnsupportedInputDataTypeSnafu { - function: NAME, - datatypes: columns.iter().map(|c| c.data_type()).collect::>(), + } else { + None } - .fail(); } - } + _ => None, + }; + builder.append_option(result); } - Ok(results.to_vector()) + Ok(ColumnarValue::Array(Arc::new(builder.finish()))) } } @@ -115,8 +95,8 @@ impl Display for JsonPathMatchFunction { mod tests { use std::sync::Arc; - use datafusion_expr::TypeSignature; - use datatypes::vectors::{BinaryVector, StringVector}; + use arrow_schema::Field; + use datafusion_common::arrow::array::{BinaryArray, StringArray}; use super::*; @@ -130,13 +110,6 @@ mod tests { json_path_match.return_type(&[DataType::Binary]).unwrap() ); - assert!(matches!(json_path_match.signature(), - Signature { - type_signature: TypeSignature::Exact(valid_types), - volatility: Volatility::Immutable - } if valid_types == vec![DataType::Binary, DataType::Utf8], - )); - let json_strings = [ Some(r#"{"a": {"b": 2}, "b": 2, "c": 3}"#.to_string()), Some(r#"{"a": 1, "b": [1,2,3]}"#.to_string()), @@ -172,27 +145,25 @@ mod tests { .map(|s| s.map(|json| jsonb::parse_value(json.as_bytes()).unwrap().to_vec())) .collect::>(); - let json_vector = BinaryVector::from(jsonbs); - let path_vector = StringVector::from(paths); - let args: Vec = vec![Arc::new(json_vector), Arc::new(path_vector)]; - let vector = json_path_match - .eval(&FunctionContext::default(), &args) + let args = ScalarFunctionArgs { + args: vec![ + ColumnarValue::Array(Arc::new(BinaryArray::from_iter(jsonbs))), + ColumnarValue::Array(Arc::new(StringArray::from_iter(paths))), + ], + arg_fields: vec![], + number_rows: 7, + return_field: Arc::new(Field::new("x", DataType::Boolean, false)), + config_options: Arc::new(Default::default()), + }; + let result = json_path_match + .invoke_with_args(args) + .and_then(|x| x.to_array(7)) .unwrap(); + let vector = result.as_boolean(); assert_eq!(7, vector.len()); - for (i, expected) in results.iter().enumerate() { - let result = vector.get_ref(i); - - match expected { - Some(expected_value) => { - assert!(!result.is_null()); - let result_value = result.as_boolean().unwrap().unwrap(); - assert_eq!(*expected_value, result_value); - } - None => { - assert!(result.is_null()); - } - } + for (actual, expected) in vector.iter().zip(results) { + assert_eq!(actual, expected); } } } diff --git a/src/common/function/src/scalars/json/json_to_string.rs b/src/common/function/src/scalars/json/json_to_string.rs index a5197a249c..f10d5c90fb 100644 --- a/src/common/function/src/scalars/json/json_to_string.rs +++ b/src/common/function/src/scalars/json/json_to_string.rs @@ -13,17 +13,15 @@ // limitations under the License. use std::fmt::{self, Display}; +use std::sync::Arc; -use common_query::error::{InvalidFuncArgsSnafu, Result, UnsupportedInputDataTypeSnafu}; -use datafusion_expr::{Signature, Volatility}; -use datatypes::arrow::datatypes::DataType; -use datatypes::data_type::ConcreteDataType; -use datatypes::prelude::VectorRef; -use datatypes::scalars::ScalarVectorBuilder; -use datatypes::vectors::{MutableVector, StringVectorBuilder}; -use snafu::ensure; +use common_query::error::Result; +use datafusion_common::DataFusionError; +use datafusion_common::arrow::array::{Array, AsArray, StringViewBuilder}; +use datafusion_common::arrow::datatypes::DataType; +use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility}; -use crate::function::{Function, FunctionContext}; +use crate::function::{Function, extract_args}; /// Converts the `JSONB` into `String`. It's useful for displaying JSONB content. #[derive(Clone, Debug, Default)] @@ -37,7 +35,7 @@ impl Function for JsonToStringFunction { } fn return_type(&self, _: &[DataType]) -> Result { - Ok(DataType::Utf8) + Ok(DataType::Utf8View) } fn signature(&self) -> Signature { @@ -45,58 +43,27 @@ impl Function for JsonToStringFunction { Signature::exact(vec![DataType::Binary], Volatility::Immutable) } - fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result { - ensure!( - columns.len() == 1, - InvalidFuncArgsSnafu { - err_msg: format!( - "The length of the args is not correct, expect exactly one, have: {}", - columns.len() - ), - } - ); - let jsons = &columns[0]; + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> datafusion_common::Result { + let [arg0] = extract_args(self.name(), &args)?; + let jsons = arg0.as_binary::(); let size = jsons.len(); - let datatype = jsons.data_type(); - let mut results = StringVectorBuilder::with_capacity(size); + let mut builder = StringViewBuilder::with_capacity(size); - match datatype { - // JSON data type uses binary vector - ConcreteDataType::Binary(_) => { - for i in 0..size { - let json = jsons.get_ref(i); + for i in 0..size { + let json = jsons.is_valid(i).then(|| jsons.value(i)); + let result = json + .map(|json| jsonb::from_slice(json).map(|x| x.to_string())) + .transpose() + .map_err(|e| DataFusionError::Execution(format!("invalid json binary: {e}")))?; - let json = json.as_binary(); - let result = match json { - Ok(Some(json)) => match jsonb::from_slice(json) { - Ok(json) => { - let json = json.to_string(); - Some(json) - } - Err(_) => { - return InvalidFuncArgsSnafu { - err_msg: format!("Illegal json binary: {:?}", json), - } - .fail(); - } - }, - _ => None, - }; - - results.push(result.as_deref()); - } - } - _ => { - return UnsupportedInputDataTypeSnafu { - function: NAME, - datatypes: columns.iter().map(|c| c.data_type()).collect::>(), - } - .fail(); - } + builder.append_option(result.as_deref()); } - Ok(results.to_vector()) + Ok(ColumnarValue::Array(Arc::new(builder.finish()))) } } @@ -110,9 +77,9 @@ impl Display for JsonToStringFunction { mod tests { use std::sync::Arc; + use arrow_schema::Field; + use datafusion_common::arrow::array::BinaryArray; use datafusion_expr::TypeSignature; - use datatypes::scalars::ScalarVector; - use datatypes::vectors::BinaryVector; use super::*; @@ -122,7 +89,7 @@ mod tests { assert_eq!("json_to_string", json_to_string.name()); assert_eq!( - DataType::Utf8, + DataType::Utf8View, json_to_string.return_type(&[DataType::Binary]).unwrap() ); @@ -147,24 +114,39 @@ mod tests { }) .collect::>(); - let json_vector = BinaryVector::from_vec(jsonbs); - let args: Vec = vec![Arc::new(json_vector)]; - let vector = json_to_string - .eval(&FunctionContext::default(), &args) + let args = ScalarFunctionArgs { + args: vec![ColumnarValue::Array(Arc::new( + BinaryArray::from_iter_values(jsonbs), + ))], + arg_fields: vec![], + number_rows: 3, + return_field: Arc::new(Field::new("x", DataType::Utf8View, false)), + config_options: Arc::new(Default::default()), + }; + let result = json_to_string + .invoke_with_args(args) + .and_then(|x| x.to_array(1)) .unwrap(); + let vector = result.as_string_view(); assert_eq!(3, vector.len()); for (i, gt) in json_strings.iter().enumerate() { - let result = vector.get_ref(i); - let result = result.as_string().unwrap().unwrap(); + let result = vector.value(i); // remove whitespaces assert_eq!(gt.replace(" ", ""), result); } let invalid_jsonb = vec![b"invalid json"]; - let invalid_json_vector = BinaryVector::from_vec(invalid_jsonb); - let args: Vec = vec![Arc::new(invalid_json_vector)]; - let vector = json_to_string.eval(&FunctionContext::default(), &args); + let args = ScalarFunctionArgs { + args: vec![ColumnarValue::Array(Arc::new( + BinaryArray::from_iter_values(invalid_jsonb), + ))], + arg_fields: vec![], + number_rows: 1, + return_field: Arc::new(Field::new("x", DataType::Utf8View, false)), + config_options: Arc::new(Default::default()), + }; + let vector = json_to_string.invoke_with_args(args); assert!(vector.is_err()); } } diff --git a/src/common/function/src/scalars/json/parse_json.rs b/src/common/function/src/scalars/json/parse_json.rs index 40914ae791..16199e5c84 100644 --- a/src/common/function/src/scalars/json/parse_json.rs +++ b/src/common/function/src/scalars/json/parse_json.rs @@ -13,17 +13,16 @@ // limitations under the License. use std::fmt::{self, Display}; +use std::sync::Arc; -use common_query::error::{InvalidFuncArgsSnafu, Result, UnsupportedInputDataTypeSnafu}; -use datafusion::arrow::datatypes::DataType; -use datafusion_expr::{Signature, Volatility}; -use datatypes::data_type::ConcreteDataType; -use datatypes::prelude::VectorRef; -use datatypes::scalars::ScalarVectorBuilder; -use datatypes::vectors::{BinaryVectorBuilder, MutableVector}; -use snafu::ensure; +use common_query::error::Result; +use datafusion_common::DataFusionError; +use datafusion_common::arrow::array::{Array, AsArray, BinaryViewBuilder}; +use datafusion_common::arrow::compute; +use datafusion_common::arrow::datatypes::DataType; +use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility}; -use crate::function::{Function, FunctionContext}; +use crate::function::{Function, extract_args}; /// Parses the `String` into `JSONB`. #[derive(Clone, Debug, Default)] @@ -37,64 +36,37 @@ impl Function for ParseJsonFunction { } fn return_type(&self, _: &[DataType]) -> Result { - Ok(DataType::Binary) + Ok(DataType::BinaryView) } fn signature(&self) -> Signature { Signature::string(1, Volatility::Immutable) } - fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result { - ensure!( - columns.len() == 1, - InvalidFuncArgsSnafu { - err_msg: format!( - "The length of the args is not correct, expect exactly one, have: {}", - columns.len() - ), - } - ); - let json_strings = &columns[0]; + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> datafusion_common::Result { + let [arg0] = extract_args(self.name(), &args)?; + let arg0 = compute::cast(&arg0, &DataType::Utf8View)?; + let json_strings = arg0.as_string_view(); let size = json_strings.len(); - let datatype = json_strings.data_type(); - let mut results = BinaryVectorBuilder::with_capacity(size); + let mut builder = BinaryViewBuilder::with_capacity(size); - match datatype { - ConcreteDataType::String(_) => { - for i in 0..size { - let json_string = json_strings.get_ref(i); - - let json_string = json_string.as_string(); - let result = match json_string { - Ok(Some(json_string)) => match jsonb::parse_value(json_string.as_bytes()) { - Ok(json) => Some(json.to_vec()), - Err(_) => { - return InvalidFuncArgsSnafu { - err_msg: format!( - "Cannot convert the string to json, have: {}", - json_string - ), - } - .fail(); - } - }, - _ => None, - }; - - results.push(result.as_deref()); - } - } - _ => { - return UnsupportedInputDataTypeSnafu { - function: NAME, - datatypes: columns.iter().map(|c| c.data_type()).collect::>(), - } - .fail(); - } + for i in 0..size { + let s = json_strings.is_valid(i).then(|| json_strings.value(i)); + let result = s + .map(|s| { + jsonb::parse_value(s.as_bytes()) + .map(|x| x.to_vec()) + .map_err(|e| DataFusionError::Execution(format!("cannot parse '{s}': {e}"))) + }) + .transpose()?; + builder.append_option(result.as_deref()); } - Ok(results.to_vector()) + Ok(ColumnarValue::Array(Arc::new(builder.finish()))) } } @@ -108,8 +80,8 @@ impl Display for ParseJsonFunction { mod tests { use std::sync::Arc; - use datatypes::scalars::ScalarVector; - use datatypes::vectors::StringVector; + use arrow_schema::Field; + use datafusion_common::arrow::array::StringViewArray; use super::*; @@ -119,7 +91,7 @@ mod tests { assert_eq!("parse_json", parse_json.name()); assert_eq!( - DataType::Binary, + DataType::BinaryView, parse_json.return_type(&[DataType::Binary]).unwrap() ); @@ -137,14 +109,24 @@ mod tests { }) .collect::>(); - let json_string_vector = StringVector::from_vec(json_strings.to_vec()); - let args: Vec = vec![Arc::new(json_string_vector)]; - let vector = parse_json.eval(&FunctionContext::default(), &args).unwrap(); + let args = ScalarFunctionArgs { + args: vec![ColumnarValue::Array(Arc::new( + StringViewArray::from_iter_values(json_strings), + ))], + arg_fields: vec![], + number_rows: 3, + return_field: Arc::new(Field::new("x", DataType::BinaryView, false)), + config_options: Arc::new(Default::default()), + }; + let result = parse_json + .invoke_with_args(args) + .and_then(|x| x.to_array(3)) + .unwrap(); + let vector = result.as_binary_view(); assert_eq!(3, vector.len()); for (i, gt) in jsonbs.iter().enumerate() { - let result = vector.get_ref(i); - let result = result.as_binary().unwrap().unwrap(); + let result = vector.value(i); assert_eq!(gt, result); } } diff --git a/src/common/function/src/scalars/matches_term.rs b/src/common/function/src/scalars/matches_term.rs index d4261c2881..4160579193 100644 --- a/src/common/function/src/scalars/matches_term.rs +++ b/src/common/function/src/scalars/matches_term.rs @@ -13,18 +13,17 @@ // limitations under the License. use std::fmt; -use std::iter::repeat_n; use std::sync::Arc; -use common_query::error::{InvalidFuncArgsSnafu, Result}; -use datafusion::arrow::datatypes::DataType; -use datafusion_expr::{Signature, Volatility}; -use datatypes::scalars::ScalarVectorBuilder; -use datatypes::vectors::{BooleanVector, BooleanVectorBuilder, MutableVector, VectorRef}; +use common_query::error::Result; +use datafusion_common::arrow::array::{Array, AsArray, BooleanArray, BooleanBuilder}; +use datafusion_common::arrow::compute; +use datafusion_common::arrow::datatypes::DataType; +use datafusion_common::{DataFusionError, ScalarValue}; +use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility}; use memchr::memmem; -use snafu::ensure; -use crate::function::{Function, FunctionContext}; +use crate::function::Function; use crate::function_registry::FunctionRegistry; /// Exact term/phrase matching function for text columns. @@ -100,64 +99,94 @@ impl Function for MatchesTermFunction { Signature::string(2, Volatility::Immutable) } - fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result { - ensure!( - columns.len() == 2, - InvalidFuncArgsSnafu { - err_msg: format!( - "The length of the args is not correct, expect exactly 2, have: {}", - columns.len() - ), - } - ); + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> datafusion_common::Result { + let [arg0, arg1] = datafusion_common::utils::take_function_args(self.name(), &args.args)?; - let text_column = &columns[0]; - if text_column.is_empty() { - return Ok(Arc::new(BooleanVector::from(Vec::::with_capacity(0)))); + fn as_str(v: &ScalarValue) -> Option<&str> { + match v { + ScalarValue::Utf8View(Some(x)) + | ScalarValue::Utf8(Some(x)) + | ScalarValue::LargeUtf8(Some(x)) => Some(x.as_str()), + _ => None, + } } - let term_column = &columns[1]; - let compiled_finder = if term_column.is_const() { - let term = term_column.get_ref(0).as_string().unwrap(); - match term { - None => { - return Ok(Arc::new(BooleanVector::from_iter(repeat_n( - None, - text_column.len(), - )))); - } - Some(term) => Some(MatchesTermFinder::new(term)), - } - } else { - None + if let (ColumnarValue::Scalar(text), ColumnarValue::Scalar(term)) = (arg0, arg1) { + let text = as_str(text); + let term = as_str(term); + let result = match (text, term) { + (Some(text), Some(term)) => Some(MatchesTermFinder::new(term).find(text)), + _ => None, + }; + return Ok(ColumnarValue::Scalar(ScalarValue::Boolean(result))); }; - let len = text_column.len(); - let mut result = BooleanVectorBuilder::with_capacity(len); - for i in 0..len { - let text = text_column.get_ref(i).as_string().unwrap(); - let Some(text) = text else { - result.push_null(); - continue; - }; + let v = match (arg0, arg1) { + (ColumnarValue::Scalar(_), ColumnarValue::Scalar(_)) => { + // Unreachable because we have checked this case above and returned if matched. + unreachable!() + } + (ColumnarValue::Scalar(text), ColumnarValue::Array(terms)) => { + let text = as_str(text); + if let Some(text) = text { + let terms = compute::cast(terms, &DataType::Utf8View)?; + let terms = terms.as_string_view(); - let contains = match &compiled_finder { - Some(finder) => finder.find(text), - None => { - let term = match term_column.get_ref(i).as_string().unwrap() { - None => { - result.push_null(); - continue; - } - Some(term) => term, - }; - MatchesTermFinder::new(term).find(text) + let mut builder = BooleanBuilder::with_capacity(terms.len()); + terms.iter().for_each(|term| { + builder.append_option(term.map(|x| MatchesTermFinder::new(x).find(text))) + }); + ColumnarValue::Array(Arc::new(builder.finish())) + } else { + ColumnarValue::Array(Arc::new(BooleanArray::new_null(terms.len()))) } - }; - result.push(Some(contains)); - } + } + (ColumnarValue::Array(texts), ColumnarValue::Scalar(term)) => { + let term = as_str(term); + if let Some(term) = term { + let finder = MatchesTermFinder::new(term); - Ok(result.to_vector()) + let texts = compute::cast(texts, &DataType::Utf8View)?; + let texts = texts.as_string_view(); + + let mut builder = BooleanBuilder::with_capacity(texts.len()); + texts + .iter() + .for_each(|text| builder.append_option(text.map(|x| finder.find(x)))); + ColumnarValue::Array(Arc::new(builder.finish())) + } else { + ColumnarValue::Array(Arc::new(BooleanArray::new_null(texts.len()))) + } + } + (ColumnarValue::Array(texts), ColumnarValue::Array(terms)) => { + let terms = compute::cast(terms, &DataType::Utf8View)?; + let terms = terms.as_string_view(); + let texts = compute::cast(texts, &DataType::Utf8View)?; + let texts = texts.as_string_view(); + + let len = texts.len(); + if terms.len() != len { + return Err(DataFusionError::Internal(format!( + "input arrays have different lengths: {len}, {}", + terms.len() + ))); + } + + let mut builder = BooleanBuilder::with_capacity(len); + for (text, term) in texts.iter().zip(terms.iter()) { + let result = match (text, term) { + (Some(text), Some(term)) => Some(MatchesTermFinder::new(term).find(text)), + _ => None, + }; + builder.append_option(result); + } + ColumnarValue::Array(Arc::new(builder.finish())) + } + }; + Ok(v) } } diff --git a/src/common/function/src/scalars/math/rate.rs b/src/common/function/src/scalars/math/rate.rs index c899cfaf17..06ae995f2b 100644 --- a/src/common/function/src/scalars/math/rate.rs +++ b/src/common/function/src/scalars/math/rate.rs @@ -16,14 +16,13 @@ use std::fmt; use common_query::error::{self, Result}; use datafusion::arrow::compute::kernels::numeric; +use datafusion_common::arrow::compute::kernels::cast; +use datafusion_common::arrow::datatypes::DataType; use datafusion_expr::type_coercion::aggregates::NUMERICS; -use datafusion_expr::{Signature, Volatility}; -use datatypes::arrow::compute::kernels::cast; -use datatypes::arrow::datatypes::DataType; -use datatypes::vectors::{Helper, VectorRef}; +use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility}; use snafu::ResultExt; -use crate::function::{Function, FunctionContext}; +use crate::function::{Function, extract_args}; /// generates rates from a sequence of adjacent data points. #[derive(Clone, Debug, Default)] @@ -48,12 +47,14 @@ impl Function for RateFunction { Signature::uniform(2, NUMERICS.to_vec(), Volatility::Immutable) } - fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result { - let val = &columns[0].to_arrow_array(); + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> datafusion_common::Result { + let [val, ts] = extract_args(self.name(), &args)?; let val_0 = val.slice(0, val.len() - 1); let val_1 = val.slice(1, val.len() - 1); let dv = numeric::sub(&val_1, &val_0).context(error::ArrowComputeSnafu)?; - let ts = &columns[1].to_arrow_array(); let ts_0 = ts.slice(0, ts.len() - 1); let ts_1 = ts.slice(1, ts.len() - 1); let dt = numeric::sub(&ts_1, &ts_0).context(error::ArrowComputeSnafu)?; @@ -65,9 +66,8 @@ impl Function for RateFunction { typ: DataType::Float64, })?; let rate = numeric::div(&dv, &dt).context(error::ArrowComputeSnafu)?; - let v = Helper::try_into_vector(&rate).context(error::FromArrowArraySnafu)?; - Ok(v) + Ok(ColumnarValue::Array(rate)) } } @@ -75,8 +75,10 @@ impl Function for RateFunction { mod tests { use std::sync::Arc; + use arrow_schema::Field; + use datafusion_common::arrow::array::{AsArray, Float32Array, Float64Array, Int64Array}; + use datafusion_common::arrow::datatypes::Float64Type; use datafusion_expr::TypeSignature; - use datatypes::vectors::{Float32Vector, Float64Vector, Int64Vector}; use super::*; #[test] @@ -93,12 +95,22 @@ mod tests { let values = vec![1.0, 3.0, 6.0]; let ts = vec![0, 1, 2]; - let args: Vec = vec![ - Arc::new(Float32Vector::from_vec(values)), - Arc::new(Int64Vector::from_vec(ts)), - ]; - let vector = rate.eval(&FunctionContext::default(), &args).unwrap(); - let expect: VectorRef = Arc::new(Float64Vector::from_vec(vec![2.0, 3.0])); + let args = ScalarFunctionArgs { + args: vec![ + ColumnarValue::Array(Arc::new(Float32Array::from(values))), + ColumnarValue::Array(Arc::new(Int64Array::from(ts))), + ], + arg_fields: vec![], + number_rows: 3, + return_field: Arc::new(Field::new("x", DataType::Float64, false)), + config_options: Arc::new(Default::default()), + }; + let result = rate + .invoke_with_args(args) + .and_then(|x| x.to_array(2)) + .unwrap(); + let vector = result.as_primitive::(); + let expect = &Float64Array::from(vec![2.0, 3.0]); assert_eq!(expect, vector); } } diff --git a/src/common/function/src/scalars/timestamp/to_unixtime.rs b/src/common/function/src/scalars/timestamp/to_unixtime.rs index 2d4ad0e6e5..239fdbc96a 100644 --- a/src/common/function/src/scalars/timestamp/to_unixtime.rs +++ b/src/common/function/src/scalars/timestamp/to_unixtime.rs @@ -15,15 +15,18 @@ use std::fmt; use std::sync::Arc; -use common_query::error::{InvalidFuncArgsSnafu, Result, UnsupportedInputDataTypeSnafu}; +use common_query::error::Result; use common_time::{Date, Timestamp}; -use datafusion_expr::{Signature, Volatility}; -use datatypes::arrow::datatypes::{DataType, TimeUnit}; -use datatypes::prelude::ConcreteDataType; -use datatypes::vectors::{Int64Vector, VectorRef}; -use snafu::ensure; +use datafusion_common::DataFusionError; +use datafusion_common::arrow::array::{ + Array, ArrayRef, AsArray, Date32Array, Int64Array, Int64Builder, +}; +use datafusion_common::arrow::compute; +use datafusion_common::arrow::datatypes::{ArrowTimestampType, DataType, Date32Type, TimeUnit}; +use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility}; -use crate::function::{Function, FunctionContext}; +use crate::function::{Function, FunctionContext, extract_args, find_function_context}; +use crate::helper::with_match_timestamp_types; /// A function to convert the column into the unix timestamp in seconds. #[derive(Clone, Debug, Default)] @@ -44,15 +47,23 @@ fn convert_to_seconds(arg: &str, func_ctx: &FunctionContext) -> Option { None } -fn convert_timestamps_to_seconds(vector: &VectorRef) -> Vec> { - (0..vector.len()) - .map(|i| vector.get(i).as_timestamp().map(|ts| ts.split().0)) - .collect::>>() +fn convert_timestamps_to_seconds(array: &ArrayRef) -> datafusion_common::Result>> { + with_match_timestamp_types!(array.data_type(), |$S| { + let array = array.as_primitive::<$S>(); + array + .iter() + .map(|x| x.map(|i| Timestamp::new(i, $S::UNIT.into()).split().0)) + .collect::>() + }) } -fn convert_dates_to_seconds(vector: &VectorRef) -> Vec> { +fn convert_dates_to_seconds(vector: &Date32Array) -> Vec> { (0..vector.len()) - .map(|i| vector.get(i).as_date().map(|dt| dt.to_secs())) + .map(|i| { + vector + .is_valid(i) + .then(|| Date::from(vector.value(i)).to_secs()) + }) .collect::>>() } @@ -82,43 +93,43 @@ impl Function for ToUnixtimeFunction { ) } - fn eval(&self, ctx: &FunctionContext, columns: &[VectorRef]) -> Result { - ensure!( - columns.len() == 1, - InvalidFuncArgsSnafu { - err_msg: format!( - "The length of the args is not correct, expect exactly one, have: {}", - columns.len() - ), + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> datafusion_common::Result { + let ctx = find_function_context(&args)?; + let [arg0] = extract_args(self.name(), &args)?; + let result: ArrayRef = match arg0.data_type() { + DataType::Utf8 => { + let arg0 = arg0.as_string::(); + let mut builder = Int64Builder::with_capacity(arg0.len()); + for i in 0..arg0.len() { + builder.append_option( + arg0.is_valid(i) + .then(|| convert_to_seconds(arg0.value(i), ctx)) + .flatten(), + ); + } + Arc::new(builder.finish()) } - ); - - let vector = &columns[0]; - - match columns[0].data_type() { - ConcreteDataType::String(_) => Ok(Arc::new(Int64Vector::from( - (0..vector.len()) - .map(|i| convert_to_seconds(&vector.get(i).to_string(), ctx)) - .collect::>(), - ))), - ConcreteDataType::Int64(_) | ConcreteDataType::Int32(_) => { - // Safety: cast always successfully at here - Ok(vector.cast(&ConcreteDataType::int64_datatype()).unwrap()) - } - ConcreteDataType::Date(_) => { + DataType::Int64 | DataType::Int32 => compute::cast(&arg0, &DataType::Int64)?, + DataType::Date32 => { + let vector = arg0.as_primitive::(); let seconds = convert_dates_to_seconds(vector); - Ok(Arc::new(Int64Vector::from(seconds))) + Arc::new(Int64Array::from(seconds)) } - ConcreteDataType::Timestamp(_) => { - let seconds = convert_timestamps_to_seconds(vector); - Ok(Arc::new(Int64Vector::from(seconds))) + DataType::Timestamp(_, _) => { + let seconds = convert_timestamps_to_seconds(&arg0)?; + Arc::new(Int64Array::from(seconds)) } - _ => UnsupportedInputDataTypeSnafu { - function: NAME, - datatypes: columns.iter().map(|c| c.data_type()).collect::>(), + x => { + return Err(DataFusionError::Execution(format!( + "{}: unsupported input data type {x}", + self.name() + ))); } - .fail(), - } + }; + Ok(ColumnarValue::Array(result)) } } @@ -130,14 +141,41 @@ impl fmt::Display for ToUnixtimeFunction { #[cfg(test)] mod tests { - use datafusion_expr::TypeSignature; - use datatypes::value::Value; - use datatypes::vectors::{ - DateVector, StringVector, TimestampMillisecondVector, TimestampSecondVector, + use arrow_schema::Field; + use datafusion_common::arrow::array::{ + StringArray, TimestampMillisecondArray, TimestampSecondArray, }; + use datafusion_common::arrow::datatypes::Int64Type; + use datafusion_common::config::ConfigOptions; + use datafusion_expr::TypeSignature; use super::{ToUnixtimeFunction, *}; + fn test_invoke(arg0: ArrayRef, expects: &[Option]) { + let mut config_options = ConfigOptions::default(); + config_options.extensions.insert(FunctionContext::default()); + let config_options = Arc::new(config_options); + + let number_rows = arg0.len(); + let args = ScalarFunctionArgs { + args: vec![ColumnarValue::Array(arg0)], + arg_fields: vec![], + number_rows, + return_field: Arc::new(Field::new("", DataType::Int64, true)), + config_options, + }; + let result = ToUnixtimeFunction + .invoke_with_args(args) + .and_then(|x| x.to_array(number_rows)) + .unwrap(); + let result = result.as_primitive::(); + + assert_eq!(result.len(), expects.len()); + for (actual, expect) in result.iter().zip(expects) { + assert_eq!(&actual, expect); + } + } + #[test] fn test_string_to_unixtime() { let f = ToUnixtimeFunction; @@ -167,115 +205,36 @@ mod tests { Some("invalid_time_stamp"), ]; let results = [Some(1677652502), None, Some(1656633600), None]; - let args: Vec = vec![Arc::new(StringVector::from(times.clone()))]; - let vector = f.eval(&FunctionContext::default(), &args).unwrap(); - assert_eq!(4, vector.len()); - for (i, _t) in times.iter().enumerate() { - let v = vector.get(i); - if i == 1 || i == 3 { - assert_eq!(Value::Null, v); - continue; - } - match v { - Value::Int64(ts) => { - assert_eq!(ts, (*results.get(i).unwrap()).unwrap()); - } - _ => unreachable!(), - } - } + let arg0 = Arc::new(StringArray::from(times)); + test_invoke(arg0, &results); } #[test] fn test_int_to_unixtime() { - let f = ToUnixtimeFunction; - let times = vec![Some(3_i64), None, Some(5_i64), None]; let results = [Some(3), None, Some(5), None]; - let args: Vec = vec![Arc::new(Int64Vector::from(times.clone()))]; - let vector = f.eval(&FunctionContext::default(), &args).unwrap(); - assert_eq!(4, vector.len()); - for (i, _t) in times.iter().enumerate() { - let v = vector.get(i); - if i == 1 || i == 3 { - assert_eq!(Value::Null, v); - continue; - } - match v { - Value::Int64(ts) => { - assert_eq!(ts, (*results.get(i).unwrap()).unwrap()); - } - _ => unreachable!(), - } - } + let arg0 = Arc::new(Int64Array::from(times)); + test_invoke(arg0, &results); } #[test] fn test_date_to_unixtime() { - let f = ToUnixtimeFunction; - let times = vec![Some(123), None, Some(42), None]; let results = [Some(10627200), None, Some(3628800), None]; - let date_vector = DateVector::from(times.clone()); - let args: Vec = vec![Arc::new(date_vector)]; - let vector = f.eval(&FunctionContext::default(), &args).unwrap(); - assert_eq!(4, vector.len()); - for (i, _t) in times.iter().enumerate() { - let v = vector.get(i); - if i == 1 || i == 3 { - assert_eq!(Value::Null, v); - continue; - } - match v { - Value::Int64(ts) => { - assert_eq!(ts, (*results.get(i).unwrap()).unwrap()); - } - _ => unreachable!(), - } - } + let arg0 = Arc::new(Date32Array::from(times)); + test_invoke(arg0, &results); } #[test] fn test_timestamp_to_unixtime() { - let f = ToUnixtimeFunction; - let times = vec![Some(123), None, Some(42), None]; let results = [Some(123), None, Some(42), None]; - let ts_vector = TimestampSecondVector::from(times.clone()); - let args: Vec = vec![Arc::new(ts_vector)]; - let vector = f.eval(&FunctionContext::default(), &args).unwrap(); - assert_eq!(4, vector.len()); - for (i, _t) in times.iter().enumerate() { - let v = vector.get(i); - if i == 1 || i == 3 { - assert_eq!(Value::Null, v); - continue; - } - match v { - Value::Int64(ts) => { - assert_eq!(ts, (*results.get(i).unwrap()).unwrap()); - } - _ => unreachable!(), - } - } + let arg0 = Arc::new(TimestampSecondArray::from(times)); + test_invoke(arg0, &results); let times = vec![Some(123000), None, Some(42000), None]; let results = [Some(123), None, Some(42), None]; - let ts_vector = TimestampMillisecondVector::from(times.clone()); - let args: Vec = vec![Arc::new(ts_vector)]; - let vector = f.eval(&FunctionContext::default(), &args).unwrap(); - assert_eq!(4, vector.len()); - for (i, _t) in times.iter().enumerate() { - let v = vector.get(i); - if i == 1 || i == 3 { - assert_eq!(Value::Null, v); - continue; - } - match v { - Value::Int64(ts) => { - assert_eq!(ts, (*results.get(i).unwrap()).unwrap()); - } - _ => unreachable!(), - } - } + let arg0 = Arc::new(TimestampMillisecondArray::from(times)); + test_invoke(arg0, &results); } } diff --git a/src/common/function/src/scalars/vector/convert/parse_vector.rs b/src/common/function/src/scalars/vector/convert/parse_vector.rs index 84ec51d959..336b0c6019 100644 --- a/src/common/function/src/scalars/vector/convert/parse_vector.rs +++ b/src/common/function/src/scalars/vector/convert/parse_vector.rs @@ -13,16 +13,17 @@ // limitations under the License. use std::fmt::Display; +use std::sync::Arc; -use common_query::error::{InvalidFuncArgsSnafu, InvalidVectorStringSnafu, Result}; -use datafusion::arrow::datatypes::DataType; -use datafusion_expr::{Signature, Volatility}; -use datatypes::scalars::ScalarVectorBuilder; +use common_query::error::{InvalidVectorStringSnafu, Result}; +use datafusion_common::arrow::array::{Array, AsArray, BinaryViewBuilder}; +use datafusion_common::arrow::compute; +use datafusion_common::arrow::datatypes::DataType; +use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility}; use datatypes::types::parse_string_to_vector_type_value; -use datatypes::vectors::{BinaryVectorBuilder, MutableVector, VectorRef}; -use snafu::{ResultExt, ensure}; +use snafu::ResultExt; -use crate::function::{Function, FunctionContext}; +use crate::function::{Function, extract_args}; const NAME: &str = "parse_vec"; @@ -35,40 +36,36 @@ impl Function for ParseVectorFunction { } fn return_type(&self, _: &[DataType]) -> Result { - Ok(DataType::Binary) + Ok(DataType::BinaryView) } fn signature(&self) -> Signature { Signature::string(1, Volatility::Immutable) } - fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result { - ensure!( - columns.len() == 1, - InvalidFuncArgsSnafu { - err_msg: format!( - "The length of the args is not correct, expect exactly one, have: {}", - columns.len() - ), - } - ); + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> datafusion_common::Result { + let [arg0] = extract_args(self.name(), &args)?; + let arg0 = compute::cast(&arg0, &DataType::Utf8View)?; + let column = arg0.as_string_view(); - let column = &columns[0]; let size = column.len(); - let mut result = BinaryVectorBuilder::with_capacity(size); + let mut builder = BinaryViewBuilder::with_capacity(size); for i in 0..size { - let value = column.get(i).as_string(); + let value = column.is_valid(i).then(|| column.value(i)); if let Some(value) = value { - let res = parse_string_to_vector_type_value(&value, None) - .context(InvalidVectorStringSnafu { vec_str: &value })?; - result.push(Some(&res)); + let result = parse_string_to_vector_type_value(value, None) + .context(InvalidVectorStringSnafu { vec_str: value })?; + builder.append_value(result); } else { - result.push_null(); + builder.append_null(); } } - Ok(result.to_vector()) + Ok(ColumnarValue::Array(Arc::new(builder.finish()))) } } @@ -82,9 +79,9 @@ impl Display for ParseVectorFunction { mod tests { use std::sync::Arc; + use arrow_schema::Field; use common_base::bytes::Bytes; - use datatypes::value::Value; - use datatypes::vectors::StringVector; + use datafusion_common::arrow::array::StringViewArray; use super::*; @@ -92,66 +89,84 @@ mod tests { fn test_parse_vector() { let func = ParseVectorFunction; - let input = Arc::new(StringVector::from(vec![ + let arg0 = Arc::new(StringViewArray::from_iter([ Some("[1.0,2.0,3.0]".to_string()), Some("[4.0,5.0,6.0]".to_string()), None, ])); + let args = ScalarFunctionArgs { + args: vec![ColumnarValue::Array(arg0)], + arg_fields: vec![], + number_rows: 3, + return_field: Arc::new(Field::new("", DataType::BinaryView, false)), + config_options: Arc::new(Default::default()), + }; - let result = func.eval(&FunctionContext::default(), &[input]).unwrap(); + let result = func + .invoke_with_args(args) + .and_then(|x| x.to_array(3)) + .unwrap(); + let result = result.as_binary_view(); - let result = result.as_ref(); assert_eq!(result.len(), 3); assert_eq!( - result.get(0), - Value::Binary(Bytes::from( + result.value(0), + &Bytes::from( [1.0f32, 2.0, 3.0] .iter() .flat_map(|e| e.to_le_bytes()) .collect::>() - )) + ) ); assert_eq!( - result.get(1), - Value::Binary(Bytes::from( + result.value(1), + &Bytes::from( [4.0f32, 5.0, 6.0] .iter() .flat_map(|e| e.to_le_bytes()) .collect::>() - )) + ) ); - assert!(result.get(2).is_null()); + assert!(result.is_null(2)); } #[test] fn test_parse_vector_error() { let func = ParseVectorFunction; - let input = Arc::new(StringVector::from(vec![ - Some("[1.0,2.0,3.0]".to_string()), - Some("[4.0,5.0,6.0]".to_string()), - Some("[7.0,8.0,9.0".to_string()), - ])); + let inputs = [ + StringViewArray::from_iter([ + Some("[1.0,2.0,3.0]".to_string()), + Some("[4.0,5.0,6.0]".to_string()), + Some("[7.0,8.0,9.0".to_string()), + ]), + StringViewArray::from_iter([ + Some("[1.0,2.0,3.0]".to_string()), + Some("[4.0,5.0,6.0]".to_string()), + Some("7.0,8.0,9.0]".to_string()), + ]), + StringViewArray::from_iter([ + Some("[1.0,2.0,3.0]".to_string()), + Some("[4.0,5.0,6.0]".to_string()), + Some("[7.0,hello,9.0]".to_string()), + ]), + ]; + let expected = [ + "External error: Invalid vector string: [7.0,8.0,9.0", + "External error: Invalid vector string: 7.0,8.0,9.0]", + "External error: Invalid vector string: [7.0,hello,9.0]", + ]; - let result = func.eval(&FunctionContext::default(), &[input]); - assert!(result.is_err()); - - let input = Arc::new(StringVector::from(vec![ - Some("[1.0,2.0,3.0]".to_string()), - Some("[4.0,5.0,6.0]".to_string()), - Some("7.0,8.0,9.0]".to_string()), - ])); - - let result = func.eval(&FunctionContext::default(), &[input]); - assert!(result.is_err()); - - let input = Arc::new(StringVector::from(vec![ - Some("[1.0,2.0,3.0]".to_string()), - Some("[4.0,5.0,6.0]".to_string()), - Some("[7.0,hello,9.0]".to_string()), - ])); - - let result = func.eval(&FunctionContext::default(), &[input]); - assert!(result.is_err()); + for (input, expected) in inputs.into_iter().zip(expected.into_iter()) { + let args = ScalarFunctionArgs { + args: vec![ColumnarValue::Array(Arc::new(input))], + arg_fields: vec![], + number_rows: 3, + return_field: Arc::new(Field::new("", DataType::BinaryView, false)), + config_options: Arc::new(Default::default()), + }; + let result = func.invoke_with_args(args); + assert_eq!(result.unwrap_err().to_string(), expected); + } } } diff --git a/src/common/function/src/scalars/vector/convert/vector_to_string.rs b/src/common/function/src/scalars/vector/convert/vector_to_string.rs index 3dc5f06ad6..b2b5a4e0fa 100644 --- a/src/common/function/src/scalars/vector/convert/vector_to_string.rs +++ b/src/common/function/src/scalars/vector/convert/vector_to_string.rs @@ -13,18 +13,18 @@ // limitations under the License. use std::fmt::Display; +use std::sync::Arc; -use common_query::error::{InvalidFuncArgsSnafu, Result}; -use datafusion::arrow::datatypes::DataType; +use common_query::error::Result; +use datafusion_common::DataFusionError; +use datafusion_common::arrow::array::{Array, AsArray, StringViewBuilder}; +use datafusion_common::arrow::compute; +use datafusion_common::arrow::datatypes::DataType; use datafusion_expr::type_coercion::aggregates::BINARYS; -use datafusion_expr::{Signature, TypeSignature, Volatility}; -use datatypes::scalars::ScalarVectorBuilder; +use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, TypeSignature, Volatility}; use datatypes::types::vector_type_value_to_string; -use datatypes::value::Value; -use datatypes::vectors::{MutableVector, StringVectorBuilder, VectorRef}; -use snafu::ensure; -use crate::function::{Function, FunctionContext}; +use crate::function::{Function, extract_args}; const NAME: &str = "vec_to_string"; @@ -37,7 +37,7 @@ impl Function for VectorToStringFunction { } fn return_type(&self, _: &[DataType]) -> Result { - Ok(DataType::Utf8) + Ok(DataType::Utf8View) } fn signature(&self) -> Signature { @@ -50,51 +50,40 @@ impl Function for VectorToStringFunction { ) } - fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result { - ensure!( - columns.len() == 1, - InvalidFuncArgsSnafu { - err_msg: format!( - "The length of the args is not correct, expect exactly one, have: {}", - columns.len() - ), - } - ); + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> datafusion_common::Result { + let [arg0] = extract_args(self.name(), &args)?; + let arg0 = compute::cast(&arg0, &DataType::BinaryView)?; + let column = arg0.as_binary_view(); - let column = &columns[0]; let size = column.len(); - let mut result = StringVectorBuilder::with_capacity(size); + let mut builder = StringViewBuilder::with_capacity(size); for i in 0..size { - let value = column.get(i); + let value = column.is_valid(i).then(|| column.value(i)); match value { - Value::Binary(bytes) => { + Some(bytes) => { let len = bytes.len(); if len % std::mem::size_of::() != 0 { - return InvalidFuncArgsSnafu { - err_msg: format!("Invalid binary length of vector: {}", len), - } - .fail(); + return Err(DataFusionError::Execution(format!( + "Invalid binary length of vector: {len}" + ))); } let dim = len / std::mem::size_of::(); // Safety: `dim` is calculated from the length of `bytes` and is guaranteed to be valid - let res = vector_type_value_to_string(&bytes, dim as _).unwrap(); - result.push(Some(&res)); + let result = vector_type_value_to_string(bytes, dim as _).unwrap(); + builder.append_value(result); } - Value::Null => { - result.push_null(); - } - _ => { - return InvalidFuncArgsSnafu { - err_msg: format!("Invalid value type: {:?}", value.data_type()), - } - .fail(); + None => { + builder.append_null(); } } } - Ok(result.to_vector()) + Ok(ColumnarValue::Array(Arc::new(builder.finish()))) } } @@ -106,8 +95,8 @@ impl Display for VectorToStringFunction { #[cfg(test)] mod tests { - use datatypes::value::Value; - use datatypes::vectors::BinaryVectorBuilder; + use arrow_schema::Field; + use datafusion_common::arrow::array::BinaryViewBuilder; use super::*; @@ -115,29 +104,39 @@ mod tests { fn test_vector_to_string() { let func = VectorToStringFunction; - let mut builder = BinaryVectorBuilder::with_capacity(3); - builder.push(Some( + let mut builder = BinaryViewBuilder::with_capacity(3); + builder.append_option(Some( [1.0f32, 2.0, 3.0] .iter() .flat_map(|e| e.to_le_bytes()) .collect::>() .as_slice(), )); - builder.push(Some( + builder.append_option(Some( [4.0f32, 5.0, 6.0] .iter() .flat_map(|e| e.to_le_bytes()) .collect::>() .as_slice(), )); - builder.push_null(); - let vector = builder.to_vector(); + builder.append_null(); + let args = ScalarFunctionArgs { + args: vec![ColumnarValue::Array(Arc::new(builder.finish()))], + arg_fields: vec![], + number_rows: 3, + return_field: Arc::new(Field::new("", DataType::Utf8View, false)), + config_options: Arc::new(Default::default()), + }; - let result = func.eval(&FunctionContext::default(), &[vector]).unwrap(); + let result = func + .invoke_with_args(args) + .and_then(|x| x.to_array(3)) + .unwrap(); + let result = result.as_string_view(); assert_eq!(result.len(), 3); - assert_eq!(result.get(0), Value::String("[1,2,3]".to_string().into())); - assert_eq!(result.get(1), Value::String("[4,5,6]".to_string().into())); - assert_eq!(result.get(2), Value::Null); + assert_eq!(result.value(0), "[1,2,3]"); + assert_eq!(result.value(1), "[4,5,6]"); + assert!(result.is_null(2)); } } diff --git a/src/common/function/src/scalars/vector/distance.rs b/src/common/function/src/scalars/vector/distance.rs index ab79f64143..bbf0598c37 100644 --- a/src/common/function/src/scalars/vector/distance.rs +++ b/src/common/function/src/scalars/vector/distance.rs @@ -46,8 +46,18 @@ macro_rules! define_distance_function { fn signature(&self) -> Signature { helper::one_of_sigs2( - vec![DataType::Utf8, DataType::Binary], - vec![DataType::Utf8, DataType::Binary], + vec![ + DataType::Utf8, + DataType::Utf8View, + DataType::Binary, + DataType::BinaryView, + ], + vec![ + DataType::Utf8, + DataType::Utf8View, + DataType::Binary, + DataType::BinaryView, + ], ) } diff --git a/src/common/function/src/scalars/vector/elem_product.rs b/src/common/function/src/scalars/vector/elem_product.rs index c527faa616..548d5f15d4 100644 --- a/src/common/function/src/scalars/vector/elem_product.rs +++ b/src/common/function/src/scalars/vector/elem_product.rs @@ -57,6 +57,7 @@ impl Function for ElemProductFunction { vec![ TypeSignature::Uniform(1, STRINGS.to_vec()), TypeSignature::Uniform(1, BINARYS.to_vec()), + TypeSignature::Uniform(1, vec![DataType::BinaryView]), ], Volatility::Immutable, ) diff --git a/src/common/function/src/scalars/vector/elem_sum.rs b/src/common/function/src/scalars/vector/elem_sum.rs index 08e078df73..7e44f1d38e 100644 --- a/src/common/function/src/scalars/vector/elem_sum.rs +++ b/src/common/function/src/scalars/vector/elem_sum.rs @@ -44,6 +44,7 @@ impl Function for ElemSumFunction { vec![ TypeSignature::Uniform(1, STRINGS.to_vec()), TypeSignature::Uniform(1, BINARYS.to_vec()), + TypeSignature::Uniform(1, vec![DataType::BinaryView]), ], Volatility::Immutable, ) diff --git a/src/common/function/src/scalars/vector/impl_conv.rs b/src/common/function/src/scalars/vector/impl_conv.rs index e97ca85e33..875a1a6a09 100644 --- a/src/common/function/src/scalars/vector/impl_conv.rs +++ b/src/common/function/src/scalars/vector/impl_conv.rs @@ -20,7 +20,9 @@ use datafusion_common::ScalarValue; /// Convert a string or binary literal to a vector literal. pub fn as_veclit(arg: &ScalarValue) -> Result>> { match arg { - ScalarValue::Binary(b) => b.as_ref().map(|x| binlit_as_veclit(x)).transpose(), + ScalarValue::Binary(b) | ScalarValue::BinaryView(b) => { + b.as_ref().map(|x| binlit_as_veclit(x)).transpose() + } ScalarValue::Utf8(s) | ScalarValue::Utf8View(s) => s .as_ref() .map(|x| parse_veclit_from_strlit(x).map(Cow::Owned)) diff --git a/src/common/function/src/scalars/vector/scalar_add.rs b/src/common/function/src/scalars/vector/scalar_add.rs index 187eccc761..879e7bc5a8 100644 --- a/src/common/function/src/scalars/vector/scalar_add.rs +++ b/src/common/function/src/scalars/vector/scalar_add.rs @@ -65,7 +65,7 @@ impl Function for ScalarAddFunction { fn signature(&self) -> Signature { helper::one_of_sigs2( vec![DataType::Float64], - vec![DataType::Utf8, DataType::Binary], + vec![DataType::Utf8, DataType::Binary, DataType::BinaryView], ) } diff --git a/src/common/function/src/scalars/vector/scalar_mul.rs b/src/common/function/src/scalars/vector/scalar_mul.rs index 27127e8ee9..c77aa74375 100644 --- a/src/common/function/src/scalars/vector/scalar_mul.rs +++ b/src/common/function/src/scalars/vector/scalar_mul.rs @@ -65,7 +65,12 @@ impl Function for ScalarMulFunction { fn signature(&self) -> Signature { helper::one_of_sigs2( vec![DataType::Float64], - vec![DataType::Utf8, DataType::Binary], + vec![ + DataType::Utf8, + DataType::Utf8View, + DataType::Binary, + DataType::BinaryView, + ], ) } diff --git a/src/common/function/src/scalars/vector/vector_add.rs b/src/common/function/src/scalars/vector/vector_add.rs index e07cefbf09..a2a429b5d5 100644 --- a/src/common/function/src/scalars/vector/vector_add.rs +++ b/src/common/function/src/scalars/vector/vector_add.rs @@ -56,8 +56,8 @@ impl Function for VectorAddFunction { fn signature(&self) -> Signature { helper::one_of_sigs2( - vec![DataType::Utf8, DataType::Binary], - vec![DataType::Utf8, DataType::Binary], + vec![DataType::Utf8, DataType::Binary, DataType::BinaryView], + vec![DataType::Utf8, DataType::Binary, DataType::BinaryView], ) } diff --git a/src/common/function/src/scalars/vector/vector_div.rs b/src/common/function/src/scalars/vector/vector_div.rs index b76602ac7c..cbe52f6924 100644 --- a/src/common/function/src/scalars/vector/vector_div.rs +++ b/src/common/function/src/scalars/vector/vector_div.rs @@ -57,8 +57,8 @@ impl Function for VectorDivFunction { fn signature(&self) -> Signature { helper::one_of_sigs2( - vec![DataType::Utf8, DataType::Binary], - vec![DataType::Utf8, DataType::Binary], + vec![DataType::Utf8, DataType::Binary, DataType::BinaryView], + vec![DataType::Utf8, DataType::Binary, DataType::BinaryView], ) } diff --git a/src/common/function/src/scalars/vector/vector_mul.rs b/src/common/function/src/scalars/vector/vector_mul.rs index e25178cc7c..c01143fb1b 100644 --- a/src/common/function/src/scalars/vector/vector_mul.rs +++ b/src/common/function/src/scalars/vector/vector_mul.rs @@ -57,8 +57,8 @@ impl Function for VectorMulFunction { fn signature(&self) -> Signature { helper::one_of_sigs2( - vec![DataType::Utf8, DataType::Binary], - vec![DataType::Utf8, DataType::Binary], + vec![DataType::Utf8, DataType::Binary, DataType::BinaryView], + vec![DataType::Utf8, DataType::Binary, DataType::BinaryView], ) } diff --git a/src/common/function/src/scalars/vector/vector_norm.rs b/src/common/function/src/scalars/vector/vector_norm.rs index 9a7d19a371..70d578fc15 100644 --- a/src/common/function/src/scalars/vector/vector_norm.rs +++ b/src/common/function/src/scalars/vector/vector_norm.rs @@ -60,6 +60,7 @@ impl Function for VectorNormFunction { vec![ TypeSignature::Uniform(1, STRINGS.to_vec()), TypeSignature::Uniform(1, BINARYS.to_vec()), + TypeSignature::Uniform(1, vec![DataType::BinaryView]), ], Volatility::Immutable, ) diff --git a/src/common/function/src/scalars/vector/vector_sub.rs b/src/common/function/src/scalars/vector/vector_sub.rs index 43b33d152f..92eb20f247 100644 --- a/src/common/function/src/scalars/vector/vector_sub.rs +++ b/src/common/function/src/scalars/vector/vector_sub.rs @@ -56,8 +56,8 @@ impl Function for VectorSubFunction { fn signature(&self) -> Signature { helper::one_of_sigs2( - vec![DataType::Utf8, DataType::Binary], - vec![DataType::Utf8, DataType::Binary], + vec![DataType::Utf8, DataType::Binary, DataType::BinaryView], + vec![DataType::Utf8, DataType::Binary, DataType::BinaryView], ) } diff --git a/src/common/function/src/system/pg_catalog/version.rs b/src/common/function/src/system/pg_catalog/version.rs index e562fd6569..b2ab090737 100644 --- a/src/common/function/src/system/pg_catalog/version.rs +++ b/src/common/function/src/system/pg_catalog/version.rs @@ -13,14 +13,13 @@ // limitations under the License. use std::fmt; -use std::sync::Arc; use common_query::error::Result; use datafusion::arrow::datatypes::DataType; -use datafusion_expr::{Signature, Volatility}; -use datatypes::vectors::{StringVector, VectorRef}; +use datafusion_common::ScalarValue; +use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility}; -use crate::function::{Function, FunctionContext}; +use crate::function::Function; #[derive(Clone, Debug, Default)] pub(crate) struct PGVersionFunction; @@ -37,18 +36,17 @@ impl Function for PGVersionFunction { } fn return_type(&self, _: &[DataType]) -> Result { - Ok(DataType::Utf8) + Ok(DataType::Utf8View) } fn signature(&self) -> Signature { Signature::exact(vec![], Volatility::Immutable) } - fn eval(&self, _func_ctx: &FunctionContext, _columns: &[VectorRef]) -> Result { - let result = StringVector::from(vec![format!( + fn invoke_with_args(&self, _: ScalarFunctionArgs) -> datafusion_common::Result { + Ok(ColumnarValue::Scalar(ScalarValue::Utf8View(Some(format!( "PostgreSQL 16.3 GreptimeDB {}", common_version::version() - )]); - Ok(Arc::new(result)) + ))))) } } diff --git a/src/common/query/src/error.rs b/src/common/query/src/error.rs index a6271159ae..163efb30a7 100644 --- a/src/common/query/src/error.rs +++ b/src/common/query/src/error.rs @@ -44,13 +44,6 @@ pub enum Error { source: DataTypeError, }, - #[snafu(display("Failed to cast arrow array into vector"))] - FromArrowArray { - #[snafu(implicit)] - location: Location, - source: DataTypeError, - }, - #[snafu(display("Failed to cast arrow array into vector: {:?}", data_type))] IntoVector { #[snafu(implicit)] @@ -225,7 +218,6 @@ impl ErrorExt for Error { Error::InvalidInputType { source, .. } | Error::IntoVector { source, .. } | Error::FromScalarValue { source, .. } - | Error::FromArrowArray { source, .. } | Error::InvalidVectorString { source, .. } => source.status_code(), Error::MissingTableMutationHandler { .. } diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index 61430d6786..867cc30b54 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -579,6 +579,12 @@ impl QueryEngine for DatafusionQueryEngine { state: self.engine_state().function_state(), }); + let config_options = state.config_options().clone(); + let _ = state + .execution_props_mut() + .config_options + .insert(config_options); + QueryEngineContext::new(state, query_ctx) } diff --git a/tests/cases/standalone/common/types/json/json.result b/tests/cases/standalone/common/types/json/json.result index 1fc69c9c28..8c4755f4ae 100644 --- a/tests/cases/standalone/common/types/json/json.result +++ b/tests/cases/standalone/common/types/json/json.result @@ -119,11 +119,11 @@ Affected Rows: 25 INSERT INTO jsons VALUES(parse_json('{"a":1, "b":2, "c":3'), 4); -Error: 3001(EngineExecuteQuery), Invalid function args: Cannot convert the string to json, have: {"a":1, "b":2, "c":3 +Error: 3001(EngineExecuteQuery), Execution error: cannot parse '{"a":1, "b":2, "c":3': EOF while parsing a value, pos 20 INSERT INTO jsons VALUES(parse_json('Morning my friends, have a nice day :)'), 5); -Error: 3001(EngineExecuteQuery), Invalid function args: Cannot convert the string to json, have: Morning my friends, have a nice day :) +Error: 3001(EngineExecuteQuery), Execution error: cannot parse 'Morning my friends, have a nice day :)': expected value, pos 1 SELECT json_to_string(j), t FROM jsons;