From 960c2ee39df0b14c5afdce36e3ed7f69b794ddbb Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Thu, 12 Nov 2020 14:06:22 +0900 Subject: [PATCH] Fixing unit test. --- src/directory/file_watcher.rs | 86 ++++++++++++++++++++++++--------- src/directory/mmap_directory.rs | 24 +++++++-- 2 files changed, 82 insertions(+), 28 deletions(-) diff --git a/src/directory/file_watcher.rs b/src/directory/file_watcher.rs index 086d5f83b..8b5dcdcd8 100644 --- a/src/directory/file_watcher.rs +++ b/src/directory/file_watcher.rs @@ -91,48 +91,88 @@ impl Drop for FileWatcher { #[cfg(test)] mod tests { + use std::mem; + + use crate::directory::mmap_directory::atomic_write; + use super::*; #[test] - fn test_file_watcher() { - let tmp_dir = tempfile::TempDir::new().unwrap(); + fn test_file_watcher_drop_watcher() -> crate::Result<()> { + let tmp_dir = tempfile::TempDir::new()?; let tmp_file = tmp_dir.path().join("watched.txt"); let counter: Arc = Default::default(); - let _handle; - let state; let (tx, rx) = crossbeam::channel::unbounded(); let timeout = Duration::from_millis(100); - { - let watcher = FileWatcher::new(&tmp_file); + let watcher = FileWatcher::new(&tmp_file); - state = watcher.state.clone(); - assert_eq!(state.load(Ordering::SeqCst), 0); + let state = watcher.state.clone(); + assert_eq!(state.load(Ordering::SeqCst), 0); - let counter_clone = counter.clone(); + let counter_clone = counter.clone(); - _handle = watcher.watch(WatchCallback::new(move || { - let val = counter_clone.fetch_add(1, Ordering::SeqCst); - tx.send(val + 1).unwrap(); - })); + let _handle = watcher.watch(WatchCallback::new(move || { + let val = counter_clone.fetch_add(1, Ordering::SeqCst); + tx.send(val + 1).unwrap(); + })); - assert_eq!(counter.load(Ordering::SeqCst), 0); - assert_eq!(state.load(Ordering::SeqCst), 1); + assert_eq!(counter.load(Ordering::SeqCst), 0); + assert_eq!(state.load(Ordering::SeqCst), 1); - fs::write(&tmp_file, b"foo").unwrap(); - assert_eq!(rx.recv_timeout(timeout), Ok(1)); + atomic_write(&tmp_file, b"foo")?; + assert_eq!(rx.recv_timeout(timeout), Ok(1)); - fs::write(&tmp_file, b"foo").unwrap(); - assert!(rx.recv_timeout(timeout).is_err()); + atomic_write(&tmp_file, b"foo")?; + assert!(rx.recv_timeout(timeout).is_err()); - fs::write(&tmp_file, b"bar").unwrap(); - assert_eq!(rx.recv_timeout(timeout), Ok(2)); - } + atomic_write(&tmp_file, b"bar")?; + assert_eq!(rx.recv_timeout(timeout), Ok(2)); - fs::write(&tmp_file, b"qux").unwrap(); + mem::drop(watcher); + + atomic_write(&tmp_file, b"qux")?; thread::sleep(Duration::from_millis(10)); assert_eq!(counter.load(Ordering::SeqCst), 2); assert_eq!(state.load(Ordering::SeqCst), 2); + + Ok(()) + } + + #[test] + fn test_file_watcher_drop_handle() -> crate::Result<()> { + let tmp_dir = tempfile::TempDir::new()?; + let tmp_file = tmp_dir.path().join("watched.txt"); + + let counter: Arc = Default::default(); + let (tx, rx) = crossbeam::channel::unbounded(); + let timeout = Duration::from_millis(100); + + let watcher = FileWatcher::new(&tmp_file); + + let state = watcher.state.clone(); + assert_eq!(state.load(Ordering::SeqCst), 0); + + let counter_clone = counter.clone(); + + let handle = watcher.watch(WatchCallback::new(move || { + let val = counter_clone.fetch_add(1, Ordering::SeqCst); + tx.send(val + 1).unwrap(); + })); + + assert_eq!(counter.load(Ordering::SeqCst), 0); + assert_eq!(state.load(Ordering::SeqCst), 1); + + atomic_write(&tmp_file, b"foo")?; + assert_eq!(rx.recv_timeout(timeout), Ok(1)); + + mem::drop(handle); + + atomic_write(&tmp_file, b"qux")?; + assert_eq!(counter.load(Ordering::SeqCst), 1); + assert_eq!(state.load(Ordering::SeqCst), 1); + + Ok(()) } } diff --git a/src/directory/mmap_directory.rs b/src/directory/mmap_directory.rs index c5adf8460..fca92ded9 100644 --- a/src/directory/mmap_directory.rs +++ b/src/directory/mmap_directory.rs @@ -327,6 +327,24 @@ impl Deref for MmapArc { } unsafe impl StableDeref for MmapArc {} +/// Writes a file in an atomic manner. +pub(crate) fn atomic_write(path: &Path, content: &[u8]) -> io::Result<()> { + // We create the temporary file in the same directory as the target file. + // Indeed the canonical temp directory and the target file might sit in different + // filesystem, in which case the atomic write may actually not work. + let parent_path = path.parent().ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "Path {:?} does not have parent directory.", + ) + })?; + let mut tempfile = tempfile::Builder::new().tempfile_in(&parent_path)?; + tempfile.write_all(content)?; + tempfile.flush()?; + tempfile.into_temp_path().persist(path)?; + Ok(()) +} + impl Directory for MmapDirectory { fn open_read(&self, path: &Path) -> result::Result { debug!("Open Read {:?}", path); @@ -427,12 +445,8 @@ impl Directory for MmapDirectory { fn atomic_write(&self, path: &Path, content: &[u8]) -> io::Result<()> { debug!("Atomic Write {:?}", path); - let mut tempfile = tempfile::Builder::new().tempfile_in(&self.inner.root_path)?; - tempfile.write_all(content)?; - tempfile.flush()?; let full_path = self.resolve_path(path); - tempfile.into_temp_path().persist(full_path)?; - Ok(()) + atomic_write(&full_path, content) } fn acquire_lock(&self, lock: &Lock) -> Result {