diff --git a/columnar/src/block_accessor.rs b/columnar/src/block_accessor.rs new file mode 100644 index 000000000..5c1913a36 --- /dev/null +++ b/columnar/src/block_accessor.rs @@ -0,0 +1,36 @@ +use crate::{Column, DocId, RowId}; + +#[derive(Debug, Default, Clone)] +pub struct ColumnBlockAccessor { + val_cache: Vec, + docid_cache: Vec, + row_id_cache: Vec, +} + +impl + ColumnBlockAccessor +{ + #[inline] + pub fn fetch_block(&mut self, docs: &[u32], accessor: &Column) { + self.docid_cache.clear(); + self.row_id_cache.clear(); + accessor.row_ids_for_docs(docs, &mut self.docid_cache, &mut self.row_id_cache); + self.val_cache.resize(self.row_id_cache.len(), T::default()); + accessor + .values + .get_vals(&self.row_id_cache, &mut self.val_cache); + } + + #[inline] + pub fn iter_vals(&self) -> impl Iterator + '_ { + self.val_cache.iter().cloned() + } + + #[inline] + pub fn iter_docid_vals(&self) -> impl Iterator + '_ { + self.docid_cache + .iter() + .cloned() + .zip(self.val_cache.iter().cloned()) + } +} diff --git a/columnar/src/column/mod.rs b/columnar/src/column/mod.rs index 17614d264..d64543bd0 100644 --- a/columnar/src/column/mod.rs +++ b/columnar/src/column/mod.rs @@ -16,7 +16,7 @@ pub use serialize::{ use crate::column_index::ColumnIndex; use crate::column_values::monotonic_mapping::StrictlyMonotonicMappingToInternal; use crate::column_values::{monotonic_map_column, ColumnValues}; -use crate::{Cardinality, MonotonicallyMappableToU64, RowId}; +use crate::{Cardinality, DocId, MonotonicallyMappableToU64, RowId}; #[derive(Clone)] pub struct Column { @@ -68,8 +68,25 @@ impl Column { self.values_for_doc(row_id).next() } - pub fn values_for_doc(&self, row_id: RowId) -> impl Iterator + '_ { - self.value_row_ids(row_id) + /// Translates a block of docis to row_ids. + /// + /// returns the row_ids and the matching docids on the same index + /// e.g. + /// DocId In: [0, 5, 6] + /// DocId Out: [0, 0, 6, 6] + /// RowId Out: [0, 1, 2, 3] + #[inline] + pub fn row_ids_for_docs( + &self, + doc_ids: &[DocId], + doc_ids_out: &mut Vec, + row_ids: &mut Vec, + ) { + self.idx.docids_to_rowids(doc_ids, doc_ids_out, row_ids) + } + + pub fn values_for_doc(&self, doc_id: DocId) -> impl Iterator + '_ { + self.value_row_ids(doc_id) .map(|value_row_id: RowId| self.values.get_val(value_row_id)) } diff --git a/columnar/src/column_index/mod.rs b/columnar/src/column_index/mod.rs index fa7a0408b..aba51dd8d 100644 --- a/columnar/src/column_index/mod.rs +++ b/columnar/src/column_index/mod.rs @@ -74,6 +74,45 @@ impl ColumnIndex { } } + /// Translates a block of docis to row_ids. + /// + /// returns the row_ids and the matching docids on the same index + /// e.g. + /// DocId In: [0, 5, 6] + /// DocId Out: [0, 0, 6, 6] + /// RowId Out: [0, 1, 2, 3] + #[inline] + pub fn docids_to_rowids( + &self, + doc_ids: &[DocId], + doc_ids_out: &mut Vec, + row_ids: &mut Vec, + ) { + match self { + ColumnIndex::Empty { .. } => {} + ColumnIndex::Full => { + doc_ids_out.extend_from_slice(doc_ids); + row_ids.extend_from_slice(doc_ids); + } + ColumnIndex::Optional(optional_index) => { + for doc_id in doc_ids { + if let Some(row_id) = optional_index.rank_if_exists(*doc_id) { + doc_ids_out.push(*doc_id); + row_ids.push(row_id); + } + } + } + ColumnIndex::Multivalued(multivalued_index) => { + for doc_id in doc_ids { + for row_id in multivalued_index.range(*doc_id) { + doc_ids_out.push(*doc_id); + row_ids.push(row_id); + } + } + } + } + } + pub fn docid_range_to_rowids(&self, doc_id: Range) -> Range { match self { ColumnIndex::Empty { .. } => 0..0, diff --git a/columnar/src/lib.rs b/columnar/src/lib.rs index 8e82eb475..59d1558ed 100644 --- a/columnar/src/lib.rs +++ b/columnar/src/lib.rs @@ -9,6 +9,7 @@ extern crate test; use std::io; +mod block_accessor; mod column; mod column_index; pub mod column_values; @@ -19,6 +20,7 @@ mod iterable; pub(crate) mod utils; mod value; +pub use block_accessor::ColumnBlockAccessor; pub use column::{BytesColumn, Column, StrColumn}; pub use column_index::ColumnIndex; pub use column_values::{ColumnValues, MonotonicallyMappableToU128, MonotonicallyMappableToU64}; diff --git a/src/aggregation/agg_req_with_accessor.rs b/src/aggregation/agg_req_with_accessor.rs index c5d6c21b6..27a10ca88 100644 --- a/src/aggregation/agg_req_with_accessor.rs +++ b/src/aggregation/agg_req_with_accessor.rs @@ -2,7 +2,7 @@ use std::sync::Arc; -use columnar::{Column, ColumnType, ColumnValues, StrColumn}; +use columnar::{Column, ColumnBlockAccessor, ColumnType, ColumnValues, StrColumn}; use super::agg_req::{Aggregation, Aggregations, BucketAggregationType, MetricAggregation}; use super::bucket::{ @@ -45,6 +45,7 @@ pub struct BucketAggregationWithAccessor { pub(crate) bucket_agg: BucketAggregationType, pub(crate) sub_aggregation: AggregationsWithAccessor, pub(crate) limits: AggregationLimits, + pub(crate) column_block_accessor: ColumnBlockAccessor, } impl BucketAggregationWithAccessor { @@ -85,6 +86,7 @@ impl BucketAggregationWithAccessor { bucket_agg: bucket.clone(), str_dict_column, limits, + column_block_accessor: Default::default(), }) } } @@ -95,6 +97,7 @@ pub struct MetricAggregationWithAccessor { pub metric: MetricAggregation, pub field_type: ColumnType, pub accessor: Column, + pub column_block_accessor: ColumnBlockAccessor, } impl MetricAggregationWithAccessor { @@ -115,6 +118,7 @@ impl MetricAggregationWithAccessor { accessor, field_type, metric: metric.clone(), + column_block_accessor: Default::default(), }) } } diff --git a/src/aggregation/bucket/histogram/histogram.rs b/src/aggregation/bucket/histogram/histogram.rs index ee5f4b1c5..bdf26dd97 100644 --- a/src/aggregation/bucket/histogram/histogram.rs +++ b/src/aggregation/bucket/histogram/histogram.rs @@ -20,7 +20,7 @@ use crate::aggregation::segment_agg_result::{ build_segment_agg_collector, AggregationLimits, SegmentAggregationCollector, }; use crate::aggregation::{f64_from_fastfield_u64, format_date, VecWithNames}; -use crate::{DocId, TantivyError}; +use crate::TantivyError; /// Histogram is a bucket aggregation, where buckets are created dynamically for given `interval`. /// Each document value is rounded down to its bucket. @@ -235,7 +235,7 @@ impl SegmentAggregationCollector for SegmentHistogramCollector { fn collect( &mut self, doc: crate::DocId, - agg_with_accessor: &AggregationsWithAccessor, + agg_with_accessor: &mut AggregationsWithAccessor, ) -> crate::Result<()> { self.collect_block(&[doc], agg_with_accessor) } @@ -244,11 +244,9 @@ impl SegmentAggregationCollector for SegmentHistogramCollector { fn collect_block( &mut self, docs: &[crate::DocId], - agg_with_accessor: &AggregationsWithAccessor, + agg_with_accessor: &mut AggregationsWithAccessor, ) -> crate::Result<()> { - let accessor = &agg_with_accessor.buckets.values[self.accessor_idx].accessor; - let sub_aggregation_accessor = - &agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation; + let bucket_agg_accessor = &mut agg_with_accessor.buckets.values[self.accessor_idx]; let mem_pre = self.get_memory_consumption(); @@ -257,20 +255,26 @@ impl SegmentAggregationCollector for SegmentHistogramCollector { let offset = self.offset; let get_bucket_pos = |val| (get_bucket_pos_f64(val, interval, offset) as i64); - for doc in docs { - for val in accessor.values_for_doc(*doc) { - let val = self.f64_from_fastfield_u64(val); + bucket_agg_accessor + .column_block_accessor + .fetch_block(docs, &bucket_agg_accessor.accessor); - let bucket_pos = get_bucket_pos(val); + for (doc, val) in bucket_agg_accessor.column_block_accessor.iter_docid_vals() { + let val = self.f64_from_fastfield_u64(val); - if bounds.contains(val) { - self.increment_bucket( - bucket_pos, - *doc, - sub_aggregation_accessor, - interval, - offset, - )?; + let bucket_pos = get_bucket_pos(val); + + if bounds.contains(val) { + let bucket = self.buckets.entry(bucket_pos).or_insert_with(|| { + let key = get_bucket_key_from_pos(bucket_pos as f64, interval, offset); + SegmentHistogramBucketEntry { key, doc_count: 0 } + }); + bucket.doc_count += 1; + if let Some(sub_aggregation_blueprint) = self.sub_aggregation_blueprint.as_mut() { + self.sub_aggregations + .entry(bucket_pos) + .or_insert_with(|| sub_aggregation_blueprint.clone()) + .collect(doc, &mut bucket_agg_accessor.sub_aggregation)?; } } } @@ -283,9 +287,9 @@ impl SegmentAggregationCollector for SegmentHistogramCollector { Ok(()) } - fn flush(&mut self, agg_with_accessor: &AggregationsWithAccessor) -> crate::Result<()> { + fn flush(&mut self, agg_with_accessor: &mut AggregationsWithAccessor) -> crate::Result<()> { let sub_aggregation_accessor = - &agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation; + &mut agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation; for sub_aggregation in self.sub_aggregations.values_mut() { sub_aggregation.flush(sub_aggregation_accessor)?; @@ -360,29 +364,6 @@ impl SegmentHistogramCollector { }) } - #[inline] - fn increment_bucket( - &mut self, - bucket_pos: i64, - doc: DocId, - bucket_with_accessor: &AggregationsWithAccessor, - interval: f64, - offset: f64, - ) -> crate::Result<()> { - let bucket = self.buckets.entry(bucket_pos).or_insert_with(|| { - let key = get_bucket_key_from_pos(bucket_pos as f64, interval, offset); - SegmentHistogramBucketEntry { key, doc_count: 0 } - }); - bucket.doc_count += 1; - if let Some(sub_aggregation_blueprint) = self.sub_aggregation_blueprint.as_mut() { - self.sub_aggregations - .entry(bucket_pos) - .or_insert_with(|| sub_aggregation_blueprint.clone()) - .collect(doc, bucket_with_accessor)?; - } - Ok(()) - } - #[inline] fn f64_from_fastfield_u64(&self, val: u64) -> f64 { f64_from_fastfield_u64(val, &self.column_type) diff --git a/src/aggregation/bucket/range.rs b/src/aggregation/bucket/range.rs index 87453730f..50f0e0824 100644 --- a/src/aggregation/bucket/range.rs +++ b/src/aggregation/bucket/range.rs @@ -212,7 +212,7 @@ impl SegmentAggregationCollector for SegmentRangeCollector { fn collect( &mut self, doc: crate::DocId, - agg_with_accessor: &AggregationsWithAccessor, + agg_with_accessor: &mut AggregationsWithAccessor, ) -> crate::Result<()> { self.collect_block(&[doc], agg_with_accessor) } @@ -221,30 +221,31 @@ impl SegmentAggregationCollector for SegmentRangeCollector { fn collect_block( &mut self, docs: &[crate::DocId], - agg_with_accessor: &AggregationsWithAccessor, + agg_with_accessor: &mut AggregationsWithAccessor, ) -> crate::Result<()> { - let accessor = &agg_with_accessor.buckets.values[self.accessor_idx].accessor; - let sub_aggregation_accessor = - &agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation; - for doc in docs { - for val in accessor.values_for_doc(*doc) { - let bucket_pos = self.get_bucket_pos(val); + let bucket_agg_accessor = &mut agg_with_accessor.buckets.values[self.accessor_idx]; - let bucket = &mut self.buckets[bucket_pos]; + bucket_agg_accessor + .column_block_accessor + .fetch_block(docs, &bucket_agg_accessor.accessor); - bucket.bucket.doc_count += 1; - if let Some(sub_aggregation) = &mut bucket.bucket.sub_aggregation { - sub_aggregation.collect(*doc, sub_aggregation_accessor)?; - } + for (doc, val) in bucket_agg_accessor.column_block_accessor.iter_docid_vals() { + let bucket_pos = self.get_bucket_pos(val); + + let bucket = &mut self.buckets[bucket_pos]; + + bucket.bucket.doc_count += 1; + if let Some(sub_aggregation) = &mut bucket.bucket.sub_aggregation { + sub_aggregation.collect(doc, &mut bucket_agg_accessor.sub_aggregation)?; } } Ok(()) } - fn flush(&mut self, agg_with_accessor: &AggregationsWithAccessor) -> crate::Result<()> { + fn flush(&mut self, agg_with_accessor: &mut AggregationsWithAccessor) -> crate::Result<()> { let sub_aggregation_accessor = - &agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation; + &mut agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation; for bucket in self.buckets.iter_mut() { if let Some(sub_agg) = bucket.bucket.sub_aggregation.as_mut() { diff --git a/src/aggregation/bucket/term_agg.rs b/src/aggregation/bucket/term_agg.rs index f37c4870a..09a7fa128 100644 --- a/src/aggregation/bucket/term_agg.rs +++ b/src/aggregation/bucket/term_agg.rs @@ -1,6 +1,6 @@ use std::fmt::Debug; -use columnar::{Cardinality, ColumnType}; +use columnar::ColumnType; use rustc_hash::FxHashMap; use serde::{Deserialize, Serialize}; @@ -210,7 +210,10 @@ struct TermBuckets { } impl TermBuckets { - fn force_flush(&mut self, agg_with_accessor: &AggregationsWithAccessor) -> crate::Result<()> { + fn force_flush( + &mut self, + agg_with_accessor: &mut AggregationsWithAccessor, + ) -> crate::Result<()> { for sub_aggregations in &mut self.sub_aggs.values_mut() { sub_aggregations.as_mut().flush(agg_with_accessor)?; } @@ -228,7 +231,6 @@ pub struct SegmentTermCollector { blueprint: Option>, field_type: ColumnType, accessor_idx: usize, - val_cache: Vec, } pub(crate) fn get_agg_name_and_property(name: &str) -> (&str, &str) { @@ -257,7 +259,7 @@ impl SegmentAggregationCollector for SegmentTermCollector { fn collect( &mut self, doc: crate::DocId, - agg_with_accessor: &AggregationsWithAccessor, + agg_with_accessor: &mut AggregationsWithAccessor, ) -> crate::Result<()> { self.collect_block(&[doc], agg_with_accessor) } @@ -266,53 +268,34 @@ impl SegmentAggregationCollector for SegmentTermCollector { fn collect_block( &mut self, docs: &[crate::DocId], - agg_with_accessor: &AggregationsWithAccessor, + agg_with_accessor: &mut AggregationsWithAccessor, ) -> crate::Result<()> { - let accessor = &agg_with_accessor.buckets.values[self.accessor_idx].accessor; - let sub_aggregation_accessor = - &agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation; + let bucket_agg_accessor = &mut agg_with_accessor.buckets.values[self.accessor_idx]; - if accessor.get_cardinality() == Cardinality::Full { - self.val_cache.resize(docs.len(), 0); - accessor.values.get_vals(docs, &mut self.val_cache); - for term_id in self.val_cache.iter().cloned() { - let entry = self.term_buckets.entries.entry(term_id).or_default(); - *entry += 1; - } - // has subagg - if let Some(blueprint) = self.blueprint.as_ref() { - for (doc, term_id) in docs.iter().zip(self.val_cache.iter().cloned()) { - let sub_aggregations = self - .term_buckets - .sub_aggs - .entry(term_id) - .or_insert_with(|| blueprint.clone()); - sub_aggregations.collect(*doc, sub_aggregation_accessor)?; - } - } - } else { - for doc in docs { - for term_id in accessor.values_for_doc(*doc) { - let entry = self.term_buckets.entries.entry(term_id).or_default(); - *entry += 1; - // TODO: check if seperate loop is faster (may depend on the codec) - if let Some(blueprint) = self.blueprint.as_ref() { - let sub_aggregations = self - .term_buckets - .sub_aggs - .entry(term_id) - .or_insert_with(|| blueprint.clone()); - sub_aggregations.collect(*doc, sub_aggregation_accessor)?; - } - } + bucket_agg_accessor + .column_block_accessor + .fetch_block(docs, &bucket_agg_accessor.accessor); + for term_id in bucket_agg_accessor.column_block_accessor.iter_vals() { + let entry = self.term_buckets.entries.entry(term_id).or_default(); + *entry += 1; + } + // has subagg + if let Some(blueprint) = self.blueprint.as_ref() { + for (doc, term_id) in bucket_agg_accessor.column_block_accessor.iter_docid_vals() { + let sub_aggregations = self + .term_buckets + .sub_aggs + .entry(term_id) + .or_insert_with(|| blueprint.clone()); + sub_aggregations.collect(doc, &mut bucket_agg_accessor.sub_aggregation)?; } } Ok(()) } - fn flush(&mut self, agg_with_accessor: &AggregationsWithAccessor) -> crate::Result<()> { + fn flush(&mut self, agg_with_accessor: &mut AggregationsWithAccessor) -> crate::Result<()> { let sub_aggregation_accessor = - &agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation; + &mut agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation; self.term_buckets.force_flush(sub_aggregation_accessor)?; Ok(()) @@ -356,7 +339,6 @@ impl SegmentTermCollector { blueprint, field_type, accessor_idx, - val_cache: Default::default(), }) } diff --git a/src/aggregation/buf_collector.rs b/src/aggregation/buf_collector.rs index a5ae8b6da..d8ec399bc 100644 --- a/src/aggregation/buf_collector.rs +++ b/src/aggregation/buf_collector.rs @@ -46,7 +46,7 @@ impl SegmentAggregationCollector for BufAggregationCollector { fn collect( &mut self, doc: crate::DocId, - agg_with_accessor: &AggregationsWithAccessor, + agg_with_accessor: &mut AggregationsWithAccessor, ) -> crate::Result<()> { self.staged_docs[self.num_staged_docs] = doc; self.num_staged_docs += 1; @@ -62,7 +62,7 @@ impl SegmentAggregationCollector for BufAggregationCollector { fn collect_block( &mut self, docs: &[crate::DocId], - agg_with_accessor: &AggregationsWithAccessor, + agg_with_accessor: &mut AggregationsWithAccessor, ) -> crate::Result<()> { self.collector.collect_block(docs, agg_with_accessor)?; @@ -70,7 +70,7 @@ impl SegmentAggregationCollector for BufAggregationCollector { } #[inline] - fn flush(&mut self, agg_with_accessor: &AggregationsWithAccessor) -> crate::Result<()> { + fn flush(&mut self, agg_with_accessor: &mut AggregationsWithAccessor) -> crate::Result<()> { self.collector .collect_block(&self.staged_docs[..self.num_staged_docs], agg_with_accessor)?; self.num_staged_docs = 0; diff --git a/src/aggregation/collector.rs b/src/aggregation/collector.rs index 8029f0b0b..5840db6f2 100644 --- a/src/aggregation/collector.rs +++ b/src/aggregation/collector.rs @@ -156,7 +156,10 @@ impl SegmentCollector for AggregationSegmentCollector { if self.error.is_some() { return; } - if let Err(err) = self.agg_collector.collect(doc, &self.aggs_with_accessor) { + if let Err(err) = self + .agg_collector + .collect(doc, &mut self.aggs_with_accessor) + { self.error = Some(err); } } @@ -170,7 +173,7 @@ impl SegmentCollector for AggregationSegmentCollector { } if let Err(err) = self .agg_collector - .collect_block(docs, &self.aggs_with_accessor) + .collect_block(docs, &mut self.aggs_with_accessor) { self.error = Some(err); } @@ -180,7 +183,7 @@ impl SegmentCollector for AggregationSegmentCollector { if let Some(err) = self.error { return Err(err); } - self.agg_collector.flush(&self.aggs_with_accessor)?; + self.agg_collector.flush(&mut self.aggs_with_accessor)?; Box::new(self.agg_collector).into_intermediate_aggregations_result(&self.aggs_with_accessor) } } diff --git a/src/aggregation/metric/stats.rs b/src/aggregation/metric/stats.rs index b5ee41a1a..6af31061a 100644 --- a/src/aggregation/metric/stats.rs +++ b/src/aggregation/metric/stats.rs @@ -1,8 +1,10 @@ -use columnar::{Cardinality, Column, ColumnType}; +use columnar::ColumnType; use serde::{Deserialize, Serialize}; use super::*; -use crate::aggregation::agg_req_with_accessor::AggregationsWithAccessor; +use crate::aggregation::agg_req_with_accessor::{ + AggregationsWithAccessor, MetricAggregationWithAccessor, +}; use crate::aggregation::intermediate_agg_result::{ IntermediateAggregationResults, IntermediateMetricResult, }; @@ -174,21 +176,18 @@ impl SegmentStatsCollector { } } #[inline] - pub(crate) fn collect_block_with_field(&mut self, docs: &[DocId], field: &Column) { - if field.get_cardinality() == Cardinality::Full { - self.val_cache.resize(docs.len(), 0); - field.values.get_vals(docs, &mut self.val_cache); - for val in self.val_cache.iter() { - let val1 = f64_from_fastfield_u64(*val, &self.field_type); - self.stats.collect(val1); - } - } else { - for doc in docs { - for val in field.values_for_doc(*doc) { - let val1 = f64_from_fastfield_u64(val, &self.field_type); - self.stats.collect(val1); - } - } + pub(crate) fn collect_block_with_field( + &mut self, + docs: &[DocId], + agg_accessor: &mut MetricAggregationWithAccessor, + ) { + agg_accessor + .column_block_accessor + .fetch_block(docs, &agg_accessor.accessor); + + for val in agg_accessor.column_block_accessor.iter_vals() { + let val1 = f64_from_fastfield_u64(val, &self.field_type); + self.stats.collect(val1); } } } @@ -235,7 +234,7 @@ impl SegmentAggregationCollector for SegmentStatsCollector { fn collect( &mut self, doc: crate::DocId, - agg_with_accessor: &AggregationsWithAccessor, + agg_with_accessor: &mut AggregationsWithAccessor, ) -> crate::Result<()> { let field = &agg_with_accessor.metrics.values[self.accessor_idx].accessor; @@ -251,9 +250,9 @@ impl SegmentAggregationCollector for SegmentStatsCollector { fn collect_block( &mut self, docs: &[crate::DocId], - agg_with_accessor: &AggregationsWithAccessor, + agg_with_accessor: &mut AggregationsWithAccessor, ) -> crate::Result<()> { - let field = &agg_with_accessor.metrics.values[self.accessor_idx].accessor; + let field = &mut agg_with_accessor.metrics.values[self.accessor_idx]; self.collect_block_with_field(docs, field); Ok(()) } diff --git a/src/aggregation/segment_agg_result.rs b/src/aggregation/segment_agg_result.rs index 6b8400119..c159825ff 100644 --- a/src/aggregation/segment_agg_result.rs +++ b/src/aggregation/segment_agg_result.rs @@ -28,18 +28,18 @@ pub(crate) trait SegmentAggregationCollector: CollectorClone + Debug { fn collect( &mut self, doc: crate::DocId, - agg_with_accessor: &AggregationsWithAccessor, + agg_with_accessor: &mut AggregationsWithAccessor, ) -> crate::Result<()>; fn collect_block( &mut self, docs: &[crate::DocId], - agg_with_accessor: &AggregationsWithAccessor, + agg_with_accessor: &mut AggregationsWithAccessor, ) -> crate::Result<()>; /// Finalize method. Some Aggregator collect blocks of docs before calling `collect_block`. /// This method ensures those staged docs will be collected. - fn flush(&mut self, _agg_with_accessor: &AggregationsWithAccessor) -> crate::Result<()> { + fn flush(&mut self, _agg_with_accessor: &mut AggregationsWithAccessor) -> crate::Result<()> { Ok(()) } } @@ -206,7 +206,7 @@ impl SegmentAggregationCollector for GenericSegmentAggregationResultsCollector { fn collect( &mut self, doc: crate::DocId, - agg_with_accessor: &AggregationsWithAccessor, + agg_with_accessor: &mut AggregationsWithAccessor, ) -> crate::Result<()> { self.collect_block(&[doc], agg_with_accessor)?; @@ -216,7 +216,7 @@ impl SegmentAggregationCollector for GenericSegmentAggregationResultsCollector { fn collect_block( &mut self, docs: &[crate::DocId], - agg_with_accessor: &AggregationsWithAccessor, + agg_with_accessor: &mut AggregationsWithAccessor, ) -> crate::Result<()> { if let Some(metrics) = self.metrics.as_mut() { for collector in metrics { @@ -233,7 +233,7 @@ impl SegmentAggregationCollector for GenericSegmentAggregationResultsCollector { Ok(()) } - fn flush(&mut self, agg_with_accessor: &AggregationsWithAccessor) -> crate::Result<()> { + fn flush(&mut self, agg_with_accessor: &mut AggregationsWithAccessor) -> crate::Result<()> { if let Some(metrics) = &mut self.metrics { for collector in metrics { collector.flush(agg_with_accessor)?;