fix: send buf clear

discord9
2024-05-23 20:58:40 +08:00
parent 6684f8dce3
commit 5eaf9816b9
3 changed files with 16 additions and 9 deletions


@@ -521,18 +521,19 @@ impl FlownodeManager {
             worker.lock().await.run_available(now).await.unwrap();
         }
         // first check how many inputs were sent
-        let send_cnt = match self.node_context.lock().await.flush_all_sender() {
-            Ok(cnt) => cnt,
+        match self.node_context.lock().await.flush_all_sender() {
+            Ok(_) => (),
             Err(err) => {
                 common_telemetry::error!("Flush send buf errors: {:?}", err);
                 break;
             }
         };
-        // if no inputs
-        if send_cnt == 0 {
+        // if nothing in send buf then break
+        let buf_len = self.node_context.lock().await.get_send_buf_size();
+        if buf_len == 0 {
             break;
         } else {
-            debug!("FlownodeManager::run_available: send_cnt={}", send_cnt);
+            debug!("Send buf len = {}", buf_len);
         }
     }
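For orientation, the loop's new shape can be sketched in isolation. This is a minimal, synchronous stand-in, not the real async FlownodeManager code: NodeContext, the two-rows-per-flush limit, and main are made up purely to show why the break condition now checks the remaining send-buffer size instead of the count just sent.

use std::collections::VecDeque;

struct NodeContext {
    // Stand-in for the per-table send buffers kept by the real context.
    send_buf: VecDeque<i64>,
}

impl NodeContext {
    // Push buffered rows toward the (bounded) channel; here the "channel"
    // accepts at most 2 rows per flush, so leftovers can remain buffered.
    fn flush_all_sender(&mut self) -> Result<usize, String> {
        let mut sent = 0;
        while sent < 2 {
            if self.send_buf.pop_front().is_none() {
                break;
            }
            sent += 1;
        }
        Ok(sent)
    }

    // Total rows still waiting in the send buffer.
    fn get_send_buf_size(&self) -> usize {
        self.send_buf.len()
    }
}

fn run_available(ctx: &mut NodeContext) {
    loop {
        // ... run dataflow workers here ...
        if let Err(err) = ctx.flush_all_sender() {
            eprintln!("Flush send buf errors: {err:?}");
            break;
        }
        // Key point of the fix: keep looping until the send buffer is
        // actually empty, not merely until one flush reports zero rows sent.
        let buf_len = ctx.get_send_buf_size();
        if buf_len == 0 {
            break;
        }
        println!("Send buf len = {buf_len}");
    }
}

fn main() {
    let mut ctx = NodeContext {
        send_buf: (0..5).collect(),
    };
    run_available(&mut ctx);
    assert_eq!(ctx.get_send_buf_size(), 0);
}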


@@ -51,8 +51,6 @@ pub struct FlownodeContext {
             mpsc::UnboundedReceiver<DiffRow>,
         ),
     >,
-    /// store source in buffer for each source table, in case broadcast channel is full
-    pub send_buffer: BTreeMap<TableId, VecDeque<DiffRow>>,
     /// the schema of the table, query from metasrv or inferred from TypedPlan
     pub schema: HashMap<GlobalId, RelationDesc>,
     /// All the tables that have been registered in the worker
@@ -109,6 +107,7 @@ impl SourceSender {
         }
         if row_cnt > 0 {
             debug!("Send {} rows", row_cnt);
+            debug!("Send buf len = {}", self.send_buf.len());
         }
         Ok(row_cnt)
@@ -140,12 +139,19 @@ impl FlownodeContext {
     }

     /// flush all sender's buf
     ///
     /// return numbers being sent
     pub fn flush_all_sender(&mut self) -> Result<usize, Error> {
         self.source_sender
             .iter_mut()
             .map(|(_table_id, src_sender)| src_sender.try_send_all())
             .try_fold(0, |acc, x| x.map(|x| x + acc))
     }
+
+    /// Return the total number of rows in all send bufs
+    pub fn get_send_buf_size(&self) -> usize {
+        self.source_sender.values().map(|v| v.send_buf.len()).sum()
+    }
 }

 impl FlownodeContext {
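The try_fold in flush_all_sender adds up the Ok row counts from every sender and short-circuits on the first Err. A standalone illustration of that folding behavior, using made-up per-sender results instead of real SourceSender values:

fn main() {
    // All senders succeed: the row counts are summed.
    let per_sender: Vec<Result<usize, String>> = vec![Ok(3), Ok(2), Ok(4)];
    let total = per_sender
        .into_iter()
        .try_fold(0usize, |acc, x| x.map(|n| n + acc));
    assert_eq!(total, Ok(9));

    // One sender fails: folding stops at the first Err and returns it.
    let with_err: Vec<Result<usize, String>> =
        vec![Ok(1), Err("channel closed".to_string()), Ok(5)];
    let total = with_err
        .into_iter()
        .try_fold(0usize, |acc, x| x.map(|n| n + acc));
    assert!(total.is_err()); // the trailing Ok(5) is never folded in
}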


@@ -79,8 +79,8 @@ impl<'referred, 'df> Context<'referred, 'df> {
             }
             err_collector.run(|| arrange_handler_inner.write().apply_updates(now, to_arrange));
             send.give(all);
-            // always schedule source to run at next tick
-            inner_schd.schedule_at(now + 1);
+            // always schedule source to run at now so we can repeatedly run source if needed
+            inner_schd.schedule_at(now);
         });
         schd.set_cur_subgraph(sub);
         let arranged = Arranged::new(arrange_handler);
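Scheduling the source at now instead of now + 1 means it can be activated again within the same tick until its buffered rows are gone, whereas now + 1 defers the leftovers to the next tick. The toy, single-threaded sketch below shows that difference; drain_for_tick, the activation queue, and the two-rows-per-activation limit are invented for illustration and are not the real scheduler API:

use std::collections::VecDeque;

// Run one tick's worth of source activations. Returns how many times the
// source subgraph ran during this tick.
fn drain_for_tick(buf: &mut VecDeque<u32>, now: u64, schedule_at_now: bool) -> usize {
    let mut runs = 0usize;
    // Pending activations for the source subgraph, by scheduled tick.
    let mut pending = vec![now];
    while let Some(t) = pending.pop() {
        if t > now {
            // Scheduled for a future tick: cannot run it during this tick.
            continue;
        }
        runs += 1;
        // The source emits at most 2 buffered rows per activation.
        for _ in 0..2 {
            let _ = buf.pop_front();
        }
        if !buf.is_empty() {
            // Reschedule: at `now` it reruns within this same tick,
            // at `now + 1` the leftovers wait for the next tick.
            pending.push(if schedule_at_now { now } else { now + 1 });
        }
    }
    runs
}

fn main() {
    let mut a: VecDeque<u32> = (0..5).collect();
    let mut b: VecDeque<u32> = (0..5).collect();
    // Scheduling at `now` drains the whole buffer within one tick.
    assert_eq!(drain_for_tick(&mut a, 10, true), 3);
    assert!(a.is_empty());
    // Scheduling at `now + 1` leaves leftovers for later ticks.
    assert_eq!(drain_for_tick(&mut b, 10, false), 1);
    assert_eq!(b.len(), 3);
}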