handle ip adresses in term aggregation (#2319)

* handle ip adresses in term aggregation

Stores IpAdresses during the segment term aggregation via u64 representation
and convert to u128(IpV6Adress) via downcast when converting to intermediate results.

Enable Downcasting on `ColumnValues`
Expose u64 variant for u128 encoded data via `open_u64_lenient` method.
Remove lifetime in VecColumn, to avoid 'static lifetime requirement coming
from downcast trait.

* rename method
This commit is contained in:
PSeitz
2024-03-14 09:41:18 +01:00
committed by GitHub
parent ec37295b2f
commit b0e65560a1
19 changed files with 256 additions and 63 deletions

View File

@@ -17,6 +17,7 @@ sstable = { version= "0.2", path = "../sstable", package = "tantivy-sstable" }
common = { version= "0.6", path = "../common", package = "tantivy-common" }
tantivy-bitpacker = { version= "0.5", path = "../bitpacker/" }
serde = "1.0.152"
downcast-rs = "1.2.0"
[dev-dependencies]
proptest = "1"

View File

@@ -9,8 +9,8 @@ use std::sync::Arc;
use common::BinarySerializable;
pub use dictionary_encoded::{BytesColumn, StrColumn};
pub use serialize::{
open_column_bytes, open_column_str, open_column_u128, open_column_u64,
serialize_column_mappable_to_u128, serialize_column_mappable_to_u64,
open_column_bytes, open_column_str, open_column_u128, open_column_u128_as_compact_u64,
open_column_u64, serialize_column_mappable_to_u128, serialize_column_mappable_to_u64,
};
use crate::column_index::ColumnIndex;
@@ -169,6 +169,7 @@ struct FirstValueWithDefault<T: Copy> {
impl<T: PartialOrd + Debug + Send + Sync + Copy + 'static> ColumnValues<T>
for FirstValueWithDefault<T>
{
#[inline(always)]
fn get_val(&self, idx: u32) -> T {
self.column.first(idx).unwrap_or(self.default_value)
}

View File

@@ -76,6 +76,26 @@ pub fn open_column_u128<T: MonotonicallyMappableToU128>(
})
}
/// Open the column as u64.
///
/// See [`open_u128_as_compact_u64`] for more details.
pub fn open_column_u128_as_compact_u64(bytes: OwnedBytes) -> io::Result<Column<u64>> {
let (body, column_index_num_bytes_payload) = bytes.rsplit(4);
let column_index_num_bytes = u32::from_le_bytes(
column_index_num_bytes_payload
.as_slice()
.try_into()
.unwrap(),
);
let (column_index_data, column_values_data) = body.split(column_index_num_bytes as usize);
let column_index = crate::column_index::open_column_index(column_index_data)?;
let column_values = crate::column_values::open_u128_as_compact_u64(column_values_data)?;
Ok(Column {
index: column_index,
values: column_values,
})
}
pub fn open_column_bytes(data: OwnedBytes) -> io::Result<BytesColumn> {
let (body, dictionary_len_bytes) = data.rsplit(4);
let dictionary_len = u32::from_le_bytes(dictionary_len_bytes.as_slice().try_into().unwrap());

View File

@@ -10,7 +10,7 @@ pub(crate) struct MergedColumnValues<'a, T> {
pub(crate) merge_row_order: &'a MergeRowOrder,
}
impl<'a, T: Copy + PartialOrd + Debug> Iterable<T> for MergedColumnValues<'a, T> {
impl<'a, T: Copy + PartialOrd + Debug + 'static> Iterable<T> for MergedColumnValues<'a, T> {
fn boxed_iter(&self) -> Box<dyn Iterator<Item = T> + '_> {
match self.merge_row_order {
MergeRowOrder::Stack(_) => Box::new(

View File

@@ -10,6 +10,7 @@ use std::fmt::Debug;
use std::ops::{Range, RangeInclusive};
use std::sync::Arc;
use downcast_rs::DowncastSync;
pub use monotonic_mapping::{MonotonicallyMappableToU64, StrictlyMonotonicFn};
pub use monotonic_mapping_u128::MonotonicallyMappableToU128;
@@ -25,7 +26,10 @@ mod monotonic_column;
pub(crate) use merge::MergedColumnValues;
pub use stats::ColumnStats;
pub use u128_based::{open_u128_mapped, serialize_column_values_u128};
pub use u128_based::{
open_u128_as_compact_u64, open_u128_mapped, serialize_column_values_u128,
CompactSpaceU64Accessor,
};
pub use u64_based::{
load_u64_based_column_values, serialize_and_load_u64_based_column_values,
serialize_u64_based_column_values, CodecType, ALL_U64_CODEC_TYPES,
@@ -41,7 +45,7 @@ use crate::RowId;
///
/// Any methods with a default and specialized implementation need to be called in the
/// wrappers that implement the trait: Arc and MonotonicMappingColumn
pub trait ColumnValues<T: PartialOrd = u64>: Send + Sync {
pub trait ColumnValues<T: PartialOrd = u64>: Send + Sync + DowncastSync {
/// Return the value associated with the given idx.
///
/// This accessor should return as fast as possible.
@@ -139,6 +143,7 @@ pub trait ColumnValues<T: PartialOrd = u64>: Send + Sync {
Box::new((0..self.num_vals()).map(|idx| self.get_val(idx)))
}
}
downcast_rs::impl_downcast!(sync ColumnValues<T> where T: PartialOrd);
/// Empty column of values.
pub struct EmptyColumnValues;
@@ -161,7 +166,7 @@ impl<T: PartialOrd + Default> ColumnValues<T> for EmptyColumnValues {
}
}
impl<T: Copy + PartialOrd + Debug> ColumnValues<T> for Arc<dyn ColumnValues<T>> {
impl<T: Copy + PartialOrd + Debug + 'static> ColumnValues<T> for Arc<dyn ColumnValues<T>> {
#[inline(always)]
fn get_val(&self, idx: u32) -> T {
self.as_ref().get_val(idx)

View File

@@ -31,10 +31,10 @@ pub fn monotonic_map_column<C, T, Input, Output>(
monotonic_mapping: T,
) -> impl ColumnValues<Output>
where
C: ColumnValues<Input>,
T: StrictlyMonotonicFn<Input, Output> + Send + Sync,
Input: PartialOrd + Debug + Send + Sync + Clone,
Output: PartialOrd + Debug + Send + Sync + Clone,
C: ColumnValues<Input> + 'static,
T: StrictlyMonotonicFn<Input, Output> + Send + Sync + 'static,
Input: PartialOrd + Debug + Send + Sync + Clone + 'static,
Output: PartialOrd + Debug + Send + Sync + Clone + 'static,
{
MonotonicMappingColumn {
from_column,
@@ -45,10 +45,10 @@ where
impl<C, T, Input, Output> ColumnValues<Output> for MonotonicMappingColumn<C, T, Input>
where
C: ColumnValues<Input>,
T: StrictlyMonotonicFn<Input, Output> + Send + Sync,
Input: PartialOrd + Send + Debug + Sync + Clone,
Output: PartialOrd + Send + Debug + Sync + Clone,
C: ColumnValues<Input> + 'static,
T: StrictlyMonotonicFn<Input, Output> + Send + Sync + 'static,
Input: PartialOrd + Send + Debug + Sync + Clone + 'static,
Output: PartialOrd + Send + Debug + Sync + Clone + 'static,
{
#[inline(always)]
fn get_val(&self, idx: u32) -> Output {
@@ -107,7 +107,7 @@ mod tests {
#[test]
fn test_monotonic_mapping_iter() {
let vals: Vec<u64> = (0..100u64).map(|el| el * 10).collect();
let col = VecColumn::from(&vals);
let col = VecColumn::from(vals);
let mapped = monotonic_map_column(
col,
StrictlyMonotonicMappingInverter::from(StrictlyMonotonicMappingToInternal::<i64>::new()),

View File

@@ -292,6 +292,63 @@ impl BinarySerializable for IPCodecParams {
}
}
/// Exposes the compact space compressed values as u64.
///
/// This allows faster access to the values, as u64 is faster to work with than u128.
/// It also allows to handle u128 values like u64, via the `open_u64_lenient` as a uniform
/// access interface.
///
/// When converting from the internal u64 to u128 `compact_to_u128` can be used.
pub struct CompactSpaceU64Accessor(CompactSpaceDecompressor);
impl CompactSpaceU64Accessor {
pub(crate) fn open(data: OwnedBytes) -> io::Result<CompactSpaceU64Accessor> {
let decompressor = CompactSpaceU64Accessor(CompactSpaceDecompressor::open(data)?);
Ok(decompressor)
}
/// Convert a compact space value to u128
pub fn compact_to_u128(&self, compact: u32) -> u128 {
self.0.compact_to_u128(compact)
}
}
impl ColumnValues<u64> for CompactSpaceU64Accessor {
#[inline]
fn get_val(&self, doc: u32) -> u64 {
let compact = self.0.get_compact(doc);
compact as u64
}
fn min_value(&self) -> u64 {
self.0.u128_to_compact(self.0.min_value()).unwrap() as u64
}
fn max_value(&self) -> u64 {
self.0.u128_to_compact(self.0.max_value()).unwrap() as u64
}
fn num_vals(&self) -> u32 {
self.0.params.num_vals
}
#[inline]
fn iter(&self) -> Box<dyn Iterator<Item = u64> + '_> {
Box::new(self.0.iter_compact().map(|el| el as u64))
}
#[inline]
fn get_row_ids_for_value_range(
&self,
value_range: RangeInclusive<u64>,
position_range: Range<u32>,
positions: &mut Vec<u32>,
) {
let value_range = self.0.compact_to_u128(*value_range.start() as u32)
..=self.0.compact_to_u128(*value_range.end() as u32);
self.0
.get_row_ids_for_value_range(value_range, position_range, positions)
}
}
impl ColumnValues<u128> for CompactSpaceDecompressor {
#[inline]
fn get_val(&self, doc: u32) -> u128 {
@@ -402,9 +459,14 @@ impl CompactSpaceDecompressor {
.map(|compact| self.compact_to_u128(compact))
}
#[inline]
pub fn get_compact(&self, idx: u32) -> u32 {
self.params.bit_unpacker.get(idx, &self.data) as u32
}
#[inline]
pub fn get(&self, idx: u32) -> u128 {
let compact = self.params.bit_unpacker.get(idx, &self.data) as u32;
let compact = self.get_compact(idx);
self.compact_to_u128(compact)
}

View File

@@ -6,7 +6,9 @@ use std::sync::Arc;
mod compact_space;
use common::{BinarySerializable, OwnedBytes, VInt};
use compact_space::{CompactSpaceCompressor, CompactSpaceDecompressor};
pub use compact_space::{
CompactSpaceCompressor, CompactSpaceDecompressor, CompactSpaceU64Accessor,
};
use crate::column_values::monotonic_map_column;
use crate::column_values::monotonic_mapping::{
@@ -108,6 +110,23 @@ pub fn open_u128_mapped<T: MonotonicallyMappableToU128 + Debug>(
StrictlyMonotonicMappingToInternal::<T>::new().into();
Ok(Arc::new(monotonic_map_column(reader, inverted)))
}
/// Returns the u64 representation of the u128 data.
/// The internal representation of the data as u64 is useful for faster processing.
///
/// In order to convert to u128 back cast to `CompactSpaceU64Accessor` and call
/// `compact_to_u128`.
///
/// # Notice
/// In case there are new codecs added, check for usages of `CompactSpaceDecompressorU64` and
/// also handle the new codecs.
pub fn open_u128_as_compact_u64(mut bytes: OwnedBytes) -> io::Result<Arc<dyn ColumnValues<u64>>> {
let header = U128Header::deserialize(&mut bytes)?;
assert_eq!(header.codec_type, U128FastFieldCodecType::CompactSpace);
let reader = CompactSpaceU64Accessor::open(bytes)?;
Ok(Arc::new(reader))
}
#[cfg(test)]
pub mod tests {
use super::*;

View File

@@ -63,7 +63,6 @@ impl ColumnValues for BitpackedReader {
fn get_val(&self, doc: u32) -> u64 {
self.stats.min_value + self.stats.gcd.get() * self.bit_unpacker.get(doc, &self.data)
}
#[inline]
fn min_value(&self) -> u64 {
self.stats.min_value

View File

@@ -63,7 +63,10 @@ impl BlockwiseLinearEstimator {
if self.block.is_empty() {
return;
}
let line = Line::train(&VecColumn::from(&self.block));
let column = VecColumn::from(std::mem::take(&mut self.block));
let line = Line::train(&column);
self.block = column.into();
let mut max_value = 0u64;
for (i, buffer_val) in self.block.iter().enumerate() {
let interpolated_val = line.eval(i as u32);
@@ -125,7 +128,7 @@ impl ColumnCodecEstimator for BlockwiseLinearEstimator {
*buffer_val = gcd_divider.divide(*buffer_val - stats.min_value);
}
let line = Line::train(&VecColumn::from(&buffer));
let line = Line::train(&VecColumn::from(buffer.to_vec()));
assert!(!buffer.is_empty());

View File

@@ -184,7 +184,7 @@ mod tests {
}
fn test_eval_max_err(ys: &[u64]) -> Option<u64> {
let line = Line::train(&VecColumn::from(&ys));
let line = Line::train(&VecColumn::from(ys.to_vec()));
ys.iter()
.enumerate()
.map(|(x, y)| y.wrapping_sub(line.eval(x as u32)))

View File

@@ -173,7 +173,9 @@ impl LinearCodecEstimator {
fn collect_before_line_estimation(&mut self, value: u64) {
self.block.push(value);
if self.block.len() == LINE_ESTIMATION_BLOCK_LEN {
let line = Line::train(&VecColumn::from(&self.block));
let column = VecColumn::from(std::mem::take(&mut self.block));
let line = Line::train(&column);
self.block = column.into();
let block = std::mem::take(&mut self.block);
for val in block {
self.collect_after_line_estimation(&line, val);

View File

@@ -4,14 +4,14 @@ use tantivy_bitpacker::minmax;
use crate::ColumnValues;
/// VecColumn provides `Column` over a slice.
pub struct VecColumn<'a, T = u64> {
pub(crate) values: &'a [T],
/// VecColumn provides `Column` over a `Vec<T>`.
pub struct VecColumn<T = u64> {
pub(crate) values: Vec<T>,
pub(crate) min_value: T,
pub(crate) max_value: T,
}
impl<'a, T: Copy + PartialOrd + Send + Sync + Debug> ColumnValues<T> for VecColumn<'a, T> {
impl<T: Copy + PartialOrd + Send + Sync + Debug + 'static> ColumnValues<T> for VecColumn<T> {
fn get_val(&self, position: u32) -> T {
self.values[position as usize]
}
@@ -37,11 +37,8 @@ impl<'a, T: Copy + PartialOrd + Send + Sync + Debug> ColumnValues<T> for VecColu
}
}
impl<'a, T: Copy + PartialOrd + Default, V> From<&'a V> for VecColumn<'a, T>
where V: AsRef<[T]> + ?Sized
{
fn from(values: &'a V) -> Self {
let values = values.as_ref();
impl<T: Copy + PartialOrd + Default> From<Vec<T>> for VecColumn<T> {
fn from(values: Vec<T>) -> Self {
let (min_value, max_value) = minmax(values.iter().copied()).unwrap_or_default();
Self {
values,
@@ -50,3 +47,8 @@ where V: AsRef<[T]> + ?Sized
}
}
}
impl From<VecColumn> for Vec<u64> {
fn from(column: VecColumn) -> Self {
column.values
}
}

View File

@@ -13,9 +13,7 @@ pub(crate) use serializer::ColumnarSerializer;
use stacker::{Addr, ArenaHashMap, MemoryArena};
use crate::column_index::SerializableColumnIndex;
use crate::column_values::{
ColumnValues, MonotonicallyMappableToU128, MonotonicallyMappableToU64, VecColumn,
};
use crate::column_values::{MonotonicallyMappableToU128, MonotonicallyMappableToU64};
use crate::columnar::column_type::ColumnType;
use crate::columnar::writer::column_writers::{
ColumnWriter, NumericalColumnWriter, StrOrBytesColumnWriter,
@@ -645,10 +643,7 @@ fn send_to_serialize_column_mappable_to_u128<
value_index_builders: &mut PreallocatedIndexBuilders,
values: &mut Vec<T>,
mut wrt: impl io::Write,
) -> io::Result<()>
where
for<'a> VecColumn<'a, T>: ColumnValues<T>,
{
) -> io::Result<()> {
values.clear();
// TODO: split index and values
let serializable_column_index = match cardinality {
@@ -701,10 +696,7 @@ fn send_to_serialize_column_mappable_to_u64(
value_index_builders: &mut PreallocatedIndexBuilders,
values: &mut Vec<u64>,
mut wrt: impl io::Write,
) -> io::Result<()>
where
for<'a> VecColumn<'a, u64>: ColumnValues<u64>,
{
) -> io::Result<()> {
values.clear();
let serializable_column_index = match cardinality {
Cardinality::Full => {

View File

@@ -8,7 +8,7 @@ use common::{ByteCount, DateTime, HasLen, OwnedBytes};
use crate::column::{BytesColumn, Column, StrColumn};
use crate::column_values::{monotonic_map_column, StrictlyMonotonicFn};
use crate::columnar::ColumnType;
use crate::{Cardinality, ColumnIndex, NumericalType};
use crate::{Cardinality, ColumnIndex, ColumnValues, NumericalType};
#[derive(Clone)]
pub enum DynamicColumn {
@@ -247,7 +247,12 @@ impl DynamicColumnHandle {
}
/// Returns the `u64` fast field reader reader associated with `fields` of types
/// Str, u64, i64, f64, bool, or datetime.
/// Str, u64, i64, f64, bool, ip, or datetime.
///
/// Notice that for IpAddr, the fastfield reader will return the u64 representation of the
/// IpAddr.
/// In order to convert to u128 back cast to `CompactSpaceU64Accessor` and call
/// `compact_to_u128`.
///
/// If not, the fastfield reader will returns the u64-value associated with the original
/// FastValue.
@@ -258,7 +263,10 @@ impl DynamicColumnHandle {
let column: BytesColumn = crate::column::open_column_bytes(column_bytes)?;
Ok(Some(column.term_ord_column))
}
ColumnType::IpAddr => Ok(None),
ColumnType::IpAddr => {
let column = crate::column::open_column_u128_as_compact_u64(column_bytes)?;
Ok(Some(column))
}
ColumnType::Bool
| ColumnType::I64
| ColumnType::U64

View File

@@ -170,8 +170,8 @@ impl AggregationWithAccessor {
ColumnType::Str,
ColumnType::DateTime,
ColumnType::Bool,
ColumnType::IpAddr,
// ColumnType::Bytes Unsupported
// ColumnType::IpAddr Unsupported
];
// In case the column is empty we want the shim column to match the missing type

View File

@@ -816,38 +816,38 @@ fn test_aggregation_on_json_object_mixed_types() {
let mut index_writer: IndexWriter = index.writer_for_tests().unwrap();
// => Segment with all values numeric
index_writer
.add_document(doc!(json => json!({"mixed_type": 10.0})))
.add_document(doc!(json => json!({"mixed_type": 10.0, "mixed_price": 10.0})))
.unwrap();
index_writer.commit().unwrap();
// => Segment with all values text
index_writer
.add_document(doc!(json => json!({"mixed_type": "blue"})))
.add_document(doc!(json => json!({"mixed_type": "blue", "mixed_price": 5.0})))
.unwrap();
index_writer
.add_document(doc!(json => json!({"mixed_type": "blue"})))
.add_document(doc!(json => json!({"mixed_type": "blue", "mixed_price": 5.0})))
.unwrap();
index_writer
.add_document(doc!(json => json!({"mixed_type": "blue"})))
.add_document(doc!(json => json!({"mixed_type": "blue", "mixed_price": 5.0})))
.unwrap();
index_writer.commit().unwrap();
// => Segment with all boolen
index_writer
.add_document(doc!(json => json!({"mixed_type": true})))
.add_document(doc!(json => json!({"mixed_type": true, "mixed_price": "no_price"})))
.unwrap();
index_writer.commit().unwrap();
// => Segment with mixed values
index_writer
.add_document(doc!(json => json!({"mixed_type": "red"})))
.add_document(doc!(json => json!({"mixed_type": "red", "mixed_price": 1.0})))
.unwrap();
index_writer
.add_document(doc!(json => json!({"mixed_type": "red"})))
.add_document(doc!(json => json!({"mixed_type": "red", "mixed_price": 1.0})))
.unwrap();
index_writer
.add_document(doc!(json => json!({"mixed_type": -20.5})))
.add_document(doc!(json => json!({"mixed_type": -20.5, "mixed_price": -20.5})))
.unwrap();
index_writer
.add_document(doc!(json => json!({"mixed_type": true})))
.add_document(doc!(json => json!({"mixed_type": true, "mixed_price": "no_price"})))
.unwrap();
index_writer.commit().unwrap();
@@ -861,7 +861,7 @@ fn test_aggregation_on_json_object_mixed_types() {
"order": { "min_price": "desc" }
},
"aggs": {
"min_price": { "min": { "field": "json.mixed_type" } }
"min_price": { "min": { "field": "json.mixed_price" } }
}
},
"rangeagg": {
@@ -885,7 +885,6 @@ fn test_aggregation_on_json_object_mixed_types() {
let aggregation_results = searcher.search(&AllQuery, &aggregation_collector).unwrap();
let aggregation_res_json = serde_json::to_value(aggregation_results).unwrap();
// pretty print as json
use pretty_assertions::assert_eq;
assert_eq!(
&aggregation_res_json,
@@ -901,10 +900,10 @@ fn test_aggregation_on_json_object_mixed_types() {
"termagg": {
"buckets": [
{ "doc_count": 1, "key": 10.0, "min_price": { "value": 10.0 } },
{ "doc_count": 3, "key": "blue", "min_price": { "value": 5.0 } },
{ "doc_count": 2, "key": "red", "min_price": { "value": 1.0 } },
{ "doc_count": 1, "key": -20.5, "min_price": { "value": -20.5 } },
{ "doc_count": 2, "key": "red", "min_price": { "value": null } },
{ "doc_count": 2, "key": 1.0, "key_as_string": "true", "min_price": { "value": null } },
{ "doc_count": 3, "key": "blue", "min_price": { "value": null } },
],
"sum_other_doc_count": 0
}

View File

@@ -1,6 +1,10 @@
use std::fmt::Debug;
use std::net::Ipv6Addr;
use columnar::{BytesColumn, ColumnType, MonotonicallyMappableToU64, StrColumn};
use columnar::column_values::CompactSpaceU64Accessor;
use columnar::{
BytesColumn, ColumnType, MonotonicallyMappableToU128, MonotonicallyMappableToU64, StrColumn,
};
use rustc_hash::FxHashMap;
use serde::{Deserialize, Serialize};
@@ -538,6 +542,27 @@ impl SegmentTermCollector {
let val = bool::from_u64(val);
dict.insert(IntermediateKey::Bool(val), intermediate_entry);
}
} else if self.column_type == ColumnType::IpAddr {
let compact_space_accessor = agg_with_accessor
.accessor
.values
.clone()
.downcast_arc::<CompactSpaceU64Accessor>()
.map_err(|_| {
TantivyError::AggregationError(
crate::aggregation::AggregationError::InternalError(
"Type mismatch: Could not downcast to CompactSpaceU64Accessor"
.to_string(),
),
)
})?;
for (val, doc_count) in entries {
let intermediate_entry = into_intermediate_bucket_entry(val, doc_count)?;
let val: u128 = compact_space_accessor.compact_to_u128(val as u32);
let val = Ipv6Addr::from_u128(val);
dict.insert(IntermediateKey::IpAddr(val), intermediate_entry);
}
} else {
for (val, doc_count) in entries {
let intermediate_entry = into_intermediate_bucket_entry(val, doc_count)?;
@@ -590,6 +615,9 @@ pub(crate) fn cut_off_buckets<T: GetDocCount + Debug>(
#[cfg(test)]
mod tests {
use std::net::IpAddr;
use std::str::FromStr;
use common::DateTime;
use time::{Date, Month};
@@ -600,7 +628,7 @@ mod tests {
};
use crate::aggregation::AggregationLimits;
use crate::indexer::NoMergePolicy;
use crate::schema::{Schema, FAST, STRING};
use crate::schema::{IntoIpv6Addr, Schema, FAST, STRING};
use crate::{Index, IndexWriter};
#[test]
@@ -1182,9 +1210,9 @@ mod tests {
assert_eq!(res["my_texts"]["buckets"][0]["key"], "terma");
assert_eq!(res["my_texts"]["buckets"][0]["doc_count"], 4);
assert_eq!(res["my_texts"]["buckets"][1]["key"], "termc");
assert_eq!(res["my_texts"]["buckets"][1]["key"], "termb");
assert_eq!(res["my_texts"]["buckets"][1]["doc_count"], 0);
assert_eq!(res["my_texts"]["buckets"][2]["key"], "termb");
assert_eq!(res["my_texts"]["buckets"][2]["key"], "termc");
assert_eq!(res["my_texts"]["buckets"][2]["doc_count"], 0);
assert_eq!(res["my_texts"]["sum_other_doc_count"], 0);
assert_eq!(res["my_texts"]["doc_count_error_upper_bound"], 0);
@@ -1930,4 +1958,44 @@ mod tests {
Ok(())
}
#[test]
fn terms_aggregation_ip_addr() -> crate::Result<()> {
let mut schema_builder = Schema::builder();
let field = schema_builder.add_ip_addr_field("ip_field", FAST);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
{
let mut writer = index.writer_with_num_threads(1, 15_000_000)?;
// IpV6 loopback
writer.add_document(doc!(field=>IpAddr::from_str("::1").unwrap().into_ipv6_addr()))?;
writer.add_document(doc!(field=>IpAddr::from_str("::1").unwrap().into_ipv6_addr()))?;
// IpV4
writer.add_document(
doc!(field=>IpAddr::from_str("127.0.0.1").unwrap().into_ipv6_addr()),
)?;
writer.commit()?;
}
let agg_req: Aggregations = serde_json::from_value(json!({
"my_bool": {
"terms": {
"field": "ip_field"
},
}
}))
.unwrap();
let res = exec_request_with_query(agg_req, &index, None)?;
// print as json
// println!("{}", serde_json::to_string_pretty(&res).unwrap());
assert_eq!(res["my_bool"]["buckets"][0]["key"], "::1");
assert_eq!(res["my_bool"]["buckets"][0]["doc_count"], 2);
assert_eq!(res["my_bool"]["buckets"][1]["key"], "127.0.0.1");
assert_eq!(res["my_bool"]["buckets"][1]["doc_count"], 1);
assert_eq!(res["my_bool"]["buckets"][2]["key"], serde_json::Value::Null);
Ok(())
}
}

View File

@@ -5,6 +5,7 @@
use std::cmp::Ordering;
use std::collections::hash_map::Entry;
use std::hash::Hash;
use std::net::Ipv6Addr;
use columnar::ColumnType;
use itertools::Itertools;
@@ -41,6 +42,8 @@ pub struct IntermediateAggregationResults {
/// This might seem redundant with `Key`, but the point is to have a different
/// Serialize implementation.
pub enum IntermediateKey {
/// Ip Addr key
IpAddr(Ipv6Addr),
/// Bool key
Bool(bool),
/// String key
@@ -60,6 +63,14 @@ impl From<IntermediateKey> for Key {
fn from(value: IntermediateKey) -> Self {
match value {
IntermediateKey::Str(s) => Self::Str(s),
IntermediateKey::IpAddr(s) => {
// Prefer to use the IPv4 representation if possible
if let Some(ip) = s.to_ipv4_mapped() {
Self::Str(ip.to_string())
} else {
Self::Str(s.to_string())
}
}
IntermediateKey::F64(f) => Self::F64(f),
IntermediateKey::Bool(f) => Self::F64(f as u64 as f64),
}
@@ -75,6 +86,7 @@ impl std::hash::Hash for IntermediateKey {
IntermediateKey::Str(text) => text.hash(state),
IntermediateKey::F64(val) => val.to_bits().hash(state),
IntermediateKey::Bool(val) => val.hash(state),
IntermediateKey::IpAddr(val) => val.hash(state),
}
}
}