diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index fa76fafc01..58a850835f 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -49,6 +49,8 @@ mod worker; use error::Error; +pub const PER_REQ_MAX_ROW_CNT: usize = 8192; + // TODO: refactor common types for flow to a separate module pub type TaskId = u64; pub type TableName = Vec; @@ -235,6 +237,23 @@ impl FlowNodeManager { Ok(()) } + /// Generate writeback request for all sink table + pub async fn generate_writeback_request(&mut self) -> BTreeMap> { + let mut output = BTreeMap::new(); + for (name, sink_recv) in self + .node_context + .sink_receiver + .iter_mut() + .map(|(n, (_s, r))| (n, r)) + { + let mut rows = Vec::new(); + let _recv_cnt = sink_recv.recv_many(&mut rows, PER_REQ_MAX_ROW_CNT).await; + let reqs = diff_row_to_request(rows); + output.insert(name.clone(), reqs); + } + output + } + pub async fn remove_task(&mut self, task_id: TaskId) -> Result<(), Error> { for handle in self.worker_handles.iter_mut() { if handle.contains_task(task_id)? {