fetch blocks of vals in aggregation for all cardinality (#1950)

* fetch blocks of vals in aggregation for all cardinality

* move caching in common accessor
This commit is contained in:
PSeitz
2023-03-23 15:41:11 +08:00
committed by GitHub
parent 5504cfd012
commit da2804644f
12 changed files with 202 additions and 138 deletions

View File

@@ -0,0 +1,36 @@
use crate::{Column, DocId, RowId};
#[derive(Debug, Default, Clone)]
pub struct ColumnBlockAccessor<T> {
val_cache: Vec<T>,
docid_cache: Vec<DocId>,
row_id_cache: Vec<RowId>,
}
impl<T: PartialOrd + Copy + std::fmt::Debug + Send + Sync + 'static + Default>
ColumnBlockAccessor<T>
{
#[inline]
pub fn fetch_block(&mut self, docs: &[u32], accessor: &Column<T>) {
self.docid_cache.clear();
self.row_id_cache.clear();
accessor.row_ids_for_docs(docs, &mut self.docid_cache, &mut self.row_id_cache);
self.val_cache.resize(self.row_id_cache.len(), T::default());
accessor
.values
.get_vals(&self.row_id_cache, &mut self.val_cache);
}
#[inline]
pub fn iter_vals(&self) -> impl Iterator<Item = T> + '_ {
self.val_cache.iter().cloned()
}
#[inline]
pub fn iter_docid_vals(&self) -> impl Iterator<Item = (DocId, T)> + '_ {
self.docid_cache
.iter()
.cloned()
.zip(self.val_cache.iter().cloned())
}
}

View File

@@ -16,7 +16,7 @@ pub use serialize::{
use crate::column_index::ColumnIndex;
use crate::column_values::monotonic_mapping::StrictlyMonotonicMappingToInternal;
use crate::column_values::{monotonic_map_column, ColumnValues};
use crate::{Cardinality, MonotonicallyMappableToU64, RowId};
use crate::{Cardinality, DocId, MonotonicallyMappableToU64, RowId};
#[derive(Clone)]
pub struct Column<T = u64> {
@@ -68,8 +68,25 @@ impl<T: PartialOrd + Copy + Debug + Send + Sync + 'static> Column<T> {
self.values_for_doc(row_id).next()
}
pub fn values_for_doc(&self, row_id: RowId) -> impl Iterator<Item = T> + '_ {
self.value_row_ids(row_id)
/// Translates a block of docis to row_ids.
///
/// returns the row_ids and the matching docids on the same index
/// e.g.
/// DocId In: [0, 5, 6]
/// DocId Out: [0, 0, 6, 6]
/// RowId Out: [0, 1, 2, 3]
#[inline]
pub fn row_ids_for_docs(
&self,
doc_ids: &[DocId],
doc_ids_out: &mut Vec<DocId>,
row_ids: &mut Vec<RowId>,
) {
self.idx.docids_to_rowids(doc_ids, doc_ids_out, row_ids)
}
pub fn values_for_doc(&self, doc_id: DocId) -> impl Iterator<Item = T> + '_ {
self.value_row_ids(doc_id)
.map(|value_row_id: RowId| self.values.get_val(value_row_id))
}

View File

@@ -74,6 +74,45 @@ impl ColumnIndex {
}
}
/// Translates a block of docis to row_ids.
///
/// returns the row_ids and the matching docids on the same index
/// e.g.
/// DocId In: [0, 5, 6]
/// DocId Out: [0, 0, 6, 6]
/// RowId Out: [0, 1, 2, 3]
#[inline]
pub fn docids_to_rowids(
&self,
doc_ids: &[DocId],
doc_ids_out: &mut Vec<DocId>,
row_ids: &mut Vec<RowId>,
) {
match self {
ColumnIndex::Empty { .. } => {}
ColumnIndex::Full => {
doc_ids_out.extend_from_slice(doc_ids);
row_ids.extend_from_slice(doc_ids);
}
ColumnIndex::Optional(optional_index) => {
for doc_id in doc_ids {
if let Some(row_id) = optional_index.rank_if_exists(*doc_id) {
doc_ids_out.push(*doc_id);
row_ids.push(row_id);
}
}
}
ColumnIndex::Multivalued(multivalued_index) => {
for doc_id in doc_ids {
for row_id in multivalued_index.range(*doc_id) {
doc_ids_out.push(*doc_id);
row_ids.push(row_id);
}
}
}
}
}
pub fn docid_range_to_rowids(&self, doc_id: Range<DocId>) -> Range<RowId> {
match self {
ColumnIndex::Empty { .. } => 0..0,

View File

@@ -9,6 +9,7 @@ extern crate test;
use std::io;
mod block_accessor;
mod column;
mod column_index;
pub mod column_values;
@@ -19,6 +20,7 @@ mod iterable;
pub(crate) mod utils;
mod value;
pub use block_accessor::ColumnBlockAccessor;
pub use column::{BytesColumn, Column, StrColumn};
pub use column_index::ColumnIndex;
pub use column_values::{ColumnValues, MonotonicallyMappableToU128, MonotonicallyMappableToU64};

View File

@@ -2,7 +2,7 @@
use std::sync::Arc;
use columnar::{Column, ColumnType, ColumnValues, StrColumn};
use columnar::{Column, ColumnBlockAccessor, ColumnType, ColumnValues, StrColumn};
use super::agg_req::{Aggregation, Aggregations, BucketAggregationType, MetricAggregation};
use super::bucket::{
@@ -45,6 +45,7 @@ pub struct BucketAggregationWithAccessor {
pub(crate) bucket_agg: BucketAggregationType,
pub(crate) sub_aggregation: AggregationsWithAccessor,
pub(crate) limits: AggregationLimits,
pub(crate) column_block_accessor: ColumnBlockAccessor<u64>,
}
impl BucketAggregationWithAccessor {
@@ -85,6 +86,7 @@ impl BucketAggregationWithAccessor {
bucket_agg: bucket.clone(),
str_dict_column,
limits,
column_block_accessor: Default::default(),
})
}
}
@@ -95,6 +97,7 @@ pub struct MetricAggregationWithAccessor {
pub metric: MetricAggregation,
pub field_type: ColumnType,
pub accessor: Column<u64>,
pub column_block_accessor: ColumnBlockAccessor<u64>,
}
impl MetricAggregationWithAccessor {
@@ -115,6 +118,7 @@ impl MetricAggregationWithAccessor {
accessor,
field_type,
metric: metric.clone(),
column_block_accessor: Default::default(),
})
}
}

View File

@@ -20,7 +20,7 @@ use crate::aggregation::segment_agg_result::{
build_segment_agg_collector, AggregationLimits, SegmentAggregationCollector,
};
use crate::aggregation::{f64_from_fastfield_u64, format_date, VecWithNames};
use crate::{DocId, TantivyError};
use crate::TantivyError;
/// Histogram is a bucket aggregation, where buckets are created dynamically for given `interval`.
/// Each document value is rounded down to its bucket.
@@ -235,7 +235,7 @@ impl SegmentAggregationCollector for SegmentHistogramCollector {
fn collect(
&mut self,
doc: crate::DocId,
agg_with_accessor: &AggregationsWithAccessor,
agg_with_accessor: &mut AggregationsWithAccessor,
) -> crate::Result<()> {
self.collect_block(&[doc], agg_with_accessor)
}
@@ -244,11 +244,9 @@ impl SegmentAggregationCollector for SegmentHistogramCollector {
fn collect_block(
&mut self,
docs: &[crate::DocId],
agg_with_accessor: &AggregationsWithAccessor,
agg_with_accessor: &mut AggregationsWithAccessor,
) -> crate::Result<()> {
let accessor = &agg_with_accessor.buckets.values[self.accessor_idx].accessor;
let sub_aggregation_accessor =
&agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation;
let bucket_agg_accessor = &mut agg_with_accessor.buckets.values[self.accessor_idx];
let mem_pre = self.get_memory_consumption();
@@ -257,20 +255,26 @@ impl SegmentAggregationCollector for SegmentHistogramCollector {
let offset = self.offset;
let get_bucket_pos = |val| (get_bucket_pos_f64(val, interval, offset) as i64);
for doc in docs {
for val in accessor.values_for_doc(*doc) {
let val = self.f64_from_fastfield_u64(val);
bucket_agg_accessor
.column_block_accessor
.fetch_block(docs, &bucket_agg_accessor.accessor);
let bucket_pos = get_bucket_pos(val);
for (doc, val) in bucket_agg_accessor.column_block_accessor.iter_docid_vals() {
let val = self.f64_from_fastfield_u64(val);
if bounds.contains(val) {
self.increment_bucket(
bucket_pos,
*doc,
sub_aggregation_accessor,
interval,
offset,
)?;
let bucket_pos = get_bucket_pos(val);
if bounds.contains(val) {
let bucket = self.buckets.entry(bucket_pos).or_insert_with(|| {
let key = get_bucket_key_from_pos(bucket_pos as f64, interval, offset);
SegmentHistogramBucketEntry { key, doc_count: 0 }
});
bucket.doc_count += 1;
if let Some(sub_aggregation_blueprint) = self.sub_aggregation_blueprint.as_mut() {
self.sub_aggregations
.entry(bucket_pos)
.or_insert_with(|| sub_aggregation_blueprint.clone())
.collect(doc, &mut bucket_agg_accessor.sub_aggregation)?;
}
}
}
@@ -283,9 +287,9 @@ impl SegmentAggregationCollector for SegmentHistogramCollector {
Ok(())
}
fn flush(&mut self, agg_with_accessor: &AggregationsWithAccessor) -> crate::Result<()> {
fn flush(&mut self, agg_with_accessor: &mut AggregationsWithAccessor) -> crate::Result<()> {
let sub_aggregation_accessor =
&agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation;
&mut agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation;
for sub_aggregation in self.sub_aggregations.values_mut() {
sub_aggregation.flush(sub_aggregation_accessor)?;
@@ -360,29 +364,6 @@ impl SegmentHistogramCollector {
})
}
#[inline]
fn increment_bucket(
&mut self,
bucket_pos: i64,
doc: DocId,
bucket_with_accessor: &AggregationsWithAccessor,
interval: f64,
offset: f64,
) -> crate::Result<()> {
let bucket = self.buckets.entry(bucket_pos).or_insert_with(|| {
let key = get_bucket_key_from_pos(bucket_pos as f64, interval, offset);
SegmentHistogramBucketEntry { key, doc_count: 0 }
});
bucket.doc_count += 1;
if let Some(sub_aggregation_blueprint) = self.sub_aggregation_blueprint.as_mut() {
self.sub_aggregations
.entry(bucket_pos)
.or_insert_with(|| sub_aggregation_blueprint.clone())
.collect(doc, bucket_with_accessor)?;
}
Ok(())
}
#[inline]
fn f64_from_fastfield_u64(&self, val: u64) -> f64 {
f64_from_fastfield_u64(val, &self.column_type)

View File

@@ -212,7 +212,7 @@ impl SegmentAggregationCollector for SegmentRangeCollector {
fn collect(
&mut self,
doc: crate::DocId,
agg_with_accessor: &AggregationsWithAccessor,
agg_with_accessor: &mut AggregationsWithAccessor,
) -> crate::Result<()> {
self.collect_block(&[doc], agg_with_accessor)
}
@@ -221,30 +221,31 @@ impl SegmentAggregationCollector for SegmentRangeCollector {
fn collect_block(
&mut self,
docs: &[crate::DocId],
agg_with_accessor: &AggregationsWithAccessor,
agg_with_accessor: &mut AggregationsWithAccessor,
) -> crate::Result<()> {
let accessor = &agg_with_accessor.buckets.values[self.accessor_idx].accessor;
let sub_aggregation_accessor =
&agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation;
for doc in docs {
for val in accessor.values_for_doc(*doc) {
let bucket_pos = self.get_bucket_pos(val);
let bucket_agg_accessor = &mut agg_with_accessor.buckets.values[self.accessor_idx];
let bucket = &mut self.buckets[bucket_pos];
bucket_agg_accessor
.column_block_accessor
.fetch_block(docs, &bucket_agg_accessor.accessor);
bucket.bucket.doc_count += 1;
if let Some(sub_aggregation) = &mut bucket.bucket.sub_aggregation {
sub_aggregation.collect(*doc, sub_aggregation_accessor)?;
}
for (doc, val) in bucket_agg_accessor.column_block_accessor.iter_docid_vals() {
let bucket_pos = self.get_bucket_pos(val);
let bucket = &mut self.buckets[bucket_pos];
bucket.bucket.doc_count += 1;
if let Some(sub_aggregation) = &mut bucket.bucket.sub_aggregation {
sub_aggregation.collect(doc, &mut bucket_agg_accessor.sub_aggregation)?;
}
}
Ok(())
}
fn flush(&mut self, agg_with_accessor: &AggregationsWithAccessor) -> crate::Result<()> {
fn flush(&mut self, agg_with_accessor: &mut AggregationsWithAccessor) -> crate::Result<()> {
let sub_aggregation_accessor =
&agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation;
&mut agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation;
for bucket in self.buckets.iter_mut() {
if let Some(sub_agg) = bucket.bucket.sub_aggregation.as_mut() {

View File

@@ -1,6 +1,6 @@
use std::fmt::Debug;
use columnar::{Cardinality, ColumnType};
use columnar::ColumnType;
use rustc_hash::FxHashMap;
use serde::{Deserialize, Serialize};
@@ -210,7 +210,10 @@ struct TermBuckets {
}
impl TermBuckets {
fn force_flush(&mut self, agg_with_accessor: &AggregationsWithAccessor) -> crate::Result<()> {
fn force_flush(
&mut self,
agg_with_accessor: &mut AggregationsWithAccessor,
) -> crate::Result<()> {
for sub_aggregations in &mut self.sub_aggs.values_mut() {
sub_aggregations.as_mut().flush(agg_with_accessor)?;
}
@@ -228,7 +231,6 @@ pub struct SegmentTermCollector {
blueprint: Option<Box<dyn SegmentAggregationCollector>>,
field_type: ColumnType,
accessor_idx: usize,
val_cache: Vec<u64>,
}
pub(crate) fn get_agg_name_and_property(name: &str) -> (&str, &str) {
@@ -257,7 +259,7 @@ impl SegmentAggregationCollector for SegmentTermCollector {
fn collect(
&mut self,
doc: crate::DocId,
agg_with_accessor: &AggregationsWithAccessor,
agg_with_accessor: &mut AggregationsWithAccessor,
) -> crate::Result<()> {
self.collect_block(&[doc], agg_with_accessor)
}
@@ -266,53 +268,34 @@ impl SegmentAggregationCollector for SegmentTermCollector {
fn collect_block(
&mut self,
docs: &[crate::DocId],
agg_with_accessor: &AggregationsWithAccessor,
agg_with_accessor: &mut AggregationsWithAccessor,
) -> crate::Result<()> {
let accessor = &agg_with_accessor.buckets.values[self.accessor_idx].accessor;
let sub_aggregation_accessor =
&agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation;
let bucket_agg_accessor = &mut agg_with_accessor.buckets.values[self.accessor_idx];
if accessor.get_cardinality() == Cardinality::Full {
self.val_cache.resize(docs.len(), 0);
accessor.values.get_vals(docs, &mut self.val_cache);
for term_id in self.val_cache.iter().cloned() {
let entry = self.term_buckets.entries.entry(term_id).or_default();
*entry += 1;
}
// has subagg
if let Some(blueprint) = self.blueprint.as_ref() {
for (doc, term_id) in docs.iter().zip(self.val_cache.iter().cloned()) {
let sub_aggregations = self
.term_buckets
.sub_aggs
.entry(term_id)
.or_insert_with(|| blueprint.clone());
sub_aggregations.collect(*doc, sub_aggregation_accessor)?;
}
}
} else {
for doc in docs {
for term_id in accessor.values_for_doc(*doc) {
let entry = self.term_buckets.entries.entry(term_id).or_default();
*entry += 1;
// TODO: check if seperate loop is faster (may depend on the codec)
if let Some(blueprint) = self.blueprint.as_ref() {
let sub_aggregations = self
.term_buckets
.sub_aggs
.entry(term_id)
.or_insert_with(|| blueprint.clone());
sub_aggregations.collect(*doc, sub_aggregation_accessor)?;
}
}
bucket_agg_accessor
.column_block_accessor
.fetch_block(docs, &bucket_agg_accessor.accessor);
for term_id in bucket_agg_accessor.column_block_accessor.iter_vals() {
let entry = self.term_buckets.entries.entry(term_id).or_default();
*entry += 1;
}
// has subagg
if let Some(blueprint) = self.blueprint.as_ref() {
for (doc, term_id) in bucket_agg_accessor.column_block_accessor.iter_docid_vals() {
let sub_aggregations = self
.term_buckets
.sub_aggs
.entry(term_id)
.or_insert_with(|| blueprint.clone());
sub_aggregations.collect(doc, &mut bucket_agg_accessor.sub_aggregation)?;
}
}
Ok(())
}
fn flush(&mut self, agg_with_accessor: &AggregationsWithAccessor) -> crate::Result<()> {
fn flush(&mut self, agg_with_accessor: &mut AggregationsWithAccessor) -> crate::Result<()> {
let sub_aggregation_accessor =
&agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation;
&mut agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation;
self.term_buckets.force_flush(sub_aggregation_accessor)?;
Ok(())
@@ -356,7 +339,6 @@ impl SegmentTermCollector {
blueprint,
field_type,
accessor_idx,
val_cache: Default::default(),
})
}

View File

@@ -46,7 +46,7 @@ impl SegmentAggregationCollector for BufAggregationCollector {
fn collect(
&mut self,
doc: crate::DocId,
agg_with_accessor: &AggregationsWithAccessor,
agg_with_accessor: &mut AggregationsWithAccessor,
) -> crate::Result<()> {
self.staged_docs[self.num_staged_docs] = doc;
self.num_staged_docs += 1;
@@ -62,7 +62,7 @@ impl SegmentAggregationCollector for BufAggregationCollector {
fn collect_block(
&mut self,
docs: &[crate::DocId],
agg_with_accessor: &AggregationsWithAccessor,
agg_with_accessor: &mut AggregationsWithAccessor,
) -> crate::Result<()> {
self.collector.collect_block(docs, agg_with_accessor)?;
@@ -70,7 +70,7 @@ impl SegmentAggregationCollector for BufAggregationCollector {
}
#[inline]
fn flush(&mut self, agg_with_accessor: &AggregationsWithAccessor) -> crate::Result<()> {
fn flush(&mut self, agg_with_accessor: &mut AggregationsWithAccessor) -> crate::Result<()> {
self.collector
.collect_block(&self.staged_docs[..self.num_staged_docs], agg_with_accessor)?;
self.num_staged_docs = 0;

View File

@@ -156,7 +156,10 @@ impl SegmentCollector for AggregationSegmentCollector {
if self.error.is_some() {
return;
}
if let Err(err) = self.agg_collector.collect(doc, &self.aggs_with_accessor) {
if let Err(err) = self
.agg_collector
.collect(doc, &mut self.aggs_with_accessor)
{
self.error = Some(err);
}
}
@@ -170,7 +173,7 @@ impl SegmentCollector for AggregationSegmentCollector {
}
if let Err(err) = self
.agg_collector
.collect_block(docs, &self.aggs_with_accessor)
.collect_block(docs, &mut self.aggs_with_accessor)
{
self.error = Some(err);
}
@@ -180,7 +183,7 @@ impl SegmentCollector for AggregationSegmentCollector {
if let Some(err) = self.error {
return Err(err);
}
self.agg_collector.flush(&self.aggs_with_accessor)?;
self.agg_collector.flush(&mut self.aggs_with_accessor)?;
Box::new(self.agg_collector).into_intermediate_aggregations_result(&self.aggs_with_accessor)
}
}

View File

@@ -1,8 +1,10 @@
use columnar::{Cardinality, Column, ColumnType};
use columnar::ColumnType;
use serde::{Deserialize, Serialize};
use super::*;
use crate::aggregation::agg_req_with_accessor::AggregationsWithAccessor;
use crate::aggregation::agg_req_with_accessor::{
AggregationsWithAccessor, MetricAggregationWithAccessor,
};
use crate::aggregation::intermediate_agg_result::{
IntermediateAggregationResults, IntermediateMetricResult,
};
@@ -174,21 +176,18 @@ impl SegmentStatsCollector {
}
}
#[inline]
pub(crate) fn collect_block_with_field(&mut self, docs: &[DocId], field: &Column<u64>) {
if field.get_cardinality() == Cardinality::Full {
self.val_cache.resize(docs.len(), 0);
field.values.get_vals(docs, &mut self.val_cache);
for val in self.val_cache.iter() {
let val1 = f64_from_fastfield_u64(*val, &self.field_type);
self.stats.collect(val1);
}
} else {
for doc in docs {
for val in field.values_for_doc(*doc) {
let val1 = f64_from_fastfield_u64(val, &self.field_type);
self.stats.collect(val1);
}
}
pub(crate) fn collect_block_with_field(
&mut self,
docs: &[DocId],
agg_accessor: &mut MetricAggregationWithAccessor,
) {
agg_accessor
.column_block_accessor
.fetch_block(docs, &agg_accessor.accessor);
for val in agg_accessor.column_block_accessor.iter_vals() {
let val1 = f64_from_fastfield_u64(val, &self.field_type);
self.stats.collect(val1);
}
}
}
@@ -235,7 +234,7 @@ impl SegmentAggregationCollector for SegmentStatsCollector {
fn collect(
&mut self,
doc: crate::DocId,
agg_with_accessor: &AggregationsWithAccessor,
agg_with_accessor: &mut AggregationsWithAccessor,
) -> crate::Result<()> {
let field = &agg_with_accessor.metrics.values[self.accessor_idx].accessor;
@@ -251,9 +250,9 @@ impl SegmentAggregationCollector for SegmentStatsCollector {
fn collect_block(
&mut self,
docs: &[crate::DocId],
agg_with_accessor: &AggregationsWithAccessor,
agg_with_accessor: &mut AggregationsWithAccessor,
) -> crate::Result<()> {
let field = &agg_with_accessor.metrics.values[self.accessor_idx].accessor;
let field = &mut agg_with_accessor.metrics.values[self.accessor_idx];
self.collect_block_with_field(docs, field);
Ok(())
}

View File

@@ -28,18 +28,18 @@ pub(crate) trait SegmentAggregationCollector: CollectorClone + Debug {
fn collect(
&mut self,
doc: crate::DocId,
agg_with_accessor: &AggregationsWithAccessor,
agg_with_accessor: &mut AggregationsWithAccessor,
) -> crate::Result<()>;
fn collect_block(
&mut self,
docs: &[crate::DocId],
agg_with_accessor: &AggregationsWithAccessor,
agg_with_accessor: &mut AggregationsWithAccessor,
) -> crate::Result<()>;
/// Finalize method. Some Aggregator collect blocks of docs before calling `collect_block`.
/// This method ensures those staged docs will be collected.
fn flush(&mut self, _agg_with_accessor: &AggregationsWithAccessor) -> crate::Result<()> {
fn flush(&mut self, _agg_with_accessor: &mut AggregationsWithAccessor) -> crate::Result<()> {
Ok(())
}
}
@@ -206,7 +206,7 @@ impl SegmentAggregationCollector for GenericSegmentAggregationResultsCollector {
fn collect(
&mut self,
doc: crate::DocId,
agg_with_accessor: &AggregationsWithAccessor,
agg_with_accessor: &mut AggregationsWithAccessor,
) -> crate::Result<()> {
self.collect_block(&[doc], agg_with_accessor)?;
@@ -216,7 +216,7 @@ impl SegmentAggregationCollector for GenericSegmentAggregationResultsCollector {
fn collect_block(
&mut self,
docs: &[crate::DocId],
agg_with_accessor: &AggregationsWithAccessor,
agg_with_accessor: &mut AggregationsWithAccessor,
) -> crate::Result<()> {
if let Some(metrics) = self.metrics.as_mut() {
for collector in metrics {
@@ -233,7 +233,7 @@ impl SegmentAggregationCollector for GenericSegmentAggregationResultsCollector {
Ok(())
}
fn flush(&mut self, agg_with_accessor: &AggregationsWithAccessor) -> crate::Result<()> {
fn flush(&mut self, agg_with_accessor: &mut AggregationsWithAccessor) -> crate::Result<()> {
if let Some(metrics) = &mut self.metrics {
for collector in metrics {
collector.flush(agg_with_accessor)?;