diff --git a/src/core/segment.rs b/src/core/segment.rs
index 75f4d2596..7baf9516c 100644
--- a/src/core/segment.rs
+++ b/src/core/segment.rs
@@ -4,7 +4,7 @@ use schema::Schema;
 use DocId;
 use std::fmt;
 use core::SegmentId;
-use directory::{ReadOnlySource, WritePtr};
+use directory::{ReadOnlySource, WritePtr, FileProtection};
 use indexer::segment_serializer::SegmentSerializer;
 use super::SegmentComponent;
 use core::Index;
@@ -70,6 +70,11 @@ impl Segment {
         self.meta.relative_path(component)
     }
 
+    pub fn protect_from_delete(&self, component: SegmentComponent) -> FileProtection {
+        let path = self.relative_path(component);
+        self.index.directory().protect_file_from_delete(&path)
+    }
+
     /// Open one of the component file for read.
     pub fn open_read(&self, component: SegmentComponent) -> result::Result<ReadOnlySource, FileError> {
         let path = self.relative_path(component);
diff --git a/src/directory/directory.rs b/src/directory/directory.rs
index 2f3cb4146..c4a98b237 100644
--- a/src/directory/directory.rs
+++ b/src/directory/directory.rs
@@ -1,7 +1,7 @@
 use std::marker::Send;
 use std::fmt;
 use std::path::Path;
-use directory::error::{FileError, OpenWriteError};
+use directory::error::{FileError, DeleteError, OpenWriteError};
 use directory::{ReadOnlySource, WritePtr};
 use std::result;
 use std::io;
@@ -35,7 +35,7 @@ pub trait Directory: fmt::Debug + Send + Sync + 'static {
     ///
     /// Removing a nonexistent file, yields a
     /// `FileError::DoesNotExist`.
-    fn delete(&self, path: &Path) -> result::Result<(), FileError>;
+    fn delete(&self, path: &Path) -> result::Result<(), DeleteError>;
 
     /// Returns true iff the file exists
     fn exists(&self, path: &Path) -> bool;
diff --git a/src/directory/error.rs b/src/directory/error.rs
index aacfe62d3..711f4a3df 100644
--- a/src/directory/error.rs
+++ b/src/directory/error.rs
@@ -27,7 +27,7 @@ impl From<io::Error> for OpenWriteError {
     }
 }
 
-/// Error that may occur when accessing a file (read, or delete)
+/// Error that may occur when accessing a file read
 #[derive(Debug)]
 pub enum FileError {
     /// The file does not exists.
@@ -36,3 +36,17 @@
     /// interacting with the underlying IO device.
     IOError(io::Error),
 }
+
+
+/// Error that may occur when trying to delete a file
+#[derive(Debug)]
+pub enum DeleteError {
+    /// The file does not exists.
+    FileDoesNotExist(PathBuf),
+    /// Any kind of IO error that happens when
+    /// interacting with the underlying IO device.
+    IOError(io::Error),
+    /// The file may not be deleted because it is
+    /// protected.
+    FileProtected(PathBuf),
+}
diff --git a/src/directory/managed_directory.rs b/src/directory/managed_directory.rs
index e764471fc..ffa1697af 100644
--- a/src/directory/managed_directory.rs
+++ b/src/directory/managed_directory.rs
@@ -1,5 +1,5 @@
 use std::path::{Path, PathBuf};
-use directory::error::{FileError, OpenWriteError};
+use directory::error::{FileError, DeleteError, OpenWriteError};
 use directory::{ReadOnlySource, WritePtr};
 use std::result;
 use std::io;
@@ -9,6 +9,8 @@
 use std::collections::HashSet;
 use std::io::Write;
 use rustc_serialize::json;
 use core::MANAGED_FILEPATH;
+use std::collections::HashMap;
+use std::fmt;
 use Result;
 use Error;
@@ -24,7 +26,30 @@
 #[derive(Debug)]
 pub struct ManagedDirectory {
     directory: Box<Directory>,
-    managed_paths: Arc<RwLock<HashSet<PathBuf>>>,
+    meta_informations: Arc<RwLock<MetaInformation>>,
+}
+
+#[derive(Debug, Default)]
+struct MetaInformation {
+    managed_paths: HashSet<PathBuf>,
+    protected_files: HashMap<PathBuf, usize>,
+}
+
+pub struct FileProtection {
+    directory: ManagedDirectory,
+    path: PathBuf,
+}
+
+impl fmt::Debug for FileProtection {
+    fn fmt(&self, formatter: &mut fmt::Formatter) -> result::Result<(), fmt::Error> {
+        write!(formatter, "FileProtectionFor({:?})", self.path)
+    }
+}
+
+impl Drop for FileProtection {
+    fn drop(&mut self) {
+        self.directory.unprotect_file_from_delete(&self.path);
+    }
 }
 
 impl ManagedDirectory {
@@ -38,13 +63,17 @@
                     .map_err(|e| Error::CorruptedFile(MANAGED_FILEPATH.clone(), Box::new(e)))?;
                 Ok(ManagedDirectory {
                     directory: box directory,
-                    managed_paths: Arc::new(RwLock::new(managed_files)),
+                    meta_informations: Arc::new(RwLock::new(
+                        MetaInformation {
+                            managed_paths: managed_files,
+                            protected_files: HashMap::default()
+                        })),
                 })
             }
             Err(FileError::FileDoesNotExist(_)) => {
                 Ok(ManagedDirectory {
                     directory: box directory,
-                    managed_paths: Arc::default(),
+                    meta_informations: Arc::default(),
                 })
             }
             Err(FileError::IOError(e)) => {
@@ -65,54 +94,98 @@
     /// an error is simply logged, and the file remains in the list of managed
     /// files.
     pub fn garbage_collect(&mut self, living_files: HashSet<PathBuf>) {
-        let mut managed_has_changed: bool = false;
-        {
-            let mut files_to_delete = vec!();
-            let mut managed_paths_write = self.managed_paths.write().unwrap();
+        let mut files_to_delete = vec!();
+        { // releasing the lock as .delete() will use it too.
+            let mut meta_informations_wlock = self.meta_informations.write().unwrap();
+            let managed_paths_write = &mut meta_informations_wlock.managed_paths;
             for managed_path in managed_paths_write.iter() {
                 if !living_files.contains(managed_path) {
                     files_to_delete.push(managed_path.clone());
                 }
             }
+        }
+
+        let mut deleted_files = vec!();
+        {
             for file_to_delete in files_to_delete {
-                match self.directory.delete(&file_to_delete) {
+                match self.delete(&file_to_delete) {
                     Ok(_) => {
                         info!("Deleted {:?}", file_to_delete);
-                        managed_has_changed |= managed_paths_write.remove(&file_to_delete);
+                        deleted_files.push(file_to_delete);
                     }
                     Err(file_error) => {
                         error!("Failed to delete {:?}", file_to_delete);
                         match file_error {
-                            FileError::FileDoesNotExist(_) => {
-                                managed_has_changed |= managed_paths_write.remove(&file_to_delete);
+                            DeleteError::FileDoesNotExist(_) => {
+                                deleted_files.push(file_to_delete);
                             }
-                            FileError::IOError(_) => {
+                            DeleteError::IOError(_) => {
                                 if !cfg!(target_os = "windows") {
                                     error!("Failed to delete {:?}", file_to_delete);
                                 }
                             }
-
+                            DeleteError::FileProtected(_) => {
+                                // this is expected.
+                            }
                         }
                     }
                 }
            }
        }
-        if managed_has_changed {
+
+
+        if !deleted_files.is_empty() {
+            // update the list of managed files by removing
+            // the file that were removed.
+            {
+                let mut meta_informations_wlock = self.meta_informations.write().unwrap();
+                let managed_paths_write = &mut meta_informations_wlock.managed_paths;
+                for delete_file in &deleted_files {
+                    managed_paths_write.remove(delete_file);
+                }
+            }
             if let Err(_) = self.save_managed_paths() {
                 error!("Failed to save the list of managed files.");
             }
         }
+
+    }
+
+    pub fn protect_file_from_delete(&self, path: &Path) -> FileProtection {
+        let mut meta_informations_wlock = self.meta_informations
+            .write()
+            .expect("Managed file lock poisoned");
+        let pathbuf = path.to_owned();
+        *meta_informations_wlock
+            .protected_files
+            .entry(pathbuf.clone())
+            .or_insert(0) += 1;
+        FileProtection {
+            directory: self.clone(),
+            path: pathbuf.clone(),
+        }
+    }
+
+    pub fn unprotect_file_from_delete(&self, path: &Path) {
+        let mut meta_informations_wlock = self.meta_informations
+            .write()
+            .expect("Managed file lock poisoned");
+        if let Some(counter_ref_mut) = meta_informations_wlock
+            .protected_files
+            .get_mut(path) {
+            (*counter_ref_mut) -= 1;
+        }
     }
 
     /// Saves the file containing the list of existing files
     /// that were created by tantivy.
     fn save_managed_paths(&mut self,) -> io::Result<()> {
-        let managed_files_lock = self.managed_paths
+        let meta_informations_rlock = self.meta_informations
            .read()
            .expect("Managed file lock poisoned");
        let mut w = vec!();
-        try!(write!(&mut w, "{}\n", json::as_pretty_json(&*managed_files_lock)));
+        try!(write!(&mut w, "{}\n", json::as_pretty_json(&meta_informations_rlock.managed_paths)));
        self.directory.atomic_write(&MANAGED_FILEPATH, &w[..])?;
        Ok(())
     }
@@ -126,11 +199,10 @@
     /// never get removed.
     fn register_file_as_managed(&mut self, filepath: &Path) -> io::Result<()> {
         let has_changed = {
-            let mut managed_files_lock = self
-                .managed_paths
+            let mut meta_wlock = self.meta_informations
                 .write()
                 .expect("Managed file lock poisoned");
-            managed_files_lock.insert(filepath.to_owned())
+            meta_wlock.managed_paths.insert(filepath.to_owned())
         };
         if has_changed {
             self.save_managed_paths()?;
@@ -159,7 +231,17 @@
         self.directory.atomic_read(path)
     }
 
-    fn delete(&self, path: &Path) -> result::Result<(), FileError> {
+    fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
+        {
+            let metas_rlock = self.meta_informations
+                .read()
+                .expect("poisoned lock in managed directory meta");
+            if let Some(counter) = metas_rlock.protected_files.get(path) {
+                if *counter > 0 {
+                    return Err(DeleteError::FileProtected(path.to_owned()))
+                }
+            }
+        }
         self.directory.delete(path)
     }
 
@@ -177,7 +259,7 @@ impl Clone for ManagedDirectory {
     fn clone(&self) -> ManagedDirectory {
         ManagedDirectory {
             directory: self.directory.box_clone(),
-            managed_paths: self.managed_paths.clone(),
+            meta_informations: self.meta_informations.clone(),
         }
     }
 }
diff --git a/src/directory/mmap_directory.rs b/src/directory/mmap_directory.rs
index 6b2730a4b..9a5670b7f 100644
--- a/src/directory/mmap_directory.rs
+++ b/src/directory/mmap_directory.rs
@@ -1,7 +1,7 @@
 use atomicwrites;
 use common::make_io_err;
 use directory::Directory;
-use directory::error::{OpenWriteError, FileError, OpenDirectoryError};
+use directory::error::{OpenWriteError, FileError, DeleteError, OpenDirectoryError};
 use directory::ReadOnlySource;
 use directory::shared_vec_slice::SharedVecSlice;
 use directory::WritePtr;
@@ -334,13 +334,13 @@ impl Directory for MmapDirectory {
         Ok(BufWriter::new(Box::new(writer)))
     }
 
-    fn delete(&self, path: &Path) -> result::Result<(), FileError> {
+    fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
         debug!("Deleting file {:?}", path);
         let full_path = self.resolve_path(path);
         let mut mmap_cache = try!(self.mmap_cache
             .write()
             .map_err(|_|
-                FileError::IOError(make_io_err(format!("Failed to acquired write lock on mmap cache while deleting {:?}", path))))
+                DeleteError::IOError(make_io_err(format!("Failed to acquired write lock on mmap cache while deleting {:?}", path))))
         );
         // Removing the entry in the MMap cache.
         // The munmap will appear on Drop,
@@ -349,14 +349,14 @@
         match fs::remove_file(&full_path) {
             Ok(_) => {
                 self.sync_directory()
-                    .map_err(|e| FileError::IOError(e))
+                    .map_err(|e| DeleteError::IOError(e))
             }
             Err(e) => {
                 if e.kind() == io::ErrorKind::NotFound {
-                    Err(FileError::FileDoesNotExist(path.to_owned()))
+                    Err(DeleteError::FileDoesNotExist(path.to_owned()))
                 } else {
-                    Err(FileError::IOError(e))
+                    Err(DeleteError::IOError(e))
                 }
             }
         }
     }
diff --git a/src/directory/mod.rs b/src/directory/mod.rs
index f0bf91101..09f61da3e 100644
--- a/src/directory/mod.rs
+++ b/src/directory/mod.rs
@@ -15,7 +15,7 @@ pub use self::read_only_source::ReadOnlySource;
 pub use self::directory::Directory;
 pub use self::ram_directory::RAMDirectory;
 pub use self::mmap_directory::MmapDirectory;
-pub use self::managed_directory::ManagedDirectory;
+pub use self::managed_directory::{ManagedDirectory, FileProtection};
 
 /// Synonym of Seek + Write
 pub trait SeekableWrite: Seek + Write {}
diff --git a/src/directory/ram_directory.rs b/src/directory/ram_directory.rs
index 32d44e184..b8a3b5c95 100644
--- a/src/directory/ram_directory.rs
+++ b/src/directory/ram_directory.rs
@@ -6,7 +6,7 @@ use std::result;
 use std::sync::{Arc, RwLock};
 use common::make_io_err;
 use directory::{Directory, ReadOnlySource};
-use directory::error::{OpenWriteError, FileError};
+use directory::error::{OpenWriteError, FileError, DeleteError};
 use directory::WritePtr;
 use super::shared_vec_slice::SharedVecSlice;
 
@@ -104,12 +104,12 @@
         })
     }
 
-    fn delete(&self, path: &Path) -> result::Result<(), FileError> {
+    fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
         self.0
             .write()
             .map_err(|_| {
                 let io_err = make_io_err(format!("Failed to acquire write lock for the directory, when trying to delete {:?}", path));
-                FileError::IOError(io_err)
+                DeleteError::IOError(io_err)
             })
             .and_then(|mut writable_map| {
                 match writable_map.remove(path) {
@@ -117,7 +117,7 @@ impl InnerDirectory {
                         Ok(())
                     },
                     None => {
-                        Err(FileError::FileDoesNotExist(PathBuf::from(path)))
+                        Err(DeleteError::FileDoesNotExist(PathBuf::from(path)))
                     }
                 }
             })
@@ -176,7 +176,7 @@
         }
     }
 
-    fn delete(&self, path: &Path) -> result::Result<(), FileError> {
+    fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
        self.fs.delete(path)
    }
 
diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs
index 756d37242..a050dff4d 100644
--- a/src/indexer/index_writer.rs
+++ b/src/indexer/index_writer.rs
@@ -8,6 +8,7 @@ use core::SegmentMeta;
 use core::SegmentReader;
 use indexer::stamper::Stamper;
 use datastruct::stacker::Heap;
+use directory::FileProtection;
 use Error;
 use Directory;
 use fastfield::delete::write_delete_bitset;
@@ -207,13 +208,15 @@ pub fn compute_deleted_bitset(
 
 pub fn advance_deletes(
     mut segment: Segment,
     segment_entry: &mut SegmentEntry,
-    target_opstamp: u64) -> Result<()> {
+    target_opstamp: u64) -> Result<Option<FileProtection>> {
+
+    let mut file_protect: Option<FileProtection> = None;
     {
         if let Some(previous_opstamp) = segment_entry.meta().delete_opstamp() {
             // We are already up-to-date here.
             if target_opstamp == previous_opstamp {
-                return Ok(());
+                return Ok(file_protect);
             }
         }
         let segment_reader = SegmentReader::open(segment.clone())?;
@@ -245,13 +248,14 @@ pub fn advance_deletes(
         let num_deleted_docs = delete_bitset.len();
         if num_deleted_docs > 0 {
             segment.set_delete_meta(num_deleted_docs as u32, target_opstamp);
+            file_protect = Some(segment.protect_from_delete(SegmentComponent::DELETE));
             let mut delete_file = segment.open_write(SegmentComponent::DELETE)?;
             write_delete_bitset(&delete_bitset, &mut delete_file)?;
         }
     }
 
     segment_entry.set_meta(segment.meta().clone());
-    Ok(())
+    Ok(file_protect)
 }
 
 fn index_documents(heap: &mut Heap,
diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs
index ca3c61684..651e9077b 100644
--- a/src/indexer/segment_updater.rs
+++ b/src/indexer/segment_updater.rs
@@ -14,6 +14,7 @@ use futures_cpupool::CpuPool;
 use futures::Future;
 use futures::Canceled;
 use futures::oneshot;
+use directory::FileProtection;
 use indexer::{MergePolicy, DefaultMergePolicy};
 use indexer::index_writer::advance_deletes;
 use indexer::MergeCandidate;
@@ -105,12 +106,17 @@ fn perform_merge(segment_ids: &[SegmentId],
     let ref index = segment_updater.0.index;
     let schema = index.schema();
     let mut segment_entries = vec!();
+
+    let mut file_protections: Vec<FileProtection> = vec!();
+
     for segment_id in segment_ids {
         if let Some(mut segment_entry) = segment_updater.0
             .segment_manager
             .segment_entry(segment_id) {
             let segment = index.segment(segment_entry.meta().clone());
-            advance_deletes(segment, &mut segment_entry, target_opstamp)?;
+            if let Some(file_protection) = advance_deletes(segment, &mut segment_entry, target_opstamp)? {
+                file_protections.push(file_protection);
+            }
             segment_entries.push(segment_entry);
         }
         else {
@@ -119,14 +125,6 @@ fn perform_merge(segment_ids: &[SegmentId],
         }
     }
 
-    // TODO REMOVEEEEE THIIIIIS
-    {
-        let living_files = segment_updater.0.segment_manager.list_files();
-        let mut index = merged_segment.index().clone();
-        index.directory_mut().garbage_collect(living_files);
-    }
-
-
     let delete_cursor = segment_entries[0].delete_cursor().clone();
 
     let segments: Vec<Segment> = segment_entries
@@ -135,10 +133,11 @@ fn perform_merge(segment_ids: &[SegmentId],
             index.segment(segment_entry.meta().clone())
         })
         .collect();
+
     // An IndexMerger is like a "view" of our merged segments.
     let merger: IndexMerger = IndexMerger::open(schema, &segments[..])?;
-
+    
     // ... we just serialize this index merger in our new segment
     // to merge the two segments.
 
 
@@ -317,13 +316,11 @@ impl SegmentUpdater {
                     let _merging_future_res = merging_future_send.send(merged_segment_meta);
                 }
                 Err(e) => {
+                    error!("Merge of {:?} was cancelled: {:?}", segment_ids_vec, e);
                     // ... cancel merge
                     if cfg!(test) {
                         panic!("Merge failed.");
                     }
-                    else {
-                        error!("Merge of {:?} was cancelled: {:?}", segment_ids_vec, e);
-                    }
                     segment_updater_clone.cancel_merge(&segment_ids_vec, merged_segment_id);
                     // merging_future_send will be dropped, sending an error to the future.
                 }