From 5eaf9816b972da980c4afdfb7e809f4ea7b24d73 Mon Sep 17 00:00:00 2001 From: discord9 Date: Thu, 23 May 2024 20:58:40 +0800 Subject: [PATCH] fix: send buf clear --- src/flow/src/adapter.rs | 11 ++++++----- src/flow/src/adapter/node_context.rs | 10 ++++++++-- src/flow/src/compute/render/src_sink.rs | 4 ++-- 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 340ab6991a..e0d0428fc3 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -521,18 +521,19 @@ impl FlownodeManager { worker.lock().await.run_available(now).await.unwrap(); } // first check how many inputs were sent - let send_cnt = match self.node_context.lock().await.flush_all_sender() { - Ok(cnt) => cnt, + match self.node_context.lock().await.flush_all_sender() { + Ok(_) => (), Err(err) => { common_telemetry::error!("Flush send buf errors: {:?}", err); break; } }; - // if no inputs - if send_cnt == 0 { + // if no thing in send buf then break + let buf_len = self.node_context.lock().await.get_send_buf_size(); + if buf_len == 0 { break; } else { - debug!("FlownodeManager::run_available: send_cnt={}", send_cnt); + debug!("Send buf len = {}", buf_len); } } diff --git a/src/flow/src/adapter/node_context.rs b/src/flow/src/adapter/node_context.rs index bc88d2bbe5..a5db59698d 100644 --- a/src/flow/src/adapter/node_context.rs +++ b/src/flow/src/adapter/node_context.rs @@ -51,8 +51,6 @@ pub struct FlownodeContext { mpsc::UnboundedReceiver, ), >, - /// store source in buffer for each source table, in case broadcast channel is full - pub send_buffer: BTreeMap>, /// the schema of the table, query from metasrv or inferred from TypedPlan pub schema: HashMap, /// All the tables that have been registered in the worker @@ -109,6 +107,7 @@ impl SourceSender { } if row_cnt > 0 { debug!("Send {} rows", row_cnt); + debug!("Send buf len = {}", self.send_buf.len()); } Ok(row_cnt) @@ -140,12 +139,19 @@ impl FlownodeContext { } /// flush all sender's buf + /// + /// return numbers being sent pub fn flush_all_sender(&mut self) -> Result { self.source_sender .iter_mut() .map(|(_table_id, src_sender)| src_sender.try_send_all()) .try_fold(0, |acc, x| x.map(|x| x + acc)) } + + /// Return the sum number of rows in all send buf + pub fn get_send_buf_size(&self) -> usize { + self.source_sender.values().map(|v| v.send_buf.len()).sum() + } } impl FlownodeContext { diff --git a/src/flow/src/compute/render/src_sink.rs b/src/flow/src/compute/render/src_sink.rs index 33ecb9670c..66159c35c6 100644 --- a/src/flow/src/compute/render/src_sink.rs +++ b/src/flow/src/compute/render/src_sink.rs @@ -79,8 +79,8 @@ impl<'referred, 'df> Context<'referred, 'df> { } err_collector.run(|| arrange_handler_inner.write().apply_updates(now, to_arrange)); send.give(all); - // always schedule source to run at next tick - inner_schd.schedule_at(now + 1); + // always schedule source to run at now so we can repeatly run source if needed + inner_schd.schedule_at(now); }); schd.set_cur_subgraph(sub); let arranged = Arranged::new(arrange_handler);