From a0ff9e751ee6ee7ed5b2227394d8c0b630bb3e11 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Thu, 20 Feb 2025 16:12:02 +0800 Subject: [PATCH] feat: flow type on creating procedure (#5572) feat: flow type on creating --- src/common/meta/src/ddl/create_flow.rs | 81 ++++++++++++++++++++------ 1 file changed, 63 insertions(+), 18 deletions(-) diff --git a/src/common/meta/src/ddl/create_flow.rs b/src/common/meta/src/ddl/create_flow.rs index 177bdf6b71..ba0582f71b 100644 --- a/src/common/meta/src/ddl/create_flow.rs +++ b/src/common/meta/src/ddl/create_flow.rs @@ -15,6 +15,7 @@ mod metadata; use std::collections::BTreeMap; +use std::fmt; use api::v1::flow::flow_request::Body as PbFlowRequest; use api::v1::flow::{CreateRequest, FlowRequest, FlowRequestHeader}; @@ -77,6 +78,7 @@ impl CreateFlowProcedure { query_context, state: CreateFlowState::Prepare, prev_flow_info_value: None, + flow_type: None, }, } } @@ -104,7 +106,7 @@ impl CreateFlowProcedure { if create_if_not_exists && or_replace { // this is forbidden because not clear what does that mean exactly return error::UnsupportedSnafu { - operation: "Create flow with both `IF NOT EXISTS` and `OR REPLACE`".to_string(), + operation: "Create flow with both `IF NOT EXISTS` and `OR REPLACE`", } .fail(); } @@ -175,6 +177,8 @@ impl CreateFlowProcedure { self.allocate_flow_id().await?; } self.data.state = CreateFlowState::CreateFlows; + // determine flow type + self.data.flow_type = Some(determine_flow_type(&self.data.task)); Ok(Status::executing(true)) } @@ -309,6 +313,11 @@ impl Procedure for CreateFlowProcedure { } } +pub fn determine_flow_type(_flow_task: &CreateFlowTask) -> FlowType { + // TODO(discord9): determine flow type + FlowType::RecordingRule +} + /// The state of [CreateFlowProcedure]. #[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)] pub enum CreateFlowState { @@ -322,6 +331,35 @@ pub enum CreateFlowState { CreateMetadata, } +/// The type of flow. +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub enum FlowType { + /// The flow is a recording rule task. + RecordingRule, + /// The flow is a streaming task. + Streaming, +} + +impl FlowType { + pub const RECORDING_RULE: &str = "recording_rule"; + pub const STREAMING: &str = "streaming"; +} + +impl Default for FlowType { + fn default() -> Self { + Self::RecordingRule + } +} + +impl fmt::Display for FlowType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + FlowType::RecordingRule => write!(f, "{}", FlowType::RECORDING_RULE), + FlowType::Streaming => write!(f, "{}", FlowType::STREAMING), + } + } +} + /// The serializable data. #[derive(Debug, Serialize, Deserialize)] pub struct CreateFlowData { @@ -335,6 +373,7 @@ pub struct CreateFlowData { /// For verify if prev value is consistent when need to update flow metadata. /// only set when `or_replace` is true. pub(crate) prev_flow_info_value: Option>, + pub(crate) flow_type: Option, } impl From<&CreateFlowData> for CreateRequest { @@ -342,7 +381,7 @@ impl From<&CreateFlowData> for CreateRequest { let flow_id = value.flow_id.unwrap(); let source_table_ids = &value.source_table_ids; - CreateRequest { + let mut req = CreateRequest { flow_id: Some(api::v1::FlowId { id: flow_id }), source_table_ids: source_table_ids .iter() @@ -356,7 +395,11 @@ impl From<&CreateFlowData> for CreateRequest { comment: value.task.comment.clone(), sql: value.task.sql.clone(), flow_options: value.task.flow_options.clone(), - } + }; + + let flow_type = value.flow_type.unwrap_or_default().to_string(); + req.flow_options.insert("flow_type".to_string(), flow_type); + req } } @@ -369,7 +412,7 @@ impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteVa expire_after, comment, sql, - flow_options: options, + flow_options: mut options, .. } = value.task.clone(); @@ -386,19 +429,21 @@ impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteVa .map(|(idx, peer)| (idx as u32, FlowRouteValue { peer: peer.clone() })) .collect::>(); - ( - FlowInfoValue { - source_table_ids: value.source_table_ids.clone(), - sink_table_name, - flownode_ids, - catalog_name, - flow_name, - raw_sql: sql, - expire_after, - comment, - options, - }, - flow_routes, - ) + let flow_type = value.flow_type.unwrap_or_default().to_string(); + options.insert("flow_type".to_string(), flow_type); + + let flow_info = FlowInfoValue { + source_table_ids: value.source_table_ids.clone(), + sink_table_name, + flownode_ids, + catalog_name, + flow_name, + raw_sql: sql, + expire_after, + comment, + options, + }; + + (flow_info, flow_routes) } }