diff --git a/bitpacker/Cargo.toml b/bitpacker/Cargo.toml
index 576f2be47..4c14a2398 100644
--- a/bitpacker/Cargo.toml
+++ b/bitpacker/Cargo.toml
@@ -15,6 +15,7 @@ homepage = "https://github.com/quickwit-oss/tantivy"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
+bitpacking = {version="0.8", default-features=false, features = ["bitpacker1x"]}
 
 [dev-dependencies]
 rand = "0.8"
diff --git a/bitpacker/src/bitpacker.rs b/bitpacker/src/bitpacker.rs
index 6d75c5e72..65cd3abe0 100644
--- a/bitpacker/src/bitpacker.rs
+++ b/bitpacker/src/bitpacker.rs
@@ -1,10 +1,14 @@
 use std::convert::TryInto;
 use std::io;
+use std::ops::{Range, RangeInclusive};
+
+use bitpacking::{BitPacker as ExternalBitPackerTrait, BitPacker1x};
 
 pub struct BitPacker {
     mini_buffer: u64,
     mini_buffer_written: usize,
 }
+
 impl Default for BitPacker {
     fn default() -> Self {
         BitPacker::new()
@@ -118,6 +122,125 @@ impl BitUnpacker {
         let val_shifted = val_unshifted_unmasked >> bit_shift;
         val_shifted & self.mask
     }
+
+    // Decodes the range of bitpacked `u32` values with idx
+    // in [start_idx, start_idx + output.len()).
+    //
+    // # Panics
+    //
+    // This method panics if `num_bits` is > 32.
+    fn get_batch_u32s(&self, start_idx: u32, data: &[u8], output: &mut [u32]) {
+        assert!(
+            self.bit_width() <= 32,
+            "Bitwidth must be <= 32 to use this method."
+        );
+
+        let end_idx = start_idx + output.len() as u32;
+
+        let end_bit_read = end_idx * self.num_bits;
+        let end_byte_read = (end_bit_read + 7) / 8;
+        assert!(
+            end_byte_read as usize <= data.len(),
+            "Requested index is out of bounds."
+        );
+
+        // Simple slow implementation of get_batch_u32s, to deal with our ramps.
+        let get_batch_ramp = |start_idx: u32, output: &mut [u32]| {
+            for (out, idx) in output.iter_mut().zip(start_idx..) {
+                *out = self.get(idx, data) as u32;
+            }
+        };
+
+        // We use an unrolled routine to decode 32 values at once.
+        // We therefore decompose our range of values to decode into three ranges:
+        // - Entrance ramp: [start_idx, fast_track_start) (up to 31 values)
+        // - Highway: [fast_track_start, fast_track_end) (a length that is a multiple of 32)
+        // - Exit ramp: [fast_track_end, start_idx + output.len()) (up to 31 values)
+
+        // We want the start of the fast track to be byte-aligned.
+        // A sufficient condition is to start with an idx that is a multiple of 8,
+        // so the highway start is the closest multiple of 8 that is >= start_idx.
+        let entrance_ramp_len = 8 - (start_idx % 8) % 8;
+
+        let highway_start: u32 = start_idx + entrance_ramp_len;
+
+        if highway_start + BitPacker1x::BLOCK_LEN as u32 > end_idx {
+            // We don't have enough values to have even a single block of highway.
+            // Let's just supply the values the simple way.
+            get_batch_ramp(start_idx, output);
+            return;
+        }
+
+        let num_blocks: u32 = (end_idx - highway_start) / BitPacker1x::BLOCK_LEN as u32;
+
+        // Entrance ramp
+        get_batch_ramp(start_idx, &mut output[..entrance_ramp_len as usize]);
+
+        // Highway
+        let mut offset = (highway_start * self.num_bits) as usize / 8;
+        let mut output_cursor = (highway_start - start_idx) as usize;
+        for _ in 0..num_blocks {
+            offset += BitPacker1x.decompress(
+                &data[offset..],
+                &mut output[output_cursor..],
+                self.num_bits as u8,
+            );
+            output_cursor += 32;
+        }
+
+        // Exit ramp
+        let highway_end = highway_start + num_blocks * BitPacker1x::BLOCK_LEN as u32;
+        get_batch_ramp(highway_end, &mut output[output_cursor..]);
+    }
+
+    pub fn get_ids_for_value_range(
+        &self,
+        range: RangeInclusive<u64>,
+        id_range: Range<u32>,
+        data: &[u8],
+        positions: &mut Vec<u32>,
+    ) {
+        if self.bit_width() > 32 {
+            self.get_ids_for_value_range_slow(range, id_range, data, positions)
+        } else {
+            if *range.start() > u32::MAX as u64 {
+                positions.clear();
+                return;
+            }
+            let range_u32 = (*range.start() as u32)..=(*range.end()).min(u32::MAX as u64) as u32;
+            self.get_ids_for_value_range_fast(range_u32, id_range, data, positions)
+        }
+    }
+
+    fn get_ids_for_value_range_slow(
+        &self,
+        range: RangeInclusive<u64>,
+        id_range: Range<u32>,
+        data: &[u8],
+        positions: &mut Vec<u32>,
+    ) {
+        positions.clear();
+        for i in id_range {
+            // If we cared we could make this branchless, but the slow implementation should rarely
+            // kick in.
+            let val = self.get(i, data);
+            if range.contains(&val) {
+                positions.push(i);
+            }
+        }
+    }
+
+    fn get_ids_for_value_range_fast(
+        &self,
+        value_range: RangeInclusive<u32>,
+        id_range: Range<u32>,
+        data: &[u8],
+        positions: &mut Vec<u32>,
+    ) {
+        positions.resize(id_range.len(), 0u32);
+        self.get_batch_u32s(id_range.start, data, positions);
+        crate::filter_vec::filter_vec_in_place(value_range, id_range.start, positions)
+    }
 }
 
 #[cfg(test)]
@@ -200,4 +323,58 @@ mod test {
             test_bitpacker_aux(num_bits, &vals);
         }
     }
+
+    #[test]
+    #[should_panic]
+    fn test_get_batch_panics_over_32_bits() {
+        let bitunpacker = BitUnpacker::new(33);
+        let mut output: [u32; 1] = [0u32];
+        bitunpacker.get_batch_u32s(0, &[0, 0, 0, 0, 0, 0, 0, 0], &mut output[..]);
+    }
+
+    #[test]
+    fn test_get_batch_limit() {
+        let bitunpacker = BitUnpacker::new(1);
+        let mut output: [u32; 3] = [0u32, 0u32, 0u32];
+        bitunpacker.get_batch_u32s(8 * 4 - 3, &[0u8, 0u8, 0u8, 0u8], &mut output[..]);
+    }
+
+    #[test]
+    #[should_panic]
+    fn test_get_batch_panics_when_off_scope() {
+        let bitunpacker = BitUnpacker::new(1);
+        let mut output: [u32; 3] = [0u32, 0u32, 0u32];
+        // We are missing exactly one bit.
+        bitunpacker.get_batch_u32s(8 * 4 - 2, &[0u8, 0u8, 0u8, 0u8], &mut output[..]);
+    }
+
+    proptest::proptest! {
+        #[test]
+        fn test_get_batch_u32s_proptest(num_bits in 0u8..=32u8) {
+            let mask =
+                if num_bits == 32u8 {
+                    u32::MAX
+                } else {
+                    (1u32 << num_bits) - 1
+                };
+            let mut buffer: Vec<u8> = Vec::new();
+            let mut bitpacker = BitPacker::new();
+            for val in 0..100 {
+                bitpacker.write(val & mask as u64, num_bits, &mut buffer).unwrap();
+            }
+            bitpacker.flush(&mut buffer).unwrap();
+            let bitunpacker = BitUnpacker::new(num_bits);
+            let mut output: Vec<u32> = Vec::new();
+            for len in [0, 1, 2, 32, 33, 34, 64] {
+                for start_idx in 0u32..32u32 {
+                    output.resize(len as usize, 0);
+                    bitunpacker.get_batch_u32s(start_idx, &buffer, &mut output);
+                    for i in 0..len {
+                        let expected = (start_idx + i as u32) & mask;
+                        assert_eq!(output[i], expected);
+                    }
+                }
+            }
+        }
+    }
 }
diff --git a/bitpacker/src/filter_vec/avx2.rs b/bitpacker/src/filter_vec/avx2.rs
new file mode 100644
index 000000000..fa1400d2b
--- /dev/null
+++ b/bitpacker/src/filter_vec/avx2.rs
@@ -0,0 +1,365 @@
+//! SIMD filtering of a vector as described in the following blog post.
+//! https://quickwit.io/blog/filtering%20a%20vector%20with%20simd%20instructions%20avx-2%20and%20avx-512
+use std::arch::x86_64::{
+    __m256i as DataType, _mm256_add_epi32 as op_add, _mm256_cmpgt_epi32 as op_greater,
+    _mm256_lddqu_si256 as load_unaligned, _mm256_or_si256 as op_or, _mm256_set1_epi32 as set1,
+    _mm256_storeu_si256 as store_unaligned, _mm256_xor_si256 as op_xor, *,
+};
+use std::ops::RangeInclusive;
+
+const NUM_LANES: usize = 8;
+
+const HIGHEST_BIT: u32 = 1 << 31;
+
+#[inline]
+fn u32_to_i32(val: u32) -> i32 {
+    (val ^ HIGHEST_BIT) as i32
+}
+
+#[inline]
+unsafe fn u32_to_i32_avx2(vals_u32x8s: DataType) -> DataType {
+    const HIGHEST_BIT_MASK: DataType = from_u32x8([HIGHEST_BIT; NUM_LANES]);
+    op_xor(vals_u32x8s, HIGHEST_BIT_MASK)
+}
+
+pub fn filter_vec_in_place(range: RangeInclusive<u32>, offset: u32, output: &mut Vec<u32>) {
+    // We use a monotonic mapping from u32 to i32 to make the comparison possible in AVX2.
+ let range_i32: RangeInclusive = u32_to_i32(*range.start())..=u32_to_i32(*range.end()); + let num_words = output.len() / NUM_LANES; + let mut output_len = unsafe { + filter_vec_avx2_aux( + output.as_ptr() as *const __m256i, + range_i32, + output.as_mut_ptr(), + offset, + num_words, + ) + }; + let reminder_start = num_words * NUM_LANES; + for i in reminder_start..output.len() { + let val = output[i]; + output[output_len] = offset + i as u32; + output_len += if range.contains(&val) { 1 } else { 0 }; + } + output.truncate(output_len); +} + +#[target_feature(enable = "avx2")] +unsafe fn filter_vec_avx2_aux( + mut input: *const __m256i, + range: RangeInclusive, + output: *mut u32, + offset: u32, + num_words: usize, +) -> usize { + let mut output_tail = output; + let range_simd = set1(*range.start())..=set1(*range.end()); + let mut ids = from_u32x8([ + offset, + offset + 1, + offset + 2, + offset + 3, + offset + 4, + offset + 5, + offset + 6, + offset + 7, + ]); + const SHIFT: __m256i = from_u32x8([NUM_LANES as u32; NUM_LANES]); + for _ in 0..num_words { + let word = load_unaligned(input); + let word = u32_to_i32_avx2(word); + let keeper_bitset = compute_filter_bitset(word, range_simd.clone()); + let added_len = keeper_bitset.count_ones(); + let filtered_doc_ids = compact(ids, keeper_bitset); + store_unaligned(output_tail as *mut __m256i, filtered_doc_ids); + output_tail = output_tail.offset(added_len as isize); + ids = op_add(ids, SHIFT); + input = input.offset(1); + } + output_tail.offset_from(output) as usize +} + +#[inline] +#[target_feature(enable = "avx2")] +unsafe fn compact(data: DataType, mask: u8) -> DataType { + let vperm_mask = MASK_TO_PERMUTATION[mask as usize]; + _mm256_permutevar8x32_epi32(data, vperm_mask) +} + +#[inline] +#[target_feature(enable = "avx2")] +unsafe fn compute_filter_bitset(val: __m256i, range: std::ops::RangeInclusive<__m256i>) -> u8 { + let too_low = op_greater(*range.start(), val); + let too_high = op_greater(val, *range.end()); + let inside = op_or(too_low, too_high); + 255 - std::arch::x86_64::_mm256_movemask_ps(std::mem::transmute::(inside)) + as u8 +} + +union U8x32 { + vector: DataType, + vals: [u32; NUM_LANES], +} + +const fn from_u32x8(vals: [u32; NUM_LANES]) -> DataType { + unsafe { U8x32 { vals }.vector } +} + +const MASK_TO_PERMUTATION: [DataType; 256] = [ + from_u32x8([0, 0, 0, 0, 0, 0, 0, 0]), + from_u32x8([0, 0, 0, 0, 0, 0, 0, 0]), + from_u32x8([1, 0, 0, 0, 0, 0, 0, 0]), + from_u32x8([0, 1, 0, 0, 0, 0, 0, 0]), + from_u32x8([2, 0, 0, 0, 0, 0, 0, 0]), + from_u32x8([0, 2, 0, 0, 0, 0, 0, 0]), + from_u32x8([1, 2, 0, 0, 0, 0, 0, 0]), + from_u32x8([0, 1, 2, 0, 0, 0, 0, 0]), + from_u32x8([3, 0, 0, 0, 0, 0, 0, 0]), + from_u32x8([0, 3, 0, 0, 0, 0, 0, 0]), + from_u32x8([1, 3, 0, 0, 0, 0, 0, 0]), + from_u32x8([0, 1, 3, 0, 0, 0, 0, 0]), + from_u32x8([2, 3, 0, 0, 0, 0, 0, 0]), + from_u32x8([0, 2, 3, 0, 0, 0, 0, 0]), + from_u32x8([1, 2, 3, 0, 0, 0, 0, 0]), + from_u32x8([0, 1, 2, 3, 0, 0, 0, 0]), + from_u32x8([4, 0, 0, 0, 0, 0, 0, 0]), + from_u32x8([0, 4, 0, 0, 0, 0, 0, 0]), + from_u32x8([1, 4, 0, 0, 0, 0, 0, 0]), + from_u32x8([0, 1, 4, 0, 0, 0, 0, 0]), + from_u32x8([2, 4, 0, 0, 0, 0, 0, 0]), + from_u32x8([0, 2, 4, 0, 0, 0, 0, 0]), + from_u32x8([1, 2, 4, 0, 0, 0, 0, 0]), + from_u32x8([0, 1, 2, 4, 0, 0, 0, 0]), + from_u32x8([3, 4, 0, 0, 0, 0, 0, 0]), + from_u32x8([0, 3, 4, 0, 0, 0, 0, 0]), + from_u32x8([1, 3, 4, 0, 0, 0, 0, 0]), + from_u32x8([0, 1, 3, 4, 0, 0, 0, 0]), + from_u32x8([2, 3, 4, 0, 0, 0, 0, 0]), + from_u32x8([0, 2, 3, 4, 0, 0, 0, 0]), + from_u32x8([1, 
2, 3, 4, 0, 0, 0, 0]), + from_u32x8([0, 1, 2, 3, 4, 0, 0, 0]), + from_u32x8([5, 0, 0, 0, 0, 0, 0, 0]), + from_u32x8([0, 5, 0, 0, 0, 0, 0, 0]), + from_u32x8([1, 5, 0, 0, 0, 0, 0, 0]), + from_u32x8([0, 1, 5, 0, 0, 0, 0, 0]), + from_u32x8([2, 5, 0, 0, 0, 0, 0, 0]), + from_u32x8([0, 2, 5, 0, 0, 0, 0, 0]), + from_u32x8([1, 2, 5, 0, 0, 0, 0, 0]), + from_u32x8([0, 1, 2, 5, 0, 0, 0, 0]), + from_u32x8([3, 5, 0, 0, 0, 0, 0, 0]), + from_u32x8([0, 3, 5, 0, 0, 0, 0, 0]), + from_u32x8([1, 3, 5, 0, 0, 0, 0, 0]), + from_u32x8([0, 1, 3, 5, 0, 0, 0, 0]), + from_u32x8([2, 3, 5, 0, 0, 0, 0, 0]), + from_u32x8([0, 2, 3, 5, 0, 0, 0, 0]), + from_u32x8([1, 2, 3, 5, 0, 0, 0, 0]), + from_u32x8([0, 1, 2, 3, 5, 0, 0, 0]), + from_u32x8([4, 5, 0, 0, 0, 0, 0, 0]), + from_u32x8([0, 4, 5, 0, 0, 0, 0, 0]), + from_u32x8([1, 4, 5, 0, 0, 0, 0, 0]), + from_u32x8([0, 1, 4, 5, 0, 0, 0, 0]), + from_u32x8([2, 4, 5, 0, 0, 0, 0, 0]), + from_u32x8([0, 2, 4, 5, 0, 0, 0, 0]), + from_u32x8([1, 2, 4, 5, 0, 0, 0, 0]), + from_u32x8([0, 1, 2, 4, 5, 0, 0, 0]), + from_u32x8([3, 4, 5, 0, 0, 0, 0, 0]), + from_u32x8([0, 3, 4, 5, 0, 0, 0, 0]), + from_u32x8([1, 3, 4, 5, 0, 0, 0, 0]), + from_u32x8([0, 1, 3, 4, 5, 0, 0, 0]), + from_u32x8([2, 3, 4, 5, 0, 0, 0, 0]), + from_u32x8([0, 2, 3, 4, 5, 0, 0, 0]), + from_u32x8([1, 2, 3, 4, 5, 0, 0, 0]), + from_u32x8([0, 1, 2, 3, 4, 5, 0, 0]), + from_u32x8([6, 0, 0, 0, 0, 0, 0, 0]), + from_u32x8([0, 6, 0, 0, 0, 0, 0, 0]), + from_u32x8([1, 6, 0, 0, 0, 0, 0, 0]), + from_u32x8([0, 1, 6, 0, 0, 0, 0, 0]), + from_u32x8([2, 6, 0, 0, 0, 0, 0, 0]), + from_u32x8([0, 2, 6, 0, 0, 0, 0, 0]), + from_u32x8([1, 2, 6, 0, 0, 0, 0, 0]), + from_u32x8([0, 1, 2, 6, 0, 0, 0, 0]), + from_u32x8([3, 6, 0, 0, 0, 0, 0, 0]), + from_u32x8([0, 3, 6, 0, 0, 0, 0, 0]), + from_u32x8([1, 3, 6, 0, 0, 0, 0, 0]), + from_u32x8([0, 1, 3, 6, 0, 0, 0, 0]), + from_u32x8([2, 3, 6, 0, 0, 0, 0, 0]), + from_u32x8([0, 2, 3, 6, 0, 0, 0, 0]), + from_u32x8([1, 2, 3, 6, 0, 0, 0, 0]), + from_u32x8([0, 1, 2, 3, 6, 0, 0, 0]), + from_u32x8([4, 6, 0, 0, 0, 0, 0, 0]), + from_u32x8([0, 4, 6, 0, 0, 0, 0, 0]), + from_u32x8([1, 4, 6, 0, 0, 0, 0, 0]), + from_u32x8([0, 1, 4, 6, 0, 0, 0, 0]), + from_u32x8([2, 4, 6, 0, 0, 0, 0, 0]), + from_u32x8([0, 2, 4, 6, 0, 0, 0, 0]), + from_u32x8([1, 2, 4, 6, 0, 0, 0, 0]), + from_u32x8([0, 1, 2, 4, 6, 0, 0, 0]), + from_u32x8([3, 4, 6, 0, 0, 0, 0, 0]), + from_u32x8([0, 3, 4, 6, 0, 0, 0, 0]), + from_u32x8([1, 3, 4, 6, 0, 0, 0, 0]), + from_u32x8([0, 1, 3, 4, 6, 0, 0, 0]), + from_u32x8([2, 3, 4, 6, 0, 0, 0, 0]), + from_u32x8([0, 2, 3, 4, 6, 0, 0, 0]), + from_u32x8([1, 2, 3, 4, 6, 0, 0, 0]), + from_u32x8([0, 1, 2, 3, 4, 6, 0, 0]), + from_u32x8([5, 6, 0, 0, 0, 0, 0, 0]), + from_u32x8([0, 5, 6, 0, 0, 0, 0, 0]), + from_u32x8([1, 5, 6, 0, 0, 0, 0, 0]), + from_u32x8([0, 1, 5, 6, 0, 0, 0, 0]), + from_u32x8([2, 5, 6, 0, 0, 0, 0, 0]), + from_u32x8([0, 2, 5, 6, 0, 0, 0, 0]), + from_u32x8([1, 2, 5, 6, 0, 0, 0, 0]), + from_u32x8([0, 1, 2, 5, 6, 0, 0, 0]), + from_u32x8([3, 5, 6, 0, 0, 0, 0, 0]), + from_u32x8([0, 3, 5, 6, 0, 0, 0, 0]), + from_u32x8([1, 3, 5, 6, 0, 0, 0, 0]), + from_u32x8([0, 1, 3, 5, 6, 0, 0, 0]), + from_u32x8([2, 3, 5, 6, 0, 0, 0, 0]), + from_u32x8([0, 2, 3, 5, 6, 0, 0, 0]), + from_u32x8([1, 2, 3, 5, 6, 0, 0, 0]), + from_u32x8([0, 1, 2, 3, 5, 6, 0, 0]), + from_u32x8([4, 5, 6, 0, 0, 0, 0, 0]), + from_u32x8([0, 4, 5, 6, 0, 0, 0, 0]), + from_u32x8([1, 4, 5, 6, 0, 0, 0, 0]), + from_u32x8([0, 1, 4, 5, 6, 0, 0, 0]), + from_u32x8([2, 4, 5, 6, 0, 0, 0, 0]), + from_u32x8([0, 2, 4, 5, 6, 0, 0, 0]), + from_u32x8([1, 2, 4, 5, 6, 0, 0, 0]), + 
from_u32x8([0, 1, 2, 4, 5, 6, 0, 0]), + from_u32x8([3, 4, 5, 6, 0, 0, 0, 0]), + from_u32x8([0, 3, 4, 5, 6, 0, 0, 0]), + from_u32x8([1, 3, 4, 5, 6, 0, 0, 0]), + from_u32x8([0, 1, 3, 4, 5, 6, 0, 0]), + from_u32x8([2, 3, 4, 5, 6, 0, 0, 0]), + from_u32x8([0, 2, 3, 4, 5, 6, 0, 0]), + from_u32x8([1, 2, 3, 4, 5, 6, 0, 0]), + from_u32x8([0, 1, 2, 3, 4, 5, 6, 0]), + from_u32x8([7, 0, 0, 0, 0, 0, 0, 0]), + from_u32x8([0, 7, 0, 0, 0, 0, 0, 0]), + from_u32x8([1, 7, 0, 0, 0, 0, 0, 0]), + from_u32x8([0, 1, 7, 0, 0, 0, 0, 0]), + from_u32x8([2, 7, 0, 0, 0, 0, 0, 0]), + from_u32x8([0, 2, 7, 0, 0, 0, 0, 0]), + from_u32x8([1, 2, 7, 0, 0, 0, 0, 0]), + from_u32x8([0, 1, 2, 7, 0, 0, 0, 0]), + from_u32x8([3, 7, 0, 0, 0, 0, 0, 0]), + from_u32x8([0, 3, 7, 0, 0, 0, 0, 0]), + from_u32x8([1, 3, 7, 0, 0, 0, 0, 0]), + from_u32x8([0, 1, 3, 7, 0, 0, 0, 0]), + from_u32x8([2, 3, 7, 0, 0, 0, 0, 0]), + from_u32x8([0, 2, 3, 7, 0, 0, 0, 0]), + from_u32x8([1, 2, 3, 7, 0, 0, 0, 0]), + from_u32x8([0, 1, 2, 3, 7, 0, 0, 0]), + from_u32x8([4, 7, 0, 0, 0, 0, 0, 0]), + from_u32x8([0, 4, 7, 0, 0, 0, 0, 0]), + from_u32x8([1, 4, 7, 0, 0, 0, 0, 0]), + from_u32x8([0, 1, 4, 7, 0, 0, 0, 0]), + from_u32x8([2, 4, 7, 0, 0, 0, 0, 0]), + from_u32x8([0, 2, 4, 7, 0, 0, 0, 0]), + from_u32x8([1, 2, 4, 7, 0, 0, 0, 0]), + from_u32x8([0, 1, 2, 4, 7, 0, 0, 0]), + from_u32x8([3, 4, 7, 0, 0, 0, 0, 0]), + from_u32x8([0, 3, 4, 7, 0, 0, 0, 0]), + from_u32x8([1, 3, 4, 7, 0, 0, 0, 0]), + from_u32x8([0, 1, 3, 4, 7, 0, 0, 0]), + from_u32x8([2, 3, 4, 7, 0, 0, 0, 0]), + from_u32x8([0, 2, 3, 4, 7, 0, 0, 0]), + from_u32x8([1, 2, 3, 4, 7, 0, 0, 0]), + from_u32x8([0, 1, 2, 3, 4, 7, 0, 0]), + from_u32x8([5, 7, 0, 0, 0, 0, 0, 0]), + from_u32x8([0, 5, 7, 0, 0, 0, 0, 0]), + from_u32x8([1, 5, 7, 0, 0, 0, 0, 0]), + from_u32x8([0, 1, 5, 7, 0, 0, 0, 0]), + from_u32x8([2, 5, 7, 0, 0, 0, 0, 0]), + from_u32x8([0, 2, 5, 7, 0, 0, 0, 0]), + from_u32x8([1, 2, 5, 7, 0, 0, 0, 0]), + from_u32x8([0, 1, 2, 5, 7, 0, 0, 0]), + from_u32x8([3, 5, 7, 0, 0, 0, 0, 0]), + from_u32x8([0, 3, 5, 7, 0, 0, 0, 0]), + from_u32x8([1, 3, 5, 7, 0, 0, 0, 0]), + from_u32x8([0, 1, 3, 5, 7, 0, 0, 0]), + from_u32x8([2, 3, 5, 7, 0, 0, 0, 0]), + from_u32x8([0, 2, 3, 5, 7, 0, 0, 0]), + from_u32x8([1, 2, 3, 5, 7, 0, 0, 0]), + from_u32x8([0, 1, 2, 3, 5, 7, 0, 0]), + from_u32x8([4, 5, 7, 0, 0, 0, 0, 0]), + from_u32x8([0, 4, 5, 7, 0, 0, 0, 0]), + from_u32x8([1, 4, 5, 7, 0, 0, 0, 0]), + from_u32x8([0, 1, 4, 5, 7, 0, 0, 0]), + from_u32x8([2, 4, 5, 7, 0, 0, 0, 0]), + from_u32x8([0, 2, 4, 5, 7, 0, 0, 0]), + from_u32x8([1, 2, 4, 5, 7, 0, 0, 0]), + from_u32x8([0, 1, 2, 4, 5, 7, 0, 0]), + from_u32x8([3, 4, 5, 7, 0, 0, 0, 0]), + from_u32x8([0, 3, 4, 5, 7, 0, 0, 0]), + from_u32x8([1, 3, 4, 5, 7, 0, 0, 0]), + from_u32x8([0, 1, 3, 4, 5, 7, 0, 0]), + from_u32x8([2, 3, 4, 5, 7, 0, 0, 0]), + from_u32x8([0, 2, 3, 4, 5, 7, 0, 0]), + from_u32x8([1, 2, 3, 4, 5, 7, 0, 0]), + from_u32x8([0, 1, 2, 3, 4, 5, 7, 0]), + from_u32x8([6, 7, 0, 0, 0, 0, 0, 0]), + from_u32x8([0, 6, 7, 0, 0, 0, 0, 0]), + from_u32x8([1, 6, 7, 0, 0, 0, 0, 0]), + from_u32x8([0, 1, 6, 7, 0, 0, 0, 0]), + from_u32x8([2, 6, 7, 0, 0, 0, 0, 0]), + from_u32x8([0, 2, 6, 7, 0, 0, 0, 0]), + from_u32x8([1, 2, 6, 7, 0, 0, 0, 0]), + from_u32x8([0, 1, 2, 6, 7, 0, 0, 0]), + from_u32x8([3, 6, 7, 0, 0, 0, 0, 0]), + from_u32x8([0, 3, 6, 7, 0, 0, 0, 0]), + from_u32x8([1, 3, 6, 7, 0, 0, 0, 0]), + from_u32x8([0, 1, 3, 6, 7, 0, 0, 0]), + from_u32x8([2, 3, 6, 7, 0, 0, 0, 0]), + from_u32x8([0, 2, 3, 6, 7, 0, 0, 0]), + from_u32x8([1, 2, 3, 6, 7, 0, 0, 0]), + from_u32x8([0, 1, 2, 3, 6, 7, 0, 
0]), + from_u32x8([4, 6, 7, 0, 0, 0, 0, 0]), + from_u32x8([0, 4, 6, 7, 0, 0, 0, 0]), + from_u32x8([1, 4, 6, 7, 0, 0, 0, 0]), + from_u32x8([0, 1, 4, 6, 7, 0, 0, 0]), + from_u32x8([2, 4, 6, 7, 0, 0, 0, 0]), + from_u32x8([0, 2, 4, 6, 7, 0, 0, 0]), + from_u32x8([1, 2, 4, 6, 7, 0, 0, 0]), + from_u32x8([0, 1, 2, 4, 6, 7, 0, 0]), + from_u32x8([3, 4, 6, 7, 0, 0, 0, 0]), + from_u32x8([0, 3, 4, 6, 7, 0, 0, 0]), + from_u32x8([1, 3, 4, 6, 7, 0, 0, 0]), + from_u32x8([0, 1, 3, 4, 6, 7, 0, 0]), + from_u32x8([2, 3, 4, 6, 7, 0, 0, 0]), + from_u32x8([0, 2, 3, 4, 6, 7, 0, 0]), + from_u32x8([1, 2, 3, 4, 6, 7, 0, 0]), + from_u32x8([0, 1, 2, 3, 4, 6, 7, 0]), + from_u32x8([5, 6, 7, 0, 0, 0, 0, 0]), + from_u32x8([0, 5, 6, 7, 0, 0, 0, 0]), + from_u32x8([1, 5, 6, 7, 0, 0, 0, 0]), + from_u32x8([0, 1, 5, 6, 7, 0, 0, 0]), + from_u32x8([2, 5, 6, 7, 0, 0, 0, 0]), + from_u32x8([0, 2, 5, 6, 7, 0, 0, 0]), + from_u32x8([1, 2, 5, 6, 7, 0, 0, 0]), + from_u32x8([0, 1, 2, 5, 6, 7, 0, 0]), + from_u32x8([3, 5, 6, 7, 0, 0, 0, 0]), + from_u32x8([0, 3, 5, 6, 7, 0, 0, 0]), + from_u32x8([1, 3, 5, 6, 7, 0, 0, 0]), + from_u32x8([0, 1, 3, 5, 6, 7, 0, 0]), + from_u32x8([2, 3, 5, 6, 7, 0, 0, 0]), + from_u32x8([0, 2, 3, 5, 6, 7, 0, 0]), + from_u32x8([1, 2, 3, 5, 6, 7, 0, 0]), + from_u32x8([0, 1, 2, 3, 5, 6, 7, 0]), + from_u32x8([4, 5, 6, 7, 0, 0, 0, 0]), + from_u32x8([0, 4, 5, 6, 7, 0, 0, 0]), + from_u32x8([1, 4, 5, 6, 7, 0, 0, 0]), + from_u32x8([0, 1, 4, 5, 6, 7, 0, 0]), + from_u32x8([2, 4, 5, 6, 7, 0, 0, 0]), + from_u32x8([0, 2, 4, 5, 6, 7, 0, 0]), + from_u32x8([1, 2, 4, 5, 6, 7, 0, 0]), + from_u32x8([0, 1, 2, 4, 5, 6, 7, 0]), + from_u32x8([3, 4, 5, 6, 7, 0, 0, 0]), + from_u32x8([0, 3, 4, 5, 6, 7, 0, 0]), + from_u32x8([1, 3, 4, 5, 6, 7, 0, 0]), + from_u32x8([0, 1, 3, 4, 5, 6, 7, 0]), + from_u32x8([2, 3, 4, 5, 6, 7, 0, 0]), + from_u32x8([0, 2, 3, 4, 5, 6, 7, 0]), + from_u32x8([1, 2, 3, 4, 5, 6, 7, 0]), + from_u32x8([0, 1, 2, 3, 4, 5, 6, 7]), +]; diff --git a/bitpacker/src/filter_vec/mod.rs b/bitpacker/src/filter_vec/mod.rs new file mode 100644 index 000000000..70d73447d --- /dev/null +++ b/bitpacker/src/filter_vec/mod.rs @@ -0,0 +1,167 @@ +use std::ops::RangeInclusive; +use std::sync::atomic::AtomicU8; + +#[cfg(any(target_arch = "x86_64"))] +mod avx2; + +mod scalar; + +#[derive(Clone, Copy, Eq, PartialEq, Debug)] +#[repr(u8)] +enum FilterImplPerInstructionSet { + #[cfg(target_arch = "x86_64")] + AVX2 = 0u8, + Scalar = 1u8, +} + +impl FilterImplPerInstructionSet { + #[inline] + pub fn is_available(&self) -> bool { + match *self { + #[cfg(target_arch = "x86_64")] + FilterImplPerInstructionSet::AVX2 => is_x86_feature_detected!("avx2"), + FilterImplPerInstructionSet::Scalar => true, + } + } +} + +// List of available implementation in preferred order. 
+#[cfg(target_arch = "x86_64")]
+const IMPLS: [FilterImplPerInstructionSet; 2] = [
+    FilterImplPerInstructionSet::AVX2,
+    FilterImplPerInstructionSet::Scalar,
+];
+
+impl FilterImplPerInstructionSet {
+    #[inline]
+    fn from(code: u8) -> FilterImplPerInstructionSet {
+        if code == FilterImplPerInstructionSet::AVX2 as u8 {
+            FilterImplPerInstructionSet::AVX2
+        } else {
+            FilterImplPerInstructionSet::Scalar
+        }
+    }
+
+    #[inline]
+    fn filter_vec_in_place(self, range: RangeInclusive<u32>, offset: u32, output: &mut Vec<u32>) {
+        match self {
+            #[cfg(target_arch = "x86_64")]
+            FilterImplPerInstructionSet::AVX2 => avx2::filter_vec_in_place(range, offset, output),
+            FilterImplPerInstructionSet::Scalar => {
+                scalar::filter_vec_in_place(range, offset, output)
+            }
+        }
+    }
+}
+
+#[cfg(target_arch = "x86_64")]
+#[inline]
+fn get_best_available_instruction_set() -> FilterImplPerInstructionSet {
+    use std::sync::atomic::Ordering;
+    static INSTRUCTION_SET_BYTE: AtomicU8 = AtomicU8::new(u8::MAX);
+    let instruction_set_byte: u8 = INSTRUCTION_SET_BYTE.load(Ordering::Relaxed);
+    if instruction_set_byte == u8::MAX {
+        // Let's initialize the instruction set and cache it.
+        let instruction_set = IMPLS
+            .into_iter()
+            .find(FilterImplPerInstructionSet::is_available)
+            .unwrap();
+        INSTRUCTION_SET_BYTE.store(instruction_set as u8, Ordering::Relaxed);
+        return instruction_set;
+    }
+    FilterImplPerInstructionSet::from(instruction_set_byte)
+}
+
+#[cfg(not(target_arch = "x86_64"))]
+#[inline]
+const fn get_best_available_instruction_set() -> FilterImplPerInstructionSet {
+    FilterImplPerInstructionSet::Scalar
+}
+
+pub fn filter_vec_in_place(range: RangeInclusive<u32>, offset: u32, output: &mut Vec<u32>) {
+    get_best_available_instruction_set().filter_vec_in_place(range, offset, output)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_get_best_available_instruction_set() {
+        // This does not test much, unfortunately.
+        // We just make sure the function returns without crashing and returns the same result.
+        let instruction_set = get_best_available_instruction_set();
+        assert_eq!(get_best_available_instruction_set(), instruction_set);
+    }
+
+    #[cfg(target_arch = "x86_64")]
+    #[test]
+    fn test_instruction_set_to_code_from_code() {
+        for instruction_set in [
+            FilterImplPerInstructionSet::AVX2,
+            FilterImplPerInstructionSet::Scalar,
+        ] {
+            let code = instruction_set as u8;
+            assert_eq!(instruction_set, FilterImplPerInstructionSet::from(code));
+        }
+    }
+
+    fn test_filter_impl_empty_aux(filter_impl: FilterImplPerInstructionSet) {
+        let mut output = vec![];
+        filter_impl.filter_vec_in_place(0..=u32::MAX, 0, &mut output);
+        assert_eq!(&output, &[]);
+    }
+
+    fn test_filter_impl_simple_aux(filter_impl: FilterImplPerInstructionSet) {
+        let mut output = vec![3, 2, 1, 5, 11, 2, 5, 10, 2];
+        filter_impl.filter_vec_in_place(3..=10, 0, &mut output);
+        assert_eq!(&output, &[0, 3, 6, 7]);
+    }
+
+    fn test_filter_impl_simple_aux_shifted(filter_impl: FilterImplPerInstructionSet) {
+        let mut output = vec![3, 2, 1, 5, 11, 2, 5, 10, 2];
+        filter_impl.filter_vec_in_place(3..=10, 10, &mut output);
+        assert_eq!(&output, &[10, 13, 16, 17]);
+    }
+
+    fn test_filter_impl_simple_outside_i32_range(filter_impl: FilterImplPerInstructionSet) {
+        let mut output = vec![u32::MAX, i32::MAX as u32 + 1, 0, 1, 3, 1, 1, 1, 1];
+        filter_impl.filter_vec_in_place(1..=i32::MAX as u32 + 1u32, 0, &mut output);
+        assert_eq!(&output, &[1, 3, 4, 5, 6, 7, 8]);
+    }
+
+    fn test_filter_impl_test_suite(filter_impl: FilterImplPerInstructionSet) {
+        test_filter_impl_empty_aux(filter_impl);
+        test_filter_impl_simple_aux(filter_impl);
+        test_filter_impl_simple_aux_shifted(filter_impl);
+        test_filter_impl_simple_outside_i32_range(filter_impl);
+    }
+
+    #[test]
+    fn test_filter_implementation_avx2() {
+        if FilterImplPerInstructionSet::AVX2.is_available() {
+            test_filter_impl_test_suite(FilterImplPerInstructionSet::AVX2);
+        }
+    }
+
+    #[test]
+    fn test_filter_implementation_scalar() {
+        test_filter_impl_test_suite(FilterImplPerInstructionSet::Scalar);
+    }
+
+    proptest::proptest! {
+        #[test]
+        fn test_filter_impl_proptest(
+            start in proptest::prelude::any::<u32>(),
+            end in proptest::prelude::any::<u32>(),
+            offset in 0u32..2u32,
+            mut vals in proptest::collection::vec(0..u32::MAX, 0..30)) {
+            if FilterImplPerInstructionSet::AVX2.is_available() {
+                let mut vals_clone = vals.clone();
+                FilterImplPerInstructionSet::AVX2.filter_vec_in_place(start..=end, offset, &mut vals);
+                FilterImplPerInstructionSet::Scalar.filter_vec_in_place(start..=end, offset, &mut vals_clone);
+                assert_eq!(&vals, &vals_clone);
+            }
+        }
+    }
+}
diff --git a/bitpacker/src/filter_vec/scalar.rs b/bitpacker/src/filter_vec/scalar.rs
new file mode 100644
index 000000000..b1acfabe7
--- /dev/null
+++ b/bitpacker/src/filter_vec/scalar.rs
@@ -0,0 +1,13 @@
+use std::ops::RangeInclusive;
+
+pub fn filter_vec_in_place(range: RangeInclusive<u32>, offset: u32, output: &mut Vec<u32>) {
+    // We restrict the accepted boundary, because unsigned integers & SIMD don't
+    // play well.
+    let mut output_cursor = 0;
+    for i in 0..output.len() {
+        let val = output[i];
+        output[output_cursor] = offset + i as u32;
+        output_cursor += if range.contains(&val) { 1 } else { 0 };
+    }
+    output.truncate(output_cursor);
+}
diff --git a/bitpacker/src/lib.rs b/bitpacker/src/lib.rs
index b436f45dd..b2eacec05 100644
--- a/bitpacker/src/lib.rs
+++ b/bitpacker/src/lib.rs
@@ -1,5 +1,6 @@
 mod bitpacker;
 mod blocked_bitpacker;
+mod filter_vec;
 
 use std::cmp::Ordering;
diff --git a/columnar/src/column_values/mod.rs b/columnar/src/column_values/mod.rs
index 923ba15ef..42cf796b0 100644
--- a/columnar/src/column_values/mod.rs
+++ b/columnar/src/column_values/mod.rs
@@ -94,7 +94,6 @@ pub trait ColumnValues: Send + Sync {
     /// Get the row ids of values which are in the provided value range.
     ///
     /// Note that position == docid for single value fast fields
-    #[inline(always)]
     fn get_row_ids_for_value_range(
         &self,
         value_range: RangeInclusive<u64>,
diff --git a/columnar/src/column_values/u128_based/compact_space/build_compact_space.rs b/columnar/src/column_values/u128_based/compact_space/build_compact_space.rs
index 51d1912ca..48d631b76 100644
--- a/columnar/src/column_values/u128_based/compact_space/build_compact_space.rs
+++ b/columnar/src/column_values/u128_based/compact_space/build_compact_space.rs
@@ -10,7 +10,7 @@ use super::{CompactSpace, RangeMapping};
 /// Put the blanks for the sorted values into a binary heap
 fn get_blanks(values_sorted: &BTreeSet<u128>) -> BinaryHeap<BlankRange> {
     let mut blanks: BinaryHeap<BlankRange> = BinaryHeap::new();
-    for (first, second) in values_sorted.iter().tuple_windows() {
+    for (first, second) in values_sorted.iter().copied().tuple_windows() {
         // Correctness Overflow: the values are deduped and sorted (BTreeSet property), that means
         // there's always space between two values.
         let blank_range = first + 1..=second - 1;
@@ -65,12 +65,12 @@ pub fn get_compact_space(
         return compact_space_builder.finish();
     }
 
-    let mut blanks: BinaryHeap<BlankRange> = get_blanks(values_deduped_sorted);
-    // Replace after stabilization of https://github.com/rust-lang/rust/issues/62924
-    // We start by space that's limited to min_value..=max_value
-    let min_value = *values_deduped_sorted.iter().next().unwrap_or(&0);
-    let max_value = *values_deduped_sorted.iter().last().unwrap_or(&0);
+    // Replace after stabilization of https://github.com/rust-lang/rust/issues/62924
+    let min_value = values_deduped_sorted.iter().next().copied().unwrap_or(0);
+    let max_value = values_deduped_sorted.iter().last().copied().unwrap_or(0);
+
+    let mut blanks: BinaryHeap<BlankRange> = get_blanks(values_deduped_sorted);
 
     // +1 for null, in case min and max covers the whole space, we are off by one.
     let mut amplitude_compact_space = (max_value - min_value).saturating_add(1);
@@ -84,6 +84,7 @@ pub fn get_compact_space(
     let mut amplitude_bits: u8 = num_bits(amplitude_compact_space);
 
     let mut blank_collector = BlankCollector::new();
+
     // We will stage blanks until they reduce the compact space by at least 1 bit and then flush
     // them if the metadata cost is lower than the total number of saved bits.
     // Binary heap to process the gaps by their size
@@ -93,6 +94,7 @@
         let staged_spaces_sum: u128 = blank_collector.staged_blanks_sum();
         let amplitude_new_compact_space = amplitude_compact_space - staged_spaces_sum;
         let amplitude_new_bits = num_bits(amplitude_new_compact_space);
+
         if amplitude_bits == amplitude_new_bits {
             continue;
         }
@@ -100,7 +102,16 @@
         // TODO: Maybe calculate exact cost of blanks and run this more expensive computation only,
         // when amplitude_new_bits changes
        let cost = blank_collector.num_staged_blanks() * cost_per_blank;
-        if cost >= saved_bits {
+
+        // We want to end up with a compact space that fits into 32 bits.
+        // In order to deal with pathological cases, we force the algorithm to keep
+        // refining the compact space until the amplitude fits within 32 bits.
+        //
+        // The worst case scenario happens for a large number of u128s regularly
+        // spread over the full u128 space.
+        //
+        // This change will force the algorithm to degenerate into dictionary encoding.
+        if amplitude_bits <= 32 && cost >= saved_bits {
             // Continue here, since although we walk over the blanks by size,
             // we can potentially save a lot at the last bits, which are smaller blanks
             //
@@ -115,6 +126,8 @@
         compact_space_builder.add_blanks(blank_collector.drain().map(|blank| blank.blank_range()));
     }
 
+    assert!(amplitude_bits <= 32);
+
     // special case, when we don't collected any blanks because:
     // * the data is empty (early exit)
     // * the algorithm did decide it's not worth the cost, which can be the case for single values
@@ -199,7 +212,7 @@ impl CompactSpaceBuilder {
             covered_space.push(0..=0); // empty data case
         };
 
-        let mut compact_start: u64 = 1; // 0 is reserved for `null`
+        let mut compact_start: u32 = 1; // 0 is reserved for `null`
         let mut ranges_mapping: Vec<RangeMapping> = Vec::with_capacity(covered_space.len());
         for cov in covered_space {
             let range_mapping = super::RangeMapping {
@@ -218,6 +231,7 @@
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::column_values::u128_based::compact_space::COST_PER_BLANK_IN_BITS;
 
     #[test]
     fn test_binary_heap_pop_order() {
@@ -228,4 +242,11 @@
         assert_eq!(blanks.pop().unwrap().blank_size(), 101);
         assert_eq!(blanks.pop().unwrap().blank_size(), 11);
     }
+
+    #[test]
+    fn test_worst_case_scenario() {
+        let vals: BTreeSet<u128> = (0..8).map(|i| i * ((1u128 << 34) / 8)).collect();
+        let compact_space = get_compact_space(&vals, vals.len() as u32, COST_PER_BLANK_IN_BITS);
+        assert!(compact_space.amplitude_compact_space() < u32::MAX as u128);
+    }
 }
diff --git a/columnar/src/column_values/u128_based/compact_space/mod.rs b/columnar/src/column_values/u128_based/compact_space/mod.rs
index a32b6f5b8..3b1069657 100644
--- a/columnar/src/column_values/u128_based/compact_space/mod.rs
+++ b/columnar/src/column_values/u128_based/compact_space/mod.rs
@@ -42,15 +42,15 @@ pub struct CompactSpace {
 #[derive(Debug, Clone, Eq, PartialEq)]
 struct RangeMapping {
     value_range: RangeInclusive<u128>,
-    compact_start: u64,
+    compact_start: u32,
 }
 
 impl RangeMapping {
-    fn range_length(&self) -> u64 {
-        (self.value_range.end() - self.value_range.start()) as u64 + 1
+    fn range_length(&self) -> u32 {
+        (self.value_range.end() - self.value_range.start()) as u32 + 1
     }
 
     // The last value of the compact space in this range
-    fn compact_end(&self) -> u64 {
+    fn compact_end(&self) -> u32 {
         self.compact_start + self.range_length() - 1
     }
 }
@@ -81,7 +81,7 @@ impl BinarySerializable for CompactSpace {
         let num_ranges = VInt::deserialize(reader)?.0;
         let mut ranges_mapping: Vec<RangeMapping> = vec![];
         let mut value = 0u128;
-        let mut compact_start = 1u64; // 0 is reserved for `null`
+        let mut compact_start = 1u32; // 0 is reserved for `null`
         for _ in 0..num_ranges {
             let blank_delta_start = VIntU128::deserialize(reader)?.0;
             value += blank_delta_start;
@@ -122,10 +122,10 @@ impl CompactSpace {
 
     /// Returns either Ok(the value in the compact space) or if it is outside the compact space the
     /// Err(position where it would be inserted)
-    fn u128_to_compact(&self, value: u128) -> Result<u64, usize> {
+    fn u128_to_compact(&self, value: u128) -> Result<u32, usize> {
         self.ranges_mapping
             .binary_search_by(|probe| {
-                let value_range = &probe.value_range;
+                let value_range: &RangeInclusive<u128> = &probe.value_range;
                 if value < *value_range.start() {
                     Ordering::Greater
                 } else if value > *value_range.end() {
@@ -136,13 +136,13 @@
             })
             .map(|pos| {
                 let range_mapping = &self.ranges_mapping[pos];
-                let pos_in_range = (value - range_mapping.value_range.start()) as u64;
+                let pos_in_range: u32 = (value - range_mapping.value_range.start()) as u32;
                 range_mapping.compact_start + pos_in_range
             })
     }
 
-    /// Unpacks a value from compact space u64 to u128 space
-    fn compact_to_u128(&self, compact: u64) -> u128 {
+    /// Unpacks a value from compact space u32 to u128 space
+    fn compact_to_u128(&self, compact: u32) -> u128 {
         let pos = self
             .ranges_mapping
             .binary_search_by_key(&compact, |range_mapping| range_mapping.compact_start)
@@ -178,11 +178,15 @@ impl CompactSpaceCompressor {
     /// Taking the vals as Vec may cost a lot of memory. It is used to sort the vals.
     pub fn train_from(iter: impl Iterator<Item = u128>) -> Self {
         let mut values_sorted = BTreeSet::new();
+        // Total number of values, duplicates included.
         let mut total_num_values = 0u32;
         for val in iter {
             total_num_values += 1u32;
            values_sorted.insert(val);
         }
+        let min_value = *values_sorted.iter().next().unwrap_or(&0);
+        let max_value = *values_sorted.iter().last().unwrap_or(&0);
+
         let compact_space =
             get_compact_space(&values_sorted, total_num_values, COST_PER_BLANK_IN_BITS);
         let amplitude_compact_space = compact_space.amplitude_compact_space();
@@ -193,13 +197,12 @@
         );
 
         let num_bits = tantivy_bitpacker::compute_num_bits(amplitude_compact_space as u64);
-        let min_value = *values_sorted.iter().next().unwrap_or(&0);
-        let max_value = *values_sorted.iter().last().unwrap_or(&0);
+
         assert_eq!(
             compact_space
                 .u128_to_compact(max_value)
                 .expect("could not convert max value to compact space"),
-            amplitude_compact_space as u64
+            amplitude_compact_space as u32
         );
         CompactSpaceCompressor {
             params: IPCodecParams {
@@ -240,7 +243,7 @@
                     "Could not convert value to compact_space. This is a bug.",
                 )
             })?;
-            bitpacker.write(compact, self.params.num_bits, write)?;
+            bitpacker.write(compact as u64, self.params.num_bits, write)?;
         }
         bitpacker.close(write)?;
         self.write_footer(write)?;
@@ -314,48 +317,6 @@ impl ColumnValues for CompactSpaceDecompressor {
 
     #[inline]
     fn get_row_ids_for_value_range(
-        &self,
-        value_range: RangeInclusive<u128>,
-        positions_range: Range<u32>,
-        positions: &mut Vec<u32>,
-    ) {
-        self.get_positions_for_value_range(value_range, positions_range, positions)
-    }
-}
-
-impl CompactSpaceDecompressor {
-    pub fn open(data: OwnedBytes) -> io::Result<CompactSpaceDecompressor> {
-        let (data_slice, footer_len_bytes) = data.split_at(data.len() - 4);
-        let footer_len = u32::deserialize(&mut &footer_len_bytes[..])?;
-
-        let data_footer = &data_slice[data_slice.len() - footer_len as usize..];
-        let params = IPCodecParams::deserialize(&mut &data_footer[..])?;
-        let decompressor = CompactSpaceDecompressor { data, params };
-
-        Ok(decompressor)
-    }
-
-    /// Converting to compact space for the decompressor is more complex, since we may get values
-    /// which are outside the compact space. e.g. if we map
-    /// 1000 => 5
-    /// 2000 => 6
-    ///
-    /// and we want a mapping for 1005, there is no equivalent compact space. We instead return an
-    /// error with the index of the next range.
-    fn u128_to_compact(&self, value: u128) -> Result<u64, usize> {
-        self.params.compact_space.u128_to_compact(value)
-    }
-
-    fn compact_to_u128(&self, compact: u64) -> u128 {
-        self.params.compact_space.compact_to_u128(compact)
-    }
-
-    /// Comparing on compact space: Random dataset 0,24 (50% random hit) - 1.05 GElements/s
-    /// Comparing on compact space: Real dataset 1.08 GElements/s
-    ///
-    /// Comparing on original space: Real dataset .06 GElements/s (not completely optimized)
-    #[inline]
-    pub fn get_positions_for_value_range(
         &self,
         value_range: RangeInclusive<u128>,
         position_range: Range<u32>,
         positions: &mut Vec<u32>,
@@ -395,44 +356,42 @@ impl CompactSpaceDecompressor {
             range_mapping.compact_end()
         });
 
-        let range = compact_from..=compact_to;
+        let value_range = compact_from..=compact_to;
+        self.get_positions_for_compact_value_range(value_range, position_range, positions);
+    }
+}
 
-        let scan_num_docs = position_range.end - position_range.start;
+impl CompactSpaceDecompressor {
+    pub fn open(data: OwnedBytes) -> io::Result<CompactSpaceDecompressor> {
+        let (data_slice, footer_len_bytes) = data.split_at(data.len() - 4);
+        let footer_len = u32::deserialize(&mut &footer_len_bytes[..])?;
 
-        let step_size = 4;
-        let cutoff = position_range.start + scan_num_docs - scan_num_docs % step_size;
+        let data_footer = &data_slice[data_slice.len() - footer_len as usize..];
+        let params = IPCodecParams::deserialize(&mut &data_footer[..])?;
+        let decompressor = CompactSpaceDecompressor { data, params };
 
-        let mut push_if_in_range = |idx, val| {
-            if range.contains(&val) {
-                positions.push(idx);
-            }
-        };
-        let get_val = |idx| self.params.bit_unpacker.get(idx, &self.data);
-        // unrolled loop
-        for idx in (position_range.start..cutoff).step_by(step_size as usize) {
-            let idx1 = idx;
-            let idx2 = idx + 1;
-            let idx3 = idx + 2;
-            let idx4 = idx + 3;
-            let val1 = get_val(idx1);
-            let val2 = get_val(idx2);
-            let val3 = get_val(idx3);
-            let val4 = get_val(idx4);
-            push_if_in_range(idx1, val1);
-            push_if_in_range(idx2, val2);
-            push_if_in_range(idx3, val3);
-            push_if_in_range(idx4, val4);
-        }
+        Ok(decompressor)
+    }
 
-        // handle rest
-        for idx in cutoff..position_range.end {
-            push_if_in_range(idx, get_val(idx));
-        }
+    /// Converting to compact space for the decompressor is more complex, since we may get values
+    /// which are outside the compact space. e.g. if we map
+    /// 1000 => 5
+    /// 2000 => 6
+    ///
+    /// and we want a mapping for 1005, there is no equivalent compact space. We instead return an
+    /// error with the index of the next range.
+    fn u128_to_compact(&self, value: u128) -> Result<u32, usize> {
+        self.params.compact_space.u128_to_compact(value)
+    }
+
+    fn compact_to_u128(&self, compact: u32) -> u128 {
+        self.params.compact_space.compact_to_u128(compact)
     }
 
     #[inline]
-    fn iter_compact(&self) -> impl Iterator<Item = u64> + '_ {
-        (0..self.params.num_vals).map(move |idx| self.params.bit_unpacker.get(idx, &self.data))
+    fn iter_compact(&self) -> impl Iterator<Item = u32> + '_ {
+        (0..self.params.num_vals)
+            .map(move |idx| self.params.bit_unpacker.get(idx, &self.data) as u32)
     }
 
     #[inline]
@@ -445,7 +404,7 @@ impl CompactSpaceDecompressor {
     #[inline]
     pub fn get(&self, idx: u32) -> u128 {
-        let compact = self.params.bit_unpacker.get(idx, &self.data);
+        let compact = self.params.bit_unpacker.get(idx, &self.data) as u32;
         self.compact_to_u128(compact)
     }
 
@@ -456,6 +415,20 @@
     pub fn max_value(&self) -> u128 {
         self.params.max_value
     }
+
+    fn get_positions_for_compact_value_range(
+        &self,
+        value_range: RangeInclusive<u32>,
+        position_range: Range<u32>,
+        positions: &mut Vec<u32>,
+    ) {
+        self.params.bit_unpacker.get_ids_for_value_range(
+            *value_range.start() as u64..=*value_range.end() as u64,
+            position_range,
+            &self.data,
+            positions,
+        );
+    }
 }
 
 #[cfg(test)]
@@ -469,12 +442,12 @@ mod tests {
     #[test]
     fn compact_space_test() {
-        let ips = &[
+        let ips: BTreeSet<u128> = [
             2u128, 4u128, 1000, 1001, 1002, 1003, 1004, 1005, 1008, 1010, 1012, 1260,
         ]
         .into_iter()
         .collect();
-        let compact_space = get_compact_space(ips, ips.len() as u32, 11);
+        let compact_space = get_compact_space(&ips, ips.len() as u32, 11);
         let amplitude = compact_space.amplitude_compact_space();
         assert_eq!(amplitude, 17);
         assert_eq!(1, compact_space.u128_to_compact(2).unwrap());
@@ -497,8 +470,8 @@
         );
 
         for ip in ips {
-            let compact = compact_space.u128_to_compact(*ip).unwrap();
-            assert_eq!(compact_space.compact_to_u128(compact), *ip);
+            let compact = compact_space.u128_to_compact(ip).unwrap();
+            assert_eq!(compact_space.compact_to_u128(compact), ip);
         }
     }
 
@@ -524,7 +497,7 @@
             .map(|pos| pos as u32)
             .collect::<Vec<u32>>();
         let mut positions = Vec::new();
-        decompressor.get_positions_for_value_range(
+        decompressor.get_row_ids_for_value_range(
             range,
             0..decompressor.num_vals(),
             &mut positions,
@@ -569,7 +542,7 @@
         let val = *val;
         let pos = pos as u32;
         let mut positions = Vec::new();
-        decomp.get_positions_for_value_range(val..=val, pos..pos + 1, &mut positions);
+        decomp.get_row_ids_for_value_range(val..=val, pos..pos + 1, &mut positions);
 
         assert_eq!(positions, vec![pos]);
     }
diff --git a/columnar/src/column_values/u64_based/bitpacked.rs b/columnar/src/column_values/u64_based/bitpacked.rs
index 4b2c8546e..b59c8ef1b 100644
--- a/columnar/src/column_values/u64_based/bitpacked.rs
+++ b/columnar/src/column_values/u64_based/bitpacked.rs
@@ -1,4 +1,6 @@
 use std::io::{self, Write};
+use std::num::NonZeroU64;
+use std::ops::{Range, RangeInclusive};
 
 use common::{BinarySerializable, OwnedBytes};
 use fastdivide::DividerU64;
@@ -16,6 +18,46 @@ pub struct BitpackedReader {
     stats: ColumnStats,
 }
 
+#[inline(always)]
+const fn div_ceil(n: u64, q: NonZeroU64) -> u64 {
+    // Copied from the unstable Rust standard library.
+    let d = n / q.get();
+    let r = n % q.get();
+    if r > 0 {
+        d + 1
+    } else {
+        d
+    }
+}
+
+// The bitpacked codec applies a linear transformation `f` over data that are bitpacked.
+// f is defined by:
+// f: bitpacked -> stats.min_value + stats.gcd * bitpacked
+//
+// In order to run range queries, we invert the transformation.
+// `transform_range_before_linear_transformation` returns the range of values
+// [min_bitpacked_value..max_bitpacked_value] such that
+// f(bitpacked) ∈ [min_value, max_value] <=> bitpacked ∈ [min_bitpacked_value, max_bitpacked_value]
+fn transform_range_before_linear_transformation(
+    stats: &ColumnStats,
+    range: RangeInclusive<u64>,
+) -> Option<RangeInclusive<u64>> {
+    if range.is_empty() {
+        return None;
+    }
+    if stats.min_value > *range.end() {
+        return None;
+    }
+    if stats.max_value < *range.start() {
+        return None;
+    }
+    let shifted_range =
+        range.start().saturating_sub(stats.min_value)..=range.end().saturating_sub(stats.min_value);
+    let start_before_gcd_multiplication: u64 = div_ceil(*shifted_range.start(), stats.gcd);
+    let end_before_gcd_multiplication: u64 = *shifted_range.end() / stats.gcd;
+    Some(start_before_gcd_multiplication..=end_before_gcd_multiplication)
+}
+
 impl ColumnValues for BitpackedReader {
     #[inline(always)]
     fn get_val(&self, doc: u32) -> u64 {
@@ -34,6 +76,25 @@ impl ColumnValues for BitpackedReader {
     fn num_vals(&self) -> RowId {
         self.stats.num_rows
     }
+
+    fn get_row_ids_for_value_range(
+        &self,
+        range: RangeInclusive<u64>,
+        doc_id_range: Range<u32>,
+        positions: &mut Vec<u32>,
+    ) {
+        let Some(transformed_range) =
+            transform_range_before_linear_transformation(&self.stats, range)
+        else {
+            positions.clear();
+            return;
+        };
+        self.bit_unpacker.get_ids_for_value_range(
+            transformed_range,
+            doc_id_range,
+            &self.data,
+            positions,
+        );
+    }
 }
 
 fn num_bits(stats: &ColumnStats) -> u8 {
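
For intuition, here is a standalone sketch of the inverse mapping that `transform_range_before_linear_transformation` performs in the last hunk: a stored compact value `c` decodes to `min_value + gcd * c`, so a query range expressed in value space maps back to compact space by rounding the lower bound up and the upper bound down. The function and names below are illustrative only (they are not part of this patch); `min_value`, `max_value` and `gcd` stand in for the corresponding `ColumnStats` fields.

```rust
use std::ops::RangeInclusive;

// Illustrative only: map a query range expressed in value space back to the
// bitpacked (compact) space, given the linear transformation
// `value = min_value + gcd * compact`.
// `gcd` is assumed non-zero here (the patched code uses `NonZeroU64`).
fn value_range_to_compact_range(
    range: RangeInclusive<u64>,
    min_value: u64,
    max_value: u64,
    gcd: u64,
) -> Option<RangeInclusive<u64>> {
    if range.is_empty() || min_value > *range.end() || max_value < *range.start() {
        return None;
    }
    let start_shifted = range.start().saturating_sub(min_value);
    let end_shifted = range.end().saturating_sub(min_value);
    // Round the lower bound up and the upper bound down, so that only compact
    // values whose decoded value falls inside the query range are kept.
    let compact_start = (start_shifted + gcd - 1) / gcd;
    let compact_end = end_shifted / gcd;
    Some(compact_start..=compact_end)
}

fn main() {
    // Stored values 10, 13, 16, ... => min_value = 10, gcd = 3.
    // A query for values in [11, 17] keeps compact ids 1..=2, i.e. values 13 and 16.
    assert_eq!(value_range_to_compact_range(11..=17, 10, 100, 3), Some(1..=2));
}
```

The resulting compact range is what gets handed to `BitUnpacker::get_ids_for_value_range`, so the SIMD filter only ever compares bitpacked values and never needs to decode back to the original space.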