issue/96 Added functionality to protect files from deletion

Hopefully fixed the race condition that occurred when merging files.
Paul Masurel
2017-04-02 18:48:20 +09:00
parent 9eb2d3e8c5
commit 17631ed866
9 changed files with 156 additions and 54 deletions

View File

@@ -4,7 +4,7 @@ use schema::Schema;
use DocId;
use std::fmt;
use core::SegmentId;
use directory::{ReadOnlySource, WritePtr};
use directory::{ReadOnlySource, WritePtr, FileProtection};
use indexer::segment_serializer::SegmentSerializer;
use super::SegmentComponent;
use core::Index;
@@ -70,6 +70,11 @@ impl Segment {
self.meta.relative_path(component)
}
pub fn protect_from_delete(&self, component: SegmentComponent) -> FileProtection {
let path = self.relative_path(component);
self.index.directory().protect_file_from_delete(&path)
}
/// Open one of the component files for reading.
pub fn open_read(&self, component: SegmentComponent) -> result::Result<ReadOnlySource, FileError> {
let path = self.relative_path(component);
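Usage sketch (illustrative, not part of this diff): `protect_from_delete` hands back an RAII guard, so a component stays protected for exactly as long as the returned value is kept alive.
// `segment` is any tantivy Segment; the binding name is arbitrary.
let _protection: FileProtection = segment.protect_from_delete(SegmentComponent::DELETE);
// ... work that must not observe the DELETE component disappearing ...
// `_protection` drops at the end of the scope, releasing the protection.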

View File

@@ -1,7 +1,7 @@
use std::marker::Send;
use std::fmt;
use std::path::Path;
use directory::error::{FileError, OpenWriteError};
use directory::error::{FileError, DeleteError, OpenWriteError};
use directory::{ReadOnlySource, WritePtr};
use std::result;
use std::io;
@@ -35,7 +35,7 @@ pub trait Directory: fmt::Debug + Send + Sync + 'static {
///
/// Removing a nonexistent file yields a
/// `FileError::DoesNotExist`.
fn delete(&self, path: &Path) -> result::Result<(), FileError>;
fn delete(&self, path: &Path) -> result::Result<(), DeleteError>;
/// Returns true iff the file exists
fn exists(&self, path: &Path) -> bool;

View File

@@ -27,7 +27,7 @@ impl From<io::Error> for OpenWriteError {
}
}
/// Error that may occur when accessing a file (read, or delete)
/// Error that may occur when reading a file
#[derive(Debug)]
pub enum FileError {
/// The file does not exist.
@@ -36,3 +36,17 @@ pub enum FileError {
/// interacting with the underlying IO device.
IOError(io::Error),
}
/// Error that may occur when trying to delete a file
#[derive(Debug)]
pub enum DeleteError {
/// The file does not exist.
FileDoesNotExist(PathBuf),
/// Any kind of IO error that happens when
/// interacting with the underlying IO device.
IOError(io::Error),
/// The file may not be deleted because it is
/// protected.
FileProtected(PathBuf),
}
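With deletions split into their own error type, a caller can tell a protected file apart from a genuine failure. A hedged sketch of caller-side handling (hypothetical code; it mirrors the garbage-collection logic later in this commit):
match directory.delete(&path) {
    Ok(()) => info!("Deleted {:?}", path),
    // Already gone: may be treated as deleted.
    Err(DeleteError::FileDoesNotExist(_)) => {}
    // A live FileProtection guard covers the file: skip it for now.
    Err(DeleteError::FileProtected(_)) => {}
    // Genuine IO failure: report it and keep the file registered.
    Err(DeleteError::IOError(e)) => error!("Failed to delete {:?}: {:?}", path, e),
}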

View File

@@ -1,5 +1,5 @@
use std::path::{Path, PathBuf};
use directory::error::{FileError, OpenWriteError};
use directory::error::{FileError, DeleteError, OpenWriteError};
use directory::{ReadOnlySource, WritePtr};
use std::result;
use std::io;
@@ -9,6 +9,8 @@ use std::collections::HashSet;
use std::io::Write;
use rustc_serialize::json;
use core::MANAGED_FILEPATH;
use std::collections::HashMap;
use std::fmt;
use Result;
use Error;
@@ -24,7 +26,30 @@ use Error;
#[derive(Debug)]
pub struct ManagedDirectory {
directory: Box<Directory>,
managed_paths: Arc<RwLock<HashSet<PathBuf>>>,
meta_informations: Arc<RwLock<MetaInformation>>,
}
#[derive(Debug, Default)]
struct MetaInformation {
managed_paths: HashSet<PathBuf>,
protected_files: HashMap<PathBuf, usize>,
}
pub struct FileProtection {
directory: ManagedDirectory,
path: PathBuf,
}
impl fmt::Debug for FileProtection {
fn fmt(&self, formatter: &mut fmt::Formatter) -> result::Result<(), fmt::Error> {
write!(formatter, "FileProtectionFor({:?})", self.path)
}
}
impl Drop for FileProtection {
fn drop(&mut self) {
self.directory.unprotect_file_from_delete(&self.path);
}
}
impl ManagedDirectory {
@@ -38,13 +63,17 @@ impl ManagedDirectory {
.map_err(|e| Error::CorruptedFile(MANAGED_FILEPATH.clone(), Box::new(e)))?;
Ok(ManagedDirectory {
directory: box directory,
managed_paths: Arc::new(RwLock::new(managed_files)),
meta_informations: Arc::new(RwLock::new(
MetaInformation {
managed_paths: managed_files,
protected_files: HashMap::default()
})),
})
}
Err(FileError::FileDoesNotExist(_)) => {
Ok(ManagedDirectory {
directory: box directory,
managed_paths: Arc::default(),
meta_informations: Arc::default(),
})
}
Err(FileError::IOError(e)) => {
@@ -65,54 +94,98 @@ impl ManagedDirectory {
/// an error is simply logged, and the file remains in the list of managed
/// files.
pub fn garbage_collect(&mut self, living_files: HashSet<PathBuf>) {
let mut managed_has_changed: bool = false;
{
let mut files_to_delete = vec!();
let mut managed_paths_write = self.managed_paths.write().unwrap();
let mut files_to_delete = vec!();
{ // scope the write lock: .delete() below needs to take it too.
let mut meta_informations_wlock = self.meta_informations.write().unwrap();
let managed_paths_write = &mut meta_informations_wlock.managed_paths;
for managed_path in managed_paths_write.iter() {
if !living_files.contains(managed_path) {
files_to_delete.push(managed_path.clone());
}
}
}
let mut deleted_files = vec!();
{
for file_to_delete in files_to_delete {
match self.directory.delete(&file_to_delete) {
match self.delete(&file_to_delete) {
Ok(_) => {
info!("Deleted {:?}", file_to_delete);
managed_has_changed |= managed_paths_write.remove(&file_to_delete);
deleted_files.push(file_to_delete);
}
Err(file_error) => {
error!("Failed to delete {:?}", file_to_delete);
match file_error {
FileError::FileDoesNotExist(_) => {
managed_has_changed |= managed_paths_write.remove(&file_to_delete);
DeleteError::FileDoesNotExist(_) => {
deleted_files.push(file_to_delete);
}
FileError::IOError(_) => {
DeleteError::IOError(_) => {
if !cfg!(target_os = "windows") {
error!("Failed to delete {:?}", file_to_delete);
}
}
DeleteError::FileProtected(_) => {
// Expected: a live FileProtection guard covers this file;
// it will be collected on a later pass.
}
}
}
}
}
}
if managed_has_changed {
if !deleted_files.is_empty() {
// update the list of managed files by removing
// the files that were deleted.
{
let mut meta_informations_wlock = self.meta_informations.write().unwrap();
let managed_paths_write = &mut meta_informations_wlock.managed_paths;
for delete_file in &deleted_files {
managed_paths_write.remove(delete_file);
}
}
if let Err(_) = self.save_managed_paths() {
error!("Failed to save the list of managed files.");
}
}
}
pub fn protect_file_from_delete(&self, path: &Path) -> FileProtection {
let mut meta_informations_wlock = self.meta_informations
.write()
.expect("Managed file lock poisoned");
let pathbuf = path.to_owned();
*meta_informations_wlock
.protected_files
.entry(pathbuf.clone())
.or_insert(0) += 1;
FileProtection {
directory: self.clone(),
path: pathbuf.clone(),
}
}
pub fn unprotect_file_from_delete(&self, path: &Path) {
let mut meta_informations_wlock = self.meta_informations
.write()
.expect("Managed file lock poisoned");
if let Some(counter_ref_mut) = meta_informations_wlock
.protected_files
.get_mut(path) {
(*counter_ref_mut) -= 1;
}
}
/// Saves the file containing the list of existing files
/// that were created by tantivy.
fn save_managed_paths(&mut self) -> io::Result<()> {
let managed_files_lock = self.managed_paths
let meta_informations_rlock = self.meta_informations
.read()
.expect("Managed file lock poisoned");
let mut w = vec!();
try!(write!(&mut w, "{}\n", json::as_pretty_json(&*managed_files_lock)));
try!(write!(&mut w, "{}\n", json::as_pretty_json(&meta_informations_rlock.managed_paths)));
self.directory.atomic_write(&MANAGED_FILEPATH, &w[..])?;
Ok(())
}
@@ -126,11 +199,10 @@ impl ManagedDirectory {
/// never get removed.
fn register_file_as_managed(&mut self, filepath: &Path) -> io::Result<()> {
let has_changed = {
let mut managed_files_lock = self
.managed_paths
let mut meta_wlock = self.meta_informations
.write()
.expect("Managed file lock poisoned");
managed_files_lock.insert(filepath.to_owned())
meta_wlock.managed_paths.insert(filepath.to_owned())
};
if has_changed {
self.save_managed_paths()?;
@@ -159,7 +231,17 @@ impl Directory for ManagedDirectory {
self.directory.atomic_read(path)
}
fn delete(&self, path: &Path) -> result::Result<(), FileError> {
fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
{
let metas_rlock = self.meta_informations
.read()
.expect("poisoned lock in managed directory meta");
if let Some(counter) = metas_rlock.protected_files.get(path) {
if *counter > 0 {
return Err(DeleteError::FileProtected(path.to_owned()))
}
}
}
self.directory.delete(path)
}
@@ -177,7 +259,7 @@ impl Clone for ManagedDirectory {
fn clone(&self) -> ManagedDirectory {
ManagedDirectory {
directory: self.directory.box_clone(),
managed_paths: self.managed_paths.clone(),
meta_informations: self.meta_informations.clone(),
}
}
}
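The protection scheme above reduces to a per-path reference count behind the meta_informations lock, with FileProtection as the RAII handle. A self-contained sketch of the same pattern, under hypothetical names (this is not tantivy API):
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};

#[derive(Clone, Default)]
struct Protections(Arc<RwLock<HashMap<PathBuf, usize>>>);

// RAII handle: while one is alive, the path's counter stays above zero.
struct Guard {
    protections: Protections,
    path: PathBuf,
}

impl Protections {
    fn protect(&self, path: &Path) -> Guard {
        // Increment the counter, creating the entry on first protection.
        *self.0.write().unwrap().entry(path.to_owned()).or_insert(0) += 1;
        Guard { protections: self.clone(), path: path.to_owned() }
    }
    fn is_protected(&self, path: &Path) -> bool {
        self.0.read().unwrap().get(path).map_or(false, |&n| n > 0)
    }
}

impl Drop for Guard {
    fn drop(&mut self) {
        // Decrement on drop; several guards may cover the same path.
        if let Some(n) = self.protections.0.write().unwrap().get_mut(&self.path) {
            *n -= 1;
        }
    }
}

fn main() {
    let protections = Protections::default();
    let delete_file = Path::new("seg.del");
    let guard = protections.protect(delete_file);
    assert!(protections.is_protected(delete_file));
    drop(guard);
    assert!(!protections.is_protected(delete_file));
}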

View File

@@ -1,7 +1,7 @@
use atomicwrites;
use common::make_io_err;
use directory::Directory;
use directory::error::{OpenWriteError, FileError, OpenDirectoryError};
use directory::error::{OpenWriteError, FileError, DeleteError, OpenDirectoryError};
use directory::ReadOnlySource;
use directory::shared_vec_slice::SharedVecSlice;
use directory::WritePtr;
@@ -334,13 +334,13 @@ impl Directory for MmapDirectory {
Ok(BufWriter::new(Box::new(writer)))
}
fn delete(&self, path: &Path) -> result::Result<(), FileError> {
fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
debug!("Deleting file {:?}", path);
let full_path = self.resolve_path(path);
let mut mmap_cache = try!(self.mmap_cache
.write()
.map_err(|_|
FileError::IOError(make_io_err(format!("Failed to acquire write lock on mmap cache while deleting {:?}", path))))
DeleteError::IOError(make_io_err(format!("Failed to acquire write lock on mmap cache while deleting {:?}", path))))
);
// Remove the entry from the mmap cache.
// The munmap itself happens on Drop,
@@ -349,14 +349,14 @@ impl Directory for MmapDirectory {
match fs::remove_file(&full_path) {
Ok(_) => {
self.sync_directory()
.map_err(|e| FileError::IOError(e))
.map_err(|e| DeleteError::IOError(e))
}
Err(e) => {
if e.kind() == io::ErrorKind::NotFound {
Err(FileError::FileDoesNotExist(path.to_owned()))
Err(DeleteError::FileDoesNotExist(path.to_owned()))
}
else {
Err(FileError::IOError(e))
Err(DeleteError::IOError(e))
}
}
}

View File

@@ -15,7 +15,7 @@ pub use self::read_only_source::ReadOnlySource;
pub use self::directory::Directory;
pub use self::ram_directory::RAMDirectory;
pub use self::mmap_directory::MmapDirectory;
pub use self::managed_directory::ManagedDirectory;
pub use self::managed_directory::{ManagedDirectory, FileProtection};
/// Synonym of Seek + Write
pub trait SeekableWrite: Seek + Write {}

View File

@@ -6,7 +6,7 @@ use std::result;
use std::sync::{Arc, RwLock};
use common::make_io_err;
use directory::{Directory, ReadOnlySource};
use directory::error::{OpenWriteError, FileError};
use directory::error::{OpenWriteError, FileError, DeleteError};
use directory::WritePtr;
use super::shared_vec_slice::SharedVecSlice;
@@ -104,12 +104,12 @@ impl InnerDirectory {
})
}
fn delete(&self, path: &Path) -> result::Result<(), FileError> {
fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
self.0
.write()
.map_err(|_| {
let io_err = make_io_err(format!("Failed to acquire write lock for the directory when trying to delete {:?}", path));
FileError::IOError(io_err)
DeleteError::IOError(io_err)
})
.and_then(|mut writable_map| {
match writable_map.remove(path) {
@@ -117,7 +117,7 @@ impl InnerDirectory {
Ok(())
},
None => {
Err(FileError::FileDoesNotExist(PathBuf::from(path)))
Err(DeleteError::FileDoesNotExist(PathBuf::from(path)))
}
}
})
@@ -176,7 +176,7 @@ impl Directory for RAMDirectory {
}
}
fn delete(&self, path: &Path) -> result::Result<(), FileError> {
fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
self.fs.delete(path)
}

View File

@@ -8,6 +8,7 @@ use core::SegmentMeta;
use core::SegmentReader;
use indexer::stamper::Stamper;
use datastruct::stacker::Heap;
use directory::FileProtection;
use Error;
use Directory;
use fastfield::delete::write_delete_bitset;
@@ -207,13 +208,15 @@ pub fn compute_deleted_bitset(
pub fn advance_deletes(
mut segment: Segment,
segment_entry: &mut SegmentEntry,
target_opstamp: u64) -> Result<()> {
target_opstamp: u64) -> Result<Option<FileProtection>> {
let mut file_protect: Option<FileProtection> = None;
{
if let Some(previous_opstamp) = segment_entry.meta().delete_opstamp() {
// We are already up-to-date here.
if target_opstamp == previous_opstamp {
return Ok(());
return Ok(file_protect);
}
}
let segment_reader = SegmentReader::open(segment.clone())?;
@@ -245,13 +248,14 @@ pub fn advance_deletes(
let num_deleted_docs = delete_bitset.len();
if num_deleted_docs > 0 {
segment.set_delete_meta(num_deleted_docs as u32, target_opstamp);
file_protect = Some(segment.protect_from_delete(SegmentComponent::DELETE));
let mut delete_file = segment.open_write(SegmentComponent::DELETE)?;
write_delete_bitset(&delete_bitset, &mut delete_file)?;
}
}
segment_entry.set_meta(segment.meta().clone());
Ok(())
Ok(file_protect)
}
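Worth noting from the hunk above: the guard is taken before the new delete file is even opened for write, so the file is never observable in an unprotected state, and the early-exit path now returns Ok(None), i.e. "nothing was written, nothing to protect". The caller is expected to hold the guard for as long as the file must survive, roughly as follows (illustrative fragment; the real caller is perform_merge in the next file):
let mut file_protections: Vec<FileProtection> = vec!();
if let Some(protection) = advance_deletes(segment, &mut segment_entry, target_opstamp)? {
    // Keep the guard until whatever reads this delete file has finished.
    file_protections.push(protection);
}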
fn index_documents(heap: &mut Heap,

View File

@@ -14,6 +14,7 @@ use futures_cpupool::CpuPool;
use futures::Future;
use futures::Canceled;
use futures::oneshot;
use directory::FileProtection;
use indexer::{MergePolicy, DefaultMergePolicy};
use indexer::index_writer::advance_deletes;
use indexer::MergeCandidate;
@@ -105,12 +106,17 @@ fn perform_merge(segment_ids: &[SegmentId],
let ref index = segment_updater.0.index;
let schema = index.schema();
let mut segment_entries = vec!();
let mut file_protections: Vec<FileProtection> = vec!();
for segment_id in segment_ids {
if let Some(mut segment_entry) = segment_updater.0
.segment_manager
.segment_entry(segment_id) {
let segment = index.segment(segment_entry.meta().clone());
advance_deletes(segment, &mut segment_entry, target_opstamp)?;
if let Some(file_protection) = advance_deletes(segment, &mut segment_entry, target_opstamp)? {
file_protections.push(file_protection);
}
segment_entries.push(segment_entry);
}
else {
@@ -119,14 +125,6 @@ fn perform_merge(segment_ids: &[SegmentId],
}
}
// TODO REMOVEEEEE THIIIIIS
{
let living_files = segment_updater.0.segment_manager.list_files();
let mut index = merged_segment.index().clone();
index.directory_mut().garbage_collect(living_files);
}
let delete_cursor = segment_entries[0].delete_cursor().clone();
let segments: Vec<Segment> = segment_entries
@@ -135,10 +133,11 @@ fn perform_merge(segment_ids: &[SegmentId],
index.segment(segment_entry.meta().clone())
})
.collect();
// An IndexMerger is like a "view" of our merged segments.
let merger: IndexMerger = IndexMerger::open(schema, &segments[..])?;
// ... we just serialize this index merger into our new segment
// to merge the segments.
@@ -317,13 +316,11 @@ impl SegmentUpdater {
let _merging_future_res = merging_future_send.send(merged_segment_meta);
}
Err(e) => {
error!("Merge of {:?} was cancelled: {:?}", segment_ids_vec, e);
// ... cancel merge
if cfg!(test) {
panic!("Merge failed.");
}
else {
error!("Merge of {:?} was cancelled: {:?}", segment_ids_vec, e);
}
segment_updater_clone.cancel_merge(&segment_ids_vec, merged_segment_id);
// merging_future_send will be dropped, sending an error to the future.
}