fix: rm useless analyzer (#7797)

* fix: rm useless analyzer

Signed-off-by: discord9 <discord9@163.com>

* test: rm related test

Signed-off-by: discord9 <discord9@163.com>

* test: flow tql avg

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-03-12 18:53:47 +08:00
committed by GitHub
parent 7866132920
commit 3beb538aa8
4 changed files with 192 additions and 1294 deletions

View File

@@ -16,30 +16,19 @@
#![warn(unused)]
use std::collections::{HashMap, HashSet};
use std::collections::HashSet;
use std::sync::Arc;
use common_error::ext::BoxedError;
use common_telemetry::debug;
use datafusion::config::ConfigOptions;
use datafusion::error::DataFusionError;
use datafusion::functions_aggregate::count::count_udaf;
use datafusion::functions_aggregate::sum::sum_udaf;
use datafusion::optimizer::analyzer::type_coercion::TypeCoercion;
use datafusion::optimizer::common_subexpr_eliminate::CommonSubexprEliminate;
use datafusion::optimizer::optimize_projections::OptimizeProjections;
use datafusion::optimizer::simplify_expressions::SimplifyExpressions;
use datafusion::optimizer::utils::NamePreserver;
use datafusion::optimizer::{Analyzer, AnalyzerRule, Optimizer, OptimizerContext};
use datafusion_common::tree_node::{
Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter, TreeNodeVisitor,
};
use datafusion_common::{Column, DFSchema, ScalarValue};
use datafusion_expr::utils::merge_schema;
use datafusion_expr::{
BinaryExpr, ColumnarValue, Expr, Literal, Operator, Projection, ScalarFunctionArgs,
ScalarUDFImpl, Signature, TypeSignature, Volatility,
};
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use query::QueryEngine;
use query::optimizer::count_wildcard::CountWildcardToTimeIndexRule;
use query::parser::QueryLanguageParser;
@@ -52,7 +41,6 @@ use substrait::DFLogicalSubstraitConvertor;
use crate::adapter::FlownodeContext;
use crate::error::{DatafusionSnafu, Error, ExternalSnafu, UnexpectedSnafu};
use crate::expr::{TUMBLE_END, TUMBLE_START};
use crate::plan::TypedPlan;
// TODO(discord9): use `Analyzer` to manage rules if more `AnalyzerRule`s are needed
@@ -63,8 +51,6 @@ pub async fn apply_df_optimizer(
let cfg = query_ctx.create_config_options();
let analyzer = Analyzer::with_rules(vec![
Arc::new(CountWildcardToTimeIndexRule),
Arc::new(AvgExpandRule),
Arc::new(TumbleExpandRule),
Arc::new(CheckGroupByRule::new()),
Arc::new(TypeCoercion::new()),
]);
@@ -127,390 +113,6 @@ pub async fn sql_to_flow_plan(
Ok(flow_plan)
}
/// Analyzer rule that replaces `avg` aggregates with an equivalent `sum`/`count`
/// expression and then moves the resulting composite expression out of the
/// aggregate node.
#[derive(Debug)]
struct AvgExpandRule;

impl AnalyzerRule for AvgExpandRule {
    fn name(&self) -> &str {
        "avg_expand"
    }

    /// Runs two passes over `plan` (subqueries included):
    /// 1. bottom-up: `expand_avg_analyzer` rewrites every `avg` call;
    /// 2. top-down: `put_aggr_to_proj_analyzer` lifts composite aggregate
    ///    expressions into the projection above the aggregate.
    fn analyze(
        &self,
        plan: datafusion_expr::LogicalPlan,
        _config: &ConfigOptions,
    ) -> datafusion_common::Result<datafusion_expr::LogicalPlan> {
        let avg_expanded = plan.transform_up_with_subqueries(expand_avg_analyzer)?;
        let lifted = avg_expanded
            .data
            .transform_down_with_subqueries(put_aggr_to_proj_analyzer)?;
        Ok(lifted.data)
    }
}
/// Lifts composite aggregate expressions out of an `Aggregate` node into the
/// `Projection` directly above it, leaving the `Aggregate` with plain aggregate
/// calls only.
///
/// i.e.
/// ```ignore
/// proj: avg(x)
/// -- aggr: [sum(x)/count(x) as avg(x)]
/// ```
/// becomes:
/// ```ignore
/// proj: sum(x)/count(x) as avg(x)
/// -- aggr: [sum(x), count(x)]
/// ```
///
/// Only a `Projection` whose direct input is an `Aggregate` is considered; any
/// other plan node is returned untouched as `Transformed::no`.
///
/// # Errors
/// Propagates errors from expression naming, expression transformation,
/// `recompute_schema` and `Projection::try_new`.
fn put_aggr_to_proj_analyzer(
    plan: datafusion_expr::LogicalPlan,
) -> Result<Transformed<datafusion_expr::LogicalPlan>, DataFusionError> {
    if let datafusion_expr::LogicalPlan::Projection(proj) = &plan
        && let datafusion_expr::LogicalPlan::Aggregate(aggr) = proj.input.as_ref()
    {
        // output name of a composite aggr expr -> its replacement that refers to the
        // extracted aggregate calls by column
        let mut replace_old_proj_exprs = HashMap::new();
        // flat list of plain aggregate calls the rewritten `Aggregate` will compute
        let mut expanded_aggr_exprs = vec![];

        for aggr_expr in &aggr.aggr_expr {
            if matches!(aggr_expr, Expr::AggregateFunction(_)) {
                // already a direct aggregate call, keep as is
                expanded_aggr_exprs.push(aggr_expr.clone());
                continue;
            }

            let old_name = aggr_expr.name_for_alias()?;
            let new_proj_expr = aggr_expr
                .clone()
                .transform(|ch| {
                    if matches!(ch, Expr::AggregateFunction(_)) {
                        // pull the aggregate call out and leave a column reference
                        // to its output in its place
                        let column = Column::from_qualified_name(ch.name_for_alias()?);
                        expanded_aggr_exprs.push(ch);
                        Ok(Transformed::yes(Expr::Column(column)))
                    } else {
                        Ok(Transformed::no(ch))
                    }
                })?
                .data;
            replace_old_proj_exprs.insert(old_name, new_proj_expr);
        }

        // The rewrite only fires when splitting produced more aggregate calls than
        // the node originally had.
        // NOTE(review): a composite expression wrapping exactly one aggregate call
        // keeps the count unchanged and is therefore not lifted -- confirm intended.
        if expanded_aggr_exprs.len() > aggr.aggr_expr.len() {
            let mut aggr = aggr.clone();
            aggr.aggr_expr = expanded_aggr_exprs;
            let mut aggr_plan = datafusion_expr::LogicalPlan::Aggregate(aggr);
            // important to recompute schema after changing aggr_expr
            aggr_plan = aggr_plan.recompute_schema()?;

            // reconstruct proj with new proj_exprs
            let mut new_proj_exprs = proj.expr.clone();
            for proj_expr in new_proj_exprs.iter_mut() {
                // whole projection expr names a lifted composite: swap it out
                if let Some(new_proj_expr) =
                    replace_old_proj_exprs.get(&proj_expr.name_for_alias()?)
                {
                    *proj_expr = new_proj_expr.clone();
                }
                // also replace references nested inside the projection expr
                *proj_expr = proj_expr
                    .clone()
                    .transform(|expr| {
                        if let Some(new_expr) = replace_old_proj_exprs.get(&expr.name_for_alias()?)
                        {
                            Ok(Transformed::yes(new_expr.clone()))
                        } else {
                            Ok(Transformed::no(expr))
                        }
                    })?
                    .data;
            }
            let proj = datafusion_expr::LogicalPlan::Projection(Projection::try_new(
                new_proj_exprs,
                Arc::new(aggr_plan),
            )?);
            return Ok(Transformed::yes(proj));
        }
    }
    Ok(Transformed::no(plan))
}
/// Rewrites every `avg(<expr>)` found in the expressions of `plan` by running
/// `ExpandAvgRewriter` over them, then recomputes the plan schema.
///
/// Each rewritten expression keeps its original output name (via `NamePreserver`).
fn expand_avg_analyzer(
    plan: datafusion_expr::LogicalPlan,
) -> Result<Transformed<datafusion_expr::LogicalPlan>, DataFusionError> {
    // Start from the merged schema of all inputs; a table scan has no inputs, so
    // its qualified source schema is merged in as well.
    let mut input_schema = merge_schema(&plan.inputs());
    if let datafusion_expr::LogicalPlan::TableScan(scan) = &plan {
        let scan_schema =
            DFSchema::try_from_qualified_schema(scan.table_name.clone(), &scan.source.schema())?;
        input_schema.merge(&scan_schema);
    }

    let mut avg_rewriter = ExpandAvgRewriter::new(&input_schema);
    let preserver = NamePreserver::new(&plan);

    // rewrite all expressions in the plan individually
    let rewritten = plan.map_expressions(|expr| {
        let saved_name = preserver.save(&expr);
        let expanded = expr.rewrite(&mut avg_rewriter)?;
        Ok(expanded.update_data(|e| saved_name.restore(e)))
    })?;
    rewritten.map_data(|p| p.recompute_schema())
}
/// Expression rewriter that turns `avg(<expr>)` into
/// `CASE WHEN count(<expr>) != 0 THEN cast(sum(<expr>) AS Float64) / count(<expr>) ELSE NULL END`.
///
/// The cast target is hard-coded to `Float64` rather than derived from the
/// `avg` return type.
///
/// TODO(discord9): support avg return type decimal128
///
/// see impl details at https://github.com/apache/datafusion/blob/4ad4f90d86c57226a4e0fb1f79dfaaf0d404c273/datafusion/expr/src/type_coercion/aggregates.rs#L457-L462
pub(crate) struct ExpandAvgRewriter<'a> {
/// schema of the plan; stored but not read by the rewriter itself
#[allow(unused)]
pub(crate) schema: &'a DFSchema,
}
impl<'a> ExpandAvgRewriter<'a> {
/// Creates a rewriter that holds a reference to `schema`.
fn new(schema: &'a DFSchema) -> Self {
Self { schema }
}
}
impl TreeNodeRewriter for ExpandAvgRewriter<'_> {
    type Node = Expr;

    /// Replaces an `avg` aggregate call with
    /// `CASE WHEN <count column> != 0 THEN cast(sum(..) AS Float64) / count(..) ELSE NULL END`.
    ///
    /// `sum` and `count` reuse every setting of the original `avg` call; only the
    /// aggregate function itself is swapped. Any other expression is returned
    /// unchanged as `Transformed::no`.
    fn f_up(&mut self, expr: Expr) -> Result<Transformed<Expr>, DataFusionError> {
        if let Expr::AggregateFunction(aggr_func) = &expr
            && aggr_func.func.name() == "avg"
        {
            let sum_expr = {
                let mut tmp = aggr_func.clone();
                tmp.func = sum_udaf();
                Expr::AggregateFunction(tmp)
            };
            let sum_cast = Expr::Cast(datafusion_expr::Cast {
                expr: Box::new(sum_expr),
                data_type: arrow_schema::DataType::Float64,
            });

            let count_expr = {
                let mut tmp = aggr_func.clone();
                tmp.func = count_udaf();
                Expr::AggregateFunction(tmp)
            };
            // the CASE condition refers to the count by its output column name
            // instead of repeating the aggregate call
            let count_expr_ref =
                Expr::Column(Column::from_qualified_name(count_expr.name_for_alias()?));

            let div_expr = Box::new(Expr::BinaryExpr(BinaryExpr::new(
                Box::new(sum_cast),
                Operator::Divide,
                Box::new(count_expr),
            )));

            let not_zero = Box::new(Expr::BinaryExpr(BinaryExpr::new(
                Box::new(count_expr_ref),
                Operator::NotEq,
                Box::new(0.lit()),
            )));

            // no rows counted => result is NULL
            let null = Box::new(Expr::Literal(ScalarValue::Null, None));
            let case_when =
                datafusion_expr::Case::new(None, vec![(not_zero, div_expr)], Some(null));
            return Ok(Transformed::yes(Expr::Case(case_when)));
        }
        Ok(Transformed::no(expr))
    }
}
/// Analyzer rule that replaces a `tumble` group-by call with a
/// `tumble_start`/`tumble_end` pair, exposed to the outer projection under
/// column names like `window_start`.
#[derive(Debug)]
struct TumbleExpandRule;

impl AnalyzerRule for TumbleExpandRule {
    fn name(&self) -> &str {
        "tumble_expand"
    }

    /// Applies `expand_tumble_analyzer` bottom-up over `plan`, subqueries included.
    fn analyze(
        &self,
        plan: datafusion_expr::LogicalPlan,
        _config: &ConfigOptions,
    ) -> datafusion_common::Result<datafusion_expr::LogicalPlan> {
        let expanded = plan.transform_up_with_subqueries(expand_tumble_analyzer)?;
        Ok(expanded.data)
    }
}
/// Expands each `tumble(...)` call in the group-by expressions of an `Aggregate` into a
/// `tumble_start(...)` / `tumble_end(...)` pair, and rewrites the `Projection` directly above
/// the aggregate accordingly.
///
/// Only a `Projection` whose direct input is an `Aggregate` is considered. If no projection
/// expression refers to a `tumble` call by name, the start/end columns are appended to the
/// projection aliased as `window_start` and `window_end`.
///
/// Returns `Transformed::no` when the plan does not match or no `tumble` call is found.
fn expand_tumble_analyzer(
plan: datafusion_expr::LogicalPlan,
) -> Result<Transformed<datafusion_expr::LogicalPlan>, DataFusionError> {
if let datafusion_expr::LogicalPlan::Projection(proj) = &plan
&& let datafusion_expr::LogicalPlan::Aggregate(aggr) = proj.input.as_ref()
{
let mut new_group_expr = vec![];
// output name of the original `tumble` call -> (start column name, end column name)
let mut alias_to_expand = HashMap::new();
let mut encountered_tumble = false;
for expr in aggr.group_expr.iter() {
match expr {
datafusion_expr::Expr::ScalarFunction(func) if func.name() == "tumble" => {
encountered_tumble = true;
// both placeholder functions receive the same arguments as the original `tumble` call
let tumble_start = TumbleExpand::new(TUMBLE_START);
let tumble_start = datafusion_expr::expr::ScalarFunction::new_udf(
Arc::new(tumble_start.into()),
func.args.clone(),
);
let tumble_start = datafusion_expr::Expr::ScalarFunction(tumble_start);
let start_col_name = tumble_start.name_for_alias()?;
new_group_expr.push(tumble_start);
let tumble_end = TumbleExpand::new(TUMBLE_END);
let tumble_end = datafusion_expr::expr::ScalarFunction::new_udf(
Arc::new(tumble_end.into()),
func.args.clone(),
);
let tumble_end = datafusion_expr::Expr::ScalarFunction(tumble_end);
let end_col_name = tumble_end.name_for_alias()?;
new_group_expr.push(tumble_end);
alias_to_expand.insert(expr.name_for_alias()?, (start_col_name, end_col_name));
}
// any other group-by expression is kept as is
_ => new_group_expr.push(expr.clone()),
}
}
if !encountered_tumble {
return Ok(Transformed::no(plan));
}
let mut new_aggr = aggr.clone();
new_aggr.group_expr = new_group_expr;
// the group-by expressions changed, so the aggregate's schema must be recomputed
let new_aggr = datafusion_expr::LogicalPlan::Aggregate(new_aggr).recompute_schema()?;
// replace alias in projection if needed, and add new column ref if necessary
let mut new_proj_expr = vec![];
let mut have_expanded = false;
for proj_expr in proj.expr.iter() {
if let Some((start_col_name, end_col_name)) =
alias_to_expand.get(&proj_expr.name_for_alias()?)
{
// a projection expr named after the `tumble` call becomes two column refs (unaliased)
let start_col = Column::from_qualified_name(start_col_name);
let end_col = Column::from_qualified_name(end_col_name);
new_proj_expr.push(datafusion_expr::Expr::Column(start_col));
new_proj_expr.push(datafusion_expr::Expr::Column(end_col));
have_expanded = true;
} else {
new_proj_expr.push(proj_expr.clone());
}
}
// append to end of projection if not exist
if !have_expanded {
// NOTE(review): every entry gets the same `window_start`/`window_end` aliases, so more
// than one `tumble` call would yield duplicate names -- confirm only one is expected
for (start_col_name, end_col_name) in alias_to_expand.values() {
let start_col = Column::from_qualified_name(start_col_name);
let end_col = Column::from_qualified_name(end_col_name);
new_proj_expr.push(datafusion_expr::Expr::Column(start_col).alias("window_start"));
new_proj_expr.push(datafusion_expr::Expr::Column(end_col).alias("window_end"));
}
}
let new_proj = datafusion_expr::LogicalPlan::Projection(Projection::try_new(
new_proj_expr,
Arc::new(new_aggr),
)?);
return Ok(Transformed::yes(new_proj));
}
Ok(Transformed::no(plan))
}
/// Placeholder scalar UDF standing in for `tumble_start` / `tumble_end`, so that
/// datafusion can recognize them as scalar functions.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct TumbleExpand {
    signature: Signature,
    name: String,
}

impl TumbleExpand {
    /// Builds a placeholder function called `name` with a user-defined,
    /// immutable signature.
    pub fn new(name: &str) -> Self {
        let signature = Signature::new(TypeSignature::UserDefined, Volatility::Immutable);
        Self {
            signature,
            name: name.to_string(),
        }
    }
}
impl ScalarUDFImpl for TumbleExpand {
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn name(&self) -> &str {
&self.name
}
/// elide the signature for now
fn signature(&self) -> &Signature {
&self.signature
}
fn coerce_types(
&self,
arg_types: &[arrow_schema::DataType],
) -> datafusion_common::Result<Vec<arrow_schema::DataType>> {
match (arg_types.first(), arg_types.get(1), arg_types.get(2)) {
(Some(ts), Some(window), opt) => {
use arrow_schema::DataType::*;
if !matches!(ts, Date32 | Timestamp(_, _)) {
return Err(DataFusionError::Plan(
format!("Expect timestamp column as first arg for tumble_start, found {:?}", ts)
));
}
if !matches!(window, Utf8 | Interval(_)) {
return Err(DataFusionError::Plan(
format!("Expect second arg for window size's type being interval for tumble_start, found {:?}", window),
));
}
if let Some(start_time) = opt
&& !matches!(start_time, Utf8 | Date32 | Timestamp(_, _)){
return Err(DataFusionError::Plan(
format!("Expect start_time to either be date, timestamp or string, found {:?}", start_time)
));
}
Ok(arg_types.to_vec())
}
_ => Err(DataFusionError::Plan(
"Expect tumble function have at least two arg(timestamp column and window size) and a third optional arg for starting time".to_string(),
)),
}
}
fn return_type(
&self,
arg_types: &[arrow_schema::DataType],
) -> Result<arrow_schema::DataType, DataFusionError> {
arg_types.first().cloned().ok_or_else(|| {
DataFusionError::Plan(
"Expect tumble function have at least two arg(timestamp column and window size)"
.to_string(),
)
})
}
fn invoke_with_args(
&self,
_args: ScalarFunctionArgs,
) -> datafusion_common::Result<ColumnarValue> {
Err(DataFusionError::Plan(
"This function should not be executed by datafusion".to_string(),
))
}
}
/// This rule checks all group-by exprs and makes sure they also appear in the select clause of
/// an aggregate query
#[derive(Debug)]
struct CheckGroupByRule {}

View File

@@ -382,10 +382,9 @@ impl TypedPlan {
#[cfg(test)]
mod test {
use std::time::Duration;
use bytes::BytesMut;
use common_time::{IntervalMonthDayNano, Timestamp};
use common_time::IntervalMonthDayNano;
use datatypes::data_type::ConcreteDataType as CDT;
use datatypes::prelude::ConcreteDataType;
use datatypes::value::Value;
@@ -397,898 +396,6 @@ mod test {
use crate::repr::{ColumnType, RelationType};
use crate::transform::test::{create_test_ctx, create_test_query_engine, sql_to_substrait};
/// Plans `sum(abs(number))` grouped by a one-second `tumble` window with an explicit start
/// time and compares the result against a hand-built `TypedPlan`.
///
/// Expected shape: a `Reduce` keyed by the tumble window floor/ceiling of `ts`, whose value is
/// `number` passed through a `ScalarExpr::CallDf` and cast to u64 before being summed,
/// followed by an `Mfp` that reorders the columns to (sum, window start, window end).
#[tokio::test]
async fn test_df_func_basic() {
let engine = create_test_query_engine();
let sql = "SELECT sum(abs(number)) FROM numbers_with_ts GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');";
let plan = sql_to_substrait(engine.clone(), sql).await;
let mut ctx = create_test_ctx();
let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan)
.await
.unwrap();
let aggr_expr = AggregateExpr {
func: AggregateFunc::SumUInt64,
expr: ScalarExpr::Column(0),
distinct: false,
};
let expected =
TypedPlan {
schema: RelationType::new(vec![
ColumnType::new(CDT::uint64_datatype(), true), // sum(abs(number))
ColumnType::new(CDT::timestamp_millisecond_datatype(), true), // window start
ColumnType::new(CDT::timestamp_millisecond_datatype(), true), // window end
])
.with_key(vec![2])
.with_time_index(Some(1))
.into_named(vec![
Some("sum(abs(numbers_with_ts.number))".to_string()),
Some("window_start".to_string()),
Some("window_end".to_string()),
]),
plan: Plan::Mfp {
input: Box::new(
Plan::Reduce {
input: Box::new(
Plan::Get {
id: crate::expr::Id::Global(GlobalId::User(1)),
}
.with_types(
RelationType::new(vec![
ColumnType::new(ConcreteDataType::uint32_datatype(), false),
ColumnType::new(
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
])
.into_named(vec![
Some("number".to_string()),
Some("ts".to_string()),
]),
)
.mfp(MapFilterProject::new(2).into_safe())
.unwrap(),
),
key_val_plan: KeyValPlan {
key_plan: MapFilterProject::new(2)
.map(vec![
ScalarExpr::Column(1).call_unary(
UnaryFunc::TumbleWindowFloor {
window_size: Duration::from_nanos(1_000_000_000),
start_time: Some(Timestamp::new_millisecond(
1625097600000,
)),
},
),
ScalarExpr::Column(1).call_unary(
UnaryFunc::TumbleWindowCeiling {
window_size: Duration::from_nanos(1_000_000_000),
start_time: Some(Timestamp::new_millisecond(
1625097600000,
)),
},
),
])
.unwrap()
.project(vec![2, 3])
.unwrap()
.into_safe(),
val_plan: MapFilterProject::new(2)
.map(vec![ScalarExpr::CallDf {
df_scalar_fn: DfScalarFunction::try_from_raw_fn(
RawDfScalarFn {
// presumably the encoded `abs` call on column 0 -- TODO confirm
f: BytesMut::from(
b"\x08\x02\"\x08\x1a\x06\x12\x04\n\x02\x12\0"
.as_ref(),
),
input_schema: RelationType::new(vec![ColumnType::new(
ConcreteDataType::uint32_datatype(),
false,
)])
.into_unnamed(),
extensions: FunctionExtensions::from_iter(
[
(0, "tumble_start".to_string()),
(1, "tumble_end".to_string()),
(2, "abs".to_string()),
(3, "sum".to_string()),
]
.into_iter(),
),
},
)
.await
.unwrap(),
exprs: vec![ScalarExpr::Column(0)],
}
.cast(CDT::uint64_datatype())])
.unwrap()
.project(vec![2])
.unwrap()
.into_safe(),
},
reduce_plan: ReducePlan::Accumulable(AccumulablePlan {
full_aggrs: vec![aggr_expr.clone()],
simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)],
distinct_aggrs: vec![],
}),
}
.with_types(
RelationType::new(vec![
ColumnType::new(CDT::timestamp_millisecond_datatype(), true), // window start
ColumnType::new(CDT::timestamp_millisecond_datatype(), true), // window end
ColumnType::new(CDT::uint64_datatype(), true), // sum(abs(number))
])
.with_key(vec![1])
.with_time_index(Some(0))
.into_unnamed(),
),
),
mfp: MapFilterProject::new(3)
.map(vec![
ScalarExpr::Column(2),
ScalarExpr::Column(0),
ScalarExpr::Column(1),
])
.unwrap()
.project(vec![3, 4, 5])
.unwrap(),
},
};
assert_eq!(flow_plan, expected);
}
/// Plans `abs(sum(number))` grouped by a one-second `tumble` window with an explicit start
/// time and compares the result against a hand-built `TypedPlan`.
///
/// Unlike `sum(abs(number))`, the `abs` call is expected in the outer `Mfp`, applied through
/// `ScalarExpr::CallDf` to the reduce output column holding the sum.
#[tokio::test]
async fn test_df_func_expr_tree() {
let engine = create_test_query_engine();
let sql = "SELECT abs(sum(number)) FROM numbers_with_ts GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');";
let plan = sql_to_substrait(engine.clone(), sql).await;
let mut ctx = create_test_ctx();
let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan)
.await
.unwrap();
let aggr_expr = AggregateExpr {
func: AggregateFunc::SumUInt64,
expr: ScalarExpr::Column(0),
distinct: false,
};
let expected = TypedPlan {
schema: RelationType::new(vec![
ColumnType::new(CDT::uint64_datatype(), true), // abs(sum(number))
ColumnType::new(CDT::timestamp_millisecond_datatype(), true), // window start
ColumnType::new(CDT::timestamp_millisecond_datatype(), true), // window end
])
.with_key(vec![2])
.with_time_index(Some(1))
.into_named(vec![
Some("abs(sum(numbers_with_ts.number))".to_string()),
Some("window_start".to_string()),
Some("window_end".to_string()),
]),
plan: Plan::Mfp {
input: Box::new(
Plan::Reduce {
input: Box::new(
Plan::Get {
id: crate::expr::Id::Global(GlobalId::User(1)),
}
.with_types(
RelationType::new(vec![
ColumnType::new(ConcreteDataType::uint32_datatype(), false),
ColumnType::new(
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
])
.into_named(vec![
Some("number".to_string()),
Some("ts".to_string()),
]),
)
.mfp(MapFilterProject::new(2).into_safe())
.unwrap(),
),
key_val_plan: KeyValPlan {
key_plan: MapFilterProject::new(2)
.map(vec![
ScalarExpr::Column(1).call_unary(
UnaryFunc::TumbleWindowFloor {
window_size: Duration::from_nanos(1_000_000_000),
start_time: Some(Timestamp::new_millisecond(
1625097600000,
)),
},
),
ScalarExpr::Column(1).call_unary(
UnaryFunc::TumbleWindowCeiling {
window_size: Duration::from_nanos(1_000_000_000),
start_time: Some(Timestamp::new_millisecond(
1625097600000,
)),
},
),
])
.unwrap()
.project(vec![2, 3])
.unwrap()
.into_safe(),
val_plan: MapFilterProject::new(2)
.map(vec![ScalarExpr::Column(0).cast(CDT::uint64_datatype())])
.unwrap()
.project(vec![2])
.unwrap()
.into_safe(),
},
reduce_plan: ReducePlan::Accumulable(AccumulablePlan {
full_aggrs: vec![aggr_expr.clone()],
simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)],
distinct_aggrs: vec![],
}),
}
.with_types(
RelationType::new(vec![
ColumnType::new(CDT::timestamp_millisecond_datatype(), true), // window start
ColumnType::new(CDT::timestamp_millisecond_datatype(), true), // window end
ColumnType::new(CDT::uint64_datatype(), true), //sum(number)
])
.with_key(vec![1])
.with_time_index(Some(0))
.into_named(vec![None, None, None]),
),
),
mfp: MapFilterProject::new(3)
.map(vec![
// abs applied to the sum column (index 2) of the reduce output
ScalarExpr::CallDf {
df_scalar_fn: DfScalarFunction::try_from_raw_fn(RawDfScalarFn {
f: BytesMut::from(b"\"\x08\x1a\x06\x12\x04\n\x02\x12\0".as_ref()),
input_schema: RelationType::new(vec![ColumnType::new(
ConcreteDataType::uint64_datatype(),
true,
)])
.into_unnamed(),
extensions: FunctionExtensions::from_iter(
[
(0, "abs".to_string()),
(1, "tumble_start".to_string()),
(2, "tumble_end".to_string()),
(3, "sum".to_string()),
]
.into_iter(),
),
})
.await
.unwrap(),
exprs: vec![ScalarExpr::Column(2)],
},
ScalarExpr::Column(0),
ScalarExpr::Column(1),
])
.unwrap()
.project(vec![3, 4, 5])
.unwrap(),
},
};
assert_eq!(flow_plan, expected);
}
/// Plans `number, avg(number)` grouped by a one-hour `tumble` window (no start time) plus
/// `number`, and compares the result against a hand-built `TypedPlan`.
///
/// `avg` is expected as separate sum and count aggregates in the `Reduce`, recombined in the
/// outer `Mfp` as `if count != 0 { sum as f64 / count as f64 } else { NULL }`.
///
/// TODO(discord9): add more illegal sql tests
#[tokio::test]
async fn test_tumble_composite() {
let engine = create_test_query_engine();
let sql =
"SELECT number, avg(number) FROM numbers_with_ts GROUP BY tumble(ts, '1 hour'), number";
let plan = sql_to_substrait(engine.clone(), sql).await;
let mut ctx = create_test_ctx();
let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan)
.await
.unwrap();
let aggr_exprs = vec![
AggregateExpr {
func: AggregateFunc::SumUInt64,
expr: ScalarExpr::Column(0),
distinct: false,
},
AggregateExpr {
func: AggregateFunc::Count,
expr: ScalarExpr::Column(1),
distinct: false,
},
];
// columns 3 and 4 of the reduce output are the sum and the count respectively
let avg_expr = ScalarExpr::If {
cond: Box::new(ScalarExpr::Column(4).call_binary(
ScalarExpr::Literal(Value::from(0i64), CDT::int64_datatype()),
BinaryFunc::NotEq,
)),
then: Box::new(
ScalarExpr::Column(3)
.cast(CDT::float64_datatype())
.call_binary(
ScalarExpr::Column(4).cast(CDT::float64_datatype()),
BinaryFunc::DivFloat64,
),
),
els: Box::new(ScalarExpr::Literal(Value::Null, CDT::float64_datatype())),
};
let expected = TypedPlan {
plan: Plan::Mfp {
input: Box::new(
Plan::Reduce {
input: Box::new(
Plan::Get {
id: crate::expr::Id::Global(GlobalId::User(1)),
}
.with_types(
RelationType::new(vec![
ColumnType::new(ConcreteDataType::uint32_datatype(), false),
ColumnType::new(
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
])
.into_named(vec![
Some("number".to_string()),
Some("ts".to_string()),
]),
)
.mfp(MapFilterProject::new(2).into_safe())
.unwrap(),
),
key_val_plan: KeyValPlan {
key_plan: MapFilterProject::new(2)
.map(vec![
ScalarExpr::Column(1).call_unary(
UnaryFunc::TumbleWindowFloor {
window_size: Duration::from_nanos(3_600_000_000_000),
start_time: None,
},
),
ScalarExpr::Column(1).call_unary(
UnaryFunc::TumbleWindowCeiling {
window_size: Duration::from_nanos(3_600_000_000_000),
start_time: None,
},
),
ScalarExpr::Column(0),
])
.unwrap()
.project(vec![2, 3, 4])
.unwrap()
.into_safe(),
val_plan: MapFilterProject::new(2)
.map(vec![
ScalarExpr::Column(0).cast(CDT::uint64_datatype()),
ScalarExpr::Column(0),
])
.unwrap()
.project(vec![2, 3])
.unwrap()
.into_safe(),
},
reduce_plan: ReducePlan::Accumulable(AccumulablePlan {
full_aggrs: aggr_exprs.clone(),
simple_aggrs: vec![
AggrWithIndex::new(aggr_exprs[0].clone(), 0, 0),
AggrWithIndex::new(aggr_exprs[1].clone(), 1, 1),
],
distinct_aggrs: vec![],
}),
}
.with_types(
RelationType::new(vec![
// keys
ColumnType::new(CDT::timestamp_millisecond_datatype(), true), // window start(time index)
ColumnType::new(CDT::timestamp_millisecond_datatype(), true), // window end(pk)
ColumnType::new(CDT::uint32_datatype(), false), // number(pk)
// values
ColumnType::new(CDT::uint64_datatype(), true), // avg.sum(number)
ColumnType::new(CDT::int64_datatype(), true), // avg.count(number)
])
.with_key(vec![1, 2])
.with_time_index(Some(0))
.into_named(vec![
None,
None,
Some("number".to_string()),
None,
None,
]),
),
),
mfp: MapFilterProject::new(5)
.map(vec![
ScalarExpr::Column(2), // number(pk)
avg_expr,
ScalarExpr::Column(0), // window start
ScalarExpr::Column(1), // window end
])
.unwrap()
.project(vec![5, 6, 7, 8])
.unwrap(),
},
schema: RelationType::new(vec![
ColumnType::new(CDT::uint32_datatype(), false), // number
ColumnType::new(CDT::float64_datatype(), true), // avg(number)
ColumnType::new(CDT::timestamp_millisecond_datatype(), true), // window start
ColumnType::new(CDT::timestamp_millisecond_datatype(), true), // window end
])
.with_key(vec![0, 3])
.with_time_index(Some(2))
.into_named(vec![
Some("number".to_string()),
Some("avg(numbers_with_ts.number)".to_string()),
Some("window_start".to_string()),
Some("window_end".to_string()),
]),
};
assert_eq!(flow_plan, expected);
}
/// Plans `sum(number)` grouped by `tumble(ts, '1 hour')` with the optional start-time
/// argument omitted, and compares the result against a hand-built `TypedPlan`.
///
/// The expected window functions carry a one-hour window size and `start_time: None`.
#[tokio::test]
async fn test_tumble_parse_optional() {
let engine = create_test_query_engine();
let sql = "SELECT sum(number) FROM numbers_with_ts GROUP BY tumble(ts, '1 hour')";
let plan = sql_to_substrait(engine.clone(), sql).await;
let mut ctx = create_test_ctx();
let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan)
.await
.unwrap();
let aggr_expr = AggregateExpr {
func: AggregateFunc::SumUInt64,
expr: ScalarExpr::Column(0),
distinct: false,
};
let expected = TypedPlan {
schema: RelationType::new(vec![
ColumnType::new(CDT::uint64_datatype(), true), // sum(number)
ColumnType::new(CDT::timestamp_millisecond_datatype(), true), // window start
ColumnType::new(CDT::timestamp_millisecond_datatype(), true), // window end
])
.with_key(vec![2])
.with_time_index(Some(1))
.into_named(vec![
Some("sum(numbers_with_ts.number)".to_string()),
Some("window_start".to_string()),
Some("window_end".to_string()),
]),
plan: Plan::Mfp {
input: Box::new(
Plan::Reduce {
input: Box::new(
Plan::Get {
id: crate::expr::Id::Global(GlobalId::User(1)),
}
.with_types(
RelationType::new(vec![
ColumnType::new(ConcreteDataType::uint32_datatype(), false),
ColumnType::new(
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
])
.into_named(vec![
Some("number".to_string()),
Some("ts".to_string()),
]),
)
.mfp(MapFilterProject::new(2).into_safe())
.unwrap(),
),
key_val_plan: KeyValPlan {
key_plan: MapFilterProject::new(2)
.map(vec![
ScalarExpr::Column(1).call_unary(
UnaryFunc::TumbleWindowFloor {
window_size: Duration::from_nanos(3_600_000_000_000),
start_time: None,
},
),
ScalarExpr::Column(1).call_unary(
UnaryFunc::TumbleWindowCeiling {
window_size: Duration::from_nanos(3_600_000_000_000),
start_time: None,
},
),
])
.unwrap()
.project(vec![2, 3])
.unwrap()
.into_safe(),
val_plan: MapFilterProject::new(2)
.map(vec![ScalarExpr::Column(0).cast(CDT::uint64_datatype())])
.unwrap()
.project(vec![2])
.unwrap()
.into_safe(),
},
reduce_plan: ReducePlan::Accumulable(AccumulablePlan {
full_aggrs: vec![aggr_expr.clone()],
simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)],
distinct_aggrs: vec![],
}),
}
.with_types(
RelationType::new(vec![
ColumnType::new(CDT::timestamp_millisecond_datatype(), true), // window start
ColumnType::new(CDT::timestamp_millisecond_datatype(), true), // window end
ColumnType::new(CDT::uint64_datatype(), true), //sum(number)
])
.with_key(vec![1])
.with_time_index(Some(0))
.into_named(vec![None, None, None]),
),
),
// reorder the reduce output (start, end, sum) into (sum, start, end)
mfp: MapFilterProject::new(3)
.map(vec![
ScalarExpr::Column(2),
ScalarExpr::Column(0),
ScalarExpr::Column(1),
])
.unwrap()
.project(vec![3, 4, 5])
.unwrap(),
},
};
assert_eq!(flow_plan, expected);
}
/// Plans `sum(number)` grouped by `tumble(ts, '1 hour', '2021-07-01 00:00:00')` and compares
/// the result against a hand-built `TypedPlan`.
///
/// The expected window functions carry a one-hour window size and the start time as
/// `Timestamp::new_millisecond(1625097600000)`.
#[tokio::test]
async fn test_tumble_parse() {
let engine = create_test_query_engine();
let sql = "SELECT sum(number) FROM numbers_with_ts GROUP BY tumble(ts, '1 hour', '2021-07-01 00:00:00')";
let plan = sql_to_substrait(engine.clone(), sql).await;
let mut ctx = create_test_ctx();
let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan)
.await
.unwrap();
let aggr_expr = AggregateExpr {
func: AggregateFunc::SumUInt64,
expr: ScalarExpr::Column(0),
distinct: false,
};
let expected = TypedPlan {
schema: RelationType::new(vec![
ColumnType::new(CDT::uint64_datatype(), true), // sum(number)
ColumnType::new(CDT::timestamp_millisecond_datatype(), true), // window start
ColumnType::new(CDT::timestamp_millisecond_datatype(), true), // window end
])
.with_key(vec![2])
.with_time_index(Some(1))
.into_named(vec![
Some("sum(numbers_with_ts.number)".to_string()),
Some("window_start".to_string()),
Some("window_end".to_string()),
]),
plan: Plan::Mfp {
input: Box::new(
Plan::Reduce {
input: Box::new(
Plan::Get {
id: crate::expr::Id::Global(GlobalId::User(1)),
}
.with_types(
RelationType::new(vec![
ColumnType::new(ConcreteDataType::uint32_datatype(), false),
ColumnType::new(
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
])
.into_named(vec![
Some("number".to_string()),
Some("ts".to_string()),
]),
)
.mfp(MapFilterProject::new(2).into_safe())
.unwrap(),
),
key_val_plan: KeyValPlan {
key_plan: MapFilterProject::new(2)
.map(vec![
ScalarExpr::Column(1).call_unary(
UnaryFunc::TumbleWindowFloor {
window_size: Duration::from_nanos(3_600_000_000_000),
start_time: Some(Timestamp::new_millisecond(
1625097600000,
)),
},
),
ScalarExpr::Column(1).call_unary(
UnaryFunc::TumbleWindowCeiling {
window_size: Duration::from_nanos(3_600_000_000_000),
start_time: Some(Timestamp::new_millisecond(
1625097600000,
)),
},
),
])
.unwrap()
.project(vec![2, 3])
.unwrap()
.into_safe(),
val_plan: MapFilterProject::new(2)
.map(vec![ScalarExpr::Column(0).cast(CDT::uint64_datatype())])
.unwrap()
.project(vec![2])
.unwrap()
.into_safe(),
},
reduce_plan: ReducePlan::Accumulable(AccumulablePlan {
full_aggrs: vec![aggr_expr.clone()],
simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)],
distinct_aggrs: vec![],
}),
}
.with_types(
RelationType::new(vec![
ColumnType::new(CDT::timestamp_millisecond_datatype(), true), // window start
ColumnType::new(CDT::timestamp_millisecond_datatype(), true), // window end
ColumnType::new(CDT::uint64_datatype(), true), //sum(number)
])
.with_key(vec![1])
.with_time_index(Some(0))
.into_unnamed(),
),
),
// reorder the reduce output (start, end, sum) into (sum, start, end)
mfp: MapFilterProject::new(3)
.map(vec![
ScalarExpr::Column(2),
ScalarExpr::Column(0),
ScalarExpr::Column(1),
])
.unwrap()
.project(vec![3, 4, 5])
.unwrap(),
},
};
assert_eq!(flow_plan, expected);
}
/// Plans `avg(number), number` grouped by `number` on the `numbers` table (no tumble window)
/// and compares the result against a hand-built `TypedPlan`.
///
/// `avg` is expected as separate sum and count aggregates in the `Reduce`, recombined in the
/// outer `Mfp` as `if count != 0 { sum as f64 / count as f64 } else { NULL }`.
#[tokio::test]
async fn test_avg_group_by() {
let engine = create_test_query_engine();
let sql = "SELECT avg(number), number FROM numbers GROUP BY number";
let plan = sql_to_substrait(engine.clone(), sql).await;
let mut ctx = create_test_ctx();
// the planning result is kept as a `Result` here and unwrapped in the final assertion
let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan).await;
let aggr_exprs = vec![
AggregateExpr {
func: AggregateFunc::SumUInt64,
expr: ScalarExpr::Column(0),
distinct: false,
},
AggregateExpr {
func: AggregateFunc::Count,
expr: ScalarExpr::Column(1),
distinct: false,
},
];
// columns 1 and 2 of the reduce output are the sum and the count respectively
let avg_expr = ScalarExpr::If {
cond: Box::new(ScalarExpr::Column(2).call_binary(
ScalarExpr::Literal(Value::from(0i64), CDT::int64_datatype()),
BinaryFunc::NotEq,
)),
then: Box::new(
ScalarExpr::Column(1)
.cast(CDT::float64_datatype())
.call_binary(
ScalarExpr::Column(2).cast(CDT::float64_datatype()),
BinaryFunc::DivFloat64,
),
),
els: Box::new(ScalarExpr::Literal(Value::Null, CDT::float64_datatype())),
};
let expected = TypedPlan {
schema: RelationType::new(vec![
ColumnType::new(CDT::float64_datatype(), true), // avg(number: u32) -> f64
ColumnType::new(CDT::uint32_datatype(), false), // number
])
.with_key(vec![1])
.into_named(vec![
Some("avg(numbers.number)".to_string()),
Some("number".to_string()),
]),
plan: Plan::Mfp {
input: Box::new(
Plan::Reduce {
input: Box::new(
Plan::Get {
id: crate::expr::Id::Global(GlobalId::User(0)),
}
.with_types(
RelationType::new(vec![ColumnType::new(
ConcreteDataType::uint32_datatype(),
false,
)])
.into_named(vec![Some("number".to_string())]),
)
.mfp(
MapFilterProject::new(1)
.project(vec![0])
.unwrap()
.into_safe(),
)
.unwrap(),
),
key_val_plan: KeyValPlan {
key_plan: MapFilterProject::new(1)
.map(vec![ScalarExpr::Column(0)])
.unwrap()
.project(vec![1])
.unwrap()
.into_safe(),
val_plan: MapFilterProject::new(1)
.map(vec![
ScalarExpr::Column(0).cast(CDT::uint64_datatype()),
ScalarExpr::Column(0),
])
.unwrap()
.project(vec![1, 2])
.unwrap()
.into_safe(),
},
reduce_plan: ReducePlan::Accumulable(AccumulablePlan {
full_aggrs: aggr_exprs.clone(),
simple_aggrs: vec![
AggrWithIndex::new(aggr_exprs[0].clone(), 0, 0),
AggrWithIndex::new(aggr_exprs[1].clone(), 1, 1),
],
distinct_aggrs: vec![],
}),
}
.with_types(
RelationType::new(vec![
ColumnType::new(ConcreteDataType::uint32_datatype(), false), // key: number
ColumnType::new(ConcreteDataType::uint64_datatype(), true), // sum
ColumnType::new(ConcreteDataType::int64_datatype(), true), // count
])
.with_key(vec![0])
.into_named(vec![
Some("number".to_string()),
None,
None,
]),
),
),
mfp: MapFilterProject::new(3)
.map(vec![
avg_expr, // col 3
ScalarExpr::Column(0),
// TODO(discord9): optimize mfp so to remove indirect ref
])
.unwrap()
.project(vec![3, 4])
.unwrap(),
},
};
assert_eq!(flow_plan.unwrap(), expected);
}
#[tokio::test]
async fn test_avg() {
let engine = create_test_query_engine();
let sql = "SELECT avg(number) FROM numbers";
let plan = sql_to_substrait(engine.clone(), sql).await;
let mut ctx = create_test_ctx();
let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan)
.await
.unwrap();
let aggr_exprs = vec![
AggregateExpr {
func: AggregateFunc::SumUInt64,
expr: ScalarExpr::Column(0),
distinct: false,
},
AggregateExpr {
func: AggregateFunc::Count,
expr: ScalarExpr::Column(1),
distinct: false,
},
];
let avg_expr = ScalarExpr::If {
cond: Box::new(ScalarExpr::Column(1).call_binary(
ScalarExpr::Literal(Value::from(0i64), CDT::int64_datatype()),
BinaryFunc::NotEq,
)),
then: Box::new(
ScalarExpr::Column(0)
.cast(CDT::float64_datatype())
.call_binary(
ScalarExpr::Column(1).cast(CDT::float64_datatype()),
BinaryFunc::DivFloat64,
),
),
els: Box::new(ScalarExpr::Literal(Value::Null, CDT::float64_datatype())),
};
let input = Box::new(
Plan::Get {
id: crate::expr::Id::Global(GlobalId::User(0)),
}
.with_types(
RelationType::new(vec![ColumnType::new(
ConcreteDataType::uint32_datatype(),
false,
)])
.into_named(vec![Some("number".to_string())]),
),
);
let expected = TypedPlan {
schema: RelationType::new(vec![ColumnType::new(CDT::float64_datatype(), true)])
.into_named(vec![Some("avg(numbers.number)".to_string())]),
plan: Plan::Mfp {
input: Box::new(
Plan::Reduce {
input: Box::new(
Plan::Mfp {
input: input.clone(),
mfp: MapFilterProject::new(1).project(vec![0]).unwrap(),
}
.with_types(
RelationType::new(vec![ColumnType::new(
CDT::uint32_datatype(),
false,
)])
.into_named(vec![Some("number".to_string())]),
),
),
key_val_plan: KeyValPlan {
key_plan: MapFilterProject::new(1)
.project(vec![])
.unwrap()
.into_safe(),
val_plan: MapFilterProject::new(1)
.map(vec![
ScalarExpr::Column(0).cast(CDT::uint64_datatype()),
ScalarExpr::Column(0),
])
.unwrap()
.project(vec![1, 2])
.unwrap()
.into_safe(),
},
reduce_plan: ReducePlan::Accumulable(AccumulablePlan {
full_aggrs: aggr_exprs.clone(),
simple_aggrs: vec![
AggrWithIndex::new(aggr_exprs[0].clone(), 0, 0),
AggrWithIndex::new(aggr_exprs[1].clone(), 1, 1),
],
distinct_aggrs: vec![],
}),
}
.with_types(
RelationType::new(vec![
ColumnType::new(ConcreteDataType::uint64_datatype(), true), // sum
ColumnType::new(ConcreteDataType::int64_datatype(), true), // count
])
.into_named(vec![None, None]),
),
),
mfp: MapFilterProject::new(2)
.map(vec![
avg_expr,
// TODO(discord9): optimize mfp so to remove indirect ref
])
.unwrap()
.project(vec![2])
.unwrap(),
},
};
assert_eq!(flow_plan, expected);
}
#[tokio::test]
async fn test_sum() {
let engine = create_test_query_engine();

View File

@@ -0,0 +1,126 @@
CREATE TABLE sensor_readings (
`value` DOUBLE,
ts TIMESTAMP TIME INDEX,
sensor STRING,
loc STRING,
PRIMARY KEY (sensor, loc)
);
Affected Rows: 0
CREATE TABLE sensor_readings_avg (
`value` DOUBLE,
ts TIMESTAMP TIME INDEX,
sensor STRING,
PRIMARY KEY (sensor)
);
Affected Rows: 0
INSERT INTO sensor_readings VALUES
(20, now() - '30s'::interval, 'test', 'A');
Affected Rows: 1
-- SQLNESS REPLACE (\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}) TS
TQL EVAL (now() - '1m'::interval, now(), '1m')
avg by(sensor) (sensor_readings) AS value;
+-------+--------+---------------------+
| value | sensor | ts |
+-------+--------+---------------------+
| 20.0 | test | TS |
+-------+--------+---------------------+
-- SQLNESS REPLACE (\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}) TS
TQL EVAL (now() - '1m'::interval, now(), '1m') (sum by(sensor) (sensor_readings) / count by(sensor) (sensor_readings)) AS value;
+-------+--------+---------------------+
| value | sensor | ts |
+-------+--------+---------------------+
| 20.0 | test | TS |
+-------+--------+---------------------+
CREATE FLOW sensor_readings_avg_flow
SINK TO sensor_readings_avg
EVAL INTERVAL '1m' AS
TQL EVAL (now() - '1m'::interval, now(), '1m')
avg by(sensor) (sensor_readings) AS value;
Affected Rows: 0
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('sensor_readings_avg_flow');
+----------------------------------------------+
| ADMIN FLUSH_FLOW('sensor_readings_avg_flow') |
+----------------------------------------------+
| FLOW_FLUSHED |
+----------------------------------------------+
-- SQLNESS REPLACE (\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}) TS
SELECT * FROM sensor_readings_avg ORDER BY ts DESC LIMIT 1;
+-------+---------------------+--------+
| value | ts | sensor |
+-------+---------------------+--------+
| 20.0 | TS | test |
+-------+---------------------+--------+
DROP FLOW sensor_readings_avg_flow;
Affected Rows: 0
-- SQLNESS SLEEP 1s
INSERT INTO sensor_readings VALUES
(30, now() - '40s'::interval, 'test', 'B');
Affected Rows: 1
-- SQLNESS REPLACE (\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}) TS
TQL EVAL (now() - '1m'::interval, now(), '1m')
avg by(sensor) (sensor_readings) AS value;
+-------+--------+---------------------+
| value | sensor | ts |
+-------+--------+---------------------+
| 25.0 | test | TS |
+-------+--------+---------------------+
CREATE FLOW sensor_readings_avg_flow
SINK TO sensor_readings_avg
EVAL INTERVAL '1m' AS
TQL EVAL (now() - '1m'::interval, now(), '1m') (sum by(sensor) (sensor_readings) / count by(sensor) (sensor_readings)) AS value;
Affected Rows: 0
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('sensor_readings_avg_flow');
+----------------------------------------------+
| ADMIN FLUSH_FLOW('sensor_readings_avg_flow') |
+----------------------------------------------+
| FLOW_FLUSHED |
+----------------------------------------------+
-- SQLNESS REPLACE (\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}) TS
SELECT * FROM sensor_readings_avg ORDER BY ts DESC LIMIT 1;
+-------+---------------------+--------+
| value | ts | sensor |
+-------+---------------------+--------+
| 25.0 | TS | test |
+-------+---------------------+--------+
DROP FLOW sensor_readings_avg_flow;
Affected Rows: 0
DROP TABLE sensor_readings_avg;
Affected Rows: 0
DROP TABLE sensor_readings;
Affected Rows: 0

View File

@@ -0,0 +1,63 @@
CREATE TABLE sensor_readings (
`value` DOUBLE,
ts TIMESTAMP TIME INDEX,
sensor STRING,
loc STRING,
PRIMARY KEY (sensor, loc)
);
CREATE TABLE sensor_readings_avg (
`value` DOUBLE,
ts TIMESTAMP TIME INDEX,
sensor STRING,
PRIMARY KEY (sensor)
);
INSERT INTO sensor_readings VALUES
(20, now() - '30s'::interval, 'test', 'A');
-- SQLNESS REPLACE (\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}) TS
TQL EVAL (now() - '1m'::interval, now(), '1m')
avg by(sensor) (sensor_readings) AS value;
-- SQLNESS REPLACE (\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}) TS
TQL EVAL (now() - '1m'::interval, now(), '1m') (sum by(sensor) (sensor_readings) / count by(sensor) (sensor_readings)) AS value;
CREATE FLOW sensor_readings_avg_flow
SINK TO sensor_readings_avg
EVAL INTERVAL '1m' AS
TQL EVAL (now() - '1m'::interval, now(), '1m')
avg by(sensor) (sensor_readings) AS value;
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('sensor_readings_avg_flow');
-- SQLNESS REPLACE (\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}) TS
SELECT * FROM sensor_readings_avg ORDER BY ts DESC LIMIT 1;
DROP FLOW sensor_readings_avg_flow;
-- SQLNESS SLEEP 1s
INSERT INTO sensor_readings VALUES
(30, now() - '40s'::interval, 'test', 'B');
-- SQLNESS REPLACE (\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}) TS
TQL EVAL (now() - '1m'::interval, now(), '1m')
avg by(sensor) (sensor_readings) AS value;
CREATE FLOW sensor_readings_avg_flow
SINK TO sensor_readings_avg
EVAL INTERVAL '1m' AS
TQL EVAL (now() - '1m'::interval, now(), '1m') (sum by(sensor) (sensor_readings) / count by(sensor) (sensor_readings)) AS value;
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('sensor_readings_avg_flow');
-- SQLNESS REPLACE (\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}) TS
SELECT * FROM sensor_readings_avg ORDER BY ts DESC LIMIT 1;
DROP FLOW sensor_readings_avg_flow;
DROP TABLE sensor_readings_avg;
DROP TABLE sensor_readings;