Mirror of https://github.com/quickwit-oss/tantivy.git, synced 2026-01-06 09:12:55 +00:00
switch to ms in histogram for date type (#2045)
* switch to ms in histogram for date type

  Switch to ms in the histogram by adding a normalization step that converts to nanosecond precision when creating the collector.

  closes #2028
  related to #2026

* add missing unit long variants
* use single thread to avoid handling test case
* fix docs
* revert CI
* cleanup
* improve docs
* Update src/aggregation/bucket/histogram/histogram.rs

Co-authored-by: Paul Masurel <paul@quickwit.io>

---------

Co-authored-by: Paul Masurel <paul@quickwit.io>
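To make the change concrete, here is a small illustrative sketch (not code from this PR): date histogram requests and results are now expressed in milliseconds, while the DateTime fast field keeps nanosecond precision, so the request is scaled up when the collector is created and bucket keys are scaled back down in the final result.

// Illustrative only; the real conversion lives in HistogramAggregation::normalize (see the diff below).
fn ms_to_ns(ms: f64) -> f64 {
    ms * 1_000_000.0
}

fn ns_to_ms(ns: f64) -> f64 {
    ns / 1_000_000.0
}

fn main() {
    let requested_interval_ms = 86_400_000.0; // "1d" expressed in milliseconds
    assert_eq!(ms_to_ns(requested_interval_ms), 86_400_000_000_000.0); // interval applied to the ns fast field
    let bucket_key_ns = 1_420_070_400_000_000_000.0; // 2015-01-01T00:00:00Z as stored in the fast field
    assert_eq!(ns_to_ms(bucket_key_ns), 1_420_070_400_000.0); // key reported back to the user in ms
}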
@@ -54,6 +54,9 @@ impl ColumnType {
     pub fn to_code(self) -> u8 {
         self as u8
     }
+    pub fn is_date_time(&self) -> bool {
+        self == &ColumnType::DateTime
+    }
 
     pub(crate) fn try_from_code(code: u8) -> Result<ColumnType, InvalidData> {
         COLUMN_TYPES.get(code as usize).copied().ok_or(InvalidData)

@@ -39,7 +39,7 @@ use super::metric::{
 };
 
 /// The top-level aggregation request structure, which contains [`Aggregation`] and their user
-/// defined names. It is also used in [buckets](BucketAggregation) to define sub-aggregations.
+/// defined names. It is also used in buckets aggregations to define sub-aggregations.
 ///
 /// The key is the user defined name of the aggregation.
 pub type Aggregations = HashMap<String, Aggregation>;

@@ -32,12 +32,13 @@ impl AggregationsWithAccessor {
 
 pub struct AggregationWithAccessor {
     /// In general there can be buckets without fast field access, e.g. buckets that are created
-    /// based on search terms. So eventually this needs to be Option or moved.
+    /// based on search terms. That is not the case currently, but eventually this needs to be
+    /// Option or moved.
     pub(crate) accessor: Column<u64>,
     pub(crate) str_dict_column: Option<StrColumn>,
     pub(crate) field_type: ColumnType,
     /// In case there are multiple types of fast fields, e.g. string and numeric.
-    /// Only used for term aggregations
+    /// Only used for term aggregations currently.
     pub(crate) accessor2: Option<(Column<u64>, ColumnType)>,
     pub(crate) sub_aggregation: AggregationsWithAccessor,
     pub(crate) limits: ResourceLimitGuard,

@@ -105,6 +106,7 @@ impl AggregationWithAccessor {
                 (accessor, field_type)
             }
         };
 
+        let sub_aggregation = sub_aggregation.clone();
         Ok(AggregationWithAccessor {
             accessor,

@@ -67,6 +67,13 @@ pub struct DateHistogramAggregationReq {
     pub fixed_interval: Option<String>,
+    /// Intervals implicitly define an absolute grid of buckets `[interval * k, interval * (k +
+    /// 1))`.
+    ///
+    /// Offset makes it possible to shift this grid into
+    /// `[offset + interval * k, offset + interval * (k + 1))`. Offset has to be in the range [0,
+    /// interval).
+    ///
     /// The `offset` parameter has the same syntax as the `fixed_interval` parameter, but
     /// also allows for negative values.
     pub offset: Option<String>,
     /// The minimum number of documents in a bucket to be returned. Defaults to 0.
     pub min_doc_count: Option<u64>,

@@ -77,7 +84,7 @@ pub struct DateHistogramAggregationReq {
     /// hard_bounds only limits the buckets, to force a range set both extended_bounds and
     /// hard_bounds to the same range.
     ///
-    /// Needs to be provided as timestamp in nanosecond precision.
+    /// Needs to be provided as timestamp in millisecond precision.
     ///
     /// ## Example
     /// ```json

@@ -88,7 +95,7 @@ pub struct DateHistogramAggregationReq {
     /// "interval": "1d",
     /// "hard_bounds": {
     ///     "min": 0,
-    ///     "max": 1420502400000000000
+    ///     "max": 1420502400000
     /// }
     /// }
     /// }

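For intuition, a short worked example of the bucket grid described in the new doc comment above (illustrative, not part of the diff); the interval "1d" and offset "+4h" are expressed in milliseconds, as the new request format expects, and the timestamps are Unix epoch milliseconds.

// Hypothetical helper mirroring the documented grid `[offset + interval * k, offset + interval * (k + 1))`.
fn bucket_start(ts_ms: i64, interval_ms: i64, offset_ms: i64) -> i64 {
    offset_ms + (ts_ms - offset_ms).div_euclid(interval_ms) * interval_ms
}

fn main() {
    let day = 24 * 60 * 60 * 1000; // "1d"
    let offset = 4 * 60 * 60 * 1000; // "+4h"
    let ts = 1_420_077_600_000; // 2015-01-01T02:00:00Z
    // The document lands in the bucket starting at 2014-12-31T04:00:00Z.
    assert_eq!(bucket_start(ts, day, offset), 1_419_998_400_000);
}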
@@ -114,11 +121,11 @@ impl DateHistogramAggregationReq {
         self.validate()?;
         Ok(HistogramAggregation {
             field: self.field.to_string(),
-            interval: parse_into_nanoseconds(self.fixed_interval.as_ref().unwrap())? as f64,
+            interval: parse_into_milliseconds(self.fixed_interval.as_ref().unwrap())? as f64,
             offset: self
                 .offset
                 .as_ref()
-                .map(|offset| parse_offset_into_nanosecs(offset))
+                .map(|offset| parse_offset_into_milliseconds(offset))
                 .transpose()?
                 .map(|el| el as f64),
             min_doc_count: self.min_doc_count,

@@ -153,7 +160,7 @@ impl DateHistogramAggregationReq {
             ));
         }
 
-        parse_into_nanoseconds(self.fixed_interval.as_ref().unwrap())?;
+        parse_into_milliseconds(self.fixed_interval.as_ref().unwrap())?;
 
         Ok(())
     }

@@ -179,7 +186,7 @@ pub enum DateHistogramParseError {
     OutOfBounds(String),
 }
 
-fn parse_offset_into_nanosecs(input: &str) -> Result<i64, AggregationError> {
+fn parse_offset_into_milliseconds(input: &str) -> Result<i64, AggregationError> {
     let is_sign = |byte| &[byte] == b"-" || &[byte] == b"+";
     if input.is_empty() {
         return Err(DateHistogramParseError::InvalidOffset(input.to_string()).into());

@@ -188,18 +195,18 @@ fn parse_offset_into_nanosecs(input: &str) -> Result<i64, AggregationError> {
     let has_sign = is_sign(input.as_bytes()[0]);
     if has_sign {
         let (sign, input) = input.split_at(1);
-        let val = parse_into_nanoseconds(input)?;
+        let val = parse_into_milliseconds(input)?;
         if sign == "-" {
             Ok(-val)
         } else {
             Ok(val)
         }
     } else {
-        parse_into_nanoseconds(input)
+        parse_into_milliseconds(input)
     }
 }
 
-fn parse_into_nanoseconds(input: &str) -> Result<i64, AggregationError> {
+fn parse_into_milliseconds(input: &str) -> Result<i64, AggregationError> {
     let split_boundary = input
         .as_bytes()
         .iter()

@@ -218,17 +225,18 @@ fn parse_into_nanoseconds(input: &str) -> Result<i64, AggregationError> {
         // here and being defensive does not hurt.
         .map_err(|_err| DateHistogramParseError::NumberMissing(input.to_string()))?;
 
-    let multiplier_from_unit = match unit {
-        "ms" => 1,
-        "s" => 1000,
-        "m" => 60 * 1000,
-        "h" => 60 * 60 * 1000,
-        "d" => 24 * 60 * 60 * 1000,
+    let unit_in_ms = match unit {
+        "ms" | "milliseconds" => 1,
+        "s" | "seconds" => 1000,
+        "m" | "minutes" => 60 * 1000,
+        "h" | "hours" => 60 * 60 * 1000,
+        "d" | "days" => 24 * 60 * 60 * 1000,
         _ => return Err(DateHistogramParseError::UnitNotRecognized(unit.to_string()).into()),
     };
 
-    let val = (number * multiplier_from_unit)
-        .checked_mul(1_000_000)
+    let val = number * unit_in_ms;
+    // The field type is in nanoseconds precision, so validate the value to fit the range
+    val.checked_mul(1_000_000)
         .ok_or_else(|| DateHistogramParseError::OutOfBounds(input.to_string()))?;
 
     Ok(val)

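A quick illustration (not from the diff) of why the `checked_mul(1_000_000)` guard above is kept even though the parser now returns milliseconds: the parsed value must still fit the nanosecond range of the i64-based date fast field, which caps intervals and offsets at roughly 292 years.

fn main() {
    // Largest millisecond value whose nanosecond representation still fits in an i64.
    let max_ms = i64::MAX / 1_000_000; // 9_223_372_036_854 ms, roughly 292 years
    assert!(max_ms.checked_mul(1_000_000).is_some());
    assert!((max_ms + 1).checked_mul(1_000_000).is_none()); // would overflow, so the parser rejects it
}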
@@ -246,49 +254,50 @@ mod tests {
     use crate::Index;
 
     #[test]
-    fn test_parse_into_nanosecs() {
-        assert_eq!(parse_into_nanoseconds("1m").unwrap(), 60_000_000_000);
-        assert_eq!(parse_into_nanoseconds("2m").unwrap(), 120_000_000_000);
+    fn test_parse_into_millisecs() {
+        assert_eq!(parse_into_milliseconds("1m").unwrap(), 60_000);
+        assert_eq!(parse_into_milliseconds("2m").unwrap(), 120_000);
+        assert_eq!(parse_into_milliseconds("2minutes").unwrap(), 120_000);
         assert_eq!(
-            parse_into_nanoseconds("2y").unwrap_err(),
+            parse_into_milliseconds("2y").unwrap_err(),
             DateHistogramParseError::UnitNotRecognized("y".to_string()).into()
         );
         assert_eq!(
-            parse_into_nanoseconds("2000").unwrap_err(),
+            parse_into_milliseconds("2000").unwrap_err(),
             DateHistogramParseError::UnitMissing("2000".to_string()).into()
         );
         assert_eq!(
-            parse_into_nanoseconds("ms").unwrap_err(),
+            parse_into_milliseconds("ms").unwrap_err(),
             DateHistogramParseError::NumberMissing("ms".to_string()).into()
        );
    }

    #[test]
-    fn test_parse_offset_into_nanosecs() {
-        assert_eq!(parse_offset_into_nanosecs("1m").unwrap(), 60_000_000_000);
-        assert_eq!(parse_offset_into_nanosecs("+1m").unwrap(), 60_000_000_000);
-        assert_eq!(parse_offset_into_nanosecs("-1m").unwrap(), -60_000_000_000);
-        assert_eq!(parse_offset_into_nanosecs("2m").unwrap(), 120_000_000_000);
-        assert_eq!(parse_offset_into_nanosecs("+2m").unwrap(), 120_000_000_000);
-        assert_eq!(parse_offset_into_nanosecs("-2m").unwrap(), -120_000_000_000);
-        assert_eq!(parse_offset_into_nanosecs("-2ms").unwrap(), -2_000_000);
+    fn test_parse_offset_into_milliseconds() {
+        assert_eq!(parse_offset_into_milliseconds("1m").unwrap(), 60_000);
+        assert_eq!(parse_offset_into_milliseconds("+1m").unwrap(), 60_000);
+        assert_eq!(parse_offset_into_milliseconds("-1m").unwrap(), -60_000);
+        assert_eq!(parse_offset_into_milliseconds("2m").unwrap(), 120_000);
+        assert_eq!(parse_offset_into_milliseconds("+2m").unwrap(), 120_000);
+        assert_eq!(parse_offset_into_milliseconds("-2m").unwrap(), -120_000);
+        assert_eq!(parse_offset_into_milliseconds("-2ms").unwrap(), -2);
         assert_eq!(
-            parse_offset_into_nanosecs("2y").unwrap_err(),
+            parse_offset_into_milliseconds("2y").unwrap_err(),
             DateHistogramParseError::UnitNotRecognized("y".to_string()).into()
         );
         assert_eq!(
-            parse_offset_into_nanosecs("2000").unwrap_err(),
+            parse_offset_into_milliseconds("2000").unwrap_err(),
             DateHistogramParseError::UnitMissing("2000".to_string()).into()
         );
         assert_eq!(
-            parse_offset_into_nanosecs("ms").unwrap_err(),
+            parse_offset_into_milliseconds("ms").unwrap_err(),
             DateHistogramParseError::NumberMissing("ms".to_string()).into()
        );
    }

    #[test]
    fn test_parse_into_milliseconds_do_not_accept_non_ascii() {
-        assert!(parse_into_nanoseconds("1m").is_err());
+        assert!(parse_into_milliseconds("1m").is_err());
    }

    pub fn get_test_index_from_docs(

@@ -369,7 +378,7 @@ mod tests {
         "buckets" : [
             {
                 "key_as_string" : "2015-01-01T00:00:00Z",
-                "key" : 1420070400000000000.0,
+                "key" : 1420070400000.0,
                 "doc_count" : 4
             }
         ]

@@ -407,7 +416,7 @@ mod tests {
         "buckets" : [
             {
                 "key_as_string" : "2015-01-01T00:00:00Z",
-                "key" : 1420070400000000000.0,
+                "key" : 1420070400000.0,
                 "doc_count" : 4,
                 "texts": {
                     "buckets": [

@@ -456,32 +465,32 @@ mod tests {
         "buckets": [
             {
                 "doc_count": 2,
-                "key": 1420070400000000000.0,
+                "key": 1420070400000.0,
                 "key_as_string": "2015-01-01T00:00:00Z"
             },
             {
                 "doc_count": 1,
-                "key": 1420156800000000000.0,
+                "key": 1420156800000.0,
                 "key_as_string": "2015-01-02T00:00:00Z"
             },
             {
                 "doc_count": 0,
-                "key": 1420243200000000000.0,
+                "key": 1420243200000.0,
                 "key_as_string": "2015-01-03T00:00:00Z"
             },
             {
                 "doc_count": 0,
-                "key": 1420329600000000000.0,
+                "key": 1420329600000.0,
                 "key_as_string": "2015-01-04T00:00:00Z"
             },
             {
                 "doc_count": 0,
-                "key": 1420416000000000000.0,
+                "key": 1420416000000.0,
                 "key_as_string": "2015-01-05T00:00:00Z"
             },
             {
                 "doc_count": 1,
-                "key": 1420502400000000000.0,
+                "key": 1420502400000.0,
                 "key_as_string": "2015-01-06T00:00:00Z"
             }
         ]

@@ -499,8 +508,8 @@ mod tests {
         "field": "date",
         "fixed_interval": "1d",
         "extended_bounds": {
-            "min": 1419984000000000000.0,
-            "max": 1420588800000000000.0
+            "min": 1419984000000.0,
+            "max": 1420588800000.0
         }
     }
 }

@@ -517,42 +526,42 @@ mod tests {
         "buckets": [
             {
                 "doc_count": 0,
-                "key": 1419984000000000000.0,
+                "key": 1419984000000.0,
                 "key_as_string": "2014-12-31T00:00:00Z"
             },
             {
                 "doc_count": 2,
-                "key": 1420070400000000000.0,
+                "key": 1420070400000.0,
                 "key_as_string": "2015-01-01T00:00:00Z"
             },
             {
                 "doc_count": 1,
-                "key": 1420156800000000000.0,
+                "key": 1420156800000.0,
                 "key_as_string": "2015-01-02T00:00:00Z"
             },
             {
                 "doc_count": 0,
-                "key": 1420243200000000000.0,
+                "key": 1420243200000.0,
                 "key_as_string": "2015-01-03T00:00:00Z"
             },
             {
                 "doc_count": 0,
-                "key": 1420329600000000000.0,
+                "key": 1420329600000.0,
                 "key_as_string": "2015-01-04T00:00:00Z"
             },
             {
                 "doc_count": 0,
-                "key": 1420416000000000000.0,
+                "key": 1420416000000.0,
                 "key_as_string": "2015-01-05T00:00:00Z"
             },
             {
                 "doc_count": 1,
-                "key": 1420502400000000000.0,
+                "key": 1420502400000.0,
                 "key_as_string": "2015-01-06T00:00:00Z"
             },
             {
                 "doc_count": 0,
-                "key": 1.4205888e18,
+                "key": 1420588800000.0,
                 "key_as_string": "2015-01-07T00:00:00Z"
             }
         ]

@@ -569,8 +578,8 @@ mod tests {
         "field": "date",
         "fixed_interval": "1d",
         "hard_bounds": {
-            "min": 1420156800000000000.0,
-            "max": 1420243200000000000.0
+            "min": 1420156800000.0,
+            "max": 1420243200000.0
         }
     }
 }

@@ -587,7 +596,7 @@ mod tests {
         "buckets": [
             {
                 "doc_count": 1,
-                "key": 1420156800000000000.0,
+                "key": 1420156800000.0,
                 "key_as_string": "2015-01-02T00:00:00Z"
             }
         ]

@@ -125,6 +125,22 @@ pub struct HistogramAggregation {
 }
 
 impl HistogramAggregation {
+    pub(crate) fn normalize(&mut self, column_type: ColumnType) {
+        if column_type.is_date_time() {
+            // values are provided in ms, but the fastfield is in nanoseconds
+            self.interval *= 1_000_000.0;
+            self.offset = self.offset.map(|off| off * 1_000_000.0);
+            self.hard_bounds = self.hard_bounds.map(|bounds| HistogramBounds {
+                min: bounds.min * 1_000_000.0,
+                max: bounds.max * 1_000_000.0,
+            });
+            self.extended_bounds = self.extended_bounds.map(|bounds| HistogramBounds {
+                min: bounds.min * 1_000_000.0,
+                max: bounds.max * 1_000_000.0,
+            });
+        }
+    }
+
     fn validate(&self) -> crate::Result<()> {
         if self.interval <= 0.0f64 {
             return Err(TantivyError::InvalidArgument(

@@ -187,12 +203,14 @@ pub(crate) struct SegmentHistogramBucketEntry {
 impl SegmentHistogramBucketEntry {
     pub(crate) fn into_intermediate_bucket_entry(
         self,
-        sub_aggregation: Box<dyn SegmentAggregationCollector>,
+        sub_aggregation: Option<Box<dyn SegmentAggregationCollector>>,
         agg_with_accessor: &AggregationsWithAccessor,
     ) -> crate::Result<IntermediateHistogramBucketEntry> {
         let mut sub_aggregation_res = IntermediateAggregationResults::default();
-        sub_aggregation
-            .add_intermediate_aggregation_result(agg_with_accessor, &mut sub_aggregation_res)?;
+        if let Some(sub_aggregation) = sub_aggregation {
+            sub_aggregation
+                .add_intermediate_aggregation_result(agg_with_accessor, &mut sub_aggregation_res)?;
+        }
         Ok(IntermediateHistogramBucketEntry {
             key: self.key,
             doc_count: self.doc_count,

@@ -312,19 +330,15 @@ impl SegmentHistogramCollector {
     ) -> crate::Result<IntermediateBucketResult> {
         let mut buckets = Vec::with_capacity(self.buckets.len());
 
-        if self.sub_aggregation_blueprint.is_some() {
-            for (bucket_pos, bucket) in self.buckets.into_iter() {
-                let bucket_res = bucket.into_intermediate_bucket_entry(
-                    self.sub_aggregations.get(&bucket_pos).unwrap().clone(),
-                    &agg_with_accessor.sub_aggregation,
-                );
+        for (bucket_pos, bucket) in self.buckets {
+            let bucket_res = bucket.into_intermediate_bucket_entry(
+                self.sub_aggregations.get(&bucket_pos).cloned(),
+                &agg_with_accessor.sub_aggregation,
+            );
 
-                buckets.push(bucket_res?);
-            }
-        } else {
-            buckets.extend(self.buckets.into_values().map(|bucket| bucket.into()));
-        };
-        buckets.sort_unstable_by(|b1, b2| b1.key.partial_cmp(&b2.key).unwrap_or(Ordering::Equal));
+            buckets.push(bucket_res?);
+        }
+        buckets.sort_unstable_by(|b1, b2| b1.key.total_cmp(&b2.key));
 
         Ok(IntermediateBucketResult::Histogram {
             buckets,

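Side note on the sort change above (illustrative, not from the diff): `f64::total_cmp` provides a total order over floats, so a NaN bucket key can no longer silently compare as "equal" the way `partial_cmp(..).unwrap_or(Ordering::Equal)` allowed.

fn main() {
    let mut keys = vec![3.0_f64, f64::NAN, 1.0];
    // total_cmp orders NaN after all finite values instead of failing the comparison.
    keys.sort_unstable_by(|a, b| a.total_cmp(b));
    assert_eq!(keys[0], 1.0);
    assert_eq!(keys[1], 3.0);
    assert!(keys[2].is_nan());
}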
@@ -333,12 +347,13 @@ impl SegmentHistogramCollector {
     }
 
     pub(crate) fn from_req_and_validate(
-        req: &HistogramAggregation,
+        mut req: HistogramAggregation,
         sub_aggregation: &mut AggregationsWithAccessor,
         field_type: ColumnType,
         accessor_idx: usize,
     ) -> crate::Result<Self> {
         req.validate()?;
+        req.normalize(field_type);
 
         let sub_aggregation_blueprint = if sub_aggregation.is_empty() {
             None

@@ -396,11 +411,11 @@ fn intermediate_buckets_to_final_buckets_fill_gaps(
     // memory check upfront
     let (_, first_bucket_num, last_bucket_num) =
         generate_bucket_pos_with_opt_minmax(histogram_req, min_max);
-    let added_buckets = (first_bucket_num..=last_bucket_num)
-        .count()
-        .saturating_sub(buckets.len());
+    // It's based on user input, so we need to account for overflows
+    let added_buckets = ((last_bucket_num.saturating_sub(first_bucket_num)).max(0) as u64)
+        .saturating_sub(buckets.len() as u64);
     limits.add_memory_consumed(
-        added_buckets as u64 * std::mem::size_of::<IntermediateHistogramBucketEntry>() as u64,
+        added_buckets * std::mem::size_of::<IntermediateHistogramBucketEntry>() as u64,
     )?;
     // create buckets
     let fill_gaps_buckets = generate_buckets_with_opt_minmax(histogram_req, min_max);

@@ -409,7 +424,7 @@ fn intermediate_buckets_to_final_buckets_fill_gaps(
 
     // Use merge_join_by to fill in gaps, since buckets are sorted
 
-    buckets
+    let final_buckets: Vec<BucketEntry> = buckets
         .into_iter()
         .merge_join_by(
             fill_gaps_buckets.into_iter(),

@@ -434,7 +449,9 @@ fn intermediate_buckets_to_final_buckets_fill_gaps(
         .map(|intermediate_bucket| {
             intermediate_bucket.into_final_bucket_entry(sub_aggregation, limits)
         })
-        .collect::<crate::Result<Vec<_>>>()
+        .collect::<crate::Result<Vec<_>>>()?;
+
+    Ok(final_buckets)
 }
 
 // Convert to BucketEntry

@@ -445,14 +462,20 @@ pub(crate) fn intermediate_histogram_buckets_to_final_buckets(
     sub_aggregation: &Aggregations,
     limits: &AggregationLimits,
 ) -> crate::Result<Vec<BucketEntry>> {
+    // Normalization is column type dependent.
+    // The request used in the call to final is not yet normalized.
+    // Normalization is changing the precision from milliseconds to nanoseconds.
+    let mut histogram_req = histogram_req.clone();
+    if let Some(column_type) = column_type {
+        histogram_req.normalize(column_type);
+    }
     let mut buckets = if histogram_req.min_doc_count() == 0 {
         // With min_doc_count != 0, we may need to add buckets, so that there are no
         // gaps, since intermediate result does not contain empty buckets (filtered to
         // reduce serialization size).
-
         intermediate_buckets_to_final_buckets_fill_gaps(
             buckets,
-            histogram_req,
+            &histogram_req,
             sub_aggregation,
             limits,
         )?

@@ -467,10 +490,12 @@ pub(crate) fn intermediate_histogram_buckets_to_final_buckets(
     };
 
     // If we have a date type on the histogram buckets, we add the `key_as_string` field as rfc339
+    // and normalize from nanoseconds to milliseconds
     if column_type == Some(ColumnType::DateTime) {
         for bucket in buckets.iter_mut() {
-            if let crate::aggregation::Key::F64(val) = bucket.key {
-                let key_as_string = format_date(val as i64)?;
+            if let crate::aggregation::Key::F64(ref mut val) = bucket.key {
+                let key_as_string = format_date(*val as i64)?;
+                *val /= 1_000_000.0;
                 bucket.key_as_string = Some(key_as_string);
             }
         }

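Illustration of the final-result conversion above (not from the diff): bucket keys are computed on the nanosecond fast field and divided by 1e6 so the response reports milliseconds, which is what the updated test expectations below assert.

fn main() {
    let key_ns = 1_546_300_800_000_000_000.0_f64; // 2019-01-01T00:00:00Z in nanoseconds
    assert_eq!(key_ns / 1_000_000.0, 1_546_300_800_000.0); // same instant reported in milliseconds
}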
@@ -1203,7 +1228,7 @@ mod tests {
         "histogram": {
             "histogram": {
                 "field": "date",
-                "interval": 86400000000000.0, // one day in nano seconds
+                "interval": 86400000.0, // one day in milliseconds
             },
         }
     }))

@@ -1213,14 +1238,14 @@ mod tests {
 
         let res: Value = serde_json::from_str(&serde_json::to_string(&agg_res)?)?;
 
-        assert_eq!(res["histogram"]["buckets"][0]["key"], 1546300800000000000.0);
+        assert_eq!(res["histogram"]["buckets"][0]["key"], 1546300800000.0);
         assert_eq!(
             res["histogram"]["buckets"][0]["key_as_string"],
             "2019-01-01T00:00:00Z"
         );
         assert_eq!(res["histogram"]["buckets"][0]["doc_count"], 1);
 
-        assert_eq!(res["histogram"]["buckets"][1]["key"], 1546387200000000000.0);
+        assert_eq!(res["histogram"]["buckets"][1]["key"], 1546387200000.0);
         assert_eq!(
             res["histogram"]["buckets"][1]["key_as_string"],
             "2019-01-02T00:00:00Z"

@@ -1228,7 +1253,7 @@ mod tests {
 
         assert_eq!(res["histogram"]["buckets"][1]["doc_count"], 5);
 
-        assert_eq!(res["histogram"]["buckets"][2]["key"], 1546473600000000000.0);
+        assert_eq!(res["histogram"]["buckets"][2]["key"], 1546473600000.0);
         assert_eq!(
             res["histogram"]["buckets"][2]["key_as_string"],
             "2019-01-03T00:00:00Z"

@@ -1,7 +1,6 @@
 //! Module for all bucket aggregations.
 //!
-//! BucketAggregations create buckets of documents
-//! [`BucketAggregation`](super::agg_req::BucketAggregation).
+//! BucketAggregations create buckets of documents.
 //! Each bucket is associated with a rule which
 //! determines whether or not a document in the falls into it. In other words, the buckets
 //! effectively define document sets. Buckets are not necessarily disjunct, therefore a document can

@@ -15,8 +15,7 @@ use super::agg_req::{Aggregation, AggregationVariants, Aggregations};
 use super::agg_result::{AggregationResult, BucketResult, MetricResult, RangeBucketEntry};
 use super::bucket::{
     cut_off_buckets, get_agg_name_and_property, intermediate_histogram_buckets_to_final_buckets,
-    GetDocCount, Order, OrderTarget, RangeAggregation, SegmentHistogramBucketEntry,
-    TermsAggregation,
+    GetDocCount, Order, OrderTarget, RangeAggregation, TermsAggregation,
 };
 use super::metric::{
     IntermediateAverage, IntermediateCount, IntermediateMax, IntermediateMin, IntermediateStats,

@@ -646,16 +645,6 @@ impl IntermediateHistogramBucketEntry {
     }
 }
 
-impl From<SegmentHistogramBucketEntry> for IntermediateHistogramBucketEntry {
-    fn from(entry: SegmentHistogramBucketEntry) -> Self {
-        IntermediateHistogramBucketEntry {
-            key: entry.key,
-            doc_count: entry.doc_count,
-            sub_aggregation: Default::default(),
-        }
-    }
-}
-
 /// This is the range entry for a bucket, which contains a key, count, and optionally
 /// sub_aggregations.
 #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]

@@ -21,20 +21,19 @@ use crate::{DocId, TantivyError};
 /// data falls. For instance, the 95th percentile indicates the value below which
 /// 95% of the data points can be found.
 ///
-/// This aggregation can be particularly interesting for analyzing website load
-/// times. By computing the percentiles of load times, you can get insights into
-/// how quickly your website loads for different users and identify areas where
-/// improvements can be made.
+/// This aggregation can be particularly interesting for analyzing website or service response
+/// times. For example, if the 95th percentile website load time is significantly higher than the
+/// median, this indicates that a small percentage of users are experiencing much slower load times
+/// than the majority.
 ///
 /// To use the percentiles aggregation, you'll need to provide a field to
 /// aggregate on. In the case of website load times, this would typically be a
 /// field containing the duration of time it takes for the site to load.
 ///
-/// The JSON format for a percentiles aggregation request is straightforward. The
-/// following example demonstrates a request for the percentiles of the "load_time"
+/// The following example demonstrates a request for the percentiles of the "load_time"
 /// field:
 ///
-/// ```json
+/// ```JSON
 /// {
 ///     "percentiles": {
 ///         "field": "load_time"

@@ -46,7 +45,7 @@ use crate::{DocId, TantivyError};
 /// 25, 50 (median), 75, 95, and 99). You can also customize the percentiles you want to
 /// calculate by providing an array of values in the "percents" parameter:
 ///
-/// ```json
+/// ```JSON
 /// {
 ///     "percentiles": {
 ///         "field": "load_time",

@@ -90,7 +89,7 @@ fn default_as_true() -> bool {
 }
 
 impl PercentilesAggregationReq {
-    /// Creates a new [`PercentilesAggregation`] instance from a field name.
+    /// Creates a new [`PercentilesAggregationReq`] instance from a field name.
     pub fn from_field_name(field_name: String) -> Self {
         PercentilesAggregationReq {
             field: field_name,

@@ -25,8 +25,7 @@
 //! Aggregations request and result structures de/serialize into elasticsearch compatible JSON.
 //!
 //! Notice: Intermediate aggregation results should not be de/serialized via JSON format.
-//! See compatibility tests here: https://github.com/PSeitz/test_serde_formats
-//! TLDR: use ciborium.
+//! Postcard is a good choice.
 //!
 //! ```verbatim
 //! let agg_req: Aggregations = serde_json::from_str(json_request_string).unwrap();

@@ -39,6 +38,7 @@
 //! ## Supported Aggregations
 //! - [Bucket](bucket)
 //!     - [Histogram](bucket::HistogramAggregation)
+//!     - [DateHistogram](bucket::DateHistogramAggregationReq)
 //!     - [Range](bucket::RangeAggregation)
 //!     - [Terms](bucket::TermsAggregation)
 //! - [Metric](metric)

@@ -48,6 +48,7 @@
 //!     - [Max](metric::MaxAggregation)
 //!     - [Sum](metric::SumAggregation)
 //!     - [Count](metric::CountAggregation)
+//!     - [Percentiles](metric::PercentilesAggregationReq)
 //!
 //! # Example
 //! Compute the average metric, by building [`agg_req::Aggregations`], which is built from an

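A hedged usage sketch tying the module docs above to the date histogram changes in this commit: requests deserialize from Elasticsearch-compatible JSON, and the date-related values (here `hard_bounds`, with values taken from the tests in this diff) are now given in milliseconds. The aggregation name "sales_over_time" is arbitrary, and the exact request shape is assumed from the examples shown earlier in this diff.

use tantivy::aggregation::agg_req::Aggregations;

fn main() -> serde_json::Result<()> {
    let agg_req: Aggregations = serde_json::from_str(
        r#"{
            "sales_over_time": {
                "date_histogram": {
                    "field": "date",
                    "fixed_interval": "1d",
                    "hard_bounds": { "min": 1420156800000, "max": 1420243200000 }
                }
            }
        }"#,
    )?;
    assert!(agg_req.contains_key("sales_over_time"));
    Ok(())
}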
@@ -121,7 +122,7 @@
 //! [`merge_fruits`](intermediate_agg_result::IntermediateAggregationResults::merge_fruits) method
 //! to merge multiple results. The merged result can then be converted into
 //! [`AggregationResults`](agg_result::AggregationResults) via the
-//! [`into_final_bucket_result`](intermediate_agg_result::IntermediateAggregationResults::into_final_bucket_result) method.
+//! [`into_final_result`](intermediate_agg_result::IntermediateAggregationResults::into_final_result) method.
 
 mod agg_limits;
 pub mod agg_req;

@@ -109,13 +109,13 @@ pub(crate) fn build_single_agg_segment_collector(
             accessor_idx,
         )?)),
         Histogram(histogram) => Ok(Box::new(SegmentHistogramCollector::from_req_and_validate(
-            histogram,
+            histogram.clone(),
             &mut req.sub_aggregation,
             req.field_type,
             accessor_idx,
         )?)),
         DateHistogram(histogram) => Ok(Box::new(SegmentHistogramCollector::from_req_and_validate(
-            &histogram.to_histogram_req()?,
+            histogram.to_histogram_req()?,
             &mut req.sub_aggregation,
             req.field_type,
             accessor_idx,