feat: metrics for send series timeout

This commit is contained in:
evenyag
2025-04-15 16:30:47 +08:00
parent 5e836a0d1b
commit 68414bf593
2 changed files with 16 additions and 1 deletions

View File

@@ -92,6 +92,8 @@ struct ScanMetricsSet {
/// Elapsed time before the first poll operation.
first_poll: Duration,
/// Number of send timeout in SeriesScan.
num_series_send_timeout: usize,
}
impl fmt::Debug for ScanMetricsSet {
@@ -122,6 +124,7 @@ impl fmt::Debug for ScanMetricsSet {
num_sst_batches,
num_sst_rows,
first_poll,
num_series_send_timeout,
} = self;
write!(
@@ -150,7 +153,8 @@ impl fmt::Debug for ScanMetricsSet {
num_sst_record_batches={num_sst_record_batches}, \
num_sst_batches={num_sst_batches}, \
num_sst_rows={num_sst_rows}, \
first_poll={first_poll:?}}}"
first_poll={first_poll:?}, \
num_series_send_timeout={num_series_send_timeout}}}"
)
}
}
@@ -439,6 +443,12 @@ impl PartitionMetrics {
pub(crate) fn on_finish(&self) {
self.0.on_finish();
}
/// Sets the `num_series_send_timeout`.
pub(crate) fn set_num_series_send_timeout(&self, num_timeout: usize) {
let mut metrics = self.0.metrics.lock().unwrap();
metrics.num_series_send_timeout = num_timeout;
}
}
impl fmt::Debug for PartitionMetrics {

View File

@@ -395,6 +395,7 @@ impl SeriesDistributor {
metrics.scan_cost += fetch_start.elapsed();
part_metrics.merge_metrics(&metrics);
part_metrics.set_num_series_send_timeout(self.senders.num_timeout);
part_metrics.on_finish();
@@ -437,6 +438,8 @@ struct SenderList {
num_nones: usize,
/// Index of the current partition to send.
sender_idx: usize,
/// Number of timeout.
num_timeout: usize,
}
impl SenderList {
@@ -446,6 +449,7 @@ impl SenderList {
senders,
num_nones,
sender_idx: 0,
num_timeout: 0,
}
}
@@ -505,6 +509,7 @@ impl SenderList {
match sender.send_timeout(Ok(batch), SEND_TIMEOUT).await {
Ok(()) => break,
Err(SendTimeoutError::Timeout(res)) => {
self.num_timeout += 1;
// Safety: we send Ok.
batch = res.unwrap();
}