diff --git a/src/common/meta/src/ddl/create_flow.rs b/src/common/meta/src/ddl/create_flow.rs index 8a419176c9..a8779b67cb 100644 --- a/src/common/meta/src/ddl/create_flow.rs +++ b/src/common/meta/src/ddl/create_flow.rs @@ -45,7 +45,7 @@ use crate::key::flow::flow_info::{FlowInfoValue, FlowStatus}; use crate::key::flow::flow_route::FlowRouteValue; use crate::key::table_name::TableNameKey; use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId}; -use crate::lock_key::{CatalogLock, FlowNameLock, TableNameLock}; +use crate::lock_key::{CatalogLock, FlowNameLock}; use crate::metrics; use crate::peer::Peer; use crate::rpc::ddl::{CreateFlowTask, FlowQueryContext, QueryContext}; @@ -378,16 +378,9 @@ impl Procedure for CreateFlowProcedure { fn lock_key(&self) -> LockKey { let catalog_name = &self.data.task.catalog_name; let flow_name = &self.data.task.flow_name; - let sink_table_name = &self.data.task.sink_table_name; LockKey::new(vec![ CatalogLock::Read(catalog_name).into(), - TableNameLock::new( - &sink_table_name.catalog_name, - &sink_table_name.schema_name, - &sink_table_name.catalog_name, - ) - .into(), FlowNameLock::new(catalog_name, flow_name).into(), ]) } diff --git a/src/common/meta/src/ddl/tests/create_flow.rs b/src/common/meta/src/ddl/tests/create_flow.rs index 7150be39cb..2778afcbaa 100644 --- a/src/common/meta/src/ddl/tests/create_flow.rs +++ b/src/common/meta/src/ddl/tests/create_flow.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use api::v1::flow::CreateRequest; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; -use common_procedure::Status; +use common_procedure::{Procedure, Status}; use common_procedure_test::execute_procedure_until_done; use table::table_name::TableName; @@ -705,6 +705,33 @@ fn create_test_flow_task_for_serialization() -> CreateFlowTask { } } +#[test] +fn test_create_flow_lock_key_does_not_lock_sink_table_name() { + let task = create_test_flow_task_for_serialization(); + let query_ctx = test_query_context(); + let ddl_context = new_ddl_context(Arc::new(MockFlownodeManager::new(NaiveFlownodeHandler))); + let procedure = CreateFlowProcedure::new(task, query_ctx, ddl_context); + + let lock_keys = procedure.lock_key().get_keys(); + + assert!( + lock_keys.contains(&"Share(\"__catalog_lock/test_catalog\")".to_string()), + "lock keys should include the catalog read lock, got: {lock_keys:?}" + ); + assert!( + lock_keys.contains(&"Exclusive(\"__flow_name_lock/test_catalog.test_flow\")".to_string()), + "lock keys should include the flow name lock, got: {lock_keys:?}" + ); + assert_eq!( + lock_keys + .iter() + .filter(|key| key.contains("__table_name_lock")) + .count(), + 0, + "sink table name lock should be acquired by the nested create table procedure, got: {lock_keys:?}" + ); +} + #[test] fn test_create_flow_data_serialization_backward_compatibility() { // Test that old serialized data with query_context can be deserialized