diff --git a/src/common/function/src/function.rs b/src/common/function/src/function.rs index 538d3fd227..18ad60b622 100644 --- a/src/common/function/src/function.rs +++ b/src/common/function/src/function.rs @@ -15,11 +15,15 @@ use std::fmt; use std::sync::Arc; -use common_query::error::Result; +use common_error::ext::{BoxedError, PlainError}; +use common_error::status_code::StatusCode; +use common_query::error::{ExecuteSnafu, Result}; use datafusion::arrow::datatypes::DataType; -use datafusion_expr::Signature; +use datafusion::logical_expr::ColumnarValue; +use datafusion_expr::{ScalarFunctionArgs, Signature}; use datatypes::vectors::VectorRef; use session::context::{QueryContextBuilder, QueryContextRef}; +use snafu::ResultExt; use crate::state::FunctionState; @@ -68,8 +72,26 @@ pub trait Function: fmt::Display + Sync + Send { /// The signature of function. fn signature(&self) -> Signature; + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> datafusion_common::Result { + // TODO(LFC): Remove default implementation once all UDFs have implemented this function. + let _ = args; + Err(datafusion_common::DataFusionError::NotImplemented( + "invoke_with_args".to_string(), + )) + } + /// Evaluate the function, e.g. run/execute the function. - fn eval(&self, ctx: &FunctionContext, columns: &[VectorRef]) -> Result; + /// TODO(LFC): Remove `eval` when all UDFs are rewritten to `invoke_with_args` + fn eval(&self, _: &FunctionContext, _: &[VectorRef]) -> Result { + Err(BoxedError::new(PlainError::new( + "unsupported".to_string(), + StatusCode::Unsupported, + ))) + .context(ExecuteSnafu) + } fn aliases(&self) -> &[String] { &[] diff --git a/src/common/function/src/scalars/geo/h3.rs b/src/common/function/src/scalars/geo/h3.rs index 35ee31c68b..66d92863ca 100644 --- a/src/common/function/src/scalars/geo/h3.rs +++ b/src/common/function/src/scalars/geo/h3.rs @@ -18,22 +18,24 @@ use std::sync::{Arc, LazyLock}; use common_error::ext::{BoxedError, PlainError}; use common_error::status_code::StatusCode; use common_query::error::{self, Result}; -use datafusion_expr::type_coercion::aggregates::INTEGERS; -use datafusion_expr::{Signature, TypeSignature, Volatility}; -use datatypes::arrow::datatypes::{DataType, Field}; -use datatypes::prelude::ConcreteDataType; -use datatypes::scalars::{Scalar, ScalarVectorBuilder}; -use datatypes::value::{ListValue, Value}; -use datatypes::vectors::{ - BooleanVectorBuilder, Float64VectorBuilder, Int32VectorBuilder, ListVectorBuilder, - MutableVector, StringVectorBuilder, UInt8VectorBuilder, UInt64VectorBuilder, VectorRef, +use datafusion::arrow::array::{ + Array, ArrayRef, AsArray, BooleanBuilder, Float64Builder, Int32Builder, ListBuilder, + StringViewArray, StringViewBuilder, UInt8Builder, UInt64Builder, }; +use datafusion::arrow::compute; +use datafusion::arrow::datatypes::{ + ArrowPrimitiveType, Float64Type, Int64Type, UInt8Type, UInt64Type, +}; +use datafusion::logical_expr::ColumnarValue; +use datafusion_common::{DataFusionError, ScalarValue, utils}; +use datafusion_expr::type_coercion::aggregates::INTEGERS; +use datafusion_expr::{ScalarFunctionArgs, Signature, TypeSignature, Volatility}; +use datatypes::arrow::datatypes::{DataType, Field}; use derive_more::Display; use h3o::{CellIndex, LatLng, Resolution}; -use snafu::ResultExt; +use snafu::prelude::*; -use crate::function::{Function, FunctionContext}; -use crate::scalars::geo::helpers::{ensure_and_coerce, ensure_columns_len, ensure_columns_n}; +use crate::function::Function; static CELL_TYPES: LazyLock> = LazyLock::new(|| vec![DataType::Int64, DataType::UInt64, DataType::Utf8]); @@ -80,23 +82,33 @@ impl Function for H3LatLngToCell { Signature::one_of(signatures, Volatility::Stable) } - fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result { - ensure_columns_n!(columns, 3); + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> datafusion_common::Result { + let args = ColumnarValue::values_to_arrays(&args.args)?; + let [lat_vec, lon_vec, resolution_vec] = utils::take_function_args(self.name(), args)?; - let lat_vec = &columns[0]; - let lon_vec = &columns[1]; - let resolution_vec = &columns[2]; + let lat_vec = cast::(&lat_vec)?; + let lat_vec = lat_vec.as_primitive::(); + let lon_vec = cast::(&lon_vec)?; + let lon_vec = lon_vec.as_primitive::(); + let resolutions = cast::(&resolution_vec)?; + let resolution_vec = resolutions.as_primitive::(); let size = lat_vec.len(); - let mut results = UInt64VectorBuilder::with_capacity(size); + let mut builder = UInt64Builder::with_capacity(size); for i in 0..size { - let lat = lat_vec.get(i).as_f64_lossy(); - let lon = lon_vec.get(i).as_f64_lossy(); - let r = value_to_resolution(resolution_vec.get(i))?; + let lat = lat_vec.is_valid(i).then(|| lat_vec.value(i)); + let lon = lon_vec.is_valid(i).then(|| lon_vec.value(i)); + let r = resolution_vec + .is_valid(i) + .then(|| value_to_resolution(resolution_vec.value(i))) + .transpose()?; - let result = match (lat, lon) { - (Some(lat), Some(lon)) => { + let result = match (lat, lon, r) { + (Some(lat), Some(lon), Some(r)) => { let coord = LatLng::new(lat, lon) .map_err(|e| { BoxedError::new(PlainError::new( @@ -111,10 +123,10 @@ impl Function for H3LatLngToCell { _ => None, }; - results.push(result); + builder.append_option(result); } - Ok(results.to_vector()) + Ok(ColumnarValue::Array(Arc::new(builder.finish()))) } } @@ -132,7 +144,7 @@ impl Function for H3LatLngToCellString { } fn return_type(&self, _: &[DataType]) -> Result { - Ok(DataType::Utf8) + Ok(DataType::Utf8View) } fn signature(&self) -> Signature { @@ -152,23 +164,33 @@ impl Function for H3LatLngToCellString { Signature::one_of(signatures, Volatility::Stable) } - fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result { - ensure_columns_n!(columns, 3); + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> datafusion_common::Result { + let args = ColumnarValue::values_to_arrays(&args.args)?; + let [lat_vec, lon_vec, resolution_vec] = utils::take_function_args(self.name(), args)?; - let lat_vec = &columns[0]; - let lon_vec = &columns[1]; - let resolution_vec = &columns[2]; + let lat_vec = cast::(&lat_vec)?; + let lat_vec = lat_vec.as_primitive::(); + let lon_vec = cast::(&lon_vec)?; + let lon_vec = lon_vec.as_primitive::(); + let resolutions = cast::(&resolution_vec)?; + let resolution_vec = resolutions.as_primitive::(); let size = lat_vec.len(); - let mut results = StringVectorBuilder::with_capacity(size); + let mut builder = StringViewBuilder::with_capacity(size); for i in 0..size { - let lat = lat_vec.get(i).as_f64_lossy(); - let lon = lon_vec.get(i).as_f64_lossy(); - let r = value_to_resolution(resolution_vec.get(i))?; + let lat = lat_vec.is_valid(i).then(|| lat_vec.value(i)); + let lon = lon_vec.is_valid(i).then(|| lon_vec.value(i)); + let r = resolution_vec + .is_valid(i) + .then(|| value_to_resolution(resolution_vec.value(i))) + .transpose()?; - let result = match (lat, lon) { - (Some(lat), Some(lon)) => { + let result = match (lat, lon, r) { + (Some(lat), Some(lon), Some(r)) => { let coord = LatLng::new(lat, lon) .map_err(|e| { BoxedError::new(PlainError::new( @@ -183,10 +205,10 @@ impl Function for H3LatLngToCellString { _ => None, }; - results.push(result.as_deref()); + builder.append_option(result); } - Ok(results.to_vector()) + Ok(ColumnarValue::Array(Arc::new(builder.finish()))) } } @@ -201,27 +223,31 @@ impl Function for H3CellToString { } fn return_type(&self, _: &[DataType]) -> Result { - Ok(DataType::Utf8) + Ok(DataType::Utf8View) } fn signature(&self) -> Signature { signature_of_cell() } - fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result { - ensure_columns_n!(columns, 1); + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> datafusion_common::Result { + let args = ColumnarValue::values_to_arrays(&args.args)?; + let [cell_vec] = utils::take_function_args(self.name(), args)?; - let cell_vec = &columns[0]; let size = cell_vec.len(); - let mut results = StringVectorBuilder::with_capacity(size); + let mut builder = StringViewBuilder::with_capacity(size); for i in 0..size { - let cell_id_string = cell_from_value(cell_vec.get(i))?.map(|c| c.to_string()); - - results.push(cell_id_string.as_deref()); + let v = ScalarValue::try_from_array(&cell_vec, i) + .and_then(cell_from_value)? + .map(|x| x.to_string()); + builder.append_option(v); } - Ok(results.to_vector()) + Ok(ColumnarValue::Array(Arc::new(builder.finish()))) } } @@ -243,35 +269,32 @@ impl Function for H3StringToCell { Signature::string(1, Volatility::Stable) } - fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result { - ensure_columns_n!(columns, 1); + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> datafusion_common::Result { + let args = ColumnarValue::values_to_arrays(&args.args)?; + let [string_vec] = utils::take_function_args(self.name(), args)?; + let string_vec = compute::cast(string_vec.as_ref(), &DataType::Utf8View)?; + let string_vec = datafusion_common::downcast_value!(string_vec, StringViewArray); - let string_vec = &columns[0]; let size = string_vec.len(); - let mut results = UInt64VectorBuilder::with_capacity(size); + let mut builder = UInt64Builder::with_capacity(size); for i in 0..size { - let cell = string_vec.get(i); + let cell_id = string_vec + .is_valid(i) + .then(|| { + CellIndex::from_str(string_vec.value(i)) + .map_err(|e| DataFusionError::Execution(format!("H3 error: {}", e))) + .map(Into::into) + }) + .transpose()?; - let cell_id = match cell { - Value::String(v) => Some( - CellIndex::from_str(v.as_utf8()) - .map_err(|e| { - BoxedError::new(PlainError::new( - format!("H3 error: {}", e), - StatusCode::EngineExecuteQuery, - )) - }) - .context(error::ExecuteSnafu)? - .into(), - ), - _ => None, - }; - - results.push(cell_id); + builder.append_option(cell_id); } - Ok(results.to_vector()) + Ok(ColumnarValue::Array(Arc::new(builder.finish()))) } } @@ -297,30 +320,30 @@ impl Function for H3CellCenterLatLng { signature_of_cell() } - fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result { - ensure_columns_n!(columns, 1); + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> datafusion_common::Result { + let args = ColumnarValue::values_to_arrays(&args.args)?; + let [cell_vec] = utils::take_function_args(self.name(), args)?; - let cell_vec = &columns[0]; let size = cell_vec.len(); - let mut results = - ListVectorBuilder::with_type_capacity(ConcreteDataType::float64_datatype(), size); + let mut builder = ListBuilder::new(Float64Builder::new()); for i in 0..size { - let cell = cell_from_value(cell_vec.get(i))?; + let cell = ScalarValue::try_from_array(&cell_vec, i).and_then(cell_from_value)?; let latlng = cell.map(LatLng::from); if let Some(latlng) = latlng { - let result = ListValue::new( - vec![latlng.lat().into(), latlng.lng().into()], - ConcreteDataType::float64_datatype(), - ); - results.push(Some(result.as_scalar_ref())); + builder.values().append_value(latlng.lat()); + builder.values().append_value(latlng.lng()); + builder.append(true); } else { - results.push(None); + builder.append_null(); } } - Ok(results.to_vector()) + Ok(ColumnarValue::Array(Arc::new(builder.finish()))) } } @@ -342,21 +365,23 @@ impl Function for H3CellResolution { signature_of_cell() } - fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result { - ensure_columns_n!(columns, 1); + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> datafusion_common::Result { + let args = ColumnarValue::values_to_arrays(&args.args)?; + let [cell_vec] = utils::take_function_args(self.name(), args)?; - let cell_vec = &columns[0]; let size = cell_vec.len(); - let mut results = UInt8VectorBuilder::with_capacity(size); + let mut builder = UInt8Builder::with_capacity(cell_vec.len()); for i in 0..size { - let cell = cell_from_value(cell_vec.get(i))?; + let cell = ScalarValue::try_from_array(&cell_vec, i).and_then(cell_from_value)?; let res = cell.map(|cell| cell.resolution().into()); - - results.push(res); + builder.append_option(res); } - Ok(results.to_vector()) + Ok(ColumnarValue::Array(Arc::new(builder.finish()))) } } @@ -378,21 +403,24 @@ impl Function for H3CellBase { signature_of_cell() } - fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result { - ensure_columns_n!(columns, 1); + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> datafusion_common::Result { + let args = ColumnarValue::values_to_arrays(&args.args)?; + let [cell_vec] = utils::take_function_args(self.name(), args)?; - let cell_vec = &columns[0]; let size = cell_vec.len(); - let mut results = UInt8VectorBuilder::with_capacity(size); + let mut builder = UInt8Builder::with_capacity(size); for i in 0..size { - let cell = cell_from_value(cell_vec.get(i))?; + let cell = ScalarValue::try_from_array(&cell_vec, i).and_then(cell_from_value)?; let res = cell.map(|cell| cell.base_cell().into()); - results.push(res); + builder.append_option(res); } - Ok(results.to_vector()) + Ok(ColumnarValue::Array(Arc::new(builder.finish()))) } } @@ -414,21 +442,24 @@ impl Function for H3CellIsPentagon { signature_of_cell() } - fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result { - ensure_columns_n!(columns, 1); + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> datafusion_common::Result { + let args = ColumnarValue::values_to_arrays(&args.args)?; + let [cell_vec] = utils::take_function_args(self.name(), args)?; - let cell_vec = &columns[0]; let size = cell_vec.len(); - let mut results = BooleanVectorBuilder::with_capacity(size); + let mut builder = BooleanBuilder::with_capacity(size); for i in 0..size { - let cell = cell_from_value(cell_vec.get(i))?; + let cell = ScalarValue::try_from_array(&cell_vec, i).and_then(cell_from_value)?; let res = cell.map(|cell| cell.is_pentagon()); - results.push(res); + builder.append_option(res); } - Ok(results.to_vector()) + Ok(ColumnarValue::Array(Arc::new(builder.finish()))) } } @@ -450,25 +481,13 @@ impl Function for H3CellCenterChild { signature_of_cell_and_resolution() } - fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result { - ensure_columns_n!(columns, 2); - - let cell_vec = &columns[0]; - let res_vec = &columns[1]; - let size = cell_vec.len(); - let mut results = UInt64VectorBuilder::with_capacity(size); - - for i in 0..size { - let cell = cell_from_value(cell_vec.get(i))?; - let res = value_to_resolution(res_vec.get(i))?; - let result = cell - .and_then(|cell| cell.center_child(res)) - .map(|c| c.into()); - - results.push(result); - } - - Ok(results.to_vector()) + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> datafusion_common::Result { + calculate_cell_child_property(self.name(), args, |cell, resolution| { + cell.center_child(resolution).map(Into::into) + }) } } @@ -490,23 +509,13 @@ impl Function for H3CellParent { signature_of_cell_and_resolution() } - fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result { - ensure_columns_n!(columns, 2); - - let cell_vec = &columns[0]; - let res_vec = &columns[1]; - let size = cell_vec.len(); - let mut results = UInt64VectorBuilder::with_capacity(size); - - for i in 0..size { - let cell = cell_from_value(cell_vec.get(i))?; - let res = value_to_resolution(res_vec.get(i))?; - let result = cell.and_then(|cell| cell.parent(res)).map(|c| c.into()); - - results.push(result); - } - - Ok(results.to_vector()) + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> datafusion_common::Result { + calculate_cell_child_property(self.name(), args, |cell, resolution| { + cell.parent(resolution).map(Into::into) + }) } } @@ -532,34 +541,37 @@ impl Function for H3CellToChildren { signature_of_cell_and_resolution() } - fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result { - ensure_columns_n!(columns, 2); + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> datafusion_common::Result { + let args = ColumnarValue::values_to_arrays(&args.args)?; + let [cell_vec, res_vec] = utils::take_function_args(self.name(), args)?; + let resolutions = cast::(&res_vec)?; + let resolutions = resolutions.as_primitive::(); - let cell_vec = &columns[0]; - let res_vec = &columns[1]; let size = cell_vec.len(); - let mut results = - ListVectorBuilder::with_type_capacity(ConcreteDataType::uint64_datatype(), size); + let mut builder = ListBuilder::new(UInt64Builder::new()); for i in 0..size { - let cell = cell_from_value(cell_vec.get(i))?; - let res = value_to_resolution(res_vec.get(i))?; - let result = cell.map(|cell| { - let children: Vec = cell - .children(res) - .map(|child| Value::from(u64::from(child))) - .collect(); - ListValue::new(children, ConcreteDataType::uint64_datatype()) - }); + let cell = ScalarValue::try_from_array(&cell_vec, i).and_then(cell_from_value)?; + let resolution = resolutions + .is_valid(i) + .then(|| value_to_resolution(resolutions.value(i))) + .transpose()?; - if let Some(list_value) = result { - results.push(Some(list_value.as_scalar_ref())); - } else { - results.push(None); + match (cell, resolution) { + (Some(c), Some(r)) => { + for x in c.children(r) { + builder.values().append_value(u64::from(x)); + } + builder.append(true); + } + _ => builder.append_null(), } } - Ok(results.to_vector()) + Ok(ColumnarValue::Array(Arc::new(builder.finish()))) } } @@ -581,22 +593,13 @@ impl Function for H3CellToChildrenSize { signature_of_cell_and_resolution() } - fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result { - ensure_columns_n!(columns, 2); - - let cell_vec = &columns[0]; - let res_vec = &columns[1]; - let size = cell_vec.len(); - let mut results = UInt64VectorBuilder::with_capacity(size); - - for i in 0..size { - let cell = cell_from_value(cell_vec.get(i))?; - let res = value_to_resolution(res_vec.get(i))?; - let result = cell.map(|cell| cell.children_count(res)); - results.push(result); - } - - Ok(results.to_vector()) + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> datafusion_common::Result { + calculate_cell_child_property(self.name(), args, |cell, resolution| { + Some(cell.children_count(resolution)) + }) } } @@ -618,25 +621,46 @@ impl Function for H3CellToChildPos { signature_of_cell_and_resolution() } - fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result { - ensure_columns_n!(columns, 2); - - let cell_vec = &columns[0]; - let res_vec = &columns[1]; - let size = cell_vec.len(); - let mut results = UInt64VectorBuilder::with_capacity(size); - - for i in 0..size { - let cell = cell_from_value(cell_vec.get(i))?; - let res = value_to_resolution(res_vec.get(i))?; - let result = cell.and_then(|cell| cell.child_position(res)); - results.push(result); - } - - Ok(results.to_vector()) + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> datafusion_common::Result { + calculate_cell_child_property(self.name(), args, |cell, resolution| { + cell.child_position(resolution) + }) } } +fn calculate_cell_child_property( + name: &str, + args: ScalarFunctionArgs, + calculator: F, +) -> datafusion_common::Result +where + F: Fn(CellIndex, Resolution) -> Option, +{ + let args = ColumnarValue::values_to_arrays(&args.args)?; + let [cells, resolutions] = utils::take_function_args(name, args)?; + let resolutions = cast::(&resolutions)?; + let resolutions = resolutions.as_primitive::(); + + let mut builder = UInt64Builder::with_capacity(cells.len()); + for i in 0..cells.len() { + let cell = ScalarValue::try_from_array(&cells, i).and_then(cell_from_value)?; + let resolution = resolutions + .is_valid(i) + .then(|| value_to_resolution(resolutions.value(i))) + .transpose()?; + let v = match (cell, resolution) { + (Some(c), Some(r)) => calculator(c, r), + _ => None, + }; + builder.append_option(v); + } + + Ok(ColumnarValue::Array(Arc::new(builder.finish()))) +} + /// Function that returns the cell at given position of the parent at given resolution #[derive(Clone, Debug, Default, Display)] #[display("{}", self.name())] @@ -668,27 +692,48 @@ impl Function for H3ChildPosToCell { Signature::one_of(signatures, Volatility::Stable) } - fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result { - ensure_columns_n!(columns, 3); + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> datafusion_common::Result { + let args = ColumnarValue::values_to_arrays(&args.args)?; + let [pos_vec, cell_vec, res_vec] = utils::take_function_args(self.name(), args)?; + let resolutions = cast::(&res_vec)?; + let resolutions = resolutions.as_primitive::(); - let pos_vec = &columns[0]; - let cell_vec = &columns[1]; - let res_vec = &columns[2]; let size = cell_vec.len(); - let mut results = UInt64VectorBuilder::with_capacity(size); + let mut builder = UInt64Builder::with_capacity(size); for i in 0..size { - let cell = cell_from_value(cell_vec.get(i))?; - let pos = value_to_position(pos_vec.get(i))?; - let res = value_to_resolution(res_vec.get(i))?; - let result = cell.and_then(|cell| cell.child_at(pos, res).map(u64::from)); - results.push(result); + let cell = ScalarValue::try_from_array(&cell_vec, i).and_then(cell_from_value)?; + let pos = ScalarValue::try_from_array(&pos_vec, i).and_then(value_to_position)?; + let resolution = resolutions + .is_valid(i) + .then(|| value_to_resolution(resolutions.value(i))) + .transpose()?; + let result = match (cell, resolution) { + (Some(c), Some(r)) => c.child_at(pos, r).map(u64::from), + _ => None, + }; + builder.append_option(result); } - Ok(results.to_vector()) + Ok(ColumnarValue::Array(Arc::new(builder.finish()))) } } +fn cast(array: &ArrayRef) -> datafusion_common::Result { + let x = compute::cast_with_options( + array.as_ref(), + &T::DATA_TYPE, + &compute::CastOptions { + safe: false, + ..Default::default() + }, + )?; + Ok(x) +} + /// Function that returns cells with k distances of given cell #[derive(Clone, Debug, Default, Display)] #[display("{}", self.name())] @@ -711,36 +756,31 @@ impl Function for H3GridDisk { signature_of_cell_and_distance() } - fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result { - ensure_columns_n!(columns, 2); + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> datafusion_common::Result { + let args = ColumnarValue::values_to_arrays(&args.args)?; + let [cell_vec, k_vec] = utils::take_function_args(self.name(), args)?; - let cell_vec = &columns[0]; - let k_vec = &columns[1]; let size = cell_vec.len(); - let mut results = - ListVectorBuilder::with_type_capacity(ConcreteDataType::uint64_datatype(), size); + let mut builder = ListBuilder::new(UInt64Builder::new()); for i in 0..size { - let cell = cell_from_value(cell_vec.get(i))?; - let k = value_to_distance(k_vec.get(i))?; + let cell = ScalarValue::try_from_array(&cell_vec, i).and_then(cell_from_value)?; + let k = ScalarValue::try_from_array(&k_vec, i).and_then(value_to_distance)?; - let result = cell.map(|cell| { - let children: Vec = cell - .grid_disk::>(k) - .into_iter() - .map(|child| Value::from(u64::from(child))) - .collect(); - ListValue::new(children, ConcreteDataType::uint64_datatype()) - }); - - if let Some(list_value) = result { - results.push(Some(list_value.as_scalar_ref())); + if let Some(cell) = cell { + for x in cell.grid_disk::>(k) { + builder.values().append_value(u64::from(x)); + } + builder.append(true); } else { - results.push(None); + builder.append_null(); } } - Ok(results.to_vector()) + Ok(ColumnarValue::Array(Arc::new(builder.finish()))) } } @@ -766,36 +806,31 @@ impl Function for H3GridDiskDistances { signature_of_cell_and_distance() } - fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result { - ensure_columns_n!(columns, 2); + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> datafusion_common::Result { + let args = ColumnarValue::values_to_arrays(&args.args)?; + let [cell_vec, k_vec] = utils::take_function_args(self.name(), args)?; - let cell_vec = &columns[0]; - let k_vec = &columns[1]; let size = cell_vec.len(); - let mut results = - ListVectorBuilder::with_type_capacity(ConcreteDataType::uint64_datatype(), size); + let mut builder = ListBuilder::new(UInt64Builder::new()); for i in 0..size { - let cell = cell_from_value(cell_vec.get(i))?; - let k = value_to_distance(k_vec.get(i))?; + let cell = ScalarValue::try_from_array(&cell_vec, i).and_then(cell_from_value)?; + let k = ScalarValue::try_from_array(&k_vec, i).and_then(value_to_distance)?; - let result = cell.map(|cell| { - let children: Vec = cell - .grid_disk_distances::>(k) - .into_iter() - .map(|(child, _distance)| Value::from(u64::from(child))) - .collect(); - ListValue::new(children, ConcreteDataType::uint64_datatype()) - }); - - if let Some(list_value) = result { - results.push(Some(list_value.as_scalar_ref())); + if let Some(cell) = cell { + for (x, _) in cell.grid_disk_distances::>(k) { + builder.values().append_value(u64::from(x)); + } + builder.append(true); } else { - results.push(None); + builder.append_null(); } } - Ok(results.to_vector()) + Ok(ColumnarValue::Array(Arc::new(builder.finish()))) } } @@ -816,20 +851,22 @@ impl Function for H3GridDistance { signature_of_double_cells() } - fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result { - ensure_columns_n!(columns, 2); + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> datafusion_common::Result { + let args = ColumnarValue::values_to_arrays(&args.args)?; + let [cell_this_vec, cell_that_vec] = utils::take_function_args(self.name(), args)?; - let cell_this_vec = &columns[0]; - let cell_that_vec = &columns[1]; let size = cell_this_vec.len(); - - let mut results = Int32VectorBuilder::with_capacity(size); + let mut builder = Int32Builder::with_capacity(size); for i in 0..size { - let result = match ( - cell_from_value(cell_this_vec.get(i))?, - cell_from_value(cell_that_vec.get(i))?, - ) { + let cell_this = + ScalarValue::try_from_array(&cell_this_vec, i).and_then(cell_from_value)?; + let cell_that = + ScalarValue::try_from_array(&cell_that_vec, i).and_then(cell_from_value)?; + let result = match (cell_this, cell_that) { (Some(cell_this), Some(cell_that)) => { let dist = cell_this .grid_distance(cell_that) @@ -845,10 +882,10 @@ impl Function for H3GridDistance { _ => None, }; - results.push(result); + builder.append_option(result); } - Ok(results.to_vector()) + Ok(ColumnarValue::Array(Arc::new(builder.finish()))) } } @@ -874,50 +911,38 @@ impl Function for H3GridPathCells { signature_of_double_cells() } - fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result { - ensure_columns_n!(columns, 2); + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> datafusion_common::Result { + let args = ColumnarValue::values_to_arrays(&args.args)?; + let [cell_this_vec, cell_that_vec] = utils::take_function_args(self.name(), args)?; - let cell_this_vec = &columns[0]; - let cell_that_vec = &columns[1]; let size = cell_this_vec.len(); - let mut results = - ListVectorBuilder::with_type_capacity(ConcreteDataType::uint64_datatype(), size); + let mut builder = ListBuilder::new(UInt64Builder::new()); for i in 0..size { - let result = match ( - cell_from_value(cell_this_vec.get(i))?, - cell_from_value(cell_that_vec.get(i))?, - ) { + let cell_this = + ScalarValue::try_from_array(&cell_this_vec, i).and_then(cell_from_value)?; + let cell_that = + ScalarValue::try_from_array(&cell_that_vec, i).and_then(cell_from_value)?; + match (cell_this, cell_that) { (Some(cell_this), Some(cell_that)) => { let cells = cell_this .grid_path_cells(cell_that) - .and_then(|x| x.collect::, _>>()) - .map_err(|e| { - BoxedError::new(PlainError::new( - format!("H3 error: {}", e), - StatusCode::EngineExecuteQuery, - )) - }) - .context(error::ExecuteSnafu)?; - Some(ListValue::new( - cells - .into_iter() - .map(|c| Value::from(u64::from(c))) - .collect(), - ConcreteDataType::uint64_datatype(), - )) + .map_err(|e| DataFusionError::Execution(format!("H3 error: {}", e)))?; + for cell in cells { + let cell = cell + .map_err(|e| DataFusionError::Execution(format!("H3 error: {}", e)))?; + builder.values().append_value(u64::from(cell)); + } + builder.append(true); } - _ => None, + _ => builder.append_null(), }; - - if let Some(list_value) = result { - results.push(Some(list_value.as_scalar_ref())); - } else { - results.push(None); - } } - Ok(results.to_vector()) + Ok(ColumnarValue::Array(Arc::new(builder.finish()))) } } @@ -956,21 +981,22 @@ impl Function for H3CellContains { Signature::one_of(signatures, Volatility::Stable) } - fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result { - ensure_columns_n!(columns, 2); - - let cells_vec = &columns[0]; - let cell_this_vec = &columns[1]; + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> datafusion_common::Result { + let args = ColumnarValue::values_to_arrays(&args.args)?; + let [cells_vec, cell_this_vec] = utils::take_function_args(self.name(), args)?; let size = cell_this_vec.len(); - let mut results = BooleanVectorBuilder::with_capacity(size); + let mut builder = BooleanBuilder::with_capacity(size); for i in 0..size { + let cells = ScalarValue::try_from_array(&cells_vec, i).and_then(cells_from_value)?; + let cell_this = + ScalarValue::try_from_array(&cell_this_vec, i).and_then(cell_from_value)?; let mut result = None; - if let (cells, Some(cell_this)) = ( - cells_from_value(cells_vec.get(i))?, - cell_from_value(cell_this_vec.get(i))?, - ) { + if let (cells, Some(cell_this)) = (cells, cell_this) { result = Some(false); for cell_that in cells.iter() { @@ -986,10 +1012,10 @@ impl Function for H3CellContains { } } - results.push(result); + builder.append_option(result); } - Ok(results.to_vector()) + Ok(ColumnarValue::Array(Arc::new(builder.finish()))) } } @@ -1010,20 +1036,22 @@ impl Function for H3CellDistanceSphereKm { signature_of_double_cells() } - fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result { - ensure_columns_n!(columns, 2); + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> datafusion_common::Result { + let args = ColumnarValue::values_to_arrays(&args.args)?; + let [cell_this_vec, cell_that_vec] = utils::take_function_args(self.name(), args)?; - let cell_this_vec = &columns[0]; - let cell_that_vec = &columns[1]; let size = cell_this_vec.len(); - - let mut results = Float64VectorBuilder::with_capacity(size); + let mut builder = Float64Builder::with_capacity(size); for i in 0..size { - let result = match ( - cell_from_value(cell_this_vec.get(i))?, - cell_from_value(cell_that_vec.get(i))?, - ) { + let cell_this = + ScalarValue::try_from_array(&cell_this_vec, i).and_then(cell_from_value)?; + let cell_that = + ScalarValue::try_from_array(&cell_that_vec, i).and_then(cell_from_value)?; + let result = match (cell_this, cell_that) { (Some(cell_this), Some(cell_that)) => { let centroid_this = LatLng::from(cell_this); let centroid_that = LatLng::from(cell_that); @@ -1033,10 +1061,10 @@ impl Function for H3CellDistanceSphereKm { _ => None, }; - results.push(result); + builder.append_option(result); } - Ok(results.to_vector()) + Ok(ColumnarValue::Array(Arc::new(builder.finish()))) } } @@ -1065,20 +1093,22 @@ impl Function for H3CellDistanceEuclideanDegree { signature_of_double_cells() } - fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result { - ensure_columns_n!(columns, 2); + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> datafusion_common::Result { + let args = ColumnarValue::values_to_arrays(&args.args)?; + let [cell_this_vec, cell_that_vec] = utils::take_function_args(self.name(), args)?; - let cell_this_vec = &columns[0]; - let cell_that_vec = &columns[1]; let size = cell_this_vec.len(); - - let mut results = Float64VectorBuilder::with_capacity(size); + let mut builder = Float64Builder::with_capacity(size); for i in 0..size { - let result = match ( - cell_from_value(cell_this_vec.get(i))?, - cell_from_value(cell_that_vec.get(i))?, - ) { + let cell_this = + ScalarValue::try_from_array(&cell_this_vec, i).and_then(cell_from_value)?; + let cell_that = + ScalarValue::try_from_array(&cell_that_vec, i).and_then(cell_from_value)?; + let result = match (cell_this, cell_that) { (Some(cell_this), Some(cell_that)) => { let centroid_this = LatLng::from(cell_this); let centroid_that = LatLng::from(cell_that); @@ -1089,59 +1119,52 @@ impl Function for H3CellDistanceEuclideanDegree { _ => None, }; - results.push(result); + builder.append_option(result); } - Ok(results.to_vector()) + Ok(ColumnarValue::Array(Arc::new(builder.finish()))) } } -fn value_to_resolution(v: Value) -> Result { - let r = match v { - Value::Int8(v) => v as u8, - Value::Int16(v) => v as u8, - Value::Int32(v) => v as u8, - Value::Int64(v) => v as u8, - Value::UInt8(v) => v, - Value::UInt16(v) => v as u8, - Value::UInt32(v) => v as u8, - Value::UInt64(v) => v as u8, - _ => unreachable!(), - }; - Resolution::try_from(r) - .map_err(|e| { - BoxedError::new(PlainError::new( - format!("H3 error: {}", e), - StatusCode::EngineExecuteQuery, - )) - }) - .context(error::ExecuteSnafu) +fn value_to_resolution(r: u8) -> datafusion_common::Result { + Resolution::try_from(r).map_err(|e| DataFusionError::Execution(format!("H3 error: {}", e))) } -fn value_to_position(v: Value) -> Result { +macro_rules! ensure_then_coerce { + ($compare:expr, $coerce:expr) => {{ + if !$compare { + return Err(datafusion_common::DataFusionError::Execution( + "Argument was outside of acceptable range".to_string(), + )); + } + Ok($coerce) + }}; +} + +fn value_to_position(v: ScalarValue) -> datafusion_common::Result { match v { - Value::Int8(v) => ensure_and_coerce!(v >= 0, v as u64), - Value::Int16(v) => ensure_and_coerce!(v >= 0, v as u64), - Value::Int32(v) => ensure_and_coerce!(v >= 0, v as u64), - Value::Int64(v) => ensure_and_coerce!(v >= 0, v as u64), - Value::UInt8(v) => Ok(v as u64), - Value::UInt16(v) => Ok(v as u64), - Value::UInt32(v) => Ok(v as u64), - Value::UInt64(v) => Ok(v), + ScalarValue::Int8(Some(v)) => ensure_then_coerce!(v >= 0, v as u64), + ScalarValue::Int16(Some(v)) => ensure_then_coerce!(v >= 0, v as u64), + ScalarValue::Int32(Some(v)) => ensure_then_coerce!(v >= 0, v as u64), + ScalarValue::Int64(Some(v)) => ensure_then_coerce!(v >= 0, v as u64), + ScalarValue::UInt8(Some(v)) => Ok(v as u64), + ScalarValue::UInt16(Some(v)) => Ok(v as u64), + ScalarValue::UInt32(Some(v)) => Ok(v as u64), + ScalarValue::UInt64(Some(v)) => Ok(v), _ => unreachable!(), } } -fn value_to_distance(v: Value) -> Result { +fn value_to_distance(v: ScalarValue) -> datafusion_common::Result { match v { - Value::Int8(v) => ensure_and_coerce!(v >= 0, v as u32), - Value::Int16(v) => ensure_and_coerce!(v >= 0, v as u32), - Value::Int32(v) => ensure_and_coerce!(v >= 0, v as u32), - Value::Int64(v) => ensure_and_coerce!(v >= 0, v as u32), - Value::UInt8(v) => Ok(v as u32), - Value::UInt16(v) => Ok(v as u32), - Value::UInt32(v) => Ok(v), - Value::UInt64(v) => Ok(v as u32), + ScalarValue::Int8(Some(v)) => ensure_then_coerce!(v >= 0, v as u32), + ScalarValue::Int16(Some(v)) => ensure_then_coerce!(v >= 0, v as u32), + ScalarValue::Int32(Some(v)) => ensure_then_coerce!(v >= 0, v as u32), + ScalarValue::Int64(Some(v)) => ensure_then_coerce!(v >= 0, v as u32), + ScalarValue::UInt8(Some(v)) => Ok(v as u32), + ScalarValue::UInt16(Some(v)) => Ok(v as u32), + ScalarValue::UInt32(Some(v)) => Ok(v), + ScalarValue::UInt64(Some(v)) => Ok(v as u32), _ => unreachable!(), } } @@ -1195,41 +1218,15 @@ fn signature_of_cell_and_distance() -> Signature { Signature::one_of(signatures, Volatility::Stable) } -fn cell_from_value(v: Value) -> Result> { - let cell = match v { - Value::Int64(v) => Some( - CellIndex::try_from(v as u64) - .map_err(|e| { - BoxedError::new(PlainError::new( - format!("H3 error: {}", e), - StatusCode::EngineExecuteQuery, - )) - }) - .context(error::ExecuteSnafu)?, - ), - Value::UInt64(v) => Some( - CellIndex::try_from(v) - .map_err(|e| { - BoxedError::new(PlainError::new( - format!("H3 error: {}", e), - StatusCode::EngineExecuteQuery, - )) - }) - .context(error::ExecuteSnafu)?, - ), - Value::String(s) => Some( - CellIndex::from_str(s.as_utf8()) - .map_err(|e| { - BoxedError::new(PlainError::new( - format!("H3 error: {}", e), - StatusCode::EngineExecuteQuery, - )) - }) - .context(error::ExecuteSnafu)?, - ), +fn cell_from_value(v: ScalarValue) -> datafusion_common::Result> { + match v { + ScalarValue::Int64(Some(v)) => Some(CellIndex::try_from(v as u64)), + ScalarValue::UInt64(Some(v)) => Some(CellIndex::try_from(v)), + ScalarValue::Utf8(Some(s)) => Some(CellIndex::from_str(&s)), _ => None, - }; - Ok(cell) + } + .transpose() + .map_err(|e| DataFusionError::Execution(format!("H3 error: {}", e))) } /// extract cell array from all possible types including: @@ -1237,91 +1234,64 @@ fn cell_from_value(v: Value) -> Result> { /// - uint64 list /// - string list /// - comma-separated string -fn cells_from_value(v: Value) -> Result> { +fn cells_from_value(v: ScalarValue) -> datafusion_common::Result> { match v { - Value::List(list) => match list.datatype() { - ConcreteDataType::Int64(_) => list - .items() + ScalarValue::List(list) => match list.value_type() { + DataType::Int64 => list + .values() + .as_primitive::() .iter() .map(|v| { - if let Value::Int64(v) = v { - CellIndex::try_from(*v as u64) - .map_err(|e| { - BoxedError::new(PlainError::new( - format!("H3 error: {}", e), - StatusCode::EngineExecuteQuery, - )) - }) - .context(error::ExecuteSnafu) + if let Some(v) = v { + CellIndex::try_from(v as u64) + .map_err(|e| DataFusionError::Execution(format!("H3 error: {}", e))) } else { - Err(BoxedError::new(PlainError::new( + Err(DataFusionError::Execution( "Invalid data type in array".to_string(), - StatusCode::EngineExecuteQuery, - ))) - .context(error::ExecuteSnafu) + )) } }) - .collect::>>(), - ConcreteDataType::UInt64(_) => list - .items() + .collect::>>(), + DataType::UInt64 => list + .values() + .as_primitive::() .iter() .map(|v| { - if let Value::UInt64(v) = v { - CellIndex::try_from(*v) - .map_err(|e| { - BoxedError::new(PlainError::new( - format!("H3 error: {}", e), - StatusCode::EngineExecuteQuery, - )) - }) - .context(error::ExecuteSnafu) + if let Some(v) = v { + CellIndex::try_from(v) + .map_err(|e| DataFusionError::Execution(format!("H3 error: {}", e))) } else { - Err(BoxedError::new(PlainError::new( + Err(DataFusionError::Execution( "Invalid data type in array".to_string(), - StatusCode::EngineExecuteQuery, - ))) - .context(error::ExecuteSnafu) + )) } }) - .collect::>>(), - ConcreteDataType::String(_) => list - .items() + .collect::>>(), + DataType::Utf8 => list + .values() + .as_string::() .iter() .map(|v| { - if let Value::String(v) = v { - CellIndex::from_str(v.as_utf8().trim()) - .map_err(|e| { - BoxedError::new(PlainError::new( - format!("H3 error: {}", e), - StatusCode::EngineExecuteQuery, - )) - }) - .context(error::ExecuteSnafu) + if let Some(v) = v { + CellIndex::from_str(v) + .map_err(|e| DataFusionError::Execution(format!("H3 error: {}", e))) } else { - Err(BoxedError::new(PlainError::new( + Err(DataFusionError::Execution( "Invalid data type in array".to_string(), - StatusCode::EngineExecuteQuery, - ))) - .context(error::ExecuteSnafu) + )) } }) - .collect::>>(), + .collect::>>(), _ => Ok(vec![]), }, - Value::String(csv) => { - let str_seq = csv.as_utf8().split(','); + ScalarValue::Utf8(Some(csv)) => { + let str_seq = csv.split(','); str_seq .map(|v| { CellIndex::from_str(v.trim()) - .map_err(|e| { - BoxedError::new(PlainError::new( - format!("H3 error: {}", e), - StatusCode::EngineExecuteQuery, - )) - }) - .context(error::ExecuteSnafu) + .map_err(|e| DataFusionError::Execution(format!("H3 error: {}", e))) }) - .collect::>>() + .collect::>>() } _ => Ok(vec![]), } diff --git a/src/common/function/src/scalars/matches.rs b/src/common/function/src/scalars/matches.rs index c450ac144d..e44c981ee8 100644 --- a/src/common/function/src/scalars/matches.rs +++ b/src/common/function/src/scalars/matches.rs @@ -16,9 +16,7 @@ use std::collections::HashMap; use std::fmt; use std::sync::Arc; -use common_query::error::{ - GeneralDataFusionSnafu, IntoVectorSnafu, InvalidFuncArgsSnafu, InvalidInputTypeSnafu, Result, -}; +use common_query::error::{IntoVectorSnafu, InvalidFuncArgsSnafu, InvalidInputTypeSnafu, Result}; use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeIterator, TreeNodeRecursion}; use datafusion::common::{DFSchema, Result as DfResult}; use datafusion::execution::SessionStateBuilder; @@ -106,21 +104,16 @@ impl MatchesFunction { let input_schema = Self::input_schema(); let session_state = SessionStateBuilder::new().with_default_features().build(); let planner = DefaultPhysicalPlanner::default(); - let physical_expr = planner - .create_physical_expr(&like_expr, &input_schema, &session_state) - .context(GeneralDataFusionSnafu)?; + let physical_expr = + planner.create_physical_expr(&like_expr, &input_schema, &session_state)?; let data_array = data.to_arrow_array(); let arrow_schema = Arc::new(input_schema.as_arrow().clone()); let input_record_batch = RecordBatch::try_new(arrow_schema, vec![data_array]).unwrap(); let num_rows = input_record_batch.num_rows(); - let result = physical_expr - .evaluate(&input_record_batch) - .context(GeneralDataFusionSnafu)?; - let result_array = result - .into_array(num_rows) - .context(GeneralDataFusionSnafu)?; + let result = physical_expr.evaluate(&input_record_batch)?; + let result_array = result.into_array(num_rows)?; let result_vector = BooleanVector::try_from_arrow_array(result_array).context(IntoVectorSnafu { data_type: DataType::Boolean, @@ -210,14 +203,12 @@ impl PatternAst { /// Transform this AST with preset rules to make it correct. fn transform_ast(self) -> Result { self.transform_up(Self::collapse_binary_branch_fn) - .context(GeneralDataFusionSnafu) .map(|data| data.data)? .transform_up(Self::eliminate_optional_fn) - .context(GeneralDataFusionSnafu) .map(|data| data.data)? .transform_down(Self::eliminate_single_child_fn) - .context(GeneralDataFusionSnafu) .map(|data| data.data) + .map_err(Into::into) } /// Collapse binary branch with the same operator. I.e., this transformer diff --git a/src/common/function/src/scalars/math.rs b/src/common/function/src/scalars/math.rs index cde96c2f81..a3baf54add 100644 --- a/src/common/function/src/scalars/math.rs +++ b/src/common/function/src/scalars/math.rs @@ -19,13 +19,12 @@ mod rate; use std::fmt; pub use clamp::{ClampFunction, ClampMaxFunction, ClampMinFunction}; -use common_query::error::{GeneralDataFusionSnafu, Result}; +use common_query::error::Result; use datafusion::arrow::datatypes::DataType; use datafusion::error::DataFusionError; use datafusion_expr::{Signature, Volatility}; use datatypes::vectors::VectorRef; pub use rate::RateFunction; -use snafu::ResultExt; use crate::function::{Function, FunctionContext}; use crate::function_registry::FunctionRegistry; @@ -68,7 +67,7 @@ impl Function for RangeFunction { .ok_or(DataFusionError::Internal( "No expr found in range_fn".into(), )) - .context(GeneralDataFusionSnafu) + .map_err(Into::into) } /// `range_fn` will never been used. As long as a legal signature is returned, the specific content of the signature does not matter. @@ -80,7 +79,7 @@ impl Function for RangeFunction { fn eval(&self, _func_ctx: &FunctionContext, _columns: &[VectorRef]) -> Result { Err(DataFusionError::Internal( "range_fn just a empty function used in range select, It should not be eval!".into(), - )) - .context(GeneralDataFusionSnafu) + ) + .into()) } } diff --git a/src/common/function/src/scalars/udf.rs b/src/common/function/src/scalars/udf.rs index 674bc4240e..b0aa19d9be 100644 --- a/src/common/function/src/scalars/udf.rs +++ b/src/common/function/src/scalars/udf.rs @@ -65,6 +65,14 @@ impl ScalarUDFImpl for ScalarUdf { &self, args: ScalarFunctionArgs, ) -> datafusion_common::Result { + let result = self.function.invoke_with_args(args.clone()); + if !matches!( + result, + Err(datafusion_common::DataFusionError::NotImplemented(_)) + ) { + return result; + } + let columns = args .args .iter() diff --git a/src/common/function/src/system/build.rs b/src/common/function/src/system/build.rs index 5517994a52..efcfbad7ba 100644 --- a/src/common/function/src/system/build.rs +++ b/src/common/function/src/system/build.rs @@ -16,11 +16,12 @@ use std::fmt; use std::sync::Arc; use common_query::error::Result; +use datafusion::arrow::array::StringViewArray; use datafusion::arrow::datatypes::DataType; -use datafusion_expr::{Signature, Volatility}; -use datatypes::vectors::{StringVector, VectorRef}; +use datafusion::logical_expr::ColumnarValue; +use datafusion_expr::{ScalarFunctionArgs, Signature, Volatility}; -use crate::function::{Function, FunctionContext}; +use crate::function::Function; /// Generates build information #[derive(Clone, Debug, Default)] @@ -38,17 +39,18 @@ impl Function for BuildFunction { } fn return_type(&self, _: &[DataType]) -> Result { - Ok(DataType::Utf8) + Ok(DataType::Utf8View) } fn signature(&self) -> Signature { Signature::nullary(Volatility::Immutable) } - fn eval(&self, _func_ctx: &FunctionContext, _columns: &[VectorRef]) -> Result { + fn invoke_with_args(&self, _: ScalarFunctionArgs) -> datafusion_common::Result { let build_info = common_version::build_info().to_string(); - let v = Arc::new(StringVector::from(vec![build_info])); - Ok(v) + Ok(ColumnarValue::Array(Arc::new(StringViewArray::from(vec![ + build_info, + ])))) } } @@ -56,16 +58,29 @@ impl Function for BuildFunction { mod tests { use std::sync::Arc; + use arrow_schema::Field; + use datafusion::arrow::array::ArrayRef; + use datafusion_common::config::ConfigOptions; + use super::*; #[test] fn test_build_function() { let build = BuildFunction; assert_eq!("build", build.name()); - assert_eq!(DataType::Utf8, build.return_type(&[]).unwrap()); + assert_eq!(DataType::Utf8View, build.return_type(&[]).unwrap()); assert_eq!(build.signature(), Signature::nullary(Volatility::Immutable)); let build_info = common_version::build_info().to_string(); - let vector = build.eval(&FunctionContext::default(), &[]).unwrap(); - let expect: VectorRef = Arc::new(StringVector::from(vec![build_info])); - assert_eq!(expect, vector); + let actual = build + .invoke_with_args(ScalarFunctionArgs { + args: vec![], + arg_fields: vec![], + number_rows: 0, + return_field: Arc::new(Field::new("x", DataType::Utf8View, false)), + config_options: Arc::new(ConfigOptions::new()), + }) + .unwrap(); + let actual = ColumnarValue::values_to_arrays(&[actual]).unwrap(); + let expect = vec![Arc::new(StringViewArray::from(vec![build_info])) as ArrayRef]; + assert_eq!(actual, expect); } } diff --git a/src/common/query/src/columnar_value.rs b/src/common/query/src/columnar_value.rs index 419bc5ed3a..0fd06c8b22 100644 --- a/src/common/query/src/columnar_value.rs +++ b/src/common/query/src/columnar_value.rs @@ -17,7 +17,7 @@ use datatypes::prelude::ConcreteDataType; use datatypes::vectors::{Helper, VectorRef}; use snafu::ResultExt; -use crate::error::{self, GeneralDataFusionSnafu, IntoVectorSnafu, Result}; +use crate::error::{self, IntoVectorSnafu, Result}; use crate::prelude::ScalarValue; /// Represents the result from an expression @@ -43,9 +43,7 @@ impl ColumnarValue { Ok(match self { ColumnarValue::Vector(v) => v, ColumnarValue::Scalar(s) => { - let v = s - .to_array_of_size(num_rows) - .context(GeneralDataFusionSnafu)?; + let v = s.to_array_of_size(num_rows)?; let data_type = v.data_type().clone(); Helper::try_into_vector(v).context(IntoVectorSnafu { data_type })? } diff --git a/src/common/query/src/error.rs b/src/common/query/src/error.rs index f3ba6cd5f9..a6271159ae 100644 --- a/src/common/query/src/error.rs +++ b/src/common/query/src/error.rs @@ -78,7 +78,7 @@ pub enum Error { location: Location, }, - #[snafu(display("General DataFusion error"))] + #[snafu(transparent)] GeneralDataFusion { #[snafu(source)] error: DataFusionError, diff --git a/src/common/query/src/logical_plan.rs b/src/common/query/src/logical_plan.rs index cecb4dd3c9..ca555a2552 100644 --- a/src/common/query/src/logical_plan.rs +++ b/src/common/query/src/logical_plan.rs @@ -24,9 +24,8 @@ use datafusion_common::{Column, TableReference}; use datafusion_expr::dml::InsertOp; use datafusion_expr::{DmlStatement, TableSource, WriteOp, col}; pub use expr::{build_filter_from_timestamp, build_same_type_ts_filter}; -use snafu::ResultExt; -use crate::error::{GeneralDataFusionSnafu, Result}; +use crate::error::Result; /// Rename columns by applying a new projection. Returns an error if the column to be /// renamed does not exist. The `renames` parameter is a `Vector` with elements @@ -122,7 +121,7 @@ pub fn add_insert_to_logical_plan( WriteOp::Insert(InsertOp::Append), Arc::new(input), )); - let plan = plan.recompute_schema().context(GeneralDataFusionSnafu)?; + let plan = plan.recompute_schema()?; Ok(plan) } diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index 97fb927540..0dc14419f2 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -43,9 +43,9 @@ use table::table::adapter::DfTableProviderAdapter; use table::table_name::TableName; use crate::error::{ - CatalogSnafu, Error, ExternalSnafu, IncompleteGrpcRequestSnafu, NotSupportedSnafu, - PermissionSnafu, PlanStatementSnafu, Result, SubstraitDecodeLogicalPlanSnafu, - TableNotFoundSnafu, TableOperationSnafu, + CatalogSnafu, DataFusionSnafu, Error, ExternalSnafu, IncompleteGrpcRequestSnafu, + NotSupportedSnafu, PermissionSnafu, PlanStatementSnafu, Result, + SubstraitDecodeLogicalPlanSnafu, TableNotFoundSnafu, TableOperationSnafu, }; use crate::instance::{Instance, attach_timer}; use crate::metrics::{ @@ -395,14 +395,10 @@ impl Instance { let analyzed_plan = state .analyzer() .execute_and_check(insert_into, state.config_options(), |_, _| {}) - .context(common_query::error::GeneralDataFusionSnafu) - .context(SubstraitDecodeLogicalPlanSnafu)?; + .context(DataFusionSnafu)?; // Optimize the plan - let optimized_plan = state - .optimize(&analyzed_plan) - .context(common_query::error::GeneralDataFusionSnafu) - .context(SubstraitDecodeLogicalPlanSnafu)?; + let optimized_plan = state.optimize(&analyzed_plan).context(DataFusionSnafu)?; let output = SqlQueryHandler::do_exec_plan(self, None, optimized_plan, ctx.clone()).await?; diff --git a/src/query/src/query_engine/default_serializer.rs b/src/query/src/query_engine/default_serializer.rs index a78d0a3bd6..990e4a9192 100644 --- a/src/query/src/query_engine/default_serializer.rs +++ b/src/query/src/query_engine/default_serializer.rs @@ -175,7 +175,7 @@ impl SubstraitPlanDecoder for DefaultPlanDecoder { if optimize { self.session_state .optimize(&logical_plan) - .context(common_query::error::GeneralDataFusionSnafu) + .map_err(Into::into) } else { Ok(logical_plan) } diff --git a/tests/cases/standalone/common/function/geo.result b/tests/cases/standalone/common/function/geo.result index 054c2bcfa8..607908b3e0 100644 --- a/tests/cases/standalone/common/function/geo.result +++ b/tests/cases/standalone/common/function/geo.result @@ -24,11 +24,11 @@ SELECT h3_latlng_to_cell(37.76938, -122.3889, 8), h3_latlng_to_cell_string(37.76 SELECT h3_latlng_to_cell(37.76938, -122.3889, 100), h3_latlng_to_cell_string(37.76938, -122.3889, 100); -Error: 3001(EngineExecuteQuery), H3 error: invalid resolution (got Some(100)): out of range +Error: 3001(EngineExecuteQuery), Execution error: H3 error: invalid resolution (got Some(100)): out of range SELECT h3_latlng_to_cell(37.76938, -122.3889, -1), h3_latlng_to_cell_string(37.76938, -122.3889, -1); -Error: 3001(EngineExecuteQuery), H3 error: invalid resolution (got Some(255)): out of range +Error: 3001(EngineExecuteQuery), Cast error: Can't cast value -1 to type UInt8 SELECT h3_latlng_to_cell(37.76938, -122.3889, 8::Int8), h3_latlng_to_cell_string(37.76938, -122.3889, 8::Int8);