From f3acfb2d80729ca7a2cfffdd4d924bd934101b06 Mon Sep 17 00:00:00 2001
From: Arthur Petukhovsky
Date: Fri, 2 Aug 2024 15:26:46 +0100
Subject: [PATCH] Improve safekeepers eviction rate limiting (#8456)

This commit tries to fix regular load spikes on staging caused by too many
eviction and partial upload operations running at the same time. The spikes
usually happened after a restart; for partial backups the load arrived with
a delay.

- Add a semaphore for evictions (2 permits by default)
- Rename `resident_since` to `evict_not_before` and smooth out the eviction
  curve by using a random duration
- Use a random duration in partial uploads as well

Related to https://github.com/neondatabase/neon/issues/6338.
Some discussion in
https://neondb.slack.com/archives/C033RQ5SPDH/p1720601531744029.
---
 safekeeper/src/lib.rs                  |  2 ++
 safekeeper/src/rate_limit.rs           | 49 ++++++++++++++++++++++++++
 safekeeper/src/timeline.rs             |  3 +-
 safekeeper/src/timeline_eviction.rs    |  6 ++--
 safekeeper/src/timeline_manager.rs     | 48 ++++++++++++++++++-------
 safekeeper/src/timelines_global_map.rs | 14 +++++---
 safekeeper/src/wal_backup_partial.rs   | 39 +++++++-------------
 7 files changed, 112 insertions(+), 49 deletions(-)
 create mode 100644 safekeeper/src/rate_limit.rs

diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs
index 8f2920ada3..56d61e8287 100644
--- a/safekeeper/src/lib.rs
+++ b/safekeeper/src/lib.rs
@@ -21,6 +21,7 @@ pub mod json_ctrl;
 pub mod metrics;
 pub mod patch_control_file;
 pub mod pull_timeline;
+pub mod rate_limit;
 pub mod receive_wal;
 pub mod recovery;
 pub mod remove_wal;
@@ -53,6 +54,7 @@ pub mod defaults {
     pub const DEFAULT_PARTIAL_BACKUP_TIMEOUT: &str = "15m";
     pub const DEFAULT_CONTROL_FILE_SAVE_INTERVAL: &str = "300s";
     pub const DEFAULT_PARTIAL_BACKUP_CONCURRENCY: &str = "5";
+    pub const DEFAULT_EVICTION_CONCURRENCY: usize = 2;

     // By default, our required residency before eviction is the same as the period that passes
     // before uploading a partial segment, so that in normal operation the eviction can happen
diff --git a/safekeeper/src/rate_limit.rs b/safekeeper/src/rate_limit.rs
new file mode 100644
index 0000000000..72373b5786
--- /dev/null
+++ b/safekeeper/src/rate_limit.rs
@@ -0,0 +1,49 @@
+use std::sync::Arc;
+
+use rand::Rng;
+
+use crate::metrics::MISC_OPERATION_SECONDS;
+
+/// Global rate limiter for background tasks.
+#[derive(Clone)]
+pub struct RateLimiter {
+    partial_backup: Arc<tokio::sync::Semaphore>,
+    eviction: Arc<tokio::sync::Semaphore>,
+}
+
+impl RateLimiter {
+    /// Create a new rate limiter.
+    /// - `partial_backup_max`: maximum number of concurrent partial backups.
+    /// - `eviction_max`: maximum number of concurrent timeline evictions.
+    pub fn new(partial_backup_max: usize, eviction_max: usize) -> Self {
+        Self {
+            partial_backup: Arc::new(tokio::sync::Semaphore::new(partial_backup_max)),
+            eviction: Arc::new(tokio::sync::Semaphore::new(eviction_max)),
+        }
+    }
+
+    /// Get a permit for partial backup. This will block if the maximum number of concurrent
+    /// partial backups is reached.
+    pub async fn acquire_partial_backup(&self) -> tokio::sync::OwnedSemaphorePermit {
+        let _timer = MISC_OPERATION_SECONDS
+            .with_label_values(&["partial_permit_acquire"])
+            .start_timer();
+        self.partial_backup
+            .clone()
+            .acquire_owned()
+            .await
+            .expect("semaphore is closed")
+    }
+
+    /// Try to get a permit for timeline eviction. This will return None if the maximum number of
+    /// concurrent timeline evictions is reached.
+    pub fn try_acquire_eviction(&self) -> Option<tokio::sync::OwnedSemaphorePermit> {
+        self.eviction.clone().try_acquire_owned().ok()
+    }
+}
+
+/// Generate a random duration that is a fraction of the given duration.
+pub fn rand_duration(duration: &std::time::Duration) -> std::time::Duration {
+    let randf64 = rand::thread_rng().gen_range(0.0..1.0);
+    duration.mul_f64(randf64)
+}
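A minimal usage sketch (not part of the patch itself) of how the two
semaphores are meant to be used, assuming the `safekeeper` crate with the
`rate_limit` module above is available as a dependency:

    use std::time::Duration;

    use safekeeper::rate_limit::{rand_duration, RateLimiter};

    #[tokio::main]
    async fn main() {
        // 5 concurrent partial backups, 2 concurrent evictions (the defaults above)
        let limiter = RateLimiter::new(5, 2);

        // partial backup: waits until a permit is free
        let _upload_permit = limiter.acquire_partial_backup().await;

        // eviction: never waits; the caller is expected to retry later
        if limiter.try_acquire_eviction().is_none() {
            let retry_in = rand_duration(&Duration::from_secs(900));
            println!("no eviction permit, retry in {retry_in:?}");
        }
    }
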
diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs
index 132e5ec32f..57935d879f 100644
--- a/safekeeper/src/timeline.rs
+++ b/safekeeper/src/timeline.rs
@@ -25,6 +25,7 @@ use utils::{
 use storage_broker::proto::SafekeeperTimelineInfo;
 use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;

+use crate::rate_limit::RateLimiter;
 use crate::receive_wal::WalReceivers;
 use crate::safekeeper::{
     AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, ServerInfo, Term, TermLsn,
@@ -36,7 +37,7 @@ use crate::timeline_guard::ResidenceGuard;
 use crate::timeline_manager::{AtomicStatus, ManagerCtl};
 use crate::timelines_set::TimelinesSet;
 use crate::wal_backup::{self};
-use crate::wal_backup_partial::{PartialRemoteSegment, RateLimiter};
+use crate::wal_backup_partial::PartialRemoteSegment;
 use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};

 use crate::metrics::{FullTimelineInfo, WalStorageMetrics, MISC_OPERATION_SECONDS};
diff --git a/safekeeper/src/timeline_eviction.rs b/safekeeper/src/timeline_eviction.rs
index 7947d83eb4..ae6f3f4b7e 100644
--- a/safekeeper/src/timeline_eviction.rs
+++ b/safekeeper/src/timeline_eviction.rs
@@ -5,7 +5,6 @@
 use anyhow::Context;
 use camino::Utf8PathBuf;
 use remote_storage::RemotePath;
-use std::time::Instant;
 use tokio::{
     fs::File,
     io::{AsyncRead, AsyncWriteExt},
@@ -15,6 +14,7 @@ use utils::crashsafe::durable_rename;

 use crate::{
     metrics::{EvictionEvent, EVICTION_EVENTS_COMPLETED, EVICTION_EVENTS_STARTED},
+    rate_limit::rand_duration,
     timeline_manager::{Manager, StateSnapshot},
     wal_backup,
     wal_backup_partial::{self, PartialRemoteSegment},
@@ -50,7 +50,6 @@ impl Manager {
             .flush_lsn
             .segment_number(self.wal_seg_size)
             == self.last_removed_segno + 1
-            && self.resident_since.elapsed() >= self.conf.eviction_min_resident
     }

     /// Evict the timeline to remote storage.
@@ -112,7 +111,8 @@ impl Manager {
             return;
         }

-        self.resident_since = Instant::now();
+        self.evict_not_before =
+            tokio::time::Instant::now() + rand_duration(&self.conf.eviction_min_resident);

         info!("successfully restored evicted timeline");
     }
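The random residency offset above is what flattens the post-restart spike:
each restored timeline draws its own deadline, so evictions spread uniformly
over the window instead of all becoming eligible at once. A toy demonstration,
assuming the `safekeeper` crate from this patch (the 15m window is an
arbitrary example value):

    use std::time::Duration;

    use safekeeper::rate_limit::rand_duration;

    fn main() {
        let window = Duration::from_secs(15 * 60);
        // five timelines restored at the same instant get deadlines spread
        // uniformly over [0, window) instead of a synchronized spike
        let deadlines: Vec<Duration> = (0..5).map(|_| rand_duration(&window)).collect();
        println!("{deadlines:?}");
    }
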
diff --git a/safekeeper/src/timeline_manager.rs b/safekeeper/src/timeline_manager.rs
index debf8c824f..c224dcd398 100644
--- a/safekeeper/src/timeline_manager.rs
+++ b/safekeeper/src/timeline_manager.rs
@@ -23,6 +23,7 @@ use utils::lsn::Lsn;
 use crate::{
     control_file::{FileStorage, Storage},
     metrics::{MANAGER_ACTIVE_CHANGES, MANAGER_ITERATIONS_TOTAL, MISC_OPERATION_SECONDS},
+    rate_limit::{rand_duration, RateLimiter},
     recovery::recovery_main,
     remove_wal::calc_horizon_lsn,
     safekeeper::Term,
@@ -32,7 +33,7 @@ use crate::{
     timeline_guard::{AccessService, GuardId, ResidenceGuard},
     timelines_set::{TimelineSetGuard, TimelinesSet},
     wal_backup::{self, WalBackupTaskHandle},
-    wal_backup_partial::{self, PartialRemoteSegment, RateLimiter},
+    wal_backup_partial::{self, PartialRemoteSegment},
     SafeKeeperConf,
 };

@@ -185,11 +186,11 @@ pub(crate) struct Manager {

     // misc
     pub(crate) access_service: AccessService,
-    pub(crate) partial_backup_rate_limiter: RateLimiter,
+    pub(crate) global_rate_limiter: RateLimiter,

     // Anti-flapping state: we evict timelines eagerly if they are inactive, but should not
     // evict them if they go inactive very soon after being restored.
-    pub(crate) resident_since: std::time::Instant,
+    pub(crate) evict_not_before: Instant,
 }

 /// This task gets spawned alongside each timeline and is responsible for managing the timeline's
@@ -202,7 +203,7 @@ pub async fn main_task(
     broker_active_set: Arc<TimelinesSet>,
     manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
     mut manager_rx: tokio::sync::mpsc::UnboundedReceiver<ManagerCtlMessage>,
-    partial_backup_rate_limiter: RateLimiter,
+    global_rate_limiter: RateLimiter,
 ) {
     tli.set_status(Status::Started);
@@ -220,7 +221,7 @@ pub async fn main_task(
         conf,
         broker_active_set,
         manager_tx,
-        partial_backup_rate_limiter,
+        global_rate_limiter,
     )
     .await;

@@ -254,9 +255,29 @@ pub async fn main_task(
             mgr.set_status(Status::UpdatePartialBackup);
             mgr.update_partial_backup(&state_snapshot).await;

-            if mgr.conf.enable_offload && mgr.ready_for_eviction(&next_event, &state_snapshot) {
-                mgr.set_status(Status::EvictTimeline);
-                mgr.evict_timeline().await;
+            let now = Instant::now();
+            if mgr.evict_not_before > now {
+                // we should wait until evict_not_before
+                update_next_event(&mut next_event, mgr.evict_not_before);
+            }
+
+            if mgr.conf.enable_offload
+                && mgr.evict_not_before <= now
+                && mgr.ready_for_eviction(&next_event, &state_snapshot)
+            {
+                // check the rate limiter and evict the timeline if possible
+                match mgr.global_rate_limiter.try_acquire_eviction() {
+                    Some(_permit) => {
+                        mgr.set_status(Status::EvictTimeline);
+                        mgr.evict_timeline().await;
+                    }
+                    None => {
+                        // we can't evict the timeline now, so try again later
+                        mgr.evict_not_before =
+                            Instant::now() + rand_duration(&mgr.conf.eviction_min_resident);
+                        update_next_event(&mut next_event, mgr.evict_not_before);
+                    }
+                }
             }
         }

@@ -334,11 +355,10 @@ impl Manager {
         conf: SafeKeeperConf,
         broker_active_set: Arc<TimelinesSet>,
         manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
-        partial_backup_rate_limiter: RateLimiter,
+        global_rate_limiter: RateLimiter,
     ) -> Manager {
         let (is_offloaded, partial_backup_uploaded) = tli.bootstrap_mgr().await;
         Manager {
-            conf,
             wal_seg_size: tli.get_wal_seg_size().await,
             walsenders: tli.get_walsenders().clone(),
             state_version_rx: tli.get_state_version_rx(),
@@ -353,8 +373,10 @@ impl Manager {
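The pattern in the manager loop above, condensed into a self-contained sketch;
`EvictState` and `maybe_evict` are hypothetical stand-ins for the real
`Manager`, assuming the `safekeeper` crate from this patch:

    use std::time::Duration;

    use safekeeper::rate_limit::{rand_duration, RateLimiter};
    use tokio::time::Instant;

    struct EvictState {
        evict_not_before: Instant,
    }

    async fn maybe_evict(state: &mut EvictState, limiter: &RateLimiter, min_resident: Duration) {
        if state.evict_not_before > Instant::now() {
            return; // the residency window has not elapsed yet
        }
        match limiter.try_acquire_eviction() {
            // the permit is held across the eviction, bounding concurrency
            Some(_permit) => { /* evict the timeline here */ }
            None => {
                // all permits are busy: defer by a fresh random offset, don't spin
                state.evict_not_before = Instant::now() + rand_duration(&min_resident);
            }
        }
    }

    #[tokio::main]
    async fn main() {
        let limiter = RateLimiter::new(5, 2);
        let mut state = EvictState { evict_not_before: Instant::now() };
        maybe_evict(&mut state, &limiter, Duration::from_secs(900)).await;
    }
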
             partial_backup_uploaded,
             access_service: AccessService::new(manager_tx),
             tli,
-            partial_backup_rate_limiter,
-            resident_since: std::time::Instant::now(),
+            global_rate_limiter,
+            // to smooth out the spike of evictions after restart
+            evict_not_before: Instant::now() + rand_duration(&conf.eviction_min_resident),
+            conf,
         }
     }
@@ -541,7 +563,7 @@ impl Manager {
         self.partial_backup_task = Some(tokio::spawn(wal_backup_partial::main_task(
             self.wal_resident_timeline(),
             self.conf.clone(),
-            self.partial_backup_rate_limiter.clone(),
+            self.global_rate_limiter.clone(),
         )));
     }
diff --git a/safekeeper/src/timelines_global_map.rs b/safekeeper/src/timelines_global_map.rs
index f57da5c7cb..6662e18817 100644
--- a/safekeeper/src/timelines_global_map.rs
+++ b/safekeeper/src/timelines_global_map.rs
@@ -2,10 +2,11 @@
 //! All timelines should always be present in this map, this is done by loading them
 //! all from the disk on startup and keeping them in memory.

+use crate::defaults::DEFAULT_EVICTION_CONCURRENCY;
+use crate::rate_limit::RateLimiter;
 use crate::safekeeper::ServerInfo;
 use crate::timeline::{get_tenant_dir, get_timeline_dir, Timeline, TimelineError};
 use crate::timelines_set::TimelinesSet;
-use crate::wal_backup_partial::RateLimiter;
 use crate::SafeKeeperConf;
 use anyhow::{bail, Context, Result};
 use camino::Utf8PathBuf;
@@ -31,7 +32,7 @@ struct GlobalTimelinesState {
     conf: Option<SafeKeeperConf>,
     broker_active_set: Arc<TimelinesSet>,
     load_lock: Arc<tokio::sync::Mutex<TimelineLoadLock>>,
-    partial_backup_rate_limiter: RateLimiter,
+    global_rate_limiter: RateLimiter,
 }

 // Used to prevent concurrent timeline loading.
@@ -50,7 +51,7 @@ impl GlobalTimelinesState {
         (
             self.get_conf().clone(),
             self.broker_active_set.clone(),
-            self.partial_backup_rate_limiter.clone(),
+            self.global_rate_limiter.clone(),
         )
     }

@@ -85,7 +86,7 @@ static TIMELINES_STATE: Lazy<Mutex<GlobalTimelinesState>> = Lazy::new(|| {
         conf: None,
         broker_active_set: Arc::new(TimelinesSet::default()),
         load_lock: Arc::new(tokio::sync::Mutex::new(TimelineLoadLock)),
-        partial_backup_rate_limiter: RateLimiter::new(1),
+        global_rate_limiter: RateLimiter::new(1, 1),
     })
 });

@@ -99,7 +100,10 @@ impl GlobalTimelines {
         // lock, so use explicit block
         let tenants_dir = {
             let mut state = TIMELINES_STATE.lock().unwrap();
-            state.partial_backup_rate_limiter = RateLimiter::new(conf.partial_backup_concurrency);
+            state.global_rate_limiter = RateLimiter::new(
+                conf.partial_backup_concurrency,
+                DEFAULT_EVICTION_CONCURRENCY,
+            );
             state.conf = Some(conf);

             // Iterate through all directories and load tenants for all directories
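For orientation, a sketch of the startup wiring above, assuming the
`safekeeper` crate from this patch (the concurrency values mirror the
defaults): the limiter is built once from config and cloned into every
timeline manager, so all managers share the same two semaphores.

    use safekeeper::defaults::DEFAULT_EVICTION_CONCURRENCY; // 2
    use safekeeper::rate_limit::RateLimiter;

    fn main() {
        let partial_backup_concurrency = 5; // DEFAULT_PARTIAL_BACKUP_CONCURRENCY
        let limiter = RateLimiter::new(partial_backup_concurrency, DEFAULT_EVICTION_CONCURRENCY);
        // cheap Arc clone: every per-timeline task sees the same permit pools
        let _per_timeline = limiter.clone();
    }
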
diff --git a/safekeeper/src/wal_backup_partial.rs b/safekeeper/src/wal_backup_partial.rs
index b1efa9749f..52765b0e98 100644
--- a/safekeeper/src/wal_backup_partial.rs
+++ b/safekeeper/src/wal_backup_partial.rs
@@ -18,8 +18,6 @@
 //! This way control file stores information about all potentially existing
 //! remote partial segments and can clean them up after uploading a newer version.

-use std::sync::Arc;
-
 use camino::Utf8PathBuf;
 use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI};
 use remote_storage::RemotePath;
@@ -30,6 +28,7 @@ use utils::lsn::Lsn;

 use crate::{
     metrics::{MISC_OPERATION_SECONDS, PARTIAL_BACKUP_UPLOADED_BYTES, PARTIAL_BACKUP_UPLOADS},
+    rate_limit::{rand_duration, RateLimiter},
     safekeeper::Term,
     timeline::WalResidentTimeline,
     timeline_manager::StateSnapshot,
@@ -37,30 +36,6 @@ use crate::{
     SafeKeeperConf,
 };

-#[derive(Clone)]
-pub struct RateLimiter {
-    semaphore: Arc<tokio::sync::Semaphore>,
-}
-
-impl RateLimiter {
-    pub fn new(permits: usize) -> Self {
-        Self {
-            semaphore: Arc::new(tokio::sync::Semaphore::new(permits)),
-        }
-    }
-
-    async fn acquire_owned(&self) -> tokio::sync::OwnedSemaphorePermit {
-        let _timer = MISC_OPERATION_SECONDS
-            .with_label_values(&["partial_permit_acquire"])
-            .start_timer();
-        self.semaphore
-            .clone()
-            .acquire_owned()
-            .await
-            .expect("semaphore is closed")
-    }
-}
-
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
 pub enum UploadStatus {
     /// Upload is in progress. This status should be used only for garbage collection,
@@ -352,6 +327,7 @@ pub async fn main_task(
 ) -> Option<PartialRemoteSegment> {
     debug!("started");
     let await_duration = conf.partial_backup_timeout;
+    let mut first_iteration = true;

     let (_, persistent_state) = tli.get_state().await;
     let mut commit_lsn_rx = tli.get_commit_lsn_watch_rx();
@@ -419,6 +395,15 @@ pub async fn main_task(
             }
         }

+        // smooth out the load after restart by sleeping for a random time;
+        // on every iteration after the first, wait for the full await_duration
+        let await_duration = if first_iteration {
+            first_iteration = false;
+            rand_duration(&await_duration)
+        } else {
+            await_duration
+        };
+
         // fixing the segno and waiting some time to prevent reuploading the same segment too often
         let pending_segno = backup.segno(flush_lsn_rx.borrow().lsn);
         let timeout = tokio::time::sleep(await_duration);
@@ -454,7 +439,7 @@ pub async fn main_task(
         }

         // limit concurrent uploads
-        let _upload_permit = limiter.acquire_owned().await;
+        let _upload_permit = limiter.acquire_partial_backup().await;

         let prepared = backup.prepare_upload().await;
         if let Some(seg) = &uploaded_segment {
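A self-contained sketch of the first-iteration jitter used above (assuming the
`safekeeper` crate from this patch; the period matches the default 15m
partial-backup timeout): the first wait is a random fraction of the period, so
workers restarted together de-synchronize, then settle into the full cadence.

    use std::time::Duration;

    use safekeeper::rate_limit::rand_duration;

    #[tokio::main]
    async fn main() {
        let await_duration = Duration::from_secs(900); // partial_backup_timeout = 15m
        let mut first_iteration = true;
        for _ in 0..3 {
            let wait = if first_iteration {
                first_iteration = false;
                rand_duration(&await_duration) // jittered first wait after (re)start
            } else {
                await_duration // steady-state cadence afterwards
            };
            println!("sleeping {wait:?}");
            tokio::time::sleep(wait).await;
            // ... check for a new partial segment and upload it here ...
        }
    }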