Fix managed paths (#5)

This commit is contained in:
Ming
2024-10-23 22:18:34 -04:00
committed by Philippe Noël
parent 17d366eb51
commit feab1647a2
6 changed files with 54 additions and 63 deletions

View File

@@ -58,3 +58,9 @@ pub static META_LOCK: Lazy<Lock> = Lazy::new(|| Lock {
filepath: PathBuf::from(".tantivy-meta.lock"),
is_blocking: true,
});
/// Lock whose lock file is `.tantivy-managed.lock`, declared with
/// `is_blocking: true`.
///
/// The managed directory acquires it before reading or atomically rewriting
/// the list of managed paths, so that the read-modify-write sequence on that
/// list is performed while the lock is held.
#[allow(missing_docs)]
pub static MANAGED_LOCK: Lazy<Lock> = Lazy::new(|| Lock {
filepath: PathBuf::from(".tantivy-managed.lock"),
is_blocking: true,
});

View File

@@ -1,7 +1,7 @@
use std::collections::HashSet;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock, RwLockWriteGuard};
use std::sync::Arc;
use std::{io, result};
use crc32fast::Hasher;
@@ -11,7 +11,7 @@ use crate::directory::error::{DeleteError, LockError, OpenReadError, OpenWriteEr
use crate::directory::footer::{Footer, FooterProxy};
use crate::directory::{
DirectoryLock, FileHandle, FileSlice, GarbageCollectionResult, Lock, WatchCallback,
WatchHandle, WritePtr, META_LOCK,
WatchHandle, WritePtr, MANAGED_LOCK, META_LOCK,
};
use crate::error::DataCorruption;
use crate::Directory;
@@ -39,9 +39,9 @@ fn is_managed(path: &Path) -> bool {
#[derive(Debug)]
pub struct ManagedDirectory {
directory: Box<dyn Directory>,
meta_informations: Arc<RwLock<MetaInformation>>,
}
#[allow(unused)]
#[derive(Debug, Default)]
struct MetaInformation {
managed_paths: HashSet<PathBuf>,
@@ -51,9 +51,9 @@ struct MetaInformation {
/// that were created by tantivy.
fn save_managed_paths(
directory: &dyn Directory,
wlock: &RwLockWriteGuard<'_, MetaInformation>,
managed_paths: &HashSet<PathBuf>,
) -> io::Result<()> {
let mut w = serde_json::to_vec(&wlock.managed_paths)?;
let mut w = serde_json::to_vec(managed_paths)?;
writeln!(&mut w)?;
directory.atomic_write(&MANAGED_FILEPATH, &w[..])?;
Ok(())
@@ -62,7 +62,12 @@ fn save_managed_paths(
impl ManagedDirectory {
/// Wraps a directory as managed directory.
pub fn wrap(directory: Box<dyn Directory>) -> crate::Result<ManagedDirectory> {
match directory.atomic_read(&MANAGED_FILEPATH) {
Ok(ManagedDirectory { directory })
}
#[allow(missing_docs)]
pub fn get_managed_paths(&self) -> crate::Result<HashSet<PathBuf>> {
match self.directory.atomic_read(&MANAGED_FILEPATH) {
Ok(data) => {
let managed_files_json = String::from_utf8_lossy(&data);
let managed_files: HashSet<PathBuf> = serde_json::from_str(&managed_files_json)
@@ -72,17 +77,9 @@ impl ManagedDirectory {
format!("Managed file cannot be deserialized: {e:?}. "),
)
})?;
Ok(ManagedDirectory {
directory,
meta_informations: Arc::new(RwLock::new(MetaInformation {
managed_paths: managed_files,
})),
})
Ok(managed_files)
}
Err(OpenReadError::FileDoesNotExist(_)) => Ok(ManagedDirectory {
directory,
meta_informations: Arc::default(),
}),
Err(OpenReadError::FileDoesNotExist(_)) => Ok(HashSet::new()),
io_err @ Err(OpenReadError::IoError { .. }) => Err(io_err.err().unwrap().into()),
Err(OpenReadError::IncompatibleIndex(incompatibility)) => {
// For the moment, this should never happen `meta.json`
@@ -110,9 +107,11 @@ impl ManagedDirectory {
&mut self,
get_living_files: L,
) -> crate::Result<GarbageCollectionResult> {
info!("Garbage collect");
let mut files_to_delete = vec![];
// We're about to do an atomic write to managed.json, lock it down
let _lock = self.acquire_lock(&MANAGED_LOCK)?;
let managed_paths = self.get_managed_paths()?;
// It is crucial to get the living files after acquiring the
// managed lock and reading the list of managed paths. That way, we
// avoid the following scenario.
@@ -124,11 +123,6 @@ impl ManagedDirectory {
//
// releasing the lock as .delete() will use it too.
{
let meta_informations_rlock = self
.meta_informations
.read()
.expect("Managed directory rlock poisoned in garbage collect.");
// The point of this second "file" lock is to enforce the following scenario
// 1) process B tries to load a new set of searcher.
// The list of segments is loaded
@@ -138,7 +132,7 @@ impl ManagedDirectory {
match self.acquire_lock(&META_LOCK) {
Ok(_meta_lock) => {
let living_files = get_living_files();
for managed_path in &meta_informations_rlock.managed_paths {
for managed_path in &managed_paths {
if !living_files.contains(managed_path) {
files_to_delete.push(managed_path.clone());
}
@@ -181,16 +175,12 @@ impl ManagedDirectory {
if !deleted_files.is_empty() {
// Update the list of managed files by removing
// the files that were deleted.
let mut meta_informations_wlock = self
.meta_informations
.write()
.expect("Managed directory wlock poisoned (2).");
let managed_paths_write = &mut meta_informations_wlock.managed_paths;
let mut managed_paths_write = managed_paths;
for delete_file in &deleted_files {
managed_paths_write.remove(delete_file);
}
self.directory.sync_directory()?;
save_managed_paths(self.directory.as_mut(), &meta_informations_wlock)?;
save_managed_paths(self.directory.as_mut(), &managed_paths_write)?;
}
Ok(GarbageCollectionResult {
@@ -215,15 +205,20 @@ impl ManagedDirectory {
if !is_managed(filepath) {
return Ok(());
}
let mut meta_wlock = self
.meta_informations
.write()
.expect("Managed file lock poisoned");
let has_changed = meta_wlock.managed_paths.insert(filepath.to_owned());
// We're about to do an atomic write to managed.json, lock it down
let _lock = self
.acquire_lock(&MANAGED_LOCK)
.expect("must be able to acquire lock for managed.json");
let mut managed_paths = self
.get_managed_paths()
.expect("reading managed files should not fail");
let has_changed = managed_paths.insert(filepath.to_owned());
if !has_changed {
return Ok(());
}
save_managed_paths(self.directory.as_ref(), &meta_wlock)?;
save_managed_paths(self.directory.as_ref(), &managed_paths)?;
// This is not the first file we add.
// Therefore, we are sure that `.managed.json` has been already
// properly created and we do not need to sync its parent directory.
@@ -231,11 +226,12 @@ impl ManagedDirectory {
// (It might seem like a nicer solution to create the managed_json on the
// creation of the ManagedDirectory instance but it would actually
// prevent the use of read-only directories..)
let managed_file_definitely_already_exists = meta_wlock.managed_paths.len() > 1;
let managed_file_definitely_already_exists = managed_paths.len() > 1;
if managed_file_definitely_already_exists {
return Ok(());
}
self.directory.sync_directory()?;
Ok(())
}
@@ -258,13 +254,11 @@ impl ManagedDirectory {
/// List all managed files
pub fn list_managed_files(&self) -> HashSet<PathBuf> {
let managed_paths = self
.meta_informations
.read()
.expect("Managed directory rlock poisoned in list damaged.")
.managed_paths
.clone();
managed_paths
let _lock = self
.acquire_lock(&MANAGED_LOCK)
.expect("must be able to acquire lock for managed.json");
self.get_managed_paths()
.expect("reading managed files should not fail")
}
}
@@ -329,7 +323,6 @@ impl Clone for ManagedDirectory {
fn clone(&self) -> ManagedDirectory {
ManagedDirectory {
directory: self.directory.box_clone(),
meta_informations: Arc::clone(&self.meta_informations),
}
}
}

View File

@@ -24,7 +24,7 @@ pub use common::{AntiCallToken, OwnedBytes, TerminatingWrite};
pub(crate) use self::composite_file::{CompositeFile, CompositeWrite};
pub use self::directory::{Directory, DirectoryClone, DirectoryLock};
pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, META_LOCK};
pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, MANAGED_LOCK, META_LOCK};
pub use self::ram_directory::RamDirectory;
pub use self::watch_event_router::{WatchCallback, WatchCallbackList, WatchHandle};

View File

@@ -11,16 +11,14 @@ use crate::termdict::TermOrdinal;
/// `TermStreamerBuilder` is a helper object used to define
/// a range of terms that should be streamed.
pub struct TermStreamerBuilder<'a, A = AlwaysMatch>
where
A: Automaton,
where A: Automaton
{
fst_map: &'a TermDictionary,
stream_builder: StreamBuilder<'a, A>,
}
impl<'a, A> TermStreamerBuilder<'a, A>
where
A: Automaton,
where A: Automaton
{
pub(crate) fn new(fst_map: &'a TermDictionary, stream_builder: StreamBuilder<'a, A>) -> Self {
TermStreamerBuilder {
@@ -75,8 +73,7 @@ where
/// `TermStreamer` acts as a cursor over a range of terms of a segment.
/// Terms are guaranteed to be sorted.
pub struct TermStreamer<'a, A = AlwaysMatch>
where
A: Automaton,
where A: Automaton
{
pub(crate) fst_map: &'a TermDictionary,
pub(crate) stream: Stream<'a, A>,

View File

@@ -28,8 +28,7 @@ pub struct TermDictionaryBuilder<W> {
}
impl<W> TermDictionaryBuilder<W>
where
W: Write,
where W: Write
{
/// Creates a new `TermDictionaryBuilder`
pub fn create(w: W) -> io::Result<Self> {

View File

@@ -40,14 +40,12 @@ use common::file_slice::FileSlice;
use common::BinarySerializable;
use tantivy_fst::Automaton;
pub use self::termdict::{TermMerger, TermStreamer, TermWithStateStreamer};
use self::{
fst_termdict::TermWithStateStreamerBuilder,
termdict::{
TermDictionary as InnerTermDict, TermDictionaryBuilder as InnerTermDictBuilder,
TermStreamerBuilder,
},
use self::fst_termdict::TermWithStateStreamerBuilder;
use self::termdict::{
TermDictionary as InnerTermDict, TermDictionaryBuilder as InnerTermDictBuilder,
TermStreamerBuilder,
};
pub use self::termdict::{TermMerger, TermStreamer, TermWithStateStreamer};
use crate::postings::TermInfo;
#[derive(Debug, Eq, PartialEq)]
@@ -161,9 +159,7 @@ impl TermDictionary {
/// Returns a search builder, to stream all of the terms
/// within the Automaton
pub fn search<'a, A: Automaton + 'a>(&'a self, automaton: A) -> TermStreamerBuilder<'a, A>
where
A::State: Clone,
{
where A::State: Clone {
self.0.search(automaton)
}