diff --git a/src/aggregation/agg_req_with_accessor.rs b/src/aggregation/agg_req_with_accessor.rs index 8ed82ac5c..ed44ab6ef 100644 --- a/src/aggregation/agg_req_with_accessor.rs +++ b/src/aggregation/agg_req_with_accessor.rs @@ -1,5 +1,7 @@ //! This will enhance the request tree with access to the fastfield and metadata. +use std::rc::Rc; +use std::sync::atomic::AtomicU32; use std::sync::Arc; use super::agg_req::{Aggregation, Aggregations, BucketAggregationType, MetricAggregation}; diff --git a/src/aggregation/bucket/histogram/histogram.rs b/src/aggregation/bucket/histogram/histogram.rs index 0d5f5574c..c2a7e3472 100644 --- a/src/aggregation/bucket/histogram/histogram.rs +++ b/src/aggregation/bucket/histogram/histogram.rs @@ -311,7 +311,7 @@ impl SegmentHistogramCollector { doc: &[DocId], bucket_with_accessor: &BucketAggregationWithAccessor, force_flush: bool, - ) { + ) -> crate::Result<()> { let bounds = self.bounds; let interval = self.interval; let offset = self.offset; @@ -341,28 +341,28 @@ impl SegmentHistogramCollector { bucket_pos0, docs[0], &bucket_with_accessor.sub_aggregation, - ); + )?; self.increment_bucket_if_in_bounds( val1, &bounds, bucket_pos1, docs[1], &bucket_with_accessor.sub_aggregation, - ); + )?; self.increment_bucket_if_in_bounds( val2, &bounds, bucket_pos2, docs[2], &bucket_with_accessor.sub_aggregation, - ); + )?; self.increment_bucket_if_in_bounds( val3, &bounds, bucket_pos3, docs[3], &bucket_with_accessor.sub_aggregation, - ); + )?; } for doc in iter.remainder() { let val = f64_from_fastfield_u64(accessor.get(*doc), &self.field_type); @@ -376,16 +376,17 @@ impl SegmentHistogramCollector { self.buckets[bucket_pos].key, get_bucket_val(val, self.interval, self.offset) as f64 ); - self.increment_bucket(bucket_pos, *doc, &bucket_with_accessor.sub_aggregation); + self.increment_bucket(bucket_pos, *doc, &bucket_with_accessor.sub_aggregation)?; } if force_flush { if let Some(sub_aggregations) = self.sub_aggregations.as_mut() { for sub_aggregation 
in sub_aggregations { sub_aggregation - .flush_staged_docs(&bucket_with_accessor.sub_aggregation, force_flush); + .flush_staged_docs(&bucket_with_accessor.sub_aggregation, force_flush)?; } } } + Ok(()) } #[inline] @@ -396,15 +397,16 @@ impl SegmentHistogramCollector { bucket_pos: usize, doc: DocId, bucket_with_accessor: &AggregationsWithAccessor, - ) { + ) -> crate::Result<()> { if bounds.contains(val) { debug_assert_eq!( self.buckets[bucket_pos].key, get_bucket_val(val, self.interval, self.offset) as f64 ); - self.increment_bucket(bucket_pos, doc, bucket_with_accessor); + self.increment_bucket(bucket_pos, doc, bucket_with_accessor)?; } + Ok(()) } #[inline] @@ -413,12 +415,13 @@ impl SegmentHistogramCollector { bucket_pos: usize, doc: DocId, bucket_with_accessor: &AggregationsWithAccessor, - ) { + ) -> crate::Result<()> { let bucket = &mut self.buckets[bucket_pos]; bucket.doc_count += 1; if let Some(sub_aggregation) = self.sub_aggregations.as_mut() { - (&mut sub_aggregation[bucket_pos]).collect(doc, bucket_with_accessor); + (&mut sub_aggregation[bucket_pos]).collect(doc, bucket_with_accessor)?; } + Ok(()) } fn f64_from_fastfield_u64(&self, val: u64) -> f64 { diff --git a/src/aggregation/bucket/range.rs b/src/aggregation/bucket/range.rs index 69206b110..610453e03 100644 --- a/src/aggregation/bucket/range.rs +++ b/src/aggregation/bucket/range.rs @@ -224,7 +224,7 @@ impl SegmentRangeCollector { doc: &[DocId], bucket_with_accessor: &BucketAggregationWithAccessor, force_flush: bool, - ) { + ) -> crate::Result<()> { let mut iter = doc.chunks_exact(4); let accessor = bucket_with_accessor .accessor @@ -240,24 +240,25 @@ impl SegmentRangeCollector { let bucket_pos3 = self.get_bucket_pos(val3); let bucket_pos4 = self.get_bucket_pos(val4); - self.increment_bucket(bucket_pos1, docs[0], &bucket_with_accessor.sub_aggregation); - self.increment_bucket(bucket_pos2, docs[1], &bucket_with_accessor.sub_aggregation); - self.increment_bucket(bucket_pos3, docs[2], 
&bucket_with_accessor.sub_aggregation); - self.increment_bucket(bucket_pos4, docs[3], &bucket_with_accessor.sub_aggregation); + self.increment_bucket(bucket_pos1, docs[0], &bucket_with_accessor.sub_aggregation)?; + self.increment_bucket(bucket_pos2, docs[1], &bucket_with_accessor.sub_aggregation)?; + self.increment_bucket(bucket_pos3, docs[2], &bucket_with_accessor.sub_aggregation)?; + self.increment_bucket(bucket_pos4, docs[3], &bucket_with_accessor.sub_aggregation)?; } for doc in iter.remainder() { let val = accessor.get(*doc); let bucket_pos = self.get_bucket_pos(val); - self.increment_bucket(bucket_pos, *doc, &bucket_with_accessor.sub_aggregation); + self.increment_bucket(bucket_pos, *doc, &bucket_with_accessor.sub_aggregation)?; } if force_flush { for bucket in &mut self.buckets { if let Some(sub_aggregation) = &mut bucket.bucket.sub_aggregation { sub_aggregation - .flush_staged_docs(&bucket_with_accessor.sub_aggregation, force_flush); + .flush_staged_docs(&bucket_with_accessor.sub_aggregation, force_flush)?; } } } + Ok(()) } #[inline] @@ -266,13 +267,14 @@ impl SegmentRangeCollector { bucket_pos: usize, doc: DocId, bucket_with_accessor: &AggregationsWithAccessor, - ) { + ) -> crate::Result<()> { let bucket = &mut self.buckets[bucket_pos]; bucket.bucket.doc_count += 1; if let Some(sub_aggregation) = &mut bucket.bucket.sub_aggregation { - sub_aggregation.collect(doc, bucket_with_accessor); + sub_aggregation.collect(doc, bucket_with_accessor)?; } + Ok(()) } #[inline] diff --git a/src/aggregation/bucket/term_agg.rs b/src/aggregation/bucket/term_agg.rs index c9833c885..1ed63d3e6 100644 --- a/src/aggregation/bucket/term_agg.rs +++ b/src/aggregation/bucket/term_agg.rs @@ -246,8 +246,7 @@ impl TermBuckets { doc: DocId, bucket_with_accessor: &AggregationsWithAccessor, blueprint: &Option<TermBucketEntry>, - ) { - // self.ensure_vec_exists(term_ids); + ) -> crate::Result<()> { for &term_id in term_ids { let entry = self .entries
.or_insert_with(|| TermBucketEntry::from_blueprint(blueprint)); entry.doc_count += 1; if let Some(sub_aggregations) = entry.sub_aggregations.as_mut() { - sub_aggregations.collect(doc, bucket_with_accessor); + sub_aggregations.collect(doc, bucket_with_accessor)?; } } + Ok(()) } - fn force_flush(&mut self, agg_with_accessor: &AggregationsWithAccessor) { + fn force_flush(&mut self, agg_with_accessor: &AggregationsWithAccessor) -> crate::Result<()> { for entry in &mut self.entries.values_mut() { if let Some(sub_aggregations) = entry.sub_aggregations.as_mut() { - sub_aggregations.flush_staged_docs(agg_with_accessor, false); + sub_aggregations.flush_staged_docs(agg_with_accessor, false)?; } } + Ok(()) } } @@ -421,7 +422,7 @@ impl SegmentTermCollector { doc: &[DocId], bucket_with_accessor: &BucketAggregationWithAccessor, force_flush: bool, - ) { + ) -> crate::Result<()> { let accessor = bucket_with_accessor .accessor .as_multi() @@ -442,25 +443,25 @@ impl SegmentTermCollector { docs[0], &bucket_with_accessor.sub_aggregation, &self.blueprint, - ); + )?; self.term_buckets.increment_bucket( &vals2, docs[1], &bucket_with_accessor.sub_aggregation, &self.blueprint, - ); + )?; self.term_buckets.increment_bucket( &vals3, docs[2], &bucket_with_accessor.sub_aggregation, &self.blueprint, - ); + )?; self.term_buckets.increment_bucket( &vals4, docs[3], &bucket_with_accessor.sub_aggregation, &self.blueprint, - ); + )?; } for &doc in iter.remainder() { accessor.get_vals(doc, &mut vals1); @@ -470,12 +471,13 @@ impl SegmentTermCollector { doc, &bucket_with_accessor.sub_aggregation, &self.blueprint, - ); + )?; } if force_flush { self.term_buckets - .force_flush(&bucket_with_accessor.sub_aggregation); + .force_flush(&bucket_with_accessor.sub_aggregation)?; } + Ok(()) } } diff --git a/src/aggregation/collector.rs b/src/aggregation/collector.rs index 3cbbbcdc4..69913931c 100644 --- a/src/aggregation/collector.rs +++ b/src/aggregation/collector.rs @@ -133,13 +133,13 @@ impl SegmentCollector 
for AggregationSegmentCollector { #[inline] fn collect(&mut self, doc: crate::DocId, _score: crate::Score) -> crate::Result<()> { - self.result.collect(doc, &self.aggs_with_accessor); + self.result.collect(doc, &self.aggs_with_accessor)?; Ok(()) } fn harvest(mut self) -> Self::Fruit { self.result - .flush_staged_docs(&self.aggs_with_accessor, true); + .flush_staged_docs(&self.aggs_with_accessor, true)?; self.result .into_intermediate_aggregations_result(&self.aggs_with_accessor) } diff --git a/src/aggregation/segment_agg_result.rs b/src/aggregation/segment_agg_result.rs index 81f2b85de..121fb4cf3 100644 --- a/src/aggregation/segment_agg_result.rs +++ b/src/aggregation/segment_agg_result.rs @@ -115,21 +115,22 @@ impl SegmentAggregationResultsCollector { &mut self, doc: crate::DocId, agg_with_accessor: &AggregationsWithAccessor, - ) { + ) -> crate::Result<()> { self.staged_docs[self.num_staged_docs] = doc; self.num_staged_docs += 1; if self.num_staged_docs == self.staged_docs.len() { - self.flush_staged_docs(agg_with_accessor, false); + self.flush_staged_docs(agg_with_accessor, false)?; } + Ok(()) } pub(crate) fn flush_staged_docs( &mut self, agg_with_accessor: &AggregationsWithAccessor, force_flush: bool, - ) { + ) -> crate::Result<()> { if self.num_staged_docs == 0 { - return; + return Ok(()); } if let Some(metrics) = &mut self.metrics { for (collector, agg_with_accessor) in @@ -148,11 +149,12 @@ impl SegmentAggregationResultsCollector { &self.staged_docs[..self.num_staged_docs], agg_with_accessor, force_flush, - ); + )?; } } self.num_staged_docs = 0; + Ok(()) } } @@ -256,17 +258,18 @@ impl SegmentBucketResultCollector { doc: &[DocId], bucket_with_accessor: &BucketAggregationWithAccessor, force_flush: bool, - ) { + ) -> crate::Result<()> { match self { SegmentBucketResultCollector::Range(range) => { - range.collect_block(doc, bucket_with_accessor, force_flush); + range.collect_block(doc, bucket_with_accessor, force_flush)?; } 
SegmentBucketResultCollector::Histogram(histogram) => { - histogram.collect_block(doc, bucket_with_accessor, force_flush) + histogram.collect_block(doc, bucket_with_accessor, force_flush)?; } SegmentBucketResultCollector::Terms(terms) => { - terms.collect_block(doc, bucket_with_accessor, force_flush) + terms.collect_block(doc, bucket_with_accessor, force_flush)?; } } + Ok(()) } }