refactor: use DataFusion's Signature directly in UDF (#6908)

* refactor: use DataFusion's Signature directly in UDF

Signed-off-by: luofucong <luofc@foxmail.com>

* fix sqlness

Signed-off-by: luofucong <luofc@foxmail.com>

* resolve PR comments

Signed-off-by: luofucong <luofc@foxmail.com>

* fix ci

Signed-off-by: luofucong <luofc@foxmail.com>

---------

Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
LFC
2025-09-05 13:04:56 +08:00
committed by GitHub
parent f7c8d86ebb
commit 48f5db3f5f
65 changed files with 297 additions and 756 deletions

1
Cargo.lock generated
View File

@@ -2192,7 +2192,6 @@ dependencies = [
"nalgebra",
"num",
"num-traits",
"once_cell",
"paste",
"pretty_assertions",
"s2",

View File

@@ -49,7 +49,6 @@ memchr = "2.7"
nalgebra.workspace = true
num = "0.4"
num-traits = "0.2"
once_cell.workspace = true
paste.workspace = true
s2 = { version = "0.0.12", optional = true }
serde.workspace = true

View File

@@ -33,6 +33,7 @@ use datafusion::error::{DataFusionError, Result as DfResult};
use datafusion::logical_expr::function::AccumulatorArgs;
use datafusion::logical_expr::{Accumulator as DfAccumulator, AggregateUDF};
use datafusion::prelude::create_udaf;
use datafusion_expr::Volatility;
use datatypes::arrow::datatypes::DataType;
use hyperloglogplus::{HyperLogLog, HyperLogLogPlus};

View File

@@ -26,7 +26,7 @@ use datafusion::common::cast::{as_binary_array, as_primitive_array};
use datafusion::common::not_impl_err;
use datafusion::error::{DataFusionError, Result as DfResult};
use datafusion::logical_expr::function::AccumulatorArgs;
use datafusion::logical_expr::{Accumulator as DfAccumulator, AggregateUDF};
use datafusion::logical_expr::{Accumulator as DfAccumulator, AggregateUDF, Volatility};
use datafusion::physical_plan::expressions::Literal;
use datafusion::prelude::create_udaf;
use datatypes::arrow::array::ArrayRef;

View File

@@ -16,7 +16,7 @@ use std::fmt;
use std::sync::Arc;
use common_query::error::Result;
use common_query::prelude::Signature;
use datafusion_expr::Signature;
use datatypes::data_type::ConcreteDataType;
use datatypes::vectors::VectorRef;
use session::context::{QueryContextBuilder, QueryContextRef};

View File

@@ -14,10 +14,9 @@
//! functions registry
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::sync::{Arc, LazyLock, RwLock};
use datafusion_expr::AggregateUDF;
use once_cell::sync::Lazy;
use crate::admin::AdminFunction;
use crate::aggrs::aggr_wrapper::StateMergeHelper;
@@ -102,7 +101,7 @@ impl FunctionRegistry {
}
}
pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
pub static FUNCTION_REGISTRY: LazyLock<Arc<FunctionRegistry>> = LazyLock::new(|| {
let function_registry = FunctionRegistry::default();
// Utility functions

View File

@@ -16,14 +16,15 @@ use api::v1::meta::ResolveStrategy;
use common_query::error::{
InvalidFuncArgsSnafu, InvalidInputTypeSnafu, Result, UnsupportedInputDataTypeSnafu,
};
use common_query::prelude::{Signature, TypeSignature, Volatility};
use datafusion_expr::{Signature, TypeSignature, Volatility};
use datatypes::arrow::datatypes::DataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::types::cast::cast;
use datatypes::value::ValueRef;
use snafu::{OptionExt, ResultExt};
/// Create a function signature with oneof signatures of interleaving two arguments.
pub fn one_of_sigs2(args1: Vec<ConcreteDataType>, args2: Vec<ConcreteDataType>) -> Signature {
pub(crate) fn one_of_sigs2(args1: Vec<DataType>, args2: Vec<DataType>) -> Signature {
let mut sigs = Vec::with_capacity(args1.len() * args2.len());
for arg1 in &args1 {

View File

@@ -15,8 +15,9 @@
use std::fmt;
use common_query::error::{ArrowComputeSnafu, IntoVectorSnafu, InvalidFuncArgsSnafu, Result};
use common_query::prelude::Signature;
use datafusion_expr::Signature;
use datatypes::arrow::compute::kernels::numeric;
use datatypes::arrow::datatypes::{DataType, IntervalUnit, TimeUnit};
use datatypes::prelude::ConcreteDataType;
use datatypes::vectors::{Helper, VectorRef};
use snafu::{ensure, ResultExt};
@@ -44,16 +45,16 @@ impl Function for DateAddFunction {
fn signature(&self) -> Signature {
helper::one_of_sigs2(
vec![
ConcreteDataType::date_datatype(),
ConcreteDataType::timestamp_second_datatype(),
ConcreteDataType::timestamp_millisecond_datatype(),
ConcreteDataType::timestamp_microsecond_datatype(),
ConcreteDataType::timestamp_nanosecond_datatype(),
DataType::Date32,
DataType::Timestamp(TimeUnit::Second, None),
DataType::Timestamp(TimeUnit::Millisecond, None),
DataType::Timestamp(TimeUnit::Microsecond, None),
DataType::Timestamp(TimeUnit::Nanosecond, None),
],
vec![
ConcreteDataType::interval_month_day_nano_datatype(),
ConcreteDataType::interval_year_month_datatype(),
ConcreteDataType::interval_day_time_datatype(),
DataType::Interval(IntervalUnit::MonthDayNano),
DataType::Interval(IntervalUnit::YearMonth),
DataType::Interval(IntervalUnit::DayTime),
],
)
}
@@ -90,7 +91,7 @@ impl fmt::Display for DateAddFunction {
mod tests {
use std::sync::Arc;
use common_query::prelude::{TypeSignature, Volatility};
use datafusion_expr::{TypeSignature, Volatility};
use datatypes::arrow::datatypes::IntervalDayTime;
use datatypes::prelude::ConcreteDataType;
use datatypes::value::Value;

View File

@@ -16,7 +16,8 @@ use std::fmt;
use common_error::ext::BoxedError;
use common_query::error::{self, InvalidFuncArgsSnafu, Result, UnsupportedInputDataTypeSnafu};
use common_query::prelude::Signature;
use datafusion_expr::Signature;
use datatypes::arrow::datatypes::{DataType, TimeUnit};
use datatypes::prelude::{ConcreteDataType, MutableVector, ScalarVectorBuilder};
use datatypes::vectors::{StringVectorBuilder, VectorRef};
use snafu::{ensure, ResultExt};
@@ -42,13 +43,13 @@ impl Function for DateFormatFunction {
fn signature(&self) -> Signature {
helper::one_of_sigs2(
vec![
ConcreteDataType::date_datatype(),
ConcreteDataType::timestamp_second_datatype(),
ConcreteDataType::timestamp_millisecond_datatype(),
ConcreteDataType::timestamp_microsecond_datatype(),
ConcreteDataType::timestamp_nanosecond_datatype(),
DataType::Date32,
DataType::Timestamp(TimeUnit::Second, None),
DataType::Timestamp(TimeUnit::Millisecond, None),
DataType::Timestamp(TimeUnit::Microsecond, None),
DataType::Timestamp(TimeUnit::Nanosecond, None),
],
vec![ConcreteDataType::string_datatype()],
vec![DataType::Utf8],
)
}
@@ -127,7 +128,7 @@ impl fmt::Display for DateFormatFunction {
mod tests {
use std::sync::Arc;
use common_query::prelude::{TypeSignature, Volatility};
use datafusion_expr::{TypeSignature, Volatility};
use datatypes::prelude::{ConcreteDataType, ScalarVector};
use datatypes::value::Value;
use datatypes::vectors::{DateVector, StringVector, TimestampSecondVector};

View File

@@ -15,8 +15,9 @@
use std::fmt;
use common_query::error::{ArrowComputeSnafu, IntoVectorSnafu, InvalidFuncArgsSnafu, Result};
use common_query::prelude::Signature;
use datafusion_expr::Signature;
use datatypes::arrow::compute::kernels::numeric;
use datatypes::arrow::datatypes::{DataType, IntervalUnit, TimeUnit};
use datatypes::prelude::ConcreteDataType;
use datatypes::vectors::{Helper, VectorRef};
use snafu::{ensure, ResultExt};
@@ -44,16 +45,16 @@ impl Function for DateSubFunction {
fn signature(&self) -> Signature {
helper::one_of_sigs2(
vec![
ConcreteDataType::date_datatype(),
ConcreteDataType::timestamp_second_datatype(),
ConcreteDataType::timestamp_millisecond_datatype(),
ConcreteDataType::timestamp_microsecond_datatype(),
ConcreteDataType::timestamp_nanosecond_datatype(),
DataType::Date32,
DataType::Timestamp(TimeUnit::Second, None),
DataType::Timestamp(TimeUnit::Millisecond, None),
DataType::Timestamp(TimeUnit::Microsecond, None),
DataType::Timestamp(TimeUnit::Nanosecond, None),
],
vec![
ConcreteDataType::interval_month_day_nano_datatype(),
ConcreteDataType::interval_year_month_datatype(),
ConcreteDataType::interval_day_time_datatype(),
DataType::Interval(IntervalUnit::MonthDayNano),
DataType::Interval(IntervalUnit::YearMonth),
DataType::Interval(IntervalUnit::DayTime),
],
)
}
@@ -90,7 +91,7 @@ impl fmt::Display for DateSubFunction {
mod tests {
use std::sync::Arc;
use common_query::prelude::{TypeSignature, Volatility};
use datafusion_expr::{TypeSignature, Volatility};
use datatypes::arrow::datatypes::IntervalDayTime;
use datatypes::prelude::ConcreteDataType;
use datatypes::value::Value;

View File

@@ -18,9 +18,9 @@ use std::sync::Arc;
use common_query::error;
use common_query::error::{ArrowComputeSnafu, InvalidFuncArgsSnafu};
use common_query::prelude::{Signature, Volatility};
use datafusion::arrow::array::ArrayRef;
use datafusion::arrow::compute::is_null;
use datafusion_expr::{Signature, Volatility};
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::VectorRef;
use datatypes::vectors::Helper;
@@ -79,7 +79,7 @@ impl Function for IsNullFunction {
mod tests {
use std::sync::Arc;
use common_query::prelude::TypeSignature;
use datafusion_expr::TypeSignature;
use datatypes::scalars::ScalarVector;
use datatypes::vectors::{BooleanVector, Float32Vector};

View File

@@ -17,8 +17,9 @@ use std::fmt;
use common_error::ext::{BoxedError, PlainError};
use common_error::status_code::StatusCode;
use common_query::error::{self, InvalidFuncArgsSnafu, Result};
use common_query::prelude::{Signature, TypeSignature};
use datafusion::logical_expr::Volatility;
use datafusion_expr::type_coercion::aggregates::INTEGERS;
use datafusion_expr::{Signature, TypeSignature, Volatility};
use datatypes::arrow::datatypes::DataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::{Scalar, ScalarVectorBuilder};
use datatypes::value::{ListValue, Value};
@@ -91,20 +92,8 @@ impl Function for GeohashFunction {
fn signature(&self) -> Signature {
let mut signatures = Vec::new();
for coord_type in &[
ConcreteDataType::float32_datatype(),
ConcreteDataType::float64_datatype(),
] {
for resolution_type in &[
ConcreteDataType::int8_datatype(),
ConcreteDataType::int16_datatype(),
ConcreteDataType::int32_datatype(),
ConcreteDataType::int64_datatype(),
ConcreteDataType::uint8_datatype(),
ConcreteDataType::uint16_datatype(),
ConcreteDataType::uint32_datatype(),
ConcreteDataType::uint64_datatype(),
] {
for coord_type in &[DataType::Float32, DataType::Float64] {
for resolution_type in INTEGERS {
signatures.push(TypeSignature::Exact(vec![
// latitude
coord_type.clone(),
@@ -191,20 +180,8 @@ impl Function for GeohashNeighboursFunction {
fn signature(&self) -> Signature {
let mut signatures = Vec::new();
for coord_type in &[
ConcreteDataType::float32_datatype(),
ConcreteDataType::float64_datatype(),
] {
for resolution_type in &[
ConcreteDataType::int8_datatype(),
ConcreteDataType::int16_datatype(),
ConcreteDataType::int32_datatype(),
ConcreteDataType::int64_datatype(),
ConcreteDataType::uint8_datatype(),
ConcreteDataType::uint16_datatype(),
ConcreteDataType::uint32_datatype(),
ConcreteDataType::uint64_datatype(),
] {
for coord_type in &[DataType::Float32, DataType::Float64] {
for resolution_type in INTEGERS {
signatures.push(TypeSignature::Exact(vec![
// latitude
coord_type.clone(),

View File

@@ -13,12 +13,14 @@
// limitations under the License.
use std::str::FromStr;
use std::sync::LazyLock;
use common_error::ext::{BoxedError, PlainError};
use common_error::status_code::StatusCode;
use common_query::error::{self, Result};
use common_query::prelude::{Signature, TypeSignature};
use datafusion::logical_expr::Volatility;
use datafusion_expr::type_coercion::aggregates::INTEGERS;
use datafusion_expr::{Signature, TypeSignature, Volatility};
use datatypes::arrow::datatypes::DataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::{Scalar, ScalarVectorBuilder};
use datatypes::value::{ListValue, Value};
@@ -28,63 +30,22 @@ use datatypes::vectors::{
};
use derive_more::Display;
use h3o::{CellIndex, LatLng, Resolution};
use once_cell::sync::Lazy;
use snafu::ResultExt;
use crate::function::{Function, FunctionContext};
use crate::scalars::geo::helpers::{ensure_and_coerce, ensure_columns_len, ensure_columns_n};
static CELL_TYPES: Lazy<Vec<ConcreteDataType>> = Lazy::new(|| {
vec![
ConcreteDataType::int64_datatype(),
ConcreteDataType::uint64_datatype(),
ConcreteDataType::string_datatype(),
]
});
static CELL_TYPES: LazyLock<Vec<DataType>> =
LazyLock::new(|| vec![DataType::Int64, DataType::UInt64, DataType::Utf8]);
static COORDINATE_TYPES: Lazy<Vec<ConcreteDataType>> = Lazy::new(|| {
vec![
ConcreteDataType::float32_datatype(),
ConcreteDataType::float64_datatype(),
]
});
static RESOLUTION_TYPES: Lazy<Vec<ConcreteDataType>> = Lazy::new(|| {
vec![
ConcreteDataType::int8_datatype(),
ConcreteDataType::int16_datatype(),
ConcreteDataType::int32_datatype(),
ConcreteDataType::int64_datatype(),
ConcreteDataType::uint8_datatype(),
ConcreteDataType::uint16_datatype(),
ConcreteDataType::uint32_datatype(),
ConcreteDataType::uint64_datatype(),
]
});
static DISTANCE_TYPES: Lazy<Vec<ConcreteDataType>> = Lazy::new(|| {
vec![
ConcreteDataType::int8_datatype(),
ConcreteDataType::int16_datatype(),
ConcreteDataType::int32_datatype(),
ConcreteDataType::int64_datatype(),
ConcreteDataType::uint8_datatype(),
ConcreteDataType::uint16_datatype(),
ConcreteDataType::uint32_datatype(),
ConcreteDataType::uint64_datatype(),
]
});
static COORDINATE_TYPES: LazyLock<Vec<DataType>> =
LazyLock::new(|| vec![DataType::Float32, DataType::Float64]);
static POSITION_TYPES: Lazy<Vec<ConcreteDataType>> = Lazy::new(|| {
vec![
ConcreteDataType::int8_datatype(),
ConcreteDataType::int16_datatype(),
ConcreteDataType::int32_datatype(),
ConcreteDataType::int64_datatype(),
ConcreteDataType::uint8_datatype(),
ConcreteDataType::uint16_datatype(),
ConcreteDataType::uint32_datatype(),
ConcreteDataType::uint64_datatype(),
]
});
static RESOLUTION_TYPES: &[DataType] = INTEGERS;
static DISTANCE_TYPES: &[DataType] = INTEGERS;
static POSITION_TYPES: &[DataType] = INTEGERS;
/// Function that returns [h3] encoding cellid for a given geospatial coordinate.
///
@@ -105,7 +66,7 @@ impl Function for H3LatLngToCell {
fn signature(&self) -> Signature {
let mut signatures = Vec::new();
for coord_type in COORDINATE_TYPES.as_slice() {
for resolution_type in RESOLUTION_TYPES.as_slice() {
for resolution_type in RESOLUTION_TYPES {
signatures.push(TypeSignature::Exact(vec![
// latitude
coord_type.clone(),
@@ -177,7 +138,7 @@ impl Function for H3LatLngToCellString {
fn signature(&self) -> Signature {
let mut signatures = Vec::new();
for coord_type in COORDINATE_TYPES.as_slice() {
for resolution_type in RESOLUTION_TYPES.as_slice() {
for resolution_type in RESOLUTION_TYPES {
signatures.push(TypeSignature::Exact(vec![
// latitude
coord_type.clone(),
@@ -279,10 +240,7 @@ impl Function for H3StringToCell {
}
fn signature(&self) -> Signature {
Signature::new(
TypeSignature::Exact(vec![ConcreteDataType::string_datatype()]),
Volatility::Stable,
)
Signature::string(1, Volatility::Stable)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
@@ -692,9 +650,9 @@ impl Function for H3ChildPosToCell {
fn signature(&self) -> Signature {
let mut signatures =
Vec::with_capacity(POSITION_TYPES.len() * CELL_TYPES.len() * RESOLUTION_TYPES.len());
for position_type in POSITION_TYPES.as_slice() {
for position_type in POSITION_TYPES {
for cell_type in CELL_TYPES.as_slice() {
for resolution_type in RESOLUTION_TYPES.as_slice() {
for resolution_type in RESOLUTION_TYPES {
signatures.push(TypeSignature::Exact(vec![
position_type.clone(),
cell_type.clone(),
@@ -969,10 +927,10 @@ impl Function for H3CellContains {
fn signature(&self) -> Signature {
let multi_cell_types = vec![
ConcreteDataType::list_datatype(ConcreteDataType::int64_datatype()),
ConcreteDataType::list_datatype(ConcreteDataType::uint64_datatype()),
ConcreteDataType::list_datatype(ConcreteDataType::string_datatype()),
ConcreteDataType::string_datatype(),
DataType::new_list(DataType::Int64, true),
DataType::new_list(DataType::UInt64, true),
DataType::new_list(DataType::Utf8, true),
DataType::Utf8,
];
let mut signatures = Vec::with_capacity(multi_cell_types.len() * CELL_TYPES.len());
@@ -1204,7 +1162,7 @@ fn signature_of_double_cells() -> Signature {
fn signature_of_cell_and_resolution() -> Signature {
let mut signatures = Vec::with_capacity(CELL_TYPES.len() * RESOLUTION_TYPES.len());
for cell_type in CELL_TYPES.as_slice() {
for resolution_type in RESOLUTION_TYPES.as_slice() {
for resolution_type in RESOLUTION_TYPES {
signatures.push(TypeSignature::Exact(vec![
cell_type.clone(),
resolution_type.clone(),
@@ -1217,7 +1175,7 @@ fn signature_of_cell_and_resolution() -> Signature {
fn signature_of_cell_and_distance() -> Signature {
let mut signatures = Vec::with_capacity(CELL_TYPES.len() * DISTANCE_TYPES.len());
for cell_type in CELL_TYPES.as_slice() {
for distance_type in DISTANCE_TYPES.as_slice() {
for distance_type in DISTANCE_TYPES {
signatures.push(TypeSignature::Exact(vec![
cell_type.clone(),
distance_type.clone(),

View File

@@ -15,8 +15,7 @@
use common_error::ext::{BoxedError, PlainError};
use common_error::status_code::StatusCode;
use common_query::error::{self, Result};
use common_query::prelude::{Signature, TypeSignature};
use datafusion::logical_expr::Volatility;
use datafusion_expr::{Signature, Volatility};
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::{Float64VectorBuilder, MutableVector, VectorRef};
@@ -45,13 +44,7 @@ impl Function for STDistance {
}
fn signature(&self) -> Signature {
Signature::new(
TypeSignature::Exact(vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::string_datatype(),
]),
Volatility::Stable,
)
Signature::string(2, Volatility::Stable)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
@@ -99,13 +92,7 @@ impl Function for STDistanceSphere {
}
fn signature(&self) -> Signature {
Signature::new(
TypeSignature::Exact(vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::string_datatype(),
]),
Volatility::Stable,
)
Signature::string(2, Volatility::Stable)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
@@ -163,10 +150,7 @@ impl Function for STArea {
}
fn signature(&self) -> Signature {
Signature::new(
TypeSignature::Exact(vec![ConcreteDataType::string_datatype()]),
Volatility::Stable,
)
Signature::string(1, Volatility::Stable)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {

View File

@@ -13,8 +13,7 @@
// limitations under the License.
use common_query::error::Result;
use common_query::prelude::{Signature, TypeSignature};
use datafusion::logical_expr::Volatility;
use datafusion_expr::{Signature, Volatility};
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::{BooleanVectorBuilder, MutableVector, VectorRef};
@@ -42,13 +41,7 @@ impl Function for STContains {
}
fn signature(&self) -> Signature {
Signature::new(
TypeSignature::Exact(vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::string_datatype(),
]),
Volatility::Stable,
)
Signature::string(2, Volatility::Stable)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
@@ -96,13 +89,7 @@ impl Function for STWithin {
}
fn signature(&self) -> Signature {
Signature::new(
TypeSignature::Exact(vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::string_datatype(),
]),
Volatility::Stable,
)
Signature::string(2, Volatility::Stable)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
@@ -150,13 +137,7 @@ impl Function for STIntersects {
}
fn signature(&self) -> Signature {
Signature::new(
TypeSignature::Exact(vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::string_datatype(),
]),
Volatility::Stable,
)
Signature::string(2, Volatility::Stable)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {

View File

@@ -12,15 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::LazyLock;
use common_query::error::{InvalidFuncArgsSnafu, Result};
use common_query::prelude::{Signature, TypeSignature};
use datafusion::logical_expr::Volatility;
use datafusion_expr::{Signature, TypeSignature, Volatility};
use datatypes::arrow::datatypes::DataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::value::Value;
use datatypes::vectors::{MutableVector, StringVectorBuilder, UInt64VectorBuilder, VectorRef};
use derive_more::Display;
use once_cell::sync::Lazy;
use s2::cellid::{CellID, MAX_LEVEL};
use s2::latlng::LatLng;
use snafu::ensure;
@@ -28,32 +29,13 @@ use snafu::ensure;
use crate::function::{Function, FunctionContext};
use crate::scalars::geo::helpers::{ensure_and_coerce, ensure_columns_len, ensure_columns_n};
static CELL_TYPES: Lazy<Vec<ConcreteDataType>> = Lazy::new(|| {
vec![
ConcreteDataType::int64_datatype(),
ConcreteDataType::uint64_datatype(),
]
});
static CELL_TYPES: LazyLock<Vec<DataType>> =
LazyLock::new(|| vec![DataType::Int64, DataType::UInt64]);
static COORDINATE_TYPES: Lazy<Vec<ConcreteDataType>> = Lazy::new(|| {
vec![
ConcreteDataType::float32_datatype(),
ConcreteDataType::float64_datatype(),
]
});
static COORDINATE_TYPES: LazyLock<Vec<DataType>> =
LazyLock::new(|| vec![DataType::Float32, DataType::Float64]);
static LEVEL_TYPES: Lazy<Vec<ConcreteDataType>> = Lazy::new(|| {
vec![
ConcreteDataType::int8_datatype(),
ConcreteDataType::int16_datatype(),
ConcreteDataType::int32_datatype(),
ConcreteDataType::int64_datatype(),
ConcreteDataType::uint8_datatype(),
ConcreteDataType::uint16_datatype(),
ConcreteDataType::uint32_datatype(),
ConcreteDataType::uint64_datatype(),
]
});
static LEVEL_TYPES: &[DataType] = datafusion_expr::type_coercion::aggregates::INTEGERS;
/// Function that returns [s2] encoding cellid for a given geospatial coordinate.
///
@@ -242,7 +224,7 @@ fn signature_of_cell() -> Signature {
fn signature_of_cell_and_level() -> Signature {
let mut signatures = Vec::with_capacity(CELL_TYPES.len() * LEVEL_TYPES.len());
for cell_type in CELL_TYPES.as_slice() {
for level_type in LEVEL_TYPES.as_slice() {
for level_type in LEVEL_TYPES {
signatures.push(TypeSignature::Exact(vec![
cell_type.clone(),
level_type.clone(),

View File

@@ -12,29 +12,26 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::LazyLock;
use common_error::ext::{BoxedError, PlainError};
use common_error::status_code::StatusCode;
use common_query::error::{self, Result};
use common_query::prelude::{Signature, TypeSignature};
use datafusion::logical_expr::Volatility;
use datafusion_expr::{Signature, TypeSignature, Volatility};
use datatypes::arrow::datatypes::DataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::{MutableVector, StringVectorBuilder, VectorRef};
use derive_more::Display;
use geo_types::{Geometry, Point};
use once_cell::sync::Lazy;
use snafu::ResultExt;
use wkt::{ToWkt, TryFromWkt};
use crate::function::{Function, FunctionContext};
use crate::scalars::geo::helpers::{ensure_columns_len, ensure_columns_n};
static COORDINATE_TYPES: Lazy<Vec<ConcreteDataType>> = Lazy::new(|| {
vec![
ConcreteDataType::float32_datatype(),
ConcreteDataType::float64_datatype(),
]
});
static COORDINATE_TYPES: LazyLock<Vec<DataType>> =
LazyLock::new(|| vec![DataType::Float32, DataType::Float64]);
/// Return WGS84(SRID: 4326) euclidean distance between two geometry object, in degree
#[derive(Clone, Debug, Default, Display)]

View File

@@ -18,7 +18,8 @@ use std::fmt;
use std::fmt::Display;
use common_query::error::{DowncastVectorSnafu, InvalidFuncArgsSnafu, Result};
use common_query::prelude::{Signature, Volatility};
use datafusion_expr::{Signature, Volatility};
use datatypes::arrow::datatypes::DataType;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::Vector;
use datatypes::scalars::{ScalarVector, ScalarVectorBuilder};
@@ -64,10 +65,7 @@ impl Function for HllCalcFunction {
fn signature(&self) -> Signature {
// Only argument: HyperLogLogPlus state (binary)
Signature::exact(
vec![ConcreteDataType::binary_datatype()],
Volatility::Immutable,
)
Signature::exact(vec![DataType::Binary], Volatility::Immutable)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {

View File

@@ -16,8 +16,9 @@ use std::net::{Ipv4Addr, Ipv6Addr};
use std::str::FromStr;
use common_query::error::{InvalidFuncArgsSnafu, Result};
use common_query::prelude::{Signature, TypeSignature};
use datafusion::logical_expr::Volatility;
use datafusion_common::types;
use datafusion_expr::{Coercion, Signature, TypeSignature, TypeSignatureClass, Volatility};
use datatypes::arrow::datatypes::DataType;
use datatypes::prelude::{ConcreteDataType, Value};
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::{MutableVector, StringVectorBuilder, VectorRef};
@@ -51,10 +52,10 @@ impl Function for Ipv4ToCidr {
fn signature(&self) -> Signature {
Signature::one_of(
vec![
TypeSignature::Exact(vec![ConcreteDataType::string_datatype()]),
TypeSignature::Exact(vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::uint8_datatype(),
TypeSignature::String(1),
TypeSignature::Coercible(vec![
Coercion::new_exact(TypeSignatureClass::Native(types::logical_string())),
Coercion::new_exact(TypeSignatureClass::Integer),
]),
],
Volatility::Immutable,
@@ -180,11 +181,8 @@ impl Function for Ipv6ToCidr {
fn signature(&self) -> Signature {
Signature::one_of(
vec![
TypeSignature::Exact(vec![ConcreteDataType::string_datatype()]),
TypeSignature::Exact(vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::uint8_datatype(),
]),
TypeSignature::String(1),
TypeSignature::Exact(vec![DataType::Utf8, DataType::UInt8]),
],
Volatility::Immutable,
)

View File

@@ -16,8 +16,8 @@ use std::net::Ipv4Addr;
use std::str::FromStr;
use common_query::error::{InvalidFuncArgsSnafu, Result};
use common_query::prelude::{Signature, TypeSignature};
use datafusion::logical_expr::Volatility;
use datafusion_expr::{Signature, TypeSignature, Volatility};
use datatypes::arrow::datatypes::DataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::{MutableVector, StringVectorBuilder, UInt32VectorBuilder, VectorRef};
@@ -49,7 +49,7 @@ impl Function for Ipv4NumToString {
fn signature(&self) -> Signature {
Signature::new(
TypeSignature::Exact(vec![ConcreteDataType::uint32_datatype()]),
TypeSignature::Exact(vec![DataType::UInt32]),
Volatility::Immutable,
)
}
@@ -107,10 +107,7 @@ impl Function for Ipv4StringToNum {
}
fn signature(&self) -> Signature {
Signature::new(
TypeSignature::Exact(vec![ConcreteDataType::string_datatype()]),
Volatility::Immutable,
)
Signature::string(1, Volatility::Immutable)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {

View File

@@ -16,8 +16,7 @@ use std::net::{Ipv4Addr, Ipv6Addr};
use std::str::FromStr;
use common_query::error::{InvalidFuncArgsSnafu, Result};
use common_query::prelude::{Signature, TypeSignature};
use datafusion::logical_expr::Volatility;
use datafusion_expr::{Signature, Volatility};
use datatypes::prelude::{ConcreteDataType, Value};
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::{BinaryVectorBuilder, MutableVector, StringVectorBuilder, VectorRef};
@@ -45,10 +44,7 @@ impl Function for Ipv6NumToString {
}
fn signature(&self) -> Signature {
Signature::new(
TypeSignature::Exact(vec![ConcreteDataType::string_datatype()]),
Volatility::Immutable,
)
Signature::string(1, Volatility::Immutable)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
@@ -137,10 +133,7 @@ impl Function for Ipv6StringToNum {
}
fn signature(&self) -> Signature {
Signature::new(
TypeSignature::Exact(vec![ConcreteDataType::string_datatype()]),
Volatility::Immutable,
)
Signature::string(1, Volatility::Immutable)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {

View File

@@ -16,8 +16,7 @@ use std::net::{Ipv4Addr, Ipv6Addr};
use std::str::FromStr;
use common_query::error::{InvalidFuncArgsSnafu, Result};
use common_query::prelude::{Signature, TypeSignature};
use datafusion::logical_expr::Volatility;
use datafusion_expr::{Signature, Volatility};
use datatypes::prelude::{ConcreteDataType, Value};
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::{BooleanVectorBuilder, MutableVector, VectorRef};
@@ -49,13 +48,7 @@ impl Function for Ipv4InRange {
}
fn signature(&self) -> Signature {
Signature::new(
TypeSignature::Exact(vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::string_datatype(),
]),
Volatility::Immutable,
)
Signature::string(2, Volatility::Immutable)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
@@ -144,13 +137,7 @@ impl Function for Ipv6InRange {
}
fn signature(&self) -> Signature {
Signature::new(
TypeSignature::Exact(vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::string_datatype(),
]),
Volatility::Immutable,
)
Signature::string(2, Volatility::Immutable)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {

View File

@@ -15,8 +15,8 @@
use std::fmt::{self, Display};
use common_query::error::{InvalidFuncArgsSnafu, Result, UnsupportedInputDataTypeSnafu};
use common_query::prelude::Signature;
use datafusion::logical_expr::Volatility;
use datafusion_expr::{Signature, Volatility};
use datatypes::arrow::datatypes::DataType;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::VectorRef;
use datatypes::scalars::ScalarVectorBuilder;
@@ -63,11 +63,9 @@ macro_rules! json_get {
}
fn signature(&self) -> Signature {
// TODO(LFC): Use a more clear type here instead of "Binary" for Json input, once we have a "Json" type.
Signature::exact(
vec![
ConcreteDataType::json_datatype(),
ConcreteDataType::string_datatype(),
],
vec![DataType::Binary, DataType::Utf8],
Volatility::Immutable,
)
}
@@ -166,11 +164,9 @@ impl Function for JsonGetString {
}
fn signature(&self) -> Signature {
// TODO(LFC): Use a more clear type here instead of "Binary" for Json input, once we have a "Json" type.
Signature::exact(
vec![
ConcreteDataType::json_datatype(),
ConcreteDataType::string_datatype(),
],
vec![DataType::Binary, DataType::Utf8],
Volatility::Immutable,
)
}
@@ -234,7 +230,7 @@ impl Display for JsonGetString {
mod tests {
use std::sync::Arc;
use common_query::prelude::TypeSignature;
use datafusion_expr::TypeSignature;
use datatypes::scalars::ScalarVector;
use datatypes::vectors::{BinaryVector, StringVector};
@@ -259,7 +255,7 @@ mod tests {
Signature {
type_signature: TypeSignature::Exact(valid_types),
volatility: Volatility::Immutable
} if valid_types == vec![ConcreteDataType::json_datatype(), ConcreteDataType::string_datatype()]
} if valid_types == vec![DataType::Binary, DataType::Utf8]
));
let json_strings = [
@@ -312,7 +308,7 @@ mod tests {
Signature {
type_signature: TypeSignature::Exact(valid_types),
volatility: Volatility::Immutable
} if valid_types == vec![ConcreteDataType::json_datatype(), ConcreteDataType::string_datatype()]
} if valid_types == vec![DataType::Binary, DataType::Utf8]
));
let json_strings = [
@@ -365,7 +361,7 @@ mod tests {
Signature {
type_signature: TypeSignature::Exact(valid_types),
volatility: Volatility::Immutable
} if valid_types == vec![ConcreteDataType::json_datatype(), ConcreteDataType::string_datatype()]
} if valid_types == vec![DataType::Binary, DataType::Utf8]
));
let json_strings = [
@@ -418,7 +414,7 @@ mod tests {
Signature {
type_signature: TypeSignature::Exact(valid_types),
volatility: Volatility::Immutable
} if valid_types == vec![ConcreteDataType::json_datatype(), ConcreteDataType::string_datatype()]
} if valid_types == vec![DataType::Binary, DataType::Utf8]
));
let json_strings = [

View File

@@ -15,8 +15,8 @@
use std::fmt::{self, Display};
use common_query::error::{InvalidFuncArgsSnafu, Result, UnsupportedInputDataTypeSnafu};
use common_query::prelude::Signature;
use datafusion::logical_expr::Volatility;
use datafusion_expr::{Signature, Volatility};
use datatypes::arrow::datatypes::DataType;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::VectorRef;
use datatypes::scalars::ScalarVectorBuilder;
@@ -42,7 +42,8 @@ macro_rules! json_is {
}
fn signature(&self) -> Signature {
Signature::exact(vec![ConcreteDataType::json_datatype()], Volatility::Immutable)
// TODO(LFC): Use a more clear type here instead of "Binary" for Json input, once we have a "Json" type.
Signature::exact(vec![DataType::Binary], Volatility::Immutable)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
@@ -166,10 +167,7 @@ mod tests {
);
assert_eq!(
func.signature(),
Signature::exact(
vec![ConcreteDataType::json_datatype()],
Volatility::Immutable
)
Signature::exact(vec![DataType::Binary], Volatility::Immutable)
);
}

View File

@@ -15,8 +15,8 @@
use std::fmt::{self, Display};
use common_query::error::{InvalidFuncArgsSnafu, Result, UnsupportedInputDataTypeSnafu};
use common_query::prelude::{Signature, TypeSignature};
use datafusion::logical_expr::Volatility;
use datafusion_expr::{Signature, TypeSignature, Volatility};
use datatypes::arrow::datatypes::DataType;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::VectorRef;
use datatypes::scalars::ScalarVectorBuilder;
@@ -41,24 +41,13 @@ impl Function for JsonPathExistsFunction {
}
fn signature(&self) -> Signature {
// TODO(LFC): Use a more clear type here instead of "Binary" for Json input, once we have a "Json" type.
Signature::one_of(
vec![
TypeSignature::Exact(vec![
ConcreteDataType::json_datatype(),
ConcreteDataType::string_datatype(),
]),
TypeSignature::Exact(vec![
ConcreteDataType::null_datatype(),
ConcreteDataType::string_datatype(),
]),
TypeSignature::Exact(vec![
ConcreteDataType::json_datatype(),
ConcreteDataType::null_datatype(),
]),
TypeSignature::Exact(vec![
ConcreteDataType::null_datatype(),
ConcreteDataType::null_datatype(),
]),
TypeSignature::Exact(vec![DataType::Binary, DataType::Utf8]),
TypeSignature::Exact(vec![DataType::Null, DataType::Utf8]),
TypeSignature::Exact(vec![DataType::Binary, DataType::Null]),
TypeSignature::Exact(vec![DataType::Null, DataType::Null]),
],
Volatility::Immutable,
)
@@ -134,7 +123,6 @@ impl Display for JsonPathExistsFunction {
mod tests {
use std::sync::Arc;
use common_query::prelude::TypeSignature;
use datatypes::prelude::ScalarVector;
use datatypes::vectors::{BinaryVector, NullVector, StringVector};
@@ -159,20 +147,20 @@ mod tests {
} if valid_types ==
vec![
TypeSignature::Exact(vec![
ConcreteDataType::json_datatype(),
ConcreteDataType::string_datatype(),
DataType::Binary,
DataType::Utf8,
]),
TypeSignature::Exact(vec![
ConcreteDataType::null_datatype(),
ConcreteDataType::string_datatype(),
DataType::Null,
DataType::Utf8,
]),
TypeSignature::Exact(vec![
ConcreteDataType::json_datatype(),
ConcreteDataType::null_datatype(),
DataType::Binary,
DataType::Null,
]),
TypeSignature::Exact(vec![
ConcreteDataType::null_datatype(),
ConcreteDataType::null_datatype(),
DataType::Null,
DataType::Null,
]),
],
));

View File

@@ -15,8 +15,8 @@
use std::fmt::{self, Display};
use common_query::error::{InvalidFuncArgsSnafu, Result, UnsupportedInputDataTypeSnafu};
use common_query::prelude::Signature;
use datafusion::logical_expr::Volatility;
use datafusion_expr::{Signature, Volatility};
use datatypes::arrow::datatypes::DataType;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::VectorRef;
use datatypes::scalars::ScalarVectorBuilder;
@@ -41,11 +41,9 @@ impl Function for JsonPathMatchFunction {
}
fn signature(&self) -> Signature {
// TODO(LFC): Use a more clear type here instead of "Binary" for Json input, once we have a "Json" type.
Signature::exact(
vec![
ConcreteDataType::json_datatype(),
ConcreteDataType::string_datatype(),
],
vec![DataType::Binary, DataType::Utf8],
Volatility::Immutable,
)
}
@@ -117,7 +115,7 @@ impl Display for JsonPathMatchFunction {
mod tests {
use std::sync::Arc;
use common_query::prelude::TypeSignature;
use datafusion_expr::TypeSignature;
use datatypes::vectors::{BinaryVector, StringVector};
use super::*;
@@ -138,7 +136,7 @@ mod tests {
Signature {
type_signature: TypeSignature::Exact(valid_types),
volatility: Volatility::Immutable
} if valid_types == vec![ConcreteDataType::json_datatype(), ConcreteDataType::string_datatype()],
} if valid_types == vec![DataType::Binary, DataType::Utf8],
));
let json_strings = [

View File

@@ -15,8 +15,8 @@
use std::fmt::{self, Display};
use common_query::error::{InvalidFuncArgsSnafu, Result, UnsupportedInputDataTypeSnafu};
use common_query::prelude::Signature;
use datafusion::logical_expr::Volatility;
use datafusion_expr::{Signature, Volatility};
use datatypes::arrow::datatypes::DataType;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::VectorRef;
use datatypes::scalars::ScalarVectorBuilder;
@@ -41,10 +41,8 @@ impl Function for JsonToStringFunction {
}
fn signature(&self) -> Signature {
Signature::exact(
vec![ConcreteDataType::json_datatype()],
Volatility::Immutable,
)
// TODO(LFC): Use a more clear type here instead of "Binary" for Json input, once we have a "Json" type.
Signature::exact(vec![DataType::Binary], Volatility::Immutable)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
@@ -112,7 +110,7 @@ impl Display for JsonToStringFunction {
mod tests {
use std::sync::Arc;
use common_query::prelude::TypeSignature;
use datafusion_expr::TypeSignature;
use datatypes::scalars::ScalarVector;
use datatypes::vectors::BinaryVector;
@@ -134,7 +132,7 @@ mod tests {
Signature {
type_signature: TypeSignature::Exact(valid_types),
volatility: Volatility::Immutable
} if valid_types == vec![ConcreteDataType::json_datatype()]
} if valid_types == vec![DataType::Binary]
));
let json_strings = [

View File

@@ -15,8 +15,7 @@
use std::fmt::{self, Display};
use common_query::error::{InvalidFuncArgsSnafu, Result, UnsupportedInputDataTypeSnafu};
use common_query::prelude::Signature;
use datafusion::logical_expr::Volatility;
use datafusion_expr::{Signature, Volatility};
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::VectorRef;
use datatypes::scalars::ScalarVectorBuilder;
@@ -41,10 +40,7 @@ impl Function for ParseJsonFunction {
}
fn signature(&self) -> Signature {
Signature::exact(
vec![ConcreteDataType::string_datatype()],
Volatility::Immutable,
)
Signature::string(1, Volatility::Immutable)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
@@ -111,7 +107,6 @@ impl Display for ParseJsonFunction {
mod tests {
use std::sync::Arc;
use common_query::prelude::TypeSignature;
use datatypes::scalars::ScalarVector;
use datatypes::vectors::StringVector;
@@ -129,13 +124,6 @@ mod tests {
.unwrap()
);
assert!(matches!(parse_json.signature(),
Signature {
type_signature: TypeSignature::Exact(valid_types),
volatility: Volatility::Immutable
} if valid_types == vec![ConcreteDataType::string_datatype()]
));
let json_strings = [
r#"{"a": {"b": 2}, "b": 2, "c": 3}"#,
r#"{"a": 4, "b": {"c": 6}, "c": 6}"#,

View File

@@ -24,6 +24,7 @@ use datafusion::common::{DFSchema, Result as DfResult};
use datafusion::execution::SessionStateBuilder;
use datafusion::logical_expr::{self, Expr, Volatility};
use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
use datafusion_expr::Signature;
use datatypes::arrow::array::RecordBatch;
use datatypes::arrow::datatypes::{DataType, Field};
use datatypes::prelude::VectorRef;
@@ -61,14 +62,8 @@ impl Function for MatchesFunction {
Ok(ConcreteDataType::boolean_datatype())
}
fn signature(&self) -> common_query::prelude::Signature {
common_query::prelude::Signature::exact(
vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::string_datatype(),
],
Volatility::Immutable,
)
fn signature(&self) -> Signature {
Signature::string(2, Volatility::Immutable)
}
// TODO: read case-sensitive config

View File

@@ -17,7 +17,7 @@ use std::iter::repeat_n;
use std::sync::Arc;
use common_query::error::{InvalidFuncArgsSnafu, Result};
use common_query::prelude::Volatility;
use datafusion_expr::{Signature, Volatility};
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::{BooleanVector, BooleanVectorBuilder, MutableVector, VectorRef};
@@ -96,14 +96,8 @@ impl Function for MatchesTermFunction {
Ok(ConcreteDataType::boolean_datatype())
}
fn signature(&self) -> common_query::prelude::Signature {
common_query::prelude::Signature::exact(
vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::string_datatype(),
],
Volatility::Immutable,
)
fn signature(&self) -> Signature {
Signature::string(2, Volatility::Immutable)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {

View File

@@ -20,9 +20,8 @@ use std::fmt;
pub use clamp::{ClampFunction, ClampMaxFunction, ClampMinFunction};
use common_query::error::{GeneralDataFusionSnafu, Result};
use common_query::prelude::Signature;
use datafusion::error::DataFusionError;
use datafusion::logical_expr::Volatility;
use datafusion_expr::{Signature, Volatility};
use datatypes::prelude::ConcreteDataType;
use datatypes::vectors::VectorRef;
pub use rate::RateFunction;

View File

@@ -16,9 +16,10 @@ use std::fmt::{self, Display};
use std::sync::Arc;
use common_query::error::{InvalidFuncArgsSnafu, Result};
use common_query::prelude::Signature;
use datafusion::arrow::array::{ArrayIter, PrimitiveArray};
use datafusion::logical_expr::Volatility;
use datafusion_expr::type_coercion::aggregates::NUMERICS;
use datafusion_expr::Signature;
use datatypes::data_type::{ConcreteDataType, DataType};
use datatypes::prelude::VectorRef;
use datatypes::types::LogicalPrimitiveType;
@@ -73,7 +74,7 @@ impl Function for ClampFunction {
fn signature(&self) -> Signature {
// input, min, max
Signature::uniform(3, ConcreteDataType::numerics(), Volatility::Immutable)
Signature::uniform(3, NUMERICS.to_vec(), Volatility::Immutable)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
@@ -192,7 +193,7 @@ impl Function for ClampMinFunction {
fn signature(&self) -> Signature {
// input, min
Signature::uniform(2, ConcreteDataType::numerics(), Volatility::Immutable)
Signature::uniform(2, NUMERICS.to_vec(), Volatility::Immutable)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
@@ -273,7 +274,7 @@ impl Function for ClampMaxFunction {
fn signature(&self) -> Signature {
// input, max
Signature::uniform(2, ConcreteDataType::numerics(), Volatility::Immutable)
Signature::uniform(2, NUMERICS.to_vec(), Volatility::Immutable)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {

View File

@@ -17,7 +17,8 @@ use std::fmt::Display;
use common_query::error;
use common_query::error::{ArrowComputeSnafu, InvalidFuncArgsSnafu, Result};
use common_query::prelude::{Signature, Volatility};
use datafusion_expr::type_coercion::aggregates::NUMERICS;
use datafusion_expr::{Signature, Volatility};
use datatypes::arrow::compute;
use datatypes::arrow::compute::kernels::numeric;
use datatypes::arrow::datatypes::DataType as ArrowDataType;
@@ -55,7 +56,7 @@ impl Function for ModuloFunction {
}
fn signature(&self) -> Signature {
Signature::uniform(2, ConcreteDataType::numerics(), Volatility::Immutable)
Signature::uniform(2, NUMERICS.to_vec(), Volatility::Immutable)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {

View File

@@ -15,8 +15,9 @@
use std::fmt;
use common_query::error::{self, Result};
use common_query::prelude::{Signature, Volatility};
use datafusion::arrow::compute::kernels::numeric;
use datafusion_expr::type_coercion::aggregates::NUMERICS;
use datafusion_expr::{Signature, Volatility};
use datatypes::arrow::compute::kernels::cast;
use datatypes::arrow::datatypes::DataType;
use datatypes::prelude::*;
@@ -45,7 +46,7 @@ impl Function for RateFunction {
}
fn signature(&self) -> Signature {
Signature::uniform(2, ConcreteDataType::numerics(), Volatility::Immutable)
Signature::uniform(2, NUMERICS.to_vec(), Volatility::Immutable)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
@@ -75,7 +76,7 @@ impl Function for RateFunction {
mod tests {
use std::sync::Arc;
use common_query::prelude::TypeSignature;
use datafusion_expr::TypeSignature;
use datatypes::vectors::{Float32Vector, Float64Vector, Int64Vector};
use super::*;
@@ -91,7 +92,7 @@ mod tests {
Signature {
type_signature: TypeSignature::Uniform(2, valid_types),
volatility: Volatility::Immutable
} if valid_types == ConcreteDataType::numerics()
} if valid_types == NUMERICS
));
let values = vec![1.0, 3.0, 6.0];
let ts = vec![0, 1, 2];

View File

@@ -16,7 +16,8 @@ use std::fmt;
use std::sync::Arc;
use common_query::error::Result;
use common_query::prelude::{Signature, Volatility};
use datafusion_expr::{Signature, Volatility};
use datatypes::arrow::datatypes::DataType;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::VectorRef;
@@ -37,10 +38,7 @@ impl Function for TestAndFunction {
fn signature(&self) -> Signature {
Signature::exact(
vec![
ConcreteDataType::boolean_datatype(),
ConcreteDataType::boolean_datatype(),
],
vec![DataType::Boolean, DataType::Boolean],
Volatility::Immutable,
)
}

View File

@@ -16,8 +16,9 @@ use std::fmt;
use std::sync::Arc;
use common_query::error::{InvalidFuncArgsSnafu, Result, UnsupportedInputDataTypeSnafu};
use common_query::prelude::{Signature, Volatility};
use common_time::{Date, Timestamp};
use datafusion_expr::{Signature, Volatility};
use datatypes::arrow::datatypes::{DataType, TimeUnit};
use datatypes::prelude::ConcreteDataType;
use datatypes::vectors::{Int64Vector, VectorRef};
use snafu::ensure;
@@ -68,14 +69,14 @@ impl Function for ToUnixtimeFunction {
Signature::uniform(
1,
vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::int32_datatype(),
ConcreteDataType::int64_datatype(),
ConcreteDataType::date_datatype(),
ConcreteDataType::timestamp_second_datatype(),
ConcreteDataType::timestamp_millisecond_datatype(),
ConcreteDataType::timestamp_microsecond_datatype(),
ConcreteDataType::timestamp_nanosecond_datatype(),
DataType::Utf8,
DataType::Int32,
DataType::Int64,
DataType::Date32,
DataType::Timestamp(TimeUnit::Second, None),
DataType::Timestamp(TimeUnit::Millisecond, None),
DataType::Timestamp(TimeUnit::Microsecond, None),
DataType::Timestamp(TimeUnit::Nanosecond, None),
],
Volatility::Immutable,
)
@@ -129,7 +130,7 @@ impl fmt::Display for ToUnixtimeFunction {
#[cfg(test)]
mod tests {
use common_query::prelude::TypeSignature;
use datafusion_expr::TypeSignature;
use datatypes::prelude::ConcreteDataType;
use datatypes::value::Value;
use datatypes::vectors::{
@@ -152,14 +153,14 @@ mod tests {
type_signature: TypeSignature::Uniform(1, valid_types),
volatility: Volatility::Immutable
} if valid_types == vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::int32_datatype(),
ConcreteDataType::int64_datatype(),
ConcreteDataType::date_datatype(),
ConcreteDataType::timestamp_second_datatype(),
ConcreteDataType::timestamp_millisecond_datatype(),
ConcreteDataType::timestamp_microsecond_datatype(),
ConcreteDataType::timestamp_nanosecond_datatype(),
DataType::Utf8,
DataType::Int32,
DataType::Int64,
DataType::Date32,
DataType::Timestamp(TimeUnit::Second, None),
DataType::Timestamp(TimeUnit::Millisecond, None),
DataType::Timestamp(TimeUnit::Microsecond, None),
DataType::Timestamp(TimeUnit::Nanosecond, None),
]
));

View File

@@ -18,7 +18,8 @@ use std::fmt;
use std::fmt::Display;
use common_query::error::{DowncastVectorSnafu, InvalidFuncArgsSnafu, Result};
use common_query::prelude::{Signature, Volatility};
use datafusion_expr::{Signature, Volatility};
use datatypes::arrow::datatypes::DataType;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::Vector;
use datatypes::scalars::{ScalarVector, ScalarVectorBuilder};
@@ -66,10 +67,7 @@ impl Function for UddSketchCalcFunction {
// First argument: percentile (float64)
// Second argument: UDDSketch state (binary)
Signature::exact(
vec![
ConcreteDataType::float64_datatype(),
ConcreteDataType::binary_datatype(),
],
vec![DataType::Float64, DataType::Binary],
Volatility::Immutable,
)
}

View File

@@ -89,7 +89,7 @@ pub fn create_udf(
query_ctx: QueryContextRef,
state: Arc<FunctionState>,
) -> ScalarUDF {
let signature = func.signature().into();
let signature = func.signature();
let udf = ScalarUdf {
function: func,
signature,
@@ -139,7 +139,7 @@ mod tests {
let udf = create_udf(f.clone(), query_ctx, Arc::new(FunctionState::default()));
assert_eq!("test_and", udf.name());
let expected_signature: datafusion_expr::Signature = f.signature().into();
let expected_signature: datafusion_expr::Signature = f.signature();
assert_eq!(udf.signature(), &expected_signature);
assert_eq!(
ConcreteDataType::boolean_datatype(),

View File

@@ -15,7 +15,7 @@
use std::fmt::Display;
use common_query::error::{InvalidFuncArgsSnafu, InvalidVectorStringSnafu, Result};
use common_query::prelude::{Signature, Volatility};
use datafusion_expr::{Signature, Volatility};
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::types::parse_string_to_vector_type_value;
@@ -39,10 +39,7 @@ impl Function for ParseVectorFunction {
}
fn signature(&self) -> Signature {
Signature::exact(
vec![ConcreteDataType::string_datatype()],
Volatility::Immutable,
)
Signature::string(1, Volatility::Immutable)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {

View File

@@ -15,7 +15,8 @@
use std::fmt::Display;
use common_query::error::{InvalidFuncArgsSnafu, Result};
use common_query::prelude::{Signature, Volatility};
use datafusion_expr::type_coercion::aggregates::BINARYS;
use datafusion_expr::{Signature, Volatility};
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::types::vector_type_value_to_string;
@@ -40,10 +41,7 @@ impl Function for VectorToStringFunction {
}
fn signature(&self) -> Signature {
Signature::exact(
vec![ConcreteDataType::binary_datatype()],
Volatility::Immutable,
)
Signature::uniform(1, BINARYS.to_vec(), Volatility::Immutable)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {

View File

@@ -20,7 +20,8 @@ use std::borrow::Cow;
use std::fmt::Display;
use common_query::error::{InvalidFuncArgsSnafu, Result};
use common_query::prelude::Signature;
use datafusion_expr::Signature;
use datatypes::arrow::datatypes::DataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::{Float32VectorBuilder, MutableVector, VectorRef};
@@ -49,14 +50,8 @@ macro_rules! define_distance_function {
fn signature(&self) -> Signature {
helper::one_of_sigs2(
vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::binary_datatype(),
],
vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::binary_datatype(),
],
vec![DataType::Utf8, DataType::Binary],
vec![DataType::Utf8, DataType::Binary],
)
}

View File

@@ -16,7 +16,8 @@ use std::borrow::Cow;
use std::fmt::Display;
use common_query::error::InvalidFuncArgsSnafu;
use common_query::prelude::{Signature, TypeSignature, Volatility};
use datafusion::logical_expr_common::type_coercion::aggregates::{BINARYS, STRINGS};
use datafusion_expr::{Signature, TypeSignature, Volatility};
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::{Float32VectorBuilder, MutableVector, VectorRef};
@@ -59,8 +60,8 @@ impl Function for ElemProductFunction {
fn signature(&self) -> Signature {
Signature::one_of(
vec![
TypeSignature::Exact(vec![ConcreteDataType::string_datatype()]),
TypeSignature::Exact(vec![ConcreteDataType::binary_datatype()]),
TypeSignature::Uniform(1, STRINGS.to_vec()),
TypeSignature::Uniform(1, BINARYS.to_vec()),
],
Volatility::Immutable,
)

View File

@@ -16,7 +16,8 @@ use std::borrow::Cow;
use std::fmt::Display;
use common_query::error::InvalidFuncArgsSnafu;
use common_query::prelude::{Signature, TypeSignature, Volatility};
use datafusion_expr::type_coercion::aggregates::{BINARYS, STRINGS};
use datafusion_expr::{Signature, TypeSignature, Volatility};
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::{Float32VectorBuilder, MutableVector, VectorRef};
@@ -46,8 +47,8 @@ impl Function for ElemSumFunction {
fn signature(&self) -> Signature {
Signature::one_of(
vec![
TypeSignature::Exact(vec![ConcreteDataType::string_datatype()]),
TypeSignature::Exact(vec![ConcreteDataType::binary_datatype()]),
TypeSignature::Uniform(1, STRINGS.to_vec()),
TypeSignature::Uniform(1, BINARYS.to_vec()),
],
Volatility::Immutable,
)

View File

@@ -16,7 +16,8 @@ use std::borrow::Cow;
use std::fmt::Display;
use common_query::error::{InvalidFuncArgsSnafu, Result};
use common_query::prelude::Signature;
use datafusion_expr::Signature;
use datatypes::arrow::datatypes::DataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::{BinaryVectorBuilder, MutableVector, VectorRef};
@@ -65,11 +66,8 @@ impl Function for ScalarAddFunction {
fn signature(&self) -> Signature {
helper::one_of_sigs2(
vec![ConcreteDataType::float64_datatype()],
vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::binary_datatype(),
],
vec![DataType::Float64],
vec![DataType::Utf8, DataType::Binary],
)
}

View File

@@ -16,7 +16,8 @@ use std::borrow::Cow;
use std::fmt::Display;
use common_query::error::{InvalidFuncArgsSnafu, Result};
use common_query::prelude::Signature;
use datafusion_expr::Signature;
use datatypes::arrow::datatypes::DataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::{BinaryVectorBuilder, MutableVector, VectorRef};
@@ -65,11 +66,8 @@ impl Function for ScalarMulFunction {
fn signature(&self) -> Signature {
helper::one_of_sigs2(
vec![ConcreteDataType::float64_datatype()],
vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::binary_datatype(),
],
vec![DataType::Float64],
vec![DataType::Utf8, DataType::Binary],
)
}

View File

@@ -16,7 +16,8 @@ use std::borrow::Cow;
use std::fmt::Display;
use common_query::error::InvalidFuncArgsSnafu;
use common_query::prelude::Signature;
use datafusion_expr::Signature;
use datatypes::arrow::datatypes::DataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::{BinaryVectorBuilder, MutableVector, VectorRef};
@@ -59,14 +60,8 @@ impl Function for VectorAddFunction {
fn signature(&self) -> Signature {
helper::one_of_sigs2(
vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::binary_datatype(),
],
vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::binary_datatype(),
],
vec![DataType::Utf8, DataType::Binary],
vec![DataType::Utf8, DataType::Binary],
)
}

View File

@@ -16,7 +16,8 @@ use std::borrow::Cow;
use std::fmt::Display;
use common_query::error::InvalidFuncArgsSnafu;
use common_query::prelude::{Signature, TypeSignature, Volatility};
use datafusion::logical_expr_common::type_coercion::aggregates::{BINARYS, STRINGS};
use datafusion_expr::{Signature, TypeSignature, Volatility};
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::{MutableVector, UInt64VectorBuilder, VectorRef};
@@ -58,8 +59,8 @@ impl Function for VectorDimFunction {
fn signature(&self) -> Signature {
Signature::one_of(
vec![
TypeSignature::Exact(vec![ConcreteDataType::string_datatype()]),
TypeSignature::Exact(vec![ConcreteDataType::binary_datatype()]),
TypeSignature::Uniform(1, STRINGS.to_vec()),
TypeSignature::Uniform(1, BINARYS.to_vec()),
],
Volatility::Immutable,
)

View File

@@ -16,7 +16,8 @@ use std::borrow::Cow;
use std::fmt::Display;
use common_query::error::{InvalidFuncArgsSnafu, Result};
use common_query::prelude::Signature;
use datafusion_expr::Signature;
use datatypes::arrow::datatypes::DataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::{BinaryVectorBuilder, MutableVector, VectorRef};
@@ -57,14 +58,8 @@ impl Function for VectorDivFunction {
fn signature(&self) -> Signature {
helper::one_of_sigs2(
vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::binary_datatype(),
],
vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::binary_datatype(),
],
vec![DataType::Utf8, DataType::Binary],
vec![DataType::Utf8, DataType::Binary],
)
}

View File

@@ -16,7 +16,8 @@ use std::borrow::Cow;
use std::fmt::Display;
use common_query::error::{InvalidFuncArgsSnafu, Result};
use common_query::prelude::Signature;
use datafusion_expr::Signature;
use datatypes::arrow::datatypes::DataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::{Float32VectorBuilder, MutableVector, VectorRef};
@@ -61,11 +62,8 @@ impl Function for VectorKthElemFunction {
fn signature(&self) -> Signature {
helper::one_of_sigs2(
vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::binary_datatype(),
],
vec![ConcreteDataType::int64_datatype()],
vec![DataType::Utf8, DataType::Binary],
vec![DataType::Int64],
)
}

View File

@@ -16,7 +16,8 @@ use std::borrow::Cow;
use std::fmt::Display;
use common_query::error::{InvalidFuncArgsSnafu, Result};
use common_query::prelude::Signature;
use datafusion_expr::Signature;
use datatypes::arrow::datatypes::DataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::{BinaryVectorBuilder, MutableVector, VectorRef};
@@ -57,14 +58,8 @@ impl Function for VectorMulFunction {
fn signature(&self) -> Signature {
helper::one_of_sigs2(
vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::binary_datatype(),
],
vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::binary_datatype(),
],
vec![DataType::Utf8, DataType::Binary],
vec![DataType::Utf8, DataType::Binary],
)
}

View File

@@ -16,7 +16,8 @@ use std::borrow::Cow;
use std::fmt::Display;
use common_query::error::{InvalidFuncArgsSnafu, Result};
use common_query::prelude::{Signature, TypeSignature, Volatility};
use datafusion::logical_expr_common::type_coercion::aggregates::{BINARYS, STRINGS};
use datafusion_expr::{Signature, TypeSignature, Volatility};
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::{BinaryVectorBuilder, MutableVector, VectorRef};
@@ -58,8 +59,8 @@ impl Function for VectorNormFunction {
fn signature(&self) -> Signature {
Signature::one_of(
vec![
TypeSignature::Exact(vec![ConcreteDataType::string_datatype()]),
TypeSignature::Exact(vec![ConcreteDataType::binary_datatype()]),
TypeSignature::Uniform(1, STRINGS.to_vec()),
TypeSignature::Uniform(1, BINARYS.to_vec()),
],
Volatility::Immutable,
)

View File

@@ -16,7 +16,8 @@ use std::borrow::Cow;
use std::fmt::Display;
use common_query::error::InvalidFuncArgsSnafu;
use common_query::prelude::Signature;
use datafusion_expr::Signature;
use datatypes::arrow::datatypes::DataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::{BinaryVectorBuilder, MutableVector, VectorRef};
@@ -59,14 +60,8 @@ impl Function for VectorSubFunction {
fn signature(&self) -> Signature {
helper::one_of_sigs2(
vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::binary_datatype(),
],
vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::binary_datatype(),
],
vec![DataType::Utf8, DataType::Binary],
vec![DataType::Utf8, DataType::Binary],
)
}

View File

@@ -16,8 +16,8 @@ use std::borrow::Cow;
use std::fmt::Display;
use common_query::error::{InvalidFuncArgsSnafu, Result};
use common_query::prelude::{Signature, TypeSignature};
use datafusion_expr::Volatility;
use datafusion_expr::{Signature, TypeSignature, Volatility};
use datatypes::arrow::datatypes::DataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::{BinaryVectorBuilder, MutableVector, VectorRef};
@@ -59,16 +59,8 @@ impl Function for VectorSubvectorFunction {
fn signature(&self) -> Signature {
Signature::one_of(
vec![
TypeSignature::Exact(vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::int64_datatype(),
ConcreteDataType::int64_datatype(),
]),
TypeSignature::Exact(vec![
ConcreteDataType::binary_datatype(),
ConcreteDataType::int64_datatype(),
ConcreteDataType::int64_datatype(),
]),
TypeSignature::Exact(vec![DataType::Utf8, DataType::Int64, DataType::Int64]),
TypeSignature::Exact(vec![DataType::Binary, DataType::Int64, DataType::Int64]),
],
Volatility::Immutable,
)

View File

@@ -16,7 +16,7 @@ use std::fmt;
use std::sync::Arc;
use common_query::error::Result;
use common_query::prelude::{Signature, Volatility};
use datafusion_expr::{Signature, Volatility};
use datatypes::prelude::*;
use datatypes::vectors::{StringVector, VectorRef};

View File

@@ -16,7 +16,7 @@ use std::fmt::{self};
use std::sync::Arc;
use common_query::error::Result;
use common_query::prelude::{Signature, Volatility};
use datafusion_expr::{Signature, Volatility};
use datatypes::prelude::{ConcreteDataType, ScalarVector};
use datatypes::vectors::{StringVector, UInt32Vector, VectorRef};
use derive_more::Display;
@@ -78,7 +78,7 @@ impl Function for CurrentSchemaFunction {
}
fn signature(&self) -> Signature {
Signature::uniform(0, vec![], Volatility::Immutable)
Signature::nullary(Volatility::Immutable)
}
fn eval(&self, func_ctx: &FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
@@ -98,7 +98,7 @@ impl Function for SessionUserFunction {
}
fn signature(&self) -> Signature {
Signature::uniform(0, vec![], Volatility::Immutable)
Signature::nullary(Volatility::Immutable)
}
fn eval(&self, func_ctx: &FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {

View File

@@ -16,7 +16,7 @@ use std::fmt::{self};
use std::sync::Arc;
use common_query::error::Result;
use common_query::prelude::{Signature, Volatility};
use datafusion_expr::{Signature, Volatility};
use datatypes::prelude::{ConcreteDataType, DataType, VectorRef};
use datatypes::types::LogicalPrimitiveType;
use datatypes::with_match_primitive_type_id;
@@ -48,7 +48,7 @@ impl Function for PGGetUserByIdFunction {
fn signature(&self) -> Signature {
Signature::uniform(
1,
vec![ConcreteDataType::uint32_datatype()],
vec![arrow::datatypes::DataType::UInt32],
Volatility::Immutable,
)
}

View File

@@ -16,7 +16,7 @@ use std::fmt::{self};
use std::sync::Arc;
use common_query::error::Result;
use common_query::prelude::{Signature, Volatility};
use datafusion_expr::{Signature, Volatility};
use datatypes::prelude::{ConcreteDataType, DataType, VectorRef};
use datatypes::types::LogicalPrimitiveType;
use datatypes::with_match_primitive_type_id;
@@ -48,7 +48,7 @@ impl Function for PGTableIsVisibleFunction {
fn signature(&self) -> Signature {
Signature::uniform(
1,
vec![ConcreteDataType::uint32_datatype()],
vec![arrow::datatypes::DataType::UInt32],
Volatility::Immutable,
)
}

View File

@@ -16,7 +16,7 @@ use std::fmt;
use std::sync::Arc;
use common_query::error::Result;
use common_query::prelude::{Signature, Volatility};
use datafusion_expr::{Signature, Volatility};
use datatypes::data_type::ConcreteDataType;
use datatypes::vectors::{StringVector, VectorRef};

View File

@@ -16,7 +16,7 @@ use std::fmt::{self};
use std::sync::Arc;
use common_query::error::Result;
use common_query::prelude::{Signature, Volatility};
use datafusion_expr::{Signature, Volatility};
use datatypes::prelude::{ConcreteDataType, ScalarVector};
use datatypes::vectors::{StringVector, VectorRef};

View File

@@ -16,7 +16,7 @@ use std::fmt;
use std::sync::Arc;
use common_query::error::Result;
use common_query::prelude::{Signature, Volatility};
use datafusion_expr::{Signature, Volatility};
use datatypes::data_type::ConcreteDataType;
use datatypes::vectors::{StringVector, VectorRef};
use session::context::Channel;

View File

@@ -17,7 +17,6 @@ pub mod error;
pub mod logical_plan;
pub mod prelude;
pub mod request;
mod signature;
pub mod stream;
#[cfg(any(test, feature = "testing"))]
pub mod test_util;

View File

@@ -15,7 +15,6 @@
pub use datafusion_common::ScalarValue;
pub use crate::columnar_value::ColumnarValue;
pub use crate::signature::{Signature, TypeSignature, Volatility};
/// Default timestamp column name for Prometheus metrics.
pub const GREPTIME_TIMESTAMP: &str = "greptime_timestamp";

View File

@@ -1,231 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Signature module contains foundational types that are used to represent signatures, types,
//! and return types of functions.
//! Copied and modified from datafusion.
pub use datafusion_expr::Volatility;
use datafusion_expr::{Signature as DfSignature, TypeSignature as DfTypeSignature};
use datatypes::arrow::datatypes::DataType as ArrowDataType;
use datatypes::data_type::DataType;
use datatypes::prelude::ConcreteDataType;
/// A function's type signature, which defines the function's supported argument types.
#[derive(Debug, Clone, PartialEq)]
pub enum TypeSignature {
/// arbitrary number of arguments of an common type out of a list of valid types
// A function such as `concat` is `Variadic(vec![ConcreteDataType::String, ConcreteDataType::String])`
Variadic(Vec<ConcreteDataType>),
/// One or more arguments with arbitrary types
VariadicAny,
/// fixed number of arguments of an arbitrary but equal type out of a list of valid types
// A function of one argument of f64 is `Uniform(1, vec![ConcreteDataType::Float64])`
// A function of one argument of f64 or f32 is `Uniform(1, vec![ConcreteDataType::Float32, ConcreteDataType::Float64])`
Uniform(usize, Vec<ConcreteDataType>),
/// exact number of arguments of an exact type
Exact(Vec<ConcreteDataType>),
/// fixed number of arguments of arbitrary types
Any(usize),
/// One of a list of signatures
OneOf(Vec<TypeSignature>),
/// Zero argument
/// This is the new signature for functions with zero arguments
/// TODO(discord9): make all other usize nonzero usize
NullAry,
}
///The Signature of a function defines its supported input types as well as its volatility.
#[derive(Debug, Clone, PartialEq)]
pub struct Signature {
/// type_signature - The types that the function accepts. See [TypeSignature] for more information.
pub type_signature: TypeSignature,
/// volatility - The volatility of the function. See [Volatility] for more information.
pub volatility: Volatility,
}
#[inline]
fn concrete_types_to_arrow_types(ts: Vec<ConcreteDataType>) -> Vec<ArrowDataType> {
ts.iter().map(ConcreteDataType::as_arrow_type).collect()
}
impl Signature {
/// new - Creates a new Signature from any type signature and the volatility.
pub fn new(type_signature: TypeSignature, volatility: Volatility) -> Self {
Signature {
type_signature,
volatility,
}
}
/// variadic - Creates a variadic signature that represents an arbitrary number of arguments all from a type in common_types.
pub fn variadic(common_types: Vec<ConcreteDataType>, volatility: Volatility) -> Self {
Self {
type_signature: TypeSignature::Variadic(common_types),
volatility,
}
}
/// variadic_any - Creates a variadic signature that represents an arbitrary number of arguments of any type.
pub fn variadic_any(volatility: Volatility) -> Self {
Self {
type_signature: TypeSignature::VariadicAny,
volatility,
}
}
/// uniform - Creates a function with a fixed number of arguments of the same type, which must be from valid_types.
pub fn uniform(
arg_count: usize,
valid_types: Vec<ConcreteDataType>,
volatility: Volatility,
) -> Self {
Self {
type_signature: TypeSignature::Uniform(arg_count, valid_types),
volatility,
}
}
/// exact - Creates a signature which must match the types in exact_types in order.
pub fn exact(exact_types: Vec<ConcreteDataType>, volatility: Volatility) -> Self {
Signature {
type_signature: TypeSignature::Exact(exact_types),
volatility,
}
}
/// any - Creates a signature which can a be made of any type but of a specified number
pub fn any(arg_count: usize, volatility: Volatility) -> Self {
Signature {
type_signature: TypeSignature::Any(arg_count),
volatility,
}
}
/// one_of Creates a signature which can match any of the [TypeSignature]s which are passed in.
pub fn one_of(type_signatures: Vec<TypeSignature>, volatility: Volatility) -> Self {
Signature {
type_signature: TypeSignature::OneOf(type_signatures),
volatility,
}
}
pub fn nullary(volatility: Volatility) -> Self {
Signature {
type_signature: TypeSignature::NullAry,
volatility,
}
}
}
/// Conversations between datafusion signature and our signature
impl From<TypeSignature> for DfTypeSignature {
fn from(type_signature: TypeSignature) -> DfTypeSignature {
match type_signature {
TypeSignature::Variadic(types) => {
DfTypeSignature::Variadic(concrete_types_to_arrow_types(types))
}
TypeSignature::Uniform(n, types) => {
if n == 0 {
return DfTypeSignature::Nullary;
}
DfTypeSignature::Uniform(n, concrete_types_to_arrow_types(types))
}
TypeSignature::Exact(types) => {
DfTypeSignature::Exact(concrete_types_to_arrow_types(types))
}
TypeSignature::Any(n) => {
if n == 0 {
return DfTypeSignature::Nullary;
}
DfTypeSignature::Any(n)
}
TypeSignature::OneOf(ts) => {
DfTypeSignature::OneOf(ts.into_iter().map(Into::into).collect())
}
TypeSignature::VariadicAny => DfTypeSignature::VariadicAny,
TypeSignature::NullAry => DfTypeSignature::Nullary,
}
}
}
impl From<&DfTypeSignature> for TypeSignature {
fn from(type_signature: &DfTypeSignature) -> TypeSignature {
match type_signature {
DfTypeSignature::Variadic(types) => TypeSignature::Variadic(
types
.iter()
.map(ConcreteDataType::from_arrow_type)
.collect(),
),
DfTypeSignature::Uniform(n, types) => {
if *n == 0 {
return TypeSignature::NullAry;
}
TypeSignature::Uniform(
*n,
types
.iter()
.map(ConcreteDataType::from_arrow_type)
.collect(),
)
}
DfTypeSignature::Exact(types) => TypeSignature::Exact(
types
.iter()
.map(ConcreteDataType::from_arrow_type)
.collect(),
),
DfTypeSignature::Any(n) => {
if *n == 0 {
return TypeSignature::NullAry;
}
TypeSignature::Any(*n)
}
DfTypeSignature::OneOf(ts) => TypeSignature::OneOf(ts.iter().map(Into::into).collect()),
DfTypeSignature::VariadicAny => TypeSignature::VariadicAny,
DfTypeSignature::Nullary => TypeSignature::NullAry,
// Other type signatures are currently mapped to VariadicAny as a fallback.
// These cases are not used in the current UDF implementation.
_ => TypeSignature::VariadicAny,
}
}
}
impl From<Signature> for DfSignature {
fn from(sig: Signature) -> DfSignature {
DfSignature::new(sig.type_signature.into(), sig.volatility)
}
}
#[cfg(test)]
mod tests {
use datatypes::arrow::datatypes::DataType;
use super::*;
#[test]
fn test_into_df_signature() {
let types = vec![
ConcreteDataType::int8_datatype(),
ConcreteDataType::float32_datatype(),
ConcreteDataType::float64_datatype(),
];
let sig = Signature::exact(types.clone(), Volatility::Immutable);
assert_eq!(Volatility::Immutable, sig.volatility);
assert!(matches!(&sig.type_signature, TypeSignature::Exact(x) if x.clone() == types));
let df_sig = DfSignature::from(sig);
assert_eq!(Volatility::Immutable, df_sig.volatility);
let types = vec![DataType::Int8, DataType::Float32, DataType::Float64];
assert!(matches!(df_sig.type_signature, DfTypeSignature::Exact(x) if x == types));
}
}

View File

@@ -18,6 +18,7 @@ use std::sync::Arc;
use common_error::ext::BoxedError;
use common_function::function::{FunctionContext, FunctionRef};
use datafusion_expr::{Signature, Volatility};
use datafusion_substrait::extensions::Extensions;
use datatypes::data_type::ConcreteDataType as CDT;
use query::QueryEngine;
@@ -145,8 +146,8 @@ impl common_function::function::Function for TumbleFunction {
Ok(CDT::timestamp_millisecond_datatype())
}
fn signature(&self) -> common_query::prelude::Signature {
common_query::prelude::Signature::variadic_any(common_query::prelude::Volatility::Immutable)
fn signature(&self) -> Signature {
Signature::variadic_any(Volatility::Immutable)
}
fn eval(

View File

@@ -16,12 +16,13 @@ use std::sync::Arc;
use common_function::function::FunctionContext;
use common_function::function_registry::FUNCTION_REGISTRY;
use common_query::prelude::TypeSignature;
use common_query::Output;
use common_recordbatch::{RecordBatch, RecordBatches};
use common_sql::convert::sql_value_to_value;
use common_telemetry::tracing;
use common_time::Timezone;
use datafusion_expr::TypeSignature;
use datatypes::arrow::datatypes::DataType as ArrowDataType;
use datatypes::data_type::DataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
@@ -81,8 +82,7 @@ impl StatementExecutor {
})
.collect::<Result<Vec<_>>>()?;
let type_sig = (&signature.type_signature).into();
let args = args_to_vector(&type_sig, &arg_values, &query_ctx)?;
let args = args_to_vector(&signature.type_signature, &arg_values, &query_ctx)?;
let arg_types = args
.iter()
.map(|arg| arg.data_type().as_arrow_type())
@@ -223,19 +223,23 @@ fn args_to_vector(
.fail()
}
TypeSignature::NullAry => Ok(vec![]),
_ => error::BuildAdminFunctionArgsSnafu {
msg: format!("unknown function type signature: {type_signature:?}"),
}
.fail(),
}
}
/// Try to cast sql values to vectors by exact data types.
fn values_to_vectors_by_exact_types(
exact_types: &[ConcreteDataType],
exact_types: &[ArrowDataType],
args: &[&SqlValue],
tz: Option<&Timezone>,
) -> Result<Vec<VectorRef>> {
args.iter()
.zip(exact_types.iter())
.map(|(value, data_type)| {
let data_type = &ConcreteDataType::from_arrow_type(data_type);
let value = sql_value_to_value(DUMMY_COLUMN, data_type, value, tz, None, false)
.context(error::SqlCommonSnafu)?;
@@ -246,13 +250,14 @@ fn values_to_vectors_by_exact_types(
/// Try to cast sql values to vectors by valid data types.
fn values_to_vectors_by_valid_types(
valid_types: &[ConcreteDataType],
valid_types: &[ArrowDataType],
args: &[&SqlValue],
tz: Option<&Timezone>,
) -> Result<Vec<VectorRef>> {
args.iter()
.map(|value| {
for data_type in valid_types {
let data_type = &ConcreteDataType::from_arrow_type(data_type);
if let Ok(value) =
sql_value_to_value(DUMMY_COLUMN, data_type, value, tz, None, false)
{
@@ -278,14 +283,14 @@ fn value_to_vector(value: Value) -> VectorRef {
}
/// Try to infer the data type from sql value.
fn try_get_data_type_for_sql_value(value: &SqlValue) -> Result<ConcreteDataType> {
fn try_get_data_type_for_sql_value(value: &SqlValue) -> Result<ArrowDataType> {
match value {
SqlValue::Number(_, _) => Ok(ConcreteDataType::float64_datatype()),
SqlValue::Null => Ok(ConcreteDataType::null_datatype()),
SqlValue::Boolean(_) => Ok(ConcreteDataType::boolean_datatype()),
SqlValue::Number(_, _) => Ok(ArrowDataType::Float64),
SqlValue::Null => Ok(ArrowDataType::Null),
SqlValue::Boolean(_) => Ok(ArrowDataType::Boolean),
SqlValue::HexStringLiteral(_)
| SqlValue::DoubleQuotedString(_)
| SqlValue::SingleQuotedString(_) => Ok(ConcreteDataType::string_datatype()),
| SqlValue::SingleQuotedString(_) => Ok(ArrowDataType::Utf8),
_ => error::BuildAdminFunctionArgsSnafu {
msg: format!("unsupported sql value: {value}"),
}