diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs
index 25bb3cb2bf..8e2e7cdecd 100644
--- a/src/flow/src/adapter.rs
+++ b/src/flow/src/adapter.rs
@@ -653,21 +653,20 @@ impl FlownodeManager {
 ///
 /// TODO(discord9): better way to do it, and not expose flow tick even to other flow to avoid
 /// TSO coord mess
-#[derive(Clone)]
+#[derive(Clone, Debug)]
 pub struct FlowTickManager {
     start: Instant,
-}
-
-impl std::fmt::Debug for FlowTickManager {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("FlowTickManager").finish()
-    }
+    start_timestamp: repr::Timestamp,
 }
 
 impl FlowTickManager {
     pub fn new() -> Self {
         FlowTickManager {
             start: Instant::now(),
+            start_timestamp: SystemTime::now()
+                .duration_since(SystemTime::UNIX_EPOCH)
+                .unwrap()
+                .as_millis() as repr::Timestamp,
         }
     }
 
diff --git a/src/flow/src/transform/aggr.rs b/src/flow/src/transform/aggr.rs
index 50ffd8e46d..2550685343 100644
--- a/src/flow/src/transform/aggr.rs
+++ b/src/flow/src/transform/aggr.rs
@@ -435,6 +435,133 @@ mod test {
     use crate::repr::{self, ColumnType, RelationType};
     use crate::transform::test::{create_test_ctx, create_test_query_engine, sql_to_substrait};
 
+    #[tokio::test]
+    async fn test_tumble_composite() {
+        let engine = create_test_query_engine();
+        let sql =
+            "SELECT number, avg(number) FROM numbers_with_ts GROUP BY tumble(ts, '1 hour'), number";
+        let plan = sql_to_substrait(engine.clone(), sql).await;
+
+        let mut ctx = create_test_ctx();
+        let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan).unwrap();
+
+        let aggr_exprs = vec![
+            AggregateExpr {
+                func: AggregateFunc::SumUInt32,
+                expr: ScalarExpr::Column(0),
+                distinct: false,
+            },
+            AggregateExpr {
+                func: AggregateFunc::Count,
+                expr: ScalarExpr::Column(0),
+                distinct: false,
+            },
+        ];
+        let avg_expr = ScalarExpr::If {
+            cond: Box::new(ScalarExpr::Column(4).call_binary(
+                ScalarExpr::Literal(Value::from(0i64), CDT::int64_datatype()),
+                BinaryFunc::NotEq,
+            )),
+            then: Box::new(ScalarExpr::Column(3).call_binary(
+                ScalarExpr::Column(4).call_unary(UnaryFunc::Cast(CDT::uint64_datatype())),
+                BinaryFunc::DivUInt64,
+            )),
+            els: Box::new(ScalarExpr::Literal(Value::Null, CDT::uint64_datatype())),
+        };
+        let expected = TypedPlan {
+            typ: RelationType::new(vec![
+                ColumnType::new(CDT::uint32_datatype(), false), // number
+                ColumnType::new(CDT::uint64_datatype(), true),  // avg(number)
+                ColumnType::new(CDT::datetime_datatype(), false), // window start
+                ColumnType::new(CDT::datetime_datatype(), false), // window end
+            ]),
+            // TODO(discord9): mfp indirectly ref to key columns
+            /*
+            .with_key(vec![1])
+            .with_time_index(Some(0)),*/
+            plan: Plan::Mfp {
+                input: Box::new(
+                    Plan::Reduce {
+                        input: Box::new(
+                            Plan::Get {
+                                id: crate::expr::Id::Global(GlobalId::User(1)),
+                            }
+                            .with_types(RelationType::new(vec![
+                                ColumnType::new(ConcreteDataType::uint32_datatype(), false),
+                                ColumnType::new(ConcreteDataType::datetime_datatype(), false),
+                            ])),
+                        ),
+                        key_val_plan: KeyValPlan {
+                            key_plan: MapFilterProject::new(2)
+                                .map(vec![
+                                    ScalarExpr::Column(1).call_unary(
+                                        UnaryFunc::TumbleWindowFloor {
+                                            window_size: Interval::from_month_day_nano(
+                                                0,
+                                                0,
+                                                3_600_000_000_000,
+                                            ),
+                                            start_time: None,
+                                        },
+                                    ),
+                                    ScalarExpr::Column(1).call_unary(
+                                        UnaryFunc::TumbleWindowCeiling {
+                                            window_size: Interval::from_month_day_nano(
+                                                0,
+                                                0,
+                                                3_600_000_000_000,
+                                            ),
+                                            start_time: None,
+                                        },
+                                    ),
+                                    ScalarExpr::Column(0),
+                                ])
+                                .unwrap()
+                                .project(vec![2, 3, 4])
+                                .unwrap()
+                                .into_safe(),
+                            val_plan: MapFilterProject::new(2)
+                                .project(vec![0, 1])
+                                .unwrap()
+                                .into_safe(),
+                        },
+                        reduce_plan: ReducePlan::Accumulable(AccumulablePlan {
+                            full_aggrs: aggr_exprs.clone(),
+                            simple_aggrs: vec![
+                                AggrWithIndex::new(aggr_exprs[0].clone(), 0, 0),
+                                AggrWithIndex::new(aggr_exprs[1].clone(), 0, 1),
+                            ],
+                            distinct_aggrs: vec![],
+                        }),
+                    }
+                    .with_types(
+                        RelationType::new(vec![
+                            ColumnType::new(CDT::datetime_datatype(), false), // window start(time index)
+                            ColumnType::new(CDT::datetime_datatype(), false), // window end(pk)
+                            ColumnType::new(CDT::uint32_datatype(), false),   // number(pk)
+                            ColumnType::new(CDT::uint64_datatype(), true),    // avg.sum(number)
+                            ColumnType::new(CDT::uint64_datatype(), true),    // avg.count(number)
+                        ])
+                        .with_key(vec![1, 2])
+                        .with_time_index(Some(0)),
+                    ),
+                ),
+                mfp: MapFilterProject::new(5)
+                    .map(vec![
+                        avg_expr,
+                        ScalarExpr::Column(2),
+                        ScalarExpr::Column(5),
+                        ScalarExpr::Column(0),
+                        ScalarExpr::Column(1),
+                    ])
+                    .unwrap()
+                    .project(vec![6, 7, 8, 9])
+                    .unwrap(),
+            },
+        };
+        assert_eq!(flow_plan, expected);
+    }
+
     #[tokio::test]
     async fn test_tumble_parse_optional() {
         let engine = create_test_query_engine();