Closes #930 Minor bug.

Watch callback could be callback if the last watch handle was dropped
shortly before meta.json is called.
This commit is contained in:
Paul Masurel
2020-11-11 15:51:23 +09:00
parent a49e59053c
commit eef348004e
5 changed files with 35 additions and 21 deletions

View File

@@ -400,7 +400,7 @@ impl fmt::Debug for Index {
#[cfg(test)]
mod tests {
use crate::directory::RAMDirectory;
use crate::directory::{RAMDirectory, WatchCallback};
use crate::schema::Field;
use crate::schema::{Schema, INDEXED, TEXT};
use crate::IndexReader;
@@ -525,7 +525,7 @@ mod tests {
assert_eq!(reader.searcher().num_docs(), 0);
writer.add_document(doc!(field=>1u64));
let (sender, receiver) = crossbeam::channel::unbounded();
let _handle = index.directory_mut().watch(Box::new(move || {
let _handle = index.directory_mut().watch(WatchCallback::new(move || {
let _ = sender.send(());
}));
writer.commit().unwrap();
@@ -555,9 +555,11 @@ mod tests {
fn test_index_on_commit_reload_policy_aux(field: Field, index: &Index, reader: &IndexReader) {
let mut reader_index = reader.index();
let (sender, receiver) = crossbeam::channel::unbounded();
let _watch_handle = reader_index.directory_mut().watch(Box::new(move || {
let _ = sender.send(());
}));
let _watch_handle = reader_index
.directory_mut()
.watch(WatchCallback::new(move || {
let _ = sender.send(());
}));
let mut writer = index.writer_for_tests().unwrap();
assert_eq!(reader.searcher().num_docs(), 0);
writer.add_document(doc!(field=>1u64));
@@ -596,7 +598,7 @@ mod tests {
writer.add_document(doc!(field => i));
}
let (sender, receiver) = crossbeam::channel::unbounded();
let _handle = directory.watch(Box::new(move || {
let _handle = directory.watch(WatchCallback::new(move || {
let _ = sender.send(());
}));
writer.commit().unwrap();

View File

@@ -112,7 +112,7 @@ mod tests {
let counter_clone = counter.clone();
_handle = watcher.watch(Box::new(move || {
_handle = watcher.watch(WatchCallback::new(move || {
let val = counter_clone.fetch_add(1, Ordering::SeqCst);
tx.send(val + 1).unwrap();
}));

View File

@@ -195,7 +195,7 @@ fn test_watch(directory: &dyn Directory) {
let timeout = Duration::from_millis(500);
let handle = directory
.watch(Box::new(move || {
.watch(WatchCallback::new(move || {
let val = counter.fetch_add(1, SeqCst);
tx.send(val + 1).unwrap();
}))

View File

@@ -4,8 +4,20 @@ use std::sync::Arc;
use std::sync::RwLock;
use std::sync::Weak;
/// Type alias for callbacks registered when watching files of a `Directory`.
pub type WatchCallback = Box<dyn Fn() + Sync + Send>;
/// Cloneable wrapper for callbacks registered when watching files of a `Directory`.
#[derive(Clone)]
pub struct WatchCallback(Arc<Box<dyn Fn() + Sync + Send>>);
impl WatchCallback {
/// Wraps a `Fn()` to create a WatchCallback.
pub fn new<F: Fn() + Sync + Send + 'static>(op: F) -> Self {
WatchCallback(Arc::new(Box::new(op)))
}
fn call(&self) {
self.0()
}
}
/// Helper struct to implement the watch method in `Directory` implementations.
///
@@ -34,7 +46,7 @@ impl WatchHandle {
///
/// This function is only useful when implementing a readonly directory.
pub fn empty() -> WatchHandle {
WatchHandle::new(Arc::new(Box::new(|| {})))
WatchHandle::new(Arc::new(WatchCallback::new(|| {})))
}
}
@@ -47,13 +59,13 @@ impl WatchCallbackList {
WatchHandle::new(watch_callback_arc)
}
fn list_callback(&self) -> Vec<Arc<WatchCallback>> {
let mut callbacks = vec![];
fn list_callback(&self) -> Vec<WatchCallback> {
let mut callbacks: Vec<WatchCallback> = vec![];
let mut router_wlock = self.router.write().unwrap();
let mut i = 0;
while i < router_wlock.len() {
if let Some(watch) = router_wlock[i].upgrade() {
callbacks.push(watch);
callbacks.push(watch.as_ref().clone());
i += 1;
} else {
router_wlock.swap_remove(i);
@@ -75,7 +87,7 @@ impl WatchCallbackList {
.name("watch-callbacks".to_string())
.spawn(move || {
for callback in callbacks {
callback();
callback.call();
}
let _ = sender.send(());
});
@@ -91,7 +103,7 @@ impl WatchCallbackList {
#[cfg(test)]
mod tests {
use crate::directory::WatchCallbackList;
use crate::directory::{WatchCallback, WatchCallbackList};
use futures::executor::block_on;
use std::mem;
use std::sync::atomic::{AtomicUsize, Ordering};
@@ -102,7 +114,7 @@ mod tests {
let watch_event_router = WatchCallbackList::default();
let counter: Arc<AtomicUsize> = Default::default();
let counter_clone = counter.clone();
let inc_callback = Box::new(move || {
let inc_callback = WatchCallback::new(move || {
counter_clone.fetch_add(1, Ordering::SeqCst);
});
block_on(watch_event_router.broadcast());
@@ -130,7 +142,7 @@ mod tests {
let counter: Arc<AtomicUsize> = Default::default();
let inc_callback = |inc: usize| {
let counter_clone = counter.clone();
Box::new(move || {
WatchCallback::new(move || {
counter_clone.fetch_add(inc, Ordering::SeqCst);
})
};
@@ -158,7 +170,7 @@ mod tests {
let watch_event_router = WatchCallbackList::default();
let counter: Arc<AtomicUsize> = Default::default();
let counter_clone = counter.clone();
let inc_callback = Box::new(move || {
let inc_callback = WatchCallback::new(move || {
counter_clone.fetch_add(1, Ordering::SeqCst);
});
let handle_a = watch_event_router.subscribe(inc_callback);

View File

@@ -3,9 +3,9 @@ mod pool;
pub use self::pool::LeasedItem;
use self::pool::Pool;
use crate::core::Segment;
use crate::directory::Directory;
use crate::directory::WatchHandle;
use crate::directory::META_LOCK;
use crate::directory::{Directory, WatchCallback};
use crate::Index;
use crate::Searcher;
use crate::SegmentReader;
@@ -88,7 +88,7 @@ impl IndexReaderBuilder {
let watch_handle = inner_reader_arc
.index
.directory()
.watch(Box::new(callback))?;
.watch(WatchCallback::new(callback))?;
watch_handle_opt = Some(watch_handle);
}
}