This commit is contained in:
Paul Masurel
2016-02-21 11:41:29 +09:00
parent 1215049adf
commit b130ccaa83
2 changed files with 119 additions and 22 deletions

View File

@@ -1,25 +1,36 @@
use std::io::BufWriter;
use std::fs::File;
use std::fmt;
use std::cell::RefCell;
use core::global::DocId;
use core::schema::Document;
use core::schema::Field;
use core::schema::FieldValue;
use core::schema::FieldOptions;
use core::schema::Schema;
use core::error;
use core::serialize::BinarySerializable;
use std::io::Write;
use std::io::Read;
use std::io::Cursor;
use std::io::SeekFrom;
use fst::raw::MmapReadOnly;
use std::io::Seek;
use lz4;
use tempfile;
const BLOCK_SIZE: usize = 262144;
pub struct StoreWriter {
doc: DocId,
offsets: Vec<OffsetIndex>,
offsets: Vec<OffsetIndex>, // TODO have a better index.
written: u64,
writer: BufWriter<File>,
intermediary_buffer: Vec<u8>,
current_block: Vec<u8>,
}
#[derive(Debug)]
#[derive(Debug, Clone, PartialEq, Eq)]
struct OffsetIndex(DocId, u64);
impl BinarySerializable for OffsetIndex {
@@ -34,8 +45,6 @@ impl BinarySerializable for OffsetIndex {
}
}
impl StoreWriter {
pub fn new(file: File) -> StoreWriter {
@@ -44,11 +53,12 @@ impl StoreWriter {
written: 0,
offsets: Vec::new(),
writer: BufWriter::new(file),
intermediary_buffer: Vec::new(),
current_block: Vec::new(),
}
}
pub fn store(&mut self, field_values: &Vec<&FieldValue>) {
pub fn store<'a>(&mut self, field_values: &Vec<&'a FieldValue>) {
(field_values.len() as u32).serialize(&mut self.current_block);
for field_value in field_values.iter() {
(*field_value).serialize(&mut self.current_block);
@@ -61,11 +71,18 @@ impl StoreWriter {
fn write_and_compress_block(&mut self,) {
// err handling
let mut encoder = lz4::EncoderBuilder::new()
.build(&mut self.writer)
.unwrap();
encoder.write_all(&self.current_block);
self.written = self.current_block.len() as u64;
self.intermediary_buffer.clear();
{
let mut encoder = lz4::EncoderBuilder::new()
.build(&mut self.intermediary_buffer)
.unwrap();
encoder.write_all(&self.current_block);
encoder.finish();
}
let compressed_block_size = self.intermediary_buffer.len() as u64;
self.written += (compressed_block_size as u32).serialize(&mut self.writer).unwrap() as u64;
self.writer.write_all(&self.intermediary_buffer);
self.written += compressed_block_size;
self.offsets.push(OffsetIndex(self.doc, self.written));
self.current_block.clear();
}
@@ -74,22 +91,101 @@ impl StoreWriter {
if self.current_block.len() > 0 {
self.write_and_compress_block();
}
let header_offset: u64 = self.written;
self.offsets.serialize(&mut self.writer);
header_offset.serialize(&mut self.writer);
self.writer.flush();
}
}
/// Read-only accessor over a store file produced by `StoreWriter`.
struct StoreReader {
// Memory-mapped bytes of the store file; blocks and the trailing offset
// index are both read from this mapping.
data: MmapReadOnly,
// Offset index deserialized from the end of `data` by `read_header`.
// Each entry pairs a doc id with a byte offset into `data`
// (presumably the first doc id of a block and where that block starts;
// confirm against `StoreWriter`).
offsets: Vec<OffsetIndex>,
// Scratch buffer holding the most recently decompressed block. Wrapped in
// a `RefCell` so that `read_block` can refill it through `&self`.
current_block: RefCell<Vec<u8>>,
}
impl StoreReader {
    /// Loads the block offset index stored at the tail of the store file.
    ///
    /// The last 8 bytes of `data` are read as a `u64` giving the position of
    /// the serialized `Vec<OffsetIndex>`, which is then deserialized from
    /// that position.
    ///
    /// # Panics
    ///
    /// Panics if the file is too short to hold the trailing offset, or if
    /// either value fails to deserialize.
    fn read_header(data: &MmapReadOnly) -> Vec<OffsetIndex> {
        // TODO: report errors through a `Result` instead of panicking.
        // NOTE(review): `as_slice` is unsafe in the fst API; presumably the
        // underlying file must not change while mapped -- confirm.
        let mut cursor = Cursor::new(unsafe { data.as_slice() });
        // A failed seek used to be ignored, which made the following read
        // start from whatever position the cursor happened to be at.
        cursor
            .seek(SeekFrom::End(-8))
            .expect("store file is too short to contain the header offset");
        let offset = u64::deserialize(&mut cursor).unwrap();
        cursor
            .seek(SeekFrom::Start(offset))
            .expect("invalid header offset in store file");
        Vec::deserialize(&mut cursor).unwrap()
    }

    /// Returns the last index entry whose doc id is `<= doc_id`.
    ///
    /// Entries are scanned in order and the scan stops at the first entry
    /// whose doc id is greater than `doc_id`. When no entry qualifies,
    /// `OffsetIndex(0, 0)` is returned, i.e. the start of the file.
    fn block_offset(&self, doc_id: DocId) -> OffsetIndex {
        self.offsets
            .iter()
            .take_while(|entry| entry.0 <= doc_id)
            .last()
            .map(|&OffsetIndex(first_doc_id, block_offset)| {
                OffsetIndex(first_doc_id, block_offset)
            })
            .unwrap_or(OffsetIndex(0, 0))
    }

    /// Decompresses the block starting at `block_offset` into `current_block`.
    ///
    /// A block is laid out as a `u32` length prefix followed by that many
    /// bytes of LZ4-compressed data.
    ///
    /// # Panics
    ///
    /// Panics if the block lies outside the mapped data, if the length prefix
    /// cannot be read, or if LZ4 decoding fails.
    fn read_block(&self, block_offset: usize) {
        let mut current_block_mut = self.current_block.borrow_mut();
        current_block_mut.clear();
        // NOTE(review): see `read_header` regarding the unsafe `as_slice`.
        let total_buffer = unsafe { self.data.as_slice() };
        // The length prefix belongs to *this* block, so it has to be read at
        // `block_offset`, not at the beginning of the file.
        let mut cursor = Cursor::new(&total_buffer[block_offset..]);
        let block_length = u32::deserialize(&mut cursor).unwrap() as usize;
        // The compressed payload begins right after the bytes consumed by
        // the length prefix.
        let payload_start = block_offset + cursor.position() as usize;
        let block_array: &[u8] = &total_buffer[payload_start..payload_start + block_length];
        let mut lz4_decoder = lz4::Decoder::new(Cursor::new(block_array)).unwrap();
        // Do not silently keep a partially decoded block.
        lz4_decoder
            .read_to_end(&mut current_block_mut)
            .expect("failed to decompress store block");
    }

    /// Looks up the stored fields of `doc_id`.
    ///
    /// Currently this only locates and decompresses the block that should
    /// contain the document; decoding of the document itself is not
    /// implemented yet, so the returned vector is always empty.
    pub fn get(&self, doc_id: DocId) -> Vec<FieldValue> {
        let OffsetIndex(_first_doc_id, block_offset) = self.block_offset(doc_id);
        self.read_block(block_offset as usize);
        // TODO: skip the documents in `_first_doc_id..doc_id` inside
        // `current_block`, then deserialize the requested one.
        Vec::new()
    }

    /// Builds a reader over `data`, eagerly loading the offset index.
    ///
    /// # Panics
    ///
    /// Panics under the same conditions as `read_header`.
    pub fn new(data: MmapReadOnly) -> StoreReader {
        let offsets = StoreReader::read_header(&data);
        StoreReader {
            data: data,
            offsets: offsets,
            current_block: RefCell::new(Vec::new()),
        }
    }
}
// impl<T: BinarySerializable> BinarySerializable for Vec<T> {
// fn serialize(&self, field_values: &mut Iterator<Item=&FieldValue>) -> error::Result<usize> {
// let mut total_size = 0;
// writer.write_u32::<BigEndian>(self.len() as u32);
// total_size += 4;
// for it in self.iter() {
// let item_size = try!(it.serialize(writer));
// total_size += item_size;
// }
// Ok(total_size)
// }
// }
/// Writes 10000 documents through `StoreWriter`, reopens the file through
/// `StoreReader` and checks that the offset index survives the round trip.
#[test]
fn test_store() {
    let offsets;
    let mut store_file = tempfile::NamedTempFile::new().unwrap();
    {
        let mut schema = Schema::new();
        let field = schema.add_field("titi", &FieldOptions::new().set_stored());
        let mut store_writer = StoreWriter::new(store_file.reopen().unwrap());
        for i in 0..10000 {
            let mut fields: Vec<FieldValue> = Vec::new();
            let text = format!("Doc {} he LZ4 algorithm represents the data as a series of sequences. Each sequence begins with a one byte token that is broken into two 4 bit fields. The first field represents the number of literal bytes that are to be copied to the output. The second field represents the number of bytes to copy from the already decoded output buffer (with 0 representing the minimum match length of 4 bytes). A value of 15 in either of the bitfields indicates that the length is larger and there is an extra byte of data that is to be added to the length. A value of 255 in these extra bytes indicates that yet another byte to be added. Hence arbitrary lengths are represented by a series of extra bytes containing the value 255. The string of literals comes after the token and any extra bytes needed to indicate string length. This is followed by an offset that indicates how far back in the output buffer to begin copying. The extra bytes (if any) of the match-length come at the end of the sequence.[2] Compression can be carried out in a stream or in blocks. Higher compression ratios can be achieved by investing more effort in finding the best matches. This results in both a smaller output and faster d", i);
            let field_value = FieldValue {
                field: field.clone(),
                text: text,
            };
            fields.push(field_value);
            let fields_refs: Vec<&FieldValue> = fields.iter().collect();
            store_writer.store(&fields_refs);
        }
        store_writer.close();
        // Keep a copy of the index the writer produced so it can be compared
        // with what the reader loads back from disk.
        offsets = store_writer.offsets.clone();
    }
    let store_mmap = MmapReadOnly::open(&store_file).unwrap();
    let store = StoreReader::new(store_mmap);
    assert_eq!(offsets, store.offsets);
    // Exercise the lookup and decompression path. The returned fields are not
    // checked yet because document decoding is not implemented in `get`.
    store.get(143);
}

View File

@@ -13,6 +13,7 @@ extern crate byteorder;
extern crate memmap;
extern crate rand;
extern crate regex;
extern crate tempfile;
extern crate rustc_serialize;
extern crate combine;
extern crate atomicwrites;