From d32ade739998b435d935e5fa6180476ffb2593bb Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 21 Apr 2025 17:20:04 +0800 Subject: [PATCH] fix: query without time window also clean dirty time window --- src/flow/src/batching_mode/engine.rs | 5 +++++ src/flow/src/batching_mode/state.rs | 17 +++++++++++++++-- src/flow/src/batching_mode/task.rs | 7 ++++++- 3 files changed, 26 insertions(+), 3 deletions(-) diff --git a/src/flow/src/batching_mode/engine.rs b/src/flow/src/batching_mode/engine.rs index 72fcd6ee6c..9ab2ea0046 100644 --- a/src/flow/src/batching_mode/engine.rs +++ b/src/flow/src/batching_mode/engine.rs @@ -374,7 +374,12 @@ impl BatchingEngine { let res = task .gen_exec_once(&self.query_engine, &self.frontend_client) .await?; + let affected_rows = res.map(|(r, _)| r).unwrap_or_default() as usize; + info!( + "Successfully flush flow {flow_id}, affected rows={}", + affected_rows + ); Ok(affected_rows) } diff --git a/src/flow/src/batching_mode/state.rs b/src/flow/src/batching_mode/state.rs index e89ca996fb..d34ab8fbeb 100644 --- a/src/flow/src/batching_mode/state.rs +++ b/src/flow/src/batching_mode/state.rs @@ -17,8 +17,8 @@ use std::collections::BTreeMap; use std::time::Duration; -use common_telemetry::debug; use common_telemetry::tracing::warn; +use common_telemetry::{debug, info}; use common_time::Timestamp; use datatypes::value::Value; use session::context::QueryContextRef; @@ -74,7 +74,11 @@ impl TaskState { /// wait for at least `last_query_duration`, at most `max_timeout` to start next query /// /// if have more dirty time window, exec next query immediately - pub fn get_next_start_query_time(&self, max_timeout: Option<Duration>) -> Instant { + pub fn get_next_start_query_time( + &self, + flow_id: FlowId, + max_timeout: Option<Duration>, + ) -> Instant { let next_duration = max_timeout .unwrap_or(self.last_query_duration) .min(self.last_query_duration); @@ -84,6 +88,10 @@ impl TaskState { if self.dirty_time_windows.windows.is_empty() { self.last_update_time + next_duration 
} else { + info!( + "Flow id = {}, still have {:?} dirty time window, execute immediately", + flow_id, self.dirty_time_windows.windows + ); Instant::now() } } @@ -123,6 +131,11 @@ impl DirtyTimeWindows { self.windows.insert(start, end); } + /// Clean all dirty time windows; useful when the time window expr can't be found + pub fn clean(&mut self) { + self.windows.clear(); + } + /// Generate all filter expressions consuming all time windows pub fn gen_filter_exprs( &mut self, diff --git a/src/flow/src/batching_mode/task.rs b/src/flow/src/batching_mode/task.rs index a3af6fe242..c36c1521b7 100644 --- a/src/flow/src/batching_mode/task.rs +++ b/src/flow/src/batching_mode/task.rs @@ -417,7 +417,10 @@ impl BatchingTask { } Err(TryRecvError::Empty) => (), } - state.get_next_start_query_time(Some(DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT)) + state.get_next_start_query_time( + self.config.flow_id, + Some(DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT), + ) }; tokio::time::sleep_until(sleep_until).await; } @@ -526,6 +529,8 @@ impl BatchingTask { debug!( "Flow id = {:?}, can't get window size: precise_lower_bound={expire_time_window_bound:?}, using the same query", self.config.flow_id ); + // clean dirty time window too, this could be from create flow's check_execute + self.state.write().unwrap().dirty_time_windows.clean(); let mut add_auto_column = AddAutoColumnRewriter::new(sink_table_schema.clone());