mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-05-30 15:10:40 +00:00
Fix managed paths (#5)
This commit is contained in:
@@ -58,3 +58,9 @@ pub static META_LOCK: Lazy<Lock> = Lazy::new(|| Lock {
|
||||
filepath: PathBuf::from(".tantivy-meta.lock"),
|
||||
is_blocking: true,
|
||||
});
|
||||
|
||||
/// Lock backed by the `.tantivy-managed.lock` file.
///
/// `ManagedDirectory` acquires it before reading or rewriting its list of
/// managed paths, so that a read-modify-write cycle on that list is not
/// interleaved with another one.
///
/// NOTE(review): `is_blocking: true` presumably makes acquisition wait for the
/// lock instead of failing immediately — confirm against the `Lock` docs.
#[allow(missing_docs)]
|
||||
pub static MANAGED_LOCK: Lazy<Lock> = Lazy::new(|| Lock {
|
||||
filepath: PathBuf::from(".tantivy-managed.lock"),
|
||||
is_blocking: true,
|
||||
});
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use std::collections::HashSet;
|
||||
use std::io::Write;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::{Arc, RwLock, RwLockWriteGuard};
|
||||
use std::sync::Arc;
|
||||
use std::{io, result};
|
||||
|
||||
use crc32fast::Hasher;
|
||||
@@ -11,7 +11,7 @@ use crate::directory::error::{DeleteError, LockError, OpenReadError, OpenWriteEr
|
||||
use crate::directory::footer::{Footer, FooterProxy};
|
||||
use crate::directory::{
|
||||
DirectoryLock, FileHandle, FileSlice, GarbageCollectionResult, Lock, WatchCallback,
|
||||
WatchHandle, WritePtr, META_LOCK,
|
||||
WatchHandle, WritePtr, MANAGED_LOCK, META_LOCK,
|
||||
};
|
||||
use crate::error::DataCorruption;
|
||||
use crate::Directory;
|
||||
@@ -39,9 +39,9 @@ fn is_managed(path: &Path) -> bool {
|
||||
#[derive(Debug)]
|
||||
pub struct ManagedDirectory {
|
||||
directory: Box<dyn Directory>,
|
||||
meta_informations: Arc<RwLock<MetaInformation>>,
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
#[derive(Debug, Default)]
|
||||
struct MetaInformation {
|
||||
managed_paths: HashSet<PathBuf>,
|
||||
@@ -51,9 +51,9 @@ struct MetaInformation {
|
||||
/// that were created by tantivy.
|
||||
fn save_managed_paths(
|
||||
directory: &dyn Directory,
|
||||
wlock: &RwLockWriteGuard<'_, MetaInformation>,
|
||||
managed_paths: &HashSet<PathBuf>,
|
||||
) -> io::Result<()> {
|
||||
let mut w = serde_json::to_vec(&wlock.managed_paths)?;
|
||||
let mut w = serde_json::to_vec(managed_paths)?;
|
||||
writeln!(&mut w)?;
|
||||
directory.atomic_write(&MANAGED_FILEPATH, &w[..])?;
|
||||
Ok(())
|
||||
@@ -62,7 +62,12 @@ fn save_managed_paths(
|
||||
impl ManagedDirectory {
|
||||
/// Wraps a directory as managed directory.
|
||||
pub fn wrap(directory: Box<dyn Directory>) -> crate::Result<ManagedDirectory> {
|
||||
match directory.atomic_read(&MANAGED_FILEPATH) {
|
||||
Ok(ManagedDirectory { directory })
|
||||
}
|
||||
|
||||
#[allow(missing_docs)]
|
||||
pub fn get_managed_paths(&self) -> crate::Result<HashSet<PathBuf>> {
|
||||
match self.directory.atomic_read(&MANAGED_FILEPATH) {
|
||||
Ok(data) => {
|
||||
let managed_files_json = String::from_utf8_lossy(&data);
|
||||
let managed_files: HashSet<PathBuf> = serde_json::from_str(&managed_files_json)
|
||||
@@ -72,17 +77,9 @@ impl ManagedDirectory {
|
||||
format!("Managed file cannot be deserialized: {e:?}. "),
|
||||
)
|
||||
})?;
|
||||
Ok(ManagedDirectory {
|
||||
directory,
|
||||
meta_informations: Arc::new(RwLock::new(MetaInformation {
|
||||
managed_paths: managed_files,
|
||||
})),
|
||||
})
|
||||
Ok(managed_files)
|
||||
}
|
||||
Err(OpenReadError::FileDoesNotExist(_)) => Ok(ManagedDirectory {
|
||||
directory,
|
||||
meta_informations: Arc::default(),
|
||||
}),
|
||||
Err(OpenReadError::FileDoesNotExist(_)) => Ok(HashSet::new()),
|
||||
io_err @ Err(OpenReadError::IoError { .. }) => Err(io_err.err().unwrap().into()),
|
||||
Err(OpenReadError::IncompatibleIndex(incompatibility)) => {
|
||||
// For the moment, this should never happen `meta.json`
|
||||
@@ -110,9 +107,11 @@ impl ManagedDirectory {
|
||||
&mut self,
|
||||
get_living_files: L,
|
||||
) -> crate::Result<GarbageCollectionResult> {
|
||||
info!("Garbage collect");
|
||||
let mut files_to_delete = vec![];
|
||||
|
||||
// We're about to do an atomic write to managed.json, lock it down
|
||||
let _lock = self.acquire_lock(&MANAGED_LOCK)?;
|
||||
let managed_paths = self.get_managed_paths()?;
|
||||
// It is crucial to get the living files after acquiring the
|
||||
// managed lock and loading the managed paths. That way, we
|
||||
// avoid the following scenario.
|
||||
@@ -124,11 +123,6 @@ impl ManagedDirectory {
|
||||
//
|
||||
// releasing the lock as .delete() will use it too.
|
||||
{
|
||||
let meta_informations_rlock = self
|
||||
.meta_informations
|
||||
.read()
|
||||
.expect("Managed directory rlock poisoned in garbage collect.");
|
||||
|
||||
// The point of this second "file" lock is to enforce the following scenario
|
||||
// 1) process B tries to load a new set of searcher.
|
||||
// The list of segments is loaded
|
||||
@@ -138,7 +132,7 @@ impl ManagedDirectory {
|
||||
match self.acquire_lock(&META_LOCK) {
|
||||
Ok(_meta_lock) => {
|
||||
let living_files = get_living_files();
|
||||
for managed_path in &meta_informations_rlock.managed_paths {
|
||||
for managed_path in &managed_paths {
|
||||
if !living_files.contains(managed_path) {
|
||||
files_to_delete.push(managed_path.clone());
|
||||
}
|
||||
@@ -181,16 +175,12 @@ impl ManagedDirectory {
|
||||
if !deleted_files.is_empty() {
|
||||
// update the list of managed files by removing
|
||||
// the files that were removed.
|
||||
let mut meta_informations_wlock = self
|
||||
.meta_informations
|
||||
.write()
|
||||
.expect("Managed directory wlock poisoned (2).");
|
||||
let managed_paths_write = &mut meta_informations_wlock.managed_paths;
|
||||
let mut managed_paths_write = managed_paths;
|
||||
for delete_file in &deleted_files {
|
||||
managed_paths_write.remove(delete_file);
|
||||
}
|
||||
self.directory.sync_directory()?;
|
||||
save_managed_paths(self.directory.as_mut(), &meta_informations_wlock)?;
|
||||
save_managed_paths(self.directory.as_mut(), &managed_paths_write)?;
|
||||
}
|
||||
|
||||
Ok(GarbageCollectionResult {
|
||||
@@ -215,15 +205,20 @@ impl ManagedDirectory {
|
||||
if !is_managed(filepath) {
|
||||
return Ok(());
|
||||
}
|
||||
let mut meta_wlock = self
|
||||
.meta_informations
|
||||
.write()
|
||||
.expect("Managed file lock poisoned");
|
||||
let has_changed = meta_wlock.managed_paths.insert(filepath.to_owned());
|
||||
|
||||
// We're about to do an atomic write to managed.json, lock it down
|
||||
let _lock = self
|
||||
.acquire_lock(&MANAGED_LOCK)
|
||||
.expect("must be able to acquire lock for managed.json");
|
||||
|
||||
let mut managed_paths = self
|
||||
.get_managed_paths()
|
||||
.expect("reading managed files should not fail");
|
||||
let has_changed = managed_paths.insert(filepath.to_owned());
|
||||
if !has_changed {
|
||||
return Ok(());
|
||||
}
|
||||
save_managed_paths(self.directory.as_ref(), &meta_wlock)?;
|
||||
save_managed_paths(self.directory.as_ref(), &managed_paths)?;
|
||||
// This is not the first file we add.
|
||||
// Therefore, we are sure that `.managed.json` has already been
|
||||
// properly created and we do not need to sync its parent directory.
|
||||
@@ -231,11 +226,12 @@ impl ManagedDirectory {
|
||||
// (It might seem like a nicer solution to create the managed_json on the
|
||||
// creation of the ManagedDirectory instance but it would actually
|
||||
// prevent the use of read-only directories..)
|
||||
let managed_file_definitely_already_exists = meta_wlock.managed_paths.len() > 1;
|
||||
let managed_file_definitely_already_exists = managed_paths.len() > 1;
|
||||
if managed_file_definitely_already_exists {
|
||||
return Ok(());
|
||||
}
|
||||
self.directory.sync_directory()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -258,13 +254,11 @@ impl ManagedDirectory {
|
||||
|
||||
/// List all managed files
|
||||
pub fn list_managed_files(&self) -> HashSet<PathBuf> {
|
||||
let managed_paths = self
|
||||
.meta_informations
|
||||
.read()
|
||||
.expect("Managed directory rlock poisoned in list damaged.")
|
||||
.managed_paths
|
||||
.clone();
|
||||
managed_paths
|
||||
let _lock = self
|
||||
.acquire_lock(&MANAGED_LOCK)
|
||||
.expect("must be able to acquire lock for managed.json");
|
||||
self.get_managed_paths()
|
||||
.expect("reading managed files should not fail")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -329,7 +323,6 @@ impl Clone for ManagedDirectory {
|
||||
fn clone(&self) -> ManagedDirectory {
|
||||
ManagedDirectory {
|
||||
directory: self.directory.box_clone(),
|
||||
meta_informations: Arc::clone(&self.meta_informations),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,7 +24,7 @@ pub use common::{AntiCallToken, OwnedBytes, TerminatingWrite};
|
||||
|
||||
pub(crate) use self::composite_file::{CompositeFile, CompositeWrite};
|
||||
pub use self::directory::{Directory, DirectoryClone, DirectoryLock};
|
||||
pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, META_LOCK};
|
||||
pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, MANAGED_LOCK, META_LOCK};
|
||||
pub use self::ram_directory::RamDirectory;
|
||||
pub use self::watch_event_router::{WatchCallback, WatchCallbackList, WatchHandle};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user