diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index 587f3774d4..756f2b02db 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -551,6 +551,11 @@ impl PageServerNode { .map(|x| x.parse::()) .transpose() .context("Falied to parse 'relsize_snapshot_cache_capacity' as integer")?, + basebackup_cache_enabled: settings + .remove("basebackup_cache_enabled") + .map(|x| x.parse::()) + .transpose() + .context("Failed to parse 'basebackup_cache_enabled' as bool")?, }; if !settings.is_empty() { bail!("Unrecognized tenant settings: {settings:?}") diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs index 73b6eee554..0fb2ff38ff 100644 --- a/libs/pageserver_api/src/config.rs +++ b/libs/pageserver_api/src/config.rs @@ -183,6 +183,8 @@ pub struct ConfigToml { pub enable_tls_page_service_api: bool, pub dev_mode: bool, pub timeline_import_config: TimelineImportConfig, + #[serde(skip_serializing_if = "Option::is_none")] + pub basebackup_cache_config: Option, } #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] @@ -308,6 +310,26 @@ pub struct TimelineImportConfig { pub import_job_checkpoint_threshold: NonZeroUsize, } +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[serde(default)] +pub struct BasebackupCacheConfig { + #[serde(with = "humantime_serde")] + pub cleanup_period: Duration, + // FIXME: Support max_size_bytes. + // pub max_size_bytes: usize, + pub max_size_entries: i64, +} + +impl Default for BasebackupCacheConfig { + fn default() -> Self { + Self { + cleanup_period: Duration::from_secs(60), + // max_size_bytes: 1024 * 1024 * 1024, // 1 GiB + max_size_entries: 1000, + } + } +} + pub mod statvfs { pub mod mock { #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] @@ -491,8 +513,14 @@ pub struct TenantConfigToml { /// Tenant level performance sampling ratio override. Controls the ratio of get page requests /// that will get perf sampling for the tenant. pub sampling_ratio: Option, + /// Capacity of relsize snapshot cache (used by replicas). pub relsize_snapshot_cache_capacity: usize, + + /// Enable preparing basebackup on XLOG_CHECKPOINT_SHUTDOWN and using it in basebackup requests. + // FIXME: Remove skip_serializing_if when the feature is stable. + #[serde(skip_serializing_if = "std::ops::Not::not")] + pub basebackup_cache_enabled: bool, } pub mod defaults { @@ -666,6 +694,7 @@ impl Default for ConfigToml { import_job_soft_size_limit: NonZeroUsize::new(1024 * 1024 * 1024).unwrap(), import_job_checkpoint_threshold: NonZeroUsize::new(128).unwrap(), }, + basebackup_cache_config: None, } } } @@ -791,6 +820,7 @@ impl Default for TenantConfigToml { gc_compaction_ratio_percent: DEFAULT_GC_COMPACTION_RATIO_PERCENT, sampling_ratio: None, relsize_snapshot_cache_capacity: DEFAULT_RELSIZE_SNAPSHOT_CACHE_CAPACITY, + basebackup_cache_enabled: false, } } } diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index ca26286b86..383939a13f 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -632,6 +632,8 @@ pub struct TenantConfigPatch { pub sampling_ratio: FieldPatch>, #[serde(skip_serializing_if = "FieldPatch::is_noop")] pub relsize_snapshot_cache_capacity: FieldPatch, + #[serde(skip_serializing_if = "FieldPatch::is_noop")] + pub basebackup_cache_enabled: FieldPatch, } /// Like [`crate::config::TenantConfigToml`], but preserves the information @@ -764,6 +766,9 @@ pub struct TenantConfig { #[serde(skip_serializing_if = "Option::is_none")] pub relsize_snapshot_cache_capacity: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + pub basebackup_cache_enabled: Option, } impl TenantConfig { @@ -810,6 +815,7 @@ impl TenantConfig { mut gc_compaction_ratio_percent, mut sampling_ratio, mut relsize_snapshot_cache_capacity, + mut basebackup_cache_enabled, } = self; patch.checkpoint_distance.apply(&mut checkpoint_distance); @@ -914,6 +920,9 @@ impl TenantConfig { patch .relsize_snapshot_cache_capacity .apply(&mut relsize_snapshot_cache_capacity); + patch + .basebackup_cache_enabled + .apply(&mut basebackup_cache_enabled); Ok(Self { checkpoint_distance, @@ -954,6 +963,7 @@ impl TenantConfig { gc_compaction_ratio_percent, sampling_ratio, relsize_snapshot_cache_capacity, + basebackup_cache_enabled, }) } @@ -1065,6 +1075,9 @@ impl TenantConfig { relsize_snapshot_cache_capacity: self .relsize_snapshot_cache_capacity .unwrap_or(global_conf.relsize_snapshot_cache_capacity), + basebackup_cache_enabled: self + .basebackup_cache_enabled + .unwrap_or(global_conf.basebackup_cache_enabled), } } } diff --git a/pageserver/src/basebackup_cache.rs b/pageserver/src/basebackup_cache.rs new file mode 100644 index 0000000000..3a8ec555f7 --- /dev/null +++ b/pageserver/src/basebackup_cache.rs @@ -0,0 +1,518 @@ +use std::{collections::HashMap, sync::Arc}; + +use async_compression::tokio::write::GzipEncoder; +use camino::{Utf8Path, Utf8PathBuf}; +use metrics::core::{AtomicU64, GenericCounter}; +use pageserver_api::{config::BasebackupCacheConfig, models::TenantState}; +use tokio::{ + io::{AsyncWriteExt, BufWriter}, + sync::mpsc::{UnboundedReceiver, UnboundedSender}, +}; +use tokio_util::sync::CancellationToken; +use utils::{ + id::{TenantId, TenantTimelineId, TimelineId}, + lsn::Lsn, + shard::TenantShardId, +}; + +use crate::{ + basebackup::send_basebackup_tarball, + context::{DownloadBehavior, RequestContext}, + metrics::{BASEBACKUP_CACHE_ENTRIES, BASEBACKUP_CACHE_PREPARE, BASEBACKUP_CACHE_READ}, + task_mgr::TaskKind, + tenant::{ + Timeline, + mgr::{TenantManager, TenantSlot}, + }, +}; + +pub struct BasebackupPrepareRequest { + pub tenant_shard_id: TenantShardId, + pub timeline_id: TimelineId, + pub lsn: Lsn, +} + +pub type BasebackupPrepareSender = UnboundedSender; +pub type BasebackupPrepareReceiver = UnboundedReceiver; + +type BasebackupRemoveEntrySender = UnboundedSender; +type BasebackupRemoveEntryReceiver = UnboundedReceiver; + +/// BasebackupCache stores cached basebackup archives for timelines on local disk. +/// +/// The main purpose of this cache is to speed up the startup process of compute nodes +/// after scaling to zero. +/// Thus, the basebackup is stored only for the latest LSN of the timeline and with +/// fixed set of parameters (gzip=true, full_backup=false, replica=false, prev_lsn=none). +/// +/// The cache receives prepare requests through the `BasebackupPrepareSender` channel, +/// generates a basebackup from the timeline in the background, and stores it on disk. +/// +/// Basebackup requests are pretty rare. We expect ~thousands of entries in the cache +/// and ~1 RPS for get requests. +pub struct BasebackupCache { + data_dir: Utf8PathBuf, + config: BasebackupCacheConfig, + tenant_manager: Arc, + remove_entry_sender: BasebackupRemoveEntrySender, + + entries: std::sync::Mutex>, + + cancel: CancellationToken, + + read_hit_count: GenericCounter, + read_miss_count: GenericCounter, + read_err_count: GenericCounter, + + prepare_ok_count: GenericCounter, + prepare_skip_count: GenericCounter, + prepare_err_count: GenericCounter, +} + +impl BasebackupCache { + /// Creates a BasebackupCache and spawns the background task. + /// The initialization of the cache is performed in the background and does not + /// block the caller. The cache will return `None` for any get requests until + /// initialization is complete. + pub fn spawn( + runtime_handle: &tokio::runtime::Handle, + data_dir: Utf8PathBuf, + config: Option, + prepare_receiver: BasebackupPrepareReceiver, + tenant_manager: Arc, + cancel: CancellationToken, + ) -> Arc { + let (remove_entry_sender, remove_entry_receiver) = tokio::sync::mpsc::unbounded_channel(); + + let enabled = config.is_some(); + + let cache = Arc::new(BasebackupCache { + data_dir, + config: config.unwrap_or_default(), + tenant_manager, + remove_entry_sender, + + entries: std::sync::Mutex::new(HashMap::new()), + + cancel, + + read_hit_count: BASEBACKUP_CACHE_READ.with_label_values(&["hit"]), + read_miss_count: BASEBACKUP_CACHE_READ.with_label_values(&["miss"]), + read_err_count: BASEBACKUP_CACHE_READ.with_label_values(&["error"]), + + prepare_ok_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["ok"]), + prepare_skip_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["skip"]), + prepare_err_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["error"]), + }); + + if enabled { + runtime_handle.spawn( + cache + .clone() + .background(prepare_receiver, remove_entry_receiver), + ); + } + + cache + } + + /// Gets a basebackup entry from the cache. + /// If the entry is found, opens a file with the basebackup archive and returns it. + /// The open file descriptor will prevent the file system from deleting the file + /// even if the entry is removed from the cache in the background. + pub async fn get( + &self, + tenant_id: TenantId, + timeline_id: TimelineId, + lsn: Lsn, + ) -> Option { + // Fast path. Check if the entry exists using the in-memory state. + let tti = TenantTimelineId::new(tenant_id, timeline_id); + if self.entries.lock().unwrap().get(&tti) != Some(&lsn) { + self.read_miss_count.inc(); + return None; + } + + let path = self.entry_path(tenant_id, timeline_id, lsn); + + match tokio::fs::File::open(path).await { + Ok(file) => { + self.read_hit_count.inc(); + Some(file) + } + Err(e) => { + if e.kind() == std::io::ErrorKind::NotFound { + // We may end up here if the basebackup was concurrently removed by the cleanup task. + self.read_miss_count.inc(); + } else { + self.read_err_count.inc(); + tracing::warn!("Unexpected error opening basebackup cache file: {:?}", e); + } + None + } + } + } + + // Private methods. + + fn entry_filename(tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn) -> String { + // The default format for LSN is 0/ABCDEF. + // The backslash is not filename friendly, so serialize it as plain hex. + let lsn = lsn.0; + format!("basebackup_{tenant_id}_{timeline_id}_{lsn:016X}.tar.gz") + } + + fn entry_path(&self, tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn) -> Utf8PathBuf { + self.data_dir + .join(Self::entry_filename(tenant_id, timeline_id, lsn)) + } + + fn entry_tmp_path( + &self, + tenant_id: TenantId, + timeline_id: TimelineId, + lsn: Lsn, + ) -> Utf8PathBuf { + self.data_dir + .join("tmp") + .join(Self::entry_filename(tenant_id, timeline_id, lsn)) + } + + fn parse_entry_filename(filename: &str) -> Option<(TenantId, TimelineId, Lsn)> { + let parts: Vec<&str> = filename + .strip_prefix("basebackup_")? + .strip_suffix(".tar.gz")? + .split('_') + .collect(); + if parts.len() != 3 { + return None; + } + let tenant_id = parts[0].parse::().ok()?; + let timeline_id = parts[1].parse::().ok()?; + let lsn = Lsn(u64::from_str_radix(parts[2], 16).ok()?); + + Some((tenant_id, timeline_id, lsn)) + } + + async fn cleanup(&self) -> anyhow::Result<()> { + // Cleanup tmp directory. + let tmp_dir = self.data_dir.join("tmp"); + let mut tmp_dir = tokio::fs::read_dir(&tmp_dir).await?; + while let Some(dir_entry) = tmp_dir.next_entry().await? { + if let Err(e) = tokio::fs::remove_file(dir_entry.path()).await { + tracing::warn!("Failed to remove basebackup cache tmp file: {:#}", e); + } + } + + // Remove outdated entries. + let entries_old = self.entries.lock().unwrap().clone(); + let mut entries_new = HashMap::new(); + for (tenant_shard_id, tenant_slot) in self.tenant_manager.list() { + if !tenant_shard_id.is_shard_zero() { + continue; + } + let TenantSlot::Attached(tenant) = tenant_slot else { + continue; + }; + let tenant_id = tenant_shard_id.tenant_id; + + for timeline in tenant.list_timelines() { + let tti = TenantTimelineId::new(tenant_id, timeline.timeline_id); + if let Some(&entry_lsn) = entries_old.get(&tti) { + if timeline.get_last_record_lsn() <= entry_lsn { + entries_new.insert(tti, entry_lsn); + } + } + } + } + + for (&tti, &lsn) in entries_old.iter() { + if !entries_new.contains_key(&tti) { + self.remove_entry_sender + .send(self.entry_path(tti.tenant_id, tti.timeline_id, lsn)) + .unwrap(); + } + } + + BASEBACKUP_CACHE_ENTRIES.set(entries_new.len() as i64); + *self.entries.lock().unwrap() = entries_new; + + Ok(()) + } + + async fn on_startup(&self) -> anyhow::Result<()> { + // Create data_dir and tmp directory if they do not exist. + tokio::fs::create_dir_all(&self.data_dir.join("tmp")) + .await + .map_err(|e| { + anyhow::anyhow!( + "Failed to create basebackup cache data_dir {:?}: {:?}", + self.data_dir, + e + ) + })?; + + // Read existing entries from the data_dir and add them to in-memory state. + let mut entries = HashMap::new(); + let mut dir = tokio::fs::read_dir(&self.data_dir).await?; + while let Some(dir_entry) = dir.next_entry().await? { + let filename = dir_entry.file_name(); + + if filename == "tmp" { + // Skip the tmp directory. + continue; + } + + let parsed = Self::parse_entry_filename(filename.to_string_lossy().as_ref()); + let Some((tenant_id, timeline_id, lsn)) = parsed else { + tracing::warn!("Invalid basebackup cache file name: {:?}", filename); + continue; + }; + + let tti = TenantTimelineId::new(tenant_id, timeline_id); + + use std::collections::hash_map::Entry::*; + + match entries.entry(tti) { + Occupied(mut entry) => { + let entry_lsn = *entry.get(); + // Leave only the latest entry, remove the old one. + if lsn < entry_lsn { + self.remove_entry_sender.send(self.entry_path( + tenant_id, + timeline_id, + lsn, + ))?; + } else if lsn > entry_lsn { + self.remove_entry_sender.send(self.entry_path( + tenant_id, + timeline_id, + entry_lsn, + ))?; + entry.insert(lsn); + } else { + // Two different filenames parsed to the same timline_id and LSN. + // Should never happen. + return Err(anyhow::anyhow!( + "Duplicate basebackup cache entry with the same LSN: {:?}", + filename + )); + } + } + Vacant(entry) => { + entry.insert(lsn); + } + } + } + + BASEBACKUP_CACHE_ENTRIES.set(entries.len() as i64); + *self.entries.lock().unwrap() = entries; + + Ok(()) + } + + async fn background( + self: Arc, + mut prepare_receiver: BasebackupPrepareReceiver, + mut remove_entry_receiver: BasebackupRemoveEntryReceiver, + ) { + // Panic in the background is a safe fallback. + // It will drop receivers and the cache will be effectively disabled. + self.on_startup() + .await + .expect("Failed to initialize basebackup cache"); + + let mut cleanup_ticker = tokio::time::interval(self.config.cleanup_period); + cleanup_ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + tokio::select! { + Some(req) = prepare_receiver.recv() => { + if let Err(err) = self.prepare_basebackup( + req.tenant_shard_id, + req.timeline_id, + req.lsn, + ).await { + tracing::info!("Failed to prepare basebackup: {:#}", err); + self.prepare_err_count.inc(); + continue; + } + } + Some(req) = remove_entry_receiver.recv() => { + if let Err(e) = tokio::fs::remove_file(req).await { + tracing::warn!("Failed to remove basebackup cache file: {:#}", e); + } + } + _ = cleanup_ticker.tick() => { + self.cleanup().await.unwrap_or_else(|e| { + tracing::warn!("Failed to clean up basebackup cache: {:#}", e); + }); + } + _ = self.cancel.cancelled() => { + tracing::info!("BasebackupCache background task cancelled"); + break; + } + } + } + } + + /// Prepare a basebackup for the given timeline. + /// + /// If the basebackup already exists with a higher LSN or the timeline already + /// has a higher last_record_lsn, skip the preparation. + /// + /// The basebackup is prepared in a temporary directory and then moved to the final + /// location to make the operation atomic. + async fn prepare_basebackup( + &self, + tenant_shard_id: TenantShardId, + timeline_id: TimelineId, + req_lsn: Lsn, + ) -> anyhow::Result<()> { + tracing::info!( + tenant_id = %tenant_shard_id.tenant_id, + %timeline_id, + %req_lsn, + "Preparing basebackup for timeline", + ); + + let tti = TenantTimelineId::new(tenant_shard_id.tenant_id, timeline_id); + + { + let entries = self.entries.lock().unwrap(); + if let Some(&entry_lsn) = entries.get(&tti) { + if entry_lsn >= req_lsn { + tracing::info!( + %timeline_id, + %req_lsn, + %entry_lsn, + "Basebackup entry already exists for timeline with higher LSN, skipping basebackup", + ); + self.prepare_skip_count.inc(); + return Ok(()); + } + } + + if entries.len() as i64 >= self.config.max_size_entries { + tracing::info!( + %timeline_id, + %req_lsn, + "Basebackup cache is full, skipping basebackup", + ); + self.prepare_skip_count.inc(); + return Ok(()); + } + } + + let tenant = self + .tenant_manager + .get_attached_tenant_shard(tenant_shard_id)?; + + let tenant_state = tenant.current_state(); + if tenant_state != TenantState::Active { + anyhow::bail!( + "Tenant {} is not active, current state: {:?}", + tenant_shard_id.tenant_id, + tenant_state + ) + } + + let timeline = tenant.get_timeline(timeline_id, true)?; + + let last_record_lsn = timeline.get_last_record_lsn(); + if last_record_lsn > req_lsn { + tracing::info!( + %timeline_id, + %req_lsn, + %last_record_lsn, + "Timeline has a higher LSN than the requested one, skipping basebackup", + ); + self.prepare_skip_count.inc(); + return Ok(()); + } + + let entry_tmp_path = self.entry_tmp_path(tenant_shard_id.tenant_id, timeline_id, req_lsn); + + let res = self + .prepare_basebackup_tmp(&entry_tmp_path, &timeline, req_lsn) + .await; + + if let Err(err) = res { + tracing::info!("Failed to prepare basebackup tmp file: {:#}", err); + // Try to clean up tmp file. If we fail, the background clean up task will take care of it. + match tokio::fs::remove_file(&entry_tmp_path).await { + Ok(_) => {} + Err(e) if e.kind() == std::io::ErrorKind::NotFound => {} + Err(e) => { + tracing::info!("Failed to remove basebackup tmp file: {:?}", e); + } + } + return Err(err); + } + + // Move the tmp file to the final location atomically. + let entry_path = self.entry_path(tenant_shard_id.tenant_id, timeline_id, req_lsn); + tokio::fs::rename(&entry_tmp_path, &entry_path).await?; + + let mut entries = self.entries.lock().unwrap(); + if let Some(old_lsn) = entries.insert(tti, req_lsn) { + // Remove the old entry if it exists. + self.remove_entry_sender + .send(self.entry_path(tenant_shard_id.tenant_id, timeline_id, old_lsn)) + .unwrap(); + } + BASEBACKUP_CACHE_ENTRIES.set(entries.len() as i64); + + self.prepare_ok_count.inc(); + Ok(()) + } + + /// Prepares a basebackup in a temporary file. + async fn prepare_basebackup_tmp( + &self, + emptry_tmp_path: &Utf8Path, + timeline: &Arc, + req_lsn: Lsn, + ) -> anyhow::Result<()> { + let ctx = RequestContext::new(TaskKind::BasebackupCache, DownloadBehavior::Download); + let ctx = ctx.with_scope_timeline(timeline); + + let file = tokio::fs::File::create(emptry_tmp_path).await?; + let mut writer = BufWriter::new(file); + + let mut encoder = GzipEncoder::with_quality( + &mut writer, + // Level::Best because compression is not on the hot path of basebackup requests. + // The decompression is almost not affected by the compression level. + async_compression::Level::Best, + ); + + // We may receive a request before the WAL record is applied to the timeline. + // Wait for the requested LSN to be applied. + timeline + .wait_lsn( + req_lsn, + crate::tenant::timeline::WaitLsnWaiter::BaseBackupCache, + crate::tenant::timeline::WaitLsnTimeout::Default, + &ctx, + ) + .await?; + + send_basebackup_tarball( + &mut encoder, + timeline, + Some(req_lsn), + None, + false, + false, + &ctx, + ) + .await?; + + encoder.shutdown().await?; + writer.flush().await?; + writer.into_inner().sync_all().await?; + + Ok(()) + } +} diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 4c2572a577..6001ea0345 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -16,6 +16,7 @@ use http_utils::tls_certs::ReloadingCertificateResolver; use metrics::launch_timestamp::{LaunchTimestamp, set_launch_timestamp_metric}; use metrics::set_build_info_metric; use nix::sys::socket::{setsockopt, sockopt}; +use pageserver::basebackup_cache::BasebackupCache; use pageserver::config::{PageServerConf, PageserverIdentity, ignored_fields}; use pageserver::controller_upcall_client::StorageControllerUpcallClient; use pageserver::deletion_queue::DeletionQueue; @@ -541,6 +542,8 @@ fn start_pageserver( pageserver::l0_flush::L0FlushGlobalState::new(conf.l0_flush.clone()); // Scan the local 'tenants/' directory and start loading the tenants + let (basebackup_prepare_sender, basebackup_prepare_receiver) = + tokio::sync::mpsc::unbounded_channel(); let deletion_queue_client = deletion_queue.new_client(); let background_purges = mgr::BackgroundPurges::default(); let tenant_manager = BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr( @@ -551,12 +554,22 @@ fn start_pageserver( remote_storage: remote_storage.clone(), deletion_queue_client, l0_flush_global_state, + basebackup_prepare_sender, }, order, shutdown_pageserver.clone(), ))?; let tenant_manager = Arc::new(tenant_manager); + let basebackup_cache = BasebackupCache::spawn( + BACKGROUND_RUNTIME.handle(), + conf.basebackup_cache_dir(), + conf.basebackup_cache_config.clone(), + basebackup_prepare_receiver, + Arc::clone(&tenant_manager), + shutdown_pageserver.child_token(), + ); + BACKGROUND_RUNTIME.spawn({ let shutdown_pageserver = shutdown_pageserver.clone(); let drive_init = async move { @@ -763,6 +776,7 @@ fn start_pageserver( } else { None }, + basebackup_cache, ); // All started up! Now just sit and wait for shutdown signal. diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 62f5b009f7..e8b3b7b3ab 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -232,6 +232,8 @@ pub struct PageServerConf { pub dev_mode: bool, pub timeline_import_config: pageserver_api::config::TimelineImportConfig, + + pub basebackup_cache_config: Option, } /// Token for authentication to safekeepers @@ -261,6 +263,10 @@ impl PageServerConf { self.workdir.join("metadata.json") } + pub fn basebackup_cache_dir(&self) -> Utf8PathBuf { + self.workdir.join("basebackup_cache") + } + pub fn deletion_list_path(&self, sequence: u64) -> Utf8PathBuf { // Encode a version in the filename, so that if we ever switch away from JSON we can // increment this. @@ -407,6 +413,7 @@ impl PageServerConf { enable_tls_page_service_api, dev_mode, timeline_import_config, + basebackup_cache_config, } = config_toml; let mut conf = PageServerConf { @@ -461,6 +468,7 @@ impl PageServerConf { enable_tls_page_service_api, dev_mode, timeline_import_config, + basebackup_cache_config, // ------------------------------------------------------------ // fields that require additional validation or custom handling diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 42454e7356..71d9c6603f 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -3,6 +3,7 @@ mod auth; pub mod basebackup; +pub mod basebackup_cache; pub mod config; pub mod consumption_metrics; pub mod context; diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index eae3045a3b..3076c7f1d6 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -4359,6 +4359,42 @@ pub(crate) fn set_tokio_runtime_setup(setup: &str, num_threads: NonZeroUsize) { .set(u64::try_from(num_threads.get()).unwrap()); } +pub(crate) static BASEBACKUP_CACHE_READ: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "pageserver_basebackup_cache_read_total", + "Number of read accesses to the basebackup cache grouped by hit/miss/error", + &["result"] + ) + .expect("failed to define a metric") +}); + +pub(crate) static BASEBACKUP_CACHE_PREPARE: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "pageserver_basebackup_cache_prepare_total", + "Number of prepare requests processed by the basebackup cache grouped by ok/skip/error", + &["result"] + ) + .expect("failed to define a metric") +}); + +pub(crate) static BASEBACKUP_CACHE_ENTRIES: Lazy = Lazy::new(|| { + register_int_gauge!( + "pageserver_basebackup_cache_entries_total", + "Number of entries in the basebackup cache" + ) + .expect("failed to define a metric") +}); + +// FIXME: Support basebackup cache size metrics. +#[allow(dead_code)] +pub(crate) static BASEBACKUP_CACHE_SIZE: Lazy = Lazy::new(|| { + register_int_gauge!( + "pageserver_basebackup_cache_size_bytes", + "Total size of all basebackup cache entries on disk in bytes" + ) + .expect("failed to define a metric") +}); + static PAGESERVER_CONFIG_IGNORED_ITEMS: Lazy = Lazy::new(|| { register_uint_gauge_vec!( "pageserver_config_ignored_items", diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index e46ba8d3a1..69519dfa87 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -9,7 +9,6 @@ use std::sync::Arc; use std::time::{Duration, Instant, SystemTime}; use std::{io, str}; -use crate::PERF_TRACE_TARGET; use anyhow::{Context, bail}; use async_compression::tokio::write::GzipEncoder; use bytes::Buf; @@ -52,8 +51,10 @@ use utils::simple_rcu::RcuReadGuard; use utils::sync::gate::{Gate, GateGuard}; use utils::sync::spsc_fold; +use crate::PERF_TRACE_TARGET; use crate::auth::check_permission; use crate::basebackup::BasebackupError; +use crate::basebackup_cache::BasebackupCache; use crate::config::PageServerConf; use crate::context::{ DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder, @@ -107,6 +108,7 @@ pub fn spawn( perf_trace_dispatch: Option, tcp_listener: tokio::net::TcpListener, tls_config: Option>, + basebackup_cache: Arc, ) -> Listener { let cancel = CancellationToken::new(); let libpq_ctx = RequestContext::todo_child( @@ -128,6 +130,7 @@ pub fn spawn( conf.pg_auth_type, tls_config, conf.page_service_pipelining.clone(), + basebackup_cache, libpq_ctx, cancel.clone(), ) @@ -186,6 +189,7 @@ pub async fn libpq_listener_main( auth_type: AuthType, tls_config: Option>, pipelining_config: PageServicePipeliningConfig, + basebackup_cache: Arc, listener_ctx: RequestContext, listener_cancel: CancellationToken, ) -> Connections { @@ -229,6 +233,7 @@ pub async fn libpq_listener_main( auth_type, tls_config.clone(), pipelining_config.clone(), + Arc::clone(&basebackup_cache), connection_ctx, connections_cancel.child_token(), gate_guard, @@ -271,6 +276,7 @@ async fn page_service_conn_main( auth_type: AuthType, tls_config: Option>, pipelining_config: PageServicePipeliningConfig, + basebackup_cache: Arc, connection_ctx: RequestContext, cancel: CancellationToken, gate_guard: GateGuard, @@ -336,6 +342,7 @@ async fn page_service_conn_main( pipelining_config, conf.get_vectored_concurrent_io, perf_span_fields, + basebackup_cache, connection_ctx, cancel.clone(), gate_guard, @@ -390,6 +397,8 @@ struct PageServerHandler { pipelining_config: PageServicePipeliningConfig, get_vectored_concurrent_io: GetVectoredConcurrentIo, + basebackup_cache: Arc, + gate_guard: GateGuard, } @@ -849,6 +858,7 @@ impl PageServerHandler { pipelining_config: PageServicePipeliningConfig, get_vectored_concurrent_io: GetVectoredConcurrentIo, perf_span_fields: ConnectionPerfSpanFields, + basebackup_cache: Arc, connection_ctx: RequestContext, cancel: CancellationToken, gate_guard: GateGuard, @@ -862,6 +872,7 @@ impl PageServerHandler { cancel, pipelining_config, get_vectored_concurrent_io, + basebackup_cache, gate_guard, } } @@ -2493,6 +2504,8 @@ impl PageServerHandler { .map_err(QueryError::Disconnected)?; self.flush_cancellable(pgb, &self.cancel).await?; + let mut from_cache = false; + // Send a tarball of the latest layer on the timeline. Compress if not // fullbackup. TODO Compress in that case too (tests need to be updated) if full_backup { @@ -2510,7 +2523,33 @@ impl PageServerHandler { .map_err(map_basebackup_error)?; } else { let mut writer = BufWriter::new(pgb.copyout_writer()); - if gzip { + + let cached = { + // Basebackup is cached only for this combination of parameters. + if timeline.is_basebackup_cache_enabled() + && gzip + && lsn.is_some() + && prev_lsn.is_none() + { + self.basebackup_cache + .get(tenant_id, timeline_id, lsn.unwrap()) + .await + } else { + None + } + }; + + if let Some(mut cached) = cached { + from_cache = true; + tokio::io::copy(&mut cached, &mut writer) + .await + .map_err(|e| { + map_basebackup_error(BasebackupError::Client( + e, + "handle_basebackup_request,cached,copy", + )) + })?; + } else if gzip { let mut encoder = GzipEncoder::with_quality( &mut writer, // NOTE using fast compression because it's on the critical path @@ -2569,6 +2608,7 @@ impl PageServerHandler { info!( lsn_await_millis = lsn_awaited_after.as_millis(), basebackup_millis = basebackup_after.as_millis(), + %from_cache, "basebackup complete" ); diff --git a/pageserver/src/task_mgr.rs b/pageserver/src/task_mgr.rs index d4873e60a1..55272b2125 100644 --- a/pageserver/src/task_mgr.rs +++ b/pageserver/src/task_mgr.rs @@ -380,6 +380,10 @@ pub enum TaskKind { DetachAncestor, ImportPgdata, + + /// Background task of [`crate::basebackup_cache::BasebackupCache`]. + /// Prepares basebackups and clears outdated entries. + BasebackupCache, } #[derive(Default)] diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index c15b44469a..bf3f71e35a 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -78,6 +78,7 @@ use self::timeline::uninit::{TimelineCreateGuard, TimelineExclusionError, Uninit use self::timeline::{ EvictionTaskTenantState, GcCutoffs, TimelineDeleteProgress, TimelineResources, WaitLsnError, }; +use crate::basebackup_cache::BasebackupPrepareSender; use crate::config::PageServerConf; use crate::context; use crate::context::RequestContextBuilder; @@ -157,6 +158,7 @@ pub struct TenantSharedResources { pub remote_storage: GenericRemoteStorage, pub deletion_queue_client: DeletionQueueClient, pub l0_flush_global_state: L0FlushGlobalState, + pub basebackup_prepare_sender: BasebackupPrepareSender, } /// A [`TenantShard`] is really an _attached_ tenant. The configuration @@ -317,12 +319,15 @@ pub struct TenantShard { gc_cs: tokio::sync::Mutex<()>, walredo_mgr: Option>, - // provides access to timeline data sitting in the remote storage + /// Provides access to timeline data sitting in the remote storage. pub(crate) remote_storage: GenericRemoteStorage, - // Access to global deletion queue for when this tenant wants to schedule a deletion + /// Access to global deletion queue for when this tenant wants to schedule a deletion. deletion_queue_client: DeletionQueueClient, + /// A channel to send async requests to prepare a basebackup for the basebackup cache. + basebackup_prepare_sender: BasebackupPrepareSender, + /// Cached logical sizes updated updated on each [`TenantShard::gather_size_inputs`]. cached_logical_sizes: tokio::sync::Mutex>, cached_synthetic_tenant_size: Arc, @@ -1286,6 +1291,7 @@ impl TenantShard { remote_storage, deletion_queue_client, l0_flush_global_state, + basebackup_prepare_sender, } = resources; let attach_mode = attached_conf.location.attach_mode; @@ -1301,6 +1307,7 @@ impl TenantShard { remote_storage.clone(), deletion_queue_client, l0_flush_global_state, + basebackup_prepare_sender, )); // The attach task will carry a GateGuard, so that shutdown() reliably waits for it to drop out if @@ -4239,6 +4246,7 @@ impl TenantShard { remote_storage: GenericRemoteStorage, deletion_queue_client: DeletionQueueClient, l0_flush_global_state: L0FlushGlobalState, + basebackup_prepare_sender: BasebackupPrepareSender, ) -> TenantShard { assert!(!attached_conf.location.generation.is_none()); @@ -4342,6 +4350,7 @@ impl TenantShard { ongoing_timeline_detach: std::sync::Mutex::default(), gc_block: Default::default(), l0_flush_global_state, + basebackup_prepare_sender, } } @@ -5261,6 +5270,7 @@ impl TenantShard { pagestream_throttle_metrics: self.pagestream_throttle_metrics.clone(), l0_compaction_trigger: self.l0_compaction_trigger.clone(), l0_flush_global_state: self.l0_flush_global_state.clone(), + basebackup_prepare_sender: self.basebackup_prepare_sender.clone(), } } @@ -5843,6 +5853,8 @@ pub(crate) mod harness { ) -> anyhow::Result> { let walredo_mgr = Arc::new(WalRedoManager::from(TestRedoManager)); + let (basebackup_requst_sender, _) = tokio::sync::mpsc::unbounded_channel(); + let tenant = Arc::new(TenantShard::new( TenantState::Attaching, self.conf, @@ -5860,6 +5872,7 @@ pub(crate) mod harness { self.deletion_queue.new_client(), // TODO: ideally we should run all unit tests with both configs L0FlushGlobalState::new(L0FlushConfig::default()), + basebackup_requst_sender, )); let preload = tenant diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index a251163419..54dc3b2d0b 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -24,8 +24,6 @@ use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering}; use std::sync::{Arc, Mutex, OnceLock, RwLock, Weak}; use std::time::{Duration, Instant, SystemTime}; -use crate::PERF_TRACE_TARGET; -use crate::walredo::RedoAttemptType; use anyhow::{Context, Result, anyhow, bail, ensure}; use arc_swap::{ArcSwap, ArcSwapOption}; use bytes::Bytes; @@ -94,10 +92,12 @@ use super::storage_layer::{LayerFringe, LayerVisibilityHint, ReadableLayer}; use super::tasks::log_compaction_error; use super::upload_queue::NotInitialized; use super::{ - AttachedTenantConf, GcError, HeatMapTimeline, MaybeOffloaded, + AttachedTenantConf, BasebackupPrepareSender, GcError, HeatMapTimeline, MaybeOffloaded, debug_assert_current_span_has_tenant_and_timeline_id, }; +use crate::PERF_TRACE_TARGET; use crate::aux_file::AuxFileSizeEstimator; +use crate::basebackup_cache::BasebackupPrepareRequest; use crate::config::PageServerConf; use crate::context::{ DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder, @@ -131,6 +131,7 @@ use crate::tenant::tasks::BackgroundLoopKind; use crate::tenant::timeline::logical_size::CurrentLogicalSize; use crate::virtual_file::{MaybeFatalIo, VirtualFile}; use crate::walingest::WalLagCooldown; +use crate::walredo::RedoAttemptType; use crate::{ZERO_PAGE, task_mgr, walredo}; #[derive(Debug, PartialEq, Eq, Clone, Copy)] @@ -196,6 +197,7 @@ pub struct TimelineResources { pub pagestream_throttle_metrics: Arc, pub l0_compaction_trigger: Arc, pub l0_flush_global_state: l0_flush::L0FlushGlobalState, + pub basebackup_prepare_sender: BasebackupPrepareSender, } pub struct Timeline { @@ -439,6 +441,9 @@ pub struct Timeline { pub(crate) rel_size_v2_status: ArcSwapOption, wait_lsn_log_slow: tokio::sync::Semaphore, + + /// A channel to send async requests to prepare a basebackup for the basebackup cache. + basebackup_prepare_sender: BasebackupPrepareSender, } pub(crate) enum PreviousHeatmap { @@ -1028,6 +1033,7 @@ pub(crate) enum WaitLsnWaiter<'a> { Tenant, PageService, HttpEndpoint, + BaseBackupCache, } /// Argument to [`Timeline::shutdown`]. @@ -1554,7 +1560,8 @@ impl Timeline { } WaitLsnWaiter::Tenant | WaitLsnWaiter::PageService - | WaitLsnWaiter::HttpEndpoint => unreachable!( + | WaitLsnWaiter::HttpEndpoint + | WaitLsnWaiter::BaseBackupCache => unreachable!( "tenant or page_service context are not expected to have task kind {:?}", ctx.task_kind() ), @@ -2459,6 +2466,41 @@ impl Timeline { false } } + + pub(crate) fn is_basebackup_cache_enabled(&self) -> bool { + let tenant_conf = self.tenant_conf.load(); + tenant_conf + .tenant_conf + .basebackup_cache_enabled + .unwrap_or(self.conf.default_tenant_conf.basebackup_cache_enabled) + } + + /// Prepare basebackup for the given LSN and store it in the basebackup cache. + /// The method is asynchronous and returns immediately. + /// The actual basebackup preparation is performed in the background + /// by the basebackup cache on a best-effort basis. + pub(crate) fn prepare_basebackup(&self, lsn: Lsn) { + if !self.is_basebackup_cache_enabled() { + return; + } + if !self.tenant_shard_id.is_shard_zero() { + // In theory we should never get here, but just in case check it. + // Preparing basebackup doesn't make sense for shards other than shard zero. + return; + } + + let res = self + .basebackup_prepare_sender + .send(BasebackupPrepareRequest { + tenant_shard_id: self.tenant_shard_id, + timeline_id: self.timeline_id, + lsn, + }); + if let Err(e) = res { + // May happen during shutdown, it's not critical. + info!("Failed to send shutdown checkpoint: {e:#}"); + } + } } /// Number of times we will compute partition within a checkpoint distance. @@ -3028,6 +3070,8 @@ impl Timeline { rel_size_v2_status: ArcSwapOption::from_pointee(rel_size_v2_status), wait_lsn_log_slow: tokio::sync::Semaphore::new(1), + + basebackup_prepare_sender: resources.basebackup_prepare_sender, }; result.repartition_threshold = diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index c7a6655052..c1a3b79915 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -1316,6 +1316,10 @@ impl WalIngest { } }); + if info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN { + modification.tline.prepare_basebackup(lsn); + } + Ok(()) } diff --git a/test_runner/regress/test_basebackup.py b/test_runner/regress/test_basebackup.py new file mode 100644 index 0000000000..b083c394c7 --- /dev/null +++ b/test_runner/regress/test_basebackup.py @@ -0,0 +1,77 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from fixtures.utils import wait_until + +if TYPE_CHECKING: + from fixtures.neon_fixtures import NeonEnvBuilder + + +def test_basebackup_cache(neon_env_builder: NeonEnvBuilder): + """ + Simple test for basebackup cache. + 1. Check that we always hit the cache after compute restart. + 2. Check that we eventually delete old basebackup files, but not the latest one. + 3. Check that we delete basebackup file for timeline with active compute. + """ + + neon_env_builder.pageserver_config_override = """ + tenant_config = { basebackup_cache_enabled = true } + basebackup_cache_config = { cleanup_period = '1s' } + """ + + env = neon_env_builder.init_start() + ep = env.endpoints.create("main") + ps = env.pageserver + ps_http = ps.http_client() + + # 1. Check that we always hit the cache after compute restart. + for i in range(3): + ep.start() + ep.stop() + + def check_metrics(i=i): + metrics = ps_http.get_metrics() + # Never miss. + # The first time compute_ctl sends `get_basebackup` with lsn=None, we do not cache such requests. + # All other requests should be a hit + assert ( + metrics.query_one( + "pageserver_basebackup_cache_read_total", {"result": "miss"} + ).value + == 0 + ) + # All but the first requests are hits. + assert ( + metrics.query_one("pageserver_basebackup_cache_read_total", {"result": "hit"}).value + == i + ) + # Every compute shut down should trigger a prepare reuest. + assert ( + metrics.query_one( + "pageserver_basebackup_cache_prepare_total", {"result": "ok"} + ).value + == i + 1 + ) + + wait_until(check_metrics) + + # 2. Check that we eventually delete old basebackup files, but not the latest one. + def check_bb_file_count(): + bb_files = list(ps.workdir.joinpath("basebackup_cache").iterdir()) + # tmp dir + 1 basebackup file. + assert len(bb_files) == 2 + + wait_until(check_bb_file_count) + + # 3. Check that we delete basebackup file for timeline with active compute. + ep.start() + ep.safe_psql("create table t1 as select generate_series(1, 10) as n") + + def check_bb_dir_empty(): + bb_files = list(ps.workdir.joinpath("basebackup_cache").iterdir()) + # only tmp dir. + assert len(bb_files) == 1 + + wait_until(check_bb_dir_empty)