take ColumnV2 as parameter

improve algorithm
stricter assertions
improve names
This commit is contained in:
Pascal Seitz
2022-09-15 17:10:37 +08:00
parent 592caeefa0
commit 237b64025e
4 changed files with 145 additions and 106 deletions

View File

@@ -152,6 +152,42 @@ impl<'a, T: Copy + PartialOrd + Send + Sync> Column<T> for VecColumn<'a, T> {
}
}
impl<'a, T: Copy + PartialOrd> ColumnV2<T> for VecColumn<'a, T> {
fn get_val(&self, position: u64) -> Option<T> {
Some(self.values[position as usize])
}
fn min_value(&self) -> T {
self.min_value
}
fn max_value(&self) -> T {
self.max_value
}
fn num_vals(&self) -> u64 {
self.values.len() as u64
}
}
impl<'a, T: Copy + PartialOrd> ColumnV2<T> for VecColumn<'a, Option<T>> {
fn get_val(&self, position: u64) -> Option<T> {
self.values[position as usize]
}
fn min_value(&self) -> T {
self.min_value.unwrap()
}
fn max_value(&self) -> T {
self.max_value.unwrap()
}
fn num_vals(&self) -> u64 {
self.values.len() as u64
}
}
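For reference, the impl blocks above imply a ColumnV2 trait of roughly the following shape. This is a hypothetical sketch reconstructed only from the methods this diff uses (get_val, min_value, max_value, num_vals, plus the iter() call in train_from); the crate's actual trait definition may differ.

// Sketch only, not part of this commit.
pub trait ColumnV2<T> {
    /// Returns the value at `position`, or `None` for a missing (null) entry.
    fn get_val(&self, position: u64) -> Option<T>;
    fn min_value(&self) -> T;
    fn max_value(&self) -> T;
    fn num_vals(&self) -> u64;
    /// Iterates over all positions, yielding `None` for nulls
    /// (consumed as `column.iter().flatten()` in `train_from`).
    fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = Option<T>> + 'a>
    where
        T: 'a,
    {
        Box::new((0..self.num_vals()).map(move |pos| self.get_val(pos)))
    }
}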
impl<'a, T: Copy + Ord + Default, V> From<&'a V> for VecColumn<'a, T>
where V: AsRef<[T]> + ?Sized
{

View File

@@ -1,4 +1,5 @@
use std::collections::{BTreeSet, BinaryHeap};
use std::iter;
use std::ops::RangeInclusive;
use itertools::Itertools;
@@ -9,35 +10,16 @@ use super::{CompactSpace, RangeMapping};
/// Put the blanks for the sorted values into a binary heap
fn get_blanks(values_sorted: &BTreeSet<u128>) -> BinaryHeap<BlankRange> {
let mut blanks: BinaryHeap<BlankRange> = BinaryHeap::new();
let mut add_range = |blank_range: RangeInclusive<u128>| {
let blank_range: Result<BlankRange, _> = blank_range.try_into();
if let Ok(blank_range) = blank_range {
blanks.push(blank_range);
}
};
for (first, second) in values_sorted.iter().tuple_windows() {
// Correctness Overflow: the values are deduped and sorted (BTreeSet property), which means
// there's always space between two values.
let blank_range = first + 1..=second - 1;
add_range(blank_range);
}
// Replace after stabilization of https://github.com/rust-lang/rust/issues/62924
// Add preceding range if values don't start at 0
if let Some(first_val) = values_sorted.iter().next() {
if *first_val != 0 {
let blank_range = 0..=first_val - 1;
add_range(blank_range);
let blank_range: Result<BlankRange, _> = blank_range.try_into();
if let Ok(blank_range) = blank_range {
blanks.push(blank_range);
}
}
// Add succeeding range if values don't end at u128::MAX
if let Some(last_val) = values_sorted.iter().last() {
if *last_val != u128::MAX {
let blank_range = last_val + 1..=u128::MAX;
add_range(blank_range);
}
}
blanks
}
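To illustrate the gap computation above, a standalone sketch (it assumes the itertools crate; BlankRange and the binary heap are omitted):

// Sketch only, not part of this commit.
use std::collections::BTreeSet;
use itertools::Itertools;

fn main() {
    // Deduped, sorted values; the gaps between neighbours become blanks.
    let values: BTreeSet<u128> = [3u128, 7, 8, 20].into_iter().collect();
    for (first, second) in values.iter().tuple_windows() {
        // Adjacent values (7 and 8) yield an empty range, which the real code
        // filters out via the fallible BlankRange conversion.
        if second - first > 1 {
            println!("blank: {}..={}", first + 1, second - 1); // 4..=6 and 9..=19
        }
    }
}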
@@ -75,32 +57,46 @@ fn num_bits(val: u128) -> u8 {
/// metadata.
pub fn get_compact_space(
values_deduped_sorted: &BTreeSet<u128>,
total_num_values: usize,
total_num_values: u64,
cost_per_blank: usize,
) -> CompactSpace {
let mut compact_space = CompactSpaceBuilder::new();
let mut compact_space_builder = CompactSpaceBuilder::new();
if values_deduped_sorted.is_empty() {
return compact_space.finish();
return compact_space_builder.finish();
}
let mut blanks: BinaryHeap<BlankRange> = get_blanks(values_deduped_sorted);
let mut amplitude_compact_space = u128::MAX;
// Replace after stabilization of https://github.com/rust-lang/rust/issues/62924
// We start with a space that's limited to min_value..=max_value
let min_value = *values_deduped_sorted.iter().next().unwrap_or(&0);
let max_value = *values_deduped_sorted.iter().last().unwrap_or(&0);
// +1 for null; in case min and max cover the whole space, we are off by one.
let mut amplitude_compact_space = (max_value - min_value).saturating_add(1);
if min_value != 0 {
compact_space_builder.add_blanks(iter::once(0..=min_value - 1));
}
if max_value != u128::MAX {
compact_space_builder.add_blanks(iter::once(max_value + 1..=u128::MAX));
}
let mut amplitude_bits: u8 = num_bits(amplitude_compact_space);
let mut blank_collector = BlankCollector::new();
// We will stage blanks until they reduce the compact space by 1 bit.
// We will stage blanks until they reduce the compact space by at least 1 bit and then flush
// them if the metadata cost is lower than the total number of saved bits.
// Binary heap to process the gaps by their size
while let Some(blank_range) = blanks.pop() {
blank_collector.stage_blank(blank_range);
let staged_spaces_sum: u128 = blank_collector.staged_blanks_sum();
// +1 for later added null value
let amplitude_new_compact_space = amplitude_compact_space - staged_spaces_sum + 1;
let amplitude_new_compact_space = amplitude_compact_space - staged_spaces_sum;
let amplitude_new_bits = num_bits(amplitude_new_compact_space);
if amplitude_bits == amplitude_new_bits {
continue;
}
let saved_bits = (amplitude_bits - amplitude_new_bits) as usize * total_num_values;
let saved_bits = (amplitude_bits - amplitude_new_bits) as usize * total_num_values as usize;
// TODO: Maybe calculate the exact cost of blanks and run this more expensive computation
// only when amplitude_new_bits changes
let cost = blank_collector.num_staged_blanks() * cost_per_blank;
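A worked example of the cost model above; the numbers and the flush condition are illustrative (the actual comparison sits just outside this hunk, and COST_PER_BLANK_IN_BITS is defined elsewhere in the crate):

// Sketch only, not part of this commit.
fn main() {
    let total_num_values: u64 = 1_000_000;
    let cost_per_blank: usize = 36; // assumed metadata cost per stored blank, in bits

    // Suppose the staged blanks shrink the amplitude from 40 bits to 32 bits per value.
    let amplitude_bits: u8 = 40;
    let amplitude_new_bits: u8 = 32;
    let num_staged_blanks: usize = 3;

    let saved_bits = (amplitude_bits - amplitude_new_bits) as usize * total_num_values as usize;
    let cost = num_staged_blanks * cost_per_blank;

    // 8_000_000 saved bits vs. 108 bits of metadata: keeping these blanks pays off.
    assert!(cost < saved_bits);
    println!("saved_bits = {saved_bits}, cost = {cost}");
}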
@@ -116,7 +112,7 @@ pub fn get_compact_space(
amplitude_compact_space = amplitude_new_compact_space;
amplitude_bits = amplitude_new_bits;
compact_space.add_blanks(blank_collector.drain().map(|blank| blank.blank_range()));
compact_space_builder.add_blanks(blank_collector.drain().map(|blank| blank.blank_range()));
}
// special case, when we didn't collect any blanks because:
@@ -126,8 +122,8 @@ pub fn get_compact_space(
// We drain one collected blank unconditionally, so the empty case is reserved for empty
// data: an empty compact_space means the data is empty, no data is covered
// (as opposed to all data), and we can assign null to it.
if compact_space.is_empty() {
compact_space.add_blanks(
if compact_space_builder.is_empty() {
compact_space_builder.add_blanks(
blank_collector
.drain()
.map(|blank| blank.blank_range())
@@ -135,7 +131,14 @@ pub fn get_compact_space(
);
}
compact_space.finish()
let compact_space = compact_space_builder.finish();
if max_value - min_value != u128::MAX {
debug_assert_eq!(
compact_space.amplitude_compact_space(),
amplitude_compact_space
);
}
compact_space
}
#[derive(Debug, Clone, Eq, PartialEq)]
@@ -146,7 +149,7 @@ struct CompactSpaceBuilder {
impl CompactSpaceBuilder {
/// Creates a new compact space builder which will initially cover the whole space.
fn new() -> Self {
Self { blanks: vec![] }
Self { blanks: Vec::new() }
}
/// Assumes that repeated add_blank calls don't overlap and are not adjacent,

View File

@@ -123,7 +123,7 @@ impl CompactSpace {
fn amplitude_compact_space(&self) -> u128 {
self.ranges_mapping
.last()
.map(|last_range| last_range.compact_end() as u128 + 1)
.map(|last_range| last_range.compact_end() as u128)
.unwrap_or(1) // compact space starts at 1, 0 == null
}
@@ -133,7 +133,7 @@ impl CompactSpace {
/// Returns Ok(the value in the compact space), or, if the value is outside the compact space,
/// Err(the position where it would be inserted)
fn to_compact(&self, value: u128) -> Result<u64, usize> {
fn u128_to_compact(&self, value: u128) -> Result<u64, usize> {
self.ranges_mapping
.binary_search_by(|probe| {
let value_range = &probe.value_range;
@@ -153,7 +153,7 @@ impl CompactSpace {
}
/// Unpacks a value from compact space u64 to u128 space
fn unpack(&self, compact: u64) -> u128 {
fn compact_to_u128(&self, compact: u64) -> u128 {
let pos = self
.ranges_mapping
.binary_search_by_key(&compact, |range_mapping| range_mapping.compact_start)
@@ -182,14 +182,39 @@ pub struct IPCodecParams {
impl CompactSpaceCompressor {
/// Taking the vals as a Vec may cost a lot of memory; it is used to sort the vals.
pub fn train_from(
vals: impl Iterator<Item = u128>,
total_num_values_incl_nulls: usize,
) -> Self {
let mut tree = BTreeSet::new();
tree.extend(vals);
assert!(tree.len() <= total_num_values_incl_nulls);
train(&tree, total_num_values_incl_nulls)
pub fn train_from(column: impl ColumnV2<u128>) -> Self {
let mut values_sorted = BTreeSet::new();
values_sorted.extend(column.iter().flatten());
let total_num_values = column.num_vals();
let compact_space =
get_compact_space(&values_sorted, total_num_values, COST_PER_BLANK_IN_BITS);
let amplitude_compact_space = compact_space.amplitude_compact_space();
assert!(
amplitude_compact_space <= u64::MAX as u128,
"case unsupported."
);
let num_bits = tantivy_bitpacker::compute_num_bits(amplitude_compact_space as u64);
let min_value = *values_sorted.iter().next().unwrap_or(&0);
let max_value = *values_sorted.iter().last().unwrap_or(&0);
assert_eq!(
compact_space
.u128_to_compact(max_value)
.expect("could not convert max value to compact space"),
amplitude_compact_space as u64
);
CompactSpaceCompressor {
params: IPCodecParams {
compact_space,
bit_unpacker: BitUnpacker::new(num_bits),
min_value,
max_value,
num_vals: total_num_values as u64,
num_bits,
},
}
}
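For illustration, a minimal usage sketch of the new train_from signature, modeled on the tests further down in this diff (VecColumn::from over a slice of Option<u128> and compress taking an iterator of Option<u128> are taken from those tests):

// Sketch only, not part of this commit; assumes `use crate::VecColumn;` as in the tests.
fn compress_example(vals: &[Option<u128>]) -> Vec<u8> {
    let compressor = CompactSpaceCompressor::train_from(VecColumn::from(vals));
    compressor
        .compress(vals.iter().cloned())
        .expect("compression failed")
}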
fn write_footer(self, writer: &mut impl Write) -> io::Result<()> {
@@ -216,12 +241,15 @@ impl CompactSpaceCompressor {
let mut bitpacker = BitPacker::default();
for val in vals {
let compact = if let Some(val) = val {
self.params.compact_space.to_compact(val).map_err(|_| {
io::Error::new(
io::ErrorKind::InvalidData,
"Could not convert value to compact_space. This is a bug.",
)
})?
self.params
.compact_space
.u128_to_compact(val)
.map_err(|_| {
io::Error::new(
io::ErrorKind::InvalidData,
"Could not convert value to compact_space. This is a bug.",
)
})?
} else {
NULL_VALUE_COMPACT_SPACE
};
@@ -233,36 +261,6 @@ impl CompactSpaceCompressor {
}
}
fn train(values_sorted: &BTreeSet<u128>, total_num_values: usize) -> CompactSpaceCompressor {
let compact_space = get_compact_space(values_sorted, total_num_values, COST_PER_BLANK_IN_BITS);
let amplitude_compact_space = compact_space.amplitude_compact_space();
assert!(
amplitude_compact_space <= u64::MAX as u128,
"case unsupported."
);
let num_bits = tantivy_bitpacker::compute_num_bits(amplitude_compact_space as u64);
let min_value = *values_sorted.iter().next().unwrap_or(&0);
let max_value = *values_sorted.iter().last().unwrap_or(&0);
assert!(
compact_space
.to_compact(max_value)
.expect("could not convert max value to compact space")
< amplitude_compact_space as u64
);
CompactSpaceCompressor {
params: IPCodecParams {
compact_space,
bit_unpacker: BitUnpacker::new(num_bits),
min_value,
max_value,
num_vals: total_num_values as u64,
num_bits,
},
}
}
#[derive(Debug, Clone)]
pub struct CompactSpaceDecompressor {
data: OwnedBytes,
@@ -353,12 +351,12 @@ impl CompactSpaceDecompressor {
///
/// and we want a mapping for 1005, there is no equivalent compact space. We instead return an
/// error with the index of the next range.
fn to_compact(&self, value: u128) -> Result<u64, usize> {
self.params.compact_space.to_compact(value)
fn u128_to_compact(&self, value: u128) -> Result<u64, usize> {
self.params.compact_space.u128_to_compact(value)
}
fn compact_to_u128(&self, compact: u64) -> u128 {
self.params.compact_space.unpack(compact)
self.params.compact_space.compact_to_u128(compact)
}
/// Comparing on compact space: 1.08 GElements/s, which equals a throughput of 17.3 Gb/s
@@ -372,8 +370,8 @@ impl CompactSpaceDecompressor {
let from_value = *range.start();
let to_value = *range.end();
assert!(to_value >= from_value);
let compact_from = self.to_compact(from_value);
let compact_to = self.to_compact(to_value);
let compact_from = self.u128_to_compact(from_value);
let compact_to = self.u128_to_compact(to_value);
// Quick return: if both ranges fall into the same non-mapped space, the range can't cover
// any values, so we can exit early
@@ -477,6 +475,7 @@ impl CompactSpaceDecompressor {
mod tests {
use super::*;
use crate::VecColumn;
#[test]
fn compact_space_test() {
@@ -485,12 +484,12 @@ mod tests {
]
.into_iter()
.collect();
let compact_space = get_compact_space(ips, ips.len(), 11);
let compact_space = get_compact_space(ips, ips.len() as u64, 11);
let amplitude = compact_space.amplitude_compact_space();
assert_eq!(amplitude, 20);
assert_eq!(3, compact_space.to_compact(2).unwrap());
assert_eq!(4, compact_space.to_compact(3).unwrap());
assert_eq!(compact_space.to_compact(100).unwrap_err(), 1);
assert_eq!(amplitude, 17);
assert_eq!(1, compact_space.u128_to_compact(2).unwrap());
assert_eq!(2, compact_space.u128_to_compact(3).unwrap());
assert_eq!(compact_space.u128_to_compact(100).unwrap_err(), 1);
for (num1, num2) in (0..3).tuple_windows() {
assert_eq!(
@@ -508,17 +507,17 @@ mod tests {
);
for ip in ips {
let compact = compact_space.to_compact(*ip).unwrap();
assert_eq!(compact_space.unpack(compact), *ip);
let compact = compact_space.u128_to_compact(*ip).unwrap();
assert_eq!(compact_space.compact_to_u128(compact), *ip);
}
}
#[test]
fn compact_space_amplitude_test() {
let ips = &[100000u128, 1000000].into_iter().collect();
let compact_space = get_compact_space(ips, ips.len(), 1);
let compact_space = get_compact_space(ips, ips.len() as u64, 1);
let amplitude = compact_space.amplitude_compact_space();
assert_eq!(amplitude, 3);
assert_eq!(amplitude, 2);
}
fn test_all(data: OwnedBytes, expected: &[Option<u128>]) {
@@ -547,10 +546,7 @@ mod tests {
}
fn test_aux_vals_opt(u128_vals: &[Option<u128>]) -> OwnedBytes {
let compressor = CompactSpaceCompressor::train_from(
u128_vals.iter().cloned().flatten(),
u128_vals.len(),
);
let compressor = CompactSpaceCompressor::train_from(VecColumn::from(u128_vals));
let data = compressor.compress(u128_vals.iter().cloned()).unwrap();
let data = OwnedBytes::new(data);
test_all(data.clone(), u128_vals);
@@ -628,9 +624,8 @@ mod tests {
#[test]
fn test_null() {
let vals = &[2u128];
let compressor = CompactSpaceCompressor::train_from(vals.iter().cloned(), 2);
let vals = vec![None, Some(2u128)];
let compressor = CompactSpaceCompressor::train_from(VecColumn::from(&vals));
let data = compressor.compress(vals.iter().cloned()).unwrap();
let decomp = CompactSpaceDecompressor::open(OwnedBytes::new(data)).unwrap();
let positions = decomp.get_range(0..=1);
@@ -668,7 +663,7 @@ mod tests {
1_000_000,
5_000_000_000,
];
let compressor = CompactSpaceCompressor::train_from(vals.iter().cloned(), vals.len());
let compressor = CompactSpaceCompressor::train_from(VecColumn::from(vals));
let data = compressor.compress(vals.iter().cloned().map(Some)).unwrap();
let decomp = CompactSpaceDecompressor::open(OwnedBytes::new(data)).unwrap();
@@ -696,6 +691,12 @@ mod tests {
let _data = test_aux_vals(vals);
}
#[test]
fn test_bug4() {
let vals = &[340282366920938463463374607431768211455, 0];
let _data = test_aux_vals(vals);
}
#[test]
fn test_first_large_gaps() {
let vals = &[1_000_000_000u128; 100];

View File

@@ -93,8 +93,7 @@ fn bench_ip() {
{
let mut data = vec![];
for dataset in dataset.chunks(50_000) {
let compressor =
CompactSpaceCompressor::train_from(dataset.iter().cloned(), dataset.len());
let compressor = CompactSpaceCompressor::train_from(VecColumn::from(dataset));
compressor
.compress_into(dataset.iter().cloned().map(Some), &mut data)
.unwrap();
@@ -107,7 +106,7 @@ fn bench_ip() {
);
}
let compressor = CompactSpaceCompressor::train_from(dataset.iter().cloned(), dataset.len());
let compressor = CompactSpaceCompressor::train_from(VecColumn::from(&dataset));
let data = compressor
.compress(dataset.iter().cloned().map(Some))
.unwrap();