mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2025-12-27 20:42:54 +00:00
Compare commits
12 Commits
remove-byt
...
ip_fastfie
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c62ddb61b7 | ||
|
|
ed85ba62b3 | ||
|
|
4b7ed27595 | ||
|
|
66ccba2878 | ||
|
|
c56f4572f4 | ||
|
|
399b137617 | ||
|
|
f3efb41d4e | ||
|
|
20a09282a1 | ||
|
|
1107400ae0 | ||
|
|
391f881fa1 | ||
|
|
eec908e962 | ||
|
|
4a1b251a08 |
@@ -61,6 +61,7 @@ serde_cbor = { version = "0.11.2", optional = true }
|
||||
async-trait = "0.1.53"
|
||||
arc-swap = "1.5.0"
|
||||
gcd = "2.1.0"
|
||||
roaring = "0.9.0"
|
||||
|
||||
[target.'cfg(windows)'.dependencies]
|
||||
winapi = "0.3.9"
|
||||
|
||||
@@ -11,7 +11,10 @@ mod writer;
|
||||
|
||||
pub use bitset::*;
|
||||
pub use serialize::{BinarySerializable, DeserializeFrom, FixedSize};
|
||||
pub use vint::{read_u32_vint, read_u32_vint_no_advance, serialize_vint_u32, write_u32_vint, VInt};
|
||||
pub use vint::{
|
||||
deserialize_vint_u128, read_u32_vint, read_u32_vint_no_advance, serialize_vint_u128,
|
||||
serialize_vint_u32, write_u32_vint, VInt,
|
||||
};
|
||||
pub use writer::{AntiCallToken, CountingWriter, TerminatingWrite};
|
||||
|
||||
/// Has length trait
|
||||
|
||||
@@ -5,6 +5,40 @@ use byteorder::{ByteOrder, LittleEndian};
|
||||
|
||||
use super::BinarySerializable;
|
||||
|
||||
/// Variable int serializes a u128 number
|
||||
pub fn serialize_vint_u128(mut val: u128, output: &mut Vec<u8>) {
|
||||
loop {
|
||||
let next_byte: u8 = (val % 128u128) as u8;
|
||||
val /= 128u128;
|
||||
if val == 0 {
|
||||
output.push(next_byte | STOP_BIT);
|
||||
return;
|
||||
} else {
|
||||
output.push(next_byte);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Deserializes a u128 number
|
||||
///
|
||||
/// Returns the number and the slice after the vint
|
||||
pub fn deserialize_vint_u128(data: &[u8]) -> io::Result<(u128, &[u8])> {
|
||||
let mut result = 0u128;
|
||||
let mut shift = 0u64;
|
||||
for i in 0..19 {
|
||||
let b = data[i];
|
||||
result |= u128::from(b % 128u8) << shift;
|
||||
if b >= STOP_BIT {
|
||||
return Ok((result, &data[i + 1..]));
|
||||
}
|
||||
shift += 7;
|
||||
}
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::InvalidData,
|
||||
"Failed to deserialize u128 vint",
|
||||
))
|
||||
}
|
||||
|
||||
/// Wrapper over a `u64` that serializes as a variable int.
|
||||
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
||||
pub struct VInt(pub u64);
|
||||
@@ -176,6 +210,7 @@ impl BinarySerializable for VInt {
|
||||
mod tests {
|
||||
|
||||
use super::{serialize_vint_u32, BinarySerializable, VInt};
|
||||
use crate::vint::{deserialize_vint_u128, serialize_vint_u128};
|
||||
|
||||
fn aux_test_vint(val: u64) {
|
||||
let mut v = [14u8; 10];
|
||||
@@ -217,6 +252,21 @@ mod tests {
|
||||
assert_eq!(&buffer[..len_vint], res2, "array wrong for {}", val);
|
||||
}
|
||||
|
||||
fn aux_test_vint_u128(val: u128) {
|
||||
let mut data = vec![];
|
||||
serialize_vint_u128(val, &mut data);
|
||||
let (deser_val, _data) = deserialize_vint_u128(&data).unwrap();
|
||||
assert_eq!(val, deser_val);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_vint_u128() {
|
||||
aux_test_vint_u128(0);
|
||||
aux_test_vint_u128(1);
|
||||
aux_test_vint_u128(u128::MAX / 3);
|
||||
aux_test_vint_u128(u128::MAX);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_vint_u32() {
|
||||
aux_test_serialize_vint_u32(0);
|
||||
|
||||
@@ -12,13 +12,16 @@ description = "Fast field codecs used by tantivy"
|
||||
common = { version = "0.3", path = "../common/", package = "tantivy-common" }
|
||||
tantivy-bitpacker = { version="0.2", path = "../bitpacker/" }
|
||||
prettytable-rs = {version="0.8.0", optional= true}
|
||||
rand = {version="0.8.3", optional= true}
|
||||
rand = { version="0.8.3", optional= true}
|
||||
itertools = { version="0.10.3", optional=true}
|
||||
measure_time = { version="0.8.2", optional=true}
|
||||
|
||||
[dev-dependencies]
|
||||
more-asserts = "0.3.0"
|
||||
proptest = "1.0.0"
|
||||
rand = "0.8.3"
|
||||
|
||||
[features]
|
||||
bin = ["prettytable-rs", "rand"]
|
||||
bin = ["prettytable-rs", "rand", "itertools", "measure_time"]
|
||||
default = ["bin"]
|
||||
|
||||
|
||||
729
fastfield_codecs/src/ip_codec.rs
Normal file
729
fastfield_codecs/src/ip_codec.rs
Normal file
@@ -0,0 +1,729 @@
|
||||
/// This codec takes a large number space (u128) and reduces it to a compact number space.
|
||||
///
|
||||
/// It will find spaces in the numer range. For example:
|
||||
///
|
||||
/// 100, 101, 102, 103, 104, 50000, 50001
|
||||
/// could be mapped to
|
||||
/// 100..104 -> 0..4
|
||||
/// 50000..50001 -> 5..6
|
||||
///
|
||||
/// Compact space 0..6 requires much less bits than 100..50001
|
||||
///
|
||||
/// The codec is created to compress ip addresses, but may be employed in other use cases.
|
||||
use std::{
|
||||
cmp::Ordering,
|
||||
collections::BinaryHeap,
|
||||
io::{self, Write},
|
||||
net::{IpAddr, Ipv6Addr},
|
||||
ops::RangeInclusive,
|
||||
};
|
||||
|
||||
use common::{deserialize_vint_u128, serialize_vint_u128};
|
||||
use tantivy_bitpacker::{self, BitPacker, BitUnpacker};
|
||||
|
||||
use crate::FastFieldCodecReaderU128;
|
||||
|
||||
pub fn ip_to_u128(ip_addr: IpAddr) -> u128 {
|
||||
let ip_addr_v6: Ipv6Addr = match ip_addr {
|
||||
IpAddr::V4(v4) => v4.to_ipv6_mapped(),
|
||||
IpAddr::V6(v6) => v6,
|
||||
};
|
||||
u128::from_be_bytes(ip_addr_v6.octets())
|
||||
}
|
||||
|
||||
const INTERVAL_COST_IN_BITS: usize = 64;
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
pub struct IntervalEncoding();
|
||||
|
||||
pub struct IntervalCompressor {
|
||||
pub null_value: u128,
|
||||
min_value: u128,
|
||||
max_value: u128,
|
||||
compact_space: CompactSpace,
|
||||
pub num_bits: u8,
|
||||
}
|
||||
|
||||
#[derive(Debug, Eq, PartialEq)]
|
||||
struct DeltaAndPos {
|
||||
delta: u128,
|
||||
pos: usize,
|
||||
}
|
||||
impl DeltaAndPos {
|
||||
fn new(ip: u128, pos: usize) -> Self {
|
||||
DeltaAndPos { delta: ip, pos }
|
||||
}
|
||||
}
|
||||
|
||||
impl Ord for DeltaAndPos {
|
||||
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
|
||||
self.delta.cmp(&other.delta)
|
||||
}
|
||||
}
|
||||
impl PartialOrd for DeltaAndPos {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
|
||||
self.delta.partial_cmp(&other.delta)
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_delta_and_pos_sort() {
|
||||
let mut deltas: BinaryHeap<DeltaAndPos> = BinaryHeap::new();
|
||||
deltas.push(DeltaAndPos::new(10, 1));
|
||||
deltas.push(DeltaAndPos::new(100, 10));
|
||||
deltas.push(DeltaAndPos::new(1, 10));
|
||||
assert_eq!(deltas.pop().unwrap().delta, 100);
|
||||
assert_eq!(deltas.pop().unwrap().delta, 10);
|
||||
}
|
||||
|
||||
/// Put the deltas for the sorted ip addresses into a binary heap
|
||||
fn get_deltas(ip_addrs_sorted: &[u128]) -> BinaryHeap<DeltaAndPos> {
|
||||
let mut prev_opt = None;
|
||||
let mut deltas: BinaryHeap<DeltaAndPos> = BinaryHeap::new();
|
||||
for (pos, ip_addr) in ip_addrs_sorted.iter().cloned().enumerate() {
|
||||
let delta = if let Some(prev) = prev_opt {
|
||||
ip_addr - prev
|
||||
} else {
|
||||
ip_addr + 1
|
||||
};
|
||||
// skip too small deltas
|
||||
if delta > 2 {
|
||||
deltas.push(DeltaAndPos::new(delta, pos));
|
||||
}
|
||||
prev_opt = Some(ip_addr);
|
||||
}
|
||||
deltas
|
||||
}
|
||||
|
||||
/// Will collect blanks and add them to compact space if it will affect the number of bits used on
|
||||
/// the compact space.
|
||||
fn get_compact_space(ip_addrs_sorted: &[u128], cost_per_interval: usize) -> CompactSpace {
|
||||
let max_val = *ip_addrs_sorted.last().unwrap_or(&0u128) + 1;
|
||||
let mut deltas = get_deltas(ip_addrs_sorted);
|
||||
let mut amplitude_compact_space = max_val;
|
||||
let mut amplitude_bits: u8 = (amplitude_compact_space as f64).log2().ceil() as u8;
|
||||
let mut staged_blanks = vec![];
|
||||
|
||||
let mut compact_space = CompactSpaceBuilder::new();
|
||||
|
||||
// We will stage blanks until they reduce the compact space by 1 bit.
|
||||
// Binary heap to process the gaps by their size
|
||||
while let Some(ip_addr_and_pos) = deltas.pop() {
|
||||
let delta = ip_addr_and_pos.delta;
|
||||
let pos = ip_addr_and_pos.pos;
|
||||
staged_blanks.push((delta, pos));
|
||||
let staged_spaces_sum: u128 = staged_blanks.iter().map(|(delta, _)| delta - 1).sum();
|
||||
// +1 for later added null value
|
||||
let amplitude_new_compact_space = amplitude_compact_space - staged_spaces_sum + 1;
|
||||
let amplitude_new_bits = (amplitude_new_compact_space as f64).log2().ceil() as u8;
|
||||
if amplitude_bits == amplitude_new_bits {
|
||||
continue;
|
||||
}
|
||||
let saved_bits = (amplitude_bits - amplitude_new_bits) as usize * ip_addrs_sorted.len();
|
||||
let cost = staged_blanks.len() * cost_per_interval;
|
||||
if cost >= saved_bits {
|
||||
// Continue here, since although we walk over the deltas by size,
|
||||
// we can potentially save a lot at the last bits, which are smaller deltas
|
||||
//
|
||||
// E.g. if the first range reduces the compact space by 1000 from 2000 to 1000, which
|
||||
// saves 11-10=1 bit and the next range reduces the compact space by 950 to
|
||||
// 50, which saves 10-6=4 bit
|
||||
continue;
|
||||
}
|
||||
|
||||
amplitude_compact_space = amplitude_new_compact_space;
|
||||
amplitude_bits = amplitude_new_bits;
|
||||
for (_, pos) in staged_blanks.drain(..) {
|
||||
let ip_addr = ip_addrs_sorted[pos];
|
||||
if pos == 0 {
|
||||
compact_space.add_hole(0..=ip_addr - 1);
|
||||
} else {
|
||||
compact_space.add_hole(ip_addrs_sorted[pos - 1] + 1..=ip_addr - 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
compact_space.add_hole(max_val..=u128::MAX);
|
||||
|
||||
compact_space.finish()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn compact_space_test() {
|
||||
// small ranges are ignored here
|
||||
let ips = vec![
|
||||
2u128, 4u128, 1000, 1001, 1002, 1003, 1004, 1005, 1008, 1010, 1012, 1260,
|
||||
];
|
||||
let ranges_and_compact_start = get_compact_space(&ips, 11);
|
||||
let null_value = ranges_and_compact_start.null_value;
|
||||
let amplitude = ranges_and_compact_start.amplitude_compact_space();
|
||||
assert_eq!(null_value, 5);
|
||||
assert_eq!(amplitude, 20);
|
||||
assert_eq!(2, ranges_and_compact_start.to_compact(2).unwrap());
|
||||
|
||||
assert_eq!(ranges_and_compact_start.to_compact(100).unwrap_err(), 0);
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Eq, PartialEq)]
|
||||
struct CompactSpaceBuilder {
|
||||
covered_space: Vec<std::ops::RangeInclusive<u128>>,
|
||||
}
|
||||
|
||||
impl CompactSpaceBuilder {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
covered_space: vec![0..=u128::MAX],
|
||||
}
|
||||
}
|
||||
|
||||
// Will extend the first range and add a null value to it.
|
||||
fn assign_and_return_null(&mut self) -> u128 {
|
||||
self.covered_space[0] = *self.covered_space[0].start()..=*self.covered_space[0].end() + 1;
|
||||
*self.covered_space[0].end()
|
||||
}
|
||||
|
||||
// Assumes that repeated add_hole calls don't overlap.
|
||||
fn add_hole(&mut self, hole: std::ops::RangeInclusive<u128>) {
|
||||
let position = self
|
||||
.covered_space
|
||||
.iter()
|
||||
.position(|range| range.start() <= hole.start() && range.end() >= hole.end());
|
||||
if let Some(position) = position {
|
||||
let old_range = self.covered_space.remove(position);
|
||||
if old_range == hole {
|
||||
return;
|
||||
}
|
||||
let new_range_end = hole.end().saturating_add(1)..=*old_range.end();
|
||||
if old_range.start() == hole.start() {
|
||||
self.covered_space.insert(position, new_range_end);
|
||||
return;
|
||||
}
|
||||
let new_range_start = *old_range.start()..=hole.start().saturating_sub(1);
|
||||
if old_range.end() == hole.end() {
|
||||
self.covered_space.insert(position, new_range_start);
|
||||
return;
|
||||
}
|
||||
self.covered_space.insert(position, new_range_end);
|
||||
self.covered_space.insert(position, new_range_start);
|
||||
}
|
||||
}
|
||||
fn finish(mut self) -> CompactSpace {
|
||||
let null_value = self.assign_and_return_null();
|
||||
|
||||
let mut compact_start: u64 = 0;
|
||||
let mut ranges_and_compact_start = vec![];
|
||||
for cov in self.covered_space {
|
||||
let covered_range_len = cov.end() - cov.start();
|
||||
ranges_and_compact_start.push((cov, compact_start));
|
||||
compact_start += covered_range_len as u64 + 1;
|
||||
}
|
||||
CompactSpace {
|
||||
ranges_and_compact_start,
|
||||
null_value,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Eq, PartialEq)]
|
||||
struct CompactSpace {
|
||||
ranges_and_compact_start: Vec<(std::ops::RangeInclusive<u128>, u64)>,
|
||||
pub null_value: u128,
|
||||
}
|
||||
impl CompactSpace {
|
||||
fn amplitude_compact_space(&self) -> u128 {
|
||||
let last_range = &self.ranges_and_compact_start[self.ranges_and_compact_start.len() - 1];
|
||||
last_range.1 as u128 + (last_range.0.end() - last_range.0.start()) + 1
|
||||
}
|
||||
|
||||
fn get_range_and_compact_start(&self, pos: usize) -> &(std::ops::RangeInclusive<u128>, u64) {
|
||||
&self.ranges_and_compact_start[pos]
|
||||
}
|
||||
fn serialize(&self, output: &mut Vec<u8>) {
|
||||
serialize_vint_u128(self.null_value as u128, output);
|
||||
serialize_vint_u128(self.ranges_and_compact_start.len() as u128, output);
|
||||
let mut prev_ip = 0;
|
||||
for (ip_range, _compact) in &self.ranges_and_compact_start {
|
||||
let delta_ip = ip_range.start() - prev_ip;
|
||||
serialize_vint_u128(delta_ip as u128, output);
|
||||
prev_ip = *ip_range.start();
|
||||
|
||||
let delta_ip = ip_range.end() - prev_ip;
|
||||
serialize_vint_u128(delta_ip as u128, output);
|
||||
prev_ip = *ip_range.end();
|
||||
}
|
||||
}
|
||||
|
||||
fn deserialize(data: &[u8]) -> io::Result<(&[u8], Self)> {
|
||||
let (null_value, data) = deserialize_vint_u128(data)?;
|
||||
let (num_ip_addrs, mut data) = deserialize_vint_u128(data)?;
|
||||
let mut ip_addr = 0u128;
|
||||
let mut compact = 0u64;
|
||||
let mut ranges_and_compact_start: Vec<(std::ops::RangeInclusive<u128>, u64)> = vec![];
|
||||
for _ in 0..num_ip_addrs {
|
||||
let (ip_addr_delta, new_data) = deserialize_vint_u128(data)?;
|
||||
data = new_data;
|
||||
ip_addr += ip_addr_delta;
|
||||
let ip_addr_start = ip_addr;
|
||||
|
||||
let (ip_addr_delta, new_data) = deserialize_vint_u128(data)?;
|
||||
data = new_data;
|
||||
ip_addr += ip_addr_delta;
|
||||
let ip_addr_end = ip_addr;
|
||||
|
||||
let compact_delta = ip_addr_end - ip_addr_start + 1;
|
||||
|
||||
ranges_and_compact_start.push((ip_addr_start..=ip_addr_end, compact));
|
||||
compact += compact_delta as u64;
|
||||
}
|
||||
Ok((
|
||||
data,
|
||||
Self {
|
||||
null_value,
|
||||
ranges_and_compact_start,
|
||||
},
|
||||
))
|
||||
}
|
||||
|
||||
/// Returns either Ok(the value in the compact space) or if it is outside the compact space the
|
||||
/// Err(position on the next larger range above the value)
|
||||
fn to_compact(&self, ip: u128) -> Result<u64, usize> {
|
||||
self.ranges_and_compact_start
|
||||
.binary_search_by(|probe| {
|
||||
let ip_range = &probe.0;
|
||||
if *ip_range.start() <= ip && *ip_range.end() >= ip {
|
||||
return Ordering::Equal;
|
||||
} else if ip < *ip_range.start() {
|
||||
return Ordering::Greater;
|
||||
} else if ip > *ip_range.end() {
|
||||
return Ordering::Less;
|
||||
}
|
||||
panic!("not covered all ranges in check");
|
||||
})
|
||||
.map(|pos| {
|
||||
let (range, compact_start) = &self.ranges_and_compact_start[pos];
|
||||
compact_start + (ip - range.start()) as u64
|
||||
})
|
||||
.map_err(|pos| pos - 1)
|
||||
}
|
||||
|
||||
/// Unpacks a ip from compact space to u128 space
|
||||
fn unpack_ip(&self, compact: u64) -> u128 {
|
||||
let pos = self
|
||||
.ranges_and_compact_start
|
||||
.binary_search_by_key(&compact, |probe| probe.1)
|
||||
.map_or_else(|e| e - 1, |v| v);
|
||||
|
||||
let range_and_compact_start = &self.ranges_and_compact_start[pos];
|
||||
let diff = compact - self.ranges_and_compact_start[pos].1;
|
||||
range_and_compact_start.0.start() + diff as u128
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn ranges_and_compact_start_test() {
|
||||
let ips = vec![
|
||||
2u128, 4u128, 1000, 1001, 1002, 1003, 1004, 1005, 1008, 1010, 1012, 1260,
|
||||
];
|
||||
let ranges_and_compact_start = get_compact_space(&ips, 11);
|
||||
assert_eq!(ranges_and_compact_start.null_value, 5);
|
||||
|
||||
let mut output = vec![];
|
||||
ranges_and_compact_start.serialize(&mut output);
|
||||
|
||||
assert_eq!(
|
||||
ranges_and_compact_start,
|
||||
CompactSpace::deserialize(&output).unwrap().1
|
||||
);
|
||||
|
||||
for ip in &ips {
|
||||
let compact = ranges_and_compact_start.to_compact(*ip).unwrap();
|
||||
assert_eq!(ranges_and_compact_start.unpack_ip(compact), *ip);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn train(ip_addrs_sorted: &[u128]) -> IntervalCompressor {
|
||||
let ranges_and_compact_start = get_compact_space(ip_addrs_sorted, INTERVAL_COST_IN_BITS);
|
||||
let null_value = ranges_and_compact_start.null_value;
|
||||
let amplitude_compact_space = ranges_and_compact_start.amplitude_compact_space();
|
||||
|
||||
assert!(
|
||||
amplitude_compact_space <= u64::MAX as u128,
|
||||
"case unsupported."
|
||||
);
|
||||
|
||||
let num_bits = tantivy_bitpacker::compute_num_bits(amplitude_compact_space as u64);
|
||||
let min_value = *ip_addrs_sorted.first().unwrap_or(&0);
|
||||
let max_value = *ip_addrs_sorted.last().unwrap_or(&0);
|
||||
let compressor = IntervalCompressor {
|
||||
null_value,
|
||||
min_value,
|
||||
max_value,
|
||||
compact_space: ranges_and_compact_start,
|
||||
num_bits,
|
||||
};
|
||||
|
||||
let max_value = *ip_addrs_sorted.last().unwrap_or(&0u128).max(&null_value);
|
||||
assert_eq!(
|
||||
compressor.to_compact(max_value) + 1,
|
||||
amplitude_compact_space as u64
|
||||
);
|
||||
compressor
|
||||
}
|
||||
|
||||
impl IntervalCompressor {
|
||||
/// Taking the vals as Vec may cost a lot of memory.
|
||||
/// It is used to sort the vals.
|
||||
///
|
||||
/// Less memory alternative: We could just store the index (u32), and use that as sorting.
|
||||
pub fn from_vals(mut vals: Vec<u128>) -> Self {
|
||||
vals.sort();
|
||||
train(&vals)
|
||||
}
|
||||
|
||||
fn to_compact(&self, ip_addr: u128) -> u64 {
|
||||
self.compact_space.to_compact(ip_addr).unwrap()
|
||||
}
|
||||
|
||||
fn write_footer(&self, write: &mut impl Write, num_vals: u128) -> io::Result<()> {
|
||||
let mut footer = vec![];
|
||||
|
||||
// header flags for future optional dictionary encoding
|
||||
let header_flags = 0u64;
|
||||
footer.extend_from_slice(&header_flags.to_le_bytes());
|
||||
|
||||
let null_value = self
|
||||
.compact_space
|
||||
.to_compact(self.null_value)
|
||||
.expect("could not convert null to compact space");
|
||||
serialize_vint_u128(null_value as u128, &mut footer);
|
||||
serialize_vint_u128(self.min_value, &mut footer);
|
||||
serialize_vint_u128(self.max_value, &mut footer);
|
||||
|
||||
self.compact_space.serialize(&mut footer);
|
||||
|
||||
footer.push(self.num_bits);
|
||||
serialize_vint_u128(num_vals as u128, &mut footer);
|
||||
|
||||
write.write_all(&footer)?;
|
||||
let footer_len = footer.len() as u32;
|
||||
write.write_all(&footer_len.to_le_bytes())?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn compress(&self, vals: &[u128]) -> io::Result<Vec<u8>> {
|
||||
let mut output = vec![];
|
||||
self.compress_into(vals.iter().cloned(), &mut output)?;
|
||||
Ok(output)
|
||||
}
|
||||
pub fn compress_into(
|
||||
&self,
|
||||
vals: impl Iterator<Item = u128>,
|
||||
write: &mut impl Write,
|
||||
) -> io::Result<()> {
|
||||
let mut bitpacker = BitPacker::default();
|
||||
let mut num_vals = 0;
|
||||
for ip_addr in vals {
|
||||
let compact = self.to_compact(ip_addr);
|
||||
bitpacker.write(compact, self.num_bits, write).unwrap();
|
||||
num_vals += 1;
|
||||
}
|
||||
bitpacker.close(write).unwrap();
|
||||
self.write_footer(write, num_vals as u128)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct IntervallDecompressor {
|
||||
compact_space: CompactSpace,
|
||||
bit_unpacker: BitUnpacker,
|
||||
null_compact_space: u64,
|
||||
min_value: u128,
|
||||
max_value: u128,
|
||||
num_vals: usize,
|
||||
}
|
||||
|
||||
impl FastFieldCodecReaderU128 for IntervallDecompressor {
|
||||
fn open_from_bytes(bytes: &[u8]) -> std::io::Result<Self> {
|
||||
Self::open(bytes)
|
||||
}
|
||||
|
||||
fn get(&self, doc: u64, data: &[u8]) -> Option<u128> {
|
||||
self.get(doc, data)
|
||||
}
|
||||
|
||||
fn get_between_vals(&self, range: RangeInclusive<u128>, data: &[u8]) -> Vec<usize> {
|
||||
self.get_range(range, data)
|
||||
}
|
||||
|
||||
fn min_value(&self) -> u128 {
|
||||
self.min_value()
|
||||
}
|
||||
|
||||
fn max_value(&self) -> u128 {
|
||||
self.max_value()
|
||||
}
|
||||
|
||||
/// The computed and assigned number for null values
|
||||
fn null_value(&self) -> u128 {
|
||||
self.compact_space.null_value
|
||||
}
|
||||
|
||||
fn iter<'a>(&'a self, data: &'a [u8]) -> Box<dyn Iterator<Item = Option<u128>> + 'a> {
|
||||
Box::new(self.iter(data))
|
||||
}
|
||||
}
|
||||
|
||||
impl IntervallDecompressor {
|
||||
pub fn open(data: &[u8]) -> io::Result<IntervallDecompressor> {
|
||||
let (data, footer_len_bytes) = data.split_at(data.len() - 4);
|
||||
let footer_len = u32::from_le_bytes(footer_len_bytes.try_into().unwrap());
|
||||
|
||||
let data = &data[data.len() - footer_len as usize..];
|
||||
let (_header_flags, data) = data.split_at(8);
|
||||
let (null_compact_space, data) = deserialize_vint_u128(data)?;
|
||||
let (min_value, data) = deserialize_vint_u128(data)?;
|
||||
let (max_value, data) = deserialize_vint_u128(data)?;
|
||||
let (mut data, compact_space) = CompactSpace::deserialize(data).unwrap();
|
||||
|
||||
let num_bits = data[0];
|
||||
data = &data[1..];
|
||||
let (num_vals, _data) = deserialize_vint_u128(data)?;
|
||||
let decompressor = IntervallDecompressor {
|
||||
null_compact_space: null_compact_space as u64,
|
||||
min_value,
|
||||
max_value,
|
||||
compact_space,
|
||||
num_vals: num_vals as usize,
|
||||
bit_unpacker: BitUnpacker::new(num_bits),
|
||||
};
|
||||
|
||||
Ok(decompressor)
|
||||
}
|
||||
|
||||
/// Converting to compact space for the decompressor is more complex, since we may get values
|
||||
/// which are outside the compact space. e.g. if we map
|
||||
/// 1000 => 5
|
||||
/// 2000 => 6
|
||||
///
|
||||
/// and we want a mapping for 1005, there is no equivalent compact space. We instead return an
|
||||
/// error with the index of the next range.
|
||||
fn to_compact(&self, ip_addr: u128) -> Result<u64, usize> {
|
||||
self.compact_space.to_compact(ip_addr)
|
||||
}
|
||||
|
||||
fn compact_to_ip_addr(&self, compact: u64) -> u128 {
|
||||
self.compact_space.unpack_ip(compact)
|
||||
}
|
||||
|
||||
/// Comparing on compact space: 1.2 GElements/s
|
||||
///
|
||||
/// Comparing on original space: .06 GElements/s (not completely optimized)
|
||||
pub fn get_range(&self, range: RangeInclusive<u128>, data: &[u8]) -> Vec<usize> {
|
||||
let from_ip_addr = *range.start();
|
||||
let to_ip_addr = *range.end();
|
||||
assert!(to_ip_addr >= from_ip_addr);
|
||||
let compact_from = self.to_compact(from_ip_addr);
|
||||
let compact_to = self.to_compact(to_ip_addr);
|
||||
// Quick return, if both ranges fall into the same non-mapped space, the range can't cover
|
||||
// any values, so we can early exit
|
||||
match (compact_to, compact_from) {
|
||||
(Err(pos1), Err(pos2)) if pos1 == pos2 => return vec![],
|
||||
_ => {}
|
||||
}
|
||||
|
||||
let compact_from = compact_from.unwrap_or_else(|pos| {
|
||||
let range_and_compact_start = self.compact_space.get_range_and_compact_start(pos);
|
||||
let compact_end = range_and_compact_start.1
|
||||
+ (range_and_compact_start.0.end() - range_and_compact_start.0.start()) as u64;
|
||||
compact_end + 1
|
||||
});
|
||||
// If there is no compact space, we go to the closest upperbound compact space
|
||||
let compact_to = compact_to.unwrap_or_else(|pos| {
|
||||
let range_and_compact_start = self.compact_space.get_range_and_compact_start(pos);
|
||||
let compact_end = range_and_compact_start.1
|
||||
+ (range_and_compact_start.0.end() - range_and_compact_start.0.start()) as u64;
|
||||
compact_end
|
||||
});
|
||||
|
||||
let range = compact_from..=compact_to;
|
||||
let mut positions = vec![];
|
||||
|
||||
for (pos, compact_ip) in self
|
||||
.iter_compact(data)
|
||||
.enumerate()
|
||||
.filter(|(_pos, val)| *val != self.null_compact_space)
|
||||
{
|
||||
if range.contains(&compact_ip) {
|
||||
positions.push(pos);
|
||||
}
|
||||
}
|
||||
|
||||
positions
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn iter_compact<'a>(&'a self, data: &'a [u8]) -> impl Iterator<Item = u64> + 'a {
|
||||
(0..self.num_vals).map(move |idx| self.bit_unpacker.get(idx as u64, data) as u64)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn iter<'a>(&'a self, data: &'a [u8]) -> impl Iterator<Item = Option<u128>> + 'a {
|
||||
// TODO: Performance. It would be better to iterate on the ranges and check existence via
|
||||
// the bit_unpacker.
|
||||
self.iter_compact(data).map(|compact| {
|
||||
if compact == self.null_compact_space {
|
||||
None
|
||||
} else {
|
||||
Some(self.compact_to_ip_addr(compact))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub fn get(&self, idx: u64, data: &[u8]) -> Option<u128> {
|
||||
let compact = self.bit_unpacker.get(idx, data);
|
||||
if compact == self.null_compact_space {
|
||||
None
|
||||
} else {
|
||||
Some(self.compact_to_ip_addr(compact))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn min_value(&self) -> u128 {
|
||||
self.min_value
|
||||
}
|
||||
|
||||
pub fn max_value(&self) -> u128 {
|
||||
self.max_value
|
||||
}
|
||||
}
|
||||
|
||||
impl IntervalEncoding {
|
||||
pub fn train(&self, mut vals: Vec<u128>) -> IntervalCompressor {
|
||||
vals.sort();
|
||||
train(&vals)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
use super::*;
|
||||
|
||||
fn decode_all(data: &[u8]) -> Vec<u128> {
|
||||
let decompressor = IntervallDecompressor::open(data).unwrap();
|
||||
let mut u128_vals = Vec::new();
|
||||
for idx in 0..decompressor.num_vals as usize {
|
||||
let val = decompressor.get(idx as u64, data);
|
||||
if let Some(val) = val {
|
||||
u128_vals.push(val);
|
||||
}
|
||||
}
|
||||
u128_vals
|
||||
}
|
||||
|
||||
fn test_aux_vals(encoder: &IntervalEncoding, u128_vals: &[u128]) -> Vec<u8> {
|
||||
let compressor = encoder.train(u128_vals.to_vec());
|
||||
let data = compressor.compress(u128_vals).unwrap();
|
||||
let decoded_val = decode_all(&data);
|
||||
assert_eq!(&decoded_val, u128_vals);
|
||||
data
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_range_1() {
|
||||
let vals = &[
|
||||
1u128,
|
||||
100u128,
|
||||
3u128,
|
||||
99999u128,
|
||||
100000u128,
|
||||
100001u128,
|
||||
4_000_211_221u128,
|
||||
4_000_211_222u128,
|
||||
333u128,
|
||||
];
|
||||
let interval_encoding = IntervalEncoding::default();
|
||||
let data = test_aux_vals(&interval_encoding, vals);
|
||||
let decomp = IntervallDecompressor::open(&data).unwrap();
|
||||
let positions = decomp.get_range(0..=1, &data);
|
||||
assert_eq!(positions, vec![0]);
|
||||
let positions = decomp.get_range(0..=2, &data);
|
||||
assert_eq!(positions, vec![0]);
|
||||
let positions = decomp.get_range(0..=3, &data);
|
||||
assert_eq!(positions, vec![0, 2]);
|
||||
assert_eq!(decomp.get_range(99999u128..=99999u128, &data), vec![3]);
|
||||
assert_eq!(decomp.get_range(99998u128..=100000u128, &data), vec![3, 4]);
|
||||
assert_eq!(decomp.get_range(99998u128..=99999u128, &data), vec![3]);
|
||||
assert_eq!(decomp.get_range(99998u128..=99998u128, &data), vec![]);
|
||||
assert_eq!(decomp.get_range(333u128..=333u128, &data), vec![8]);
|
||||
assert_eq!(decomp.get_range(332u128..=333u128, &data), vec![8]);
|
||||
assert_eq!(decomp.get_range(332u128..=334u128, &data), vec![8]);
|
||||
assert_eq!(decomp.get_range(333u128..=334u128, &data), vec![8]);
|
||||
|
||||
assert_eq!(
|
||||
decomp.get_range(4_000_211_221u128..=5_000_000_000u128, &data),
|
||||
vec![6, 7]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_empty() {
|
||||
let vals = &[];
|
||||
let interval_encoding = IntervalEncoding::default();
|
||||
let data = test_aux_vals(&interval_encoding, vals);
|
||||
let _decomp = IntervallDecompressor::open(&data).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_range_2() {
|
||||
let vals = &[
|
||||
100u128,
|
||||
99999u128,
|
||||
100000u128,
|
||||
100001u128,
|
||||
4_000_211_221u128,
|
||||
4_000_211_222u128,
|
||||
333u128,
|
||||
];
|
||||
let interval_encoding = IntervalEncoding::default();
|
||||
let data = test_aux_vals(&interval_encoding, vals);
|
||||
let decomp = IntervallDecompressor::open(&data).unwrap();
|
||||
let positions = decomp.get_range(0..=5, &data);
|
||||
assert_eq!(positions, vec![]);
|
||||
let positions = decomp.get_range(0..=100, &data);
|
||||
assert_eq!(positions, vec![0]);
|
||||
let positions = decomp.get_range(0..=105, &data);
|
||||
assert_eq!(positions, vec![0]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_null() {
|
||||
let vals = &[2u128];
|
||||
let interval_encoding = IntervalEncoding::default().train(vals.to_vec());
|
||||
let vals = vec![interval_encoding.null_value, 2u128];
|
||||
let data = interval_encoding.compress(&vals).unwrap();
|
||||
let decomp = IntervallDecompressor::open(&data).unwrap();
|
||||
let positions = decomp.get_range(0..=1, &data);
|
||||
assert_eq!(positions, vec![]);
|
||||
let positions = decomp.get_range(2..=2, &data);
|
||||
assert_eq!(positions, vec![1]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_first_large_gaps() {
|
||||
let vals = &[1_000_000_000u128; 100];
|
||||
let interval_encoding = IntervalEncoding::default();
|
||||
let _data = test_aux_vals(&interval_encoding, vals);
|
||||
}
|
||||
use proptest::prelude::*;
|
||||
|
||||
proptest! {
|
||||
|
||||
#[test]
|
||||
fn compress_decompress_random(vals in proptest::collection::vec(any::<u128>()
|
||||
, 1..1000)) {
|
||||
let interval_encoding = IntervalEncoding::default();
|
||||
let _data = test_aux_vals(&interval_encoding, &vals);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -4,8 +4,10 @@ extern crate more_asserts;
|
||||
|
||||
use std::io;
|
||||
use std::io::Write;
|
||||
use std::ops::RangeInclusive;
|
||||
|
||||
pub mod bitpacked;
|
||||
pub mod ip_codec;
|
||||
pub mod linearinterpol;
|
||||
pub mod multilinearinterpol;
|
||||
|
||||
@@ -19,10 +21,32 @@ pub trait FastFieldCodecReader: Sized {
|
||||
fn max_value(&self) -> u64;
|
||||
}
|
||||
|
||||
pub trait FastFieldCodecReaderU128: Sized {
|
||||
/// reads the metadata and returns the CodecReader
|
||||
fn open_from_bytes(bytes: &[u8]) -> std::io::Result<Self>;
|
||||
|
||||
/// Get value for doc
|
||||
fn get(&self, doc: u64, data: &[u8]) -> Option<u128>;
|
||||
|
||||
/// Iterator
|
||||
///
|
||||
/// Replace with opaque type after: https://github.com/rust-lang/rust/issues/63063
|
||||
fn iter<'a>(&'a self, data: &'a [u8]) -> Box<dyn Iterator<Item = Option<u128>> + 'a>;
|
||||
|
||||
/// Get positions (=docs in single value) for provided value range
|
||||
fn get_between_vals(&self, range: RangeInclusive<u128>, data: &[u8]) -> Vec<usize>;
|
||||
|
||||
/// The computed and assigned number value for null values
|
||||
fn null_value(&self) -> u128;
|
||||
|
||||
fn min_value(&self) -> u128;
|
||||
fn max_value(&self) -> u128;
|
||||
}
|
||||
|
||||
/// The FastFieldSerializerEstimate trait is required on all variants
|
||||
/// of fast field compressions, to decide which one to choose.
|
||||
pub trait FastFieldCodecSerializer {
|
||||
/// A codex needs to provide a unique name and id, which is
|
||||
/// A codec needs to provide a unique name and id, which is
|
||||
/// used for debugging and de/serialization.
|
||||
const NAME: &'static str;
|
||||
const ID: u8;
|
||||
|
||||
@@ -1,11 +1,117 @@
|
||||
#[macro_use]
|
||||
extern crate prettytable;
|
||||
use std::collections::HashSet;
|
||||
use std::env;
|
||||
use std::io::BufRead;
|
||||
use std::net::{IpAddr, Ipv6Addr};
|
||||
use std::str::FromStr;
|
||||
|
||||
use fastfield_codecs::ip_codec::{IntervalEncoding, IntervallDecompressor};
|
||||
use fastfield_codecs::linearinterpol::LinearInterpolFastFieldSerializer;
|
||||
use fastfield_codecs::multilinearinterpol::MultiLinearInterpolFastFieldSerializer;
|
||||
use fastfield_codecs::{FastFieldCodecSerializer, FastFieldStats};
|
||||
use itertools::Itertools;
|
||||
use measure_time::print_time;
|
||||
use prettytable::{Cell, Row, Table};
|
||||
|
||||
fn print_set_stats(ip_addrs: &[u128]) {
|
||||
println!("NumIps\t{}", ip_addrs.len());
|
||||
let ip_addr_set: HashSet<u128> = ip_addrs.iter().cloned().collect();
|
||||
println!("NumUniqueIps\t{}", ip_addr_set.len());
|
||||
let ratio_unique = ip_addr_set.len() as f64 / ip_addrs.len() as f64;
|
||||
println!("RatioUniqueOverTotal\t{ratio_unique:.4}");
|
||||
|
||||
// histogram
|
||||
let mut ip_addrs = ip_addrs.to_vec();
|
||||
ip_addrs.sort();
|
||||
let mut cnts: Vec<usize> = ip_addrs
|
||||
.into_iter()
|
||||
.dedup_with_count()
|
||||
.map(|(cnt, _)| cnt)
|
||||
.collect();
|
||||
cnts.sort();
|
||||
|
||||
let top_256_cnt: usize = cnts.iter().rev().take(256).sum();
|
||||
let top_128_cnt: usize = cnts.iter().rev().take(128).sum();
|
||||
let top_64_cnt: usize = cnts.iter().rev().take(64).sum();
|
||||
let top_8_cnt: usize = cnts.iter().rev().take(8).sum();
|
||||
let total: usize = cnts.iter().sum();
|
||||
|
||||
println!("{}", total);
|
||||
println!("{}", top_256_cnt);
|
||||
println!("{}", top_128_cnt);
|
||||
println!("Percentage Top8 {:02}", top_8_cnt as f32 / total as f32);
|
||||
println!("Percentage Top64 {:02}", top_64_cnt as f32 / total as f32);
|
||||
println!("Percentage Top128 {:02}", top_128_cnt as f32 / total as f32);
|
||||
println!("Percentage Top256 {:02}", top_256_cnt as f32 / total as f32);
|
||||
|
||||
let mut cnts: Vec<(usize, usize)> = cnts.into_iter().dedup_with_count().collect();
|
||||
cnts.sort_by(|a, b| {
|
||||
if a.1 == b.1 {
|
||||
a.0.cmp(&b.0)
|
||||
} else {
|
||||
b.1.cmp(&a.1)
|
||||
}
|
||||
});
|
||||
|
||||
println!("\n\n----\nIP Address histogram");
|
||||
println!("IPAddrCount\tFrequency");
|
||||
for (ip_addr_count, times) in cnts {
|
||||
println!("{}\t{}", ip_addr_count, times);
|
||||
}
|
||||
}
|
||||
|
||||
fn ip_dataset() -> Vec<u128> {
|
||||
let mut ip_addr_v4 = 0;
|
||||
|
||||
let stdin = std::io::stdin();
|
||||
let ip_addrs: Vec<u128> = stdin
|
||||
.lock()
|
||||
.lines()
|
||||
.flat_map(|line| {
|
||||
let line = line.unwrap();
|
||||
let line = line.trim();
|
||||
let ip_addr = IpAddr::from_str(line.trim()).ok()?;
|
||||
if ip_addr.is_ipv4() {
|
||||
ip_addr_v4 += 1;
|
||||
}
|
||||
let ip_addr_v6: Ipv6Addr = match ip_addr {
|
||||
IpAddr::V4(v4) => v4.to_ipv6_mapped(),
|
||||
IpAddr::V6(v6) => v6,
|
||||
};
|
||||
Some(ip_addr_v6)
|
||||
})
|
||||
.map(|ip_v6| u128::from_be_bytes(ip_v6.octets()))
|
||||
.collect();
|
||||
|
||||
println!("IpAddrsAny\t{}", ip_addrs.len());
|
||||
println!("IpAddrsV4\t{}", ip_addr_v4);
|
||||
|
||||
ip_addrs
|
||||
}
|
||||
|
||||
fn bench_ip() {
|
||||
let encoding = IntervalEncoding();
|
||||
let dataset = ip_dataset();
|
||||
print_set_stats(&dataset);
|
||||
|
||||
let compressor = encoding.train(dataset.to_vec());
|
||||
let data = compressor.compress(&dataset).unwrap();
|
||||
|
||||
let decompressor = IntervallDecompressor::open(&data).unwrap();
|
||||
|
||||
for i in 11100..11150 {
|
||||
print_time!("get range");
|
||||
let doc_values = decompressor.get_range(dataset[i]..=dataset[i], &data);
|
||||
println!("{:?}", doc_values.len());
|
||||
}
|
||||
}
|
||||
|
||||
fn main() {
|
||||
if env::args().nth(1).unwrap() == "bench" {
|
||||
bench_ip();
|
||||
return;
|
||||
}
|
||||
let mut table = Table::new();
|
||||
|
||||
// Add a row per time
|
||||
|
||||
@@ -38,7 +38,7 @@ impl BinarySerializable for FileAddr {
|
||||
/// A `CompositeWrite` is used to write a `CompositeFile`.
|
||||
pub struct CompositeWrite<W = WritePtr> {
|
||||
write: CountingWriter<W>,
|
||||
offsets: HashMap<FileAddr, u64>,
|
||||
offsets: Vec<(FileAddr, u64)>,
|
||||
}
|
||||
|
||||
impl<W: TerminatingWrite + Write> CompositeWrite<W> {
|
||||
@@ -47,7 +47,7 @@ impl<W: TerminatingWrite + Write> CompositeWrite<W> {
|
||||
pub fn wrap(w: W) -> CompositeWrite<W> {
|
||||
CompositeWrite {
|
||||
write: CountingWriter::wrap(w),
|
||||
offsets: HashMap::new(),
|
||||
offsets: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -60,8 +60,8 @@ impl<W: TerminatingWrite + Write> CompositeWrite<W> {
|
||||
pub fn for_field_with_idx(&mut self, field: Field, idx: usize) -> &mut CountingWriter<W> {
|
||||
let offset = self.write.written_bytes();
|
||||
let file_addr = FileAddr::new(field, idx);
|
||||
assert!(!self.offsets.contains_key(&file_addr));
|
||||
self.offsets.insert(file_addr, offset);
|
||||
assert!(!self.offsets.iter().any(|el| el.0 == file_addr));
|
||||
self.offsets.push((file_addr, offset));
|
||||
&mut self.write
|
||||
}
|
||||
|
||||
@@ -73,16 +73,8 @@ impl<W: TerminatingWrite + Write> CompositeWrite<W> {
|
||||
let footer_offset = self.write.written_bytes();
|
||||
VInt(self.offsets.len() as u64).serialize(&mut self.write)?;
|
||||
|
||||
let mut offset_fields: Vec<_> = self
|
||||
.offsets
|
||||
.iter()
|
||||
.map(|(file_addr, offset)| (*offset, *file_addr))
|
||||
.collect();
|
||||
|
||||
offset_fields.sort();
|
||||
|
||||
let mut prev_offset = 0;
|
||||
for (offset, file_addr) in offset_fields {
|
||||
for (file_addr, offset) in self.offsets {
|
||||
VInt((offset - prev_offset) as u64).serialize(&mut self.write)?;
|
||||
file_addr.serialize(&mut self.write)?;
|
||||
prev_offset = offset;
|
||||
@@ -106,6 +98,14 @@ pub struct CompositeFile {
|
||||
offsets_index: HashMap<FileAddr, Range<usize>>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for CompositeFile {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("CompositeFile")
|
||||
.field("offsets_index", &self.offsets_index)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl CompositeFile {
|
||||
/// Opens a composite file stored in a given
|
||||
/// `FileSlice`.
|
||||
|
||||
@@ -52,6 +52,11 @@ impl BytesFastFieldReader {
|
||||
}
|
||||
|
||||
impl MultiValueLength for BytesFastFieldReader {
|
||||
fn get_range(&self, doc_id: DocId) -> std::ops::Range<u64> {
|
||||
let (start, stop) = self.range(doc_id);
|
||||
start as u64..stop as u64
|
||||
}
|
||||
|
||||
fn get_len(&self, doc_id: DocId) -> u64 {
|
||||
self.num_bytes(doc_id) as u64
|
||||
}
|
||||
|
||||
241
src/fastfield/fast_value.rs
Normal file
241
src/fastfield/fast_value.rs
Normal file
@@ -0,0 +1,241 @@
|
||||
use std::net::{IpAddr, Ipv6Addr};
|
||||
|
||||
use crate::schema::{Cardinality, FieldType, Type};
|
||||
use crate::DateTime;
|
||||
|
||||
pub fn ip_to_u128(ip_addr: IpAddr) -> u128 {
|
||||
let ip_addr_v6: Ipv6Addr = match ip_addr {
|
||||
IpAddr::V4(v4) => v4.to_ipv6_mapped(),
|
||||
IpAddr::V6(v6) => v6,
|
||||
};
|
||||
u128::from_be_bytes(ip_addr_v6.octets())
|
||||
}
|
||||
|
||||
/// Trait for large types that are allowed for fast fields: u128, IpAddr
|
||||
pub trait FastValueU128: Clone + Copy + Send + Sync + PartialOrd + 'static {
|
||||
/// Converts a value from u128
|
||||
///
|
||||
/// Internally all fast field values are encoded as u128.
|
||||
fn from_u128(val: u128) -> Self;
|
||||
|
||||
/// Converts a value to u128.
|
||||
///
|
||||
/// Internally all fast field values are encoded as u128.
|
||||
fn to_u128(&self) -> u128;
|
||||
|
||||
/// Cast value to `u128`.
|
||||
/// The value is just reinterpreted in memory.
|
||||
fn as_u128(&self) -> u128;
|
||||
|
||||
/// Returns the `schema::Type` for this FastValue.
|
||||
fn to_type() -> Type;
|
||||
|
||||
/// Build a default value. This default value is never used, so the value does not
|
||||
/// really matter.
|
||||
fn make_zero() -> Self {
|
||||
Self::from_u128(0u128)
|
||||
}
|
||||
}
|
||||
|
||||
impl FastValueU128 for u128 {
|
||||
fn from_u128(val: u128) -> Self {
|
||||
val
|
||||
}
|
||||
|
||||
fn to_u128(&self) -> u128 {
|
||||
*self
|
||||
}
|
||||
|
||||
fn as_u128(&self) -> u128 {
|
||||
*self
|
||||
}
|
||||
|
||||
fn to_type() -> Type {
|
||||
Type::U128
|
||||
}
|
||||
}
|
||||
|
||||
impl FastValueU128 for IpAddr {
|
||||
fn from_u128(val: u128) -> Self {
|
||||
IpAddr::from(val.to_be_bytes())
|
||||
}
|
||||
|
||||
fn to_u128(&self) -> u128 {
|
||||
ip_to_u128(*self)
|
||||
}
|
||||
|
||||
fn as_u128(&self) -> u128 {
|
||||
ip_to_u128(*self)
|
||||
}
|
||||
|
||||
fn to_type() -> Type {
|
||||
Type::Ip
|
||||
}
|
||||
}
|
||||
|
||||
/// Trait for types that are allowed for fast fields:
|
||||
/// (u64, i64 and f64, bool, DateTime).
|
||||
pub trait FastValue: Clone + Copy + Send + Sync + PartialOrd + 'static {
|
||||
/// Converts a value from u64
|
||||
///
|
||||
/// Internally all fast field values are encoded as u64.
|
||||
/// **Note: To be used for converting encoded Term, Posting values.**
|
||||
fn from_u64(val: u64) -> Self;
|
||||
|
||||
/// Converts a value to u64.
|
||||
///
|
||||
/// Internally all fast field values are encoded as u64.
|
||||
fn to_u64(&self) -> u64;
|
||||
|
||||
/// Returns the fast field cardinality that can be extracted from the given
|
||||
/// `FieldType`.
|
||||
///
|
||||
/// If the type is not a fast field, `None` is returned.
|
||||
fn fast_field_cardinality(field_type: &FieldType) -> Option<Cardinality>;
|
||||
|
||||
/// Cast value to `u64`.
|
||||
/// The value is just reinterpreted in memory.
|
||||
fn as_u64(&self) -> u64;
|
||||
|
||||
/// Build a default value. This default value is never used, so the value does not
|
||||
/// really matter.
|
||||
fn make_zero() -> Self {
|
||||
Self::from_u64(0i64.to_u64())
|
||||
}
|
||||
|
||||
/// Returns the `schema::Type` for this FastValue.
|
||||
fn to_type() -> Type;
|
||||
}
|
||||
|
||||
impl FastValue for u64 {
|
||||
fn from_u64(val: u64) -> Self {
|
||||
val
|
||||
}
|
||||
|
||||
fn to_u64(&self) -> u64 {
|
||||
*self
|
||||
}
|
||||
|
||||
fn fast_field_cardinality(field_type: &FieldType) -> Option<Cardinality> {
|
||||
match *field_type {
|
||||
FieldType::U64(ref integer_options) => integer_options.get_fastfield_cardinality(),
|
||||
FieldType::Facet(_) => Some(Cardinality::MultiValues),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn as_u64(&self) -> u64 {
|
||||
*self
|
||||
}
|
||||
|
||||
fn to_type() -> Type {
|
||||
Type::U64
|
||||
}
|
||||
}
|
||||
|
||||
impl FastValue for i64 {
|
||||
fn from_u64(val: u64) -> Self {
|
||||
common::u64_to_i64(val)
|
||||
}
|
||||
|
||||
fn to_u64(&self) -> u64 {
|
||||
common::i64_to_u64(*self)
|
||||
}
|
||||
|
||||
fn fast_field_cardinality(field_type: &FieldType) -> Option<Cardinality> {
|
||||
match *field_type {
|
||||
FieldType::I64(ref integer_options) => integer_options.get_fastfield_cardinality(),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn as_u64(&self) -> u64 {
|
||||
*self as u64
|
||||
}
|
||||
|
||||
fn to_type() -> Type {
|
||||
Type::I64
|
||||
}
|
||||
}
|
||||
|
||||
impl FastValue for f64 {
|
||||
fn from_u64(val: u64) -> Self {
|
||||
common::u64_to_f64(val)
|
||||
}
|
||||
|
||||
fn to_u64(&self) -> u64 {
|
||||
common::f64_to_u64(*self)
|
||||
}
|
||||
|
||||
fn fast_field_cardinality(field_type: &FieldType) -> Option<Cardinality> {
|
||||
match *field_type {
|
||||
FieldType::F64(ref integer_options) => integer_options.get_fastfield_cardinality(),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn as_u64(&self) -> u64 {
|
||||
self.to_bits()
|
||||
}
|
||||
|
||||
fn to_type() -> Type {
|
||||
Type::F64
|
||||
}
|
||||
}
|
||||
|
||||
impl FastValue for bool {
|
||||
fn from_u64(val: u64) -> Self {
|
||||
val != 0u64
|
||||
}
|
||||
|
||||
fn to_u64(&self) -> u64 {
|
||||
match self {
|
||||
false => 0,
|
||||
true => 1,
|
||||
}
|
||||
}
|
||||
|
||||
fn fast_field_cardinality(field_type: &FieldType) -> Option<Cardinality> {
|
||||
match *field_type {
|
||||
FieldType::Bool(ref integer_options) => integer_options.get_fastfield_cardinality(),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn as_u64(&self) -> u64 {
|
||||
*self as u64
|
||||
}
|
||||
|
||||
fn to_type() -> Type {
|
||||
Type::Bool
|
||||
}
|
||||
}
|
||||
|
||||
impl FastValue for DateTime {
|
||||
/// Converts a timestamp microseconds into DateTime.
|
||||
///
|
||||
/// **Note the timestamps is expected to be in microseconds.**
|
||||
fn from_u64(timestamp_micros_u64: u64) -> Self {
|
||||
let timestamp_micros = i64::from_u64(timestamp_micros_u64);
|
||||
Self::from_timestamp_micros(timestamp_micros)
|
||||
}
|
||||
|
||||
fn to_u64(&self) -> u64 {
|
||||
common::i64_to_u64(self.into_timestamp_micros())
|
||||
}
|
||||
|
||||
fn fast_field_cardinality(field_type: &FieldType) -> Option<Cardinality> {
|
||||
match *field_type {
|
||||
FieldType::Date(ref options) => options.get_fastfield_cardinality(),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn as_u64(&self) -> u64 {
|
||||
self.into_timestamp_micros().as_u64()
|
||||
}
|
||||
|
||||
fn to_type() -> Type {
|
||||
Type::Date
|
||||
}
|
||||
}
|
||||
@@ -20,24 +20,30 @@
|
||||
//!
|
||||
//! Read access performance is comparable to that of an array lookup.
|
||||
|
||||
use std::collections::btree_map::Range;
|
||||
|
||||
pub use self::alive_bitset::{intersect_alive_bitsets, write_alive_bitset, AliveBitSet};
|
||||
pub use self::bytes::{BytesFastFieldReader, BytesFastFieldWriter};
|
||||
pub use self::error::{FastFieldNotAvailableError, Result};
|
||||
pub use self::facet_reader::FacetReader;
|
||||
pub use self::fast_value::{FastValue, FastValueU128};
|
||||
pub(crate) use self::gcd::{find_gcd, GCDFastFieldCodec, GCD_CODEC_ID, GCD_DEFAULT};
|
||||
pub use self::multivalued::{MultiValuedFastFieldReader, MultiValuedFastFieldWriter};
|
||||
pub use self::reader::{DynamicFastFieldReader, FastFieldReader};
|
||||
pub use self::multivalued::{
|
||||
MultiValuedFastFieldReader, MultiValuedFastFieldWriter, MultiValuedU128FastFieldReader,
|
||||
};
|
||||
pub use self::reader::{DynamicFastFieldReader, FastFieldReader, FastFieldReaderCodecWrapperU128};
|
||||
pub use self::readers::FastFieldReaders;
|
||||
pub(crate) use self::readers::{type_and_cardinality, FastType};
|
||||
pub use self::serializer::{CompositeFastFieldSerializer, FastFieldDataAccess, FastFieldStats};
|
||||
pub use self::writer::{FastFieldsWriter, IntFastFieldWriter};
|
||||
use crate::schema::{Cardinality, FieldType, Type, Value};
|
||||
use crate::{DateTime, DocId};
|
||||
use crate::schema::Value;
|
||||
use crate::DocId;
|
||||
|
||||
mod alive_bitset;
|
||||
mod bytes;
|
||||
mod error;
|
||||
mod facet_reader;
|
||||
mod fast_value;
|
||||
mod gcd;
|
||||
mod multivalued;
|
||||
mod reader;
|
||||
@@ -57,182 +63,6 @@ pub(crate) const ALL_CODECS: &[FastFieldCodecName; 3] = &[
|
||||
FastFieldCodecName::BlockwiseLinearInterpol,
|
||||
];
|
||||
|
||||
/// Trait for `BytesFastFieldReader` and `MultiValuedFastFieldReader` to return the length of data
|
||||
/// for a doc_id
|
||||
pub trait MultiValueLength {
|
||||
/// returns the num of values associated to a doc_id
|
||||
fn get_len(&self, doc_id: DocId) -> u64;
|
||||
/// returns the sum of num values for all doc_ids
|
||||
fn get_total_len(&self) -> u64;
|
||||
}
|
||||
|
||||
/// Trait for types that are allowed for fast fields:
|
||||
/// (u64, i64 and f64, bool, DateTime).
|
||||
pub trait FastValue: Clone + Copy + Send + Sync + PartialOrd + 'static {
|
||||
/// Converts a value from u64
|
||||
///
|
||||
/// Internally all fast field values are encoded as u64.
|
||||
/// **Note: To be used for converting encoded Term, Posting values.**
|
||||
fn from_u64(val: u64) -> Self;
|
||||
|
||||
/// Converts a value to u64.
|
||||
///
|
||||
/// Internally all fast field values are encoded as u64.
|
||||
fn to_u64(&self) -> u64;
|
||||
|
||||
/// Returns the fast field cardinality that can be extracted from the given
|
||||
/// `FieldType`.
|
||||
///
|
||||
/// If the type is not a fast field, `None` is returned.
|
||||
fn fast_field_cardinality(field_type: &FieldType) -> Option<Cardinality>;
|
||||
|
||||
/// Cast value to `u64`.
|
||||
/// The value is just reinterpreted in memory.
|
||||
fn as_u64(&self) -> u64;
|
||||
|
||||
/// Build a default value. This default value is never used, so the value does not
|
||||
/// really matter.
|
||||
fn make_zero() -> Self {
|
||||
Self::from_u64(0i64.to_u64())
|
||||
}
|
||||
|
||||
/// Returns the `schema::Type` for this FastValue.
|
||||
fn to_type() -> Type;
|
||||
}
|
||||
|
||||
impl FastValue for u64 {
|
||||
fn from_u64(val: u64) -> Self {
|
||||
val
|
||||
}
|
||||
|
||||
fn to_u64(&self) -> u64 {
|
||||
*self
|
||||
}
|
||||
|
||||
fn fast_field_cardinality(field_type: &FieldType) -> Option<Cardinality> {
|
||||
match *field_type {
|
||||
FieldType::U64(ref integer_options) => integer_options.get_fastfield_cardinality(),
|
||||
FieldType::Facet(_) => Some(Cardinality::MultiValues),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn as_u64(&self) -> u64 {
|
||||
*self
|
||||
}
|
||||
|
||||
fn to_type() -> Type {
|
||||
Type::U64
|
||||
}
|
||||
}
|
||||
|
||||
impl FastValue for i64 {
|
||||
fn from_u64(val: u64) -> Self {
|
||||
common::u64_to_i64(val)
|
||||
}
|
||||
|
||||
fn to_u64(&self) -> u64 {
|
||||
common::i64_to_u64(*self)
|
||||
}
|
||||
|
||||
fn fast_field_cardinality(field_type: &FieldType) -> Option<Cardinality> {
|
||||
match *field_type {
|
||||
FieldType::I64(ref integer_options) => integer_options.get_fastfield_cardinality(),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn as_u64(&self) -> u64 {
|
||||
*self as u64
|
||||
}
|
||||
|
||||
fn to_type() -> Type {
|
||||
Type::I64
|
||||
}
|
||||
}
|
||||
|
||||
impl FastValue for f64 {
|
||||
fn from_u64(val: u64) -> Self {
|
||||
common::u64_to_f64(val)
|
||||
}
|
||||
|
||||
fn to_u64(&self) -> u64 {
|
||||
common::f64_to_u64(*self)
|
||||
}
|
||||
|
||||
fn fast_field_cardinality(field_type: &FieldType) -> Option<Cardinality> {
|
||||
match *field_type {
|
||||
FieldType::F64(ref integer_options) => integer_options.get_fastfield_cardinality(),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn as_u64(&self) -> u64 {
|
||||
self.to_bits()
|
||||
}
|
||||
|
||||
fn to_type() -> Type {
|
||||
Type::F64
|
||||
}
|
||||
}
|
||||
|
||||
impl FastValue for bool {
|
||||
fn from_u64(val: u64) -> Self {
|
||||
val != 0u64
|
||||
}
|
||||
|
||||
fn to_u64(&self) -> u64 {
|
||||
match self {
|
||||
false => 0,
|
||||
true => 1,
|
||||
}
|
||||
}
|
||||
|
||||
fn fast_field_cardinality(field_type: &FieldType) -> Option<Cardinality> {
|
||||
match *field_type {
|
||||
FieldType::Bool(ref integer_options) => integer_options.get_fastfield_cardinality(),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn as_u64(&self) -> u64 {
|
||||
*self as u64
|
||||
}
|
||||
|
||||
fn to_type() -> Type {
|
||||
Type::Bool
|
||||
}
|
||||
}
|
||||
|
||||
impl FastValue for DateTime {
|
||||
/// Converts a timestamp microseconds into DateTime.
|
||||
///
|
||||
/// **Note the timestamps is expected to be in microseconds.**
|
||||
fn from_u64(timestamp_micros_u64: u64) -> Self {
|
||||
let timestamp_micros = i64::from_u64(timestamp_micros_u64);
|
||||
Self::from_timestamp_micros(timestamp_micros)
|
||||
}
|
||||
|
||||
fn to_u64(&self) -> u64 {
|
||||
common::i64_to_u64(self.into_timestamp_micros())
|
||||
}
|
||||
|
||||
fn fast_field_cardinality(field_type: &FieldType) -> Option<Cardinality> {
|
||||
match *field_type {
|
||||
FieldType::Date(ref options) => options.get_fastfield_cardinality(),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn as_u64(&self) -> u64 {
|
||||
self.into_timestamp_micros().as_u64()
|
||||
}
|
||||
|
||||
fn to_type() -> Type {
|
||||
Type::Date
|
||||
}
|
||||
}
|
||||
|
||||
fn value_to_u64(value: &Value) -> u64 {
|
||||
match value {
|
||||
Value::U64(val) => val.to_u64(),
|
||||
@@ -244,6 +74,17 @@ fn value_to_u64(value: &Value) -> u64 {
|
||||
}
|
||||
}
|
||||
|
||||
/// Trait for `BytesFastFieldReader` and `MultiValuedFastFieldReader` to return the length of data
|
||||
/// for a doc_id
|
||||
pub trait MultiValueLength {
|
||||
/// returns the positions of values associated to a doc_id
|
||||
fn get_range(&self, doc_id: DocId) -> std::ops::Range<u64>;
|
||||
/// returns the num of values associated to a doc_id
|
||||
fn get_len(&self, doc_id: DocId) -> u64;
|
||||
/// returns the sum of num values for all doc_ids
|
||||
fn get_total_len(&self) -> u64;
|
||||
}
|
||||
|
||||
/// The fast field type
|
||||
pub enum FastFieldType {
|
||||
/// Numeric type, e.g. f64.
|
||||
@@ -268,6 +109,7 @@ impl FastFieldType {
|
||||
mod tests {
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::net::IpAddr;
|
||||
use std::ops::Range;
|
||||
use std::path::Path;
|
||||
|
||||
@@ -280,9 +122,11 @@ mod tests {
|
||||
use super::*;
|
||||
use crate::directory::{CompositeFile, Directory, RamDirectory, WritePtr};
|
||||
use crate::merge_policy::NoMergePolicy;
|
||||
use crate::schema::{Document, Field, Schema, FAST, STRING, TEXT};
|
||||
use crate::schema::{
|
||||
self, Cardinality, Document, Field, IpOptions, Schema, FAST, INDEXED, STORED, STRING, TEXT,
|
||||
};
|
||||
use crate::time::OffsetDateTime;
|
||||
use crate::{DateOptions, DatePrecision, Index, SegmentId, SegmentReader};
|
||||
use crate::{DateOptions, DatePrecision, DateTime, Index, SegmentId, SegmentReader};
|
||||
|
||||
pub static SCHEMA: Lazy<Schema> = Lazy::new(|| {
|
||||
let mut schema_builder = Schema::builder();
|
||||
@@ -308,7 +152,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_fastfield_i64_u64() {
|
||||
pub fn test_datetime_conversion() {
|
||||
let datetime = DateTime::from_utc(OffsetDateTime::UNIX_EPOCH);
|
||||
assert_eq!(i64::from_u64(datetime.to_u64()), 0i64);
|
||||
}
|
||||
@@ -617,6 +461,85 @@ mod tests {
|
||||
all
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_ip_fastfield_minimal() -> crate::Result<()> {
|
||||
let mut schema_builder = schema::Schema::builder();
|
||||
let ip_field = schema_builder.add_ip_field("ip", FAST | INDEXED | STORED);
|
||||
|
||||
let ips_field = schema_builder.add_ip_field(
|
||||
"ips",
|
||||
IpOptions::default().set_fast(Cardinality::MultiValues),
|
||||
);
|
||||
|
||||
let schema = schema_builder.build();
|
||||
|
||||
let index = Index::create_in_ram(schema);
|
||||
|
||||
let ip1 = IpAddr::from((1_u128).to_be_bytes());
|
||||
let ip2 = IpAddr::from((2_u128).to_be_bytes());
|
||||
let ip3 = IpAddr::from((3_u128).to_be_bytes());
|
||||
|
||||
let mut index_writer = index.writer_for_tests()?;
|
||||
index_writer.set_merge_policy(Box::new(NoMergePolicy));
|
||||
index_writer.add_document(doc!())?;
|
||||
index_writer.add_document(doc!(
|
||||
ip_field => ip2,
|
||||
ips_field => ip2,
|
||||
ips_field => ip2,
|
||||
))?;
|
||||
index_writer.commit()?;
|
||||
|
||||
let reader = index.reader()?;
|
||||
let searcher = reader.searcher();
|
||||
assert_eq!(searcher.segment_readers().len(), 1);
|
||||
let segment_reader = searcher.segment_reader(0);
|
||||
let fast_fields = segment_reader.fast_fields();
|
||||
|
||||
// single value
|
||||
let ip_addr_fast_field = fast_fields.ip_addr(ip_field).unwrap();
|
||||
assert_eq!(ip_addr_fast_field.get_val(0), None);
|
||||
assert_eq!(ip_addr_fast_field.get_val(1), Some(ip2));
|
||||
assert_eq!(ip_addr_fast_field.get_between_vals(ip2..=ip2), vec![1]);
|
||||
assert_eq!(ip_addr_fast_field.get_between_vals(ip1..=ip2), vec![1]);
|
||||
assert_eq!(ip_addr_fast_field.get_between_vals(ip2..=ip3), vec![1]);
|
||||
assert_eq!(ip_addr_fast_field.get_between_vals(ip1..=ip3), vec![1]);
|
||||
assert_eq!(
|
||||
ip_addr_fast_field.get_between_vals(ip1..=ip1),
|
||||
vec![] as Vec<usize>
|
||||
);
|
||||
assert_eq!(
|
||||
ip_addr_fast_field.get_between_vals(ip3..=ip3),
|
||||
vec![] as Vec<usize>
|
||||
);
|
||||
|
||||
// multi value
|
||||
let ip_addr_fast_field = fast_fields.ip_addrs(ips_field).unwrap();
|
||||
assert_eq!(ip_addr_fast_field.get_first_val(0), None);
|
||||
assert_eq!(ip_addr_fast_field.get_first_val(1), Some(ip2));
|
||||
|
||||
let mut out = vec![];
|
||||
ip_addr_fast_field.get_vals(0, &mut out);
|
||||
assert_eq!(out, vec![] as Vec<IpAddr>);
|
||||
let mut out = vec![];
|
||||
ip_addr_fast_field.get_vals(1, &mut out);
|
||||
assert_eq!(out, vec![ip2, ip2]);
|
||||
|
||||
assert_eq!(ip_addr_fast_field.get_between_vals(ip2..=ip2), vec![1]);
|
||||
assert_eq!(ip_addr_fast_field.get_between_vals(ip1..=ip2), vec![1]);
|
||||
assert_eq!(ip_addr_fast_field.get_between_vals(ip2..=ip3), vec![1]);
|
||||
assert_eq!(ip_addr_fast_field.get_between_vals(ip1..=ip3), vec![1]);
|
||||
assert_eq!(
|
||||
ip_addr_fast_field.get_between_vals(ip1..=ip1),
|
||||
vec![] as Vec<usize>
|
||||
);
|
||||
assert_eq!(
|
||||
ip_addr_fast_field.get_between_vals(ip3..=ip3),
|
||||
vec![] as Vec<usize>
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_text_fastfield() -> crate::Result<()> {
|
||||
let mut schema_builder = Schema::builder();
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
mod reader;
|
||||
mod writer;
|
||||
|
||||
pub use self::reader::MultiValuedFastFieldReader;
|
||||
pub use self::writer::MultiValuedFastFieldWriter;
|
||||
pub use self::reader::{MultiValuedFastFieldReader, MultiValuedU128FastFieldReader};
|
||||
pub use self::writer::{MultiValuedFastFieldWriter, U128MultiValueFastFieldWriter};
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
@@ -1,6 +1,11 @@
|
||||
use std::ops::Range;
|
||||
use std::ops::{Range, RangeInclusive};
|
||||
|
||||
use crate::fastfield::{DynamicFastFieldReader, FastFieldReader, FastValue, MultiValueLength};
|
||||
use fastfield_codecs::ip_codec::IntervallDecompressor;
|
||||
|
||||
use crate::fastfield::{
|
||||
DynamicFastFieldReader, FastFieldReader, FastFieldReaderCodecWrapperU128, FastValue,
|
||||
FastValueU128, MultiValueLength,
|
||||
};
|
||||
use crate::DocId;
|
||||
|
||||
/// Reader for a multivalued `u64` fast field.
|
||||
@@ -84,6 +89,155 @@ impl<Item: FastValue> MultiValuedFastFieldReader<Item> {
|
||||
}
|
||||
|
||||
impl<Item: FastValue> MultiValueLength for MultiValuedFastFieldReader<Item> {
|
||||
fn get_range(&self, doc_id: DocId) -> std::ops::Range<u64> {
|
||||
self.range(doc_id)
|
||||
}
|
||||
fn get_len(&self, doc_id: DocId) -> u64 {
|
||||
self.num_vals(doc_id) as u64
|
||||
}
|
||||
fn get_total_len(&self) -> u64 {
|
||||
self.total_num_vals() as u64
|
||||
}
|
||||
}
|
||||
|
||||
/// Reader for a multivalued `u128` fast field.
|
||||
///
|
||||
/// The reader is implemented as a `u64` fast field for the index and a `u128` fast field.
|
||||
///
|
||||
/// The `vals_reader` will access the concatenated list of all
|
||||
/// values for all reader.
|
||||
/// The `idx_reader` associated, for each document, the index of its first value.
|
||||
#[derive(Clone)]
|
||||
pub struct MultiValuedU128FastFieldReader<Item: FastValueU128> {
|
||||
idx_reader: DynamicFastFieldReader<u64>,
|
||||
vals_reader: FastFieldReaderCodecWrapperU128<Item, IntervallDecompressor>,
|
||||
}
|
||||
|
||||
impl<Item: FastValueU128> MultiValuedU128FastFieldReader<Item> {
|
||||
pub(crate) fn open(
|
||||
idx_reader: DynamicFastFieldReader<u64>,
|
||||
vals_reader: FastFieldReaderCodecWrapperU128<Item, IntervallDecompressor>,
|
||||
) -> MultiValuedU128FastFieldReader<Item> {
|
||||
Self {
|
||||
idx_reader,
|
||||
vals_reader,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns `[start, end)`, such that the values associated
|
||||
/// to the given document are `start..end`.
|
||||
#[inline]
|
||||
fn range(&self, doc: DocId) -> Range<u64> {
|
||||
let start = self.idx_reader.get(doc);
|
||||
let end = self.idx_reader.get(doc + 1);
|
||||
start..end
|
||||
}
|
||||
|
||||
/// Returns the array of values associated to the given `doc`.
|
||||
#[inline]
|
||||
pub fn get_first_val(&self, doc: DocId) -> Option<Item> {
|
||||
let range = self.range(doc);
|
||||
if range.is_empty() {
|
||||
return None;
|
||||
}
|
||||
self.vals_reader.get_val(range.start)
|
||||
}
|
||||
|
||||
/// Returns the array of values associated to the given `doc`.
|
||||
#[inline]
|
||||
fn get_vals_for_range(&self, range: Range<u64>, vals: &mut Vec<Item>) {
|
||||
let len = (range.end - range.start) as usize;
|
||||
vals.resize(len, Item::make_zero());
|
||||
self.vals_reader.get_range(range.start, &mut vals[..]);
|
||||
}
|
||||
|
||||
/// Returns the array of values associated to the given `doc`.
|
||||
#[inline]
|
||||
pub fn get_vals(&self, doc: DocId, vals: &mut Vec<Item>) {
|
||||
let range = self.range(doc);
|
||||
self.get_vals_for_range(range, vals);
|
||||
}
|
||||
|
||||
/// Returns all docids which are in the provided value range
|
||||
pub fn get_between_vals(&self, range: RangeInclusive<Item>) -> Vec<DocId> {
|
||||
let positions = self.vals_reader.get_between_vals(range);
|
||||
|
||||
positions_to_docids(&positions, self)
|
||||
}
|
||||
|
||||
/// Iterates over all elements in the fast field
|
||||
pub fn iter(&self) -> impl Iterator<Item = Option<Item>> + '_ {
|
||||
self.vals_reader.iter()
|
||||
}
|
||||
|
||||
/// Returns the minimum value for this fast field.
|
||||
///
|
||||
/// The min value does not take in account of possible
|
||||
/// deleted document, and should be considered as a lower bound
|
||||
/// of the actual mimimum value.
|
||||
pub fn min_value(&self) -> Item {
|
||||
self.vals_reader.min_value()
|
||||
}
|
||||
|
||||
/// Returns the maximum value for this fast field.
|
||||
///
|
||||
/// The max value does not take in account of possible
|
||||
/// deleted document, and should be considered as an upper bound
|
||||
/// of the actual maximum value.
|
||||
pub fn max_value(&self) -> Item {
|
||||
self.vals_reader.max_value()
|
||||
}
|
||||
|
||||
/// Returns the number of values associated with the document `DocId`.
|
||||
#[inline]
|
||||
pub fn num_vals(&self, doc: DocId) -> usize {
|
||||
let range = self.range(doc);
|
||||
(range.end - range.start) as usize
|
||||
}
|
||||
|
||||
/// Returns the overall number of values in this field .
|
||||
#[inline]
|
||||
pub fn total_num_vals(&self) -> u64 {
|
||||
self.idx_reader.max_value()
|
||||
}
|
||||
}
|
||||
|
||||
/// Converts a list of positions of values in a 1:n index to the corresponding list of DocIds.
|
||||
///
|
||||
/// Since there is no index for value pos -> docid, but docid -> value pos range, we scan the index.
|
||||
///
|
||||
/// Correctness: positions needs to be sorted.
|
||||
///
|
||||
/// TODO: Instead of a linear scan we can employ a binary search to match a docid to its value
|
||||
/// position.
|
||||
fn positions_to_docids<T: MultiValueLength>(positions: &[usize], multival_idx: &T) -> Vec<DocId> {
|
||||
let mut docs = vec![];
|
||||
let mut cur_doc = 0u32;
|
||||
let mut last_doc = None;
|
||||
|
||||
for pos in positions {
|
||||
loop {
|
||||
let range = multival_idx.get_range(cur_doc);
|
||||
if range.contains(&(*pos as u64)) {
|
||||
// avoid duplicates
|
||||
if Some(cur_doc) == last_doc {
|
||||
break;
|
||||
}
|
||||
docs.push(cur_doc);
|
||||
last_doc = Some(cur_doc);
|
||||
break;
|
||||
}
|
||||
cur_doc += 1;
|
||||
}
|
||||
}
|
||||
|
||||
docs
|
||||
}
|
||||
|
||||
impl<Item: FastValueU128> MultiValueLength for MultiValuedU128FastFieldReader<Item> {
|
||||
fn get_range(&self, doc_id: DocId) -> std::ops::Range<u64> {
|
||||
self.range(doc_id)
|
||||
}
|
||||
fn get_len(&self, doc_id: DocId) -> u64 {
|
||||
self.num_vals(doc_id) as u64
|
||||
}
|
||||
@@ -92,6 +246,7 @@ impl<Item: FastValue> MultiValueLength for MultiValuedFastFieldReader<Item> {
|
||||
self.total_num_vals() as u64
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use std::io;
|
||||
|
||||
use fastfield_codecs::ip_codec::{ip_to_u128, IntervalCompressor};
|
||||
use fnv::FnvHashMap;
|
||||
use tantivy_bitpacker::minmax;
|
||||
|
||||
@@ -120,25 +121,9 @@ impl MultiValuedFastFieldWriter {
|
||||
&'a self,
|
||||
doc_id_map: Option<&'b DocIdMapping>,
|
||||
) -> impl Iterator<Item = &'b [u64]> {
|
||||
let doc_id_iter: Box<dyn Iterator<Item = u32>> = if let Some(doc_id_map) = doc_id_map {
|
||||
Box::new(doc_id_map.iter_old_doc_ids())
|
||||
} else {
|
||||
let max_doc = self.doc_index.len() as DocId;
|
||||
Box::new(0..max_doc)
|
||||
};
|
||||
doc_id_iter.map(move |doc_id| self.get_values_for_doc_id(doc_id))
|
||||
get_ordered_values(&self.vals, &self.doc_index, doc_id_map)
|
||||
}
|
||||
|
||||
/// returns all values for a doc_ids
|
||||
fn get_values_for_doc_id(&self, doc_id: u32) -> &[u64] {
|
||||
let start_pos = self.doc_index[doc_id as usize] as usize;
|
||||
let end_pos = self
|
||||
.doc_index
|
||||
.get(doc_id as usize + 1)
|
||||
.cloned()
|
||||
.unwrap_or(self.vals.len() as u64) as usize; // special case, last doc_id has no offset information
|
||||
&self.vals[start_pos..end_pos]
|
||||
}
|
||||
/// Serializes fast field values by pushing them to the `FastFieldSerializer`.
|
||||
///
|
||||
/// If a mapping is given, the values are remapped *and sorted* before serialization.
|
||||
@@ -220,3 +205,132 @@ impl MultiValuedFastFieldWriter {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Writer for multi-valued (as in, more than one value per document)
|
||||
/// int fast field.
|
||||
///
|
||||
/// This `Writer` is only useful for advanced users.
|
||||
/// The normal way to get your multivalued int in your index
|
||||
/// is to
|
||||
/// - declare your field with fast set to `Cardinality::MultiValues`
|
||||
/// in your schema
|
||||
/// - add your document simply by calling `.add_document(...)`.
|
||||
///
|
||||
/// The `MultiValuedFastFieldWriter` can be acquired from the
|
||||
|
||||
pub struct U128MultiValueFastFieldWriter {
|
||||
field: Field,
|
||||
vals: Vec<u128>,
|
||||
doc_index: Vec<u64>,
|
||||
}
|
||||
|
||||
impl U128MultiValueFastFieldWriter {
|
||||
/// Creates a new `U128MultiValueFastFieldWriter`
|
||||
pub(crate) fn new(field: Field) -> Self {
|
||||
U128MultiValueFastFieldWriter {
|
||||
field,
|
||||
vals: Vec::new(),
|
||||
doc_index: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// The memory used (inclusive childs)
|
||||
pub fn mem_usage(&self) -> usize {
|
||||
self.vals.capacity() * std::mem::size_of::<UnorderedTermId>()
|
||||
+ self.doc_index.capacity() * std::mem::size_of::<u64>()
|
||||
}
|
||||
|
||||
/// Finalize the current document.
|
||||
pub(crate) fn next_doc(&mut self) {
|
||||
self.doc_index.push(self.vals.len() as u64);
|
||||
}
|
||||
|
||||
/// Pushes a new value to the current document.
|
||||
pub(crate) fn add_val(&mut self, val: u128) {
|
||||
self.vals.push(val);
|
||||
}
|
||||
|
||||
/// Shift to the next document and adds
|
||||
/// all of the matching field values present in the document.
|
||||
pub fn add_document(&mut self, doc: &Document) {
|
||||
self.next_doc();
|
||||
for field_value in doc.field_values() {
|
||||
if field_value.field == self.field {
|
||||
let value = field_value.value();
|
||||
let ip_addr = value.as_ip().unwrap();
|
||||
let value = ip_to_u128(ip_addr);
|
||||
self.add_val(value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns an iterator over values per doc_id in ascending doc_id order.
|
||||
///
|
||||
/// Normally the order is simply iterating self.doc_id_index.
|
||||
/// With doc_id_map it accounts for the new mapping, returning values in the order of the
|
||||
/// new doc_ids.
|
||||
fn get_ordered_values<'a: 'b, 'b>(
|
||||
&'a self,
|
||||
doc_id_map: Option<&'b DocIdMapping>,
|
||||
) -> impl Iterator<Item = &'b [u128]> {
|
||||
get_ordered_values(&self.vals, &self.doc_index, doc_id_map)
|
||||
}
|
||||
|
||||
/// Serializes fast field values.
|
||||
pub fn serialize(
|
||||
&self,
|
||||
serializer: &mut CompositeFastFieldSerializer,
|
||||
doc_id_map: Option<&DocIdMapping>,
|
||||
) -> io::Result<()> {
|
||||
{
|
||||
// writing the offset index
|
||||
let mut doc_index_serializer =
|
||||
serializer.new_u64_fast_field_with_idx(self.field, 0, self.vals.len() as u64, 0)?;
|
||||
|
||||
let mut offset = 0;
|
||||
for vals in self.get_ordered_values(doc_id_map) {
|
||||
doc_index_serializer.add_val(offset)?;
|
||||
offset += vals.len() as u64;
|
||||
}
|
||||
doc_index_serializer.add_val(self.vals.len() as u64)?;
|
||||
|
||||
doc_index_serializer.close_field()?;
|
||||
}
|
||||
{
|
||||
let field_write = serializer.get_field_writer(self.field, 1);
|
||||
let compressor = IntervalCompressor::from_vals(self.vals.to_vec());
|
||||
let iter = self.get_ordered_values(doc_id_map).flatten().cloned();
|
||||
compressor.compress_into(iter, field_write)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns an iterator over values per doc_id in ascending doc_id order.
|
||||
///
|
||||
/// Normally the order is simply iterating self.doc_id_index.
|
||||
/// With doc_id_map it accounts for the new mapping, returning values in the order of the
|
||||
/// new doc_ids.
|
||||
fn get_ordered_values<'a: 'b, 'b, T>(
|
||||
vals: &'a [T],
|
||||
doc_index: &'a [u64],
|
||||
doc_id_map: Option<&'b DocIdMapping>,
|
||||
) -> impl Iterator<Item = &'b [T]> {
|
||||
let doc_id_iter: Box<dyn Iterator<Item = u32>> = if let Some(doc_id_map) = doc_id_map {
|
||||
Box::new(doc_id_map.iter_old_doc_ids())
|
||||
} else {
|
||||
let max_doc = doc_index.len() as DocId;
|
||||
Box::new(0..max_doc)
|
||||
};
|
||||
doc_id_iter.map(move |doc_id| get_values_for_doc_id(doc_id, vals, doc_index))
|
||||
}
|
||||
|
||||
/// returns all values for a doc_id
|
||||
fn get_values_for_doc_id<'a, T>(doc_id: u32, vals: &'a [T], doc_index: &'a [u64]) -> &'a [T] {
|
||||
let start_pos = doc_index[doc_id as usize] as usize;
|
||||
let end_pos = doc_index
|
||||
.get(doc_id as usize + 1)
|
||||
.cloned()
|
||||
.unwrap_or(vals.len() as u64) as usize; // special case, last doc_id has no offset information
|
||||
&vals[start_pos..end_pos]
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use std::collections::HashMap;
|
||||
use std::marker::PhantomData;
|
||||
use std::ops::RangeInclusive;
|
||||
use std::path::Path;
|
||||
|
||||
use fastfield_codecs::bitpacked::{
|
||||
@@ -11,9 +12,9 @@ use fastfield_codecs::linearinterpol::{
|
||||
use fastfield_codecs::multilinearinterpol::{
|
||||
MultiLinearInterpolFastFieldReader, MultiLinearInterpolFastFieldSerializer,
|
||||
};
|
||||
use fastfield_codecs::{FastFieldCodecReader, FastFieldCodecSerializer};
|
||||
use fastfield_codecs::{FastFieldCodecReader, FastFieldCodecReaderU128, FastFieldCodecSerializer};
|
||||
|
||||
use super::{FastValue, GCDFastFieldCodec, GCD_CODEC_ID};
|
||||
use super::{FastValue, FastValueU128, GCDFastFieldCodec, GCD_CODEC_ID};
|
||||
use crate::directory::{CompositeFile, Directory, FileSlice, OwnedBytes, RamDirectory, WritePtr};
|
||||
use crate::fastfield::{CompositeFastFieldSerializer, FastFieldsWriter};
|
||||
use crate::schema::{Schema, FAST};
|
||||
@@ -210,6 +211,78 @@ impl<Item: FastValue> FastFieldReader<Item> for DynamicFastFieldReader<Item> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Wrapper for accessing a fastfield.
|
||||
///
|
||||
/// Holds the data and the codec to the read the data.
|
||||
#[derive(Clone)]
|
||||
pub struct FastFieldReaderCodecWrapperU128<Item: FastValueU128, CodecReader> {
|
||||
reader: CodecReader,
|
||||
bytes: OwnedBytes,
|
||||
_phantom: PhantomData<Item>,
|
||||
}
|
||||
|
||||
impl<Item: FastValueU128, C: FastFieldCodecReaderU128> FastFieldReaderCodecWrapperU128<Item, C> {
|
||||
/// Opens a fast field given the bytes.
|
||||
pub fn open_from_bytes(bytes: OwnedBytes) -> crate::Result<Self> {
|
||||
let reader = C::open_from_bytes(bytes.as_slice())?;
|
||||
Ok(Self {
|
||||
reader,
|
||||
bytes,
|
||||
_phantom: PhantomData,
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns the item for the docid, if present
|
||||
pub fn get_val(&self, doc: u64) -> Option<Item> {
|
||||
self.reader
|
||||
.get(doc, self.bytes.as_slice())
|
||||
.map(|el| Item::from_u128(el))
|
||||
}
|
||||
|
||||
/// Internally `multivalued` also use SingleValue Fast fields.
|
||||
/// It works as follows... A first column contains the list of start index
|
||||
/// for each document, a second column contains the actual values.
|
||||
///
|
||||
/// The values associated to a given doc, are then
|
||||
/// `second_column[first_column.get(doc)..first_column.get(doc+1)]`.
|
||||
///
|
||||
/// Which means single value fast field reader can be indexed internally with
|
||||
/// something different from a `DocId`. For this use case, we want to use `u64`
|
||||
/// values.
|
||||
///
|
||||
/// See `get_range` for an actual documentation about this method.
|
||||
pub(crate) fn get_range(&self, start: u64, output: &mut [Item]) {
|
||||
for (i, out) in output.iter_mut().enumerate() {
|
||||
if let Some(val) = self.get_val(start + (i as u64)) {
|
||||
*out = val
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Iterates over all elements in the fast field
|
||||
pub fn iter(&self) -> impl Iterator<Item = Option<Item>> + '_ {
|
||||
self.reader
|
||||
.iter(self.bytes.as_slice())
|
||||
.map(|el| el.map(Item::from_u128))
|
||||
}
|
||||
|
||||
/// Returns all docids which are in the provided value range
|
||||
pub fn get_between_vals(&self, range: RangeInclusive<Item>) -> Vec<usize> {
|
||||
let range = range.start().to_u128()..=range.end().to_u128();
|
||||
self.reader.get_between_vals(range, self.bytes.as_slice())
|
||||
}
|
||||
|
||||
/// Return min_value.
|
||||
pub fn min_value(&self) -> Item {
|
||||
Item::from_u128(self.reader.min_value())
|
||||
}
|
||||
|
||||
/// Return max_value.
|
||||
pub fn max_value(&self) -> Item {
|
||||
Item::from_u128(self.reader.max_value())
|
||||
}
|
||||
}
|
||||
|
||||
/// Wrapper for accessing a fastfield.
|
||||
///
|
||||
/// Holds the data and the codec to the read the data.
|
||||
|
||||
@@ -1,4 +1,9 @@
|
||||
use super::reader::DynamicFastFieldReader;
|
||||
use std::net::IpAddr;
|
||||
|
||||
use fastfield_codecs::ip_codec::IntervallDecompressor;
|
||||
|
||||
use super::multivalued::MultiValuedU128FastFieldReader;
|
||||
use super::reader::{DynamicFastFieldReader, FastFieldReaderCodecWrapperU128};
|
||||
use crate::directory::{CompositeFile, FileSlice};
|
||||
use crate::fastfield::{
|
||||
BytesFastFieldReader, FastFieldNotAvailableError, FastValue, MultiValuedFastFieldReader,
|
||||
@@ -20,6 +25,7 @@ pub struct FastFieldReaders {
|
||||
pub(crate) enum FastType {
|
||||
I64,
|
||||
U64,
|
||||
U128,
|
||||
F64,
|
||||
Bool,
|
||||
Date,
|
||||
@@ -46,6 +52,9 @@ pub(crate) fn type_and_cardinality(field_type: &FieldType) -> Option<(FastType,
|
||||
FieldType::Str(options) if options.is_fast() => {
|
||||
Some((FastType::U64, Cardinality::MultiValues))
|
||||
}
|
||||
FieldType::Ip(options) => options
|
||||
.get_fastfield_cardinality()
|
||||
.map(|cardinality| (FastType::U128, cardinality)),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
@@ -137,6 +146,69 @@ impl FastFieldReaders {
|
||||
self.typed_fast_field_reader(field)
|
||||
}
|
||||
|
||||
/// Returns the `ip` fast field reader reader associated to `field`.
|
||||
///
|
||||
/// If `field` is not a u128 fast field, this method returns an Error.
|
||||
pub fn ip_addr(
|
||||
&self,
|
||||
field: Field,
|
||||
) -> crate::Result<FastFieldReaderCodecWrapperU128<IpAddr, IntervallDecompressor>> {
|
||||
self.check_type(field, FastType::U128, Cardinality::SingleValue)?;
|
||||
let fast_field_slice = self.fast_field_data(field, 0)?;
|
||||
let bytes = fast_field_slice.read_bytes()?;
|
||||
FastFieldReaderCodecWrapperU128::<IpAddr, IntervallDecompressor>::open_from_bytes(bytes)
|
||||
}
|
||||
|
||||
/// Returns the `ip` fast field reader reader associated to `field`.
|
||||
///
|
||||
/// If `field` is not a u128 fast field, this method returns an Error.
|
||||
pub fn ip_addrs(&self, field: Field) -> crate::Result<MultiValuedU128FastFieldReader<IpAddr>> {
|
||||
self.check_type(field, FastType::U128, Cardinality::MultiValues)?;
|
||||
let idx_reader: DynamicFastFieldReader<u64> = self.typed_fast_field_reader(field)?;
|
||||
|
||||
let fast_field_slice = self.fast_field_data(field, 1)?;
|
||||
let bytes = fast_field_slice.read_bytes()?;
|
||||
|
||||
let vals_reader =
|
||||
FastFieldReaderCodecWrapperU128::<IpAddr, IntervallDecompressor>::open_from_bytes(
|
||||
bytes,
|
||||
)?;
|
||||
Ok(MultiValuedU128FastFieldReader::open(
|
||||
idx_reader,
|
||||
vals_reader,
|
||||
))
|
||||
}
|
||||
|
||||
/// Returns the `u128` fast field reader reader associated to `field`.
|
||||
///
|
||||
/// If `field` is not a u128 fast field, this method returns an Error.
|
||||
pub fn u128(
|
||||
&self,
|
||||
field: Field,
|
||||
) -> crate::Result<FastFieldReaderCodecWrapperU128<u128, IntervallDecompressor>> {
|
||||
let fast_field_slice = self.fast_field_data(field, 0)?;
|
||||
let bytes = fast_field_slice.read_bytes()?;
|
||||
FastFieldReaderCodecWrapperU128::<u128, IntervallDecompressor>::open_from_bytes(bytes)
|
||||
}
|
||||
|
||||
/// Returns the `u128` multi-valued fast field reader reader associated to `field`.
|
||||
///
|
||||
/// If `field` is not a u128 multi-valued fast field, this method returns an Error.
|
||||
pub fn u128s(&self, field: Field) -> crate::Result<MultiValuedU128FastFieldReader<u128>> {
|
||||
self.check_type(field, FastType::U128, Cardinality::MultiValues)?;
|
||||
let idx_reader: DynamicFastFieldReader<u64> = self.typed_fast_field_reader(field)?;
|
||||
|
||||
let fast_field_slice = self.fast_field_data(field, 1)?;
|
||||
let bytes = fast_field_slice.read_bytes()?;
|
||||
|
||||
let vals_reader =
|
||||
FastFieldReaderCodecWrapperU128::<u128, IntervallDecompressor>::open_from_bytes(bytes)?;
|
||||
Ok(MultiValuedU128FastFieldReader::open(
|
||||
idx_reader,
|
||||
vals_reader,
|
||||
))
|
||||
}
|
||||
|
||||
/// Returns the `u64` fast field reader reader associated to `field`, regardless of whether the
|
||||
/// given field is effectively of type `u64` or not.
|
||||
///
|
||||
|
||||
@@ -327,6 +327,11 @@ impl CompositeFastFieldSerializer {
|
||||
FastBytesFieldSerializer { write: field_write }
|
||||
}
|
||||
|
||||
/// Gets the underlying writer
|
||||
pub fn get_field_writer(&mut self, field: Field, idx: usize) -> &mut impl Write {
|
||||
self.composite_write.for_field_with_idx(field, idx)
|
||||
}
|
||||
|
||||
/// Closes the serializer
|
||||
///
|
||||
/// After this call the data must be persistently saved on disk.
|
||||
|
||||
@@ -2,10 +2,12 @@ use std::collections::HashMap;
|
||||
use std::io;
|
||||
|
||||
use common;
|
||||
use fastfield_codecs::ip_codec::{ip_to_u128, IntervalCompressor};
|
||||
use fnv::FnvHashMap;
|
||||
use roaring::RoaringBitmap;
|
||||
use tantivy_bitpacker::BlockedBitpacker;
|
||||
|
||||
use super::multivalued::MultiValuedFastFieldWriter;
|
||||
use super::multivalued::{MultiValuedFastFieldWriter, U128MultiValueFastFieldWriter};
|
||||
use super::serializer::FastFieldStats;
|
||||
use super::{FastFieldDataAccess, FastFieldType, FastValue};
|
||||
use crate::fastfield::{BytesFastFieldWriter, CompositeFastFieldSerializer};
|
||||
@@ -19,6 +21,8 @@ use crate::DatePrecision;
|
||||
pub struct FastFieldsWriter {
|
||||
term_id_writers: Vec<MultiValuedFastFieldWriter>,
|
||||
single_value_writers: Vec<IntFastFieldWriter>,
|
||||
u128_value_writers: Vec<U128FastFieldWriter>,
|
||||
u128_multi_value_writers: Vec<U128MultiValueFastFieldWriter>,
|
||||
multi_values_writers: Vec<MultiValuedFastFieldWriter>,
|
||||
bytes_value_writers: Vec<BytesFastFieldWriter>,
|
||||
}
|
||||
@@ -34,6 +38,8 @@ fn fast_field_default_value(field_entry: &FieldEntry) -> u64 {
|
||||
impl FastFieldsWriter {
|
||||
/// Create all `FastFieldWriter` required by the schema.
|
||||
pub fn from_schema(schema: &Schema) -> FastFieldsWriter {
|
||||
let mut u128_value_writers = Vec::new();
|
||||
let mut u128_multi_value_writers = Vec::new();
|
||||
let mut single_value_writers = Vec::new();
|
||||
let mut term_id_writers = Vec::new();
|
||||
let mut multi_values_writers = Vec::new();
|
||||
@@ -97,10 +103,27 @@ impl FastFieldsWriter {
|
||||
bytes_value_writers.push(fast_field_writer);
|
||||
}
|
||||
}
|
||||
FieldType::Ip(opt) => {
|
||||
if opt.is_fast() {
|
||||
match opt.get_fastfield_cardinality() {
|
||||
Some(Cardinality::SingleValue) => {
|
||||
let fast_field_writer = U128FastFieldWriter::new(field);
|
||||
u128_value_writers.push(fast_field_writer);
|
||||
}
|
||||
Some(Cardinality::MultiValues) => {
|
||||
let fast_field_writer = U128MultiValueFastFieldWriter::new(field);
|
||||
u128_multi_value_writers.push(fast_field_writer);
|
||||
}
|
||||
None => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
FieldType::Str(_) | FieldType::JsonObject(_) => {}
|
||||
}
|
||||
}
|
||||
FastFieldsWriter {
|
||||
u128_value_writers,
|
||||
u128_multi_value_writers,
|
||||
term_id_writers,
|
||||
single_value_writers,
|
||||
multi_values_writers,
|
||||
@@ -129,6 +152,16 @@ impl FastFieldsWriter {
|
||||
.iter()
|
||||
.map(|w| w.mem_usage())
|
||||
.sum::<usize>()
|
||||
+ self
|
||||
.u128_value_writers
|
||||
.iter()
|
||||
.map(|w| w.mem_usage())
|
||||
.sum::<usize>()
|
||||
+ self
|
||||
.u128_multi_value_writers
|
||||
.iter()
|
||||
.map(|w| w.mem_usage())
|
||||
.sum::<usize>()
|
||||
}
|
||||
|
||||
/// Get the `FastFieldWriter` associated to a field.
|
||||
@@ -190,7 +223,6 @@ impl FastFieldsWriter {
|
||||
.iter_mut()
|
||||
.find(|field_writer| field_writer.field() == field)
|
||||
}
|
||||
|
||||
/// Indexes all of the fastfields of a new document.
|
||||
pub fn add_document(&mut self, doc: &Document) {
|
||||
for field_writer in &mut self.term_id_writers {
|
||||
@@ -205,6 +237,12 @@ impl FastFieldsWriter {
|
||||
for field_writer in &mut self.bytes_value_writers {
|
||||
field_writer.add_document(doc);
|
||||
}
|
||||
for field_writer in &mut self.u128_value_writers {
|
||||
field_writer.add_document(doc);
|
||||
}
|
||||
for field_writer in &mut self.u128_multi_value_writers {
|
||||
field_writer.add_document(doc);
|
||||
}
|
||||
}
|
||||
|
||||
/// Serializes all of the `FastFieldWriter`s by pushing them in
|
||||
@@ -230,6 +268,129 @@ impl FastFieldsWriter {
|
||||
for field_writer in &self.bytes_value_writers {
|
||||
field_writer.serialize(serializer, doc_id_map)?;
|
||||
}
|
||||
for field_writer in &self.u128_value_writers {
|
||||
field_writer.serialize(serializer, doc_id_map)?;
|
||||
}
|
||||
for field_writer in &self.u128_multi_value_writers {
|
||||
field_writer.serialize(serializer, doc_id_map)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Fast field writer for u128 values.
|
||||
/// The fast field writer just keeps the values in memory.
|
||||
///
|
||||
/// Only when the segment writer can be closed and
|
||||
/// persisted on disc, the fast field writer is
|
||||
/// sent to a `FastFieldSerializer` via the `.serialize(...)`
|
||||
/// method.
|
||||
///
|
||||
/// We cannot serialize earlier as the values are
|
||||
/// compressed to a compact number space and the number of
|
||||
/// bits required for bitpacking can only been known once
|
||||
/// we have seen all of the values.
|
||||
pub struct U128FastFieldWriter {
|
||||
field: Field,
|
||||
vals: Vec<u128>,
|
||||
val_count: u32,
|
||||
|
||||
null_values: RoaringBitmap,
|
||||
}
|
||||
|
||||
impl U128FastFieldWriter {
|
||||
/// Creates a new `IntFastFieldWriter`
|
||||
pub fn new(field: Field) -> Self {
|
||||
Self {
|
||||
field,
|
||||
vals: vec![],
|
||||
val_count: 0,
|
||||
null_values: RoaringBitmap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// The memory used (inclusive childs)
|
||||
pub fn mem_usage(&self) -> usize {
|
||||
self.vals.len() * 16
|
||||
}
|
||||
|
||||
/// Records a new value.
|
||||
///
|
||||
/// The n-th value being recorded is implicitely
|
||||
/// associated to the document with the `DocId` n.
|
||||
/// (Well, `n-1` actually because of 0-indexing)
|
||||
pub fn add_val(&mut self, val: u128) {
|
||||
self.vals.push(val);
|
||||
}
|
||||
|
||||
/// Extract the fast field value from the document
|
||||
/// (or use the default value) and records it.
|
||||
///
|
||||
/// Extract the value associated to the fast field for
|
||||
/// this document.
|
||||
pub fn add_document(&mut self, doc: &Document) {
|
||||
match doc.get_first(self.field) {
|
||||
Some(v) => {
|
||||
let ip_addr = v.as_ip().unwrap();
|
||||
let value = ip_to_u128(ip_addr);
|
||||
self.add_val(value);
|
||||
}
|
||||
None => {
|
||||
self.null_values.insert(self.val_count as u32);
|
||||
}
|
||||
};
|
||||
self.val_count += 1;
|
||||
}
|
||||
|
||||
/// Push the fast fields value to the `FastFieldWriter`.
|
||||
pub fn serialize(
|
||||
&self,
|
||||
serializer: &mut CompositeFastFieldSerializer,
|
||||
doc_id_map: Option<&DocIdMapping>,
|
||||
) -> io::Result<()> {
|
||||
let mut field_write = serializer.get_field_writer(self.field, 0);
|
||||
let compressor = IntervalCompressor::from_vals(self.vals.to_vec());
|
||||
|
||||
let mut val_idx = 0;
|
||||
let mut get_val = |idx| {
|
||||
if self.null_values.contains(idx as u32) {
|
||||
compressor.null_value
|
||||
} else {
|
||||
let val = self.vals[val_idx];
|
||||
val_idx += 1;
|
||||
val
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(doc_id_map) = doc_id_map {
|
||||
// To get the actual value, we could materialize the vec with u128 including nulls, but
|
||||
// that could cost a lot of memory. Instead we just compute the index for of
|
||||
// the values
|
||||
let mut idx_to_val_idx = vec![];
|
||||
idx_to_val_idx.resize(self.val_count as usize, 0);
|
||||
|
||||
let mut val_idx = 0;
|
||||
for idx in 0..self.val_count {
|
||||
if !self.null_values.contains(idx as u32) {
|
||||
idx_to_val_idx[idx as usize] = val_idx as u32;
|
||||
val_idx += 1;
|
||||
}
|
||||
}
|
||||
|
||||
let iter = doc_id_map.iter_old_doc_ids().map(|idx| {
|
||||
if self.null_values.contains(idx as u32) {
|
||||
compressor.null_value
|
||||
} else {
|
||||
self.vals[idx_to_val_idx[idx as usize] as usize]
|
||||
}
|
||||
});
|
||||
compressor.compress_into(iter, &mut field_write)?;
|
||||
} else {
|
||||
let iter = (0..self.val_count).map(&mut get_val);
|
||||
compressor.compress_into(iter, &mut field_write)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -776,6 +776,7 @@ impl Drop for IndexWriter {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::net::IpAddr;
|
||||
|
||||
use proptest::prelude::*;
|
||||
use proptest::prop_oneof;
|
||||
@@ -789,7 +790,7 @@ mod tests {
|
||||
use crate::indexer::NoMergePolicy;
|
||||
use crate::query::{QueryParser, TermQuery};
|
||||
use crate::schema::{
|
||||
self, Cardinality, Facet, FacetOptions, IndexRecordOption, NumericOptions,
|
||||
self, Cardinality, Facet, FacetOptions, IndexRecordOption, IpOptions, NumericOptions,
|
||||
TextFieldIndexing, TextOptions, FAST, INDEXED, STORED, STRING, TEXT,
|
||||
};
|
||||
use crate::store::DOCSTORE_CACHE_CAPACITY;
|
||||
@@ -1384,6 +1385,11 @@ mod tests {
|
||||
force_end_merge: bool,
|
||||
) -> crate::Result<()> {
|
||||
let mut schema_builder = schema::Schema::builder();
|
||||
let ip_field = schema_builder.add_ip_field("ip", FAST | INDEXED | STORED);
|
||||
let ips_field = schema_builder.add_ip_field(
|
||||
"ips",
|
||||
IpOptions::default().set_fast(Cardinality::MultiValues),
|
||||
);
|
||||
let id_field = schema_builder.add_u64_field("id", FAST | INDEXED | STORED);
|
||||
let bytes_field = schema_builder.add_bytes_field("bytes", FAST | INDEXED | STORED);
|
||||
let bool_field = schema_builder.add_bool_field("bool", FAST | INDEXED | STORED);
|
||||
@@ -1439,17 +1445,37 @@ mod tests {
|
||||
match op {
|
||||
IndexingOp::AddDoc { id } => {
|
||||
let facet = Facet::from(&("/cola/".to_string() + &id.to_string()));
|
||||
index_writer.add_document(doc!(id_field=>id,
|
||||
bytes_field => id.to_le_bytes().as_slice(),
|
||||
multi_numbers=> id,
|
||||
multi_numbers => id,
|
||||
bool_field => (id % 2u64) != 0,
|
||||
multi_bools => (id % 2u64) != 0,
|
||||
multi_bools => (id % 2u64) == 0,
|
||||
text_field => id.to_string(),
|
||||
facet_field => facet,
|
||||
large_text_field=> LOREM
|
||||
))?;
|
||||
let ip_from_id = IpAddr::from((id as u128).to_be_bytes());
|
||||
|
||||
if id % 3 == 0 {
|
||||
// every 3rd doc has no ip field
|
||||
index_writer.add_document(doc!(id_field=>id,
|
||||
bytes_field => id.to_le_bytes().as_slice(),
|
||||
multi_numbers=> id,
|
||||
multi_numbers => id,
|
||||
bool_field => (id % 2u64) != 0,
|
||||
multi_bools => (id % 2u64) != 0,
|
||||
multi_bools => (id % 2u64) == 0,
|
||||
text_field => id.to_string(),
|
||||
facet_field => facet,
|
||||
large_text_field=> LOREM
|
||||
))?;
|
||||
} else {
|
||||
index_writer.add_document(doc!(id_field=>id,
|
||||
bytes_field => id.to_le_bytes().as_slice(),
|
||||
ip_field => ip_from_id,
|
||||
ips_field => ip_from_id,
|
||||
ips_field => ip_from_id,
|
||||
multi_numbers=> id,
|
||||
multi_numbers => id,
|
||||
bool_field => (id % 2u64) != 0,
|
||||
multi_bools => (id % 2u64) != 0,
|
||||
multi_bools => (id % 2u64) == 0,
|
||||
text_field => id.to_string(),
|
||||
facet_field => facet,
|
||||
large_text_field=> LOREM
|
||||
))?;
|
||||
}
|
||||
}
|
||||
IndexingOp::DeleteDoc { id } => {
|
||||
index_writer.delete_term(Term::from_field_u64(id_field, id));
|
||||
@@ -1530,6 +1556,54 @@ mod tests {
|
||||
.collect::<HashSet<_>>()
|
||||
);
|
||||
|
||||
// Check ip addr
|
||||
let ips: HashSet<Option<IpAddr>> = searcher
|
||||
.segment_readers()
|
||||
.iter()
|
||||
.flat_map(|segment_reader| {
|
||||
let ff_reader = segment_reader.fast_fields().ip_addr(ip_field).unwrap();
|
||||
segment_reader
|
||||
.doc_ids_alive()
|
||||
.map(move |doc| ff_reader.get_val(doc as u64))
|
||||
})
|
||||
.collect();
|
||||
|
||||
let expected_ips = expected_ids_and_num_occurrences
|
||||
.keys()
|
||||
.map(|id| {
|
||||
if id % 3 == 0 {
|
||||
None
|
||||
} else {
|
||||
Some(IpAddr::from((*id as u128).to_be_bytes()))
|
||||
}
|
||||
})
|
||||
.collect::<HashSet<_>>();
|
||||
assert_eq!(ips, expected_ips);
|
||||
|
||||
let expected_ips = expected_ids_and_num_occurrences
|
||||
.keys()
|
||||
.filter_map(|id| {
|
||||
if id % 3 == 0 {
|
||||
None
|
||||
} else {
|
||||
Some(IpAddr::from((*id as u128).to_be_bytes()))
|
||||
}
|
||||
})
|
||||
.collect::<HashSet<_>>();
|
||||
let ips: HashSet<IpAddr> = searcher
|
||||
.segment_readers()
|
||||
.iter()
|
||||
.flat_map(|segment_reader| {
|
||||
let ff_reader = segment_reader.fast_fields().ip_addrs(ips_field).unwrap();
|
||||
segment_reader.doc_ids_alive().flat_map(move |doc| {
|
||||
let mut vals = vec![];
|
||||
ff_reader.get_vals(doc, &mut vals);
|
||||
vals
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
assert_eq!(ips, expected_ips);
|
||||
|
||||
// multivalue fast field tests
|
||||
for segment_reader in searcher.segment_readers().iter() {
|
||||
let ff_reader = segment_reader.fast_fields().u64s(multi_numbers).unwrap();
|
||||
@@ -1631,6 +1705,31 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_minimal() {
|
||||
assert!(test_operation_strategy(
|
||||
&[
|
||||
IndexingOp::AddDoc { id: 23 },
|
||||
IndexingOp::AddDoc { id: 13 },
|
||||
IndexingOp::DeleteDoc { id: 13 }
|
||||
],
|
||||
true,
|
||||
false
|
||||
)
|
||||
.is_ok());
|
||||
|
||||
assert!(test_operation_strategy(
|
||||
&[
|
||||
IndexingOp::AddDoc { id: 23 },
|
||||
IndexingOp::AddDoc { id: 13 },
|
||||
IndexingOp::DeleteDoc { id: 13 }
|
||||
],
|
||||
false,
|
||||
false
|
||||
)
|
||||
.is_ok());
|
||||
}
|
||||
|
||||
proptest! {
|
||||
#![proptest_config(ProptestConfig::with_cases(20))]
|
||||
#[test]
|
||||
|
||||
@@ -2,6 +2,7 @@ use std::cmp;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use fastfield_codecs::ip_codec::{IntervalCompressor, IntervallDecompressor};
|
||||
use itertools::Itertools;
|
||||
use measure_time::debug_time;
|
||||
use tantivy_bitpacker::minmax;
|
||||
@@ -11,7 +12,8 @@ use crate::docset::{DocSet, TERMINATED};
|
||||
use crate::error::DataCorruption;
|
||||
use crate::fastfield::{
|
||||
AliveBitSet, CompositeFastFieldSerializer, DynamicFastFieldReader, FastFieldDataAccess,
|
||||
FastFieldReader, FastFieldStats, MultiValueLength, MultiValuedFastFieldReader,
|
||||
FastFieldReader, FastFieldReaderCodecWrapperU128, FastFieldStats, MultiValueLength,
|
||||
MultiValuedFastFieldReader, MultiValuedU128FastFieldReader,
|
||||
};
|
||||
use crate::fieldnorm::{FieldNormReader, FieldNormReaders, FieldNormsSerializer, FieldNormsWriter};
|
||||
use crate::indexer::doc_id_mapping::{expect_field_id_for_sort_field, SegmentDocIdMapping};
|
||||
@@ -321,6 +323,24 @@ impl IndexMerger {
|
||||
self.write_bytes_fast_field(field, fast_field_serializer, doc_id_mapping)?;
|
||||
}
|
||||
}
|
||||
FieldType::Ip(options) => match options.get_fastfield_cardinality() {
|
||||
Some(Cardinality::SingleValue) => {
|
||||
self.write_u128_single_fast_field(
|
||||
field,
|
||||
fast_field_serializer,
|
||||
doc_id_mapping,
|
||||
)?;
|
||||
}
|
||||
Some(Cardinality::MultiValues) => {
|
||||
self.write_u128_multi_fast_field(
|
||||
field,
|
||||
fast_field_serializer,
|
||||
doc_id_mapping,
|
||||
)?;
|
||||
}
|
||||
None => {}
|
||||
},
|
||||
|
||||
FieldType::JsonObject(_) | FieldType::Facet(_) | FieldType::Str(_) => {
|
||||
// We don't handle json fast field for the moment
|
||||
// They can be implemented using what is done
|
||||
@@ -331,6 +351,114 @@ impl IndexMerger {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// used to merge `u128` single fast fields.
|
||||
fn write_u128_multi_fast_field(
|
||||
&self,
|
||||
field: Field,
|
||||
fast_field_serializer: &mut CompositeFastFieldSerializer,
|
||||
doc_id_mapping: &SegmentDocIdMapping,
|
||||
) -> crate::Result<()> {
|
||||
let reader_ordinal_and_field_accessors = self
|
||||
.readers
|
||||
.iter()
|
||||
.map(|segment_reader| {
|
||||
let val_length_reader: MultiValuedU128FastFieldReader<u128> =
|
||||
segment_reader.fast_fields().u128s(field).expect(
|
||||
"Failed to find index for multivalued field. This is a bug in tantivy, \
|
||||
please report.",
|
||||
);
|
||||
(segment_reader, val_length_reader)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
Self::write_1_n_fast_field_idx_generic(
|
||||
field,
|
||||
fast_field_serializer,
|
||||
doc_id_mapping,
|
||||
&reader_ordinal_and_field_accessors,
|
||||
)?;
|
||||
|
||||
let fast_field_readers = self
|
||||
.readers
|
||||
.iter()
|
||||
.map(|reader| {
|
||||
let u128_reader: MultiValuedU128FastFieldReader<u128> =
|
||||
reader.fast_fields().u128s(field).expect(
|
||||
"Failed to find a reader for single fast field. This is a tantivy bug and \
|
||||
it should never happen.",
|
||||
);
|
||||
u128_reader
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let compressor = {
|
||||
let vals = fast_field_readers
|
||||
.iter()
|
||||
.flat_map(|reader| reader.iter())
|
||||
.flatten()
|
||||
.collect::<Vec<u128>>();
|
||||
|
||||
IntervalCompressor::from_vals(vals)
|
||||
};
|
||||
|
||||
let iter = doc_id_mapping.iter().flat_map(|(doc_id, reader_ordinal)| {
|
||||
let fast_field_reader = &fast_field_readers[*reader_ordinal as usize];
|
||||
let mut out = vec![];
|
||||
fast_field_reader.get_vals(*doc_id, &mut out);
|
||||
out.into_iter()
|
||||
});
|
||||
|
||||
let field_write = fast_field_serializer.get_field_writer(field, 1);
|
||||
|
||||
compressor.compress_into(iter, field_write)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// used to merge `u128` single fast fields.
|
||||
fn write_u128_single_fast_field(
|
||||
&self,
|
||||
field: Field,
|
||||
fast_field_serializer: &mut CompositeFastFieldSerializer,
|
||||
doc_id_mapping: &SegmentDocIdMapping,
|
||||
) -> crate::Result<()> {
|
||||
let fast_field_readers = self
|
||||
.readers
|
||||
.iter()
|
||||
.map(|reader| {
|
||||
let u128_reader: FastFieldReaderCodecWrapperU128<u128, IntervallDecompressor> =
|
||||
reader.fast_fields().u128(field).expect(
|
||||
"Failed to find a reader for single fast field. This is a tantivy bug and \
|
||||
it should never happen.",
|
||||
);
|
||||
u128_reader
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let compressor = {
|
||||
let vals = fast_field_readers
|
||||
.iter()
|
||||
.flat_map(|reader| reader.iter())
|
||||
.flatten()
|
||||
.collect::<Vec<u128>>();
|
||||
|
||||
IntervalCompressor::from_vals(vals)
|
||||
};
|
||||
|
||||
let iter = doc_id_mapping.iter().map(|(doc_id, reader_ordinal)| {
|
||||
let fast_field_reader = &fast_field_readers[*reader_ordinal as usize];
|
||||
fast_field_reader
|
||||
.get_val(*doc_id as u64)
|
||||
.unwrap_or(compressor.null_value)
|
||||
});
|
||||
|
||||
let field_write = fast_field_serializer.get_field_writer(field, 0);
|
||||
|
||||
compressor.compress_into(iter, field_write)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// used both to merge field norms, `u64/i64` single fast fields.
|
||||
fn write_single_fast_field(
|
||||
&self,
|
||||
@@ -521,16 +649,16 @@ impl IndexMerger {
|
||||
// This is required by the bitpacker, as it needs to know
|
||||
// what should be the bit length use for bitpacking.
|
||||
let mut num_docs = 0;
|
||||
for (reader, u64s_reader) in reader_and_field_accessors.iter() {
|
||||
for (reader, value_length_reader) in reader_and_field_accessors.iter() {
|
||||
if let Some(alive_bitset) = reader.alive_bitset() {
|
||||
num_docs += alive_bitset.num_alive_docs() as u64;
|
||||
for doc in reader.doc_ids_alive() {
|
||||
let num_vals = u64s_reader.get_len(doc) as u64;
|
||||
let num_vals = value_length_reader.get_len(doc) as u64;
|
||||
total_num_vals += num_vals;
|
||||
}
|
||||
} else {
|
||||
num_docs += reader.max_doc() as u64;
|
||||
total_num_vals += u64s_reader.get_total_len();
|
||||
total_num_vals += value_length_reader.get_total_len();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -294,6 +294,13 @@ impl SegmentWriter {
|
||||
ctx,
|
||||
)?;
|
||||
}
|
||||
FieldType::Ip(_) => {
|
||||
for value in values {
|
||||
let ip_val = value.as_ip().ok_or_else(make_schema_error)?;
|
||||
term_buffer.set_text(&ip_val.to_string());
|
||||
postings_writer.subscribe(doc_id, 0u32, term_buffer, ctx);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
|
||||
@@ -50,6 +50,7 @@ fn posting_writer_from_field_entry(field_entry: &FieldEntry) -> Box<dyn Postings
|
||||
| FieldType::Bool(_)
|
||||
| FieldType::Date(_)
|
||||
| FieldType::Bytes(_)
|
||||
| FieldType::Ip(_)
|
||||
| FieldType::Facet(_) => Box::new(SpecializedPostingsWriter::<NothingRecorder>::default()),
|
||||
FieldType::JsonObject(ref json_object_options) => {
|
||||
if let Some(text_indexing_option) = json_object_options.get_text_indexing_options() {
|
||||
|
||||
@@ -89,6 +89,7 @@ pub(crate) fn serialize_postings(
|
||||
| FieldType::Bool(_) => {}
|
||||
FieldType::Bytes(_) => {}
|
||||
FieldType::JsonObject(_) => {}
|
||||
FieldType::Ip(_) => {} // TODO check
|
||||
}
|
||||
|
||||
let postings_writer = per_field_postings_writers.get_for_field(field);
|
||||
|
||||
@@ -400,6 +400,7 @@ impl QueryParser {
|
||||
let bytes = base64::decode(phrase).map_err(QueryParserError::ExpectedBase64)?;
|
||||
Ok(Term::from_field_bytes(field, &bytes))
|
||||
}
|
||||
FieldType::Ip(_) => Ok(Term::from_field_text(field, phrase)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -506,6 +507,13 @@ impl QueryParser {
|
||||
let bytes_term = Term::from_field_bytes(field, &bytes);
|
||||
Ok(vec![LogicalLiteral::Term(bytes_term)])
|
||||
}
|
||||
FieldType::Ip(ref option) => {
|
||||
if !option.is_indexed() {
|
||||
return Err(QueryParserError::FieldNotIndexed(field_name.to_string()));
|
||||
}
|
||||
let text_term = Term::from_field_text(field, phrase);
|
||||
Ok(vec![LogicalLiteral::Term(text_term)])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use super::ip_options::IpOptions;
|
||||
use crate::schema::bytes_options::BytesOptions;
|
||||
use crate::schema::{
|
||||
is_valid_field_name, DateOptions, FacetOptions, FieldType, JsonObjectOptions, NumericOptions,
|
||||
@@ -60,6 +61,11 @@ impl FieldEntry {
|
||||
Self::new(field_name, FieldType::Date(date_options))
|
||||
}
|
||||
|
||||
/// Creates a new ip field entry.
|
||||
pub fn new_ip(field_name: String, ip_options: IpOptions) -> FieldEntry {
|
||||
Self::new(field_name, FieldType::Ip(ip_options))
|
||||
}
|
||||
|
||||
/// Creates a field entry for a facet.
|
||||
pub fn new_facet(field_name: String, facet_options: FacetOptions) -> FieldEntry {
|
||||
Self::new(field_name, FieldType::Facet(facet_options))
|
||||
@@ -114,6 +120,7 @@ impl FieldEntry {
|
||||
FieldType::Facet(ref options) => options.is_stored(),
|
||||
FieldType::Bytes(ref options) => options.is_stored(),
|
||||
FieldType::JsonObject(ref options) => options.is_stored(),
|
||||
FieldType::Ip(ref options) => options.is_stored(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,11 @@
|
||||
use std::net::IpAddr;
|
||||
use std::str::FromStr;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value as JsonValue;
|
||||
use thiserror::Error;
|
||||
|
||||
use super::ip_options::IpOptions;
|
||||
use crate::schema::bytes_options::BytesOptions;
|
||||
use crate::schema::facet_options::FacetOptions;
|
||||
use crate::schema::{
|
||||
@@ -61,9 +65,13 @@ pub enum Type {
|
||||
Bytes = b'b',
|
||||
/// Leaf in a Json object.
|
||||
Json = b'j',
|
||||
/// IpAddr
|
||||
Ip = b'p',
|
||||
/// IpAddr
|
||||
U128 = b'1',
|
||||
}
|
||||
|
||||
const ALL_TYPES: [Type; 9] = [
|
||||
const ALL_TYPES: [Type; 11] = [
|
||||
Type::Str,
|
||||
Type::U64,
|
||||
Type::I64,
|
||||
@@ -73,6 +81,8 @@ const ALL_TYPES: [Type; 9] = [
|
||||
Type::Facet,
|
||||
Type::Bytes,
|
||||
Type::Json,
|
||||
Type::Ip,
|
||||
Type::U128,
|
||||
];
|
||||
|
||||
impl Type {
|
||||
@@ -99,6 +109,8 @@ impl Type {
|
||||
Type::Facet => "Facet",
|
||||
Type::Bytes => "Bytes",
|
||||
Type::Json => "Json",
|
||||
Type::Ip => "Ip",
|
||||
Type::U128 => "U128",
|
||||
}
|
||||
}
|
||||
|
||||
@@ -115,6 +127,8 @@ impl Type {
|
||||
b'h' => Some(Type::Facet),
|
||||
b'b' => Some(Type::Bytes),
|
||||
b'j' => Some(Type::Json),
|
||||
b'p' => Some(Type::Ip),
|
||||
b'1' => Some(Type::U128),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
@@ -145,6 +159,8 @@ pub enum FieldType {
|
||||
Bytes(BytesOptions),
|
||||
/// Json object
|
||||
JsonObject(JsonObjectOptions),
|
||||
/// IpAddr field
|
||||
Ip(IpOptions),
|
||||
}
|
||||
|
||||
impl FieldType {
|
||||
@@ -160,6 +176,7 @@ impl FieldType {
|
||||
FieldType::Facet(_) => Type::Facet,
|
||||
FieldType::Bytes(_) => Type::Bytes,
|
||||
FieldType::JsonObject(_) => Type::Json,
|
||||
FieldType::Ip(_) => Type::Ip,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -175,6 +192,7 @@ impl FieldType {
|
||||
FieldType::Facet(ref _facet_options) => true,
|
||||
FieldType::Bytes(ref bytes_options) => bytes_options.is_indexed(),
|
||||
FieldType::JsonObject(ref json_object_options) => json_object_options.is_indexed(),
|
||||
FieldType::Ip(ref ip_options) => ip_options.is_indexed(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -209,6 +227,7 @@ impl FieldType {
|
||||
| FieldType::F64(ref int_options)
|
||||
| FieldType::Bool(ref int_options) => int_options.is_fast(),
|
||||
FieldType::Date(ref date_options) => date_options.is_fast(),
|
||||
FieldType::Ip(ref options) => options.is_fast(),
|
||||
FieldType::Facet(_) => true,
|
||||
FieldType::JsonObject(_) => false,
|
||||
}
|
||||
@@ -229,6 +248,7 @@ impl FieldType {
|
||||
FieldType::Facet(_) => false,
|
||||
FieldType::Bytes(ref bytes_options) => bytes_options.fieldnorms(),
|
||||
FieldType::JsonObject(ref _json_object_options) => false,
|
||||
FieldType::Ip(_) => false,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -273,6 +293,13 @@ impl FieldType {
|
||||
FieldType::JsonObject(ref json_obj_options) => json_obj_options
|
||||
.get_text_indexing_options()
|
||||
.map(TextFieldIndexing::index_option),
|
||||
FieldType::Ip(ref ip_options) => {
|
||||
if ip_options.is_indexed() {
|
||||
Some(IndexRecordOption::Basic)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -312,6 +339,14 @@ impl FieldType {
|
||||
expected: "a json object",
|
||||
json: JsonValue::String(field_text),
|
||||
}),
|
||||
FieldType::Ip(_) => {
|
||||
Ok(Value::Ip(IpAddr::from_str(&field_text).map_err(|err| {
|
||||
ValueParsingError::ParseError {
|
||||
error: err.to_string(),
|
||||
json: JsonValue::String(field_text),
|
||||
}
|
||||
})?))
|
||||
}
|
||||
}
|
||||
}
|
||||
JsonValue::Number(field_val_num) => match self {
|
||||
@@ -359,6 +394,10 @@ impl FieldType {
|
||||
expected: "a json object",
|
||||
json: JsonValue::Number(field_val_num),
|
||||
}),
|
||||
FieldType::Ip(_) => Err(ValueParsingError::TypeError {
|
||||
expected: "a string with an ip addr",
|
||||
json: JsonValue::Number(field_val_num),
|
||||
}),
|
||||
},
|
||||
JsonValue::Object(json_map) => match self {
|
||||
FieldType::Str(_) => {
|
||||
|
||||
131
src/schema/ip_options.rs
Normal file
131
src/schema/ip_options.rs
Normal file
@@ -0,0 +1,131 @@
|
||||
use std::ops::BitOr;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use super::flags::{FastFlag, IndexedFlag, SchemaFlagList, StoredFlag};
|
||||
use super::Cardinality;
|
||||
|
||||
/// Define how an ip field should be handled by tantivy.
|
||||
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
|
||||
pub struct IpOptions {
|
||||
indexed: bool,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
fast: Option<Cardinality>,
|
||||
stored: bool,
|
||||
}
|
||||
|
||||
impl IpOptions {
|
||||
/// Returns true iff the value is a fast field.
|
||||
pub fn is_fast(&self) -> bool {
|
||||
self.fast.is_some()
|
||||
}
|
||||
|
||||
/// Returns `true` if the json object should be stored.
|
||||
pub fn is_stored(&self) -> bool {
|
||||
self.stored
|
||||
}
|
||||
|
||||
/// Returns `true` iff the json object should be indexed.
|
||||
pub fn is_indexed(&self) -> bool {
|
||||
self.indexed
|
||||
}
|
||||
|
||||
/// Returns the cardinality of the fastfield.
|
||||
///
|
||||
/// If the field has not been declared as a fastfield, then
|
||||
/// the method returns None.
|
||||
pub fn get_fastfield_cardinality(&self) -> Option<Cardinality> {
|
||||
self.fast
|
||||
}
|
||||
|
||||
/// Set the field as indexed.
|
||||
///
|
||||
/// Setting an integer as indexed will generate
|
||||
/// a posting list for each value taken by the integer.
|
||||
///
|
||||
/// This is required for the field to be searchable.
|
||||
#[must_use]
|
||||
pub fn set_indexed(mut self) -> Self {
|
||||
self.indexed = true;
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets the field as stored
|
||||
#[must_use]
|
||||
pub fn set_stored(mut self) -> Self {
|
||||
self.stored = true;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the field as a fast field.
|
||||
///
|
||||
/// Fast fields are designed for random access.
|
||||
/// Access time are similar to a random lookup in an array.
|
||||
/// If more than one value is associated to a fast field, only the last one is
|
||||
/// kept.
|
||||
#[must_use]
|
||||
pub fn set_fast(mut self, cardinality: Cardinality) -> Self {
|
||||
self.fast = Some(cardinality);
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl From<()> for IpOptions {
|
||||
fn from(_: ()) -> IpOptions {
|
||||
IpOptions::default()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<FastFlag> for IpOptions {
|
||||
fn from(_: FastFlag) -> Self {
|
||||
IpOptions {
|
||||
indexed: false,
|
||||
stored: false,
|
||||
fast: Some(Cardinality::SingleValue),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<StoredFlag> for IpOptions {
|
||||
fn from(_: StoredFlag) -> Self {
|
||||
IpOptions {
|
||||
indexed: false,
|
||||
stored: true,
|
||||
fast: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<IndexedFlag> for IpOptions {
|
||||
fn from(_: IndexedFlag) -> Self {
|
||||
IpOptions {
|
||||
indexed: true,
|
||||
stored: false,
|
||||
fast: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Into<IpOptions>> BitOr<T> for IpOptions {
|
||||
type Output = IpOptions;
|
||||
|
||||
fn bitor(self, other: T) -> IpOptions {
|
||||
let other = other.into();
|
||||
IpOptions {
|
||||
indexed: self.indexed | other.indexed,
|
||||
stored: self.stored | other.stored,
|
||||
fast: self.fast.or(other.fast),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Head, Tail> From<SchemaFlagList<Head, Tail>> for IpOptions
|
||||
where
|
||||
Head: Clone,
|
||||
Tail: Clone,
|
||||
Self: BitOr<Output = Self> + From<Head> + From<Tail>,
|
||||
{
|
||||
fn from(head_tail: SchemaFlagList<Head, Tail>) -> Self {
|
||||
Self::from(head_tail.head) | Self::from(head_tail.tail)
|
||||
}
|
||||
}
|
||||
@@ -121,6 +121,7 @@ mod date_time_options;
|
||||
mod field;
|
||||
mod flags;
|
||||
mod index_record_option;
|
||||
mod ip_options;
|
||||
mod json_object_options;
|
||||
mod named_field_document;
|
||||
mod numeric_options;
|
||||
@@ -139,6 +140,7 @@ pub use self::field_type::{FieldType, Type};
|
||||
pub use self::field_value::FieldValue;
|
||||
pub use self::flags::{FAST, INDEXED, STORED};
|
||||
pub use self::index_record_option::IndexRecordOption;
|
||||
pub use self::ip_options::IpOptions;
|
||||
pub use self::json_object_options::JsonObjectOptions;
|
||||
pub use self::named_field_document::NamedFieldDocument;
|
||||
pub use self::numeric_options::NumericOptions;
|
||||
|
||||
@@ -7,6 +7,7 @@ use serde::ser::SerializeSeq;
|
||||
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
||||
use serde_json::{self, Value as JsonValue};
|
||||
|
||||
use super::ip_options::IpOptions;
|
||||
use super::*;
|
||||
use crate::schema::bytes_options::BytesOptions;
|
||||
use crate::schema::field_type::ValueParsingError;
|
||||
@@ -144,6 +145,28 @@ impl SchemaBuilder {
|
||||
self.add_field(field_entry)
|
||||
}
|
||||
|
||||
/// Adds a ip field.
|
||||
/// Returns the associated field handle
|
||||
/// Internally, Tantivy simply stores ips as u64,
|
||||
/// while the user supplies IpAddr values for convenience.
|
||||
///
|
||||
/// # Caution
|
||||
///
|
||||
/// Appending two fields with the same name
|
||||
/// will result in the shadowing of the first
|
||||
/// by the second one.
|
||||
/// The first field will get a field id
|
||||
/// but only the second one will be indexed
|
||||
pub fn add_ip_field<T: Into<IpOptions>>(
|
||||
&mut self,
|
||||
field_name_str: &str,
|
||||
field_options: T,
|
||||
) -> Field {
|
||||
let field_name = String::from(field_name_str);
|
||||
let field_entry = FieldEntry::new_ip(field_name, field_options.into());
|
||||
self.add_field(field_entry)
|
||||
}
|
||||
|
||||
/// Adds a new text field.
|
||||
/// Returns the associated field handle
|
||||
///
|
||||
|
||||
@@ -415,6 +415,14 @@ fn debug_value_bytes(typ: Type, bytes: &[u8], f: &mut fmt::Formatter) -> fmt::Re
|
||||
debug_value_bytes(typ, bytes, f)?;
|
||||
}
|
||||
}
|
||||
Type::Ip => {
|
||||
let s = as_str(bytes); // TODO: change when serialization changes
|
||||
write_opt(f, s)?;
|
||||
}
|
||||
Type::U128 => {
|
||||
let s = as_str(bytes); // TODO: change when serialization changes
|
||||
write_opt(f, s)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use std::fmt;
|
||||
use std::net::IpAddr;
|
||||
|
||||
use serde::de::Visitor;
|
||||
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
||||
@@ -32,6 +33,8 @@ pub enum Value {
|
||||
Bytes(Vec<u8>),
|
||||
/// Json object value.
|
||||
JsonObject(serde_json::Map<String, serde_json::Value>),
|
||||
/// Ip
|
||||
Ip(IpAddr),
|
||||
}
|
||||
|
||||
impl Eq for Value {}
|
||||
@@ -50,6 +53,7 @@ impl Serialize for Value {
|
||||
Value::Facet(ref facet) => facet.serialize(serializer),
|
||||
Value::Bytes(ref bytes) => serializer.serialize_bytes(bytes),
|
||||
Value::JsonObject(ref obj) => obj.serialize(serializer),
|
||||
Value::Ip(ref obj) => obj.serialize(serializer), // TODO check serialization
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -201,6 +205,16 @@ impl Value {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the ip addr, provided the value is of the `Ip` type.
|
||||
/// (Returns None if the value is not of the `Ip` type)
|
||||
pub fn as_ip(&self) -> Option<IpAddr> {
|
||||
if let Value::Ip(val) = self {
|
||||
Some(*val)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<String> for Value {
|
||||
@@ -209,6 +223,12 @@ impl From<String> for Value {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<IpAddr> for Value {
|
||||
fn from(v: IpAddr) -> Value {
|
||||
Value::Ip(v)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<u64> for Value {
|
||||
fn from(v: u64) -> Value {
|
||||
Value::U64(v)
|
||||
@@ -287,7 +307,9 @@ impl From<serde_json::Value> for Value {
|
||||
}
|
||||
|
||||
mod binary_serialize {
|
||||
use std::io::{self, Read, Write};
|
||||
use std::io::{self, ErrorKind, Read, Write};
|
||||
use std::net::IpAddr;
|
||||
use std::str::FromStr;
|
||||
|
||||
use common::{f64_to_u64, u64_to_f64, BinarySerializable};
|
||||
|
||||
@@ -306,6 +328,7 @@ mod binary_serialize {
|
||||
const EXT_CODE: u8 = 7;
|
||||
const JSON_OBJ_CODE: u8 = 8;
|
||||
const BOOL_CODE: u8 = 9;
|
||||
const IP_CODE: u8 = 10;
|
||||
|
||||
// extended types
|
||||
|
||||
@@ -366,6 +389,10 @@ mod binary_serialize {
|
||||
serde_json::to_writer(writer, &map)?;
|
||||
Ok(())
|
||||
}
|
||||
Value::Ip(ref ip) => {
|
||||
IP_CODE.serialize(writer)?;
|
||||
ip.to_string().serialize(writer) // TODO Check best format
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -436,6 +463,13 @@ mod binary_serialize {
|
||||
let json_map = <serde_json::Map::<String, serde_json::Value> as serde::Deserialize>::deserialize(&mut de)?;
|
||||
Ok(Value::JsonObject(json_map))
|
||||
}
|
||||
IP_CODE => {
|
||||
let text = String::deserialize(reader)?;
|
||||
Ok(Value::Ip(IpAddr::from_str(&text).map_err(|err| {
|
||||
io::Error::new(ErrorKind::Other, err.to_string())
|
||||
})?))
|
||||
}
|
||||
|
||||
_ => Err(io::Error::new(
|
||||
io::ErrorKind::InvalidData,
|
||||
format!("No field type is associated with code {:?}", type_code),
|
||||
|
||||
Reference in New Issue
Block a user