From e5eba2a5305da70978c8b8efeca000c538eaf3be Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Thu, 21 Jan 2016 14:41:59 +0900 Subject: [PATCH] moving for new serialize api --- src/core/codec.rs | 113 ++++++++-------- src/core/reader.rs | 110 +++++----------- src/core/serial.rs | 43 ++++-- src/core/writer.rs | 318 ++++++++++++++++++++++----------------------- 4 files changed, 277 insertions(+), 307 deletions(-) diff --git a/src/core/codec.rs b/src/core/codec.rs index 91cdf37c0..f8862dc4b 100644 --- a/src/core/codec.rs +++ b/src/core/codec.rs @@ -12,65 +12,66 @@ use core::reader::*; pub struct SimpleCodec; impl SimpleCodec { - fn write_postings(doc_it: D, postings: &mut W) -> Result { - let mut written_bytes: usize = 4; - match postings.write_u32::(doc_it.len() as u32) { - Ok(_) => {}, - Err(_) => { - let msg = String::from("Failed writing posting list length"); - return Err(Error::WriteError(msg)); - }, - } - for doc_id in doc_it { - println!(" Doc {}", doc_id); - match postings.write_u32::(doc_id as u32) { - Ok(_) => {}, - Err(_) => { - let msg = String::from("Failed while writing posting list"); - return Err(Error::WriteError(msg)); - }, - } - written_bytes += 4; - } - Ok(written_bytes) - } + + // fn write_postings(doc_it: D, postings: &mut W) -> Result { + // let mut written_bytes: usize = 4; + // match postings.write_u32::(doc_it.len() as u32) { + // Ok(_) => {}, + // Err(_) => { + // let msg = String::from("Failed writing posting list length"); + // return Err(Error::WriteError(msg)); + // }, + // } + // for doc_id in doc_it { + // println!(" Doc {}", doc_id); + // match postings.write_u32::(doc_id as u32) { + // Ok(_) => {}, + // Err(_) => { + // let msg = String::from("Failed while writing posting list"); + // return Err(Error::WriteError(msg)); + // }, + // } + // written_bytes += 4; + // } + // Ok(written_bytes) + // } // TODO impl packed int // TODO skip lists - pub fn write<'a, I: SerializableSegment<'a>>(index: &'a I, segment: &'a Segment) -> Result { - let term_write = try!(segment.open_writable(SegmentComponent::TERMS)); - let mut postings_write = try!(segment.open_writable(SegmentComponent::POSTINGS)); - let term_trie_builder_result = MapBuilder::new(term_write); - if term_trie_builder_result.is_err() { - // TODO include cause somehow - return Err(Error::WriteError(String::from("Failed creating the term builder"))); - } - let mut term_buffer: Vec = Vec::new(); - let mut term_trie_builder = term_trie_builder_result.unwrap(); - let mut term_cursor = index.term_cursor(); - let mut offset: usize = 0; - loop { - match term_cursor.next() { - Some((term, doc_it)) => { - println!("{:?}", term); - term.write_into(&mut term_buffer); - match term_trie_builder.insert(&term_buffer, offset as u64) { - Ok(_) => {} - Err(_) => { - return Err(Error::WriteError(String::from("Failed while inserting into the fst"))) - }, - } - offset += try!(SimpleCodec::write_postings(doc_it, &mut postings_write)); - }, - None => { - break; - } - } - } - term_trie_builder.finish(); - Ok(0) - - } + // pub fn write<'a, I: SerializableSegment<'a>>(index: &'a I, segment: &'a Segment) -> Result { + // let term_write = try!(segment.open_writable(SegmentComponent::TERMS)); + // let mut postings_write = try!(segment.open_writable(SegmentComponent::POSTINGS)); + // let term_trie_builder_result = MapBuilder::new(term_write); + // if term_trie_builder_result.is_err() { + // // TODO include cause somehow + // return Err(Error::WriteError(String::from("Failed creating the term builder"))); + // } + // let mut term_buffer: Vec = Vec::new(); + // let mut term_trie_builder = term_trie_builder_result.unwrap(); + // let mut term_cursor = index.term_cursor(); + // let mut offset: usize = 0; + // loop { + // match term_cursor.next() { + // Some((term, doc_it)) => { + // println!("{:?}", term); + // term.write_into(&mut term_buffer); + // match term_trie_builder.insert(&term_buffer, offset as u64) { + // Ok(_) => {} + // Err(_) => { + // return Err(Error::WriteError(String::from("Failed while inserting into the fst"))) + // }, + // } + // offset += try!(SimpleCodec::write_postings(doc_it, &mut postings_write)); + // }, + // None => { + // break; + // } + // } + // } + // term_trie_builder.finish(); + // Ok(0) + // + // } } diff --git a/src/core/reader.rs b/src/core/reader.rs index f9b220bf6..0723a75a1 100644 --- a/src/core/reader.rs +++ b/src/core/reader.rs @@ -3,86 +3,18 @@ use core::directory::Segment; use core::schema::Term; use fst::Streamer; use fst; +use std::io; use fst::raw::Fst; use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; use std::borrow::Borrow; use std::io::Cursor; use core::global::DocId; -use core::serial::{DocCursor, TermCursor}; -use core::serial::SerializableSegment; +use core::serial::*; use core::directory::SegmentComponent; use fst::raw::MmapReadOnly; use core::error::{Result, Error}; - - -pub struct SegmentDocCursor<'a> { - postings_data: Cursor<&'a [u8]>, - num_docs: DocId, - current_doc: DocId, -} - -impl<'a> Iterator for SegmentDocCursor<'a> { - type Item = DocId; - - fn next(&mut self) -> Option { - if self.num_docs == 0 { - None - } - else { - self.num_docs -= 1; - self.current_doc = self.postings_data.read_u32::().unwrap(); - Some(self.current_doc) - } - } -} - -impl<'a> DocCursor for SegmentDocCursor<'a> { - fn doc(&self) -> DocId{ - self.current_doc - } - - fn len(&self) -> DocId { - self.num_docs - } -} - -// ------------------------ - -pub struct SegmentTermCur<'a> { - segment: &'a Segment, - fst_streamer: fst::map::Stream<'a>, - postings_data: &'a [u8], -} - -impl<'a> TermCursor for SegmentTermCur<'a> { - - type DocCur = SegmentDocCursor<'a>; - - fn next(&mut self,) -> Option<(Term, SegmentDocCursor<'a>)> { - match self.fst_streamer.next() { - Some((k, offset_u64)) => { - let term = Term::from(k); - let offset = offset_u64 as usize; - let data = &self.postings_data[offset..]; - let mut cursor = Cursor::new(data); - let num_docs = cursor.read_u32::().unwrap(); - let doc_cursor = SegmentDocCursor { - postings_data: cursor, - num_docs: num_docs, - current_doc: 0, - }; - Some((term, doc_cursor)) - }, - None => None - } - } -} - - -// ---------------------- - // TODO file structure should be in codec pub struct SegmentIndexReader { @@ -109,18 +41,36 @@ impl SegmentIndexReader { segment: segment, }) } - } -impl<'a> SerializableSegment<'a> for SegmentIndexReader { - type TermCur = SegmentTermCur<'a>; - - fn term_cursor(&'a self) -> SegmentTermCur<'a> { - SegmentTermCur { - segment: &self.segment, - fst_streamer: self.term_offsets.stream(), - postings_data: unsafe { self.postings_data.borrow().as_slice() }, - } +fn write_postings(mut cursor: R, num_docs: DocId, serializer: &mut SegSer) -> Result<()> { + for i in 0..num_docs { + let doc_id = cursor.read_u32::().unwrap(); + try!(serializer.add_doc(doc_id)); } + Ok(()) +} + +impl SerializableSegment for SegmentIndexReader { + + fn write(&self, serializer: &mut SegSer) -> Result<()> { + let mut term_offsets_it = self.term_offsets.stream(); + loop { + match term_offsets_it.next() { + Some((term_data, offset_u64)) => { + let term = Term::from(term_data); + let offset = offset_u64 as usize; + let data = unsafe { &self.postings_data.as_slice()[offset..] }; + let mut cursor = Cursor::new(data); + let num_docs = cursor.read_u32::().unwrap() as DocId; + try!(serializer.new_term(&term, num_docs)); + try!(write_postings(cursor, num_docs, serializer)); + }, + None => { break; } + } + } + Ok(()) + } + } diff --git a/src/core/serial.rs b/src/core/serial.rs index 6739acf48..40e1592f3 100644 --- a/src/core/serial.rs +++ b/src/core/serial.rs @@ -1,20 +1,39 @@ use core::global::*; use core::schema::*; +use core::error::{Result, Error}; -// Trait sufficient to serialize a segment. -pub trait SerializableSegment<'a> { - type TermCur: TermCursor; // TODO rename TermCursorImpl - fn term_cursor(&'a self) -> Self::TermCur; +// change the API to remove the lifetime, by +// "pushing" the data to a SegmentSerializer. + +struct DebugSegmentSerialize { + text: String, } -pub trait DocCursor: Iterator { - fn doc(&self) -> DocId; - fn len(&self) -> DocId; +impl DebugSegmentSerialize { + pub fn to_string(&self,) -> &String { + &self.text + } +} + +impl SegmentSerializer for DebugSegmentSerialize { + fn new_term(&mut self, term: &Term, doc_freq: DocId) -> Result<()> { + self.text.push_str(&format!("{:?}\n", term)); + Ok(()) + } + + fn add_doc(&mut self, doc_id: DocId) -> Result<()> { + self.text.push_str(&format!(" - Doc {:?}\n", doc_id)); + Ok(()) + } +} + +pub trait SegmentSerializer { + fn new_term(&mut self, term: &Term, doc_freq: DocId) -> Result<()>; + fn add_doc(&mut self, doc_id: DocId) -> Result<()>; +} + +pub trait SerializableSegment { + fn write(&self, serializer: &mut SegSer) -> Result<()>; } // TODO make iteration over Fields somehow sorted - -pub trait TermCursor { - type DocCur: DocCursor; - fn next(&mut self,) -> Option<(Term, Self::DocCur)>; -} diff --git a/src/core/writer.rs b/src/core/writer.rs index 559fef3a5..b0770d8ac 100644 --- a/src/core/writer.rs +++ b/src/core/writer.rs @@ -143,166 +143,166 @@ impl SegmentWriter { ////////////////////////////////// // CIWFormCursor // -struct CIWFormCursor<'a> { - term_it: btree_map::Iter<'a, String, usize>, // term -> postings_idx - postings_map: &'a Vec, // postings_idx -> postings -} - -struct FormPostings<'a> { - form: &'a str, - postings: &'a SimplePostingsWriter, -} - -impl<'a> Iterator for CIWFormCursor<'a> { - type Item = FormPostings<'a>; - - fn next(&mut self,) -> Option> { - self.term_it.next() - .map(|(form, postings_idx)| { - FormPostings { - form: form, - postings: unsafe { self.postings_map.get_unchecked(*postings_idx) } - } - }) - } -} +// struct CIWFormCursor<'a> { +// term_it: btree_map::Iter<'a, String, usize>, // term -> postings_idx +// postings_map: &'a Vec, // postings_idx -> postings +// } +// +// struct FormPostings<'a> { +// form: &'a str, +// postings: &'a SimplePostingsWriter, +// } +// +// impl<'a> Iterator for CIWFormCursor<'a> { +// type Item = FormPostings<'a>; +// +// fn next(&mut self,) -> Option> { +// self.term_it.next() +// .map(|(form, postings_idx)| { +// FormPostings { +// form: form, +// postings: unsafe { self.postings_map.get_unchecked(*postings_idx) } +// } +// }) +// } +// } ////////////////////////////////// // CIWDocCursor // - -pub struct CIWTermCursor<'a> { - field_it: hash_map::Iter<'a, Field, FieldWriter>, - form_it: CIWFormCursor<'a>, - current_form_postings: Option>, - field: &'a Field, -} - -impl<'a> CIWTermCursor<'a> { - - fn advance(&mut self,) -> bool { - let next_form = self.next_form(); - if next_form { - true - } - else { - if self.next_field() { - self.advance() - } - else { - false - } - } - } - - fn get_term(&self) -> Term { - let field = self.field.clone(); - let value = self.current_form_postings.as_ref().unwrap().form; - Term::from_field_text(field, value) - } - - fn doc_cursor(&self,) -> CIWDocCursor<'a> { - let postings = self.current_form_postings - .as_ref() - .unwrap() - .postings; - let num_docs = postings.doc_ids.len() as DocId; - CIWDocCursor { - num_docs: num_docs, - docs_it: postings - .doc_ids - .iter(), - current: None - } - } - - fn next_form(&mut self,) -> bool { - match self.form_it.next() { - Some(form_postings) => { - self.current_form_postings = Some(form_postings); - return true; - }, - None => { false } - } - } - - // Advance to the next field - // sets up form_it to iterate on forms - // returns true iff there was a next field - fn next_field(&mut self,) -> bool { - match self.field_it.next() { - Some((field, field_writer)) => { - self.form_it = CIWFormCursor { - term_it: field_writer.term_index.iter(), - postings_map: &field_writer.postings, - }; - self.field = field; - true - }, - None => false, - } - } -} - -impl<'a> TermCursor for CIWTermCursor<'a> { - - type DocCur = CIWDocCursor<'a>; - - fn next(&mut self,) -> Option<(Term, CIWDocCursor<'a>)> { - if self.advance() { - Some((self.get_term(), self.doc_cursor())) - } - else { - None - } - } -} - - -impl<'a> SerializableSegment<'a> for SegmentWriter { - - type TermCur = CIWTermCursor<'a>; - - fn term_cursor(&'a self) -> CIWTermCursor<'a> { - let mut field_it: hash_map::Iter<'a, Field, FieldWriter> = self.term_writers.iter(); - let (field, field_writer) = field_it.next().unwrap(); - CIWTermCursor { - field_it: field_it, - form_it: CIWFormCursor { - term_it: field_writer.term_index.iter(), - postings_map: &field_writer.postings, - }, - field: field, - current_form_postings: None, - } - // TODO handle having no fields at all - } -} - -// TODO add positions - -pub struct CIWDocCursor<'a> { - docs_it: slice::Iter<'a, DocId>, - current: Option, - num_docs: DocId, -} - -impl<'a> Iterator for CIWDocCursor<'a> { - type Item=DocId; - - fn next(&mut self) -> Option { - self.current = self.docs_it.next().map(|x| *x); - self.current - } -} - -impl<'a> DocCursor for CIWDocCursor<'a> { - - fn doc(&self,) -> DocId { - self.current.unwrap() - } - - fn len(&self) -> DocId { - self.num_docs - } -} +// +// pub struct CIWTermCursor<'a> { +// field_it: hash_map::Iter<'a, Field, FieldWriter>, +// form_it: CIWFormCursor<'a>, +// current_form_postings: Option>, +// field: &'a Field, +// } +// +// impl<'a> CIWTermCursor<'a> { +// +// fn advance(&mut self,) -> bool { +// let next_form = self.next_form(); +// if next_form { +// true +// } +// else { +// if self.next_field() { +// self.advance() +// } +// else { +// false +// } +// } +// } +// +// fn get_term(&self) -> Term { +// let field = self.field.clone(); +// let value = self.current_form_postings.as_ref().unwrap().form; +// Term::from_field_text(field, value) +// } +// +// fn doc_cursor(&self,) -> CIWDocCursor<'a> { +// let postings = self.current_form_postings +// .as_ref() +// .unwrap() +// .postings; +// let num_docs = postings.doc_ids.len() as DocId; +// CIWDocCursor { +// num_docs: num_docs, +// docs_it: postings +// .doc_ids +// .iter(), +// current: None +// } +// } +// +// fn next_form(&mut self,) -> bool { +// match self.form_it.next() { +// Some(form_postings) => { +// self.current_form_postings = Some(form_postings); +// return true; +// }, +// None => { false } +// } +// } +// +// // Advance to the next field +// // sets up form_it to iterate on forms +// // returns true iff there was a next field +// fn next_field(&mut self,) -> bool { +// match self.field_it.next() { +// Some((field, field_writer)) => { +// self.form_it = CIWFormCursor { +// term_it: field_writer.term_index.iter(), +// postings_map: &field_writer.postings, +// }; +// self.field = field; +// true +// }, +// None => false, +// } +// } +// } +// +// impl<'a> TermCursor for CIWTermCursor<'a> { +// +// type DocCur = CIWDocCursor<'a>; +// +// fn next(&mut self,) -> Option<(Term, CIWDocCursor<'a>)> { +// if self.advance() { +// Some((self.get_term(), self.doc_cursor())) +// } +// else { +// None +// } +// } +// } +// +// +// impl<'a> SerializableSegment<'a> for SegmentWriter { +// +// type TermCur = CIWTermCursor<'a>; +// +// fn term_cursor(&'a self) -> CIWTermCursor<'a> { +// let mut field_it: hash_map::Iter<'a, Field, FieldWriter> = self.term_writers.iter(); +// let (field, field_writer) = field_it.next().unwrap(); +// CIWTermCursor { +// field_it: field_it, +// form_it: CIWFormCursor { +// term_it: field_writer.term_index.iter(), +// postings_map: &field_writer.postings, +// }, +// field: field, +// current_form_postings: None, +// } +// // TODO handle having no fields at all +// } +// } +// +// // TODO add positions +// +// pub struct CIWDocCursor<'a> { +// docs_it: slice::Iter<'a, DocId>, +// current: Option, +// num_docs: DocId, +// } +// +// impl<'a> Iterator for CIWDocCursor<'a> { +// type Item=DocId; +// +// fn next(&mut self) -> Option { +// self.current = self.docs_it.next().map(|x| *x); +// self.current +// } +// } +// +// impl<'a> DocCursor for CIWDocCursor<'a> { +// +// fn doc(&self,) -> DocId { +// self.current.unwrap() +// } +// +// fn len(&self) -> DocId { +// self.num_docs +// } +// }