diff --git a/Cargo.lock b/Cargo.lock index a98a7b047a..6a2282ff22 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1523,6 +1523,7 @@ dependencies = [ "common-runtime", "common-telemetry", "futures", + "futures-util", "object-store", "serde", "serde_json", diff --git a/docs/rfcs/2023-01-03-procedure-framework.md b/docs/rfcs/2023-01-03-procedure-framework.md index a428c6ab31..47ff9b2778 100644 --- a/docs/rfcs/2023-01-03-procedure-framework.md +++ b/docs/rfcs/2023-01-03-procedure-framework.md @@ -140,8 +140,6 @@ Rollback is complicated to implement so some procedures might not support rollba ## Locking The `ProcedureManager` can provide a locking mechanism that gives a procedure read/write access to a database object such as a table so other procedures are unable to modify the same table while the current one is executing. -Sub-procedures always inherit their parents' locks. The `ProcedureManager` only acquires locks for a procedure if its parent doesn't hold the lock. - # Drawbacks The `Procedure` framework introduces additional complexity and overhead to our database. 
- To execute a `Procedure`, we need to write to the `ProcedureStore` multiple times, which may slow down the server diff --git a/src/common/procedure/Cargo.toml b/src/common/procedure/Cargo.toml index 23f40ce307..c99c4359cc 100644 --- a/src/common/procedure/Cargo.toml +++ b/src/common/procedure/Cargo.toml @@ -18,4 +18,5 @@ tokio.workspace = true uuid.workspace = true [dev-dependencies] +futures-util.workspace = true tempdir = "0.3" diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs index db16e0299e..fa8500256c 100644 --- a/src/common/procedure/src/local.rs +++ b/src/common/procedure/src/local.rs @@ -15,10 +15,11 @@ mod lock; mod runner; -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; use std::sync::{Arc, Mutex, RwLock}; use async_trait::async_trait; +use common_telemetry::logging; use object_store::ObjectStore; use snafu::ensure; use tokio::sync::Notify; @@ -37,6 +38,17 @@ use crate::{ struct ExecMeta { /// Current procedure state. state: ProcedureState, + /// Id of child procedures. + children: Vec, +} + +impl Default for ExecMeta { + fn default() -> ExecMeta { + ExecMeta { + state: ProcedureState::Running, + children: Vec::new(), + } + } } /// Shared metadata of a procedure. @@ -57,30 +69,13 @@ pub(crate) struct ProcedureMeta { parent_id: Option, /// Notify to wait for subprocedures. child_notify: Notify, - /// Locks inherted from the parent procedure. - parent_locks: Vec, - /// Lock not in `parent_locks` but required by this procedure. - /// - /// If the parent procedure already owns the lock that this procedure - /// needs, we set this field to `None`. + /// Lock required by this procedure. lock_key: Option, /// Mutable status during execution. exec_meta: Mutex, } impl ProcedureMeta { - /// Return all locks the procedure needs. 
- fn locks_needed(&self) -> Vec { - let num_locks = self.parent_locks.len() + if self.lock_key.is_some() { 1 } else { 0 }; - let mut locks = Vec::with_capacity(num_locks); - locks.extend_from_slice(&self.parent_locks); - if let Some(key) = &self.lock_key { - locks.push(key.clone()); - } - - locks - } - /// Returns current [ProcedureState]. fn state(&self) -> ProcedureState { let meta = self.exec_meta.lock().unwrap(); @@ -92,12 +87,33 @@ impl ProcedureMeta { let mut meta = self.exec_meta.lock().unwrap(); meta.state = state; } + + /// Push `procedure_id` of the subprocedure to the metadata. + fn push_child(&self, procedure_id: ProcedureId) { + let mut meta = self.exec_meta.lock().unwrap(); + meta.children.push(procedure_id); + } + + /// Append subprocedures to given `buffer`. + fn list_children(&self, buffer: &mut Vec) { + let meta = self.exec_meta.lock().unwrap(); + buffer.extend_from_slice(&meta.children); + } + + /// Returns the number of subprocedures. + fn num_children(&self) -> usize { + self.exec_meta.lock().unwrap().children.len() + } } /// Reference counted pointer to [ProcedureMeta]. type ProcedureMetaRef = Arc; -/// Procedure and its parent procedure id. -struct ProcedureAndParent(BoxedProcedure, Option); +/// Procedure loaded from store. +struct LoadedProcedure { + procedure: BoxedProcedure, + parent_id: Option, + step: u32, +} /// Shared context of the manager. pub(crate) struct ManagerContext { @@ -128,14 +144,20 @@ impl ManagerContext { procedures.contains_key(&procedure_id) } - /// Insert the `procedure` to the context. + /// Try to insert the `procedure` to the context if there is no procedure + /// with same [ProcedureId]. /// - /// # Panics - /// Panics if the procedure already exists. - fn insert_procedure(&self, meta: ProcedureMetaRef) { + /// Returns `false` if there is already a procedure using the same [ProcedureId]. 
+ fn try_insert_procedure(&self, meta: ProcedureMetaRef) -> bool { let mut procedures = self.procedures.write().unwrap(); + if procedures.contains_key(&meta.id) { + return false; + } + let old = procedures.insert(meta.id, meta); - assert!(old.is_none()); + debug_assert!(old.is_none()); + + true } /// Returns the [ProcedureState] of specific `procedure_id`. @@ -143,6 +165,99 @@ impl ManagerContext { let procedures = self.procedures.read().unwrap(); procedures.get(&procedure_id).map(|meta| meta.state()) } + + /// Notify a suspended parent procedure with specific `procedure_id` by its subprocedure. + fn notify_by_subprocedure(&self, procedure_id: ProcedureId) { + let procedures = self.procedures.read().unwrap(); + if let Some(meta) = procedures.get(&procedure_id) { + meta.child_notify.notify_one(); + } + } + + /// Load procedure with specific `procedure_id` from cached [ProcedureMessage]s. + fn load_one_procedure(&self, procedure_id: ProcedureId) -> Option { + let messages = self.messages.lock().unwrap(); + let message = messages.get(&procedure_id)?; + + let loaders = self.loaders.lock().unwrap(); + let loader = loaders.get(&message.type_name).or_else(|| { + logging::error!( + "Loader not found, procedure_id: {}, type_name: {}", + procedure_id, + message.type_name + ); + None + })?; + + let procedure = loader(&message.data) + .map_err(|e| { + logging::error!( + "Failed to load procedure data, key: {}, source: {}", + procedure_id, + e + ); + e + }) + .ok()?; + + Some(LoadedProcedure { + procedure, + parent_id: message.parent_id, + step: message.step, + }) + } + + /// Returns all procedures in the tree (including the given `root` procedure). + /// + /// If callers need a consistent view of the tree, they must ensure no new + /// procedure is added to the tree while this method is running. + fn procedures_in_tree(&self, root: &ProcedureMetaRef) -> Vec { + let sub_num = root.num_children(); + // Reserve capacity for the root procedure and its children. 
+ let mut procedures = Vec::with_capacity(1 + sub_num); + + let mut queue = VecDeque::with_capacity(1 + sub_num); + // Push the root procedure to the queue. + queue.push_back(root.clone()); + + let mut children_ids = Vec::with_capacity(sub_num); + let mut children = Vec::with_capacity(sub_num); + while let Some(meta) = queue.pop_front() { + procedures.push(meta.id); + + // Find metadatas of children. + children_ids.clear(); + meta.list_children(&mut children_ids); + self.find_procedures(&children_ids, &mut children); + + // Traverse children later. + for child in children.drain(..) { + queue.push_back(child); + } + } + + procedures + } + + /// Finds procedures by given `procedure_ids`. + /// + /// Ignores the id if corresponding procedure is not found. + fn find_procedures(&self, procedure_ids: &[ProcedureId], metas: &mut Vec) { + let procedures = self.procedures.read().unwrap(); + for procedure_id in procedure_ids { + if let Some(meta) = procedures.get(procedure_id) { + metas.push(meta.clone()); + } + } + } + + /// Remove cached [ProcedureMessage] by ids. + fn remove_messages(&self, procedure_ids: &[ProcedureId]) { + let mut messages = self.messages.lock().unwrap(); + for procedure_id in procedure_ids { + messages.remove(procedure_id); + } + } } /// Config for [LocalManager]. @@ -168,17 +283,19 @@ impl LocalManager { } /// Submit a root procedure with given `procedure_id`. 
- fn submit_root(&self, procedure_id: ProcedureId, step: u32, procedure: BoxedProcedure) { + fn submit_root( + &self, + procedure_id: ProcedureId, + step: u32, + procedure: BoxedProcedure, + ) -> Result<()> { let meta = Arc::new(ProcedureMeta { id: procedure_id, lock_notify: Notify::new(), parent_id: None, child_notify: Notify::new(), - parent_locks: Vec::new(), lock_key: procedure.lock_key(), - exec_meta: Mutex::new(ExecMeta { - state: ProcedureState::Running, - }), + exec_meta: Mutex::new(ExecMeta::default()), }); let runner = Runner { meta: meta.clone(), @@ -188,12 +305,18 @@ impl LocalManager { store: ProcedureStore::new(self.state_store.clone()), }; - self.manager_ctx.insert_procedure(meta); + // Inserts meta into the manager before actually spawning the runner. + ensure!( + self.manager_ctx.try_insert_procedure(meta), + DuplicateProcedureSnafu { procedure_id }, + ); common_runtime::spawn_bg(async move { // Run the root procedure. - runner.run().await + let _ = runner.run().await; }); + + Ok(()) } } @@ -215,13 +338,13 @@ impl ProcedureManager for LocalManager { DuplicateProcedureSnafu { procedure_id } ); - self.submit_root(procedure.id, 0, procedure.procedure); + self.submit_root(procedure.id, 0, procedure.procedure)?; Ok(()) } async fn recover(&self) -> Result<()> { - unimplemented!() + todo!("Recover procedure and messages") } async fn procedure_state(&self, procedure_id: ProcedureId) -> Result> { @@ -231,23 +354,32 @@ impl ProcedureManager for LocalManager { /// Create a new [ProcedureMeta] for test purpose. 
#[cfg(test)] -fn procedure_meta_for_test() -> ProcedureMeta { - ProcedureMeta { - id: ProcedureId::random(), - lock_notify: Notify::new(), - parent_id: None, - child_notify: Notify::new(), - parent_locks: Vec::new(), - lock_key: None, - exec_meta: Mutex::new(ExecMeta { - state: ProcedureState::Running, - }), +mod test_util { + use object_store::services::fs::Builder; + use tempdir::TempDir; + + use super::*; + + pub(crate) fn procedure_meta_for_test() -> ProcedureMeta { + ProcedureMeta { + id: ProcedureId::random(), + lock_notify: Notify::new(), + parent_id: None, + child_notify: Notify::new(), + lock_key: None, + exec_meta: Mutex::new(ExecMeta::default()), + } + } + + pub(crate) fn new_object_store(dir: &TempDir) -> ObjectStore { + let store_dir = dir.path().to_str().unwrap(); + let accessor = Builder::default().root(store_dir).build().unwrap(); + ObjectStore::new(accessor) } } #[cfg(test)] mod tests { - use object_store::services::fs::Builder; use serde::{Deserialize, Serialize}; use tempdir::TempDir; @@ -255,40 +387,15 @@ mod tests { use crate::error::Error; use crate::{Context, Procedure, Status}; - #[test] - fn test_locks_needed() { - let mut meta = procedure_meta_for_test(); - let locks = meta.locks_needed(); - assert!(locks.is_empty()); - - let parent_locks = vec![LockKey::new("a"), LockKey::new("b")]; - meta.parent_locks = parent_locks.clone(); - let locks = meta.locks_needed(); - assert_eq!(parent_locks, locks); - - meta.lock_key = Some(LockKey::new("c")); - let locks = meta.locks_needed(); - assert_eq!( - vec![LockKey::new("a"), LockKey::new("b"), LockKey::new("c")], - locks - ); - } - - fn new_object_store(dir: &TempDir) -> ObjectStore { - let store_dir = dir.path().to_str().unwrap(); - let accessor = Builder::default().root(store_dir).build().unwrap(); - ObjectStore::new(accessor) - } - #[test] fn test_manager_context() { let ctx = ManagerContext::new(); - let meta = Arc::new(procedure_meta_for_test()); + let meta = 
Arc::new(test_util::procedure_meta_for_test()); assert!(!ctx.contains_procedure(meta.id)); assert!(ctx.state(meta.id).is_none()); - ctx.insert_procedure(meta.clone()); + assert!(ctx.try_insert_procedure(meta.clone())); assert!(ctx.contains_procedure(meta.id)); assert_eq!(ProcedureState::Running, ctx.state(meta.id).unwrap()); @@ -297,20 +404,54 @@ mod tests { } #[test] - #[should_panic] fn test_manager_context_insert_duplicate() { let ctx = ManagerContext::new(); - let meta = Arc::new(procedure_meta_for_test()); + let meta = Arc::new(test_util::procedure_meta_for_test()); - ctx.insert_procedure(meta.clone()); - ctx.insert_procedure(meta); + assert!(ctx.try_insert_procedure(meta.clone())); + assert!(!ctx.try_insert_procedure(meta)); + } + + fn new_child(parent_id: ProcedureId, ctx: &ManagerContext) -> ProcedureMetaRef { + let mut child = test_util::procedure_meta_for_test(); + child.parent_id = Some(parent_id); + let child = Arc::new(child); + assert!(ctx.try_insert_procedure(child.clone())); + + let mut parent = Vec::new(); + ctx.find_procedures(&[parent_id], &mut parent); + parent[0].push_child(child.id); + + child + } + + #[test] + fn test_procedures_in_tree() { + let ctx = ManagerContext::new(); + let root = Arc::new(test_util::procedure_meta_for_test()); + assert!(ctx.try_insert_procedure(root.clone())); + + assert_eq!(1, ctx.procedures_in_tree(&root).len()); + + let child1 = new_child(root.id, &ctx); + let child2 = new_child(root.id, &ctx); + + let child3 = new_child(child1.id, &ctx); + let child4 = new_child(child1.id, &ctx); + + let child5 = new_child(child2.id, &ctx); + + let expect = vec![ + root.id, child1.id, child2.id, child3.id, child4.id, child5.id, + ]; + assert_eq!(expect, ctx.procedures_in_tree(&root)); } #[test] fn test_register_loader() { let dir = TempDir::new("register").unwrap(); let config = ManagerConfig { - object_store: new_object_store(&dir), + object_store: test_util::new_object_store(&dir), }; let manager = LocalManager::new(config); @@ 
-363,7 +504,7 @@ mod tests { async fn test_submit_procedure() { let dir = TempDir::new("submit").unwrap(); let config = ManagerConfig { - object_store: new_object_store(&dir), + object_store: test_util::new_object_store(&dir), }; let manager = LocalManager::new(config); @@ -417,6 +558,6 @@ mod tests { }) .await .unwrap_err(); - assert!(matches!(err, Error::DuplicateProcedure { .. }), "{err:?}"); + assert!(matches!(err, Error::DuplicateProcedure { .. }), "{err}"); } } diff --git a/src/common/procedure/src/local/lock.rs b/src/common/procedure/src/local/lock.rs index 93e401ec59..2a14425da4 100644 --- a/src/common/procedure/src/local/lock.rs +++ b/src/common/procedure/src/local/lock.rs @@ -142,11 +142,11 @@ mod tests { use std::sync::Arc; use super::*; - use crate::local; + use crate::local::test_util; #[test] fn test_lock_no_waiter() { - let meta = Arc::new(local::procedure_meta_for_test()); + let meta = Arc::new(test_util::procedure_meta_for_test()); let mut lock = Lock::from_owner(meta); assert!(!lock.switch_owner()); @@ -154,10 +154,10 @@ mod tests { #[tokio::test] async fn test_lock_with_waiter() { - let owner = Arc::new(local::procedure_meta_for_test()); + let owner = Arc::new(test_util::procedure_meta_for_test()); let mut lock = Lock::from_owner(owner); - let waiter = Arc::new(local::procedure_meta_for_test()); + let waiter = Arc::new(test_util::procedure_meta_for_test()); lock.waiters.push_back(waiter.clone()); assert!(lock.switch_owner()); @@ -171,11 +171,11 @@ mod tests { async fn test_lock_map() { let key = "hello"; - let owner = Arc::new(local::procedure_meta_for_test()); + let owner = Arc::new(test_util::procedure_meta_for_test()); let lock_map = Arc::new(LockMap::new()); lock_map.acquire_lock(key, owner.clone()).await; - let waiter = Arc::new(local::procedure_meta_for_test()); + let waiter = Arc::new(test_util::procedure_meta_for_test()); let waiter_id = waiter.id; // Waiter release the lock, this should not take effect. 
diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs index 0df557a0d9..b61ee17e40 100644 --- a/src/common/procedure/src/local/runner.rs +++ b/src/common/procedure/src/local/runner.rs @@ -12,14 +12,48 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; +use std::sync::{Arc, Mutex}; +use std::time::Duration; use common_telemetry::logging; +use tokio::sync::Notify; +use tokio::time; -use crate::local::{ManagerContext, ProcedureMetaRef}; +use crate::error::{Error, Result}; +use crate::local::{ExecMeta, ManagerContext, ProcedureMeta, ProcedureMetaRef}; use crate::store::ProcedureStore; -use crate::BoxedProcedure; +use crate::{BoxedProcedure, Context, ProcedureId, ProcedureState, ProcedureWithId, Status}; +const ERR_WAIT_DURATION: Duration = Duration::from_secs(30); + +#[derive(Debug)] +enum ExecResult { + Continue, + Done, + RetryLater, + Failed(Error), +} + +#[cfg(test)] +impl ExecResult { + fn is_continue(&self) -> bool { + matches!(self, ExecResult::Continue) + } + + fn is_done(&self) -> bool { + matches!(self, ExecResult::Done) + } + + fn is_retry_later(&self) -> bool { + matches!(self, ExecResult::RetryLater) + } + + fn is_failed(&self) -> bool { + matches!(self, ExecResult::Failed(_)) + } +} + +// TODO(yingwen): Support cancellation. pub(crate) struct Runner { pub(crate) meta: ProcedureMetaRef, pub(crate) procedure: BoxedProcedure, @@ -30,7 +64,7 @@ pub(crate) struct Runner { impl Runner { /// Run the procedure. - pub(crate) async fn run(self) { + pub(crate) async fn run(mut self) -> Result<()> { logging::info!( "Runner {}-{} starts", self.procedure.type_name(), @@ -40,7 +74,10 @@ impl Runner { // its parent. let lock_key = self.meta.lock_key.clone(); - // TODO(yingwen): Support multiple lock keys. + // TODO(yingwen): + // 1. Support multiple lock keys; + // 2. Detect recursive locking (and deadlock) if possible. 
Maybe we could detect + // recursive locking by adding a root procedure id to the meta. // Acquire lock if necessary. if let Some(key) = &lock_key { self.manager_ctx @@ -49,13 +86,30 @@ impl Runner { .await; } - // TODO(yingwen): Execute the procedure. + let mut result = Ok(()); + // Execute the procedure. We need to release the lock whether the execution + // succeeds or fails. + if let Err(e) = self.execute_procedure_in_loop().await { + result = Err(e); + } + + // Notify parent procedure. + if let Some(parent_id) = self.meta.parent_id { + self.manager_ctx.notify_by_subprocedure(parent_id); + } if let Some(key) = &lock_key { self.manager_ctx .lock_map .release_lock(key.key(), self.meta.id); } + + // If this is the root procedure, clean up message cache. + if self.meta.parent_id.is_none() { + let procedure_ids = self.manager_ctx.procedures_in_tree(&self.meta); + self.manager_ctx.remove_messages(&procedure_ids); + } + // We can't remove the metadata of the procedure now as users and its parent might // need to query its state. // TODO(yingwen): 1. Add TTL to the metadata; 2. 
Only keep state in the procedure store @@ -66,5 +120,614 @@ impl Runner { self.procedure.type_name(), self.meta.id ); + + result + } + + async fn execute_procedure_in_loop(&mut self) -> Result<()> { + let ctx = Context { + procedure_id: self.meta.id, + }; + + loop { + match self.execute_once(&ctx).await { + ExecResult::Continue => (), + ExecResult::Done => return Ok(()), + ExecResult::RetryLater => { + self.wait_on_err().await; + } + ExecResult::Failed(e) => return Err(e), + } + } + } + + async fn execute_once(&mut self, ctx: &Context) -> ExecResult { + match self.procedure.execute(ctx).await { + Ok(status) => { + logging::debug!( + "Execute procedure {}-{} once, status: {:?}, need_persist: {}", + self.procedure.type_name(), + self.meta.id, + status, + status.need_persist(), + ); + + if status.need_persist() && self.persist_procedure().await.is_err() { + return ExecResult::RetryLater; + } + + match status { + Status::Executing { .. } => (), + Status::Suspended { subprocedures, .. } => { + self.on_suspended(subprocedures).await; + } + Status::Done => { + if self.commit_procedure().await.is_err() { + return ExecResult::RetryLater; + } + + self.done(); + return ExecResult::Done; + } + } + + ExecResult::Continue + } + Err(e) => { + logging::error!( + e; + "Failed to execute procedure {}-{}", + self.procedure.type_name(), + self.meta.id + ); + + self.meta.set_state(ProcedureState::Failed); + + // Write rollback key so we can skip this procedure while recovering procedures. + if self.rollback_procedure().await.is_err() { + return ExecResult::RetryLater; + } + + ExecResult::Failed(e) + } + } + } + + /// Submit a subprocedure with specific `procedure_id`. + fn submit_subprocedure(&self, procedure_id: ProcedureId, mut procedure: BoxedProcedure) { + if self.manager_ctx.contains_procedure(procedure_id) { + // If the parent has already submitted this procedure, don't submit it again. 
+ return; + } + + let mut step = 0; + if let Some(loaded_procedure) = self.manager_ctx.load_one_procedure(procedure_id) { + // Try to load procedure state from the message to avoid re-running the subprocedure + // from its initial state. + assert_eq!(self.meta.id, loaded_procedure.parent_id.unwrap()); + + // Use the dumped procedure from the procedure store. + procedure = loaded_procedure.procedure; + // Update step number. + step = loaded_procedure.step; + } + + let meta = Arc::new(ProcedureMeta { + id: procedure_id, + lock_notify: Notify::new(), + parent_id: Some(self.meta.id), + child_notify: Notify::new(), + lock_key: procedure.lock_key(), + exec_meta: Mutex::new(ExecMeta::default()), + }); + let runner = Runner { + meta: meta.clone(), + procedure, + manager_ctx: self.manager_ctx.clone(), + step, + store: self.store.clone(), + }; + + // Insert the procedure. We already check the procedure existence before inserting + // so we add an assertion to ensure the procedure id is unique and no other procedure + // is using the same procedure id. + assert!( + self.manager_ctx.try_insert_procedure(meta), + "Procedure {}-{} submit an existing procedure {}-{}", + self.procedure.type_name(), + self.meta.id, + runner.procedure.type_name(), + procedure_id, + ); + + // Add the id of the subprocedure to the metadata. + self.meta.push_child(procedure_id); + + common_runtime::spawn_bg(async move { + // Run the subprocedure. 
+ runner.run().await + }); + } + + async fn wait_on_err(&self) { + time::sleep(ERR_WAIT_DURATION).await; + } + + async fn on_suspended(&self, subprocedures: Vec) { + let has_child = !subprocedures.is_empty(); + for subprocedure in subprocedures { + logging::info!( + "Procedure {}-{} submit subprocedure {}-{}", + self.procedure.type_name(), + self.meta.id, + subprocedure.procedure.type_name(), + subprocedure.id + ); + + self.submit_subprocedure(subprocedure.id, subprocedure.procedure); + } + + logging::info!( + "Procedure {}-{} is waiting for subprocedures", + self.procedure.type_name(), + self.meta.id, + ); + + // Wait for subprocedures. + if has_child { + self.meta.child_notify.notified().await; + + logging::info!( + "Procedure {}-{} is waked up", + self.procedure.type_name(), + self.meta.id, + ); + } + } + + async fn persist_procedure(&mut self) -> Result<()> { + self.store + .store_procedure( + self.meta.id, + self.step, + &self.procedure, + self.meta.parent_id, + ) + .await + .map_err(|e| { + logging::error!( + e; "Failed to persist procedure {}-{}", + self.procedure.type_name(), + self.meta.id + ); + e + })?; + self.step += 1; + Ok(()) + } + + async fn commit_procedure(&mut self) -> Result<()> { + self.store + .commit_procedure(self.meta.id, self.step) + .await + .map_err(|e| { + logging::error!( + e; "Failed to commit procedure {}-{}", + self.procedure.type_name(), + self.meta.id + ); + e + })?; + self.step += 1; + Ok(()) + } + + async fn rollback_procedure(&mut self) -> Result<()> { + self.store + .rollback_procedure(self.meta.id, self.step) + .await + .map_err(|e| { + logging::error!( + e; "Failed to write rollback key for procedure {}-{}", + self.procedure.type_name(), + self.meta.id + ); + e + })?; + self.step += 1; + Ok(()) + } + + fn done(&self) { + // TODO(yingwen): Add files to remove list. + logging::info!( + "Procedure {}-{} done", + self.procedure.type_name(), + self.meta.id + ); + + // Mark the state of this procedure to done. 
+ self.meta.set_state(ProcedureState::Done); + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use async_trait::async_trait; + use common_error::ext::PlainError; + use common_error::mock::MockError; + use common_error::prelude::StatusCode; + use futures_util::future::BoxFuture; + use futures_util::{FutureExt, TryStreamExt}; + use object_store::ObjectStore; + use tempdir::TempDir; + + use super::*; + use crate::local::test_util; + use crate::store::ObjectStateStore; + use crate::{LockKey, Procedure}; + + const ROOT_ID: &str = "9f805a1f-05f7-490c-9f91-bd56e3cc54c1"; + + fn new_runner( + meta: ProcedureMetaRef, + procedure: BoxedProcedure, + store: ProcedureStore, + ) -> Runner { + Runner { + meta, + procedure, + manager_ctx: Arc::new(ManagerContext::new()), + step: 0, + store, + } + } + + fn new_procedure_store(object_store: ObjectStore) -> ProcedureStore { + let state_store = ObjectStateStore::new(object_store); + + ProcedureStore::new(Arc::new(state_store)) + } + + async fn check_files(object_store: &ObjectStore, procedure_id: ProcedureId, files: &[&str]) { + let dir = format!("{procedure_id}/"); + let object = object_store.object(&dir); + let lister = object.list().await.unwrap(); + let mut files_in_dir: Vec<_> = lister + .map_ok(|de| de.name().to_string()) + .try_collect() + .await + .unwrap(); + files_in_dir.sort_unstable(); + assert_eq!(files, files_in_dir); + } + + #[derive(Debug)] + struct ProcedureAdapter { + data: String, + lock_key: Option, + exec_fn: F, + } + + impl ProcedureAdapter { + fn new_meta(&self, uuid: &str) -> ProcedureMetaRef { + let mut meta = test_util::procedure_meta_for_test(); + meta.id = ProcedureId::parse_str(uuid).unwrap(); + meta.lock_key = self.lock_key.clone(); + + Arc::new(meta) + } + } + + #[async_trait] + impl Procedure for ProcedureAdapter + where + F: FnMut() -> BoxFuture<'static, Result> + Send + Sync, + { + fn type_name(&self) -> &str { + "ProcedureAdapter" + } + + async fn execute(&mut self, _ctx: &Context) -> 
Result { + let f = (self.exec_fn)(); + f.await + } + + fn dump(&self) -> Result { + Ok(self.data.clone()) + } + + fn lock_key(&self) -> Option { + self.lock_key.clone() + } + } + + async fn execute_once_normal(persist: bool, first_files: &[&str], second_files: &[&str]) { + let mut times = 0; + let exec_fn = move || { + times += 1; + async move { + if times == 1 { + Ok(Status::Executing { persist }) + } else { + Ok(Status::Done) + } + } + .boxed() + }; + let normal = ProcedureAdapter { + data: "normal".to_string(), + lock_key: Some(LockKey::new("catalog.schema.table")), + exec_fn, + }; + + let dir = TempDir::new("normal").unwrap(); + let meta = normal.new_meta(ROOT_ID); + let ctx = Context { + procedure_id: meta.id, + }; + let object_store = test_util::new_object_store(&dir); + let procedure_store = new_procedure_store(object_store.clone()); + let mut runner = new_runner(meta, Box::new(normal), procedure_store); + + let res = runner.execute_once(&ctx).await; + assert!(res.is_continue(), "{res:?}"); + check_files(&object_store, ctx.procedure_id, first_files).await; + + let res = runner.execute_once(&ctx).await; + assert!(res.is_done(), "{res:?}"); + check_files(&object_store, ctx.procedure_id, second_files).await; + } + + #[tokio::test] + async fn test_execute_once_normal() { + execute_once_normal( + true, + &["0000000000.step"], + &["0000000000.step", "0000000001.commit"], + ) + .await; + } + + #[tokio::test] + async fn test_execute_once_normal_skip_persist() { + execute_once_normal(false, &[], &["0000000000.commit"]).await; + } + + #[tokio::test] + async fn test_on_suspend_empty() { + let exec_fn = move || { + async move { + Ok(Status::Suspended { + subprocedures: Vec::new(), + persist: false, + }) + } + .boxed() + }; + let suspend = ProcedureAdapter { + data: "suspend".to_string(), + lock_key: Some(LockKey::new("catalog.schema.table")), + exec_fn, + }; + + let dir = TempDir::new("suspend").unwrap(); + let meta = suspend.new_meta(ROOT_ID); + let ctx = Context { + 
procedure_id: meta.id, + }; + let object_store = test_util::new_object_store(&dir); + let procedure_store = new_procedure_store(object_store.clone()); + let mut runner = new_runner(meta, Box::new(suspend), procedure_store); + + let res = runner.execute_once(&ctx).await; + assert!(res.is_continue(), "{res:?}"); + } + + fn new_child_procedure(procedure_id: ProcedureId, key: &str) -> ProcedureWithId { + let mut times = 0; + let exec_fn = move || { + times += 1; + async move { + if times == 1 { + time::sleep(Duration::from_millis(200)).await; + Ok(Status::Executing { persist: true }) + } else { + Ok(Status::Done) + } + } + .boxed() + }; + let child = ProcedureAdapter { + data: "child".to_string(), + lock_key: Some(LockKey::new(key)), + exec_fn, + }; + + ProcedureWithId { + id: procedure_id, + procedure: Box::new(child), + } + } + + #[tokio::test] + async fn test_on_suspend_by_subprocedures() { + let mut times = 0; + let children_ids = [ProcedureId::random(), ProcedureId::random()]; + let keys = [ + "catalog.schema.table.region-0", + "catalog.schema.table.region-1", + ]; + let manager_ctx = Arc::new(ManagerContext::new()); + + let ctx_in_fn = manager_ctx.clone(); + let exec_fn = move || { + times += 1; + let ctx_in_future = ctx_in_fn.clone(); + + async move { + if times == 1 { + // Submit subprocedures. + Ok(Status::Suspended { + subprocedures: children_ids + .into_iter() + .zip(keys) + .map(|(id, key)| new_child_procedure(id, key)) + .collect(), + persist: true, + }) + } else { + // Wait for subprocedures. + let all_child_done = children_ids + .iter() + .all(|id| ctx_in_future.state(*id) == Some(ProcedureState::Done)); + if all_child_done { + Ok(Status::Done) + } else { + // Return suspended to wait for notify. 
+ Ok(Status::Suspended { + subprocedures: Vec::new(), + persist: false, + }) + } + } + } + .boxed() + }; + let parent = ProcedureAdapter { + data: "parent".to_string(), + lock_key: Some(LockKey::new("catalog.schema.table")), + exec_fn, + }; + + let dir = TempDir::new("parent").unwrap(); + let meta = parent.new_meta(ROOT_ID); + let procedure_id = meta.id; + // Manually add this procedure to the manager ctx. + assert!(manager_ctx.try_insert_procedure(meta.clone())); + + let object_store = test_util::new_object_store(&dir); + let procedure_store = new_procedure_store(object_store.clone()); + let mut runner = new_runner(meta, Box::new(parent), procedure_store); + // Replace the manager ctx. + runner.manager_ctx = manager_ctx; + + runner.run().await.unwrap(); + + // Check files on store. + for child_id in children_ids { + check_files( + &object_store, + child_id, + &["0000000000.step", "0000000001.commit"], + ) + .await; + } + check_files( + &object_store, + procedure_id, + &["0000000000.step", "0000000001.commit"], + ) + .await; + } + + #[tokio::test] + async fn test_execute_on_error() { + let exec_fn = + || async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed(); + let fail = ProcedureAdapter { + data: "fail".to_string(), + lock_key: Some(LockKey::new("catalog.schema.table")), + exec_fn, + }; + + let dir = TempDir::new("fail").unwrap(); + let meta = fail.new_meta(ROOT_ID); + let ctx = Context { + procedure_id: meta.id, + }; + let object_store = test_util::new_object_store(&dir); + let procedure_store = new_procedure_store(object_store.clone()); + let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store); + + let res = runner.execute_once(&ctx).await; + assert!(res.is_failed(), "{res:?}"); + assert_eq!(ProcedureState::Failed, meta.state()); + check_files(&object_store, ctx.procedure_id, &["0000000000.rollback"]).await; + } + + #[tokio::test] + async fn test_child_error() { + let mut times = 0; + let manager_ctx = 
Arc::new(ManagerContext::new()); + let child_id = ProcedureId::random(); + + let ctx_in_fn = manager_ctx.clone(); + let exec_fn = move || { + times += 1; + let ctx_in_future = ctx_in_fn.clone(); + + async move { + if times == 1 { + // Submit subprocedures. + let exec_fn = || { + async { Err(Error::external(MockError::new(StatusCode::Unexpected))) } + .boxed() + }; + let fail = ProcedureAdapter { + data: "fail".to_string(), + lock_key: Some(LockKey::new("catalog.schema.table.region-0")), + exec_fn, + }; + + Ok(Status::Suspended { + subprocedures: vec![ProcedureWithId { + id: child_id, + procedure: Box::new(fail), + }], + persist: true, + }) + } else { + // Wait for subprocedures. + logging::info!("child state is {:?}", ctx_in_future.state(child_id)); + if ctx_in_future.state(child_id) == Some(ProcedureState::Failed) { + // The parent procedure aborts itself if the child procedure has failed. + Err(Error::external(PlainError::new( + "subprocedure failed".to_string(), + StatusCode::Unexpected, + ))) + } else { + // Return suspended to wait for notify. + Ok(Status::Suspended { + subprocedures: Vec::new(), + persist: false, + }) + } + } + } + .boxed() + }; + let parent = ProcedureAdapter { + data: "parent".to_string(), + lock_key: Some(LockKey::new("catalog.schema.table")), + exec_fn, + }; + + let dir = TempDir::new("child_err").unwrap(); + let meta = parent.new_meta(ROOT_ID); + // Manually add this procedure to the manager ctx. + assert!(manager_ctx.try_insert_procedure(meta.clone())); + + let object_store = test_util::new_object_store(&dir); + let procedure_store = new_procedure_store(object_store.clone()); + let mut runner = new_runner(meta, Box::new(parent), procedure_store); + // Replace the manager ctx. + runner.manager_ctx = manager_ctx; + + // Run the runner and execute the procedure. 
+ let err = runner.run().await.unwrap_err(); + assert!(err.to_string().contains("subprocedure failed"), "{err}"); } } diff --git a/src/common/procedure/src/procedure.rs b/src/common/procedure/src/procedure.rs index 57337b4b80..627322552c 100644 --- a/src/common/procedure/src/procedure.rs +++ b/src/common/procedure/src/procedure.rs @@ -24,6 +24,7 @@ use uuid::Uuid; use crate::error::Result; /// Procedure execution status. +#[derive(Debug)] pub enum Status { /// The procedure is still executing. Executing { @@ -121,6 +122,12 @@ impl ProcedureWithId { } } +impl fmt::Debug for ProcedureWithId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}-{}", self.procedure.type_name(), self.id) + } +} + #[derive(Debug, Snafu)] pub struct ParseIdError { source: uuid::Error, diff --git a/src/common/procedure/src/store.rs b/src/common/procedure/src/store.rs index ae862dcf34..efdebf3329 100644 --- a/src/common/procedure/src/store.rs +++ b/src/common/procedure/src/store.rs @@ -36,6 +36,8 @@ pub struct ProcedureMessage { pub data: String, /// Parent procedure id. pub parent_id: Option, + /// Current step. + pub step: u32, } /// Procedure storage layer. @@ -49,7 +51,7 @@ impl ProcedureStore { } /// Dump the `procedure` to the storage. - async fn store_procedure( + pub(crate) async fn store_procedure( &self, procedure_id: ProcedureId, step: u32, @@ -63,11 +65,12 @@ impl ProcedureStore { type_name: type_name.to_string(), data, parent_id, + step, }; let key = ParsedKey { procedure_id, step, - is_committed: false, + key_type: KeyType::Step, } .to_string(); let value = serde_json::to_string(&message).context(ToJsonSnafu)?; @@ -78,11 +81,32 @@ impl ProcedureStore { } /// Write commit flag to the storage. 
- async fn commit_procedure(&self, procedure_id: ProcedureId, step: u32) -> Result<()> { + pub(crate) async fn commit_procedure( + &self, + procedure_id: ProcedureId, + step: u32, + ) -> Result<()> { let key = ParsedKey { procedure_id, step, - is_committed: true, + key_type: KeyType::Commit, + } + .to_string(); + self.0.put(&key, Vec::new()).await?; + + Ok(()) + } + + /// Write rollback flag to the storage. + pub(crate) async fn rollback_procedure( + &self, + procedure_id: ProcedureId, + step: u32, + ) -> Result<()> { + let key = ParsedKey { + procedure_id, + step, + key_type: KeyType::Rollback, } .to_string(); self.0.put(&key, Vec::new()).await?; @@ -115,7 +139,7 @@ impl ProcedureStore { } for (procedure_id, (parsed_key, value)) in procedure_key_values { - if !parsed_key.is_committed { + if parsed_key.key_type == KeyType::Step { let Some(message) = self.load_one_message(&parsed_key, &value) else { // We don't abort the loading process and just ignore errors to ensure all remaining // procedures are loaded. @@ -139,12 +163,39 @@ impl ProcedureStore { } } +/// Suffix type of the key. +#[derive(Debug, PartialEq, Eq)] +enum KeyType { + Step, + Commit, + Rollback, +} + +impl KeyType { + fn as_str(&self) -> &'static str { + match self { + KeyType::Step => "step", + KeyType::Commit => "commit", + KeyType::Rollback => "rollback", + } + } + + fn from_str(s: &str) -> Option { + match s { + "step" => Some(KeyType::Step), + "commit" => Some(KeyType::Commit), + "rollback" => Some(KeyType::Rollback), + _ => None, + } + } +} + /// Key to refer the procedure in the [ProcedureStore]. 
#[derive(Debug, PartialEq, Eq)] struct ParsedKey { procedure_id: ProcedureId, step: u32, - is_committed: bool, + key_type: KeyType, } impl fmt::Display for ParsedKey { @@ -154,7 +205,7 @@ impl fmt::Display for ParsedKey { "{}/{:010}.{}", self.procedure_id, self.step, - if self.is_committed { "commit" } else { "step" } + self.key_type.as_str(), ) } } @@ -171,17 +222,13 @@ impl ParsedKey { let mut parts = name.split('.'); let step_str = parts.next()?; let suffix = parts.next()?; - let is_committed = match suffix { - "commit" => true, - "step" => false, - _ => return None, - }; + let key_type = KeyType::from_str(suffix)?; let step = step_str.parse().ok()?; Some(ParsedKey { procedure_id, step, - is_committed, + key_type, }) } } @@ -196,16 +243,24 @@ mod tests { use tempdir::TempDir; use super::*; - use crate::store::state_store::ObjectStateStore; use crate::{Context, LockKey, Procedure, Status}; + fn procedure_store_for_test(dir: &TempDir) -> ProcedureStore { + let store_dir = dir.path().to_str().unwrap(); + let accessor = Builder::default().root(store_dir).build().unwrap(); + let object_store = ObjectStore::new(accessor); + let state_store = ObjectStateStore::new(object_store); + + ProcedureStore::new(Arc::new(state_store)) + } + #[test] fn test_parsed_key() { let procedure_id = ProcedureId::random(); let key = ParsedKey { procedure_id, step: 2, - is_committed: false, + key_type: KeyType::Step, }; assert_eq!(format!("{procedure_id}/0000000002.step"), key.to_string()); assert_eq!(key, ParsedKey::parse_str(&key.to_string()).unwrap()); @@ -213,10 +268,21 @@ mod tests { let key = ParsedKey { procedure_id, step: 2, - is_committed: true, + key_type: KeyType::Commit, }; assert_eq!(format!("{procedure_id}/0000000002.commit"), key.to_string()); assert_eq!(key, ParsedKey::parse_str(&key.to_string()).unwrap()); + + let key = ParsedKey { + procedure_id, + step: 2, + key_type: KeyType::Rollback, + }; + assert_eq!( + format!("{procedure_id}/0000000002.rollback"), + key.to_string() 
+ ); + assert_eq!(key, ParsedKey::parse_str(&key.to_string()).unwrap()); } #[test] @@ -251,12 +317,13 @@ mod tests { type_name: "TestMessage".to_string(), data: "no parent id".to_string(), parent_id: None, + step: 4, }; let json = serde_json::to_string(&message).unwrap(); assert_eq!( json, - r#"{"type_name":"TestMessage","data":"no parent id","parent_id":null}"# + r#"{"type_name":"TestMessage","data":"no parent id","parent_id":null,"step":4}"# ); let procedure_id = ProcedureId::parse_str("9f805a1f-05f7-490c-9f91-bd56e3cc54c1").unwrap(); @@ -264,7 +331,7 @@ mod tests { let json = serde_json::to_string(&message).unwrap(); assert_eq!( json, - r#"{"type_name":"TestMessage","data":"no parent id","parent_id":"9f805a1f-05f7-490c-9f91-bd56e3cc54c1"}"# + r#"{"type_name":"TestMessage","data":"no parent id","parent_id":"9f805a1f-05f7-490c-9f91-bd56e3cc54c1","step":4}"# ); } @@ -297,19 +364,10 @@ mod tests { } } - fn new_procedure_store(dir: &TempDir) -> ProcedureStore { - let store_dir = dir.path().to_str().unwrap(); - let accessor = Builder::default().root(store_dir).build().unwrap(); - let object_store = ObjectStore::new(accessor); - let state_store = ObjectStateStore::new(object_store); - - ProcedureStore(Arc::new(state_store)) - } - #[tokio::test] async fn test_store_procedure() { let dir = TempDir::new("store_procedure").unwrap(); - let store = new_procedure_store(&dir); + let store = procedure_store_for_test(&dir); let procedure_id = ProcedureId::random(); let procedure: BoxedProcedure = Box::new(MockProcedure::new("test store procedure")); @@ -326,6 +384,7 @@ mod tests { type_name: "MockProcedure".to_string(), data: "test store procedure".to_string(), parent_id: None, + step: 0, }; assert_eq!(expect, *msg); } @@ -333,7 +392,7 @@ mod tests { #[tokio::test] async fn test_commit_procedure() { let dir = TempDir::new("commit_procedure").unwrap(); - let store = new_procedure_store(&dir); + let store = procedure_store_for_test(&dir); let procedure_id = ProcedureId::random(); 
let procedure: BoxedProcedure = Box::new(MockProcedure::new("test store procedure")); @@ -348,10 +407,28 @@ mod tests { assert!(messages.is_empty()); } + #[tokio::test] + async fn test_rollback_procedure() { + let dir = TempDir::new("rollback_procedure").unwrap(); + let store = procedure_store_for_test(&dir); + + let procedure_id = ProcedureId::random(); + let procedure: BoxedProcedure = Box::new(MockProcedure::new("test store procedure")); + + store + .store_procedure(procedure_id, 0, &procedure, None) + .await + .unwrap(); + store.rollback_procedure(procedure_id, 1).await.unwrap(); + + let messages = store.load_messages().await.unwrap(); + assert!(messages.is_empty()); + } + #[tokio::test] async fn test_load_messages() { let dir = TempDir::new("load_messages").unwrap(); - let store = new_procedure_store(&dir); + let store = procedure_store_for_test(&dir); // store 3 steps let id0 = ProcedureId::random();