fix: remove flow sink table lock (#8317)

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-06-17 14:05:49 +08:00
committed by GitHub
parent fd6c9e710d
commit a3cbfdb983
2 changed files with 29 additions and 9 deletions

View File

@@ -45,7 +45,7 @@ use crate::key::flow::flow_info::{FlowInfoValue, FlowStatus};
use crate::key::flow::flow_route::FlowRouteValue;
use crate::key::table_name::TableNameKey;
use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId};
use crate::lock_key::{CatalogLock, FlowNameLock, TableNameLock};
use crate::lock_key::{CatalogLock, FlowNameLock};
use crate::metrics;
use crate::peer::Peer;
use crate::rpc::ddl::{CreateFlowTask, FlowQueryContext, QueryContext};
@@ -378,16 +378,9 @@ impl Procedure for CreateFlowProcedure {
fn lock_key(&self) -> LockKey {
let catalog_name = &self.data.task.catalog_name;
let flow_name = &self.data.task.flow_name;
let sink_table_name = &self.data.task.sink_table_name;
LockKey::new(vec![
CatalogLock::Read(catalog_name).into(),
TableNameLock::new(
&sink_table_name.catalog_name,
&sink_table_name.schema_name,
&sink_table_name.catalog_name,
)
.into(),
FlowNameLock::new(catalog_name, flow_name).into(),
])
}

View File

@@ -18,7 +18,7 @@ use std::sync::Arc;
use api::v1::flow::CreateRequest;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_procedure::Status;
use common_procedure::{Procedure, Status};
use common_procedure_test::execute_procedure_until_done;
use table::table_name::TableName;
@@ -705,6 +705,33 @@ fn create_test_flow_task_for_serialization() -> CreateFlowTask {
}
}
#[test]
fn test_create_flow_lock_key_does_not_lock_sink_table_name() {
let task = create_test_flow_task_for_serialization();
let query_ctx = test_query_context();
let ddl_context = new_ddl_context(Arc::new(MockFlownodeManager::new(NaiveFlownodeHandler)));
let procedure = CreateFlowProcedure::new(task, query_ctx, ddl_context);
let lock_keys = procedure.lock_key().get_keys();
assert!(
lock_keys.contains(&"Share(\"__catalog_lock/test_catalog\")".to_string()),
"lock keys should include the catalog read lock, got: {lock_keys:?}"
);
assert!(
lock_keys.contains(&"Exclusive(\"__flow_name_lock/test_catalog.test_flow\")".to_string()),
"lock keys should include the flow name lock, got: {lock_keys:?}"
);
assert_eq!(
lock_keys
.iter()
.filter(|key| key.contains("__table_name_lock"))
.count(),
0,
"sink table name lock should be acquired by the nested create table procedure, got: {lock_keys:?}"
);
}
#[test]
fn test_create_flow_data_serialization_backward_compatibility() {
// Test that old serialized data with query_context can be deserialized