issue/77 Added managed directory

This commit is contained in:
Paul Masurel
2017-03-03 22:41:30 +09:00
parent 590a8582c9
commit 4b7afa2ae7
13 changed files with 168 additions and 124 deletions

View File

@@ -19,6 +19,7 @@ use super::pool::LeasedItem;
use std::path::Path;
use core::IndexMeta;
use IndexWriter;
use directory::ManagedDirectory;
use core::META_FILEPATH;
use super::segment::create_segment;
use indexer::segment_updater::save_new_metas;
@@ -26,7 +27,6 @@ use directory::error::FileError;
const NUM_SEARCHERS: usize = 12;
fn load_metas(directory: &Directory) -> Result<IndexMeta> {
let meta_data = directory.atomic_read(&META_FILEPATH)?;
let meta_string = String::from_utf8_lossy(&meta_data);
@@ -36,58 +36,19 @@ fn load_metas(directory: &Directory) -> Result<IndexMeta> {
/// Tantivy's Search Index
pub struct Index {
directory: Box<Directory>,
directory: ManagedDirectory,
schema: Schema,
searcher_pool: Arc<Pool<Searcher>>,
}
/// Deletes all of the files of the segment.
/// This is called when there is a merge or a rollback.
///
/// # Disclaimer
/// If deletion of a file fails (e.g. a file
/// was read-only.), the method does not
/// fail and just logs an error when it fails.
#[doc(hidden)]
pub fn delete_segment(directory: &Directory, segment_id: SegmentId) {
info!("Deleting segment {:?}", segment_id);
let segment_filepaths_res = directory.ls_starting_with(
&*segment_id.uuid_string()
);
match segment_filepaths_res {
Ok(segment_filepaths) => {
for segment_filepath in &segment_filepaths {
if let Err(err) = directory.delete(&segment_filepath) {
match err {
FileError::FileDoesNotExist(_) => {
// this is normal behavior.
// the position file for instance may not exists.
}
FileError::IOError(err) => {
error!("Failed to remove {:?} : {:?}", segment_id, err);
}
}
}
}
}
Err(_) => {
error!("Failed to list files of segment {:?} for deletion.", segment_id.uuid_string());
}
}
}
impl Index {
/// Creates a new index using the `RAMDirectory`.
///
/// The index will be allocated in anonymous memory.
/// This should only be used for unit tests.
pub fn create_in_ram(schema: Schema) -> Index {
let directory = Box::new(RAMDirectory::create());
let directory = ManagedDirectory::new(RAMDirectory::create());
Index::from_directory(directory, schema).expect("Creating a RAMDirectory should never fail") // unwrap is ok here
}
@@ -96,8 +57,8 @@ impl Index {
///
/// If a previous index was in this directory, then its meta file will be destroyed.
pub fn create(directory_path: &Path, schema: Schema) -> Result<Index> {
let directory = MmapDirectory::open(directory_path)?;
Index::from_directory(box directory, schema)
let directory = ManagedDirectory::new(MmapDirectory::open(directory_path)?);
Index::from_directory(directory, schema)
}
/// Creates a new index in a temp directory.
@@ -109,12 +70,12 @@ impl Index {
/// The temp directory is only used for testing the `MmapDirectory`.
/// For other unit tests, prefer the `RAMDirectory`, see: `create_in_ram`.
pub fn create_from_tempdir(schema: Schema) -> Result<Index> {
let directory = Box::new(try!(MmapDirectory::create_from_tempdir()));
let directory = ManagedDirectory::new(MmapDirectory::create_from_tempdir()?);
Index::from_directory(directory, schema)
}
/// Creates a new index given a directory and an `IndexMeta`.
fn create_from_metas(directory: Box<Directory>, metas: IndexMeta) -> Result<Index> {
fn create_from_metas(directory: ManagedDirectory, metas: IndexMeta) -> Result<Index> {
let schema = metas.schema.clone();
let index = Index {
directory: directory,
@@ -126,16 +87,16 @@ impl Index {
}
/// Create a new index from a directory.
pub fn from_directory(mut directory: Box<Directory>, schema: Schema) -> Result<Index> {
pub fn from_directory(mut directory: ManagedDirectory, schema: Schema) -> Result<Index> {
save_new_metas(schema.clone(), 0, directory.borrow_mut())?;
Index::create_from_metas(directory, IndexMeta::with_schema(schema))
}
/// Opens a new directory from an index path.
pub fn open(directory_path: &Path) -> Result<Index> {
let directory = try!(MmapDirectory::open(directory_path));
let directory = ManagedDirectory::new(MmapDirectory::open(directory_path)?);
let metas = try!(load_metas(&directory));
Index::create_from_metas(directory.box_clone(), metas)
Index::create_from_metas(directory, metas)
}
/// Returns the index opstamp.
@@ -196,16 +157,7 @@ impl Index {
.map(|segment_meta| self.segment(segment_meta))
.collect())
}
/// Remove all of the file associated with the segment.
///
/// This method cannot fail. If a problem occurs,
/// some files may end up never being removed.
/// The error will only be logged.
pub fn delete_segment(&self, segment_id: SegmentId) {
delete_segment(self.directory(), segment_id);
}
#[doc(hidden)]
pub fn segment(&self, segment_meta: SegmentMeta) -> Segment {
create_segment(self.clone(), segment_meta)
@@ -219,12 +171,12 @@ impl Index {
/// Return a reference to the index directory.
pub fn directory(&self) -> &Directory {
&*self.directory
&self.directory
}
/// Return a mutable reference to the index directory.
pub fn directory_mut(&mut self) -> &mut Directory {
&mut *self.directory
&mut self.directory
}
/// Reads the meta.json and returns the list of
@@ -288,7 +240,7 @@ impl fmt::Debug for Index {
impl Clone for Index {
fn clone(&self) -> Index {
Index {
directory: self.directory.box_clone(),
directory: self.directory.clone(),
schema: self.schema.clone(),
searcher_pool: self.searcher_pool.clone(),
}

View File

@@ -26,4 +26,5 @@ use std::path::PathBuf;
lazy_static! {
pub static ref META_FILEPATH: PathBuf = PathBuf::from("meta.json");
pub static ref MANAGED_FILEPATH: PathBuf = PathBuf::from(".managed.json");
}

View File

@@ -62,19 +62,7 @@ impl Segment {
/// It just joins the segment id with the extension
/// associated to a segment component.
pub fn relative_path(&self, component: SegmentComponent) -> PathBuf {
use self::SegmentComponent::*;
let mut path = self.id().uuid_string();
path.push_str(&*match component {
POSITIONS => ".pos".to_string(),
INFO => ".info".to_string(),
POSTINGS => ".idx".to_string(),
TERMS => ".term".to_string(),
STORE => ".store".to_string(),
FASTFIELDS => ".fast".to_string(),
FIELDNORMS => ".fieldnorm".to_string(),
DELETE => {format!(".{}.del", self.meta.delete_opstamp().unwrap_or(0))},
});
PathBuf::from(path)
self.meta.relative_path(component)
}
/// Open one of the component file for read.

View File

@@ -10,5 +10,20 @@ pub enum SegmentComponent {
DELETE
}
impl SegmentComponent {
pub fn iterator() -> impl Iterator<Item=&'static SegmentComponent> {
static SEGMENT_COMPONENTS: [SegmentComponent; 8] = [
SegmentComponent::INFO,
SegmentComponent::POSTINGS,
SegmentComponent::POSITIONS,
SegmentComponent::FASTFIELDS,
SegmentComponent::FIELDNORMS,
SegmentComponent::TERMS,
SegmentComponent::STORE,
SegmentComponent::DELETE
];
SEGMENT_COMPONENTS.into_iter()
}
}

View File

@@ -1,4 +1,6 @@
use core::SegmentId;
use super::SegmentComponent;
use std::path::PathBuf;
#[derive(Clone, Debug, RustcDecodable,RustcEncodable)]
@@ -43,6 +45,35 @@ impl SegmentMeta {
.unwrap_or(0u32)
}
pub fn alive_files(&self) -> Vec<PathBuf> {
SegmentComponent::iterator()
.map(|component| {
self.relative_path(*component)
})
.collect::<Vec<PathBuf>>()
}
/// Returns the relative path of a component of our segment.
///
/// It just joins the segment id with the extension
/// associated to a segment component.
pub fn relative_path(&self, component: SegmentComponent) -> PathBuf {
use self::SegmentComponent::*;
let mut path = self.id().uuid_string();
path.push_str(&*match component {
POSITIONS => ".pos".to_string(),
INFO => ".info".to_string(),
POSTINGS => ".idx".to_string(),
TERMS => ".term".to_string(),
STORE => ".store".to_string(),
FASTFIELDS => ".fast".to_string(),
FIELDNORMS => ".fieldnorm".to_string(),
DELETE => {format!(".{}.del", self.delete_opstamp().unwrap_or(0))},
});
PathBuf::from(path)
}
/// Return the highest doc id + 1
///
/// If there are no deletes, then num_docs = max_docs

View File

@@ -78,9 +78,6 @@ pub trait Directory: fmt::Debug + Send + Sync + 'static {
/// Clones the directory and boxes the clone
fn box_clone(&self) -> Box<Directory>;
/// Returns the list of files starting by a given
/// prefix.
fn ls_starting_with(&self, prefix: &str) -> io::Result<Vec<PathBuf>>;
}

View File

@@ -0,0 +1,81 @@
use Result;
use std::path::{Path, PathBuf};
use directory::error::{FileError, OpenWriteError};
use directory::{ReadOnlySource, WritePtr};
use std::result;
use std::io;
use Directory;
use std::sync::{Arc, RwLock};
use std::collections::HashSet;
use std::io::Write;
use rustc_serialize::json;
use core::MANAGED_FILEPATH;
#[derive(Debug)]
pub struct ManagedDirectory {
directory: Box<Directory>,
managed_paths: Arc<RwLock<HashSet<PathBuf>>>,
}
impl ManagedDirectory {
pub fn new<Dir: Directory>(directory: Dir) -> ManagedDirectory {
ManagedDirectory {
directory: box directory,
managed_paths: Arc::default(),
}
}
fn register_file_as_managed(&mut self, filepath: PathBuf) -> Result<()> {
let mut managed_files_lock = self.managed_paths.write()?;
if managed_files_lock.insert(filepath) {
let mut w = vec!();
try!(write!(&mut w, "{}\n", json::as_pretty_json(&*managed_files_lock)));
self.directory.atomic_write(&MANAGED_FILEPATH, &w[..])?;
}
Ok(())
}
}
impl Directory for ManagedDirectory {
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, FileError> {
self.directory.open_read(path)
}
fn open_write(&mut self, path: &Path) -> result::Result<WritePtr, OpenWriteError> {
self.directory.open_write(path)
}
fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()> {
self.directory.atomic_write(path, data)
}
fn atomic_read(&self, path: &Path) -> result::Result<Vec<u8>, FileError> {
self.directory.atomic_read(path)
}
fn delete(&self, path: &Path) -> result::Result<(), FileError> {
self.directory.delete(path)
}
fn exists(&self, path: &Path) -> bool {
self.directory.exists(path)
}
fn box_clone(&self) -> Box<Directory> {
box self.clone()
}
}
impl Clone for ManagedDirectory {
fn clone(&self) -> ManagedDirectory {
ManagedDirectory {
directory: self.directory.box_clone(),
managed_paths: self.managed_paths.clone(),
}
}
}

View File

@@ -352,26 +352,6 @@ impl Directory for MmapDirectory {
fn box_clone(&self,) -> Box<Directory> {
Box::new(self.clone())
}
fn ls_starting_with(&self, prefix: &str) -> io::Result<Vec<PathBuf>> {
fs::read_dir(&self.root_path)
.map(|paths: ReadDir| {
paths
.filter_map(|dir_entry_res|
dir_entry_res
.ok()
.map(|dir_entry| dir_entry.path())
)
.filter(|path|
path.to_str()
.map(|filepath| filepath.starts_with(prefix))
.unwrap_or(false)
)
.map(PathBuf::from)
.collect()
})
}
}

View File

@@ -3,6 +3,7 @@ mod ram_directory;
mod directory;
mod read_only_source;
mod shared_vec_slice;
mod managed_directory;
/// Errors specific to the directory module.
pub mod error;
@@ -14,6 +15,7 @@ pub use self::read_only_source::ReadOnlySource;
pub use self::directory::Directory;
pub use self::ram_directory::RAMDirectory;
pub use self::mmap_directory::MmapDirectory;
pub use self::managed_directory::ManagedDirectory;
/// Synonym of Seek + Write
pub trait SeekableWrite: Seek + Write {}

View File

@@ -130,20 +130,6 @@ impl InnerDirectory {
.contains_key(path)
}
fn ls_starting_with(&self, prefix: &str) -> Vec<PathBuf> {
self.0
.read()
.expect("Failed to get read lock directory.")
.keys()
.filter(|path: &&PathBuf|
path.to_str()
.map(|p: &str| p.starts_with(prefix))
.unwrap_or(false)
)
.cloned()
.collect()
}
}
impl fmt::Debug for RAMDirectory {
@@ -218,9 +204,4 @@ impl Directory for RAMDirectory {
Box::new(self.clone())
}
fn ls_starting_with(&self, prefix: &str) -> io::Result<Vec<PathBuf>> {
Ok(self.fs.ls_starting_with(prefix))
}
}

View File

@@ -3,7 +3,7 @@ use std::sync::RwLock;
use core::SegmentMeta;
use core::SegmentId;
use indexer::{SegmentEntry, SegmentState};
use std::path::PathBuf;
use std::sync::{RwLockReadGuard, RwLockWriteGuard};
use std::fmt::{self, Debug, Formatter};
@@ -67,6 +67,17 @@ impl SegmentManager {
segment_entries
}
pub fn alive_files(&self) -> Vec<PathBuf> {
let mut files = vec!();
let (segment_meta_uncommitted, segment_meta_committed) = get_segments(self);
for segment_meta in segment_meta_uncommitted
.into_iter()
.chain(segment_meta_committed.into_iter()) {
files.extend(segment_meta.alive_files());
}
files
}
pub fn segment_state(&self, segment_id: &SegmentId) -> Option<SegmentState> {
self.segment_entry(segment_id)
.map(|segment_entry| segment_entry.state())

View File

@@ -4,6 +4,7 @@ use core::SegmentMeta;
use std::fmt;
use std::fmt::{Debug, Formatter};
use indexer::segment_entry::SegmentEntry;
use std::path::PathBuf;

View File

@@ -21,6 +21,7 @@ use indexer::merger::IndexMerger;
use indexer::SegmentEntry;
use indexer::SegmentSerializer;
use Result;
use std::path::PathBuf;
use rustc_serialize::json;
use schema::Schema;
use std::borrow::BorrowMut;
@@ -74,13 +75,16 @@ pub fn save_metas(segment_metas: Vec<SegmentMeta>,
schema: schema,
opstamp: opstamp,
};
let mut w = Vec::new();
let mut w = vec!();
try!(write!(&mut w, "{}\n", json::as_pretty_json(&metas)));
Ok(directory
.atomic_write(&META_FILEPATH, &w[..])?)
}
fn garbage_collect_files(directory: &Directory, alive_files: Vec<PathBuf>) {
//
}
// The segment update runner is in charge of processing all
@@ -183,7 +187,10 @@ impl SegmentUpdater {
segment_updater.0.index.schema(),
opstamp,
directory.borrow_mut()).expect("Could not save metas.");
let useful_files = segment_updater.0.segment_manager.alive_files();
garbage_collect_files(&*directory, useful_files);
segment_updater.consider_merge_options();
})
}
@@ -290,9 +297,6 @@ impl SegmentUpdater {
segment_updater.0.index.schema(),
segment_updater.0.index.opstamp(),
directory.borrow_mut()).expect("Could not save metas.");
for segment_meta in merged_segment_metas {
segment_updater.0.index.delete_segment(segment_meta.id());
}
})
}