feat: clean procedure's state after it is done (#1543)

* feat(common-procedure): pub(crate) use proc_path

* feat(common-procedure): Implement delete_procedure

* feat(common-procedure): Clean procedure after it is finished

* chore(common-procedure): put path_string in front of try_stream

* test(common-procedure): Test cleaning up procedures

* feat(common-procedure): Clean procedure states in recover()

* feat(common-procedure): Use VecDeque for finished procedures
This commit is contained in:
Yingwen
2023-05-09 11:44:50 +08:00
committed by GitHub
parent 05e6ca1e14
commit 2ab0e42d6f
5 changed files with 289 additions and 48 deletions

View File

@@ -17,7 +17,7 @@ mod runner;
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;
use std::time::{Duration, Instant};
use async_trait::async_trait;
use backon::ExponentialBuilder;
@@ -36,6 +36,9 @@ use crate::{
ProcedureWithId, Watcher,
};
/// Time-to-live (10 minutes) of a finished procedure's metadata.
///
/// Passed as the `ttl` argument to `remove_outdated_meta`.
const META_TTL: Duration = Duration::from_secs(60 * 10);
/// Shared metadata of a procedure.
///
/// # Note
@@ -126,6 +129,8 @@ pub(crate) struct ManagerContext {
procedures: RwLock<HashMap<ProcedureId, ProcedureMetaRef>>,
/// Messages loaded from the procedure store.
messages: Mutex<HashMap<ProcedureId, ProcedureMessage>>,
/// Ids and finished time of finished procedures.
finished_procedures: Mutex<VecDeque<(ProcedureId, Instant)>>,
}
#[async_trait]
@@ -143,6 +148,7 @@ impl ManagerContext {
lock_map: LockMap::new(),
procedures: RwLock::new(HashMap::new()),
messages: Mutex::new(HashMap::new()),
finished_procedures: Mutex::new(VecDeque::new()),
}
}
@@ -285,6 +291,49 @@ impl ManagerContext {
messages.remove(procedure_id);
}
}
/// Releases the resources held for a batch of finished procedures.
///
/// Cached messages are dropped immediately. The metadata is kept for now,
/// since users may still query the state of these procedures; the ids are
/// only queued together with their finish time.
fn on_procedures_finish(&self, procedure_ids: &[ProcedureId]) {
    self.remove_messages(procedure_ids);

    // Every procedure in this batch shares the same finish time.
    let finished_at = Instant::now();
    let mut queue = self.finished_procedures.lock().unwrap();
    for procedure_id in procedure_ids {
        queue.push_back((*procedure_id, finished_at));
    }
}
/// Remove metadata of outdated procedures.
fn remove_outdated_meta(&self, ttl: Duration) {
let ids = {
let mut finished_procedures = self.finished_procedures.lock().unwrap();
if finished_procedures.is_empty() {
return;
}
let mut ids_to_remove = Vec::new();
while let Some((id, finish_time)) = finished_procedures.front() {
if finish_time.elapsed() > ttl {
ids_to_remove.push(*id);
finished_procedures.pop_front();
} else {
// The rest procedures are finished later, so we can break
// the loop.
break;
}
}
ids_to_remove
};
if ids.is_empty() {
return;
}
let mut procedures = self.procedures.write().unwrap();
for id in ids {
procedures.remove(&id);
}
}
}
/// Config for [LocalManager].
@@ -377,14 +426,18 @@ impl ProcedureManager for LocalManager {
DuplicateProcedureSnafu { procedure_id }
);
// TODO(yingwen): We can use a repeated task to remove outdated meta.
self.manager_ctx.remove_outdated_meta(META_TTL);
self.submit_root(procedure.id, 0, procedure.procedure)
}
async fn recover(&self) -> Result<()> {
logging::info!("LocalManager start to recover");
let recover_start = Instant::now();
let procedure_store = ProcedureStore::new(self.state_store.clone());
let messages = procedure_store.load_messages().await?;
let (messages, finished_ids) = procedure_store.load_messages().await?;
for (procedure_id, message) in &messages {
if message.parent_id.is_none() {
@@ -412,14 +465,36 @@ impl ProcedureManager for LocalManager {
}
}
if !finished_ids.is_empty() {
logging::info!(
"LocalManager try to clean finished procedures, num: {}",
finished_ids.len()
);
for procedure_id in finished_ids {
if let Err(e) = procedure_store.delete_procedure(procedure_id).await {
logging::error!(e; "Failed to delete procedure {}", procedure_id);
}
}
}
logging::info!(
"LocalManager finish recovery, cost: {}ms",
recover_start.elapsed().as_millis()
);
Ok(())
}
/// Looks up the state of a procedure after purging outdated metadata.
async fn procedure_state(&self, procedure_id: ProcedureId) -> Result<Option<ProcedureState>> {
    let ctx = &self.manager_ctx;
    // Outdated metadata is cleaned lazily on access.
    ctx.remove_outdated_meta(META_TTL);
    let state = ctx.state(procedure_id);
    Ok(state)
}
/// Looks up the watcher of a procedure after purging outdated metadata.
fn procedure_watcher(&self, procedure_id: ProcedureId) -> Option<Watcher> {
    let ctx = &self.manager_ctx;
    // Outdated metadata is cleaned lazily on access.
    ctx.remove_outdated_meta(META_TTL);
    ctx.watcher(procedure_id)
}
}

View File

@@ -150,7 +150,19 @@ impl Runner {
// If this is the root procedure, clean up message cache.
if self.meta.parent_id.is_none() {
let procedure_ids = self.manager_ctx.procedures_in_tree(&self.meta);
self.manager_ctx.remove_messages(&procedure_ids);
// Clean resources.
self.manager_ctx.on_procedures_finish(&procedure_ids);
for id in procedure_ids {
if let Err(e) = self.store.delete_procedure(id).await {
logging::error!(
e;
"Runner {}-{} failed to delete procedure {}",
self.procedure.type_name(),
self.meta.id,
id,
);
}
}
}
logging::info!(
@@ -451,7 +463,7 @@ mod tests {
use super::*;
use crate::local::test_util;
use crate::store::PROC_PATH;
use crate::store::proc_path;
use crate::{ContextProvider, Error, LockKey, Procedure};
const ROOT_ID: &str = "9f805a1f-05f7-490c-9f91-bd56e3cc54c1";
@@ -473,7 +485,7 @@ mod tests {
}
async fn check_files(object_store: &ObjectStore, procedure_id: ProcedureId, files: &[&str]) {
let dir = format!("{PROC_PATH}/{procedure_id}/");
let dir = proc_path!("{procedure_id}/");
let lister = object_store.list(&dir).await.unwrap();
let mut files_in_dir: Vec<_> = lister
.map_ok(|de| de.name().to_string())
@@ -720,25 +732,28 @@ mod tests {
// Manually add this procedure to the manager ctx.
assert!(manager_ctx.try_insert_procedure(meta));
// Replace the manager ctx.
runner.manager_ctx = manager_ctx;
runner.manager_ctx = manager_ctx.clone();
runner.run().await;
// Check files on store.
// Check child procedures.
for child_id in children_ids {
check_files(
&object_store,
child_id,
&["0000000000.step", "0000000001.commit"],
)
.await;
let state = manager_ctx.state(child_id).unwrap();
assert!(state.is_done(), "{state:?}");
}
let state = manager_ctx.state(procedure_id).unwrap();
assert!(state.is_done(), "{state:?}");
// Files are removed.
check_files(&object_store, procedure_id, &[]).await;
tokio::time::sleep(Duration::from_millis(5)).await;
// Clean outdated meta.
manager_ctx.remove_outdated_meta(Duration::from_millis(1));
assert!(manager_ctx.state(procedure_id).is_none());
assert!(manager_ctx.finished_procedures.lock().unwrap().is_empty());
for child_id in children_ids {
assert!(manager_ctx.state(child_id).is_none());
}
check_files(
&object_store,
procedure_id,
&["0000000000.step", "0000000001.commit"],
)
.await;
}
#[tokio::test]

View File

@@ -31,6 +31,14 @@ pub mod state_store;
/// Key prefix of procedure store.
pub(crate) const PROC_PATH: &str = "procedure/";
/// Builds a procedure store path: `PROC_PATH` followed by the formatted arguments.
///
/// Takes the same arguments as `format!` (a format string plus optional arguments).
macro_rules! proc_path {
    ($($arg:tt)+) => {
        format!("{}{}", $crate::store::PROC_PATH, format_args!($($arg)+))
    };
}
pub(crate) use proc_path;
/// Serialized data of a procedure.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProcedureMessage {
@@ -119,9 +127,50 @@ impl ProcedureStore {
Ok(())
}
/// Load uncommitted procedures from the storage.
pub(crate) async fn load_messages(&self) -> Result<HashMap<ProcedureId, ProcedureMessage>> {
let mut messages = HashMap::new();
/// Delete states of procedure from the storage.
///
/// Walks all keys under the procedure's directory, splits them into step keys
/// and finish (`.commit`/`.rollback`) keys, then deletes the step keys, the
/// finish keys and finally the directory path itself, in that order. Keys that
/// cannot be parsed are logged and left untouched.
///
/// # Errors
///
/// Returns the first error raised while listing or deleting keys; keys deleted
/// before the failure stay deleted.
pub(crate) async fn delete_procedure(&self, procedure_id: ProcedureId) -> Result<()> {
let path = proc_path!("{procedure_id}/");
// TODO(yingwen): We can optimize this to avoid reading the value.
let mut key_values = self.0.walk_top_down(&path).await?;
// 8 should be enough for most procedures.
let mut step_keys = Vec::with_capacity(8);
let mut finish_keys = Vec::new();
while let Some((key, _)) = key_values.try_next().await? {
let Some(curr_key) = ParsedKey::parse_str(&key) else {
logging::warn!("Unknown key while deleting procedures, key: {}", key);
continue;
};
if curr_key.key_type == KeyType::Step {
step_keys.push(key);
} else {
// .commit or .rollback
finish_keys.push(key);
}
}
logging::debug!(
"Delete keys for procedure {}, step_keys: {:?}, finish_keys: {:?}",
procedure_id,
step_keys,
finish_keys
);
// We delete all step keys first.
self.0.batch_delete(step_keys.as_slice()).await?;
// Then we delete the finish keys.
// NOTE(review): this order presumably ensures that, if the deletion is
// interrupted after the step keys are gone, the remaining finish key still
// marks the procedure as finished so it is cleaned up again rather than
// recovered -- confirm against `load_messages`.
self.0.batch_delete(finish_keys.as_slice()).await?;
// Finally we remove the directory itself.
self.0.delete(&path).await?;
// Maybe we could use procedure_id.commit/rollback as the file name so we could
// use remove_all to remove the directory and then remove the commit/rollback file.
Ok(())
}
/// Load procedures from the storage. Returns a map of uncommitted procedures and a list
/// of finished procedures' ids.
pub(crate) async fn load_messages(
&self,
) -> Result<(HashMap<ProcedureId, ProcedureMessage>, Vec<ProcedureId>)> {
// Track the key-value pair by procedure id.
let mut procedure_key_values: HashMap<_, (ParsedKey, Vec<u8>)> = HashMap::new();
@@ -143,6 +192,8 @@ impl ProcedureStore {
}
}
let mut messages = HashMap::with_capacity(procedure_key_values.len());
let mut finished_ids = Vec::new();
for (procedure_id, (parsed_key, value)) in procedure_key_values {
if parsed_key.key_type == KeyType::Step {
let Some(message) = self.load_one_message(&parsed_key, &value) else {
@@ -151,10 +202,12 @@ impl ProcedureStore {
continue;
};
messages.insert(procedure_id, message);
} else {
finished_ids.push(procedure_id);
}
}
Ok(messages)
Ok((messages, finished_ids))
}
fn load_one_message(&self, key: &ParsedKey, value: &[u8]) -> Option<ProcedureMessage> {
@@ -266,11 +319,6 @@ mod tests {
ProcedureStore::from(object_store)
}
macro_rules! proc_path {
($fmt:expr) => { format!("{}{}", PROC_PATH, format_args!($fmt)) };
($fmt:expr, $($args:tt)*) => { format!("{}{}", PROC_PATH, format_args!($fmt, $($args)*)) };
}
#[test]
fn test_parsed_key() {
let procedure_id = ProcedureId::random();
@@ -403,8 +451,9 @@ mod tests {
.await
.unwrap();
let messages = store.load_messages().await.unwrap();
let (messages, finished) = store.load_messages().await.unwrap();
assert_eq!(1, messages.len());
assert!(finished.is_empty());
let msg = messages.get(&procedure_id).unwrap();
let expect = ProcedureMessage {
type_name: "MockProcedure".to_string(),
@@ -429,8 +478,9 @@ mod tests {
.unwrap();
store.commit_procedure(procedure_id, 1).await.unwrap();
let messages = store.load_messages().await.unwrap();
let (messages, finished) = store.load_messages().await.unwrap();
assert!(messages.is_empty());
assert_eq!(&[procedure_id], &finished[..]);
}
#[tokio::test]
@@ -447,8 +497,58 @@ mod tests {
.unwrap();
store.rollback_procedure(procedure_id, 1).await.unwrap();
let messages = store.load_messages().await.unwrap();
let (messages, finished) = store.load_messages().await.unwrap();
assert!(messages.is_empty());
assert_eq!(&[procedure_id], &finished[..]);
}
/// Deleting an uncommitted procedure leaves nothing to load from the store.
#[tokio::test]
async fn test_delete_procedure() {
    let dir = create_temp_dir("delete_procedure");
    let store = procedure_store_for_test(&dir);

    let procedure_id = ProcedureId::random();
    let procedure: BoxedProcedure = Box::new(MockProcedure::new("test store procedure"));

    // Persist two steps without committing.
    for step in 0..2 {
        store
            .store_procedure(procedure_id, step, &procedure, None)
            .await
            .unwrap();
    }

    store.delete_procedure(procedure_id).await.unwrap();

    let (messages, finished) = store.load_messages().await.unwrap();
    assert!(messages.is_empty());
    assert!(finished.is_empty());
}
/// Deleting a committed procedure removes both its step keys and its commit key.
#[tokio::test]
async fn test_delete_committed_procedure() {
    let dir = create_temp_dir("delete_committed");
    let store = procedure_store_for_test(&dir);

    let procedure_id = ProcedureId::random();
    let procedure: BoxedProcedure = Box::new(MockProcedure::new("test store procedure"));

    // Persist two steps, then commit as the third.
    for step in 0..2 {
        store
            .store_procedure(procedure_id, step, &procedure, None)
            .await
            .unwrap();
    }
    store.commit_procedure(procedure_id, 2).await.unwrap();

    store.delete_procedure(procedure_id).await.unwrap();

    let (messages, finished) = store.load_messages().await.unwrap();
    assert!(messages.is_empty());
    assert!(finished.is_empty());
}
#[tokio::test]
@@ -496,8 +596,9 @@ mod tests {
.await
.unwrap();
let messages = store.load_messages().await.unwrap();
let (messages, finished) = store.load_messages().await.unwrap();
assert_eq!(2, messages.len());
assert_eq!(1, finished.len());
let msg = messages.get(&id0).unwrap();
assert_eq!("id0-2", msg.data);

View File

@@ -45,7 +45,11 @@ pub trait StateStore: Send + Sync {
async fn walk_top_down(&self, path: &str) -> Result<KeyValueStream>;
/// Deletes key-value pairs by `keys`.
async fn delete(&self, keys: &[String]) -> Result<()>;
async fn batch_delete(&self, keys: &[String]) -> Result<()>;
/// Deletes one key-value pair by `key`. Return `Ok` if the key
/// does not exist.
async fn delete(&self, key: &str) -> Result<()>;
}
/// Reference counted pointer to [StateStore].
@@ -80,8 +84,6 @@ impl StateStore for ObjectStateStore {
}
async fn walk_top_down(&self, path: &str) -> Result<KeyValueStream> {
let path_string = path.to_string();
let mut lister = self
.store
.scan(path)
@@ -92,12 +94,11 @@ impl StateStore for ObjectStateStore {
StatusCode::StorageUnavailable,
))
})
.with_context(|_| ListStateSnafu {
path: path_string.clone(),
})?;
.context(ListStateSnafu { path })?;
let store = self.store.clone();
let path_string = path.to_string();
let stream = try_stream!({
while let Some(res) = lister.next().await {
let entry = res
@@ -138,13 +139,22 @@ impl StateStore for ObjectStateStore {
Ok(Box::pin(stream))
}
async fn delete(&self, keys: &[String]) -> Result<()> {
for key in keys {
self.store
.delete(key)
.await
.context(DeleteStateSnafu { key })?;
}
/// Deletes all `keys` with a single `remove` call on the underlying object store.
async fn batch_delete(&self, keys: &[String]) -> Result<()> {
    let paths = keys.to_vec();
    let removed = self.store.remove(paths).await;
    // The debug-formatted key list is only built when the removal fails.
    removed.with_context(|_| DeleteStateSnafu {
        key: format!("{keys:?}"),
    })?;
    Ok(())
}
/// Deletes the value stored under `key` in the underlying object store.
async fn delete(&self, key: &str) -> Result<()> {
    let outcome = self.store.delete(key).await;
    outcome.with_context(|_| DeleteStateSnafu { key })?;
    Ok(())
}
@@ -215,7 +225,7 @@ mod tests {
);
state_store
.delete(&["a/2".to_string(), "b/1".to_string()])
.batch_delete(&["a/2".to_string(), "b/1".to_string()])
.await
.unwrap();
let mut data: Vec<_> = state_store
@@ -228,4 +238,40 @@ mod tests {
data.sort_unstable_by(|a, b| a.0.cmp(&b.0));
assert_eq!(vec![("a/1".to_string(), b"v1".to_vec()),], data);
}
/// `delete` removes exactly the requested key and succeeds on a missing key.
#[tokio::test]
async fn test_object_state_store_delete() {
    // Use a prefix of its own rather than the one of the list test.
    let dir = create_temp_dir("state_store_delete");
    let store_dir = dir.path().to_str().unwrap();
    let mut builder = Builder::default();
    builder.root(store_dir);
    let object_store = ObjectStore::new(builder).unwrap().finish();
    let state_store = ObjectStateStore::new(object_store);

    state_store.put("a/1", b"v1".to_vec()).await.unwrap();
    state_store.put("a/2", b"v2".to_vec()).await.unwrap();
    state_store.put("b/1", b"v3".to_vec()).await.unwrap();

    state_store.delete("b/1").await.unwrap();

    // Walk the whole store instead of only `a/`; otherwise the assertion
    // would also hold if `b/1` had not been deleted at all.
    let mut data: Vec<_> = state_store
        .walk_top_down("/")
        .await
        .unwrap()
        .try_collect()
        .await
        .unwrap();
    data.sort_unstable_by(|a, b| a.0.cmp(&b.0));
    assert_eq!(
        vec![
            ("a/1".to_string(), b"v1".to_vec()),
            ("a/2".to_string(), b"v2".to_vec()),
        ],
        data
    );

    // Delete returns Ok even the key doesn't exist.
    state_store.delete("b/1").await.unwrap();
}
}

View File

@@ -109,7 +109,7 @@ impl StateStore for MetaStateStore {
Ok(Box::pin(stream))
}
async fn delete(&self, keys: &[String]) -> Result<()> {
async fn batch_delete(&self, keys: &[String]) -> Result<()> {
let _ = self
.kv_store
.batch_delete(BatchDeleteRequest {
@@ -126,6 +126,10 @@ impl StateStore for MetaStateStore {
})?;
Ok(())
}
/// Deletes a single key by delegating to `batch_delete` with a one-element list.
async fn delete(&self, key: &str) -> Result<()> {
    let keys = [key.to_string()];
    self.batch_delete(&keys).await
}
}
#[cfg(test)]
@@ -184,7 +188,7 @@ mod tests {
);
store
.delete(&["a/2".to_string(), "b/1".to_string()])
.batch_delete(&["a/2".to_string(), "b/1".to_string()])
.await
.unwrap();