From d1f61a50c1f1598c0698adc1c55dfd661efddca4 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Sun, 6 Aug 2017 16:03:07 +0900 Subject: [PATCH] issue/207 Lazily decompressing positions. --- cpp/simdcomp_wrapper.c | 5 -- src/compression/mod.rs | 20 ++++- src/compression/pack/compression_pack_simd.rs | 7 +- src/compression/stream.rs | 11 ++- src/core/segment_reader.rs | 4 +- src/indexer/merger.rs | 29 ++++++- src/postings/postings.rs | 20 ----- src/postings/segment_postings.rs | 84 +++++++++++-------- src/postings/segment_postings_option.rs | 5 ++ src/postings/serializer.rs | 3 - src/postings/vec_postings.rs | 5 -- 11 files changed, 110 insertions(+), 83 deletions(-) diff --git a/cpp/simdcomp_wrapper.c b/cpp/simdcomp_wrapper.c index 1ffff9778..4530e3f3b 100644 --- a/cpp/simdcomp_wrapper.c +++ b/cpp/simdcomp_wrapper.c @@ -40,8 +40,3 @@ size_t uncompress_unsorted( simdunpack((__m128i *)compressed_data, output, b); return 1 + b * sizeof(__m128i); } - - -size_t compressedbytes(const uint32_t length, const uint8_t num_bits) { - return simdpack_compressedbytes((int)length, (uint32_t)num_bits); -} diff --git a/src/compression/mod.rs b/src/compression/mod.rs index 8384c65eb..d8540892b 100644 --- a/src/compression/mod.rs +++ b/src/compression/mod.rs @@ -17,7 +17,7 @@ mod pack { pub use self::compression_pack_simd::*; } -pub use self::pack::{BlockEncoder, BlockDecoder, compressedbytes}; +pub use self::pack::{BlockEncoder, BlockDecoder}; #[cfg( any(not(feature="simdcompression"), target_env="msvc") )] mod vint { @@ -31,6 +31,10 @@ mod vint { pub use self::compression_vint_simd::*; } +/// Returns the size in bytes of a compressed block, given num_bits. 
+pub fn compressed_block_size(num_bits: u8) -> usize { + 1 + (num_bits as usize) * 16 +} pub trait VIntEncoder { fn compress_vint_sorted(&mut self, input: &[u32], offset: u32) -> &[u8]; @@ -87,6 +91,7 @@ pub mod tests { use super::*; use tests; use test::Bencher; + use std::iter; #[test] fn test_encode_sorted_block() { @@ -194,6 +199,19 @@ pub mod tests { b.iter(|| { decoder.uncompress_block_sorted(compressed, 0u32); }); } + #[test] + fn test_all_docs_compression_numbits() { + for num_bits in 0..33 { + let mut data: Vec<u32> = iter::repeat(0u32).take(128).collect(); + if num_bits > 0 { + data[0] = 1 << (num_bits - 1); + } + let mut encoder = BlockEncoder::new(); + let compressed = encoder.compress_block_unsorted(&data); + assert_eq!(compressed[0] as usize, num_bits); + assert_eq!(compressed.len(), compressed_block_size(compressed[0])); + } + } const NUM_INTS_BENCH_VINT: usize = 10; diff --git a/src/compression/pack/compression_pack_simd.rs b/src/compression/pack/compression_pack_simd.rs index ba3518521..6842e0cc2 100644 --- a/src/compression/pack/compression_pack_simd.rs +++ b/src/compression/pack/compression_pack_simd.rs @@ -16,15 +16,9 @@ mod simdcomp { pub fn compress_unsorted(data: *const u32, output: *mut u8) -> size_t; pub fn uncompress_unsorted(compressed_data: *const u8, output: *mut u32) -> size_t; - - pub fn compressedbytes(length: u32, num_bits: u8) -> size_t; } } -pub fn compressedbytes(length: u32, num_bits: u8) -> usize { - unsafe { simdcomp::compressedbytes(length, num_bits) } -} - fn compress_sorted(vals: &[u32], output: &mut [u8], offset: u32) -> usize { unsafe { simdcomp::compress_sorted(vals.as_ptr(), output.as_mut_ptr(), offset) } } @@ -123,4 +117,5 @@ mod tests { let compressed = encoder.compress_block_sorted(&data, 0u32); assert_eq!(compressed.len(), 17); } + } diff --git a/src/compression/stream.rs b/src/compression/stream.rs index b3bbc8716..735eb7bef 100644 --- a/src/compression/stream.rs +++ b/src/compression/stream.rs @@ -1,6 +1,6 @@ use 
compression::BlockDecoder; use compression::NUM_DOCS_PER_BLOCK; -use compression::compressedbytes; +use compression::compressed_block_size; pub struct CompressedIntStream<'a> { buffer: &'a [u8], @@ -52,8 +52,8 @@ impl<'a> CompressedIntStream<'a> { while skip_len >= NUM_DOCS_PER_BLOCK { skip_len -= NUM_DOCS_PER_BLOCK; let num_bits: u8 = self.buffer[0]; - let block_len = compressedbytes(128, num_bits); - self.buffer = &self.buffer[1 + block_len..]; + let block_len = compressed_block_size(num_bits); + self.buffer = &self.buffer[block_len..]; } self.buffer = self.block_decoder.uncompress_block_unsorted(self.buffer); self.inner_offset = skip_len; @@ -66,8 +66,7 @@ impl<'a> CompressedIntStream<'a> { pub mod tests { use super::CompressedIntStream; - use tests; - use compression::compressedbytes; + use compression::compressed_block_size; use compression::NUM_DOCS_PER_BLOCK; use compression::BlockEncoder; @@ -78,7 +77,7 @@ pub mod tests { for chunk in vals.chunks(NUM_DOCS_PER_BLOCK) { let compressed_block = encoder.compress_block_unsorted(chunk); let num_bits = compressed_block[0]; - assert_eq!(compressedbytes(128, num_bits) + 1, compressed_block.len()); + assert_eq!(compressed_block_size(num_bits), compressed_block.len()); buffer.extend_from_slice(compressed_block); } buffer diff --git a/src/core/segment_reader.rs b/src/core/segment_reader.rs index 56247e942..619888228 100644 --- a/src/core/segment_reader.rs +++ b/src/core/segment_reader.rs @@ -197,10 +197,10 @@ impl SegmentReader { /// For instance, requesting `SegmentPostingsOption::FreqAndPositions` for a /// `TextIndexingOptions` that does not index position will return a `SegmentPostings` /// with `DocId`s and frequencies. 
- pub fn read_postings<'a>(&'a self, + pub fn read_postings(&self, term: &Term, option: SegmentPostingsOption) - -> Option> { + -> Option { let field = term.field(); let field_entry = self.schema.get_field_entry(field); let term_info = get!(self.get_term_info(term)); diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 86f0a0e78..75f329186 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -61,6 +61,31 @@ fn extract_fast_field_reader(segment_reader: &SegmentReader, segment_reader.fast_fields_reader().open_reader(field) } +struct DeltaComputer { + buffer: Vec, +} + +impl DeltaComputer { + fn new() -> DeltaComputer { + DeltaComputer { buffer: vec![0u32; 512] } + } + + fn compute_delta(&mut self, positions: &[u32]) -> &[u32] { + if positions.len() > self.buffer.len() { + self.buffer.resize(positions.len(), 0u32); + } + let mut last_pos = 0u32; + let num_positions = positions.len(); + for i in 0..num_positions { + let cur_pos = positions[i]; + self.buffer[i] = cur_pos - last_pos; + last_pos = cur_pos; + } + &self.buffer[..positions.len()] + } +} + + impl IndexMerger { pub fn open(schema: Schema, segments: &[Segment]) -> Result { let mut readers = vec![]; @@ -169,6 +194,7 @@ impl IndexMerger { fn write_postings(&self, serializer: &mut PostingsSerializer) -> Result<()> { + let mut delta_computer = DeltaComputer::new(); let mut merged_terms = TermMerger::from(&self.readers[..]); let mut max_doc = 0; @@ -270,8 +296,9 @@ impl IndexMerger { old_to_new_doc_id[segment_postings.doc() as usize] { // we make sure to only write the term iff // there is at least one document. 
- let delta_positions: &[u32] = segment_postings.delta_positions(); + let positions: &[u32] = segment_postings.positions(); let term_freq = segment_postings.term_freq(); + let delta_positions = delta_computer.compute_delta(positions); serializer .write_doc(remapped_doc_id, term_freq, delta_positions)?; } diff --git a/src/postings/postings.rs b/src/postings/postings.rs index 29538e0d2..52f16198a 100644 --- a/src/postings/postings.rs +++ b/src/postings/postings.rs @@ -17,16 +17,6 @@ pub trait Postings: DocSet { /// Returns the list of positions of the term, expressed as a list of /// token ordinals. fn positions(&self) -> &[u32]; - /// Return the list of delta positions. - /// - /// Delta positions is simply the difference between - /// two consecutive positions. - /// The first delta position is the first position of the - /// term in the document. - /// - /// For instance, if positions are `[7,13,17]` - /// then delta positions `[7, 6, 4]` - fn delta_positions(&self) -> &[u32]; } impl Postings for Box { @@ -39,11 +29,6 @@ impl Postings for Box { let unboxed: &TPostings = self.borrow(); unboxed.positions() } - - fn delta_positions(&self) -> &[u32] { - let unboxed: &TPostings = self.borrow(); - unboxed.delta_positions() - } } impl<'a, TPostings: Postings> Postings for &'a mut TPostings { @@ -56,9 +41,4 @@ impl<'a, TPostings: Postings> Postings for &'a mut TPostings { let unref: &TPostings = *self; unref.positions() } - - fn delta_positions(&self) -> &[u32] { - let unref: &TPostings = *self; - unref.delta_positions() - } } diff --git a/src/postings/segment_postings.rs b/src/postings/segment_postings.rs index ab4805d5e..26810edf4 100644 --- a/src/postings/segment_postings.rs +++ b/src/postings/segment_postings.rs @@ -16,8 +16,6 @@ struct PositionComputer<'a> { // if none, position are already loaded in // the positions vec. 
position_to_skip: Option<usize>, - - delta_positions: Vec<u32>, positions: Vec<u32>, positions_stream: CompressedIntStream<'a>, } @@ -28,7 +26,6 @@ impl<'a> PositionComputer<'a> { PositionComputer { position_to_skip: None, positions: vec!(), - delta_positions: vec!(), positions_stream: positions_stream, } } @@ -42,24 +39,21 @@ } pub fn positions(&mut self, term_freq: usize) -> &[u32] { - self.delta_positions(term_freq); - &self.positions[..term_freq] - } - - pub fn delta_positions(&mut self, term_freq: usize) -> &[u32] { if let Some(num_skip) = self.position_to_skip { - self.delta_positions.resize(term_freq, 0u32); - self.positions_stream.skip(num_skip); - self.positions_stream.read(&mut self.delta_positions[..term_freq]); + self.positions.resize(term_freq, 0u32); + + self.positions_stream.skip(num_skip); + self.positions_stream.read(&mut self.positions[..term_freq]); + let mut cum = 0u32; for i in 0..term_freq as usize { - cum += self.delta_positions[i]; + cum += self.positions[i]; self.positions[i] = cum; } self.position_to_skip = None; } - &self.delta_positions[..term_freq] + &self.positions[..term_freq] } } @@ -74,7 +68,6 @@ pub struct SegmentPostings<'a> { block_cursor: BlockSegmentPostings<'a>, cur: usize, delete_bitset: DeleteBitSet, - position_computer: Option<UnsafeCell<PositionComputer<'a>>>, } @@ -111,6 +104,16 @@ impl<'a> SegmentPostings<'a> { position_computer: None, } } + + + fn position_add_skip<F: Fn() -> usize>(&self, num_skips_fn: F) { + if let Some(ref position_computer) = self.position_computer.as_ref() { + let num_skips = num_skips_fn(); + unsafe { + (*position_computer.get()).add_skip(num_skips); + } + } + } } @@ -119,9 +122,7 @@ impl<'a> DocSet for SegmentPostings<'a> { // next needs to be called a first time to point to the correct element. 
#[inline] fn advance(&mut self) -> bool { - let mut pos_to_skip = 0u32; loop { - pos_to_skip += self.term_freq(); self.cur += 1; if self.cur >= self.block_cursor.block_len() { self.cur = 0; @@ -130,12 +131,8 @@ return false; } } + self.position_add_skip(|| { self.term_freq() as usize }); if !self.delete_bitset.is_deleted(self.doc()) { - if let Some(ref mut position_computer) = self.position_computer.as_mut() { - unsafe { - (*position_computer.get()).add_skip(pos_to_skip as usize); - } - } return true; } } @@ -147,6 +144,10 @@ return SkipResult::End; } + // in the following, thanks to the call to advance above, + // we know that the position is not loaded and we need + // to skip every doc_freq we cross. + // skip blocks until one that might contain the target loop { // check if we need to go to the next block @@ -155,13 +156,26 @@ (block_docs[self.cur], block_docs[block_docs.len() - 1]) }; if target > last_doc_in_block { + + // we add skip for the current term independently, + // so that position_add_skip will decide if it should + // just set itself to Some(0) or effectively + // add the term freq. + //let num_skips: u32 = ; + self.position_add_skip(|| { + let freqs_skipped = &self.block_cursor.freqs()[self.cur..]; + let sum_freq: u32 = freqs_skipped.iter().cloned().sum(); + sum_freq as usize + }); + if !self.block_cursor.advance() { return SkipResult::End; } + self.cur = 0; } else { if target < current_doc { - // We've overpassed the target after the first `advance` call + // We've passed the target after the first `advance` call // or we're at the beginning of a block. 
// Either way, we're on the first `DocId` greater than `target` return SkipResult::OverStep; @@ -207,6 +221,13 @@ impl<'a> DocSet for SegmentPostings<'a> { // `doc` is now >= `target` let doc = block_docs[start]; + + self.position_add_skip(|| { + let freqs_skipped = &self.block_cursor.freqs()[self.cur..start]; + let sum_freqs: u32 = freqs_skipped.iter().sum(); + sum_freqs as usize + }); + self.cur = start; if !self.delete_bitset.is_deleted(doc) { @@ -228,6 +249,7 @@ impl<'a> DocSet for SegmentPostings<'a> { self.len() } + /// Return the current document's `DocId`. #[inline] fn doc(&self) -> DocId { let docs = self.block_cursor.docs(); @@ -249,28 +271,19 @@ impl<'a> Postings for SegmentPostings<'a> { } fn positions(&self) -> &[u32] { - let term_freq = self.term_freq(); - let position_computer_ptr: *mut PositionComputer = self.position_computer - .as_ref() - .expect("Segment reader does not have positions.") - .get(); - unsafe { - (&mut *position_computer_ptr).positions(term_freq as usize) - } - } - - fn delta_positions(&self) -> &[u32] { let term_freq = self.term_freq(); self.position_computer .as_ref() .map(|position_computer| { unsafe { - (&mut *position_computer.get()).delta_positions(term_freq as usize) + (&mut *position_computer.get()).positions(term_freq as usize) } }) .unwrap_or(&EMPTY_POSITIONS[..]) } + + } /// `BlockSegmentPostings` is a cursor iterating over blocks @@ -351,16 +364,19 @@ impl<'a> BlockSegmentPostings<'a> { self.doc_decoder.output_array() } + /// Return the document at index `idx` of the block. #[inline] pub fn doc(&self, idx: usize) -> u32 { self.doc_decoder.output(idx) } + /// Return the array of `term freq` in the block. #[inline] pub fn freqs(&self) -> &[u32] { self.freq_decoder.output_array() } + /// Return the frequency at index `idx` of the block. 
#[inline] pub fn freq(&self, idx: usize) -> u32 { self.freq_decoder.output(idx) diff --git a/src/postings/segment_postings_option.rs b/src/postings/segment_postings_option.rs index 1f87d2e41..2aba4ec8e 100644 --- a/src/postings/segment_postings_option.rs +++ b/src/postings/segment_postings_option.rs @@ -17,6 +17,9 @@ pub enum SegmentPostingsOption { } impl SegmentPostingsOption { + + /// Returns true iff this option includes encoding + /// term frequencies. pub fn has_freq(&self) -> bool { match *self { SegmentPostingsOption::NoFreq => false, @@ -24,6 +27,8 @@ impl SegmentPostingsOption { } } + /// Returns true iff this option include encoding + /// term positions. pub fn has_positions(&self) -> bool { match *self { SegmentPostingsOption::FreqAndPositions => true, diff --git a/src/postings/serializer.rs b/src/postings/serializer.rs index 34ba47382..5c5e93a7d 100644 --- a/src/postings/serializer.rs +++ b/src/postings/serializer.rs @@ -12,11 +12,8 @@ use DocId; use core::Segment; use std::io::{self, Write}; use compression::VIntEncoder; -use common::VInt; -use common::BinarySerializable; use common::CountingWriter; use termdict::TermDictionaryBuilder; -use datastruct::{SkipList, SkipListBuilder}; /// `PostingsSerializer` is in charge of serializing diff --git a/src/postings/vec_postings.rs b/src/postings/vec_postings.rs index eb47933b4..8c9512fb1 100644 --- a/src/postings/vec_postings.rs +++ b/src/postings/vec_postings.rs @@ -54,11 +54,6 @@ impl Postings for VecPostings { fn positions(&self) -> &[u32] { &EMPTY_ARRAY } - - fn delta_positions(&self) -> &[u32] { - &EMPTY_ARRAY - } - } #[cfg(test)]