From 3d0082d0202c8a0c0b6b82c503605c427b6d72bd Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Sat, 26 Aug 2017 19:38:29 +0900 Subject: [PATCH] Delta encoded. Range and get are broken --- src/compression/stream.rs | 11 +-- src/core/field_reader.rs | 4 +- src/directory/mod.rs | 9 +- src/directory/read_only_source.rs | 9 +- src/postings/docset.rs | 16 +++ src/postings/serializer.rs | 11 ++- src/termdict/mod.rs | 1 - src/termdict/streamdict/delta_encoder.rs | 118 ++++++++++++++++++++--- src/termdict/streamdict/mod.rs | 4 +- src/termdict/streamdict/streamer.rs | 53 +++------- src/termdict/streamdict/termdict.rs | 50 +++++----- 11 files changed, 189 insertions(+), 97 deletions(-) diff --git a/src/compression/stream.rs b/src/compression/stream.rs index 29d180353..de902da85 100644 --- a/src/compression/stream.rs +++ b/src/compression/stream.rs @@ -1,7 +1,7 @@ use compression::BlockDecoder; use compression::NUM_DOCS_PER_BLOCK; use compression::compressed_block_size; -use directory::SourceRead; +use directory::{ReadOnlySource, SourceRead}; pub struct CompressedIntStream { buffer: SourceRead, @@ -10,9 +10,9 @@ pub struct CompressedIntStream { } impl CompressedIntStream { - pub fn wrap(buffer: SourceRead) -> CompressedIntStream { + pub(crate) fn wrap(source: ReadOnlySource) -> CompressedIntStream { CompressedIntStream { - buffer: buffer, + buffer: SourceRead::from(source), block_decoder: BlockDecoder::new(), inner_offset: NUM_DOCS_PER_BLOCK, } @@ -72,7 +72,7 @@ pub mod tests { use compression::compressed_block_size; use compression::NUM_DOCS_PER_BLOCK; use compression::BlockEncoder; - use directory::{SourceRead, ReadOnlySource}; + use directory::ReadOnlySource; fn create_stream_buffer() -> ReadOnlySource { let mut buffer: Vec = vec!(); @@ -90,8 +90,7 @@ pub mod tests { #[test] fn test_compressed_int_stream() { let buffer = create_stream_buffer(); - let buffer_reader = SourceRead::from(buffer); - let mut stream = CompressedIntStream::wrap(buffer_reader); + let mut stream = CompressedIntStream::wrap(buffer); let mut block: [u32; NUM_DOCS_PER_BLOCK] = [0u32; NUM_DOCS_PER_BLOCK]; stream.read(&mut block[0..2]); diff --git a/src/core/field_reader.rs b/src/core/field_reader.rs index eaf35514b..ca0e95111 100644 --- a/src/core/field_reader.rs +++ b/src/core/field_reader.rs @@ -100,8 +100,8 @@ impl FieldReader { let position_stream = { if option.has_positions() { let position_offset = term_info.positions_offset; - let positions_reader = SourceRead::from(self.positions_source.slice_from(position_offset as usize)); - let mut stream = CompressedIntStream::wrap(positions_reader); + let positions_source = self.positions_source.slice_from(position_offset as usize); + let mut stream = CompressedIntStream::wrap(positions_source); stream.skip(term_info.positions_inner_offset as usize); Some(stream) } diff --git a/src/directory/mod.rs b/src/directory/mod.rs index cfdaee719..b4c18b359 100644 --- a/src/directory/mod.rs +++ b/src/directory/mod.rs @@ -13,14 +13,15 @@ mod managed_directory; /// Errors specific to the directory module. pub mod error; -use std::io::{Write, Seek}; +use std::io::{Write, Seek, BufWriter}; -use std::io::BufWriter; -pub use self::read_only_source::{SourceRead, ReadOnlySource}; +pub use self::read_only_source::ReadOnlySource; pub use self::directory::Directory; pub use self::ram_directory::RAMDirectory; pub use self::mmap_directory::MmapDirectory; -pub use self::managed_directory::{ManagedDirectory, FileProtection}; + +pub(crate) use self::read_only_source::SourceRead; +pub(crate) use self::managed_directory::{ManagedDirectory, FileProtection}; /// Synonym of Seek + Write pub trait SeekableWrite: Seek + Write {} diff --git a/src/directory/read_only_source.rs b/src/directory/read_only_source.rs index 1fd0afc0f..3db74bb01 100644 --- a/src/directory/read_only_source.rs +++ b/src/directory/read_only_source.rs @@ -65,6 +65,8 @@ impl ReadOnlySource { } } + /// Like `.slice(...)` but enforcing only the `from` + /// boundary. pub fn slice_from(&self, from_offset: usize) -> ReadOnlySource { let len = self.len(); self.slice(from_offset, len) @@ -90,12 +92,15 @@ impl From> for ReadOnlySource { } } -pub struct SourceRead { + +/// Acts as a owning cursor over the data backed up by a ReadOnlySource +pub(crate) struct SourceRead { _data_owner: ReadOnlySource, cursor: &'static [u8] } impl SourceRead { + // Advance the cursor by a given number of bytes. pub fn advance(&mut self, len: usize) { self.cursor = &self.cursor[len..]; } @@ -108,6 +113,8 @@ impl AsRef<[u8]> for SourceRead { } impl From for SourceRead { + + // Creates a new `SourceRead` from a given `ReadOnlySource` fn from(source: ReadOnlySource) -> SourceRead { let len = source.len(); let slice_ptr = source.as_slice().as_ptr(); diff --git a/src/postings/docset.rs b/src/postings/docset.rs index 219a85dcb..4b1ea3c7a 100644 --- a/src/postings/docset.rs +++ b/src/postings/docset.rs @@ -52,6 +52,22 @@ pub trait DocSet { } } + + /// Fills a given mutable buffer with the next doc ids from the + /// `DocSet` + /// + /// If that many `DocId`s are available, the method should + /// fill the entire buffer and return the length of the buffer. + /// + /// If we reach the end of the `DocSet` before filling + /// it entirely, then the buffer is filled up to this point, and + /// return value is the number of elements that were filled. + /// + /// # Warning + /// + /// This method is only here for specific high-performance + /// use case where batching. The normal way to + /// go through the `DocId`'s is to call `.advance()`. fn fill_buffer(&mut self, buffer: &mut [DocId]) -> usize { for (i, buffer_val) in buffer.iter_mut().enumerate() { if self.advance() { diff --git a/src/postings/serializer.rs b/src/postings/serializer.rs index caec58b1f..5c24256cc 100644 --- a/src/postings/serializer.rs +++ b/src/postings/serializer.rs @@ -108,6 +108,8 @@ impl InvertedIndexSerializer { } +/// The field serializer is in charge of +/// the serialization of a specific field. pub struct FieldSerializer<'a> { term_dictionary_builder: TermDictionaryBuilderImpl<&'a mut CountingWriter>, postings_serializer: PostingsSerializer<&'a mut CountingWriter>, @@ -173,9 +175,10 @@ impl<'a> FieldSerializer<'a> { /// to the lexicographical order. /// * doc_freq - return the number of document containing the term. pub fn new_term(&mut self, term: &[u8]) -> io::Result<()> { - if self.term_open { - panic!("Called new_term, while the previous term was not closed."); - } + assert!( + !self.term_open, + "Called new_term, while the previous term was not closed." + ); self.term_open = true; self.postings_serializer.clear(); self.current_term_info = self.current_term_info(); @@ -217,6 +220,8 @@ impl<'a> FieldSerializer<'a> { Ok(()) } + + /// Closes the current current field. pub fn close(mut self) -> io::Result<()> { self.close_term()?; if let Some(positions_serializer) = self.positions_serializer_opt { diff --git a/src/termdict/mod.rs b/src/termdict/mod.rs index da8b65910..1ce1d6c54 100644 --- a/src/termdict/mod.rs +++ b/src/termdict/mod.rs @@ -347,7 +347,6 @@ mod tests { let buffer: Vec = { let mut term_dictionary_builder = TermDictionaryBuilderImpl::new(vec![], field_type).unwrap(); for &(ref id, ref i) in &ids { - println!("doc {}", id); term_dictionary_builder.insert(id.as_bytes(), &make_term_info(*i)).unwrap(); } term_dictionary_builder.finish().unwrap() diff --git a/src/termdict/streamdict/delta_encoder.rs b/src/termdict/streamdict/delta_encoder.rs index 21e5aac74..7418e4f85 100644 --- a/src/termdict/streamdict/delta_encoder.rs +++ b/src/termdict/streamdict/delta_encoder.rs @@ -1,4 +1,15 @@ -pub fn common_prefix_len(s1: &[u8], s2: &[u8]) -> usize { +use postings::TermInfo; +use common::VInt; +use common::BinarySerializable; +use std::io::{self, Write}; +use std::mem; + +/// Returns the len of the longest +/// common prefix of `s1` and `s2`. +/// +/// ie: the greatest `L` such that +/// for all `0 <= i < L`, `s1[i] == s2[i]` +fn common_prefix_len(s1: &[u8], s2: &[u8]) -> usize { s1.iter() .zip(s2.iter()) .take_while(|&(a, b)| a==b) @@ -7,16 +18,20 @@ pub fn common_prefix_len(s1: &[u8], s2: &[u8]) -> usize { #[derive(Default)] -pub struct DeltaEncoder { +pub struct TermDeltaEncoder { last_term: Vec, } -impl DeltaEncoder { - pub fn encode<'a>(&mut self, term: &'a [u8]) -> (usize, &'a [u8]) { +impl TermDeltaEncoder { + pub fn encode<'a, W: Write>(&mut self, term: &'a [u8], write: &mut W) -> io::Result<()> { let prefix_len = common_prefix_len(term, &self.last_term); self.last_term.truncate(prefix_len); self.last_term.extend_from_slice(&term[prefix_len..]); - (prefix_len, &term[prefix_len..]) + let suffix = &term[prefix_len..]; + VInt(prefix_len as u64).serialize(write)?; + VInt(suffix.len() as u64).serialize(write)?; + write.write_all(suffix)?; + Ok(()) } pub fn term(&self) -> &[u8] { @@ -25,24 +40,105 @@ impl DeltaEncoder { } #[derive(Default)] -pub struct DeltaDecoder { +pub struct TermDeltaDecoder { term: Vec, } -impl DeltaDecoder { - pub fn with_previous_term(term: Vec) -> DeltaDecoder { - DeltaDecoder { +impl TermDeltaDecoder { + pub fn with_previous_term(term: Vec) -> TermDeltaDecoder { + TermDeltaDecoder { term: Vec::from(term) } } - pub fn decode(&mut self, prefix_len: usize, suffix: &[u8]) -> &[u8] { + pub fn decode(&mut self, cursor: &mut &[u8]) { + let prefix_len: usize = deserialize_vint(cursor) as usize; + let suffix_length: usize = deserialize_vint(cursor) as usize; + let suffix = &cursor[..suffix_length]; + *cursor = &cursor[suffix_length..]; self.term.truncate(prefix_len); self.term.extend_from_slice(suffix); - &self.term[..] } pub fn term(&self) -> &[u8] { &self.term[..] } } + + + +pub struct TermInfoDeltaEncoder { + term_info: TermInfo, + has_positions: bool, +} + +impl TermInfoDeltaEncoder { + + pub fn new(has_positions: bool) -> Self { + TermInfoDeltaEncoder { + term_info: TermInfo::default(), + has_positions: has_positions, + } + } + + pub fn encode(&mut self, term_info: TermInfo, write: &mut W) -> io::Result<()> { + VInt(term_info.doc_freq as u64).serialize(write)?; + let delta_postings_offset = term_info.postings_offset - self.term_info.postings_offset; + VInt(delta_postings_offset as u64).serialize(write)?; + if self.has_positions { + let delta_positions_offset = term_info.positions_offset - self.term_info.positions_offset; + VInt(delta_positions_offset as u64).serialize(write)?; + write.write(&[term_info.positions_inner_offset])?; + } + mem::replace(&mut self.term_info, term_info); + Ok(()) + } +} + +fn deserialize_vint(data: &mut &[u8]) -> u64 { + let mut res = 0; + let mut shift = 0; + for i in 0.. { + let b = data[i]; + res |= ((b % 128u8) as u64) << shift; + if b & 128u8 != 0u8 { + *data = &data[(i + 1)..]; + break; + } + shift += 7; + } + res +} + +pub struct TermInfoDeltaDecoder { + term_info: TermInfo, + has_positions: bool, +} + +impl TermInfoDeltaDecoder { + pub fn new(has_positions: bool) -> TermInfoDeltaDecoder { + TermInfoDeltaDecoder { + term_info: TermInfo::default(), + has_positions: has_positions, + } + } + + pub fn decode(&mut self, cursor: &mut &[u8]) { + let doc_freq = deserialize_vint(cursor) as u32; + self.term_info.doc_freq = doc_freq; + let delta_postings = deserialize_vint(cursor) as u32; + self.term_info.postings_offset += delta_postings; + if self.has_positions { + let delta_positions = deserialize_vint(cursor) as u32; + self.term_info.positions_offset += delta_positions; + let position_inner_offset = cursor[0]; + *cursor = &cursor[1..]; + self.term_info.positions_inner_offset = position_inner_offset; + } + } + + pub fn term_info(&self) -> &TermInfo { + &self.term_info + } +} + diff --git a/src/termdict/streamdict/mod.rs b/src/termdict/streamdict/mod.rs index 96a2c4141..1c9a148a1 100644 --- a/src/termdict/streamdict/mod.rs +++ b/src/termdict/streamdict/mod.rs @@ -2,7 +2,9 @@ mod termdict; mod streamer; mod delta_encoder; -pub use self::delta_encoder::{DeltaEncoder, DeltaDecoder}; +pub use self::delta_encoder::{TermDeltaEncoder, TermDeltaDecoder}; +pub use self::delta_encoder::{TermInfoDeltaEncoder, TermInfoDeltaDecoder}; + pub use self::termdict::TermDictionaryImpl; pub use self::termdict::TermDictionaryBuilderImpl; pub use self::streamer::TermStreamerImpl; diff --git a/src/termdict/streamdict/streamer.rs b/src/termdict/streamdict/streamer.rs index 8ed95fda9..1363bf50a 100644 --- a/src/termdict/streamdict/streamer.rs +++ b/src/termdict/streamdict/streamer.rs @@ -4,7 +4,7 @@ use std::cmp::max; use super::TermDictionaryImpl; use termdict::{TermStreamerBuilder, TermStreamer}; use postings::TermInfo; -use super::delta_encoder::DeltaDecoder; +use super::delta_encoder::{TermInfoDeltaDecoder, TermDeltaDecoder}; fn stream_before<'a>(term_dictionary: &'a TermDictionaryImpl, @@ -16,9 +16,8 @@ fn stream_before<'a>(term_dictionary: &'a TermDictionaryImpl, let offset: usize = offset as usize; TermStreamerImpl { cursor: &term_dictionary.stream_data()[offset..], - delta_decoder: DeltaDecoder::with_previous_term(prev_key), - term_info: TermInfo::default(), - has_positions: has_positions, + term_delta_decoder: TermDeltaDecoder::with_previous_term(prev_key), + term_info_decoder: TermInfoDeltaDecoder::new(has_positions), // TODO checkpoint } } @@ -87,9 +86,8 @@ impl<'a> TermStreamerBuilder for TermStreamerBuilderImpl<'a> let stop = max(self.offset_to, start); TermStreamerImpl { cursor: &data[start..stop], - delta_decoder: DeltaDecoder::with_previous_term(self.current_key), - term_info: TermInfo::default(), - has_positions: self.has_positions, + term_delta_decoder: TermDeltaDecoder::with_previous_term(self.current_key), + term_info_decoder: TermInfoDeltaDecoder::new(self.has_positions), // TODO checkpoint } } } @@ -107,7 +105,7 @@ fn get_offset<'a, P: Fn(&[u8]) -> bool>(predicate: P, { let mut prev: &[u8] = streamer.cursor; - let mut prev_data: Vec = Vec::from(streamer.delta_decoder.term()); + let mut prev_data: Vec = Vec::from(streamer.term_delta_decoder.term()); while let Some((iter_key, _)) = streamer.next() { if !predicate(iter_key.as_ref()) { @@ -144,26 +142,12 @@ impl<'a> TermStreamerBuilderImpl<'a> pub struct TermStreamerImpl<'a> { cursor: &'a [u8], - delta_decoder: DeltaDecoder, - term_info: TermInfo, - has_positions: bool + term_delta_decoder: TermDeltaDecoder, + term_info_decoder: TermInfoDeltaDecoder, } -fn deserialize_vint(data: &mut &[u8]) -> u64 { - let mut res = 0; - let mut shift = 0; - for i in 0.. { - let b = data[i]; - res |= ((b % 128u8) as u64) << shift; - if b & 128u8 != 0u8 { - *data = &data[(i + 1)..]; - break; - } - shift += 7; - } - res -} + impl<'a> TermStreamer for TermStreamerImpl<'a> { @@ -171,28 +155,17 @@ impl<'a> TermStreamer for TermStreamerImpl<'a> if self.cursor.is_empty() { return false; } - let common_length: usize = deserialize_vint(&mut self.cursor) as usize; - let suffix_length: usize = deserialize_vint(&mut self.cursor) as usize; - self.delta_decoder.decode(common_length, &self.cursor[..suffix_length]); - self.cursor = &self.cursor[suffix_length..]; - - self.term_info.doc_freq = deserialize_vint(&mut self.cursor) as u32; - self.term_info.postings_offset = deserialize_vint(&mut self.cursor) as u32; - - if self.has_positions { - self.term_info.positions_offset = deserialize_vint(&mut self.cursor) as u32; - self.term_info.positions_inner_offset = self.cursor[0]; - self.cursor = &self.cursor[1..]; - } + self.term_delta_decoder.decode(&mut self.cursor); + self.term_info_decoder.decode(&mut self.cursor); true } fn key(&self) -> &[u8] { - self.delta_decoder.term() + self.term_delta_decoder.term() } fn value(&self) -> &TermInfo { - &self.term_info + &self.term_info_decoder.term_info() } } diff --git a/src/termdict/streamdict/termdict.rs b/src/termdict/streamdict/termdict.rs index 9c0dfb841..ab5f9dfc3 100644 --- a/src/termdict/streamdict/termdict.rs +++ b/src/termdict/streamdict/termdict.rs @@ -8,9 +8,8 @@ use common::BinarySerializable; use common::CountingWriter; use postings::TermInfo; use schema::FieldType; -use super::DeltaEncoder; +use super::{TermDeltaEncoder, TermInfoDeltaEncoder}; use fst::raw::Node; -use common::VInt; use termdict::{TermDictionary, TermDictionaryBuilder, TermStreamer}; use super::{TermStreamerImpl, TermStreamerBuilderImpl}; use termdict::TermStreamerBuilder; @@ -41,9 +40,9 @@ fn has_positions(field_type: &FieldType) -> bool { /// See [`TermDictionaryBuilder`](./trait.TermDictionaryBuilder.html) pub struct TermDictionaryBuilderImpl { - has_positions: bool, write: CountingWriter, - delta_encoder: DeltaEncoder, + term_delta_encoder: TermDeltaEncoder, + term_info_encoder: TermInfoDeltaEncoder, block_index: fst::MapBuilder>, len: usize, } @@ -61,7 +60,7 @@ impl TermDictionaryBuilderImpl { fn add_index_entry(&mut self) { self.block_index - .insert(&self.delta_encoder.term(), self.write.written_bytes() as u64) + .insert(&self.term_delta_encoder.term(), self.write.written_bytes() as u64) .unwrap(); } @@ -76,21 +75,13 @@ impl TermDictionaryBuilderImpl if self.len % INDEX_INTERVAL == 0 { self.add_index_entry(); } - let (common_prefix_len, suffix) = self.delta_encoder.encode(key); - VInt(common_prefix_len as u64).serialize(&mut self.write)?; - VInt(suffix.len() as u64).serialize(&mut self.write)?; - self.write.write_all(suffix)?; + self.term_delta_encoder.encode(key, &mut self.write)?; self.len += 1; Ok(()) } - pub(crate) fn insert_value(&mut self, value: &TermInfo) -> io::Result<()> { - VInt(value.doc_freq as u64).serialize(&mut self.write)?; - VInt(value.postings_offset as u64).serialize(&mut self.write)?; - if self.has_positions { - VInt(value.positions_offset as u64).serialize(&mut self.write)?; - self.write.write(&[value.positions_inner_offset])?; - } + pub(crate) fn insert_value(&mut self, term_info: &TermInfo) -> io::Result<()> { + self.term_info_encoder.encode(term_info.clone(), &mut self.write)?; Ok(()) } } @@ -104,9 +95,9 @@ impl TermDictionaryBuilder for TermDictionaryBuilderImpl let has_positions_code = if has_positions { 255u8 } else { 0u8 }; write.write_all(&[has_positions_code])?; Ok(TermDictionaryBuilderImpl { - has_positions: has_positions, write: CountingWriter::wrap(write), - delta_encoder: DeltaEncoder::default(), + term_delta_encoder: TermDeltaEncoder::default(), + term_info_encoder: TermInfoDeltaEncoder::new(has_positions), block_index: fst::MapBuilder::new(vec![]).expect("This cannot fail"), len: 0, }) @@ -118,7 +109,8 @@ impl TermDictionaryBuilder for TermDictionaryBuilderImpl fn insert>(&mut self, key_ref: K, value: &TermInfo) -> io::Result<()> { let key = key_ref.as_ref(); self.insert_key(key)?; - self.insert_value(value) + self.insert_value(value)?; + Ok(()) } /// Finalize writing the builder, and returns the underlying @@ -136,15 +128,17 @@ impl TermDictionaryBuilder for TermDictionaryBuilderImpl fn open_fst_index(source: ReadOnlySource) -> io::Result { - Ok(fst::Map::from(match source { - ReadOnlySource::Anonymous(data) => { - try!(Fst::from_shared_bytes(data.data, data.start, data.len) - .map_err(convert_fst_error)) - } - ReadOnlySource::Mmap(mmap_readonly) => { - try!(Fst::from_mmap(mmap_readonly).map_err(convert_fst_error)) - } - })) + use self::ReadOnlySource::*; + let fst_result = match source { + Anonymous(data) => { + Fst::from_shared_bytes(data.data, data.start, data.len) + } + Mmap(mmap_readonly) => { + Fst::from_mmap(mmap_readonly) + } + }; + let fst = fst_result.map_err(convert_fst_error)?; + Ok(fst::Map::from(fst)) } /// See [`TermDictionary`](./trait.TermDictionary.html)