add null value detection for ip codec
@@ -61,6 +61,7 @@ serde_cbor = { version = "0.11.2", optional = true }
async-trait = "0.1.53"
arc-swap = "1.5.0"
gcd = "2.1.0"
roaring = "0.9.0"

[target.'cfg(windows)'.dependencies]
winapi = "0.3.9"

@@ -14,12 +14,15 @@
use std::{
    cmp::Ordering,
    collections::BinaryHeap,
    io,
    io::{self, Write},
    net::{IpAddr, Ipv6Addr},
    ops::RangeInclusive,
};

use tantivy_bitpacker::{self, BitPacker, BitUnpacker};

use crate::FastFieldCodecReaderU128;

pub fn ip_to_u128(ip_addr: IpAddr) -> u128 {
    let ip_addr_v6: Ipv6Addr = match ip_addr {
        IpAddr::V4(v4) => v4.to_ipv6_mapped(),
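
The hunk above cuts the conversion off mid-function. For orientation, a minimal standalone sketch of the full mapping, assuming the usual IPv6-mapped widening (the `IpAddr::V6` arm and the final byte conversion are an assumption, not shown in the diff):

use std::net::{IpAddr, Ipv6Addr};

// Sketch: map IPv4 and IPv6 addresses into one u128 space by widening IPv4
// addresses to their IPv6-mapped form and reading the 16 bytes big-endian.
pub fn ip_to_u128_sketch(ip_addr: IpAddr) -> u128 {
    let ip_addr_v6: Ipv6Addr = match ip_addr {
        IpAddr::V4(v4) => v4.to_ipv6_mapped(), // assumption: matches the line shown above
        IpAddr::V6(v6) => v6,
    };
    u128::from_be_bytes(ip_addr_v6.octets())
}
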
@@ -34,9 +37,10 @@ const INTERVALL_COST_IN_BITS: usize = 64;
pub struct IntervalEncoding();

pub struct IntervalCompressor {
    pub null_value: u128,
    min_value: u128,
    max_value: u128,
    ranges_and_compact_start: CompactSpace,
    compact_space: CompactSpace,
    pub num_bits: u8,
}

@@ -125,7 +129,7 @@ fn get_deltas(ip_addrs_sorted: &[u128]) -> BinaryHeap<DeltaAndPos> {

/// Will find blanks if it will affect the number of bits used on the compact space.
/// Returns the new amplitude and the positions of blanks
fn get_blanks(ip_addrs_sorted: &[u128], cost_per_interval: usize) -> (u128, CompactSpace) {
fn get_compact_space(ip_addrs_sorted: &[u128], cost_per_interval: usize) -> CompactSpace {
    let mut deltas = get_deltas(ip_addrs_sorted);
    let mut amplitude_compact_space = *ip_addrs_sorted.last().unwrap() + 1;
    let mut amplitude_bits: u8 = (amplitude_compact_space as f64).log2().ceil() as u8;

@@ -140,7 +144,8 @@ fn get_blanks(ip_addrs_sorted: &[u128], cost_per_interval: usize) -> (u128, Comp
        let pos = ip_addr_and_pos.pos;
        staged_blanks.push((delta, pos));
        let staged_spaces_sum: u128 = staged_blanks.iter().map(|(delta, _)| delta - 1).sum();
        let amplitude_new_compact_space = amplitude_compact_space - staged_spaces_sum;
        // +1 for later added null value
        let amplitude_new_compact_space = amplitude_compact_space - staged_spaces_sum + 1;
        let amplitude_new_bits = (amplitude_new_compact_space as f64).log2().ceil() as u8;
        if amplitude_bits == amplitude_new_bits {
            continue;

@@ -169,7 +174,7 @@ fn get_blanks(ip_addrs_sorted: &[u128], cost_per_interval: usize) -> (u128, Comp
    }
    compact_space.add_hole(*ip_addrs_sorted.last().unwrap() + 1..=u128::MAX);

    (amplitude_compact_space, compact_space.finish())
    compact_space.finish()
}

#[test]
@@ -178,8 +183,11 @@ fn get_blanks_test() {
    let ips = vec![
        2u128, 4u128, 1000, 1001, 1002, 1003, 1004, 1005, 1008, 1010, 1012, 1260,
    ];
    let (amplitude, ranges_and_compact_start) = get_blanks(&ips, 11);
    assert_eq!(amplitude, 19);
    let ranges_and_compact_start = get_compact_space(&ips, 11);
    let null_value = ranges_and_compact_start.null_value;
    let amplitude = ranges_and_compact_start.amplitude_compact_space();
    assert_eq!(null_value, 5);
    assert_eq!(amplitude, 20);
    assert_eq!(2, ranges_and_compact_start.to_compact(2).unwrap());

    assert_eq!(ranges_and_compact_start.to_compact(100).unwrap_err(), 0);
@@ -197,6 +205,11 @@ impl CompactSpaceBuilder {
        }
    }

    fn assign_and_return_null(&mut self) -> u128 {
        self.covered_space[0] = *self.covered_space[0].start()..=*self.covered_space[0].end() + 1;
        *self.covered_space[0].end()
    }

    // Assumes that repeated add_hole calls don't overlap.
    fn add_hole(&mut self, hole: std::ops::RangeInclusive<u128>) {
        let position = self

@@ -222,7 +235,9 @@ impl CompactSpaceBuilder {
            self.covered_space.insert(position, new_range_start);
        }
    }
    fn finish(self) -> CompactSpace {
    fn finish(mut self) -> CompactSpace {
        let null_value = self.assign_and_return_null();

        let mut compact_start: u64 = 0;
        let mut ranges_and_compact_start = vec![];
        for cov in self.covered_space {

@@ -232,6 +247,7 @@ impl CompactSpaceBuilder {
        }
        CompactSpace {
            ranges_and_compact_start,
            null_value,
        }
    }
}
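
In other words, the null sentinel is carved out of the compact space itself: finish first calls assign_and_return_null, which widens the first covered range by one and hands back that new end value. A tiny self-contained check of the arithmetic behind the numbers asserted in get_blanks_test above (illustrative only, not part of the commit):

#[test]
fn null_sentinel_arithmetic_sketch() {
    // With sorted values starting [2, 4, ...] the first covered range is 2..=4.
    // Extending it by one reserves 5 as the null value, which is also why the
    // amplitude in get_blanks_test grows from 19 to 20 ("+1 for later added null value").
    let first_covered = 2u128..=4u128;
    let extended = *first_covered.start()..=*first_covered.end() + 1;
    assert_eq!(*extended.end(), 5);
}
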
@@ -239,15 +255,19 @@ impl CompactSpaceBuilder {
#[derive(Debug, Clone, Eq, PartialEq)]
struct CompactSpace {
    ranges_and_compact_start: Vec<(std::ops::RangeInclusive<u128>, u64)>,
    pub null_value: u128,
}
impl CompactSpace {
    fn len(&self) -> usize {
        self.ranges_and_compact_start.len()
    fn amplitude_compact_space(&self) -> u128 {
        let last_range = &self.ranges_and_compact_start[self.ranges_and_compact_start.len() - 1];
        last_range.1 as u128 + (last_range.0.end() - last_range.0.start()) + 1
    }
    fn get_range(&self, pos: usize) -> &(std::ops::RangeInclusive<u128>, u64) {

    fn get_range_and_compact_start(&self, pos: usize) -> &(std::ops::RangeInclusive<u128>, u64) {
        &self.ranges_and_compact_start[pos]
    }
    fn serialize(&self, output: &mut Vec<u8>) {
        serialize_vint(self.null_value as u128, output);
        serialize_vint(self.ranges_and_compact_start.len() as u128, output);
        let mut prev_ip = 0;
        for (ip_range, _compact) in &self.ranges_and_compact_start {

@@ -261,9 +281,9 @@ impl CompactSpace {
        }
    }

    fn deserialize(mut data: &[u8]) -> io::Result<(&[u8], Self)> {
        let (num_ip_addrs, new_data) = deserialize_vint(data)?;
        data = new_data;
    fn deserialize(data: &[u8]) -> io::Result<(&[u8], Self)> {
        let (null_value, data) = deserialize_vint(data)?;
        let (num_ip_addrs, mut data) = deserialize_vint(data)?;
        let mut ip_addr = 0u128;
        let mut compact = 0u64;
        let mut ranges_and_compact_start: Vec<(std::ops::RangeInclusive<u128>, u64)> = vec![];

@@ -286,6 +306,7 @@ impl CompactSpace {
        Ok((
            data,
            Self {
                null_value,
                ranges_and_compact_start,
            },
        ))

@@ -331,7 +352,8 @@ fn ranges_and_compact_start_test() {
    let ips = vec![
        2u128, 4u128, 1000, 1001, 1002, 1003, 1004, 1005, 1008, 1010, 1012, 1260,
    ];
    let (_amplitude, ranges_and_compact_start) = get_blanks(&ips, 11);
    let ranges_and_compact_start = get_compact_space(&ips, 11);
    assert_eq!(ranges_and_compact_start.null_value, 5);

    let mut output = vec![];
    ranges_and_compact_start.serialize(&mut output);
@@ -348,59 +370,90 @@ fn ranges_and_compact_start_test() {
}

pub fn train(ip_addrs_sorted: &[u128]) -> IntervalCompressor {
    let (amplitude, ranges_and_compact_start) = get_blanks(ip_addrs_sorted, INTERVALL_COST_IN_BITS);
    let ranges_and_compact_start = get_compact_space(ip_addrs_sorted, INTERVALL_COST_IN_BITS);
    let null_value = ranges_and_compact_start.null_value;
    let amplitude_compact_space = ranges_and_compact_start.amplitude_compact_space();

    assert!(amplitude <= u64::MAX as u128, "case unsupported.");
    assert!(
        amplitude_compact_space <= u64::MAX as u128,
        "case unsupported."
    );

    let num_bits = tantivy_bitpacker::compute_num_bits(amplitude as u64);
    let num_bits = tantivy_bitpacker::compute_num_bits(amplitude_compact_space as u64);
    let min_value = ip_addrs_sorted[0];
    let max_value = ip_addrs_sorted[ip_addrs_sorted.len() - 1];
    let compressor = IntervalCompressor {
        null_value,
        min_value,
        max_value,
        ranges_and_compact_start,
        compact_space: ranges_and_compact_start,
        num_bits,
    };

    let max_value = *ip_addrs_sorted.last().unwrap().max(&null_value);
    assert_eq!(
        compressor.to_compact(*ip_addrs_sorted.last().unwrap()) + 1,
        amplitude as u64
        compressor.to_compact(max_value) + 1,
        amplitude_compact_space as u64
    );
    compressor
}

impl IntervalCompressor {
    fn to_compact(&self, ip_addr: u128) -> u64 {
        self.ranges_and_compact_start.to_compact(ip_addr).unwrap()
    pub fn from_vals(mut vals: Vec<u128>) -> Self {
        vals.sort();
        train(&vals)
    }

    fn write_header(&self, output: &mut Vec<u8>) {
        assert!(output.is_empty());
    fn to_compact(&self, ip_addr: u128) -> u64 {
        self.compact_space.to_compact(ip_addr).unwrap()
    }

    fn write_footer(&self, write: &mut impl Write, num_vals: u128) -> io::Result<()> {
        let mut footer = vec![];

        // header flags for future optional dictionary encoding
        let header_flags = 0u64;
        output.extend_from_slice(&header_flags.to_le_bytes());
        footer.extend_from_slice(&header_flags.to_le_bytes());

        serialize_vint(self.min_value, output);
        serialize_vint(self.max_value, output);
        let null_value = self
            .compact_space
            .to_compact(self.null_value)
            .expect("could not convert null to compact space");
        serialize_vint(null_value as u128, &mut footer);
        serialize_vint(self.min_value, &mut footer);
        serialize_vint(self.max_value, &mut footer);

        self.ranges_and_compact_start.serialize(output);
        output.push(self.num_bits);
        self.compact_space.serialize(&mut footer);

        footer.push(self.num_bits);
        serialize_vint(num_vals as u128, &mut footer);

        write.write_all(&footer)?;
        let footer_len = footer.len() as u32;
        write.write_all(&footer_len.to_le_bytes())?;
        Ok(())
    }

    pub fn compress(&self, vals: &[u128]) -> Vec<u8> {
    pub fn compress(&self, vals: &[u128]) -> io::Result<Vec<u8>> {
        let mut output = vec![];
        self.compress_into(vals, &mut output);
        output
        self.compress_into(vals.iter().cloned(), &mut output)?;
        Ok(output)
    }
    pub fn compress_into(&self, vals: &[u128], output: &mut Vec<u8>) {
        self.write_header(output);
        serialize_vint(vals.len() as u128, output);
    pub fn compress_into(
        &self,
        vals: impl Iterator<Item = u128>,
        write: &mut impl Write,
    ) -> io::Result<()> {
        let mut bitpacker = BitPacker::default();
        for &ip_addr in vals {
        let mut num_vals = 0;
        for ip_addr in vals {
            let compact = self.to_compact(ip_addr);
            bitpacker.write(compact, self.num_bits, output).unwrap();
            bitpacker.write(compact, self.num_bits, write).unwrap();
            num_vals += 1;
        }
        bitpacker.close(output).unwrap();
        bitpacker.close(write).unwrap();
        self.write_footer(write, num_vals as u128)?;
        Ok(())
    }
}
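
write_footer and compress_into together fix the on-disk layout: the bit-packed compact values come first, followed by a footer, followed by the footer length. A compact sketch of the compress side, written as it might appear in this module's tests (it assumes the items above are in scope and is not a test added by the commit):

#[test]
fn compress_sketch() {
    // from_vals sorts its input and trains the compact space (including the null slot).
    let compressor = IntervalCompressor::from_vals(vec![2u128, 4, 1000]);
    let data = compressor.compress(&[2u128, 4, 1000]).unwrap();
    // Layout written above: [bit-packed values]
    //   [header_flags: u64 LE][null value (compact, vint)][min (vint)][max (vint)]
    //   [compact space][num_bits: u8][num_vals (vint)][footer_len: u32 LE]
    assert!(data.len() > 4);
}
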
@@ -408,22 +461,56 @@ impl IntervalCompressor {
pub struct IntervallDecompressor {
    compact_space: CompactSpace,
    bit_unpacker: BitUnpacker,
    null_compact_space: u64,
    min_value: u128,
    max_value: u128,
    num_vals: usize,
}

impl FastFieldCodecReaderU128 for IntervallDecompressor {
    fn open_from_bytes(bytes: &[u8]) -> std::io::Result<Self> {
        Self::open(bytes)
    }

    fn get(&self, doc: u64, data: &[u8]) -> Option<u128> {
        self.get(doc, data)
    }

    fn get_range(&self, range: RangeInclusive<u128>, data: &[u8]) -> Vec<usize> {
        self.get_range(range, data)
    }

    fn min_value(&self) -> u128 {
        self.min_value()
    }

    fn max_value(&self) -> u128 {
        self.max_value()
    }

    /// The computed and assigned number for null values
    fn null_value(&self) -> u128 {
        self.compact_space.null_value
    }
}

impl IntervallDecompressor {
    pub fn open(data: &[u8]) -> io::Result<(IntervallDecompressor, &[u8])> {
    pub fn open(data: &[u8]) -> io::Result<IntervallDecompressor> {
        let (data, footer_len_bytes) = data.split_at(data.len() - 4);
        let footer_len = u32::from_le_bytes(footer_len_bytes.try_into().unwrap());

        let data = &data[data.len() - footer_len as usize..];
        let (_header_flags, data) = data.split_at(8);
        let (null_compact_space, data) = deserialize_vint(data)?;
        let (min_value, data) = deserialize_vint(data)?;
        let (max_value, data) = deserialize_vint(data)?;
        let (mut data, compact_space) = CompactSpace::deserialize(data).unwrap();

        let num_bits = data[0];
        data = &data[1..];
        let (num_vals, data) = deserialize_vint(data)?;
        let (num_vals, _data) = deserialize_vint(data)?;
        let decompressor = IntervallDecompressor {
            null_compact_space: null_compact_space as u64,
            min_value,
            max_value,
            compact_space,

@@ -431,7 +518,7 @@ impl IntervallDecompressor {
            bit_unpacker: BitUnpacker::new(num_bits),
        };

        Ok((decompressor, data))
        Ok(decompressor)
    }

    /// Converting to compact space for the decompressor is more complex, since we may get values which are

@@ -452,7 +539,9 @@ impl IntervallDecompressor {
    /// Comparing on compact space: 1.2 GElements/s
    ///
    /// Comparing on original space: .06 GElements/s (not completely optimized)
    pub fn get_range(&self, from_ip_addr: u128, to_ip_addr: u128, data: &[u8]) -> Vec<usize> {
    pub fn get_range(&self, range: RangeInclusive<u128>, data: &[u8]) -> Vec<usize> {
        let from_ip_addr = *range.start();
        let to_ip_addr = *range.end();
        assert!(to_ip_addr >= from_ip_addr);
        let compact_from = self.to_compact(from_ip_addr);
        let compact_to = self.to_compact(to_ip_addr);

@@ -464,14 +553,14 @@ impl IntervallDecompressor {
        }

        let compact_from = compact_from.unwrap_or_else(|pos| {
            let range_and_compact_start = self.compact_space.get_range(pos);
            let range_and_compact_start = self.compact_space.get_range_and_compact_start(pos);
            let compact_end = range_and_compact_start.1
                + (range_and_compact_start.0.end() - range_and_compact_start.0.start()) as u64;
            compact_end + 1
        });
        // If there is no compact space, we go to the closest upperbound compact space
        let compact_to = compact_to.unwrap_or_else(|pos| {
            let range_and_compact_start = self.compact_space.get_range(pos);
            let range_and_compact_start = self.compact_space.get_range_and_compact_start(pos);
            let compact_end = range_and_compact_start.1
                + (range_and_compact_start.0.end() - range_and_compact_start.0.start()) as u64;
            compact_end

@@ -491,20 +580,25 @@ impl IntervallDecompressor {

    #[inline]
    pub fn iter_compact<'a>(&'a self, data: &'a [u8]) -> impl Iterator<Item = u64> + 'a {
        (0..self.num_vals).map(move |idx| self.bit_unpacker.get(idx as u64, data) as u64)
        (0..self.num_vals)
            .map(move |idx| self.bit_unpacker.get(idx as u64, data) as u64)
            .filter(|val| *val != self.null_compact_space)
    }

    #[inline]
    pub fn iter<'a>(&'a self, data: &'a [u8]) -> impl Iterator<Item = u128> + 'a {
        // TODO: Performance. It would be better to iterate on the ranges and check existence via the bit_unpacker.
        (0..self.num_vals)
            .map(move |idx| self.bit_unpacker.get(idx as u64, data) as u64)
        self.iter_compact(data)
            .map(|compact| self.compact_to_ip_addr(compact))
    }

    pub fn get(&self, idx: usize, data: &[u8]) -> u128 {
        let base = self.bit_unpacker.get(idx as u64, data);
        self.compact_to_ip_addr(base)
    pub fn get(&self, idx: u64, data: &[u8]) -> Option<u128> {
        let compact = self.bit_unpacker.get(idx, data);
        if compact == self.null_compact_space {
            None
        } else {
            Some(self.compact_to_ip_addr(compact))
        }
    }

    pub fn min_value(&self) -> u128 {
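
With open now returning just the decompressor and get returning an Option, null detection happens per document at read time: a bit-packed slot equal to the reserved null value means "no value for this document". A sketch of the intended usage in this module's test context (not a test added by the commit; it assumes the items above are in scope):

#[test]
fn null_aware_read_sketch() {
    // Compress three values, reopen the bytes, and read them back.
    let compressor = IntervalCompressor::from_vals(vec![2u128, 4, 1000]);
    let data = compressor.compress(&[2u128, 4, 1000]).unwrap();
    let decompressor = IntervallDecompressor::open(&data).unwrap();
    // Doc 0 carries the value 2; none of these docs hit the reserved null slot.
    assert_eq!(decompressor.get(0, &data), Some(2u128));
    assert_eq!(decompressor.get_range(1000u128..=1000u128, &data), vec![2]);
    let non_null = (0..3u64).filter(|&idx| decompressor.get(idx, &data).is_some()).count();
    assert_eq!(non_null, 3);
}
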
@@ -522,12 +616,13 @@ impl IntervalEncoding {
        train(&vals)
    }

    // TODO move to test
    pub fn encode(&self, vals: &[u128]) -> Vec<u8> {
        if vals.is_empty() {
            return Vec::new();
        }
        let compressor = self.train(vals.to_vec());
        compressor.compress(&vals)
        compressor.compress(&vals).unwrap()
    }
}

@@ -537,11 +632,13 @@ mod tests {
    use super::*;

    fn decode_all(data: &[u8]) -> Vec<u128> {
        let (decompressor, data) = IntervallDecompressor::open(data).unwrap();
        let decompressor = IntervallDecompressor::open(data).unwrap();
        let mut u128_vals = Vec::new();
        for idx in 0..decompressor.num_vals as usize {
            let val = decompressor.get(idx, data);
            u128_vals.push(val);
            let val = decompressor.get(idx as u64, data);
            if let Some(val) = val {
                u128_vals.push(val);
            }
        }
        u128_vals
    }

@@ -554,7 +651,7 @@ mod tests {
    }

    #[test]
    fn test_compress() {
    fn test_range_1() {
        let vals = &[
            1u128,
            100u128,

@@ -568,28 +665,50 @@ mod tests {
        ];
        let interval_encoding = IntervalEncoding::default();
        let data = test_aux_vals(&interval_encoding, vals);
        let (decomp, data) = IntervallDecompressor::open(&data).unwrap();
        let positions = decomp.get_range(0, 1, data);
        let decomp = IntervallDecompressor::open(&data).unwrap();
        let positions = decomp.get_range(0..=1, &data);
        assert_eq!(positions, vec![0]);
        let positions = decomp.get_range(0, 2, data);
        let positions = decomp.get_range(0..=2, &data);
        assert_eq!(positions, vec![0]);
        let positions = decomp.get_range(0, 3, data);
        let positions = decomp.get_range(0..=3, &data);
        assert_eq!(positions, vec![0, 2]);
        assert_eq!(decomp.get_range(99999u128, 99999u128, data), vec![3]);
        assert_eq!(decomp.get_range(99998u128, 100000u128, data), vec![3, 4]);
        assert_eq!(decomp.get_range(99998u128, 99999u128, data), vec![3]);
        assert_eq!(decomp.get_range(99998u128, 99998u128, data), vec![]);
        assert_eq!(decomp.get_range(333u128, 333u128, data), vec![8]);
        assert_eq!(decomp.get_range(332u128, 333u128, data), vec![8]);
        assert_eq!(decomp.get_range(332u128, 334u128, data), vec![8]);
        assert_eq!(decomp.get_range(333u128, 334u128, data), vec![8]);
        assert_eq!(decomp.get_range(99999u128..=99999u128, &data), vec![3]);
        assert_eq!(decomp.get_range(99998u128..=100000u128, &data), vec![3, 4]);
        assert_eq!(decomp.get_range(99998u128..=99999u128, &data), vec![3]);
        assert_eq!(decomp.get_range(99998u128..=99998u128, &data), vec![]);
        assert_eq!(decomp.get_range(333u128..=333u128, &data), vec![8]);
        assert_eq!(decomp.get_range(332u128..=333u128, &data), vec![8]);
        assert_eq!(decomp.get_range(332u128..=334u128, &data), vec![8]);
        assert_eq!(decomp.get_range(333u128..=334u128, &data), vec![8]);

        assert_eq!(
            decomp.get_range(4_000_211_221u128, 5_000_000_000u128, data),
            decomp.get_range(4_000_211_221u128..=5_000_000_000u128, &data),
            vec![6, 7]
        );
    }

    #[test]
    fn test_range_2() {
        let vals = &[
            100u128,
            99999u128,
            100000u128,
            100001u128,
            4_000_211_221u128,
            4_000_211_222u128,
            333u128,
        ];
        let interval_encoding = IntervalEncoding::default();
        let data = test_aux_vals(&interval_encoding, vals);
        let decomp = IntervallDecompressor::open(&data).unwrap();
        let positions = decomp.get_range(0..=5, &data);
        assert_eq!(positions, vec![]);
        let positions = decomp.get_range(0..=100, &data);
        assert_eq!(positions, vec![0]);
        let positions = decomp.get_range(0..=105, &data);
        assert_eq!(positions, vec![0]);
    }

    #[test]
    fn test_first_large_gaps() {
        let vals = &[1_000_000_000u128; 100];

@@ -4,6 +4,7 @@ extern crate more_asserts;

use std::io;
use std::io::Write;
use std::ops::RangeInclusive;

pub mod bitpacked;
pub mod ip_codec;

@@ -20,10 +21,27 @@ pub trait FastFieldCodecReader: Sized {
    fn max_value(&self) -> u64;
}

pub trait FastFieldCodecReaderU128: Sized {
    /// reads the metadata and returns the CodecReader
    fn open_from_bytes(bytes: &[u8]) -> std::io::Result<Self>;

    /// Get value for doc
    fn get(&self, doc: u64, data: &[u8]) -> Option<u128>;

    /// Get docs for value range
    fn get_range(&self, range: RangeInclusive<u128>, data: &[u8]) -> Vec<usize>;

    /// The computed and assigned number value for null values
    fn null_value(&self) -> u128;

    fn min_value(&self) -> u128;
    fn max_value(&self) -> u128;
}

/// The FastFieldSerializerEstimate trait is required on all variants
/// of fast field compressions, to decide which one to choose.
pub trait FastFieldCodecSerializer {
    /// A codex needs to provide a unique name and id, which is
    /// A codec needs to provide a unique name and id, which is
    /// used for debugging and de/serialization.
    const NAME: &'static str;
    const ID: u8;

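Code that only needs the u128 reader surface can stay generic over the new trait. A minimal sketch of such a consumer (hypothetical helper, assuming the trait above is in scope):

/// Sketch: summarize a u128 fast field column from its raw bytes.
fn column_summary<R: FastFieldCodecReaderU128>(bytes: &[u8]) -> std::io::Result<(u128, u128, u128)> {
    let reader = R::open_from_bytes(bytes)?;
    // (min, max, and the value that stands in for "null")
    Ok((reader.min_value(), reader.max_value(), reader.null_value()))
}
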
@@ -94,12 +94,12 @@ fn bench_ip() {
    let encoding = IntervalEncoding();
    let dataset = ip_dataset();
    print_set_stats(&dataset);
    let enc = encoding.encode(&dataset);
    let (decompressor, data) = IntervallDecompressor::open(&enc).unwrap();
    let data = encoding.encode(&dataset);
    let decompressor = IntervallDecompressor::open(&data).unwrap();

    for i in 11100..11150 {
        print_time!("get range");
        let doc_values = decompressor.get_range(dataset[i], dataset[i], data);
        let doc_values = decompressor.get_range(dataset[i]..=dataset[i], &data);
        println!("{:?}", doc_values.len());
    }
}

@@ -327,6 +327,14 @@ impl CompositeFastFieldSerializer {
        FastBytesFieldSerializer { write: field_write }
    }

    /// Returns the raw write handle for the given field and index.
    pub fn get_field_writer(&mut self, field: Field, idx: usize) -> &mut impl Write {
        let field_write = self.composite_write.for_field_with_idx(field, idx);
        field_write
    }

    /// Closes the serializer
    ///
    /// After this call the data must be persistently saved on disk.

@@ -2,7 +2,9 @@ use std::collections::HashMap;
use std::io;

use common;
use fastfield_codecs::ip_codec::{ip_to_u128, IntervalCompressor, IntervalEncoding};
use fnv::FnvHashMap;
use roaring::RoaringBitmap;
use tantivy_bitpacker::BlockedBitpacker;

use super::multivalued::MultiValuedFastFieldWriter;

@@ -35,6 +37,7 @@ fn fast_field_default_value(field_entry: &FieldEntry) -> u64 {
impl FastFieldsWriter {
    /// Create all `FastFieldWriter` required by the schema.
    pub fn from_schema(schema: &Schema) -> FastFieldsWriter {
        let mut u128_value_writers = Vec::new();
        let mut single_value_writers = Vec::new();
        let mut term_id_writers = Vec::new();
        let mut multi_values_writers = Vec::new();

@@ -103,6 +106,7 @@ impl FastFieldsWriter {
            }
        }
        FastFieldsWriter {
            u128_value_writers,
            term_id_writers,
            single_value_writers,
            multi_values_writers,

@@ -192,8 +196,7 @@ impl FastFieldsWriter {
            .iter_mut()
            .find(|field_writer| field_writer.field() == field)
    }
bytes_value_writers
    /// Indexes all of the fastfields of a new document.
    // Indexes all of the fastfields of a new document.
    pub fn add_document(&mut self, doc: &Document) {
        for field_writer in &mut self.term_id_writers {
            field_writer.add_document(doc);
@@ -236,6 +239,101 @@ bytes_value_writers
    }
}

/// Fast field writer for u128 values.
/// The fast field writer just keeps the values in memory.
///
/// Only when the segment writer can be closed and
/// persisted on disc, the fast field writer is
/// sent to a `FastFieldSerializer` via the `.serialize(...)`
/// method.
///
/// We cannot serialize earlier as the values are
/// bitpacked and the number of bits required for bitpacking
/// can only be known once we have seen all of the values.
///
/// Both u64, i64 and f64 use the same writer.
pub struct U128FastFieldWriter {
    field: Field,
    vals: Vec<u128>,
    val_count: u32,

    null_values: RoaringBitmap,
    //val_if_missing: u64,
    //val_min: u128,
    //val_max: u128,
}

impl U128FastFieldWriter {
    /// Creates a new `U128FastFieldWriter`
    pub fn new(field: Field) -> Self {
        Self {
            field,
            vals: vec![],
            val_count: 0,
            null_values: RoaringBitmap::new(),
            //val_min: u64::MAX,
            //val_max: 0,
        }
    }

    /// The memory used (inclusive of children)
    pub fn mem_usage(&self) -> usize {
        self.vals.len() * 16
    }

    /// Returns the field that this writer is targeting.
    pub fn field(&self) -> Field {
        self.field
    }

    /// Records a new value.
    ///
    /// The n-th value being recorded is implicitly
    /// associated to the document with the `DocId` n.
    /// (Well, `n-1` actually because of 0-indexing)
    pub fn add_val(&mut self, val: u128) {
        self.vals.push(val);
    }

    /// Extract the fast field value from the document
    /// (or use the default value) and records it.
    ///
    /// Extract the value associated to the fast field for
    /// this document.
    ///
    pub fn add_document(&mut self, doc: &Document) {
        match doc.get_first(self.field) {
            Some(v) => {
                let ip_addr = v.as_ip().unwrap();
                let value = ip_to_u128(ip_addr);
                self.add_val(value);
            }
            None => {
                self.null_values.insert(self.val_count as u32);
            }
        };
        self.val_count += 1;
    }

    /// Push the fast fields value to the `FastFieldWriter`.
    pub fn serialize(
        &self,
        serializer: &mut CompositeFastFieldSerializer,
        doc_id_map: Option<&DocIdMapping>,
    ) -> io::Result<()> {
        //let field_write = serializer.get_field_writer(self.field, 0);
        //let compressor = IntervalCompressor::from_vals(self.vals.to_vec());
        //let vals = (0..self.val_count).map(|idx|
        //if self.null_values.contains(idx as u32) {
        //self.comp

        //}
        //)

        unimplemented!()
    }
}
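
serialize is left as unimplemented!() in this commit; the commented-out lines sketch the intended direction. One possible completion, purely illustrative and not part of the commit, consistent with those comments: substitute the compressor's reserved null value for every doc recorded in null_values and stream the result into the field writer exposed by get_field_writer above.

/// Hypothetical completion of U128FastFieldWriter::serialize (sketch only).
pub fn serialize(
    &self,
    serializer: &mut CompositeFastFieldSerializer,
    _doc_id_map: Option<&DocIdMapping>,
) -> io::Result<()> {
    let field_write = serializer.get_field_writer(self.field, 0);
    // Train on the values that are actually present.
    // (Segments where every doc is null are not handled in this sketch.)
    let compressor = IntervalCompressor::from_vals(self.vals.to_vec());
    // Re-expand to one value per document, plugging in the null sentinel
    // wherever the RoaringBitmap marks the doc as having no value.
    let mut present_vals = self.vals.iter().cloned();
    let all_vals = (0..self.val_count).map(|idx| {
        if self.null_values.contains(idx) {
            compressor.null_value
        } else {
            present_vals.next().expect("fewer values than non-null docs")
        }
    });
    compressor.compress_into(all_vals, field_write)?;
    Ok(())
}
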

/// Fast field writer for ints.
/// The fast field writer just keeps the values in memory.
///