diff --git a/Cargo.lock b/Cargo.lock index f08097c95a..271eb444de 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -738,12 +738,12 @@ dependencies = [ "api", "async-trait", "common-base", + "common-config", "common-error", "common-macro", "common-telemetry", "common-test-util", "digest", - "notify", "sha1", "snafu 0.8.6", "sql", @@ -2055,6 +2055,7 @@ dependencies = [ "datanode", "humantime-serde", "meta-client", + "notify", "object-store", "serde", "serde_json", @@ -2253,6 +2254,7 @@ dependencies = [ "arrow-flight", "bytes", "common-base", + "common-config", "common-error", "common-macro", "common-recordbatch", @@ -2266,7 +2268,6 @@ dependencies = [ "hyper 1.6.0", "hyper-util", "lazy_static", - "notify", "prost 0.13.5", "rand 0.9.1", "serde", diff --git a/src/auth/Cargo.toml b/src/auth/Cargo.toml index 905bd72373..9c91023da5 100644 --- a/src/auth/Cargo.toml +++ b/src/auth/Cargo.toml @@ -15,11 +15,11 @@ workspace = true api.workspace = true async-trait.workspace = true common-base.workspace = true +common-config.workspace = true common-error.workspace = true common-macro.workspace = true common-telemetry.workspace = true digest = "0.10" -notify.workspace = true sha1 = "0.10" snafu.workspace = true sql.workspace = true diff --git a/src/auth/src/error.rs b/src/auth/src/error.rs index d28a85f828..a8dfe7f629 100644 --- a/src/auth/src/error.rs +++ b/src/auth/src/error.rs @@ -75,11 +75,12 @@ pub enum Error { username: String, }, - #[snafu(display("Failed to initialize a watcher for file {}", path))] + #[snafu(display("Failed to initialize a file watcher"))] FileWatch { - path: String, #[snafu(source)] - error: notify::Error, + source: common_config::error::Error, + #[snafu(implicit)] + location: Location, }, #[snafu(display("User is not authorized to perform this action"))] diff --git a/src/auth/src/user_provider/watch_file_user_provider.rs b/src/auth/src/user_provider/watch_file_user_provider.rs index 4df17502b7..451efd5cc4 100644 --- a/src/auth/src/user_provider/watch_file_user_provider.rs +++ b/src/auth/src/user_provider/watch_file_user_provider.rs @@ -12,16 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::path::Path; -use std::sync::mpsc::channel; use std::sync::{Arc, Mutex}; use async_trait::async_trait; +use common_config::file_watcher::{FileWatcherBuilder, FileWatcherConfig}; use common_telemetry::{info, warn}; -use notify::{EventKind, RecursiveMode, Watcher}; -use snafu::{ResultExt, ensure}; +use snafu::ResultExt; -use crate::error::{FileWatchSnafu, InvalidConfigSnafu, Result}; +use crate::error::{FileWatchSnafu, Result}; use crate::user_provider::{UserInfoMap, authenticate_with_credential, load_credential_from_file}; use crate::{Identity, Password, UserInfoRef, UserProvider}; @@ -41,61 +39,36 @@ impl WatchFileUserProvider { pub fn new(filepath: &str) -> Result { let credential = load_credential_from_file(filepath)?; let users = Arc::new(Mutex::new(credential)); - let this = WatchFileUserProvider { - users: users.clone(), - }; - let (tx, rx) = channel::>(); - let mut debouncer = - notify::recommended_watcher(tx).context(FileWatchSnafu { path: "" })?; - let mut dir = Path::new(filepath).to_path_buf(); - ensure!( - dir.pop(), - InvalidConfigSnafu { - value: filepath, - msg: "UserProvider path must be a file path", - } - ); - debouncer - .watch(&dir, RecursiveMode::NonRecursive) - .context(FileWatchSnafu { path: filepath })?; + let users_clone = users.clone(); + let filepath_owned = filepath.to_string(); - let filepath = filepath.to_string(); - std::thread::spawn(move || { - let filename = Path::new(&filepath).file_name(); - let _hold = debouncer; - while let Ok(res) = rx.recv() { - if let Ok(event) = res { - let is_this_file = event.paths.iter().any(|p| p.file_name() == filename); - let is_relevant_event = matches!( - event.kind, - EventKind::Modify(_) | EventKind::Create(_) | EventKind::Remove(_) + FileWatcherBuilder::new() + .watch_path(filepath) + .context(FileWatchSnafu)? + .config(FileWatcherConfig::new()) + .spawn(move || match load_credential_from_file(&filepath_owned) { + Ok(credential) => { + let mut users = users_clone.lock().expect("users credential must be valid"); + #[cfg(not(test))] + info!("User provider file {} reloaded", &filepath_owned); + #[cfg(test)] + info!( + "User provider file {} reloaded: {:?}", + &filepath_owned, credential ); - if is_this_file && is_relevant_event { - info!(?event.kind, "User provider file {} changed", &filepath); - match load_credential_from_file(&filepath) { - Ok(credential) => { - let mut users = - users.lock().expect("users credential must be valid"); - #[cfg(not(test))] - info!("User provider file {filepath} reloaded"); - #[cfg(test)] - info!("User provider file {filepath} reloaded: {credential:?}"); - *users = credential; - } - Err(err) => { - warn!( - ?err, - "Fail to load credential from file {filepath}; keep the old one", - ) - } - } - } + *users = credential; } - } - }); + Err(err) => { + warn!( + ?err, + "Fail to load credential from file {}; keep the old one", &filepath_owned + ) + } + }) + .context(FileWatchSnafu)?; - Ok(this) + Ok(WatchFileUserProvider { users }) } } diff --git a/src/common/config/Cargo.toml b/src/common/config/Cargo.toml index b45c03a6c3..2737f82a58 100644 --- a/src/common/config/Cargo.toml +++ b/src/common/config/Cargo.toml @@ -11,8 +11,10 @@ workspace = true common-base.workspace = true common-error.workspace = true common-macro.workspace = true +common-telemetry.workspace = true config.workspace = true humantime-serde.workspace = true +notify.workspace = true object-store.workspace = true serde.workspace = true serde_json.workspace = true diff --git a/src/common/config/src/error.rs b/src/common/config/src/error.rs index fbce83fd00..82abd8a9b8 100644 --- a/src/common/config/src/error.rs +++ b/src/common/config/src/error.rs @@ -49,14 +49,41 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to watch file: {}", path))] + FileWatch { + path: String, + #[snafu(source)] + error: notify::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to canonicalize path: {}", path))] + CanonicalizePath { + path: String, + #[snafu(source)] + error: std::io::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Invalid path '{}': expected a file, not a directory", path))] + InvalidPath { + path: String, + #[snafu(implicit)] + location: Location, + }, } impl ErrorExt for Error { fn status_code(&self) -> StatusCode { match self { - Error::TomlFormat { .. } | Error::LoadLayeredConfig { .. } => { - StatusCode::InvalidArguments - } + Error::TomlFormat { .. } + | Error::LoadLayeredConfig { .. } + | Error::FileWatch { .. } + | Error::InvalidPath { .. } + | Error::CanonicalizePath { .. } => StatusCode::InvalidArguments, Error::SerdeJson { .. } => StatusCode::Unexpected, } } diff --git a/src/common/config/src/file_watcher.rs b/src/common/config/src/file_watcher.rs new file mode 100644 index 0000000000..2507af024a --- /dev/null +++ b/src/common/config/src/file_watcher.rs @@ -0,0 +1,355 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Common file watching utilities for configuration hot-reloading. +//! +//! This module provides a generic file watcher that can be used to watch +//! files for changes and trigger callbacks when changes occur. +//! +//! The watcher monitors the parent directory of each file rather than the +//! file itself. This ensures that file deletions and recreations are properly +//! tracked, which is common with editors that use atomic saves or when +//! configuration files are replaced. + +use std::collections::HashSet; +use std::path::{Path, PathBuf}; +use std::sync::mpsc::channel; + +use common_telemetry::{error, info, warn}; +use notify::{EventKind, RecursiveMode, Watcher}; +use snafu::ResultExt; + +use crate::error::{CanonicalizePathSnafu, FileWatchSnafu, InvalidPathSnafu, Result}; + +/// Configuration for the file watcher behavior. +#[derive(Debug, Clone, Default)] +pub struct FileWatcherConfig { + /// Whether to include Remove events in addition to Modify and Create. + pub include_remove_events: bool, +} + +impl FileWatcherConfig { + pub fn new() -> Self { + Self::default() + } + + pub fn with_modify_and_create(mut self) -> Self { + self.include_remove_events = false; + self + } + + pub fn with_remove_events(mut self) -> Self { + self.include_remove_events = true; + self + } +} + +/// A builder for creating file watchers with flexible configuration. +/// +/// The watcher monitors the parent directory of each file to handle file +/// deletion and recreation properly. Events are filtered to only trigger +/// callbacks for the specific files being watched. +pub struct FileWatcherBuilder { + config: FileWatcherConfig, + /// Canonicalized paths of files to watch. + file_paths: Vec, +} + +impl FileWatcherBuilder { + /// Create a new builder with default configuration. + pub fn new() -> Self { + Self { + config: FileWatcherConfig::default(), + file_paths: Vec::new(), + } + } + + /// Set the watcher configuration. + pub fn config(mut self, config: FileWatcherConfig) -> Self { + self.config = config; + self + } + + /// Add a file path to watch. + /// + /// Returns an error if the path is a directory. + /// The path is canonicalized for reliable comparison with events. + pub fn watch_path>(mut self, path: P) -> Result { + let path = path.as_ref(); + snafu::ensure!( + path.is_file(), + InvalidPathSnafu { + path: path.display().to_string(), + } + ); + // Canonicalize the path for reliable comparison with event paths + let canonical = path.canonicalize().context(CanonicalizePathSnafu { + path: path.display().to_string(), + })?; + self.file_paths.push(canonical); + Ok(self) + } + + /// Add multiple file paths to watch. + /// + /// Returns an error if any path is a directory. + pub fn watch_paths, I: IntoIterator>( + mut self, + paths: I, + ) -> Result { + for path in paths { + self = self.watch_path(path)?; + } + Ok(self) + } + + /// Build and spawn the file watcher with the given callback. + /// + /// The callback is invoked when relevant file events are detected for + /// the watched files. The watcher monitors the parent directories to + /// handle file deletion and recreation properly. + /// + /// The spawned watcher thread runs for the lifetime of the process. + pub fn spawn(self, callback: F) -> Result<()> + where + F: Fn() + Send + 'static, + { + let (tx, rx) = channel::>(); + let mut watcher = + notify::recommended_watcher(tx).context(FileWatchSnafu { path: "" })?; + + // Collect unique parent directories to watch + let mut watched_dirs: HashSet = HashSet::new(); + for file_path in &self.file_paths { + if let Some(parent) = file_path.parent() + && watched_dirs.insert(parent.to_path_buf()) + { + watcher + .watch(parent, RecursiveMode::NonRecursive) + .context(FileWatchSnafu { + path: parent.display().to_string(), + })?; + } + } + + let config = self.config; + let watched_files: HashSet = self.file_paths.iter().cloned().collect(); + + info!( + "Spawning file watcher for paths: {:?} (watching parent directories)", + self.file_paths + .iter() + .map(|p| p.display().to_string()) + .collect::>() + ); + + std::thread::spawn(move || { + // Keep watcher alive in the thread + let _watcher = watcher; + + while let Ok(res) = rx.recv() { + match res { + Ok(event) => { + if !is_relevant_event(&event.kind, &config) { + continue; + } + + // Check if any of the event paths match our watched files + let is_watched_file = event.paths.iter().any(|event_path| { + // Try to canonicalize the event path for comparison + // If the file was deleted, canonicalize will fail, so we also + // compare the raw path + if let Ok(canonical) = event_path.canonicalize() + && watched_files.contains(&canonical) + { + return true; + } + // For deleted files, compare using the raw path + watched_files.contains(event_path) + }); + + if !is_watched_file { + continue; + } + + info!(?event.kind, ?event.paths, "Detected file change"); + callback(); + } + Err(err) => { + warn!("File watcher error: {}", err); + } + } + } + + error!("File watcher channel closed unexpectedly"); + }); + + Ok(()) + } +} + +impl Default for FileWatcherBuilder { + fn default() -> Self { + Self::new() + } +} + +/// Check if an event kind is relevant based on the configuration. +fn is_relevant_event(kind: &EventKind, config: &FileWatcherConfig) -> bool { + match kind { + EventKind::Modify(_) | EventKind::Create(_) => true, + EventKind::Remove(_) => config.include_remove_events, + _ => false, + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::time::Duration; + + use common_test_util::temp_dir::create_temp_dir; + + use super::*; + + #[test] + fn test_file_watcher_detects_changes() { + common_telemetry::init_default_ut_logging(); + + let dir = create_temp_dir("test_file_watcher"); + let file_path = dir.path().join("test_file.txt"); + + // Create initial file + std::fs::write(&file_path, "initial content").unwrap(); + + let counter = Arc::new(AtomicUsize::new(0)); + let counter_clone = counter.clone(); + + FileWatcherBuilder::new() + .watch_path(&file_path) + .unwrap() + .config(FileWatcherConfig::new()) + .spawn(move || { + counter_clone.fetch_add(1, Ordering::SeqCst); + }) + .unwrap(); + + // Give watcher time to start + std::thread::sleep(Duration::from_millis(100)); + + // Modify the file + std::fs::write(&file_path, "modified content").unwrap(); + + // Wait for the event to be processed + std::thread::sleep(Duration::from_millis(500)); + + assert!( + counter.load(Ordering::SeqCst) >= 1, + "Watcher should have detected at least one change" + ); + } + + #[test] + fn test_file_watcher_detects_delete_and_recreate() { + common_telemetry::init_default_ut_logging(); + + let dir = create_temp_dir("test_file_watcher_recreate"); + let file_path = dir.path().join("test_file.txt"); + + // Create initial file + std::fs::write(&file_path, "initial content").unwrap(); + + let counter = Arc::new(AtomicUsize::new(0)); + let counter_clone = counter.clone(); + + FileWatcherBuilder::new() + .watch_path(&file_path) + .unwrap() + .config(FileWatcherConfig::new()) + .spawn(move || { + counter_clone.fetch_add(1, Ordering::SeqCst); + }) + .unwrap(); + + // Give watcher time to start + std::thread::sleep(Duration::from_millis(100)); + + // Delete the file + std::fs::remove_file(&file_path).unwrap(); + std::thread::sleep(Duration::from_millis(100)); + + // Recreate the file - this should still be detected because we watch the directory + std::fs::write(&file_path, "recreated content").unwrap(); + + // Wait for the event to be processed + std::thread::sleep(Duration::from_millis(500)); + + assert!( + counter.load(Ordering::SeqCst) >= 1, + "Watcher should have detected file recreation" + ); + } + + #[test] + fn test_file_watcher_ignores_other_files() { + common_telemetry::init_default_ut_logging(); + + let dir = create_temp_dir("test_file_watcher_other"); + let watched_file = dir.path().join("watched.txt"); + let other_file = dir.path().join("other.txt"); + + // Create both files + std::fs::write(&watched_file, "watched content").unwrap(); + std::fs::write(&other_file, "other content").unwrap(); + + let counter = Arc::new(AtomicUsize::new(0)); + let counter_clone = counter.clone(); + + FileWatcherBuilder::new() + .watch_path(&watched_file) + .unwrap() + .config(FileWatcherConfig::new()) + .spawn(move || { + counter_clone.fetch_add(1, Ordering::SeqCst); + }) + .unwrap(); + + // Give watcher time to start + std::thread::sleep(Duration::from_millis(100)); + + // Modify the other file - should NOT trigger callback + std::fs::write(&other_file, "modified other content").unwrap(); + + // Wait for potential event + std::thread::sleep(Duration::from_millis(500)); + + assert_eq!( + counter.load(Ordering::SeqCst), + 0, + "Watcher should not have detected changes to other files" + ); + + // Now modify the watched file - SHOULD trigger callback + std::fs::write(&watched_file, "modified watched content").unwrap(); + + // Wait for the event to be processed + std::thread::sleep(Duration::from_millis(500)); + + assert!( + counter.load(Ordering::SeqCst) >= 1, + "Watcher should have detected change to watched file" + ); + } +} diff --git a/src/common/config/src/lib.rs b/src/common/config/src/lib.rs index cc25ebce16..eea3b1351d 100644 --- a/src/common/config/src/lib.rs +++ b/src/common/config/src/lib.rs @@ -14,6 +14,7 @@ pub mod config; pub mod error; +pub mod file_watcher; use std::time::Duration; diff --git a/src/common/grpc/Cargo.toml b/src/common/grpc/Cargo.toml index 9978791a7a..e57b9124fa 100644 --- a/src/common/grpc/Cargo.toml +++ b/src/common/grpc/Cargo.toml @@ -12,6 +12,7 @@ api.workspace = true arrow-flight.workspace = true bytes.workspace = true common-base.workspace = true +common-config.workspace = true common-error.workspace = true common-macro.workspace = true common-recordbatch.workspace = true @@ -23,7 +24,6 @@ datatypes.workspace = true flatbuffers = "25.2" hyper.workspace = true lazy_static.workspace = true -notify.workspace = true prost.workspace = true serde.workspace = true serde_json.workspace = true diff --git a/src/common/grpc/src/error.rs b/src/common/grpc/src/error.rs index 4f9b8e92dd..1d987514df 100644 --- a/src/common/grpc/src/error.rs +++ b/src/common/grpc/src/error.rs @@ -38,11 +38,10 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to watch config file path: {}", path))] + #[snafu(display("Failed to watch config file"))] FileWatch { - path: String, #[snafu(source)] - error: notify::Error, + source: common_config::error::Error, #[snafu(implicit)] location: Location, }, diff --git a/src/common/grpc/src/reloadable_tls.rs b/src/common/grpc/src/reloadable_tls.rs index c1bd3aca52..1f4f07590e 100644 --- a/src/common/grpc/src/reloadable_tls.rs +++ b/src/common/grpc/src/reloadable_tls.rs @@ -15,11 +15,10 @@ use std::path::Path; use std::result::Result as StdResult; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; +use common_config::file_watcher::{FileWatcherBuilder, FileWatcherConfig}; use common_telemetry::{error, info}; -use notify::{EventKind, RecursiveMode, Watcher}; use snafu::ResultExt; use crate::error::{FileWatchSnafu, Result}; @@ -119,45 +118,28 @@ where return Ok(()); } + let watch_paths: Vec<_> = tls_config + .get_tls_option() + .watch_paths() + .iter() + .map(|p| p.to_path_buf()) + .collect(); + let tls_config_for_watcher = tls_config.clone(); - let (tx, rx) = channel::>(); - let mut watcher = notify::recommended_watcher(tx).context(FileWatchSnafu { path: "" })?; - - // Watch all paths returned by the TlsConfigLoader - for path in tls_config.get_tls_option().watch_paths() { - watcher - .watch(path, RecursiveMode::NonRecursive) - .with_context(|_| FileWatchSnafu { - path: path.display().to_string(), - })?; - } - - info!("Spawning background task for watching TLS cert/key file changes"); - std::thread::spawn(move || { - let _watcher = watcher; - loop { - match rx.recv() { - Ok(Ok(event)) => { - if let EventKind::Modify(_) | EventKind::Create(_) = event.kind { - info!("Detected TLS cert/key file change: {:?}", event); - if let Err(err) = tls_config_for_watcher.reload() { - error!("Failed to reload TLS config: {}", err); - } else { - info!("Reloaded TLS cert/key file successfully."); - on_reload(); - } - } - } - Ok(Err(err)) => { - error!("Failed to watch TLS cert/key file: {}", err); - } - Err(err) => { - error!("TLS cert/key file watcher channel closed: {}", err); - } + FileWatcherBuilder::new() + .watch_paths(&watch_paths) + .context(FileWatchSnafu)? + .config(FileWatcherConfig::new()) + .spawn(move || { + if let Err(err) = tls_config_for_watcher.reload() { + error!("Failed to reload TLS config: {}", err); + } else { + info!("Reloaded TLS cert/key file successfully."); + on_reload(); } - } - }); + }) + .context(FileWatchSnafu)?; Ok(()) }