handle multiple agg results (#2035)

handle multiple intermediate aggregation results with the same name.
This commit is contained in:
PSeitz
2023-05-10 21:00:38 +08:00
committed by GitHub
parent d1988be8e9
commit ba3a885a3b
7 changed files with 24 additions and 24 deletions

View File

@@ -226,7 +226,7 @@ impl SegmentAggregationCollector for SegmentHistogramCollector {
let agg_with_accessor = &agg_with_accessor.aggs.values[self.accessor_idx];
let bucket = self.into_intermediate_bucket_result(agg_with_accessor)?;
results.push(name, IntermediateAggregationResult::Bucket(bucket));
results.push(name, IntermediateAggregationResult::Bucket(bucket))?;
Ok(())
}

View File

@@ -204,7 +204,7 @@ impl SegmentAggregationCollector for SegmentRangeCollector {
column_type: Some(self.column_type),
});
results.push(name, IntermediateAggregationResult::Bucket(bucket));
results.push(name, IntermediateAggregationResult::Bucket(bucket))?;
Ok(())
}

View File

@@ -251,7 +251,7 @@ impl SegmentAggregationCollector for SegmentTermCollector {
let agg_with_accessor = &agg_with_accessor.aggs.values[self.accessor_idx];
let bucket = self.into_intermediate_bucket_result(agg_with_accessor)?;
results.push(name, IntermediateAggregationResult::Bucket(bucket));
results.push(name, IntermediateAggregationResult::Bucket(bucket))?;
Ok(())
}

View File

@@ -3,6 +3,7 @@
//! indices.
use std::cmp::Ordering;
use std::collections::hash_map::Entry;
use std::hash::Hash;
use columnar::ColumnType;
@@ -22,7 +23,7 @@ use super::metric::{
IntermediateSum, PercentilesCollector,
};
use super::segment_agg_result::AggregationLimits;
use super::{format_date, AggregationError, Key, SerializedKey, VecWithNames};
use super::{format_date, AggregationError, Key, SerializedKey};
use crate::aggregation::agg_result::{AggregationResults, BucketEntries, BucketEntry};
use crate::aggregation::bucket::TermsAggregationInternal;
use crate::TantivyError;
@@ -33,7 +34,7 @@ use crate::TantivyError;
/// Notice: This struct should not be de/serialized via JSON format.
#[derive(Default, Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct IntermediateAggregationResults {
pub(crate) aggs_res: VecWithNames<IntermediateAggregationResult>,
pub(crate) aggs_res: FxHashMap<String, IntermediateAggregationResult>,
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq)]
@@ -77,8 +78,18 @@ impl std::hash::Hash for IntermediateKey {
impl IntermediateAggregationResults {
/// Add a result
pub fn push(&mut self, key: String, value: IntermediateAggregationResult) {
self.aggs_res.push(key, value);
pub fn push(&mut self, key: String, value: IntermediateAggregationResult) -> crate::Result<()> {
let entry = self.aggs_res.entry(key);
match entry {
Entry::Occupied(mut e) => {
// In case of term aggregation over different types, we need to merge the results.
e.get_mut().merge_fruits(value)?;
}
Entry::Vacant(e) => {
e.insert(value);
}
}
Ok(())
}
/// Convert intermediate result and its aggregation request to the final result.
@@ -128,10 +139,10 @@ impl IntermediateAggregationResults {
}
pub(crate) fn empty_from_req(req: &Aggregations) -> Self {
let mut aggs_res: VecWithNames<IntermediateAggregationResult> = VecWithNames::default();
let mut aggs_res: FxHashMap<String, IntermediateAggregationResult> = FxHashMap::default();
for (key, req) in req.iter() {
let empty_res = empty_from_req(req);
aggs_res.push(key.to_string(), empty_res);
aggs_res.insert(key.to_string(), empty_res);
}
Self { aggs_res }
@@ -765,7 +776,7 @@ mod tests {
)),
);
IntermediateAggregationResults {
aggs_res: VecWithNames::from_entries(map.into_iter().collect()),
aggs_res: map.into_iter().collect(),
}
}
@@ -799,7 +810,7 @@ mod tests {
)),
);
IntermediateAggregationResults {
aggs_res: VecWithNames::from_entries(map.into_iter().collect()),
aggs_res: map.into_iter().collect(),
}
}

View File

@@ -265,7 +265,7 @@ impl SegmentAggregationCollector for SegmentPercentilesCollector {
results.push(
name,
IntermediateAggregationResult::Metric(intermediate_metric_result),
);
)?;
Ok(())
}

View File

@@ -222,7 +222,7 @@ impl SegmentAggregationCollector for SegmentStatsCollector {
results.push(
name,
IntermediateAggregationResult::Metric(intermediate_metric_result),
);
)?;
Ok(())
}

View File

@@ -193,11 +193,6 @@ impl<T> From<HashMap<String, T>> for VecWithNames<T> {
}
impl<T> VecWithNames<T> {
fn push(&mut self, key: String, value: T) {
self.keys.push(key);
self.values.push(value);
}
fn from_entries(mut entries: Vec<(String, T)>) -> Self {
// Sort to ensure order of elements match across multiple instances
entries.sort_by(|left, right| left.0.cmp(&right.0));
@@ -212,18 +207,12 @@ impl<T> VecWithNames<T> {
keys: data_names,
}
}
fn into_iter(self) -> impl Iterator<Item = (String, T)> {
self.keys.into_iter().zip(self.values.into_iter())
}
fn iter(&self) -> impl Iterator<Item = (&str, &T)> + '_ {
self.keys().zip(self.values.iter())
}
fn keys(&self) -> impl Iterator<Item = &str> + '_ {
self.keys.iter().map(|key| key.as_str())
}
fn into_values(self) -> impl Iterator<Item = T> {
self.values.into_iter()
}
fn values_mut(&mut self) -> impl Iterator<Item = &mut T> + '_ {
self.values.iter_mut()
}