diff --git a/src/core/codec.rs b/src/core/codec.rs index a3adf4df4..6d2d663dd 100644 --- a/src/core/codec.rs +++ b/src/core/codec.rs @@ -19,6 +19,7 @@ pub struct SimpleCodec; // TODO should we vint? pub struct SimpleSegmentSerializer { + segment: Segment, written_bytes_postings: usize, postings_write: File, store_write: File, @@ -27,6 +28,13 @@ pub struct SimpleSegmentSerializer { encoder: simdcompression::Encoder, } + +impl SimpleSegmentSerializer { + pub fn segment(&self,) -> Segment { + self.segment.clone() + } +} + impl SegmentSerializer<()> for SimpleSegmentSerializer { fn store_doc(&mut self, field: &mut Iterator) { @@ -84,13 +92,14 @@ impl SimpleCodec { // TODO impl packed int // TODO skip lists // TODO make that part of the codec API - fn serializer(segment: &Segment) -> Result { + pub fn serializer(segment: &Segment) -> Result { let term_write = try!(segment.open_writable(SegmentComponent::TERMS)); let postings_write = try!(segment.open_writable(SegmentComponent::POSTINGS)); let store_write = try!(segment.open_writable(SegmentComponent::STORE)); let term_fst_builder_result = MapBuilder::new(term_write); let term_fst_builder = term_fst_builder_result.unwrap(); Ok(SimpleSegmentSerializer { + segment: segment.clone(), written_bytes_postings: 0, postings_write: postings_write, store_write: store_write, @@ -102,7 +111,7 @@ impl SimpleCodec { pub fn write(index: &I, segment: &Segment) -> Result<()> { - let serializer = try!(SimpleCodec::serializer(segment)); - index.write(serializer) + let mut serializer = try!(SimpleCodec::serializer(segment)); + index.write(&mut serializer) } } diff --git a/src/core/serial.rs b/src/core/serial.rs index f5c085e05..117efc8a5 100644 --- a/src/core/serial.rs +++ b/src/core/serial.rs @@ -12,7 +12,7 @@ pub trait SegmentSerializer { } pub trait SerializableSegment { - fn write>(&self, serializer: SegSer) -> Result; + fn write>(&self, serializer: &mut SegSer) -> Result; } @@ -30,8 +30,8 @@ impl fmt::Debug for DebugSegmentSerializer { impl DebugSegmentSerializer { pub fn debug_string(index: &S) -> String { - let serializer = DebugSegmentSerializer::new(); - index.write(serializer).unwrap() + let mut serializer = DebugSegmentSerializer::new(); + index.write(&mut serializer).unwrap() } pub fn new() -> DebugSegmentSerializer { diff --git a/src/core/writer.rs b/src/core/writer.rs index 6b20a04cb..6322cc1f5 100644 --- a/src/core/writer.rs +++ b/src/core/writer.rs @@ -4,6 +4,7 @@ use std::slice; use core::global::*; use core::schema::*; use core::codec::*; +use std::rc::Rc; use core::directory::Directory; use core::analyzer::SimpleTokenizer; use std::collections::{HashMap, BTreeMap}; @@ -40,24 +41,30 @@ impl PostingsWriter { } pub struct IndexWriter { - segment_writer: SegmentWriter, + segment_writer: Rc, directory: Directory, schema: Schema, } + + fn new_segment_writer(directory: &Directory, ) -> SegmentWriter { + let segment = directory.new_segment(); + SegmentWriter::for_segment(segment) +} + impl IndexWriter { pub fn open(directory: &Directory) -> IndexWriter { let schema = directory.schema(); IndexWriter { - segment_writer: SegmentWriter::new(), + segment_writer: Rc::new(new_segment_writer(&directory)), directory: directory.clone(), schema: schema, } } pub fn add(&mut self, doc: Document) { - self.segment_writer.add(doc, &self.schema); + Rc::get_mut(&mut self.segment_writer).unwrap().add(doc, &self.schema); } // TODO remove that some day @@ -66,14 +73,27 @@ impl IndexWriter { } pub fn commit(&mut self,) -> Result { - let segment = self.directory.new_segment(); - try!(SimpleCodec::write(&self.segment_writer, &segment).map(|sz| (segment.clone(), sz))); - // At this point, the segment is written - // We still need to sync all of the file, as well as the parent directory. - try!(self.directory.sync(segment.clone())); - self.directory.publish_segment(segment.clone()); - self.segment_writer = SegmentWriter::new(); - Ok(segment) + // TODO error handling + let mut segment_writer_rc = self.segment_writer.clone(); + self.segment_writer = Rc::new(new_segment_writer(&self.directory)); + let segment_writer_res = Rc::try_unwrap(segment_writer_rc); + match segment_writer_res { + Ok(segment_writer) => { + let segment = segment_writer.segment(); + segment_writer.write_pending(); + // write(self.segment_serializer); + // try!(SimpleCodec::write(&self.segment_writer, &segment).map(|sz| (segment.clone(), sz))); + // At this point, the segment is written + // We still need to sync all of the file, as well as the parent directory. + try!(self.directory.sync(segment.clone())); + self.directory.publish_segment(segment.clone()); + Ok(segment) + }, + Err(_) => { + panic!("error while acquiring segment writer."); + } + } + } } @@ -85,24 +105,49 @@ pub struct SegmentWriter { postings: Vec, term_index: BTreeMap, tokenizer: SimpleTokenizer, + segment_serializer: SimpleSegmentSerializer, } -impl Drop for SegmentWriter { - fn drop(&mut self) { - println!("num tokens {}", self.num_tokens); - } -} +// impl Drop for SegmentWriter { +// fn drop(&mut self) { +// println!("num tokens {}", self.num_tokens); +// } +// } impl SegmentWriter { - fn new() -> SegmentWriter { + // write on disk all of the stuff that + // are still on RAM. + // for this version, that's the term dictionary + // and the postings + fn write_pending(mut self,) -> Result<()> { + //self.write(&mut self.segment_serializer); + { + for (term, postings_id) in self.term_index.iter() { + let doc_ids = &self.postings[postings_id.clone()].doc_ids; + let term_docfreq = doc_ids.len() as u32; + self.segment_serializer.new_term(&term, term_docfreq); + self.segment_serializer.write_docs(&doc_ids); + } + } + self.segment_serializer.close() + } + + pub fn segment(&self,) -> Segment { + self.segment_serializer.segment() + } + + fn for_segment(segment: Segment) -> SegmentWriter { + // TODO handle error + let segment_serializer = SimpleCodec::serializer(&segment).unwrap(); SegmentWriter { num_tokens: 0, max_doc: 0, postings: Vec::new(), term_index: BTreeMap::new(), tokenizer: SimpleTokenizer::new(), + segment_serializer: segment_serializer, } } @@ -141,15 +186,15 @@ impl SegmentWriter { self.get_postings_writer(term).suscribe(doc); } } - -impl SerializableSegment for SegmentWriter { - fn write>(&self, mut serializer: SegSer) -> Result { - for (term, postings_id) in self.term_index.iter() { - let doc_ids = &self.postings[postings_id.clone()].doc_ids; - let term_docfreq = doc_ids.len() as u32; - serializer.new_term(&term, term_docfreq); - serializer.write_docs(&doc_ids); - } - serializer.close() - } -} +// +// impl SerializableSegment for SegmentWriter { +// fn write>(&self, serializer: &mut SegSer) -> Result { +// for (term, postings_id) in self.term_index.iter() { +// let doc_ids = &self.postings[postings_id.clone()].doc_ids; +// let term_docfreq = doc_ids.len() as u32; +// serializer.new_term(&term, term_docfreq); +// serializer.write_docs(&doc_ids); +// } +// serializer.close() +// } +// } diff --git a/tests/core.rs b/tests/core.rs index 3d3aa1d58..dbd824c24 100644 --- a/tests/core.rs +++ b/tests/core.rs @@ -94,8 +94,8 @@ fn test_indexing() { index_writer.add(doc); } - let debug_serializer = DebugSegmentSerializer::new(); - let segment_str_before_writing = DebugSegmentSerializer::debug_string(index_writer.current_segment_writer()); + //let debug_serializer = DebugSegmentSerializer::new(); + //let segment_str_before_writing = DebugSegmentSerializer::debug_string(index_writer.current_segment_writer()); let commit_result = index_writer.commit(); assert!(commit_result.is_ok()); let segment = commit_result.unwrap();