diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs
index ba912a3702..140260e0d0 100644
--- a/pageserver/src/lib.rs
+++ b/pageserver/src/lib.rs
@@ -93,3 +93,56 @@ pub fn shutdown_pageserver(exit_code: i32) {
info!("Shut down successfully completed");
std::process::exit(exit_code);
}
+
+const DEFAULT_BASE_BACKOFF_SECONDS: f64 = 0.1;
+const DEFAULT_MAX_BACKOFF_SECONDS: f64 = 3.0;
+
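+/// Waits before the next attempt of a retried task: sleeps for a duration that
+/// grows exponentially with the attempt number `n`; the first attempt (`n == 0`)
+/// proceeds immediately without sleeping.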
+async fn exponential_backoff(n: u32, base_increment: f64, max_seconds: f64) {
+ let backoff_duration_seconds =
+ exponential_backoff_duration_seconds(n, base_increment, max_seconds);
+ if backoff_duration_seconds > 0.0 {
+ info!(
+ "Backoff: waiting {backoff_duration_seconds} seconds before processing with the task",
+ );
+ tokio::time::sleep(std::time::Duration::from_secs_f64(backoff_duration_seconds)).await;
+ }
+}
+
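+/// Computes the backoff for attempt `n`: zero for the first attempt, otherwise
+/// `(1.0 + base_increment)^n` seconds, capped at `max_seconds`. With the defaults
+/// above (0.1 and 3.0), the sequence is 0, 1.1, 1.21, 1.331, … and saturates at 3.0.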
+fn exponential_backoff_duration_seconds(n: u32, base_increment: f64, max_seconds: f64) -> f64 {
+ if n == 0 {
+ 0.0
+ } else {
+ (1.0 + base_increment).powf(f64::from(n)).min(max_seconds)
+ }
+}
+
+#[cfg(test)]
+mod backoff_defaults_tests {
+ use super::*;
+
+ #[test]
+ fn backoff_defaults_produce_growing_backoff_sequence() {
+ let mut current_backoff_value = None;
+
+ for i in 0..10_000 {
+ let new_backoff_value = exponential_backoff_duration_seconds(
+ i,
+ DEFAULT_BASE_BACKOFF_SECONDS,
+ DEFAULT_MAX_BACKOFF_SECONDS,
+ );
+
+ if let Some(old_backoff_value) = current_backoff_value.replace(new_backoff_value) {
+ assert!(
+ old_backoff_value <= new_backoff_value,
+ "{i}th backoff value {new_backoff_value} is smaller than the previous one {old_backoff_value}"
+ )
+ }
+ }
+
+ assert_eq!(
+ current_backoff_value.expect("Should have produced backoff values to compare"),
+ DEFAULT_MAX_BACKOFF_SECONDS,
+ "Given big enough of retries, backoff should reach its allowed max value"
+ );
+ }
+}
diff --git a/pageserver/src/storage_sync.rs b/pageserver/src/storage_sync.rs
index 222a406c81..d1c8922259 100644
--- a/pageserver/src/storage_sync.rs
+++ b/pageserver/src/storage_sync.rs
@@ -172,6 +172,7 @@ use self::{
};
use crate::{
config::PageServerConf,
+ exponential_backoff,
layered_repository::{
ephemeral_file::is_ephemeral_file,
metadata::{metadata_path, TimelineMetadata, METADATA_FILE_NAME},
@@ -969,14 +970,19 @@ fn storage_sync_loop<P, S>(
}
}
-// needed to check whether the download happened
-// more informative than just a bool
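+/// Whether a download happened while processing a sync task batch;
+/// kept as an enum rather than a bool to make call sites explicit.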
#[derive(Debug)]
-enum DownloadMarker {
+enum DownloadStatus {
Downloaded,
Nothing,
}
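+/// Outcome of the upload part of a sync task batch; a failed upload causes any
+/// pending delete task for the same timeline to be re-enqueued instead of run.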
+#[derive(Debug)]
+enum UploadStatus {
+ Uploaded,
+ Failed,
+ Nothing,
+}
+
async fn process_batches<P, S>(
conf: &'static PageServerConf,
max_sync_errors: NonZeroU32,
@@ -1016,7 +1022,7 @@ where
"Finished storage sync task for sync id {sync_id} download marker {:?}",
download_marker
);
- if matches!(download_marker, DownloadMarker::Downloaded) {
+ if matches!(download_marker, DownloadStatus::Downloaded) {
downloaded_timelines.insert(sync_id.tenant_id);
}
}
@@ -1030,7 +1036,7 @@ async fn process_sync_task_batch<P, S>(
max_sync_errors: NonZeroU32,
sync_id: ZTenantTimelineId,
batch: SyncTaskBatch,
-) -> DownloadMarker
+) -> DownloadStatus
where
P: Debug + Send + Sync + 'static,
S: RemoteStorage + Send + Sync + 'static,
@@ -1047,7 +1053,7 @@ where
// When operating in a system without tasks failing over the error threshold,
// current batching and task processing systems aim to update the layer set and metadata files (remote and local),
// without "losing" such layer files.
- let (upload_result, status_update) = tokio::join!(
+ let (upload_status, download_status) = tokio::join!(
async {
if let Some(upload_data) = upload_data {
match validate_task_retries(upload_data, max_sync_errors)
@@ -1065,7 +1071,7 @@ where
"upload",
)
.await;
- return Some(());
+ UploadStatus::Uploaded
}
ControlFlow::Break(failed_upload_data) => {
if let Err(e) = update_remote_data(
@@ -1082,10 +1088,13 @@ where
{
error!("Failed to update remote timeline {sync_id}: {e:?}");
}
+
+ UploadStatus::Failed
}
}
+ } else {
+ UploadStatus::Nothing
}
- None
}
.instrument(info_span!("upload_timeline_data")),
async {
@@ -1115,51 +1124,53 @@ where
}
}
}
- DownloadMarker::Nothing
+ DownloadStatus::Nothing
}
.instrument(info_span!("download_timeline_data")),
);
- if let Some(mut delete_data) = batch.delete {
- if upload_result.is_some() {
- match validate_task_retries(delete_data, max_sync_errors)
- .instrument(info_span!("retries_validation"))
- .await
- {
- ControlFlow::Continue(new_delete_data) => {
- delete_timeline_data(
- conf,
- (storage.as_ref(), &index, sync_queue),
- sync_id,
- new_delete_data,
- sync_start,
- "delete",
- )
- .instrument(info_span!("delete_timeline_data"))
- .await;
- }
- ControlFlow::Break(failed_delete_data) => {
- if let Err(e) = update_remote_data(
- conf,
- storage.as_ref(),
- &index,
- sync_id,
- RemoteDataUpdate::Delete(&failed_delete_data.data.deleted_layers),
- )
+ if let Some(delete_data) = batch.delete {
+ match upload_status {
+ UploadStatus::Uploaded | UploadStatus::Nothing => {
+ match validate_task_retries(delete_data, max_sync_errors)
+ .instrument(info_span!("retries_validation"))
.await
- {
- error!("Failed to update remote timeline {sync_id}: {e:?}");
+ {
+ ControlFlow::Continue(new_delete_data) => {
+ delete_timeline_data(
+ conf,
+ (storage.as_ref(), &index, sync_queue),
+ sync_id,
+ new_delete_data,
+ sync_start,
+ "delete",
+ )
+ .instrument(info_span!("delete_timeline_data"))
+ .await;
+ }
+ ControlFlow::Break(failed_delete_data) => {
+ if let Err(e) = update_remote_data(
+ conf,
+ storage.as_ref(),
+ &index,
+ sync_id,
+ RemoteDataUpdate::Delete(&failed_delete_data.data.deleted_layers),
+ )
+ .await
+ {
+ error!("Failed to update remote timeline {sync_id}: {e:?}");
+ }
}
}
}
- } else {
- delete_data.retries += 1;
- sync_queue.push(sync_id, SyncTask::Delete(delete_data));
- warn!("Skipping delete task due to failed upload tasks, reenqueuing");
+ UploadStatus::Failed => {
+ warn!("Skipping delete task due to failed upload tasks, reenqueuing");
+ sync_queue.push(sync_id, SyncTask::Delete(delete_data));
+ }
}
}
- status_update
+ download_status
}
async fn download_timeline_data<P, S>(
@@ -1170,7 +1181,7 @@ async fn download_timeline_data<P, S>(
new_download_data: SyncData,
sync_start: Instant,
task_name: &str,
-) -> DownloadMarker
+) -> DownloadStatus
where
P: Debug + Send + Sync + 'static,
S: RemoteStorage + Send + Sync + 'static,
@@ -1199,7 +1210,7 @@ where
Ok(()) => match index.write().await.set_awaits_download(&sync_id, false) {
Ok(()) => {
register_sync_status(sync_id, sync_start, task_name, Some(true));
- return DownloadMarker::Downloaded;
+ return DownloadStatus::Downloaded;
}
Err(e) => {
error!("Timeline {sync_id} was expected to be in the remote index after a successful download, but it's absent: {e:?}");
@@ -1215,7 +1226,7 @@ where
}
}
- DownloadMarker::Nothing
+ DownloadStatus::Nothing
}
async fn update_local_metadata(
@@ -1493,11 +1504,7 @@ async fn validate_task_retries(
return ControlFlow::Break(sync_data);
}
- if current_attempt > 0 {
- let seconds_to_wait = 2.0_f64.powf(current_attempt as f64 - 1.0).min(30.0);
- info!("Waiting {seconds_to_wait} seconds before starting the task");
- tokio::time::sleep(Duration::from_secs_f64(seconds_to_wait)).await;
- }
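+    // With a base increment of 1.0 this waits 2^n seconds, capped at 30 seconds;
+    // the first attempt does not wait.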
+ exponential_backoff(current_attempt, 1.0, 30.0).await;
ControlFlow::Continue(sync_data)
}
diff --git a/pageserver/src/walreceiver/connection_manager.rs b/pageserver/src/walreceiver/connection_manager.rs
index 5b6a211566..09142c4d44 100644
--- a/pageserver/src/walreceiver/connection_manager.rs
+++ b/pageserver/src/walreceiver/connection_manager.rs
@@ -25,7 +25,11 @@ use etcd_broker::{
use tokio::select;
use tracing::*;
-use crate::repository::{Repository, Timeline};
+use crate::{
+ exponential_backoff,
+ repository::{Repository, Timeline},
+ DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
+};
use crate::{RepositoryImpl, TimelineImpl};
use utils::{
lsn::Lsn,
@@ -230,28 +234,6 @@ async fn subscribe_for_timeline_updates(
}
}
-const DEFAULT_BASE_BACKOFF_SECONDS: f64 = 0.1;
-const DEFAULT_MAX_BACKOFF_SECONDS: f64 = 3.0;
-
-fn exponential_backoff_duration_seconds(n: u32, base_increment: f64, max_seconds: f64) -> f64 {
- if n == 0 {
- 0.0
- } else {
- (1.0 + base_increment).powf(f64::from(n)).min(max_seconds)
- }
-}
-
-async fn exponential_backoff(n: u32, base_increment: f64, max_seconds: f64) {
- let backoff_duration_seconds =
- exponential_backoff_duration_seconds(n, base_increment, max_seconds);
- if backoff_duration_seconds > 0.0 {
- info!(
- "Backoff: waiting {backoff_duration_seconds} seconds before proceeding with the task",
- );
- tokio::time::sleep(Duration::from_secs_f64(backoff_duration_seconds)).await;
- }
-}
-
/// All data that's needed to run endless broker loop and keep the WAL streaming connection alive, if possible.
struct WalreceiverState {
id: ZTenantTimelineId,
@@ -1227,34 +1209,3 @@ mod tests {
}
}
}
-
-#[cfg(test)]
-mod backoff_defaults_tests {
- use super::*;
-
- #[test]
- fn backoff_defaults_produce_growing_backoff_sequence() {
- let mut current_backoff_value = None;
-
- for i in 0..10_000 {
- let new_backoff_value = exponential_backoff_duration_seconds(
- i,
- DEFAULT_BASE_BACKOFF_SECONDS,
- DEFAULT_MAX_BACKOFF_SECONDS,
- );
-
- if let Some(old_backoff_value) = current_backoff_value.replace(new_backoff_value) {
- assert!(
- old_backoff_value <= new_backoff_value,
- "{i}th backoff value {new_backoff_value} is smaller than the previous one {old_backoff_value}"
- )
- }
- }
-
- assert_eq!(
- current_backoff_value.expect("Should have produced backoff values to compare"),
- DEFAULT_MAX_BACKOFF_SECONDS,
- "Given big enough of retries, backoff should reach its allowed max value"
- );
- }
-}