Delta encoded. Range and get are broken

This commit is contained in:
Paul Masurel
2017-08-26 19:38:29 +09:00
parent 8e450c770a
commit 3d0082d020
11 changed files with 189 additions and 97 deletions

View File

@@ -1,7 +1,7 @@
use compression::BlockDecoder;
use compression::NUM_DOCS_PER_BLOCK;
use compression::compressed_block_size;
use directory::SourceRead;
use directory::{ReadOnlySource, SourceRead};
pub struct CompressedIntStream {
buffer: SourceRead,
@@ -10,9 +10,9 @@ pub struct CompressedIntStream {
}
impl CompressedIntStream {
pub fn wrap(buffer: SourceRead) -> CompressedIntStream {
pub(crate) fn wrap(source: ReadOnlySource) -> CompressedIntStream {
CompressedIntStream {
buffer: buffer,
buffer: SourceRead::from(source),
block_decoder: BlockDecoder::new(),
inner_offset: NUM_DOCS_PER_BLOCK,
}
@@ -72,7 +72,7 @@ pub mod tests {
use compression::compressed_block_size;
use compression::NUM_DOCS_PER_BLOCK;
use compression::BlockEncoder;
use directory::{SourceRead, ReadOnlySource};
use directory::ReadOnlySource;
fn create_stream_buffer() -> ReadOnlySource {
let mut buffer: Vec<u8> = vec!();
@@ -90,8 +90,7 @@ pub mod tests {
#[test]
fn test_compressed_int_stream() {
let buffer = create_stream_buffer();
let buffer_reader = SourceRead::from(buffer);
let mut stream = CompressedIntStream::wrap(buffer_reader);
let mut stream = CompressedIntStream::wrap(buffer);
let mut block: [u32; NUM_DOCS_PER_BLOCK] = [0u32; NUM_DOCS_PER_BLOCK];
stream.read(&mut block[0..2]);

View File

@@ -100,8 +100,8 @@ impl FieldReader {
let position_stream = {
if option.has_positions() {
let position_offset = term_info.positions_offset;
let positions_reader = SourceRead::from(self.positions_source.slice_from(position_offset as usize));
let mut stream = CompressedIntStream::wrap(positions_reader);
let positions_source = self.positions_source.slice_from(position_offset as usize);
let mut stream = CompressedIntStream::wrap(positions_source);
stream.skip(term_info.positions_inner_offset as usize);
Some(stream)
}

View File

@@ -13,14 +13,15 @@ mod managed_directory;
/// Errors specific to the directory module.
pub mod error;
use std::io::{Write, Seek};
use std::io::{Write, Seek, BufWriter};
use std::io::BufWriter;
pub use self::read_only_source::{SourceRead, ReadOnlySource};
pub use self::read_only_source::ReadOnlySource;
pub use self::directory::Directory;
pub use self::ram_directory::RAMDirectory;
pub use self::mmap_directory::MmapDirectory;
pub use self::managed_directory::{ManagedDirectory, FileProtection};
pub(crate) use self::read_only_source::SourceRead;
pub(crate) use self::managed_directory::{ManagedDirectory, FileProtection};
/// Synonym of Seek + Write
pub trait SeekableWrite: Seek + Write {}

View File

@@ -65,6 +65,8 @@ impl ReadOnlySource {
}
}
/// Like `.slice(...)` but enforcing only the `from`
/// boundary.
pub fn slice_from(&self, from_offset: usize) -> ReadOnlySource {
let len = self.len();
self.slice(from_offset, len)
@@ -90,12 +92,15 @@ impl From<Vec<u8>> for ReadOnlySource {
}
}
pub struct SourceRead {
/// Acts as a owning cursor over the data backed up by a ReadOnlySource
pub(crate) struct SourceRead {
_data_owner: ReadOnlySource,
cursor: &'static [u8]
}
impl SourceRead {
// Advance the cursor by a given number of bytes.
pub fn advance(&mut self, len: usize) {
self.cursor = &self.cursor[len..];
}
@@ -108,6 +113,8 @@ impl AsRef<[u8]> for SourceRead {
}
impl From<ReadOnlySource> for SourceRead {
// Creates a new `SourceRead` from a given `ReadOnlySource`
fn from(source: ReadOnlySource) -> SourceRead {
let len = source.len();
let slice_ptr = source.as_slice().as_ptr();

View File

@@ -52,6 +52,22 @@ pub trait DocSet {
}
}
/// Fills a given mutable buffer with the next doc ids from the
/// `DocSet`
///
/// If that many `DocId`s are available, the method should
/// fill the entire buffer and return the length of the buffer.
///
/// If we reach the end of the `DocSet` before filling
/// it entirely, then the buffer is filled up to this point, and
/// return value is the number of elements that were filled.
///
/// # Warning
///
/// This method is only here for specific high-performance
/// use case where batching. The normal way to
/// go through the `DocId`'s is to call `.advance()`.
fn fill_buffer(&mut self, buffer: &mut [DocId]) -> usize {
for (i, buffer_val) in buffer.iter_mut().enumerate() {
if self.advance() {

View File

@@ -108,6 +108,8 @@ impl InvertedIndexSerializer {
}
/// The field serializer is in charge of
/// the serialization of a specific field.
pub struct FieldSerializer<'a> {
term_dictionary_builder: TermDictionaryBuilderImpl<&'a mut CountingWriter<WritePtr>>,
postings_serializer: PostingsSerializer<&'a mut CountingWriter<WritePtr>>,
@@ -173,9 +175,10 @@ impl<'a> FieldSerializer<'a> {
/// to the lexicographical order.
/// * doc_freq - return the number of document containing the term.
pub fn new_term(&mut self, term: &[u8]) -> io::Result<()> {
if self.term_open {
panic!("Called new_term, while the previous term was not closed.");
}
assert!(
!self.term_open,
"Called new_term, while the previous term was not closed."
);
self.term_open = true;
self.postings_serializer.clear();
self.current_term_info = self.current_term_info();
@@ -217,6 +220,8 @@ impl<'a> FieldSerializer<'a> {
Ok(())
}
/// Closes the current current field.
pub fn close(mut self) -> io::Result<()> {
self.close_term()?;
if let Some(positions_serializer) = self.positions_serializer_opt {

View File

@@ -347,7 +347,6 @@ mod tests {
let buffer: Vec<u8> = {
let mut term_dictionary_builder = TermDictionaryBuilderImpl::new(vec![], field_type).unwrap();
for &(ref id, ref i) in &ids {
println!("doc {}", id);
term_dictionary_builder.insert(id.as_bytes(), &make_term_info(*i)).unwrap();
}
term_dictionary_builder.finish().unwrap()

View File

@@ -1,4 +1,15 @@
pub fn common_prefix_len(s1: &[u8], s2: &[u8]) -> usize {
use postings::TermInfo;
use common::VInt;
use common::BinarySerializable;
use std::io::{self, Write};
use std::mem;
/// Returns the len of the longest
/// common prefix of `s1` and `s2`.
///
/// ie: the greatest `L` such that
/// for all `0 <= i < L`, `s1[i] == s2[i]`
fn common_prefix_len(s1: &[u8], s2: &[u8]) -> usize {
s1.iter()
.zip(s2.iter())
.take_while(|&(a, b)| a==b)
@@ -7,16 +18,20 @@ pub fn common_prefix_len(s1: &[u8], s2: &[u8]) -> usize {
#[derive(Default)]
pub struct DeltaEncoder {
pub struct TermDeltaEncoder {
last_term: Vec<u8>,
}
impl DeltaEncoder {
pub fn encode<'a>(&mut self, term: &'a [u8]) -> (usize, &'a [u8]) {
impl TermDeltaEncoder {
pub fn encode<'a, W: Write>(&mut self, term: &'a [u8], write: &mut W) -> io::Result<()> {
let prefix_len = common_prefix_len(term, &self.last_term);
self.last_term.truncate(prefix_len);
self.last_term.extend_from_slice(&term[prefix_len..]);
(prefix_len, &term[prefix_len..])
let suffix = &term[prefix_len..];
VInt(prefix_len as u64).serialize(write)?;
VInt(suffix.len() as u64).serialize(write)?;
write.write_all(suffix)?;
Ok(())
}
pub fn term(&self) -> &[u8] {
@@ -25,24 +40,105 @@ impl DeltaEncoder {
}
#[derive(Default)]
pub struct DeltaDecoder {
pub struct TermDeltaDecoder {
term: Vec<u8>,
}
impl DeltaDecoder {
pub fn with_previous_term(term: Vec<u8>) -> DeltaDecoder {
DeltaDecoder {
impl TermDeltaDecoder {
pub fn with_previous_term(term: Vec<u8>) -> TermDeltaDecoder {
TermDeltaDecoder {
term: Vec::from(term)
}
}
pub fn decode(&mut self, prefix_len: usize, suffix: &[u8]) -> &[u8] {
pub fn decode(&mut self, cursor: &mut &[u8]) {
let prefix_len: usize = deserialize_vint(cursor) as usize;
let suffix_length: usize = deserialize_vint(cursor) as usize;
let suffix = &cursor[..suffix_length];
*cursor = &cursor[suffix_length..];
self.term.truncate(prefix_len);
self.term.extend_from_slice(suffix);
&self.term[..]
}
pub fn term(&self) -> &[u8] {
&self.term[..]
}
}
pub struct TermInfoDeltaEncoder {
term_info: TermInfo,
has_positions: bool,
}
impl TermInfoDeltaEncoder {
pub fn new(has_positions: bool) -> Self {
TermInfoDeltaEncoder {
term_info: TermInfo::default(),
has_positions: has_positions,
}
}
pub fn encode<W: Write>(&mut self, term_info: TermInfo, write: &mut W) -> io::Result<()> {
VInt(term_info.doc_freq as u64).serialize(write)?;
let delta_postings_offset = term_info.postings_offset - self.term_info.postings_offset;
VInt(delta_postings_offset as u64).serialize(write)?;
if self.has_positions {
let delta_positions_offset = term_info.positions_offset - self.term_info.positions_offset;
VInt(delta_positions_offset as u64).serialize(write)?;
write.write(&[term_info.positions_inner_offset])?;
}
mem::replace(&mut self.term_info, term_info);
Ok(())
}
}
fn deserialize_vint(data: &mut &[u8]) -> u64 {
let mut res = 0;
let mut shift = 0;
for i in 0.. {
let b = data[i];
res |= ((b % 128u8) as u64) << shift;
if b & 128u8 != 0u8 {
*data = &data[(i + 1)..];
break;
}
shift += 7;
}
res
}
pub struct TermInfoDeltaDecoder {
term_info: TermInfo,
has_positions: bool,
}
impl TermInfoDeltaDecoder {
pub fn new(has_positions: bool) -> TermInfoDeltaDecoder {
TermInfoDeltaDecoder {
term_info: TermInfo::default(),
has_positions: has_positions,
}
}
pub fn decode(&mut self, cursor: &mut &[u8]) {
let doc_freq = deserialize_vint(cursor) as u32;
self.term_info.doc_freq = doc_freq;
let delta_postings = deserialize_vint(cursor) as u32;
self.term_info.postings_offset += delta_postings;
if self.has_positions {
let delta_positions = deserialize_vint(cursor) as u32;
self.term_info.positions_offset += delta_positions;
let position_inner_offset = cursor[0];
*cursor = &cursor[1..];
self.term_info.positions_inner_offset = position_inner_offset;
}
}
pub fn term_info(&self) -> &TermInfo {
&self.term_info
}
}

View File

@@ -2,7 +2,9 @@ mod termdict;
mod streamer;
mod delta_encoder;
pub use self::delta_encoder::{DeltaEncoder, DeltaDecoder};
pub use self::delta_encoder::{TermDeltaEncoder, TermDeltaDecoder};
pub use self::delta_encoder::{TermInfoDeltaEncoder, TermInfoDeltaDecoder};
pub use self::termdict::TermDictionaryImpl;
pub use self::termdict::TermDictionaryBuilderImpl;
pub use self::streamer::TermStreamerImpl;

View File

@@ -4,7 +4,7 @@ use std::cmp::max;
use super::TermDictionaryImpl;
use termdict::{TermStreamerBuilder, TermStreamer};
use postings::TermInfo;
use super::delta_encoder::DeltaDecoder;
use super::delta_encoder::{TermInfoDeltaDecoder, TermDeltaDecoder};
fn stream_before<'a>(term_dictionary: &'a TermDictionaryImpl,
@@ -16,9 +16,8 @@ fn stream_before<'a>(term_dictionary: &'a TermDictionaryImpl,
let offset: usize = offset as usize;
TermStreamerImpl {
cursor: &term_dictionary.stream_data()[offset..],
delta_decoder: DeltaDecoder::with_previous_term(prev_key),
term_info: TermInfo::default(),
has_positions: has_positions,
term_delta_decoder: TermDeltaDecoder::with_previous_term(prev_key),
term_info_decoder: TermInfoDeltaDecoder::new(has_positions), // TODO checkpoint
}
}
@@ -87,9 +86,8 @@ impl<'a> TermStreamerBuilder for TermStreamerBuilderImpl<'a>
let stop = max(self.offset_to, start);
TermStreamerImpl {
cursor: &data[start..stop],
delta_decoder: DeltaDecoder::with_previous_term(self.current_key),
term_info: TermInfo::default(),
has_positions: self.has_positions,
term_delta_decoder: TermDeltaDecoder::with_previous_term(self.current_key),
term_info_decoder: TermInfoDeltaDecoder::new(self.has_positions), // TODO checkpoint
}
}
}
@@ -107,7 +105,7 @@ fn get_offset<'a, P: Fn(&[u8]) -> bool>(predicate: P,
{
let mut prev: &[u8] = streamer.cursor;
let mut prev_data: Vec<u8> = Vec::from(streamer.delta_decoder.term());
let mut prev_data: Vec<u8> = Vec::from(streamer.term_delta_decoder.term());
while let Some((iter_key, _)) = streamer.next() {
if !predicate(iter_key.as_ref()) {
@@ -144,26 +142,12 @@ impl<'a> TermStreamerBuilderImpl<'a>
pub struct TermStreamerImpl<'a>
{
cursor: &'a [u8],
delta_decoder: DeltaDecoder,
term_info: TermInfo,
has_positions: bool
term_delta_decoder: TermDeltaDecoder,
term_info_decoder: TermInfoDeltaDecoder,
}
fn deserialize_vint(data: &mut &[u8]) -> u64 {
let mut res = 0;
let mut shift = 0;
for i in 0.. {
let b = data[i];
res |= ((b % 128u8) as u64) << shift;
if b & 128u8 != 0u8 {
*data = &data[(i + 1)..];
break;
}
shift += 7;
}
res
}
impl<'a> TermStreamer for TermStreamerImpl<'a>
{
@@ -171,28 +155,17 @@ impl<'a> TermStreamer for TermStreamerImpl<'a>
if self.cursor.is_empty() {
return false;
}
let common_length: usize = deserialize_vint(&mut self.cursor) as usize;
let suffix_length: usize = deserialize_vint(&mut self.cursor) as usize;
self.delta_decoder.decode(common_length, &self.cursor[..suffix_length]);
self.cursor = &self.cursor[suffix_length..];
self.term_info.doc_freq = deserialize_vint(&mut self.cursor) as u32;
self.term_info.postings_offset = deserialize_vint(&mut self.cursor) as u32;
if self.has_positions {
self.term_info.positions_offset = deserialize_vint(&mut self.cursor) as u32;
self.term_info.positions_inner_offset = self.cursor[0];
self.cursor = &self.cursor[1..];
}
self.term_delta_decoder.decode(&mut self.cursor);
self.term_info_decoder.decode(&mut self.cursor);
true
}
fn key(&self) -> &[u8] {
self.delta_decoder.term()
self.term_delta_decoder.term()
}
fn value(&self) -> &TermInfo {
&self.term_info
&self.term_info_decoder.term_info()
}
}

View File

@@ -8,9 +8,8 @@ use common::BinarySerializable;
use common::CountingWriter;
use postings::TermInfo;
use schema::FieldType;
use super::DeltaEncoder;
use super::{TermDeltaEncoder, TermInfoDeltaEncoder};
use fst::raw::Node;
use common::VInt;
use termdict::{TermDictionary, TermDictionaryBuilder, TermStreamer};
use super::{TermStreamerImpl, TermStreamerBuilderImpl};
use termdict::TermStreamerBuilder;
@@ -41,9 +40,9 @@ fn has_positions(field_type: &FieldType) -> bool {
/// See [`TermDictionaryBuilder`](./trait.TermDictionaryBuilder.html)
pub struct TermDictionaryBuilderImpl<W>
{
has_positions: bool,
write: CountingWriter<W>,
delta_encoder: DeltaEncoder,
term_delta_encoder: TermDeltaEncoder,
term_info_encoder: TermInfoDeltaEncoder,
block_index: fst::MapBuilder<Vec<u8>>,
len: usize,
}
@@ -61,7 +60,7 @@ impl<W> TermDictionaryBuilderImpl<W>
{
fn add_index_entry(&mut self) {
self.block_index
.insert(&self.delta_encoder.term(), self.write.written_bytes() as u64)
.insert(&self.term_delta_encoder.term(), self.write.written_bytes() as u64)
.unwrap();
}
@@ -76,21 +75,13 @@ impl<W> TermDictionaryBuilderImpl<W>
if self.len % INDEX_INTERVAL == 0 {
self.add_index_entry();
}
let (common_prefix_len, suffix) = self.delta_encoder.encode(key);
VInt(common_prefix_len as u64).serialize(&mut self.write)?;
VInt(suffix.len() as u64).serialize(&mut self.write)?;
self.write.write_all(suffix)?;
self.term_delta_encoder.encode(key, &mut self.write)?;
self.len += 1;
Ok(())
}
pub(crate) fn insert_value(&mut self, value: &TermInfo) -> io::Result<()> {
VInt(value.doc_freq as u64).serialize(&mut self.write)?;
VInt(value.postings_offset as u64).serialize(&mut self.write)?;
if self.has_positions {
VInt(value.positions_offset as u64).serialize(&mut self.write)?;
self.write.write(&[value.positions_inner_offset])?;
}
pub(crate) fn insert_value(&mut self, term_info: &TermInfo) -> io::Result<()> {
self.term_info_encoder.encode(term_info.clone(), &mut self.write)?;
Ok(())
}
}
@@ -104,9 +95,9 @@ impl<W> TermDictionaryBuilder<W> for TermDictionaryBuilderImpl<W>
let has_positions_code = if has_positions { 255u8 } else { 0u8 };
write.write_all(&[has_positions_code])?;
Ok(TermDictionaryBuilderImpl {
has_positions: has_positions,
write: CountingWriter::wrap(write),
delta_encoder: DeltaEncoder::default(),
term_delta_encoder: TermDeltaEncoder::default(),
term_info_encoder: TermInfoDeltaEncoder::new(has_positions),
block_index: fst::MapBuilder::new(vec![]).expect("This cannot fail"),
len: 0,
})
@@ -118,7 +109,8 @@ impl<W> TermDictionaryBuilder<W> for TermDictionaryBuilderImpl<W>
fn insert<K: AsRef<[u8]>>(&mut self, key_ref: K, value: &TermInfo) -> io::Result<()> {
let key = key_ref.as_ref();
self.insert_key(key)?;
self.insert_value(value)
self.insert_value(value)?;
Ok(())
}
/// Finalize writing the builder, and returns the underlying
@@ -136,15 +128,17 @@ impl<W> TermDictionaryBuilder<W> for TermDictionaryBuilderImpl<W>
fn open_fst_index(source: ReadOnlySource) -> io::Result<fst::Map> {
Ok(fst::Map::from(match source {
ReadOnlySource::Anonymous(data) => {
try!(Fst::from_shared_bytes(data.data, data.start, data.len)
.map_err(convert_fst_error))
}
ReadOnlySource::Mmap(mmap_readonly) => {
try!(Fst::from_mmap(mmap_readonly).map_err(convert_fst_error))
}
}))
use self::ReadOnlySource::*;
let fst_result = match source {
Anonymous(data) => {
Fst::from_shared_bytes(data.data, data.start, data.len)
}
Mmap(mmap_readonly) => {
Fst::from_mmap(mmap_readonly)
}
};
let fst = fst_result.map_err(convert_fst_error)?;
Ok(fst::Map::from(fst))
}
/// See [`TermDictionary`](./trait.TermDictionary.html)