feat: flow type on creating procedure (#5572)

feat: flow type on creating
This commit is contained in:
jeremyhi
2025-02-20 16:12:02 +08:00
committed by GitHub
parent f6f617d667
commit a0ff9e751e

View File

@@ -15,6 +15,7 @@
mod metadata;
use std::collections::BTreeMap;
use std::fmt;
use api::v1::flow::flow_request::Body as PbFlowRequest;
use api::v1::flow::{CreateRequest, FlowRequest, FlowRequestHeader};
@@ -77,6 +78,7 @@ impl CreateFlowProcedure {
query_context,
state: CreateFlowState::Prepare,
prev_flow_info_value: None,
flow_type: None,
},
}
}
@@ -104,7 +106,7 @@ impl CreateFlowProcedure {
if create_if_not_exists && or_replace {
// this is forbidden because not clear what does that mean exactly
return error::UnsupportedSnafu {
operation: "Create flow with both `IF NOT EXISTS` and `OR REPLACE`".to_string(),
operation: "Create flow with both `IF NOT EXISTS` and `OR REPLACE`",
}
.fail();
}
@@ -175,6 +177,8 @@ impl CreateFlowProcedure {
self.allocate_flow_id().await?;
}
self.data.state = CreateFlowState::CreateFlows;
// determine flow type
self.data.flow_type = Some(determine_flow_type(&self.data.task));
Ok(Status::executing(true))
}
@@ -309,6 +313,11 @@ impl Procedure for CreateFlowProcedure {
}
}
pub fn determine_flow_type(_flow_task: &CreateFlowTask) -> FlowType {
// TODO(discord9): determine flow type
FlowType::RecordingRule
}
/// The state of [CreateFlowProcedure].
#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)]
pub enum CreateFlowState {
@@ -322,6 +331,35 @@ pub enum CreateFlowState {
CreateMetadata,
}
/// The type of flow.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum FlowType {
/// The flow is a recording rule task.
RecordingRule,
/// The flow is a streaming task.
Streaming,
}
impl FlowType {
pub const RECORDING_RULE: &str = "recording_rule";
pub const STREAMING: &str = "streaming";
}
impl Default for FlowType {
fn default() -> Self {
Self::RecordingRule
}
}
impl fmt::Display for FlowType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
FlowType::RecordingRule => write!(f, "{}", FlowType::RECORDING_RULE),
FlowType::Streaming => write!(f, "{}", FlowType::STREAMING),
}
}
}
/// The serializable data.
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateFlowData {
@@ -335,6 +373,7 @@ pub struct CreateFlowData {
/// For verify if prev value is consistent when need to update flow metadata.
/// only set when `or_replace` is true.
pub(crate) prev_flow_info_value: Option<DeserializedValueWithBytes<FlowInfoValue>>,
pub(crate) flow_type: Option<FlowType>,
}
impl From<&CreateFlowData> for CreateRequest {
@@ -342,7 +381,7 @@ impl From<&CreateFlowData> for CreateRequest {
let flow_id = value.flow_id.unwrap();
let source_table_ids = &value.source_table_ids;
CreateRequest {
let mut req = CreateRequest {
flow_id: Some(api::v1::FlowId { id: flow_id }),
source_table_ids: source_table_ids
.iter()
@@ -356,7 +395,11 @@ impl From<&CreateFlowData> for CreateRequest {
comment: value.task.comment.clone(),
sql: value.task.sql.clone(),
flow_options: value.task.flow_options.clone(),
}
};
let flow_type = value.flow_type.unwrap_or_default().to_string();
req.flow_options.insert("flow_type".to_string(), flow_type);
req
}
}
@@ -369,7 +412,7 @@ impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteVa
expire_after,
comment,
sql,
flow_options: options,
flow_options: mut options,
..
} = value.task.clone();
@@ -386,19 +429,21 @@ impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteVa
.map(|(idx, peer)| (idx as u32, FlowRouteValue { peer: peer.clone() }))
.collect::<Vec<_>>();
(
FlowInfoValue {
source_table_ids: value.source_table_ids.clone(),
sink_table_name,
flownode_ids,
catalog_name,
flow_name,
raw_sql: sql,
expire_after,
comment,
options,
},
flow_routes,
)
let flow_type = value.flow_type.unwrap_or_default().to_string();
options.insert("flow_type".to_string(), flow_type);
let flow_info = FlowInfoValue {
source_table_ids: value.source_table_ids.clone(),
sink_table_name,
flownode_ids,
catalog_name,
flow_name,
raw_sql: sql,
expire_after,
comment,
options,
};
(flow_info, flow_routes)
}
}