mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-01-05 00:32:55 +00:00
186 lines
5.5 KiB
Rust
186 lines
5.5 KiB
Rust
use std::io::BufRead;
|
|
use std::path::Path;
|
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
|
use std::sync::Arc;
|
|
use std::time::Duration;
|
|
use std::{fs, io, thread};
|
|
|
|
use crc32fast::Hasher;
|
|
|
|
use crate::directory::{WatchCallback, WatchCallbackList, WatchHandle};
|
|
|
|
const POLLING_INTERVAL: Duration = Duration::from_millis(if cfg!(test) { 1 } else { 500 });
|
|
|
|
// Watches a file and executes registered callbacks when the file is modified.
|
|
pub struct FileWatcher {
|
|
path: Arc<Path>,
|
|
callbacks: Arc<WatchCallbackList>,
|
|
state: Arc<AtomicUsize>, // 0: new, 1: runnable, 2: terminated
|
|
}
|
|
|
|
impl FileWatcher {
|
|
pub fn new(path: &Path) -> FileWatcher {
|
|
FileWatcher {
|
|
path: Arc::from(path),
|
|
callbacks: Default::default(),
|
|
state: Default::default(),
|
|
}
|
|
}
|
|
|
|
pub fn spawn(&self) {
|
|
if self
|
|
.state
|
|
.compare_exchange(0, 1, Ordering::SeqCst, Ordering::SeqCst)
|
|
.is_err()
|
|
{
|
|
return;
|
|
}
|
|
|
|
let path = self.path.clone();
|
|
let callbacks = self.callbacks.clone();
|
|
let state = self.state.clone();
|
|
|
|
thread::Builder::new()
|
|
.name("thread-tantivy-meta-file-watcher".to_string())
|
|
.spawn(move || {
|
|
let mut current_checksum_opt = None;
|
|
|
|
while state.load(Ordering::SeqCst) == 1 {
|
|
if let Ok(checksum) = FileWatcher::compute_checksum(&path) {
|
|
let metafile_has_changed = current_checksum_opt
|
|
.map(|current_checksum| current_checksum != checksum)
|
|
.unwrap_or(true);
|
|
if metafile_has_changed {
|
|
info!("Meta file {:?} was modified", path);
|
|
current_checksum_opt = Some(checksum);
|
|
// We actually ignore callbacks failing here.
|
|
// We just wait for the end of their execution.
|
|
let _ = callbacks.broadcast().wait();
|
|
}
|
|
}
|
|
|
|
thread::sleep(POLLING_INTERVAL);
|
|
}
|
|
})
|
|
.expect("Failed to spawn meta file watcher thread");
|
|
}
|
|
|
|
pub fn watch(&self, callback: WatchCallback) -> WatchHandle {
|
|
let handle = self.callbacks.subscribe(callback);
|
|
self.spawn();
|
|
handle
|
|
}
|
|
|
|
fn compute_checksum(path: &Path) -> Result<u32, io::Error> {
|
|
let reader = match fs::File::open(path) {
|
|
Ok(f) => io::BufReader::new(f),
|
|
Err(e) => {
|
|
warn!("Failed to open meta file {:?}: {:?}", path, e);
|
|
return Err(e);
|
|
}
|
|
};
|
|
|
|
let mut hasher = Hasher::new();
|
|
|
|
for line in reader.lines() {
|
|
hasher.update(line?.as_bytes())
|
|
}
|
|
|
|
Ok(hasher.finalize())
|
|
}
|
|
}
|
|
|
|
impl Drop for FileWatcher {
|
|
fn drop(&mut self) {
|
|
self.state.store(2, Ordering::SeqCst);
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
|
|
use std::mem;
|
|
|
|
use super::*;
|
|
use crate::directory::mmap_directory::atomic_write;
|
|
|
|
#[test]
|
|
fn test_file_watcher_drop_watcher() -> crate::Result<()> {
|
|
let tmp_dir = tempfile::TempDir::new()?;
|
|
let tmp_file = tmp_dir.path().join("watched.txt");
|
|
|
|
let counter: Arc<AtomicUsize> = Default::default();
|
|
let (tx, rx) = crossbeam_channel::unbounded();
|
|
let timeout = Duration::from_millis(100);
|
|
|
|
let watcher = FileWatcher::new(&tmp_file);
|
|
|
|
let state = watcher.state.clone();
|
|
assert_eq!(state.load(Ordering::SeqCst), 0);
|
|
|
|
let counter_clone = counter.clone();
|
|
|
|
let _handle = watcher.watch(WatchCallback::new(move || {
|
|
let val = counter_clone.fetch_add(1, Ordering::SeqCst);
|
|
tx.send(val + 1).unwrap();
|
|
}));
|
|
|
|
assert_eq!(counter.load(Ordering::SeqCst), 0);
|
|
assert_eq!(state.load(Ordering::SeqCst), 1);
|
|
|
|
atomic_write(&tmp_file, b"foo")?;
|
|
assert_eq!(rx.recv_timeout(timeout), Ok(1));
|
|
|
|
atomic_write(&tmp_file, b"foo")?;
|
|
assert!(rx.recv_timeout(timeout).is_err());
|
|
|
|
atomic_write(&tmp_file, b"bar")?;
|
|
assert_eq!(rx.recv_timeout(timeout), Ok(2));
|
|
|
|
mem::drop(watcher);
|
|
|
|
atomic_write(&tmp_file, b"qux")?;
|
|
thread::sleep(Duration::from_millis(10));
|
|
assert_eq!(counter.load(Ordering::SeqCst), 2);
|
|
assert_eq!(state.load(Ordering::SeqCst), 2);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[test]
|
|
fn test_file_watcher_drop_handle() -> crate::Result<()> {
|
|
let tmp_dir = tempfile::TempDir::new()?;
|
|
let tmp_file = tmp_dir.path().join("watched.txt");
|
|
|
|
let counter: Arc<AtomicUsize> = Default::default();
|
|
let (tx, rx) = crossbeam_channel::unbounded();
|
|
let timeout = Duration::from_millis(100);
|
|
|
|
let watcher = FileWatcher::new(&tmp_file);
|
|
|
|
let state = watcher.state.clone();
|
|
assert_eq!(state.load(Ordering::SeqCst), 0);
|
|
|
|
let counter_clone = counter.clone();
|
|
|
|
let handle = watcher.watch(WatchCallback::new(move || {
|
|
let val = counter_clone.fetch_add(1, Ordering::SeqCst);
|
|
tx.send(val + 1).unwrap();
|
|
}));
|
|
|
|
assert_eq!(counter.load(Ordering::SeqCst), 0);
|
|
assert_eq!(state.load(Ordering::SeqCst), 1);
|
|
|
|
atomic_write(&tmp_file, b"foo")?;
|
|
assert_eq!(rx.recv_timeout(timeout), Ok(1));
|
|
|
|
mem::drop(handle);
|
|
|
|
atomic_write(&tmp_file, b"qux")?;
|
|
assert_eq!(counter.load(Ordering::SeqCst), 1);
|
|
assert_eq!(state.load(Ordering::SeqCst), 1);
|
|
|
|
Ok(())
|
|
}
|
|
}
|