From 605fb04f8912d5939d72c9c7b17b8c543f8fc078 Mon Sep 17 00:00:00 2001 From: Dmitrii Kovalkov <34828390+DimasKovas@users.noreply.github.com> Date: Thu, 26 Jun 2025 17:26:24 +0400 Subject: [PATCH] pageserver: use bounded sender for basebackup cache (#12342) ## Problem Basebackup cache now uses unbounded channel for prepare requests. In theory it can grow large if the cache is hung and does not process the requests. - Part of https://github.com/neondatabase/cloud/issues/29353 ## Summary of changes - Replace an unbounded channel with a bounded one, the size is configurable. - Add `pageserver_basebackup_cache_prepare_queue_size` to observe the size of the queue. - Refactor a bit to move all metrics logic to `basebackup_cache.rs` --- libs/pageserver_api/src/config.rs | 4 ++ pageserver/src/basebackup_cache.rs | 109 ++++++++++++++++++++++++----- pageserver/src/bin/pageserver.rs | 13 ++-- pageserver/src/metrics.rs | 8 +++ pageserver/src/page_service.rs | 15 +--- pageserver/src/tenant.rs | 20 +++--- pageserver/src/tenant/mgr.rs | 16 +++-- pageserver/src/tenant/timeline.rs | 30 ++++---- 8 files changed, 142 insertions(+), 73 deletions(-) diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs index 7926e839cf..0cfa1c8485 100644 --- a/libs/pageserver_api/src/config.rs +++ b/libs/pageserver_api/src/config.rs @@ -371,6 +371,9 @@ pub struct BasebackupCacheConfig { // TODO(diko): support max_entry_size_bytes. // pub max_entry_size_bytes: u64, pub max_size_entries: usize, + /// Size of the channel used to send prepare requests to the basebackup cache worker. + /// If exceeded, new prepare requests will be dropped. + pub prepare_channel_size: usize, } impl Default for BasebackupCacheConfig { @@ -380,6 +383,7 @@ impl Default for BasebackupCacheConfig { max_total_size_bytes: 1024 * 1024 * 1024, // 1 GiB // max_entry_size_bytes: 16 * 1024 * 1024, // 16 MiB max_size_entries: 1000, + prepare_channel_size: 100, } } } diff --git a/pageserver/src/basebackup_cache.rs b/pageserver/src/basebackup_cache.rs index 69438dae7f..4966fee2d7 100644 --- a/pageserver/src/basebackup_cache.rs +++ b/pageserver/src/basebackup_cache.rs @@ -6,7 +6,7 @@ use metrics::core::{AtomicU64, GenericCounter}; use pageserver_api::{config::BasebackupCacheConfig, models::TenantState}; use tokio::{ io::{AsyncWriteExt, BufWriter}, - sync::mpsc::{UnboundedReceiver, UnboundedSender}, + sync::mpsc::{Receiver, Sender, error::TrySendError}, }; use tokio_util::sync::CancellationToken; use utils::{ @@ -19,8 +19,8 @@ use crate::{ basebackup::send_basebackup_tarball, context::{DownloadBehavior, RequestContext}, metrics::{ - BASEBACKUP_CACHE_ENTRIES, BASEBACKUP_CACHE_PREPARE, BASEBACKUP_CACHE_READ, - BASEBACKUP_CACHE_SIZE, + BASEBACKUP_CACHE_ENTRIES, BASEBACKUP_CACHE_PREPARE, BASEBACKUP_CACHE_PREPARE_QUEUE_SIZE, + BASEBACKUP_CACHE_READ, BASEBACKUP_CACHE_SIZE, }, task_mgr::TaskKind, tenant::{ @@ -35,8 +35,8 @@ pub struct BasebackupPrepareRequest { pub lsn: Lsn, } -pub type BasebackupPrepareSender = UnboundedSender; -pub type BasebackupPrepareReceiver = UnboundedReceiver; +pub type BasebackupPrepareSender = Sender; +pub type BasebackupPrepareReceiver = Receiver; #[derive(Clone)] struct CacheEntry { @@ -60,40 +60,65 @@ struct CacheEntry { /// and ~1 RPS for get requests. pub struct BasebackupCache { data_dir: Utf8PathBuf, + config: Option, entries: std::sync::Mutex>, + prepare_sender: BasebackupPrepareSender, + read_hit_count: GenericCounter, read_miss_count: GenericCounter, read_err_count: GenericCounter, + + prepare_skip_count: GenericCounter, } impl BasebackupCache { - /// Creates a BasebackupCache and spawns the background task. - /// The initialization of the cache is performed in the background and does not - /// block the caller. The cache will return `None` for any get requests until - /// initialization is complete. - pub fn spawn( - runtime_handle: &tokio::runtime::Handle, + /// Create a new BasebackupCache instance. + /// Also returns a BasebackupPrepareReceiver which is needed to start + /// the background task. + /// The cache is initialized from the data_dir in the background task. + /// The cache will return `None` for any get requests until the initialization is complete. + /// The background task is spawned separately using [`Self::spawn_background_task`] + /// to avoid a circular dependency between the cache and the tenant manager. + pub fn new( data_dir: Utf8PathBuf, config: Option, - prepare_receiver: BasebackupPrepareReceiver, - tenant_manager: Arc, - cancel: CancellationToken, - ) -> Arc { + ) -> (Arc, BasebackupPrepareReceiver) { + let chan_size = config.as_ref().map(|c| c.max_size_entries).unwrap_or(1); + + let (prepare_sender, prepare_receiver) = tokio::sync::mpsc::channel(chan_size); + let cache = Arc::new(BasebackupCache { data_dir, - + config, entries: std::sync::Mutex::new(HashMap::new()), + prepare_sender, read_hit_count: BASEBACKUP_CACHE_READ.with_label_values(&["hit"]), read_miss_count: BASEBACKUP_CACHE_READ.with_label_values(&["miss"]), read_err_count: BASEBACKUP_CACHE_READ.with_label_values(&["error"]), + + prepare_skip_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["skip"]), }); - if let Some(config) = config { + (cache, prepare_receiver) + } + + /// Spawns the background task. + /// The background task initializes the cache from the disk, + /// processes prepare requests, and cleans up outdated cache entries. + /// Noop if the cache is disabled (config is None). + pub fn spawn_background_task( + self: Arc, + runtime_handle: &tokio::runtime::Handle, + prepare_receiver: BasebackupPrepareReceiver, + tenant_manager: Arc, + cancel: CancellationToken, + ) { + if let Some(config) = self.config.clone() { let background = BackgroundTask { - c: cache.clone(), + c: self, config, tenant_manager, @@ -108,8 +133,45 @@ impl BasebackupCache { }; runtime_handle.spawn(background.run(prepare_receiver)); } + } - cache + /// Send a basebackup prepare request to the background task. + /// The basebackup will be prepared asynchronously, it does not block the caller. + /// The request will be skipped if any cache limits are exceeded. + pub fn send_prepare(&self, tenant_shard_id: TenantShardId, timeline_id: TimelineId, lsn: Lsn) { + let req = BasebackupPrepareRequest { + tenant_shard_id, + timeline_id, + lsn, + }; + + BASEBACKUP_CACHE_PREPARE_QUEUE_SIZE.inc(); + let res = self.prepare_sender.try_send(req); + + if let Err(e) = res { + BASEBACKUP_CACHE_PREPARE_QUEUE_SIZE.dec(); + self.prepare_skip_count.inc(); + match e { + TrySendError::Full(_) => { + // Basebackup prepares are pretty rare, normally we should not hit this. + tracing::info!( + tenant_id = %tenant_shard_id.tenant_id, + %timeline_id, + %lsn, + "Basebackup prepare channel is full, skipping the request" + ); + } + TrySendError::Closed(_) => { + // Normal during shutdown, not critical. + tracing::info!( + tenant_id = %tenant_shard_id.tenant_id, + %timeline_id, + %lsn, + "Basebackup prepare channel is closed, skipping the request" + ); + } + } + } } /// Gets a basebackup entry from the cache. @@ -122,6 +184,10 @@ impl BasebackupCache { timeline_id: TimelineId, lsn: Lsn, ) -> Option { + if !self.is_enabled() { + return None; + } + // Fast path. Check if the entry exists using the in-memory state. let tti = TenantTimelineId::new(tenant_id, timeline_id); if self.entries.lock().unwrap().get(&tti).map(|e| e.lsn) != Some(lsn) { @@ -149,6 +215,10 @@ impl BasebackupCache { } } + pub fn is_enabled(&self) -> bool { + self.config.is_some() + } + // Private methods. fn entry_filename(tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn) -> String { @@ -366,6 +436,7 @@ impl BackgroundTask { loop { tokio::select! { Some(req) = prepare_receiver.recv() => { + BASEBACKUP_CACHE_PREPARE_QUEUE_SIZE.dec(); if let Err(err) = self.prepare_basebackup( req.tenant_shard_id, req.timeline_id, diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index d137d651eb..327384fd82 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -569,8 +569,10 @@ fn start_pageserver( pageserver::l0_flush::L0FlushGlobalState::new(conf.l0_flush.clone()); // Scan the local 'tenants/' directory and start loading the tenants - let (basebackup_prepare_sender, basebackup_prepare_receiver) = - tokio::sync::mpsc::unbounded_channel(); + let (basebackup_cache, basebackup_prepare_receiver) = BasebackupCache::new( + conf.basebackup_cache_dir(), + conf.basebackup_cache_config.clone(), + ); let deletion_queue_client = deletion_queue.new_client(); let background_purges = mgr::BackgroundPurges::default(); @@ -582,7 +584,7 @@ fn start_pageserver( remote_storage: remote_storage.clone(), deletion_queue_client, l0_flush_global_state, - basebackup_prepare_sender, + basebackup_cache: Arc::clone(&basebackup_cache), feature_resolver: feature_resolver.clone(), }, shutdown_pageserver.clone(), @@ -590,10 +592,8 @@ fn start_pageserver( let tenant_manager = Arc::new(tenant_manager); BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(tenant_manager.clone(), order))?; - let basebackup_cache = BasebackupCache::spawn( + basebackup_cache.spawn_background_task( BACKGROUND_RUNTIME.handle(), - conf.basebackup_cache_dir(), - conf.basebackup_cache_config.clone(), basebackup_prepare_receiver, Arc::clone(&tenant_manager), shutdown_pageserver.child_token(), @@ -806,7 +806,6 @@ fn start_pageserver( } else { None }, - basebackup_cache, ); // Spawn a Pageserver gRPC server task. It will spawn separate tasks for diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 7929b094b4..21faceef49 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -4439,6 +4439,14 @@ pub(crate) static BASEBACKUP_CACHE_SIZE: Lazy = Lazy::new(|| { .expect("failed to define a metric") }); +pub(crate) static BASEBACKUP_CACHE_PREPARE_QUEUE_SIZE: Lazy = Lazy::new(|| { + register_uint_gauge!( + "pageserver_basebackup_cache_prepare_queue_size", + "Number of requests in the basebackup prepare channel" + ) + .expect("failed to define a metric") +}); + static PAGESERVER_CONFIG_IGNORED_ITEMS: Lazy = Lazy::new(|| { register_uint_gauge_vec!( "pageserver_config_ignored_items", diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index dd02947e5c..0287a2bdb5 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -62,7 +62,6 @@ use utils::{failpoint_support, span_record}; use crate::auth::check_permission; use crate::basebackup::{self, BasebackupError}; -use crate::basebackup_cache::BasebackupCache; use crate::config::PageServerConf; use crate::context::{ DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder, @@ -137,7 +136,6 @@ pub fn spawn( perf_trace_dispatch: Option, tcp_listener: tokio::net::TcpListener, tls_config: Option>, - basebackup_cache: Arc, ) -> Listener { let cancel = CancellationToken::new(); let libpq_ctx = RequestContext::todo_child( @@ -159,7 +157,6 @@ pub fn spawn( conf.pg_auth_type, tls_config, conf.page_service_pipelining.clone(), - basebackup_cache, libpq_ctx, cancel.clone(), ) @@ -218,7 +215,6 @@ pub async fn libpq_listener_main( auth_type: AuthType, tls_config: Option>, pipelining_config: PageServicePipeliningConfig, - basebackup_cache: Arc, listener_ctx: RequestContext, listener_cancel: CancellationToken, ) -> Connections { @@ -262,7 +258,6 @@ pub async fn libpq_listener_main( auth_type, tls_config.clone(), pipelining_config.clone(), - Arc::clone(&basebackup_cache), connection_ctx, connections_cancel.child_token(), gate_guard, @@ -305,7 +300,6 @@ async fn page_service_conn_main( auth_type: AuthType, tls_config: Option>, pipelining_config: PageServicePipeliningConfig, - basebackup_cache: Arc, connection_ctx: RequestContext, cancel: CancellationToken, gate_guard: GateGuard, @@ -371,7 +365,6 @@ async fn page_service_conn_main( pipelining_config, conf.get_vectored_concurrent_io, perf_span_fields, - basebackup_cache, connection_ctx, cancel.clone(), gate_guard, @@ -425,8 +418,6 @@ struct PageServerHandler { pipelining_config: PageServicePipeliningConfig, get_vectored_concurrent_io: GetVectoredConcurrentIo, - basebackup_cache: Arc, - gate_guard: GateGuard, } @@ -912,7 +903,6 @@ impl PageServerHandler { pipelining_config: PageServicePipeliningConfig, get_vectored_concurrent_io: GetVectoredConcurrentIo, perf_span_fields: ConnectionPerfSpanFields, - basebackup_cache: Arc, connection_ctx: RequestContext, cancel: CancellationToken, gate_guard: GateGuard, @@ -926,7 +916,6 @@ impl PageServerHandler { cancel, pipelining_config, get_vectored_concurrent_io, - basebackup_cache, gate_guard, } } @@ -2626,9 +2615,7 @@ impl PageServerHandler { && lsn.is_some() && prev_lsn.is_none() { - self.basebackup_cache - .get(tenant_id, timeline_id, lsn.unwrap()) - .await + timeline.get_cached_basebackup(lsn.unwrap()).await } else { None } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index c71655ce17..2613528143 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -80,7 +80,7 @@ use self::timeline::uninit::{TimelineCreateGuard, TimelineExclusionError, Uninit use self::timeline::{ EvictionTaskTenantState, GcCutoffs, TimelineDeleteProgress, TimelineResources, WaitLsnError, }; -use crate::basebackup_cache::BasebackupPrepareSender; +use crate::basebackup_cache::BasebackupCache; use crate::config::PageServerConf; use crate::context; use crate::context::RequestContextBuilder; @@ -162,7 +162,7 @@ pub struct TenantSharedResources { pub remote_storage: GenericRemoteStorage, pub deletion_queue_client: DeletionQueueClient, pub l0_flush_global_state: L0FlushGlobalState, - pub basebackup_prepare_sender: BasebackupPrepareSender, + pub basebackup_cache: Arc, pub feature_resolver: FeatureResolver, } @@ -331,7 +331,7 @@ pub struct TenantShard { deletion_queue_client: DeletionQueueClient, /// A channel to send async requests to prepare a basebackup for the basebackup cache. - basebackup_prepare_sender: BasebackupPrepareSender, + basebackup_cache: Arc, /// Cached logical sizes updated updated on each [`TenantShard::gather_size_inputs`]. cached_logical_sizes: tokio::sync::Mutex>, @@ -1363,7 +1363,7 @@ impl TenantShard { remote_storage, deletion_queue_client, l0_flush_global_state, - basebackup_prepare_sender, + basebackup_cache, feature_resolver, } = resources; @@ -1380,7 +1380,7 @@ impl TenantShard { remote_storage.clone(), deletion_queue_client, l0_flush_global_state, - basebackup_prepare_sender, + basebackup_cache, feature_resolver, )); @@ -4380,7 +4380,7 @@ impl TenantShard { remote_storage: GenericRemoteStorage, deletion_queue_client: DeletionQueueClient, l0_flush_global_state: L0FlushGlobalState, - basebackup_prepare_sender: BasebackupPrepareSender, + basebackup_cache: Arc, feature_resolver: FeatureResolver, ) -> TenantShard { assert!(!attached_conf.location.generation.is_none()); @@ -4485,7 +4485,7 @@ impl TenantShard { ongoing_timeline_detach: std::sync::Mutex::default(), gc_block: Default::default(), l0_flush_global_state, - basebackup_prepare_sender, + basebackup_cache, feature_resolver, } } @@ -5414,7 +5414,7 @@ impl TenantShard { pagestream_throttle_metrics: self.pagestream_throttle_metrics.clone(), l0_compaction_trigger: self.l0_compaction_trigger.clone(), l0_flush_global_state: self.l0_flush_global_state.clone(), - basebackup_prepare_sender: self.basebackup_prepare_sender.clone(), + basebackup_cache: self.basebackup_cache.clone(), feature_resolver: self.feature_resolver.clone(), } } @@ -6000,7 +6000,7 @@ pub(crate) mod harness { ) -> anyhow::Result> { let walredo_mgr = Arc::new(WalRedoManager::from(TestRedoManager)); - let (basebackup_requst_sender, _) = tokio::sync::mpsc::unbounded_channel(); + let (basebackup_cache, _) = BasebackupCache::new(Utf8PathBuf::new(), None); let tenant = Arc::new(TenantShard::new( TenantState::Attaching, @@ -6018,7 +6018,7 @@ pub(crate) mod harness { self.deletion_queue.new_client(), // TODO: ideally we should run all unit tests with both configs L0FlushGlobalState::new(L0FlushConfig::default()), - basebackup_requst_sender, + basebackup_cache, FeatureResolver::new_disabled(), )); diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 76937dd959..0a494e7923 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -2891,14 +2891,18 @@ mod tests { use std::collections::BTreeMap; use std::sync::Arc; + use camino::Utf8PathBuf; use storage_broker::BrokerClientChannel; use tracing::Instrument; use super::super::harness::TenantHarness; use super::TenantsMap; - use crate::tenant::{ - TenantSharedResources, - mgr::{BackgroundPurges, TenantManager, TenantSlot}, + use crate::{ + basebackup_cache::BasebackupCache, + tenant::{ + TenantSharedResources, + mgr::{BackgroundPurges, TenantManager, TenantSlot}, + }, }; #[tokio::test(start_paused = true)] @@ -2924,9 +2928,7 @@ mod tests { // Invoke remove_tenant_from_memory with a cleanup hook that blocks until we manually // permit it to proceed: that will stick the tenant in InProgress - let (basebackup_prepare_sender, _) = tokio::sync::mpsc::unbounded_channel::< - crate::basebackup_cache::BasebackupPrepareRequest, - >(); + let (basebackup_cache, _) = BasebackupCache::new(Utf8PathBuf::new(), None); let tenant_manager = TenantManager { tenants: std::sync::RwLock::new(TenantsMap::Open(tenants)), @@ -2940,7 +2942,7 @@ mod tests { l0_flush_global_state: crate::l0_flush::L0FlushGlobalState::new( h.conf.l0_flush.clone(), ), - basebackup_prepare_sender, + basebackup_cache, feature_resolver: crate::feature_resolver::FeatureResolver::new_disabled(), }, cancel: tokio_util::sync::CancellationToken::new(), diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 4ca005bfd4..bec2f0ed52 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -95,12 +95,12 @@ use super::storage_layer::{LayerFringe, LayerVisibilityHint, ReadableLayer}; use super::tasks::log_compaction_error; use super::upload_queue::NotInitialized; use super::{ - AttachedTenantConf, BasebackupPrepareSender, GcError, HeatMapTimeline, MaybeOffloaded, + AttachedTenantConf, GcError, HeatMapTimeline, MaybeOffloaded, debug_assert_current_span_has_tenant_and_timeline_id, }; use crate::PERF_TRACE_TARGET; use crate::aux_file::AuxFileSizeEstimator; -use crate::basebackup_cache::BasebackupPrepareRequest; +use crate::basebackup_cache::BasebackupCache; use crate::config::PageServerConf; use crate::context::{ DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder, @@ -201,7 +201,7 @@ pub struct TimelineResources { pub pagestream_throttle_metrics: Arc, pub l0_compaction_trigger: Arc, pub l0_flush_global_state: l0_flush::L0FlushGlobalState, - pub basebackup_prepare_sender: BasebackupPrepareSender, + pub basebackup_cache: Arc, pub feature_resolver: FeatureResolver, } @@ -448,7 +448,7 @@ pub struct Timeline { wait_lsn_log_slow: tokio::sync::Semaphore, /// A channel to send async requests to prepare a basebackup for the basebackup cache. - basebackup_prepare_sender: BasebackupPrepareSender, + basebackup_cache: Arc, feature_resolver: FeatureResolver, } @@ -2500,6 +2500,13 @@ impl Timeline { .unwrap_or(self.conf.default_tenant_conf.basebackup_cache_enabled) } + /// Try to get a basebackup from the on-disk cache. + pub(crate) async fn get_cached_basebackup(&self, lsn: Lsn) -> Option { + self.basebackup_cache + .get(self.tenant_shard_id.tenant_id, self.timeline_id, lsn) + .await + } + /// Prepare basebackup for the given LSN and store it in the basebackup cache. /// The method is asynchronous and returns immediately. /// The actual basebackup preparation is performed in the background @@ -2521,17 +2528,8 @@ impl Timeline { return; } - let res = self - .basebackup_prepare_sender - .send(BasebackupPrepareRequest { - tenant_shard_id: self.tenant_shard_id, - timeline_id: self.timeline_id, - lsn, - }); - if let Err(e) = res { - // May happen during shutdown, it's not critical. - info!("Failed to send shutdown checkpoint: {e:#}"); - } + self.basebackup_cache + .send_prepare(self.tenant_shard_id, self.timeline_id, lsn); } } @@ -3088,7 +3086,7 @@ impl Timeline { wait_lsn_log_slow: tokio::sync::Semaphore::new(1), - basebackup_prepare_sender: resources.basebackup_prepare_sender, + basebackup_cache: resources.basebackup_cache, feature_resolver: resources.feature_resolver, };