add date_histogram (#1900)

* add date_histogram

* add return result
PSeitz authored 2023-03-02 12:17:35 +08:00, committed by GitHub
parent faa706d804
commit ca20bfa776
10 changed files with 410 additions and 31 deletions
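
From a user's point of view, the commit adds an Elasticsearch-compatible `date_histogram` bucket aggregation that accepts only `fixed_interval` (plus an optional `offset`). A minimal request sketch, mirroring the JSON exercised by the tests in this commit and assuming the crate-public path `tantivy::aggregation::agg_req::Aggregations`:

use tantivy::aggregation::agg_req::Aggregations;

// Build the request the same way the new tests do: serialize the JSON value and
// deserialize it into the aggregation request tree.
let request_json = serde_json::json!({
    "sales_over_time": {
        "date_histogram": {
            "field": "date",
            "fixed_interval": "30d",
            "offset": "-4d"
        }
    }
});
let agg_req: Aggregations =
    serde_json::from_str(&serde_json::to_string(&request_json).unwrap()).unwrap();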


@@ -50,7 +50,7 @@ use std::collections::{HashMap, HashSet};
use serde::{Deserialize, Serialize};
pub use super::bucket::RangeAggregation;
use super::bucket::{HistogramAggregation, TermsAggregation};
use super::bucket::{DateHistogramAggregationReq, HistogramAggregation, TermsAggregation};
use super::metric::{
AverageAggregation, CountAggregation, MaxAggregation, MinAggregation, StatsAggregation,
SumAggregation,
@@ -110,10 +110,13 @@ impl BucketAggregationInternal {
_ => None,
}
}
pub(crate) fn as_histogram(&self) -> Option<&HistogramAggregation> {
pub(crate) fn as_histogram(&self) -> crate::Result<Option<HistogramAggregation>> {
match &self.bucket_agg {
BucketAggregationType::Histogram(histogram) => Some(histogram),
_ => None,
BucketAggregationType::Histogram(histogram) => Ok(Some(histogram.clone())),
BucketAggregationType::DateHistogram(histogram) => {
Ok(Some(histogram.to_histogram_req()?))
}
_ => Ok(None),
}
}
pub(crate) fn as_term(&self) -> Option<&TermsAggregation> {
@@ -196,6 +199,9 @@ pub enum BucketAggregationType {
/// Put data into buckets of user-defined ranges.
#[serde(rename = "histogram")]
Histogram(HistogramAggregation),
/// Put data into buckets of fixed time intervals, for date fields.
#[serde(rename = "date_histogram")]
DateHistogram(DateHistogramAggregationReq),
/// Put data into buckets of terms.
#[serde(rename = "terms")]
Terms(TermsAggregation),
@@ -207,6 +213,7 @@ impl BucketAggregationType {
BucketAggregationType::Terms(terms) => terms.field.as_str(),
BucketAggregationType::Range(range) => range.field.as_str(),
BucketAggregationType::Histogram(histogram) => histogram.field.as_str(),
BucketAggregationType::DateHistogram(histogram) => histogram.field.as_str(),
}
}
}
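
The `as_histogram` change above is the main API shift in this file: it now returns an owned `HistogramAggregation` inside a `crate::Result`, because a `date_histogram` request must be validated and converted first. A minimal crate-internal caller sketch (the helper name is assumed for illustration):

// Returns the bucket interval of a (date) histogram request, or None for other bucket types.
fn histogram_interval(agg: &BucketAggregationInternal) -> crate::Result<Option<f64>> {
    // For DateHistogram this goes through `to_histogram_req()`, which can fail validation.
    Ok(agg.as_histogram()?.map(|histogram| histogram.interval))
}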


@@ -6,7 +6,9 @@ use std::sync::atomic::AtomicU32;
use columnar::{Column, ColumnType, StrColumn};
use super::agg_req::{Aggregation, Aggregations, BucketAggregationType, MetricAggregation};
use super::bucket::{HistogramAggregation, RangeAggregation, TermsAggregation};
use super::bucket::{
DateHistogramAggregationReq, HistogramAggregation, RangeAggregation, TermsAggregation,
};
use super::metric::{
AverageAggregation, CountAggregation, MaxAggregation, MinAggregation, StatsAggregation,
SumAggregation,
@@ -62,6 +64,10 @@ impl BucketAggregationWithAccessor {
BucketAggregationType::Histogram(HistogramAggregation {
field: field_name, ..
}) => get_ff_reader_and_validate(reader, field_name)?,
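// The date histogram reuses the plain histogram's fast field accessor; only the field name
// is needed at this point.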
BucketAggregationType::DateHistogram(DateHistogramAggregationReq {
field: field_name,
..
}) => get_ff_reader_and_validate(reader, field_name)?,
BucketAggregationType::Terms(TermsAggregation {
field: field_name, ..
}) => {


@@ -1,5 +1,8 @@
use serde::{Deserialize, Serialize};
use super::{HistogramAggregation, HistogramBounds};
use crate::aggregation::AggregationError;
/// `DateHistogramAggregationReq` is similar to `HistogramAggregation`, but it can only be used
/// on date fields.
///
@@ -29,8 +32,16 @@ use serde::{Deserialize, Serialize};
/// See [`BucketEntry`](crate::aggregation::agg_result::BucketEntry)
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct DateHistogramAggregationReq {
#[doc(hidden)]
/// Only for validation
interval: Option<String>,
#[doc(hidden)]
/// Only for validation
date_interval: Option<String>,
/// The field to aggregate on.
pub field: String,
/// The format to format dates.
pub format: Option<String>,
/// The interval to chunk your data range. Each bucket spans a value range of
/// [0..fixed_interval). Accepted values
///
@@ -55,29 +66,132 @@ pub struct DateHistogramAggregationReq {
/// Intervals implicitly define an absolute grid of buckets `[interval * k, interval * (k +
/// 1))`.
pub offset: Option<String>,
/// The minimum number of documents in a bucket to be returned. Defaults to 0.
pub min_doc_count: Option<u64>,
/// Limits the data range to `[min, max]` closed interval.
///
/// This can be used to filter values if they are not in the data range.
///
/// `hard_bounds` only limits the buckets; to force a range, set both `extended_bounds` and
/// `hard_bounds` to the same range.
///
/// Needs to be provided as a timestamp in microsecond precision.
///
/// ## Example
/// ```json
/// {
/// "sales_over_time": {
/// "date_histogram": {
/// "field": "dates",
/// "interval": "1d",
/// "hard_bounds": {
/// "min": 0,
/// "max": 1420502400000000
/// }
/// }
/// }
/// }
/// ```
pub hard_bounds: Option<HistogramBounds>,
/// Can be set to extend your bounds. By default, the range of the buckets is defined by the
/// data range of the values in the documents. As the name suggests, this can only be used to
/// extend the value range. If the bounds for min or max do not extend the range, they have no
/// effect on the returned buckets.
///
/// Cannot be set in conjunction with min_doc_count > 0, since the empty buckets from extended
/// bounds would not be returned.
pub extended_bounds: Option<HistogramBounds>,
/// Whether to return the buckets as a hash map (`true`) or as a list (`false`).
#[serde(default)]
pub keyed: bool,
}
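// A minimal crate-internal usage sketch (assumed test code): the legacy `interval` field is
// accepted by serde so validation can report a helpful error, but `to_histogram_req()` rejects
// it; only `fixed_interval` converts successfully.
#[test]
fn date_histogram_rejects_interval_sketch() {
    let req: DateHistogramAggregationReq = serde_json::from_value(serde_json::json!({
        "field": "date",
        "fixed_interval": "30d",
        "interval": "30d"
    }))
    .unwrap();
    assert!(req.to_histogram_req().is_err());
}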
impl DateHistogramAggregationReq {
pub(crate) fn to_histogram_req(&self) -> crate::Result<HistogramAggregation> {
self.validate()?;
Ok(HistogramAggregation {
field: self.field.to_string(),
interval: parse_into_microseconds(&self.fixed_interval)? as f64,
offset: self
.offset
.as_ref()
.map(|offset| parse_offset_into_microseconds(offset))
.transpose()?
.map(|el| el as f64),
min_doc_count: self.min_doc_count,
hard_bounds: None,
extended_bounds: None,
keyed: self.keyed,
})
}
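// Worked example of the conversion, using the values exercised by the tests below
// (all interval/offset values are in microseconds):
//   fixed_interval "30d" ->  30 * 24 * 60 * 60 * 1_000 * 1_000 =  2_592_000_000_000.0
//   offset "-4d"         ->  -4 * 24 * 60 * 60 * 1_000 * 1_000 =   -345_600_000_000.0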
fn validate(&self) -> crate::Result<()> {
if self.interval.is_some() {
return Err(crate::TantivyError::InvalidArgument(format!(
"`interval` parameter {:?} in date histogram is unsupported, only \
`fixed_interval` is supported",
self.interval
)));
}
if self.format.is_some() {
return Err(crate::TantivyError::InvalidArgument(
"format parameter on date_histogram is unsupported".to_string(),
));
}
if self.date_interval.is_some() {
return Err(crate::TantivyError::InvalidArgument(
"date_interval in date histogram is unsupported, only `fixed_interval` is \
supported"
.to_string(),
));
}
parse_into_microseconds(&self.fixed_interval)?;
Ok(())
}
}
#[derive(Debug, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Error)]
/// Errors when parsing the fixed interval for `DateHistogramAggregationReq`.
pub enum DateHistogramParseError {
/// Unit not recognized in passed String
#[error("Unit not recognized in passed String {0:?}")]
UnitNotRecognized(String),
/// Number not found in passed String
#[error("Number not found in passed String {0:?}")]
NumberMissing(String),
/// Unit not found in passed String
#[error("Unit not found in passed String {0:?}")]
UnitMissing(String),
/// Offset invalid
#[error("passed offset is invalid {0:?}")]
InvalidOffset(String),
}
fn parse_into_milliseconds(input: &str) -> Result<u64, DateHistogramParseError> {
fn parse_offset_into_microseconds(input: &str) -> Result<i64, AggregationError> {
let is_sign = |byte| &[byte] == b"-" || &[byte] == b"+";
if input.is_empty() {
return Err(DateHistogramParseError::InvalidOffset(input.to_string()).into());
}
let has_sign = is_sign(input.as_bytes()[0]);
if has_sign {
let (sign, input) = input.split_at(1);
let val = parse_into_microseconds(input)?;
if sign == "-" {
Ok(-val)
} else {
Ok(val)
}
} else {
parse_into_microseconds(input)
}
}
fn parse_into_microseconds(input: &str) -> Result<i64, AggregationError> {
let split_boundary = input
.as_bytes()
.iter()
@@ -85,12 +199,12 @@ fn parse_into_milliseconds(input: &str) -> Result<u64, DateHistogramParseError>
.count();
let (number, unit) = input.split_at(split_boundary);
if number.is_empty() {
return Err(DateHistogramParseError::NumberMissing(input.to_string()));
return Err(DateHistogramParseError::NumberMissing(input.to_string()).into());
}
if unit.is_empty() {
return Err(DateHistogramParseError::UnitMissing(input.to_string()));
return Err(DateHistogramParseError::UnitMissing(input.to_string()).into());
}
let number: u64 = number
let number: i64 = number
.parse()
// Technically this should never happen, but there was a bug
// here and being defensive does not hurt.
@@ -102,36 +216,260 @@ fn parse_into_milliseconds(input: &str) -> Result<u64, DateHistogramParseError>
"m" => 60 * 1000,
"h" => 60 * 60 * 1000,
"d" => 24 * 60 * 60 * 1000,
_ => return Err(DateHistogramParseError::UnitNotRecognized(unit.to_string())),
_ => return Err(DateHistogramParseError::UnitNotRecognized(unit.to_string()).into()),
};
Ok(number * multiplier_from_unit)
Ok(number * multiplier_from_unit * 1000)
}
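// Worked values in microseconds for the units handled above: "1m" = 60_000_000,
// "1h" = 3_600_000_000, "1d" = 86_400_000_000.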
#[cfg(test)]
mod tests {
use pretty_assertions::assert_eq;
use super::*;
use crate::aggregation::agg_req::Aggregations;
use crate::aggregation::tests::exec_request;
use crate::indexer::NoMergePolicy;
use crate::schema::{Schema, FAST};
use crate::Index;
#[test]
fn test_parse_into_milliseconds() {
assert_eq!(parse_into_milliseconds("1m").unwrap(), 60_000);
assert_eq!(parse_into_milliseconds("2m").unwrap(), 120_000);
fn test_parse_into_microseconds() {
assert_eq!(parse_into_microseconds("1m").unwrap(), 60_000_000);
assert_eq!(parse_into_microseconds("2m").unwrap(), 120_000_000);
assert_eq!(
parse_into_milliseconds("2y").unwrap_err(),
DateHistogramParseError::UnitNotRecognized("y".to_string())
parse_into_microseconds("2y").unwrap_err(),
DateHistogramParseError::UnitNotRecognized("y".to_string()).into()
);
assert_eq!(
parse_into_milliseconds("2000").unwrap_err(),
DateHistogramParseError::UnitMissing("2000".to_string())
parse_into_microseconds("2000").unwrap_err(),
DateHistogramParseError::UnitMissing("2000".to_string()).into()
);
assert_eq!(
parse_into_milliseconds("ms").unwrap_err(),
DateHistogramParseError::NumberMissing("ms".to_string())
parse_into_microseconds("ms").unwrap_err(),
DateHistogramParseError::NumberMissing("ms".to_string()).into()
);
}
#[test]
fn test_parse_offset_into_microseconds() {
assert_eq!(parse_offset_into_microseconds("1m").unwrap(), 60_000_000);
assert_eq!(parse_offset_into_microseconds("+1m").unwrap(), 60_000_000);
assert_eq!(parse_offset_into_microseconds("-1m").unwrap(), -60_000_000);
assert_eq!(parse_offset_into_microseconds("2m").unwrap(), 120_000_000);
assert_eq!(parse_offset_into_microseconds("+2m").unwrap(), 120_000_000);
assert_eq!(parse_offset_into_microseconds("-2m").unwrap(), -120_000_000);
assert_eq!(parse_offset_into_microseconds("-2ms").unwrap(), -2_000);
assert_eq!(
parse_offset_into_microseconds("2y").unwrap_err(),
DateHistogramParseError::UnitNotRecognized("y".to_string()).into()
);
assert_eq!(
parse_offset_into_microseconds("2000").unwrap_err(),
DateHistogramParseError::UnitMissing("2000".to_string()).into()
);
assert_eq!(
parse_offset_into_microseconds("ms").unwrap_err(),
DateHistogramParseError::NumberMissing("ms".to_string()).into()
);
}
#[test]
fn test_parse_into_milliseconds_do_not_accept_non_ascii() {
assert!(parse_into_milliseconds("m").is_err());
assert!(parse_into_microseconds("m").is_err());
}
pub fn get_test_index_from_docs(
merge_segments: bool,
segment_and_docs: &[Vec<&str>],
) -> crate::Result<Index> {
let mut schema_builder = Schema::builder();
schema_builder.add_date_field("date", FAST);
schema_builder.add_text_field("text", FAST);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema.clone());
{
let mut index_writer = index.writer_with_num_threads(1, 30_000_000)?;
index_writer.set_merge_policy(Box::new(NoMergePolicy));
for values in segment_and_docs {
for doc_str in values {
let doc = schema.parse_document(doc_str)?;
index_writer.add_document(doc)?;
}
// writing the segment
index_writer.commit()?;
}
}
if merge_segments {
let segment_ids = index
.searchable_segment_ids()
.expect("Searchable segments failed.");
if segment_ids.len() > 1 {
let mut index_writer = index.writer_for_tests()?;
index_writer.merge(&segment_ids).wait()?;
index_writer.wait_merging_threads()?;
}
}
Ok(index)
}
#[test]
fn histogram_test_date_force_merge_segments() -> crate::Result<()> {
histogram_test_date_merge_segments(true)
}
#[test]
fn histogram_test_date() -> crate::Result<()> {
histogram_test_date_merge_segments(false)
}
fn histogram_test_date_merge_segments(merge_segments: bool) -> crate::Result<()> {
let docs = vec![
vec![r#"{ "date": "2015-01-01T12:10:30Z", "text": "aaa" }"#],
vec![r#"{ "date": "2015-01-01T11:11:30Z", "text": "bbb" }"#],
vec![r#"{ "date": "2015-01-02T00:00:00Z", "text": "bbb" }"#],
vec![r#"{ "date": "2015-01-06T00:00:00Z", "text": "ccc" }"#],
];
let index = get_test_index_from_docs(merge_segments, &docs)?;
// 30day + offset
let elasticsearch_compatible_json = json!(
{
"sales_over_time": {
"date_histogram": {
"field": "date",
"fixed_interval": "30d",
"offset": "-4d"
}
}
}
);
let agg_req: Aggregations =
serde_json::from_str(&serde_json::to_string(&elasticsearch_compatible_json).unwrap())
.unwrap();
let res = exec_request(agg_req, &index)?;
let expected_res = json!({
"sales_over_time" : {
"buckets" : [
{
"key_as_string" : "2015-01-01T00:00:00Z",
"key" : 1420070400000000.0,
"doc_count" : 4
}
]
}
});
assert_eq!(res, expected_res);
// 30day + offset + sub_agg
let elasticsearch_compatible_json = json!(
{
"sales_over_time": {
"date_histogram": {
"field": "date",
"fixed_interval": "30d",
"offset": "-4d"
},
"aggs": {
"texts": {
"terms": {"field": "text"}
}
}
}
}
);
let agg_req: Aggregations =
serde_json::from_str(&serde_json::to_string(&elasticsearch_compatible_json).unwrap())
.unwrap();
let res = exec_request(agg_req, &index)?;
println!("{}", serde_json::to_string_pretty(&res).unwrap());
let expected_res = json!({
"sales_over_time" : {
"buckets" : [
{
"key_as_string" : "2015-01-01T00:00:00Z",
"key" : 1420070400000000.0,
"doc_count" : 4,
"texts": {
"buckets": [
{
"doc_count": 2,
"key": "bbb"
},
{
"doc_count": 1,
"key": "ccc"
},
{
"doc_count": 1,
"key": "aaa"
}
],
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0
}
}
]
}
});
assert_eq!(res, expected_res);
// 1day
let elasticsearch_compatible_json = json!(
{
"sales_over_time": {
"date_histogram": {
"field": "date",
"fixed_interval": "1d"
}
}
}
);
let agg_req: Aggregations =
serde_json::from_str(&serde_json::to_string(&elasticsearch_compatible_json).unwrap())
.unwrap();
let res = exec_request(agg_req, &index)?;
let expected_res = json!( {
"sales_over_time": {
"buckets": [
{
"doc_count": 2,
"key": 1420070400000000.0,
"key_as_string": "2015-01-01T00:00:00Z"
},
{
"doc_count": 1,
"key": 1420156800000000.0,
"key_as_string": "2015-01-02T00:00:00Z"
},
{
"doc_count": 0,
"key": 1420243200000000.0,
"key_as_string": "2015-01-03T00:00:00Z"
},
{
"doc_count": 0,
"key": 1420329600000000.0,
"key_as_string": "2015-01-04T00:00:00Z"
},
{
"doc_count": 0,
"key": 1420416000000000.0,
"key_as_string": "2015-01-05T00:00:00Z"
},
{
"doc_count": 1,
"key": 1420502400000000.0,
"key_as_string": "2015-01-06T00:00:00Z"
}
]
}
});
assert_eq!(res, expected_res);
Ok(())
}
}


@@ -394,6 +394,7 @@ fn intermediate_buckets_to_final_buckets_fill_gaps(
// extended_bounds from the request
let min_max = minmax(buckets.iter().map(|bucket| bucket.key));
// TODO add memory check
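// For a converted date_histogram this generates the regular bucket grid between the observed
// min and max keys, e.g. the empty 2015-01-03..2015-01-05 day buckets (doc_count 0) produced
// by the "1d" test in date_histogram.rs.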
let fill_gaps_buckets = generate_buckets_with_opt_minmax(histogram_req, min_max);
let empty_sub_aggregation = IntermediateAggregationResults::empty_from_req(sub_aggregation);


@@ -1,4 +1,4 @@
// mod date_histogram;
mod date_histogram;
mod histogram;
// pub use date_histogram::*;
pub use date_histogram::*;
pub use histogram::*;

src/aggregation/error.rs (new file, 9 lines)

@@ -0,0 +1,9 @@
use super::bucket::DateHistogramParseError;
/// Errors that may occur when parsing or running an aggregation.
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum AggregationError {
/// Failed to parse the date histogram request.
#[error("Date histogram parse error: {0:?}")]
DateHistogramParseError(#[from] DateHistogramParseError),
}
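
The two `#[from]` conversions let `?` bubble a parse failure all the way up: a `DateHistogramParseError` becomes an `AggregationError`, which the last hunk of this commit in turn converts into a `TantivyError`. A minimal crate-internal sketch relying on that chain (the helper name is assumed):

// A bad `fixed_interval` surfaces as
// TantivyError::AggregationError(AggregationError::DateHistogramParseError(..)).
fn date_interval_micros(req: &DateHistogramAggregationReq) -> crate::Result<f64> {
    Ok(req.to_histogram_req()?.interval)
}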


@@ -329,15 +329,17 @@ impl IntermediateBucketResult {
column_type,
buckets,
} => {
let histogram_req = &req
.as_histogram()?
.expect("unexpected aggregation, expected histogram aggregation");
let buckets = intermediate_histogram_buckets_to_final_buckets(
buckets,
column_type,
req.as_histogram()
.expect("unexpected aggregation, expected histogram aggregation"),
histogram_req,
&req.sub_aggregation,
)?;
let buckets = if req.as_histogram().unwrap().keyed {
let buckets = if histogram_req.keyed {
let mut bucket_map =
FxHashMap::with_capacity_and_hasher(buckets.len(), Default::default());
for bucket in buckets {
@@ -361,10 +363,12 @@ impl IntermediateBucketResult {
match req {
BucketAggregationType::Terms(_) => IntermediateBucketResult::Terms(Default::default()),
BucketAggregationType::Range(_) => IntermediateBucketResult::Range(Default::default()),
BucketAggregationType::Histogram(_) => IntermediateBucketResult::Histogram {
buckets: vec![],
column_type: None,
},
BucketAggregationType::Histogram(_) | BucketAggregationType::DateHistogram(_) => {
IntermediateBucketResult::Histogram {
buckets: vec![],
column_type: None,
}
}
}
}
fn merge_fruits(&mut self, other: IntermediateBucketResult) {


@@ -162,6 +162,7 @@ pub mod bucket;
mod buf_collector;
mod collector;
mod date;
mod error;
pub mod intermediate_agg_result;
pub mod metric;
mod segment_agg_result;
@@ -177,6 +178,7 @@ pub use collector::{
};
use columnar::{ColumnType, MonotonicallyMappableToU64};
pub(crate) use date::format_date;
pub use error::AggregationError;
use itertools::Itertools;
use serde::{Deserialize, Serialize};


@@ -144,6 +144,14 @@ pub(crate) fn build_bucket_segment_agg_collector(
accessor_idx,
)?))
}
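// A date_histogram is executed by converting it into an equivalent histogram request
// (interval and offset in microseconds) and reusing the regular SegmentHistogramCollector.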
BucketAggregationType::DateHistogram(histogram) => {
Ok(Box::new(SegmentHistogramCollector::from_req_and_validate(
&histogram.to_histogram_req()?,
&req.sub_aggregation,
req.field_type,
accessor_idx,
)?))
}
}
}


@@ -6,6 +6,7 @@ use std::{fmt, io};
use thiserror::Error;
use crate::aggregation::AggregationError;
use crate::directory::error::{
Incompatibility, LockError, OpenDirectoryError, OpenReadError, OpenWriteError,
};
@@ -53,6 +54,9 @@ impl fmt::Debug for DataCorruption {
/// The library's error enum
#[derive(Debug, Clone, Error)]
pub enum TantivyError {
/// Error when handling aggregations.
#[error("AggregationError {0:?}")]
AggregationError(#[from] AggregationError),
/// Failed to open the directory.
#[error("Failed to open the directory: '{0:?}'")]
OpenDirectoryError(#[from] OpenDirectoryError),