feat: acquire all locks in procedure (#3514)

* feat: acquire catalog and schema lock in region failover

* chore: remove unused code

* feat!: acquire catalog and schema lock in region migration

* feat: acquire catalog and schema lock in create table
This commit is contained in:
Weny Xu
2024-03-14 19:41:23 +08:00
committed by GitHub
parent 8ca9e01455
commit a29e7ebb7d
9 changed files with 124 additions and 75 deletions

View File

@@ -36,7 +36,7 @@ use crate::ddl::DdlContext;
use crate::error::{Result, TableAlreadyExistsSnafu};
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::lock_key::{TableLock, TableNameLock};
use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
use crate::peer::Peer;
use crate::rpc::ddl::CreateTableTask;
use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute};
@@ -307,8 +307,15 @@ impl Procedure for CreateLogicalTablesProcedure {
}
fn lock_key(&self) -> LockKey {
let mut lock_key = Vec::with_capacity(1 + self.creator.data.tasks.len());
// CatalogLock, SchemaLock,
// TableLock
// TableNameLock(s)
let mut lock_key = Vec::with_capacity(2 + 1 + self.creator.data.tasks.len());
let table_ref = self.creator.data.tasks[0].table_ref();
lock_key.push(CatalogLock::Read(table_ref.catalog).into());
lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
lock_key.push(TableLock::Write(self.creator.data.physical_table_id()).into());
for task in &self.creator.data.tasks {
lock_key.push(
TableNameLock::new(

View File

@@ -38,7 +38,7 @@ use crate::ddl::{DdlContext, TableMetadata, TableMetadataAllocatorContext};
use crate::error::{self, Result, TableRouteNotFoundSnafu};
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::lock_key::TableNameLock;
use crate::lock_key::{CatalogLock, SchemaLock, TableNameLock};
use crate::region_keeper::OperatingRegionGuard;
use crate::rpc::ddl::CreateTableTask;
use crate::rpc::router::{
@@ -343,11 +343,11 @@ impl Procedure for CreateTableProcedure {
fn lock_key(&self) -> LockKey {
let table_ref = &self.creator.data.table_ref();
LockKey::single(TableNameLock::new(
table_ref.catalog,
table_ref.schema,
table_ref.table,
))
LockKey::new(vec![
CatalogLock::Read(table_ref.catalog).into(),
SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
TableNameLock::new(table_ref.catalog, table_ref.schema, table_ref.table).into(),
])
}
}

View File

@@ -28,7 +28,8 @@ use async_trait::async_trait;
use common_meta::key::datanode_table::DatanodeTableKey;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::ResettableKvBackendRef;
use common_meta::lock_key::{RegionLock, TableLock};
use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock, TableLock};
use common_meta::table_name::TableName;
use common_meta::{ClusterId, RegionIdent};
use common_procedure::error::{
Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
@@ -44,7 +45,7 @@ use snafu::ResultExt;
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::TableId;
use crate::error::{RegisterProcedureLoaderSnafu, Result, TableMetadataManagerSnafu};
use crate::error::{self, RegisterProcedureLoaderSnafu, Result, TableMetadataManagerSnafu};
use crate::lock::DistLockRef;
use crate::metasrv::{SelectorContext, SelectorRef};
use crate::service::mailbox::MailboxRef;
@@ -164,7 +165,14 @@ impl RegionFailoverManager {
return Ok(());
};
if !self.table_exists(failed_region).await? {
let table_info = self
.table_metadata_manager
.table_info_manager()
.get(failed_region.table_id)
.await
.context(error::TableMetadataManagerSnafu)?;
if table_info.is_none() {
// The table could be dropped before the failure detector knows it. Then the region
// failover is not needed.
// Or the table could be renamed. But we will have a new region ident to detect failure.
@@ -178,7 +186,15 @@ impl RegionFailoverManager {
}
let context = self.create_context();
let procedure = RegionFailoverProcedure::new(failed_region.clone(), context);
// Safety: Check before.
let table_info = table_info.unwrap();
let TableName {
catalog_name,
schema_name,
..
} = table_info.table_name();
let procedure =
RegionFailoverProcedure::new(catalog_name, schema_name, failed_region.clone(), context);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
let procedure_id = procedure_with_id.id;
info!("Starting region failover procedure {procedure_id} for region {failed_region:?}");
@@ -206,16 +222,6 @@ impl RegionFailoverManager {
Ok(())
}
async fn table_exists(&self, failed_region: &RegionIdent) -> Result<bool> {
Ok(self
.table_metadata_manager
.table_route_manager()
.get_region_distribution(failed_region.table_id)
.await
.context(TableMetadataManagerSnafu)?
.is_some())
}
async fn failed_region_exists(&self, failed_region: &RegionIdent) -> Result<bool> {
let table_id = failed_region.table_id;
let datanode_id = failed_region.datanode_id;
@@ -238,10 +244,17 @@ impl RegionFailoverManager {
}
}
#[derive(Serialize, Deserialize, Debug)]
struct LockMeta {
catalog: String,
schema: String,
}
/// A "Node" in the state machine of region failover procedure.
/// Contains the current state and the data.
#[derive(Serialize, Deserialize, Debug)]
struct Node {
lock_meta: LockMeta,
failed_region: RegionIdent,
state: Box<dyn State>,
}
@@ -330,9 +343,15 @@ pub struct RegionFailoverProcedure {
impl RegionFailoverProcedure {
const TYPE_NAME: &'static str = "metasrv-procedure::RegionFailover";
pub fn new(failed_region: RegionIdent, context: RegionFailoverContext) -> Self {
pub fn new(
catalog: String,
schema: String,
failed_region: RegionIdent,
context: RegionFailoverContext,
) -> Self {
let state = RegionFailoverStart::new();
let node = Node {
lock_meta: LockMeta { catalog, schema },
failed_region,
state: Box::new(state),
};
@@ -372,8 +391,9 @@ impl Procedure for RegionFailoverProcedure {
fn lock_key(&self) -> LockKey {
let region_ident = &self.node.failed_region;
// TODO(weny): acquires the catalog, schema read locks.
let lock_key = vec![
CatalogLock::Read(&self.node.lock_meta.catalog).into(),
SchemaLock::read(&self.node.lock_meta.catalog, &self.node.lock_meta.catalog).into(),
TableLock::Read(region_ident.table_id).into(),
RegionLock::Write(RegionId::new(
region_ident.table_id,
@@ -568,6 +588,8 @@ mod tests {
let failed_region = env.failed_region(1).await;
let mut procedure = Box::new(RegionFailoverProcedure::new(
"greptime".into(),
"public".into(),
failed_region.clone(),
env.context.clone(),
)) as BoxedProcedure;
@@ -671,7 +693,7 @@ mod tests {
assert_eq!(
procedure.dump().unwrap(),
r#"{"failed_region":{"cluster_id":0,"datanode_id":1,"table_id":1,"region_number":1,"engine":"mito2"},"state":{"region_failover_state":"RegionFailoverEnd"}}"#
r#"{"lock_meta":{"catalog":"greptime","schema":"public"},"failed_region":{"cluster_id":0,"datanode_id":1,"table_id":1,"region_number":1,"engine":"mito2"},"state":{"region_failover_state":"RegionFailoverEnd"}}"#
);
// Verifies that the failed region (region 1) is moved from failed datanode (datanode 1) to the candidate datanode.
@@ -700,6 +722,10 @@ mod tests {
let state = RegionFailoverStart::new();
let node = Node {
lock_meta: LockMeta {
catalog: "greptime".into(),
schema: "public".into(),
},
failed_region,
state: Box::new(state),
};
@@ -711,12 +737,12 @@ mod tests {
let s = procedure.dump().unwrap();
assert_eq!(
s,
r#"{"failed_region":{"cluster_id":0,"datanode_id":1,"table_id":1,"region_number":1,"engine":"mito2"},"state":{"region_failover_state":"RegionFailoverStart","failover_candidate":null}}"#
r#"{"lock_meta":{"catalog":"greptime","schema":"public"},"failed_region":{"cluster_id":0,"datanode_id":1,"table_id":1,"region_number":1,"engine":"mito2"},"state":{"region_failover_state":"RegionFailoverStart","failover_candidate":null}}"#,
);
let n: Node = serde_json::from_str(&s).unwrap();
assert_eq!(
format!("{n:?}"),
r#"Node { failed_region: RegionIdent { cluster_id: 0, datanode_id: 1, table_id: 1, region_number: 1, engine: "mito2" }, state: RegionFailoverStart { failover_candidate: None } }"#
r#"Node { lock_meta: LockMeta { catalog: "greptime", schema: "public" }, failed_region: RegionIdent { cluster_id: 0, datanode_id: 1, table_id: 1, region_number: 1, engine: "mito2" }, state: RegionFailoverStart { failover_candidate: None } }"#,
);
}
@@ -765,6 +791,10 @@ mod tests {
let state = RegionFailoverStart::new();
let node = Node {
lock_meta: LockMeta {
catalog: "greptime".into(),
schema: "public".into(),
},
failed_region,
state: Box::new(state),
};

View File

@@ -13,8 +13,6 @@
// limitations under the License.
pub(crate) mod downgrade_leader_region;
// TODO(weny): remove it.
#[allow(dead_code)]
pub(crate) mod manager;
pub(crate) mod migration_abort;
pub(crate) mod migration_end;
@@ -36,7 +34,7 @@ use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue};
use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use common_meta::lock_key::{RegionLock, TableLock};
use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock, TableLock};
use common_meta::peer::Peer;
use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard};
use common_meta::ClusterId;
@@ -61,6 +59,10 @@ use crate::service::mailbox::{BroadcastChannel, MailboxRef};
/// **Notes: Stores with too large data in the context might incur replication overhead.**
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PersistentContext {
/// The table catalog.
catalog: String,
/// The table schema.
schema: String,
/// The Id of the cluster.
cluster_id: ClusterId,
/// The [Peer] of migration source.
@@ -81,8 +83,9 @@ fn default_replay_timeout() -> Duration {
impl PersistentContext {
pub fn lock_key(&self) -> Vec<StringKey> {
let region_id = self.region_id;
// TODO(weny): acquires the catalog, schema read locks.
let lock_key = vec![
CatalogLock::Read(&self.catalog).into(),
SchemaLock::read(&self.catalog, &self.schema).into(),
TableLock::Read(region_id.table_id()).into(),
RegionLock::Write(region_id).into(),
];
@@ -185,8 +188,6 @@ impl ContextFactory for DefaultContextFactory {
}
}
// TODO(weny): remove it.
#[allow(dead_code)]
/// The context of procedure execution.
pub struct Context {
persistent_ctx: PersistentContext,
@@ -368,7 +369,6 @@ pub struct RegionMigrationProcedure {
context: Context,
}
// TODO(weny): remove it.
#[allow(dead_code)]
impl RegionMigrationProcedure {
const TYPE_NAME: &'static str = "metasrv-procedure::RegionMigration";
@@ -487,8 +487,7 @@ mod tests {
let procedure = RegionMigrationProcedure::new(persistent_context, context);
let serialized = procedure.dump().unwrap();
let expected = r#"{"persistent_ctx":{"cluster_id":0,"from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105,"replay_timeout":"1s"},"state":{"region_migration_state":"RegionMigrationStart"}}"#;
let expected = r#"{"persistent_ctx":{"catalog":"greptime","schema":"public","cluster_id":0,"from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105,"replay_timeout":"1s"},"state":{"region_migration_state":"RegionMigrationStart"}}"#;
assert_eq!(expected, serialized);
}
@@ -496,7 +495,7 @@ mod tests {
fn test_backward_compatibility() {
let persistent_ctx = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
// NOTES: Changes it will break backward compatibility.
let serialized = r#"{"cluster_id":0,"from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105}"#;
let serialized = r#"{"catalog":"greptime","schema":"public","cluster_id":0,"from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105}"#;
let deserialized: PersistentContext = serde_json::from_str(serialized).unwrap();
assert_eq!(persistent_ctx, deserialized);

View File

@@ -226,6 +226,8 @@ mod tests {
fn new_persistent_context() -> PersistentContext {
PersistentContext {
catalog: "greptime".into(),
schema: "public".into(),
from_peer: Peer::empty(1),
to_peer: Peer::empty(2),
region_id: RegionId::new(1024, 1),

View File

@@ -18,9 +18,11 @@ use std::fmt::Display;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_route::TableRouteValue;
use common_meta::peer::Peer;
use common_meta::rpc::router::RegionRoute;
use common_meta::table_name::TableName;
use common_meta::ClusterId;
use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId};
use common_telemetry::{error, info};
@@ -93,26 +95,6 @@ impl Display for RegionMigrationProcedureTask {
}
}
impl From<RegionMigrationProcedureTask> for PersistentContext {
fn from(
RegionMigrationProcedureTask {
cluster_id,
region_id,
from_peer,
to_peer,
replay_timeout,
}: RegionMigrationProcedureTask,
) -> Self {
PersistentContext {
cluster_id,
from_peer,
to_peer,
region_id,
replay_timeout,
}
}
}
impl RegionMigrationManager {
/// Returns new [RegionMigrationManager]
pub(crate) fn new(
@@ -188,6 +170,22 @@ impl RegionMigrationManager {
Ok(table_route)
}
async fn retrieve_table_info(&self, region_id: RegionId) -> Result<TableInfoValue> {
let table_route = self
.context_factory
.table_metadata_manager
.table_info_manager()
.get(region_id.table_id())
.await
.context(error::TableMetadataManagerSnafu)?
.context(error::TableInfoNotFoundSnafu {
table_id: region_id.table_id(),
})?
.into_inner();
Ok(table_route)
}
/// Verifies the type of region migration table route.
fn verify_table_route(
&self,
@@ -279,8 +277,31 @@ impl RegionMigrationManager {
self.verify_region_leader_peer(&region_route, &task)?;
let procedure =
RegionMigrationProcedure::new(task.clone().into(), self.context_factory.clone());
let table_info = self.retrieve_table_info(region_id).await?;
let TableName {
catalog_name,
schema_name,
..
} = table_info.table_name();
let RegionMigrationProcedureTask {
cluster_id,
region_id,
from_peer,
to_peer,
replay_timeout,
} = task.clone();
let procedure = RegionMigrationProcedure::new(
PersistentContext {
catalog: catalog_name,
schema: schema_name,
cluster_id,
region_id,
from_peer,
to_peer,
replay_timeout,
},
self.context_factory.clone(),
);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
let procedure_id = procedure_with_id.id;
info!("Starting region migration procedure {procedure_id} for {task}");

View File

@@ -278,6 +278,8 @@ pub fn send_mock_reply(
/// Generates a [PersistentContext].
pub fn new_persistent_context(from: u64, to: u64, region_id: RegionId) -> PersistentContext {
PersistentContext {
catalog: "greptime".into(),
schema: "public".into(),
from_peer: Peer::empty(from),
to_peer: Peer::empty(to),
region_id,
@@ -297,16 +299,6 @@ pub(crate) struct ProcedureMigrationTestSuite {
pub(crate) type BeforeTest =
Arc<dyn Fn(&mut ProcedureMigrationTestSuite) -> BoxFuture<'_, ()> + Send + Sync>;
/// Custom assertion.
pub(crate) type CustomAssertion = Arc<
dyn Fn(
&mut ProcedureMigrationTestSuite,
Result<(Box<dyn State>, Status)>,
) -> BoxFuture<'_, Result<()>>
+ Send
+ Sync,
>;
/// State assertion function.
pub(crate) type StateAssertion = Arc<dyn Fn(&dyn State) + Send + Sync>;
@@ -316,14 +308,11 @@ pub(crate) type StatusAssertion = Arc<dyn Fn(Status) + Send + Sync>;
/// Error assertion function.
pub(crate) type ErrorAssertion = Arc<dyn Fn(Error) + Send + Sync>;
// TODO(weny): Remove it.
#[allow(dead_code)]
/// The type of assertion.
#[derive(Clone)]
pub(crate) enum Assertion {
Simple(StateAssertion, StatusAssertion),
Error(ErrorAssertion),
Custom(CustomAssertion),
}
impl Assertion {
@@ -384,9 +373,6 @@ impl ProcedureMigrationTestSuite {
let error = result.unwrap_err();
error_assert(error);
}
Assertion::Custom(assert_fn) => {
assert_fn(self, result).await?;
}
}
Ok(())

View File

@@ -232,6 +232,8 @@ mod tests {
fn new_persistent_context() -> PersistentContext {
PersistentContext {
catalog: "greptime".into(),
schema: "public".into(),
from_peer: Peer::empty(1),
to_peer: Peer::empty(2),
region_id: RegionId::new(1024, 1),

View File

@@ -346,6 +346,8 @@ async fn run_region_failover_procedure(
let meta_srv = &cluster.meta_srv;
let procedure_manager = meta_srv.procedure_manager();
let procedure = RegionFailoverProcedure::new(
"greptime".into(),
"public".into(),
failed_region.clone(),
RegionFailoverContext {
region_lease_secs: 10,