fix: fix wait procedure watcher bug (#1987)

This commit is contained in:
Weny Xu
2023-07-18 18:07:31 +09:00
committed by GitHub
parent 1783e4c5cb
commit 37dad206f4
3 changed files with 94 additions and 2 deletions

18
Cargo.lock generated
View File

@@ -4300,6 +4300,17 @@ dependencies = [
"windows-sys 0.48.0",
]
[[package]]
name = "hostname"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c731c3e10504cc8ed35cfe2f1db4c9274c3d35fa486e3b31df46f068ef3e867"
dependencies = [
"libc",
"match_cfg",
"winapi",
]
[[package]]
name = "http"
version = "0.2.9"
@@ -5108,6 +5119,12 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d"
[[package]]
name = "match_cfg"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4"
[[package]]
name = "matchers"
version = "0.1.0"
@@ -8697,6 +8714,7 @@ dependencies = [
"digest",
"futures",
"hex",
"hostname",
"http-body",
"humantime-serde",
"hyper",

View File

@@ -545,7 +545,7 @@ impl TaskFunction<Error> for RemoveOutdatedMetaFunction {
/// Create a new [ProcedureMeta] for test purpose.
#[cfg(test)]
mod test_util {
pub(crate) mod test_util {
use common_test_util::temp_dir::TempDir;
use object_store::services::Fs as Builder;
use object_store::ObjectStore;

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_telemetry::debug;
use snafu::ResultExt;
use tokio::sync::watch::Receiver;
@@ -34,8 +35,81 @@ pub async fn wait(watcher: &mut Watcher) -> Result<()> {
return Err(error.clone()).context(ProcedureExecSnafu);
}
ProcedureState::Retrying { error } => {
return Err(error.clone()).context(ProcedureExecSnafu);
debug!("retrying, source: {}", error)
}
}
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use common_error::mock::MockError;
use common_error::status_code::StatusCode;
use common_test_util::temp_dir::create_temp_dir;
use super::*;
use crate::error::Error;
use crate::local::{test_util, LocalManager, ManagerConfig};
use crate::store::state_store::ObjectStateStore;
use crate::{
Context, LockKey, Procedure, ProcedureId, ProcedureManager, ProcedureWithId, Status,
};
#[tokio::test]
async fn test_success_after_retry() {
let dir = create_temp_dir("after_retry");
let config = ManagerConfig {
parent_path: "data/".to_string(),
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
..Default::default()
};
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let manager = LocalManager::new(config, state_store);
#[derive(Debug)]
struct MockProcedure {
error: bool,
}
#[async_trait]
impl Procedure for MockProcedure {
fn type_name(&self) -> &str {
"MockProcedure"
}
async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
if self.error {
self.error = !self.error;
Err(Error::retry_later(MockError::new(StatusCode::Internal)))
} else {
Ok(Status::Done)
}
}
fn dump(&self) -> Result<String> {
Ok(String::new())
}
fn lock_key(&self) -> LockKey {
LockKey::single("test.submit")
}
}
let procedure_id = ProcedureId::random();
let mut watcher = manager
.submit(ProcedureWithId {
id: procedure_id,
procedure: Box::new(MockProcedure { error: true }),
})
.await
.unwrap();
wait(&mut watcher).await.unwrap();
}
}