From 062e38a2aba0326759c112dbf2c029166dfdb3ec Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Mon, 20 Feb 2017 23:45:05 +0900 Subject: [PATCH] Fixes #72 - Cache directory uses weak ref. Introduced CacheInfo object. --- Cargo.toml | 4 +- src/directory/mmap_directory.rs | 298 +++++++++++++++++++++++++------- src/lib.rs | 2 - 3 files changed, 240 insertions(+), 64 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index bf59afcc2..3e17a4cfc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,10 +14,10 @@ keywords = ["search", "information", "retrieval"] [dependencies] byteorder = "1.0" -memmap = "0.5" +memmap = "0.4" lazy_static = "0.2.1" regex = "0.2" -fst = "0.1.35" +fst = "0.1.37" atomicwrites = "0.1.3" tempfile = "2.1" rustc-serialize = "0.3" diff --git a/src/directory/mmap_directory.rs b/src/directory/mmap_directory.rs index e4e595fec..aed5499d2 100644 --- a/src/directory/mmap_directory.rs +++ b/src/directory/mmap_directory.rs @@ -1,28 +1,128 @@ -use std::path::{Path, PathBuf}; -use tempdir::TempDir; -use std::collections::HashMap; -use std::collections::hash_map::Entry as HashMapEntry; -use fst::raw::MmapReadOnly; -use std::fs::File; use atomicwrites; -use std::sync::RwLock; +use common::make_io_err; +use directory::Directory; +use directory::error::{OpenWriteError, FileError, OpenDirectoryError}; +use directory::ReadOnlySource; +use directory::shared_vec_slice::SharedVecSlice; +use directory::WritePtr; +use fst::raw::MmapReadOnly; +use memmap::{Mmap, Protection}; +use std::collections::hash_map::Entry as HashMapEntry; +use std::collections::HashMap; +use std::mem; +use std::convert::From; use std::fmt; -use std::io::Write; +use std::fs; +use std::fs::File; +use std::fs::OpenOptions; use std::io; use std::io::{Seek, SeekFrom}; -use directory::Directory; -use directory::ReadOnlySource; -use directory::WritePtr; use std::io::BufWriter; -use std::fs::OpenOptions; -use directory::error::{OpenWriteError, FileError, OpenDirectoryError}; +use std::io::Write; +use std::path::{Path, PathBuf}; use std::result; -use common::make_io_err; use std::sync::Arc; -use std::fs; -use directory::shared_vec_slice::SharedVecSlice; +use std::sync::RwLock; +use std::sync::Weak; +use tempdir::TempDir; +fn open_mmap(full_path: &PathBuf) -> result::Result>, FileError> { + let convert_file_error = |err: io::Error| { + if err.kind() == io::ErrorKind::NotFound { + FileError::FileDoesNotExist(full_path.clone()) + } + else { + FileError::IOError(err) + } + }; + let file = File::open(&full_path).map_err(convert_file_error)?; + if try!(file.metadata()).len() == 0 { + // if the file size is 0, it will not be possible + // to mmap the file, so we return an anonymous mmap_cache + // instead. + return Ok(None) + } + Ok(Some(Arc::new(Mmap::open(&file, Protection::Read)?))) +} + +#[derive(Default,Clone,Debug,RustcDecodable,RustcEncodable)] +pub struct CacheCounters { + hit: usize, + miss_empty: usize, + miss_weak: usize, +} + +#[derive(Clone,Debug,RustcDecodable,RustcEncodable)] +pub struct CacheInfo { + pub counters: CacheCounters, + pub mmapped: Vec, +} + +#[derive(Default)] +struct MmapCache { + counters: CacheCounters, + cache: HashMap>, +} + +impl MmapCache { + + fn cleanup(&mut self) { + let mut new_cache = HashMap::new(); + mem::swap(&mut new_cache, &mut self.cache); + self.cache = new_cache + .into_iter() + .filter(|&(_, ref weak_ref)| weak_ref.upgrade().is_some()) + .collect(); + } + + fn get_info(&mut self) -> CacheInfo { + self.cleanup(); + let paths: Vec = self.cache.keys() + .cloned() + .collect(); + CacheInfo { + counters: self.counters.clone(), + mmapped: paths, + } + } + + fn get_mmap(&mut self, full_path: PathBuf) -> Result>, FileError> { + if self.cache.len() > 100 { + self.cleanup(); + } + Ok(match self.cache.entry(full_path.clone()) { + HashMapEntry::Occupied(mut occupied_entry) => { + if let Some(mmap_arc) = occupied_entry.get().upgrade() { + self.counters.hit += 1; + Some(mmap_arc.clone()) + } + else { + // The entry exists but the weak ref has been destroyed. + self.counters.miss_weak += 1; + if let Some(mmap_arc) = open_mmap(&full_path)? { + occupied_entry.insert(Arc::downgrade(&mmap_arc)); + Some(mmap_arc) + } + else { + None + } + } + } + HashMapEntry::Vacant(vacant_entry) => { + self.counters.miss_empty += 1; + if let Some(mmap_arc) = open_mmap(&full_path)? { + vacant_entry.insert(Arc::downgrade(&mmap_arc)); + Some(mmap_arc) + } + else { + None + } + } + }) + } +} + /// Directory storing data in files, read via mmap. /// /// The Mmap object are cached to limit the @@ -30,8 +130,9 @@ use directory::shared_vec_slice::SharedVecSlice; #[derive(Clone)] pub struct MmapDirectory { root_path: PathBuf, - mmap_cache: Arc>>, + mmap_cache: Arc>, _temp_directory: Arc>, + } impl fmt::Debug for MmapDirectory { @@ -40,8 +141,6 @@ impl fmt::Debug for MmapDirectory { } } - - impl MmapDirectory { /// Creates a new MmapDirectory in a temporary directory. @@ -53,13 +152,12 @@ impl MmapDirectory { let tempdir_path = PathBuf::from(tempdir.path()); let directory = MmapDirectory { root_path: PathBuf::from(tempdir_path), - mmap_cache: Arc::new(RwLock::new(HashMap::new())), + mmap_cache: Arc::new(RwLock::new(MmapCache::default())), _temp_directory: Arc::new(Some(tempdir)) }; Ok(directory) } - /// Opens a MmapDirectory in a directory. /// /// Returns an error if the `directory_path` does not @@ -74,7 +172,7 @@ impl MmapDirectory { else { Ok(MmapDirectory { root_path: PathBuf::from(directory_path), - mmap_cache: Arc::new(RwLock::new(HashMap::new())), + mmap_cache: Arc::new(RwLock::new(MmapCache::default())), _temp_directory: Arc::new(None) }) } @@ -95,6 +193,14 @@ impl MmapDirectory { Ok(()) } + pub fn get_cache_info(&mut self) -> CacheInfo { + self.mmap_cache + .write() + .expect("Mmap cache lock is poisoned.") + .get_info() + } + + } /// This Write wraps a File, but has the specificity of @@ -128,47 +234,21 @@ impl Seek for SafeFileWriter { impl Directory for MmapDirectory { - - fn open_read(&self, path: &Path) -> result::Result { debug!("Open Read {:?}", path); let full_path = self.resolve_path(path); - let mut mmap_cache = try!( - self.mmap_cache - .write() - .map_err(|_| { - make_io_err(format!("Failed to acquired write lock on mmap cache while reading {:?}", path)) - }) - ); - - let mmap = match mmap_cache.entry(full_path.clone()) { - HashMapEntry::Occupied(e) => { - e.get().clone() - } - HashMapEntry::Vacant(vacant_entry) => { - let file = try!( - File::open(&full_path).map_err(|err| { - if err.kind() == io::ErrorKind::NotFound { - FileError::FileDoesNotExist(full_path.clone()) - } - else { - FileError::IOError(err) - } - }) - ); - if try!(file.metadata()).len() == 0 { - // if the file size is 0, it will not be possible - // to mmap the file, so we return an anonymous mmap_cache - // instead. - return Ok(ReadOnlySource::Anonymous(SharedVecSlice::empty())) - } - let new_mmap = try!(MmapReadOnly::open(&file)); - vacant_entry.insert(new_mmap.clone()); - new_mmap - } - }; - Ok(ReadOnlySource::Mmap(mmap)) + let mut mmap_cache = self.mmap_cache + .write() + .map_err(|_| { + make_io_err(format!("Failed to acquired write lock on mmap cache while reading {:?}", path)) + })?; + + Ok(mmap_cache.get_mmap(full_path)? + .map(MmapReadOnly::from) + .map(ReadOnlySource::Mmap) + .unwrap_or(ReadOnlySource::Anonymous(SharedVecSlice::empty())) + ) } fn open_write(&mut self, path: &Path) -> Result { @@ -214,7 +294,7 @@ impl Directory for MmapDirectory { // Removing the entry in the MMap cache. // The munmap will appear on Drop, // when the last reference is gone. - mmap_cache.remove(&full_path); + mmap_cache.cache.remove(&full_path); try!(fs::remove_file(&full_path)); try!(self.sync_directory()); Ok(()) @@ -240,3 +320,101 @@ impl Directory for MmapDirectory { } } + + + + +#[cfg(test)] +mod tests { + + // There are more tests in directory/mod.rs + // The following tests are specific to the MmapDirectory + + use super::*; + + #[test] + fn test_open_empty() { + // empty file is actually an edge case because those + // cannot be mmapped. + // + // In that case the directory returns a SharedVecSlice. + let mut mmap_directory = MmapDirectory::create_from_tempdir().unwrap(); + let path = PathBuf::from("test"); + { + let mut w = mmap_directory.open_write(&path).unwrap(); + w.flush().unwrap(); + } + let readonlymap = mmap_directory.open_read(&path).unwrap(); + assert_eq!(readonlymap.len(), 0); + } + + #[test] + fn test_cache() { + + + let content = "abc".as_bytes(); + + // here we test if the cache releases + // mmaps correctly. + let mut mmap_directory = MmapDirectory::create_from_tempdir().unwrap(); + let paths: Vec = (0..10) + .map(|i| PathBuf::from(&*format!("file_{}", i))) + .collect(); + { + for path in &paths { + let mut w = mmap_directory.open_write(path).unwrap(); + w.write(content).unwrap(); + w.flush().unwrap(); + } + } + { + for path in &paths { + { + let _r = mmap_directory.open_read(path).unwrap(); + assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 1); + } + assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 0); + } + } + assert_eq!(mmap_directory.get_cache_info().counters.miss_empty, 10); + + + { + // test weak miss + // the first pass create the weak refs. + for path in &paths { + let _r = mmap_directory.open_read(path).unwrap(); + } + // ... the second hits the weak refs. + for path in &paths { + let _r = mmap_directory.open_read(path).unwrap(); + } + let cache_info = mmap_directory.get_cache_info(); + assert_eq!(cache_info.counters.miss_empty, 20); + assert_eq!(cache_info.counters.miss_weak, 10); + } + + { + let mut saved_readmmaps = vec!(); + // Keeps reference alive + for (i, path) in paths.iter().enumerate() { + let r = mmap_directory.open_read(path).unwrap(); + saved_readmmaps.push(r); + assert_eq!(mmap_directory.get_cache_info().mmapped.len(), i + 1); + } + let cache_info = mmap_directory.get_cache_info(); + println!("{:?}", cache_info); + assert_eq!(cache_info.counters.miss_empty, 30); + assert_eq!(cache_info.counters.miss_weak, 10); + assert_eq!(cache_info.mmapped.len(), 10); + + for saved_readmmap in saved_readmmaps { + assert_eq!(saved_readmmap.as_slice(), content); + } + } + + assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 0); + + } + +} diff --git a/src/lib.rs b/src/lib.rs index e82447472..397cd404e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,8 +28,6 @@ extern crate log; #[macro_use] extern crate version; - -#[macro_use] extern crate fst; extern crate byteorder; extern crate memmap;