mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-05 12:52:57 +00:00
feat: flush_flow flush all ranges now
This commit is contained in:
@@ -363,11 +363,14 @@ impl BatchingEngine {
|
||||
}
|
||||
|
||||
pub async fn flush_flow_inner(&self, flow_id: FlowId) -> Result<usize, Error> {
|
||||
info!("Try flush flow {flow_id}");
|
||||
let task = self.tasks.read().await.get(&flow_id).cloned();
|
||||
let task = task.with_context(|| UnexpectedSnafu {
|
||||
reason: format!("Can't found task for flow {flow_id}"),
|
||||
})?;
|
||||
|
||||
task.mark_all_windows_as_dirty()?;
|
||||
|
||||
let res = task
|
||||
.gen_exec_once(&self.query_engine, &self.frontend_client)
|
||||
.await?;
|
||||
|
||||
@@ -22,13 +22,13 @@ use common_telemetry::tracing::warn;
|
||||
use common_time::Timestamp;
|
||||
use datatypes::value::Value;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::ResultExt;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::time::Instant;
|
||||
|
||||
use crate::batching_mode::task::BatchingTask;
|
||||
use crate::batching_mode::MIN_REFRESH_DURATION;
|
||||
use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu};
|
||||
use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
|
||||
use crate::{Error, FlowId};
|
||||
|
||||
/// The state of the [`BatchingTask`].
|
||||
@@ -118,6 +118,10 @@ impl DirtyTimeWindows {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_window(&mut self, start: Timestamp, end: Option<Timestamp>) {
|
||||
self.windows.insert(start, end);
|
||||
}
|
||||
|
||||
/// Generate all filter expressions consuming all time windows
|
||||
pub fn gen_filter_exprs(
|
||||
&mut self,
|
||||
@@ -180,6 +184,27 @@ impl DirtyTimeWindows {
|
||||
|
||||
let mut expr_lst = vec![];
|
||||
for (start, end) in first_nth.into_iter() {
|
||||
// align using time window exprs
|
||||
let (start, end) = if let Some(ctx) = task_ctx {
|
||||
let Some(time_window_expr) = &ctx.config.time_window_expr else {
|
||||
UnexpectedSnafu {
|
||||
reason: "time_window_expr is not set",
|
||||
}
|
||||
.fail()?
|
||||
};
|
||||
let align_start = time_window_expr.eval(start)?.0.context(UnexpectedSnafu {
|
||||
reason: format!(
|
||||
"Failed to align start time {:?} with time window expr {:?}",
|
||||
start, time_window_expr
|
||||
),
|
||||
})?;
|
||||
let align_end = end
|
||||
.and_then(|end| time_window_expr.eval(end).map(|r| r.1).transpose())
|
||||
.transpose()?;
|
||||
(align_start, align_end)
|
||||
} else {
|
||||
(start, end)
|
||||
};
|
||||
debug!(
|
||||
"Time window start: {:?}, end: {:?}",
|
||||
start.to_iso8601_string(),
|
||||
|
||||
@@ -111,6 +111,36 @@ impl BatchingTask {
|
||||
}
|
||||
}
|
||||
|
||||
/// mark time window range (now - expire_after, now) as dirty (or (0, now) if expire_after not set)
|
||||
///
|
||||
/// useful for flush_flow to flush dirty time windows range
|
||||
pub fn mark_all_windows_as_dirty(&self) -> Result<(), Error> {
|
||||
let now = SystemTime::now();
|
||||
let now = Timestamp::new_second(
|
||||
now.duration_since(UNIX_EPOCH)
|
||||
.expect("Time went backwards")
|
||||
.as_secs() as _,
|
||||
);
|
||||
let lower_bound = self
|
||||
.config
|
||||
.expire_after
|
||||
.map(|e| now.sub_duration(Duration::from_secs(e as _)))
|
||||
.transpose()
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?
|
||||
.unwrap_or(Timestamp::new_second(0));
|
||||
debug!(
|
||||
"Flow {} mark range ({:?}, {:?}) as dirty",
|
||||
self.config.flow_id, lower_bound, now
|
||||
);
|
||||
self.state
|
||||
.write()
|
||||
.unwrap()
|
||||
.dirty_time_windows
|
||||
.add_window(lower_bound, Some(now));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Test execute, for check syntax or such
|
||||
pub async fn check_execute(
|
||||
&self,
|
||||
|
||||
62
tests/cases/standalone/common/flow/flow_flush.result
Normal file
62
tests/cases/standalone/common/flow/flow_flush.result
Normal file
@@ -0,0 +1,62 @@
|
||||
-- test if flush_flow works and flush old data to flow for compute
|
||||
CREATE TABLE numbers_input_basic (
|
||||
number INT,
|
||||
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
PRIMARY KEY(number),
|
||||
TIME INDEX(ts)
|
||||
);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
INSERT INTO
|
||||
numbers_input_basic
|
||||
VALUES
|
||||
(20, "2021-07-01 00:00:00.200"),
|
||||
(22, "2021-07-01 00:00:00.600");
|
||||
|
||||
Affected Rows: 2
|
||||
|
||||
CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS
|
||||
SELECT
|
||||
sum(number),
|
||||
date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00.1') as time_window
|
||||
FROM
|
||||
numbers_input_basic
|
||||
GROUP BY
|
||||
time_window;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
|
||||
ADMIN FLUSH_FLOW('test_numbers_basic');
|
||||
|
||||
+----------------------------------------+
|
||||
| ADMIN FLUSH_FLOW('test_numbers_basic') |
|
||||
+----------------------------------------+
|
||||
| FLOW_FLUSHED |
|
||||
+----------------------------------------+
|
||||
|
||||
SELECT
|
||||
"sum(numbers_input_basic.number)",
|
||||
time_window
|
||||
FROM
|
||||
out_num_cnt_basic;
|
||||
|
||||
+---------------------------------+-------------------------+
|
||||
| sum(numbers_input_basic.number) | time_window |
|
||||
+---------------------------------+-------------------------+
|
||||
| 42 | 2021-07-01T00:00:00.100 |
|
||||
+---------------------------------+-------------------------+
|
||||
|
||||
DROP FLOW test_numbers_basic;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
DROP TABLE numbers_input_basic;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
DROP TABLE out_num_cnt_basic;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
37
tests/cases/standalone/common/flow/flow_flush.sql
Normal file
37
tests/cases/standalone/common/flow/flow_flush.sql
Normal file
@@ -0,0 +1,37 @@
|
||||
-- test if flush_flow works and flush old data to flow for compute
|
||||
CREATE TABLE numbers_input_basic (
|
||||
number INT,
|
||||
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
PRIMARY KEY(number),
|
||||
TIME INDEX(ts)
|
||||
);
|
||||
|
||||
INSERT INTO
|
||||
numbers_input_basic
|
||||
VALUES
|
||||
(20, "2021-07-01 00:00:00.200"),
|
||||
(22, "2021-07-01 00:00:00.600");
|
||||
|
||||
CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS
|
||||
SELECT
|
||||
sum(number),
|
||||
date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00.1') as time_window
|
||||
FROM
|
||||
numbers_input_basic
|
||||
GROUP BY
|
||||
time_window;
|
||||
|
||||
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
|
||||
ADMIN FLUSH_FLOW('test_numbers_basic');
|
||||
|
||||
SELECT
|
||||
"sum(numbers_input_basic.number)",
|
||||
time_window
|
||||
FROM
|
||||
out_num_cnt_basic;
|
||||
|
||||
DROP FLOW test_numbers_basic;
|
||||
|
||||
DROP TABLE numbers_input_basic;
|
||||
|
||||
DROP TABLE out_num_cnt_basic;
|
||||
Reference in New Issue
Block a user