From aa22f9c94a80904a71a8297fc810b701099ab366 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Wed, 3 Jan 2024 17:05:45 +0900 Subject: [PATCH] refactor: allow procedure to acquire share lock (#3061) * feat: implement `KeyRwLock` * refactor: use KeyRwLock instead of LockMap * refactor: use StringKey instead of String * chore: remove redundant code * refactor: cleanup KeyRwLock staled locks before granting new lock * feat: clean staled locks manually * feat: sort lock key in lexicographically order * feat: ensure the ref count before dropping the rwlock * feat: add more tests for rwlock * feat: drop the key guards first * feat: drops the key guards in the reverse order * chore: apply suggestions from CR * chore: apply suggestions from CR * chore: apply suggestions from CR --- src/common/meta/src/ddl/alter_table.rs | 2 +- src/common/meta/src/ddl/create_table.rs | 2 +- src/common/meta/src/ddl/drop_table.rs | 2 +- src/common/meta/src/ddl/truncate_table.rs | 2 +- src/common/procedure/src/local.rs | 25 +- src/common/procedure/src/local/lock.rs | 214 --------------- src/common/procedure/src/local/runner.rs | 55 ++-- src/common/procedure/src/local/rwlock.rs | 247 ++++++++++++++++++ src/common/procedure/src/procedure.rs | 66 +++-- src/common/procedure/src/watcher.rs | 2 +- src/meta-srv/src/procedure/region_failover.rs | 2 +- .../src/procedure/region_migration.rs | 8 +- 12 files changed, 354 insertions(+), 273 deletions(-) delete mode 100644 src/common/procedure/src/local/lock.rs create mode 100644 src/common/procedure/src/local/rwlock.rs diff --git a/src/common/meta/src/ddl/alter_table.rs b/src/common/meta/src/ddl/alter_table.rs index c3b1f7c311..e196ed70c6 100644 --- a/src/common/meta/src/ddl/alter_table.rs +++ b/src/common/meta/src/ddl/alter_table.rs @@ -394,7 +394,7 @@ impl Procedure for AlterTableProcedure { fn lock_key(&self) -> LockKey { let key = self.lock_key_inner(); - LockKey::new(key) + LockKey::new_exclusive(key) } } diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs index b480c82acd..9bdb6929c6 100644 --- a/src/common/meta/src/ddl/create_table.rs +++ b/src/common/meta/src/ddl/create_table.rs @@ -349,7 +349,7 @@ impl Procedure for CreateTableProcedure { table_ref.table, ); - LockKey::single(key) + LockKey::single_exclusive(key) } } diff --git a/src/common/meta/src/ddl/drop_table.rs b/src/common/meta/src/ddl/drop_table.rs index 7fac47e62c..dfd674d139 100644 --- a/src/common/meta/src/ddl/drop_table.rs +++ b/src/common/meta/src/ddl/drop_table.rs @@ -273,7 +273,7 @@ impl Procedure for DropTableProcedure { table_ref.table, ); - LockKey::single(key) + LockKey::single_exclusive(key) } } diff --git a/src/common/meta/src/ddl/truncate_table.rs b/src/common/meta/src/ddl/truncate_table.rs index ec5a7897cd..90f746104c 100644 --- a/src/common/meta/src/ddl/truncate_table.rs +++ b/src/common/meta/src/ddl/truncate_table.rs @@ -81,7 +81,7 @@ impl Procedure for TruncateTableProcedure { table_ref.table, ); - LockKey::single(key) + LockKey::single_exclusive(key) } } diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs index ae01022c9c..30c0403f68 100644 --- a/src/common/procedure/src/local.rs +++ b/src/common/procedure/src/local.rs @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod lock; mod runner; +mod rwlock; use std::collections::{HashMap, VecDeque}; use std::sync::atomic::{AtomicBool, Ordering}; @@ -29,11 +29,11 @@ use snafu::{ensure, ResultExt}; use tokio::sync::watch::{self, Receiver, Sender}; use tokio::sync::{Mutex as TokioMutex, Notify}; +use self::rwlock::KeyRwLock; use crate::error::{ DuplicateProcedureSnafu, Error, LoaderConflictSnafu, ManagerNotStartSnafu, Result, StartRemoveOutdatedMetaTaskSnafu, StopRemoveOutdatedMetaTaskSnafu, }; -use crate::local::lock::LockMap; use crate::local::runner::Runner; use crate::procedure::BoxedProcedureLoader; use crate::store::{ProcedureMessage, ProcedureStore, StateStoreRef}; @@ -57,8 +57,6 @@ const META_TTL: Duration = Duration::from_secs(60 * 10); pub(crate) struct ProcedureMeta { /// Id of this procedure. id: ProcedureId, - /// Notify to wait for a lock. - lock_notify: Notify, /// Parent procedure id. parent_id: Option, /// Notify to wait for subprocedures. @@ -78,7 +76,6 @@ impl ProcedureMeta { let (state_sender, state_receiver) = watch::channel(ProcedureState::Running); ProcedureMeta { id, - lock_notify: Notify::new(), parent_id, child_notify: Notify::new(), lock_key, @@ -131,7 +128,7 @@ struct LoadedProcedure { pub(crate) struct ManagerContext { /// Procedure loaders. The key is the type name of the procedure which the loader returns. loaders: Mutex>, - lock_map: LockMap, + key_lock: KeyRwLock, procedures: RwLock>, /// Messages loaded from the procedure store. messages: Mutex>, @@ -152,8 +149,8 @@ impl ManagerContext { /// Returns a new [ManagerContext]. fn new() -> ManagerContext { ManagerContext { + key_lock: KeyRwLock::new(), loaders: Mutex::new(HashMap::new()), - lock_map: LockMap::new(), procedures: RwLock::new(HashMap::new()), messages: Mutex::new(HashMap::new()), finished_procedures: Mutex::new(VecDeque::new()), @@ -850,7 +847,7 @@ mod tests { assert!(manager.procedure_watcher(procedure_id).is_none()); let mut procedure = ProcedureToLoad::new("submit"); - procedure.lock_key = LockKey::single("test.submit"); + procedure.lock_key = LockKey::single_exclusive("test.submit"); assert!(manager .submit(ProcedureWithId { id: procedure_id, @@ -918,7 +915,7 @@ mod tests { } fn lock_key(&self) -> LockKey { - LockKey::single("test.submit") + LockKey::single_exclusive("test.submit") } } @@ -955,7 +952,7 @@ mod tests { let manager = LocalManager::new(config, state_store); let mut procedure = ProcedureToLoad::new("submit"); - procedure.lock_key = LockKey::single("test.submit"); + procedure.lock_key = LockKey::single_exclusive("test.submit"); let procedure_id = ProcedureId::random(); assert_matches!( manager @@ -986,7 +983,7 @@ mod tests { manager.start().await.unwrap(); let mut procedure = ProcedureToLoad::new("submit"); - procedure.lock_key = LockKey::single("test.submit"); + procedure.lock_key = LockKey::single_exclusive("test.submit"); let procedure_id = ProcedureId::random(); assert!(manager .submit(ProcedureWithId { @@ -1018,7 +1015,7 @@ mod tests { manager.manager_ctx.set_running(); let mut procedure = ProcedureToLoad::new("submit"); - procedure.lock_key = LockKey::single("test.submit"); + procedure.lock_key = LockKey::single_exclusive("test.submit"); let procedure_id = ProcedureId::random(); assert!(manager .submit(ProcedureWithId { @@ -1041,7 +1038,7 @@ mod tests { // The remove_outdated_meta method has been stopped, so any procedure meta-data will not be automatically removed. manager.stop().await.unwrap(); let mut procedure = ProcedureToLoad::new("submit"); - procedure.lock_key = LockKey::single("test.submit"); + procedure.lock_key = LockKey::single_exclusive("test.submit"); let procedure_id = ProcedureId::random(); manager.manager_ctx.set_running(); @@ -1063,7 +1060,7 @@ mod tests { // After restart let mut procedure = ProcedureToLoad::new("submit"); - procedure.lock_key = LockKey::single("test.submit"); + procedure.lock_key = LockKey::single_exclusive("test.submit"); let procedure_id = ProcedureId::random(); assert!(manager .submit(ProcedureWithId { diff --git a/src/common/procedure/src/local/lock.rs b/src/common/procedure/src/local/lock.rs deleted file mode 100644 index 59e197d951..0000000000 --- a/src/common/procedure/src/local/lock.rs +++ /dev/null @@ -1,214 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::{HashMap, VecDeque}; -use std::sync::RwLock; - -use crate::local::ProcedureMetaRef; -use crate::ProcedureId; - -/// A lock entry. -#[derive(Debug)] -struct Lock { - /// Current lock owner. - owner: ProcedureMetaRef, - /// Waiter procedures. - waiters: VecDeque, -} - -impl Lock { - /// Returns a [Lock] with specific `owner` procedure. - fn from_owner(owner: ProcedureMetaRef) -> Lock { - Lock { - owner, - waiters: VecDeque::new(), - } - } - - /// Try to pop a waiter from the waiter list, set it as owner - /// and wake up the new owner. - /// - /// Returns false if there is no waiter in the waiter list. - fn switch_owner(&mut self) -> bool { - if let Some(waiter) = self.waiters.pop_front() { - // Update owner. - self.owner = waiter.clone(); - // We need to use notify_one() since the waiter may have not called `notified()` yet. - waiter.lock_notify.notify_one(); - true - } else { - false - } - } -} - -/// Manages lock entries for procedures. -pub(crate) struct LockMap { - locks: RwLock>, -} - -impl LockMap { - /// Returns a new [LockMap]. - pub(crate) fn new() -> LockMap { - LockMap { - locks: RwLock::new(HashMap::new()), - } - } - - /// Acquire lock by `key` for procedure with specific `meta`. - /// - /// Though `meta` is cloneable, callers must ensure that only one `meta` - /// is acquiring and holding the lock at the same time. - /// - /// # Panics - /// Panics if the procedure acquires the lock recursively. - pub(crate) async fn acquire_lock(&self, key: &str, meta: ProcedureMetaRef) { - assert!(!self.hold_lock(key, meta.id)); - - { - let mut locks = self.locks.write().unwrap(); - if let Some(lock) = locks.get_mut(key) { - // Lock already exists, but we don't expect that a procedure acquires - // the same lock again. - assert_ne!(lock.owner.id, meta.id); - - // Add this procedure to the waiter list. Here we don't check - // whether the procedure is already in the waiter list as we - // expect that a procedure should not wait for two lock simultaneously. - lock.waiters.push_back(meta.clone()); - } else { - let _ = locks.insert(key.to_string(), Lock::from_owner(meta)); - - return; - } - } - - // Wait for notify. - meta.lock_notify.notified().await; - - assert!(self.hold_lock(key, meta.id)); - } - - /// Release lock by `key`. - pub(crate) fn release_lock(&self, key: &str, procedure_id: ProcedureId) { - let mut locks = self.locks.write().unwrap(); - if let Some(lock) = locks.get_mut(key) { - if lock.owner.id != procedure_id { - // This is not the lock owner. - return; - } - - if !lock.switch_owner() { - // No body waits for this lock, we can remove the lock entry. - let _ = locks.remove(key); - } - } - } - - /// Returns true if the procedure with specific `procedure_id` holds the - /// lock of `key`. - fn hold_lock(&self, key: &str, procedure_id: ProcedureId) -> bool { - let locks = self.locks.read().unwrap(); - locks - .get(key) - .map(|lock| lock.owner.id == procedure_id) - .unwrap_or(false) - } - - /// Returns true if the procedure is waiting for the lock `key`. - #[cfg(test)] - fn waiting_lock(&self, key: &str, procedure_id: ProcedureId) -> bool { - let locks = self.locks.read().unwrap(); - locks - .get(key) - .map(|lock| lock.waiters.iter().any(|meta| meta.id == procedure_id)) - .unwrap_or(false) - } -} - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use super::*; - use crate::local::test_util; - - #[test] - fn test_lock_no_waiter() { - let meta = Arc::new(test_util::procedure_meta_for_test()); - let mut lock = Lock::from_owner(meta); - - assert!(!lock.switch_owner()); - } - - #[tokio::test] - async fn test_lock_with_waiter() { - let owner = Arc::new(test_util::procedure_meta_for_test()); - let mut lock = Lock::from_owner(owner); - - let waiter = Arc::new(test_util::procedure_meta_for_test()); - lock.waiters.push_back(waiter.clone()); - - assert!(lock.switch_owner()); - assert!(lock.waiters.is_empty()); - - waiter.lock_notify.notified().await; - assert_eq!(lock.owner.id, waiter.id); - } - - #[tokio::test] - async fn test_lock_map() { - let key = "hello"; - - let owner = Arc::new(test_util::procedure_meta_for_test()); - let lock_map = Arc::new(LockMap::new()); - lock_map.acquire_lock(key, owner.clone()).await; - - let waiter = Arc::new(test_util::procedure_meta_for_test()); - let waiter_id = waiter.id; - - // Waiter release the lock, this should not take effect. - lock_map.release_lock(key, waiter_id); - - let lock_map2 = lock_map.clone(); - let owner_id = owner.id; - let handle = tokio::spawn(async move { - assert!(lock_map2.hold_lock(key, owner_id)); - assert!(!lock_map2.hold_lock(key, waiter_id)); - - // Waiter wait for lock. - lock_map2.acquire_lock(key, waiter.clone()).await; - - assert!(lock_map2.hold_lock(key, waiter_id)); - }); - - // Owner still holds the lock. - assert!(lock_map.hold_lock(key, owner_id)); - - // Wait until the waiter acquired the lock - while !lock_map.waiting_lock(key, waiter_id) { - tokio::time::sleep(std::time::Duration::from_millis(5)).await; - } - // Release lock - lock_map.release_lock(key, owner_id); - assert!(!lock_map.hold_lock(key, owner_id)); - - // Wait for task. - handle.await.unwrap(); - // The waiter should hold the lock now. - assert!(lock_map.hold_lock(key, waiter_id)); - - lock_map.release_lock(key, waiter_id); - } -} diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs index 0b50f4497f..87f2e2f635 100644 --- a/src/common/procedure/src/local/runner.rs +++ b/src/common/procedure/src/local/runner.rs @@ -19,8 +19,10 @@ use backon::{BackoffBuilder, ExponentialBuilder}; use common_telemetry::logging; use tokio::time; +use super::rwlock::OwnedKeyRwLockGuard; use crate::error::{self, ProcedurePanicSnafu, Result}; use crate::local::{ManagerContext, ProcedureMeta, ProcedureMetaRef}; +use crate::procedure::StringKey; use crate::store::ProcedureStore; use crate::ProcedureState::Retrying; use crate::{BoxedProcedure, Context, Error, ProcedureId, ProcedureState, ProcedureWithId, Status}; @@ -56,6 +58,7 @@ impl ExecResult { struct ProcedureGuard { meta: ProcedureMetaRef, manager_ctx: Arc, + key_guards: Vec, finish: bool, } @@ -65,6 +68,7 @@ impl ProcedureGuard { ProcedureGuard { meta, manager_ctx, + key_guards: vec![], finish: false, } } @@ -95,10 +99,15 @@ impl Drop for ProcedureGuard { self.manager_ctx.notify_by_subprocedure(parent_id); } - // Release lock in reverse order. - for key in self.meta.lock_key.keys_to_unlock() { - self.manager_ctx.lock_map.release_lock(key, self.meta.id); + // Drops the key guards in the reverse order. + while !self.key_guards.is_empty() { + self.key_guards.pop(); } + + // Clean the staled locks. + self.manager_ctx + .key_lock + .clean_keys(self.meta.lock_key.keys_to_lock().map(|k| k.as_string())); } } @@ -121,7 +130,7 @@ impl Runner { /// Run the procedure. pub(crate) async fn run(mut self) { // Ensure we can update the procedure state. - let guard = ProcedureGuard::new(self.meta.clone(), self.manager_ctx.clone()); + let mut guard = ProcedureGuard::new(self.meta.clone(), self.manager_ctx.clone()); logging::info!( "Runner {}-{} starts", @@ -133,10 +142,14 @@ impl Runner { // recursive locking by adding a root procedure id to the meta. for key in self.meta.lock_key.keys_to_lock() { // Acquire lock for each key. - self.manager_ctx - .lock_map - .acquire_lock(key, self.meta.clone()) - .await; + let key_guard = match key { + StringKey::Share(key) => self.manager_ctx.key_lock.read(key.clone()).await.into(), + StringKey::Exclusive(key) => { + self.manager_ctx.key_lock.write(key.clone()).await.into() + } + }; + + guard.key_guards.push(key_guard); } // Execute the procedure. We need to release the lock whenever the the execution @@ -604,7 +617,7 @@ mod tests { }; let normal = ProcedureAdapter { data: "normal".to_string(), - lock_key: LockKey::single("catalog.schema.table"), + lock_key: LockKey::single_exclusive("catalog.schema.table"), exec_fn, }; @@ -665,7 +678,7 @@ mod tests { }; let suspend = ProcedureAdapter { data: "suspend".to_string(), - lock_key: LockKey::single("catalog.schema.table"), + lock_key: LockKey::single_exclusive("catalog.schema.table"), exec_fn, }; @@ -697,7 +710,7 @@ mod tests { }; let child = ProcedureAdapter { data: "child".to_string(), - lock_key: LockKey::new(keys.iter().map(|k| k.to_string())), + lock_key: LockKey::new_exclusive(keys.iter().map(|k| k.to_string())), exec_fn, }; @@ -765,7 +778,7 @@ mod tests { }; let parent = ProcedureAdapter { data: "parent".to_string(), - lock_key: LockKey::single("catalog.schema.table"), + lock_key: LockKey::single_exclusive("catalog.schema.table"), exec_fn, }; @@ -784,6 +797,7 @@ mod tests { runner.manager_ctx = manager_ctx.clone(); runner.run().await; + assert!(manager_ctx.key_lock.is_empty()); // Check child procedures. for child_id in children_ids { @@ -810,7 +824,7 @@ mod tests { let exec_fn = move |_| async move { Ok(Status::Executing { persist: true }) }.boxed(); let normal = ProcedureAdapter { data: "normal".to_string(), - lock_key: LockKey::single("catalog.schema.table"), + lock_key: LockKey::single_exclusive("catalog.schema.table"), exec_fn, }; @@ -851,7 +865,7 @@ mod tests { |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed(); let normal = ProcedureAdapter { data: "fail".to_string(), - lock_key: LockKey::single("catalog.schema.table"), + lock_key: LockKey::single_exclusive("catalog.schema.table"), exec_fn, }; @@ -875,7 +889,7 @@ mod tests { |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed(); let fail = ProcedureAdapter { data: "fail".to_string(), - lock_key: LockKey::single("catalog.schema.table"), + lock_key: LockKey::single_exclusive("catalog.schema.table"), exec_fn, }; @@ -917,7 +931,7 @@ mod tests { let retry_later = ProcedureAdapter { data: "retry_later".to_string(), - lock_key: LockKey::single("catalog.schema.table"), + lock_key: LockKey::single_exclusive("catalog.schema.table"), exec_fn, }; @@ -952,7 +966,7 @@ mod tests { let exceed_max_retry_later = ProcedureAdapter { data: "exceed_max_retry_later".to_string(), - lock_key: LockKey::single("catalog.schema.table"), + lock_key: LockKey::single_exclusive("catalog.schema.table"), exec_fn, }; @@ -993,7 +1007,7 @@ mod tests { }; let fail = ProcedureAdapter { data: "fail".to_string(), - lock_key: LockKey::single("catalog.schema.table.region-0"), + lock_key: LockKey::single_exclusive("catalog.schema.table.region-0"), exec_fn, }; @@ -1027,7 +1041,7 @@ mod tests { }; let parent = ProcedureAdapter { data: "parent".to_string(), - lock_key: LockKey::single("catalog.schema.table"), + lock_key: LockKey::single_exclusive("catalog.schema.table"), exec_fn, }; @@ -1042,10 +1056,11 @@ mod tests { // Manually add this procedure to the manager ctx. assert!(manager_ctx.try_insert_procedure(meta.clone())); // Replace the manager ctx. - runner.manager_ctx = manager_ctx; + runner.manager_ctx = manager_ctx.clone(); // Run the runner and execute the procedure. runner.run().await; + assert!(manager_ctx.key_lock.is_empty()); let err = meta.state().error().unwrap().output_msg(); assert!(err.contains("subprocedure failed"), "{err}"); } diff --git a/src/common/procedure/src/local/rwlock.rs b/src/common/procedure/src/local/rwlock.rs new file mode 100644 index 0000000000..a170132036 --- /dev/null +++ b/src/common/procedure/src/local/rwlock.rs @@ -0,0 +1,247 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::hash::Hash; +use std::sync::{Arc, Mutex}; + +use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock}; + +pub enum OwnedKeyRwLockGuard { + Read(OwnedRwLockReadGuard<()>), + Write(OwnedRwLockWriteGuard<()>), +} + +impl From> for OwnedKeyRwLockGuard { + fn from(guard: OwnedRwLockReadGuard<()>) -> Self { + OwnedKeyRwLockGuard::Read(guard) + } +} + +impl From> for OwnedKeyRwLockGuard { + fn from(guard: OwnedRwLockWriteGuard<()>) -> Self { + OwnedKeyRwLockGuard::Write(guard) + } +} + +/// Locks based on a key, allowing other keys to lock independently. +#[derive(Debug)] +pub struct KeyRwLock { + /// The inner map of locks for specific keys. + inner: Mutex>>>, +} + +impl KeyRwLock +where + K: Eq + Hash + Clone, +{ + pub fn new() -> Self { + KeyRwLock { + inner: Default::default(), + } + } + + /// Locks the key with shared read access, returning a guard. + pub async fn read(&self, key: K) -> OwnedRwLockReadGuard<()> { + let lock = { + let mut locks = self.inner.lock().unwrap(); + locks.entry(key).or_default().clone() + }; + + lock.read_owned().await + } + + /// Locks the key with exclusive write access, returning a guard. + pub async fn write(&self, key: K) -> OwnedRwLockWriteGuard<()> { + let lock = { + let mut locks = self.inner.lock().unwrap(); + locks.entry(key).or_default().clone() + }; + + lock.write_owned().await + } + + /// Clean up stale locks. + /// + /// Note: It only cleans a lock if + /// - Its strong ref count equals one. + /// - Able to acquire the write lock. + pub fn clean_keys<'a>(&'a self, iter: impl IntoIterator) { + let mut locks = self.inner.lock().unwrap(); + let mut keys = Vec::new(); + for key in iter { + if let Some(lock) = locks.get(key) { + if lock.try_write().is_ok() { + debug_assert_eq!(Arc::weak_count(lock), 0); + // Ensures nobody keeps this ref. + if Arc::strong_count(lock) == 1 { + keys.push(key); + } + } + } + } + + for key in keys { + locks.remove(key); + } + } +} + +#[cfg(test)] +impl KeyRwLock +where + K: Eq + Hash + Clone, +{ + /// Tries to lock the key with shared read access, returning immediately. + pub fn try_read(&self, key: K) -> Result, tokio::sync::TryLockError> { + let lock = { + let mut locks = self.inner.lock().unwrap(); + locks.entry(key).or_default().clone() + }; + + lock.try_read_owned() + } + + /// Tries lock this key with exclusive write access, returning immediately. + pub fn try_write( + &self, + key: K, + ) -> Result, tokio::sync::TryLockError> { + let lock = { + let mut locks = self.inner.lock().unwrap(); + locks.entry(key).or_default().clone() + }; + + lock.try_write_owned() + } + + /// Returns number of keys. + pub fn len(&self) -> usize { + self.inner.lock().unwrap().len() + } + + /// Returns true the inner map is empty. + pub fn is_empty(&self) -> bool { + self.len() == 0 + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_naive() { + let lock_key = KeyRwLock::new(); + + { + let _guard = lock_key.read("test1").await; + assert_eq!(lock_key.len(), 1); + assert!(lock_key.try_read("test1").is_ok()); + assert!(lock_key.try_write("test1").is_err()); + } + + { + let _guard0 = lock_key.write("test2").await; + let _guard = lock_key.write("test1").await; + assert_eq!(lock_key.len(), 2); + assert!(lock_key.try_read("test1").is_err()); + assert!(lock_key.try_write("test1").is_err()); + } + + assert_eq!(lock_key.len(), 2); + + lock_key.clean_keys(&vec!["test1", "test2"]); + assert!(lock_key.is_empty()); + + let mut guards = Vec::new(); + for key in ["test1", "test2"] { + guards.push(lock_key.read(key).await); + } + while !guards.is_empty() { + guards.pop(); + } + lock_key.clean_keys(vec![&"test1", &"test2"]); + assert_eq!(lock_key.len(), 0); + } + + #[tokio::test] + async fn test_clean_keys() { + let lock_key = KeyRwLock::<&str>::new(); + { + let rwlock = { + lock_key + .inner + .lock() + .unwrap() + .entry("test") + .or_default() + .clone() + }; + assert_eq!(Arc::strong_count(&rwlock), 2); + let _guard = rwlock.read_owned().await; + + { + let inner = lock_key.inner.lock().unwrap(); + let rwlock = inner.get("test").unwrap(); + assert_eq!(Arc::strong_count(rwlock), 2); + } + } + + { + let rwlock = { + lock_key + .inner + .lock() + .unwrap() + .entry("test") + .or_default() + .clone() + }; + assert_eq!(Arc::strong_count(&rwlock), 2); + let _guard = rwlock.write_owned().await; + + { + let inner = lock_key.inner.lock().unwrap(); + let rwlock = inner.get("test").unwrap(); + assert_eq!(Arc::strong_count(rwlock), 2); + } + } + + { + let inner = lock_key.inner.lock().unwrap(); + let rwlock = inner.get("test").unwrap(); + assert_eq!(Arc::strong_count(rwlock), 1); + } + + // Someone has the ref of the rwlock, but it waits to be granted the lock. + let rwlock = { + lock_key + .inner + .lock() + .unwrap() + .entry("test") + .or_default() + .clone() + }; + assert_eq!(Arc::strong_count(&rwlock), 2); + // However, One thread trying to remove the "test" key should have no effect. + lock_key.clean_keys(vec![&"test"]); + // Should get the rwlock. + { + let inner = lock_key.inner.lock().unwrap(); + inner.get("test").unwrap(); + } + } +} diff --git a/src/common/procedure/src/procedure.rs b/src/common/procedure/src/procedure.rs index 54f34b7d7c..2df005bdf0 100644 --- a/src/common/procedure/src/procedure.rs +++ b/src/common/procedure/src/procedure.rs @@ -116,22 +116,49 @@ impl Procedure for Box { } } +#[derive(Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub enum StringKey { + Share(String), + Exclusive(String), +} + /// Keys to identify required locks. /// /// [LockKey] always sorts keys lexicographically so that they can be acquired /// in the same order. -// Most procedures should only acquire 1 ~ 2 locks so we use smallvec to hold keys. +/// Most procedures should only acquire 1 ~ 2 locks so we use smallvec to hold keys. #[derive(Clone, Debug, Default, PartialEq, Eq)] -pub struct LockKey(SmallVec<[String; 2]>); +pub struct LockKey(SmallVec<[StringKey; 2]>); + +impl StringKey { + pub fn into_string(self) -> String { + match self { + StringKey::Share(s) => s, + StringKey::Exclusive(s) => s, + } + } + + pub fn as_string(&self) -> &String { + match self { + StringKey::Share(s) => s, + StringKey::Exclusive(s) => s, + } + } +} impl LockKey { /// Returns a new [LockKey] with only one key. - pub fn single(key: impl Into) -> LockKey { + pub fn single(key: impl Into) -> LockKey { LockKey(smallvec![key.into()]) } + /// Returns a new [LockKey] with only one key. + pub fn single_exclusive(key: impl Into) -> LockKey { + LockKey(smallvec![StringKey::Exclusive(key.into())]) + } + /// Returns a new [LockKey] with keys from specific `iter`. - pub fn new(iter: impl IntoIterator) -> LockKey { + pub fn new(iter: impl IntoIterator) -> LockKey { let mut vec: SmallVec<_> = iter.into_iter().collect(); vec.sort(); // Dedup keys to avoid acquiring the same key multiple times. @@ -139,14 +166,14 @@ impl LockKey { LockKey(vec) } - /// Returns the keys to lock. - pub fn keys_to_lock(&self) -> impl Iterator { - self.0.iter() + /// Returns a new [LockKey] with keys from specific `iter`. + pub fn new_exclusive(iter: impl IntoIterator) -> LockKey { + Self::new(iter.into_iter().map(StringKey::Exclusive)) } - /// Returns the keys to unlock. - pub fn keys_to_unlock(&self) -> impl Iterator { - self.0.iter().rev() + /// Returns the keys to lock. + pub fn keys_to_lock(&self) -> impl Iterator { + self.0.iter() } } @@ -340,20 +367,25 @@ mod tests { #[test] fn test_lock_key() { let entity = "catalog.schema.my_table"; - let key = LockKey::single(entity); - assert_eq!(vec![entity], key.keys_to_lock().collect::>()); - assert_eq!(vec![entity], key.keys_to_unlock().collect::>()); + let key = LockKey::single_exclusive(entity); + assert_eq!( + vec![&StringKey::Exclusive(entity.to_string())], + key.keys_to_lock().collect::>() + ); - let key = LockKey::new([ + let key = LockKey::new_exclusive([ "b".to_string(), "c".to_string(), "a".to_string(), "c".to_string(), ]); - assert_eq!(vec!["a", "b", "c"], key.keys_to_lock().collect::>()); assert_eq!( - vec!["c", "b", "a"], - key.keys_to_unlock().collect::>() + vec![ + &StringKey::Exclusive("a".to_string()), + &StringKey::Exclusive("b".to_string()), + &StringKey::Exclusive("c".to_string()) + ], + key.keys_to_lock().collect::>() ); } diff --git a/src/common/procedure/src/watcher.rs b/src/common/procedure/src/watcher.rs index 75cf777bee..584aae520d 100644 --- a/src/common/procedure/src/watcher.rs +++ b/src/common/procedure/src/watcher.rs @@ -98,7 +98,7 @@ mod tests { } fn lock_key(&self) -> LockKey { - LockKey::single("test.submit") + LockKey::single_exclusive("test.submit") } } diff --git a/src/meta-srv/src/procedure/region_failover.rs b/src/meta-srv/src/procedure/region_failover.rs index 50ab0e7423..37468437b2 100644 --- a/src/meta-srv/src/procedure/region_failover.rs +++ b/src/meta-srv/src/procedure/region_failover.rs @@ -373,7 +373,7 @@ impl Procedure for RegionFailoverProcedure { fn lock_key(&self) -> LockKey { let region_ident = &self.node.failed_region; let region_key = region_lock_key(region_ident.table_id, region_ident.region_number); - LockKey::single(region_key) + LockKey::single_exclusive(region_key) } } diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index de1dd46d4f..ec46686f47 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -419,7 +419,7 @@ impl Procedure for RegionMigrationProcedure { fn lock_key(&self) -> LockKey { let key = self.context.persistent_ctx.lock_key(); - LockKey::single(key) + LockKey::single_exclusive(key) } } @@ -455,7 +455,11 @@ mod tests { let procedure = RegionMigrationProcedure::new(persistent_context, context); let key = procedure.lock_key(); - let keys = key.keys_to_lock().cloned().collect::>(); + let keys = key + .keys_to_lock() + .cloned() + .map(|s| s.into_string()) + .collect::>(); assert!(keys.contains(&expected_key)); }