diff --git a/src/core/codec.rs b/src/core/codec.rs index 14d2359f2..3ac6fdd72 100644 --- a/src/core/codec.rs +++ b/src/core/codec.rs @@ -1,79 +1,41 @@ use std::io; -use std::io::{Read, Write}; +use std::io::Write; use rustc_serialize::json; -use core::directory::WritePtr; use core::index::Segment; use core::index::SegmentInfo; use core::index::SegmentComponent; -use core::schema::Term; -use core::schema::DocId; -use core::fstmap::FstMapBuilder; use core::fastfield::FastFieldSerializer; use core::store::StoreWriter; -use core::serialize::BinarySerializable; -use core::simdcompression; +use core::postings::PostingsSerializer; use core::schema::TextFieldValue; use core::convert_to_ioerror; - -#[derive(Debug)] -pub struct TermInfo { - pub doc_freq: u32, - pub postings_offset: u32, -} - - -impl BinarySerializable for TermInfo { - fn serialize(&self, writer: &mut Write) -> io::Result { - Ok( - try!(self.doc_freq.serialize(writer)) + - try!(self.postings_offset.serialize(writer)) - ) - } - fn deserialize(reader: &mut Read) -> io::Result { - let doc_freq = try!(u32::deserialize(reader)); - let offset = try!(u32::deserialize(reader)); - Ok(TermInfo { - doc_freq: doc_freq, - postings_offset: offset, - }) - } -} - - pub struct SegmentSerializer { segment: Segment, - written_bytes_postings: usize, - postings_write: WritePtr, store_writer: StoreWriter, fast_field_serializer: FastFieldSerializer, - term_fst_builder: FstMapBuilder, // TODO find an alternative to work around the "move" - encoder: simdcompression::Encoder, + postings_serializer: PostingsSerializer, } - - impl SegmentSerializer { pub fn for_segment(segment: &Segment) -> io::Result { - let term_write = try!(segment.open_write(SegmentComponent::TERMS)); - let postings_write = try!(segment.open_write(SegmentComponent::POSTINGS)); let store_write = try!(segment.open_write(SegmentComponent::STORE)); let fast_field_write = try!(segment.open_write(SegmentComponent::FASTFIELDS)); - let term_fst_builder_result = FstMapBuilder::new(term_write); - let term_fst_builder = term_fst_builder_result.unwrap(); let fast_field_serializer = try!(FastFieldSerializer::new(fast_field_write)); + let postings_serializer = try!(PostingsSerializer::open(segment)); Ok(SegmentSerializer { segment: segment.clone(), - written_bytes_postings: 0, - postings_write: postings_write, + postings_serializer: postings_serializer, store_writer: StoreWriter::new(store_write), fast_field_serializer: fast_field_serializer, - term_fst_builder: term_fst_builder, - encoder: simdcompression::Encoder::new(), }) } + pub fn get_postings_serializer(&mut self,) -> &mut PostingsSerializer { + &mut self.postings_serializer + } + pub fn get_fast_field_serializer(&mut self,) -> &mut FastFieldSerializer { &mut self.fast_field_serializer } @@ -88,24 +50,6 @@ impl SegmentSerializer { Ok(()) } - pub fn new_term(&mut self, term: &Term, doc_freq: DocId) -> io::Result<()> { - let term_info = TermInfo { - doc_freq: doc_freq, - postings_offset: self.written_bytes_postings as u32, - }; - self.term_fst_builder - .insert(term.as_slice(), &term_info) - } - - pub fn write_docs(&mut self, doc_ids: &[DocId]) -> io::Result<()> { - let docs_data = self.encoder.encode_sorted(doc_ids); - self.written_bytes_postings += try!((docs_data.len() as u32).serialize(&mut self.postings_write)); - for num in docs_data { - self.written_bytes_postings += try!(num.serialize(&mut self.postings_write)); - } - Ok(()) - } - pub fn write_segment_info(&mut self, segment_info: &SegmentInfo) -> io::Result<()> { let mut write = try!(self.segment.open_write(SegmentComponent::INFO)); let json_data = try!(json::encode(segment_info).map_err(convert_to_ioerror)); @@ -115,7 +59,9 @@ impl SegmentSerializer { } pub fn close(mut self,) -> io::Result<()> { - try!(self.term_fst_builder.finish()); - self.store_writer.close() + try!(self.fast_field_serializer.close()); + try!(self.postings_serializer.close()); + try!(self.store_writer.close()); + Ok(()) } } diff --git a/src/core/fastfield.rs b/src/core/fastfield.rs index c185b683c..735008027 100644 --- a/src/core/fastfield.rs +++ b/src/core/fastfield.rs @@ -87,7 +87,7 @@ impl FastFieldSerializer { Ok(()) } - pub fn close(&mut self,) -> io::Result { + pub fn close(mut self,) -> io::Result { if self.field_open { return Err(io::Error::new(io::ErrorKind::Other, "Last field not closed")); } @@ -134,7 +134,7 @@ impl U32FastFieldsWriter { for field_writer in self.field_writers.iter() { try!(field_writer.serialize(serializer)); } - serializer.close().map(|_| ()) + Ok(()) } } @@ -317,6 +317,7 @@ mod tests { add_single_field_doc(&mut fast_field_writers, &field, 14u32); add_single_field_doc(&mut fast_field_writers, &field, 2u32); fast_field_writers.serialize(&mut serializer).unwrap(); + serializer.close().unwrap(); } let source = directory.open_read(&path).unwrap(); { @@ -351,6 +352,7 @@ mod tests { add_single_field_doc(&mut fast_field_writers, &field, 1_501u32); add_single_field_doc(&mut fast_field_writers, &field, 215u32); fast_field_writers.serialize(&mut serializer).unwrap(); + serializer.close().unwrap(); } let source = directory.open_read(&path).unwrap(); { @@ -395,6 +397,7 @@ mod tests { add_single_field_doc(&mut fast_field_writers, &field, x.clone()); } fast_field_writers.serialize(&mut serializer).unwrap(); + serializer.close().unwrap(); } let source = directory.open_read(&path).unwrap(); { @@ -449,6 +452,7 @@ mod tests { add_single_field_doc(&mut fast_field_writers, &field, x.clone()); } fast_field_writers.serialize(&mut serializer).unwrap(); + serializer.close().unwrap(); } let source = directory.open_read(&path).unwrap(); { @@ -480,6 +484,7 @@ mod tests { add_single_field_doc(&mut fast_field_writers, &field, x.clone()); } fast_field_writers.serialize(&mut serializer).unwrap(); + serializer.close().unwrap(); } let source = directory.open_read(&path).unwrap(); { diff --git a/src/core/postings.rs b/src/core/postings.rs index fa04c30a0..7a46a4bbb 100644 --- a/src/core/postings.rs +++ b/src/core/postings.rs @@ -2,9 +2,39 @@ use core::schema::DocId; use std::ptr; use std::collections::BTreeMap; use core::schema::Term; -use core::codec::SegmentSerializer; +use core::fstmap::FstMapBuilder; +use core::index::Segment; +use core::directory::WritePtr; +use core::index::SegmentComponent; +use core::simdcompression; +use core::serialize::BinarySerializable; +use std::io::{Read, Write}; use std::io; +#[derive(Debug)] +pub struct TermInfo { + pub doc_freq: u32, + pub postings_offset: u32, +} + +impl BinarySerializable for TermInfo { + fn serialize(&self, writer: &mut Write) -> io::Result { + Ok( + try!(self.doc_freq.serialize(writer)) + + try!(self.postings_offset.serialize(writer)) + ) + } + fn deserialize(reader: &mut Read) -> io::Result { + let doc_freq = try!(u32::deserialize(reader)); + let offset = try!(u32::deserialize(reader)); + Ok(TermInfo { + doc_freq: doc_freq, + postings_offset: offset, + }) + } +} + + pub struct PostingsWriter { postings: Vec>, term_index: BTreeMap, @@ -39,7 +69,7 @@ impl PostingsWriter { &mut self.postings[unord_id] } - pub fn serialize(&self, serializer: &mut SegmentSerializer) -> io::Result<()> { + pub fn serialize(&self, serializer: &mut PostingsSerializer) -> io::Result<()> { for (term, postings_id) in self.term_index.iter() { let doc_ids = &self.postings[postings_id.clone()]; let term_docfreq = doc_ids.len() as u32; @@ -113,6 +143,53 @@ impl Iterator for IntersectionPostings { } } +pub struct PostingsSerializer { + terms_fst_builder: FstMapBuilder, // TODO find an alternative to work around the "move" + postings_write: WritePtr, + written_bytes_postings: usize, + encoder: simdcompression::Encoder, +} + +impl PostingsSerializer { + + pub fn open(segment: &Segment) -> io::Result { + let terms_write = try!(segment.open_write(SegmentComponent::TERMS)); + let terms_fst_builder = try!(FstMapBuilder::new(terms_write)); + let postings_write = try!(segment.open_write(SegmentComponent::POSTINGS)); + Ok(PostingsSerializer { + terms_fst_builder: terms_fst_builder, + postings_write: postings_write, + written_bytes_postings: 0, + encoder: simdcompression::Encoder::new(), + }) + } + + pub fn new_term(&mut self, term: &Term, doc_freq: DocId) -> io::Result<()> { + let term_info = TermInfo { + doc_freq: doc_freq, + postings_offset: self.written_bytes_postings as u32, + }; + self.terms_fst_builder + .insert(term.as_slice(), &term_info) + } + + pub fn write_docs(&mut self, doc_ids: &[DocId]) -> io::Result<()> { + let docs_data = self.encoder.encode_sorted(doc_ids); + self.written_bytes_postings += try!((docs_data.len() as u32).serialize(&mut self.postings_write)); + for num in docs_data { + self.written_bytes_postings += try!(num.serialize(&mut self.postings_write)); + } + Ok(()) + } + + pub fn close(mut self,) -> io::Result<()> { + try!(self.terms_fst_builder.finish()); + try!(self.postings_write.flush()); + Ok(()) + } +} + + #[cfg(test)] mod tests { diff --git a/src/core/reader.rs b/src/core/reader.rs index 108603da4..60a8182eb 100644 --- a/src/core/reader.rs +++ b/src/core/reader.rs @@ -11,12 +11,10 @@ use core::postings::Postings; use core::simdcompression::Decoder; use std::io; use std::str; -use core::codec::TermInfo; +use core::postings::TermInfo; use core::fstmap::FstMap; use std::fmt; use rustc_serialize::json; -use core::codec::SegmentSerializer; -use core::index::SerializableSegment; use core::index::SegmentInfo; use core::schema::U32Field; use core::convert_to_ioerror; @@ -175,27 +173,36 @@ impl SegmentReader { } } - - -impl SerializableSegment for SegmentReader { - - fn write(&self, mut serializer: SegmentSerializer) -> io::Result<()> { - let mut term_offsets_it = self.term_offsets.stream(); - loop { - match term_offsets_it.next() { - Some((term_data, term_info)) => { - let term = Term::from(term_data); - try!(serializer.new_term(&term, term_info.doc_freq)); - let segment_postings = self.read_postings(term_info.postings_offset); - try!(serializer.write_docs(&segment_postings.doc_ids[..])); - }, - None => { break; } - } - } - for doc_id in 0..self.max_doc() { - let doc = try!(self.store_reader.get(&doc_id)); - try!(serializer.store_doc(&mut doc.text_fields())); - } - serializer.close() - } -} +// +// +// impl SerializableSegment for SegmentReader { +// +// fn write_postings(&self, mut serializer: PostingsSerializer) -> io::Result<()> { +// let mut term_offsets_it = self.term_offsets.stream(); +// loop { +// match term_offsets_it.next() { +// Some((term_data, term_info)) => { +// let term = Term::from(term_data); +// try!(serializer.new_term(&term, term_info.doc_freq)); +// let segment_postings = self.read_postings(term_info.postings_offset); +// try!(serializer.write_docs(&segment_postings.doc_ids[..])); +// }, +// None => { break; } +// } +// } +// Ok(()) +// } +// +// fn write_store(&self, ) +// +// fn write(&self, mut serializer: SegmentSerializer) -> io::Result<()> { +// try!(self.write_postings(serializer.get_postings_serializer())); +// try!(self.write_store(serializer.get_store_serializer())); +// +// for doc_id in 0..self.max_doc() { +// let doc = try!(self.store_reader.get(&doc_id)); +// try!(serializer.store_doc(&mut doc.text_fields())); +// } +// serializer.close() +// } +// } diff --git a/src/core/writer.rs b/src/core/writer.rs index c30fa0523..7af3c3e7f 100644 --- a/src/core/writer.rs +++ b/src/core/writer.rs @@ -65,14 +65,13 @@ pub struct SegmentWriter { } impl SegmentWriter { - // Write on disk all of the stuff that // is still on RAM : // - the dictionary in an fst // - the postings // - the segment info fn finalize(mut self,) -> io::Result<()> { - try!(self.postings_writer.serialize(&mut self.segment_serializer)); + try!(self.postings_writer.serialize(self.segment_serializer.get_postings_serializer())); { let segment_info = SegmentInfo { max_doc: self.max_doc @@ -134,7 +133,7 @@ impl SegmentWriter { impl SerializableSegment for SegmentWriter { fn write(&self, mut serializer: SegmentSerializer) -> io::Result<()> { - try!(self.postings_writer.serialize(&mut serializer)); + try!(self.postings_writer.serialize(serializer.get_postings_serializer())); try!(self.fast_field_writers.serialize(serializer.get_fast_field_serializer())); serializer.close() }