diff --git a/src/aggregation/bucket/histogram/histogram.rs b/src/aggregation/bucket/histogram/histogram.rs index 7f57ed178..2f0e24943 100644 --- a/src/aggregation/bucket/histogram/histogram.rs +++ b/src/aggregation/bucket/histogram/histogram.rs @@ -168,13 +168,13 @@ impl SegmentHistogramBucketEntry { self, sub_aggregation: SegmentAggregationResultsCollector, agg_with_accessor: &AggregationsWithAccessor, - ) -> IntermediateHistogramBucketEntry { - IntermediateHistogramBucketEntry { + ) -> crate::Result { + Ok(IntermediateHistogramBucketEntry { key: self.key, doc_count: self.doc_count, sub_aggregation: sub_aggregation - .into_intermediate_aggregations_result(agg_with_accessor), - } + .into_intermediate_aggregations_result(agg_with_accessor)?, + }) } } @@ -196,7 +196,7 @@ impl SegmentHistogramCollector { pub fn into_intermediate_bucket_result( self, agg_with_accessor: &BucketAggregationWithAccessor, - ) -> IntermediateBucketResult { + ) -> crate::Result { let mut buckets = Vec::with_capacity( self.buckets .iter() @@ -210,18 +210,20 @@ impl SegmentHistogramCollector { // // Empty buckets may be added later again in the final result, depending on the request. if let Some(sub_aggregations) = self.sub_aggregations { - buckets.extend( - self.buckets - .into_iter() - .zip(sub_aggregations.into_iter()) - .filter(|(bucket, _sub_aggregation)| bucket.doc_count != 0) - .map(|(bucket, sub_aggregation)| { - bucket.into_intermediate_bucket_entry( - sub_aggregation, - &agg_with_accessor.sub_aggregation, - ) - }), - ) + for bucket_res in self + .buckets + .into_iter() + .zip(sub_aggregations.into_iter()) + .filter(|(bucket, _sub_aggregation)| bucket.doc_count != 0) + .map(|(bucket, sub_aggregation)| { + bucket.into_intermediate_bucket_entry( + sub_aggregation, + &agg_with_accessor.sub_aggregation, + ) + }) + { + buckets.push(bucket_res?); + } } else { buckets.extend( self.buckets @@ -231,7 +233,7 @@ impl SegmentHistogramCollector { ); }; - IntermediateBucketResult::Histogram { buckets } + Ok(IntermediateBucketResult::Histogram { buckets }) } pub(crate) fn from_req_and_validate( diff --git a/src/aggregation/bucket/range.rs b/src/aggregation/bucket/range.rs index 15b69227c..dfe7f6ef2 100644 --- a/src/aggregation/bucket/range.rs +++ b/src/aggregation/bucket/range.rs @@ -128,20 +128,20 @@ impl SegmentRangeBucketEntry { pub(crate) fn into_intermediate_bucket_entry( self, agg_with_accessor: &AggregationsWithAccessor, - ) -> IntermediateRangeBucketEntry { + ) -> crate::Result { let sub_aggregation = if let Some(sub_aggregation) = self.sub_aggregation { - sub_aggregation.into_intermediate_aggregations_result(agg_with_accessor) + sub_aggregation.into_intermediate_aggregations_result(agg_with_accessor)? } else { Default::default() }; - IntermediateRangeBucketEntry { + Ok(IntermediateRangeBucketEntry { key: self.key, doc_count: self.doc_count, sub_aggregation, from: self.from, to: self.to, - } + }) } } @@ -149,23 +149,23 @@ impl SegmentRangeCollector { pub fn into_intermediate_bucket_result( self, agg_with_accessor: &BucketAggregationWithAccessor, - ) -> IntermediateBucketResult { + ) -> crate::Result { let field_type = self.field_type; let buckets = self .buckets .into_iter() .map(move |range_bucket| { - ( + Ok(( range_to_string(&range_bucket.range, &field_type), range_bucket .bucket - .into_intermediate_bucket_entry(&agg_with_accessor.sub_aggregation), - ) + .into_intermediate_bucket_entry(&agg_with_accessor.sub_aggregation)?, + )) }) - .collect(); + .collect::>()?; - IntermediateBucketResult::Range(buckets) + Ok(IntermediateBucketResult::Range(buckets)) } pub(crate) fn from_req_and_validate( @@ -395,7 +395,8 @@ mod tests { ranges, }; - SegmentRangeCollector::from_req_and_validate(&req, &Default::default(), field_type).unwrap() + SegmentRangeCollector::from_req_and_validate(&req, &Default::default(), field_type) + .expect("unexpected error") } #[test] diff --git a/src/aggregation/bucket/term_agg.rs b/src/aggregation/bucket/term_agg.rs index 73b228394..6cd534b8a 100644 --- a/src/aggregation/bucket/term_agg.rs +++ b/src/aggregation/bucket/term_agg.rs @@ -10,6 +10,7 @@ use crate::aggregation::intermediate_agg_result::{ IntermediateBucketResult, IntermediateTermBucketEntry, IntermediateTermBucketResult, }; use crate::aggregation::segment_agg_result::SegmentAggregationResultsCollector; +use crate::error::DataCorruption; use crate::fastfield::MultiValuedFastFieldReader; use crate::schema::Type; use crate::DocId; @@ -182,17 +183,17 @@ impl TermBucketEntry { pub(crate) fn into_intermediate_bucket_entry( self, agg_with_accessor: &AggregationsWithAccessor, - ) -> IntermediateTermBucketEntry { + ) -> crate::Result { let sub_aggregation = if let Some(sub_aggregation) = self.sub_aggregations { - sub_aggregation.into_intermediate_aggregations_result(agg_with_accessor) + sub_aggregation.into_intermediate_aggregations_result(agg_with_accessor)? } else { Default::default() }; - IntermediateTermBucketEntry { + Ok(IntermediateTermBucketEntry { doc_count: self.doc_count, sub_aggregation, - } + }) } } @@ -288,7 +289,7 @@ impl SegmentTermCollector { pub(crate) fn into_intermediate_bucket_result( self, agg_with_accessor: &BucketAggregationWithAccessor, - ) -> IntermediateBucketResult { + ) -> crate::Result { let mut entries: Vec<_> = self.term_buckets.entries.into_iter().collect(); let (term_doc_count_before_cutoff, sum_other_doc_count) = @@ -307,15 +308,29 @@ impl SegmentTermCollector { .ord_to_term(term_id as u64, &mut buffer) .expect("could not find term"); dict.insert( - String::from_utf8(buffer.to_vec()).unwrap(), - entry.into_intermediate_bucket_entry(&agg_with_accessor.sub_aggregation), + String::from_utf8(buffer.to_vec()) + .map_err(|utf8_err| DataCorruption::comment_only(utf8_err.to_string()))?, + entry.into_intermediate_bucket_entry(&agg_with_accessor.sub_aggregation)?, ); } - IntermediateBucketResult::Terms(IntermediateTermBucketResult { - entries: dict, - sum_other_doc_count, - doc_count_error_upper_bound: term_doc_count_before_cutoff, - }) + if self.req.min_doc_count == 0 { + let mut stream = term_dict.stream()?; + while let Some((key, _ord)) = stream.next() { + let key = std::str::from_utf8(&key) + .map_err(|utf8_err| DataCorruption::comment_only(utf8_err.to_string()))?; + if !dict.contains_key(key) { + dict.insert(key.to_owned(), Default::default()); + } + } + } + + Ok(IntermediateBucketResult::Terms( + IntermediateTermBucketResult { + entries: dict, + sum_other_doc_count, + doc_count_error_upper_bound: term_doc_count_before_cutoff, + }, + )) } #[inline] diff --git a/src/aggregation/collector.rs b/src/aggregation/collector.rs index 2191f8b7c..9f35045ba 100644 --- a/src/aggregation/collector.rs +++ b/src/aggregation/collector.rs @@ -92,11 +92,12 @@ impl Collector for AggregationCollector { } fn merge_fruits( - mut segment_fruits: Vec, + mut segment_fruits: Vec>, ) -> crate::Result { - if let Some(mut fruit) = segment_fruits.pop() { + if let Some(fruit) = segment_fruits.pop() { + let mut fruit = fruit?; for next_fruit in segment_fruits { - fruit.merge_fruits(next_fruit); + fruit.merge_fruits(next_fruit?); } Ok(fruit) } else { @@ -128,7 +129,7 @@ impl AggregationSegmentCollector { } impl SegmentCollector for AggregationSegmentCollector { - type Fruit = IntermediateAggregationResults; + type Fruit = crate::Result; #[inline] fn collect(&mut self, doc: crate::DocId, _score: crate::Score) { diff --git a/src/aggregation/intermediate_agg_result.rs b/src/aggregation/intermediate_agg_result.rs index e4faa94c1..2909997f8 100644 --- a/src/aggregation/intermediate_agg_result.rs +++ b/src/aggregation/intermediate_agg_result.rs @@ -345,7 +345,7 @@ pub struct IntermediateRangeBucketEntry { /// This is the term entry for a bucket, which contains a count, and optionally /// sub_aggregations. -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)] pub struct IntermediateTermBucketEntry { /// The number of documents in the bucket. pub doc_count: u64, diff --git a/src/aggregation/segment_agg_result.rs b/src/aggregation/segment_agg_result.rs index b0645785f..11f4b324d 100644 --- a/src/aggregation/segment_agg_result.rs +++ b/src/aggregation/segment_agg_result.rs @@ -44,18 +44,20 @@ impl SegmentAggregationResultsCollector { pub fn into_intermediate_aggregations_result( self, agg_with_accessor: &AggregationsWithAccessor, - ) -> IntermediateAggregationResults { - let buckets = self.buckets.map(|buckets| { + ) -> crate::Result { + let buckets = if let Some(buckets) = self.buckets { let entries = buckets .into_iter() .zip(agg_with_accessor.buckets.values()) - .map(|((key, bucket), acc)| (key, bucket.into_intermediate_bucket_result(acc))) - .collect::>(); - VecWithNames::from_entries(entries) - }); + .map(|((key, bucket), acc)| Ok((key, bucket.into_intermediate_bucket_result(acc)?))) + .collect::>>()?; + Some(VecWithNames::from_entries(entries)) + } else { + None + }; let metrics = self.metrics.map(VecWithNames::from_other); - IntermediateAggregationResults { metrics, buckets } + Ok(IntermediateAggregationResults { metrics, buckets }) } pub(crate) fn from_req_and_validate(req: &AggregationsWithAccessor) -> crate::Result { @@ -191,7 +193,7 @@ impl SegmentBucketResultCollector { pub fn into_intermediate_bucket_result( self, agg_with_accessor: &BucketAggregationWithAccessor, - ) -> IntermediateBucketResult { + ) -> crate::Result { match self { SegmentBucketResultCollector::Terms(terms) => { terms.into_intermediate_bucket_result(agg_with_accessor)