safekeeper: refactor static remote storage usage to use Arc (#10179)

Greetings! Please add `w=1` to the GitHub URL when viewing the diff
(specifically for `wal_backup.rs`).

## Problem

This PR addresses the remaining work of #8200, namely removing the
static (global) usage of remote storage in favour of an `Arc`. I did not
opt to pass `Arc<RemoteStorage>` directly, since the remote storage is
really an `Option<RemoteStorage>`: it is not necessarily configured. I
wanted to avoid passing `Arc<Option<RemoteStorage>>` everywhere, with
each consuming function likely needing to handle the unwrapping itself.

Instead, I've added a `WalBackup` struct that holds the
`Option<RemoteStorage>` and handles initialization and unwrapping of the
remote storage internally. The `wal_backup` functions now take `self`,
and an `Arc<WalBackup>` is passed as a dependency through the various
consumers that need it.

## Summary of changes
- Add a `WalBackup` struct that holds the `Option<RemoteStorage>` and
handles initialization and unwrapping
- Modify the `wal_backup` functions to take `WalBackup` as `self` (add
`w=1` to the GitHub URL when viewing the diff here)
- Initialize `WalBackup` in the safekeeper root
- Store `Arc<WalBackup>` in `GlobalTimelineMap`, and pass and store it
in each `Timeline` as it is loaded
- Use `WalBackup` through `Timeline` as needed (see the usage sketch
below)
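
Consumers that strictly require remote storage now unwrap it explicitly instead of reaching into a global; this is a condensed excerpt of the `timeline_copy_handler` change from the diff below:

```rust
// Condensed: fail the API request if remote storage is not configured,
// otherwise hand the storage handle down explicitly.
let wal_backup = global_timelines.get_wal_backup();
let storage = wal_backup
    .get_storage()
    .ok_or(ApiError::BadRequest(anyhow::anyhow!(
        "Remote Storage is not configured"
    )))?;
// `storage` is then passed to copy_timeline::handle_request(...) alongside
// the request and `global_timelines`.
```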

## Refs

- task to remove global variables:
https://github.com/neondatabase/neon/issues/8200
- drive-by fix for https://github.com/neondatabase/neon/issues/11501:
the panic reported there is turned into an error, `remote storage not
configured`
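
Background paths that used to assume a configured remote storage now degrade gracefully instead of panicking; for instance, this condensed excerpt of the remote-deletion change from the diff below:

```rust
// Condensed from Timeline::remote_delete in the diff: with no remote
// storage configured there is nothing to delete remotely, so log and
// report success instead of panicking.
let r = if let Some(storage) = storage {
    wal_backup::delete_timeline(&storage, &ttid).await
} else {
    tracing::info!(
        "skipping remote deletion because no remote storage is configured; \
         this effectively leaks the objects in remote storage"
    );
    Ok(())
};
```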

---------

Co-authored-by: Christian Schwarz <christian@neon.tech>
Author: Evan Fleming
Date: 2025-05-16 05:41:10 -07:00 (committed by GitHub)
Parent: 2d247375b3
Commit: aa22572d8c
13 changed files with 255 additions and 113 deletions

View File

@@ -22,9 +22,10 @@ use safekeeper::defaults::{
     DEFAULT_PARTIAL_BACKUP_TIMEOUT, DEFAULT_PG_LISTEN_ADDR, DEFAULT_SSL_CERT_FILE,
     DEFAULT_SSL_CERT_RELOAD_PERIOD, DEFAULT_SSL_KEY_FILE,
 };
+use safekeeper::wal_backup::WalBackup;
 use safekeeper::{
     BACKGROUND_RUNTIME, BROKER_RUNTIME, GlobalTimelines, HTTP_RUNTIME, SafeKeeperConf,
-    WAL_SERVICE_RUNTIME, broker, control_file, http, wal_backup, wal_service,
+    WAL_SERVICE_RUNTIME, broker, control_file, http, wal_service,
 };
 use sd_notify::NotifyState;
 use storage_broker::{DEFAULT_ENDPOINT, Uri};
@@ -484,15 +485,15 @@ async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
         None => None,
     };
-    let global_timelines = Arc::new(GlobalTimelines::new(conf.clone()));
+    let wal_backup = Arc::new(WalBackup::new(&conf).await?);
+    let global_timelines = Arc::new(GlobalTimelines::new(conf.clone(), wal_backup.clone()));
     // Register metrics collector for active timelines. It's important to do this
     // after daemonizing, otherwise process collector will be upset.
     let timeline_collector = safekeeper::metrics::TimelineCollector::new(global_timelines.clone());
     metrics::register_internal(Box::new(timeline_collector))?;
-    wal_backup::init_remote_storage(&conf).await;
     // Keep handles to main tasks to die if any of them disappears.
     let mut tasks_handles: FuturesUnordered<BoxFuture<(String, JoinTaskRes)>> =
         FuturesUnordered::new();

View File

@@ -3,6 +3,7 @@ use std::sync::Arc;
 use anyhow::{Result, bail};
 use camino::Utf8PathBuf;
 use postgres_ffi::{MAX_SEND_SIZE, WAL_SEGMENT_SIZE};
+use remote_storage::GenericRemoteStorage;
 use safekeeper_api::membership::Configuration;
 use tokio::fs::OpenOptions;
 use tokio::io::{AsyncSeekExt, AsyncWriteExt};
@@ -30,6 +31,7 @@ pub struct Request {
 pub async fn handle_request(
     request: Request,
     global_timelines: Arc<GlobalTimelines>,
+    storage: Arc<GenericRemoteStorage>,
 ) -> Result<()> {
     // TODO: request.until_lsn MUST be a valid LSN, and we cannot check it :(
     // if LSN will point to the middle of a WAL record, timeline will be in "broken" state
@@ -127,6 +129,7 @@ pub async fn handle_request(
     assert!(first_ondisk_segment >= first_segment);
     copy_s3_segments(
+        &storage,
         wal_seg_size,
         &request.source_ttid,
         &request.destination_ttid,

View File

@@ -258,6 +258,7 @@ async fn timeline_snapshot_handler(request: Request<Body>) -> Result<Response<Bo
     let global_timelines = get_global_timelines(&request);
     let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
+    let storage = global_timelines.get_wal_backup().get_storage();
     // To stream the body use wrap_stream which wants Stream of Result<Bytes>,
     // so create the chan and write to it in another task.
@@ -269,6 +270,7 @@ async fn timeline_snapshot_handler(request: Request<Body>) -> Result<Response<Bo
         conf.my_id,
         destination,
         tx,
+        storage,
     ));
     let rx_stream = ReceiverStream::new(rx);
@@ -390,12 +392,18 @@ async fn timeline_copy_handler(mut request: Request<Body>) -> Result<Response<Bo
     );
     let global_timelines = get_global_timelines(&request);
+    let wal_backup = global_timelines.get_wal_backup();
+    let storage = wal_backup
+        .get_storage()
+        .ok_or(ApiError::BadRequest(anyhow::anyhow!(
+            "Remote Storage is not configured"
+        )))?;
     copy_timeline::handle_request(copy_timeline::Request{
         source_ttid,
         until_lsn: request_data.until_lsn,
         destination_ttid: TenantTimelineId::new(source_ttid.tenant_id, request_data.target_timeline_id),
-    }, global_timelines)
+    }, global_timelines, storage)
     .instrument(info_span!("copy_timeline", from=%source_ttid, to=%request_data.target_timeline_id, until_lsn=%request_data.until_lsn))
     .await
     .map_err(ApiError::InternalServerError)?;

View File

@@ -125,12 +125,6 @@ pub struct SafeKeeperConf {
     pub enable_tls_wal_service_api: bool,
 }
-impl SafeKeeperConf {
-    pub fn is_wal_backup_enabled(&self) -> bool {
-        self.remote_storage.is_some() && self.wal_backup_enabled
-    }
-}
 impl SafeKeeperConf {
     pub fn dummy() -> Self {
         SafeKeeperConf {

View File

@@ -9,6 +9,7 @@ use chrono::{DateTime, Utc};
 use futures::{SinkExt, StreamExt, TryStreamExt};
 use http_utils::error::ApiError;
 use postgres_ffi::{PG_TLI, XLogFileName, XLogSegNo};
+use remote_storage::GenericRemoteStorage;
 use reqwest::Certificate;
 use safekeeper_api::Term;
 use safekeeper_api::models::{PullTimelineRequest, PullTimelineResponse, TimelineStatus};
@@ -43,6 +44,7 @@ pub async fn stream_snapshot(
     source: NodeId,
     destination: NodeId,
     tx: mpsc::Sender<Result<Bytes>>,
+    storage: Option<Arc<GenericRemoteStorage>>,
 ) {
     match tli.try_wal_residence_guard().await {
         Err(e) => {
@@ -53,10 +55,32 @@ pub async fn stream_snapshot(
         Ok(maybe_resident_tli) => {
             if let Err(e) = match maybe_resident_tli {
                 Some(resident_tli) => {
-                    stream_snapshot_resident_guts(resident_tli, source, destination, tx.clone())
-                        .await
+                    stream_snapshot_resident_guts(
+                        resident_tli,
+                        source,
+                        destination,
+                        tx.clone(),
+                        storage,
+                    )
+                    .await
+                }
+                None => {
+                    if let Some(storage) = storage {
+                        stream_snapshot_offloaded_guts(
+                            tli,
+                            source,
+                            destination,
+                            tx.clone(),
+                            &storage,
+                        )
+                        .await
+                    } else {
+                        tx.send(Err(anyhow!("remote storage not configured")))
+                            .await
+                            .ok();
+                        return;
+                    }
                 }
-                None => stream_snapshot_offloaded_guts(tli, source, destination, tx.clone()).await,
             } {
                 // Error type/contents don't matter as they won't can't reach the client
                 // (hyper likely doesn't do anything with it), but http stream will be
@@ -123,10 +147,12 @@ pub(crate) async fn stream_snapshot_offloaded_guts(
     source: NodeId,
     destination: NodeId,
     tx: mpsc::Sender<Result<Bytes>>,
+    storage: &GenericRemoteStorage,
 ) -> Result<()> {
     let mut ar = prepare_tar_stream(tx);
-    tli.snapshot_offloaded(&mut ar, source, destination).await?;
+    tli.snapshot_offloaded(&mut ar, source, destination, storage)
+        .await?;
     ar.finish().await?;
@@ -139,10 +165,13 @@ pub async fn stream_snapshot_resident_guts(
     source: NodeId,
     destination: NodeId,
     tx: mpsc::Sender<Result<Bytes>>,
+    storage: Option<Arc<GenericRemoteStorage>>,
 ) -> Result<()> {
     let mut ar = prepare_tar_stream(tx);
-    let bctx = tli.start_snapshot(&mut ar, source, destination).await?;
+    let bctx = tli
+        .start_snapshot(&mut ar, source, destination, storage)
+        .await?;
     pausable_failpoint!("sk-snapshot-after-list-pausable");
     let tli_dir = tli.get_timeline_dir();
@@ -182,6 +211,7 @@ impl Timeline {
         ar: &mut tokio_tar::Builder<W>,
         source: NodeId,
         destination: NodeId,
+        storage: &GenericRemoteStorage,
     ) -> Result<()> {
         // Take initial copy of control file, then release state lock
         let mut control_file = {
@@ -216,6 +246,7 @@ impl Timeline {
         // can fail if the timeline was un-evicted and modified in the background.
         let remote_timeline_path = &self.remote_path;
         wal_backup::copy_partial_segment(
+            storage,
             &replace.previous.remote_path(remote_timeline_path),
             &replace.current.remote_path(remote_timeline_path),
         )
@@ -262,6 +293,7 @@ impl WalResidentTimeline {
         ar: &mut tokio_tar::Builder<W>,
         source: NodeId,
         destination: NodeId,
+        storage: Option<Arc<GenericRemoteStorage>>,
     ) -> Result<SnapshotContext> {
         let mut shared_state = self.write_shared_state().await;
         let wal_seg_size = shared_state.get_wal_seg_size();
@@ -283,6 +315,7 @@ impl WalResidentTimeline {
         let remote_timeline_path = &self.tli.remote_path;
         wal_backup::copy_partial_segment(
+            &*storage.context("remote storage not configured")?,
             &replace.previous.remote_path(remote_timeline_path),
             &replace.current.remote_path(remote_timeline_path),
         )

View File

@@ -18,7 +18,7 @@ use crate::send_wal::EndWatch;
 use crate::state::{TimelinePersistentState, TimelineState};
 use crate::timeline::{SharedState, StateSK, Timeline, get_timeline_dir};
 use crate::timelines_set::TimelinesSet;
-use crate::wal_backup::remote_timeline_path;
+use crate::wal_backup::{WalBackup, remote_timeline_path};
 use crate::{SafeKeeperConf, control_file, receive_wal, wal_storage};
 /// A Safekeeper testing or benchmarking environment. Uses a tempdir for storage, removed on drop.
@@ -101,18 +101,22 @@ impl Env {
         let safekeeper = self.make_safekeeper(node_id, ttid, start_lsn).await?;
         let shared_state = SharedState::new(StateSK::Loaded(safekeeper));
+        let wal_backup = Arc::new(WalBackup::new(&conf).await?);
         let timeline = Timeline::new(
             ttid,
             &timeline_dir,
             &remote_path,
             shared_state,
             conf.clone(),
+            wal_backup.clone(),
         );
         timeline.bootstrap(
             &mut timeline.write_shared_state().await,
             &conf,
             Arc::new(TimelinesSet::default()), // ignored for now
             RateLimiter::new(0, 0),
+            wal_backup,
         );
         Ok(timeline)
     }

View File

@@ -35,7 +35,8 @@ use crate::state::{EvictionState, TimelineMemState, TimelinePersistentState, Tim
 use crate::timeline_guard::ResidenceGuard;
 use crate::timeline_manager::{AtomicStatus, ManagerCtl};
 use crate::timelines_set::TimelinesSet;
-use crate::wal_backup::{self, remote_timeline_path};
+use crate::wal_backup;
+use crate::wal_backup::{WalBackup, remote_timeline_path};
 use crate::wal_backup_partial::PartialRemoteSegment;
 use crate::wal_storage::{Storage as wal_storage_iface, WalReader};
 use crate::{SafeKeeperConf, control_file, debug_dump, timeline_manager, wal_storage};
@@ -452,6 +453,8 @@ pub struct Timeline {
     manager_ctl: ManagerCtl,
     conf: Arc<SafeKeeperConf>,
+    pub(crate) wal_backup: Arc<WalBackup>,
     remote_deletion: std::sync::Mutex<Option<RemoteDeletionReceiver>>,
     /// Hold this gate from code that depends on the Timeline's non-shut-down state. While holding
@@ -476,6 +479,7 @@ impl Timeline {
         remote_path: &RemotePath,
         shared_state: SharedState,
         conf: Arc<SafeKeeperConf>,
+        wal_backup: Arc<WalBackup>,
     ) -> Arc<Self> {
         let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
             watch::channel(shared_state.sk.state().commit_lsn);
@@ -509,6 +513,7 @@ impl Timeline {
             wal_backup_active: AtomicBool::new(false),
             last_removed_segno: AtomicU64::new(0),
             mgr_status: AtomicStatus::new(),
+            wal_backup,
         })
     }
@@ -516,6 +521,7 @@ impl Timeline {
     pub fn load_timeline(
         conf: Arc<SafeKeeperConf>,
         ttid: TenantTimelineId,
+        wal_backup: Arc<WalBackup>,
     ) -> Result<Arc<Timeline>> {
         let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
@@ -529,6 +535,7 @@ impl Timeline {
             &remote_path,
             shared_state,
             conf,
+            wal_backup,
         ))
     }
@@ -539,6 +546,7 @@ impl Timeline {
         conf: &SafeKeeperConf,
         broker_active_set: Arc<TimelinesSet>,
         partial_backup_rate_limiter: RateLimiter,
+        wal_backup: Arc<WalBackup>,
     ) {
         let (tx, rx) = self.manager_ctl.bootstrap_manager();
@@ -561,6 +569,7 @@ impl Timeline {
             tx,
             rx,
             partial_backup_rate_limiter,
+            wal_backup,
         )
         .await
     }
@@ -606,9 +615,10 @@ impl Timeline {
         // it is cancelled, so WAL storage won't be opened again.
         shared_state.sk.close_wal_store();
-        if !only_local && self.conf.is_wal_backup_enabled() {
+        if !only_local {
             self.remote_delete().await?;
         }
         let dir_existed = delete_dir(&self.timeline_dir).await?;
         Ok(dir_existed)
     }
@@ -675,11 +685,20 @@ impl Timeline {
         guard: &mut std::sync::MutexGuard<Option<RemoteDeletionReceiver>>,
     ) -> RemoteDeletionReceiver {
         tracing::info!("starting remote deletion");
+        let storage = self.wal_backup.get_storage().clone();
         let (result_tx, result_rx) = tokio::sync::watch::channel(None);
         let ttid = self.ttid;
         tokio::task::spawn(
             async move {
-                let r = wal_backup::delete_timeline(&ttid).await;
+                let r = if let Some(storage) = storage {
+                    wal_backup::delete_timeline(&storage, &ttid).await
+                } else {
+                    tracing::info!(
+                        "skipping remote deletion because no remote storage is configured; this effectively leaks the objects in remote storage"
+                    );
+                    Ok(())
+                };
                 if let Err(e) = &r {
                     // Log error here in case nobody ever listens for our result (e.g. dropped API request)
                     tracing::error!("remote deletion failed: {e}");
@@ -1046,14 +1065,13 @@ impl WalResidentTimeline {
     pub async fn get_walreader(&self, start_lsn: Lsn) -> Result<WalReader> {
         let (_, persisted_state) = self.get_state().await;
-        let enable_remote_read = self.conf.is_wal_backup_enabled();
         WalReader::new(
             &self.ttid,
             self.timeline_dir.clone(),
             &persisted_state,
             start_lsn,
-            enable_remote_read,
+            self.wal_backup.clone(),
         )
     }

View File

@@ -6,7 +6,7 @@
 use anyhow::Context;
 use camino::Utf8PathBuf;
-use remote_storage::RemotePath;
+use remote_storage::{GenericRemoteStorage, RemotePath};
 use tokio::fs::File;
 use tokio::io::{AsyncRead, AsyncWriteExt};
 use tracing::{debug, info, instrument, warn};
@@ -68,6 +68,10 @@ impl Manager {
     #[instrument(name = "evict_timeline", skip_all)]
     pub(crate) async fn evict_timeline(&mut self) -> bool {
         assert!(!self.is_offloaded);
+        let Some(storage) = self.wal_backup.get_storage() else {
+            warn!("no remote storage configured, skipping uneviction");
+            return false;
+        };
         let partial_backup_uploaded = match &self.partial_backup_uploaded {
             Some(p) => p.clone(),
             None => {
@@ -87,7 +91,7 @@ impl Manager {
                 .inc();
         });
-        if let Err(e) = do_eviction(self, &partial_backup_uploaded).await {
+        if let Err(e) = do_eviction(self, &partial_backup_uploaded, &storage).await {
             warn!("failed to evict timeline: {:?}", e);
             return false;
         }
@@ -102,6 +106,10 @@ impl Manager {
     #[instrument(name = "unevict_timeline", skip_all)]
     pub(crate) async fn unevict_timeline(&mut self) {
         assert!(self.is_offloaded);
+        let Some(storage) = self.wal_backup.get_storage() else {
+            warn!("no remote storage configured, skipping uneviction");
+            return;
+        };
         let partial_backup_uploaded = match &self.partial_backup_uploaded {
             Some(p) => p.clone(),
             None => {
@@ -121,7 +129,7 @@ impl Manager {
                 .inc();
         });
-        if let Err(e) = do_uneviction(self, &partial_backup_uploaded).await {
+        if let Err(e) = do_uneviction(self, &partial_backup_uploaded, &storage).await {
             warn!("failed to unevict timeline: {:?}", e);
             return;
         }
@@ -137,8 +145,12 @@ impl Manager {
 /// Ensure that content matches the remote partial backup, if local segment exists.
 /// Then change state in control file and in-memory. If `delete_offloaded_wal` is set,
 /// delete the local segment.
-async fn do_eviction(mgr: &mut Manager, partial: &PartialRemoteSegment) -> anyhow::Result<()> {
-    compare_local_segment_with_remote(mgr, partial).await?;
+async fn do_eviction(
+    mgr: &mut Manager,
+    partial: &PartialRemoteSegment,
+    storage: &GenericRemoteStorage,
+) -> anyhow::Result<()> {
+    compare_local_segment_with_remote(mgr, partial, storage).await?;
     mgr.tli.switch_to_offloaded(partial).await?;
     // switch manager state as soon as possible
@@ -153,12 +165,16 @@ async fn do_eviction(mgr: &mut Manager, partial: &PartialRemoteSegment) -> anyho
 /// Ensure that content matches the remote partial backup, if local segment exists.
 /// Then download segment to local disk and change state in control file and in-memory.
-async fn do_uneviction(mgr: &mut Manager, partial: &PartialRemoteSegment) -> anyhow::Result<()> {
+async fn do_uneviction(
+    mgr: &mut Manager,
+    partial: &PartialRemoteSegment,
+    storage: &GenericRemoteStorage,
+) -> anyhow::Result<()> {
     // if the local segment is present, validate it
-    compare_local_segment_with_remote(mgr, partial).await?;
+    compare_local_segment_with_remote(mgr, partial, storage).await?;
     // atomically download the partial segment
-    redownload_partial_segment(mgr, partial).await?;
+    redownload_partial_segment(mgr, partial, storage).await?;
     mgr.tli.switch_to_present().await?;
     // switch manager state as soon as possible
@@ -181,6 +197,7 @@ async fn delete_local_segment(mgr: &Manager, partial: &PartialRemoteSegment) ->
 async fn redownload_partial_segment(
     mgr: &Manager,
     partial: &PartialRemoteSegment,
+    storage: &GenericRemoteStorage,
 ) -> anyhow::Result<()> {
     let tmp_file = mgr.tli.timeline_dir().join("remote_partial.tmp");
     let remote_segfile = remote_segment_path(mgr, partial);
@@ -190,7 +207,7 @@ async fn redownload_partial_segment(
         remote_segfile, tmp_file
     );
-    let mut reader = wal_backup::read_object(&remote_segfile, 0).await?;
+    let mut reader = wal_backup::read_object(storage, &remote_segfile, 0).await?;
     let mut file = File::create(&tmp_file).await?;
     let actual_len = tokio::io::copy(&mut reader, &mut file).await?;
@@ -234,13 +251,16 @@ async fn redownload_partial_segment(
 async fn compare_local_segment_with_remote(
     mgr: &Manager,
     partial: &PartialRemoteSegment,
+    storage: &GenericRemoteStorage,
 ) -> anyhow::Result<()> {
     let local_path = local_segment_path(mgr, partial);
     match File::open(&local_path).await {
-        Ok(mut local_file) => do_validation(mgr, &mut local_file, mgr.wal_seg_size, partial)
-            .await
-            .context("validation failed"),
+        Ok(mut local_file) => {
+            do_validation(mgr, &mut local_file, mgr.wal_seg_size, partial, storage)
+                .await
+                .context("validation failed")
+        }
         Err(_) => {
             info!(
                 "local WAL file {} is not present, skipping validation",
@@ -258,6 +278,7 @@ async fn do_validation(
     file: &mut File,
     wal_seg_size: usize,
     partial: &PartialRemoteSegment,
+    storage: &GenericRemoteStorage,
 ) -> anyhow::Result<()> {
     let local_size = file.metadata().await?.len() as usize;
     if local_size != wal_seg_size {
@@ -270,7 +291,7 @@ async fn do_validation(
     let remote_segfile = remote_segment_path(mgr, partial);
     let mut remote_reader: std::pin::Pin<Box<dyn AsyncRead + Send + Sync>> =
-        wal_backup::read_object(&remote_segfile, 0).await?;
+        wal_backup::read_object(storage, &remote_segfile, 0).await?;
     // remote segment should have bytes excatly up to `flush_lsn`
     let expected_remote_size = partial.flush_lsn.segment_offset(mgr.wal_seg_size);

View File

@@ -35,7 +35,7 @@ use crate::state::TimelineState;
 use crate::timeline::{ManagerTimeline, ReadGuardSharedState, StateSK, WalResidentTimeline};
 use crate::timeline_guard::{AccessService, GuardId, ResidenceGuard};
 use crate::timelines_set::{TimelineSetGuard, TimelinesSet};
-use crate::wal_backup::{self, WalBackupTaskHandle};
+use crate::wal_backup::{self, WalBackup, WalBackupTaskHandle};
 use crate::wal_backup_partial::{self, PartialBackup, PartialRemoteSegment};
 pub(crate) struct StateSnapshot {
@@ -200,6 +200,7 @@ pub(crate) struct Manager {
     pub(crate) conf: SafeKeeperConf,
     pub(crate) wal_seg_size: usize,
     pub(crate) walsenders: Arc<WalSenders>,
+    pub(crate) wal_backup: Arc<WalBackup>,
     // current state
     pub(crate) state_version_rx: tokio::sync::watch::Receiver<usize>,
@@ -238,6 +239,7 @@ pub async fn main_task(
     manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
     mut manager_rx: tokio::sync::mpsc::UnboundedReceiver<ManagerCtlMessage>,
    global_rate_limiter: RateLimiter,
+    wal_backup: Arc<WalBackup>,
 ) {
     tli.set_status(Status::Started);
@@ -256,6 +258,7 @@ pub async fn main_task(
         broker_active_set,
         manager_tx,
         global_rate_limiter,
+        wal_backup,
     )
     .await;
@@ -371,7 +374,7 @@ pub async fn main_task(
     mgr.tli_broker_active.set(false);
     // shutdown background tasks
-    if mgr.conf.is_wal_backup_enabled() {
+    if let Some(storage) = mgr.wal_backup.get_storage() {
         if let Some(backup_task) = mgr.backup_task.take() {
             // If we fell through here, then the timeline is shutting down. This is important
             // because otherwise joining on the wal_backup handle might hang.
@@ -379,7 +382,7 @@ pub async fn main_task(
             backup_task.join().await;
         }
-        wal_backup::update_task(&mut mgr, false, &last_state).await;
+        wal_backup::update_task(&mut mgr, storage, false, &last_state).await;
     }
     if let Some(recovery_task) = &mut mgr.recovery_task {
@@ -415,11 +418,13 @@ impl Manager {
         broker_active_set: Arc<TimelinesSet>,
         manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
         global_rate_limiter: RateLimiter,
+        wal_backup: Arc<WalBackup>,
     ) -> Manager {
         let (is_offloaded, partial_backup_uploaded) = tli.bootstrap_mgr().await;
         Manager {
             wal_seg_size: tli.get_wal_seg_size().await,
             walsenders: tli.get_walsenders().clone(),
+            wal_backup,
             state_version_rx: tli.get_state_version_rx(),
             num_computes_rx: tli.get_walreceivers().get_num_rx(),
             tli_broker_active: broker_active_set.guard(tli.clone()),
@@ -477,8 +482,8 @@ impl Manager {
         let is_wal_backup_required =
             wal_backup::is_wal_backup_required(self.wal_seg_size, num_computes, state);
-        if self.conf.is_wal_backup_enabled() {
-            wal_backup::update_task(self, is_wal_backup_required, state).await;
+        if let Some(storage) = self.wal_backup.get_storage() {
+            wal_backup::update_task(self, storage, is_wal_backup_required, state).await;
         }
         // update the state in Arc<Timeline>
@@ -624,9 +629,9 @@ impl Manager {
     /// Spawns partial WAL backup task if needed.
     async fn update_partial_backup(&mut self, state: &StateSnapshot) {
         // check if WAL backup is enabled and should be started
-        if !self.conf.is_wal_backup_enabled() {
+        let Some(storage) = self.wal_backup.get_storage() else {
             return;
-        }
+        };
         if self.partial_backup_task.is_some() {
             // partial backup is already running
@@ -650,6 +655,7 @@ impl Manager {
             self.conf.clone(),
             self.global_rate_limiter.clone(),
             cancel.clone(),
+            storage,
         ));
         self.partial_backup_task = Some((handle, cancel));
     }
@@ -669,6 +675,10 @@ impl Manager {
     /// Reset partial backup state and remove its remote storage data. Since it
     /// might concurrently uploading something, cancel the task first.
     async fn backup_partial_reset(&mut self) -> anyhow::Result<Vec<String>> {
+        let Some(storage) = self.wal_backup.get_storage() else {
+            anyhow::bail!("remote storage is not enabled");
+        };
         info!("resetting partial backup state");
         // Force unevict timeline if it is evicted before erasing partial backup
         // state. The intended use of this function is to drop corrupted remote
@@ -689,7 +699,7 @@ impl Manager {
         }
         let tli = self.wal_resident_timeline()?;
-        let mut partial_backup = PartialBackup::new(tli, self.conf.clone()).await;
+        let mut partial_backup = PartialBackup::new(tli, self.conf.clone(), storage).await;
         // Reset might fail e.g. when cfile is already reset but s3 removal
         // failed, so set manager state to None beforehand. In any case caller
         // is expected to retry until success.

View File

@@ -25,6 +25,7 @@ use crate::rate_limit::RateLimiter;
 use crate::state::TimelinePersistentState;
 use crate::timeline::{Timeline, TimelineError, delete_dir, get_tenant_dir, get_timeline_dir};
 use crate::timelines_set::TimelinesSet;
+use crate::wal_backup::WalBackup;
 use crate::wal_storage::Storage;
 use crate::{SafeKeeperConf, control_file, wal_storage};
@@ -47,15 +48,24 @@ struct GlobalTimelinesState {
     conf: Arc<SafeKeeperConf>,
     broker_active_set: Arc<TimelinesSet>,
     global_rate_limiter: RateLimiter,
+    wal_backup: Arc<WalBackup>,
 }
 impl GlobalTimelinesState {
     /// Get dependencies for a timeline constructor.
-    fn get_dependencies(&self) -> (Arc<SafeKeeperConf>, Arc<TimelinesSet>, RateLimiter) {
+    fn get_dependencies(
+        &self,
+    ) -> (
+        Arc<SafeKeeperConf>,
+        Arc<TimelinesSet>,
+        RateLimiter,
+        Arc<WalBackup>,
+    ) {
         (
             self.conf.clone(),
             self.broker_active_set.clone(),
             self.global_rate_limiter.clone(),
+            self.wal_backup.clone(),
         )
     }
@@ -84,7 +94,7 @@ pub struct GlobalTimelines {
 impl GlobalTimelines {
     /// Create a new instance of the global timelines map.
-    pub fn new(conf: Arc<SafeKeeperConf>) -> Self {
+    pub fn new(conf: Arc<SafeKeeperConf>, wal_backup: Arc<WalBackup>) -> Self {
         Self {
             state: Mutex::new(GlobalTimelinesState {
                 timelines: HashMap::new(),
@@ -92,6 +102,7 @@ impl GlobalTimelines {
                 conf,
                 broker_active_set: Arc::new(TimelinesSet::default()),
                 global_rate_limiter: RateLimiter::new(1, 1),
+                wal_backup,
             }),
         }
     }
@@ -147,7 +158,7 @@ impl GlobalTimelines {
     /// just lock and unlock it for each timeline -- this function is called
     /// during init when nothing else is running, so this is fine.
     async fn load_tenant_timelines(&self, tenant_id: TenantId) -> Result<()> {
-        let (conf, broker_active_set, partial_backup_rate_limiter) = {
+        let (conf, broker_active_set, partial_backup_rate_limiter, wal_backup) = {
            let state = self.state.lock().unwrap();
            state.get_dependencies()
         };
@@ -162,7 +173,7 @@ impl GlobalTimelines {
                 TimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or(""))
             {
                 let ttid = TenantTimelineId::new(tenant_id, timeline_id);
-                match Timeline::load_timeline(conf.clone(), ttid) {
+                match Timeline::load_timeline(conf.clone(), ttid, wal_backup.clone()) {
                     Ok(tli) => {
                         let mut shared_state = tli.write_shared_state().await;
                         self.state
@@ -175,6 +186,7 @@ impl GlobalTimelines {
                             &conf,
                             broker_active_set.clone(),
                             partial_backup_rate_limiter.clone(),
+                            wal_backup.clone(),
                         );
                     }
                     // If we can't load a timeline, it's most likely because of a corrupted
@@ -212,6 +224,10 @@ impl GlobalTimelines {
         self.state.lock().unwrap().broker_active_set.clone()
     }
+    pub fn get_wal_backup(&self) -> Arc<WalBackup> {
+        self.state.lock().unwrap().wal_backup.clone()
+    }
     /// Create a new timeline with the given id. If the timeline already exists, returns
     /// an existing timeline.
     pub(crate) async fn create(
@@ -222,7 +238,7 @@ impl GlobalTimelines {
         start_lsn: Lsn,
         commit_lsn: Lsn,
     ) -> Result<Arc<Timeline>> {
-        let (conf, _, _) = {
+        let (conf, _, _, _) = {
             let state = self.state.lock().unwrap();
             if let Ok(timeline) = state.get(&ttid) {
                 // Timeline already exists, return it.
@@ -267,7 +283,7 @@ impl GlobalTimelines {
         check_tombstone: bool,
     ) -> Result<Arc<Timeline>> {
         // Check for existence and mark that we're creating it.
-        let (conf, broker_active_set, partial_backup_rate_limiter) = {
+        let (conf, broker_active_set, partial_backup_rate_limiter, wal_backup) = {
             let mut state = self.state.lock().unwrap();
             match state.timelines.get(&ttid) {
                 Some(GlobalMapTimeline::CreationInProgress) => {
@@ -296,7 +312,14 @@ impl GlobalTimelines {
         };
         // Do the actual move and reflect the result in the map.
-        match GlobalTimelines::install_temp_timeline(ttid, tmp_path, conf.clone()).await {
+        match GlobalTimelines::install_temp_timeline(
+            ttid,
+            tmp_path,
+            conf.clone(),
+            wal_backup.clone(),
+        )
+        .await
+        {
             Ok(timeline) => {
                 let mut timeline_shared_state = timeline.write_shared_state().await;
                 let mut state = self.state.lock().unwrap();
@@ -314,6 +337,7 @@ impl GlobalTimelines {
                     &conf,
                     broker_active_set,
                     partial_backup_rate_limiter,
+                    wal_backup,
                 );
                 drop(timeline_shared_state);
                 Ok(timeline)
@@ -336,6 +360,7 @@ impl GlobalTimelines {
         ttid: TenantTimelineId,
         tmp_path: &Utf8PathBuf,
         conf: Arc<SafeKeeperConf>,
+        wal_backup: Arc<WalBackup>,
     ) -> Result<Arc<Timeline>> {
         let tenant_path = get_tenant_dir(conf.as_ref(), &ttid.tenant_id);
         let timeline_path = get_timeline_dir(conf.as_ref(), &ttid);
@@ -377,7 +402,7 @@ impl GlobalTimelines {
         // Do the move.
         durable_rename(tmp_path, &timeline_path, !conf.no_sync).await?;
-        Timeline::load_timeline(conf, ttid)
+        Timeline::load_timeline(conf, ttid, wal_backup)
     }
     /// Get a timeline from the global map. If it's not present, it doesn't exist on disk,

View File

@@ -2,6 +2,7 @@ use std::cmp::min;
 use std::collections::HashSet;
 use std::num::NonZeroU32;
 use std::pin::Pin;
+use std::sync::Arc;
 use std::time::Duration;
 use anyhow::{Context, Result};
@@ -17,7 +18,7 @@ use safekeeper_api::models::PeerInfo;
 use tokio::fs::File;
 use tokio::select;
 use tokio::sync::mpsc::{self, Receiver, Sender};
-use tokio::sync::{OnceCell, watch};
+use tokio::sync::watch;
 use tokio::task::JoinHandle;
 use tokio_util::sync::CancellationToken;
 use tracing::*;
@@ -63,7 +64,12 @@ pub(crate) fn is_wal_backup_required(
 /// Based on peer information determine which safekeeper should offload; if it
 /// is me, run (per timeline) task, if not yet. OTOH, if it is not me and task
 /// is running, kill it.
-pub(crate) async fn update_task(mgr: &mut Manager, need_backup: bool, state: &StateSnapshot) {
+pub(crate) async fn update_task(
+    mgr: &mut Manager,
+    storage: Arc<GenericRemoteStorage>,
+    need_backup: bool,
+    state: &StateSnapshot,
+) {
     let (offloader, election_dbg_str) =
         determine_offloader(&state.peers, state.backup_lsn, mgr.tli.ttid, &mgr.conf);
     let elected_me = Some(mgr.conf.my_id) == offloader;
@@ -82,7 +88,12 @@ pub(crate) async fn update_task(mgr: &mut Manager, need_backup: bool, state: &St
            return;
        };
-        let async_task = backup_task_main(resident, mgr.conf.backup_parallel_jobs, shutdown_rx);
+        let async_task = backup_task_main(
+            resident,
+            storage,
+            mgr.conf.backup_parallel_jobs,
+            shutdown_rx,
+        );
         let handle = if mgr.conf.current_thread_runtime {
             tokio::spawn(async_task)
@@ -169,33 +180,31 @@ fn determine_offloader(
     }
 }
-static REMOTE_STORAGE: OnceCell<Option<GenericRemoteStorage>> = OnceCell::const_new();
-
-// Storage must be configured and initialized when this is called.
-fn get_configured_remote_storage() -> &'static GenericRemoteStorage {
-    REMOTE_STORAGE
-        .get()
-        .expect("failed to get remote storage")
-        .as_ref()
-        .unwrap()
-}
-
-pub async fn init_remote_storage(conf: &SafeKeeperConf) {
-    // TODO: refactor REMOTE_STORAGE to avoid using global variables, and provide
-    // dependencies to all tasks instead.
-    REMOTE_STORAGE
-        .get_or_init(|| async {
-            if let Some(conf) = conf.remote_storage.as_ref() {
-                Some(
-                    GenericRemoteStorage::from_config(conf)
-                        .await
-                        .expect("failed to create remote storage"),
-                )
-            } else {
-                None
-            }
-        })
-        .await;
+pub struct WalBackup {
+    storage: Option<Arc<GenericRemoteStorage>>,
+}
+
+impl WalBackup {
+    /// Create a new WalBackup instance.
+    pub async fn new(conf: &SafeKeeperConf) -> Result<Self> {
+        if !conf.wal_backup_enabled {
+            return Ok(Self { storage: None });
+        }
+        match conf.remote_storage.as_ref() {
+            Some(config) => {
+                let storage = GenericRemoteStorage::from_config(config).await?;
+                Ok(Self {
+                    storage: Some(Arc::new(storage)),
+                })
+            }
+            None => Ok(Self { storage: None }),
+        }
+    }
+
+    pub fn get_storage(&self) -> Option<Arc<GenericRemoteStorage>> {
+        self.storage.clone()
+    }
 }
 struct WalBackupTask {
@@ -204,12 +213,14 @@ struct WalBackupTask {
     wal_seg_size: usize,
     parallel_jobs: usize,
     commit_lsn_watch_rx: watch::Receiver<Lsn>,
+    storage: Arc<GenericRemoteStorage>,
 }
 /// Offload single timeline.
 #[instrument(name = "wal_backup", skip_all, fields(ttid = %tli.ttid))]
 async fn backup_task_main(
     tli: WalResidentTimeline,
+    storage: Arc<GenericRemoteStorage>,
     parallel_jobs: usize,
     mut shutdown_rx: Receiver<()>,
 ) {
@@ -223,6 +234,7 @@ async fn backup_task_main(
         timeline_dir: tli.get_timeline_dir(),
         timeline: tli,
         parallel_jobs,
+        storage,
     };
     // task is spinned up only when wal_seg_size already initialized
@@ -293,6 +305,7 @@ impl WalBackupTask {
             match backup_lsn_range(
                 &self.timeline,
+                self.storage.clone(),
                 &mut backup_lsn,
                 commit_lsn,
                 self.wal_seg_size,
@@ -322,6 +335,7 @@ impl WalBackupTask {
 async fn backup_lsn_range(
     timeline: &WalResidentTimeline,
+    storage: Arc<GenericRemoteStorage>,
     backup_lsn: &mut Lsn,
     end_lsn: Lsn,
     wal_seg_size: usize,
@@ -352,7 +366,12 @@ async fn backup_lsn_range(
     loop {
         let added_task = match iter.next() {
             Some(s) => {
-                uploads.push_back(backup_single_segment(s, timeline_dir, remote_timeline_path));
+                uploads.push_back(backup_single_segment(
+                    &storage,
+                    s,
+                    timeline_dir,
+                    remote_timeline_path,
+                ));
                 true
             }
             None => false,
@@ -388,6 +407,7 @@ async fn backup_lsn_range(
 }
 async fn backup_single_segment(
+    storage: &GenericRemoteStorage,
     seg: &Segment,
     timeline_dir: &Utf8Path,
     remote_timeline_path: &RemotePath,
@@ -395,7 +415,13 @@ async fn backup_single_segment(
     let segment_file_path = seg.file_path(timeline_dir)?;
     let remote_segment_path = seg.remote_path(remote_timeline_path);
-    let res = backup_object(&segment_file_path, &remote_segment_path, seg.size()).await;
+    let res = backup_object(
+        storage,
+        &segment_file_path,
+        &remote_segment_path,
+        seg.size(),
+    )
+    .await;
     if res.is_ok() {
         BACKED_UP_SEGMENTS.inc();
     } else {
@@ -455,12 +481,11 @@ fn get_segments(start: Lsn, end: Lsn, seg_size: usize) -> Vec<Segment> {
 }
 async fn backup_object(
+    storage: &GenericRemoteStorage,
     source_file: &Utf8Path,
     target_file: &RemotePath,
     size: usize,
 ) -> Result<()> {
-    let storage = get_configured_remote_storage();
     let file = File::open(&source_file)
         .await
         .with_context(|| format!("Failed to open file {source_file:?} for wal backup"))?;
@@ -475,12 +500,11 @@ async fn backup_object(
 }
 pub(crate) async fn backup_partial_segment(
+    storage: &GenericRemoteStorage,
     source_file: &Utf8Path,
     target_file: &RemotePath,
     size: usize,
 ) -> Result<()> {
-    let storage = get_configured_remote_storage();
     let file = File::open(&source_file)
         .await
         .with_context(|| format!("Failed to open file {source_file:?} for wal backup"))?;
@@ -504,25 +528,20 @@ pub(crate) async fn backup_partial_segment(
 }
 pub(crate) async fn copy_partial_segment(
+    storage: &GenericRemoteStorage,
     source: &RemotePath,
     destination: &RemotePath,
 ) -> Result<()> {
-    let storage = get_configured_remote_storage();
     let cancel = CancellationToken::new();
     storage.copy_object(source, destination, &cancel).await
 }
 pub async fn read_object(
+    storage: &GenericRemoteStorage,
     file_path: &RemotePath,
     offset: u64,
 ) -> anyhow::Result<Pin<Box<dyn tokio::io::AsyncRead + Send + Sync>>> {
-    let storage = REMOTE_STORAGE
-        .get()
-        .context("Failed to get remote storage")?
-        .as_ref()
-        .context("No remote storage configured")?;
     info!("segment download about to start from remote path {file_path:?} at offset {offset}");
     let cancel = CancellationToken::new();
@@ -547,8 +566,10 @@ pub async fn read_object(
 /// Delete WAL files for the given timeline. Remote storage must be configured
 /// when called.
-pub async fn delete_timeline(ttid: &TenantTimelineId) -> Result<()> {
-    let storage = get_configured_remote_storage();
+pub async fn delete_timeline(
+    storage: &GenericRemoteStorage,
+    ttid: &TenantTimelineId,
+) -> Result<()> {
     let remote_path = remote_timeline_path(ttid)?;
     // see DEFAULT_MAX_KEYS_PER_LIST_RESPONSE
@@ -618,14 +639,14 @@ pub async fn delete_timeline(ttid: &TenantTimelineId) -> Result<()> {
 }
 /// Used by wal_backup_partial.
-pub async fn delete_objects(paths: &[RemotePath]) -> Result<()> {
+pub async fn delete_objects(storage: &GenericRemoteStorage, paths: &[RemotePath]) -> Result<()> {
     let cancel = CancellationToken::new(); // not really used
-    let storage = get_configured_remote_storage();
     storage.delete_objects(paths, &cancel).await
 }
 /// Copy segments from one timeline to another. Used in copy_timeline.
 pub async fn copy_s3_segments(
+    storage: &GenericRemoteStorage,
     wal_seg_size: usize,
     src_ttid: &TenantTimelineId,
     dst_ttid: &TenantTimelineId,
@@ -634,12 +655,6 @@ pub async fn copy_s3_segments(
 ) -> Result<()> {
     const SEGMENTS_PROGRESS_REPORT_INTERVAL: u64 = 1024;
-    let storage = REMOTE_STORAGE
-        .get()
-        .expect("failed to get remote storage")
-        .as_ref()
-        .unwrap();
     let remote_dst_path = remote_timeline_path(dst_ttid)?;
     let cancel = CancellationToken::new();

View File

@@ -19,9 +19,11 @@
 //! file. Code updates state in the control file before doing any S3 operations.
 //! This way control file stores information about all potentially existing
 //! remote partial segments and can clean them up after uploading a newer version.
+use std::sync::Arc;
 use camino::Utf8PathBuf;
 use postgres_ffi::{PG_TLI, XLogFileName, XLogSegNo};
-use remote_storage::RemotePath;
+use remote_storage::{GenericRemoteStorage, RemotePath};
 use safekeeper_api::Term;
 use serde::{Deserialize, Serialize};
 use tokio_util::sync::CancellationToken;
@@ -154,12 +156,16 @@ pub struct PartialBackup {
     conf: SafeKeeperConf,
     local_prefix: Utf8PathBuf,
     remote_timeline_path: RemotePath,
+    storage: Arc<GenericRemoteStorage>,
     state: State,
 }
 impl PartialBackup {
-    pub async fn new(tli: WalResidentTimeline, conf: SafeKeeperConf) -> PartialBackup {
+    pub async fn new(
+        tli: WalResidentTimeline,
+        conf: SafeKeeperConf,
+        storage: Arc<GenericRemoteStorage>,
+    ) -> PartialBackup {
         let (_, persistent_state) = tli.get_state().await;
         let wal_seg_size = tli.get_wal_seg_size().await;
@@ -173,6 +179,7 @@ impl PartialBackup {
             conf,
             local_prefix,
             remote_timeline_path,
+            storage,
         }
     }
@@ -240,7 +247,8 @@ impl PartialBackup {
         let remote_path = prepared.remote_path(&self.remote_timeline_path);
         // Upload first `backup_bytes` bytes of the segment to the remote storage.
-        wal_backup::backup_partial_segment(&local_path, &remote_path, backup_bytes).await?;
+        wal_backup::backup_partial_segment(&self.storage, &local_path, &remote_path, backup_bytes)
+            .await?;
         PARTIAL_BACKUP_UPLOADED_BYTES.inc_by(backup_bytes as u64);
         // We uploaded the segment, now let's verify that the data is still actual.
@@ -326,7 +334,7 @@ impl PartialBackup {
             let remote_path = self.remote_timeline_path.join(seg);
             objects_to_delete.push(remote_path);
         }
-        wal_backup::delete_objects(&objects_to_delete).await
+        wal_backup::delete_objects(&self.storage, &objects_to_delete).await
     }
     /// Delete all non-Uploaded segments from the remote storage. There should be only one
@@ -424,6 +432,7 @@ pub async fn main_task(
     conf: SafeKeeperConf,
     limiter: RateLimiter,
     cancel: CancellationToken,
+    storage: Arc<GenericRemoteStorage>,
 ) -> Option<PartialRemoteSegment> {
     debug!("started");
     let await_duration = conf.partial_backup_timeout;
@@ -432,7 +441,7 @@ pub async fn main_task(
     let mut commit_lsn_rx = tli.get_commit_lsn_watch_rx();
     let mut flush_lsn_rx = tli.get_term_flush_lsn_watch_rx();
-    let mut backup = PartialBackup::new(tli, conf).await;
+    let mut backup = PartialBackup::new(tli, conf, storage).await;
     debug!("state: {:?}", backup.state);

View File

@@ -21,6 +21,7 @@ use postgres_ffi::waldecoder::WalStreamDecoder;
 use postgres_ffi::{PG_TLI, XLogFileName, XLogSegNo, dispatch_pgversion};
 use pq_proto::SystemId;
 use remote_storage::RemotePath;
+use std::sync::Arc;
 use tokio::fs::{self, File, OpenOptions, remove_file};
 use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
 use tracing::*;
@@ -32,7 +33,7 @@ use crate::metrics::{
     REMOVED_WAL_SEGMENTS, WAL_STORAGE_OPERATION_SECONDS, WalStorageMetrics, time_io_closure,
 };
 use crate::state::TimelinePersistentState;
-use crate::wal_backup::{read_object, remote_timeline_path};
+use crate::wal_backup::{WalBackup, read_object, remote_timeline_path};
 pub trait Storage {
     // Last written LSN.
@@ -645,7 +646,7 @@ pub struct WalReader {
     wal_segment: Option<Pin<Box<dyn AsyncRead + Send + Sync>>>,
     // S3 will be used to read WAL if LSN is not available locally
-    enable_remote_read: bool,
+    wal_backup: Arc<WalBackup>,
     // We don't have WAL locally if LSN is less than local_start_lsn
     local_start_lsn: Lsn,
@@ -664,7 +665,7 @@ impl WalReader {
        timeline_dir: Utf8PathBuf,
        state: &TimelinePersistentState,
        start_pos: Lsn,
-        enable_remote_read: bool,
+        wal_backup: Arc<WalBackup>,
     ) -> Result<Self> {
         if state.server.wal_seg_size == 0 || state.local_start_lsn == Lsn(0) {
             bail!("state uninitialized, no data to read");
@@ -693,7 +694,7 @@ impl WalReader {
             wal_seg_size: state.server.wal_seg_size as usize,
             pos: start_pos,
             wal_segment: None,
-            enable_remote_read,
+            wal_backup,
             local_start_lsn: state.local_start_lsn,
             timeline_start_lsn: state.timeline_start_lsn,
             pg_version: state.server.pg_version / 10000,
@@ -812,9 +813,9 @@ impl WalReader {
         }
         // Try to open remote file, if remote reads are enabled
-        if self.enable_remote_read {
+        if let Some(storage) = self.wal_backup.get_storage() {
             let remote_wal_file_path = self.remote_path.join(&wal_file_name);
-            return read_object(&remote_wal_file_path, xlogoff as u64).await;
+            return read_object(&storage, &remote_wal_file_path, xlogoff as u64).await;
         }
         bail!("WAL segment is not found")