Compare commits

...

1 Commits

Author SHA1 Message Date
Paul Masurel
79894657df Address #656
Broke the reference loop to make sure that the watch_router can
be dropped, and the thread exits.
2019-09-30 12:54:02 +09:00
3 changed files with 32 additions and 38 deletions

View File

@@ -3,6 +3,12 @@ Tantivy 0.11.0
- Added f64 field. Internally reuse u64 code the same way i64 does (@fdb-hiroshima) - Added f64 field. Internally reuse u64 code the same way i64 does (@fdb-hiroshima)
Tantivy 0.10.2
=====================
- Closes #656. Solving memory leak.
Tantivy 0.10.1 Tantivy 0.10.1
===================== =====================

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "tantivy" name = "tantivy"
version = "0.10.1" version = "0.10.2"
authors = ["Paul Masurel <paul.masurel@gmail.com>"] authors = ["Paul Masurel <paul.masurel@gmail.com>"]
license = "MIT" license = "MIT"
categories = ["database-implementations", "data-structures"] categories = ["database-implementations", "data-structures"]
@@ -98,4 +98,4 @@ features = ["failpoints"]
[[test]] [[test]]
name = "failpoints" name = "failpoints"
path = "tests/failpoints/mod.rs" path = "tests/failpoints/mod.rs"
required-features = ["fail/failpoints"] required-features = ["fail/failpoints"]

View File

@@ -141,42 +141,28 @@ impl MmapCache {
} }
} }
struct InnerWatcherWrapper {
_watcher: Mutex<notify::RecommendedWatcher>,
watcher_router: WatchCallbackList,
}
impl InnerWatcherWrapper {
pub fn new(path: &Path) -> Result<(Self, Receiver<notify::RawEvent>), notify::Error> {
let (tx, watcher_recv): (Sender<RawEvent>, Receiver<RawEvent>) = channel();
// We need to initialize the
let mut watcher = notify::raw_watcher(tx)?;
watcher.watch(path, RecursiveMode::Recursive)?;
let inner = InnerWatcherWrapper {
_watcher: Mutex::new(watcher),
watcher_router: Default::default(),
};
Ok((inner, watcher_recv))
}
}
#[derive(Clone)]
struct WatcherWrapper { struct WatcherWrapper {
inner: Arc<InnerWatcherWrapper>, _watcher: Mutex<notify::RecommendedWatcher>,
watcher_router: Arc<WatchCallbackList>,
} }
impl WatcherWrapper { impl WatcherWrapper {
pub fn new(path: &Path) -> Result<Self, OpenDirectoryError> { pub fn new(path: &Path) -> Result<Self, OpenDirectoryError> {
let (inner, watcher_recv) = InnerWatcherWrapper::new(path).map_err(|err| match err { let (tx, watcher_recv): (Sender<RawEvent>, Receiver<RawEvent>) = channel();
notify::Error::PathNotFound => OpenDirectoryError::DoesNotExist(path.to_owned()), // We need to initialize the
_ => { let watcher = notify::raw_watcher(tx)
panic!("Unknown error while starting watching directory {:?}", path); .and_then(|mut watcher| {
} watcher.watch(path, RecursiveMode::Recursive)?;
})?; Ok(watcher)
let watcher_wrapper = WatcherWrapper { })
inner: Arc::new(inner), .map_err(|err| match err {
}; notify::Error::PathNotFound => OpenDirectoryError::DoesNotExist(path.to_owned()),
let watcher_wrapper_clone = watcher_wrapper.clone(); _ => {
panic!("Unknown error while starting watching directory {:?}", path);
}
})?;
let watcher_router: Arc<WatchCallbackList> = Default::default();
let watcher_router_clone = watcher_router.clone();
thread::Builder::new() thread::Builder::new()
.name("meta-file-watch-thread".to_string()) .name("meta-file-watch-thread".to_string())
.spawn(move || { .spawn(move || {
@@ -187,7 +173,7 @@ impl WatcherWrapper {
// We might want to be more accurate than this at one point. // We might want to be more accurate than this at one point.
if let Some(filename) = changed_path.file_name() { if let Some(filename) = changed_path.file_name() {
if filename == *META_FILEPATH { if filename == *META_FILEPATH {
watcher_wrapper_clone.inner.watcher_router.broadcast(); watcher_router_clone.broadcast();
} }
} }
} }
@@ -200,13 +186,15 @@ impl WatcherWrapper {
} }
} }
} }
}) })?;
.expect("Failed to spawn thread to watch meta.json"); Ok(WatcherWrapper {
Ok(watcher_wrapper) _watcher: Mutex::new(watcher),
watcher_router,
})
} }
pub fn watch(&mut self, watch_callback: WatchCallback) -> WatchHandle { pub fn watch(&mut self, watch_callback: WatchCallback) -> WatchHandle {
self.inner.watcher_router.subscribe(watch_callback) self.watcher_router.subscribe(watch_callback)
} }
} }