feat(procedure): Add procedure watcher (#1043)

* refactor: Use watch channel to store ProcedureState

* feat: Add a watcher to wait for state change

* test: test watcher on procedure failure

* feat: Only clear message cache on success

* feat: submit returns Watcher
This commit is contained in:
Yingwen
2023-02-21 17:19:39 +08:00
committed by GitHub
parent e17d5a1c41
commit c6f2db8ae0
4 changed files with 195 additions and 103 deletions

View File

@@ -22,5 +22,5 @@ mod store;
pub use crate::error::{Error, Result};
pub use crate::procedure::{
BoxedProcedure, Context, ContextProvider, LockKey, Procedure, ProcedureId, ProcedureManager,
ProcedureManagerRef, ProcedureState, ProcedureWithId, Status,
ProcedureManagerRef, ProcedureState, ProcedureWithId, Status, Watcher,
};

View File

@@ -22,6 +22,7 @@ use async_trait::async_trait;
use common_telemetry::logging;
use object_store::ObjectStore;
use snafu::ensure;
use tokio::sync::watch::{self, Receiver, Sender};
use tokio::sync::Notify;
use crate::error::{DuplicateProcedureSnafu, LoaderConflictSnafu, Result};
@@ -31,27 +32,9 @@ use crate::procedure::BoxedProcedureLoader;
use crate::store::{ObjectStateStore, ProcedureMessage, ProcedureStore, StateStoreRef};
use crate::{
BoxedProcedure, ContextProvider, LockKey, ProcedureId, ProcedureManager, ProcedureState,
ProcedureWithId,
ProcedureWithId, Watcher,
};
/// Mutable metadata of a procedure during execution.
#[derive(Debug)]
struct ExecMeta {
/// Current procedure state.
state: ProcedureState,
/// Id of child procedures.
children: Vec<ProcedureId>,
}
impl Default for ExecMeta {
fn default() -> ExecMeta {
ExecMeta {
state: ProcedureState::Running,
children: Vec::new(),
}
}
}
/// Shared metadata of a procedure.
///
/// # Note
@@ -72,38 +55,55 @@ pub(crate) struct ProcedureMeta {
child_notify: Notify,
/// Lock required by this procedure.
lock_key: LockKey,
/// Mutable status during execution.
exec_meta: Mutex<ExecMeta>,
/// Sender to notify the procedure state.
state_sender: Sender<ProcedureState>,
/// Receiver to watch the procedure state.
state_receiver: Receiver<ProcedureState>,
/// Id of child procedures.
children: Mutex<Vec<ProcedureId>>,
}
impl ProcedureMeta {
fn new(id: ProcedureId, parent_id: Option<ProcedureId>, lock_key: LockKey) -> ProcedureMeta {
let (state_sender, state_receiver) = watch::channel(ProcedureState::Running);
ProcedureMeta {
id,
lock_notify: Notify::new(),
parent_id,
child_notify: Notify::new(),
lock_key,
state_sender,
state_receiver,
children: Mutex::new(Vec::new()),
}
}
/// Returns current [ProcedureState].
fn state(&self) -> ProcedureState {
let meta = self.exec_meta.lock().unwrap();
meta.state.clone()
self.state_receiver.borrow().clone()
}
/// Update current [ProcedureState].
fn set_state(&self, state: ProcedureState) {
let mut meta = self.exec_meta.lock().unwrap();
meta.state = state;
// Safety: ProcedureMeta also holds the receiver, so `send()` should never fail.
self.state_sender.send(state).unwrap();
}
/// Push `procedure_id` of the subprocedure to the metadata.
fn push_child(&self, procedure_id: ProcedureId) {
let mut meta = self.exec_meta.lock().unwrap();
meta.children.push(procedure_id);
let mut children = self.children.lock().unwrap();
children.push(procedure_id);
}
/// Append subprocedures to given `buffer`.
fn list_children(&self, buffer: &mut Vec<ProcedureId>) {
let meta = self.exec_meta.lock().unwrap();
buffer.extend_from_slice(&meta.children);
let children = self.children.lock().unwrap();
buffer.extend_from_slice(&children);
}
/// Returns the number of subprocedures.
fn num_children(&self) -> usize {
self.exec_meta.lock().unwrap().children.len()
self.children.lock().unwrap().len()
}
}
@@ -175,6 +175,14 @@ impl ManagerContext {
procedures.get(&procedure_id).map(|meta| meta.state())
}
/// Returns the [Watcher] of specific `procedure_id`.
fn watcher(&self, procedure_id: ProcedureId) -> Option<Watcher> {
let procedures = self.procedures.read().unwrap();
procedures
.get(&procedure_id)
.map(|meta| meta.state_receiver.clone())
}
/// Notify a suspended parent procedure with specific `procedure_id` by its subprocedure.
fn notify_by_subprocedure(&self, procedure_id: ProcedureId) {
let procedures = self.procedures.read().unwrap();
@@ -308,15 +316,8 @@ impl LocalManager {
procedure_id: ProcedureId,
step: u32,
procedure: BoxedProcedure,
) -> Result<()> {
let meta = Arc::new(ProcedureMeta {
id: procedure_id,
lock_notify: Notify::new(),
parent_id: None,
child_notify: Notify::new(),
lock_key: procedure.lock_key(),
exec_meta: Mutex::new(ExecMeta::default()),
});
) -> Result<Watcher> {
let meta = Arc::new(ProcedureMeta::new(procedure_id, None, procedure.lock_key()));
let runner = Runner {
meta: meta.clone(),
procedure,
@@ -325,6 +326,8 @@ impl LocalManager {
store: ProcedureStore::new(self.state_store.clone()),
};
let watcher = meta.state_receiver.clone();
// Inserts meta into the manager before actually spawnd the runner.
ensure!(
self.manager_ctx.try_insert_procedure(meta),
@@ -336,7 +339,7 @@ impl LocalManager {
let _ = runner.run().await;
});
Ok(())
Ok(watcher)
}
}
@@ -351,16 +354,14 @@ impl ProcedureManager for LocalManager {
Ok(())
}
async fn submit(&self, procedure: ProcedureWithId) -> Result<()> {
async fn submit(&self, procedure: ProcedureWithId) -> Result<Watcher> {
let procedure_id = procedure.id;
ensure!(
!self.manager_ctx.contains_procedure(procedure_id),
DuplicateProcedureSnafu { procedure_id }
);
self.submit_root(procedure.id, 0, procedure.procedure)?;
Ok(())
self.submit_root(procedure.id, 0, procedure.procedure)
}
async fn recover(&self) -> Result<()> {
@@ -401,6 +402,10 @@ impl ProcedureManager for LocalManager {
async fn procedure_state(&self, procedure_id: ProcedureId) -> Result<Option<ProcedureState>> {
Ok(self.manager_ctx.state(procedure_id))
}
fn procedure_watcher(&self, procedure_id: ProcedureId) -> Option<Watcher> {
self.manager_ctx.watcher(procedure_id)
}
}
/// Create a new [ProcedureMeta] for test purpose.
@@ -412,14 +417,7 @@ mod test_util {
use super::*;
pub(crate) fn procedure_meta_for_test() -> ProcedureMeta {
ProcedureMeta {
id: ProcedureId::random(),
lock_notify: Notify::new(),
parent_id: None,
child_notify: Notify::new(),
lock_key: LockKey::default(),
exec_meta: Mutex::new(ExecMeta::default()),
}
ProcedureMeta::new(ProcedureId::random(), None, LockKey::default())
}
pub(crate) fn new_object_store(dir: &TempDir) -> ObjectStore {
@@ -431,6 +429,8 @@ mod test_util {
#[cfg(test)]
mod tests {
use common_error::mock::MockError;
use common_error::prelude::StatusCode;
use tempdir::TempDir;
use super::*;
@@ -500,6 +500,7 @@ mod tests {
#[derive(Debug)]
struct ProcedureToLoad {
content: String,
lock_key: LockKey,
}
#[async_trait]
@@ -517,7 +518,7 @@ mod tests {
}
fn lock_key(&self) -> LockKey {
LockKey::default()
self.lock_key.clone()
}
}
@@ -525,6 +526,7 @@ mod tests {
fn new(content: &str) -> ProcedureToLoad {
ProcedureToLoad {
content: content.to_string(),
lock_key: LockKey::default(),
}
}
@@ -608,39 +610,20 @@ mod tests {
};
let manager = LocalManager::new(config);
#[derive(Debug)]
struct MockProcedure {}
#[async_trait]
impl Procedure for MockProcedure {
fn type_name(&self) -> &str {
"MockProcedure"
}
async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
unimplemented!()
}
fn dump(&self) -> Result<String> {
unimplemented!()
}
fn lock_key(&self) -> LockKey {
LockKey::single("test.submit")
}
}
let procedure_id = ProcedureId::random();
assert!(manager
.procedure_state(procedure_id)
.await
.unwrap()
.is_none());
assert!(manager.procedure_watcher(procedure_id).is_none());
let mut procedure = ProcedureToLoad::new("submit");
procedure.lock_key = LockKey::single("test.submit");
manager
.submit(ProcedureWithId {
id: procedure_id,
procedure: Box::new(MockProcedure {}),
procedure: Box::new(procedure),
})
.await
.unwrap();
@@ -649,15 +632,77 @@ mod tests {
.await
.unwrap()
.is_some());
// Wait for the procedure done.
let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
watcher.changed().await.unwrap();
assert_eq!(ProcedureState::Done, *watcher.borrow());
// Try to submit procedure with same id again.
let err = manager
.submit(ProcedureWithId {
id: procedure_id,
procedure: Box::new(MockProcedure {}),
procedure: Box::new(ProcedureToLoad::new("submit")),
})
.await
.unwrap_err();
assert!(matches!(err, Error::DuplicateProcedure { .. }), "{err}");
}
#[tokio::test]
async fn test_state_changed_on_err() {
let dir = TempDir::new("on_err").unwrap();
let config = ManagerConfig {
object_store: test_util::new_object_store(&dir),
};
let manager = LocalManager::new(config);
#[derive(Debug)]
struct MockProcedure {
panic: bool,
}
#[async_trait]
impl Procedure for MockProcedure {
fn type_name(&self) -> &str {
"MockProcedure"
}
async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
if self.panic {
// Test the runner can set the state to failed even the procedure
// panics.
panic!();
} else {
Err(Error::external(MockError::new(StatusCode::Unexpected)))
}
}
fn dump(&self) -> Result<String> {
Ok(String::new())
}
fn lock_key(&self) -> LockKey {
LockKey::single("test.submit")
}
}
let check_procedure = |procedure| {
async {
let procedure_id = ProcedureId::random();
let mut watcher = manager
.submit(ProcedureWithId {
id: procedure_id,
procedure: Box::new(procedure),
})
.await
.unwrap();
// Wait for the notification.
watcher.changed().await.unwrap();
assert_eq!(ProcedureState::Failed, *watcher.borrow());
}
};
check_procedure(MockProcedure { panic: false }).await;
check_procedure(MockProcedure { panic: true }).await;
}
}

View File

@@ -12,15 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::time::Duration;
use common_telemetry::logging;
use tokio::sync::Notify;
use tokio::time;
use crate::error::{Error, Result};
use crate::local::{ExecMeta, ManagerContext, ProcedureMeta, ProcedureMetaRef};
use crate::local::{ManagerContext, ProcedureMeta, ProcedureMetaRef};
use crate::store::ProcedureStore;
use crate::{BoxedProcedure, Context, ProcedureId, ProcedureState, ProcedureWithId, Status};
@@ -49,6 +48,52 @@ impl ExecResult {
}
}
/// A guard to cleanup procedure state.
struct ProcedureGuard {
meta: ProcedureMetaRef,
manager_ctx: Arc<ManagerContext>,
finish: bool,
}
impl ProcedureGuard {
/// Returns a new [ProcedureGuard].
fn new(meta: ProcedureMetaRef, manager_ctx: Arc<ManagerContext>) -> ProcedureGuard {
ProcedureGuard {
meta,
manager_ctx,
finish: false,
}
}
/// The procedure is finished successfully.
fn finish(mut self) {
self.finish = true;
}
}
impl Drop for ProcedureGuard {
fn drop(&mut self) {
if !self.finish {
logging::error!("Procedure {} exits unexpectedly", self.meta.id);
// Set state to failed. This is useful in test as runtime may not abort when the runner task panics.
// See https://github.com/tokio-rs/tokio/issues/2002 .
// We set set_panic_hook() in the application's main function. But our tests don't have this panic hook.
self.meta.set_state(ProcedureState::Failed);
}
// Notify parent procedure.
if let Some(parent_id) = self.meta.parent_id {
self.manager_ctx.notify_by_subprocedure(parent_id);
}
// Release lock in reverse order.
for key in self.meta.lock_key.keys_to_unlock() {
self.manager_ctx.lock_map.release_lock(key, self.meta.id);
}
}
}
// TODO(yingwen): Support cancellation.
pub(crate) struct Runner {
pub(crate) meta: ProcedureMetaRef,
@@ -61,6 +106,9 @@ pub(crate) struct Runner {
impl Runner {
/// Run the procedure.
pub(crate) async fn run(mut self) -> Result<()> {
// Ensure we can update the procedure state.
let guard = ProcedureGuard::new(self.meta.clone(), self.manager_ctx.clone());
logging::info!(
"Runner {}-{} starts",
self.procedure.type_name(),
@@ -84,15 +132,13 @@ impl Runner {
result = Err(e);
}
// Notify parent procedure.
if let Some(parent_id) = self.meta.parent_id {
self.manager_ctx.notify_by_subprocedure(parent_id);
}
// We can't remove the metadata of the procedure now as users and its parent might
// need to query its state.
// TODO(yingwen): 1. Add TTL to the metadata; 2. Only keep state in the procedure store
// so we don't need to always store the metadata in memory after the procedure is done.
// Release lock in reverse order.
for key in self.meta.lock_key.keys_to_unlock() {
self.manager_ctx.lock_map.release_lock(key, self.meta.id);
}
// Release locks and notify parent procedure.
guard.finish();
// If this is the root procedure, clean up message cache.
if self.meta.parent_id.is_none() {
@@ -100,11 +146,6 @@ impl Runner {
self.manager_ctx.remove_messages(&procedure_ids);
}
// We can't remove the metadata of the procedure now as users and its parent might
// need to query its state.
// TODO(yingwen): 1. Add TTL to the metadata; 2. Only keep state in the procedure store
// so we don't need to always store the metadata in memory after the procedure is done.
logging::info!(
"Runner {}-{} exits",
self.procedure.type_name(),
@@ -203,14 +244,11 @@ impl Runner {
step = loaded_procedure.step;
}
let meta = Arc::new(ProcedureMeta {
id: procedure_id,
lock_notify: Notify::new(),
parent_id: Some(self.meta.id),
child_notify: Notify::new(),
lock_key: procedure.lock_key(),
exec_meta: Mutex::new(ExecMeta::default()),
});
let meta = Arc::new(ProcedureMeta::new(
procedure_id,
Some(self.meta.id),
procedure.lock_key(),
));
let runner = Runner {
meta: meta.clone(),
procedure,

View File

@@ -20,6 +20,7 @@ use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use smallvec::{smallvec, SmallVec};
use snafu::{ResultExt, Snafu};
use tokio::sync::watch::Receiver;
use uuid::Uuid;
use crate::error::Result;
@@ -209,6 +210,9 @@ pub enum ProcedureState {
Failed,
}
/// Watcher to watch procedure state.
pub type Watcher = Receiver<ProcedureState>;
// TODO(yingwen): Shutdown
/// `ProcedureManager` executes [Procedure] submitted to it.
#[async_trait]
@@ -217,7 +221,9 @@ pub trait ProcedureManager: Send + Sync + 'static {
fn register_loader(&self, name: &str, loader: BoxedProcedureLoader) -> Result<()>;
/// Submits a procedure to execute.
async fn submit(&self, procedure: ProcedureWithId) -> Result<()>;
///
/// Returns a [Watcher] to watch the created procedure.
async fn submit(&self, procedure: ProcedureWithId) -> Result<Watcher>;
/// Recovers unfinished procedures and reruns them.
///
@@ -228,6 +234,9 @@ pub trait ProcedureManager: Send + Sync + 'static {
///
/// Returns `Ok(None)` if the procedure doesn't exist.
async fn procedure_state(&self, procedure_id: ProcedureId) -> Result<Option<ProcedureState>>;
/// Returns a [Watcher] to watch [ProcedureState] of specific procedure.
fn procedure_watcher(&self, procedure_id: ProcedureId) -> Option<Watcher>;
}
/// Ref-counted pointer to the [ProcedureManager].