This commit is contained in:
Pascal Seitz
2021-04-30 16:29:02 +02:00
parent f38daab7f7
commit fde9d27482

View File

@@ -12,7 +12,7 @@ pub struct BlockedBitpacker {
// bitpacked blocks
compressed_blocks: Vec<u8>,
// uncompressed data, collected until BLOCK_SIZE
cache: Vec<u64>,
buffer: Vec<u64>,
offset_and_bits: Vec<BlockedBitpackerEntryMetaData>,
}
@@ -60,7 +60,7 @@ impl BlockedBitpacker {
compressed_blocks.resize(8, 0);
Self {
compressed_blocks,
cache: vec![],
buffer: vec![],
offset_and_bits: vec![],
}
}
@@ -71,56 +71,58 @@ impl BlockedBitpacker {
+ self.compressed_blocks.capacity()
+ self.offset_and_bits.capacity()
* std::mem::size_of_val(&self.offset_and_bits.get(0).cloned().unwrap_or_default())
+ self.cache.capacity()
* std::mem::size_of_val(&self.cache.get(0).cloned().unwrap_or_default())
+ self.buffer.capacity()
* std::mem::size_of_val(&self.buffer.get(0).cloned().unwrap_or_default())
}
pub fn add(&mut self, val: u64) {
self.cache.push(val);
if self.cache.len() == BLOCK_SIZE as usize {
self.buffer.push(val);
if self.buffer.len() == BLOCK_SIZE as usize {
self.flush();
}
}
pub fn flush(&mut self) {
if self.cache.is_empty() {
if let Some(min_value) = self.buffer.iter().min() {
let mut bit_packer = BitPacker::new();
let num_bits_block = self
.buffer
.iter()
.map(|val| compute_num_bits(*val - min_value))
.max()
.unwrap();
// todo performance: the padding handling could be done better, e.g. use a slice and
// return num_bytes written from bitpacker
self.compressed_blocks
.resize(self.compressed_blocks.len() - 8, 0); // remove padding for bitpacker
let offset = self.compressed_blocks.len() as u64;
// todo performance: for some bit_width we
// can encode multiple vals into the
// mini_buffer before checking to flush
// (to be done in BitPacker)
for val in self.buffer.iter() {
bit_packer
.write(
*val - min_value,
num_bits_block,
&mut self.compressed_blocks,
)
.expect("cannot write bitpacking to output"); // write to in memory can't fail
}
bit_packer.flush(&mut self.compressed_blocks).unwrap();
self.offset_and_bits
.push(BlockedBitpackerEntryMetaData::new(
offset,
num_bits_block,
*min_value,
));
self.buffer.clear();
self.compressed_blocks
.resize(self.compressed_blocks.len() + 8, 0); // add padding for bitpacker
} else {
return;
}
let mut bit_packer = BitPacker::new();
let base_value = self.cache.iter().min().unwrap();
let num_bits_block = self
.cache
.iter()
.map(|val| compute_num_bits(*val - base_value))
.max()
.unwrap();
self.compressed_blocks
.resize(self.compressed_blocks.len() - 8, 0); // remove padding for bitpacker
let offset = self.compressed_blocks.len() as u64;
// todo performance: for some bit_width we
// can encode multiple vals into the
// mini_buffer before checking to flush
// (to be done in BitPacker)
for val in self.cache.iter() {
bit_packer
.write(
*val - base_value,
num_bits_block,
&mut self.compressed_blocks,
)
.expect("cannot write bitpacking to output"); // write to im can't fail
}
bit_packer.flush(&mut self.compressed_blocks).unwrap();
self.offset_and_bits
.push(BlockedBitpackerEntryMetaData::new(
offset,
num_bits_block,
*base_value,
));
self.cache.clear();
self.compressed_blocks
.resize(self.compressed_blocks.len() + 8, 0); // add padding for bitpacker
}
pub fn get(&self, idx: usize) -> u64 {
let metadata_pos = idx / BLOCK_SIZE as usize;
@@ -132,7 +134,7 @@ impl BlockedBitpacker {
);
unpacked + metadata.base_value()
} else {
self.cache[pos_in_block]
self.buffer[pos_in_block]
}
}
@@ -141,7 +143,7 @@ impl BlockedBitpacker {
let bitpacked_elems = self.offset_and_bits.len() * BLOCK_SIZE;
let iter = (0..bitpacked_elems)
.map(move |idx| self.get(idx))
.chain(self.cache.iter().cloned());
.chain(self.buffer.iter().cloned());
iter
}
}