refactor, use BTreeSet for sorted deduped values

Pascal Seitz
2022-09-12 15:10:40 +08:00
parent 762e662bfd
commit df32ee2df2
5 changed files with 342 additions and 322 deletions
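For context on the commit message: a BTreeSet<u128> keeps its values sorted and deduplicated by construction, which is the property the reworked `train_from` below relies on instead of the previous sort/dedup pass over a Vec. A standalone sketch of that equivalence (illustrative, not from this diff):

```rust
use std::collections::BTreeSet;

fn main() {
    // Values arrive unsorted and with duplicates.
    let vals = [1002u128, 2, 4, 1002, 2, 1000];

    // Previous approach: sort and dedup a Vec in place.
    let mut sorted_deduped: Vec<u128> = vals.to_vec();
    sorted_deduped.sort();
    sorted_deduped.dedup();

    // New approach: collect into a BTreeSet, sorted and deduped by construction.
    let set: BTreeSet<u128> = vals.iter().copied().collect();

    assert_eq!(sorted_deduped, set.iter().copied().collect::<Vec<_>>());
    assert_eq!(set.len(), 4); // duplicates removed
}
```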

View File

@@ -16,7 +16,7 @@ prettytable-rs = {version="0.9.0", optional= true}
rand = {version="0.8.3", optional= true}
fastdivide = "0.4"
log = "0.4"
itertools = { version="0.10.3", optional=true}
itertools = { version = "0.10.3", optional = true }
measure_time = { version="0.8.2", optional=true}
[dev-dependencies]
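One reason `itertools` sits in this dependency list: `BTreeSet` iterators have no `windows()` like slices do, so the reworked blank computation below walks adjacent sorted values with `tuple_windows()` instead. A minimal standalone sketch of that pattern (not from this diff):

```rust
use std::collections::BTreeSet;

use itertools::Itertools;

fn main() {
    let values: BTreeSet<u128> = [2u128, 4, 1000].into_iter().collect();
    // Adjacent pairs of sorted, deduped values, analogous to `slice.windows(2)`.
    for (first, second) in values.iter().tuple_windows() {
        println!(
            "unoccupied values between {first} and {second}: {}",
            second - first - 1
        );
    }
}
```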

View File

@@ -12,8 +12,7 @@
/// The codec is created to compress ip addresses, but may be employed in other use cases.
use std::{
cmp::Ordering,
collections::BinaryHeap,
convert::{TryFrom, TryInto},
collections::BTreeSet,
io::{self, Write},
net::{IpAddr, Ipv6Addr},
ops::RangeInclusive,
@@ -24,6 +23,10 @@ use ownedbytes::OwnedBytes;
use tantivy_bitpacker::{self, BitPacker, BitUnpacker};
use crate::column::{ColumnV2, ColumnV2Ext};
use crate::compact_space::build_compact_space::get_compact_space;
mod blank_range;
mod build_compact_space;
pub fn ip_to_u128(ip_addr: IpAddr) -> u128 {
let ip_addr_v6: Ipv6Addr = match ip_addr {
@@ -39,256 +42,8 @@ pub fn ip_to_u128(ip_addr: IpAddr) -> u128 {
/// The number is taken by looking at a real dataset. It is optimized for larger datasets.
const COST_PER_BLANK_IN_BITS: usize = 36;
/// The range of a blank in value space.
///
/// A blank is an unoccupied space in the data.
/// Ordered by size
///
/// Use try_into(); invalid ranges will be rejected.
///
/// TODO: move to own module to force try_into
#[derive(Debug, Eq, PartialEq, Clone)]
struct BlankRange {
blank_range: RangeInclusive<u128>,
}
impl TryFrom<RangeInclusive<u128>> for BlankRange {
type Error = &'static str;
fn try_from(range: RangeInclusive<u128>) -> Result<Self, Self::Error> {
let blank_size = range.end().saturating_sub(*range.start());
if blank_size < 2 {
Err("invalid range")
} else {
Ok(BlankRange { blank_range: range })
}
}
}
impl BlankRange {
fn blank_size(&self) -> u128 {
self.blank_range.end() - self.blank_range.start()
}
}
impl Ord for BlankRange {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.blank_size().cmp(&other.blank_size())
}
}
impl PartialOrd for BlankRange {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
self.blank_size().partial_cmp(&other.blank_size())
}
}
/// Put the blanks for the sorted values into a binary heap
fn get_blanks(values_sorted: &[u128]) -> BinaryHeap<BlankRange> {
let mut blanks: BinaryHeap<BlankRange> = BinaryHeap::new();
let mut add_range = |blank_range: RangeInclusive<u128>| {
let blank_range: Result<BlankRange, _> = blank_range.try_into();
if let Ok(blank_range) = blank_range {
blanks.push(blank_range);
}
};
for values in values_sorted.windows(2) {
let blank_range = values[0] + 1..=values[1] - 1;
add_range(blank_range);
}
if let Some(first_val) = values_sorted.first().filter(|first_val| **first_val != 0) {
let blank_range = 0..=first_val - 1;
add_range(blank_range);
}
if let Some(last_val) = values_sorted
.last()
.filter(|last_val| **last_val != u128::MAX)
{
let blank_range = last_val + 1..=u128::MAX;
add_range(blank_range);
}
blanks
}
struct BlankCollector {
blanks: Vec<BlankRange>,
staged_blanks_sum: u128,
}
impl BlankCollector {
fn new() -> Self {
Self {
blanks: vec![],
staged_blanks_sum: 0,
}
}
fn stage_blank(&mut self, blank: BlankRange) {
self.staged_blanks_sum += blank.blank_size();
self.blanks.push(blank);
}
fn drain(&mut self) -> std::vec::Drain<'_, BlankRange> {
self.staged_blanks_sum = 0;
self.blanks.drain(..)
}
fn staged_blanks_sum(&self) -> u128 {
self.staged_blanks_sum
}
fn num_blanks(&self) -> usize {
self.blanks.len()
}
}
fn num_bits(val: u128) -> u8 {
(128u32 - val.leading_zeros()) as u8
}
/// Will collect blanks and add them to compact space if more bits are saved than cost from
/// metadata.
fn get_compact_space(
values_deduped_sorted: &[u128],
total_num_values: usize,
cost_per_blank: usize,
) -> CompactSpace {
let mut blanks = get_blanks(values_deduped_sorted);
let mut amplitude_compact_space = u128::MAX;
let mut amplitude_bits: u8 = num_bits(amplitude_compact_space);
let mut compact_space = CompactSpaceBuilder::new();
if values_deduped_sorted.is_empty() {
return compact_space.finish();
}
let mut blank_collector = BlankCollector::new();
// We will stage blanks until they reduce the compact space by 1 bit.
// Binary heap to process the gaps by their size
while let Some(blank_range) = blanks.pop() {
blank_collector.stage_blank(blank_range);
let staged_spaces_sum: u128 = blank_collector.staged_blanks_sum();
// +1 for later added null value
let amplitude_new_compact_space = amplitude_compact_space - staged_spaces_sum + 1;
let amplitude_new_bits = num_bits(amplitude_new_compact_space);
if amplitude_bits == amplitude_new_bits {
continue;
}
let saved_bits = (amplitude_bits - amplitude_new_bits) as usize * total_num_values;
// TODO: Maybe calculate exact cost of blanks and run this more expensive computation only,
// when amplitude_new_bits changes
let cost = blank_collector.num_blanks() * cost_per_blank;
if cost >= saved_bits {
// Continue here: although we walk over the blanks in descending size order,
// smaller blanks later on can still cross a bit boundary and save a lot.
//
// E.g. the first range may reduce the compact space by 1000 from 2000 to 1000,
// which saves 11-10=1 bit, while the next range reduces the compact space by
// 950 to 50, which saves 10-6=4 bits.
continue;
}
amplitude_compact_space = amplitude_new_compact_space;
amplitude_bits = amplitude_new_bits;
compact_space.add_blanks(blank_collector.drain().map(|blank| blank.blank_range));
}
// Special case: we didn't collect any blanks because
// * the data is empty
// * the algorithm decided it's not worth the cost, which can be the case for single values
//
// We drain one collected blank unconditionally, so the empty case is reserved for empty
// data, and therefore empty compact_space means the data is empty and no data is covered
// (conversely to all data) and we can assign null to it.
if compact_space.is_empty() {
compact_space.add_blanks(
blank_collector
.drain()
.map(|blank| blank.blank_range)
.take(1),
);
}
compact_space.finish()
}
#[derive(Debug, Clone, Eq, PartialEq)]
struct CompactSpaceBuilder {
blanks: Vec<RangeInclusive<u128>>,
}
impl CompactSpaceBuilder {
/// Creates a new compact space builder which will initially cover the whole space.
fn new() -> Self {
Self { blanks: vec![] }
}
/// Assumes that repeated add_blank calls don't overlap and are not adjacent,
/// e.g. [3..=5, 5..=10] is not allowed
///
/// Both of those assumptions are true when blanks are produced from sorted values.
fn add_blanks(&mut self, blank: impl Iterator<Item = RangeInclusive<u128>>) {
self.blanks.extend(blank);
}
fn is_empty(&self) -> bool {
self.blanks.is_empty()
}
/// Convert blanks to covered space and assign null value
fn finish(mut self) -> CompactSpace {
// sort by start since ranges are not allowed to overlap
self.blanks.sort_by_key(|blank| *blank.start());
// Between the blanks
let mut covered_space = self
.blanks
.windows(2)
.map(|blanks| {
assert!(
blanks[0].end() < blanks[1].start(),
"overlapping or adjacent ranges detected"
);
*blanks[0].end() + 1..=*blanks[1].start() - 1
})
.collect::<Vec<_>>();
// Outside the blanks
if let Some(first_blank_start) = self.blanks.first().map(RangeInclusive::start) {
if *first_blank_start != 0 {
covered_space.insert(0, 0..=first_blank_start - 1);
}
}
if let Some(last_blank_end) = self.blanks.last().map(RangeInclusive::end) {
if *last_blank_end != u128::MAX {
covered_space.push(last_blank_end + 1..=u128::MAX);
}
}
// Extend the first range and assign the null value to it.
let null_value = if let Some(first_covered_space) = covered_space.first_mut() {
// in case the first covered space ends at u128::MAX, assign null to the beginning
if *first_covered_space.end() == u128::MAX {
*first_covered_space = first_covered_space.start() - 1..=*first_covered_space.end();
*first_covered_space.start()
} else {
*first_covered_space = *first_covered_space.start()..=first_covered_space.end() + 1;
*first_covered_space.end()
}
} else {
covered_space.push(0..=0); // empty data case
0u128
};
let mut compact_start: u64 = 0;
let mut ranges_and_compact_start = Vec::with_capacity(covered_space.len());
for cov in covered_space {
let covered_range_len = cov.end() - cov.start() + 1; // e.g. 0..=1 covered space 1-0+1= 2
ranges_and_compact_start.push((cov, compact_start));
compact_start += covered_range_len as u64;
}
CompactSpace {
ranges_and_compact_start,
null_value,
}
}
}
#[derive(Debug, Clone, Eq, PartialEq)]
struct CompactSpace {
pub struct CompactSpace {
ranges_and_compact_start: Vec<(RangeInclusive<u128>, u64)>,
pub null_value: u128,
}
@@ -400,24 +155,19 @@ pub struct IPCodecParams {
}
impl CompactSpaceCompressor {
pub fn null_value(&self) -> u128 {
self.params.null_value
pub fn null_value_compact_space(&self) -> u64 {
self.params.null_value_compact_space
}
/// Taking the vals as Vec may cost a lot of memory.
/// It is used to sort the vals.
///
/// The lower memory alternative to just store the index (u32) and use that as sorting may be an
/// issue for the merge case, where random access is more expensive.
///
/// TODO: Should we take Option<u128> here? (better API, but 24 bytes instead of 16 per element)
pub fn train_from(mut vals: Vec<u128>) -> Self {
let total_num_values = vals.len(); // TODO: Null values should be here too
vals.sort();
// We don't care for duplicates
vals.dedup();
vals.shrink_to_fit();
train(&vals, total_num_values)
/// Taking the vals as Vec may cost a lot of memory. It is used to sort the vals.
pub fn train_from(
vals: impl Iterator<Item = u128>,
total_num_values_incl_nulls: usize,
) -> Self {
let mut tree = BTreeSet::new();
tree.extend(vals);
assert!(tree.len() <= total_num_values_incl_nulls);
train(&tree, total_num_values_incl_nulls)
}
fn to_compact(&self, value: u128) -> u64 {
@@ -435,22 +185,25 @@ impl CompactSpaceCompressor {
Ok(())
}
pub fn compress(self, vals: impl Iterator<Item = u128>) -> io::Result<Vec<u8>> {
pub fn compress(self, vals: impl Iterator<Item = Option<u128>>) -> io::Result<Vec<u8>> {
let mut output = vec![];
self.compress_into(vals, &mut output)?;
Ok(output)
}
/// TODO: Should we take Option<u128> here? Otherwise the caller has to replace None with
/// `self.null_value()`
pub fn compress_into(
self,
vals: impl Iterator<Item = u128>,
vals: impl Iterator<Item = Option<u128>>,
write: &mut impl Write,
) -> io::Result<()> {
let mut bitpacker = BitPacker::default();
let mut num_vals = 0;
for val in vals {
let compact = self.to_compact(val);
let compact = if let Some(val) = val {
self.to_compact(val)
} else {
self.null_value_compact_space()
};
bitpacker
.write(compact, self.params.num_bits, write)
.unwrap();
@@ -462,7 +215,7 @@ impl CompactSpaceCompressor {
}
}
fn train(values_sorted: &[u128], total_num_values: usize) -> CompactSpaceCompressor {
fn train(values_sorted: &BTreeSet<u128>, total_num_values: usize) -> CompactSpaceCompressor {
let compact_space = get_compact_space(values_sorted, total_num_values, COST_PER_BLANK_IN_BITS);
let null_value = compact_space.null_value;
let null_compact_space = compact_space
@@ -476,8 +229,8 @@ fn train(values_sorted: &[u128], total_num_values: usize) -> CompactSpaceCompres
);
let num_bits = tantivy_bitpacker::compute_num_bits(amplitude_compact_space as u64);
let min_value = *values_sorted.first().unwrap_or(&0);
let max_value = *values_sorted.last().unwrap_or(&0);
let min_value = *values_sorted.iter().next().unwrap_or(&0);
let max_value = *values_sorted.iter().last().unwrap_or(&0);
let compressor = CompactSpaceCompressor {
params: IPCodecParams {
compact_space,
@@ -491,8 +244,8 @@ fn train(values_sorted: &[u128], total_num_values: usize) -> CompactSpaceCompres
},
};
let max_value = *values_sorted.last().unwrap_or(&0u128).max(&null_value);
assert!(compressor.to_compact(max_value) < amplitude_compact_space as u64);
let max_value_in_value_space = max_value.max(null_value);
assert!(compressor.to_compact(max_value_in_value_space) < amplitude_compact_space as u64);
compressor
}
@@ -700,27 +453,13 @@ mod tests {
use super::*;
#[test]
fn test_binary_heap_pop_order() {
let mut blanks: BinaryHeap<BlankRange> = BinaryHeap::new();
blanks.push(BlankRange {
blank_range: 0..=10,
});
blanks.push(BlankRange {
blank_range: 100..=200,
});
blanks.push(BlankRange {
blank_range: 100..=110,
});
assert_eq!(blanks.pop().unwrap().blank_size(), 100);
assert_eq!(blanks.pop().unwrap().blank_size(), 10);
}
#[test]
fn compact_space_test() {
let ips = &[
2u128, 4u128, 1000, 1001, 1002, 1003, 1004, 1005, 1008, 1010, 1012, 1260,
];
]
.into_iter()
.collect();
let compact_space = get_compact_space(ips, ips.len(), 11);
assert_eq!(compact_space.null_value, 5);
let amplitude = compact_space.amplitude_compact_space();
@@ -745,7 +484,7 @@ mod tests {
#[test]
fn compact_space_amplitude_test() {
let ips = &[100000u128, 1000000];
let ips = &[100000u128, 1000000].into_iter().collect();
let compact_space = get_compact_space(ips, ips.len(), 1);
assert_eq!(compact_space.null_value, 100001);
let amplitude = compact_space.amplitude_compact_space();
@@ -754,8 +493,7 @@ mod tests {
fn test_all(data: OwnedBytes, expected: &[u128]) {
let decompressor = CompactSpaceDecompressor::open(data).unwrap();
for idx in 0..decompressor.params.num_vals as usize {
let expected_val = expected[idx];
for (idx, expected_val) in expected.iter().cloned().enumerate() {
let val = decompressor.get(idx as u64);
assert_eq!(val, Some(expected_val));
let positions = decompressor.get_range(expected_val.saturating_sub(1)..=expected_val);
@@ -771,8 +509,11 @@ mod tests {
}
fn test_aux_vals(u128_vals: &[u128]) -> OwnedBytes {
let compressor = CompactSpaceCompressor::train_from(u128_vals.to_vec());
let data = compressor.compress(u128_vals.iter().cloned()).unwrap();
let compressor =
CompactSpaceCompressor::train_from(u128_vals.iter().cloned(), u128_vals.len());
let data = compressor
.compress(u128_vals.iter().cloned().map(Some))
.unwrap();
let data = OwnedBytes::new(data);
test_all(data.clone(), u128_vals);
data
@@ -846,8 +587,8 @@ mod tests {
#[test]
fn test_null() {
let vals = &[2u128];
let compressor = CompactSpaceCompressor::train_from(vals.to_vec());
let vals = vec![compressor.null_value(), 2u128];
let compressor = CompactSpaceCompressor::train_from(vals.iter().cloned(), vals.len());
let vals = vec![None, Some(2u128)];
let data = compressor.compress(vals.iter().cloned()).unwrap();
let decomp = CompactSpaceDecompressor::open(OwnedBytes::new(data)).unwrap();
let positions = decomp.get_range(0..=1);
@@ -885,9 +626,8 @@ mod tests {
1_000_000,
5_000_000_000,
];
let compressor = CompactSpaceCompressor::train_from(vals.to_vec());
// let vals = vec![compressor.null_value(), 2u128];
let data = compressor.compress(vals.iter().cloned()).unwrap();
let compressor = CompactSpaceCompressor::train_from(vals.iter().cloned(), vals.len());
let data = compressor.compress(vals.iter().cloned().map(Some)).unwrap();
let decomp = CompactSpaceDecompressor::open(OwnedBytes::new(data)).unwrap();
assert_eq!(decomp.get_range(199..=200), vec![0]);
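With this change the compressor is trained on plain u128 values but compresses `Option<u128>`, encoding `None` as the null value in compact space, so callers no longer substitute `null_value()` themselves. A hedged usage sketch; the `fastfield_codecs` import path is taken from the benchmark at the end of this diff, the value set is invented:

```rust
use fastfield_codecs::CompactSpaceCompressor;

fn compress_with_nulls() -> std::io::Result<Vec<u8>> {
    // E.g. a column of optional IP addresses already converted to u128.
    let vals: Vec<Option<u128>> = vec![Some(2), None, Some(1000), Some(1001)];

    // Training sees only the present values; the total count includes nulls.
    let present_vals = vals.iter().flatten().copied();
    let compressor = CompactSpaceCompressor::train_from(present_vals, vals.len());

    // Compression consumes Option<u128>; None is written as the null slot.
    compressor.compress(vals.into_iter())
}

fn main() -> std::io::Result<()> {
    let bytes = compress_with_nulls()?;
    println!("compressed {} bytes", bytes.len());
    Ok(())
}
```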

View File

@@ -0,0 +1,42 @@
use std::ops::RangeInclusive;
/// The range of a blank in value space.
///
/// A blank is an unoccupied space in the data.
/// Ordered by size
///
/// Use try_into(); invalid ranges will be rejected.
#[derive(Debug, Eq, PartialEq, Clone)]
pub(crate) struct BlankRange {
blank_range: RangeInclusive<u128>,
}
impl TryFrom<RangeInclusive<u128>> for BlankRange {
type Error = &'static str;
fn try_from(range: RangeInclusive<u128>) -> Result<Self, Self::Error> {
let blank_size = range.end().saturating_sub(*range.start());
if blank_size < 2 {
Err("invalid range")
} else {
Ok(BlankRange { blank_range: range })
}
}
}
impl BlankRange {
pub(crate) fn blank_size(&self) -> u128 {
self.blank_range.end() - self.blank_range.start()
}
pub(crate) fn blank_range(&self) -> RangeInclusive<u128> {
self.blank_range.clone()
}
}
impl Ord for BlankRange {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.blank_size().cmp(&other.blank_size())
}
}
impl PartialOrd for BlankRange {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
self.blank_size().partial_cmp(&other.blank_size())
}
}
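Since `BlankRange` now lives in its own module with a private field, `TryFrom` is the only way to construct one, and it rejects ranges too small to be worth tracking. A hypothetical test sketch of that contract (not part of this commit), written as it could appear inside this module:

```rust
#[cfg(test)]
mod tests {
    use super::BlankRange;

    #[test]
    fn try_from_rejects_tiny_ranges() {
        // end - start < 2 is rejected (nothing worth encoding as a blank)...
        assert!(BlankRange::try_from(5u128..=5).is_err());
        assert!(BlankRange::try_from(5u128..=6).is_err());
        // ...larger ranges are accepted and report their size.
        assert_eq!(BlankRange::try_from(5u128..=7).unwrap().blank_size(), 2);
    }
}
```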

View File

@@ -0,0 +1,237 @@
use std::collections::{BTreeSet, BinaryHeap};
use std::ops::RangeInclusive;
use itertools::Itertools;
use super::blank_range::BlankRange;
use super::CompactSpace;
/// Put the blanks for the sorted values into a binary heap
fn get_blanks(values_sorted: &BTreeSet<u128>) -> BinaryHeap<BlankRange> {
let mut blanks: BinaryHeap<BlankRange> = BinaryHeap::new();
let mut add_range = |blank_range: RangeInclusive<u128>| {
let blank_range: Result<BlankRange, _> = blank_range.try_into();
if let Ok(blank_range) = blank_range {
blanks.push(blank_range);
}
};
for (first, second) in values_sorted.iter().tuple_windows() {
// Overflow correctness: the values are deduped and sorted (BTreeSet property), so
// `first < second`, meaning `first + 1` cannot overflow and `second - 1` cannot underflow.
let blank_range = first + 1..=second - 1;
add_range(blank_range);
}
// Replace `iter().next()` / `iter().last()` with `first()` / `last()` after stabilization
// of https://github.com/rust-lang/rust/issues/62924
// Add preceding range if values don't start at 0
if let Some(first_val) = values_sorted.iter().next() {
if *first_val != 0 {
let blank_range = 0..=first_val - 1;
add_range(blank_range);
}
}
// Add succeeding range if values don't end at u128::MAX
if let Some(last_val) = values_sorted.iter().last() {
if *last_val != u128::MAX {
let blank_range = last_val + 1..=u128::MAX;
add_range(blank_range);
}
}
blanks
}
struct BlankCollector {
blanks: Vec<BlankRange>,
staged_blanks_sum: u128,
}
impl BlankCollector {
fn new() -> Self {
Self {
blanks: vec![],
staged_blanks_sum: 0,
}
}
fn stage_blank(&mut self, blank: BlankRange) {
self.staged_blanks_sum += blank.blank_size();
self.blanks.push(blank);
}
fn drain(&mut self) -> std::vec::Drain<'_, BlankRange> {
self.staged_blanks_sum = 0;
self.blanks.drain(..)
}
fn staged_blanks_sum(&self) -> u128 {
self.staged_blanks_sum
}
fn num_blanks(&self) -> usize {
self.blanks.len()
}
}
fn num_bits(val: u128) -> u8 {
(128u32 - val.leading_zeros()) as u8
}
/// Will collect blanks and add them to compact space if more bits are saved than cost from
/// metadata.
pub fn get_compact_space(
values_deduped_sorted: &BTreeSet<u128>,
total_num_values: usize,
cost_per_blank: usize,
) -> CompactSpace {
let mut blanks = get_blanks(values_deduped_sorted);
let mut amplitude_compact_space = u128::MAX;
let mut amplitude_bits: u8 = num_bits(amplitude_compact_space);
let mut compact_space = CompactSpaceBuilder::new();
if values_deduped_sorted.is_empty() {
return compact_space.finish();
}
let mut blank_collector = BlankCollector::new();
// We will stage blanks until they reduce the compact space by 1 bit.
// Binary heap to process the gaps by their size
while let Some(blank_range) = blanks.pop() {
blank_collector.stage_blank(blank_range);
let staged_spaces_sum: u128 = blank_collector.staged_blanks_sum();
// +1 for later added null value
let amplitude_new_compact_space = amplitude_compact_space - staged_spaces_sum + 1;
let amplitude_new_bits = num_bits(amplitude_new_compact_space);
if amplitude_bits == amplitude_new_bits {
continue;
}
let saved_bits = (amplitude_bits - amplitude_new_bits) as usize * total_num_values;
// TODO: Maybe calculate exact cost of blanks and run this more expensive computation only,
// when amplitude_new_bits changes
let cost = blank_collector.num_blanks() * cost_per_blank;
if cost >= saved_bits {
// Continue here: although we walk over the blanks in descending size order,
// smaller blanks later on can still cross a bit boundary and save a lot.
//
// E.g. the first range may reduce the compact space by 1000 from 2000 to 1000,
// which saves 11-10=1 bit, while the next range reduces the compact space by
// 950 to 50, which saves 10-6=4 bits.
continue;
}
amplitude_compact_space = amplitude_new_compact_space;
amplitude_bits = amplitude_new_bits;
compact_space.add_blanks(blank_collector.drain().map(|blank| blank.blank_range()));
}
// Special case: we didn't collect any blanks because
// * the data is empty (early exit)
// * the algorithm decided it's not worth the cost, which can be the case for single values
//
// We drain one collected blank unconditionally, so the empty case is reserved for empty
// data, and therefore empty compact_space means the data is empty and no data is covered
// (conversely to all data) and we can assign null to it.
if compact_space.is_empty() {
compact_space.add_blanks(
blank_collector
.drain()
.map(|blank| blank.blank_range())
.take(1),
);
}
compact_space.finish()
}
#[derive(Debug, Clone, Eq, PartialEq)]
struct CompactSpaceBuilder {
blanks: Vec<RangeInclusive<u128>>,
}
impl CompactSpaceBuilder {
/// Creates a new compact space builder which will initially cover the whole space.
fn new() -> Self {
Self { blanks: vec![] }
}
/// Assumes that the ranges seen across repeated add_blanks calls don't overlap and are not
/// adjacent,
/// e.g. [3..=5, 5..=10] is not allowed
///
/// Both of those assumptions are true when blanks are produced from sorted values.
fn add_blanks(&mut self, blank: impl Iterator<Item = RangeInclusive<u128>>) {
self.blanks.extend(blank);
}
fn is_empty(&self) -> bool {
self.blanks.is_empty()
}
/// Convert blanks to covered space and assign null value
fn finish(mut self) -> CompactSpace {
// sort by start since ranges are not allowed to overlap
self.blanks.sort_by_key(|blank| *blank.start());
// Between the blanks
let mut covered_space = self
.blanks
.windows(2)
.map(|blanks| {
assert!(
blanks[0].end() < blanks[1].start(),
"overlapping or adjacent ranges detected"
);
*blanks[0].end() + 1..=*blanks[1].start() - 1
})
.collect::<Vec<_>>();
// Outside the blanks
if let Some(first_blank_start) = self.blanks.first().map(RangeInclusive::start) {
if *first_blank_start != 0 {
covered_space.insert(0, 0..=first_blank_start - 1);
}
}
if let Some(last_blank_end) = self.blanks.last().map(RangeInclusive::end) {
if *last_blank_end != u128::MAX {
covered_space.push(last_blank_end + 1..=u128::MAX);
}
}
// Extend the first range and assign the null value to it.
let null_value = if let Some(first_covered_space) = covered_space.first_mut() {
// in case the first covered space ends at u128::MAX, assign null to the beginning
if *first_covered_space.end() == u128::MAX {
*first_covered_space = first_covered_space.start() - 1..=*first_covered_space.end();
*first_covered_space.start()
} else {
*first_covered_space = *first_covered_space.start()..=first_covered_space.end() + 1;
*first_covered_space.end()
}
} else {
covered_space.push(0..=0); // empty data case
0u128
};
let mut compact_start: u64 = 0;
let mut ranges_and_compact_start = Vec::with_capacity(covered_space.len());
for cov in covered_space {
let covered_range_len = cov.end() - cov.start() + 1; // e.g. 0..=1 covered space 1-0+1= 2
ranges_and_compact_start.push((cov, compact_start));
compact_start += covered_range_len as u64;
}
CompactSpace {
ranges_and_compact_start,
null_value,
}
}
}
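To make `finish()` concrete, here is a small worked example of the covered-space and compact-start assignment (the blank and covered ranges are invented; the loop mirrors the one at the end of `finish()` above):

```rust
use std::ops::RangeInclusive;

fn main() {
    // Suppose the builder kept two blanks: 6..=999 and 1013..=u128::MAX.
    // The covered space outside the blanks is then 0..=5 and 1000..=1012; the
    // first covered range is extended by one slot (value 6) for the null value.
    let covered_space: Vec<RangeInclusive<u128>> = vec![0..=6, 1000..=1012];

    // Assign each covered range its start position in compact space,
    // exactly like the loop at the end of `finish()`.
    let mut compact_start: u64 = 0;
    let mut ranges_and_compact_start = Vec::with_capacity(covered_space.len());
    for cov in covered_space {
        let covered_range_len = cov.end() - cov.start() + 1; // 0..=6 -> 7 slots
        ranges_and_compact_start.push((cov, compact_start));
        compact_start += covered_range_len as u64;
    }

    assert_eq!(ranges_and_compact_start[0].1, 0); // 0..=6 starts at compact position 0
    assert_eq!(ranges_and_compact_start[1].1, 7); // 1000..=1012 starts at compact position 7
}
```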
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_binary_heap_pop_order() {
let mut blanks: BinaryHeap<BlankRange> = BinaryHeap::new();
blanks.push((0..=10).try_into().unwrap());
blanks.push((100..=200).try_into().unwrap());
blanks.push((100..=110).try_into().unwrap());
assert_eq!(blanks.pop().unwrap().blank_size(), 100);
assert_eq!(blanks.pop().unwrap().blank_size(), 10);
}
}
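For the cost model in `get_compact_space`: a batch of staged blanks is only committed when the bits saved across all values exceed the metadata cost of storing the blanks. A worked example of that comparison, using the crate's `COST_PER_BLANK_IN_BITS = 36` and otherwise invented numbers:

```rust
fn main() {
    // Cost constant from the codec (metadata bits per stored blank).
    const COST_PER_BLANK_IN_BITS: usize = 36;

    // Invented scenario: 10_000 values, and the staged blanks shrink the
    // amplitude of the compact space from a 20-bit to an 18-bit range.
    let total_num_values = 10_000usize;
    let amplitude_bits = 20u8;
    let amplitude_new_bits = 18u8;
    let num_staged_blanks = 3usize;

    // Same comparison as in the while loop of `get_compact_space`.
    let saved_bits = (amplitude_bits - amplitude_new_bits) as usize * total_num_values; // 20_000
    let cost = num_staged_blanks * COST_PER_BLANK_IN_BITS; // 108

    // cost < saved_bits, so these blanks would be added to the compact space.
    assert!(cost < saved_bits);
}
```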

View File

@@ -7,9 +7,12 @@ use std::net::{IpAddr, Ipv6Addr};
use std::str::FromStr;
use fastfield_codecs::{
Column, CompactSpaceCompressor, FastFieldCodecType, FastFieldStats, VecColumn,
Column, CompactSpaceCompressor, CompactSpaceDecompressor, FastFieldCodecType, FastFieldStats,
VecColumn,
};
use itertools::Itertools;
use measure_time::print_time;
use ownedbytes::OwnedBytes;
use prettytable::{Cell, Row, Table};
fn print_set_stats(ip_addrs: &[u128]) {
@@ -51,12 +54,6 @@ fn print_set_stats(ip_addrs: &[u128]) {
b.1.cmp(&a.1)
}
});
// println!("\n\n----\nIP Address histogram");
// println!("IPAddrCount\tFrequency");
// for (ip_addr_count, times) in cnts {
// println!("{}\t{}", ip_addr_count, times);
//}
}
fn ip_dataset() -> Vec<u128> {
@@ -96,9 +93,10 @@ fn bench_ip() {
{
let mut data = vec![];
for dataset in dataset.chunks(50_000) {
let compressor = CompactSpaceCompressor::train_from(dataset.to_vec());
let compressor =
CompactSpaceCompressor::train_from(dataset.iter().cloned(), dataset.len());
compressor
.compress_into(dataset.iter().cloned(), &mut data)
.compress_into(dataset.iter().cloned().map(Some), &mut data)
.unwrap();
}
let compression = data.len() as f64 / (dataset.len() * 16) as f64;
@@ -109,8 +107,10 @@ fn bench_ip() {
);
}
let compressor = CompactSpaceCompressor::train_from(dataset.to_vec());
let data = compressor.compress(dataset.iter().cloned()).unwrap();
let compressor = CompactSpaceCompressor::train_from(dataset.iter().cloned(), dataset.len());
let data = compressor
.compress(dataset.iter().cloned().map(Some))
.unwrap();
let compression = data.len() as f64 / (dataset.len() * 16) as f64;
println!("Compression {:.2}", compression);
@@ -119,12 +119,13 @@ fn bench_ip() {
(data.len() * 8) as f32 / dataset.len() as f32
);
// let decompressor = CompactSpaceDecompressor::open(OwnedBytes::new(data)).unwrap();
// for i in 11100..11150 {
// print_time!("get range");
// let doc_values = decompressor.get_range(dataset[i]..=dataset[i]);
// println!("{:?}", doc_values.len());
//}
let decompressor = CompactSpaceDecompressor::open(OwnedBytes::new(data)).unwrap();
// Sample some ranges
for value in dataset.iter().take(1110).skip(1100).cloned() {
print_time!("get range");
let doc_values = decompressor.get_range(value..=value);
println!("{:?}", doc_values.len());
}
}
fn main() {