From ea72cf34d60cf5080c609239b7bf935bfe18229f Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Mon, 5 Sep 2022 15:53:00 +0900 Subject: [PATCH] Int based linear interpol (#1482) * Rename BlockwiseLinear to BlockwiseLinearLegacy Reimplements the blockwise multilinear codec using integer arithmetics. Added comments * add estimate for blockwise * Added one unit test * use int based for linear interpol * fix merge conflicts * reuse code * cargo fmt * fix clippy * fix test * fix off by one fix off by one to accurately interpolate autoincrement fields * extend test, fix estimate * remove legacy codec Co-authored-by: Pascal Seitz --- fastfield_codecs/src/blockwise_linear.rs | 549 +++++++---------------- fastfield_codecs/src/lib.rs | 21 +- fastfield_codecs/src/line.rs | 204 +++++++++ fastfield_codecs/src/linear.rs | 264 ++++------- src/fastfield/mod.rs | 6 +- 5 files changed, 470 insertions(+), 574 deletions(-) create mode 100644 fastfield_codecs/src/line.rs diff --git a/fastfield_codecs/src/blockwise_linear.rs b/fastfield_codecs/src/blockwise_linear.rs index da342eb78..32dcaf005 100644 --- a/fastfield_codecs/src/blockwise_linear.rs +++ b/fastfield_codecs/src/blockwise_linear.rs @@ -1,436 +1,223 @@ -//! The BlockwiseLinear codec uses linear interpolation to guess a values and stores the -//! offset, but in blocks of 512. -//! -//! With a CHUNK_SIZE of 512 and 29 byte metadata per block, we get a overhead for metadata of 232 / -//! 512 = 0,45 bits per element. The additional space required per element in a block is the the -//! maximum deviation of the linear interpolation estimation function. -//! -//! E.g. if the maximum deviation of an element is 12, all elements cost 4bits. -//! -//! Size per block: -//! Num Elements * Maximum Deviation from Interpolation + 29 Byte Metadata - -use std::io::{self, Read, Write}; -use std::ops::Sub; +use std::io; +use std::sync::Arc; use common::{BinarySerializable, CountingWriter, DeserializeFrom}; use ownedbytes::OwnedBytes; use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker}; -use crate::linear::{get_calculated_value, get_slope}; -use crate::{Column, FastFieldCodec, FastFieldCodecType}; +use crate::line::Line; +use crate::{Column, FastFieldCodec, FastFieldCodecType, VecColumn}; -const CHUNK_SIZE: u64 = 512; +const CHUNK_SIZE: usize = 512; -/// Depending on the field type, a different -/// fast field is required. -#[derive(Clone)] -pub struct BlockwiseLinearReader { - data: OwnedBytes, - pub footer: BlockwiseLinearFooter, -} - -#[derive(Clone, Debug, Default)] -struct Function { - // The offset in the data is required, because we have different bit_widths per block - data_start_offset: u64, - // start_pos in the block will be CHUNK_SIZE * BLOCK_NUM - start_pos: u64, - // only used during serialization, 0 after deserialization - end_pos: u64, - // only used during serialization, 0 after deserialization - value_start_pos: u64, - // only used during serialization, 0 after deserialization - value_end_pos: u64, - slope: f32, - // The offset so that all values are positive when writing them - positive_val_offset: u64, - num_bits: u8, +#[derive(Debug, Default)] +struct Block { + line: Line, bit_unpacker: BitUnpacker, + data_start_offset: usize, } -impl Function { - fn calc_slope(&mut self) { - let num_vals = self.end_pos - self.start_pos; - self.slope = get_slope(self.value_start_pos, self.value_end_pos, num_vals); - } - // split the interpolation into two function, change self and return the second split - fn split(&mut self, split_pos: u64, split_pos_value: u64) -> Function { - let mut new_function = Function { - start_pos: split_pos, - end_pos: self.end_pos, - value_start_pos: split_pos_value, - value_end_pos: self.value_end_pos, - ..Default::default() - }; - new_function.calc_slope(); - self.end_pos = split_pos; - self.value_end_pos = split_pos_value; - self.calc_slope(); - new_function - } -} - -impl BinarySerializable for Function { - fn serialize(&self, write: &mut W) -> io::Result<()> { - self.data_start_offset.serialize(write)?; - self.value_start_pos.serialize(write)?; - self.positive_val_offset.serialize(write)?; - self.slope.serialize(write)?; - self.num_bits.serialize(write)?; +impl BinarySerializable for Block { + fn serialize(&self, writer: &mut W) -> io::Result<()> { + self.line.serialize(writer)?; + self.bit_unpacker.bit_width().serialize(writer)?; Ok(()) } - fn deserialize(reader: &mut R) -> io::Result { - let data_start_offset = u64::deserialize(reader)?; - let value_start_pos = u64::deserialize(reader)?; - let offset = u64::deserialize(reader)?; - let slope = f32::deserialize(reader)?; - let num_bits = u8::deserialize(reader)?; - let interpolation = Function { - data_start_offset, - value_start_pos, - positive_val_offset: offset, - num_bits, - bit_unpacker: BitUnpacker::new(num_bits), - slope, - ..Default::default() - }; - - Ok(interpolation) + fn deserialize(reader: &mut R) -> io::Result { + let line = Line::deserialize(reader)?; + let bit_width = u8::deserialize(reader)?; + Ok(Block { + line, + bit_unpacker: BitUnpacker::new(bit_width), + data_start_offset: 0, + }) } } -#[derive(Clone, Debug)] -pub struct BlockwiseLinearFooter { - pub num_vals: u64, - pub min_value: u64, - pub max_value: u64, - interpolations: Vec, +#[derive(Debug)] +struct BlockwiseLinearParams { + num_vals: u64, + min_value: u64, + max_value: u64, + blocks: Vec, } -impl BinarySerializable for BlockwiseLinearFooter { - fn serialize(&self, write: &mut W) -> io::Result<()> { - let mut out = vec![]; - self.num_vals.serialize(&mut out)?; - self.min_value.serialize(&mut out)?; - self.max_value.serialize(&mut out)?; - self.interpolations.serialize(&mut out)?; - write.write_all(&out)?; - (out.len() as u32).serialize(write)?; - Ok(()) - } - - fn deserialize(reader: &mut R) -> io::Result { - let mut footer = BlockwiseLinearFooter { - num_vals: u64::deserialize(reader)?, - min_value: u64::deserialize(reader)?, - max_value: u64::deserialize(reader)?, - interpolations: Vec::::deserialize(reader)?, - }; - for (num, interpol) in footer.interpolations.iter_mut().enumerate() { - interpol.start_pos = CHUNK_SIZE * num as u64; +impl BinarySerializable for BlockwiseLinearParams { + fn serialize(&self, wrt: &mut W) -> io::Result<()> { + self.num_vals.serialize(wrt)?; + self.min_value.serialize(wrt)?; + self.max_value.serialize(wrt)?; + let expected_num_blocks = compute_num_blocks(self.num_vals); + assert_eq!(expected_num_blocks, self.blocks.len()); + for block in &self.blocks { + block.serialize(wrt)?; } - Ok(footer) + Ok(()) + } + + fn deserialize(reader: &mut R) -> io::Result { + let num_vals = u64::deserialize(reader)?; + let min_value = u64::deserialize(reader)?; + let max_value = u64::deserialize(reader)?; + let num_blocks = compute_num_blocks(num_vals); + let mut blocks = Vec::with_capacity(num_blocks); + for _ in 0..num_blocks { + blocks.push(Block::deserialize(reader)?); + } + Ok(BlockwiseLinearParams { + num_vals, + min_value, + max_value, + blocks, + }) } } -#[inline] -fn get_interpolation_position(doc: u64) -> usize { - let index = doc / CHUNK_SIZE; - index as usize +fn compute_num_blocks(num_vals: u64) -> usize { + (num_vals as usize + CHUNK_SIZE - 1) / CHUNK_SIZE } -#[inline] -fn get_interpolation_function(doc: u64, interpolations: &[Function]) -> &Function { - &interpolations[get_interpolation_position(doc)] -} - -impl Column for BlockwiseLinearReader { - #[inline] - fn get_val(&self, idx: u64) -> u64 { - let interpolation = get_interpolation_function(idx, &self.footer.interpolations); - let in_block_idx = idx - interpolation.start_pos; - let calculated_value = get_calculated_value( - interpolation.value_start_pos, - in_block_idx, - interpolation.slope, - ); - let diff = interpolation.bit_unpacker.get( - in_block_idx, - &self.data[interpolation.data_start_offset as usize..], - ); - (calculated_value + diff) - interpolation.positive_val_offset - } - - #[inline] - fn min_value(&self) -> u64 { - self.footer.min_value - } - #[inline] - fn max_value(&self) -> u64 { - self.footer.max_value - } - #[inline] - fn num_vals(&self) -> u64 { - self.footer.num_vals - } -} - -/// Same as LinearSerializer, but working on chunks of CHUNK_SIZE elements. pub struct BlockwiseLinearCodec; impl FastFieldCodec for BlockwiseLinearCodec { - const CODEC_TYPE: FastFieldCodecType = FastFieldCodecType::BlockwiseLinear; - + const CODEC_TYPE: crate::FastFieldCodecType = FastFieldCodecType::BlockwiseLinear; type Reader = BlockwiseLinearReader; - /// Opens a fast field given a file. - fn open_from_bytes(bytes: OwnedBytes) -> io::Result { + fn open_from_bytes(bytes: ownedbytes::OwnedBytes) -> io::Result { let footer_len: u32 = (&bytes[bytes.len() - 4..]).deserialize()?; let footer_offset = bytes.len() - 4 - footer_len as usize; let (data, mut footer) = bytes.split(footer_offset); - let footer = BlockwiseLinearFooter::deserialize(&mut footer)?; - Ok(BlockwiseLinearReader { data, footer }) + let mut params = BlockwiseLinearParams::deserialize(&mut footer)?; + let mut start_offset = 0; + for block in params.blocks.iter_mut() { + block.data_start_offset = start_offset; + start_offset += (block.bit_unpacker.bit_width() as usize) * CHUNK_SIZE / 8; + } + Ok(BlockwiseLinearReader { + params: Arc::new(params), + data, + }) } - /// Creates a new fast field serializer. - fn serialize(write: &mut impl Write, fastfield_accessor: &dyn Column) -> io::Result<()> { - assert!(fastfield_accessor.min_value() <= fastfield_accessor.max_value()); - - let first_val = fastfield_accessor.get_val(0); - let last_val = fastfield_accessor.get_val(fastfield_accessor.num_vals() as u64 - 1); - - let mut first_function = Function { - end_pos: fastfield_accessor.num_vals(), - value_start_pos: first_val, - value_end_pos: last_val, - ..Default::default() - }; - first_function.calc_slope(); - let mut interpolations = vec![first_function]; - - // Since we potentially apply multiple passes over the data, the data is cached. - // Multiple iteration can be expensive (merge with index sorting can add lot of overhead per - // iteration) - let data = fastfield_accessor.iter().collect::>(); - - //// let's split this into chunks of CHUNK_SIZE - for data_pos in (0..data.len() as u64).step_by(CHUNK_SIZE as usize).skip(1) { - let new_fun = { - let current_interpolation = interpolations.last_mut().unwrap(); - current_interpolation.split(data_pos, data[data_pos as usize]) - }; - interpolations.push(new_fun); - } - // calculate offset and max (-> numbits) for each function - for interpolation in &mut interpolations { - let mut offset = 0; - let mut rel_positive_max = 0; - for (pos, actual_value) in data - [interpolation.start_pos as usize..interpolation.end_pos as usize] - .iter() - .cloned() - .enumerate() - { - let calculated_value = get_calculated_value( - interpolation.value_start_pos, - pos as u64, - interpolation.slope, - ); - if calculated_value > actual_value { - // negative value we need to apply an offset - // we ignore negative values in the max value calculation, because negative - // values will be offset to 0 - offset = offset.max(calculated_value - actual_value); - } else { - // positive value no offset reuqired - rel_positive_max = rel_positive_max.max(actual_value - calculated_value); - } - } - - interpolation.positive_val_offset = offset; - interpolation.num_bits = compute_num_bits(rel_positive_max + offset); - } - let mut bit_packer = BitPacker::new(); - - let write = &mut CountingWriter::wrap(write); - for interpolation in &mut interpolations { - interpolation.data_start_offset = write.written_bytes(); - let num_bits = interpolation.num_bits; - for (pos, actual_value) in data - [interpolation.start_pos as usize..interpolation.end_pos as usize] - .iter() - .cloned() - .enumerate() - { - let calculated_value = get_calculated_value( - interpolation.value_start_pos, - pos as u64, - interpolation.slope, - ); - let diff = (actual_value + interpolation.positive_val_offset) - calculated_value; - bit_packer.write(diff, num_bits, write)?; - } - bit_packer.flush(write)?; - } - bit_packer.close(write)?; - - let footer = BlockwiseLinearFooter { - num_vals: fastfield_accessor.num_vals(), - min_value: fastfield_accessor.min_value(), - max_value: fastfield_accessor.max_value(), - interpolations, - }; - footer.serialize(write)?; - Ok(()) - } - - /// estimation for linear interpolation is hard because, you don't know - /// where the local maxima are for the deviation of the calculated value and - /// the offset is also unknown. - #[allow(clippy::question_mark)] - fn estimate(fastfield_accessor: &impl Column) -> Option { - if fastfield_accessor.num_vals() < 10 * CHUNK_SIZE { + // Estimate first_chunk and extrapolate + fn estimate(fastfield_accessor: &impl crate::Column) -> Option { + if fastfield_accessor.num_vals() < 10 * CHUNK_SIZE as u64 { return None; } - - // On serialization the offset is added to the actual value. - // We need to make sure this won't run into overflow calculation issues. - // For this we take the maximum theroretical offset and add this to the max value. - // If this doesn't overflow the algorithm should be fine - let theorethical_maximum_offset = - fastfield_accessor.max_value() - fastfield_accessor.min_value(); - if fastfield_accessor - .max_value() - .checked_add(theorethical_maximum_offset) - .is_none() - { - return None; - } - - let first_val_in_first_block = fastfield_accessor.get_val(0); - let last_elem_in_first_chunk = CHUNK_SIZE.min(fastfield_accessor.num_vals()); - let last_val_in_first_block = - fastfield_accessor.get_val(last_elem_in_first_chunk as u64 - 1); - let slope = get_slope( - first_val_in_first_block, - last_val_in_first_block, - fastfield_accessor.num_vals(), - ); - - // let's sample at 0%, 5%, 10% .. 95%, 100%, but for the first block only - let sample_positions = (0..20) - .map(|pos| (last_elem_in_first_chunk as f32 / 100.0 * pos as f32 * 5.0) as usize) - .collect::>(); - - let max_distance = sample_positions + let mut first_chunk: Vec = fastfield_accessor .iter() - .map(|pos| { - let calculated_value = - get_calculated_value(first_val_in_first_block, *pos as u64, slope); - let actual_value = fastfield_accessor.get_val(*pos as u64); - distance(calculated_value, actual_value) - }) + .take(CHUNK_SIZE as usize) + .collect(); + let line = Line::train(&VecColumn::from(&first_chunk)); + for (i, buffer_val) in first_chunk.iter_mut().enumerate() { + let interpolated_val = line.eval(i as u64); + *buffer_val = buffer_val.wrapping_sub(interpolated_val); + } + let estimated_bit_width = first_chunk + .iter() + .map(|el| ((el + 1) as f32 * 3.0) as u64) + .map(compute_num_bits) .max() .unwrap(); - // Estimate one block and extrapolate the cost to all blocks. - // the theory would be that we don't have the actual max_distance, but we are close within - // 50% threshold. - // It is multiplied by 2 because in a log case scenario the line would be as much above as - // below. So the offset would = max_distance - // - let relative_max_value = (max_distance as f32 * 1.5) * 2.0; - - let num_bits = compute_num_bits(relative_max_value as u64) as u64 * fastfield_accessor.num_vals() as u64 + let metadata_per_block = { + let mut out = vec![]; + Block::default().serialize(&mut out).unwrap(); + out.len() + }; + let num_bits = estimated_bit_width as u64 * fastfield_accessor.num_vals() as u64 // function metadata per block - + 29 * (fastfield_accessor.num_vals() / CHUNK_SIZE); + + metadata_per_block as u64 * (fastfield_accessor.num_vals() / CHUNK_SIZE as u64); let num_bits_uncompressed = 64 * fastfield_accessor.num_vals(); Some(num_bits as f32 / num_bits_uncompressed as f32) } -} -fn distance + Ord>(x: T, y: T) -> T { - if x < y { - y - x - } else { - x - y - } -} + fn serialize( + wrt: &mut impl io::Write, + fastfield_accessor: &dyn crate::Column, + ) -> io::Result<()> { + let mut buffer = Vec::with_capacity(CHUNK_SIZE); + let num_vals = fastfield_accessor.num_vals(); -#[cfg(test)] -mod tests { - use super::*; - use crate::tests::get_codec_test_datasets; + let num_blocks = compute_num_blocks(num_vals); + let mut blocks = Vec::with_capacity(num_blocks); - fn create_and_validate(data: &[u64], name: &str) -> Option<(f32, f32)> { - crate::tests::create_and_validate::(data, name) - } + let mut vals = fastfield_accessor.iter(); - const HIGHEST_BIT: u64 = 1 << 63; - pub fn i64_to_u64(val: i64) -> u64 { - (val as u64) ^ HIGHEST_BIT - } + let mut bit_packer = BitPacker::new(); - #[test] - fn test_compression_i64() { - let data = (i64::MAX - 600_000..=i64::MAX - 550_000) - .map(i64_to_u64) - .collect::>(); - let (estimate, actual_compression) = - create_and_validate(&data, "simple monotonically large i64").unwrap(); - assert!(actual_compression < 0.2); - assert!(estimate < 0.20); - assert!(estimate > 0.15); - assert!(actual_compression > 0.01); - } + for _ in 0..num_blocks { + buffer.clear(); + buffer.extend((&mut vals).take(CHUNK_SIZE)); + let line = Line::train(&VecColumn::from(&buffer)); - #[test] - fn test_compression() { - let data = (10..=6_000_u64).collect::>(); - let (estimate, actual_compression) = - create_and_validate(&data, "simple monotonically large").unwrap(); - assert!(actual_compression < 0.2); - assert!(estimate < 0.20); - assert!(estimate > 0.15); - assert!(actual_compression > 0.01); - } + assert!(!buffer.is_empty()); - #[test] - fn test_with_codec_data_sets() { - let data_sets = get_codec_test_datasets(); - for (mut data, name) in data_sets { - create_and_validate(&data, name); - data.reverse(); - create_and_validate(&data, name); + for (i, buffer_val) in buffer.iter_mut().enumerate() { + let interpolated_val = line.eval(i as u64); + *buffer_val = buffer_val.wrapping_sub(interpolated_val); + } + let bit_width = buffer.iter().copied().map(compute_num_bits).max().unwrap(); + + for &buffer_val in &buffer { + bit_packer.write(buffer_val, bit_width, wrt)?; + } + + blocks.push(Block { + line, + bit_unpacker: BitUnpacker::new(bit_width), + data_start_offset: 0, + }); } - } - #[test] - fn test_simple() { - let data = (10..=20_u64).collect::>(); - create_and_validate(&data, "simple monotonically"); - } - #[test] - fn border_cases_1() { - let data = (0..1024).collect::>(); - create_and_validate(&data, "border case"); - } - #[test] - fn border_case_2() { - let data = (0..1025).collect::>(); - create_and_validate(&data, "border case"); - } - #[test] - fn rand() { - for _ in 0..10 { - let mut data = (5_000..20_000) - .map(|_| rand::random::() as u64) - .collect::>(); - let _ = create_and_validate(&data, "random"); - data.reverse(); - create_and_validate(&data, "random"); - } + let params = BlockwiseLinearParams { + num_vals, + min_value: fastfield_accessor.min_value(), + max_value: fastfield_accessor.max_value(), + blocks, + }; + bit_packer.close(wrt)?; + + let mut counting_wrt = CountingWriter::wrap(wrt); + + params.serialize(&mut counting_wrt)?; + + let footer_len = counting_wrt.written_bytes(); + + (footer_len as u32).serialize(&mut counting_wrt)?; + + Ok(()) + } +} + +#[derive(Clone)] +pub struct BlockwiseLinearReader { + params: Arc, + data: OwnedBytes, +} + +impl Column for BlockwiseLinearReader { + #[inline(always)] + fn get_val(&self, idx: u64) -> u64 { + let block_id = (idx / CHUNK_SIZE as u64) as usize; + let idx_within_block = idx % (CHUNK_SIZE as u64); + let block = &self.params.blocks[block_id]; + let interpoled_val: u64 = block.line.eval(idx_within_block); + let block_bytes = &self.data[block.data_start_offset..]; + let bitpacked_diff = block.bit_unpacker.get(idx_within_block, block_bytes); + interpoled_val.wrapping_add(bitpacked_diff) + } + + fn min_value(&self) -> u64 { + self.params.min_value + } + + fn max_value(&self) -> u64 { + self.params.max_value + } + + fn num_vals(&self) -> u64 { + self.params.num_vals } } diff --git a/fastfield_codecs/src/lib.rs b/fastfield_codecs/src/lib.rs index ed219ac1e..92cf6961a 100644 --- a/fastfield_codecs/src/lib.rs +++ b/fastfield_codecs/src/lib.rs @@ -15,6 +15,7 @@ use ownedbytes::OwnedBytes; pub mod bitpacked; pub mod blockwise_linear; +pub(crate) mod line; pub mod linear; mod column; @@ -287,10 +288,10 @@ mod tests { let multi_linear_interpol_estimation = BlockwiseLinearCodec::estimate(&data).unwrap(); assert_le!(multi_linear_interpol_estimation, 0.2); - assert_le!(linear_interpol_estimation, multi_linear_interpol_estimation); + assert_lt!(linear_interpol_estimation, multi_linear_interpol_estimation); let bitpacked_estimation = BitpackedCodec::estimate(&data).unwrap(); - assert_le!(linear_interpol_estimation, bitpacked_estimation); + assert_lt!(linear_interpol_estimation, bitpacked_estimation); } #[test] fn estimation_test_bad_interpolation_case() { @@ -298,11 +299,23 @@ mod tests { let data: VecColumn = data.into(); let linear_interpol_estimation = LinearCodec::estimate(&data).unwrap(); - assert_le!(linear_interpol_estimation, 0.32); + assert_le!(linear_interpol_estimation, 0.34); let bitpacked_estimation = BitpackedCodec::estimate(&data).unwrap(); - assert_le!(bitpacked_estimation, linear_interpol_estimation); + assert_lt!(bitpacked_estimation, linear_interpol_estimation); } + + #[test] + fn estimation_prefer_bitpacked() { + let data: &[u64] = &[10, 10, 10, 10]; + + let data: VecColumn = data.into(); + let linear_interpol_estimation = LinearCodec::estimate(&data).unwrap(); + + let bitpacked_estimation = BitpackedCodec::estimate(&data).unwrap(); + assert_lt!(bitpacked_estimation, linear_interpol_estimation); + } + #[test] fn estimation_test_bad_interpolation_case_monotonically_increasing() { let mut data: Vec = (200..=20000_u64).collect(); diff --git a/fastfield_codecs/src/line.rs b/fastfield_codecs/src/line.rs new file mode 100644 index 000000000..3cdaa7c88 --- /dev/null +++ b/fastfield_codecs/src/line.rs @@ -0,0 +1,204 @@ +use std::io; +use std::num::NonZeroU64; + +use common::{BinarySerializable, VInt}; + +use crate::Column; + +const MID_POINT: u64 = (1u64 << 32) - 1u64; + +/// `Line` describes a line function `y: ax + b` using integer +/// arithmetics. +/// +/// The slope is in fact a decimal split into a 32 bit integer value, +/// and a 32-bit decimal value. +/// +/// The multiplication then becomes. +/// `y = m * x >> 32 + b` +#[derive(Debug, Clone, Copy, Default)] +pub struct Line { + slope: u64, + intercept: u64, +} + +/// Compute the line slope. +/// +/// This function has the nice property of being +/// invariant by translation. +/// ` +/// compute_slope(y0, y1) +/// = compute_slope(y0 + X % 2^64, y1 + X % 2^64) +/// ` +fn compute_slope(y0: u64, y1: u64, num_vals: NonZeroU64) -> u64 { + let dy = y1.wrapping_sub(y0); + let sign = dy <= (1 << 63); + let abs_dy = if sign { + y1.wrapping_sub(y0) + } else { + y0.wrapping_sub(y1) + }; + if abs_dy >= 1 << 32 { + // This is outside of realm we handle. + // Let's just bail. + return 0u64; + } + + let abs_slope = (abs_dy << 32) / num_vals.get(); + if sign { + abs_slope + } else { + // The complement does indeed create the + // opposite decreasing slope... + // + // Intuitively (without the bitshifts and % u64::MAX) + // ``` + // (x + shift)*(u64::MAX - abs_slope) + // - (x * (u64::MAX - abs_slope)) + // = - shift * abs_slope + // ``` + u64::MAX - abs_slope + } +} + +impl Line { + #[inline(always)] + pub fn eval(&self, x: u64) -> u64 { + let linear_part = (x.wrapping_mul(self.slope) >> 32) as i32 as u64; + self.intercept.wrapping_add(linear_part) + } + + // Same as train, but the intercept is only estimated from provided sample positions + pub fn estimate(ys: &dyn Column, sample_positions: &[u64]) -> Self { + Self::train_from(ys, sample_positions.iter().cloned()) + } + + // Intercept is only computed from provided positions + fn train_from(ys: &dyn Column, positions: impl Iterator) -> Self { + let num_vals = if let Some(num_vals) = NonZeroU64::new(ys.num_vals() - 1) { + num_vals + } else { + return Line::default(); + }; + + let y0 = ys.get_val(0); + let y1 = ys.get_val(num_vals.get()); + + // We first independently pick our slope. + let slope = compute_slope(y0, y1, num_vals); + + // We picked our slope. Note that it does not have to be perfect. + // Now we need to compute the best intercept. + // + // Intuitively, the best intercept is such that line passes through one of the + // `(i, ys[])`. + // + // The best intercept therefore has the form + // `y[i] - line.eval(i)` (using wrapping arithmetics). + // In other words, the best intercept is one of the `y - Line::eval(ys[i])` + // and our task is just to pick the one that minimizes our error. + // + // Without sorting our values, this is a difficult problem. + // We however rely on the following trick... + // + // We only focus on the case where the interpolation is half decent. + // If the line interpolation is doing its job on a dataset suited for it, + // we can hope that the maximum error won't be larger than `u64::MAX / 2`. + // + // In other words, even without the intercept the values `y - Line::eval(ys[i])` will all be + // within an interval that takes less than half of the modulo space of `u64`. + // + // Our task is therefore to identify this interval. + // Here we simply translate all of our values by `y0 - 2^63` and pick the min. + let mut line = Line { + slope, + intercept: 0, + }; + let heuristic_shift = y0.wrapping_sub(MID_POINT); + line.intercept = positions + .map(|pos| { + let y = ys.get_val(pos); + y.wrapping_sub(line.eval(pos)) + }) + .min_by_key(|&val| val.wrapping_sub(heuristic_shift)) + .unwrap_or(0u64); //< Never happens. + line + } + + /// Returns a line that attemps to approximate a function + /// f: i in 0..[ys.num_vals()) -> ys[i]. + /// + /// - The approximation is always lower than the actual value. + /// Or more rigorously, formally `f(i).wrapping_sub(ys[i])` is small + /// for any i in [0..ys.len()). + /// - It computes without panicking for any value of it. + /// + /// This function is only invariable by translation if all of the + /// `ys` are packaged into half of the space. (See heuristic below) + pub fn train(ys: &dyn Column) -> Self { + Self::train_from(ys, 0..ys.num_vals()) + } +} + +impl BinarySerializable for Line { + fn serialize(&self, writer: &mut W) -> io::Result<()> { + VInt(self.slope).serialize(writer)?; + VInt(self.intercept).serialize(writer)?; + Ok(()) + } + + fn deserialize(reader: &mut R) -> io::Result { + let slope = VInt::deserialize(reader)?.0; + let intercept = VInt::deserialize(reader)?.0; + Ok(Line { slope, intercept }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::VecColumn; + + /// Test training a line and ensuring that the maximum difference between + /// the data points and the line is `expected`. + /// + /// This function operates translation over the data for better coverage. + #[track_caller] + fn test_line_interpol_with_translation(ys: &[u64], expected: Option) { + let mut translations = vec![0, 100, u64::MAX / 2, u64::MAX, u64::MAX - 1]; + translations.extend_from_slice(ys); + for translation in translations { + let translated_ys: Vec = ys + .iter() + .copied() + .map(|y| y.wrapping_add(translation)) + .collect(); + let largest_err = test_eval_max_err(&translated_ys); + assert_eq!(largest_err, expected); + } + } + + fn test_eval_max_err(ys: &[u64]) -> Option { + let line = Line::train(&VecColumn::from(&ys)); + ys.iter() + .enumerate() + .map(|(x, y)| y.wrapping_sub(line.eval(x as u64))) + .max() + } + + #[test] + fn test_train() { + test_line_interpol_with_translation(&[11, 11, 11, 12, 12, 13], Some(1)); + test_line_interpol_with_translation(&[13, 12, 12, 11, 11, 11], Some(1)); + test_line_interpol_with_translation(&[13, 13, 12, 11, 11, 11], Some(1)); + test_line_interpol_with_translation(&[13, 13, 12, 11, 11, 11], Some(1)); + test_line_interpol_with_translation(&[u64::MAX - 1, 0, 0, 1], Some(1)); + test_line_interpol_with_translation(&[u64::MAX - 1, u64::MAX, 0, 1], Some(0)); + test_line_interpol_with_translation(&[0, 1, 2, 3, 5], Some(0)); + test_line_interpol_with_translation(&[1, 2, 3, 4], Some(0)); + + let data: Vec = (0..255).collect(); + test_line_interpol_with_translation(&data, Some(0)); + let data: Vec = (0..255).map(|el| el * 2).collect(); + test_line_interpol_with_translation(&data, Some(0)); + } +} diff --git a/fastfield_codecs/src/linear.rs b/fastfield_codecs/src/linear.rs index 32dd43ec7..dc411b768 100644 --- a/fastfield_codecs/src/linear.rs +++ b/fastfield_codecs/src/linear.rs @@ -1,10 +1,10 @@ -use std::io::{self, Read, Write}; -use std::ops::Sub; +use std::io::{self, Write}; -use common::{BinarySerializable, FixedSize}; +use common::{BinarySerializable, CountingWriter, DeserializeFrom}; use ownedbytes::OwnedBytes; use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker}; +use crate::line::Line; use crate::{Column, FastFieldCodec, FastFieldCodecType}; /// Depending on the field type, a different @@ -12,56 +12,15 @@ use crate::{Column, FastFieldCodec, FastFieldCodecType}; #[derive(Clone)] pub struct LinearReader { data: OwnedBytes, - bit_unpacker: BitUnpacker, - pub footer: LinearFooter, - pub slope: f32, -} - -#[derive(Clone, Debug)] -pub struct LinearFooter { - pub relative_max_value: u64, - pub offset: u64, - pub first_val: u64, - pub last_val: u64, - pub num_vals: u64, - pub min_value: u64, - pub max_value: u64, -} - -impl BinarySerializable for LinearFooter { - fn serialize(&self, write: &mut W) -> io::Result<()> { - self.relative_max_value.serialize(write)?; - self.offset.serialize(write)?; - self.first_val.serialize(write)?; - self.last_val.serialize(write)?; - self.num_vals.serialize(write)?; - self.min_value.serialize(write)?; - self.max_value.serialize(write)?; - Ok(()) - } - - fn deserialize(reader: &mut R) -> io::Result { - Ok(LinearFooter { - relative_max_value: u64::deserialize(reader)?, - offset: u64::deserialize(reader)?, - first_val: u64::deserialize(reader)?, - last_val: u64::deserialize(reader)?, - num_vals: u64::deserialize(reader)?, - min_value: u64::deserialize(reader)?, - max_value: u64::deserialize(reader)?, - }) - } -} - -impl FixedSize for LinearFooter { - const SIZE_IN_BYTES: usize = 56; + footer: LinearParams, } impl Column for LinearReader { #[inline] fn get_val(&self, doc: u64) -> u64 { - let calculated_value = get_calculated_value(self.footer.first_val, doc, self.slope); - (calculated_value + self.bit_unpacker.get(doc, &self.data)) - self.footer.offset + let interpoled_val: u64 = self.footer.line.eval(doc); + let bitpacked_diff = self.footer.bit_unpacker.get(doc, &self.data); + interpoled_val.wrapping_add(bitpacked_diff) } #[inline] @@ -82,42 +41,38 @@ impl Column for LinearReader { /// and stores the difference bitpacked. pub struct LinearCodec; -#[inline] -pub(crate) fn get_slope(first_val: u64, last_val: u64, num_vals: u64) -> f32 { - if num_vals <= 1 { - return 0.0; - } - // We calculate the slope with f64 high precision and use the result in lower precision f32 - // This is done in order to handle estimations for very large values like i64::MAX - let diff = diff(last_val, first_val); - (diff / (num_vals - 1) as f64) as f32 +#[derive(Debug, Clone)] +struct LinearParams { + num_vals: u64, + min_value: u64, + max_value: u64, + line: Line, + bit_unpacker: BitUnpacker, } -/// Delay the cast, to improve precision for very large u64 values. -/// -/// Since i64 is mapped monotonically to u64 space, 0i64 is after the mapping i64::MAX. -/// So very large values are not uncommon. -/// -/// ```rust -/// let val1 = i64::MAX; -/// let val2 = i64::MAX - 100; -/// assert_eq!(val1 - val2, 100); -/// assert_eq!(val1 as f64 - val2 as f64, 0.0); -/// ``` -fn diff(val1: u64, val2: u64) -> f64 { - if val1 >= val2 { - (val1 - val2) as f64 - } else { - (val2 - val1) as f64 * -1.0 +impl BinarySerializable for LinearParams { + fn serialize(&self, writer: &mut W) -> io::Result<()> { + self.num_vals.serialize(writer)?; + self.min_value.serialize(writer)?; + self.max_value.serialize(writer)?; + self.line.serialize(writer)?; + self.bit_unpacker.bit_width().serialize(writer)?; + Ok(()) } -} -#[inline] -pub fn get_calculated_value(first_val: u64, pos: u64, slope: f32) -> u64 { - if slope < 0.0 { - first_val.saturating_sub((pos as f32 * -slope) as u64) - } else { - first_val.saturating_add((pos as f32 * slope) as u64) + fn deserialize(reader: &mut R) -> io::Result { + let num_vals = u64::deserialize(reader)?; + let min_value = u64::deserialize(reader)?; + let max_value = u64::deserialize(reader)?; + let line = Line::deserialize(reader)?; + let bit_width = u8::deserialize(reader)?; + Ok(Self { + num_vals, + min_value, + max_value, + line, + bit_unpacker: BitUnpacker::new(bit_width), + }) } } @@ -128,65 +83,50 @@ impl FastFieldCodec for LinearCodec { /// Opens a fast field given a file. fn open_from_bytes(bytes: OwnedBytes) -> io::Result { - let footer_offset = bytes.len() - LinearFooter::SIZE_IN_BYTES; + let footer_len: u32 = (&bytes[bytes.len() - 4..]).deserialize()?; + let footer_offset = bytes.len() - 4 - footer_len as usize; let (data, mut footer) = bytes.split(footer_offset); - let footer = LinearFooter::deserialize(&mut footer)?; - let slope = get_slope(footer.first_val, footer.last_val, footer.num_vals); - let num_bits = compute_num_bits(footer.relative_max_value); - let bit_unpacker = BitUnpacker::new(num_bits); - Ok(LinearReader { - data, - bit_unpacker, - footer, - slope, - }) + let footer = LinearParams::deserialize(&mut footer)?; + Ok(LinearReader { data, footer }) } /// Creates a new fast field serializer. fn serialize(write: &mut impl Write, fastfield_accessor: &dyn Column) -> io::Result<()> { assert!(fastfield_accessor.min_value() <= fastfield_accessor.max_value()); + let line = Line::train(fastfield_accessor); - let first_val = fastfield_accessor.get_val(0); - let last_val = fastfield_accessor.get_val(fastfield_accessor.num_vals() as u64 - 1); - let slope = get_slope(first_val, last_val, fastfield_accessor.num_vals()); - // calculate offset to ensure all values are positive - let mut offset = 0; - let mut rel_positive_max = 0; - for (pos, actual_value) in fastfield_accessor.iter().enumerate() { - let calculated_value = get_calculated_value(first_val, pos as u64, slope); - if calculated_value > actual_value { - // negative value we need to apply an offset - // we ignore negative values in the max value calculation, because negative values - // will be offset to 0 - offset = offset.max(calculated_value - actual_value); - } else { - // positive value no offset reuqired - rel_positive_max = rel_positive_max.max(actual_value - calculated_value); - } - } + let max_offset_from_line = fastfield_accessor + .iter() + .enumerate() + .map(|(pos, actual_value)| { + let calculated_value = line.eval(pos as u64); + actual_value.wrapping_sub(calculated_value) + }) + .max() + .unwrap(); - // rel_positive_max will be adjusted by offset - let relative_max_value = rel_positive_max + offset; - - let num_bits = compute_num_bits(relative_max_value); + let num_bits = compute_num_bits(max_offset_from_line); let mut bit_packer = BitPacker::new(); - for (pos, val) in fastfield_accessor.iter().enumerate() { - let calculated_value = get_calculated_value(first_val, pos as u64, slope); - let diff = (val + offset) - calculated_value; - bit_packer.write(diff, num_bits, write)?; + for (pos, actual_value) in fastfield_accessor.iter().enumerate() { + let calculated_value = line.eval(pos as u64); + let offset = actual_value.wrapping_sub(calculated_value); + bit_packer.write(offset, num_bits, write)?; } bit_packer.close(write)?; - let footer = LinearFooter { - relative_max_value, - offset, - first_val, - last_val, + let footer = LinearParams { num_vals: fastfield_accessor.num_vals(), min_value: fastfield_accessor.min_value(), max_value: fastfield_accessor.max_value(), + line, + bit_unpacker: BitUnpacker::new(num_bits), }; - footer.serialize(write)?; + + let mut counting_wrt = CountingWriter::wrap(write); + footer.serialize(&mut counting_wrt)?; + let footer_len = counting_wrt.written_bytes(); + (footer_len as u32).serialize(&mut counting_wrt)?; + Ok(()) } @@ -199,64 +139,32 @@ impl FastFieldCodec for LinearCodec { return None; // disable compressor for this case } - // On serialisation the offset is added to the actual value. - // We need to make sure this won't run into overflow calculation issues. - // For this we take the maximum theroretical offset and add this to the max value. - // If this doesn't overflow the algorithm should be fine - let theorethical_maximum_offset = - fastfield_accessor.max_value() - fastfield_accessor.min_value(); - if fastfield_accessor - .max_value() - .checked_add(theorethical_maximum_offset) - .is_none() - { - return None; - } - - let first_val = fastfield_accessor.get_val(0); - let last_val = fastfield_accessor.get_val(fastfield_accessor.num_vals() as u64 - 1); - let slope = get_slope(first_val, last_val, fastfield_accessor.num_vals()); - // let's sample at 0%, 5%, 10% .. 95%, 100% let num_vals = fastfield_accessor.num_vals() as f32 / 100.0; let sample_positions = (0..20) - .map(|pos| (num_vals * pos as f32 * 5.0) as usize) + .map(|pos| (num_vals * pos as f32 * 5.0) as u64) .collect::>(); - let max_distance = sample_positions - .iter() + let line = Line::estimate(fastfield_accessor, &sample_positions); + + let estimated_bit_width = sample_positions + .into_iter() .map(|pos| { - let calculated_value = get_calculated_value(first_val, *pos as u64, slope); - let actual_value = fastfield_accessor.get_val(*pos as u64); - distance(calculated_value, actual_value) + let actual_value = fastfield_accessor.get_val(pos); + let interpolated_val = line.eval(pos as u64); + actual_value.wrapping_sub(interpolated_val) }) + .map(|diff| ((diff as f32 * 1.5) * 2.0) as u64) + .map(compute_num_bits) .max() .unwrap_or(0); - // the theory would be that we don't have the actual max_distance, but we are close within - // 50% threshold. - // It is multiplied by 2 because in a log case scenario the line would be as much above as - // below. So the offset would = max_distance - // - let relative_max_value = (max_distance as f32 * 1.5) * 2.0; - - let num_bits = compute_num_bits(relative_max_value as u64) as u64 - * fastfield_accessor.num_vals() - + LinearFooter::SIZE_IN_BYTES as u64; + let num_bits = (estimated_bit_width as u64 * fastfield_accessor.num_vals() as u64) + 64; let num_bits_uncompressed = 64 * fastfield_accessor.num_vals(); Some(num_bits as f32 / num_bits_uncompressed as f32) } } -#[inline] -fn distance + Ord>(x: T, y: T) -> T { - if x < y { - y - x - } else { - x - y - } -} - #[cfg(test)] mod tests { use rand::RngCore; @@ -268,34 +176,14 @@ mod tests { crate::tests::create_and_validate::(data, name) } - #[test] - fn get_calculated_value_test() { - // pos slope - assert_eq!(get_calculated_value(100, 10, 5.0), 150); - - // neg slope - assert_eq!(get_calculated_value(100, 10, -5.0), 50); - - // pos slope, very high values - assert_eq!( - get_calculated_value(i64::MAX as u64, 10, 5.0), - i64::MAX as u64 + 50 - ); - // neg slope, very high values - assert_eq!( - get_calculated_value(i64::MAX as u64, 10, -5.0), - i64::MAX as u64 - 50 - ); - } - #[test] fn test_compression() { let data = (10..=6_000_u64).collect::>(); let (estimate, actual_compression) = create_and_validate(&data, "simple monotonically large").unwrap(); - assert!(actual_compression < 0.01); - assert!(estimate < 0.01); + assert_le!(actual_compression, 0.001); + assert_le!(estimate, 0.02); } #[test] diff --git a/src/fastfield/mod.rs b/src/fastfield/mod.rs index c68d7c338..be4713c7b 100644 --- a/src/fastfield/mod.rs +++ b/src/fastfield/mod.rs @@ -400,7 +400,11 @@ mod tests { let file = directory.open_read(path).unwrap(); // assert_eq!(file.len(), 17710 as usize); //bitpacked size // assert_eq!(file.len(), 10175_usize); // linear interpol size - assert_eq!(file.len(), 75_usize); // linear interpol size after calc improvement + // assert_eq!(file.len(), 75_usize); // linear interpol size after calc improvement + // assert_eq!(file.len(), 1325_usize); // linear interpol size after switching to int based + assert_eq!(file.len(), 62_usize); // linear interpol size after switching to int based, off + // by one fix + { let fast_fields_composite = CompositeFile::open(&file)?; let data = fast_fields_composite