Stream working, all tests passing

This commit is contained in:
Paul Masurel
2017-08-27 20:20:38 +09:00
parent 69351fb4a5
commit 5b1e71947f
6 changed files with 132 additions and 37 deletions

View File

@@ -43,6 +43,12 @@ impl ReadOnlySource {
}
}
pub fn split(self, addr: usize) -> (ReadOnlySource, ReadOnlySource) {
let left = self.slice(0, addr);
let right = self.slice_from(addr);
(left, right)
}
/// Creates a ReadOnlySource that is just a
/// view over a slice of the data.
///

View File

@@ -340,7 +340,8 @@ mod tests {
#[test]
fn test_stream_range() {
let ids: Vec<_> = (0u32..50_000u32)
// let ids: Vec<_> = (0u32..10_000u32)
let ids: Vec<_> = (0u32..10_000u32)
.map(|i| (format!("doc{:0>6}", i), i))
.collect();
let field_type = FieldType::Str(TEXT);

View File

@@ -1,4 +1,5 @@
use postings::TermInfo;
use super::CheckPoint;
use std::mem;
/// Returns the len of the longest
@@ -80,6 +81,10 @@ impl TermInfoDeltaEncoder {
}
}
pub fn term_info(&self) -> &TermInfo {
&self.term_info
}
pub fn encode(&mut self, term_info: TermInfo) -> DeltaTermInfo {
let mut delta_term_info = DeltaTermInfo {
doc_freq: term_info.doc_freq,
@@ -102,14 +107,28 @@ pub struct TermInfoDeltaDecoder {
has_positions: bool,
}
impl TermInfoDeltaDecoder {
pub fn new(has_positions: bool) -> TermInfoDeltaDecoder {
pub fn from_term_info(term_info: TermInfo, has_positions: bool) -> TermInfoDeltaDecoder {
TermInfoDeltaDecoder {
term_info: TermInfo::default(),
term_info: term_info,
has_positions: has_positions,
}
}
pub fn from_checkpoint(checkpoint: &CheckPoint, has_positions: bool) -> TermInfoDeltaDecoder {
TermInfoDeltaDecoder {
term_info: TermInfo {
doc_freq: 0u32,
postings_offset: checkpoint.postings_offset,
positions_offset: checkpoint.positions_offset,
positions_inner_offset: 0u8,
},
has_positions: has_positions
}
}
pub fn decode(&mut self, code: u8, cursor: &mut &[u8]) {
let num_bytes_docfreq: usize = ((code >> 1) & 3) as usize;
let num_bytes_postings_offset: usize = ((code >> 3) & 3) as usize;

View File

@@ -1,7 +1,11 @@
use std::io::{self, Write, Read};
use common::BinarySerializable;
mod termdict;
mod streamer;
mod delta_encoder;
pub use self::delta_encoder::{TermDeltaEncoder, TermDeltaDecoder};
pub use self::delta_encoder::{TermInfoDeltaEncoder, TermInfoDeltaDecoder, DeltaTermInfo};
@@ -10,3 +14,30 @@ pub use self::termdict::TermDictionaryBuilderImpl;
pub use self::streamer::TermStreamerImpl;
pub use self::streamer::TermStreamerBuilderImpl;
#[derive(Debug)]
pub struct CheckPoint {
pub stream_offset: u32,
pub postings_offset: u32,
pub positions_offset: u32,
}
impl BinarySerializable for CheckPoint {
fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
self.stream_offset.serialize(writer)?;
self.postings_offset.serialize(writer)?;
self.positions_offset.serialize(writer)?;
Ok(())
}
fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
let stream_offset = u32::deserialize(reader)?;
let postings_offset = u32::deserialize(reader)?;
let positions_offset = u32::deserialize(reader)?;
Ok(CheckPoint {
stream_offset: stream_offset,
postings_offset: postings_offset,
positions_offset: positions_offset,
})
}
}

View File

@@ -7,17 +7,19 @@ use postings::TermInfo;
use common::BinarySerializable;
use super::delta_encoder::{TermInfoDeltaDecoder, TermDeltaDecoder};
fn stream_before<'a>(term_dictionary: &'a TermDictionaryImpl,
target_key: &[u8],
has_positions: bool)
-> TermStreamerImpl<'a>
{
let (prev_key, offset) = term_dictionary.strictly_previous_key(target_key.as_ref());
let offset: usize = offset as usize;
let (prev_key, checkpoint) = term_dictionary.strictly_previous_key(target_key.as_ref());
let stream_data: &'a [u8] = &term_dictionary.stream_data()[checkpoint.stream_offset as usize..];
TermStreamerImpl {
cursor: &term_dictionary.stream_data()[offset..],
cursor: stream_data,
term_delta_decoder: TermDeltaDecoder::with_previous_term(prev_key),
term_info_decoder: TermInfoDeltaDecoder::new(has_positions), // TODO checkpoint
term_info_decoder: TermInfoDeltaDecoder::from_checkpoint(&checkpoint, has_positions),
}
}
@@ -30,6 +32,7 @@ pub struct TermStreamerBuilderImpl<'a>
offset_from: usize,
offset_to: usize,
current_key: Vec<u8>,
term_info: TermInfo,
has_positions: bool,
}
@@ -42,8 +45,9 @@ impl<'a> TermStreamerBuilder for TermStreamerBuilderImpl<'a>
let target_key = bound.as_ref();
let streamer = stream_before(self.term_dictionary, target_key.as_ref(), self.has_positions);
let smaller_than = |k: &[u8]| k.lt(target_key);
let (offset_before, current_key) = get_offset(smaller_than, streamer);
let (offset_before, current_key, term_info) = get_offset(smaller_than, streamer);
self.current_key = current_key;
self.term_info = term_info;
self.offset_from = offset_before - self.origin;
self
}
@@ -53,8 +57,9 @@ impl<'a> TermStreamerBuilder for TermStreamerBuilderImpl<'a>
let target_key = bound.as_ref();
let streamer = stream_before(self.term_dictionary, target_key.as_ref(), self.has_positions);
let smaller_than = |k: &[u8]| k.le(target_key);
let (offset_before, current_key) = get_offset(smaller_than, streamer);
let (offset_before, current_key, term_info) = get_offset(smaller_than, streamer);
self.current_key = current_key;
self.term_info = term_info;
self.offset_from = offset_before - self.origin;
self
}
@@ -64,7 +69,7 @@ impl<'a> TermStreamerBuilder for TermStreamerBuilderImpl<'a>
let target_key = bound.as_ref();
let streamer = stream_before(self.term_dictionary, target_key.as_ref(), self.has_positions);
let smaller_than = |k: &[u8]| k.lt(target_key);
let (offset_before, _) = get_offset(smaller_than, streamer);
let (offset_before, _, _) = get_offset(smaller_than, streamer);
self.offset_to = offset_before - self.origin;
self
}
@@ -74,7 +79,7 @@ impl<'a> TermStreamerBuilder for TermStreamerBuilderImpl<'a>
let target_key = bound.as_ref();
let streamer = stream_before(self.term_dictionary, target_key.as_ref(), self.has_positions);
let smaller_than = |k: &[u8]| k.le(target_key);
let (offset_before, _) = get_offset(smaller_than, streamer);
let (offset_before, _, _) = get_offset(smaller_than, streamer);
self.offset_to = offset_before - self.origin;
self
}
@@ -87,7 +92,7 @@ impl<'a> TermStreamerBuilder for TermStreamerBuilderImpl<'a>
TermStreamerImpl {
cursor: &data[start..stop],
term_delta_decoder: TermDeltaDecoder::with_previous_term(self.current_key),
term_info_decoder: TermInfoDeltaDecoder::new(self.has_positions), // TODO checkpoint
term_info_decoder: TermInfoDeltaDecoder::from_term_info(self.term_info, self.has_positions), // TODO checkpoint
}
}
}
@@ -101,21 +106,23 @@ impl<'a> TermStreamerBuilder for TermStreamerBuilderImpl<'a>
/// - the term_buffer state to initialize the block)
fn get_offset<'a, P: Fn(&[u8]) -> bool>(predicate: P,
mut streamer: TermStreamerImpl<'a>)
-> (usize, Vec<u8>)
-> (usize, Vec<u8>, TermInfo)
{
let mut prev: &[u8] = streamer.cursor;
let mut term_info = streamer.value().clone();
let mut prev_data: Vec<u8> = Vec::from(streamer.term_delta_decoder.term());
while let Some((iter_key, _)) = streamer.next() {
while let Some((iter_key, iter_term_info)) = streamer.next() {
if !predicate(iter_key.as_ref()) {
return (prev.as_ptr() as usize, prev_data);
return (prev.as_ptr() as usize, prev_data, term_info);
}
prev = streamer.cursor;
prev_data.clear();
prev_data.extend_from_slice(iter_key.as_ref());
term_info = iter_term_info.clone();
}
(prev.as_ptr() as usize, prev_data)
(prev.as_ptr() as usize, prev_data, term_info)
}
impl<'a> TermStreamerBuilderImpl<'a>
@@ -127,6 +134,7 @@ impl<'a> TermStreamerBuilderImpl<'a>
let origin = data.as_ptr() as usize;
TermStreamerBuilderImpl {
term_dictionary: term_dictionary,
term_info: TermInfo::default(),
origin: origin,
offset_from: 0,
offset_to: data.len(),

View File

@@ -1,6 +1,7 @@
#![allow(should_implement_trait)]
use std::io::{self, Write};
use super::CheckPoint;
use fst;
use fst::raw::Fst;
@@ -30,9 +31,9 @@ fn has_positions(field_type: &FieldType) -> bool {
if indexing_options.is_position_enabled() {
true
}
else {
false
}
else {
false
}
}
_ => {
false
@@ -47,6 +48,7 @@ pub struct TermDictionaryBuilderImpl<W>
term_delta_encoder: TermDeltaEncoder,
term_info_encoder: TermInfoDeltaEncoder,
block_index: fst::MapBuilder<Vec<u8>>,
checkpoints: Vec<u8>,
len: usize,
}
@@ -62,9 +64,20 @@ impl<W> TermDictionaryBuilderImpl<W>
where W: Write
{
fn add_index_entry(&mut self) {
let stream_offset = self.write.written_bytes() as u32;
let term_info = self.term_info_encoder.term_info();
let postings_offset = term_info.postings_offset as u32;
let positions_offset = term_info.positions_offset as u32;
let checkpoint = CheckPoint {
stream_offset: stream_offset,
postings_offset: postings_offset,
positions_offset: positions_offset,
};
self.block_index
.insert(&self.term_delta_encoder.term(), self.write.written_bytes() as u64)
.unwrap();
.insert(&self.term_delta_encoder.term(), self.checkpoints.len() as u64)
.expect("Serializing fst on a Vec<u8> should never fail. Where your terms not in order maybe?");
checkpoint.serialize(&mut self.checkpoints)
.expect("Serializing checkpoint on a Vec<u8> should never fail.");
}
/// # Warning
@@ -156,6 +169,7 @@ impl<W> TermDictionaryBuilder<W> for TermDictionaryBuilderImpl<W>
term_delta_encoder: TermDeltaEncoder::default(),
term_info_encoder: TermInfoDeltaEncoder::new(has_positions),
block_index: fst::MapBuilder::new(vec![]).expect("This cannot fail"),
checkpoints: vec!(),
len: 0,
})
}
@@ -173,12 +187,16 @@ impl<W> TermDictionaryBuilder<W> for TermDictionaryBuilderImpl<W>
/// Finalize writing the builder, and returns the underlying
/// `Write` object.
fn finish(mut self) -> io::Result<W> {
self.add_index_entry();
self.write.write_all(&[0u8; PADDING_SIZE])?;
// self.add_index_entry();
let (mut w, split_len) = self.write.finish()?;
let fst_addr = self.write.written_bytes();
let fst_write = self.block_index.into_inner().map_err(convert_fst_error)?;
w.write_all(&fst_write)?;
(split_len as u64).serialize(&mut w)?;
self.write.write_all(&fst_write)?;
let check_points_addr = self.write.written_bytes();
let (mut w, _) = self.write.finish()?;
w.write_all(&self.checkpoints)?;
(fst_addr as u64).serialize(&mut w)?;
(check_points_addr as u64).serialize(&mut w)?;
w.flush()?;
Ok(w)
}
@@ -204,6 +222,7 @@ pub struct TermDictionaryImpl
{
stream_data: ReadOnlySource,
fst_index: fst::Map,
checkpoints_data: ReadOnlySource,
has_positions: bool,
}
@@ -213,7 +232,15 @@ impl TermDictionaryImpl
self.stream_data.as_slice()
}
pub(crate) fn strictly_previous_key(&self, key: &[u8]) -> (Vec<u8>, u64) {
pub(crate) fn strictly_previous_key(&self, key: &[u8]) -> (Vec<u8>, CheckPoint) {
let (term, checkpoint_offset) = self.strictly_previous_key_checkpoint_offset(key);
let mut checkpoint_data = &self.checkpoints_data.as_slice()[checkpoint_offset..];
let checkpoint = CheckPoint::deserialize(&mut checkpoint_data)
.expect("Checkpoint data is corrupted");
(term, checkpoint)
}
fn strictly_previous_key_checkpoint_offset(&self, key: &[u8]) -> (Vec<u8>, usize) {
let fst_map = &self.fst_index;
let fst = fst_map.as_fst();
let mut node = fst.root();
@@ -246,12 +273,12 @@ impl TermDictionaryImpl
result.push(last_transition.inp);
let fork_node = fst.node(last_transition.addr);
fill_last(fst, fork_node, &mut result);
let val = fst_map.get(&result).unwrap();
let val = fst_map.get(&result).expect("Fst data corrupted") as usize;
return (result, val);
} else if cur_node.is_final() {
// the previous key is a prefix
let result_buffer = Vec::from(&key[..i]);
let val = fst_map.get(&result_buffer).unwrap();
let val = fst_map.get(&result_buffer).expect("Fst data corrupted") as usize;
return (result_buffer, val);
}
}
@@ -273,19 +300,22 @@ impl<'a> TermDictionary<'a> for TermDictionaryImpl
source = source.slice_from(1);
let total_len = source.len();
let length_offset = total_len - 8;
let split_len: usize = {
let mut split_len_buffer: &[u8] = &source.as_slice()[length_offset..];
u64::deserialize(&mut split_len_buffer)? as usize
};
let stream_data = source.slice(0, split_len);
let fst_data = source.slice(split_len, length_offset);
let (body, footer) = source.split(total_len - 16);
let mut footer_buffer: &[u8] = footer.as_slice();
let fst_addr: usize = u64::deserialize(&mut footer_buffer)? as usize;
let checkpoints_addr: usize = u64::deserialize(&mut footer_buffer)? as usize;
let stream_data = body.slice(0, fst_addr - PADDING_SIZE);
let fst_data = body.slice(fst_addr, checkpoints_addr);
let checkpoints_data = body.slice_from(checkpoints_addr);
let fst_index = open_fst_index(fst_data)?;
let len_without_padding = stream_data.len() - PADDING_SIZE;
Ok(TermDictionaryImpl {
has_positions: has_positions,
stream_data: stream_data.slice(0, len_without_padding),
stream_data: stream_data,
checkpoints_data: checkpoints_data,
fst_index: fst_index,
})
}