Refactoring toward adding stored values.

This commit is contained in:
Paul Masurel
2016-02-19 12:38:58 +09:00
parent 2475655be3
commit e13262e70b
4 changed files with 91 additions and 37 deletions

View File

@@ -19,6 +19,7 @@ pub struct SimpleCodec;
// TODO should we vint?
pub struct SimpleSegmentSerializer {
    segment: Segment,
    written_bytes_postings: usize,
    postings_write: File,
    store_write: File,
@@ -27,6 +28,13 @@ pub struct SimpleSegmentSerializer {
    encoder: simdcompression::Encoder,
}
impl SimpleSegmentSerializer {
    pub fn segment(&self,) -> Segment {
        self.segment.clone()
    }
}
impl SegmentSerializer<()> for SimpleSegmentSerializer {
    fn store_doc(&mut self, field: &mut Iterator<Item=&FieldValue>) {
@@ -84,13 +92,14 @@ impl SimpleCodec {
    // TODO impl packed int
    // TODO skip lists
    // TODO make that part of the codec API
    fn serializer(segment: &Segment) -> Result<SimpleSegmentSerializer> {
    pub fn serializer(segment: &Segment) -> Result<SimpleSegmentSerializer> {
        let term_write = try!(segment.open_writable(SegmentComponent::TERMS));
        let postings_write = try!(segment.open_writable(SegmentComponent::POSTINGS));
        let store_write = try!(segment.open_writable(SegmentComponent::STORE));
        let term_fst_builder_result = MapBuilder::new(term_write);
        let term_fst_builder = term_fst_builder_result.unwrap();
        Ok(SimpleSegmentSerializer {
            segment: segment.clone(),
            written_bytes_postings: 0,
            postings_write: postings_write,
            store_write: store_write,
@@ -102,7 +111,7 @@ impl SimpleCodec {
    pub fn write<I: SerializableSegment>(index: &I, segment: &Segment) -> Result<()> {
        let serializer = try!(SimpleCodec::serializer(segment));
        index.write(serializer)
        let mut serializer = try!(SimpleCodec::serializer(segment));
        index.write(&mut serializer)
    }
}

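The new `written_bytes_postings` field keeps track of how many bytes the serializer has pushed into the postings file, and `write` now lends the serializer out by mutable reference instead of consuming it, so the caller can still inspect it afterwards. A minimal, self-contained sketch of that bookkeeping pattern (std-only; `CountingWriter` and the other names below are illustrative stand-ins, not part of this commit):

use std::io::{self, Write};

struct CountingWriter<W: Write> {
    inner: W,
    written_bytes: usize,
}

impl<W: Write> Write for CountingWriter<W> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // count every byte that actually reaches the underlying writer
        let written = try!(self.inner.write(buf));
        self.written_bytes += written;
        Ok(written)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.inner.flush()
    }
}

fn main() {
    // stand-in for the segment's postings file
    let mut postings_write = CountingWriter {
        inner: Vec::new(),
        written_bytes: 0,
    };
    postings_write.write_all(&[1u8, 2, 3, 4]).unwrap();
    // because the writer was only borrowed by write_all, we can still read the counter
    println!("postings bytes written so far: {}", postings_write.written_bytes);
}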
View File

@@ -12,7 +12,7 @@ pub trait SegmentSerializer<Output> {
}
pub trait SerializableSegment {
    fn write<Output, SegSer: SegmentSerializer<Output>>(&self, serializer: SegSer) -> Result<Output>;
    fn write<Output, SegSer: SegmentSerializer<Output>>(&self, serializer: &mut SegSer) -> Result<Output>;
}
@@ -30,8 +30,8 @@ impl fmt::Debug for DebugSegmentSerializer {
impl DebugSegmentSerializer {
    pub fn debug_string<S: SerializableSegment>(index: &S) -> String {
        let serializer = DebugSegmentSerializer::new();
        index.write(serializer).unwrap()
        let mut serializer = DebugSegmentSerializer::new();
        index.write(&mut serializer).unwrap()
    }
    pub fn new() -> DebugSegmentSerializer {

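The signature change above means a segment no longer consumes its serializer: `write` borrows it mutably, and the `Output` type parameter lets the same write path drive either the binary serializer or the debug one. A simplified, self-contained model of that shape (every type below is an illustrative stand-in, not the crate's own):

trait SegmentSerializer<Output> {
    fn new_term(&mut self, term: &str, doc_freq: u32);
    fn write_docs(&mut self, doc_ids: &[u32]);
    fn close(&mut self) -> Output;
}

trait SerializableSegment {
    // the serializer is borrowed mutably, not moved in
    fn write<Output, SegSer: SegmentSerializer<Output>>(&self, serializer: &mut SegSer) -> Output;
}

struct DebugSerializer {
    buffer: String,
}

impl SegmentSerializer<String> for DebugSerializer {
    fn new_term(&mut self, term: &str, doc_freq: u32) {
        self.buffer.push_str(&format!("term {} (doc_freq={})\n", term, doc_freq));
    }
    fn write_docs(&mut self, doc_ids: &[u32]) {
        self.buffer.push_str(&format!("  docs {:?}\n", doc_ids));
    }
    fn close(&mut self) -> String {
        self.buffer.clone()
    }
}

struct InMemorySegment {
    postings: Vec<(String, Vec<u32>)>,
}

impl SerializableSegment for InMemorySegment {
    fn write<Output, SegSer: SegmentSerializer<Output>>(&self, serializer: &mut SegSer) -> Output {
        // push every term and its posting list through the borrowed serializer
        for &(ref term, ref doc_ids) in &self.postings {
            serializer.new_term(term, doc_ids.len() as u32);
            serializer.write_docs(doc_ids);
        }
        serializer.close()
    }
}

fn main() {
    let segment = InMemorySegment {
        postings: vec![
            ("hello".to_string(), vec![0, 2]),
            ("world".to_string(), vec![1]),
        ],
    };
    let mut serializer = DebugSerializer { buffer: String::new() };
    // the serializer is only lent out, so it is still usable after `write`
    let dump: String = segment.write(&mut serializer);
    print!("{}", dump);
}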
View File

@@ -4,6 +4,7 @@ use std::slice;
use core::global::*;
use core::schema::*;
use core::codec::*;
use std::rc::Rc;
use core::directory::Directory;
use core::analyzer::SimpleTokenizer;
use std::collections::{HashMap, BTreeMap};
@@ -40,24 +41,30 @@ impl PostingsWriter {
}
pub struct IndexWriter {
    segment_writer: SegmentWriter,
    segment_writer: Rc<SegmentWriter>,
    directory: Directory,
    schema: Schema,
}
fn new_segment_writer(directory: &Directory, ) -> SegmentWriter {
    let segment = directory.new_segment();
    SegmentWriter::for_segment(segment)
}
impl IndexWriter {
    pub fn open(directory: &Directory) -> IndexWriter {
        let schema = directory.schema();
        IndexWriter {
            segment_writer: SegmentWriter::new(),
            segment_writer: Rc::new(new_segment_writer(&directory)),
            directory: directory.clone(),
            schema: schema,
        }
    }
    pub fn add(&mut self, doc: Document) {
        self.segment_writer.add(doc, &self.schema);
        Rc::get_mut(&mut self.segment_writer).unwrap().add(doc, &self.schema);
    }
    // TODO remove that some day
@@ -66,14 +73,27 @@ impl IndexWriter {
    }
    pub fn commit(&mut self,) -> Result<Segment> {
        let segment = self.directory.new_segment();
        try!(SimpleCodec::write(&self.segment_writer, &segment).map(|sz| (segment.clone(), sz)));
        // At this point, the segment is written
        // We still need to sync all of the file, as well as the parent directory.
        try!(self.directory.sync(segment.clone()));
        self.directory.publish_segment(segment.clone());
        self.segment_writer = SegmentWriter::new();
        Ok(segment)
        // TODO error handling
        let mut segment_writer_rc = self.segment_writer.clone();
        self.segment_writer = Rc::new(new_segment_writer(&self.directory));
        let segment_writer_res = Rc::try_unwrap(segment_writer_rc);
        match segment_writer_res {
            Ok(segment_writer) => {
                let segment = segment_writer.segment();
                segment_writer.write_pending();
                // write(self.segment_serializer);
                // try!(SimpleCodec::write(&self.segment_writer, &segment).map(|sz| (segment.clone(), sz)));
                // At this point, the segment is written
                // We still need to sync all of the file, as well as the parent directory.
                try!(self.directory.sync(segment.clone()));
                self.directory.publish_segment(segment.clone());
                Ok(segment)
            },
            Err(_) => {
                panic!("error while acquiring segment writer.");
            }
        }
    }
}
@@ -85,24 +105,49 @@ pub struct SegmentWriter {
    postings: Vec<PostingsWriter>,
    term_index: BTreeMap<Term, usize>,
    tokenizer: SimpleTokenizer,
    segment_serializer: SimpleSegmentSerializer,
}
impl Drop for SegmentWriter {
    fn drop(&mut self) {
        println!("num tokens {}", self.num_tokens);
    }
}
// impl Drop for SegmentWriter {
//     fn drop(&mut self) {
//         println!("num tokens {}", self.num_tokens);
//     }
// }
impl SegmentWriter {
    fn new() -> SegmentWriter {
    // write on disk all of the stuff that
    // are still on RAM.
    // for this version, that's the term dictionary
    // and the postings
    fn write_pending(mut self,) -> Result<()> {
        //self.write(&mut self.segment_serializer);
        {
            for (term, postings_id) in self.term_index.iter() {
                let doc_ids = &self.postings[postings_id.clone()].doc_ids;
                let term_docfreq = doc_ids.len() as u32;
                self.segment_serializer.new_term(&term, term_docfreq);
                self.segment_serializer.write_docs(&doc_ids);
            }
        }
        self.segment_serializer.close()
    }
    pub fn segment(&self,) -> Segment {
        self.segment_serializer.segment()
    }
    fn for_segment(segment: Segment) -> SegmentWriter {
        // TODO handle error
        let segment_serializer = SimpleCodec::serializer(&segment).unwrap();
        SegmentWriter {
            num_tokens: 0,
            max_doc: 0,
            postings: Vec::new(),
            term_index: BTreeMap::new(),
            tokenizer: SimpleTokenizer::new(),
            segment_serializer: segment_serializer,
        }
    }
@@ -141,15 +186,15 @@ impl SegmentWriter {
        self.get_postings_writer(term).suscribe(doc);
    }
}
impl SerializableSegment for SegmentWriter {
    fn write<Output, SegSer: SegmentSerializer<Output>>(&self, mut serializer: SegSer) -> Result<Output> {
        for (term, postings_id) in self.term_index.iter() {
            let doc_ids = &self.postings[postings_id.clone()].doc_ids;
            let term_docfreq = doc_ids.len() as u32;
            serializer.new_term(&term, term_docfreq);
            serializer.write_docs(&doc_ids);
        }
        serializer.close()
    }
}
//
// impl SerializableSegment for SegmentWriter {
//     fn write<Output, SegSer: SegmentSerializer<Output>>(&self, serializer: &mut SegSer) -> Result<Output> {
//         for (term, postings_id) in self.term_index.iter() {
//             let doc_ids = &self.postings[postings_id.clone()].doc_ids;
//             let term_docfreq = doc_ids.len() as u32;
//             serializer.new_term(&term, term_docfreq);
//             serializer.write_docs(&doc_ids);
//         }
//         serializer.close()
//     }
// }

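The reworked `commit` above relies on a small `Rc` dance: clone the handle, swap a fresh writer into the field, then `Rc::try_unwrap` the old handle to regain exclusive ownership so the consuming `write_pending` can run. A self-contained sketch of that pattern under simplified, assumed types (the real `write_pending` serializes the term dictionary and postings; here it just returns a count):

use std::rc::Rc;

struct SegmentWriter {
    pending_docs: Vec<u32>,
}

impl SegmentWriter {
    fn new() -> SegmentWriter {
        SegmentWriter { pending_docs: Vec::new() }
    }

    // consumes the writer: after this call nothing can add documents to it
    fn write_pending(self) -> usize {
        self.pending_docs.len()
    }
}

struct IndexWriter {
    segment_writer: Rc<SegmentWriter>,
}

impl IndexWriter {
    fn add(&mut self, doc: u32) {
        // Rc::get_mut only succeeds while this Rc is the sole owner
        Rc::get_mut(&mut self.segment_writer).unwrap().pending_docs.push(doc);
    }

    fn commit(&mut self) -> usize {
        // keep a handle on the full writer, then replace the field with an empty one
        let old_writer = self.segment_writer.clone();
        self.segment_writer = Rc::new(SegmentWriter::new());
        // the assignment above dropped the field's copy, so `old_writer` is now
        // the only owner and try_unwrap hands back the owned SegmentWriter
        match Rc::try_unwrap(old_writer) {
            Ok(writer) => writer.write_pending(),
            Err(_) => panic!("segment writer still shared"),
        }
    }
}

fn main() {
    let mut index_writer = IndexWriter { segment_writer: Rc::new(SegmentWriter::new()) };
    index_writer.add(1);
    index_writer.add(2);
    println!("committed {} docs", index_writer.commit());
}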
View File

@@ -94,8 +94,8 @@ fn test_indexing() {
        index_writer.add(doc);
    }
    let debug_serializer = DebugSegmentSerializer::new();
    let segment_str_before_writing = DebugSegmentSerializer::debug_string(index_writer.current_segment_writer());
    //let debug_serializer = DebugSegmentSerializer::new();
    //let segment_str_before_writing = DebugSegmentSerializer::debug_string(index_writer.current_segment_writer());
    let commit_result = index_writer.commit();
    assert!(commit_result.is_ok());
    let segment = commit_result.unwrap();