mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-28 10:50:39 +00:00
feat: uddsketch_merge udaf (#5992)
This commit is contained in:
@@ -19,4 +19,4 @@ mod uddsketch_state;
|
||||
pub use geo_path::{GeoPathAccumulator, GEO_PATH_NAME};
|
||||
pub(crate) use hll::HllStateType;
|
||||
pub use hll::{HllState, HLL_MERGE_NAME, HLL_NAME};
|
||||
pub use uddsketch_state::{UddSketchState, UDDSKETCH_STATE_NAME};
|
||||
pub use uddsketch_state::{UddSketchState, UDDSKETCH_MERGE_NAME, UDDSKETCH_STATE_NAME};
|
||||
|
||||
@@ -31,23 +31,28 @@ use datafusion::physical_plan::expressions::Literal;
|
||||
use datafusion::prelude::create_udaf;
|
||||
use datatypes::arrow::array::ArrayRef;
|
||||
use datatypes::arrow::datatypes::{DataType, Float64Type};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use uddsketch::{SketchHashKey, UDDSketch};
|
||||
|
||||
pub const UDDSKETCH_STATE_NAME: &str = "uddsketch_state";
|
||||
|
||||
#[derive(Debug)]
|
||||
pub const UDDSKETCH_MERGE_NAME: &str = "uddsketch_merge";
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct UddSketchState {
|
||||
uddsketch: UDDSketch,
|
||||
error_rate: f64,
|
||||
}
|
||||
|
||||
impl UddSketchState {
|
||||
pub fn new(bucket_size: u64, error_rate: f64) -> Self {
|
||||
Self {
|
||||
uddsketch: UDDSketch::new(bucket_size, error_rate),
|
||||
error_rate,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn udf_impl() -> AggregateUDF {
|
||||
pub fn state_udf_impl() -> AggregateUDF {
|
||||
create_udaf(
|
||||
UDDSKETCH_STATE_NAME,
|
||||
vec![DataType::Int64, DataType::Float64, DataType::Float64],
|
||||
@@ -61,18 +66,55 @@ impl UddSketchState {
|
||||
)
|
||||
}
|
||||
|
||||
/// Create a UDF for the `uddsketch_merge` function.
|
||||
///
|
||||
/// `uddsketch_merge` accepts bucket size, error rate, and a binary column of states generated by `uddsketch_state`
|
||||
/// and merges them into a single state.
|
||||
///
|
||||
/// The bucket size and error rate must be the same as the original state.
|
||||
pub fn merge_udf_impl() -> AggregateUDF {
|
||||
create_udaf(
|
||||
UDDSKETCH_MERGE_NAME,
|
||||
vec![DataType::Int64, DataType::Float64, DataType::Binary],
|
||||
Arc::new(DataType::Binary),
|
||||
Volatility::Immutable,
|
||||
Arc::new(|args| {
|
||||
let (bucket_size, error_rate) = downcast_accumulator_args(args)?;
|
||||
Ok(Box::new(UddSketchState::new(bucket_size, error_rate)))
|
||||
}),
|
||||
Arc::new(vec![DataType::Binary]),
|
||||
)
|
||||
}
|
||||
|
||||
fn update(&mut self, value: f64) {
|
||||
self.uddsketch.add_value(value);
|
||||
}
|
||||
|
||||
fn merge(&mut self, raw: &[u8]) {
|
||||
if let Ok(uddsketch) = bincode::deserialize::<UDDSketch>(raw) {
|
||||
if uddsketch.count() != 0 {
|
||||
self.uddsketch.merge_sketch(&uddsketch);
|
||||
fn merge(&mut self, raw: &[u8]) -> DfResult<()> {
|
||||
if let Ok(uddsketch) = bincode::deserialize::<Self>(raw) {
|
||||
if uddsketch.uddsketch.count() != 0 {
|
||||
if self.uddsketch.max_allowed_buckets() != uddsketch.uddsketch.max_allowed_buckets()
|
||||
|| (self.error_rate - uddsketch.error_rate).abs() >= 1e-9
|
||||
{
|
||||
return Err(DataFusionError::Plan(format!(
|
||||
"Merging UDDSketch with different parameters: arguments={:?} vs actual input={:?}",
|
||||
(
|
||||
self.uddsketch.max_allowed_buckets(),
|
||||
self.error_rate
|
||||
),
|
||||
(uddsketch.uddsketch.max_allowed_buckets(), uddsketch.error_rate)
|
||||
)));
|
||||
}
|
||||
self.uddsketch.merge_sketch(&uddsketch.uddsketch);
|
||||
}
|
||||
} else {
|
||||
trace!("Warning: Failed to deserialize UDDSketch from {:?}", raw);
|
||||
return Err(DataFusionError::Plan(
|
||||
"Failed to deserialize UDDSketch from binary".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -113,9 +155,21 @@ fn downcast_accumulator_args(args: AccumulatorArgs) -> DfResult<(u64, f64)> {
|
||||
impl DfAccumulator for UddSketchState {
|
||||
fn update_batch(&mut self, values: &[ArrayRef]) -> DfResult<()> {
|
||||
let array = &values[2]; // the third column is data value
|
||||
let f64_array = as_primitive_array::<Float64Type>(array)?;
|
||||
for v in f64_array.iter().flatten() {
|
||||
self.update(v);
|
||||
match array.data_type() {
|
||||
DataType::Float64 => {
|
||||
let f64_array = as_primitive_array::<Float64Type>(array)?;
|
||||
for v in f64_array.iter().flatten() {
|
||||
self.update(v);
|
||||
}
|
||||
}
|
||||
// meaning instantiate as `uddsketch_merge`
|
||||
DataType::Binary => self.merge_batch(&[array.clone()])?,
|
||||
_ => {
|
||||
return not_impl_err!(
|
||||
"UDDSketch functions do not support data type: {}",
|
||||
array.data_type()
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -123,7 +177,7 @@ impl DfAccumulator for UddSketchState {
|
||||
|
||||
fn evaluate(&mut self) -> DfResult<ScalarValue> {
|
||||
Ok(ScalarValue::Binary(Some(
|
||||
bincode::serialize(&self.uddsketch).map_err(|e| {
|
||||
bincode::serialize(&self).map_err(|e| {
|
||||
DataFusionError::Internal(format!("Failed to serialize UDDSketch: {}", e))
|
||||
})?,
|
||||
)))
|
||||
@@ -150,7 +204,7 @@ impl DfAccumulator for UddSketchState {
|
||||
|
||||
fn state(&mut self) -> DfResult<Vec<ScalarValue>> {
|
||||
Ok(vec![ScalarValue::Binary(Some(
|
||||
bincode::serialize(&self.uddsketch).map_err(|e| {
|
||||
bincode::serialize(&self).map_err(|e| {
|
||||
DataFusionError::Internal(format!("Failed to serialize UDDSketch: {}", e))
|
||||
})?,
|
||||
))])
|
||||
@@ -160,7 +214,7 @@ impl DfAccumulator for UddSketchState {
|
||||
let array = &states[0];
|
||||
let binary_array = as_binary_array(array)?;
|
||||
for v in binary_array.iter().flatten() {
|
||||
self.merge(v);
|
||||
self.merge(v)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -182,8 +236,8 @@ mod tests {
|
||||
|
||||
let result = state.evaluate().unwrap();
|
||||
if let ScalarValue::Binary(Some(bytes)) = result {
|
||||
let deserialized: UDDSketch = bincode::deserialize(&bytes).unwrap();
|
||||
assert_eq!(deserialized.count(), 3);
|
||||
let deserialized: UddSketchState = bincode::deserialize(&bytes).unwrap();
|
||||
assert_eq!(deserialized.uddsketch.count(), 3);
|
||||
} else {
|
||||
panic!("Expected binary scalar value");
|
||||
}
|
||||
@@ -201,13 +255,15 @@ mod tests {
|
||||
// Create new state and merge the serialized data
|
||||
let mut new_state = UddSketchState::new(10, 0.01);
|
||||
if let ScalarValue::Binary(Some(bytes)) = &serialized {
|
||||
new_state.merge(bytes);
|
||||
new_state.merge(bytes).unwrap();
|
||||
|
||||
// Verify the merged state matches original by comparing deserialized values
|
||||
let original_sketch: UDDSketch = bincode::deserialize(bytes).unwrap();
|
||||
let original_sketch: UddSketchState = bincode::deserialize(bytes).unwrap();
|
||||
let original_sketch = original_sketch.uddsketch;
|
||||
let new_result = new_state.evaluate().unwrap();
|
||||
if let ScalarValue::Binary(Some(new_bytes)) = new_result {
|
||||
let new_sketch: UDDSketch = bincode::deserialize(&new_bytes).unwrap();
|
||||
let new_sketch: UddSketchState = bincode::deserialize(&new_bytes).unwrap();
|
||||
let new_sketch = new_sketch.uddsketch;
|
||||
assert_eq!(original_sketch.count(), new_sketch.count());
|
||||
assert_eq!(original_sketch.sum(), new_sketch.sum());
|
||||
assert_eq!(original_sketch.mean(), new_sketch.mean());
|
||||
@@ -244,7 +300,8 @@ mod tests {
|
||||
|
||||
let result = state.evaluate().unwrap();
|
||||
if let ScalarValue::Binary(Some(bytes)) = result {
|
||||
let deserialized: UDDSketch = bincode::deserialize(&bytes).unwrap();
|
||||
let deserialized: UddSketchState = bincode::deserialize(&bytes).unwrap();
|
||||
let deserialized = deserialized.uddsketch;
|
||||
assert_eq!(deserialized.count(), 3);
|
||||
} else {
|
||||
panic!("Expected binary scalar value");
|
||||
@@ -273,7 +330,8 @@ mod tests {
|
||||
|
||||
let result = merged_state.evaluate().unwrap();
|
||||
if let ScalarValue::Binary(Some(bytes)) = result {
|
||||
let deserialized: UDDSketch = bincode::deserialize(&bytes).unwrap();
|
||||
let deserialized: UddSketchState = bincode::deserialize(&bytes).unwrap();
|
||||
let deserialized = deserialized.uddsketch;
|
||||
assert_eq!(deserialized.count(), 2);
|
||||
} else {
|
||||
panic!("Expected binary scalar value");
|
||||
|
||||
Reference in New Issue
Block a user