perf: not update metadata if no change

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-03-17 16:11:30 +08:00
parent 54f28f7a78
commit 431fbc3371
2 changed files with 25 additions and 0 deletions

View File

@@ -316,6 +316,14 @@ async fn update_pending_flow_metadata(
unresolved_source_table_names: Vec<TableName>,
last_activation_error: Option<String>,
) -> Result<()> {
let current_flow_info_value = current_flow_info.get_inner_ref();
if current_flow_info_value.source_table_ids == resolved_table_ids
&& current_flow_info_value.unresolved_source_table_names == unresolved_source_table_names
&& current_flow_info_value.last_activation_error == last_activation_error
{
return Ok(());
}
let mut new_flow_info = current_flow_info.get_inner_ref().clone();
new_flow_info.source_table_ids = resolved_table_ids;
new_flow_info.unresolved_source_table_names = unresolved_source_table_names;

View File

@@ -188,6 +188,23 @@ async fn test_activate_pending_flow_require_streaming_keeps_pending() {
.unwrap()
.contains("requires streaming activation")
);
let first_updated_time = *pending_flow.updated_time();
let mut procedure = ActivatePendingFlowProcedure::new(
flow_id,
DEFAULT_CATALOG_NAME.to_string(),
ddl_context.clone(),
);
assert!(execute_procedure_until_done(&mut procedure).await.is_none());
let pending_flow = ddl_context
.flow_metadata_manager
.flow_info_manager()
.get(flow_id)
.await
.unwrap()
.unwrap();
assert_eq!(pending_flow.updated_time(), &first_updated_time);
}
#[tokio::test]