refactor: use DataFusion's UDAF implementation directly (#6776)

* refactor: use DataFusion's UDAF implementation directly

Signed-off-by: luofucong <luofc@foxmail.com>

* remove: delete how-to guide for writing aggregate functions

Signed-off-by: luofucong <luofc@foxmail.com>

* fix ci

Signed-off-by: luofucong <luofc@foxmail.com>

* refactor: port json_encode_path to datafusion udaf

Signed-off-by: Ning Sun <sunning@greptime.com>

---------

Signed-off-by: luofucong <luofc@foxmail.com>
Signed-off-by: Ning Sun <sunning@greptime.com>
Co-authored-by: Ning Sun <sunning@greptime.com>
This commit is contained in:
LFC
2025-08-20 16:25:00 +08:00
committed by GitHub
parent 819531393f
commit 05529387d9
17 changed files with 526 additions and 1200 deletions

View File

@@ -21,7 +21,7 @@ pub(crate) struct GeoFunction;
impl GeoFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_aggr(encoding::JsonEncodePathAccumulator::uadf_impl());
registry.register_aggr(geo_path::GeoPathAccumulator::uadf_impl());
registry.register_aggr(encoding::JsonPathAccumulator::uadf_impl());
}
}

View File

@@ -14,223 +14,332 @@
use std::sync::Arc;
use common_error::ext::{BoxedError, PlainError};
use common_error::status_code::StatusCode;
use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
use common_query::error::{self, InvalidInputStateSnafu, Result};
use common_query::logical_plan::accumulator::AggrFuncTypeStore;
use common_query::logical_plan::{
create_aggregate_function, Accumulator, AggregateFunctionCreator,
use arrow::array::AsArray;
use datafusion::arrow::array::{Array, ArrayRef};
use datafusion::common::cast::as_primitive_array;
use datafusion::error::{DataFusionError, Result as DfResult};
use datafusion::logical_expr::{Accumulator as DfAccumulator, AggregateUDF, Volatility};
use datafusion::prelude::create_udaf;
use datafusion_common::cast::{as_list_array, as_struct_array};
use datafusion_common::ScalarValue;
use datatypes::arrow::array::{Float64Array, Int64Array, ListArray, StructArray};
use datatypes::arrow::datatypes::{
DataType, Field, Float64Type, Int64Type, TimeUnit, TimestampNanosecondType,
};
use common_query::prelude::AccumulatorCreatorFunction;
use common_time::Timestamp;
use datafusion_expr::AggregateUDF;
use datatypes::prelude::ConcreteDataType;
use datatypes::value::{ListValue, Value};
use datatypes::vectors::VectorRef;
use snafu::{ensure, ResultExt};
use datatypes::compute::{self, sort_to_indices};
use crate::scalars::geo::helpers::{ensure_columns_len, ensure_columns_n};
pub const JSON_ENCODE_PATH_NAME: &str = "json_encode_path";
/// Accumulator of lat, lng, timestamp tuples
#[derive(Debug)]
pub struct JsonPathAccumulator {
timestamp_type: ConcreteDataType,
const LATITUDE_FIELD: &str = "lat";
const LONGITUDE_FIELD: &str = "lng";
const TIMESTAMP_FIELD: &str = "timestamp";
const DEFAULT_LIST_FIELD_NAME: &str = "item";
#[derive(Debug, Default)]
pub struct JsonEncodePathAccumulator {
lat: Vec<Option<f64>>,
lng: Vec<Option<f64>>,
timestamp: Vec<Option<Timestamp>>,
timestamp: Vec<Option<i64>>,
}
impl JsonPathAccumulator {
fn new(timestamp_type: ConcreteDataType) -> Self {
Self {
lat: Vec::default(),
lng: Vec::default(),
timestamp: Vec::default(),
timestamp_type,
}
impl JsonEncodePathAccumulator {
pub fn new() -> Self {
Self::default()
}
/// Create a new `AggregateUDF` for the `json_encode_path` aggregate function.
pub fn uadf_impl() -> AggregateUDF {
create_aggregate_function(
"json_encode_path".to_string(),
3,
Arc::new(JsonPathEncodeFunctionCreator::default()),
create_udaf(
JSON_ENCODE_PATH_NAME,
// Input types: lat, lng, timestamp
vec![
DataType::Float64,
DataType::Float64,
DataType::Timestamp(TimeUnit::Nanosecond, None),
],
// Output type: geojson compatible linestring
Arc::new(DataType::Utf8),
Volatility::Immutable,
// Create the accumulator
Arc::new(|_| Ok(Box::new(Self::new()))),
// Intermediate state types
Arc::new(vec![DataType::Struct(
vec![
Field::new(
LATITUDE_FIELD,
DataType::List(Arc::new(Field::new(
DEFAULT_LIST_FIELD_NAME,
DataType::Float64,
true,
))),
false,
),
Field::new(
LONGITUDE_FIELD,
DataType::List(Arc::new(Field::new(
DEFAULT_LIST_FIELD_NAME,
DataType::Float64,
true,
))),
false,
),
Field::new(
TIMESTAMP_FIELD,
DataType::List(Arc::new(Field::new(
DEFAULT_LIST_FIELD_NAME,
DataType::Int64,
true,
))),
false,
),
]
.into(),
)]),
)
.into()
}
}
impl Accumulator for JsonPathAccumulator {
fn state(&self) -> Result<Vec<Value>> {
Ok(vec![
Value::List(ListValue::new(
self.lat.iter().map(|i| Value::from(*i)).collect(),
ConcreteDataType::float64_datatype(),
)),
Value::List(ListValue::new(
self.lng.iter().map(|i| Value::from(*i)).collect(),
ConcreteDataType::float64_datatype(),
)),
Value::List(ListValue::new(
self.timestamp.iter().map(|i| Value::from(*i)).collect(),
self.timestamp_type.clone(),
)),
])
}
impl DfAccumulator for JsonEncodePathAccumulator {
fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion::error::Result<()> {
if values.len() != 3 {
return Err(DataFusionError::Internal(format!(
"Expected 3 columns for json_encode_path, got {}",
values.len()
)));
}
fn update_batch(&mut self, columns: &[VectorRef]) -> Result<()> {
// update batch as in datafusion just provides the accumulator original
// input.
//
// columns is vec of [`lat`, `lng`, `timestamp`]
// where
// - `lat` is a vector of `Value::Float64` or similar type. Each item in
// the vector is a row in given dataset.
// - so on so forth for `lng` and `timestamp`
ensure_columns_n!(columns, 3);
let lat_array = as_primitive_array::<Float64Type>(&values[0])?;
let lng_array = as_primitive_array::<Float64Type>(&values[1])?;
let ts_array = as_primitive_array::<TimestampNanosecondType>(&values[2])?;
let lat = &columns[0];
let lng = &columns[1];
let ts = &columns[2];
let size = lat.len();
let size = lat_array.len();
self.lat.reserve(size);
self.lng.reserve(size);
for idx in 0..size {
self.lat.push(lat.get(idx).as_f64_lossy());
self.lng.push(lng.get(idx).as_f64_lossy());
self.timestamp.push(ts.get(idx).as_timestamp());
self.lat.push(if lat_array.is_null(idx) {
None
} else {
Some(lat_array.value(idx))
});
self.lng.push(if lng_array.is_null(idx) {
None
} else {
Some(lng_array.value(idx))
});
self.timestamp.push(if ts_array.is_null(idx) {
None
} else {
Some(ts_array.value(idx))
});
}
Ok(())
}
fn merge_batch(&mut self, states: &[VectorRef]) -> Result<()> {
// merge batch as in datafusion gives state accumulated from the data
// returned from child accumulators' state() call
// In our particular implementation, the data structure is like
//
// states is vec of [`lat`, `lng`, `timestamp`]
// where
// - `lat` is a vector of `Value::List`. Each item in the list is all
// coordinates from a child accumulator.
// - so on so forth for `lng` and `timestamp`
fn evaluate(&mut self) -> DfResult<ScalarValue> {
let unordered_lng_array = Float64Array::from(self.lng.clone());
let unordered_lat_array = Float64Array::from(self.lat.clone());
let ts_array = Int64Array::from(self.timestamp.clone());
ensure_columns_n!(states, 3);
let ordered_indices = sort_to_indices(&ts_array, None, None)?;
let lat_array = compute::take(&unordered_lat_array, &ordered_indices, None)?;
let lng_array = compute::take(&unordered_lng_array, &ordered_indices, None)?;
let lat_lists = &states[0];
let lng_lists = &states[1];
let ts_lists = &states[2];
let len = ts_array.len();
let lat_array = lat_array.as_primitive::<Float64Type>();
let lng_array = lng_array.as_primitive::<Float64Type>();
let len = lat_lists.len();
let mut coords = Vec::with_capacity(len);
for i in 0..len {
let lng = lng_array.value(i);
let lat = lat_array.value(i);
coords.push(vec![lng, lat]);
}
for idx in 0..len {
if let Some(lat_list) = lat_lists
.get(idx)
.as_list()
.map_err(BoxedError::new)
.context(error::ExecuteSnafu)?
{
for v in lat_list.items() {
self.lat.push(v.as_f64_lossy());
}
}
let result = serde_json::to_string(&coords)
.map_err(|e| DataFusionError::Execution(format!("Failed to encode json, {}", e)))?;
if let Some(lng_list) = lng_lists
.get(idx)
.as_list()
.map_err(BoxedError::new)
.context(error::ExecuteSnafu)?
{
for v in lng_list.items() {
self.lng.push(v.as_f64_lossy());
}
}
Ok(ScalarValue::Utf8(Some(result)))
}
if let Some(ts_list) = ts_lists
.get(idx)
.as_list()
.map_err(BoxedError::new)
.context(error::ExecuteSnafu)?
{
for v in ts_list.items() {
self.timestamp.push(v.as_timestamp());
}
}
fn size(&self) -> usize {
// Base size of JsonEncodePathAccumulator struct fields
let mut total_size = std::mem::size_of::<Self>();
// Size of vectors (approximation)
total_size += self.lat.capacity() * std::mem::size_of::<Option<f64>>();
total_size += self.lng.capacity() * std::mem::size_of::<Option<f64>>();
total_size += self.timestamp.capacity() * std::mem::size_of::<Option<i64>>();
total_size
}
fn state(&mut self) -> datafusion::error::Result<Vec<ScalarValue>> {
let lat_array = Arc::new(ListArray::from_iter_primitive::<Float64Type, _, _>(vec![
Some(self.lat.clone()),
]));
let lng_array = Arc::new(ListArray::from_iter_primitive::<Float64Type, _, _>(vec![
Some(self.lng.clone()),
]));
let ts_array = Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
Some(self.timestamp.clone()),
]));
let state_struct = StructArray::new(
vec![
Field::new(
LATITUDE_FIELD,
DataType::List(Arc::new(Field::new("item", DataType::Float64, true))),
false,
),
Field::new(
LONGITUDE_FIELD,
DataType::List(Arc::new(Field::new("item", DataType::Float64, true))),
false,
),
Field::new(
TIMESTAMP_FIELD,
DataType::List(Arc::new(Field::new("item", DataType::Int64, true))),
false,
),
]
.into(),
vec![lat_array, lng_array, ts_array],
None,
);
Ok(vec![ScalarValue::Struct(Arc::new(state_struct))])
}
fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion::error::Result<()> {
if states.len() != 1 {
return Err(DataFusionError::Internal(format!(
"Expected 1 states for json_encode_path, got {}",
states.len()
)));
}
for state in states {
let state = as_struct_array(state)?;
let lat_list = as_list_array(state.column(0))?.value(0);
let lat_array = as_primitive_array::<Float64Type>(&lat_list)?;
let lng_list = as_list_array(state.column(1))?.value(0);
let lng_array = as_primitive_array::<Float64Type>(&lng_list)?;
let ts_list = as_list_array(state.column(2))?.value(0);
let ts_array = as_primitive_array::<Int64Type>(&ts_list)?;
self.lat.extend(lat_array);
self.lng.extend(lng_array);
self.timestamp.extend(ts_array);
}
Ok(())
}
fn evaluate(&self) -> Result<Value> {
let mut work_vec: Vec<(&Option<f64>, &Option<f64>, &Option<Timestamp>)> = self
.lat
.iter()
.zip(self.lng.iter())
.zip(self.timestamp.iter())
.map(|((a, b), c)| (a, b, c))
.collect();
// sort by timestamp, we treat null timestamp as 0
work_vec.sort_unstable_by_key(|tuple| tuple.2.unwrap_or_else(|| Timestamp::new_second(0)));
let result = serde_json::to_string(
&work_vec
.into_iter()
// note that we transform to lng,lat for geojson compatibility
.map(|(lat, lng, _)| vec![lng, lat])
.collect::<Vec<Vec<&Option<f64>>>>(),
)
.map_err(|e| {
BoxedError::new(PlainError::new(
format!("Serialization failure: {}", e),
StatusCode::EngineExecuteQuery,
))
})
.context(error::ExecuteSnafu)?;
Ok(Value::String(result.into()))
}
}
/// This function accept rows of lat, lng and timestamp, sort with timestamp and
/// encoding them into a geojson-like path.
///
/// Example:
///
/// ```sql
/// SELECT json_encode_path(lat, lon, timestamp) FROM table [group by ...];
/// ```
///
#[as_aggr_func_creator]
#[derive(Debug, Default, AggrFuncTypeStore)]
pub struct JsonPathEncodeFunctionCreator {}
#[cfg(test)]
mod tests {
use datafusion::arrow::array::{Float64Array, TimestampNanosecondArray};
use datafusion::scalar::ScalarValue;
impl AggregateFunctionCreator for JsonPathEncodeFunctionCreator {
fn creator(&self) -> AccumulatorCreatorFunction {
let creator: AccumulatorCreatorFunction = Arc::new(move |types: &[ConcreteDataType]| {
let ts_type = types[2].clone();
Ok(Box::new(JsonPathAccumulator::new(ts_type)))
});
use super::*;
creator
#[test]
fn test_json_encode_path_basic() {
let mut accumulator = JsonEncodePathAccumulator::new();
// Create test data
let lat_array = Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0]));
let lng_array = Arc::new(Float64Array::from(vec![4.0, 5.0, 6.0]));
let ts_array = Arc::new(TimestampNanosecondArray::from(vec![100, 200, 300]));
// Update batch
accumulator
.update_batch(&[lat_array, lng_array, ts_array])
.unwrap();
// Evaluate
let result = accumulator.evaluate().unwrap();
assert_eq!(
result,
ScalarValue::Utf8(Some("[[4.0,1.0],[5.0,2.0],[6.0,3.0]]".to_string()))
);
}
fn output_type(&self) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::string_datatype())
#[test]
fn test_json_encode_path_sort_by_timestamp() {
let mut accumulator = JsonEncodePathAccumulator::new();
// Create test data with unordered timestamps
let lat_array = Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0]));
let lng_array = Arc::new(Float64Array::from(vec![4.0, 5.0, 6.0]));
let ts_array = Arc::new(TimestampNanosecondArray::from(vec![300, 100, 200]));
// Update batch
accumulator
.update_batch(&[lat_array, lng_array, ts_array])
.unwrap();
// Evaluate
let result = accumulator.evaluate().unwrap();
assert_eq!(
result,
ScalarValue::Utf8(Some("[[5.0,2.0],[6.0,3.0],[4.0,1.0]]".to_string()))
);
}
fn state_types(&self) -> Result<Vec<ConcreteDataType>> {
let input_types = self.input_types()?;
ensure!(input_types.len() == 3, InvalidInputStateSnafu);
#[test]
fn test_json_encode_path_merge() {
let mut accumulator1 = JsonEncodePathAccumulator::new();
let mut accumulator2 = JsonEncodePathAccumulator::new();
let timestamp_type = input_types[2].clone();
// Create test data for first accumulator
let lat_array1 = Arc::new(Float64Array::from(vec![1.0]));
let lng_array1 = Arc::new(Float64Array::from(vec![4.0]));
let ts_array1 = Arc::new(TimestampNanosecondArray::from(vec![100]));
Ok(vec![
ConcreteDataType::list_datatype(ConcreteDataType::float64_datatype()),
ConcreteDataType::list_datatype(ConcreteDataType::float64_datatype()),
ConcreteDataType::list_datatype(timestamp_type),
])
// Create test data for second accumulator
let lat_array2 = Arc::new(Float64Array::from(vec![2.0]));
let lng_array2 = Arc::new(Float64Array::from(vec![5.0]));
let ts_array2 = Arc::new(TimestampNanosecondArray::from(vec![200]));
// Update batches
accumulator1
.update_batch(&[lat_array1, lng_array1, ts_array1])
.unwrap();
accumulator2
.update_batch(&[lat_array2, lng_array2, ts_array2])
.unwrap();
// Get states
let state1 = accumulator1.state().unwrap();
let state2 = accumulator2.state().unwrap();
// Create a merged accumulator
let mut merged = JsonEncodePathAccumulator::new();
// Extract the struct arrays from the states
let state_array1 = match &state1[0] {
ScalarValue::Struct(array) => array.clone(),
_ => panic!("Expected Struct scalar value"),
};
let state_array2 = match &state2[0] {
ScalarValue::Struct(array) => array.clone(),
_ => panic!("Expected Struct scalar value"),
};
// Merge state arrays
merged.merge_batch(&[state_array1]).unwrap();
merged.merge_batch(&[state_array2]).unwrap();
// Evaluate merged result
let result = merged.evaluate().unwrap();
assert_eq!(
result,
ScalarValue::Utf8(Some("[[4.0,1.0],[5.0,2.0]]".to_string()))
);
}
}

View File

@@ -12,21 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::borrow::Cow;
use std::sync::Arc;
use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
use common_query::error::{CreateAccumulatorSnafu, Error, InvalidFuncArgsSnafu};
use common_query::logical_plan::{
create_aggregate_function, Accumulator, AggregateFunctionCreator,
};
use common_query::prelude::AccumulatorCreatorFunction;
use datafusion_expr::AggregateUDF;
use datatypes::prelude::{ConcreteDataType, Value, *};
use datatypes::vectors::VectorRef;
use arrow::array::{Array, ArrayRef, AsArray, BinaryArray, StringArray};
use arrow_schema::{DataType, Field};
use datafusion::logical_expr::{Signature, TypeSignature, Volatility};
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::{Accumulator, AggregateUDF, SimpleAggregateUDF};
use datafusion_functions_aggregate_common::accumulator::AccumulatorArgs;
use nalgebra::{Const, DVectorView, Dyn, OVector};
use snafu::ensure;
use crate::scalars::vector::impl_conv::{as_veclit, as_veclit_if_const, veclit_to_binlit};
use crate::scalars::vector::impl_conv::{
binlit_as_veclit, parse_veclit_from_strlit, veclit_to_binlit,
};
/// Aggregates by multiplying elements across the same dimension, returns a vector.
#[derive(Debug, Default)]
@@ -35,57 +34,42 @@ pub struct VectorProduct {
has_null: bool,
}
#[as_aggr_func_creator]
#[derive(Debug, Default, AggrFuncTypeStore)]
pub struct VectorProductCreator {}
impl AggregateFunctionCreator for VectorProductCreator {
fn creator(&self) -> AccumulatorCreatorFunction {
let creator: AccumulatorCreatorFunction = Arc::new(move |types: &[ConcreteDataType]| {
ensure!(
types.len() == 1,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly one, have: {}",
types.len()
)
}
);
let input_type = &types[0];
match input_type {
ConcreteDataType::String(_) | ConcreteDataType::Binary(_) => {
Ok(Box::new(VectorProduct::default()))
}
_ => {
let err_msg = format!(
"\"VEC_PRODUCT\" aggregate function not support data type {:?}",
input_type.logical_type_id(),
);
CreateAccumulatorSnafu { err_msg }.fail()?
}
}
});
creator
}
fn output_type(&self) -> common_query::error::Result<ConcreteDataType> {
Ok(ConcreteDataType::binary_datatype())
}
fn state_types(&self) -> common_query::error::Result<Vec<ConcreteDataType>> {
Ok(vec![self.output_type()?])
}
}
impl VectorProduct {
/// Create a new `AggregateUDF` for the `vec_product` aggregate function.
pub fn uadf_impl() -> AggregateUDF {
create_aggregate_function(
"vec_product".to_string(),
1,
Arc::new(VectorProductCreator::default()),
)
.into()
let signature = Signature::one_of(
vec![
TypeSignature::Exact(vec![DataType::Utf8]),
TypeSignature::Exact(vec![DataType::Binary]),
],
Volatility::Immutable,
);
let udaf = SimpleAggregateUDF::new_with_signature(
"vec_product",
signature,
DataType::Binary,
Arc::new(Self::accumulator),
vec![Arc::new(Field::new("x", DataType::Binary, true))],
);
AggregateUDF::from(udaf)
}
fn accumulator(args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
if args.schema.fields().len() != 1 {
return Err(datafusion_common::DataFusionError::Internal(format!(
"expect creating `VEC_PRODUCT` with only one input field, actual {}",
args.schema.fields().len()
)));
}
let t = args.schema.field(0).data_type();
if !matches!(t, DataType::Utf8 | DataType::Binary) {
return Err(datafusion_common::DataFusionError::Internal(format!(
"unexpected input datatype {t} when creating `VEC_PRODUCT`"
)));
}
Ok(Box::new(VectorProduct::default()))
}
fn inner(&mut self, len: usize) -> &mut OVector<f32, Dyn> {
@@ -94,67 +78,82 @@ impl VectorProduct {
})
}
fn update(&mut self, values: &[VectorRef], is_update: bool) -> Result<(), Error> {
fn update(&mut self, values: &[ArrayRef], is_update: bool) -> Result<()> {
if values.is_empty() || self.has_null {
return Ok(());
};
let column = &values[0];
let len = column.len();
match as_veclit_if_const(column)? {
Some(column) => {
let vec_column = DVectorView::from_slice(&column, column.len()).scale(len as f32);
*self.inner(vec_column.len()) =
(*self.inner(vec_column.len())).component_mul(&vec_column);
let vectors = match values[0].data_type() {
DataType::Utf8 => {
let arr: &StringArray = values[0].as_string();
arr.iter()
.filter_map(|x| x.map(|s| parse_veclit_from_strlit(s).map_err(Into::into)))
.map(|x| x.map(Cow::Owned))
.collect::<Result<Vec<_>>>()?
}
None => {
for i in 0..len {
let Some(arg0) = as_veclit(column.get_ref(i))? else {
if is_update {
self.has_null = true;
self.product = None;
}
return Ok(());
};
let vec_column = DVectorView::from_slice(&arg0, arg0.len());
*self.inner(vec_column.len()) =
(*self.inner(vec_column.len())).component_mul(&vec_column);
}
DataType::Binary => {
let arr: &BinaryArray = values[0].as_binary();
arr.iter()
.filter_map(|x| x.map(|b| binlit_as_veclit(b).map_err(Into::into)))
.collect::<Result<Vec<_>>>()?
}
_ => {
return Err(datafusion_common::DataFusionError::NotImplemented(format!(
"unsupported data type {} for `VEC_PRODUCT`",
values[0].data_type()
)))
}
};
if vectors.len() != values[0].len() {
if is_update {
self.has_null = true;
self.product = None;
}
return Ok(());
}
vectors.iter().for_each(|v| {
let v = DVectorView::from_slice(v, v.len());
let inner = self.inner(v.len());
*inner = inner.component_mul(&v);
});
Ok(())
}
}
impl Accumulator for VectorProduct {
fn state(&self) -> common_query::error::Result<Vec<Value>> {
fn state(&mut self) -> Result<Vec<ScalarValue>> {
self.evaluate().map(|v| vec![v])
}
fn update_batch(&mut self, values: &[VectorRef]) -> common_query::error::Result<()> {
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
self.update(values, true)
}
fn merge_batch(&mut self, states: &[VectorRef]) -> common_query::error::Result<()> {
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
self.update(states, false)
}
fn evaluate(&self) -> common_query::error::Result<Value> {
fn evaluate(&mut self) -> Result<ScalarValue> {
match &self.product {
None => Ok(Value::Null),
Some(vector) => {
let v = vector.as_slice();
Ok(Value::from(veclit_to_binlit(v)))
}
None => Ok(ScalarValue::Binary(None)),
Some(vector) => Ok(ScalarValue::Binary(Some(veclit_to_binlit(
vector.as_slice(),
)))),
}
}
fn size(&self) -> usize {
size_of_val(self)
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use datatypes::vectors::{ConstantVector, StringVector};
use datatypes::scalars::ScalarVector;
use datatypes::vectors::{ConstantVector, StringVector, Vector};
use super::*;
@@ -165,59 +164,60 @@ mod tests {
vec_product.update_batch(&[]).unwrap();
assert!(vec_product.product.is_none());
assert!(!vec_product.has_null);
assert_eq!(Value::Null, vec_product.evaluate().unwrap());
assert_eq!(ScalarValue::Binary(None), vec_product.evaluate().unwrap());
// test update one not-null value
let mut vec_product = VectorProduct::default();
let v: Vec<VectorRef> = vec![Arc::new(StringVector::from(vec![Some(
let v: Vec<ArrayRef> = vec![Arc::new(StringArray::from(vec![Some(
"[1.0,2.0,3.0]".to_string(),
)]))];
vec_product.update_batch(&v).unwrap();
assert_eq!(
Value::from(veclit_to_binlit(&[1.0, 2.0, 3.0])),
ScalarValue::Binary(Some(veclit_to_binlit(&[1.0, 2.0, 3.0]))),
vec_product.evaluate().unwrap()
);
// test update one null value
let mut vec_product = VectorProduct::default();
let v: Vec<VectorRef> = vec![Arc::new(StringVector::from(vec![Option::<String>::None]))];
let v: Vec<ArrayRef> = vec![Arc::new(StringArray::from(vec![Option::<String>::None]))];
vec_product.update_batch(&v).unwrap();
assert_eq!(Value::Null, vec_product.evaluate().unwrap());
assert_eq!(ScalarValue::Binary(None), vec_product.evaluate().unwrap());
// test update no null-value batch
let mut vec_product = VectorProduct::default();
let v: Vec<VectorRef> = vec![Arc::new(StringVector::from(vec![
let v: Vec<ArrayRef> = vec![Arc::new(StringArray::from(vec![
Some("[1.0,2.0,3.0]".to_string()),
Some("[4.0,5.0,6.0]".to_string()),
Some("[7.0,8.0,9.0]".to_string()),
]))];
vec_product.update_batch(&v).unwrap();
assert_eq!(
Value::from(veclit_to_binlit(&[28.0, 80.0, 162.0])),
ScalarValue::Binary(Some(veclit_to_binlit(&[28.0, 80.0, 162.0]))),
vec_product.evaluate().unwrap()
);
// test update null-value batch
let mut vec_product = VectorProduct::default();
let v: Vec<VectorRef> = vec![Arc::new(StringVector::from(vec![
let v: Vec<ArrayRef> = vec![Arc::new(StringArray::from(vec![
Some("[1.0,2.0,3.0]".to_string()),
None,
Some("[7.0,8.0,9.0]".to_string()),
]))];
vec_product.update_batch(&v).unwrap();
assert_eq!(Value::Null, vec_product.evaluate().unwrap());
assert_eq!(ScalarValue::Binary(None), vec_product.evaluate().unwrap());
// test update with constant vector
let mut vec_product = VectorProduct::default();
let v: Vec<VectorRef> = vec![Arc::new(ConstantVector::new(
let v: Vec<ArrayRef> = vec![Arc::new(ConstantVector::new(
Arc::new(StringVector::from_vec(vec!["[1.0,2.0,3.0]".to_string()])),
4,
))];
))
.to_arrow_array()];
vec_product.update_batch(&v).unwrap();
assert_eq!(
Value::from(veclit_to_binlit(&[4.0, 8.0, 12.0])),
ScalarValue::Binary(Some(veclit_to_binlit(&[1.0, 16.0, 81.0]))),
vec_product.evaluate().unwrap()
);
}

View File

@@ -14,19 +14,18 @@
use std::sync::Arc;
use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
use common_query::error::{CreateAccumulatorSnafu, Error, InvalidFuncArgsSnafu};
use common_query::logical_plan::{
create_aggregate_function, Accumulator, AggregateFunctionCreator,
use arrow::array::{Array, ArrayRef, AsArray, BinaryArray, StringArray};
use arrow_schema::{DataType, Field};
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::{
Accumulator, AggregateUDF, Signature, SimpleAggregateUDF, TypeSignature, Volatility,
};
use common_query::prelude::AccumulatorCreatorFunction;
use datafusion_expr::AggregateUDF;
use datatypes::prelude::{ConcreteDataType, Value, *};
use datatypes::vectors::VectorRef;
use datafusion_functions_aggregate_common::accumulator::AccumulatorArgs;
use nalgebra::{Const, DVectorView, Dyn, OVector};
use snafu::ensure;
use crate::scalars::vector::impl_conv::{as_veclit, as_veclit_if_const, veclit_to_binlit};
use crate::scalars::vector::impl_conv::{
binlit_as_veclit, parse_veclit_from_strlit, veclit_to_binlit,
};
/// The accumulator for the `vec_sum` aggregate function.
#[derive(Debug, Default)]
@@ -35,57 +34,42 @@ pub struct VectorSum {
has_null: bool,
}
#[as_aggr_func_creator]
#[derive(Debug, Default, AggrFuncTypeStore)]
pub struct VectorSumCreator {}
impl AggregateFunctionCreator for VectorSumCreator {
fn creator(&self) -> AccumulatorCreatorFunction {
let creator: AccumulatorCreatorFunction = Arc::new(move |types: &[ConcreteDataType]| {
ensure!(
types.len() == 1,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly one, have: {}",
types.len()
)
}
);
let input_type = &types[0];
match input_type {
ConcreteDataType::String(_) | ConcreteDataType::Binary(_) => {
Ok(Box::new(VectorSum::default()))
}
_ => {
let err_msg = format!(
"\"VEC_SUM\" aggregate function not support data type {:?}",
input_type.logical_type_id(),
);
CreateAccumulatorSnafu { err_msg }.fail()?
}
}
});
creator
}
fn output_type(&self) -> common_query::error::Result<ConcreteDataType> {
Ok(ConcreteDataType::binary_datatype())
}
fn state_types(&self) -> common_query::error::Result<Vec<ConcreteDataType>> {
Ok(vec![self.output_type()?])
}
}
impl VectorSum {
/// Create a new `AggregateUDF` for the `vec_sum` aggregate function.
pub fn uadf_impl() -> AggregateUDF {
create_aggregate_function(
"vec_sum".to_string(),
1,
Arc::new(VectorSumCreator::default()),
)
.into()
let signature = Signature::one_of(
vec![
TypeSignature::Exact(vec![DataType::Utf8]),
TypeSignature::Exact(vec![DataType::Binary]),
],
Volatility::Immutable,
);
let udaf = SimpleAggregateUDF::new_with_signature(
"vec_sum",
signature,
DataType::Binary,
Arc::new(Self::accumulator),
vec![Arc::new(Field::new("x", DataType::Binary, true))],
);
AggregateUDF::from(udaf)
}
fn accumulator(args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
if args.schema.fields().len() != 1 {
return Err(datafusion_common::DataFusionError::Internal(format!(
"expect creating `VEC_SUM` with only one input field, actual {}",
args.schema.fields().len()
)));
}
let t = args.schema.field(0).data_type();
if !matches!(t, DataType::Utf8 | DataType::Binary) {
return Err(datafusion_common::DataFusionError::Internal(format!(
"unexpected input datatype {t} when creating `VEC_SUM`"
)));
}
Ok(Box::new(VectorSum::default()))
}
fn inner(&mut self, len: usize) -> &mut OVector<f32, Dyn> {
@@ -93,62 +77,87 @@ impl VectorSum {
.get_or_insert_with(|| OVector::zeros_generic(Dyn(len), Const::<1>))
}
fn update(&mut self, values: &[VectorRef], is_update: bool) -> Result<(), Error> {
fn update(&mut self, values: &[ArrayRef], is_update: bool) -> Result<()> {
if values.is_empty() || self.has_null {
return Ok(());
};
let column = &values[0];
let len = column.len();
match as_veclit_if_const(column)? {
Some(column) => {
let vec_column = DVectorView::from_slice(&column, column.len()).scale(len as f32);
*self.inner(vec_column.len()) += vec_column;
}
None => {
for i in 0..len {
let Some(arg0) = as_veclit(column.get_ref(i))? else {
match values[0].data_type() {
DataType::Utf8 => {
let arr: &StringArray = values[0].as_string();
for s in arr.iter() {
let Some(s) = s else {
if is_update {
self.has_null = true;
self.sum = None;
}
return Ok(());
};
let vec_column = DVectorView::from_slice(&arg0, arg0.len());
let values = parse_veclit_from_strlit(s)?;
let vec_column = DVectorView::from_slice(&values, values.len());
*self.inner(vec_column.len()) += vec_column;
}
}
DataType::Binary => {
let arr: &BinaryArray = values[0].as_binary();
for b in arr.iter() {
let Some(b) = b else {
if is_update {
self.has_null = true;
self.sum = None;
}
return Ok(());
};
let values = binlit_as_veclit(b)?;
let vec_column = DVectorView::from_slice(&values, values.len());
*self.inner(vec_column.len()) += vec_column;
}
}
_ => {
return Err(datafusion_common::DataFusionError::NotImplemented(format!(
"unsupported data type {} for `VEC_SUM`",
values[0].data_type()
)))
}
}
Ok(())
}
}
impl Accumulator for VectorSum {
fn state(&self) -> common_query::error::Result<Vec<Value>> {
fn state(&mut self) -> Result<Vec<ScalarValue>> {
self.evaluate().map(|v| vec![v])
}
fn update_batch(&mut self, values: &[VectorRef]) -> common_query::error::Result<()> {
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
self.update(values, true)
}
fn merge_batch(&mut self, states: &[VectorRef]) -> common_query::error::Result<()> {
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
self.update(states, false)
}
fn evaluate(&self) -> common_query::error::Result<Value> {
fn evaluate(&mut self) -> Result<ScalarValue> {
match &self.sum {
None => Ok(Value::Null),
Some(vector) => Ok(Value::from(veclit_to_binlit(vector.as_slice()))),
None => Ok(ScalarValue::Binary(None)),
Some(vector) => Ok(ScalarValue::Binary(Some(veclit_to_binlit(
vector.as_slice(),
)))),
}
}
fn size(&self) -> usize {
size_of_val(self)
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use datatypes::vectors::{ConstantVector, StringVector};
use arrow::array::StringArray;
use datatypes::scalars::ScalarVector;
use datatypes::vectors::{ConstantVector, StringVector, Vector};
use super::*;
@@ -159,57 +168,58 @@ mod tests {
vec_sum.update_batch(&[]).unwrap();
assert!(vec_sum.sum.is_none());
assert!(!vec_sum.has_null);
assert_eq!(Value::Null, vec_sum.evaluate().unwrap());
assert_eq!(ScalarValue::Binary(None), vec_sum.evaluate().unwrap());
// test update one not-null value
let mut vec_sum = VectorSum::default();
let v: Vec<VectorRef> = vec![Arc::new(StringVector::from(vec![Some(
let v: Vec<ArrayRef> = vec![Arc::new(StringArray::from(vec![Some(
"[1.0,2.0,3.0]".to_string(),
)]))];
vec_sum.update_batch(&v).unwrap();
assert_eq!(
Value::from(veclit_to_binlit(&[1.0, 2.0, 3.0])),
ScalarValue::Binary(Some(veclit_to_binlit(&[1.0, 2.0, 3.0]))),
vec_sum.evaluate().unwrap()
);
// test update one null value
let mut vec_sum = VectorSum::default();
let v: Vec<VectorRef> = vec![Arc::new(StringVector::from(vec![Option::<String>::None]))];
let v: Vec<ArrayRef> = vec![Arc::new(StringArray::from(vec![Option::<String>::None]))];
vec_sum.update_batch(&v).unwrap();
assert_eq!(Value::Null, vec_sum.evaluate().unwrap());
assert_eq!(ScalarValue::Binary(None), vec_sum.evaluate().unwrap());
// test update no null-value batch
let mut vec_sum = VectorSum::default();
let v: Vec<VectorRef> = vec![Arc::new(StringVector::from(vec![
let v: Vec<ArrayRef> = vec![Arc::new(StringArray::from(vec![
Some("[1.0,2.0,3.0]".to_string()),
Some("[4.0,5.0,6.0]".to_string()),
Some("[7.0,8.0,9.0]".to_string()),
]))];
vec_sum.update_batch(&v).unwrap();
assert_eq!(
Value::from(veclit_to_binlit(&[12.0, 15.0, 18.0])),
ScalarValue::Binary(Some(veclit_to_binlit(&[12.0, 15.0, 18.0]))),
vec_sum.evaluate().unwrap()
);
// test update null-value batch
let mut vec_sum = VectorSum::default();
let v: Vec<VectorRef> = vec![Arc::new(StringVector::from(vec![
let v: Vec<ArrayRef> = vec![Arc::new(StringArray::from(vec![
Some("[1.0,2.0,3.0]".to_string()),
None,
Some("[7.0,8.0,9.0]".to_string()),
]))];
vec_sum.update_batch(&v).unwrap();
assert_eq!(Value::Null, vec_sum.evaluate().unwrap());
assert_eq!(ScalarValue::Binary(None), vec_sum.evaluate().unwrap());
// test update with constant vector
let mut vec_sum = VectorSum::default();
let v: Vec<VectorRef> = vec![Arc::new(ConstantVector::new(
let v: Vec<ArrayRef> = vec![Arc::new(ConstantVector::new(
Arc::new(StringVector::from_vec(vec!["[1.0,2.0,3.0]".to_string()])),
4,
))];
))
.to_arrow_array()];
vec_sum.update_batch(&v).unwrap();
assert_eq!(
Value::from(veclit_to_binlit(&[4.0, 8.0, 12.0])),
ScalarValue::Binary(Some(veclit_to_binlit(&[4.0, 8.0, 12.0]))),
vec_sum.evaluate().unwrap()
);
}