diff --git a/src/core/codec.rs b/src/core/codec.rs index c962abd82..91cdf37c0 100644 --- a/src/core/codec.rs +++ b/src/core/codec.rs @@ -39,7 +39,6 @@ impl SimpleCodec { // TODO skip lists - pub fn write<'a, I: SerializableSegment<'a>>(index: &'a I, segment: &'a Segment) -> Result { let term_write = try!(segment.open_writable(SegmentComponent::TERMS)); let mut postings_write = try!(segment.open_writable(SegmentComponent::POSTINGS)); diff --git a/src/core/directory.rs b/src/core/directory.rs index fb1ab0360..2d94e9841 100644 --- a/src/core/directory.rs +++ b/src/core/directory.rs @@ -14,8 +14,8 @@ use std::ops::Deref; use std::cell::RefCell; use core::error::*; use rand::{thread_rng, Rng}; -// use memmap::{Mmap, Protection}; use fst::raw::MmapReadOnly; +// use sys::fs as fs_imp; #[derive(Clone, Debug)] pub struct SegmentId(pub String); @@ -33,6 +33,7 @@ pub fn generate_segment_name() -> SegmentId { pub struct Directory { index_path: PathBuf, mmap_cache: Arc>>, + segments: Vec, } impl fmt::Debug for Directory { @@ -52,13 +53,58 @@ fn open_mmap(full_path: &PathBuf) -> Result { } } +fn sync_file(filepath: &PathBuf) -> Result<()> { + match File::open(filepath.clone()) { + Ok(fd) => { + match fd.sync_all() { + Err(err) => Err(Error::IOError(err.kind(), format!("Failed to sync {:?}", filepath))), + _ => Ok(()) + } + }, + Err(err) => Err(Error::IOError(err.kind(), format!("Cause: {:?}", err))) + } +} + impl Directory { - pub fn from(filepath: &str) -> Directory { - Directory { + // TODO find a rusty way to hide that, while keeping + // it visible for IndexWriters. + pub fn publish_segment(&mut self, segment: Segment) { + self.segments.push(segment.clone()); + self.save_metas(); + } + + pub fn from(filepath: &str) -> Result { + // TODO error management + let mut directory = Directory { index_path: PathBuf::from(filepath), mmap_cache: Arc::new(Mutex::new(HashMap::new())), + segments: Vec::new() + }; + try!(directory.load_metas()); //< does the directory already exists? + Ok(directory) + } + + pub fn load_metas(&mut self,) -> Result<()> { + // TODO load segment info + Ok(()) + } + + pub fn save_metas(&self,) -> Result<()> { + // TODO + Ok(()) + } + + + pub fn sync(&self, segment: Segment) -> Result<()> { + for component in [SegmentComponent::POSTINGS, SegmentComponent::TERMS].iter() { + let relative_path = segment.relative_path(component); + let full_path = self.resolve_path(&relative_path); + try!(sync_file(&full_path)); } + // syncing the directory itself + try!(sync_file(&self.index_path)); + Ok(()) } fn resolve_path(&self, relative_path: &PathBuf) -> PathBuf { @@ -113,38 +159,38 @@ impl Directory { pub enum SegmentComponent { POSTINGS, - POSITIONS, + // POSITIONS, TERMS, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Segment { directory: Directory, segment_id: SegmentId, } impl Segment { - fn path_suffix(component: SegmentComponent)-> &'static str { - match component { + fn path_suffix(component: &SegmentComponent)-> &'static str { + match *component { SegmentComponent::POSTINGS => ".idx", - SegmentComponent::POSITIONS => ".pos", + // SegmentComponent::POSITIONS => ".pos", SegmentComponent::TERMS => ".term", } } - pub fn relative_path(&self, component: SegmentComponent) -> PathBuf { + pub fn relative_path(&self, component: &SegmentComponent) -> PathBuf { let SegmentId(ref segment_id_str) = self.segment_id; let filename = String::new() + segment_id_str + Segment::path_suffix(component); PathBuf::from(filename) } pub fn mmap(&self, component: SegmentComponent) -> Result { - let path = self.relative_path(component); + let path = self.relative_path(&component); self.directory.mmap(&path) } pub fn open_writable(&self, component: SegmentComponent) -> Result { - let path = self.relative_path(component); + let path = self.relative_path(&component); self.directory.open_writable(&path) } } diff --git a/src/core/reader.rs b/src/core/reader.rs index f482c69bb..f9b220bf6 100644 --- a/src/core/reader.rs +++ b/src/core/reader.rs @@ -98,7 +98,7 @@ impl SegmentIndexReader { let term_offsets = match fst::Map::from_mmap(term_shared_mmap) { Ok(term_offsets) => term_offsets, Err(_) => { - let filepath = segment.relative_path(SegmentComponent::TERMS); + let filepath = segment.relative_path(&SegmentComponent::TERMS); return Err(Error::FSTFormat(format!("The file {:?} does not seem to be a valid term to offset transducer.", filepath))); } }; diff --git a/src/core/writer.rs b/src/core/writer.rs index 6de0401ba..559fef3a5 100644 --- a/src/core/writer.rs +++ b/src/core/writer.rs @@ -70,9 +70,22 @@ impl FieldWriter { } } -pub struct IndexWriter { +pub struct SegmentWriter { max_doc: DocId, term_writers: HashMap, +} + +impl SegmentWriter { + fn new() -> SegmentWriter { + SegmentWriter { + max_doc: 0, + term_writers: HashMap::new(), + } + } +} + +pub struct IndexWriter { + segment_writer: SegmentWriter, directory: Directory, } @@ -80,12 +93,29 @@ impl IndexWriter { pub fn open(directory: &Directory) -> IndexWriter { IndexWriter { - max_doc: 0, - term_writers: HashMap::new(), - directory: (*directory).clone(), - } + segment_writer: SegmentWriter::new(), + directory: directory.clone(), + } } + pub fn add(&mut self, doc: Document) { + self.segment_writer.add(doc); + } + + pub fn commit(&mut self,) -> Result { + let segment = self.directory.new_segment(); + try!(SimpleCodec::write(&self.segment_writer, &segment).map(|sz| (segment.clone(), sz))); + // At this point, the segment is written + // We still need to sync all of the file, as well as the parent directory. + try!(self.directory.sync(segment.clone())); + self.directory.publish_segment(segment.clone()); + self.segment_writer = SegmentWriter::new(); + Ok(segment) + } + +} + +impl SegmentWriter { fn get_field_writer<'a>(&'a mut self, field: &Field) -> &'a mut FieldWriter { if !self.term_writers.contains_key(field) { self.term_writers.insert((*field).clone(), FieldWriter::new()); @@ -105,10 +135,6 @@ impl IndexWriter { self.max_doc += 1; } - pub fn commit(self,) -> Result<(Segment, usize)> { - let segment = self.directory.new_segment(); - SimpleCodec::write(&self, &segment).map(|sz| (segment, sz)) - } } @@ -232,16 +258,14 @@ impl<'a> TermCursor for CIWTermCursor<'a> { } } -// -// TODO use a Term type -// -impl<'a> SerializableSegment<'a> for IndexWriter { + +impl<'a> SerializableSegment<'a> for SegmentWriter { type TermCur = CIWTermCursor<'a>; fn term_cursor(&'a self) -> CIWTermCursor<'a> { let mut field_it: hash_map::Iter<'a, Field, FieldWriter> = self.term_writers.iter(); - let (field, field_writer) = field_it.next().unwrap(); // TODO handle no field + let (field, field_writer) = field_it.next().unwrap(); CIWTermCursor { field_it: field_it, form_it: CIWFormCursor { diff --git a/tests/core.rs b/tests/core.rs index 0753d320e..964ea72fa 100644 --- a/tests/core.rs +++ b/tests/core.rs @@ -19,6 +19,7 @@ use tantivy::core::reader::SegmentIndexReader; use std::io::{ BufWriter, Write}; use regex::Regex; use std::convert::From; +use std::path::PathBuf; #[test] fn test_intersection() { @@ -37,7 +38,7 @@ fn test_tokenizer() { #[test] fn test_indexing() { - let directory = Directory::from("/Users/pmasurel/temp/idx"); + let directory = Directory::from("/Users/pmasurel/temp/idx").unwrap(); { // writing the segment let mut index_writer = IndexWriter::open(&directory); @@ -57,10 +58,12 @@ fn test_indexing() { index_writer.add(doc); } - let (segment, num_bytes) = index_writer.commit().unwrap(); + let commit_result = index_writer.commit(); + assert!(commit_result.is_ok()); // reading the segment println!("------"); { + let segment = commit_result.unwrap(); let index_reader = SegmentIndexReader::open(segment).unwrap(); let mut term_cursor = index_reader.term_cursor(); loop { @@ -86,6 +89,8 @@ fn test_indexing() { } + + #[test] fn test_new_segment() { let SegmentId(segment_name) = generate_segment_name();