refactor: refactor the locks in the procedure (#3126)

* feat: add lock key

* refactor: procedure lock keys

* chore: apply suggestions from CR
This commit is contained in:
Weny Xu
2024-01-10 18:46:39 +09:00
committed by GitHub
parent 490312bf57
commit ec8266b969
12 changed files with 333 additions and 78 deletions

View File

@@ -17,6 +17,11 @@ use consts::DEFAULT_CATALOG_NAME;
pub mod consts;
pub mod error;
#[inline]
pub fn format_schema_name(catalog: &str, schema: &str) -> String {
format!("{catalog}.{schema}")
}
/// Formats table fully-qualified name
#[inline]
pub fn format_full_table_name(catalog: &str, schema: &str, table: &str) -> String {

View File

@@ -24,7 +24,7 @@ use async_trait::async_trait;
use common_grpc_expr::alter_expr_to_request;
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, Status,
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, Status, StringKey,
};
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{debug, info};
@@ -44,6 +44,7 @@ use crate::error::{self, ConvertAlterTableRequestSnafu, InvalidProtoMsgSnafu, Re
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::DeserializedValueWithBytes;
use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
use crate::metrics;
use crate::rpc::ddl::AlterTableTask;
use crate::rpc::router::{find_leader_regions, find_leaders};
@@ -63,7 +64,7 @@ impl AlterTableProcedure {
cluster_id: u64,
task: AlterTableTask,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
physical_table_name: Option<TableName>,
physical_table_info: Option<(TableId, TableName)>,
context: DdlContext,
) -> Result<Self> {
let alter_kind = task
@@ -86,7 +87,7 @@ impl AlterTableProcedure {
data: AlterTableData::new(
task,
table_info_value,
physical_table_name,
physical_table_info,
cluster_id,
next_column_id,
),
@@ -335,32 +336,31 @@ impl AlterTableProcedure {
Ok(Status::Done)
}
fn lock_key_inner(&self) -> Vec<String> {
fn lock_key_inner(&self) -> Vec<StringKey> {
let mut lock_key = vec![];
if let Some(physical_table_name) = self.data.physical_table_name() {
let physical_table_key = common_catalog::format_full_table_name(
&physical_table_name.catalog_name,
&physical_table_name.schema_name,
&physical_table_name.table_name,
if let Some((physical_table_id, physical_table_name)) = self.data.physical_table_info() {
lock_key.push(CatalogLock::Read(&physical_table_name.catalog_name).into());
lock_key.push(
SchemaLock::read(
&physical_table_name.catalog_name,
&physical_table_name.schema_name,
)
.into(),
);
lock_key.push(physical_table_key);
lock_key.push(TableLock::Read(*physical_table_id).into())
}
let table_ref = self.data.table_ref();
let table_key = common_catalog::format_full_table_name(
table_ref.catalog,
table_ref.schema,
table_ref.table,
);
lock_key.push(table_key);
let table_id = self.data.table_id();
lock_key.push(CatalogLock::Read(table_ref.catalog).into());
lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
lock_key.push(TableLock::Write(table_id).into());
if let Ok(Kind::RenameTable(RenameTable { new_table_name })) = self.alter_kind() {
lock_key.push(common_catalog::format_full_table_name(
table_ref.catalog,
table_ref.schema,
new_table_name,
))
lock_key.push(
TableNameLock::new(table_ref.catalog, table_ref.schema, new_table_name).into(),
)
}
lock_key
@@ -406,7 +406,7 @@ impl Procedure for AlterTableProcedure {
fn lock_key(&self) -> LockKey {
let key = self.lock_key_inner();
LockKey::new_exclusive(key)
LockKey::new(key)
}
}
@@ -423,13 +423,13 @@ enum AlterTableState {
#[derive(Debug, Serialize, Deserialize)]
pub struct AlterTableData {
cluster_id: u64,
state: AlterTableState,
task: AlterTableTask,
/// Table info value before alteration.
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
/// Physical table name, if the table to alter is a logical table.
physical_table_name: Option<TableName>,
cluster_id: u64,
physical_table_info: Option<(TableId, TableName)>,
/// Next column id of the table if the task adds columns to the table.
next_column_id: Option<ColumnId>,
}
@@ -438,7 +438,7 @@ impl AlterTableData {
pub fn new(
task: AlterTableTask,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
physical_table_name: Option<TableName>,
physical_table_info: Option<(TableId, TableName)>,
cluster_id: u64,
next_column_id: Option<ColumnId>,
) -> Self {
@@ -446,7 +446,7 @@ impl AlterTableData {
state: AlterTableState::Prepare,
task,
table_info_value,
physical_table_name,
physical_table_info,
cluster_id,
next_column_id,
}
@@ -464,8 +464,8 @@ impl AlterTableData {
&self.table_info_value.table_info
}
fn physical_table_name(&self) -> Option<&TableName> {
self.physical_table_name.as_ref()
fn physical_table_info(&self) -> Option<&(TableId, TableName)> {
self.physical_table_info.as_ref()
}
}

View File

@@ -41,6 +41,7 @@ use crate::ddl::DdlContext;
use crate::error::{self, Result, TableRouteNotFoundSnafu};
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::lock_key::TableNameLock;
use crate::metrics;
use crate::region_keeper::OperatingRegionGuard;
use crate::rpc::ddl::CreateTableTask;
@@ -343,13 +344,12 @@ impl Procedure for CreateTableProcedure {
fn lock_key(&self) -> LockKey {
let table_ref = &self.creator.data.table_ref();
let key = common_catalog::format_full_table_name(
LockKey::single(TableNameLock::new(
table_ref.catalog,
table_ref.schema,
table_ref.table,
);
LockKey::single_exclusive(key)
))
}
}

View File

@@ -41,6 +41,7 @@ use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::key::DeserializedValueWithBytes;
use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
use crate::metrics;
use crate::region_keeper::OperatingRegionGuard;
use crate::rpc::ddl::DropTableTask;
@@ -267,13 +268,14 @@ impl Procedure for DropTableProcedure {
fn lock_key(&self) -> LockKey {
let table_ref = &self.data.table_ref();
let key = common_catalog::format_full_table_name(
table_ref.catalog,
table_ref.schema,
table_ref.table,
);
let table_id = self.data.table_id();
let lock_key = vec![
CatalogLock::Read(table_ref.catalog).into(),
SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
TableLock::Write(table_id).into(),
];
LockKey::single_exclusive(key)
LockKey::new(lock_key)
}
}

View File

@@ -37,6 +37,7 @@ use crate::error::{Result, TableNotFoundSnafu};
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::DeserializedValueWithBytes;
use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
use crate::metrics;
use crate::rpc::ddl::TruncateTableTask;
use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute};
@@ -75,13 +76,14 @@ impl Procedure for TruncateTableProcedure {
fn lock_key(&self) -> LockKey {
let table_ref = &self.data.table_ref();
let key = common_catalog::format_full_table_name(
table_ref.catalog,
table_ref.schema,
table_ref.table,
);
let table_id = self.data.table_id();
let lock_key = vec![
CatalogLock::Read(table_ref.catalog).into(),
SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
TableLock::Write(table_id).into(),
];
LockKey::single_exclusive(key)
LockKey::new(lock_key)
}
}

View File

@@ -19,7 +19,7 @@ use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithI
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::{info, tracing};
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionNumber;
use store_api::storage::{RegionNumber, TableId};
use crate::cache_invalidator::CacheInvalidatorRef;
use crate::datanode_manager::DatanodeManagerRef;
@@ -162,7 +162,7 @@ impl DdlManager {
cluster_id: u64,
alter_table_task: AlterTableTask,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
physical_table_name: Option<TableName>,
physical_table_info: Option<(TableId, TableName)>,
) -> Result<ProcedureId> {
let context = self.create_context();
@@ -170,7 +170,7 @@ impl DdlManager {
cluster_id,
alter_table_task,
table_info_value,
physical_table_name,
physical_table_info,
context,
)?;
@@ -341,7 +341,7 @@ async fn handle_alter_table_task(
.get_physical_table_id(table_id)
.await?;
let physical_table_name = if physical_table_id == table_id {
let physical_table_info = if physical_table_id == table_id {
None
} else {
let physical_table_info = &ddl_manager
@@ -353,11 +353,14 @@ async fn handle_alter_table_task(
table_name: table_ref.to_string(),
})?
.table_info;
Some(TableName {
catalog_name: physical_table_info.catalog_name.clone(),
schema_name: physical_table_info.schema_name.clone(),
table_name: physical_table_info.name.clone(),
})
Some((
physical_table_id,
TableName {
catalog_name: physical_table_info.catalog_name.clone(),
schema_name: physical_table_info.schema_name.clone(),
table_name: physical_table_info.name.clone(),
},
))
};
let id = ddl_manager
@@ -365,7 +368,7 @@ async fn handle_alter_table_task(
cluster_id,
alter_table_task,
table_info_value,
physical_table_name,
physical_table_info,
)
.await?;

View File

@@ -27,6 +27,7 @@ pub mod heartbeat;
pub mod instruction;
pub mod key;
pub mod kv_backend;
pub mod lock_key;
pub mod metrics;
pub mod peer;
pub mod range_stream;

View File

@@ -0,0 +1,235 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Display;
use common_catalog::{format_full_table_name, format_schema_name};
use common_procedure::StringKey;
use store_api::storage::{RegionId, TableId};
const CATALOG_LOCK_PREFIX: &str = "__catalog_lock";
const SCHEMA_LOCK_PREFIX: &str = "__schema_lock";
const TABLE_LOCK_PREFIX: &str = "__table_lock";
const TABLE_NAME_LOCK_PREFIX: &str = "__table_name_lock";
const REGION_LOCK_PREFIX: &str = "__region_lock";
/// [CatalogLock] acquires the lock on the tenant level.
pub enum CatalogLock<'a> {
Read(&'a str),
Write(&'a str),
}
impl<'a> Display for CatalogLock<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let key = match self {
CatalogLock::Read(s) => s,
CatalogLock::Write(s) => s,
};
write!(f, "{}/{}", CATALOG_LOCK_PREFIX, key)
}
}
impl<'a> From<CatalogLock<'a>> for StringKey {
fn from(value: CatalogLock) -> Self {
match value {
CatalogLock::Write(_) => StringKey::Exclusive(value.to_string()),
CatalogLock::Read(_) => StringKey::Share(value.to_string()),
}
}
}
/// [SchemaLock] acquires the lock on the database level.
pub enum SchemaLock {
Read(String),
Write(String),
}
impl SchemaLock {
pub fn read(catalog: &str, schema: &str) -> Self {
Self::Read(format_schema_name(catalog, schema))
}
pub fn write(catalog: &str, schema: &str) -> Self {
Self::Write(format_schema_name(catalog, schema))
}
}
impl Display for SchemaLock {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let key = match self {
SchemaLock::Read(s) => s,
SchemaLock::Write(s) => s,
};
write!(f, "{}/{}", SCHEMA_LOCK_PREFIX, key)
}
}
impl From<SchemaLock> for StringKey {
fn from(value: SchemaLock) -> Self {
match value {
SchemaLock::Write(_) => StringKey::Exclusive(value.to_string()),
SchemaLock::Read(_) => StringKey::Share(value.to_string()),
}
}
}
/// [TableNameLock] prevents any procedures trying to create a table named it.
pub enum TableNameLock {
Write(String),
}
impl TableNameLock {
pub fn new(catalog: &str, schema: &str, table: &str) -> Self {
Self::Write(format_full_table_name(catalog, schema, table))
}
}
impl Display for TableNameLock {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let TableNameLock::Write(name) = self;
write!(f, "{}/{}", TABLE_NAME_LOCK_PREFIX, name)
}
}
impl From<TableNameLock> for StringKey {
fn from(value: TableNameLock) -> Self {
match value {
TableNameLock::Write(_) => StringKey::Exclusive(value.to_string()),
}
}
}
/// [TableLock] acquires the lock on the table level.
///
/// Note: Allows to read/modify the corresponding table's [TableInfoValue](crate::key::table_info::TableInfoValue),
/// [TableRouteValue](crate::key::table_route::TableRouteValue), [TableDatanodeValue](crate::key::datanode_table::DatanodeTableValue).
pub enum TableLock {
Read(TableId),
Write(TableId),
}
impl Display for TableLock {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let key = match self {
TableLock::Read(s) => s,
TableLock::Write(s) => s,
};
write!(f, "{}/{}", TABLE_LOCK_PREFIX, key)
}
}
impl From<TableLock> for StringKey {
fn from(value: TableLock) -> Self {
match value {
TableLock::Write(_) => StringKey::Exclusive(value.to_string()),
TableLock::Read(_) => StringKey::Share(value.to_string()),
}
}
}
/// [RegionLock] acquires the lock on the region level.
///
/// Note:
/// - Allows modification the corresponding region's [TableRouteValue](crate::key::table_route::TableRouteValue),
/// [TableDatanodeValue](crate::key::datanode_table::DatanodeTableValue) even if
/// it acquires the [RegionLock::Write] only without acquiring the [TableLock::Write].
///
/// - Should acquire [TableLock] of the table at same procedure.
///
/// TODO(weny): we should consider separating TableRouteValue into finer keys.
pub enum RegionLock {
Read(RegionId),
Write(RegionId),
}
impl Display for RegionLock {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let key = match self {
RegionLock::Read(s) => s.as_u64(),
RegionLock::Write(s) => s.as_u64(),
};
write!(f, "{}/{}", REGION_LOCK_PREFIX, key)
}
}
impl From<RegionLock> for StringKey {
fn from(value: RegionLock) -> Self {
match value {
RegionLock::Write(_) => StringKey::Exclusive(value.to_string()),
RegionLock::Read(_) => StringKey::Share(value.to_string()),
}
}
}
#[cfg(test)]
mod tests {
use common_procedure::StringKey;
use crate::lock_key::*;
#[test]
fn test_lock_key() {
// The catalog lock
let string_key: StringKey = CatalogLock::Read("foo").into();
assert_eq!(
string_key,
StringKey::Share(format!("{}/{}", CATALOG_LOCK_PREFIX, "foo"))
);
let string_key: StringKey = CatalogLock::Write("foo").into();
assert_eq!(
string_key,
StringKey::Exclusive(format!("{}/{}", CATALOG_LOCK_PREFIX, "foo"))
);
// The schema lock
let string_key: StringKey = SchemaLock::read("foo", "bar").into();
assert_eq!(
string_key,
StringKey::Share(format!("{}/{}", SCHEMA_LOCK_PREFIX, "foo.bar"))
);
let string_key: StringKey = SchemaLock::write("foo", "bar").into();
assert_eq!(
string_key,
StringKey::Exclusive(format!("{}/{}", SCHEMA_LOCK_PREFIX, "foo.bar"))
);
// The table lock
let string_key: StringKey = TableLock::Read(1024).into();
assert_eq!(
string_key,
StringKey::Share(format!("{}/{}", TABLE_LOCK_PREFIX, 1024))
);
let string_key: StringKey = TableLock::Write(1024).into();
assert_eq!(
string_key,
StringKey::Exclusive(format!("{}/{}", TABLE_LOCK_PREFIX, 1024))
);
// The table name lock
let string_key: StringKey = TableNameLock::new("foo", "bar", "baz").into();
assert_eq!(
string_key,
StringKey::Exclusive(format!("{}/{}", TABLE_NAME_LOCK_PREFIX, "foo.bar.baz"))
);
// The region lock
let region_id = RegionId::new(1024, 1);
let string_key: StringKey = RegionLock::Read(region_id).into();
assert_eq!(
string_key,
StringKey::Share(format!("{}/{}", REGION_LOCK_PREFIX, region_id.as_u64()))
);
let string_key: StringKey = RegionLock::Write(region_id).into();
assert_eq!(
string_key,
StringKey::Exclusive(format!("{}/{}", REGION_LOCK_PREFIX, region_id.as_u64()))
);
}
}

View File

@@ -26,6 +26,6 @@ pub mod watcher;
pub use crate::error::{Error, Result};
pub use crate::procedure::{
BoxedProcedure, Context, ContextProvider, LockKey, Procedure, ProcedureId, ProcedureManager,
ProcedureManagerRef, ProcedureState, ProcedureWithId, Status,
ProcedureManagerRef, ProcedureState, ProcedureWithId, Status, StringKey,
};
pub use crate::watcher::Watcher;

View File

@@ -28,6 +28,7 @@ use async_trait::async_trait;
use common_meta::key::datanode_table::DatanodeTableKey;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::ResettableKvBackendRef;
use common_meta::lock_key::{RegionLock, TableLock};
use common_meta::{ClusterId, RegionIdent};
use common_procedure::error::{
Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
@@ -40,13 +41,12 @@ use common_telemetry::{error, info, warn};
use failover_start::RegionFailoverStart;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::storage::RegionNumber;
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::TableId;
use crate::error::{Error, RegisterProcedureLoaderSnafu, Result, TableMetadataManagerSnafu};
use crate::lock::DistLockRef;
use crate::metasrv::{SelectorContext, SelectorRef};
use crate::procedure::utils::region_lock_key;
use crate::service::mailbox::MailboxRef;
const OPEN_REGION_MESSAGE_TIMEOUT: Duration = Duration::from_secs(30);
@@ -372,8 +372,17 @@ impl Procedure for RegionFailoverProcedure {
fn lock_key(&self) -> LockKey {
let region_ident = &self.node.failed_region;
let region_key = region_lock_key(region_ident.table_id, region_ident.region_number);
LockKey::single_exclusive(region_key)
// TODO(weny): acquires the catalog, schema read locks.
let lock_key = vec![
TableLock::Read(region_ident.table_id).into(),
RegionLock::Write(RegionId::new(
region_ident.table_id,
region_ident.region_number,
))
.into(),
];
LockKey::new(lock_key)
}
}

View File

@@ -35,13 +35,14 @@ use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue};
use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use common_meta::lock_key::{RegionLock, TableLock};
use common_meta::peer::Peer;
use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard};
use common_meta::ClusterId;
use common_procedure::error::{
Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status, StringKey};
pub use manager::RegionMigrationProcedureTask;
use serde::{Deserialize, Serialize};
use snafu::{location, Location, OptionExt, ResultExt};
@@ -50,7 +51,6 @@ use tokio::time::Instant;
use self::migration_start::RegionMigrationStart;
use crate::error::{self, Error, Result};
use crate::procedure::utils::region_lock_key;
use crate::service::mailbox::{BroadcastChannel, MailboxRef};
/// It's shared in each step and available even after recovering.
@@ -71,8 +71,15 @@ pub struct PersistentContext {
}
impl PersistentContext {
pub fn lock_key(&self) -> String {
region_lock_key(self.region_id.table_id(), self.region_id.region_number())
pub fn lock_key(&self) -> Vec<StringKey> {
let region_id = self.region_id;
// TODO(weny): acquires the catalog, schema read locks.
let lock_key = vec![
TableLock::Read(region_id.table_id()).into(),
RegionLock::Write(region_id).into(),
];
lock_key
}
}
@@ -418,8 +425,7 @@ impl Procedure for RegionMigrationProcedure {
}
fn lock_key(&self) -> LockKey {
let key = self.context.persistent_ctx.lock_key();
LockKey::single_exclusive(key)
LockKey::new(self.context.persistent_ctx.lock_key())
}
}
@@ -447,7 +453,7 @@ mod tests {
#[test]
fn test_lock_key() {
let persistent_context = new_persistent_context();
let expected_key = persistent_context.lock_key();
let expected_keys = persistent_context.lock_key();
let env = TestingEnv::new();
let context = env.context_factory();
@@ -455,13 +461,11 @@ mod tests {
let procedure = RegionMigrationProcedure::new(persistent_context, context);
let key = procedure.lock_key();
let keys = key
.keys_to_lock()
.cloned()
.map(|s| s.into_string())
.collect::<Vec<_>>();
let keys = key.keys_to_lock().cloned().collect::<Vec<_>>();
assert!(keys.contains(&expected_key));
for key in expected_keys {
assert!(keys.contains(&key));
}
}
#[test]

View File

@@ -12,12 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use store_api::storage::{RegionNumber, TableId};
pub fn region_lock_key(table_id: TableId, region_number: RegionNumber) -> String {
format!("{}/region-{}", table_id, region_number)
}
#[cfg(feature = "mock")]
pub mod mock {
use std::io::Error;