diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs index fa8500256c..9631347c28 100644 --- a/src/common/procedure/src/local.rs +++ b/src/common/procedure/src/local.rs @@ -108,6 +108,7 @@ impl ProcedureMeta { /// Reference counted pointer to [ProcedureMeta]. type ProcedureMetaRef = Arc; + /// Procedure loaded from store. struct LoadedProcedure { procedure: BoxedProcedure, @@ -176,9 +177,20 @@ impl ManagerContext { /// Load procedure with specific `procedure_id` from cached [ProcedureMessage]s. fn load_one_procedure(&self, procedure_id: ProcedureId) -> Option { - let messages = self.messages.lock().unwrap(); - let message = messages.get(&procedure_id)?; + let message = { + let messages = self.messages.lock().unwrap(); + messages.get(&procedure_id).cloned()? + }; + self.load_one_procedure_from_message(procedure_id, &message) + } + + /// Load procedure from specific [ProcedureMessage]. + fn load_one_procedure_from_message( + &self, + procedure_id: ProcedureId, + message: &ProcedureMessage, + ) -> Option { let loaders = self.loaders.lock().unwrap(); let loader = loaders.get(&message.type_name).or_else(|| { logging::error!( @@ -344,7 +356,38 @@ impl ProcedureManager for LocalManager { } async fn recover(&self) -> Result<()> { - todo!("Recover procedure and messages") + logging::info!("LocalManager start to recover"); + + let procedure_store = ProcedureStore::new(self.state_store.clone()); + let messages = procedure_store.load_messages().await?; + + for (procedure_id, message) in &messages { + if message.parent_id.is_none() { + // This is the root procedure. We only submit the root procedure as it will + // submit sub-procedures to the manager. + let Some(loaded_procedure) = self.manager_ctx.load_one_procedure_from_message(*procedure_id, message) else { + // Try to load other procedures. + continue; + }; + + logging::info!( + "Recover root procedure {}-{}, step: {}", + loaded_procedure.procedure.type_name(), + procedure_id, + loaded_procedure.step + ); + + if let Err(e) = self.submit_root( + *procedure_id, + loaded_procedure.step, + loaded_procedure.procedure, + ) { + logging::error!(e; "Failed to recover procedure {}", procedure_id); + } + } + } + + Ok(()) } async fn procedure_state(&self, procedure_id: ProcedureId) -> Result> { @@ -380,7 +423,6 @@ mod test_util { #[cfg(test)] mod tests { - use serde::{Deserialize, Serialize}; use tempdir::TempDir; use super::*; @@ -447,6 +489,46 @@ mod tests { assert_eq!(expect, ctx.procedures_in_tree(&root)); } + #[derive(Debug)] + struct ProcedureToLoad { + content: String, + } + + #[async_trait] + impl Procedure for ProcedureToLoad { + fn type_name(&self) -> &str { + "ProcedureToLoad" + } + + async fn execute(&mut self, _ctx: &Context) -> Result { + Ok(Status::Done) + } + + fn dump(&self) -> Result { + Ok(self.content.clone()) + } + + fn lock_key(&self) -> Option { + None + } + } + + impl ProcedureToLoad { + fn new(content: &str) -> ProcedureToLoad { + ProcedureToLoad { + content: content.to_string(), + } + } + + fn loader() -> BoxedProcedureLoader { + let f = |json: &str| { + let procedure = ProcedureToLoad::new(json); + Ok(Box::new(procedure) as _) + }; + Box::new(f) + } + } + #[test] fn test_register_loader() { let dir = TempDir::new("register").unwrap(); @@ -455,51 +537,61 @@ mod tests { }; let manager = LocalManager::new(config); - #[derive(Debug, Serialize, Deserialize)] - struct MockData { - id: u32, - content: String, - } - - #[derive(Debug)] - struct ProcedureToLoad { - data: MockData, - } - - #[async_trait] - impl Procedure for ProcedureToLoad { - fn type_name(&self) -> &str { - "ProcedureToLoad" - } - - async fn execute(&mut self, _ctx: &Context) -> Result { - unimplemented!() - } - - fn dump(&self) -> Result { - Ok(serde_json::to_string(&self.data).unwrap()) - } - - fn lock_key(&self) -> Option { - None - } - } - - let loader = |json: &str| { - let data = serde_json::from_str(json).unwrap(); - let procedure = ProcedureToLoad { data }; - Ok(Box::new(procedure) as _) - }; manager - .register_loader("ProcedureToLoad", Box::new(loader)) + .register_loader("ProcedureToLoad", ProcedureToLoad::loader()) .unwrap(); // Register duplicate loader. let err = manager - .register_loader("ProcedureToLoad", Box::new(loader)) + .register_loader("ProcedureToLoad", ProcedureToLoad::loader()) .unwrap_err(); assert!(matches!(err, Error::LoaderConflict { .. }), "{err}"); } + #[tokio::test] + async fn test_recover() { + let dir = TempDir::new("recover").unwrap(); + let object_store = test_util::new_object_store(&dir); + let config = ManagerConfig { + object_store: object_store.clone(), + }; + let manager = LocalManager::new(config); + + manager + .register_loader("ProcedureToLoad", ProcedureToLoad::loader()) + .unwrap(); + + // Prepare data + let procedure_store = ProcedureStore::from(object_store.clone()); + let root: BoxedProcedure = Box::new(ProcedureToLoad::new("test recover manager")); + let root_id = ProcedureId::random(); + // Prepare data for the root procedure. + for step in 0..3 { + procedure_store + .store_procedure(root_id, step, &root, None) + .await + .unwrap(); + } + + let child: BoxedProcedure = Box::new(ProcedureToLoad::new("a child procedure")); + let child_id = ProcedureId::random(); + // Prepare data for the child procedure + for step in 0..2 { + procedure_store + .store_procedure(child_id, step, &child, Some(root_id)) + .await + .unwrap(); + } + + // Recover the manager + manager.recover().await.unwrap(); + + // The manager should submit the root procedure. + assert!(manager.procedure_state(root_id).await.unwrap().is_some()); + // Since the mocked root procedure actually doesn't submit subprocedures, so there is no + // related state. + assert!(manager.procedure_state(child_id).await.unwrap().is_none()); + } + #[tokio::test] async fn test_submit_procedure() { let dir = TempDir::new("submit").unwrap(); diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs index b61ee17e40..e3d1ab485d 100644 --- a/src/common/procedure/src/local/runner.rs +++ b/src/common/procedure/src/local/runner.rs @@ -366,7 +366,6 @@ mod tests { use super::*; use crate::local::test_util; - use crate::store::ObjectStateStore; use crate::{LockKey, Procedure}; const ROOT_ID: &str = "9f805a1f-05f7-490c-9f91-bd56e3cc54c1"; @@ -385,12 +384,6 @@ mod tests { } } - fn new_procedure_store(object_store: ObjectStore) -> ProcedureStore { - let state_store = ObjectStateStore::new(object_store); - - ProcedureStore::new(Arc::new(state_store)) - } - async fn check_files(object_store: &ObjectStore, procedure_id: ProcedureId, files: &[&str]) { let dir = format!("{procedure_id}/"); let object = object_store.object(&dir); @@ -469,7 +462,7 @@ mod tests { procedure_id: meta.id, }; let object_store = test_util::new_object_store(&dir); - let procedure_store = new_procedure_store(object_store.clone()); + let procedure_store = ProcedureStore::from(object_store.clone()); let mut runner = new_runner(meta, Box::new(normal), procedure_store); let res = runner.execute_once(&ctx).await; @@ -519,7 +512,7 @@ mod tests { procedure_id: meta.id, }; let object_store = test_util::new_object_store(&dir); - let procedure_store = new_procedure_store(object_store.clone()); + let procedure_store = ProcedureStore::from(object_store.clone()); let mut runner = new_runner(meta, Box::new(suspend), procedure_store); let res = runner.execute_once(&ctx).await; @@ -609,7 +602,7 @@ mod tests { assert!(manager_ctx.try_insert_procedure(meta.clone())); let object_store = test_util::new_object_store(&dir); - let procedure_store = new_procedure_store(object_store.clone()); + let procedure_store = ProcedureStore::from(object_store.clone()); let mut runner = new_runner(meta, Box::new(parent), procedure_store); // Replace the manager ctx. runner.manager_ctx = manager_ctx; @@ -649,7 +642,7 @@ mod tests { procedure_id: meta.id, }; let object_store = test_util::new_object_store(&dir); - let procedure_store = new_procedure_store(object_store.clone()); + let procedure_store = ProcedureStore::from(object_store.clone()); let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store); let res = runner.execute_once(&ctx).await; @@ -721,7 +714,7 @@ mod tests { assert!(manager_ctx.try_insert_procedure(meta.clone())); let object_store = test_util::new_object_store(&dir); - let procedure_store = new_procedure_store(object_store.clone()); + let procedure_store = ProcedureStore::from(object_store.clone()); let mut runner = new_runner(meta, Box::new(parent), procedure_store); // Replace the manager ctx. runner.manager_ctx = manager_ctx; diff --git a/src/common/procedure/src/store.rs b/src/common/procedure/src/store.rs index efdebf3329..3f7ee76396 100644 --- a/src/common/procedure/src/store.rs +++ b/src/common/procedure/src/store.rs @@ -14,9 +14,11 @@ use std::collections::HashMap; use std::fmt; +use std::sync::Arc; use common_telemetry::logging; use futures::TryStreamExt; +use object_store::ObjectStore; use serde::{Deserialize, Serialize}; use snafu::ResultExt; @@ -27,7 +29,7 @@ use crate::{BoxedProcedure, ProcedureId}; mod state_store; /// Serialized data of a procedure. -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct ProcedureMessage { /// Type name of the procedure. The procedure framework also use the type name to /// find a loader to load the procedure. @@ -115,7 +117,7 @@ impl ProcedureStore { } /// Load uncommitted procedures from the storage. - async fn load_messages(&self) -> Result> { + pub(crate) async fn load_messages(&self) -> Result> { let mut messages = HashMap::new(); // Track the key-value pair by procedure id. let mut procedure_key_values: HashMap<_, (ParsedKey, Vec)> = HashMap::new(); @@ -163,6 +165,14 @@ impl ProcedureStore { } } +impl From for ProcedureStore { + fn from(store: ObjectStore) -> ProcedureStore { + let state_store = ObjectStateStore::new(store); + + ProcedureStore::new(Arc::new(state_store)) + } +} + /// Suffix type of the key. #[derive(Debug, PartialEq, Eq)] enum KeyType { @@ -235,11 +245,8 @@ impl ParsedKey { #[cfg(test)] mod tests { - use std::sync::Arc; - use async_trait::async_trait; use object_store::services::fs::Builder; - use object_store::ObjectStore; use tempdir::TempDir; use super::*; @@ -249,9 +256,8 @@ mod tests { let store_dir = dir.path().to_str().unwrap(); let accessor = Builder::default().root(store_dir).build().unwrap(); let object_store = ObjectStore::new(accessor); - let state_store = ObjectStateStore::new(object_store); - ProcedureStore::new(Arc::new(state_store)) + ProcedureStore::from(object_store) } #[test]