feat(flow): stabilize eval interval scheduling (#8360) (#8378)

* feat(flow): stabilize eval interval scheduling



* fix(flow): satisfy eval schedule clippy



* test(flow): trim eval schedule coverage



* test(flow): cover stable eval scheduling



* fix(flow): reserve scheduled runtime hint



* test(flow): trim sqlness result eof



* fix(flow): harden eval schedule edges



* fix(flow): address scheduled flow review



* fix(flow): clean scheduled config handling



* test(flow): add eval interval compat case



* test(flow): cover show create flow in compat



* fix(flow): drop scheduled time from flow context



* test(flow): assert scheduled now binding



---------

Signed-off-by: discord9 <discord9@163.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
Co-authored-by: discord9 <discord9@163.com>
This commit is contained in:
Yingwen
2026-06-29 21:27:22 +08:00
committed by GitHub
parent 126c212546
commit 4071cb56ae
36 changed files with 2552 additions and 171 deletions

View File

@@ -252,6 +252,7 @@ mod tests {
status: FlowStatus::Active,
created_time: chrono::Utc::now(),
updated_time: chrono::Utc::now(),
eval_schedule: None,
},
(1..=3)
.map(|i| {

View File

@@ -41,7 +41,7 @@ use crate::ddl::DdlContext;
use crate::ddl::utils::{add_peer_context_if_needed, map_to_procedure_error};
use crate::error::{self, Result, UnexpectedSnafu};
use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
use crate::key::flow::flow_info::{FlowInfoValue, FlowStatus};
use crate::key::flow::flow_info::{FlowInfoValue, FlowScheduleConfig, FlowStatus};
use crate::key::flow::flow_route::FlowRouteValue;
use crate::key::table_name::TableNameKey;
use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId};
@@ -69,7 +69,7 @@ impl CreateFlowProcedure {
peers: vec![],
source_table_ids: vec![],
unresolved_source_table_names: vec![],
flow_context: query_context.into(), // Convert to FlowQueryContext
flow_context: without_scheduled_time_extension(query_context).into(),
state: CreateFlowState::Prepare,
prev_flow_info_value: None,
did_replace: false,
@@ -210,6 +210,18 @@ impl CreateFlowProcedure {
}
self.data.flow_type = Some(get_flow_type_from_options(&self.data.task)?);
// Resolve schedule defaults into task.eval_schedule once, before
// CreateRequest / FlowInfoValue are constructed. This ensures the same
// typed schedule config is sent to flownodes and persisted in metadata.
// Idempotent: if eval_schedule is already present we do not recompute.
resolve_schedule_defaults_into_task(
&mut self.data.task,
self.data
.prev_flow_info_value
.as_ref()
.map(|v| v.get_inner_ref()),
);
self.data.state = if self.data.is_pending() {
self.data.peers.clear();
CreateFlowState::CreateMetadata
@@ -249,7 +261,12 @@ impl CreateFlowProcedure {
header: Some(FlowRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
// Convert FlowQueryContext to QueryContext
query_context: Some(QueryContext::from(self.data.flow_context.clone()).into()),
query_context: Some(
without_scheduled_time_extension(QueryContext::from(
self.data.flow_context.clone(),
))
.into(),
),
}),
body: Some(PbFlowRequest::Create((&self.data).into())),
};
@@ -405,6 +422,23 @@ pub fn get_flow_type_from_options(flow_task: &CreateFlowTask) -> Result<FlowType
/// The flow option key for creating pending flow metadata when source tables do not exist.
pub const DEFER_ON_MISSING_SOURCE_KEY: &str = "defer_on_missing_source";
/// Internal transient key used to pass the serialized `FlowScheduleConfig` from
/// meta to flownode through `CreateRequest.flow_options`. This key must never
/// be accepted as a user-provided option and must never be persisted into
/// `FlowInfoValue.options`.
/// TODO(discord9): Replace this transient flow_options transport with a typed
/// field in the flow create request.
pub const INTERNAL_EVAL_SCHEDULE_KEY: &str = "__greptime_internal_eval_schedule";
const FLOW_SCHEDULED_TIME_MILLIS_EXTENSION_KEY: &str = "flow.scheduled_time_millis";
fn without_scheduled_time_extension(mut query_context: QueryContext) -> QueryContext {
query_context
.extensions
.remove(FLOW_SCHEDULED_TIME_MILLIS_EXTENSION_KEY);
query_context
}
pub fn defer_on_missing_source(flow_task: &CreateFlowTask) -> Result<bool> {
flow_task
.flow_options
@@ -428,6 +462,16 @@ pub fn defer_on_missing_source(flow_task: &CreateFlowTask) -> Result<bool> {
}
pub fn validate_flow_options(flow_task: &CreateFlowTask) -> Result<()> {
// Reject non-positive eval_interval_secs (zero or negative).
if let Some(secs) = flow_task.eval_interval_secs
&& secs <= 0
{
return UnexpectedSnafu {
err_msg: format!("EVAL INTERVAL must be positive, got {secs} seconds"),
}
.fail();
}
for key in flow_task.flow_options.keys() {
match key.as_str() {
DEFER_ON_MISSING_SOURCE_KEY
@@ -449,16 +493,116 @@ pub fn validate_flow_options(flow_task: &CreateFlowTask) -> Result<()> {
Ok(())
}
/// Computes the ceiling of `time` to the next schedule boundary aligned with `anchor + k * interval`.
/// All values are Unix timestamps in seconds.
fn ceil_to_boundary(time: i64, anchor: i64, interval: i64) -> i64 {
if interval <= 0 {
return time;
}
if time <= anchor {
return anchor;
}
let diff = i128::from(time) - i128::from(anchor);
let interval = i128::from(interval);
let k = (diff + interval - 1) / interval;
let boundary = i128::from(anchor) + k * interval;
boundary.clamp(i128::from(i64::MIN), i128::from(i64::MAX)) as i64
}
/// Returns the effective typed schedule config for flow metadata.
///
/// New metadata should carry `FlowInfoValue.eval_schedule`. For older metadata
/// that lacks the typed field, derive a deterministic config from `created_time`
/// and defaults. This avoids recovery-time wall-clock drift.
pub fn effective_eval_schedule_from_flow_info(
flow_info: &FlowInfoValue,
) -> Option<FlowScheduleConfig> {
if let Some(schedule) = &flow_info.eval_schedule {
return Some(schedule.clone());
}
let eval_interval_secs = flow_info.eval_interval_secs?;
if eval_interval_secs <= 0 {
return None;
}
let start_secs = ceil_to_boundary(
flow_info.created_time.timestamp(),
FlowScheduleConfig::DEFAULT_ANCHOR_SECS,
eval_interval_secs,
);
Some(FlowScheduleConfig::default_with_start(
start_secs,
eval_interval_secs,
))
}
/// Resolve `FlowScheduleConfig` into `task.eval_schedule`.
///
/// This must be called in `on_prepare` after `prev_flow_info` is loaded so that
/// `CreateRequest` and `FlowInfoValue` both see the same resolved defaults.
///
/// The function is idempotent: if `task.eval_schedule` is already `Some`,
/// it returns immediately (important for procedure retry / dump-restore).
///
/// Schedule configuration is NOT read from `task.flow_options` (those keys are
/// no longer user-facing options). For new flows, pure defaults are computed.
/// For OR REPLACE, the previous typed config is preserved when interval+anchor
/// are unchanged; otherwise defaults are recomputed.
pub(crate) fn resolve_schedule_defaults_into_task(
task: &mut CreateFlowTask,
prev_flow_info: Option<&FlowInfoValue>,
) {
// Idempotent: if already computed, skip recomputation.
if task.eval_schedule.is_some() {
return;
}
let Some(eval_interval_secs) = task.eval_interval_secs else {
return;
};
if eval_interval_secs <= 0 {
return;
}
let anchor_secs = FlowScheduleConfig::DEFAULT_ANCHOR_SECS;
// For OR REPLACE: if interval+anchor unchanged, preserve the entire
// existing typed config so start / policy / limits are stable.
if task.or_replace
&& let Some(prev) = prev_flow_info
&& let Some(old_sched) = effective_eval_schedule_from_flow_info(prev)
{
let old_interval = prev.eval_interval_secs.unwrap_or(0);
if old_interval == eval_interval_secs && old_sched.anchor_secs == anchor_secs {
task.eval_schedule = Some(old_sched);
return;
}
}
// --- Compute start ---
let start_secs =
// New flow, or OR REPLACE with changed interval/anchor: start at the next
// aligned boundary after prepare time. The value is written into
// task.eval_schedule once so procedure retry does not recompute it.
ceil_to_boundary(chrono::Utc::now().timestamp(), anchor_secs, eval_interval_secs);
task.eval_schedule = Some(FlowScheduleConfig::default_with_start(
start_secs,
eval_interval_secs,
));
}
fn user_runtime_flow_options(options: &HashMap<String, String>) -> HashMap<String, String> {
let mut options = options.clone();
options.remove(DEFER_ON_MISSING_SOURCE_KEY);
options.remove(INTERNAL_EVAL_SCHEDULE_KEY);
options
}
fn metadata_flow_options(options: &HashMap<String, String>) -> HashMap<String, String> {
options.clone()
}
/// The state of [CreateFlowProcedure].
#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)]
pub enum CreateFlowState {
@@ -561,23 +705,40 @@ impl From<&CreateFlowData> for CreateRequest {
let flow_type = value.flow_type.unwrap_or_default().to_string();
req.flow_options
.insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type);
// Pass typed schedule config via internal transient key in flow_options.
if let Some(ref sched) = value.task.eval_schedule {
let json = serde_json::to_string(sched)
.expect("FlowScheduleConfig serialization should not fail");
req.flow_options
.insert(INTERNAL_EVAL_SCHEDULE_KEY.to_string(), json);
}
req
}
}
impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteValue)>) {
fn from(value: &CreateFlowData) -> Self {
let CreateFlowTask {
catalog_name,
flow_name,
sink_table_name,
expire_after,
eval_interval_secs: eval_interval,
comment,
sql,
..
} = value.task.clone();
let mut options = metadata_flow_options(&value.task.flow_options);
let catalog_name = value.task.catalog_name.clone();
let flow_name = value.task.flow_name.clone();
let sink_table_name = value.task.sink_table_name.clone();
let expire_after = value.task.expire_after;
let eval_interval = value.task.eval_interval_secs;
let comment = value.task.comment.clone();
let sql = value.task.sql.clone();
let eval_schedule = value.task.eval_schedule.clone();
// Start with a clean options map. The transient schedule payload is
// only for the meta→flownode CreateRequest boundary and must not be
// persisted in FlowInfoValue.options.
let mut options: HashMap<String, String> = value
.task
.flow_options
.iter()
.filter(|(k, _)| k.as_str() != INTERNAL_EVAL_SCHEDULE_KEY)
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
let flownode_ids = value
.peers
@@ -602,20 +763,25 @@ impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteVa
create_time = prev_flow_value.get_inner_ref().created_time;
}
// This conversion borrows the procedure data: the procedure keeps using
// `self.data` after metadata creation (for cache invalidation, retry and
// procedure dump/restore), while `FlowInfoValue` owns the persisted
// metadata. Cloning these owned fields is therefore intentional.
let flow_info: FlowInfoValue = FlowInfoValue {
source_table_ids: value.source_table_ids.clone(),
all_source_table_names: value.task.source_table_names.clone(),
unresolved_source_table_names: value.unresolved_source_table_names.clone(),
sink_table_name,
sink_table_name: sink_table_name.clone(),
flownode_ids,
catalog_name,
// Convert FlowQueryContext back to QueryContext for storage
query_context: Some(QueryContext::from(value.flow_context.clone())),
flow_name,
raw_sql: sql,
catalog_name: catalog_name.clone(),
query_context: Some(without_scheduled_time_extension(QueryContext::from(
value.flow_context.clone(),
))),
flow_name: flow_name.clone(),
raw_sql: sql.clone(),
expire_after,
eval_interval_secs: eval_interval,
comment,
comment: comment.clone(),
options,
status: if value.is_active() {
FlowStatus::Active
@@ -624,6 +790,7 @@ impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteVa
},
created_time: create_time,
updated_time: chrono::Utc::now(),
eval_schedule: eval_schedule.clone(),
};
(flow_info, flow_routes)

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use std::assert_matches;
use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use api::v1::flow::CreateRequest;
@@ -26,12 +26,16 @@ use crate::ddl::DdlContext;
use crate::ddl::create_flow::{
CreateFlowData, CreateFlowProcedure, CreateFlowState, DEFER_ON_MISSING_SOURCE_KEY,
FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY, FlowType, defer_on_missing_source,
effective_eval_schedule_from_flow_info, resolve_schedule_defaults_into_task,
validate_flow_options,
};
use crate::ddl::test_util::create_table::test_create_table_task;
use crate::ddl::test_util::flownode_handler::NaiveFlownodeHandler;
use crate::error;
use crate::key::FlowId;
use crate::key::flow::flow_info::{
FlowInfoValue, FlowMissedTickPolicy, FlowScheduleConfig, FlowStatus,
};
use crate::key::table_route::TableRouteValue;
use crate::rpc::ddl::{CreateFlowTask, FlowQueryContext, QueryContext};
use crate::test_util::{MockFlownodeManager, new_ddl_context};
@@ -66,6 +70,7 @@ pub(crate) fn test_create_flow_task(
comment: "".to_string(),
sql: "select 1".to_string(),
flow_options: Default::default(),
eval_schedule: None,
}
}
@@ -74,6 +79,34 @@ fn enable_defer_on_missing_source(task: &mut CreateFlowTask) {
.insert(DEFER_ON_MISSING_SOURCE_KEY.to_string(), "true".to_string());
}
fn flow_info_for_schedule_test(
eval_schedule: Option<FlowScheduleConfig>,
eval_interval_secs: Option<i64>,
options: HashMap<String, String>,
created_time_secs: i64,
) -> FlowInfoValue {
let created_time = chrono::DateTime::from_timestamp(created_time_secs, 0).unwrap();
FlowInfoValue {
source_table_ids: vec![],
all_source_table_names: vec![],
unresolved_source_table_names: vec![],
sink_table_name: TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "sink"),
flownode_ids: BTreeMap::new(),
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
query_context: None,
flow_name: "flow".to_string(),
raw_sql: "select 1".to_string(),
expire_after: None,
eval_interval_secs,
comment: String::new(),
options,
status: FlowStatus::Active,
created_time,
updated_time: created_time,
eval_schedule,
}
}
#[tokio::test]
async fn test_create_flow_source_table_not_found() {
let source_table_names = vec![TableName::new(
@@ -292,6 +325,248 @@ fn test_validate_flow_options_allows_incremental_read_option() {
validate_flow_options(&task).unwrap();
}
#[test]
fn test_validate_flow_options_rejects_schedule_and_internal_keys_as_unknown() {
for key in [
"eval_interval_anchor",
"eval_interval_start",
"eval_interval_missed_tick_policy",
"eval_interval_catchup_max_runs",
"eval_interval_catchup_max_lag",
"__greptime_internal_eval_schedule",
] {
let mut task = test_create_flow_task(
"my_flow",
vec![],
TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_sink_table"),
false,
);
task.eval_interval_secs = Some(300);
task.flow_options
.insert(key.to_string(), "value".to_string());
let err = validate_flow_options(&task).unwrap_err();
assert!(
err.to_string()
.contains(&format!("Unknown flow option '{key}'")),
"unexpected error for {key}: {err}"
);
}
}
#[test]
fn test_validate_flow_options_rejects_non_positive_eval_interval() {
for secs in [0, -1] {
let mut task = test_create_flow_task(
"my_flow",
vec![],
TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_sink_table"),
false,
);
task.eval_interval_secs = Some(secs);
let err = validate_flow_options(&task).unwrap_err();
assert!(err.to_string().contains("EVAL INTERVAL must be positive"));
}
}
#[test]
fn test_resolved_schedule_defaults_in_create_request() {
let mut task = test_create_flow_task(
"my_flow",
vec![],
TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_sink_table"),
false,
);
task.eval_interval_secs = Some(300);
// Simulate on_prepare: resolve defaults into task.
resolve_schedule_defaults_into_task(&mut task, None);
// Verify typed schedule config is populated.
let sched = task.eval_schedule.as_ref().unwrap();
assert!(sched.start_secs > 0);
assert_eq!(sched.anchor_secs, 0);
assert_eq!(
sched.missed_tick_policy,
FlowMissedTickPolicy::BoundedCatchUp
);
assert_eq!(sched.catchup_max_runs, 3);
assert!(sched.catchup_max_lag_secs >= 300);
// Verify schedule option names are NOT in flow_options.
for key in &[
"eval_interval_start",
"eval_interval_anchor",
"eval_interval_missed_tick_policy",
"eval_interval_catchup_max_runs",
"eval_interval_catchup_max_lag",
] {
assert!(
!task.flow_options.contains_key(*key),
"flow_options should not contain {key}"
);
}
// Idempotent: second call does not overwrite.
let start_before = sched.start_secs;
resolve_schedule_defaults_into_task(&mut task, None);
assert_eq!(
task.eval_schedule.as_ref().unwrap().start_secs,
start_before
);
let flow_context = FlowQueryContext {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
timezone: "UTC".to_string(),
extensions: HashMap::new(),
channel: 0,
snapshot_seqs: HashMap::new(),
sst_min_sequences: HashMap::new(),
};
// Construct CreateFlowData and verify CreateRequest carries internal key.
let data = CreateFlowData {
state: CreateFlowState::CreateFlows,
task,
flow_id: Some(1024),
peers: vec![],
source_table_ids: vec![],
unresolved_source_table_names: vec![],
flow_context: flow_context.clone(),
prev_flow_info_value: None,
did_replace: false,
flow_type: Some(FlowType::Batching),
};
let data2 = CreateFlowData {
state: CreateFlowState::CreateMetadata,
task: data.task.clone(),
flow_id: Some(1024),
peers: vec![],
source_table_ids: vec![],
unresolved_source_table_names: vec![],
flow_context,
prev_flow_info_value: None,
did_replace: false,
flow_type: Some(FlowType::Batching),
};
let request: CreateRequest = (&data).into();
// Verify internal key is present in the runtime request.
assert!(
request
.flow_options
.contains_key("__greptime_internal_eval_schedule"),
"CreateRequest must contain internal eval schedule key"
);
// Verify FlowInfoValue has eval_schedule populated, and options do NOT
// contain schedule keys.
let (flow_info, _) = <(FlowInfoValue, Vec<(_, _)>)>::from(&data2);
assert!(
flow_info.eval_schedule.is_some(),
"FlowInfoValue must have eval_schedule set"
);
for key in &[
"eval_interval_start",
"eval_interval_anchor",
"eval_interval_missed_tick_policy",
"eval_interval_catchup_max_runs",
"eval_interval_catchup_max_lag",
] {
assert!(
!flow_info.options.contains_key(*key),
"FlowInfoValue.options should not contain {key}"
);
}
}
#[test]
fn test_effective_eval_schedule_prefers_typed_config() {
let typed = FlowScheduleConfig {
anchor_secs: 0,
start_secs: 999,
missed_tick_policy: FlowMissedTickPolicy::BoundedCatchUp,
catchup_max_runs: 9,
catchup_max_lag_secs: 900,
};
let unrelated_options = HashMap::from([("unrelated".to_string(), "value".to_string())]);
let flow_info =
flow_info_for_schedule_test(Some(typed.clone()), Some(60), unrelated_options, 1);
let effective = effective_eval_schedule_from_flow_info(&flow_info).unwrap();
assert_eq!(effective, typed);
}
#[test]
fn test_effective_eval_schedule_uses_created_time_default() {
let flow_info = flow_info_for_schedule_test(None, Some(60), HashMap::new(), 61);
let effective = effective_eval_schedule_from_flow_info(&flow_info).unwrap();
assert_eq!(effective.anchor_secs, 0);
assert_eq!(effective.start_secs, 120);
assert_eq!(
effective.missed_tick_policy,
FlowMissedTickPolicy::BoundedCatchUp
);
assert_eq!(effective.catchup_max_runs, 3);
assert_eq!(effective.catchup_max_lag_secs, 300);
}
#[test]
fn test_effective_eval_schedule_none_without_eval_interval() {
let flow_info = flow_info_for_schedule_test(None, None, HashMap::new(), 61);
assert!(effective_eval_schedule_from_flow_info(&flow_info).is_none());
}
#[test]
fn test_effective_eval_schedule_deterministic_on_old_metadata() {
// Old metadata: no typed eval_schedule, but has eval_interval_secs and
// a fixed created_time. Repeated calls must produce the same config.
let flow_info = flow_info_for_schedule_test(None, Some(60), HashMap::new(), 61);
let first = effective_eval_schedule_from_flow_info(&flow_info).unwrap();
let second = effective_eval_schedule_from_flow_info(&flow_info).unwrap();
assert_eq!(first, second);
// Sanity: the derived values are based on created_time=61 + interval=60.
assert_eq!(first.anchor_secs, 0);
assert_eq!(first.start_secs, 120); // ceil(61, 0, 60) = 120
assert_eq!(
first.missed_tick_policy,
FlowMissedTickPolicy::BoundedCatchUp
);
assert_eq!(first.catchup_max_runs, 3);
assert_eq!(first.catchup_max_lag_secs, 300); // max(300, 3*60) = 300
}
#[test]
fn test_old_flow_info_json_without_eval_schedule_deserializes() {
let typed = FlowScheduleConfig {
anchor_secs: 0,
start_secs: 999,
missed_tick_policy: FlowMissedTickPolicy::BoundedCatchUp,
catchup_max_runs: 9,
catchup_max_lag_secs: 900,
};
let flow_info = flow_info_for_schedule_test(Some(typed), Some(60), HashMap::new(), 61);
let mut json = serde_json::to_value(&flow_info).unwrap();
json.as_object_mut().unwrap().remove("eval_schedule");
let decoded: FlowInfoValue = serde_json::from_value(json).unwrap();
assert!(decoded.eval_schedule.is_none());
let effective = effective_eval_schedule_from_flow_info(&decoded).unwrap();
assert_eq!(effective.anchor_secs, 0);
assert_eq!(effective.start_secs, 120);
assert_eq!(
effective.missed_tick_policy,
FlowMissedTickPolicy::BoundedCatchUp
);
assert_eq!(effective.catchup_max_runs, 3);
assert_eq!(effective.catchup_max_lag_secs, 300);
}
#[tokio::test]
async fn test_create_flow_rejects_unknown_option_in_meta_task() {
let mut task = test_create_flow_task(
@@ -702,6 +977,7 @@ fn create_test_flow_task_for_serialization() -> CreateFlowTask {
comment: "test comment".to_string(),
sql: "SELECT * FROM source_table".to_string(),
flow_options: HashMap::new(),
eval_schedule: None,
}
}
@@ -835,6 +1111,44 @@ fn test_flow_query_context_conversion_from_query_context() {
assert_eq!(flow_context.sst_min_sequences, HashMap::from([(1, 8)]));
}
#[test]
fn test_create_flow_procedure_strips_scheduled_time_extension() {
let task = test_create_flow_task(
"my_flow",
vec![],
TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_sink_table"),
false,
);
let ddl_context = new_ddl_context(Arc::new(MockFlownodeManager::new(NaiveFlownodeHandler)));
let mut query_ctx = test_query_context();
query_ctx.extensions.insert(
"flow.scheduled_time_millis".to_string(),
"1700000000000".to_string(),
);
query_ctx
.extensions
.insert("flow.other".to_string(), "kept".to_string());
let procedure = CreateFlowProcedure::new(task, query_ctx, ddl_context);
assert!(
!procedure
.data
.flow_context
.extensions
.contains_key("flow.scheduled_time_millis")
);
assert_eq!(
procedure
.data
.flow_context
.extensions
.get("flow.other")
.map(String::as_str),
Some("kept")
);
}
#[test]
fn test_flow_info_conversion_with_flow_context() {
let flow_context = FlowQueryContext {
@@ -871,6 +1185,53 @@ fn test_flow_info_conversion_with_flow_context() {
assert!(query_context.extensions().is_empty());
}
#[test]
fn test_flow_info_conversion_strips_scheduled_time_extension() {
let flow_context = FlowQueryContext {
catalog: "info_catalog".to_string(),
schema: "info_schema".to_string(),
timezone: "UTC".to_string(),
extensions: HashMap::from([
(
"flow.scheduled_time_millis".to_string(),
"1700000000000".to_string(),
),
("flow.other".to_string(), "kept".to_string()),
]),
channel: 0,
snapshot_seqs: HashMap::new(),
sst_min_sequences: HashMap::new(),
};
let data = CreateFlowData {
state: CreateFlowState::CreateMetadata,
task: create_test_flow_task_for_serialization(),
flow_id: Some(123),
peers: vec![],
source_table_ids: vec![],
unresolved_source_table_names: vec![],
flow_context,
prev_flow_info_value: None,
did_replace: false,
flow_type: Some(FlowType::Batching),
};
let (flow_info, _routes) = (&data).into();
let query_context = flow_info.query_context.unwrap();
assert!(
!query_context
.extensions()
.contains_key("flow.scheduled_time_millis")
);
assert_eq!(
query_context
.extensions()
.get("flow.other")
.map(String::as_str),
Some("kept")
);
}
#[test]
fn test_mixed_serialization_format_support() {
// Test that we can deserialize both old and new formats

View File

@@ -535,6 +535,7 @@ mod tests {
status: FlowStatus::Active,
created_time: chrono::Utc::now(),
updated_time: chrono::Utc::now(),
eval_schedule: None,
}
}
@@ -790,6 +791,7 @@ mod tests {
status: FlowStatus::Active,
created_time: chrono::Utc::now(),
updated_time: chrono::Utc::now(),
eval_schedule: None,
};
let err = flow_metadata_manager
.create_flow_metadata(flow_id, flow_value, flow_routes.clone())
@@ -1170,6 +1172,7 @@ mod tests {
status: FlowStatus::Active,
created_time: chrono::Utc::now(),
updated_time: chrono::Utc::now(),
eval_schedule: None,
};
let err = flow_metadata_manager
.update_flow_metadata(

View File

@@ -127,6 +127,86 @@ impl<'a> MetadataKey<'a, FlowInfoKeyInner> for FlowInfoKeyInner {
}
}
/// Internal typed schedule configuration for `EVAL INTERVAL` flows.
///
/// This struct is the canonical schedule state for `EVAL INTERVAL` flows and
/// is stored alongside `FlowInfoValue` (not inside `options`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct FlowScheduleConfig {
/// Anchor timestamp in seconds since Unix epoch (default: 0 = epoch).
pub anchor_secs: i64,
/// Start timestamp in seconds since Unix epoch.
pub start_secs: i64,
/// Policy for handling missed ticks.
#[serde(default)]
pub missed_tick_policy: FlowMissedTickPolicy,
/// Maximum number of catch-up runs when using bounded catch-up.
///
/// A catch-up run is an evaluation for a scheduled timestamp that is
/// already due but was missed because the flownode was stopped or busy.
#[serde(default = "FlowScheduleConfig::default_catchup_max_runs")]
pub catchup_max_runs: u32,
/// Maximum age (in seconds) of a due scheduled time to still include in catch-up.
#[serde(default = "FlowScheduleConfig::default_catchup_max_lag_secs")]
pub catchup_max_lag_secs: i64,
}
impl FlowScheduleConfig {
pub const DEFAULT_ANCHOR_SECS: i64 = 0;
pub const DEFAULT_CATCHUP_MAX_RUNS: u32 = 3;
pub const DEFAULT_CATCHUP_MAX_LAG_SECS: i64 = 300;
pub fn default_catchup_max_runs() -> u32 {
Self::DEFAULT_CATCHUP_MAX_RUNS
}
pub fn default_catchup_max_lag_secs() -> i64 {
Self::DEFAULT_CATCHUP_MAX_LAG_SECS
}
pub fn catchup_max_lag_secs_for_interval(eval_interval_secs: i64) -> i64 {
std::cmp::max(
Self::DEFAULT_CATCHUP_MAX_LAG_SECS,
3_i64.saturating_mul(eval_interval_secs),
)
}
pub fn default_with_start(start_secs: i64, eval_interval_secs: i64) -> Self {
Self {
anchor_secs: Self::DEFAULT_ANCHOR_SECS,
start_secs,
missed_tick_policy: FlowMissedTickPolicy::BoundedCatchUp,
catchup_max_runs: Self::DEFAULT_CATCHUP_MAX_RUNS,
catchup_max_lag_secs: Self::catchup_max_lag_secs_for_interval(eval_interval_secs),
}
}
}
impl Default for FlowScheduleConfig {
fn default() -> Self {
Self {
anchor_secs: Self::DEFAULT_ANCHOR_SECS,
start_secs: 0,
missed_tick_policy: FlowMissedTickPolicy::default(),
catchup_max_runs: Self::default_catchup_max_runs(),
catchup_max_lag_secs: Self::default_catchup_max_lag_secs(),
}
}
}
/// Policy for handling flow evaluation scheduled times that were missed.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
pub enum FlowMissedTickPolicy {
/// Keep the most recent `catchup_max_runs` due scheduled times within
/// `catchup_max_lag_secs`, drop older ones.
#[default]
#[serde(rename = "bounded_catch_up")]
BoundedCatchUp,
/// Skip all missed scheduled times; only execute the single most recent one.
#[serde(rename = "skip")]
Skip,
}
// The metadata of the flow.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FlowInfoValue {
@@ -175,6 +255,12 @@ pub struct FlowInfoValue {
/// The updated time.
#[serde(default)]
pub updated_time: DateTime<Utc>,
/// Typed schedule configuration for `EVAL INTERVAL` flows.
/// When `eval_interval_secs` is set, this field carries the resolved
/// schedule parameters (anchor, start, missed-tick policy, catch-up
/// limits). Absent for flows without `EVAL INTERVAL`.
#[serde(default)]
pub eval_schedule: Option<FlowScheduleConfig>,
}
impl FlowInfoValue {

View File

@@ -1184,6 +1184,11 @@ pub struct CreateFlowTask {
pub comment: String,
pub sql: String,
pub flow_options: HashMap<String, String>,
/// Typed schedule configuration resolved during `on_prepare`.
/// Not populated from proto; set by the procedure layer after
/// defaults are resolved.
#[serde(default)]
pub eval_schedule: Option<crate::key::flow::flow_info::FlowScheduleConfig>,
}
impl TryFrom<PbCreateFlowTask> for CreateFlowTask {
@@ -1222,6 +1227,7 @@ impl TryFrom<PbCreateFlowTask> for CreateFlowTask {
comment,
sql,
flow_options,
eval_schedule: None,
})
}
}
@@ -1240,6 +1246,7 @@ impl From<CreateFlowTask> for PbCreateFlowTask {
comment,
sql,
flow_options,
..
}: CreateFlowTask,
) -> Self {
PbCreateFlowTask {

View File

@@ -745,6 +745,7 @@ impl StreamingEngine {
sql,
flow_options,
query_ctx,
..
} = args;
let mut node_ctx = self.node_context.write().await;

View File

@@ -25,9 +25,12 @@ use api::v1::region::InsertRequests;
use catalog::CatalogManager;
use common_base::Plugins;
use common_error::ext::BoxedError;
use common_meta::ddl::create_flow::FlowType;
use common_meta::ddl::create_flow::{
FlowType, INTERNAL_EVAL_SCHEDULE_KEY, effective_eval_schedule_from_flow_info,
};
use common_meta::error::Result as MetaResult;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::flow::flow_info::FlowScheduleConfig;
use common_meta::key::flow::flow_state::FlowStat;
use common_runtime::JoinHandle;
use common_telemetry::{error, info, trace, warn};
@@ -380,6 +383,7 @@ impl FlowDualEngine {
comment: Some(info.comment().clone()),
sql: info.raw_sql().clone(),
flow_options: info.options().clone(),
eval_schedule: effective_eval_schedule_from_flow_info(&info),
query_ctx: info
.query_context()
.clone()
@@ -839,7 +843,7 @@ impl common_meta::node_manager::Flownode for FlowDualEngine {
eval_interval,
comment,
sql,
flow_options,
mut flow_options,
or_replace,
})) => {
let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec();
@@ -849,6 +853,10 @@ impl common_meta::node_manager::Flownode for FlowDualEngine {
sink_table_name.table_name,
];
let expire_after = expire_after.map(|e| e.value);
let eval_schedule = decode_internal_eval_schedule(&mut flow_options)
.map_err(to_meta_err(snafu::location!()))?;
let args = CreateFlowArgs {
flow_id: task_id.id as u64,
sink_table_name,
@@ -861,6 +869,7 @@ impl common_meta::node_manager::Flownode for FlowDualEngine {
sql: sql.clone(),
flow_options,
query_ctx,
eval_schedule,
};
let ret = self
.create_flow(args)
@@ -919,6 +928,24 @@ impl common_meta::node_manager::Flownode for FlowDualEngine {
}
}
/// Decode typed schedule config from the internal transient key emitted by metasrv.
/// Malformed JSON is an internal error rather than a reason to silently fall back.
fn decode_internal_eval_schedule(
flow_options: &mut HashMap<String, String>,
) -> Result<Option<FlowScheduleConfig>, Error> {
match flow_options.remove(INTERNAL_EVAL_SCHEDULE_KEY) {
Some(json) => serde_json::from_str::<FlowScheduleConfig>(&json)
.map(Some)
.map_err(|err| {
InternalSnafu {
reason: format!("Invalid internal eval schedule payload: {err}"),
}
.build()
}),
None => Ok(None),
}
}
/// return a function to convert `crate::error::Error` to `common_meta::error::Error`
fn to_meta_err(
location: snafu::Location,
@@ -1128,3 +1155,30 @@ impl StreamingEngine {
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use common_meta::ddl::create_flow::INTERNAL_EVAL_SCHEDULE_KEY;
use super::decode_internal_eval_schedule;
use crate::error::Error;
#[test]
fn test_malformed_internal_eval_schedule_json_is_error() {
let mut flow_options = HashMap::new();
flow_options.insert(
INTERNAL_EVAL_SCHEDULE_KEY.to_string(),
"not-json".to_string(),
);
let err = decode_internal_eval_schedule(&mut flow_options).unwrap_err();
assert!(matches!(
err,
Error::Internal { reason, .. }
if reason.contains("Invalid internal eval schedule payload")
));
assert!(!flow_options.contains_key(INTERNAL_EVAL_SCHEDULE_KEY));
}
}

View File

@@ -22,6 +22,7 @@ use session::ReadPreference;
mod checkpoint;
pub(crate) mod engine;
mod eval_schedule;
pub(crate) mod frontend_client;
mod state;
mod table_creator;

View File

@@ -44,6 +44,7 @@ use table::table_reference::TableReference;
use tokio::sync::{RwLock, oneshot};
use crate::batching_mode::BatchingModeOptions;
use crate::batching_mode::eval_schedule::EvalSchedule;
use crate::batching_mode::frontend_client::FrontendClient;
use crate::batching_mode::task::{BatchingTask, TaskArgs};
use crate::batching_mode::time_window::{TimeWindowExpr, find_time_window_expr};
@@ -507,6 +508,7 @@ impl BatchingEngine {
sql,
flow_options,
query_ctx,
eval_schedule: eval_schedule_config,
} = args;
// or replace logic
@@ -629,6 +631,24 @@ impl BatchingEngine {
Self::ensure_sql_flow_has_twe_or_eval_interval(eval_interval, phy_expr.is_some())?;
}
// Compute typed EvalSchedule from FlowScheduleConfig.
let eval_schedule = {
let interval = eval_interval;
let config = eval_schedule_config.as_ref();
match EvalSchedule::from_config(interval, config) {
Ok(s) => s,
Err(e) => {
return UnexpectedSnafu {
reason: format!(
"Failed to build eval schedule for flow {}: {}",
flow_id, e
),
}
.fail();
}
}
};
let task_args = TaskArgs {
flow_id,
query: &sql,
@@ -642,6 +662,7 @@ impl BatchingEngine {
shutdown_rx: rx,
batch_opts,
flow_eval_interval: eval_interval.map(|secs| Duration::from_secs(secs as u64)),
eval_schedule,
};
let task = BatchingTask::try_new(task_args)?;
@@ -1186,6 +1207,7 @@ GROUP BY l.number, time_window
shutdown_rx: rx,
batch_opts: Arc::new(BatchingModeOptions::default()),
flow_eval_interval: None,
eval_schedule: None,
})
.unwrap();

View File

@@ -0,0 +1,353 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Helpers for stable `EVAL INTERVAL` scheduled times.
pub use common_meta::key::flow::flow_info::{FlowMissedTickPolicy, FlowScheduleConfig};
use snafu::ensure;
use crate::error::{InvalidQuerySnafu, Result};
/// Schedule for an `EVAL INTERVAL` flow.
#[derive(Debug, Clone, PartialEq)]
pub struct EvalSchedule {
/// Interval between scheduled times in seconds.
pub interval_secs: i64,
/// Anchor timestamp as seconds since Unix epoch.
pub anchor_secs: i64,
/// First scheduled time as seconds since Unix epoch.
pub start_secs: i64,
/// Policy for handling missed scheduled times.
pub missed_tick_policy: FlowMissedTickPolicy,
/// Maximum number of due scheduled times to catch up.
pub max_runs: u32,
/// Maximum age of a due scheduled time to keep for catch-up.
pub max_lag_secs: i64,
}
impl EvalSchedule {
pub fn from_config(
eval_interval_secs: Option<i64>,
config: Option<&FlowScheduleConfig>,
) -> Result<Option<Self>> {
let Some(interval_secs) = eval_interval_secs else {
return Ok(None);
};
ensure!(
interval_secs > 0,
InvalidQuerySnafu {
reason: format!(
"Invalid eval_interval_secs: must be positive, got {interval_secs}"
)
}
);
Ok(Some(match config {
Some(c) => {
ensure!(
c.catchup_max_runs > 0,
InvalidQuerySnafu {
reason:
"Invalid FlowScheduleConfig.catchup_max_runs: must be positive, got 0"
.to_string()
}
);
ensure!(
c.catchup_max_lag_secs > 0,
InvalidQuerySnafu {
reason: format!(
"Invalid FlowScheduleConfig.catchup_max_lag_secs: must be positive, got {}",
c.catchup_max_lag_secs
)
}
);
Self {
interval_secs,
anchor_secs: c.anchor_secs,
start_secs: c.start_secs,
missed_tick_policy: c.missed_tick_policy,
max_runs: c.catchup_max_runs,
max_lag_secs: c.catchup_max_lag_secs,
}
}
None => {
let c = FlowScheduleConfig::default_with_start(0, interval_secs);
Self {
interval_secs,
anchor_secs: c.anchor_secs,
start_secs: c.start_secs,
missed_tick_policy: c.missed_tick_policy,
max_runs: c.catchup_max_runs,
max_lag_secs: c.catchup_max_lag_secs,
}
}
}))
}
/// Returns the next scheduled time strictly after `cursor_secs`.
pub fn next_scheduled_time_after(&self, cursor_secs: i64) -> i64 {
next_in_sequence(cursor_secs, self.start_secs, self.interval_secs)
}
}
fn next_in_sequence(cursor: i64, start: i64, interval: i64) -> i64 {
if interval <= 0 {
return cursor.saturating_add(1).max(start);
}
if cursor < start {
return start;
}
let k = (cursor - start) / interval;
start.saturating_add((k + 1).saturating_mul(interval))
}
fn first_due_in_sequence(cursor: i64, start: i64, interval: i64) -> i64 {
if interval <= 0 {
return cursor.saturating_add(1).max(start);
}
if cursor < start {
start
} else {
next_in_sequence(cursor, start, interval)
}
}
/// Scheduled times selected for execution in one scheduler pass.
///
/// A scheduled time is the logical evaluation timestamp for one flow run. When
/// executing a timestamp from `scheduled_times_secs`, SQL/TQL `now()` is bound
/// to that timestamp instead of the wall-clock execution time.
#[derive(Debug, Clone, PartialEq)]
pub struct DueScheduledTimes {
/// Scheduled times to execute, ordered oldest to newest.
pub scheduled_times_secs: Vec<i64>,
/// Number of due scheduled times skipped by lag or max-runs limits.
pub skipped: u64,
}
/// Select due scheduled times `<= wall_now_secs` without materializing all missed ticks.
pub fn select_due_scheduled_times(
schedule: &EvalSchedule,
cursor_secs: i64,
wall_now_secs: i64,
) -> Option<DueScheduledTimes> {
if schedule.interval_secs <= 0 {
return None;
}
let first_due = first_due_in_sequence(cursor_secs, schedule.start_secs, schedule.interval_secs);
if first_due > wall_now_secs {
return Some(DueScheduledTimes {
scheduled_times_secs: vec![],
skipped: 0,
});
}
let total_count = ((wall_now_secs - first_due) / schedule.interval_secs) as u64 + 1;
match schedule.missed_tick_policy {
FlowMissedTickPolicy::Skip => {
let last = first_due + (total_count as i64 - 1) * schedule.interval_secs;
Some(DueScheduledTimes {
scheduled_times_secs: vec![last],
skipped: total_count.saturating_sub(1),
})
}
FlowMissedTickPolicy::BoundedCatchUp => {
let cutoff = wall_now_secs.saturating_sub(schedule.max_lag_secs);
let skipped_by_cutoff = if first_due >= cutoff {
0
} else {
((cutoff - first_due + schedule.interval_secs - 1) / schedule.interval_secs) as u64
}
.min(total_count);
let remaining = total_count.saturating_sub(skipped_by_cutoff);
if remaining == 0 {
return Some(DueScheduledTimes {
scheduled_times_secs: vec![],
skipped: total_count,
});
}
// max_lag_secs decides which missed scheduled times are recent enough to
// run; max_runs caps how many of those times we execute
// back-to-back in one scheduler pass.
let keep_count = remaining.min(schedule.max_runs as u64);
let keep_start = skipped_by_cutoff + remaining.saturating_sub(keep_count);
let scheduled_times_secs = (0..keep_count)
.map(|i| first_due + (keep_start as i64 + i as i64) * schedule.interval_secs)
.collect::<Vec<_>>();
Some(DueScheduledTimes {
scheduled_times_secs,
skipped: total_count - keep_count,
})
}
}
}
/// Ceils `time` to the next `anchor + k * interval` boundary.
pub fn ceil_to_boundary(time: i64, anchor: i64, interval: i64) -> i64 {
if interval <= 0 {
return time;
}
if time <= anchor {
return anchor;
}
let diff = i128::from(time) - i128::from(anchor);
let interval = i128::from(interval);
let k = (diff + interval - 1) / interval;
let boundary = i128::from(anchor) + k * interval;
boundary.clamp(i128::from(i64::MIN), i128::from(i64::MAX)) as i64
}
#[cfg(test)]
mod test {
use super::*;
fn schedule(
start: i64,
policy: FlowMissedTickPolicy,
max_runs: u32,
max_lag_secs: i64,
) -> EvalSchedule {
EvalSchedule {
interval_secs: 60,
anchor_secs: 0,
start_secs: start,
missed_tick_policy: policy,
max_runs,
max_lag_secs,
}
}
fn config(policy: FlowMissedTickPolicy) -> FlowScheduleConfig {
FlowScheduleConfig {
anchor_secs: 10,
start_secs: 70,
missed_tick_policy: policy,
catchup_max_runs: 4,
catchup_max_lag_secs: 600,
}
}
#[test]
fn ceil_to_boundary_handles_anchor_and_interval_edges() {
assert_eq!(ceil_to_boundary(-10, 0, 60), 0);
assert_eq!(ceil_to_boundary(0, 0, 60), 0);
assert_eq!(ceil_to_boundary(1, 0, 60), 60);
assert_eq!(ceil_to_boundary(60, 0, 60), 60);
assert_eq!(ceil_to_boundary(101, 100, 60), 160);
assert_eq!(ceil_to_boundary(50, 0, 0), 50);
assert_eq!(ceil_to_boundary(i64::MAX, 0, 60), i64::MAX);
assert_eq!(ceil_to_boundary(i64::MAX - 1, i64::MIN, 60), i64::MAX);
}
#[test]
fn from_config_maps_typed_config_and_defaults() {
assert!(EvalSchedule::from_config(None, None).unwrap().is_none());
assert!(EvalSchedule::from_config(Some(0), None).is_err());
let from_typed =
EvalSchedule::from_config(Some(300), Some(&config(FlowMissedTickPolicy::Skip)))
.unwrap()
.unwrap();
assert_eq!(from_typed.interval_secs, 300);
assert_eq!(from_typed.anchor_secs, 10);
assert_eq!(from_typed.start_secs, 70);
assert_eq!(from_typed.missed_tick_policy, FlowMissedTickPolicy::Skip);
assert_eq!(from_typed.max_runs, 4);
assert_eq!(from_typed.max_lag_secs, 600);
let defaulted = EvalSchedule::from_config(Some(300), None).unwrap().unwrap();
assert_eq!(defaulted.start_secs, 0);
assert_eq!(defaulted.max_runs, 3);
assert_eq!(defaulted.max_lag_secs, 900);
}
#[test]
fn from_config_rejects_invalid_catchup_limits() {
let mut c = config(FlowMissedTickPolicy::BoundedCatchUp);
c.catchup_max_runs = 0;
assert!(EvalSchedule::from_config(Some(300), Some(&c)).is_err());
let mut c = config(FlowMissedTickPolicy::BoundedCatchUp);
c.catchup_max_lag_secs = 0;
assert!(EvalSchedule::from_config(Some(300), Some(&c)).is_err());
}
#[test]
fn next_scheduled_time_after_respects_start_sequence() {
let s = schedule(50, FlowMissedTickPolicy::BoundedCatchUp, 3, 300);
assert_eq!(s.next_scheduled_time_after(0), 50);
assert_eq!(s.next_scheduled_time_after(50), 110);
assert_eq!(s.next_scheduled_time_after(100), 110);
}
#[test]
fn due_scheduled_time_selection_handles_empty_and_start_boundary() {
let s = schedule(120, FlowMissedTickPolicy::BoundedCatchUp, 10, 3600);
assert_eq!(
select_due_scheduled_times(&s, 0, 100)
.unwrap()
.scheduled_times_secs,
Vec::<i64>::new()
);
assert_eq!(
select_due_scheduled_times(&s, 0, 300)
.unwrap()
.scheduled_times_secs,
vec![120, 180, 240, 300]
);
}
#[test]
fn bounded_catch_up_applies_lag_and_max_runs() {
let s = schedule(0, FlowMissedTickPolicy::BoundedCatchUp, 2, 180);
let due = select_due_scheduled_times(&s, 0, 600).unwrap();
assert_eq!(due.scheduled_times_secs, vec![540, 600]);
assert_eq!(due.skipped, 8);
}
#[test]
fn bounded_catch_up_can_skip_all_due_scheduled_times() {
let s = schedule(0, FlowMissedTickPolicy::BoundedCatchUp, 3, 30);
let due = select_due_scheduled_times(&s, 0, 100).unwrap();
assert!(due.scheduled_times_secs.is_empty());
assert_eq!(due.skipped, 1);
}
#[test]
fn skip_policy_keeps_only_latest_due_scheduled_time() {
let s = schedule(0, FlowMissedTickPolicy::Skip, 5, 3600);
let due = select_due_scheduled_times(&s, 0, 300).unwrap();
assert_eq!(due.scheduled_times_secs, vec![300]);
assert_eq!(due.skipped, 4);
}
#[test]
fn huge_missed_gap_allocates_only_kept_scheduled_times() {
let s = schedule(0, FlowMissedTickPolicy::BoundedCatchUp, 5, 3600);
let due = select_due_scheduled_times(&s, 0, 86400).unwrap();
assert_eq!(
due.scheduled_times_secs,
vec![86160, 86220, 86280, 86340, 86400]
);
assert_eq!(due.skipped, 1435);
}
}

View File

@@ -45,6 +45,7 @@ use tokio::time::Instant;
use crate::batching_mode::BatchingModeOptions;
use crate::batching_mode::checkpoint::checkpoint_mode_label;
use crate::batching_mode::eval_schedule::{EvalSchedule, select_due_scheduled_times};
use crate::batching_mode::frontend_client::{FrontendClient, PeerDesc};
use crate::batching_mode::state::{
CheckpointMode, DirtyTimeWindows, FilterExprInfo, TaskState, to_df_literal,
@@ -70,6 +71,14 @@ use crate::{Error, FlowId};
mod ckpt;
mod inc;
/// Returns the current wall-clock Unix timestamp in seconds.
fn wall_clock_unix_secs() -> i64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs() as i64
}
/// The task's config, immutable once created
#[derive(Clone)]
pub struct TaskConfig {
@@ -86,6 +95,8 @@ pub struct TaskConfig {
pub query_type: QueryType,
pub batch_opts: Arc<BatchingModeOptions>,
pub flow_eval_interval: Option<Duration>,
/// Typed schedule configuration, pre-parsed at task creation time.
pub eval_schedule: Option<EvalSchedule>,
}
fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result<QueryType, Error> {
@@ -156,6 +167,8 @@ pub struct TaskArgs<'a> {
pub shutdown_rx: oneshot::Receiver<()>,
pub batch_opts: Arc<BatchingModeOptions>,
pub flow_eval_interval: Option<Duration>,
/// Typed schedule configuration pre-parsed from `CreateFlowArgs`.
pub eval_schedule: Option<EvalSchedule>,
}
pub struct PlanInfo {
@@ -238,6 +251,7 @@ impl BatchingTask {
shutdown_rx,
batch_opts,
flow_eval_interval,
eval_schedule,
}: TaskArgs<'_>,
) -> Result<Self, Error> {
let mut state = TaskState::with_dirty_time_windows(
@@ -265,6 +279,7 @@ impl BatchingTask {
query_type: determine_query_type(query, &query_ctx)?,
batch_opts,
flow_eval_interval,
eval_schedule,
}),
state: Arc::new(RwLock::new(state)),
execution_lock: Arc::new(Mutex::new(())),
@@ -275,6 +290,23 @@ impl BatchingTask {
self.state.read().unwrap().last_execution_time_millis()
}
/// Collect flow-related extensions from the task's query context that should be
/// forwarded to the frontend (e.g. scheduled time).
fn frontend_extensions(&self) -> HashMap<String, String> {
let ctx = self.state.read().unwrap();
let all = ctx.query_ctx.extensions();
let mut flow_exts = HashMap::new();
// Propagate the scheduled time extension if present so that frontend
// execution can use the same logical time.
if let Some(v) = all.get(query::options::FLOW_SCHEDULED_TIME_MILLIS) {
flow_exts.insert(
query::options::FLOW_SCHEDULED_TIME_MILLIS.to_string(),
v.clone(),
);
}
flow_exts
}
/// mark time window range (now - expire_after, now) as dirty (or (0, now) if expire_after not set)
///
/// useful for flush_flow to flush dirty time windows range
@@ -329,7 +361,7 @@ impl BatchingTask {
/// Validates that the sink table schema can accept this flow's output.
///
/// This is a dry-run of the same schema matching logic used by runtime insert-plan
/// This is a dry-run of the same schema matching logic used by insert-plan
/// generation, but without adding dirty-window filters or executing the query. It is used
/// during CREATE FLOW to catch existing sink table mismatches early.
pub async fn validate_sink_table_schema(&self, engine: &QueryEngineRef) -> Result<(), Error> {
@@ -596,9 +628,15 @@ impl BatchingTask {
let extensions = self
.build_flow_query_extensions(incremental_safe, coverage.is_incremental_delta())
.await?;
let frontend_extensions = self.frontend_extensions();
let extension_refs = extensions
.iter()
.map(|(key, value)| (*key, value.as_str()))
.chain(
frontend_extensions
.iter()
.map(|(key, value)| (key.as_str(), value.as_str())),
)
.collect::<Vec<_>>();
let query_mode = if extensions
.iter()
@@ -909,37 +947,202 @@ impl BatchingTask {
/// start executing query in a loop, break when receive shutdown signal
///
/// any error will be logged when executing query
/// any error will be logged when executing query.
///
/// Dispatches to:
/// - scheduled loop when `flow_eval_interval.is_some()`
/// - adaptive dirty-window loop otherwise
pub async fn start_executing_loop(
&self,
engine: QueryEngineRef,
frontend_client: Arc<FrontendClient>,
) {
if self.config.flow_eval_interval.is_some() {
self.start_scheduled_loop(engine, frontend_client).await;
} else {
self.start_adaptive_loop(engine, frontend_client).await;
}
}
/// Scheduled batching loop for flows with `EVAL INTERVAL`.
///
/// Uses the pre-parsed `EvalSchedule` from `TaskConfig` and selects due
/// scheduled times using bounded catch-up semantics. Each scheduled time is the
/// scheduled evaluation time used as logical `now()` for that attempt.
/// Each attempt temporarily sets `flow.scheduled_time_millis` on the
/// task's `QueryContext` and executes under the existing `execution_lock`.
/// After every attempt (success, no-op, or failure) the in-memory
/// cursor advances.
async fn start_scheduled_loop(
&self,
engine: QueryEngineRef,
frontend_client: Arc<FrontendClient>,
) {
let flow_id_str = self.config.flow_id.to_string();
let schedule = match &self.config.eval_schedule {
Some(s) => s.clone(),
None => {
let eval_interval_secs = self
.config
.flow_eval_interval
.map(|d| d.as_secs() as i64)
.expect("checked by caller");
// Fallback: no typed config provided. Compute defaults
// anchored at epoch/start=0.
match EvalSchedule::from_config(Some(eval_interval_secs), None) {
Ok(Some(s)) => s,
Ok(None) => {
warn!(
"Flow {}: EVAL INTERVAL set but no schedule parsed; exiting loop",
flow_id_str
);
return;
}
Err(e) => {
warn!(
"Flow {}: Failed to parse eval schedule: {}; exiting loop",
flow_id_str, e
);
return;
}
}
}
};
// Initial cursor is one interval before start so the first due
// scheduled time is `start_secs`.
let mut cursor_secs = schedule.start_secs.saturating_sub(schedule.interval_secs);
info!(
"Flow {}: entering scheduled loop, interval={}s, start={}, anchor={}, policy={:?}, max_runs={}, max_lag={}s",
flow_id_str,
schedule.interval_secs,
schedule.start_secs,
schedule.anchor_secs,
schedule.missed_tick_policy,
schedule.max_runs,
schedule.max_lag_secs,
);
loop {
if self.is_shutdown_signaled() {
break;
}
let wall_now_secs = wall_clock_unix_secs();
let due = match select_due_scheduled_times(&schedule, cursor_secs, wall_now_secs) {
Some(d) => d,
None => {
warn!(
"Flow {}: Invalid schedule (interval <= 0), exiting loop",
flow_id_str
);
return;
}
};
if due.scheduled_times_secs.is_empty() {
if due.skipped > 0 {
warn!(
"Flow {}: all {} due scheduled times skipped by max-lag, advancing cursor to wall-clock ({wall_now_secs}) to avoid re-skipping",
flow_id_str, due.skipped
);
cursor_secs = wall_now_secs;
continue;
}
// No due yet — sleep until the next scheduled time.
let next = schedule.next_scheduled_time_after(cursor_secs);
if next <= wall_now_secs {
// Shouldn't happen given select_due_scheduled_times returned empty,
// but guard against clock skew / logic error.
cursor_secs = wall_now_secs;
continue;
}
let wait_secs = (next - wall_now_secs) as u64;
let wait_dur = Duration::from_secs(wait_secs);
debug!(
"Flow {}: no due scheduled times, sleeping for {}s until next scheduled time at {}",
flow_id_str, wait_secs, next
);
tokio::time::sleep(wait_dur).await;
continue;
}
if due.skipped > 0 {
info!(
"Flow {}: {} due scheduled times, {} skipped (catch-up)",
flow_id_str,
due.scheduled_times_secs.len(),
due.skipped
);
}
// Execute scheduled times oldest → newest.
for scheduled_time_secs in &due.scheduled_times_secs {
if self.is_shutdown_signaled() {
break;
}
METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT
.with_label_values(&[&flow_id_str])
.inc();
let outcome = self
.execute_once_serialized_at_scheduled_time(
&engine,
&frontend_client,
*scheduled_time_secs,
)
.await;
// Advance cursor regardless of outcome.
cursor_secs = *scheduled_time_secs;
match outcome.result {
Ok(Some((rows, elapsed))) => {
debug!(
"Flow {}: scheduled time {} completed, rows={}, elapsed={:?}",
flow_id_str, scheduled_time_secs, rows, elapsed
);
}
Ok(None) => {
debug!(
"Flow {}: scheduled time {} produced no query (no dirty signal or no-op)",
flow_id_str, scheduled_time_secs
);
}
Err(err) => {
warn!(
"Flow {}: scheduled time {} failed: {:?}",
flow_id_str, scheduled_time_secs, err
);
METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT
.with_label_values(&[&flow_id_str])
.inc();
// Dirty-window restoration is handled by the
// existing `handle_executed_query_failure` inside
// `execute_once_unlocked`.
}
}
}
}
}
/// Existing adaptive dirty-window loop for flows without `EVAL INTERVAL`.
async fn start_adaptive_loop(
&self,
engine: QueryEngineRef,
frontend_client: Arc<FrontendClient>,
) {
let flow_id_str = self.config.flow_id.to_string();
let mut max_window_cnt = None;
let mut interval = self
.config
.flow_eval_interval
.map(|d| tokio::time::interval(d));
if let Some(tick) = &mut interval {
tick.tick().await; // pass the first tick immediately
}
loop {
// first check if shutdown signal is received
// if so, break the loop
{
let mut state = self.state.write().unwrap();
match state.shutdown_rx.try_recv() {
Ok(()) => break,
Err(TryRecvError::Closed) => {
warn!(
"Unexpected shutdown flow {}, shutdown anyway",
self.config.flow_id
);
break;
}
Err(TryRecvError::Empty) => (),
}
if self.is_shutdown_signaled() {
break;
}
METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT
.with_label_values(&[&flow_id_str])
@@ -952,46 +1155,36 @@ impl BatchingTask {
.await;
match outcome.result {
// normal execute, sleep for some time before doing next query
Ok(Some(_)) => {
// can increase max_window_cnt to query more windows next time
max_window_cnt = max_window_cnt.map(|cnt| {
(cnt + 1).min(self.config.batch_opts.experimental_max_filter_num_per_query)
});
// here use proper ticking if set eval interval
if let Some(eval_interval) = &mut interval {
eval_interval.tick().await;
} else {
// if not explicitly set, just automatically calculate next start time
// using time window size and more args
let sleep_until = {
let state = self.state.write().unwrap();
let sleep_until = {
let state = self.state.write().unwrap();
let time_window_size = self
.config
.time_window_expr
.as_ref()
.and_then(|t| *t.time_window_size());
let time_window_size = self
.config
.time_window_expr
.as_ref()
.and_then(|t| *t.time_window_size());
let prefer_short_incremental_cadence = state.checkpoint_mode()
== CheckpointMode::Incremental
&& !state.is_incremental_disabled();
let prefer_short_incremental_cadence = state.checkpoint_mode()
== CheckpointMode::Incremental
&& !state.is_incremental_disabled();
state.get_next_start_query_time(
self.config.flow_id,
&time_window_size,
min_refresh,
Some(self.config.batch_opts.query_timeout),
self.config.batch_opts.experimental_max_filter_num_per_query,
prefer_short_incremental_cadence,
)
};
tokio::time::sleep_until(sleep_until).await;
state.get_next_start_query_time(
self.config.flow_id,
&time_window_size,
min_refresh,
Some(self.config.batch_opts.query_timeout),
self.config.batch_opts.experimental_max_filter_num_per_query,
prefer_short_incremental_cadence,
)
};
tokio::time::sleep_until(sleep_until).await;
}
// no new data, sleep for some time before checking for new data
Ok(None) => {
debug!(
"Flow id = {:?} found no new data, sleep for {:?} then continue",
@@ -1000,7 +1193,6 @@ impl BatchingTask {
tokio::time::sleep(min_refresh).await;
continue;
}
// TODO(discord9): this error should have better place to go, but for now just print error, also more context is needed
Err(err) => {
METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT
.with_label_values(&[&flow_id_str])
@@ -1008,23 +1200,87 @@ impl BatchingTask {
match outcome.new_query {
Some(query) => {
common_telemetry::error!(err; "Failed to execute query for flow={} with query: {}", self.config.flow_id, query.plan);
// TODO(discord9): add some backoff here? half the query time window or what
// backoff meaning use smaller `max_window_cnt` for next query
// since last query failed, we should not try to query too many windows
max_window_cnt = Some(1);
}
None => {
common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id)
}
}
// also sleep for a little while before try again to prevent flooding logs
tokio::time::sleep(min_refresh).await;
}
}
}
}
/// Check whether the shutdown signal has been received.
fn is_shutdown_signaled(&self) -> bool {
let mut state = self.state.write().unwrap();
match state.shutdown_rx.try_recv() {
Ok(()) | Err(TryRecvError::Closed) => true,
Err(TryRecvError::Empty) => false,
}
}
/// Execute one scheduled attempt, temporarily setting
/// `flow.scheduled_time_millis` on the task's QueryContext so
/// SQL/TQL `now()` resolves to the logical scheduled time.
///
/// The extension is removed after the attempt so a later manual
/// `flush_flow` does not reuse a stale scheduled time.
async fn execute_once_serialized_at_scheduled_time(
&self,
engine: &QueryEngineRef,
frontend_client: &Arc<FrontendClient>,
scheduled_time_secs: i64,
) -> ExecuteOnceOutcome {
let _execution_guard = self.execution_lock.lock().await;
struct QueryContextRestoreGuard {
state: Arc<RwLock<TaskState>>,
old_ctx: Option<QueryContextRef>,
}
impl Drop for QueryContextRestoreGuard {
fn drop(&mut self) {
let Some(old_ctx) = self.old_ctx.take() else {
return;
};
if let Ok(mut state) = self.state.write() {
state.query_ctx = old_ctx;
}
}
}
// Clone the current QueryContext and add the scheduled time
// extension, then swap it into the task state for this attempt.
let old_ctx = {
let mut state = self.state.write().unwrap();
let old = state.query_ctx.clone();
let mut new_ctx = (*old).clone();
new_ctx.set_extension(
query::options::FLOW_SCHEDULED_TIME_MILLIS,
(scheduled_time_secs.saturating_mul(1000)).to_string(),
);
state.query_ctx = Arc::new(new_ctx);
old
};
let restore_guard = QueryContextRestoreGuard {
state: self.state.clone(),
old_ctx: Some(old_ctx),
};
let outcome = self
.execute_once_unlocked(engine, frontend_client, None)
.await;
// Restore while still holding `execution_lock` so no future manual
// flush can observe the temporary scheduled time. The guard also
// restores during unwind/cancellation.
drop(restore_guard);
outcome
}
/// Generate the create table SQL
///
/// the auto created table will automatically added a `update_at` Milliseconds DEFAULT now() column in the end
@@ -1088,7 +1344,7 @@ impl BatchingTask {
(None, QueryType::Sql) if self.config.flow_eval_interval.is_none() => {
return UnexpectedSnafu {
reason: format!(
"Flow id={} reached runtime without a time-window expression or EVAL INTERVAL; create-flow validation should have rejected it",
"Flow id={} reached execution without a time-window expression or EVAL INTERVAL; create-flow validation should have rejected it",
self.config.flow_id
),
}

View File

@@ -26,10 +26,13 @@ use common_recordbatch::RecordBatch;
use common_recordbatch::adapter::{RecordBatchMetrics, RegionWatermarkEntry};
use datatypes::data_type::ConcreteDataType as CDT;
use datatypes::schema::ColumnSchema;
use datatypes::vectors::{TimestampMillisecondVector, UInt32Vector, VectorRef};
use datatypes::vectors::{
TimestampMillisecondVector, TimestampNanosecondVector, UInt32Vector, VectorRef,
};
use pretty_assertions::assert_eq;
use query::options::{
FLOW_INCREMENTAL_AFTER_SEQS, FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY, QueryOptions,
FLOW_INCREMENTAL_AFTER_SEQS, FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY, FLOW_SCHEDULED_TIME_MILLIS,
QueryOptions,
};
use session::context::QueryContext;
use snafu::ResultExt;
@@ -115,6 +118,7 @@ async fn new_test_task_engine_and_plan_with_query_and_opts(
shutdown_rx: rx,
batch_opts,
flow_eval_interval: None,
eval_schedule: None,
})
.unwrap();
@@ -240,6 +244,7 @@ async fn new_time_window_test_task_with_query(query: &str) -> TestTaskParts {
shutdown_rx: rx,
batch_opts: incremental_batch_opts(),
flow_eval_interval: None,
eval_schedule: None,
})
.unwrap();
@@ -369,6 +374,352 @@ async fn assert_unscoped_failure_restore(
);
}
// --- scheduled-time QueryContext restore regression tests ---
/// Register a sink table whose schema matches the output of a `date_bin`
/// time-window-expression query (columns: `number` uint32, `time_window` timestamp).
fn register_twe_sink(query_engine: &QueryEngineRef, table_name: &str, table_id: u32) {
let schema = Arc::new(Schema::new(vec![
ColumnSchema::new("number", CDT::uint32_datatype(), false),
ColumnSchema::new("time_window", CDT::timestamp_millisecond_datatype(), false)
.with_time_index(true),
]));
let columns: Vec<VectorRef> = vec![
Arc::new(UInt32Vector::from_slice([1_u32])),
Arc::new(TimestampMillisecondVector::from_slice([0_i64])),
];
let recordbatch = RecordBatch::new(schema, columns).unwrap();
let table = MemTable::table(table_name, recordbatch);
let request = RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
table_id,
table,
};
let catalog_manager = query_engine.engine_state().catalog_manager();
let memory_catalog = catalog_manager
.as_any()
.downcast_ref::<MemoryCatalogManager>()
.unwrap();
memory_catalog.register_table_sync(request).unwrap();
}
fn register_scheduled_now_sink(query_engine: &QueryEngineRef, table_name: &str, table_id: u32) {
let schema = Arc::new(Schema::new(vec![
ColumnSchema::new("ts", CDT::timestamp_nanosecond_datatype(), false).with_time_index(true),
ColumnSchema::new("number", CDT::uint32_datatype(), false),
]));
let columns: Vec<VectorRef> = vec![
Arc::new(TimestampNanosecondVector::from_slice([0_i64])),
Arc::new(UInt32Vector::from_slice([1_u32])),
];
let recordbatch = RecordBatch::new(schema, columns).unwrap();
let table = MemTable::table(table_name, recordbatch);
let request = RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
table_id,
table,
};
let catalog_manager = query_engine.engine_state().catalog_manager();
let memory_catalog = catalog_manager
.as_any()
.downcast_ref::<MemoryCatalogManager>()
.unwrap();
memory_catalog.register_table_sync(request).unwrap();
}
struct CaptureScheduledNowHandler {
expected_extension: String,
captured_sql: Arc<std::sync::Mutex<Option<String>>>,
query_engine: QueryEngineRef,
}
#[async_trait::async_trait]
impl crate::batching_mode::frontend_client::GrpcQueryHandlerWithBoxedError
for CaptureScheduledNowHandler
{
async fn do_query(
&self,
query: api::v1::greptime_request::Request,
ctx: QueryContextRef,
) -> std::result::Result<Output, BoxedError> {
assert_eq!(
ctx.extension(FLOW_SCHEDULED_TIME_MILLIS),
Some(self.expected_extension.as_str())
);
let api::v1::greptime_request::Request::Query(api::v1::QueryRequest {
query: Some(api::v1::query_request::Query::Sql(sql)),
..
}) = query
else {
panic!("expected scheduled SQL flow to send a SQL query, got {query:?}");
};
let planned = sql_to_df_plan(ctx, self.query_engine.clone(), &sql, true)
.await
.unwrap();
assert_sql_uses_scheduled_time(&planned.to_string());
*self.captured_sql.lock().unwrap() = Some(sql);
Ok(Output::new_with_affected_rows(1))
}
}
fn assert_sql_uses_scheduled_time(sql: &str) {
let lower = sql.to_ascii_lowercase();
assert!(!lower.contains("now()"), "SQL still contains now(): {sql}");
assert!(
sql.contains("2023-11-14") || sql.contains("1700000000"),
"SQL does not contain the scheduled date or epoch value: {sql}"
);
assert!(
sql.contains("22:13:20") || sql.contains("1700000000"),
"SQL does not contain the scheduled time or epoch value: {sql}"
);
}
/// After a scheduled-time attempt fails (missing sink), the
/// `FLOW_SCHEDULED_TIME_MILLIS` extension must not leak into
/// `TaskState.query_ctx` or `frontend_extensions()`.
#[tokio::test]
async fn test_scheduled_time_ctx_restored_on_error() {
let TestTaskParts {
task, query_engine, ..
} = new_test_task_engine_and_plan_with_query(
"SELECT number, ts FROM numbers_with_ts",
"missing_sink",
)
.await;
let (frontend_client, _handler) =
FrontendClient::from_empty_grpc_handler(QueryOptions::default());
let frontend_client = Arc::new(frontend_client);
// Before: no scheduled time extension
assert_eq!(
task.state
.read()
.unwrap()
.query_ctx
.extension(FLOW_SCHEDULED_TIME_MILLIS),
None
);
assert!(
!task
.frontend_extensions()
.contains_key(FLOW_SCHEDULED_TIME_MILLIS)
);
let scheduled_time_secs = 1700000000i64;
let outcome = task
.execute_once_serialized_at_scheduled_time(
&query_engine,
&frontend_client,
scheduled_time_secs,
)
.await;
// Missing sink table → gen_insert_plan_unlocked should fail.
assert!(
outcome.result.is_err(),
"Expected an error (missing sink), got {:?}",
outcome.result
);
// After: extension must be restored to absent.
assert_eq!(
task.state
.read()
.unwrap()
.query_ctx
.extension(FLOW_SCHEDULED_TIME_MILLIS),
None,
"FLOW_SCHEDULED_TIME_MILLIS leaked into query_ctx after error"
);
assert!(
!task
.frontend_extensions()
.contains_key(FLOW_SCHEDULED_TIME_MILLIS),
"FLOW_SCHEDULED_TIME_MILLIS leaked into frontend_extensions after error"
);
}
/// After a scheduled-time attempt returns `Ok(None)` (no dirty windows),
/// the `FLOW_SCHEDULED_TIME_MILLIS` extension must not leak into
/// `TaskState.query_ctx`.
#[tokio::test]
async fn test_scheduled_time_ctx_restored_on_no_dirty_windows() {
let query_engine = create_test_query_engine();
let ctx = QueryContext::arc();
let plan_query = "SELECT number, date_bin(INTERVAL '5 second', ts) AS time_window \
FROM numbers_with_ts GROUP BY time_window, number";
let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), plan_query, true)
.await
.unwrap();
let (column_name, time_window_expr, _, df_schema) = find_time_window_expr(
&plan,
query_engine.engine_state().catalog_manager().clone(),
ctx.clone(),
)
.await
.unwrap();
let time_window_expr = time_window_expr
.map(|expr| {
TimeWindowExpr::from_expr(
&expr,
&column_name,
&df_schema,
&query_engine.engine_state().session_state(),
)
})
.transpose()
.unwrap();
let sink_table_name = "twe_sink_for_ctx_restore";
register_twe_sink(&query_engine, sink_table_name, 9101);
let (_tx, rx) = tokio::sync::oneshot::channel();
let task = BatchingTask::try_new(TaskArgs {
flow_id: 1,
query: plan_query,
plan: plan.clone(),
time_window_expr,
expire_after: None,
sink_table_name: [
"greptime".to_string(),
"public".to_string(),
sink_table_name.to_string(),
],
source_table_names: vec![[
"greptime".to_string(),
"public".to_string(),
"numbers_with_ts".to_string(),
]],
query_ctx: ctx,
catalog_manager: query_engine.engine_state().catalog_manager().clone(),
shutdown_rx: rx,
batch_opts: incremental_batch_opts(),
flow_eval_interval: None,
eval_schedule: None,
})
.unwrap();
let (frontend_client, _handler) =
FrontendClient::from_empty_grpc_handler(QueryOptions::default());
let frontend_client = Arc::new(frontend_client);
// Before: no scheduled time extension
assert_eq!(
task.state
.read()
.unwrap()
.query_ctx
.extension(FLOW_SCHEDULED_TIME_MILLIS),
None
);
let scheduled_time_secs = 1700000000i64;
let outcome = task
.execute_once_serialized_at_scheduled_time(
&query_engine,
&frontend_client,
scheduled_time_secs,
)
.await;
// No dirty windows → scoped repair returns None → outcome is Ok(None).
assert!(
matches!(outcome.result, Ok(None)),
"Expected Ok(None) (no dirty windows), got {:?}",
outcome.result
);
// After: extension must be restored to absent.
assert_eq!(
task.state
.read()
.unwrap()
.query_ctx
.extension(FLOW_SCHEDULED_TIME_MILLIS),
None,
"FLOW_SCHEDULED_TIME_MILLIS leaked into query_ctx after Ok(None)"
);
}
#[tokio::test]
async fn test_scheduled_time_now_is_bound_to_selected_attempt() {
let query_engine = create_test_query_engine();
let ctx = QueryContext::arc();
let query = "SELECT date_trunc('second', now()) AS ts, number FROM numbers_with_ts";
let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), query, true)
.await
.unwrap();
let sink_table_name = "scheduled_now_sink";
register_scheduled_now_sink(&query_engine, sink_table_name, 9102);
let (_tx, rx) = tokio::sync::oneshot::channel();
let task = BatchingTask::try_new(TaskArgs {
flow_id: 1,
query,
plan,
time_window_expr: None,
expire_after: None,
sink_table_name: [
"greptime".to_string(),
"public".to_string(),
sink_table_name.to_string(),
],
source_table_names: vec![[
"greptime".to_string(),
"public".to_string(),
"numbers_with_ts".to_string(),
]],
query_ctx: ctx,
catalog_manager: query_engine.engine_state().catalog_manager().clone(),
shutdown_rx: rx,
batch_opts: incremental_batch_opts(),
flow_eval_interval: Some(Duration::from_secs(1)),
eval_schedule: None,
})
.unwrap();
let scheduled_time_secs = 1_700_000_000_i64;
let expected_extension = (scheduled_time_secs * 1000).to_string();
let captured_sql = Arc::new(std::sync::Mutex::new(None));
let handler: Arc<dyn crate::batching_mode::frontend_client::GrpcQueryHandlerWithBoxedError> =
Arc::new(CaptureScheduledNowHandler {
expected_extension,
captured_sql: captured_sql.clone(),
query_engine: query_engine.clone(),
});
let frontend_client = Arc::new(FrontendClient::from_grpc_handler(
Arc::downgrade(&handler),
QueryOptions::default(),
));
let outcome = task
.execute_once_serialized_at_scheduled_time(
&query_engine,
&frontend_client,
scheduled_time_secs,
)
.await;
assert!(
matches!(outcome.result, Ok(Some((1, _)))),
"scheduled attempt should execute once, got {:?}",
outcome.result
);
let sent_sql = captured_sql
.lock()
.unwrap()
.clone()
.expect("frontend handler should capture generated SQL");
assert!(!sent_sql.is_empty());
}
fn output_with_region_watermarks(
watermarks: impl IntoIterator<Item = (u64, Option<u64>)>,
) -> OutputWithMetrics {
@@ -1668,6 +2019,7 @@ async fn test_prepare_plan_for_incremental_disables_on_non_aggregate() {
shutdown_rx: rx,
batch_opts: incremental_batch_opts(),
flow_eval_interval: None,
eval_schedule: None,
})
.unwrap();
@@ -1745,6 +2097,7 @@ async fn test_unsafe_incremental_plan_skip_restores_dirty_without_query() {
shutdown_rx: rx,
batch_opts: incremental_batch_opts(),
flow_eval_interval: None,
eval_schedule: None,
})
.unwrap();
@@ -1833,6 +2186,7 @@ async fn test_prepare_plan_for_incremental_group_by_without_merge_columns_uses_o
shutdown_rx: rx,
batch_opts: incremental_batch_opts(),
flow_eval_interval: None,
eval_schedule: None,
})
.unwrap();
@@ -1916,7 +2270,7 @@ async fn test_unscoped_failure_restores_consumed_dirty_signal() {
}
#[tokio::test]
async fn test_unscoped_runtime_invariant_error_preserves_dirty_signal() {
async fn test_unscoped_execution_invariant_error_preserves_dirty_signal() {
let TestTaskParts {
task, query_engine, ..
} = new_test_task_engine_and_plan_with_query(
@@ -1936,7 +2290,7 @@ async fn test_unscoped_runtime_invariant_error_preserves_dirty_signal() {
let err = match result {
Err(err) => err,
Ok(_) => panic!("runtime should reject SQL without TWE or EVAL INTERVAL"),
Ok(_) => panic!("execution should reject SQL without TWE or EVAL INTERVAL"),
};
assert!(matches!(err, Error::Unexpected { .. }), "{err}");
assert!(

View File

@@ -882,10 +882,16 @@ pub async fn sql_to_df_plan(
sql: &str,
optimize: bool,
) -> Result<LogicalPlan, Error> {
let stmts =
ParserContext::create_with_dialect(sql, query_ctx.sql_dialect(), ParseOptions::default())
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let scheduled_time = query::options::parse_scheduled_time_datetime(&query_ctx.extensions())
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let stmts = ParserContext::create_with_dialect(
sql,
query_ctx.sql_dialect(),
ParseOptions { scheduled_time },
)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
ensure!(
stmts.len() == 1,

View File

@@ -827,6 +827,7 @@ async fn test_validate_sink_table_schema_rejects_existing_sink_missing_flow_colu
shutdown_rx,
batch_opts: Arc::new(BatchingModeOptions::default()),
flow_eval_interval: None,
eval_schedule: None,
})
.unwrap();

View File

@@ -62,7 +62,13 @@ pub async fn apply_df_optimizer(
context: "Fail to apply analyzer",
})?;
let ctx = OptimizerContext::new();
let mut ctx = OptimizerContext::new();
let scheduled_time = query::options::parse_scheduled_time_datetime(&query_ctx.extensions())
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
if let Some(dt) = scheduled_time {
ctx = ctx.with_query_execution_start_time(dt);
}
let optimizer = Optimizer::with_rules(vec![
Arc::new(OptimizeProjections::new()),
Arc::new(CommonSubexprEliminate::new()),

View File

@@ -40,6 +40,8 @@ pub struct CreateFlowArgs {
pub sql: String,
pub flow_options: HashMap<String, String>,
pub query_ctx: Option<QueryContext>,
/// Typed schedule configuration for `EVAL INTERVAL` flows.
pub eval_schedule: Option<common_meta::key::flow::flow_info::FlowScheduleConfig>,
}
pub trait FlowEngine {

View File

@@ -180,7 +180,18 @@ fn normalize_flow_bool_option(key: &str, value: &str) -> Result<String> {
fn validate_and_normalize_flow_options(
options: HashMap<String, String>,
eval_interval: Option<i64>,
) -> Result<HashMap<String, String>> {
// Reject non-positive eval_interval (zero or negative).
if let Some(secs) = eval_interval
&& secs <= 0
{
return InvalidSqlSnafu {
err_msg: format!("EVAL INTERVAL must be positive, got {secs} seconds"),
}
.fail();
}
options
.into_iter()
.map(|(key, value)| {
@@ -721,7 +732,20 @@ impl StatementExecutor {
mut expr: CreateFlowExpr,
query_context: QueryContextRef,
) -> Result<SubmitDdlTaskResponse> {
expr.flow_options = validate_and_normalize_flow_options(expr.flow_options)?;
let eval_interval_secs = expr.eval_interval.as_ref().map(|e| e.seconds);
// Reject non-positive eval_interval (zero or negative).
if let Some(secs) = eval_interval_secs
&& secs <= 0
{
return InvalidSqlSnafu {
err_msg: format!("EVAL INTERVAL must be positive, got {secs} seconds"),
}
.fail();
}
expr.flow_options =
validate_and_normalize_flow_options(expr.flow_options, eval_interval_secs)?;
let flow_type = self
.determine_flow_type(&expr, query_context.clone())
@@ -2578,7 +2602,7 @@ mod test {
#[test]
fn test_validate_and_normalize_flow_options_empty() {
assert!(
validate_and_normalize_flow_options(HashMap::new())
validate_and_normalize_flow_options(HashMap::new(), None)
.unwrap()
.is_empty()
);
@@ -2595,7 +2619,7 @@ mod test {
]);
assert_eq!(
validate_and_normalize_flow_options(options).unwrap(),
validate_and_normalize_flow_options(options, None).unwrap(),
HashMap::from([
(DEFER_ON_MISSING_SOURCE_KEY.to_string(), "true".to_string(),),
(
@@ -2608,24 +2632,27 @@ mod test {
#[test]
fn test_validate_and_normalize_flow_options_unknown_option() {
let err = validate_and_normalize_flow_options(HashMap::from([(
"foo".to_string(),
"bar".to_string(),
)]))
let err = validate_and_normalize_flow_options(
HashMap::from([("foo".to_string(), "bar".to_string())]),
None,
)
.unwrap_err();
assert!(
err.to_string()
.contains("unknown flow option 'foo', supported options: defer_on_missing_source, experimental_enable_incremental_read")
.contains("unknown flow option 'foo', supported options:")
);
}
#[test]
fn test_validate_and_normalize_flow_options_reserved_option() {
let err = validate_and_normalize_flow_options(HashMap::from([(
FlowType::FLOW_TYPE_KEY.to_string(),
FlowType::BATCHING.to_string(),
)]))
let err = validate_and_normalize_flow_options(
HashMap::from([(
FlowType::FLOW_TYPE_KEY.to_string(),
FlowType::BATCHING.to_string(),
)]),
None,
)
.unwrap_err();
assert!(
@@ -2636,10 +2663,13 @@ mod test {
#[test]
fn test_validate_and_normalize_flow_options_invalid_bool() {
let err = validate_and_normalize_flow_options(HashMap::from([(
DEFER_ON_MISSING_SOURCE_KEY.to_string(),
"not-a-bool".to_string(),
)]))
let err = validate_and_normalize_flow_options(
HashMap::from([(
DEFER_ON_MISSING_SOURCE_KEY.to_string(),
"not-a-bool".to_string(),
)]),
None,
)
.unwrap_err();
assert!(
@@ -2667,11 +2697,53 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;";
};
let expr =
expr_helper::to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
let err = validate_and_normalize_flow_options(expr.flow_options).unwrap_err();
let err = validate_and_normalize_flow_options(expr.flow_options, None).unwrap_err();
assert!(err.to_string().contains(
"unknown flow option 'access_key_id', supported options: defer_on_missing_source"
));
assert!(
err.to_string()
.contains("unknown flow option 'access_key_id'")
);
}
// --- Schedule option tests ---
#[test]
fn test_eval_interval_rejected_non_positive() {
// Zero eval_interval should be rejected.
let err = validate_and_normalize_flow_options(HashMap::new(), Some(0)).unwrap_err();
assert!(err.to_string().contains("EVAL INTERVAL must be positive"));
// Negative eval_interval should be rejected.
let err = validate_and_normalize_flow_options(HashMap::new(), Some(-5)).unwrap_err();
assert!(err.to_string().contains("EVAL INTERVAL must be positive"));
// Positive eval_interval should be accepted.
let result = validate_and_normalize_flow_options(HashMap::new(), Some(300));
assert!(result.is_ok());
}
#[test]
fn test_schedule_and_internal_keys_rejected_as_unknown_options() {
for key in [
"eval_interval_anchor",
"eval_interval_start",
"eval_interval_missed_tick_policy",
"eval_interval_catchup_max_runs",
"eval_interval_catchup_max_lag",
"__greptime_internal_eval_schedule",
] {
let err = validate_and_normalize_flow_options(
HashMap::from([(key.to_string(), "value".to_string())]),
Some(300),
)
.unwrap_err();
assert!(
err.to_string()
.contains(&format!("unknown flow option '{key}'")),
"unexpected error for {key}: {err}"
);
}
}
#[test]

View File

@@ -543,6 +543,19 @@ impl QueryEngine for DatafusionQueryEngine {
.config_options
.insert(config_options);
// Apply scheduled time from query context if present, so that `now()` /
// `current_timestamp()` functions evaluate against the logical scheduled time
// rather than wall-clock.
match crate::options::parse_scheduled_time_datetime(&query_ctx.extensions()) {
Ok(Some(scheduled_rt)) => {
state.execution_props_mut().query_execution_start_time = Some(scheduled_rt);
}
Ok(None) => {}
Err(err) => {
common_telemetry::warn!(err; "Ignoring invalid scheduled time query extension");
}
}
QueryEngineContext::new(state, query_ctx)
}

View File

@@ -14,8 +14,10 @@
use std::collections::HashMap;
use chrono::{DateTime, Utc};
use common_base::memory_limit::MemoryLimit;
use serde::{Deserialize, Serialize};
use session::context::QueryContextRef;
use store_api::storage::RegionId;
use table::metadata::TableId;
@@ -25,6 +27,10 @@ pub const FLOW_INCREMENTAL_AFTER_SEQS: &str = "flow.incremental_after_seqs";
pub const FLOW_INCREMENTAL_MODE: &str = "flow.incremental_mode";
pub const FLOW_RETURN_REGION_SEQ: &str = "flow.return_region_seq";
pub const FLOW_SINK_TABLE_ID: &str = "flow.sink_table_id";
/// Flow scheduler binding for the logical time of one scheduled attempt.
/// Query planning, SQL/TQL parsing, range-select rewrite and DataFusion
/// execution read this extension so `now()` is stable for the whole attempt.
pub const FLOW_SCHEDULED_TIME_MILLIS: &str = "flow.scheduled_time_millis";
/// Enable by default, set to false to explicitly disable.
pub const QUERY_ENABLE_REMOTE_DYNAMIC_FILTER_PUSHDOWN: &str =
"query.enable_remote_dynamic_filter_pushdown";
@@ -279,6 +285,52 @@ fn parse_bool(option_name: &str, value: &str) -> Result<bool> {
}
}
/// Parse the scheduled time (in milliseconds since Unix epoch) from extensions.
///
/// Returns `Ok(None)` if the extension key is absent, `Ok(Some(millis))` on success,
/// or `Err` if the value is malformed.
pub fn parse_scheduled_time_millis(extensions: &HashMap<String, String>) -> Result<Option<i64>> {
match extensions.get(FLOW_SCHEDULED_TIME_MILLIS) {
Some(val) => val.parse::<i64>().map(Some).map_err(|_| {
invalid_query_context_extension(format!(
"Invalid value for {}: {}",
FLOW_SCHEDULED_TIME_MILLIS, val
))
}),
None => Ok(None),
}
}
/// Parse the scheduled time from extensions into a [`DateTime<Utc>`].
///
/// Returns `Ok(None)` if the extension key is absent, `Ok(Some(datetime))` on success,
/// or `Err` if the value is malformed. Millis that produce an out-of-range timestamp are
/// also rejected.
pub fn parse_scheduled_time_datetime(
extensions: &HashMap<String, String>,
) -> Result<Option<DateTime<Utc>>> {
match parse_scheduled_time_millis(extensions)? {
Some(millis) => DateTime::from_timestamp_millis(millis)
.map(Some)
.ok_or_else(|| {
invalid_query_context_extension(format!(
"Out-of-range timestamp for {}: {} ms",
FLOW_SCHEDULED_TIME_MILLIS, millis
))
}),
None => Ok(None),
}
}
/// Convenience helper: extract scheduled time from a [`QueryContextRef`] as a [`DateTime<Utc>`].
///
/// Errors are silently swallowed, returning `None`.
/// Use [`parse_scheduled_time_datetime`] when a `Result` is needed.
pub fn scheduled_time_from_ctx(query_ctx: &QueryContextRef) -> Option<DateTime<Utc>> {
let extensions = query_ctx.extensions();
parse_scheduled_time_datetime(&extensions).ok().flatten()
}
fn invalid_query_context_extension(reason: String) -> Error {
InvalidQueryContextExtensionSnafu { reason }.build()
}
@@ -551,4 +603,59 @@ mod flow_extension_tests {
Some(HashMap::from([(1, 10)]))
);
}
// --- scheduled time helper tests ---
#[test]
fn test_parse_scheduled_time_millis_absent() {
let exts = HashMap::new();
assert_eq!(parse_scheduled_time_millis(&exts).unwrap(), None);
}
#[test]
fn test_parse_scheduled_time_millis_valid() {
let exts = HashMap::from([(
FLOW_SCHEDULED_TIME_MILLIS.to_string(),
"1700000000000".to_string(),
)]);
assert_eq!(
parse_scheduled_time_millis(&exts).unwrap(),
Some(1700000000000)
);
}
#[test]
fn test_parse_scheduled_time_millis_malformed() {
let exts = HashMap::from([(
FLOW_SCHEDULED_TIME_MILLIS.to_string(),
"not-a-number".to_string(),
)]);
let err = parse_scheduled_time_millis(&exts).unwrap_err();
assert!(format!("{err}").contains(FLOW_SCHEDULED_TIME_MILLIS));
}
#[test]
fn test_parse_scheduled_time_datetime_valid() {
let exts = HashMap::from([(
FLOW_SCHEDULED_TIME_MILLIS.to_string(),
"1700000000000".to_string(),
)]);
let dt = parse_scheduled_time_datetime(&exts).unwrap().unwrap();
assert_eq!(dt.timestamp_millis(), 1700000000000);
}
#[test]
fn test_parse_scheduled_time_datetime_negative_millis() {
let exts = HashMap::from([(FLOW_SCHEDULED_TIME_MILLIS.to_string(), "-1".to_string())]);
let dt = parse_scheduled_time_datetime(&exts).unwrap().unwrap();
assert_eq!(dt.timestamp_millis(), -1);
}
#[test]
fn test_parse_scheduled_time_datetime_out_of_range() {
// i64::MAX millis is well beyond representable chrono::DateTime range
let exts = HashMap::from([(FLOW_SCHEDULED_TIME_MILLIS.to_string(), i64::MAX.to_string())]);
let err = parse_scheduled_time_datetime(&exts).unwrap_err();
assert!(format!("{err}").contains("Out-of-range"));
}
}

View File

@@ -125,12 +125,17 @@ pub struct QueryLanguageParser {}
impl QueryLanguageParser {
/// Try to parse SQL with GreptimeDB dialect, return the statement when success.
pub fn parse_sql(sql: &str, _query_ctx: &QueryContextRef) -> Result<QueryStatement> {
pub fn parse_sql(sql: &str, query_ctx: &QueryContextRef) -> Result<QueryStatement> {
let _timer = PARSE_SQL_ELAPSED.start_timer();
let mut statement =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.map_err(BoxedError::new)
.context(QueryParseSnafu { query: sql })?;
let scheduled_time =
crate::options::parse_scheduled_time_datetime(&query_ctx.extensions())?;
let mut statement = ParserContext::create_with_dialect(
sql,
&GreptimeDbDialect {},
ParseOptions { scheduled_time },
)
.map_err(BoxedError::new)
.context(QueryParseSnafu { query: sql })?;
if statement.len() != 1 {
MultipleStatementsSnafu {
query: sql.to_string(),
@@ -323,7 +328,7 @@ impl Alias {
#[cfg(test)]
mod test {
use session::context::QueryContext;
use session::context::{QueryContext, QueryContextBuilder};
use super::*;
@@ -338,6 +343,32 @@ mod test {
assert_eq!("SELECT * FROM t1", sql_stmt.to_string());
}
#[test]
fn parse_sql_tql_uses_scheduled_time_extension() {
let ctx = Arc::new(
QueryContextBuilder::default()
.set_extension(
crate::options::FLOW_SCHEDULED_TIME_MILLIS.to_string(),
"1700000000000".to_string(),
)
.build(),
);
let query = "TQL EVAL (now() - '10 minutes'::interval, now(), '1m') http_requests_total";
let stmt = QueryLanguageParser::parse_sql(query, &ctx).unwrap();
match stmt {
QueryStatement::Sql(sql::statements::statement::Statement::Tql(
sql::statements::tql::Tql::Eval(eval),
)) => {
assert_eq!(eval.start, "1699999400");
assert_eq!(eval.end, "1700000000");
assert_eq!(eval.step, "1m");
assert_eq!(eval.query, "http_requests_total");
}
_ => panic!("Expected TQL eval statement, got {stmt:?}"),
}
}
#[test]
fn parse_promql_timestamp() {
let cases = vec![

View File

@@ -24,6 +24,7 @@ use catalog::table_source::DfTableSourceProvider;
use common_error::ext::BoxedError;
use common_telemetry::tracing;
use datafusion::common::{DFSchema, plan_err};
use datafusion::execution::SessionStateBuilder;
use datafusion::execution::context::SessionState;
use datafusion::sql::planner::PlannerContext;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
@@ -85,6 +86,31 @@ impl DfLogicalPlanner {
}
}
/// Derive a [`SessionState`] whose [`ExecutionProps`] includes
/// `query_execution_start_time` if a scheduled time extension is present
/// in the query context.
fn derive_session_state_with_scheduled_time(
&self,
query_ctx: &QueryContextRef,
) -> Result<SessionState> {
let extensions = query_ctx.extensions();
match crate::options::parse_scheduled_time_datetime(&extensions)? {
Some(dt) => {
let execution_props = self
.session_state
.execution_props()
.clone()
.with_query_execution_start_time(dt);
Ok(
SessionStateBuilder::new_from_existing(self.session_state.clone())
.with_execution_props(execution_props)
.build(),
)
}
None => Ok(self.session_state.clone()),
}
}
/// Basically the same with `explain_to_plan` in DataFusion, but adapted to Greptime's
/// `plan_sql` to support Greptime Statements.
async fn explain_to_plan(
@@ -178,15 +204,16 @@ impl DfLogicalPlanner {
.fail()?;
}
let scheduled_state = self.derive_session_state_with_scheduled_time(&query_ctx)?;
let table_provider = DfTableSourceProvider::new(
self.engine_state.catalog_manager().clone(),
self.engine_state.disallow_cross_catalog_query(),
query_ctx.clone(),
Arc::new(DefaultPlanDecoder::new(
self.session_state.clone(),
scheduled_state.clone(),
&query_ctx,
)?),
self.session_state
scheduled_state
.config_options()
.sql_parser
.enable_ident_normalization,
@@ -194,7 +221,7 @@ impl DfLogicalPlanner {
let context_provider = DfContextProviderAdapter::try_new(
self.engine_state.clone(),
self.session_state.clone(),
scheduled_state.clone(),
Some(&df_stmt),
query_ctx.clone(),
)
@@ -230,7 +257,7 @@ impl DfLogicalPlanner {
.await?;
// Optimize logical plan by extension rules
let context = QueryEngineContext::new(self.session_state.clone(), query_ctx);
let context = QueryEngineContext::new(scheduled_state, query_ctx);
let plan = self
.engine_state
.optimize_by_extension_rules(plan, &context)?;
@@ -248,9 +275,10 @@ impl DfLogicalPlanner {
normalize_ident: bool,
query_ctx: QueryContextRef,
) -> Result<DfExpr> {
let scheduled_state = self.derive_session_state_with_scheduled_time(&query_ctx)?;
let context_provider = DfContextProviderAdapter::try_new(
self.engine_state.clone(),
self.session_state.clone(),
scheduled_state,
None,
query_ctx,
)
@@ -271,8 +299,9 @@ impl DfLogicalPlanner {
#[tracing::instrument(skip_all)]
async fn plan_pql(&self, stmt: &EvalStmt, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
let scheduled_state = self.derive_session_state_with_scheduled_time(&query_ctx)?;
let plan_decoder = Arc::new(DefaultPlanDecoder::new(
self.session_state.clone(),
scheduled_state.clone(),
&query_ctx,
)?);
let table_provider = DfTableSourceProvider::new(
@@ -280,7 +309,7 @@ impl DfLogicalPlanner {
self.engine_state.disallow_cross_catalog_query(),
query_ctx.clone(),
plan_decoder,
self.session_state
scheduled_state
.config_options()
.sql_parser
.enable_ident_normalization,
@@ -290,7 +319,7 @@ impl DfLogicalPlanner {
.map_err(BoxedError::new)
.context(QueryPlanSnafu)?;
let context = QueryEngineContext::new(self.session_state.clone(), query_ctx);
let context = QueryEngineContext::new(scheduled_state, query_ctx);
Ok(self
.engine_state
.optimize_by_extension_rules(plan, &context)?)

View File

@@ -19,6 +19,7 @@ use std::time::Duration;
use arrow_schema::DataType;
use async_recursion::async_recursion;
use catalog::table_source::DfTableSourceProvider;
use chrono::{DateTime, Utc};
use common_time::interval::{MS_PER_DAY, NANOS_PER_MILLI};
use common_time::timestamp::TimeUnit;
use common_time::{IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp, Timezone};
@@ -121,7 +122,7 @@ fn parse_duration_expr(args: &[Expr], i: usize) -> DFResult<Duration> {
parse_duration(str).map_err(DataFusionError::Plan)
}
Some(expr) => {
let ms = evaluate_expr_to_millisecond(args, i, true)?;
let ms = evaluate_expr_to_millisecond(args, i, true, None)?;
if ms <= 0 {
return Err(dispose_parse_error(Some(expr)));
}
@@ -138,14 +139,22 @@ fn parse_duration_expr(args: &[Expr], i: usize) -> DFResult<Duration> {
/// Output a millisecond timestamp
///
/// if `interval_only==true`, only accept expr with all interval type (case 2 will return a error)
fn evaluate_expr_to_millisecond(args: &[Expr], i: usize, interval_only: bool) -> DFResult<i64> {
fn evaluate_expr_to_millisecond(
args: &[Expr],
i: usize,
interval_only: bool,
scheduled_time: Option<DateTime<Utc>>,
) -> DFResult<i64> {
let Some(expr) = args.get(i) else {
return Err(dispose_parse_error(None));
};
if interval_only && !interval_only_in_expr(expr) {
return Err(dispose_parse_error(Some(expr)));
}
let info = SimplifyContext::default().with_current_time();
let info = match scheduled_time {
Some(dt) => SimplifyContext::default().with_query_execution_start_time(Some(dt)),
None => SimplifyContext::default().with_current_time(),
};
let simplify_expr = ExprSimplifier::new(info).simplify(expr.clone())?;
match simplify_expr {
Expr::Literal(ScalarValue::TimestampNanosecond(ts_nanos, _), _)
@@ -207,13 +216,22 @@ fn evaluate_expr_to_millisecond(args: &[Expr], i: usize, interval_only: bool) ->
/// 2. Timestamp string: align to specific timestamp
/// 3. An expr can be evaluated at the logical plan stage (e.g. `now() - INTERVAL '1' day`)
/// 4. leave empty (as Default Option): align to unix epoch 0 (timezone aware)
fn parse_align_to(args: &[Expr], i: usize, timezone: Option<&Timezone>) -> DFResult<i64> {
fn parse_align_to(
args: &[Expr],
i: usize,
timezone: Option<&Timezone>,
scheduled_time: Option<DateTime<Utc>>,
) -> DFResult<i64> {
let Ok(s) = parse_str_expr(args, i) else {
return evaluate_expr_to_millisecond(args, i, false);
return evaluate_expr_to_millisecond(args, i, false, scheduled_time);
};
let upper = s.to_uppercase();
match upper.as_str() {
"NOW" => return Ok(Timestamp::current_millis().value()),
"NOW" => {
return Ok(scheduled_time
.map(|dt| dt.timestamp_millis())
.unwrap_or_else(|| Timestamp::current_millis().value()));
}
// default align to unix epoch 0 (timezone aware)
"" => return Ok(timezone.map(|tz| tz.local_minus_utc() * 1000).unwrap_or(0)),
_ => (),
@@ -285,7 +303,15 @@ impl TreeNodeRewriter for RangeExprRewriter<'_> {
.map_err(|e| DataFusionError::Plan(e.to_string()))?;
let by = parse_expr_list(&func.args, 4, byc)?;
let align = parse_duration_expr(&func.args, byc + 4)?;
let align_to = parse_align_to(&func.args, byc + 5, Some(&self.query_ctx.timezone()))?;
let scheduled_time =
crate::options::parse_scheduled_time_datetime(&self.query_ctx.extensions())
.map_err(|err| DataFusionError::Plan(err.to_string()))?;
let align_to = parse_align_to(
&func.args,
byc + 5,
Some(&self.query_ctx.timezone()),
scheduled_time,
)?;
let mut data_type = range_expr.get_type(self.input_plan.schema())?;
let mut need_cast = false;
let fill = Fill::try_from_str(parse_str_expr(&func.args, 2)?, &data_type)?;
@@ -665,7 +691,7 @@ mod test {
use datafusion_expr::{BinaryExpr, Literal, Operator};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use session::context::QueryContext;
use session::context::{QueryContext, QueryContextBuilder};
use table::metadata::{TableInfoBuilder, TableMetaBuilder};
use table::table::TableRef;
use table::test_util::EmptyTable;
@@ -771,6 +797,12 @@ mod test {
engine.planner().plan(&stmt, QueryContext::arc()).await
}
async fn do_query_with_ctx(sql: &str, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
let stmt = QueryLanguageParser::parse_sql(sql, &query_ctx).unwrap();
let engine = create_test_engine().await;
engine.planner().plan(&stmt, query_ctx).await
}
async fn do_union_query(sql: &str) -> Result<LogicalPlan> {
let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();
let engine = create_union_test_engine().await;
@@ -782,6 +814,26 @@ mod test {
assert_eq!(plan.display_indent_schema().to_string(), expected);
}
#[tokio::test]
async fn range_align_to_now_uses_scheduled_time_extension() {
let query_ctx = Arc::new(
QueryContextBuilder::default()
.set_extension(
crate::options::FLOW_SCHEDULED_TIME_MILLIS.to_string(),
"1700000000123".to_string(),
)
.build(),
);
let query = r#"SELECT timestamp, tag_0, tag_1, avg(field_0) RANGE '5m' FROM test ALIGN '1h' TO NOW by (tag_0,tag_1);"#;
let plan = do_query_with_ctx(query, query_ctx).await.unwrap();
assert!(
plan.display_indent_schema()
.to_string()
.contains("align_to=1700000000123ms")
);
}
#[tokio::test]
async fn range_no_project() {
let query = r#"SELECT timestamp, tag_0, tag_1, avg(field_0 + field_1) RANGE '5m' FROM test ALIGN '1h' by (tag_0,tag_1);"#;
@@ -1077,37 +1129,54 @@ mod test {
fn test_parse_align_to() {
// test NOW
let args = vec!["NOW".lit()];
let epsinon = parse_align_to(&args, 0, None).unwrap() - Timestamp::current_millis().value();
let epsinon =
parse_align_to(&args, 0, None, None).unwrap() - Timestamp::current_millis().value();
assert!(epsinon.abs() < 100);
let scheduled_time = DateTime::from_timestamp(1_700_000_000, 123_000_000).unwrap();
assert_eq!(
scheduled_time.timestamp_millis(),
parse_align_to(&args, 0, None, Some(scheduled_time)).unwrap()
);
// test default
let args = vec!["".lit()];
assert_eq!(0, parse_align_to(&args, 0, None).unwrap());
assert_eq!(0, parse_align_to(&args, 0, None, None).unwrap());
// test default with timezone
let args = vec!["".lit()];
assert_eq!(
-36000 * 1000,
parse_align_to(&args, 0, Some(&Timezone::from_tz_string("HST").unwrap())).unwrap()
parse_align_to(
&args,
0,
Some(&Timezone::from_tz_string("HST").unwrap()),
None
)
.unwrap()
);
assert_eq!(
28800 * 1000,
parse_align_to(
&args,
0,
Some(&Timezone::from_tz_string("Asia/Shanghai").unwrap())
Some(&Timezone::from_tz_string("Asia/Shanghai").unwrap()),
None
)
.unwrap()
);
// test Timestamp
let args = vec!["1970-01-01T00:00:00+08:00".lit()];
assert_eq!(parse_align_to(&args, 0, None).unwrap(), -8 * 60 * 60 * 1000);
assert_eq!(
parse_align_to(&args, 0, None, None).unwrap(),
-8 * 60 * 60 * 1000
);
// timezone
let args = vec!["1970-01-01T00:00:00".lit()];
assert_eq!(
parse_align_to(
&args,
0,
Some(&Timezone::from_tz_string("Asia/Shanghai").unwrap())
Some(&Timezone::from_tz_string("Asia/Shanghai").unwrap()),
None
)
.unwrap(),
-8 * 60 * 60 * 1000
@@ -1122,7 +1191,7 @@ mod test {
ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(0, 10).into())).lit(),
),
})];
assert_eq!(parse_align_to(&args, 0, None).unwrap(), 20);
assert_eq!(parse_align_to(&args, 0, None, None).unwrap(), 20);
}
#[test]

View File

@@ -627,6 +627,7 @@ fn to_flight_data_stream(
#[cfg(test)]
mod tests {
use query::options::FLOW_SCHEDULED_TIME_MILLIS;
use tonic::metadata::{AsciiMetadataValue, MetadataMap};
use super::*;
@@ -655,6 +656,25 @@ mod tests {
);
}
#[test]
fn test_flow_extensions_can_carry_scheduled_time() {
let mut metadata = MetadataMap::new();
metadata.insert(
FLOW_EXTENSIONS_METADATA_KEY,
AsciiMetadataValue::try_from(r#"[["flow.scheduled_time_millis","1700000000000"]]"#)
.unwrap(),
);
let flow_extensions = extract_flow_extensions(&metadata).unwrap();
let query_ctx =
create_query_context(Channel::Grpc, None, flow_extensions, HashMap::new()).unwrap();
assert_eq!(
query_ctx.extension(FLOW_SCHEDULED_TIME_MILLIS),
Some("1700000000000")
);
}
#[test]
fn test_extract_flow_extensions_rejects_invalid_json() {
let mut metadata = MetadataMap::new();

View File

@@ -299,6 +299,7 @@ impl Drop for RequestTimer {
mod tests {
use chrono::FixedOffset;
use common_time::Timezone;
use query::options::FLOW_SCHEDULED_TIME_MILLIS;
use session::hints::{
INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY, REMOTE_QUERY_ID_EXTENSION_KEY,
};
@@ -326,6 +327,10 @@ mod tests {
INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY.to_string(),
"spoofed-regs".to_string(),
),
(
FLOW_SCHEDULED_TIME_MILLIS.to_string(),
"1700000000000".to_string(),
),
],
HashMap::from([(7, 88)]),
)
@@ -341,23 +346,17 @@ mod tests {
query_context.read_preference(),
ReadPreference::Leader
));
let mut extensions = query_context.extensions().into_iter().collect::<Vec<_>>();
extensions.sort_unstable_by(|a, b| a.0.cmp(&b.0));
assert_eq!(
extensions[0],
("auto_create_table".to_string(), "true".to_string())
);
assert_eq!(extensions[1].0, REMOTE_QUERY_ID_EXTENSION_KEY.to_string());
assert_eq!(
query_context.remote_query_id(),
Some(extensions[1].1.as_str())
);
assert_eq!(query_context.extension("auto_create_table"), Some("true"));
assert_ne!(query_context.remote_query_id(), Some("spoofed"));
assert!(
query_context
.extension(INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY)
.is_none()
);
assert_eq!(
query_context.extension(FLOW_SCHEDULED_TIME_MILLIS),
Some("1700000000000")
);
}
#[test]

View File

@@ -46,6 +46,7 @@ fn apply_hints(query_ctx: &mut QueryContext, hints: Vec<(String, String)>) {
#[cfg(test)]
mod tests {
use common_query::request::INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY as COMMON_INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY;
use query::options::FLOW_SCHEDULED_TIME_MILLIS;
use session::context::{QueryContextBuilder, generate_remote_query_id};
use session::hints::{
INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY, REMOTE_QUERY_ID_EXTENSION_KEY,
@@ -54,7 +55,7 @@ mod tests {
use super::apply_hints;
#[test]
fn test_apply_hints_ignores_reserved_extension_keys() {
fn test_apply_hints_ignores_reserved_extension_keys_only() {
let original_query_id = generate_remote_query_id();
let mut query_ctx = QueryContextBuilder::default()
.set_extension(
@@ -74,6 +75,10 @@ mod tests {
INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY.to_string(),
"spoofed-regs".to_string(),
),
(
FLOW_SCHEDULED_TIME_MILLIS.to_string(),
"1700000000000".to_string(),
),
("ttl".to_string(), "7d".to_string()),
],
);
@@ -87,6 +92,10 @@ mod tests {
.extension(INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY)
.is_none()
);
assert_eq!(
query_ctx.extension(FLOW_SCHEDULED_TIME_MILLIS),
Some("1700000000000")
);
assert_eq!(query_ctx.extension("ttl"), Some("7d"));
}

View File

@@ -14,6 +14,7 @@
use std::str::FromStr;
use chrono::{DateTime, Utc};
use snafu::{OptionExt, ResultExt};
use sqlparser::ast::{Ident, Query, Value};
use sqlparser::dialect::Dialect;
@@ -32,12 +33,18 @@ pub const FLOW: &str = "FLOW";
/// SQL Parser options.
#[derive(Clone, Debug, Default)]
pub struct ParseOptions {}
pub struct ParseOptions {
/// If set, TQL parameter expressions containing `now()` will be evaluated
/// against this scheduled time instead of wall-clock time.
pub scheduled_time: Option<DateTime<Utc>>,
}
/// GrepTime SQL parser context, a simple wrapper for Datafusion SQL parser.
pub struct ParserContext<'a> {
pub(crate) parser: Parser<'a>,
pub(crate) sql: &'a str,
/// Optional scheduled time for `now()` evaluation in TQL parameters.
pub(crate) scheduled_time: Option<DateTime<Utc>>,
}
impl ParserContext<'_> {
@@ -48,7 +55,11 @@ impl ParserContext<'_> {
.try_with_sql(sql)
.context(SyntaxSnafu)?;
Ok(ParserContext { parser, sql })
Ok(ParserContext {
parser,
sql,
scheduled_time: None,
})
}
/// Parses parser context to Query.
@@ -60,11 +71,12 @@ impl ParserContext<'_> {
pub fn create_with_dialect(
sql: &str,
dialect: &dyn Dialect,
_opts: ParseOptions,
opts: ParseOptions,
) -> Result<Vec<Statement>> {
let mut stmts: Vec<Statement> = Vec::new();
let mut parser_ctx = ParserContext::new(dialect, sql)?;
parser_ctx.scheduled_time = opts.scheduled_time;
let mut expecting_statement_delimiter = false;
loop {
@@ -95,7 +107,12 @@ impl ParserContext<'_> {
.with_options(ParserOptions::new().with_trailing_commas(true))
.try_with_sql(sql)
.context(SyntaxSnafu)?;
ParserContext { parser, sql }.intern_parse_table_name()
ParserContext {
parser,
sql,
scheduled_time: None,
}
.intern_parse_table_name()
}
pub(crate) fn intern_parse_table_name(&mut self) -> Result<ObjectName> {

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use chrono::{DateTime, Utc};
use datafusion_common::ScalarValue;
use snafu::{OptionExt, ResultExt};
use sqlparser::ast::AnalyzeFormat;
@@ -172,6 +173,7 @@ impl ParserContext<'_> {
let raw_query = tql_sql.trim_end_matches(';');
let mut parser_ctx = ParserContext::new(&crate::dialect::GreptimeDbDialect {}, tql_sql)?;
parser_ctx.scheduled_time = self.scheduled_time;
let statement = parser_ctx.parse_tql(require_now_expr)?;
match statement {
@@ -195,6 +197,7 @@ impl ParserContext<'_> {
&mut self,
require_now_expr: bool,
) -> std::result::Result<TqlParameters, TQLError> {
let scheduled_time = self.scheduled_time;
let parser = &mut self.parser;
let (start, end, step, lookback) = match parser.peek_token().token {
Token::LParen => {
@@ -217,16 +220,22 @@ impl ParserContext<'_> {
let start = Self::parse_expr_to_literal_or_ts(
exprs_iter.next().unwrap(),
require_now_expr,
scheduled_time,
)?;
let end = Self::parse_expr_to_literal_or_ts(
exprs_iter.next().unwrap(),
require_now_expr,
scheduled_time,
)?;
let step = Self::parse_expr_to_literal_or_ts(
exprs_iter.next().unwrap(),
false,
scheduled_time,
)?;
let step = Self::parse_expr_to_literal_or_ts(exprs_iter.next().unwrap(), false)?;
let lookback = exprs_iter
.next()
.map(|expr| Self::parse_expr_to_literal_or_ts(expr, false))
.map(|expr| Self::parse_expr_to_literal_or_ts(expr, false, scheduled_time))
.transpose()?;
if !parser.consume_token(&Token::RParen) {
@@ -289,6 +298,7 @@ impl ParserContext<'_> {
fn parse_expr_to_literal_or_ts(
parser_expr: sqlparser::ast::Expr,
require_now_expr: bool,
scheduled_time: Option<DateTime<Utc>>,
) -> std::result::Result<String, TQLError> {
match parser_expr {
sqlparser::ast::Expr::Value(v) => match v.value {
@@ -313,7 +323,7 @@ impl ParserContext<'_> {
}
}
},
_ => Self::parse_expr_to_ts(parser_expr, require_now_expr),
_ => Self::parse_expr_to_ts(parser_expr, require_now_expr, scheduled_time),
}
}
@@ -321,10 +331,15 @@ impl ParserContext<'_> {
fn parse_expr_to_ts(
parser_expr: sqlparser::ast::Expr,
require_now_expr: bool,
scheduled_time: Option<DateTime<Utc>>,
) -> std::result::Result<String, TQLError> {
let lit = utils::parser_expr_to_scalar_value_literal(parser_expr, require_now_expr)
.map_err(Box::new)
.context(ConvertToLogicalExpressionSnafu)?;
let lit = utils::parser_expr_to_scalar_value_literal_at(
parser_expr,
require_now_expr,
scheduled_time,
)
.map_err(Box::new)
.context(ConvertToLogicalExpressionSnafu)?;
let second = match lit {
ScalarValue::TimestampNanosecond(ts_nanos, _)
@@ -416,6 +431,35 @@ mod tests {
result.remove(0)
}
fn parse_into_statement_at(sql: &str, scheduled_time: DateTime<Utc>) -> Statement {
let mut result = ParserContext::create_with_dialect(
sql,
&GreptimeDbDialect {},
ParseOptions {
scheduled_time: Some(scheduled_time),
},
)
.unwrap();
assert_eq!(1, result.len());
result.remove(0)
}
#[test]
fn test_parse_tql_eval_with_scheduled_time() {
let scheduled_time = DateTime::from_timestamp(1_700_000_000, 0).unwrap();
let sql = "TQL EVAL (now() - '10 minutes'::interval, now(), '1m') http_requests_total";
match parse_into_statement_at(sql, scheduled_time) {
Statement::Tql(Tql::Eval(eval)) => {
assert_eq!(eval.start, "1699999400");
assert_eq!(eval.end, "1700000000");
assert_eq!(eval.step, "1m");
assert_eq!(eval.query, "http_requests_total");
}
_ => unreachable!(),
}
}
#[test]
fn test_require_now_expr() {
let sql = "TQL EVAL (now() - now(), now() - (now() - '10 seconds'::interval), '1s') http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m";

View File

@@ -15,6 +15,7 @@
use std::collections::HashMap;
use std::sync::Arc;
use chrono::{DateTime, Utc};
use datafusion::config::ConfigOptions;
use datafusion::error::Result as DfResult;
use datafusion::execution::SessionStateBuilder;
@@ -201,6 +202,18 @@ fn is_plain_wildcard_projection(projection: &[sqlparser::ast::SelectItem]) -> bo
pub fn parser_expr_to_scalar_value_literal(
expr: sqlparser::ast::Expr,
require_now_expr: bool,
) -> Result<ScalarValue> {
parser_expr_to_scalar_value_literal_at(expr, require_now_expr, None)
}
/// Same as [`parser_expr_to_scalar_value_literal`] but uses the provided
/// `scheduled_time` for evaluating `now()`. If `scheduled_time` is
/// `Some`, `now()` will be simplified to the given timestamp instead of
/// the current wall-clock time.
pub fn parser_expr_to_scalar_value_literal_at(
expr: sqlparser::ast::Expr,
require_now_expr: bool,
scheduled_time: Option<DateTime<Utc>>,
) -> Result<ScalarValue> {
// 1. convert parser expr to logical expr
let empty_df_schema = DFSchema::empty();
@@ -250,8 +263,11 @@ pub fn parser_expr_to_scalar_value_literal(
}
}
// 2. simplify logical expr
let info = SimplifyContext::default().with_current_time();
// 2. simplify logical expr — use scheduled time if provided, else wall-clock
let info = match scheduled_time {
Some(dt) => SimplifyContext::default().with_query_execution_start_time(Some(dt)),
None => SimplifyContext::default().with_current_time(),
};
let simplifier = ExprSimplifier::new(info);
// Coerce the logical expression so simplifier can handle it correctly. This is necessary for const eval with possible type mismatch. i.e.: `now() - now() + '15s'::interval` which is `TimestampNanosecond - TimestampNanosecond + IntervalMonthDayNano`.

View File

@@ -275,6 +275,7 @@ impl ParserContext<'_> {
let tql_string = tql_string.trim();
let mut parser_ctx = ParserContext::new(&GreptimeDbDialect {}, tql_string)?;
parser_ctx.scheduled_time = self.scheduled_time;
let statement = parser_ctx.parse_tql(require_now_expr)?;
match statement {

View File

@@ -0,0 +1,52 @@
CREATE TABLE eval_interval_schedule_input (
ts TIMESTAMP(3) TIME INDEX,
series STRING,
v DOUBLE,
PRIMARY KEY(series)
);
Affected Rows: 0
CREATE FLOW eval_interval_schedule_flow
SINK TO eval_interval_schedule_sink
EVAL INTERVAL '1s'
AS
SELECT
date_trunc('second', now()) AS ts,
count(v) AS value_count
FROM eval_interval_schedule_input
GROUP BY date_trunc('second', now());
Affected Rows: 0
INSERT INTO eval_interval_schedule_input VALUES
('2026-06-25 00:00:00', 'a', 1.0);
Affected Rows: 1
-- SQLNESS SLEEP 5s
SELECT
count(DISTINCT ts) >= 2 AS has_multiple_scheduled_ticks,
min(value_count) AS min_value_count,
max(value_count) AS max_value_count
FROM eval_interval_schedule_sink
WHERE value_count > 0;
+------------------------------+-----------------+-----------------+
| has_multiple_scheduled_ticks | min_value_count | max_value_count |
+------------------------------+-----------------+-----------------+
| true | 1 | 1 |
+------------------------------+-----------------+-----------------+
DROP FLOW eval_interval_schedule_flow;
Affected Rows: 0
DROP TABLE eval_interval_schedule_sink;
Affected Rows: 0
DROP TABLE eval_interval_schedule_input;
Affected Rows: 0

View File

@@ -0,0 +1,31 @@
CREATE TABLE eval_interval_schedule_input (
ts TIMESTAMP(3) TIME INDEX,
series STRING,
v DOUBLE,
PRIMARY KEY(series)
);
CREATE FLOW eval_interval_schedule_flow
SINK TO eval_interval_schedule_sink
EVAL INTERVAL '1s'
AS
SELECT
date_trunc('second', now()) AS ts,
count(v) AS value_count
FROM eval_interval_schedule_input
GROUP BY date_trunc('second', now());
INSERT INTO eval_interval_schedule_input VALUES
('2026-06-25 00:00:00', 'a', 1.0);
-- SQLNESS SLEEP 5s
SELECT
count(DISTINCT ts) >= 2 AS has_multiple_scheduled_ticks,
min(value_count) AS min_value_count,
max(value_count) AS max_value_count
FROM eval_interval_schedule_sink
WHERE value_count > 0;
DROP FLOW eval_interval_schedule_flow;
DROP TABLE eval_interval_schedule_sink;
DROP TABLE eval_interval_schedule_input;

View File

@@ -0,0 +1,8 @@
name = "flow_eval_interval"
reason = "Verify EVAL INTERVAL SQL and TQL flow metadata created before upgrade can be read and executed after upgrade."
introduced_by = "PR #8360"
topologies = ["distributed"]
from_range = ["<=v1.1.1"]
to_range = [">v1.1.1"]
features = ["flow", "tql", "table"]
owner = "flow"

View File

@@ -0,0 +1,30 @@
CREATE TABLE compat_sql_input (
ts TIMESTAMP(3) TIME INDEX,
series STRING,
v DOUBLE,
PRIMARY KEY(series)
);
CREATE FLOW compat_sql_eval_flow
SINK TO compat_sql_eval_sink
EVAL INTERVAL '1s'
AS
SELECT
date_trunc('second', now()) AS ts,
count(v) AS value_count
FROM compat_sql_input
GROUP BY date_trunc('second', now());
CREATE TABLE compat_tql_input (
ts TIMESTAMP(3) TIME INDEX,
host STRING,
idc STRING,
val DOUBLE,
PRIMARY KEY(host, idc)
);
CREATE FLOW compat_tql_eval_flow
SINK TO compat_tql_eval_sink
EVAL INTERVAL '1s'
AS
TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", compat_tql_input);

View File

@@ -0,0 +1,119 @@
SELECT flow_name, source_table_names
FROM information_schema.flows
WHERE flow_name IN ('compat_sql_eval_flow', 'compat_tql_eval_flow')
ORDER BY flow_name;
+----------------------+----------------------------------------------+
| flow_name | source_table_names |
+----------------------+----------------------------------------------+
| compat_sql_eval_flow | greptime.flow_eval_interval.compat_sql_input |
| compat_tql_eval_flow | greptime.flow_eval_interval.compat_tql_input |
+----------------------+----------------------------------------------+
SHOW CREATE FLOW compat_sql_eval_flow;
+----------------------+---------------------------------------------------------------------------------------------------------------------------------+
| Flow | Create Flow |
+----------------------+---------------------------------------------------------------------------------------------------------------------------------+
| compat_sql_eval_flow | CREATE FLOW IF NOT EXISTS compat_sql_eval_flow |
| | SINK TO flow_eval_interval.compat_sql_eval_sink |
| | EVAL INTERVAL '1 s' |
| | AS SELECT date_trunc('second', now()) AS ts, count(v) AS value_count FROM compat_sql_input GROUP BY date_trunc('second', now()) |
+----------------------+---------------------------------------------------------------------------------------------------------------------------------+
SHOW CREATE FLOW compat_tql_eval_flow;
+----------------------+-------------------------------------------------------------------------------------------------+
| Flow | Create Flow |
+----------------------+-------------------------------------------------------------------------------------------------+
| compat_tql_eval_flow | CREATE FLOW IF NOT EXISTS compat_tql_eval_flow |
| | SINK TO flow_eval_interval.compat_tql_eval_sink |
| | EVAL INTERVAL '1 s' |
| | AS TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", compat_tql_input) |
+----------------------+-------------------------------------------------------------------------------------------------+
SHOW CREATE TABLE compat_sql_eval_sink;
+----------------------+-----------------------------------------------------+
| Table | Create Table |
+----------------------+-----------------------------------------------------+
| compat_sql_eval_sink | CREATE TABLE IF NOT EXISTS "compat_sql_eval_sink" ( |
| | "ts" TIMESTAMP(9) NOT NULL, |
| | "value_count" BIGINT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | TIME INDEX ("ts") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | 'comment' = 'Auto created table by flow engine' |
| | ) |
+----------------------+-----------------------------------------------------+
SHOW CREATE TABLE compat_tql_eval_sink;
+----------------------+-----------------------------------------------------+
| Table | Create Table |
+----------------------+-----------------------------------------------------+
| compat_tql_eval_sink | CREATE TABLE IF NOT EXISTS "compat_tql_eval_sink" ( |
| | "count(compat_tql_input.val)" DOUBLE NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "status_code" STRING NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("status_code") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | 'comment' = 'Auto created table by flow engine' |
| | ) |
+----------------------+-----------------------------------------------------+
INSERT INTO compat_sql_input VALUES
('2026-06-25 00:00:00', 'a', 1.0),
('2026-06-25 00:00:01', 'b', 2.0);
Affected Rows: 2
INSERT INTO compat_tql_input VALUES
(now() - '17s'::interval, 'host1', 'idc1', 200),
(now() - '13s'::interval, 'host2', 'idc1', 401),
(now() - '7s'::interval, 'host3', 'idc2', 500);
Affected Rows: 3
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('compat_sql_eval_flow');
+------------------------------------------+
| ADMIN FLUSH_FLOW('compat_sql_eval_flow') |
+------------------------------------------+
| FLOW_FLUSHED |
+------------------------------------------+
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('compat_tql_eval_flow');
+------------------------------------------+
| ADMIN FLUSH_FLOW('compat_tql_eval_flow') |
+------------------------------------------+
| FLOW_FLUSHED |
+------------------------------------------+
SELECT count(*) > 0 AS sql_flow_executed
FROM compat_sql_eval_sink;
+-------------------+
| sql_flow_executed |
+-------------------+
| true |
+-------------------+
SELECT count(*) > 0 AS tql_flow_executed
FROM compat_tql_eval_sink;
+-------------------+
| tql_flow_executed |
+-------------------+
| true |
+-------------------+

View File

@@ -0,0 +1,33 @@
SELECT flow_name, source_table_names
FROM information_schema.flows
WHERE flow_name IN ('compat_sql_eval_flow', 'compat_tql_eval_flow')
ORDER BY flow_name;
SHOW CREATE FLOW compat_sql_eval_flow;
SHOW CREATE FLOW compat_tql_eval_flow;
SHOW CREATE TABLE compat_sql_eval_sink;
SHOW CREATE TABLE compat_tql_eval_sink;
INSERT INTO compat_sql_input VALUES
('2026-06-25 00:00:00', 'a', 1.0),
('2026-06-25 00:00:01', 'b', 2.0);
INSERT INTO compat_tql_input VALUES
(now() - '17s'::interval, 'host1', 'idc1', 200),
(now() - '13s'::interval, 'host2', 'idc1', 401),
(now() - '7s'::interval, 'host3', 'idc2', 500);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('compat_sql_eval_flow');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('compat_tql_eval_flow');
SELECT count(*) > 0 AS sql_flow_executed
FROM compat_sql_eval_sink;
SELECT count(*) > 0 AS tql_flow_executed
FROM compat_tql_eval_sink;