This commit is contained in:
Paul Masurel
2016-02-19 18:41:01 +09:00
6 changed files with 126 additions and 50 deletions

View File

@@ -11,6 +11,7 @@ use core::schema::Term;
use core::DocId;
use std::fs::File;
use core::simdcompression;
use core::schema::FieldValue;
pub struct SimpleCodec;
@@ -18,14 +19,28 @@ pub struct SimpleCodec;
// TODO should we vint?
/// Serializer created by `SimpleCodec::serializer`; implements `SegmentSerializer<()>`.
pub struct SimpleSegmentSerializer {
// Clone of the segment handed to `SimpleCodec::serializer`; `segment()` returns a clone of it.
segment: Segment,
// Starts at 0; its current value (as u64) is what `new_term` stores in the term map for each term.
// NOTE(review): presumably the running byte offset into the postings file — confirm where it is advanced.
written_bytes_postings: usize,
// File opened for `SegmentComponent::POSTINGS`.
postings_write: File,
// File opened for `SegmentComponent::STORE`.
store_write: File,
// Map builder over the file opened for `SegmentComponent::TERMS`.
term_fst_builder: MapBuilder<File>, // TODO find an alternative to work around the "move"
// Set to the `doc_freq` of the term most recently passed to `new_term`.
cur_term_num_docs: DocId,
// Created with `simdcompression::Encoder::new()`.
encoder: simdcompression::Encoder,
}
impl SimpleSegmentSerializer {
pub fn segment(&self,) -> Segment {
self.segment.clone()
}
}
impl SegmentSerializer<()> for SimpleSegmentSerializer {
// No-op: the body is empty, so the field values passed in are ignored.
fn store_doc(&mut self, field: &mut Iterator<Item=&FieldValue>) {
}
fn new_term(&mut self, term: &Term, doc_freq: DocId) -> Result<()> {
self.term_fst_builder.insert(term.as_slice(), self.written_bytes_postings as u64);
self.cur_term_num_docs = doc_freq;
@@ -77,14 +92,17 @@ impl SimpleCodec {
// TODO impl packed int
// TODO skip lists
// TODO make that part of the codec API
fn serializer(segment: &Segment) -> Result<SimpleSegmentSerializer> {
pub fn serializer(segment: &Segment) -> Result<SimpleSegmentSerializer> {
let term_write = try!(segment.open_writable(SegmentComponent::TERMS));
let postings_write = try!(segment.open_writable(SegmentComponent::POSTINGS));
let store_write = try!(segment.open_writable(SegmentComponent::STORE));
let term_fst_builder_result = MapBuilder::new(term_write);
let term_fst_builder = term_fst_builder_result.unwrap();
Ok(SimpleSegmentSerializer {
segment: segment.clone(),
written_bytes_postings: 0,
postings_write: postings_write,
store_write: store_write,
term_fst_builder: term_fst_builder,
cur_term_num_docs: 0,
encoder: simdcompression::Encoder::new(),
@@ -93,7 +111,7 @@ impl SimpleCodec {
pub fn write<I: SerializableSegment>(index: &I, segment: &Segment) -> Result<()> {
let serializer = try!(SimpleCodec::serializer(segment));
index.write(serializer)
let mut serializer = try!(SimpleCodec::serializer(segment));
index.write(&mut serializer)
}
}

View File

@@ -331,6 +331,7 @@ pub enum SegmentComponent {
POSTINGS,
// POSITIONS,
TERMS,
STORE,
}
#[derive(Debug, Clone)]
@@ -352,6 +353,7 @@ impl Segment {
SegmentComponent::POSTINGS => ".idx",
// SegmentComponent::POSITIONS => ".pos",
SegmentComponent::TERMS => ".term",
SegmentComponent::STORE => ".store",
}
}

View File

@@ -7,16 +7,18 @@ use std::fmt;
/// Receiver for the contents of a segment.
///
/// `Output` is the value produced by `close` once everything has been written.
pub trait SegmentSerializer<Output> {
/// Announces a term; `doc_freq` is the number of documents associated with it.
fn new_term(&mut self, term: &Term, doc_freq: DocId) -> Result<()>;
/// Receives a slice of document ids (callers pass the doc ids of the term just announced).
fn write_docs(&mut self, docs: &[DocId]) -> Result<()>; // TODO add size
/// Receives the field values of one document. Returns nothing, so failures cannot be reported.
fn store_doc(&mut self, field: &mut Iterator<Item=&FieldValue>);
/// Consumes the serializer and returns its `Output`.
fn close(self,) -> Result<Output>;
}
pub trait SerializableSegment {
fn write<Output, SegSer: SegmentSerializer<Output>>(&self, serializer: SegSer) -> Result<Output>;
fn write<Output, SegSer: SegmentSerializer<Output>>(&self, serializer: &mut SegSer) -> Result<Output>;
}
/// Serializer that accumulates a textual rendering of what it is given.
pub struct DebugSegmentSerializer {
// Text accumulated so far.
text: String,
// Read by `store_doc` to label each document and to decide whether to write the header.
num_docs: u32,
}
impl fmt::Debug for DebugSegmentSerializer {
@@ -28,13 +30,14 @@ impl fmt::Debug for DebugSegmentSerializer {
impl DebugSegmentSerializer {
pub fn debug_string<S: SerializableSegment>(index: &S) -> String {
let serializer = DebugSegmentSerializer::new();
index.write(serializer).unwrap()
let mut serializer = DebugSegmentSerializer::new();
index.write(&mut serializer).unwrap()
}
/// Creates a serializer with an empty text buffer and a document counter of zero.
pub fn new() -> DebugSegmentSerializer {
    let text = String::new();
    DebugSegmentSerializer { text: text, num_docs: 0 }
}
}
@@ -46,6 +49,17 @@ impl SegmentSerializer<String> for DebugSegmentSerializer {
Ok(())
}
/// Appends a textual dump of one stored document to the debug output.
///
/// The `# STORED DOC` header is written before the first document only.
/// Each document is introduced by `doc <n>`, where `<n>` is the number of
/// documents stored before it, followed by one `field … |value …` line per
/// field value yielded by `fields`.
fn store_doc(&mut self, fields: &mut Iterator<Item=&FieldValue>) {
    if self.num_docs == 0 {
        self.text.push_str("# STORED DOC\n======\n");
    }
    self.text.push_str(&format!("doc {}", self.num_docs));
    for field_value in fields {
        self.text.push_str(&format!("field {:?} |", field_value.field));
        self.text.push_str(&format!("value {:?}\n", field_value.text));
    }
    // Advance the counter: without this every document would be labelled
    // `doc 0` and the header would be repeated for each of them.
    self.num_docs += 1;
}
fn write_docs(&mut self, docs: &[DocId]) -> Result<()> {
for doc in docs {
self.text.push_str(&format!(" - Doc {:?}\n", doc));

View File

@@ -24,15 +24,14 @@ impl Encoder {
}
pub fn encode(&mut self, input: &[u32]) -> &[u32] {
self.input_buffer.clear();
let input_len = input.len();
if input_len >= self.input_buffer.len() {
self.input_buffer = (0..input_len as u32).collect();
self.output_buffer = (0..input_len as u32 + 1000).collect();
// TODO use resize when available
}
// TODO use clone_from when available
self.input_buffer.clear();
let input_len = input.len();
if input_len >= self.input_buffer.len() {
self.input_buffer = (0..input_len as u32).collect();
self.output_buffer = (0..input_len as u32 + 1000).collect();
// TODO use resize when available
}
// TODO use clone_from when available
unsafe {
ptr::copy_nonoverlapping(input.as_ptr(), self.input_buffer.as_mut_ptr(), input_len);
let written_size = encode_native(
@@ -60,12 +59,11 @@ impl Decoder {
compressed_data: &[u32],
uncompressed_values: &mut [u32]) -> size_t {
unsafe {
let num_elements = decode_native(
return decode_native(
compressed_data.as_ptr(),
compressed_data.len() as size_t,
uncompressed_values.as_mut_ptr(),
uncompressed_values.len() as size_t);
return num_elements;
}
}
}
@@ -78,11 +76,11 @@ fn test_encode_big() {
let mut encoder = Encoder::new();
let input: Vec<u32> = (0..100000).into_iter().collect();
let data = encoder.encode(&input);
assert_eq!(data.len(), 4);
assert_eq!(data.len(), 9578);
let decoder = Decoder::new();
let mut data_output: Vec<u32> = (0..10000).collect();
assert_eq!(10000, decoder.decode(&data[0..4], &mut data_output));
for i in 0..10000 {
let mut data_output: Vec<u32> = (0..100000).collect();
assert_eq!(100000, decoder.decode(&data[0..9578], &mut data_output));
for i in 0..100000 {
assert_eq!(data_output[i], input[i]) ;
}
}

View File

@@ -4,11 +4,12 @@ use std::slice;
use core::global::*;
use core::schema::*;
use core::codec::*;
use std::rc::Rc;
use core::directory::Directory;
use core::analyzer::SimpleTokenizer;
use std::collections::{HashMap, BTreeMap};
use std::collections::{hash_map, btree_map};
use std::io::{BufWriter, Write};
use std::io::{Write};
use std::sync::Arc;
use std::mem;
use byteorder::{NativeEndian, ReadBytesExt, WriteBytesExt};
@@ -40,24 +41,30 @@ impl PostingsWriter {
}
pub struct IndexWriter {
segment_writer: SegmentWriter,
segment_writer: Rc<SegmentWriter>,
directory: Directory,
schema: Schema,
}
/// Builds a `SegmentWriter` for a segment obtained from `directory.new_segment()`.
fn new_segment_writer(directory: &Directory) -> SegmentWriter {
    SegmentWriter::for_segment(directory.new_segment())
}
impl IndexWriter {
pub fn open(directory: &Directory) -> IndexWriter {
let schema = directory.schema();
IndexWriter {
segment_writer: SegmentWriter::new(),
segment_writer: Rc::new(new_segment_writer(&directory)),
directory: directory.clone(),
schema: schema,
}
}
pub fn add(&mut self, doc: Document) {
self.segment_writer.add(doc, &self.schema);
Rc::get_mut(&mut self.segment_writer).unwrap().add(doc, &self.schema);
}
// TODO remove that some day
@@ -66,14 +73,27 @@ impl IndexWriter {
}
pub fn commit(&mut self,) -> Result<Segment> {
let segment = self.directory.new_segment();
try!(SimpleCodec::write(&self.segment_writer, &segment).map(|sz| (segment.clone(), sz)));
// At this point, the segment is written
// We still need to sync all of the file, as well as the parent directory.
try!(self.directory.sync(segment.clone()));
self.directory.publish_segment(segment.clone());
self.segment_writer = SegmentWriter::new();
Ok(segment)
// TODO error handling
let mut segment_writer_rc = self.segment_writer.clone();
self.segment_writer = Rc::new(new_segment_writer(&self.directory));
let segment_writer_res = Rc::try_unwrap(segment_writer_rc);
match segment_writer_res {
Ok(segment_writer) => {
let segment = segment_writer.segment();
segment_writer.write_pending();
// write(self.segment_serializer);
// try!(SimpleCodec::write(&self.segment_writer, &segment).map(|sz| (segment.clone(), sz)));
// At this point, the segment is written
// We still need to sync all of the file, as well as the parent directory.
try!(self.directory.sync(segment.clone()));
self.directory.publish_segment(segment.clone());
Ok(segment)
},
Err(_) => {
panic!("error while acquiring segment writer.");
}
}
}
}
@@ -85,17 +105,42 @@ pub struct SegmentWriter {
postings: Vec<PostingsWriter>,
term_index: BTreeMap<Term, usize>,
tokenizer: SimpleTokenizer,
segment_serializer: SimpleSegmentSerializer,
}
impl SegmentWriter {
fn new() -> SegmentWriter {
// write on disk all of the stuff that
// are still on RAM.
// for this version, that's the term dictionary
// and the postings
fn write_pending(mut self,) -> Result<()> {
//self.write(&mut self.segment_serializer);
{
for (term, postings_id) in self.term_index.iter() {
let doc_ids = &self.postings[postings_id.clone()].doc_ids;
let term_docfreq = doc_ids.len() as u32;
self.segment_serializer.new_term(&term, term_docfreq);
self.segment_serializer.write_docs(&doc_ids);
}
}
self.segment_serializer.close()
}
/// Returns the segment reported by the underlying segment serializer.
pub fn segment(&self) -> Segment {
    let serializer = &self.segment_serializer;
    serializer.segment()
}
/// Creates an empty writer whose serializer is obtained from
/// `SimpleCodec::serializer` for `segment`.
///
/// # Panics
///
/// Panics if the serializer cannot be created.
fn for_segment(segment: Segment) -> SegmentWriter {
    // TODO handle error
    let serializer = SimpleCodec::serializer(&segment).unwrap();
    SegmentWriter {
        segment_serializer: serializer,
        tokenizer: SimpleTokenizer::new(),
        term_index: BTreeMap::new(),
        postings: Vec::new(),
        max_doc: 0,
        num_tokens: 0,
    }
}
@@ -113,6 +158,7 @@ impl SegmentWriter {
}
}
}
self.max_doc += 1;
}
@@ -131,19 +177,17 @@ impl SegmentWriter {
// Forwards `doc` to the postings writer that `get_postings_writer` returns for `term`.
pub fn suscribe(&mut self, doc: DocId, term: Term) {
self.get_postings_writer(term).suscribe(doc);
}
}
impl SerializableSegment for SegmentWriter {
fn write<Output, SegSer: SegmentSerializer<Output>>(&self, mut serializer: SegSer) -> Result<Output> {
for (term, postings_id) in self.term_index.iter() {
let doc_ids = &self.postings[postings_id.clone()].doc_ids;
let term_docfreq = doc_ids.len() as u32;
serializer.new_term(&term, term_docfreq);
serializer.write_docs(&doc_ids);
}
serializer.close()
}
}
//
// impl SerializableSegment for SegmentWriter {
// fn write<Output, SegSer: SegmentSerializer<Output>>(&self, serializer: &mut SegSer) -> Result<Output> {
// for (term, postings_id) in self.term_index.iter() {
// let doc_ids = &self.postings[postings_id.clone()].doc_ids;
// let term_docfreq = doc_ids.len() as u32;
// serializer.new_term(&term, term_docfreq);
// serializer.write_docs(&doc_ids);
// }
// serializer.close()
// }
// }

View File

@@ -94,8 +94,8 @@ fn test_indexing() {
index_writer.add(doc);
}
let debug_serializer = DebugSegmentSerializer::new();
let segment_str_before_writing = DebugSegmentSerializer::debug_string(index_writer.current_segment_writer());
//let debug_serializer = DebugSegmentSerializer::new();
//let segment_str_before_writing = DebugSegmentSerializer::debug_string(index_writer.current_segment_writer());
let commit_result = index_writer.commit();
assert!(commit_result.is_ok());
let segment = commit_result.unwrap();