diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs
index 3699a2a74c..fecbb8bd41 100644
--- a/safekeeper/src/bin/safekeeper.rs
+++ b/safekeeper/src/bin/safekeeper.rs
@@ -108,6 +108,9 @@ struct Args {
     /// available to the system.
     #[arg(long)]
     wal_backup_threads: Option<usize>,
+    /// Number of max parallel WAL segments to be offloaded to remote storage.
+    #[arg(long, default_value = "5")]
+    wal_backup_parallel_jobs: usize,
     /// Disable WAL backup to s3. When disabled, safekeeper removes WAL ignoring
     /// WAL backup horizon.
     #[arg(long)]
@@ -182,6 +185,7 @@ fn main() -> anyhow::Result<()> {
         max_offloader_lag_bytes: args.max_offloader_lag,
         backup_runtime_threads: args.wal_backup_threads,
         wal_backup_enabled: !args.disable_wal_backup,
+        backup_parallel_jobs: args.wal_backup_parallel_jobs,
         auth,
     };

diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs
index ff621fdbc0..22d6d57e19 100644
--- a/safekeeper/src/lib.rs
+++ b/safekeeper/src/lib.rs
@@ -61,6 +61,7 @@ pub struct SafeKeeperConf {
     pub remote_storage: Option<RemoteStorageConfig>,
     pub max_offloader_lag_bytes: u64,
     pub backup_runtime_threads: Option<usize>,
+    pub backup_parallel_jobs: usize,
     pub wal_backup_enabled: bool,
     pub auth: Option<Arc<JwtAuth>>,
 }
@@ -93,6 +94,7 @@ impl SafeKeeperConf {
             broker_keepalive_interval: Duration::from_secs(5),
             backup_runtime_threads: None,
             wal_backup_enabled: true,
+            backup_parallel_jobs: 1,
             auth: None,
             heartbeat_timeout: Duration::new(5, 0),
             max_offloader_lag_bytes: defaults::DEFAULT_MAX_OFFLOADER_LAG_BYTES,
diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs
index 953c7d0022..4d341a7ef8 100644
--- a/safekeeper/src/wal_backup.rs
+++ b/safekeeper/src/wal_backup.rs
@@ -1,5 +1,7 @@
 use anyhow::{Context, Result};

+use futures::stream::FuturesOrdered;
+use futures::StreamExt;
 use tokio::task::JoinHandle;
 use utils::id::NodeId;

@@ -155,8 +157,14 @@ async fn update_task(
     let timeline_dir = conf.timeline_dir(&ttid);

     let handle = tokio::spawn(
-        backup_task_main(ttid, timeline_dir, conf.workdir.clone(), shutdown_rx)
-            .instrument(info_span!("WAL backup task", ttid = %ttid)),
+        backup_task_main(
+            ttid,
+            timeline_dir,
+            conf.workdir.clone(),
+            conf.backup_parallel_jobs,
+            shutdown_rx,
+        )
+        .instrument(info_span!("WAL backup task", ttid = %ttid)),
     );

     entry.handle = Some(WalBackupTaskHandle {
@@ -240,6 +248,7 @@ struct WalBackupTask {
     timeline_dir: PathBuf,
     workspace_dir: PathBuf,
     wal_seg_size: usize,
+    parallel_jobs: usize,
     commit_lsn_watch_rx: watch::Receiver<Lsn>,
 }

@@ -248,6 +257,7 @@ async fn backup_task_main(
     ttid: TenantTimelineId,
     timeline_dir: PathBuf,
     workspace_dir: PathBuf,
+    parallel_jobs: usize,
     mut shutdown_rx: Receiver<()>,
 ) {
     info!("started");
@@ -264,6 +274,7 @@ async fn backup_task_main(
         timeline: tli,
         timeline_dir,
         workspace_dir,
+        parallel_jobs,
     };

     // task is spinned up only when wal_seg_size already initialized
@@ -330,6 +341,7 @@ impl WalBackupTask {
                 self.wal_seg_size,
                 &self.timeline_dir,
                 &self.workspace_dir,
+                self.parallel_jobs,
             )
             .await
             {
@@ -356,20 +368,49 @@ pub async fn backup_lsn_range(
     wal_seg_size: usize,
     timeline_dir: &Path,
     workspace_dir: &Path,
+    parallel_jobs: usize,
 ) -> Result<()> {
+    if parallel_jobs < 1 {
+        anyhow::bail!("parallel_jobs must be >= 1");
+    }
+
     let start_lsn = *backup_lsn;
     let segments = get_segments(start_lsn, end_lsn, wal_seg_size);

-    for s in &segments {
-        backup_single_segment(s, timeline_dir, workspace_dir)
-            .await
-            .with_context(|| format!("offloading segno {}", s.seg_no))?;
-        let new_backup_lsn = s.end_lsn;
-        timeline
-            .set_wal_backup_lsn(new_backup_lsn)
-            .context("setting wal_backup_lsn")?;
-        *backup_lsn = new_backup_lsn;
+    // Pool of concurrent upload tasks. We use `FuturesOrdered` to
+    // preserve order of uploads, and update `backup_lsn` only after
+    // all previous uploads are finished.
+    let mut uploads = FuturesOrdered::new();
+    let mut iter = segments.iter();
+
+    loop {
+        let added_task = match iter.next() {
+            Some(s) => {
+                uploads.push_back(backup_single_segment(s, timeline_dir, workspace_dir));
+                true
+            }
+            None => false,
+        };
+
+        // Wait for the next segment to upload if we don't have any more segments,
+        // or if we have too many concurrent uploads.
+        if !added_task || uploads.len() >= parallel_jobs {
+            let next = uploads.next().await;
+            if let Some(res) = next {
+                // next segment uploaded
+                let segment = res?;
+                let new_backup_lsn = segment.end_lsn;
+                timeline
+                    .set_wal_backup_lsn(new_backup_lsn)
+                    .context("setting wal_backup_lsn")?;
+                *backup_lsn = new_backup_lsn;
+            } else {
+                // no more segments to upload
+                break;
+            }
+        }
     }
+
     info!(
         "offloaded segnos {:?} up to {}, previous backup_lsn {}",
         segments.iter().map(|&s| s.seg_no).collect::<Vec<_>>(),
@@ -383,7 +424,7 @@ async fn backup_single_segment(
     seg: &Segment,
     timeline_dir: &Path,
     workspace_dir: &Path,
-) -> Result<()> {
+) -> Result<Segment> {
     let segment_file_path = seg.file_path(timeline_dir)?;
     let remote_segment_path = segment_file_path
         .strip_prefix(workspace_dir)
@@ -404,7 +445,7 @@ async fn backup_single_segment(
     res?;

     debug!("Backup of {} done", segment_file_path.display());
-    Ok(())
+    Ok(*seg)
 }

 #[derive(Debug, Copy, Clone)]
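
For reference, here is a minimal, self-contained sketch of the bounded upload pool that backup_lsn_range adopts above: at most parallel_jobs uploads are in flight at once, FuturesOrdered hands completions back in submission order, and a watermark (playing the role of backup_lsn) only advances once every earlier segment has finished. The upload_segment helper, the watermark variable, and the fake segment numbers are illustrative stand-ins rather than safekeeper code; the sketch only assumes the futures, tokio, and anyhow crates.

use std::time::Duration;

use futures::stream::FuturesOrdered;
use futures::StreamExt;

// Stand-in for uploading one WAL segment; sleeps to simulate network I/O.
async fn upload_segment(seg_no: u64) -> anyhow::Result<u64> {
    tokio::time::sleep(Duration::from_millis(10)).await;
    Ok(seg_no)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let parallel_jobs: usize = 5;
    let segments: Vec<u64> = (0..20).collect();

    // Pool of in-flight uploads. FuturesOrdered polls them concurrently but
    // yields their results in the order they were pushed.
    let mut uploads = FuturesOrdered::new();
    let mut iter = segments.iter();
    let mut watermark: u64 = 0;

    loop {
        // Top up the pool while there are still segments to enqueue.
        let added_task = match iter.next() {
            Some(&seg_no) => {
                uploads.push_back(upload_segment(seg_no));
                true
            }
            None => false,
        };

        // Wait on the oldest in-flight upload when the pool is full,
        // or when there is nothing left to enqueue.
        if !added_task || uploads.len() >= parallel_jobs {
            match uploads.next().await {
                Some(res) => {
                    let seg_no = res?;
                    // Safe to advance: all earlier segments have completed.
                    watermark = seg_no + 1;
                }
                // Pool is drained and the iterator is exhausted.
                None => break,
            }
        }
    }

    println!("uploaded {} segments, watermark {}", segments.len(), watermark);
    Ok(())
}

Using FuturesOrdered rather than FuturesUnordered is what makes the watermark update safe without extra bookkeeping: uploads still run concurrently, but results come back in the order they were enqueued, so the advanced LSN never skips over a segment that is still uploading.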