From 3adc85c017115e0f7fff2e6eb58b0664eb4f7fd1 Mon Sep 17 00:00:00 2001
From: Ming Ying
Date: Wed, 13 Nov 2024 10:39:23 -0500
Subject: [PATCH] Directory trait can read/write meta/managed

---
 src/directory/directory.rs         |  37 +++++++++
 src/directory/managed_directory.rs | 127 +++++++++++++++++------------
 src/index/index.rs                 |  42 ++++++----
 src/index/index_meta.rs            |   8 +-
 src/index/mod.rs                   |   5 +-
 src/indexer/segment_updater.rs     |  33 +++++---
 src/lib.rs                         |   1 -
 7 files changed, 166 insertions(+), 87 deletions(-)

diff --git a/src/directory/directory.rs b/src/directory/directory.rs
index 0bc4b7f95..e26b5f98e 100644
--- a/src/directory/directory.rs
+++ b/src/directory/directory.rs
@@ -1,3 +1,4 @@
+use std::collections::HashSet;
 use std::io::Write;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
@@ -7,6 +8,8 @@ use std::{fmt, io, thread};
 use crate::directory::directory_lock::Lock;
 use crate::directory::error::{DeleteError, LockError, OpenReadError, OpenWriteError};
 use crate::directory::{FileHandle, FileSlice, WatchCallback, WatchHandle, WritePtr};
+use crate::index::SegmentMetaInventory;
+use crate::IndexMeta;
 
 /// Retry the logic of acquiring locks is pretty simple.
 /// We just retry `n` times after a given `duratio`, both
@@ -223,6 +226,40 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
     /// `OnCommitWithDelay` `ReloadPolicy`. Not implementing watch in a `Directory` only prevents
     /// the `OnCommitWithDelay` `ReloadPolicy` to work properly.
     fn watch(&self, watch_callback: WatchCallback) -> crate::Result<WatchHandle>;
+
+    /// Allows the directory to list managed files, overriding the ManagedDirectory's default
+    /// list_managed_files.
+    fn list_managed_files(&self) -> crate::Result<HashSet<PathBuf>> {
+        Err(crate::TantivyError::InternalError(
+            "list_managed_files not implemented".to_string(),
+        ))
+    }
+
+    /// Allows the directory to register files as managed, overriding the ManagedDirectory's
+    /// default register_file_as_managed.
+    fn register_files_as_managed(
+        &self,
+        _files: Vec<PathBuf>,
+        _overwrite: bool,
+    ) -> crate::Result<()> {
+        Err(crate::TantivyError::InternalError(
+            "register_files_as_managed not implemented".to_string(),
+        ))
+    }
+
+    /// Allows the directory to save IndexMeta, overriding the SegmentUpdater's default save_metas.
+    fn save_metas(&self, _meta: &IndexMeta) -> crate::Result<()> {
+        Err(crate::TantivyError::InternalError(
+            "save_metas not implemented".to_string(),
+        ))
+    }
+
+    /// Allows the directory to load IndexMeta, overriding the Index's default load_metas.
+    fn load_metas(&self, _inventory: &SegmentMetaInventory) -> crate::Result<IndexMeta> {
+        Err(crate::TantivyError::InternalError(
+            "load_metas not implemented".to_string(),
+        ))
+    }
 }
 
 /// DirectoryClone
diff --git a/src/directory/managed_directory.rs b/src/directory/managed_directory.rs
index e2d5d3d60..5a52c9463 100644
--- a/src/directory/managed_directory.rs
+++ b/src/directory/managed_directory.rs
@@ -14,7 +14,8 @@ use crate::directory::{
     WatchHandle, WritePtr, MANAGED_LOCK, META_LOCK,
 };
 use crate::error::DataCorruption;
-use crate::Directory;
+use crate::index::SegmentMetaInventory;
+use crate::{Directory, IndexMeta};
 
 /// Returns true if the file is "managed".
 /// Non-managed file are not subject to garbage collection.
@@ -65,27 +66,34 @@ impl ManagedDirectory {
         Ok(ManagedDirectory { directory })
     }
 
-    #[allow(missing_docs)]
-    pub fn get_managed_paths(&self) -> crate::Result<HashSet<PathBuf>> {
-        match self.directory.atomic_read(&MANAGED_FILEPATH) {
-            Ok(data) => {
-                let managed_files_json = String::from_utf8_lossy(&data);
-                let managed_files: HashSet<PathBuf> = serde_json::from_str(&managed_files_json)
-                    .map_err(|e| {
-                        DataCorruption::new(
-                            MANAGED_FILEPATH.to_path_buf(),
-                            format!("Managed file cannot be deserialized: {e:?}. "),
-                        )
-                    })?;
-                Ok(managed_files)
-            }
-            Err(OpenReadError::FileDoesNotExist(_)) => Ok(HashSet::new()),
-            io_err @ Err(OpenReadError::IoError { .. }) => Err(io_err.err().unwrap().into()),
-            Err(OpenReadError::IncompatibleIndex(incompatibility)) => {
-                // For the moment, this should never happen `meta.json`
-                // do not have any footer and cannot detect incompatibility.
-                Err(crate::TantivyError::IncompatibleIndex(incompatibility))
+    pub fn list_managed_files(&self) -> crate::Result<HashSet<PathBuf>> {
+        match self.directory.list_managed_files() {
+            Ok(managed_files) => Ok(managed_files),
+            Err(crate::TantivyError::InternalError(_)) => {
+                match self.directory.atomic_read(&MANAGED_FILEPATH) {
+                    Ok(data) => {
+                        let managed_files_json = String::from_utf8_lossy(&data);
+                        let managed_files: HashSet<PathBuf> =
+                            serde_json::from_str(&managed_files_json).map_err(|e| {
+                                DataCorruption::new(
+                                    MANAGED_FILEPATH.to_path_buf(),
+                                    format!("Managed file cannot be deserialized: {e:?}. "),
+                                )
+                            })?;
+                        Ok(managed_files)
+                    }
+                    Err(OpenReadError::FileDoesNotExist(_)) => Ok(HashSet::new()),
+                    io_err @ Err(OpenReadError::IoError { .. }) => {
+                        Err(io_err.err().unwrap().into())
+                    }
+                    Err(OpenReadError::IncompatibleIndex(incompatibility)) => {
+                        // For the moment, this should never happen `meta.json`
+                        // do not have any footer and cannot detect incompatibility.
+                        Err(crate::TantivyError::IncompatibleIndex(incompatibility))
+                    }
+                }
             }
+            Err(err) => Err(err),
         }
     }
 
@@ -111,7 +119,15 @@ impl ManagedDirectory {
 
         // We're about to do an atomic write to managed.json, lock it down
        let _lock = self.acquire_lock(&MANAGED_LOCK)?;
-        let managed_paths = self.get_managed_paths()?;
+        let managed_paths = match self.directory.list_managed_files() {
+            Ok(managed_paths) => managed_paths,
+            Err(crate::TantivyError::InternalError(_)) => {
+                // The wrapped directory does not override list_managed_files,
+                // so fall back to reading the .managed.json file.
+                self.list_managed_files()?
+            }
+            Err(err) => return Err(err),
+        };
         // It is crucial to get the living files after acquiring the
         // read lock of meta information. That way, we
         // avoid the following scenario.
@@ -180,7 +196,13 @@ impl ManagedDirectory {
                 managed_paths_write.remove(delete_file);
             }
             self.directory.sync_directory()?;
-            save_managed_paths(self.directory.as_mut(), &managed_paths_write)?;
+
+            if let Err(crate::TantivyError::InternalError(_)) = self
+                .directory
+                .register_files_as_managed(managed_paths_write.clone().into_iter().collect(), true)
+            {
+                save_managed_paths(self.directory.as_mut(), &managed_paths_write)?;
+            }
         }
 
         Ok(GarbageCollectionResult {
@@ -211,25 +233,31 @@ impl ManagedDirectory {
        let _lock = self
            .acquire_lock(&MANAGED_LOCK)
            .expect("must be able to acquire lock for managed.json");
-        let mut managed_paths = self
-            .get_managed_paths()
-            .expect("reading managed files should not fail");
-        let has_changed = managed_paths.insert(filepath.to_owned());
-        if !has_changed {
-            return Ok(());
-        }
-        save_managed_paths(self.directory.as_ref(), &managed_paths)?;
-        // This is not the first file we add.
-        // Therefore, we are sure that `.managed.json` has been already
-        // properly created and we do not need to sync its parent directory.
-        //
-        // (It might seem like a nicer solution to create the managed_json on the
-        // creation of the ManagedDirectory instance but it would actually
-        // prevent the use of read-only directories..)
-        let managed_file_definitely_already_exists = managed_paths.len() > 1;
-        if managed_file_definitely_already_exists {
-            return Ok(());
+        if let Err(crate::TantivyError::InternalError(_)) = self
+            .directory
+            .register_files_as_managed(vec![filepath.to_owned()], false)
+        {
+            let mut managed_paths = self
+                .list_managed_files()
+                .expect("reading managed files should not fail");
+            let has_changed = managed_paths.insert(filepath.to_owned());
+            if !has_changed {
+                return Ok(());
+            }
+            save_managed_paths(self.directory.as_ref(), &managed_paths)?;
+            // This is not the first file we add.
+            // Therefore, we are sure that `.managed.json` has been already
+            // properly created and we do not need to sync its parent directory.
+            //
+            // (It might seem like a nicer solution to create the managed_json on the
+            // creation of the ManagedDirectory instance but it would actually
+            // prevent the use of read-only directories..)
+            let managed_file_definitely_already_exists = managed_paths.len() > 1;
+            if managed_file_definitely_already_exists {
+                return Ok(());
+            }
         }
+
         self.directory.sync_directory()?;
         Ok(())
     }
@@ -251,15 +279,6 @@ impl ManagedDirectory {
         let crc = hasher.finalize();
         Ok(footer.crc() == crc)
     }
-
-    /// List all managed files
-    pub fn list_managed_files(&self) -> HashSet<PathBuf> {
-        let _lock = self
-            .acquire_lock(&MANAGED_LOCK)
-            .expect("must be able to acquire lock for managed.json");
-        self.get_managed_paths()
-            .expect("reading managed files should not fail")
-    }
 }
 
 impl Directory for ManagedDirectory {
@@ -317,6 +336,14 @@ impl Directory for ManagedDirectory {
         self.directory.sync_directory()?;
         Ok(())
     }
+
+    fn save_metas(&self, metas: &IndexMeta) -> crate::Result<()> {
+        self.directory.save_metas(metas)
+    }
+
+    fn load_metas(&self, inventory: &SegmentMetaInventory) -> crate::Result<IndexMeta> {
+        self.directory.load_metas(inventory)
+    }
 }
 
 impl Clone for ManagedDirectory {
diff --git a/src/index/index.rs b/src/index/index.rs
index 5495ddced..98ee33b3a 100644
--- a/src/index/index.rs
+++ b/src/index/index.rs
@@ -30,22 +30,30 @@ fn load_metas(
     directory: &dyn Directory,
     inventory: &SegmentMetaInventory,
 ) -> crate::Result<IndexMeta> {
-    let meta_data = directory.atomic_read(&META_FILEPATH)?;
-    let meta_string = String::from_utf8(meta_data).map_err(|_utf8_err| {
-        error!("Meta data is not valid utf8.");
-        DataCorruption::new(
-            META_FILEPATH.to_path_buf(),
-            "Meta file does not contain valid utf8 file.".to_string(),
-        )
-    })?;
-    IndexMeta::deserialize(&meta_string, inventory)
-        .map_err(|e| {
-            DataCorruption::new(
-                META_FILEPATH.to_path_buf(),
-                format!("Meta file cannot be deserialized. {e:?}. Content: {meta_string:?}"),
-            )
-        })
-        .map_err(From::from)
+    match directory.load_metas(inventory) {
+        Ok(metas) => Ok(metas),
+        Err(crate::TantivyError::InternalError(_)) => {
+            let meta_data = directory.atomic_read(&META_FILEPATH)?;
+            let meta_string = String::from_utf8(meta_data).map_err(|_utf8_err| {
+                error!("Meta data is not valid utf8.");
+                DataCorruption::new(
+                    META_FILEPATH.to_path_buf(),
+                    "Meta file does not contain valid utf8 file.".to_string(),
+                )
+            })?;
+            IndexMeta::deserialize(&meta_string, inventory)
+                .map_err(|e| {
+                    DataCorruption::new(
+                        META_FILEPATH.to_path_buf(),
+                        format!(
+                            "Meta file cannot be deserialized. {e:?}. Content: {meta_string:?}"
+                        ),
+                    )
+                })
+                .map_err(From::from)
+        }
+        Err(err) => Err(err),
+    }
 }
 
 /// Save the index meta file.
@@ -688,7 +696,7 @@ impl Index {
 
     /// Returns the set of corrupted files
     pub fn validate_checksum(&self) -> crate::Result<HashSet<PathBuf>> {
-        let managed_files = self.directory.list_managed_files();
+        let managed_files = self.directory.list_managed_files()?;
         let active_segments_files: HashSet<PathBuf> = self
             .searchable_segment_metas()?
             .iter()
diff --git a/src/index/index_meta.rs b/src/index/index_meta.rs
index 86eaa35d6..3ef315e96 100644
--- a/src/index/index_meta.rs
+++ b/src/index/index_meta.rs
@@ -19,7 +19,7 @@ struct DeleteMeta {
 }
 
 #[derive(Clone, Default)]
-pub(crate) struct SegmentMetaInventory {
+pub struct SegmentMetaInventory {
     inventory: Inventory<InnerSegmentMeta>,
 }
 
@@ -50,7 +50,7 @@ impl SegmentMetaInventory {
 /// how many are deleted, etc.
 #[derive(Clone)]
 pub struct SegmentMeta {
-    tracked: TrackedObject<InnerSegmentMeta>,
+    pub tracked: TrackedObject<InnerSegmentMeta>,
 }
 
 impl fmt::Debug for SegmentMeta {
@@ -210,8 +210,8 @@ impl SegmentMeta {
 }
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
-struct InnerSegmentMeta {
-    segment_id: SegmentId,
+pub struct InnerSegmentMeta {
+    pub segment_id: SegmentId,
     max_doc: u32,
     deletes: Option<DeleteMeta>,
     /// If you want to avoid the SegmentComponent::TempStore file to be covered by
diff --git a/src/index/mod.rs b/src/index/mod.rs
index 76dc3ed9b..d8a4d032f 100644
--- a/src/index/mod.rs
+++ b/src/index/mod.rs
@@ -11,8 +11,9 @@ mod segment_id;
 mod segment_reader;
 
 pub use self::index::{Index, IndexBuilder};
-pub(crate) use self::index_meta::SegmentMetaInventory;
-pub use self::index_meta::{IndexMeta, IndexSettings, Order, SegmentMeta};
+pub use self::index_meta::{
+    IndexMeta, IndexSettings, InnerSegmentMeta, Order, SegmentMeta, SegmentMetaInventory,
+};
 pub use self::inverted_index_reader::InvertedIndexReader;
 pub use self::segment::Segment;
 pub use self::segment_component::SegmentComponent;
diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs
index bc941d1ed..875913d83 100644
--- a/src/indexer/segment_updater.rs
+++ b/src/indexer/segment_updater.rs
@@ -37,19 +37,26 @@ const PANIC_CAUGHT: &str = "Panic caught in merge thread";
 /// This method is not part of tantivy's public API
 pub(crate) fn save_metas(metas: &IndexMeta, directory: &dyn Directory) -> crate::Result<()> {
     info!("save metas");
-    let mut buffer = serde_json::to_vec_pretty(metas)?;
-    // Just adding a new line at the end of the buffer.
-    writeln!(&mut buffer)?;
-    crate::fail_point!("save_metas", |msg| Err(crate::TantivyError::from(
-        std::io::Error::new(
-            std::io::ErrorKind::Other,
-            msg.unwrap_or_else(|| "Undefined".to_string())
-        )
-    )));
-    directory.sync_directory()?;
-    directory.atomic_write(&META_FILEPATH, &buffer[..])?;
-    debug!("Saved metas {:?}", serde_json::to_string_pretty(&metas));
-    Ok(())
+
+    match directory.save_metas(metas) {
+        Ok(_) => Ok(()),
+        Err(crate::TantivyError::InternalError(_)) => {
+            let mut buffer = serde_json::to_vec_pretty(metas)?;
+            // Just adding a new line at the end of the buffer.
+            writeln!(&mut buffer)?;
+            crate::fail_point!("save_metas", |msg| Err(crate::TantivyError::from(
+                std::io::Error::new(
+                    std::io::ErrorKind::Other,
+                    msg.unwrap_or_else(|| "Undefined".to_string())
+                )
+            )));
+            directory.sync_directory()?;
+            directory.atomic_write(&META_FILEPATH, &buffer[..])?;
+            debug!("Saved metas {:?}", serde_json::to_string_pretty(&metas));
+            Ok(())
+        }
+        Err(e) => Err(e),
+    }
 }
 
 // The segment update runner is in charge of processing all
diff --git a/src/lib.rs b/src/lib.rs
index 22eab343a..7b091f7d2 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,7 +1,6 @@
 #![doc(html_logo_url = "http://fulmicoton.com/tantivy-logo/tantivy-logo.png")]
 #![cfg_attr(all(feature = "unstable", test), feature(test))]
 #![doc(test(attr(allow(unused_variables), deny(warnings))))]
-#![warn(missing_docs)]
 #![allow(
     clippy::len_without_is_empty,
     clippy::derive_partial_eq_without_eq,
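
Note on the dispatch convention this patch introduces: each new Directory method defaults to returning TantivyError::InternalError, and every call site (ManagedDirectory::list_managed_files, load_metas in src/index/index.rs, save_metas in src/indexer/segment_updater.rs) treats that specific error as "not overridden" and falls back to the legacy .managed.json / meta.json code path. The sketch below is a minimal, self-contained model of that pattern; the trait, error type, and function names (MetaStore, StoreError, save_metas_with_fallback, FsLikeStore, DbLikeStore) are hypothetical stand-ins for illustration, not tantivy's actual API.

// Stand-alone model of "default-to-error, fall back at the call site".
// All names here are stand-ins, not tantivy types.

#[derive(Debug)]
enum StoreError {
    /// Plays the role of `TantivyError::InternalError`: the hook was not overridden.
    NotImplemented,
    #[allow(dead_code)]
    Io(String),
}

trait MetaStore {
    /// Default implementation signals "not overridden", like the patch's
    /// `Directory::save_metas` returning `InternalError`.
    fn save_metas(&self, _metas: &str) -> Result<(), StoreError> {
        Err(StoreError::NotImplemented)
    }
}

/// Keeps the default: behaves like a plain filesystem directory.
struct FsLikeStore;
impl MetaStore for FsLikeStore {}

/// Overrides the hook, e.g. to persist metas in a database or object store.
struct DbLikeStore;
impl MetaStore for DbLikeStore {
    fn save_metas(&self, metas: &str) -> Result<(), StoreError> {
        println!("custom backend stored metas: {metas}");
        Ok(())
    }
}

/// Call site: try the override first; only the `NotImplemented` sentinel
/// triggers the legacy path, any other error is propagated to the caller.
fn save_metas_with_fallback(store: &dyn MetaStore, metas: &str) -> Result<(), StoreError> {
    match store.save_metas(metas) {
        Ok(()) => Ok(()),
        Err(StoreError::NotImplemented) => {
            // Legacy behaviour: serialize, sync, and atomically write meta.json.
            println!("fallback path wrote meta.json: {metas}");
            Ok(())
        }
        Err(other) => Err(other),
    }
}

fn main() -> Result<(), StoreError> {
    save_metas_with_fallback(&FsLikeStore, r#"{"segments": []}"#)?;
    save_metas_with_fallback(&DbLikeStore, r#"{"segments": []}"#)?;
    Ok(())
}

One consequence of reusing the broad InternalError variant as the sentinel: an override that genuinely fails with InternalError is indistinguishable from "not implemented" and will silently fall back to the file-based path, so custom Directory implementations should report real failures with other error variants.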