From aa22572d8c7602c1e6b26c0afde2df3a4e90f36d Mon Sep 17 00:00:00 2001
From: Evan Fleming
Date: Fri, 16 May 2025 05:41:10 -0700
Subject: [PATCH] safekeeper: refactor static remote storage usage to use Arc (#10179)

Greetings! Please add `w=1` to the GitHub URL when viewing the diff (specifically `wal_backup.rs`).

## Problem

This PR addresses the remaining work of #8200, namely removing the static usage of remote storage in favour of an `Arc`. I did not opt to pass `Arc<GenericRemoteStorage>` around directly, since remote storage is optional (it is not necessarily always configured), and I wanted to avoid passing `Arc<Option<GenericRemoteStorage>>` everywhere, with individual consuming functions likely needing to handle the unwrapping themselves. Instead I've added a `WalBackup` struct that holds `Option<Arc<GenericRemoteStorage>>` and handles initialization and unwrapping of the remote storage internally. The wal_backup functions now take `self`, and `Arc<WalBackup>` is passed as a dependency through the various consumers that need it (see the usage sketch after the diff).

## Summary of changes

- Add `WalBackup`, which holds `Option<Arc<GenericRemoteStorage>>` and handles initialization and unwrapping
- Modify the wal_backup functions to take `WalBackup` as `self` (add `w=1` to the GitHub URL when viewing the diff here)
- Initialize `WalBackup` in the safekeeper root
- Store `Arc<WalBackup>` in `GlobalTimelines` and pass and store it in each Timeline as it is loaded
- Use `WalBackup` through the Timeline as needed

## Refs

- Task to remove global variables: https://github.com/neondatabase/neon/issues/8200
- Drive-by: fixes https://github.com/neondatabase/neon/issues/11501 by turning the panic reported there into a `remote storage not configured` error

---------

Co-authored-by: Christian Schwarz
--- safekeeper/src/bin/safekeeper.rs | 9 +- safekeeper/src/copy_timeline.rs | 3 + safekeeper/src/http/routes.rs | 10 ++- safekeeper/src/lib.rs | 6 -- safekeeper/src/pull_timeline.rs | 43 +++++++-- safekeeper/src/test_utils.rs | 6 +- safekeeper/src/timeline.rs | 28 ++++-- safekeeper/src/timeline_eviction.rs | 47 +++++++--- safekeeper/src/timeline_manager.rs | 26 ++++-- safekeeper/src/timelines_global_map.rs | 41 +++++++-- safekeeper/src/wal_backup.rs | 115 ++++++++++++++----------- safekeeper/src/wal_backup_partial.rs | 21 +++-- safekeeper/src/wal_storage.rs | 13 +-- 13 files changed, 255 insertions(+), 113 deletions(-) diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index c267a55cb6..8d31ada24f 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -22,9 +22,10 @@ use safekeeper::defaults::{ DEFAULT_PARTIAL_BACKUP_TIMEOUT, DEFAULT_PG_LISTEN_ADDR, DEFAULT_SSL_CERT_FILE, DEFAULT_SSL_CERT_RELOAD_PERIOD, DEFAULT_SSL_KEY_FILE, }; +use safekeeper::wal_backup::WalBackup; use safekeeper::{ BACKGROUND_RUNTIME, BROKER_RUNTIME, GlobalTimelines, HTTP_RUNTIME, SafeKeeperConf, - WAL_SERVICE_RUNTIME, broker, control_file, http, wal_backup, wal_service, + WAL_SERVICE_RUNTIME, broker, control_file, http, wal_service, }; use sd_notify::NotifyState; use storage_broker::{DEFAULT_ENDPOINT, Uri}; @@ -484,15 +485,15 @@ async fn start_safekeeper(conf: Arc) -> Result<()> { None => None, }; - let global_timelines = Arc::new(GlobalTimelines::new(conf.clone())); + let wal_backup = Arc::new(WalBackup::new(&conf).await?); + + let global_timelines = Arc::new(GlobalTimelines::new(conf.clone(), wal_backup.clone())); // Register metrics collector for active timelines. It's important to do this // after daemonizing, otherwise process collector will be upset. 
let timeline_collector = safekeeper::metrics::TimelineCollector::new(global_timelines.clone()); metrics::register_internal(Box::new(timeline_collector))?; - wal_backup::init_remote_storage(&conf).await; - // Keep handles to main tasks to die if any of them disappears. let mut tasks_handles: FuturesUnordered> = FuturesUnordered::new(); diff --git a/safekeeper/src/copy_timeline.rs b/safekeeper/src/copy_timeline.rs index 11daff22cb..7984c2e2b9 100644 --- a/safekeeper/src/copy_timeline.rs +++ b/safekeeper/src/copy_timeline.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use anyhow::{Result, bail}; use camino::Utf8PathBuf; use postgres_ffi::{MAX_SEND_SIZE, WAL_SEGMENT_SIZE}; +use remote_storage::GenericRemoteStorage; use safekeeper_api::membership::Configuration; use tokio::fs::OpenOptions; use tokio::io::{AsyncSeekExt, AsyncWriteExt}; @@ -30,6 +31,7 @@ pub struct Request { pub async fn handle_request( request: Request, global_timelines: Arc, + storage: Arc, ) -> Result<()> { // TODO: request.until_lsn MUST be a valid LSN, and we cannot check it :( // if LSN will point to the middle of a WAL record, timeline will be in "broken" state @@ -127,6 +129,7 @@ pub async fn handle_request( assert!(first_ondisk_segment >= first_segment); copy_s3_segments( + &storage, wal_seg_size, &request.source_ttid, &request.destination_ttid, diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index 1a25b07496..384c582678 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -258,6 +258,7 @@ async fn timeline_snapshot_handler(request: Request) -> Result, // so create the chan and write to it in another task. @@ -269,6 +270,7 @@ async fn timeline_snapshot_handler(request: Request) -> Result) -> Result bool { - self.remote_storage.is_some() && self.wal_backup_enabled - } -} - impl SafeKeeperConf { pub fn dummy() -> Self { SafeKeeperConf { diff --git a/safekeeper/src/pull_timeline.rs b/safekeeper/src/pull_timeline.rs index c955e667bd..14aef1ee5e 100644 --- a/safekeeper/src/pull_timeline.rs +++ b/safekeeper/src/pull_timeline.rs @@ -9,6 +9,7 @@ use chrono::{DateTime, Utc}; use futures::{SinkExt, StreamExt, TryStreamExt}; use http_utils::error::ApiError; use postgres_ffi::{PG_TLI, XLogFileName, XLogSegNo}; +use remote_storage::GenericRemoteStorage; use reqwest::Certificate; use safekeeper_api::Term; use safekeeper_api::models::{PullTimelineRequest, PullTimelineResponse, TimelineStatus}; @@ -43,6 +44,7 @@ pub async fn stream_snapshot( source: NodeId, destination: NodeId, tx: mpsc::Sender>, + storage: Option>, ) { match tli.try_wal_residence_guard().await { Err(e) => { @@ -53,10 +55,32 @@ pub async fn stream_snapshot( Ok(maybe_resident_tli) => { if let Err(e) = match maybe_resident_tli { Some(resident_tli) => { - stream_snapshot_resident_guts(resident_tli, source, destination, tx.clone()) - .await + stream_snapshot_resident_guts( + resident_tli, + source, + destination, + tx.clone(), + storage, + ) + .await + } + None => { + if let Some(storage) = storage { + stream_snapshot_offloaded_guts( + tli, + source, + destination, + tx.clone(), + &storage, + ) + .await + } else { + tx.send(Err(anyhow!("remote storage not configured"))) + .await + .ok(); + return; + } } - None => stream_snapshot_offloaded_guts(tli, source, destination, tx.clone()).await, } { // Error type/contents don't matter as they won't can't reach the client // (hyper likely doesn't do anything with it), but http stream will be @@ -123,10 +147,12 @@ pub(crate) async fn stream_snapshot_offloaded_guts( source: NodeId, 
destination: NodeId, tx: mpsc::Sender>, + storage: &GenericRemoteStorage, ) -> Result<()> { let mut ar = prepare_tar_stream(tx); - tli.snapshot_offloaded(&mut ar, source, destination).await?; + tli.snapshot_offloaded(&mut ar, source, destination, storage) + .await?; ar.finish().await?; @@ -139,10 +165,13 @@ pub async fn stream_snapshot_resident_guts( source: NodeId, destination: NodeId, tx: mpsc::Sender>, + storage: Option>, ) -> Result<()> { let mut ar = prepare_tar_stream(tx); - let bctx = tli.start_snapshot(&mut ar, source, destination).await?; + let bctx = tli + .start_snapshot(&mut ar, source, destination, storage) + .await?; pausable_failpoint!("sk-snapshot-after-list-pausable"); let tli_dir = tli.get_timeline_dir(); @@ -182,6 +211,7 @@ impl Timeline { ar: &mut tokio_tar::Builder, source: NodeId, destination: NodeId, + storage: &GenericRemoteStorage, ) -> Result<()> { // Take initial copy of control file, then release state lock let mut control_file = { @@ -216,6 +246,7 @@ impl Timeline { // can fail if the timeline was un-evicted and modified in the background. let remote_timeline_path = &self.remote_path; wal_backup::copy_partial_segment( + storage, &replace.previous.remote_path(remote_timeline_path), &replace.current.remote_path(remote_timeline_path), ) @@ -262,6 +293,7 @@ impl WalResidentTimeline { ar: &mut tokio_tar::Builder, source: NodeId, destination: NodeId, + storage: Option>, ) -> Result { let mut shared_state = self.write_shared_state().await; let wal_seg_size = shared_state.get_wal_seg_size(); @@ -283,6 +315,7 @@ impl WalResidentTimeline { let remote_timeline_path = &self.tli.remote_path; wal_backup::copy_partial_segment( + &*storage.context("remote storage not configured")?, &replace.previous.remote_path(remote_timeline_path), &replace.current.remote_path(remote_timeline_path), ) diff --git a/safekeeper/src/test_utils.rs b/safekeeper/src/test_utils.rs index 618e2b59d2..e2817c8337 100644 --- a/safekeeper/src/test_utils.rs +++ b/safekeeper/src/test_utils.rs @@ -18,7 +18,7 @@ use crate::send_wal::EndWatch; use crate::state::{TimelinePersistentState, TimelineState}; use crate::timeline::{SharedState, StateSK, Timeline, get_timeline_dir}; use crate::timelines_set::TimelinesSet; -use crate::wal_backup::remote_timeline_path; +use crate::wal_backup::{WalBackup, remote_timeline_path}; use crate::{SafeKeeperConf, control_file, receive_wal, wal_storage}; /// A Safekeeper testing or benchmarking environment. Uses a tempdir for storage, removed on drop. 
@@ -101,18 +101,22 @@ impl Env { let safekeeper = self.make_safekeeper(node_id, ttid, start_lsn).await?; let shared_state = SharedState::new(StateSK::Loaded(safekeeper)); + let wal_backup = Arc::new(WalBackup::new(&conf).await?); + let timeline = Timeline::new( ttid, &timeline_dir, &remote_path, shared_state, conf.clone(), + wal_backup.clone(), ); timeline.bootstrap( &mut timeline.write_shared_state().await, &conf, Arc::new(TimelinesSet::default()), // ignored for now RateLimiter::new(0, 0), + wal_backup, ); Ok(timeline) } diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index b7ba28f435..588bd4f2c9 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -35,7 +35,8 @@ use crate::state::{EvictionState, TimelineMemState, TimelinePersistentState, Tim use crate::timeline_guard::ResidenceGuard; use crate::timeline_manager::{AtomicStatus, ManagerCtl}; use crate::timelines_set::TimelinesSet; -use crate::wal_backup::{self, remote_timeline_path}; +use crate::wal_backup; +use crate::wal_backup::{WalBackup, remote_timeline_path}; use crate::wal_backup_partial::PartialRemoteSegment; use crate::wal_storage::{Storage as wal_storage_iface, WalReader}; use crate::{SafeKeeperConf, control_file, debug_dump, timeline_manager, wal_storage}; @@ -452,6 +453,8 @@ pub struct Timeline { manager_ctl: ManagerCtl, conf: Arc, + pub(crate) wal_backup: Arc, + remote_deletion: std::sync::Mutex>, /// Hold this gate from code that depends on the Timeline's non-shut-down state. While holding @@ -476,6 +479,7 @@ impl Timeline { remote_path: &RemotePath, shared_state: SharedState, conf: Arc, + wal_backup: Arc, ) -> Arc { let (commit_lsn_watch_tx, commit_lsn_watch_rx) = watch::channel(shared_state.sk.state().commit_lsn); @@ -509,6 +513,7 @@ impl Timeline { wal_backup_active: AtomicBool::new(false), last_removed_segno: AtomicU64::new(0), mgr_status: AtomicStatus::new(), + wal_backup, }) } @@ -516,6 +521,7 @@ impl Timeline { pub fn load_timeline( conf: Arc, ttid: TenantTimelineId, + wal_backup: Arc, ) -> Result> { let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered(); @@ -529,6 +535,7 @@ impl Timeline { &remote_path, shared_state, conf, + wal_backup, )) } @@ -539,6 +546,7 @@ impl Timeline { conf: &SafeKeeperConf, broker_active_set: Arc, partial_backup_rate_limiter: RateLimiter, + wal_backup: Arc, ) { let (tx, rx) = self.manager_ctl.bootstrap_manager(); @@ -561,6 +569,7 @@ impl Timeline { tx, rx, partial_backup_rate_limiter, + wal_backup, ) .await } @@ -606,9 +615,10 @@ impl Timeline { // it is cancelled, so WAL storage won't be opened again. 
shared_state.sk.close_wal_store(); - if !only_local && self.conf.is_wal_backup_enabled() { + if !only_local { self.remote_delete().await?; } + let dir_existed = delete_dir(&self.timeline_dir).await?; Ok(dir_existed) } @@ -675,11 +685,20 @@ impl Timeline { guard: &mut std::sync::MutexGuard>, ) -> RemoteDeletionReceiver { tracing::info!("starting remote deletion"); + let storage = self.wal_backup.get_storage().clone(); let (result_tx, result_rx) = tokio::sync::watch::channel(None); let ttid = self.ttid; tokio::task::spawn( async move { - let r = wal_backup::delete_timeline(&ttid).await; + let r = if let Some(storage) = storage { + wal_backup::delete_timeline(&storage, &ttid).await + } else { + tracing::info!( + "skipping remote deletion because no remote storage is configured; this effectively leaks the objects in remote storage" + ); + Ok(()) + }; + if let Err(e) = &r { // Log error here in case nobody ever listens for our result (e.g. dropped API request) tracing::error!("remote deletion failed: {e}"); @@ -1046,14 +1065,13 @@ impl WalResidentTimeline { pub async fn get_walreader(&self, start_lsn: Lsn) -> Result { let (_, persisted_state) = self.get_state().await; - let enable_remote_read = self.conf.is_wal_backup_enabled(); WalReader::new( &self.ttid, self.timeline_dir.clone(), &persisted_state, start_lsn, - enable_remote_read, + self.wal_backup.clone(), ) } diff --git a/safekeeper/src/timeline_eviction.rs b/safekeeper/src/timeline_eviction.rs index 84c636daf6..e817dbf6f9 100644 --- a/safekeeper/src/timeline_eviction.rs +++ b/safekeeper/src/timeline_eviction.rs @@ -6,7 +6,7 @@ use anyhow::Context; use camino::Utf8PathBuf; -use remote_storage::RemotePath; +use remote_storage::{GenericRemoteStorage, RemotePath}; use tokio::fs::File; use tokio::io::{AsyncRead, AsyncWriteExt}; use tracing::{debug, info, instrument, warn}; @@ -68,6 +68,10 @@ impl Manager { #[instrument(name = "evict_timeline", skip_all)] pub(crate) async fn evict_timeline(&mut self) -> bool { assert!(!self.is_offloaded); + let Some(storage) = self.wal_backup.get_storage() else { + warn!("no remote storage configured, skipping eviction"); + return false; + }; let partial_backup_uploaded = match &self.partial_backup_uploaded { Some(p) => p.clone(), None => { @@ -87,7 +91,7 @@ impl Manager { .inc(); }); - if let Err(e) = do_eviction(self, &partial_backup_uploaded).await { + if let Err(e) = do_eviction(self, &partial_backup_uploaded, &storage).await { warn!("failed to evict timeline: {:?}", e); return false; } @@ -102,6 +106,10 @@ impl Manager { #[instrument(name = "unevict_timeline", skip_all)] pub(crate) async fn unevict_timeline(&mut self) { assert!(self.is_offloaded); + let Some(storage) = self.wal_backup.get_storage() else { + warn!("no remote storage configured, skipping uneviction"); + return; + }; let partial_backup_uploaded = match &self.partial_backup_uploaded { Some(p) => p.clone(), None => { @@ -121,7 +129,7 @@ impl Manager { .inc(); }); - if let Err(e) = do_uneviction(self, &partial_backup_uploaded).await { + if let Err(e) = do_uneviction(self, &partial_backup_uploaded, &storage).await { warn!("failed to unevict timeline: {:?}", e); return; } @@ -137,8 +145,12 @@ impl Manager { /// Ensure that content matches the remote partial backup, if local segment exists. /// Then change state in control file and in-memory. If `delete_offloaded_wal` is set, /// delete the local segment. 
-async fn do_eviction(mgr: &mut Manager, partial: &PartialRemoteSegment) -> anyhow::Result<()> { - compare_local_segment_with_remote(mgr, partial).await?; +async fn do_eviction( + mgr: &mut Manager, + partial: &PartialRemoteSegment, + storage: &GenericRemoteStorage, +) -> anyhow::Result<()> { + compare_local_segment_with_remote(mgr, partial, storage).await?; mgr.tli.switch_to_offloaded(partial).await?; // switch manager state as soon as possible @@ -153,12 +165,16 @@ async fn do_eviction(mgr: &mut Manager, partial: &PartialRemoteSegment) -> anyho /// Ensure that content matches the remote partial backup, if local segment exists. /// Then download segment to local disk and change state in control file and in-memory. -async fn do_uneviction(mgr: &mut Manager, partial: &PartialRemoteSegment) -> anyhow::Result<()> { +async fn do_uneviction( + mgr: &mut Manager, + partial: &PartialRemoteSegment, + storage: &GenericRemoteStorage, +) -> anyhow::Result<()> { // if the local segment is present, validate it - compare_local_segment_with_remote(mgr, partial).await?; + compare_local_segment_with_remote(mgr, partial, storage).await?; // atomically download the partial segment - redownload_partial_segment(mgr, partial).await?; + redownload_partial_segment(mgr, partial, storage).await?; mgr.tli.switch_to_present().await?; // switch manager state as soon as possible @@ -181,6 +197,7 @@ async fn delete_local_segment(mgr: &Manager, partial: &PartialRemoteSegment) -> async fn redownload_partial_segment( mgr: &Manager, partial: &PartialRemoteSegment, + storage: &GenericRemoteStorage, ) -> anyhow::Result<()> { let tmp_file = mgr.tli.timeline_dir().join("remote_partial.tmp"); let remote_segfile = remote_segment_path(mgr, partial); @@ -190,7 +207,7 @@ async fn redownload_partial_segment( remote_segfile, tmp_file ); - let mut reader = wal_backup::read_object(&remote_segfile, 0).await?; + let mut reader = wal_backup::read_object(storage, &remote_segfile, 0).await?; let mut file = File::create(&tmp_file).await?; let actual_len = tokio::io::copy(&mut reader, &mut file).await?; @@ -234,13 +251,16 @@ async fn redownload_partial_segment( async fn compare_local_segment_with_remote( mgr: &Manager, partial: &PartialRemoteSegment, + storage: &GenericRemoteStorage, ) -> anyhow::Result<()> { let local_path = local_segment_path(mgr, partial); match File::open(&local_path).await { - Ok(mut local_file) => do_validation(mgr, &mut local_file, mgr.wal_seg_size, partial) - .await - .context("validation failed"), + Ok(mut local_file) => { + do_validation(mgr, &mut local_file, mgr.wal_seg_size, partial, storage) + .await + .context("validation failed") + } Err(_) => { info!( "local WAL file {} is not present, skipping validation", @@ -258,6 +278,7 @@ async fn do_validation( file: &mut File, wal_seg_size: usize, partial: &PartialRemoteSegment, + storage: &GenericRemoteStorage, ) -> anyhow::Result<()> { let local_size = file.metadata().await?.len() as usize; if local_size != wal_seg_size { @@ -270,7 +291,7 @@ async fn do_validation( let remote_segfile = remote_segment_path(mgr, partial); let mut remote_reader: std::pin::Pin> = - wal_backup::read_object(&remote_segfile, 0).await?; + wal_backup::read_object(storage, &remote_segfile, 0).await?; // remote segment should have bytes excatly up to `flush_lsn` let expected_remote_size = partial.flush_lsn.segment_offset(mgr.wal_seg_size); diff --git a/safekeeper/src/timeline_manager.rs b/safekeeper/src/timeline_manager.rs index 71e99a4de7..48eda92fed 100644 --- 
a/safekeeper/src/timeline_manager.rs +++ b/safekeeper/src/timeline_manager.rs @@ -35,7 +35,7 @@ use crate::state::TimelineState; use crate::timeline::{ManagerTimeline, ReadGuardSharedState, StateSK, WalResidentTimeline}; use crate::timeline_guard::{AccessService, GuardId, ResidenceGuard}; use crate::timelines_set::{TimelineSetGuard, TimelinesSet}; -use crate::wal_backup::{self, WalBackupTaskHandle}; +use crate::wal_backup::{self, WalBackup, WalBackupTaskHandle}; use crate::wal_backup_partial::{self, PartialBackup, PartialRemoteSegment}; pub(crate) struct StateSnapshot { @@ -200,6 +200,7 @@ pub(crate) struct Manager { pub(crate) conf: SafeKeeperConf, pub(crate) wal_seg_size: usize, pub(crate) walsenders: Arc, + pub(crate) wal_backup: Arc, // current state pub(crate) state_version_rx: tokio::sync::watch::Receiver, @@ -238,6 +239,7 @@ pub async fn main_task( manager_tx: tokio::sync::mpsc::UnboundedSender, mut manager_rx: tokio::sync::mpsc::UnboundedReceiver, global_rate_limiter: RateLimiter, + wal_backup: Arc, ) { tli.set_status(Status::Started); @@ -256,6 +258,7 @@ pub async fn main_task( broker_active_set, manager_tx, global_rate_limiter, + wal_backup, ) .await; @@ -371,7 +374,7 @@ pub async fn main_task( mgr.tli_broker_active.set(false); // shutdown background tasks - if mgr.conf.is_wal_backup_enabled() { + if let Some(storage) = mgr.wal_backup.get_storage() { if let Some(backup_task) = mgr.backup_task.take() { // If we fell through here, then the timeline is shutting down. This is important // because otherwise joining on the wal_backup handle might hang. @@ -379,7 +382,7 @@ pub async fn main_task( backup_task.join().await; } - wal_backup::update_task(&mut mgr, false, &last_state).await; + wal_backup::update_task(&mut mgr, storage, false, &last_state).await; } if let Some(recovery_task) = &mut mgr.recovery_task { @@ -415,11 +418,13 @@ impl Manager { broker_active_set: Arc, manager_tx: tokio::sync::mpsc::UnboundedSender, global_rate_limiter: RateLimiter, + wal_backup: Arc, ) -> Manager { let (is_offloaded, partial_backup_uploaded) = tli.bootstrap_mgr().await; Manager { wal_seg_size: tli.get_wal_seg_size().await, walsenders: tli.get_walsenders().clone(), + wal_backup, state_version_rx: tli.get_state_version_rx(), num_computes_rx: tli.get_walreceivers().get_num_rx(), tli_broker_active: broker_active_set.guard(tli.clone()), @@ -477,8 +482,8 @@ impl Manager { let is_wal_backup_required = wal_backup::is_wal_backup_required(self.wal_seg_size, num_computes, state); - if self.conf.is_wal_backup_enabled() { - wal_backup::update_task(self, is_wal_backup_required, state).await; + if let Some(storage) = self.wal_backup.get_storage() { + wal_backup::update_task(self, storage, is_wal_backup_required, state).await; } // update the state in Arc @@ -624,9 +629,9 @@ impl Manager { /// Spawns partial WAL backup task if needed. async fn update_partial_backup(&mut self, state: &StateSnapshot) { // check if WAL backup is enabled and should be started - if !self.conf.is_wal_backup_enabled() { + let Some(storage) = self.wal_backup.get_storage() else { return; - } + }; if self.partial_backup_task.is_some() { // partial backup is already running @@ -650,6 +655,7 @@ impl Manager { self.conf.clone(), self.global_rate_limiter.clone(), cancel.clone(), + storage, )); self.partial_backup_task = Some((handle, cancel)); } @@ -669,6 +675,10 @@ impl Manager { /// Reset partial backup state and remove its remote storage data. Since it /// might concurrently uploading something, cancel the task first. 
async fn backup_partial_reset(&mut self) -> anyhow::Result> { + let Some(storage) = self.wal_backup.get_storage() else { + anyhow::bail!("remote storage is not enabled"); + }; + info!("resetting partial backup state"); // Force unevict timeline if it is evicted before erasing partial backup // state. The intended use of this function is to drop corrupted remote @@ -689,7 +699,7 @@ impl Manager { } let tli = self.wal_resident_timeline()?; - let mut partial_backup = PartialBackup::new(tli, self.conf.clone()).await; + let mut partial_backup = PartialBackup::new(tli, self.conf.clone(), storage).await; // Reset might fail e.g. when cfile is already reset but s3 removal // failed, so set manager state to None beforehand. In any case caller // is expected to retry until success. diff --git a/safekeeper/src/timelines_global_map.rs b/safekeeper/src/timelines_global_map.rs index 41abee369e..af33bcbd20 100644 --- a/safekeeper/src/timelines_global_map.rs +++ b/safekeeper/src/timelines_global_map.rs @@ -25,6 +25,7 @@ use crate::rate_limit::RateLimiter; use crate::state::TimelinePersistentState; use crate::timeline::{Timeline, TimelineError, delete_dir, get_tenant_dir, get_timeline_dir}; use crate::timelines_set::TimelinesSet; +use crate::wal_backup::WalBackup; use crate::wal_storage::Storage; use crate::{SafeKeeperConf, control_file, wal_storage}; @@ -47,15 +48,24 @@ struct GlobalTimelinesState { conf: Arc, broker_active_set: Arc, global_rate_limiter: RateLimiter, + wal_backup: Arc, } impl GlobalTimelinesState { /// Get dependencies for a timeline constructor. - fn get_dependencies(&self) -> (Arc, Arc, RateLimiter) { + fn get_dependencies( + &self, + ) -> ( + Arc, + Arc, + RateLimiter, + Arc, + ) { ( self.conf.clone(), self.broker_active_set.clone(), self.global_rate_limiter.clone(), + self.wal_backup.clone(), ) } @@ -84,7 +94,7 @@ pub struct GlobalTimelines { impl GlobalTimelines { /// Create a new instance of the global timelines map. - pub fn new(conf: Arc) -> Self { + pub fn new(conf: Arc, wal_backup: Arc) -> Self { Self { state: Mutex::new(GlobalTimelinesState { timelines: HashMap::new(), @@ -92,6 +102,7 @@ impl GlobalTimelines { conf, broker_active_set: Arc::new(TimelinesSet::default()), global_rate_limiter: RateLimiter::new(1, 1), + wal_backup, }), } } @@ -147,7 +158,7 @@ impl GlobalTimelines { /// just lock and unlock it for each timeline -- this function is called /// during init when nothing else is running, so this is fine. 
async fn load_tenant_timelines(&self, tenant_id: TenantId) -> Result<()> { - let (conf, broker_active_set, partial_backup_rate_limiter) = { + let (conf, broker_active_set, partial_backup_rate_limiter, wal_backup) = { let state = self.state.lock().unwrap(); state.get_dependencies() }; @@ -162,7 +173,7 @@ impl GlobalTimelines { TimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or("")) { let ttid = TenantTimelineId::new(tenant_id, timeline_id); - match Timeline::load_timeline(conf.clone(), ttid) { + match Timeline::load_timeline(conf.clone(), ttid, wal_backup.clone()) { Ok(tli) => { let mut shared_state = tli.write_shared_state().await; self.state @@ -175,6 +186,7 @@ impl GlobalTimelines { &conf, broker_active_set.clone(), partial_backup_rate_limiter.clone(), + wal_backup.clone(), ); } // If we can't load a timeline, it's most likely because of a corrupted @@ -212,6 +224,10 @@ impl GlobalTimelines { self.state.lock().unwrap().broker_active_set.clone() } + pub fn get_wal_backup(&self) -> Arc { + self.state.lock().unwrap().wal_backup.clone() + } + /// Create a new timeline with the given id. If the timeline already exists, returns /// an existing timeline. pub(crate) async fn create( @@ -222,7 +238,7 @@ impl GlobalTimelines { start_lsn: Lsn, commit_lsn: Lsn, ) -> Result> { - let (conf, _, _) = { + let (conf, _, _, _) = { let state = self.state.lock().unwrap(); if let Ok(timeline) = state.get(&ttid) { // Timeline already exists, return it. @@ -267,7 +283,7 @@ impl GlobalTimelines { check_tombstone: bool, ) -> Result> { // Check for existence and mark that we're creating it. - let (conf, broker_active_set, partial_backup_rate_limiter) = { + let (conf, broker_active_set, partial_backup_rate_limiter, wal_backup) = { let mut state = self.state.lock().unwrap(); match state.timelines.get(&ttid) { Some(GlobalMapTimeline::CreationInProgress) => { @@ -296,7 +312,14 @@ impl GlobalTimelines { }; // Do the actual move and reflect the result in the map. - match GlobalTimelines::install_temp_timeline(ttid, tmp_path, conf.clone()).await { + match GlobalTimelines::install_temp_timeline( + ttid, + tmp_path, + conf.clone(), + wal_backup.clone(), + ) + .await + { Ok(timeline) => { let mut timeline_shared_state = timeline.write_shared_state().await; let mut state = self.state.lock().unwrap(); @@ -314,6 +337,7 @@ impl GlobalTimelines { &conf, broker_active_set, partial_backup_rate_limiter, + wal_backup, ); drop(timeline_shared_state); Ok(timeline) @@ -336,6 +360,7 @@ impl GlobalTimelines { ttid: TenantTimelineId, tmp_path: &Utf8PathBuf, conf: Arc, + wal_backup: Arc, ) -> Result> { let tenant_path = get_tenant_dir(conf.as_ref(), &ttid.tenant_id); let timeline_path = get_timeline_dir(conf.as_ref(), &ttid); @@ -377,7 +402,7 @@ impl GlobalTimelines { // Do the move. durable_rename(tmp_path, &timeline_path, !conf.no_sync).await?; - Timeline::load_timeline(conf, ttid) + Timeline::load_timeline(conf, ttid, wal_backup) } /// Get a timeline from the global map. 
If it's not present, it doesn't exist on disk, diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs index 56f4a2faf9..0beb272a60 100644 --- a/safekeeper/src/wal_backup.rs +++ b/safekeeper/src/wal_backup.rs @@ -2,6 +2,7 @@ use std::cmp::min; use std::collections::HashSet; use std::num::NonZeroU32; use std::pin::Pin; +use std::sync::Arc; use std::time::Duration; use anyhow::{Context, Result}; @@ -17,7 +18,7 @@ use safekeeper_api::models::PeerInfo; use tokio::fs::File; use tokio::select; use tokio::sync::mpsc::{self, Receiver, Sender}; -use tokio::sync::{OnceCell, watch}; +use tokio::sync::watch; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use tracing::*; @@ -63,7 +64,12 @@ pub(crate) fn is_wal_backup_required( /// Based on peer information determine which safekeeper should offload; if it /// is me, run (per timeline) task, if not yet. OTOH, if it is not me and task /// is running, kill it. -pub(crate) async fn update_task(mgr: &mut Manager, need_backup: bool, state: &StateSnapshot) { +pub(crate) async fn update_task( + mgr: &mut Manager, + storage: Arc, + need_backup: bool, + state: &StateSnapshot, +) { let (offloader, election_dbg_str) = determine_offloader(&state.peers, state.backup_lsn, mgr.tli.ttid, &mgr.conf); let elected_me = Some(mgr.conf.my_id) == offloader; @@ -82,7 +88,12 @@ pub(crate) async fn update_task(mgr: &mut Manager, need_backup: bool, state: &St return; }; - let async_task = backup_task_main(resident, mgr.conf.backup_parallel_jobs, shutdown_rx); + let async_task = backup_task_main( + resident, + storage, + mgr.conf.backup_parallel_jobs, + shutdown_rx, + ); let handle = if mgr.conf.current_thread_runtime { tokio::spawn(async_task) @@ -169,33 +180,31 @@ fn determine_offloader( } } -static REMOTE_STORAGE: OnceCell> = OnceCell::const_new(); - -// Storage must be configured and initialized when this is called. -fn get_configured_remote_storage() -> &'static GenericRemoteStorage { - REMOTE_STORAGE - .get() - .expect("failed to get remote storage") - .as_ref() - .unwrap() +pub struct WalBackup { + storage: Option>, } -pub async fn init_remote_storage(conf: &SafeKeeperConf) { - // TODO: refactor REMOTE_STORAGE to avoid using global variables, and provide - // dependencies to all tasks instead. - REMOTE_STORAGE - .get_or_init(|| async { - if let Some(conf) = conf.remote_storage.as_ref() { - Some( - GenericRemoteStorage::from_config(conf) - .await - .expect("failed to create remote storage"), - ) - } else { - None +impl WalBackup { + /// Create a new WalBackup instance. + pub async fn new(conf: &SafeKeeperConf) -> Result { + if !conf.wal_backup_enabled { + return Ok(Self { storage: None }); + } + + match conf.remote_storage.as_ref() { + Some(config) => { + let storage = GenericRemoteStorage::from_config(config).await?; + Ok(Self { + storage: Some(Arc::new(storage)), + }) } - }) - .await; + None => Ok(Self { storage: None }), + } + } + + pub fn get_storage(&self) -> Option> { + self.storage.clone() + } } struct WalBackupTask { @@ -204,12 +213,14 @@ struct WalBackupTask { wal_seg_size: usize, parallel_jobs: usize, commit_lsn_watch_rx: watch::Receiver, + storage: Arc, } /// Offload single timeline. 
#[instrument(name = "wal_backup", skip_all, fields(ttid = %tli.ttid))] async fn backup_task_main( tli: WalResidentTimeline, + storage: Arc, parallel_jobs: usize, mut shutdown_rx: Receiver<()>, ) { @@ -223,6 +234,7 @@ async fn backup_task_main( timeline_dir: tli.get_timeline_dir(), timeline: tli, parallel_jobs, + storage, }; // task is spinned up only when wal_seg_size already initialized @@ -293,6 +305,7 @@ impl WalBackupTask { match backup_lsn_range( &self.timeline, + self.storage.clone(), &mut backup_lsn, commit_lsn, self.wal_seg_size, @@ -322,6 +335,7 @@ impl WalBackupTask { async fn backup_lsn_range( timeline: &WalResidentTimeline, + storage: Arc, backup_lsn: &mut Lsn, end_lsn: Lsn, wal_seg_size: usize, @@ -352,7 +366,12 @@ async fn backup_lsn_range( loop { let added_task = match iter.next() { Some(s) => { - uploads.push_back(backup_single_segment(s, timeline_dir, remote_timeline_path)); + uploads.push_back(backup_single_segment( + &storage, + s, + timeline_dir, + remote_timeline_path, + )); true } None => false, @@ -388,6 +407,7 @@ async fn backup_lsn_range( } async fn backup_single_segment( + storage: &GenericRemoteStorage, seg: &Segment, timeline_dir: &Utf8Path, remote_timeline_path: &RemotePath, @@ -395,7 +415,13 @@ async fn backup_single_segment( let segment_file_path = seg.file_path(timeline_dir)?; let remote_segment_path = seg.remote_path(remote_timeline_path); - let res = backup_object(&segment_file_path, &remote_segment_path, seg.size()).await; + let res = backup_object( + storage, + &segment_file_path, + &remote_segment_path, + seg.size(), + ) + .await; if res.is_ok() { BACKED_UP_SEGMENTS.inc(); } else { @@ -455,12 +481,11 @@ fn get_segments(start: Lsn, end: Lsn, seg_size: usize) -> Vec { } async fn backup_object( + storage: &GenericRemoteStorage, source_file: &Utf8Path, target_file: &RemotePath, size: usize, ) -> Result<()> { - let storage = get_configured_remote_storage(); - let file = File::open(&source_file) .await .with_context(|| format!("Failed to open file {source_file:?} for wal backup"))?; @@ -475,12 +500,11 @@ async fn backup_object( } pub(crate) async fn backup_partial_segment( + storage: &GenericRemoteStorage, source_file: &Utf8Path, target_file: &RemotePath, size: usize, ) -> Result<()> { - let storage = get_configured_remote_storage(); - let file = File::open(&source_file) .await .with_context(|| format!("Failed to open file {source_file:?} for wal backup"))?; @@ -504,25 +528,20 @@ pub(crate) async fn backup_partial_segment( } pub(crate) async fn copy_partial_segment( + storage: &GenericRemoteStorage, source: &RemotePath, destination: &RemotePath, ) -> Result<()> { - let storage = get_configured_remote_storage(); let cancel = CancellationToken::new(); storage.copy_object(source, destination, &cancel).await } pub async fn read_object( + storage: &GenericRemoteStorage, file_path: &RemotePath, offset: u64, ) -> anyhow::Result>> { - let storage = REMOTE_STORAGE - .get() - .context("Failed to get remote storage")? - .as_ref() - .context("No remote storage configured")?; - info!("segment download about to start from remote path {file_path:?} at offset {offset}"); let cancel = CancellationToken::new(); @@ -547,8 +566,10 @@ pub async fn read_object( /// Delete WAL files for the given timeline. Remote storage must be configured /// when called. 
-pub async fn delete_timeline(ttid: &TenantTimelineId) -> Result<()> { - let storage = get_configured_remote_storage(); +pub async fn delete_timeline( + storage: &GenericRemoteStorage, + ttid: &TenantTimelineId, +) -> Result<()> { let remote_path = remote_timeline_path(ttid)?; // see DEFAULT_MAX_KEYS_PER_LIST_RESPONSE @@ -618,14 +639,14 @@ pub async fn delete_timeline(ttid: &TenantTimelineId) -> Result<()> { } /// Used by wal_backup_partial. -pub async fn delete_objects(paths: &[RemotePath]) -> Result<()> { +pub async fn delete_objects(storage: &GenericRemoteStorage, paths: &[RemotePath]) -> Result<()> { let cancel = CancellationToken::new(); // not really used - let storage = get_configured_remote_storage(); storage.delete_objects(paths, &cancel).await } /// Copy segments from one timeline to another. Used in copy_timeline. pub async fn copy_s3_segments( + storage: &GenericRemoteStorage, wal_seg_size: usize, src_ttid: &TenantTimelineId, dst_ttid: &TenantTimelineId, @@ -634,12 +655,6 @@ pub async fn copy_s3_segments( ) -> Result<()> { const SEGMENTS_PROGRESS_REPORT_INTERVAL: u64 = 1024; - let storage = REMOTE_STORAGE - .get() - .expect("failed to get remote storage") - .as_ref() - .unwrap(); - let remote_dst_path = remote_timeline_path(dst_ttid)?; let cancel = CancellationToken::new(); diff --git a/safekeeper/src/wal_backup_partial.rs b/safekeeper/src/wal_backup_partial.rs index 049852a048..fe0f1b3607 100644 --- a/safekeeper/src/wal_backup_partial.rs +++ b/safekeeper/src/wal_backup_partial.rs @@ -19,9 +19,11 @@ //! file. Code updates state in the control file before doing any S3 operations. //! This way control file stores information about all potentially existing //! remote partial segments and can clean them up after uploading a newer version. +use std::sync::Arc; + use camino::Utf8PathBuf; use postgres_ffi::{PG_TLI, XLogFileName, XLogSegNo}; -use remote_storage::RemotePath; +use remote_storage::{GenericRemoteStorage, RemotePath}; use safekeeper_api::Term; use serde::{Deserialize, Serialize}; use tokio_util::sync::CancellationToken; @@ -154,12 +156,16 @@ pub struct PartialBackup { conf: SafeKeeperConf, local_prefix: Utf8PathBuf, remote_timeline_path: RemotePath, - + storage: Arc, state: State, } impl PartialBackup { - pub async fn new(tli: WalResidentTimeline, conf: SafeKeeperConf) -> PartialBackup { + pub async fn new( + tli: WalResidentTimeline, + conf: SafeKeeperConf, + storage: Arc, + ) -> PartialBackup { let (_, persistent_state) = tli.get_state().await; let wal_seg_size = tli.get_wal_seg_size().await; @@ -173,6 +179,7 @@ impl PartialBackup { conf, local_prefix, remote_timeline_path, + storage, } } @@ -240,7 +247,8 @@ impl PartialBackup { let remote_path = prepared.remote_path(&self.remote_timeline_path); // Upload first `backup_bytes` bytes of the segment to the remote storage. - wal_backup::backup_partial_segment(&local_path, &remote_path, backup_bytes).await?; + wal_backup::backup_partial_segment(&self.storage, &local_path, &remote_path, backup_bytes) + .await?; PARTIAL_BACKUP_UPLOADED_BYTES.inc_by(backup_bytes as u64); // We uploaded the segment, now let's verify that the data is still actual. @@ -326,7 +334,7 @@ impl PartialBackup { let remote_path = self.remote_timeline_path.join(seg); objects_to_delete.push(remote_path); } - wal_backup::delete_objects(&objects_to_delete).await + wal_backup::delete_objects(&self.storage, &objects_to_delete).await } /// Delete all non-Uploaded segments from the remote storage. 
There should be only one @@ -424,6 +432,7 @@ pub async fn main_task( conf: SafeKeeperConf, limiter: RateLimiter, cancel: CancellationToken, + storage: Arc, ) -> Option { debug!("started"); let await_duration = conf.partial_backup_timeout; @@ -432,7 +441,7 @@ pub async fn main_task( let mut commit_lsn_rx = tli.get_commit_lsn_watch_rx(); let mut flush_lsn_rx = tli.get_term_flush_lsn_watch_rx(); - let mut backup = PartialBackup::new(tli, conf).await; + let mut backup = PartialBackup::new(tli, conf, storage).await; debug!("state: {:?}", backup.state); diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs index f0bac4b40a..8ba3e7cc47 100644 --- a/safekeeper/src/wal_storage.rs +++ b/safekeeper/src/wal_storage.rs @@ -21,6 +21,7 @@ use postgres_ffi::waldecoder::WalStreamDecoder; use postgres_ffi::{PG_TLI, XLogFileName, XLogSegNo, dispatch_pgversion}; use pq_proto::SystemId; use remote_storage::RemotePath; +use std::sync::Arc; use tokio::fs::{self, File, OpenOptions, remove_file}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; use tracing::*; @@ -32,7 +33,7 @@ use crate::metrics::{ REMOVED_WAL_SEGMENTS, WAL_STORAGE_OPERATION_SECONDS, WalStorageMetrics, time_io_closure, }; use crate::state::TimelinePersistentState; -use crate::wal_backup::{read_object, remote_timeline_path}; +use crate::wal_backup::{WalBackup, read_object, remote_timeline_path}; pub trait Storage { // Last written LSN. @@ -645,7 +646,7 @@ pub struct WalReader { wal_segment: Option>>, // S3 will be used to read WAL if LSN is not available locally - enable_remote_read: bool, + wal_backup: Arc, // We don't have WAL locally if LSN is less than local_start_lsn local_start_lsn: Lsn, @@ -664,7 +665,7 @@ impl WalReader { timeline_dir: Utf8PathBuf, state: &TimelinePersistentState, start_pos: Lsn, - enable_remote_read: bool, + wal_backup: Arc, ) -> Result { if state.server.wal_seg_size == 0 || state.local_start_lsn == Lsn(0) { bail!("state uninitialized, no data to read"); @@ -693,7 +694,7 @@ impl WalReader { wal_seg_size: state.server.wal_seg_size as usize, pos: start_pos, wal_segment: None, - enable_remote_read, + wal_backup, local_start_lsn: state.local_start_lsn, timeline_start_lsn: state.timeline_start_lsn, pg_version: state.server.pg_version / 10000, @@ -812,9 +813,9 @@ impl WalReader { } // Try to open remote file, if remote reads are enabled - if self.enable_remote_read { + if let Some(storage) = self.wal_backup.get_storage() { let remote_wal_file_path = self.remote_path.join(&wal_file_name); - return read_object(&remote_wal_file_path, xlogoff as u64).await; + return read_object(&storage, &remote_wal_file_path, xlogoff as u64).await; } bail!("WAL segment is not found")
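
Below is a minimal usage sketch of the new `WalBackup` API described above. It is illustrative only and not part of the patch; it assumes the public paths shown in the diff, and the helper function name and the `utils::id::TenantTimelineId` import are assumptions made for the example.

```rust
// Sketch: how a call site now obtains remote storage through `WalBackup`
// instead of the old static REMOTE_STORAGE OnceCell.
use std::sync::Arc;

use safekeeper::SafeKeeperConf;
use safekeeper::wal_backup::{self, WalBackup};
use utils::id::TenantTimelineId;

async fn delete_timeline_remote_data(
    conf: &SafeKeeperConf,
    ttid: &TenantTimelineId,
) -> anyhow::Result<()> {
    // Built once at startup (the safekeeper root in the diff) and passed
    // down as a dependency to GlobalTimelines, each Timeline, etc.
    let backup = Arc::new(WalBackup::new(conf).await?);

    // Consumers unwrap the optional storage explicitly instead of relying on
    // the old global's `expect("failed to get remote storage")`.
    match backup.get_storage() {
        Some(storage) => wal_backup::delete_timeline(&storage, ttid).await?,
        None => tracing::info!("remote storage not configured, skipping remote deletion"),
    }
    Ok(())
}
```

Keeping the `Option` inside `WalBackup` lets each call site decide what "not configured" means for it (skip, log, or return a `remote storage not configured` error) rather than panicking inside a global accessor.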