diff --git a/src/aggregation/agg_result.rs b/src/aggregation/agg_result.rs
index 690f5e272..0c4fd09b9 100644
--- a/src/aggregation/agg_result.rs
+++ b/src/aggregation/agg_result.rs
@@ -12,8 +12,8 @@ use serde::{Deserialize, Serialize};
 
 use super::bucket::generate_buckets;
 use super::intermediate_agg_result::{
-    IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult,
-    IntermediateHistogramBucketEntry, IntermediateMetricResult, IntermediateRangeBucketEntry,
+    IntermediateAggregationResults, IntermediateBucketResult, IntermediateHistogramBucketEntry,
+    IntermediateMetricResult, IntermediateRangeBucketEntry,
 };
 use super::metric::{SingleMetricResult, Stats};
 use super::Key;
@@ -25,9 +25,16 @@ pub struct AggregationResults(pub HashMap<String, AggregationResult>);
 
 impl From<IntermediateAggregationResults> for AggregationResults {
     fn from(tree: IntermediateAggregationResults) -> Self {
         Self(
-            tree.0
+            tree.buckets
+                .unwrap_or_default()
                 .into_iter()
-                .map(|(key, agg)| (key, agg.into()))
+                .map(|(key, bucket)| (key, AggregationResult::BucketResult(bucket.into())))
+                .chain(
+                    tree.metrics
+                        .unwrap_or_default()
+                        .into_iter()
+                        .map(|(key, metric)| (key, AggregationResult::MetricResult(metric.into()))),
+                )
                 .collect(),
         )
     }
 }
@@ -42,18 +49,6 @@ pub enum AggregationResult {
     /// Metric result variant.
     MetricResult(MetricResult),
 }
-impl From<IntermediateAggregationResult> for AggregationResult {
-    fn from(tree: IntermediateAggregationResult) -> Self {
-        match tree {
-            IntermediateAggregationResult::Bucket(bucket) => {
-                AggregationResult::BucketResult(bucket.into())
-            }
-            IntermediateAggregationResult::Metric(metric) => {
-                AggregationResult::MetricResult(metric.into())
-            }
-        }
-    }
-}
 
 #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
 #[serde(untagged)]
diff --git a/src/aggregation/bucket/histogram/histogram.rs b/src/aggregation/bucket/histogram/histogram.rs
index 387215674..f5f8ab9dd 100644
--- a/src/aggregation/bucket/histogram/histogram.rs
+++ b/src/aggregation/bucket/histogram/histogram.rs
@@ -141,30 +141,26 @@ impl SegmentHistogramCollector {
                 .filter(|bucket| bucket.doc_count != 0)
                 .count(),
         );
+
+        // Below we remove empty buckets for two reasons
+        // 1. To reduce the size of the intermediate result, which may be passed on the wire.
+        // 2. To mimic elasticsearch, there are no empty buckets at the start and end.
+        //
+        // Empty buckets may be added later again in the final result, depending on the request.
         if let Some(sub_aggregations) = self.sub_aggregations {
             buckets.extend(
                 self.buckets
                     .into_iter()
                     .zip(sub_aggregations.into_iter())
-                    // Here we remove the empty buckets for two reasons
-                    // 1. To reduce the size of the intermediate result, which may be passed on the wire.
-                    // 2. To mimic elasticsearch, there are no empty buckets at the start and end.
-                    //
-                    // Empty buckets may be added later again in the final result, depending on the request.
                     .filter(|(bucket, _sub_aggregation)| bucket.doc_count != 0)
-                    .map(|(bucket, sub_aggregation)| (bucket, Some(sub_aggregation)).into()),
+                    .map(|(bucket, sub_aggregation)| (bucket, sub_aggregation).into()),
             )
         } else {
             buckets.extend(
                 self.buckets
                     .into_iter()
-                    // Here we remove the empty buckets for two reasons
-                    // 1. To reduce the size of the intermediate result, which may be passed on the wire.
-                    // 2. To mimic elasticsearch, there are no empty buckets at the start and end.
-                    //
-                    // Empty buckets may be added later again in the final result, depending on the request.
                    .filter(|bucket| bucket.doc_count != 0)
-                    .map(|bucket| (bucket, None).into()),
+                    .map(|bucket| bucket.into()),
             );
         };
 
diff --git a/src/aggregation/intermediate_agg_result.rs b/src/aggregation/intermediate_agg_result.rs
index d8b76c062..28b240d22 100644
--- a/src/aggregation/intermediate_agg_result.rs
+++ b/src/aggregation/intermediate_agg_result.rs
@@ -19,18 +19,26 @@ use super::{Key, MergeFruits, SerializedKey, VecWithNames};
 /// Contains the intermediate aggregation result, which is optimized to be merged with other
 /// intermediate results.
 #[derive(Default, Clone, Debug, PartialEq, Serialize, Deserialize)]
-pub struct IntermediateAggregationResults(pub(crate) VecWithNames<IntermediateAggregationResult>);
+pub struct IntermediateAggregationResults {
+    pub(crate) metrics: Option<VecWithNames<IntermediateMetricResult>>,
+    pub(crate) buckets: Option<VecWithNames<IntermediateBucketResult>>,
+}
 
 impl From<SegmentAggregationResultsCollector> for IntermediateAggregationResults {
     fn from(tree: SegmentAggregationResultsCollector) -> Self {
-        let mut data = vec![];
-        for (key, bucket) in tree.buckets.into_iter() {
-            data.push((key, IntermediateAggregationResult::Bucket(bucket.into())));
-        }
-        for (key, metric) in tree.metrics.into_iter() {
-            data.push((key, IntermediateAggregationResult::Metric(metric.into())));
-        }
-        Self(VecWithNames::from_entries(data))
+        let metrics = if tree.metrics.is_empty() {
+            None
+        } else {
+            Some(VecWithNames::from_other(tree.metrics))
+        };
+
+        let buckets = if tree.buckets.is_empty() {
+            None
+        } else {
+            Some(VecWithNames::from_other(tree.buckets))
+        };
+
+        Self { metrics, buckets }
     }
 }
 
@@ -40,8 +48,18 @@ impl IntermediateAggregationResults {
     /// The order of the values need to be the same on both results. This is ensured when the same
     /// (key values) are present on the underlying VecWithNames struct.
     pub fn merge_fruits(&mut self, other: &IntermediateAggregationResults) {
-        for (tree_left, tree_right) in self.0.values_mut().zip(other.0.values()) {
-            tree_left.merge_fruits(tree_right);
+        if let (Some(buckets_left), Some(buckets_right)) = (&mut self.buckets, &other.buckets) {
+            for (bucket_left, bucket_right) in buckets_left.values_mut().zip(buckets_right.values())
+            {
+                bucket_left.merge_fruits(bucket_right);
+            }
+        }
+
+        if let (Some(metrics_left), Some(metrics_right)) = (&mut self.metrics, &other.metrics) {
+            for (metric_left, metric_right) in metrics_left.values_mut().zip(metrics_right.values())
+            {
+                metric_left.merge_fruits(metric_right);
+            }
         }
     }
 }
@@ -55,28 +73,6 @@ pub enum IntermediateAggregationResult {
     Metric(IntermediateMetricResult),
 }
 
-impl IntermediateAggregationResult {
-    fn merge_fruits(&mut self, other: &IntermediateAggregationResult) {
-        match (self, other) {
-            (
-                IntermediateAggregationResult::Bucket(res_left),
-                IntermediateAggregationResult::Bucket(res_right),
-            ) => {
-                res_left.merge_fruits(res_right);
-            }
-            (
-                IntermediateAggregationResult::Metric(res_left),
-                IntermediateAggregationResult::Metric(res_right),
-            ) => {
-                res_left.merge_fruits(res_right);
-            }
-            _ => {
-                panic!("incompatible types in aggregation tree on merge fruits");
-            }
-        }
-    }
-}
-
 /// Holds the intermediate data for metric results
 #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
 pub enum IntermediateMetricResult {
@@ -196,13 +192,6 @@ impl IntermediateBucketResult {
     }
 }
 
-// fn merge_sorted_vecs(entries_left: &mut Vec<IntermediateRangeBucketEntry>, entries_right: &Vec<IntermediateRangeBucketEntry>) {
-// for el in entries_left
-//.iter_mut()
-//.merge_join_by(entries_right.iter(), |left, right| left.key.cmp(right.key))
-//{}
-//}
-
 fn merge_maps(
     entries_left: &mut FnvHashMap<SerializedKey, IntermediateRangeBucketEntry>,
     entries_right: &FnvHashMap<SerializedKey, IntermediateRangeBucketEntry>,
@@ -232,25 +221,32 @@ pub struct IntermediateHistogramBucketEntry {
     pub sub_aggregation: IntermediateAggregationResults,
 }
 
+impl From<SegmentHistogramBucketEntry> for IntermediateHistogramBucketEntry {
+    fn from(entry: SegmentHistogramBucketEntry) -> Self {
+        IntermediateHistogramBucketEntry {
+            key: entry.key,
+            doc_count: entry.doc_count,
+            sub_aggregation: Default::default(),
+        }
+    }
+}
+
 impl
     From<(
         SegmentHistogramBucketEntry,
-        Option<SegmentAggregationResultsCollector>,
+        SegmentAggregationResultsCollector,
     )> for IntermediateHistogramBucketEntry
 {
     fn from(
         entry: (
             SegmentHistogramBucketEntry,
-            Option<SegmentAggregationResultsCollector>,
+            SegmentAggregationResultsCollector,
         ),
     ) -> Self {
         IntermediateHistogramBucketEntry {
             key: entry.0.key,
             doc_count: entry.0.doc_count,
-            sub_aggregation: entry
-                .1
-                .map(|sub_aggregations| sub_aggregations.into())
-                .unwrap_or_default(),
+            sub_aggregation: entry.1.into(),
         }
     }
 }
@@ -333,9 +329,12 @@ mod tests {
         }
         map.insert(
             "my_agg_level2".to_string(),
-            IntermediateAggregationResult::Bucket(IntermediateBucketResult::Range(buckets)),
+            IntermediateBucketResult::Range(buckets),
         );
-        IntermediateAggregationResults(VecWithNames::from_entries(map.into_iter().collect()))
+        IntermediateAggregationResults {
+            buckets: Some(VecWithNames::from_entries(map.into_iter().collect())),
+            metrics: Default::default(),
+        }
     }
 
     fn get_test_tree(data: &[(String, u64, String, u64)]) -> IntermediateAggregationResults {
@@ -359,9 +358,12 @@ mod tests {
         }
         map.insert(
             "my_agg_level1".to_string(),
-            IntermediateAggregationResult::Bucket(IntermediateBucketResult::Range(buckets)),
+            IntermediateBucketResult::Range(buckets),
         );
-        IntermediateAggregationResults(VecWithNames::from_entries(map.into_iter().collect()))
+        IntermediateAggregationResults {
+            buckets: Some(VecWithNames::from_entries(map.into_iter().collect())),
+            metrics: Default::default(),
+        }
     }
 
     #[test]
diff --git a/src/aggregation/mod.rs b/src/aggregation/mod.rs
index 7bff2c79e..ca54ea294 100644
--- a/src/aggregation/mod.rs
+++ b/src/aggregation/mod.rs
@@ -176,6 +176,7 @@ pub(crate) struct VecWithNames<T> {
     values: Vec<T>,
     keys: Vec<String>,
 }
+
 impl<T> Default for VecWithNames<T> {
     fn default() -> Self {
         Self {
@@ -198,6 +199,15 @@ impl<T> From<HashMap<String, T>> for VecWithNames<T> {
 }
 
 impl<T> VecWithNames<T> {
+    fn from_other<K: Into<T>>(entries: VecWithNames<K>) -> Self {
+        let mut values = Vec::with_capacity(entries.len());
+        values.extend(entries.values.into_iter().map(Into::into));
+        Self {
+            keys: entries.keys,
+            values,
+        }
+    }
+
     fn from_entries(mut entries: Vec<(String, T)>) -> Self {
         // Sort to ensure order of elements match across multiple instances
         entries.sort_by(|left, right| left.0.cmp(&right.0));
@@ -230,6 +240,9 @@ impl<T> VecWithNames<T> {
     fn entries(&self) -> impl Iterator<Item = (&str, &T)> + '_ {
         self.keys().zip(self.values.iter())
     }
+    fn len(&self) -> usize {
+        self.values.len()
+    }
     fn is_empty(&self) -> bool {
         self.keys.is_empty()
     }
@@ -384,7 +397,7 @@ mod tests {
         merge_segments: bool,
         use_distributed_collector: bool,
     ) -> crate::Result<()> {
-        let index = get_test_index_with_num_docs(merge_segments, 300)?;
+        let index = get_test_index_with_num_docs(merge_segments, 80)?;
 
         let reader = index.reader()?;
         let text_field = reader.searcher().schema().get_field("text").unwrap();
         let query = TermQuery::new(
             Term::from_field_text(text_field, "cool"),
             IndexRecordOption::Basic,
         );
-        assert_eq!(DOC_BLOCK_SIZE, 256);
+        assert_eq!(DOC_BLOCK_SIZE, 64);
         // In the tree we cache Documents of DOC_BLOCK_SIZE, before passing them down as one block.
         //
         // Build a request so that on the first level we have one full cache, which is then flushed.
-        // The same cache should have some residue docs at the end, which are flushed (Range 0-266)
-        // -> 266 docs
+        // The same cache should have some residue docs at the end, which are flushed (Range 0-70)
+        // -> 70 docs
         //
         // The second level should also have some residue docs in the cache that are flushed at the
         // end.
@@ -412,13 +425,13 @@
             "bucketsL1": {
                 "range": {
                     "field": "score",
-                    "ranges": [ { "to": 3.0f64 }, { "from": 3.0f64, "to": 266.0f64 }, { "from": 266.0f64 } ]
+                    "ranges": [ { "to": 3.0f64 }, { "from": 3.0f64, "to": 70.0f64 }, { "from": 70.0f64 } ]
                 },
                 "aggs": {
                     "bucketsL2": {
                         "range": {
                             "field": "score",
-                            "ranges": [ { "to": 100.0f64 }, { "from": 100.0f64, "to": 266.0f64 }, { "from": 266.0f64 } ]
+                            "ranges": [ { "to": 30.0f64 }, { "from": 30.0f64, "to": 70.0f64 }, { "from": 70.0f64 } ]
                         }
                     }
                 }
@@ -426,14 +439,14 @@
             "histogram_test":{
                 "histogram": {
                     "field": "score",
-                    "interval": 263.0,
+                    "interval": 70.0,
                     "offset": 3.0,
                 },
                 "aggs": {
                     "bucketsL2": {
                         "histogram": {
                             "field": "score",
-                            "interval": 263.0
+                            "interval": 70.0
                         }
                     }
                 }
@@ -463,15 +476,15 @@
             res["bucketsL1"]["buckets"][0]["bucketsL2"]["buckets"][0]["doc_count"],
             3
         );
-        assert_eq!(res["bucketsL1"]["buckets"][1]["key"], "3-266");
-        assert_eq!(res["bucketsL1"]["buckets"][1]["doc_count"], 266 - 3);
+        assert_eq!(res["bucketsL1"]["buckets"][1]["key"], "3-70");
+        assert_eq!(res["bucketsL1"]["buckets"][1]["doc_count"], 70 - 3);
         assert_eq!(
             res["bucketsL1"]["buckets"][1]["bucketsL2"]["buckets"][0]["doc_count"],
-            97
+            27
         );
         assert_eq!(
             res["bucketsL1"]["buckets"][1]["bucketsL2"]["buckets"][1]["doc_count"],
-            166
+            40
         );
         assert_eq!(
             res["bucketsL1"]["buckets"][1]["bucketsL2"]["buckets"][2]["doc_count"],
@@ -479,9 +492,9 @@
         );
         assert_eq!(
             res["bucketsL1"]["buckets"][2]["bucketsL2"]["buckets"][2]["doc_count"],
-            300 - 266
+            80 - 70
         );
-        assert_eq!(res["bucketsL1"]["buckets"][2]["doc_count"], 300 - 266);
+        assert_eq!(res["bucketsL1"]["buckets"][2]["doc_count"], 80 - 70);
 
         Ok(())
     }
diff --git a/src/aggregation/segment_agg_result.rs b/src/aggregation/segment_agg_result.rs
index 0cb7ebe2d..2f7884373 100644
--- a/src/aggregation/segment_agg_result.rs
+++ b/src/aggregation/segment_agg_result.rs
@@ -17,7 +17,7 @@ use super::{Key, VecWithNames};
 use crate::aggregation::agg_req::BucketAggregationType;
 use crate::DocId;
 
-pub(crate) const DOC_BLOCK_SIZE: usize = 256;
+pub(crate) const DOC_BLOCK_SIZE: usize = 64;
 pub(crate) type DocBlock = [DocId; DOC_BLOCK_SIZE];
 
 #[derive(Clone, PartialEq)]
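The diff above replaces the type-erased IntermediateAggregationResult enum with two typed, optional maps (buckets and metrics) and merges them positionally. For orientation, the sketch below illustrates the invariant that positional merge relies on, using a simplified stand-in for VecWithNames rather than the crate's actual type: from_entries sorts entries by key, so two results built for the same aggregation request line up index by index and can be zipped directly, and from_other converts values while preserving key order. The u64 payloads are a stand-in for real bucket or metric results.

// Simplified stand-in for tantivy's VecWithNames (a sketch, not the crate's code).
#[derive(Debug, Default, Clone, PartialEq)]
struct VecWithNames<T> {
    keys: Vec<String>,
    values: Vec<T>,
}

impl<T> VecWithNames<T> {
    // Sorting on construction makes the element order deterministic, so results
    // built from the same aggregation request agree index by index.
    fn from_entries(mut entries: Vec<(String, T)>) -> Self {
        entries.sort_by(|left, right| left.0.cmp(&right.0));
        let (keys, values) = entries.into_iter().unzip();
        Self { keys, values }
    }

    // Keyed conversion in the spirit of from_other: keys keep their order,
    // values are converted element-wise.
    fn from_other<K: Into<T>>(entries: VecWithNames<K>) -> Self {
        Self {
            keys: entries.keys,
            values: entries.values.into_iter().map(Into::into).collect(),
        }
    }

    fn values(&self) -> impl Iterator<Item = &T> + '_ {
        self.values.iter()
    }

    fn values_mut(&mut self) -> impl Iterator<Item = &mut T> + '_ {
        self.values.iter_mut()
    }
}

fn main() {
    // Two per-segment results for the same request, inserted in different orders.
    let mut left = VecWithNames::from_entries(vec![
        ("histogram_test".to_string(), 10u64),
        ("bucketsL1".to_string(), 3),
    ]);
    let right = VecWithNames::from_entries(vec![
        ("bucketsL1".to_string(), 4),
        ("histogram_test".to_string(), 20),
    ]);

    // Positional merge as in merge_fruits: sound only because both sides were
    // sorted by key on construction.
    for (left_value, right_value) in left.values_mut().zip(right.values()) {
        *left_value += *right_value;
    }
    assert_eq!(left.values, vec![7, 30]); // "bucketsL1", then "histogram_test"

    // from_other widens the value type while keeping the key order intact.
    let counts_u32 = VecWithNames::from_entries(vec![("bucketsL1".to_string(), 5u32)]);
    let counts_u64: VecWithNames<u64> = VecWithNames::from_other(counts_u32);
    assert_eq!(counts_u64.values, vec![5u64]);
}

Because both sides are sorted identically at construction time, merge_fruits needs no key lookups at merge time; a plain zip suffices, which is what the new bucket and metric loops depend on.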