feat: Add procedure manager LocalManager (#946)

* feat: Add ManagerContext and LocalManager

* test: Add register_loader test

* feat: Remove some unused methods

* fix: Fix submit_procedure ensure condition
This commit is contained in:
Yingwen
2023-02-07 11:33:13 +08:00
committed by GitHub
parent afac885c10
commit ecadbc1435
5 changed files with 399 additions and 16 deletions

View File

@@ -13,12 +13,24 @@
// limitations under the License.
mod lock;
mod runner;
use std::sync::{Arc, Mutex};
use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};
use async_trait::async_trait;
use object_store::ObjectStore;
use snafu::ensure;
use tokio::sync::Notify;
use crate::{LockKey, ProcedureId, ProcedureState};
use crate::error::{DuplicateProcedureSnafu, LoaderConflictSnafu, Result};
use crate::local::lock::LockMap;
use crate::local::runner::Runner;
use crate::procedure::BoxedProcedureLoader;
use crate::store::{ObjectStateStore, ProcedureMessage, ProcedureStore, StateStoreRef};
use crate::{
BoxedProcedure, LockKey, ProcedureId, ProcedureManager, ProcedureState, ProcedureWithId,
};
/// Mutable metadata of a procedure during execution.
#[derive(Debug)]
@@ -36,7 +48,7 @@ struct ExecMeta {
/// for children;
/// 2. always use `notify_one` and ensure there are only one waiter.
#[derive(Debug)]
struct ProcedureMeta {
pub(crate) struct ProcedureMeta {
/// Id of this procedure.
id: ProcedureId,
/// Notify to wait for a lock.
@@ -68,10 +80,154 @@ impl ProcedureMeta {
locks
}
/// Returns current [ProcedureState].
fn state(&self) -> ProcedureState {
let meta = self.exec_meta.lock().unwrap();
meta.state.clone()
}
/// Update current [ProcedureState].
fn set_state(&self, state: ProcedureState) {
let mut meta = self.exec_meta.lock().unwrap();
meta.state = state;
}
}
/// Reference counted pointer to [ProcedureMeta].
type ProcedureMetaRef = Arc<ProcedureMeta>;
/// Procedure and its parent procedure id.
struct ProcedureAndParent(BoxedProcedure, Option<ProcedureId>);
/// Shared context of the manager.
pub(crate) struct ManagerContext {
/// Procedure loaders. The key is the type name of the procedure which the loader returns.
loaders: Mutex<HashMap<String, BoxedProcedureLoader>>,
lock_map: LockMap,
procedures: RwLock<HashMap<ProcedureId, ProcedureMetaRef>>,
// TODO(yingwen): Now we never clean the messages. But when the root procedure is done, we
// should be able to remove the its message and all its child messages.
/// Messages loaded from the procedure store.
messages: Mutex<HashMap<ProcedureId, ProcedureMessage>>,
}
impl ManagerContext {
/// Returns a new [ManagerContext].
fn new() -> ManagerContext {
ManagerContext {
loaders: Mutex::new(HashMap::new()),
lock_map: LockMap::new(),
procedures: RwLock::new(HashMap::new()),
messages: Mutex::new(HashMap::new()),
}
}
/// Returns true if the procedure with specific `procedure_id` exists.
fn contains_procedure(&self, procedure_id: ProcedureId) -> bool {
let procedures = self.procedures.read().unwrap();
procedures.contains_key(&procedure_id)
}
/// Insert the `procedure` to the context.
///
/// # Panics
/// Panics if the procedure already exists.
fn insert_procedure(&self, meta: ProcedureMetaRef) {
let mut procedures = self.procedures.write().unwrap();
let old = procedures.insert(meta.id, meta);
assert!(old.is_none());
}
/// Returns the [ProcedureState] of specific `procedure_id`.
fn state(&self, procedure_id: ProcedureId) -> Option<ProcedureState> {
let procedures = self.procedures.read().unwrap();
procedures.get(&procedure_id).map(|meta| meta.state())
}
}
/// Config for [LocalManager].
#[derive(Debug)]
pub struct ManagerConfig {
/// Object store
object_store: ObjectStore,
}
/// A [ProcedureManager] that maintains procedure states locally.
pub struct LocalManager {
manager_ctx: Arc<ManagerContext>,
state_store: StateStoreRef,
}
impl LocalManager {
/// Create a new [LocalManager] with specific `config`.
pub fn new(config: ManagerConfig) -> LocalManager {
LocalManager {
manager_ctx: Arc::new(ManagerContext::new()),
state_store: Arc::new(ObjectStateStore::new(config.object_store)),
}
}
/// Submit a root procedure with given `procedure_id`.
fn submit_root(&self, procedure_id: ProcedureId, step: u32, procedure: BoxedProcedure) {
let meta = Arc::new(ProcedureMeta {
id: procedure_id,
lock_notify: Notify::new(),
parent_id: None,
child_notify: Notify::new(),
parent_locks: Vec::new(),
lock_key: procedure.lock_key(),
exec_meta: Mutex::new(ExecMeta {
state: ProcedureState::Running,
}),
});
let runner = Runner {
meta: meta.clone(),
procedure,
manager_ctx: self.manager_ctx.clone(),
step,
store: ProcedureStore::new(self.state_store.clone()),
};
self.manager_ctx.insert_procedure(meta);
common_runtime::spawn_bg(async move {
// Run the root procedure.
runner.run().await
});
}
}
#[async_trait]
impl ProcedureManager for LocalManager {
fn register_loader(&self, name: &str, loader: BoxedProcedureLoader) -> Result<()> {
let mut loaders = self.manager_ctx.loaders.lock().unwrap();
ensure!(!loaders.contains_key(name), LoaderConflictSnafu { name });
loaders.insert(name.to_string(), loader);
Ok(())
}
async fn submit(&self, procedure: ProcedureWithId) -> Result<()> {
let procedure_id = procedure.id;
ensure!(
!self.manager_ctx.contains_procedure(procedure_id),
DuplicateProcedureSnafu { procedure_id }
);
self.submit_root(procedure.id, 0, procedure.procedure);
Ok(())
}
async fn recover(&self) -> Result<()> {
unimplemented!()
}
async fn procedure_state(&self, procedure_id: ProcedureId) -> Result<Option<ProcedureState>> {
Ok(self.manager_ctx.state(procedure_id))
}
}
/// Create a new [ProcedureMeta] for test purpose.
#[cfg(test)]
@@ -91,7 +247,13 @@ fn procedure_meta_for_test() -> ProcedureMeta {
#[cfg(test)]
mod tests {
use object_store::services::fs::Builder;
use serde::{Deserialize, Serialize};
use tempdir::TempDir;
use super::*;
use crate::error::Error;
use crate::{Context, Procedure, Status};
#[test]
fn test_locks_needed() {
@@ -111,4 +273,150 @@ mod tests {
locks
);
}
fn new_object_store(dir: &TempDir) -> ObjectStore {
let store_dir = dir.path().to_str().unwrap();
let accessor = Builder::default().root(store_dir).build().unwrap();
ObjectStore::new(accessor)
}
#[test]
fn test_manager_context() {
let ctx = ManagerContext::new();
let meta = Arc::new(procedure_meta_for_test());
assert!(!ctx.contains_procedure(meta.id));
assert!(ctx.state(meta.id).is_none());
ctx.insert_procedure(meta.clone());
assert!(ctx.contains_procedure(meta.id));
assert_eq!(ProcedureState::Running, ctx.state(meta.id).unwrap());
meta.set_state(ProcedureState::Done);
assert_eq!(ProcedureState::Done, ctx.state(meta.id).unwrap());
}
#[test]
#[should_panic]
fn test_manager_context_insert_duplicate() {
let ctx = ManagerContext::new();
let meta = Arc::new(procedure_meta_for_test());
ctx.insert_procedure(meta.clone());
ctx.insert_procedure(meta);
}
#[test]
fn test_register_loader() {
let dir = TempDir::new("register").unwrap();
let config = ManagerConfig {
object_store: new_object_store(&dir),
};
let manager = LocalManager::new(config);
#[derive(Debug, Serialize, Deserialize)]
struct MockData {
id: u32,
content: String,
}
#[derive(Debug)]
struct ProcedureToLoad {
data: MockData,
}
#[async_trait]
impl Procedure for ProcedureToLoad {
fn type_name(&self) -> &str {
"ProcedureToLoad"
}
async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
unimplemented!()
}
fn dump(&self) -> Result<String> {
Ok(serde_json::to_string(&self.data).unwrap())
}
fn lock_key(&self) -> Option<LockKey> {
None
}
}
let loader = |json: &str| {
let data = serde_json::from_str(json).unwrap();
let procedure = ProcedureToLoad { data };
Ok(Box::new(procedure) as _)
};
manager
.register_loader("ProcedureToLoad", Box::new(loader))
.unwrap();
// Register duplicate loader.
let err = manager
.register_loader("ProcedureToLoad", Box::new(loader))
.unwrap_err();
assert!(matches!(err, Error::LoaderConflict { .. }), "{err}");
}
#[tokio::test]
async fn test_submit_procedure() {
let dir = TempDir::new("submit").unwrap();
let config = ManagerConfig {
object_store: new_object_store(&dir),
};
let manager = LocalManager::new(config);
#[derive(Debug)]
struct MockProcedure {}
#[async_trait]
impl Procedure for MockProcedure {
fn type_name(&self) -> &str {
"MockProcedure"
}
async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
unimplemented!()
}
fn dump(&self) -> Result<String> {
unimplemented!()
}
fn lock_key(&self) -> Option<LockKey> {
Some(LockKey::new("test.submit"))
}
}
let procedure_id = ProcedureId::random();
assert!(manager
.procedure_state(procedure_id)
.await
.unwrap()
.is_none());
manager
.submit(ProcedureWithId {
id: procedure_id,
procedure: Box::new(MockProcedure {}),
})
.await
.unwrap();
assert!(manager
.procedure_state(procedure_id)
.await
.unwrap()
.is_some());
// Try to submit procedure with same id again.
let err = manager
.submit(ProcedureWithId {
id: procedure_id,
procedure: Box::new(MockProcedure {}),
})
.await
.unwrap_err();
assert!(matches!(err, Error::DuplicateProcedure { .. }), "{err:?}");
}
}

View File

@@ -54,13 +54,13 @@ impl Lock {
}
/// Manages lock entries for procedures.
struct LockMap {
pub(crate) struct LockMap {
locks: RwLock<HashMap<String, Lock>>,
}
impl LockMap {
/// Returns a new [LockMap].
fn new() -> LockMap {
pub(crate) fn new() -> LockMap {
LockMap {
locks: RwLock::new(HashMap::new()),
}
@@ -73,7 +73,7 @@ impl LockMap {
///
/// # Panics
/// Panics if the procedure acquires the lock recursively.
async fn acquire_lock(&self, key: &str, meta: ProcedureMetaRef) {
pub(crate) async fn acquire_lock(&self, key: &str, meta: ProcedureMetaRef) {
assert!(!self.hold_lock(key, meta.id));
{
@@ -101,7 +101,7 @@ impl LockMap {
}
/// Release lock by `key`.
fn release_lock(&self, key: &str, procedure_id: ProcedureId) {
pub(crate) fn release_lock(&self, key: &str, procedure_id: ProcedureId) {
let mut locks = self.locks.write().unwrap();
if let Some(lock) = locks.get_mut(key) {
if lock.owner.id != procedure_id {

View File

@@ -0,0 +1,70 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_telemetry::logging;
use crate::local::{ManagerContext, ProcedureMetaRef};
use crate::store::ProcedureStore;
use crate::BoxedProcedure;
pub(crate) struct Runner {
pub(crate) meta: ProcedureMetaRef,
pub(crate) procedure: BoxedProcedure,
pub(crate) manager_ctx: Arc<ManagerContext>,
pub(crate) step: u32,
pub(crate) store: ProcedureStore,
}
impl Runner {
/// Run the procedure.
pub(crate) async fn run(self) {
logging::info!(
"Runner {}-{} starts",
self.procedure.type_name(),
self.meta.id
);
// We use the lock key in ProcedureMeta as it considers locks inherited from
// its parent.
let lock_key = self.meta.lock_key.clone();
// TODO(yingwen): Support multiple lock keys.
// Acquire lock if necessary.
if let Some(key) = &lock_key {
self.manager_ctx
.lock_map
.acquire_lock(key.key(), self.meta.clone())
.await;
}
// TODO(yingwen): Execute the procedure.
if let Some(key) = &lock_key {
self.manager_ctx
.lock_map
.release_lock(key.key(), self.meta.id);
}
// We can't remove the metadata of the procedure now as users and its parent might
// need to query its state.
// TODO(yingwen): 1. Add TTL to the metadata; 2. Only keep state in the procedure store
// so we don't need to always store the metadata in memory after the procedure is done.
logging::info!(
"Runner {}-{} exits",
self.procedure.type_name(),
self.meta.id
);
}
}

View File

@@ -163,7 +163,7 @@ pub type BoxedProcedureLoader = Box<dyn Fn(&str) -> Result<BoxedProcedure> + Sen
// TODO(yingwen): Find a way to return the error message if the procedure is failed.
/// State of a submitted procedure.
#[derive(Debug)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProcedureState {
/// The procedure is running.
Running,

View File

@@ -21,28 +21,33 @@ use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use crate::error::{Result, ToJsonSnafu};
use crate::store::state_store::StateStoreRef;
pub(crate) use crate::store::state_store::{ObjectStateStore, StateStoreRef};
use crate::{BoxedProcedure, ProcedureId};
mod state_store;
/// Serialized data of a procedure.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
struct ProcedureMessage {
pub struct ProcedureMessage {
/// Type name of the procedure. The procedure framework also use the type name to
/// find a loader to load the procedure.
type_name: String,
pub type_name: String,
/// The data of the procedure.
data: String,
pub data: String,
/// Parent procedure id.
parent_id: Option<ProcedureId>,
pub parent_id: Option<ProcedureId>,
}
/// Procedure storage layer.
#[derive(Clone)]
struct ProcedureStore(StateStoreRef);
pub(crate) struct ProcedureStore(StateStoreRef);
impl ProcedureStore {
/// Creates a new [ProcedureStore] from specific [StateStoreRef].
pub(crate) fn new(state_store: StateStoreRef) -> ProcedureStore {
ProcedureStore(state_store)
}
/// Dump the `procedure` to the storage.
async fn store_procedure(
&self,
@@ -327,7 +332,7 @@ mod tests {
#[tokio::test]
async fn test_commit_procedure() {
let dir = TempDir::new("store_procedure").unwrap();
let dir = TempDir::new("commit_procedure").unwrap();
let store = new_procedure_store(&dir);
let procedure_id = ProcedureId::random();
@@ -345,7 +350,7 @@ mod tests {
#[tokio::test]
async fn test_load_messages() {
let dir = TempDir::new("store_procedure").unwrap();
let dir = TempDir::new("load_messages").unwrap();
let store = new_procedure_store(&dir);
// store 3 steps