diff --git a/src/common/procedure/src/lib.rs b/src/common/procedure/src/lib.rs index 88b84e0358..967c7fefe7 100644 --- a/src/common/procedure/src/lib.rs +++ b/src/common/procedure/src/lib.rs @@ -22,5 +22,5 @@ mod store; pub use crate::error::{Error, Result}; pub use crate::procedure::{ BoxedProcedure, Context, ContextProvider, LockKey, Procedure, ProcedureId, ProcedureManager, - ProcedureManagerRef, ProcedureState, ProcedureWithId, Status, + ProcedureManagerRef, ProcedureState, ProcedureWithId, Status, Watcher, }; diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs index f9e0d51c64..468c8f6dcd 100644 --- a/src/common/procedure/src/local.rs +++ b/src/common/procedure/src/local.rs @@ -22,6 +22,7 @@ use async_trait::async_trait; use common_telemetry::logging; use object_store::ObjectStore; use snafu::ensure; +use tokio::sync::watch::{self, Receiver, Sender}; use tokio::sync::Notify; use crate::error::{DuplicateProcedureSnafu, LoaderConflictSnafu, Result}; @@ -31,27 +32,9 @@ use crate::procedure::BoxedProcedureLoader; use crate::store::{ObjectStateStore, ProcedureMessage, ProcedureStore, StateStoreRef}; use crate::{ BoxedProcedure, ContextProvider, LockKey, ProcedureId, ProcedureManager, ProcedureState, - ProcedureWithId, + ProcedureWithId, Watcher, }; -/// Mutable metadata of a procedure during execution. -#[derive(Debug)] -struct ExecMeta { - /// Current procedure state. - state: ProcedureState, - /// Id of child procedures. - children: Vec, -} - -impl Default for ExecMeta { - fn default() -> ExecMeta { - ExecMeta { - state: ProcedureState::Running, - children: Vec::new(), - } - } -} - /// Shared metadata of a procedure. /// /// # Note @@ -72,38 +55,55 @@ pub(crate) struct ProcedureMeta { child_notify: Notify, /// Lock required by this procedure. lock_key: LockKey, - /// Mutable status during execution. - exec_meta: Mutex, + /// Sender to notify the procedure state. + state_sender: Sender, + /// Receiver to watch the procedure state. + state_receiver: Receiver, + /// Id of child procedures. + children: Mutex>, } impl ProcedureMeta { + fn new(id: ProcedureId, parent_id: Option, lock_key: LockKey) -> ProcedureMeta { + let (state_sender, state_receiver) = watch::channel(ProcedureState::Running); + ProcedureMeta { + id, + lock_notify: Notify::new(), + parent_id, + child_notify: Notify::new(), + lock_key, + state_sender, + state_receiver, + children: Mutex::new(Vec::new()), + } + } + /// Returns current [ProcedureState]. fn state(&self) -> ProcedureState { - let meta = self.exec_meta.lock().unwrap(); - meta.state.clone() + self.state_receiver.borrow().clone() } /// Update current [ProcedureState]. fn set_state(&self, state: ProcedureState) { - let mut meta = self.exec_meta.lock().unwrap(); - meta.state = state; + // Safety: ProcedureMeta also holds the receiver, so `send()` should never fail. + self.state_sender.send(state).unwrap(); } /// Push `procedure_id` of the subprocedure to the metadata. fn push_child(&self, procedure_id: ProcedureId) { - let mut meta = self.exec_meta.lock().unwrap(); - meta.children.push(procedure_id); + let mut children = self.children.lock().unwrap(); + children.push(procedure_id); } /// Append subprocedures to given `buffer`. fn list_children(&self, buffer: &mut Vec) { - let meta = self.exec_meta.lock().unwrap(); - buffer.extend_from_slice(&meta.children); + let children = self.children.lock().unwrap(); + buffer.extend_from_slice(&children); } /// Returns the number of subprocedures. fn num_children(&self) -> usize { - self.exec_meta.lock().unwrap().children.len() + self.children.lock().unwrap().len() } } @@ -175,6 +175,14 @@ impl ManagerContext { procedures.get(&procedure_id).map(|meta| meta.state()) } + /// Returns the [Watcher] of specific `procedure_id`. + fn watcher(&self, procedure_id: ProcedureId) -> Option { + let procedures = self.procedures.read().unwrap(); + procedures + .get(&procedure_id) + .map(|meta| meta.state_receiver.clone()) + } + /// Notify a suspended parent procedure with specific `procedure_id` by its subprocedure. fn notify_by_subprocedure(&self, procedure_id: ProcedureId) { let procedures = self.procedures.read().unwrap(); @@ -308,15 +316,8 @@ impl LocalManager { procedure_id: ProcedureId, step: u32, procedure: BoxedProcedure, - ) -> Result<()> { - let meta = Arc::new(ProcedureMeta { - id: procedure_id, - lock_notify: Notify::new(), - parent_id: None, - child_notify: Notify::new(), - lock_key: procedure.lock_key(), - exec_meta: Mutex::new(ExecMeta::default()), - }); + ) -> Result { + let meta = Arc::new(ProcedureMeta::new(procedure_id, None, procedure.lock_key())); let runner = Runner { meta: meta.clone(), procedure, @@ -325,6 +326,8 @@ impl LocalManager { store: ProcedureStore::new(self.state_store.clone()), }; + let watcher = meta.state_receiver.clone(); + // Inserts meta into the manager before actually spawnd the runner. ensure!( self.manager_ctx.try_insert_procedure(meta), @@ -336,7 +339,7 @@ impl LocalManager { let _ = runner.run().await; }); - Ok(()) + Ok(watcher) } } @@ -351,16 +354,14 @@ impl ProcedureManager for LocalManager { Ok(()) } - async fn submit(&self, procedure: ProcedureWithId) -> Result<()> { + async fn submit(&self, procedure: ProcedureWithId) -> Result { let procedure_id = procedure.id; ensure!( !self.manager_ctx.contains_procedure(procedure_id), DuplicateProcedureSnafu { procedure_id } ); - self.submit_root(procedure.id, 0, procedure.procedure)?; - - Ok(()) + self.submit_root(procedure.id, 0, procedure.procedure) } async fn recover(&self) -> Result<()> { @@ -401,6 +402,10 @@ impl ProcedureManager for LocalManager { async fn procedure_state(&self, procedure_id: ProcedureId) -> Result> { Ok(self.manager_ctx.state(procedure_id)) } + + fn procedure_watcher(&self, procedure_id: ProcedureId) -> Option { + self.manager_ctx.watcher(procedure_id) + } } /// Create a new [ProcedureMeta] for test purpose. @@ -412,14 +417,7 @@ mod test_util { use super::*; pub(crate) fn procedure_meta_for_test() -> ProcedureMeta { - ProcedureMeta { - id: ProcedureId::random(), - lock_notify: Notify::new(), - parent_id: None, - child_notify: Notify::new(), - lock_key: LockKey::default(), - exec_meta: Mutex::new(ExecMeta::default()), - } + ProcedureMeta::new(ProcedureId::random(), None, LockKey::default()) } pub(crate) fn new_object_store(dir: &TempDir) -> ObjectStore { @@ -431,6 +429,8 @@ mod test_util { #[cfg(test)] mod tests { + use common_error::mock::MockError; + use common_error::prelude::StatusCode; use tempdir::TempDir; use super::*; @@ -500,6 +500,7 @@ mod tests { #[derive(Debug)] struct ProcedureToLoad { content: String, + lock_key: LockKey, } #[async_trait] @@ -517,7 +518,7 @@ mod tests { } fn lock_key(&self) -> LockKey { - LockKey::default() + self.lock_key.clone() } } @@ -525,6 +526,7 @@ mod tests { fn new(content: &str) -> ProcedureToLoad { ProcedureToLoad { content: content.to_string(), + lock_key: LockKey::default(), } } @@ -608,39 +610,20 @@ mod tests { }; let manager = LocalManager::new(config); - #[derive(Debug)] - struct MockProcedure {} - - #[async_trait] - impl Procedure for MockProcedure { - fn type_name(&self) -> &str { - "MockProcedure" - } - - async fn execute(&mut self, _ctx: &Context) -> Result { - unimplemented!() - } - - fn dump(&self) -> Result { - unimplemented!() - } - - fn lock_key(&self) -> LockKey { - LockKey::single("test.submit") - } - } - let procedure_id = ProcedureId::random(); assert!(manager .procedure_state(procedure_id) .await .unwrap() .is_none()); + assert!(manager.procedure_watcher(procedure_id).is_none()); + let mut procedure = ProcedureToLoad::new("submit"); + procedure.lock_key = LockKey::single("test.submit"); manager .submit(ProcedureWithId { id: procedure_id, - procedure: Box::new(MockProcedure {}), + procedure: Box::new(procedure), }) .await .unwrap(); @@ -649,15 +632,77 @@ mod tests { .await .unwrap() .is_some()); + // Wait for the procedure done. + let mut watcher = manager.procedure_watcher(procedure_id).unwrap(); + watcher.changed().await.unwrap(); + assert_eq!(ProcedureState::Done, *watcher.borrow()); // Try to submit procedure with same id again. let err = manager .submit(ProcedureWithId { id: procedure_id, - procedure: Box::new(MockProcedure {}), + procedure: Box::new(ProcedureToLoad::new("submit")), }) .await .unwrap_err(); assert!(matches!(err, Error::DuplicateProcedure { .. }), "{err}"); } + + #[tokio::test] + async fn test_state_changed_on_err() { + let dir = TempDir::new("on_err").unwrap(); + let config = ManagerConfig { + object_store: test_util::new_object_store(&dir), + }; + let manager = LocalManager::new(config); + + #[derive(Debug)] + struct MockProcedure { + panic: bool, + } + + #[async_trait] + impl Procedure for MockProcedure { + fn type_name(&self) -> &str { + "MockProcedure" + } + + async fn execute(&mut self, _ctx: &Context) -> Result { + if self.panic { + // Test the runner can set the state to failed even the procedure + // panics. + panic!(); + } else { + Err(Error::external(MockError::new(StatusCode::Unexpected))) + } + } + + fn dump(&self) -> Result { + Ok(String::new()) + } + + fn lock_key(&self) -> LockKey { + LockKey::single("test.submit") + } + } + + let check_procedure = |procedure| { + async { + let procedure_id = ProcedureId::random(); + let mut watcher = manager + .submit(ProcedureWithId { + id: procedure_id, + procedure: Box::new(procedure), + }) + .await + .unwrap(); + // Wait for the notification. + watcher.changed().await.unwrap(); + assert_eq!(ProcedureState::Failed, *watcher.borrow()); + } + }; + + check_procedure(MockProcedure { panic: false }).await; + check_procedure(MockProcedure { panic: true }).await; + } } diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs index 90a03d51f2..a5c19b0dc3 100644 --- a/src/common/procedure/src/local/runner.rs +++ b/src/common/procedure/src/local/runner.rs @@ -12,15 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::time::Duration; use common_telemetry::logging; -use tokio::sync::Notify; use tokio::time; use crate::error::{Error, Result}; -use crate::local::{ExecMeta, ManagerContext, ProcedureMeta, ProcedureMetaRef}; +use crate::local::{ManagerContext, ProcedureMeta, ProcedureMetaRef}; use crate::store::ProcedureStore; use crate::{BoxedProcedure, Context, ProcedureId, ProcedureState, ProcedureWithId, Status}; @@ -49,6 +48,52 @@ impl ExecResult { } } +/// A guard to cleanup procedure state. +struct ProcedureGuard { + meta: ProcedureMetaRef, + manager_ctx: Arc, + finish: bool, +} + +impl ProcedureGuard { + /// Returns a new [ProcedureGuard]. + fn new(meta: ProcedureMetaRef, manager_ctx: Arc) -> ProcedureGuard { + ProcedureGuard { + meta, + manager_ctx, + finish: false, + } + } + + /// The procedure is finished successfully. + fn finish(mut self) { + self.finish = true; + } +} + +impl Drop for ProcedureGuard { + fn drop(&mut self) { + if !self.finish { + logging::error!("Procedure {} exits unexpectedly", self.meta.id); + + // Set state to failed. This is useful in test as runtime may not abort when the runner task panics. + // See https://github.com/tokio-rs/tokio/issues/2002 . + // We set set_panic_hook() in the application's main function. But our tests don't have this panic hook. + self.meta.set_state(ProcedureState::Failed); + } + + // Notify parent procedure. + if let Some(parent_id) = self.meta.parent_id { + self.manager_ctx.notify_by_subprocedure(parent_id); + } + + // Release lock in reverse order. + for key in self.meta.lock_key.keys_to_unlock() { + self.manager_ctx.lock_map.release_lock(key, self.meta.id); + } + } +} + // TODO(yingwen): Support cancellation. pub(crate) struct Runner { pub(crate) meta: ProcedureMetaRef, @@ -61,6 +106,9 @@ pub(crate) struct Runner { impl Runner { /// Run the procedure. pub(crate) async fn run(mut self) -> Result<()> { + // Ensure we can update the procedure state. + let guard = ProcedureGuard::new(self.meta.clone(), self.manager_ctx.clone()); + logging::info!( "Runner {}-{} starts", self.procedure.type_name(), @@ -84,15 +132,13 @@ impl Runner { result = Err(e); } - // Notify parent procedure. - if let Some(parent_id) = self.meta.parent_id { - self.manager_ctx.notify_by_subprocedure(parent_id); - } + // We can't remove the metadata of the procedure now as users and its parent might + // need to query its state. + // TODO(yingwen): 1. Add TTL to the metadata; 2. Only keep state in the procedure store + // so we don't need to always store the metadata in memory after the procedure is done. - // Release lock in reverse order. - for key in self.meta.lock_key.keys_to_unlock() { - self.manager_ctx.lock_map.release_lock(key, self.meta.id); - } + // Release locks and notify parent procedure. + guard.finish(); // If this is the root procedure, clean up message cache. if self.meta.parent_id.is_none() { @@ -100,11 +146,6 @@ impl Runner { self.manager_ctx.remove_messages(&procedure_ids); } - // We can't remove the metadata of the procedure now as users and its parent might - // need to query its state. - // TODO(yingwen): 1. Add TTL to the metadata; 2. Only keep state in the procedure store - // so we don't need to always store the metadata in memory after the procedure is done. - logging::info!( "Runner {}-{} exits", self.procedure.type_name(), @@ -203,14 +244,11 @@ impl Runner { step = loaded_procedure.step; } - let meta = Arc::new(ProcedureMeta { - id: procedure_id, - lock_notify: Notify::new(), - parent_id: Some(self.meta.id), - child_notify: Notify::new(), - lock_key: procedure.lock_key(), - exec_meta: Mutex::new(ExecMeta::default()), - }); + let meta = Arc::new(ProcedureMeta::new( + procedure_id, + Some(self.meta.id), + procedure.lock_key(), + )); let runner = Runner { meta: meta.clone(), procedure, diff --git a/src/common/procedure/src/procedure.rs b/src/common/procedure/src/procedure.rs index d9aff5955f..e5b8dc8491 100644 --- a/src/common/procedure/src/procedure.rs +++ b/src/common/procedure/src/procedure.rs @@ -20,6 +20,7 @@ use async_trait::async_trait; use serde::{Deserialize, Serialize}; use smallvec::{smallvec, SmallVec}; use snafu::{ResultExt, Snafu}; +use tokio::sync::watch::Receiver; use uuid::Uuid; use crate::error::Result; @@ -209,6 +210,9 @@ pub enum ProcedureState { Failed, } +/// Watcher to watch procedure state. +pub type Watcher = Receiver; + // TODO(yingwen): Shutdown /// `ProcedureManager` executes [Procedure] submitted to it. #[async_trait] @@ -217,7 +221,9 @@ pub trait ProcedureManager: Send + Sync + 'static { fn register_loader(&self, name: &str, loader: BoxedProcedureLoader) -> Result<()>; /// Submits a procedure to execute. - async fn submit(&self, procedure: ProcedureWithId) -> Result<()>; + /// + /// Returns a [Watcher] to watch the created procedure. + async fn submit(&self, procedure: ProcedureWithId) -> Result; /// Recovers unfinished procedures and reruns them. /// @@ -228,6 +234,9 @@ pub trait ProcedureManager: Send + Sync + 'static { /// /// Returns `Ok(None)` if the procedure doesn't exist. async fn procedure_state(&self, procedure_id: ProcedureId) -> Result>; + + /// Returns a [Watcher] to watch [ProcedureState] of specific procedure. + fn procedure_watcher(&self, procedure_id: ProcedureId) -> Option; } /// Ref-counted pointer to the [ProcedureManager].