From d0d5db4515dc820f8daee1e043b5231c3b3ac7fb Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Sat, 19 Aug 2017 12:03:04 +0900 Subject: [PATCH] Streamdict using SIMD instruction. --- src/lib.rs | 3 +- src/termdict/streamdict/mod.rs | 15 ++ src/termdict/streamdict/streamer.rs | 132 +++++++------- src/termdict/streamdict/term_block_encoder.rs | 164 ++++++++++++++++++ src/termdict/streamdict/termdict.rs | 69 ++++---- .../streamdict/terminfo_block_encoder.rs | 117 +++++++++++++ 6 files changed, 404 insertions(+), 96 deletions(-) create mode 100644 src/termdict/streamdict/term_block_encoder.rs create mode 100644 src/termdict/streamdict/terminfo_block_encoder.rs diff --git a/src/lib.rs b/src/lib.rs index c926b67b3..5f4cfa3ee 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -262,7 +262,7 @@ mod tests { } #[test] - fn test_docfreq() { + fn test_docfreq1() { let mut schema_builder = SchemaBuilder::default(); let text_field = schema_builder.add_text_field("text", TEXT); let index = Index::create_in_ram(schema_builder.build()); @@ -301,7 +301,6 @@ mod tests { } } - #[test] fn test_fieldnorm() { let mut schema_builder = SchemaBuilder::default(); diff --git a/src/termdict/streamdict/mod.rs b/src/termdict/streamdict/mod.rs index 101d8e9fb..4a4db7690 100644 --- a/src/termdict/streamdict/mod.rs +++ b/src/termdict/streamdict/mod.rs @@ -1,11 +1,15 @@ mod termdict; mod streamer; +mod term_block_encoder; +mod terminfo_block_encoder; pub use self::termdict::TermDictionaryImpl; pub use self::termdict::TermDictionaryBuilderImpl; pub use self::streamer::TermStreamerImpl; pub use self::streamer::TermStreamerBuilderImpl; +use self::term_block_encoder::{TermBlockEncoder, TermBlockDecoder}; +use self::terminfo_block_encoder::{TermInfoBlockEncoder, TermInfoBlockDecoder}; use schema::FieldType; @@ -16,6 +20,17 @@ pub(crate) enum TermDeserializerOption { U64, } +impl TermDeserializerOption { + + pub fn has_positions(&self) -> bool { + match *self { + TermDeserializerOption::StrWithPositions => true, + _ => false + } + } + +} + fn make_deserializer_options(field_type: &FieldType) -> TermDeserializerOption { match *field_type { FieldType::Str(ref text_options) => { diff --git a/src/termdict/streamdict/streamer.rs b/src/termdict/streamdict/streamer.rs index 2f302c8fd..419468308 100644 --- a/src/termdict/streamdict/streamer.rs +++ b/src/termdict/streamdict/streamer.rs @@ -3,6 +3,7 @@ use std::cmp::max; use super::TermDictionaryImpl; use termdict::{TermStreamerBuilder, TermStreamer}; +use super::{TermBlockDecoder, TermInfoBlockDecoder}; use postings::TermInfo; use super::TermDeserializerOption; @@ -15,10 +16,10 @@ pub(crate) fn stream_before<'a>(term_dictionary: &'a TermDictionaryImpl, let (prev_key, offset) = term_dictionary.strictly_previous_key(target_key.as_ref()); let offset: usize = offset as usize; TermStreamerImpl { + remaining_in_block: 0, + term_block_decoder: TermBlockDecoder::given_previous_term(&prev_key[..]), + terminfo_block_decoder: TermInfoBlockDecoder::new(deserializer_option.has_positions()), cursor: &term_dictionary.stream_data()[offset..], - current_key: Vec::from(prev_key), - current_value: TermInfo::default(), - term_deserializer_option: deserializer_option, } } @@ -27,7 +28,9 @@ pub(crate) fn stream_before<'a>(term_dictionary: &'a TermDictionaryImpl, pub struct TermStreamerBuilderImpl<'a> { term_dictionary: &'a TermDictionaryImpl, + block_start: &'a [u8], origin: usize, + cursor: usize, offset_from: usize, offset_to: usize, current_key: Vec, @@ -40,44 +43,60 @@ impl<'a> TermStreamerBuilder for TermStreamerBuilderImpl<'a> /// Limit the range to terms greater or equal to the bound fn ge>(mut self, bound: T) -> Self { + unimplemented!(); + /* let target_key = bound.as_ref(); let streamer = stream_before(self.term_dictionary, target_key.as_ref(), self.deserializer_option); let smaller_than = |k: &[u8]| k.lt(target_key); - let (offset_before, current_key) = get_offset(smaller_than, streamer); + let (block_start, cursor, current_key) = get_offset(smaller_than, streamer); + self.block_start = block_start; self.current_key = current_key; - self.offset_from = offset_before - self.origin; + self.cursor = cursor; + //self.offset_from = ; + */ self } /// Limit the range to terms strictly greater than the bound fn gt>(mut self, bound: T) -> Self { + unimplemented!(); + /* let target_key = bound.as_ref(); let streamer = stream_before(self.term_dictionary, target_key.as_ref(), self.deserializer_option); let smaller_than = |k: &[u8]| k.le(target_key); - let (offset_before, current_key) = get_offset(smaller_than, streamer); + let (block_start, cursor, current_key) = get_offset(smaller_than, streamer); + self.block_start = block_start; self.current_key = current_key; - self.offset_from = offset_before - self.origin; + self.cursor = cursor; + //self.offset_from = offset_before - self.origin; + */ self } /// Limit the range to terms lesser or equal to the bound fn lt>(mut self, bound: T) -> Self { + unimplemented!(); + /* let target_key = bound.as_ref(); let streamer = stream_before(self.term_dictionary, target_key.as_ref(), self.deserializer_option); let smaller_than = |k: &[u8]| k.lt(target_key); let (offset_before, _) = get_offset(smaller_than, streamer); self.offset_to = offset_before - self.origin; self + */ } /// Limit the range to terms lesser or equal to the bound fn le>(mut self, bound: T) -> Self { + unimplemented!(); + /* let target_key = bound.as_ref(); let streamer = stream_before(self.term_dictionary, target_key.as_ref(), self.deserializer_option); let smaller_than = |k: &[u8]| k.le(target_key); let (offset_before, _) = get_offset(smaller_than, streamer); self.offset_to = offset_before - self.origin; self + */ } /// Build the streamer. @@ -85,12 +104,12 @@ impl<'a> TermStreamerBuilder for TermStreamerBuilderImpl<'a> let data: &[u8] = self.term_dictionary.stream_data(); let start = self.offset_from; let stop = max(self.offset_to, start); - + println!("current_key {:?}", self.current_key); TermStreamerImpl { + remaining_in_block: 0, cursor: &data[start..stop], - current_key: self.current_key, - current_value: TermInfo::default(), - term_deserializer_option: self.deserializer_option, + term_block_decoder: TermBlockDecoder::given_previous_term(&self.current_key), + terminfo_block_decoder: TermInfoBlockDecoder::new(self.deserializer_option.has_positions()), } } } @@ -98,24 +117,30 @@ impl<'a> TermStreamerBuilder for TermStreamerBuilderImpl<'a> /// Returns offset information for the first /// key in the stream matching a given predicate. /// -/// returns (start offset, the data required to load the value) +/// returns +/// - the block start +/// - the index within this block +/// - the term_buffer state to initialize the block) fn get_offset<'a, P: Fn(&[u8]) -> bool>(predicate: P, mut streamer: TermStreamerImpl<'a>) - -> (usize, Vec) -{ - let mut prev: &[u8] = streamer.cursor; + -> (&'a [u8], usize, Vec) +{//&'a [u8] + let mut block_start: &[u8] = streamer.cursor; + let mut cursor = 0; + let mut term_buffer: Vec = vec!(); - let mut prev_data: Vec = streamer.current_key.clone(); - - while let Some((iter_key, _)) = streamer.next() { + while streamer.advance() { + let iter_key = streamer.key(); if !predicate(iter_key.as_ref()) { - return (prev.as_ptr() as usize, prev_data); + return (block_start, streamer.term_block_decoder.cursor() - 1, term_buffer); + } + if streamer.remaining_in_block == 0 { + block_start = streamer.cursor; + term_buffer.clear(); + term_buffer.extend_from_slice(iter_key.as_ref()); } - prev = streamer.cursor; - prev_data.clear(); - prev_data.extend_from_slice(iter_key.as_ref()); } - (prev.as_ptr() as usize, prev_data) + (block_start, streamer.term_block_decoder.cursor() - 1, term_buffer) } impl<'a> TermStreamerBuilderImpl<'a> @@ -127,6 +152,8 @@ impl<'a> TermStreamerBuilderImpl<'a> let origin = data.as_ptr() as usize; TermStreamerBuilderImpl { term_dictionary: term_dictionary, + block_start: term_dictionary.stream_data().as_ref(), + cursor: 0, origin: origin, offset_from: 0, offset_to: data.len(), @@ -141,66 +168,49 @@ impl<'a> TermStreamerBuilderImpl<'a> /// See [`TermStreamer`](./trait.TermStreamer.html) pub struct TermStreamerImpl<'a> { + remaining_in_block: usize, + term_block_decoder: TermBlockDecoder<'a>, + terminfo_block_decoder: TermInfoBlockDecoder<'a>, cursor: &'a [u8], - current_key: Vec, - current_value: TermInfo, - term_deserializer_option: TermDeserializerOption, } impl<'a> TermStreamerImpl<'a> { - pub(crate) fn extract_value(self) -> TermInfo { - self.current_value - } - - fn deserialize_value(&mut self) { - self.current_value.doc_freq = deserialize_vint(&mut self.cursor) as u32; - self.current_value.postings_offset = deserialize_vint(&mut self.cursor) as u32; - if self.term_deserializer_option == TermDeserializerOption::StrWithPositions { - self.current_value.positions_offset = deserialize_vint(&mut self.cursor) as u32; - self.current_value.positions_inner_offset = self.cursor[0]; + fn load_block(&mut self) -> bool { + self.remaining_in_block = self.cursor[0] as usize; + if self.remaining_in_block == 0 { + false + } + else { self.cursor = &self.cursor[1..]; + self.cursor = self.term_block_decoder.decode_block(self.cursor); + self.cursor = self.terminfo_block_decoder.decode_block(self.cursor, self.remaining_in_block); + true } } } -fn deserialize_vint(data: &mut &[u8]) -> u64 { - let mut res = 0; - let mut shift = 0; - for i in 0.. { - let b = data[i]; - res |= ((b % 128u8) as u64) << shift; - if b & 128u8 != 0u8 { - *data = &data[(i + 1)..]; - break; - } - shift += 7; - } - res -} impl<'a> TermStreamer for TermStreamerImpl<'a> { fn advance(&mut self) -> bool { - if self.cursor.is_empty() { - return false; + if self.remaining_in_block == 0 { + if !self.load_block() { + return false; + } } - let common_length: usize = deserialize_vint(&mut self.cursor) as usize; - self.current_key.truncate(common_length); - let added_length: usize = deserialize_vint(&mut self.cursor) as usize; - self.current_key.extend(&self.cursor[..added_length]); - - self.cursor = &self.cursor[added_length..]; - self.deserialize_value(); + self.remaining_in_block -= 1; + self.term_block_decoder.advance(); + self.terminfo_block_decoder.advance(); true } fn key(&self) -> &[u8] { - &self.current_key + self.term_block_decoder.term() } fn value(&self) -> &TermInfo { - &self.current_value + self.terminfo_block_decoder.term_info() } } diff --git a/src/termdict/streamdict/term_block_encoder.rs b/src/termdict/streamdict/term_block_encoder.rs new file mode 100644 index 000000000..157a3cf28 --- /dev/null +++ b/src/termdict/streamdict/term_block_encoder.rs @@ -0,0 +1,164 @@ +use compression::{BlockEncoder, BlockDecoder, NUM_DOCS_PER_BLOCK}; +use std::io::{self, Write}; + +fn compute_common_prefix_length(left: &[u8], right: &[u8]) -> usize { + left.iter() + .cloned() + .zip(right.iter().cloned()) + .take_while(|&(b1, b2)| b1 == b2) + .count() +} + + +pub struct TermBlockEncoder { + block_encoder: BlockEncoder, + + pop_lens: [u32; NUM_DOCS_PER_BLOCK], + push_lens: [u32; NUM_DOCS_PER_BLOCK], + suffixes: Vec, + + previous_key: Vec, + count: usize, +} + +impl TermBlockEncoder { + pub fn new() -> TermBlockEncoder { + TermBlockEncoder { + block_encoder: BlockEncoder::new(), + pop_lens: [0u32; NUM_DOCS_PER_BLOCK], + push_lens: [0u32; NUM_DOCS_PER_BLOCK], + suffixes: Vec::with_capacity(NUM_DOCS_PER_BLOCK*5), + + previous_key: Vec::with_capacity(30), + + count: 0, + } + } + + pub fn encode(&mut self, key: &[u8]) { + let common_prefix_len = compute_common_prefix_length(&self.previous_key, key); + self.pop_lens[self.count] = (self.previous_key.len() - common_prefix_len) as u32; + self.push_lens[self.count] = (key.len() - common_prefix_len) as u32; + self.previous_key.clear(); + let suffix = &key[common_prefix_len..]; + self.suffixes.extend_from_slice(suffix); + self.previous_key.extend_from_slice(key); + self.count += 1; + } + + pub fn len(&self) -> usize { + self.count + } + + pub fn flush(&mut self, output: &mut W) -> io::Result<()> { + for i in self.count..NUM_DOCS_PER_BLOCK { + self.pop_lens[i] = 0u32; + self.push_lens[i] = 0u32; + } + output.write_all(self.block_encoder.compress_block_unsorted(&self.pop_lens))?; + output.write_all(self.block_encoder.compress_block_unsorted(&self.push_lens))?; + output.write_all(&self.suffixes[..])?; + self.suffixes.clear(); + self.count = 0; + Ok(()) + } +} + + + +pub struct TermBlockDecoder<'a> { + pop_lens_decoder: BlockDecoder, + push_lens_decoder: BlockDecoder, + suffixes: &'a [u8], + current_key: Vec, + cursor: usize, +} + + +impl<'a> TermBlockDecoder<'a> { + pub fn new() -> TermBlockDecoder<'a> { + TermBlockDecoder::given_previous_term(&[]) + } + + pub fn cursor(&self) -> usize { + self.cursor + } + + pub fn given_previous_term(previous_term: &[u8]) -> TermBlockDecoder<'a> { + let mut current_key = Vec::with_capacity(30); + current_key.extend_from_slice(previous_term); + TermBlockDecoder { + pop_lens_decoder: BlockDecoder::new(), + push_lens_decoder: BlockDecoder::new(), + current_key: current_key, + suffixes: &[], + cursor: 0, + } + } + + pub fn term(&self) -> &[u8] { + &self.current_key + } + + pub fn decode_block(&mut self, mut compressed_data: &'a [u8]) -> &'a [u8] { + { + let consumed_data_len = self.pop_lens_decoder.uncompress_block_unsorted(compressed_data); + compressed_data = &compressed_data[consumed_data_len..]; + } + { + let consumed_data_len = self.push_lens_decoder.uncompress_block_unsorted(compressed_data); + compressed_data = &compressed_data[consumed_data_len..]; + } + let suffix_len: u32 = self.push_lens_decoder.output_array()[0..].iter().cloned().sum(); + let suffix_len: usize = suffix_len as usize; + self.suffixes = &compressed_data[..suffix_len]; + self.cursor = 0; + &compressed_data[suffix_len..] + } + + pub fn advance(&mut self) { + assert!(self.cursor < NUM_DOCS_PER_BLOCK); + let pop_len = self.pop_lens_decoder.output(self.cursor) as usize; + let push_len = self.push_lens_decoder.output(self.cursor) as usize; + let previous_len = self.current_key.len(); + self.current_key.truncate(previous_len - pop_len); + self.current_key.extend_from_slice(&self.suffixes[..push_len]); + self.suffixes = &self.suffixes[push_len..]; + self.cursor += 1; + } +} + + + +#[cfg(test)] +mod tests { + use super::{TermBlockEncoder, TermBlockDecoder}; + + #[test] + fn test_encoding_terms() { + let mut buffer: Vec = vec!(); + let mut terms = vec!(); + { + let mut term_block_encoder = TermBlockEncoder::new(); + for i in 0..128 { + terms.push(format!("term{}", i * 7231)); + } + for term in &terms { + term_block_encoder.encode(term.as_bytes()); + } + term_block_encoder.flush(&mut buffer).unwrap(); + } + assert_eq!(buffer.len(), 711); + + let mut block_decoder = TermBlockDecoder::new(); + assert_eq!(block_decoder.decode_block(&buffer[..]).len(), 0); + for i in 0..128 { + block_decoder.advance(); + assert_eq!(block_decoder.term(), terms[i].as_bytes()); + } + + } +} + + + diff --git a/src/termdict/streamdict/termdict.rs b/src/termdict/streamdict/termdict.rs index 3a3b6101b..5c15652cb 100644 --- a/src/termdict/streamdict/termdict.rs +++ b/src/termdict/streamdict/termdict.rs @@ -3,7 +3,6 @@ use std::io::{self, Write}; use fst; use fst::raw::Fst; -use common::VInt; use directory::ReadOnlySource; use common::BinarySerializable; use common::CountingWriter; @@ -12,13 +11,15 @@ use std::cmp::Ordering; use postings::TermInfo; use schema::FieldType; use fst::raw::Node; +use compression::NUM_DOCS_PER_BLOCK; use super::make_deserializer_options; use super::TermDeserializerOption; use super::streamer::stream_before; use termdict::{TermDictionary, TermDictionaryBuilder, TermStreamer}; +use super::{TermBlockEncoder, TermInfoBlockEncoder}; use super::{TermStreamerImpl, TermStreamerBuilderImpl}; -const BLOCK_SIZE: usize = 1024; +const INDEX_INTERVAL: usize = 1024; fn convert_fst_error(e: fst::Error) -> io::Error { io::Error::new(io::ErrorKind::Other, e) @@ -28,19 +29,16 @@ fn convert_fst_error(e: fst::Error) -> io::Error { pub struct TermDictionaryBuilderImpl { write: CountingWriter, + + term_block_encoder: TermBlockEncoder, + terminfo_block_encoder: TermInfoBlockEncoder, + block_index: fst::MapBuilder>, last_key: Vec, + len: usize, - deserializer_options: TermDeserializerOption, } -fn common_prefix_length(left: &[u8], right: &[u8]) -> usize { - left.iter() - .cloned() - .zip(right.iter().cloned()) - .take_while(|&(b1, b2)| b1 == b2) - .count() -} fn fill_last<'a>(fst: &'a Fst, mut node: Node<'a>, buffer: &mut Vec) { while let Some(transition) = node.transitions().last() { @@ -66,30 +64,31 @@ impl TermDictionaryBuilderImpl /// /// Prefer using `.insert(key, value)` pub(crate) fn insert_key(&mut self, key: &[u8]) -> io::Result<()> { - if self.len % BLOCK_SIZE == 0 { + if self.len % INDEX_INTERVAL == 0 { self.add_index_entry(); } + self.last_key.clear(); + self.last_key.extend_from_slice(key); + self.term_block_encoder.encode(key); self.len += 1; - let common_len = common_prefix_length(key, &self.last_key); - VInt(common_len as u64).serialize(&mut self.write)?; - self.last_key.truncate(common_len); - self.last_key.extend_from_slice(&key[common_len..]); - VInt((key.len() - common_len) as u64) - .serialize(&mut self.write)?; - self.write.write_all(&key[common_len..])?; + Ok(()) + } + + fn flush_block(&mut self) -> io::Result<()> { + let block_size = self.term_block_encoder.len(); + if block_size > 0 { + self.write.write(&[block_size as u8])?; + self.term_block_encoder.flush(&mut self.write)?; + self.terminfo_block_encoder.flush(&mut self.write)?; + } Ok(()) } pub(crate) fn insert_value(&mut self, value: &TermInfo) -> io::Result<()> { - - VInt(value.doc_freq as u64).serialize(&mut self.write)?; - VInt(value.postings_offset as u64).serialize(&mut self.write)?; - - if self.deserializer_options == TermDeserializerOption::StrWithPositions { - VInt(value.positions_offset as u64).serialize(&mut self.write)?; - self.write.write_all(&[value.positions_inner_offset])?; + self.terminfo_block_encoder.encode(value); + if self.len % NUM_DOCS_PER_BLOCK == 0 { + self.flush_block()?; } - Ok(()) } } @@ -107,13 +106,15 @@ impl TermDictionaryBuilder for TermDictionaryBuilderImpl write.write_all(&[data.len() as u8])?; write.write_all(&data[..])?; } + let has_positions = deserializer_options.has_positions(); Ok(TermDictionaryBuilderImpl { - write: CountingWriter::wrap(write), - block_index: fst::MapBuilder::new(vec![]).expect("This cannot fail"), - last_key: Vec::with_capacity(128), - len: 0, - deserializer_options: deserializer_options, - }) + term_block_encoder: TermBlockEncoder::new(), + terminfo_block_encoder: TermInfoBlockEncoder::new(has_positions), + write: CountingWriter::wrap(write), + block_index: fst::MapBuilder::new(vec![]).expect("This cannot fail"), + last_key: Vec::with_capacity(128), + len: 0, + }) } /// Inserts a `(key, value)` pair in the term dictionary. @@ -128,7 +129,9 @@ impl TermDictionaryBuilder for TermDictionaryBuilderImpl /// Finalize writing the builder, and returns the underlying /// `Write` object. fn finish(mut self) -> io::Result { + self.flush_block()?; self.add_index_entry(); + self.write.write_all(&[0u8])?; let (mut w, split_len) = self.write.finish()?; let fst_write = self.block_index.into_inner().map_err(convert_fst_error)?; w.write_all(&fst_write)?; @@ -253,7 +256,7 @@ impl<'a> TermDictionary<'a> for TermDictionaryImpl let position = streamer.key().cmp(target_key.as_ref()); match position { Ordering::Less => {} - Ordering::Equal => return Some(streamer.extract_value()), + Ordering::Equal => return Some(streamer.value().clone()), Ordering::Greater => { return None; } diff --git a/src/termdict/streamdict/terminfo_block_encoder.rs b/src/termdict/streamdict/terminfo_block_encoder.rs new file mode 100644 index 000000000..c2d2ef26b --- /dev/null +++ b/src/termdict/streamdict/terminfo_block_encoder.rs @@ -0,0 +1,117 @@ +use compression::{BlockEncoder, BlockDecoder, VIntEncoder, VIntDecoder, NUM_DOCS_PER_BLOCK}; +use postings::TermInfo; +use std::io::{self, Write}; + +pub struct TermInfoBlockEncoder { + block_encoder: BlockEncoder, + + doc_freqs: [u32; NUM_DOCS_PER_BLOCK], + postings_offsets: [u32; NUM_DOCS_PER_BLOCK], + positions_offsets: [u32; NUM_DOCS_PER_BLOCK], + positions_inner_offset: [u8; NUM_DOCS_PER_BLOCK], + + cursor: usize, + encode_positions: bool, +} + +impl TermInfoBlockEncoder { + pub fn new(encode_positions: bool) -> TermInfoBlockEncoder { + TermInfoBlockEncoder { + block_encoder: BlockEncoder::new(), + + doc_freqs: [0u32; NUM_DOCS_PER_BLOCK], + postings_offsets: [0u32; NUM_DOCS_PER_BLOCK], + positions_offsets: [0u32; NUM_DOCS_PER_BLOCK], + positions_inner_offset: [0u8; NUM_DOCS_PER_BLOCK], + + cursor: 0, + encode_positions: encode_positions, + } + } + + pub fn encode(&mut self, term_info: &TermInfo) { + self.doc_freqs[self.cursor] = term_info.doc_freq; + self.postings_offsets[self.cursor] = term_info.postings_offset; + self.positions_offsets[self.cursor] = term_info.positions_offset; + self.positions_inner_offset[self.cursor] = term_info.positions_inner_offset; + self.cursor += 1; + } + + pub fn flush(&mut self, output: &mut W) -> io::Result<()> { + output.write_all(self.block_encoder.compress_vint_unsorted(&self.doc_freqs[..self.cursor]))?; + output.write_all(self.block_encoder.compress_vint_sorted(&self.postings_offsets[..self.cursor], 0u32))?; + if self.encode_positions { + output.write_all(self.block_encoder.compress_vint_sorted(&self.positions_offsets[..self.cursor], 0u32))?; + output.write_all(&self.positions_inner_offset[..self.cursor])?; + } + self.cursor = 0; + Ok(()) + } +} + + + +pub struct TermInfoBlockDecoder<'a> { + doc_freq_decoder: BlockDecoder, + postings_decoder: BlockDecoder, + positions_decoder: BlockDecoder, + positions_inner_offset: &'a [u8], + current_term_info: TermInfo, + + cursor: usize, + has_positions: bool, +} + + +impl<'a> TermInfoBlockDecoder<'a> { + pub fn new(has_positions: bool) -> TermInfoBlockDecoder<'a> { + TermInfoBlockDecoder { + doc_freq_decoder: BlockDecoder::new(), + postings_decoder: BlockDecoder::new(), + positions_decoder: BlockDecoder::new(), + positions_inner_offset: &[], + + current_term_info: TermInfo::default(), + cursor: 0, + has_positions: has_positions, + } + } + + + pub fn term_info(&self) -> &TermInfo { + &self.current_term_info + } + + pub fn decode_block(&mut self, mut compressed_data: &'a [u8], num_els: usize) -> &'a [u8] { + self.cursor = 0; + { + let consumed_size = self.doc_freq_decoder.uncompress_vint_unsorted(compressed_data, num_els); + compressed_data = &compressed_data[consumed_size..]; + } + { + let consumed_size = self.postings_decoder.uncompress_vint_sorted(compressed_data, 0u32, num_els); + compressed_data = &compressed_data[consumed_size..]; + } + if self.has_positions { + let consumed_size = self.positions_decoder.uncompress_vint_sorted(compressed_data, 0u32, num_els); + compressed_data = &compressed_data[consumed_size..]; + self.positions_inner_offset = &compressed_data[..num_els]; + &compressed_data[num_els..] + } + else { + compressed_data + } + } + + pub fn advance(&mut self) { + assert!(self.cursor < NUM_DOCS_PER_BLOCK); + self.current_term_info.doc_freq = self.doc_freq_decoder.output(self.cursor); + self.current_term_info.postings_offset = self.postings_decoder.output(self.cursor); + if self.has_positions { + self.current_term_info.positions_offset = self.positions_decoder.output(self.cursor); + self.current_term_info.positions_inner_offset = self.positions_inner_offset[self.cursor]; + } + self.cursor += 1; + } + +} \ No newline at end of file