diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs
index 0e8dfd64c3..03c8f7e84a 100644
--- a/safekeeper/src/wal_backup.rs
+++ b/safekeeper/src/wal_backup.rs
@@ -8,7 +8,7 @@ use std::time::Duration;
 use anyhow::{Context, Result};
 use camino::{Utf8Path, Utf8PathBuf};
 use futures::StreamExt;
-use futures::stream::FuturesOrdered;
+use futures::stream::{self, FuturesOrdered};
 use postgres_ffi::v14::xlog_utils::XLogSegNoOffsetToRecPtr;
 use postgres_ffi::{PG_TLI, XLogFileName, XLogSegNo};
 use remote_storage::{
@@ -723,8 +723,6 @@ pub async fn copy_s3_segments(
     from_segment: XLogSegNo,
     to_segment: XLogSegNo,
 ) -> Result<()> {
-    const SEGMENTS_PROGRESS_REPORT_INTERVAL: u64 = 1024;
-
     let remote_dst_path = remote_timeline_path(dst_ttid)?;
 
     let cancel = CancellationToken::new();
@@ -744,27 +742,69 @@ pub async fn copy_s3_segments(
         .filter_map(|o| o.key.object_name().map(ToOwned::to_owned))
         .collect::<HashSet<_>>();
 
-    debug!(
+    info!(
         "these segments have already been uploaded: {:?}",
         uploaded_segments
     );
 
-    for segno in from_segment..to_segment {
-        if segno % SEGMENTS_PROGRESS_REPORT_INTERVAL == 0 {
-            info!("copied all segments from {} until {}", from_segment, segno);
-        }
+    /* BEGIN_HADRON */
+    // Copy multiple segments concurrently.
+    let mut copy_stream = stream::iter(from_segment..to_segment)
+        .map(|segno| {
+            let segment_name = XLogFileName(PG_TLI, segno, wal_seg_size);
+            let remote_dst_path = remote_dst_path.clone();
+            let cancel = cancel.clone();
 
-        let segment_name = XLogFileName(PG_TLI, segno, wal_seg_size);
-        if uploaded_segments.contains(&segment_name) {
-            continue;
-        }
-        debug!("copying segment {}", segment_name);
+            async move {
+                if uploaded_segments.contains(&segment_name) {
+                    return Ok(());
+                }
 
-        let from = remote_timeline_path(src_ttid)?.join(&segment_name);
-        let to = remote_dst_path.join(&segment_name);
+                if segno % 1000 == 0 {
+                    info!("copying segment {} {}", segno, segment_name);
+                }
 
-        storage.copy_object(&from, &to, &cancel).await?;
+                let from = remote_timeline_path(src_ttid)?.join(&segment_name);
+                let to = remote_dst_path.join(&segment_name);
+
+                // Retry logic: retry up to 10 times with a 1 second delay.
+                let mut retry_count = 0;
+                const MAX_RETRIES: u32 = 10;
+
+                loop {
+                    match storage.copy_object(&from, &to, &cancel).await {
+                        Ok(()) => return Ok(()),
+                        Err(e) => {
+                            if cancel.is_cancelled() {
+                                // Don't retry if cancellation was requested.
+                                return Err(e);
+                            }
+
+                            retry_count += 1;
+                            if retry_count >= MAX_RETRIES {
+                                error!(
+                                    "Failed to copy segment {} after {} retries: {}",
+                                    segment_name, MAX_RETRIES, e
+                                );
+                                return Err(e);
+                            }
+                            warn!(
+                                "Failed to copy segment {} (attempt {}/{}): {}, retrying...",
+                                segment_name, retry_count, MAX_RETRIES, e
+                            );
+                            tokio::time::sleep(Duration::from_secs(1)).await;
+                        }
+                    }
+                }
+            }
+        })
+        .buffer_unordered(32); // Limit to 32 concurrent copies.
+
+    // Process results, stopping on first error.
+    while let Some(result) = copy_stream.next().await {
+        result?;
     }
+    /* END_HADRON */
 
     info!(
         "finished copying segments from {} until {}",
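
For reference, here is a minimal, self-contained sketch of the bounded-concurrency retry pattern the hunk above introduces, assuming the `futures`, `tokio`, and `anyhow` crates; `try_copy` is a hypothetical stand-in for `storage.copy_object`, and the cancellation handling is omitted:

```rust
use std::time::Duration;

use futures::{StreamExt, stream};

// Hypothetical stand-in for `storage.copy_object`; not part of the PR.
async fn try_copy(segno: u64) -> anyhow::Result<()> {
    let _ = segno;
    Ok(())
}

async fn copy_all(from_segment: u64, to_segment: u64) -> anyhow::Result<()> {
    const MAX_RETRIES: u32 = 10;

    let mut copies = stream::iter(from_segment..to_segment)
        .map(|segno| async move {
            // Retry each segment up to MAX_RETRIES times with a fixed delay.
            let mut attempt = 0;
            loop {
                match try_copy(segno).await {
                    Ok(()) => return Ok(()),
                    Err(e) if attempt + 1 < MAX_RETRIES => {
                        attempt += 1;
                        eprintln!("segment {segno} failed (attempt {attempt}): {e}, retrying");
                        tokio::time::sleep(Duration::from_secs(1)).await;
                    }
                    Err(e) => return Err(e),
                }
            }
        })
        // At most 32 copy futures run at once; results arrive in
        // completion order, not segment order.
        .buffer_unordered(32);

    // Drain the stream, failing fast on the first error.
    while let Some(result) = copies.next().await {
        result?;
    }
    Ok(())
}
```

Because `buffer_unordered(32)` yields results in completion order rather than segment order, the separate drain loop is what surfaces the first error and stops issuing new copies once the in-flight ones finish.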