From 6d0470c3fbca287d8b5ce701485e9c7f1c1918d0 Mon Sep 17 00:00:00 2001 From: discord9 Date: Fri, 18 Apr 2025 21:58:32 +0800 Subject: [PATCH] feat: flush_flow flush all ranges now --- src/flow/src/batching_mode/engine.rs | 3 + src/flow/src/batching_mode/state.rs | 29 ++++++++- src/flow/src/batching_mode/task.rs | 30 +++++++++ .../standalone/common/flow/flow_flush.result | 62 +++++++++++++++++++ .../standalone/common/flow/flow_flush.sql | 37 +++++++++++ 5 files changed, 159 insertions(+), 2 deletions(-) create mode 100644 tests/cases/standalone/common/flow/flow_flush.result create mode 100644 tests/cases/standalone/common/flow/flow_flush.sql diff --git a/src/flow/src/batching_mode/engine.rs b/src/flow/src/batching_mode/engine.rs index 96209ed95c..72fcd6ee6c 100644 --- a/src/flow/src/batching_mode/engine.rs +++ b/src/flow/src/batching_mode/engine.rs @@ -363,11 +363,14 @@ impl BatchingEngine { } pub async fn flush_flow_inner(&self, flow_id: FlowId) -> Result { + info!("Try flush flow {flow_id}"); let task = self.tasks.read().await.get(&flow_id).cloned(); let task = task.with_context(|| UnexpectedSnafu { reason: format!("Can't found task for flow {flow_id}"), })?; + task.mark_all_windows_as_dirty()?; + let res = task .gen_exec_once(&self.query_engine, &self.frontend_client) .await?; diff --git a/src/flow/src/batching_mode/state.rs b/src/flow/src/batching_mode/state.rs index 23b1b6c2cd..5cbc541d88 100644 --- a/src/flow/src/batching_mode/state.rs +++ b/src/flow/src/batching_mode/state.rs @@ -22,13 +22,13 @@ use common_telemetry::tracing::warn; use common_time::Timestamp; use datatypes::value::Value; use session::context::QueryContextRef; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use tokio::sync::oneshot; use tokio::time::Instant; use crate::batching_mode::task::BatchingTask; use crate::batching_mode::MIN_REFRESH_DURATION; -use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu}; +use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu}; use crate::{Error, FlowId}; /// The state of the [`BatchingTask`]. @@ -118,6 +118,10 @@ impl DirtyTimeWindows { } } + pub fn add_window(&mut self, start: Timestamp, end: Option) { + self.windows.insert(start, end); + } + /// Generate all filter expressions consuming all time windows pub fn gen_filter_exprs( &mut self, @@ -180,6 +184,27 @@ impl DirtyTimeWindows { let mut expr_lst = vec![]; for (start, end) in first_nth.into_iter() { + // align using time window exprs + let (start, end) = if let Some(ctx) = task_ctx { + let Some(time_window_expr) = &ctx.config.time_window_expr else { + UnexpectedSnafu { + reason: "time_window_expr is not set", + } + .fail()? + }; + let align_start = time_window_expr.eval(start)?.0.context(UnexpectedSnafu { + reason: format!( + "Failed to align start time {:?} with time window expr {:?}", + start, time_window_expr + ), + })?; + let align_end = end + .and_then(|end| time_window_expr.eval(end).map(|r| r.1).transpose()) + .transpose()?; + (align_start, align_end) + } else { + (start, end) + }; debug!( "Time window start: {:?}, end: {:?}", start.to_iso8601_string(), diff --git a/src/flow/src/batching_mode/task.rs b/src/flow/src/batching_mode/task.rs index f163a56cb2..0d61a23d50 100644 --- a/src/flow/src/batching_mode/task.rs +++ b/src/flow/src/batching_mode/task.rs @@ -111,6 +111,36 @@ impl BatchingTask { } } + /// mark time window range (now - expire_after, now) as dirty (or (0, now) if expire_after not set) + /// + /// useful for flush_flow to flush dirty time windows range + pub fn mark_all_windows_as_dirty(&self) -> Result<(), Error> { + let now = SystemTime::now(); + let now = Timestamp::new_second( + now.duration_since(UNIX_EPOCH) + .expect("Time went backwards") + .as_secs() as _, + ); + let lower_bound = self + .config + .expire_after + .map(|e| now.sub_duration(Duration::from_secs(e as _))) + .transpose() + .map_err(BoxedError::new) + .context(ExternalSnafu)? + .unwrap_or(Timestamp::new_second(0)); + debug!( + "Flow {} mark range ({:?}, {:?}) as dirty", + self.config.flow_id, lower_bound, now + ); + self.state + .write() + .unwrap() + .dirty_time_windows + .add_window(lower_bound, Some(now)); + Ok(()) + } + /// Test execute, for check syntax or such pub async fn check_execute( &self, diff --git a/tests/cases/standalone/common/flow/flow_flush.result b/tests/cases/standalone/common/flow/flow_flush.result new file mode 100644 index 0000000000..f9a8a43af8 --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_flush.result @@ -0,0 +1,62 @@ +-- test if flush_flow works and flush old data to flow for compute +CREATE TABLE numbers_input_basic ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +Affected Rows: 0 + +INSERT INTO + numbers_input_basic +VALUES + (20, "2021-07-01 00:00:00.200"), + (22, "2021-07-01 00:00:00.600"); + +Affected Rows: 2 + +CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS +SELECT + sum(number), + date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00.1') as time_window +FROM + numbers_input_basic +GROUP BY + time_window; + +Affected Rows: 0 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_basic'); + ++----------------------------------------+ +| ADMIN FLUSH_FLOW('test_numbers_basic') | ++----------------------------------------+ +| FLOW_FLUSHED | ++----------------------------------------+ + +SELECT + "sum(numbers_input_basic.number)", + time_window +FROM + out_num_cnt_basic; + ++---------------------------------+-------------------------+ +| sum(numbers_input_basic.number) | time_window | ++---------------------------------+-------------------------+ +| 42 | 2021-07-01T00:00:00.100 | ++---------------------------------+-------------------------+ + +DROP FLOW test_numbers_basic; + +Affected Rows: 0 + +DROP TABLE numbers_input_basic; + +Affected Rows: 0 + +DROP TABLE out_num_cnt_basic; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/flow/flow_flush.sql b/tests/cases/standalone/common/flow/flow_flush.sql new file mode 100644 index 0000000000..9dca98baf7 --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_flush.sql @@ -0,0 +1,37 @@ +-- test if flush_flow works and flush old data to flow for compute +CREATE TABLE numbers_input_basic ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +INSERT INTO + numbers_input_basic +VALUES + (20, "2021-07-01 00:00:00.200"), + (22, "2021-07-01 00:00:00.600"); + +CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS +SELECT + sum(number), + date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00.1') as time_window +FROM + numbers_input_basic +GROUP BY + time_window; + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_basic'); + +SELECT + "sum(numbers_input_basic.number)", + time_window +FROM + out_num_cnt_basic; + +DROP FLOW test_numbers_basic; + +DROP TABLE numbers_input_basic; + +DROP TABLE out_num_cnt_basic;