diff --git a/src/directory/directory_lock.rs b/src/directory/directory_lock.rs index a6321b50b..7159fca06 100644 --- a/src/directory/directory_lock.rs +++ b/src/directory/directory_lock.rs @@ -58,3 +58,9 @@ pub static META_LOCK: Lazy = Lazy::new(|| Lock { filepath: PathBuf::from(".tantivy-meta.lock"), is_blocking: true, }); + +#[allow(missing_docs)] +pub static MANAGED_LOCK: Lazy = Lazy::new(|| Lock { + filepath: PathBuf::from(".tantivy-managed.lock"), + is_blocking: true, +}); diff --git a/src/directory/managed_directory.rs b/src/directory/managed_directory.rs index 754c2ea34..e2d5d3d60 100644 --- a/src/directory/managed_directory.rs +++ b/src/directory/managed_directory.rs @@ -1,7 +1,7 @@ use std::collections::HashSet; use std::io::Write; use std::path::{Path, PathBuf}; -use std::sync::{Arc, RwLock, RwLockWriteGuard}; +use std::sync::Arc; use std::{io, result}; use crc32fast::Hasher; @@ -11,7 +11,7 @@ use crate::directory::error::{DeleteError, LockError, OpenReadError, OpenWriteEr use crate::directory::footer::{Footer, FooterProxy}; use crate::directory::{ DirectoryLock, FileHandle, FileSlice, GarbageCollectionResult, Lock, WatchCallback, - WatchHandle, WritePtr, META_LOCK, + WatchHandle, WritePtr, MANAGED_LOCK, META_LOCK, }; use crate::error::DataCorruption; use crate::Directory; @@ -39,9 +39,9 @@ fn is_managed(path: &Path) -> bool { #[derive(Debug)] pub struct ManagedDirectory { directory: Box, - meta_informations: Arc>, } +#[allow(unused)] #[derive(Debug, Default)] struct MetaInformation { managed_paths: HashSet, @@ -51,9 +51,9 @@ struct MetaInformation { /// that were created by tantivy. fn save_managed_paths( directory: &dyn Directory, - wlock: &RwLockWriteGuard<'_, MetaInformation>, + managed_paths: &HashSet, ) -> io::Result<()> { - let mut w = serde_json::to_vec(&wlock.managed_paths)?; + let mut w = serde_json::to_vec(managed_paths)?; writeln!(&mut w)?; directory.atomic_write(&MANAGED_FILEPATH, &w[..])?; Ok(()) @@ -62,7 +62,12 @@ fn save_managed_paths( impl ManagedDirectory { /// Wraps a directory as managed directory. pub fn wrap(directory: Box) -> crate::Result { - match directory.atomic_read(&MANAGED_FILEPATH) { + Ok(ManagedDirectory { directory }) + } + + #[allow(missing_docs)] + pub fn get_managed_paths(&self) -> crate::Result> { + match self.directory.atomic_read(&MANAGED_FILEPATH) { Ok(data) => { let managed_files_json = String::from_utf8_lossy(&data); let managed_files: HashSet = serde_json::from_str(&managed_files_json) @@ -72,17 +77,9 @@ impl ManagedDirectory { format!("Managed file cannot be deserialized: {e:?}. "), ) })?; - Ok(ManagedDirectory { - directory, - meta_informations: Arc::new(RwLock::new(MetaInformation { - managed_paths: managed_files, - })), - }) + Ok(managed_files) } - Err(OpenReadError::FileDoesNotExist(_)) => Ok(ManagedDirectory { - directory, - meta_informations: Arc::default(), - }), + Err(OpenReadError::FileDoesNotExist(_)) => Ok(HashSet::new()), io_err @ Err(OpenReadError::IoError { .. }) => Err(io_err.err().unwrap().into()), Err(OpenReadError::IncompatibleIndex(incompatibility)) => { // For the moment, this should never happen `meta.json` @@ -110,9 +107,11 @@ impl ManagedDirectory { &mut self, get_living_files: L, ) -> crate::Result { - info!("Garbage collect"); let mut files_to_delete = vec![]; + // We're about to do an atomic write to managed.json, lock it down + let _lock = self.acquire_lock(&MANAGED_LOCK)?; + let managed_paths = self.get_managed_paths()?; // It is crucial to get the living files after acquiring the // read lock of meta information. That way, we // avoid the following scenario. @@ -124,11 +123,6 @@ impl ManagedDirectory { // // releasing the lock as .delete() will use it too. { - let meta_informations_rlock = self - .meta_informations - .read() - .expect("Managed directory rlock poisoned in garbage collect."); - // The point of this second "file" lock is to enforce the following scenario // 1) process B tries to load a new set of searcher. // The list of segments is loaded @@ -138,7 +132,7 @@ impl ManagedDirectory { match self.acquire_lock(&META_LOCK) { Ok(_meta_lock) => { let living_files = get_living_files(); - for managed_path in &meta_informations_rlock.managed_paths { + for managed_path in &managed_paths { if !living_files.contains(managed_path) { files_to_delete.push(managed_path.clone()); } @@ -181,16 +175,12 @@ impl ManagedDirectory { if !deleted_files.is_empty() { // update the list of managed files by removing // the file that were removed. - let mut meta_informations_wlock = self - .meta_informations - .write() - .expect("Managed directory wlock poisoned (2)."); - let managed_paths_write = &mut meta_informations_wlock.managed_paths; + let mut managed_paths_write = managed_paths; for delete_file in &deleted_files { managed_paths_write.remove(delete_file); } self.directory.sync_directory()?; - save_managed_paths(self.directory.as_mut(), &meta_informations_wlock)?; + save_managed_paths(self.directory.as_mut(), &managed_paths_write)?; } Ok(GarbageCollectionResult { @@ -215,15 +205,20 @@ impl ManagedDirectory { if !is_managed(filepath) { return Ok(()); } - let mut meta_wlock = self - .meta_informations - .write() - .expect("Managed file lock poisoned"); - let has_changed = meta_wlock.managed_paths.insert(filepath.to_owned()); + + // We're about to do an atomic write to managed.json, lock it down + let _lock = self + .acquire_lock(&MANAGED_LOCK) + .expect("must be able to acquire lock for managed.json"); + + let mut managed_paths = self + .get_managed_paths() + .expect("reading managed files should not fail"); + let has_changed = managed_paths.insert(filepath.to_owned()); if !has_changed { return Ok(()); } - save_managed_paths(self.directory.as_ref(), &meta_wlock)?; + save_managed_paths(self.directory.as_ref(), &managed_paths)?; // This is not the first file we add. // Therefore, we are sure that `.managed.json` has been already // properly created and we do not need to sync its parent directory. @@ -231,11 +226,12 @@ impl ManagedDirectory { // (It might seem like a nicer solution to create the managed_json on the // creation of the ManagedDirectory instance but it would actually // prevent the use of read-only directories..) - let managed_file_definitely_already_exists = meta_wlock.managed_paths.len() > 1; + let managed_file_definitely_already_exists = managed_paths.len() > 1; if managed_file_definitely_already_exists { return Ok(()); } self.directory.sync_directory()?; + Ok(()) } @@ -258,13 +254,11 @@ impl ManagedDirectory { /// List all managed files pub fn list_managed_files(&self) -> HashSet { - let managed_paths = self - .meta_informations - .read() - .expect("Managed directory rlock poisoned in list damaged.") - .managed_paths - .clone(); - managed_paths + let _lock = self + .acquire_lock(&MANAGED_LOCK) + .expect("must be able to acquire lock for managed.json"); + self.get_managed_paths() + .expect("reading managed files should not fail") } } @@ -329,7 +323,6 @@ impl Clone for ManagedDirectory { fn clone(&self) -> ManagedDirectory { ManagedDirectory { directory: self.directory.box_clone(), - meta_informations: Arc::clone(&self.meta_informations), } } } diff --git a/src/directory/mod.rs b/src/directory/mod.rs index 7fab7e051..8fbebba98 100644 --- a/src/directory/mod.rs +++ b/src/directory/mod.rs @@ -24,7 +24,7 @@ pub use common::{AntiCallToken, OwnedBytes, TerminatingWrite}; pub(crate) use self::composite_file::{CompositeFile, CompositeWrite}; pub use self::directory::{Directory, DirectoryClone, DirectoryLock}; -pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, META_LOCK}; +pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, MANAGED_LOCK, META_LOCK}; pub use self::ram_directory::RamDirectory; pub use self::watch_event_router::{WatchCallback, WatchCallbackList, WatchHandle};