Allow parallel backup in safekeepers (#4177)

Add a `wal_backup_parallel_jobs` command-line argument to set the maximum
number of WAL segments uploaded in parallel. The new default is 5, meaning
safekeepers will upload up to 5 segments concurrently when that many are
available. Setting the value to 1 is equivalent to the sequential upload
behavior we had before.

Part of https://github.com/neondatabase/neon/issues/3957
Arthur Petukhovsky, 2023-05-09 12:20:35 +03:00 (committed by GitHub)
commit d62315327a, parent 4bd7b1daf2
3 changed files with 60 additions and 13 deletions


@@ -108,6 +108,9 @@ struct Args {
     /// available to the system.
     #[arg(long)]
     wal_backup_threads: Option<usize>,
+    /// Number of max parallel WAL segments to be offloaded to remote storage.
+    #[arg(long, default_value = "5")]
+    wal_backup_parallel_jobs: usize,
     /// Disable WAL backup to s3. When disabled, safekeeper removes WAL ignoring
     /// WAL backup horizon.
     #[arg(long)]
@@ -182,6 +185,7 @@ fn main() -> anyhow::Result<()> {
         max_offloader_lag_bytes: args.max_offloader_lag,
         backup_runtime_threads: args.wal_backup_threads,
         wal_backup_enabled: !args.disable_wal_backup,
+        backup_parallel_jobs: args.wal_backup_parallel_jobs,
         auth,
     };
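
As a rough sketch of how the new field surfaces on the command line, assuming the clap v4 derive API that the `#[arg(...)]` attribute suggests (this is not the safekeeper's full `Args` struct, just the one field in isolation):

```rust
use clap::Parser;

/// Minimal sketch, not the safekeeper's real Args struct.
#[derive(Parser, Debug)]
struct Args {
    /// Number of max parallel WAL segments to be offloaded to remote storage.
    #[arg(long, default_value = "5")]
    wal_backup_parallel_jobs: usize,
}

fn main() {
    // clap renders the snake_case field as `--wal-backup-parallel-jobs`;
    // passing `--wal-backup-parallel-jobs 1` restores the old sequential
    // behavior, while omitting the flag yields the new default of 5.
    let args = Args::parse();
    println!("parallel upload jobs: {}", args.wal_backup_parallel_jobs);
}
```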


@@ -61,6 +61,7 @@ pub struct SafeKeeperConf {
     pub remote_storage: Option<RemoteStorageConfig>,
     pub max_offloader_lag_bytes: u64,
     pub backup_runtime_threads: Option<usize>,
+    pub backup_parallel_jobs: usize,
     pub wal_backup_enabled: bool,
     pub auth: Option<Arc<JwtAuth>>,
 }
@@ -93,6 +94,7 @@ impl SafeKeeperConf {
             broker_keepalive_interval: Duration::from_secs(5),
             backup_runtime_threads: None,
             wal_backup_enabled: true,
+            backup_parallel_jobs: 1,
             auth: None,
             heartbeat_timeout: Duration::new(5, 0),
             max_offloader_lag_bytes: defaults::DEFAULT_MAX_OFFLOADER_LAG_BYTES,


@@ -1,5 +1,7 @@
 use anyhow::{Context, Result};
+use futures::stream::FuturesOrdered;
+use futures::StreamExt;
 use tokio::task::JoinHandle;
 use utils::id::NodeId;
@@ -155,8 +157,14 @@ async fn update_task(
         let timeline_dir = conf.timeline_dir(&ttid);
         let handle = tokio::spawn(
-            backup_task_main(ttid, timeline_dir, conf.workdir.clone(), shutdown_rx)
-                .instrument(info_span!("WAL backup task", ttid = %ttid)),
+            backup_task_main(
+                ttid,
+                timeline_dir,
+                conf.workdir.clone(),
+                conf.backup_parallel_jobs,
+                shutdown_rx,
+            )
+            .instrument(info_span!("WAL backup task", ttid = %ttid)),
         );
         entry.handle = Some(WalBackupTaskHandle {
@@ -240,6 +248,7 @@ struct WalBackupTask {
     timeline_dir: PathBuf,
     workspace_dir: PathBuf,
     wal_seg_size: usize,
+    parallel_jobs: usize,
     commit_lsn_watch_rx: watch::Receiver<Lsn>,
 }
@@ -248,6 +257,7 @@ async fn backup_task_main(
     ttid: TenantTimelineId,
     timeline_dir: PathBuf,
     workspace_dir: PathBuf,
+    parallel_jobs: usize,
     mut shutdown_rx: Receiver<()>,
 ) {
     info!("started");
@@ -264,6 +274,7 @@ async fn backup_task_main(
         timeline: tli,
         timeline_dir,
         workspace_dir,
+        parallel_jobs,
     };
     // task is spinned up only when wal_seg_size already initialized
@@ -330,6 +341,7 @@ impl WalBackupTask {
                 self.wal_seg_size,
                 &self.timeline_dir,
                 &self.workspace_dir,
+                self.parallel_jobs,
             )
             .await
             {
@@ -356,20 +368,49 @@ pub async fn backup_lsn_range(
     wal_seg_size: usize,
     timeline_dir: &Path,
     workspace_dir: &Path,
+    parallel_jobs: usize,
 ) -> Result<()> {
+    if parallel_jobs < 1 {
+        anyhow::bail!("parallel_jobs must be >= 1");
+    }
+
     let start_lsn = *backup_lsn;
     let segments = get_segments(start_lsn, end_lsn, wal_seg_size);
-    for s in &segments {
-        backup_single_segment(s, timeline_dir, workspace_dir)
-            .await
-            .with_context(|| format!("offloading segno {}", s.seg_no))?;
-        let new_backup_lsn = s.end_lsn;
-        timeline
-            .set_wal_backup_lsn(new_backup_lsn)
-            .context("setting wal_backup_lsn")?;
-        *backup_lsn = new_backup_lsn;
-    }
+
+    // Pool of concurrent upload tasks. We use `FuturesOrdered` to
+    // preserve order of uploads, and update `backup_lsn` only after
+    // all previous uploads are finished.
+    let mut uploads = FuturesOrdered::new();
+    let mut iter = segments.iter();
+
+    loop {
+        let added_task = match iter.next() {
+            Some(s) => {
+                uploads.push_back(backup_single_segment(s, timeline_dir, workspace_dir));
+                true
+            }
+            None => false,
+        };
+
+        // Wait for the next segment to upload if we don't have any more segments,
+        // or if we have too many concurrent uploads.
+        if !added_task || uploads.len() >= parallel_jobs {
+            let next = uploads.next().await;
+            if let Some(res) = next {
+                // next segment uploaded
+                let segment = res?;
+                let new_backup_lsn = segment.end_lsn;
+                timeline
+                    .set_wal_backup_lsn(new_backup_lsn)
+                    .context("setting wal_backup_lsn")?;
+                *backup_lsn = new_backup_lsn;
+            } else {
+                // no more segments to upload
+                break;
+            }
+        }
+    }
+
     info!(
         "offloaded segnos {:?} up to {}, previous backup_lsn {}",
         segments.iter().map(|&s| s.seg_no).collect::<Vec<_>>(),
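
For readers who want to poke at the scheduling logic outside the safekeeper, here is a minimal, self-contained sketch of the same bounded `FuturesOrdered` pattern. It assumes `tokio`, `futures`, and `anyhow` as dependencies; `upload` and the numeric segment ids are placeholders, not the real WAL segment types:

```rust
use futures::stream::{FuturesOrdered, StreamExt};

// Placeholder for the real segment upload; the sleep stands in for network I/O.
async fn upload(seg_no: u64) -> anyhow::Result<u64> {
    tokio::time::sleep(std::time::Duration::from_millis(10)).await;
    Ok(seg_no)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let parallel_jobs = 5;
    let mut uploads = FuturesOrdered::new();
    let mut iter = (1u64..=10).map(upload);

    loop {
        // Top up the pool while there is input left.
        let added_task = match iter.next() {
            Some(fut) => {
                uploads.push_back(fut);
                true
            }
            None => false,
        };

        // Drain one completion when the pool is full or the input is exhausted.
        if !added_task || uploads.len() >= parallel_jobs {
            match uploads.next().await {
                Some(res) => {
                    // FuturesOrdered yields results in submission order, so this
                    // is always the lowest unfinished segment: the watermark
                    // (backup_lsn in the real code) can safely advance past it.
                    let seg_no: u64 = res?;
                    println!("segment {seg_no} uploaded; watermark advances");
                }
                None => break, // pool drained and no more input
            }
        }
    }
    Ok(())
}
```

`FuturesOrdered` (rather than `FuturesUnordered`) is what makes the watermark update safe here: completions are observed in submission order, so `backup_lsn` only ever advances past a contiguous prefix of uploaded segments, at the cost that one slow early upload delays acknowledging later ones.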
@@ -383,7 +424,7 @@ async fn backup_single_segment(
     seg: &Segment,
     timeline_dir: &Path,
     workspace_dir: &Path,
-) -> Result<()> {
+) -> Result<Segment> {
     let segment_file_path = seg.file_path(timeline_dir)?;
     let remote_segment_path = segment_file_path
         .strip_prefix(workspace_dir)
@@ -404,7 +445,7 @@ async fn backup_single_segment(
     res?;
     debug!("Backup of {} done", segment_file_path.display());
-    Ok(())
+    Ok(*seg)
 }

 #[derive(Debug, Copy, Clone)]
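
The `Result<()>` to `Result<Segment>` change is what ties the two pieces together: each queued future now reports which segment it uploaded, so the ordered consumer in `backup_lsn_range` can take `end_lsn` straight from the completed future instead of tracking it on the side.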