Implement FileWatcher

This commit is contained in:
Adrien Guillo
2020-11-05 15:00:14 -08:00
parent 6b5a5ac1d0
commit 2875deb4b1
5 changed files with 166 additions and 142 deletions

View File

@@ -30,7 +30,6 @@ serde_json = "1"
num_cpus = "1"
fs2={version="0.4", optional=true}
levenshtein_automata = "0.2"
notify = {version="4", optional=true}
uuid = { version = "0.8", features = ["v4", "serde"] }
crossbeam = "0.8"
futures = {version = "0.3", features=["thread-pool"] }
@@ -73,7 +72,7 @@ overflow-checks = true
[features]
default = ["mmap"]
mmap = ["fs2", "tempfile", "memmap", "notify"]
mmap = ["fs2", "tempfile", "memmap"]
brotli-compression = ["brotli"]
lz4-compression = ["lz4"]
failpoints = ["fail/failpoints"]

View File

@@ -0,0 +1,138 @@
use crate::directory::{WatchCallback, WatchCallbackList, WatchHandle};
use crc32fast::Hasher;
use std::fs;
use std::io;
use std::io::BufRead;
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
pub const POLLING_INTERVAL: Duration = Duration::from_millis(if cfg!(test) { 1 } else { 500 });
// Watches a file and executes registered callbacks when the file is modified.
pub struct FileWatcher {
path: Arc<PathBuf>,
callbacks: Arc<WatchCallbackList>,
state: Arc<AtomicUsize>, // 0: new, 1: runnable, 2: terminated
}
impl FileWatcher {
pub fn new(path: &PathBuf) -> FileWatcher {
FileWatcher {
path: Arc::new(path.clone()),
callbacks: Default::default(),
state: Default::default(),
}
}
pub fn spawn(&self) {
if self.state.compare_and_swap(0, 1, Ordering::SeqCst) > 0 {
return;
}
let path = self.path.clone();
let callbacks = self.callbacks.clone();
let state = self.state.clone();
thread::Builder::new()
.name("thread-tantivy-meta-file-watcher".to_string())
.spawn(move || {
let mut current_checksum = None;
while state.load(Ordering::SeqCst) == 1 {
if let Ok(checksum) = FileWatcher::compute_checksum(&path) {
// `None.unwrap_or_else(|| !checksum) != checksum` evaluates to `true`
if current_checksum.unwrap_or_else(|| !checksum) != checksum {
info!("Meta file {:?} was modified", path);
current_checksum = Some(checksum);
futures::executor::block_on(callbacks.broadcast());
}
}
thread::sleep(POLLING_INTERVAL);
}
})
.expect("Failed to spawn meta file watcher thread");
}
pub fn watch(&self, callback: WatchCallback) -> WatchHandle {
let handle = self.callbacks.subscribe(callback);
self.spawn();
handle
}
fn compute_checksum(path: &PathBuf) -> Result<u32, io::Error> {
let reader = match fs::File::open(path) {
Ok(f) => io::BufReader::new(f),
Err(e) => {
warn!("Failed to open meta file {:?}: {:?}", path, e);
return Err(e);
}
};
let mut hasher = Hasher::new();
for line in reader.lines() {
hasher.update(line?.as_bytes())
}
Ok(hasher.finalize())
}
}
impl Drop for FileWatcher {
fn drop(&mut self) {
self.state.store(2, Ordering::SeqCst);
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_file_watcher() {
let tmp_dir = tempfile::TempDir::new().unwrap();
let tmp_file = tmp_dir.path().join("watched.txt");
let counter: Arc<AtomicUsize> = Default::default();
let _handle;
let state;
let (tx, rx) = crossbeam::channel::unbounded();
let timeout = Duration::from_millis(100);
{
let watcher = FileWatcher::new(&tmp_file);
state = watcher.state.clone();
assert_eq!(state.load(Ordering::SeqCst), 0);
let counter_clone = counter.clone();
_handle = watcher.watch(Box::new(move || {
let val = counter_clone.fetch_add(1, Ordering::SeqCst);
tx.send(val + 1).unwrap();
}));
assert_eq!(counter.load(Ordering::SeqCst), 0);
assert_eq!(state.load(Ordering::SeqCst), 1);
fs::write(&tmp_file, b"foo").unwrap();
assert_eq!(rx.recv_timeout(timeout), Ok(1));
fs::write(&tmp_file, b"foo").unwrap();
assert!(rx.recv_timeout(timeout).is_err());
fs::write(&tmp_file, b"bar").unwrap();
assert_eq!(rx.recv_timeout(timeout), Ok(2));
}
fs::write(&tmp_file, b"qux").unwrap();
thread::sleep(Duration::from_millis(10));
assert_eq!(counter.load(Ordering::SeqCst), 2);
assert_eq!(state.load(Ordering::SeqCst), 2);
}
}

View File

@@ -1,6 +1,7 @@
use crate::core::META_FILEPATH;
use crate::directory::error::LockError;
use crate::directory::error::{DeleteError, OpenDirectoryError, OpenReadError, OpenWriteError};
use crate::directory::file_watcher::FileWatcher;
use crate::directory::AntiCallToken;
use crate::directory::BoxedData;
use crate::directory::Directory;
@@ -8,14 +9,10 @@ use crate::directory::DirectoryLock;
use crate::directory::FileSlice;
use crate::directory::Lock;
use crate::directory::WatchCallback;
use crate::directory::WatchCallbackList;
use crate::directory::WatchHandle;
use crate::directory::{TerminatingWrite, WritePtr};
use fs2::FileExt;
use memmap::Mmap;
use notify::RawEvent;
use notify::RecursiveMode;
use notify::Watcher;
use serde::{Deserialize, Serialize};
use stable_deref_trait::StableDeref;
use std::convert::From;
@@ -26,12 +23,9 @@ use std::io::{self, Seek, SeekFrom};
use std::io::{BufWriter, Read, Write};
use std::path::{Path, PathBuf};
use std::result;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::RwLock;
use std::sync::Weak;
use std::thread;
use std::{collections::HashMap, ops::Deref};
use tempfile::TempDir;
@@ -137,67 +131,6 @@ impl MmapCache {
}
}
struct WatcherWrapper {
_watcher: Mutex<notify::RecommendedWatcher>,
watcher_router: Arc<WatchCallbackList>,
}
impl WatcherWrapper {
pub fn new(path: &Path) -> Result<Self, OpenDirectoryError> {
let (tx, watcher_recv): (Sender<RawEvent>, Receiver<RawEvent>) = channel();
// We need to initialize the
let watcher = notify::raw_watcher(tx)
.and_then(|mut watcher| {
watcher.watch(path, RecursiveMode::Recursive)?;
Ok(watcher)
})
.map_err(|err| match err {
notify::Error::PathNotFound => OpenDirectoryError::DoesNotExist(path.to_owned()),
_ => {
panic!("Unknown error while starting watching directory {:?}", path);
}
})?;
let watcher_router: Arc<WatchCallbackList> = Default::default();
let watcher_router_clone = watcher_router.clone();
thread::Builder::new()
.name("meta-file-watch-thread".to_string())
.spawn(move || {
loop {
match watcher_recv.recv().map(|evt| evt.path) {
Ok(Some(changed_path)) => {
// ... Actually subject to false positive.
// We might want to be more accurate than this at one point.
if let Some(filename) = changed_path.file_name() {
if filename == *META_FILEPATH {
let _ = watcher_router_clone.broadcast();
}
}
}
Ok(None) => {
// not an event we are interested in.
}
Err(_e) => {
// the watch send channel was dropped
break;
}
}
}
})
.map_err(|io_error| OpenDirectoryError::IoError {
io_error,
directory_path: path.to_path_buf(),
})?;
Ok(WatcherWrapper {
_watcher: Mutex::new(watcher),
watcher_router,
})
}
pub fn watch(&mut self, watch_callback: WatchCallback) -> WatchHandle {
self.watcher_router.subscribe(watch_callback)
}
}
/// Directory storing data in files, read via mmap.
///
/// The Mmap object are cached to limit the
@@ -219,40 +152,21 @@ struct MmapDirectoryInner {
root_path: PathBuf,
mmap_cache: RwLock<MmapCache>,
_temp_directory: Option<TempDir>,
watcher: RwLock<Option<WatcherWrapper>>,
watcher: FileWatcher,
}
impl MmapDirectoryInner {
fn new(root_path: PathBuf, temp_directory: Option<TempDir>) -> MmapDirectoryInner {
MmapDirectoryInner {
root_path,
mmap_cache: Default::default(),
_temp_directory: temp_directory,
watcher: RwLock::new(None),
watcher: FileWatcher::new(&root_path.join(*META_FILEPATH)),
root_path: root_path,
}
}
fn watch(&self, watch_callback: WatchCallback) -> crate::Result<WatchHandle> {
// a lot of juggling here, to ensure we don't do anything that panics
// while the rwlock is held. That way we ensure that the rwlock cannot
// be poisoned.
//
// The downside is that we might create a watch wrapper that is not useful.
let need_initialization = self.watcher.read().unwrap().is_none();
if need_initialization {
let watch_wrapper = WatcherWrapper::new(&self.root_path)?;
let mut watch_wlock = self.watcher.write().unwrap();
// the watcher could have been initialized when we released the lock, and
// we do not want to lose the watched files that were set.
if watch_wlock.is_none() {
*watch_wlock = Some(watch_wrapper);
}
}
if let Some(watch_wrapper) = self.watcher.write().unwrap().as_mut() {
Ok(watch_wrapper.watch(watch_callback))
} else {
unreachable!("At this point, watch wrapper is supposed to be initialized");
}
fn watch(&self, callback: WatchCallback) -> crate::Result<WatchHandle> {
Ok(self.watcher.watch(callback))
}
}
@@ -557,8 +471,6 @@ mod tests {
use crate::Index;
use crate::ReloadPolicy;
use crate::{common::HasLen, indexer::LogMergePolicy};
use std::fs;
use std::sync::atomic::{AtomicUsize, Ordering};
#[test]
fn test_open_non_existent_path() {
@@ -647,27 +559,6 @@ mod tests {
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 0);
}
#[test]
fn test_watch_wrapper() {
let counter: Arc<AtomicUsize> = Default::default();
let counter_clone = counter.clone();
let tmp_dir = tempfile::TempDir::new().unwrap();
let tmp_dirpath = tmp_dir.path().to_owned();
let mut watch_wrapper = WatcherWrapper::new(&tmp_dirpath).unwrap();
let tmp_file = tmp_dirpath.join(*META_FILEPATH);
let _handle = watch_wrapper.watch(Box::new(move || {
counter_clone.fetch_add(1, Ordering::SeqCst);
}));
let (sender, receiver) = crossbeam::channel::unbounded();
let _handle2 = watch_wrapper.watch(Box::new(move || {
let _ = sender.send(());
}));
assert_eq!(counter.load(Ordering::SeqCst), 0);
fs::write(&tmp_file, b"whateverwilldo").unwrap();
assert!(receiver.recv().is_ok());
assert!(counter.load(Ordering::SeqCst) >= 1);
}
#[test]
fn test_mmap_released() {
let mmap_directory = MmapDirectory::create_from_tempdir().unwrap();

View File

@@ -10,6 +10,7 @@ mod mmap_directory;
mod directory;
mod directory_lock;
mod file_slice;
mod file_watcher;
mod footer;
mod managed_directory;
mod owned_bytes;

View File

@@ -190,38 +190,33 @@ fn test_directory_delete(directory: &dyn Directory) -> crate::Result<()> {
}
fn test_watch(directory: &dyn Directory) {
let num_progress: Arc<AtomicUsize> = Default::default();
let counter: Arc<AtomicUsize> = Default::default();
let counter_clone = counter.clone();
let (sender, receiver) = crossbeam::channel::unbounded();
let watch_callback = Box::new(move || {
counter_clone.fetch_add(1, SeqCst);
});
// This callback is used to synchronize watching in our unit test.
// We bind it to a variable because the callback is removed when that
// handle is dropped.
let watch_handle = directory.watch(watch_callback).unwrap();
let _progress_listener = directory
let (tx, rx) = crossbeam::channel::unbounded();
let timeout = Duration::from_millis(500);
let handle = directory
.watch(Box::new(move || {
let val = num_progress.fetch_add(1, SeqCst);
let _ = sender.send(val);
let val = counter.fetch_add(1, SeqCst);
tx.send(val + 1).unwrap();
}))
.unwrap();
for i in 0..10 {
assert!(i <= counter.load(SeqCst));
assert!(directory
.atomic_write(Path::new("meta.json"), b"random_test_data_2")
.is_ok());
assert_eq!(receiver.recv_timeout(Duration::from_millis(500)), Ok(i));
assert!(i + 1 <= counter.load(SeqCst)); // notify can trigger more than once.
}
mem::drop(watch_handle);
assert!(directory
.atomic_write(Path::new("meta.json"), b"random_test_data")
.atomic_write(Path::new("meta.json"), b"foo")
.is_ok());
assert!(receiver.recv_timeout(Duration::from_millis(500)).is_ok());
assert!(10 <= counter.load(SeqCst));
assert_eq!(rx.recv_timeout(timeout), Ok(1));
assert!(directory
.atomic_write(Path::new("meta.json"), b"bar")
.is_ok());
assert_eq!(rx.recv_timeout(timeout), Ok(2));
mem::drop(handle);
assert!(directory
.atomic_write(Path::new("meta.json"), b"qux")
.is_ok());
assert!(rx.recv_timeout(timeout).is_err());
}
fn test_lock_non_blocking(directory: &dyn Directory) {