diff --git a/src/common/procedure/src/lib.rs b/src/common/procedure/src/lib.rs
index a8d1628bb0..8108025cc9 100644
--- a/src/common/procedure/src/lib.rs
+++ b/src/common/procedure/src/lib.rs
@@ -15,6 +15,8 @@
 //! Common traits and structures for the procedure framework.
 
 pub mod error;
+#[allow(dead_code)]
+mod local;
 mod procedure;
 // TODO(yingwen): Remove this attribute once ProcedureManager is implemented.
 #[allow(dead_code)]
diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs
new file mode 100644
index 0000000000..507dc5e3e9
--- /dev/null
+++ b/src/common/procedure/src/local.rs
@@ -0,0 +1,114 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+mod lock;
+
+use std::sync::{Arc, Mutex};
+
+use tokio::sync::Notify;
+
+use crate::{LockKey, ProcedureId, ProcedureState};
+
+/// Mutable metadata of a procedure during execution.
+#[derive(Debug)]
+struct ExecMeta {
+    /// Current procedure state.
+    state: ProcedureState,
+}
+
+/// Shared metadata of a procedure.
+///
+/// # Note
+/// [Notify] is not a condition variable, we can't guarantee the waiters are notified
+/// if they didn't call `notified()` before we signal the notify. So we
+/// 1. use dedicated notify for each condition, such as waiting for a lock, waiting
+/// for children;
+/// 2. always use `notify_one` and ensure there is only one waiter.
+#[derive(Debug)]
+struct ProcedureMeta {
+    /// Id of this procedure.
+    id: ProcedureId,
+    /// Notify to wait for a lock.
+    lock_notify: Notify,
+    /// Parent procedure id.
+    parent_id: Option<ProcedureId>,
+    /// Notify to wait for subprocedures.
+    child_notify: Notify,
+    /// Locks inherited from the parent procedure.
+    parent_locks: Vec<LockKey>,
+    /// Lock not in `parent_locks` but required by this procedure.
+    ///
+    /// If the parent procedure already owns the lock that this procedure
+    /// needs, we set this field to `None`.
+    lock_key: Option<LockKey>,
+    /// Mutable status during execution.
+    exec_meta: Mutex<ExecMeta>,
+}
+
+impl ProcedureMeta {
+    /// Return all locks the procedure needs: `parent_locks` first, then `lock_key` if set.
+    fn locks_needed(&self) -> Vec<LockKey> {
+        let num_locks = self.parent_locks.len() + usize::from(self.lock_key.is_some());
+        let mut locks = Vec::with_capacity(num_locks);
+        locks.extend_from_slice(&self.parent_locks);
+        if let Some(key) = &self.lock_key {
+            locks.push(key.clone());
+        }
+
+        locks
+    }
+}
+
+/// Reference counted pointer to [ProcedureMeta].
+type ProcedureMetaRef = Arc<ProcedureMeta>;
+
+/// Create a new [ProcedureMeta] for test purpose.
+#[cfg(test)]
+fn procedure_meta_for_test() -> ProcedureMeta {
+    ProcedureMeta {
+        id: ProcedureId::random(),
+        lock_notify: Notify::new(),
+        parent_id: None,
+        child_notify: Notify::new(),
+        parent_locks: Vec::new(),
+        lock_key: None,
+        exec_meta: Mutex::new(ExecMeta {
+            state: ProcedureState::Running,
+        }),
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_locks_needed() {
+        let mut meta = procedure_meta_for_test();
+        let locks = meta.locks_needed();
+        assert!(locks.is_empty());
+
+        let parent_locks = vec![LockKey::new("a"), LockKey::new("b")];
+        meta.parent_locks = parent_locks.clone();
+        let locks = meta.locks_needed();
+        assert_eq!(parent_locks, locks);
+
+        meta.lock_key = Some(LockKey::new("c"));
+        let locks = meta.locks_needed();
+        assert_eq!(
+            vec![LockKey::new("a"), LockKey::new("b"), LockKey::new("c")],
+            locks
+        );
+    }
+}
diff --git a/src/common/procedure/src/local/lock.rs b/src/common/procedure/src/local/lock.rs
new file mode 100644
index 0000000000..bd337c948d
--- /dev/null
+++ b/src/common/procedure/src/local/lock.rs
@@ -0,0 +1,222 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::{HashMap, VecDeque};
+use std::sync::RwLock;
+
+use crate::local::ProcedureMetaRef;
+use crate::ProcedureId;
+
+/// A lock entry.
+#[derive(Debug)]
+struct Lock {
+    /// Current lock owner.
+    owner: ProcedureMetaRef,
+    /// Waiter procedures.
+    waiters: VecDeque<ProcedureMetaRef>,
+}
+
+impl Lock {
+    /// Returns a [Lock] with specific `owner` procedure.
+    fn from_owner(owner: ProcedureMetaRef) -> Lock {
+        Lock {
+            owner,
+            waiters: VecDeque::new(),
+        }
+    }
+
+    /// Try to pop a waiter from the waiter list, set it as owner
+    /// and wake up the new owner.
+    ///
+    /// Returns false if there is no waiter in the waiter list.
+    fn switch_owner(&mut self) -> bool {
+        if let Some(waiter) = self.waiters.pop_front() {
+            // Update owner.
+            self.owner = waiter.clone();
+            // We need to use notify_one() since the waiter may have not called `notified()` yet.
+            waiter.lock_notify.notify_one();
+            true
+        } else {
+            false
+        }
+    }
+}
+
+/// Manages lock entries for procedures.
+struct LockMap {
+    /// Lock entries, indexed by lock key.
+    locks: RwLock<HashMap<String, Lock>>,
+}
+
+impl LockMap {
+    /// Returns a new [LockMap].
+    fn new() -> LockMap {
+        LockMap {
+            locks: RwLock::new(HashMap::new()),
+        }
+    }
+
+    /// Acquire lock by `key` for procedure with specific `meta`.
+    ///
+    /// If the lock is already owned by another procedure, `meta` is appended
+    /// to the waiter list and this method waits until it becomes the owner.
+    ///
+    /// Though `meta` is cloneable, callers must ensure that only one `meta`
+    /// is acquiring and holding the lock at the same time.
+    ///
+    /// # Panics
+    /// Panics if the procedure acquires the lock recursively.
+    async fn acquire_lock(&self, key: &str, meta: ProcedureMetaRef) {
+        assert!(!self.hold_lock(key, meta.id));
+
+        // Scope the write guard so that it is dropped before we await.
+        {
+            let mut locks = self.locks.write().unwrap();
+            if let Some(lock) = locks.get_mut(key) {
+                // Lock already exists, but we don't expect that a procedure acquires
+                // the same lock again.
+                assert_ne!(lock.owner.id, meta.id);
+
+                // Add this procedure to the waiter list. Here we don't check
+                // whether the procedure is already in the waiter list as we
+                // expect that a procedure should not wait for two locks simultaneously.
+                lock.waiters.push_back(meta.clone());
+            } else {
+                locks.insert(key.to_string(), Lock::from_owner(meta));
+
+                return;
+            }
+        }
+
+        // Wait until `release_lock()` hands the lock over to this procedure.
+        meta.lock_notify.notified().await;
+
+        assert!(self.hold_lock(key, meta.id));
+    }
+
+    /// Release lock by `key`.
+    ///
+    /// Does nothing if the lock doesn't exist or `procedure_id` is not the
+    /// current owner of the lock.
+    fn release_lock(&self, key: &str, procedure_id: ProcedureId) {
+        let mut locks = self.locks.write().unwrap();
+        if let Some(lock) = locks.get_mut(key) {
+            if lock.owner.id != procedure_id {
+                // This is not the lock owner.
+                return;
+            }
+
+            if !lock.switch_owner() {
+                // Nobody waits for this lock, we can remove the lock entry.
+                locks.remove(key);
+            }
+        }
+    }
+
+    /// Returns true if the procedure with specific `procedure_id` holds the
+    /// lock of `key`.
+    fn hold_lock(&self, key: &str, procedure_id: ProcedureId) -> bool {
+        let locks = self.locks.read().unwrap();
+        locks
+            .get(key)
+            .map(|lock| lock.owner.id == procedure_id)
+            .unwrap_or(false)
+    }
+
+    /// Returns true if the procedure is waiting for the lock `key`.
+    #[cfg(test)]
+    fn waiting_lock(&self, key: &str, procedure_id: ProcedureId) -> bool {
+        let locks = self.locks.read().unwrap();
+        locks
+            .get(key)
+            .map(|lock| lock.waiters.iter().any(|meta| meta.id == procedure_id))
+            .unwrap_or(false)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use super::*;
+    use crate::local;
+
+    #[test]
+    fn test_lock_no_waiter() {
+        let meta = Arc::new(local::procedure_meta_for_test());
+        let mut lock = Lock::from_owner(meta);
+
+        assert!(!lock.switch_owner());
+    }
+
+    #[tokio::test]
+    async fn test_lock_with_waiter() {
+        let owner = Arc::new(local::procedure_meta_for_test());
+        let mut lock = Lock::from_owner(owner);
+
+        let waiter = Arc::new(local::procedure_meta_for_test());
+        lock.waiters.push_back(waiter.clone());
+
+        assert!(lock.switch_owner());
+        assert!(lock.waiters.is_empty());
+
+        waiter.lock_notify.notified().await;
+        assert_eq!(lock.owner.id, waiter.id);
+    }
+
+    #[tokio::test]
+    async fn test_lock_map() {
+        let key = "hello";
+
+        let owner = Arc::new(local::procedure_meta_for_test());
+        let lock_map = Arc::new(LockMap::new());
+        lock_map.acquire_lock(key, owner.clone()).await;
+
+        let waiter = Arc::new(local::procedure_meta_for_test());
+        let waiter_id = waiter.id;
+
+        // Waiter releases the lock it doesn't own, this should not take effect.
+        lock_map.release_lock(key, waiter_id);
+
+        let lock_map2 = lock_map.clone();
+        let owner_id = owner.id;
+        let handle = tokio::spawn(async move {
+            assert!(lock_map2.hold_lock(key, owner_id));
+            assert!(!lock_map2.hold_lock(key, waiter_id));
+
+            // Waiter waits for the lock.
+            lock_map2.acquire_lock(key, waiter.clone()).await;
+
+            assert!(lock_map2.hold_lock(key, waiter_id));
+        });
+
+        // Owner still holds the lock.
+        assert!(lock_map.hold_lock(key, owner_id));
+
+        // Wait until the waiter is in the waiter list of the lock.
+        while !lock_map.waiting_lock(key, waiter_id) {
+            tokio::time::sleep(std::time::Duration::from_millis(5)).await;
+        }
+        // Release lock
+        lock_map.release_lock(key, owner_id);
+        assert!(!lock_map.hold_lock(key, owner_id));
+
+        // Wait for task.
+        handle.await.unwrap();
+        // The waiter should hold the lock now.
+        assert!(lock_map.hold_lock(key, waiter_id));
+
+        lock_map.release_lock(key, waiter_id);
+    }
+}