mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-25 15:40:02 +00:00
Compare commits
1 Commits
basic_with
...
zhongzc/re
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
01081ef97b |
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -2336,6 +2336,7 @@ dependencies = [
|
|||||||
"num-traits",
|
"num-traits",
|
||||||
"once_cell",
|
"once_cell",
|
||||||
"paste",
|
"paste",
|
||||||
|
"promql",
|
||||||
"s2",
|
"s2",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
|
|||||||
@@ -47,6 +47,7 @@ num = "0.4"
|
|||||||
num-traits = "0.2"
|
num-traits = "0.2"
|
||||||
once_cell.workspace = true
|
once_cell.workspace = true
|
||||||
paste.workspace = true
|
paste.workspace = true
|
||||||
|
promql.workspace = true
|
||||||
s2 = { version = "0.0.12", optional = true }
|
s2 = { version = "0.0.12", optional = true }
|
||||||
serde.workspace = true
|
serde.workspace = true
|
||||||
serde_json.workspace = true
|
serde_json.workspace = true
|
||||||
|
|||||||
@@ -39,7 +39,7 @@ use crate::system::SystemFunction;
|
|||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
pub struct FunctionRegistry {
|
pub struct FunctionRegistry {
|
||||||
functions: RwLock<HashMap<String, ScalarFunctionFactory>>,
|
scalar_functions: RwLock<HashMap<String, ScalarFunctionFactory>>,
|
||||||
async_functions: RwLock<HashMap<String, AsyncFunctionRef>>,
|
async_functions: RwLock<HashMap<String, AsyncFunctionRef>>,
|
||||||
aggregate_functions: RwLock<HashMap<String, AggregateUDF>>,
|
aggregate_functions: RwLock<HashMap<String, AggregateUDF>>,
|
||||||
}
|
}
|
||||||
@@ -48,7 +48,7 @@ impl FunctionRegistry {
|
|||||||
pub fn register(&self, func: impl Into<ScalarFunctionFactory>) {
|
pub fn register(&self, func: impl Into<ScalarFunctionFactory>) {
|
||||||
let func = func.into();
|
let func = func.into();
|
||||||
let _ = self
|
let _ = self
|
||||||
.functions
|
.scalar_functions
|
||||||
.write()
|
.write()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.insert(func.name().to_string(), func);
|
.insert(func.name().to_string(), func);
|
||||||
@@ -87,13 +87,17 @@ impl FunctionRegistry {
|
|||||||
.collect()
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
pub fn get_scalar_function(&self, name: &str) -> Option<ScalarFunctionFactory> {
|
||||||
pub fn get_function(&self, name: &str) -> Option<ScalarFunctionFactory> {
|
self.scalar_functions.read().unwrap().get(name).cloned()
|
||||||
self.functions.read().unwrap().get(name).cloned()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn scalar_functions(&self) -> Vec<ScalarFunctionFactory> {
|
pub fn scalar_functions(&self) -> Vec<ScalarFunctionFactory> {
|
||||||
self.functions.read().unwrap().values().cloned().collect()
|
self.scalar_functions
|
||||||
|
.read()
|
||||||
|
.unwrap()
|
||||||
|
.values()
|
||||||
|
.cloned()
|
||||||
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn aggregate_functions(&self) -> Vec<AggregateUDF> {
|
pub fn aggregate_functions(&self) -> Vec<AggregateUDF> {
|
||||||
@@ -144,6 +148,11 @@ pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
|
|||||||
// Approximate functions
|
// Approximate functions
|
||||||
ApproximateFunction::register(&function_registry);
|
ApproximateFunction::register(&function_registry);
|
||||||
|
|
||||||
|
// PromQL aggregate functions
|
||||||
|
for aggr in promql::functions::aggr_funcs() {
|
||||||
|
function_registry.register_aggr(aggr);
|
||||||
|
}
|
||||||
|
|
||||||
Arc::new(function_registry)
|
Arc::new(function_registry)
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -156,10 +165,10 @@ mod tests {
|
|||||||
fn test_function_registry() {
|
fn test_function_registry() {
|
||||||
let registry = FunctionRegistry::default();
|
let registry = FunctionRegistry::default();
|
||||||
|
|
||||||
assert!(registry.get_function("test_and").is_none());
|
assert!(registry.get_scalar_function("test_and").is_none());
|
||||||
assert!(registry.scalar_functions().is_empty());
|
assert!(registry.scalar_functions().is_empty());
|
||||||
registry.register_scalar(TestAndFunction);
|
registry.register_scalar(TestAndFunction);
|
||||||
let _ = registry.get_function("test_and").unwrap();
|
let _ = registry.get_scalar_function("test_and").unwrap();
|
||||||
assert_eq!(1, registry.scalar_functions().len());
|
assert_eq!(1, registry.scalar_functions().len());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -34,6 +34,7 @@ pub use changes::Changes;
|
|||||||
use datafusion::arrow::array::{ArrayRef, Float64Array, TimestampMillisecondArray};
|
use datafusion::arrow::array::{ArrayRef, Float64Array, TimestampMillisecondArray};
|
||||||
use datafusion::error::DataFusionError;
|
use datafusion::error::DataFusionError;
|
||||||
use datafusion::physical_plan::ColumnarValue;
|
use datafusion::physical_plan::ColumnarValue;
|
||||||
|
use datafusion_expr::{AggregateUDF, ScalarUDF};
|
||||||
pub use deriv::Deriv;
|
pub use deriv::Deriv;
|
||||||
pub use extrapolate_rate::{Delta, Increase, Rate};
|
pub use extrapolate_rate::{Delta, Increase, Rate};
|
||||||
pub use holt_winters::HoltWinters;
|
pub use holt_winters::HoltWinters;
|
||||||
@@ -44,6 +45,39 @@ pub use quantile_aggr::{quantile_udaf, QUANTILE_NAME};
|
|||||||
pub use resets::Resets;
|
pub use resets::Resets;
|
||||||
pub use round::Round;
|
pub use round::Round;
|
||||||
|
|
||||||
|
/// Range functions for PromQL.
|
||||||
|
pub fn range_funcs() -> Vec<ScalarUDF> {
|
||||||
|
vec![
|
||||||
|
IDelta::<false>::scalar_udf(),
|
||||||
|
IDelta::<true>::scalar_udf(),
|
||||||
|
Rate::scalar_udf(),
|
||||||
|
Increase::scalar_udf(),
|
||||||
|
Delta::scalar_udf(),
|
||||||
|
Resets::scalar_udf(),
|
||||||
|
Changes::scalar_udf(),
|
||||||
|
Deriv::scalar_udf(),
|
||||||
|
Round::scalar_udf(),
|
||||||
|
AvgOverTime::scalar_udf(),
|
||||||
|
MinOverTime::scalar_udf(),
|
||||||
|
MaxOverTime::scalar_udf(),
|
||||||
|
SumOverTime::scalar_udf(),
|
||||||
|
CountOverTime::scalar_udf(),
|
||||||
|
LastOverTime::scalar_udf(),
|
||||||
|
AbsentOverTime::scalar_udf(),
|
||||||
|
PresentOverTime::scalar_udf(),
|
||||||
|
StddevOverTime::scalar_udf(),
|
||||||
|
StdvarOverTime::scalar_udf(),
|
||||||
|
QuantileOverTime::scalar_udf(),
|
||||||
|
PredictLinear::scalar_udf(),
|
||||||
|
HoltWinters::scalar_udf(),
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Aggregate functions for PromQL.
|
||||||
|
pub fn aggr_funcs() -> Vec<AggregateUDF> {
|
||||||
|
vec![quantile_udaf()]
|
||||||
|
}
|
||||||
|
|
||||||
/// Extracts an array from a `ColumnarValue`.
|
/// Extracts an array from a `ColumnarValue`.
|
||||||
///
|
///
|
||||||
/// If the `ColumnarValue` is a scalar, it converts it to an array of size 1.
|
/// If the `ColumnarValue` is a scalar, it converts it to an array of size 1.
|
||||||
|
|||||||
@@ -40,8 +40,8 @@ pub struct QuantileAccumulator {
|
|||||||
|
|
||||||
/// Create a quantile `AggregateUDF` for PromQL quantile operator,
|
/// Create a quantile `AggregateUDF` for PromQL quantile operator,
|
||||||
/// which calculates φ-quantile (0 ≤ φ ≤ 1) over dimensions
|
/// which calculates φ-quantile (0 ≤ φ ≤ 1) over dimensions
|
||||||
pub fn quantile_udaf() -> Arc<AggregateUDF> {
|
pub fn quantile_udaf() -> AggregateUDF {
|
||||||
Arc::new(create_udaf(
|
create_udaf(
|
||||||
QUANTILE_NAME,
|
QUANTILE_NAME,
|
||||||
// Input type: (φ, values)
|
// Input type: (φ, values)
|
||||||
vec![DataType::Float64, DataType::Float64],
|
vec![DataType::Float64, DataType::Float64],
|
||||||
@@ -63,7 +63,7 @@ pub fn quantile_udaf() -> Arc<AggregateUDF> {
|
|||||||
)]
|
)]
|
||||||
.into(),
|
.into(),
|
||||||
)]),
|
)]),
|
||||||
))
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
impl QuantileAccumulator {
|
impl QuantileAccumulator {
|
||||||
|
|||||||
@@ -1948,7 +1948,7 @@ impl PromPlanner {
|
|||||||
token::T_QUANTILE => {
|
token::T_QUANTILE => {
|
||||||
let q = Self::get_param_value_as_f64(op, param)?;
|
let q = Self::get_param_value_as_f64(op, param)?;
|
||||||
non_col_args.push(lit(q));
|
non_col_args.push(lit(q));
|
||||||
quantile_udaf()
|
Arc::new(quantile_udaf())
|
||||||
}
|
}
|
||||||
token::T_AVG => avg_udaf(),
|
token::T_AVG => avg_udaf(),
|
||||||
token::T_COUNT_VALUES | token::T_COUNT => count_udaf(),
|
token::T_COUNT_VALUES | token::T_COUNT => count_udaf(),
|
||||||
|
|||||||
@@ -28,11 +28,6 @@ use datafusion::execution::{FunctionRegistry, SessionStateBuilder};
|
|||||||
use datafusion::logical_expr::LogicalPlan;
|
use datafusion::logical_expr::LogicalPlan;
|
||||||
use datafusion_expr::UserDefinedLogicalNode;
|
use datafusion_expr::UserDefinedLogicalNode;
|
||||||
use greptime_proto::substrait_extension::MergeScan as PbMergeScan;
|
use greptime_proto::substrait_extension::MergeScan as PbMergeScan;
|
||||||
use promql::functions::{
|
|
||||||
quantile_udaf, AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, IDelta,
|
|
||||||
Increase, LastOverTime, MaxOverTime, MinOverTime, PresentOverTime, Rate, Resets, Round,
|
|
||||||
StddevOverTime, StdvarOverTime, SumOverTime,
|
|
||||||
};
|
|
||||||
use prost::Message;
|
use prost::Message;
|
||||||
use session::context::QueryContextRef;
|
use session::context::QueryContextRef;
|
||||||
use snafu::ResultExt;
|
use snafu::ResultExt;
|
||||||
@@ -117,12 +112,15 @@ impl SubstraitPlanDecoder for DefaultPlanDecoder {
|
|||||||
let mut session_state = SessionStateBuilder::new_from_existing(self.session_state.clone())
|
let mut session_state = SessionStateBuilder::new_from_existing(self.session_state.clone())
|
||||||
.with_catalog_list(catalog_list)
|
.with_catalog_list(catalog_list)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
// Substrait decoder will look up the UDFs in SessionState, so we need to register them
|
// Substrait decoder will look up the UDFs in SessionState, so we need to register them
|
||||||
// Note: the query context must be passed to set the timezone
|
// Note: the query context must be passed to set the timezone
|
||||||
// We MUST register the UDFs after we build the session state, otherwise the UDFs will be lost
|
// We MUST register the UDFs after we build the session state, otherwise the UDFs will be lost
|
||||||
// if they have the same name as the default UDFs or their alias.
|
// if they have the same name as the default UDFs or their alias.
|
||||||
// e.g. The default UDF `to_char()` has an alias `date_format()`, if we register a UDF with the name `date_format()`
|
// e.g. The default UDF `to_char()` has an alias `date_format()`, if we register a UDF with the name `date_format()`
|
||||||
// before we build the session state, the UDF will be lost.
|
// before we build the session state, the UDF will be lost.
|
||||||
|
|
||||||
|
// Scalar functions
|
||||||
for func in FUNCTION_REGISTRY.scalar_functions() {
|
for func in FUNCTION_REGISTRY.scalar_functions() {
|
||||||
let udf = func.provide(FunctionContext {
|
let udf = func.provide(FunctionContext {
|
||||||
query_ctx: self.query_ctx.clone(),
|
query_ctx: self.query_ctx.clone(),
|
||||||
@@ -133,6 +131,15 @@ impl SubstraitPlanDecoder for DefaultPlanDecoder {
|
|||||||
.context(RegisterUdfSnafu { name: func.name() })?;
|
.context(RegisterUdfSnafu { name: func.name() })?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PromQL range functions
|
||||||
|
for func in promql::functions::range_funcs() {
|
||||||
|
let name = func.name().to_string();
|
||||||
|
session_state
|
||||||
|
.register_udf(Arc::new(func))
|
||||||
|
.context(RegisterUdfSnafu { name })?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Aggregate functions
|
||||||
for func in FUNCTION_REGISTRY.aggregate_functions() {
|
for func in FUNCTION_REGISTRY.aggregate_functions() {
|
||||||
let name = func.name().to_string();
|
let name = func.name().to_string();
|
||||||
session_state
|
session_state
|
||||||
@@ -140,29 +147,6 @@ impl SubstraitPlanDecoder for DefaultPlanDecoder {
|
|||||||
.context(RegisterUdfSnafu { name })?;
|
.context(RegisterUdfSnafu { name })?;
|
||||||
}
|
}
|
||||||
|
|
||||||
let _ = session_state.register_udaf(quantile_udaf());
|
|
||||||
|
|
||||||
let _ = session_state.register_udf(Arc::new(IDelta::<false>::scalar_udf()));
|
|
||||||
let _ = session_state.register_udf(Arc::new(IDelta::<true>::scalar_udf()));
|
|
||||||
let _ = session_state.register_udf(Arc::new(Rate::scalar_udf()));
|
|
||||||
let _ = session_state.register_udf(Arc::new(Increase::scalar_udf()));
|
|
||||||
let _ = session_state.register_udf(Arc::new(Delta::scalar_udf()));
|
|
||||||
let _ = session_state.register_udf(Arc::new(Resets::scalar_udf()));
|
|
||||||
let _ = session_state.register_udf(Arc::new(Changes::scalar_udf()));
|
|
||||||
let _ = session_state.register_udf(Arc::new(Deriv::scalar_udf()));
|
|
||||||
let _ = session_state.register_udf(Arc::new(Round::scalar_udf()));
|
|
||||||
let _ = session_state.register_udf(Arc::new(AvgOverTime::scalar_udf()));
|
|
||||||
let _ = session_state.register_udf(Arc::new(MinOverTime::scalar_udf()));
|
|
||||||
let _ = session_state.register_udf(Arc::new(MaxOverTime::scalar_udf()));
|
|
||||||
let _ = session_state.register_udf(Arc::new(SumOverTime::scalar_udf()));
|
|
||||||
let _ = session_state.register_udf(Arc::new(CountOverTime::scalar_udf()));
|
|
||||||
let _ = session_state.register_udf(Arc::new(LastOverTime::scalar_udf()));
|
|
||||||
let _ = session_state.register_udf(Arc::new(AbsentOverTime::scalar_udf()));
|
|
||||||
let _ = session_state.register_udf(Arc::new(PresentOverTime::scalar_udf()));
|
|
||||||
let _ = session_state.register_udf(Arc::new(StddevOverTime::scalar_udf()));
|
|
||||||
let _ = session_state.register_udf(Arc::new(StdvarOverTime::scalar_udf()));
|
|
||||||
// TODO(ruihang): add quantile_over_time, predict_linear, holt_winters, round
|
|
||||||
|
|
||||||
let logical_plan = DFLogicalSubstraitConvertor
|
let logical_plan = DFLogicalSubstraitConvertor
|
||||||
.decode(message, session_state)
|
.decode(message, session_state)
|
||||||
.await
|
.await
|
||||||
|
|||||||
Reference in New Issue
Block a user