From 4071cb56aecbe8cad664e06b786013e710bcf29e Mon Sep 17 00:00:00 2001 From: Yingwen Date: Mon, 29 Jun 2026 21:27:22 +0800 Subject: [PATCH] feat(flow): stabilize eval interval scheduling (#8360) (#8378) * feat(flow): stabilize eval interval scheduling * fix(flow): satisfy eval schedule clippy * test(flow): trim eval schedule coverage * test(flow): cover stable eval scheduling * fix(flow): reserve scheduled runtime hint * test(flow): trim sqlness result eof * fix(flow): harden eval schedule edges * fix(flow): address scheduled flow review * fix(flow): clean scheduled config handling * test(flow): add eval interval compat case * test(flow): cover show create flow in compat * fix(flow): drop scheduled time from flow context * test(flow): assert scheduled now binding --------- Signed-off-by: discord9 Signed-off-by: evenyag Co-authored-by: discord9 --- .../meta/src/cache/flow/table_flownode.rs | 1 + src/common/meta/src/ddl/create_flow.rs | 217 ++++++++-- src/common/meta/src/ddl/tests/create_flow.rs | 363 ++++++++++++++++- src/common/meta/src/key/flow.rs | 3 + src/common/meta/src/key/flow/flow_info.rs | 86 ++++ src/common/meta/src/rpc/ddl.rs | 7 + src/flow/src/adapter.rs | 1 + src/flow/src/adapter/flownode_impl.rs | 58 ++- src/flow/src/batching_mode.rs | 1 + src/flow/src/batching_mode/engine.rs | 22 + src/flow/src/batching_mode/eval_schedule.rs | 353 ++++++++++++++++ src/flow/src/batching_mode/task.rs | 378 +++++++++++++++--- src/flow/src/batching_mode/task/test.rs | 362 ++++++++++++++++- src/flow/src/batching_mode/utils.rs | 14 +- src/flow/src/batching_mode/utils/test.rs | 1 + src/flow/src/df_optimizer.rs | 8 +- src/flow/src/engine.rs | 2 + src/operator/src/statement/ddl.rs | 112 +++++- src/query/src/datafusion.rs | 13 + src/query/src/options.rs | 107 +++++ src/query/src/parser.rs | 43 +- src/query/src/planner.rs | 45 ++- src/query/src/range_select/plan_rewrite.rs | 99 ++++- src/servers/src/grpc/flight.rs | 20 + src/servers/src/grpc/greptime_handler.rs | 21 +- src/servers/src/http/hints.rs | 11 +- src/sql/src/parser.rs | 25 +- src/sql/src/parsers/tql_parser.rs | 56 ++- src/sql/src/parsers/utils.rs | 20 +- src/sql/src/parsers/with_tql_parser.rs | 1 + .../flow_eval_interval_schedule.result | 52 +++ .../flow-tql/flow_eval_interval_schedule.sql | 31 ++ .../cases/flow_eval_interval/case.toml | 8 + .../cases/flow_eval_interval/setup.sql | 30 ++ .../cases/flow_eval_interval/verify.result | 119 ++++++ .../cases/flow_eval_interval/verify.sql | 33 ++ 36 files changed, 2552 insertions(+), 171 deletions(-) create mode 100644 src/flow/src/batching_mode/eval_schedule.rs create mode 100644 tests/cases/distributed/flow-tql/flow_eval_interval_schedule.result create mode 100644 tests/cases/distributed/flow-tql/flow_eval_interval_schedule.sql create mode 100644 tests/compatibility/cases/flow_eval_interval/case.toml create mode 100644 tests/compatibility/cases/flow_eval_interval/setup.sql create mode 100644 tests/compatibility/cases/flow_eval_interval/verify.result create mode 100644 tests/compatibility/cases/flow_eval_interval/verify.sql diff --git a/src/common/meta/src/cache/flow/table_flownode.rs b/src/common/meta/src/cache/flow/table_flownode.rs index 4d3513a21d..76af08c77f 100644 --- a/src/common/meta/src/cache/flow/table_flownode.rs +++ b/src/common/meta/src/cache/flow/table_flownode.rs @@ -252,6 +252,7 @@ mod tests { status: FlowStatus::Active, created_time: chrono::Utc::now(), updated_time: chrono::Utc::now(), + eval_schedule: None, }, (1..=3) .map(|i| { diff --git a/src/common/meta/src/ddl/create_flow.rs b/src/common/meta/src/ddl/create_flow.rs index a8779b67cb..02502b4737 100644 --- a/src/common/meta/src/ddl/create_flow.rs +++ b/src/common/meta/src/ddl/create_flow.rs @@ -41,7 +41,7 @@ use crate::ddl::DdlContext; use crate::ddl::utils::{add_peer_context_if_needed, map_to_procedure_error}; use crate::error::{self, Result, UnexpectedSnafu}; use crate::instruction::{CacheIdent, CreateFlow, DropFlow}; -use crate::key::flow::flow_info::{FlowInfoValue, FlowStatus}; +use crate::key::flow::flow_info::{FlowInfoValue, FlowScheduleConfig, FlowStatus}; use crate::key::flow::flow_route::FlowRouteValue; use crate::key::table_name::TableNameKey; use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId}; @@ -69,7 +69,7 @@ impl CreateFlowProcedure { peers: vec![], source_table_ids: vec![], unresolved_source_table_names: vec![], - flow_context: query_context.into(), // Convert to FlowQueryContext + flow_context: without_scheduled_time_extension(query_context).into(), state: CreateFlowState::Prepare, prev_flow_info_value: None, did_replace: false, @@ -210,6 +210,18 @@ impl CreateFlowProcedure { } self.data.flow_type = Some(get_flow_type_from_options(&self.data.task)?); + // Resolve schedule defaults into task.eval_schedule once, before + // CreateRequest / FlowInfoValue are constructed. This ensures the same + // typed schedule config is sent to flownodes and persisted in metadata. + // Idempotent: if eval_schedule is already present we do not recompute. + resolve_schedule_defaults_into_task( + &mut self.data.task, + self.data + .prev_flow_info_value + .as_ref() + .map(|v| v.get_inner_ref()), + ); + self.data.state = if self.data.is_pending() { self.data.peers.clear(); CreateFlowState::CreateMetadata @@ -249,7 +261,12 @@ impl CreateFlowProcedure { header: Some(FlowRequestHeader { tracing_context: TracingContext::from_current_span().to_w3c(), // Convert FlowQueryContext to QueryContext - query_context: Some(QueryContext::from(self.data.flow_context.clone()).into()), + query_context: Some( + without_scheduled_time_extension(QueryContext::from( + self.data.flow_context.clone(), + )) + .into(), + ), }), body: Some(PbFlowRequest::Create((&self.data).into())), }; @@ -405,6 +422,23 @@ pub fn get_flow_type_from_options(flow_task: &CreateFlowTask) -> Result QueryContext { + query_context + .extensions + .remove(FLOW_SCHEDULED_TIME_MILLIS_EXTENSION_KEY); + query_context +} + pub fn defer_on_missing_source(flow_task: &CreateFlowTask) -> Result { flow_task .flow_options @@ -428,6 +462,16 @@ pub fn defer_on_missing_source(flow_task: &CreateFlowTask) -> Result { } pub fn validate_flow_options(flow_task: &CreateFlowTask) -> Result<()> { + // Reject non-positive eval_interval_secs (zero or negative). + if let Some(secs) = flow_task.eval_interval_secs + && secs <= 0 + { + return UnexpectedSnafu { + err_msg: format!("EVAL INTERVAL must be positive, got {secs} seconds"), + } + .fail(); + } + for key in flow_task.flow_options.keys() { match key.as_str() { DEFER_ON_MISSING_SOURCE_KEY @@ -449,16 +493,116 @@ pub fn validate_flow_options(flow_task: &CreateFlowTask) -> Result<()> { Ok(()) } +/// Computes the ceiling of `time` to the next schedule boundary aligned with `anchor + k * interval`. +/// All values are Unix timestamps in seconds. +fn ceil_to_boundary(time: i64, anchor: i64, interval: i64) -> i64 { + if interval <= 0 { + return time; + } + if time <= anchor { + return anchor; + } + + let diff = i128::from(time) - i128::from(anchor); + let interval = i128::from(interval); + let k = (diff + interval - 1) / interval; + let boundary = i128::from(anchor) + k * interval; + + boundary.clamp(i128::from(i64::MIN), i128::from(i64::MAX)) as i64 +} + +/// Returns the effective typed schedule config for flow metadata. +/// +/// New metadata should carry `FlowInfoValue.eval_schedule`. For older metadata +/// that lacks the typed field, derive a deterministic config from `created_time` +/// and defaults. This avoids recovery-time wall-clock drift. +pub fn effective_eval_schedule_from_flow_info( + flow_info: &FlowInfoValue, +) -> Option { + if let Some(schedule) = &flow_info.eval_schedule { + return Some(schedule.clone()); + } + + let eval_interval_secs = flow_info.eval_interval_secs?; + if eval_interval_secs <= 0 { + return None; + } + + let start_secs = ceil_to_boundary( + flow_info.created_time.timestamp(), + FlowScheduleConfig::DEFAULT_ANCHOR_SECS, + eval_interval_secs, + ); + + Some(FlowScheduleConfig::default_with_start( + start_secs, + eval_interval_secs, + )) +} + +/// Resolve `FlowScheduleConfig` into `task.eval_schedule`. +/// +/// This must be called in `on_prepare` after `prev_flow_info` is loaded so that +/// `CreateRequest` and `FlowInfoValue` both see the same resolved defaults. +/// +/// The function is idempotent: if `task.eval_schedule` is already `Some`, +/// it returns immediately (important for procedure retry / dump-restore). +/// +/// Schedule configuration is NOT read from `task.flow_options` (those keys are +/// no longer user-facing options). For new flows, pure defaults are computed. +/// For OR REPLACE, the previous typed config is preserved when interval+anchor +/// are unchanged; otherwise defaults are recomputed. +pub(crate) fn resolve_schedule_defaults_into_task( + task: &mut CreateFlowTask, + prev_flow_info: Option<&FlowInfoValue>, +) { + // Idempotent: if already computed, skip recomputation. + if task.eval_schedule.is_some() { + return; + } + + let Some(eval_interval_secs) = task.eval_interval_secs else { + return; + }; + if eval_interval_secs <= 0 { + return; + } + + let anchor_secs = FlowScheduleConfig::DEFAULT_ANCHOR_SECS; + + // For OR REPLACE: if interval+anchor unchanged, preserve the entire + // existing typed config so start / policy / limits are stable. + if task.or_replace + && let Some(prev) = prev_flow_info + && let Some(old_sched) = effective_eval_schedule_from_flow_info(prev) + { + let old_interval = prev.eval_interval_secs.unwrap_or(0); + if old_interval == eval_interval_secs && old_sched.anchor_secs == anchor_secs { + task.eval_schedule = Some(old_sched); + return; + } + } + + // --- Compute start --- + let start_secs = + // New flow, or OR REPLACE with changed interval/anchor: start at the next + // aligned boundary after prepare time. The value is written into + // task.eval_schedule once so procedure retry does not recompute it. + ceil_to_boundary(chrono::Utc::now().timestamp(), anchor_secs, eval_interval_secs); + + task.eval_schedule = Some(FlowScheduleConfig::default_with_start( + start_secs, + eval_interval_secs, + )); +} + fn user_runtime_flow_options(options: &HashMap) -> HashMap { let mut options = options.clone(); options.remove(DEFER_ON_MISSING_SOURCE_KEY); + options.remove(INTERNAL_EVAL_SCHEDULE_KEY); options } -fn metadata_flow_options(options: &HashMap) -> HashMap { - options.clone() -} - /// The state of [CreateFlowProcedure]. #[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)] pub enum CreateFlowState { @@ -561,23 +705,40 @@ impl From<&CreateFlowData> for CreateRequest { let flow_type = value.flow_type.unwrap_or_default().to_string(); req.flow_options .insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type); + + // Pass typed schedule config via internal transient key in flow_options. + if let Some(ref sched) = value.task.eval_schedule { + let json = serde_json::to_string(sched) + .expect("FlowScheduleConfig serialization should not fail"); + req.flow_options + .insert(INTERNAL_EVAL_SCHEDULE_KEY.to_string(), json); + } + req } } impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteValue)>) { fn from(value: &CreateFlowData) -> Self { - let CreateFlowTask { - catalog_name, - flow_name, - sink_table_name, - expire_after, - eval_interval_secs: eval_interval, - comment, - sql, - .. - } = value.task.clone(); - let mut options = metadata_flow_options(&value.task.flow_options); + let catalog_name = value.task.catalog_name.clone(); + let flow_name = value.task.flow_name.clone(); + let sink_table_name = value.task.sink_table_name.clone(); + let expire_after = value.task.expire_after; + let eval_interval = value.task.eval_interval_secs; + let comment = value.task.comment.clone(); + let sql = value.task.sql.clone(); + let eval_schedule = value.task.eval_schedule.clone(); + + // Start with a clean options map. The transient schedule payload is + // only for the meta→flownode CreateRequest boundary and must not be + // persisted in FlowInfoValue.options. + let mut options: HashMap = value + .task + .flow_options + .iter() + .filter(|(k, _)| k.as_str() != INTERNAL_EVAL_SCHEDULE_KEY) + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); let flownode_ids = value .peers @@ -602,20 +763,25 @@ impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteVa create_time = prev_flow_value.get_inner_ref().created_time; } + // This conversion borrows the procedure data: the procedure keeps using + // `self.data` after metadata creation (for cache invalidation, retry and + // procedure dump/restore), while `FlowInfoValue` owns the persisted + // metadata. Cloning these owned fields is therefore intentional. let flow_info: FlowInfoValue = FlowInfoValue { source_table_ids: value.source_table_ids.clone(), all_source_table_names: value.task.source_table_names.clone(), unresolved_source_table_names: value.unresolved_source_table_names.clone(), - sink_table_name, + sink_table_name: sink_table_name.clone(), flownode_ids, - catalog_name, - // Convert FlowQueryContext back to QueryContext for storage - query_context: Some(QueryContext::from(value.flow_context.clone())), - flow_name, - raw_sql: sql, + catalog_name: catalog_name.clone(), + query_context: Some(without_scheduled_time_extension(QueryContext::from( + value.flow_context.clone(), + ))), + flow_name: flow_name.clone(), + raw_sql: sql.clone(), expire_after, eval_interval_secs: eval_interval, - comment, + comment: comment.clone(), options, status: if value.is_active() { FlowStatus::Active @@ -624,6 +790,7 @@ impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteVa }, created_time: create_time, updated_time: chrono::Utc::now(), + eval_schedule: eval_schedule.clone(), }; (flow_info, flow_routes) diff --git a/src/common/meta/src/ddl/tests/create_flow.rs b/src/common/meta/src/ddl/tests/create_flow.rs index 2778afcbaa..263f692bc1 100644 --- a/src/common/meta/src/ddl/tests/create_flow.rs +++ b/src/common/meta/src/ddl/tests/create_flow.rs @@ -13,7 +13,7 @@ // limitations under the License. use std::assert_matches; -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use api::v1::flow::CreateRequest; @@ -26,12 +26,16 @@ use crate::ddl::DdlContext; use crate::ddl::create_flow::{ CreateFlowData, CreateFlowProcedure, CreateFlowState, DEFER_ON_MISSING_SOURCE_KEY, FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY, FlowType, defer_on_missing_source, + effective_eval_schedule_from_flow_info, resolve_schedule_defaults_into_task, validate_flow_options, }; use crate::ddl::test_util::create_table::test_create_table_task; use crate::ddl::test_util::flownode_handler::NaiveFlownodeHandler; use crate::error; use crate::key::FlowId; +use crate::key::flow::flow_info::{ + FlowInfoValue, FlowMissedTickPolicy, FlowScheduleConfig, FlowStatus, +}; use crate::key::table_route::TableRouteValue; use crate::rpc::ddl::{CreateFlowTask, FlowQueryContext, QueryContext}; use crate::test_util::{MockFlownodeManager, new_ddl_context}; @@ -66,6 +70,7 @@ pub(crate) fn test_create_flow_task( comment: "".to_string(), sql: "select 1".to_string(), flow_options: Default::default(), + eval_schedule: None, } } @@ -74,6 +79,34 @@ fn enable_defer_on_missing_source(task: &mut CreateFlowTask) { .insert(DEFER_ON_MISSING_SOURCE_KEY.to_string(), "true".to_string()); } +fn flow_info_for_schedule_test( + eval_schedule: Option, + eval_interval_secs: Option, + options: HashMap, + created_time_secs: i64, +) -> FlowInfoValue { + let created_time = chrono::DateTime::from_timestamp(created_time_secs, 0).unwrap(); + FlowInfoValue { + source_table_ids: vec![], + all_source_table_names: vec![], + unresolved_source_table_names: vec![], + sink_table_name: TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "sink"), + flownode_ids: BTreeMap::new(), + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + query_context: None, + flow_name: "flow".to_string(), + raw_sql: "select 1".to_string(), + expire_after: None, + eval_interval_secs, + comment: String::new(), + options, + status: FlowStatus::Active, + created_time, + updated_time: created_time, + eval_schedule, + } +} + #[tokio::test] async fn test_create_flow_source_table_not_found() { let source_table_names = vec![TableName::new( @@ -292,6 +325,248 @@ fn test_validate_flow_options_allows_incremental_read_option() { validate_flow_options(&task).unwrap(); } +#[test] +fn test_validate_flow_options_rejects_schedule_and_internal_keys_as_unknown() { + for key in [ + "eval_interval_anchor", + "eval_interval_start", + "eval_interval_missed_tick_policy", + "eval_interval_catchup_max_runs", + "eval_interval_catchup_max_lag", + "__greptime_internal_eval_schedule", + ] { + let mut task = test_create_flow_task( + "my_flow", + vec![], + TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_sink_table"), + false, + ); + task.eval_interval_secs = Some(300); + task.flow_options + .insert(key.to_string(), "value".to_string()); + + let err = validate_flow_options(&task).unwrap_err(); + assert!( + err.to_string() + .contains(&format!("Unknown flow option '{key}'")), + "unexpected error for {key}: {err}" + ); + } +} + +#[test] +fn test_validate_flow_options_rejects_non_positive_eval_interval() { + for secs in [0, -1] { + let mut task = test_create_flow_task( + "my_flow", + vec![], + TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_sink_table"), + false, + ); + task.eval_interval_secs = Some(secs); + + let err = validate_flow_options(&task).unwrap_err(); + assert!(err.to_string().contains("EVAL INTERVAL must be positive")); + } +} + +#[test] +fn test_resolved_schedule_defaults_in_create_request() { + let mut task = test_create_flow_task( + "my_flow", + vec![], + TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_sink_table"), + false, + ); + task.eval_interval_secs = Some(300); + + // Simulate on_prepare: resolve defaults into task. + resolve_schedule_defaults_into_task(&mut task, None); + + // Verify typed schedule config is populated. + let sched = task.eval_schedule.as_ref().unwrap(); + assert!(sched.start_secs > 0); + assert_eq!(sched.anchor_secs, 0); + assert_eq!( + sched.missed_tick_policy, + FlowMissedTickPolicy::BoundedCatchUp + ); + assert_eq!(sched.catchup_max_runs, 3); + assert!(sched.catchup_max_lag_secs >= 300); + + // Verify schedule option names are NOT in flow_options. + for key in &[ + "eval_interval_start", + "eval_interval_anchor", + "eval_interval_missed_tick_policy", + "eval_interval_catchup_max_runs", + "eval_interval_catchup_max_lag", + ] { + assert!( + !task.flow_options.contains_key(*key), + "flow_options should not contain {key}" + ); + } + + // Idempotent: second call does not overwrite. + let start_before = sched.start_secs; + resolve_schedule_defaults_into_task(&mut task, None); + assert_eq!( + task.eval_schedule.as_ref().unwrap().start_secs, + start_before + ); + + let flow_context = FlowQueryContext { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + timezone: "UTC".to_string(), + extensions: HashMap::new(), + channel: 0, + snapshot_seqs: HashMap::new(), + sst_min_sequences: HashMap::new(), + }; + + // Construct CreateFlowData and verify CreateRequest carries internal key. + let data = CreateFlowData { + state: CreateFlowState::CreateFlows, + task, + flow_id: Some(1024), + peers: vec![], + source_table_ids: vec![], + unresolved_source_table_names: vec![], + flow_context: flow_context.clone(), + prev_flow_info_value: None, + did_replace: false, + flow_type: Some(FlowType::Batching), + }; + + let data2 = CreateFlowData { + state: CreateFlowState::CreateMetadata, + task: data.task.clone(), + flow_id: Some(1024), + peers: vec![], + source_table_ids: vec![], + unresolved_source_table_names: vec![], + flow_context, + prev_flow_info_value: None, + did_replace: false, + flow_type: Some(FlowType::Batching), + }; + + let request: CreateRequest = (&data).into(); + // Verify internal key is present in the runtime request. + assert!( + request + .flow_options + .contains_key("__greptime_internal_eval_schedule"), + "CreateRequest must contain internal eval schedule key" + ); + + // Verify FlowInfoValue has eval_schedule populated, and options do NOT + // contain schedule keys. + let (flow_info, _) = <(FlowInfoValue, Vec<(_, _)>)>::from(&data2); + assert!( + flow_info.eval_schedule.is_some(), + "FlowInfoValue must have eval_schedule set" + ); + for key in &[ + "eval_interval_start", + "eval_interval_anchor", + "eval_interval_missed_tick_policy", + "eval_interval_catchup_max_runs", + "eval_interval_catchup_max_lag", + ] { + assert!( + !flow_info.options.contains_key(*key), + "FlowInfoValue.options should not contain {key}" + ); + } +} + +#[test] +fn test_effective_eval_schedule_prefers_typed_config() { + let typed = FlowScheduleConfig { + anchor_secs: 0, + start_secs: 999, + missed_tick_policy: FlowMissedTickPolicy::BoundedCatchUp, + catchup_max_runs: 9, + catchup_max_lag_secs: 900, + }; + let unrelated_options = HashMap::from([("unrelated".to_string(), "value".to_string())]); + let flow_info = + flow_info_for_schedule_test(Some(typed.clone()), Some(60), unrelated_options, 1); + + let effective = effective_eval_schedule_from_flow_info(&flow_info).unwrap(); + assert_eq!(effective, typed); +} + +#[test] +fn test_effective_eval_schedule_uses_created_time_default() { + let flow_info = flow_info_for_schedule_test(None, Some(60), HashMap::new(), 61); + + let effective = effective_eval_schedule_from_flow_info(&flow_info).unwrap(); + assert_eq!(effective.anchor_secs, 0); + assert_eq!(effective.start_secs, 120); + assert_eq!( + effective.missed_tick_policy, + FlowMissedTickPolicy::BoundedCatchUp + ); + assert_eq!(effective.catchup_max_runs, 3); + assert_eq!(effective.catchup_max_lag_secs, 300); +} + +#[test] +fn test_effective_eval_schedule_none_without_eval_interval() { + let flow_info = flow_info_for_schedule_test(None, None, HashMap::new(), 61); + assert!(effective_eval_schedule_from_flow_info(&flow_info).is_none()); +} + +#[test] +fn test_effective_eval_schedule_deterministic_on_old_metadata() { + // Old metadata: no typed eval_schedule, but has eval_interval_secs and + // a fixed created_time. Repeated calls must produce the same config. + let flow_info = flow_info_for_schedule_test(None, Some(60), HashMap::new(), 61); + let first = effective_eval_schedule_from_flow_info(&flow_info).unwrap(); + let second = effective_eval_schedule_from_flow_info(&flow_info).unwrap(); + assert_eq!(first, second); + // Sanity: the derived values are based on created_time=61 + interval=60. + assert_eq!(first.anchor_secs, 0); + assert_eq!(first.start_secs, 120); // ceil(61, 0, 60) = 120 + assert_eq!( + first.missed_tick_policy, + FlowMissedTickPolicy::BoundedCatchUp + ); + assert_eq!(first.catchup_max_runs, 3); + assert_eq!(first.catchup_max_lag_secs, 300); // max(300, 3*60) = 300 +} + +#[test] +fn test_old_flow_info_json_without_eval_schedule_deserializes() { + let typed = FlowScheduleConfig { + anchor_secs: 0, + start_secs: 999, + missed_tick_policy: FlowMissedTickPolicy::BoundedCatchUp, + catchup_max_runs: 9, + catchup_max_lag_secs: 900, + }; + let flow_info = flow_info_for_schedule_test(Some(typed), Some(60), HashMap::new(), 61); + let mut json = serde_json::to_value(&flow_info).unwrap(); + json.as_object_mut().unwrap().remove("eval_schedule"); + + let decoded: FlowInfoValue = serde_json::from_value(json).unwrap(); + assert!(decoded.eval_schedule.is_none()); + + let effective = effective_eval_schedule_from_flow_info(&decoded).unwrap(); + assert_eq!(effective.anchor_secs, 0); + assert_eq!(effective.start_secs, 120); + assert_eq!( + effective.missed_tick_policy, + FlowMissedTickPolicy::BoundedCatchUp + ); + assert_eq!(effective.catchup_max_runs, 3); + assert_eq!(effective.catchup_max_lag_secs, 300); +} + #[tokio::test] async fn test_create_flow_rejects_unknown_option_in_meta_task() { let mut task = test_create_flow_task( @@ -702,6 +977,7 @@ fn create_test_flow_task_for_serialization() -> CreateFlowTask { comment: "test comment".to_string(), sql: "SELECT * FROM source_table".to_string(), flow_options: HashMap::new(), + eval_schedule: None, } } @@ -835,6 +1111,44 @@ fn test_flow_query_context_conversion_from_query_context() { assert_eq!(flow_context.sst_min_sequences, HashMap::from([(1, 8)])); } +#[test] +fn test_create_flow_procedure_strips_scheduled_time_extension() { + let task = test_create_flow_task( + "my_flow", + vec![], + TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_sink_table"), + false, + ); + let ddl_context = new_ddl_context(Arc::new(MockFlownodeManager::new(NaiveFlownodeHandler))); + let mut query_ctx = test_query_context(); + query_ctx.extensions.insert( + "flow.scheduled_time_millis".to_string(), + "1700000000000".to_string(), + ); + query_ctx + .extensions + .insert("flow.other".to_string(), "kept".to_string()); + + let procedure = CreateFlowProcedure::new(task, query_ctx, ddl_context); + + assert!( + !procedure + .data + .flow_context + .extensions + .contains_key("flow.scheduled_time_millis") + ); + assert_eq!( + procedure + .data + .flow_context + .extensions + .get("flow.other") + .map(String::as_str), + Some("kept") + ); +} + #[test] fn test_flow_info_conversion_with_flow_context() { let flow_context = FlowQueryContext { @@ -871,6 +1185,53 @@ fn test_flow_info_conversion_with_flow_context() { assert!(query_context.extensions().is_empty()); } +#[test] +fn test_flow_info_conversion_strips_scheduled_time_extension() { + let flow_context = FlowQueryContext { + catalog: "info_catalog".to_string(), + schema: "info_schema".to_string(), + timezone: "UTC".to_string(), + extensions: HashMap::from([ + ( + "flow.scheduled_time_millis".to_string(), + "1700000000000".to_string(), + ), + ("flow.other".to_string(), "kept".to_string()), + ]), + channel: 0, + snapshot_seqs: HashMap::new(), + sst_min_sequences: HashMap::new(), + }; + + let data = CreateFlowData { + state: CreateFlowState::CreateMetadata, + task: create_test_flow_task_for_serialization(), + flow_id: Some(123), + peers: vec![], + source_table_ids: vec![], + unresolved_source_table_names: vec![], + flow_context, + prev_flow_info_value: None, + did_replace: false, + flow_type: Some(FlowType::Batching), + }; + + let (flow_info, _routes) = (&data).into(); + let query_context = flow_info.query_context.unwrap(); + assert!( + !query_context + .extensions() + .contains_key("flow.scheduled_time_millis") + ); + assert_eq!( + query_context + .extensions() + .get("flow.other") + .map(String::as_str), + Some("kept") + ); +} + #[test] fn test_mixed_serialization_format_support() { // Test that we can deserialize both old and new formats diff --git a/src/common/meta/src/key/flow.rs b/src/common/meta/src/key/flow.rs index bc9aaaa6b3..50c7e0ac38 100644 --- a/src/common/meta/src/key/flow.rs +++ b/src/common/meta/src/key/flow.rs @@ -535,6 +535,7 @@ mod tests { status: FlowStatus::Active, created_time: chrono::Utc::now(), updated_time: chrono::Utc::now(), + eval_schedule: None, } } @@ -790,6 +791,7 @@ mod tests { status: FlowStatus::Active, created_time: chrono::Utc::now(), updated_time: chrono::Utc::now(), + eval_schedule: None, }; let err = flow_metadata_manager .create_flow_metadata(flow_id, flow_value, flow_routes.clone()) @@ -1170,6 +1172,7 @@ mod tests { status: FlowStatus::Active, created_time: chrono::Utc::now(), updated_time: chrono::Utc::now(), + eval_schedule: None, }; let err = flow_metadata_manager .update_flow_metadata( diff --git a/src/common/meta/src/key/flow/flow_info.rs b/src/common/meta/src/key/flow/flow_info.rs index b1056902da..ee6a291dd4 100644 --- a/src/common/meta/src/key/flow/flow_info.rs +++ b/src/common/meta/src/key/flow/flow_info.rs @@ -127,6 +127,86 @@ impl<'a> MetadataKey<'a, FlowInfoKeyInner> for FlowInfoKeyInner { } } +/// Internal typed schedule configuration for `EVAL INTERVAL` flows. +/// +/// This struct is the canonical schedule state for `EVAL INTERVAL` flows and +/// is stored alongside `FlowInfoValue` (not inside `options`). +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct FlowScheduleConfig { + /// Anchor timestamp in seconds since Unix epoch (default: 0 = epoch). + pub anchor_secs: i64, + /// Start timestamp in seconds since Unix epoch. + pub start_secs: i64, + /// Policy for handling missed ticks. + #[serde(default)] + pub missed_tick_policy: FlowMissedTickPolicy, + /// Maximum number of catch-up runs when using bounded catch-up. + /// + /// A catch-up run is an evaluation for a scheduled timestamp that is + /// already due but was missed because the flownode was stopped or busy. + #[serde(default = "FlowScheduleConfig::default_catchup_max_runs")] + pub catchup_max_runs: u32, + /// Maximum age (in seconds) of a due scheduled time to still include in catch-up. + #[serde(default = "FlowScheduleConfig::default_catchup_max_lag_secs")] + pub catchup_max_lag_secs: i64, +} + +impl FlowScheduleConfig { + pub const DEFAULT_ANCHOR_SECS: i64 = 0; + pub const DEFAULT_CATCHUP_MAX_RUNS: u32 = 3; + pub const DEFAULT_CATCHUP_MAX_LAG_SECS: i64 = 300; + + pub fn default_catchup_max_runs() -> u32 { + Self::DEFAULT_CATCHUP_MAX_RUNS + } + + pub fn default_catchup_max_lag_secs() -> i64 { + Self::DEFAULT_CATCHUP_MAX_LAG_SECS + } + + pub fn catchup_max_lag_secs_for_interval(eval_interval_secs: i64) -> i64 { + std::cmp::max( + Self::DEFAULT_CATCHUP_MAX_LAG_SECS, + 3_i64.saturating_mul(eval_interval_secs), + ) + } + + pub fn default_with_start(start_secs: i64, eval_interval_secs: i64) -> Self { + Self { + anchor_secs: Self::DEFAULT_ANCHOR_SECS, + start_secs, + missed_tick_policy: FlowMissedTickPolicy::BoundedCatchUp, + catchup_max_runs: Self::DEFAULT_CATCHUP_MAX_RUNS, + catchup_max_lag_secs: Self::catchup_max_lag_secs_for_interval(eval_interval_secs), + } + } +} + +impl Default for FlowScheduleConfig { + fn default() -> Self { + Self { + anchor_secs: Self::DEFAULT_ANCHOR_SECS, + start_secs: 0, + missed_tick_policy: FlowMissedTickPolicy::default(), + catchup_max_runs: Self::default_catchup_max_runs(), + catchup_max_lag_secs: Self::default_catchup_max_lag_secs(), + } + } +} + +/// Policy for handling flow evaluation scheduled times that were missed. +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)] +pub enum FlowMissedTickPolicy { + /// Keep the most recent `catchup_max_runs` due scheduled times within + /// `catchup_max_lag_secs`, drop older ones. + #[default] + #[serde(rename = "bounded_catch_up")] + BoundedCatchUp, + /// Skip all missed scheduled times; only execute the single most recent one. + #[serde(rename = "skip")] + Skip, +} + // The metadata of the flow. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct FlowInfoValue { @@ -175,6 +255,12 @@ pub struct FlowInfoValue { /// The updated time. #[serde(default)] pub updated_time: DateTime, + /// Typed schedule configuration for `EVAL INTERVAL` flows. + /// When `eval_interval_secs` is set, this field carries the resolved + /// schedule parameters (anchor, start, missed-tick policy, catch-up + /// limits). Absent for flows without `EVAL INTERVAL`. + #[serde(default)] + pub eval_schedule: Option, } impl FlowInfoValue { diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index d2b700d30c..1cca31c364 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -1184,6 +1184,11 @@ pub struct CreateFlowTask { pub comment: String, pub sql: String, pub flow_options: HashMap, + /// Typed schedule configuration resolved during `on_prepare`. + /// Not populated from proto; set by the procedure layer after + /// defaults are resolved. + #[serde(default)] + pub eval_schedule: Option, } impl TryFrom for CreateFlowTask { @@ -1222,6 +1227,7 @@ impl TryFrom for CreateFlowTask { comment, sql, flow_options, + eval_schedule: None, }) } } @@ -1240,6 +1246,7 @@ impl From for PbCreateFlowTask { comment, sql, flow_options, + .. }: CreateFlowTask, ) -> Self { PbCreateFlowTask { diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 351d176db0..46c01b8184 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -745,6 +745,7 @@ impl StreamingEngine { sql, flow_options, query_ctx, + .. } = args; let mut node_ctx = self.node_context.write().await; diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index 7d451d9038..9e3399c182 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -25,9 +25,12 @@ use api::v1::region::InsertRequests; use catalog::CatalogManager; use common_base::Plugins; use common_error::ext::BoxedError; -use common_meta::ddl::create_flow::FlowType; +use common_meta::ddl::create_flow::{ + FlowType, INTERNAL_EVAL_SCHEDULE_KEY, effective_eval_schedule_from_flow_info, +}; use common_meta::error::Result as MetaResult; use common_meta::key::flow::FlowMetadataManager; +use common_meta::key::flow::flow_info::FlowScheduleConfig; use common_meta::key::flow::flow_state::FlowStat; use common_runtime::JoinHandle; use common_telemetry::{error, info, trace, warn}; @@ -380,6 +383,7 @@ impl FlowDualEngine { comment: Some(info.comment().clone()), sql: info.raw_sql().clone(), flow_options: info.options().clone(), + eval_schedule: effective_eval_schedule_from_flow_info(&info), query_ctx: info .query_context() .clone() @@ -839,7 +843,7 @@ impl common_meta::node_manager::Flownode for FlowDualEngine { eval_interval, comment, sql, - flow_options, + mut flow_options, or_replace, })) => { let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec(); @@ -849,6 +853,10 @@ impl common_meta::node_manager::Flownode for FlowDualEngine { sink_table_name.table_name, ]; let expire_after = expire_after.map(|e| e.value); + + let eval_schedule = decode_internal_eval_schedule(&mut flow_options) + .map_err(to_meta_err(snafu::location!()))?; + let args = CreateFlowArgs { flow_id: task_id.id as u64, sink_table_name, @@ -861,6 +869,7 @@ impl common_meta::node_manager::Flownode for FlowDualEngine { sql: sql.clone(), flow_options, query_ctx, + eval_schedule, }; let ret = self .create_flow(args) @@ -919,6 +928,24 @@ impl common_meta::node_manager::Flownode for FlowDualEngine { } } +/// Decode typed schedule config from the internal transient key emitted by metasrv. +/// Malformed JSON is an internal error rather than a reason to silently fall back. +fn decode_internal_eval_schedule( + flow_options: &mut HashMap, +) -> Result, Error> { + match flow_options.remove(INTERNAL_EVAL_SCHEDULE_KEY) { + Some(json) => serde_json::from_str::(&json) + .map(Some) + .map_err(|err| { + InternalSnafu { + reason: format!("Invalid internal eval schedule payload: {err}"), + } + .build() + }), + None => Ok(None), + } +} + /// return a function to convert `crate::error::Error` to `common_meta::error::Error` fn to_meta_err( location: snafu::Location, @@ -1128,3 +1155,30 @@ impl StreamingEngine { Ok(()) } } + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use common_meta::ddl::create_flow::INTERNAL_EVAL_SCHEDULE_KEY; + + use super::decode_internal_eval_schedule; + use crate::error::Error; + + #[test] + fn test_malformed_internal_eval_schedule_json_is_error() { + let mut flow_options = HashMap::new(); + flow_options.insert( + INTERNAL_EVAL_SCHEDULE_KEY.to_string(), + "not-json".to_string(), + ); + + let err = decode_internal_eval_schedule(&mut flow_options).unwrap_err(); + assert!(matches!( + err, + Error::Internal { reason, .. } + if reason.contains("Invalid internal eval schedule payload") + )); + assert!(!flow_options.contains_key(INTERNAL_EVAL_SCHEDULE_KEY)); + } +} diff --git a/src/flow/src/batching_mode.rs b/src/flow/src/batching_mode.rs index a8bd139d98..bb946e7574 100644 --- a/src/flow/src/batching_mode.rs +++ b/src/flow/src/batching_mode.rs @@ -22,6 +22,7 @@ use session::ReadPreference; mod checkpoint; pub(crate) mod engine; +mod eval_schedule; pub(crate) mod frontend_client; mod state; mod table_creator; diff --git a/src/flow/src/batching_mode/engine.rs b/src/flow/src/batching_mode/engine.rs index bd15654b5e..1285f92e82 100644 --- a/src/flow/src/batching_mode/engine.rs +++ b/src/flow/src/batching_mode/engine.rs @@ -44,6 +44,7 @@ use table::table_reference::TableReference; use tokio::sync::{RwLock, oneshot}; use crate::batching_mode::BatchingModeOptions; +use crate::batching_mode::eval_schedule::EvalSchedule; use crate::batching_mode::frontend_client::FrontendClient; use crate::batching_mode::task::{BatchingTask, TaskArgs}; use crate::batching_mode::time_window::{TimeWindowExpr, find_time_window_expr}; @@ -507,6 +508,7 @@ impl BatchingEngine { sql, flow_options, query_ctx, + eval_schedule: eval_schedule_config, } = args; // or replace logic @@ -629,6 +631,24 @@ impl BatchingEngine { Self::ensure_sql_flow_has_twe_or_eval_interval(eval_interval, phy_expr.is_some())?; } + // Compute typed EvalSchedule from FlowScheduleConfig. + let eval_schedule = { + let interval = eval_interval; + let config = eval_schedule_config.as_ref(); + match EvalSchedule::from_config(interval, config) { + Ok(s) => s, + Err(e) => { + return UnexpectedSnafu { + reason: format!( + "Failed to build eval schedule for flow {}: {}", + flow_id, e + ), + } + .fail(); + } + } + }; + let task_args = TaskArgs { flow_id, query: &sql, @@ -642,6 +662,7 @@ impl BatchingEngine { shutdown_rx: rx, batch_opts, flow_eval_interval: eval_interval.map(|secs| Duration::from_secs(secs as u64)), + eval_schedule, }; let task = BatchingTask::try_new(task_args)?; @@ -1186,6 +1207,7 @@ GROUP BY l.number, time_window shutdown_rx: rx, batch_opts: Arc::new(BatchingModeOptions::default()), flow_eval_interval: None, + eval_schedule: None, }) .unwrap(); diff --git a/src/flow/src/batching_mode/eval_schedule.rs b/src/flow/src/batching_mode/eval_schedule.rs new file mode 100644 index 0000000000..b1bf86c061 --- /dev/null +++ b/src/flow/src/batching_mode/eval_schedule.rs @@ -0,0 +1,353 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Helpers for stable `EVAL INTERVAL` scheduled times. + +pub use common_meta::key::flow::flow_info::{FlowMissedTickPolicy, FlowScheduleConfig}; +use snafu::ensure; + +use crate::error::{InvalidQuerySnafu, Result}; + +/// Schedule for an `EVAL INTERVAL` flow. +#[derive(Debug, Clone, PartialEq)] +pub struct EvalSchedule { + /// Interval between scheduled times in seconds. + pub interval_secs: i64, + /// Anchor timestamp as seconds since Unix epoch. + pub anchor_secs: i64, + /// First scheduled time as seconds since Unix epoch. + pub start_secs: i64, + /// Policy for handling missed scheduled times. + pub missed_tick_policy: FlowMissedTickPolicy, + /// Maximum number of due scheduled times to catch up. + pub max_runs: u32, + /// Maximum age of a due scheduled time to keep for catch-up. + pub max_lag_secs: i64, +} + +impl EvalSchedule { + pub fn from_config( + eval_interval_secs: Option, + config: Option<&FlowScheduleConfig>, + ) -> Result> { + let Some(interval_secs) = eval_interval_secs else { + return Ok(None); + }; + ensure!( + interval_secs > 0, + InvalidQuerySnafu { + reason: format!( + "Invalid eval_interval_secs: must be positive, got {interval_secs}" + ) + } + ); + + Ok(Some(match config { + Some(c) => { + ensure!( + c.catchup_max_runs > 0, + InvalidQuerySnafu { + reason: + "Invalid FlowScheduleConfig.catchup_max_runs: must be positive, got 0" + .to_string() + } + ); + ensure!( + c.catchup_max_lag_secs > 0, + InvalidQuerySnafu { + reason: format!( + "Invalid FlowScheduleConfig.catchup_max_lag_secs: must be positive, got {}", + c.catchup_max_lag_secs + ) + } + ); + + Self { + interval_secs, + anchor_secs: c.anchor_secs, + start_secs: c.start_secs, + missed_tick_policy: c.missed_tick_policy, + max_runs: c.catchup_max_runs, + max_lag_secs: c.catchup_max_lag_secs, + } + } + None => { + let c = FlowScheduleConfig::default_with_start(0, interval_secs); + Self { + interval_secs, + anchor_secs: c.anchor_secs, + start_secs: c.start_secs, + missed_tick_policy: c.missed_tick_policy, + max_runs: c.catchup_max_runs, + max_lag_secs: c.catchup_max_lag_secs, + } + } + })) + } + + /// Returns the next scheduled time strictly after `cursor_secs`. + pub fn next_scheduled_time_after(&self, cursor_secs: i64) -> i64 { + next_in_sequence(cursor_secs, self.start_secs, self.interval_secs) + } +} + +fn next_in_sequence(cursor: i64, start: i64, interval: i64) -> i64 { + if interval <= 0 { + return cursor.saturating_add(1).max(start); + } + if cursor < start { + return start; + } + + let k = (cursor - start) / interval; + start.saturating_add((k + 1).saturating_mul(interval)) +} + +fn first_due_in_sequence(cursor: i64, start: i64, interval: i64) -> i64 { + if interval <= 0 { + return cursor.saturating_add(1).max(start); + } + if cursor < start { + start + } else { + next_in_sequence(cursor, start, interval) + } +} + +/// Scheduled times selected for execution in one scheduler pass. +/// +/// A scheduled time is the logical evaluation timestamp for one flow run. When +/// executing a timestamp from `scheduled_times_secs`, SQL/TQL `now()` is bound +/// to that timestamp instead of the wall-clock execution time. +#[derive(Debug, Clone, PartialEq)] +pub struct DueScheduledTimes { + /// Scheduled times to execute, ordered oldest to newest. + pub scheduled_times_secs: Vec, + /// Number of due scheduled times skipped by lag or max-runs limits. + pub skipped: u64, +} + +/// Select due scheduled times `<= wall_now_secs` without materializing all missed ticks. +pub fn select_due_scheduled_times( + schedule: &EvalSchedule, + cursor_secs: i64, + wall_now_secs: i64, +) -> Option { + if schedule.interval_secs <= 0 { + return None; + } + + let first_due = first_due_in_sequence(cursor_secs, schedule.start_secs, schedule.interval_secs); + if first_due > wall_now_secs { + return Some(DueScheduledTimes { + scheduled_times_secs: vec![], + skipped: 0, + }); + } + + let total_count = ((wall_now_secs - first_due) / schedule.interval_secs) as u64 + 1; + match schedule.missed_tick_policy { + FlowMissedTickPolicy::Skip => { + let last = first_due + (total_count as i64 - 1) * schedule.interval_secs; + Some(DueScheduledTimes { + scheduled_times_secs: vec![last], + skipped: total_count.saturating_sub(1), + }) + } + FlowMissedTickPolicy::BoundedCatchUp => { + let cutoff = wall_now_secs.saturating_sub(schedule.max_lag_secs); + let skipped_by_cutoff = if first_due >= cutoff { + 0 + } else { + ((cutoff - first_due + schedule.interval_secs - 1) / schedule.interval_secs) as u64 + } + .min(total_count); + + let remaining = total_count.saturating_sub(skipped_by_cutoff); + if remaining == 0 { + return Some(DueScheduledTimes { + scheduled_times_secs: vec![], + skipped: total_count, + }); + } + + // max_lag_secs decides which missed scheduled times are recent enough to + // run; max_runs caps how many of those times we execute + // back-to-back in one scheduler pass. + let keep_count = remaining.min(schedule.max_runs as u64); + let keep_start = skipped_by_cutoff + remaining.saturating_sub(keep_count); + let scheduled_times_secs = (0..keep_count) + .map(|i| first_due + (keep_start as i64 + i as i64) * schedule.interval_secs) + .collect::>(); + + Some(DueScheduledTimes { + scheduled_times_secs, + skipped: total_count - keep_count, + }) + } + } +} + +/// Ceils `time` to the next `anchor + k * interval` boundary. +pub fn ceil_to_boundary(time: i64, anchor: i64, interval: i64) -> i64 { + if interval <= 0 { + return time; + } + if time <= anchor { + return anchor; + } + + let diff = i128::from(time) - i128::from(anchor); + let interval = i128::from(interval); + let k = (diff + interval - 1) / interval; + let boundary = i128::from(anchor) + k * interval; + + boundary.clamp(i128::from(i64::MIN), i128::from(i64::MAX)) as i64 +} + +#[cfg(test)] +mod test { + use super::*; + + fn schedule( + start: i64, + policy: FlowMissedTickPolicy, + max_runs: u32, + max_lag_secs: i64, + ) -> EvalSchedule { + EvalSchedule { + interval_secs: 60, + anchor_secs: 0, + start_secs: start, + missed_tick_policy: policy, + max_runs, + max_lag_secs, + } + } + + fn config(policy: FlowMissedTickPolicy) -> FlowScheduleConfig { + FlowScheduleConfig { + anchor_secs: 10, + start_secs: 70, + missed_tick_policy: policy, + catchup_max_runs: 4, + catchup_max_lag_secs: 600, + } + } + + #[test] + fn ceil_to_boundary_handles_anchor_and_interval_edges() { + assert_eq!(ceil_to_boundary(-10, 0, 60), 0); + assert_eq!(ceil_to_boundary(0, 0, 60), 0); + assert_eq!(ceil_to_boundary(1, 0, 60), 60); + assert_eq!(ceil_to_boundary(60, 0, 60), 60); + assert_eq!(ceil_to_boundary(101, 100, 60), 160); + assert_eq!(ceil_to_boundary(50, 0, 0), 50); + assert_eq!(ceil_to_boundary(i64::MAX, 0, 60), i64::MAX); + assert_eq!(ceil_to_boundary(i64::MAX - 1, i64::MIN, 60), i64::MAX); + } + + #[test] + fn from_config_maps_typed_config_and_defaults() { + assert!(EvalSchedule::from_config(None, None).unwrap().is_none()); + assert!(EvalSchedule::from_config(Some(0), None).is_err()); + + let from_typed = + EvalSchedule::from_config(Some(300), Some(&config(FlowMissedTickPolicy::Skip))) + .unwrap() + .unwrap(); + assert_eq!(from_typed.interval_secs, 300); + assert_eq!(from_typed.anchor_secs, 10); + assert_eq!(from_typed.start_secs, 70); + assert_eq!(from_typed.missed_tick_policy, FlowMissedTickPolicy::Skip); + assert_eq!(from_typed.max_runs, 4); + assert_eq!(from_typed.max_lag_secs, 600); + + let defaulted = EvalSchedule::from_config(Some(300), None).unwrap().unwrap(); + assert_eq!(defaulted.start_secs, 0); + assert_eq!(defaulted.max_runs, 3); + assert_eq!(defaulted.max_lag_secs, 900); + } + + #[test] + fn from_config_rejects_invalid_catchup_limits() { + let mut c = config(FlowMissedTickPolicy::BoundedCatchUp); + c.catchup_max_runs = 0; + assert!(EvalSchedule::from_config(Some(300), Some(&c)).is_err()); + + let mut c = config(FlowMissedTickPolicy::BoundedCatchUp); + c.catchup_max_lag_secs = 0; + assert!(EvalSchedule::from_config(Some(300), Some(&c)).is_err()); + } + + #[test] + fn next_scheduled_time_after_respects_start_sequence() { + let s = schedule(50, FlowMissedTickPolicy::BoundedCatchUp, 3, 300); + assert_eq!(s.next_scheduled_time_after(0), 50); + assert_eq!(s.next_scheduled_time_after(50), 110); + assert_eq!(s.next_scheduled_time_after(100), 110); + } + + #[test] + fn due_scheduled_time_selection_handles_empty_and_start_boundary() { + let s = schedule(120, FlowMissedTickPolicy::BoundedCatchUp, 10, 3600); + assert_eq!( + select_due_scheduled_times(&s, 0, 100) + .unwrap() + .scheduled_times_secs, + Vec::::new() + ); + assert_eq!( + select_due_scheduled_times(&s, 0, 300) + .unwrap() + .scheduled_times_secs, + vec![120, 180, 240, 300] + ); + } + + #[test] + fn bounded_catch_up_applies_lag_and_max_runs() { + let s = schedule(0, FlowMissedTickPolicy::BoundedCatchUp, 2, 180); + let due = select_due_scheduled_times(&s, 0, 600).unwrap(); + assert_eq!(due.scheduled_times_secs, vec![540, 600]); + assert_eq!(due.skipped, 8); + } + + #[test] + fn bounded_catch_up_can_skip_all_due_scheduled_times() { + let s = schedule(0, FlowMissedTickPolicy::BoundedCatchUp, 3, 30); + let due = select_due_scheduled_times(&s, 0, 100).unwrap(); + assert!(due.scheduled_times_secs.is_empty()); + assert_eq!(due.skipped, 1); + } + + #[test] + fn skip_policy_keeps_only_latest_due_scheduled_time() { + let s = schedule(0, FlowMissedTickPolicy::Skip, 5, 3600); + let due = select_due_scheduled_times(&s, 0, 300).unwrap(); + assert_eq!(due.scheduled_times_secs, vec![300]); + assert_eq!(due.skipped, 4); + } + + #[test] + fn huge_missed_gap_allocates_only_kept_scheduled_times() { + let s = schedule(0, FlowMissedTickPolicy::BoundedCatchUp, 5, 3600); + let due = select_due_scheduled_times(&s, 0, 86400).unwrap(); + assert_eq!( + due.scheduled_times_secs, + vec![86160, 86220, 86280, 86340, 86400] + ); + assert_eq!(due.skipped, 1435); + } +} diff --git a/src/flow/src/batching_mode/task.rs b/src/flow/src/batching_mode/task.rs index 7ffcded1b1..089adbb0ed 100644 --- a/src/flow/src/batching_mode/task.rs +++ b/src/flow/src/batching_mode/task.rs @@ -45,6 +45,7 @@ use tokio::time::Instant; use crate::batching_mode::BatchingModeOptions; use crate::batching_mode::checkpoint::checkpoint_mode_label; +use crate::batching_mode::eval_schedule::{EvalSchedule, select_due_scheduled_times}; use crate::batching_mode::frontend_client::{FrontendClient, PeerDesc}; use crate::batching_mode::state::{ CheckpointMode, DirtyTimeWindows, FilterExprInfo, TaskState, to_df_literal, @@ -70,6 +71,14 @@ use crate::{Error, FlowId}; mod ckpt; mod inc; +/// Returns the current wall-clock Unix timestamp in seconds. +fn wall_clock_unix_secs() -> i64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs() as i64 +} + /// The task's config, immutable once created #[derive(Clone)] pub struct TaskConfig { @@ -86,6 +95,8 @@ pub struct TaskConfig { pub query_type: QueryType, pub batch_opts: Arc, pub flow_eval_interval: Option, + /// Typed schedule configuration, pre-parsed at task creation time. + pub eval_schedule: Option, } fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result { @@ -156,6 +167,8 @@ pub struct TaskArgs<'a> { pub shutdown_rx: oneshot::Receiver<()>, pub batch_opts: Arc, pub flow_eval_interval: Option, + /// Typed schedule configuration pre-parsed from `CreateFlowArgs`. + pub eval_schedule: Option, } pub struct PlanInfo { @@ -238,6 +251,7 @@ impl BatchingTask { shutdown_rx, batch_opts, flow_eval_interval, + eval_schedule, }: TaskArgs<'_>, ) -> Result { let mut state = TaskState::with_dirty_time_windows( @@ -265,6 +279,7 @@ impl BatchingTask { query_type: determine_query_type(query, &query_ctx)?, batch_opts, flow_eval_interval, + eval_schedule, }), state: Arc::new(RwLock::new(state)), execution_lock: Arc::new(Mutex::new(())), @@ -275,6 +290,23 @@ impl BatchingTask { self.state.read().unwrap().last_execution_time_millis() } + /// Collect flow-related extensions from the task's query context that should be + /// forwarded to the frontend (e.g. scheduled time). + fn frontend_extensions(&self) -> HashMap { + let ctx = self.state.read().unwrap(); + let all = ctx.query_ctx.extensions(); + let mut flow_exts = HashMap::new(); + // Propagate the scheduled time extension if present so that frontend + // execution can use the same logical time. + if let Some(v) = all.get(query::options::FLOW_SCHEDULED_TIME_MILLIS) { + flow_exts.insert( + query::options::FLOW_SCHEDULED_TIME_MILLIS.to_string(), + v.clone(), + ); + } + flow_exts + } + /// mark time window range (now - expire_after, now) as dirty (or (0, now) if expire_after not set) /// /// useful for flush_flow to flush dirty time windows range @@ -329,7 +361,7 @@ impl BatchingTask { /// Validates that the sink table schema can accept this flow's output. /// - /// This is a dry-run of the same schema matching logic used by runtime insert-plan + /// This is a dry-run of the same schema matching logic used by insert-plan /// generation, but without adding dirty-window filters or executing the query. It is used /// during CREATE FLOW to catch existing sink table mismatches early. pub async fn validate_sink_table_schema(&self, engine: &QueryEngineRef) -> Result<(), Error> { @@ -596,9 +628,15 @@ impl BatchingTask { let extensions = self .build_flow_query_extensions(incremental_safe, coverage.is_incremental_delta()) .await?; + let frontend_extensions = self.frontend_extensions(); let extension_refs = extensions .iter() .map(|(key, value)| (*key, value.as_str())) + .chain( + frontend_extensions + .iter() + .map(|(key, value)| (key.as_str(), value.as_str())), + ) .collect::>(); let query_mode = if extensions .iter() @@ -909,37 +947,202 @@ impl BatchingTask { /// start executing query in a loop, break when receive shutdown signal /// - /// any error will be logged when executing query + /// any error will be logged when executing query. + /// + /// Dispatches to: + /// - scheduled loop when `flow_eval_interval.is_some()` + /// - adaptive dirty-window loop otherwise pub async fn start_executing_loop( &self, engine: QueryEngineRef, frontend_client: Arc, + ) { + if self.config.flow_eval_interval.is_some() { + self.start_scheduled_loop(engine, frontend_client).await; + } else { + self.start_adaptive_loop(engine, frontend_client).await; + } + } + + /// Scheduled batching loop for flows with `EVAL INTERVAL`. + /// + /// Uses the pre-parsed `EvalSchedule` from `TaskConfig` and selects due + /// scheduled times using bounded catch-up semantics. Each scheduled time is the + /// scheduled evaluation time used as logical `now()` for that attempt. + /// Each attempt temporarily sets `flow.scheduled_time_millis` on the + /// task's `QueryContext` and executes under the existing `execution_lock`. + /// After every attempt (success, no-op, or failure) the in-memory + /// cursor advances. + async fn start_scheduled_loop( + &self, + engine: QueryEngineRef, + frontend_client: Arc, + ) { + let flow_id_str = self.config.flow_id.to_string(); + + let schedule = match &self.config.eval_schedule { + Some(s) => s.clone(), + None => { + let eval_interval_secs = self + .config + .flow_eval_interval + .map(|d| d.as_secs() as i64) + .expect("checked by caller"); + + // Fallback: no typed config provided. Compute defaults + // anchored at epoch/start=0. + match EvalSchedule::from_config(Some(eval_interval_secs), None) { + Ok(Some(s)) => s, + Ok(None) => { + warn!( + "Flow {}: EVAL INTERVAL set but no schedule parsed; exiting loop", + flow_id_str + ); + return; + } + Err(e) => { + warn!( + "Flow {}: Failed to parse eval schedule: {}; exiting loop", + flow_id_str, e + ); + return; + } + } + } + }; + + // Initial cursor is one interval before start so the first due + // scheduled time is `start_secs`. + let mut cursor_secs = schedule.start_secs.saturating_sub(schedule.interval_secs); + + info!( + "Flow {}: entering scheduled loop, interval={}s, start={}, anchor={}, policy={:?}, max_runs={}, max_lag={}s", + flow_id_str, + schedule.interval_secs, + schedule.start_secs, + schedule.anchor_secs, + schedule.missed_tick_policy, + schedule.max_runs, + schedule.max_lag_secs, + ); + + loop { + if self.is_shutdown_signaled() { + break; + } + + let wall_now_secs = wall_clock_unix_secs(); + + let due = match select_due_scheduled_times(&schedule, cursor_secs, wall_now_secs) { + Some(d) => d, + None => { + warn!( + "Flow {}: Invalid schedule (interval <= 0), exiting loop", + flow_id_str + ); + return; + } + }; + + if due.scheduled_times_secs.is_empty() { + if due.skipped > 0 { + warn!( + "Flow {}: all {} due scheduled times skipped by max-lag, advancing cursor to wall-clock ({wall_now_secs}) to avoid re-skipping", + flow_id_str, due.skipped + ); + cursor_secs = wall_now_secs; + continue; + } + + // No due yet — sleep until the next scheduled time. + let next = schedule.next_scheduled_time_after(cursor_secs); + if next <= wall_now_secs { + // Shouldn't happen given select_due_scheduled_times returned empty, + // but guard against clock skew / logic error. + cursor_secs = wall_now_secs; + continue; + } + let wait_secs = (next - wall_now_secs) as u64; + let wait_dur = Duration::from_secs(wait_secs); + debug!( + "Flow {}: no due scheduled times, sleeping for {}s until next scheduled time at {}", + flow_id_str, wait_secs, next + ); + tokio::time::sleep(wait_dur).await; + continue; + } + + if due.skipped > 0 { + info!( + "Flow {}: {} due scheduled times, {} skipped (catch-up)", + flow_id_str, + due.scheduled_times_secs.len(), + due.skipped + ); + } + + // Execute scheduled times oldest → newest. + for scheduled_time_secs in &due.scheduled_times_secs { + if self.is_shutdown_signaled() { + break; + } + + METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT + .with_label_values(&[&flow_id_str]) + .inc(); + + let outcome = self + .execute_once_serialized_at_scheduled_time( + &engine, + &frontend_client, + *scheduled_time_secs, + ) + .await; + + // Advance cursor regardless of outcome. + cursor_secs = *scheduled_time_secs; + + match outcome.result { + Ok(Some((rows, elapsed))) => { + debug!( + "Flow {}: scheduled time {} completed, rows={}, elapsed={:?}", + flow_id_str, scheduled_time_secs, rows, elapsed + ); + } + Ok(None) => { + debug!( + "Flow {}: scheduled time {} produced no query (no dirty signal or no-op)", + flow_id_str, scheduled_time_secs + ); + } + Err(err) => { + warn!( + "Flow {}: scheduled time {} failed: {:?}", + flow_id_str, scheduled_time_secs, err + ); + METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT + .with_label_values(&[&flow_id_str]) + .inc(); + // Dirty-window restoration is handled by the + // existing `handle_executed_query_failure` inside + // `execute_once_unlocked`. + } + } + } + } + } + + /// Existing adaptive dirty-window loop for flows without `EVAL INTERVAL`. + async fn start_adaptive_loop( + &self, + engine: QueryEngineRef, + frontend_client: Arc, ) { let flow_id_str = self.config.flow_id.to_string(); let mut max_window_cnt = None; - let mut interval = self - .config - .flow_eval_interval - .map(|d| tokio::time::interval(d)); - if let Some(tick) = &mut interval { - tick.tick().await; // pass the first tick immediately - } loop { - // first check if shutdown signal is received - // if so, break the loop - { - let mut state = self.state.write().unwrap(); - match state.shutdown_rx.try_recv() { - Ok(()) => break, - Err(TryRecvError::Closed) => { - warn!( - "Unexpected shutdown flow {}, shutdown anyway", - self.config.flow_id - ); - break; - } - Err(TryRecvError::Empty) => (), - } + if self.is_shutdown_signaled() { + break; } METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT .with_label_values(&[&flow_id_str]) @@ -952,46 +1155,36 @@ impl BatchingTask { .await; match outcome.result { - // normal execute, sleep for some time before doing next query Ok(Some(_)) => { - // can increase max_window_cnt to query more windows next time max_window_cnt = max_window_cnt.map(|cnt| { (cnt + 1).min(self.config.batch_opts.experimental_max_filter_num_per_query) }); - // here use proper ticking if set eval interval - if let Some(eval_interval) = &mut interval { - eval_interval.tick().await; - } else { - // if not explicitly set, just automatically calculate next start time - // using time window size and more args - let sleep_until = { - let state = self.state.write().unwrap(); + let sleep_until = { + let state = self.state.write().unwrap(); - let time_window_size = self - .config - .time_window_expr - .as_ref() - .and_then(|t| *t.time_window_size()); + let time_window_size = self + .config + .time_window_expr + .as_ref() + .and_then(|t| *t.time_window_size()); - let prefer_short_incremental_cadence = state.checkpoint_mode() - == CheckpointMode::Incremental - && !state.is_incremental_disabled(); + let prefer_short_incremental_cadence = state.checkpoint_mode() + == CheckpointMode::Incremental + && !state.is_incremental_disabled(); - state.get_next_start_query_time( - self.config.flow_id, - &time_window_size, - min_refresh, - Some(self.config.batch_opts.query_timeout), - self.config.batch_opts.experimental_max_filter_num_per_query, - prefer_short_incremental_cadence, - ) - }; - - tokio::time::sleep_until(sleep_until).await; + state.get_next_start_query_time( + self.config.flow_id, + &time_window_size, + min_refresh, + Some(self.config.batch_opts.query_timeout), + self.config.batch_opts.experimental_max_filter_num_per_query, + prefer_short_incremental_cadence, + ) }; + + tokio::time::sleep_until(sleep_until).await; } - // no new data, sleep for some time before checking for new data Ok(None) => { debug!( "Flow id = {:?} found no new data, sleep for {:?} then continue", @@ -1000,7 +1193,6 @@ impl BatchingTask { tokio::time::sleep(min_refresh).await; continue; } - // TODO(discord9): this error should have better place to go, but for now just print error, also more context is needed Err(err) => { METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT .with_label_values(&[&flow_id_str]) @@ -1008,23 +1200,87 @@ impl BatchingTask { match outcome.new_query { Some(query) => { common_telemetry::error!(err; "Failed to execute query for flow={} with query: {}", self.config.flow_id, query.plan); - // TODO(discord9): add some backoff here? half the query time window or what - // backoff meaning use smaller `max_window_cnt` for next query - - // since last query failed, we should not try to query too many windows max_window_cnt = Some(1); } None => { common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id) } } - // also sleep for a little while before try again to prevent flooding logs tokio::time::sleep(min_refresh).await; } } } } + /// Check whether the shutdown signal has been received. + fn is_shutdown_signaled(&self) -> bool { + let mut state = self.state.write().unwrap(); + match state.shutdown_rx.try_recv() { + Ok(()) | Err(TryRecvError::Closed) => true, + Err(TryRecvError::Empty) => false, + } + } + + /// Execute one scheduled attempt, temporarily setting + /// `flow.scheduled_time_millis` on the task's QueryContext so + /// SQL/TQL `now()` resolves to the logical scheduled time. + /// + /// The extension is removed after the attempt so a later manual + /// `flush_flow` does not reuse a stale scheduled time. + async fn execute_once_serialized_at_scheduled_time( + &self, + engine: &QueryEngineRef, + frontend_client: &Arc, + scheduled_time_secs: i64, + ) -> ExecuteOnceOutcome { + let _execution_guard = self.execution_lock.lock().await; + + struct QueryContextRestoreGuard { + state: Arc>, + old_ctx: Option, + } + + impl Drop for QueryContextRestoreGuard { + fn drop(&mut self) { + let Some(old_ctx) = self.old_ctx.take() else { + return; + }; + if let Ok(mut state) = self.state.write() { + state.query_ctx = old_ctx; + } + } + } + + // Clone the current QueryContext and add the scheduled time + // extension, then swap it into the task state for this attempt. + let old_ctx = { + let mut state = self.state.write().unwrap(); + let old = state.query_ctx.clone(); + let mut new_ctx = (*old).clone(); + new_ctx.set_extension( + query::options::FLOW_SCHEDULED_TIME_MILLIS, + (scheduled_time_secs.saturating_mul(1000)).to_string(), + ); + state.query_ctx = Arc::new(new_ctx); + old + }; + let restore_guard = QueryContextRestoreGuard { + state: self.state.clone(), + old_ctx: Some(old_ctx), + }; + + let outcome = self + .execute_once_unlocked(engine, frontend_client, None) + .await; + + // Restore while still holding `execution_lock` so no future manual + // flush can observe the temporary scheduled time. The guard also + // restores during unwind/cancellation. + drop(restore_guard); + + outcome + } + /// Generate the create table SQL /// /// the auto created table will automatically added a `update_at` Milliseconds DEFAULT now() column in the end @@ -1088,7 +1344,7 @@ impl BatchingTask { (None, QueryType::Sql) if self.config.flow_eval_interval.is_none() => { return UnexpectedSnafu { reason: format!( - "Flow id={} reached runtime without a time-window expression or EVAL INTERVAL; create-flow validation should have rejected it", + "Flow id={} reached execution without a time-window expression or EVAL INTERVAL; create-flow validation should have rejected it", self.config.flow_id ), } diff --git a/src/flow/src/batching_mode/task/test.rs b/src/flow/src/batching_mode/task/test.rs index db773ed4f0..6f7daff62e 100644 --- a/src/flow/src/batching_mode/task/test.rs +++ b/src/flow/src/batching_mode/task/test.rs @@ -26,10 +26,13 @@ use common_recordbatch::RecordBatch; use common_recordbatch::adapter::{RecordBatchMetrics, RegionWatermarkEntry}; use datatypes::data_type::ConcreteDataType as CDT; use datatypes::schema::ColumnSchema; -use datatypes::vectors::{TimestampMillisecondVector, UInt32Vector, VectorRef}; +use datatypes::vectors::{ + TimestampMillisecondVector, TimestampNanosecondVector, UInt32Vector, VectorRef, +}; use pretty_assertions::assert_eq; use query::options::{ - FLOW_INCREMENTAL_AFTER_SEQS, FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY, QueryOptions, + FLOW_INCREMENTAL_AFTER_SEQS, FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY, FLOW_SCHEDULED_TIME_MILLIS, + QueryOptions, }; use session::context::QueryContext; use snafu::ResultExt; @@ -115,6 +118,7 @@ async fn new_test_task_engine_and_plan_with_query_and_opts( shutdown_rx: rx, batch_opts, flow_eval_interval: None, + eval_schedule: None, }) .unwrap(); @@ -240,6 +244,7 @@ async fn new_time_window_test_task_with_query(query: &str) -> TestTaskParts { shutdown_rx: rx, batch_opts: incremental_batch_opts(), flow_eval_interval: None, + eval_schedule: None, }) .unwrap(); @@ -369,6 +374,352 @@ async fn assert_unscoped_failure_restore( ); } +// --- scheduled-time QueryContext restore regression tests --- + +/// Register a sink table whose schema matches the output of a `date_bin` +/// time-window-expression query (columns: `number` uint32, `time_window` timestamp). +fn register_twe_sink(query_engine: &QueryEngineRef, table_name: &str, table_id: u32) { + let schema = Arc::new(Schema::new(vec![ + ColumnSchema::new("number", CDT::uint32_datatype(), false), + ColumnSchema::new("time_window", CDT::timestamp_millisecond_datatype(), false) + .with_time_index(true), + ])); + let columns: Vec = vec![ + Arc::new(UInt32Vector::from_slice([1_u32])), + Arc::new(TimestampMillisecondVector::from_slice([0_i64])), + ]; + let recordbatch = RecordBatch::new(schema, columns).unwrap(); + let table = MemTable::table(table_name, recordbatch); + let request = RegisterTableRequest { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + table_name: table_name.to_string(), + table_id, + table, + }; + let catalog_manager = query_engine.engine_state().catalog_manager(); + let memory_catalog = catalog_manager + .as_any() + .downcast_ref::() + .unwrap(); + memory_catalog.register_table_sync(request).unwrap(); +} + +fn register_scheduled_now_sink(query_engine: &QueryEngineRef, table_name: &str, table_id: u32) { + let schema = Arc::new(Schema::new(vec![ + ColumnSchema::new("ts", CDT::timestamp_nanosecond_datatype(), false).with_time_index(true), + ColumnSchema::new("number", CDT::uint32_datatype(), false), + ])); + let columns: Vec = vec![ + Arc::new(TimestampNanosecondVector::from_slice([0_i64])), + Arc::new(UInt32Vector::from_slice([1_u32])), + ]; + let recordbatch = RecordBatch::new(schema, columns).unwrap(); + let table = MemTable::table(table_name, recordbatch); + let request = RegisterTableRequest { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + table_name: table_name.to_string(), + table_id, + table, + }; + let catalog_manager = query_engine.engine_state().catalog_manager(); + let memory_catalog = catalog_manager + .as_any() + .downcast_ref::() + .unwrap(); + memory_catalog.register_table_sync(request).unwrap(); +} + +struct CaptureScheduledNowHandler { + expected_extension: String, + captured_sql: Arc>>, + query_engine: QueryEngineRef, +} + +#[async_trait::async_trait] +impl crate::batching_mode::frontend_client::GrpcQueryHandlerWithBoxedError + for CaptureScheduledNowHandler +{ + async fn do_query( + &self, + query: api::v1::greptime_request::Request, + ctx: QueryContextRef, + ) -> std::result::Result { + assert_eq!( + ctx.extension(FLOW_SCHEDULED_TIME_MILLIS), + Some(self.expected_extension.as_str()) + ); + + let api::v1::greptime_request::Request::Query(api::v1::QueryRequest { + query: Some(api::v1::query_request::Query::Sql(sql)), + .. + }) = query + else { + panic!("expected scheduled SQL flow to send a SQL query, got {query:?}"); + }; + + let planned = sql_to_df_plan(ctx, self.query_engine.clone(), &sql, true) + .await + .unwrap(); + assert_sql_uses_scheduled_time(&planned.to_string()); + + *self.captured_sql.lock().unwrap() = Some(sql); + Ok(Output::new_with_affected_rows(1)) + } +} + +fn assert_sql_uses_scheduled_time(sql: &str) { + let lower = sql.to_ascii_lowercase(); + assert!(!lower.contains("now()"), "SQL still contains now(): {sql}"); + assert!( + sql.contains("2023-11-14") || sql.contains("1700000000"), + "SQL does not contain the scheduled date or epoch value: {sql}" + ); + assert!( + sql.contains("22:13:20") || sql.contains("1700000000"), + "SQL does not contain the scheduled time or epoch value: {sql}" + ); +} + +/// After a scheduled-time attempt fails (missing sink), the +/// `FLOW_SCHEDULED_TIME_MILLIS` extension must not leak into +/// `TaskState.query_ctx` or `frontend_extensions()`. +#[tokio::test] +async fn test_scheduled_time_ctx_restored_on_error() { + let TestTaskParts { + task, query_engine, .. + } = new_test_task_engine_and_plan_with_query( + "SELECT number, ts FROM numbers_with_ts", + "missing_sink", + ) + .await; + let (frontend_client, _handler) = + FrontendClient::from_empty_grpc_handler(QueryOptions::default()); + let frontend_client = Arc::new(frontend_client); + + // Before: no scheduled time extension + assert_eq!( + task.state + .read() + .unwrap() + .query_ctx + .extension(FLOW_SCHEDULED_TIME_MILLIS), + None + ); + assert!( + !task + .frontend_extensions() + .contains_key(FLOW_SCHEDULED_TIME_MILLIS) + ); + + let scheduled_time_secs = 1700000000i64; + let outcome = task + .execute_once_serialized_at_scheduled_time( + &query_engine, + &frontend_client, + scheduled_time_secs, + ) + .await; + + // Missing sink table → gen_insert_plan_unlocked should fail. + assert!( + outcome.result.is_err(), + "Expected an error (missing sink), got {:?}", + outcome.result + ); + + // After: extension must be restored to absent. + assert_eq!( + task.state + .read() + .unwrap() + .query_ctx + .extension(FLOW_SCHEDULED_TIME_MILLIS), + None, + "FLOW_SCHEDULED_TIME_MILLIS leaked into query_ctx after error" + ); + assert!( + !task + .frontend_extensions() + .contains_key(FLOW_SCHEDULED_TIME_MILLIS), + "FLOW_SCHEDULED_TIME_MILLIS leaked into frontend_extensions after error" + ); +} + +/// After a scheduled-time attempt returns `Ok(None)` (no dirty windows), +/// the `FLOW_SCHEDULED_TIME_MILLIS` extension must not leak into +/// `TaskState.query_ctx`. +#[tokio::test] +async fn test_scheduled_time_ctx_restored_on_no_dirty_windows() { + let query_engine = create_test_query_engine(); + let ctx = QueryContext::arc(); + let plan_query = "SELECT number, date_bin(INTERVAL '5 second', ts) AS time_window \ + FROM numbers_with_ts GROUP BY time_window, number"; + let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), plan_query, true) + .await + .unwrap(); + let (column_name, time_window_expr, _, df_schema) = find_time_window_expr( + &plan, + query_engine.engine_state().catalog_manager().clone(), + ctx.clone(), + ) + .await + .unwrap(); + let time_window_expr = time_window_expr + .map(|expr| { + TimeWindowExpr::from_expr( + &expr, + &column_name, + &df_schema, + &query_engine.engine_state().session_state(), + ) + }) + .transpose() + .unwrap(); + + let sink_table_name = "twe_sink_for_ctx_restore"; + register_twe_sink(&query_engine, sink_table_name, 9101); + + let (_tx, rx) = tokio::sync::oneshot::channel(); + let task = BatchingTask::try_new(TaskArgs { + flow_id: 1, + query: plan_query, + plan: plan.clone(), + time_window_expr, + expire_after: None, + sink_table_name: [ + "greptime".to_string(), + "public".to_string(), + sink_table_name.to_string(), + ], + source_table_names: vec![[ + "greptime".to_string(), + "public".to_string(), + "numbers_with_ts".to_string(), + ]], + query_ctx: ctx, + catalog_manager: query_engine.engine_state().catalog_manager().clone(), + shutdown_rx: rx, + batch_opts: incremental_batch_opts(), + flow_eval_interval: None, + eval_schedule: None, + }) + .unwrap(); + + let (frontend_client, _handler) = + FrontendClient::from_empty_grpc_handler(QueryOptions::default()); + let frontend_client = Arc::new(frontend_client); + + // Before: no scheduled time extension + assert_eq!( + task.state + .read() + .unwrap() + .query_ctx + .extension(FLOW_SCHEDULED_TIME_MILLIS), + None + ); + + let scheduled_time_secs = 1700000000i64; + let outcome = task + .execute_once_serialized_at_scheduled_time( + &query_engine, + &frontend_client, + scheduled_time_secs, + ) + .await; + + // No dirty windows → scoped repair returns None → outcome is Ok(None). + assert!( + matches!(outcome.result, Ok(None)), + "Expected Ok(None) (no dirty windows), got {:?}", + outcome.result + ); + + // After: extension must be restored to absent. + assert_eq!( + task.state + .read() + .unwrap() + .query_ctx + .extension(FLOW_SCHEDULED_TIME_MILLIS), + None, + "FLOW_SCHEDULED_TIME_MILLIS leaked into query_ctx after Ok(None)" + ); +} + +#[tokio::test] +async fn test_scheduled_time_now_is_bound_to_selected_attempt() { + let query_engine = create_test_query_engine(); + let ctx = QueryContext::arc(); + let query = "SELECT date_trunc('second', now()) AS ts, number FROM numbers_with_ts"; + let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), query, true) + .await + .unwrap(); + let sink_table_name = "scheduled_now_sink"; + register_scheduled_now_sink(&query_engine, sink_table_name, 9102); + let (_tx, rx) = tokio::sync::oneshot::channel(); + let task = BatchingTask::try_new(TaskArgs { + flow_id: 1, + query, + plan, + time_window_expr: None, + expire_after: None, + sink_table_name: [ + "greptime".to_string(), + "public".to_string(), + sink_table_name.to_string(), + ], + source_table_names: vec![[ + "greptime".to_string(), + "public".to_string(), + "numbers_with_ts".to_string(), + ]], + query_ctx: ctx, + catalog_manager: query_engine.engine_state().catalog_manager().clone(), + shutdown_rx: rx, + batch_opts: incremental_batch_opts(), + flow_eval_interval: Some(Duration::from_secs(1)), + eval_schedule: None, + }) + .unwrap(); + + let scheduled_time_secs = 1_700_000_000_i64; + let expected_extension = (scheduled_time_secs * 1000).to_string(); + let captured_sql = Arc::new(std::sync::Mutex::new(None)); + let handler: Arc = + Arc::new(CaptureScheduledNowHandler { + expected_extension, + captured_sql: captured_sql.clone(), + query_engine: query_engine.clone(), + }); + let frontend_client = Arc::new(FrontendClient::from_grpc_handler( + Arc::downgrade(&handler), + QueryOptions::default(), + )); + + let outcome = task + .execute_once_serialized_at_scheduled_time( + &query_engine, + &frontend_client, + scheduled_time_secs, + ) + .await; + + assert!( + matches!(outcome.result, Ok(Some((1, _)))), + "scheduled attempt should execute once, got {:?}", + outcome.result + ); + let sent_sql = captured_sql + .lock() + .unwrap() + .clone() + .expect("frontend handler should capture generated SQL"); + assert!(!sent_sql.is_empty()); +} + fn output_with_region_watermarks( watermarks: impl IntoIterator)>, ) -> OutputWithMetrics { @@ -1668,6 +2019,7 @@ async fn test_prepare_plan_for_incremental_disables_on_non_aggregate() { shutdown_rx: rx, batch_opts: incremental_batch_opts(), flow_eval_interval: None, + eval_schedule: None, }) .unwrap(); @@ -1745,6 +2097,7 @@ async fn test_unsafe_incremental_plan_skip_restores_dirty_without_query() { shutdown_rx: rx, batch_opts: incremental_batch_opts(), flow_eval_interval: None, + eval_schedule: None, }) .unwrap(); @@ -1833,6 +2186,7 @@ async fn test_prepare_plan_for_incremental_group_by_without_merge_columns_uses_o shutdown_rx: rx, batch_opts: incremental_batch_opts(), flow_eval_interval: None, + eval_schedule: None, }) .unwrap(); @@ -1916,7 +2270,7 @@ async fn test_unscoped_failure_restores_consumed_dirty_signal() { } #[tokio::test] -async fn test_unscoped_runtime_invariant_error_preserves_dirty_signal() { +async fn test_unscoped_execution_invariant_error_preserves_dirty_signal() { let TestTaskParts { task, query_engine, .. } = new_test_task_engine_and_plan_with_query( @@ -1936,7 +2290,7 @@ async fn test_unscoped_runtime_invariant_error_preserves_dirty_signal() { let err = match result { Err(err) => err, - Ok(_) => panic!("runtime should reject SQL without TWE or EVAL INTERVAL"), + Ok(_) => panic!("execution should reject SQL without TWE or EVAL INTERVAL"), }; assert!(matches!(err, Error::Unexpected { .. }), "{err}"); assert!( diff --git a/src/flow/src/batching_mode/utils.rs b/src/flow/src/batching_mode/utils.rs index 5e033c6ae7..3a4c86b30a 100644 --- a/src/flow/src/batching_mode/utils.rs +++ b/src/flow/src/batching_mode/utils.rs @@ -882,10 +882,16 @@ pub async fn sql_to_df_plan( sql: &str, optimize: bool, ) -> Result { - let stmts = - ParserContext::create_with_dialect(sql, query_ctx.sql_dialect(), ParseOptions::default()) - .map_err(BoxedError::new) - .context(ExternalSnafu)?; + let scheduled_time = query::options::parse_scheduled_time_datetime(&query_ctx.extensions()) + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + let stmts = ParserContext::create_with_dialect( + sql, + query_ctx.sql_dialect(), + ParseOptions { scheduled_time }, + ) + .map_err(BoxedError::new) + .context(ExternalSnafu)?; ensure!( stmts.len() == 1, diff --git a/src/flow/src/batching_mode/utils/test.rs b/src/flow/src/batching_mode/utils/test.rs index 9ca1186fb6..7fbd3300c4 100644 --- a/src/flow/src/batching_mode/utils/test.rs +++ b/src/flow/src/batching_mode/utils/test.rs @@ -827,6 +827,7 @@ async fn test_validate_sink_table_schema_rejects_existing_sink_missing_flow_colu shutdown_rx, batch_opts: Arc::new(BatchingModeOptions::default()), flow_eval_interval: None, + eval_schedule: None, }) .unwrap(); diff --git a/src/flow/src/df_optimizer.rs b/src/flow/src/df_optimizer.rs index 614b79ccf1..1cffe464fd 100644 --- a/src/flow/src/df_optimizer.rs +++ b/src/flow/src/df_optimizer.rs @@ -62,7 +62,13 @@ pub async fn apply_df_optimizer( context: "Fail to apply analyzer", })?; - let ctx = OptimizerContext::new(); + let mut ctx = OptimizerContext::new(); + let scheduled_time = query::options::parse_scheduled_time_datetime(&query_ctx.extensions()) + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + if let Some(dt) = scheduled_time { + ctx = ctx.with_query_execution_start_time(dt); + } let optimizer = Optimizer::with_rules(vec![ Arc::new(OptimizeProjections::new()), Arc::new(CommonSubexprEliminate::new()), diff --git a/src/flow/src/engine.rs b/src/flow/src/engine.rs index a360a1015d..386610ad71 100644 --- a/src/flow/src/engine.rs +++ b/src/flow/src/engine.rs @@ -40,6 +40,8 @@ pub struct CreateFlowArgs { pub sql: String, pub flow_options: HashMap, pub query_ctx: Option, + /// Typed schedule configuration for `EVAL INTERVAL` flows. + pub eval_schedule: Option, } pub trait FlowEngine { diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index e7ddd7bfbb..79ffa7247b 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -180,7 +180,18 @@ fn normalize_flow_bool_option(key: &str, value: &str) -> Result { fn validate_and_normalize_flow_options( options: HashMap, + eval_interval: Option, ) -> Result> { + // Reject non-positive eval_interval (zero or negative). + if let Some(secs) = eval_interval + && secs <= 0 + { + return InvalidSqlSnafu { + err_msg: format!("EVAL INTERVAL must be positive, got {secs} seconds"), + } + .fail(); + } + options .into_iter() .map(|(key, value)| { @@ -721,7 +732,20 @@ impl StatementExecutor { mut expr: CreateFlowExpr, query_context: QueryContextRef, ) -> Result { - expr.flow_options = validate_and_normalize_flow_options(expr.flow_options)?; + let eval_interval_secs = expr.eval_interval.as_ref().map(|e| e.seconds); + + // Reject non-positive eval_interval (zero or negative). + if let Some(secs) = eval_interval_secs + && secs <= 0 + { + return InvalidSqlSnafu { + err_msg: format!("EVAL INTERVAL must be positive, got {secs} seconds"), + } + .fail(); + } + + expr.flow_options = + validate_and_normalize_flow_options(expr.flow_options, eval_interval_secs)?; let flow_type = self .determine_flow_type(&expr, query_context.clone()) @@ -2578,7 +2602,7 @@ mod test { #[test] fn test_validate_and_normalize_flow_options_empty() { assert!( - validate_and_normalize_flow_options(HashMap::new()) + validate_and_normalize_flow_options(HashMap::new(), None) .unwrap() .is_empty() ); @@ -2595,7 +2619,7 @@ mod test { ]); assert_eq!( - validate_and_normalize_flow_options(options).unwrap(), + validate_and_normalize_flow_options(options, None).unwrap(), HashMap::from([ (DEFER_ON_MISSING_SOURCE_KEY.to_string(), "true".to_string(),), ( @@ -2608,24 +2632,27 @@ mod test { #[test] fn test_validate_and_normalize_flow_options_unknown_option() { - let err = validate_and_normalize_flow_options(HashMap::from([( - "foo".to_string(), - "bar".to_string(), - )])) + let err = validate_and_normalize_flow_options( + HashMap::from([("foo".to_string(), "bar".to_string())]), + None, + ) .unwrap_err(); assert!( err.to_string() - .contains("unknown flow option 'foo', supported options: defer_on_missing_source, experimental_enable_incremental_read") + .contains("unknown flow option 'foo', supported options:") ); } #[test] fn test_validate_and_normalize_flow_options_reserved_option() { - let err = validate_and_normalize_flow_options(HashMap::from([( - FlowType::FLOW_TYPE_KEY.to_string(), - FlowType::BATCHING.to_string(), - )])) + let err = validate_and_normalize_flow_options( + HashMap::from([( + FlowType::FLOW_TYPE_KEY.to_string(), + FlowType::BATCHING.to_string(), + )]), + None, + ) .unwrap_err(); assert!( @@ -2636,10 +2663,13 @@ mod test { #[test] fn test_validate_and_normalize_flow_options_invalid_bool() { - let err = validate_and_normalize_flow_options(HashMap::from([( - DEFER_ON_MISSING_SOURCE_KEY.to_string(), - "not-a-bool".to_string(), - )])) + let err = validate_and_normalize_flow_options( + HashMap::from([( + DEFER_ON_MISSING_SOURCE_KEY.to_string(), + "not-a-bool".to_string(), + )]), + None, + ) .unwrap_err(); assert!( @@ -2667,11 +2697,53 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;"; }; let expr = expr_helper::to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap(); - let err = validate_and_normalize_flow_options(expr.flow_options).unwrap_err(); + let err = validate_and_normalize_flow_options(expr.flow_options, None).unwrap_err(); - assert!(err.to_string().contains( - "unknown flow option 'access_key_id', supported options: defer_on_missing_source" - )); + assert!( + err.to_string() + .contains("unknown flow option 'access_key_id'") + ); + } + + // --- Schedule option tests --- + + #[test] + fn test_eval_interval_rejected_non_positive() { + // Zero eval_interval should be rejected. + let err = validate_and_normalize_flow_options(HashMap::new(), Some(0)).unwrap_err(); + assert!(err.to_string().contains("EVAL INTERVAL must be positive")); + + // Negative eval_interval should be rejected. + let err = validate_and_normalize_flow_options(HashMap::new(), Some(-5)).unwrap_err(); + assert!(err.to_string().contains("EVAL INTERVAL must be positive")); + + // Positive eval_interval should be accepted. + let result = validate_and_normalize_flow_options(HashMap::new(), Some(300)); + assert!(result.is_ok()); + } + + #[test] + fn test_schedule_and_internal_keys_rejected_as_unknown_options() { + for key in [ + "eval_interval_anchor", + "eval_interval_start", + "eval_interval_missed_tick_policy", + "eval_interval_catchup_max_runs", + "eval_interval_catchup_max_lag", + "__greptime_internal_eval_schedule", + ] { + let err = validate_and_normalize_flow_options( + HashMap::from([(key.to_string(), "value".to_string())]), + Some(300), + ) + .unwrap_err(); + + assert!( + err.to_string() + .contains(&format!("unknown flow option '{key}'")), + "unexpected error for {key}: {err}" + ); + } } #[test] diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index 7be6656c6f..d2c56461af 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -543,6 +543,19 @@ impl QueryEngine for DatafusionQueryEngine { .config_options .insert(config_options); + // Apply scheduled time from query context if present, so that `now()` / + // `current_timestamp()` functions evaluate against the logical scheduled time + // rather than wall-clock. + match crate::options::parse_scheduled_time_datetime(&query_ctx.extensions()) { + Ok(Some(scheduled_rt)) => { + state.execution_props_mut().query_execution_start_time = Some(scheduled_rt); + } + Ok(None) => {} + Err(err) => { + common_telemetry::warn!(err; "Ignoring invalid scheduled time query extension"); + } + } + QueryEngineContext::new(state, query_ctx) } diff --git a/src/query/src/options.rs b/src/query/src/options.rs index 721d79ea8d..d73f83f88b 100644 --- a/src/query/src/options.rs +++ b/src/query/src/options.rs @@ -14,8 +14,10 @@ use std::collections::HashMap; +use chrono::{DateTime, Utc}; use common_base::memory_limit::MemoryLimit; use serde::{Deserialize, Serialize}; +use session::context::QueryContextRef; use store_api::storage::RegionId; use table::metadata::TableId; @@ -25,6 +27,10 @@ pub const FLOW_INCREMENTAL_AFTER_SEQS: &str = "flow.incremental_after_seqs"; pub const FLOW_INCREMENTAL_MODE: &str = "flow.incremental_mode"; pub const FLOW_RETURN_REGION_SEQ: &str = "flow.return_region_seq"; pub const FLOW_SINK_TABLE_ID: &str = "flow.sink_table_id"; +/// Flow scheduler binding for the logical time of one scheduled attempt. +/// Query planning, SQL/TQL parsing, range-select rewrite and DataFusion +/// execution read this extension so `now()` is stable for the whole attempt. +pub const FLOW_SCHEDULED_TIME_MILLIS: &str = "flow.scheduled_time_millis"; /// Enable by default, set to false to explicitly disable. pub const QUERY_ENABLE_REMOTE_DYNAMIC_FILTER_PUSHDOWN: &str = "query.enable_remote_dynamic_filter_pushdown"; @@ -279,6 +285,52 @@ fn parse_bool(option_name: &str, value: &str) -> Result { } } +/// Parse the scheduled time (in milliseconds since Unix epoch) from extensions. +/// +/// Returns `Ok(None)` if the extension key is absent, `Ok(Some(millis))` on success, +/// or `Err` if the value is malformed. +pub fn parse_scheduled_time_millis(extensions: &HashMap) -> Result> { + match extensions.get(FLOW_SCHEDULED_TIME_MILLIS) { + Some(val) => val.parse::().map(Some).map_err(|_| { + invalid_query_context_extension(format!( + "Invalid value for {}: {}", + FLOW_SCHEDULED_TIME_MILLIS, val + )) + }), + None => Ok(None), + } +} + +/// Parse the scheduled time from extensions into a [`DateTime`]. +/// +/// Returns `Ok(None)` if the extension key is absent, `Ok(Some(datetime))` on success, +/// or `Err` if the value is malformed. Millis that produce an out-of-range timestamp are +/// also rejected. +pub fn parse_scheduled_time_datetime( + extensions: &HashMap, +) -> Result>> { + match parse_scheduled_time_millis(extensions)? { + Some(millis) => DateTime::from_timestamp_millis(millis) + .map(Some) + .ok_or_else(|| { + invalid_query_context_extension(format!( + "Out-of-range timestamp for {}: {} ms", + FLOW_SCHEDULED_TIME_MILLIS, millis + )) + }), + None => Ok(None), + } +} + +/// Convenience helper: extract scheduled time from a [`QueryContextRef`] as a [`DateTime`]. +/// +/// Errors are silently swallowed, returning `None`. +/// Use [`parse_scheduled_time_datetime`] when a `Result` is needed. +pub fn scheduled_time_from_ctx(query_ctx: &QueryContextRef) -> Option> { + let extensions = query_ctx.extensions(); + parse_scheduled_time_datetime(&extensions).ok().flatten() +} + fn invalid_query_context_extension(reason: String) -> Error { InvalidQueryContextExtensionSnafu { reason }.build() } @@ -551,4 +603,59 @@ mod flow_extension_tests { Some(HashMap::from([(1, 10)])) ); } + + // --- scheduled time helper tests --- + + #[test] + fn test_parse_scheduled_time_millis_absent() { + let exts = HashMap::new(); + assert_eq!(parse_scheduled_time_millis(&exts).unwrap(), None); + } + + #[test] + fn test_parse_scheduled_time_millis_valid() { + let exts = HashMap::from([( + FLOW_SCHEDULED_TIME_MILLIS.to_string(), + "1700000000000".to_string(), + )]); + assert_eq!( + parse_scheduled_time_millis(&exts).unwrap(), + Some(1700000000000) + ); + } + + #[test] + fn test_parse_scheduled_time_millis_malformed() { + let exts = HashMap::from([( + FLOW_SCHEDULED_TIME_MILLIS.to_string(), + "not-a-number".to_string(), + )]); + let err = parse_scheduled_time_millis(&exts).unwrap_err(); + assert!(format!("{err}").contains(FLOW_SCHEDULED_TIME_MILLIS)); + } + + #[test] + fn test_parse_scheduled_time_datetime_valid() { + let exts = HashMap::from([( + FLOW_SCHEDULED_TIME_MILLIS.to_string(), + "1700000000000".to_string(), + )]); + let dt = parse_scheduled_time_datetime(&exts).unwrap().unwrap(); + assert_eq!(dt.timestamp_millis(), 1700000000000); + } + + #[test] + fn test_parse_scheduled_time_datetime_negative_millis() { + let exts = HashMap::from([(FLOW_SCHEDULED_TIME_MILLIS.to_string(), "-1".to_string())]); + let dt = parse_scheduled_time_datetime(&exts).unwrap().unwrap(); + assert_eq!(dt.timestamp_millis(), -1); + } + + #[test] + fn test_parse_scheduled_time_datetime_out_of_range() { + // i64::MAX millis is well beyond representable chrono::DateTime range + let exts = HashMap::from([(FLOW_SCHEDULED_TIME_MILLIS.to_string(), i64::MAX.to_string())]); + let err = parse_scheduled_time_datetime(&exts).unwrap_err(); + assert!(format!("{err}").contains("Out-of-range")); + } } diff --git a/src/query/src/parser.rs b/src/query/src/parser.rs index fa499dd838..53f24557c7 100644 --- a/src/query/src/parser.rs +++ b/src/query/src/parser.rs @@ -125,12 +125,17 @@ pub struct QueryLanguageParser {} impl QueryLanguageParser { /// Try to parse SQL with GreptimeDB dialect, return the statement when success. - pub fn parse_sql(sql: &str, _query_ctx: &QueryContextRef) -> Result { + pub fn parse_sql(sql: &str, query_ctx: &QueryContextRef) -> Result { let _timer = PARSE_SQL_ELAPSED.start_timer(); - let mut statement = - ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) - .map_err(BoxedError::new) - .context(QueryParseSnafu { query: sql })?; + let scheduled_time = + crate::options::parse_scheduled_time_datetime(&query_ctx.extensions())?; + let mut statement = ParserContext::create_with_dialect( + sql, + &GreptimeDbDialect {}, + ParseOptions { scheduled_time }, + ) + .map_err(BoxedError::new) + .context(QueryParseSnafu { query: sql })?; if statement.len() != 1 { MultipleStatementsSnafu { query: sql.to_string(), @@ -323,7 +328,7 @@ impl Alias { #[cfg(test)] mod test { - use session::context::QueryContext; + use session::context::{QueryContext, QueryContextBuilder}; use super::*; @@ -338,6 +343,32 @@ mod test { assert_eq!("SELECT * FROM t1", sql_stmt.to_string()); } + #[test] + fn parse_sql_tql_uses_scheduled_time_extension() { + let ctx = Arc::new( + QueryContextBuilder::default() + .set_extension( + crate::options::FLOW_SCHEDULED_TIME_MILLIS.to_string(), + "1700000000000".to_string(), + ) + .build(), + ); + let query = "TQL EVAL (now() - '10 minutes'::interval, now(), '1m') http_requests_total"; + let stmt = QueryLanguageParser::parse_sql(query, &ctx).unwrap(); + + match stmt { + QueryStatement::Sql(sql::statements::statement::Statement::Tql( + sql::statements::tql::Tql::Eval(eval), + )) => { + assert_eq!(eval.start, "1699999400"); + assert_eq!(eval.end, "1700000000"); + assert_eq!(eval.step, "1m"); + assert_eq!(eval.query, "http_requests_total"); + } + _ => panic!("Expected TQL eval statement, got {stmt:?}"), + } + } + #[test] fn parse_promql_timestamp() { let cases = vec![ diff --git a/src/query/src/planner.rs b/src/query/src/planner.rs index c4f4af3c6a..44b3a4604d 100644 --- a/src/query/src/planner.rs +++ b/src/query/src/planner.rs @@ -24,6 +24,7 @@ use catalog::table_source::DfTableSourceProvider; use common_error::ext::BoxedError; use common_telemetry::tracing; use datafusion::common::{DFSchema, plan_err}; +use datafusion::execution::SessionStateBuilder; use datafusion::execution::context::SessionState; use datafusion::sql::planner::PlannerContext; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; @@ -85,6 +86,31 @@ impl DfLogicalPlanner { } } + /// Derive a [`SessionState`] whose [`ExecutionProps`] includes + /// `query_execution_start_time` if a scheduled time extension is present + /// in the query context. + fn derive_session_state_with_scheduled_time( + &self, + query_ctx: &QueryContextRef, + ) -> Result { + let extensions = query_ctx.extensions(); + match crate::options::parse_scheduled_time_datetime(&extensions)? { + Some(dt) => { + let execution_props = self + .session_state + .execution_props() + .clone() + .with_query_execution_start_time(dt); + Ok( + SessionStateBuilder::new_from_existing(self.session_state.clone()) + .with_execution_props(execution_props) + .build(), + ) + } + None => Ok(self.session_state.clone()), + } + } + /// Basically the same with `explain_to_plan` in DataFusion, but adapted to Greptime's /// `plan_sql` to support Greptime Statements. async fn explain_to_plan( @@ -178,15 +204,16 @@ impl DfLogicalPlanner { .fail()?; } + let scheduled_state = self.derive_session_state_with_scheduled_time(&query_ctx)?; let table_provider = DfTableSourceProvider::new( self.engine_state.catalog_manager().clone(), self.engine_state.disallow_cross_catalog_query(), query_ctx.clone(), Arc::new(DefaultPlanDecoder::new( - self.session_state.clone(), + scheduled_state.clone(), &query_ctx, )?), - self.session_state + scheduled_state .config_options() .sql_parser .enable_ident_normalization, @@ -194,7 +221,7 @@ impl DfLogicalPlanner { let context_provider = DfContextProviderAdapter::try_new( self.engine_state.clone(), - self.session_state.clone(), + scheduled_state.clone(), Some(&df_stmt), query_ctx.clone(), ) @@ -230,7 +257,7 @@ impl DfLogicalPlanner { .await?; // Optimize logical plan by extension rules - let context = QueryEngineContext::new(self.session_state.clone(), query_ctx); + let context = QueryEngineContext::new(scheduled_state, query_ctx); let plan = self .engine_state .optimize_by_extension_rules(plan, &context)?; @@ -248,9 +275,10 @@ impl DfLogicalPlanner { normalize_ident: bool, query_ctx: QueryContextRef, ) -> Result { + let scheduled_state = self.derive_session_state_with_scheduled_time(&query_ctx)?; let context_provider = DfContextProviderAdapter::try_new( self.engine_state.clone(), - self.session_state.clone(), + scheduled_state, None, query_ctx, ) @@ -271,8 +299,9 @@ impl DfLogicalPlanner { #[tracing::instrument(skip_all)] async fn plan_pql(&self, stmt: &EvalStmt, query_ctx: QueryContextRef) -> Result { + let scheduled_state = self.derive_session_state_with_scheduled_time(&query_ctx)?; let plan_decoder = Arc::new(DefaultPlanDecoder::new( - self.session_state.clone(), + scheduled_state.clone(), &query_ctx, )?); let table_provider = DfTableSourceProvider::new( @@ -280,7 +309,7 @@ impl DfLogicalPlanner { self.engine_state.disallow_cross_catalog_query(), query_ctx.clone(), plan_decoder, - self.session_state + scheduled_state .config_options() .sql_parser .enable_ident_normalization, @@ -290,7 +319,7 @@ impl DfLogicalPlanner { .map_err(BoxedError::new) .context(QueryPlanSnafu)?; - let context = QueryEngineContext::new(self.session_state.clone(), query_ctx); + let context = QueryEngineContext::new(scheduled_state, query_ctx); Ok(self .engine_state .optimize_by_extension_rules(plan, &context)?) diff --git a/src/query/src/range_select/plan_rewrite.rs b/src/query/src/range_select/plan_rewrite.rs index 4d795565f0..44eb7035a4 100644 --- a/src/query/src/range_select/plan_rewrite.rs +++ b/src/query/src/range_select/plan_rewrite.rs @@ -19,6 +19,7 @@ use std::time::Duration; use arrow_schema::DataType; use async_recursion::async_recursion; use catalog::table_source::DfTableSourceProvider; +use chrono::{DateTime, Utc}; use common_time::interval::{MS_PER_DAY, NANOS_PER_MILLI}; use common_time::timestamp::TimeUnit; use common_time::{IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp, Timezone}; @@ -121,7 +122,7 @@ fn parse_duration_expr(args: &[Expr], i: usize) -> DFResult { parse_duration(str).map_err(DataFusionError::Plan) } Some(expr) => { - let ms = evaluate_expr_to_millisecond(args, i, true)?; + let ms = evaluate_expr_to_millisecond(args, i, true, None)?; if ms <= 0 { return Err(dispose_parse_error(Some(expr))); } @@ -138,14 +139,22 @@ fn parse_duration_expr(args: &[Expr], i: usize) -> DFResult { /// Output a millisecond timestamp /// /// if `interval_only==true`, only accept expr with all interval type (case 2 will return a error) -fn evaluate_expr_to_millisecond(args: &[Expr], i: usize, interval_only: bool) -> DFResult { +fn evaluate_expr_to_millisecond( + args: &[Expr], + i: usize, + interval_only: bool, + scheduled_time: Option>, +) -> DFResult { let Some(expr) = args.get(i) else { return Err(dispose_parse_error(None)); }; if interval_only && !interval_only_in_expr(expr) { return Err(dispose_parse_error(Some(expr))); } - let info = SimplifyContext::default().with_current_time(); + let info = match scheduled_time { + Some(dt) => SimplifyContext::default().with_query_execution_start_time(Some(dt)), + None => SimplifyContext::default().with_current_time(), + }; let simplify_expr = ExprSimplifier::new(info).simplify(expr.clone())?; match simplify_expr { Expr::Literal(ScalarValue::TimestampNanosecond(ts_nanos, _), _) @@ -207,13 +216,22 @@ fn evaluate_expr_to_millisecond(args: &[Expr], i: usize, interval_only: bool) -> /// 2. Timestamp string: align to specific timestamp /// 3. An expr can be evaluated at the logical plan stage (e.g. `now() - INTERVAL '1' day`) /// 4. leave empty (as Default Option): align to unix epoch 0 (timezone aware) -fn parse_align_to(args: &[Expr], i: usize, timezone: Option<&Timezone>) -> DFResult { +fn parse_align_to( + args: &[Expr], + i: usize, + timezone: Option<&Timezone>, + scheduled_time: Option>, +) -> DFResult { let Ok(s) = parse_str_expr(args, i) else { - return evaluate_expr_to_millisecond(args, i, false); + return evaluate_expr_to_millisecond(args, i, false, scheduled_time); }; let upper = s.to_uppercase(); match upper.as_str() { - "NOW" => return Ok(Timestamp::current_millis().value()), + "NOW" => { + return Ok(scheduled_time + .map(|dt| dt.timestamp_millis()) + .unwrap_or_else(|| Timestamp::current_millis().value())); + } // default align to unix epoch 0 (timezone aware) "" => return Ok(timezone.map(|tz| tz.local_minus_utc() * 1000).unwrap_or(0)), _ => (), @@ -285,7 +303,15 @@ impl TreeNodeRewriter for RangeExprRewriter<'_> { .map_err(|e| DataFusionError::Plan(e.to_string()))?; let by = parse_expr_list(&func.args, 4, byc)?; let align = parse_duration_expr(&func.args, byc + 4)?; - let align_to = parse_align_to(&func.args, byc + 5, Some(&self.query_ctx.timezone()))?; + let scheduled_time = + crate::options::parse_scheduled_time_datetime(&self.query_ctx.extensions()) + .map_err(|err| DataFusionError::Plan(err.to_string()))?; + let align_to = parse_align_to( + &func.args, + byc + 5, + Some(&self.query_ctx.timezone()), + scheduled_time, + )?; let mut data_type = range_expr.get_type(self.input_plan.schema())?; let mut need_cast = false; let fill = Fill::try_from_str(parse_str_expr(&func.args, 2)?, &data_type)?; @@ -665,7 +691,7 @@ mod test { use datafusion_expr::{BinaryExpr, Literal, Operator}; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, Schema}; - use session::context::QueryContext; + use session::context::{QueryContext, QueryContextBuilder}; use table::metadata::{TableInfoBuilder, TableMetaBuilder}; use table::table::TableRef; use table::test_util::EmptyTable; @@ -771,6 +797,12 @@ mod test { engine.planner().plan(&stmt, QueryContext::arc()).await } + async fn do_query_with_ctx(sql: &str, query_ctx: QueryContextRef) -> Result { + let stmt = QueryLanguageParser::parse_sql(sql, &query_ctx).unwrap(); + let engine = create_test_engine().await; + engine.planner().plan(&stmt, query_ctx).await + } + async fn do_union_query(sql: &str) -> Result { let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap(); let engine = create_union_test_engine().await; @@ -782,6 +814,26 @@ mod test { assert_eq!(plan.display_indent_schema().to_string(), expected); } + #[tokio::test] + async fn range_align_to_now_uses_scheduled_time_extension() { + let query_ctx = Arc::new( + QueryContextBuilder::default() + .set_extension( + crate::options::FLOW_SCHEDULED_TIME_MILLIS.to_string(), + "1700000000123".to_string(), + ) + .build(), + ); + let query = r#"SELECT timestamp, tag_0, tag_1, avg(field_0) RANGE '5m' FROM test ALIGN '1h' TO NOW by (tag_0,tag_1);"#; + let plan = do_query_with_ctx(query, query_ctx).await.unwrap(); + + assert!( + plan.display_indent_schema() + .to_string() + .contains("align_to=1700000000123ms") + ); + } + #[tokio::test] async fn range_no_project() { let query = r#"SELECT timestamp, tag_0, tag_1, avg(field_0 + field_1) RANGE '5m' FROM test ALIGN '1h' by (tag_0,tag_1);"#; @@ -1077,37 +1129,54 @@ mod test { fn test_parse_align_to() { // test NOW let args = vec!["NOW".lit()]; - let epsinon = parse_align_to(&args, 0, None).unwrap() - Timestamp::current_millis().value(); + let epsinon = + parse_align_to(&args, 0, None, None).unwrap() - Timestamp::current_millis().value(); assert!(epsinon.abs() < 100); + let scheduled_time = DateTime::from_timestamp(1_700_000_000, 123_000_000).unwrap(); + assert_eq!( + scheduled_time.timestamp_millis(), + parse_align_to(&args, 0, None, Some(scheduled_time)).unwrap() + ); // test default let args = vec!["".lit()]; - assert_eq!(0, parse_align_to(&args, 0, None).unwrap()); + assert_eq!(0, parse_align_to(&args, 0, None, None).unwrap()); // test default with timezone let args = vec!["".lit()]; assert_eq!( -36000 * 1000, - parse_align_to(&args, 0, Some(&Timezone::from_tz_string("HST").unwrap())).unwrap() + parse_align_to( + &args, + 0, + Some(&Timezone::from_tz_string("HST").unwrap()), + None + ) + .unwrap() ); assert_eq!( 28800 * 1000, parse_align_to( &args, 0, - Some(&Timezone::from_tz_string("Asia/Shanghai").unwrap()) + Some(&Timezone::from_tz_string("Asia/Shanghai").unwrap()), + None ) .unwrap() ); // test Timestamp let args = vec!["1970-01-01T00:00:00+08:00".lit()]; - assert_eq!(parse_align_to(&args, 0, None).unwrap(), -8 * 60 * 60 * 1000); + assert_eq!( + parse_align_to(&args, 0, None, None).unwrap(), + -8 * 60 * 60 * 1000 + ); // timezone let args = vec!["1970-01-01T00:00:00".lit()]; assert_eq!( parse_align_to( &args, 0, - Some(&Timezone::from_tz_string("Asia/Shanghai").unwrap()) + Some(&Timezone::from_tz_string("Asia/Shanghai").unwrap()), + None ) .unwrap(), -8 * 60 * 60 * 1000 @@ -1122,7 +1191,7 @@ mod test { ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(0, 10).into())).lit(), ), })]; - assert_eq!(parse_align_to(&args, 0, None).unwrap(), 20); + assert_eq!(parse_align_to(&args, 0, None, None).unwrap(), 20); } #[test] diff --git a/src/servers/src/grpc/flight.rs b/src/servers/src/grpc/flight.rs index 4544323652..97a579d952 100644 --- a/src/servers/src/grpc/flight.rs +++ b/src/servers/src/grpc/flight.rs @@ -627,6 +627,7 @@ fn to_flight_data_stream( #[cfg(test)] mod tests { + use query::options::FLOW_SCHEDULED_TIME_MILLIS; use tonic::metadata::{AsciiMetadataValue, MetadataMap}; use super::*; @@ -655,6 +656,25 @@ mod tests { ); } + #[test] + fn test_flow_extensions_can_carry_scheduled_time() { + let mut metadata = MetadataMap::new(); + metadata.insert( + FLOW_EXTENSIONS_METADATA_KEY, + AsciiMetadataValue::try_from(r#"[["flow.scheduled_time_millis","1700000000000"]]"#) + .unwrap(), + ); + + let flow_extensions = extract_flow_extensions(&metadata).unwrap(); + let query_ctx = + create_query_context(Channel::Grpc, None, flow_extensions, HashMap::new()).unwrap(); + + assert_eq!( + query_ctx.extension(FLOW_SCHEDULED_TIME_MILLIS), + Some("1700000000000") + ); + } + #[test] fn test_extract_flow_extensions_rejects_invalid_json() { let mut metadata = MetadataMap::new(); diff --git a/src/servers/src/grpc/greptime_handler.rs b/src/servers/src/grpc/greptime_handler.rs index 6ad982dc6a..fd8dfc365d 100644 --- a/src/servers/src/grpc/greptime_handler.rs +++ b/src/servers/src/grpc/greptime_handler.rs @@ -299,6 +299,7 @@ impl Drop for RequestTimer { mod tests { use chrono::FixedOffset; use common_time::Timezone; + use query::options::FLOW_SCHEDULED_TIME_MILLIS; use session::hints::{ INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY, REMOTE_QUERY_ID_EXTENSION_KEY, }; @@ -326,6 +327,10 @@ mod tests { INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY.to_string(), "spoofed-regs".to_string(), ), + ( + FLOW_SCHEDULED_TIME_MILLIS.to_string(), + "1700000000000".to_string(), + ), ], HashMap::from([(7, 88)]), ) @@ -341,23 +346,17 @@ mod tests { query_context.read_preference(), ReadPreference::Leader )); - let mut extensions = query_context.extensions().into_iter().collect::>(); - extensions.sort_unstable_by(|a, b| a.0.cmp(&b.0)); - assert_eq!( - extensions[0], - ("auto_create_table".to_string(), "true".to_string()) - ); - assert_eq!(extensions[1].0, REMOTE_QUERY_ID_EXTENSION_KEY.to_string()); - assert_eq!( - query_context.remote_query_id(), - Some(extensions[1].1.as_str()) - ); + assert_eq!(query_context.extension("auto_create_table"), Some("true")); assert_ne!(query_context.remote_query_id(), Some("spoofed")); assert!( query_context .extension(INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY) .is_none() ); + assert_eq!( + query_context.extension(FLOW_SCHEDULED_TIME_MILLIS), + Some("1700000000000") + ); } #[test] diff --git a/src/servers/src/http/hints.rs b/src/servers/src/http/hints.rs index 32598d7bf7..09e4879dd5 100644 --- a/src/servers/src/http/hints.rs +++ b/src/servers/src/http/hints.rs @@ -46,6 +46,7 @@ fn apply_hints(query_ctx: &mut QueryContext, hints: Vec<(String, String)>) { #[cfg(test)] mod tests { use common_query::request::INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY as COMMON_INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY; + use query::options::FLOW_SCHEDULED_TIME_MILLIS; use session::context::{QueryContextBuilder, generate_remote_query_id}; use session::hints::{ INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY, REMOTE_QUERY_ID_EXTENSION_KEY, @@ -54,7 +55,7 @@ mod tests { use super::apply_hints; #[test] - fn test_apply_hints_ignores_reserved_extension_keys() { + fn test_apply_hints_ignores_reserved_extension_keys_only() { let original_query_id = generate_remote_query_id(); let mut query_ctx = QueryContextBuilder::default() .set_extension( @@ -74,6 +75,10 @@ mod tests { INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY.to_string(), "spoofed-regs".to_string(), ), + ( + FLOW_SCHEDULED_TIME_MILLIS.to_string(), + "1700000000000".to_string(), + ), ("ttl".to_string(), "7d".to_string()), ], ); @@ -87,6 +92,10 @@ mod tests { .extension(INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY) .is_none() ); + assert_eq!( + query_ctx.extension(FLOW_SCHEDULED_TIME_MILLIS), + Some("1700000000000") + ); assert_eq!(query_ctx.extension("ttl"), Some("7d")); } diff --git a/src/sql/src/parser.rs b/src/sql/src/parser.rs index 50cd62360d..3a2d654f96 100644 --- a/src/sql/src/parser.rs +++ b/src/sql/src/parser.rs @@ -14,6 +14,7 @@ use std::str::FromStr; +use chrono::{DateTime, Utc}; use snafu::{OptionExt, ResultExt}; use sqlparser::ast::{Ident, Query, Value}; use sqlparser::dialect::Dialect; @@ -32,12 +33,18 @@ pub const FLOW: &str = "FLOW"; /// SQL Parser options. #[derive(Clone, Debug, Default)] -pub struct ParseOptions {} +pub struct ParseOptions { + /// If set, TQL parameter expressions containing `now()` will be evaluated + /// against this scheduled time instead of wall-clock time. + pub scheduled_time: Option>, +} /// GrepTime SQL parser context, a simple wrapper for Datafusion SQL parser. pub struct ParserContext<'a> { pub(crate) parser: Parser<'a>, pub(crate) sql: &'a str, + /// Optional scheduled time for `now()` evaluation in TQL parameters. + pub(crate) scheduled_time: Option>, } impl ParserContext<'_> { @@ -48,7 +55,11 @@ impl ParserContext<'_> { .try_with_sql(sql) .context(SyntaxSnafu)?; - Ok(ParserContext { parser, sql }) + Ok(ParserContext { + parser, + sql, + scheduled_time: None, + }) } /// Parses parser context to Query. @@ -60,11 +71,12 @@ impl ParserContext<'_> { pub fn create_with_dialect( sql: &str, dialect: &dyn Dialect, - _opts: ParseOptions, + opts: ParseOptions, ) -> Result> { let mut stmts: Vec = Vec::new(); let mut parser_ctx = ParserContext::new(dialect, sql)?; + parser_ctx.scheduled_time = opts.scheduled_time; let mut expecting_statement_delimiter = false; loop { @@ -95,7 +107,12 @@ impl ParserContext<'_> { .with_options(ParserOptions::new().with_trailing_commas(true)) .try_with_sql(sql) .context(SyntaxSnafu)?; - ParserContext { parser, sql }.intern_parse_table_name() + ParserContext { + parser, + sql, + scheduled_time: None, + } + .intern_parse_table_name() } pub(crate) fn intern_parse_table_name(&mut self) -> Result { diff --git a/src/sql/src/parsers/tql_parser.rs b/src/sql/src/parsers/tql_parser.rs index a918bbcbe0..25743d94b9 100644 --- a/src/sql/src/parsers/tql_parser.rs +++ b/src/sql/src/parsers/tql_parser.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use chrono::{DateTime, Utc}; use datafusion_common::ScalarValue; use snafu::{OptionExt, ResultExt}; use sqlparser::ast::AnalyzeFormat; @@ -172,6 +173,7 @@ impl ParserContext<'_> { let raw_query = tql_sql.trim_end_matches(';'); let mut parser_ctx = ParserContext::new(&crate::dialect::GreptimeDbDialect {}, tql_sql)?; + parser_ctx.scheduled_time = self.scheduled_time; let statement = parser_ctx.parse_tql(require_now_expr)?; match statement { @@ -195,6 +197,7 @@ impl ParserContext<'_> { &mut self, require_now_expr: bool, ) -> std::result::Result { + let scheduled_time = self.scheduled_time; let parser = &mut self.parser; let (start, end, step, lookback) = match parser.peek_token().token { Token::LParen => { @@ -217,16 +220,22 @@ impl ParserContext<'_> { let start = Self::parse_expr_to_literal_or_ts( exprs_iter.next().unwrap(), require_now_expr, + scheduled_time, )?; let end = Self::parse_expr_to_literal_or_ts( exprs_iter.next().unwrap(), require_now_expr, + scheduled_time, + )?; + let step = Self::parse_expr_to_literal_or_ts( + exprs_iter.next().unwrap(), + false, + scheduled_time, )?; - let step = Self::parse_expr_to_literal_or_ts(exprs_iter.next().unwrap(), false)?; let lookback = exprs_iter .next() - .map(|expr| Self::parse_expr_to_literal_or_ts(expr, false)) + .map(|expr| Self::parse_expr_to_literal_or_ts(expr, false, scheduled_time)) .transpose()?; if !parser.consume_token(&Token::RParen) { @@ -289,6 +298,7 @@ impl ParserContext<'_> { fn parse_expr_to_literal_or_ts( parser_expr: sqlparser::ast::Expr, require_now_expr: bool, + scheduled_time: Option>, ) -> std::result::Result { match parser_expr { sqlparser::ast::Expr::Value(v) => match v.value { @@ -313,7 +323,7 @@ impl ParserContext<'_> { } } }, - _ => Self::parse_expr_to_ts(parser_expr, require_now_expr), + _ => Self::parse_expr_to_ts(parser_expr, require_now_expr, scheduled_time), } } @@ -321,10 +331,15 @@ impl ParserContext<'_> { fn parse_expr_to_ts( parser_expr: sqlparser::ast::Expr, require_now_expr: bool, + scheduled_time: Option>, ) -> std::result::Result { - let lit = utils::parser_expr_to_scalar_value_literal(parser_expr, require_now_expr) - .map_err(Box::new) - .context(ConvertToLogicalExpressionSnafu)?; + let lit = utils::parser_expr_to_scalar_value_literal_at( + parser_expr, + require_now_expr, + scheduled_time, + ) + .map_err(Box::new) + .context(ConvertToLogicalExpressionSnafu)?; let second = match lit { ScalarValue::TimestampNanosecond(ts_nanos, _) @@ -416,6 +431,35 @@ mod tests { result.remove(0) } + fn parse_into_statement_at(sql: &str, scheduled_time: DateTime) -> Statement { + let mut result = ParserContext::create_with_dialect( + sql, + &GreptimeDbDialect {}, + ParseOptions { + scheduled_time: Some(scheduled_time), + }, + ) + .unwrap(); + assert_eq!(1, result.len()); + result.remove(0) + } + + #[test] + fn test_parse_tql_eval_with_scheduled_time() { + let scheduled_time = DateTime::from_timestamp(1_700_000_000, 0).unwrap(); + let sql = "TQL EVAL (now() - '10 minutes'::interval, now(), '1m') http_requests_total"; + + match parse_into_statement_at(sql, scheduled_time) { + Statement::Tql(Tql::Eval(eval)) => { + assert_eq!(eval.start, "1699999400"); + assert_eq!(eval.end, "1700000000"); + assert_eq!(eval.step, "1m"); + assert_eq!(eval.query, "http_requests_total"); + } + _ => unreachable!(), + } + } + #[test] fn test_require_now_expr() { let sql = "TQL EVAL (now() - now(), now() - (now() - '10 seconds'::interval), '1s') http_requests_total{environment=~'staging|testing|development',method!='GET'} @ 1609746000 offset 5m"; diff --git a/src/sql/src/parsers/utils.rs b/src/sql/src/parsers/utils.rs index e25a1d9522..71a6a2499d 100644 --- a/src/sql/src/parsers/utils.rs +++ b/src/sql/src/parsers/utils.rs @@ -15,6 +15,7 @@ use std::collections::HashMap; use std::sync::Arc; +use chrono::{DateTime, Utc}; use datafusion::config::ConfigOptions; use datafusion::error::Result as DfResult; use datafusion::execution::SessionStateBuilder; @@ -201,6 +202,18 @@ fn is_plain_wildcard_projection(projection: &[sqlparser::ast::SelectItem]) -> bo pub fn parser_expr_to_scalar_value_literal( expr: sqlparser::ast::Expr, require_now_expr: bool, +) -> Result { + parser_expr_to_scalar_value_literal_at(expr, require_now_expr, None) +} + +/// Same as [`parser_expr_to_scalar_value_literal`] but uses the provided +/// `scheduled_time` for evaluating `now()`. If `scheduled_time` is +/// `Some`, `now()` will be simplified to the given timestamp instead of +/// the current wall-clock time. +pub fn parser_expr_to_scalar_value_literal_at( + expr: sqlparser::ast::Expr, + require_now_expr: bool, + scheduled_time: Option>, ) -> Result { // 1. convert parser expr to logical expr let empty_df_schema = DFSchema::empty(); @@ -250,8 +263,11 @@ pub fn parser_expr_to_scalar_value_literal( } } - // 2. simplify logical expr - let info = SimplifyContext::default().with_current_time(); + // 2. simplify logical expr — use scheduled time if provided, else wall-clock + let info = match scheduled_time { + Some(dt) => SimplifyContext::default().with_query_execution_start_time(Some(dt)), + None => SimplifyContext::default().with_current_time(), + }; let simplifier = ExprSimplifier::new(info); // Coerce the logical expression so simplifier can handle it correctly. This is necessary for const eval with possible type mismatch. i.e.: `now() - now() + '15s'::interval` which is `TimestampNanosecond - TimestampNanosecond + IntervalMonthDayNano`. diff --git a/src/sql/src/parsers/with_tql_parser.rs b/src/sql/src/parsers/with_tql_parser.rs index 382b2a9137..08bda1e101 100644 --- a/src/sql/src/parsers/with_tql_parser.rs +++ b/src/sql/src/parsers/with_tql_parser.rs @@ -275,6 +275,7 @@ impl ParserContext<'_> { let tql_string = tql_string.trim(); let mut parser_ctx = ParserContext::new(&GreptimeDbDialect {}, tql_string)?; + parser_ctx.scheduled_time = self.scheduled_time; let statement = parser_ctx.parse_tql(require_now_expr)?; match statement { diff --git a/tests/cases/distributed/flow-tql/flow_eval_interval_schedule.result b/tests/cases/distributed/flow-tql/flow_eval_interval_schedule.result new file mode 100644 index 0000000000..7c72b23b7f --- /dev/null +++ b/tests/cases/distributed/flow-tql/flow_eval_interval_schedule.result @@ -0,0 +1,52 @@ +CREATE TABLE eval_interval_schedule_input ( + ts TIMESTAMP(3) TIME INDEX, + series STRING, + v DOUBLE, + PRIMARY KEY(series) +); + +Affected Rows: 0 + +CREATE FLOW eval_interval_schedule_flow +SINK TO eval_interval_schedule_sink +EVAL INTERVAL '1s' +AS +SELECT + date_trunc('second', now()) AS ts, + count(v) AS value_count +FROM eval_interval_schedule_input +GROUP BY date_trunc('second', now()); + +Affected Rows: 0 + +INSERT INTO eval_interval_schedule_input VALUES + ('2026-06-25 00:00:00', 'a', 1.0); + +Affected Rows: 1 + +-- SQLNESS SLEEP 5s +SELECT + count(DISTINCT ts) >= 2 AS has_multiple_scheduled_ticks, + min(value_count) AS min_value_count, + max(value_count) AS max_value_count +FROM eval_interval_schedule_sink +WHERE value_count > 0; + ++------------------------------+-----------------+-----------------+ +| has_multiple_scheduled_ticks | min_value_count | max_value_count | ++------------------------------+-----------------+-----------------+ +| true | 1 | 1 | ++------------------------------+-----------------+-----------------+ + +DROP FLOW eval_interval_schedule_flow; + +Affected Rows: 0 + +DROP TABLE eval_interval_schedule_sink; + +Affected Rows: 0 + +DROP TABLE eval_interval_schedule_input; + +Affected Rows: 0 + diff --git a/tests/cases/distributed/flow-tql/flow_eval_interval_schedule.sql b/tests/cases/distributed/flow-tql/flow_eval_interval_schedule.sql new file mode 100644 index 0000000000..1d3e476ce5 --- /dev/null +++ b/tests/cases/distributed/flow-tql/flow_eval_interval_schedule.sql @@ -0,0 +1,31 @@ +CREATE TABLE eval_interval_schedule_input ( + ts TIMESTAMP(3) TIME INDEX, + series STRING, + v DOUBLE, + PRIMARY KEY(series) +); + +CREATE FLOW eval_interval_schedule_flow +SINK TO eval_interval_schedule_sink +EVAL INTERVAL '1s' +AS +SELECT + date_trunc('second', now()) AS ts, + count(v) AS value_count +FROM eval_interval_schedule_input +GROUP BY date_trunc('second', now()); + +INSERT INTO eval_interval_schedule_input VALUES + ('2026-06-25 00:00:00', 'a', 1.0); + +-- SQLNESS SLEEP 5s +SELECT + count(DISTINCT ts) >= 2 AS has_multiple_scheduled_ticks, + min(value_count) AS min_value_count, + max(value_count) AS max_value_count +FROM eval_interval_schedule_sink +WHERE value_count > 0; + +DROP FLOW eval_interval_schedule_flow; +DROP TABLE eval_interval_schedule_sink; +DROP TABLE eval_interval_schedule_input; diff --git a/tests/compatibility/cases/flow_eval_interval/case.toml b/tests/compatibility/cases/flow_eval_interval/case.toml new file mode 100644 index 0000000000..6dbb6cb207 --- /dev/null +++ b/tests/compatibility/cases/flow_eval_interval/case.toml @@ -0,0 +1,8 @@ +name = "flow_eval_interval" +reason = "Verify EVAL INTERVAL SQL and TQL flow metadata created before upgrade can be read and executed after upgrade." +introduced_by = "PR #8360" +topologies = ["distributed"] +from_range = ["<=v1.1.1"] +to_range = [">v1.1.1"] +features = ["flow", "tql", "table"] +owner = "flow" diff --git a/tests/compatibility/cases/flow_eval_interval/setup.sql b/tests/compatibility/cases/flow_eval_interval/setup.sql new file mode 100644 index 0000000000..eb48cd718d --- /dev/null +++ b/tests/compatibility/cases/flow_eval_interval/setup.sql @@ -0,0 +1,30 @@ +CREATE TABLE compat_sql_input ( + ts TIMESTAMP(3) TIME INDEX, + series STRING, + v DOUBLE, + PRIMARY KEY(series) +); + +CREATE FLOW compat_sql_eval_flow +SINK TO compat_sql_eval_sink +EVAL INTERVAL '1s' +AS +SELECT + date_trunc('second', now()) AS ts, + count(v) AS value_count +FROM compat_sql_input +GROUP BY date_trunc('second', now()); + +CREATE TABLE compat_tql_input ( + ts TIMESTAMP(3) TIME INDEX, + host STRING, + idc STRING, + val DOUBLE, + PRIMARY KEY(host, idc) +); + +CREATE FLOW compat_tql_eval_flow +SINK TO compat_tql_eval_sink +EVAL INTERVAL '1s' +AS +TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", compat_tql_input); diff --git a/tests/compatibility/cases/flow_eval_interval/verify.result b/tests/compatibility/cases/flow_eval_interval/verify.result new file mode 100644 index 0000000000..9073b9a13d --- /dev/null +++ b/tests/compatibility/cases/flow_eval_interval/verify.result @@ -0,0 +1,119 @@ +SELECT flow_name, source_table_names +FROM information_schema.flows +WHERE flow_name IN ('compat_sql_eval_flow', 'compat_tql_eval_flow') +ORDER BY flow_name; + ++----------------------+----------------------------------------------+ +| flow_name | source_table_names | ++----------------------+----------------------------------------------+ +| compat_sql_eval_flow | greptime.flow_eval_interval.compat_sql_input | +| compat_tql_eval_flow | greptime.flow_eval_interval.compat_tql_input | ++----------------------+----------------------------------------------+ + +SHOW CREATE FLOW compat_sql_eval_flow; + ++----------------------+---------------------------------------------------------------------------------------------------------------------------------+ +| Flow | Create Flow | ++----------------------+---------------------------------------------------------------------------------------------------------------------------------+ +| compat_sql_eval_flow | CREATE FLOW IF NOT EXISTS compat_sql_eval_flow | +| | SINK TO flow_eval_interval.compat_sql_eval_sink | +| | EVAL INTERVAL '1 s' | +| | AS SELECT date_trunc('second', now()) AS ts, count(v) AS value_count FROM compat_sql_input GROUP BY date_trunc('second', now()) | ++----------------------+---------------------------------------------------------------------------------------------------------------------------------+ + +SHOW CREATE FLOW compat_tql_eval_flow; + ++----------------------+-------------------------------------------------------------------------------------------------+ +| Flow | Create Flow | ++----------------------+-------------------------------------------------------------------------------------------------+ +| compat_tql_eval_flow | CREATE FLOW IF NOT EXISTS compat_tql_eval_flow | +| | SINK TO flow_eval_interval.compat_tql_eval_sink | +| | EVAL INTERVAL '1 s' | +| | AS TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", compat_tql_input) | ++----------------------+-------------------------------------------------------------------------------------------------+ + +SHOW CREATE TABLE compat_sql_eval_sink; + ++----------------------+-----------------------------------------------------+ +| Table | Create Table | ++----------------------+-----------------------------------------------------+ +| compat_sql_eval_sink | CREATE TABLE IF NOT EXISTS "compat_sql_eval_sink" ( | +| | "ts" TIMESTAMP(9) NOT NULL, | +| | "value_count" BIGINT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("ts") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | 'comment' = 'Auto created table by flow engine' | +| | ) | ++----------------------+-----------------------------------------------------+ + +SHOW CREATE TABLE compat_tql_eval_sink; + ++----------------------+-----------------------------------------------------+ +| Table | Create Table | ++----------------------+-----------------------------------------------------+ +| compat_tql_eval_sink | CREATE TABLE IF NOT EXISTS "compat_tql_eval_sink" ( | +| | "count(compat_tql_input.val)" DOUBLE NULL, | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "status_code" STRING NULL, | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("status_code") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | 'comment' = 'Auto created table by flow engine' | +| | ) | ++----------------------+-----------------------------------------------------+ + +INSERT INTO compat_sql_input VALUES + ('2026-06-25 00:00:00', 'a', 1.0), + ('2026-06-25 00:00:01', 'b', 2.0); + +Affected Rows: 2 + +INSERT INTO compat_tql_input VALUES + (now() - '17s'::interval, 'host1', 'idc1', 200), + (now() - '13s'::interval, 'host2', 'idc1', 401), + (now() - '7s'::interval, 'host3', 'idc2', 500); + +Affected Rows: 3 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('compat_sql_eval_flow'); + ++------------------------------------------+ +| ADMIN FLUSH_FLOW('compat_sql_eval_flow') | ++------------------------------------------+ +| FLOW_FLUSHED | ++------------------------------------------+ + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('compat_tql_eval_flow'); + ++------------------------------------------+ +| ADMIN FLUSH_FLOW('compat_tql_eval_flow') | ++------------------------------------------+ +| FLOW_FLUSHED | ++------------------------------------------+ + +SELECT count(*) > 0 AS sql_flow_executed +FROM compat_sql_eval_sink; + ++-------------------+ +| sql_flow_executed | ++-------------------+ +| true | ++-------------------+ + +SELECT count(*) > 0 AS tql_flow_executed +FROM compat_tql_eval_sink; + ++-------------------+ +| tql_flow_executed | ++-------------------+ +| true | ++-------------------+ diff --git a/tests/compatibility/cases/flow_eval_interval/verify.sql b/tests/compatibility/cases/flow_eval_interval/verify.sql new file mode 100644 index 0000000000..63b83bf1f3 --- /dev/null +++ b/tests/compatibility/cases/flow_eval_interval/verify.sql @@ -0,0 +1,33 @@ +SELECT flow_name, source_table_names +FROM information_schema.flows +WHERE flow_name IN ('compat_sql_eval_flow', 'compat_tql_eval_flow') +ORDER BY flow_name; + +SHOW CREATE FLOW compat_sql_eval_flow; + +SHOW CREATE FLOW compat_tql_eval_flow; + +SHOW CREATE TABLE compat_sql_eval_sink; + +SHOW CREATE TABLE compat_tql_eval_sink; + +INSERT INTO compat_sql_input VALUES + ('2026-06-25 00:00:00', 'a', 1.0), + ('2026-06-25 00:00:01', 'b', 2.0); + +INSERT INTO compat_tql_input VALUES + (now() - '17s'::interval, 'host1', 'idc1', 200), + (now() - '13s'::interval, 'host2', 'idc1', 401), + (now() - '7s'::interval, 'host3', 'idc2', 500); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('compat_sql_eval_flow'); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('compat_tql_eval_flow'); + +SELECT count(*) > 0 AS sql_flow_executed +FROM compat_sql_eval_sink; + +SELECT count(*) > 0 AS tql_flow_executed +FROM compat_tql_eval_sink;