diff --git a/pageserver/src/consumption_metrics.rs b/pageserver/src/consumption_metrics.rs
index e74fe89cc8..a4a68581fa 100644
--- a/pageserver/src/consumption_metrics.rs
+++ b/pageserver/src/consumption_metrics.rs
@@ -6,7 +6,6 @@
 use crate::context::{DownloadBehavior, RequestContext};
 use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
 use crate::tenant::{mgr, LogicalSizeCalculationCause};
-use anyhow;
 use chrono::{DateTime, Utc};
 use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE};
 use futures::stream::StreamExt;
@@ -17,6 +16,7 @@ use serde_with::{serde_as, DisplayFromStr};
 use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::{Duration, SystemTime};
+use tokio_util::sync::CancellationToken;
 use tracing::*;
 use utils::id::{NodeId, TenantId, TimelineId};
 use utils::lsn::Lsn;
@@ -328,6 +328,8 @@ async fn collect_metrics_iteration(
         return;
     }
 
+    let cancel = task_mgr::shutdown_token();
+
     // Send metrics.
     // Split into chunks of 1000 metrics to avoid exceeding the max request size
     let chunks = current_metrics.chunks(CHUNK_SIZE);
@@ -336,6 +338,8 @@
 
     let node_id = node_id.to_string();
 
+    let mut buffer = bytes::BytesMut::new();
+
     for chunk in chunks {
         chunk_to_send.clear();
 
@@ -351,45 +355,28 @@
             },
         }));
 
-        const MAX_RETRIES: u32 = 3;
+        use bytes::BufMut;
 
-        for attempt in 0..MAX_RETRIES {
-            let res = client
-                .post(metric_collection_endpoint.clone())
-                .json(&EventChunk {
-                    events: (&chunk_to_send).into(),
-                })
-                .send()
-                .await;
+        // FIXME: this is a new panic we did not have before. not really essential, but panics from
+        // this task currently restart pageserver.
+        serde_json::to_writer(
+            (&mut buffer).writer(),
+            &EventChunk {
+                events: (&chunk_to_send).into(),
+            },
+        )
+        .expect("serialization must not fail and bytesmut grows");
 
-            match res {
-                Ok(res) => {
-                    if res.status().is_success() {
-                        // update cached metrics after they were sent successfully
-                        for (curr_key, curr_val) in chunk.iter() {
-                            cached_metrics.insert(curr_key.clone(), *curr_val);
-                        }
-                    } else {
-                        error!("metrics endpoint refused the sent metrics: {:?}", res);
-                        for metric in chunk_to_send
-                            .iter()
-                            .filter(|metric| metric.value > (1u64 << 40))
-                        {
-                            // Report if the metric value is suspiciously large
-                            error!("potentially abnormal metric value: {:?}", metric);
-                        }
-                    }
-                    break;
-                }
-                Err(err) if err.is_timeout() => {
-                    error!(attempt, "timeout sending metrics, retrying immediately");
-                    continue;
-                }
-                Err(err) => {
-                    error!(attempt, ?err, "failed to send metrics");
-                    break;
-                }
+        let body = buffer.split().freeze();
+
+        let res = upload(client, metric_collection_endpoint, body, &cancel).await;
+
+        if res.is_ok() {
+            for (curr_key, curr_val) in chunk.iter() {
+                cached_metrics.insert(curr_key.clone(), *curr_val);
             }
+        } else {
+            // no need to log, backoff::retry and upload have done it, just give up uploading.
         }
     }
 }
@@ -448,6 +435,94 @@ where
     current_metrics
 }
 
+enum UploadError {
+    Rejected(reqwest::StatusCode),
+    Reqwest(reqwest::Error),
+    Cancelled,
+}
+
+impl std::fmt::Debug for UploadError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        // use same impl because backoff::retry will log this using both
+        std::fmt::Display::fmt(self, f)
+    }
+}
+
+impl std::fmt::Display for UploadError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        use UploadError::*;
+
+        match self {
+            Rejected(code) => write!(f, "server rejected the metrics with {code}"),
+            Reqwest(e) => write!(f, "request failed: {e}"),
+            Cancelled => write!(f, "cancelled"),
+        }
+    }
+}
+
+impl UploadError {
+    fn is_reject(&self) -> bool {
+        matches!(self, UploadError::Rejected(_))
+    }
+}
+
+async fn upload(
+    client: &reqwest::Client,
+    metric_collection_endpoint: &reqwest::Url,
+    body: bytes::Bytes,
+    cancel: &CancellationToken,
+) -> Result<(), UploadError> {
+    let warn_after = 3;
+    let max_attempts = 10;
+    let res = utils::backoff::retry(
+        move || {
+            let body = body.clone();
+            async move {
+                let res = client
+                    .post(metric_collection_endpoint.clone())
+                    .header(reqwest::header::CONTENT_TYPE, "application/json")
+                    .body(body)
+                    .send()
+                    .await;
+
+                let res = res.and_then(|res| res.error_for_status());
+
+                match res {
+                    Ok(_response) => Ok(()),
+                    Err(e) => {
+                        let status = e.status().filter(|s| s.is_client_error());
+                        if let Some(status) = status {
+                            Err(UploadError::Rejected(status))
+                        } else {
+                            Err(UploadError::Reqwest(e))
+                        }
+                    }
+                }
+            }
+        },
+        UploadError::is_reject,
+        warn_after,
+        max_attempts,
+        "upload consumption_metrics",
+        utils::backoff::Cancel::new(cancel.clone(), || UploadError::Cancelled),
+    )
+    .await;
+
+    match &res {
+        Ok(_) => {}
+        Err(e) if e.is_reject() => {
+            // permanent errors currently do not get logged by backoff::retry
+            // display alternate has no effect, but keeping it here for easier pattern matching.
+            tracing::error!("failed to upload metrics: {e:#}");
+        }
+        Err(_) => {
+            // these have been logged already
+        }
+    }
+
+    res
+}
+
 /// Testing helping in-between abstraction allowing testing metrics without actual Tenants.
 struct TenantSnapshot {
     resident_size: u64,
diff --git a/test_runner/regress/test_threshold_based_eviction.py b/test_runner/regress/test_threshold_based_eviction.py
index 6f5e3e2518..12866accc7 100644
--- a/test_runner/regress/test_threshold_based_eviction.py
+++ b/test_runner/regress/test_threshold_based_eviction.py
@@ -32,7 +32,9 @@ def test_threshold_based_eviction(
     synthetic_size_calculation_interval="2s"
     metric_collection_endpoint="http://{host}:{port}/nonexistent"
     """
-    metrics_refused_log_line = ".*metrics endpoint refused the sent metrics.*/nonexistent.*"
+    metrics_refused_log_line = (
+        ".*metrics_collection:.* upload consumption_metrics (still failed|failed, will retry).*"
+    )
 
     env = neon_env_builder.init_start()
     env.pageserver.allowed_errors.append(metrics_refused_log_line)
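
For context on what the new `upload` relies on: `utils::backoff::retry` is a repo-internal helper whose definition is not visible in this diff. The sketch below is a minimal, self-contained approximation of the same pattern, assuming tokio and tokio-util; the name `retry_with_backoff`, its bounds, and the backoff constants are illustrative assumptions rather than the actual helper. It shows the three behaviors the new code depends on: exponential backoff for transient errors, immediate give-up on permanent rejections (the role `UploadError::is_reject` plays above), and early exit on shutdown via `CancellationToken`.

    use std::future::Future;
    use std::time::Duration;

    use tokio_util::sync::CancellationToken;

    /// Hypothetical stand-in for the repo-internal `utils::backoff::retry`.
    /// Retries `op` with exponential backoff; errors the caller marks as
    /// permanent (e.g. HTTP 4xx) are returned immediately instead of retried.
    async fn retry_with_backoff<T, E, F, Fut>(
        mut op: F,
        is_permanent: impl Fn(&E) -> bool,
        max_attempts: u32,
        cancel: &CancellationToken,
        on_cancel: impl Fn() -> E,
    ) -> Result<T, E>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<T, E>>,
    {
        assert!(max_attempts > 0);
        let mut delay = Duration::from_millis(100);
        for attempt in 1..=max_attempts {
            match op().await {
                Ok(v) => return Ok(v),
                // A permanent error will not succeed on retry, and neither will
                // anything once the attempt budget is spent.
                Err(e) if is_permanent(&e) || attempt == max_attempts => return Err(e),
                Err(_) => {
                    // Transient error: back off, but wake immediately on shutdown.
                    tokio::select! {
                        _ = cancel.cancelled() => return Err(on_cancel()),
                        _ = tokio::time::sleep(delay) => {}
                    }
                    delay = (delay * 2).min(Duration::from_secs(10));
                }
            }
        }
        unreachable!("the last attempt always returns");
    }

Two details of the diff fit this shape: the request closure clones `body` on every attempt, which stays cheap because `buffer.split().freeze()` yields a refcounted `bytes::Bytes`; and `UploadError::is_reject` classifies HTTP 4xx responses as permanent, so a misconfigured endpoint is reported once instead of being retried up to `max_attempts` times.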