Merge pull request #932 from tantivy-search/fix-unit-test-file-watcher

Fixing unit test.
This commit is contained in:
Paul Masurel
2020-11-13 11:47:15 +09:00
committed by GitHub
2 changed files with 82 additions and 28 deletions

View File

@@ -91,48 +91,88 @@ impl Drop for FileWatcher {
#[cfg(test)]
mod tests {
use std::mem;
use crate::directory::mmap_directory::atomic_write;
use super::*;
#[test]
fn test_file_watcher() {
let tmp_dir = tempfile::TempDir::new().unwrap();
fn test_file_watcher_drop_watcher() -> crate::Result<()> {
let tmp_dir = tempfile::TempDir::new()?;
let tmp_file = tmp_dir.path().join("watched.txt");
let counter: Arc<AtomicUsize> = Default::default();
let _handle;
let state;
let (tx, rx) = crossbeam::channel::unbounded();
let timeout = Duration::from_millis(100);
{
let watcher = FileWatcher::new(&tmp_file);
let watcher = FileWatcher::new(&tmp_file);
state = watcher.state.clone();
assert_eq!(state.load(Ordering::SeqCst), 0);
let state = watcher.state.clone();
assert_eq!(state.load(Ordering::SeqCst), 0);
let counter_clone = counter.clone();
let counter_clone = counter.clone();
_handle = watcher.watch(WatchCallback::new(move || {
let val = counter_clone.fetch_add(1, Ordering::SeqCst);
tx.send(val + 1).unwrap();
}));
let _handle = watcher.watch(WatchCallback::new(move || {
let val = counter_clone.fetch_add(1, Ordering::SeqCst);
tx.send(val + 1).unwrap();
}));
assert_eq!(counter.load(Ordering::SeqCst), 0);
assert_eq!(state.load(Ordering::SeqCst), 1);
assert_eq!(counter.load(Ordering::SeqCst), 0);
assert_eq!(state.load(Ordering::SeqCst), 1);
fs::write(&tmp_file, b"foo").unwrap();
assert_eq!(rx.recv_timeout(timeout), Ok(1));
atomic_write(&tmp_file, b"foo")?;
assert_eq!(rx.recv_timeout(timeout), Ok(1));
fs::write(&tmp_file, b"foo").unwrap();
assert!(rx.recv_timeout(timeout).is_err());
atomic_write(&tmp_file, b"foo")?;
assert!(rx.recv_timeout(timeout).is_err());
fs::write(&tmp_file, b"bar").unwrap();
assert_eq!(rx.recv_timeout(timeout), Ok(2));
}
atomic_write(&tmp_file, b"bar")?;
assert_eq!(rx.recv_timeout(timeout), Ok(2));
fs::write(&tmp_file, b"qux").unwrap();
mem::drop(watcher);
atomic_write(&tmp_file, b"qux")?;
thread::sleep(Duration::from_millis(10));
assert_eq!(counter.load(Ordering::SeqCst), 2);
assert_eq!(state.load(Ordering::SeqCst), 2);
Ok(())
}
#[test]
fn test_file_watcher_drop_handle() -> crate::Result<()> {
let tmp_dir = tempfile::TempDir::new()?;
let tmp_file = tmp_dir.path().join("watched.txt");
let counter: Arc<AtomicUsize> = Default::default();
let (tx, rx) = crossbeam::channel::unbounded();
let timeout = Duration::from_millis(100);
let watcher = FileWatcher::new(&tmp_file);
let state = watcher.state.clone();
assert_eq!(state.load(Ordering::SeqCst), 0);
let counter_clone = counter.clone();
let handle = watcher.watch(WatchCallback::new(move || {
let val = counter_clone.fetch_add(1, Ordering::SeqCst);
tx.send(val + 1).unwrap();
}));
assert_eq!(counter.load(Ordering::SeqCst), 0);
assert_eq!(state.load(Ordering::SeqCst), 1);
atomic_write(&tmp_file, b"foo")?;
assert_eq!(rx.recv_timeout(timeout), Ok(1));
mem::drop(handle);
atomic_write(&tmp_file, b"qux")?;
assert_eq!(counter.load(Ordering::SeqCst), 1);
assert_eq!(state.load(Ordering::SeqCst), 1);
Ok(())
}
}

View File

@@ -327,6 +327,24 @@ impl Deref for MmapArc {
}
unsafe impl StableDeref for MmapArc {}
/// Writes a file in an atomic manner.
pub(crate) fn atomic_write(path: &Path, content: &[u8]) -> io::Result<()> {
// We create the temporary file in the same directory as the target file.
// Indeed the canonical temp directory and the target file might sit in different
// filesystem, in which case the atomic write may actually not work.
let parent_path = path.parent().ok_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidInput,
"Path {:?} does not have parent directory.",
)
})?;
let mut tempfile = tempfile::Builder::new().tempfile_in(&parent_path)?;
tempfile.write_all(content)?;
tempfile.flush()?;
tempfile.into_temp_path().persist(path)?;
Ok(())
}
impl Directory for MmapDirectory {
fn open_read(&self, path: &Path) -> result::Result<FileSlice, OpenReadError> {
debug!("Open Read {:?}", path);
@@ -427,12 +445,8 @@ impl Directory for MmapDirectory {
fn atomic_write(&self, path: &Path, content: &[u8]) -> io::Result<()> {
debug!("Atomic Write {:?}", path);
let mut tempfile = tempfile::Builder::new().tempfile_in(&self.inner.root_path)?;
tempfile.write_all(content)?;
tempfile.flush()?;
let full_path = self.resolve_path(path);
tempfile.into_temp_path().persist(full_path)?;
Ok(())
atomic_write(&full_path, content)
}
fn acquire_lock(&self, lock: &Lock) -> Result<DirectoryLock, LockError> {