Share exponential backoff code and fix logic for delete task failure (#2252)

Kirill Bulatov
2022-08-11 23:21:06 +03:00
committed by GitHub
parent e593cbaaba
commit 995a2de21e
3 changed files with 115 additions and 104 deletions

View File

@@ -93,3 +93,56 @@ pub fn shutdown_pageserver(exit_code: i32) {
info!("Shut down successfully completed");
std::process::exit(exit_code);
}
const DEFAULT_BASE_BACKOFF_SECONDS: f64 = 0.1;
const DEFAULT_MAX_BACKOFF_SECONDS: f64 = 3.0;
async fn exponential_backoff(n: u32, base_increment: f64, max_seconds: f64) {
let backoff_duration_seconds =
exponential_backoff_duration_seconds(n, base_increment, max_seconds);
if backoff_duration_seconds > 0.0 {
info!(
"Backoff: waiting {backoff_duration_seconds} seconds before processing with the task",
);
tokio::time::sleep(std::time::Duration::from_secs_f64(backoff_duration_seconds)).await;
}
}
fn exponential_backoff_duration_seconds(n: u32, base_increment: f64, max_seconds: f64) -> f64 {
if n == 0 {
0.0
} else {
(1.0 + base_increment).powf(f64::from(n)).min(max_seconds)
}
}
#[cfg(test)]
mod backoff_defaults_tests {
use super::*;
#[test]
fn backoff_defaults_produce_growing_backoff_sequence() {
let mut current_backoff_value = None;
for i in 0..10_000 {
let new_backoff_value = exponential_backoff_duration_seconds(
i,
DEFAULT_BASE_BACKOFF_SECONDS,
DEFAULT_MAX_BACKOFF_SECONDS,
);
if let Some(old_backoff_value) = current_backoff_value.replace(new_backoff_value) {
assert!(
old_backoff_value <= new_backoff_value,
"{i}th backoff value {new_backoff_value} is smaller than the previous one {old_backoff_value}"
)
}
}
assert_eq!(
current_backoff_value.expect("Should have produced backoff values to compare"),
DEFAULT_MAX_BACKOFF_SECONDS,
"Given big enough of retries, backoff should reach its allowed max value"
);
}
}
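
For reference, the snippet below is a standalone sketch rather than part of the commit: it copies the helper and constants above into a tiny program that prints the sequence they produce. The wait is skipped on the first attempt, then grows as 1.1^n seconds and flattens at the 3-second cap once 1.1^n exceeds 3.0 (around n = 12).

// Standalone sketch (not part of the commit): re-derives the backoff curve
// produced by the defaults above to make the growth and the cap visible.
const DEFAULT_BASE_BACKOFF_SECONDS: f64 = 0.1;
const DEFAULT_MAX_BACKOFF_SECONDS: f64 = 3.0;

fn exponential_backoff_duration_seconds(n: u32, base_increment: f64, max_seconds: f64) -> f64 {
    if n == 0 {
        0.0
    } else {
        (1.0 + base_increment).powf(f64::from(n)).min(max_seconds)
    }
}

fn main() {
    // Prints 0.000, 1.100, 1.210, 1.331, ... and stays at 3.000 from n = 12 onwards.
    for n in 0..=14 {
        println!(
            "attempt {n}: wait {:.3}s",
            exponential_backoff_duration_seconds(
                n,
                DEFAULT_BASE_BACKOFF_SECONDS,
                DEFAULT_MAX_BACKOFF_SECONDS
            )
        );
    }
}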

View File

@@ -172,6 +172,7 @@ use self::{
};
use crate::{
config::PageServerConf,
exponential_backoff,
layered_repository::{
ephemeral_file::is_ephemeral_file,
metadata::{metadata_path, TimelineMetadata, METADATA_FILE_NAME},
@@ -969,14 +970,19 @@ fn storage_sync_loop<P, S>(
}
}
// needed to check whether the download happened
// more informative than just a bool
#[derive(Debug)]
enum DownloadMarker {
enum DownloadStatus {
Downloaded,
Nothing,
}
#[derive(Debug)]
enum UploadStatus {
Uploaded,
Failed,
Nothing,
}
async fn process_batches<P, S>(
conf: &'static PageServerConf,
max_sync_errors: NonZeroU32,
@@ -1016,7 +1022,7 @@ where
"Finished storage sync task for sync id {sync_id} download marker {:?}",
download_marker
);
if matches!(download_marker, DownloadMarker::Downloaded) {
if matches!(download_marker, DownloadStatus::Downloaded) {
downloaded_timelines.insert(sync_id.tenant_id);
}
}
@@ -1030,7 +1036,7 @@ async fn process_sync_task_batch<P, S>(
max_sync_errors: NonZeroU32,
sync_id: ZTenantTimelineId,
batch: SyncTaskBatch,
) -> DownloadMarker
) -> DownloadStatus
where
P: Debug + Send + Sync + 'static,
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
@@ -1047,7 +1053,7 @@ where
// When operating in a system without tasks failing over the error threshold,
// current batching and task processing systems aim to update the layer set and metadata files (remote and local),
// without "losing" such layer files.
let (upload_result, status_update) = tokio::join!(
let (upload_status, download_status) = tokio::join!(
async {
if let Some(upload_data) = upload_data {
match validate_task_retries(upload_data, max_sync_errors)
@@ -1065,7 +1071,7 @@ where
"upload",
)
.await;
return Some(());
UploadStatus::Uploaded
}
ControlFlow::Break(failed_upload_data) => {
if let Err(e) = update_remote_data(
@@ -1082,10 +1088,13 @@ where
{
error!("Failed to update remote timeline {sync_id}: {e:?}");
}
UploadStatus::Failed
}
}
} else {
UploadStatus::Nothing
}
None
}
.instrument(info_span!("upload_timeline_data")),
async {
@@ -1115,51 +1124,53 @@ where
}
}
}
DownloadMarker::Nothing
DownloadStatus::Nothing
}
.instrument(info_span!("download_timeline_data")),
);
-    if let Some(mut delete_data) = batch.delete {
-        if upload_result.is_some() {
-            match validate_task_retries(delete_data, max_sync_errors)
-                .instrument(info_span!("retries_validation"))
-                .await
-            {
-                ControlFlow::Continue(new_delete_data) => {
-                    delete_timeline_data(
-                        conf,
-                        (storage.as_ref(), &index, sync_queue),
-                        sync_id,
-                        new_delete_data,
-                        sync_start,
-                        "delete",
-                    )
-                    .instrument(info_span!("delete_timeline_data"))
-                    .await;
-                }
-                ControlFlow::Break(failed_delete_data) => {
-                    if let Err(e) = update_remote_data(
-                        conf,
-                        storage.as_ref(),
-                        &index,
-                        sync_id,
-                        RemoteDataUpdate::Delete(&failed_delete_data.data.deleted_layers),
-                    )
-                    .await
-                    {
-                        error!("Failed to update remote timeline {sync_id}: {e:?}");
-                    }
-                }
-            }
-        } else {
-            delete_data.retries += 1;
-            sync_queue.push(sync_id, SyncTask::Delete(delete_data));
-            warn!("Skipping delete task due to failed upload tasks, reenqueuing");
-        }
-    }
-    status_update
+    if let Some(delete_data) = batch.delete {
+        match upload_status {
+            UploadStatus::Uploaded | UploadStatus::Nothing => {
+                match validate_task_retries(delete_data, max_sync_errors)
+                    .instrument(info_span!("retries_validation"))
+                    .await
+                {
+                    ControlFlow::Continue(new_delete_data) => {
+                        delete_timeline_data(
+                            conf,
+                            (storage.as_ref(), &index, sync_queue),
+                            sync_id,
+                            new_delete_data,
+                            sync_start,
+                            "delete",
+                        )
+                        .instrument(info_span!("delete_timeline_data"))
+                        .await;
+                    }
+                    ControlFlow::Break(failed_delete_data) => {
+                        if let Err(e) = update_remote_data(
+                            conf,
+                            storage.as_ref(),
+                            &index,
+                            sync_id,
+                            RemoteDataUpdate::Delete(&failed_delete_data.data.deleted_layers),
+                        )
+                        .await
+                        {
+                            error!("Failed to update remote timeline {sync_id}: {e:?}");
+                        }
+                    }
+                }
+            }
+            UploadStatus::Failed => {
+                warn!("Skipping delete task due to failed upload tasks, reenqueuing");
+                sync_queue.push(sync_id, SyncTask::Delete(delete_data));
+            }
+        }
+    }
+    download_status
}
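
To summarize the behavioural change in the hunk above: the old code ran the delete only when upload_result.is_some(), i.e. only after a successful upload, so a batch whose upload failed or that had no upload at all re-enqueued its delete task. The new UploadStatus match lets the delete proceed when uploads succeeded or were absent, and re-enqueues it only when an upload actually failed. The snippet below is a condensed, self-contained sketch of that decision; DeleteTask and the Vec-based queue are simplified placeholders, not the pageserver's real types.

// Condensed sketch of the new gating decision; DeleteTask and the Vec-based
// queue are simplified placeholders, not the pageserver's real API.
#[derive(Debug)]
enum UploadStatus {
    Uploaded,
    Failed,
    Nothing,
}

#[derive(Debug)]
struct DeleteTask {
    layers: Vec<String>,
}

fn handle_delete(
    upload_status: UploadStatus,
    delete_task: Option<DeleteTask>,
    queue: &mut Vec<DeleteTask>,
) {
    if let Some(delete_task) = delete_task {
        match upload_status {
            // Uploads succeeded or there were none: the delete may run now.
            UploadStatus::Uploaded | UploadStatus::Nothing => {
                println!("running delete of {:?}", delete_task.layers);
            }
            // An upload failed: keep the layers and retry the delete later.
            UploadStatus::Failed => {
                println!("skipping delete, re-enqueueing {delete_task:?}");
                queue.push(delete_task);
            }
        }
    }
}

fn main() {
    let mut queue = Vec::new();
    let task = || DeleteTask { layers: vec!["layer_1".to_string()] };
    handle_delete(UploadStatus::Nothing, Some(task()), &mut queue); // runs
    handle_delete(UploadStatus::Failed, Some(task()), &mut queue); // re-enqueued
    assert_eq!(queue.len(), 1);
}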
async fn download_timeline_data<P, S>(
@@ -1170,7 +1181,7 @@ async fn download_timeline_data<P, S>(
new_download_data: SyncData<LayersDownload>,
sync_start: Instant,
task_name: &str,
) -> DownloadMarker
) -> DownloadStatus
where
P: Debug + Send + Sync + 'static,
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
@@ -1199,7 +1210,7 @@ where
Ok(()) => match index.write().await.set_awaits_download(&sync_id, false) {
Ok(()) => {
register_sync_status(sync_id, sync_start, task_name, Some(true));
return DownloadMarker::Downloaded;
return DownloadStatus::Downloaded;
}
Err(e) => {
error!("Timeline {sync_id} was expected to be in the remote index after a successful download, but it's absent: {e:?}");
@@ -1215,7 +1226,7 @@ where
}
}
DownloadMarker::Nothing
DownloadStatus::Nothing
}
async fn update_local_metadata(
@@ -1493,11 +1504,7 @@ async fn validate_task_retries<T>(
return ControlFlow::Break(sync_data);
}
if current_attempt > 0 {
let seconds_to_wait = 2.0_f64.powf(current_attempt as f64 - 1.0).min(30.0);
info!("Waiting {seconds_to_wait} seconds before starting the task");
tokio::time::sleep(Duration::from_secs_f64(seconds_to_wait)).await;
}
exponential_backoff(current_attempt, 1.0, 30.0).await;
ControlFlow::Continue(sync_data)
}
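
The last hunk above swaps the inline wait in validate_task_retries for the shared helper. The sketch below is not part of the commit; it just tabulates the two formulas side by side: with base_increment = 1.0 the shared call waits min(2^n, 30) seconds instead of the old min(2^(n-1), 30), and both versions skip the wait on the very first attempt.

// Standalone comparison of the removed inline formula and the shared
// exponential_backoff(n, 1.0, 30.0) call that replaces it.
fn old_wait_seconds(attempt: u32) -> f64 {
    if attempt == 0 {
        0.0
    } else {
        2.0_f64.powf(attempt as f64 - 1.0).min(30.0)
    }
}

fn new_wait_seconds(attempt: u32) -> f64 {
    // Mirrors exponential_backoff_duration_seconds(attempt, 1.0, 30.0).
    if attempt == 0 {
        0.0
    } else {
        (1.0_f64 + 1.0).powf(f64::from(attempt)).min(30.0)
    }
}

fn main() {
    // Prints 0/0, 1/2, 2/4, 4/8, ... with both curves capped at 30 seconds.
    for attempt in 0..8 {
        println!(
            "attempt {attempt}: old {:>4.1}s, new {:>4.1}s",
            old_wait_seconds(attempt),
            new_wait_seconds(attempt)
        );
    }
}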

View File

@@ -25,7 +25,11 @@ use etcd_broker::{
use tokio::select;
use tracing::*;
use crate::repository::{Repository, Timeline};
use crate::{
exponential_backoff,
repository::{Repository, Timeline},
DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
};
use crate::{RepositoryImpl, TimelineImpl};
use utils::{
lsn::Lsn,
@@ -230,28 +234,6 @@ async fn subscribe_for_timeline_updates(
}
}
const DEFAULT_BASE_BACKOFF_SECONDS: f64 = 0.1;
const DEFAULT_MAX_BACKOFF_SECONDS: f64 = 3.0;
fn exponential_backoff_duration_seconds(n: u32, base_increment: f64, max_seconds: f64) -> f64 {
if n == 0 {
0.0
} else {
(1.0 + base_increment).powf(f64::from(n)).min(max_seconds)
}
}
async fn exponential_backoff(n: u32, base_increment: f64, max_seconds: f64) {
let backoff_duration_seconds =
exponential_backoff_duration_seconds(n, base_increment, max_seconds);
if backoff_duration_seconds > 0.0 {
info!(
"Backoff: waiting {backoff_duration_seconds} seconds before proceeding with the task",
);
tokio::time::sleep(Duration::from_secs_f64(backoff_duration_seconds)).await;
}
}
/// All data that's needed to run endless broker loop and keep the WAL streaming connection alive, if possible.
struct WalreceiverState {
id: ZTenantTimelineId,
@@ -1227,34 +1209,3 @@ mod tests {
}
}
}
#[cfg(test)]
mod backoff_defaults_tests {
use super::*;
#[test]
fn backoff_defaults_produce_growing_backoff_sequence() {
let mut current_backoff_value = None;
for i in 0..10_000 {
let new_backoff_value = exponential_backoff_duration_seconds(
i,
DEFAULT_BASE_BACKOFF_SECONDS,
DEFAULT_MAX_BACKOFF_SECONDS,
);
if let Some(old_backoff_value) = current_backoff_value.replace(new_backoff_value) {
assert!(
old_backoff_value <= new_backoff_value,
"{i}th backoff value {new_backoff_value} is smaller than the previous one {old_backoff_value}"
)
}
}
assert_eq!(
current_backoff_value.expect("Should have produced backoff values to compare"),
DEFAULT_MAX_BACKOFF_SECONDS,
"Given big enough of retries, backoff should reach its allowed max value"
);
}
}
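
Since the walreceiver now pulls exponential_backoff and the default constants from the crate root (see the import hunk above), a caller retries roughly like the hypothetical loop below. This is an illustrative sketch only; connect_with_backoff and attempt_connection are made-up names, not the walreceiver's actual call site.

// Hypothetical retry loop using the shared helper via the crate-root exports;
// the surrounding function and its parameters are illustrative assumptions.
use crate::{exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS};

async fn connect_with_backoff<F, Fut, T, E>(mut attempt_connection: F) -> T
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
    E: std::fmt::Debug,
{
    let mut retry_attempt = 0u32;
    loop {
        // Waits nothing on the first attempt, then 1.1^n seconds, capped at 3 seconds.
        exponential_backoff(
            retry_attempt,
            DEFAULT_BASE_BACKOFF_SECONDS,
            DEFAULT_MAX_BACKOFF_SECONDS,
        )
        .await;
        match attempt_connection().await {
            Ok(connected) => return connected,
            Err(e) => {
                tracing::warn!("connection attempt {} failed: {:?}, retrying", retry_attempt, e);
                retry_attempt += 1;
            }
        }
    }
}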