feat: Execute procedure in LocalManager (#953)

* feat: Runner executes procedure

* feat: Add rollback key type to ParsedKey

* feat: Write rollback key when procedure is unable to execute

* feat: Use loaded step to re-submit subprocedure

* feat: Track subprocedures in ProcedureMeta

* feat: Clean message cache after the root procedure is done

* feat: Runner returns execution result

* fix: Fix tests

* test: Test Runner

* test: Test procedures_in_tree

* chore: Refine test and comments

* feat: Remove support of lock inheritance

A deadlock happens if a subprocedure acquires the same lock key as
its parent.

The main concern is if the subprocedure directly inherits its parent's
lock, then how should we behave when multiple subprocedures acquire
this same lock? Each procedure may assume it has unique access to the
same object but it actually shares the resource with others.

Now subprocedures need to use different keys to lock objects, which is
reasonable. For example:
- A parent procedure wants to create a table so it locks the table with
a key like `catalog.schema.table`
- Subprocedures create regions for the table so they lock the regions
with keys `catalog.schema.table.region-0 ~ catalog.schema.table.region-n`

* style: Fix clippy

* feat: insert_procedure returns false on duplicate procedure

Also rename this method to try_insert_procedure

* chore: Address CR comments
This commit is contained in:
Yingwen
2023-02-13 10:38:56 +08:00
committed by GitHub
parent c06e04afbb
commit be897efd01
8 changed files with 1013 additions and 125 deletions

1
Cargo.lock generated
View File

@@ -1523,6 +1523,7 @@ dependencies = [
"common-runtime",
"common-telemetry",
"futures",
"futures-util",
"object-store",
"serde",
"serde_json",

View File

@@ -140,8 +140,6 @@ Rollback is complicated to implement so some procedures might not support rollba
## Locking
The `ProcedureManager` can provide a locking mechanism that gives a procedure read/write access to a database object such as a table so other procedures are unable to modify the same table while the current one is executing.
Sub-procedures always inherit their parents' locks. The `ProcedureManager` only acquires locks for a procedure if its parent doesn't hold the lock.
# Drawbacks
The `Procedure` framework introduces additional complexity and overhead to our database.
- To execute a `Procedure`, we need to write to the `ProcedureStore` multiple times, which may slow down the server

View File

@@ -18,4 +18,5 @@ tokio.workspace = true
uuid.workspace = true
[dev-dependencies]
futures-util.workspace = true
tempdir = "0.3"

View File

@@ -15,10 +15,11 @@
mod lock;
mod runner;
use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex, RwLock};
use async_trait::async_trait;
use common_telemetry::logging;
use object_store::ObjectStore;
use snafu::ensure;
use tokio::sync::Notify;
@@ -37,6 +38,17 @@ use crate::{
struct ExecMeta {
/// Current procedure state.
state: ProcedureState,
/// Id of child procedures.
children: Vec<ProcedureId>,
}
impl Default for ExecMeta {
fn default() -> ExecMeta {
ExecMeta {
state: ProcedureState::Running,
children: Vec::new(),
}
}
}
/// Shared metadata of a procedure.
@@ -57,30 +69,13 @@ pub(crate) struct ProcedureMeta {
parent_id: Option<ProcedureId>,
/// Notify to wait for subprocedures.
child_notify: Notify,
/// Locks inherted from the parent procedure.
parent_locks: Vec<LockKey>,
/// Lock not in `parent_locks` but required by this procedure.
///
/// If the parent procedure already owns the lock that this procedure
/// needs, we set this field to `None`.
/// Lock required by this procedure.
lock_key: Option<LockKey>,
/// Mutable status during execution.
exec_meta: Mutex<ExecMeta>,
}
impl ProcedureMeta {
/// Return all locks the procedure needs.
fn locks_needed(&self) -> Vec<LockKey> {
let num_locks = self.parent_locks.len() + if self.lock_key.is_some() { 1 } else { 0 };
let mut locks = Vec::with_capacity(num_locks);
locks.extend_from_slice(&self.parent_locks);
if let Some(key) = &self.lock_key {
locks.push(key.clone());
}
locks
}
/// Returns current [ProcedureState].
fn state(&self) -> ProcedureState {
let meta = self.exec_meta.lock().unwrap();
@@ -92,12 +87,33 @@ impl ProcedureMeta {
let mut meta = self.exec_meta.lock().unwrap();
meta.state = state;
}
/// Push `procedure_id` of the subprocedure to the metadata.
fn push_child(&self, procedure_id: ProcedureId) {
let mut meta = self.exec_meta.lock().unwrap();
meta.children.push(procedure_id);
}
/// Append subprocedures to given `buffer`.
fn list_children(&self, buffer: &mut Vec<ProcedureId>) {
let meta = self.exec_meta.lock().unwrap();
buffer.extend_from_slice(&meta.children);
}
/// Returns the number of subprocedures.
fn num_children(&self) -> usize {
self.exec_meta.lock().unwrap().children.len()
}
}
/// Reference counted pointer to [ProcedureMeta].
type ProcedureMetaRef = Arc<ProcedureMeta>;
/// Procedure and its parent procedure id.
struct ProcedureAndParent(BoxedProcedure, Option<ProcedureId>);
/// Procedure loaded from store.
struct LoadedProcedure {
procedure: BoxedProcedure,
parent_id: Option<ProcedureId>,
step: u32,
}
/// Shared context of the manager.
pub(crate) struct ManagerContext {
@@ -128,14 +144,20 @@ impl ManagerContext {
procedures.contains_key(&procedure_id)
}
/// Insert the `procedure` to the context.
/// Try to insert the `procedure` to the context if there is no procedure
/// with same [ProcedureId].
///
/// # Panics
/// Panics if the procedure already exists.
fn insert_procedure(&self, meta: ProcedureMetaRef) {
/// Returns `false` if there is already a procedure using the same [ProcedureId].
fn try_insert_procedure(&self, meta: ProcedureMetaRef) -> bool {
let mut procedures = self.procedures.write().unwrap();
if procedures.contains_key(&meta.id) {
return false;
}
let old = procedures.insert(meta.id, meta);
assert!(old.is_none());
debug_assert!(old.is_none());
true
}
/// Returns the [ProcedureState] of specific `procedure_id`.
@@ -143,6 +165,99 @@ impl ManagerContext {
let procedures = self.procedures.read().unwrap();
procedures.get(&procedure_id).map(|meta| meta.state())
}
/// Notify a suspended parent procedure with specific `procedure_id` by its subprocedure.
fn notify_by_subprocedure(&self, procedure_id: ProcedureId) {
let procedures = self.procedures.read().unwrap();
if let Some(meta) = procedures.get(&procedure_id) {
meta.child_notify.notify_one();
}
}
/// Load procedure with specific `procedure_id` from cached [ProcedureMessage]s.
fn load_one_procedure(&self, procedure_id: ProcedureId) -> Option<LoadedProcedure> {
let messages = self.messages.lock().unwrap();
let message = messages.get(&procedure_id)?;
let loaders = self.loaders.lock().unwrap();
let loader = loaders.get(&message.type_name).or_else(|| {
logging::error!(
"Loader not found, procedure_id: {}, type_name: {}",
procedure_id,
message.type_name
);
None
})?;
let procedure = loader(&message.data)
.map_err(|e| {
logging::error!(
"Failed to load procedure data, key: {}, source: {}",
procedure_id,
e
);
e
})
.ok()?;
Some(LoadedProcedure {
procedure,
parent_id: message.parent_id,
step: message.step,
})
}
/// Returns all procedures in the tree (including given `root` procedure).
///
/// If callers need a consistent view of the tree, they must ensure no new
/// procedure is added to the tree during using this method.
fn procedures_in_tree(&self, root: &ProcedureMetaRef) -> Vec<ProcedureId> {
let sub_num = root.num_children();
// Reserve capacity for the root procedure and its children.
let mut procedures = Vec::with_capacity(1 + sub_num);
let mut queue = VecDeque::with_capacity(1 + sub_num);
// Push the root procedure to the queue.
queue.push_back(root.clone());
let mut children_ids = Vec::with_capacity(sub_num);
let mut children = Vec::with_capacity(sub_num);
while let Some(meta) = queue.pop_front() {
procedures.push(meta.id);
// Find metadatas of children.
children_ids.clear();
meta.list_children(&mut children_ids);
self.find_procedures(&children_ids, &mut children);
// Traverse children later.
for child in children.drain(..) {
queue.push_back(child);
}
}
procedures
}
/// Finds procedures by given `procedure_ids`.
///
/// Ignores the id if corresponding procedure is not found.
fn find_procedures(&self, procedure_ids: &[ProcedureId], metas: &mut Vec<ProcedureMetaRef>) {
let procedures = self.procedures.read().unwrap();
for procedure_id in procedure_ids {
if let Some(meta) = procedures.get(procedure_id) {
metas.push(meta.clone());
}
}
}
/// Remove cached [ProcedureMessage] by ids.
fn remove_messages(&self, procedure_ids: &[ProcedureId]) {
let mut messages = self.messages.lock().unwrap();
for procedure_id in procedure_ids {
messages.remove(procedure_id);
}
}
}
/// Config for [LocalManager].
@@ -168,17 +283,19 @@ impl LocalManager {
}
/// Submit a root procedure with given `procedure_id`.
fn submit_root(&self, procedure_id: ProcedureId, step: u32, procedure: BoxedProcedure) {
fn submit_root(
&self,
procedure_id: ProcedureId,
step: u32,
procedure: BoxedProcedure,
) -> Result<()> {
let meta = Arc::new(ProcedureMeta {
id: procedure_id,
lock_notify: Notify::new(),
parent_id: None,
child_notify: Notify::new(),
parent_locks: Vec::new(),
lock_key: procedure.lock_key(),
exec_meta: Mutex::new(ExecMeta {
state: ProcedureState::Running,
}),
exec_meta: Mutex::new(ExecMeta::default()),
});
let runner = Runner {
meta: meta.clone(),
@@ -188,12 +305,18 @@ impl LocalManager {
store: ProcedureStore::new(self.state_store.clone()),
};
self.manager_ctx.insert_procedure(meta);
// Inserts meta into the manager before actually spawnd the runner.
ensure!(
self.manager_ctx.try_insert_procedure(meta),
DuplicateProcedureSnafu { procedure_id },
);
common_runtime::spawn_bg(async move {
// Run the root procedure.
runner.run().await
let _ = runner.run().await;
});
Ok(())
}
}
@@ -215,13 +338,13 @@ impl ProcedureManager for LocalManager {
DuplicateProcedureSnafu { procedure_id }
);
self.submit_root(procedure.id, 0, procedure.procedure);
self.submit_root(procedure.id, 0, procedure.procedure)?;
Ok(())
}
async fn recover(&self) -> Result<()> {
unimplemented!()
todo!("Recover procedure and messages")
}
async fn procedure_state(&self, procedure_id: ProcedureId) -> Result<Option<ProcedureState>> {
@@ -231,23 +354,32 @@ impl ProcedureManager for LocalManager {
/// Create a new [ProcedureMeta] for test purpose.
#[cfg(test)]
fn procedure_meta_for_test() -> ProcedureMeta {
ProcedureMeta {
id: ProcedureId::random(),
lock_notify: Notify::new(),
parent_id: None,
child_notify: Notify::new(),
parent_locks: Vec::new(),
lock_key: None,
exec_meta: Mutex::new(ExecMeta {
state: ProcedureState::Running,
}),
mod test_util {
use object_store::services::fs::Builder;
use tempdir::TempDir;
use super::*;
pub(crate) fn procedure_meta_for_test() -> ProcedureMeta {
ProcedureMeta {
id: ProcedureId::random(),
lock_notify: Notify::new(),
parent_id: None,
child_notify: Notify::new(),
lock_key: None,
exec_meta: Mutex::new(ExecMeta::default()),
}
}
pub(crate) fn new_object_store(dir: &TempDir) -> ObjectStore {
let store_dir = dir.path().to_str().unwrap();
let accessor = Builder::default().root(store_dir).build().unwrap();
ObjectStore::new(accessor)
}
}
#[cfg(test)]
mod tests {
use object_store::services::fs::Builder;
use serde::{Deserialize, Serialize};
use tempdir::TempDir;
@@ -255,40 +387,15 @@ mod tests {
use crate::error::Error;
use crate::{Context, Procedure, Status};
#[test]
fn test_locks_needed() {
let mut meta = procedure_meta_for_test();
let locks = meta.locks_needed();
assert!(locks.is_empty());
let parent_locks = vec![LockKey::new("a"), LockKey::new("b")];
meta.parent_locks = parent_locks.clone();
let locks = meta.locks_needed();
assert_eq!(parent_locks, locks);
meta.lock_key = Some(LockKey::new("c"));
let locks = meta.locks_needed();
assert_eq!(
vec![LockKey::new("a"), LockKey::new("b"), LockKey::new("c")],
locks
);
}
fn new_object_store(dir: &TempDir) -> ObjectStore {
let store_dir = dir.path().to_str().unwrap();
let accessor = Builder::default().root(store_dir).build().unwrap();
ObjectStore::new(accessor)
}
#[test]
fn test_manager_context() {
let ctx = ManagerContext::new();
let meta = Arc::new(procedure_meta_for_test());
let meta = Arc::new(test_util::procedure_meta_for_test());
assert!(!ctx.contains_procedure(meta.id));
assert!(ctx.state(meta.id).is_none());
ctx.insert_procedure(meta.clone());
assert!(ctx.try_insert_procedure(meta.clone()));
assert!(ctx.contains_procedure(meta.id));
assert_eq!(ProcedureState::Running, ctx.state(meta.id).unwrap());
@@ -297,20 +404,54 @@ mod tests {
}
#[test]
#[should_panic]
fn test_manager_context_insert_duplicate() {
let ctx = ManagerContext::new();
let meta = Arc::new(procedure_meta_for_test());
let meta = Arc::new(test_util::procedure_meta_for_test());
ctx.insert_procedure(meta.clone());
ctx.insert_procedure(meta);
assert!(ctx.try_insert_procedure(meta.clone()));
assert!(!ctx.try_insert_procedure(meta));
}
fn new_child(parent_id: ProcedureId, ctx: &ManagerContext) -> ProcedureMetaRef {
let mut child = test_util::procedure_meta_for_test();
child.parent_id = Some(parent_id);
let child = Arc::new(child);
assert!(ctx.try_insert_procedure(child.clone()));
let mut parent = Vec::new();
ctx.find_procedures(&[parent_id], &mut parent);
parent[0].push_child(child.id);
child
}
#[test]
fn test_procedures_in_tree() {
let ctx = ManagerContext::new();
let root = Arc::new(test_util::procedure_meta_for_test());
assert!(ctx.try_insert_procedure(root.clone()));
assert_eq!(1, ctx.procedures_in_tree(&root).len());
let child1 = new_child(root.id, &ctx);
let child2 = new_child(root.id, &ctx);
let child3 = new_child(child1.id, &ctx);
let child4 = new_child(child1.id, &ctx);
let child5 = new_child(child2.id, &ctx);
let expect = vec![
root.id, child1.id, child2.id, child3.id, child4.id, child5.id,
];
assert_eq!(expect, ctx.procedures_in_tree(&root));
}
#[test]
fn test_register_loader() {
let dir = TempDir::new("register").unwrap();
let config = ManagerConfig {
object_store: new_object_store(&dir),
object_store: test_util::new_object_store(&dir),
};
let manager = LocalManager::new(config);
@@ -363,7 +504,7 @@ mod tests {
async fn test_submit_procedure() {
let dir = TempDir::new("submit").unwrap();
let config = ManagerConfig {
object_store: new_object_store(&dir),
object_store: test_util::new_object_store(&dir),
};
let manager = LocalManager::new(config);
@@ -417,6 +558,6 @@ mod tests {
})
.await
.unwrap_err();
assert!(matches!(err, Error::DuplicateProcedure { .. }), "{err:?}");
assert!(matches!(err, Error::DuplicateProcedure { .. }), "{err}");
}
}

View File

@@ -142,11 +142,11 @@ mod tests {
use std::sync::Arc;
use super::*;
use crate::local;
use crate::local::test_util;
#[test]
fn test_lock_no_waiter() {
let meta = Arc::new(local::procedure_meta_for_test());
let meta = Arc::new(test_util::procedure_meta_for_test());
let mut lock = Lock::from_owner(meta);
assert!(!lock.switch_owner());
@@ -154,10 +154,10 @@ mod tests {
#[tokio::test]
async fn test_lock_with_waiter() {
let owner = Arc::new(local::procedure_meta_for_test());
let owner = Arc::new(test_util::procedure_meta_for_test());
let mut lock = Lock::from_owner(owner);
let waiter = Arc::new(local::procedure_meta_for_test());
let waiter = Arc::new(test_util::procedure_meta_for_test());
lock.waiters.push_back(waiter.clone());
assert!(lock.switch_owner());
@@ -171,11 +171,11 @@ mod tests {
async fn test_lock_map() {
let key = "hello";
let owner = Arc::new(local::procedure_meta_for_test());
let owner = Arc::new(test_util::procedure_meta_for_test());
let lock_map = Arc::new(LockMap::new());
lock_map.acquire_lock(key, owner.clone()).await;
let waiter = Arc::new(local::procedure_meta_for_test());
let waiter = Arc::new(test_util::procedure_meta_for_test());
let waiter_id = waiter.id;
// Waiter release the lock, this should not take effect.

View File

@@ -12,14 +12,48 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use common_telemetry::logging;
use tokio::sync::Notify;
use tokio::time;
use crate::local::{ManagerContext, ProcedureMetaRef};
use crate::error::{Error, Result};
use crate::local::{ExecMeta, ManagerContext, ProcedureMeta, ProcedureMetaRef};
use crate::store::ProcedureStore;
use crate::BoxedProcedure;
use crate::{BoxedProcedure, Context, ProcedureId, ProcedureState, ProcedureWithId, Status};
const ERR_WAIT_DURATION: Duration = Duration::from_secs(30);
#[derive(Debug)]
enum ExecResult {
Continue,
Done,
RetryLater,
Failed(Error),
}
#[cfg(test)]
impl ExecResult {
fn is_continue(&self) -> bool {
matches!(self, ExecResult::Continue)
}
fn is_done(&self) -> bool {
matches!(self, ExecResult::Done)
}
fn is_retry_later(&self) -> bool {
matches!(self, ExecResult::RetryLater)
}
fn is_failed(&self) -> bool {
matches!(self, ExecResult::Failed(_))
}
}
// TODO(yingwen): Support cancellation.
pub(crate) struct Runner {
pub(crate) meta: ProcedureMetaRef,
pub(crate) procedure: BoxedProcedure,
@@ -30,7 +64,7 @@ pub(crate) struct Runner {
impl Runner {
/// Run the procedure.
pub(crate) async fn run(self) {
pub(crate) async fn run(mut self) -> Result<()> {
logging::info!(
"Runner {}-{} starts",
self.procedure.type_name(),
@@ -40,7 +74,10 @@ impl Runner {
// its parent.
let lock_key = self.meta.lock_key.clone();
// TODO(yingwen): Support multiple lock keys.
// TODO(yingwen):
// 1. Support multiple lock keys;
// 2. Detect recursive locking (and deadlock) if possible. Maybe we could detect
// recursive locking by adding a root procedure id to the meta.
// Acquire lock if necessary.
if let Some(key) = &lock_key {
self.manager_ctx
@@ -49,13 +86,30 @@ impl Runner {
.await;
}
// TODO(yingwen): Execute the procedure.
let mut result = Ok(());
// Execute the procedure. We need to release the lock whenever the the execution
// is successful or fail.
if let Err(e) = self.execute_procedure_in_loop().await {
result = Err(e);
}
// Notify parent procedure.
if let Some(parent_id) = self.meta.parent_id {
self.manager_ctx.notify_by_subprocedure(parent_id);
}
if let Some(key) = &lock_key {
self.manager_ctx
.lock_map
.release_lock(key.key(), self.meta.id);
}
// If this is the root procedure, clean up message cache.
if self.meta.parent_id.is_none() {
let procedure_ids = self.manager_ctx.procedures_in_tree(&self.meta);
self.manager_ctx.remove_messages(&procedure_ids);
}
// We can't remove the metadata of the procedure now as users and its parent might
// need to query its state.
// TODO(yingwen): 1. Add TTL to the metadata; 2. Only keep state in the procedure store
@@ -66,5 +120,614 @@ impl Runner {
self.procedure.type_name(),
self.meta.id
);
result
}
async fn execute_procedure_in_loop(&mut self) -> Result<()> {
let ctx = Context {
procedure_id: self.meta.id,
};
loop {
match self.execute_once(&ctx).await {
ExecResult::Continue => (),
ExecResult::Done => return Ok(()),
ExecResult::RetryLater => {
self.wait_on_err().await;
}
ExecResult::Failed(e) => return Err(e),
}
}
}
async fn execute_once(&mut self, ctx: &Context) -> ExecResult {
match self.procedure.execute(ctx).await {
Ok(status) => {
logging::debug!(
"Execute procedure {}-{} once, status: {:?}, need_persist: {}",
self.procedure.type_name(),
self.meta.id,
status,
status.need_persist(),
);
if status.need_persist() && self.persist_procedure().await.is_err() {
return ExecResult::RetryLater;
}
match status {
Status::Executing { .. } => (),
Status::Suspended { subprocedures, .. } => {
self.on_suspended(subprocedures).await;
}
Status::Done => {
if self.commit_procedure().await.is_err() {
return ExecResult::RetryLater;
}
self.done();
return ExecResult::Done;
}
}
ExecResult::Continue
}
Err(e) => {
logging::error!(
e;
"Failed to execute procedure {}-{}",
self.procedure.type_name(),
self.meta.id
);
self.meta.set_state(ProcedureState::Failed);
// Write rollback key so we can skip this procedure while recovering procedures.
if self.rollback_procedure().await.is_err() {
return ExecResult::RetryLater;
}
ExecResult::Failed(e)
}
}
}
/// Submit a subprocedure with specific `procedure_id`.
fn submit_subprocedure(&self, procedure_id: ProcedureId, mut procedure: BoxedProcedure) {
if self.manager_ctx.contains_procedure(procedure_id) {
// If the parent has already submitted this procedure, don't submit it again.
return;
}
let mut step = 0;
if let Some(loaded_procedure) = self.manager_ctx.load_one_procedure(procedure_id) {
// Try to load procedure state from the message to avoid re-run the subprocedure
// from initial state.
assert_eq!(self.meta.id, loaded_procedure.parent_id.unwrap());
// Use the dumped procedure from the procedure store.
procedure = loaded_procedure.procedure;
// Update step number.
step = loaded_procedure.step;
}
let meta = Arc::new(ProcedureMeta {
id: procedure_id,
lock_notify: Notify::new(),
parent_id: Some(self.meta.id),
child_notify: Notify::new(),
lock_key: procedure.lock_key(),
exec_meta: Mutex::new(ExecMeta::default()),
});
let runner = Runner {
meta: meta.clone(),
procedure,
manager_ctx: self.manager_ctx.clone(),
step,
store: self.store.clone(),
};
// Insert the procedure. We already check the procedure existence before inserting
// so we add an assertion to ensure the procedure id is unique and no other procedures
// using the same procedure id.
assert!(
self.manager_ctx.try_insert_procedure(meta),
"Procedure {}-{} submit an existing procedure {}-{}",
self.procedure.type_name(),
self.meta.id,
runner.procedure.type_name(),
procedure_id,
);
// Add the id of the subprocedure to the metadata.
self.meta.push_child(procedure_id);
common_runtime::spawn_bg(async move {
// Run the root procedure.
runner.run().await
});
}
async fn wait_on_err(&self) {
time::sleep(ERR_WAIT_DURATION).await;
}
async fn on_suspended(&self, subprocedures: Vec<ProcedureWithId>) {
let has_child = !subprocedures.is_empty();
for subprocedure in subprocedures {
logging::info!(
"Procedure {}-{} submit subprocedure {}-{}",
self.procedure.type_name(),
self.meta.id,
subprocedure.procedure.type_name(),
subprocedure.id
);
self.submit_subprocedure(subprocedure.id, subprocedure.procedure);
}
logging::info!(
"Procedure {}-{} is waiting for subprocedures",
self.procedure.type_name(),
self.meta.id,
);
// Wait for subprocedures.
if has_child {
self.meta.child_notify.notified().await;
logging::info!(
"Procedure {}-{} is waked up",
self.procedure.type_name(),
self.meta.id,
);
}
}
async fn persist_procedure(&mut self) -> Result<()> {
self.store
.store_procedure(
self.meta.id,
self.step,
&self.procedure,
self.meta.parent_id,
)
.await
.map_err(|e| {
logging::error!(
e; "Failed to persist procedure {}-{}",
self.procedure.type_name(),
self.meta.id
);
e
})?;
self.step += 1;
Ok(())
}
async fn commit_procedure(&mut self) -> Result<()> {
self.store
.commit_procedure(self.meta.id, self.step)
.await
.map_err(|e| {
logging::error!(
e; "Failed to commit procedure {}-{}",
self.procedure.type_name(),
self.meta.id
);
e
})?;
self.step += 1;
Ok(())
}
async fn rollback_procedure(&mut self) -> Result<()> {
self.store
.rollback_procedure(self.meta.id, self.step)
.await
.map_err(|e| {
logging::error!(
e; "Failed to write rollback key for procedure {}-{}",
self.procedure.type_name(),
self.meta.id
);
e
})?;
self.step += 1;
Ok(())
}
fn done(&self) {
// TODO(yingwen): Add files to remove list.
logging::info!(
"Procedure {}-{} done",
self.procedure.type_name(),
self.meta.id
);
// Mark the state of this procedure to done.
self.meta.set_state(ProcedureState::Done);
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use async_trait::async_trait;
use common_error::ext::PlainError;
use common_error::mock::MockError;
use common_error::prelude::StatusCode;
use futures_util::future::BoxFuture;
use futures_util::{FutureExt, TryStreamExt};
use object_store::ObjectStore;
use tempdir::TempDir;
use super::*;
use crate::local::test_util;
use crate::store::ObjectStateStore;
use crate::{LockKey, Procedure};
const ROOT_ID: &str = "9f805a1f-05f7-490c-9f91-bd56e3cc54c1";
fn new_runner(
meta: ProcedureMetaRef,
procedure: BoxedProcedure,
store: ProcedureStore,
) -> Runner {
Runner {
meta,
procedure,
manager_ctx: Arc::new(ManagerContext::new()),
step: 0,
store,
}
}
fn new_procedure_store(object_store: ObjectStore) -> ProcedureStore {
let state_store = ObjectStateStore::new(object_store);
ProcedureStore::new(Arc::new(state_store))
}
async fn check_files(object_store: &ObjectStore, procedure_id: ProcedureId, files: &[&str]) {
let dir = format!("{procedure_id}/");
let object = object_store.object(&dir);
let lister = object.list().await.unwrap();
let mut files_in_dir: Vec<_> = lister
.map_ok(|de| de.name().to_string())
.try_collect()
.await
.unwrap();
files_in_dir.sort_unstable();
assert_eq!(files, files_in_dir);
}
#[derive(Debug)]
struct ProcedureAdapter<F> {
data: String,
lock_key: Option<LockKey>,
exec_fn: F,
}
impl<F> ProcedureAdapter<F> {
fn new_meta(&self, uuid: &str) -> ProcedureMetaRef {
let mut meta = test_util::procedure_meta_for_test();
meta.id = ProcedureId::parse_str(uuid).unwrap();
meta.lock_key = self.lock_key.clone();
Arc::new(meta)
}
}
#[async_trait]
impl<F> Procedure for ProcedureAdapter<F>
where
F: FnMut() -> BoxFuture<'static, Result<Status>> + Send + Sync,
{
fn type_name(&self) -> &str {
"ProcedureAdapter"
}
async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
let f = (self.exec_fn)();
f.await
}
fn dump(&self) -> Result<String> {
Ok(self.data.clone())
}
fn lock_key(&self) -> Option<LockKey> {
self.lock_key.clone()
}
}
async fn execute_once_normal(persist: bool, first_files: &[&str], second_files: &[&str]) {
let mut times = 0;
let exec_fn = move || {
times += 1;
async move {
if times == 1 {
Ok(Status::Executing { persist })
} else {
Ok(Status::Done)
}
}
.boxed()
};
let normal = ProcedureAdapter {
data: "normal".to_string(),
lock_key: Some(LockKey::new("catalog.schema.table")),
exec_fn,
};
let dir = TempDir::new("normal").unwrap();
let meta = normal.new_meta(ROOT_ID);
let ctx = Context {
procedure_id: meta.id,
};
let object_store = test_util::new_object_store(&dir);
let procedure_store = new_procedure_store(object_store.clone());
let mut runner = new_runner(meta, Box::new(normal), procedure_store);
let res = runner.execute_once(&ctx).await;
assert!(res.is_continue(), "{res:?}");
check_files(&object_store, ctx.procedure_id, first_files).await;
let res = runner.execute_once(&ctx).await;
assert!(res.is_done(), "{res:?}");
check_files(&object_store, ctx.procedure_id, second_files).await;
}
#[tokio::test]
async fn test_execute_once_normal() {
execute_once_normal(
true,
&["0000000000.step"],
&["0000000000.step", "0000000001.commit"],
)
.await;
}
#[tokio::test]
async fn test_execute_once_normal_skip_persist() {
execute_once_normal(false, &[], &["0000000000.commit"]).await;
}
#[tokio::test]
async fn test_on_suspend_empty() {
let exec_fn = move || {
async move {
Ok(Status::Suspended {
subprocedures: Vec::new(),
persist: false,
})
}
.boxed()
};
let suspend = ProcedureAdapter {
data: "suspend".to_string(),
lock_key: Some(LockKey::new("catalog.schema.table")),
exec_fn,
};
let dir = TempDir::new("suspend").unwrap();
let meta = suspend.new_meta(ROOT_ID);
let ctx = Context {
procedure_id: meta.id,
};
let object_store = test_util::new_object_store(&dir);
let procedure_store = new_procedure_store(object_store.clone());
let mut runner = new_runner(meta, Box::new(suspend), procedure_store);
let res = runner.execute_once(&ctx).await;
assert!(res.is_continue(), "{res:?}");
}
fn new_child_procedure(procedure_id: ProcedureId, key: &str) -> ProcedureWithId {
let mut times = 0;
let exec_fn = move || {
times += 1;
async move {
if times == 1 {
time::sleep(Duration::from_millis(200)).await;
Ok(Status::Executing { persist: true })
} else {
Ok(Status::Done)
}
}
.boxed()
};
let child = ProcedureAdapter {
data: "child".to_string(),
lock_key: Some(LockKey::new(key)),
exec_fn,
};
ProcedureWithId {
id: procedure_id,
procedure: Box::new(child),
}
}
#[tokio::test]
async fn test_on_suspend_by_subprocedures() {
let mut times = 0;
let children_ids = [ProcedureId::random(), ProcedureId::random()];
let keys = [
"catalog.schema.table.region-0",
"catalog.schema.table.region-1",
];
let manager_ctx = Arc::new(ManagerContext::new());
let ctx_in_fn = manager_ctx.clone();
let exec_fn = move || {
times += 1;
let ctx_in_future = ctx_in_fn.clone();
async move {
if times == 1 {
// Submit subprocedures.
Ok(Status::Suspended {
subprocedures: children_ids
.into_iter()
.zip(keys)
.map(|(id, key)| new_child_procedure(id, key))
.collect(),
persist: true,
})
} else {
// Wait for subprocedures.
let all_child_done = children_ids
.iter()
.all(|id| ctx_in_future.state(*id) == Some(ProcedureState::Done));
if all_child_done {
Ok(Status::Done)
} else {
// Return suspended to wait for notify.
Ok(Status::Suspended {
subprocedures: Vec::new(),
persist: false,
})
}
}
}
.boxed()
};
let parent = ProcedureAdapter {
data: "parent".to_string(),
lock_key: Some(LockKey::new("catalog.schema.table")),
exec_fn,
};
let dir = TempDir::new("parent").unwrap();
let meta = parent.new_meta(ROOT_ID);
let procedure_id = meta.id;
// Manually add this procedure to the manager ctx.
assert!(manager_ctx.try_insert_procedure(meta.clone()));
let object_store = test_util::new_object_store(&dir);
let procedure_store = new_procedure_store(object_store.clone());
let mut runner = new_runner(meta, Box::new(parent), procedure_store);
// Replace the manager ctx.
runner.manager_ctx = manager_ctx;
runner.run().await.unwrap();
// Check files on store.
for child_id in children_ids {
check_files(
&object_store,
child_id,
&["0000000000.step", "0000000001.commit"],
)
.await;
}
check_files(
&object_store,
procedure_id,
&["0000000000.step", "0000000001.commit"],
)
.await;
}
#[tokio::test]
async fn test_execute_on_error() {
let exec_fn =
|| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
let fail = ProcedureAdapter {
data: "fail".to_string(),
lock_key: Some(LockKey::new("catalog.schema.table")),
exec_fn,
};
let dir = TempDir::new("fail").unwrap();
let meta = fail.new_meta(ROOT_ID);
let ctx = Context {
procedure_id: meta.id,
};
let object_store = test_util::new_object_store(&dir);
let procedure_store = new_procedure_store(object_store.clone());
let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store);
let res = runner.execute_once(&ctx).await;
assert!(res.is_failed(), "{res:?}");
assert_eq!(ProcedureState::Failed, meta.state());
check_files(&object_store, ctx.procedure_id, &["0000000000.rollback"]).await;
}
#[tokio::test]
async fn test_child_error() {
let mut times = 0;
let manager_ctx = Arc::new(ManagerContext::new());
let child_id = ProcedureId::random();
let ctx_in_fn = manager_ctx.clone();
let exec_fn = move || {
times += 1;
let ctx_in_future = ctx_in_fn.clone();
async move {
if times == 1 {
// Submit subprocedures.
let exec_fn = || {
async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }
.boxed()
};
let fail = ProcedureAdapter {
data: "fail".to_string(),
lock_key: Some(LockKey::new("catalog.schema.table.region-0")),
exec_fn,
};
Ok(Status::Suspended {
subprocedures: vec![ProcedureWithId {
id: child_id,
procedure: Box::new(fail),
}],
persist: true,
})
} else {
// Wait for subprocedures.
logging::info!("child state is {:?}", ctx_in_future.state(child_id));
if ctx_in_future.state(child_id) == Some(ProcedureState::Failed) {
// The parent procedure to abort itself if child procedure is failed.
Err(Error::external(PlainError::new(
"subprocedure failed".to_string(),
StatusCode::Unexpected,
)))
} else {
// Return suspended to wait for notify.
Ok(Status::Suspended {
subprocedures: Vec::new(),
persist: false,
})
}
}
}
.boxed()
};
let parent = ProcedureAdapter {
data: "parent".to_string(),
lock_key: Some(LockKey::new("catalog.schema.table")),
exec_fn,
};
let dir = TempDir::new("child_err").unwrap();
let meta = parent.new_meta(ROOT_ID);
// Manually add this procedure to the manager ctx.
assert!(manager_ctx.try_insert_procedure(meta.clone()));
let object_store = test_util::new_object_store(&dir);
let procedure_store = new_procedure_store(object_store.clone());
let mut runner = new_runner(meta, Box::new(parent), procedure_store);
// Replace the manager ctx.
runner.manager_ctx = manager_ctx;
// Run the runer and execute the procedure.
let err = runner.run().await.unwrap_err();
assert!(err.to_string().contains("subprocedure failed"), "{err}");
}
}

View File

@@ -24,6 +24,7 @@ use uuid::Uuid;
use crate::error::Result;
/// Procedure execution status.
#[derive(Debug)]
pub enum Status {
/// The procedure is still executing.
Executing {
@@ -121,6 +122,12 @@ impl ProcedureWithId {
}
}
impl fmt::Debug for ProcedureWithId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}-{}", self.procedure.type_name(), self.id)
}
}
#[derive(Debug, Snafu)]
pub struct ParseIdError {
source: uuid::Error,

View File

@@ -36,6 +36,8 @@ pub struct ProcedureMessage {
pub data: String,
/// Parent procedure id.
pub parent_id: Option<ProcedureId>,
/// Current step.
pub step: u32,
}
/// Procedure storage layer.
@@ -49,7 +51,7 @@ impl ProcedureStore {
}
/// Dump the `procedure` to the storage.
async fn store_procedure(
pub(crate) async fn store_procedure(
&self,
procedure_id: ProcedureId,
step: u32,
@@ -63,11 +65,12 @@ impl ProcedureStore {
type_name: type_name.to_string(),
data,
parent_id,
step,
};
let key = ParsedKey {
procedure_id,
step,
is_committed: false,
key_type: KeyType::Step,
}
.to_string();
let value = serde_json::to_string(&message).context(ToJsonSnafu)?;
@@ -78,11 +81,32 @@ impl ProcedureStore {
}
/// Write commit flag to the storage.
async fn commit_procedure(&self, procedure_id: ProcedureId, step: u32) -> Result<()> {
pub(crate) async fn commit_procedure(
&self,
procedure_id: ProcedureId,
step: u32,
) -> Result<()> {
let key = ParsedKey {
procedure_id,
step,
is_committed: true,
key_type: KeyType::Commit,
}
.to_string();
self.0.put(&key, Vec::new()).await?;
Ok(())
}
/// Write rollback flag to the storage.
pub(crate) async fn rollback_procedure(
&self,
procedure_id: ProcedureId,
step: u32,
) -> Result<()> {
let key = ParsedKey {
procedure_id,
step,
key_type: KeyType::Rollback,
}
.to_string();
self.0.put(&key, Vec::new()).await?;
@@ -115,7 +139,7 @@ impl ProcedureStore {
}
for (procedure_id, (parsed_key, value)) in procedure_key_values {
if !parsed_key.is_committed {
if parsed_key.key_type == KeyType::Step {
let Some(message) = self.load_one_message(&parsed_key, &value) else {
// We don't abort the loading process and just ignore errors to ensure all remaining
// procedures are loaded.
@@ -139,12 +163,39 @@ impl ProcedureStore {
}
}
/// Suffix type of the key.
#[derive(Debug, PartialEq, Eq)]
enum KeyType {
Step,
Commit,
Rollback,
}
impl KeyType {
fn as_str(&self) -> &'static str {
match self {
KeyType::Step => "step",
KeyType::Commit => "commit",
KeyType::Rollback => "rollback",
}
}
fn from_str(s: &str) -> Option<KeyType> {
match s {
"step" => Some(KeyType::Step),
"commit" => Some(KeyType::Commit),
"rollback" => Some(KeyType::Rollback),
_ => None,
}
}
}
/// Key to refer the procedure in the [ProcedureStore].
#[derive(Debug, PartialEq, Eq)]
struct ParsedKey {
procedure_id: ProcedureId,
step: u32,
is_committed: bool,
key_type: KeyType,
}
impl fmt::Display for ParsedKey {
@@ -154,7 +205,7 @@ impl fmt::Display for ParsedKey {
"{}/{:010}.{}",
self.procedure_id,
self.step,
if self.is_committed { "commit" } else { "step" }
self.key_type.as_str(),
)
}
}
@@ -171,17 +222,13 @@ impl ParsedKey {
let mut parts = name.split('.');
let step_str = parts.next()?;
let suffix = parts.next()?;
let is_committed = match suffix {
"commit" => true,
"step" => false,
_ => return None,
};
let key_type = KeyType::from_str(suffix)?;
let step = step_str.parse().ok()?;
Some(ParsedKey {
procedure_id,
step,
is_committed,
key_type,
})
}
}
@@ -196,16 +243,24 @@ mod tests {
use tempdir::TempDir;
use super::*;
use crate::store::state_store::ObjectStateStore;
use crate::{Context, LockKey, Procedure, Status};
fn procedure_store_for_test(dir: &TempDir) -> ProcedureStore {
let store_dir = dir.path().to_str().unwrap();
let accessor = Builder::default().root(store_dir).build().unwrap();
let object_store = ObjectStore::new(accessor);
let state_store = ObjectStateStore::new(object_store);
ProcedureStore::new(Arc::new(state_store))
}
#[test]
fn test_parsed_key() {
let procedure_id = ProcedureId::random();
let key = ParsedKey {
procedure_id,
step: 2,
is_committed: false,
key_type: KeyType::Step,
};
assert_eq!(format!("{procedure_id}/0000000002.step"), key.to_string());
assert_eq!(key, ParsedKey::parse_str(&key.to_string()).unwrap());
@@ -213,10 +268,21 @@ mod tests {
let key = ParsedKey {
procedure_id,
step: 2,
is_committed: true,
key_type: KeyType::Commit,
};
assert_eq!(format!("{procedure_id}/0000000002.commit"), key.to_string());
assert_eq!(key, ParsedKey::parse_str(&key.to_string()).unwrap());
let key = ParsedKey {
procedure_id,
step: 2,
key_type: KeyType::Rollback,
};
assert_eq!(
format!("{procedure_id}/0000000002.rollback"),
key.to_string()
);
assert_eq!(key, ParsedKey::parse_str(&key.to_string()).unwrap());
}
#[test]
@@ -251,12 +317,13 @@ mod tests {
type_name: "TestMessage".to_string(),
data: "no parent id".to_string(),
parent_id: None,
step: 4,
};
let json = serde_json::to_string(&message).unwrap();
assert_eq!(
json,
r#"{"type_name":"TestMessage","data":"no parent id","parent_id":null}"#
r#"{"type_name":"TestMessage","data":"no parent id","parent_id":null,"step":4}"#
);
let procedure_id = ProcedureId::parse_str("9f805a1f-05f7-490c-9f91-bd56e3cc54c1").unwrap();
@@ -264,7 +331,7 @@ mod tests {
let json = serde_json::to_string(&message).unwrap();
assert_eq!(
json,
r#"{"type_name":"TestMessage","data":"no parent id","parent_id":"9f805a1f-05f7-490c-9f91-bd56e3cc54c1"}"#
r#"{"type_name":"TestMessage","data":"no parent id","parent_id":"9f805a1f-05f7-490c-9f91-bd56e3cc54c1","step":4}"#
);
}
@@ -297,19 +364,10 @@ mod tests {
}
}
fn new_procedure_store(dir: &TempDir) -> ProcedureStore {
let store_dir = dir.path().to_str().unwrap();
let accessor = Builder::default().root(store_dir).build().unwrap();
let object_store = ObjectStore::new(accessor);
let state_store = ObjectStateStore::new(object_store);
ProcedureStore(Arc::new(state_store))
}
#[tokio::test]
async fn test_store_procedure() {
let dir = TempDir::new("store_procedure").unwrap();
let store = new_procedure_store(&dir);
let store = procedure_store_for_test(&dir);
let procedure_id = ProcedureId::random();
let procedure: BoxedProcedure = Box::new(MockProcedure::new("test store procedure"));
@@ -326,6 +384,7 @@ mod tests {
type_name: "MockProcedure".to_string(),
data: "test store procedure".to_string(),
parent_id: None,
step: 0,
};
assert_eq!(expect, *msg);
}
@@ -333,7 +392,7 @@ mod tests {
#[tokio::test]
async fn test_commit_procedure() {
let dir = TempDir::new("commit_procedure").unwrap();
let store = new_procedure_store(&dir);
let store = procedure_store_for_test(&dir);
let procedure_id = ProcedureId::random();
let procedure: BoxedProcedure = Box::new(MockProcedure::new("test store procedure"));
@@ -348,10 +407,28 @@ mod tests {
assert!(messages.is_empty());
}
#[tokio::test]
async fn test_rollback_procedure() {
let dir = TempDir::new("rollback_procedure").unwrap();
let store = procedure_store_for_test(&dir);
let procedure_id = ProcedureId::random();
let procedure: BoxedProcedure = Box::new(MockProcedure::new("test store procedure"));
store
.store_procedure(procedure_id, 0, &procedure, None)
.await
.unwrap();
store.rollback_procedure(procedure_id, 1).await.unwrap();
let messages = store.load_messages().await.unwrap();
assert!(messages.is_empty());
}
#[tokio::test]
async fn test_load_messages() {
let dir = TempDir::new("load_messages").unwrap();
let store = new_procedure_store(&dir);
let store = procedure_store_for_test(&dir);
// store 3 steps
let id0 = ProcedureId::random();