mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-01-07 09:32:54 +00:00
Fixes #72 - Cache directory uses weak ref. Introduced CacheInfo object.
This commit is contained in:
@@ -14,10 +14,10 @@ keywords = ["search", "information", "retrieval"]
|
||||
|
||||
[dependencies]
|
||||
byteorder = "1.0"
|
||||
memmap = "0.5"
|
||||
memmap = "0.4"
|
||||
lazy_static = "0.2.1"
|
||||
regex = "0.2"
|
||||
fst = "0.1.35"
|
||||
fst = "0.1.37"
|
||||
atomicwrites = "0.1.3"
|
||||
tempfile = "2.1"
|
||||
rustc-serialize = "0.3"
|
||||
|
||||
@@ -1,28 +1,128 @@
|
||||
use std::path::{Path, PathBuf};
|
||||
use tempdir::TempDir;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::hash_map::Entry as HashMapEntry;
|
||||
use fst::raw::MmapReadOnly;
|
||||
use std::fs::File;
|
||||
use atomicwrites;
|
||||
use std::sync::RwLock;
|
||||
use common::make_io_err;
|
||||
use directory::Directory;
|
||||
use directory::error::{OpenWriteError, FileError, OpenDirectoryError};
|
||||
use directory::ReadOnlySource;
|
||||
use directory::shared_vec_slice::SharedVecSlice;
|
||||
use directory::WritePtr;
|
||||
use fst::raw::MmapReadOnly;
|
||||
use memmap::{Mmap, Protection};
|
||||
use std::collections::hash_map::Entry as HashMapEntry;
|
||||
use std::collections::HashMap;
|
||||
use std::mem;
|
||||
use std::convert::From;
|
||||
use std::fmt;
|
||||
use std::io::Write;
|
||||
use std::fs;
|
||||
use std::fs::File;
|
||||
use std::fs::OpenOptions;
|
||||
use std::io;
|
||||
use std::io::{Seek, SeekFrom};
|
||||
use directory::Directory;
|
||||
use directory::ReadOnlySource;
|
||||
use directory::WritePtr;
|
||||
use std::io::BufWriter;
|
||||
use std::fs::OpenOptions;
|
||||
use directory::error::{OpenWriteError, FileError, OpenDirectoryError};
|
||||
use std::io::Write;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::result;
|
||||
use common::make_io_err;
|
||||
use std::sync::Arc;
|
||||
use std::fs;
|
||||
use directory::shared_vec_slice::SharedVecSlice;
|
||||
use std::sync::RwLock;
|
||||
use std::sync::Weak;
|
||||
use tempdir::TempDir;
|
||||
|
||||
|
||||
fn open_mmap(full_path: &PathBuf) -> result::Result<Option<Arc<Mmap>>, FileError> {
|
||||
let convert_file_error = |err: io::Error| {
|
||||
if err.kind() == io::ErrorKind::NotFound {
|
||||
FileError::FileDoesNotExist(full_path.clone())
|
||||
}
|
||||
else {
|
||||
FileError::IOError(err)
|
||||
}
|
||||
};
|
||||
let file = File::open(&full_path).map_err(convert_file_error)?;
|
||||
if try!(file.metadata()).len() == 0 {
|
||||
// if the file size is 0, it will not be possible
|
||||
// to mmap the file, so we return an anonymous mmap_cache
|
||||
// instead.
|
||||
return Ok(None)
|
||||
}
|
||||
Ok(Some(Arc::new(Mmap::open(&file, Protection::Read)?)))
|
||||
}
|
||||
|
||||
#[derive(Default,Clone,Debug,RustcDecodable,RustcEncodable)]
|
||||
pub struct CacheCounters {
|
||||
hit: usize,
|
||||
miss_empty: usize,
|
||||
miss_weak: usize,
|
||||
}
|
||||
|
||||
#[derive(Clone,Debug,RustcDecodable,RustcEncodable)]
|
||||
pub struct CacheInfo {
|
||||
pub counters: CacheCounters,
|
||||
pub mmapped: Vec<PathBuf>,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct MmapCache {
|
||||
counters: CacheCounters,
|
||||
cache: HashMap<PathBuf, Weak<Mmap>>,
|
||||
}
|
||||
|
||||
impl MmapCache {
|
||||
|
||||
fn cleanup(&mut self) {
|
||||
let mut new_cache = HashMap::new();
|
||||
mem::swap(&mut new_cache, &mut self.cache);
|
||||
self.cache = new_cache
|
||||
.into_iter()
|
||||
.filter(|&(_, ref weak_ref)| weak_ref.upgrade().is_some())
|
||||
.collect();
|
||||
}
|
||||
|
||||
fn get_info(&mut self) -> CacheInfo {
|
||||
self.cleanup();
|
||||
let paths: Vec<PathBuf> = self.cache.keys()
|
||||
.cloned()
|
||||
.collect();
|
||||
CacheInfo {
|
||||
counters: self.counters.clone(),
|
||||
mmapped: paths,
|
||||
}
|
||||
}
|
||||
|
||||
fn get_mmap(&mut self, full_path: PathBuf) -> Result<Option<Arc<Mmap>>, FileError> {
|
||||
if self.cache.len() > 100 {
|
||||
self.cleanup();
|
||||
}
|
||||
Ok(match self.cache.entry(full_path.clone()) {
|
||||
HashMapEntry::Occupied(mut occupied_entry) => {
|
||||
if let Some(mmap_arc) = occupied_entry.get().upgrade() {
|
||||
self.counters.hit += 1;
|
||||
Some(mmap_arc.clone())
|
||||
}
|
||||
else {
|
||||
// The entry exists but the weak ref has been destroyed.
|
||||
self.counters.miss_weak += 1;
|
||||
if let Some(mmap_arc) = open_mmap(&full_path)? {
|
||||
occupied_entry.insert(Arc::downgrade(&mmap_arc));
|
||||
Some(mmap_arc)
|
||||
}
|
||||
else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
HashMapEntry::Vacant(vacant_entry) => {
|
||||
self.counters.miss_empty += 1;
|
||||
if let Some(mmap_arc) = open_mmap(&full_path)? {
|
||||
vacant_entry.insert(Arc::downgrade(&mmap_arc));
|
||||
Some(mmap_arc)
|
||||
}
|
||||
else {
|
||||
None
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Directory storing data in files, read via mmap.
|
||||
///
|
||||
/// The Mmap objects are cached to limit the
|
||||
@@ -30,8 +130,9 @@ use directory::shared_vec_slice::SharedVecSlice;
|
||||
#[derive(Clone)]
|
||||
pub struct MmapDirectory {
|
||||
root_path: PathBuf,
|
||||
mmap_cache: Arc<RwLock<HashMap<PathBuf, MmapReadOnly>>>,
|
||||
mmap_cache: Arc<RwLock<MmapCache>>,
|
||||
_temp_directory: Arc<Option<TempDir>>,
|
||||
|
||||
}
|
||||
|
||||
impl fmt::Debug for MmapDirectory {
|
||||
@@ -40,8 +141,6 @@ impl fmt::Debug for MmapDirectory {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
impl MmapDirectory {
|
||||
|
||||
/// Creates a new MmapDirectory in a temporary directory.
|
||||
@@ -53,13 +152,12 @@ impl MmapDirectory {
|
||||
let tempdir_path = PathBuf::from(tempdir.path());
|
||||
let directory = MmapDirectory {
|
||||
root_path: PathBuf::from(tempdir_path),
|
||||
mmap_cache: Arc::new(RwLock::new(HashMap::new())),
|
||||
mmap_cache: Arc::new(RwLock::new(MmapCache::default())),
|
||||
_temp_directory: Arc::new(Some(tempdir))
|
||||
};
|
||||
Ok(directory)
|
||||
}
|
||||
|
||||
|
||||
/// Opens a MmapDirectory in a directory.
|
||||
///
|
||||
/// Returns an error if the `directory_path` does not
|
||||
@@ -74,7 +172,7 @@ impl MmapDirectory {
|
||||
else {
|
||||
Ok(MmapDirectory {
|
||||
root_path: PathBuf::from(directory_path),
|
||||
mmap_cache: Arc::new(RwLock::new(HashMap::new())),
|
||||
mmap_cache: Arc::new(RwLock::new(MmapCache::default())),
|
||||
_temp_directory: Arc::new(None)
|
||||
})
|
||||
}
|
||||
@@ -95,6 +193,14 @@ impl MmapDirectory {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns a snapshot of the mmap cache: its counters and the list of
/// paths that are currently mmapped.
///
/// The cache is protected by a `RwLock`, so a shared reference is
/// enough; the write lock is taken because producing the snapshot
/// goes through `MmapCache::get_info`, which needs mutable access.
///
/// # Panics
///
/// Panics if the lock protecting the mmap cache is poisoned.
pub fn get_cache_info(&self) -> CacheInfo {
    self.mmap_cache
        .write()
        .expect("Mmap cache lock is poisoned.")
        .get_info()
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
/// This Write wraps a File, but has the specificity of
|
||||
@@ -128,47 +234,21 @@ impl Seek for SafeFileWriter {
|
||||
|
||||
impl Directory for MmapDirectory {
|
||||
|
||||
|
||||
|
||||
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, FileError> {
|
||||
debug!("Open Read {:?}", path);
|
||||
let full_path = self.resolve_path(path);
|
||||
|
||||
let mut mmap_cache = try!(
|
||||
self.mmap_cache
|
||||
.write()
|
||||
.map_err(|_| {
|
||||
make_io_err(format!("Failed to acquired write lock on mmap cache while reading {:?}", path))
|
||||
})
|
||||
);
|
||||
|
||||
let mmap = match mmap_cache.entry(full_path.clone()) {
|
||||
HashMapEntry::Occupied(e) => {
|
||||
e.get().clone()
|
||||
}
|
||||
HashMapEntry::Vacant(vacant_entry) => {
|
||||
let file = try!(
|
||||
File::open(&full_path).map_err(|err| {
|
||||
if err.kind() == io::ErrorKind::NotFound {
|
||||
FileError::FileDoesNotExist(full_path.clone())
|
||||
}
|
||||
else {
|
||||
FileError::IOError(err)
|
||||
}
|
||||
})
|
||||
);
|
||||
if try!(file.metadata()).len() == 0 {
|
||||
// if the file size is 0, it will not be possible
|
||||
// to mmap the file, so we return an anonymous mmap_cache
|
||||
// instead.
|
||||
return Ok(ReadOnlySource::Anonymous(SharedVecSlice::empty()))
|
||||
}
|
||||
let new_mmap = try!(MmapReadOnly::open(&file));
|
||||
vacant_entry.insert(new_mmap.clone());
|
||||
new_mmap
|
||||
}
|
||||
};
|
||||
Ok(ReadOnlySource::Mmap(mmap))
|
||||
let mut mmap_cache = self.mmap_cache
|
||||
.write()
|
||||
.map_err(|_| {
|
||||
make_io_err(format!("Failed to acquired write lock on mmap cache while reading {:?}", path))
|
||||
})?;
|
||||
|
||||
Ok(mmap_cache.get_mmap(full_path)?
|
||||
.map(MmapReadOnly::from)
|
||||
.map(ReadOnlySource::Mmap)
|
||||
.unwrap_or(ReadOnlySource::Anonymous(SharedVecSlice::empty()))
|
||||
)
|
||||
}
|
||||
|
||||
fn open_write(&mut self, path: &Path) -> Result<WritePtr, OpenWriteError> {
|
||||
@@ -214,7 +294,7 @@ impl Directory for MmapDirectory {
|
||||
// Removing the entry in the MMap cache.
|
||||
// The munmap will appear on Drop,
|
||||
// when the last reference is gone.
|
||||
mmap_cache.remove(&full_path);
|
||||
mmap_cache.cache.remove(&full_path);
|
||||
try!(fs::remove_file(&full_path));
|
||||
try!(self.sync_directory());
|
||||
Ok(())
|
||||
@@ -240,3 +320,101 @@ impl Directory for MmapDirectory {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
#[cfg(test)]
mod tests {

    // There are more tests in directory/mod.rs
    // The following tests are specific to the MmapDirectory

    use super::*;

    #[test]
    fn test_open_empty() {
        // An empty file is an edge case because it cannot be mmapped.
        //
        // In that case the directory returns a SharedVecSlice.
        let mut mmap_directory = MmapDirectory::create_from_tempdir().unwrap();
        let path = PathBuf::from("test");
        {
            let mut w = mmap_directory.open_write(&path).unwrap();
            w.flush().unwrap();
        }
        let readonlymap = mmap_directory.open_read(&path).unwrap();
        assert_eq!(readonlymap.len(), 0);
    }

    #[test]
    fn test_cache() {
        let content = "abc".as_bytes();

        // Here we test that the cache releases mmaps correctly.
        let mut mmap_directory = MmapDirectory::create_from_tempdir().unwrap();
        let paths: Vec<PathBuf> = (0..10)
            .map(|i| PathBuf::from(format!("file_{}", i)))
            .collect();

        for path in &paths {
            let mut w = mmap_directory.open_write(path).unwrap();
            // `write_all` rather than `write`: a short write must not
            // go unnoticed.
            w.write_all(content).unwrap();
            w.flush().unwrap();
        }

        for path in &paths {
            {
                let _r = mmap_directory.open_read(path).unwrap();
                assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 1);
            }
            // The read source has been dropped: the mmap must be gone.
            assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 0);
        }
        assert_eq!(mmap_directory.get_cache_info().counters.miss_empty, 10);

        {
            // Test weak misses.
            // The first pass creates the weak refs...
            for path in &paths {
                let _r = mmap_directory.open_read(path).unwrap();
            }
            // ... the second pass finds them, but they are already dead.
            for path in &paths {
                let _r = mmap_directory.open_read(path).unwrap();
            }
            let cache_info = mmap_directory.get_cache_info();
            assert_eq!(cache_info.counters.miss_empty, 20);
            assert_eq!(cache_info.counters.miss_weak, 10);
        }

        {
            // Keeps the references alive.
            let mut saved_readmmaps = Vec::new();
            for (i, path) in paths.iter().enumerate() {
                let r = mmap_directory.open_read(path).unwrap();
                saved_readmmaps.push(r);
                assert_eq!(mmap_directory.get_cache_info().mmapped.len(), i + 1);
            }
            let cache_info = mmap_directory.get_cache_info();
            println!("{:?}", cache_info);
            assert_eq!(cache_info.counters.miss_empty, 30);
            assert_eq!(cache_info.counters.miss_weak, 10);
            assert_eq!(cache_info.mmapped.len(), 10);

            for saved_readmmap in saved_readmmaps {
                assert_eq!(saved_readmmap.as_slice(), content);
            }
        }

        // Every saved source has been dropped: nothing is mmapped anymore.
        assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 0);
    }

}
|
||||
|
||||
@@ -28,8 +28,6 @@ extern crate log;
|
||||
|
||||
#[macro_use]
|
||||
extern crate version;
|
||||
|
||||
#[macro_use]
|
||||
extern crate fst;
|
||||
extern crate byteorder;
|
||||
extern crate memmap;
|
||||
|
||||
Reference in New Issue
Block a user