diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs index 0c59b85b9c..419ab9d14f 100644 --- a/src/common/procedure/src/local.rs +++ b/src/common/procedure/src/local.rs @@ -17,7 +17,7 @@ mod runner; use std::collections::{HashMap, VecDeque}; use std::sync::{Arc, Mutex, RwLock}; -use std::time::Duration; +use std::time::{Duration, Instant}; use async_trait::async_trait; use backon::ExponentialBuilder; @@ -36,6 +36,9 @@ use crate::{ ProcedureWithId, Watcher, }; +/// The expired time of a procedure's metadata. +const META_TTL: Duration = Duration::from_secs(60 * 10); + /// Shared metadata of a procedure. /// /// # Note @@ -126,6 +129,8 @@ pub(crate) struct ManagerContext { procedures: RwLock>, /// Messages loaded from the procedure store. messages: Mutex>, + /// Ids and finished time of finished procedures. + finished_procedures: Mutex>, } #[async_trait] @@ -143,6 +148,7 @@ impl ManagerContext { lock_map: LockMap::new(), procedures: RwLock::new(HashMap::new()), messages: Mutex::new(HashMap::new()), + finished_procedures: Mutex::new(VecDeque::new()), } } @@ -285,6 +291,49 @@ impl ManagerContext { messages.remove(procedure_id); } } + + /// Clean resources of finished procedures. + fn on_procedures_finish(&self, procedure_ids: &[ProcedureId]) { + self.remove_messages(procedure_ids); + + // Since users need to query the procedure state, so we can't remove the + // meta of the procedure directly. + let now = Instant::now(); + let mut finished_procedures = self.finished_procedures.lock().unwrap(); + finished_procedures.extend(procedure_ids.iter().map(|id| (*id, now))); + } + + /// Remove metadata of outdated procedures. + fn remove_outdated_meta(&self, ttl: Duration) { + let ids = { + let mut finished_procedures = self.finished_procedures.lock().unwrap(); + if finished_procedures.is_empty() { + return; + } + + let mut ids_to_remove = Vec::new(); + while let Some((id, finish_time)) = finished_procedures.front() { + if finish_time.elapsed() > ttl { + ids_to_remove.push(*id); + finished_procedures.pop_front(); + } else { + // The rest procedures are finished later, so we can break + // the loop. + break; + } + } + ids_to_remove + }; + + if ids.is_empty() { + return; + } + + let mut procedures = self.procedures.write().unwrap(); + for id in ids { + procedures.remove(&id); + } + } } /// Config for [LocalManager]. @@ -377,14 +426,18 @@ impl ProcedureManager for LocalManager { DuplicateProcedureSnafu { procedure_id } ); + // TODO(yingwen): We can use a repeated task to remove outdated meta. + self.manager_ctx.remove_outdated_meta(META_TTL); + self.submit_root(procedure.id, 0, procedure.procedure) } async fn recover(&self) -> Result<()> { logging::info!("LocalManager start to recover"); + let recover_start = Instant::now(); let procedure_store = ProcedureStore::new(self.state_store.clone()); - let messages = procedure_store.load_messages().await?; + let (messages, finished_ids) = procedure_store.load_messages().await?; for (procedure_id, message) in &messages { if message.parent_id.is_none() { @@ -412,14 +465,36 @@ impl ProcedureManager for LocalManager { } } + if !finished_ids.is_empty() { + logging::info!( + "LocalManager try to clean finished procedures, num: {}", + finished_ids.len() + ); + + for procedure_id in finished_ids { + if let Err(e) = procedure_store.delete_procedure(procedure_id).await { + logging::error!(e; "Failed to delete procedure {}", procedure_id); + } + } + } + + logging::info!( + "LocalManager finish recovery, cost: {}ms", + recover_start.elapsed().as_millis() + ); + Ok(()) } async fn procedure_state(&self, procedure_id: ProcedureId) -> Result> { + self.manager_ctx.remove_outdated_meta(META_TTL); + Ok(self.manager_ctx.state(procedure_id)) } fn procedure_watcher(&self, procedure_id: ProcedureId) -> Option { + self.manager_ctx.remove_outdated_meta(META_TTL); + self.manager_ctx.watcher(procedure_id) } } diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs index 43f0412264..21ed2a11fc 100644 --- a/src/common/procedure/src/local/runner.rs +++ b/src/common/procedure/src/local/runner.rs @@ -150,7 +150,19 @@ impl Runner { // If this is the root procedure, clean up message cache. if self.meta.parent_id.is_none() { let procedure_ids = self.manager_ctx.procedures_in_tree(&self.meta); - self.manager_ctx.remove_messages(&procedure_ids); + // Clean resources. + self.manager_ctx.on_procedures_finish(&procedure_ids); + for id in procedure_ids { + if let Err(e) = self.store.delete_procedure(id).await { + logging::error!( + e; + "Runner {}-{} failed to delete procedure {}", + self.procedure.type_name(), + self.meta.id, + id, + ); + } + } } logging::info!( @@ -451,7 +463,7 @@ mod tests { use super::*; use crate::local::test_util; - use crate::store::PROC_PATH; + use crate::store::proc_path; use crate::{ContextProvider, Error, LockKey, Procedure}; const ROOT_ID: &str = "9f805a1f-05f7-490c-9f91-bd56e3cc54c1"; @@ -473,7 +485,7 @@ mod tests { } async fn check_files(object_store: &ObjectStore, procedure_id: ProcedureId, files: &[&str]) { - let dir = format!("{PROC_PATH}/{procedure_id}/"); + let dir = proc_path!("{procedure_id}/"); let lister = object_store.list(&dir).await.unwrap(); let mut files_in_dir: Vec<_> = lister .map_ok(|de| de.name().to_string()) @@ -720,25 +732,28 @@ mod tests { // Manually add this procedure to the manager ctx. assert!(manager_ctx.try_insert_procedure(meta)); // Replace the manager ctx. - runner.manager_ctx = manager_ctx; + runner.manager_ctx = manager_ctx.clone(); runner.run().await; - // Check files on store. + // Check child procedures. for child_id in children_ids { - check_files( - &object_store, - child_id, - &["0000000000.step", "0000000001.commit"], - ) - .await; + let state = manager_ctx.state(child_id).unwrap(); + assert!(state.is_done(), "{state:?}"); + } + let state = manager_ctx.state(procedure_id).unwrap(); + assert!(state.is_done(), "{state:?}"); + // Files are removed. + check_files(&object_store, procedure_id, &[]).await; + + tokio::time::sleep(Duration::from_millis(5)).await; + // Clean outdated meta. + manager_ctx.remove_outdated_meta(Duration::from_millis(1)); + assert!(manager_ctx.state(procedure_id).is_none()); + assert!(manager_ctx.finished_procedures.lock().unwrap().is_empty()); + for child_id in children_ids { + assert!(manager_ctx.state(child_id).is_none()); } - check_files( - &object_store, - procedure_id, - &["0000000000.step", "0000000001.commit"], - ) - .await; } #[tokio::test] diff --git a/src/common/procedure/src/store.rs b/src/common/procedure/src/store.rs index 7c332a9e99..4c8ad4bb8a 100644 --- a/src/common/procedure/src/store.rs +++ b/src/common/procedure/src/store.rs @@ -31,6 +31,14 @@ pub mod state_store; /// Key prefix of procedure store. pub(crate) const PROC_PATH: &str = "procedure/"; +/// Constructs a path for procedure store. +macro_rules! proc_path { + ($fmt:expr) => { format!("{}{}", $crate::store::PROC_PATH, format_args!($fmt)) }; + ($fmt:expr, $($args:tt)*) => { format!("{}{}", $crate::store::PROC_PATH, format_args!($fmt, $($args)*)) }; +} + +pub(crate) use proc_path; + /// Serialized data of a procedure. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct ProcedureMessage { @@ -119,9 +127,50 @@ impl ProcedureStore { Ok(()) } - /// Load uncommitted procedures from the storage. - pub(crate) async fn load_messages(&self) -> Result> { - let mut messages = HashMap::new(); + /// Delete states of procedure from the storage. + pub(crate) async fn delete_procedure(&self, procedure_id: ProcedureId) -> Result<()> { + let path = proc_path!("{procedure_id}/"); + // TODO(yingwen): We can optimize this to avoid reading the value. + let mut key_values = self.0.walk_top_down(&path).await?; + // 8 should be enough for most procedures. + let mut step_keys = Vec::with_capacity(8); + let mut finish_keys = Vec::new(); + while let Some((key, _)) = key_values.try_next().await? { + let Some(curr_key) = ParsedKey::parse_str(&key) else { + logging::warn!("Unknown key while deleting procedures, key: {}", key); + continue; + }; + if curr_key.key_type == KeyType::Step { + step_keys.push(key); + } else { + // .commit or .rollback + finish_keys.push(key); + } + } + + logging::debug!( + "Delete keys for procedure {}, step_keys: {:?}, finish_keys: {:?}", + procedure_id, + step_keys, + finish_keys + ); + // We delete all step keys first. + self.0.batch_delete(step_keys.as_slice()).await?; + // Then we delete the finish keys, to ensure + self.0.batch_delete(finish_keys.as_slice()).await?; + // Finally we remove the directory itself. + self.0.delete(&path).await?; + // Maybe we could use procedure_id.commit/rollback as the file name so we could + // use remove_all to remove the directory and then remove the commit/rollback file. + + Ok(()) + } + + /// Load procedures from the storage. Returns a map of uncommitted procedures and a list + /// of finished procedures' ids. + pub(crate) async fn load_messages( + &self, + ) -> Result<(HashMap, Vec)> { // Track the key-value pair by procedure id. let mut procedure_key_values: HashMap<_, (ParsedKey, Vec)> = HashMap::new(); @@ -143,6 +192,8 @@ impl ProcedureStore { } } + let mut messages = HashMap::with_capacity(procedure_key_values.len()); + let mut finished_ids = Vec::new(); for (procedure_id, (parsed_key, value)) in procedure_key_values { if parsed_key.key_type == KeyType::Step { let Some(message) = self.load_one_message(&parsed_key, &value) else { @@ -151,10 +202,12 @@ impl ProcedureStore { continue; }; messages.insert(procedure_id, message); + } else { + finished_ids.push(procedure_id); } } - Ok(messages) + Ok((messages, finished_ids)) } fn load_one_message(&self, key: &ParsedKey, value: &[u8]) -> Option { @@ -266,11 +319,6 @@ mod tests { ProcedureStore::from(object_store) } - macro_rules! proc_path { - ($fmt:expr) => { format!("{}{}", PROC_PATH, format_args!($fmt)) }; - ($fmt:expr, $($args:tt)*) => { format!("{}{}", PROC_PATH, format_args!($fmt, $($args)*)) }; - } - #[test] fn test_parsed_key() { let procedure_id = ProcedureId::random(); @@ -403,8 +451,9 @@ mod tests { .await .unwrap(); - let messages = store.load_messages().await.unwrap(); + let (messages, finished) = store.load_messages().await.unwrap(); assert_eq!(1, messages.len()); + assert!(finished.is_empty()); let msg = messages.get(&procedure_id).unwrap(); let expect = ProcedureMessage { type_name: "MockProcedure".to_string(), @@ -429,8 +478,9 @@ mod tests { .unwrap(); store.commit_procedure(procedure_id, 1).await.unwrap(); - let messages = store.load_messages().await.unwrap(); + let (messages, finished) = store.load_messages().await.unwrap(); assert!(messages.is_empty()); + assert_eq!(&[procedure_id], &finished[..]); } #[tokio::test] @@ -447,8 +497,58 @@ mod tests { .unwrap(); store.rollback_procedure(procedure_id, 1).await.unwrap(); - let messages = store.load_messages().await.unwrap(); + let (messages, finished) = store.load_messages().await.unwrap(); assert!(messages.is_empty()); + assert_eq!(&[procedure_id], &finished[..]); + } + + #[tokio::test] + async fn test_delete_procedure() { + let dir = create_temp_dir("delete_procedure"); + let store = procedure_store_for_test(&dir); + + let procedure_id = ProcedureId::random(); + let procedure: BoxedProcedure = Box::new(MockProcedure::new("test store procedure")); + + store + .store_procedure(procedure_id, 0, &procedure, None) + .await + .unwrap(); + store + .store_procedure(procedure_id, 1, &procedure, None) + .await + .unwrap(); + + store.delete_procedure(procedure_id).await.unwrap(); + + let (messages, finished) = store.load_messages().await.unwrap(); + assert!(messages.is_empty()); + assert!(finished.is_empty()); + } + + #[tokio::test] + async fn test_delete_committed_procedure() { + let dir = create_temp_dir("delete_committed"); + let store = procedure_store_for_test(&dir); + + let procedure_id = ProcedureId::random(); + let procedure: BoxedProcedure = Box::new(MockProcedure::new("test store procedure")); + + store + .store_procedure(procedure_id, 0, &procedure, None) + .await + .unwrap(); + store + .store_procedure(procedure_id, 1, &procedure, None) + .await + .unwrap(); + store.commit_procedure(procedure_id, 2).await.unwrap(); + + store.delete_procedure(procedure_id).await.unwrap(); + + let (messages, finished) = store.load_messages().await.unwrap(); + assert!(messages.is_empty()); + assert!(finished.is_empty()); } #[tokio::test] @@ -496,8 +596,9 @@ mod tests { .await .unwrap(); - let messages = store.load_messages().await.unwrap(); + let (messages, finished) = store.load_messages().await.unwrap(); assert_eq!(2, messages.len()); + assert_eq!(1, finished.len()); let msg = messages.get(&id0).unwrap(); assert_eq!("id0-2", msg.data); diff --git a/src/common/procedure/src/store/state_store.rs b/src/common/procedure/src/store/state_store.rs index 88db83cf75..22686c8092 100644 --- a/src/common/procedure/src/store/state_store.rs +++ b/src/common/procedure/src/store/state_store.rs @@ -45,7 +45,11 @@ pub trait StateStore: Send + Sync { async fn walk_top_down(&self, path: &str) -> Result; /// Deletes key-value pairs by `keys`. - async fn delete(&self, keys: &[String]) -> Result<()>; + async fn batch_delete(&self, keys: &[String]) -> Result<()>; + + /// Deletes one key-value pair by `key`. Return `Ok` if the key + /// does not exist. + async fn delete(&self, key: &str) -> Result<()>; } /// Reference counted pointer to [StateStore]. @@ -80,8 +84,6 @@ impl StateStore for ObjectStateStore { } async fn walk_top_down(&self, path: &str) -> Result { - let path_string = path.to_string(); - let mut lister = self .store .scan(path) @@ -92,12 +94,11 @@ impl StateStore for ObjectStateStore { StatusCode::StorageUnavailable, )) }) - .with_context(|_| ListStateSnafu { - path: path_string.clone(), - })?; + .context(ListStateSnafu { path })?; let store = self.store.clone(); + let path_string = path.to_string(); let stream = try_stream!({ while let Some(res) = lister.next().await { let entry = res @@ -138,13 +139,22 @@ impl StateStore for ObjectStateStore { Ok(Box::pin(stream)) } - async fn delete(&self, keys: &[String]) -> Result<()> { - for key in keys { - self.store - .delete(key) - .await - .context(DeleteStateSnafu { key })?; - } + async fn batch_delete(&self, keys: &[String]) -> Result<()> { + self.store + .remove(keys.to_vec()) + .await + .with_context(|_| DeleteStateSnafu { + key: format!("{:?}", keys), + })?; + + Ok(()) + } + + async fn delete(&self, key: &str) -> Result<()> { + self.store + .delete(key) + .await + .with_context(|_| DeleteStateSnafu { key })?; Ok(()) } @@ -215,7 +225,7 @@ mod tests { ); state_store - .delete(&["a/2".to_string(), "b/1".to_string()]) + .batch_delete(&["a/2".to_string(), "b/1".to_string()]) .await .unwrap(); let mut data: Vec<_> = state_store @@ -228,4 +238,40 @@ mod tests { data.sort_unstable_by(|a, b| a.0.cmp(&b.0)); assert_eq!(vec![("a/1".to_string(), b"v1".to_vec()),], data); } + + #[tokio::test] + async fn test_object_state_store_delete() { + let dir = create_temp_dir("state_store_list"); + let store_dir = dir.path().to_str().unwrap(); + let mut builder = Builder::default(); + builder.root(store_dir); + + let object_store = ObjectStore::new(builder).unwrap().finish(); + let state_store = ObjectStateStore::new(object_store); + + state_store.put("a/1", b"v1".to_vec()).await.unwrap(); + state_store.put("a/2", b"v2".to_vec()).await.unwrap(); + state_store.put("b/1", b"v3".to_vec()).await.unwrap(); + + state_store.delete("b/1").await.unwrap(); + + let mut data: Vec<_> = state_store + .walk_top_down("a/") + .await + .unwrap() + .try_collect() + .await + .unwrap(); + data.sort_unstable_by(|a, b| a.0.cmp(&b.0)); + assert_eq!( + vec![ + ("a/1".to_string(), b"v1".to_vec()), + ("a/2".to_string(), b"v2".to_vec()), + ], + data + ); + + // Delete returns Ok even the key doesn't exist. + state_store.delete("b/1").await.unwrap(); + } } diff --git a/src/meta-srv/src/procedure/state_store.rs b/src/meta-srv/src/procedure/state_store.rs index 19bb226b01..5745f10c00 100644 --- a/src/meta-srv/src/procedure/state_store.rs +++ b/src/meta-srv/src/procedure/state_store.rs @@ -109,7 +109,7 @@ impl StateStore for MetaStateStore { Ok(Box::pin(stream)) } - async fn delete(&self, keys: &[String]) -> Result<()> { + async fn batch_delete(&self, keys: &[String]) -> Result<()> { let _ = self .kv_store .batch_delete(BatchDeleteRequest { @@ -126,6 +126,10 @@ impl StateStore for MetaStateStore { })?; Ok(()) } + + async fn delete(&self, key: &str) -> Result<()> { + self.batch_delete(&[key.to_string()]).await + } } #[cfg(test)] @@ -184,7 +188,7 @@ mod tests { ); store - .delete(&["a/2".to_string(), "b/1".to_string()]) + .batch_delete(&["a/2".to_string(), "b/1".to_string()]) .await .unwrap();