mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-14 00:42:54 +00:00
Better storage sync logs (#2268)
This commit is contained in:
@@ -979,7 +979,7 @@ enum DownloadStatus {
|
||||
#[derive(Debug)]
|
||||
enum UploadStatus {
|
||||
Uploaded,
|
||||
Failed,
|
||||
Failed(anyhow::Error),
|
||||
Nothing,
|
||||
}
|
||||
|
||||
@@ -1056,41 +1056,43 @@ where
|
||||
let (upload_status, download_status) = tokio::join!(
|
||||
async {
|
||||
if let Some(upload_data) = upload_data {
|
||||
match validate_task_retries(upload_data, max_sync_errors)
|
||||
let upload_retries = upload_data.retries;
|
||||
match validate_task_retries(upload_retries, max_sync_errors)
|
||||
.instrument(info_span!("retries_validation"))
|
||||
.await
|
||||
{
|
||||
ControlFlow::Continue(new_upload_data) => {
|
||||
ControlFlow::Continue(()) => {
|
||||
upload_timeline_data(
|
||||
conf,
|
||||
(storage.as_ref(), &index, sync_queue),
|
||||
current_remote_timeline.as_ref(),
|
||||
sync_id,
|
||||
new_upload_data,
|
||||
upload_data,
|
||||
sync_start,
|
||||
"upload",
|
||||
)
|
||||
.await;
|
||||
UploadStatus::Uploaded
|
||||
}
|
||||
ControlFlow::Break(failed_upload_data) => {
|
||||
if let Err(e) = update_remote_data(
|
||||
conf,
|
||||
storage.as_ref(),
|
||||
&index,
|
||||
sync_id,
|
||||
RemoteDataUpdate::Upload {
|
||||
uploaded_data: failed_upload_data.data,
|
||||
upload_failed: true,
|
||||
},
|
||||
)
|
||||
.await
|
||||
{
|
||||
error!("Failed to update remote timeline {sync_id}: {e:?}");
|
||||
}
|
||||
|
||||
UploadStatus::Failed
|
||||
}
|
||||
ControlFlow::Break(()) => match update_remote_data(
|
||||
conf,
|
||||
storage.as_ref(),
|
||||
&index,
|
||||
sync_id,
|
||||
RemoteDataUpdate::Upload {
|
||||
uploaded_data: upload_data.data,
|
||||
upload_failed: true,
|
||||
},
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(()) => UploadStatus::Failed(anyhow::anyhow!(
|
||||
"Aborted after retries validation, current retries: {upload_retries}, max retries allowed: {max_sync_errors}"
|
||||
)),
|
||||
Err(e) => {
|
||||
error!("Failed to update remote timeline {sync_id}: {e:?}");
|
||||
UploadStatus::Failed(e)
|
||||
}
|
||||
},
|
||||
}
|
||||
} else {
|
||||
UploadStatus::Nothing
|
||||
@@ -1099,23 +1101,23 @@ where
|
||||
.instrument(info_span!("upload_timeline_data")),
|
||||
async {
|
||||
if let Some(download_data) = download_data {
|
||||
match validate_task_retries(download_data, max_sync_errors)
|
||||
match validate_task_retries(download_data.retries, max_sync_errors)
|
||||
.instrument(info_span!("retries_validation"))
|
||||
.await
|
||||
{
|
||||
ControlFlow::Continue(new_download_data) => {
|
||||
ControlFlow::Continue(()) => {
|
||||
return download_timeline_data(
|
||||
conf,
|
||||
(storage.as_ref(), &index, sync_queue),
|
||||
current_remote_timeline.as_ref(),
|
||||
sync_id,
|
||||
new_download_data,
|
||||
download_data,
|
||||
sync_start,
|
||||
"download",
|
||||
)
|
||||
.await;
|
||||
}
|
||||
ControlFlow::Break(_) => {
|
||||
ControlFlow::Break(()) => {
|
||||
index
|
||||
.write()
|
||||
.await
|
||||
@@ -1132,29 +1134,29 @@ where
|
||||
if let Some(delete_data) = batch.delete {
|
||||
match upload_status {
|
||||
UploadStatus::Uploaded | UploadStatus::Nothing => {
|
||||
match validate_task_retries(delete_data, max_sync_errors)
|
||||
match validate_task_retries(delete_data.retries, max_sync_errors)
|
||||
.instrument(info_span!("retries_validation"))
|
||||
.await
|
||||
{
|
||||
ControlFlow::Continue(new_delete_data) => {
|
||||
ControlFlow::Continue(()) => {
|
||||
delete_timeline_data(
|
||||
conf,
|
||||
(storage.as_ref(), &index, sync_queue),
|
||||
sync_id,
|
||||
new_delete_data,
|
||||
delete_data,
|
||||
sync_start,
|
||||
"delete",
|
||||
)
|
||||
.instrument(info_span!("delete_timeline_data"))
|
||||
.await;
|
||||
}
|
||||
ControlFlow::Break(failed_delete_data) => {
|
||||
ControlFlow::Break(()) => {
|
||||
if let Err(e) = update_remote_data(
|
||||
conf,
|
||||
storage.as_ref(),
|
||||
&index,
|
||||
sync_id,
|
||||
RemoteDataUpdate::Delete(&failed_delete_data.data.deleted_layers),
|
||||
RemoteDataUpdate::Delete(&delete_data.data.deleted_layers),
|
||||
)
|
||||
.await
|
||||
{
|
||||
@@ -1163,8 +1165,8 @@ where
|
||||
}
|
||||
}
|
||||
}
|
||||
UploadStatus::Failed => {
|
||||
warn!("Skipping delete task due to failed upload tasks, reenqueuing");
|
||||
UploadStatus::Failed(e) => {
|
||||
warn!("Skipping delete task due to failed upload tasks, reenqueuing. Upload data: {:?}, delete data: {delete_data:?}. Upload failure: {e:#}", batch.upload);
|
||||
sync_queue.push(sync_id, SyncTask::Delete(delete_data));
|
||||
}
|
||||
}
|
||||
@@ -1349,7 +1351,8 @@ async fn upload_timeline_data<P, S>(
|
||||
new_upload_data: SyncData<LayersUpload>,
|
||||
sync_start: Instant,
|
||||
task_name: &str,
|
||||
) where
|
||||
) -> UploadStatus
|
||||
where
|
||||
P: Debug + Send + Sync + 'static,
|
||||
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
|
||||
{
|
||||
@@ -1362,9 +1365,9 @@ async fn upload_timeline_data<P, S>(
|
||||
)
|
||||
.await
|
||||
{
|
||||
UploadedTimeline::FailedAndRescheduled => {
|
||||
UploadedTimeline::FailedAndRescheduled(e) => {
|
||||
register_sync_status(sync_id, sync_start, task_name, Some(false));
|
||||
return;
|
||||
return UploadStatus::Failed(e);
|
||||
}
|
||||
UploadedTimeline::Successful(upload_data) => upload_data,
|
||||
};
|
||||
@@ -1383,12 +1386,14 @@ async fn upload_timeline_data<P, S>(
|
||||
{
|
||||
Ok(()) => {
|
||||
register_sync_status(sync_id, sync_start, task_name, Some(true));
|
||||
UploadStatus::Uploaded
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to update remote timeline {sync_id}: {e:?}");
|
||||
uploaded_data.retries += 1;
|
||||
sync_queue.push(sync_id, SyncTask::Upload(uploaded_data));
|
||||
register_sync_status(sync_id, sync_start, task_name, Some(false));
|
||||
UploadStatus::Failed(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1491,21 +1496,17 @@ where
|
||||
.context("Failed to upload new index part")
|
||||
}
|
||||
|
||||
async fn validate_task_retries<T>(
|
||||
sync_data: SyncData<T>,
|
||||
async fn validate_task_retries(
|
||||
current_attempt: u32,
|
||||
max_sync_errors: NonZeroU32,
|
||||
) -> ControlFlow<SyncData<T>, SyncData<T>> {
|
||||
let current_attempt = sync_data.retries;
|
||||
) -> ControlFlow<(), ()> {
|
||||
let max_sync_errors = max_sync_errors.get();
|
||||
if current_attempt >= max_sync_errors {
|
||||
error!(
|
||||
"Aborting task that failed {current_attempt} times, exceeding retries threshold of {max_sync_errors}",
|
||||
);
|
||||
return ControlFlow::Break(sync_data);
|
||||
return ControlFlow::Break(());
|
||||
}
|
||||
|
||||
exponential_backoff(current_attempt, 1.0, 30.0).await;
|
||||
ControlFlow::Continue(sync_data)
|
||||
ControlFlow::Continue(())
|
||||
}
|
||||
|
||||
fn schedule_first_sync_tasks(
|
||||
|
||||
@@ -95,6 +95,8 @@ where
|
||||
debug!("Reenqueuing failed delete task for timeline {sync_id}");
|
||||
delete_data.retries += 1;
|
||||
sync_queue.push(sync_id, SyncTask::Delete(delete_data));
|
||||
} else {
|
||||
info!("Successfully deleted all layers");
|
||||
}
|
||||
errored
|
||||
}
|
||||
|
||||
@@ -75,7 +75,7 @@ where
|
||||
#[derive(Debug)]
|
||||
pub(super) enum UploadedTimeline {
|
||||
/// Upload failed due to some error, the upload task is rescheduled for another retry.
|
||||
FailedAndRescheduled,
|
||||
FailedAndRescheduled(anyhow::Error),
|
||||
/// No issues happened during the upload, all task files were put into the remote storage.
|
||||
Successful(SyncData<LayersUpload>),
|
||||
}
|
||||
@@ -179,7 +179,7 @@ where
|
||||
})
|
||||
.collect::<FuturesUnordered<_>>();
|
||||
|
||||
let mut errors_happened = false;
|
||||
let mut errors = Vec::new();
|
||||
while let Some(upload_result) = upload_tasks.next().await {
|
||||
match upload_result {
|
||||
Ok(uploaded_path) => {
|
||||
@@ -188,13 +188,13 @@ where
|
||||
}
|
||||
Err(e) => match e {
|
||||
UploadError::Other(e) => {
|
||||
errors_happened = true;
|
||||
error!("Failed to upload a layer for timeline {sync_id}: {e:?}");
|
||||
errors.push(format!("{e:#}"));
|
||||
}
|
||||
UploadError::MissingLocalFile(source_path, e) => {
|
||||
if source_path.exists() {
|
||||
errors_happened = true;
|
||||
error!("Failed to upload a layer for timeline {sync_id}: {e:?}");
|
||||
errors.push(format!("{e:#}"));
|
||||
} else {
|
||||
// We have run the upload sync task, but the file we wanted to upload is gone.
|
||||
// This is "fine" due the asynchronous nature of the sync loop: it only reacts to events and might need to
|
||||
@@ -217,14 +217,17 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
if errors_happened {
|
||||
if errors.is_empty() {
|
||||
info!("Successfully uploaded all layers");
|
||||
UploadedTimeline::Successful(upload_data)
|
||||
} else {
|
||||
debug!("Reenqueuing failed upload task for timeline {sync_id}");
|
||||
upload_data.retries += 1;
|
||||
sync_queue.push(sync_id, SyncTask::Upload(upload_data));
|
||||
UploadedTimeline::FailedAndRescheduled
|
||||
} else {
|
||||
info!("Successfully uploaded all layers");
|
||||
UploadedTimeline::Successful(upload_data)
|
||||
UploadedTimeline::FailedAndRescheduled(anyhow::anyhow!(
|
||||
"Errors appeared during layer uploads: {:?}",
|
||||
errors
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user