fix(consumption_metrics): exp backoff retry (#5317)

Split off from #5297. Depends on #5315.
Cc: #5175 for retry
Author: Joonas Koivunen
Date: 2023-09-16 01:11:01 +03:00
Committed by: GitHub
Parent: 00c4c8e2e8
Commit: a7f4ee02a3
2 changed files with 115 additions and 38 deletions
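
The change replaces the fixed three-attempt send loop with utils::backoff::retry, whose implementation is not part of this diff. As a rough illustration of the technique named in the title, an exponential backoff schedule doubles the wait after each failed attempt up to a ceiling; the constants below are assumptions for illustration, not the pageserver's actual values:

    use std::time::Duration;

    /// Sketch only: the real utils::backoff helper is not shown in this
    /// commit and may use different constants, jitter, or starting delay.
    fn backoff_delay(attempt: u32) -> Duration {
        let base_ms: u64 = 100; // assumed initial delay
        let cap_ms: u64 = 10_000; // assumed ceiling
        let factor = 1u64.checked_shl(attempt).unwrap_or(u64::MAX); // 2^attempt
        Duration::from_millis(base_ms.saturating_mul(factor).min(cap_ms))
    }

    fn main() {
        // with max_attempts = 10, as in the new upload() below, the waits are
        // 100ms, 200ms, 400ms, 800ms, 1.6s, 3.2s, 6.4s, 10s, 10s, 10s
        for attempt in 0..10 {
            println!("attempt {attempt}: wait {:?}", backoff_delay(attempt));
        }
    }

Compared to the removed "retrying immediately" path, most of the waiting happens near the cap instead of hammering the endpoint in a tight loop.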

@@ -6,7 +6,6 @@
 use crate::context::{DownloadBehavior, RequestContext};
 use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
 use crate::tenant::{mgr, LogicalSizeCalculationCause};
-use anyhow;
 use chrono::{DateTime, Utc};
 use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE};
 use futures::stream::StreamExt;
@@ -17,6 +16,7 @@ use serde_with::{serde_as, DisplayFromStr};
 use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::{Duration, SystemTime};
+use tokio_util::sync::CancellationToken;
 use tracing::*;
 use utils::id::{NodeId, TenantId, TimelineId};
 use utils::lsn::Lsn;
@@ -328,6 +328,8 @@ async fn collect_metrics_iteration(
         return;
     }
 
+    let cancel = task_mgr::shutdown_token();
+
     // Send metrics.
     // Split into chunks of 1000 metrics to avoid exceeding the max request size
     let chunks = current_metrics.chunks(CHUNK_SIZE);
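
The hunk above also keeps the existing batching. For reference, slice chunking yields full-size batches followed by one shorter remainder, so no single request can exceed the limit. A self-contained sketch, with plain integers standing in for the real events:

    fn main() {
        const CHUNK_SIZE: usize = 1000; // the real constant comes from the consumption_metrics crate
        let current_metrics: Vec<u64> = (0..2345).collect();
        // yields batches of 1000, 1000, and a final 345
        for chunk in current_metrics.chunks(CHUNK_SIZE) {
            println!("batch of {} metrics", chunk.len());
        }
    }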
@@ -336,6 +338,8 @@ async fn collect_metrics_iteration(
     let node_id = node_id.to_string();
 
+    let mut buffer = bytes::BytesMut::new();
+
     for chunk in chunks {
         chunk_to_send.clear();
@@ -351,45 +355,28 @@
             },
         }));
 
-        const MAX_RETRIES: u32 = 3;
+        use bytes::BufMut;
 
-        for attempt in 0..MAX_RETRIES {
-            let res = client
-                .post(metric_collection_endpoint.clone())
-                .json(&EventChunk {
-                    events: (&chunk_to_send).into(),
-                })
-                .send()
-                .await;
+        // FIXME: this is a new panic we did not have before. not really essential, but panics from
+        // this task currently restart pageserver.
+        serde_json::to_writer(
+            (&mut buffer).writer(),
+            &EventChunk {
+                events: (&chunk_to_send).into(),
+            },
+        )
+        .expect("serialization must not fail and bytesmut grows");
 
-            match res {
-                Ok(res) => {
-                    if res.status().is_success() {
-                        // update cached metrics after they were sent successfully
-                        for (curr_key, curr_val) in chunk.iter() {
-                            cached_metrics.insert(curr_key.clone(), *curr_val);
-                        }
-                    } else {
-                        error!("metrics endpoint refused the sent metrics: {:?}", res);
-                        for metric in chunk_to_send
-                            .iter()
-                            .filter(|metric| metric.value > (1u64 << 40))
-                        {
-                            // Report if the metric value is suspiciously large
-                            error!("potentially abnormal metric value: {:?}", metric);
-                        }
-                    }
-                    break;
-                }
-                Err(err) if err.is_timeout() => {
-                    error!(attempt, "timeout sending metrics, retrying immediately");
-                    continue;
-                }
-                Err(err) => {
-                    error!(attempt, ?err, "failed to send metrics");
-                    break;
-                }
-            }
-        }
+        let body = buffer.split().freeze();
+        let res = upload(client, metric_collection_endpoint, body, &cancel).await;
+        if res.is_ok() {
+            for (curr_key, curr_val) in chunk.iter() {
+                cached_metrics.insert(curr_key.clone(), *curr_val);
+            }
+        } else {
+            // no need to log, backoff::retry and upload have done it, just give up uploading.
+        }
     }
 }
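
The new path serializes each batch into a reusable BytesMut and detaches the written bytes with split().freeze(), yielding an immutable bytes::Bytes request body. A standalone sketch of that buffer-reuse pattern, with a plain vector standing in for the real EventChunk:

    use bytes::{BufMut, BytesMut};

    fn main() {
        let mut buffer = BytesMut::new();

        for payload in [vec![1, 2, 3], vec![4, 5]] {
            // serialize through the buffer's io::Write adapter
            serde_json::to_writer((&mut buffer).writer(), &payload)
                .expect("serialization into a growable buffer does not fail");

            // split() takes out the written bytes, freeze() makes them an
            // immutable Bytes; the emptied buffer is reused next iteration
            let body = buffer.split().freeze();
            println!("{} byte body: {:?}", body.len(), body);
        }
    }

Clones of a frozen Bytes share one allocation, which is what makes the body.clone() inside the retry closure of upload() below cheap enough to run once per attempt.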
@@ -448,6 +435,94 @@
     current_metrics
 }
 
+enum UploadError {
+    Rejected(reqwest::StatusCode),
+    Reqwest(reqwest::Error),
+    Cancelled,
+}
+
+impl std::fmt::Debug for UploadError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        // use same impl because backoff::retry will log this using both
+        std::fmt::Display::fmt(self, f)
+    }
+}
+
+impl std::fmt::Display for UploadError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        use UploadError::*;
+        match self {
+            Rejected(code) => write!(f, "server rejected the metrics with {code}"),
+            Reqwest(e) => write!(f, "request failed: {e}"),
+            Cancelled => write!(f, "cancelled"),
+        }
+    }
+}
+
+impl UploadError {
+    fn is_reject(&self) -> bool {
+        matches!(self, UploadError::Rejected(_))
+    }
+}
+
+async fn upload(
+    client: &reqwest::Client,
+    metric_collection_endpoint: &reqwest::Url,
+    body: bytes::Bytes,
+    cancel: &CancellationToken,
+) -> Result<(), UploadError> {
+    let warn_after = 3;
+    let max_attempts = 10;
+
+    let res = utils::backoff::retry(
+        move || {
+            let body = body.clone();
+            async move {
+                let res = client
+                    .post(metric_collection_endpoint.clone())
+                    .header(reqwest::header::CONTENT_TYPE, "application/json")
+                    .body(body)
+                    .send()
+                    .await;
+
+                let res = res.and_then(|res| res.error_for_status());
+
+                match res {
+                    Ok(_response) => Ok(()),
+                    Err(e) => {
+                        let status = e.status().filter(|s| s.is_client_error());
+                        if let Some(status) = status {
+                            Err(UploadError::Rejected(status))
+                        } else {
+                            Err(UploadError::Reqwest(e))
+                        }
+                    }
+                }
+            }
+        },
+        UploadError::is_reject,
+        warn_after,
+        max_attempts,
+        "upload consumption_metrics",
+        utils::backoff::Cancel::new(cancel.clone(), || UploadError::Cancelled),
+    )
+    .await;
+
+    match &res {
+        Ok(_) => {}
+        Err(e) if e.is_reject() => {
+            // permanent errors currently do not get logged by backoff::retry
+            // display alternate has no effect, but keeping it here for easier pattern matching.
+            tracing::error!("failed to upload metrics: {e:#}");
+        }
+        Err(_) => {
+            // these have been logged already
+        }
+    }
+
+    res
+}
+
 /// Testing helping in-between abstraction allowing testing metrics without actual Tenants.
 struct TenantSnapshot {
     resident_size: u64,
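
upload() folds status handling into the retry policy: error_for_status() turns non-2xx responses into errors, 4xx statuses become UploadError::Rejected, which is_reject marks as permanent so the retry gives up at once, while 5xx and transport errors remain retryable. The real utils::backoff::retry is internal to this repository; below is a minimal sketch of a helper with the same shape, in which the signature, delay schedule, and cancellation handling are all illustrative assumptions (needs the tokio and tokio-util crates):

    use std::future::Future;
    use std::time::Duration;
    use tokio_util::sync::CancellationToken;

    /// Sketch of a backoff retry helper: retry an async operation with
    /// exponentially growing delays, stopping early on permanent errors
    /// or when shutdown is requested.
    async fn retry_sketch<T, E, F, Fut>(
        mut op: F,
        is_permanent: impl Fn(&E) -> bool,
        max_attempts: u32,
        cancel: &CancellationToken,
        on_cancel: impl Fn() -> E,
    ) -> Result<T, E>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<T, E>>,
    {
        let mut attempt = 0;
        loop {
            match op().await {
                Ok(v) => return Ok(v),
                // a permanent error (here: a 4xx rejection) is returned as-is
                Err(e) if is_permanent(&e) || attempt + 1 >= max_attempts => return Err(e),
                Err(_) => {
                    // assumed schedule: 100ms * 2^attempt, capped at 10s
                    let delay = Duration::from_millis((100u64 << attempt.min(10)).min(10_000));
                    attempt += 1;
                    // abort the wait immediately on shutdown
                    tokio::select! {
                        _ = cancel.cancelled() => return Err(on_cancel()),
                        _ = tokio::time::sleep(delay) => {}
                    }
                }
            }
        }
    }

The warn_after argument of the real call above presumably controls when repeated failures escalate their log level; the sketch leaves logging out entirely.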

@@ -32,7 +32,9 @@ def test_threshold_based_eviction(
     synthetic_size_calculation_interval="2s"
     metric_collection_endpoint="http://{host}:{port}/nonexistent"
     """
-    metrics_refused_log_line = ".*metrics endpoint refused the sent metrics.*/nonexistent.*"
+    metrics_refused_log_line = (
+        ".*metrics_collection:.* upload consumption_metrics (still failed|failed, will retry).*"
+    )
 
     env = neon_env_builder.init_start()
     env.pageserver.allowed_errors.append(metrics_refused_log_line)