diff --git a/src/flow/src/df_optimizer.rs b/src/flow/src/df_optimizer.rs index 1d41d09346..614b79ccf1 100644 --- a/src/flow/src/df_optimizer.rs +++ b/src/flow/src/df_optimizer.rs @@ -16,30 +16,19 @@ #![warn(unused)] -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use std::sync::Arc; use common_error::ext::BoxedError; use common_telemetry::debug; use datafusion::config::ConfigOptions; use datafusion::error::DataFusionError; -use datafusion::functions_aggregate::count::count_udaf; -use datafusion::functions_aggregate::sum::sum_udaf; use datafusion::optimizer::analyzer::type_coercion::TypeCoercion; use datafusion::optimizer::common_subexpr_eliminate::CommonSubexprEliminate; use datafusion::optimizer::optimize_projections::OptimizeProjections; use datafusion::optimizer::simplify_expressions::SimplifyExpressions; -use datafusion::optimizer::utils::NamePreserver; use datafusion::optimizer::{Analyzer, AnalyzerRule, Optimizer, OptimizerContext}; -use datafusion_common::tree_node::{ - Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter, TreeNodeVisitor, -}; -use datafusion_common::{Column, DFSchema, ScalarValue}; -use datafusion_expr::utils::merge_schema; -use datafusion_expr::{ - BinaryExpr, ColumnarValue, Expr, Literal, Operator, Projection, ScalarFunctionArgs, - ScalarUDFImpl, Signature, TypeSignature, Volatility, -}; +use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor}; use query::QueryEngine; use query::optimizer::count_wildcard::CountWildcardToTimeIndexRule; use query::parser::QueryLanguageParser; @@ -52,7 +41,6 @@ use substrait::DFLogicalSubstraitConvertor; use crate::adapter::FlownodeContext; use crate::error::{DatafusionSnafu, Error, ExternalSnafu, UnexpectedSnafu}; -use crate::expr::{TUMBLE_END, TUMBLE_START}; use crate::plan::TypedPlan; // TODO(discord9): use `Analyzer` to manage rules if more `AnalyzerRule` is needed @@ -63,8 +51,6 @@ pub async fn apply_df_optimizer( let cfg = query_ctx.create_config_options(); let analyzer = Analyzer::with_rules(vec![ Arc::new(CountWildcardToTimeIndexRule), - Arc::new(AvgExpandRule), - Arc::new(TumbleExpandRule), Arc::new(CheckGroupByRule::new()), Arc::new(TypeCoercion::new()), ]); @@ -127,390 +113,6 @@ pub async fn sql_to_flow_plan( Ok(flow_plan) } -#[derive(Debug)] -struct AvgExpandRule; - -impl AnalyzerRule for AvgExpandRule { - fn analyze( - &self, - plan: datafusion_expr::LogicalPlan, - _config: &ConfigOptions, - ) -> datafusion_common::Result { - let transformed = plan - .transform_up_with_subqueries(expand_avg_analyzer)? - .data - .transform_down_with_subqueries(put_aggr_to_proj_analyzer)? - .data; - Ok(transformed) - } - - fn name(&self) -> &str { - "avg_expand" - } -} - -/// lift aggr's composite aggr_expr to outer proj, and leave aggr only with simple direct aggr expr -/// i.e. -/// ```ignore -/// proj: avg(x) -/// -- aggr: [sum(x)/count(x) as avg(x)] -/// ``` -/// becomes: -/// ```ignore -/// proj: sum(x)/count(x) as avg(x) -/// -- aggr: [sum(x), count(x)] -/// ``` -fn put_aggr_to_proj_analyzer( - plan: datafusion_expr::LogicalPlan, -) -> Result, DataFusionError> { - if let datafusion_expr::LogicalPlan::Projection(proj) = &plan - && let datafusion_expr::LogicalPlan::Aggregate(aggr) = proj.input.as_ref() - { - let mut replace_old_proj_exprs = HashMap::new(); - let mut expanded_aggr_exprs = vec![]; - for aggr_expr in &aggr.aggr_expr { - let mut is_composite = false; - if let Expr::AggregateFunction(_) = &aggr_expr { - expanded_aggr_exprs.push(aggr_expr.clone()); - } else { - let old_name = aggr_expr.name_for_alias()?; - let new_proj_expr = aggr_expr - .clone() - .transform(|ch| { - if let Expr::AggregateFunction(_) = &ch { - is_composite = true; - expanded_aggr_exprs.push(ch.clone()); - Ok(Transformed::yes(Expr::Column(Column::from_qualified_name( - ch.name_for_alias()?, - )))) - } else { - Ok(Transformed::no(ch)) - } - })? - .data; - replace_old_proj_exprs.insert(old_name, new_proj_expr); - } - } - - if expanded_aggr_exprs.len() > aggr.aggr_expr.len() { - let mut aggr = aggr.clone(); - aggr.aggr_expr = expanded_aggr_exprs; - let mut aggr_plan = datafusion_expr::LogicalPlan::Aggregate(aggr); - // important to recompute schema after changing aggr_expr - aggr_plan = aggr_plan.recompute_schema()?; - - // reconstruct proj with new proj_exprs - let mut new_proj_exprs = proj.expr.clone(); - for proj_expr in new_proj_exprs.iter_mut() { - if let Some(new_proj_expr) = - replace_old_proj_exprs.get(&proj_expr.name_for_alias()?) - { - *proj_expr = new_proj_expr.clone(); - } - *proj_expr = proj_expr - .clone() - .transform(|expr| { - if let Some(new_expr) = replace_old_proj_exprs.get(&expr.name_for_alias()?) - { - Ok(Transformed::yes(new_expr.clone())) - } else { - Ok(Transformed::no(expr)) - } - })? - .data; - } - let proj = datafusion_expr::LogicalPlan::Projection(Projection::try_new( - new_proj_exprs, - Arc::new(aggr_plan), - )?); - return Ok(Transformed::yes(proj)); - } - } - Ok(Transformed::no(plan)) -} - -/// expand `avg()` function into `cast(sum(() AS f64)/count(()` -fn expand_avg_analyzer( - plan: datafusion_expr::LogicalPlan, -) -> Result, DataFusionError> { - let mut schema = merge_schema(&plan.inputs()); - - if let datafusion_expr::LogicalPlan::TableScan(ts) = &plan { - let source_schema = - DFSchema::try_from_qualified_schema(ts.table_name.clone(), &ts.source.schema())?; - schema.merge(&source_schema); - } - - let mut expr_rewrite = ExpandAvgRewriter::new(&schema); - - let name_preserver = NamePreserver::new(&plan); - // apply coercion rewrite all expressions in the plan individually - plan.map_expressions(|expr| { - let original_name = name_preserver.save(&expr); - Ok(expr - .rewrite(&mut expr_rewrite)? - .update_data(|expr| original_name.restore(expr))) - })? - .map_data(|plan| plan.recompute_schema()) -} - -/// rewrite `avg()` function into `CASE WHEN count() !=0 THEN cast(sum(() AS avg_return_type)/count(() ELSE 0` -/// -/// TODO(discord9): support avg return type decimal128 -/// -/// see impl details at https://github.com/apache/datafusion/blob/4ad4f90d86c57226a4e0fb1f79dfaaf0d404c273/datafusion/expr/src/type_coercion/aggregates.rs#L457-L462 -pub(crate) struct ExpandAvgRewriter<'a> { - /// schema of the plan - #[allow(unused)] - pub(crate) schema: &'a DFSchema, -} - -impl<'a> ExpandAvgRewriter<'a> { - fn new(schema: &'a DFSchema) -> Self { - Self { schema } - } -} - -impl TreeNodeRewriter for ExpandAvgRewriter<'_> { - type Node = Expr; - - fn f_up(&mut self, expr: Expr) -> Result, DataFusionError> { - if let Expr::AggregateFunction(aggr_func) = &expr - && aggr_func.func.name() == "avg" - { - let sum_expr = { - let mut tmp = aggr_func.clone(); - tmp.func = sum_udaf(); - Expr::AggregateFunction(tmp) - }; - let sum_cast = { - let mut tmp = sum_expr.clone(); - tmp = Expr::Cast(datafusion_expr::Cast { - expr: Box::new(tmp), - data_type: arrow_schema::DataType::Float64, - }); - tmp - }; - - let count_expr = { - let mut tmp = aggr_func.clone(); - tmp.func = count_udaf(); - - Expr::AggregateFunction(tmp) - }; - let count_expr_ref = - Expr::Column(Column::from_qualified_name(count_expr.name_for_alias()?)); - - let div = BinaryExpr::new(Box::new(sum_cast), Operator::Divide, Box::new(count_expr)); - let div_expr = Box::new(Expr::BinaryExpr(div)); - - let zero = Box::new(0.lit()); - let not_zero = BinaryExpr::new(Box::new(count_expr_ref), Operator::NotEq, zero.clone()); - let not_zero = Box::new(Expr::BinaryExpr(not_zero)); - let null = Box::new(Expr::Literal(ScalarValue::Null, None)); - - let case_when = - datafusion_expr::Case::new(None, vec![(not_zero, div_expr)], Some(null)); - let case_when_expr = Expr::Case(case_when); - - return Ok(Transformed::yes(case_when_expr)); - } - - Ok(Transformed::no(expr)) - } -} - -/// expand tumble in aggr expr to tumble_start and tumble_end with column name like `window_start` -#[derive(Debug)] -struct TumbleExpandRule; - -impl AnalyzerRule for TumbleExpandRule { - fn analyze( - &self, - plan: datafusion_expr::LogicalPlan, - _config: &ConfigOptions, - ) -> datafusion_common::Result { - let transformed = plan - .transform_up_with_subqueries(expand_tumble_analyzer)? - .data; - Ok(transformed) - } - - fn name(&self) -> &str { - "tumble_expand" - } -} - -/// expand `tumble` in aggr expr to `tumble_start` and `tumble_end`, also expand related alias and column ref -/// -/// will add `tumble_start` and `tumble_end` to outer projection if not exist before -fn expand_tumble_analyzer( - plan: datafusion_expr::LogicalPlan, -) -> Result, DataFusionError> { - if let datafusion_expr::LogicalPlan::Projection(proj) = &plan - && let datafusion_expr::LogicalPlan::Aggregate(aggr) = proj.input.as_ref() - { - let mut new_group_expr = vec![]; - let mut alias_to_expand = HashMap::new(); - let mut encountered_tumble = false; - for expr in aggr.group_expr.iter() { - match expr { - datafusion_expr::Expr::ScalarFunction(func) if func.name() == "tumble" => { - encountered_tumble = true; - - let tumble_start = TumbleExpand::new(TUMBLE_START); - let tumble_start = datafusion_expr::expr::ScalarFunction::new_udf( - Arc::new(tumble_start.into()), - func.args.clone(), - ); - let tumble_start = datafusion_expr::Expr::ScalarFunction(tumble_start); - let start_col_name = tumble_start.name_for_alias()?; - new_group_expr.push(tumble_start); - - let tumble_end = TumbleExpand::new(TUMBLE_END); - let tumble_end = datafusion_expr::expr::ScalarFunction::new_udf( - Arc::new(tumble_end.into()), - func.args.clone(), - ); - let tumble_end = datafusion_expr::Expr::ScalarFunction(tumble_end); - let end_col_name = tumble_end.name_for_alias()?; - new_group_expr.push(tumble_end); - - alias_to_expand.insert(expr.name_for_alias()?, (start_col_name, end_col_name)); - } - _ => new_group_expr.push(expr.clone()), - } - } - if !encountered_tumble { - return Ok(Transformed::no(plan)); - } - let mut new_aggr = aggr.clone(); - new_aggr.group_expr = new_group_expr; - let new_aggr = datafusion_expr::LogicalPlan::Aggregate(new_aggr).recompute_schema()?; - // replace alias in projection if needed, and add new column ref if necessary - let mut new_proj_expr = vec![]; - let mut have_expanded = false; - - for proj_expr in proj.expr.iter() { - if let Some((start_col_name, end_col_name)) = - alias_to_expand.get(&proj_expr.name_for_alias()?) - { - let start_col = Column::from_qualified_name(start_col_name); - let end_col = Column::from_qualified_name(end_col_name); - new_proj_expr.push(datafusion_expr::Expr::Column(start_col)); - new_proj_expr.push(datafusion_expr::Expr::Column(end_col)); - have_expanded = true; - } else { - new_proj_expr.push(proj_expr.clone()); - } - } - - // append to end of projection if not exist - if !have_expanded { - for (start_col_name, end_col_name) in alias_to_expand.values() { - let start_col = Column::from_qualified_name(start_col_name); - let end_col = Column::from_qualified_name(end_col_name); - new_proj_expr.push(datafusion_expr::Expr::Column(start_col).alias("window_start")); - new_proj_expr.push(datafusion_expr::Expr::Column(end_col).alias("window_end")); - } - } - - let new_proj = datafusion_expr::LogicalPlan::Projection(Projection::try_new( - new_proj_expr, - Arc::new(new_aggr), - )?); - return Ok(Transformed::yes(new_proj)); - } - - Ok(Transformed::no(plan)) -} - -/// This is a placeholder for tumble_start and tumble_end function, so that datafusion can -/// recognize them as scalar function -#[derive(Debug, PartialEq, Eq, Hash)] -pub struct TumbleExpand { - signature: Signature, - name: String, -} - -impl TumbleExpand { - pub fn new(name: &str) -> Self { - Self { - signature: Signature::new(TypeSignature::UserDefined, Volatility::Immutable), - name: name.to_string(), - } - } -} - -impl ScalarUDFImpl for TumbleExpand { - fn as_any(&self) -> &dyn std::any::Any { - self - } - - fn name(&self) -> &str { - &self.name - } - - /// elide the signature for now - fn signature(&self) -> &Signature { - &self.signature - } - - fn coerce_types( - &self, - arg_types: &[arrow_schema::DataType], - ) -> datafusion_common::Result> { - match (arg_types.first(), arg_types.get(1), arg_types.get(2)) { - (Some(ts), Some(window), opt) => { - use arrow_schema::DataType::*; - if !matches!(ts, Date32 | Timestamp(_, _)) { - return Err(DataFusionError::Plan( - format!("Expect timestamp column as first arg for tumble_start, found {:?}", ts) - )); - } - if !matches!(window, Utf8 | Interval(_)) { - return Err(DataFusionError::Plan( - format!("Expect second arg for window size's type being interval for tumble_start, found {:?}", window), - )); - } - - if let Some(start_time) = opt - && !matches!(start_time, Utf8 | Date32 | Timestamp(_, _)){ - return Err(DataFusionError::Plan( - format!("Expect start_time to either be date, timestamp or string, found {:?}", start_time) - )); - } - - Ok(arg_types.to_vec()) - } - _ => Err(DataFusionError::Plan( - "Expect tumble function have at least two arg(timestamp column and window size) and a third optional arg for starting time".to_string(), - )), - } - } - - fn return_type( - &self, - arg_types: &[arrow_schema::DataType], - ) -> Result { - arg_types.first().cloned().ok_or_else(|| { - DataFusionError::Plan( - "Expect tumble function have at least two arg(timestamp column and window size)" - .to_string(), - ) - }) - } - - fn invoke_with_args( - &self, - _args: ScalarFunctionArgs, - ) -> datafusion_common::Result { - Err(DataFusionError::Plan( - "This function should not be executed by datafusion".to_string(), - )) - } -} - /// This rule check all group by exprs, and make sure they are also in select clause in a aggr query #[derive(Debug)] struct CheckGroupByRule {} diff --git a/src/flow/src/transform/aggr.rs b/src/flow/src/transform/aggr.rs index 579f0e8ee3..861ca8fe65 100644 --- a/src/flow/src/transform/aggr.rs +++ b/src/flow/src/transform/aggr.rs @@ -382,10 +382,9 @@ impl TypedPlan { #[cfg(test)] mod test { - use std::time::Duration; use bytes::BytesMut; - use common_time::{IntervalMonthDayNano, Timestamp}; + use common_time::IntervalMonthDayNano; use datatypes::data_type::ConcreteDataType as CDT; use datatypes::prelude::ConcreteDataType; use datatypes::value::Value; @@ -397,898 +396,6 @@ mod test { use crate::repr::{ColumnType, RelationType}; use crate::transform::test::{create_test_ctx, create_test_query_engine, sql_to_substrait}; - #[tokio::test] - async fn test_df_func_basic() { - let engine = create_test_query_engine(); - let sql = "SELECT sum(abs(number)) FROM numbers_with_ts GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');"; - let plan = sql_to_substrait(engine.clone(), sql).await; - - let mut ctx = create_test_ctx(); - let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan) - .await - .unwrap(); - - let aggr_expr = AggregateExpr { - func: AggregateFunc::SumUInt64, - expr: ScalarExpr::Column(0), - distinct: false, - }; - let expected = - TypedPlan { - schema: RelationType::new(vec![ - ColumnType::new(CDT::uint64_datatype(), true), // sum(number) - ColumnType::new(CDT::timestamp_millisecond_datatype(), true), // window start - ColumnType::new(CDT::timestamp_millisecond_datatype(), true), // window end - ]) - .with_key(vec![2]) - .with_time_index(Some(1)) - .into_named(vec![ - Some("sum(abs(numbers_with_ts.number))".to_string()), - Some("window_start".to_string()), - Some("window_end".to_string()), - ]), - plan: Plan::Mfp { - input: Box::new( - Plan::Reduce { - input: Box::new( - Plan::Get { - id: crate::expr::Id::Global(GlobalId::User(1)), - } - .with_types( - RelationType::new(vec![ - ColumnType::new(ConcreteDataType::uint32_datatype(), false), - ColumnType::new( - ConcreteDataType::timestamp_millisecond_datatype(), - false, - ), - ]) - .into_named(vec![ - Some("number".to_string()), - Some("ts".to_string()), - ]), - ) - .mfp(MapFilterProject::new(2).into_safe()) - .unwrap(), - ), - key_val_plan: KeyValPlan { - key_plan: MapFilterProject::new(2) - .map(vec![ - ScalarExpr::Column(1).call_unary( - UnaryFunc::TumbleWindowFloor { - window_size: Duration::from_nanos(1_000_000_000), - start_time: Some(Timestamp::new_millisecond( - 1625097600000, - )), - }, - ), - ScalarExpr::Column(1).call_unary( - UnaryFunc::TumbleWindowCeiling { - window_size: Duration::from_nanos(1_000_000_000), - start_time: Some(Timestamp::new_millisecond( - 1625097600000, - )), - }, - ), - ]) - .unwrap() - .project(vec![2, 3]) - .unwrap() - .into_safe(), - val_plan: MapFilterProject::new(2) - .map(vec![ScalarExpr::CallDf { - df_scalar_fn: DfScalarFunction::try_from_raw_fn( - RawDfScalarFn { - f: BytesMut::from( - b"\x08\x02\"\x08\x1a\x06\x12\x04\n\x02\x12\0" - .as_ref(), - ), - input_schema: RelationType::new(vec![ColumnType::new( - ConcreteDataType::uint32_datatype(), - false, - )]) - .into_unnamed(), - extensions: FunctionExtensions::from_iter( - [ - (0, "tumble_start".to_string()), - (1, "tumble_end".to_string()), - (2, "abs".to_string()), - (3, "sum".to_string()), - ] - .into_iter(), - ), - }, - ) - .await - .unwrap(), - exprs: vec![ScalarExpr::Column(0)], - } - .cast(CDT::uint64_datatype())]) - .unwrap() - .project(vec![2]) - .unwrap() - .into_safe(), - }, - reduce_plan: ReducePlan::Accumulable(AccumulablePlan { - full_aggrs: vec![aggr_expr.clone()], - simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)], - distinct_aggrs: vec![], - }), - } - .with_types( - RelationType::new(vec![ - ColumnType::new(CDT::timestamp_millisecond_datatype(), true), // window start - ColumnType::new(CDT::timestamp_millisecond_datatype(), true), // window end - ColumnType::new(CDT::uint64_datatype(), true), //sum(number) - ]) - .with_key(vec![1]) - .with_time_index(Some(0)) - .into_unnamed(), - ), - ), - mfp: MapFilterProject::new(3) - .map(vec![ - ScalarExpr::Column(2), - ScalarExpr::Column(0), - ScalarExpr::Column(1), - ]) - .unwrap() - .project(vec![3, 4, 5]) - .unwrap(), - }, - }; - assert_eq!(flow_plan, expected); - } - - #[tokio::test] - async fn test_df_func_expr_tree() { - let engine = create_test_query_engine(); - let sql = "SELECT abs(sum(number)) FROM numbers_with_ts GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');"; - let plan = sql_to_substrait(engine.clone(), sql).await; - - let mut ctx = create_test_ctx(); - let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan) - .await - .unwrap(); - - let aggr_expr = AggregateExpr { - func: AggregateFunc::SumUInt64, - expr: ScalarExpr::Column(0), - distinct: false, - }; - let expected = TypedPlan { - schema: RelationType::new(vec![ - ColumnType::new(CDT::uint64_datatype(), true), // sum(number) - ColumnType::new(CDT::timestamp_millisecond_datatype(), true), // window start - ColumnType::new(CDT::timestamp_millisecond_datatype(), true), // window end - ]) - .with_key(vec![2]) - .with_time_index(Some(1)) - .into_named(vec![ - Some("abs(sum(numbers_with_ts.number))".to_string()), - Some("window_start".to_string()), - Some("window_end".to_string()), - ]), - plan: Plan::Mfp { - input: Box::new( - Plan::Reduce { - input: Box::new( - Plan::Get { - id: crate::expr::Id::Global(GlobalId::User(1)), - } - .with_types( - RelationType::new(vec![ - ColumnType::new(ConcreteDataType::uint32_datatype(), false), - ColumnType::new( - ConcreteDataType::timestamp_millisecond_datatype(), - false, - ), - ]) - .into_named(vec![ - Some("number".to_string()), - Some("ts".to_string()), - ]), - ) - .mfp(MapFilterProject::new(2).into_safe()) - .unwrap(), - ), - key_val_plan: KeyValPlan { - key_plan: MapFilterProject::new(2) - .map(vec![ - ScalarExpr::Column(1).call_unary( - UnaryFunc::TumbleWindowFloor { - window_size: Duration::from_nanos(1_000_000_000), - start_time: Some(Timestamp::new_millisecond( - 1625097600000, - )), - }, - ), - ScalarExpr::Column(1).call_unary( - UnaryFunc::TumbleWindowCeiling { - window_size: Duration::from_nanos(1_000_000_000), - start_time: Some(Timestamp::new_millisecond( - 1625097600000, - )), - }, - ), - ]) - .unwrap() - .project(vec![2, 3]) - .unwrap() - .into_safe(), - val_plan: MapFilterProject::new(2) - .map(vec![ScalarExpr::Column(0).cast(CDT::uint64_datatype())]) - .unwrap() - .project(vec![2]) - .unwrap() - .into_safe(), - }, - reduce_plan: ReducePlan::Accumulable(AccumulablePlan { - full_aggrs: vec![aggr_expr.clone()], - simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)], - distinct_aggrs: vec![], - }), - } - .with_types( - RelationType::new(vec![ - ColumnType::new(CDT::timestamp_millisecond_datatype(), true), // window start - ColumnType::new(CDT::timestamp_millisecond_datatype(), true), // window end - ColumnType::new(CDT::uint64_datatype(), true), //sum(number) - ]) - .with_key(vec![1]) - .with_time_index(Some(0)) - .into_named(vec![None, None, None]), - ), - ), - mfp: MapFilterProject::new(3) - .map(vec![ - ScalarExpr::CallDf { - df_scalar_fn: DfScalarFunction::try_from_raw_fn(RawDfScalarFn { - f: BytesMut::from(b"\"\x08\x1a\x06\x12\x04\n\x02\x12\0".as_ref()), - input_schema: RelationType::new(vec![ColumnType::new( - ConcreteDataType::uint64_datatype(), - true, - )]) - .into_unnamed(), - extensions: FunctionExtensions::from_iter( - [ - (0, "abs".to_string()), - (1, "tumble_start".to_string()), - (2, "tumble_end".to_string()), - (3, "sum".to_string()), - ] - .into_iter(), - ), - }) - .await - .unwrap(), - exprs: vec![ScalarExpr::Column(2)], - }, - ScalarExpr::Column(0), - ScalarExpr::Column(1), - ]) - .unwrap() - .project(vec![3, 4, 5]) - .unwrap(), - }, - }; - assert_eq!(flow_plan, expected); - } - - /// TODO(discord9): add more illegal sql tests - #[tokio::test] - async fn test_tumble_composite() { - let engine = create_test_query_engine(); - let sql = - "SELECT number, avg(number) FROM numbers_with_ts GROUP BY tumble(ts, '1 hour'), number"; - let plan = sql_to_substrait(engine.clone(), sql).await; - - let mut ctx = create_test_ctx(); - let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan) - .await - .unwrap(); - - let aggr_exprs = vec![ - AggregateExpr { - func: AggregateFunc::SumUInt64, - expr: ScalarExpr::Column(0), - distinct: false, - }, - AggregateExpr { - func: AggregateFunc::Count, - expr: ScalarExpr::Column(1), - distinct: false, - }, - ]; - let avg_expr = ScalarExpr::If { - cond: Box::new(ScalarExpr::Column(4).call_binary( - ScalarExpr::Literal(Value::from(0i64), CDT::int64_datatype()), - BinaryFunc::NotEq, - )), - then: Box::new( - ScalarExpr::Column(3) - .cast(CDT::float64_datatype()) - .call_binary( - ScalarExpr::Column(4).cast(CDT::float64_datatype()), - BinaryFunc::DivFloat64, - ), - ), - els: Box::new(ScalarExpr::Literal(Value::Null, CDT::float64_datatype())), - }; - let expected = TypedPlan { - plan: Plan::Mfp { - input: Box::new( - Plan::Reduce { - input: Box::new( - Plan::Get { - id: crate::expr::Id::Global(GlobalId::User(1)), - } - .with_types( - RelationType::new(vec![ - ColumnType::new(ConcreteDataType::uint32_datatype(), false), - ColumnType::new( - ConcreteDataType::timestamp_millisecond_datatype(), - false, - ), - ]) - .into_named(vec![ - Some("number".to_string()), - Some("ts".to_string()), - ]), - ) - .mfp(MapFilterProject::new(2).into_safe()) - .unwrap(), - ), - key_val_plan: KeyValPlan { - key_plan: MapFilterProject::new(2) - .map(vec![ - ScalarExpr::Column(1).call_unary( - UnaryFunc::TumbleWindowFloor { - window_size: Duration::from_nanos(3_600_000_000_000), - start_time: None, - }, - ), - ScalarExpr::Column(1).call_unary( - UnaryFunc::TumbleWindowCeiling { - window_size: Duration::from_nanos(3_600_000_000_000), - start_time: None, - }, - ), - ScalarExpr::Column(0), - ]) - .unwrap() - .project(vec![2, 3, 4]) - .unwrap() - .into_safe(), - val_plan: MapFilterProject::new(2) - .map(vec![ - ScalarExpr::Column(0).cast(CDT::uint64_datatype()), - ScalarExpr::Column(0), - ]) - .unwrap() - .project(vec![2, 3]) - .unwrap() - .into_safe(), - }, - reduce_plan: ReducePlan::Accumulable(AccumulablePlan { - full_aggrs: aggr_exprs.clone(), - simple_aggrs: vec![ - AggrWithIndex::new(aggr_exprs[0].clone(), 0, 0), - AggrWithIndex::new(aggr_exprs[1].clone(), 1, 1), - ], - distinct_aggrs: vec![], - }), - } - .with_types( - RelationType::new(vec![ - // keys - ColumnType::new(CDT::timestamp_millisecond_datatype(), true), // window start(time index) - ColumnType::new(CDT::timestamp_millisecond_datatype(), true), // window end(pk) - ColumnType::new(CDT::uint32_datatype(), false), // number(pk) - // values - ColumnType::new(CDT::uint64_datatype(), true), // avg.sum(number) - ColumnType::new(CDT::int64_datatype(), true), // avg.count(number) - ]) - .with_key(vec![1, 2]) - .with_time_index(Some(0)) - .into_named(vec![ - None, - None, - Some("number".to_string()), - None, - None, - ]), - ), - ), - mfp: MapFilterProject::new(5) - .map(vec![ - ScalarExpr::Column(2), // number(pk) - avg_expr, - ScalarExpr::Column(0), // window start - ScalarExpr::Column(1), // window end - ]) - .unwrap() - .project(vec![5, 6, 7, 8]) - .unwrap(), - }, - schema: RelationType::new(vec![ - ColumnType::new(CDT::uint32_datatype(), false), // number - ColumnType::new(CDT::float64_datatype(), true), // avg(number) - ColumnType::new(CDT::timestamp_millisecond_datatype(), true), // window start - ColumnType::new(CDT::timestamp_millisecond_datatype(), true), // window end - ]) - .with_key(vec![0, 3]) - .with_time_index(Some(2)) - .into_named(vec![ - Some("number".to_string()), - Some("avg(numbers_with_ts.number)".to_string()), - Some("window_start".to_string()), - Some("window_end".to_string()), - ]), - }; - assert_eq!(flow_plan, expected); - } - - #[tokio::test] - async fn test_tumble_parse_optional() { - let engine = create_test_query_engine(); - let sql = "SELECT sum(number) FROM numbers_with_ts GROUP BY tumble(ts, '1 hour')"; - let plan = sql_to_substrait(engine.clone(), sql).await; - - let mut ctx = create_test_ctx(); - let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan) - .await - .unwrap(); - - let aggr_expr = AggregateExpr { - func: AggregateFunc::SumUInt64, - expr: ScalarExpr::Column(0), - distinct: false, - }; - let expected = TypedPlan { - schema: RelationType::new(vec![ - ColumnType::new(CDT::uint64_datatype(), true), // sum(number) - ColumnType::new(CDT::timestamp_millisecond_datatype(), true), // window start - ColumnType::new(CDT::timestamp_millisecond_datatype(), true), // window end - ]) - .with_key(vec![2]) - .with_time_index(Some(1)) - .into_named(vec![ - Some("sum(numbers_with_ts.number)".to_string()), - Some("window_start".to_string()), - Some("window_end".to_string()), - ]), - plan: Plan::Mfp { - input: Box::new( - Plan::Reduce { - input: Box::new( - Plan::Get { - id: crate::expr::Id::Global(GlobalId::User(1)), - } - .with_types( - RelationType::new(vec![ - ColumnType::new(ConcreteDataType::uint32_datatype(), false), - ColumnType::new( - ConcreteDataType::timestamp_millisecond_datatype(), - false, - ), - ]) - .into_named(vec![ - Some("number".to_string()), - Some("ts".to_string()), - ]), - ) - .mfp(MapFilterProject::new(2).into_safe()) - .unwrap(), - ), - key_val_plan: KeyValPlan { - key_plan: MapFilterProject::new(2) - .map(vec![ - ScalarExpr::Column(1).call_unary( - UnaryFunc::TumbleWindowFloor { - window_size: Duration::from_nanos(3_600_000_000_000), - start_time: None, - }, - ), - ScalarExpr::Column(1).call_unary( - UnaryFunc::TumbleWindowCeiling { - window_size: Duration::from_nanos(3_600_000_000_000), - start_time: None, - }, - ), - ]) - .unwrap() - .project(vec![2, 3]) - .unwrap() - .into_safe(), - val_plan: MapFilterProject::new(2) - .map(vec![ScalarExpr::Column(0).cast(CDT::uint64_datatype())]) - .unwrap() - .project(vec![2]) - .unwrap() - .into_safe(), - }, - reduce_plan: ReducePlan::Accumulable(AccumulablePlan { - full_aggrs: vec![aggr_expr.clone()], - simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)], - distinct_aggrs: vec![], - }), - } - .with_types( - RelationType::new(vec![ - ColumnType::new(CDT::timestamp_millisecond_datatype(), true), // window start - ColumnType::new(CDT::timestamp_millisecond_datatype(), true), // window end - ColumnType::new(CDT::uint64_datatype(), true), //sum(number) - ]) - .with_key(vec![1]) - .with_time_index(Some(0)) - .into_named(vec![None, None, None]), - ), - ), - mfp: MapFilterProject::new(3) - .map(vec![ - ScalarExpr::Column(2), - ScalarExpr::Column(0), - ScalarExpr::Column(1), - ]) - .unwrap() - .project(vec![3, 4, 5]) - .unwrap(), - }, - }; - assert_eq!(flow_plan, expected); - } - - #[tokio::test] - async fn test_tumble_parse() { - let engine = create_test_query_engine(); - let sql = "SELECT sum(number) FROM numbers_with_ts GROUP BY tumble(ts, '1 hour', '2021-07-01 00:00:00')"; - let plan = sql_to_substrait(engine.clone(), sql).await; - - let mut ctx = create_test_ctx(); - let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan) - .await - .unwrap(); - - let aggr_expr = AggregateExpr { - func: AggregateFunc::SumUInt64, - expr: ScalarExpr::Column(0), - distinct: false, - }; - let expected = TypedPlan { - schema: RelationType::new(vec![ - ColumnType::new(CDT::uint64_datatype(), true), // sum(number) - ColumnType::new(CDT::timestamp_millisecond_datatype(), true), // window start - ColumnType::new(CDT::timestamp_millisecond_datatype(), true), // window end - ]) - .with_key(vec![2]) - .with_time_index(Some(1)) - .into_named(vec![ - Some("sum(numbers_with_ts.number)".to_string()), - Some("window_start".to_string()), - Some("window_end".to_string()), - ]), - plan: Plan::Mfp { - input: Box::new( - Plan::Reduce { - input: Box::new( - Plan::Get { - id: crate::expr::Id::Global(GlobalId::User(1)), - } - .with_types( - RelationType::new(vec![ - ColumnType::new(ConcreteDataType::uint32_datatype(), false), - ColumnType::new( - ConcreteDataType::timestamp_millisecond_datatype(), - false, - ), - ]) - .into_named(vec![ - Some("number".to_string()), - Some("ts".to_string()), - ]), - ) - .mfp(MapFilterProject::new(2).into_safe()) - .unwrap(), - ), - key_val_plan: KeyValPlan { - key_plan: MapFilterProject::new(2) - .map(vec![ - ScalarExpr::Column(1).call_unary( - UnaryFunc::TumbleWindowFloor { - window_size: Duration::from_nanos(3_600_000_000_000), - start_time: Some(Timestamp::new_millisecond( - 1625097600000, - )), - }, - ), - ScalarExpr::Column(1).call_unary( - UnaryFunc::TumbleWindowCeiling { - window_size: Duration::from_nanos(3_600_000_000_000), - start_time: Some(Timestamp::new_millisecond( - 1625097600000, - )), - }, - ), - ]) - .unwrap() - .project(vec![2, 3]) - .unwrap() - .into_safe(), - val_plan: MapFilterProject::new(2) - .map(vec![ScalarExpr::Column(0).cast(CDT::uint64_datatype())]) - .unwrap() - .project(vec![2]) - .unwrap() - .into_safe(), - }, - reduce_plan: ReducePlan::Accumulable(AccumulablePlan { - full_aggrs: vec![aggr_expr.clone()], - simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)], - distinct_aggrs: vec![], - }), - } - .with_types( - RelationType::new(vec![ - ColumnType::new(CDT::timestamp_millisecond_datatype(), true), // window start - ColumnType::new(CDT::timestamp_millisecond_datatype(), true), // window end - ColumnType::new(CDT::uint64_datatype(), true), //sum(number) - ]) - .with_key(vec![1]) - .with_time_index(Some(0)) - .into_unnamed(), - ), - ), - mfp: MapFilterProject::new(3) - .map(vec![ - ScalarExpr::Column(2), - ScalarExpr::Column(0), - ScalarExpr::Column(1), - ]) - .unwrap() - .project(vec![3, 4, 5]) - .unwrap(), - }, - }; - assert_eq!(flow_plan, expected); - } - - #[tokio::test] - async fn test_avg_group_by() { - let engine = create_test_query_engine(); - let sql = "SELECT avg(number), number FROM numbers GROUP BY number"; - let plan = sql_to_substrait(engine.clone(), sql).await; - - let mut ctx = create_test_ctx(); - let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan).await; - - let aggr_exprs = vec![ - AggregateExpr { - func: AggregateFunc::SumUInt64, - expr: ScalarExpr::Column(0), - distinct: false, - }, - AggregateExpr { - func: AggregateFunc::Count, - expr: ScalarExpr::Column(1), - distinct: false, - }, - ]; - let avg_expr = ScalarExpr::If { - cond: Box::new(ScalarExpr::Column(2).call_binary( - ScalarExpr::Literal(Value::from(0i64), CDT::int64_datatype()), - BinaryFunc::NotEq, - )), - then: Box::new( - ScalarExpr::Column(1) - .cast(CDT::float64_datatype()) - .call_binary( - ScalarExpr::Column(2).cast(CDT::float64_datatype()), - BinaryFunc::DivFloat64, - ), - ), - els: Box::new(ScalarExpr::Literal(Value::Null, CDT::float64_datatype())), - }; - let expected = TypedPlan { - schema: RelationType::new(vec![ - ColumnType::new(CDT::float64_datatype(), true), // avg(number: u32) -> f64 - ColumnType::new(CDT::uint32_datatype(), false), // number - ]) - .with_key(vec![1]) - .into_named(vec![ - Some("avg(numbers.number)".to_string()), - Some("number".to_string()), - ]), - plan: Plan::Mfp { - input: Box::new( - Plan::Reduce { - input: Box::new( - Plan::Get { - id: crate::expr::Id::Global(GlobalId::User(0)), - } - .with_types( - RelationType::new(vec![ColumnType::new( - ConcreteDataType::uint32_datatype(), - false, - )]) - .into_named(vec![Some("number".to_string())]), - ) - .mfp( - MapFilterProject::new(1) - .project(vec![0]) - .unwrap() - .into_safe(), - ) - .unwrap(), - ), - key_val_plan: KeyValPlan { - key_plan: MapFilterProject::new(1) - .map(vec![ScalarExpr::Column(0)]) - .unwrap() - .project(vec![1]) - .unwrap() - .into_safe(), - val_plan: MapFilterProject::new(1) - .map(vec![ - ScalarExpr::Column(0).cast(CDT::uint64_datatype()), - ScalarExpr::Column(0), - ]) - .unwrap() - .project(vec![1, 2]) - .unwrap() - .into_safe(), - }, - reduce_plan: ReducePlan::Accumulable(AccumulablePlan { - full_aggrs: aggr_exprs.clone(), - simple_aggrs: vec![ - AggrWithIndex::new(aggr_exprs[0].clone(), 0, 0), - AggrWithIndex::new(aggr_exprs[1].clone(), 1, 1), - ], - distinct_aggrs: vec![], - }), - } - .with_types( - RelationType::new(vec![ - ColumnType::new(ConcreteDataType::uint32_datatype(), false), // key: number - ColumnType::new(ConcreteDataType::uint64_datatype(), true), // sum - ColumnType::new(ConcreteDataType::int64_datatype(), true), // count - ]) - .with_key(vec![0]) - .into_named(vec![ - Some("number".to_string()), - None, - None, - ]), - ), - ), - mfp: MapFilterProject::new(3) - .map(vec![ - avg_expr, // col 3 - ScalarExpr::Column(0), - // TODO(discord9): optimize mfp so to remove indirect ref - ]) - .unwrap() - .project(vec![3, 4]) - .unwrap(), - }, - }; - assert_eq!(flow_plan.unwrap(), expected); - } - - #[tokio::test] - async fn test_avg() { - let engine = create_test_query_engine(); - let sql = "SELECT avg(number) FROM numbers"; - let plan = sql_to_substrait(engine.clone(), sql).await; - - let mut ctx = create_test_ctx(); - - let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan) - .await - .unwrap(); - - let aggr_exprs = vec![ - AggregateExpr { - func: AggregateFunc::SumUInt64, - expr: ScalarExpr::Column(0), - distinct: false, - }, - AggregateExpr { - func: AggregateFunc::Count, - expr: ScalarExpr::Column(1), - distinct: false, - }, - ]; - let avg_expr = ScalarExpr::If { - cond: Box::new(ScalarExpr::Column(1).call_binary( - ScalarExpr::Literal(Value::from(0i64), CDT::int64_datatype()), - BinaryFunc::NotEq, - )), - then: Box::new( - ScalarExpr::Column(0) - .cast(CDT::float64_datatype()) - .call_binary( - ScalarExpr::Column(1).cast(CDT::float64_datatype()), - BinaryFunc::DivFloat64, - ), - ), - els: Box::new(ScalarExpr::Literal(Value::Null, CDT::float64_datatype())), - }; - let input = Box::new( - Plan::Get { - id: crate::expr::Id::Global(GlobalId::User(0)), - } - .with_types( - RelationType::new(vec![ColumnType::new( - ConcreteDataType::uint32_datatype(), - false, - )]) - .into_named(vec![Some("number".to_string())]), - ), - ); - let expected = TypedPlan { - schema: RelationType::new(vec![ColumnType::new(CDT::float64_datatype(), true)]) - .into_named(vec![Some("avg(numbers.number)".to_string())]), - plan: Plan::Mfp { - input: Box::new( - Plan::Reduce { - input: Box::new( - Plan::Mfp { - input: input.clone(), - mfp: MapFilterProject::new(1).project(vec![0]).unwrap(), - } - .with_types( - RelationType::new(vec![ColumnType::new( - CDT::uint32_datatype(), - false, - )]) - .into_named(vec![Some("number".to_string())]), - ), - ), - key_val_plan: KeyValPlan { - key_plan: MapFilterProject::new(1) - .project(vec![]) - .unwrap() - .into_safe(), - val_plan: MapFilterProject::new(1) - .map(vec![ - ScalarExpr::Column(0).cast(CDT::uint64_datatype()), - ScalarExpr::Column(0), - ]) - .unwrap() - .project(vec![1, 2]) - .unwrap() - .into_safe(), - }, - reduce_plan: ReducePlan::Accumulable(AccumulablePlan { - full_aggrs: aggr_exprs.clone(), - simple_aggrs: vec![ - AggrWithIndex::new(aggr_exprs[0].clone(), 0, 0), - AggrWithIndex::new(aggr_exprs[1].clone(), 1, 1), - ], - distinct_aggrs: vec![], - }), - } - .with_types( - RelationType::new(vec![ - ColumnType::new(ConcreteDataType::uint64_datatype(), true), // sum - ColumnType::new(ConcreteDataType::int64_datatype(), true), // count - ]) - .into_named(vec![None, None]), - ), - ), - mfp: MapFilterProject::new(2) - .map(vec![ - avg_expr, - // TODO(discord9): optimize mfp so to remove indirect ref - ]) - .unwrap() - .project(vec![2]) - .unwrap(), - }, - }; - assert_eq!(flow_plan, expected); - } - #[tokio::test] async fn test_sum() { let engine = create_test_query_engine(); diff --git a/tests/cases/standalone/common/flow/flow_tql_avg.result b/tests/cases/standalone/common/flow/flow_tql_avg.result new file mode 100644 index 0000000000..8438f41eb6 --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_tql_avg.result @@ -0,0 +1,126 @@ +CREATE TABLE sensor_readings ( + `value` DOUBLE, + ts TIMESTAMP TIME INDEX, + sensor STRING, + loc STRING, + PRIMARY KEY (sensor, loc) +); + +Affected Rows: 0 + +CREATE TABLE sensor_readings_avg ( + `value` DOUBLE, + ts TIMESTAMP TIME INDEX, + sensor STRING, + PRIMARY KEY (sensor) +); + +Affected Rows: 0 + +INSERT INTO sensor_readings VALUES + (20, now() - '30s'::interval, 'test', 'A'); + +Affected Rows: 1 + +-- SQLNESS REPLACE (\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}) TS +TQL EVAL (now() - '1m'::interval, now(), '1m') +avg by(sensor) (sensor_readings) AS value; + ++-------+--------+---------------------+ +| value | sensor | ts | ++-------+--------+---------------------+ +| 20.0 | test | TS | ++-------+--------+---------------------+ + +-- SQLNESS REPLACE (\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}) TS +TQL EVAL (now() - '1m'::interval, now(), '1m') (sum by(sensor) (sensor_readings) / count by(sensor) (sensor_readings)) AS value; + ++-------+--------+---------------------+ +| value | sensor | ts | ++-------+--------+---------------------+ +| 20.0 | test | TS | ++-------+--------+---------------------+ + +CREATE FLOW sensor_readings_avg_flow +SINK TO sensor_readings_avg +EVAL INTERVAL '1m' AS +TQL EVAL (now() - '1m'::interval, now(), '1m') +avg by(sensor) (sensor_readings) AS value; + +Affected Rows: 0 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('sensor_readings_avg_flow'); + ++----------------------------------------------+ +| ADMIN FLUSH_FLOW('sensor_readings_avg_flow') | ++----------------------------------------------+ +| FLOW_FLUSHED | ++----------------------------------------------+ + +-- SQLNESS REPLACE (\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}) TS +SELECT * FROM sensor_readings_avg ORDER BY ts DESC LIMIT 1; + ++-------+---------------------+--------+ +| value | ts | sensor | ++-------+---------------------+--------+ +| 20.0 | TS | test | ++-------+---------------------+--------+ + +DROP FLOW sensor_readings_avg_flow; + +Affected Rows: 0 + +-- SQLNESS SLEEP 1s +INSERT INTO sensor_readings VALUES + (30, now() - '40s'::interval, 'test', 'B'); + +Affected Rows: 1 + +-- SQLNESS REPLACE (\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}) TS +TQL EVAL (now() - '1m'::interval, now(), '1m') +avg by(sensor) (sensor_readings) AS value; + ++-------+--------+---------------------+ +| value | sensor | ts | ++-------+--------+---------------------+ +| 25.0 | test | TS | ++-------+--------+---------------------+ + +CREATE FLOW sensor_readings_avg_flow +SINK TO sensor_readings_avg +EVAL INTERVAL '1m' AS +TQL EVAL (now() - '1m'::interval, now(), '1m') (sum by(sensor) (sensor_readings) / count by(sensor) (sensor_readings)) AS value; + +Affected Rows: 0 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('sensor_readings_avg_flow'); + ++----------------------------------------------+ +| ADMIN FLUSH_FLOW('sensor_readings_avg_flow') | ++----------------------------------------------+ +| FLOW_FLUSHED | ++----------------------------------------------+ + +-- SQLNESS REPLACE (\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}) TS +SELECT * FROM sensor_readings_avg ORDER BY ts DESC LIMIT 1; + ++-------+---------------------+--------+ +| value | ts | sensor | ++-------+---------------------+--------+ +| 25.0 | TS | test | ++-------+---------------------+--------+ + +DROP FLOW sensor_readings_avg_flow; + +Affected Rows: 0 + +DROP TABLE sensor_readings_avg; + +Affected Rows: 0 + +DROP TABLE sensor_readings; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/flow/flow_tql_avg.sql b/tests/cases/standalone/common/flow/flow_tql_avg.sql new file mode 100644 index 0000000000..a5d6ab9d2b --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_tql_avg.sql @@ -0,0 +1,63 @@ +CREATE TABLE sensor_readings ( + `value` DOUBLE, + ts TIMESTAMP TIME INDEX, + sensor STRING, + loc STRING, + PRIMARY KEY (sensor, loc) +); + +CREATE TABLE sensor_readings_avg ( + `value` DOUBLE, + ts TIMESTAMP TIME INDEX, + sensor STRING, + PRIMARY KEY (sensor) +); + +INSERT INTO sensor_readings VALUES + (20, now() - '30s'::interval, 'test', 'A'); + +-- SQLNESS REPLACE (\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}) TS +TQL EVAL (now() - '1m'::interval, now(), '1m') +avg by(sensor) (sensor_readings) AS value; + +-- SQLNESS REPLACE (\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}) TS +TQL EVAL (now() - '1m'::interval, now(), '1m') (sum by(sensor) (sensor_readings) / count by(sensor) (sensor_readings)) AS value; + +CREATE FLOW sensor_readings_avg_flow +SINK TO sensor_readings_avg +EVAL INTERVAL '1m' AS +TQL EVAL (now() - '1m'::interval, now(), '1m') +avg by(sensor) (sensor_readings) AS value; + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('sensor_readings_avg_flow'); + +-- SQLNESS REPLACE (\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}) TS +SELECT * FROM sensor_readings_avg ORDER BY ts DESC LIMIT 1; + +DROP FLOW sensor_readings_avg_flow; + +-- SQLNESS SLEEP 1s +INSERT INTO sensor_readings VALUES + (30, now() - '40s'::interval, 'test', 'B'); + +-- SQLNESS REPLACE (\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}) TS +TQL EVAL (now() - '1m'::interval, now(), '1m') +avg by(sensor) (sensor_readings) AS value; + + +CREATE FLOW sensor_readings_avg_flow +SINK TO sensor_readings_avg +EVAL INTERVAL '1m' AS +TQL EVAL (now() - '1m'::interval, now(), '1m') (sum by(sensor) (sensor_readings) / count by(sensor) (sensor_readings)) AS value; + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('sensor_readings_avg_flow'); + +-- SQLNESS REPLACE (\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}) TS +SELECT * FROM sensor_readings_avg ORDER BY ts DESC LIMIT 1; + +DROP FLOW sensor_readings_avg_flow; + +DROP TABLE sensor_readings_avg; +DROP TABLE sensor_readings;