moving for new serialize api

This commit is contained in:
Paul Masurel
2016-01-21 14:41:59 +09:00
parent d1ff654ded
commit e5eba2a530
4 changed files with 277 additions and 307 deletions

View File

@@ -12,65 +12,66 @@ use core::reader::*;
pub struct SimpleCodec;
impl SimpleCodec {
fn write_postings<D: DocCursor, W: Write>(doc_it: D, postings: &mut W) -> Result<usize> {
let mut written_bytes: usize = 4;
match postings.write_u32::<LittleEndian>(doc_it.len() as u32) {
Ok(_) => {},
Err(_) => {
let msg = String::from("Failed writing posting list length");
return Err(Error::WriteError(msg));
},
}
for doc_id in doc_it {
println!(" Doc {}", doc_id);
match postings.write_u32::<LittleEndian>(doc_id as u32) {
Ok(_) => {},
Err(_) => {
let msg = String::from("Failed while writing posting list");
return Err(Error::WriteError(msg));
},
}
written_bytes += 4;
}
Ok(written_bytes)
}
// fn write_postings<D: DocCursor, W: Write>(doc_it: D, postings: &mut W) -> Result<usize> {
// let mut written_bytes: usize = 4;
// match postings.write_u32::<LittleEndian>(doc_it.len() as u32) {
// Ok(_) => {},
// Err(_) => {
// let msg = String::from("Failed writing posting list length");
// return Err(Error::WriteError(msg));
// },
// }
// for doc_id in doc_it {
// println!(" Doc {}", doc_id);
// match postings.write_u32::<LittleEndian>(doc_id as u32) {
// Ok(_) => {},
// Err(_) => {
// let msg = String::from("Failed while writing posting list");
// return Err(Error::WriteError(msg));
// },
// }
// written_bytes += 4;
// }
// Ok(written_bytes)
// }
// TODO impl packed int
// TODO skip lists
pub fn write<'a, I: SerializableSegment<'a>>(index: &'a I, segment: &'a Segment) -> Result<usize> {
let term_write = try!(segment.open_writable(SegmentComponent::TERMS));
let mut postings_write = try!(segment.open_writable(SegmentComponent::POSTINGS));
let term_trie_builder_result = MapBuilder::new(term_write);
if term_trie_builder_result.is_err() {
// TODO include cause somehow
return Err(Error::WriteError(String::from("Failed creating the term builder")));
}
let mut term_buffer: Vec<u8> = Vec::new();
let mut term_trie_builder = term_trie_builder_result.unwrap();
let mut term_cursor = index.term_cursor();
let mut offset: usize = 0;
loop {
match term_cursor.next() {
Some((term, doc_it)) => {
println!("{:?}", term);
term.write_into(&mut term_buffer);
match term_trie_builder.insert(&term_buffer, offset as u64) {
Ok(_) => {}
Err(_) => {
return Err(Error::WriteError(String::from("Failed while inserting into the fst")))
},
}
offset += try!(SimpleCodec::write_postings(doc_it, &mut postings_write));
},
None => {
break;
}
}
}
term_trie_builder.finish();
Ok(0)
}
// pub fn write<'a, I: SerializableSegment<'a>>(index: &'a I, segment: &'a Segment) -> Result<usize> {
// let term_write = try!(segment.open_writable(SegmentComponent::TERMS));
// let mut postings_write = try!(segment.open_writable(SegmentComponent::POSTINGS));
// let term_trie_builder_result = MapBuilder::new(term_write);
// if term_trie_builder_result.is_err() {
// // TODO include cause somehow
// return Err(Error::WriteError(String::from("Failed creating the term builder")));
// }
// let mut term_buffer: Vec<u8> = Vec::new();
// let mut term_trie_builder = term_trie_builder_result.unwrap();
// let mut term_cursor = index.term_cursor();
// let mut offset: usize = 0;
// loop {
// match term_cursor.next() {
// Some((term, doc_it)) => {
// println!("{:?}", term);
// term.write_into(&mut term_buffer);
// match term_trie_builder.insert(&term_buffer, offset as u64) {
// Ok(_) => {}
// Err(_) => {
// return Err(Error::WriteError(String::from("Failed while inserting into the fst")))
// },
// }
// offset += try!(SimpleCodec::write_postings(doc_it, &mut postings_write));
// },
// None => {
// break;
// }
// }
// }
// term_trie_builder.finish();
// Ok(0)
//
// }
}

View File

@@ -3,86 +3,18 @@ use core::directory::Segment;
use core::schema::Term;
use fst::Streamer;
use fst;
use std::io;
use fst::raw::Fst;
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use std::borrow::Borrow;
use std::io::Cursor;
use core::global::DocId;
use core::serial::{DocCursor, TermCursor};
use core::serial::SerializableSegment;
use core::serial::*;
use core::directory::SegmentComponent;
use fst::raw::MmapReadOnly;
use core::error::{Result, Error};
/// Cursor over the doc ids of one posting list, decoded as little-endian `u32` values.
pub struct SegmentDocCursor<'a> {
// Reader positioned on the next doc id to decode.
postings_data: Cursor<&'a [u8]>,
// Number of doc ids not yet returned by `next` (decremented on every call).
num_docs: DocId,
// Doc id most recently decoded by `next`.
current_doc: DocId,
}
impl<'a> Iterator for SegmentDocCursor<'a> {
    type Item = DocId;

    /// Decodes and returns the next doc id, or `None` once `num_docs` reaches zero.
    ///
    /// Panics if the underlying data ends before the announced number of
    /// doc ids has been read.
    fn next(&mut self) -> Option<DocId> {
        if self.num_docs == 0 {
            return None;
        }
        self.num_docs -= 1;
        let decoded = self.postings_data.read_u32::<LittleEndian>().unwrap();
        self.current_doc = decoded;
        Some(self.current_doc)
    }
}
// `DocCursor` accessors for a posting list read back from a segment.
impl<'a> DocCursor for SegmentDocCursor<'a> {
/// Returns the doc id stored by the most recent call to `next`
/// (the initial `current_doc` value if `next` has not been called yet).
fn doc(&self) -> DocId{
self.current_doc
}
/// Returns `num_docs`. Since `next` decrements that field for every doc id
/// it yields, this is the number of doc ids still to be read, not the total.
fn len(&self) -> DocId {
self.num_docs
}
}
// ------------------------
/// Cursor over the terms of a segment; each term comes with a cursor over its doc ids.
pub struct SegmentTermCur<'a> {
// Segment reference; not read by the `TermCursor` implementation of this type.
segment: &'a Segment,
// Stream of (term bytes, offset) pairs; the offset is used to index `postings_data`.
fst_streamer: fst::map::Stream<'a>,
// Raw postings bytes; at each offset a little-endian u32 doc count is read first.
postings_data: &'a [u8],
}
impl<'a> TermCursor for SegmentTermCur<'a> {
    type DocCur = SegmentDocCursor<'a>;

    /// Advances the fst stream and returns the next term together with a
    /// cursor over its posting list, or `None` when the stream is exhausted.
    ///
    /// Panics if the offset stored for the term lies beyond the postings
    /// data, or if the doc count cannot be read at that offset.
    fn next(&mut self,) -> Option<(Term, SegmentDocCursor<'a>)> {
        // Copy the slice reference out so the closure does not have to borrow `self`.
        let all_postings: &'a [u8] = self.postings_data;
        self.fst_streamer.next().map(|(key, offset_u64)| {
            let term = Term::from(key);
            let start = offset_u64 as usize;
            let mut reader = Cursor::new(&all_postings[start..]);
            // The posting list starts with its number of doc ids.
            let doc_count = reader.read_u32::<LittleEndian>().unwrap();
            let docs = SegmentDocCursor {
                postings_data: reader,
                num_docs: doc_count,
                current_doc: 0,
            };
            (term, docs)
        })
    }
}
// ----------------------
// TODO file structure should be in codec
pub struct SegmentIndexReader {
@@ -109,18 +41,36 @@ impl SegmentIndexReader {
segment: segment,
})
}
}
impl<'a> SerializableSegment<'a> for SegmentIndexReader {
type TermCur = SegmentTermCur<'a>;
fn term_cursor(&'a self) -> SegmentTermCur<'a> {
SegmentTermCur {
segment: &self.segment,
fst_streamer: self.term_offsets.stream(),
postings_data: unsafe { self.postings_data.borrow().as_slice() },
}
fn write_postings<R: io::Read, SegSer: SegmentSerializer>(mut cursor: R, num_docs: DocId, serializer: &mut SegSer) -> Result<()> {
for i in 0..num_docs {
let doc_id = cursor.read_u32::<LittleEndian>().unwrap();
try!(serializer.add_doc(doc_id));
}
Ok(())
}
impl SerializableSegment for SegmentIndexReader {
fn write<SegSer: SegmentSerializer>(&self, serializer: &mut SegSer) -> Result<()> {
let mut term_offsets_it = self.term_offsets.stream();
loop {
match term_offsets_it.next() {
Some((term_data, offset_u64)) => {
let term = Term::from(term_data);
let offset = offset_u64 as usize;
let data = unsafe { &self.postings_data.as_slice()[offset..] };
let mut cursor = Cursor::new(data);
let num_docs = cursor.read_u32::<LittleEndian>().unwrap() as DocId;
try!(serializer.new_term(&term, num_docs));
try!(write_postings(cursor, num_docs, serializer));
},
None => { break; }
}
}
Ok(())
}
}

View File

@@ -1,20 +1,39 @@
use core::global::*;
use core::schema::*;
use core::error::{Result, Error};
// Trait sufficient to serialize a segment.
pub trait SerializableSegment<'a> {
type TermCur: TermCursor; // TODO rename TermCursorImpl
fn term_cursor(&'a self) -> Self::TermCur;
// change the API to remove the lifetime, by
// "pushing" the data to a SegmentSerializer.
/// Serializer that accumulates a human-readable dump of what it receives.
struct DebugSegmentSerialize {
// Text appended to by `new_term` and `add_doc`.
text: String,
}
pub trait DocCursor: Iterator<Item=DocId> {
fn doc(&self) -> DocId;
fn len(&self) -> DocId;
impl DebugSegmentSerialize {
/// Returns a reference to the text accumulated so far.
///
/// NOTE(review): named like `ToString::to_string`, but this inherent method
/// borrows the buffer instead of allocating a new `String`.
pub fn to_string(&self,) -> &String {
&self.text
}
}
impl SegmentSerializer for DebugSegmentSerialize {
    /// Appends the `Debug` rendering of `term` on its own line.
    /// `doc_freq` is not used by this serializer. Always succeeds.
    fn new_term(&mut self, term: &Term, doc_freq: DocId) -> Result<()> {
        let line = format!("{:?}\n", term);
        self.text.push_str(&line);
        Ok(())
    }

    /// Appends one line describing `doc_id`. Always succeeds.
    fn add_doc(&mut self, doc_id: DocId) -> Result<()> {
        let line = format!(" - Doc {:?}\n", doc_id);
        self.text.push_str(&line);
        Ok(())
    }
}
/// Sink that a segment pushes its terms and doc ids into.
///
/// NOTE(review): callers presumably announce a term with `new_term` and then
/// push that term's doc ids with `add_doc` — confirm the expected call order.
pub trait SegmentSerializer {
/// Announces `term`; `doc_freq` is the doc count the caller associates with it.
fn new_term(&mut self, term: &Term, doc_freq: DocId) -> Result<()>;
/// Records one doc id.
fn add_doc(&mut self, doc_id: DocId) -> Result<()>;
}
/// A segment able to serialize itself by pushing its content to a `SegmentSerializer`.
pub trait SerializableSegment {
/// Pushes the content of this segment to `serializer`.
fn write<SegSer: SegmentSerializer>(&self, serializer: &mut SegSer) -> Result<()>;
}
// TODO make iteration over Fields somehow sorted
/// Cursor over a sequence of terms, each paired with a cursor over its doc ids.
pub trait TermCursor {
/// Cursor type used to walk the doc ids of a single term.
type DocCur: DocCursor;
/// Advances to the next term; returns it together with its doc cursor,
/// or `None` when there are no more terms.
fn next(&mut self,) -> Option<(Term, Self::DocCur)>;
}

View File

@@ -143,166 +143,166 @@ impl SegmentWriter {
//////////////////////////////////
// CIWFormCursor
//
/// Iterates over the forms of one field, resolving each form to its postings.
struct CIWFormCursor<'a> {
term_it: btree_map::Iter<'a, String, usize>, // term -> postings_idx
postings_map: &'a Vec<SimplePostingsWriter>, // postings_idx -> postings
}
/// One form of a field together with the postings looked up for it.
struct FormPostings<'a> {
// Text of the form (the key of the term index entry).
form: &'a str,
// Postings found at the index stored for this form.
postings: &'a SimplePostingsWriter,
}
impl<'a> Iterator for CIWFormCursor<'a> {
    type Item = FormPostings<'a>;

    /// Returns the next form of the field with the postings stored at the
    /// index recorded for it, or `None` once every form has been visited.
    ///
    /// Panics if a recorded index is not a valid position in `postings_map`.
    fn next(&mut self,) -> Option<FormPostings<'a>> {
        // Copy the reference out so the closure borrows nothing from `self`.
        let postings_map = self.postings_map;
        self.term_it.next().map(move |(form, postings_idx)| {
            FormPostings {
                form: form,
                // Checked indexing: an inconsistent index is a panic, not
                // undefined behaviour as with `get_unchecked`.
                postings: &postings_map[*postings_idx],
            }
        })
    }
}
// struct CIWFormCursor<'a> {
// term_it: btree_map::Iter<'a, String, usize>, // term -> postings_idx
// postings_map: &'a Vec<SimplePostingsWriter>, // postings_idx -> postings
// }
//
// struct FormPostings<'a> {
// form: &'a str,
// postings: &'a SimplePostingsWriter,
// }
//
// impl<'a> Iterator for CIWFormCursor<'a> {
// type Item = FormPostings<'a>;
//
// fn next(&mut self,) -> Option<FormPostings<'a>> {
// self.term_it.next()
// .map(|(form, postings_idx)| {
// FormPostings {
// form: form,
// postings: unsafe { self.postings_map.get_unchecked(*postings_idx) }
// }
// })
// }
// }
//////////////////////////////////
// CIWTermCursor / CIWDocCursor
//
/// Cursor over the terms held by a `SegmentWriter`, visiting its fields one after another.
pub struct CIWTermCursor<'a> {
// Fields that have not been visited yet.
field_it: hash_map::Iter<'a, Field, FieldWriter>,
// Forms remaining in the field currently being visited.
form_it: CIWFormCursor<'a>,
// Form most recently returned by `form_it`; `None` until the first successful advance.
current_form_postings: Option<FormPostings<'a>>,
// Field currently being visited.
field: &'a Field,
}
impl<'a> CIWTermCursor<'a> {
    /// Moves to the next form, switching to the following field(s) whenever
    /// the current one has no form left. Returns false once every field is
    /// exhausted.
    fn advance(&mut self,) -> bool {
        loop {
            if self.next_form() {
                return true;
            }
            if !self.next_field() {
                return false;
            }
        }
    }

    /// Builds the term for the current field and form.
    /// Panics if no form has been reached yet.
    fn get_term(&self) -> Term {
        let owned_field = self.field.clone();
        let current = self.current_form_postings.as_ref().unwrap();
        Term::from_field_text(owned_field, current.form)
    }

    /// Builds a cursor over the doc ids of the current form.
    /// Panics if no form has been reached yet.
    fn doc_cursor(&self,) -> CIWDocCursor<'a> {
        let writer = self.current_form_postings.as_ref().unwrap().postings;
        CIWDocCursor {
            num_docs: writer.doc_ids.len() as DocId,
            docs_it: writer.doc_ids.iter(),
            current: None,
        }
    }

    /// Pulls the next form of the current field into `current_form_postings`.
    /// Returns false, leaving the current form untouched, when the field has
    /// no form left.
    fn next_form(&mut self,) -> bool {
        if let Some(form_postings) = self.form_it.next() {
            self.current_form_postings = Some(form_postings);
            true
        } else {
            false
        }
    }

    // Advance to the next field
    // sets up form_it to iterate on forms
    // returns true iff there was a next field
    fn next_field(&mut self,) -> bool {
        if let Some((field, field_writer)) = self.field_it.next() {
            self.field = field;
            self.form_it = CIWFormCursor {
                term_it: field_writer.term_index.iter(),
                postings_map: &field_writer.postings,
            };
            true
        } else {
            false
        }
    }
}
impl<'a> TermCursor for CIWTermCursor<'a> {
    type DocCur = CIWDocCursor<'a>;

    /// Advances the cursor and returns the term reached together with a
    /// cursor over its doc ids, or `None` when nothing is left.
    fn next(&mut self,) -> Option<(Term, CIWDocCursor<'a>)> {
        if !self.advance() {
            return None;
        }
        let term = self.get_term();
        let docs = self.doc_cursor();
        Some((term, docs))
    }
}
impl<'a> SerializableSegment<'a> for SegmentWriter {
    type TermCur = CIWTermCursor<'a>;

    /// Creates a term cursor positioned before the first form of the first
    /// field yielded by `term_writers`.
    ///
    /// Panics when the writer holds no field at all.
    fn term_cursor(&'a self) -> CIWTermCursor<'a> {
        let mut remaining_fields: hash_map::Iter<'a, Field, FieldWriter> = self.term_writers.iter();
        // TODO handle having no fields at all
        let (first_field, first_writer) = remaining_fields.next().unwrap();
        let first_forms = CIWFormCursor {
            term_it: first_writer.term_index.iter(),
            postings_map: &first_writer.postings,
        };
        CIWTermCursor {
            field_it: remaining_fields,
            form_it: first_forms,
            field: first_field,
            current_form_postings: None,
        }
    }
}
// TODO add positions
/// Cursor over the doc ids recorded for one form.
pub struct CIWDocCursor<'a> {
// Iterator over the doc ids still to be returned.
docs_it: slice::Iter<'a, DocId>,
// Result of the most recent call to `next`; `None` before the first call and after exhaustion.
current: Option<DocId>,
// Value reported by `len`; not modified by `next`.
num_docs: DocId,
}
impl<'a> Iterator for CIWDocCursor<'a> {
    type Item=DocId;

    /// Returns the next doc id and remembers it in `current`; once the
    /// underlying slice is exhausted, `current` becomes `None` as well.
    fn next(&mut self) -> Option<DocId> {
        let upcoming = self.docs_it.next().cloned();
        self.current = upcoming;
        self.current
    }
}
// `DocCursor` accessors for the doc ids recorded for one form.
impl<'a> DocCursor for CIWDocCursor<'a> {
/// Returns the doc id produced by the most recent call to `next`.
/// Panics if `next` has not been called yet or has already returned `None`.
fn doc(&self,) -> DocId {
self.current.unwrap()
}
/// Returns `num_docs` as set when the cursor was built; `next` does not
/// change it, so the value stays the same while iterating.
fn len(&self) -> DocId {
self.num_docs
}
}
//
// pub struct CIWTermCursor<'a> {
// field_it: hash_map::Iter<'a, Field, FieldWriter>,
// form_it: CIWFormCursor<'a>,
// current_form_postings: Option<FormPostings<'a>>,
// field: &'a Field,
// }
//
// impl<'a> CIWTermCursor<'a> {
//
// fn advance(&mut self,) -> bool {
// let next_form = self.next_form();
// if next_form {
// true
// }
// else {
// if self.next_field() {
// self.advance()
// }
// else {
// false
// }
// }
// }
//
// fn get_term(&self) -> Term {
// let field = self.field.clone();
// let value = self.current_form_postings.as_ref().unwrap().form;
// Term::from_field_text(field, value)
// }
//
// fn doc_cursor(&self,) -> CIWDocCursor<'a> {
// let postings = self.current_form_postings
// .as_ref()
// .unwrap()
// .postings;
// let num_docs = postings.doc_ids.len() as DocId;
// CIWDocCursor {
// num_docs: num_docs,
// docs_it: postings
// .doc_ids
// .iter(),
// current: None
// }
// }
//
// fn next_form(&mut self,) -> bool {
// match self.form_it.next() {
// Some(form_postings) => {
// self.current_form_postings = Some(form_postings);
// return true;
// },
// None => { false }
// }
// }
//
// // Advance to the next field
// // sets up form_it to iterate on forms
// // returns true iff there was a next field
// fn next_field(&mut self,) -> bool {
// match self.field_it.next() {
// Some((field, field_writer)) => {
// self.form_it = CIWFormCursor {
// term_it: field_writer.term_index.iter(),
// postings_map: &field_writer.postings,
// };
// self.field = field;
// true
// },
// None => false,
// }
// }
// }
//
// impl<'a> TermCursor for CIWTermCursor<'a> {
//
// type DocCur = CIWDocCursor<'a>;
//
// fn next(&mut self,) -> Option<(Term, CIWDocCursor<'a>)> {
// if self.advance() {
// Some((self.get_term(), self.doc_cursor()))
// }
// else {
// None
// }
// }
// }
//
//
// impl<'a> SerializableSegment<'a> for SegmentWriter {
//
// type TermCur = CIWTermCursor<'a>;
//
// fn term_cursor(&'a self) -> CIWTermCursor<'a> {
// let mut field_it: hash_map::Iter<'a, Field, FieldWriter> = self.term_writers.iter();
// let (field, field_writer) = field_it.next().unwrap();
// CIWTermCursor {
// field_it: field_it,
// form_it: CIWFormCursor {
// term_it: field_writer.term_index.iter(),
// postings_map: &field_writer.postings,
// },
// field: field,
// current_form_postings: None,
// }
// // TODO handle having no fields at all
// }
// }
//
// // TODO add positions
//
// pub struct CIWDocCursor<'a> {
// docs_it: slice::Iter<'a, DocId>,
// current: Option<DocId>,
// num_docs: DocId,
// }
//
// impl<'a> Iterator for CIWDocCursor<'a> {
// type Item=DocId;
//
// fn next(&mut self) -> Option<DocId> {
// self.current = self.docs_it.next().map(|x| *x);
// self.current
// }
// }
//
// impl<'a> DocCursor for CIWDocCursor<'a> {
//
// fn doc(&self,) -> DocId {
// self.current.unwrap()
// }
//
// fn len(&self) -> DocId {
// self.num_docs
// }
// }