From 345db8e62dbda005026d4f17b14070fc6a7c495d Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Tue, 1 Mar 2016 18:59:13 +0900 Subject: [PATCH 1/3] renamed directory to index --- src/core/directory.rs | 91 +++++++++++++++++++++---------------------- src/core/searcher.rs | 8 ++-- src/core/writer.rs | 8 ++-- tests/core.rs | 10 ++--- 4 files changed, 58 insertions(+), 59 deletions(-) diff --git a/src/core/directory.rs b/src/core/directory.rs index 3aa87dda8..41986fd49 100644 --- a/src/core/directory.rs +++ b/src/core/directory.rs @@ -31,29 +31,29 @@ pub fn generate_segment_name() -> SegmentId { } #[derive(Clone,Debug,RustcDecodable, RustcEncodable)] -pub struct DirectoryMeta { +pub struct IndexMeta { segments: Vec, schema: Schema, } -impl DirectoryMeta { - fn new() -> DirectoryMeta { - DirectoryMeta { +impl IndexMeta { + fn new() -> IndexMeta { + IndexMeta { segments: Vec::new(), schema: Schema::new(), } } - fn with_schema(schema: Schema) -> DirectoryMeta { - DirectoryMeta { + fn with_schema(schema: Schema) -> IndexMeta { + IndexMeta { segments: Vec::new(), schema: schema, } } } -impl fmt::Debug for Directory { +impl fmt::Debug for Index { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "Directory({:?})", self.inner_directory.read().unwrap().index_path) + write!(f, "Index({:?})", self.inner_index.read().unwrap().index_path) } } @@ -68,41 +68,40 @@ fn sync_file(filepath: &PathBuf) -> Result<(), IOError> { #[derive(Clone)] -pub struct Directory { - inner_directory: Arc>, +pub struct Index { + inner_index: Arc>, } - pub enum CreateError { - RootDirectoryDoesNotExist, - DirectoryAlreadyExists, + RootIndexDoesNotExist, + IndexAlreadyExists, CannotOpenMetaFile, } -struct DirectoryError; +struct IndexError; -impl Directory { +impl Index { - pub fn create(filepath: &Path, schema: Schema) -> Result { - let inner_directory = try!(InnerDirectory::create(filepath, schema)); - Ok(Directory { - inner_directory: Arc::new(RwLock::new(inner_directory)), + pub fn create(filepath: &Path, schema: Schema) -> Result { + let inner_index = try!(InnerIndex::create(filepath, schema)); + Ok(Index { + inner_index: Arc::new(RwLock::new(inner_index)), }) } - pub fn create_from_tempdir(schema: Schema) -> Result { - let inner_directory = try!(InnerDirectory::create_from_tempdir(schema)); - Ok(Directory { - inner_directory: Arc::new(RwLock::new(inner_directory)), + pub fn create_from_tempdir(schema: Schema) -> Result { + let inner_index = try!(InnerIndex::create_from_tempdir(schema)); + Ok(Index { + inner_index: Arc::new(RwLock::new(inner_index)), }) } - pub fn open>(filepath: &P) -> Result { - let inner_directory = try!(InnerDirectory::open(filepath)); - Ok(Directory { - inner_directory: Arc::new(RwLock::new(inner_directory)), + pub fn open>(filepath: &P) -> Result { + let inner_index = try!(InnerIndex::open(filepath)); + Ok(Index { + inner_index: Arc::new(RwLock::new(inner_index)), }) } @@ -110,16 +109,16 @@ impl Directory { self.get_read().unwrap().metas.schema.clone() } - fn get_write(&mut self) -> Result, IOError> { - self.inner_directory + fn get_write(&mut self) -> Result, IOError> { + self.inner_index .write() .map_err(|e| IOError::new(IOErrorKind::Other, format!("Failed acquiring lock on directory.\n It can happen if another thread panicked! Error was: {:?}", e) )) } - fn get_read(&self) -> Result, IOError> { - self.inner_directory + fn get_read(&self) -> Result, IOError> { + self.inner_index .read() .map_err(|e| IOError::new(IOErrorKind::Other, format!("Failed acquiring lock on directory.\n @@ -133,7 +132,7 @@ impl Directory { pub fn load_metas(&self,) -> Result<(), IOError> { - self.inner_directory + self.inner_index .write() .unwrap() // only fail when another thread has already panicked. .load_metas() @@ -145,7 +144,7 @@ impl Directory { pub fn segments(&self,) -> Vec { // TODO handle error - self.inner_directory + self.inner_index .read() .unwrap() .segment_ids() @@ -176,10 +175,10 @@ impl Directory { } -struct InnerDirectory { +struct InnerIndex { index_path: PathBuf, mmap_cache: RefCell>, - metas: DirectoryMeta, + metas: IndexMeta, _temp_directory: Option, } @@ -189,7 +188,7 @@ fn create_tempdir() -> Result { } -impl InnerDirectory { +impl InnerIndex { // TODO find a rusty way to hide that, while keeping // it visible for IndexWriters. @@ -199,34 +198,34 @@ impl InnerDirectory { self.save_metas() } - pub fn create>(filepath: P, schema: Schema) -> Result { + pub fn create>(filepath: P, schema: Schema) -> Result { let filepath_os_path = filepath.as_ref().as_os_str(); - let mut directory = InnerDirectory { + let mut directory = InnerIndex { index_path: PathBuf::from(&filepath_os_path), mmap_cache: RefCell::new(HashMap::new()), - metas: DirectoryMeta::with_schema(schema), + metas: IndexMeta::with_schema(schema), _temp_directory: None, }; Ok(directory) } - pub fn create_from_tempdir(schema: Schema) -> Result { + pub fn create_from_tempdir(schema: Schema) -> Result { let tempdir = try!(create_tempdir()); let tempdir_path = PathBuf::from(tempdir.path()); - let mut directory = InnerDirectory { + let mut directory = InnerIndex { index_path: PathBuf::from(tempdir_path), mmap_cache: RefCell::new(HashMap::new()), - metas: DirectoryMeta::with_schema(schema), + metas: IndexMeta::with_schema(schema), _temp_directory: Some(tempdir) }; Ok(directory) } - pub fn open>(filepath: &P) -> Result { - let mut directory = InnerDirectory { + pub fn open>(filepath: &P) -> Result { + let mut directory = InnerIndex { index_path: PathBuf::from(filepath.as_ref().as_os_str()), mmap_cache: RefCell::new(HashMap::new()), - metas: DirectoryMeta::new(), + metas: IndexMeta::new(), _temp_directory: None, }; try!(directory.load_metas()); //< does the directory already exists? @@ -320,7 +319,7 @@ pub enum SegmentComponent { #[derive(Debug, Clone)] pub struct Segment { - directory: Directory, + directory: Index, segment_id: SegmentId, } diff --git a/src/core/searcher.rs b/src/core/searcher.rs index 145af5717..ebf1baf74 100644 --- a/src/core/searcher.rs +++ b/src/core/searcher.rs @@ -1,9 +1,9 @@ use core::reader::SegmentReader; -use core::directory::Directory; +use core::directory::Index; +use core::directory::Segment; use core::directory::SegmentId; use core::schema::DocId; use core::schema::Document; -use core::directory::Segment; use core::collector::Collector; use std::collections::HashMap; use core::schema::Term; @@ -44,9 +44,9 @@ impl Searcher { } } - pub fn for_directory(directory: Directory) -> Searcher { + pub fn for_index(index: Index) -> Searcher { let mut searcher = Searcher::new(); - for segment in directory.segments().into_iter() { + for segment in index.segments().into_iter() { searcher.add_segment(segment); } searcher diff --git a/src/core/writer.rs b/src/core/writer.rs index ecf360db9..104c9c693 100644 --- a/src/core/writer.rs +++ b/src/core/writer.rs @@ -2,7 +2,7 @@ use core::schema::*; use core::codec::*; use std::io; use std::rc::Rc; -use core::directory::Directory; +use core::directory::Index; use core::analyzer::SimpleTokenizer; use std::collections::BTreeMap; use core::analyzer::StreamingIterator; @@ -32,18 +32,18 @@ impl PostingsWriter { pub struct IndexWriter { segment_writer: Rc, - directory: Directory, + directory: Index, schema: Schema, } -fn new_segment_writer(directory: &Directory, ) -> SegmentWriter { +fn new_segment_writer(directory: &Index, ) -> SegmentWriter { let segment = directory.new_segment(); SegmentWriter::for_segment(segment) } impl IndexWriter { - pub fn open(directory: &Directory) -> IndexWriter { + pub fn open(directory: &Index) -> IndexWriter { let schema = directory.schema(); IndexWriter { segment_writer: Rc::new(new_segment_writer(&directory)), diff --git a/tests/core.rs b/tests/core.rs index d4763f0f2..a7f159a93 100644 --- a/tests/core.rs +++ b/tests/core.rs @@ -8,7 +8,7 @@ use tantivy::core::schema::*; use tantivy::core::writer::IndexWriter; use tantivy::core::collector::Collector; use tantivy::core::searcher::Searcher; -use tantivy::core::directory::{Directory, generate_segment_name, SegmentId}; +use tantivy::core::directory::{Index, generate_segment_name, SegmentId}; use tantivy::core::reader::SegmentReader; use regex::Regex; use tantivy::core::serial::DebugSegmentSerializer; @@ -49,7 +49,7 @@ fn test_indexing() { let text_fieldtype = FieldOptions::new().set_tokenized_indexed(); let text_field = schema.add_field("text", &text_fieldtype); - let directory = Directory::create_from_tempdir(schema).unwrap(); + let directory = Index::create_from_tempdir(schema).unwrap(); { // writing the segment @@ -92,11 +92,11 @@ fn test_searcher() { let mut schema = Schema::new(); let text_fieldtype = FieldOptions::new().set_tokenized_indexed(); let text_field = schema.add_field("text", &text_fieldtype); - let directory = Directory::create_from_tempdir(schema).unwrap(); + let index = Index::create_from_tempdir(schema).unwrap(); { // writing the segment - let mut index_writer = IndexWriter::open(&directory); + let mut index_writer = IndexWriter::open(&index); { let mut doc = Document::new(); doc.set(&text_field, "af b"); @@ -117,7 +117,7 @@ fn test_searcher() { } { - let searcher = Searcher::for_directory(directory); + let searcher = Searcher::for_index(index); let get_doc_ids = |terms: Vec| { let mut collector = TestCollector::new(); searcher.search(&terms, &mut collector); From a6945c8090e9c32f0e51f1ec537b4686af47b735 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Tue, 1 Mar 2016 19:46:57 +0900 Subject: [PATCH 2/3] blop --- src/core/directory.rs | 158 +++++++++++++++++++----------------------- 1 file changed, 71 insertions(+), 87 deletions(-) diff --git a/src/core/directory.rs b/src/core/directory.rs index 41986fd49..12ba2e909 100644 --- a/src/core/directory.rs +++ b/src/core/directory.rs @@ -69,6 +69,7 @@ fn sync_file(filepath: &PathBuf) -> Result<(), IOError> { #[derive(Clone)] pub struct Index { + metas: IndexMeta, inner_index: Arc>, } @@ -80,33 +81,38 @@ pub enum CreateError { struct IndexError; - +lazy_static! { + static ref META_FILEPATH: PathBuf = PathBuf::from("meta.json"); +} impl Index { - pub fn create(filepath: &Path, schema: Schema) -> Result { - let inner_index = try!(InnerIndex::create(filepath, schema)); - Ok(Index { + fn from_inner_index(inner_index: InnerIndex, schema: Schema) -> Index { + Index { + metas: IndexMeta::with_schema(schema), inner_index: Arc::new(RwLock::new(inner_index)), - }) + } + } + + pub fn create(filepath: &Path, schema: Schema) -> Result { + let inner_index = try!(InnerIndex::create(filepath)); + Ok(Index::from_inner_index(inner_index, schema)) } pub fn create_from_tempdir(schema: Schema) -> Result { - let inner_index = try!(InnerIndex::create_from_tempdir(schema)); - Ok(Index { - inner_index: Arc::new(RwLock::new(inner_index)), - }) + let inner_index = try!(InnerIndex::create_from_tempdir()); + Ok(Index::from_inner_index(inner_index, schema)) } pub fn open>(filepath: &P) -> Result { let inner_index = try!(InnerIndex::open(filepath)); - Ok(Index { - inner_index: Arc::new(RwLock::new(inner_index)), - }) + let mut index = Index::from_inner_index(inner_index, Schema::new()); + try!(index.load_metas()); //< does the directory already exists? + Ok(index) } pub fn schema(&self,) -> Schema { - self.get_read().unwrap().metas.schema.clone() + self.metas.schema.clone() } fn get_write(&mut self) -> Result, IOError> { @@ -125,18 +131,20 @@ impl Index { It can happen if another thread panicked! Error was: {:?}", e) )) } + // TODO find a rusty way to hide that, while keeping + // it visible for IndexWriters. pub fn publish_segment(&mut self, segment: Segment) -> Result<(), IOError> { - try!(self.get_write()).publish_segment(segment) + self.metas.segments.push(segment.segment_id.0.clone()); + // TODO use logs + self.save_metas() } - - - pub fn load_metas(&self,) -> Result<(), IOError> { - self.inner_index - .write() - .unwrap() // only fail when another thread has already panicked. - .load_metas() - } + // pub fn load_metas(&self,) -> Result<(), IOError> { + // self.inner_index + // .write() + // .unwrap() // only fail when another thread has already panicked. + // .load_metas() + // } pub fn sync(&mut self, segment: Segment) -> Result<(), IOError> { try!(self.get_write()).sync(segment) @@ -144,10 +152,7 @@ impl Index { pub fn segments(&self,) -> Vec { // TODO handle error - self.inner_index - .read() - .unwrap() - .segment_ids() + self.segment_ids() .into_iter() .map(|segment_id| self.segment(&segment_id)) .collect() @@ -155,30 +160,52 @@ impl Index { pub fn segment(&self, segment_id: &SegmentId) -> Segment { Segment { - directory: self.clone(), + index: self.clone(), segment_id: segment_id.clone() } } + fn segment_ids(&self,) -> Vec { + self.metas + .segments + .iter() + .cloned() + .map(SegmentId) + .collect() + } + pub fn new_segment(&self,) -> Segment { // TODO check it does not exists self.segment(&generate_segment_name()) } - fn open_writable(&self, relative_path: &PathBuf) -> Result { - try!(self.get_read()).open_writable(relative_path) + + pub fn load_metas(&mut self,) -> Result<(), IOError> { + let mut meta_file = try!(self.inner_index.read().unwrap().mmap(&META_FILEPATH)); + let mut meta_content = String::from_utf8_lossy(unsafe {meta_file.as_slice()}); + println!("META CONTENT {:?}", meta_content); + self.metas = json::decode(&meta_content).unwrap(); + Ok(()) } - fn mmap(&self, relative_path: &PathBuf) -> Result { - try!(self.get_read()).mmap(relative_path) + pub fn save_metas(&self,) -> Result<(), IOError> { + let encoded = json::encode(&self.metas).unwrap(); + self.inner_index.write().unwrap().write_atomic(&META_FILEPATH, encoded) } + + // fn open_writable(&self, relative_path: &PathBuf) -> Result { + // try!(self.get_read()).open_writable(relative_path) + // } + // + // fn mmap(&self, relative_path: &PathBuf) -> Result { + // try!(self.get_read()).mmap(relative_path) + // } } struct InnerIndex { index_path: PathBuf, mmap_cache: RefCell>, - metas: IndexMeta, _temp_directory: Option, } @@ -190,32 +217,31 @@ fn create_tempdir() -> Result { impl InnerIndex { - // TODO find a rusty way to hide that, while keeping - // it visible for IndexWriters. - pub fn publish_segment(&mut self, segment: Segment) -> Result<(), IOError> { - self.metas.segments.push(segment.segment_id.0.clone()); - // TODO use logs - self.save_metas() + + + pub fn write_atomic(&self, path: &PathBuf, data: String) -> Result<(), IOError> { + let meta_file = atomicwrites::AtomicFile::new(path, atomicwrites::AllowOverwrite); + meta_file.write(|f| { + f.write_all(data.as_bytes()) + }) } - pub fn create>(filepath: P, schema: Schema) -> Result { + pub fn create>(filepath: P) -> Result { let filepath_os_path = filepath.as_ref().as_os_str(); let mut directory = InnerIndex { index_path: PathBuf::from(&filepath_os_path), mmap_cache: RefCell::new(HashMap::new()), - metas: IndexMeta::with_schema(schema), _temp_directory: None, }; Ok(directory) } - pub fn create_from_tempdir(schema: Schema) -> Result { + pub fn create_from_tempdir() -> Result { let tempdir = try!(create_tempdir()); let tempdir_path = PathBuf::from(tempdir.path()); let mut directory = InnerIndex { index_path: PathBuf::from(tempdir_path), mmap_cache: RefCell::new(HashMap::new()), - metas: IndexMeta::with_schema(schema), _temp_directory: Some(tempdir) }; Ok(directory) @@ -225,53 +251,11 @@ impl InnerIndex { let mut directory = InnerIndex { index_path: PathBuf::from(filepath.as_ref().as_os_str()), mmap_cache: RefCell::new(HashMap::new()), - metas: IndexMeta::new(), _temp_directory: None, }; - try!(directory.load_metas()); //< does the directory already exists? Ok(directory) } - pub fn segment_ids(&self,) -> Vec { - self.metas - .segments - .iter() - .cloned() - .map(SegmentId) - .collect() - } - - - - pub fn load_metas(&mut self,) -> Result<(), IOError> { - let meta_filepath = self.meta_filepath(); - let meta_data = fs::metadata(&meta_filepath); - if meta_data.is_err() { - // There is no meta data file. - // TODO check that the directory is empty. - return Ok(()); - } - - let mut meta_file = File::open(&meta_filepath).unwrap(); - let mut meta_content = String::new(); - meta_file.read_to_string(&mut meta_content); - self.metas = json::decode(&meta_content).unwrap(); - Ok(()) - } - - fn meta_filepath(&self,) -> PathBuf { - self.resolve_path(&PathBuf::from("meta.json")) - } - - pub fn save_metas(&self,) -> Result<(), IOError> { - let encoded = json::encode(&self.metas).unwrap(); - let meta_filepath = self.meta_filepath(); - let meta_file = atomicwrites::AtomicFile::new(meta_filepath, atomicwrites::AllowOverwrite); - meta_file.write(|f| { - f.write_all(encoded.as_bytes()) - }) - } - pub fn sync(&mut self, segment: Segment) -> Result<(), IOError> { for component in [SegmentComponent::POSTINGS, SegmentComponent::TERMS].iter() { let relative_path = segment.relative_path(component); @@ -319,7 +303,7 @@ pub enum SegmentComponent { #[derive(Debug, Clone)] pub struct Segment { - directory: Index, + index: Index, segment_id: SegmentId, } @@ -348,11 +332,11 @@ impl Segment { pub fn mmap(&self, component: SegmentComponent) -> Result { let path = self.relative_path(&component); - self.directory.mmap(&path) + self.index.inner_index.read().unwrap().mmap(&path) } pub fn open_writable(&self, component: SegmentComponent) -> Result { let path = self.relative_path(&component); - self.directory.open_writable(&path) + self.index.inner_index.write().unwrap().open_writable(&path) } } From 20fa8122182b677f687d1b5a37853adf277343d2 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Tue, 1 Mar 2016 23:45:04 +0900 Subject: [PATCH 3/3] blop --- src/core/directory.rs | 46 +++++++++++++++++-------------------------- src/core/searcher.rs | 1 + tests/core.rs | 1 + 3 files changed, 20 insertions(+), 28 deletions(-) diff --git a/src/core/directory.rs b/src/core/directory.rs index 12ba2e909..6dd4e89a6 100644 --- a/src/core/directory.rs +++ b/src/core/directory.rs @@ -30,7 +30,7 @@ pub fn generate_segment_name() -> SegmentId { SegmentId( String::from("_") + &random_name) } -#[derive(Clone,Debug,RustcDecodable, RustcEncodable)] +#[derive(Clone,Debug,RustcDecodable,RustcEncodable)] pub struct IndexMeta { segments: Vec, schema: Schema, @@ -69,7 +69,7 @@ fn sync_file(filepath: &PathBuf) -> Result<(), IOError> { #[derive(Clone)] pub struct Index { - metas: IndexMeta, + metas: Arc>, inner_index: Arc>, } @@ -87,13 +87,6 @@ lazy_static! { impl Index { - fn from_inner_index(inner_index: InnerIndex, schema: Schema) -> Index { - Index { - metas: IndexMeta::with_schema(schema), - inner_index: Arc::new(RwLock::new(inner_index)), - } - } - pub fn create(filepath: &Path, schema: Schema) -> Result { let inner_index = try!(InnerIndex::create(filepath)); Ok(Index::from_inner_index(inner_index, schema)) @@ -112,7 +105,7 @@ impl Index { } pub fn schema(&self,) -> Schema { - self.metas.schema.clone() + self.metas.read().unwrap().schema.clone() } fn get_write(&mut self) -> Result, IOError> { @@ -131,21 +124,22 @@ impl Index { It can happen if another thread panicked! Error was: {:?}", e) )) } + fn from_inner_index(inner_index: InnerIndex, schema: Schema) -> Index { + Index { + metas: Arc::new(RwLock::new(IndexMeta::with_schema(schema))), + inner_index: Arc::new(RwLock::new(inner_index)), + } + } + // TODO find a rusty way to hide that, while keeping // it visible for IndexWriters. pub fn publish_segment(&mut self, segment: Segment) -> Result<(), IOError> { - self.metas.segments.push(segment.segment_id.0.clone()); + println!("publish segment {:?}", segment); + self.metas.write().unwrap().segments.push(segment.segment_id.0.clone()); // TODO use logs self.save_metas() } - // pub fn load_metas(&self,) -> Result<(), IOError> { - // self.inner_index - // .write() - // .unwrap() // only fail when another thread has already panicked. - // .load_metas() - // } - pub fn sync(&mut self, segment: Segment) -> Result<(), IOError> { try!(self.get_write()).sync(segment) } @@ -167,6 +161,8 @@ impl Index { fn segment_ids(&self,) -> Vec { self.metas + .read() + .unwrap() .segments .iter() .cloned() @@ -184,22 +180,16 @@ impl Index { let mut meta_file = try!(self.inner_index.read().unwrap().mmap(&META_FILEPATH)); let mut meta_content = String::from_utf8_lossy(unsafe {meta_file.as_slice()}); println!("META CONTENT {:?}", meta_content); - self.metas = json::decode(&meta_content).unwrap(); + let loaded_meta: IndexMeta = json::decode(&meta_content).unwrap(); + self.metas.write().unwrap().clone_from(&loaded_meta); Ok(()) } pub fn save_metas(&self,) -> Result<(), IOError> { - let encoded = json::encode(&self.metas).unwrap(); + let metas_lock = self.metas.read().unwrap(); + let encoded = json::encode(&*metas_lock).unwrap(); self.inner_index.write().unwrap().write_atomic(&META_FILEPATH, encoded) } - - // fn open_writable(&self, relative_path: &PathBuf) -> Result { - // try!(self.get_read()).open_writable(relative_path) - // } - // - // fn mmap(&self, relative_path: &PathBuf) -> Result { - // try!(self.get_read()).mmap(relative_path) - // } } diff --git a/src/core/searcher.rs b/src/core/searcher.rs index ebf1baf74..b989faa7b 100644 --- a/src/core/searcher.rs +++ b/src/core/searcher.rs @@ -47,6 +47,7 @@ impl Searcher { pub fn for_index(index: Index) -> Searcher { let mut searcher = Searcher::new(); for segment in index.segments().into_iter() { + println!("Segment {:?} ", segment); searcher.add_segment(segment); } searcher diff --git a/tests/core.rs b/tests/core.rs index a7f159a93..d03aba0fc 100644 --- a/tests/core.rs +++ b/tests/core.rs @@ -115,6 +115,7 @@ fn test_searcher() { let commit_result = index_writer.commit(); let segment = commit_result.unwrap(); } + println!("index {:?}", index.schema()); { let searcher = Searcher::for_index(index);