safekeeper: parallelise segment copy (#12664)
Parallelise segment copying on the safekeeper. As far as I'm aware, the neon deployment does not use this endpoint.
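The core of the change is replacing a sequential per-segment loop with a bounded-concurrency stream. A minimal sketch of the pattern (the `copy_one` stand-in is hypothetical; the real code copies WAL segments through the remote storage client, with retries and cancellation):

use futures::{StreamExt, stream};

// Hypothetical stand-in for the per-segment copy; the real code calls
// storage.copy_object() with retries and a cancellation token.
async fn copy_one(segno: u64) -> anyhow::Result<()> {
    println!("copying segment {segno}");
    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Keep at most 32 copies in flight; completions arrive in any order.
    let mut results = stream::iter(0u64..100)
        .map(copy_one)
        .buffer_unordered(32);
    while let Some(r) = results.next().await {
        r?; // stop on the first error
    }
    Ok(())
}

`buffer_unordered` (rather than the ordered variant) fits here because the segment copies are independent and completion order does not matter.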
@@ -8,7 +8,7 @@ use std::time::Duration;
 use anyhow::{Context, Result};
 use camino::{Utf8Path, Utf8PathBuf};
 use futures::StreamExt;
-use futures::stream::FuturesOrdered;
+use futures::stream::{self, FuturesOrdered};
 use postgres_ffi::v14::xlog_utils::XLogSegNoOffsetToRecPtr;
 use postgres_ffi::{PG_TLI, XLogFileName, XLogSegNo};
 use remote_storage::{
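The `{self, FuturesOrdered}` form additionally brings the `stream` module itself into scope, which is what lets the new code below write `stream::iter(..)` without a fully qualified path. A tiny standalone illustration of `stream::iter` (not part of the commit; assumes the futures and tokio crates):

use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    // `stream::iter` turns any IntoIterator into a Stream.
    let sum: u32 = stream::iter(1..=3)
        .fold(0, |acc, x| async move { acc + x })
        .await;
    assert_eq!(sum, 6);
}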
@@ -723,8 +723,6 @@ pub async fn copy_s3_segments(
     from_segment: XLogSegNo,
     to_segment: XLogSegNo,
 ) -> Result<()> {
-    const SEGMENTS_PROGRESS_REPORT_INTERVAL: u64 = 1024;
-
     let remote_dst_path = remote_timeline_path(dst_ttid)?;
 
     let cancel = CancellationToken::new();
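The `cancel` token created here is the tokio_util `CancellationToken` that the retry loop in the next hunk consults. A minimal sketch of its semantics (standalone, not from the commit; assumes the tokio and tokio-util crates):

use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let cancel = CancellationToken::new();
    let worker_token = cancel.clone(); // clones share one cancellation state

    let worker = tokio::spawn(async move {
        // A worker can either poll the flag between steps with
        // `is_cancelled()`, or await cancellation as an event:
        worker_token.cancelled().await;
        assert!(worker_token.is_cancelled());
    });

    cancel.cancel(); // all clones observe the cancellation
    worker.await.unwrap();
}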
@@ -744,27 +742,69 @@ pub async fn copy_s3_segments(
         .filter_map(|o| o.key.object_name().map(ToOwned::to_owned))
         .collect::<HashSet<_>>();
 
-    debug!(
+    info!(
         "these segments have already been uploaded: {:?}",
         uploaded_segments
     );
 
-    for segno in from_segment..to_segment {
-        if segno % SEGMENTS_PROGRESS_REPORT_INTERVAL == 0 {
-            info!("copied all segments from {} until {}", from_segment, segno);
-        }
-
-        let segment_name = XLogFileName(PG_TLI, segno, wal_seg_size);
-        if uploaded_segments.contains(&segment_name) {
-            continue;
-        }
-        debug!("copying segment {}", segment_name);
-
-        let from = remote_timeline_path(src_ttid)?.join(&segment_name);
-        let to = remote_dst_path.join(&segment_name);
-
-        storage.copy_object(&from, &to, &cancel).await?;
-    }
+    /* BEGIN_HADRON */
+    // Copying multiple segments async.
+    let mut copy_stream = stream::iter(from_segment..to_segment)
+        .map(|segno| {
+            let segment_name = XLogFileName(PG_TLI, segno, wal_seg_size);
+            let remote_dst_path = remote_dst_path.clone();
+            let cancel = cancel.clone();
+
+            async move {
+                if uploaded_segments.contains(&segment_name) {
+                    return Ok(());
+                }
+
+                if segno % 1000 == 0 {
+                    info!("copying segment {} {}", segno, segment_name);
+                }
+
+                let from = remote_timeline_path(src_ttid)?.join(&segment_name);
+                let to = remote_dst_path.join(&segment_name);
+
+                // Retry logic: retry up to 10 times with 1 second delay
+                let mut retry_count = 0;
+                const MAX_RETRIES: u32 = 10;
+
+                loop {
+                    match storage.copy_object(&from, &to, &cancel).await {
+                        Ok(()) => return Ok(()),
+                        Err(e) => {
+                            if cancel.is_cancelled() {
+                                // Don't retry if cancellation was requested
+                                return Err(e);
+                            }
+
+                            retry_count += 1;
+                            if retry_count >= MAX_RETRIES {
+                                error!(
+                                    "Failed to copy segment {} after {} retries: {}",
+                                    segment_name, MAX_RETRIES, e
+                                );
+                                return Err(e);
+                            }
+                            warn!(
+                                "Failed to copy segment {} (attempt {}/{}): {}, retrying...",
+                                segment_name, retry_count, MAX_RETRIES, e
+                            );
+                            tokio::time::sleep(Duration::from_secs(1)).await;
+                        }
+                    }
+                }
+            }
+        })
+        .buffer_unordered(32); // Limit to 32 concurrent uploads
+
+    // Process results, stopping on first error
+    while let Some(result) = copy_stream.next().await {
+        result?;
+    }
+    /* END_HADRON */
 
     info!(
         "finished copying segments from {} until {}",
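A design note on the retry policy: it is a fixed 10 attempts with a flat one-second sleep, and cancellation short-circuits it. Factored into a reusable helper, the same pattern looks roughly like this (hypothetical `retry_fixed` function, not part of the commit; assumes the anyhow, tokio, and tokio-util crates):

use std::future::Future;
use std::time::Duration;
use tokio_util::sync::CancellationToken;

/// Hypothetical helper mirroring the inline loop above: retry `op` up to
/// `max_retries` times with a fixed delay, giving up early on cancellation.
async fn retry_fixed<T, F, Fut>(
    mut op: F,
    max_retries: u32,
    delay: Duration,
    cancel: &CancellationToken,
) -> anyhow::Result<T>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = anyhow::Result<T>>,
{
    let mut attempt = 0;
    loop {
        match op().await {
            Ok(v) => return Ok(v),
            Err(e) => {
                attempt += 1;
                if cancel.is_cancelled() || attempt >= max_retries {
                    return Err(e);
                }
                tokio::time::sleep(delay).await;
            }
        }
    }
}

One property of the `while let ... result?` consumer worth noting: returning early on the first `Err` drops the `buffer_unordered` stream, which in turn drops (and thereby cancels) the copy futures still in flight.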