Merge remote-tracking branch 'origin/directorytoindex' into newdirectory

This commit is contained in:
Paul Masurel
2016-03-02 08:57:07 +09:00
4 changed files with 105 additions and 130 deletions

View File

@@ -30,30 +30,30 @@ pub fn generate_segment_name() -> SegmentId {
SegmentId( String::from("_") + &random_name)
}
#[derive(Clone,Debug,RustcDecodable, RustcEncodable)]
pub struct DirectoryMeta {
#[derive(Clone,Debug,RustcDecodable,RustcEncodable)]
pub struct IndexMeta {
segments: Vec<String>,
schema: Schema,
}
impl DirectoryMeta {
fn new() -> DirectoryMeta {
DirectoryMeta {
impl IndexMeta {
fn new() -> IndexMeta {
IndexMeta {
segments: Vec::new(),
schema: Schema::new(),
}
}
fn with_schema(schema: Schema) -> DirectoryMeta {
DirectoryMeta {
fn with_schema(schema: Schema) -> IndexMeta {
IndexMeta {
segments: Vec::new(),
schema: schema,
}
}
}
impl fmt::Debug for Directory {
impl fmt::Debug for Index {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Directory({:?})", self.inner_directory.read().unwrap().index_path)
write!(f, "Index({:?})", self.inner_index.read().unwrap().index_path)
}
}
@@ -68,75 +68,76 @@ fn sync_file(filepath: &PathBuf) -> Result<(), IOError> {
#[derive(Clone)]
pub struct Directory {
inner_directory: Arc<RwLock<InnerDirectory>>,
pub struct Index {
metas: Arc<RwLock<IndexMeta>>,
inner_index: Arc<RwLock<InnerIndex>>,
}
pub enum CreateError {
RootDirectoryDoesNotExist,
DirectoryAlreadyExists,
RootIndexDoesNotExist,
IndexAlreadyExists,
CannotOpenMetaFile,
}
struct DirectoryError;
struct IndexError;
lazy_static! {
static ref META_FILEPATH: PathBuf = PathBuf::from("meta.json");
}
impl Index {
impl Directory {
pub fn create(filepath: &Path, schema: Schema) -> Result<Directory, CreateError> {
let inner_directory = try!(InnerDirectory::create(filepath, schema));
Ok(Directory {
inner_directory: Arc::new(RwLock::new(inner_directory)),
})
pub fn create(filepath: &Path, schema: Schema) -> Result<Index, CreateError> {
let inner_index = try!(InnerIndex::create(filepath));
Ok(Index::from_inner_index(inner_index, schema))
}
pub fn create_from_tempdir(schema: Schema) -> Result<Directory, IOError> {
let inner_directory = try!(InnerDirectory::create_from_tempdir(schema));
Ok(Directory {
inner_directory: Arc::new(RwLock::new(inner_directory)),
})
pub fn create_from_tempdir(schema: Schema) -> Result<Index, IOError> {
let inner_index = try!(InnerIndex::create_from_tempdir());
Ok(Index::from_inner_index(inner_index, schema))
}
pub fn open<P: AsRef<Path>>(filepath: &P) -> Result<Directory, IOError> {
let inner_directory = try!(InnerDirectory::open(filepath));
Ok(Directory {
inner_directory: Arc::new(RwLock::new(inner_directory)),
})
pub fn open<P: AsRef<Path>>(filepath: &P) -> Result<Index, IOError> {
let inner_index = try!(InnerIndex::open(filepath));
let mut index = Index::from_inner_index(inner_index, Schema::new());
try!(index.load_metas()); //< does the directory already exists?
Ok(index)
}
pub fn schema(&self,) -> Schema {
self.get_read().unwrap().metas.schema.clone()
self.metas.read().unwrap().schema.clone()
}
fn get_write(&mut self) -> Result<RwLockWriteGuard<InnerDirectory>, IOError> {
self.inner_directory
fn get_write(&mut self) -> Result<RwLockWriteGuard<InnerIndex>, IOError> {
self.inner_index
.write()
.map_err(|e| IOError::new(IOErrorKind::Other,
format!("Failed acquiring lock on directory.\n
It can happen if another thread panicked! Error was: {:?}", e) ))
}
fn get_read(&self) -> Result<RwLockReadGuard<InnerDirectory>, IOError> {
self.inner_directory
fn get_read(&self) -> Result<RwLockReadGuard<InnerIndex>, IOError> {
self.inner_index
.read()
.map_err(|e| IOError::new(IOErrorKind::Other,
format!("Failed acquiring lock on directory.\n
It can happen if another thread panicked! Error was: {:?}", e) ))
}
pub fn publish_segment(&mut self, segment: Segment) -> Result<(), IOError> {
try!(self.get_write()).publish_segment(segment)
fn from_inner_index(inner_index: InnerIndex, schema: Schema) -> Index {
Index {
metas: Arc::new(RwLock::new(IndexMeta::with_schema(schema))),
inner_index: Arc::new(RwLock::new(inner_index)),
}
}
pub fn load_metas(&self,) -> Result<(), IOError> {
self.inner_directory
.write()
.unwrap() // only fail when another thread has already panicked.
.load_metas()
// TODO find a rusty way to hide that, while keeping
// it visible for IndexWriters.
pub fn publish_segment(&mut self, segment: Segment) -> Result<(), IOError> {
println!("publish segment {:?}", segment);
self.metas.write().unwrap().segments.push(segment.segment_id.0.clone());
// TODO use logs
self.save_metas()
}
pub fn sync(&mut self, segment: Segment) -> Result<(), IOError> {
@@ -145,10 +146,7 @@ impl Directory {
pub fn segments(&self,) -> Vec<Segment> {
// TODO handle error
self.inner_directory
.read()
.unwrap()
.segment_ids()
self.segment_ids()
.into_iter()
.map(|segment_id| self.segment(&segment_id))
.collect()
@@ -156,30 +154,48 @@ impl Directory {
pub fn segment(&self, segment_id: &SegmentId) -> Segment {
Segment {
directory: self.clone(),
index: self.clone(),
segment_id: segment_id.clone()
}
}
fn segment_ids(&self,) -> Vec<SegmentId> {
self.metas
.read()
.unwrap()
.segments
.iter()
.cloned()
.map(SegmentId)
.collect()
}
pub fn new_segment(&self,) -> Segment {
// TODO check it does not exists
self.segment(&generate_segment_name())
}
fn open_writable(&self, relative_path: &PathBuf) -> Result<File, IOError> {
try!(self.get_read()).open_writable(relative_path)
pub fn load_metas(&mut self,) -> Result<(), IOError> {
let mut meta_file = try!(self.inner_index.read().unwrap().mmap(&META_FILEPATH));
let mut meta_content = String::from_utf8_lossy(unsafe {meta_file.as_slice()});
println!("META CONTENT {:?}", meta_content);
let loaded_meta: IndexMeta = json::decode(&meta_content).unwrap();
self.metas.write().unwrap().clone_from(&loaded_meta);
Ok(())
}
fn mmap(&self, relative_path: &PathBuf) -> Result<MmapReadOnly, IOError> {
try!(self.get_read()).mmap(relative_path)
pub fn save_metas(&self,) -> Result<(), IOError> {
let metas_lock = self.metas.read().unwrap();
let encoded = json::encode(&*metas_lock).unwrap();
self.inner_index.write().unwrap().write_atomic(&META_FILEPATH, encoded)
}
}
struct InnerDirectory {
struct InnerIndex {
index_path: PathBuf,
mmap_cache: RefCell<HashMap<PathBuf, MmapReadOnly>>,
metas: DirectoryMeta,
_temp_directory: Option<TempDir>,
}
@@ -189,90 +205,47 @@ fn create_tempdir() -> Result<TempDir, IOError> {
}
impl InnerDirectory {
impl InnerIndex {
// TODO find a rusty way to hide that, while keeping
// it visible for IndexWriters.
pub fn publish_segment(&mut self, segment: Segment) -> Result<(), IOError> {
self.metas.segments.push(segment.segment_id.0.clone());
// TODO use logs
self.save_metas()
pub fn write_atomic(&self, path: &PathBuf, data: String) -> Result<(), IOError> {
let meta_file = atomicwrites::AtomicFile::new(path, atomicwrites::AllowOverwrite);
meta_file.write(|f| {
f.write_all(data.as_bytes())
})
}
pub fn create<P: AsRef<Path>>(filepath: P, schema: Schema) -> Result<InnerDirectory, CreateError> {
pub fn create<P: AsRef<Path>>(filepath: P) -> Result<InnerIndex, CreateError> {
let filepath_os_path = filepath.as_ref().as_os_str();
let mut directory = InnerDirectory {
let mut directory = InnerIndex {
index_path: PathBuf::from(&filepath_os_path),
mmap_cache: RefCell::new(HashMap::new()),
metas: DirectoryMeta::with_schema(schema),
_temp_directory: None,
};
Ok(directory)
}
pub fn create_from_tempdir(schema: Schema) -> Result<InnerDirectory, IOError> {
pub fn create_from_tempdir() -> Result<InnerIndex, IOError> {
let tempdir = try!(create_tempdir());
let tempdir_path = PathBuf::from(tempdir.path());
let mut directory = InnerDirectory {
let mut directory = InnerIndex {
index_path: PathBuf::from(tempdir_path),
mmap_cache: RefCell::new(HashMap::new()),
metas: DirectoryMeta::with_schema(schema),
_temp_directory: Some(tempdir)
};
Ok(directory)
}
pub fn open<P: AsRef<Path>>(filepath: &P) -> Result<InnerDirectory, IOError> {
let mut directory = InnerDirectory {
pub fn open<P: AsRef<Path>>(filepath: &P) -> Result<InnerIndex, IOError> {
let mut directory = InnerIndex {
index_path: PathBuf::from(filepath.as_ref().as_os_str()),
mmap_cache: RefCell::new(HashMap::new()),
metas: DirectoryMeta::new(),
_temp_directory: None,
};
try!(directory.load_metas()); //< does the directory already exists?
Ok(directory)
}
pub fn segment_ids(&self,) -> Vec<SegmentId> {
self.metas
.segments
.iter()
.cloned()
.map(SegmentId)
.collect()
}
pub fn load_metas(&mut self,) -> Result<(), IOError> {
let meta_filepath = self.meta_filepath();
let meta_data = fs::metadata(&meta_filepath);
if meta_data.is_err() {
// There is no meta data file.
// TODO check that the directory is empty.
return Ok(());
}
let mut meta_file = File::open(&meta_filepath).unwrap();
let mut meta_content = String::new();
meta_file.read_to_string(&mut meta_content);
self.metas = json::decode(&meta_content).unwrap();
Ok(())
}
fn meta_filepath(&self,) -> PathBuf {
self.resolve_path(&PathBuf::from("meta.json"))
}
pub fn save_metas(&self,) -> Result<(), IOError> {
let encoded = json::encode(&self.metas).unwrap();
let meta_filepath = self.meta_filepath();
let meta_file = atomicwrites::AtomicFile::new(meta_filepath, atomicwrites::AllowOverwrite);
meta_file.write(|f| {
f.write_all(encoded.as_bytes())
})
}
pub fn sync(&mut self, segment: Segment) -> Result<(), IOError> {
for component in [SegmentComponent::POSTINGS, SegmentComponent::TERMS].iter() {
let relative_path = segment.relative_path(component);
@@ -320,7 +293,7 @@ pub enum SegmentComponent {
#[derive(Debug, Clone)]
pub struct Segment {
directory: Directory,
index: Index,
segment_id: SegmentId,
}
@@ -349,11 +322,11 @@ impl Segment {
pub fn mmap(&self, component: SegmentComponent) -> Result<MmapReadOnly, IOError> {
let path = self.relative_path(&component);
self.directory.mmap(&path)
self.index.inner_index.read().unwrap().mmap(&path)
}
pub fn open_writable(&self, component: SegmentComponent) -> Result<File, IOError> {
let path = self.relative_path(&component);
self.directory.open_writable(&path)
self.index.inner_index.write().unwrap().open_writable(&path)
}
}

View File

@@ -1,9 +1,9 @@
use core::reader::SegmentReader;
use core::directory::Directory;
use core::directory::Index;
use core::directory::Segment;
use core::directory::SegmentId;
use core::schema::DocId;
use core::schema::Document;
use core::directory::Segment;
use core::collector::Collector;
use std::collections::HashMap;
use core::schema::Term;
@@ -44,9 +44,10 @@ impl Searcher {
}
}
pub fn for_directory(directory: Directory) -> Searcher {
pub fn for_index(index: Index) -> Searcher {
let mut searcher = Searcher::new();
for segment in directory.segments().into_iter() {
for segment in index.segments().into_iter() {
println!("Segment {:?} ", segment);
searcher.add_segment(segment);
}
searcher

View File

@@ -2,7 +2,7 @@ use core::schema::*;
use core::codec::*;
use std::io;
use std::rc::Rc;
use core::directory::Directory;
use core::directory::Index;
use core::analyzer::SimpleTokenizer;
use std::collections::BTreeMap;
use core::analyzer::StreamingIterator;
@@ -32,18 +32,18 @@ impl PostingsWriter {
pub struct IndexWriter {
segment_writer: Rc<SegmentWriter>,
directory: Directory,
directory: Index,
schema: Schema,
}
fn new_segment_writer(directory: &Directory, ) -> SegmentWriter {
fn new_segment_writer(directory: &Index, ) -> SegmentWriter {
let segment = directory.new_segment();
SegmentWriter::for_segment(segment)
}
impl IndexWriter {
pub fn open(directory: &Directory) -> IndexWriter {
pub fn open(directory: &Index) -> IndexWriter {
let schema = directory.schema();
IndexWriter {
segment_writer: Rc::new(new_segment_writer(&directory)),

View File

@@ -8,7 +8,7 @@ use tantivy::core::schema::*;
use tantivy::core::writer::IndexWriter;
use tantivy::core::collector::Collector;
use tantivy::core::searcher::Searcher;
use tantivy::core::directory::{Directory, generate_segment_name, SegmentId};
use tantivy::core::directory::{Index, generate_segment_name, SegmentId};
use tantivy::core::reader::SegmentReader;
use regex::Regex;
use tantivy::core::serial::DebugSegmentSerializer;
@@ -49,7 +49,7 @@ fn test_indexing() {
let text_fieldtype = FieldOptions::new().set_tokenized_indexed();
let text_field = schema.add_field("text", &text_fieldtype);
let directory = Directory::create_from_tempdir(schema).unwrap();
let directory = Index::create_from_tempdir(schema).unwrap();
{
// writing the segment
@@ -92,11 +92,11 @@ fn test_searcher() {
let mut schema = Schema::new();
let text_fieldtype = FieldOptions::new().set_tokenized_indexed();
let text_field = schema.add_field("text", &text_fieldtype);
let directory = Directory::create_from_tempdir(schema).unwrap();
let index = Index::create_from_tempdir(schema).unwrap();
{
// writing the segment
let mut index_writer = IndexWriter::open(&directory);
let mut index_writer = IndexWriter::open(&index);
{
let mut doc = Document::new();
doc.set(&text_field, "af b");
@@ -115,9 +115,10 @@ fn test_searcher() {
let commit_result = index_writer.commit();
let segment = commit_result.unwrap();
}
println!("index {:?}", index.schema());
{
let searcher = Searcher::for_directory(directory);
let searcher = Searcher::for_index(index);
let get_doc_ids = |terms: Vec<Term>| {
let mut collector = TestCollector::new();
searcher.search(&terms, &mut collector);