feat: Implement LocalManager::recover (#981)

* feat: Implement LocalManager::recover

* feat: Impl From<ObjectStore> for ProcedureStore
This commit is contained in:
Yingwen
2023-02-14 14:50:43 +08:00
committed by GitHub
parent 9ad6c45913
commit 0f7e5a2fb2
3 changed files with 151 additions and 60 deletions

View File

@@ -108,6 +108,7 @@ impl ProcedureMeta {
/// Reference counted pointer to [ProcedureMeta].
type ProcedureMetaRef = Arc<ProcedureMeta>;
/// Procedure loaded from store.
struct LoadedProcedure {
procedure: BoxedProcedure,
@@ -176,9 +177,20 @@ impl ManagerContext {
/// Load procedure with specific `procedure_id` from cached [ProcedureMessage]s.
///
/// Returns `None` if no message is cached for `procedure_id`, or if
/// `load_one_procedure_from_message` fails to build a procedure from it.
fn load_one_procedure(&self, procedure_id: ProcedureId) -> Option<LoadedProcedure> {
    // Clone the message inside a narrow scope so the `messages` guard is dropped
    // before `load_one_procedure_from_message` acquires the `loaders` lock. The
    // `messages` mutex must only be locked once here: locking a std `Mutex` again
    // while this thread still holds its guard would deadlock or panic.
    let message = {
        let messages = self.messages.lock().unwrap();
        messages.get(&procedure_id).cloned()?
    };

    self.load_one_procedure_from_message(procedure_id, &message)
}
/// Load procedure from specific [ProcedureMessage].
fn load_one_procedure_from_message(
&self,
procedure_id: ProcedureId,
message: &ProcedureMessage,
) -> Option<LoadedProcedure> {
let loaders = self.loaders.lock().unwrap();
let loader = loaders.get(&message.type_name).or_else(|| {
logging::error!(
@@ -344,7 +356,38 @@ impl ProcedureManager for LocalManager {
}
/// Recovers procedures persisted in the state store and resubmits the root ones.
///
/// All messages are loaded through a [ProcedureStore] built on `self.state_store`.
/// Only messages without a `parent_id` (root procedures) are submitted; a root
/// procedure is expected to submit its own sub-procedures to the manager.
///
/// # Errors
///
/// Returns an error only when loading the messages from the store fails. A root
/// procedure that cannot be loaded is skipped, and a failure to submit one is
/// logged without aborting the recovery of the remaining procedures.
async fn recover(&self) -> Result<()> {
    logging::info!("LocalManager start to recover");

    let procedure_store = ProcedureStore::new(self.state_store.clone());
    let messages = procedure_store.load_messages().await?;

    for (procedure_id, message) in &messages {
        // We only submit the root procedure as it will submit sub-procedures to
        // the manager.
        if message.parent_id.is_some() {
            continue;
        }

        let Some(loaded_procedure) = self
            .manager_ctx
            .load_one_procedure_from_message(*procedure_id, message)
        else {
            // Try to load other procedures.
            continue;
        };

        logging::info!(
            "Recover root procedure {}-{}, step: {}",
            loaded_procedure.procedure.type_name(),
            procedure_id,
            loaded_procedure.step
        );

        if let Err(e) = self.submit_root(
            *procedure_id,
            loaded_procedure.step,
            loaded_procedure.procedure,
        ) {
            logging::error!(e; "Failed to recover procedure {}", procedure_id);
        }
    }

    Ok(())
}
async fn procedure_state(&self, procedure_id: ProcedureId) -> Result<Option<ProcedureState>> {
@@ -380,7 +423,6 @@ mod test_util {
#[cfg(test)]
mod tests {
use serde::{Deserialize, Serialize};
use tempdir::TempDir;
use super::*;
@@ -447,6 +489,46 @@ mod tests {
assert_eq!(expect, ctx.procedures_in_tree(&root));
}
/// Mock procedure for tests: it finishes on the first `execute` call and dumps
/// its `content` unchanged.
#[derive(Debug)]
struct ProcedureToLoad {
    /// Text returned by `dump`.
    content: String,
}

#[async_trait]
impl Procedure for ProcedureToLoad {
    fn type_name(&self) -> &str {
        "ProcedureToLoad"
    }

    /// Completes immediately without doing any work.
    async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
        Ok(Status::Done)
    }

    /// Returns a copy of `content` as the dumped state.
    fn dump(&self) -> Result<String> {
        Ok(self.content.to_owned())
    }

    /// This procedure doesn't need a lock key.
    fn lock_key(&self) -> Option<LockKey> {
        None
    }
}
impl ProcedureToLoad {
fn new(content: &str) -> ProcedureToLoad {
ProcedureToLoad {
content: content.to_string(),
}
}
fn loader() -> BoxedProcedureLoader {
let f = |json: &str| {
let procedure = ProcedureToLoad::new(json);
Ok(Box::new(procedure) as _)
};
Box::new(f)
}
}
#[test]
fn test_register_loader() {
let dir = TempDir::new("register").unwrap();
@@ -455,51 +537,61 @@ mod tests {
};
let manager = LocalManager::new(config);
#[derive(Debug, Serialize, Deserialize)]
struct MockData {
id: u32,
content: String,
}
#[derive(Debug)]
struct ProcedureToLoad {
data: MockData,
}
#[async_trait]
impl Procedure for ProcedureToLoad {
fn type_name(&self) -> &str {
"ProcedureToLoad"
}
async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
unimplemented!()
}
fn dump(&self) -> Result<String> {
Ok(serde_json::to_string(&self.data).unwrap())
}
fn lock_key(&self) -> Option<LockKey> {
None
}
}
let loader = |json: &str| {
let data = serde_json::from_str(json).unwrap();
let procedure = ProcedureToLoad { data };
Ok(Box::new(procedure) as _)
};
manager
.register_loader("ProcedureToLoad", Box::new(loader))
.register_loader("ProcedureToLoad", ProcedureToLoad::loader())
.unwrap();
// Register duplicate loader.
let err = manager
.register_loader("ProcedureToLoad", Box::new(loader))
.register_loader("ProcedureToLoad", ProcedureToLoad::loader())
.unwrap_err();
assert!(matches!(err, Error::LoaderConflict { .. }), "{err}");
}
/// Persists a root procedure and a child procedure, then checks that
/// `recover()` submits only the root one.
#[tokio::test]
async fn test_recover() {
    let dir = TempDir::new("recover").unwrap();
    let object_store = test_util::new_object_store(&dir);
    let manager = LocalManager::new(ManagerConfig {
        object_store: object_store.clone(),
    });
    manager
        .register_loader("ProcedureToLoad", ProcedureToLoad::loader())
        .unwrap();

    // Write the procedure data through a store sharing the manager's object store.
    let procedure_store = ProcedureStore::from(object_store.clone());

    // Root procedure: persisted at steps 0, 1 and 2, without a parent.
    let root: BoxedProcedure = Box::new(ProcedureToLoad::new("test recover manager"));
    let root_id = ProcedureId::random();
    for step in 0..3 {
        procedure_store
            .store_procedure(root_id, step, &root, None)
            .await
            .unwrap();
    }

    // Child procedure: persisted at steps 0 and 1, with the root as its parent.
    let child: BoxedProcedure = Box::new(ProcedureToLoad::new("a child procedure"));
    let child_id = ProcedureId::random();
    for step in 0..2 {
        procedure_store
            .store_procedure(child_id, step, &child, Some(root_id))
            .await
            .unwrap();
    }

    // Recover the manager.
    manager.recover().await.unwrap();

    // The manager should have submitted the root procedure.
    let root_state = manager.procedure_state(root_id).await.unwrap();
    assert!(root_state.is_some());

    // The mocked root procedure doesn't submit sub-procedures, so the manager
    // holds no state for the child.
    let child_state = manager.procedure_state(child_id).await.unwrap();
    assert!(child_state.is_none());
}
#[tokio::test]
async fn test_submit_procedure() {
let dir = TempDir::new("submit").unwrap();

View File

@@ -366,7 +366,6 @@ mod tests {
use super::*;
use crate::local::test_util;
use crate::store::ObjectStateStore;
use crate::{LockKey, Procedure};
const ROOT_ID: &str = "9f805a1f-05f7-490c-9f91-bd56e3cc54c1";
@@ -385,12 +384,6 @@ mod tests {
}
}
/// Test helper: wraps `object_store` in an [ObjectStateStore] and builds a
/// [ProcedureStore] on top of it.
fn new_procedure_store(object_store: ObjectStore) -> ProcedureStore {
let state_store = ObjectStateStore::new(object_store);
ProcedureStore::new(Arc::new(state_store))
}
async fn check_files(object_store: &ObjectStore, procedure_id: ProcedureId, files: &[&str]) {
let dir = format!("{procedure_id}/");
let object = object_store.object(&dir);
@@ -469,7 +462,7 @@ mod tests {
procedure_id: meta.id,
};
let object_store = test_util::new_object_store(&dir);
let procedure_store = new_procedure_store(object_store.clone());
let procedure_store = ProcedureStore::from(object_store.clone());
let mut runner = new_runner(meta, Box::new(normal), procedure_store);
let res = runner.execute_once(&ctx).await;
@@ -519,7 +512,7 @@ mod tests {
procedure_id: meta.id,
};
let object_store = test_util::new_object_store(&dir);
let procedure_store = new_procedure_store(object_store.clone());
let procedure_store = ProcedureStore::from(object_store.clone());
let mut runner = new_runner(meta, Box::new(suspend), procedure_store);
let res = runner.execute_once(&ctx).await;
@@ -609,7 +602,7 @@ mod tests {
assert!(manager_ctx.try_insert_procedure(meta.clone()));
let object_store = test_util::new_object_store(&dir);
let procedure_store = new_procedure_store(object_store.clone());
let procedure_store = ProcedureStore::from(object_store.clone());
let mut runner = new_runner(meta, Box::new(parent), procedure_store);
// Replace the manager ctx.
runner.manager_ctx = manager_ctx;
@@ -649,7 +642,7 @@ mod tests {
procedure_id: meta.id,
};
let object_store = test_util::new_object_store(&dir);
let procedure_store = new_procedure_store(object_store.clone());
let procedure_store = ProcedureStore::from(object_store.clone());
let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store);
let res = runner.execute_once(&ctx).await;
@@ -721,7 +714,7 @@ mod tests {
assert!(manager_ctx.try_insert_procedure(meta.clone()));
let object_store = test_util::new_object_store(&dir);
let procedure_store = new_procedure_store(object_store.clone());
let procedure_store = ProcedureStore::from(object_store.clone());
let mut runner = new_runner(meta, Box::new(parent), procedure_store);
// Replace the manager ctx.
runner.manager_ctx = manager_ctx;

View File

@@ -14,9 +14,11 @@
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use common_telemetry::logging;
use futures::TryStreamExt;
use object_store::ObjectStore;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
@@ -27,7 +29,7 @@ use crate::{BoxedProcedure, ProcedureId};
mod state_store;
/// Serialized data of a procedure.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProcedureMessage {
/// Type name of the procedure. The procedure framework also uses the type name to
/// find a loader to load the procedure.
@@ -115,7 +117,7 @@ impl ProcedureStore {
}
/// Load uncommitted procedures from the storage.
async fn load_messages(&self) -> Result<HashMap<ProcedureId, ProcedureMessage>> {
pub(crate) async fn load_messages(&self) -> Result<HashMap<ProcedureId, ProcedureMessage>> {
let mut messages = HashMap::new();
// Track the key-value pair by procedure id.
let mut procedure_key_values: HashMap<_, (ParsedKey, Vec<u8>)> = HashMap::new();
@@ -163,6 +165,14 @@ impl ProcedureStore {
}
}
impl From<ObjectStore> for ProcedureStore {
    /// Builds a [ProcedureStore] whose state store is an [ObjectStateStore]
    /// wrapping `store`.
    fn from(store: ObjectStore) -> Self {
        Self::new(Arc::new(ObjectStateStore::new(store)))
    }
}
/// Suffix type of the key.
#[derive(Debug, PartialEq, Eq)]
enum KeyType {
@@ -235,11 +245,8 @@ impl ParsedKey {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use async_trait::async_trait;
use object_store::services::fs::Builder;
use object_store::ObjectStore;
use tempdir::TempDir;
use super::*;
@@ -249,9 +256,8 @@ mod tests {
let store_dir = dir.path().to_str().unwrap();
let accessor = Builder::default().root(store_dir).build().unwrap();
let object_store = ObjectStore::new(accessor);
let state_store = ObjectStateStore::new(object_store);
ProcedureStore::new(Arc::new(state_store))
ProcedureStore::from(object_store)
}
#[test]