From ec8266b969d7eaa54037d27c8e3970a8f1e9238a Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Wed, 10 Jan 2024 18:46:39 +0900 Subject: [PATCH] refactor: refactor the locks in the procedure (#3126) * feat: add lock key * refactor: procedure lock keys * chore: apply suggestions from CR --- src/common/catalog/src/lib.rs | 5 + src/common/meta/src/ddl/alter_table.rs | 56 ++--- src/common/meta/src/ddl/create_table.rs | 8 +- src/common/meta/src/ddl/drop_table.rs | 14 +- src/common/meta/src/ddl/truncate_table.rs | 14 +- src/common/meta/src/ddl_manager.rs | 23 +- src/common/meta/src/lib.rs | 1 + src/common/meta/src/lock_key.rs | 235 ++++++++++++++++++ src/common/procedure/src/lib.rs | 2 +- src/meta-srv/src/procedure/region_failover.rs | 17 +- .../src/procedure/region_migration.rs | 30 ++- src/meta-srv/src/procedure/utils.rs | 6 - 12 files changed, 333 insertions(+), 78 deletions(-) create mode 100644 src/common/meta/src/lock_key.rs diff --git a/src/common/catalog/src/lib.rs b/src/common/catalog/src/lib.rs index 1527b7f4a0..f64d0289ba 100644 --- a/src/common/catalog/src/lib.rs +++ b/src/common/catalog/src/lib.rs @@ -17,6 +17,11 @@ use consts::DEFAULT_CATALOG_NAME; pub mod consts; pub mod error; +#[inline] +pub fn format_schema_name(catalog: &str, schema: &str) -> String { + format!("{catalog}.{schema}") +} + /// Formats table fully-qualified name #[inline] pub fn format_full_table_name(catalog: &str, schema: &str, table: &str) -> String { diff --git a/src/common/meta/src/ddl/alter_table.rs b/src/common/meta/src/ddl/alter_table.rs index b22f08ef98..d866462db1 100644 --- a/src/common/meta/src/ddl/alter_table.rs +++ b/src/common/meta/src/ddl/alter_table.rs @@ -24,7 +24,7 @@ use async_trait::async_trait; use common_grpc_expr::alter_expr_to_request; use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu}; use common_procedure::{ - Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, Status, + Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, Status, StringKey, }; use common_telemetry::tracing_context::TracingContext; use common_telemetry::{debug, info}; @@ -44,6 +44,7 @@ use crate::error::{self, ConvertAlterTableRequestSnafu, InvalidProtoMsgSnafu, Re use crate::key::table_info::TableInfoValue; use crate::key::table_name::TableNameKey; use crate::key::DeserializedValueWithBytes; +use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock}; use crate::metrics; use crate::rpc::ddl::AlterTableTask; use crate::rpc::router::{find_leader_regions, find_leaders}; @@ -63,7 +64,7 @@ impl AlterTableProcedure { cluster_id: u64, task: AlterTableTask, table_info_value: DeserializedValueWithBytes, - physical_table_name: Option, + physical_table_info: Option<(TableId, TableName)>, context: DdlContext, ) -> Result { let alter_kind = task @@ -86,7 +87,7 @@ impl AlterTableProcedure { data: AlterTableData::new( task, table_info_value, - physical_table_name, + physical_table_info, cluster_id, next_column_id, ), @@ -335,32 +336,31 @@ impl AlterTableProcedure { Ok(Status::Done) } - fn lock_key_inner(&self) -> Vec { + fn lock_key_inner(&self) -> Vec { let mut lock_key = vec![]; - if let Some(physical_table_name) = self.data.physical_table_name() { - let physical_table_key = common_catalog::format_full_table_name( - &physical_table_name.catalog_name, - &physical_table_name.schema_name, - &physical_table_name.table_name, + if let Some((physical_table_id, physical_table_name)) = self.data.physical_table_info() { + lock_key.push(CatalogLock::Read(&physical_table_name.catalog_name).into()); + lock_key.push( + SchemaLock::read( + &physical_table_name.catalog_name, + &physical_table_name.schema_name, + ) + .into(), ); - lock_key.push(physical_table_key); + lock_key.push(TableLock::Read(*physical_table_id).into()) } let table_ref = self.data.table_ref(); - let table_key = common_catalog::format_full_table_name( - table_ref.catalog, - table_ref.schema, - table_ref.table, - ); - lock_key.push(table_key); + let table_id = self.data.table_id(); + lock_key.push(CatalogLock::Read(table_ref.catalog).into()); + lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into()); + lock_key.push(TableLock::Write(table_id).into()); if let Ok(Kind::RenameTable(RenameTable { new_table_name })) = self.alter_kind() { - lock_key.push(common_catalog::format_full_table_name( - table_ref.catalog, - table_ref.schema, - new_table_name, - )) + lock_key.push( + TableNameLock::new(table_ref.catalog, table_ref.schema, new_table_name).into(), + ) } lock_key @@ -406,7 +406,7 @@ impl Procedure for AlterTableProcedure { fn lock_key(&self) -> LockKey { let key = self.lock_key_inner(); - LockKey::new_exclusive(key) + LockKey::new(key) } } @@ -423,13 +423,13 @@ enum AlterTableState { #[derive(Debug, Serialize, Deserialize)] pub struct AlterTableData { + cluster_id: u64, state: AlterTableState, task: AlterTableTask, /// Table info value before alteration. table_info_value: DeserializedValueWithBytes, /// Physical table name, if the table to alter is a logical table. - physical_table_name: Option, - cluster_id: u64, + physical_table_info: Option<(TableId, TableName)>, /// Next column id of the table if the task adds columns to the table. next_column_id: Option, } @@ -438,7 +438,7 @@ impl AlterTableData { pub fn new( task: AlterTableTask, table_info_value: DeserializedValueWithBytes, - physical_table_name: Option, + physical_table_info: Option<(TableId, TableName)>, cluster_id: u64, next_column_id: Option, ) -> Self { @@ -446,7 +446,7 @@ impl AlterTableData { state: AlterTableState::Prepare, task, table_info_value, - physical_table_name, + physical_table_info, cluster_id, next_column_id, } @@ -464,8 +464,8 @@ impl AlterTableData { &self.table_info_value.table_info } - fn physical_table_name(&self) -> Option<&TableName> { - self.physical_table_name.as_ref() + fn physical_table_info(&self) -> Option<&(TableId, TableName)> { + self.physical_table_info.as_ref() } } diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs index 9bdb6929c6..11d63bebd5 100644 --- a/src/common/meta/src/ddl/create_table.rs +++ b/src/common/meta/src/ddl/create_table.rs @@ -41,6 +41,7 @@ use crate::ddl::DdlContext; use crate::error::{self, Result, TableRouteNotFoundSnafu}; use crate::key::table_name::TableNameKey; use crate::key::table_route::TableRouteValue; +use crate::lock_key::TableNameLock; use crate::metrics; use crate::region_keeper::OperatingRegionGuard; use crate::rpc::ddl::CreateTableTask; @@ -343,13 +344,12 @@ impl Procedure for CreateTableProcedure { fn lock_key(&self) -> LockKey { let table_ref = &self.creator.data.table_ref(); - let key = common_catalog::format_full_table_name( + + LockKey::single(TableNameLock::new( table_ref.catalog, table_ref.schema, table_ref.table, - ); - - LockKey::single_exclusive(key) + )) } } diff --git a/src/common/meta/src/ddl/drop_table.rs b/src/common/meta/src/ddl/drop_table.rs index dfd674d139..db9974df20 100644 --- a/src/common/meta/src/ddl/drop_table.rs +++ b/src/common/meta/src/ddl/drop_table.rs @@ -41,6 +41,7 @@ use crate::key::table_info::TableInfoValue; use crate::key::table_name::TableNameKey; use crate::key::table_route::TableRouteValue; use crate::key::DeserializedValueWithBytes; +use crate::lock_key::{CatalogLock, SchemaLock, TableLock}; use crate::metrics; use crate::region_keeper::OperatingRegionGuard; use crate::rpc::ddl::DropTableTask; @@ -267,13 +268,14 @@ impl Procedure for DropTableProcedure { fn lock_key(&self) -> LockKey { let table_ref = &self.data.table_ref(); - let key = common_catalog::format_full_table_name( - table_ref.catalog, - table_ref.schema, - table_ref.table, - ); + let table_id = self.data.table_id(); + let lock_key = vec![ + CatalogLock::Read(table_ref.catalog).into(), + SchemaLock::read(table_ref.catalog, table_ref.schema).into(), + TableLock::Write(table_id).into(), + ]; - LockKey::single_exclusive(key) + LockKey::new(lock_key) } } diff --git a/src/common/meta/src/ddl/truncate_table.rs b/src/common/meta/src/ddl/truncate_table.rs index 90f746104c..d0782f8dea 100644 --- a/src/common/meta/src/ddl/truncate_table.rs +++ b/src/common/meta/src/ddl/truncate_table.rs @@ -37,6 +37,7 @@ use crate::error::{Result, TableNotFoundSnafu}; use crate::key::table_info::TableInfoValue; use crate::key::table_name::TableNameKey; use crate::key::DeserializedValueWithBytes; +use crate::lock_key::{CatalogLock, SchemaLock, TableLock}; use crate::metrics; use crate::rpc::ddl::TruncateTableTask; use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute}; @@ -75,13 +76,14 @@ impl Procedure for TruncateTableProcedure { fn lock_key(&self) -> LockKey { let table_ref = &self.data.table_ref(); - let key = common_catalog::format_full_table_name( - table_ref.catalog, - table_ref.schema, - table_ref.table, - ); + let table_id = self.data.table_id(); + let lock_key = vec![ + CatalogLock::Read(table_ref.catalog).into(), + SchemaLock::read(table_ref.catalog, table_ref.schema).into(), + TableLock::Write(table_id).into(), + ]; - LockKey::single_exclusive(key) + LockKey::new(lock_key) } } diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 28024b6602..d8b19cebdf 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -19,7 +19,7 @@ use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithI use common_telemetry::tracing_context::{FutureExt, TracingContext}; use common_telemetry::{info, tracing}; use snafu::{OptionExt, ResultExt}; -use store_api::storage::RegionNumber; +use store_api::storage::{RegionNumber, TableId}; use crate::cache_invalidator::CacheInvalidatorRef; use crate::datanode_manager::DatanodeManagerRef; @@ -162,7 +162,7 @@ impl DdlManager { cluster_id: u64, alter_table_task: AlterTableTask, table_info_value: DeserializedValueWithBytes, - physical_table_name: Option, + physical_table_info: Option<(TableId, TableName)>, ) -> Result { let context = self.create_context(); @@ -170,7 +170,7 @@ impl DdlManager { cluster_id, alter_table_task, table_info_value, - physical_table_name, + physical_table_info, context, )?; @@ -341,7 +341,7 @@ async fn handle_alter_table_task( .get_physical_table_id(table_id) .await?; - let physical_table_name = if physical_table_id == table_id { + let physical_table_info = if physical_table_id == table_id { None } else { let physical_table_info = &ddl_manager @@ -353,11 +353,14 @@ async fn handle_alter_table_task( table_name: table_ref.to_string(), })? .table_info; - Some(TableName { - catalog_name: physical_table_info.catalog_name.clone(), - schema_name: physical_table_info.schema_name.clone(), - table_name: physical_table_info.name.clone(), - }) + Some(( + physical_table_id, + TableName { + catalog_name: physical_table_info.catalog_name.clone(), + schema_name: physical_table_info.schema_name.clone(), + table_name: physical_table_info.name.clone(), + }, + )) }; let id = ddl_manager @@ -365,7 +368,7 @@ async fn handle_alter_table_task( cluster_id, alter_table_task, table_info_value, - physical_table_name, + physical_table_info, ) .await?; diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs index 45b4db5b08..2ee47e8975 100644 --- a/src/common/meta/src/lib.rs +++ b/src/common/meta/src/lib.rs @@ -27,6 +27,7 @@ pub mod heartbeat; pub mod instruction; pub mod key; pub mod kv_backend; +pub mod lock_key; pub mod metrics; pub mod peer; pub mod range_stream; diff --git a/src/common/meta/src/lock_key.rs b/src/common/meta/src/lock_key.rs new file mode 100644 index 0000000000..ad09c064d3 --- /dev/null +++ b/src/common/meta/src/lock_key.rs @@ -0,0 +1,235 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::Display; + +use common_catalog::{format_full_table_name, format_schema_name}; +use common_procedure::StringKey; +use store_api::storage::{RegionId, TableId}; + +const CATALOG_LOCK_PREFIX: &str = "__catalog_lock"; +const SCHEMA_LOCK_PREFIX: &str = "__schema_lock"; +const TABLE_LOCK_PREFIX: &str = "__table_lock"; +const TABLE_NAME_LOCK_PREFIX: &str = "__table_name_lock"; +const REGION_LOCK_PREFIX: &str = "__region_lock"; + +/// [CatalogLock] acquires the lock on the tenant level. +pub enum CatalogLock<'a> { + Read(&'a str), + Write(&'a str), +} + +impl<'a> Display for CatalogLock<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let key = match self { + CatalogLock::Read(s) => s, + CatalogLock::Write(s) => s, + }; + write!(f, "{}/{}", CATALOG_LOCK_PREFIX, key) + } +} + +impl<'a> From> for StringKey { + fn from(value: CatalogLock) -> Self { + match value { + CatalogLock::Write(_) => StringKey::Exclusive(value.to_string()), + CatalogLock::Read(_) => StringKey::Share(value.to_string()), + } + } +} + +/// [SchemaLock] acquires the lock on the database level. +pub enum SchemaLock { + Read(String), + Write(String), +} + +impl SchemaLock { + pub fn read(catalog: &str, schema: &str) -> Self { + Self::Read(format_schema_name(catalog, schema)) + } + + pub fn write(catalog: &str, schema: &str) -> Self { + Self::Write(format_schema_name(catalog, schema)) + } +} + +impl Display for SchemaLock { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let key = match self { + SchemaLock::Read(s) => s, + SchemaLock::Write(s) => s, + }; + write!(f, "{}/{}", SCHEMA_LOCK_PREFIX, key) + } +} + +impl From for StringKey { + fn from(value: SchemaLock) -> Self { + match value { + SchemaLock::Write(_) => StringKey::Exclusive(value.to_string()), + SchemaLock::Read(_) => StringKey::Share(value.to_string()), + } + } +} + +/// [TableNameLock] prevents any procedures trying to create a table named it. +pub enum TableNameLock { + Write(String), +} + +impl TableNameLock { + pub fn new(catalog: &str, schema: &str, table: &str) -> Self { + Self::Write(format_full_table_name(catalog, schema, table)) + } +} + +impl Display for TableNameLock { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let TableNameLock::Write(name) = self; + write!(f, "{}/{}", TABLE_NAME_LOCK_PREFIX, name) + } +} + +impl From for StringKey { + fn from(value: TableNameLock) -> Self { + match value { + TableNameLock::Write(_) => StringKey::Exclusive(value.to_string()), + } + } +} + +/// [TableLock] acquires the lock on the table level. +/// +/// Note: Allows to read/modify the corresponding table's [TableInfoValue](crate::key::table_info::TableInfoValue), +/// [TableRouteValue](crate::key::table_route::TableRouteValue), [TableDatanodeValue](crate::key::datanode_table::DatanodeTableValue). +pub enum TableLock { + Read(TableId), + Write(TableId), +} + +impl Display for TableLock { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let key = match self { + TableLock::Read(s) => s, + TableLock::Write(s) => s, + }; + write!(f, "{}/{}", TABLE_LOCK_PREFIX, key) + } +} + +impl From for StringKey { + fn from(value: TableLock) -> Self { + match value { + TableLock::Write(_) => StringKey::Exclusive(value.to_string()), + TableLock::Read(_) => StringKey::Share(value.to_string()), + } + } +} + +/// [RegionLock] acquires the lock on the region level. +/// +/// Note: +/// - Allows modification the corresponding region's [TableRouteValue](crate::key::table_route::TableRouteValue), +/// [TableDatanodeValue](crate::key::datanode_table::DatanodeTableValue) even if +/// it acquires the [RegionLock::Write] only without acquiring the [TableLock::Write]. +/// +/// - Should acquire [TableLock] of the table at same procedure. +/// +/// TODO(weny): we should consider separating TableRouteValue into finer keys. +pub enum RegionLock { + Read(RegionId), + Write(RegionId), +} + +impl Display for RegionLock { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let key = match self { + RegionLock::Read(s) => s.as_u64(), + RegionLock::Write(s) => s.as_u64(), + }; + write!(f, "{}/{}", REGION_LOCK_PREFIX, key) + } +} + +impl From for StringKey { + fn from(value: RegionLock) -> Self { + match value { + RegionLock::Write(_) => StringKey::Exclusive(value.to_string()), + RegionLock::Read(_) => StringKey::Share(value.to_string()), + } + } +} + +#[cfg(test)] +mod tests { + use common_procedure::StringKey; + + use crate::lock_key::*; + + #[test] + fn test_lock_key() { + // The catalog lock + let string_key: StringKey = CatalogLock::Read("foo").into(); + assert_eq!( + string_key, + StringKey::Share(format!("{}/{}", CATALOG_LOCK_PREFIX, "foo")) + ); + let string_key: StringKey = CatalogLock::Write("foo").into(); + assert_eq!( + string_key, + StringKey::Exclusive(format!("{}/{}", CATALOG_LOCK_PREFIX, "foo")) + ); + // The schema lock + let string_key: StringKey = SchemaLock::read("foo", "bar").into(); + assert_eq!( + string_key, + StringKey::Share(format!("{}/{}", SCHEMA_LOCK_PREFIX, "foo.bar")) + ); + let string_key: StringKey = SchemaLock::write("foo", "bar").into(); + assert_eq!( + string_key, + StringKey::Exclusive(format!("{}/{}", SCHEMA_LOCK_PREFIX, "foo.bar")) + ); + // The table lock + let string_key: StringKey = TableLock::Read(1024).into(); + assert_eq!( + string_key, + StringKey::Share(format!("{}/{}", TABLE_LOCK_PREFIX, 1024)) + ); + let string_key: StringKey = TableLock::Write(1024).into(); + assert_eq!( + string_key, + StringKey::Exclusive(format!("{}/{}", TABLE_LOCK_PREFIX, 1024)) + ); + // The table name lock + let string_key: StringKey = TableNameLock::new("foo", "bar", "baz").into(); + assert_eq!( + string_key, + StringKey::Exclusive(format!("{}/{}", TABLE_NAME_LOCK_PREFIX, "foo.bar.baz")) + ); + // The region lock + let region_id = RegionId::new(1024, 1); + let string_key: StringKey = RegionLock::Read(region_id).into(); + assert_eq!( + string_key, + StringKey::Share(format!("{}/{}", REGION_LOCK_PREFIX, region_id.as_u64())) + ); + let string_key: StringKey = RegionLock::Write(region_id).into(); + assert_eq!( + string_key, + StringKey::Exclusive(format!("{}/{}", REGION_LOCK_PREFIX, region_id.as_u64())) + ); + } +} diff --git a/src/common/procedure/src/lib.rs b/src/common/procedure/src/lib.rs index 544d5d8036..3386640572 100644 --- a/src/common/procedure/src/lib.rs +++ b/src/common/procedure/src/lib.rs @@ -26,6 +26,6 @@ pub mod watcher; pub use crate::error::{Error, Result}; pub use crate::procedure::{ BoxedProcedure, Context, ContextProvider, LockKey, Procedure, ProcedureId, ProcedureManager, - ProcedureManagerRef, ProcedureState, ProcedureWithId, Status, + ProcedureManagerRef, ProcedureState, ProcedureWithId, Status, StringKey, }; pub use crate::watcher::Watcher; diff --git a/src/meta-srv/src/procedure/region_failover.rs b/src/meta-srv/src/procedure/region_failover.rs index 37468437b2..0688d57593 100644 --- a/src/meta-srv/src/procedure/region_failover.rs +++ b/src/meta-srv/src/procedure/region_failover.rs @@ -28,6 +28,7 @@ use async_trait::async_trait; use common_meta::key::datanode_table::DatanodeTableKey; use common_meta::key::TableMetadataManagerRef; use common_meta::kv_backend::ResettableKvBackendRef; +use common_meta::lock_key::{RegionLock, TableLock}; use common_meta::{ClusterId, RegionIdent}; use common_procedure::error::{ Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu, @@ -40,13 +41,12 @@ use common_telemetry::{error, info, warn}; use failover_start::RegionFailoverStart; use serde::{Deserialize, Serialize}; use snafu::ResultExt; -use store_api::storage::RegionNumber; +use store_api::storage::{RegionId, RegionNumber}; use table::metadata::TableId; use crate::error::{Error, RegisterProcedureLoaderSnafu, Result, TableMetadataManagerSnafu}; use crate::lock::DistLockRef; use crate::metasrv::{SelectorContext, SelectorRef}; -use crate::procedure::utils::region_lock_key; use crate::service::mailbox::MailboxRef; const OPEN_REGION_MESSAGE_TIMEOUT: Duration = Duration::from_secs(30); @@ -372,8 +372,17 @@ impl Procedure for RegionFailoverProcedure { fn lock_key(&self) -> LockKey { let region_ident = &self.node.failed_region; - let region_key = region_lock_key(region_ident.table_id, region_ident.region_number); - LockKey::single_exclusive(region_key) + // TODO(weny): acquires the catalog, schema read locks. + let lock_key = vec![ + TableLock::Read(region_ident.table_id).into(), + RegionLock::Write(RegionId::new( + region_ident.table_id, + region_ident.region_number, + )) + .into(), + ]; + + LockKey::new(lock_key) } } diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index ec46686f47..8be3ecfc98 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -35,13 +35,14 @@ use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue}; use common_meta::key::table_info::TableInfoValue; use common_meta::key::table_route::TableRouteValue; use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; +use common_meta::lock_key::{RegionLock, TableLock}; use common_meta::peer::Peer; use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard}; use common_meta::ClusterId; use common_procedure::error::{ Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu, }; -use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status}; +use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status, StringKey}; pub use manager::RegionMigrationProcedureTask; use serde::{Deserialize, Serialize}; use snafu::{location, Location, OptionExt, ResultExt}; @@ -50,7 +51,6 @@ use tokio::time::Instant; use self::migration_start::RegionMigrationStart; use crate::error::{self, Error, Result}; -use crate::procedure::utils::region_lock_key; use crate::service::mailbox::{BroadcastChannel, MailboxRef}; /// It's shared in each step and available even after recovering. @@ -71,8 +71,15 @@ pub struct PersistentContext { } impl PersistentContext { - pub fn lock_key(&self) -> String { - region_lock_key(self.region_id.table_id(), self.region_id.region_number()) + pub fn lock_key(&self) -> Vec { + let region_id = self.region_id; + // TODO(weny): acquires the catalog, schema read locks. + let lock_key = vec![ + TableLock::Read(region_id.table_id()).into(), + RegionLock::Write(region_id).into(), + ]; + + lock_key } } @@ -418,8 +425,7 @@ impl Procedure for RegionMigrationProcedure { } fn lock_key(&self) -> LockKey { - let key = self.context.persistent_ctx.lock_key(); - LockKey::single_exclusive(key) + LockKey::new(self.context.persistent_ctx.lock_key()) } } @@ -447,7 +453,7 @@ mod tests { #[test] fn test_lock_key() { let persistent_context = new_persistent_context(); - let expected_key = persistent_context.lock_key(); + let expected_keys = persistent_context.lock_key(); let env = TestingEnv::new(); let context = env.context_factory(); @@ -455,13 +461,11 @@ mod tests { let procedure = RegionMigrationProcedure::new(persistent_context, context); let key = procedure.lock_key(); - let keys = key - .keys_to_lock() - .cloned() - .map(|s| s.into_string()) - .collect::>(); + let keys = key.keys_to_lock().cloned().collect::>(); - assert!(keys.contains(&expected_key)); + for key in expected_keys { + assert!(keys.contains(&key)); + } } #[test] diff --git a/src/meta-srv/src/procedure/utils.rs b/src/meta-srv/src/procedure/utils.rs index 1441aa5968..8cf2a34c61 100644 --- a/src/meta-srv/src/procedure/utils.rs +++ b/src/meta-srv/src/procedure/utils.rs @@ -12,12 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use store_api::storage::{RegionNumber, TableId}; - -pub fn region_lock_key(table_id: TableId, region_number: RegionNumber) -> String { - format!("{}/region-{}", table_id, region_number) -} - #[cfg(feature = "mock")] pub mod mock { use std::io::Error;