diff --git a/src/aggregation/bucket/histogram/histogram.rs b/src/aggregation/bucket/histogram/histogram.rs index f5f8ab9dd..1ff49b49a 100644 --- a/src/aggregation/bucket/histogram/histogram.rs +++ b/src/aggregation/bucket/histogram/histogram.rs @@ -208,7 +208,7 @@ impl SegmentHistogramCollector { let first_bucket_num = get_bucket_num_f64(min, req.interval, req.offset.unwrap_or(0.0)) as i64; - let bounds = req.hard_bounds.clone().unwrap_or(HistogramBounds { + let bounds = req.hard_bounds.unwrap_or(HistogramBounds { min: f64::MIN, max: f64::MAX, }); @@ -319,7 +319,7 @@ impl SegmentHistogramCollector { get_bucket_val(val, self.req.interval, self.offset) as f64 ); - self.increment_bucket(bucket_pos, doc, &bucket_with_accessor); + self.increment_bucket(bucket_pos, doc, bucket_with_accessor); } } diff --git a/src/aggregation/collector.rs b/src/aggregation/collector.rs index ec581d6a7..4b109bddc 100644 --- a/src/aggregation/collector.rs +++ b/src/aggregation/collector.rs @@ -75,13 +75,7 @@ impl Collector for AggregationCollector { _segment_local_id: crate::SegmentOrdinal, reader: &crate::SegmentReader, ) -> crate::Result { - let aggs_with_accessor = get_aggs_with_accessor_and_validate(&self.agg, reader)?; - let result = - SegmentAggregationResultsCollector::from_req_and_validate(&aggs_with_accessor)?; - Ok(AggregationSegmentCollector { - aggs: aggs_with_accessor, - result, - }) + AggregationSegmentCollector::from_agg_req_and_reader(&self.agg, reader) } fn requires_scoring(&self) -> bool { @@ -101,7 +95,7 @@ fn merge_fruits( ) -> crate::Result { if let Some(mut fruit) = segment_fruits.pop() { for next_fruit in segment_fruits { - fruit.merge_fruits(&next_fruit); + fruit.merge_fruits(next_fruit); } Ok(fruit) } else { @@ -128,7 +122,7 @@ impl AggregationSegmentCollector { let result = SegmentAggregationResultsCollector::from_req_and_validate(&aggs_with_accessor)?; Ok(AggregationSegmentCollector { - aggs: aggs_with_accessor, + aggs: aggs_with_accessor.into(), result, }) } diff --git a/src/aggregation/intermediate_agg_result.rs b/src/aggregation/intermediate_agg_result.rs index 68c6567c1..a5d36d2e9 100644 --- a/src/aggregation/intermediate_agg_result.rs +++ b/src/aggregation/intermediate_agg_result.rs @@ -14,7 +14,7 @@ use super::segment_agg_result::{ SegmentAggregationResultsCollector, SegmentBucketResultCollector, SegmentHistogramBucketEntry, SegmentMetricResultCollector, SegmentRangeBucketEntry, }; -use super::{Key, MergeFruits, SerializedKey, VecWithNames}; +use super::{Key, SerializedKey, VecWithNames}; /// Contains the intermediate aggregation result, which is optimized to be merged with other /// intermediate results. @@ -26,12 +26,8 @@ pub struct IntermediateAggregationResults { impl From for IntermediateAggregationResults { fn from(tree: SegmentAggregationResultsCollector) -> Self { - let metrics = tree - .metrics - .map(|metrics| VecWithNames::from_other(metrics)); - let buckets = tree - .buckets - .map(|buckets| VecWithNames::from_other(buckets)); + let metrics = tree.metrics.map(VecWithNames::from_other); + let buckets = tree.buckets.map(VecWithNames::from_other); Self { metrics, buckets } } @@ -42,16 +38,18 @@ impl IntermediateAggregationResults { /// /// The order of the values need to be the same on both results. This is ensured when the same /// (key values) are present on the underlying VecWithNames struct. - pub fn merge_fruits(&mut self, other: &IntermediateAggregationResults) { - if let (Some(buckets_left), Some(buckets_right)) = (&mut self.buckets, &other.buckets) { - for (bucket_left, bucket_right) in buckets_left.values_mut().zip(buckets_right.values()) + pub fn merge_fruits(&mut self, other: IntermediateAggregationResults) { + if let (Some(buckets_left), Some(buckets_right)) = (&mut self.buckets, other.buckets) { + for (bucket_left, bucket_right) in + buckets_left.values_mut().zip(buckets_right.into_values()) { bucket_left.merge_fruits(bucket_right); } } - if let (Some(metrics_left), Some(metrics_right)) = (&mut self.metrics, &other.metrics) { - for (metric_left, metric_right) in metrics_left.values_mut().zip(metrics_right.values()) + if let (Some(metrics_left), Some(metrics_right)) = (&mut self.metrics, other.metrics) { + for (metric_left, metric_right) in + metrics_left.values_mut().zip(metrics_right.into_values()) { metric_left.merge_fruits(metric_right); } @@ -91,7 +89,7 @@ impl From for IntermediateMetricResult { } impl IntermediateMetricResult { - fn merge_fruits(&mut self, other: &IntermediateMetricResult) { + fn merge_fruits(&mut self, other: IntermediateMetricResult) { match (self, other) { ( IntermediateMetricResult::Average(avg_data_left), @@ -106,7 +104,7 @@ impl IntermediateMetricResult { stats_left.merge_fruits(stats_right); } _ => { - panic!("incompatible fruit types in tree {:?}", other); + panic!("incompatible fruit types in tree"); } } } @@ -142,7 +140,7 @@ impl From for IntermediateBucketResult { } impl IntermediateBucketResult { - fn merge_fruits(&mut self, other: &IntermediateBucketResult) { + fn merge_fruits(&mut self, other: IntermediateBucketResult) { match (self, other) { ( IntermediateBucketResult::Range(entries_left), @@ -162,7 +160,7 @@ impl IntermediateBucketResult { ) => { let mut buckets = entries_left .drain(..) - .merge_join_by(entries_right.iter(), |left, right| { + .merge_join_by(entries_right.into_iter(), |left, right| { left.key.partial_cmp(&right.key).unwrap_or(Ordering::Equal) }) .map(|either| match either { @@ -171,7 +169,7 @@ impl IntermediateBucketResult { left } itertools::EitherOrBoth::Left(left) => left, - itertools::EitherOrBoth::Right(right) => right.clone(), + itertools::EitherOrBoth::Right(right) => right, }) .collect(); @@ -187,20 +185,22 @@ impl IntermediateBucketResult { } } +trait MergeFruits { + fn merge_fruits(&mut self, other: Self); +} + fn merge_maps( entries_left: &mut FnvHashMap, - entries_right: &FnvHashMap, + mut entries_right: FnvHashMap, ) { for (name, entry_left) in entries_left.iter_mut() { - if let Some(entry_right) = entries_right.get(name) { + if let Some(entry_right) = entries_right.remove(name) { entry_left.merge_fruits(entry_right); } } - for (key, res) in entries_right.iter() { - if !entries_left.contains_key(key) { - entries_left.insert(key.clone(), res.clone()); - } + for (key, res) in entries_right.into_iter() { + entries_left.entry(key).or_insert(res); } } @@ -285,16 +285,16 @@ impl From for IntermediateRangeBucketEntry { } impl MergeFruits for IntermediateRangeBucketEntry { - fn merge_fruits(&mut self, other: &IntermediateRangeBucketEntry) { + fn merge_fruits(&mut self, other: IntermediateRangeBucketEntry) { self.doc_count += other.doc_count; - self.sub_aggregation.merge_fruits(&other.sub_aggregation); + self.sub_aggregation.merge_fruits(other.sub_aggregation); } } impl MergeFruits for IntermediateHistogramBucketEntry { - fn merge_fruits(&mut self, other: &IntermediateHistogramBucketEntry) { + fn merge_fruits(&mut self, other: IntermediateHistogramBucketEntry) { self.doc_count += other.doc_count; - self.sub_aggregation.merge_fruits(&other.sub_aggregation); + self.sub_aggregation.merge_fruits(other.sub_aggregation); } } @@ -372,7 +372,7 @@ mod tests { ("blue".to_string(), 25, "1900".to_string(), 50), ]); - tree_left.merge_fruits(&tree_right); + tree_left.merge_fruits(tree_right); let tree_expected = get_test_tree(&[ ("red".to_string(), 110, "1900".to_string(), 55), @@ -393,7 +393,7 @@ mod tests { ("green".to_string(), 25, "1900".to_string(), 50), ]); - tree_left.merge_fruits(&tree_right); + tree_left.merge_fruits(tree_right); let tree_expected = get_test_tree(&[ ("red".to_string(), 110, "1900".to_string(), 55), diff --git a/src/aggregation/metric/average.rs b/src/aggregation/metric/average.rs index e582947eb..7a12c7ef9 100644 --- a/src/aggregation/metric/average.rs +++ b/src/aggregation/metric/average.rs @@ -94,7 +94,7 @@ impl IntermediateAverage { } /// Merge average data into this instance. - pub fn merge_fruits(&mut self, other: &IntermediateAverage) { + pub fn merge_fruits(&mut self, other: IntermediateAverage) { self.sum += other.sum; self.doc_count += other.doc_count; } diff --git a/src/aggregation/metric/stats.rs b/src/aggregation/metric/stats.rs index b596cd7a5..daf5c0400 100644 --- a/src/aggregation/metric/stats.rs +++ b/src/aggregation/metric/stats.rs @@ -92,7 +92,7 @@ impl IntermediateStats { } /// Merge data from other stats into this instance. - pub fn merge_fruits(&mut self, other: &IntermediateStats) { + pub fn merge_fruits(&mut self, other: IntermediateStats) { self.count += other.count; self.sum += other.sum; self.squared_sum += other.squared_sum; diff --git a/src/aggregation/mod.rs b/src/aggregation/mod.rs index ca54ea294..addcbe346 100644 --- a/src/aggregation/mod.rs +++ b/src/aggregation/mod.rs @@ -231,6 +231,9 @@ impl VecWithNames { fn keys(&self) -> impl Iterator + '_ { self.keys.iter().map(|key| key.as_str()) } + fn into_values(self) -> impl Iterator { + self.values.into_iter() + } fn values(&self) -> impl Iterator + '_ { self.values.iter() } @@ -261,10 +264,6 @@ pub enum Key { F64(f64), } -trait MergeFruits { - fn merge_fruits(&mut self, other: &Self); -} - impl Display for Key { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { @@ -368,14 +367,14 @@ mod tests { let index = Index::create_in_ram(schema_builder.build()); { let mut index_writer = index.writer_for_tests()?; - for i in values { + for &i in values { // writing the segment index_writer.add_document(doc!( text_field => "cool", - score_field => *i as u64, - score_field_f64 => *i as f64, - score_field_i64 => *i as i64, - fraction_field => *i as f64/100.0, + score_field => i as u64, + score_field_f64 => i as f64, + score_field_i64 => i as i64, + fraction_field => i as f64/100.0, ))?; index_writer.commit()?; } diff --git a/src/aggregation/segment_agg_result.rs b/src/aggregation/segment_agg_result.rs index 0af15d3fe..0064546a3 100644 --- a/src/aggregation/segment_agg_result.rs +++ b/src/aggregation/segment_agg_result.rs @@ -98,8 +98,8 @@ impl SegmentAggregationResultsCollector { force_flush: bool, ) { if let Some(metrics) = &mut self.metrics { - for (agg_with_accessor, collector) in - agg_with_accessor.metrics.values().zip(metrics.values_mut()) + for (collector, agg_with_accessor) in + metrics.values_mut().zip(agg_with_accessor.metrics.values()) { collector .collect_block(&self.staged_docs[..self.num_staged_docs], agg_with_accessor); @@ -107,8 +107,8 @@ impl SegmentAggregationResultsCollector { } if let Some(buckets) = &mut self.buckets { - for (agg_with_accessor, collector) in - agg_with_accessor.buckets.values().zip(buckets.values_mut()) + for (collector, agg_with_accessor) in + buckets.values_mut().zip(agg_with_accessor.buckets.values()) { collector.collect_block( &self.staged_docs[..self.num_staged_docs],