diff --git a/Cargo.toml b/Cargo.toml index 85f579c40..6a3647cb1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,7 +30,6 @@ serde_json = "1" num_cpus = "1" fs2={version="0.4", optional=true} levenshtein_automata = "0.2" -notify = {version="4", optional=true} uuid = { version = "0.8", features = ["v4", "serde"] } crossbeam = "0.8" futures = {version = "0.3", features=["thread-pool"] } @@ -73,7 +72,7 @@ overflow-checks = true [features] default = ["mmap"] -mmap = ["fs2", "tempfile", "memmap", "notify"] +mmap = ["fs2", "tempfile", "memmap"] brotli-compression = ["brotli"] lz4-compression = ["lz4"] failpoints = ["fail/failpoints"] diff --git a/src/directory/file_watcher.rs b/src/directory/file_watcher.rs new file mode 100644 index 000000000..27cbaddf9 --- /dev/null +++ b/src/directory/file_watcher.rs @@ -0,0 +1,138 @@ +use crate::directory::{WatchCallback, WatchCallbackList, WatchHandle}; +use crc32fast::Hasher; +use std::fs; +use std::io; +use std::io::BufRead; +use std::path::PathBuf; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::thread; +use std::time::Duration; + +pub const POLLING_INTERVAL: Duration = Duration::from_millis(if cfg!(test) { 1 } else { 500 }); + +// Watches a file and executes registered callbacks when the file is modified. +pub struct FileWatcher { + path: Arc, + callbacks: Arc, + state: Arc, // 0: new, 1: runnable, 2: terminated +} + +impl FileWatcher { + pub fn new(path: &PathBuf) -> FileWatcher { + FileWatcher { + path: Arc::new(path.clone()), + callbacks: Default::default(), + state: Default::default(), + } + } + + pub fn spawn(&self) { + if self.state.compare_and_swap(0, 1, Ordering::SeqCst) > 0 { + return; + } + + let path = self.path.clone(); + let callbacks = self.callbacks.clone(); + let state = self.state.clone(); + + thread::Builder::new() + .name("thread-tantivy-meta-file-watcher".to_string()) + .spawn(move || { + let mut current_checksum = None; + + while state.load(Ordering::SeqCst) == 1 { + if let Ok(checksum) = FileWatcher::compute_checksum(&path) { + // `None.unwrap_or_else(|| !checksum) != checksum` evaluates to `true` + if current_checksum.unwrap_or_else(|| !checksum) != checksum { + info!("Meta file {:?} was modified", path); + current_checksum = Some(checksum); + futures::executor::block_on(callbacks.broadcast()); + } + } + + thread::sleep(POLLING_INTERVAL); + } + }) + .expect("Failed to spawn meta file watcher thread"); + } + + pub fn watch(&self, callback: WatchCallback) -> WatchHandle { + let handle = self.callbacks.subscribe(callback); + self.spawn(); + handle + } + + fn compute_checksum(path: &PathBuf) -> Result { + let reader = match fs::File::open(path) { + Ok(f) => io::BufReader::new(f), + Err(e) => { + warn!("Failed to open meta file {:?}: {:?}", path, e); + return Err(e); + } + }; + + let mut hasher = Hasher::new(); + + for line in reader.lines() { + hasher.update(line?.as_bytes()) + } + + Ok(hasher.finalize()) + } +} + +impl Drop for FileWatcher { + fn drop(&mut self) { + self.state.store(2, Ordering::SeqCst); + } +} + +#[cfg(test)] +mod tests { + + use super::*; + + #[test] + fn test_file_watcher() { + let tmp_dir = tempfile::TempDir::new().unwrap(); + let tmp_file = tmp_dir.path().join("watched.txt"); + + let counter: Arc = Default::default(); + let _handle; + let state; + let (tx, rx) = crossbeam::channel::unbounded(); + let timeout = Duration::from_millis(100); + + { + let watcher = FileWatcher::new(&tmp_file); + + state = watcher.state.clone(); + assert_eq!(state.load(Ordering::SeqCst), 0); + + let counter_clone = counter.clone(); + + _handle = watcher.watch(Box::new(move || { + let val = counter_clone.fetch_add(1, Ordering::SeqCst); + tx.send(val + 1).unwrap(); + })); + + assert_eq!(counter.load(Ordering::SeqCst), 0); + assert_eq!(state.load(Ordering::SeqCst), 1); + + fs::write(&tmp_file, b"foo").unwrap(); + assert_eq!(rx.recv_timeout(timeout), Ok(1)); + + fs::write(&tmp_file, b"foo").unwrap(); + assert!(rx.recv_timeout(timeout).is_err()); + + fs::write(&tmp_file, b"bar").unwrap(); + assert_eq!(rx.recv_timeout(timeout), Ok(2)); + } + + fs::write(&tmp_file, b"qux").unwrap(); + thread::sleep(Duration::from_millis(10)); + assert_eq!(counter.load(Ordering::SeqCst), 2); + assert_eq!(state.load(Ordering::SeqCst), 2); + } +} diff --git a/src/directory/mmap_directory.rs b/src/directory/mmap_directory.rs index 89bdffff8..6093f0c93 100644 --- a/src/directory/mmap_directory.rs +++ b/src/directory/mmap_directory.rs @@ -1,6 +1,7 @@ use crate::core::META_FILEPATH; use crate::directory::error::LockError; use crate::directory::error::{DeleteError, OpenDirectoryError, OpenReadError, OpenWriteError}; +use crate::directory::file_watcher::FileWatcher; use crate::directory::AntiCallToken; use crate::directory::BoxedData; use crate::directory::Directory; @@ -8,14 +9,10 @@ use crate::directory::DirectoryLock; use crate::directory::FileSlice; use crate::directory::Lock; use crate::directory::WatchCallback; -use crate::directory::WatchCallbackList; use crate::directory::WatchHandle; use crate::directory::{TerminatingWrite, WritePtr}; use fs2::FileExt; use memmap::Mmap; -use notify::RawEvent; -use notify::RecursiveMode; -use notify::Watcher; use serde::{Deserialize, Serialize}; use stable_deref_trait::StableDeref; use std::convert::From; @@ -26,12 +23,9 @@ use std::io::{self, Seek, SeekFrom}; use std::io::{BufWriter, Read, Write}; use std::path::{Path, PathBuf}; use std::result; -use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::Arc; -use std::sync::Mutex; use std::sync::RwLock; use std::sync::Weak; -use std::thread; use std::{collections::HashMap, ops::Deref}; use tempfile::TempDir; @@ -137,67 +131,6 @@ impl MmapCache { } } -struct WatcherWrapper { - _watcher: Mutex, - watcher_router: Arc, -} - -impl WatcherWrapper { - pub fn new(path: &Path) -> Result { - let (tx, watcher_recv): (Sender, Receiver) = channel(); - // We need to initialize the - let watcher = notify::raw_watcher(tx) - .and_then(|mut watcher| { - watcher.watch(path, RecursiveMode::Recursive)?; - Ok(watcher) - }) - .map_err(|err| match err { - notify::Error::PathNotFound => OpenDirectoryError::DoesNotExist(path.to_owned()), - _ => { - panic!("Unknown error while starting watching directory {:?}", path); - } - })?; - let watcher_router: Arc = Default::default(); - let watcher_router_clone = watcher_router.clone(); - thread::Builder::new() - .name("meta-file-watch-thread".to_string()) - .spawn(move || { - loop { - match watcher_recv.recv().map(|evt| evt.path) { - Ok(Some(changed_path)) => { - // ... Actually subject to false positive. - // We might want to be more accurate than this at one point. - if let Some(filename) = changed_path.file_name() { - if filename == *META_FILEPATH { - let _ = watcher_router_clone.broadcast(); - } - } - } - Ok(None) => { - // not an event we are interested in. - } - Err(_e) => { - // the watch send channel was dropped - break; - } - } - } - }) - .map_err(|io_error| OpenDirectoryError::IoError { - io_error, - directory_path: path.to_path_buf(), - })?; - Ok(WatcherWrapper { - _watcher: Mutex::new(watcher), - watcher_router, - }) - } - - pub fn watch(&mut self, watch_callback: WatchCallback) -> WatchHandle { - self.watcher_router.subscribe(watch_callback) - } -} - /// Directory storing data in files, read via mmap. /// /// The Mmap object are cached to limit the @@ -219,40 +152,21 @@ struct MmapDirectoryInner { root_path: PathBuf, mmap_cache: RwLock, _temp_directory: Option, - watcher: RwLock>, + watcher: FileWatcher, } impl MmapDirectoryInner { fn new(root_path: PathBuf, temp_directory: Option) -> MmapDirectoryInner { MmapDirectoryInner { - root_path, mmap_cache: Default::default(), _temp_directory: temp_directory, - watcher: RwLock::new(None), + watcher: FileWatcher::new(&root_path.join(*META_FILEPATH)), + root_path: root_path, } } - fn watch(&self, watch_callback: WatchCallback) -> crate::Result { - // a lot of juggling here, to ensure we don't do anything that panics - // while the rwlock is held. That way we ensure that the rwlock cannot - // be poisoned. - // - // The downside is that we might create a watch wrapper that is not useful. - let need_initialization = self.watcher.read().unwrap().is_none(); - if need_initialization { - let watch_wrapper = WatcherWrapper::new(&self.root_path)?; - let mut watch_wlock = self.watcher.write().unwrap(); - // the watcher could have been initialized when we released the lock, and - // we do not want to lose the watched files that were set. - if watch_wlock.is_none() { - *watch_wlock = Some(watch_wrapper); - } - } - if let Some(watch_wrapper) = self.watcher.write().unwrap().as_mut() { - Ok(watch_wrapper.watch(watch_callback)) - } else { - unreachable!("At this point, watch wrapper is supposed to be initialized"); - } + fn watch(&self, callback: WatchCallback) -> crate::Result { + Ok(self.watcher.watch(callback)) } } @@ -557,8 +471,6 @@ mod tests { use crate::Index; use crate::ReloadPolicy; use crate::{common::HasLen, indexer::LogMergePolicy}; - use std::fs; - use std::sync::atomic::{AtomicUsize, Ordering}; #[test] fn test_open_non_existent_path() { @@ -647,27 +559,6 @@ mod tests { assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 0); } - #[test] - fn test_watch_wrapper() { - let counter: Arc = Default::default(); - let counter_clone = counter.clone(); - let tmp_dir = tempfile::TempDir::new().unwrap(); - let tmp_dirpath = tmp_dir.path().to_owned(); - let mut watch_wrapper = WatcherWrapper::new(&tmp_dirpath).unwrap(); - let tmp_file = tmp_dirpath.join(*META_FILEPATH); - let _handle = watch_wrapper.watch(Box::new(move || { - counter_clone.fetch_add(1, Ordering::SeqCst); - })); - let (sender, receiver) = crossbeam::channel::unbounded(); - let _handle2 = watch_wrapper.watch(Box::new(move || { - let _ = sender.send(()); - })); - assert_eq!(counter.load(Ordering::SeqCst), 0); - fs::write(&tmp_file, b"whateverwilldo").unwrap(); - assert!(receiver.recv().is_ok()); - assert!(counter.load(Ordering::SeqCst) >= 1); - } - #[test] fn test_mmap_released() { let mmap_directory = MmapDirectory::create_from_tempdir().unwrap(); diff --git a/src/directory/mod.rs b/src/directory/mod.rs index c52b62526..ae65833d0 100644 --- a/src/directory/mod.rs +++ b/src/directory/mod.rs @@ -10,6 +10,7 @@ mod mmap_directory; mod directory; mod directory_lock; mod file_slice; +mod file_watcher; mod footer; mod managed_directory; mod owned_bytes; diff --git a/src/directory/tests.rs b/src/directory/tests.rs index dceda43f2..b677e9590 100644 --- a/src/directory/tests.rs +++ b/src/directory/tests.rs @@ -190,38 +190,33 @@ fn test_directory_delete(directory: &dyn Directory) -> crate::Result<()> { } fn test_watch(directory: &dyn Directory) { - let num_progress: Arc = Default::default(); let counter: Arc = Default::default(); - let counter_clone = counter.clone(); - let (sender, receiver) = crossbeam::channel::unbounded(); - let watch_callback = Box::new(move || { - counter_clone.fetch_add(1, SeqCst); - }); - // This callback is used to synchronize watching in our unit test. - // We bind it to a variable because the callback is removed when that - // handle is dropped. - let watch_handle = directory.watch(watch_callback).unwrap(); - let _progress_listener = directory + let (tx, rx) = crossbeam::channel::unbounded(); + let timeout = Duration::from_millis(500); + + let handle = directory .watch(Box::new(move || { - let val = num_progress.fetch_add(1, SeqCst); - let _ = sender.send(val); + let val = counter.fetch_add(1, SeqCst); + tx.send(val + 1).unwrap(); })) .unwrap(); - for i in 0..10 { - assert!(i <= counter.load(SeqCst)); - assert!(directory - .atomic_write(Path::new("meta.json"), b"random_test_data_2") - .is_ok()); - assert_eq!(receiver.recv_timeout(Duration::from_millis(500)), Ok(i)); - assert!(i + 1 <= counter.load(SeqCst)); // notify can trigger more than once. - } - mem::drop(watch_handle); assert!(directory - .atomic_write(Path::new("meta.json"), b"random_test_data") + .atomic_write(Path::new("meta.json"), b"foo") .is_ok()); - assert!(receiver.recv_timeout(Duration::from_millis(500)).is_ok()); - assert!(10 <= counter.load(SeqCst)); + assert_eq!(rx.recv_timeout(timeout), Ok(1)); + + assert!(directory + .atomic_write(Path::new("meta.json"), b"bar") + .is_ok()); + assert_eq!(rx.recv_timeout(timeout), Ok(2)); + + mem::drop(handle); + + assert!(directory + .atomic_write(Path::new("meta.json"), b"qux") + .is_ok()); + assert!(rx.recv_timeout(timeout).is_err()); } fn test_lock_non_blocking(directory: &dyn Directory) {