Added CompressedIntStream

This commit is contained in:
Paul Masurel
2017-08-05 12:28:57 +09:00
parent aff7e64d4e
commit efb910f4e8
5 changed files with 126 additions and 5 deletions

View File

@@ -40,3 +40,8 @@ size_t uncompress_unsorted(
simdunpack((__m128i *)compressed_data, output, b);
return 1 + b * sizeof(__m128i);
}
size_t compressedbytes(const uint32_t length, const uint8_t num_bits) {
return simdpack_compressedbytes((int)length, (uint32_t)num_bits);
}

View File

@@ -1,6 +1,10 @@
#![allow(dead_code)]
mod stream;
pub use self::stream::CompressedIntStream;
#[cfg(not(feature="simdcompression"))]
mod pack {
mod compression_pack_nosimd;
@@ -13,7 +17,7 @@ mod pack {
pub use self::compression_pack_simd::*;
}
pub use self::pack::{BlockEncoder, BlockDecoder};
pub use self::pack::{BlockEncoder, BlockDecoder, compressedbytes};
#[cfg( any(not(feature="simdcompression"), target_env="msvc") )]
mod vint {

View File

@@ -16,9 +16,15 @@ mod simdcomp {
pub fn compress_unsorted(data: *const u32, output: *mut u8) -> size_t;
pub fn uncompress_unsorted(compressed_data: *const u8, output: *mut u32) -> size_t;
pub fn compressedbytes(length: u32, num_bits: u8) -> size_t;
}
}
pub fn compressedbytes(length: u32, num_bits: u8) -> usize {
unsafe { simdcomp::compressedbytes(length, num_bits) }
}
fn compress_sorted(vals: &[u32], output: &mut [u8], offset: u32) -> usize {
unsafe { simdcomp::compress_sorted(vals.as_ptr(), output.as_mut_ptr(), offset) }
}

110
src/compression/stream.rs Normal file
View File

@@ -0,0 +1,110 @@
use compression::BlockDecoder;
use compression::NUM_DOCS_PER_BLOCK;
use compression::compressedbytes;
pub struct CompressedIntStream<'a> {
buffer: &'a [u8],
block_decoder: BlockDecoder,
inner_offset: usize,
}
impl<'a> CompressedIntStream<'a> {
fn wrap(buffer: &'a [u8]) -> CompressedIntStream<'a> {
CompressedIntStream {
buffer: buffer,
block_decoder: BlockDecoder::new(),
inner_offset: NUM_DOCS_PER_BLOCK,
}
}
fn read(&mut self, mut output: &mut [u32]) {
let mut num_els: usize = output.len();
let mut start: usize = 0;
loop {
let available = NUM_DOCS_PER_BLOCK - self.inner_offset;
if num_els >= available {
if available > 0 {
let uncompressed_block = &self.block_decoder.output_array()[self.inner_offset..];
&mut output[start..start + available].clone_from_slice(uncompressed_block);
}
num_els -= available;
start += available;
self.buffer = self.block_decoder.uncompress_block_unsorted(self.buffer);
self.inner_offset = 0;
}
else {
let uncompressed_block = &self.block_decoder.output_array()[self.inner_offset..self.inner_offset + num_els];
&output[start..start + num_els].clone_from_slice(uncompressed_block);
self.inner_offset += num_els;
break;
}
}
}
fn skip(&mut self, mut skip_len: usize) {
let available = NUM_DOCS_PER_BLOCK - self.inner_offset;
if available >= skip_len {
self.inner_offset += skip_len;
}
else {
skip_len -= available;
// entirely skip decompressing some blocks.
while skip_len >= NUM_DOCS_PER_BLOCK {
skip_len -= NUM_DOCS_PER_BLOCK;
let num_bits: u8 = self.buffer[0];
let block_len = compressedbytes(128, num_bits);
self.buffer = &self.buffer[1 + block_len..];
}
self.buffer = self.block_decoder.uncompress_block_unsorted(self.buffer);
self.inner_offset = skip_len;
}
}
}
#[cfg(test)]
pub mod tests {
use super::CompressedIntStream;
use tests;
use compression::compressedbytes;
use compression::NUM_DOCS_PER_BLOCK;
use compression::BlockEncoder;
fn create_stream_buffer() -> Vec<u8> {
let mut buffer: Vec<u8> = vec!();
let mut encoder = BlockEncoder::new();
let vals: Vec<u32> = (0u32..1_025u32).collect();
for chunk in vals.chunks(NUM_DOCS_PER_BLOCK) {
let compressed_block = encoder.compress_block_unsorted(chunk);
let num_bits = compressed_block[0];
assert_eq!(compressedbytes(128, num_bits) + 1, compressed_block.len());
buffer.extend_from_slice(compressed_block);
}
buffer
}
#[test]
fn test_compressed_int_stream() {
let buffer = create_stream_buffer();
let mut stream = CompressedIntStream::wrap(&buffer[..]);
let mut block: [u32; NUM_DOCS_PER_BLOCK] = [0u32; NUM_DOCS_PER_BLOCK];
stream.read(&mut block[0..2]);
assert_eq!(block[0], 0);
assert_eq!(block[1], 1);
stream.skip(5);
stream.read(&mut block[0..3]);
assert_eq!(block[0], 7);
assert_eq!(block[1], 8);
assert_eq!(block[2], 9);
stream.skip(500);
stream.read(&mut block[0..3]);
assert_eq!(block[0], 510);
assert_eq!(block[1], 511);
assert_eq!(block[2], 512);
stream.skip(511);
stream.read(&mut block[..1]);
assert_eq!(block[0], 1024);
}
}

View File

@@ -10,7 +10,6 @@ pub struct FreqHandler {
freq_decoder: BlockDecoder,
positions: UnsafeCell<Vec<u32>>,
option: SegmentPostingsOption,
positions_offsets: [usize; NUM_DOCS_PER_BLOCK + 1],
}
@@ -22,7 +21,6 @@ impl FreqHandler {
freq_decoder: BlockDecoder::with_val(1u32),
positions: UnsafeCell::new(Vec::with_capacity(0)),
option: SegmentPostingsOption::NoFreq,
positions_offsets: [0; NUM_DOCS_PER_BLOCK + 1],
}
}
@@ -32,7 +30,6 @@ impl FreqHandler {
freq_decoder: BlockDecoder::new(),
positions: UnsafeCell::new(Vec::with_capacity(0)),
option: SegmentPostingsOption::Freq,
positions_offsets: [0; NUM_DOCS_PER_BLOCK + 1],
}
}
@@ -42,7 +39,6 @@ impl FreqHandler {
freq_decoder: BlockDecoder::new(),
positions: UnsafeCell::new(Vec::with_capacity(NUM_DOCS_PER_BLOCK)),
option: SegmentPostingsOption::FreqAndPositions,
positions_offsets: [0; NUM_DOCS_PER_BLOCK + 1],
}
}