diff --git a/src/core/index.rs b/src/core/index.rs index e4f2c5775..5dacfcbf5 100644 --- a/src/core/index.rs +++ b/src/core/index.rs @@ -400,7 +400,7 @@ impl fmt::Debug for Index { #[cfg(test)] mod tests { - use crate::directory::RAMDirectory; + use crate::directory::{RAMDirectory, WatchCallback}; use crate::schema::Field; use crate::schema::{Schema, INDEXED, TEXT}; use crate::IndexReader; @@ -525,7 +525,7 @@ mod tests { assert_eq!(reader.searcher().num_docs(), 0); writer.add_document(doc!(field=>1u64)); let (sender, receiver) = crossbeam::channel::unbounded(); - let _handle = index.directory_mut().watch(Box::new(move || { + let _handle = index.directory_mut().watch(WatchCallback::new(move || { let _ = sender.send(()); })); writer.commit().unwrap(); @@ -555,9 +555,11 @@ mod tests { fn test_index_on_commit_reload_policy_aux(field: Field, index: &Index, reader: &IndexReader) { let mut reader_index = reader.index(); let (sender, receiver) = crossbeam::channel::unbounded(); - let _watch_handle = reader_index.directory_mut().watch(Box::new(move || { - let _ = sender.send(()); - })); + let _watch_handle = reader_index + .directory_mut() + .watch(WatchCallback::new(move || { + let _ = sender.send(()); + })); let mut writer = index.writer_for_tests().unwrap(); assert_eq!(reader.searcher().num_docs(), 0); writer.add_document(doc!(field=>1u64)); @@ -596,7 +598,7 @@ mod tests { writer.add_document(doc!(field => i)); } let (sender, receiver) = crossbeam::channel::unbounded(); - let _handle = directory.watch(Box::new(move || { + let _handle = directory.watch(WatchCallback::new(move || { let _ = sender.send(()); })); writer.commit().unwrap(); diff --git a/src/directory/file_watcher.rs b/src/directory/file_watcher.rs index 27cbaddf9..086d5f83b 100644 --- a/src/directory/file_watcher.rs +++ b/src/directory/file_watcher.rs @@ -112,7 +112,7 @@ mod tests { let counter_clone = counter.clone(); - _handle = watcher.watch(Box::new(move || { + _handle = watcher.watch(WatchCallback::new(move || { let val = counter_clone.fetch_add(1, Ordering::SeqCst); tx.send(val + 1).unwrap(); })); diff --git a/src/directory/tests.rs b/src/directory/tests.rs index 097941d7a..734e309b3 100644 --- a/src/directory/tests.rs +++ b/src/directory/tests.rs @@ -195,7 +195,7 @@ fn test_watch(directory: &dyn Directory) { let timeout = Duration::from_millis(500); let handle = directory - .watch(Box::new(move || { + .watch(WatchCallback::new(move || { let val = counter.fetch_add(1, SeqCst); tx.send(val + 1).unwrap(); })) diff --git a/src/directory/watch_event_router.rs b/src/directory/watch_event_router.rs index da66121ca..d0523d66c 100644 --- a/src/directory/watch_event_router.rs +++ b/src/directory/watch_event_router.rs @@ -4,8 +4,20 @@ use std::sync::Arc; use std::sync::RwLock; use std::sync::Weak; -/// Type alias for callbacks registered when watching files of a `Directory`. -pub type WatchCallback = Box; +/// Cloneable wrapper for callbacks registered when watching files of a `Directory`. +#[derive(Clone)] +pub struct WatchCallback(Arc>); + +impl WatchCallback { + /// Wraps a `Fn()` to create a WatchCallback. + pub fn new(op: F) -> Self { + WatchCallback(Arc::new(Box::new(op))) + } + + fn call(&self) { + self.0() + } +} /// Helper struct to implement the watch method in `Directory` implementations. /// @@ -34,7 +46,7 @@ impl WatchHandle { /// /// This function is only useful when implementing a readonly directory. pub fn empty() -> WatchHandle { - WatchHandle::new(Arc::new(Box::new(|| {}))) + WatchHandle::new(Arc::new(WatchCallback::new(|| {}))) } } @@ -47,13 +59,13 @@ impl WatchCallbackList { WatchHandle::new(watch_callback_arc) } - fn list_callback(&self) -> Vec> { - let mut callbacks = vec![]; + fn list_callback(&self) -> Vec { + let mut callbacks: Vec = vec![]; let mut router_wlock = self.router.write().unwrap(); let mut i = 0; while i < router_wlock.len() { if let Some(watch) = router_wlock[i].upgrade() { - callbacks.push(watch); + callbacks.push(watch.as_ref().clone()); i += 1; } else { router_wlock.swap_remove(i); @@ -75,7 +87,7 @@ impl WatchCallbackList { .name("watch-callbacks".to_string()) .spawn(move || { for callback in callbacks { - callback(); + callback.call(); } let _ = sender.send(()); }); @@ -91,7 +103,7 @@ impl WatchCallbackList { #[cfg(test)] mod tests { - use crate::directory::WatchCallbackList; + use crate::directory::{WatchCallback, WatchCallbackList}; use futures::executor::block_on; use std::mem; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -102,7 +114,7 @@ mod tests { let watch_event_router = WatchCallbackList::default(); let counter: Arc = Default::default(); let counter_clone = counter.clone(); - let inc_callback = Box::new(move || { + let inc_callback = WatchCallback::new(move || { counter_clone.fetch_add(1, Ordering::SeqCst); }); block_on(watch_event_router.broadcast()); @@ -130,7 +142,7 @@ mod tests { let counter: Arc = Default::default(); let inc_callback = |inc: usize| { let counter_clone = counter.clone(); - Box::new(move || { + WatchCallback::new(move || { counter_clone.fetch_add(inc, Ordering::SeqCst); }) }; @@ -158,7 +170,7 @@ mod tests { let watch_event_router = WatchCallbackList::default(); let counter: Arc = Default::default(); let counter_clone = counter.clone(); - let inc_callback = Box::new(move || { + let inc_callback = WatchCallback::new(move || { counter_clone.fetch_add(1, Ordering::SeqCst); }); let handle_a = watch_event_router.subscribe(inc_callback); diff --git a/src/reader/mod.rs b/src/reader/mod.rs index 0a38c8cbb..679abc7c0 100644 --- a/src/reader/mod.rs +++ b/src/reader/mod.rs @@ -3,9 +3,9 @@ mod pool; pub use self::pool::LeasedItem; use self::pool::Pool; use crate::core::Segment; -use crate::directory::Directory; use crate::directory::WatchHandle; use crate::directory::META_LOCK; +use crate::directory::{Directory, WatchCallback}; use crate::Index; use crate::Searcher; use crate::SegmentReader; @@ -88,7 +88,7 @@ impl IndexReaderBuilder { let watch_handle = inner_reader_arc .index .directory() - .watch(Box::new(callback))?; + .watch(WatchCallback::new(callback))?; watch_handle_opt = Some(watch_handle); } }