From 37dad206f4ef1d942076b544a5368458cf86dc5e Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Tue, 18 Jul 2023 18:07:31 +0900 Subject: [PATCH] fix: fix wait procedure watcher bug (#1987) --- Cargo.lock | 18 +++++++ src/common/procedure/src/local.rs | 2 +- src/common/procedure/src/watcher.rs | 76 ++++++++++++++++++++++++++++- 3 files changed, 94 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 90a30a51eb..110f5f2ce5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4300,6 +4300,17 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "hostname" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c731c3e10504cc8ed35cfe2f1db4c9274c3d35fa486e3b31df46f068ef3e867" +dependencies = [ + "libc", + "match_cfg", + "winapi", +] + [[package]] name = "http" version = "0.2.9" @@ -5108,6 +5119,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d" +[[package]] +name = "match_cfg" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4" + [[package]] name = "matchers" version = "0.1.0" @@ -8697,6 +8714,7 @@ dependencies = [ "digest", "futures", "hex", + "hostname", "http-body", "humantime-serde", "hyper", diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs index 8d33c49b02..d32efdb494 100644 --- a/src/common/procedure/src/local.rs +++ b/src/common/procedure/src/local.rs @@ -545,7 +545,7 @@ impl TaskFunction for RemoveOutdatedMetaFunction { /// Create a new [ProcedureMeta] for test purpose. #[cfg(test)] -mod test_util { +pub(crate) mod test_util { use common_test_util::temp_dir::TempDir; use object_store::services::Fs as Builder; use object_store::ObjectStore; diff --git a/src/common/procedure/src/watcher.rs b/src/common/procedure/src/watcher.rs index 13d32f73b3..576c3d420b 100644 --- a/src/common/procedure/src/watcher.rs +++ b/src/common/procedure/src/watcher.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use common_telemetry::debug; use snafu::ResultExt; use tokio::sync::watch::Receiver; @@ -34,8 +35,81 @@ pub async fn wait(watcher: &mut Watcher) -> Result<()> { return Err(error.clone()).context(ProcedureExecSnafu); } ProcedureState::Retrying { error } => { - return Err(error.clone()).context(ProcedureExecSnafu); + debug!("retrying, source: {}", error) } } } } + +#[cfg(test)] +mod tests { + + use std::sync::Arc; + use std::time::Duration; + + use async_trait::async_trait; + use common_error::mock::MockError; + use common_error::status_code::StatusCode; + use common_test_util::temp_dir::create_temp_dir; + + use super::*; + use crate::error::Error; + use crate::local::{test_util, LocalManager, ManagerConfig}; + use crate::store::state_store::ObjectStateStore; + use crate::{ + Context, LockKey, Procedure, ProcedureId, ProcedureManager, ProcedureWithId, Status, + }; + + #[tokio::test] + async fn test_success_after_retry() { + let dir = create_temp_dir("after_retry"); + let config = ManagerConfig { + parent_path: "data/".to_string(), + max_retry_times: 3, + retry_delay: Duration::from_millis(500), + ..Default::default() + }; + let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); + let manager = LocalManager::new(config, state_store); + + #[derive(Debug)] + struct MockProcedure { + error: bool, + } + + #[async_trait] + impl Procedure for MockProcedure { + fn type_name(&self) -> &str { + "MockProcedure" + } + + async fn execute(&mut self, _ctx: &Context) -> Result { + if self.error { + self.error = !self.error; + Err(Error::retry_later(MockError::new(StatusCode::Internal))) + } else { + Ok(Status::Done) + } + } + + fn dump(&self) -> Result { + Ok(String::new()) + } + + fn lock_key(&self) -> LockKey { + LockKey::single("test.submit") + } + } + + let procedure_id = ProcedureId::random(); + let mut watcher = manager + .submit(ProcedureWithId { + id: procedure_id, + procedure: Box::new(MockProcedure { error: true }), + }) + .await + .unwrap(); + + wait(&mut watcher).await.unwrap(); + } +}