refactor: allow procedure to acquire share lock (#3061)

* feat: implement `KeyRwLock`

* refactor: use KeyRwLock instead of LockMap

* refactor: use StringKey instead of String

* chore: remove redundant code

* refactor: cleanup KeyRwLock staled locks before granting new lock

* feat: clean staled locks manually

* feat: sort lock key in lexicographically order

* feat: ensure the ref count before dropping the rwlock

* feat: add more tests for rwlock

* feat: drop the key guards first

* feat: drops the key guards in the reverse order

* chore: apply suggestions from CR

* chore: apply suggestions from CR

* chore: apply suggestions from CR
This commit is contained in:
Weny Xu
2024-01-03 17:05:45 +09:00
committed by GitHub
parent 611a8aa2fe
commit aa22f9c94a
12 changed files with 354 additions and 273 deletions

View File

@@ -394,7 +394,7 @@ impl Procedure for AlterTableProcedure {
fn lock_key(&self) -> LockKey {
let key = self.lock_key_inner();
LockKey::new(key)
LockKey::new_exclusive(key)
}
}

View File

@@ -349,7 +349,7 @@ impl Procedure for CreateTableProcedure {
table_ref.table,
);
LockKey::single(key)
LockKey::single_exclusive(key)
}
}

View File

@@ -273,7 +273,7 @@ impl Procedure for DropTableProcedure {
table_ref.table,
);
LockKey::single(key)
LockKey::single_exclusive(key)
}
}

View File

@@ -81,7 +81,7 @@ impl Procedure for TruncateTableProcedure {
table_ref.table,
);
LockKey::single(key)
LockKey::single_exclusive(key)
}
}

View File

@@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod lock;
mod runner;
mod rwlock;
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicBool, Ordering};
@@ -29,11 +29,11 @@ use snafu::{ensure, ResultExt};
use tokio::sync::watch::{self, Receiver, Sender};
use tokio::sync::{Mutex as TokioMutex, Notify};
use self::rwlock::KeyRwLock;
use crate::error::{
DuplicateProcedureSnafu, Error, LoaderConflictSnafu, ManagerNotStartSnafu, Result,
StartRemoveOutdatedMetaTaskSnafu, StopRemoveOutdatedMetaTaskSnafu,
};
use crate::local::lock::LockMap;
use crate::local::runner::Runner;
use crate::procedure::BoxedProcedureLoader;
use crate::store::{ProcedureMessage, ProcedureStore, StateStoreRef};
@@ -57,8 +57,6 @@ const META_TTL: Duration = Duration::from_secs(60 * 10);
pub(crate) struct ProcedureMeta {
/// Id of this procedure.
id: ProcedureId,
/// Notify to wait for a lock.
lock_notify: Notify,
/// Parent procedure id.
parent_id: Option<ProcedureId>,
/// Notify to wait for subprocedures.
@@ -78,7 +76,6 @@ impl ProcedureMeta {
let (state_sender, state_receiver) = watch::channel(ProcedureState::Running);
ProcedureMeta {
id,
lock_notify: Notify::new(),
parent_id,
child_notify: Notify::new(),
lock_key,
@@ -131,7 +128,7 @@ struct LoadedProcedure {
pub(crate) struct ManagerContext {
/// Procedure loaders. The key is the type name of the procedure which the loader returns.
loaders: Mutex<HashMap<String, BoxedProcedureLoader>>,
lock_map: LockMap,
key_lock: KeyRwLock<String>,
procedures: RwLock<HashMap<ProcedureId, ProcedureMetaRef>>,
/// Messages loaded from the procedure store.
messages: Mutex<HashMap<ProcedureId, ProcedureMessage>>,
@@ -152,8 +149,8 @@ impl ManagerContext {
/// Returns a new [ManagerContext].
fn new() -> ManagerContext {
ManagerContext {
key_lock: KeyRwLock::new(),
loaders: Mutex::new(HashMap::new()),
lock_map: LockMap::new(),
procedures: RwLock::new(HashMap::new()),
messages: Mutex::new(HashMap::new()),
finished_procedures: Mutex::new(VecDeque::new()),
@@ -850,7 +847,7 @@ mod tests {
assert!(manager.procedure_watcher(procedure_id).is_none());
let mut procedure = ProcedureToLoad::new("submit");
procedure.lock_key = LockKey::single("test.submit");
procedure.lock_key = LockKey::single_exclusive("test.submit");
assert!(manager
.submit(ProcedureWithId {
id: procedure_id,
@@ -918,7 +915,7 @@ mod tests {
}
fn lock_key(&self) -> LockKey {
LockKey::single("test.submit")
LockKey::single_exclusive("test.submit")
}
}
@@ -955,7 +952,7 @@ mod tests {
let manager = LocalManager::new(config, state_store);
let mut procedure = ProcedureToLoad::new("submit");
procedure.lock_key = LockKey::single("test.submit");
procedure.lock_key = LockKey::single_exclusive("test.submit");
let procedure_id = ProcedureId::random();
assert_matches!(
manager
@@ -986,7 +983,7 @@ mod tests {
manager.start().await.unwrap();
let mut procedure = ProcedureToLoad::new("submit");
procedure.lock_key = LockKey::single("test.submit");
procedure.lock_key = LockKey::single_exclusive("test.submit");
let procedure_id = ProcedureId::random();
assert!(manager
.submit(ProcedureWithId {
@@ -1018,7 +1015,7 @@ mod tests {
manager.manager_ctx.set_running();
let mut procedure = ProcedureToLoad::new("submit");
procedure.lock_key = LockKey::single("test.submit");
procedure.lock_key = LockKey::single_exclusive("test.submit");
let procedure_id = ProcedureId::random();
assert!(manager
.submit(ProcedureWithId {
@@ -1041,7 +1038,7 @@ mod tests {
// The remove_outdated_meta method has been stopped, so any procedure meta-data will not be automatically removed.
manager.stop().await.unwrap();
let mut procedure = ProcedureToLoad::new("submit");
procedure.lock_key = LockKey::single("test.submit");
procedure.lock_key = LockKey::single_exclusive("test.submit");
let procedure_id = ProcedureId::random();
manager.manager_ctx.set_running();
@@ -1063,7 +1060,7 @@ mod tests {
// After restart
let mut procedure = ProcedureToLoad::new("submit");
procedure.lock_key = LockKey::single("test.submit");
procedure.lock_key = LockKey::single_exclusive("test.submit");
let procedure_id = ProcedureId::random();
assert!(manager
.submit(ProcedureWithId {

View File

@@ -1,214 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{HashMap, VecDeque};
use std::sync::RwLock;
use crate::local::ProcedureMetaRef;
use crate::ProcedureId;
/// A lock entry.
#[derive(Debug)]
struct Lock {
/// Current lock owner.
owner: ProcedureMetaRef,
/// Waiter procedures.
waiters: VecDeque<ProcedureMetaRef>,
}
impl Lock {
/// Returns a [Lock] with specific `owner` procedure.
fn from_owner(owner: ProcedureMetaRef) -> Lock {
Lock {
owner,
waiters: VecDeque::new(),
}
}
/// Try to pop a waiter from the waiter list, set it as owner
/// and wake up the new owner.
///
/// Returns false if there is no waiter in the waiter list.
fn switch_owner(&mut self) -> bool {
if let Some(waiter) = self.waiters.pop_front() {
// Update owner.
self.owner = waiter.clone();
// We need to use notify_one() since the waiter may have not called `notified()` yet.
waiter.lock_notify.notify_one();
true
} else {
false
}
}
}
/// Manages lock entries for procedures.
pub(crate) struct LockMap {
locks: RwLock<HashMap<String, Lock>>,
}
impl LockMap {
/// Returns a new [LockMap].
pub(crate) fn new() -> LockMap {
LockMap {
locks: RwLock::new(HashMap::new()),
}
}
/// Acquire lock by `key` for procedure with specific `meta`.
///
/// Though `meta` is cloneable, callers must ensure that only one `meta`
/// is acquiring and holding the lock at the same time.
///
/// # Panics
/// Panics if the procedure acquires the lock recursively.
pub(crate) async fn acquire_lock(&self, key: &str, meta: ProcedureMetaRef) {
assert!(!self.hold_lock(key, meta.id));
{
let mut locks = self.locks.write().unwrap();
if let Some(lock) = locks.get_mut(key) {
// Lock already exists, but we don't expect that a procedure acquires
// the same lock again.
assert_ne!(lock.owner.id, meta.id);
// Add this procedure to the waiter list. Here we don't check
// whether the procedure is already in the waiter list as we
// expect that a procedure should not wait for two lock simultaneously.
lock.waiters.push_back(meta.clone());
} else {
let _ = locks.insert(key.to_string(), Lock::from_owner(meta));
return;
}
}
// Wait for notify.
meta.lock_notify.notified().await;
assert!(self.hold_lock(key, meta.id));
}
/// Release lock by `key`.
pub(crate) fn release_lock(&self, key: &str, procedure_id: ProcedureId) {
let mut locks = self.locks.write().unwrap();
if let Some(lock) = locks.get_mut(key) {
if lock.owner.id != procedure_id {
// This is not the lock owner.
return;
}
if !lock.switch_owner() {
// No body waits for this lock, we can remove the lock entry.
let _ = locks.remove(key);
}
}
}
/// Returns true if the procedure with specific `procedure_id` holds the
/// lock of `key`.
fn hold_lock(&self, key: &str, procedure_id: ProcedureId) -> bool {
let locks = self.locks.read().unwrap();
locks
.get(key)
.map(|lock| lock.owner.id == procedure_id)
.unwrap_or(false)
}
/// Returns true if the procedure is waiting for the lock `key`.
#[cfg(test)]
fn waiting_lock(&self, key: &str, procedure_id: ProcedureId) -> bool {
let locks = self.locks.read().unwrap();
locks
.get(key)
.map(|lock| lock.waiters.iter().any(|meta| meta.id == procedure_id))
.unwrap_or(false)
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use super::*;
use crate::local::test_util;
#[test]
fn test_lock_no_waiter() {
let meta = Arc::new(test_util::procedure_meta_for_test());
let mut lock = Lock::from_owner(meta);
assert!(!lock.switch_owner());
}
#[tokio::test]
async fn test_lock_with_waiter() {
let owner = Arc::new(test_util::procedure_meta_for_test());
let mut lock = Lock::from_owner(owner);
let waiter = Arc::new(test_util::procedure_meta_for_test());
lock.waiters.push_back(waiter.clone());
assert!(lock.switch_owner());
assert!(lock.waiters.is_empty());
waiter.lock_notify.notified().await;
assert_eq!(lock.owner.id, waiter.id);
}
#[tokio::test]
async fn test_lock_map() {
let key = "hello";
let owner = Arc::new(test_util::procedure_meta_for_test());
let lock_map = Arc::new(LockMap::new());
lock_map.acquire_lock(key, owner.clone()).await;
let waiter = Arc::new(test_util::procedure_meta_for_test());
let waiter_id = waiter.id;
// Waiter release the lock, this should not take effect.
lock_map.release_lock(key, waiter_id);
let lock_map2 = lock_map.clone();
let owner_id = owner.id;
let handle = tokio::spawn(async move {
assert!(lock_map2.hold_lock(key, owner_id));
assert!(!lock_map2.hold_lock(key, waiter_id));
// Waiter wait for lock.
lock_map2.acquire_lock(key, waiter.clone()).await;
assert!(lock_map2.hold_lock(key, waiter_id));
});
// Owner still holds the lock.
assert!(lock_map.hold_lock(key, owner_id));
// Wait until the waiter acquired the lock
while !lock_map.waiting_lock(key, waiter_id) {
tokio::time::sleep(std::time::Duration::from_millis(5)).await;
}
// Release lock
lock_map.release_lock(key, owner_id);
assert!(!lock_map.hold_lock(key, owner_id));
// Wait for task.
handle.await.unwrap();
// The waiter should hold the lock now.
assert!(lock_map.hold_lock(key, waiter_id));
lock_map.release_lock(key, waiter_id);
}
}

View File

@@ -19,8 +19,10 @@ use backon::{BackoffBuilder, ExponentialBuilder};
use common_telemetry::logging;
use tokio::time;
use super::rwlock::OwnedKeyRwLockGuard;
use crate::error::{self, ProcedurePanicSnafu, Result};
use crate::local::{ManagerContext, ProcedureMeta, ProcedureMetaRef};
use crate::procedure::StringKey;
use crate::store::ProcedureStore;
use crate::ProcedureState::Retrying;
use crate::{BoxedProcedure, Context, Error, ProcedureId, ProcedureState, ProcedureWithId, Status};
@@ -56,6 +58,7 @@ impl ExecResult {
struct ProcedureGuard {
meta: ProcedureMetaRef,
manager_ctx: Arc<ManagerContext>,
key_guards: Vec<OwnedKeyRwLockGuard>,
finish: bool,
}
@@ -65,6 +68,7 @@ impl ProcedureGuard {
ProcedureGuard {
meta,
manager_ctx,
key_guards: vec![],
finish: false,
}
}
@@ -95,10 +99,15 @@ impl Drop for ProcedureGuard {
self.manager_ctx.notify_by_subprocedure(parent_id);
}
// Release lock in reverse order.
for key in self.meta.lock_key.keys_to_unlock() {
self.manager_ctx.lock_map.release_lock(key, self.meta.id);
// Drops the key guards in the reverse order.
while !self.key_guards.is_empty() {
self.key_guards.pop();
}
// Clean the staled locks.
self.manager_ctx
.key_lock
.clean_keys(self.meta.lock_key.keys_to_lock().map(|k| k.as_string()));
}
}
@@ -121,7 +130,7 @@ impl Runner {
/// Run the procedure.
pub(crate) async fn run(mut self) {
// Ensure we can update the procedure state.
let guard = ProcedureGuard::new(self.meta.clone(), self.manager_ctx.clone());
let mut guard = ProcedureGuard::new(self.meta.clone(), self.manager_ctx.clone());
logging::info!(
"Runner {}-{} starts",
@@ -133,10 +142,14 @@ impl Runner {
// recursive locking by adding a root procedure id to the meta.
for key in self.meta.lock_key.keys_to_lock() {
// Acquire lock for each key.
self.manager_ctx
.lock_map
.acquire_lock(key, self.meta.clone())
.await;
let key_guard = match key {
StringKey::Share(key) => self.manager_ctx.key_lock.read(key.clone()).await.into(),
StringKey::Exclusive(key) => {
self.manager_ctx.key_lock.write(key.clone()).await.into()
}
};
guard.key_guards.push(key_guard);
}
// Execute the procedure. We need to release the lock whenever the the execution
@@ -604,7 +617,7 @@ mod tests {
};
let normal = ProcedureAdapter {
data: "normal".to_string(),
lock_key: LockKey::single("catalog.schema.table"),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
exec_fn,
};
@@ -665,7 +678,7 @@ mod tests {
};
let suspend = ProcedureAdapter {
data: "suspend".to_string(),
lock_key: LockKey::single("catalog.schema.table"),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
exec_fn,
};
@@ -697,7 +710,7 @@ mod tests {
};
let child = ProcedureAdapter {
data: "child".to_string(),
lock_key: LockKey::new(keys.iter().map(|k| k.to_string())),
lock_key: LockKey::new_exclusive(keys.iter().map(|k| k.to_string())),
exec_fn,
};
@@ -765,7 +778,7 @@ mod tests {
};
let parent = ProcedureAdapter {
data: "parent".to_string(),
lock_key: LockKey::single("catalog.schema.table"),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
exec_fn,
};
@@ -784,6 +797,7 @@ mod tests {
runner.manager_ctx = manager_ctx.clone();
runner.run().await;
assert!(manager_ctx.key_lock.is_empty());
// Check child procedures.
for child_id in children_ids {
@@ -810,7 +824,7 @@ mod tests {
let exec_fn = move |_| async move { Ok(Status::Executing { persist: true }) }.boxed();
let normal = ProcedureAdapter {
data: "normal".to_string(),
lock_key: LockKey::single("catalog.schema.table"),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
exec_fn,
};
@@ -851,7 +865,7 @@ mod tests {
|_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
let normal = ProcedureAdapter {
data: "fail".to_string(),
lock_key: LockKey::single("catalog.schema.table"),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
exec_fn,
};
@@ -875,7 +889,7 @@ mod tests {
|_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
let fail = ProcedureAdapter {
data: "fail".to_string(),
lock_key: LockKey::single("catalog.schema.table"),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
exec_fn,
};
@@ -917,7 +931,7 @@ mod tests {
let retry_later = ProcedureAdapter {
data: "retry_later".to_string(),
lock_key: LockKey::single("catalog.schema.table"),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
exec_fn,
};
@@ -952,7 +966,7 @@ mod tests {
let exceed_max_retry_later = ProcedureAdapter {
data: "exceed_max_retry_later".to_string(),
lock_key: LockKey::single("catalog.schema.table"),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
exec_fn,
};
@@ -993,7 +1007,7 @@ mod tests {
};
let fail = ProcedureAdapter {
data: "fail".to_string(),
lock_key: LockKey::single("catalog.schema.table.region-0"),
lock_key: LockKey::single_exclusive("catalog.schema.table.region-0"),
exec_fn,
};
@@ -1027,7 +1041,7 @@ mod tests {
};
let parent = ProcedureAdapter {
data: "parent".to_string(),
lock_key: LockKey::single("catalog.schema.table"),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
exec_fn,
};
@@ -1042,10 +1056,11 @@ mod tests {
// Manually add this procedure to the manager ctx.
assert!(manager_ctx.try_insert_procedure(meta.clone()));
// Replace the manager ctx.
runner.manager_ctx = manager_ctx;
runner.manager_ctx = manager_ctx.clone();
// Run the runner and execute the procedure.
runner.run().await;
assert!(manager_ctx.key_lock.is_empty());
let err = meta.state().error().unwrap().output_msg();
assert!(err.contains("subprocedure failed"), "{err}");
}

View File

@@ -0,0 +1,247 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::{Arc, Mutex};
use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};
pub enum OwnedKeyRwLockGuard {
Read(OwnedRwLockReadGuard<()>),
Write(OwnedRwLockWriteGuard<()>),
}
impl From<OwnedRwLockReadGuard<()>> for OwnedKeyRwLockGuard {
fn from(guard: OwnedRwLockReadGuard<()>) -> Self {
OwnedKeyRwLockGuard::Read(guard)
}
}
impl From<OwnedRwLockWriteGuard<()>> for OwnedKeyRwLockGuard {
fn from(guard: OwnedRwLockWriteGuard<()>) -> Self {
OwnedKeyRwLockGuard::Write(guard)
}
}
/// Locks based on a key, allowing other keys to lock independently.
#[derive(Debug)]
pub struct KeyRwLock<K> {
/// The inner map of locks for specific keys.
inner: Mutex<HashMap<K, Arc<RwLock<()>>>>,
}
impl<K> KeyRwLock<K>
where
K: Eq + Hash + Clone,
{
pub fn new() -> Self {
KeyRwLock {
inner: Default::default(),
}
}
/// Locks the key with shared read access, returning a guard.
pub async fn read(&self, key: K) -> OwnedRwLockReadGuard<()> {
let lock = {
let mut locks = self.inner.lock().unwrap();
locks.entry(key).or_default().clone()
};
lock.read_owned().await
}
/// Locks the key with exclusive write access, returning a guard.
pub async fn write(&self, key: K) -> OwnedRwLockWriteGuard<()> {
let lock = {
let mut locks = self.inner.lock().unwrap();
locks.entry(key).or_default().clone()
};
lock.write_owned().await
}
/// Clean up stale locks.
///
/// Note: It only cleans a lock if
/// - Its strong ref count equals one.
/// - Able to acquire the write lock.
pub fn clean_keys<'a>(&'a self, iter: impl IntoIterator<Item = &'a K>) {
let mut locks = self.inner.lock().unwrap();
let mut keys = Vec::new();
for key in iter {
if let Some(lock) = locks.get(key) {
if lock.try_write().is_ok() {
debug_assert_eq!(Arc::weak_count(lock), 0);
// Ensures nobody keeps this ref.
if Arc::strong_count(lock) == 1 {
keys.push(key);
}
}
}
}
for key in keys {
locks.remove(key);
}
}
}
#[cfg(test)]
impl<K> KeyRwLock<K>
where
K: Eq + Hash + Clone,
{
/// Tries to lock the key with shared read access, returning immediately.
pub fn try_read(&self, key: K) -> Result<OwnedRwLockReadGuard<()>, tokio::sync::TryLockError> {
let lock = {
let mut locks = self.inner.lock().unwrap();
locks.entry(key).or_default().clone()
};
lock.try_read_owned()
}
/// Tries lock this key with exclusive write access, returning immediately.
pub fn try_write(
&self,
key: K,
) -> Result<OwnedRwLockWriteGuard<()>, tokio::sync::TryLockError> {
let lock = {
let mut locks = self.inner.lock().unwrap();
locks.entry(key).or_default().clone()
};
lock.try_write_owned()
}
/// Returns number of keys.
pub fn len(&self) -> usize {
self.inner.lock().unwrap().len()
}
/// Returns true the inner map is empty.
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_naive() {
let lock_key = KeyRwLock::new();
{
let _guard = lock_key.read("test1").await;
assert_eq!(lock_key.len(), 1);
assert!(lock_key.try_read("test1").is_ok());
assert!(lock_key.try_write("test1").is_err());
}
{
let _guard0 = lock_key.write("test2").await;
let _guard = lock_key.write("test1").await;
assert_eq!(lock_key.len(), 2);
assert!(lock_key.try_read("test1").is_err());
assert!(lock_key.try_write("test1").is_err());
}
assert_eq!(lock_key.len(), 2);
lock_key.clean_keys(&vec!["test1", "test2"]);
assert!(lock_key.is_empty());
let mut guards = Vec::new();
for key in ["test1", "test2"] {
guards.push(lock_key.read(key).await);
}
while !guards.is_empty() {
guards.pop();
}
lock_key.clean_keys(vec![&"test1", &"test2"]);
assert_eq!(lock_key.len(), 0);
}
#[tokio::test]
async fn test_clean_keys() {
let lock_key = KeyRwLock::<&str>::new();
{
let rwlock = {
lock_key
.inner
.lock()
.unwrap()
.entry("test")
.or_default()
.clone()
};
assert_eq!(Arc::strong_count(&rwlock), 2);
let _guard = rwlock.read_owned().await;
{
let inner = lock_key.inner.lock().unwrap();
let rwlock = inner.get("test").unwrap();
assert_eq!(Arc::strong_count(rwlock), 2);
}
}
{
let rwlock = {
lock_key
.inner
.lock()
.unwrap()
.entry("test")
.or_default()
.clone()
};
assert_eq!(Arc::strong_count(&rwlock), 2);
let _guard = rwlock.write_owned().await;
{
let inner = lock_key.inner.lock().unwrap();
let rwlock = inner.get("test").unwrap();
assert_eq!(Arc::strong_count(rwlock), 2);
}
}
{
let inner = lock_key.inner.lock().unwrap();
let rwlock = inner.get("test").unwrap();
assert_eq!(Arc::strong_count(rwlock), 1);
}
// Someone has the ref of the rwlock, but it waits to be granted the lock.
let rwlock = {
lock_key
.inner
.lock()
.unwrap()
.entry("test")
.or_default()
.clone()
};
assert_eq!(Arc::strong_count(&rwlock), 2);
// However, One thread trying to remove the "test" key should have no effect.
lock_key.clean_keys(vec![&"test"]);
// Should get the rwlock.
{
let inner = lock_key.inner.lock().unwrap();
inner.get("test").unwrap();
}
}
}

View File

@@ -116,22 +116,49 @@ impl<T: Procedure + ?Sized> Procedure for Box<T> {
}
}
#[derive(Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum StringKey {
Share(String),
Exclusive(String),
}
/// Keys to identify required locks.
///
/// [LockKey] always sorts keys lexicographically so that they can be acquired
/// in the same order.
// Most procedures should only acquire 1 ~ 2 locks so we use smallvec to hold keys.
/// Most procedures should only acquire 1 ~ 2 locks so we use smallvec to hold keys.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct LockKey(SmallVec<[String; 2]>);
pub struct LockKey(SmallVec<[StringKey; 2]>);
impl StringKey {
pub fn into_string(self) -> String {
match self {
StringKey::Share(s) => s,
StringKey::Exclusive(s) => s,
}
}
pub fn as_string(&self) -> &String {
match self {
StringKey::Share(s) => s,
StringKey::Exclusive(s) => s,
}
}
}
impl LockKey {
/// Returns a new [LockKey] with only one key.
pub fn single(key: impl Into<String>) -> LockKey {
pub fn single(key: impl Into<StringKey>) -> LockKey {
LockKey(smallvec![key.into()])
}
/// Returns a new [LockKey] with only one key.
pub fn single_exclusive(key: impl Into<String>) -> LockKey {
LockKey(smallvec![StringKey::Exclusive(key.into())])
}
/// Returns a new [LockKey] with keys from specific `iter`.
pub fn new(iter: impl IntoIterator<Item = String>) -> LockKey {
pub fn new(iter: impl IntoIterator<Item = StringKey>) -> LockKey {
let mut vec: SmallVec<_> = iter.into_iter().collect();
vec.sort();
// Dedup keys to avoid acquiring the same key multiple times.
@@ -139,14 +166,14 @@ impl LockKey {
LockKey(vec)
}
/// Returns the keys to lock.
pub fn keys_to_lock(&self) -> impl Iterator<Item = &String> {
self.0.iter()
/// Returns a new [LockKey] with keys from specific `iter`.
pub fn new_exclusive(iter: impl IntoIterator<Item = String>) -> LockKey {
Self::new(iter.into_iter().map(StringKey::Exclusive))
}
/// Returns the keys to unlock.
pub fn keys_to_unlock(&self) -> impl Iterator<Item = &String> {
self.0.iter().rev()
/// Returns the keys to lock.
pub fn keys_to_lock(&self) -> impl Iterator<Item = &StringKey> {
self.0.iter()
}
}
@@ -340,20 +367,25 @@ mod tests {
#[test]
fn test_lock_key() {
let entity = "catalog.schema.my_table";
let key = LockKey::single(entity);
assert_eq!(vec![entity], key.keys_to_lock().collect::<Vec<_>>());
assert_eq!(vec![entity], key.keys_to_unlock().collect::<Vec<_>>());
let key = LockKey::single_exclusive(entity);
assert_eq!(
vec![&StringKey::Exclusive(entity.to_string())],
key.keys_to_lock().collect::<Vec<_>>()
);
let key = LockKey::new([
let key = LockKey::new_exclusive([
"b".to_string(),
"c".to_string(),
"a".to_string(),
"c".to_string(),
]);
assert_eq!(vec!["a", "b", "c"], key.keys_to_lock().collect::<Vec<_>>());
assert_eq!(
vec!["c", "b", "a"],
key.keys_to_unlock().collect::<Vec<_>>()
vec![
&StringKey::Exclusive("a".to_string()),
&StringKey::Exclusive("b".to_string()),
&StringKey::Exclusive("c".to_string())
],
key.keys_to_lock().collect::<Vec<_>>()
);
}

View File

@@ -98,7 +98,7 @@ mod tests {
}
fn lock_key(&self) -> LockKey {
LockKey::single("test.submit")
LockKey::single_exclusive("test.submit")
}
}

View File

@@ -373,7 +373,7 @@ impl Procedure for RegionFailoverProcedure {
fn lock_key(&self) -> LockKey {
let region_ident = &self.node.failed_region;
let region_key = region_lock_key(region_ident.table_id, region_ident.region_number);
LockKey::single(region_key)
LockKey::single_exclusive(region_key)
}
}

View File

@@ -419,7 +419,7 @@ impl Procedure for RegionMigrationProcedure {
fn lock_key(&self) -> LockKey {
let key = self.context.persistent_ctx.lock_key();
LockKey::single(key)
LockKey::single_exclusive(key)
}
}
@@ -455,7 +455,11 @@ mod tests {
let procedure = RegionMigrationProcedure::new(persistent_context, context);
let key = procedure.lock_key();
let keys = key.keys_to_lock().cloned().collect::<Vec<_>>();
let keys = key
.keys_to_lock()
.cloned()
.map(|s| s.into_string())
.collect::<Vec<_>>();
assert!(keys.contains(&expected_key));
}