From 04afee216e23b9b9ccc188ecd192493784ce99c4 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Wed, 15 Feb 2023 18:04:19 +0800 Subject: [PATCH] feat(procedure): Support multi-lock keys and querying procedure state from context (#1006) * feat: Add ContextProvider to Context So procedures can query states of other procedures via the ContextProvider and they don't need to hold a ProcedureManagerRef * feat: Procedure supports acquiring multiple lock keys * test: Use multi-locks in test * feat: Add keys_to_lock/unlock --- Cargo.lock | 1 + src/common/procedure/Cargo.toml | 1 + src/common/procedure/src/lib.rs | 2 +- src/common/procedure/src/local.rs | 22 ++-- src/common/procedure/src/local/runner.rs | 144 +++++++++++++---------- src/common/procedure/src/procedure.rs | 74 +++++++++--- src/common/procedure/src/store.rs | 4 +- 7 files changed, 157 insertions(+), 91 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 829b4ee1f5..b420c7e60d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1527,6 +1527,7 @@ dependencies = [ "object-store", "serde", "serde_json", + "smallvec", "snafu", "tempdir", "tokio", diff --git a/src/common/procedure/Cargo.toml b/src/common/procedure/Cargo.toml index c99c4359cc..83e601df61 100644 --- a/src/common/procedure/Cargo.toml +++ b/src/common/procedure/Cargo.toml @@ -13,6 +13,7 @@ futures.workspace = true object-store = { path = "../../object-store" } serde.workspace = true serde_json = "1.0" +smallvec = "1" snafu.workspace = true tokio.workspace = true uuid.workspace = true diff --git a/src/common/procedure/src/lib.rs b/src/common/procedure/src/lib.rs index 8108025cc9..1cdd70bf2a 100644 --- a/src/common/procedure/src/lib.rs +++ b/src/common/procedure/src/lib.rs @@ -24,6 +24,6 @@ mod store; pub use crate::error::{Error, Result}; pub use crate::procedure::{ - BoxedProcedure, Context, LockKey, Procedure, ProcedureId, ProcedureManager, + BoxedProcedure, Context, ContextProvider, LockKey, Procedure, ProcedureId, ProcedureManager, ProcedureManagerRef, 
ProcedureState, ProcedureWithId, Status, }; diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs index 9631347c28..cf4749e504 100644 --- a/src/common/procedure/src/local.rs +++ b/src/common/procedure/src/local.rs @@ -30,7 +30,8 @@ use crate::local::runner::Runner; use crate::procedure::BoxedProcedureLoader; use crate::store::{ObjectStateStore, ProcedureMessage, ProcedureStore, StateStoreRef}; use crate::{ - BoxedProcedure, LockKey, ProcedureId, ProcedureManager, ProcedureState, ProcedureWithId, + BoxedProcedure, ContextProvider, LockKey, ProcedureId, ProcedureManager, ProcedureState, + ProcedureWithId, }; /// Mutable metadata of a procedure during execution. @@ -70,7 +71,7 @@ pub(crate) struct ProcedureMeta { /// Notify to wait for subprocedures. child_notify: Notify, /// Lock required by this procedure. - lock_key: Option, + lock_key: LockKey, /// Mutable status during execution. exec_meta: Mutex, } @@ -128,6 +129,13 @@ pub(crate) struct ManagerContext { messages: Mutex>, } +#[async_trait] +impl ContextProvider for ManagerContext { + async fn procedure_state(&self, procedure_id: ProcedureId) -> Result> { + Ok(self.state(procedure_id)) + } +} + impl ManagerContext { /// Returns a new [ManagerContext]. 
fn new() -> ManagerContext { @@ -409,7 +417,7 @@ mod test_util { lock_notify: Notify::new(), parent_id: None, child_notify: Notify::new(), - lock_key: None, + lock_key: LockKey::default(), exec_meta: Mutex::new(ExecMeta::default()), } } @@ -508,8 +516,8 @@ mod tests { Ok(self.content.clone()) } - fn lock_key(&self) -> Option { - None + fn lock_key(&self) -> LockKey { + LockKey::default() } } @@ -617,8 +625,8 @@ mod tests { unimplemented!() } - fn lock_key(&self) -> Option { - Some(LockKey::new("test.submit")) + fn lock_key(&self) -> LockKey { + LockKey::single("test.submit") } } diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs index e3d1ab485d..939e77ddf0 100644 --- a/src/common/procedure/src/local/runner.rs +++ b/src/common/procedure/src/local/runner.rs @@ -70,19 +70,14 @@ impl Runner { self.procedure.type_name(), self.meta.id ); - // We use the lock key in ProcedureMeta as it considers locks inherited from - // its parent. - let lock_key = self.meta.lock_key.clone(); - // TODO(yingwen): - // 1. Support multiple lock keys; - // 2. Detect recursive locking (and deadlock) if possible. Maybe we could detect + // TODO(yingwen): Detect recursive locking (and deadlock) if possible. Maybe we could detect // recursive locking by adding a root procedure id to the meta. - // Acquire lock if necessary. - if let Some(key) = &lock_key { + for key in self.meta.lock_key.keys_to_lock() { + // Acquire lock for each key. self.manager_ctx .lock_map - .acquire_lock(key.key(), self.meta.clone()) + .acquire_lock(key, self.meta.clone()) .await; } @@ -98,10 +93,9 @@ impl Runner { self.manager_ctx.notify_by_subprocedure(parent_id); } - if let Some(key) = &lock_key { - self.manager_ctx - .lock_map - .release_lock(key.key(), self.meta.id); + // Release lock in reverse order. + for key in self.meta.lock_key.keys_to_unlock() { + self.manager_ctx.lock_map.release_lock(key, self.meta.id); } // If this is the root procedure, clean up message cache. 
@@ -127,6 +121,7 @@ impl Runner { async fn execute_procedure_in_loop(&mut self) -> Result<()> { let ctx = Context { procedure_id: self.meta.id, + provider: self.manager_ctx.clone(), }; loop { @@ -366,7 +361,7 @@ mod tests { use super::*; use crate::local::test_util; - use crate::{LockKey, Procedure}; + use crate::{ContextProvider, LockKey, Procedure}; const ROOT_ID: &str = "9f805a1f-05f7-490c-9f91-bd56e3cc54c1"; @@ -397,10 +392,29 @@ mod tests { assert_eq!(files, files_in_dir); } + fn context_without_provider(procedure_id: ProcedureId) -> Context { + struct MockProvider; + + #[async_trait] + impl ContextProvider for MockProvider { + async fn procedure_state( + &self, + _procedure_id: ProcedureId, + ) -> Result> { + unimplemented!() + } + } + + Context { + procedure_id, + provider: Arc::new(MockProvider), + } + } + #[derive(Debug)] struct ProcedureAdapter { data: String, - lock_key: Option, + lock_key: LockKey, exec_fn: F, } @@ -417,14 +431,14 @@ mod tests { #[async_trait] impl Procedure for ProcedureAdapter where - F: FnMut() -> BoxFuture<'static, Result> + Send + Sync, + F: FnMut(Context) -> BoxFuture<'static, Result> + Send + Sync, { fn type_name(&self) -> &str { "ProcedureAdapter" } - async fn execute(&mut self, _ctx: &Context) -> Result { - let f = (self.exec_fn)(); + async fn execute(&mut self, ctx: &Context) -> Result { + let f = (self.exec_fn)(ctx.clone()); f.await } @@ -432,14 +446,14 @@ mod tests { Ok(self.data.clone()) } - fn lock_key(&self) -> Option { + fn lock_key(&self) -> LockKey { self.lock_key.clone() } } async fn execute_once_normal(persist: bool, first_files: &[&str], second_files: &[&str]) { let mut times = 0; - let exec_fn = move || { + let exec_fn = move |_| { times += 1; async move { if times == 1 { @@ -452,15 +466,13 @@ mod tests { }; let normal = ProcedureAdapter { data: "normal".to_string(), - lock_key: Some(LockKey::new("catalog.schema.table")), + lock_key: LockKey::single("catalog.schema.table"), exec_fn, }; let dir = 
TempDir::new("normal").unwrap(); let meta = normal.new_meta(ROOT_ID); - let ctx = Context { - procedure_id: meta.id, - }; + let ctx = context_without_provider(meta.id); let object_store = test_util::new_object_store(&dir); let procedure_store = ProcedureStore::from(object_store.clone()); let mut runner = new_runner(meta, Box::new(normal), procedure_store); @@ -491,7 +503,7 @@ mod tests { #[tokio::test] async fn test_on_suspend_empty() { - let exec_fn = move || { + let exec_fn = move |_| { async move { Ok(Status::Suspended { subprocedures: Vec::new(), @@ -502,15 +514,13 @@ mod tests { }; let suspend = ProcedureAdapter { data: "suspend".to_string(), - lock_key: Some(LockKey::new("catalog.schema.table")), + lock_key: LockKey::single("catalog.schema.table"), exec_fn, }; let dir = TempDir::new("suspend").unwrap(); let meta = suspend.new_meta(ROOT_ID); - let ctx = Context { - procedure_id: meta.id, - }; + let ctx = context_without_provider(meta.id); let object_store = test_util::new_object_store(&dir); let procedure_store = ProcedureStore::from(object_store.clone()); let mut runner = new_runner(meta, Box::new(suspend), procedure_store); @@ -519,9 +529,9 @@ mod tests { assert!(res.is_continue(), "{res:?}"); } - fn new_child_procedure(procedure_id: ProcedureId, key: &str) -> ProcedureWithId { + fn new_child_procedure(procedure_id: ProcedureId, keys: &[&str]) -> ProcedureWithId { let mut times = 0; - let exec_fn = move || { + let exec_fn = move |_| { times += 1; async move { if times == 1 { @@ -535,7 +545,7 @@ mod tests { }; let child = ProcedureAdapter { data: "child".to_string(), - lock_key: Some(LockKey::new(key)), + lock_key: LockKey::new(keys.iter().map(|k| k.to_string())), exec_fn, }; @@ -550,16 +560,18 @@ mod tests { let mut times = 0; let children_ids = [ProcedureId::random(), ProcedureId::random()]; let keys = [ - "catalog.schema.table.region-0", - "catalog.schema.table.region-1", + &[ + "catalog.schema.table.region-0", + "catalog.schema.table.region-1", + ], + &[ 
+ "catalog.schema.table.region-2", + "catalog.schema.table.region-3", + ], ]; - let manager_ctx = Arc::new(ManagerContext::new()); - let ctx_in_fn = manager_ctx.clone(); - let exec_fn = move || { + let exec_fn = move |ctx: Context| { times += 1; - let ctx_in_future = ctx_in_fn.clone(); - async move { if times == 1 { // Submit subprocedures. @@ -567,15 +579,20 @@ mod tests { subprocedures: children_ids .into_iter() .zip(keys) - .map(|(id, key)| new_child_procedure(id, key)) + .map(|(id, key_slice)| new_child_procedure(id, key_slice)) .collect(), persist: true, }) } else { // Wait for subprocedures. - let all_child_done = children_ids - .iter() - .all(|id| ctx_in_future.state(*id) == Some(ProcedureState::Done)); + let mut all_child_done = true; + for id in children_ids { + if ctx.provider.procedure_state(id).await.unwrap() + != Some(ProcedureState::Done) + { + all_child_done = false; + } + } if all_child_done { Ok(Status::Done) } else { @@ -591,19 +608,20 @@ mod tests { }; let parent = ProcedureAdapter { data: "parent".to_string(), - lock_key: Some(LockKey::new("catalog.schema.table")), + lock_key: LockKey::single("catalog.schema.table"), exec_fn, }; let dir = TempDir::new("parent").unwrap(); let meta = parent.new_meta(ROOT_ID); let procedure_id = meta.id; - // Manually add this procedure to the manager ctx. - assert!(manager_ctx.try_insert_procedure(meta.clone())); let object_store = test_util::new_object_store(&dir); let procedure_store = ProcedureStore::from(object_store.clone()); - let mut runner = new_runner(meta, Box::new(parent), procedure_store); + let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store); + let manager_ctx = Arc::new(ManagerContext::new()); + // Manually add this procedure to the manager ctx. + assert!(manager_ctx.try_insert_procedure(meta)); // Replace the manager ctx. 
runner.manager_ctx = manager_ctx; @@ -629,18 +647,16 @@ mod tests { #[tokio::test] async fn test_execute_on_error() { let exec_fn = - || async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed(); + |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed(); let fail = ProcedureAdapter { data: "fail".to_string(), - lock_key: Some(LockKey::new("catalog.schema.table")), + lock_key: LockKey::single("catalog.schema.table"), exec_fn, }; let dir = TempDir::new("fail").unwrap(); let meta = fail.new_meta(ROOT_ID); - let ctx = Context { - procedure_id: meta.id, - }; + let ctx = context_without_provider(meta.id); let object_store = test_util::new_object_store(&dir); let procedure_store = ProcedureStore::from(object_store.clone()); let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store); @@ -654,24 +670,20 @@ mod tests { #[tokio::test] async fn test_child_error() { let mut times = 0; - let manager_ctx = Arc::new(ManagerContext::new()); let child_id = ProcedureId::random(); - let ctx_in_fn = manager_ctx.clone(); - let exec_fn = move || { + let exec_fn = move |ctx: Context| { times += 1; - let ctx_in_future = ctx_in_fn.clone(); - async move { if times == 1 { // Submit subprocedures. - let exec_fn = || { + let exec_fn = |_| { async { Err(Error::external(MockError::new(StatusCode::Unexpected))) } .boxed() }; let fail = ProcedureAdapter { data: "fail".to_string(), - lock_key: Some(LockKey::new("catalog.schema.table.region-0")), + lock_key: LockKey::single("catalog.schema.table.region-0"), exec_fn, }; @@ -684,8 +696,8 @@ mod tests { }) } else { // Wait for subprocedures. - logging::info!("child state is {:?}", ctx_in_future.state(child_id)); - if ctx_in_future.state(child_id) == Some(ProcedureState::Failed) { + let state = ctx.provider.procedure_state(child_id).await.unwrap(); + if state == Some(ProcedureState::Failed) { // The parent procedure to abort itself if child procedure is failed. 
Err(Error::external(PlainError::new( "subprocedure failed".to_string(), @@ -704,18 +716,20 @@ mod tests { }; let parent = ProcedureAdapter { data: "parent".to_string(), - lock_key: Some(LockKey::new("catalog.schema.table")), + lock_key: LockKey::single("catalog.schema.table"), exec_fn, }; let dir = TempDir::new("child_err").unwrap(); let meta = parent.new_meta(ROOT_ID); - // Manually add this procedure to the manager ctx. - assert!(manager_ctx.try_insert_procedure(meta.clone())); let object_store = test_util::new_object_store(&dir); let procedure_store = ProcedureStore::from(object_store.clone()); - let mut runner = new_runner(meta, Box::new(parent), procedure_store); + let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store); + + let manager_ctx = Arc::new(ManagerContext::new()); + // Manually add this procedure to the manager ctx. + assert!(manager_ctx.try_insert_procedure(meta)); // Replace the manager ctx. runner.manager_ctx = manager_ctx; diff --git a/src/common/procedure/src/procedure.rs b/src/common/procedure/src/procedure.rs index 627322552c..d9aff5955f 100644 --- a/src/common/procedure/src/procedure.rs +++ b/src/common/procedure/src/procedure.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use async_trait::async_trait; use serde::{Deserialize, Serialize}; +use smallvec::{smallvec, SmallVec}; use snafu::{ResultExt, Snafu}; use uuid::Uuid; @@ -58,11 +59,23 @@ impl Status { } } +/// [ContextProvider] provides information about procedures in the [ProcedureManager]. +#[async_trait] +pub trait ContextProvider: Send + Sync { + /// Query the procedure state. + async fn procedure_state(&self, procedure_id: ProcedureId) -> Result>; +} + +/// Reference-counted pointer to [ContextProvider]. +pub type ContextProviderRef = Arc; + /// Procedure execution context. -#[derive(Debug)] +#[derive(Clone)] pub struct Context { /// Id of the procedure. pub procedure_id: ProcedureId, + /// [ProcedureManager] context provider. 
+ pub provider: ContextProviderRef, } /// A `Procedure` represents an operation or a set of operations to be performed step-by-step. @@ -79,25 +92,41 @@ pub trait Procedure: Send + Sync { /// Dump the state of the procedure to a string. fn dump(&self) -> Result; - /// Returns the [LockKey] if this procedure needs to acquire lock. - fn lock_key(&self) -> Option; + /// Returns the [LockKey] that this procedure needs to acquire. + fn lock_key(&self) -> LockKey; } -/// A key to identify the lock. -// We might hold multiple keys in this struct. When there are multiple keys, we need to sort the -// keys lock all the keys in order to avoid dead lock. -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct LockKey(String); +/// Keys to identify required locks. +/// +/// [LockKey] always sorts keys lexicographically so that they can be acquired +/// in the same order. +// Most procedures should only acquire 1 ~ 2 locks so we use smallvec to hold keys. +#[derive(Clone, Debug, Default, PartialEq, Eq)] +pub struct LockKey(SmallVec<[String; 2]>); impl LockKey { - /// Returns a new [LockKey]. - pub fn new(key: impl Into) -> LockKey { - LockKey(key.into()) + /// Returns a new [LockKey] with only one key. + pub fn single(key: impl Into) -> LockKey { + LockKey(smallvec![key.into()]) } - /// Returns the lock key. - pub fn key(&self) -> &str { - &self.0 + /// Returns a new [LockKey] with keys from specific `iter`. + pub fn new(iter: impl IntoIterator) -> LockKey { + let mut vec: SmallVec<_> = iter.into_iter().collect(); + vec.sort(); + // Dedup keys to avoid acquiring the same key multiple times. + vec.dedup(); + LockKey(vec) + } + + /// Returns the keys to lock. + pub fn keys_to_lock(&self) -> impl Iterator { + self.0.iter() + } + + /// Returns the keys to unlock. 
+ pub fn keys_to_unlock(&self) -> impl Iterator { + self.0.iter().rev() } } @@ -235,8 +264,21 @@ mod tests { #[test] fn test_lock_key() { let entity = "catalog.schema.my_table"; - let key = LockKey::new(entity); - assert_eq!(entity, key.key()); + let key = LockKey::single(entity); + assert_eq!(vec![entity], key.keys_to_lock().collect::>()); + assert_eq!(vec![entity], key.keys_to_unlock().collect::>()); + + let key = LockKey::new([ + "b".to_string(), + "c".to_string(), + "a".to_string(), + "c".to_string(), + ]); + assert_eq!(vec!["a", "b", "c"], key.keys_to_lock().collect::>()); + assert_eq!( + vec!["c", "b", "a"], + key.keys_to_unlock().collect::>() + ); } #[test] diff --git a/src/common/procedure/src/store.rs b/src/common/procedure/src/store.rs index 3f7ee76396..680c2ecf9d 100644 --- a/src/common/procedure/src/store.rs +++ b/src/common/procedure/src/store.rs @@ -365,8 +365,8 @@ mod tests { Ok(self.data.clone()) } - fn lock_key(&self) -> Option { - None + fn lock_key(&self) -> LockKey { + LockKey::default() } }