This commit is contained in:
Paul Masurel
2016-03-27 14:54:31 +09:00
parent a74271f180
commit e477c28025
5 changed files with 135 additions and 101 deletions

View File

@@ -1,79 +1,41 @@
use std::io;
use std::io::{Read, Write};
use std::io::Write;
use rustc_serialize::json;
use core::directory::WritePtr;
use core::index::Segment;
use core::index::SegmentInfo;
use core::index::SegmentComponent;
use core::schema::Term;
use core::schema::DocId;
use core::fstmap::FstMapBuilder;
use core::fastfield::FastFieldSerializer;
use core::store::StoreWriter;
use core::serialize::BinarySerializable;
use core::simdcompression;
use core::postings::PostingsSerializer;
use core::schema::TextFieldValue;
use core::convert_to_ioerror;
/// Value stored alongside each term in the term FST map
/// (see the `FstMapBuilder<WritePtr, TermInfo>` field of the serializer).
#[derive(Debug)]
pub struct TermInfo {
/// The `doc_freq` argument given to `new_term`
/// (presumably the number of documents containing the term — confirm with callers).
pub doc_freq: u32,
/// Value of the postings byte counter, cast to `u32`, at the moment the term
/// was registered (presumably where the term's posting list starts — confirm).
pub postings_offset: u32,
}
impl BinarySerializable for TermInfo {
    /// Serializes `doc_freq` first, then `postings_offset`, and returns the
    /// sum of the two sizes reported by the field serializers.
    fn serialize(&self, writer: &mut Write) -> io::Result<usize> {
        let doc_freq_len = try!(self.doc_freq.serialize(writer));
        let offset_len = try!(self.postings_offset.serialize(writer));
        Ok(doc_freq_len + offset_len)
    }

    /// Reads the two `u32` fields back in the order `serialize` wrote them.
    fn deserialize(reader: &mut Read) -> io::Result<Self> {
        let doc_freq = try!(u32::deserialize(reader));
        let postings_offset = try!(u32::deserialize(reader));
        let term_info = TermInfo {
            doc_freq: doc_freq,
            postings_offset: postings_offset,
        };
        Ok(term_info)
    }
}
/// Bundles the writers and serializers used to write one segment.
///
/// Instances are built by `for_segment` and consumed by `close`.
pub struct SegmentSerializer {
/// Clone of the segment passed to `for_segment`; also used later to open
/// further components (e.g. `SegmentComponent::INFO`).
segment: Segment,
/// Running sum of the sizes returned by the `serialize` calls made on
/// `postings_write`.
written_bytes_postings: usize,
/// Writer opened on `SegmentComponent::POSTINGS`.
postings_write: WritePtr,
/// Store writer wrapping the writer opened on `SegmentComponent::STORE`.
store_writer: StoreWriter,
/// Serializer created over the writer opened on `SegmentComponent::FASTFIELDS`.
fast_field_serializer: FastFieldSerializer,
/// FST map builder over the writer opened on `SegmentComponent::TERMS`;
/// receives one `TermInfo` per term.
term_fst_builder: FstMapBuilder<WritePtr, TermInfo>, // TODO find an alternative to work around the "move"
/// Encoder applied to doc id lists in `write_docs`.
encoder: simdcompression::Encoder,
/// Serializer returned by `PostingsSerializer::open(segment)`.
postings_serializer: PostingsSerializer,
}
impl SegmentSerializer {
/// Opens the output components of `segment` and assembles a
/// `SegmentSerializer` around them.
///
/// The TERMS, POSTINGS, STORE and FASTFIELDS components are opened for
/// writing, in that order, before the fast field and postings serializers
/// are created.
///
/// # Errors
///
/// Returns the first error raised while opening a component or while
/// building one of the sub-serializers. A failure to create the term FST
/// builder is propagated as well instead of panicking.
pub fn for_segment(segment: &Segment) -> io::Result<SegmentSerializer> {
    let term_write = try!(segment.open_write(SegmentComponent::TERMS));
    let postings_write = try!(segment.open_write(SegmentComponent::POSTINGS));
    let store_write = try!(segment.open_write(SegmentComponent::STORE));
    let fast_field_write = try!(segment.open_write(SegmentComponent::FASTFIELDS));
    // Was `.unwrap()`: an error here must reach the caller like every other
    // failure in this constructor.
    let term_fst_builder = try!(FstMapBuilder::new(term_write));
    let fast_field_serializer = try!(FastFieldSerializer::new(fast_field_write));
    let postings_serializer = try!(PostingsSerializer::open(segment));
    Ok(SegmentSerializer {
        segment: segment.clone(),
        written_bytes_postings: 0,
        postings_write: postings_write,
        postings_serializer: postings_serializer,
        store_writer: StoreWriter::new(store_write),
        fast_field_serializer: fast_field_serializer,
        term_fst_builder: term_fst_builder,
        encoder: simdcompression::Encoder::new(),
    })
}
/// Hands out a mutable borrow of the postings serializer held by `self`.
pub fn get_postings_serializer(&mut self) -> &mut PostingsSerializer {
    let postings_serializer = &mut self.postings_serializer;
    postings_serializer
}
/// Hands out a mutable borrow of the fast field serializer held by `self`.
pub fn get_fast_field_serializer(&mut self) -> &mut FastFieldSerializer {
    let fast_field_serializer = &mut self.fast_field_serializer;
    fast_field_serializer
}
@@ -88,24 +50,6 @@ impl SegmentSerializer {
Ok(())
}
/// Registers `term` in the term FST builder.
///
/// The stored `TermInfo` carries `doc_freq` and the current value of the
/// postings byte counter.
pub fn new_term(&mut self, term: &Term, doc_freq: DocId) -> io::Result<()> {
    // NOTE: `as u32` wraps silently once the counter no longer fits in 32 bits.
    let postings_offset = self.written_bytes_postings as u32;
    let entry = TermInfo {
        doc_freq: doc_freq,
        postings_offset: postings_offset,
    };
    let key = term.as_slice();
    self.term_fst_builder.insert(key, &entry)
}
/// Encodes `doc_ids` with `encode_sorted` and appends the result to the
/// postings writer: first the encoded length as a `u32`, then every
/// encoded value. The postings byte counter is advanced by the size
/// reported for each write.
pub fn write_docs(&mut self, doc_ids: &[DocId]) -> io::Result<()> {
    let encoded = self.encoder.encode_sorted(doc_ids);
    let encoded_len = encoded.len() as u32;
    let header_size = try!(encoded_len.serialize(&mut self.postings_write));
    self.written_bytes_postings += header_size;
    for value in encoded {
        let value_size = try!(value.serialize(&mut self.postings_write));
        self.written_bytes_postings += value_size;
    }
    Ok(())
}
pub fn write_segment_info(&mut self, segment_info: &SegmentInfo) -> io::Result<()> {
let mut write = try!(self.segment.open_write(SegmentComponent::INFO));
let json_data = try!(json::encode(segment_info).map_err(convert_to_ioerror));
@@ -115,7 +59,9 @@ impl SegmentSerializer {
}
/// Consumes the serializer and finishes/closes the writers it owns,
/// returning early on the first error through `try!`.
pub fn close(mut self,) -> io::Result<()> {
try!(self.term_fst_builder.finish());
// NOTE(review): the bare `self.store_writer.close()` expression below is
// followed by more statements, including a `try!`-wrapped call to the same
// method. It looks like a superseded line — confirm and drop it if so.
self.store_writer.close()
try!(self.fast_field_serializer.close());
try!(self.postings_serializer.close());
try!(self.store_writer.close());
Ok(())
}
}

View File

@@ -87,7 +87,7 @@ impl FastFieldSerializer {
Ok(())
}
pub fn close(&mut self,) -> io::Result<usize> {
pub fn close(mut self,) -> io::Result<usize> {
if self.field_open {
return Err(io::Error::new(io::ErrorKind::Other, "Last field not closed"));
}
@@ -134,7 +134,7 @@ impl U32FastFieldsWriter {
for field_writer in self.field_writers.iter() {
try!(field_writer.serialize(serializer));
}
serializer.close().map(|_| ())
Ok(())
}
}
@@ -317,6 +317,7 @@ mod tests {
add_single_field_doc(&mut fast_field_writers, &field, 14u32);
add_single_field_doc(&mut fast_field_writers, &field, 2u32);
fast_field_writers.serialize(&mut serializer).unwrap();
serializer.close().unwrap();
}
let source = directory.open_read(&path).unwrap();
{
@@ -351,6 +352,7 @@ mod tests {
add_single_field_doc(&mut fast_field_writers, &field, 1_501u32);
add_single_field_doc(&mut fast_field_writers, &field, 215u32);
fast_field_writers.serialize(&mut serializer).unwrap();
serializer.close().unwrap();
}
let source = directory.open_read(&path).unwrap();
{
@@ -395,6 +397,7 @@ mod tests {
add_single_field_doc(&mut fast_field_writers, &field, x.clone());
}
fast_field_writers.serialize(&mut serializer).unwrap();
serializer.close().unwrap();
}
let source = directory.open_read(&path).unwrap();
{
@@ -449,6 +452,7 @@ mod tests {
add_single_field_doc(&mut fast_field_writers, &field, x.clone());
}
fast_field_writers.serialize(&mut serializer).unwrap();
serializer.close().unwrap();
}
let source = directory.open_read(&path).unwrap();
{
@@ -480,6 +484,7 @@ mod tests {
add_single_field_doc(&mut fast_field_writers, &field, x.clone());
}
fast_field_writers.serialize(&mut serializer).unwrap();
serializer.close().unwrap();
}
let source = directory.open_read(&path).unwrap();
{

View File

@@ -2,9 +2,39 @@ use core::schema::DocId;
use std::ptr;
use std::collections::BTreeMap;
use core::schema::Term;
use core::codec::SegmentSerializer;
use core::fstmap::FstMapBuilder;
use core::index::Segment;
use core::directory::WritePtr;
use core::index::SegmentComponent;
use core::simdcompression;
use core::serialize::BinarySerializable;
use std::io::{Read, Write};
use std::io;
/// Value stored alongside each term in the term FST map built by
/// `PostingsSerializer`.
#[derive(Debug)]
pub struct TermInfo {
/// The `doc_freq` argument given to `new_term`
/// (presumably the number of documents containing the term — confirm with callers).
pub doc_freq: u32,
/// Value of the postings byte counter, cast to `u32`, at the moment the term
/// was registered (presumably where the term's posting list starts — confirm).
pub postings_offset: u32,
}
impl BinarySerializable for TermInfo {
    /// Serializes `doc_freq` first, then `postings_offset`, and returns the
    /// sum of the two sizes reported by the field serializers.
    fn serialize(&self, writer: &mut Write) -> io::Result<usize> {
        let doc_freq_len = try!(self.doc_freq.serialize(writer));
        let offset_len = try!(self.postings_offset.serialize(writer));
        Ok(doc_freq_len + offset_len)
    }

    /// Reads the two `u32` fields back in the order `serialize` wrote them.
    fn deserialize(reader: &mut Read) -> io::Result<Self> {
        let doc_freq = try!(u32::deserialize(reader));
        let postings_offset = try!(u32::deserialize(reader));
        let term_info = TermInfo {
            doc_freq: doc_freq,
            postings_offset: postings_offset,
        };
        Ok(term_info)
    }
}
pub struct PostingsWriter {
postings: Vec<Vec<DocId>>,
term_index: BTreeMap<Term, usize>,
@@ -39,7 +69,7 @@ impl PostingsWriter {
&mut self.postings[unord_id]
}
pub fn serialize(&self, serializer: &mut SegmentSerializer) -> io::Result<()> {
pub fn serialize(&self, serializer: &mut PostingsSerializer) -> io::Result<()> {
for (term, postings_id) in self.term_index.iter() {
let doc_ids = &self.postings[postings_id.clone()];
let term_docfreq = doc_ids.len() as u32;
@@ -113,6 +143,53 @@ impl<T: Postings> Iterator for IntersectionPostings<T> {
}
}
/// Writes the TERMS and POSTINGS components of a segment.
pub struct PostingsSerializer {
/// FST map builder over the writer opened on `SegmentComponent::TERMS`;
/// receives one `TermInfo` per term.
terms_fst_builder: FstMapBuilder<WritePtr, TermInfo>, // TODO find an alternative to work around the "move"
/// Writer opened on `SegmentComponent::POSTINGS`.
postings_write: WritePtr,
/// Running sum of the sizes returned by the `serialize` calls made on
/// `postings_write`.
written_bytes_postings: usize,
/// Encoder applied to doc id lists in `write_docs`.
encoder: simdcompression::Encoder,
}
impl PostingsSerializer {
    /// Opens the TERMS and POSTINGS components of `segment` for writing and
    /// wraps them in a new serializer whose byte counter starts at zero.
    ///
    /// # Errors
    ///
    /// Returns the error raised while opening either component or while
    /// creating the FST map builder.
    pub fn open(segment: &Segment) -> io::Result<PostingsSerializer> {
        let terms_write = try!(segment.open_write(SegmentComponent::TERMS));
        let terms_fst_builder = try!(FstMapBuilder::new(terms_write));
        let postings_write = try!(segment.open_write(SegmentComponent::POSTINGS));
        Ok(PostingsSerializer {
            terms_fst_builder: terms_fst_builder,
            postings_write: postings_write,
            written_bytes_postings: 0,
            encoder: simdcompression::Encoder::new(),
        })
    }

    /// Registers `term` in the term FST builder together with `doc_freq`
    /// and the current value of the postings byte counter.
    ///
    /// # Errors
    ///
    /// Fails if the byte counter no longer fits in the 32-bit
    /// `postings_offset` field, or if the FST builder rejects the insertion.
    pub fn new_term(&mut self, term: &Term, doc_freq: DocId) -> io::Result<()> {
        // A plain `as u32` would wrap silently and record a wrong offset.
        if self.written_bytes_postings as u64 > u32::max_value() as u64 {
            return Err(io::Error::new(io::ErrorKind::Other,
                                      "Postings offset does not fit in 32 bits"));
        }
        let term_info = TermInfo {
            doc_freq: doc_freq,
            postings_offset: self.written_bytes_postings as u32,
        };
        self.terms_fst_builder
            .insert(term.as_slice(), &term_info)
    }

    /// Encodes `doc_ids` with `encode_sorted` and appends the result to the
    /// postings writer: first the encoded length as a `u32`, then every
    /// encoded value. The byte counter is advanced by the size reported for
    /// each write.
    ///
    /// # Errors
    ///
    /// Fails if the encoded length does not fit in the 32-bit length prefix,
    /// or if writing to the postings output fails.
    pub fn write_docs(&mut self, doc_ids: &[DocId]) -> io::Result<()> {
        let docs_data = self.encoder.encode_sorted(doc_ids);
        // Refuse to write a truncated length prefix.
        if docs_data.len() as u64 > u32::max_value() as u64 {
            return Err(io::Error::new(io::ErrorKind::Other,
                                      "Encoded posting list length does not fit in 32 bits"));
        }
        self.written_bytes_postings += try!((docs_data.len() as u32).serialize(&mut self.postings_write));
        for num in docs_data {
            self.written_bytes_postings += try!(num.serialize(&mut self.postings_write));
        }
        Ok(())
    }

    /// Consumes the serializer: finishes the term FST builder, then flushes
    /// the postings writer.
    pub fn close(mut self,) -> io::Result<()> {
        try!(self.terms_fst_builder.finish());
        try!(self.postings_write.flush());
        Ok(())
    }
}
#[cfg(test)]
mod tests {

View File

@@ -11,12 +11,10 @@ use core::postings::Postings;
use core::simdcompression::Decoder;
use std::io;
use std::str;
use core::codec::TermInfo;
use core::postings::TermInfo;
use core::fstmap::FstMap;
use std::fmt;
use rustc_serialize::json;
use core::codec::SegmentSerializer;
use core::index::SerializableSegment;
use core::index::SegmentInfo;
use core::schema::U32Field;
use core::convert_to_ioerror;
@@ -175,27 +173,36 @@ impl SegmentReader {
}
}
impl SerializableSegment for SegmentReader {
    /// Replays this segment into `serializer`: every term of the term map
    /// with its doc ids, then the stored text fields of each document in
    /// `0..max_doc()`, and finally closes the serializer.
    fn write(&self, mut serializer: SegmentSerializer) -> io::Result<()> {
        let mut term_stream = self.term_offsets.stream();
        // The stream is advanced by hand; it is not driven with a `for` loop.
        while let Some((term_bytes, term_info)) = term_stream.next() {
            let term = Term::from(term_bytes);
            try!(serializer.new_term(&term, term_info.doc_freq));
            let postings = self.read_postings(term_info.postings_offset);
            try!(serializer.write_docs(&postings.doc_ids[..]));
        }
        for doc_id in 0..self.max_doc() {
            let stored_doc = try!(self.store_reader.get(&doc_id));
            try!(serializer.store_doc(&mut stored_doc.text_fields()));
        }
        serializer.close()
    }
}
//
//
// impl SerializableSegment for SegmentReader {
//
// fn write_postings(&self, mut serializer: PostingsSerializer) -> io::Result<()> {
// let mut term_offsets_it = self.term_offsets.stream();
// loop {
// match term_offsets_it.next() {
// Some((term_data, term_info)) => {
// let term = Term::from(term_data);
// try!(serializer.new_term(&term, term_info.doc_freq));
// let segment_postings = self.read_postings(term_info.postings_offset);
// try!(serializer.write_docs(&segment_postings.doc_ids[..]));
// },
// None => { break; }
// }
// }
// Ok(())
// }
//
// fn write_store(&self, )
//
// fn write(&self, mut serializer: SegmentSerializer) -> io::Result<()> {
// try!(self.write_postings(serializer.get_postings_serializer()));
// try!(self.write_store(serializer.get_store_serializer()));
//
// for doc_id in 0..self.max_doc() {
// let doc = try!(self.store_reader.get(&doc_id));
// try!(serializer.store_doc(&mut doc.text_fields()));
// }
// serializer.close()
// }
// }

View File

@@ -65,14 +65,13 @@ pub struct SegmentWriter {
}
impl SegmentWriter {
// Write to disk everything that
// is still held in RAM:
// - the dictionary, as an fst
// - the postings
// - the segment info
fn finalize(mut self,) -> io::Result<()> {
try!(self.postings_writer.serialize(&mut self.segment_serializer));
try!(self.postings_writer.serialize(self.segment_serializer.get_postings_serializer()));
{
let segment_info = SegmentInfo {
max_doc: self.max_doc
@@ -134,7 +133,7 @@ impl SegmentWriter {
impl SerializableSegment for SegmentWriter {
/// Serializes this writer's postings and fast fields through `serializer`,
/// then closes the serializer and returns its result.
fn write(&self, mut serializer: SegmentSerializer) -> io::Result<()> {
// NOTE(review): the next two lines both serialize the postings but pass
// different arguments (`&mut serializer` vs. `serializer.get_postings_serializer()`).
// They look like the before/after of a single line — confirm which one should remain.
try!(self.postings_writer.serialize(&mut serializer));
try!(self.postings_writer.serialize(serializer.get_postings_serializer()));
try!(self.fast_field_writers.serialize(serializer.get_fast_field_serializer()));
serializer.close()
}