diff --git a/Cargo.lock b/Cargo.lock index f4c261f768..922dee62dd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -703,6 +703,7 @@ dependencies = [ "common-macro", "common-telemetry", "common-test-util", + "crossbeam-channel", "digest", "hex", "notify", @@ -2416,11 +2417,10 @@ dependencies = [ [[package]] name = "crossbeam-channel" -version = "0.5.10" +version = "0.5.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "82a9b73a36529d9c47029b9fb3a6f0ea3cc916a261195352ba19e770fc1748b2" +checksum = "ab3db02a9c5b5121e1e42fbdb1aeb65f5e02624cc58c43f2884c6ccac0b82f95" dependencies = [ - "cfg-if 1.0.0", "crossbeam-utils", ] diff --git a/src/auth/Cargo.toml b/src/auth/Cargo.toml index db8c200545..5fa4bcaa2a 100644 --- a/src/auth/Cargo.toml +++ b/src/auth/Cargo.toml @@ -28,3 +28,4 @@ tokio.workspace = true [dev-dependencies] common-test-util.workspace = true +crossbeam-channel = "0.5.12" diff --git a/src/auth/src/user_provider/watch_file_user_provider.rs b/src/auth/src/user_provider/watch_file_user_provider.rs index 19b6530f21..50a7cfa566 100644 --- a/src/auth/src/user_provider/watch_file_user_provider.rs +++ b/src/auth/src/user_provider/watch_file_user_provider.rs @@ -36,14 +36,21 @@ type WatchedCredentialRef = Arc>>>>; /// Empty file is invalid; but file not exist means every user can be authenticated. pub(crate) struct WatchFileUserProvider { users: WatchedCredentialRef, + #[cfg(test)] + notify: crossbeam_channel::Receiver<()>, } impl WatchFileUserProvider { pub fn new(filepath: &str) -> Result { + #[cfg(test)] + let (tx_notify, rx_notify) = crossbeam_channel::unbounded(); + let credential = load_credential_from_file(filepath)?; let users = Arc::new(Mutex::new(credential)); let this = WatchFileUserProvider { users: users.clone(), + #[cfg(test)] + notify: rx_notify, }; let (tx, rx) = channel::>(); @@ -66,13 +73,15 @@ impl WatchFileUserProvider { match load_credential_from_file(&filepath) { Ok(credential) => { *users.lock().expect("users credential must be valid") = credential; - info!("User provider file {filepath} reloaded") + info!("User provider file {filepath} reloaded"); + #[cfg(test)] + let _ = tx_notify.send(()); } Err(err) => { warn!( ?err, "Fail to load credential from file {filepath}; keep the old one", - ) + ); } } } @@ -117,7 +126,6 @@ pub mod test { use std::time::Duration; use common_test_util::temp_dir::create_temp_dir; - use tokio::time::sleep; use crate::user_provider::watch_file_user_provider::WatchFileUserProvider; use crate::user_provider::{Identity, Password}; @@ -140,12 +148,14 @@ pub mod test { ok, "username: {}, password: {}", username, - password + password, ); } #[tokio::test] async fn test_file_provider() { + common_telemetry::init_default_ut_logging(); + let dir = create_temp_dir("test_file_provider"); let file_path = format!("{}/test_file_provider", dir.path().to_str().unwrap()); { @@ -164,33 +174,45 @@ pub mod test { { // update the tmp file + let _ = provider.notify.try_iter().into_iter(); let file = File::create(&file_path).unwrap(); let mut lw = LineWriter::new(file); assert!(writeln!(lw, "root=654321").is_ok()); lw.flush().unwrap(); } - sleep(Duration::from_secs(2)).await; // wait the watcher to apply the change + provider + .notify + .recv_timeout(Duration::from_secs(1)) + .unwrap(); test_authenticate(&provider, "root", "123456", false).await; test_authenticate(&provider, "root", "654321", true).await; test_authenticate(&provider, "admin", "654321", false).await; { // remove the tmp file + let _ = provider.notify.try_iter().into_iter(); std::fs::remove_file(&file_path).unwrap(); } - sleep(Duration::from_secs(2)).await; // wait the watcher to apply the change + provider + .notify + .recv_timeout(Duration::from_secs(1)) + .unwrap(); test_authenticate(&provider, "root", "123456", true).await; test_authenticate(&provider, "root", "654321", true).await; test_authenticate(&provider, "admin", "654321", true).await; { // recreate the tmp file + let _ = provider.notify.try_iter().into_iter(); let file = File::create(&file_path).unwrap(); let mut lw = LineWriter::new(file); assert!(writeln!(lw, "root=123456").is_ok()); lw.flush().unwrap(); } - sleep(Duration::from_secs(2)).await; // wait the watcher to apply the change + provider + .notify + .recv_timeout(Duration::from_secs(1)) + .unwrap(); test_authenticate(&provider, "root", "123456", true).await; test_authenticate(&provider, "root", "654321", false).await; test_authenticate(&provider, "admin", "654321", false).await;