From 68414bf593b953509249c29b118fd5271e2cbfea Mon Sep 17 00:00:00 2001 From: evenyag Date: Tue, 15 Apr 2025 16:30:47 +0800 Subject: [PATCH] feat: metrics for send series timeout --- src/mito2/src/read/scan_util.rs | 12 +++++++++++- src/mito2/src/read/series_scan.rs | 5 +++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs index 36899796b0..2f9e2fcffc 100644 --- a/src/mito2/src/read/scan_util.rs +++ b/src/mito2/src/read/scan_util.rs @@ -92,6 +92,8 @@ struct ScanMetricsSet { /// Elapsed time before the first poll operation. first_poll: Duration, + /// Number of send timeout in SeriesScan. + num_series_send_timeout: usize, } impl fmt::Debug for ScanMetricsSet { @@ -122,6 +124,7 @@ impl fmt::Debug for ScanMetricsSet { num_sst_batches, num_sst_rows, first_poll, + num_series_send_timeout, } = self; write!( @@ -150,7 +153,8 @@ impl fmt::Debug for ScanMetricsSet { num_sst_record_batches={num_sst_record_batches}, \ num_sst_batches={num_sst_batches}, \ num_sst_rows={num_sst_rows}, \ - first_poll={first_poll:?}}}" + first_poll={first_poll:?}, \ + num_series_send_timeout={num_series_send_timeout}}}" ) } } @@ -439,6 +443,12 @@ impl PartitionMetrics { pub(crate) fn on_finish(&self) { self.0.on_finish(); } + + /// Sets the `num_series_send_timeout`. + pub(crate) fn set_num_series_send_timeout(&self, num_timeout: usize) { + let mut metrics = self.0.metrics.lock().unwrap(); + metrics.num_series_send_timeout = num_timeout; + } } impl fmt::Debug for PartitionMetrics { diff --git a/src/mito2/src/read/series_scan.rs b/src/mito2/src/read/series_scan.rs index d865340932..df5676b41d 100644 --- a/src/mito2/src/read/series_scan.rs +++ b/src/mito2/src/read/series_scan.rs @@ -395,6 +395,7 @@ impl SeriesDistributor { metrics.scan_cost += fetch_start.elapsed(); part_metrics.merge_metrics(&metrics); + part_metrics.set_num_series_send_timeout(self.senders.num_timeout); part_metrics.on_finish(); @@ -437,6 +438,8 @@ struct SenderList { num_nones: usize, /// Index of the current partition to send. sender_idx: usize, + /// Number of timeout. + num_timeout: usize, } impl SenderList { @@ -446,6 +449,7 @@ impl SenderList { senders, num_nones, sender_idx: 0, + num_timeout: 0, } } @@ -505,6 +509,7 @@ impl SenderList { match sender.send_timeout(Ok(batch), SEND_TIMEOUT).await { Ok(()) => break, Err(SendTimeoutError::Timeout(res)) => { + self.num_timeout += 1; // Safety: we send Ok. batch = res.unwrap(); }