refactor parameters

This commit is contained in:
Pascal Seitz
2022-03-17 14:04:15 +08:00
parent 47dcbdbeae
commit aa391bf843
4 changed files with 232 additions and 94 deletions

View File

@@ -11,7 +11,7 @@ use itertools::Itertools;
use serde::{Deserialize, Serialize};
use super::agg_req::{Aggregations, CollectorAggregations, CollectorBucketAggregation};
use super::bucket::generate_buckets;
use super::bucket::intermediate_buckets_to_final_buckets;
use super::intermediate_agg_result::{
IntermediateAggregationResults, IntermediateBucketResult, IntermediateHistogramBucketEntry,
IntermediateMetricResult, IntermediateRangeBucketEntry,
@@ -23,37 +23,52 @@ use super::Key;
/// The final aggregation result.
pub struct AggregationResults(pub HashMap<String, AggregationResult>);
impl From<(IntermediateAggregationResults, Aggregations)> for AggregationResults {
fn from(tree_and_req: (IntermediateAggregationResults, Aggregations)) -> Self {
let agg: CollectorAggregations = tree_and_req.1.into();
(tree_and_req.0, &agg).into()
impl AggregationResults {
/// Convert an intermediate result and its aggregation request to the final result
pub fn from_intermediate_and_req(
results: IntermediateAggregationResults,
agg: Aggregations,
) -> Self {
AggregationResults::from_intermediate_and_req_internal(results, &(agg.into()))
}
}
impl From<(IntermediateAggregationResults, &CollectorAggregations)> for AggregationResults {
fn from(data: (IntermediateAggregationResults, &CollectorAggregations)) -> Self {
let tree = data.0;
let req = data.1;
/// Convert an intermediate result and its aggregation request to the final result
///
/// Internal function; CollectorAggregations is used instead of Aggregations, since it is
/// optimized for internal processing
fn from_intermediate_and_req_internal(
results: IntermediateAggregationResults,
req: &CollectorAggregations,
) -> Self {
let mut result = HashMap::default();
// Important assumption:
// When the tree contains buckets/metrics, we expect it to have all buckets/metrics from the
// request
if let Some(buckets) = tree.buckets {
if let Some(buckets) = results.buckets {
result.extend(buckets.into_iter().zip(req.buckets.values()).map(
|((key, bucket), req)| (key, AggregationResult::BucketResult((bucket, req).into())),
|((key, bucket), req)| {
(
key,
AggregationResult::BucketResult(BucketResult::from_intermediate_and_req(
bucket, req,
)),
)
},
));
} else {
result.extend(req.buckets.iter().map(|(key, req)| {
let empty_bucket = IntermediateBucketResult::empty_from_req(&req.bucket_agg);
(
key.to_string(),
AggregationResult::BucketResult((empty_bucket, req).into()),
AggregationResult::BucketResult(BucketResult::from_intermediate_and_req(
empty_bucket,
req,
)),
)
}));
}
if let Some(metrics) = tree.metrics {
if let Some(metrics) = results.metrics {
result.extend(
metrics
.into_iter()
@@ -127,15 +142,18 @@ pub enum BucketResult {
},
}
impl From<(IntermediateBucketResult, &CollectorBucketAggregation)> for BucketResult {
fn from(result_and_req: (IntermediateBucketResult, &CollectorBucketAggregation)) -> Self {
let bucket_result = result_and_req.0;
let req = result_and_req.1;
impl BucketResult {
fn from_intermediate_and_req(
bucket_result: IntermediateBucketResult,
req: &CollectorBucketAggregation,
) -> Self {
match bucket_result {
IntermediateBucketResult::Range(range_map) => {
let mut buckets: Vec<RangeBucketEntry> = range_map
.into_iter()
.map(|(_, bucket)| (bucket, &req.sub_aggregation).into())
.map(|(_, bucket)| {
RangeBucketEntry::from_intermediate_and_req(bucket, &req.sub_aggregation)
})
.collect_vec();
buckets.sort_by(|a, b| {
@@ -147,59 +165,11 @@ impl From<(IntermediateBucketResult, &CollectorBucketAggregation)> for BucketRes
BucketResult::Range { buckets }
}
IntermediateBucketResult::Histogram { buckets } => {
let histogram_req = req.as_histogram();
let buckets = if histogram_req.min_doc_count() == 0 {
// With min_doc_count == 0, we may need to add buckets, so that there are no
// gaps, since intermediate result does not contain empty buckets (filtered to
// reduce serialization size).
let (min, max) = if buckets.is_empty() {
(f64::MAX, f64::MIN)
} else {
let min = buckets[0].key;
let max = buckets[buckets.len() - 1].key;
(min, max)
};
let fill_gaps_buckets = generate_buckets(histogram_req, min, max);
let sub_aggregation =
IntermediateAggregationResults::empty_from_req(&req.sub_aggregation);
buckets
.into_iter()
.merge_join_by(
fill_gaps_buckets.into_iter(),
|existing_bucket, fill_gaps_bucket| {
existing_bucket
.key
.partial_cmp(fill_gaps_bucket)
.unwrap_or(Ordering::Equal)
},
)
.map(|either| match either {
itertools::EitherOrBoth::Both(existing, _) => {
(existing, &req.sub_aggregation).into()
}
itertools::EitherOrBoth::Left(existing) => {
(existing, &req.sub_aggregation).into()
}
// Add missing bucket
itertools::EitherOrBoth::Right(bucket) => BucketEntry {
key: Key::F64(bucket),
doc_count: 0,
sub_aggregation: (sub_aggregation.clone(), &req.sub_aggregation)
.into(),
},
})
.collect_vec()
} else {
buckets
.into_iter()
.filter(|bucket| bucket.doc_count >= histogram_req.min_doc_count())
.map(|bucket| (bucket, &req.sub_aggregation).into())
.collect_vec()
};
let buckets = intermediate_buckets_to_final_buckets(
buckets,
req.as_histogram(),
&req.sub_aggregation,
);
BucketResult::Histogram { buckets }
}
@@ -244,14 +214,18 @@ pub struct BucketEntry {
pub sub_aggregation: AggregationResults,
}
impl From<(IntermediateHistogramBucketEntry, &CollectorAggregations)> for BucketEntry {
fn from(entry_and_req: (IntermediateHistogramBucketEntry, &CollectorAggregations)) -> Self {
let entry = entry_and_req.0;
let req = entry_and_req.1;
impl BucketEntry {
pub(crate) fn from_intermediate_and_req(
entry: IntermediateHistogramBucketEntry,
req: &CollectorAggregations,
) -> Self {
BucketEntry {
key: Key::F64(entry.key),
doc_count: entry.doc_count,
sub_aggregation: (entry.sub_aggregation, req).into(),
sub_aggregation: AggregationResults::from_intermediate_and_req_internal(
entry.sub_aggregation,
req,
),
}
}
}
@@ -303,14 +277,18 @@ pub struct RangeBucketEntry {
pub to: Option<f64>,
}
impl From<(IntermediateRangeBucketEntry, &CollectorAggregations)> for RangeBucketEntry {
fn from(entry_and_req: (IntermediateRangeBucketEntry, &CollectorAggregations)) -> Self {
let entry = entry_and_req.0;
let req = entry_and_req.1;
impl RangeBucketEntry {
fn from_intermediate_and_req(
entry: IntermediateRangeBucketEntry,
req: &CollectorAggregations,
) -> Self {
RangeBucketEntry {
key: entry.key,
doc_count: entry.doc_count,
sub_aggregation: (entry.sub_aggregation, req).into(),
sub_aggregation: AggregationResults::from_intermediate_and_req_internal(
entry.sub_aggregation,
req,
),
to: entry.to,
from: entry.from,
}

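The substance of this file's change: tuple-based `From` impls become named constructors. A minimal, self-contained sketch of the pattern, using hypothetical stand-in types rather than the tantivy ones:

```rust
struct Intermediate(u64);
struct Request {
    factor: u64,
}
struct Final(u64);

// Before: conversion via a tuple. Call sites read `(inter, &req).into()`,
// which hides what is being converted and which parameter is which.
impl From<(Intermediate, &Request)> for Final {
    fn from((inter, req): (Intermediate, &Request)) -> Self {
        Final(inter.0 * req.factor)
    }
}

impl Final {
    // After: the same logic behind a named constructor, mirroring
    // `AggregationResults::from_intermediate_and_req` above.
    fn from_intermediate_and_req(inter: Intermediate, req: &Request) -> Self {
        Final(inter.0 * req.factor)
    }
}

fn main() {
    let req = Request { factor: 2 };
    let a: Final = (Intermediate(21), &req).into();
    let b = Final::from_intermediate_and_req(Intermediate(21), &req);
    assert_eq!(a.0, b.0);
}
```

Besides reading better at call sites (see the `merge_fruits` change further down), the named form makes room for a private `_internal` variant that takes the preprocessed `&CollectorAggregations` instead of the user-facing `Aggregations`.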
View File

@@ -1,12 +1,18 @@
use std::cmp::Ordering;
use std::fmt::Display;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use crate::aggregation::agg_req::CollectorAggregations;
use crate::aggregation::agg_req_with_accessor::{
AggregationsWithAccessor, BucketAggregationWithAccessor,
};
use crate::aggregation::agg_result::BucketEntry;
use crate::aggregation::f64_from_fastfield_u64;
use crate::aggregation::intermediate_agg_result::IntermediateBucketResult;
use crate::aggregation::intermediate_agg_result::{
IntermediateAggregationResults, IntermediateBucketResult, IntermediateHistogramBucketEntry,
};
use crate::aggregation::segment_agg_result::{
SegmentAggregationResultsCollector, SegmentHistogramBucketEntry,
};
@@ -208,7 +214,7 @@ impl SegmentHistogramCollector {
let min = f64_from_fastfield_u64(accessor.min_value(), &field_type);
let max = f64_from_fastfield_u64(accessor.max_value(), &field_type);
let (min, max) = get_req_min_max(req, min, max);
let (min, max) = get_req_min_max(req, Some((min, max)));
// We compute and generate the buckets range (min, max) based on the request and the min
// max in the fast field, but this is likely not ideal when this is a subbucket, where many
@@ -231,8 +237,6 @@ impl SegmentHistogramCollector {
})
.collect();
let (min, _) = get_req_min_max(req, min, max);
let first_bucket_num =
get_bucket_num_f64(min, req.interval, req.offset.unwrap_or(0.0)) as i64;
@@ -245,7 +249,7 @@ impl SegmentHistogramCollector {
buckets,
field_type,
interval: req.interval,
offset: req.offset.unwrap_or(0f64),
offset: req.offset.unwrap_or(0.0),
first_bucket_num,
bounds,
sub_aggregations,
@@ -381,11 +385,94 @@ fn get_bucket_val(val: f64, interval: f64, offset: f64) -> f64 {
bucket_pos * interval + offset
}
fn get_req_min_max(req: &HistogramAggregation, mut min: f64, mut max: f64) -> (f64, f64) {
// Convert to BucketEntry and fill gaps
fn intermediate_buckets_to_final_buckets_fill_gaps(
buckets: Vec<IntermediateHistogramBucketEntry>,
histogram_req: &HistogramAggregation,
sub_aggregation: &CollectorAggregations,
) -> Vec<BucketEntry> {
// Generate the full list of buckets without gaps.
//
// The bounds are the min max from the current buckets, optionally extended by
// extended_bounds from the request
let min_max = if buckets.is_empty() {
None
} else {
let min = buckets[0].key;
let max = buckets[buckets.len() - 1].key;
Some((min, max))
};
let fill_gaps_buckets = generate_buckets_with_opt_minmax(histogram_req, min_max);
let empty_sub_aggregation = IntermediateAggregationResults::empty_from_req(&sub_aggregation);
// Use merge_join_by to fill in gaps, since buckets are sorted
let buckets = buckets
.into_iter()
.merge_join_by(
fill_gaps_buckets.into_iter(),
|existing_bucket, fill_gaps_bucket| {
existing_bucket
.key
.partial_cmp(fill_gaps_bucket)
.unwrap_or(Ordering::Equal)
},
)
.map(|either| match either {
// Ignore the generated bucket
itertools::EitherOrBoth::Both(existing, _) => existing,
itertools::EitherOrBoth::Left(existing) => existing,
// Add missing bucket
itertools::EitherOrBoth::Right(missing_bucket) => IntermediateHistogramBucketEntry {
key: missing_bucket,
doc_count: 0,
sub_aggregation: empty_sub_aggregation.clone(),
},
})
.map(|intermediate_bucket| {
BucketEntry::from_intermediate_and_req(intermediate_bucket, &sub_aggregation)
})
.collect_vec();
return buckets;
}
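For reference, a standalone sketch of the `merge_join_by` gap-filling idea above, with a simplified bucket type instead of `IntermediateHistogramBucketEntry` (names here are illustrative only). As the comment in the function notes, it relies on both inputs being sorted by key:

```rust
use std::cmp::Ordering;

use itertools::Itertools;

#[derive(Debug)]
struct Bucket {
    key: f64,
    doc_count: u64,
}

/// Merge the existing (sparse) buckets with a gap-free key grid in one pass.
fn fill_gaps(existing: Vec<Bucket>, full_grid: Vec<f64>) -> Vec<Bucket> {
    existing
        .into_iter()
        .merge_join_by(full_grid.into_iter(), |bucket, grid_key| {
            bucket.key.partial_cmp(grid_key).unwrap_or(Ordering::Equal)
        })
        .map(|either| match either {
            // Key exists on both sides: keep the real bucket.
            itertools::EitherOrBoth::Both(bucket, _) => bucket,
            // Bucket outside the generated grid: keep it as-is.
            itertools::EitherOrBoth::Left(bucket) => bucket,
            // Grid key with no matching bucket: synthesize an empty one.
            itertools::EitherOrBoth::Right(key) => Bucket { key, doc_count: 0 },
        })
        .collect_vec()
}

fn main() {
    let existing = vec![
        Bucket { key: 0.0, doc_count: 3 },
        Bucket { key: 20.0, doc_count: 1 },
    ];
    let filled = fill_gaps(existing, vec![0.0, 10.0, 20.0]);
    assert_eq!(filled.len(), 3);
    assert_eq!(filled[1].doc_count, 0); // the gap at key 10.0 was filled
}
```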
// Convert to BucketEntry
pub(crate) fn intermediate_buckets_to_final_buckets(
buckets: Vec<IntermediateHistogramBucketEntry>,
histogram_req: &HistogramAggregation,
sub_aggregation: &CollectorAggregations,
) -> Vec<BucketEntry> {
if histogram_req.min_doc_count() == 0 {
// With min_doc_count == 0, we may need to add buckets, so that there are no
// gaps, since intermediate result does not contain empty buckets (filtered to
// reduce serialization size).
let buckets = intermediate_buckets_to_final_buckets_fill_gaps(
buckets,
histogram_req,
sub_aggregation,
);
return buckets;
} else {
let buckets = buckets
.into_iter()
.filter(|bucket| bucket.doc_count >= histogram_req.min_doc_count())
.map(|bucket| BucketEntry::from_intermediate_and_req(bucket, &sub_aggregation))
.collect_vec();
return buckets;
};
}
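The `else` branch is simple enough to sketch standalone (again with a plain stand-in type): buckets below the threshold are dropped, and no gap filling is needed, since an empty bucket could never meet a nonzero `min_doc_count`.

```rust
#[derive(Debug, PartialEq)]
struct Entry {
    key: f64,
    doc_count: u64,
}

// Keep only buckets that meet the threshold (the min_doc_count > 0 path).
fn filter_min_doc_count(buckets: Vec<Entry>, min_doc_count: u64) -> Vec<Entry> {
    buckets
        .into_iter()
        .filter(|bucket| bucket.doc_count >= min_doc_count)
        .collect()
}

fn main() {
    let buckets = vec![
        Entry { key: 0.0, doc_count: 5 },
        Entry { key: 10.0, doc_count: 1 },
    ];
    let filtered = filter_min_doc_count(buckets, 2);
    assert_eq!(filtered, vec![Entry { key: 0.0, doc_count: 5 }]);
}
```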
/// Applies the request's extended_bounds/hard_bounds to the min_max value
///
/// May return (f64::MAX, f64::MIN) if there is no range.
fn get_req_min_max(req: &HistogramAggregation, min_max: Option<(f64, f64)>) -> (f64, f64) {
let (mut min, mut max) = min_max.unwrap_or((f64::MAX, f64::MIN));
if let Some(extended_bounds) = &req.extended_bounds {
min = min.min(extended_bounds.min);
max = max.max(extended_bounds.max);
}
if let Some(hard_bounds) = &req.hard_bounds {
min = min.max(hard_bounds.min);
max = max.min(hard_bounds.max);
@@ -394,9 +481,20 @@ fn get_req_min_max(req: &HistogramAggregation, mut min: f64, mut max: f64) -> (f
(min, max)
}
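A worked sketch of the bounds interplay, with `Bounds` as a hypothetical stand-in for `HistogramBounds` and the two `Option` parameters standing in for the request fields: `extended_bounds` can only widen the range, `hard_bounds` can only narrow it.

```rust
#[derive(Clone, Copy)]
struct Bounds {
    min: f64,
    max: f64,
}

// Mirrors get_req_min_max above, but takes the bounds as parameters so the
// example is self-contained.
fn req_min_max(
    extended_bounds: Option<Bounds>,
    hard_bounds: Option<Bounds>,
    min_max: Option<(f64, f64)>,
) -> (f64, f64) {
    let (mut min, mut max) = min_max.unwrap_or((f64::MAX, f64::MIN));
    if let Some(eb) = extended_bounds {
        // extended_bounds only widens.
        min = min.min(eb.min);
        max = max.max(eb.max);
    }
    if let Some(hb) = hard_bounds {
        // hard_bounds only narrows.
        min = min.max(hb.min);
        max = max.min(hb.max);
    }
    (min, max)
}

fn main() {
    // Data spans 3.0..7.0; extended_bounds pulls min down to 2.0;
    // hard_bounds then clips it back up to 2.5.
    let eb = Some(Bounds { min: 2.0, max: 5.0 });
    let hb = Some(Bounds { min: 2.5, max: 12.0 });
    assert_eq!(req_min_max(eb, hb, Some((3.0, 7.0))), (2.5, 7.0));
    // No data and no bounds: the documented (f64::MAX, f64::MIN) sentinel.
    assert_eq!(req_min_max(None, None, None), (f64::MAX, f64::MIN));
}
```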
/// Generates buckets with req.interval, for given min, max
/// Generates buckets with req.interval
/// Range is computed from the provided min_max and the request's extended_bounds/hard_bounds
pub(crate) fn generate_buckets(req: &HistogramAggregation, min: f64, max: f64) -> Vec<f64> {
let (min, max) = get_req_min_max(req, min, max);
generate_buckets_with_opt_minmax(req, Some((min, max)))
}
/// Generates buckets with req.interval
/// Range is computed from the provided min_max and the request's extended_bounds/hard_bounds
/// Returns an empty vec when there is no range to span
pub(crate) fn generate_buckets_with_opt_minmax(
req: &HistogramAggregation,
min_max: Option<(f64, f64)>,
) -> Vec<f64> {
let (min, max) = get_req_min_max(req, min_max);
let offset = req.offset.unwrap_or(0.0);
let first_bucket_num = get_bucket_num_f64(min, req.interval, offset) as i64;
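The hunk cuts off mid-function here, but the bucket-grid arithmetic can be sketched from what is visible. Two assumptions to flag: `get_bucket_num_f64` is treated as `floor((val - offset) / interval)`, consistent with the `bucket_pos * interval + offset` key formula shown earlier, and the empty-range guard is mine, standing in for however the real function handles the `(f64::MAX, f64::MIN)` sentinel.

```rust
fn get_bucket_num_f64(val: f64, interval: f64, offset: f64) -> f64 {
    ((val - offset) / interval).floor()
}

// Generate every bucket key from the bucket containing `min` through the
// bucket containing `max`, spaced by `interval`.
fn generate_grid(min: f64, max: f64, interval: f64, offset: f64) -> Vec<f64> {
    if min > max {
        return Vec::new(); // no range to span => empty vec, per the doc comment
    }
    let first = get_bucket_num_f64(min, interval, offset) as i64;
    let last = get_bucket_num_f64(max, interval, offset) as i64;
    (first..=last)
        .map(|pos| pos as f64 * interval + offset)
        .collect()
}

fn main() {
    // interval 10, offset 0: values 3.0..=25.0 fall into buckets 0, 10, 20.
    assert_eq!(generate_grid(3.0, 25.0, 10.0, 0.0), vec![0.0, 10.0, 20.0]);
    // The sentinel "no range" input produces no buckets.
    assert!(generate_grid(f64::MAX, f64::MIN, 10.0, 0.0).is_empty());
}
```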
@@ -1090,6 +1188,64 @@ mod tests {
assert_eq!(res["histogram"]["buckets"][10]["key"], 12.0);
assert_eq!(res["histogram"]["buckets"][10]["doc_count"], 0);
let agg_req: Aggregations = vec![(
"histogram".to_string(),
Aggregation::Bucket(BucketAggregation {
bucket_agg: BucketAggregationType::Histogram(HistogramAggregation {
field: "score_f64".to_string(),
interval: 1.0,
extended_bounds: Some(HistogramBounds { min: 2.0, max: 5.0 }),
hard_bounds: Some(HistogramBounds {
min: 2.0,
max: 12.0,
}),
..Default::default()
}),
sub_aggregation: Default::default(),
}),
)]
.into_iter()
.collect();
let res = exec_request(agg_req, &index)?;
assert_eq!(res["histogram"]["buckets"][0]["key"], 2.0);
assert_eq!(res["histogram"]["buckets"][0]["doc_count"], 0);
assert_eq!(res["histogram"]["buckets"][1]["key"], 3.0);
assert_eq!(res["histogram"]["buckets"][1]["doc_count"], 0);
assert_eq!(res["histogram"]["buckets"][2]["doc_count"], 0);
assert_eq!(res["histogram"]["buckets"][10], Value::Null);
// hard_bounds will not extend the result
let agg_req: Aggregations = vec![(
"histogram".to_string(),
Aggregation::Bucket(BucketAggregation {
bucket_agg: BucketAggregationType::Histogram(HistogramAggregation {
field: "score_f64".to_string(),
interval: 1.0,
hard_bounds: Some(HistogramBounds {
min: 2.0,
max: 12.0,
}),
..Default::default()
}),
sub_aggregation: Default::default(),
}),
)]
.into_iter()
.collect();
let res = exec_request(agg_req, &index)?;
assert_eq!(
res,
json!({
"histogram": {
"buckets": []
}
})
);
let agg_req: Aggregations = vec![
(
"stats".to_string(),

View File

@@ -86,7 +86,8 @@ impl Collector for AggregationCollector {
&self,
segment_fruits: Vec<<Self::Child as SegmentCollector>::Fruit>,
) -> crate::Result<Self::Fruit> {
merge_fruits(segment_fruits).map(|res| (res, self.agg.clone()).into())
merge_fruits(segment_fruits)
.map(|res| AggregationResults::from_intermediate_and_req(res, self.agg.clone()))
}
}

View File

@@ -456,7 +456,10 @@ mod tests {
let collector = DistributedAggregationCollector::from_aggs(agg_req.clone());
let searcher = reader.searcher();
(searcher.search(&term_query, &collector).unwrap(), agg_req).into()
AggregationResults::from_intermediate_and_req(
searcher.search(&term_query, &collector).unwrap(),
agg_req,
)
} else {
let collector = AggregationCollector::from_aggs(agg_req);
@@ -835,7 +838,7 @@ mod tests {
// Test de/serialization roundtrip on intermediate_agg_result
let res: IntermediateAggregationResults =
serde_json::from_str(&serde_json::to_string(&res).unwrap()).unwrap();
(res, agg_req.clone()).into()
AggregationResults::from_intermediate_and_req(res, agg_req.clone())
} else {
let collector = AggregationCollector::from_aggs(agg_req.clone());