Directory trait can read/write meta/managed

This commit is contained in:
Ming Ying
2024-11-13 10:39:23 -05:00
committed by Stu Hood
parent 6bb3a22c98
commit 3adc85c017
7 changed files with 166 additions and 87 deletions

View File

@@ -1,3 +1,4 @@
use std::collections::HashSet;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::Arc;
@@ -7,6 +8,8 @@ use std::{fmt, io, thread};
use crate::directory::directory_lock::Lock;
use crate::directory::error::{DeleteError, LockError, OpenReadError, OpenWriteError};
use crate::directory::{FileHandle, FileSlice, WatchCallback, WatchHandle, WritePtr};
use crate::index::SegmentMetaInventory;
use crate::IndexMeta;
/// The retry logic for acquiring locks is pretty simple.
/// We just retry `n` times after a given `duration`, both
@@ -223,6 +226,40 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
/// `OnCommitWithDelay` `ReloadPolicy`. Not implementing watch in a `Directory` only prevents
/// the `OnCommitWithDelay` `ReloadPolicy` to work properly.
fn watch(&self, watch_callback: WatchCallback) -> crate::Result<WatchHandle>;
/// Hook letting a directory enumerate its managed files itself, in place of the
/// `ManagedDirectory`'s default `list_managed_files` logic.
///
/// The default implementation returns `TantivyError::InternalError`.
fn list_managed_files(&self) -> crate::Result<HashSet<PathBuf>> {
    let reason = String::from("list_managed_files not implemented");
    Err(crate::TantivyError::InternalError(reason))
}
/// Hook letting a directory record managed files itself, in place of the
/// `ManagedDirectory`'s default bookkeeping.
///
/// The default implementation ignores both arguments and returns
/// `TantivyError::InternalError`.
///
/// NOTE(review): `_overwrite` presumably selects between replacing the whole managed set
/// and adding to it; confirm against the callers.
fn register_files_as_managed(
    &self,
    _files: Vec<PathBuf>,
    _overwrite: bool,
) -> crate::Result<()> {
    let reason = String::from("register_files_as_managed not implemented");
    Err(crate::TantivyError::InternalError(reason))
}
/// Hook letting a directory persist `IndexMeta` itself, in place of the default
/// `save_metas` logic that writes the meta file.
///
/// The default implementation ignores `_meta` and returns
/// `TantivyError::InternalError`.
fn save_metas(&self, _meta: &IndexMeta) -> crate::Result<()> {
    // The message names the method exactly (`save_metas`, not `save_meta`) so it matches
    // the sibling hooks and can be found by searching for the method name.
    Err(crate::TantivyError::InternalError(
        "save_metas not implemented".to_string(),
    ))
}
/// Hook letting a directory load `IndexMeta` itself, in place of the default
/// `load_metas` logic that reads the meta file.
///
/// The default implementation ignores `_inventory` and returns
/// `TantivyError::InternalError`.
fn load_metas(&self, _inventory: &SegmentMetaInventory) -> crate::Result<IndexMeta> {
    let reason = String::from("load_metas not implemented");
    Err(crate::TantivyError::InternalError(reason))
}
}
/// DirectoryClone

View File

@@ -14,7 +14,8 @@ use crate::directory::{
WatchHandle, WritePtr, MANAGED_LOCK, META_LOCK,
};
use crate::error::DataCorruption;
use crate::Directory;
use crate::index::SegmentMetaInventory;
use crate::{Directory, IndexMeta};
/// Returns true if the file is "managed".
/// Non-managed files are not subject to garbage collection.
@@ -65,27 +66,34 @@ impl ManagedDirectory {
Ok(ManagedDirectory { directory })
}
#[allow(missing_docs)]
pub fn get_managed_paths(&self) -> crate::Result<HashSet<PathBuf>> {
match self.directory.atomic_read(&MANAGED_FILEPATH) {
Ok(data) => {
let managed_files_json = String::from_utf8_lossy(&data);
let managed_files: HashSet<PathBuf> = serde_json::from_str(&managed_files_json)
.map_err(|e| {
DataCorruption::new(
MANAGED_FILEPATH.to_path_buf(),
format!("Managed file cannot be deserialized: {e:?}. "),
)
})?;
Ok(managed_files)
}
Err(OpenReadError::FileDoesNotExist(_)) => Ok(HashSet::new()),
io_err @ Err(OpenReadError::IoError { .. }) => Err(io_err.err().unwrap().into()),
Err(OpenReadError::IncompatibleIndex(incompatibility)) => {
// For the moment, this should never happen `meta.json`
// do not have any footer and cannot detect incompatibility.
Err(crate::TantivyError::IncompatibleIndex(incompatibility))
/// Returns the set of managed file paths.
///
/// The wrapped directory is asked first. Only when it answers with
/// `TantivyError::InternalError` is the list read from `MANAGED_FILEPATH` instead;
/// any other result, success or error, is returned unchanged.
///
/// In the fallback path a missing managed file yields an empty set, and content that
/// cannot be parsed as JSON is reported as data corruption.
pub fn list_managed_files(&self) -> crate::Result<HashSet<PathBuf>> {
    // Give the wrapped directory the first chance to answer.
    match self.directory.list_managed_files() {
        Err(crate::TantivyError::InternalError(_)) => {}
        other => return other,
    }

    // Fallback: read the serialized list from the managed file.
    let raw = match self.directory.atomic_read(&MANAGED_FILEPATH) {
        Ok(raw) => raw,
        Err(OpenReadError::FileDoesNotExist(_)) => return Ok(HashSet::new()),
        Err(io_err @ OpenReadError::IoError { .. }) => return Err(io_err.into()),
        Err(OpenReadError::IncompatibleIndex(incompatibility)) => {
            // Not expected at the moment: per the original note, `meta.json` carries no
            // footer, so an incompatibility cannot be detected.
            // NOTE(review): the file read here is the managed file, not `meta.json`;
            // confirm the note applies to it as well.
            return Err(crate::TantivyError::IncompatibleIndex(incompatibility));
        }
    };

    let json = String::from_utf8_lossy(&raw);
    let parsed: HashSet<PathBuf> = serde_json::from_str(&json).map_err(|e| {
        DataCorruption::new(
            MANAGED_FILEPATH.to_path_buf(),
            format!("Managed file cannot be deserialized: {e:?}. "),
        )
    })?;
    Ok(parsed)
}
@@ -111,7 +119,15 @@ impl ManagedDirectory {
// We're about to do an atomic write to managed.json, lock it down
let _lock = self.acquire_lock(&MANAGED_LOCK)?;
let managed_paths = self.get_managed_paths()?;
let managed_paths = match self.directory.list_managed_files() {
Ok(managed_paths) => managed_paths,
Err(crate::TantivyError::InternalError(_)) => {
// The wrapped directory does not provide its own listing: fall back to
// managed.json, where a missing file means there are no managed files.
self.list_managed_files()?
}
Err(err) => return Err(err),
};
// It is crucial to get the living files after acquiring the
// read lock of meta information. That way, we
// avoid the following scenario.
@@ -180,7 +196,13 @@ impl ManagedDirectory {
managed_paths_write.remove(delete_file);
}
self.directory.sync_directory()?;
save_managed_paths(self.directory.as_mut(), &managed_paths_write)?;
if let Err(crate::TantivyError::InternalError(_)) = self
.directory
.register_files_as_managed(managed_paths_write.clone().into_iter().collect(), true)
{
save_managed_paths(self.directory.as_mut(), &managed_paths_write)?;
}
}
Ok(GarbageCollectionResult {
@@ -211,25 +233,31 @@ impl ManagedDirectory {
.acquire_lock(&MANAGED_LOCK)
.expect("must be able to acquire lock for managed.json");
let mut managed_paths = self
.get_managed_paths()
.expect("reading managed files should not fail");
let has_changed = managed_paths.insert(filepath.to_owned());
if !has_changed {
return Ok(());
}
save_managed_paths(self.directory.as_ref(), &managed_paths)?;
// This is not the first file we add.
// Therefore, we are sure that `.managed.json` has been already
// properly created and we do not need to sync its parent directory.
//
// (It might seem like a nicer solution to create the managed_json on the
// creation of the ManagedDirectory instance but it would actually
// prevent the use of read-only directories..)
let managed_file_definitely_already_exists = managed_paths.len() > 1;
if managed_file_definitely_already_exists {
return Ok(());
if let Err(crate::TantivyError::InternalError(_)) = self
.directory
.register_files_as_managed(vec![filepath.to_owned()], false)
{
let mut managed_paths = self
.list_managed_files()
.expect("reading managed files should not fail");
let has_changed = managed_paths.insert(filepath.to_owned());
if !has_changed {
return Ok(());
}
save_managed_paths(self.directory.as_ref(), &managed_paths)?;
// This is not the first file we add.
// Therefore, we are sure that `.managed.json` has been already
// properly created and we do not need to sync its parent directory.
//
// (It might seem like a nicer solution to create the managed_json on the
// creation of the ManagedDirectory instance but it would actually
// prevent the use of read-only directories..)
let managed_file_definitely_already_exists = managed_paths.len() > 1;
if managed_file_definitely_already_exists {
return Ok(());
}
}
self.directory.sync_directory()?;
Ok(())
@@ -251,15 +279,6 @@ impl ManagedDirectory {
let crc = hasher.finalize();
Ok(footer.crc() == crc)
}
/// Returns the set of all managed file paths.
///
/// `MANAGED_LOCK` is acquired before the paths are read and held until the function
/// returns.
///
/// # Panics
///
/// Panics if the lock cannot be acquired or if the managed paths cannot be read.
pub fn list_managed_files(&self) -> HashSet<PathBuf> {
    let _managed_guard = self
        .acquire_lock(&MANAGED_LOCK)
        .expect("must be able to acquire lock for managed.json");
    let managed_paths = self.get_managed_paths();
    managed_paths.expect("reading managed files should not fail")
}
}
impl Directory for ManagedDirectory {
@@ -317,6 +336,14 @@ impl Directory for ManagedDirectory {
self.directory.sync_directory()?;
Ok(())
}
/// Forwards the request to the wrapped directory's `save_metas` and returns its result
/// unchanged.
fn save_metas(&self, metas: &IndexMeta) -> crate::Result<()> {
    let wrapped = &self.directory;
    wrapped.save_metas(metas)
}
/// Forwards the request to the wrapped directory's `load_metas` and returns its result
/// unchanged.
fn load_metas(&self, inventory: &SegmentMetaInventory) -> crate::Result<IndexMeta> {
    let wrapped = &self.directory;
    wrapped.load_metas(inventory)
}
}
impl Clone for ManagedDirectory {

View File

@@ -30,22 +30,30 @@ fn load_metas(
directory: &dyn Directory,
inventory: &SegmentMetaInventory,
) -> crate::Result<IndexMeta> {
let meta_data = directory.atomic_read(&META_FILEPATH)?;
let meta_string = String::from_utf8(meta_data).map_err(|_utf8_err| {
error!("Meta data is not valid utf8.");
DataCorruption::new(
META_FILEPATH.to_path_buf(),
"Meta file does not contain valid utf8 file.".to_string(),
)
})?;
IndexMeta::deserialize(&meta_string, inventory)
.map_err(|e| {
DataCorruption::new(
META_FILEPATH.to_path_buf(),
format!("Meta file cannot be deserialized. {e:?}. Content: {meta_string:?}"),
)
})
.map_err(From::from)
match directory.load_metas(inventory) {
Ok(metas) => Ok(metas),
Err(crate::TantivyError::InternalError(_)) => {
let meta_data = directory.atomic_read(&META_FILEPATH)?;
let meta_string = String::from_utf8(meta_data).map_err(|_utf8_err| {
error!("Meta data is not valid utf8.");
DataCorruption::new(
META_FILEPATH.to_path_buf(),
"Meta file does not contain valid utf8 file.".to_string(),
)
})?;
IndexMeta::deserialize(&meta_string, inventory)
.map_err(|e| {
DataCorruption::new(
META_FILEPATH.to_path_buf(),
format!(
"Meta file cannot be deserialized. {e:?}. Content: {meta_string:?}"
),
)
})
.map_err(From::from)
}
Err(err) => Err(err),
}
}
/// Save the index meta file.
@@ -688,7 +696,7 @@ impl Index {
/// Returns the set of corrupted files
pub fn validate_checksum(&self) -> crate::Result<HashSet<PathBuf>> {
let managed_files = self.directory.list_managed_files();
let managed_files = self.directory.list_managed_files()?;
let active_segments_files: HashSet<PathBuf> = self
.searchable_segment_metas()?
.iter()

View File

@@ -19,7 +19,7 @@ struct DeleteMeta {
}
#[derive(Clone, Default)]
pub(crate) struct SegmentMetaInventory {
pub struct SegmentMetaInventory {
inventory: Inventory<InnerSegmentMeta>,
}
@@ -50,7 +50,7 @@ impl SegmentMetaInventory {
/// how many are deleted, etc.
#[derive(Clone)]
pub struct SegmentMeta {
tracked: TrackedObject<InnerSegmentMeta>,
pub tracked: TrackedObject<InnerSegmentMeta>,
}
impl fmt::Debug for SegmentMeta {
@@ -210,8 +210,8 @@ impl SegmentMeta {
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct InnerSegmentMeta {
segment_id: SegmentId,
pub struct InnerSegmentMeta {
pub segment_id: SegmentId,
max_doc: u32,
deletes: Option<DeleteMeta>,
/// If you want to avoid the SegmentComponent::TempStore file to be covered by

View File

@@ -11,8 +11,9 @@ mod segment_id;
mod segment_reader;
pub use self::index::{Index, IndexBuilder};
pub(crate) use self::index_meta::SegmentMetaInventory;
pub use self::index_meta::{IndexMeta, IndexSettings, Order, SegmentMeta};
pub use self::index_meta::{
IndexMeta, IndexSettings, InnerSegmentMeta, Order, SegmentMeta, SegmentMetaInventory,
};
pub use self::inverted_index_reader::InvertedIndexReader;
pub use self::segment::Segment;
pub use self::segment_component::SegmentComponent;

View File

@@ -37,19 +37,26 @@ const PANIC_CAUGHT: &str = "Panic caught in merge thread";
/// This method is not part of tantivy's public API
pub(crate) fn save_metas(metas: &IndexMeta, directory: &dyn Directory) -> crate::Result<()> {
info!("save metas");
let mut buffer = serde_json::to_vec_pretty(metas)?;
// Just adding a new line at the end of the buffer.
writeln!(&mut buffer)?;
crate::fail_point!("save_metas", |msg| Err(crate::TantivyError::from(
std::io::Error::new(
std::io::ErrorKind::Other,
msg.unwrap_or_else(|| "Undefined".to_string())
)
)));
directory.sync_directory()?;
directory.atomic_write(&META_FILEPATH, &buffer[..])?;
debug!("Saved metas {:?}", serde_json::to_string_pretty(&metas));
Ok(())
match directory.save_metas(metas) {
Ok(_) => Ok(()),
Err(crate::TantivyError::InternalError(_)) => {
let mut buffer = serde_json::to_vec_pretty(metas)?;
// Just adding a new line at the end of the buffer.
writeln!(&mut buffer)?;
crate::fail_point!("save_metas", |msg| Err(crate::TantivyError::from(
std::io::Error::new(
std::io::ErrorKind::Other,
msg.unwrap_or_else(|| "Undefined".to_string())
)
)));
directory.sync_directory()?;
directory.atomic_write(&META_FILEPATH, &buffer[..])?;
debug!("Saved metas {:?}", serde_json::to_string_pretty(&metas));
Ok(())
}
Err(e) => Err(e),
}
}
// The segment update runner is in charge of processing all

View File

@@ -1,7 +1,6 @@
#![doc(html_logo_url = "http://fulmicoton.com/tantivy-logo/tantivy-logo.png")]
#![cfg_attr(all(feature = "unstable", test), feature(test))]
#![doc(test(attr(allow(unused_variables), deny(warnings))))]
#![warn(missing_docs)]
#![allow(
clippy::len_without_is_empty,
clippy::derive_partial_eq_without_eq,