fix: query without time window also clean dirty time window

This commit is contained in:
discord9
2025-04-21 17:20:04 +08:00
parent b4aa0c8b8b
commit d32ade7399
3 changed files with 26 additions and 3 deletions

View File

@@ -374,7 +374,12 @@ impl BatchingEngine {
let res = task
.gen_exec_once(&self.query_engine, &self.frontend_client)
.await?;
let affected_rows = res.map(|(r, _)| r).unwrap_or_default() as usize;
info!(
"Successfully flush flow {flow_id}, affected rows={}",
affected_rows
);
Ok(affected_rows)
}

View File

@@ -17,8 +17,8 @@
use std::collections::BTreeMap;
use std::time::Duration;
use common_telemetry::debug;
use common_telemetry::tracing::warn;
use common_telemetry::{debug, info};
use common_time::Timestamp;
use datatypes::value::Value;
use session::context::QueryContextRef;
@@ -74,7 +74,11 @@ impl TaskState {
/// wait for at least `last_query_duration`, at most `max_timeout` to start next query
///
/// if have more dirty time window, exec next query immediately
pub fn get_next_start_query_time(&self, max_timeout: Option<Duration>) -> Instant {
pub fn get_next_start_query_time(
&self,
flow_id: FlowId,
max_timeout: Option<Duration>,
) -> Instant {
let next_duration = max_timeout
.unwrap_or(self.last_query_duration)
.min(self.last_query_duration);
@@ -84,6 +88,10 @@ impl TaskState {
if self.dirty_time_windows.windows.is_empty() {
self.last_update_time + next_duration
} else {
info!(
"Flow id = {}, still have {:?} dirty time window, execute immediately",
flow_id, self.dirty_time_windows.windows
);
Instant::now()
}
}
@@ -123,6 +131,11 @@ impl DirtyTimeWindows {
self.windows.insert(start, end);
}
/// Clean all dirty time windows, useful when can't found time window expr
pub fn clean(&mut self) {
self.windows.clear();
}
/// Generate all filter expressions consuming all time windows
pub fn gen_filter_exprs(
&mut self,

View File

@@ -417,7 +417,10 @@ impl BatchingTask {
}
Err(TryRecvError::Empty) => (),
}
state.get_next_start_query_time(Some(DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT))
state.get_next_start_query_time(
self.config.flow_id,
Some(DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT),
)
};
tokio::time::sleep_until(sleep_until).await;
}
@@ -526,6 +529,8 @@ impl BatchingTask {
debug!(
"Flow id = {:?}, can't get window size: precise_lower_bound={expire_time_window_bound:?}, using the same query", self.config.flow_id
);
// clean dirty time window too, this could be from create flow's check_execute
self.state.write().unwrap().dirty_time_windows.clean();
let mut add_auto_column =
AddAutoColumnRewriter::new(sink_table_schema.clone());