From a29e7ebb7de7cbd9f0b6be5a0e35119fd23a533a Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Thu, 14 Mar 2024 19:41:23 +0800 Subject: [PATCH] feat: acquire all locks in procedure (#3514) * feat: acquire catalog and schema lock in region failover * chore: remove unused code * feat!: acquire catalog and schema lock in region migration * feat: acquire catalog and schema lock in create table --- .../meta/src/ddl/create_logical_tables.rs | 11 ++- src/common/meta/src/ddl/create_table.rs | 12 ++-- src/meta-srv/src/procedure/region_failover.rs | 68 +++++++++++++------ .../src/procedure/region_migration.rs | 19 +++--- .../downgrade_leader_region.rs | 2 + .../src/procedure/region_migration/manager.rs | 65 ++++++++++++------ .../procedure/region_migration/test_util.rs | 18 +---- .../upgrade_candidate_region.rs | 2 + tests-integration/tests/region_failover.rs | 2 + 9 files changed, 124 insertions(+), 75 deletions(-) diff --git a/src/common/meta/src/ddl/create_logical_tables.rs b/src/common/meta/src/ddl/create_logical_tables.rs index 16292e14bf..8831e3aeb2 100644 --- a/src/common/meta/src/ddl/create_logical_tables.rs +++ b/src/common/meta/src/ddl/create_logical_tables.rs @@ -36,7 +36,7 @@ use crate::ddl::DdlContext; use crate::error::{Result, TableAlreadyExistsSnafu}; use crate::key::table_name::TableNameKey; use crate::key::table_route::TableRouteValue; -use crate::lock_key::{TableLock, TableNameLock}; +use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock}; use crate::peer::Peer; use crate::rpc::ddl::CreateTableTask; use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute}; @@ -307,8 +307,15 @@ impl Procedure for CreateLogicalTablesProcedure { } fn lock_key(&self) -> LockKey { - let mut lock_key = Vec::with_capacity(1 + self.creator.data.tasks.len()); + // CatalogLock, SchemaLock, + // TableLock + // TableNameLock(s) + let mut lock_key = Vec::with_capacity(2 + 1 + self.creator.data.tasks.len()); + let table_ref = self.creator.data.tasks[0].table_ref(); + lock_key.push(CatalogLock::Read(table_ref.catalog).into()); + lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into()); lock_key.push(TableLock::Write(self.creator.data.physical_table_id()).into()); + for task in &self.creator.data.tasks { lock_key.push( TableNameLock::new( diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs index 84f5ada3dd..103e85c009 100644 --- a/src/common/meta/src/ddl/create_table.rs +++ b/src/common/meta/src/ddl/create_table.rs @@ -38,7 +38,7 @@ use crate::ddl::{DdlContext, TableMetadata, TableMetadataAllocatorContext}; use crate::error::{self, Result, TableRouteNotFoundSnafu}; use crate::key::table_name::TableNameKey; use crate::key::table_route::TableRouteValue; -use crate::lock_key::TableNameLock; +use crate::lock_key::{CatalogLock, SchemaLock, TableNameLock}; use crate::region_keeper::OperatingRegionGuard; use crate::rpc::ddl::CreateTableTask; use crate::rpc::router::{ @@ -343,11 +343,11 @@ impl Procedure for CreateTableProcedure { fn lock_key(&self) -> LockKey { let table_ref = &self.creator.data.table_ref(); - LockKey::single(TableNameLock::new( - table_ref.catalog, - table_ref.schema, - table_ref.table, - )) + LockKey::new(vec![ + CatalogLock::Read(table_ref.catalog).into(), + SchemaLock::read(table_ref.catalog, table_ref.schema).into(), + TableNameLock::new(table_ref.catalog, table_ref.schema, table_ref.table).into(), + ]) } } diff --git a/src/meta-srv/src/procedure/region_failover.rs b/src/meta-srv/src/procedure/region_failover.rs index fbc8f781a0..ac0e4ecb7b 100644 --- a/src/meta-srv/src/procedure/region_failover.rs +++ b/src/meta-srv/src/procedure/region_failover.rs @@ -28,7 +28,8 @@ use async_trait::async_trait; use common_meta::key::datanode_table::DatanodeTableKey; use common_meta::key::TableMetadataManagerRef; use common_meta::kv_backend::ResettableKvBackendRef; -use common_meta::lock_key::{RegionLock, TableLock}; +use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock, TableLock}; +use common_meta::table_name::TableName; use common_meta::{ClusterId, RegionIdent}; use common_procedure::error::{ Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu, @@ -44,7 +45,7 @@ use snafu::ResultExt; use store_api::storage::{RegionId, RegionNumber}; use table::metadata::TableId; -use crate::error::{RegisterProcedureLoaderSnafu, Result, TableMetadataManagerSnafu}; +use crate::error::{self, RegisterProcedureLoaderSnafu, Result, TableMetadataManagerSnafu}; use crate::lock::DistLockRef; use crate::metasrv::{SelectorContext, SelectorRef}; use crate::service::mailbox::MailboxRef; @@ -164,7 +165,14 @@ impl RegionFailoverManager { return Ok(()); }; - if !self.table_exists(failed_region).await? { + let table_info = self + .table_metadata_manager + .table_info_manager() + .get(failed_region.table_id) + .await + .context(error::TableMetadataManagerSnafu)?; + + if table_info.is_none() { // The table could be dropped before the failure detector knows it. Then the region // failover is not needed. // Or the table could be renamed. But we will have a new region ident to detect failure. @@ -178,7 +186,15 @@ impl RegionFailoverManager { } let context = self.create_context(); - let procedure = RegionFailoverProcedure::new(failed_region.clone(), context); + // Safety: Check before. + let table_info = table_info.unwrap(); + let TableName { + catalog_name, + schema_name, + .. + } = table_info.table_name(); + let procedure = + RegionFailoverProcedure::new(catalog_name, schema_name, failed_region.clone(), context); let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); let procedure_id = procedure_with_id.id; info!("Starting region failover procedure {procedure_id} for region {failed_region:?}"); @@ -206,16 +222,6 @@ impl RegionFailoverManager { Ok(()) } - async fn table_exists(&self, failed_region: &RegionIdent) -> Result { - Ok(self - .table_metadata_manager - .table_route_manager() - .get_region_distribution(failed_region.table_id) - .await - .context(TableMetadataManagerSnafu)? - .is_some()) - } - async fn failed_region_exists(&self, failed_region: &RegionIdent) -> Result { let table_id = failed_region.table_id; let datanode_id = failed_region.datanode_id; @@ -238,10 +244,17 @@ impl RegionFailoverManager { } } +#[derive(Serialize, Deserialize, Debug)] +struct LockMeta { + catalog: String, + schema: String, +} + /// A "Node" in the state machine of region failover procedure. /// Contains the current state and the data. #[derive(Serialize, Deserialize, Debug)] struct Node { + lock_meta: LockMeta, failed_region: RegionIdent, state: Box, } @@ -330,9 +343,15 @@ pub struct RegionFailoverProcedure { impl RegionFailoverProcedure { const TYPE_NAME: &'static str = "metasrv-procedure::RegionFailover"; - pub fn new(failed_region: RegionIdent, context: RegionFailoverContext) -> Self { + pub fn new( + catalog: String, + schema: String, + failed_region: RegionIdent, + context: RegionFailoverContext, + ) -> Self { let state = RegionFailoverStart::new(); let node = Node { + lock_meta: LockMeta { catalog, schema }, failed_region, state: Box::new(state), }; @@ -372,8 +391,9 @@ impl Procedure for RegionFailoverProcedure { fn lock_key(&self) -> LockKey { let region_ident = &self.node.failed_region; - // TODO(weny): acquires the catalog, schema read locks. let lock_key = vec![ + CatalogLock::Read(&self.node.lock_meta.catalog).into(), + SchemaLock::read(&self.node.lock_meta.catalog, &self.node.lock_meta.catalog).into(), TableLock::Read(region_ident.table_id).into(), RegionLock::Write(RegionId::new( region_ident.table_id, @@ -568,6 +588,8 @@ mod tests { let failed_region = env.failed_region(1).await; let mut procedure = Box::new(RegionFailoverProcedure::new( + "greptime".into(), + "public".into(), failed_region.clone(), env.context.clone(), )) as BoxedProcedure; @@ -671,7 +693,7 @@ mod tests { assert_eq!( procedure.dump().unwrap(), - r#"{"failed_region":{"cluster_id":0,"datanode_id":1,"table_id":1,"region_number":1,"engine":"mito2"},"state":{"region_failover_state":"RegionFailoverEnd"}}"# + r#"{"lock_meta":{"catalog":"greptime","schema":"public"},"failed_region":{"cluster_id":0,"datanode_id":1,"table_id":1,"region_number":1,"engine":"mito2"},"state":{"region_failover_state":"RegionFailoverEnd"}}"# ); // Verifies that the failed region (region 1) is moved from failed datanode (datanode 1) to the candidate datanode. @@ -700,6 +722,10 @@ mod tests { let state = RegionFailoverStart::new(); let node = Node { + lock_meta: LockMeta { + catalog: "greptime".into(), + schema: "public".into(), + }, failed_region, state: Box::new(state), }; @@ -711,12 +737,12 @@ mod tests { let s = procedure.dump().unwrap(); assert_eq!( s, - r#"{"failed_region":{"cluster_id":0,"datanode_id":1,"table_id":1,"region_number":1,"engine":"mito2"},"state":{"region_failover_state":"RegionFailoverStart","failover_candidate":null}}"# + r#"{"lock_meta":{"catalog":"greptime","schema":"public"},"failed_region":{"cluster_id":0,"datanode_id":1,"table_id":1,"region_number":1,"engine":"mito2"},"state":{"region_failover_state":"RegionFailoverStart","failover_candidate":null}}"#, ); let n: Node = serde_json::from_str(&s).unwrap(); assert_eq!( format!("{n:?}"), - r#"Node { failed_region: RegionIdent { cluster_id: 0, datanode_id: 1, table_id: 1, region_number: 1, engine: "mito2" }, state: RegionFailoverStart { failover_candidate: None } }"# + r#"Node { lock_meta: LockMeta { catalog: "greptime", schema: "public" }, failed_region: RegionIdent { cluster_id: 0, datanode_id: 1, table_id: 1, region_number: 1, engine: "mito2" }, state: RegionFailoverStart { failover_candidate: None } }"#, ); } @@ -765,6 +791,10 @@ mod tests { let state = RegionFailoverStart::new(); let node = Node { + lock_meta: LockMeta { + catalog: "greptime".into(), + schema: "public".into(), + }, failed_region, state: Box::new(state), }; diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index 011be7c88c..9e49d266cc 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -13,8 +13,6 @@ // limitations under the License. pub(crate) mod downgrade_leader_region; -// TODO(weny): remove it. -#[allow(dead_code)] pub(crate) mod manager; pub(crate) mod migration_abort; pub(crate) mod migration_end; @@ -36,7 +34,7 @@ use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue}; use common_meta::key::table_info::TableInfoValue; use common_meta::key::table_route::TableRouteValue; use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; -use common_meta::lock_key::{RegionLock, TableLock}; +use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock, TableLock}; use common_meta::peer::Peer; use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard}; use common_meta::ClusterId; @@ -61,6 +59,10 @@ use crate::service::mailbox::{BroadcastChannel, MailboxRef}; /// **Notes: Stores with too large data in the context might incur replication overhead.** #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct PersistentContext { + /// The table catalog. + catalog: String, + /// The table schema. + schema: String, /// The Id of the cluster. cluster_id: ClusterId, /// The [Peer] of migration source. @@ -81,8 +83,9 @@ fn default_replay_timeout() -> Duration { impl PersistentContext { pub fn lock_key(&self) -> Vec { let region_id = self.region_id; - // TODO(weny): acquires the catalog, schema read locks. let lock_key = vec![ + CatalogLock::Read(&self.catalog).into(), + SchemaLock::read(&self.catalog, &self.schema).into(), TableLock::Read(region_id.table_id()).into(), RegionLock::Write(region_id).into(), ]; @@ -185,8 +188,6 @@ impl ContextFactory for DefaultContextFactory { } } -// TODO(weny): remove it. -#[allow(dead_code)] /// The context of procedure execution. pub struct Context { persistent_ctx: PersistentContext, @@ -368,7 +369,6 @@ pub struct RegionMigrationProcedure { context: Context, } -// TODO(weny): remove it. #[allow(dead_code)] impl RegionMigrationProcedure { const TYPE_NAME: &'static str = "metasrv-procedure::RegionMigration"; @@ -487,8 +487,7 @@ mod tests { let procedure = RegionMigrationProcedure::new(persistent_context, context); let serialized = procedure.dump().unwrap(); - - let expected = r#"{"persistent_ctx":{"cluster_id":0,"from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105,"replay_timeout":"1s"},"state":{"region_migration_state":"RegionMigrationStart"}}"#; + let expected = r#"{"persistent_ctx":{"catalog":"greptime","schema":"public","cluster_id":0,"from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105,"replay_timeout":"1s"},"state":{"region_migration_state":"RegionMigrationStart"}}"#; assert_eq!(expected, serialized); } @@ -496,7 +495,7 @@ mod tests { fn test_backward_compatibility() { let persistent_ctx = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1)); // NOTES: Changes it will break backward compatibility. - let serialized = r#"{"cluster_id":0,"from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105}"#; + let serialized = r#"{"catalog":"greptime","schema":"public","cluster_id":0,"from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105}"#; let deserialized: PersistentContext = serde_json::from_str(serialized).unwrap(); assert_eq!(persistent_ctx, deserialized); diff --git a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs index 8d1c439da4..340ef375a6 100644 --- a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs @@ -226,6 +226,8 @@ mod tests { fn new_persistent_context() -> PersistentContext { PersistentContext { + catalog: "greptime".into(), + schema: "public".into(), from_peer: Peer::empty(1), to_peer: Peer::empty(2), region_id: RegionId::new(1024, 1), diff --git a/src/meta-srv/src/procedure/region_migration/manager.rs b/src/meta-srv/src/procedure/region_migration/manager.rs index 000520ee57..7dde629cbd 100644 --- a/src/meta-srv/src/procedure/region_migration/manager.rs +++ b/src/meta-srv/src/procedure/region_migration/manager.rs @@ -18,9 +18,11 @@ use std::fmt::Display; use std::sync::{Arc, RwLock}; use std::time::Duration; +use common_meta::key::table_info::TableInfoValue; use common_meta::key::table_route::TableRouteValue; use common_meta::peer::Peer; use common_meta::rpc::router::RegionRoute; +use common_meta::table_name::TableName; use common_meta::ClusterId; use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId}; use common_telemetry::{error, info}; @@ -93,26 +95,6 @@ impl Display for RegionMigrationProcedureTask { } } -impl From for PersistentContext { - fn from( - RegionMigrationProcedureTask { - cluster_id, - region_id, - from_peer, - to_peer, - replay_timeout, - }: RegionMigrationProcedureTask, - ) -> Self { - PersistentContext { - cluster_id, - from_peer, - to_peer, - region_id, - replay_timeout, - } - } -} - impl RegionMigrationManager { /// Returns new [RegionMigrationManager] pub(crate) fn new( @@ -188,6 +170,22 @@ impl RegionMigrationManager { Ok(table_route) } + async fn retrieve_table_info(&self, region_id: RegionId) -> Result { + let table_route = self + .context_factory + .table_metadata_manager + .table_info_manager() + .get(region_id.table_id()) + .await + .context(error::TableMetadataManagerSnafu)? + .context(error::TableInfoNotFoundSnafu { + table_id: region_id.table_id(), + })? + .into_inner(); + + Ok(table_route) + } + /// Verifies the type of region migration table route. fn verify_table_route( &self, @@ -279,8 +277,31 @@ impl RegionMigrationManager { self.verify_region_leader_peer(®ion_route, &task)?; - let procedure = - RegionMigrationProcedure::new(task.clone().into(), self.context_factory.clone()); + let table_info = self.retrieve_table_info(region_id).await?; + let TableName { + catalog_name, + schema_name, + .. + } = table_info.table_name(); + let RegionMigrationProcedureTask { + cluster_id, + region_id, + from_peer, + to_peer, + replay_timeout, + } = task.clone(); + let procedure = RegionMigrationProcedure::new( + PersistentContext { + catalog: catalog_name, + schema: schema_name, + cluster_id, + region_id, + from_peer, + to_peer, + replay_timeout, + }, + self.context_factory.clone(), + ); let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); let procedure_id = procedure_with_id.id; info!("Starting region migration procedure {procedure_id} for {task}"); diff --git a/src/meta-srv/src/procedure/region_migration/test_util.rs b/src/meta-srv/src/procedure/region_migration/test_util.rs index 8d22fd8104..55cc9003b6 100644 --- a/src/meta-srv/src/procedure/region_migration/test_util.rs +++ b/src/meta-srv/src/procedure/region_migration/test_util.rs @@ -278,6 +278,8 @@ pub fn send_mock_reply( /// Generates a [PersistentContext]. pub fn new_persistent_context(from: u64, to: u64, region_id: RegionId) -> PersistentContext { PersistentContext { + catalog: "greptime".into(), + schema: "public".into(), from_peer: Peer::empty(from), to_peer: Peer::empty(to), region_id, @@ -297,16 +299,6 @@ pub(crate) struct ProcedureMigrationTestSuite { pub(crate) type BeforeTest = Arc BoxFuture<'_, ()> + Send + Sync>; -/// Custom assertion. -pub(crate) type CustomAssertion = Arc< - dyn Fn( - &mut ProcedureMigrationTestSuite, - Result<(Box, Status)>, - ) -> BoxFuture<'_, Result<()>> - + Send - + Sync, ->; - /// State assertion function. pub(crate) type StateAssertion = Arc; @@ -316,14 +308,11 @@ pub(crate) type StatusAssertion = Arc; /// Error assertion function. pub(crate) type ErrorAssertion = Arc; -// TODO(weny): Remove it. -#[allow(dead_code)] /// The type of assertion. #[derive(Clone)] pub(crate) enum Assertion { Simple(StateAssertion, StatusAssertion), Error(ErrorAssertion), - Custom(CustomAssertion), } impl Assertion { @@ -384,9 +373,6 @@ impl ProcedureMigrationTestSuite { let error = result.unwrap_err(); error_assert(error); } - Assertion::Custom(assert_fn) => { - assert_fn(self, result).await?; - } } Ok(()) diff --git a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs index bf1a3e6b1f..5591427d0d 100644 --- a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs @@ -232,6 +232,8 @@ mod tests { fn new_persistent_context() -> PersistentContext { PersistentContext { + catalog: "greptime".into(), + schema: "public".into(), from_peer: Peer::empty(1), to_peer: Peer::empty(2), region_id: RegionId::new(1024, 1), diff --git a/tests-integration/tests/region_failover.rs b/tests-integration/tests/region_failover.rs index ae32bcbc82..74289149c0 100644 --- a/tests-integration/tests/region_failover.rs +++ b/tests-integration/tests/region_failover.rs @@ -346,6 +346,8 @@ async fn run_region_failover_procedure( let meta_srv = &cluster.meta_srv; let procedure_manager = meta_srv.procedure_manager(); let procedure = RegionFailoverProcedure::new( + "greptime".into(), + "public".into(), failed_region.clone(), RegionFailoverContext { region_lease_secs: 10,