Merge pull request #1553 from quickwit-oss/ip_field

ip field
This commit is contained in:
PSeitz
2022-10-11 13:09:47 +08:00
committed by GitHub
30 changed files with 1285 additions and 124 deletions

View File

@@ -107,6 +107,19 @@ impl FixedSize for u64 {
const SIZE_IN_BYTES: usize = 8;
}
impl BinarySerializable for u128 {
fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
writer.write_u128::<Endianness>(*self)
}
fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
reader.read_u128::<Endianness>()
}
}
impl FixedSize for u128 {
const SIZE_IN_BYTES: usize = 16;
}
impl BinarySerializable for f32 {
fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
writer.write_f32::<Endianness>(*self)

View File

@@ -100,9 +100,10 @@ mod tests {
fn get_u128_column_from_data(data: &[u128]) -> Arc<dyn Column<u128>> {
let mut out = vec![];
serialize_u128(VecColumn::from(&data), &mut out).unwrap();
let iter_gen = || data.iter().cloned();
serialize_u128(iter_gen, data.len() as u64, &mut out).unwrap();
let out = OwnedBytes::new(out);
open_u128(out).unwrap()
open_u128::<u128>(out).unwrap()
}
#[bench]

View File

@@ -3,6 +3,8 @@ use std::ops::RangeInclusive;
use tantivy_bitpacker::minmax;
use crate::monotonic_mapping::StrictlyMonotonicFn;
pub trait Column<T: PartialOrd = u64>: Send + Sync {
/// Return the value associated with the given idx.
///
@@ -143,16 +145,30 @@ struct MonotonicMappingColumn<C, T, Input> {
_phantom: PhantomData<Input>,
}
/// Creates a view of a column transformed by a monotonic mapping.
pub fn monotonic_map_column<C, T, Input: PartialOrd, Output: PartialOrd>(
/// Creates a view of a column transformed by a strictly monotonic mapping. See
/// [`StrictlyMonotonicFn`].
///
/// E.g. apply a gcd monotonic_mapping([100, 200, 300]) == [1, 2, 3]
/// monotonic_mapping.mapping() is expected to be injective, and we should always have
/// monotonic_mapping.inverse(monotonic_mapping.mapping(el)) == el
///
/// The inverse of the mapping is required for:
/// `fn get_between_vals(&self, range: RangeInclusive<T>) -> Vec<u64> `
/// The user provides the original value range and we need to monotonic map them in the same way the
/// serialization does before calling the underlying column.
///
/// Note that when opening a codec, the monotonic_mapping should be the inverse of the mapping
/// during serialization. And therefore the monotonic_mapping_inv when opening is the same as
/// monotonic_mapping during serialization.
pub fn monotonic_map_column<C, T, Input, Output>(
from_column: C,
monotonic_mapping: T,
) -> impl Column<Output>
where
C: Column<Input>,
T: Fn(Input) -> Output + Send + Sync,
Input: Send + Sync,
Output: Send + Sync,
T: StrictlyMonotonicFn<Input, Output> + Send + Sync,
Input: PartialOrd + Send + Sync + Clone,
Output: PartialOrd + Send + Sync + Clone,
{
MonotonicMappingColumn {
from_column,
@@ -161,28 +177,27 @@ where
}
}
impl<C, T, Input: PartialOrd, Output: PartialOrd> Column<Output>
for MonotonicMappingColumn<C, T, Input>
impl<C, T, Input, Output> Column<Output> for MonotonicMappingColumn<C, T, Input>
where
C: Column<Input>,
T: Fn(Input) -> Output + Send + Sync,
Input: Send + Sync,
Output: Send + Sync,
T: StrictlyMonotonicFn<Input, Output> + Send + Sync,
Input: PartialOrd + Send + Sync + Clone,
Output: PartialOrd + Send + Sync + Clone,
{
#[inline]
fn get_val(&self, idx: u64) -> Output {
let from_val = self.from_column.get_val(idx);
(self.monotonic_mapping)(from_val)
self.monotonic_mapping.mapping(from_val)
}
fn min_value(&self) -> Output {
let from_min_value = self.from_column.min_value();
(self.monotonic_mapping)(from_min_value)
self.monotonic_mapping.mapping(from_min_value)
}
fn max_value(&self) -> Output {
let from_max_value = self.from_column.max_value();
(self.monotonic_mapping)(from_max_value)
self.monotonic_mapping.mapping(from_max_value)
}
fn num_vals(&self) -> u64 {
@@ -190,7 +205,18 @@ where
}
fn iter(&self) -> Box<dyn Iterator<Item = Output> + '_> {
Box::new(self.from_column.iter().map(&self.monotonic_mapping))
Box::new(
self.from_column
.iter()
.map(|el| self.monotonic_mapping.mapping(el)),
)
}
fn get_between_vals(&self, range: RangeInclusive<Output>) -> Vec<u64> {
self.from_column.get_between_vals(
self.monotonic_mapping.inverse(range.start().clone())
..=self.monotonic_mapping.inverse(range.end().clone()),
)
}
// We voluntarily do not implement get_range as it yields a regression,
@@ -236,19 +262,22 @@ where
#[cfg(test)]
mod tests {
use super::*;
use crate::MonotonicallyMappableToU64;
use crate::monotonic_mapping::{
StrictlyMonotonicMappingInverter, StrictlyMonotonicMappingToInternalBaseval,
StrictlyMonotonicMappingToInternalGCDBaseval,
};
#[test]
fn test_monotonic_mapping() {
let vals = &[1u64, 3u64][..];
let vals = &[3u64, 5u64][..];
let col = VecColumn::from(vals);
let mapped = monotonic_map_column(col, |el| el + 4);
assert_eq!(mapped.min_value(), 5u64);
assert_eq!(mapped.max_value(), 7u64);
let mapped = monotonic_map_column(col, StrictlyMonotonicMappingToInternalBaseval::new(2));
assert_eq!(mapped.min_value(), 1u64);
assert_eq!(mapped.max_value(), 3u64);
assert_eq!(mapped.num_vals(), 2);
assert_eq!(mapped.num_vals(), 2);
assert_eq!(mapped.get_val(0), 5);
assert_eq!(mapped.get_val(1), 7);
assert_eq!(mapped.get_val(0), 1);
assert_eq!(mapped.get_val(1), 3);
}
#[test]
@@ -260,10 +289,15 @@ mod tests {
#[test]
fn test_monotonic_mapping_iter() {
let vals: Vec<u64> = (-1..99).map(i64::to_u64).collect();
let vals: Vec<u64> = (10..110u64).map(|el| el * 10).collect();
let col = VecColumn::from(&vals);
let mapped = monotonic_map_column(col, |el| i64::from_u64(el) * 10i64);
let val_i64s: Vec<i64> = mapped.iter().collect();
let mapped = monotonic_map_column(
col,
StrictlyMonotonicMappingInverter::from(
StrictlyMonotonicMappingToInternalGCDBaseval::new(10, 100),
),
);
let val_i64s: Vec<u64> = mapped.iter().collect();
for i in 0..100 {
assert_eq!(val_i64s[i as usize], mapped.get_val(i));
}
@@ -271,20 +305,26 @@ mod tests {
#[test]
fn test_monotonic_mapping_get_range() {
let vals: Vec<u64> = (-1..99).map(i64::to_u64).collect();
let vals: Vec<u64> = (0..100u64).map(|el| el * 10).collect();
let col = VecColumn::from(&vals);
let mapped = monotonic_map_column(col, |el| i64::from_u64(el) * 10i64);
assert_eq!(mapped.min_value(), -10i64);
assert_eq!(mapped.max_value(), 980i64);
let mapped = monotonic_map_column(
col,
StrictlyMonotonicMappingInverter::from(
StrictlyMonotonicMappingToInternalGCDBaseval::new(10, 0),
),
);
assert_eq!(mapped.min_value(), 0u64);
assert_eq!(mapped.max_value(), 9900u64);
assert_eq!(mapped.num_vals(), 100);
let val_i64s: Vec<i64> = mapped.iter().collect();
assert_eq!(val_i64s.len(), 100);
let val_u64s: Vec<u64> = mapped.iter().collect();
assert_eq!(val_u64s.len(), 100);
for i in 0..100 {
assert_eq!(val_i64s[i as usize], mapped.get_val(i));
assert_eq!(val_i64s[i as usize], i64::from_u64(vals[i as usize]) * 10);
assert_eq!(val_u64s[i as usize], mapped.get_val(i));
assert_eq!(val_u64s[i as usize], vals[i as usize] * 10);
}
let mut buf = [0i64; 20];
let mut buf = [0u64; 20];
mapped.get_range(7, &mut buf[..]);
assert_eq!(&val_i64s[7..][..20], &buf);
assert_eq!(&val_u64s[7..][..20], &buf);
}
}

View File

@@ -171,10 +171,10 @@ pub struct IPCodecParams {
impl CompactSpaceCompressor {
/// Taking the vals as Vec may cost a lot of memory. It is used to sort the vals.
pub fn train_from(column: &impl Column<u128>) -> Self {
pub fn train_from(iter: impl Iterator<Item = u128>, num_vals: u64) -> Self {
let mut values_sorted = BTreeSet::new();
values_sorted.extend(column.iter());
let total_num_values = column.num_vals();
values_sorted.extend(iter);
let total_num_values = num_vals;
let compact_space =
get_compact_space(&values_sorted, total_num_values, COST_PER_BLANK_IN_BITS);
@@ -443,7 +443,7 @@ impl CompactSpaceDecompressor {
mod tests {
use super::*;
use crate::{open_u128, serialize_u128, VecColumn};
use crate::{open_u128, serialize_u128};
#[test]
fn compact_space_test() {
@@ -513,7 +513,12 @@ mod tests {
fn test_aux_vals(u128_vals: &[u128]) -> OwnedBytes {
let mut out = Vec::new();
serialize_u128(VecColumn::from(u128_vals), &mut out).unwrap();
serialize_u128(
|| u128_vals.iter().cloned(),
u128_vals.len() as u64,
&mut out,
)
.unwrap();
let data = OwnedBytes::new(out);
test_all(data.clone(), u128_vals);
@@ -603,8 +608,8 @@ mod tests {
5_000_000_000,
];
let mut out = Vec::new();
serialize_u128(VecColumn::from(vals), &mut out).unwrap();
let decomp = open_u128(OwnedBytes::new(out)).unwrap();
serialize_u128(|| vals.iter().cloned(), vals.len() as u64, &mut out).unwrap();
let decomp = open_u128::<u128>(OwnedBytes::new(out)).unwrap();
assert_eq!(decomp.get_between_vals(199..=200), vec![0]);
assert_eq!(decomp.get_between_vals(199..=201), vec![0, 1]);

View File

@@ -13,6 +13,10 @@ use std::sync::Arc;
use common::BinarySerializable;
use compact_space::CompactSpaceDecompressor;
use monotonic_mapping::{
StrictlyMonotonicMappingInverter, StrictlyMonotonicMappingToInternal,
StrictlyMonotonicMappingToInternalBaseval, StrictlyMonotonicMappingToInternalGCDBaseval,
};
use ownedbytes::OwnedBytes;
use serialize::Header;
@@ -22,6 +26,7 @@ mod compact_space;
mod line;
mod linear;
mod monotonic_mapping;
mod monotonic_mapping_u128;
mod column;
mod gcd;
@@ -31,7 +36,8 @@ use self::bitpacked::BitpackedCodec;
use self::blockwise_linear::BlockwiseLinearCodec;
pub use self::column::{monotonic_map_column, Column, VecColumn};
use self::linear::LinearCodec;
pub use self::monotonic_mapping::MonotonicallyMappableToU64;
pub use self::monotonic_mapping::{MonotonicallyMappableToU64, StrictlyMonotonicFn};
pub use self::monotonic_mapping_u128::MonotonicallyMappableToU128;
pub use self::serialize::{
estimate, serialize, serialize_and_load, serialize_u128, NormalizedHeader,
};
@@ -73,8 +79,13 @@ impl FastFieldCodecType {
}
/// Returns the correct codec reader wrapped in the `Arc` for the data.
pub fn open_u128(bytes: OwnedBytes) -> io::Result<Arc<dyn Column<u128>>> {
Ok(Arc::new(CompactSpaceDecompressor::open(bytes)?))
pub fn open_u128<Item: MonotonicallyMappableToU128>(
bytes: OwnedBytes,
) -> io::Result<Arc<dyn Column<Item>>> {
let reader = CompactSpaceDecompressor::open(bytes)?;
let inverted: StrictlyMonotonicMappingInverter<StrictlyMonotonicMappingToInternal<Item>> =
StrictlyMonotonicMappingToInternal::<Item>::new().into();
Ok(Arc::new(monotonic_map_column(reader, inverted)))
}
/// Returns the correct codec reader wrapped in the `Arc` for the data.
@@ -99,11 +110,15 @@ fn open_specific_codec<C: FastFieldCodec, Item: MonotonicallyMappableToU64>(
let reader = C::open_from_bytes(bytes, normalized_header)?;
let min_value = header.min_value;
if let Some(gcd) = header.gcd {
let monotonic_mapping = move |val: u64| Item::from_u64(min_value + val * gcd.get());
Ok(Arc::new(monotonic_map_column(reader, monotonic_mapping)))
let mapping = StrictlyMonotonicMappingInverter::from(
StrictlyMonotonicMappingToInternalGCDBaseval::new(gcd.get(), min_value),
);
Ok(Arc::new(monotonic_map_column(reader, mapping)))
} else {
let monotonic_mapping = move |val: u64| Item::from_u64(min_value + val);
Ok(Arc::new(monotonic_map_column(reader, monotonic_mapping)))
let mapping = StrictlyMonotonicMappingInverter::from(
StrictlyMonotonicMappingToInternalBaseval::new(min_value),
);
Ok(Arc::new(monotonic_map_column(reader, mapping)))
}
}
@@ -143,6 +158,7 @@ pub const ALL_CODEC_TYPES: [FastFieldCodecType; 3] = [
#[cfg(test)]
mod tests {
use proptest::prelude::*;
use proptest::strategy::Strategy;
use proptest::{prop_oneof, proptest};
@@ -177,6 +193,18 @@ mod tests {
`{data:?}`",
);
}
if !data.is_empty() {
let test_rand_idx = rand::thread_rng().gen_range(0..=data.len() - 1);
let expected_positions: Vec<u64> = data
.iter()
.enumerate()
.filter(|(_, el)| **el == data[test_rand_idx])
.map(|(pos, _)| pos as u64)
.collect();
let positions = reader.get_between_vals(data[test_rand_idx]..=data[test_rand_idx]);
assert_eq!(expected_positions, positions);
}
Some((estimation, actual_compression))
}

View File

@@ -90,7 +90,7 @@ fn bench_ip() {
{
let mut data = vec![];
for dataset in dataset.chunks(500_000) {
serialize_u128(VecColumn::from(dataset), &mut data).unwrap();
serialize_u128(|| dataset.iter().cloned(), dataset.len() as u64, &mut data).unwrap();
}
let compression = data.len() as f64 / (dataset.len() * 16) as f64;
println!("Compression 50_000 chunks {:.4}", compression);
@@ -101,7 +101,10 @@ fn bench_ip() {
}
let mut data = vec![];
serialize_u128(VecColumn::from(&dataset), &mut data).unwrap();
{
print_time!("creation");
serialize_u128(|| dataset.iter().cloned(), dataset.len() as u64, &mut data).unwrap();
}
let compression = data.len() as f64 / (dataset.len() * 16) as f64;
println!("Compression {:.2}", compression);
@@ -110,7 +113,7 @@ fn bench_ip() {
(data.len() * 8) as f32 / dataset.len() as f32
);
let decompressor = open_u128(OwnedBytes::new(data)).unwrap();
let decompressor = open_u128::<u128>(OwnedBytes::new(data)).unwrap();
// Sample some ranges
for value in dataset.iter().take(1110).skip(1100).cloned() {
print_time!("get range");

View File

@@ -1,3 +1,9 @@
use std::marker::PhantomData;
use fastdivide::DividerU64;
use crate::MonotonicallyMappableToU128;
pub trait MonotonicallyMappableToU64: 'static + PartialOrd + Copy + Send + Sync {
/// Converts a value to u64.
///
@@ -11,6 +17,145 @@ pub trait MonotonicallyMappableToU64: 'static + PartialOrd + Copy + Send + Sync
fn from_u64(val: u64) -> Self;
}
/// Values need to be strictly monotonic mapped to a `Internal` value (u64 or u128) that can be
/// used in fast field codecs.
///
/// The monotonic mapping is required so that `PartialOrd` can be used on `Internal` without
/// converting to `External`.
///
/// All strictly monotonic functions are invertible because they are guaranteed to have a one-to-one
/// mapping from their range to their domain. The `inverse` method is required when opening a codec,
/// so a value can be converted back to its original domain (e.g. ip address or f64) from its
/// internal representation.
pub trait StrictlyMonotonicFn<External, Internal> {
/// Strictly monotonically maps the value from External to Internal.
fn mapping(&self, inp: External) -> Internal;
/// Inverse of `mapping`. Maps the value from Internal to External.
fn inverse(&self, out: Internal) -> External;
}
/// Inverts a strictly monotonic mapping from `StrictlyMonotonicFn<A, B>` to
/// `StrictlyMonotonicFn<B, A>`.
///
/// # Warning
///
/// This type comes with a footgun. A type being strictly monotonic does not impose that the inverse
/// mapping is strictly monotonic over the entire space External. e.g. a -> a * 2. Use at your own
/// risks.
pub(crate) struct StrictlyMonotonicMappingInverter<T> {
orig_mapping: T,
}
impl<T> From<T> for StrictlyMonotonicMappingInverter<T> {
fn from(orig_mapping: T) -> Self {
Self { orig_mapping }
}
}
impl<From, To, T> StrictlyMonotonicFn<To, From> for StrictlyMonotonicMappingInverter<T>
where T: StrictlyMonotonicFn<From, To>
{
fn mapping(&self, val: To) -> From {
self.orig_mapping.inverse(val)
}
fn inverse(&self, val: From) -> To {
self.orig_mapping.mapping(val)
}
}
/// Applies the strictly monotonic mapping from `T` without any additional changes.
pub(crate) struct StrictlyMonotonicMappingToInternal<T> {
_phantom: PhantomData<T>,
}
impl<T> StrictlyMonotonicMappingToInternal<T> {
pub(crate) fn new() -> StrictlyMonotonicMappingToInternal<T> {
Self {
_phantom: PhantomData,
}
}
}
impl<External: MonotonicallyMappableToU128, T: MonotonicallyMappableToU128>
StrictlyMonotonicFn<External, u128> for StrictlyMonotonicMappingToInternal<T>
where T: MonotonicallyMappableToU128
{
fn mapping(&self, inp: External) -> u128 {
External::to_u128(inp)
}
fn inverse(&self, out: u128) -> External {
External::from_u128(out)
}
}
impl<External: MonotonicallyMappableToU64, T: MonotonicallyMappableToU64>
StrictlyMonotonicFn<External, u64> for StrictlyMonotonicMappingToInternal<T>
where T: MonotonicallyMappableToU64
{
fn mapping(&self, inp: External) -> u64 {
External::to_u64(inp)
}
fn inverse(&self, out: u64) -> External {
External::from_u64(out)
}
}
/// Mapping dividing by gcd and a base value.
///
/// The function is assumed to be only called on values divided by passed
/// gcd value. (It is necessary for the function to be monotonic.)
pub(crate) struct StrictlyMonotonicMappingToInternalGCDBaseval {
gcd_divider: DividerU64,
gcd: u64,
min_value: u64,
}
impl StrictlyMonotonicMappingToInternalGCDBaseval {
pub(crate) fn new(gcd: u64, min_value: u64) -> Self {
let gcd_divider = DividerU64::divide_by(gcd);
Self {
gcd_divider,
gcd,
min_value,
}
}
}
impl<External: MonotonicallyMappableToU64> StrictlyMonotonicFn<External, u64>
for StrictlyMonotonicMappingToInternalGCDBaseval
{
fn mapping(&self, inp: External) -> u64 {
self.gcd_divider
.divide(External::to_u64(inp) - self.min_value)
}
fn inverse(&self, out: u64) -> External {
External::from_u64(self.min_value + out * self.gcd)
}
}
/// Strictly monotonic mapping with a base value.
pub(crate) struct StrictlyMonotonicMappingToInternalBaseval {
min_value: u64,
}
impl StrictlyMonotonicMappingToInternalBaseval {
pub(crate) fn new(min_value: u64) -> Self {
Self { min_value }
}
}
impl<External: MonotonicallyMappableToU64> StrictlyMonotonicFn<External, u64>
for StrictlyMonotonicMappingToInternalBaseval
{
fn mapping(&self, val: External) -> u64 {
External::to_u64(val) - self.min_value
}
fn inverse(&self, val: u64) -> External {
External::from_u64(self.min_value + val)
}
}
impl MonotonicallyMappableToU64 for u64 {
fn to_u64(self) -> u64 {
self
@@ -54,3 +199,33 @@ impl MonotonicallyMappableToU64 for f64 {
common::u64_to_f64(val)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn strictly_monotonic_test() {
// identity mapping
test_round_trip(&StrictlyMonotonicMappingToInternal::<u64>::new(), 100u64);
// round trip to i64
test_round_trip(&StrictlyMonotonicMappingToInternal::<i64>::new(), 100u64);
// identity mapping
test_round_trip(&StrictlyMonotonicMappingToInternal::<u128>::new(), 100u128);
// base value to i64 round trip
let mapping = StrictlyMonotonicMappingToInternalBaseval::new(100);
test_round_trip::<_, _, u64>(&mapping, 100i64);
// base value and gcd to u64 round trip
let mapping = StrictlyMonotonicMappingToInternalGCDBaseval::new(10, 100);
test_round_trip::<_, _, u64>(&mapping, 100u64);
}
fn test_round_trip<T: StrictlyMonotonicFn<K, L>, K: std::fmt::Debug + Eq + Copy, L>(
mapping: &T,
test_val: K,
) {
assert_eq!(mapping.inverse(mapping.mapping(test_val)), test_val);
}
}

View File

@@ -1,4 +1,4 @@
use std::net::{IpAddr, Ipv6Addr};
use std::net::Ipv6Addr;
pub trait MonotonicallyMappableToU128: 'static + PartialOrd + Copy + Send + Sync {
/// Converts a value to u128.
@@ -23,20 +23,16 @@ impl MonotonicallyMappableToU128 for u128 {
}
}
impl MonotonicallyMappableToU128 for IpAddr {
impl MonotonicallyMappableToU128 for Ipv6Addr {
fn to_u128(self) -> u128 {
ip_to_u128(self)
}
fn from_u128(val: u128) -> Self {
IpAddr::from(val.to_be_bytes())
Ipv6Addr::from(val.to_be_bytes())
}
}
fn ip_to_u128(ip_addr: IpAddr) -> u128 {
let ip_addr_v6: Ipv6Addr = match ip_addr {
IpAddr::V4(v4) => v4.to_ipv6_mapped(),
IpAddr::V6(v6) => v6,
};
u128::from_be_bytes(ip_addr_v6.octets())
fn ip_to_u128(ip_addr: Ipv6Addr) -> u128 {
u128::from_be_bytes(ip_addr.octets())
}

View File

@@ -22,7 +22,6 @@ use std::num::NonZeroU64;
use std::sync::Arc;
use common::{BinarySerializable, VInt};
use fastdivide::DividerU64;
use log::warn;
use ownedbytes::OwnedBytes;
@@ -30,6 +29,10 @@ use crate::bitpacked::BitpackedCodec;
use crate::blockwise_linear::BlockwiseLinearCodec;
use crate::compact_space::CompactSpaceCompressor;
use crate::linear::LinearCodec;
use crate::monotonic_mapping::{
StrictlyMonotonicFn, StrictlyMonotonicMappingToInternal,
StrictlyMonotonicMappingToInternalGCDBaseval,
};
use crate::{
monotonic_map_column, Column, FastFieldCodec, FastFieldCodecType, MonotonicallyMappableToU64,
VecColumn, ALL_CODEC_TYPES,
@@ -57,8 +60,11 @@ pub(crate) struct Header {
impl Header {
pub fn normalized(self) -> NormalizedHeader {
let max_value =
(self.max_value - self.min_value) / self.gcd.map(|gcd| gcd.get()).unwrap_or(1);
let gcd = self.gcd.map(|gcd| gcd.get()).unwrap_or(1);
let gcd_min_val_mapping =
StrictlyMonotonicMappingToInternalGCDBaseval::new(gcd, self.min_value);
let max_value = gcd_min_val_mapping.mapping(self.max_value);
NormalizedHeader {
num_vals: self.num_vals,
max_value,
@@ -66,10 +72,7 @@ impl Header {
}
pub fn normalize_column<C: Column>(&self, from_column: C) -> impl Column {
let min_value = self.min_value;
let gcd = self.gcd.map(|gcd| gcd.get()).unwrap_or(1);
let divider = DividerU64::divide_by(gcd);
monotonic_map_column(from_column, move |val| divider.divide(val - min_value))
normalize_column(from_column, self.min_value, self.gcd)
}
pub fn compute_header(
@@ -81,9 +84,8 @@ impl Header {
let max_value = column.max_value();
let gcd = crate::gcd::find_gcd(column.iter().map(|val| val - min_value))
.filter(|gcd| gcd.get() > 1u64);
let divider = DividerU64::divide_by(gcd.map(|gcd| gcd.get()).unwrap_or(1u64));
let shifted_column = monotonic_map_column(&column, |val| divider.divide(val - min_value));
let codec_type = detect_codec(shifted_column, codecs)?;
let normalized_column = normalize_column(column, min_value, gcd);
let codec_type = detect_codec(normalized_column, codecs)?;
Some(Header {
num_vals,
min_value,
@@ -94,6 +96,16 @@ impl Header {
}
}
pub fn normalize_column<C: Column>(
from_column: C,
min_value: u64,
gcd: Option<NonZeroU64>,
) -> impl Column {
let gcd = gcd.map(|gcd| gcd.get()).unwrap_or(1);
let mapping = StrictlyMonotonicMappingToInternalGCDBaseval::new(gcd, min_value);
monotonic_map_column(from_column, mapping)
}
impl BinarySerializable for Header {
fn serialize<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
VInt(self.num_vals).serialize(writer)?;
@@ -129,12 +141,15 @@ pub fn estimate<T: MonotonicallyMappableToU64>(
typed_column: impl Column<T>,
codec_type: FastFieldCodecType,
) -> Option<f32> {
let column = monotonic_map_column(typed_column, T::to_u64);
let column = monotonic_map_column(typed_column, StrictlyMonotonicMappingToInternal::<T>::new());
let min_value = column.min_value();
let gcd = crate::gcd::find_gcd(column.iter().map(|val| val - min_value))
.filter(|gcd| gcd.get() > 1u64);
let divider = DividerU64::divide_by(gcd.map(|gcd| gcd.get()).unwrap_or(1u64));
let normalized_column = monotonic_map_column(&column, |val| divider.divide(val - min_value));
let mapping = StrictlyMonotonicMappingToInternalGCDBaseval::new(
gcd.map(|gcd| gcd.get()).unwrap_or(1u64),
min_value,
);
let normalized_column = monotonic_map_column(&column, mapping);
match codec_type {
FastFieldCodecType::Bitpacked => BitpackedCodec::estimate(&normalized_column),
FastFieldCodecType::Linear => LinearCodec::estimate(&normalized_column),
@@ -142,15 +157,14 @@ pub fn estimate<T: MonotonicallyMappableToU64>(
}
}
pub fn serialize_u128(
typed_column: impl Column<u128>,
pub fn serialize_u128<F: Fn() -> I, I: Iterator<Item = u128>>(
iter_gen: F,
num_vals: u64,
output: &mut impl io::Write,
) -> io::Result<()> {
// TODO write header, to later support more codecs
let compressor = CompactSpaceCompressor::train_from(&typed_column);
compressor
.compress_into(typed_column.iter(), output)
.unwrap();
let compressor = CompactSpaceCompressor::train_from(iter_gen(), num_vals);
compressor.compress_into(iter_gen(), output).unwrap();
Ok(())
}
@@ -160,7 +174,7 @@ pub fn serialize<T: MonotonicallyMappableToU64>(
output: &mut impl io::Write,
codecs: &[FastFieldCodecType],
) -> io::Result<()> {
let column = monotonic_map_column(typed_column, T::to_u64);
let column = monotonic_map_column(typed_column, StrictlyMonotonicMappingToInternal::<T>::new());
let header = Header::compute_header(&column, codecs).ok_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidInput,

View File

@@ -27,7 +27,10 @@ pub use self::bytes::{BytesFastFieldReader, BytesFastFieldWriter};
pub use self::error::{FastFieldNotAvailableError, Result};
pub use self::facet_reader::FacetReader;
pub(crate) use self::multivalued::{get_fastfield_codecs_for_multivalue, MultivalueStartIndex};
pub use self::multivalued::{MultiValuedFastFieldReader, MultiValuedFastFieldWriter};
pub use self::multivalued::{
MultiValueU128FastFieldWriter, MultiValuedFastFieldReader, MultiValuedFastFieldWriter,
MultiValuedU128FastFieldReader,
};
pub use self::readers::FastFieldReaders;
pub(crate) use self::readers::{type_and_cardinality, FastType};
pub use self::serializer::{Column, CompositeFastFieldSerializer};

View File

@@ -3,9 +3,9 @@ mod writer;
use fastfield_codecs::FastFieldCodecType;
pub use self::reader::MultiValuedFastFieldReader;
pub use self::writer::MultiValuedFastFieldWriter;
pub use self::reader::{MultiValuedFastFieldReader, MultiValuedU128FastFieldReader};
pub(crate) use self::writer::MultivalueStartIndex;
pub use self::writer::{MultiValueU128FastFieldWriter, MultiValuedFastFieldWriter};
/// The valid codecs for multivalue values excludes the linear interpolation codec.
///

View File

@@ -1,7 +1,7 @@
use std::ops::Range;
use std::ops::{Range, RangeInclusive};
use std::sync::Arc;
use fastfield_codecs::Column;
use fastfield_codecs::{Column, MonotonicallyMappableToU128};
use crate::fastfield::{FastValue, MultiValueLength};
use crate::DocId;
@@ -99,12 +99,176 @@ impl<Item: FastValue> MultiValueLength for MultiValuedFastFieldReader<Item> {
self.total_num_vals() as u64
}
}
/// Reader for a multivalued `u128` fast field.
///
/// The reader is implemented as a `u64` fast field for the index and a `u128` fast field.
///
/// The `vals_reader` will access the concatenated list of all
/// values for all reader.
/// The `idx_reader` associated, for each document, the index of its first value.
#[derive(Clone)]
pub struct MultiValuedU128FastFieldReader<T: MonotonicallyMappableToU128> {
idx_reader: Arc<dyn Column<u64>>,
vals_reader: Arc<dyn Column<T>>,
}
impl<T: MonotonicallyMappableToU128> MultiValuedU128FastFieldReader<T> {
pub(crate) fn open(
idx_reader: Arc<dyn Column<u64>>,
vals_reader: Arc<dyn Column<T>>,
) -> MultiValuedU128FastFieldReader<T> {
Self {
idx_reader,
vals_reader,
}
}
/// Returns `[start, end)`, such that the values associated
/// to the given document are `start..end`.
#[inline]
fn range(&self, doc: DocId) -> Range<u64> {
let start = self.idx_reader.get_val(doc as u64);
let end = self.idx_reader.get_val(doc as u64 + 1);
start..end
}
/// Returns the array of values associated to the given `doc`.
#[inline]
pub fn get_first_val(&self, doc: DocId) -> Option<T> {
let range = self.range(doc);
if range.is_empty() {
return None;
}
Some(self.vals_reader.get_val(range.start))
}
/// Returns the array of values associated to the given `doc`.
#[inline]
fn get_vals_for_range(&self, range: Range<u64>, vals: &mut Vec<T>) {
let len = (range.end - range.start) as usize;
vals.resize(len, T::from_u128(0));
self.vals_reader.get_range(range.start, &mut vals[..]);
}
/// Returns the array of values associated to the given `doc`.
#[inline]
pub fn get_vals(&self, doc: DocId, vals: &mut Vec<T>) {
let range = self.range(doc);
self.get_vals_for_range(range, vals);
}
/// Returns all docids which are in the provided value range
pub fn get_between_vals(&self, range: RangeInclusive<T>) -> Vec<DocId> {
let positions = self.vals_reader.get_between_vals(range);
positions_to_docids(&positions, self.idx_reader.as_ref())
}
/// Iterates over all elements in the fast field
pub fn iter(&self) -> impl Iterator<Item = T> + '_ {
self.vals_reader.iter()
}
/// Returns the minimum value for this fast field.
///
/// The min value does not take in account of possible
/// deleted document, and should be considered as a lower bound
/// of the actual mimimum value.
pub fn min_value(&self) -> T {
self.vals_reader.min_value()
}
/// Returns the maximum value for this fast field.
///
/// The max value does not take in account of possible
/// deleted document, and should be considered as an upper bound
/// of the actual maximum value.
pub fn max_value(&self) -> T {
self.vals_reader.max_value()
}
/// Returns the number of values associated with the document `DocId`.
#[inline]
pub fn num_vals(&self, doc: DocId) -> usize {
let range = self.range(doc);
(range.end - range.start) as usize
}
/// Returns the overall number of values in this field.
#[inline]
pub fn total_num_vals(&self) -> u64 {
self.idx_reader.max_value()
}
}
impl<T: MonotonicallyMappableToU128> MultiValueLength for MultiValuedU128FastFieldReader<T> {
fn get_range(&self, doc_id: DocId) -> std::ops::Range<u64> {
self.range(doc_id)
}
fn get_len(&self, doc_id: DocId) -> u64 {
self.num_vals(doc_id) as u64
}
fn get_total_len(&self) -> u64 {
self.total_num_vals() as u64
}
}
/// Converts a list of positions of values in a 1:n index to the corresponding list of DocIds.
///
/// Since there is no index for value pos -> docid, but docid -> value pos range, we scan the index.
///
/// Correctness: positions needs to be sorted. idx_reader needs to contain monotonically increasing
/// positions.
///
/// TODO: Instead of a linear scan we can employ a expotential search into binary search to match a
/// docid to its value position.
fn positions_to_docids<C: Column + ?Sized>(positions: &[u64], idx_reader: &C) -> Vec<DocId> {
let mut docs = vec![];
let mut cur_doc = 0u32;
let mut last_doc = None;
for pos in positions {
loop {
let end = idx_reader.get_val(cur_doc as u64 + 1);
if end > *pos {
// avoid duplicates
if Some(cur_doc) == last_doc {
break;
}
docs.push(cur_doc);
last_doc = Some(cur_doc);
break;
}
cur_doc += 1;
}
}
docs
}
#[cfg(test)]
mod tests {
use fastfield_codecs::VecColumn;
use crate::core::Index;
use crate::fastfield::multivalued::reader::positions_to_docids;
use crate::schema::{Cardinality, Facet, FacetOptions, NumericOptions, Schema};
#[test]
fn test_positions_to_docid() {
let positions = vec![10u64, 11, 15, 20, 21, 22];
let offsets = vec![0, 10, 12, 15, 22, 23];
{
let column = VecColumn::from(&offsets);
let docids = positions_to_docids(&positions, &column);
assert_eq!(docids, vec![1, 3, 4]);
}
}
#[test]
fn test_multifastfield_reader() -> crate::Result<()> {
let mut schema_builder = Schema::builder();

View File

@@ -1,6 +1,8 @@
use std::io;
use fastfield_codecs::{Column, MonotonicallyMappableToU64, VecColumn};
use fastfield_codecs::{
Column, MonotonicallyMappableToU128, MonotonicallyMappableToU64, VecColumn,
};
use fnv::FnvHashMap;
use super::get_fastfield_codecs_for_multivalue;
@@ -264,6 +266,143 @@ fn iter_remapped_multivalue_index<'a, C: Column>(
}))
}
/// Writer for multi-valued (as in, more than one value per document)
/// int fast field.
///
/// This `Writer` is only useful for advanced users.
/// The normal way to get your multivalued int in your index
/// is to
/// - declare your field with fast set to `Cardinality::MultiValues`
/// in your schema
/// - add your document simply by calling `.add_document(...)`.
///
/// The `MultiValuedFastFieldWriter` can be acquired from the
pub struct MultiValueU128FastFieldWriter {
field: Field,
vals: Vec<u128>,
doc_index: Vec<u64>,
}
impl MultiValueU128FastFieldWriter {
/// Creates a new `U128MultiValueFastFieldWriter`
pub(crate) fn new(field: Field) -> Self {
MultiValueU128FastFieldWriter {
field,
vals: Vec::new(),
doc_index: Vec::new(),
}
}
/// The memory used (inclusive childs)
pub fn mem_usage(&self) -> usize {
self.vals.capacity() * std::mem::size_of::<UnorderedTermId>()
+ self.doc_index.capacity() * std::mem::size_of::<u64>()
}
/// Finalize the current document.
pub(crate) fn next_doc(&mut self) {
self.doc_index.push(self.vals.len() as u64);
}
/// Pushes a new value to the current document.
pub(crate) fn add_val(&mut self, val: u128) {
self.vals.push(val);
}
/// Shift to the next document and adds
/// all of the matching field values present in the document.
pub fn add_document(&mut self, doc: &Document) {
self.next_doc();
for field_value in doc.field_values() {
if field_value.field == self.field {
let value = field_value.value();
let ip_addr = value
.as_ip_addr()
.unwrap_or_else(|| panic!("expected and ip, but got {:?}", value));
let ip_addr_u128 = ip_addr.to_u128();
self.add_val(ip_addr_u128);
}
}
}
/// Returns an iterator over values per doc_id in ascending doc_id order.
///
/// Normally the order is simply iterating self.doc_id_index.
/// With doc_id_map it accounts for the new mapping, returning values in the order of the
/// new doc_ids.
fn get_ordered_values<'a: 'b, 'b>(
&'a self,
doc_id_map: Option<&'b DocIdMapping>,
) -> impl Iterator<Item = &'b [u128]> {
get_ordered_values(&self.vals, &self.doc_index, doc_id_map)
}
/// Serializes fast field values.
pub fn serialize(
mut self,
serializer: &mut CompositeFastFieldSerializer,
doc_id_map: Option<&DocIdMapping>,
) -> io::Result<()> {
{
// writing the offset index
//
self.doc_index.push(self.vals.len() as u64);
let col = VecColumn::from(&self.doc_index[..]);
if let Some(doc_id_map) = doc_id_map {
let multi_value_start_index = MultivalueStartIndex::new(&col, doc_id_map);
serializer.create_auto_detect_u64_fast_field_with_idx(
self.field,
multi_value_start_index,
0,
)?;
} else {
serializer.create_auto_detect_u64_fast_field_with_idx(self.field, col, 0)?;
}
}
{
let iter_gen = || self.get_ordered_values(doc_id_map).flatten().cloned();
serializer.create_u128_fast_field_with_idx(
self.field,
iter_gen,
self.vals.len() as u64,
1,
)?;
}
Ok(())
}
}
/// Returns an iterator over values per doc_id in ascending doc_id order.
///
/// Normally the order is simply iterating self.doc_id_index.
/// With doc_id_map it accounts for the new mapping, returning values in the order of the
/// new doc_ids.
fn get_ordered_values<'a: 'b, 'b, T>(
vals: &'a [T],
doc_index: &'a [u64],
doc_id_map: Option<&'b DocIdMapping>,
) -> impl Iterator<Item = &'b [T]> {
let doc_id_iter: Box<dyn Iterator<Item = u32>> = if let Some(doc_id_map) = doc_id_map {
Box::new(doc_id_map.iter_old_doc_ids())
} else {
let max_doc = doc_index.len() as DocId;
Box::new(0..max_doc)
};
doc_id_iter.map(move |doc_id| get_values_for_doc_id(doc_id, vals, doc_index))
}
/// returns all values for a doc_id
fn get_values_for_doc_id<'a, T>(doc_id: u32, vals: &'a [T], doc_index: &'a [u64]) -> &'a [T] {
let start_pos = doc_index[doc_id as usize] as usize;
let end_pos = doc_index
.get(doc_id as usize + 1)
.cloned()
.unwrap_or(vals.len() as u64) as usize; // special case, last doc_id has no offset information
&vals[start_pos..end_pos]
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -1,7 +1,9 @@
use std::net::Ipv6Addr;
use std::sync::Arc;
use fastfield_codecs::{open, Column};
use fastfield_codecs::{open, open_u128, Column};
use super::multivalued::MultiValuedU128FastFieldReader;
use crate::directory::{CompositeFile, FileSlice};
use crate::fastfield::{
BytesFastFieldReader, FastFieldNotAvailableError, FastValue, MultiValuedFastFieldReader,
@@ -23,6 +25,7 @@ pub struct FastFieldReaders {
pub(crate) enum FastType {
I64,
U64,
U128,
F64,
Bool,
Date,
@@ -49,6 +52,9 @@ pub(crate) fn type_and_cardinality(field_type: &FieldType) -> Option<(FastType,
FieldType::Str(options) if options.is_fast() => {
Some((FastType::U64, Cardinality::MultiValues))
}
FieldType::IpAddr(options) => options
.get_fastfield_cardinality()
.map(|cardinality| (FastType::U128, cardinality)),
_ => None,
}
}
@@ -143,6 +149,59 @@ impl FastFieldReaders {
self.typed_fast_field_reader(field)
}
/// Returns the `ip` fast field reader reader associated to `field`.
///
/// If `field` is not a u128 fast field, this method returns an Error.
pub fn ip_addr(&self, field: Field) -> crate::Result<Arc<dyn Column<Ipv6Addr>>> {
self.check_type(field, FastType::U128, Cardinality::SingleValue)?;
let bytes = self.fast_field_data(field, 0)?.read_bytes()?;
Ok(open_u128::<Ipv6Addr>(bytes)?)
}
/// Returns the `ip` fast field reader reader associated to `field`.
///
/// If `field` is not a u128 fast field, this method returns an Error.
pub fn ip_addrs(
&self,
field: Field,
) -> crate::Result<MultiValuedU128FastFieldReader<Ipv6Addr>> {
self.check_type(field, FastType::U128, Cardinality::MultiValues)?;
let idx_reader: Arc<dyn Column<u64>> = self.typed_fast_field_reader(field)?;
let bytes = self.fast_field_data(field, 1)?.read_bytes()?;
let vals_reader = open_u128::<Ipv6Addr>(bytes)?;
Ok(MultiValuedU128FastFieldReader::open(
idx_reader,
vals_reader,
))
}
/// Returns the `u128` fast field reader reader associated to `field`.
///
/// If `field` is not a u128 fast field, this method returns an Error.
pub(crate) fn u128(&self, field: Field) -> crate::Result<Arc<dyn Column<u128>>> {
self.check_type(field, FastType::U128, Cardinality::SingleValue)?;
let bytes = self.fast_field_data(field, 0)?.read_bytes()?;
Ok(open_u128::<u128>(bytes)?)
}
/// Returns the `u128` multi-valued fast field reader reader associated to `field`.
///
/// If `field` is not a u128 multi-valued fast field, this method returns an Error.
pub fn u128s(&self, field: Field) -> crate::Result<MultiValuedU128FastFieldReader<u128>> {
self.check_type(field, FastType::U128, Cardinality::MultiValues)?;
let idx_reader: Arc<dyn Column<u64>> = self.typed_fast_field_reader(field)?;
let bytes = self.fast_field_data(field, 1)?.read_bytes()?;
let vals_reader = open_u128::<u128>(bytes)?;
Ok(MultiValuedU128FastFieldReader::open(
idx_reader,
vals_reader,
))
}
/// Returns the `u64` fast field reader reader associated with `field`, regardless of whether
/// the given field is effectively of type `u64` or not.
///

View File

@@ -84,6 +84,21 @@ impl CompositeFastFieldSerializer {
Ok(())
}
/// Serialize data into a new u128 fast field. The codec will be compact space compressor,
/// which is optimized for scanning the fast field for a given range.
pub fn create_u128_fast_field_with_idx<F: Fn() -> I, I: Iterator<Item = u128>>(
&mut self,
field: Field,
iter_gen: F,
num_vals: u64,
idx: usize,
) -> io::Result<()> {
let field_write = self.composite_write.for_field_with_idx(field, idx);
fastfield_codecs::serialize_u128(iter_gen, num_vals, field_write)?;
Ok(())
}
/// Start serializing a new [u8] fast field. Use the returned writer to write data into the
/// bytes field. To associate the bytes with documents a seperate index must be created on
/// index 0. See bytes/writer.rs::serialize for an example.

View File

@@ -2,11 +2,11 @@ use std::collections::HashMap;
use std::io;
use common;
use fastfield_codecs::{Column, MonotonicallyMappableToU64};
use fastfield_codecs::{Column, MonotonicallyMappableToU128, MonotonicallyMappableToU64};
use fnv::FnvHashMap;
use tantivy_bitpacker::BlockedBitpacker;
use super::multivalued::MultiValuedFastFieldWriter;
use super::multivalued::{MultiValueU128FastFieldWriter, MultiValuedFastFieldWriter};
use super::FastFieldType;
use crate::fastfield::{BytesFastFieldWriter, CompositeFastFieldSerializer};
use crate::indexer::doc_id_mapping::DocIdMapping;
@@ -19,6 +19,8 @@ use crate::DatePrecision;
pub struct FastFieldsWriter {
term_id_writers: Vec<MultiValuedFastFieldWriter>,
single_value_writers: Vec<IntFastFieldWriter>,
u128_value_writers: Vec<U128FastFieldWriter>,
u128_multi_value_writers: Vec<MultiValueU128FastFieldWriter>,
multi_values_writers: Vec<MultiValuedFastFieldWriter>,
bytes_value_writers: Vec<BytesFastFieldWriter>,
}
@@ -34,6 +36,8 @@ fn fast_field_default_value(field_entry: &FieldEntry) -> u64 {
impl FastFieldsWriter {
/// Create all `FastFieldWriter` required by the schema.
pub fn from_schema(schema: &Schema) -> FastFieldsWriter {
let mut u128_value_writers = Vec::new();
let mut u128_multi_value_writers = Vec::new();
let mut single_value_writers = Vec::new();
let mut term_id_writers = Vec::new();
let mut multi_values_writers = Vec::new();
@@ -97,10 +101,27 @@ impl FastFieldsWriter {
bytes_value_writers.push(fast_field_writer);
}
}
FieldType::IpAddr(opt) => {
if opt.is_fast() {
match opt.get_fastfield_cardinality() {
Some(Cardinality::SingleValue) => {
let fast_field_writer = U128FastFieldWriter::new(field);
u128_value_writers.push(fast_field_writer);
}
Some(Cardinality::MultiValues) => {
let fast_field_writer = MultiValueU128FastFieldWriter::new(field);
u128_multi_value_writers.push(fast_field_writer);
}
None => {}
}
}
}
FieldType::Str(_) | FieldType::JsonObject(_) => {}
}
}
FastFieldsWriter {
u128_value_writers,
u128_multi_value_writers,
term_id_writers,
single_value_writers,
multi_values_writers,
@@ -129,6 +150,16 @@ impl FastFieldsWriter {
.iter()
.map(|w| w.mem_usage())
.sum::<usize>()
+ self
.u128_value_writers
.iter()
.map(|w| w.mem_usage())
.sum::<usize>()
+ self
.u128_multi_value_writers
.iter()
.map(|w| w.mem_usage())
.sum::<usize>()
}
/// Get the `FastFieldWriter` associated with a field.
@@ -190,7 +221,6 @@ impl FastFieldsWriter {
.iter_mut()
.find(|field_writer| field_writer.field() == field)
}
/// Indexes all of the fastfields of a new document.
pub fn add_document(&mut self, doc: &Document) {
for field_writer in &mut self.term_id_writers {
@@ -205,6 +235,12 @@ impl FastFieldsWriter {
for field_writer in &mut self.bytes_value_writers {
field_writer.add_document(doc);
}
for field_writer in &mut self.u128_value_writers {
field_writer.add_document(doc);
}
for field_writer in &mut self.u128_multi_value_writers {
field_writer.add_document(doc);
}
}
/// Serializes all of the `FastFieldWriter`s by pushing them in
@@ -230,6 +266,110 @@ impl FastFieldsWriter {
for field_writer in self.bytes_value_writers {
field_writer.serialize(serializer, doc_id_map)?;
}
for field_writer in self.u128_value_writers {
field_writer.serialize(serializer, doc_id_map)?;
}
for field_writer in self.u128_multi_value_writers {
field_writer.serialize(serializer, doc_id_map)?;
}
Ok(())
}
}
/// Fast field writer for u128 values.
/// The fast field writer just keeps the values in memory.
///
/// Only when the segment writer can be closed and
/// persisted on disk, the fast field writer is
/// sent to a `FastFieldSerializer` via the `.serialize(...)`
/// method.
///
/// We cannot serialize earlier as the values are
/// compressed to a compact number space and the number of
/// bits required for bitpacking can only been known once
/// we have seen all of the values.
pub struct U128FastFieldWriter {
field: Field,
vals: Vec<u128>,
val_count: u32,
}
impl U128FastFieldWriter {
/// Creates a new `IntFastFieldWriter`
pub fn new(field: Field) -> Self {
Self {
field,
vals: vec![],
val_count: 0,
}
}
/// The memory used (inclusive childs)
pub fn mem_usage(&self) -> usize {
self.vals.len() * 16
}
/// Records a new value.
///
/// The n-th value being recorded is implicitely
/// associated to the document with the `DocId` n.
/// (Well, `n-1` actually because of 0-indexing)
pub fn add_val(&mut self, val: u128) {
self.vals.push(val);
}
/// Extract the fast field value from the document
/// (or use the default value) and records it.
///
/// Extract the value associated to the fast field for
/// this document.
pub fn add_document(&mut self, doc: &Document) {
match doc.get_first(self.field) {
Some(v) => {
let ip_addr = v
.as_ip_addr()
.unwrap_or_else(|| panic!("expected and ip, but got {:?}", v));
let value = ip_addr.to_u128();
self.add_val(value);
}
None => {
self.add_val(0); // TODO fix null handling
}
};
self.val_count += 1;
}
/// Push the fast fields value to the `FastFieldWriter`.
pub fn serialize(
&self,
serializer: &mut CompositeFastFieldSerializer,
doc_id_map: Option<&DocIdMapping>,
) -> io::Result<()> {
if let Some(doc_id_map) = doc_id_map {
let iter_gen = || {
doc_id_map
.iter_old_doc_ids()
.map(|idx| self.vals[idx as usize])
};
serializer.create_u128_fast_field_with_idx(
self.field,
iter_gen,
self.val_count as u64,
0,
)?;
} else {
let iter_gen = || self.vals.iter().cloned();
serializer.create_u128_fast_field_with_idx(
self.field,
iter_gen,
self.val_count as u64,
0,
)?;
}
Ok(())
}
}
@@ -238,7 +378,7 @@ impl FastFieldsWriter {
/// The fast field writer just keeps the values in memory.
///
/// Only when the segment writer can be closed and
/// persisted on disc, the fast field writer is
/// persisted on disk, the fast field writer is
/// sent to a `FastFieldSerializer` via the `.serialize(...)`
/// method.
///

View File

@@ -803,7 +803,9 @@ impl Drop for IndexWriter {
#[cfg(test)]
mod tests {
use std::collections::{HashMap, HashSet};
use std::net::Ipv6Addr;
use fastfield_codecs::MonotonicallyMappableToU128;
use proptest::prelude::*;
use proptest::prop_oneof;
use proptest::strategy::Strategy;
@@ -815,7 +817,7 @@ mod tests {
use crate::indexer::NoMergePolicy;
use crate::query::{BooleanQuery, Occur, Query, QueryParser, TermQuery};
use crate::schema::{
self, Cardinality, Facet, FacetOptions, IndexRecordOption, NumericOptions,
self, Cardinality, Facet, FacetOptions, IndexRecordOption, IpAddrOptions, NumericOptions,
TextFieldIndexing, TextOptions, FAST, INDEXED, STORED, STRING, TEXT,
};
use crate::store::DOCSTORE_CACHE_CAPACITY;
@@ -1593,6 +1595,11 @@ mod tests {
force_end_merge: bool,
) -> crate::Result<()> {
let mut schema_builder = schema::Schema::builder();
let ip_field = schema_builder.add_ip_addr_field("ip", FAST | INDEXED | STORED);
let ips_field = schema_builder.add_ip_addr_field(
"ips",
IpAddrOptions::default().set_fast(Cardinality::MultiValues),
);
let id_field = schema_builder.add_u64_field("id", FAST | INDEXED | STORED);
let bytes_field = schema_builder.add_bytes_field("bytes", FAST | INDEXED | STORED);
let bool_field = schema_builder.add_bool_field("bool", FAST | INDEXED | STORED);
@@ -1648,17 +1655,37 @@ mod tests {
match op {
IndexingOp::AddDoc { id } => {
let facet = Facet::from(&("/cola/".to_string() + &id.to_string()));
index_writer.add_document(doc!(id_field=>id,
bytes_field => id.to_le_bytes().as_slice(),
multi_numbers=> id,
multi_numbers => id,
bool_field => (id % 2u64) != 0,
multi_bools => (id % 2u64) != 0,
multi_bools => (id % 2u64) == 0,
text_field => id.to_string(),
facet_field => facet,
large_text_field=> LOREM
))?;
let ip_from_id = Ipv6Addr::from_u128(id as u128);
if id % 3 == 0 {
// every 3rd doc has no ip field
index_writer.add_document(doc!(id_field=>id,
bytes_field => id.to_le_bytes().as_slice(),
multi_numbers=> id,
multi_numbers => id,
bool_field => (id % 2u64) != 0,
multi_bools => (id % 2u64) != 0,
multi_bools => (id % 2u64) == 0,
text_field => id.to_string(),
facet_field => facet,
large_text_field=> LOREM
))?;
} else {
index_writer.add_document(doc!(id_field=>id,
bytes_field => id.to_le_bytes().as_slice(),
ip_field => ip_from_id,
ips_field => ip_from_id,
ips_field => ip_from_id,
multi_numbers=> id,
multi_numbers => id,
bool_field => (id % 2u64) != 0,
multi_bools => (id % 2u64) != 0,
multi_bools => (id % 2u64) == 0,
text_field => id.to_string(),
facet_field => facet,
large_text_field=> LOREM
))?;
}
}
IndexingOp::DeleteDoc { id } => {
index_writer.delete_term(Term::from_field_u64(id_field, id));
@@ -1744,6 +1771,60 @@ mod tests {
.collect::<HashSet<_>>()
);
// Load all ips addr
let ips: HashSet<Ipv6Addr> = searcher
.segment_readers()
.iter()
.flat_map(|segment_reader| {
let ff_reader = segment_reader.fast_fields().ip_addr(ip_field).unwrap();
segment_reader.doc_ids_alive().flat_map(move |doc| {
let val = ff_reader.get_val(doc as u64);
if val == Ipv6Addr::from_u128(0) {
// TODO Fix null handling
None
} else {
Some(val)
}
})
})
.collect();
let expected_ips = expected_ids_and_num_occurrences
.keys()
.flat_map(|id| {
if id % 3 == 0 {
None
} else {
Some(Ipv6Addr::from_u128(*id as u128))
}
})
.collect::<HashSet<_>>();
assert_eq!(ips, expected_ips);
let expected_ips = expected_ids_and_num_occurrences
.keys()
.filter_map(|id| {
if id % 3 == 0 {
None
} else {
Some(Ipv6Addr::from_u128(*id as u128))
}
})
.collect::<HashSet<_>>();
let ips: HashSet<Ipv6Addr> = searcher
.segment_readers()
.iter()
.flat_map(|segment_reader| {
let ff_reader = segment_reader.fast_fields().ip_addrs(ips_field).unwrap();
segment_reader.doc_ids_alive().flat_map(move |doc| {
let mut vals = vec![];
ff_reader.get_vals(doc, &mut vals);
vals.into_iter().filter(|val| val.to_u128() != 0) // TODO Fix null handling
})
})
.collect();
assert_eq!(ips, expected_ips);
// multivalue fast field tests
for segment_reader in searcher.segment_readers().iter() {
let id_reader = segment_reader.fast_fields().u64(id_field).unwrap();
@@ -1847,6 +1928,36 @@ mod tests {
Ok(())
}
#[test]
fn test_minimal() {
assert!(test_operation_strategy(
&[
IndexingOp::AddDoc { id: 23 },
IndexingOp::AddDoc { id: 13 },
IndexingOp::DeleteDoc { id: 13 }
],
true,
false
)
.is_ok());
assert!(test_operation_strategy(
&[
IndexingOp::AddDoc { id: 23 },
IndexingOp::AddDoc { id: 13 },
IndexingOp::DeleteDoc { id: 13 }
],
false,
false
)
.is_ok());
}
#[test]
fn test_minimal_sort_merge() {
assert!(test_operation_strategy(&[IndexingOp::AddDoc { id: 3 },], true, true).is_ok());
}
proptest! {
#![proptest_config(ProptestConfig::with_cases(20))]
#[test]

View File

@@ -6,13 +6,14 @@ use fastfield_codecs::VecColumn;
use itertools::Itertools;
use measure_time::debug_time;
use super::flat_map_with_buffer::FlatMapWithBufferIter;
use super::sorted_doc_id_multivalue_column::RemappedDocIdMultiValueIndexColumn;
use crate::core::{Segment, SegmentReader};
use crate::docset::{DocSet, TERMINATED};
use crate::error::DataCorruption;
use crate::fastfield::{
get_fastfield_codecs_for_multivalue, AliveBitSet, Column, CompositeFastFieldSerializer,
MultiValueLength, MultiValuedFastFieldReader,
MultiValueLength, MultiValuedFastFieldReader, MultiValuedU128FastFieldReader,
};
use crate::fieldnorm::{FieldNormReader, FieldNormReaders, FieldNormsSerializer, FieldNormsWriter};
use crate::indexer::doc_id_mapping::{expect_field_id_for_sort_field, SegmentDocIdMapping};
@@ -295,6 +296,24 @@ impl IndexMerger {
self.write_bytes_fast_field(field, fast_field_serializer, doc_id_mapping)?;
}
}
FieldType::IpAddr(options) => match options.get_fastfield_cardinality() {
Some(Cardinality::SingleValue) => {
self.write_u128_single_fast_field(
field,
fast_field_serializer,
doc_id_mapping,
)?;
}
Some(Cardinality::MultiValues) => {
self.write_u128_multi_fast_field(
field,
fast_field_serializer,
doc_id_mapping,
)?;
}
None => {}
},
FieldType::JsonObject(_) | FieldType::Facet(_) | FieldType::Str(_) => {
// We don't handle json fast field for the moment
// They can be implemented using what is done
@@ -305,6 +324,91 @@ impl IndexMerger {
Ok(())
}
// used to merge `u128` single fast fields.
fn write_u128_multi_fast_field(
&self,
field: Field,
fast_field_serializer: &mut CompositeFastFieldSerializer,
doc_id_mapping: &SegmentDocIdMapping,
) -> crate::Result<()> {
let segment_and_ff_readers: Vec<(&SegmentReader, MultiValuedU128FastFieldReader<u128>)> =
self.readers
.iter()
.map(|segment_reader| {
let ff_reader: MultiValuedU128FastFieldReader<u128> =
segment_reader.fast_fields().u128s(field).expect(
"Failed to find index for multivalued field. This is a bug in \
tantivy, please report.",
);
(segment_reader, ff_reader)
})
.collect::<Vec<_>>();
Self::write_1_n_fast_field_idx_generic(
field,
fast_field_serializer,
doc_id_mapping,
&segment_and_ff_readers,
)?;
let fast_field_readers = segment_and_ff_readers
.into_iter()
.map(|(_, ff_reader)| ff_reader)
.collect::<Vec<_>>();
let iter_gen = || {
doc_id_mapping
.iter_old_doc_addrs()
.flat_map_with_buffer(|doc_addr, buffer| {
let fast_field_reader = &fast_field_readers[doc_addr.segment_ord as usize];
fast_field_reader.get_vals(doc_addr.doc_id, buffer);
})
};
fast_field_serializer.create_u128_fast_field_with_idx(
field,
iter_gen,
doc_id_mapping.len() as u64,
1,
)?;
Ok(())
}
// used to merge `u128` single fast fields.
fn write_u128_single_fast_field(
&self,
field: Field,
fast_field_serializer: &mut CompositeFastFieldSerializer,
doc_id_mapping: &SegmentDocIdMapping,
) -> crate::Result<()> {
let fast_field_readers = self
.readers
.iter()
.map(|reader| {
let u128_reader: Arc<dyn Column<u128>> = reader.fast_fields().u128(field).expect(
"Failed to find a reader for single fast field. This is a tantivy bug and it \
should never happen.",
);
u128_reader
})
.collect::<Vec<_>>();
let iter_gen = || {
doc_id_mapping.iter_old_doc_addrs().map(|doc_addr| {
let fast_field_reader = &fast_field_readers[doc_addr.segment_ord as usize];
fast_field_reader.get_val(doc_addr.doc_id as u64)
})
};
fast_field_serializer.create_u128_fast_field_with_idx(
field,
iter_gen,
doc_id_mapping.len() as u64,
0,
)?;
Ok(())
}
// used both to merge field norms, `u64/i64` single fast fields.
fn write_single_fast_field(
&self,

View File

@@ -294,6 +294,7 @@ impl SegmentWriter {
ctx,
)?;
}
FieldType::IpAddr(_) => {}
}
}
Ok(())

View File

@@ -50,6 +50,7 @@ fn posting_writer_from_field_entry(field_entry: &FieldEntry) -> Box<dyn Postings
| FieldType::Bool(_)
| FieldType::Date(_)
| FieldType::Bytes(_)
| FieldType::IpAddr(_)
| FieldType::Facet(_) => Box::new(SpecializedPostingsWriter::<NothingRecorder>::default()),
FieldType::JsonObject(ref json_object_options) => {
if let Some(text_indexing_option) = json_object_options.get_text_indexing_options() {

View File

@@ -89,6 +89,7 @@ pub(crate) fn serialize_postings(
| FieldType::Bool(_) => {}
FieldType::Bytes(_) => {}
FieldType::JsonObject(_) => {}
FieldType::IpAddr(_) => {}
}
let postings_writer = per_field_postings_writers.get_for_field(field);

View File

@@ -400,6 +400,9 @@ impl QueryParser {
let bytes = base64::decode(phrase).map_err(QueryParserError::ExpectedBase64)?;
Ok(Term::from_field_bytes(field, &bytes))
}
FieldType::IpAddr(_) => Err(QueryParserError::UnsupportedQuery(
"Range query are not supported on IpAddr field.".to_string(),
)),
}
}
@@ -506,6 +509,7 @@ impl QueryParser {
let bytes_term = Term::from_field_bytes(field, &bytes);
Ok(vec![LogicalLiteral::Term(bytes_term)])
}
FieldType::IpAddr(_) => Err(QueryParserError::FieldNotIndexed(field_name.to_string())),
}
}

View File

@@ -1,6 +1,7 @@
use std::collections::{HashMap, HashSet};
use std::io::{self, Read, Write};
use std::mem;
use std::net::Ipv6Addr;
use common::{BinarySerializable, VInt};
@@ -97,6 +98,11 @@ impl Document {
self.add_field_value(field, value);
}
/// Add a IP address field. Internally only Ipv6Addr is used.
pub fn add_ip_addr(&mut self, field: Field, value: Ipv6Addr) {
self.add_field_value(field, value);
}
/// Add a i64 field
pub fn add_i64(&mut self, field: Field, value: i64) {
self.add_field_value(field, value);

View File

@@ -1,5 +1,6 @@
use serde::{Deserialize, Serialize};
use super::ip_options::IpAddrOptions;
use crate::schema::bytes_options::BytesOptions;
use crate::schema::{
is_valid_field_name, DateOptions, FacetOptions, FieldType, JsonObjectOptions, NumericOptions,
@@ -60,6 +61,11 @@ impl FieldEntry {
Self::new(field_name, FieldType::Date(date_options))
}
/// Creates a new ip address field entry.
pub fn new_ip_addr(field_name: String, ip_options: IpAddrOptions) -> FieldEntry {
Self::new(field_name, FieldType::IpAddr(ip_options))
}
/// Creates a field entry for a facet.
pub fn new_facet(field_name: String, facet_options: FacetOptions) -> FieldEntry {
Self::new(field_name, FieldType::Facet(facet_options))
@@ -114,6 +120,7 @@ impl FieldEntry {
FieldType::Facet(ref options) => options.is_stored(),
FieldType::Bytes(ref options) => options.is_stored(),
FieldType::JsonObject(ref options) => options.is_stored(),
FieldType::IpAddr(ref options) => options.is_stored(),
}
}
}

View File

@@ -1,7 +1,11 @@
use std::net::{IpAddr, Ipv6Addr};
use std::str::FromStr;
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use thiserror::Error;
use super::ip_options::IpAddrOptions;
use super::Cardinality;
use crate::schema::bytes_options::BytesOptions;
use crate::schema::facet_options::FacetOptions;
@@ -62,9 +66,11 @@ pub enum Type {
Bytes = b'b',
/// Leaf in a Json object.
Json = b'j',
/// IpAddr
IpAddr = b'p',
}
const ALL_TYPES: [Type; 9] = [
const ALL_TYPES: [Type; 10] = [
Type::Str,
Type::U64,
Type::I64,
@@ -74,6 +80,7 @@ const ALL_TYPES: [Type; 9] = [
Type::Facet,
Type::Bytes,
Type::Json,
Type::IpAddr,
];
impl Type {
@@ -100,6 +107,7 @@ impl Type {
Type::Facet => "Facet",
Type::Bytes => "Bytes",
Type::Json => "Json",
Type::IpAddr => "IpAddr",
}
}
@@ -116,6 +124,7 @@ impl Type {
b'h' => Some(Type::Facet),
b'b' => Some(Type::Bytes),
b'j' => Some(Type::Json),
b'p' => Some(Type::IpAddr),
_ => None,
}
}
@@ -146,6 +155,8 @@ pub enum FieldType {
Bytes(BytesOptions),
/// Json object
JsonObject(JsonObjectOptions),
/// IpAddr field
IpAddr(IpAddrOptions),
}
impl FieldType {
@@ -161,6 +172,7 @@ impl FieldType {
FieldType::Facet(_) => Type::Facet,
FieldType::Bytes(_) => Type::Bytes,
FieldType::JsonObject(_) => Type::Json,
FieldType::IpAddr(_) => Type::IpAddr,
}
}
@@ -176,6 +188,7 @@ impl FieldType {
FieldType::Facet(ref _facet_options) => true,
FieldType::Bytes(ref bytes_options) => bytes_options.is_indexed(),
FieldType::JsonObject(ref json_object_options) => json_object_options.is_indexed(),
FieldType::IpAddr(_) => false,
}
}
@@ -210,6 +223,7 @@ impl FieldType {
| FieldType::F64(ref int_options)
| FieldType::Bool(ref int_options) => int_options.is_fast(),
FieldType::Date(ref date_options) => date_options.is_fast(),
FieldType::IpAddr(ref ip_addr_options) => ip_addr_options.is_fast(),
FieldType::Facet(_) => true,
FieldType::JsonObject(_) => false,
}
@@ -250,6 +264,7 @@ impl FieldType {
FieldType::Facet(_) => false,
FieldType::Bytes(ref bytes_options) => bytes_options.fieldnorms(),
FieldType::JsonObject(ref _json_object_options) => false,
FieldType::IpAddr(_) => false,
}
}
@@ -294,6 +309,7 @@ impl FieldType {
FieldType::JsonObject(ref json_obj_options) => json_obj_options
.get_text_indexing_options()
.map(TextFieldIndexing::index_option),
FieldType::IpAddr(_) => None,
}
}
@@ -333,6 +349,19 @@ impl FieldType {
expected: "a json object",
json: JsonValue::String(field_text),
}),
FieldType::IpAddr(_) => {
let ip_addr: IpAddr = IpAddr::from_str(&field_text).map_err(|err| {
ValueParsingError::ParseError {
error: err.to_string(),
json: JsonValue::String(field_text),
}
})?;
let ip_addr_v6: Ipv6Addr = match ip_addr {
IpAddr::V4(v4) => v4.to_ipv6_mapped(),
IpAddr::V6(v6) => v6,
};
Ok(Value::IpAddr(ip_addr_v6))
}
}
}
JsonValue::Number(field_val_num) => match self {
@@ -380,6 +409,10 @@ impl FieldType {
expected: "a json object",
json: JsonValue::Number(field_val_num),
}),
FieldType::IpAddr(_) => Err(ValueParsingError::TypeError {
expected: "a string with an ip addr",
json: JsonValue::Number(field_val_num),
}),
},
JsonValue::Object(json_map) => match self {
FieldType::Str(_) => {

View File

@@ -7,13 +7,13 @@ use super::Cardinality;
/// Define how an ip field should be handled by tantivy.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct IpOptions {
pub struct IpAddrOptions {
#[serde(skip_serializing_if = "Option::is_none")]
fast: Option<Cardinality>,
stored: bool,
}
impl IpOptions {
impl IpAddrOptions {
/// Returns true iff the value is a fast field.
pub fn is_fast(&self) -> bool {
self.fast.is_some()
@@ -52,52 +52,52 @@ impl IpOptions {
}
}
impl From<()> for IpOptions {
fn from(_: ()) -> IpOptions {
IpOptions::default()
impl From<()> for IpAddrOptions {
fn from(_: ()) -> IpAddrOptions {
IpAddrOptions::default()
}
}
impl From<FastFlag> for IpOptions {
impl From<FastFlag> for IpAddrOptions {
fn from(_: FastFlag) -> Self {
IpOptions {
IpAddrOptions {
stored: false,
fast: Some(Cardinality::SingleValue),
}
}
}
impl From<StoredFlag> for IpOptions {
impl From<StoredFlag> for IpAddrOptions {
fn from(_: StoredFlag) -> Self {
IpOptions {
IpAddrOptions {
stored: true,
fast: None,
}
}
}
impl From<IndexedFlag> for IpOptions {
impl From<IndexedFlag> for IpAddrOptions {
fn from(_: IndexedFlag) -> Self {
IpOptions {
IpAddrOptions {
stored: false,
fast: None,
}
}
}
impl<T: Into<IpOptions>> BitOr<T> for IpOptions {
type Output = IpOptions;
impl<T: Into<IpAddrOptions>> BitOr<T> for IpAddrOptions {
type Output = IpAddrOptions;
fn bitor(self, other: T) -> IpOptions {
fn bitor(self, other: T) -> IpAddrOptions {
let other = other.into();
IpOptions {
IpAddrOptions {
stored: self.stored | other.stored,
fast: self.fast.or(other.fast),
}
}
}
impl<Head, Tail> From<SchemaFlagList<Head, Tail>> for IpOptions
impl<Head, Tail> From<SchemaFlagList<Head, Tail>> for IpAddrOptions
where
Head: Clone,
Tail: Clone,

View File

@@ -138,7 +138,7 @@ pub use self::field_type::{FieldType, Type};
pub use self::field_value::FieldValue;
pub use self::flags::{FAST, INDEXED, STORED};
pub use self::index_record_option::IndexRecordOption;
pub use self::ip_options::IpOptions;
pub use self::ip_options::IpAddrOptions;
pub use self::json_object_options::JsonObjectOptions;
pub use self::named_field_document::NamedFieldDocument;
pub use self::numeric_options::NumericOptions;

View File

@@ -7,6 +7,7 @@ use serde::ser::SerializeSeq;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_json::{self, Value as JsonValue};
use super::ip_options::IpAddrOptions;
use super::*;
use crate::schema::bytes_options::BytesOptions;
use crate::schema::field_type::ValueParsingError;
@@ -144,6 +145,26 @@ impl SchemaBuilder {
self.add_field(field_entry)
}
/// Adds a ip field.
/// Returns the associated field handle.
///
/// # Caution
///
/// Appending two fields with the same name
/// will result in the shadowing of the first
/// by the second one.
/// The first field will get a field id
/// but only the second one will be indexed
pub fn add_ip_addr_field<T: Into<IpAddrOptions>>(
&mut self,
field_name_str: &str,
field_options: T,
) -> Field {
let field_name = String::from(field_name_str);
let field_entry = FieldEntry::new_ip_addr(field_name, field_options.into());
self.add_field(field_entry)
}
/// Adds a new text field.
/// Returns the associated field handle
///
@@ -598,12 +619,14 @@ mod tests {
schema_builder.add_text_field("title", TEXT);
schema_builder.add_text_field("author", STRING);
schema_builder.add_u64_field("count", count_options);
schema_builder.add_ip_addr_field("ip", FAST | STORED);
schema_builder.add_bool_field("is_read", is_read_options);
let schema = schema_builder.build();
let doc_json = r#"{
"title": "my title",
"author": "fulmicoton",
"count": 4,
"ip": "127.0.0.1",
"is_read": true
}"#;
let doc = schema.parse_document(doc_json).unwrap();
@@ -612,6 +635,39 @@ mod tests {
assert_eq!(doc, doc_serdeser);
}
#[test]
pub fn test_document_to_ipv4_json() {
let mut schema_builder = Schema::builder();
schema_builder.add_ip_addr_field("ip", FAST | STORED);
let schema = schema_builder.build();
// IpV4 loopback
let doc_json = r#"{
"ip": "127.0.0.1"
}"#;
let doc = schema.parse_document(doc_json).unwrap();
let value: serde_json::Value = serde_json::from_str(&schema.to_json(&doc)).unwrap();
assert_eq!(value["ip"][0], "127.0.0.1");
// Special case IpV6 loopback. We don't want to map that to IPv4
let doc_json = r#"{
"ip": "::1"
}"#;
let doc = schema.parse_document(doc_json).unwrap();
let value: serde_json::Value = serde_json::from_str(&schema.to_json(&doc)).unwrap();
assert_eq!(value["ip"][0], "::1");
// testing ip address of every router in the world
let doc_json = r#"{
"ip": "192.168.0.1"
}"#;
let doc = schema.parse_document(doc_json).unwrap();
let value: serde_json::Value = serde_json::from_str(&schema.to_json(&doc)).unwrap();
assert_eq!(value["ip"][0], "192.168.0.1");
}
#[test]
pub fn test_document_from_nameddoc() {
let mut schema_builder = Schema::builder();

View File

@@ -415,6 +415,9 @@ fn debug_value_bytes(typ: Type, bytes: &[u8], f: &mut fmt::Formatter) -> fmt::Re
debug_value_bytes(typ, bytes, f)?;
}
}
Type::IpAddr => {
write!(f, "")?; // TODO change once we actually have IP address terms.
}
}
Ok(())
}

View File

@@ -1,4 +1,5 @@
use std::fmt;
use std::net::Ipv6Addr;
use serde::de::Visitor;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
@@ -32,6 +33,8 @@ pub enum Value {
Bytes(Vec<u8>),
/// Json object value.
JsonObject(serde_json::Map<String, serde_json::Value>),
/// IpV6 Address. Internally there is no IpV4, it needs to be converted to `Ipv6Addr`.
IpAddr(Ipv6Addr),
}
impl Eq for Value {}
@@ -50,6 +53,14 @@ impl Serialize for Value {
Value::Facet(ref facet) => facet.serialize(serializer),
Value::Bytes(ref bytes) => serializer.serialize_str(&base64::encode(bytes)),
Value::JsonObject(ref obj) => obj.serialize(serializer),
Value::IpAddr(ref obj) => {
// Ensure IpV4 addresses get serialized as IpV4, but excluding IpV6 loopback.
if let Some(ip_v4) = obj.to_ipv4_mapped() {
ip_v4.serialize(serializer)
} else {
obj.serialize(serializer)
}
}
}
}
}
@@ -201,6 +212,16 @@ impl Value {
None
}
}
/// Returns the ip addr, provided the value is of the `Ip` type.
/// (Returns None if the value is not of the `Ip` type)
pub fn as_ip_addr(&self) -> Option<Ipv6Addr> {
if let Value::IpAddr(val) = self {
Some(*val)
} else {
None
}
}
}
impl From<String> for Value {
@@ -209,6 +230,12 @@ impl From<String> for Value {
}
}
impl From<Ipv6Addr> for Value {
fn from(v: Ipv6Addr) -> Value {
Value::IpAddr(v)
}
}
impl From<u64> for Value {
fn from(v: u64) -> Value {
Value::U64(v)
@@ -288,8 +315,10 @@ impl From<serde_json::Value> for Value {
mod binary_serialize {
use std::io::{self, Read, Write};
use std::net::Ipv6Addr;
use common::{f64_to_u64, u64_to_f64, BinarySerializable};
use fastfield_codecs::MonotonicallyMappableToU128;
use super::Value;
use crate::schema::Facet;
@@ -306,6 +335,7 @@ mod binary_serialize {
const EXT_CODE: u8 = 7;
const JSON_OBJ_CODE: u8 = 8;
const BOOL_CODE: u8 = 9;
const IP_CODE: u8 = 10;
// extended types
@@ -366,6 +396,10 @@ mod binary_serialize {
serde_json::to_writer(writer, &map)?;
Ok(())
}
Value::IpAddr(ref ip) => {
IP_CODE.serialize(writer)?;
ip.to_u128().serialize(writer)
}
}
}
@@ -436,6 +470,11 @@ mod binary_serialize {
let json_map = <serde_json::Map::<String, serde_json::Value> as serde::Deserialize>::deserialize(&mut de)?;
Ok(Value::JsonObject(json_map))
}
IP_CODE => {
let value = u128::deserialize(reader)?;
Ok(Value::IpAddr(Ipv6Addr::from_u128(value)))
}
_ => Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("No field type is associated with code {:?}", type_code),