mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-01-05 16:52:55 +00:00
Int based linear interpol (#1482)
* Rename BlockwiseLinear to BlockwiseLinearLegacy Reimplements the blockwise multilinear codec using integer arithmetics. Added comments * add estimate for blockwise * Added one unit test * use int based for linear interpol * fix merge conflicts * reuse code * cargo fmt * fix clippy * fix test * fix off by one fix off by one to accurately interpolate autoincrement fields * extend test, fix estimate * remove legacy codec Co-authored-by: Pascal Seitz <pascal.seitz@gmail.com>
This commit is contained in:
@@ -1,436 +1,223 @@
|
||||
//! The BlockwiseLinear codec uses linear interpolation to guess a values and stores the
|
||||
//! offset, but in blocks of 512.
|
||||
//!
|
||||
//! With a CHUNK_SIZE of 512 and 29 byte metadata per block, we get a overhead for metadata of 232 /
|
||||
//! 512 = 0,45 bits per element. The additional space required per element in a block is the the
|
||||
//! maximum deviation of the linear interpolation estimation function.
|
||||
//!
|
||||
//! E.g. if the maximum deviation of an element is 12, all elements cost 4bits.
|
||||
//!
|
||||
//! Size per block:
|
||||
//! Num Elements * Maximum Deviation from Interpolation + 29 Byte Metadata
|
||||
|
||||
use std::io::{self, Read, Write};
|
||||
use std::ops::Sub;
|
||||
use std::io;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common::{BinarySerializable, CountingWriter, DeserializeFrom};
|
||||
use ownedbytes::OwnedBytes;
|
||||
use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker};
|
||||
|
||||
use crate::linear::{get_calculated_value, get_slope};
|
||||
use crate::{Column, FastFieldCodec, FastFieldCodecType};
|
||||
use crate::line::Line;
|
||||
use crate::{Column, FastFieldCodec, FastFieldCodecType, VecColumn};
|
||||
|
||||
const CHUNK_SIZE: u64 = 512;
|
||||
const CHUNK_SIZE: usize = 512;
|
||||
|
||||
/// Depending on the field type, a different
|
||||
/// fast field is required.
|
||||
#[derive(Clone)]
|
||||
pub struct BlockwiseLinearReader {
|
||||
data: OwnedBytes,
|
||||
pub footer: BlockwiseLinearFooter,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Default)]
|
||||
struct Function {
|
||||
// The offset in the data is required, because we have different bit_widths per block
|
||||
data_start_offset: u64,
|
||||
// start_pos in the block will be CHUNK_SIZE * BLOCK_NUM
|
||||
start_pos: u64,
|
||||
// only used during serialization, 0 after deserialization
|
||||
end_pos: u64,
|
||||
// only used during serialization, 0 after deserialization
|
||||
value_start_pos: u64,
|
||||
// only used during serialization, 0 after deserialization
|
||||
value_end_pos: u64,
|
||||
slope: f32,
|
||||
// The offset so that all values are positive when writing them
|
||||
positive_val_offset: u64,
|
||||
num_bits: u8,
|
||||
#[derive(Debug, Default)]
|
||||
struct Block {
|
||||
line: Line,
|
||||
bit_unpacker: BitUnpacker,
|
||||
data_start_offset: usize,
|
||||
}
|
||||
|
||||
impl Function {
|
||||
fn calc_slope(&mut self) {
|
||||
let num_vals = self.end_pos - self.start_pos;
|
||||
self.slope = get_slope(self.value_start_pos, self.value_end_pos, num_vals);
|
||||
}
|
||||
// split the interpolation into two function, change self and return the second split
|
||||
fn split(&mut self, split_pos: u64, split_pos_value: u64) -> Function {
|
||||
let mut new_function = Function {
|
||||
start_pos: split_pos,
|
||||
end_pos: self.end_pos,
|
||||
value_start_pos: split_pos_value,
|
||||
value_end_pos: self.value_end_pos,
|
||||
..Default::default()
|
||||
};
|
||||
new_function.calc_slope();
|
||||
self.end_pos = split_pos;
|
||||
self.value_end_pos = split_pos_value;
|
||||
self.calc_slope();
|
||||
new_function
|
||||
}
|
||||
}
|
||||
|
||||
impl BinarySerializable for Function {
|
||||
fn serialize<W: Write>(&self, write: &mut W) -> io::Result<()> {
|
||||
self.data_start_offset.serialize(write)?;
|
||||
self.value_start_pos.serialize(write)?;
|
||||
self.positive_val_offset.serialize(write)?;
|
||||
self.slope.serialize(write)?;
|
||||
self.num_bits.serialize(write)?;
|
||||
impl BinarySerializable for Block {
|
||||
fn serialize<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
|
||||
self.line.serialize(writer)?;
|
||||
self.bit_unpacker.bit_width().serialize(writer)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn deserialize<R: Read>(reader: &mut R) -> io::Result<Function> {
|
||||
let data_start_offset = u64::deserialize(reader)?;
|
||||
let value_start_pos = u64::deserialize(reader)?;
|
||||
let offset = u64::deserialize(reader)?;
|
||||
let slope = f32::deserialize(reader)?;
|
||||
let num_bits = u8::deserialize(reader)?;
|
||||
let interpolation = Function {
|
||||
data_start_offset,
|
||||
value_start_pos,
|
||||
positive_val_offset: offset,
|
||||
num_bits,
|
||||
bit_unpacker: BitUnpacker::new(num_bits),
|
||||
slope,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
Ok(interpolation)
|
||||
fn deserialize<R: io::Read>(reader: &mut R) -> io::Result<Self> {
|
||||
let line = Line::deserialize(reader)?;
|
||||
let bit_width = u8::deserialize(reader)?;
|
||||
Ok(Block {
|
||||
line,
|
||||
bit_unpacker: BitUnpacker::new(bit_width),
|
||||
data_start_offset: 0,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct BlockwiseLinearFooter {
|
||||
pub num_vals: u64,
|
||||
pub min_value: u64,
|
||||
pub max_value: u64,
|
||||
interpolations: Vec<Function>,
|
||||
#[derive(Debug)]
|
||||
struct BlockwiseLinearParams {
|
||||
num_vals: u64,
|
||||
min_value: u64,
|
||||
max_value: u64,
|
||||
blocks: Vec<Block>,
|
||||
}
|
||||
|
||||
impl BinarySerializable for BlockwiseLinearFooter {
|
||||
fn serialize<W: Write>(&self, write: &mut W) -> io::Result<()> {
|
||||
let mut out = vec![];
|
||||
self.num_vals.serialize(&mut out)?;
|
||||
self.min_value.serialize(&mut out)?;
|
||||
self.max_value.serialize(&mut out)?;
|
||||
self.interpolations.serialize(&mut out)?;
|
||||
write.write_all(&out)?;
|
||||
(out.len() as u32).serialize(write)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn deserialize<R: Read>(reader: &mut R) -> io::Result<BlockwiseLinearFooter> {
|
||||
let mut footer = BlockwiseLinearFooter {
|
||||
num_vals: u64::deserialize(reader)?,
|
||||
min_value: u64::deserialize(reader)?,
|
||||
max_value: u64::deserialize(reader)?,
|
||||
interpolations: Vec::<Function>::deserialize(reader)?,
|
||||
};
|
||||
for (num, interpol) in footer.interpolations.iter_mut().enumerate() {
|
||||
interpol.start_pos = CHUNK_SIZE * num as u64;
|
||||
impl BinarySerializable for BlockwiseLinearParams {
|
||||
fn serialize<W: io::Write>(&self, wrt: &mut W) -> io::Result<()> {
|
||||
self.num_vals.serialize(wrt)?;
|
||||
self.min_value.serialize(wrt)?;
|
||||
self.max_value.serialize(wrt)?;
|
||||
let expected_num_blocks = compute_num_blocks(self.num_vals);
|
||||
assert_eq!(expected_num_blocks, self.blocks.len());
|
||||
for block in &self.blocks {
|
||||
block.serialize(wrt)?;
|
||||
}
|
||||
Ok(footer)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn deserialize<R: io::Read>(reader: &mut R) -> io::Result<BlockwiseLinearParams> {
|
||||
let num_vals = u64::deserialize(reader)?;
|
||||
let min_value = u64::deserialize(reader)?;
|
||||
let max_value = u64::deserialize(reader)?;
|
||||
let num_blocks = compute_num_blocks(num_vals);
|
||||
let mut blocks = Vec::with_capacity(num_blocks);
|
||||
for _ in 0..num_blocks {
|
||||
blocks.push(Block::deserialize(reader)?);
|
||||
}
|
||||
Ok(BlockwiseLinearParams {
|
||||
num_vals,
|
||||
min_value,
|
||||
max_value,
|
||||
blocks,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn get_interpolation_position(doc: u64) -> usize {
|
||||
let index = doc / CHUNK_SIZE;
|
||||
index as usize
|
||||
fn compute_num_blocks(num_vals: u64) -> usize {
|
||||
(num_vals as usize + CHUNK_SIZE - 1) / CHUNK_SIZE
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn get_interpolation_function(doc: u64, interpolations: &[Function]) -> &Function {
|
||||
&interpolations[get_interpolation_position(doc)]
|
||||
}
|
||||
|
||||
impl Column for BlockwiseLinearReader {
|
||||
#[inline]
|
||||
fn get_val(&self, idx: u64) -> u64 {
|
||||
let interpolation = get_interpolation_function(idx, &self.footer.interpolations);
|
||||
let in_block_idx = idx - interpolation.start_pos;
|
||||
let calculated_value = get_calculated_value(
|
||||
interpolation.value_start_pos,
|
||||
in_block_idx,
|
||||
interpolation.slope,
|
||||
);
|
||||
let diff = interpolation.bit_unpacker.get(
|
||||
in_block_idx,
|
||||
&self.data[interpolation.data_start_offset as usize..],
|
||||
);
|
||||
(calculated_value + diff) - interpolation.positive_val_offset
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn min_value(&self) -> u64 {
|
||||
self.footer.min_value
|
||||
}
|
||||
#[inline]
|
||||
fn max_value(&self) -> u64 {
|
||||
self.footer.max_value
|
||||
}
|
||||
#[inline]
|
||||
fn num_vals(&self) -> u64 {
|
||||
self.footer.num_vals
|
||||
}
|
||||
}
|
||||
|
||||
/// Same as LinearSerializer, but working on chunks of CHUNK_SIZE elements.
|
||||
pub struct BlockwiseLinearCodec;
|
||||
|
||||
impl FastFieldCodec for BlockwiseLinearCodec {
|
||||
const CODEC_TYPE: FastFieldCodecType = FastFieldCodecType::BlockwiseLinear;
|
||||
|
||||
const CODEC_TYPE: crate::FastFieldCodecType = FastFieldCodecType::BlockwiseLinear;
|
||||
type Reader = BlockwiseLinearReader;
|
||||
|
||||
/// Opens a fast field given a file.
|
||||
fn open_from_bytes(bytes: OwnedBytes) -> io::Result<Self::Reader> {
|
||||
fn open_from_bytes(bytes: ownedbytes::OwnedBytes) -> io::Result<Self::Reader> {
|
||||
let footer_len: u32 = (&bytes[bytes.len() - 4..]).deserialize()?;
|
||||
let footer_offset = bytes.len() - 4 - footer_len as usize;
|
||||
let (data, mut footer) = bytes.split(footer_offset);
|
||||
let footer = BlockwiseLinearFooter::deserialize(&mut footer)?;
|
||||
Ok(BlockwiseLinearReader { data, footer })
|
||||
let mut params = BlockwiseLinearParams::deserialize(&mut footer)?;
|
||||
let mut start_offset = 0;
|
||||
for block in params.blocks.iter_mut() {
|
||||
block.data_start_offset = start_offset;
|
||||
start_offset += (block.bit_unpacker.bit_width() as usize) * CHUNK_SIZE / 8;
|
||||
}
|
||||
Ok(BlockwiseLinearReader {
|
||||
params: Arc::new(params),
|
||||
data,
|
||||
})
|
||||
}
|
||||
|
||||
/// Creates a new fast field serializer.
|
||||
fn serialize(write: &mut impl Write, fastfield_accessor: &dyn Column) -> io::Result<()> {
|
||||
assert!(fastfield_accessor.min_value() <= fastfield_accessor.max_value());
|
||||
|
||||
let first_val = fastfield_accessor.get_val(0);
|
||||
let last_val = fastfield_accessor.get_val(fastfield_accessor.num_vals() as u64 - 1);
|
||||
|
||||
let mut first_function = Function {
|
||||
end_pos: fastfield_accessor.num_vals(),
|
||||
value_start_pos: first_val,
|
||||
value_end_pos: last_val,
|
||||
..Default::default()
|
||||
};
|
||||
first_function.calc_slope();
|
||||
let mut interpolations = vec![first_function];
|
||||
|
||||
// Since we potentially apply multiple passes over the data, the data is cached.
|
||||
// Multiple iteration can be expensive (merge with index sorting can add lot of overhead per
|
||||
// iteration)
|
||||
let data = fastfield_accessor.iter().collect::<Vec<_>>();
|
||||
|
||||
//// let's split this into chunks of CHUNK_SIZE
|
||||
for data_pos in (0..data.len() as u64).step_by(CHUNK_SIZE as usize).skip(1) {
|
||||
let new_fun = {
|
||||
let current_interpolation = interpolations.last_mut().unwrap();
|
||||
current_interpolation.split(data_pos, data[data_pos as usize])
|
||||
};
|
||||
interpolations.push(new_fun);
|
||||
}
|
||||
// calculate offset and max (-> numbits) for each function
|
||||
for interpolation in &mut interpolations {
|
||||
let mut offset = 0;
|
||||
let mut rel_positive_max = 0;
|
||||
for (pos, actual_value) in data
|
||||
[interpolation.start_pos as usize..interpolation.end_pos as usize]
|
||||
.iter()
|
||||
.cloned()
|
||||
.enumerate()
|
||||
{
|
||||
let calculated_value = get_calculated_value(
|
||||
interpolation.value_start_pos,
|
||||
pos as u64,
|
||||
interpolation.slope,
|
||||
);
|
||||
if calculated_value > actual_value {
|
||||
// negative value we need to apply an offset
|
||||
// we ignore negative values in the max value calculation, because negative
|
||||
// values will be offset to 0
|
||||
offset = offset.max(calculated_value - actual_value);
|
||||
} else {
|
||||
// positive value no offset reuqired
|
||||
rel_positive_max = rel_positive_max.max(actual_value - calculated_value);
|
||||
}
|
||||
}
|
||||
|
||||
interpolation.positive_val_offset = offset;
|
||||
interpolation.num_bits = compute_num_bits(rel_positive_max + offset);
|
||||
}
|
||||
let mut bit_packer = BitPacker::new();
|
||||
|
||||
let write = &mut CountingWriter::wrap(write);
|
||||
for interpolation in &mut interpolations {
|
||||
interpolation.data_start_offset = write.written_bytes();
|
||||
let num_bits = interpolation.num_bits;
|
||||
for (pos, actual_value) in data
|
||||
[interpolation.start_pos as usize..interpolation.end_pos as usize]
|
||||
.iter()
|
||||
.cloned()
|
||||
.enumerate()
|
||||
{
|
||||
let calculated_value = get_calculated_value(
|
||||
interpolation.value_start_pos,
|
||||
pos as u64,
|
||||
interpolation.slope,
|
||||
);
|
||||
let diff = (actual_value + interpolation.positive_val_offset) - calculated_value;
|
||||
bit_packer.write(diff, num_bits, write)?;
|
||||
}
|
||||
bit_packer.flush(write)?;
|
||||
}
|
||||
bit_packer.close(write)?;
|
||||
|
||||
let footer = BlockwiseLinearFooter {
|
||||
num_vals: fastfield_accessor.num_vals(),
|
||||
min_value: fastfield_accessor.min_value(),
|
||||
max_value: fastfield_accessor.max_value(),
|
||||
interpolations,
|
||||
};
|
||||
footer.serialize(write)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// estimation for linear interpolation is hard because, you don't know
|
||||
/// where the local maxima are for the deviation of the calculated value and
|
||||
/// the offset is also unknown.
|
||||
#[allow(clippy::question_mark)]
|
||||
fn estimate(fastfield_accessor: &impl Column) -> Option<f32> {
|
||||
if fastfield_accessor.num_vals() < 10 * CHUNK_SIZE {
|
||||
// Estimate first_chunk and extrapolate
|
||||
fn estimate(fastfield_accessor: &impl crate::Column) -> Option<f32> {
|
||||
if fastfield_accessor.num_vals() < 10 * CHUNK_SIZE as u64 {
|
||||
return None;
|
||||
}
|
||||
|
||||
// On serialization the offset is added to the actual value.
|
||||
// We need to make sure this won't run into overflow calculation issues.
|
||||
// For this we take the maximum theroretical offset and add this to the max value.
|
||||
// If this doesn't overflow the algorithm should be fine
|
||||
let theorethical_maximum_offset =
|
||||
fastfield_accessor.max_value() - fastfield_accessor.min_value();
|
||||
if fastfield_accessor
|
||||
.max_value()
|
||||
.checked_add(theorethical_maximum_offset)
|
||||
.is_none()
|
||||
{
|
||||
return None;
|
||||
}
|
||||
|
||||
let first_val_in_first_block = fastfield_accessor.get_val(0);
|
||||
let last_elem_in_first_chunk = CHUNK_SIZE.min(fastfield_accessor.num_vals());
|
||||
let last_val_in_first_block =
|
||||
fastfield_accessor.get_val(last_elem_in_first_chunk as u64 - 1);
|
||||
let slope = get_slope(
|
||||
first_val_in_first_block,
|
||||
last_val_in_first_block,
|
||||
fastfield_accessor.num_vals(),
|
||||
);
|
||||
|
||||
// let's sample at 0%, 5%, 10% .. 95%, 100%, but for the first block only
|
||||
let sample_positions = (0..20)
|
||||
.map(|pos| (last_elem_in_first_chunk as f32 / 100.0 * pos as f32 * 5.0) as usize)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let max_distance = sample_positions
|
||||
let mut first_chunk: Vec<u64> = fastfield_accessor
|
||||
.iter()
|
||||
.map(|pos| {
|
||||
let calculated_value =
|
||||
get_calculated_value(first_val_in_first_block, *pos as u64, slope);
|
||||
let actual_value = fastfield_accessor.get_val(*pos as u64);
|
||||
distance(calculated_value, actual_value)
|
||||
})
|
||||
.take(CHUNK_SIZE as usize)
|
||||
.collect();
|
||||
let line = Line::train(&VecColumn::from(&first_chunk));
|
||||
for (i, buffer_val) in first_chunk.iter_mut().enumerate() {
|
||||
let interpolated_val = line.eval(i as u64);
|
||||
*buffer_val = buffer_val.wrapping_sub(interpolated_val);
|
||||
}
|
||||
let estimated_bit_width = first_chunk
|
||||
.iter()
|
||||
.map(|el| ((el + 1) as f32 * 3.0) as u64)
|
||||
.map(compute_num_bits)
|
||||
.max()
|
||||
.unwrap();
|
||||
|
||||
// Estimate one block and extrapolate the cost to all blocks.
|
||||
// the theory would be that we don't have the actual max_distance, but we are close within
|
||||
// 50% threshold.
|
||||
// It is multiplied by 2 because in a log case scenario the line would be as much above as
|
||||
// below. So the offset would = max_distance
|
||||
//
|
||||
let relative_max_value = (max_distance as f32 * 1.5) * 2.0;
|
||||
|
||||
let num_bits = compute_num_bits(relative_max_value as u64) as u64 * fastfield_accessor.num_vals() as u64
|
||||
let metadata_per_block = {
|
||||
let mut out = vec![];
|
||||
Block::default().serialize(&mut out).unwrap();
|
||||
out.len()
|
||||
};
|
||||
let num_bits = estimated_bit_width as u64 * fastfield_accessor.num_vals() as u64
|
||||
// function metadata per block
|
||||
+ 29 * (fastfield_accessor.num_vals() / CHUNK_SIZE);
|
||||
+ metadata_per_block as u64 * (fastfield_accessor.num_vals() / CHUNK_SIZE as u64);
|
||||
let num_bits_uncompressed = 64 * fastfield_accessor.num_vals();
|
||||
Some(num_bits as f32 / num_bits_uncompressed as f32)
|
||||
}
|
||||
}
|
||||
|
||||
fn distance<T: Sub<Output = T> + Ord>(x: T, y: T) -> T {
|
||||
if x < y {
|
||||
y - x
|
||||
} else {
|
||||
x - y
|
||||
}
|
||||
}
|
||||
fn serialize(
|
||||
wrt: &mut impl io::Write,
|
||||
fastfield_accessor: &dyn crate::Column,
|
||||
) -> io::Result<()> {
|
||||
let mut buffer = Vec::with_capacity(CHUNK_SIZE);
|
||||
let num_vals = fastfield_accessor.num_vals();
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::tests::get_codec_test_datasets;
|
||||
let num_blocks = compute_num_blocks(num_vals);
|
||||
let mut blocks = Vec::with_capacity(num_blocks);
|
||||
|
||||
fn create_and_validate(data: &[u64], name: &str) -> Option<(f32, f32)> {
|
||||
crate::tests::create_and_validate::<BlockwiseLinearCodec>(data, name)
|
||||
}
|
||||
let mut vals = fastfield_accessor.iter();
|
||||
|
||||
const HIGHEST_BIT: u64 = 1 << 63;
|
||||
pub fn i64_to_u64(val: i64) -> u64 {
|
||||
(val as u64) ^ HIGHEST_BIT
|
||||
}
|
||||
let mut bit_packer = BitPacker::new();
|
||||
|
||||
#[test]
|
||||
fn test_compression_i64() {
|
||||
let data = (i64::MAX - 600_000..=i64::MAX - 550_000)
|
||||
.map(i64_to_u64)
|
||||
.collect::<Vec<_>>();
|
||||
let (estimate, actual_compression) =
|
||||
create_and_validate(&data, "simple monotonically large i64").unwrap();
|
||||
assert!(actual_compression < 0.2);
|
||||
assert!(estimate < 0.20);
|
||||
assert!(estimate > 0.15);
|
||||
assert!(actual_compression > 0.01);
|
||||
}
|
||||
for _ in 0..num_blocks {
|
||||
buffer.clear();
|
||||
buffer.extend((&mut vals).take(CHUNK_SIZE));
|
||||
let line = Line::train(&VecColumn::from(&buffer));
|
||||
|
||||
#[test]
|
||||
fn test_compression() {
|
||||
let data = (10..=6_000_u64).collect::<Vec<_>>();
|
||||
let (estimate, actual_compression) =
|
||||
create_and_validate(&data, "simple monotonically large").unwrap();
|
||||
assert!(actual_compression < 0.2);
|
||||
assert!(estimate < 0.20);
|
||||
assert!(estimate > 0.15);
|
||||
assert!(actual_compression > 0.01);
|
||||
}
|
||||
assert!(!buffer.is_empty());
|
||||
|
||||
#[test]
|
||||
fn test_with_codec_data_sets() {
|
||||
let data_sets = get_codec_test_datasets();
|
||||
for (mut data, name) in data_sets {
|
||||
create_and_validate(&data, name);
|
||||
data.reverse();
|
||||
create_and_validate(&data, name);
|
||||
for (i, buffer_val) in buffer.iter_mut().enumerate() {
|
||||
let interpolated_val = line.eval(i as u64);
|
||||
*buffer_val = buffer_val.wrapping_sub(interpolated_val);
|
||||
}
|
||||
let bit_width = buffer.iter().copied().map(compute_num_bits).max().unwrap();
|
||||
|
||||
for &buffer_val in &buffer {
|
||||
bit_packer.write(buffer_val, bit_width, wrt)?;
|
||||
}
|
||||
|
||||
blocks.push(Block {
|
||||
line,
|
||||
bit_unpacker: BitUnpacker::new(bit_width),
|
||||
data_start_offset: 0,
|
||||
});
|
||||
}
|
||||
}
|
||||
#[test]
|
||||
fn test_simple() {
|
||||
let data = (10..=20_u64).collect::<Vec<_>>();
|
||||
create_and_validate(&data, "simple monotonically");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn border_cases_1() {
|
||||
let data = (0..1024).collect::<Vec<_>>();
|
||||
create_and_validate(&data, "border case");
|
||||
}
|
||||
#[test]
|
||||
fn border_case_2() {
|
||||
let data = (0..1025).collect::<Vec<_>>();
|
||||
create_and_validate(&data, "border case");
|
||||
}
|
||||
#[test]
|
||||
fn rand() {
|
||||
for _ in 0..10 {
|
||||
let mut data = (5_000..20_000)
|
||||
.map(|_| rand::random::<u32>() as u64)
|
||||
.collect::<Vec<_>>();
|
||||
let _ = create_and_validate(&data, "random");
|
||||
data.reverse();
|
||||
create_and_validate(&data, "random");
|
||||
}
|
||||
let params = BlockwiseLinearParams {
|
||||
num_vals,
|
||||
min_value: fastfield_accessor.min_value(),
|
||||
max_value: fastfield_accessor.max_value(),
|
||||
blocks,
|
||||
};
|
||||
bit_packer.close(wrt)?;
|
||||
|
||||
let mut counting_wrt = CountingWriter::wrap(wrt);
|
||||
|
||||
params.serialize(&mut counting_wrt)?;
|
||||
|
||||
let footer_len = counting_wrt.written_bytes();
|
||||
|
||||
(footer_len as u32).serialize(&mut counting_wrt)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct BlockwiseLinearReader {
|
||||
params: Arc<BlockwiseLinearParams>,
|
||||
data: OwnedBytes,
|
||||
}
|
||||
|
||||
impl Column for BlockwiseLinearReader {
|
||||
#[inline(always)]
|
||||
fn get_val(&self, idx: u64) -> u64 {
|
||||
let block_id = (idx / CHUNK_SIZE as u64) as usize;
|
||||
let idx_within_block = idx % (CHUNK_SIZE as u64);
|
||||
let block = &self.params.blocks[block_id];
|
||||
let interpoled_val: u64 = block.line.eval(idx_within_block);
|
||||
let block_bytes = &self.data[block.data_start_offset..];
|
||||
let bitpacked_diff = block.bit_unpacker.get(idx_within_block, block_bytes);
|
||||
interpoled_val.wrapping_add(bitpacked_diff)
|
||||
}
|
||||
|
||||
fn min_value(&self) -> u64 {
|
||||
self.params.min_value
|
||||
}
|
||||
|
||||
fn max_value(&self) -> u64 {
|
||||
self.params.max_value
|
||||
}
|
||||
|
||||
fn num_vals(&self) -> u64 {
|
||||
self.params.num_vals
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,6 +15,7 @@ use ownedbytes::OwnedBytes;
|
||||
|
||||
pub mod bitpacked;
|
||||
pub mod blockwise_linear;
|
||||
pub(crate) mod line;
|
||||
pub mod linear;
|
||||
|
||||
mod column;
|
||||
@@ -287,10 +288,10 @@ mod tests {
|
||||
|
||||
let multi_linear_interpol_estimation = BlockwiseLinearCodec::estimate(&data).unwrap();
|
||||
assert_le!(multi_linear_interpol_estimation, 0.2);
|
||||
assert_le!(linear_interpol_estimation, multi_linear_interpol_estimation);
|
||||
assert_lt!(linear_interpol_estimation, multi_linear_interpol_estimation);
|
||||
|
||||
let bitpacked_estimation = BitpackedCodec::estimate(&data).unwrap();
|
||||
assert_le!(linear_interpol_estimation, bitpacked_estimation);
|
||||
assert_lt!(linear_interpol_estimation, bitpacked_estimation);
|
||||
}
|
||||
#[test]
|
||||
fn estimation_test_bad_interpolation_case() {
|
||||
@@ -298,11 +299,23 @@ mod tests {
|
||||
|
||||
let data: VecColumn = data.into();
|
||||
let linear_interpol_estimation = LinearCodec::estimate(&data).unwrap();
|
||||
assert_le!(linear_interpol_estimation, 0.32);
|
||||
assert_le!(linear_interpol_estimation, 0.34);
|
||||
|
||||
let bitpacked_estimation = BitpackedCodec::estimate(&data).unwrap();
|
||||
assert_le!(bitpacked_estimation, linear_interpol_estimation);
|
||||
assert_lt!(bitpacked_estimation, linear_interpol_estimation);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn estimation_prefer_bitpacked() {
|
||||
let data: &[u64] = &[10, 10, 10, 10];
|
||||
|
||||
let data: VecColumn = data.into();
|
||||
let linear_interpol_estimation = LinearCodec::estimate(&data).unwrap();
|
||||
|
||||
let bitpacked_estimation = BitpackedCodec::estimate(&data).unwrap();
|
||||
assert_lt!(bitpacked_estimation, linear_interpol_estimation);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn estimation_test_bad_interpolation_case_monotonically_increasing() {
|
||||
let mut data: Vec<u64> = (200..=20000_u64).collect();
|
||||
|
||||
204
fastfield_codecs/src/line.rs
Normal file
204
fastfield_codecs/src/line.rs
Normal file
@@ -0,0 +1,204 @@
|
||||
use std::io;
|
||||
use std::num::NonZeroU64;
|
||||
|
||||
use common::{BinarySerializable, VInt};
|
||||
|
||||
use crate::Column;
|
||||
|
||||
const MID_POINT: u64 = (1u64 << 32) - 1u64;
|
||||
|
||||
/// `Line` describes a line function `y: ax + b` using integer
|
||||
/// arithmetics.
|
||||
///
|
||||
/// The slope is in fact a decimal split into a 32 bit integer value,
|
||||
/// and a 32-bit decimal value.
|
||||
///
|
||||
/// The multiplication then becomes.
|
||||
/// `y = m * x >> 32 + b`
|
||||
#[derive(Debug, Clone, Copy, Default)]
|
||||
pub struct Line {
|
||||
slope: u64,
|
||||
intercept: u64,
|
||||
}
|
||||
|
||||
/// Compute the line slope.
|
||||
///
|
||||
/// This function has the nice property of being
|
||||
/// invariant by translation.
|
||||
/// `
|
||||
/// compute_slope(y0, y1)
|
||||
/// = compute_slope(y0 + X % 2^64, y1 + X % 2^64)
|
||||
/// `
|
||||
fn compute_slope(y0: u64, y1: u64, num_vals: NonZeroU64) -> u64 {
|
||||
let dy = y1.wrapping_sub(y0);
|
||||
let sign = dy <= (1 << 63);
|
||||
let abs_dy = if sign {
|
||||
y1.wrapping_sub(y0)
|
||||
} else {
|
||||
y0.wrapping_sub(y1)
|
||||
};
|
||||
if abs_dy >= 1 << 32 {
|
||||
// This is outside of realm we handle.
|
||||
// Let's just bail.
|
||||
return 0u64;
|
||||
}
|
||||
|
||||
let abs_slope = (abs_dy << 32) / num_vals.get();
|
||||
if sign {
|
||||
abs_slope
|
||||
} else {
|
||||
// The complement does indeed create the
|
||||
// opposite decreasing slope...
|
||||
//
|
||||
// Intuitively (without the bitshifts and % u64::MAX)
|
||||
// ```
|
||||
// (x + shift)*(u64::MAX - abs_slope)
|
||||
// - (x * (u64::MAX - abs_slope))
|
||||
// = - shift * abs_slope
|
||||
// ```
|
||||
u64::MAX - abs_slope
|
||||
}
|
||||
}
|
||||
|
||||
impl Line {
|
||||
#[inline(always)]
|
||||
pub fn eval(&self, x: u64) -> u64 {
|
||||
let linear_part = (x.wrapping_mul(self.slope) >> 32) as i32 as u64;
|
||||
self.intercept.wrapping_add(linear_part)
|
||||
}
|
||||
|
||||
// Same as train, but the intercept is only estimated from provided sample positions
|
||||
pub fn estimate(ys: &dyn Column, sample_positions: &[u64]) -> Self {
|
||||
Self::train_from(ys, sample_positions.iter().cloned())
|
||||
}
|
||||
|
||||
// Intercept is only computed from provided positions
|
||||
fn train_from(ys: &dyn Column, positions: impl Iterator<Item = u64>) -> Self {
|
||||
let num_vals = if let Some(num_vals) = NonZeroU64::new(ys.num_vals() - 1) {
|
||||
num_vals
|
||||
} else {
|
||||
return Line::default();
|
||||
};
|
||||
|
||||
let y0 = ys.get_val(0);
|
||||
let y1 = ys.get_val(num_vals.get());
|
||||
|
||||
// We first independently pick our slope.
|
||||
let slope = compute_slope(y0, y1, num_vals);
|
||||
|
||||
// We picked our slope. Note that it does not have to be perfect.
|
||||
// Now we need to compute the best intercept.
|
||||
//
|
||||
// Intuitively, the best intercept is such that line passes through one of the
|
||||
// `(i, ys[])`.
|
||||
//
|
||||
// The best intercept therefore has the form
|
||||
// `y[i] - line.eval(i)` (using wrapping arithmetics).
|
||||
// In other words, the best intercept is one of the `y - Line::eval(ys[i])`
|
||||
// and our task is just to pick the one that minimizes our error.
|
||||
//
|
||||
// Without sorting our values, this is a difficult problem.
|
||||
// We however rely on the following trick...
|
||||
//
|
||||
// We only focus on the case where the interpolation is half decent.
|
||||
// If the line interpolation is doing its job on a dataset suited for it,
|
||||
// we can hope that the maximum error won't be larger than `u64::MAX / 2`.
|
||||
//
|
||||
// In other words, even without the intercept the values `y - Line::eval(ys[i])` will all be
|
||||
// within an interval that takes less than half of the modulo space of `u64`.
|
||||
//
|
||||
// Our task is therefore to identify this interval.
|
||||
// Here we simply translate all of our values by `y0 - 2^63` and pick the min.
|
||||
let mut line = Line {
|
||||
slope,
|
||||
intercept: 0,
|
||||
};
|
||||
let heuristic_shift = y0.wrapping_sub(MID_POINT);
|
||||
line.intercept = positions
|
||||
.map(|pos| {
|
||||
let y = ys.get_val(pos);
|
||||
y.wrapping_sub(line.eval(pos))
|
||||
})
|
||||
.min_by_key(|&val| val.wrapping_sub(heuristic_shift))
|
||||
.unwrap_or(0u64); //< Never happens.
|
||||
line
|
||||
}
|
||||
|
||||
/// Returns a line that attemps to approximate a function
|
||||
/// f: i in 0..[ys.num_vals()) -> ys[i].
|
||||
///
|
||||
/// - The approximation is always lower than the actual value.
|
||||
/// Or more rigorously, formally `f(i).wrapping_sub(ys[i])` is small
|
||||
/// for any i in [0..ys.len()).
|
||||
/// - It computes without panicking for any value of it.
|
||||
///
|
||||
/// This function is only invariable by translation if all of the
|
||||
/// `ys` are packaged into half of the space. (See heuristic below)
|
||||
pub fn train(ys: &dyn Column) -> Self {
|
||||
Self::train_from(ys, 0..ys.num_vals())
|
||||
}
|
||||
}
|
||||
|
||||
impl BinarySerializable for Line {
|
||||
fn serialize<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
|
||||
VInt(self.slope).serialize(writer)?;
|
||||
VInt(self.intercept).serialize(writer)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn deserialize<R: io::Read>(reader: &mut R) -> io::Result<Self> {
|
||||
let slope = VInt::deserialize(reader)?.0;
|
||||
let intercept = VInt::deserialize(reader)?.0;
|
||||
Ok(Line { slope, intercept })
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::VecColumn;
|
||||
|
||||
/// Test training a line and ensuring that the maximum difference between
|
||||
/// the data points and the line is `expected`.
|
||||
///
|
||||
/// This function operates translation over the data for better coverage.
|
||||
#[track_caller]
|
||||
fn test_line_interpol_with_translation(ys: &[u64], expected: Option<u64>) {
|
||||
let mut translations = vec![0, 100, u64::MAX / 2, u64::MAX, u64::MAX - 1];
|
||||
translations.extend_from_slice(ys);
|
||||
for translation in translations {
|
||||
let translated_ys: Vec<u64> = ys
|
||||
.iter()
|
||||
.copied()
|
||||
.map(|y| y.wrapping_add(translation))
|
||||
.collect();
|
||||
let largest_err = test_eval_max_err(&translated_ys);
|
||||
assert_eq!(largest_err, expected);
|
||||
}
|
||||
}
|
||||
|
||||
fn test_eval_max_err(ys: &[u64]) -> Option<u64> {
|
||||
let line = Line::train(&VecColumn::from(&ys));
|
||||
ys.iter()
|
||||
.enumerate()
|
||||
.map(|(x, y)| y.wrapping_sub(line.eval(x as u64)))
|
||||
.max()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_train() {
|
||||
test_line_interpol_with_translation(&[11, 11, 11, 12, 12, 13], Some(1));
|
||||
test_line_interpol_with_translation(&[13, 12, 12, 11, 11, 11], Some(1));
|
||||
test_line_interpol_with_translation(&[13, 13, 12, 11, 11, 11], Some(1));
|
||||
test_line_interpol_with_translation(&[13, 13, 12, 11, 11, 11], Some(1));
|
||||
test_line_interpol_with_translation(&[u64::MAX - 1, 0, 0, 1], Some(1));
|
||||
test_line_interpol_with_translation(&[u64::MAX - 1, u64::MAX, 0, 1], Some(0));
|
||||
test_line_interpol_with_translation(&[0, 1, 2, 3, 5], Some(0));
|
||||
test_line_interpol_with_translation(&[1, 2, 3, 4], Some(0));
|
||||
|
||||
let data: Vec<u64> = (0..255).collect();
|
||||
test_line_interpol_with_translation(&data, Some(0));
|
||||
let data: Vec<u64> = (0..255).map(|el| el * 2).collect();
|
||||
test_line_interpol_with_translation(&data, Some(0));
|
||||
}
|
||||
}
|
||||
@@ -1,10 +1,10 @@
|
||||
use std::io::{self, Read, Write};
|
||||
use std::ops::Sub;
|
||||
use std::io::{self, Write};
|
||||
|
||||
use common::{BinarySerializable, FixedSize};
|
||||
use common::{BinarySerializable, CountingWriter, DeserializeFrom};
|
||||
use ownedbytes::OwnedBytes;
|
||||
use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker};
|
||||
|
||||
use crate::line::Line;
|
||||
use crate::{Column, FastFieldCodec, FastFieldCodecType};
|
||||
|
||||
/// Depending on the field type, a different
|
||||
@@ -12,56 +12,15 @@ use crate::{Column, FastFieldCodec, FastFieldCodecType};
|
||||
#[derive(Clone)]
|
||||
pub struct LinearReader {
|
||||
data: OwnedBytes,
|
||||
bit_unpacker: BitUnpacker,
|
||||
pub footer: LinearFooter,
|
||||
pub slope: f32,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct LinearFooter {
|
||||
pub relative_max_value: u64,
|
||||
pub offset: u64,
|
||||
pub first_val: u64,
|
||||
pub last_val: u64,
|
||||
pub num_vals: u64,
|
||||
pub min_value: u64,
|
||||
pub max_value: u64,
|
||||
}
|
||||
|
||||
impl BinarySerializable for LinearFooter {
|
||||
fn serialize<W: Write>(&self, write: &mut W) -> io::Result<()> {
|
||||
self.relative_max_value.serialize(write)?;
|
||||
self.offset.serialize(write)?;
|
||||
self.first_val.serialize(write)?;
|
||||
self.last_val.serialize(write)?;
|
||||
self.num_vals.serialize(write)?;
|
||||
self.min_value.serialize(write)?;
|
||||
self.max_value.serialize(write)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn deserialize<R: Read>(reader: &mut R) -> io::Result<LinearFooter> {
|
||||
Ok(LinearFooter {
|
||||
relative_max_value: u64::deserialize(reader)?,
|
||||
offset: u64::deserialize(reader)?,
|
||||
first_val: u64::deserialize(reader)?,
|
||||
last_val: u64::deserialize(reader)?,
|
||||
num_vals: u64::deserialize(reader)?,
|
||||
min_value: u64::deserialize(reader)?,
|
||||
max_value: u64::deserialize(reader)?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl FixedSize for LinearFooter {
|
||||
const SIZE_IN_BYTES: usize = 56;
|
||||
footer: LinearParams,
|
||||
}
|
||||
|
||||
impl Column for LinearReader {
|
||||
#[inline]
|
||||
fn get_val(&self, doc: u64) -> u64 {
|
||||
let calculated_value = get_calculated_value(self.footer.first_val, doc, self.slope);
|
||||
(calculated_value + self.bit_unpacker.get(doc, &self.data)) - self.footer.offset
|
||||
let interpoled_val: u64 = self.footer.line.eval(doc);
|
||||
let bitpacked_diff = self.footer.bit_unpacker.get(doc, &self.data);
|
||||
interpoled_val.wrapping_add(bitpacked_diff)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
@@ -82,42 +41,38 @@ impl Column for LinearReader {
|
||||
/// and stores the difference bitpacked.
|
||||
pub struct LinearCodec;
|
||||
|
||||
#[inline]
|
||||
pub(crate) fn get_slope(first_val: u64, last_val: u64, num_vals: u64) -> f32 {
|
||||
if num_vals <= 1 {
|
||||
return 0.0;
|
||||
}
|
||||
// We calculate the slope with f64 high precision and use the result in lower precision f32
|
||||
// This is done in order to handle estimations for very large values like i64::MAX
|
||||
let diff = diff(last_val, first_val);
|
||||
(diff / (num_vals - 1) as f64) as f32
|
||||
#[derive(Debug, Clone)]
|
||||
struct LinearParams {
|
||||
num_vals: u64,
|
||||
min_value: u64,
|
||||
max_value: u64,
|
||||
line: Line,
|
||||
bit_unpacker: BitUnpacker,
|
||||
}
|
||||
|
||||
/// Delay the cast, to improve precision for very large u64 values.
|
||||
///
|
||||
/// Since i64 is mapped monotonically to u64 space, 0i64 is after the mapping i64::MAX.
|
||||
/// So very large values are not uncommon.
|
||||
///
|
||||
/// ```rust
|
||||
/// let val1 = i64::MAX;
|
||||
/// let val2 = i64::MAX - 100;
|
||||
/// assert_eq!(val1 - val2, 100);
|
||||
/// assert_eq!(val1 as f64 - val2 as f64, 0.0);
|
||||
/// ```
|
||||
fn diff(val1: u64, val2: u64) -> f64 {
|
||||
if val1 >= val2 {
|
||||
(val1 - val2) as f64
|
||||
} else {
|
||||
(val2 - val1) as f64 * -1.0
|
||||
impl BinarySerializable for LinearParams {
|
||||
fn serialize<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
|
||||
self.num_vals.serialize(writer)?;
|
||||
self.min_value.serialize(writer)?;
|
||||
self.max_value.serialize(writer)?;
|
||||
self.line.serialize(writer)?;
|
||||
self.bit_unpacker.bit_width().serialize(writer)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn get_calculated_value(first_val: u64, pos: u64, slope: f32) -> u64 {
|
||||
if slope < 0.0 {
|
||||
first_val.saturating_sub((pos as f32 * -slope) as u64)
|
||||
} else {
|
||||
first_val.saturating_add((pos as f32 * slope) as u64)
|
||||
fn deserialize<R: io::Read>(reader: &mut R) -> io::Result<Self> {
|
||||
let num_vals = u64::deserialize(reader)?;
|
||||
let min_value = u64::deserialize(reader)?;
|
||||
let max_value = u64::deserialize(reader)?;
|
||||
let line = Line::deserialize(reader)?;
|
||||
let bit_width = u8::deserialize(reader)?;
|
||||
Ok(Self {
|
||||
num_vals,
|
||||
min_value,
|
||||
max_value,
|
||||
line,
|
||||
bit_unpacker: BitUnpacker::new(bit_width),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -128,65 +83,50 @@ impl FastFieldCodec for LinearCodec {
|
||||
|
||||
/// Opens a fast field given a file.
|
||||
fn open_from_bytes(bytes: OwnedBytes) -> io::Result<Self::Reader> {
|
||||
let footer_offset = bytes.len() - LinearFooter::SIZE_IN_BYTES;
|
||||
let footer_len: u32 = (&bytes[bytes.len() - 4..]).deserialize()?;
|
||||
let footer_offset = bytes.len() - 4 - footer_len as usize;
|
||||
let (data, mut footer) = bytes.split(footer_offset);
|
||||
let footer = LinearFooter::deserialize(&mut footer)?;
|
||||
let slope = get_slope(footer.first_val, footer.last_val, footer.num_vals);
|
||||
let num_bits = compute_num_bits(footer.relative_max_value);
|
||||
let bit_unpacker = BitUnpacker::new(num_bits);
|
||||
Ok(LinearReader {
|
||||
data,
|
||||
bit_unpacker,
|
||||
footer,
|
||||
slope,
|
||||
})
|
||||
let footer = LinearParams::deserialize(&mut footer)?;
|
||||
Ok(LinearReader { data, footer })
|
||||
}
|
||||
|
||||
/// Creates a new fast field serializer.
|
||||
fn serialize(write: &mut impl Write, fastfield_accessor: &dyn Column) -> io::Result<()> {
|
||||
assert!(fastfield_accessor.min_value() <= fastfield_accessor.max_value());
|
||||
let line = Line::train(fastfield_accessor);
|
||||
|
||||
let first_val = fastfield_accessor.get_val(0);
|
||||
let last_val = fastfield_accessor.get_val(fastfield_accessor.num_vals() as u64 - 1);
|
||||
let slope = get_slope(first_val, last_val, fastfield_accessor.num_vals());
|
||||
// calculate offset to ensure all values are positive
|
||||
let mut offset = 0;
|
||||
let mut rel_positive_max = 0;
|
||||
for (pos, actual_value) in fastfield_accessor.iter().enumerate() {
|
||||
let calculated_value = get_calculated_value(first_val, pos as u64, slope);
|
||||
if calculated_value > actual_value {
|
||||
// negative value we need to apply an offset
|
||||
// we ignore negative values in the max value calculation, because negative values
|
||||
// will be offset to 0
|
||||
offset = offset.max(calculated_value - actual_value);
|
||||
} else {
|
||||
// positive value no offset reuqired
|
||||
rel_positive_max = rel_positive_max.max(actual_value - calculated_value);
|
||||
}
|
||||
}
|
||||
let max_offset_from_line = fastfield_accessor
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(pos, actual_value)| {
|
||||
let calculated_value = line.eval(pos as u64);
|
||||
actual_value.wrapping_sub(calculated_value)
|
||||
})
|
||||
.max()
|
||||
.unwrap();
|
||||
|
||||
// rel_positive_max will be adjusted by offset
|
||||
let relative_max_value = rel_positive_max + offset;
|
||||
|
||||
let num_bits = compute_num_bits(relative_max_value);
|
||||
let num_bits = compute_num_bits(max_offset_from_line);
|
||||
let mut bit_packer = BitPacker::new();
|
||||
for (pos, val) in fastfield_accessor.iter().enumerate() {
|
||||
let calculated_value = get_calculated_value(first_val, pos as u64, slope);
|
||||
let diff = (val + offset) - calculated_value;
|
||||
bit_packer.write(diff, num_bits, write)?;
|
||||
for (pos, actual_value) in fastfield_accessor.iter().enumerate() {
|
||||
let calculated_value = line.eval(pos as u64);
|
||||
let offset = actual_value.wrapping_sub(calculated_value);
|
||||
bit_packer.write(offset, num_bits, write)?;
|
||||
}
|
||||
bit_packer.close(write)?;
|
||||
|
||||
let footer = LinearFooter {
|
||||
relative_max_value,
|
||||
offset,
|
||||
first_val,
|
||||
last_val,
|
||||
let footer = LinearParams {
|
||||
num_vals: fastfield_accessor.num_vals(),
|
||||
min_value: fastfield_accessor.min_value(),
|
||||
max_value: fastfield_accessor.max_value(),
|
||||
line,
|
||||
bit_unpacker: BitUnpacker::new(num_bits),
|
||||
};
|
||||
footer.serialize(write)?;
|
||||
|
||||
let mut counting_wrt = CountingWriter::wrap(write);
|
||||
footer.serialize(&mut counting_wrt)?;
|
||||
let footer_len = counting_wrt.written_bytes();
|
||||
(footer_len as u32).serialize(&mut counting_wrt)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -199,64 +139,32 @@ impl FastFieldCodec for LinearCodec {
|
||||
return None; // disable compressor for this case
|
||||
}
|
||||
|
||||
// On serialisation the offset is added to the actual value.
|
||||
// We need to make sure this won't run into overflow calculation issues.
|
||||
// For this we take the maximum theroretical offset and add this to the max value.
|
||||
// If this doesn't overflow the algorithm should be fine
|
||||
let theorethical_maximum_offset =
|
||||
fastfield_accessor.max_value() - fastfield_accessor.min_value();
|
||||
if fastfield_accessor
|
||||
.max_value()
|
||||
.checked_add(theorethical_maximum_offset)
|
||||
.is_none()
|
||||
{
|
||||
return None;
|
||||
}
|
||||
|
||||
let first_val = fastfield_accessor.get_val(0);
|
||||
let last_val = fastfield_accessor.get_val(fastfield_accessor.num_vals() as u64 - 1);
|
||||
let slope = get_slope(first_val, last_val, fastfield_accessor.num_vals());
|
||||
|
||||
// let's sample at 0%, 5%, 10% .. 95%, 100%
|
||||
let num_vals = fastfield_accessor.num_vals() as f32 / 100.0;
|
||||
let sample_positions = (0..20)
|
||||
.map(|pos| (num_vals * pos as f32 * 5.0) as usize)
|
||||
.map(|pos| (num_vals * pos as f32 * 5.0) as u64)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let max_distance = sample_positions
|
||||
.iter()
|
||||
let line = Line::estimate(fastfield_accessor, &sample_positions);
|
||||
|
||||
let estimated_bit_width = sample_positions
|
||||
.into_iter()
|
||||
.map(|pos| {
|
||||
let calculated_value = get_calculated_value(first_val, *pos as u64, slope);
|
||||
let actual_value = fastfield_accessor.get_val(*pos as u64);
|
||||
distance(calculated_value, actual_value)
|
||||
let actual_value = fastfield_accessor.get_val(pos);
|
||||
let interpolated_val = line.eval(pos as u64);
|
||||
actual_value.wrapping_sub(interpolated_val)
|
||||
})
|
||||
.map(|diff| ((diff as f32 * 1.5) * 2.0) as u64)
|
||||
.map(compute_num_bits)
|
||||
.max()
|
||||
.unwrap_or(0);
|
||||
|
||||
// the theory would be that we don't have the actual max_distance, but we are close within
|
||||
// 50% threshold.
|
||||
// It is multiplied by 2 because in a log case scenario the line would be as much above as
|
||||
// below. So the offset would = max_distance
|
||||
//
|
||||
let relative_max_value = (max_distance as f32 * 1.5) * 2.0;
|
||||
|
||||
let num_bits = compute_num_bits(relative_max_value as u64) as u64
|
||||
* fastfield_accessor.num_vals()
|
||||
+ LinearFooter::SIZE_IN_BYTES as u64;
|
||||
let num_bits = (estimated_bit_width as u64 * fastfield_accessor.num_vals() as u64) + 64;
|
||||
let num_bits_uncompressed = 64 * fastfield_accessor.num_vals();
|
||||
Some(num_bits as f32 / num_bits_uncompressed as f32)
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn distance<T: Sub<Output = T> + Ord>(x: T, y: T) -> T {
|
||||
if x < y {
|
||||
y - x
|
||||
} else {
|
||||
x - y
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use rand::RngCore;
|
||||
@@ -268,34 +176,14 @@ mod tests {
|
||||
crate::tests::create_and_validate::<LinearCodec>(data, name)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn get_calculated_value_test() {
|
||||
// pos slope
|
||||
assert_eq!(get_calculated_value(100, 10, 5.0), 150);
|
||||
|
||||
// neg slope
|
||||
assert_eq!(get_calculated_value(100, 10, -5.0), 50);
|
||||
|
||||
// pos slope, very high values
|
||||
assert_eq!(
|
||||
get_calculated_value(i64::MAX as u64, 10, 5.0),
|
||||
i64::MAX as u64 + 50
|
||||
);
|
||||
// neg slope, very high values
|
||||
assert_eq!(
|
||||
get_calculated_value(i64::MAX as u64, 10, -5.0),
|
||||
i64::MAX as u64 - 50
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_compression() {
|
||||
let data = (10..=6_000_u64).collect::<Vec<_>>();
|
||||
let (estimate, actual_compression) =
|
||||
create_and_validate(&data, "simple monotonically large").unwrap();
|
||||
|
||||
assert!(actual_compression < 0.01);
|
||||
assert!(estimate < 0.01);
|
||||
assert_le!(actual_compression, 0.001);
|
||||
assert_le!(estimate, 0.02);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -400,7 +400,11 @@ mod tests {
|
||||
let file = directory.open_read(path).unwrap();
|
||||
// assert_eq!(file.len(), 17710 as usize); //bitpacked size
|
||||
// assert_eq!(file.len(), 10175_usize); // linear interpol size
|
||||
assert_eq!(file.len(), 75_usize); // linear interpol size after calc improvement
|
||||
// assert_eq!(file.len(), 75_usize); // linear interpol size after calc improvement
|
||||
// assert_eq!(file.len(), 1325_usize); // linear interpol size after switching to int based
|
||||
assert_eq!(file.len(), 62_usize); // linear interpol size after switching to int based, off
|
||||
// by one fix
|
||||
|
||||
{
|
||||
let fast_fields_composite = CompositeFile::open(&file)?;
|
||||
let data = fast_fields_composite
|
||||
|
||||
Reference in New Issue
Block a user