Merge branch 'issue/38'

This commit is contained in:
Paul Masurel
2016-10-24 22:33:59 +09:00
8 changed files with 115 additions and 174 deletions

View File

@@ -124,7 +124,7 @@ impl SegmentReader {
);
let source = try!(segment.open_read(SegmentComponent::TERMS));
let term_infos = try!(FstMap::from_source(source));
let store_reader = StoreReader::new(try!(segment.open_read(SegmentComponent::STORE)));
let store_reader = StoreReader::from(try!(segment.open_read(SegmentComponent::STORE)));
let postings_shared_mmap = try!(segment.open_read(SegmentComponent::POSTINGS));
let fast_field_data = try!(segment.open_read(SegmentComponent::FASTFIELDS));

View File

@@ -19,7 +19,7 @@ mod tests {
let mut skip_list_builder: SkipListBuilder<u32> = SkipListBuilder::new(10);
skip_list_builder.insert(2, &3).unwrap();
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
let mut skip_list: SkipList<u32> = SkipList::read(&mut output);
let mut skip_list: SkipList<u32> = SkipList::from(output.as_slice());
assert_eq!(skip_list.next(), Some((2, 3)));
}
@@ -28,7 +28,7 @@ mod tests {
let mut output: Vec<u8> = Vec::new();
let skip_list_builder: SkipListBuilder<u32> = SkipListBuilder::new(10);
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
let mut skip_list: SkipList<u32> = SkipList::read(&mut output);
let mut skip_list: SkipList<u32> = SkipList::from(output.as_slice());
assert_eq!(skip_list.next(), None);
}
@@ -42,7 +42,7 @@ mod tests {
skip_list_builder.insert(7, &()).unwrap();
skip_list_builder.insert(9, &()).unwrap();
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
let mut skip_list: SkipList<()> = SkipList::read(&mut output);
let mut skip_list: SkipList<()> = SkipList::from(output.as_slice());
assert_eq!(skip_list.next().unwrap(), (2, ()));
assert_eq!(skip_list.next().unwrap(), (3, ()));
assert_eq!(skip_list.next().unwrap(), (5, ()));
@@ -61,7 +61,7 @@ mod tests {
skip_list_builder.insert(7, &()).unwrap();
skip_list_builder.insert(9, &()).unwrap();
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
let mut skip_list: SkipList<()> = SkipList::read(&mut output);
let mut skip_list: SkipList<()> = SkipList::from(output.as_slice());
assert_eq!(skip_list.next().unwrap(), (2, ()));
skip_list.seek(5);
assert_eq!(skip_list.next().unwrap(), (5, ()));
@@ -79,7 +79,7 @@ mod tests {
skip_list_builder.insert(5, &()).unwrap();
skip_list_builder.insert(6, &()).unwrap();
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
let mut skip_list: SkipList<()> = SkipList::read(&mut output);
let mut skip_list: SkipList<()> = SkipList::from(output.as_slice());
assert_eq!(skip_list.next().unwrap(), (2, ()));
skip_list.seek(6);
assert_eq!(skip_list.next().unwrap(), (6, ()));
@@ -96,7 +96,7 @@ mod tests {
skip_list_builder.insert(7, &()).unwrap();
skip_list_builder.insert(9, &()).unwrap();
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
let mut skip_list: SkipList<()> = SkipList::read(&mut output);
let mut skip_list: SkipList<()> = SkipList::from(output.as_slice());
assert_eq!(skip_list.next().unwrap(), (2, ()));
skip_list.seek(10);
assert_eq!(skip_list.next(), None);
@@ -111,7 +111,7 @@ mod tests {
}
skip_list_builder.insert(1004, &()).unwrap();
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
let mut skip_list: SkipList<()> = SkipList::read(&mut output);
let mut skip_list: SkipList<()> = SkipList::from(output.as_slice());
assert_eq!(skip_list.next().unwrap(), (0, ()));
skip_list.seek(431);
assert_eq!(skip_list.next().unwrap(), (431,()) );

View File

@@ -1,13 +1,13 @@
use common::BinarySerializable;
use std::io::Cursor;
use std::io::SeekFrom;
use std::io::Seek;
use std::marker::PhantomData;
use DocId;
use std::cmp::max;
static EMPTY: [u8; 0] = [];
struct Layer<'a, T> {
cursor: Cursor<&'a [u8]>,
data: &'a [u8],
cursor: &'a [u8],
next_id: DocId,
_phantom_: PhantomData<T>,
}
@@ -23,51 +23,41 @@ impl<'a, T: BinarySerializable> Iterator for Layer<'a, T> {
else {
let cur_val = T::deserialize(&mut self.cursor).unwrap();
let cur_id = self.next_id;
self.next_id =
match u32::deserialize(&mut self.cursor) {
Ok(val) => val,
Err(_) => u32::max_value()
};
self.next_id = u32::deserialize(&mut self.cursor).unwrap_or(u32::max_value());
Some((cur_id, cur_val))
}
}
}
static EMPTY: [u8; 0] = [];
impl<'a, T: BinarySerializable> Layer<'a, T> {
fn read(mut cursor: Cursor<&'a [u8]>) -> Layer<'a, T> {
// TODO error handling?
let next_id = match u32::deserialize(&mut cursor) {
Ok(val) => val,
Err(_) => u32::max_value(),
};
// Conversion from a raw byte slice into a `Layer` positioned on its first entry.
impl<'a, T: BinarySerializable> From<&'a [u8]> for Layer<'a, T> {
/// Builds a `Layer` over `data`.
///
/// The whole slice is kept in the `data` field, while `cursor` holds the
/// bytes that remain after the first id has been read.
fn from(data: &'a [u8]) -> Layer<'a, T> {
// `cursor` starts out as a second view over the same bytes as `data`.
let mut cursor = data;
// Pre-load the first id. If it cannot be deserialized (presumably when
// `data` is empty or too short), `next_id` falls back to
// `u32::max_value()`, the same value `Layer::empty()` gives `next_id`.
let next_id = u32::deserialize(&mut cursor).unwrap_or(u32::max_value());
Layer {
data: data,
cursor: cursor,
next_id: next_id,
_phantom_: PhantomData,
}
}
}
impl<'a, T: BinarySerializable> Layer<'a, T> {
fn empty() -> Layer<'a, T> {
Layer {
cursor: Cursor::new(&EMPTY),
data: &EMPTY,
cursor: &EMPTY,
next_id: DocId::max_value(),
_phantom_: PhantomData,
}
}
fn seek_offset(&mut self, offset: usize) {
self.cursor.seek(SeekFrom::Start(offset as u64)).unwrap();
self.next_id = match u32::deserialize(&mut self.cursor) {
Ok(val) => val,
Err(_) => u32::max_value(),
};
self.cursor = &self.data[offset..];
self.next_id = u32::deserialize(&mut self.cursor).unwrap_or(u32::max_value());
}
// Returns the last element (key, val)
// such that (key < doc_id)
//
@@ -104,8 +94,7 @@ impl<'a, T: BinarySerializable> SkipList<'a, T> {
pub fn seek(&mut self, doc_id: DocId) -> Option<(DocId, T)> {
let mut next_layer_skip: Option<(DocId, u32)> = None;
for skip_layer_id in 0..self.skip_layers.len() {
let mut skip_layer: &mut Layer<'a, u32> = &mut self.skip_layers[skip_layer_id];
for skip_layer in &mut self.skip_layers {
if let Some((_, offset)) = next_layer_skip {
skip_layer.seek_offset(offset as usize);
}
@@ -117,37 +106,32 @@ impl<'a, T: BinarySerializable> SkipList<'a, T> {
self.data_layer.seek(doc_id)
}
pub fn read(data: &'a [u8]) -> SkipList<'a, T> {
let mut cursor = Cursor::new(data);
let offsets: Vec<u32> = Vec::deserialize(&mut cursor).unwrap();
}
impl<'a, T: BinarySerializable> From<&'a [u8]> for SkipList<'a, T> {
fn from(mut data: &'a [u8]) -> SkipList<'a, T> {
let offsets: Vec<u32> = Vec::deserialize(&mut data).unwrap();
let num_layers = offsets.len();
let start_position = cursor.position() as usize;
let layers_data: &[u8] = &data[start_position..data.len()];
let layers_data: &[u8] = data;
let data_layer: Layer<'a, T> =
if num_layers == 0 { Layer::empty() }
else {
let first_layer_data: &[u8] = &layers_data[..offsets[0] as usize];
let first_layer_cursor = Cursor::new(first_layer_data);
Layer::read(first_layer_cursor)
Layer::from(first_layer_data)
};
let mut skip_layers =
if num_layers > 0 {
offsets.iter()
.zip(&offsets[1..])
.map(|(start, stop)| {
let layer_data: &[u8] = &layers_data[*start as usize..*stop as usize];
let cursor = Cursor::new(layer_data);
Layer::read(cursor)
})
.collect()
}
else {
Vec::new()
};
skip_layers.reverse();
let skip_layers = (0..max(1, num_layers) - 1)
.map(|i| (offsets[i] as usize, offsets[i + 1] as usize))
.map(|(start, stop)| {
Layer::from(&layers_data[start..stop])
})
.collect();
SkipList {
skip_layers: skip_layers,
data_layer: data_layer,
}
}
}

View File

@@ -37,17 +37,13 @@ impl<T: BinarySerializable> LayerBuilder<T> {
self.remaining -= 1;
self.len += 1;
let offset = self.written_size() as u32; // TODO not sure if we want after or here
let res =
if self.remaining == 0 {
self.remaining = self.period;
Some((doc_id, offset))
}
else {
None
};
try!(doc_id.serialize(&mut self.buffer));
try!(value.serialize(&mut self.buffer));
Ok(res)
Ok(if self.remaining == 0 {
self.remaining = self.period;
Some((doc_id, offset))
}
else { None })
}
}
@@ -97,13 +93,13 @@ impl<T: BinarySerializable> SkipListBuilder<T> {
let mut layer_sizes: Vec<u32> = Vec::new();
size += self.data_layer.buffer.len() as u32;
layer_sizes.push(size);
for layer in &self.skip_layers {
for layer in self.skip_layers.iter().rev() {
size += layer.buffer.len() as u32;
layer_sizes.push(size);
}
try!(layer_sizes.serialize(output));
try!(self.data_layer.write(output));
for layer in &self.skip_layers {
for layer in self.skip_layers.iter().rev() {
try!(layer.write(output));
}
Ok(())

View File

@@ -21,9 +21,8 @@ pub struct SegmentSerializer {
}
impl SegmentSerializer {
/// Creates a new `SegmentSerializer`.
pub fn for_segment(segment: &mut Segment) -> Result<SegmentSerializer> {
pub fn for_segment(segment: &mut Segment) -> Result<SegmentSerializer> {
let store_write = try!(segment.open_write(SegmentComponent::STORE));
let fast_field_write = try!(segment.open_write(SegmentComponent::FASTFIELDS));
@@ -41,39 +40,39 @@ impl SegmentSerializer {
fieldnorms_serializer: fieldnorms_serializer,
})
}
/// Accessor to the `PostingsSerializer`.
pub fn get_postings_serializer(&mut self,) -> &mut PostingsSerializer {
pub fn get_postings_serializer(&mut self) -> &mut PostingsSerializer {
&mut self.postings_serializer
}
/// Accessor to the `FastFieldSerializer`.
pub fn get_fast_field_serializer(&mut self,) -> &mut FastFieldSerializer {
pub fn get_fast_field_serializer(&mut self) -> &mut FastFieldSerializer {
&mut self.fast_field_serializer
}
/// Accessor to the field norm serializer.
pub fn get_fieldnorms_serializer(&mut self,) -> &mut FastFieldSerializer {
pub fn get_fieldnorms_serializer(&mut self) -> &mut FastFieldSerializer {
&mut self.fieldnorms_serializer
}
/// Accessor to the `StoreWriter`.
pub fn get_store_writer(&mut self,) -> &mut StoreWriter {
pub fn get_store_writer(&mut self) -> &mut StoreWriter {
&mut self.store_writer
}
/// Write the `SegmentInfo`
pub fn write_segment_info(&mut self, segment_info: &SegmentInfo) -> Result<()> {
let mut write = try!(self.segment.open_write(SegmentComponent::INFO));
let json_data = json::encode(segment_info)
.expect("Encoding to segment_info to JSON failed. This should never happen");
.expect("Encoding to segment_info to JSON failed. This should never happen");
try!(write.write_all(json_data.as_bytes()));
try!(write.flush());
Ok(())
}
/// Finalize the segment serialization.
pub fn close(mut self,) -> Result<()> {
pub fn close(self) -> Result<()> {
try!(self.fast_field_serializer.close());
try!(self.postings_serializer.close());
try!(self.store_writer.close());

View File

@@ -1,12 +1,8 @@
mod reader;
mod writer;
use DocId;
pub use self::reader::StoreReader;
pub use self::writer::StoreWriter;
/// One entry of the store's offset index: a `DocId` paired with a `u64`.
///
/// The store reader destructures it as `(first_doc_id, block_offset)` and the
/// store writer builds it from its running document count and its running
/// count of written bytes.
#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd)]
pub struct OffsetIndex(DocId, u64);
#[cfg(test)]
mod tests {
@@ -55,7 +51,7 @@ mod tests {
let schema = write_lorem_ipsum_store(store_file);
let field_title = schema.get_field("title").unwrap();
let store_source = directory.open_read(path).unwrap();
let store = StoreReader::new(store_source);
let store = StoreReader::from(store_source);
for i in (0..10).map(|i| i * 3 / 2) {
assert_eq!(*store.get(i).unwrap().get_first(field_title).unwrap().text(), format!("Doc {}", i));
}
@@ -78,7 +74,7 @@ mod tests {
let path = Path::new("store");
write_lorem_ipsum_store(directory.open_write(path).unwrap());
let store_source = directory.open_read(path).unwrap();
let store = StoreReader::new(store_source);
let store = StoreReader::from(store_source);
b.iter(|| {
store.get(12).unwrap();
});

View File

@@ -6,53 +6,23 @@ use DocId;
use schema::Document;
use schema::FieldValue;
use common::BinarySerializable;
use std::io::Read;
use std::io;
use std::cmp::Ordering;
use std::mem::size_of;
use std::io::{self, Read};
use datastruct::SkipList;
use lz4;
use super::OffsetIndex;
pub struct StoreReader {
pub data: ReadOnlySource,
pub offsets: Vec<OffsetIndex>,
pub offset_index_source: ReadOnlySource,
current_block: RefCell<Vec<u8>>,
pub max_doc: DocId,
}
impl StoreReader {
/// Loads the offset index stored at the end of `data`.
///
/// The last 8 bytes of the source hold a `u64` giving the position at which
/// the serialized `Vec<OffsetIndex>` starts. The returned vector begins with
/// an `OffsetIndex(0, 0)` entry, followed by the deserialized entries.
///
/// Panics if either value fails to deserialize (TODO: report an error
/// instead of panicking).
fn read_header(data: &ReadOnlySource) -> Vec<OffsetIndex> {
    let bytes: &[u8] = data.as_slice();
    // Trailer: the position of the serialized index, as a u64.
    let mut trailer = &bytes[bytes.len() - 8..];
    let index_start = u64::deserialize(&mut trailer).unwrap() as usize;
    let mut index_bytes = &bytes[index_start..];
    let mut stored: Vec<OffsetIndex> = Vec::deserialize(&mut index_bytes).unwrap();
    // The first offset is implicitly (0, 0); it is not part of the stored index.
    let mut offsets = vec![OffsetIndex(0, 0)];
    offsets.append(&mut stored);
    offsets
}
fn block_offset(&self, seek: DocId) -> OffsetIndex {
fn search(offsets: &[OffsetIndex], seek: DocId) -> OffsetIndex {
let m = offsets.len() / 2;
let pivot_offset = &offsets[m];
if offsets.len() <= 1 {
return pivot_offset.clone()
}
match pivot_offset.0.cmp(&seek) {
Ordering::Less => search(&offsets[m..], seek),
Ordering::Equal => pivot_offset.clone(),
Ordering::Greater => search(&offsets[..m], seek),
}
}
search(&self.offsets, seek)
/// Looks up `doc_id` in the offset index and returns the matching
/// `(DocId, u64)` entry.
///
/// The caller in `get` reads the pair as `(first_doc_id, block_offset)`.
/// When `seek` yields `None`, the entry `(0, 0)` is returned instead.
fn block_offset(&self, doc_id: DocId) -> (DocId, u64) {
// A `SkipList` is built from the raw index bytes on every call.
SkipList::from(self.offset_index_source.as_slice())
.seek(doc_id)
// NOTE(review): presumably `None` means `doc_id` lies in the first block,
// which starts at doc 0 / offset 0 — confirm against `SkipList::seek`.
.unwrap_or((0u32, 0u64))
}
fn read_block(&self, block_offset: usize) -> io::Result<()> {
@@ -61,17 +31,18 @@ impl StoreReader {
let total_buffer = self.data.as_slice();
let mut cursor = &total_buffer[block_offset..];
let block_length = u32::deserialize(&mut cursor).unwrap();
let block_array: &[u8] = &total_buffer[(block_offset + 4 as usize)..(block_offset + 4 + block_length as usize)];
let block_array: &[u8] =
&total_buffer[(block_offset + 4 as usize)..(block_offset + 4 + block_length as usize)];
let mut lz4_decoder = try!(lz4::Decoder::new(block_array));
lz4_decoder.read_to_end(&mut current_block_mut).map(|_| ())
}
pub fn get(&self, doc_id: DocId) -> Result<Document> {
let OffsetIndex(first_doc_id, block_offset) = self.block_offset(doc_id);
let (first_doc_id, block_offset) = self.block_offset(doc_id);
try!(self.read_block(block_offset as usize));
let current_block_mut = self.current_block.borrow_mut();
let mut cursor = &current_block_mut[..];
for _ in first_doc_id..doc_id {
for _ in first_doc_id..doc_id {
let block_length = try!(u32::deserialize(&mut cursor));
cursor = &cursor[block_length as usize..];
}
@@ -84,13 +55,29 @@ impl StoreReader {
}
Ok(Document::from(field_values))
}
}
pub fn new(data: ReadOnlySource) -> StoreReader {
let offsets = StoreReader::read_header(&data);
/// Splits a raw store source into `(documents, offset index, max_doc)`.
///
/// The source ends with a fixed-size footer made of a `u64` followed by a
/// `u32`:
///
/// * the `u64` is the position at which the offset index starts, which is
///   also where the document data ends;
/// * the `u32` is returned as `max_doc`.
///
/// The first returned source covers `[0, offset)`, the second covers
/// `[offset, footer_offset)`.
///
/// # Panics
///
/// Panics if the source is shorter than the footer, or if the footer cannot
/// be deserialized.
fn split_source(data: ReadOnlySource) -> (ReadOnlySource, ReadOnlySource, DocId) {
    let data_len = data.len();
    let footer_len = size_of::<u64>() + size_of::<u32>();
    // A plain subtraction would underflow on a truncated source: an opaque
    // overflow panic in debug builds, a wrapped-around index in release builds.
    let footer_offset = data_len
        .checked_sub(footer_len)
        .expect("Store source is too short to contain its footer");
    let serialized_offset: ReadOnlySource = data.slice(footer_offset, data_len);
    let mut serialized_offset_buf = serialized_offset.as_slice();
    // The two reads go through the same buffer, so the order matters:
    // the u64 comes first, then the u32.
    let offset = u64::deserialize(&mut serialized_offset_buf).unwrap();
    let offset = offset as usize;
    let max_doc = u32::deserialize(&mut serialized_offset_buf).unwrap();
    (data.slice(0, offset), data.slice(offset, footer_offset), max_doc)
}
impl From<ReadOnlySource> for StoreReader {
fn from(data: ReadOnlySource) -> StoreReader {
let (data_source, offset_index_source, max_doc) = split_source(data);
StoreReader {
data: data,
offsets: offsets,
data: data_source,
offset_index_source: offset_index_source,
current_block: RefCell::new(Vec::new()),
max_doc: max_doc,
}
}
}

View File

@@ -3,66 +3,45 @@ use DocId;
use schema::FieldValue;
use common::BinarySerializable;
use std::io::Write;
use std::io::Read;
use std::io;
use error::Result;
use lz4;
use datastruct::SkipListBuilder;
use super::StoreReader;
use super::OffsetIndex;
const BLOCK_SIZE: usize = 16_384;
pub struct StoreWriter {
doc: DocId,
offsets: Vec<OffsetIndex>, // TODO have a better index.
written: u64,
offset_index_writer: SkipListBuilder<u64>,
writer: WritePtr,
intermediary_buffer: Vec<u8>,
current_block: Vec<u8>,
}
/// Binary encoding of an `OffsetIndex`: the `DocId` field is written first,
/// then the `u64` field, and they are read back in the same order.
impl BinarySerializable for OffsetIndex {
    /// Serializes both fields into `writer` and returns the sum of the two
    /// values returned by the field serializers.
    fn serialize(&self, writer: &mut Write) -> io::Result<usize> {
        let doc_bytes = try!(self.0.serialize(writer));
        let offset_bytes = try!(self.1.serialize(writer));
        Ok(doc_bytes + offset_bytes)
    }

    /// Reads a `DocId` and then a `u64` from `reader`.
    fn deserialize(reader: &mut Read) -> io::Result<OffsetIndex> {
        let doc_id = try!(DocId::deserialize(reader));
        let offset = try!(u64::deserialize(reader));
        Ok(OffsetIndex(doc_id, offset))
    }
}
impl StoreWriter {
pub fn new(writer: WritePtr) -> StoreWriter {
StoreWriter {
doc: 0,
written: 0,
offsets: Vec::new(),
offset_index_writer: SkipListBuilder::new(3),
writer: writer,
intermediary_buffer: Vec::new(),
current_block: Vec::new(),
}
}
pub fn stack_reader(&mut self, reader: &StoreReader) -> io::Result<()> {
if !self.current_block.is_empty() {
try!(self.write_and_compress_block());
}
match reader.offsets.last() {
Some(&OffsetIndex(ref num_docs, ref body_size)) => {
try!(self.writer.write_all(&reader.data.as_slice()[0..*body_size as usize]));
for &OffsetIndex(doc, offset) in &reader.offsets {
self.offsets.push(OffsetIndex(self.doc + doc, self.written + offset));
}
self.written += *body_size;
self.doc += *num_docs;
Ok(())
},
None => {
Err(io::Error::new(io::ErrorKind::Other, "No offset for reader"))
}
/// Appends every document held by `reader` to this writer.
///
/// Each doc id in `0..reader.max_doc` is loaded with `reader.get` and its
/// field values are handed to `self.store`. The first error returned by
/// either call is propagated to the caller.
pub fn stack_reader(&mut self, reader: &StoreReader) -> Result<()> {
for doc_id in 0..reader.max_doc {
// NOTE(review): `StoreReader::get` calls `read_block` on every call, so
// this loop presumably decompresses a block once per document — confirm
// whether that cost is acceptable when merging large stores.
let doc = try!(reader.get(doc_id));
// `store` takes a slice of references, so borrow each field value of `doc`.
let field_values: Vec<&FieldValue> = doc.field_values()
.iter()
.collect();
try!(self.store(&field_values));
}
Ok(())
}
pub fn store<'a>(&mut self, field_values: &[&'a FieldValue]) -> io::Result<()> {
@@ -80,7 +59,7 @@ impl StoreWriter {
Ok(())
}
fn write_and_compress_block(&mut self,) -> io::Result<()> {
fn write_and_compress_block(&mut self) -> io::Result<()> {
self.intermediary_buffer.clear();
{
let mut encoder = try!(lz4::EncoderBuilder::new().build(&mut self.intermediary_buffer));
@@ -92,19 +71,19 @@ impl StoreWriter {
self.written += try!((compressed_block_size as u32).serialize(&mut self.writer)) as u64;
try!(self.writer.write_all(&self.intermediary_buffer));
self.written += compressed_block_size;
self.offsets.push(OffsetIndex(self.doc, self.written));
try!(self.offset_index_writer.insert(self.doc, &self.written));
self.current_block.clear();
Ok(())
}
pub fn close(&mut self,) -> io::Result<()> {
pub fn close(mut self) -> io::Result<()> {
if !self.current_block.is_empty() {
try!(self.write_and_compress_block());
}
let header_offset: u64 = self.written;
try!(self.offsets.serialize(&mut self.writer));
try!(self.offset_index_writer.write::<Box<Write>>(&mut self.writer));
try!(header_offset.serialize(&mut self.writer));
try!(self.doc.serialize(&mut self.writer));
self.writer.flush()
}
}