use std::{collections::HashMap, sync::Arc};

use async_compression::tokio::write::GzipEncoder;
use camino::{Utf8Path, Utf8PathBuf};
use metrics::core::{AtomicU64, GenericCounter};
use pageserver_api::{config::BasebackupCacheConfig, models::TenantState};
use tokio::{
    io::{AsyncWriteExt, BufWriter},
    sync::mpsc::{UnboundedReceiver, UnboundedSender},
};
use tokio_util::sync::CancellationToken;
use utils::{
    id::{TenantId, TenantTimelineId, TimelineId},
    lsn::Lsn,
    shard::TenantShardId,
};

use crate::{
    basebackup::send_basebackup_tarball,
    context::{DownloadBehavior, RequestContext},
    metrics::{BASEBACKUP_CACHE_ENTRIES, BASEBACKUP_CACHE_PREPARE, BASEBACKUP_CACHE_READ},
    task_mgr::TaskKind,
    tenant::{
        Timeline,
        mgr::{TenantManager, TenantSlot},
    },
};

/// A request to prepare (generate and cache) a basebackup of one timeline at a given LSN.
pub struct BasebackupPrepareRequest {
    /// Tenant shard that owns the timeline.
    pub tenant_shard_id: TenantShardId,
    /// Timeline to take the basebackup of.
    pub timeline_id: TimelineId,
    /// LSN the basebackup should be generated at.
    pub lsn: Lsn,
}

/// Sending half of the channel that carries prepare requests to the cache.
pub type BasebackupPrepareSender = UnboundedSender<BasebackupPrepareRequest>;
/// Receiving half of the prepare-request channel; consumed by the cache's background task.
pub type BasebackupPrepareReceiver = UnboundedReceiver<BasebackupPrepareRequest>;

// Internal channel carrying paths of cache files that the background task should delete.
type BasebackupRemoveEntrySender = UnboundedSender<Utf8PathBuf>;
type BasebackupRemoveEntryReceiver = UnboundedReceiver<Utf8PathBuf>;

/// BasebackupCache stores cached basebackup archives for timelines on local disk.
///
/// The main purpose of this cache is to speed up the startup process of compute nodes
/// after scaling to zero.
/// Thus, the basebackup is stored only for the latest LSN of the timeline and with
/// fixed set of parameters (gzip=true, full_backup=false, replica=false, prev_lsn=none).
///
/// The cache receives prepare requests through the `BasebackupPrepareSender` channel,
/// generates a basebackup from the timeline in the background, and stores it on disk.
///
/// Basebackup requests are pretty rare. We expect ~thousands of entries in the cache
/// and ~1 RPS for get requests.
pub struct BasebackupCache { data_dir: Utf8PathBuf, config: BasebackupCacheConfig, tenant_manager: Arc, remove_entry_sender: BasebackupRemoveEntrySender, entries: std::sync::Mutex>, cancel: CancellationToken, read_hit_count: GenericCounter, read_miss_count: GenericCounter, read_err_count: GenericCounter, prepare_ok_count: GenericCounter, prepare_skip_count: GenericCounter, prepare_err_count: GenericCounter, } impl BasebackupCache { /// Creates a BasebackupCache and spawns the background task. /// The initialization of the cache is performed in the background and does not /// block the caller. The cache will return `None` for any get requests until /// initialization is complete. pub fn spawn( runtime_handle: &tokio::runtime::Handle, data_dir: Utf8PathBuf, config: Option, prepare_receiver: BasebackupPrepareReceiver, tenant_manager: Arc, cancel: CancellationToken, ) -> Arc { let (remove_entry_sender, remove_entry_receiver) = tokio::sync::mpsc::unbounded_channel(); let enabled = config.is_some(); let cache = Arc::new(BasebackupCache { data_dir, config: config.unwrap_or_default(), tenant_manager, remove_entry_sender, entries: std::sync::Mutex::new(HashMap::new()), cancel, read_hit_count: BASEBACKUP_CACHE_READ.with_label_values(&["hit"]), read_miss_count: BASEBACKUP_CACHE_READ.with_label_values(&["miss"]), read_err_count: BASEBACKUP_CACHE_READ.with_label_values(&["error"]), prepare_ok_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["ok"]), prepare_skip_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["skip"]), prepare_err_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["error"]), }); if enabled { runtime_handle.spawn( cache .clone() .background(prepare_receiver, remove_entry_receiver), ); } cache } /// Gets a basebackup entry from the cache. /// If the entry is found, opens a file with the basebackup archive and returns it. 
/// The open file descriptor will prevent the file system from deleting the file /// even if the entry is removed from the cache in the background. pub async fn get( &self, tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn, ) -> Option { // Fast path. Check if the entry exists using the in-memory state. let tti = TenantTimelineId::new(tenant_id, timeline_id); if self.entries.lock().unwrap().get(&tti) != Some(&lsn) { self.read_miss_count.inc(); return None; } let path = self.entry_path(tenant_id, timeline_id, lsn); match tokio::fs::File::open(path).await { Ok(file) => { self.read_hit_count.inc(); Some(file) } Err(e) => { if e.kind() == std::io::ErrorKind::NotFound { // We may end up here if the basebackup was concurrently removed by the cleanup task. self.read_miss_count.inc(); } else { self.read_err_count.inc(); tracing::warn!("Unexpected error opening basebackup cache file: {:?}", e); } None } } } // Private methods. fn entry_filename(tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn) -> String { // The default format for LSN is 0/ABCDEF. // The backslash is not filename friendly, so serialize it as plain hex. let lsn = lsn.0; format!("basebackup_{tenant_id}_{timeline_id}_{lsn:016X}.tar.gz") } fn entry_path(&self, tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn) -> Utf8PathBuf { self.data_dir .join(Self::entry_filename(tenant_id, timeline_id, lsn)) } fn entry_tmp_path( &self, tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn, ) -> Utf8PathBuf { self.data_dir .join("tmp") .join(Self::entry_filename(tenant_id, timeline_id, lsn)) } fn parse_entry_filename(filename: &str) -> Option<(TenantId, TimelineId, Lsn)> { let parts: Vec<&str> = filename .strip_prefix("basebackup_")? .strip_suffix(".tar.gz")? 
.split('_') .collect(); if parts.len() != 3 { return None; } let tenant_id = parts[0].parse::().ok()?; let timeline_id = parts[1].parse::().ok()?; let lsn = Lsn(u64::from_str_radix(parts[2], 16).ok()?); Some((tenant_id, timeline_id, lsn)) } async fn cleanup(&self) -> anyhow::Result<()> { // Cleanup tmp directory. let tmp_dir = self.data_dir.join("tmp"); let mut tmp_dir = tokio::fs::read_dir(&tmp_dir).await?; while let Some(dir_entry) = tmp_dir.next_entry().await? { if let Err(e) = tokio::fs::remove_file(dir_entry.path()).await { tracing::warn!("Failed to remove basebackup cache tmp file: {:#}", e); } } // Remove outdated entries. let entries_old = self.entries.lock().unwrap().clone(); let mut entries_new = HashMap::new(); for (tenant_shard_id, tenant_slot) in self.tenant_manager.list() { if !tenant_shard_id.is_shard_zero() { continue; } let TenantSlot::Attached(tenant) = tenant_slot else { continue; }; let tenant_id = tenant_shard_id.tenant_id; for timeline in tenant.list_timelines() { let tti = TenantTimelineId::new(tenant_id, timeline.timeline_id); if let Some(&entry_lsn) = entries_old.get(&tti) { if timeline.get_last_record_lsn() <= entry_lsn { entries_new.insert(tti, entry_lsn); } } } } for (&tti, &lsn) in entries_old.iter() { if !entries_new.contains_key(&tti) { self.remove_entry_sender .send(self.entry_path(tti.tenant_id, tti.timeline_id, lsn)) .unwrap(); } } BASEBACKUP_CACHE_ENTRIES.set(entries_new.len() as i64); *self.entries.lock().unwrap() = entries_new; Ok(()) } async fn on_startup(&self) -> anyhow::Result<()> { // Create data_dir and tmp directory if they do not exist. tokio::fs::create_dir_all(&self.data_dir.join("tmp")) .await .map_err(|e| { anyhow::anyhow!( "Failed to create basebackup cache data_dir {:?}: {:?}", self.data_dir, e ) })?; // Read existing entries from the data_dir and add them to in-memory state. 
let mut entries = HashMap::new(); let mut dir = tokio::fs::read_dir(&self.data_dir).await?; while let Some(dir_entry) = dir.next_entry().await? { let filename = dir_entry.file_name(); if filename == "tmp" { // Skip the tmp directory. continue; } let parsed = Self::parse_entry_filename(filename.to_string_lossy().as_ref()); let Some((tenant_id, timeline_id, lsn)) = parsed else { tracing::warn!("Invalid basebackup cache file name: {:?}", filename); continue; }; let tti = TenantTimelineId::new(tenant_id, timeline_id); use std::collections::hash_map::Entry::*; match entries.entry(tti) { Occupied(mut entry) => { let entry_lsn = *entry.get(); // Leave only the latest entry, remove the old one. if lsn < entry_lsn { self.remove_entry_sender.send(self.entry_path( tenant_id, timeline_id, lsn, ))?; } else if lsn > entry_lsn { self.remove_entry_sender.send(self.entry_path( tenant_id, timeline_id, entry_lsn, ))?; entry.insert(lsn); } else { // Two different filenames parsed to the same timline_id and LSN. // Should never happen. return Err(anyhow::anyhow!( "Duplicate basebackup cache entry with the same LSN: {:?}", filename )); } } Vacant(entry) => { entry.insert(lsn); } } } BASEBACKUP_CACHE_ENTRIES.set(entries.len() as i64); *self.entries.lock().unwrap() = entries; Ok(()) } async fn background( self: Arc, mut prepare_receiver: BasebackupPrepareReceiver, mut remove_entry_receiver: BasebackupRemoveEntryReceiver, ) { // Panic in the background is a safe fallback. // It will drop receivers and the cache will be effectively disabled. self.on_startup() .await .expect("Failed to initialize basebackup cache"); let mut cleanup_ticker = tokio::time::interval(self.config.cleanup_period); cleanup_ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop { tokio::select! 
{ Some(req) = prepare_receiver.recv() => { if let Err(err) = self.prepare_basebackup( req.tenant_shard_id, req.timeline_id, req.lsn, ).await { tracing::info!("Failed to prepare basebackup: {:#}", err); self.prepare_err_count.inc(); continue; } } Some(req) = remove_entry_receiver.recv() => { if let Err(e) = tokio::fs::remove_file(req).await { tracing::warn!("Failed to remove basebackup cache file: {:#}", e); } } _ = cleanup_ticker.tick() => { self.cleanup().await.unwrap_or_else(|e| { tracing::warn!("Failed to clean up basebackup cache: {:#}", e); }); } _ = self.cancel.cancelled() => { tracing::info!("BasebackupCache background task cancelled"); break; } } } } /// Prepare a basebackup for the given timeline. /// /// If the basebackup already exists with a higher LSN or the timeline already /// has a higher last_record_lsn, skip the preparation. /// /// The basebackup is prepared in a temporary directory and then moved to the final /// location to make the operation atomic. async fn prepare_basebackup( &self, tenant_shard_id: TenantShardId, timeline_id: TimelineId, req_lsn: Lsn, ) -> anyhow::Result<()> { tracing::info!( tenant_id = %tenant_shard_id.tenant_id, %timeline_id, %req_lsn, "Preparing basebackup for timeline", ); let tti = TenantTimelineId::new(tenant_shard_id.tenant_id, timeline_id); { let entries = self.entries.lock().unwrap(); if let Some(&entry_lsn) = entries.get(&tti) { if entry_lsn >= req_lsn { tracing::info!( %timeline_id, %req_lsn, %entry_lsn, "Basebackup entry already exists for timeline with higher LSN, skipping basebackup", ); self.prepare_skip_count.inc(); return Ok(()); } } if entries.len() as i64 >= self.config.max_size_entries { tracing::info!( %timeline_id, %req_lsn, "Basebackup cache is full, skipping basebackup", ); self.prepare_skip_count.inc(); return Ok(()); } } let tenant = self .tenant_manager .get_attached_tenant_shard(tenant_shard_id)?; let tenant_state = tenant.current_state(); if tenant_state != TenantState::Active { anyhow::bail!( 
"Tenant {} is not active, current state: {:?}", tenant_shard_id.tenant_id, tenant_state ) } let timeline = tenant.get_timeline(timeline_id, true)?; let last_record_lsn = timeline.get_last_record_lsn(); if last_record_lsn > req_lsn { tracing::info!( %timeline_id, %req_lsn, %last_record_lsn, "Timeline has a higher LSN than the requested one, skipping basebackup", ); self.prepare_skip_count.inc(); return Ok(()); } let entry_tmp_path = self.entry_tmp_path(tenant_shard_id.tenant_id, timeline_id, req_lsn); let res = self .prepare_basebackup_tmp(&entry_tmp_path, &timeline, req_lsn) .await; if let Err(err) = res { tracing::info!("Failed to prepare basebackup tmp file: {:#}", err); // Try to clean up tmp file. If we fail, the background clean up task will take care of it. match tokio::fs::remove_file(&entry_tmp_path).await { Ok(_) => {} Err(e) if e.kind() == std::io::ErrorKind::NotFound => {} Err(e) => { tracing::info!("Failed to remove basebackup tmp file: {:?}", e); } } return Err(err); } // Move the tmp file to the final location atomically. let entry_path = self.entry_path(tenant_shard_id.tenant_id, timeline_id, req_lsn); tokio::fs::rename(&entry_tmp_path, &entry_path).await?; let mut entries = self.entries.lock().unwrap(); if let Some(old_lsn) = entries.insert(tti, req_lsn) { // Remove the old entry if it exists. self.remove_entry_sender .send(self.entry_path(tenant_shard_id.tenant_id, timeline_id, old_lsn)) .unwrap(); } BASEBACKUP_CACHE_ENTRIES.set(entries.len() as i64); self.prepare_ok_count.inc(); Ok(()) } /// Prepares a basebackup in a temporary file. 
async fn prepare_basebackup_tmp( &self, emptry_tmp_path: &Utf8Path, timeline: &Arc, req_lsn: Lsn, ) -> anyhow::Result<()> { let ctx = RequestContext::new(TaskKind::BasebackupCache, DownloadBehavior::Download); let ctx = ctx.with_scope_timeline(timeline); let file = tokio::fs::File::create(emptry_tmp_path).await?; let mut writer = BufWriter::new(file); let mut encoder = GzipEncoder::with_quality( &mut writer, // Level::Best because compression is not on the hot path of basebackup requests. // The decompression is almost not affected by the compression level. async_compression::Level::Best, ); // We may receive a request before the WAL record is applied to the timeline. // Wait for the requested LSN to be applied. timeline .wait_lsn( req_lsn, crate::tenant::timeline::WaitLsnWaiter::BaseBackupCache, crate::tenant::timeline::WaitLsnTimeout::Default, &ctx, ) .await?; send_basebackup_tarball( &mut encoder, timeline, Some(req_lsn), None, false, false, &ctx, ) .await?; encoder.shutdown().await?; writer.flush().await?; writer.into_inner().sync_all().await?; Ok(()) } }