From ecadbc1435db920a8965ab64c76ef40d571bf766 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Tue, 7 Feb 2023 11:33:13 +0800 Subject: [PATCH] feat: Add procedure manager LocalManager (#946) * feat: Add ManagerContext and LocalManager * test: Add register_loader test * feat: Remove some unused methods * fix: Fix submit_procedure ensure condition --- src/common/procedure/src/local.rs | 314 ++++++++++++++++++++++- src/common/procedure/src/local/lock.rs | 8 +- src/common/procedure/src/local/runner.rs | 70 +++++ src/common/procedure/src/procedure.rs | 2 +- src/common/procedure/src/store.rs | 21 +- 5 files changed, 399 insertions(+), 16 deletions(-) create mode 100644 src/common/procedure/src/local/runner.rs diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs index 507dc5e3e9..db16e0299e 100644 --- a/src/common/procedure/src/local.rs +++ b/src/common/procedure/src/local.rs @@ -13,12 +13,24 @@ // limitations under the License. mod lock; +mod runner; -use std::sync::{Arc, Mutex}; +use std::collections::HashMap; +use std::sync::{Arc, Mutex, RwLock}; +use async_trait::async_trait; +use object_store::ObjectStore; +use snafu::ensure; use tokio::sync::Notify; -use crate::{LockKey, ProcedureId, ProcedureState}; +use crate::error::{DuplicateProcedureSnafu, LoaderConflictSnafu, Result}; +use crate::local::lock::LockMap; +use crate::local::runner::Runner; +use crate::procedure::BoxedProcedureLoader; +use crate::store::{ObjectStateStore, ProcedureMessage, ProcedureStore, StateStoreRef}; +use crate::{ + BoxedProcedure, LockKey, ProcedureId, ProcedureManager, ProcedureState, ProcedureWithId, +}; /// Mutable metadata of a procedure during execution. #[derive(Debug)] @@ -36,7 +48,7 @@ struct ExecMeta { /// for children; /// 2. always use `notify_one` and ensure there are only one waiter. #[derive(Debug)] -struct ProcedureMeta { +pub(crate) struct ProcedureMeta { /// Id of this procedure. id: ProcedureId, /// Notify to wait for a lock. @@ -68,10 +80,154 @@ impl ProcedureMeta { locks } + + /// Returns current [ProcedureState]. + fn state(&self) -> ProcedureState { + let meta = self.exec_meta.lock().unwrap(); + meta.state.clone() + } + + /// Update current [ProcedureState]. + fn set_state(&self, state: ProcedureState) { + let mut meta = self.exec_meta.lock().unwrap(); + meta.state = state; + } } /// Reference counted pointer to [ProcedureMeta]. type ProcedureMetaRef = Arc; +/// Procedure and its parent procedure id. +struct ProcedureAndParent(BoxedProcedure, Option); + +/// Shared context of the manager. +pub(crate) struct ManagerContext { + /// Procedure loaders. The key is the type name of the procedure which the loader returns. + loaders: Mutex>, + lock_map: LockMap, + procedures: RwLock>, + // TODO(yingwen): Now we never clean the messages. But when the root procedure is done, we + // should be able to remove the its message and all its child messages. + /// Messages loaded from the procedure store. + messages: Mutex>, +} + +impl ManagerContext { + /// Returns a new [ManagerContext]. + fn new() -> ManagerContext { + ManagerContext { + loaders: Mutex::new(HashMap::new()), + lock_map: LockMap::new(), + procedures: RwLock::new(HashMap::new()), + messages: Mutex::new(HashMap::new()), + } + } + + /// Returns true if the procedure with specific `procedure_id` exists. + fn contains_procedure(&self, procedure_id: ProcedureId) -> bool { + let procedures = self.procedures.read().unwrap(); + procedures.contains_key(&procedure_id) + } + + /// Insert the `procedure` to the context. + /// + /// # Panics + /// Panics if the procedure already exists. + fn insert_procedure(&self, meta: ProcedureMetaRef) { + let mut procedures = self.procedures.write().unwrap(); + let old = procedures.insert(meta.id, meta); + assert!(old.is_none()); + } + + /// Returns the [ProcedureState] of specific `procedure_id`. + fn state(&self, procedure_id: ProcedureId) -> Option { + let procedures = self.procedures.read().unwrap(); + procedures.get(&procedure_id).map(|meta| meta.state()) + } +} + +/// Config for [LocalManager]. +#[derive(Debug)] +pub struct ManagerConfig { + /// Object store + object_store: ObjectStore, +} + +/// A [ProcedureManager] that maintains procedure states locally. +pub struct LocalManager { + manager_ctx: Arc, + state_store: StateStoreRef, +} + +impl LocalManager { + /// Create a new [LocalManager] with specific `config`. + pub fn new(config: ManagerConfig) -> LocalManager { + LocalManager { + manager_ctx: Arc::new(ManagerContext::new()), + state_store: Arc::new(ObjectStateStore::new(config.object_store)), + } + } + + /// Submit a root procedure with given `procedure_id`. + fn submit_root(&self, procedure_id: ProcedureId, step: u32, procedure: BoxedProcedure) { + let meta = Arc::new(ProcedureMeta { + id: procedure_id, + lock_notify: Notify::new(), + parent_id: None, + child_notify: Notify::new(), + parent_locks: Vec::new(), + lock_key: procedure.lock_key(), + exec_meta: Mutex::new(ExecMeta { + state: ProcedureState::Running, + }), + }); + let runner = Runner { + meta: meta.clone(), + procedure, + manager_ctx: self.manager_ctx.clone(), + step, + store: ProcedureStore::new(self.state_store.clone()), + }; + + self.manager_ctx.insert_procedure(meta); + + common_runtime::spawn_bg(async move { + // Run the root procedure. + runner.run().await + }); + } +} + +#[async_trait] +impl ProcedureManager for LocalManager { + fn register_loader(&self, name: &str, loader: BoxedProcedureLoader) -> Result<()> { + let mut loaders = self.manager_ctx.loaders.lock().unwrap(); + ensure!(!loaders.contains_key(name), LoaderConflictSnafu { name }); + + loaders.insert(name.to_string(), loader); + + Ok(()) + } + + async fn submit(&self, procedure: ProcedureWithId) -> Result<()> { + let procedure_id = procedure.id; + ensure!( + !self.manager_ctx.contains_procedure(procedure_id), + DuplicateProcedureSnafu { procedure_id } + ); + + self.submit_root(procedure.id, 0, procedure.procedure); + + Ok(()) + } + + async fn recover(&self) -> Result<()> { + unimplemented!() + } + + async fn procedure_state(&self, procedure_id: ProcedureId) -> Result> { + Ok(self.manager_ctx.state(procedure_id)) + } +} /// Create a new [ProcedureMeta] for test purpose. #[cfg(test)] @@ -91,7 +247,13 @@ fn procedure_meta_for_test() -> ProcedureMeta { #[cfg(test)] mod tests { + use object_store::services::fs::Builder; + use serde::{Deserialize, Serialize}; + use tempdir::TempDir; + use super::*; + use crate::error::Error; + use crate::{Context, Procedure, Status}; #[test] fn test_locks_needed() { @@ -111,4 +273,150 @@ mod tests { locks ); } + + fn new_object_store(dir: &TempDir) -> ObjectStore { + let store_dir = dir.path().to_str().unwrap(); + let accessor = Builder::default().root(store_dir).build().unwrap(); + ObjectStore::new(accessor) + } + + #[test] + fn test_manager_context() { + let ctx = ManagerContext::new(); + let meta = Arc::new(procedure_meta_for_test()); + + assert!(!ctx.contains_procedure(meta.id)); + assert!(ctx.state(meta.id).is_none()); + + ctx.insert_procedure(meta.clone()); + assert!(ctx.contains_procedure(meta.id)); + + assert_eq!(ProcedureState::Running, ctx.state(meta.id).unwrap()); + meta.set_state(ProcedureState::Done); + assert_eq!(ProcedureState::Done, ctx.state(meta.id).unwrap()); + } + + #[test] + #[should_panic] + fn test_manager_context_insert_duplicate() { + let ctx = ManagerContext::new(); + let meta = Arc::new(procedure_meta_for_test()); + + ctx.insert_procedure(meta.clone()); + ctx.insert_procedure(meta); + } + + #[test] + fn test_register_loader() { + let dir = TempDir::new("register").unwrap(); + let config = ManagerConfig { + object_store: new_object_store(&dir), + }; + let manager = LocalManager::new(config); + + #[derive(Debug, Serialize, Deserialize)] + struct MockData { + id: u32, + content: String, + } + + #[derive(Debug)] + struct ProcedureToLoad { + data: MockData, + } + + #[async_trait] + impl Procedure for ProcedureToLoad { + fn type_name(&self) -> &str { + "ProcedureToLoad" + } + + async fn execute(&mut self, _ctx: &Context) -> Result { + unimplemented!() + } + + fn dump(&self) -> Result { + Ok(serde_json::to_string(&self.data).unwrap()) + } + + fn lock_key(&self) -> Option { + None + } + } + + let loader = |json: &str| { + let data = serde_json::from_str(json).unwrap(); + let procedure = ProcedureToLoad { data }; + Ok(Box::new(procedure) as _) + }; + manager + .register_loader("ProcedureToLoad", Box::new(loader)) + .unwrap(); + // Register duplicate loader. + let err = manager + .register_loader("ProcedureToLoad", Box::new(loader)) + .unwrap_err(); + assert!(matches!(err, Error::LoaderConflict { .. }), "{err}"); + } + + #[tokio::test] + async fn test_submit_procedure() { + let dir = TempDir::new("submit").unwrap(); + let config = ManagerConfig { + object_store: new_object_store(&dir), + }; + let manager = LocalManager::new(config); + + #[derive(Debug)] + struct MockProcedure {} + + #[async_trait] + impl Procedure for MockProcedure { + fn type_name(&self) -> &str { + "MockProcedure" + } + + async fn execute(&mut self, _ctx: &Context) -> Result { + unimplemented!() + } + + fn dump(&self) -> Result { + unimplemented!() + } + + fn lock_key(&self) -> Option { + Some(LockKey::new("test.submit")) + } + } + + let procedure_id = ProcedureId::random(); + assert!(manager + .procedure_state(procedure_id) + .await + .unwrap() + .is_none()); + + manager + .submit(ProcedureWithId { + id: procedure_id, + procedure: Box::new(MockProcedure {}), + }) + .await + .unwrap(); + assert!(manager + .procedure_state(procedure_id) + .await + .unwrap() + .is_some()); + + // Try to submit procedure with same id again. + let err = manager + .submit(ProcedureWithId { + id: procedure_id, + procedure: Box::new(MockProcedure {}), + }) + .await + .unwrap_err(); + assert!(matches!(err, Error::DuplicateProcedure { .. }), "{err:?}"); + } } diff --git a/src/common/procedure/src/local/lock.rs b/src/common/procedure/src/local/lock.rs index bd337c948d..93e401ec59 100644 --- a/src/common/procedure/src/local/lock.rs +++ b/src/common/procedure/src/local/lock.rs @@ -54,13 +54,13 @@ impl Lock { } /// Manages lock entries for procedures. -struct LockMap { +pub(crate) struct LockMap { locks: RwLock>, } impl LockMap { /// Returns a new [LockMap]. - fn new() -> LockMap { + pub(crate) fn new() -> LockMap { LockMap { locks: RwLock::new(HashMap::new()), } @@ -73,7 +73,7 @@ impl LockMap { /// /// # Panics /// Panics if the procedure acquires the lock recursively. - async fn acquire_lock(&self, key: &str, meta: ProcedureMetaRef) { + pub(crate) async fn acquire_lock(&self, key: &str, meta: ProcedureMetaRef) { assert!(!self.hold_lock(key, meta.id)); { @@ -101,7 +101,7 @@ impl LockMap { } /// Release lock by `key`. - fn release_lock(&self, key: &str, procedure_id: ProcedureId) { + pub(crate) fn release_lock(&self, key: &str, procedure_id: ProcedureId) { let mut locks = self.locks.write().unwrap(); if let Some(lock) = locks.get_mut(key) { if lock.owner.id != procedure_id { diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs new file mode 100644 index 0000000000..0df557a0d9 --- /dev/null +++ b/src/common/procedure/src/local/runner.rs @@ -0,0 +1,70 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_telemetry::logging; + +use crate::local::{ManagerContext, ProcedureMetaRef}; +use crate::store::ProcedureStore; +use crate::BoxedProcedure; + +pub(crate) struct Runner { + pub(crate) meta: ProcedureMetaRef, + pub(crate) procedure: BoxedProcedure, + pub(crate) manager_ctx: Arc, + pub(crate) step: u32, + pub(crate) store: ProcedureStore, +} + +impl Runner { + /// Run the procedure. + pub(crate) async fn run(self) { + logging::info!( + "Runner {}-{} starts", + self.procedure.type_name(), + self.meta.id + ); + // We use the lock key in ProcedureMeta as it considers locks inherited from + // its parent. + let lock_key = self.meta.lock_key.clone(); + + // TODO(yingwen): Support multiple lock keys. + // Acquire lock if necessary. + if let Some(key) = &lock_key { + self.manager_ctx + .lock_map + .acquire_lock(key.key(), self.meta.clone()) + .await; + } + + // TODO(yingwen): Execute the procedure. + + if let Some(key) = &lock_key { + self.manager_ctx + .lock_map + .release_lock(key.key(), self.meta.id); + } + // We can't remove the metadata of the procedure now as users and its parent might + // need to query its state. + // TODO(yingwen): 1. Add TTL to the metadata; 2. Only keep state in the procedure store + // so we don't need to always store the metadata in memory after the procedure is done. + + logging::info!( + "Runner {}-{} exits", + self.procedure.type_name(), + self.meta.id + ); + } +} diff --git a/src/common/procedure/src/procedure.rs b/src/common/procedure/src/procedure.rs index 0c9de92142..57337b4b80 100644 --- a/src/common/procedure/src/procedure.rs +++ b/src/common/procedure/src/procedure.rs @@ -163,7 +163,7 @@ pub type BoxedProcedureLoader = Box Result + Sen // TODO(yingwen): Find a way to return the error message if the procedure is failed. /// State of a submitted procedure. -#[derive(Debug)] +#[derive(Debug, Clone, PartialEq, Eq)] pub enum ProcedureState { /// The procedure is running. Running, diff --git a/src/common/procedure/src/store.rs b/src/common/procedure/src/store.rs index 0f306d9195..ae862dcf34 100644 --- a/src/common/procedure/src/store.rs +++ b/src/common/procedure/src/store.rs @@ -21,28 +21,33 @@ use serde::{Deserialize, Serialize}; use snafu::ResultExt; use crate::error::{Result, ToJsonSnafu}; -use crate::store::state_store::StateStoreRef; +pub(crate) use crate::store::state_store::{ObjectStateStore, StateStoreRef}; use crate::{BoxedProcedure, ProcedureId}; mod state_store; /// Serialized data of a procedure. #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] -struct ProcedureMessage { +pub struct ProcedureMessage { /// Type name of the procedure. The procedure framework also use the type name to /// find a loader to load the procedure. - type_name: String, + pub type_name: String, /// The data of the procedure. - data: String, + pub data: String, /// Parent procedure id. - parent_id: Option, + pub parent_id: Option, } /// Procedure storage layer. #[derive(Clone)] -struct ProcedureStore(StateStoreRef); +pub(crate) struct ProcedureStore(StateStoreRef); impl ProcedureStore { + /// Creates a new [ProcedureStore] from specific [StateStoreRef]. + pub(crate) fn new(state_store: StateStoreRef) -> ProcedureStore { + ProcedureStore(state_store) + } + /// Dump the `procedure` to the storage. async fn store_procedure( &self, @@ -327,7 +332,7 @@ mod tests { #[tokio::test] async fn test_commit_procedure() { - let dir = TempDir::new("store_procedure").unwrap(); + let dir = TempDir::new("commit_procedure").unwrap(); let store = new_procedure_store(&dir); let procedure_id = ProcedureId::random(); @@ -345,7 +350,7 @@ mod tests { #[tokio::test] async fn test_load_messages() { - let dir = TempDir::new("store_procedure").unwrap(); + let dir = TempDir::new("load_messages").unwrap(); let store = new_procedure_store(&dir); // store 3 steps