mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 17:32:56 +00:00
download.rs: fix error 'captured variable cannot escape FnMut closure body"
This commit is contained in:
@@ -37,6 +37,20 @@ pub fn exponential_backoff_duration_seconds(n: u32, base_increment: f64, max_sec
|
||||
}
|
||||
}
|
||||
|
||||
pub trait Op<T, E> {
|
||||
async fn call(&mut self) -> Result<T, E>;
|
||||
}
|
||||
|
||||
impl<T, E, F, Fut> Op<T, E> for F
|
||||
where
|
||||
F: FnMut() -> Fut,
|
||||
Fut: Future<Output = Result<T, E>>,
|
||||
{
|
||||
async fn call(&mut self) -> Result<T, E> {
|
||||
(&mut *self)().await
|
||||
}
|
||||
}
|
||||
|
||||
/// Retries passed operation until one of the following conditions are met:
|
||||
/// - encountered error is considered as permanent (non-retryable)
|
||||
/// - retries have been exhausted
|
||||
@@ -51,8 +65,8 @@ pub fn exponential_backoff_duration_seconds(n: u32, base_increment: f64, max_sec
|
||||
/// for any other error type. Final failed attempt is logged with `{:?}`.
|
||||
///
|
||||
/// Returns `None` if cancellation was noticed during backoff or the terminal result.
|
||||
pub async fn retry<T, O, F, E>(
|
||||
mut op: O,
|
||||
pub async fn retry<T, E>(
|
||||
mut op: impl Op<T, E>,
|
||||
is_permanent: impl Fn(&E) -> bool,
|
||||
warn_threshold: u32,
|
||||
max_retries: u32,
|
||||
@@ -63,8 +77,6 @@ where
|
||||
// Not std::error::Error because anyhow::Error doesnt implement it.
|
||||
// For context see https://github.com/dtolnay/anyhow/issues/63
|
||||
E: Display + Debug + 'static,
|
||||
O: FnMut() -> F,
|
||||
F: Future<Output = Result<T, E>>,
|
||||
{
|
||||
let mut attempts = 0;
|
||||
loop {
|
||||
@@ -72,7 +84,7 @@ where
|
||||
return None;
|
||||
}
|
||||
|
||||
let result = op().await;
|
||||
let result = op.call().await;
|
||||
match &result {
|
||||
Ok(_) => {
|
||||
if attempts > 0 {
|
||||
|
||||
@@ -78,8 +78,33 @@ pub async fn download_layer_file<'a>(
|
||||
// If pageserver crashes the temp file will be deleted on startup and re-downloaded.
|
||||
let temp_file_path = path_with_suffix_extension(local_path, TEMP_DOWNLOAD_EXTENSION);
|
||||
|
||||
struct DownloadObjectClosure<'a> {
|
||||
storage: &'a GenericRemoteStorage,
|
||||
remote_path: &'a RemotePath,
|
||||
temp_file_path: &'a Utf8PathBuf,
|
||||
cancel: &'a CancellationToken,
|
||||
ctx: &'a mut RequestContext,
|
||||
}
|
||||
impl backoff::Op<u64, DownloadError> for DownloadObjectClosure<'_> {
|
||||
async fn call(&mut self) -> Result<u64, DownloadError> {
|
||||
let DownloadObjectClosure {
|
||||
storage,
|
||||
remote_path,
|
||||
temp_file_path,
|
||||
cancel,
|
||||
ctx,
|
||||
} = self;
|
||||
download_object(storage, remote_path, temp_file_path, cancel, ctx).await
|
||||
}
|
||||
}
|
||||
let bytes_amount = download_retry(
|
||||
|| async { download_object(storage, &remote_path, &temp_file_path, cancel, ctx).await },
|
||||
DownloadObjectClosure {
|
||||
storage,
|
||||
remote_path: &remote_path,
|
||||
temp_file_path: &temp_file_path,
|
||||
cancel,
|
||||
ctx,
|
||||
},
|
||||
&format!("download {remote_path:?}"),
|
||||
cancel,
|
||||
)
|
||||
@@ -568,15 +593,11 @@ pub(crate) async fn download_initdb_tar_zst(
|
||||
/// with backoff.
|
||||
///
|
||||
/// (See similar logic for uploads in `perform_upload_task`)
|
||||
pub(super) async fn download_retry<T, O, F>(
|
||||
op: O,
|
||||
pub(super) async fn download_retry<T>(
|
||||
op: impl backoff::Op<T, DownloadError>,
|
||||
description: &str,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<T, DownloadError>
|
||||
where
|
||||
O: FnMut() -> F,
|
||||
F: Future<Output = Result<T, DownloadError>>,
|
||||
{
|
||||
) -> Result<T, DownloadError> {
|
||||
backoff::retry(
|
||||
op,
|
||||
DownloadError::is_permanent,
|
||||
|
||||
Reference in New Issue
Block a user