diff --git a/cpp/simdcomp_wrapper.c b/cpp/simdcomp_wrapper.c index 4530e3f3b..1ffff9778 100644 --- a/cpp/simdcomp_wrapper.c +++ b/cpp/simdcomp_wrapper.c @@ -40,3 +40,8 @@ size_t uncompress_unsorted( simdunpack((__m128i *)compressed_data, output, b); return 1 + b * sizeof(__m128i); } + + +size_t compressedbytes(const uint32_t length, const uint8_t num_bits) { + return simdpack_compressedbytes((int)length, (uint32_t)num_bits); +} diff --git a/src/compression/mod.rs b/src/compression/mod.rs index 8a44c24b7..8384c65eb 100644 --- a/src/compression/mod.rs +++ b/src/compression/mod.rs @@ -1,6 +1,10 @@ #![allow(dead_code)] +mod stream; + +pub use self::stream::CompressedIntStream; + #[cfg(not(feature="simdcompression"))] mod pack { mod compression_pack_nosimd; @@ -13,7 +17,7 @@ mod pack { pub use self::compression_pack_simd::*; } -pub use self::pack::{BlockEncoder, BlockDecoder}; +pub use self::pack::{BlockEncoder, BlockDecoder, compressedbytes}; #[cfg( any(not(feature="simdcompression"), target_env="msvc") )] mod vint { diff --git a/src/compression/pack/compression_pack_simd.rs b/src/compression/pack/compression_pack_simd.rs index 78cf58c37..ba3518521 100644 --- a/src/compression/pack/compression_pack_simd.rs +++ b/src/compression/pack/compression_pack_simd.rs @@ -16,9 +16,15 @@ mod simdcomp { pub fn compress_unsorted(data: *const u32, output: *mut u8) -> size_t; pub fn uncompress_unsorted(compressed_data: *const u8, output: *mut u32) -> size_t; + + pub fn compressedbytes(length: u32, num_bits: u8) -> size_t; } } +pub fn compressedbytes(length: u32, num_bits: u8) -> usize { + unsafe { simdcomp::compressedbytes(length, num_bits) } +} + fn compress_sorted(vals: &[u32], output: &mut [u8], offset: u32) -> usize { unsafe { simdcomp::compress_sorted(vals.as_ptr(), output.as_mut_ptr(), offset) } } diff --git a/src/compression/stream.rs b/src/compression/stream.rs new file mode 100644 index 000000000..274310b77 --- /dev/null +++ b/src/compression/stream.rs @@ -0,0 +1,110 @@ +use compression::BlockDecoder; +use compression::NUM_DOCS_PER_BLOCK; +use compression::compressedbytes; + +pub struct CompressedIntStream<'a> { + buffer: &'a [u8], + block_decoder: BlockDecoder, + inner_offset: usize, +} + +impl<'a> CompressedIntStream<'a> { + fn wrap(buffer: &'a [u8]) -> CompressedIntStream<'a> { + CompressedIntStream { + buffer: buffer, + block_decoder: BlockDecoder::new(), + inner_offset: NUM_DOCS_PER_BLOCK, + } + } + + fn read(&mut self, mut output: &mut [u32]) { + let mut num_els: usize = output.len(); + let mut start: usize = 0; + loop { + let available = NUM_DOCS_PER_BLOCK - self.inner_offset; + if num_els >= available { + if available > 0 { + let uncompressed_block = &self.block_decoder.output_array()[self.inner_offset..]; + &mut output[start..start + available].clone_from_slice(uncompressed_block); + } + num_els -= available; + start += available; + self.buffer = self.block_decoder.uncompress_block_unsorted(self.buffer); + self.inner_offset = 0; + } + else { + let uncompressed_block = &self.block_decoder.output_array()[self.inner_offset..self.inner_offset + num_els]; + &output[start..start + num_els].clone_from_slice(uncompressed_block); + self.inner_offset += num_els; + break; + } + } + } + + fn skip(&mut self, mut skip_len: usize) { + let available = NUM_DOCS_PER_BLOCK - self.inner_offset; + if available >= skip_len { + self.inner_offset += skip_len; + } + else { + skip_len -= available; + // entirely skip decompressing some blocks. + while skip_len >= NUM_DOCS_PER_BLOCK { + skip_len -= NUM_DOCS_PER_BLOCK; + let num_bits: u8 = self.buffer[0]; + let block_len = compressedbytes(128, num_bits); + self.buffer = &self.buffer[1 + block_len..]; + } + self.buffer = self.block_decoder.uncompress_block_unsorted(self.buffer); + self.inner_offset = skip_len; + } + } +} + + +#[cfg(test)] +pub mod tests { + + use super::CompressedIntStream; + use tests; + use compression::compressedbytes; + use compression::NUM_DOCS_PER_BLOCK; + use compression::BlockEncoder; + + fn create_stream_buffer() -> Vec { + let mut buffer: Vec = vec!(); + let mut encoder = BlockEncoder::new(); + let vals: Vec = (0u32..1_025u32).collect(); + for chunk in vals.chunks(NUM_DOCS_PER_BLOCK) { + let compressed_block = encoder.compress_block_unsorted(chunk); + let num_bits = compressed_block[0]; + assert_eq!(compressedbytes(128, num_bits) + 1, compressed_block.len()); + buffer.extend_from_slice(compressed_block); + } + buffer + } + + #[test] + fn test_compressed_int_stream() { + let buffer = create_stream_buffer(); + let mut stream = CompressedIntStream::wrap(&buffer[..]); + let mut block: [u32; NUM_DOCS_PER_BLOCK] = [0u32; NUM_DOCS_PER_BLOCK]; + + stream.read(&mut block[0..2]); + assert_eq!(block[0], 0); + assert_eq!(block[1], 1); + stream.skip(5); + stream.read(&mut block[0..3]); + assert_eq!(block[0], 7); + assert_eq!(block[1], 8); + assert_eq!(block[2], 9); + stream.skip(500); + stream.read(&mut block[0..3]); + assert_eq!(block[0], 510); + assert_eq!(block[1], 511); + assert_eq!(block[2], 512); + stream.skip(511); + stream.read(&mut block[..1]); + assert_eq!(block[0], 1024); + } +} diff --git a/src/postings/freq_handler.rs b/src/postings/freq_handler.rs index 9bc6fb49e..acc91ce9a 100644 --- a/src/postings/freq_handler.rs +++ b/src/postings/freq_handler.rs @@ -10,7 +10,6 @@ pub struct FreqHandler { freq_decoder: BlockDecoder, positions: UnsafeCell>, option: SegmentPostingsOption, - positions_offsets: [usize; NUM_DOCS_PER_BLOCK + 1], } @@ -22,7 +21,6 @@ impl FreqHandler { freq_decoder: BlockDecoder::with_val(1u32), positions: UnsafeCell::new(Vec::with_capacity(0)), option: SegmentPostingsOption::NoFreq, - positions_offsets: [0; NUM_DOCS_PER_BLOCK + 1], } } @@ -32,7 +30,6 @@ impl FreqHandler { freq_decoder: BlockDecoder::new(), positions: UnsafeCell::new(Vec::with_capacity(0)), option: SegmentPostingsOption::Freq, - positions_offsets: [0; NUM_DOCS_PER_BLOCK + 1], } } @@ -42,7 +39,6 @@ impl FreqHandler { freq_decoder: BlockDecoder::new(), positions: UnsafeCell::new(Vec::with_capacity(NUM_DOCS_PER_BLOCK)), option: SegmentPostingsOption::FreqAndPositions, - positions_offsets: [0; NUM_DOCS_PER_BLOCK + 1], } }