1mod metadata;
16
17use std::collections::{BTreeMap, HashMap};
18use std::fmt;
19
20use api::v1::ExpireAfter;
21use api::v1::flow::flow_request::Body as PbFlowRequest;
22use api::v1::flow::{CreateRequest, FlowRequest, FlowRequestHeader};
23use async_trait::async_trait;
24use common_catalog::format_full_flow_name;
25use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
26use common_procedure::{
27 Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
28};
29use common_telemetry::info;
30use common_telemetry::tracing_context::TracingContext;
31use futures::future::join_all;
32use itertools::Itertools;
33use serde::{Deserialize, Serialize};
34use snafu::{ResultExt, ensure};
35use strum::AsRefStr;
36use table::metadata::TableId;
37use table::table_name::TableName;
38
39use crate::cache_invalidator::Context;
40use crate::ddl::DdlContext;
41use crate::ddl::utils::{add_peer_context_if_needed, map_to_procedure_error};
42use crate::error::{self, Result, UnexpectedSnafu};
43use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
44use crate::key::flow::flow_info::{FlowInfoValue, FlowStatus};
45use crate::key::flow::flow_route::FlowRouteValue;
46use crate::key::table_name::TableNameKey;
47use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId};
48use crate::lock_key::{CatalogLock, FlowNameLock, TableNameLock};
49use crate::metrics;
50use crate::peer::Peer;
51use crate::rpc::ddl::{CreateFlowTask, FlowQueryContext, QueryContext};
52
/// Procedure that creates a flow, or replaces an existing one.
///
/// Pairs the shared [`DdlContext`] with the serializable [`CreateFlowData`]
/// that tracks how far the procedure has progressed.
pub struct CreateFlowProcedure {
    /// Shared DDL services used by the steps: the flow and table metadata
    /// managers, the node manager and the cache invalidator.
    pub context: DdlContext,
    /// Serializable procedure state; this is what `dump` writes out as JSON
    /// and what `from_json` reads back.
    pub data: CreateFlowData,
}
58
59impl CreateFlowProcedure {
    /// Procedure type name reported by `Procedure::type_name`.
    pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateFlow";
61
62 pub fn new(task: CreateFlowTask, query_context: QueryContext, context: DdlContext) -> Self {
64 Self {
65 context,
66 data: CreateFlowData {
67 task,
68 flow_id: None,
69 peers: vec![],
70 source_table_ids: vec![],
71 unresolved_source_table_names: vec![],
72 flow_context: query_context.into(), state: CreateFlowState::Prepare,
74 prev_flow_info_value: None,
75 did_replace: false,
76 flow_type: None,
77 },
78 }
79 }
80
81 pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
83 let data = serde_json::from_str(json).context(FromJsonSnafu)?;
84 Ok(CreateFlowProcedure { context, data })
85 }
86
87 pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
88 let catalog_name = &self.data.task.catalog_name;
89 let flow_name = &self.data.task.flow_name;
90 let sink_table_name = &self.data.task.sink_table_name;
91 let create_if_not_exists = self.data.task.create_if_not_exists;
92 let or_replace = self.data.task.or_replace;
93
94 validate_flow_options(&self.data.task)?;
95
96 let flow_name_value = self
97 .context
98 .flow_metadata_manager
99 .flow_name_manager()
100 .get(catalog_name, flow_name)
101 .await?;
102
103 if create_if_not_exists && or_replace {
104 return error::UnsupportedSnafu {
106 operation: "Create flow with both `IF NOT EXISTS` and `OR REPLACE`",
107 }
108 .fail();
109 }
110
111 if let Some(value) = flow_name_value {
112 ensure!(
113 create_if_not_exists || or_replace,
114 error::FlowAlreadyExistsSnafu {
115 flow_name: format_full_flow_name(catalog_name, flow_name),
116 }
117 );
118
119 let flow_id = value.flow_id();
120 if create_if_not_exists {
121 info!("Flow already exists, flow_id: {}", flow_id);
122 return Ok(Status::done_with_output(flow_id));
123 }
124
125 let flow_id = value.flow_id();
126 let peers = self
127 .context
128 .flow_metadata_manager
129 .flow_route_manager()
130 .routes(flow_id)
131 .await?
132 .into_iter()
133 .map(|(_, value)| value.peer)
134 .collect::<Vec<_>>();
135 self.data.flow_id = Some(flow_id);
136 self.data.peers = peers;
137 info!("Replacing flow, flow_id: {}", flow_id);
138
139 let flow_info_value = self
140 .context
141 .flow_metadata_manager
142 .flow_info_manager()
143 .get_raw(flow_id)
144 .await?;
145
146 ensure!(
147 flow_info_value.is_some(),
148 error::FlowNotFoundSnafu {
149 flow_name: format_full_flow_name(catalog_name, flow_name),
150 }
151 );
152
153 self.data.prev_flow_info_value = flow_info_value;
154 }
155
156 let exists = self
158 .context
159 .table_metadata_manager
160 .table_name_manager()
161 .exists(TableNameKey::new(
162 &sink_table_name.catalog_name,
163 &sink_table_name.schema_name,
164 &sink_table_name.table_name,
165 ))
166 .await?;
167 if exists {
170 common_telemetry::warn!("Table already exists, table: {}", sink_table_name);
171 }
172
173 self.collect_source_tables().await?;
174 ensure!(
175 self.data.unresolved_source_table_names.is_empty()
176 || defer_on_missing_source(&self.data.task)?,
177 error::UnsupportedSnafu {
178 operation: format!(
179 "Create flow with missing source tables requires WITH ('{DEFER_ON_MISSING_SOURCE_KEY}'='true'): {}",
180 self.data
181 .unresolved_source_table_names
182 .iter()
183 .map(ToString::to_string)
184 .join(", ")
185 )
186 }
187 );
188 self.ensure_supported_replace_transition()?;
189
190 let sink_table_name = &self.data.task.sink_table_name;
192 if self
193 .data
194 .task
195 .source_table_names
196 .iter()
197 .any(|source| source == sink_table_name)
198 {
199 return error::UnsupportedSnafu {
200 operation: format!(
201 "Creating flow with source and sink table being the same: {}",
202 sink_table_name
203 ),
204 }
205 .fail();
206 }
207
208 if self.data.flow_id.is_none() {
209 self.allocate_flow_id().await?;
210 }
211 self.data.flow_type = Some(get_flow_type_from_options(&self.data.task)?);
212
213 self.data.state = if self.data.is_pending() {
214 self.data.peers.clear();
215 CreateFlowState::CreateMetadata
216 } else {
217 CreateFlowState::CreateFlows
218 };
219
220 Ok(Status::executing(true))
221 }
222
223 fn ensure_supported_replace_transition(&self) -> Result<()> {
224 if !self.data.task.or_replace {
225 return Ok(());
226 }
227
228 let Some(prev_flow_info) = self.data.prev_flow_info_value.as_ref() else {
229 return Ok(());
230 };
231 let prev_pending = prev_flow_info.get_inner_ref().is_pending();
232 let new_pending = self.data.is_pending();
233 ensure!(
234 prev_pending == new_pending,
235 error::UnsupportedSnafu {
236 operation: "Replacing between pending and active flow states is not supported yet"
237 }
238 );
239
240 Ok(())
241 }
242
243 async fn on_flownode_create_flows(&mut self) -> Result<Status> {
244 let mut create_flow = Vec::with_capacity(self.data.peers.len());
246 for peer in &self.data.peers {
247 let requester = self.context.node_manager.flownode(peer).await;
248 let request = FlowRequest {
249 header: Some(FlowRequestHeader {
250 tracing_context: TracingContext::from_current_span().to_w3c(),
251 query_context: Some(QueryContext::from(self.data.flow_context.clone()).into()),
253 }),
254 body: Some(PbFlowRequest::Create((&self.data).into())),
255 };
256 create_flow.push(async move {
257 requester
258 .handle(request)
259 .await
260 .map_err(add_peer_context_if_needed(peer.clone()))
261 });
262 }
263 info!(
264 "Creating flow({:?}, type={:?}) on flownodes with peers={:?}",
265 self.data.flow_id, self.data.flow_type, self.data.peers
266 );
267 join_all(create_flow)
268 .await
269 .into_iter()
270 .collect::<Result<Vec<_>>>()?;
271
272 self.data.state = CreateFlowState::CreateMetadata;
273 Ok(Status::executing(true))
274 }
275
276 async fn on_create_metadata(&mut self) -> Result<Status> {
281 let flow_id = self.data.flow_id.unwrap();
283 let (flow_info, flow_routes) = (&self.data).into();
284 if let Some(prev_flow_value) = self.data.prev_flow_info_value.as_ref()
285 && self.data.task.or_replace
286 {
287 self.context
288 .flow_metadata_manager
289 .update_flow_metadata(flow_id, prev_flow_value, &flow_info, flow_routes)
290 .await?;
291 info!("Replaced flow metadata for flow {flow_id}");
292 self.data.did_replace = true;
293 } else {
294 self.context
295 .flow_metadata_manager
296 .create_flow_metadata(flow_id, flow_info, flow_routes)
297 .await?;
298 info!("Created flow metadata for flow {flow_id}");
299 }
300
301 self.data.state = CreateFlowState::InvalidateFlowCache;
302 Ok(Status::executing(true))
303 }
304
305 async fn on_broadcast(&mut self) -> Result<Status> {
306 debug_assert!(self.data.state == CreateFlowState::InvalidateFlowCache);
307 let flow_id = self.data.flow_id.unwrap();
309 let did_replace = self.data.did_replace;
310 let ctx = Context {
311 subject: Some("Invalidate flow cache by creating flow".to_string()),
312 };
313
314 let mut caches = vec![];
315
316 if did_replace {
318 let old_flow_info = self.data.prev_flow_info_value.as_ref().unwrap();
319
320 caches.extend([CacheIdent::DropFlow(DropFlow {
322 flow_id,
323 source_table_ids: old_flow_info.source_table_ids.clone(),
324 flow_part2node_id: old_flow_info.flownode_ids().clone().into_iter().collect(),
325 })]);
326 }
327
328 let (_flow_info, flow_routes) = (&self.data).into();
329 let flow_part2peers = flow_routes
330 .into_iter()
331 .map(|(part_id, route)| (part_id, route.peer))
332 .collect();
333
334 caches.extend([
335 CacheIdent::CreateFlow(CreateFlow {
336 flow_id,
337 source_table_ids: self.data.source_table_ids.clone(),
338 partition_to_peer_mapping: flow_part2peers,
339 }),
340 CacheIdent::FlowId(flow_id),
341 ]);
342
343 self.context
344 .cache_invalidator
345 .invalidate(&ctx, &caches)
346 .await?;
347
348 Ok(Status::done_with_output(flow_id))
349 }
350}
351
352#[async_trait]
353impl Procedure for CreateFlowProcedure {
354 fn type_name(&self) -> &str {
355 Self::TYPE_NAME
356 }
357
358 async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
359 let state = &self.data.state;
360
361 let _timer = metrics::METRIC_META_PROCEDURE_CREATE_FLOW
362 .with_label_values(&[state.as_ref()])
363 .start_timer();
364
365 match state {
366 CreateFlowState::Prepare => self.on_prepare().await,
367 CreateFlowState::CreateFlows => self.on_flownode_create_flows().await,
368 CreateFlowState::CreateMetadata => self.on_create_metadata().await,
369 CreateFlowState::InvalidateFlowCache => self.on_broadcast().await,
370 }
371 .map_err(map_to_procedure_error)
372 }
373
374 fn dump(&self) -> ProcedureResult<String> {
375 serde_json::to_string(&self.data).context(ToJsonSnafu)
376 }
377
378 fn lock_key(&self) -> LockKey {
379 let catalog_name = &self.data.task.catalog_name;
380 let flow_name = &self.data.task.flow_name;
381 let sink_table_name = &self.data.task.sink_table_name;
382
383 LockKey::new(vec![
384 CatalogLock::Read(catalog_name).into(),
385 TableNameLock::new(
386 &sink_table_name.catalog_name,
387 &sink_table_name.schema_name,
388 &sink_table_name.catalog_name,
389 )
390 .into(),
391 FlowNameLock::new(catalog_name, flow_name).into(),
392 ])
393 }
394}
395
396pub fn get_flow_type_from_options(flow_task: &CreateFlowTask) -> Result<FlowType> {
397 let flow_type = flow_task
398 .flow_options
399 .get(FlowType::FLOW_TYPE_KEY)
400 .map(|s| s.as_str());
401 match flow_type {
402 Some(FlowType::BATCHING) => Ok(FlowType::Batching),
403 Some(FlowType::STREAMING) => Ok(FlowType::Streaming),
404 Some(unknown) => UnexpectedSnafu {
405 err_msg: format!("Unknown flow type: {}", unknown),
406 }
407 .fail(),
408 None => Ok(FlowType::Batching),
409 }
410}
411
/// Flow option key that, when set to `true`, allows a flow to be created
/// while some of its source tables cannot be resolved yet.
pub const DEFER_ON_MISSING_SOURCE_KEY: &str = "defer_on_missing_source";
414
415pub fn defer_on_missing_source(flow_task: &CreateFlowTask) -> Result<bool> {
416 flow_task
417 .flow_options
418 .get(DEFER_ON_MISSING_SOURCE_KEY)
419 .map(|value| {
420 value
421 .trim()
422 .to_ascii_lowercase()
423 .parse::<bool>()
424 .map_err(|_| {
425 error::UnexpectedSnafu {
426 err_msg: format!(
427 "Invalid flow option '{DEFER_ON_MISSING_SOURCE_KEY}': {value}"
428 ),
429 }
430 .build()
431 })
432 })
433 .transpose()
434 .map(|value| value.unwrap_or(false))
435}
436
437pub fn validate_flow_options(flow_task: &CreateFlowTask) -> Result<()> {
438 for key in flow_task.flow_options.keys() {
439 match key.as_str() {
440 DEFER_ON_MISSING_SOURCE_KEY
441 | FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY
442 | FlowType::FLOW_TYPE_KEY => {}
443 unknown => {
444 return UnexpectedSnafu {
445 err_msg: format!(
446 "Unknown flow option '{unknown}', supported user options: {DEFER_ON_MISSING_SOURCE_KEY}, {FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY}"
447 ),
448 }
449 .fail();
450 }
451 }
452 }
453
454 defer_on_missing_source(flow_task)?;
455 get_flow_type_from_options(flow_task)?;
456 Ok(())
457}
458
459fn user_runtime_flow_options(options: &HashMap<String, String>) -> HashMap<String, String> {
460 let mut options = options.clone();
461 options.remove(DEFER_ON_MISSING_SOURCE_KEY);
462 options
463}
464
/// Returns an unmodified copy of `options`; this is the option set stored
/// in the flow metadata.
fn metadata_flow_options(options: &HashMap<String, String>) -> HashMap<String, String> {
    options
        .iter()
        .map(|(key, value)| (key.clone(), value.clone()))
        .collect()
}
468
/// The steps of [`CreateFlowProcedure`].
///
/// The variant name is also used as the metric label in `execute`.
/// Declaration order is not execution order: the procedure runs
/// `Prepare`, then `CreateFlows` (skipped for pending flows), then
/// `CreateMetadata`, then `InvalidateFlowCache`.
#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)]
pub enum CreateFlowState {
    /// Validates the request and gathers ids, peers and source tables.
    Prepare,
    /// Sends create-flow requests to the flownode peers.
    CreateFlows,
    /// Broadcasts cache invalidation and finishes the procedure.
    InvalidateFlowCache,
    /// Creates or updates the flow metadata.
    CreateMetadata,
}
481
/// The kind of a flow, selected through the `flow_type` flow option.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
pub enum FlowType {
    /// Batching flow; the default when no flow type is specified.
    #[default]
    Batching,
    /// Streaming flow.
    Streaming,
}
491
/// Flow option key accepted by `validate_flow_options`.
///
/// NOTE(review): its value is not interpreted in this section; presumably it
/// toggles an experimental incremental-read mode on the flownode — confirm
/// against the consumer of this option.
pub const FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY: &str =
    "experimental_enable_incremental_read";
494
impl FlowType {
    /// Option value (and display form) of [`FlowType::Batching`].
    pub const BATCHING: &str = "batching";
    /// Option value (and display form) of [`FlowType::Streaming`].
    pub const STREAMING: &str = "streaming";
    /// Flow option key under which the flow type is stored.
    pub const FLOW_TYPE_KEY: &str = "flow_type";
}
500
501impl fmt::Display for FlowType {
502 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
503 match self {
504 FlowType::Batching => write!(f, "{}", FlowType::BATCHING),
505 FlowType::Streaming => write!(f, "{}", FlowType::STREAMING),
506 }
507 }
508}
509
/// The serializable state of [`CreateFlowProcedure`].
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateFlowData {
    /// The step the procedure will execute next.
    pub(crate) state: CreateFlowState,
    /// The create-flow request being processed.
    pub(crate) task: CreateFlowTask,
    /// Id of the flow; `None` until the prepare step has found or allocated it.
    pub(crate) flow_id: Option<FlowId>,
    /// Flownode peers for the flow; cleared by the prepare step for a pending flow.
    pub(crate) peers: Vec<Peer>,
    /// Ids of the source tables that were resolved.
    pub(crate) source_table_ids: Vec<TableId>,
    /// Source tables that could not be resolved; non-empty means the flow is
    /// pending. Defaults to empty when missing from the serialized data.
    #[serde(default)]
    pub(crate) unresolved_source_table_names: Vec<TableName>,
    /// Query context attached to flownode requests and stored in the flow
    /// info. Also deserialized from the key `query_context`.
    #[serde(alias = "query_context")]
    pub(crate) flow_context: FlowQueryContext,
    /// Flow info of the existing flow, recorded on the `OR REPLACE` path.
    pub(crate) prev_flow_info_value: Option<DeserializedValueWithBytes<FlowInfoValue>>,
    /// Set once the metadata step has replaced an existing flow. Defaults to
    /// `false` when missing from the serialized data.
    #[serde(default)]
    pub(crate) did_replace: bool,
    /// Flow type resolved from the task options by the prepare step.
    pub(crate) flow_type: Option<FlowType>,
}
532
533impl CreateFlowData {
534 pub(crate) fn is_pending(&self) -> bool {
535 !self.unresolved_source_table_names.is_empty()
536 }
537
538 pub(crate) fn is_active(&self) -> bool {
539 !self.is_pending()
540 }
541}
542
543impl From<&CreateFlowData> for CreateRequest {
544 fn from(value: &CreateFlowData) -> Self {
545 let flow_id = value.flow_id.unwrap();
546 let source_table_ids = &value.source_table_ids;
547
548 let mut req = CreateRequest {
549 flow_id: Some(api::v1::FlowId { id: flow_id }),
550 source_table_ids: source_table_ids
551 .iter()
552 .map(|table_id| api::v1::TableId { id: *table_id })
553 .collect_vec(),
554 sink_table_name: Some(value.task.sink_table_name.clone().into()),
555 create_if_not_exists: true,
557 or_replace: value.task.or_replace,
558 expire_after: value.task.expire_after.map(|value| ExpireAfter { value }),
559 eval_interval: value
560 .task
561 .eval_interval_secs
562 .map(|seconds| api::v1::EvalInterval { seconds }),
563 comment: value.task.comment.clone(),
564 sql: value.task.sql.clone(),
565 flow_options: user_runtime_flow_options(&value.task.flow_options),
566 };
567
568 let flow_type = value.flow_type.unwrap_or_default().to_string();
569 req.flow_options
570 .insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type);
571 req
572 }
573}
574
575impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteValue)>) {
576 fn from(value: &CreateFlowData) -> Self {
577 let CreateFlowTask {
578 catalog_name,
579 flow_name,
580 sink_table_name,
581 expire_after,
582 eval_interval_secs: eval_interval,
583 comment,
584 sql,
585 ..
586 } = value.task.clone();
587 let mut options = metadata_flow_options(&value.task.flow_options);
588
589 let flownode_ids = value
590 .peers
591 .iter()
592 .enumerate()
593 .map(|(idx, peer)| (idx as u32, peer.id))
594 .collect::<BTreeMap<_, _>>();
595 let flow_routes = value
596 .peers
597 .iter()
598 .enumerate()
599 .map(|(idx, peer)| (idx as u32, FlowRouteValue { peer: peer.clone() }))
600 .collect::<Vec<_>>();
601
602 let flow_type = value.flow_type.unwrap_or_default().to_string();
603 options.insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type);
604
605 let mut create_time = chrono::Utc::now();
606 if let Some(prev_flow_value) = value.prev_flow_info_value.as_ref()
607 && value.task.or_replace
608 {
609 create_time = prev_flow_value.get_inner_ref().created_time;
610 }
611
612 let flow_info: FlowInfoValue = FlowInfoValue {
613 source_table_ids: value.source_table_ids.clone(),
614 all_source_table_names: value.task.source_table_names.clone(),
615 unresolved_source_table_names: value.unresolved_source_table_names.clone(),
616 sink_table_name,
617 flownode_ids,
618 catalog_name,
619 query_context: Some(QueryContext::from(value.flow_context.clone())),
621 flow_name,
622 raw_sql: sql,
623 expire_after,
624 eval_interval_secs: eval_interval,
625 comment,
626 options,
627 status: if value.is_active() {
628 FlowStatus::Active
629 } else {
630 FlowStatus::PendingSources
631 },
632 created_time: create_time,
633 updated_time: chrono::Utc::now(),
634 };
635
636 (flow_info, flow_routes)
637 }
638}