mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-08 05:52:55 +00:00
Improve retries and logging
This commit is contained in:
@@ -3,8 +3,9 @@ use std::sync::Arc;
|
||||
|
||||
use anyhow::{anyhow, ensure};
|
||||
use pageserver_page_api as page_api;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::backoff;
|
||||
use tokio::time::Instant;
|
||||
use tracing::{error, info, warn};
|
||||
use utils::backoff::exponential_backoff_duration;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
use utils::shard::{ShardCount, ShardIndex, ShardNumber};
|
||||
|
||||
@@ -43,7 +44,7 @@ impl PageserverClient {
|
||||
&self,
|
||||
req: page_api::CheckRelExistsRequest,
|
||||
) -> tonic::Result<page_api::CheckRelExistsResponse> {
|
||||
self.with_retries("check_rel_exists", async || {
|
||||
self.with_retries(async || {
|
||||
// Relation metadata is only available on shard 0.
|
||||
let mut client = self.shards.get_zero().client().await?;
|
||||
client.check_rel_exists(req).await
|
||||
@@ -56,7 +57,7 @@ impl PageserverClient {
|
||||
&self,
|
||||
req: page_api::GetDbSizeRequest,
|
||||
) -> tonic::Result<page_api::GetDbSizeResponse> {
|
||||
self.with_retries("get_db_size", async || {
|
||||
self.with_retries(async || {
|
||||
// Relation metadata is only available on shard 0.
|
||||
let mut client = self.shards.get_zero().client().await?;
|
||||
client.get_db_size(req).await
|
||||
@@ -75,7 +76,7 @@ impl PageserverClient {
|
||||
// TODO: support multiple shards.
|
||||
let shard_id = ShardIndex::unsharded();
|
||||
|
||||
self.with_retries("get_page", async || {
|
||||
self.with_retries(async || {
|
||||
let stream = self.shards.get(shard_id)?.stream().await;
|
||||
let resp = stream.send(req.clone()).await?;
|
||||
|
||||
@@ -96,7 +97,7 @@ impl PageserverClient {
|
||||
&self,
|
||||
req: page_api::GetRelSizeRequest,
|
||||
) -> tonic::Result<page_api::GetRelSizeResponse> {
|
||||
self.with_retries("get_rel_size", async || {
|
||||
self.with_retries(async || {
|
||||
// Relation metadata is only available on shard 0.
|
||||
let mut client = self.shards.get_zero().client().await?;
|
||||
client.get_rel_size(req).await
|
||||
@@ -109,7 +110,7 @@ impl PageserverClient {
|
||||
&self,
|
||||
req: page_api::GetSlruSegmentRequest,
|
||||
) -> tonic::Result<page_api::GetSlruSegmentResponse> {
|
||||
self.with_retries("get_slru_segment", async || {
|
||||
self.with_retries(async || {
|
||||
// SLRU segments are only available on shard 0.
|
||||
let mut client = self.shards.get_zero().client().await?;
|
||||
client.get_slru_segment(req).await
|
||||
@@ -117,50 +118,114 @@ impl PageserverClient {
|
||||
.await
|
||||
}
|
||||
|
||||
/// Runs the given closure with exponential backoff retries.
|
||||
async fn with_retries<T, F, O>(&self, name: &str, f: F) -> tonic::Result<T>
|
||||
/// Runs the given closure with retries (exponential backoff). Logs errors.
|
||||
async fn with_retries<T, F, O>(&self, mut f: F) -> tonic::Result<T>
|
||||
where
|
||||
F: FnMut() -> O,
|
||||
O: Future<Output = tonic::Result<T>>,
|
||||
{
|
||||
/// TODO: tune retry parameters (retry forever?).
|
||||
/// TODO: add timeouts?
|
||||
const WARN_THRESHOLD: u32 = 1;
|
||||
const MAX_RETRIES: u32 = 10;
|
||||
|
||||
fn is_permanent(err: &tonic::Status) -> bool {
|
||||
match err.code() {
|
||||
// Not really an error, but whatever. Don't retry.
|
||||
tonic::Code::Ok => true,
|
||||
// TODO: tune these, and/or make them configurable. Should we retry forever?
|
||||
const REQUEST_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
|
||||
const TOTAL_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60);
|
||||
const BASE_BACKOFF: f64 = 0.1;
|
||||
const MAX_BACKOFF: f64 = 10.0;
|
||||
|
||||
fn should_retry(code: tonic::Code) -> bool {
|
||||
match code {
|
||||
tonic::Code::Ok => panic!("unexpected Ok status code"),
|
||||
// These codes are transient, so retry them.
|
||||
tonic::Code::Aborted => false,
|
||||
tonic::Code::Cancelled => false,
|
||||
tonic::Code::DeadlineExceeded => false, // maybe transient slowness
|
||||
tonic::Code::Internal => false, // maybe transient failure
|
||||
tonic::Code::ResourceExhausted => false,
|
||||
tonic::Code::Unavailable => false,
|
||||
tonic::Code::Unknown => false, // may as well retry?
|
||||
|
||||
tonic::Code::Aborted => true,
|
||||
tonic::Code::Cancelled => true,
|
||||
tonic::Code::DeadlineExceeded => true, // maybe transient slowness
|
||||
tonic::Code::Internal => true, // maybe transient failure?
|
||||
tonic::Code::ResourceExhausted => true,
|
||||
tonic::Code::Unavailable => true,
|
||||
// The following codes will like continue to fail, so don't retry.
|
||||
tonic::Code::AlreadyExists => true,
|
||||
tonic::Code::DataLoss => true,
|
||||
tonic::Code::FailedPrecondition => true,
|
||||
tonic::Code::InvalidArgument => true,
|
||||
tonic::Code::NotFound => true,
|
||||
tonic::Code::OutOfRange => true,
|
||||
tonic::Code::PermissionDenied => true,
|
||||
tonic::Code::Unimplemented => true,
|
||||
tonic::Code::Unauthenticated => true,
|
||||
tonic::Code::AlreadyExists => false,
|
||||
tonic::Code::DataLoss => false,
|
||||
tonic::Code::FailedPrecondition => false,
|
||||
tonic::Code::InvalidArgument => false,
|
||||
tonic::Code::NotFound => false,
|
||||
tonic::Code::OutOfRange => false,
|
||||
tonic::Code::PermissionDenied => false,
|
||||
tonic::Code::Unauthenticated => false,
|
||||
tonic::Code::Unimplemented => false,
|
||||
tonic::Code::Unknown => false,
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: consider custom logic and logging here, using the caller's span for name.
|
||||
// TODO: cancellation? Could just drop the future.
|
||||
let cancel = CancellationToken::new();
|
||||
backoff::retry(f, is_permanent, WARN_THRESHOLD, MAX_RETRIES, name, &cancel)
|
||||
.await
|
||||
.expect("never cancelled (for now)")
|
||||
let started = Instant::now();
|
||||
let deadline = started + TOTAL_TIMEOUT;
|
||||
let mut last_error = None;
|
||||
let mut retries = 0;
|
||||
loop {
|
||||
// Set up a future to wait for the backoff (if any) and run the request with a timeout.
|
||||
let backoff = exponential_backoff_duration(retries, BASE_BACKOFF, MAX_BACKOFF);
|
||||
let backoff_and_try = async {
|
||||
tokio::time::sleep(backoff).await;
|
||||
let request_started = Instant::now();
|
||||
tokio::time::timeout(REQUEST_TIMEOUT, f())
|
||||
.await
|
||||
.map_err(|_| {
|
||||
tonic::Status::deadline_exceeded(format!(
|
||||
"request timed out after {:.3}s",
|
||||
request_started.elapsed().as_secs_f64()
|
||||
))
|
||||
})?
|
||||
};
|
||||
|
||||
// Wait for the backoff and request, or bail out if the total timeout is exceeded.
|
||||
let result = tokio::select! {
|
||||
result = backoff_and_try => result,
|
||||
|
||||
_ = tokio::time::sleep_until(deadline) => {
|
||||
let last_error = last_error.unwrap_or_else(|| {
|
||||
tonic::Status::deadline_exceeded(format!(
|
||||
"request timed out after {:.3}s",
|
||||
started.elapsed().as_secs_f64()
|
||||
))
|
||||
});
|
||||
error!(
|
||||
"giving up after {:.3}s and {retries} retries, last error {:?}: {}",
|
||||
started.elapsed().as_secs_f64(), last_error.code(), last_error.message(),
|
||||
);
|
||||
return Err(last_error);
|
||||
}
|
||||
};
|
||||
|
||||
match result {
|
||||
Ok(result) => {
|
||||
if retries > 0 {
|
||||
info!(
|
||||
"request succeeded after {retries} retries in {:.3}s",
|
||||
started.elapsed().as_secs_f64(),
|
||||
);
|
||||
}
|
||||
|
||||
return Ok(result);
|
||||
}
|
||||
|
||||
Err(status) => {
|
||||
let (code, message) = (status.code(), status.message());
|
||||
let should_retry = should_retry(code);
|
||||
let attempt = retries + 1;
|
||||
|
||||
if !should_retry {
|
||||
// NB: include the attempt here too. This isn't necessarily the first
|
||||
// attempt, because the error may change between attempts.
|
||||
error!(
|
||||
"request failed with {code:?}: {message}, not retrying (attempt {attempt})"
|
||||
);
|
||||
return Err(status);
|
||||
}
|
||||
|
||||
warn!("request failed with {code:?}: {message}, retrying (attempt {attempt})");
|
||||
|
||||
retries += 1;
|
||||
last_error = Some(status);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user