mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-26 08:00:01 +00:00
Compare commits
5 Commits
flow/adjus
...
flow/admin
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0c392da638 | ||
|
|
9722482043 | ||
|
|
b5c185ed59 | ||
|
|
9df5f94662 | ||
|
|
838d3ab04e |
@@ -232,7 +232,6 @@
|
||||
| `grpc.bind_addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. |
|
||||
| `grpc.server_addr` | String | `127.0.0.1:4001` | The address advertised to the metasrv, and used for connections from outside the host.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `grpc.bind_addr`. |
|
||||
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
|
||||
| `grpc.flight_compression` | String | `arrow_ipc` | Compression mode for frontend side Arrow IPC service. Available options:<br/>- `none`: disable all compression<br/>- `transport`: only enable gRPC transport compression (zstd)<br/>- `arrow_ipc`: only enable Arrow IPC compression (lz4)<br/>- `all`: enable all compression. |
|
||||
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
|
||||
| `grpc.tls.mode` | String | `disable` | TLS mode. |
|
||||
| `grpc.tls.cert_path` | String | Unset | Certificate file path. |
|
||||
@@ -405,7 +404,6 @@
|
||||
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
|
||||
| `grpc.max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. |
|
||||
| `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. |
|
||||
| `grpc.flight_compression` | String | `arrow_ipc` | Compression mode for datanode side Arrow IPC service. Available options:<br/>- `none`: disable all compression<br/>- `transport`: only enable gRPC transport compression (zstd)<br/>- `arrow_ipc`: only enable Arrow IPC compression (lz4)<br/>- `all`: enable all compression. |
|
||||
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
|
||||
| `grpc.tls.mode` | String | `disable` | TLS mode. |
|
||||
| `grpc.tls.cert_path` | String | Unset | Certificate file path. |
|
||||
|
||||
@@ -44,12 +44,6 @@ runtime_size = 8
|
||||
max_recv_message_size = "512MB"
|
||||
## The maximum send message size for gRPC server.
|
||||
max_send_message_size = "512MB"
|
||||
## Compression mode for datanode side Arrow IPC service. Available options:
|
||||
## - `none`: disable all compression
|
||||
## - `transport`: only enable gRPC transport compression (zstd)
|
||||
## - `arrow_ipc`: only enable Arrow IPC compression (lz4)
|
||||
## - `all`: enable all compression.
|
||||
flight_compression = "arrow_ipc"
|
||||
|
||||
## gRPC server TLS options, see `mysql.tls` section.
|
||||
[grpc.tls]
|
||||
|
||||
@@ -54,12 +54,6 @@ bind_addr = "127.0.0.1:4001"
|
||||
server_addr = "127.0.0.1:4001"
|
||||
## The number of server worker threads.
|
||||
runtime_size = 8
|
||||
## Compression mode for frontend side Arrow IPC service. Available options:
|
||||
## - `none`: disable all compression
|
||||
## - `transport`: only enable gRPC transport compression (zstd)
|
||||
## - `arrow_ipc`: only enable Arrow IPC compression (lz4)
|
||||
## - `all`: enable all compression.
|
||||
flight_compression = "arrow_ipc"
|
||||
|
||||
## gRPC server TLS options, see `mysql.tls` section.
|
||||
[grpc.tls]
|
||||
|
||||
@@ -12,7 +12,11 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
pub mod approximate;
|
||||
#[cfg(feature = "geo")]
|
||||
pub mod geo;
|
||||
pub mod vector;
|
||||
mod geo_path;
|
||||
mod hll;
|
||||
mod uddsketch_state;
|
||||
|
||||
pub use geo_path::{GeoPathAccumulator, GEO_PATH_NAME};
|
||||
pub(crate) use hll::HllStateType;
|
||||
pub use hll::{HllState, HLL_MERGE_NAME, HLL_NAME};
|
||||
pub use uddsketch_state::{UddSketchState, UDDSKETCH_MERGE_NAME, UDDSKETCH_STATE_NAME};
|
||||
@@ -47,7 +47,7 @@ impl GeoPathAccumulator {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
pub fn uadf_impl() -> AggregateUDF {
|
||||
pub fn udf_impl() -> AggregateUDF {
|
||||
create_udaf(
|
||||
GEO_PATH_NAME,
|
||||
// Input types: lat, lng, timestamp
|
||||
@@ -1,32 +0,0 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use crate::function_registry::FunctionRegistry;
|
||||
|
||||
pub(crate) mod hll;
|
||||
mod uddsketch;
|
||||
|
||||
pub(crate) struct ApproximateFunction;
|
||||
|
||||
impl ApproximateFunction {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
// uddsketch
|
||||
registry.register_aggr(uddsketch::UddSketchState::state_udf_impl());
|
||||
registry.register_aggr(uddsketch::UddSketchState::merge_udf_impl());
|
||||
|
||||
// hll
|
||||
registry.register_aggr(hll::HllState::state_udf_impl());
|
||||
registry.register_aggr(hll::HllState::merge_udf_impl());
|
||||
}
|
||||
}
|
||||
@@ -1,27 +0,0 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use crate::function_registry::FunctionRegistry;
|
||||
|
||||
mod encoding;
|
||||
mod geo_path;
|
||||
|
||||
pub(crate) struct GeoFunction;
|
||||
|
||||
impl GeoFunction {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
registry.register_aggr(geo_path::GeoPathAccumulator::uadf_impl());
|
||||
registry.register_aggr(encoding::JsonPathAccumulator::uadf_impl());
|
||||
}
|
||||
}
|
||||
@@ -1,29 +0,0 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use crate::aggrs::vector::product::VectorProduct;
|
||||
use crate::aggrs::vector::sum::VectorSum;
|
||||
use crate::function_registry::FunctionRegistry;
|
||||
|
||||
mod product;
|
||||
mod sum;
|
||||
|
||||
pub(crate) struct VectorFunction;
|
||||
|
||||
impl VectorFunction {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
registry.register_aggr(VectorSum::uadf_impl());
|
||||
registry.register_aggr(VectorProduct::uadf_impl());
|
||||
}
|
||||
}
|
||||
@@ -1,63 +0,0 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use datafusion_expr::ScalarUDF;
|
||||
|
||||
use crate::function::{FunctionContext, FunctionRef};
|
||||
use crate::scalars::udf::create_udf;
|
||||
|
||||
/// A factory for creating `ScalarUDF` that require a function context.
|
||||
#[derive(Clone)]
|
||||
pub struct ScalarFunctionFactory {
|
||||
name: String,
|
||||
factory: Arc<dyn Fn(FunctionContext) -> ScalarUDF + Send + Sync>,
|
||||
}
|
||||
|
||||
impl ScalarFunctionFactory {
|
||||
/// Returns the name of the function.
|
||||
pub fn name(&self) -> &str {
|
||||
&self.name
|
||||
}
|
||||
|
||||
/// Returns a `ScalarUDF` when given a function context.
|
||||
pub fn provide(&self, ctx: FunctionContext) -> ScalarUDF {
|
||||
(self.factory)(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ScalarUDF> for ScalarFunctionFactory {
|
||||
fn from(df_udf: ScalarUDF) -> Self {
|
||||
let name = df_udf.name().to_string();
|
||||
let func = Arc::new(move |_ctx| df_udf.clone());
|
||||
Self {
|
||||
name,
|
||||
factory: func,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<FunctionRef> for ScalarFunctionFactory {
|
||||
fn from(func: FunctionRef) -> Self {
|
||||
let name = func.name().to_string();
|
||||
let func = Arc::new(move |ctx: FunctionContext| {
|
||||
create_udf(func.clone(), ctx.query_ctx, ctx.state)
|
||||
});
|
||||
Self {
|
||||
name,
|
||||
factory: func,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -16,14 +16,11 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use datafusion_expr::AggregateUDF;
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
use crate::admin::AdminFunction;
|
||||
use crate::aggrs::approximate::ApproximateFunction;
|
||||
use crate::aggrs::vector::VectorFunction as VectorAggrFunction;
|
||||
use crate::function::{AsyncFunctionRef, Function, FunctionRef};
|
||||
use crate::function_factory::ScalarFunctionFactory;
|
||||
use crate::function::{AsyncFunctionRef, FunctionRef};
|
||||
use crate::scalars::aggregate::{AggregateFunctionMetaRef, AggregateFunctions};
|
||||
use crate::scalars::date::DateFunction;
|
||||
use crate::scalars::expression::ExpressionFunction;
|
||||
use crate::scalars::hll_count::HllCalcFunction;
|
||||
@@ -34,19 +31,18 @@ use crate::scalars::matches_term::MatchesTermFunction;
|
||||
use crate::scalars::math::MathFunction;
|
||||
use crate::scalars::timestamp::TimestampFunction;
|
||||
use crate::scalars::uddsketch_calc::UddSketchCalcFunction;
|
||||
use crate::scalars::vector::VectorFunction as VectorScalarFunction;
|
||||
use crate::scalars::vector::VectorFunction;
|
||||
use crate::system::SystemFunction;
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct FunctionRegistry {
|
||||
functions: RwLock<HashMap<String, ScalarFunctionFactory>>,
|
||||
functions: RwLock<HashMap<String, FunctionRef>>,
|
||||
async_functions: RwLock<HashMap<String, AsyncFunctionRef>>,
|
||||
aggregate_functions: RwLock<HashMap<String, AggregateUDF>>,
|
||||
aggregate_functions: RwLock<HashMap<String, AggregateFunctionMetaRef>>,
|
||||
}
|
||||
|
||||
impl FunctionRegistry {
|
||||
pub fn register(&self, func: impl Into<ScalarFunctionFactory>) {
|
||||
let func = func.into();
|
||||
pub fn register(&self, func: FunctionRef) {
|
||||
let _ = self
|
||||
.functions
|
||||
.write()
|
||||
@@ -54,10 +50,6 @@ impl FunctionRegistry {
|
||||
.insert(func.name().to_string(), func);
|
||||
}
|
||||
|
||||
pub fn register_scalar(&self, func: impl Function + 'static) {
|
||||
self.register(Arc::new(func) as FunctionRef);
|
||||
}
|
||||
|
||||
pub fn register_async(&self, func: AsyncFunctionRef) {
|
||||
let _ = self
|
||||
.async_functions
|
||||
@@ -66,14 +58,6 @@ impl FunctionRegistry {
|
||||
.insert(func.name().to_string(), func);
|
||||
}
|
||||
|
||||
pub fn register_aggr(&self, func: AggregateUDF) {
|
||||
let _ = self
|
||||
.aggregate_functions
|
||||
.write()
|
||||
.unwrap()
|
||||
.insert(func.name().to_string(), func);
|
||||
}
|
||||
|
||||
pub fn get_async_function(&self, name: &str) -> Option<AsyncFunctionRef> {
|
||||
self.async_functions.read().unwrap().get(name).cloned()
|
||||
}
|
||||
@@ -87,16 +71,27 @@ impl FunctionRegistry {
|
||||
.collect()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn get_function(&self, name: &str) -> Option<ScalarFunctionFactory> {
|
||||
pub fn register_aggregate_function(&self, func: AggregateFunctionMetaRef) {
|
||||
let _ = self
|
||||
.aggregate_functions
|
||||
.write()
|
||||
.unwrap()
|
||||
.insert(func.name(), func);
|
||||
}
|
||||
|
||||
pub fn get_aggr_function(&self, name: &str) -> Option<AggregateFunctionMetaRef> {
|
||||
self.aggregate_functions.read().unwrap().get(name).cloned()
|
||||
}
|
||||
|
||||
pub fn get_function(&self, name: &str) -> Option<FunctionRef> {
|
||||
self.functions.read().unwrap().get(name).cloned()
|
||||
}
|
||||
|
||||
pub fn scalar_functions(&self) -> Vec<ScalarFunctionFactory> {
|
||||
pub fn functions(&self) -> Vec<FunctionRef> {
|
||||
self.functions.read().unwrap().values().cloned().collect()
|
||||
}
|
||||
|
||||
pub fn aggregate_functions(&self) -> Vec<AggregateUDF> {
|
||||
pub fn aggregate_functions(&self) -> Vec<AggregateFunctionMetaRef> {
|
||||
self.aggregate_functions
|
||||
.read()
|
||||
.unwrap()
|
||||
@@ -117,6 +112,9 @@ pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
|
||||
UddSketchCalcFunction::register(&function_registry);
|
||||
HllCalcFunction::register(&function_registry);
|
||||
|
||||
// Aggregate functions
|
||||
AggregateFunctions::register(&function_registry);
|
||||
|
||||
// Full text search function
|
||||
MatchesFunction::register(&function_registry);
|
||||
MatchesTermFunction::register(&function_registry);
|
||||
@@ -129,21 +127,15 @@ pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
|
||||
JsonFunction::register(&function_registry);
|
||||
|
||||
// Vector related functions
|
||||
VectorScalarFunction::register(&function_registry);
|
||||
VectorAggrFunction::register(&function_registry);
|
||||
VectorFunction::register(&function_registry);
|
||||
|
||||
// Geo functions
|
||||
#[cfg(feature = "geo")]
|
||||
crate::scalars::geo::GeoFunctions::register(&function_registry);
|
||||
#[cfg(feature = "geo")]
|
||||
crate::aggrs::geo::GeoFunction::register(&function_registry);
|
||||
|
||||
// Ip functions
|
||||
IpFunctions::register(&function_registry);
|
||||
|
||||
// Approximate functions
|
||||
ApproximateFunction::register(&function_registry);
|
||||
|
||||
Arc::new(function_registry)
|
||||
});
|
||||
|
||||
@@ -155,11 +147,12 @@ mod tests {
|
||||
#[test]
|
||||
fn test_function_registry() {
|
||||
let registry = FunctionRegistry::default();
|
||||
let func = Arc::new(TestAndFunction);
|
||||
|
||||
assert!(registry.get_function("test_and").is_none());
|
||||
assert!(registry.scalar_functions().is_empty());
|
||||
registry.register_scalar(TestAndFunction);
|
||||
assert!(registry.functions().is_empty());
|
||||
registry.register(func);
|
||||
let _ = registry.get_function("test_and").unwrap();
|
||||
assert_eq!(1, registry.scalar_functions().len());
|
||||
assert_eq!(1, registry.functions().len());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,14 +19,13 @@ mod adjust_flow;
|
||||
mod admin;
|
||||
mod flush_flow;
|
||||
mod macros;
|
||||
pub mod scalars;
|
||||
mod system;
|
||||
|
||||
pub mod aggrs;
|
||||
pub mod aggr;
|
||||
pub mod function;
|
||||
pub mod function_factory;
|
||||
pub mod function_registry;
|
||||
pub mod handlers;
|
||||
pub mod helper;
|
||||
pub mod scalars;
|
||||
pub mod state;
|
||||
pub mod utils;
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
pub mod aggregate;
|
||||
pub(crate) mod date;
|
||||
pub mod expression;
|
||||
#[cfg(feature = "geo")]
|
||||
|
||||
89
src/common/function/src/scalars/aggregate.rs
Normal file
89
src/common/function/src/scalars/aggregate.rs
Normal file
@@ -0,0 +1,89 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! # Deprecate Warning:
|
||||
//!
|
||||
//! This module is deprecated and will be removed in the future.
|
||||
//! All UDAF implementation here are not maintained and should
|
||||
//! not be used before they are refactored into the `src/aggr`
|
||||
//! version.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_query::logical_plan::AggregateFunctionCreatorRef;
|
||||
|
||||
use crate::function_registry::FunctionRegistry;
|
||||
use crate::scalars::vector::product::VectorProductCreator;
|
||||
use crate::scalars::vector::sum::VectorSumCreator;
|
||||
|
||||
/// A function creates `AggregateFunctionCreator`.
|
||||
/// "Aggregator" *is* AggregatorFunction. Since the later one is long, we named an short alias for it.
|
||||
/// The two names might be used interchangeably.
|
||||
type AggregatorCreatorFunction = Arc<dyn Fn() -> AggregateFunctionCreatorRef + Send + Sync>;
|
||||
|
||||
/// `AggregateFunctionMeta` dynamically creates AggregateFunctionCreator.
|
||||
#[derive(Clone)]
|
||||
pub struct AggregateFunctionMeta {
|
||||
name: String,
|
||||
args_count: u8,
|
||||
creator: AggregatorCreatorFunction,
|
||||
}
|
||||
|
||||
pub type AggregateFunctionMetaRef = Arc<AggregateFunctionMeta>;
|
||||
|
||||
impl AggregateFunctionMeta {
|
||||
pub fn new(name: &str, args_count: u8, creator: AggregatorCreatorFunction) -> Self {
|
||||
Self {
|
||||
name: name.to_string(),
|
||||
args_count,
|
||||
creator,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn name(&self) -> String {
|
||||
self.name.to_string()
|
||||
}
|
||||
|
||||
pub fn args_count(&self) -> u8 {
|
||||
self.args_count
|
||||
}
|
||||
|
||||
pub fn create(&self) -> AggregateFunctionCreatorRef {
|
||||
(self.creator)()
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct AggregateFunctions;
|
||||
|
||||
impl AggregateFunctions {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
registry.register_aggregate_function(Arc::new(AggregateFunctionMeta::new(
|
||||
"vec_sum",
|
||||
1,
|
||||
Arc::new(|| Arc::new(VectorSumCreator::default())),
|
||||
)));
|
||||
registry.register_aggregate_function(Arc::new(AggregateFunctionMeta::new(
|
||||
"vec_product",
|
||||
1,
|
||||
Arc::new(|| Arc::new(VectorProductCreator::default())),
|
||||
)));
|
||||
|
||||
#[cfg(feature = "geo")]
|
||||
registry.register_aggregate_function(Arc::new(AggregateFunctionMeta::new(
|
||||
"json_encode_path",
|
||||
3,
|
||||
Arc::new(|| Arc::new(super::geo::encoding::JsonPathEncodeFunctionCreator::default())),
|
||||
)));
|
||||
}
|
||||
}
|
||||
@@ -12,6 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
mod date_add;
|
||||
mod date_format;
|
||||
mod date_sub;
|
||||
@@ -26,8 +27,8 @@ pub(crate) struct DateFunction;
|
||||
|
||||
impl DateFunction {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
registry.register_scalar(DateAddFunction);
|
||||
registry.register_scalar(DateSubFunction);
|
||||
registry.register_scalar(DateFormatFunction);
|
||||
registry.register(Arc::new(DateAddFunction));
|
||||
registry.register(Arc::new(DateSubFunction));
|
||||
registry.register(Arc::new(DateFormatFunction));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,6 +17,8 @@ mod ctx;
|
||||
mod is_null;
|
||||
mod unary;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
pub use binary::scalar_binary_op;
|
||||
pub use ctx::EvalContext;
|
||||
pub use unary::scalar_unary_op;
|
||||
@@ -28,6 +30,6 @@ pub(crate) struct ExpressionFunction;
|
||||
|
||||
impl ExpressionFunction {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
registry.register_scalar(IsNullFunction);
|
||||
registry.register(Arc::new(IsNullFunction));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,9 +12,11 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
pub(crate) mod encoding;
|
||||
mod geohash;
|
||||
mod h3;
|
||||
pub(crate) mod helpers;
|
||||
mod helpers;
|
||||
mod measure;
|
||||
mod relation;
|
||||
mod s2;
|
||||
@@ -27,57 +29,57 @@ pub(crate) struct GeoFunctions;
|
||||
impl GeoFunctions {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
// geohash
|
||||
registry.register_scalar(geohash::GeohashFunction);
|
||||
registry.register_scalar(geohash::GeohashNeighboursFunction);
|
||||
registry.register(Arc::new(geohash::GeohashFunction));
|
||||
registry.register(Arc::new(geohash::GeohashNeighboursFunction));
|
||||
|
||||
// h3 index
|
||||
registry.register_scalar(h3::H3LatLngToCell);
|
||||
registry.register_scalar(h3::H3LatLngToCellString);
|
||||
registry.register(Arc::new(h3::H3LatLngToCell));
|
||||
registry.register(Arc::new(h3::H3LatLngToCellString));
|
||||
|
||||
// h3 index inspection
|
||||
registry.register_scalar(h3::H3CellBase);
|
||||
registry.register_scalar(h3::H3CellIsPentagon);
|
||||
registry.register_scalar(h3::H3StringToCell);
|
||||
registry.register_scalar(h3::H3CellToString);
|
||||
registry.register_scalar(h3::H3CellCenterLatLng);
|
||||
registry.register_scalar(h3::H3CellResolution);
|
||||
registry.register(Arc::new(h3::H3CellBase));
|
||||
registry.register(Arc::new(h3::H3CellIsPentagon));
|
||||
registry.register(Arc::new(h3::H3StringToCell));
|
||||
registry.register(Arc::new(h3::H3CellToString));
|
||||
registry.register(Arc::new(h3::H3CellCenterLatLng));
|
||||
registry.register(Arc::new(h3::H3CellResolution));
|
||||
|
||||
// h3 hierarchical grid
|
||||
registry.register_scalar(h3::H3CellCenterChild);
|
||||
registry.register_scalar(h3::H3CellParent);
|
||||
registry.register_scalar(h3::H3CellToChildren);
|
||||
registry.register_scalar(h3::H3CellToChildrenSize);
|
||||
registry.register_scalar(h3::H3CellToChildPos);
|
||||
registry.register_scalar(h3::H3ChildPosToCell);
|
||||
registry.register_scalar(h3::H3CellContains);
|
||||
registry.register(Arc::new(h3::H3CellCenterChild));
|
||||
registry.register(Arc::new(h3::H3CellParent));
|
||||
registry.register(Arc::new(h3::H3CellToChildren));
|
||||
registry.register(Arc::new(h3::H3CellToChildrenSize));
|
||||
registry.register(Arc::new(h3::H3CellToChildPos));
|
||||
registry.register(Arc::new(h3::H3ChildPosToCell));
|
||||
registry.register(Arc::new(h3::H3CellContains));
|
||||
|
||||
// h3 grid traversal
|
||||
registry.register_scalar(h3::H3GridDisk);
|
||||
registry.register_scalar(h3::H3GridDiskDistances);
|
||||
registry.register_scalar(h3::H3GridDistance);
|
||||
registry.register_scalar(h3::H3GridPathCells);
|
||||
registry.register(Arc::new(h3::H3GridDisk));
|
||||
registry.register(Arc::new(h3::H3GridDiskDistances));
|
||||
registry.register(Arc::new(h3::H3GridDistance));
|
||||
registry.register(Arc::new(h3::H3GridPathCells));
|
||||
|
||||
// h3 measurement
|
||||
registry.register_scalar(h3::H3CellDistanceSphereKm);
|
||||
registry.register_scalar(h3::H3CellDistanceEuclideanDegree);
|
||||
registry.register(Arc::new(h3::H3CellDistanceSphereKm));
|
||||
registry.register(Arc::new(h3::H3CellDistanceEuclideanDegree));
|
||||
|
||||
// s2
|
||||
registry.register_scalar(s2::S2LatLngToCell);
|
||||
registry.register_scalar(s2::S2CellLevel);
|
||||
registry.register_scalar(s2::S2CellToToken);
|
||||
registry.register_scalar(s2::S2CellParent);
|
||||
registry.register(Arc::new(s2::S2LatLngToCell));
|
||||
registry.register(Arc::new(s2::S2CellLevel));
|
||||
registry.register(Arc::new(s2::S2CellToToken));
|
||||
registry.register(Arc::new(s2::S2CellParent));
|
||||
|
||||
// spatial data type
|
||||
registry.register_scalar(wkt::LatLngToPointWkt);
|
||||
registry.register(Arc::new(wkt::LatLngToPointWkt));
|
||||
|
||||
// spatial relation
|
||||
registry.register_scalar(relation::STContains);
|
||||
registry.register_scalar(relation::STWithin);
|
||||
registry.register_scalar(relation::STIntersects);
|
||||
registry.register(Arc::new(relation::STContains));
|
||||
registry.register(Arc::new(relation::STWithin));
|
||||
registry.register(Arc::new(relation::STIntersects));
|
||||
|
||||
// spatial measure
|
||||
registry.register_scalar(measure::STDistance);
|
||||
registry.register_scalar(measure::STDistanceSphere);
|
||||
registry.register_scalar(measure::STArea);
|
||||
registry.register(Arc::new(measure::STDistance));
|
||||
registry.register(Arc::new(measure::STDistanceSphere));
|
||||
registry.register(Arc::new(measure::STArea));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,12 +19,9 @@ use common_error::status_code::StatusCode;
|
||||
use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
|
||||
use common_query::error::{self, InvalidInputStateSnafu, Result};
|
||||
use common_query::logical_plan::accumulator::AggrFuncTypeStore;
|
||||
use common_query::logical_plan::{
|
||||
create_aggregate_function, Accumulator, AggregateFunctionCreator,
|
||||
};
|
||||
use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
|
||||
use common_query::prelude::AccumulatorCreatorFunction;
|
||||
use common_time::Timestamp;
|
||||
use datafusion_expr::AggregateUDF;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::value::{ListValue, Value};
|
||||
use datatypes::vectors::VectorRef;
|
||||
@@ -50,16 +47,6 @@ impl JsonPathAccumulator {
|
||||
timestamp_type,
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new `AggregateUDF` for the `json_encode_path` aggregate function.
|
||||
pub fn uadf_impl() -> AggregateUDF {
|
||||
create_aggregate_function(
|
||||
"json_encode_path".to_string(),
|
||||
3,
|
||||
Arc::new(JsonPathEncodeFunctionCreator::default()),
|
||||
)
|
||||
.into()
|
||||
}
|
||||
}
|
||||
|
||||
impl Accumulator for JsonPathAccumulator {
|
||||
@@ -37,7 +37,7 @@ macro_rules! ensure_columns_len {
|
||||
};
|
||||
}
|
||||
|
||||
pub(crate) use ensure_columns_len;
|
||||
pub(super) use ensure_columns_len;
|
||||
|
||||
macro_rules! ensure_columns_n {
|
||||
($columns:ident, $n:literal) => {
|
||||
@@ -58,7 +58,7 @@ macro_rules! ensure_columns_n {
|
||||
};
|
||||
}
|
||||
|
||||
pub(crate) use ensure_columns_n;
|
||||
pub(super) use ensure_columns_n;
|
||||
|
||||
macro_rules! ensure_and_coerce {
|
||||
($compare:expr, $coerce:expr) => {{
|
||||
@@ -72,4 +72,4 @@ macro_rules! ensure_and_coerce {
|
||||
}};
|
||||
}
|
||||
|
||||
pub(crate) use ensure_and_coerce;
|
||||
pub(super) use ensure_and_coerce;
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
use std::fmt;
|
||||
use std::fmt::Display;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_query::error::{DowncastVectorSnafu, InvalidFuncArgsSnafu, Result};
|
||||
use common_query::prelude::{Signature, Volatility};
|
||||
@@ -26,7 +27,7 @@ use datatypes::vectors::{BinaryVector, MutableVector, UInt64VectorBuilder, Vecto
|
||||
use hyperloglogplus::HyperLogLog;
|
||||
use snafu::OptionExt;
|
||||
|
||||
use crate::aggrs::approximate::hll::HllStateType;
|
||||
use crate::aggr::HllStateType;
|
||||
use crate::function::{Function, FunctionContext};
|
||||
use crate::function_registry::FunctionRegistry;
|
||||
|
||||
@@ -43,7 +44,7 @@ pub struct HllCalcFunction;
|
||||
|
||||
impl HllCalcFunction {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
registry.register_scalar(HllCalcFunction);
|
||||
registry.register(Arc::new(HllCalcFunction));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -116,8 +117,6 @@ impl Function for HllCalcFunction {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use datatypes::vectors::BinaryVector;
|
||||
|
||||
use super::*;
|
||||
|
||||
@@ -17,6 +17,8 @@ mod ipv4;
|
||||
mod ipv6;
|
||||
mod range;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use cidr::{Ipv4ToCidr, Ipv6ToCidr};
|
||||
use ipv4::{Ipv4NumToString, Ipv4StringToNum};
|
||||
use ipv6::{Ipv6NumToString, Ipv6StringToNum};
|
||||
@@ -29,15 +31,15 @@ pub(crate) struct IpFunctions;
|
||||
impl IpFunctions {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
// Register IPv4 functions
|
||||
registry.register_scalar(Ipv4NumToString);
|
||||
registry.register_scalar(Ipv4StringToNum);
|
||||
registry.register_scalar(Ipv4ToCidr);
|
||||
registry.register_scalar(Ipv4InRange);
|
||||
registry.register(Arc::new(Ipv4NumToString));
|
||||
registry.register(Arc::new(Ipv4StringToNum));
|
||||
registry.register(Arc::new(Ipv4ToCidr));
|
||||
registry.register(Arc::new(Ipv4InRange));
|
||||
|
||||
// Register IPv6 functions
|
||||
registry.register_scalar(Ipv6NumToString);
|
||||
registry.register_scalar(Ipv6StringToNum);
|
||||
registry.register_scalar(Ipv6ToCidr);
|
||||
registry.register_scalar(Ipv6InRange);
|
||||
registry.register(Arc::new(Ipv6NumToString));
|
||||
registry.register(Arc::new(Ipv6StringToNum));
|
||||
registry.register(Arc::new(Ipv6ToCidr));
|
||||
registry.register(Arc::new(Ipv6InRange));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
pub mod json_get;
|
||||
mod json_is;
|
||||
mod json_path_exists;
|
||||
@@ -32,23 +33,23 @@ pub(crate) struct JsonFunction;
|
||||
|
||||
impl JsonFunction {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
registry.register_scalar(JsonToStringFunction);
|
||||
registry.register_scalar(ParseJsonFunction);
|
||||
registry.register(Arc::new(JsonToStringFunction));
|
||||
registry.register(Arc::new(ParseJsonFunction));
|
||||
|
||||
registry.register_scalar(JsonGetInt);
|
||||
registry.register_scalar(JsonGetFloat);
|
||||
registry.register_scalar(JsonGetString);
|
||||
registry.register_scalar(JsonGetBool);
|
||||
registry.register(Arc::new(JsonGetInt));
|
||||
registry.register(Arc::new(JsonGetFloat));
|
||||
registry.register(Arc::new(JsonGetString));
|
||||
registry.register(Arc::new(JsonGetBool));
|
||||
|
||||
registry.register_scalar(JsonIsNull);
|
||||
registry.register_scalar(JsonIsInt);
|
||||
registry.register_scalar(JsonIsFloat);
|
||||
registry.register_scalar(JsonIsString);
|
||||
registry.register_scalar(JsonIsBool);
|
||||
registry.register_scalar(JsonIsArray);
|
||||
registry.register_scalar(JsonIsObject);
|
||||
registry.register(Arc::new(JsonIsNull));
|
||||
registry.register(Arc::new(JsonIsInt));
|
||||
registry.register(Arc::new(JsonIsFloat));
|
||||
registry.register(Arc::new(JsonIsString));
|
||||
registry.register(Arc::new(JsonIsBool));
|
||||
registry.register(Arc::new(JsonIsArray));
|
||||
registry.register(Arc::new(JsonIsObject));
|
||||
|
||||
registry.register_scalar(json_path_exists::JsonPathExistsFunction);
|
||||
registry.register_scalar(json_path_match::JsonPathMatchFunction);
|
||||
registry.register(Arc::new(json_path_exists::JsonPathExistsFunction));
|
||||
registry.register(Arc::new(json_path_match::JsonPathMatchFunction));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -38,11 +38,11 @@ use crate::function_registry::FunctionRegistry;
|
||||
///
|
||||
/// Usage: matches(`<col>`, `<pattern>`) -> boolean
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct MatchesFunction;
|
||||
pub(crate) struct MatchesFunction;
|
||||
|
||||
impl MatchesFunction {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
registry.register_scalar(MatchesFunction);
|
||||
registry.register(Arc::new(MatchesFunction));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -77,7 +77,7 @@ pub struct MatchesTermFunction;
|
||||
|
||||
impl MatchesTermFunction {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
registry.register_scalar(MatchesTermFunction);
|
||||
registry.register(Arc::new(MatchesTermFunction));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -18,6 +18,7 @@ mod pow;
|
||||
mod rate;
|
||||
|
||||
use std::fmt;
|
||||
use std::sync::Arc;
|
||||
|
||||
pub use clamp::{ClampFunction, ClampMaxFunction, ClampMinFunction};
|
||||
use common_query::error::{GeneralDataFusionSnafu, Result};
|
||||
@@ -38,13 +39,13 @@ pub(crate) struct MathFunction;
|
||||
|
||||
impl MathFunction {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
registry.register_scalar(ModuloFunction);
|
||||
registry.register_scalar(PowFunction);
|
||||
registry.register_scalar(RateFunction);
|
||||
registry.register_scalar(RangeFunction);
|
||||
registry.register_scalar(ClampFunction);
|
||||
registry.register_scalar(ClampMinFunction);
|
||||
registry.register_scalar(ClampMaxFunction);
|
||||
registry.register(Arc::new(ModuloFunction));
|
||||
registry.register(Arc::new(PowFunction));
|
||||
registry.register(Arc::new(RateFunction));
|
||||
registry.register(Arc::new(RangeFunction));
|
||||
registry.register(Arc::new(ClampFunction));
|
||||
registry.register(Arc::new(ClampMinFunction));
|
||||
registry.register(Arc::new(ClampMaxFunction));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
mod to_unixtime;
|
||||
|
||||
use to_unixtime::ToUnixtimeFunction;
|
||||
@@ -22,6 +23,6 @@ pub(crate) struct TimestampFunction;
|
||||
|
||||
impl TimestampFunction {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
registry.register_scalar(ToUnixtimeFunction);
|
||||
registry.register(Arc::new(ToUnixtimeFunction));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
use std::fmt;
|
||||
use std::fmt::Display;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_query::error::{DowncastVectorSnafu, InvalidFuncArgsSnafu, Result};
|
||||
use common_query::prelude::{Signature, Volatility};
|
||||
@@ -43,7 +44,7 @@ pub struct UddSketchCalcFunction;
|
||||
|
||||
impl UddSketchCalcFunction {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
registry.register_scalar(UddSketchCalcFunction);
|
||||
registry.register(Arc::new(UddSketchCalcFunction));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -17,8 +17,10 @@ mod distance;
|
||||
mod elem_product;
|
||||
mod elem_sum;
|
||||
pub mod impl_conv;
|
||||
pub(crate) mod product;
|
||||
mod scalar_add;
|
||||
mod scalar_mul;
|
||||
pub(crate) mod sum;
|
||||
mod vector_add;
|
||||
mod vector_dim;
|
||||
mod vector_div;
|
||||
@@ -28,34 +30,37 @@ mod vector_norm;
|
||||
mod vector_sub;
|
||||
mod vector_subvector;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::function_registry::FunctionRegistry;
|
||||
|
||||
pub(crate) struct VectorFunction;
|
||||
|
||||
impl VectorFunction {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
// conversion
|
||||
registry.register_scalar(convert::ParseVectorFunction);
|
||||
registry.register_scalar(convert::VectorToStringFunction);
|
||||
registry.register(Arc::new(convert::ParseVectorFunction));
|
||||
registry.register(Arc::new(convert::VectorToStringFunction));
|
||||
|
||||
// distance
|
||||
registry.register_scalar(distance::CosDistanceFunction);
|
||||
registry.register_scalar(distance::DotProductFunction);
|
||||
registry.register_scalar(distance::L2SqDistanceFunction);
|
||||
registry.register(Arc::new(distance::CosDistanceFunction));
|
||||
registry.register(Arc::new(distance::DotProductFunction));
|
||||
registry.register(Arc::new(distance::L2SqDistanceFunction));
|
||||
|
||||
// scalar calculation
|
||||
registry.register_scalar(scalar_add::ScalarAddFunction);
|
||||
registry.register_scalar(scalar_mul::ScalarMulFunction);
|
||||
registry.register(Arc::new(scalar_add::ScalarAddFunction));
|
||||
registry.register(Arc::new(scalar_mul::ScalarMulFunction));
|
||||
|
||||
// vector calculation
|
||||
registry.register_scalar(vector_add::VectorAddFunction);
|
||||
registry.register_scalar(vector_sub::VectorSubFunction);
|
||||
registry.register_scalar(vector_mul::VectorMulFunction);
|
||||
registry.register_scalar(vector_div::VectorDivFunction);
|
||||
registry.register_scalar(vector_norm::VectorNormFunction);
|
||||
registry.register_scalar(vector_dim::VectorDimFunction);
|
||||
registry.register_scalar(vector_kth_elem::VectorKthElemFunction);
|
||||
registry.register_scalar(vector_subvector::VectorSubvectorFunction);
|
||||
registry.register_scalar(elem_sum::ElemSumFunction);
|
||||
registry.register_scalar(elem_product::ElemProductFunction);
|
||||
registry.register(Arc::new(vector_add::VectorAddFunction));
|
||||
registry.register(Arc::new(vector_sub::VectorSubFunction));
|
||||
registry.register(Arc::new(vector_mul::VectorMulFunction));
|
||||
registry.register(Arc::new(vector_div::VectorDivFunction));
|
||||
registry.register(Arc::new(vector_norm::VectorNormFunction));
|
||||
registry.register(Arc::new(vector_dim::VectorDimFunction));
|
||||
registry.register(Arc::new(vector_kth_elem::VectorKthElemFunction));
|
||||
registry.register(Arc::new(vector_subvector::VectorSubvectorFunction));
|
||||
registry.register(Arc::new(elem_sum::ElemSumFunction));
|
||||
registry.register(Arc::new(elem_product::ElemProductFunction));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,11 +16,8 @@ use std::sync::Arc;
|
||||
|
||||
use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
|
||||
use common_query::error::{CreateAccumulatorSnafu, Error, InvalidFuncArgsSnafu};
|
||||
use common_query::logical_plan::{
|
||||
create_aggregate_function, Accumulator, AggregateFunctionCreator,
|
||||
};
|
||||
use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
|
||||
use common_query::prelude::AccumulatorCreatorFunction;
|
||||
use datafusion_expr::AggregateUDF;
|
||||
use datatypes::prelude::{ConcreteDataType, Value, *};
|
||||
use datatypes::vectors::VectorRef;
|
||||
use nalgebra::{Const, DVectorView, Dyn, OVector};
|
||||
@@ -78,16 +75,6 @@ impl AggregateFunctionCreator for VectorProductCreator {
|
||||
}
|
||||
|
||||
impl VectorProduct {
|
||||
/// Create a new `AggregateUDF` for the `vec_product` aggregate function.
|
||||
pub fn uadf_impl() -> AggregateUDF {
|
||||
create_aggregate_function(
|
||||
"vec_product".to_string(),
|
||||
1,
|
||||
Arc::new(VectorProductCreator::default()),
|
||||
)
|
||||
.into()
|
||||
}
|
||||
|
||||
fn inner(&mut self, len: usize) -> &mut OVector<f32, Dyn> {
|
||||
self.product.get_or_insert_with(|| {
|
||||
OVector::from_iterator_generic(Dyn(len), Const::<1>, (0..len).map(|_| 1.0))
|
||||
@@ -16,11 +16,8 @@ use std::sync::Arc;
|
||||
|
||||
use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
|
||||
use common_query::error::{CreateAccumulatorSnafu, Error, InvalidFuncArgsSnafu};
|
||||
use common_query::logical_plan::{
|
||||
create_aggregate_function, Accumulator, AggregateFunctionCreator,
|
||||
};
|
||||
use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
|
||||
use common_query::prelude::AccumulatorCreatorFunction;
|
||||
use datafusion_expr::AggregateUDF;
|
||||
use datatypes::prelude::{ConcreteDataType, Value, *};
|
||||
use datatypes::vectors::VectorRef;
|
||||
use nalgebra::{Const, DVectorView, Dyn, OVector};
|
||||
@@ -28,7 +25,6 @@ use snafu::ensure;
|
||||
|
||||
use crate::scalars::vector::impl_conv::{as_veclit, as_veclit_if_const, veclit_to_binlit};
|
||||
|
||||
/// The accumulator for the `vec_sum` aggregate function.
|
||||
#[derive(Debug, Default)]
|
||||
pub struct VectorSum {
|
||||
sum: Option<OVector<f32, Dyn>>,
|
||||
@@ -78,16 +74,6 @@ impl AggregateFunctionCreator for VectorSumCreator {
|
||||
}
|
||||
|
||||
impl VectorSum {
|
||||
/// Create a new `AggregateUDF` for the `vec_sum` aggregate function.
|
||||
pub fn uadf_impl() -> AggregateUDF {
|
||||
create_aggregate_function(
|
||||
"vec_sum".to_string(),
|
||||
1,
|
||||
Arc::new(VectorSumCreator::default()),
|
||||
)
|
||||
.into()
|
||||
}
|
||||
|
||||
fn inner(&mut self, len: usize) -> &mut OVector<f32, Dyn> {
|
||||
self.sum
|
||||
.get_or_insert_with(|| OVector::zeros_generic(Dyn(len), Const::<1>))
|
||||
@@ -36,13 +36,13 @@ pub(crate) struct SystemFunction;
|
||||
|
||||
impl SystemFunction {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
registry.register_scalar(BuildFunction);
|
||||
registry.register_scalar(VersionFunction);
|
||||
registry.register_scalar(CurrentSchemaFunction);
|
||||
registry.register_scalar(DatabaseFunction);
|
||||
registry.register_scalar(SessionUserFunction);
|
||||
registry.register_scalar(ReadPreferenceFunction);
|
||||
registry.register_scalar(TimezoneFunction);
|
||||
registry.register(Arc::new(BuildFunction));
|
||||
registry.register(Arc::new(VersionFunction));
|
||||
registry.register(Arc::new(CurrentSchemaFunction));
|
||||
registry.register(Arc::new(DatabaseFunction));
|
||||
registry.register(Arc::new(SessionUserFunction));
|
||||
registry.register(Arc::new(ReadPreferenceFunction));
|
||||
registry.register(Arc::new(TimezoneFunction));
|
||||
registry.register_async(Arc::new(ProcedureStateFunction));
|
||||
PGCatalogFunction::register(registry);
|
||||
}
|
||||
|
||||
@@ -16,6 +16,8 @@ mod pg_get_userbyid;
|
||||
mod table_is_visible;
|
||||
mod version;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use pg_get_userbyid::PGGetUserByIdFunction;
|
||||
use table_is_visible::PGTableIsVisibleFunction;
|
||||
use version::PGVersionFunction;
|
||||
@@ -33,8 +35,8 @@ pub(super) struct PGCatalogFunction;
|
||||
|
||||
impl PGCatalogFunction {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
registry.register_scalar(PGTableIsVisibleFunction);
|
||||
registry.register_scalar(PGGetUserByIdFunction);
|
||||
registry.register_scalar(PGVersionFunction);
|
||||
registry.register(Arc::new(PGTableIsVisibleFunction));
|
||||
registry.register(Arc::new(PGGetUserByIdFunction));
|
||||
registry.register(Arc::new(PGVersionFunction));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -64,19 +64,6 @@ impl Default for FlightEncoder {
|
||||
}
|
||||
|
||||
impl FlightEncoder {
|
||||
/// Creates new [FlightEncoder] with compression disabled.
|
||||
pub fn with_compression_disabled() -> Self {
|
||||
let write_options = writer::IpcWriteOptions::default()
|
||||
.try_with_compression(None)
|
||||
.unwrap();
|
||||
|
||||
Self {
|
||||
write_options,
|
||||
data_gen: writer::IpcDataGenerator::default(),
|
||||
dictionary_tracker: writer::DictionaryTracker::new(false),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn encode(&mut self, flight_message: FlightMessage) -> FlightData {
|
||||
match flight_message {
|
||||
FlightMessage::Schema(schema) => SchemaAsIpc::new(&schema, &self.write_options).into(),
|
||||
|
||||
@@ -372,7 +372,6 @@ impl DatanodeBuilder {
|
||||
opts.max_concurrent_queries,
|
||||
//TODO: revaluate the hardcoded timeout on the next version of datanode concurrency limiter.
|
||||
Duration::from_millis(100),
|
||||
opts.grpc.flight_compression,
|
||||
);
|
||||
|
||||
let object_store_manager = Self::build_object_store_manager(&opts.storage).await?;
|
||||
|
||||
@@ -50,7 +50,6 @@ use query::QueryEngineRef;
|
||||
use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult};
|
||||
use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
|
||||
use servers::grpc::region_server::RegionServerHandler;
|
||||
use servers::grpc::FlightCompression;
|
||||
use session::context::{QueryContextBuilder, QueryContextRef};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use store_api::metric_engine_consts::{
|
||||
@@ -81,7 +80,6 @@ use crate::event_listener::RegionServerEventListenerRef;
|
||||
#[derive(Clone)]
|
||||
pub struct RegionServer {
|
||||
inner: Arc<RegionServerInner>,
|
||||
flight_compression: FlightCompression,
|
||||
}
|
||||
|
||||
pub struct RegionStat {
|
||||
@@ -95,7 +93,6 @@ impl RegionServer {
|
||||
query_engine: QueryEngineRef,
|
||||
runtime: Runtime,
|
||||
event_listener: RegionServerEventListenerRef,
|
||||
flight_compression: FlightCompression,
|
||||
) -> Self {
|
||||
Self::with_table_provider(
|
||||
query_engine,
|
||||
@@ -104,7 +101,6 @@ impl RegionServer {
|
||||
Arc::new(DummyTableProviderFactory),
|
||||
0,
|
||||
Duration::from_millis(0),
|
||||
flight_compression,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -115,7 +111,6 @@ impl RegionServer {
|
||||
table_provider_factory: TableProviderFactoryRef,
|
||||
max_concurrent_queries: usize,
|
||||
concurrent_query_limiter_timeout: Duration,
|
||||
flight_compression: FlightCompression,
|
||||
) -> Self {
|
||||
Self {
|
||||
inner: Arc::new(RegionServerInner::new(
|
||||
@@ -128,7 +123,6 @@ impl RegionServer {
|
||||
concurrent_query_limiter_timeout,
|
||||
),
|
||||
)),
|
||||
flight_compression,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -542,11 +536,7 @@ impl FlightCraft for RegionServer {
|
||||
.trace(tracing_context.attach(info_span!("RegionServer::handle_read")))
|
||||
.await?;
|
||||
|
||||
let stream = Box::pin(FlightRecordBatchStream::new(
|
||||
result,
|
||||
tracing_context,
|
||||
self.flight_compression,
|
||||
));
|
||||
let stream = Box::pin(FlightRecordBatchStream::new(result, tracing_context));
|
||||
Ok(Response::new(stream))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,16 +19,16 @@ use std::time::Duration;
|
||||
use api::region::RegionResponse;
|
||||
use async_trait::async_trait;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_function::function_factory::ScalarFunctionFactory;
|
||||
use common_function::function::FunctionRef;
|
||||
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
|
||||
use common_query::Output;
|
||||
use common_runtime::runtime::{BuilderBuild, RuntimeTrait};
|
||||
use common_runtime::Runtime;
|
||||
use datafusion_expr::{AggregateUDF, LogicalPlan};
|
||||
use datafusion_expr::LogicalPlan;
|
||||
use query::dataframe::DataFrame;
|
||||
use query::planner::LogicalPlanner;
|
||||
use query::query_engine::{DescribeResult, QueryEngineState};
|
||||
use query::{QueryEngine, QueryEngineContext};
|
||||
use servers::grpc::FlightCompression;
|
||||
use session::context::QueryContextRef;
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::region_engine::{
|
||||
@@ -76,9 +76,9 @@ impl QueryEngine for MockQueryEngine {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn register_aggregate_function(&self, _func: AggregateUDF) {}
|
||||
fn register_aggregate_function(&self, _func: AggregateFunctionMetaRef) {}
|
||||
|
||||
fn register_scalar_function(&self, _func: ScalarFunctionFactory) {}
|
||||
fn register_function(&self, _func: FunctionRef) {}
|
||||
|
||||
fn read_table(&self, _table: TableRef) -> query::error::Result<DataFrame> {
|
||||
unimplemented!()
|
||||
@@ -98,7 +98,6 @@ pub fn mock_region_server() -> RegionServer {
|
||||
Arc::new(MockQueryEngine),
|
||||
Runtime::builder().build().unwrap(),
|
||||
Box::new(NoopRegionServerEventListener),
|
||||
FlightCompression::default(),
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
@@ -214,60 +214,48 @@ impl DirtyTimeWindows {
|
||||
|
||||
// get the first `window_cnt` time windows
|
||||
let max_time_range = window_size * window_cnt as i32;
|
||||
|
||||
let mut to_be_query = BTreeMap::new();
|
||||
let mut new_windows = self.windows.clone();
|
||||
let mut cur_time_range = chrono::Duration::zero();
|
||||
for (idx, (start, end)) in self.windows.iter().enumerate() {
|
||||
let first_end = start
|
||||
.add_duration(window_size.to_std().unwrap())
|
||||
.context(TimeSnafu)?;
|
||||
let end = end.unwrap_or(first_end);
|
||||
|
||||
// if time range is too long, stop
|
||||
if cur_time_range >= max_time_range {
|
||||
break;
|
||||
}
|
||||
|
||||
// if we have enough time windows, stop
|
||||
if idx >= window_cnt {
|
||||
break;
|
||||
}
|
||||
|
||||
if let Some(x) = end.sub(start) {
|
||||
if cur_time_range + x <= max_time_range {
|
||||
to_be_query.insert(*start, Some(end));
|
||||
new_windows.remove(start);
|
||||
cur_time_range += x;
|
||||
} else {
|
||||
// too large a window, split it
|
||||
// split at window_size * times
|
||||
let surplus = max_time_range - cur_time_range;
|
||||
let times = surplus.num_seconds() / window_size.num_seconds();
|
||||
|
||||
let split_offset = window_size * times as i32;
|
||||
let split_at = start
|
||||
.add_duration(split_offset.to_std().unwrap())
|
||||
.context(TimeSnafu)?;
|
||||
to_be_query.insert(*start, Some(split_at));
|
||||
|
||||
// remove the original window
|
||||
new_windows.remove(start);
|
||||
new_windows.insert(split_at, Some(end));
|
||||
cur_time_range += split_offset;
|
||||
let nth = {
|
||||
let mut cur_time_range = chrono::Duration::zero();
|
||||
let mut nth_key = None;
|
||||
for (idx, (start, end)) in self.windows.iter().enumerate() {
|
||||
// if time range is too long, stop
|
||||
if cur_time_range > max_time_range {
|
||||
nth_key = Some(*start);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.windows = new_windows;
|
||||
// if we have enough time windows, stop
|
||||
if idx >= window_cnt {
|
||||
nth_key = Some(*start);
|
||||
break;
|
||||
}
|
||||
|
||||
if let Some(end) = end {
|
||||
if let Some(x) = end.sub(start) {
|
||||
cur_time_range += x;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
nth_key
|
||||
};
|
||||
let first_nth = {
|
||||
if let Some(nth) = nth {
|
||||
let mut after = self.windows.split_off(&nth);
|
||||
std::mem::swap(&mut self.windows, &mut after);
|
||||
|
||||
after
|
||||
} else {
|
||||
std::mem::take(&mut self.windows)
|
||||
}
|
||||
};
|
||||
|
||||
METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT
|
||||
.with_label_values(&[
|
||||
flow_id.to_string().as_str(),
|
||||
format!("{}", window_size).as_str(),
|
||||
])
|
||||
.observe(to_be_query.len() as f64);
|
||||
.observe(first_nth.len() as f64);
|
||||
|
||||
METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT
|
||||
.with_label_values(&[
|
||||
@@ -276,7 +264,7 @@ impl DirtyTimeWindows {
|
||||
])
|
||||
.observe(self.windows.len() as f64);
|
||||
|
||||
let full_time_range = to_be_query
|
||||
let full_time_range = first_nth
|
||||
.iter()
|
||||
.fold(chrono::Duration::zero(), |acc, (start, end)| {
|
||||
if let Some(end) = end {
|
||||
@@ -294,7 +282,7 @@ impl DirtyTimeWindows {
|
||||
.observe(full_time_range);
|
||||
|
||||
let mut expr_lst = vec![];
|
||||
for (start, end) in to_be_query.into_iter() {
|
||||
for (start, end) in first_nth.into_iter() {
|
||||
// align using time window exprs
|
||||
let (start, end) = if let Some(ctx) = task_ctx {
|
||||
let Some(time_window_expr) = &ctx.config.time_window_expr else {
|
||||
@@ -528,64 +516,6 @@ mod test {
|
||||
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:21' AS TIMESTAMP)))",
|
||||
)
|
||||
),
|
||||
// split range
|
||||
(
|
||||
Vec::from_iter((0..20).map(|i|Timestamp::new_second(i*3)).chain(std::iter::once(
|
||||
Timestamp::new_second(60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1)),
|
||||
))),
|
||||
(chrono::Duration::seconds(3), None),
|
||||
BTreeMap::from([
|
||||
(
|
||||
Timestamp::new_second(0),
|
||||
Some(Timestamp::new_second(
|
||||
60
|
||||
)),
|
||||
),
|
||||
(
|
||||
Timestamp::new_second(60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1)),
|
||||
Some(Timestamp::new_second(
|
||||
60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1) + 3
|
||||
)),
|
||||
)]),
|
||||
Some(
|
||||
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
|
||||
)
|
||||
),
|
||||
// split 2 min into 1 min
|
||||
(
|
||||
Vec::from_iter((0..40).map(|i|Timestamp::new_second(i*3))),
|
||||
(chrono::Duration::seconds(3), None),
|
||||
BTreeMap::from([
|
||||
(
|
||||
Timestamp::new_second(0),
|
||||
Some(Timestamp::new_second(
|
||||
40 * 3
|
||||
)),
|
||||
)]),
|
||||
Some(
|
||||
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
|
||||
)
|
||||
),
|
||||
// split 3s + 1min into 3s + 57s
|
||||
(
|
||||
Vec::from_iter(std::iter::once(Timestamp::new_second(0)).chain((0..40).map(|i|Timestamp::new_second(20+i*3)))),
|
||||
(chrono::Duration::seconds(3), None),
|
||||
BTreeMap::from([
|
||||
(
|
||||
Timestamp::new_second(0),
|
||||
Some(Timestamp::new_second(
|
||||
3
|
||||
)),
|
||||
),(
|
||||
Timestamp::new_second(20),
|
||||
Some(Timestamp::new_second(
|
||||
140
|
||||
)),
|
||||
)]),
|
||||
Some(
|
||||
"(((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:03' AS TIMESTAMP))) OR ((ts >= CAST('1970-01-01 00:00:20' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:17' AS TIMESTAMP))))",
|
||||
)
|
||||
),
|
||||
// expired
|
||||
(
|
||||
vec![
|
||||
@@ -602,8 +532,6 @@ mod test {
|
||||
None
|
||||
),
|
||||
];
|
||||
// let len = testcases.len();
|
||||
// let testcases = testcases[(len - 2)..(len - 1)].to_vec();
|
||||
for (lower_bounds, (window_size, expire_lower_bound), expected, expected_filter_expr) in
|
||||
testcases
|
||||
{
|
||||
|
||||
@@ -17,7 +17,7 @@ use std::collections::BTreeMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_error::ext::BoxedError;
|
||||
use common_function::function::{FunctionContext, FunctionRef};
|
||||
use common_function::function::FunctionContext;
|
||||
use datafusion_substrait::extensions::Extensions;
|
||||
use datatypes::data_type::ConcreteDataType as CDT;
|
||||
use query::QueryEngine;
|
||||
@@ -108,13 +108,9 @@ impl FunctionExtensions {
|
||||
|
||||
/// register flow-specific functions to the query engine
|
||||
pub fn register_function_to_query_engine(engine: &Arc<dyn QueryEngine>) {
|
||||
let tumble_fn = Arc::new(TumbleFunction::new("tumble")) as FunctionRef;
|
||||
let tumble_start_fn = Arc::new(TumbleFunction::new(TUMBLE_START)) as FunctionRef;
|
||||
let tumble_end_fn = Arc::new(TumbleFunction::new(TUMBLE_END)) as FunctionRef;
|
||||
|
||||
engine.register_scalar_function(tumble_fn.into());
|
||||
engine.register_scalar_function(tumble_start_fn.into());
|
||||
engine.register_scalar_function(tumble_end_fn.into());
|
||||
engine.register_function(Arc::new(TumbleFunction::new("tumble")));
|
||||
engine.register_function(Arc::new(TumbleFunction::new(TUMBLE_START)));
|
||||
engine.register_function(Arc::new(TumbleFunction::new(TUMBLE_END)));
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
||||
@@ -154,7 +154,6 @@ where
|
||||
ServerGrpcQueryHandlerAdapter::arc(self.instance.clone()),
|
||||
user_provider.clone(),
|
||||
runtime,
|
||||
opts.grpc.flight_compression,
|
||||
);
|
||||
|
||||
let grpc_server = builder
|
||||
|
||||
@@ -282,15 +282,14 @@ mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::SemanticType;
|
||||
use common_function::function::FunctionRef;
|
||||
use common_function::function_factory::ScalarFunctionFactory;
|
||||
use common_function::scalars::matches::MatchesFunction;
|
||||
use common_function::scalars::matches_term::MatchesTermFunction;
|
||||
use common_function::function_registry::FUNCTION_REGISTRY;
|
||||
use common_function::scalars::udf::create_udf;
|
||||
use datafusion::functions::string::lower;
|
||||
use datafusion_common::Column;
|
||||
use datafusion_expr::expr::ScalarFunction;
|
||||
use datafusion_expr::ScalarUDF;
|
||||
use datatypes::schema::ColumnSchema;
|
||||
use session::context::QueryContext;
|
||||
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
@@ -318,17 +317,19 @@ mod tests {
|
||||
}
|
||||
|
||||
fn matches_func() -> Arc<ScalarUDF> {
|
||||
Arc::new(
|
||||
ScalarFunctionFactory::from(Arc::new(MatchesFunction) as FunctionRef)
|
||||
.provide(Default::default()),
|
||||
)
|
||||
Arc::new(create_udf(
|
||||
FUNCTION_REGISTRY.get_function("matches").unwrap(),
|
||||
QueryContext::arc(),
|
||||
Default::default(),
|
||||
))
|
||||
}
|
||||
|
||||
fn matches_term_func() -> Arc<ScalarUDF> {
|
||||
Arc::new(
|
||||
ScalarFunctionFactory::from(Arc::new(MatchesTermFunction) as FunctionRef)
|
||||
.provide(Default::default()),
|
||||
)
|
||||
Arc::new(create_udf(
|
||||
FUNCTION_REGISTRY.get_function("matches_term").unwrap(),
|
||||
QueryContext::arc(),
|
||||
Default::default(),
|
||||
))
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -25,7 +25,8 @@ use async_trait::async_trait;
|
||||
use common_base::Plugins;
|
||||
use common_catalog::consts::is_readonly_schema;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_function::function_factory::ScalarFunctionFactory;
|
||||
use common_function::function::FunctionRef;
|
||||
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
|
||||
use common_query::{Output, OutputData, OutputMeta};
|
||||
use common_recordbatch::adapter::RecordBatchStreamAdapter;
|
||||
use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream};
|
||||
@@ -34,9 +35,7 @@ use datafusion::physical_plan::analyze::AnalyzeExec;
|
||||
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
|
||||
use datafusion::physical_plan::ExecutionPlan;
|
||||
use datafusion_common::ResolvedTableReference;
|
||||
use datafusion_expr::{
|
||||
AggregateUDF, DmlStatement, LogicalPlan as DfLogicalPlan, LogicalPlan, WriteOp,
|
||||
};
|
||||
use datafusion_expr::{DmlStatement, LogicalPlan as DfLogicalPlan, LogicalPlan, WriteOp};
|
||||
use datatypes::prelude::VectorRef;
|
||||
use datatypes::schema::Schema;
|
||||
use futures_util::StreamExt;
|
||||
@@ -455,14 +454,14 @@ impl QueryEngine for DatafusionQueryEngine {
|
||||
/// `SELECT "my_UDAF"(x)` will look for an aggregate named `"my_UDAF"`
|
||||
///
|
||||
/// So it's better to make UDAF name lowercase when creating one.
|
||||
fn register_aggregate_function(&self, func: AggregateUDF) {
|
||||
self.state.register_aggr_function(func);
|
||||
fn register_aggregate_function(&self, func: AggregateFunctionMetaRef) {
|
||||
self.state.register_aggregate_function(func);
|
||||
}
|
||||
|
||||
/// Register an scalar function.
|
||||
/// Register an UDF function.
|
||||
/// Will override if the function with same name is already registered.
|
||||
fn register_scalar_function(&self, func: ScalarFunctionFactory) {
|
||||
self.state.register_scalar_function(func);
|
||||
fn register_function(&self, func: FunctionRef) {
|
||||
self.state.register_function(func);
|
||||
}
|
||||
|
||||
fn read_table(&self, table: TableRef) -> Result<DataFrame> {
|
||||
|
||||
@@ -18,7 +18,12 @@ use std::sync::Arc;
|
||||
|
||||
use arrow_schema::DataType;
|
||||
use catalog::table_source::DfTableSourceProvider;
|
||||
use common_function::function::FunctionContext;
|
||||
use common_function::aggr::{
|
||||
GeoPathAccumulator, HllState, UddSketchState, GEO_PATH_NAME, HLL_MERGE_NAME, HLL_NAME,
|
||||
UDDSKETCH_MERGE_NAME, UDDSKETCH_STATE_NAME,
|
||||
};
|
||||
use common_function::scalars::udf::create_udf;
|
||||
use common_query::logical_plan::create_aggregate_function;
|
||||
use datafusion::common::TableReference;
|
||||
use datafusion::datasource::cte_worktable::CteWorkTable;
|
||||
use datafusion::datasource::file_format::{format_as_file_type, FileFormatFactory};
|
||||
@@ -146,21 +151,38 @@ impl ContextProvider for DfContextProviderAdapter {
|
||||
}
|
||||
|
||||
fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {
|
||||
self.engine_state.scalar_function(name).map_or_else(
|
||||
self.engine_state.udf_function(name).map_or_else(
|
||||
|| self.session_state.scalar_functions().get(name).cloned(),
|
||||
|func| {
|
||||
Some(Arc::new(func.provide(FunctionContext {
|
||||
query_ctx: self.query_ctx.clone(),
|
||||
state: self.engine_state.function_state(),
|
||||
})))
|
||||
Some(Arc::new(create_udf(
|
||||
func,
|
||||
self.query_ctx.clone(),
|
||||
self.engine_state.function_state(),
|
||||
)))
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
fn get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>> {
|
||||
self.engine_state.aggr_function(name).map_or_else(
|
||||
if name == UDDSKETCH_STATE_NAME {
|
||||
return Some(Arc::new(UddSketchState::state_udf_impl()));
|
||||
} else if name == UDDSKETCH_MERGE_NAME {
|
||||
return Some(Arc::new(UddSketchState::merge_udf_impl()));
|
||||
} else if name == HLL_NAME {
|
||||
return Some(Arc::new(HllState::state_udf_impl()));
|
||||
} else if name == HLL_MERGE_NAME {
|
||||
return Some(Arc::new(HllState::merge_udf_impl()));
|
||||
} else if name == GEO_PATH_NAME {
|
||||
return Some(Arc::new(GeoPathAccumulator::udf_impl()));
|
||||
}
|
||||
|
||||
self.engine_state.aggregate_function(name).map_or_else(
|
||||
|| self.session_state.aggregate_functions().get(name).cloned(),
|
||||
|func| Some(Arc::new(func)),
|
||||
|func| {
|
||||
Some(Arc::new(
|
||||
create_aggregate_function(func.name(), func.args_count(), func.create()).into(),
|
||||
))
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
@@ -191,13 +213,13 @@ impl ContextProvider for DfContextProviderAdapter {
|
||||
}
|
||||
|
||||
fn udf_names(&self) -> Vec<String> {
|
||||
let mut names = self.engine_state.scalar_names();
|
||||
let mut names = self.engine_state.udf_names();
|
||||
names.extend(self.session_state.scalar_functions().keys().cloned());
|
||||
names
|
||||
}
|
||||
|
||||
fn udaf_names(&self) -> Vec<String> {
|
||||
let mut names = self.engine_state.aggr_names();
|
||||
let mut names = self.engine_state.udaf_names();
|
||||
names.extend(self.session_state.aggregate_functions().keys().cloned());
|
||||
names
|
||||
}
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_telemetry::debug;
|
||||
use datafusion::datasource::DefaultTableSource;
|
||||
use datafusion::error::Result as DfResult;
|
||||
use datafusion_common::config::ConfigOptions;
|
||||
@@ -154,6 +155,7 @@ struct PlanRewriter {
|
||||
/// Partition columns of the table in current pass
|
||||
partition_cols: Option<Vec<String>>,
|
||||
column_requirements: HashSet<Column>,
|
||||
expand_on_next_call: bool,
|
||||
}
|
||||
|
||||
impl PlanRewriter {
|
||||
@@ -174,6 +176,10 @@ impl PlanRewriter {
|
||||
{
|
||||
return true;
|
||||
}
|
||||
if self.expand_on_next_call {
|
||||
self.expand_on_next_call = false;
|
||||
return true;
|
||||
}
|
||||
match Categorizer::check_plan(plan, self.partition_cols.clone()) {
|
||||
Commutativity::Commutative => {}
|
||||
Commutativity::PartialCommutative => {
|
||||
@@ -190,12 +196,17 @@ impl PlanRewriter {
|
||||
self.stage.push(plan)
|
||||
}
|
||||
}
|
||||
Commutativity::TransformedCommutative(transformer) => {
|
||||
Commutativity::TransformedCommutative {
|
||||
transformer,
|
||||
expand_on_parent,
|
||||
} => {
|
||||
if let Some(transformer) = transformer
|
||||
&& let Some(plan) = transformer(plan)
|
||||
&& let Some(changed_plan) = transformer(plan)
|
||||
{
|
||||
self.update_column_requirements(&plan);
|
||||
self.stage.push(plan)
|
||||
debug!("PlanRewriter: transformed plan: {changed_plan} from {plan}");
|
||||
self.update_column_requirements(&changed_plan);
|
||||
self.stage.push(changed_plan);
|
||||
self.expand_on_next_call = expand_on_parent;
|
||||
}
|
||||
}
|
||||
Commutativity::NonCommutative
|
||||
@@ -391,10 +402,21 @@ impl TreeNodeRewriter for PlanRewriter {
|
||||
return Ok(Transformed::yes(node));
|
||||
};
|
||||
|
||||
let parent = parent.clone();
|
||||
|
||||
// TODO(ruihang): avoid this clone
|
||||
if self.should_expand(&parent.clone()) {
|
||||
if self.should_expand(&parent) {
|
||||
// TODO(ruihang): does this work for nodes with multiple children?;
|
||||
let node = self.expand(node)?;
|
||||
debug!("PlanRewriter: should expand child:\n {node}\n Of Parent: {parent}");
|
||||
let node = self.expand(node);
|
||||
debug!(
|
||||
"PlanRewriter: expanded plan: {}",
|
||||
match &node {
|
||||
Ok(n) => n.to_string(),
|
||||
Err(e) => format!("Error expanding plan: {e}"),
|
||||
}
|
||||
);
|
||||
let node = node?;
|
||||
self.pop_stack();
|
||||
return Ok(Transformed::yes(node));
|
||||
}
|
||||
|
||||
@@ -15,6 +15,9 @@
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_function::aggr::{HllState, UddSketchState, HLL_NAME, UDDSKETCH_STATE_NAME};
|
||||
use common_telemetry::debug;
|
||||
use datafusion::functions_aggregate::sum::sum_udaf;
|
||||
use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNode};
|
||||
use promql::extension_plan::{
|
||||
EmptyMetric, InstantManipulate, RangeManipulate, SeriesDivide, SeriesNormalize,
|
||||
@@ -23,12 +26,157 @@ use promql::extension_plan::{
|
||||
use crate::dist_plan::merge_sort::{merge_sort_transformer, MergeSortLogicalPlan};
|
||||
use crate::dist_plan::MergeScanLogicalPlan;
|
||||
|
||||
/// generate the upper aggregation plan that will execute on the frontend.
|
||||
pub fn step_aggr_to_upper_aggr(aggr_plan: &LogicalPlan) -> datafusion_common::Result<LogicalPlan> {
|
||||
let LogicalPlan::Aggregate(aggr) = aggr_plan else {
|
||||
return Err(datafusion_common::DataFusionError::Plan(
|
||||
"step_aggr_to_upper_aggr only accepts Aggregate plan".to_string(),
|
||||
));
|
||||
};
|
||||
if !is_all_aggr_exprs_steppable(&aggr.aggr_expr) {
|
||||
return Err(datafusion_common::DataFusionError::NotImplemented(
|
||||
"Some aggregate expressions are not steppable".to_string(),
|
||||
));
|
||||
}
|
||||
let mut upper_aggr_expr = vec![];
|
||||
for aggr_expr in &aggr.aggr_expr {
|
||||
let Some(aggr_func) = get_aggr_func(aggr_expr) else {
|
||||
return Err(datafusion_common::DataFusionError::NotImplemented(
|
||||
"Aggregate function not found".to_string(),
|
||||
));
|
||||
};
|
||||
let col_name = aggr_expr.name_for_alias()?;
|
||||
let input_column =
|
||||
Expr::Column(datafusion_common::Column::new_unqualified(col_name.clone()));
|
||||
let upper_func = match aggr_func.func.name() {
|
||||
"sum" | "min" | "max" | "last_value" | "first_value" => {
|
||||
// aggr_calc(aggr_merge(input_column))) as col_name
|
||||
let mut new_aggr_func = aggr_func.clone();
|
||||
new_aggr_func.args = vec![input_column.clone()];
|
||||
new_aggr_func
|
||||
}
|
||||
"count" => {
|
||||
// sum(input_column) as col_name
|
||||
let mut new_aggr_func = aggr_func.clone();
|
||||
new_aggr_func.func = sum_udaf();
|
||||
new_aggr_func.args = vec![input_column.clone()];
|
||||
new_aggr_func
|
||||
}
|
||||
UDDSKETCH_STATE_NAME => {
|
||||
// udd_merge(bucket_size, error_rate input_column) as col_name
|
||||
let mut new_aggr_func = aggr_func.clone();
|
||||
new_aggr_func.func = Arc::new(UddSketchState::merge_udf_impl());
|
||||
new_aggr_func.args[2] = input_column.clone();
|
||||
new_aggr_func
|
||||
}
|
||||
HLL_NAME => {
|
||||
// hll_merge(input_column) as col_name
|
||||
let mut new_aggr_func = aggr_func.clone();
|
||||
new_aggr_func.func = Arc::new(HllState::merge_udf_impl());
|
||||
new_aggr_func.args = vec![input_column.clone()];
|
||||
new_aggr_func
|
||||
}
|
||||
_ => {
|
||||
return Err(datafusion_common::DataFusionError::NotImplemented(format!(
|
||||
"Aggregate function {} is not supported for Step aggregation",
|
||||
aggr_func.func.name()
|
||||
)))
|
||||
}
|
||||
};
|
||||
|
||||
// deal with nested alias case
|
||||
let mut new_aggr_expr = aggr_expr.clone();
|
||||
{
|
||||
let new_aggr_func = get_aggr_func_mut(&mut new_aggr_expr).unwrap();
|
||||
*new_aggr_func = upper_func;
|
||||
}
|
||||
|
||||
// make the column name the same, so parent can recognize it
|
||||
upper_aggr_expr.push(new_aggr_expr.alias(col_name));
|
||||
}
|
||||
let mut new_aggr = aggr.clone();
|
||||
new_aggr.aggr_expr = upper_aggr_expr;
|
||||
// group by expr also need alias to avoid duplicated computing
|
||||
|
||||
let mut new_group_expr = new_aggr.group_expr.clone();
|
||||
for expr in &mut new_group_expr {
|
||||
if let Expr::Column(_) = expr {
|
||||
// already a column, no need to change
|
||||
continue;
|
||||
}
|
||||
let col_name = expr.name_for_alias()?;
|
||||
let input_column =
|
||||
Expr::Column(datafusion_common::Column::new_unqualified(col_name.clone()));
|
||||
*expr = input_column.alias(col_name);
|
||||
}
|
||||
new_aggr.group_expr = new_group_expr;
|
||||
// return the new logical plan
|
||||
Ok(LogicalPlan::Aggregate(new_aggr))
|
||||
}
|
||||
|
||||
/// Check if the given aggregate expression is steppable.
|
||||
/// As in if it can be split into multiple steps:
|
||||
/// i.e. on datanode first call `state(input)` then
|
||||
/// on frontend call `calc(merge(state))` to get the final result.
|
||||
///
|
||||
pub fn is_all_aggr_exprs_steppable(aggr_exprs: &[Expr]) -> bool {
|
||||
let step_action = HashSet::from([
|
||||
"sum",
|
||||
"count",
|
||||
"min",
|
||||
"max",
|
||||
"first_value",
|
||||
"last_value",
|
||||
UDDSKETCH_STATE_NAME,
|
||||
HLL_NAME,
|
||||
]);
|
||||
aggr_exprs.iter().all(|expr| {
|
||||
if let Some(aggr_func) = get_aggr_func(expr) {
|
||||
if aggr_func.distinct {
|
||||
// Distinct aggregate functions are not steppable(yet).
|
||||
return false;
|
||||
}
|
||||
step_action.contains(aggr_func.func.name())
|
||||
} else {
|
||||
false
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub fn get_aggr_func(expr: &Expr) -> Option<&datafusion_expr::expr::AggregateFunction> {
|
||||
let mut expr_ref = expr;
|
||||
while let Expr::Alias(alias) = expr_ref {
|
||||
expr_ref = &alias.expr;
|
||||
}
|
||||
if let Expr::AggregateFunction(aggr_func) = expr_ref {
|
||||
Some(aggr_func)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_aggr_func_mut(expr: &mut Expr) -> Option<&mut datafusion_expr::expr::AggregateFunction> {
|
||||
let mut expr_ref = expr;
|
||||
while let Expr::Alias(alias) = expr_ref {
|
||||
expr_ref = &mut alias.expr;
|
||||
}
|
||||
if let Expr::AggregateFunction(aggr_func) = expr_ref {
|
||||
Some(aggr_func)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub enum Commutativity {
|
||||
Commutative,
|
||||
PartialCommutative,
|
||||
ConditionalCommutative(Option<Transformer>),
|
||||
TransformedCommutative(Option<Transformer>),
|
||||
TransformedCommutative {
|
||||
transformer: Option<Transformer>,
|
||||
/// whether the transformer changes the child to parent
|
||||
expand_on_parent: bool,
|
||||
},
|
||||
NonCommutative,
|
||||
Unimplemented,
|
||||
/// For unrelated plans like DDL
|
||||
@@ -55,7 +203,18 @@ impl Categorizer {
|
||||
LogicalPlan::Filter(filter) => Self::check_expr(&filter.predicate),
|
||||
LogicalPlan::Window(_) => Commutativity::Unimplemented,
|
||||
LogicalPlan::Aggregate(aggr) => {
|
||||
if !Self::check_partition(&aggr.group_expr, &partition_cols) {
|
||||
let is_all_steppable = is_all_aggr_exprs_steppable(&aggr.aggr_expr);
|
||||
let is_partition = Self::check_partition(&aggr.group_expr, &partition_cols);
|
||||
if !is_partition && is_all_steppable {
|
||||
debug!("Plan is steppable: {plan}");
|
||||
return Commutativity::TransformedCommutative {
|
||||
transformer: Some(Arc::new(|plan: &LogicalPlan| {
|
||||
step_aggr_to_upper_aggr(plan).ok()
|
||||
})),
|
||||
expand_on_parent: true,
|
||||
};
|
||||
}
|
||||
if !is_partition {
|
||||
return Commutativity::NonCommutative;
|
||||
}
|
||||
for expr in &aggr.aggr_expr {
|
||||
|
||||
@@ -22,13 +22,14 @@ use std::sync::Arc;
|
||||
use async_trait::async_trait;
|
||||
use catalog::CatalogManagerRef;
|
||||
use common_base::Plugins;
|
||||
use common_function::function_factory::ScalarFunctionFactory;
|
||||
use common_function::function::FunctionRef;
|
||||
use common_function::function_registry::FUNCTION_REGISTRY;
|
||||
use common_function::handlers::{
|
||||
FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef,
|
||||
};
|
||||
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
|
||||
use common_query::Output;
|
||||
use datafusion_expr::{AggregateUDF, LogicalPlan};
|
||||
use datafusion_expr::LogicalPlan;
|
||||
use datatypes::schema::Schema;
|
||||
pub use default_serializer::{DefaultPlanDecoder, DefaultSerializer};
|
||||
use session::context::QueryContextRef;
|
||||
@@ -78,11 +79,11 @@ pub trait QueryEngine: Send + Sync {
|
||||
///
|
||||
/// # Panics
|
||||
/// Will panic if the function with same name is already registered.
|
||||
fn register_aggregate_function(&self, func: AggregateUDF);
|
||||
fn register_aggregate_function(&self, func: AggregateFunctionMetaRef);
|
||||
|
||||
/// Register a scalar function.
|
||||
/// Register a SQL function.
|
||||
/// Will override if the function with same name is already registered.
|
||||
fn register_scalar_function(&self, func: ScalarFunctionFactory);
|
||||
fn register_function(&self, func: FunctionRef);
|
||||
|
||||
/// Create a DataFrame from a table.
|
||||
fn read_table(&self, table: TableRef) -> Result<DataFrame>;
|
||||
@@ -153,8 +154,8 @@ impl QueryEngineFactory {
|
||||
|
||||
/// Register all functions implemented by GreptimeDB
|
||||
fn register_functions(query_engine: &Arc<DatafusionQueryEngine>) {
|
||||
for func in FUNCTION_REGISTRY.scalar_functions() {
|
||||
query_engine.register_scalar_function(func);
|
||||
for func in FUNCTION_REGISTRY.functions() {
|
||||
query_engine.register_function(func);
|
||||
}
|
||||
|
||||
for accumulator in FUNCTION_REGISTRY.aggregate_functions() {
|
||||
|
||||
@@ -15,8 +15,9 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_error::ext::BoxedError;
|
||||
use common_function::function::FunctionContext;
|
||||
use common_function::aggr::{GeoPathAccumulator, HllState, UddSketchState};
|
||||
use common_function::function_registry::FUNCTION_REGISTRY;
|
||||
use common_function::scalars::udf::create_udf;
|
||||
use common_query::error::RegisterUdfSnafu;
|
||||
use common_query::logical_plan::SubstraitPlanDecoder;
|
||||
use datafusion::catalog::CatalogProviderList;
|
||||
@@ -123,46 +124,43 @@ impl SubstraitPlanDecoder for DefaultPlanDecoder {
|
||||
// if they have the same name as the default UDFs or their alias.
|
||||
// e.g. The default UDF `to_char()` has an alias `date_format()`, if we register a UDF with the name `date_format()`
|
||||
// before we build the session state, the UDF will be lost.
|
||||
for func in FUNCTION_REGISTRY.scalar_functions() {
|
||||
let udf = func.provide(FunctionContext {
|
||||
query_ctx: self.query_ctx.clone(),
|
||||
state: Default::default(),
|
||||
});
|
||||
for func in FUNCTION_REGISTRY.functions() {
|
||||
let udf = Arc::new(create_udf(
|
||||
func.clone(),
|
||||
self.query_ctx.clone(),
|
||||
Default::default(),
|
||||
));
|
||||
session_state
|
||||
.register_udf(Arc::new(udf))
|
||||
.register_udf(udf)
|
||||
.context(RegisterUdfSnafu { name: func.name() })?;
|
||||
let _ = session_state.register_udaf(Arc::new(UddSketchState::state_udf_impl()));
|
||||
let _ = session_state.register_udaf(Arc::new(UddSketchState::merge_udf_impl()));
|
||||
let _ = session_state.register_udaf(Arc::new(HllState::state_udf_impl()));
|
||||
let _ = session_state.register_udaf(Arc::new(HllState::merge_udf_impl()));
|
||||
let _ = session_state.register_udaf(Arc::new(GeoPathAccumulator::udf_impl()));
|
||||
let _ = session_state.register_udaf(quantile_udaf());
|
||||
|
||||
let _ = session_state.register_udf(Arc::new(IDelta::<false>::scalar_udf()));
|
||||
let _ = session_state.register_udf(Arc::new(IDelta::<true>::scalar_udf()));
|
||||
let _ = session_state.register_udf(Arc::new(Rate::scalar_udf()));
|
||||
let _ = session_state.register_udf(Arc::new(Increase::scalar_udf()));
|
||||
let _ = session_state.register_udf(Arc::new(Delta::scalar_udf()));
|
||||
let _ = session_state.register_udf(Arc::new(Resets::scalar_udf()));
|
||||
let _ = session_state.register_udf(Arc::new(Changes::scalar_udf()));
|
||||
let _ = session_state.register_udf(Arc::new(Deriv::scalar_udf()));
|
||||
let _ = session_state.register_udf(Arc::new(Round::scalar_udf()));
|
||||
let _ = session_state.register_udf(Arc::new(AvgOverTime::scalar_udf()));
|
||||
let _ = session_state.register_udf(Arc::new(MinOverTime::scalar_udf()));
|
||||
let _ = session_state.register_udf(Arc::new(MaxOverTime::scalar_udf()));
|
||||
let _ = session_state.register_udf(Arc::new(SumOverTime::scalar_udf()));
|
||||
let _ = session_state.register_udf(Arc::new(CountOverTime::scalar_udf()));
|
||||
let _ = session_state.register_udf(Arc::new(LastOverTime::scalar_udf()));
|
||||
let _ = session_state.register_udf(Arc::new(AbsentOverTime::scalar_udf()));
|
||||
let _ = session_state.register_udf(Arc::new(PresentOverTime::scalar_udf()));
|
||||
let _ = session_state.register_udf(Arc::new(StddevOverTime::scalar_udf()));
|
||||
let _ = session_state.register_udf(Arc::new(StdvarOverTime::scalar_udf()));
|
||||
// TODO(ruihang): add quantile_over_time, predict_linear, holt_winters, round
|
||||
}
|
||||
|
||||
for func in FUNCTION_REGISTRY.aggregate_functions() {
|
||||
let name = func.name().to_string();
|
||||
session_state
|
||||
.register_udaf(Arc::new(func))
|
||||
.context(RegisterUdfSnafu { name })?;
|
||||
}
|
||||
|
||||
let _ = session_state.register_udaf(quantile_udaf());
|
||||
|
||||
let _ = session_state.register_udf(Arc::new(IDelta::<false>::scalar_udf()));
|
||||
let _ = session_state.register_udf(Arc::new(IDelta::<true>::scalar_udf()));
|
||||
let _ = session_state.register_udf(Arc::new(Rate::scalar_udf()));
|
||||
let _ = session_state.register_udf(Arc::new(Increase::scalar_udf()));
|
||||
let _ = session_state.register_udf(Arc::new(Delta::scalar_udf()));
|
||||
let _ = session_state.register_udf(Arc::new(Resets::scalar_udf()));
|
||||
let _ = session_state.register_udf(Arc::new(Changes::scalar_udf()));
|
||||
let _ = session_state.register_udf(Arc::new(Deriv::scalar_udf()));
|
||||
let _ = session_state.register_udf(Arc::new(Round::scalar_udf()));
|
||||
let _ = session_state.register_udf(Arc::new(AvgOverTime::scalar_udf()));
|
||||
let _ = session_state.register_udf(Arc::new(MinOverTime::scalar_udf()));
|
||||
let _ = session_state.register_udf(Arc::new(MaxOverTime::scalar_udf()));
|
||||
let _ = session_state.register_udf(Arc::new(SumOverTime::scalar_udf()));
|
||||
let _ = session_state.register_udf(Arc::new(CountOverTime::scalar_udf()));
|
||||
let _ = session_state.register_udf(Arc::new(LastOverTime::scalar_udf()));
|
||||
let _ = session_state.register_udf(Arc::new(AbsentOverTime::scalar_udf()));
|
||||
let _ = session_state.register_udf(Arc::new(PresentOverTime::scalar_udf()));
|
||||
let _ = session_state.register_udf(Arc::new(StddevOverTime::scalar_udf()));
|
||||
let _ = session_state.register_udf(Arc::new(StdvarOverTime::scalar_udf()));
|
||||
// TODO(ruihang): add quantile_over_time, predict_linear, holt_winters, round
|
||||
|
||||
let logical_plan = DFLogicalSubstraitConvertor
|
||||
.decode(message, session_state)
|
||||
.await
|
||||
|
||||
@@ -19,10 +19,11 @@ use std::sync::{Arc, RwLock};
|
||||
use async_trait::async_trait;
|
||||
use catalog::CatalogManagerRef;
|
||||
use common_base::Plugins;
|
||||
use common_function::function_factory::ScalarFunctionFactory;
|
||||
use common_function::function::FunctionRef;
|
||||
use common_function::handlers::{
|
||||
FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef,
|
||||
};
|
||||
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
|
||||
use common_function::state::FunctionState;
|
||||
use common_telemetry::warn;
|
||||
use datafusion::dataframe::DataFrame;
|
||||
@@ -36,7 +37,7 @@ use datafusion::physical_optimizer::sanity_checker::SanityCheckPlan;
|
||||
use datafusion::physical_optimizer::PhysicalOptimizerRule;
|
||||
use datafusion::physical_plan::ExecutionPlan;
|
||||
use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner};
|
||||
use datafusion_expr::{AggregateUDF, LogicalPlan as DfLogicalPlan};
|
||||
use datafusion_expr::LogicalPlan as DfLogicalPlan;
|
||||
use datafusion_optimizer::analyzer::count_wildcard_rule::CountWildcardRule;
|
||||
use datafusion_optimizer::analyzer::{Analyzer, AnalyzerRule};
|
||||
use datafusion_optimizer::optimizer::Optimizer;
|
||||
@@ -69,8 +70,8 @@ pub struct QueryEngineState {
|
||||
df_context: SessionContext,
|
||||
catalog_manager: CatalogManagerRef,
|
||||
function_state: Arc<FunctionState>,
|
||||
scalar_functions: Arc<RwLock<HashMap<String, ScalarFunctionFactory>>>,
|
||||
aggr_functions: Arc<RwLock<HashMap<String, AggregateUDF>>>,
|
||||
udf_functions: Arc<RwLock<HashMap<String, FunctionRef>>>,
|
||||
aggregate_functions: Arc<RwLock<HashMap<String, AggregateFunctionMetaRef>>>,
|
||||
extension_rules: Vec<Arc<dyn ExtensionAnalyzerRule + Send + Sync>>,
|
||||
plugins: Plugins,
|
||||
}
|
||||
@@ -185,10 +186,10 @@ impl QueryEngineState {
|
||||
procedure_service_handler,
|
||||
flow_service_handler,
|
||||
}),
|
||||
aggr_functions: Arc::new(RwLock::new(HashMap::new())),
|
||||
aggregate_functions: Arc::new(RwLock::new(HashMap::new())),
|
||||
extension_rules,
|
||||
plugins,
|
||||
scalar_functions: Arc::new(RwLock::new(HashMap::new())),
|
||||
udf_functions: Arc::new(RwLock::new(HashMap::new())),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -221,28 +222,38 @@ impl QueryEngineState {
|
||||
self.session_state().optimize(&plan)
|
||||
}
|
||||
|
||||
/// Retrieve the scalar function by name
|
||||
pub fn scalar_function(&self, function_name: &str) -> Option<ScalarFunctionFactory> {
|
||||
self.scalar_functions
|
||||
/// Register an udf function.
|
||||
/// Will override if the function with same name is already registered.
|
||||
pub fn register_function(&self, func: FunctionRef) {
|
||||
let name = func.name().to_string();
|
||||
let x = self
|
||||
.udf_functions
|
||||
.write()
|
||||
.unwrap()
|
||||
.insert(name.clone(), func);
|
||||
|
||||
if x.is_some() {
|
||||
warn!("Already registered udf function '{name}'");
|
||||
}
|
||||
}
|
||||
|
||||
/// Retrieve the udf function by name
|
||||
pub fn udf_function(&self, function_name: &str) -> Option<FunctionRef> {
|
||||
self.udf_functions
|
||||
.read()
|
||||
.unwrap()
|
||||
.get(function_name)
|
||||
.cloned()
|
||||
}
|
||||
|
||||
/// Retrieve scalar function names.
|
||||
pub fn scalar_names(&self) -> Vec<String> {
|
||||
self.scalar_functions
|
||||
.read()
|
||||
.unwrap()
|
||||
.keys()
|
||||
.cloned()
|
||||
.collect()
|
||||
/// Retrieve udf function names.
|
||||
pub fn udf_names(&self) -> Vec<String> {
|
||||
self.udf_functions.read().unwrap().keys().cloned().collect()
|
||||
}
|
||||
|
||||
/// Retrieve the aggregate function by name
|
||||
pub fn aggr_function(&self, function_name: &str) -> Option<AggregateUDF> {
|
||||
self.aggr_functions
|
||||
pub fn aggregate_function(&self, function_name: &str) -> Option<AggregateFunctionMetaRef> {
|
||||
self.aggregate_functions
|
||||
.read()
|
||||
.unwrap()
|
||||
.get(function_name)
|
||||
@@ -250,8 +261,8 @@ impl QueryEngineState {
|
||||
}
|
||||
|
||||
/// Retrieve aggregate function names.
|
||||
pub fn aggr_names(&self) -> Vec<String> {
|
||||
self.aggr_functions
|
||||
pub fn udaf_names(&self) -> Vec<String> {
|
||||
self.aggregate_functions
|
||||
.read()
|
||||
.unwrap()
|
||||
.keys()
|
||||
@@ -259,21 +270,6 @@ impl QueryEngineState {
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Register an scalar function.
|
||||
/// Will override if the function with same name is already registered.
|
||||
pub fn register_scalar_function(&self, func: ScalarFunctionFactory) {
|
||||
let name = func.name().to_string();
|
||||
let x = self
|
||||
.scalar_functions
|
||||
.write()
|
||||
.unwrap()
|
||||
.insert(name.clone(), func);
|
||||
|
||||
if x.is_some() {
|
||||
warn!("Already registered scalar function '{name}'");
|
||||
}
|
||||
}
|
||||
|
||||
/// Register an aggregate function.
|
||||
///
|
||||
/// # Panics
|
||||
@@ -282,10 +278,10 @@ impl QueryEngineState {
|
||||
/// Panicking consideration: currently the aggregated functions are all statically registered,
|
||||
/// user cannot define their own aggregate functions on the fly. So we can panic here. If that
|
||||
/// invariant is broken in the future, we should return an error instead of panicking.
|
||||
pub fn register_aggr_function(&self, func: AggregateUDF) {
|
||||
let name = func.name().to_string();
|
||||
pub fn register_aggregate_function(&self, func: AggregateFunctionMetaRef) {
|
||||
let name = func.name();
|
||||
let x = self
|
||||
.aggr_functions
|
||||
.aggregate_functions
|
||||
.write()
|
||||
.unwrap()
|
||||
.insert(name.clone(), func);
|
||||
|
||||
@@ -16,12 +16,11 @@ use std::fmt::Debug;
|
||||
use std::marker::PhantomData;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_function::scalars::aggregate::AggregateFunctionMeta;
|
||||
use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
|
||||
use common_query::error::{CreateAccumulatorSnafu, Result as QueryResult};
|
||||
use common_query::logical_plan::accumulator::AggrFuncTypeStore;
|
||||
use common_query::logical_plan::{
|
||||
create_aggregate_function, Accumulator, AggregateFunctionCreator,
|
||||
};
|
||||
use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
|
||||
use common_query::prelude::*;
|
||||
use common_recordbatch::{RecordBatch, RecordBatches};
|
||||
use datatypes::prelude::*;
|
||||
@@ -208,14 +207,11 @@ where
|
||||
|
||||
let engine = new_query_engine_with_table(testing_table);
|
||||
|
||||
engine.register_aggregate_function(
|
||||
create_aggregate_function(
|
||||
"my_sum".to_string(),
|
||||
1,
|
||||
Arc::new(MySumAccumulatorCreator::default()),
|
||||
)
|
||||
.into(),
|
||||
);
|
||||
engine.register_aggregate_function(Arc::new(AggregateFunctionMeta::new(
|
||||
"my_sum",
|
||||
1,
|
||||
Arc::new(|| Arc::new(MySumAccumulatorCreator::default())),
|
||||
)));
|
||||
|
||||
let sql = format!("select MY_SUM({column_name}) as my_sum from {table_name}");
|
||||
let batches = exec_selection(engine, &sql).await;
|
||||
|
||||
@@ -66,8 +66,6 @@ pub struct GrpcOptions {
|
||||
pub max_recv_message_size: ReadableSize,
|
||||
/// Max gRPC sending(encoding) message size
|
||||
pub max_send_message_size: ReadableSize,
|
||||
/// Compression mode in Arrow Flight service.
|
||||
pub flight_compression: FlightCompression,
|
||||
pub runtime_size: usize,
|
||||
#[serde(default = "Default::default")]
|
||||
pub tls: TlsOption,
|
||||
@@ -116,7 +114,6 @@ impl Default for GrpcOptions {
|
||||
server_addr: String::new(),
|
||||
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
|
||||
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
|
||||
flight_compression: FlightCompression::ArrowIpc,
|
||||
runtime_size: 8,
|
||||
tls: TlsOption::default(),
|
||||
}
|
||||
@@ -135,30 +132,6 @@ impl GrpcOptions {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Default)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum FlightCompression {
|
||||
/// Disable all compression in Arrow Flight service.
|
||||
None,
|
||||
/// Enable only transport layer compression (zstd).
|
||||
Transport,
|
||||
/// Enable only payload compression (lz4)
|
||||
#[default]
|
||||
ArrowIpc,
|
||||
/// Enable all compression.
|
||||
All,
|
||||
}
|
||||
|
||||
impl FlightCompression {
|
||||
pub fn transport_compression(&self) -> bool {
|
||||
self == &FlightCompression::Transport || self == &FlightCompression::All
|
||||
}
|
||||
|
||||
pub fn arrow_compression(&self) -> bool {
|
||||
self == &FlightCompression::ArrowIpc || self == &FlightCompression::All
|
||||
}
|
||||
}
|
||||
|
||||
pub struct GrpcServer {
|
||||
// states
|
||||
shutdown_tx: Mutex<Option<Sender<()>>>,
|
||||
|
||||
@@ -45,7 +45,7 @@ use tonic::{Request, Response, Status, Streaming};
|
||||
use crate::error::{InvalidParameterSnafu, ParseJsonSnafu, Result, ToJsonSnafu};
|
||||
pub use crate::grpc::flight::stream::FlightRecordBatchStream;
|
||||
use crate::grpc::greptime_handler::{get_request_type, GreptimeRequestHandler};
|
||||
use crate::grpc::{FlightCompression, TonicResult};
|
||||
use crate::grpc::TonicResult;
|
||||
use crate::http::header::constants::GREPTIME_DB_HEADER_NAME;
|
||||
use crate::http::AUTHORIZATION_HEADER;
|
||||
use crate::{error, hint_headers};
|
||||
@@ -195,14 +195,9 @@ impl FlightCraft for GreptimeRequestHandler {
|
||||
protocol = "grpc",
|
||||
request_type = get_request_type(&request)
|
||||
);
|
||||
let flight_compression = self.flight_compression;
|
||||
async {
|
||||
let output = self.handle_request(request, hints).await?;
|
||||
let stream = to_flight_data_stream(
|
||||
output,
|
||||
TracingContext::from_current_span(),
|
||||
flight_compression,
|
||||
);
|
||||
let stream = to_flight_data_stream(output, TracingContext::from_current_span());
|
||||
Ok(Response::new(stream))
|
||||
}
|
||||
.trace(span)
|
||||
@@ -370,16 +365,14 @@ impl Stream for PutRecordBatchRequestStream {
|
||||
fn to_flight_data_stream(
|
||||
output: Output,
|
||||
tracing_context: TracingContext,
|
||||
flight_compression: FlightCompression,
|
||||
) -> TonicStream<FlightData> {
|
||||
match output.data {
|
||||
OutputData::Stream(stream) => {
|
||||
let stream = FlightRecordBatchStream::new(stream, tracing_context, flight_compression);
|
||||
let stream = FlightRecordBatchStream::new(stream, tracing_context);
|
||||
Box::pin(stream) as _
|
||||
}
|
||||
OutputData::RecordBatches(x) => {
|
||||
let stream =
|
||||
FlightRecordBatchStream::new(x.as_stream(), tracing_context, flight_compression);
|
||||
let stream = FlightRecordBatchStream::new(x.as_stream(), tracing_context);
|
||||
Box::pin(stream) as _
|
||||
}
|
||||
OutputData::AffectedRows(rows) => {
|
||||
|
||||
@@ -30,7 +30,6 @@ use tokio::task::JoinHandle;
|
||||
|
||||
use crate::error;
|
||||
use crate::grpc::flight::TonicResult;
|
||||
use crate::grpc::FlightCompression;
|
||||
|
||||
#[pin_project(PinnedDrop)]
|
||||
pub struct FlightRecordBatchStream {
|
||||
@@ -42,27 +41,18 @@ pub struct FlightRecordBatchStream {
|
||||
}
|
||||
|
||||
impl FlightRecordBatchStream {
|
||||
pub fn new(
|
||||
recordbatches: SendableRecordBatchStream,
|
||||
tracing_context: TracingContext,
|
||||
compression: FlightCompression,
|
||||
) -> Self {
|
||||
pub fn new(recordbatches: SendableRecordBatchStream, tracing_context: TracingContext) -> Self {
|
||||
let (tx, rx) = mpsc::channel::<TonicResult<FlightMessage>>(1);
|
||||
let join_handle = common_runtime::spawn_global(async move {
|
||||
Self::flight_data_stream(recordbatches, tx)
|
||||
.trace(tracing_context.attach(info_span!("flight_data_stream")))
|
||||
.await
|
||||
});
|
||||
let encoder = if compression.arrow_compression() {
|
||||
FlightEncoder::default()
|
||||
} else {
|
||||
FlightEncoder::with_compression_disabled()
|
||||
};
|
||||
Self {
|
||||
rx,
|
||||
join_handle,
|
||||
done: false,
|
||||
encoder,
|
||||
encoder: FlightEncoder::default(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -171,11 +161,7 @@ mod test {
|
||||
let recordbatches = RecordBatches::try_new(schema.clone(), vec![recordbatch.clone()])
|
||||
.unwrap()
|
||||
.as_stream();
|
||||
let mut stream = FlightRecordBatchStream::new(
|
||||
recordbatches,
|
||||
TracingContext::default(),
|
||||
FlightCompression::default(),
|
||||
);
|
||||
let mut stream = FlightRecordBatchStream::new(recordbatches, TracingContext::default());
|
||||
|
||||
let mut raw_data = Vec::with_capacity(2);
|
||||
raw_data.push(stream.next().await.unwrap().unwrap());
|
||||
|
||||
@@ -49,7 +49,7 @@ use crate::error::{
|
||||
JoinTaskSnafu, NotFoundAuthHeaderSnafu, Result, UnknownHintSnafu,
|
||||
};
|
||||
use crate::grpc::flight::{PutRecordBatchRequest, PutRecordBatchRequestStream};
|
||||
use crate::grpc::{FlightCompression, TonicResult};
|
||||
use crate::grpc::TonicResult;
|
||||
use crate::metrics;
|
||||
use crate::metrics::{METRIC_AUTH_FAILURE, METRIC_SERVER_GRPC_DB_REQUEST_TIMER};
|
||||
use crate::query_handler::grpc::ServerGrpcQueryHandlerRef;
|
||||
@@ -59,7 +59,6 @@ pub struct GreptimeRequestHandler {
|
||||
handler: ServerGrpcQueryHandlerRef,
|
||||
user_provider: Option<UserProviderRef>,
|
||||
runtime: Option<Runtime>,
|
||||
pub(crate) flight_compression: FlightCompression,
|
||||
}
|
||||
|
||||
impl GreptimeRequestHandler {
|
||||
@@ -67,13 +66,11 @@ impl GreptimeRequestHandler {
|
||||
handler: ServerGrpcQueryHandlerRef,
|
||||
user_provider: Option<UserProviderRef>,
|
||||
runtime: Option<Runtime>,
|
||||
flight_compression: FlightCompression,
|
||||
) -> Self {
|
||||
Self {
|
||||
handler,
|
||||
user_provider,
|
||||
runtime,
|
||||
flight_compression,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -34,7 +34,7 @@ mod test {
|
||||
use itertools::Itertools;
|
||||
use servers::grpc::builder::GrpcServerBuilder;
|
||||
use servers::grpc::greptime_handler::GreptimeRequestHandler;
|
||||
use servers::grpc::{FlightCompression, GrpcServerConfig};
|
||||
use servers::grpc::GrpcServerConfig;
|
||||
use servers::query_handler::grpc::ServerGrpcQueryHandlerAdapter;
|
||||
use servers::server::Server;
|
||||
|
||||
@@ -94,7 +94,6 @@ mod test {
|
||||
)
|
||||
.ok(),
|
||||
Some(runtime.clone()),
|
||||
FlightCompression::default(),
|
||||
);
|
||||
let mut grpc_server = GrpcServerBuilder::new(GrpcServerConfig::default(), runtime)
|
||||
.flight_handler(Arc::new(greptime_request_handler))
|
||||
|
||||
@@ -42,7 +42,7 @@ use object_store::test_util::TempFolder;
|
||||
use object_store::ObjectStore;
|
||||
use servers::grpc::builder::GrpcServerBuilder;
|
||||
use servers::grpc::greptime_handler::GreptimeRequestHandler;
|
||||
use servers::grpc::{FlightCompression, GrpcOptions, GrpcServer, GrpcServerConfig};
|
||||
use servers::grpc::{GrpcOptions, GrpcServer, GrpcServerConfig};
|
||||
use servers::http::{HttpOptions, HttpServerBuilder, PromValidationMode};
|
||||
use servers::metrics_handler::MetricsHandler;
|
||||
use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
|
||||
@@ -585,7 +585,6 @@ pub async fn setup_grpc_server_with(
|
||||
ServerGrpcQueryHandlerAdapter::arc(fe_instance_ref.clone()),
|
||||
user_provider.clone(),
|
||||
Some(runtime.clone()),
|
||||
FlightCompression::default(),
|
||||
);
|
||||
|
||||
let flight_handler = Arc::new(greptime_request_handler.clone());
|
||||
|
||||
@@ -1025,7 +1025,6 @@ bind_addr = "127.0.0.1:4001"
|
||||
server_addr = "127.0.0.1:4001"
|
||||
max_recv_message_size = "512MiB"
|
||||
max_send_message_size = "512MiB"
|
||||
flight_compression = "arrow_ipc"
|
||||
runtime_size = 8
|
||||
|
||||
[grpc.tls]
|
||||
|
||||
409
tests/cases/distributed/explain/step_aggr.result
Normal file
409
tests/cases/distributed/explain/step_aggr.result
Normal file
@@ -0,0 +1,409 @@
|
||||
CREATE TABLE integers(
|
||||
host STRING,
|
||||
i BIGINT,
|
||||
ts TIMESTAMP TIME INDEX
|
||||
) PARTITION ON COLUMNS (host) (
|
||||
host < '550-A',
|
||||
host >= '550-A'
|
||||
AND host < '550-W',
|
||||
host >= '550-W'
|
||||
);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
INSERT INTO integers (host, i, ts) VALUES
|
||||
('550-A', 1, '2023-01-01 00:00:00'),
|
||||
('550-A', 2, '2023-01-01 01:00:00'),
|
||||
('550-W', 3, '2023-01-01 02:00:00'),
|
||||
('550-W', 4, '2023-01-01 03:00:00');
|
||||
|
||||
Affected Rows: 4
|
||||
|
||||
-- count
|
||||
EXPLAIN SELECT
|
||||
count(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[]], aggr=[[sum(count(integers.i)) AS count(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[count(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[count(integers.i)] |
|
||||
| | CoalescePartitionsExec |
|
||||
| | AggregateExec: mode=Partial, gby=[], aggr=[count(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
count(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[sum(count(integers.i)) AS count(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[count(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
count(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Projection: count(integers.i) |
|
||||
| | Aggregate: groupBy=[[integers.ts]], aggr=[[sum(count(integers.i)) AS count(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[count(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | ProjectionExec: expr=[count(integers.i)@1 as count(integers.i)] |
|
||||
| | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
|
||||
-- sum
|
||||
EXPLAIN SELECT
|
||||
sum(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[]], aggr=[[sum(sum(integers.i)) AS sum(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[sum(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[sum(integers.i)] |
|
||||
| | CoalescePartitionsExec |
|
||||
| | AggregateExec: mode=Partial, gby=[], aggr=[sum(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
sum(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[sum(sum(integers.i)) AS sum(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[sum(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
sum(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Projection: sum(integers.i) |
|
||||
| | Aggregate: groupBy=[[integers.ts]], aggr=[[sum(sum(integers.i)) AS sum(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[sum(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | ProjectionExec: expr=[sum(integers.i)@1 as sum(integers.i)] |
|
||||
| | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[sum(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
|
||||
-- min
|
||||
EXPLAIN SELECT
|
||||
min(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[]], aggr=[[min(min(integers.i)) AS min(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[min(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[min(integers.i)] |
|
||||
| | CoalescePartitionsExec |
|
||||
| | AggregateExec: mode=Partial, gby=[], aggr=[min(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
min(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[min(min(integers.i)) AS min(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[min(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[min(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[min(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
min(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Projection: min(integers.i) |
|
||||
| | Aggregate: groupBy=[[integers.ts]], aggr=[[min(min(integers.i)) AS min(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[min(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | ProjectionExec: expr=[min(integers.i)@1 as min(integers.i)] |
|
||||
| | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[min(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[min(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
|
||||
-- max
|
||||
EXPLAIN SELECT
|
||||
max(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[]], aggr=[[max(max(integers.i)) AS max(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[max(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[max(integers.i)] |
|
||||
| | CoalescePartitionsExec |
|
||||
| | AggregateExec: mode=Partial, gby=[], aggr=[max(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
max(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[max(max(integers.i)) AS max(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[max(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[max(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[max(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
max(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Projection: max(integers.i) |
|
||||
| | Aggregate: groupBy=[[integers.ts]], aggr=[[max(max(integers.i)) AS max(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[max(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | ProjectionExec: expr=[max(integers.i)@1 as max(integers.i)] |
|
||||
| | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[max(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[max(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------+
|
||||
|
||||
-- uddsketch_state
|
||||
EXPLAIN SELECT
|
||||
uddsketch_state(128, 0.01, i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[]], aggr=[[uddsketch_merge(Int64(128), Float64(0.01), uddsketch_state(Int64(128),Float64(0.01),integers.i)) AS uddsketch_state(Int64(128),Float64(0.01),integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[uddsketch_state(Int64(128), Float64(0.01), CAST(integers.i AS Float64))]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)] |
|
||||
| | CoalescePartitionsExec |
|
||||
| | AggregateExec: mode=Partial, gby=[], aggr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
uddsketch_state(128, 0.01, i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[uddsketch_merge(Int64(128), Float64(0.01), uddsketch_state(Int64(128),Float64(0.01),integers.i)) AS uddsketch_state(Int64(128),Float64(0.01),integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[uddsketch_state(Int64(128), Float64(0.01), CAST(integers.i AS Float64))]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
uddsketch_state(128, 0.01, i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Projection: uddsketch_state(Int64(128),Float64(0.01),integers.i) |
|
||||
| | Aggregate: groupBy=[[integers.ts]], aggr=[[uddsketch_merge(Int64(128), Float64(0.01), uddsketch_state(Int64(128),Float64(0.01),integers.i)) AS uddsketch_state(Int64(128),Float64(0.01),integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[uddsketch_state(Int64(128), Float64(0.01), CAST(integers.i AS Float64))]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | ProjectionExec: expr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)@1 as uddsketch_state(Int64(128),Float64(0.01),integers.i)] |
|
||||
| | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[uddsketch_state(Int64(128),Float64(0.01),integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
|
||||
-- hll
|
||||
EXPLAIN SELECT
|
||||
hll(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
+---------------+----------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+----------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[]], aggr=[[hll_merge(hll(integers.i)) AS hll(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[hll(CAST(integers.i AS Utf8))]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[hll(integers.i)] |
|
||||
| | CoalescePartitionsExec |
|
||||
| | AggregateExec: mode=Partial, gby=[], aggr=[hll(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+----------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
hll(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+---------------------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[hll_merge(hll(integers.i)) AS hll(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[hll(CAST(integers.i AS Utf8))]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[hll(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[hll(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
hll(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Projection: hll(integers.i) |
|
||||
| | Aggregate: groupBy=[[integers.ts]], aggr=[[hll_merge(hll(integers.i)) AS hll(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[hll(CAST(integers.i AS Utf8))]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | ProjectionExec: expr=[hll(integers.i)@1 as hll(integers.i)] |
|
||||
| | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[hll(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[hll(integers.i)] |
|
||||
| | MergeScanExec: peers=[4402341478400(1025, 0), 4402341478401(1025, 1), 4402341478402(1025, 2), ] |
|
||||
| | |
|
||||
+---------------+-----------------------------------------------------------------------------------------------------------------------+
|
||||
|
||||
DROP TABLE integers;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
144
tests/cases/distributed/explain/step_aggr.sql
Normal file
144
tests/cases/distributed/explain/step_aggr.sql
Normal file
@@ -0,0 +1,144 @@
|
||||
CREATE TABLE integers(
|
||||
host STRING,
|
||||
i BIGINT,
|
||||
ts TIMESTAMP TIME INDEX
|
||||
) PARTITION ON COLUMNS (host) (
|
||||
host < '550-A',
|
||||
host >= '550-A'
|
||||
AND host < '550-W',
|
||||
host >= '550-W'
|
||||
);
|
||||
|
||||
INSERT INTO integers (host, i, ts) VALUES
|
||||
('550-A', 1, '2023-01-01 00:00:00'),
|
||||
('550-A', 2, '2023-01-01 01:00:00'),
|
||||
('550-W', 3, '2023-01-01 02:00:00'),
|
||||
('550-W', 4, '2023-01-01 03:00:00');
|
||||
|
||||
-- count
|
||||
EXPLAIN SELECT
|
||||
count(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
count(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
EXPLAIN SELECT
|
||||
count(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
-- sum
|
||||
EXPLAIN SELECT
|
||||
sum(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
sum(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
EXPLAIN SELECT
|
||||
sum(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
-- min
|
||||
EXPLAIN SELECT
|
||||
min(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
min(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
EXPLAIN SELECT
|
||||
min(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
-- max
|
||||
EXPLAIN SELECT
|
||||
max(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
max(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
EXPLAIN SELECT
|
||||
max(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
-- uddsketch_state
|
||||
EXPLAIN SELECT
|
||||
uddsketch_state(128, 0.01, i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
uddsketch_state(128, 0.01, i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
EXPLAIN SELECT
|
||||
uddsketch_state(128, 0.01, i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
-- hll
|
||||
EXPLAIN SELECT
|
||||
hll(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
hll(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
EXPLAIN SELECT
|
||||
hll(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
DROP TABLE integers;
|
||||
80
tests/cases/distributed/explain/step_aggr_basic.result
Normal file
80
tests/cases/distributed/explain/step_aggr_basic.result
Normal file
@@ -0,0 +1,80 @@
|
||||
CREATE TABLE integers(
|
||||
host STRING,
|
||||
i BIGINT,
|
||||
ts TIMESTAMP TIME INDEX
|
||||
) PARTITION ON COLUMNS (host) (
|
||||
host < '550-A',
|
||||
host >= '550-A'
|
||||
AND host < '550-W',
|
||||
host >= '550-W'
|
||||
);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
-- count
|
||||
EXPLAIN SELECT
|
||||
count(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[]], aggr=[[sum(count(integers.i)) AS count(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[]], aggr=[[count(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[count(integers.i)] |
|
||||
| | CoalescePartitionsExec |
|
||||
| | AggregateExec: mode=Partial, gby=[], aggr=[count(integers.i)] |
|
||||
| | MergeScanExec: peers=[4398046511104(1024, 0), 4398046511105(1024, 1), 4398046511106(1024, 2), ] |
|
||||
| | |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
count(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[integers.ts]], aggr=[[sum(count(integers.i)) AS count(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[integers.ts]], aggr=[[count(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([ts@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(integers.i)] |
|
||||
| | MergeScanExec: peers=[4398046511104(1024, 0), 4398046511105(1024, 1), 4398046511106(1024, 2), ] |
|
||||
| | |
|
||||
+---------------+---------------------------------------------------------------------------------------------------------+
|
||||
|
||||
EXPLAIN SELECT
|
||||
date_bin('1 hour'::INTERVAL, ts),
|
||||
count(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
date_bin('1 hour'::INTERVAL, ts);
|
||||
|
||||
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Aggregate: groupBy=[[date_bin(Utf8("1 hour"),integers.ts) AS date_bin(Utf8("1 hour"),integers.ts)]], aggr=[[sum(count(integers.i)) AS count(integers.i)]] |
|
||||
| | MergeScan [is_placeholder=false, input=Aggregate: groupBy=[[date_bin(CAST(Utf8("1 hour") AS Interval(MonthDayNano)), integers.ts)]], aggr=[[count(integers.i)]] |
|
||||
| | TableScan: integers] |
|
||||
| physical_plan | AggregateExec: mode=FinalPartitioned, gby=[date_bin(Utf8("1 hour"),integers.ts)@0 as date_bin(Utf8("1 hour"),integers.ts)], aggr=[count(integers.i)] |
|
||||
| | CoalesceBatchesExec: target_batch_size=8192 |
|
||||
| | RepartitionExec: partitioning=Hash([date_bin(Utf8("1 hour"),integers.ts)@0], 20), input_partitions=20 |
|
||||
| | AggregateExec: mode=Partial, gby=[date_bin(Utf8("1 hour"),integers.ts)@0 as date_bin(Utf8("1 hour"),integers.ts)], aggr=[count(integers.i)] |
|
||||
| | MergeScanExec: peers=[4398046511104(1024, 0), 4398046511105(1024, 1), 4398046511106(1024, 2), ] |
|
||||
| | |
|
||||
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
|
||||
DROP TABLE integers;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
34
tests/cases/distributed/explain/step_aggr_basic.sql
Normal file
34
tests/cases/distributed/explain/step_aggr_basic.sql
Normal file
@@ -0,0 +1,34 @@
|
||||
CREATE TABLE integers(
|
||||
host STRING,
|
||||
i BIGINT,
|
||||
ts TIMESTAMP TIME INDEX
|
||||
) PARTITION ON COLUMNS (host) (
|
||||
host < '550-A',
|
||||
host >= '550-A'
|
||||
AND host < '550-W',
|
||||
host >= '550-W'
|
||||
);
|
||||
|
||||
-- count
|
||||
EXPLAIN SELECT
|
||||
count(i)
|
||||
FROM
|
||||
integers;
|
||||
|
||||
EXPLAIN SELECT
|
||||
ts,
|
||||
count(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
ts;
|
||||
|
||||
EXPLAIN SELECT
|
||||
date_bin('1 hour'::INTERVAL, ts),
|
||||
count(i)
|
||||
FROM
|
||||
integers
|
||||
GROUP BY
|
||||
date_bin('1 hour'::INTERVAL, ts);
|
||||
|
||||
DROP TABLE integers;
|
||||
Reference in New Issue
Block a user