diff --git a/src/common/function/src/admin.rs b/src/common/function/src/admin.rs index 1a02caa088..e311d99818 100644 --- a/src/common/function/src/admin.rs +++ b/src/common/function/src/admin.rs @@ -21,8 +21,6 @@ mod reconcile_database; mod reconcile_table; mod remove_region_follower; -use std::sync::Arc; - use add_region_follower::AddRegionFollowerFunction; use flush_compact_region::{CompactRegionFunction, FlushRegionFunction}; use flush_compact_table::{CompactTableFunction, FlushTableFunction}; @@ -35,22 +33,22 @@ use remove_region_follower::RemoveRegionFollowerFunction; use crate::flush_flow::FlushFlowFunction; use crate::function_registry::FunctionRegistry; -/// Table functions +/// Administration functions pub(crate) struct AdminFunction; impl AdminFunction { - /// Register all table functions to [`FunctionRegistry`]. + /// Register all admin functions to [`FunctionRegistry`]. pub fn register(registry: &FunctionRegistry) { - registry.register_async(Arc::new(MigrateRegionFunction)); - registry.register_async(Arc::new(AddRegionFollowerFunction)); - registry.register_async(Arc::new(RemoveRegionFollowerFunction)); - registry.register_async(Arc::new(FlushRegionFunction)); - registry.register_async(Arc::new(CompactRegionFunction)); - registry.register_async(Arc::new(FlushTableFunction)); - registry.register_async(Arc::new(CompactTableFunction)); - registry.register_async(Arc::new(FlushFlowFunction)); - registry.register_async(Arc::new(ReconcileCatalogFunction)); - registry.register_async(Arc::new(ReconcileDatabaseFunction)); - registry.register_async(Arc::new(ReconcileTableFunction)); + registry.register(MigrateRegionFunction::factory()); + registry.register(AddRegionFollowerFunction::factory()); + registry.register(RemoveRegionFollowerFunction::factory()); + registry.register(FlushRegionFunction::factory()); + registry.register(CompactRegionFunction::factory()); + registry.register(FlushTableFunction::factory()); + registry.register(CompactTableFunction::factory()); + registry.register(FlushFlowFunction::factory()); + registry.register(ReconcileCatalogFunction::factory()); + registry.register(ReconcileDatabaseFunction::factory()); + registry.register(ReconcileTableFunction::factory()); } } diff --git a/src/common/function/src/admin/add_region_follower.rs b/src/common/function/src/admin/add_region_follower.rs index 757c715ddd..976c6848a3 100644 --- a/src/common/function/src/admin/add_region_follower.rs +++ b/src/common/function/src/admin/add_region_follower.rs @@ -18,7 +18,8 @@ use common_query::error::{ InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result, UnsupportedInputDataTypeSnafu, }; -use common_query::prelude::{Signature, TypeSignature, Volatility}; +use datafusion_expr::{Signature, TypeSignature, Volatility}; +use datatypes::data_type::DataType; use datatypes::prelude::ConcreteDataType; use datatypes::value::{Value, ValueRef}; use session::context::QueryContextRef; @@ -82,7 +83,13 @@ fn signature() -> Signature { Signature::one_of( vec![ // add_region_follower(region_id, peer) - TypeSignature::Uniform(2, ConcreteDataType::numerics()), + TypeSignature::Uniform( + 2, + ConcreteDataType::numerics() + .into_iter() + .map(|dt| dt.as_arrow_type()) + .collect(), + ), ], Volatility::Immutable, ) @@ -92,38 +99,57 @@ fn signature() -> Signature { mod tests { use std::sync::Arc; - use common_query::prelude::TypeSignature; - use datatypes::vectors::{UInt64Vector, VectorRef}; + use arrow::array::UInt64Array; + use arrow::datatypes::{DataType, Field}; + use 
datafusion_expr::ColumnarValue; use super::*; - use crate::function::{AsyncFunction, FunctionContext}; + use crate::function::FunctionContext; + use crate::function_factory::ScalarFunctionFactory; #[test] fn test_add_region_follower_misc() { - let f = AddRegionFollowerFunction; + let factory: ScalarFunctionFactory = AddRegionFollowerFunction::factory().into(); + let f = factory.provide(FunctionContext::mock()); assert_eq!("add_region_follower", f.name()); - assert_eq!( - ConcreteDataType::uint64_datatype(), - f.return_type(&[]).unwrap() - ); + assert_eq!(DataType::UInt64, f.return_type(&[]).unwrap()); assert!(matches!(f.signature(), - Signature { - type_signature: TypeSignature::OneOf(sigs), - volatility: Volatility::Immutable + datafusion_expr::Signature { + type_signature: datafusion_expr::TypeSignature::OneOf(sigs), + volatility: datafusion_expr::Volatility::Immutable } if sigs.len() == 1)); } #[tokio::test] async fn test_add_region_follower() { - let f = AddRegionFollowerFunction; - let args = vec![1, 1]; - let args = args - .into_iter() - .map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _) - .collect::<Vec<_>>(); + let factory: ScalarFunctionFactory = AddRegionFollowerFunction::factory().into(); + let provider = factory.provide(FunctionContext::mock()); + let f = provider.as_async().unwrap(); - let result = f.eval(FunctionContext::mock(), &args).await.unwrap(); - let expect: VectorRef = Arc::new(UInt64Vector::from_slice([0u64])); - assert_eq!(result, expect); + let func_args = datafusion::logical_expr::ScalarFunctionArgs { + args: vec![ + ColumnarValue::Array(Arc::new(UInt64Array::from(vec![1]))), + ColumnarValue::Array(Arc::new(UInt64Array::from(vec![2]))), + ], + arg_fields: vec![ + Arc::new(Field::new("arg_0", DataType::UInt64, false)), + Arc::new(Field::new("arg_1", DataType::UInt64, false)), + ], + return_field: Arc::new(Field::new("result", DataType::UInt64, true)), + number_rows: 1, + config_options: Arc::new(datafusion_common::config::ConfigOptions::default()), + }; + + let result = f.invoke_async_with_args(func_args).await.unwrap(); + + match result { + ColumnarValue::Array(array) => { + let result_array = array.as_any().downcast_ref::<UInt64Array>().unwrap(); + assert_eq!(result_array.value(0), 0u64); + } + ColumnarValue::Scalar(scalar) => { + assert_eq!(scalar, datafusion_common::ScalarValue::UInt64(Some(0))); + } + } } } diff --git a/src/common/function/src/admin/flush_compact_region.rs b/src/common/function/src/admin/flush_compact_region.rs index 17e5ee712f..3b5be3dc0c 100644 --- a/src/common/function/src/admin/flush_compact_region.rs +++ b/src/common/function/src/admin/flush_compact_region.rs @@ -16,7 +16,8 @@ use common_macro::admin_fn; use common_query::error::{ InvalidFuncArgsSnafu, MissingTableMutationHandlerSnafu, Result, UnsupportedInputDataTypeSnafu, }; -use common_query::prelude::{Signature, Volatility}; +use datafusion_expr::{Signature, Volatility}; +use datatypes::data_type::DataType; use datatypes::prelude::*; use session::context::QueryContextRef; use snafu::ensure; @@ -66,71 +67,99 @@ define_region_function!(FlushRegionFunction, flush_region, flush_region); define_region_function!(CompactRegionFunction, compact_region, compact_region); fn signature() -> Signature { - Signature::uniform(1, ConcreteDataType::numerics(), Volatility::Immutable) + Signature::uniform( + 1, + ConcreteDataType::numerics() + .into_iter() + .map(|dt| dt.as_arrow_type()) + .collect(), + Volatility::Immutable, + ) } #[cfg(test)] mod tests { use std::sync::Arc; - use 
common_query::prelude::TypeSignature; - use datatypes::vectors::UInt64Vector; + use arrow::array::UInt64Array; + use arrow::datatypes::{DataType, Field}; + use datafusion_expr::ColumnarValue; use super::*; - use crate::function::{AsyncFunction, FunctionContext}; + use crate::function::FunctionContext; + use crate::function_factory::ScalarFunctionFactory; macro_rules! define_region_function_test { ($name: ident, $func: ident) => { paste::paste! { #[test] fn [<test_ $name _misc>]() { - let f = $func; + let factory: ScalarFunctionFactory = $func::factory().into(); + let f = factory.provide(FunctionContext::mock()); assert_eq!(stringify!($name), f.name()); assert_eq!( - ConcreteDataType::uint64_datatype(), + DataType::UInt64, f.return_type(&[]).unwrap() ); assert!(matches!(f.signature(), - Signature { - type_signature: TypeSignature::Uniform(1, valid_types), - volatility: Volatility::Immutable - } if valid_types == ConcreteDataType::numerics())); + datafusion_expr::Signature { + type_signature: datafusion_expr::TypeSignature::Uniform(1, valid_types), + volatility: datafusion_expr::Volatility::Immutable + } if valid_types == &ConcreteDataType::numerics().into_iter().map(|dt| { use datatypes::data_type::DataType; dt.as_arrow_type() }).collect::<Vec<_>>())); } #[tokio::test] async fn [<test_ $name _missing_table_mutation>]() { - let f = $func; + let factory: ScalarFunctionFactory = $func::factory().into(); + let provider = factory.provide(FunctionContext::default()); + let f = provider.as_async().unwrap(); - let args = vec![99]; - - let args = args - .into_iter() - .map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _) - .collect::<Vec<_>>(); - - let result = f.eval(FunctionContext::default(), &args).await.unwrap_err(); + let func_args = datafusion::logical_expr::ScalarFunctionArgs { + args: vec![ + ColumnarValue::Array(Arc::new(UInt64Array::from(vec![99]))), + ], + arg_fields: vec![ + Arc::new(Field::new("arg_0", DataType::UInt64, false)), + ], + return_field: Arc::new(Field::new("result", DataType::UInt64, true)), + number_rows: 1, + config_options: Arc::new(datafusion_common::config::ConfigOptions::default()), + }; + let result = f.invoke_async_with_args(func_args).await.unwrap_err(); assert_eq!( - "Missing TableMutationHandler, not expected", + "Execution error: Handler error: Missing TableMutationHandler, not expected", result.to_string() ); } #[tokio::test] async fn [<test_ $name>]() { - let f = $func; + let factory: ScalarFunctionFactory = $func::factory().into(); + let provider = factory.provide(FunctionContext::mock()); + let f = provider.as_async().unwrap(); + let func_args = datafusion::logical_expr::ScalarFunctionArgs { + args: vec![ + ColumnarValue::Array(Arc::new(UInt64Array::from(vec![99]))), + ], + arg_fields: vec![ + Arc::new(Field::new("arg_0", DataType::UInt64, false)), + ], + return_field: Arc::new(Field::new("result", DataType::UInt64, true)), + number_rows: 1, + config_options: Arc::new(datafusion_common::config::ConfigOptions::default()), + }; + let result = f.invoke_async_with_args(func_args).await.unwrap(); - let args = vec![99]; - - let args = args - .into_iter() - .map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _) - .collect::<Vec<_>>(); - - let result = f.eval(FunctionContext::mock(), &args).await.unwrap(); - - let expect: VectorRef = Arc::new(UInt64Vector::from_slice([42])); - assert_eq!(expect, result); + match result { + ColumnarValue::Array(array) => { + let result_array = array.as_any().downcast_ref::<UInt64Array>().unwrap(); + assert_eq!(result_array.value(0), 42u64); + } + ColumnarValue::Scalar(scalar) => { + assert_eq!(scalar, 
datafusion_common::ScalarValue::UInt64(Some(42))); + } + } } } }; diff --git a/src/common/function/src/admin/flush_compact_table.rs b/src/common/function/src/admin/flush_compact_table.rs index e946a38194..7ae56f3f05 100644 --- a/src/common/function/src/admin/flush_compact_table.rs +++ b/src/common/function/src/admin/flush_compact_table.rs @@ -15,14 +15,15 @@ use std::str::FromStr; use api::v1::region::{compact_request, StrictWindow}; +use arrow::datatypes::DataType as ArrowDataType; use common_error::ext::BoxedError; use common_macro::admin_fn; use common_query::error::{ InvalidFuncArgsSnafu, MissingTableMutationHandlerSnafu, Result, TableMutationSnafu, UnsupportedInputDataTypeSnafu, }; -use common_query::prelude::{Signature, Volatility}; use common_telemetry::info; +use datafusion_expr::{Signature, Volatility}; use datatypes::prelude::*; use session::context::QueryContextRef; use session::table_name::table_name_to_full_name; @@ -105,18 +106,11 @@ pub(crate) async fn compact_table( } fn flush_signature() -> Signature { - Signature::uniform( - 1, - vec![ConcreteDataType::string_datatype()], - Volatility::Immutable, - ) + Signature::uniform(1, vec![ArrowDataType::Utf8], Volatility::Immutable) } fn compact_signature() -> Signature { - Signature::variadic( - vec![ConcreteDataType::string_datatype()], - Volatility::Immutable, - ) + Signature::variadic(vec![ArrowDataType::Utf8], Volatility::Immutable) } /// Parses `compact_table` UDF parameters. This function accepts following combinations: @@ -204,66 +198,87 @@ mod tests { use std::sync::Arc; use api::v1::region::compact_request::Options; + use arrow::array::StringArray; + use arrow::datatypes::{DataType, Field}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; - use common_query::prelude::TypeSignature; - use datatypes::vectors::{StringVector, UInt64Vector}; + use datafusion_expr::ColumnarValue; use session::context::QueryContext; use super::*; - use crate::function::{AsyncFunction, FunctionContext}; + use crate::function::FunctionContext; + use crate::function_factory::ScalarFunctionFactory; macro_rules! 
define_table_function_test { ($name: ident, $func: ident) => { paste::paste!{ #[test] fn [<test_ $name _misc>]() { - let f = $func; + let factory: ScalarFunctionFactory = $func::factory().into(); + let f = factory.provide(FunctionContext::mock()); assert_eq!(stringify!($name), f.name()); assert_eq!( - ConcreteDataType::uint64_datatype(), + DataType::UInt64, f.return_type(&[]).unwrap() ); assert!(matches!(f.signature(), - Signature { - type_signature: TypeSignature::Uniform(1, valid_types), - volatility: Volatility::Immutable - } if valid_types == vec![ConcreteDataType::string_datatype()])); + datafusion_expr::Signature { + type_signature: datafusion_expr::TypeSignature::Uniform(1, valid_types), + volatility: datafusion_expr::Volatility::Immutable + } if valid_types == &vec![ArrowDataType::Utf8])); } #[tokio::test] async fn [<test_ $name _missing_table_mutation>]() { - let f = $func; + let factory: ScalarFunctionFactory = $func::factory().into(); + let provider = factory.provide(FunctionContext::default()); + let f = provider.as_async().unwrap(); - let args = vec!["test"]; - - let args = args - .into_iter() - .map(|arg| Arc::new(StringVector::from(vec![arg])) as _) - .collect::<Vec<_>>(); - - let result = f.eval(FunctionContext::default(), &args).await.unwrap_err(); + let func_args = datafusion::logical_expr::ScalarFunctionArgs { + args: vec![ + ColumnarValue::Array(Arc::new(StringArray::from(vec!["test"]))), + ], + arg_fields: vec![ + Arc::new(Field::new("arg_0", DataType::Utf8, false)), + ], + return_field: Arc::new(Field::new("result", DataType::UInt64, true)), + number_rows: 1, + config_options: Arc::new(datafusion_common::config::ConfigOptions::default()), + }; + let result = f.invoke_async_with_args(func_args).await.unwrap_err(); assert_eq!( - "Missing TableMutationHandler, not expected", + "Execution error: Handler error: Missing TableMutationHandler, not expected", result.to_string() ); } #[tokio::test] async fn [<test_ $name>]() { - let f = $func; + let factory: ScalarFunctionFactory = $func::factory().into(); + let provider = factory.provide(FunctionContext::mock()); + let f = provider.as_async().unwrap(); + let func_args = datafusion::logical_expr::ScalarFunctionArgs { + args: vec![ + ColumnarValue::Array(Arc::new(StringArray::from(vec!["test"]))), + ], + arg_fields: vec![ + Arc::new(Field::new("arg_0", DataType::Utf8, false)), + ], + return_field: Arc::new(Field::new("result", DataType::UInt64, true)), + number_rows: 1, + config_options: Arc::new(datafusion_common::config::ConfigOptions::default()), + }; + let result = f.invoke_async_with_args(func_args).await.unwrap(); - let args = vec!["test"]; - - let args = args - .into_iter() - .map(|arg| Arc::new(StringVector::from(vec![arg])) as _) - .collect::<Vec<_>>(); - - let result = f.eval(FunctionContext::mock(), &args).await.unwrap(); - - let expect: VectorRef = Arc::new(UInt64Vector::from_slice([42])); - assert_eq!(expect, result); + match result { + ColumnarValue::Array(array) => { + let result_array = array.as_any().downcast_ref::<UInt64Array>().unwrap(); + assert_eq!(result_array.value(0), 42u64); + } + ColumnarValue::Scalar(scalar) => { + assert_eq!(scalar, datafusion_common::ScalarValue::UInt64(Some(42))); + } + } } } } diff --git a/src/common/function/src/admin/migrate_region.rs b/src/common/function/src/admin/migrate_region.rs index b1f79c0c07..b980cb4cb9 100644 --- a/src/common/function/src/admin/migrate_region.rs +++ b/src/common/function/src/admin/migrate_region.rs @@ -17,7 +17,8 @@ use std::time::Duration; use common_macro::admin_fn; use common_meta::rpc::procedure::MigrateRegionRequest; use 
common_query::error::{InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result}; -use common_query::prelude::{Signature, TypeSignature, Volatility}; +use datafusion_expr::{Signature, TypeSignature, Volatility}; +use datatypes::data_type::DataType; use datatypes::prelude::ConcreteDataType; use datatypes::value::{Value, ValueRef}; use session::context::QueryContextRef; @@ -103,9 +104,21 @@ fn signature() -> Signature { Signature::one_of( vec![ // migrate_region(region_id, from_peer, to_peer) - TypeSignature::Uniform(3, ConcreteDataType::numerics()), + TypeSignature::Uniform( + 3, + ConcreteDataType::numerics() + .into_iter() + .map(|dt| dt.as_arrow_type()) + .collect(), + ), // migrate_region(region_id, from_peer, to_peer, timeout(secs)) - TypeSignature::Uniform(4, ConcreteDataType::numerics()), + TypeSignature::Uniform( + 4, + ConcreteDataType::numerics() + .into_iter() + .map(|dt| dt.as_arrow_type()) + .collect(), + ), ], Volatility::Immutable, ) @@ -115,59 +128,89 @@ mod tests { use std::sync::Arc; - use common_query::prelude::TypeSignature; - use datatypes::vectors::{StringVector, UInt64Vector, VectorRef}; + use arrow::array::{StringArray, UInt64Array}; + use arrow::datatypes::{DataType, Field}; + use datafusion_expr::ColumnarValue; use super::*; - use crate::function::{AsyncFunction, FunctionContext}; + use crate::function::FunctionContext; + use crate::function_factory::ScalarFunctionFactory; #[test] fn test_migrate_region_misc() { - let f = MigrateRegionFunction; + let factory: ScalarFunctionFactory = MigrateRegionFunction::factory().into(); + let f = factory.provide(FunctionContext::mock()); assert_eq!("migrate_region", f.name()); - assert_eq!( - ConcreteDataType::string_datatype(), - f.return_type(&[]).unwrap() - ); + assert_eq!(DataType::Utf8, f.return_type(&[]).unwrap()); assert!(matches!(f.signature(), - Signature { - type_signature: TypeSignature::OneOf(sigs), - volatility: Volatility::Immutable + datafusion_expr::Signature { + type_signature: datafusion_expr::TypeSignature::OneOf(sigs), + volatility: datafusion_expr::Volatility::Immutable } if sigs.len() == 2)); } #[tokio::test] async fn test_missing_procedure_service() { - let f = MigrateRegionFunction; + let factory: ScalarFunctionFactory = MigrateRegionFunction::factory().into(); + let provider = factory.provide(FunctionContext::default()); + let f = provider.as_async().unwrap(); - let args = vec![1, 1, 1]; - - let args = args - .into_iter() - .map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _) - .collect::<Vec<_>>(); - - let result = f.eval(FunctionContext::default(), &args).await.unwrap_err(); + let func_args = datafusion::logical_expr::ScalarFunctionArgs { + args: vec![ + ColumnarValue::Array(Arc::new(UInt64Array::from(vec![1]))), + ColumnarValue::Array(Arc::new(UInt64Array::from(vec![1]))), + ColumnarValue::Array(Arc::new(UInt64Array::from(vec![1]))), + ], + arg_fields: vec![ + Arc::new(Field::new("arg_0", DataType::UInt64, false)), + Arc::new(Field::new("arg_1", DataType::UInt64, false)), + Arc::new(Field::new("arg_2", DataType::UInt64, false)), + ], + return_field: Arc::new(Field::new("result", DataType::Utf8, true)), + number_rows: 1, + config_options: Arc::new(datafusion_common::config::ConfigOptions::default()), + }; + let result = f.invoke_async_with_args(func_args).await.unwrap_err(); assert_eq!( - "Missing ProcedureServiceHandler, not expected", + "Execution error: Handler error: Missing ProcedureServiceHandler, not expected", result.to_string() ); } #[tokio::test] async fn 
test_migrate_region() { - let f = MigrateRegionFunction; + let factory: ScalarFunctionFactory = MigrateRegionFunction::factory().into(); + let provider = factory.provide(FunctionContext::mock()); + let f = provider.as_async().unwrap(); - let args = vec![1, 1, 1]; + let func_args = datafusion::logical_expr::ScalarFunctionArgs { + args: vec![ + ColumnarValue::Array(Arc::new(UInt64Array::from(vec![1]))), + ColumnarValue::Array(Arc::new(UInt64Array::from(vec![1]))), + ColumnarValue::Array(Arc::new(UInt64Array::from(vec![1]))), + ], + arg_fields: vec![ + Arc::new(Field::new("arg_0", DataType::UInt64, false)), + Arc::new(Field::new("arg_1", DataType::UInt64, false)), + Arc::new(Field::new("arg_2", DataType::UInt64, false)), + ], + return_field: Arc::new(Field::new("result", DataType::Utf8, true)), + number_rows: 1, + config_options: Arc::new(datafusion_common::config::ConfigOptions::default()), + }; + let result = f.invoke_async_with_args(func_args).await.unwrap(); - let args = args - .into_iter() - .map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _) - .collect::<Vec<_>>(); - - let result = f.eval(FunctionContext::mock(), &args).await.unwrap(); - - let expect: VectorRef = Arc::new(StringVector::from(vec!["test_pid"])); - assert_eq!(expect, result); + match result { + ColumnarValue::Array(array) => { + let result_array = array.as_any().downcast_ref::<StringArray>().unwrap(); + assert_eq!(result_array.value(0), "test_pid"); + } + ColumnarValue::Scalar(scalar) => { + assert_eq!( + scalar, + datafusion_common::ScalarValue::Utf8(Some("test_pid".to_string())) + ); + } + } } } diff --git a/src/common/function/src/admin/reconcile_catalog.rs b/src/common/function/src/admin/reconcile_catalog.rs index fc2fec3273..d860970d61 100644 --- a/src/common/function/src/admin/reconcile_catalog.rs +++ b/src/common/function/src/admin/reconcile_catalog.rs @@ -14,13 +14,15 @@ use api::v1::meta::reconcile_request::Target; use api::v1::meta::{ReconcileCatalog, ReconcileRequest}; +use arrow::datatypes::DataType as ArrowDataType; use common_macro::admin_fn; use common_query::error::{ InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result, UnsupportedInputDataTypeSnafu, }; -use common_query::prelude::{Signature, TypeSignature, Volatility}; use common_telemetry::info; +use datafusion_expr::{Signature, TypeSignature, Volatility}; +use datatypes::data_type::DataType; use datatypes::prelude::*; use session::context::QueryContextRef; @@ -104,15 +106,15 @@ fn signature() -> Signature { let mut signs = Vec::with_capacity(2 + nums.len()); signs.extend([ // reconcile_catalog() - TypeSignature::NullAry, + TypeSignature::Nullary, // reconcile_catalog(resolve_strategy) - TypeSignature::Exact(vec![ConcreteDataType::string_datatype()]), + TypeSignature::Exact(vec![ArrowDataType::Utf8]), ]); for sign in nums { // reconcile_catalog(resolve_strategy, parallelism) signs.push(TypeSignature::Exact(vec![ - ConcreteDataType::string_datatype(), - sign, + ArrowDataType::Utf8, + sign.as_arrow_type(), ])); } Signature::one_of(signs, Volatility::Immutable) } #[cfg(test)] mod tests { - use std::assert_matches::assert_matches; use std::sync::Arc; - use common_query::error::Error; - use datatypes::vectors::{StringVector, UInt64Vector, VectorRef}; + use arrow::array::{StringArray, UInt64Array}; + use arrow::datatypes::{DataType, Field}; + use datafusion_expr::ColumnarValue; use crate::admin::reconcile_catalog::ReconcileCatalogFunction; - use crate::function::{AsyncFunction, FunctionContext}; + use 
crate::function::FunctionContext; + use crate::function_factory::ScalarFunctionFactory; #[tokio::test] async fn test_reconcile_catalog() { common_telemetry::init_default_ut_logging(); // reconcile_catalog() - let f = ReconcileCatalogFunction; - let args = vec![]; - let result = f.eval(FunctionContext::mock(), &args).await.unwrap(); - let expect: VectorRef = Arc::new(StringVector::from(vec!["test_pid"])); - assert_eq!(expect, result); + let factory: ScalarFunctionFactory = ReconcileCatalogFunction::factory().into(); + let provider = factory.provide(FunctionContext::mock()); + let f = provider.as_async().unwrap(); + + let func_args = datafusion::logical_expr::ScalarFunctionArgs { + args: vec![], + arg_fields: vec![], + return_field: Arc::new(Field::new("result", DataType::Utf8, true)), + number_rows: 1, + config_options: Arc::new(datafusion_common::config::ConfigOptions::default()), + }; + + let result = f.invoke_async_with_args(func_args).await.unwrap(); + match result { + ColumnarValue::Array(array) => { + let result_array = array.as_any().downcast_ref::<StringArray>().unwrap(); + assert_eq!(result_array.value(0), "test_pid"); + } + ColumnarValue::Scalar(scalar) => { + assert_eq!( + scalar, + datafusion_common::ScalarValue::Utf8(Some("test_pid".to_string())) + ); + } + } // reconcile_catalog(resolve_strategy) - let f = ReconcileCatalogFunction; - let args = vec![Arc::new(StringVector::from(vec!["UseMetasrv"])) as _]; - let result = f.eval(FunctionContext::mock(), &args).await.unwrap(); - let expect: VectorRef = Arc::new(StringVector::from(vec!["test_pid"])); - assert_eq!(expect, result); + let factory: ScalarFunctionFactory = ReconcileCatalogFunction::factory().into(); + let provider = factory.provide(FunctionContext::mock()); + let f = provider.as_async().unwrap(); + + let func_args = datafusion::logical_expr::ScalarFunctionArgs { + args: vec![ColumnarValue::Array(Arc::new(StringArray::from(vec![ + "UseMetasrv", + ])))], + arg_fields: vec![Arc::new(Field::new("arg_0", DataType::Utf8, false))], + return_field: Arc::new(Field::new("result", DataType::Utf8, true)), + number_rows: 1, + config_options: Arc::new(datafusion_common::config::ConfigOptions::default()), + }; + let result = f.invoke_async_with_args(func_args).await.unwrap(); + match result { + ColumnarValue::Array(array) => { + let result_array = array.as_any().downcast_ref::<StringArray>().unwrap(); + assert_eq!(result_array.value(0), "test_pid"); + } + ColumnarValue::Scalar(scalar) => { + assert_eq!( + scalar, + datafusion_common::ScalarValue::Utf8(Some("test_pid".to_string())) + ); + } + } // reconcile_catalog(resolve_strategy, parallelism) - let f = ReconcileCatalogFunction; - let args = vec![ - Arc::new(StringVector::from(vec!["UseLatest"])) as _, - Arc::new(UInt64Vector::from_slice([10])) as _, - ]; - let result = f.eval(FunctionContext::mock(), &args).await.unwrap(); - let expect: VectorRef = Arc::new(StringVector::from(vec!["test_pid"])); - assert_eq!(expect, result); + let factory: ScalarFunctionFactory = ReconcileCatalogFunction::factory().into(); + let provider = factory.provide(FunctionContext::mock()); + let f = provider.as_async().unwrap(); + + let func_args = datafusion::logical_expr::ScalarFunctionArgs { + args: vec![ + ColumnarValue::Array(Arc::new(StringArray::from(vec!["UseLatest"]))), + ColumnarValue::Array(Arc::new(UInt64Array::from(vec![10]))), + ], + arg_fields: vec![ + Arc::new(Field::new("arg_0", DataType::Utf8, false)), + Arc::new(Field::new("arg_1", DataType::UInt64, false)), + ], + return_field: Arc::new(Field::new("result", 
DataType::Utf8, true)), + number_rows: 1, + config_options: Arc::new(datafusion_common::config::ConfigOptions::default()), + }; + let result = f.invoke_async_with_args(func_args).await.unwrap(); + match result { + ColumnarValue::Array(array) => { + let result_array = array.as_any().downcast_ref::<StringArray>().unwrap(); + assert_eq!(result_array.value(0), "test_pid"); + } + ColumnarValue::Scalar(scalar) => { + assert_eq!( + scalar, + datafusion_common::ScalarValue::Utf8(Some("test_pid".to_string())) + ); + } + } // unsupported input data type - let f = ReconcileCatalogFunction; - let args = vec![ - Arc::new(StringVector::from(vec!["UseLatest"])) as _, - Arc::new(StringVector::from(vec!["test"])) as _, - ]; - let err = f.eval(FunctionContext::mock(), &args).await.unwrap_err(); - assert_matches!(err, Error::UnsupportedInputDataType { .. }); + let factory: ScalarFunctionFactory = ReconcileCatalogFunction::factory().into(); + let provider = factory.provide(FunctionContext::mock()); + let f = provider.as_async().unwrap(); + + let func_args = datafusion::logical_expr::ScalarFunctionArgs { + args: vec![ + ColumnarValue::Array(Arc::new(StringArray::from(vec!["UseLatest"]))), + ColumnarValue::Array(Arc::new(StringArray::from(vec!["test"]))), + ], + arg_fields: vec![ + Arc::new(Field::new("arg_0", DataType::Utf8, false)), + Arc::new(Field::new("arg_1", DataType::Utf8, false)), + ], + return_field: Arc::new(Field::new("result", DataType::Utf8, true)), + number_rows: 1, + config_options: Arc::new(datafusion_common::config::ConfigOptions::default()), + }; + let _err = f.invoke_async_with_args(func_args).await.unwrap_err(); + // Note: Error type is DataFusionError at this level, not common_query::Error // invalid function args - let f = ReconcileCatalogFunction; - let args = vec![ - Arc::new(StringVector::from(vec!["UseLatest"])) as _, - Arc::new(UInt64Vector::from_slice([10])) as _, - Arc::new(StringVector::from(vec!["10"])) as _, - ]; - let err = f.eval(FunctionContext::mock(), &args).await.unwrap_err(); - assert_matches!(err, Error::InvalidFuncArgs { .. 
}); + let factory: ScalarFunctionFactory = ReconcileCatalogFunction::factory().into(); + let provider = factory.provide(FunctionContext::mock()); + let f = provider.as_async().unwrap(); + + let func_args = datafusion::logical_expr::ScalarFunctionArgs { + args: vec![ + ColumnarValue::Array(Arc::new(StringArray::from(vec!["UseLatest"]))), + ColumnarValue::Array(Arc::new(UInt64Array::from(vec![10]))), + ColumnarValue::Array(Arc::new(StringArray::from(vec!["10"]))), + ], + arg_fields: vec![ + Arc::new(Field::new("arg_0", DataType::Utf8, false)), + Arc::new(Field::new("arg_1", DataType::UInt64, false)), + Arc::new(Field::new("arg_2", DataType::Utf8, false)), + ], + return_field: Arc::new(Field::new("result", DataType::Utf8, true)), + number_rows: 1, + config_options: Arc::new(datafusion_common::config::ConfigOptions::default()), + }; + let _err = f.invoke_async_with_args(func_args).await.unwrap_err(); + // Note: Error type is DataFusionError at this level, not common_query::Error } } diff --git a/src/common/function/src/admin/reconcile_database.rs b/src/common/function/src/admin/reconcile_database.rs index 622d2bb069..020a7f4cf0 100644 --- a/src/common/function/src/admin/reconcile_database.rs +++ b/src/common/function/src/admin/reconcile_database.rs @@ -14,13 +14,15 @@ use api::v1::meta::reconcile_request::Target; use api::v1::meta::{ReconcileDatabase, ReconcileRequest}; +use arrow::datatypes::DataType as ArrowDataType; use common_macro::admin_fn; use common_query::error::{ InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result, UnsupportedInputDataTypeSnafu, }; -use common_query::prelude::{Signature, TypeSignature, Volatility}; use common_telemetry::info; +use datafusion_expr::{Signature, TypeSignature, Volatility}; +use datatypes::data_type::DataType; use datatypes::prelude::*; use session::context::QueryContextRef; @@ -113,19 +115,16 @@ fn signature() -> Signature { let mut signs = Vec::with_capacity(2 + nums.len()); signs.extend([ // reconcile_database(datanode_name) - TypeSignature::Exact(vec![ConcreteDataType::string_datatype()]), + TypeSignature::Exact(vec![ArrowDataType::Utf8]), // reconcile_database(database_name, resolve_strategy) - TypeSignature::Exact(vec![ - ConcreteDataType::string_datatype(), - ConcreteDataType::string_datatype(), - ]), + TypeSignature::Exact(vec![ArrowDataType::Utf8, ArrowDataType::Utf8]), ]); for sign in nums { // reconcile_database(database_name, resolve_strategy, parallelism) signs.push(TypeSignature::Exact(vec![ - ConcreteDataType::string_datatype(), - ConcreteDataType::string_datatype(), - sign, + ArrowDataType::Utf8, + ArrowDataType::Utf8, + sign.as_arrow_type(), ])); } Signature::one_of(signs, Volatility::Immutable) @@ -133,66 +132,160 @@ fn signature() -> Signature { #[cfg(test)] mod tests { - use std::assert_matches::assert_matches; use std::sync::Arc; - use common_query::error::Error; - use datatypes::vectors::{StringVector, UInt32Vector, VectorRef}; + use arrow::array::{StringArray, UInt32Array}; + use arrow::datatypes::{DataType, Field}; + use datafusion_expr::ColumnarValue; use crate::admin::reconcile_database::ReconcileDatabaseFunction; - use crate::function::{AsyncFunction, FunctionContext}; + use crate::function::FunctionContext; + use crate::function_factory::ScalarFunctionFactory; #[tokio::test] async fn test_reconcile_catalog() { common_telemetry::init_default_ut_logging(); // reconcile_database(database_name) - let f = ReconcileDatabaseFunction; - let args = vec![Arc::new(StringVector::from(vec!["test"])) as _]; - let result = 
f.eval(FunctionContext::mock(), &args).await.unwrap(); - let expect: VectorRef = Arc::new(StringVector::from(vec!["test_pid"])); - assert_eq!(expect, result); + let factory: ScalarFunctionFactory = ReconcileDatabaseFunction::factory().into(); + let provider = factory.provide(FunctionContext::mock()); + let f = provider.as_async().unwrap(); + + let func_args = datafusion::logical_expr::ScalarFunctionArgs { + args: vec![ColumnarValue::Array(Arc::new(StringArray::from(vec![ + "test", + ])))], + arg_fields: vec![Arc::new(Field::new("arg_0", DataType::Utf8, false))], + return_field: Arc::new(Field::new("result", DataType::Utf8, true)), + number_rows: 1, + config_options: Arc::new(datafusion_common::config::ConfigOptions::default()), + }; + let result = f.invoke_async_with_args(func_args).await.unwrap(); + match result { + ColumnarValue::Array(array) => { + let result_array = array.as_any().downcast_ref::<StringArray>().unwrap(); + assert_eq!(result_array.value(0), "test_pid"); + } + ColumnarValue::Scalar(scalar) => { + assert_eq!( + scalar, + datafusion_common::ScalarValue::Utf8(Some("test_pid".to_string())) + ); + } + } // reconcile_database(database_name, resolve_strategy) - let f = ReconcileDatabaseFunction; - let args = vec![ - Arc::new(StringVector::from(vec!["test"])) as _, - Arc::new(StringVector::from(vec!["UseLatest"])) as _, - ]; - let result = f.eval(FunctionContext::mock(), &args).await.unwrap(); - let expect: VectorRef = Arc::new(StringVector::from(vec!["test_pid"])); - assert_eq!(expect, result); + let factory: ScalarFunctionFactory = ReconcileDatabaseFunction::factory().into(); + let provider = factory.provide(FunctionContext::mock()); + let f = provider.as_async().unwrap(); + + let func_args = datafusion::logical_expr::ScalarFunctionArgs { + args: vec![ + ColumnarValue::Array(Arc::new(StringArray::from(vec!["test"]))), + ColumnarValue::Array(Arc::new(StringArray::from(vec!["UseLatest"]))), + ], + arg_fields: vec![ + Arc::new(Field::new("arg_0", DataType::Utf8, false)), + Arc::new(Field::new("arg_1", DataType::Utf8, false)), + ], + return_field: Arc::new(Field::new("result", DataType::Utf8, true)), + number_rows: 1, + config_options: Arc::new(datafusion_common::config::ConfigOptions::default()), + }; + let result = f.invoke_async_with_args(func_args).await.unwrap(); + match result { + ColumnarValue::Array(array) => { + let result_array = array.as_any().downcast_ref::<StringArray>().unwrap(); + assert_eq!(result_array.value(0), "test_pid"); + } + ColumnarValue::Scalar(scalar) => { + assert_eq!( + scalar, + datafusion_common::ScalarValue::Utf8(Some("test_pid".to_string())) + ); + } + } // reconcile_database(database_name, resolve_strategy, parallelism) - let f = ReconcileDatabaseFunction; - let args = vec![ - Arc::new(StringVector::from(vec!["test"])) as _, - Arc::new(StringVector::from(vec!["UseLatest"])) as _, - Arc::new(UInt32Vector::from_slice([10])) as _, - ]; - let result = f.eval(FunctionContext::mock(), &args).await.unwrap(); - let expect: VectorRef = Arc::new(StringVector::from(vec!["test_pid"])); - assert_eq!(expect, result); + let factory: ScalarFunctionFactory = ReconcileDatabaseFunction::factory().into(); + let provider = factory.provide(FunctionContext::mock()); + let f = provider.as_async().unwrap(); + + let func_args = datafusion::logical_expr::ScalarFunctionArgs { + args: vec![ + ColumnarValue::Array(Arc::new(StringArray::from(vec!["test"]))), + ColumnarValue::Array(Arc::new(StringArray::from(vec!["UseLatest"]))), + ColumnarValue::Array(Arc::new(UInt32Array::from(vec![10]))), + ], + 
arg_fields: vec![ + Arc::new(Field::new("arg_0", DataType::Utf8, false)), + Arc::new(Field::new("arg_1", DataType::Utf8, false)), + Arc::new(Field::new("arg_2", DataType::UInt32, false)), + ], + return_field: Arc::new(Field::new("result", DataType::Utf8, true)), + number_rows: 1, + config_options: Arc::new(datafusion_common::config::ConfigOptions::default()), + }; + let result = f.invoke_async_with_args(func_args).await.unwrap(); + match result { + ColumnarValue::Array(array) => { + let result_array = array.as_any().downcast_ref::<StringArray>().unwrap(); + assert_eq!(result_array.value(0), "test_pid"); + } + ColumnarValue::Scalar(scalar) => { + assert_eq!( + scalar, + datafusion_common::ScalarValue::Utf8(Some("test_pid".to_string())) + ); + } + } // invalid function args - let f = ReconcileDatabaseFunction; - let args = vec![ - Arc::new(StringVector::from(vec!["UseLatest"])) as _, - Arc::new(UInt32Vector::from_slice([10])) as _, - Arc::new(StringVector::from(vec!["v1"])) as _, - Arc::new(StringVector::from(vec!["v2"])) as _, - ]; - let err = f.eval(FunctionContext::mock(), &args).await.unwrap_err(); - assert_matches!(err, Error::InvalidFuncArgs { .. }); + let factory: ScalarFunctionFactory = ReconcileDatabaseFunction::factory().into(); + let provider = factory.provide(FunctionContext::mock()); + let f = provider.as_async().unwrap(); + + let func_args = datafusion::logical_expr::ScalarFunctionArgs { + args: vec![ + ColumnarValue::Array(Arc::new(StringArray::from(vec!["UseLatest"]))), + ColumnarValue::Array(Arc::new(UInt32Array::from(vec![10]))), + ColumnarValue::Array(Arc::new(StringArray::from(vec!["v1"]))), + ColumnarValue::Array(Arc::new(StringArray::from(vec!["v2"]))), + ], + arg_fields: vec![ + Arc::new(Field::new("arg_0", DataType::Utf8, false)), + Arc::new(Field::new("arg_1", DataType::UInt32, false)), + Arc::new(Field::new("arg_2", DataType::Utf8, false)), + Arc::new(Field::new("arg_3", DataType::Utf8, false)), + ], + return_field: Arc::new(Field::new("result", DataType::Utf8, true)), + number_rows: 1, + config_options: Arc::new(datafusion_common::config::ConfigOptions::default()), + }; + let _err = f.invoke_async_with_args(func_args).await.unwrap_err(); + // Note: Error type is DataFusionError at this level, not common_query::Error // unsupported input data type - let f = ReconcileDatabaseFunction; - let args = vec![ - Arc::new(StringVector::from(vec!["UseLatest"])) as _, - Arc::new(UInt32Vector::from_slice([10])) as _, - Arc::new(StringVector::from(vec!["v1"])) as _, - ]; - let err = f.eval(FunctionContext::mock(), &args).await.unwrap_err(); - assert_matches!(err, Error::UnsupportedInputDataType { .. 
}); + let factory: ScalarFunctionFactory = ReconcileDatabaseFunction::factory().into(); + let provider = factory.provide(FunctionContext::mock()); + let f = provider.as_async().unwrap(); + + let func_args = datafusion::logical_expr::ScalarFunctionArgs { + args: vec![ + ColumnarValue::Array(Arc::new(StringArray::from(vec!["UseLatest"]))), + ColumnarValue::Array(Arc::new(UInt32Array::from(vec![10]))), + ColumnarValue::Array(Arc::new(StringArray::from(vec!["v1"]))), + ], + arg_fields: vec![ + Arc::new(Field::new("arg_0", DataType::Utf8, false)), + Arc::new(Field::new("arg_1", DataType::UInt32, false)), + Arc::new(Field::new("arg_2", DataType::Utf8, false)), + ], + return_field: Arc::new(Field::new("result", DataType::Utf8, true)), + number_rows: 1, + config_options: Arc::new(datafusion_common::config::ConfigOptions::default()), + }; + let _err = f.invoke_async_with_args(func_args).await.unwrap_err(); + // Note: Error type is DataFusionError at this level, not common_query::Error } } diff --git a/src/common/function/src/admin/reconcile_table.rs b/src/common/function/src/admin/reconcile_table.rs index 61e54e47bc..10d2c5fdbf 100644 --- a/src/common/function/src/admin/reconcile_table.rs +++ b/src/common/function/src/admin/reconcile_table.rs @@ -14,14 +14,15 @@ use api::v1::meta::reconcile_request::Target; use api::v1::meta::{ReconcileRequest, ReconcileTable, ResolveStrategy}; +use arrow::datatypes::DataType as ArrowDataType; use common_catalog::format_full_table_name; use common_error::ext::BoxedError; use common_macro::admin_fn; use common_query::error::{ MissingProcedureServiceHandlerSnafu, Result, TableMutationSnafu, UnsupportedInputDataTypeSnafu, }; -use common_query::prelude::{Signature, TypeSignature, Volatility}; use common_telemetry::info; +use datafusion_expr::{Signature, TypeSignature, Volatility}; use datatypes::prelude::*; use session::context::QueryContextRef; use session::table_name::table_name_to_full_name; @@ -93,12 +94,9 @@ fn signature() -> Signature { Signature::one_of( vec![ // reconcile_table(table_name) - TypeSignature::Exact(vec![ConcreteDataType::string_datatype()]), + TypeSignature::Exact(vec![ArrowDataType::Utf8]), // reconcile_table(table_name, resolve_strategy) - TypeSignature::Exact(vec![ - ConcreteDataType::string_datatype(), - ConcreteDataType::string_datatype(), - ]), + TypeSignature::Exact(vec![ArrowDataType::Utf8, ArrowDataType::Utf8]), ], Volatility::Immutable, ) @@ -106,44 +104,101 @@ fn signature() -> Signature { #[cfg(test)] mod tests { - use std::assert_matches::assert_matches; use std::sync::Arc; - use common_query::error::Error; - use datatypes::vectors::{StringVector, VectorRef}; + use arrow::array::StringArray; + use arrow::datatypes::{DataType, Field}; + use datafusion_expr::ColumnarValue; use crate::admin::reconcile_table::ReconcileTableFunction; - use crate::function::{AsyncFunction, FunctionContext}; + use crate::function::FunctionContext; + use crate::function_factory::ScalarFunctionFactory; #[tokio::test] async fn test_reconcile_table() { common_telemetry::init_default_ut_logging(); // reconcile_table(table_name) - let f = ReconcileTableFunction; - let args = vec![Arc::new(StringVector::from(vec!["test"])) as _]; - let result = f.eval(FunctionContext::mock(), &args).await.unwrap(); - let expect: VectorRef = Arc::new(StringVector::from(vec!["test_pid"])); - assert_eq!(expect, result); + let factory: ScalarFunctionFactory = ReconcileTableFunction::factory().into(); + let provider = factory.provide(FunctionContext::mock()); + let f = 
provider.as_async().unwrap(); + + let func_args = datafusion::logical_expr::ScalarFunctionArgs { + args: vec![ColumnarValue::Array(Arc::new(StringArray::from(vec![ + "test", + ])))], + arg_fields: vec![Arc::new(Field::new("arg_0", DataType::Utf8, false))], + return_field: Arc::new(Field::new("result", DataType::Utf8, true)), + number_rows: 1, + config_options: Arc::new(datafusion_common::config::ConfigOptions::default()), + }; + let result = f.invoke_async_with_args(func_args).await.unwrap(); + match result { + ColumnarValue::Array(array) => { + let result_array = array.as_any().downcast_ref::<StringArray>().unwrap(); + assert_eq!(result_array.value(0), "test_pid"); + } + ColumnarValue::Scalar(scalar) => { + assert_eq!( + scalar, + datafusion_common::ScalarValue::Utf8(Some("test_pid".to_string())) + ); + } + } // reconcile_table(table_name, resolve_strategy) - let f = ReconcileTableFunction; - let args = vec![ - Arc::new(StringVector::from(vec!["test"])) as _, - Arc::new(StringVector::from(vec!["UseMetasrv"])) as _, - ]; - let result = f.eval(FunctionContext::mock(), &args).await.unwrap(); - let expect: VectorRef = Arc::new(StringVector::from(vec!["test_pid"])); - assert_eq!(expect, result); + let factory: ScalarFunctionFactory = ReconcileTableFunction::factory().into(); + let provider = factory.provide(FunctionContext::mock()); + let f = provider.as_async().unwrap(); + + let func_args = datafusion::logical_expr::ScalarFunctionArgs { + args: vec![ + ColumnarValue::Array(Arc::new(StringArray::from(vec!["test"]))), + ColumnarValue::Array(Arc::new(StringArray::from(vec!["UseMetasrv"]))), + ], + arg_fields: vec![ + Arc::new(Field::new("arg_0", DataType::Utf8, false)), + Arc::new(Field::new("arg_1", DataType::Utf8, false)), + ], + return_field: Arc::new(Field::new("result", DataType::Utf8, true)), + number_rows: 1, + config_options: Arc::new(datafusion_common::config::ConfigOptions::default()), + }; + let result = f.invoke_async_with_args(func_args).await.unwrap(); + match result { + ColumnarValue::Array(array) => { + let result_array = array.as_any().downcast_ref::<StringArray>().unwrap(); + assert_eq!(result_array.value(0), "test_pid"); + } + ColumnarValue::Scalar(scalar) => { + assert_eq!( + scalar, + datafusion_common::ScalarValue::Utf8(Some("test_pid".to_string())) + ); + } + } // unsupported input data type - let f = ReconcileTableFunction; - let args = vec![ - Arc::new(StringVector::from(vec!["test"])) as _, - Arc::new(StringVector::from(vec!["UseMetasrv"])) as _, - Arc::new(StringVector::from(vec!["10"])) as _, - ]; - let err = f.eval(FunctionContext::mock(), &args).await.unwrap_err(); - assert_matches!(err, Error::UnsupportedInputDataType { .. 
}); + let factory: ScalarFunctionFactory = ReconcileTableFunction::factory().into(); + let provider = factory.provide(FunctionContext::mock()); + let f = provider.as_async().unwrap(); + + let func_args = datafusion::logical_expr::ScalarFunctionArgs { + args: vec![ + ColumnarValue::Array(Arc::new(StringArray::from(vec!["test"]))), + ColumnarValue::Array(Arc::new(StringArray::from(vec!["UseMetasrv"]))), + ColumnarValue::Array(Arc::new(StringArray::from(vec!["10"]))), + ], + arg_fields: vec![ + Arc::new(Field::new("arg_0", DataType::Utf8, false)), + Arc::new(Field::new("arg_1", DataType::Utf8, false)), + Arc::new(Field::new("arg_2", DataType::Utf8, false)), + ], + return_field: Arc::new(Field::new("result", DataType::Utf8, true)), + number_rows: 1, + config_options: Arc::new(datafusion_common::config::ConfigOptions::default()), + }; + let _err = f.invoke_async_with_args(func_args).await.unwrap_err(); + // Note: Error type is DataFusionError at this level, not common_query::Error } } diff --git a/src/common/function/src/admin/remove_region_follower.rs b/src/common/function/src/admin/remove_region_follower.rs index f63beec738..f461970b0c 100644 --- a/src/common/function/src/admin/remove_region_follower.rs +++ b/src/common/function/src/admin/remove_region_follower.rs @@ -18,7 +18,8 @@ use common_query::error::{ InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result, UnsupportedInputDataTypeSnafu, }; -use common_query::prelude::{Signature, TypeSignature, Volatility}; +use datafusion_expr::{Signature, TypeSignature, Volatility}; +use datatypes::data_type::DataType; use datatypes::prelude::ConcreteDataType; use datatypes::value::{Value, ValueRef}; use session::context::QueryContextRef; @@ -82,7 +83,13 @@ fn signature() -> Signature { Signature::one_of( vec![ // remove_region_follower(region_id, peer_id) - TypeSignature::Uniform(2, ConcreteDataType::numerics()), + TypeSignature::Uniform( + 2, + ConcreteDataType::numerics() + .into_iter() + .map(|dt| dt.as_arrow_type()) + .collect(), + ), ], Volatility::Immutable, ) @@ -92,38 +99,57 @@ mod tests { use std::sync::Arc; - use common_query::prelude::TypeSignature; - use datatypes::vectors::{UInt64Vector, VectorRef}; + use arrow::array::UInt64Array; + use arrow::datatypes::{DataType, Field}; + use datafusion_expr::ColumnarValue; use super::*; - use crate::function::{AsyncFunction, FunctionContext}; + use crate::function::FunctionContext; + use crate::function_factory::ScalarFunctionFactory; #[test] fn test_remove_region_follower_misc() { - let f = RemoveRegionFollowerFunction; + let factory: ScalarFunctionFactory = RemoveRegionFollowerFunction::factory().into(); + let f = factory.provide(FunctionContext::mock()); assert_eq!("remove_region_follower", f.name()); - assert_eq!( - ConcreteDataType::uint64_datatype(), - f.return_type(&[]).unwrap() - ); + assert_eq!(DataType::UInt64, f.return_type(&[]).unwrap()); assert!(matches!(f.signature(), - Signature { - type_signature: TypeSignature::OneOf(sigs), - volatility: Volatility::Immutable + datafusion_expr::Signature { + type_signature: datafusion_expr::TypeSignature::OneOf(sigs), + volatility: datafusion_expr::Volatility::Immutable } if sigs.len() == 1)); } #[tokio::test] async fn test_remove_region_follower() { - let f = RemoveRegionFollowerFunction; - let args = vec![1, 1]; - let args = args - .into_iter() - .map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _) - .collect::<Vec<_>>(); + let factory: ScalarFunctionFactory = RemoveRegionFollowerFunction::factory().into(); 
+ let provider = factory.provide(FunctionContext::mock()); + let f = provider.as_async().unwrap(); - let result = f.eval(FunctionContext::mock(), &args).await.unwrap(); - let expect: VectorRef = Arc::new(UInt64Vector::from_slice([0u64])); - assert_eq!(result, expect); + let func_args = datafusion::logical_expr::ScalarFunctionArgs { + args: vec![ + ColumnarValue::Array(Arc::new(UInt64Array::from(vec![1]))), + ColumnarValue::Array(Arc::new(UInt64Array::from(vec![1]))), + ], + arg_fields: vec![ + Arc::new(Field::new("arg_0", DataType::UInt64, false)), + Arc::new(Field::new("arg_1", DataType::UInt64, false)), + ], + return_field: Arc::new(Field::new("result", DataType::UInt64, true)), + number_rows: 1, + config_options: Arc::new(datafusion_common::config::ConfigOptions::default()), + }; + + let result = f.invoke_async_with_args(func_args).await.unwrap(); + + match result { + ColumnarValue::Array(array) => { + let result_array = array.as_any().downcast_ref::<UInt64Array>().unwrap(); + assert_eq!(result_array.value(0), 0u64); + } + ColumnarValue::Scalar(scalar) => { + assert_eq!(scalar, datafusion_common::ScalarValue::UInt64(Some(0))); + } + } } } diff --git a/src/common/function/src/flush_flow.rs b/src/common/function/src/flush_flow.rs index 3952fade52..68fcb3dd14 100644 --- a/src/common/function/src/flush_flow.rs +++ b/src/common/function/src/flush_flow.rs @@ -12,29 +12,24 @@ // See the License for the specific language governing permissions and // limitations under the License. +use arrow::datatypes::DataType as ArrowDataType; use common_error::ext::BoxedError; use common_macro::admin_fn; use common_query::error::{ ExecuteSnafu, InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result, UnsupportedInputDataTypeSnafu, }; -use common_query::prelude::Signature; -use datafusion::logical_expr::Volatility; +use datafusion_expr::{Signature, Volatility}; use datatypes::value::{Value, ValueRef}; use session::context::QueryContextRef; use snafu::{ensure, ResultExt}; use sql::ast::ObjectNamePartExt; use sql::parser::ParserContext; -use store_api::storage::ConcreteDataType; use crate::handlers::FlowServiceHandlerRef; fn flush_signature() -> Signature { - Signature::uniform( - 1, - vec![ConcreteDataType::string_datatype()], - Volatility::Immutable, - ) + Signature::uniform(1, vec![ArrowDataType::Utf8], Volatility::Immutable) } #[admin_fn( @@ -106,44 +101,55 @@ fn parse_flush_flow( mod test { use std::sync::Arc; - use datatypes::scalars::ScalarVector; - use datatypes::vectors::StringVector; use session::context::QueryContext; use super::*; - use crate::function::{AsyncFunction, FunctionContext}; + use crate::function::FunctionContext; + use crate::function_factory::ScalarFunctionFactory; #[test] fn test_flush_flow_metadata() { - let f = FlushFlowFunction; + let factory: ScalarFunctionFactory = FlushFlowFunction::factory().into(); + let f = factory.provide(FunctionContext::mock()); assert_eq!("flush_flow", f.name()); - assert_eq!( - ConcreteDataType::uint64_datatype(), - f.return_type(&[]).unwrap() - ); - assert_eq!( - f.signature(), - Signature::uniform( - 1, - vec![ConcreteDataType::string_datatype()], - Volatility::Immutable, - ) + assert_eq!(ArrowDataType::UInt64, f.return_type(&[]).unwrap()); + let expected_signature = datafusion_expr::Signature::uniform( + 1, + vec![ArrowDataType::Utf8], + datafusion_expr::Volatility::Immutable, ); + assert_eq!(*f.signature(), expected_signature); } #[tokio::test] async fn test_missing_flow_service() { - let f = FlushFlowFunction; + let factory: ScalarFunctionFactory = 
FlushFlowFunction::factory().into(); + let binding = factory.provide(FunctionContext::default()); + let f = binding.as_async().unwrap(); - let args = vec!["flow_name"]; - let args = args - .into_iter() - .map(|arg| Arc::new(StringVector::from_slice(&[arg])) as _) - .collect::<Vec<_>>(); + let flow_name_array = Arc::new(arrow::array::StringArray::from(vec!["flow_name"])); - let result = f.eval(FunctionContext::default(), &args).await.unwrap_err(); + let columnar_args = vec![datafusion_expr::ColumnarValue::Array(flow_name_array as _)]; + + let func_args = datafusion::logical_expr::ScalarFunctionArgs { + args: columnar_args, + arg_fields: vec![Arc::new(arrow::datatypes::Field::new( + "arg_0", + ArrowDataType::Utf8, + false, + ))], + return_field: Arc::new(arrow::datatypes::Field::new( + "result", + ArrowDataType::UInt64, + true, + )), + number_rows: 1, + config_options: Arc::new(datafusion_common::config::ConfigOptions::default()), + }; + + let result = f.invoke_async_with_args(func_args).await.unwrap_err(); assert_eq!( - "Missing FlowServiceHandler, not expected", + "Execution error: Handler error: Missing FlowServiceHandler, not expected", result.to_string() ); } diff --git a/src/common/function/src/function.rs b/src/common/function/src/function.rs index 1965218983..d561d5c11a 100644 --- a/src/common/function/src/function.rs +++ b/src/common/function/src/function.rs @@ -41,6 +41,12 @@ impl FunctionContext { } } +impl std::fmt::Display for FunctionContext { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "FunctionContext {{ query_ctx: {} }}", self.query_ctx) + } +} + impl Default for FunctionContext { fn default() -> Self { Self { @@ -67,22 +73,3 @@ pub trait Function: fmt::Display + Sync + Send { } pub type FunctionRef = Arc<dyn Function>; - -/// Async Scalar function trait -#[async_trait::async_trait] -pub trait AsyncFunction: fmt::Display + Sync + Send { - /// Returns the name of the function, should be unique. - fn name(&self) -> &str; - - /// The returned data type of function execution. - fn return_type(&self, input_types: &[ConcreteDataType]) -> Result<ConcreteDataType>; - - /// The signature of function. - fn signature(&self) -> Signature; - - /// Evaluate the function, e.g. run/execute the function. - /// TODO(dennis): simplify the signature and refactor all the admin functions. - async fn eval(&self, _func_ctx: FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef>; -} - -pub type AsyncFunctionRef = Arc<dyn AsyncFunction>; diff --git a/src/common/function/src/function_factory.rs b/src/common/function/src/function_factory.rs index 045692f187..76b88daf99 100644 --- a/src/common/function/src/function_factory.rs +++ b/src/common/function/src/function_factory.rs @@ -22,8 +22,8 @@ use crate::scalars::udf::create_udf; /// A factory for creating `ScalarUDF` that require a function context. 
#[derive(Clone)] pub struct ScalarFunctionFactory { - name: String, - factory: Arc<dyn Fn(FunctionContext) -> ScalarUDF + Send + Sync>, + pub(crate) name: String, + pub(crate) factory: Arc<dyn Fn(FunctionContext) -> ScalarUDF + Send + Sync>, } impl ScalarFunctionFactory { diff --git a/src/common/function/src/function_registry.rs b/src/common/function/src/function_registry.rs index e84fb903b6..3478f4ffff 100644 --- a/src/common/function/src/function_registry.rs +++ b/src/common/function/src/function_registry.rs @@ -24,7 +24,7 @@ use crate::aggrs::aggr_wrapper::StateMergeHelper; use crate::aggrs::approximate::ApproximateFunction; use crate::aggrs::count_hash::CountHash; use crate::aggrs::vector::VectorFunction as VectorAggrFunction; -use crate::function::{AsyncFunctionRef, Function, FunctionRef}; +use crate::function::{Function, FunctionRef}; use crate::function_factory::ScalarFunctionFactory; use crate::scalars::date::DateFunction; use crate::scalars::expression::ExpressionFunction; @@ -42,11 +42,18 @@ use crate::system::SystemFunction; #[derive(Default)] pub struct FunctionRegistry { functions: RwLock<HashMap<String, ScalarFunctionFactory>>, - async_functions: RwLock<HashMap<String, AsyncFunctionRef>>, aggregate_functions: RwLock<HashMap<String, AggregateUDF>>, } impl FunctionRegistry { + /// Register a function in the registry by converting it into a `ScalarFunctionFactory`. + /// + /// # Arguments + /// + /// * `func` - An object that can be converted into a `ScalarFunctionFactory`. + /// + /// The function is inserted into the internal function map, keyed by its name. + /// If a function with the same name already exists, it will be replaced. pub fn register(&self, func: impl Into<ScalarFunctionFactory>) { let func = func.into(); let _ = self .functions .write() .unwrap() .insert(func.name().to_string(), func); } + /// Register a scalar function in the registry. pub fn register_scalar(&self, func: impl Function + 'static) { self.register(Arc::new(func) as FunctionRef); } - pub fn register_async(&self, func: AsyncFunctionRef) { - let _ = self - .async_functions - .write() - .unwrap() - .insert(func.name().to_string(), func); - } - + /// Register an aggregate function in the registry. pub fn register_aggr(&self, func: AggregateUDF) { let _ = self .aggregate_functions .write() .unwrap() .insert(func.name().to_string(), func); } - pub fn get_async_function(&self, name: &str) -> Option<AsyncFunctionRef> { - self.async_functions.read().unwrap().get(name).cloned() - } - - pub fn async_functions(&self) -> Vec<AsyncFunctionRef> { - self.async_functions - .read() - .unwrap() - .values() - .cloned() - .collect() - } - - #[cfg(test)] pub fn get_function(&self, name: &str) -> Option<ScalarFunctionFactory> { self.functions.read().unwrap().get(name).cloned() } + /// Returns a list of all scalar functions registered in the registry. pub fn scalar_functions(&self) -> Vec<ScalarFunctionFactory> { self.functions.read().unwrap().values().cloned().collect() } + /// Returns a list of all aggregate functions registered in the registry. pub fn aggregate_functions(&self) -> Vec<AggregateUDF> { self.aggregate_functions .read() .unwrap() .values() .cloned() .collect() } + /// Returns true if an aggregate function with the given name exists in the registry. 
pub fn is_aggr_func_exist(&self, name: &str) -> bool { self.aggregate_functions.read().unwrap().contains_key(name) } diff --git a/src/common/function/src/system.rs b/src/common/function/src/system.rs index 1d7fb40111..8699234048 100644 --- a/src/common/function/src/system.rs +++ b/src/common/function/src/system.rs @@ -19,8 +19,6 @@ mod procedure_state; mod timezone; mod version; -use std::sync::Arc; - use build::BuildFunction; use database::{ ConnectionIdFunction, CurrentSchemaFunction, DatabaseFunction, PgBackendPidFunction, @@ -46,7 +44,7 @@ impl SystemFunction { registry.register_scalar(PgBackendPidFunction); registry.register_scalar(ConnectionIdFunction); registry.register_scalar(TimezoneFunction); - registry.register_async(Arc::new(ProcedureStateFunction)); + registry.register(ProcedureStateFunction::factory()); PGCatalogFunction::register(registry); } } diff --git a/src/common/function/src/system/procedure_state.rs b/src/common/function/src/system/procedure_state.rs index 389c553111..8c2856c636 100644 --- a/src/common/function/src/system/procedure_state.rs +++ b/src/common/function/src/system/procedure_state.rs @@ -13,13 +13,14 @@ // limitations under the License. use api::v1::meta::ProcedureStatus; +use arrow::datatypes::DataType as ArrowDataType; use common_macro::admin_fn; use common_meta::rpc::procedure::ProcedureStateResponse; use common_query::error::{ InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result, UnsupportedInputDataTypeSnafu, }; -use common_query::prelude::{Signature, Volatility}; +use datafusion_expr::{Signature, Volatility}; use datatypes::prelude::*; use serde::Serialize; use session::context::QueryContextRef; @@ -81,73 +82,86 @@ pub(crate) async fn procedure_state( } fn signature() -> Signature { - Signature::uniform( - 1, - vec![ConcreteDataType::string_datatype()], - Volatility::Immutable, - ) + Signature::uniform(1, vec![ArrowDataType::Utf8], Volatility::Immutable) } #[cfg(test)] mod tests { use std::sync::Arc; - use common_query::prelude::TypeSignature; - use datatypes::vectors::StringVector; + use arrow::array::StringArray; + use arrow::datatypes::{DataType, Field}; + use datafusion_expr::ColumnarValue; use super::*; - use crate::function::{AsyncFunction, FunctionContext}; + use crate::function::FunctionContext; + use crate::function_factory::ScalarFunctionFactory; #[test] fn test_procedure_state_misc() { - let f = ProcedureStateFunction; + let factory: ScalarFunctionFactory = ProcedureStateFunction::factory().into(); + let f = factory.provide(FunctionContext::mock()); assert_eq!("procedure_state", f.name()); - assert_eq!( - ConcreteDataType::string_datatype(), - f.return_type(&[]).unwrap() - ); + assert_eq!(DataType::Utf8, f.return_type(&[]).unwrap()); assert!(matches!(f.signature(), - Signature { - type_signature: TypeSignature::Uniform(1, valid_types), - volatility: Volatility::Immutable - } if valid_types == vec![ConcreteDataType::string_datatype()] - )); + datafusion_expr::Signature { + type_signature: datafusion_expr::TypeSignature::Uniform(1, valid_types), + volatility: datafusion_expr::Volatility::Immutable + } if valid_types == &vec![ArrowDataType::Utf8])); } #[tokio::test] async fn test_missing_procedure_service() { - let f = ProcedureStateFunction; + let factory: ScalarFunctionFactory = ProcedureStateFunction::factory().into(); + let binding = factory.provide(FunctionContext::default()); + let f = binding.as_async().unwrap(); - let args = vec!["pid"]; - - let args = args - .into_iter() - .map(|arg| 
Arc::new(StringVector::from_slice(&[arg])) as _) - .collect::<Vec<_>>(); - - let result = f.eval(FunctionContext::default(), &args).await.unwrap_err(); - assert_eq!( - "Missing ProcedureServiceHandler, not expected", - result.to_string() - ); + let func_args = datafusion::logical_expr::ScalarFunctionArgs { + args: vec![ColumnarValue::Array(Arc::new(StringArray::from(vec![ + "pid", + ])))], + arg_fields: vec![Arc::new(Field::new("arg_0", DataType::Utf8, false))], + return_field: Arc::new(Field::new("result", DataType::Utf8, true)), + number_rows: 1, + config_options: Arc::new(datafusion_common::config::ConfigOptions::default()), + }; + let result = f.invoke_async_with_args(func_args).await; + assert!(result.is_err()); } #[tokio::test] async fn test_procedure_state() { - let f = ProcedureStateFunction; + let factory: ScalarFunctionFactory = ProcedureStateFunction::factory().into(); + let provider = factory.provide(FunctionContext::mock()); + let f = provider.as_async().unwrap(); - let args = vec!["pid"]; + let func_args = datafusion::logical_expr::ScalarFunctionArgs { + args: vec![ColumnarValue::Array(Arc::new(StringArray::from(vec![ + "pid", + ])))], + arg_fields: vec![Arc::new(Field::new("arg_0", DataType::Utf8, false))], + return_field: Arc::new(Field::new("result", DataType::Utf8, true)), + number_rows: 1, + config_options: Arc::new(datafusion_common::config::ConfigOptions::default()), + }; + let result = f.invoke_async_with_args(func_args).await.unwrap(); - let args = args - .into_iter() - .map(|arg| Arc::new(StringVector::from_slice(&[arg])) as _) - .collect::<Vec<_>>(); - - let result = f.eval(FunctionContext::mock(), &args).await.unwrap(); - - let expect: VectorRef = Arc::new(StringVector::from(vec![ - "{\"status\":\"Done\",\"error\":\"OK\"}", - ])); - assert_eq!(expect, result); + match result { + ColumnarValue::Array(array) => { + let result_array = array.as_any().downcast_ref::<StringArray>().unwrap(); + assert_eq!( + result_array.value(0), + "{\"status\":\"Done\",\"error\":\"OK\"}" + ); + } + ColumnarValue::Scalar(scalar) => { + assert_eq!( + scalar, + datafusion_common::ScalarValue::Utf8(Some( + "{\"status\":\"Done\",\"error\":\"OK\"}".to_string() + )) + ); + } + } } } diff --git a/src/common/macro/src/admin_fn.rs b/src/common/macro/src/admin_fn.rs index 902bede814..5e8701e4d7 100644 --- a/src/common/macro/src/admin_fn.rs +++ b/src/common/macro/src/admin_fn.rs @@ -187,8 +187,28 @@ fn build_struct( quote! { #(#attrs)* - #[derive(Debug)] - #vis struct #name; + #vis struct #name { + signature: datafusion_expr::Signature, + func_ctx: #user_path::function::FunctionContext, + } + + impl #name { + /// Creates a new instance of the function with the given signature and function context. + fn create(signature: datafusion_expr::Signature, func_ctx: #user_path::function::FunctionContext) -> Self { + Self { + signature, + func_ctx, + } + } + + /// Returns the [`ScalarFunctionFactory`] of the function.
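+            ///
+            /// Expected call-site sketch (`MyAdminFunction` is an illustrative
+            /// name for a type annotated with `#[admin_fn]`):
+            ///
+            /// ```ignore
+            /// registry.register(MyAdminFunction::factory());
+            /// ```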
+ pub fn factory() -> impl Into< #user_path::function_factory::ScalarFunctionFactory> { + Self { + signature: #sig_fn().into(), + func_ctx: #user_path::function::FunctionContext::default(), + } + } + } impl std::fmt::Display for #name { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { @@ -196,24 +216,87 @@ fn build_struct( } } + impl std::fmt::Debug for #name { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{}({})", #uppcase_display_name, self.func_ctx) + } + } - #[async_trait::async_trait] - impl #user_path::function::AsyncFunction for #name { - fn name(&self) -> &'static str { + // Implement DataFusion's ScalarUDFImpl trait + impl datafusion::logical_expr::ScalarUDFImpl for #name { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn name(&self) -> &str { #display_name } - fn return_type(&self, _input_types: &[store_api::storage::ConcreteDataType]) -> common_query::error::Result<store_api::storage::ConcreteDataType> { - Ok(store_api::storage::ConcreteDataType::#ret()) + fn signature(&self) -> &datafusion_expr::Signature { + &self.signature } - fn signature(&self) -> Signature { - #sig_fn() + fn return_type(&self, _arg_types: &[datafusion::arrow::datatypes::DataType]) -> datafusion_common::Result<datafusion::arrow::datatypes::DataType> { + use datatypes::data_type::DataType; + Ok(store_api::storage::ConcreteDataType::#ret().as_arrow_type()) } - async fn eval(&self, func_ctx: #user_path::function::FunctionContext, columns: &[datatypes::vectors::VectorRef]) -> common_query::error::Result<datatypes::vectors::VectorRef> { - // Ensure under the `greptime` catalog for security - #user_path::ensure_greptime!(func_ctx); + fn invoke_with_args( + &self, + _args: datafusion::logical_expr::ScalarFunctionArgs, + ) -> datafusion_common::Result<datafusion_expr::ColumnarValue> { + Err(datafusion_common::DataFusionError::NotImplemented( + format!("{} can only be called from async contexts", #display_name) + )) + } + } + + // Implement `From` conversion into ScalarFunctionFactory + impl From<#name> for #user_path::function_factory::ScalarFunctionFactory { + fn from(func: #name) -> Self { + use std::sync::Arc; + use datafusion_expr::ScalarUDFImpl; + use datafusion_expr::async_udf::AsyncScalarUDF; + + let name = func.name().to_string(); + + let func = Arc::new(move |ctx: #user_path::function::FunctionContext| { + // create the UDF dynamically with function context + let udf_impl = #name::create(func.signature.clone(), ctx); + let async_udf = AsyncScalarUDF::new(Arc::new(udf_impl)); + async_udf.into_scalar_udf() + }); + Self { + name, + factory: func, + } + } + } + + // Implement DataFusion's AsyncScalarUDFImpl trait + #[async_trait::async_trait] + impl datafusion_expr::async_udf::AsyncScalarUDFImpl for #name { + async fn invoke_async_with_args( + &self, + args: datafusion::logical_expr::ScalarFunctionArgs, + ) -> datafusion_common::Result<datafusion_expr::ColumnarValue> { + let columns = args.args + .iter() + .map(|arg| { + common_query::prelude::ColumnarValue::try_from(arg) + .and_then(|cv| match cv { + common_query::prelude::ColumnarValue::Vector(v) => Ok(v), + common_query::prelude::ColumnarValue::Scalar(s) => { + datatypes::vectors::Helper::try_from_scalar_value(s, args.number_rows) + .context(common_query::error::FromScalarValueSnafu) + } + }) + }) + .collect::<common_query::error::Result<Vec<_>>>() + .map_err(|e| datafusion_common::DataFusionError::Execution(format!("Column conversion error: {}", e)))?; + + // Safety check: Ensure under the `greptime` catalog for security + #user_path::ensure_greptime!(self.func_ctx); let columns_num = columns.len(); let rows_num = if columns.is_empty() { @@ -221,23 +304,24 @@ fn build_struct( } else { columns[0].len() }; -
let columns = Vec::from(columns); - use snafu::OptionExt; + use snafu::{OptionExt, ResultExt}; use datatypes::data_type::DataType; - let query_ctx = &func_ctx.query_ctx; - let handler = func_ctx + let query_ctx = &self.func_ctx.query_ctx; + let handler = self.func_ctx .state .#handler .as_ref() - .context(#snafu_type)?; + .context(#snafu_type) + .map_err(|e| datafusion_common::DataFusionError::Execution(format!("Handler error: {}", e)))?; let mut builder = store_api::storage::ConcreteDataType::#ret() .create_mutable_vector(rows_num); if columns_num == 0 { - let result = #fn_name(handler, query_ctx, &[]).await?; + let result = #fn_name(handler, query_ctx, &[]).await + .map_err(|e| datafusion_common::DataFusionError::Execution(format!("Function execution error: {}", e)))?; builder.push_value_ref(result.as_value_ref()); } else { @@ -246,15 +330,18 @@ fn build_struct( .map(|vector| vector.get_ref(i)) .collect(); - let result = #fn_name(handler, query_ctx, &args).await?; + let result = #fn_name(handler, query_ctx, &args).await + .map_err(|e| datafusion_common::DataFusionError::Execution(format!("Function execution error: {}", e)))?; builder.push_value_ref(result.as_value_ref()); } } - Ok(builder.to_vector()) - } + let result_vector = builder.to_vector(); + // Convert result back to DataFusion ColumnarValue + Ok(datafusion_expr::ColumnarValue::Array(result_vector.to_arrow_array())) + } } } .into() diff --git a/src/common/query/src/signature.rs b/src/common/query/src/signature.rs index 6aab88d8e8..9b01eba12e 100644 --- a/src/common/query/src/signature.rs +++ b/src/common/query/src/signature.rs @@ -156,6 +156,49 @@ impl From for DfTypeSignature { } } +impl From<&DfTypeSignature> for TypeSignature { + fn from(type_signature: &DfTypeSignature) -> TypeSignature { + match type_signature { + DfTypeSignature::Variadic(types) => TypeSignature::Variadic( + types + .iter() + .map(ConcreteDataType::from_arrow_type) + .collect(), + ), + DfTypeSignature::Uniform(n, types) => { + if *n == 0 { + return TypeSignature::NullAry; + } + TypeSignature::Uniform( + *n, + types + .iter() + .map(ConcreteDataType::from_arrow_type) + .collect(), + ) + } + DfTypeSignature::Exact(types) => TypeSignature::Exact( + types + .iter() + .map(ConcreteDataType::from_arrow_type) + .collect(), + ), + DfTypeSignature::Any(n) => { + if *n == 0 { + return TypeSignature::NullAry; + } + TypeSignature::Any(*n) + } + DfTypeSignature::OneOf(ts) => TypeSignature::OneOf(ts.iter().map(Into::into).collect()), + DfTypeSignature::VariadicAny => TypeSignature::VariadicAny, + DfTypeSignature::Nullary => TypeSignature::NullAry, + // Other type signatures are currently mapped to VariadicAny as a fallback. + // These cases are not used in the current UDF implementation. 
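+            // For example, under this mapping:
+            //   Uniform(1, [Utf8])    -> Uniform(1, [string])
+            //   Uniform(0, _), Any(0) -> NullAry
+            //   any unlisted variant  -> VariadicAny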
+ _ => TypeSignature::VariadicAny, + } + } +} + impl From for DfSignature { fn from(sig: Signature) -> DfSignature { DfSignature::new(sig.type_signature.into(), sig.volatility) diff --git a/src/operator/src/error.rs b/src/operator/src/error.rs index 9ab849292a..5996a4a2b2 100644 --- a/src/operator/src/error.rs +++ b/src/operator/src/error.rs @@ -20,6 +20,7 @@ use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use datafusion::parquet; +use datafusion_common::DataFusionError; use datatypes::arrow::error::ArrowError; use snafu::{Location, Snafu}; use table::metadata::TableType; @@ -42,16 +43,18 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to execute admin function"))] - ExecuteAdminFunction { - #[snafu(implicit)] - location: Location, - source: common_query::error::Error, - }, - #[snafu(display("Failed to build admin function args: {msg}"))] BuildAdminFunctionArgs { msg: String }, + #[snafu(display("Failed to execute admin function: {msg}, error: {error}"))] + ExecuteAdminFunction { + msg: String, + #[snafu(source)] + error: DataFusionError, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Expected {expected} args, but actual {actual}"))] FunctionArityMismatch { expected: usize, actual: usize }, @@ -937,7 +940,7 @@ impl ErrorExt for Error { Error::BuildDfLogicalPlan { .. } | Error::BuildTableMeta { .. } | Error::MissingInsertBody { .. } => StatusCode::Internal, - Error::EncodeJson { .. } => StatusCode::Unexpected, + Error::ExecuteAdminFunction { .. } | Error::EncodeJson { .. } => StatusCode::Unexpected, Error::ViewNotFound { .. } | Error::ViewInfoNotFound { .. } | Error::TableNotFound { .. } => StatusCode::TableNotFound, @@ -980,7 +983,6 @@ impl ErrorExt for Error { | Error::ParseSqlValue { .. } | Error::InvalidTimestampRange { .. } => StatusCode::InvalidArguments, Error::CreateLogicalTables { .. } => StatusCode::Unexpected, - Error::ExecuteAdminFunction { source, .. } => source.status_code(), Error::BuildRecordBatch { source, .. } => source.status_code(), Error::UpgradeCatalogManagerRef { .. } => StatusCode::Internal, Error::ColumnOptions { source, .. } => source.status_code(), diff --git a/src/operator/src/statement/admin.rs b/src/operator/src/statement/admin.rs index 9890af6497..9cfab774d3 100644 --- a/src/operator/src/statement/admin.rs +++ b/src/operator/src/statement/admin.rs @@ -32,7 +32,7 @@ use snafu::{ensure, OptionExt, ResultExt}; use sql::ast::{Expr, FunctionArg, FunctionArgExpr, FunctionArguments, Value as SqlValue}; use sql::statements::admin::Admin; -use crate::error::{self, Result}; +use crate::error::{self, ExecuteAdminFunctionSnafu, IntoVectorsSnafu, Result}; use crate::statement::StatementExecutor; const DUMMY_COLUMN: &str = ""; @@ -48,14 +48,22 @@ impl StatementExecutor { let Admin::Func(func) = &stmt; // the function name should be in lower case. 
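+        // e.g. `ADMIN FLUSH_TABLE('t')` is looked up as "flush_table".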
let func_name = func.name.to_string().to_lowercase(); - let admin_func = FUNCTION_REGISTRY - .get_async_function(&func_name) + let factory = FUNCTION_REGISTRY + .get_function(&func_name) .context(error::AdminFunctionNotFoundSnafu { name: func_name })?; + let func_ctx = FunctionContext { + query_ctx: query_ctx.clone(), + state: self.query_engine.engine_state().function_state(), + }; - let signature = admin_func.signature(); + let admin_udf = factory.provide(func_ctx); + let fn_name = admin_udf.name(); + let signature = admin_udf.signature(); + + // Parse function arguments let FunctionArguments::List(args) = &func.args else { return error::BuildAdminFunctionArgsSnafu { - msg: format!("unsupported function args {}", func.args), + msg: format!("unsupported function args {} for {}", func.args, fn_name), } .fail(); }; @@ -65,7 +73,7 @@ impl StatementExecutor { .map(|arg| { let FunctionArg::Unnamed(FunctionArgExpr::Expr(Expr::Value(value))) = arg else { return error::BuildAdminFunctionArgsSnafu { - msg: format!("unsupported function arg {arg}"), + msg: format!("unsupported function arg {arg} for {}", fn_name), } .fail(); }; @@ -73,30 +81,84 @@ impl StatementExecutor { }) .collect::<Result<Vec<_>>>()?; - let args = args_to_vector(&signature.type_signature, &arg_values, &query_ctx)?; - let arg_types = args.iter().map(|arg| arg.data_type()).collect::<Vec<_>>(); + let type_sig = (&signature.type_signature).into(); + let args = args_to_vector(&type_sig, &arg_values, &query_ctx)?; + let arg_types = args + .iter() + .map(|arg| arg.data_type().as_arrow_type()) + .collect::<Vec<_>>(); + let ret_type = admin_udf.return_type(&arg_types).map_err(|e| { + error::Error::BuildAdminFunctionArgs { + msg: format!( + "Failed to get return type of admin function {}: {}", + fn_name, e + ), + } + })?; - let func_ctx = FunctionContext { - query_ctx, - state: self.query_engine.engine_state().function_state(), + // Convert arguments to DataFusion ColumnarValue format + let columnar_args: Vec<datafusion_expr::ColumnarValue> = args + .iter() + .map(|vector| datafusion_expr::ColumnarValue::Array(vector.to_arrow_array())) + .collect(); + + // Create ScalarFunctionArgs following the same pattern as udf.rs + let func_args = datafusion::logical_expr::ScalarFunctionArgs { + args: columnar_args, + arg_fields: args + .iter() + .enumerate() + .map(|(i, vector)| { + Arc::new(arrow::datatypes::Field::new( + format!("arg_{}", i), + arg_types[i].clone(), + vector.null_count() > 0, + )) + }) + .collect(), + return_field: Arc::new(arrow::datatypes::Field::new("result", ret_type, true)), + number_rows: if args.is_empty() { 1 } else { args[0].len() }, + config_options: Arc::new(datafusion_common::config::ConfigOptions::default()), }; - let result = admin_func - .eval(func_ctx, &args) + // Execute the async UDF + let result_columnar = admin_udf + .as_async() + .with_context(|| error::BuildAdminFunctionArgsSnafu { + msg: format!("Function {} is not async", fn_name), + })? + .invoke_async_with_args(func_args) .await - .context(error::ExecuteAdminFunctionSnafu)?; + .with_context(|_| ExecuteAdminFunctionSnafu { + msg: format!("Failed to execute admin function {}", fn_name), + })?; + // Convert result back to VectorRef + let result = match result_columnar { + datafusion_expr::ColumnarValue::Array(array) => { + datatypes::vectors::Helper::try_into_vector(array).context(IntoVectorsSnafu)?
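+                // Helper::try_into_vector should wrap the Arrow array without copying.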
+ } + datafusion_expr::ColumnarValue::Scalar(scalar) => { + let array = + scalar + .to_array_of_size(1) + .with_context(|_| ExecuteAdminFunctionSnafu { + msg: format!("Failed to convert scalar to array for {}", fn_name), + })?; + datatypes::vectors::Helper::try_into_vector(array).context(IntoVectorsSnafu)? + } + }; + + let result_vector: VectorRef = result; let column_schemas = vec![ColumnSchema::new( // Use statement as the result column name stmt.to_string(), - admin_func - .return_type(&arg_types) - .context(error::ExecuteAdminFunctionSnafu)?, + result_vector.data_type(), false, )]; let schema = Arc::new(Schema::new(column_schemas)); - let batch = - RecordBatch::new(schema.clone(), vec![result]).context(error::BuildRecordBatchSnafu)?; + let batch = RecordBatch::new(schema.clone(), vec![result_vector]) + .context(error::BuildRecordBatchSnafu)?; let batches = RecordBatches::try_new(schema, vec![batch]).context(error::BuildRecordBatchSnafu)?; diff --git a/tests/cases/distributed/information_schema/cluster_info.result b/tests/cases/distributed/information_schema/cluster_info.result index ce7588a932..5ee5ddf5fd 100644 --- a/tests/cases/distributed/information_schema/cluster_info.result +++ b/tests/cases/distributed/information_schema/cluster_info.result @@ -19,7 +19,7 @@ DESC TABLE CLUSTER_INFO; -- SQLNESS REPLACE version node_version -- SQLNESS REPLACE (\s\d+\.\d+(?:\.\d+)+\s) Version --- SQLNESS REPLACE (\s[a-z0-9]{7,9}\s) Hash +-- SQLNESS REPLACE (\s[a-z0-9]{7,10}\s) Hash -- SQLNESS REPLACE (\s[\-0-9T:\.]{19,}) Start_time -- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration -- SQLNESS REPLACE (\s127\.0\.0\.1:\d+\s) Address @@ -30,7 +30,7 @@ SELECT * FROM CLUSTER_INFO ORDER BY peer_type; -- SQLNESS REPLACE version node_version -- SQLNESS REPLACE (\s\d+\.\d+(?:\.\d+)+\s) Version --- SQLNESS REPLACE (\s[a-z0-9]{7,9}\s) Hash +-- SQLNESS REPLACE (\s[a-z0-9]{7,10}\s) Hash -- SQLNESS REPLACE (\s[\-0-9T:\.]{19,}) Start_time -- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration -- SQLNESS REPLACE (\s127\.0\.0\.1:\d+\s) Address @@ -41,7 +41,7 @@ SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'METASRV' ORDER BY peer_type; -- SQLNESS REPLACE version node_version -- SQLNESS REPLACE (\s\d+\.\d+(?:\.\d+)+\s) Version --- SQLNESS REPLACE (\s[a-z0-9]{7,9}\s) Hash +-- SQLNESS REPLACE (\s[a-z0-9]{7,10}\s) Hash -- SQLNESS REPLACE (\s[\-0-9T:\.]{19,}) Start_time -- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration -- SQLNESS REPLACE (\s127\.0\.0\.1:\d+\s) Address @@ -52,7 +52,7 @@ SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'FRONTEND' ORDER BY peer_type; -- SQLNESS REPLACE version node_version -- SQLNESS REPLACE (\s\d+\.\d+(?:\.\d+)+\s) Version --- SQLNESS REPLACE (\s[a-z0-9]{7,9}\s) Hash +-- SQLNESS REPLACE (\s[a-z0-9]{7,10}\s) Hash -- SQLNESS REPLACE (\s[\-0-9T:\.]{19,}) Start_time -- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration -- SQLNESS REPLACE (\s127\.0\.0\.1:\d+\s) Address @@ -63,7 +63,7 @@ SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE != 'FRONTEND' ORDER BY peer_type; -- SQLNESS REPLACE version node_version -- SQLNESS REPLACE (\s\d+\.\d+(?:\.\d+)+\s) Version --- SQLNESS REPLACE (\s[a-z0-9]{7,9}\s) Hash +-- SQLNESS REPLACE (\s[a-z0-9]{7,10}\s) Hash -- SQLNESS REPLACE (\s[\-0-9T:\.]{19,}) Start_time -- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration -- SQLNESS REPLACE (\s127\.0\.0\.1:\d+\s) Address diff --git a/tests/cases/distributed/information_schema/cluster_info.sql b/tests/cases/distributed/information_schema/cluster_info.sql index 08c0e452a0..e148794dc9 100644 --- a/tests/cases/distributed/information_schema/cluster_info.sql 
+++ b/tests/cases/distributed/information_schema/cluster_info.sql @@ -4,7 +4,7 @@ DESC TABLE CLUSTER_INFO; -- SQLNESS REPLACE version node_version -- SQLNESS REPLACE (\s\d+\.\d+(?:\.\d+)+\s) Version --- SQLNESS REPLACE (\s[a-z0-9]{7,9}\s) Hash +-- SQLNESS REPLACE (\s[a-z0-9]{7,10}\s) Hash -- SQLNESS REPLACE (\s[\-0-9T:\.]{19,}) Start_time -- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration -- SQLNESS REPLACE (\s127\.0\.0\.1:\d+\s) Address @@ -13,7 +13,7 @@ SELECT * FROM CLUSTER_INFO ORDER BY peer_type; -- SQLNESS REPLACE version node_version -- SQLNESS REPLACE (\s\d+\.\d+(?:\.\d+)+\s) Version --- SQLNESS REPLACE (\s[a-z0-9]{7,9}\s) Hash +-- SQLNESS REPLACE (\s[a-z0-9]{7,10}\s) Hash -- SQLNESS REPLACE (\s[\-0-9T:\.]{19,}) Start_time -- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration -- SQLNESS REPLACE (\s127\.0\.0\.1:\d+\s) Address @@ -22,7 +22,7 @@ SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'METASRV' ORDER BY peer_type; -- SQLNESS REPLACE version node_version -- SQLNESS REPLACE (\s\d+\.\d+(?:\.\d+)+\s) Version --- SQLNESS REPLACE (\s[a-z0-9]{7,9}\s) Hash +-- SQLNESS REPLACE (\s[a-z0-9]{7,10}\s) Hash -- SQLNESS REPLACE (\s[\-0-9T:\.]{19,}) Start_time -- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration -- SQLNESS REPLACE (\s127\.0\.0\.1:\d+\s) Address @@ -31,7 +31,7 @@ SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'FRONTEND' ORDER BY peer_type; -- SQLNESS REPLACE version node_version -- SQLNESS REPLACE (\s\d+\.\d+(?:\.\d+)+\s) Version --- SQLNESS REPLACE (\s[a-z0-9]{7,9}\s) Hash +-- SQLNESS REPLACE (\s[a-z0-9]{7,10}\s) Hash -- SQLNESS REPLACE (\s[\-0-9T:\.]{19,}) Start_time -- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration -- SQLNESS REPLACE (\s127\.0\.0\.1:\d+\s) Address @@ -40,7 +40,7 @@ SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE != 'FRONTEND' ORDER BY peer_type; -- SQLNESS REPLACE version node_version -- SQLNESS REPLACE (\s\d+\.\d+(?:\.\d+)+\s) Version --- SQLNESS REPLACE (\s[a-z0-9]{7,9}\s) Hash +-- SQLNESS REPLACE (\s[a-z0-9]{7,10}\s) Hash -- SQLNESS REPLACE (\s[\-0-9T:\.]{19,}) Start_time -- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration -- SQLNESS REPLACE (\s127\.0\.0\.1:\d+\s) Address diff --git a/tests/cases/standalone/common/function/admin/flush_compact_table.result b/tests/cases/standalone/common/function/admin/flush_compact_table.result index 950461f98e..0c3fdbc0de 100644 --- a/tests/cases/standalone/common/function/admin/flush_compact_table.result +++ b/tests/cases/standalone/common/function/admin/flush_compact_table.result @@ -35,6 +35,22 @@ ADMIN COMPACT_TABLE('test'); | 0 | +-----------------------------+ +SELECT FLUSH_TABLE('test'); + ++---------------------------+ +| flush_table(Utf8("test")) | ++---------------------------+ +| 0 | ++---------------------------+ + +SELECT COMPACT_TABLE('test'); + ++-----------------------------+ +| compact_table(Utf8("test")) | ++-----------------------------+ +| 0 | ++-----------------------------+ + --- doesn't change anything --- SELECT * FROM test; diff --git a/tests/cases/standalone/common/function/admin/flush_compact_table.sql b/tests/cases/standalone/common/function/admin/flush_compact_table.sql index 8c52862cdb..a1a316b35c 100644 --- a/tests/cases/standalone/common/function/admin/flush_compact_table.sql +++ b/tests/cases/standalone/common/function/admin/flush_compact_table.sql @@ -10,6 +10,10 @@ ADMIN FLUSH_TABLE('test'); ADMIN COMPACT_TABLE('test'); +SELECT FLUSH_TABLE('test'); + +SELECT COMPACT_TABLE('test'); + --- doesn't change anything --- SELECT * FROM test; diff --git a/tests/cases/standalone/common/insert/logical_metric_table.result 
b/tests/cases/standalone/common/insert/logical_metric_table.result index 2985d2e9c0..ad6142050d 100644 --- a/tests/cases/standalone/common/insert/logical_metric_table.result +++ b/tests/cases/standalone/common/insert/logical_metric_table.result @@ -81,7 +81,7 @@ CREATE TABLE phy ( ts timestamp time index, val double ) engine = metric with ( - "physical_metric_table" = "", + "physical_metric_table" = "", "memtable.type" = "partition_tree", "memtable.partition_tree.primary_key_encoding" = "sparse" ); @@ -127,9 +127,13 @@ SELECT * from t2; | job1 | 1970-01-01T00:00:00 | 0.0 | +------+-------------------------+-----+ -ADMIN flush_table("phy"); +ADMIN flush_table('phy'); -Error: 1004(InvalidArguments), Failed to build admin function args: unsupported function arg "phy" ++--------------------------+ +| ADMIN flush_table('phy') | ++--------------------------+ +| 0 | ++--------------------------+ -- SQLNESS ARG restart=true INSERT INTO t2 VALUES ('job3', 0, 0), ('job4', 1, 1); diff --git a/tests/cases/standalone/common/insert/logical_metric_table.sql b/tests/cases/standalone/common/insert/logical_metric_table.sql index 7a3bd00935..9899699c66 100644 --- a/tests/cases/standalone/common/insert/logical_metric_table.sql +++ b/tests/cases/standalone/common/insert/logical_metric_table.sql @@ -28,7 +28,7 @@ CREATE TABLE phy ( ts timestamp time index, val double ) engine = metric with ( - "physical_metric_table" = "", + "physical_metric_table" = "", "memtable.type" = "partition_tree", "memtable.partition_tree.primary_key_encoding" = "sparse" ); @@ -47,7 +47,7 @@ INSERT INTO t2 VALUES ('job1', 0, 0), ('job2', 1, 1); SELECT * from t2; -ADMIN flush_table("phy"); +ADMIN flush_table('phy'); -- SQLNESS ARG restart=true INSERT INTO t2 VALUES ('job3', 0, 0), ('job4', 1, 1); diff --git a/tests/cases/standalone/information_schema/cluster_info.result b/tests/cases/standalone/information_schema/cluster_info.result index 9af6224113..0343158962 100644 --- a/tests/cases/standalone/information_schema/cluster_info.result +++ b/tests/cases/standalone/information_schema/cluster_info.result @@ -19,7 +19,7 @@ DESC TABLE CLUSTER_INFO; -- SQLNESS REPLACE version node_version -- SQLNESS REPLACE (\d+\.\d+(?:\.\d+)+) Version --- SQLNESS REPLACE (\s[a-z0-9]{7,9}\s) Hash +-- SQLNESS REPLACE (\s[a-z0-9]{7,10}\s) Hash -- SQLNESS REPLACE (\s[\-0-9T:\.]{15,}) Start_time -- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration -- SQLNESS REPLACE [\s\-]+ @@ -29,7 +29,7 @@ SELECT * FROM CLUSTER_INFO; -- SQLNESS REPLACE version node_version -- SQLNESS REPLACE (\d+\.\d+(?:\.\d+)+) Version --- SQLNESS REPLACE (\s[a-z0-9]{7,9}\s) Hash +-- SQLNESS REPLACE (\s[a-z0-9]{7,10}\s) Hash -- SQLNESS REPLACE (\s[\-0-9T:\.]{15,}) Start_time -- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration -- SQLNESS REPLACE [\s\-]+ @@ -44,7 +44,7 @@ SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE != 'STANDALONE'; -- SQLNESS REPLACE version node_version -- SQLNESS REPLACE (\d+\.\d+(?:\.\d+)+) Version --- SQLNESS REPLACE (\s[a-z0-9]{7,9}\s) Hash +-- SQLNESS REPLACE (\s[a-z0-9]{7,10}\s) Hash -- SQLNESS REPLACE (\s[\-0-9T:\.]{15,}) Start_time -- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration -- SQLNESS REPLACE [\s\-]+ diff --git a/tests/cases/standalone/information_schema/cluster_info.sql b/tests/cases/standalone/information_schema/cluster_info.sql index b0ad87fbbd..8400d2832e 100644 --- a/tests/cases/standalone/information_schema/cluster_info.sql +++ b/tests/cases/standalone/information_schema/cluster_info.sql @@ -4,7 +4,7 @@ DESC TABLE CLUSTER_INFO; -- SQLNESS REPLACE version node_version -- 
SQLNESS REPLACE (\d+\.\d+(?:\.\d+)+) Version --- SQLNESS REPLACE (\s[a-z0-9]{7,9}\s) Hash +-- SQLNESS REPLACE (\s[a-z0-9]{7,10}\s) Hash -- SQLNESS REPLACE (\s[\-0-9T:\.]{15,}) Start_time -- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration -- SQLNESS REPLACE [\s\-]+ @@ -12,7 +12,7 @@ SELECT * FROM CLUSTER_INFO; -- SQLNESS REPLACE version node_version -- SQLNESS REPLACE (\d+\.\d+(?:\.\d+)+) Version --- SQLNESS REPLACE (\s[a-z0-9]{7,9}\s) Hash +-- SQLNESS REPLACE (\s[a-z0-9]{7,10}\s) Hash -- SQLNESS REPLACE (\s[\-0-9T:\.]{15,}) Start_time -- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration -- SQLNESS REPLACE [\s\-]+ @@ -22,7 +22,7 @@ SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE != 'STANDALONE'; -- SQLNESS REPLACE version node_version -- SQLNESS REPLACE (\d+\.\d+(?:\.\d+)+) Version --- SQLNESS REPLACE (\s[a-z0-9]{7,9}\s) Hash +-- SQLNESS REPLACE (\s[a-z0-9]{7,10}\s) Hash -- SQLNESS REPLACE (\s[\-0-9T:\.]{15,}) Start_time -- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration -- SQLNESS REPLACE [\s\-]+
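End-to-end, the new invocation path can be sketched as follows. This is a non-authoritative sketch that mirrors the test pattern above; the crate-internal imports (`ScalarFunctionFactory`, `FunctionContext`, `ProcedureStateFunction`) are assumed to be in scope:

```rust
use std::sync::Arc;

use arrow::array::StringArray;
use arrow::datatypes::{DataType, Field};
use datafusion_expr::ColumnarValue;

// Sketch: resolve an admin UDF from its factory and invoke it asynchronously.
async fn invoke_admin_udf() -> datafusion_common::Result<ColumnarValue> {
    // Late-bind the runtime context when providing the UDF.
    let factory: ScalarFunctionFactory = ProcedureStateFunction::factory().into();
    let udf = factory.provide(FunctionContext::mock());
    // Admin functions are async UDFs; the sync invoke_with_args path
    // intentionally returns NotImplemented.
    let f = udf.as_async().expect("admin UDFs are async");

    let func_args = datafusion::logical_expr::ScalarFunctionArgs {
        args: vec![ColumnarValue::Array(Arc::new(StringArray::from(vec!["pid"])))],
        arg_fields: vec![Arc::new(Field::new("arg_0", DataType::Utf8, false))],
        return_field: Arc::new(Field::new("result", DataType::Utf8, true)),
        number_rows: 1,
        config_options: Arc::new(datafusion_common::config::ConfigOptions::default()),
    };
    f.invoke_async_with_args(func_args).await
}
```

Registering the same async UDF for both the `ADMIN` statement path and regular query planning is what allows the new `SELECT FLUSH_TABLE('test')` and `SELECT COMPACT_TABLE('test')` sqlness cases above to work.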