Compare commits

...

12 Commits

Author · SHA1 · Message · Date

Pascal Seitz · c62ddb61b7 · rename, add position to docid function · 2022-09-06 12:47:49 +08:00
Pascal Seitz · ed85ba62b3 · make measure_time optional · 2022-08-22 11:11:53 +02:00
    move u128 vint to common, add u128 tests
Pascal Seitz · 4b7ed27595 · add multivalued ip fast field · 2022-08-18 12:54:20 +02:00
    fix null value handling in value range search
Pascal Seitz · 66ccba2878 · fix composite file issue, add proptest · 2022-08-16 12:24:06 +02:00
    Fix composite file issue. The composite file had an issue with the last written fast field, where the wrong field was set as the last range in the composite file due to sorting.
    Fix handling of empty fastfields for ip codec.
Pascal Seitz · c56f4572f4 · add merge code for u128 · 2022-08-12 08:48:02 +02:00
Pascal Seitz · 399b137617 · clippy · 2022-08-11 18:50:59 +02:00
Pascal Seitz · f3efb41d4e · fmt · 2022-08-11 18:50:59 +02:00
Pascal Seitz · 20a09282a1 · plug u128 field writer · 2022-08-11 18:50:59 +02:00
Pascal Seitz · 1107400ae0 · add null value detection for ip codec · 2022-08-11 18:50:58 +02:00
Pascal Seitz · 391f881fa1 · fix upperrange outside compact space · 2022-08-11 18:50:00 +02:00
Pascal Seitz · eec908e962 · add ip codec · 2022-08-11 18:50:00 +02:00
Pascal Seitz · 4a1b251a08 · add ip field · 2022-08-11 18:50:00 +02:00
31 changed files with 2397 additions and 244 deletions

View File

@@ -61,6 +61,7 @@ serde_cbor = { version = "0.11.2", optional = true }
async-trait = "0.1.53"
arc-swap = "1.5.0"
gcd = "2.1.0"
roaring = "0.9.0"
[target.'cfg(windows)'.dependencies]
winapi = "0.3.9"

View File

@@ -11,7 +11,10 @@ mod writer;
pub use bitset::*;
pub use serialize::{BinarySerializable, DeserializeFrom, FixedSize};
pub use vint::{read_u32_vint, read_u32_vint_no_advance, serialize_vint_u32, write_u32_vint, VInt};
pub use vint::{
deserialize_vint_u128, read_u32_vint, read_u32_vint_no_advance, serialize_vint_u128,
serialize_vint_u32, write_u32_vint, VInt,
};
pub use writer::{AntiCallToken, CountingWriter, TerminatingWrite};
/// Has length trait

View File

@@ -5,6 +5,40 @@ use byteorder::{ByteOrder, LittleEndian};
use super::BinarySerializable;
/// Serializes a u128 number as a variable-length int (vint)
pub fn serialize_vint_u128(mut val: u128, output: &mut Vec<u8>) {
loop {
let next_byte: u8 = (val % 128u128) as u8;
val /= 128u128;
if val == 0 {
output.push(next_byte | STOP_BIT);
return;
} else {
output.push(next_byte);
}
}
}
/// Deserializes a u128 number
///
/// Returns the number and the slice after the vint
pub fn deserialize_vint_u128(data: &[u8]) -> io::Result<(u128, &[u8])> {
let mut result = 0u128;
let mut shift = 0u64;
for i in 0..19 {
let b = data[i];
result |= u128::from(b % 128u8) << shift;
if b >= STOP_BIT {
return Ok((result, &data[i + 1..]));
}
shift += 7;
}
Err(io::Error::new(
io::ErrorKind::InvalidData,
"Failed to deserialize u128 vint",
))
}
/// Wrapper over a `u64` that serializes as a variable int.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct VInt(pub u64);
@@ -176,6 +210,7 @@ impl BinarySerializable for VInt {
mod tests {
use super::{serialize_vint_u32, BinarySerializable, VInt};
use crate::vint::{deserialize_vint_u128, serialize_vint_u128};
fn aux_test_vint(val: u64) {
let mut v = [14u8; 10];
@@ -217,6 +252,21 @@ mod tests {
assert_eq!(&buffer[..len_vint], res2, "array wrong for {}", val);
}
fn aux_test_vint_u128(val: u128) {
let mut data = vec![];
serialize_vint_u128(val, &mut data);
let (deser_val, _data) = deserialize_vint_u128(&data).unwrap();
assert_eq!(val, deser_val);
}
#[test]
fn test_vint_u128() {
aux_test_vint_u128(0);
aux_test_vint_u128(1);
aux_test_vint_u128(u128::MAX / 3);
aux_test_vint_u128(u128::MAX);
}
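// Illustrative sketch, not part of this diff: the u128 vint writes 7 bits per byte,
// least-significant group first, and marks the final byte with STOP_BIT (assumed to be
// 0x80, as the `b >= STOP_BIT` check above implies). 300 = 0b1_0010_1100 encodes to
// [0x2C, 0x82], and u128::MAX needs ceil(128 / 7) = 19 bytes, which matches the `0..19`
// loop bound in deserialize_vint_u128.
#[test]
fn test_vint_u128_wire_format_sketch() {
let mut buffer = vec![];
serialize_vint_u128(300, &mut buffer);
assert_eq!(buffer, vec![0x2C, 0x82]);
let mut buffer = vec![];
serialize_vint_u128(u128::MAX, &mut buffer);
assert_eq!(buffer.len(), 19);
}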
#[test]
fn test_vint_u32() {
aux_test_serialize_vint_u32(0);

View File

@@ -12,13 +12,16 @@ description = "Fast field codecs used by tantivy"
common = { version = "0.3", path = "../common/", package = "tantivy-common" }
tantivy-bitpacker = { version="0.2", path = "../bitpacker/" }
prettytable-rs = {version="0.8.0", optional= true}
rand = {version="0.8.3", optional= true}
rand = { version="0.8.3", optional= true}
itertools = { version="0.10.3", optional=true}
measure_time = { version="0.8.2", optional=true}
[dev-dependencies]
more-asserts = "0.3.0"
proptest = "1.0.0"
rand = "0.8.3"
[features]
bin = ["prettytable-rs", "rand"]
bin = ["prettytable-rs", "rand", "itertools", "measure_time"]
default = ["bin"]

View File

@@ -0,0 +1,729 @@
/// This codec takes a large number space (u128) and reduces it to a compact number space.
///
/// It will find gaps in the number range. For example:
///
/// 100, 101, 102, 103, 104, 50000, 50001
/// could be mapped to
/// 100..104 -> 0..4
/// 50000..50001 -> 5..6
///
/// The compact space 0..6 requires far fewer bits than 100..50001 (3 bits instead of 16).
///
/// The codec was created to compress IP addresses, but may be employed in other use cases.
use std::{
cmp::Ordering,
collections::BinaryHeap,
io::{self, Write},
net::{IpAddr, Ipv6Addr},
ops::RangeInclusive,
};
use common::{deserialize_vint_u128, serialize_vint_u128};
use tantivy_bitpacker::{self, BitPacker, BitUnpacker};
use crate::FastFieldCodecReaderU128;
pub fn ip_to_u128(ip_addr: IpAddr) -> u128 {
let ip_addr_v6: Ipv6Addr = match ip_addr {
IpAddr::V4(v4) => v4.to_ipv6_mapped(),
IpAddr::V6(v6) => v6,
};
u128::from_be_bytes(ip_addr_v6.octets())
}
const INTERVAL_COST_IN_BITS: usize = 64;
#[derive(Default, Debug)]
pub struct IntervalEncoding();
pub struct IntervalCompressor {
pub null_value: u128,
min_value: u128,
max_value: u128,
compact_space: CompactSpace,
pub num_bits: u8,
}
#[derive(Debug, Eq, PartialEq)]
struct DeltaAndPos {
delta: u128,
pos: usize,
}
impl DeltaAndPos {
fn new(ip: u128, pos: usize) -> Self {
DeltaAndPos { delta: ip, pos }
}
}
impl Ord for DeltaAndPos {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.delta.cmp(&other.delta)
}
}
impl PartialOrd for DeltaAndPos {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
self.delta.partial_cmp(&other.delta)
}
}
#[test]
fn test_delta_and_pos_sort() {
let mut deltas: BinaryHeap<DeltaAndPos> = BinaryHeap::new();
deltas.push(DeltaAndPos::new(10, 1));
deltas.push(DeltaAndPos::new(100, 10));
deltas.push(DeltaAndPos::new(1, 10));
assert_eq!(deltas.pop().unwrap().delta, 100);
assert_eq!(deltas.pop().unwrap().delta, 10);
}
/// Put the deltas for the sorted ip addresses into a binary heap
fn get_deltas(ip_addrs_sorted: &[u128]) -> BinaryHeap<DeltaAndPos> {
let mut prev_opt = None;
let mut deltas: BinaryHeap<DeltaAndPos> = BinaryHeap::new();
for (pos, ip_addr) in ip_addrs_sorted.iter().cloned().enumerate() {
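// Delta to the previous value. For the first value we use `ip_addr + 1`, so that the
// leading gap 0..=ip_addr-1 (which contains `ip_addr` values) is also considered as a
// potential blank; blank sizes are computed as `delta - 1` in get_compact_space.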
let delta = if let Some(prev) = prev_opt {
ip_addr - prev
} else {
ip_addr + 1
};
// skip too small deltas
if delta > 2 {
deltas.push(DeltaAndPos::new(delta, pos));
}
prev_opt = Some(ip_addr);
}
deltas
}
/// Collects blanks and adds them to the compact space whenever doing so reduces the number of
/// bits needed for the compact space.
fn get_compact_space(ip_addrs_sorted: &[u128], cost_per_interval: usize) -> CompactSpace {
let max_val = *ip_addrs_sorted.last().unwrap_or(&0u128) + 1;
let mut deltas = get_deltas(ip_addrs_sorted);
let mut amplitude_compact_space = max_val;
let mut amplitude_bits: u8 = (amplitude_compact_space as f64).log2().ceil() as u8;
let mut staged_blanks = vec![];
let mut compact_space = CompactSpaceBuilder::new();
// We will stage blanks until they reduce the compact space by 1 bit.
// Binary heap to process the gaps by their size
while let Some(ip_addr_and_pos) = deltas.pop() {
let delta = ip_addr_and_pos.delta;
let pos = ip_addr_and_pos.pos;
staged_blanks.push((delta, pos));
let staged_spaces_sum: u128 = staged_blanks.iter().map(|(delta, _)| delta - 1).sum();
// +1 for later added null value
let amplitude_new_compact_space = amplitude_compact_space - staged_spaces_sum + 1;
let amplitude_new_bits = (amplitude_new_compact_space as f64).log2().ceil() as u8;
if amplitude_bits == amplitude_new_bits {
continue;
}
let saved_bits = (amplitude_bits - amplitude_new_bits) as usize * ip_addrs_sorted.len();
let cost = staged_blanks.len() * cost_per_interval;
if cost >= saved_bits {
// Continue here: although we walk over the deltas from largest to smallest,
// later (smaller) deltas can still save many bits.
//
// E.g. the first blank may reduce the compact space from 2000 to 1000, which
// saves only 11-10=1 bit, while the next blank reduces it from 1000 to 50,
// which saves 10-6=4 bits.
continue;
}
amplitude_compact_space = amplitude_new_compact_space;
amplitude_bits = amplitude_new_bits;
for (_, pos) in staged_blanks.drain(..) {
let ip_addr = ip_addrs_sorted[pos];
if pos == 0 {
compact_space.add_hole(0..=ip_addr - 1);
} else {
compact_space.add_hole(ip_addrs_sorted[pos - 1] + 1..=ip_addr - 1);
}
}
}
compact_space.add_hole(max_val..=u128::MAX);
compact_space.finish()
}
#[test]
fn compact_space_test() {
// small ranges are ignored here
let ips = vec![
2u128, 4u128, 1000, 1001, 1002, 1003, 1004, 1005, 1008, 1010, 1012, 1260,
];
let ranges_and_compact_start = get_compact_space(&ips, 11);
let null_value = ranges_and_compact_start.null_value;
let amplitude = ranges_and_compact_start.amplitude_compact_space();
assert_eq!(null_value, 5);
assert_eq!(amplitude, 20);
assert_eq!(2, ranges_and_compact_start.to_compact(2).unwrap());
assert_eq!(ranges_and_compact_start.to_compact(100).unwrap_err(), 0);
}
#[derive(Debug, Clone, Eq, PartialEq)]
struct CompactSpaceBuilder {
covered_space: Vec<std::ops::RangeInclusive<u128>>,
}
impl CompactSpaceBuilder {
fn new() -> Self {
Self {
covered_space: vec![0..=u128::MAX],
}
}
// Will extend the first range and add a null value to it.
fn assign_and_return_null(&mut self) -> u128 {
self.covered_space[0] = *self.covered_space[0].start()..=*self.covered_space[0].end() + 1;
*self.covered_space[0].end()
}
// Assumes that repeated add_hole calls don't overlap.
fn add_hole(&mut self, hole: std::ops::RangeInclusive<u128>) {
let position = self
.covered_space
.iter()
.position(|range| range.start() <= hole.start() && range.end() >= hole.end());
if let Some(position) = position {
let old_range = self.covered_space.remove(position);
if old_range == hole {
return;
}
let new_range_end = hole.end().saturating_add(1)..=*old_range.end();
if old_range.start() == hole.start() {
self.covered_space.insert(position, new_range_end);
return;
}
let new_range_start = *old_range.start()..=hole.start().saturating_sub(1);
if old_range.end() == hole.end() {
self.covered_space.insert(position, new_range_start);
return;
}
self.covered_space.insert(position, new_range_end);
self.covered_space.insert(position, new_range_start);
}
}
fn finish(mut self) -> CompactSpace {
let null_value = self.assign_and_return_null();
let mut compact_start: u64 = 0;
let mut ranges_and_compact_start = vec![];
for cov in self.covered_space {
let covered_range_len = cov.end() - cov.start();
ranges_and_compact_start.push((cov, compact_start));
compact_start += covered_range_len as u64 + 1;
}
CompactSpace {
ranges_and_compact_start,
null_value,
}
}
}
#[derive(Debug, Clone, Eq, PartialEq)]
struct CompactSpace {
ranges_and_compact_start: Vec<(std::ops::RangeInclusive<u128>, u64)>,
pub null_value: u128,
}
impl CompactSpace {
fn amplitude_compact_space(&self) -> u128 {
let last_range = &self.ranges_and_compact_start[self.ranges_and_compact_start.len() - 1];
last_range.1 as u128 + (last_range.0.end() - last_range.0.start()) + 1
}
fn get_range_and_compact_start(&self, pos: usize) -> &(std::ops::RangeInclusive<u128>, u64) {
&self.ranges_and_compact_start[pos]
}
fn serialize(&self, output: &mut Vec<u8>) {
serialize_vint_u128(self.null_value as u128, output);
serialize_vint_u128(self.ranges_and_compact_start.len() as u128, output);
let mut prev_ip = 0;
for (ip_range, _compact) in &self.ranges_and_compact_start {
let delta_ip = ip_range.start() - prev_ip;
serialize_vint_u128(delta_ip as u128, output);
prev_ip = *ip_range.start();
let delta_ip = ip_range.end() - prev_ip;
serialize_vint_u128(delta_ip as u128, output);
prev_ip = *ip_range.end();
}
}
fn deserialize(data: &[u8]) -> io::Result<(&[u8], Self)> {
let (null_value, data) = deserialize_vint_u128(data)?;
let (num_ip_addrs, mut data) = deserialize_vint_u128(data)?;
let mut ip_addr = 0u128;
let mut compact = 0u64;
let mut ranges_and_compact_start: Vec<(std::ops::RangeInclusive<u128>, u64)> = vec![];
for _ in 0..num_ip_addrs {
let (ip_addr_delta, new_data) = deserialize_vint_u128(data)?;
data = new_data;
ip_addr += ip_addr_delta;
let ip_addr_start = ip_addr;
let (ip_addr_delta, new_data) = deserialize_vint_u128(data)?;
data = new_data;
ip_addr += ip_addr_delta;
let ip_addr_end = ip_addr;
let compact_delta = ip_addr_end - ip_addr_start + 1;
ranges_and_compact_start.push((ip_addr_start..=ip_addr_end, compact));
compact += compact_delta as u64;
}
Ok((
data,
Self {
null_value,
ranges_and_compact_start,
},
))
}
/// Returns Ok(the value in the compact space), or, if the value is not covered by any range,
/// Err(the index of the covered range just below the value).
fn to_compact(&self, ip: u128) -> Result<u64, usize> {
self.ranges_and_compact_start
.binary_search_by(|probe| {
let ip_range = &probe.0;
if *ip_range.start() <= ip && *ip_range.end() >= ip {
return Ordering::Equal;
} else if ip < *ip_range.start() {
return Ordering::Greater;
} else if ip > *ip_range.end() {
return Ordering::Less;
}
panic!("not covered all ranges in check");
})
.map(|pos| {
let (range, compact_start) = &self.ranges_and_compact_start[pos];
compact_start + (ip - range.start()) as u64
})
.map_err(|pos| pos - 1)
}
/// Unpacks an IP from compact space to u128 space
fn unpack_ip(&self, compact: u64) -> u128 {
let pos = self
.ranges_and_compact_start
.binary_search_by_key(&compact, |probe| probe.1)
.map_or_else(|e| e - 1, |v| v);
let range_and_compact_start = &self.ranges_and_compact_start[pos];
let diff = compact - self.ranges_and_compact_start[pos].1;
range_and_compact_start.0.start() + diff as u128
}
}
#[test]
fn ranges_and_compact_start_test() {
let ips = vec![
2u128, 4u128, 1000, 1001, 1002, 1003, 1004, 1005, 1008, 1010, 1012, 1260,
];
let ranges_and_compact_start = get_compact_space(&ips, 11);
assert_eq!(ranges_and_compact_start.null_value, 5);
let mut output = vec![];
ranges_and_compact_start.serialize(&mut output);
assert_eq!(
ranges_and_compact_start,
CompactSpace::deserialize(&output).unwrap().1
);
for ip in &ips {
let compact = ranges_and_compact_start.to_compact(*ip).unwrap();
assert_eq!(ranges_and_compact_start.unpack_ip(compact), *ip);
}
}
pub fn train(ip_addrs_sorted: &[u128]) -> IntervalCompressor {
let ranges_and_compact_start = get_compact_space(ip_addrs_sorted, INTERVAL_COST_IN_BITS);
let null_value = ranges_and_compact_start.null_value;
let amplitude_compact_space = ranges_and_compact_start.amplitude_compact_space();
assert!(
amplitude_compact_space <= u64::MAX as u128,
"case unsupported."
);
let num_bits = tantivy_bitpacker::compute_num_bits(amplitude_compact_space as u64);
let min_value = *ip_addrs_sorted.first().unwrap_or(&0);
let max_value = *ip_addrs_sorted.last().unwrap_or(&0);
let compressor = IntervalCompressor {
null_value,
min_value,
max_value,
compact_space: ranges_and_compact_start,
num_bits,
};
let max_value = *ip_addrs_sorted.last().unwrap_or(&0u128).max(&null_value);
assert_eq!(
compressor.to_compact(max_value) + 1,
amplitude_compact_space as u64
);
compressor
}
impl IntervalCompressor {
/// Taking the vals as a Vec may cost a lot of memory; the Vec is only needed to sort the vals.
///
/// A less memory-hungry alternative would be to store just indices (u32) and sort those.
pub fn from_vals(mut vals: Vec<u128>) -> Self {
vals.sort();
train(&vals)
}
fn to_compact(&self, ip_addr: u128) -> u64 {
self.compact_space.to_compact(ip_addr).unwrap()
}
fn write_footer(&self, write: &mut impl Write, num_vals: u128) -> io::Result<()> {
let mut footer = vec![];
// header flags for future optional dictionary encoding
let header_flags = 0u64;
footer.extend_from_slice(&header_flags.to_le_bytes());
let null_value = self
.compact_space
.to_compact(self.null_value)
.expect("could not convert null to compact space");
serialize_vint_u128(null_value as u128, &mut footer);
serialize_vint_u128(self.min_value, &mut footer);
serialize_vint_u128(self.max_value, &mut footer);
self.compact_space.serialize(&mut footer);
footer.push(self.num_bits);
serialize_vint_u128(num_vals as u128, &mut footer);
write.write_all(&footer)?;
let footer_len = footer.len() as u32;
write.write_all(&footer_len.to_le_bytes())?;
Ok(())
}
pub fn compress(&self, vals: &[u128]) -> io::Result<Vec<u8>> {
let mut output = vec![];
self.compress_into(vals.iter().cloned(), &mut output)?;
Ok(output)
}
pub fn compress_into(
&self,
vals: impl Iterator<Item = u128>,
write: &mut impl Write,
) -> io::Result<()> {
let mut bitpacker = BitPacker::default();
let mut num_vals = 0;
for ip_addr in vals {
let compact = self.to_compact(ip_addr);
bitpacker.write(compact, self.num_bits, write).unwrap();
num_vals += 1;
}
bitpacker.close(write).unwrap();
self.write_footer(write, num_vals as u128)?;
Ok(())
}
}
#[derive(Debug, Clone)]
pub struct IntervallDecompressor {
compact_space: CompactSpace,
bit_unpacker: BitUnpacker,
null_compact_space: u64,
min_value: u128,
max_value: u128,
num_vals: usize,
}
impl FastFieldCodecReaderU128 for IntervallDecompressor {
fn open_from_bytes(bytes: &[u8]) -> std::io::Result<Self> {
Self::open(bytes)
}
fn get(&self, doc: u64, data: &[u8]) -> Option<u128> {
self.get(doc, data)
}
fn get_between_vals(&self, range: RangeInclusive<u128>, data: &[u8]) -> Vec<usize> {
self.get_range(range, data)
}
fn min_value(&self) -> u128 {
self.min_value()
}
fn max_value(&self) -> u128 {
self.max_value()
}
/// The computed and assigned number for null values
fn null_value(&self) -> u128 {
self.compact_space.null_value
}
fn iter<'a>(&'a self, data: &'a [u8]) -> Box<dyn Iterator<Item = Option<u128>> + 'a> {
Box::new(self.iter(data))
}
}
impl IntervallDecompressor {
pub fn open(data: &[u8]) -> io::Result<IntervallDecompressor> {
let (data, footer_len_bytes) = data.split_at(data.len() - 4);
let footer_len = u32::from_le_bytes(footer_len_bytes.try_into().unwrap());
let data = &data[data.len() - footer_len as usize..];
let (_header_flags, data) = data.split_at(8);
let (null_compact_space, data) = deserialize_vint_u128(data)?;
let (min_value, data) = deserialize_vint_u128(data)?;
let (max_value, data) = deserialize_vint_u128(data)?;
let (mut data, compact_space) = CompactSpace::deserialize(data).unwrap();
let num_bits = data[0];
data = &data[1..];
let (num_vals, _data) = deserialize_vint_u128(data)?;
let decompressor = IntervallDecompressor {
null_compact_space: null_compact_space as u64,
min_value,
max_value,
compact_space,
num_vals: num_vals as usize,
bit_unpacker: BitUnpacker::new(num_bits),
};
Ok(decompressor)
}
/// Converting to compact space for the decompressor is more complex, since we may get values
/// which are outside the compact space. E.g. if we map
/// 1000 => 5
/// 2000 => 6
///
/// and we want a mapping for 1005, there is no equivalent compact value. Instead we return an
/// error carrying the index of the covered range just below the value.
fn to_compact(&self, ip_addr: u128) -> Result<u64, usize> {
self.compact_space.to_compact(ip_addr)
}
fn compact_to_ip_addr(&self, compact: u64) -> u128 {
self.compact_space.unpack_ip(compact)
}
/// Comparing on compact space: 1.2 GElements/s
///
/// Comparing on original space: 0.06 GElements/s (not completely optimized)
pub fn get_range(&self, range: RangeInclusive<u128>, data: &[u8]) -> Vec<usize> {
let from_ip_addr = *range.start();
let to_ip_addr = *range.end();
assert!(to_ip_addr >= from_ip_addr);
let compact_from = self.to_compact(from_ip_addr);
let compact_to = self.to_compact(to_ip_addr);
// Quick exit: if both bounds fall into the same non-mapped space, the range can't cover
// any values.
match (compact_to, compact_from) {
(Err(pos1), Err(pos2)) if pos1 == pos2 => return vec![],
_ => {}
}
let compact_from = compact_from.unwrap_or_else(|pos| {
let range_and_compact_start = self.compact_space.get_range_and_compact_start(pos);
let compact_end = range_and_compact_start.1
+ (range_and_compact_start.0.end() - range_and_compact_start.0.start()) as u64;
compact_end + 1
});
// If the upper bound is not covered, snap it down to the end of the covered range below it.
let compact_to = compact_to.unwrap_or_else(|pos| {
let range_and_compact_start = self.compact_space.get_range_and_compact_start(pos);
let compact_end = range_and_compact_start.1
+ (range_and_compact_start.0.end() - range_and_compact_start.0.start()) as u64;
compact_end
});
let range = compact_from..=compact_to;
let mut positions = vec![];
for (pos, compact_ip) in self
.iter_compact(data)
.enumerate()
.filter(|(_pos, val)| *val != self.null_compact_space)
{
if range.contains(&compact_ip) {
positions.push(pos);
}
}
positions
}
#[inline]
pub fn iter_compact<'a>(&'a self, data: &'a [u8]) -> impl Iterator<Item = u64> + 'a {
(0..self.num_vals).map(move |idx| self.bit_unpacker.get(idx as u64, data) as u64)
}
#[inline]
fn iter<'a>(&'a self, data: &'a [u8]) -> impl Iterator<Item = Option<u128>> + 'a {
// TODO: Performance. It would be better to iterate on the ranges and check existence via
// the bit_unpacker.
self.iter_compact(data).map(|compact| {
if compact == self.null_compact_space {
None
} else {
Some(self.compact_to_ip_addr(compact))
}
})
}
pub fn get(&self, idx: u64, data: &[u8]) -> Option<u128> {
let compact = self.bit_unpacker.get(idx, data);
if compact == self.null_compact_space {
None
} else {
Some(self.compact_to_ip_addr(compact))
}
}
pub fn min_value(&self) -> u128 {
self.min_value
}
pub fn max_value(&self) -> u128 {
self.max_value
}
}
impl IntervalEncoding {
pub fn train(&self, mut vals: Vec<u128>) -> IntervalCompressor {
vals.sort();
train(&vals)
}
}
#[cfg(test)]
mod tests {
use super::*;
fn decode_all(data: &[u8]) -> Vec<u128> {
let decompressor = IntervallDecompressor::open(data).unwrap();
let mut u128_vals = Vec::new();
for idx in 0..decompressor.num_vals as usize {
let val = decompressor.get(idx as u64, data);
if let Some(val) = val {
u128_vals.push(val);
}
}
u128_vals
}
fn test_aux_vals(encoder: &IntervalEncoding, u128_vals: &[u128]) -> Vec<u8> {
let compressor = encoder.train(u128_vals.to_vec());
let data = compressor.compress(u128_vals).unwrap();
let decoded_val = decode_all(&data);
assert_eq!(&decoded_val, u128_vals);
data
}
#[test]
fn test_range_1() {
let vals = &[
1u128,
100u128,
3u128,
99999u128,
100000u128,
100001u128,
4_000_211_221u128,
4_000_211_222u128,
333u128,
];
let interval_encoding = IntervalEncoding::default();
let data = test_aux_vals(&interval_encoding, vals);
let decomp = IntervallDecompressor::open(&data).unwrap();
let positions = decomp.get_range(0..=1, &data);
assert_eq!(positions, vec![0]);
let positions = decomp.get_range(0..=2, &data);
assert_eq!(positions, vec![0]);
let positions = decomp.get_range(0..=3, &data);
assert_eq!(positions, vec![0, 2]);
assert_eq!(decomp.get_range(99999u128..=99999u128, &data), vec![3]);
assert_eq!(decomp.get_range(99998u128..=100000u128, &data), vec![3, 4]);
assert_eq!(decomp.get_range(99998u128..=99999u128, &data), vec![3]);
assert_eq!(decomp.get_range(99998u128..=99998u128, &data), vec![]);
assert_eq!(decomp.get_range(333u128..=333u128, &data), vec![8]);
assert_eq!(decomp.get_range(332u128..=333u128, &data), vec![8]);
assert_eq!(decomp.get_range(332u128..=334u128, &data), vec![8]);
assert_eq!(decomp.get_range(333u128..=334u128, &data), vec![8]);
assert_eq!(
decomp.get_range(4_000_211_221u128..=5_000_000_000u128, &data),
vec![6, 7]
);
}
#[test]
fn test_empty() {
let vals = &[];
let interval_encoding = IntervalEncoding::default();
let data = test_aux_vals(&interval_encoding, vals);
let _decomp = IntervallDecompressor::open(&data).unwrap();
}
#[test]
fn test_range_2() {
let vals = &[
100u128,
99999u128,
100000u128,
100001u128,
4_000_211_221u128,
4_000_211_222u128,
333u128,
];
let interval_encoding = IntervalEncoding::default();
let data = test_aux_vals(&interval_encoding, vals);
let decomp = IntervallDecompressor::open(&data).unwrap();
let positions = decomp.get_range(0..=5, &data);
assert_eq!(positions, vec![]);
let positions = decomp.get_range(0..=100, &data);
assert_eq!(positions, vec![0]);
let positions = decomp.get_range(0..=105, &data);
assert_eq!(positions, vec![0]);
}
#[test]
fn test_null() {
let vals = &[2u128];
let interval_encoding = IntervalEncoding::default().train(vals.to_vec());
let vals = vec![interval_encoding.null_value, 2u128];
let data = interval_encoding.compress(&vals).unwrap();
let decomp = IntervallDecompressor::open(&data).unwrap();
let positions = decomp.get_range(0..=1, &data);
assert_eq!(positions, vec![]);
let positions = decomp.get_range(2..=2, &data);
assert_eq!(positions, vec![1]);
}
#[test]
fn test_first_large_gaps() {
let vals = &[1_000_000_000u128; 100];
let interval_encoding = IntervalEncoding::default();
let _data = test_aux_vals(&interval_encoding, vals);
}
use proptest::prelude::*;
proptest! {
#[test]
fn compress_decompress_random(vals in proptest::collection::vec(any::<u128>()
, 1..1000)) {
let interval_encoding = IntervalEncoding::default();
let _data = test_aux_vals(&interval_encoding, &vals);
}
}
}
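Outside of the unit tests above, a minimal end-to-end sketch of the codec API introduced in this file could look as follows. This is illustrative only: it assumes the `fastfield_codecs` crate layout shown in this diff (`ip_to_u128`, `IntervalEncoding`, `IntervalCompressor`, `IntervallDecompressor`), and the concrete IP literals are made up.

use std::net::IpAddr;
use std::str::FromStr;

use fastfield_codecs::ip_codec::{ip_to_u128, IntervalEncoding, IntervallDecompressor};

fn main() -> std::io::Result<()> {
    // Two dense clusters with a large gap in between: the compact space
    // only spends bits on the covered ranges, not on the gap.
    let ips: Vec<u128> = ["10.0.0.1", "10.0.0.2", "10.0.0.3", "192.168.1.1"]
        .iter()
        .map(|s| ip_to_u128(IpAddr::from_str(s).unwrap()))
        .collect();

    // Train a compressor on the values, compress them, then reopen for reading.
    let compressor = IntervalEncoding::default().train(ips.clone());
    let data = compressor.compress(&ips)?;
    let decompressor = IntervallDecompressor::open(&data)?;

    assert_eq!(decompressor.get(0, &data), Some(ips[0]));
    // Positions (here: indices into `ips`) of all values inside the queried value range.
    assert_eq!(decompressor.get_range(ips[0]..=ips[2], &data), vec![0, 1, 2]);
    Ok(())
}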

View File

@@ -4,8 +4,10 @@ extern crate more_asserts;
use std::io;
use std::io::Write;
use std::ops::RangeInclusive;
pub mod bitpacked;
pub mod ip_codec;
pub mod linearinterpol;
pub mod multilinearinterpol;
@@ -19,10 +21,32 @@ pub trait FastFieldCodecReader: Sized {
fn max_value(&self) -> u64;
}
pub trait FastFieldCodecReaderU128: Sized {
/// reads the metadata and returns the CodecReader
fn open_from_bytes(bytes: &[u8]) -> std::io::Result<Self>;
/// Get value for doc
fn get(&self, doc: u64, data: &[u8]) -> Option<u128>;
/// Iterator
///
/// Replace with opaque type after: https://github.com/rust-lang/rust/issues/63063
fn iter<'a>(&'a self, data: &'a [u8]) -> Box<dyn Iterator<Item = Option<u128>> + 'a>;
/// Get the positions (= doc ids for single-valued fields) whose value lies in the provided range
fn get_between_vals(&self, range: RangeInclusive<u128>, data: &[u8]) -> Vec<usize>;
/// The computed and assigned number value for null values
fn null_value(&self) -> u128;
fn min_value(&self) -> u128;
fn max_value(&self) -> u128;
}
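// Hypothetical helper, not part of this diff: code generic over the new trait can combine
// `iter` with the null handling, e.g. to count the documents that carry a non-null value.
fn count_non_null<R: FastFieldCodecReaderU128>(reader: &R, data: &[u8]) -> usize {
reader.iter(data).flatten().count()
}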
/// The FastFieldSerializerEstimate trait is required on all variants
/// of fast field compressions, to decide which one to choose.
pub trait FastFieldCodecSerializer {
/// A codex needs to provide a unique name and id, which is
/// A codec needs to provide a unique name and id, which is
/// used for debugging and de/serialization.
const NAME: &'static str;
const ID: u8;

View File

@@ -1,11 +1,117 @@
#[macro_use]
extern crate prettytable;
use std::collections::HashSet;
use std::env;
use std::io::BufRead;
use std::net::{IpAddr, Ipv6Addr};
use std::str::FromStr;
use fastfield_codecs::ip_codec::{IntervalEncoding, IntervallDecompressor};
use fastfield_codecs::linearinterpol::LinearInterpolFastFieldSerializer;
use fastfield_codecs::multilinearinterpol::MultiLinearInterpolFastFieldSerializer;
use fastfield_codecs::{FastFieldCodecSerializer, FastFieldStats};
use itertools::Itertools;
use measure_time::print_time;
use prettytable::{Cell, Row, Table};
fn print_set_stats(ip_addrs: &[u128]) {
println!("NumIps\t{}", ip_addrs.len());
let ip_addr_set: HashSet<u128> = ip_addrs.iter().cloned().collect();
println!("NumUniqueIps\t{}", ip_addr_set.len());
let ratio_unique = ip_addr_set.len() as f64 / ip_addrs.len() as f64;
println!("RatioUniqueOverTotal\t{ratio_unique:.4}");
// histogram
let mut ip_addrs = ip_addrs.to_vec();
ip_addrs.sort();
let mut cnts: Vec<usize> = ip_addrs
.into_iter()
.dedup_with_count()
.map(|(cnt, _)| cnt)
.collect();
cnts.sort();
let top_256_cnt: usize = cnts.iter().rev().take(256).sum();
let top_128_cnt: usize = cnts.iter().rev().take(128).sum();
let top_64_cnt: usize = cnts.iter().rev().take(64).sum();
let top_8_cnt: usize = cnts.iter().rev().take(8).sum();
let total: usize = cnts.iter().sum();
println!("{}", total);
println!("{}", top_256_cnt);
println!("{}", top_128_cnt);
println!("Percentage Top8 {:02}", top_8_cnt as f32 / total as f32);
println!("Percentage Top64 {:02}", top_64_cnt as f32 / total as f32);
println!("Percentage Top128 {:02}", top_128_cnt as f32 / total as f32);
println!("Percentage Top256 {:02}", top_256_cnt as f32 / total as f32);
let mut cnts: Vec<(usize, usize)> = cnts.into_iter().dedup_with_count().collect();
cnts.sort_by(|a, b| {
if a.1 == b.1 {
a.0.cmp(&b.0)
} else {
b.1.cmp(&a.1)
}
});
println!("\n\n----\nIP Address histogram");
println!("IPAddrCount\tFrequency");
for (ip_addr_count, times) in cnts {
println!("{}\t{}", ip_addr_count, times);
}
}
fn ip_dataset() -> Vec<u128> {
let mut ip_addr_v4 = 0;
let stdin = std::io::stdin();
let ip_addrs: Vec<u128> = stdin
.lock()
.lines()
.flat_map(|line| {
let line = line.unwrap();
let line = line.trim();
let ip_addr = IpAddr::from_str(line.trim()).ok()?;
if ip_addr.is_ipv4() {
ip_addr_v4 += 1;
}
let ip_addr_v6: Ipv6Addr = match ip_addr {
IpAddr::V4(v4) => v4.to_ipv6_mapped(),
IpAddr::V6(v6) => v6,
};
Some(ip_addr_v6)
})
.map(|ip_v6| u128::from_be_bytes(ip_v6.octets()))
.collect();
println!("IpAddrsAny\t{}", ip_addrs.len());
println!("IpAddrsV4\t{}", ip_addr_v4);
ip_addrs
}
fn bench_ip() {
let encoding = IntervalEncoding();
let dataset = ip_dataset();
print_set_stats(&dataset);
let compressor = encoding.train(dataset.to_vec());
let data = compressor.compress(&dataset).unwrap();
let decompressor = IntervallDecompressor::open(&data).unwrap();
for i in 11100..11150 {
print_time!("get range");
let doc_values = decompressor.get_range(dataset[i]..=dataset[i], &data);
println!("{:?}", doc_values.len());
}
}
fn main() {
if env::args().nth(1).unwrap() == "bench" {
bench_ip();
return;
}
let mut table = Table::new();
// Add a row per time

View File

@@ -38,7 +38,7 @@ impl BinarySerializable for FileAddr {
/// A `CompositeWrite` is used to write a `CompositeFile`.
pub struct CompositeWrite<W = WritePtr> {
write: CountingWriter<W>,
offsets: HashMap<FileAddr, u64>,
offsets: Vec<(FileAddr, u64)>,
}
impl<W: TerminatingWrite + Write> CompositeWrite<W> {
@@ -47,7 +47,7 @@ impl<W: TerminatingWrite + Write> CompositeWrite<W> {
pub fn wrap(w: W) -> CompositeWrite<W> {
CompositeWrite {
write: CountingWriter::wrap(w),
offsets: HashMap::new(),
offsets: Vec::new(),
}
}
@@ -60,8 +60,8 @@ impl<W: TerminatingWrite + Write> CompositeWrite<W> {
pub fn for_field_with_idx(&mut self, field: Field, idx: usize) -> &mut CountingWriter<W> {
let offset = self.write.written_bytes();
let file_addr = FileAddr::new(field, idx);
assert!(!self.offsets.contains_key(&file_addr));
self.offsets.insert(file_addr, offset);
assert!(!self.offsets.iter().any(|el| el.0 == file_addr));
self.offsets.push((file_addr, offset));
&mut self.write
}
@@ -73,16 +73,8 @@ impl<W: TerminatingWrite + Write> CompositeWrite<W> {
let footer_offset = self.write.written_bytes();
VInt(self.offsets.len() as u64).serialize(&mut self.write)?;
let mut offset_fields: Vec<_> = self
.offsets
.iter()
.map(|(file_addr, offset)| (*offset, *file_addr))
.collect();
offset_fields.sort();
let mut prev_offset = 0;
for (offset, file_addr) in offset_fields {
for (file_addr, offset) in self.offsets {
VInt((offset - prev_offset) as u64).serialize(&mut self.write)?;
file_addr.serialize(&mut self.write)?;
prev_offset = offset;
@@ -106,6 +98,14 @@ pub struct CompositeFile {
offsets_index: HashMap<FileAddr, Range<usize>>,
}
impl std::fmt::Debug for CompositeFile {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CompositeFile")
.field("offsets_index", &self.offsets_index)
.finish()
}
}
impl CompositeFile {
/// Opens a composite file stored in a given
/// `FileSlice`.

View File

@@ -52,6 +52,11 @@ impl BytesFastFieldReader {
}
impl MultiValueLength for BytesFastFieldReader {
fn get_range(&self, doc_id: DocId) -> std::ops::Range<u64> {
let (start, stop) = self.range(doc_id);
start as u64..stop as u64
}
fn get_len(&self, doc_id: DocId) -> u64 {
self.num_bytes(doc_id) as u64
}

src/fastfield/fast_value.rs (new file, 241 lines)
View File

@@ -0,0 +1,241 @@
use std::net::{IpAddr, Ipv6Addr};
use crate::schema::{Cardinality, FieldType, Type};
use crate::DateTime;
pub fn ip_to_u128(ip_addr: IpAddr) -> u128 {
let ip_addr_v6: Ipv6Addr = match ip_addr {
IpAddr::V4(v4) => v4.to_ipv6_mapped(),
IpAddr::V6(v6) => v6,
};
u128::from_be_bytes(ip_addr_v6.octets())
}
/// Trait for large types that are allowed for fast fields: u128, IpAddr
pub trait FastValueU128: Clone + Copy + Send + Sync + PartialOrd + 'static {
/// Converts a value from u128
///
/// Internally all fast field values are encoded as u128.
fn from_u128(val: u128) -> Self;
/// Converts a value to u128.
///
/// Internally all fast field values are encoded as u128.
fn to_u128(&self) -> u128;
/// Cast value to `u128`.
/// The value is just reinterpreted in memory.
fn as_u128(&self) -> u128;
/// Returns the `schema::Type` for this FastValue.
fn to_type() -> Type;
/// Build a default value. This default value is never used, so the value does not
/// really matter.
fn make_zero() -> Self {
Self::from_u128(0u128)
}
}
impl FastValueU128 for u128 {
fn from_u128(val: u128) -> Self {
val
}
fn to_u128(&self) -> u128 {
*self
}
fn as_u128(&self) -> u128 {
*self
}
fn to_type() -> Type {
Type::U128
}
}
impl FastValueU128 for IpAddr {
fn from_u128(val: u128) -> Self {
IpAddr::from(val.to_be_bytes())
}
fn to_u128(&self) -> u128 {
ip_to_u128(*self)
}
fn as_u128(&self) -> u128 {
ip_to_u128(*self)
}
fn to_type() -> Type {
Type::Ip
}
}
/// Trait for types that are allowed for fast fields:
/// (u64, i64 and f64, bool, DateTime).
pub trait FastValue: Clone + Copy + Send + Sync + PartialOrd + 'static {
/// Converts a value from u64
///
/// Internally all fast field values are encoded as u64.
/// **Note: To be used for converting encoded Term, Posting values.**
fn from_u64(val: u64) -> Self;
/// Converts a value to u64.
///
/// Internally all fast field values are encoded as u64.
fn to_u64(&self) -> u64;
/// Returns the fast field cardinality that can be extracted from the given
/// `FieldType`.
///
/// If the type is not a fast field, `None` is returned.
fn fast_field_cardinality(field_type: &FieldType) -> Option<Cardinality>;
/// Cast value to `u64`.
/// The value is just reinterpreted in memory.
fn as_u64(&self) -> u64;
/// Build a default value. This default value is never used, so the value does not
/// really matter.
fn make_zero() -> Self {
Self::from_u64(0i64.to_u64())
}
/// Returns the `schema::Type` for this FastValue.
fn to_type() -> Type;
}
impl FastValue for u64 {
fn from_u64(val: u64) -> Self {
val
}
fn to_u64(&self) -> u64 {
*self
}
fn fast_field_cardinality(field_type: &FieldType) -> Option<Cardinality> {
match *field_type {
FieldType::U64(ref integer_options) => integer_options.get_fastfield_cardinality(),
FieldType::Facet(_) => Some(Cardinality::MultiValues),
_ => None,
}
}
fn as_u64(&self) -> u64 {
*self
}
fn to_type() -> Type {
Type::U64
}
}
impl FastValue for i64 {
fn from_u64(val: u64) -> Self {
common::u64_to_i64(val)
}
fn to_u64(&self) -> u64 {
common::i64_to_u64(*self)
}
fn fast_field_cardinality(field_type: &FieldType) -> Option<Cardinality> {
match *field_type {
FieldType::I64(ref integer_options) => integer_options.get_fastfield_cardinality(),
_ => None,
}
}
fn as_u64(&self) -> u64 {
*self as u64
}
fn to_type() -> Type {
Type::I64
}
}
impl FastValue for f64 {
fn from_u64(val: u64) -> Self {
common::u64_to_f64(val)
}
fn to_u64(&self) -> u64 {
common::f64_to_u64(*self)
}
fn fast_field_cardinality(field_type: &FieldType) -> Option<Cardinality> {
match *field_type {
FieldType::F64(ref integer_options) => integer_options.get_fastfield_cardinality(),
_ => None,
}
}
fn as_u64(&self) -> u64 {
self.to_bits()
}
fn to_type() -> Type {
Type::F64
}
}
impl FastValue for bool {
fn from_u64(val: u64) -> Self {
val != 0u64
}
fn to_u64(&self) -> u64 {
match self {
false => 0,
true => 1,
}
}
fn fast_field_cardinality(field_type: &FieldType) -> Option<Cardinality> {
match *field_type {
FieldType::Bool(ref integer_options) => integer_options.get_fastfield_cardinality(),
_ => None,
}
}
fn as_u64(&self) -> u64 {
*self as u64
}
fn to_type() -> Type {
Type::Bool
}
}
impl FastValue for DateTime {
/// Converts a timestamp in microseconds into a DateTime.
///
/// **Note: the timestamp is expected to be in microseconds.**
fn from_u64(timestamp_micros_u64: u64) -> Self {
let timestamp_micros = i64::from_u64(timestamp_micros_u64);
Self::from_timestamp_micros(timestamp_micros)
}
fn to_u64(&self) -> u64 {
common::i64_to_u64(self.into_timestamp_micros())
}
fn fast_field_cardinality(field_type: &FieldType) -> Option<Cardinality> {
match *field_type {
FieldType::Date(ref options) => options.get_fastfield_cardinality(),
_ => None,
}
}
fn as_u64(&self) -> u64 {
self.into_timestamp_micros().as_u64()
}
fn to_type() -> Type {
Type::Date
}
}
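A caveat of the `IpAddr` implementation above: conversions go through the IPv6-mapped representation, so a plain IPv4 address does not round-trip to `IpAddr::V4` but to its `::ffff:a.b.c.d` form. A small sketch of that behaviour, assuming the `tantivy::fastfield::FastValueU128` re-export added in `src/fastfield/mod.rs` below; the concrete address is made up.

use std::net::IpAddr;
use std::str::FromStr;

use tantivy::fastfield::FastValueU128;

fn main() {
    let v4 = IpAddr::from_str("10.0.0.1").unwrap();
    // to_u128 maps the V4 address into the IPv6-mapped u128 space ...
    let as_u128 = v4.to_u128();
    // ... and from_u128 always rebuilds an IpAddr::V6.
    let restored = IpAddr::from_u128(as_u128);
    assert!(matches!(restored, IpAddr::V6(_)));
    assert_eq!(restored, IpAddr::from_str("::ffff:10.0.0.1").unwrap());
}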

View File

@@ -20,24 +20,30 @@
//!
//! Read access performance is comparable to that of an array lookup.
use std::collections::btree_map::Range;
pub use self::alive_bitset::{intersect_alive_bitsets, write_alive_bitset, AliveBitSet};
pub use self::bytes::{BytesFastFieldReader, BytesFastFieldWriter};
pub use self::error::{FastFieldNotAvailableError, Result};
pub use self::facet_reader::FacetReader;
pub use self::fast_value::{FastValue, FastValueU128};
pub(crate) use self::gcd::{find_gcd, GCDFastFieldCodec, GCD_CODEC_ID, GCD_DEFAULT};
pub use self::multivalued::{MultiValuedFastFieldReader, MultiValuedFastFieldWriter};
pub use self::reader::{DynamicFastFieldReader, FastFieldReader};
pub use self::multivalued::{
MultiValuedFastFieldReader, MultiValuedFastFieldWriter, MultiValuedU128FastFieldReader,
};
pub use self::reader::{DynamicFastFieldReader, FastFieldReader, FastFieldReaderCodecWrapperU128};
pub use self::readers::FastFieldReaders;
pub(crate) use self::readers::{type_and_cardinality, FastType};
pub use self::serializer::{CompositeFastFieldSerializer, FastFieldDataAccess, FastFieldStats};
pub use self::writer::{FastFieldsWriter, IntFastFieldWriter};
use crate::schema::{Cardinality, FieldType, Type, Value};
use crate::{DateTime, DocId};
use crate::schema::Value;
use crate::DocId;
mod alive_bitset;
mod bytes;
mod error;
mod facet_reader;
mod fast_value;
mod gcd;
mod multivalued;
mod reader;
@@ -57,182 +63,6 @@ pub(crate) const ALL_CODECS: &[FastFieldCodecName; 3] = &[
FastFieldCodecName::BlockwiseLinearInterpol,
];
/// Trait for `BytesFastFieldReader` and `MultiValuedFastFieldReader` to return the length of data
/// for a doc_id
pub trait MultiValueLength {
/// returns the num of values associated to a doc_id
fn get_len(&self, doc_id: DocId) -> u64;
/// returns the sum of num values for all doc_ids
fn get_total_len(&self) -> u64;
}
/// Trait for types that are allowed for fast fields:
/// (u64, i64 and f64, bool, DateTime).
pub trait FastValue: Clone + Copy + Send + Sync + PartialOrd + 'static {
/// Converts a value from u64
///
/// Internally all fast field values are encoded as u64.
/// **Note: To be used for converting encoded Term, Posting values.**
fn from_u64(val: u64) -> Self;
/// Converts a value to u64.
///
/// Internally all fast field values are encoded as u64.
fn to_u64(&self) -> u64;
/// Returns the fast field cardinality that can be extracted from the given
/// `FieldType`.
///
/// If the type is not a fast field, `None` is returned.
fn fast_field_cardinality(field_type: &FieldType) -> Option<Cardinality>;
/// Cast value to `u64`.
/// The value is just reinterpreted in memory.
fn as_u64(&self) -> u64;
/// Build a default value. This default value is never used, so the value does not
/// really matter.
fn make_zero() -> Self {
Self::from_u64(0i64.to_u64())
}
/// Returns the `schema::Type` for this FastValue.
fn to_type() -> Type;
}
impl FastValue for u64 {
fn from_u64(val: u64) -> Self {
val
}
fn to_u64(&self) -> u64 {
*self
}
fn fast_field_cardinality(field_type: &FieldType) -> Option<Cardinality> {
match *field_type {
FieldType::U64(ref integer_options) => integer_options.get_fastfield_cardinality(),
FieldType::Facet(_) => Some(Cardinality::MultiValues),
_ => None,
}
}
fn as_u64(&self) -> u64 {
*self
}
fn to_type() -> Type {
Type::U64
}
}
impl FastValue for i64 {
fn from_u64(val: u64) -> Self {
common::u64_to_i64(val)
}
fn to_u64(&self) -> u64 {
common::i64_to_u64(*self)
}
fn fast_field_cardinality(field_type: &FieldType) -> Option<Cardinality> {
match *field_type {
FieldType::I64(ref integer_options) => integer_options.get_fastfield_cardinality(),
_ => None,
}
}
fn as_u64(&self) -> u64 {
*self as u64
}
fn to_type() -> Type {
Type::I64
}
}
impl FastValue for f64 {
fn from_u64(val: u64) -> Self {
common::u64_to_f64(val)
}
fn to_u64(&self) -> u64 {
common::f64_to_u64(*self)
}
fn fast_field_cardinality(field_type: &FieldType) -> Option<Cardinality> {
match *field_type {
FieldType::F64(ref integer_options) => integer_options.get_fastfield_cardinality(),
_ => None,
}
}
fn as_u64(&self) -> u64 {
self.to_bits()
}
fn to_type() -> Type {
Type::F64
}
}
impl FastValue for bool {
fn from_u64(val: u64) -> Self {
val != 0u64
}
fn to_u64(&self) -> u64 {
match self {
false => 0,
true => 1,
}
}
fn fast_field_cardinality(field_type: &FieldType) -> Option<Cardinality> {
match *field_type {
FieldType::Bool(ref integer_options) => integer_options.get_fastfield_cardinality(),
_ => None,
}
}
fn as_u64(&self) -> u64 {
*self as u64
}
fn to_type() -> Type {
Type::Bool
}
}
impl FastValue for DateTime {
/// Converts a timestamp microseconds into DateTime.
///
/// **Note the timestamps is expected to be in microseconds.**
fn from_u64(timestamp_micros_u64: u64) -> Self {
let timestamp_micros = i64::from_u64(timestamp_micros_u64);
Self::from_timestamp_micros(timestamp_micros)
}
fn to_u64(&self) -> u64 {
common::i64_to_u64(self.into_timestamp_micros())
}
fn fast_field_cardinality(field_type: &FieldType) -> Option<Cardinality> {
match *field_type {
FieldType::Date(ref options) => options.get_fastfield_cardinality(),
_ => None,
}
}
fn as_u64(&self) -> u64 {
self.into_timestamp_micros().as_u64()
}
fn to_type() -> Type {
Type::Date
}
}
fn value_to_u64(value: &Value) -> u64 {
match value {
Value::U64(val) => val.to_u64(),
@@ -244,6 +74,17 @@ fn value_to_u64(value: &Value) -> u64 {
}
}
/// Trait for `BytesFastFieldReader` and `MultiValuedFastFieldReader` to return the length of data
/// for a doc_id
pub trait MultiValueLength {
/// returns the positions of values associated to a doc_id
fn get_range(&self, doc_id: DocId) -> std::ops::Range<u64>;
/// returns the num of values associated to a doc_id
fn get_len(&self, doc_id: DocId) -> u64;
/// returns the sum of num values for all doc_ids
fn get_total_len(&self) -> u64;
}
/// The fast field type
pub enum FastFieldType {
/// Numeric type, e.g. f64.
@@ -268,6 +109,7 @@ impl FastFieldType {
mod tests {
use std::collections::HashMap;
use std::net::IpAddr;
use std::ops::Range;
use std::path::Path;
@@ -280,9 +122,11 @@ mod tests {
use super::*;
use crate::directory::{CompositeFile, Directory, RamDirectory, WritePtr};
use crate::merge_policy::NoMergePolicy;
use crate::schema::{Document, Field, Schema, FAST, STRING, TEXT};
use crate::schema::{
self, Cardinality, Document, Field, IpOptions, Schema, FAST, INDEXED, STORED, STRING, TEXT,
};
use crate::time::OffsetDateTime;
use crate::{DateOptions, DatePrecision, Index, SegmentId, SegmentReader};
use crate::{DateOptions, DatePrecision, DateTime, Index, SegmentId, SegmentReader};
pub static SCHEMA: Lazy<Schema> = Lazy::new(|| {
let mut schema_builder = Schema::builder();
@@ -308,7 +152,7 @@ mod tests {
}
#[test]
pub fn test_fastfield_i64_u64() {
pub fn test_datetime_conversion() {
let datetime = DateTime::from_utc(OffsetDateTime::UNIX_EPOCH);
assert_eq!(i64::from_u64(datetime.to_u64()), 0i64);
}
@@ -617,6 +461,85 @@ mod tests {
all
}
#[test]
fn test_ip_fastfield_minimal() -> crate::Result<()> {
let mut schema_builder = schema::Schema::builder();
let ip_field = schema_builder.add_ip_field("ip", FAST | INDEXED | STORED);
let ips_field = schema_builder.add_ip_field(
"ips",
IpOptions::default().set_fast(Cardinality::MultiValues),
);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let ip1 = IpAddr::from((1_u128).to_be_bytes());
let ip2 = IpAddr::from((2_u128).to_be_bytes());
let ip3 = IpAddr::from((3_u128).to_be_bytes());
let mut index_writer = index.writer_for_tests()?;
index_writer.set_merge_policy(Box::new(NoMergePolicy));
index_writer.add_document(doc!())?;
index_writer.add_document(doc!(
ip_field => ip2,
ips_field => ip2,
ips_field => ip2,
))?;
index_writer.commit()?;
let reader = index.reader()?;
let searcher = reader.searcher();
assert_eq!(searcher.segment_readers().len(), 1);
let segment_reader = searcher.segment_reader(0);
let fast_fields = segment_reader.fast_fields();
// single value
let ip_addr_fast_field = fast_fields.ip_addr(ip_field).unwrap();
assert_eq!(ip_addr_fast_field.get_val(0), None);
assert_eq!(ip_addr_fast_field.get_val(1), Some(ip2));
assert_eq!(ip_addr_fast_field.get_between_vals(ip2..=ip2), vec![1]);
assert_eq!(ip_addr_fast_field.get_between_vals(ip1..=ip2), vec![1]);
assert_eq!(ip_addr_fast_field.get_between_vals(ip2..=ip3), vec![1]);
assert_eq!(ip_addr_fast_field.get_between_vals(ip1..=ip3), vec![1]);
assert_eq!(
ip_addr_fast_field.get_between_vals(ip1..=ip1),
vec![] as Vec<usize>
);
assert_eq!(
ip_addr_fast_field.get_between_vals(ip3..=ip3),
vec![] as Vec<usize>
);
// multi value
let ip_addr_fast_field = fast_fields.ip_addrs(ips_field).unwrap();
assert_eq!(ip_addr_fast_field.get_first_val(0), None);
assert_eq!(ip_addr_fast_field.get_first_val(1), Some(ip2));
let mut out = vec![];
ip_addr_fast_field.get_vals(0, &mut out);
assert_eq!(out, vec![] as Vec<IpAddr>);
let mut out = vec![];
ip_addr_fast_field.get_vals(1, &mut out);
assert_eq!(out, vec![ip2, ip2]);
assert_eq!(ip_addr_fast_field.get_between_vals(ip2..=ip2), vec![1]);
assert_eq!(ip_addr_fast_field.get_between_vals(ip1..=ip2), vec![1]);
assert_eq!(ip_addr_fast_field.get_between_vals(ip2..=ip3), vec![1]);
assert_eq!(ip_addr_fast_field.get_between_vals(ip1..=ip3), vec![1]);
assert_eq!(
ip_addr_fast_field.get_between_vals(ip1..=ip1),
vec![] as Vec<usize>
);
assert_eq!(
ip_addr_fast_field.get_between_vals(ip3..=ip3),
vec![] as Vec<usize>
);
Ok(())
}
#[test]
fn test_text_fastfield() -> crate::Result<()> {
let mut schema_builder = Schema::builder();

View File

@@ -1,8 +1,8 @@
mod reader;
mod writer;
pub use self::reader::MultiValuedFastFieldReader;
pub use self::writer::MultiValuedFastFieldWriter;
pub use self::reader::{MultiValuedFastFieldReader, MultiValuedU128FastFieldReader};
pub use self::writer::{MultiValuedFastFieldWriter, U128MultiValueFastFieldWriter};
#[cfg(test)]
mod tests {

View File

@@ -1,6 +1,11 @@
use std::ops::Range;
use std::ops::{Range, RangeInclusive};
use crate::fastfield::{DynamicFastFieldReader, FastFieldReader, FastValue, MultiValueLength};
use fastfield_codecs::ip_codec::IntervallDecompressor;
use crate::fastfield::{
DynamicFastFieldReader, FastFieldReader, FastFieldReaderCodecWrapperU128, FastValue,
FastValueU128, MultiValueLength,
};
use crate::DocId;
/// Reader for a multivalued `u64` fast field.
@@ -84,6 +89,155 @@ impl<Item: FastValue> MultiValuedFastFieldReader<Item> {
}
impl<Item: FastValue> MultiValueLength for MultiValuedFastFieldReader<Item> {
fn get_range(&self, doc_id: DocId) -> std::ops::Range<u64> {
self.range(doc_id)
}
fn get_len(&self, doc_id: DocId) -> u64 {
self.num_vals(doc_id) as u64
}
fn get_total_len(&self) -> u64 {
self.total_num_vals() as u64
}
}
/// Reader for a multivalued `u128` fast field.
///
/// The reader is implemented as a `u64` fast field for the index and a `u128` fast field for
/// the values.
///
/// The `vals_reader` accesses the concatenated list of all
/// values for all documents.
/// The `idx_reader` associates, for each document, the index of its first value.
#[derive(Clone)]
pub struct MultiValuedU128FastFieldReader<Item: FastValueU128> {
idx_reader: DynamicFastFieldReader<u64>,
vals_reader: FastFieldReaderCodecWrapperU128<Item, IntervallDecompressor>,
}
impl<Item: FastValueU128> MultiValuedU128FastFieldReader<Item> {
pub(crate) fn open(
idx_reader: DynamicFastFieldReader<u64>,
vals_reader: FastFieldReaderCodecWrapperU128<Item, IntervallDecompressor>,
) -> MultiValuedU128FastFieldReader<Item> {
Self {
idx_reader,
vals_reader,
}
}
/// Returns `[start, end)`, such that the values associated
/// to the given document are `start..end`.
#[inline]
fn range(&self, doc: DocId) -> Range<u64> {
let start = self.idx_reader.get(doc);
let end = self.idx_reader.get(doc + 1);
start..end
}
/// Returns the first value associated with the given `doc`, if any.
#[inline]
pub fn get_first_val(&self, doc: DocId) -> Option<Item> {
let range = self.range(doc);
if range.is_empty() {
return None;
}
self.vals_reader.get_val(range.start)
}
/// Fills `vals` with the values in the given position range.
#[inline]
fn get_vals_for_range(&self, range: Range<u64>, vals: &mut Vec<Item>) {
let len = (range.end - range.start) as usize;
vals.resize(len, Item::make_zero());
self.vals_reader.get_range(range.start, &mut vals[..]);
}
/// Returns the array of values associated to the given `doc`.
#[inline]
pub fn get_vals(&self, doc: DocId, vals: &mut Vec<Item>) {
let range = self.range(doc);
self.get_vals_for_range(range, vals);
}
/// Returns all docids which are in the provided value range
pub fn get_between_vals(&self, range: RangeInclusive<Item>) -> Vec<DocId> {
let positions = self.vals_reader.get_between_vals(range);
positions_to_docids(&positions, self)
}
/// Iterates over all elements in the fast field
pub fn iter(&self) -> impl Iterator<Item = Option<Item>> + '_ {
self.vals_reader.iter()
}
/// Returns the minimum value for this fast field.
///
/// The min value does not take possible deleted documents
/// into account, and should be considered as a lower bound
/// of the actual minimum value.
pub fn min_value(&self) -> Item {
self.vals_reader.min_value()
}
/// Returns the maximum value for this fast field.
///
/// The max value does not take possible deleted documents
/// into account, and should be considered as an upper bound
/// of the actual maximum value.
pub fn max_value(&self) -> Item {
self.vals_reader.max_value()
}
/// Returns the number of values associated with the document `DocId`.
#[inline]
pub fn num_vals(&self, doc: DocId) -> usize {
let range = self.range(doc);
(range.end - range.start) as usize
}
/// Returns the overall number of values in this field.
#[inline]
pub fn total_num_vals(&self) -> u64 {
self.idx_reader.max_value()
}
}
/// Converts a list of positions of values in a 1:n index to the corresponding list of DocIds.
///
/// Since there is only an index from docid to value-position range (and none from value position
/// to docid), we scan the index.
///
/// Correctness: `positions` must be sorted.
///
/// TODO: Instead of a linear scan we could use a binary search to match a docid to its value
/// position.
fn positions_to_docids<T: MultiValueLength>(positions: &[usize], multival_idx: &T) -> Vec<DocId> {
let mut docs = vec![];
let mut cur_doc = 0u32;
let mut last_doc = None;
for pos in positions {
loop {
let range = multival_idx.get_range(cur_doc);
if range.contains(&(*pos as u64)) {
// avoid duplicates
if Some(cur_doc) == last_doc {
break;
}
docs.push(cur_doc);
last_doc = Some(cur_doc);
break;
}
cur_doc += 1;
}
}
docs
}
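// Illustrative sketch, not part of this diff: with an index column [0, 0, 2, 3]
// (doc 0 has no values, doc 1 owns value positions 0..2, doc 2 owns position 2),
// the sorted value positions [0, 2] resolve to the doc ids [1, 2].
#[cfg(test)]
mod positions_to_docids_sketch {
use super::positions_to_docids;
use crate::fastfield::MultiValueLength;
use crate::DocId;
struct FakeIndex(Vec<u64>);
impl MultiValueLength for FakeIndex {
fn get_range(&self, doc_id: DocId) -> std::ops::Range<u64> {
self.0[doc_id as usize]..self.0[doc_id as usize + 1]
}
fn get_len(&self, doc_id: DocId) -> u64 {
let range = self.get_range(doc_id);
range.end - range.start
}
fn get_total_len(&self) -> u64 {
*self.0.last().unwrap()
}
}
#[test]
fn positions_to_docids_sketch() {
let idx = FakeIndex(vec![0, 0, 2, 3]);
assert_eq!(positions_to_docids(&[0, 2], &idx), vec![1, 2]);
}
}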
impl<Item: FastValueU128> MultiValueLength for MultiValuedU128FastFieldReader<Item> {
fn get_range(&self, doc_id: DocId) -> std::ops::Range<u64> {
self.range(doc_id)
}
fn get_len(&self, doc_id: DocId) -> u64 {
self.num_vals(doc_id) as u64
}
@@ -92,6 +246,7 @@ impl<Item: FastValue> MultiValueLength for MultiValuedFastFieldReader<Item> {
self.total_num_vals() as u64
}
}
#[cfg(test)]
mod tests {

View File

@@ -1,5 +1,6 @@
use std::io;
use fastfield_codecs::ip_codec::{ip_to_u128, IntervalCompressor};
use fnv::FnvHashMap;
use tantivy_bitpacker::minmax;
@@ -120,25 +121,9 @@ impl MultiValuedFastFieldWriter {
&'a self,
doc_id_map: Option<&'b DocIdMapping>,
) -> impl Iterator<Item = &'b [u64]> {
let doc_id_iter: Box<dyn Iterator<Item = u32>> = if let Some(doc_id_map) = doc_id_map {
Box::new(doc_id_map.iter_old_doc_ids())
} else {
let max_doc = self.doc_index.len() as DocId;
Box::new(0..max_doc)
};
doc_id_iter.map(move |doc_id| self.get_values_for_doc_id(doc_id))
get_ordered_values(&self.vals, &self.doc_index, doc_id_map)
}
/// returns all values for a doc_ids
fn get_values_for_doc_id(&self, doc_id: u32) -> &[u64] {
let start_pos = self.doc_index[doc_id as usize] as usize;
let end_pos = self
.doc_index
.get(doc_id as usize + 1)
.cloned()
.unwrap_or(self.vals.len() as u64) as usize; // special case, last doc_id has no offset information
&self.vals[start_pos..end_pos]
}
/// Serializes fast field values by pushing them to the `FastFieldSerializer`.
///
/// If a mapping is given, the values are remapped *and sorted* before serialization.
@@ -220,3 +205,132 @@ impl MultiValuedFastFieldWriter {
Ok(())
}
}
/// Writer for multi-valued (as in, more than one value per document)
/// u128 fast fields.
///
/// This `Writer` is only useful for advanced users.
/// The normal way to get your multivalued values into your index
/// is to
/// - declare your field with fast set to `Cardinality::MultiValues`
/// in your schema,
/// - add your documents simply by calling `.add_document(...)`.
///
/// The `U128MultiValueFastFieldWriter` can be acquired from the fast field writer
/// (`FastFieldsWriter`).
pub struct U128MultiValueFastFieldWriter {
field: Field,
vals: Vec<u128>,
doc_index: Vec<u64>,
}
impl U128MultiValueFastFieldWriter {
/// Creates a new `U128MultiValueFastFieldWriter`
pub(crate) fn new(field: Field) -> Self {
U128MultiValueFastFieldWriter {
field,
vals: Vec::new(),
doc_index: Vec::new(),
}
}
/// The memory used (including child allocations)
pub fn mem_usage(&self) -> usize {
self.vals.capacity() * std::mem::size_of::<UnorderedTermId>()
+ self.doc_index.capacity() * std::mem::size_of::<u64>()
}
/// Finalize the current document.
pub(crate) fn next_doc(&mut self) {
self.doc_index.push(self.vals.len() as u64);
}
/// Pushes a new value to the current document.
pub(crate) fn add_val(&mut self, val: u128) {
self.vals.push(val);
}
/// Shift to the next document and adds
/// all of the matching field values present in the document.
pub fn add_document(&mut self, doc: &Document) {
self.next_doc();
for field_value in doc.field_values() {
if field_value.field == self.field {
let value = field_value.value();
let ip_addr = value.as_ip().unwrap();
let value = ip_to_u128(ip_addr);
self.add_val(value);
}
}
}
/// Returns an iterator over values per doc_id in ascending doc_id order.
///
/// Normally the order is simply iterating self.doc_id_index.
/// With doc_id_map it accounts for the new mapping, returning values in the order of the
/// new doc_ids.
fn get_ordered_values<'a: 'b, 'b>(
&'a self,
doc_id_map: Option<&'b DocIdMapping>,
) -> impl Iterator<Item = &'b [u128]> {
get_ordered_values(&self.vals, &self.doc_index, doc_id_map)
}
/// Serializes fast field values.
pub fn serialize(
&self,
serializer: &mut CompositeFastFieldSerializer,
doc_id_map: Option<&DocIdMapping>,
) -> io::Result<()> {
{
// writing the offset index
let mut doc_index_serializer =
serializer.new_u64_fast_field_with_idx(self.field, 0, self.vals.len() as u64, 0)?;
let mut offset = 0;
for vals in self.get_ordered_values(doc_id_map) {
doc_index_serializer.add_val(offset)?;
offset += vals.len() as u64;
}
doc_index_serializer.add_val(self.vals.len() as u64)?;
doc_index_serializer.close_field()?;
}
{
let field_write = serializer.get_field_writer(self.field, 1);
let compressor = IntervalCompressor::from_vals(self.vals.to_vec());
let iter = self.get_ordered_values(doc_id_map).flatten().cloned();
compressor.compress_into(iter, field_write)?;
}
Ok(())
}
}
/// Returns an iterator over values per doc_id in ascending doc_id order.
///
/// Normally the order is simply iterating self.doc_id_index.
/// With doc_id_map it accounts for the new mapping, returning values in the order of the
/// new doc_ids.
fn get_ordered_values<'a: 'b, 'b, T>(
vals: &'a [T],
doc_index: &'a [u64],
doc_id_map: Option<&'b DocIdMapping>,
) -> impl Iterator<Item = &'b [T]> {
let doc_id_iter: Box<dyn Iterator<Item = u32>> = if let Some(doc_id_map) = doc_id_map {
Box::new(doc_id_map.iter_old_doc_ids())
} else {
let max_doc = doc_index.len() as DocId;
Box::new(0..max_doc)
};
doc_id_iter.map(move |doc_id| get_values_for_doc_id(doc_id, vals, doc_index))
}
/// returns all values for a doc_id
fn get_values_for_doc_id<'a, T>(doc_id: u32, vals: &'a [T], doc_index: &'a [u64]) -> &'a [T] {
let start_pos = doc_index[doc_id as usize] as usize;
let end_pos = doc_index
.get(doc_id as usize + 1)
.cloned()
.unwrap_or(vals.len() as u64) as usize; // special case, last doc_id has no offset information
&vals[start_pos..end_pos]
}

View File

@@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::marker::PhantomData;
use std::ops::RangeInclusive;
use std::path::Path;
use fastfield_codecs::bitpacked::{
@@ -11,9 +12,9 @@ use fastfield_codecs::linearinterpol::{
use fastfield_codecs::multilinearinterpol::{
MultiLinearInterpolFastFieldReader, MultiLinearInterpolFastFieldSerializer,
};
use fastfield_codecs::{FastFieldCodecReader, FastFieldCodecSerializer};
use fastfield_codecs::{FastFieldCodecReader, FastFieldCodecReaderU128, FastFieldCodecSerializer};
use super::{FastValue, GCDFastFieldCodec, GCD_CODEC_ID};
use super::{FastValue, FastValueU128, GCDFastFieldCodec, GCD_CODEC_ID};
use crate::directory::{CompositeFile, Directory, FileSlice, OwnedBytes, RamDirectory, WritePtr};
use crate::fastfield::{CompositeFastFieldSerializer, FastFieldsWriter};
use crate::schema::{Schema, FAST};
@@ -210,6 +211,78 @@ impl<Item: FastValue> FastFieldReader<Item> for DynamicFastFieldReader<Item> {
}
}
/// Wrapper for accessing a fastfield.
///
/// Holds the data and the codec to read the data.
#[derive(Clone)]
pub struct FastFieldReaderCodecWrapperU128<Item: FastValueU128, CodecReader> {
reader: CodecReader,
bytes: OwnedBytes,
_phantom: PhantomData<Item>,
}
impl<Item: FastValueU128, C: FastFieldCodecReaderU128> FastFieldReaderCodecWrapperU128<Item, C> {
/// Opens a fast field given the bytes.
pub fn open_from_bytes(bytes: OwnedBytes) -> crate::Result<Self> {
let reader = C::open_from_bytes(bytes.as_slice())?;
Ok(Self {
reader,
bytes,
_phantom: PhantomData,
})
}
/// Returns the item for the docid, if present
pub fn get_val(&self, doc: u64) -> Option<Item> {
self.reader
.get(doc, self.bytes.as_slice())
.map(|el| Item::from_u128(el))
}
/// Internally, multivalued fast fields also rely on single-value fast fields.
/// It works as follows: a first column contains the start index for each
/// document, and a second column contains the actual values.
///
/// The values associated with a given doc are then
/// `second_column[first_column.get(doc)..first_column.get(doc+1)]`.
///
/// This means the single-value fast field reader can be indexed internally
/// with something other than a `DocId`. For this use case, we want to use
/// `u64` values.
///
/// `get_range` fills `output` with the values at indices `start..start + output.len()`;
/// entries for which no value is present are left untouched.
pub(crate) fn get_range(&self, start: u64, output: &mut [Item]) {
for (i, out) in output.iter_mut().enumerate() {
if let Some(val) = self.get_val(start + (i as u64)) {
*out = val
}
}
}
/// Iterates over all elements in the fast field
pub fn iter(&self) -> impl Iterator<Item = Option<Item>> + '_ {
self.reader
.iter(self.bytes.as_slice())
.map(|el| el.map(Item::from_u128))
}
/// Returns all docids which are in the provided value range
pub fn get_between_vals(&self, range: RangeInclusive<Item>) -> Vec<usize> {
let range = range.start().to_u128()..=range.end().to_u128();
self.reader.get_between_vals(range, self.bytes.as_slice())
}
/// Return min_value.
pub fn min_value(&self) -> Item {
Item::from_u128(self.reader.min_value())
}
/// Return max_value.
pub fn max_value(&self) -> Item {
Item::from_u128(self.reader.max_value())
}
}
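// A rough sketch of what `get_between_vals` computes, assuming a plain
// in-memory column with one value per doc; the real codec answers the same
// question over its compressed representation.
fn docids_between_sketch(vals: &[u128], range: std::ops::RangeInclusive<u128>) -> Vec<usize> {
    vals.iter()
        .enumerate()
        .filter(|&(_, &val)| range.contains(&val))
        .map(|(pos, _)| pos)
        .collect()
}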
/// Wrapper for accessing a fastfield.
///
/// Holds the data and the codec to read the data.

View File

@@ -1,4 +1,9 @@
use super::reader::DynamicFastFieldReader;
use std::net::IpAddr;
use fastfield_codecs::ip_codec::IntervallDecompressor;
use super::multivalued::MultiValuedU128FastFieldReader;
use super::reader::{DynamicFastFieldReader, FastFieldReaderCodecWrapperU128};
use crate::directory::{CompositeFile, FileSlice};
use crate::fastfield::{
BytesFastFieldReader, FastFieldNotAvailableError, FastValue, MultiValuedFastFieldReader,
@@ -20,6 +25,7 @@ pub struct FastFieldReaders {
pub(crate) enum FastType {
I64,
U64,
U128,
F64,
Bool,
Date,
@@ -46,6 +52,9 @@ pub(crate) fn type_and_cardinality(field_type: &FieldType) -> Option<(FastType,
FieldType::Str(options) if options.is_fast() => {
Some((FastType::U64, Cardinality::MultiValues))
}
FieldType::Ip(options) => options
.get_fastfield_cardinality()
.map(|cardinality| (FastType::U128, cardinality)),
_ => None,
}
}
@@ -137,6 +146,69 @@ impl FastFieldReaders {
self.typed_fast_field_reader(field)
}
/// Returns the `ip` fast field reader associated to `field`.
///
/// If `field` is not a u128 fast field, this method returns an Error.
pub fn ip_addr(
&self,
field: Field,
) -> crate::Result<FastFieldReaderCodecWrapperU128<IpAddr, IntervallDecompressor>> {
self.check_type(field, FastType::U128, Cardinality::SingleValue)?;
let fast_field_slice = self.fast_field_data(field, 0)?;
let bytes = fast_field_slice.read_bytes()?;
FastFieldReaderCodecWrapperU128::<IpAddr, IntervallDecompressor>::open_from_bytes(bytes)
}
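// Usage sketch (assuming an existing `Searcher` and a single-valued ip fast
// field handle `ip_field`): collect the ip value, if any, of every alive doc.
fn collect_ips_sketch(
    searcher: &crate::Searcher,
    ip_field: Field,
) -> crate::Result<Vec<Option<IpAddr>>> {
    let mut ips = Vec::new();
    for segment_reader in searcher.segment_readers() {
        let ff_reader = segment_reader.fast_fields().ip_addr(ip_field)?;
        for doc in segment_reader.doc_ids_alive() {
            // `get_val` returns `None` for docs without an ip value.
            ips.push(ff_reader.get_val(doc as u64));
        }
    }
    Ok(ips)
}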
/// Returns the `ip` fast field reader associated to `field`.
///
/// If `field` is not a u128 fast field, this method returns an Error.
pub fn ip_addrs(&self, field: Field) -> crate::Result<MultiValuedU128FastFieldReader<IpAddr>> {
self.check_type(field, FastType::U128, Cardinality::MultiValues)?;
let idx_reader: DynamicFastFieldReader<u64> = self.typed_fast_field_reader(field)?;
let fast_field_slice = self.fast_field_data(field, 1)?;
let bytes = fast_field_slice.read_bytes()?;
let vals_reader =
FastFieldReaderCodecWrapperU128::<IpAddr, IntervallDecompressor>::open_from_bytes(
bytes,
)?;
Ok(MultiValuedU128FastFieldReader::open(
idx_reader,
vals_reader,
))
}
/// Returns the `u128` fast field reader associated to `field`.
///
/// If `field` is not a u128 fast field, this method returns an Error.
pub fn u128(
&self,
field: Field,
) -> crate::Result<FastFieldReaderCodecWrapperU128<u128, IntervallDecompressor>> {
let fast_field_slice = self.fast_field_data(field, 0)?;
let bytes = fast_field_slice.read_bytes()?;
FastFieldReaderCodecWrapperU128::<u128, IntervallDecompressor>::open_from_bytes(bytes)
}
/// Returns the `u128` multi-valued fast field reader associated to `field`.
///
/// If `field` is not a u128 multi-valued fast field, this method returns an Error.
pub fn u128s(&self, field: Field) -> crate::Result<MultiValuedU128FastFieldReader<u128>> {
self.check_type(field, FastType::U128, Cardinality::MultiValues)?;
let idx_reader: DynamicFastFieldReader<u64> = self.typed_fast_field_reader(field)?;
let fast_field_slice = self.fast_field_data(field, 1)?;
let bytes = fast_field_slice.read_bytes()?;
let vals_reader =
FastFieldReaderCodecWrapperU128::<u128, IntervallDecompressor>::open_from_bytes(bytes)?;
Ok(MultiValuedU128FastFieldReader::open(
idx_reader,
vals_reader,
))
}
/// Returns the `u64` fast field reader associated to `field`, regardless of whether the
/// given field is effectively of type `u64` or not.
///

View File

@@ -327,6 +327,11 @@ impl CompositeFastFieldSerializer {
FastBytesFieldSerializer { write: field_write }
}
/// Gets the underlying writer
pub fn get_field_writer(&mut self, field: Field, idx: usize) -> &mut impl Write {
self.composite_write.for_field_with_idx(field, idx)
}
/// Closes the serializer
///
/// After this call the data must be persistently saved on disk.

View File

@@ -2,10 +2,12 @@ use std::collections::HashMap;
use std::io;
use common;
use fastfield_codecs::ip_codec::{ip_to_u128, IntervalCompressor};
use fnv::FnvHashMap;
use roaring::RoaringBitmap;
use tantivy_bitpacker::BlockedBitpacker;
use super::multivalued::MultiValuedFastFieldWriter;
use super::multivalued::{MultiValuedFastFieldWriter, U128MultiValueFastFieldWriter};
use super::serializer::FastFieldStats;
use super::{FastFieldDataAccess, FastFieldType, FastValue};
use crate::fastfield::{BytesFastFieldWriter, CompositeFastFieldSerializer};
@@ -19,6 +21,8 @@ use crate::DatePrecision;
pub struct FastFieldsWriter {
term_id_writers: Vec<MultiValuedFastFieldWriter>,
single_value_writers: Vec<IntFastFieldWriter>,
u128_value_writers: Vec<U128FastFieldWriter>,
u128_multi_value_writers: Vec<U128MultiValueFastFieldWriter>,
multi_values_writers: Vec<MultiValuedFastFieldWriter>,
bytes_value_writers: Vec<BytesFastFieldWriter>,
}
@@ -34,6 +38,8 @@ fn fast_field_default_value(field_entry: &FieldEntry) -> u64 {
impl FastFieldsWriter {
/// Create all `FastFieldWriter` required by the schema.
pub fn from_schema(schema: &Schema) -> FastFieldsWriter {
let mut u128_value_writers = Vec::new();
let mut u128_multi_value_writers = Vec::new();
let mut single_value_writers = Vec::new();
let mut term_id_writers = Vec::new();
let mut multi_values_writers = Vec::new();
@@ -97,10 +103,27 @@ impl FastFieldsWriter {
bytes_value_writers.push(fast_field_writer);
}
}
FieldType::Ip(opt) => {
if opt.is_fast() {
match opt.get_fastfield_cardinality() {
Some(Cardinality::SingleValue) => {
let fast_field_writer = U128FastFieldWriter::new(field);
u128_value_writers.push(fast_field_writer);
}
Some(Cardinality::MultiValues) => {
let fast_field_writer = U128MultiValueFastFieldWriter::new(field);
u128_multi_value_writers.push(fast_field_writer);
}
None => {}
}
}
}
FieldType::Str(_) | FieldType::JsonObject(_) => {}
}
}
FastFieldsWriter {
u128_value_writers,
u128_multi_value_writers,
term_id_writers,
single_value_writers,
multi_values_writers,
@@ -129,6 +152,16 @@ impl FastFieldsWriter {
.iter()
.map(|w| w.mem_usage())
.sum::<usize>()
+ self
.u128_value_writers
.iter()
.map(|w| w.mem_usage())
.sum::<usize>()
+ self
.u128_multi_value_writers
.iter()
.map(|w| w.mem_usage())
.sum::<usize>()
}
/// Get the `FastFieldWriter` associated to a field.
@@ -190,7 +223,6 @@ impl FastFieldsWriter {
.iter_mut()
.find(|field_writer| field_writer.field() == field)
}
/// Indexes all of the fastfields of a new document.
pub fn add_document(&mut self, doc: &Document) {
for field_writer in &mut self.term_id_writers {
@@ -205,6 +237,12 @@ impl FastFieldsWriter {
for field_writer in &mut self.bytes_value_writers {
field_writer.add_document(doc);
}
for field_writer in &mut self.u128_value_writers {
field_writer.add_document(doc);
}
for field_writer in &mut self.u128_multi_value_writers {
field_writer.add_document(doc);
}
}
/// Serializes all of the `FastFieldWriter`s by pushing them in
@@ -230,6 +268,129 @@ impl FastFieldsWriter {
for field_writer in &self.bytes_value_writers {
field_writer.serialize(serializer, doc_id_map)?;
}
for field_writer in &self.u128_value_writers {
field_writer.serialize(serializer, doc_id_map)?;
}
for field_writer in &self.u128_multi_value_writers {
field_writer.serialize(serializer, doc_id_map)?;
}
Ok(())
}
}
/// Fast field writer for u128 values.
/// The fast field writer just keeps the values in memory.
///
/// Only once the segment writer is closed and
/// persisted on disk is the fast field writer
/// sent to a `FastFieldSerializer` via the `.serialize(...)`
/// method.
///
/// We cannot serialize earlier, as the values are
/// compressed into a compact number space and the number of
/// bits required for bitpacking can only be known once
/// we have seen all of the values.
pub struct U128FastFieldWriter {
field: Field,
vals: Vec<u128>,
val_count: u32,
null_values: RoaringBitmap,
}
impl U128FastFieldWriter {
/// Creates a new `U128FastFieldWriter`
pub fn new(field: Field) -> Self {
Self {
field,
vals: vec![],
val_count: 0,
null_values: RoaringBitmap::new(),
}
}
/// The memory used (including children)
pub fn mem_usage(&self) -> usize {
self.vals.len() * 16
}
/// Records a new value.
///
/// The n-th value being recorded is implicitly
/// associated with the document with the `DocId` n.
/// (Well, `n-1` actually because of 0-indexing)
pub fn add_val(&mut self, val: u128) {
self.vals.push(val);
}
/// Extracts the fast field value from the document and records it.
///
/// If the document carries no value for the field, the document
/// is recorded as null instead.
pub fn add_document(&mut self, doc: &Document) {
match doc.get_first(self.field) {
Some(v) => {
let ip_addr = v.as_ip().unwrap();
let value = ip_to_u128(ip_addr);
self.add_val(value);
}
None => {
self.null_values.insert(self.val_count as u32);
}
};
self.val_count += 1;
}
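// Hypothetical sketch of what an `ip_to_u128`-style mapping can look like
// (IPv4 addresses folded into the IPv6 space first); the helper actually used
// above lives in `fastfield_codecs::ip_codec`.
fn ip_to_u128_sketch(ip: std::net::IpAddr) -> u128 {
    let ipv6 = match ip {
        std::net::IpAddr::V4(v4) => v4.to_ipv6_mapped(),
        std::net::IpAddr::V6(v6) => v6,
    };
    u128::from_be_bytes(ipv6.octets())
}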
/// Serializes the fast field values by writing them to the serializer.
pub fn serialize(
&self,
serializer: &mut CompositeFastFieldSerializer,
doc_id_map: Option<&DocIdMapping>,
) -> io::Result<()> {
let mut field_write = serializer.get_field_writer(self.field, 0);
let compressor = IntervalCompressor::from_vals(self.vals.to_vec());
let mut val_idx = 0;
let mut get_val = |idx| {
if self.null_values.contains(idx as u32) {
compressor.null_value
} else {
let val = self.vals[val_idx];
val_idx += 1;
val
}
};
if let Some(doc_id_map) = doc_id_map {
// To get the actual values, we could materialize a vec of u128 including nulls, but
// that could cost a lot of memory. Instead we just compute the indices of
// the non-null values.
let mut idx_to_val_idx = vec![];
idx_to_val_idx.resize(self.val_count as usize, 0);
let mut val_idx = 0;
for idx in 0..self.val_count {
if !self.null_values.contains(idx as u32) {
idx_to_val_idx[idx as usize] = val_idx as u32;
val_idx += 1;
}
}
let iter = doc_id_map.iter_old_doc_ids().map(|idx| {
if self.null_values.contains(idx as u32) {
compressor.null_value
} else {
self.vals[idx_to_val_idx[idx as usize] as usize]
}
});
compressor.compress_into(iter, &mut field_write)?;
} else {
let iter = (0..self.val_count).map(&mut get_val);
compressor.compress_into(iter, &mut field_write)?;
}
Ok(())
}
}
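// Minimal sketch of the null substitution performed in `serialize` above,
// assuming a dense vector of the present values, the bitmap of null
// positions and a `null_value` sentinel picked by the compressor. The real
// code streams this sequence instead of materializing it, but the produced
// values are the same.
fn materialize_with_nulls_sketch(
    vals: &[u128],
    null_positions: &roaring::RoaringBitmap,
    num_docs: u32,
    null_value: u128,
) -> Vec<u128> {
    let mut out = Vec::with_capacity(num_docs as usize);
    let mut val_idx = 0;
    for doc in 0..num_docs {
        if null_positions.contains(doc) {
            // Docs without a value get the sentinel so every doc has an entry.
            out.push(null_value);
        } else {
            out.push(vals[val_idx]);
            val_idx += 1;
        }
    }
    out
}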

View File

@@ -776,6 +776,7 @@ impl Drop for IndexWriter {
#[cfg(test)]
mod tests {
use std::collections::{HashMap, HashSet};
use std::net::IpAddr;
use proptest::prelude::*;
use proptest::prop_oneof;
@@ -789,7 +790,7 @@ mod tests {
use crate::indexer::NoMergePolicy;
use crate::query::{QueryParser, TermQuery};
use crate::schema::{
self, Cardinality, Facet, FacetOptions, IndexRecordOption, NumericOptions,
self, Cardinality, Facet, FacetOptions, IndexRecordOption, IpOptions, NumericOptions,
TextFieldIndexing, TextOptions, FAST, INDEXED, STORED, STRING, TEXT,
};
use crate::store::DOCSTORE_CACHE_CAPACITY;
@@ -1384,6 +1385,11 @@ mod tests {
force_end_merge: bool,
) -> crate::Result<()> {
let mut schema_builder = schema::Schema::builder();
let ip_field = schema_builder.add_ip_field("ip", FAST | INDEXED | STORED);
let ips_field = schema_builder.add_ip_field(
"ips",
IpOptions::default().set_fast(Cardinality::MultiValues),
);
let id_field = schema_builder.add_u64_field("id", FAST | INDEXED | STORED);
let bytes_field = schema_builder.add_bytes_field("bytes", FAST | INDEXED | STORED);
let bool_field = schema_builder.add_bool_field("bool", FAST | INDEXED | STORED);
@@ -1439,17 +1445,37 @@ mod tests {
match op {
IndexingOp::AddDoc { id } => {
let facet = Facet::from(&("/cola/".to_string() + &id.to_string()));
index_writer.add_document(doc!(id_field=>id,
bytes_field => id.to_le_bytes().as_slice(),
multi_numbers=> id,
multi_numbers => id,
bool_field => (id % 2u64) != 0,
multi_bools => (id % 2u64) != 0,
multi_bools => (id % 2u64) == 0,
text_field => id.to_string(),
facet_field => facet,
large_text_field=> LOREM
))?;
let ip_from_id = IpAddr::from((id as u128).to_be_bytes());
if id % 3 == 0 {
// every 3rd doc has no ip field
index_writer.add_document(doc!(id_field=>id,
bytes_field => id.to_le_bytes().as_slice(),
multi_numbers=> id,
multi_numbers => id,
bool_field => (id % 2u64) != 0,
multi_bools => (id % 2u64) != 0,
multi_bools => (id % 2u64) == 0,
text_field => id.to_string(),
facet_field => facet,
large_text_field=> LOREM
))?;
} else {
index_writer.add_document(doc!(id_field=>id,
bytes_field => id.to_le_bytes().as_slice(),
ip_field => ip_from_id,
ips_field => ip_from_id,
ips_field => ip_from_id,
multi_numbers=> id,
multi_numbers => id,
bool_field => (id % 2u64) != 0,
multi_bools => (id % 2u64) != 0,
multi_bools => (id % 2u64) == 0,
text_field => id.to_string(),
facet_field => facet,
large_text_field=> LOREM
))?;
}
}
IndexingOp::DeleteDoc { id } => {
index_writer.delete_term(Term::from_field_u64(id_field, id));
@@ -1530,6 +1556,54 @@ mod tests {
.collect::<HashSet<_>>()
);
// Check ip addr
let ips: HashSet<Option<IpAddr>> = searcher
.segment_readers()
.iter()
.flat_map(|segment_reader| {
let ff_reader = segment_reader.fast_fields().ip_addr(ip_field).unwrap();
segment_reader
.doc_ids_alive()
.map(move |doc| ff_reader.get_val(doc as u64))
})
.collect();
let expected_ips = expected_ids_and_num_occurrences
.keys()
.map(|id| {
if id % 3 == 0 {
None
} else {
Some(IpAddr::from((*id as u128).to_be_bytes()))
}
})
.collect::<HashSet<_>>();
assert_eq!(ips, expected_ips);
let expected_ips = expected_ids_and_num_occurrences
.keys()
.filter_map(|id| {
if id % 3 == 0 {
None
} else {
Some(IpAddr::from((*id as u128).to_be_bytes()))
}
})
.collect::<HashSet<_>>();
let ips: HashSet<IpAddr> = searcher
.segment_readers()
.iter()
.flat_map(|segment_reader| {
let ff_reader = segment_reader.fast_fields().ip_addrs(ips_field).unwrap();
segment_reader.doc_ids_alive().flat_map(move |doc| {
let mut vals = vec![];
ff_reader.get_vals(doc, &mut vals);
vals
})
})
.collect();
assert_eq!(ips, expected_ips);
// multivalue fast field tests
for segment_reader in searcher.segment_readers().iter() {
let ff_reader = segment_reader.fast_fields().u64s(multi_numbers).unwrap();
@@ -1631,6 +1705,31 @@ mod tests {
Ok(())
}
#[test]
fn test_minimal() {
assert!(test_operation_strategy(
&[
IndexingOp::AddDoc { id: 23 },
IndexingOp::AddDoc { id: 13 },
IndexingOp::DeleteDoc { id: 13 }
],
true,
false
)
.is_ok());
assert!(test_operation_strategy(
&[
IndexingOp::AddDoc { id: 23 },
IndexingOp::AddDoc { id: 13 },
IndexingOp::DeleteDoc { id: 13 }
],
false,
false
)
.is_ok());
}
proptest! {
#![proptest_config(ProptestConfig::with_cases(20))]
#[test]

View File

@@ -2,6 +2,7 @@ use std::cmp;
use std::collections::HashMap;
use std::sync::Arc;
use fastfield_codecs::ip_codec::{IntervalCompressor, IntervallDecompressor};
use itertools::Itertools;
use measure_time::debug_time;
use tantivy_bitpacker::minmax;
@@ -11,7 +12,8 @@ use crate::docset::{DocSet, TERMINATED};
use crate::error::DataCorruption;
use crate::fastfield::{
AliveBitSet, CompositeFastFieldSerializer, DynamicFastFieldReader, FastFieldDataAccess,
FastFieldReader, FastFieldStats, MultiValueLength, MultiValuedFastFieldReader,
FastFieldReader, FastFieldReaderCodecWrapperU128, FastFieldStats, MultiValueLength,
MultiValuedFastFieldReader, MultiValuedU128FastFieldReader,
};
use crate::fieldnorm::{FieldNormReader, FieldNormReaders, FieldNormsSerializer, FieldNormsWriter};
use crate::indexer::doc_id_mapping::{expect_field_id_for_sort_field, SegmentDocIdMapping};
@@ -321,6 +323,24 @@ impl IndexMerger {
self.write_bytes_fast_field(field, fast_field_serializer, doc_id_mapping)?;
}
}
FieldType::Ip(options) => match options.get_fastfield_cardinality() {
Some(Cardinality::SingleValue) => {
self.write_u128_single_fast_field(
field,
fast_field_serializer,
doc_id_mapping,
)?;
}
Some(Cardinality::MultiValues) => {
self.write_u128_multi_fast_field(
field,
fast_field_serializer,
doc_id_mapping,
)?;
}
None => {}
},
FieldType::JsonObject(_) | FieldType::Facet(_) | FieldType::Str(_) => {
// We don't handle json fast field for the moment
// They can be implemented using what is done
@@ -331,6 +351,114 @@ impl IndexMerger {
Ok(())
}
// used to merge `u128` multivalued fast fields.
fn write_u128_multi_fast_field(
&self,
field: Field,
fast_field_serializer: &mut CompositeFastFieldSerializer,
doc_id_mapping: &SegmentDocIdMapping,
) -> crate::Result<()> {
let reader_ordinal_and_field_accessors = self
.readers
.iter()
.map(|segment_reader| {
let val_length_reader: MultiValuedU128FastFieldReader<u128> =
segment_reader.fast_fields().u128s(field).expect(
"Failed to find index for multivalued field. This is a bug in tantivy, \
please report.",
);
(segment_reader, val_length_reader)
})
.collect::<Vec<_>>();
Self::write_1_n_fast_field_idx_generic(
field,
fast_field_serializer,
doc_id_mapping,
&reader_ordinal_and_field_accessors,
)?;
let fast_field_readers = self
.readers
.iter()
.map(|reader| {
let u128_reader: MultiValuedU128FastFieldReader<u128> =
reader.fast_fields().u128s(field).expect(
"Failed to find a reader for single fast field. This is a tantivy bug and \
it should never happen.",
);
u128_reader
})
.collect::<Vec<_>>();
let compressor = {
let vals = fast_field_readers
.iter()
.flat_map(|reader| reader.iter())
.flatten()
.collect::<Vec<u128>>();
IntervalCompressor::from_vals(vals)
};
let iter = doc_id_mapping.iter().flat_map(|(doc_id, reader_ordinal)| {
let fast_field_reader = &fast_field_readers[*reader_ordinal as usize];
let mut out = vec![];
fast_field_reader.get_vals(*doc_id, &mut out);
out.into_iter()
});
let field_write = fast_field_serializer.get_field_writer(field, 1);
compressor.compress_into(iter, field_write)?;
Ok(())
}
// used to merge `u128` single fast fields.
fn write_u128_single_fast_field(
&self,
field: Field,
fast_field_serializer: &mut CompositeFastFieldSerializer,
doc_id_mapping: &SegmentDocIdMapping,
) -> crate::Result<()> {
let fast_field_readers = self
.readers
.iter()
.map(|reader| {
let u128_reader: FastFieldReaderCodecWrapperU128<u128, IntervallDecompressor> =
reader.fast_fields().u128(field).expect(
"Failed to find a reader for single fast field. This is a tantivy bug and \
it should never happen.",
);
u128_reader
})
.collect::<Vec<_>>();
let compressor = {
let vals = fast_field_readers
.iter()
.flat_map(|reader| reader.iter())
.flatten()
.collect::<Vec<u128>>();
IntervalCompressor::from_vals(vals)
};
let iter = doc_id_mapping.iter().map(|(doc_id, reader_ordinal)| {
let fast_field_reader = &fast_field_readers[*reader_ordinal as usize];
fast_field_reader
.get_val(*doc_id as u64)
.unwrap_or(compressor.null_value)
});
let field_write = fast_field_serializer.get_field_writer(field, 0);
compressor.compress_into(iter, field_write)?;
Ok(())
}
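// Conceptual sketch of the merge order used above, assuming plain in-memory
// columns: `doc_id_mapping` yields (old_doc_id, segment_ordinal) pairs in new
// doc id order, and each value is looked up in its segment's column, falling
// back to a sentinel for docs that carry no value.
fn merged_values_sketch(
    doc_id_mapping: &[(u32, usize)],
    per_segment_vals: &[Vec<Option<u128>>],
    null_value: u128,
) -> Vec<u128> {
    doc_id_mapping
        .iter()
        .map(|&(old_doc, segment_ord)| {
            per_segment_vals[segment_ord][old_doc as usize].unwrap_or(null_value)
        })
        .collect()
}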
// used to merge both field norms and `u64`/`i64` single fast fields.
fn write_single_fast_field(
&self,
@@ -521,16 +649,16 @@ impl IndexMerger {
// This is required by the bitpacker, as it needs to know
// what should be the bit length use for bitpacking.
let mut num_docs = 0;
for (reader, u64s_reader) in reader_and_field_accessors.iter() {
for (reader, value_length_reader) in reader_and_field_accessors.iter() {
if let Some(alive_bitset) = reader.alive_bitset() {
num_docs += alive_bitset.num_alive_docs() as u64;
for doc in reader.doc_ids_alive() {
let num_vals = u64s_reader.get_len(doc) as u64;
let num_vals = value_length_reader.get_len(doc) as u64;
total_num_vals += num_vals;
}
} else {
num_docs += reader.max_doc() as u64;
total_num_vals += u64s_reader.get_total_len();
total_num_vals += value_length_reader.get_total_len();
}
}

View File

@@ -294,6 +294,13 @@ impl SegmentWriter {
ctx,
)?;
}
FieldType::Ip(_) => {
for value in values {
let ip_val = value.as_ip().ok_or_else(make_schema_error)?;
term_buffer.set_text(&ip_val.to_string());
postings_writer.subscribe(doc_id, 0u32, term_buffer, ctx);
}
}
}
}
Ok(())

View File

@@ -50,6 +50,7 @@ fn posting_writer_from_field_entry(field_entry: &FieldEntry) -> Box<dyn Postings
| FieldType::Bool(_)
| FieldType::Date(_)
| FieldType::Bytes(_)
| FieldType::Ip(_)
| FieldType::Facet(_) => Box::new(SpecializedPostingsWriter::<NothingRecorder>::default()),
FieldType::JsonObject(ref json_object_options) => {
if let Some(text_indexing_option) = json_object_options.get_text_indexing_options() {

View File

@@ -89,6 +89,7 @@ pub(crate) fn serialize_postings(
| FieldType::Bool(_) => {}
FieldType::Bytes(_) => {}
FieldType::JsonObject(_) => {}
FieldType::Ip(_) => {} // TODO check
}
let postings_writer = per_field_postings_writers.get_for_field(field);

View File

@@ -400,6 +400,7 @@ impl QueryParser {
let bytes = base64::decode(phrase).map_err(QueryParserError::ExpectedBase64)?;
Ok(Term::from_field_bytes(field, &bytes))
}
FieldType::Ip(_) => Ok(Term::from_field_text(field, phrase)),
}
}
@@ -506,6 +507,13 @@ impl QueryParser {
let bytes_term = Term::from_field_bytes(field, &bytes);
Ok(vec![LogicalLiteral::Term(bytes_term)])
}
FieldType::Ip(ref option) => {
if !option.is_indexed() {
return Err(QueryParserError::FieldNotIndexed(field_name.to_string()));
}
let text_term = Term::from_field_text(field, phrase);
Ok(vec![LogicalLiteral::Term(text_term)])
}
}
}

View File

@@ -1,5 +1,6 @@
use serde::{Deserialize, Serialize};
use super::ip_options::IpOptions;
use crate::schema::bytes_options::BytesOptions;
use crate::schema::{
is_valid_field_name, DateOptions, FacetOptions, FieldType, JsonObjectOptions, NumericOptions,
@@ -60,6 +61,11 @@ impl FieldEntry {
Self::new(field_name, FieldType::Date(date_options))
}
/// Creates a new ip field entry.
pub fn new_ip(field_name: String, ip_options: IpOptions) -> FieldEntry {
Self::new(field_name, FieldType::Ip(ip_options))
}
/// Creates a field entry for a facet.
pub fn new_facet(field_name: String, facet_options: FacetOptions) -> FieldEntry {
Self::new(field_name, FieldType::Facet(facet_options))
@@ -114,6 +120,7 @@ impl FieldEntry {
FieldType::Facet(ref options) => options.is_stored(),
FieldType::Bytes(ref options) => options.is_stored(),
FieldType::JsonObject(ref options) => options.is_stored(),
FieldType::Ip(ref options) => options.is_stored(),
}
}
}

View File

@@ -1,7 +1,11 @@
use std::net::IpAddr;
use std::str::FromStr;
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use thiserror::Error;
use super::ip_options::IpOptions;
use crate::schema::bytes_options::BytesOptions;
use crate::schema::facet_options::FacetOptions;
use crate::schema::{
@@ -61,9 +65,13 @@ pub enum Type {
Bytes = b'b',
/// Leaf in a Json object.
Json = b'j',
/// IpAddr
Ip = b'p',
/// Unsigned 128-bits integer `u128`
U128 = b'1',
}
const ALL_TYPES: [Type; 9] = [
const ALL_TYPES: [Type; 11] = [
Type::Str,
Type::U64,
Type::I64,
@@ -73,6 +81,8 @@ const ALL_TYPES: [Type; 9] = [
Type::Facet,
Type::Bytes,
Type::Json,
Type::Ip,
Type::U128,
];
impl Type {
@@ -99,6 +109,8 @@ impl Type {
Type::Facet => "Facet",
Type::Bytes => "Bytes",
Type::Json => "Json",
Type::Ip => "Ip",
Type::U128 => "U128",
}
}
@@ -115,6 +127,8 @@ impl Type {
b'h' => Some(Type::Facet),
b'b' => Some(Type::Bytes),
b'j' => Some(Type::Json),
b'p' => Some(Type::Ip),
b'1' => Some(Type::U128),
_ => None,
}
}
@@ -145,6 +159,8 @@ pub enum FieldType {
Bytes(BytesOptions),
/// Json object
JsonObject(JsonObjectOptions),
/// IpAddr field
Ip(IpOptions),
}
impl FieldType {
@@ -160,6 +176,7 @@ impl FieldType {
FieldType::Facet(_) => Type::Facet,
FieldType::Bytes(_) => Type::Bytes,
FieldType::JsonObject(_) => Type::Json,
FieldType::Ip(_) => Type::Ip,
}
}
@@ -175,6 +192,7 @@ impl FieldType {
FieldType::Facet(ref _facet_options) => true,
FieldType::Bytes(ref bytes_options) => bytes_options.is_indexed(),
FieldType::JsonObject(ref json_object_options) => json_object_options.is_indexed(),
FieldType::Ip(ref ip_options) => ip_options.is_indexed(),
}
}
@@ -209,6 +227,7 @@ impl FieldType {
| FieldType::F64(ref int_options)
| FieldType::Bool(ref int_options) => int_options.is_fast(),
FieldType::Date(ref date_options) => date_options.is_fast(),
FieldType::Ip(ref options) => options.is_fast(),
FieldType::Facet(_) => true,
FieldType::JsonObject(_) => false,
}
@@ -229,6 +248,7 @@ impl FieldType {
FieldType::Facet(_) => false,
FieldType::Bytes(ref bytes_options) => bytes_options.fieldnorms(),
FieldType::JsonObject(ref _json_object_options) => false,
FieldType::Ip(_) => false,
}
}
@@ -273,6 +293,13 @@ impl FieldType {
FieldType::JsonObject(ref json_obj_options) => json_obj_options
.get_text_indexing_options()
.map(TextFieldIndexing::index_option),
FieldType::Ip(ref ip_options) => {
if ip_options.is_indexed() {
Some(IndexRecordOption::Basic)
} else {
None
}
}
}
}
@@ -312,6 +339,14 @@ impl FieldType {
expected: "a json object",
json: JsonValue::String(field_text),
}),
FieldType::Ip(_) => {
Ok(Value::Ip(IpAddr::from_str(&field_text).map_err(|err| {
ValueParsingError::ParseError {
error: err.to_string(),
json: JsonValue::String(field_text),
}
})?))
}
}
}
JsonValue::Number(field_val_num) => match self {
@@ -359,6 +394,10 @@ impl FieldType {
expected: "a json object",
json: JsonValue::Number(field_val_num),
}),
FieldType::Ip(_) => Err(ValueParsingError::TypeError {
expected: "a string with an ip addr",
json: JsonValue::Number(field_val_num),
}),
},
JsonValue::Object(json_map) => match self {
FieldType::Str(_) => {

131
src/schema/ip_options.rs Normal file
View File

@@ -0,0 +1,131 @@
use std::ops::BitOr;
use serde::{Deserialize, Serialize};
use super::flags::{FastFlag, IndexedFlag, SchemaFlagList, StoredFlag};
use super::Cardinality;
/// Define how an ip field should be handled by tantivy.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct IpOptions {
indexed: bool,
#[serde(skip_serializing_if = "Option::is_none")]
fast: Option<Cardinality>,
stored: bool,
}
impl IpOptions {
/// Returns true iff the value is a fast field.
pub fn is_fast(&self) -> bool {
self.fast.is_some()
}
/// Returns `true` if the ip field should be stored.
pub fn is_stored(&self) -> bool {
self.stored
}
/// Returns `true` iff the ip field should be indexed.
pub fn is_indexed(&self) -> bool {
self.indexed
}
/// Returns the cardinality of the fastfield.
///
/// If the field has not been declared as a fastfield, then
/// the method returns None.
pub fn get_fastfield_cardinality(&self) -> Option<Cardinality> {
self.fast
}
/// Set the field as indexed.
///
/// Setting the ip field as indexed will generate
/// a posting list for each value taken by the field.
///
/// This is required for the field to be searchable.
#[must_use]
pub fn set_indexed(mut self) -> Self {
self.indexed = true;
self
}
/// Sets the field as stored
#[must_use]
pub fn set_stored(mut self) -> Self {
self.stored = true;
self
}
/// Set the field as a fast field.
///
/// Fast fields are designed for random access.
/// Access times are similar to a random lookup in an array.
/// For a single-valued fast field, if more than one value is associated with the
/// field, only the last one is kept.
#[must_use]
pub fn set_fast(mut self, cardinality: Cardinality) -> Self {
self.fast = Some(cardinality);
self
}
}
impl From<()> for IpOptions {
fn from(_: ()) -> IpOptions {
IpOptions::default()
}
}
impl From<FastFlag> for IpOptions {
fn from(_: FastFlag) -> Self {
IpOptions {
indexed: false,
stored: false,
fast: Some(Cardinality::SingleValue),
}
}
}
impl From<StoredFlag> for IpOptions {
fn from(_: StoredFlag) -> Self {
IpOptions {
indexed: false,
stored: true,
fast: None,
}
}
}
impl From<IndexedFlag> for IpOptions {
fn from(_: IndexedFlag) -> Self {
IpOptions {
indexed: true,
stored: false,
fast: None,
}
}
}
impl<T: Into<IpOptions>> BitOr<T> for IpOptions {
type Output = IpOptions;
fn bitor(self, other: T) -> IpOptions {
let other = other.into();
IpOptions {
indexed: self.indexed | other.indexed,
stored: self.stored | other.stored,
fast: self.fast.or(other.fast),
}
}
}
impl<Head, Tail> From<SchemaFlagList<Head, Tail>> for IpOptions
where
Head: Clone,
Tail: Clone,
Self: BitOr<Output = Self> + From<Head> + From<Tail>,
{
fn from(head_tail: SchemaFlagList<Head, Tail>) -> Self {
Self::from(head_tail.head) | Self::from(head_tail.tail)
}
}
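// Usage sketch of the two equivalent ways of configuring an ip field:
// the builder methods above, or the schema flags via the `From`/`BitOr` impls.
#[test]
fn ip_options_sketch() {
    let explicit = IpOptions::default()
        .set_stored()
        .set_fast(Cardinality::SingleValue);
    assert!(explicit.is_stored());
    assert!(matches!(
        explicit.get_fastfield_cardinality(),
        Some(Cardinality::SingleValue)
    ));

    let multi = IpOptions::default().set_fast(Cardinality::MultiValues);
    assert!(multi.is_fast() && !multi.is_indexed());
}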

View File

@@ -121,6 +121,7 @@ mod date_time_options;
mod field;
mod flags;
mod index_record_option;
mod ip_options;
mod json_object_options;
mod named_field_document;
mod numeric_options;
@@ -139,6 +140,7 @@ pub use self::field_type::{FieldType, Type};
pub use self::field_value::FieldValue;
pub use self::flags::{FAST, INDEXED, STORED};
pub use self::index_record_option::IndexRecordOption;
pub use self::ip_options::IpOptions;
pub use self::json_object_options::JsonObjectOptions;
pub use self::named_field_document::NamedFieldDocument;
pub use self::numeric_options::NumericOptions;

View File

@@ -7,6 +7,7 @@ use serde::ser::SerializeSeq;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_json::{self, Value as JsonValue};
use super::ip_options::IpOptions;
use super::*;
use crate::schema::bytes_options::BytesOptions;
use crate::schema::field_type::ValueParsingError;
@@ -144,6 +145,28 @@ impl SchemaBuilder {
self.add_field(field_entry)
}
/// Adds an ip field.
/// Returns the associated field handle.
///
/// Internally, Tantivy stores ips as u128 values,
/// while the user supplies `IpAddr` values for convenience.
///
/// # Caution
///
/// Appending two fields with the same name
/// will result in the shadowing of the first
/// by the second one.
/// The first field will get a field id
/// but only the second one will be indexed.
pub fn add_ip_field<T: Into<IpOptions>>(
&mut self,
field_name_str: &str,
field_options: T,
) -> Field {
let field_name = String::from(field_name_str);
let field_entry = FieldEntry::new_ip(field_name, field_options.into());
self.add_field(field_entry)
}
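// Usage sketch (assuming the crate root exports `Index`, the `doc!` macro and
// the `FAST | INDEXED | STORED` flags): declare an ip field and index one doc
// carrying an `IpAddr` value.
fn index_one_ip_sketch() -> crate::Result<()> {
    let mut schema_builder = Schema::builder();
    let ip_field = schema_builder.add_ip_field("ip", FAST | INDEXED | STORED);
    let schema = schema_builder.build();
    let index = crate::Index::create_in_ram(schema);
    let mut index_writer = index.writer(50_000_000)?;
    let ip: std::net::IpAddr = "127.0.0.1".parse().unwrap();
    index_writer.add_document(crate::doc!(ip_field => ip))?;
    index_writer.commit()?;
    Ok(())
}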
/// Adds a new text field.
/// Returns the associated field handle
///

View File

@@ -415,6 +415,14 @@ fn debug_value_bytes(typ: Type, bytes: &[u8], f: &mut fmt::Formatter) -> fmt::Re
debug_value_bytes(typ, bytes, f)?;
}
}
Type::Ip => {
let s = as_str(bytes); // TODO: change when serialization changes
write_opt(f, s)?;
}
Type::U128 => {
let s = as_str(bytes); // TODO: change when serialization changes
write_opt(f, s)?;
}
}
Ok(())
}

View File

@@ -1,4 +1,5 @@
use std::fmt;
use std::net::IpAddr;
use serde::de::Visitor;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
@@ -32,6 +33,8 @@ pub enum Value {
Bytes(Vec<u8>),
/// Json object value.
JsonObject(serde_json::Map<String, serde_json::Value>),
/// Ip
Ip(IpAddr),
}
impl Eq for Value {}
@@ -50,6 +53,7 @@ impl Serialize for Value {
Value::Facet(ref facet) => facet.serialize(serializer),
Value::Bytes(ref bytes) => serializer.serialize_bytes(bytes),
Value::JsonObject(ref obj) => obj.serialize(serializer),
Value::Ip(ref obj) => obj.serialize(serializer), // TODO check serialization
}
}
}
@@ -201,6 +205,16 @@ impl Value {
None
}
}
/// Returns the ip addr, provided the value is of the `Ip` type.
/// (Returns None if the value is not of the `Ip` type)
pub fn as_ip(&self) -> Option<IpAddr> {
if let Value::Ip(val) = self {
Some(*val)
} else {
None
}
}
}
impl From<String> for Value {
@@ -209,6 +223,12 @@ impl From<String> for Value {
}
}
impl From<IpAddr> for Value {
fn from(v: IpAddr) -> Value {
Value::Ip(v)
}
}
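// Sketch of the `Value`/`IpAddr` round trip enabled above.
#[test]
fn value_ip_roundtrip_sketch() {
    let ip: IpAddr = "127.0.0.1".parse().unwrap();
    let value = Value::from(ip);
    assert_eq!(value.as_ip(), Some(ip));
}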
impl From<u64> for Value {
fn from(v: u64) -> Value {
Value::U64(v)
@@ -287,7 +307,9 @@ impl From<serde_json::Value> for Value {
}
mod binary_serialize {
use std::io::{self, Read, Write};
use std::io::{self, ErrorKind, Read, Write};
use std::net::IpAddr;
use std::str::FromStr;
use common::{f64_to_u64, u64_to_f64, BinarySerializable};
@@ -306,6 +328,7 @@ mod binary_serialize {
const EXT_CODE: u8 = 7;
const JSON_OBJ_CODE: u8 = 8;
const BOOL_CODE: u8 = 9;
const IP_CODE: u8 = 10;
// extended types
@@ -366,6 +389,10 @@ mod binary_serialize {
serde_json::to_writer(writer, &map)?;
Ok(())
}
Value::Ip(ref ip) => {
IP_CODE.serialize(writer)?;
ip.to_string().serialize(writer) // TODO Check best format
}
}
}
@@ -436,6 +463,13 @@ mod binary_serialize {
let json_map = <serde_json::Map::<String, serde_json::Value> as serde::Deserialize>::deserialize(&mut de)?;
Ok(Value::JsonObject(json_map))
}
IP_CODE => {
let text = String::deserialize(reader)?;
Ok(Value::Ip(IpAddr::from_str(&text).map_err(|err| {
io::Error::new(ErrorKind::Other, err.to_string())
})?))
}
_ => Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("No field type is associated with code {:?}", type_code),