fix: add metrics to partition metrics list

This commit is contained in:
evenyag
2025-04-11 13:22:38 +08:00
parent bfdaa28b25
commit d7b97fc877

View File

@@ -41,7 +41,7 @@ use crate::error::{
};
use crate::read::range::RangeBuilderList;
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::PartitionMetrics;
use crate::read::scan_util::{PartitionMetrics, PartitionMetricsList};
use crate::read::seq_scan::{build_sources, SeqScan};
use crate::read::{Batch, ScannerMetrics};
@@ -63,6 +63,9 @@ pub struct SeriesScan {
stream_ctx: Arc<StreamContext>,
/// Receivers of each partition.
receivers: Mutex<ReceiverList>,
/// Metrics for each partition.
/// The scanner only sets in query and keeps it empty during compaction.
metrics_list: Arc<PartitionMetricsList>,
}
impl SeriesScan {
@@ -78,6 +81,7 @@ impl SeriesScan {
properties,
stream_ctx,
receivers: Mutex::new(Vec::new()),
metrics_list: Arc::new(PartitionMetricsList::default()),
}
}
@@ -96,9 +100,10 @@ impl SeriesScan {
));
}
self.maybe_start_distributor(metrics_set);
self.maybe_start_distributor(metrics_set, &self.metrics_list);
let part_metrics = new_partition_metrics(&self.stream_ctx, metrics_set, partition);
let part_metrics =
new_partition_metrics(&self.stream_ctx, metrics_set, partition, &self.metrics_list);
let mut receiver = self.take_receiver(partition).map_err(BoxedError::new)?;
let stream_ctx = self.stream_ctx.clone();
@@ -161,7 +166,11 @@ impl SeriesScan {
}
/// Starts the distributor if the receiver list is empty.
fn maybe_start_distributor(&self, metrics_set: &ExecutionPlanMetricsSet) {
fn maybe_start_distributor(
&self,
metrics_set: &ExecutionPlanMetricsSet,
metrics_list: &Arc<PartitionMetricsList>,
) {
let mut rx_list = self.receivers.lock().unwrap();
if !rx_list.is_empty() {
return;
@@ -174,6 +183,7 @@ impl SeriesScan {
partitions: self.properties.partitions.clone(),
senders,
metrics_set: metrics_set.clone(),
metrics_list: metrics_list.clone(),
};
common_runtime::spawn_global(async move {
distributor.execute().await;
@@ -297,6 +307,7 @@ struct SeriesDistributor {
/// get per-partition metrics in verbose mode to see the metrics of the
/// distributor.
metrics_set: ExecutionPlanMetricsSet,
metrics_list: Arc<PartitionMetricsList>,
}
impl SeriesDistributor {
@@ -309,8 +320,12 @@ impl SeriesDistributor {
/// Scans all parts.
async fn scan_partitions(&mut self) -> Result<()> {
let part_metrics =
new_partition_metrics(&self.stream_ctx, &self.metrics_set, self.partitions.len());
let part_metrics = new_partition_metrics(
&self.stream_ctx,
&self.metrics_set,
self.partitions.len(),
&self.metrics_list,
);
part_metrics.on_first_poll();
let range_builder_list = Arc::new(RangeBuilderList::new(
@@ -525,12 +540,16 @@ fn new_partition_metrics(
stream_ctx: &StreamContext,
metrics_set: &ExecutionPlanMetricsSet,
partition: usize,
metrics_list: &PartitionMetricsList,
) -> PartitionMetrics {
PartitionMetrics::new(
let metrics = PartitionMetrics::new(
stream_ctx.input.mapper.metadata().region_id,
partition,
"SeriesScan",
stream_ctx.query_start,
metrics_set,
)
);
metrics_list.set(partition, metrics.clone());
metrics
}