merge_fruits pass by value

This commit is contained in:
Pascal Seitz
2022-03-15 12:59:22 +08:00
parent 613aad7a8a
commit 0262e44bbd
7 changed files with 48 additions and 55 deletions

View File

@@ -208,7 +208,7 @@ impl SegmentHistogramCollector {
let first_bucket_num =
get_bucket_num_f64(min, req.interval, req.offset.unwrap_or(0.0)) as i64;
let bounds = req.hard_bounds.clone().unwrap_or(HistogramBounds {
let bounds = req.hard_bounds.unwrap_or(HistogramBounds {
min: f64::MIN,
max: f64::MAX,
});
@@ -319,7 +319,7 @@ impl SegmentHistogramCollector {
get_bucket_val(val, self.req.interval, self.offset) as f64
);
self.increment_bucket(bucket_pos, doc, &bucket_with_accessor);
self.increment_bucket(bucket_pos, doc, bucket_with_accessor);
}
}

View File

@@ -75,13 +75,7 @@ impl Collector for AggregationCollector {
_segment_local_id: crate::SegmentOrdinal,
reader: &crate::SegmentReader,
) -> crate::Result<Self::Child> {
let aggs_with_accessor = get_aggs_with_accessor_and_validate(&self.agg, reader)?;
let result =
SegmentAggregationResultsCollector::from_req_and_validate(&aggs_with_accessor)?;
Ok(AggregationSegmentCollector {
aggs: aggs_with_accessor,
result,
})
AggregationSegmentCollector::from_agg_req_and_reader(&self.agg, reader)
}
fn requires_scoring(&self) -> bool {
@@ -101,7 +95,7 @@ fn merge_fruits(
) -> crate::Result<IntermediateAggregationResults> {
if let Some(mut fruit) = segment_fruits.pop() {
for next_fruit in segment_fruits {
fruit.merge_fruits(&next_fruit);
fruit.merge_fruits(next_fruit);
}
Ok(fruit)
} else {
@@ -128,7 +122,7 @@ impl AggregationSegmentCollector {
let result =
SegmentAggregationResultsCollector::from_req_and_validate(&aggs_with_accessor)?;
Ok(AggregationSegmentCollector {
aggs: aggs_with_accessor,
aggs: aggs_with_accessor.into(),
result,
})
}

View File

@@ -14,7 +14,7 @@ use super::segment_agg_result::{
SegmentAggregationResultsCollector, SegmentBucketResultCollector, SegmentHistogramBucketEntry,
SegmentMetricResultCollector, SegmentRangeBucketEntry,
};
use super::{Key, MergeFruits, SerializedKey, VecWithNames};
use super::{Key, SerializedKey, VecWithNames};
/// Contains the intermediate aggregation result, which is optimized to be merged with other
/// intermediate results.
@@ -26,12 +26,8 @@ pub struct IntermediateAggregationResults {
impl From<SegmentAggregationResultsCollector> for IntermediateAggregationResults {
fn from(tree: SegmentAggregationResultsCollector) -> Self {
let metrics = tree
.metrics
.map(|metrics| VecWithNames::from_other(metrics));
let buckets = tree
.buckets
.map(|buckets| VecWithNames::from_other(buckets));
let metrics = tree.metrics.map(VecWithNames::from_other);
let buckets = tree.buckets.map(VecWithNames::from_other);
Self { metrics, buckets }
}
@@ -42,16 +38,18 @@ impl IntermediateAggregationResults {
///
/// The order of the values needs to be the same on both results. This is ensured when the same
/// (key values) are present on the underlying VecWithNames struct.
pub fn merge_fruits(&mut self, other: &IntermediateAggregationResults) {
if let (Some(buckets_left), Some(buckets_right)) = (&mut self.buckets, &other.buckets) {
for (bucket_left, bucket_right) in buckets_left.values_mut().zip(buckets_right.values())
pub fn merge_fruits(&mut self, other: IntermediateAggregationResults) {
if let (Some(buckets_left), Some(buckets_right)) = (&mut self.buckets, other.buckets) {
for (bucket_left, bucket_right) in
buckets_left.values_mut().zip(buckets_right.into_values())
{
bucket_left.merge_fruits(bucket_right);
}
}
if let (Some(metrics_left), Some(metrics_right)) = (&mut self.metrics, &other.metrics) {
for (metric_left, metric_right) in metrics_left.values_mut().zip(metrics_right.values())
if let (Some(metrics_left), Some(metrics_right)) = (&mut self.metrics, other.metrics) {
for (metric_left, metric_right) in
metrics_left.values_mut().zip(metrics_right.into_values())
{
metric_left.merge_fruits(metric_right);
}
@@ -91,7 +89,7 @@ impl From<SegmentMetricResultCollector> for IntermediateMetricResult {
}
impl IntermediateMetricResult {
fn merge_fruits(&mut self, other: &IntermediateMetricResult) {
fn merge_fruits(&mut self, other: IntermediateMetricResult) {
match (self, other) {
(
IntermediateMetricResult::Average(avg_data_left),
@@ -106,7 +104,7 @@ impl IntermediateMetricResult {
stats_left.merge_fruits(stats_right);
}
_ => {
panic!("incompatible fruit types in tree {:?}", other);
panic!("incompatible fruit types in tree");
}
}
}
@@ -142,7 +140,7 @@ impl From<SegmentBucketResultCollector> for IntermediateBucketResult {
}
impl IntermediateBucketResult {
fn merge_fruits(&mut self, other: &IntermediateBucketResult) {
fn merge_fruits(&mut self, other: IntermediateBucketResult) {
match (self, other) {
(
IntermediateBucketResult::Range(entries_left),
@@ -162,7 +160,7 @@ impl IntermediateBucketResult {
) => {
let mut buckets = entries_left
.drain(..)
.merge_join_by(entries_right.iter(), |left, right| {
.merge_join_by(entries_right.into_iter(), |left, right| {
left.key.partial_cmp(&right.key).unwrap_or(Ordering::Equal)
})
.map(|either| match either {
@@ -171,7 +169,7 @@ impl IntermediateBucketResult {
left
}
itertools::EitherOrBoth::Left(left) => left,
itertools::EitherOrBoth::Right(right) => right.clone(),
itertools::EitherOrBoth::Right(right) => right,
})
.collect();
@@ -187,20 +185,22 @@ impl IntermediateBucketResult {
}
}
/// Implemented by intermediate result types that can absorb another value of the same type.
///
/// `other` is taken by value (not by reference), so an implementation owns it and may move
/// data out of it while merging.
trait MergeFruits {
/// Merges `other` into `self` in place, consuming `other`.
fn merge_fruits(&mut self, other: Self);
}
fn merge_maps<V: MergeFruits + Clone>(
entries_left: &mut FnvHashMap<SerializedKey, V>,
entries_right: &FnvHashMap<SerializedKey, V>,
mut entries_right: FnvHashMap<SerializedKey, V>,
) {
for (name, entry_left) in entries_left.iter_mut() {
if let Some(entry_right) = entries_right.get(name) {
if let Some(entry_right) = entries_right.remove(name) {
entry_left.merge_fruits(entry_right);
}
}
for (key, res) in entries_right.iter() {
if !entries_left.contains_key(key) {
entries_left.insert(key.clone(), res.clone());
}
for (key, res) in entries_right.into_iter() {
entries_left.entry(key).or_insert(res);
}
}
@@ -285,16 +285,16 @@ impl From<SegmentRangeBucketEntry> for IntermediateRangeBucketEntry {
}
impl MergeFruits for IntermediateRangeBucketEntry {
fn merge_fruits(&mut self, other: &IntermediateRangeBucketEntry) {
fn merge_fruits(&mut self, other: IntermediateRangeBucketEntry) {
self.doc_count += other.doc_count;
self.sub_aggregation.merge_fruits(&other.sub_aggregation);
self.sub_aggregation.merge_fruits(other.sub_aggregation);
}
}
impl MergeFruits for IntermediateHistogramBucketEntry {
fn merge_fruits(&mut self, other: &IntermediateHistogramBucketEntry) {
fn merge_fruits(&mut self, other: IntermediateHistogramBucketEntry) {
self.doc_count += other.doc_count;
self.sub_aggregation.merge_fruits(&other.sub_aggregation);
self.sub_aggregation.merge_fruits(other.sub_aggregation);
}
}
@@ -372,7 +372,7 @@ mod tests {
("blue".to_string(), 25, "1900".to_string(), 50),
]);
tree_left.merge_fruits(&tree_right);
tree_left.merge_fruits(tree_right);
let tree_expected = get_test_tree(&[
("red".to_string(), 110, "1900".to_string(), 55),
@@ -393,7 +393,7 @@ mod tests {
("green".to_string(), 25, "1900".to_string(), 50),
]);
tree_left.merge_fruits(&tree_right);
tree_left.merge_fruits(tree_right);
let tree_expected = get_test_tree(&[
("red".to_string(), 110, "1900".to_string(), 55),

View File

@@ -94,7 +94,7 @@ impl IntermediateAverage {
}
/// Merge average data into this instance.
pub fn merge_fruits(&mut self, other: &IntermediateAverage) {
pub fn merge_fruits(&mut self, other: IntermediateAverage) {
self.sum += other.sum;
self.doc_count += other.doc_count;
}

View File

@@ -92,7 +92,7 @@ impl IntermediateStats {
}
/// Merge data from other stats into this instance.
pub fn merge_fruits(&mut self, other: &IntermediateStats) {
pub fn merge_fruits(&mut self, other: IntermediateStats) {
self.count += other.count;
self.sum += other.sum;
self.squared_sum += other.squared_sum;

View File

@@ -231,6 +231,9 @@ impl<T: Clone> VecWithNames<T> {
/// Returns an iterator over the stored keys as string slices borrowed from `self`.
fn keys(&self) -> impl Iterator<Item = &str> + '_ {
self.keys.iter().map(|key| key.as_str())
}
/// Consumes `self` and returns an iterator that yields the stored values by value.
///
/// The keys are dropped; only the contents of `self.values` are yielded.
fn into_values(self) -> impl Iterator<Item = T> {
self.values.into_iter()
}
/// Returns an iterator over shared references to the stored values, borrowed from `self`.
fn values(&self) -> impl Iterator<Item = &T> + '_ {
self.values.iter()
}
@@ -261,10 +264,6 @@ pub enum Key {
F64(f64),
}
trait MergeFruits {
fn merge_fruits(&mut self, other: &Self);
}
impl Display for Key {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
@@ -368,14 +367,14 @@ mod tests {
let index = Index::create_in_ram(schema_builder.build());
{
let mut index_writer = index.writer_for_tests()?;
for i in values {
for &i in values {
// writing the segment
index_writer.add_document(doc!(
text_field => "cool",
score_field => *i as u64,
score_field_f64 => *i as f64,
score_field_i64 => *i as i64,
fraction_field => *i as f64/100.0,
score_field => i as u64,
score_field_f64 => i as f64,
score_field_i64 => i as i64,
fraction_field => i as f64/100.0,
))?;
index_writer.commit()?;
}

View File

@@ -98,8 +98,8 @@ impl SegmentAggregationResultsCollector {
force_flush: bool,
) {
if let Some(metrics) = &mut self.metrics {
for (agg_with_accessor, collector) in
agg_with_accessor.metrics.values().zip(metrics.values_mut())
for (collector, agg_with_accessor) in
metrics.values_mut().zip(agg_with_accessor.metrics.values())
{
collector
.collect_block(&self.staged_docs[..self.num_staged_docs], agg_with_accessor);
@@ -107,8 +107,8 @@ impl SegmentAggregationResultsCollector {
}
if let Some(buckets) = &mut self.buckets {
for (agg_with_accessor, collector) in
agg_with_accessor.buckets.values().zip(buckets.values_mut())
for (collector, agg_with_accessor) in
buckets.values_mut().zip(agg_with_accessor.buckets.values())
{
collector.collect_block(
&self.staged_docs[..self.num_staged_docs],