feat(procedure): Support multi-lock keys and querying procedure state from context (#1006)

* feat: Add ContextProvider to Context

So procedures can query states of other procedures via the
ContextProvider and they don't need to hold a ProcedureManagerRef

* feat: Procedure supports acquring multiple lock keys

* test: Use multi-locks in test

* feat: Add keys_to_lock/unlock
This commit is contained in:
Yingwen
2023-02-15 18:04:19 +08:00
committed by GitHub
parent 5533040be7
commit 04afee216e
7 changed files with 157 additions and 91 deletions

1
Cargo.lock generated
View File

@@ -1527,6 +1527,7 @@ dependencies = [
"object-store",
"serde",
"serde_json",
"smallvec",
"snafu",
"tempdir",
"tokio",

View File

@@ -13,6 +13,7 @@ futures.workspace = true
object-store = { path = "../../object-store" }
serde.workspace = true
serde_json = "1.0"
smallvec = "1"
snafu.workspace = true
tokio.workspace = true
uuid.workspace = true

View File

@@ -24,6 +24,6 @@ mod store;
pub use crate::error::{Error, Result};
pub use crate::procedure::{
BoxedProcedure, Context, LockKey, Procedure, ProcedureId, ProcedureManager,
BoxedProcedure, Context, ContextProvider, LockKey, Procedure, ProcedureId, ProcedureManager,
ProcedureManagerRef, ProcedureState, ProcedureWithId, Status,
};

View File

@@ -30,7 +30,8 @@ use crate::local::runner::Runner;
use crate::procedure::BoxedProcedureLoader;
use crate::store::{ObjectStateStore, ProcedureMessage, ProcedureStore, StateStoreRef};
use crate::{
BoxedProcedure, LockKey, ProcedureId, ProcedureManager, ProcedureState, ProcedureWithId,
BoxedProcedure, ContextProvider, LockKey, ProcedureId, ProcedureManager, ProcedureState,
ProcedureWithId,
};
/// Mutable metadata of a procedure during execution.
@@ -70,7 +71,7 @@ pub(crate) struct ProcedureMeta {
/// Notify to wait for subprocedures.
child_notify: Notify,
/// Lock required by this procedure.
lock_key: Option<LockKey>,
lock_key: LockKey,
/// Mutable status during execution.
exec_meta: Mutex<ExecMeta>,
}
@@ -128,6 +129,13 @@ pub(crate) struct ManagerContext {
messages: Mutex<HashMap<ProcedureId, ProcedureMessage>>,
}
#[async_trait]
impl ContextProvider for ManagerContext {
async fn procedure_state(&self, procedure_id: ProcedureId) -> Result<Option<ProcedureState>> {
Ok(self.state(procedure_id))
}
}
impl ManagerContext {
/// Returns a new [ManagerContext].
fn new() -> ManagerContext {
@@ -409,7 +417,7 @@ mod test_util {
lock_notify: Notify::new(),
parent_id: None,
child_notify: Notify::new(),
lock_key: None,
lock_key: LockKey::default(),
exec_meta: Mutex::new(ExecMeta::default()),
}
}
@@ -508,8 +516,8 @@ mod tests {
Ok(self.content.clone())
}
fn lock_key(&self) -> Option<LockKey> {
None
fn lock_key(&self) -> LockKey {
LockKey::default()
}
}
@@ -617,8 +625,8 @@ mod tests {
unimplemented!()
}
fn lock_key(&self) -> Option<LockKey> {
Some(LockKey::new("test.submit"))
fn lock_key(&self) -> LockKey {
LockKey::single("test.submit")
}
}

View File

@@ -70,19 +70,14 @@ impl Runner {
self.procedure.type_name(),
self.meta.id
);
// We use the lock key in ProcedureMeta as it considers locks inherited from
// its parent.
let lock_key = self.meta.lock_key.clone();
// TODO(yingwen):
// 1. Support multiple lock keys;
// 2. Detect recursive locking (and deadlock) if possible. Maybe we could detect
// TODO(yingwen): Detect recursive locking (and deadlock) if possible. Maybe we could detect
// recursive locking by adding a root procedure id to the meta.
// Acquire lock if necessary.
if let Some(key) = &lock_key {
for key in self.meta.lock_key.keys_to_lock() {
// Acquire lock for each key.
self.manager_ctx
.lock_map
.acquire_lock(key.key(), self.meta.clone())
.acquire_lock(key, self.meta.clone())
.await;
}
@@ -98,10 +93,9 @@ impl Runner {
self.manager_ctx.notify_by_subprocedure(parent_id);
}
if let Some(key) = &lock_key {
self.manager_ctx
.lock_map
.release_lock(key.key(), self.meta.id);
// Release lock in reverse order.
for key in self.meta.lock_key.keys_to_unlock() {
self.manager_ctx.lock_map.release_lock(key, self.meta.id);
}
// If this is the root procedure, clean up message cache.
@@ -127,6 +121,7 @@ impl Runner {
async fn execute_procedure_in_loop(&mut self) -> Result<()> {
let ctx = Context {
procedure_id: self.meta.id,
provider: self.manager_ctx.clone(),
};
loop {
@@ -366,7 +361,7 @@ mod tests {
use super::*;
use crate::local::test_util;
use crate::{LockKey, Procedure};
use crate::{ContextProvider, LockKey, Procedure};
const ROOT_ID: &str = "9f805a1f-05f7-490c-9f91-bd56e3cc54c1";
@@ -397,10 +392,29 @@ mod tests {
assert_eq!(files, files_in_dir);
}
fn context_without_provider(procedure_id: ProcedureId) -> Context {
struct MockProvider;
#[async_trait]
impl ContextProvider for MockProvider {
async fn procedure_state(
&self,
_procedure_id: ProcedureId,
) -> Result<Option<ProcedureState>> {
unimplemented!()
}
}
Context {
procedure_id,
provider: Arc::new(MockProvider),
}
}
#[derive(Debug)]
struct ProcedureAdapter<F> {
data: String,
lock_key: Option<LockKey>,
lock_key: LockKey,
exec_fn: F,
}
@@ -417,14 +431,14 @@ mod tests {
#[async_trait]
impl<F> Procedure for ProcedureAdapter<F>
where
F: FnMut() -> BoxFuture<'static, Result<Status>> + Send + Sync,
F: FnMut(Context) -> BoxFuture<'static, Result<Status>> + Send + Sync,
{
fn type_name(&self) -> &str {
"ProcedureAdapter"
}
async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
let f = (self.exec_fn)();
async fn execute(&mut self, ctx: &Context) -> Result<Status> {
let f = (self.exec_fn)(ctx.clone());
f.await
}
@@ -432,14 +446,14 @@ mod tests {
Ok(self.data.clone())
}
fn lock_key(&self) -> Option<LockKey> {
fn lock_key(&self) -> LockKey {
self.lock_key.clone()
}
}
async fn execute_once_normal(persist: bool, first_files: &[&str], second_files: &[&str]) {
let mut times = 0;
let exec_fn = move || {
let exec_fn = move |_| {
times += 1;
async move {
if times == 1 {
@@ -452,15 +466,13 @@ mod tests {
};
let normal = ProcedureAdapter {
data: "normal".to_string(),
lock_key: Some(LockKey::new("catalog.schema.table")),
lock_key: LockKey::single("catalog.schema.table"),
exec_fn,
};
let dir = TempDir::new("normal").unwrap();
let meta = normal.new_meta(ROOT_ID);
let ctx = Context {
procedure_id: meta.id,
};
let ctx = context_without_provider(meta.id);
let object_store = test_util::new_object_store(&dir);
let procedure_store = ProcedureStore::from(object_store.clone());
let mut runner = new_runner(meta, Box::new(normal), procedure_store);
@@ -491,7 +503,7 @@ mod tests {
#[tokio::test]
async fn test_on_suspend_empty() {
let exec_fn = move || {
let exec_fn = move |_| {
async move {
Ok(Status::Suspended {
subprocedures: Vec::new(),
@@ -502,15 +514,13 @@ mod tests {
};
let suspend = ProcedureAdapter {
data: "suspend".to_string(),
lock_key: Some(LockKey::new("catalog.schema.table")),
lock_key: LockKey::single("catalog.schema.table"),
exec_fn,
};
let dir = TempDir::new("suspend").unwrap();
let meta = suspend.new_meta(ROOT_ID);
let ctx = Context {
procedure_id: meta.id,
};
let ctx = context_without_provider(meta.id);
let object_store = test_util::new_object_store(&dir);
let procedure_store = ProcedureStore::from(object_store.clone());
let mut runner = new_runner(meta, Box::new(suspend), procedure_store);
@@ -519,9 +529,9 @@ mod tests {
assert!(res.is_continue(), "{res:?}");
}
fn new_child_procedure(procedure_id: ProcedureId, key: &str) -> ProcedureWithId {
fn new_child_procedure(procedure_id: ProcedureId, keys: &[&str]) -> ProcedureWithId {
let mut times = 0;
let exec_fn = move || {
let exec_fn = move |_| {
times += 1;
async move {
if times == 1 {
@@ -535,7 +545,7 @@ mod tests {
};
let child = ProcedureAdapter {
data: "child".to_string(),
lock_key: Some(LockKey::new(key)),
lock_key: LockKey::new(keys.iter().map(|k| k.to_string())),
exec_fn,
};
@@ -550,16 +560,18 @@ mod tests {
let mut times = 0;
let children_ids = [ProcedureId::random(), ProcedureId::random()];
let keys = [
"catalog.schema.table.region-0",
"catalog.schema.table.region-1",
&[
"catalog.schema.table.region-0",
"catalog.schema.table.region-1",
],
&[
"catalog.schema.table.region-2",
"catalog.schema.table.region-3",
],
];
let manager_ctx = Arc::new(ManagerContext::new());
let ctx_in_fn = manager_ctx.clone();
let exec_fn = move || {
let exec_fn = move |ctx: Context| {
times += 1;
let ctx_in_future = ctx_in_fn.clone();
async move {
if times == 1 {
// Submit subprocedures.
@@ -567,15 +579,20 @@ mod tests {
subprocedures: children_ids
.into_iter()
.zip(keys)
.map(|(id, key)| new_child_procedure(id, key))
.map(|(id, key_slice)| new_child_procedure(id, key_slice))
.collect(),
persist: true,
})
} else {
// Wait for subprocedures.
let all_child_done = children_ids
.iter()
.all(|id| ctx_in_future.state(*id) == Some(ProcedureState::Done));
let mut all_child_done = true;
for id in children_ids {
if ctx.provider.procedure_state(id).await.unwrap()
!= Some(ProcedureState::Done)
{
all_child_done = false;
}
}
if all_child_done {
Ok(Status::Done)
} else {
@@ -591,19 +608,20 @@ mod tests {
};
let parent = ProcedureAdapter {
data: "parent".to_string(),
lock_key: Some(LockKey::new("catalog.schema.table")),
lock_key: LockKey::single("catalog.schema.table"),
exec_fn,
};
let dir = TempDir::new("parent").unwrap();
let meta = parent.new_meta(ROOT_ID);
let procedure_id = meta.id;
// Manually add this procedure to the manager ctx.
assert!(manager_ctx.try_insert_procedure(meta.clone()));
let object_store = test_util::new_object_store(&dir);
let procedure_store = ProcedureStore::from(object_store.clone());
let mut runner = new_runner(meta, Box::new(parent), procedure_store);
let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store);
let manager_ctx = Arc::new(ManagerContext::new());
// Manually add this procedure to the manager ctx.
assert!(manager_ctx.try_insert_procedure(meta));
// Replace the manager ctx.
runner.manager_ctx = manager_ctx;
@@ -629,18 +647,16 @@ mod tests {
#[tokio::test]
async fn test_execute_on_error() {
let exec_fn =
|| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
|_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
let fail = ProcedureAdapter {
data: "fail".to_string(),
lock_key: Some(LockKey::new("catalog.schema.table")),
lock_key: LockKey::single("catalog.schema.table"),
exec_fn,
};
let dir = TempDir::new("fail").unwrap();
let meta = fail.new_meta(ROOT_ID);
let ctx = Context {
procedure_id: meta.id,
};
let ctx = context_without_provider(meta.id);
let object_store = test_util::new_object_store(&dir);
let procedure_store = ProcedureStore::from(object_store.clone());
let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store);
@@ -654,24 +670,20 @@ mod tests {
#[tokio::test]
async fn test_child_error() {
let mut times = 0;
let manager_ctx = Arc::new(ManagerContext::new());
let child_id = ProcedureId::random();
let ctx_in_fn = manager_ctx.clone();
let exec_fn = move || {
let exec_fn = move |ctx: Context| {
times += 1;
let ctx_in_future = ctx_in_fn.clone();
async move {
if times == 1 {
// Submit subprocedures.
let exec_fn = || {
let exec_fn = |_| {
async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }
.boxed()
};
let fail = ProcedureAdapter {
data: "fail".to_string(),
lock_key: Some(LockKey::new("catalog.schema.table.region-0")),
lock_key: LockKey::single("catalog.schema.table.region-0"),
exec_fn,
};
@@ -684,8 +696,8 @@ mod tests {
})
} else {
// Wait for subprocedures.
logging::info!("child state is {:?}", ctx_in_future.state(child_id));
if ctx_in_future.state(child_id) == Some(ProcedureState::Failed) {
let state = ctx.provider.procedure_state(child_id).await.unwrap();
if state == Some(ProcedureState::Failed) {
// The parent procedure to abort itself if child procedure is failed.
Err(Error::external(PlainError::new(
"subprocedure failed".to_string(),
@@ -704,18 +716,20 @@ mod tests {
};
let parent = ProcedureAdapter {
data: "parent".to_string(),
lock_key: Some(LockKey::new("catalog.schema.table")),
lock_key: LockKey::single("catalog.schema.table"),
exec_fn,
};
let dir = TempDir::new("child_err").unwrap();
let meta = parent.new_meta(ROOT_ID);
// Manually add this procedure to the manager ctx.
assert!(manager_ctx.try_insert_procedure(meta.clone()));
let object_store = test_util::new_object_store(&dir);
let procedure_store = ProcedureStore::from(object_store.clone());
let mut runner = new_runner(meta, Box::new(parent), procedure_store);
let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store);
let manager_ctx = Arc::new(ManagerContext::new());
// Manually add this procedure to the manager ctx.
assert!(manager_ctx.try_insert_procedure(meta));
// Replace the manager ctx.
runner.manager_ctx = manager_ctx;

View File

@@ -18,6 +18,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use smallvec::{smallvec, SmallVec};
use snafu::{ResultExt, Snafu};
use uuid::Uuid;
@@ -58,11 +59,23 @@ impl Status {
}
}
/// [ContextProvider] provides information about procedures in the [ProcedureManager].
#[async_trait]
pub trait ContextProvider: Send + Sync {
/// Query the procedure state.
async fn procedure_state(&self, procedure_id: ProcedureId) -> Result<Option<ProcedureState>>;
}
/// Reference-counted pointer to [ContextProvider].
pub type ContextProviderRef = Arc<dyn ContextProvider>;
/// Procedure execution context.
#[derive(Debug)]
#[derive(Clone)]
pub struct Context {
/// Id of the procedure.
pub procedure_id: ProcedureId,
/// [ProcedureManager] context provider.
pub provider: ContextProviderRef,
}
/// A `Procedure` represents an operation or a set of operations to be performed step-by-step.
@@ -79,25 +92,41 @@ pub trait Procedure: Send + Sync {
/// Dump the state of the procedure to a string.
fn dump(&self) -> Result<String>;
/// Returns the [LockKey] if this procedure needs to acquire lock.
fn lock_key(&self) -> Option<LockKey>;
/// Returns the [LockKey] that this procedure needs to acquire.
fn lock_key(&self) -> LockKey;
}
/// A key to identify the lock.
// We might hold multiple keys in this struct. When there are multiple keys, we need to sort the
// keys lock all the keys in order to avoid dead lock.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LockKey(String);
/// Keys to identify required locks.
///
/// [LockKey] always sorts keys lexicographically so that they can be acquired
/// in the same order.
// Most procedures should only acquire 1 ~ 2 locks so we use smallvec to hold keys.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct LockKey(SmallVec<[String; 2]>);
impl LockKey {
/// Returns a new [LockKey].
pub fn new(key: impl Into<String>) -> LockKey {
LockKey(key.into())
/// Returns a new [LockKey] with only one key.
pub fn single(key: impl Into<String>) -> LockKey {
LockKey(smallvec![key.into()])
}
/// Returns the lock key.
pub fn key(&self) -> &str {
&self.0
/// Returns a new [LockKey] with keys from specific `iter`.
pub fn new(iter: impl IntoIterator<Item = String>) -> LockKey {
let mut vec: SmallVec<_> = iter.into_iter().collect();
vec.sort();
// Dedup keys to avoid acquiring the same key multiple times.
vec.dedup();
LockKey(vec)
}
/// Returns the keys to lock.
pub fn keys_to_lock(&self) -> impl Iterator<Item = &String> {
self.0.iter()
}
/// Returns the keys to unlock.
pub fn keys_to_unlock(&self) -> impl Iterator<Item = &String> {
self.0.iter().rev()
}
}
@@ -235,8 +264,21 @@ mod tests {
#[test]
fn test_lock_key() {
let entity = "catalog.schema.my_table";
let key = LockKey::new(entity);
assert_eq!(entity, key.key());
let key = LockKey::single(entity);
assert_eq!(vec![entity], key.keys_to_lock().collect::<Vec<_>>());
assert_eq!(vec![entity], key.keys_to_unlock().collect::<Vec<_>>());
let key = LockKey::new([
"b".to_string(),
"c".to_string(),
"a".to_string(),
"c".to_string(),
]);
assert_eq!(vec!["a", "b", "c"], key.keys_to_lock().collect::<Vec<_>>());
assert_eq!(
vec!["c", "b", "a"],
key.keys_to_unlock().collect::<Vec<_>>()
);
}
#[test]

View File

@@ -365,8 +365,8 @@ mod tests {
Ok(self.data.clone())
}
fn lock_key(&self) -> Option<LockKey> {
None
fn lock_key(&self) -> LockKey {
LockKey::default()
}
}