diff --git a/pageserver/src/storage_sync.rs b/pageserver/src/storage_sync.rs index d1c8922259..15f24d7e24 100644 --- a/pageserver/src/storage_sync.rs +++ b/pageserver/src/storage_sync.rs @@ -979,7 +979,7 @@ enum DownloadStatus { #[derive(Debug)] enum UploadStatus { Uploaded, - Failed, + Failed(anyhow::Error), Nothing, } @@ -1056,41 +1056,43 @@ where let (upload_status, download_status) = tokio::join!( async { if let Some(upload_data) = upload_data { - match validate_task_retries(upload_data, max_sync_errors) + let upload_retries = upload_data.retries; + match validate_task_retries(upload_retries, max_sync_errors) .instrument(info_span!("retries_validation")) .await { - ControlFlow::Continue(new_upload_data) => { + ControlFlow::Continue(()) => { upload_timeline_data( conf, (storage.as_ref(), &index, sync_queue), current_remote_timeline.as_ref(), sync_id, - new_upload_data, + upload_data, sync_start, "upload", ) - .await; - UploadStatus::Uploaded - } - ControlFlow::Break(failed_upload_data) => { - if let Err(e) = update_remote_data( - conf, - storage.as_ref(), - &index, - sync_id, - RemoteDataUpdate::Upload { - uploaded_data: failed_upload_data.data, - upload_failed: true, - }, - ) .await - { - error!("Failed to update remote timeline {sync_id}: {e:?}"); - } - - UploadStatus::Failed } + ControlFlow::Break(()) => match update_remote_data( + conf, + storage.as_ref(), + &index, + sync_id, + RemoteDataUpdate::Upload { + uploaded_data: upload_data.data, + upload_failed: true, + }, + ) + .await + { + Ok(()) => UploadStatus::Failed(anyhow::anyhow!( + "Aborted after retries validation, current retries: {upload_retries}, max retries allowed: {max_sync_errors}" + )), + Err(e) => { + error!("Failed to update remote timeline {sync_id}: {e:?}"); + UploadStatus::Failed(e) + } + }, } } else { UploadStatus::Nothing @@ -1099,23 +1101,23 @@ where .instrument(info_span!("upload_timeline_data")), async { if let Some(download_data) = download_data { - match validate_task_retries(download_data, max_sync_errors) + match validate_task_retries(download_data.retries, max_sync_errors) .instrument(info_span!("retries_validation")) .await { - ControlFlow::Continue(new_download_data) => { + ControlFlow::Continue(()) => { return download_timeline_data( conf, (storage.as_ref(), &index, sync_queue), current_remote_timeline.as_ref(), sync_id, - new_download_data, + download_data, sync_start, "download", ) .await; } - ControlFlow::Break(_) => { + ControlFlow::Break(()) => { index .write() .await @@ -1132,29 +1134,29 @@ where if let Some(delete_data) = batch.delete { match upload_status { UploadStatus::Uploaded | UploadStatus::Nothing => { - match validate_task_retries(delete_data, max_sync_errors) + match validate_task_retries(delete_data.retries, max_sync_errors) .instrument(info_span!("retries_validation")) .await { - ControlFlow::Continue(new_delete_data) => { + ControlFlow::Continue(()) => { delete_timeline_data( conf, (storage.as_ref(), &index, sync_queue), sync_id, - new_delete_data, + delete_data, sync_start, "delete", ) .instrument(info_span!("delete_timeline_data")) .await; } - ControlFlow::Break(failed_delete_data) => { + ControlFlow::Break(()) => { if let Err(e) = update_remote_data( conf, storage.as_ref(), &index, sync_id, - RemoteDataUpdate::Delete(&failed_delete_data.data.deleted_layers), + RemoteDataUpdate::Delete(&delete_data.data.deleted_layers), ) .await { @@ -1163,8 +1165,8 @@ where } } } - UploadStatus::Failed => { - warn!("Skipping delete task due to failed upload tasks, reenqueuing"); + UploadStatus::Failed(e) => { + warn!("Skipping delete task due to failed upload tasks, reenqueuing. Upload data: {:?}, delete data: {delete_data:?}. Upload failure: {e:#}", batch.upload); sync_queue.push(sync_id, SyncTask::Delete(delete_data)); } } @@ -1349,7 +1351,8 @@ async fn upload_timeline_data
(
new_upload_data: SyncData (
)
.await
{
- UploadedTimeline::FailedAndRescheduled => {
+ UploadedTimeline::FailedAndRescheduled(e) => {
register_sync_status(sync_id, sync_start, task_name, Some(false));
- return;
+ return UploadStatus::Failed(e);
}
UploadedTimeline::Successful(upload_data) => upload_data,
};
@@ -1383,12 +1386,14 @@ async fn upload_timeline_data (
{
Ok(()) => {
register_sync_status(sync_id, sync_start, task_name, Some(true));
+ UploadStatus::Uploaded
}
Err(e) => {
error!("Failed to update remote timeline {sync_id}: {e:?}");
uploaded_data.retries += 1;
sync_queue.push(sync_id, SyncTask::Upload(uploaded_data));
register_sync_status(sync_id, sync_start, task_name, Some(false));
+ UploadStatus::Failed(e)
}
}
}
@@ -1491,21 +1496,17 @@ where
.context("Failed to upload new index part")
}
-async fn validate_task_retries