diff --git a/src/common/meta/src/ddl/activate_flow.rs b/src/common/meta/src/ddl/activate_flow.rs index 120f8474bf..25498bdfbd 100644 --- a/src/common/meta/src/ddl/activate_flow.rs +++ b/src/common/meta/src/ddl/activate_flow.rs @@ -316,6 +316,14 @@ async fn update_pending_flow_metadata( unresolved_source_table_names: Vec, last_activation_error: Option, ) -> Result<()> { + let current_flow_info_value = current_flow_info.get_inner_ref(); + if current_flow_info_value.source_table_ids == resolved_table_ids + && current_flow_info_value.unresolved_source_table_names == unresolved_source_table_names + && current_flow_info_value.last_activation_error == last_activation_error + { + return Ok(()); + } + let mut new_flow_info = current_flow_info.get_inner_ref().clone(); new_flow_info.source_table_ids = resolved_table_ids; new_flow_info.unresolved_source_table_names = unresolved_source_table_names; diff --git a/src/common/meta/src/ddl/tests/activate_flow.rs b/src/common/meta/src/ddl/tests/activate_flow.rs index fa9a4848b5..4b30936ede 100644 --- a/src/common/meta/src/ddl/tests/activate_flow.rs +++ b/src/common/meta/src/ddl/tests/activate_flow.rs @@ -188,6 +188,23 @@ async fn test_activate_pending_flow_require_streaming_keeps_pending() { .unwrap() .contains("requires streaming activation") ); + let first_updated_time = *pending_flow.updated_time(); + + let mut procedure = ActivatePendingFlowProcedure::new( + flow_id, + DEFAULT_CATALOG_NAME.to_string(), + ddl_context.clone(), + ); + assert!(execute_procedure_until_done(&mut procedure).await.is_none()); + + let pending_flow = ddl_context + .flow_metadata_manager + .flow_info_manager() + .get(flow_id) + .await + .unwrap() + .unwrap(); + assert_eq!(pending_flow.updated_time(), &first_updated_time); } #[tokio::test]