diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 2e66f3850b..31f4fadf03 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -64,8 +64,6 @@ mod table_source; use error::Error; -pub const PER_REQ_MAX_ROW_CNT: usize = 8192; - // TODO: replace this with `GREPTIME_TIMESTAMP` before v0.9 pub const AUTO_CREATED_PLACEHOLDER_TS_COL: &str = "__ts_placeholder"; diff --git a/src/flow/src/compute/render/map.rs b/src/flow/src/compute/render/map.rs index 50bd48f5fb..91cb37c6cf 100644 --- a/src/flow/src/compute/render/map.rs +++ b/src/flow/src/compute/render/map.rs @@ -113,9 +113,21 @@ fn mfp_subgraph( scheduler: &Scheduler, send: &PortCtx, ) { + // all updates that should be send immediately + let mut output_now = vec![]; let run_mfp = || { - let all_updates = eval_mfp_core(input, mfp_plan, now, err_collector); - arrange.write().apply_updates(now, all_updates)?; + let mut all_updates = eval_mfp_core(input, mfp_plan, now, err_collector); + all_updates.retain(|(kv, ts, d)| { + if *ts > now { + true + } else { + output_now.push((kv.clone(), *ts, *d)); + false + } + }); + let future_updates = all_updates; + + arrange.write().apply_updates(now, future_updates)?; Ok(()) }; err_collector.run(run_mfp); @@ -130,13 +142,19 @@ fn mfp_subgraph( std::ops::Bound::Excluded(from), std::ops::Bound::Included(now), ); + + // find all updates that need to be send from arrangement let output_kv = arrange.read().get_updates_in_range(range); + // the output is expected to be key -> empty val let output = output_kv .into_iter() + .chain(output_now) // chain previous immediately send updates .map(|((key, _v), ts, diff)| (key, ts, diff)) .collect_vec(); + // send output send.give(output); + let run_compaction = || { arrange.write().compact_to(now)?; Ok(()) @@ -305,4 +323,42 @@ mod test { ]); run_and_check(&mut state, &mut df, 1..5, expected, output); } + + /// test if mfp operator can run multiple times within same tick + #[test] + fn test_render_mfp_multiple_times() { + let mut df = Hydroflow::new(); + let mut state = DataflowState::default(); + let mut ctx = harness_test_ctx(&mut df, &mut state); + + let (sender, recv) = tokio::sync::broadcast::channel(1000); + let collection = ctx.render_source(recv).unwrap(); + ctx.insert_global(GlobalId::User(1), collection); + let input_plan = Plan::Get { + id: expr::Id::Global(GlobalId::User(1)), + }; + let typ = RelationType::new(vec![ColumnType::new_nullable( + ConcreteDataType::int64_datatype(), + )]); + // filter: col(0)>1 + let mfp = MapFilterProject::new(1) + .filter(vec![ScalarExpr::Column(0).call_binary( + ScalarExpr::literal(1.into(), ConcreteDataType::int32_datatype()), + BinaryFunc::Gt, + )]) + .unwrap(); + let bundle = ctx + .render_mfp(Box::new(input_plan.with_types(typ)), mfp) + .unwrap(); + + let output = get_output_handle(&mut ctx, bundle); + drop(ctx); + sender.send((Row::new(vec![2.into()]), 0, 1)).unwrap(); + state.run_available_with_schedule(&mut df); + assert_eq!(output.borrow().len(), 1); + output.borrow_mut().clear(); + sender.send((Row::new(vec![3.into()]), 0, 1)).unwrap(); + state.run_available_with_schedule(&mut df); + assert_eq!(output.borrow().len(), 1); + } } diff --git a/src/flow/src/repr.rs b/src/flow/src/repr.rs index e918044c0d..e28689be40 100644 --- a/src/flow/src/repr.rs +++ b/src/flow/src/repr.rs @@ -53,7 +53,8 @@ pub type KeyValDiffRow = ((Row, Row), Timestamp, Diff); /// broadcast channel capacity, can be important to memory consumption, since this influence how many /// updates can be buffered in memory in the entire dataflow -pub const BROADCAST_CAP: usize = 8192; +/// TODO(discord9): add config for this, so cpu&mem usage can be balanced and configured by this +pub const BROADCAST_CAP: usize = 65535; /// Convert a value that is or can be converted to Datetime to internal timestamp ///