From 4b7afa2ae71a3ba2b4fc6369736f0e121bb8be21 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Fri, 3 Mar 2017 22:41:30 +0900 Subject: [PATCH] issue/77 Added managed directory --- src/core/index.rs | 76 ++++++---------------------- src/core/mod.rs | 1 + src/core/segment.rs | 14 +----- src/core/segment_component.rs | 19 ++++++- src/core/segment_meta.rs | 31 ++++++++++++ src/directory/directory.rs | 3 -- src/directory/managed_directory.rs | 81 ++++++++++++++++++++++++++++++ src/directory/mmap_directory.rs | 20 -------- src/directory/mod.rs | 2 + src/directory/ram_directory.rs | 19 ------- src/indexer/segment_manager.rs | 13 ++++- src/indexer/segment_register.rs | 1 + src/indexer/segment_updater.rs | 12 +++-- 13 files changed, 168 insertions(+), 124 deletions(-) create mode 100644 src/directory/managed_directory.rs diff --git a/src/core/index.rs b/src/core/index.rs index afce57a21..cc7d51cf3 100644 --- a/src/core/index.rs +++ b/src/core/index.rs @@ -19,6 +19,7 @@ use super::pool::LeasedItem; use std::path::Path; use core::IndexMeta; use IndexWriter; +use directory::ManagedDirectory; use core::META_FILEPATH; use super::segment::create_segment; use indexer::segment_updater::save_new_metas; @@ -26,7 +27,6 @@ use directory::error::FileError; const NUM_SEARCHERS: usize = 12; - fn load_metas(directory: &Directory) -> Result { let meta_data = directory.atomic_read(&META_FILEPATH)?; let meta_string = String::from_utf8_lossy(&meta_data); @@ -36,58 +36,19 @@ fn load_metas(directory: &Directory) -> Result { /// Tantivy's Search Index pub struct Index { - directory: Box, + directory: ManagedDirectory, schema: Schema, searcher_pool: Arc>, } - - -/// Deletes all of the files of the segment. -/// This is called when there is a merge or a rollback. -/// -/// # Disclaimer -/// If deletion of a file fails (e.g. a file -/// was read-only.), the method does not -/// fail and just logs an error when it fails. -#[doc(hidden)] -pub fn delete_segment(directory: &Directory, segment_id: SegmentId) { - info!("Deleting segment {:?}", segment_id); - let segment_filepaths_res = directory.ls_starting_with( - &*segment_id.uuid_string() - ); - - match segment_filepaths_res { - Ok(segment_filepaths) => { - for segment_filepath in &segment_filepaths { - if let Err(err) = directory.delete(&segment_filepath) { - match err { - FileError::FileDoesNotExist(_) => { - // this is normal behavior. - // the position file for instance may not exists. - } - FileError::IOError(err) => { - error!("Failed to remove {:?} : {:?}", segment_id, err); - } - } - } - } - } - Err(_) => { - error!("Failed to list files of segment {:?} for deletion.", segment_id.uuid_string()); - } - } -} - - impl Index { /// Creates a new index using the `RAMDirectory`. /// /// The index will be allocated in anonymous memory. /// This should only be used for unit tests. pub fn create_in_ram(schema: Schema) -> Index { - let directory = Box::new(RAMDirectory::create()); + let directory = ManagedDirectory::new(RAMDirectory::create()); Index::from_directory(directory, schema).expect("Creating a RAMDirectory should never fail") // unwrap is ok here } @@ -96,8 +57,8 @@ impl Index { /// /// If a previous index was in this directory, then its meta file will be destroyed. pub fn create(directory_path: &Path, schema: Schema) -> Result { - let directory = MmapDirectory::open(directory_path)?; - Index::from_directory(box directory, schema) + let directory = ManagedDirectory::new(MmapDirectory::open(directory_path)?); + Index::from_directory(directory, schema) } /// Creates a new index in a temp directory. @@ -109,12 +70,12 @@ impl Index { /// The temp directory is only used for testing the `MmapDirectory`. /// For other unit tests, prefer the `RAMDirectory`, see: `create_in_ram`. pub fn create_from_tempdir(schema: Schema) -> Result { - let directory = Box::new(try!(MmapDirectory::create_from_tempdir())); + let directory = ManagedDirectory::new(MmapDirectory::create_from_tempdir()?); Index::from_directory(directory, schema) } /// Creates a new index given a directory and an `IndexMeta`. - fn create_from_metas(directory: Box, metas: IndexMeta) -> Result { + fn create_from_metas(directory: ManagedDirectory, metas: IndexMeta) -> Result { let schema = metas.schema.clone(); let index = Index { directory: directory, @@ -126,16 +87,16 @@ impl Index { } /// Create a new index from a directory. - pub fn from_directory(mut directory: Box, schema: Schema) -> Result { + pub fn from_directory(mut directory: ManagedDirectory, schema: Schema) -> Result { save_new_metas(schema.clone(), 0, directory.borrow_mut())?; Index::create_from_metas(directory, IndexMeta::with_schema(schema)) } /// Opens a new directory from an index path. pub fn open(directory_path: &Path) -> Result { - let directory = try!(MmapDirectory::open(directory_path)); + let directory = ManagedDirectory::new(MmapDirectory::open(directory_path)?); let metas = try!(load_metas(&directory)); - Index::create_from_metas(directory.box_clone(), metas) + Index::create_from_metas(directory, metas) } /// Returns the index opstamp. @@ -196,16 +157,7 @@ impl Index { .map(|segment_meta| self.segment(segment_meta)) .collect()) } - - /// Remove all of the file associated with the segment. - /// - /// This method cannot fail. If a problem occurs, - /// some files may end up never being removed. - /// The error will only be logged. - pub fn delete_segment(&self, segment_id: SegmentId) { - delete_segment(self.directory(), segment_id); - } - + #[doc(hidden)] pub fn segment(&self, segment_meta: SegmentMeta) -> Segment { create_segment(self.clone(), segment_meta) @@ -219,12 +171,12 @@ impl Index { /// Return a reference to the index directory. pub fn directory(&self) -> &Directory { - &*self.directory + &self.directory } /// Return a mutable reference to the index directory. pub fn directory_mut(&mut self) -> &mut Directory { - &mut *self.directory + &mut self.directory } /// Reads the meta.json and returns the list of @@ -288,7 +240,7 @@ impl fmt::Debug for Index { impl Clone for Index { fn clone(&self) -> Index { Index { - directory: self.directory.box_clone(), + directory: self.directory.clone(), schema: self.schema.clone(), searcher_pool: self.searcher_pool.clone(), } diff --git a/src/core/mod.rs b/src/core/mod.rs index d6238c48c..4e11428e0 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -26,4 +26,5 @@ use std::path::PathBuf; lazy_static! { pub static ref META_FILEPATH: PathBuf = PathBuf::from("meta.json"); + pub static ref MANAGED_FILEPATH: PathBuf = PathBuf::from(".managed.json"); } \ No newline at end of file diff --git a/src/core/segment.rs b/src/core/segment.rs index 22f157420..cd78743b9 100644 --- a/src/core/segment.rs +++ b/src/core/segment.rs @@ -62,19 +62,7 @@ impl Segment { /// It just joins the segment id with the extension /// associated to a segment component. pub fn relative_path(&self, component: SegmentComponent) -> PathBuf { - use self::SegmentComponent::*; - let mut path = self.id().uuid_string(); - path.push_str(&*match component { - POSITIONS => ".pos".to_string(), - INFO => ".info".to_string(), - POSTINGS => ".idx".to_string(), - TERMS => ".term".to_string(), - STORE => ".store".to_string(), - FASTFIELDS => ".fast".to_string(), - FIELDNORMS => ".fieldnorm".to_string(), - DELETE => {format!(".{}.del", self.meta.delete_opstamp().unwrap_or(0))}, - }); - PathBuf::from(path) + self.meta.relative_path(component) } /// Open one of the component file for read. diff --git a/src/core/segment_component.rs b/src/core/segment_component.rs index 93aacd506..5e380c597 100644 --- a/src/core/segment_component.rs +++ b/src/core/segment_component.rs @@ -10,5 +10,20 @@ pub enum SegmentComponent { DELETE } - - \ No newline at end of file +impl SegmentComponent { + + pub fn iterator() -> impl Iterator { + static SEGMENT_COMPONENTS: [SegmentComponent; 8] = [ + SegmentComponent::INFO, + SegmentComponent::POSTINGS, + SegmentComponent::POSITIONS, + SegmentComponent::FASTFIELDS, + SegmentComponent::FIELDNORMS, + SegmentComponent::TERMS, + SegmentComponent::STORE, + SegmentComponent::DELETE + ]; + SEGMENT_COMPONENTS.into_iter() + } + +} \ No newline at end of file diff --git a/src/core/segment_meta.rs b/src/core/segment_meta.rs index ffcd2f6b9..b342a18d0 100644 --- a/src/core/segment_meta.rs +++ b/src/core/segment_meta.rs @@ -1,4 +1,6 @@ use core::SegmentId; +use super::SegmentComponent; +use std::path::PathBuf; #[derive(Clone, Debug, RustcDecodable,RustcEncodable)] @@ -43,6 +45,35 @@ impl SegmentMeta { .unwrap_or(0u32) } + pub fn alive_files(&self) -> Vec { + SegmentComponent::iterator() + .map(|component| { + self.relative_path(*component) + }) + .collect::>() + + } + + /// Returns the relative path of a component of our segment. + /// + /// It just joins the segment id with the extension + /// associated to a segment component. + pub fn relative_path(&self, component: SegmentComponent) -> PathBuf { + use self::SegmentComponent::*; + let mut path = self.id().uuid_string(); + path.push_str(&*match component { + POSITIONS => ".pos".to_string(), + INFO => ".info".to_string(), + POSTINGS => ".idx".to_string(), + TERMS => ".term".to_string(), + STORE => ".store".to_string(), + FASTFIELDS => ".fast".to_string(), + FIELDNORMS => ".fieldnorm".to_string(), + DELETE => {format!(".{}.del", self.delete_opstamp().unwrap_or(0))}, + }); + PathBuf::from(path) + } + /// Return the highest doc id + 1 /// /// If there are no deletes, then num_docs = max_docs diff --git a/src/directory/directory.rs b/src/directory/directory.rs index b3ef71016..3f41f5011 100644 --- a/src/directory/directory.rs +++ b/src/directory/directory.rs @@ -78,9 +78,6 @@ pub trait Directory: fmt::Debug + Send + Sync + 'static { /// Clones the directory and boxes the clone fn box_clone(&self) -> Box; - /// Returns the list of files starting by a given - /// prefix. - fn ls_starting_with(&self, prefix: &str) -> io::Result>; } diff --git a/src/directory/managed_directory.rs b/src/directory/managed_directory.rs new file mode 100644 index 000000000..ba93a405e --- /dev/null +++ b/src/directory/managed_directory.rs @@ -0,0 +1,81 @@ +use Result; +use std::path::{Path, PathBuf}; +use directory::error::{FileError, OpenWriteError}; +use directory::{ReadOnlySource, WritePtr}; +use std::result; +use std::io; +use Directory; +use std::sync::{Arc, RwLock}; +use std::collections::HashSet; +use std::io::Write; +use rustc_serialize::json; +use core::MANAGED_FILEPATH; + + + +#[derive(Debug)] +pub struct ManagedDirectory { + directory: Box, + managed_paths: Arc>>, +} + + +impl ManagedDirectory { + pub fn new(directory: Dir) -> ManagedDirectory { + ManagedDirectory { + directory: box directory, + managed_paths: Arc::default(), + } + } + + fn register_file_as_managed(&mut self, filepath: PathBuf) -> Result<()> { + let mut managed_files_lock = self.managed_paths.write()?; + if managed_files_lock.insert(filepath) { + let mut w = vec!(); + try!(write!(&mut w, "{}\n", json::as_pretty_json(&*managed_files_lock))); + self.directory.atomic_write(&MANAGED_FILEPATH, &w[..])?; + } + Ok(()) + } +} + +impl Directory for ManagedDirectory { + + fn open_read(&self, path: &Path) -> result::Result { + self.directory.open_read(path) + } + + fn open_write(&mut self, path: &Path) -> result::Result { + self.directory.open_write(path) + } + + fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()> { + self.directory.atomic_write(path, data) + } + + fn atomic_read(&self, path: &Path) -> result::Result, FileError> { + self.directory.atomic_read(path) + } + + fn delete(&self, path: &Path) -> result::Result<(), FileError> { + self.directory.delete(path) + } + + fn exists(&self, path: &Path) -> bool { + self.directory.exists(path) + } + + fn box_clone(&self) -> Box { + box self.clone() + } + +} + +impl Clone for ManagedDirectory { + fn clone(&self) -> ManagedDirectory { + ManagedDirectory { + directory: self.directory.box_clone(), + managed_paths: self.managed_paths.clone(), + } + } +} \ No newline at end of file diff --git a/src/directory/mmap_directory.rs b/src/directory/mmap_directory.rs index 3b18144b3..0a8e6f4ac 100644 --- a/src/directory/mmap_directory.rs +++ b/src/directory/mmap_directory.rs @@ -352,26 +352,6 @@ impl Directory for MmapDirectory { fn box_clone(&self,) -> Box { Box::new(self.clone()) } - - fn ls_starting_with(&self, prefix: &str) -> io::Result> { - fs::read_dir(&self.root_path) - .map(|paths: ReadDir| { - paths - .filter_map(|dir_entry_res| - dir_entry_res - .ok() - .map(|dir_entry| dir_entry.path()) - ) - .filter(|path| - path.to_str() - .map(|filepath| filepath.starts_with(prefix)) - .unwrap_or(false) - ) - .map(PathBuf::from) - .collect() - }) - - } } diff --git a/src/directory/mod.rs b/src/directory/mod.rs index e03435199..760c2c0d5 100644 --- a/src/directory/mod.rs +++ b/src/directory/mod.rs @@ -3,6 +3,7 @@ mod ram_directory; mod directory; mod read_only_source; mod shared_vec_slice; +mod managed_directory; /// Errors specific to the directory module. pub mod error; @@ -14,6 +15,7 @@ pub use self::read_only_source::ReadOnlySource; pub use self::directory::Directory; pub use self::ram_directory::RAMDirectory; pub use self::mmap_directory::MmapDirectory; +pub use self::managed_directory::ManagedDirectory; /// Synonym of Seek + Write pub trait SeekableWrite: Seek + Write {} diff --git a/src/directory/ram_directory.rs b/src/directory/ram_directory.rs index 2a85a735d..3f798485d 100644 --- a/src/directory/ram_directory.rs +++ b/src/directory/ram_directory.rs @@ -130,20 +130,6 @@ impl InnerDirectory { .contains_key(path) } - fn ls_starting_with(&self, prefix: &str) -> Vec { - self.0 - .read() - .expect("Failed to get read lock directory.") - .keys() - .filter(|path: &&PathBuf| - path.to_str() - .map(|p: &str| p.starts_with(prefix)) - .unwrap_or(false) - ) - .cloned() - .collect() - } - } impl fmt::Debug for RAMDirectory { @@ -218,9 +204,4 @@ impl Directory for RAMDirectory { Box::new(self.clone()) } - - fn ls_starting_with(&self, prefix: &str) -> io::Result> { - Ok(self.fs.ls_starting_with(prefix)) - } - } diff --git a/src/indexer/segment_manager.rs b/src/indexer/segment_manager.rs index 250830a84..2533eeeda 100644 --- a/src/indexer/segment_manager.rs +++ b/src/indexer/segment_manager.rs @@ -3,7 +3,7 @@ use std::sync::RwLock; use core::SegmentMeta; use core::SegmentId; use indexer::{SegmentEntry, SegmentState}; - +use std::path::PathBuf; use std::sync::{RwLockReadGuard, RwLockWriteGuard}; use std::fmt::{self, Debug, Formatter}; @@ -67,6 +67,17 @@ impl SegmentManager { segment_entries } + pub fn alive_files(&self) -> Vec { + let mut files = vec!(); + let (segment_meta_uncommitted, segment_meta_committed) = get_segments(self); + for segment_meta in segment_meta_uncommitted + .into_iter() + .chain(segment_meta_committed.into_iter()) { + files.extend(segment_meta.alive_files()); + } + files + } + pub fn segment_state(&self, segment_id: &SegmentId) -> Option { self.segment_entry(segment_id) .map(|segment_entry| segment_entry.state()) diff --git a/src/indexer/segment_register.rs b/src/indexer/segment_register.rs index 418d92ef7..288b7a95f 100644 --- a/src/indexer/segment_register.rs +++ b/src/indexer/segment_register.rs @@ -4,6 +4,7 @@ use core::SegmentMeta; use std::fmt; use std::fmt::{Debug, Formatter}; use indexer::segment_entry::SegmentEntry; +use std::path::PathBuf; diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index 6c5b9b451..b2d1b1ba5 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -21,6 +21,7 @@ use indexer::merger::IndexMerger; use indexer::SegmentEntry; use indexer::SegmentSerializer; use Result; +use std::path::PathBuf; use rustc_serialize::json; use schema::Schema; use std::borrow::BorrowMut; @@ -74,13 +75,16 @@ pub fn save_metas(segment_metas: Vec, schema: schema, opstamp: opstamp, }; - let mut w = Vec::new(); + let mut w = vec!(); try!(write!(&mut w, "{}\n", json::as_pretty_json(&metas))); Ok(directory .atomic_write(&META_FILEPATH, &w[..])?) } +fn garbage_collect_files(directory: &Directory, alive_files: Vec) { + // +} // The segment update runner is in charge of processing all @@ -183,7 +187,10 @@ impl SegmentUpdater { segment_updater.0.index.schema(), opstamp, directory.borrow_mut()).expect("Could not save metas."); + let useful_files = segment_updater.0.segment_manager.alive_files(); + garbage_collect_files(&*directory, useful_files); segment_updater.consider_merge_options(); + }) } @@ -290,9 +297,6 @@ impl SegmentUpdater { segment_updater.0.index.schema(), segment_updater.0.index.opstamp(), directory.borrow_mut()).expect("Could not save metas."); - for segment_meta in merged_segment_metas { - segment_updater.0.index.delete_segment(segment_meta.id()); - } }) }