feat: gen write back req

This commit is contained in:
discord9
2024-04-28 16:37:43 +08:00
parent e88a40b58b
commit edcbc89c38

View File

@@ -49,6 +49,8 @@ mod worker;
use error::Error;
pub const PER_REQ_MAX_ROW_CNT: usize = 8192;
// TODO: refactor common types for flow to a separate module
pub type TaskId = u64;
pub type TableName = Vec<String>;
@@ -235,6 +237,23 @@ impl FlowNodeManager {
Ok(())
}
/// Generate writeback request for all sink table
pub async fn generate_writeback_request(&mut self) -> BTreeMap<TableName, Vec<DiffRequest>> {
let mut output = BTreeMap::new();
for (name, sink_recv) in self
.node_context
.sink_receiver
.iter_mut()
.map(|(n, (_s, r))| (n, r))
{
let mut rows = Vec::new();
let _recv_cnt = sink_recv.recv_many(&mut rows, PER_REQ_MAX_ROW_CNT).await;
let reqs = diff_row_to_request(rows);
output.insert(name.clone(), reqs);
}
output
}
pub async fn remove_task(&mut self, task_id: TaskId) -> Result<(), Error> {
for handle in self.worker_handles.iter_mut() {
if handle.contains_task(task_id)? {