diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs
index f6001d68c4..2d7a06a72f 100644
--- a/libs/pageserver_api/src/config.rs
+++ b/libs/pageserver_api/src/config.rs
@@ -356,16 +356,21 @@ pub struct TimelineImportConfig {
 pub struct BasebackupCacheConfig {
     #[serde(with = "humantime_serde")]
     pub cleanup_period: Duration,
-    // FIXME: Support max_size_bytes.
-    // pub max_size_bytes: usize,
-    pub max_size_entries: i64,
+    /// Maximum total size of basebackup cache entries on disk in bytes.
+    /// The cache may slightly exceed this limit because we do not know
+    /// the exact size of a cache entry until it's written to disk.
+    pub max_total_size_bytes: u64,
+    // TODO(diko): support max_entry_size_bytes.
+    // pub max_entry_size_bytes: u64,
+    pub max_size_entries: usize,
 }
 
 impl Default for BasebackupCacheConfig {
     fn default() -> Self {
         Self {
             cleanup_period: Duration::from_secs(60),
-            // max_size_bytes: 1024 * 1024 * 1024, // 1 GiB
+            max_total_size_bytes: 1024 * 1024 * 1024, // 1 GiB
+            // max_entry_size_bytes: 16 * 1024 * 1024, // 16 MiB
             max_size_entries: 1000,
         }
     }
diff --git a/pageserver/src/basebackup_cache.rs b/pageserver/src/basebackup_cache.rs
index 7dde3e02fe..24f6413380 100644
--- a/pageserver/src/basebackup_cache.rs
+++ b/pageserver/src/basebackup_cache.rs
@@ -19,7 +19,10 @@ use utils::{
 use crate::{
     basebackup::send_basebackup_tarball,
     context::{DownloadBehavior, RequestContext},
-    metrics::{BASEBACKUP_CACHE_ENTRIES, BASEBACKUP_CACHE_PREPARE, BASEBACKUP_CACHE_READ},
+    metrics::{
+        BASEBACKUP_CACHE_ENTRIES, BASEBACKUP_CACHE_PREPARE, BASEBACKUP_CACHE_READ,
+        BASEBACKUP_CACHE_SIZE,
+    },
     task_mgr::TaskKind,
     tenant::{
         Timeline,
@@ -36,8 +39,13 @@ pub struct BasebackupPrepareRequest {
 pub type BasebackupPrepareSender = UnboundedSender<BasebackupPrepareRequest>;
 pub type BasebackupPrepareReceiver = UnboundedReceiver<BasebackupPrepareRequest>;
 
-type BasebackupRemoveEntrySender = UnboundedSender<Utf8PathBuf>;
-type BasebackupRemoveEntryReceiver = UnboundedReceiver<Utf8PathBuf>;
+#[derive(Clone)]
+struct CacheEntry {
+    /// LSN at which the basebackup was taken.
+    lsn: Lsn,
+    /// Size of the basebackup archive in bytes.
+    size_bytes: u64,
+}
 
 /// BasebackupCache stores cached basebackup archives for timelines on local disk.
 ///
@@ -53,21 +61,12 @@ type BasebackupRemoveEntryReceiver = UnboundedReceiver<Utf8PathBuf>;
 /// and ~1 RPS for get requests.
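+/// Only the read-path state lives in this struct; the mutable bookkeeping
+/// (size counters, config, prepare logic) lives in BackgroundTask below.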
 pub struct BasebackupCache {
     data_dir: Utf8PathBuf,
-    config: BasebackupCacheConfig,
-    tenant_manager: Arc<TenantManager>,
-    remove_entry_sender: BasebackupRemoveEntrySender,
-    entries: std::sync::Mutex<HashMap<TenantTimelineId, Lsn>>,
-
-    cancel: CancellationToken,
+    entries: std::sync::Mutex<HashMap<TenantTimelineId, CacheEntry>>,
 
     read_hit_count: GenericCounter<AtomicU64>,
     read_miss_count: GenericCounter<AtomicU64>,
     read_err_count: GenericCounter<AtomicU64>,
-
-    prepare_ok_count: GenericCounter<AtomicU64>,
-    prepare_skip_count: GenericCounter<AtomicU64>,
-    prepare_err_count: GenericCounter<AtomicU64>,
 }
 
 impl BasebackupCache {
@@ -83,35 +82,32 @@ impl BasebackupCache {
         tenant_manager: Arc<TenantManager>,
         cancel: CancellationToken,
     ) -> Arc<Self> {
-        let (remove_entry_sender, remove_entry_receiver) = tokio::sync::mpsc::unbounded_channel();
-
-        let enabled = config.is_some();
-
         let cache = Arc::new(BasebackupCache {
             data_dir,
-            config: config.unwrap_or_default(),
-            tenant_manager,
-            remove_entry_sender,
             entries: std::sync::Mutex::new(HashMap::new()),
-            cancel,
-
             read_hit_count: BASEBACKUP_CACHE_READ.with_label_values(&["hit"]),
             read_miss_count: BASEBACKUP_CACHE_READ.with_label_values(&["miss"]),
             read_err_count: BASEBACKUP_CACHE_READ.with_label_values(&["error"]),
-
-            prepare_ok_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["ok"]),
-            prepare_skip_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["skip"]),
-            prepare_err_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["error"]),
         });
 
-        if enabled {
-            runtime_handle.spawn(
-                cache
-                    .clone()
-                    .background(prepare_receiver, remove_entry_receiver),
-            );
+        if let Some(config) = config {
+            let background = BackgroundTask {
+                c: cache.clone(),
+
+                config,
+                tenant_manager,
+                cancel,
+
+                entry_count: 0,
+                total_size_bytes: 0,
+
+                prepare_ok_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["ok"]),
+                prepare_skip_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["skip"]),
+                prepare_err_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["error"]),
+            };
+            runtime_handle.spawn(background.run(prepare_receiver));
         }
 
         cache
@@ -129,7 +125,7 @@ impl BasebackupCache {
     ) -> Option<tokio::fs::File> {
         // Fast path. Check if the entry exists using the in-memory state.
         let tti = TenantTimelineId::new(tenant_id, timeline_id);
-        if self.entries.lock().unwrap().get(&tti) != Some(&lsn) {
+        if self.entries.lock().unwrap().get(&tti).map(|e| e.lsn) != Some(lsn) {
            self.read_miss_count.inc();
            return None;
        }
@@ -167,9 +163,41 @@ impl BasebackupCache {
         self.data_dir
             .join(Self::entry_filename(tenant_id, timeline_id, lsn))
     }
+}
 
+/// The background task that prepares basebackups and manages
+/// the cache entries on disk.
+/// It is a separate struct from BasebackupCache so that the task can hold
+/// a mutable reference to this state without taking a mutex lock,
+/// while BasebackupCache is shared by reference with the clients.
+struct BackgroundTask {
+    c: Arc<BasebackupCache>,
+
+    config: BasebackupCacheConfig,
+    tenant_manager: Arc<TenantManager>,
+    cancel: CancellationToken,
+
+    /// Number of entries in the cache.
+    /// This counter is used for metrics and applying cache limits.
+    /// It should generally equal c.entries.len(), but it is calculated
+    /// pessimistically for abnormal situations: if we encounter an error while
+    /// removing an entry from disk, we don't decrement this counter, to make
+    /// sure we don't exceed the limit because of "trashed" files on the disk.
+    /// It also counts files in the data_dir that are not valid cache entries.
+    entry_count: usize,
+    /// Total size of all the entries on the disk.
+    /// This counter is used for metrics and applying cache limits.
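+    /// Note that files in the tmp directory are not counted here;
+    /// they are cleaned up separately by clean_tmp_dir.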
+    /// Similar to entry_count, it is calculated pessimistically for abnormal situations.
+    total_size_bytes: u64,
+
+    prepare_ok_count: GenericCounter<AtomicU64>,
+    prepare_skip_count: GenericCounter<AtomicU64>,
+    prepare_err_count: GenericCounter<AtomicU64>,
+}
+
+impl BackgroundTask {
     fn tmp_dir(&self) -> Utf8PathBuf {
-        self.data_dir.join("tmp")
+        self.c.data_dir.join("tmp")
     }
 
     fn entry_tmp_path(
@@ -179,7 +207,7 @@
         lsn: Lsn,
     ) -> Utf8PathBuf {
         self.tmp_dir()
-            .join(Self::entry_filename(tenant_id, timeline_id, lsn))
+            .join(BasebackupCache::entry_filename(tenant_id, timeline_id, lsn))
     }
 
     fn parse_entry_filename(filename: &str) -> Option<(TenantId, TimelineId, Lsn)> {
@@ -208,11 +236,11 @@
         Ok(())
     }
 
-    async fn cleanup(&self) -> anyhow::Result<()> {
+    async fn cleanup(&mut self) -> anyhow::Result<()> {
         self.clean_tmp_dir().await?;
 
-        // Remove outdated entries.
-        let entries_old = self.entries.lock().unwrap().clone();
+        // Leave only up-to-date entries.
+        let entries_old = self.c.entries.lock().unwrap().clone();
         let mut entries_new = HashMap::new();
         for (tenant_shard_id, tenant_slot) in self.tenant_manager.list() {
             if !tenant_shard_id.is_shard_zero() {
                 continue;
             }
@@ -225,31 +253,32 @@
             for timeline in tenant.list_timelines() {
                 let tti = TenantTimelineId::new(tenant_id, timeline.timeline_id);
 
-                if let Some(&entry_lsn) = entries_old.get(&tti) {
-                    if timeline.get_last_record_lsn() <= entry_lsn {
-                        entries_new.insert(tti, entry_lsn);
+                if let Some(entry) = entries_old.get(&tti) {
+                    if timeline.get_last_record_lsn() <= entry.lsn {
+                        entries_new.insert(tti, entry.clone());
                     }
                 }
             }
         }
 
-        for (&tti, &lsn) in entries_old.iter() {
+        // Try to remove all entries that are not up-to-date.
+        for (&tti, entry) in entries_old.iter() {
             if !entries_new.contains_key(&tti) {
-                self.remove_entry_sender
-                    .send(self.entry_path(tti.tenant_id, tti.timeline_id, lsn))
-                    .unwrap();
+                self.try_remove_entry(tti.tenant_id, tti.timeline_id, entry)
+                    .await;
             }
         }
 
-        BASEBACKUP_CACHE_ENTRIES.set(entries_new.len() as i64);
-        *self.entries.lock().unwrap() = entries_new;
+        // Note: BackgroundTask is the only writer to self.c.entries,
+        // so the map cannot have been modified concurrently.
+        *self.c.entries.lock().unwrap() = entries_new;
 
         Ok(())
     }
 
-    async fn on_startup(&self) -> anyhow::Result<()> {
+    async fn on_startup(&mut self) -> anyhow::Result<()> {
         // Create data_dir if it does not exist.
-        tokio::fs::create_dir_all(&self.data_dir)
+        tokio::fs::create_dir_all(&self.c.data_dir)
             .await
             .context("Failed to create basebackup cache data directory")?;
 
@@ -258,8 +287,8 @@
             .context("Failed to clean tmp directory")?;
 
         // Read existing entries from the data_dir and add them to in-memory state.
-        let mut entries = HashMap::new();
-        let mut dir = tokio::fs::read_dir(&self.data_dir).await?;
+        let mut entries = HashMap::<TenantTimelineId, CacheEntry>::new();
+        let mut dir = tokio::fs::read_dir(&self.c.data_dir).await?;
         while let Some(dir_entry) = dir.next_entry().await? {
             let filename = dir_entry.file_name();
 
@@ -268,33 +297,43 @@
                 continue;
             }
 
+            let size_bytes = dir_entry
+                .metadata()
+                .await
+                .map_err(|e| {
+                    anyhow::anyhow!("Failed to read metadata for file {:?}: {:?}", filename, e)
+                })?
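+                // The on-disk file size becomes the entry's size_bytes, matching
+                // what prepare_basebackup_tmp records for freshly written entries.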
+                .len();
+
+            self.entry_count += 1;
+            BASEBACKUP_CACHE_ENTRIES.set(self.entry_count as u64);
+
+            self.total_size_bytes += size_bytes;
+            BASEBACKUP_CACHE_SIZE.set(self.total_size_bytes);
+
             let parsed = Self::parse_entry_filename(filename.to_string_lossy().as_ref());
             let Some((tenant_id, timeline_id, lsn)) = parsed else {
                 tracing::warn!("Invalid basebackup cache file name: {:?}", filename);
                 continue;
             };
 
+            let cur_entry = CacheEntry { lsn, size_bytes };
+
             let tti = TenantTimelineId::new(tenant_id, timeline_id);
 
             use std::collections::hash_map::Entry::*;
             match entries.entry(tti) {
                 Occupied(mut entry) => {
-                    let entry_lsn = *entry.get();
+                    let found_entry = entry.get();
                     // Leave only the latest entry, remove the old one.
-                    if lsn < entry_lsn {
-                        self.remove_entry_sender.send(self.entry_path(
-                            tenant_id,
-                            timeline_id,
-                            lsn,
-                        ))?;
-                    } else if lsn > entry_lsn {
-                        self.remove_entry_sender.send(self.entry_path(
-                            tenant_id,
-                            timeline_id,
-                            entry_lsn,
-                        ))?;
-                        entry.insert(lsn);
+                    if cur_entry.lsn < found_entry.lsn {
+                        self.try_remove_entry(tenant_id, timeline_id, &cur_entry)
+                            .await;
+                    } else if cur_entry.lsn > found_entry.lsn {
+                        self.try_remove_entry(tenant_id, timeline_id, found_entry)
+                            .await;
+                        entry.insert(cur_entry);
                     } else {
                         // Two different filenames parsed to the same timline_id and LSN.
                         // Should never happen.
@@ -305,22 +344,17 @@
                     }
                 }
                 Vacant(entry) => {
-                    entry.insert(lsn);
+                    entry.insert(cur_entry);
                 }
             }
         }
 
-        BASEBACKUP_CACHE_ENTRIES.set(entries.len() as i64);
-        *self.entries.lock().unwrap() = entries;
+        *self.c.entries.lock().unwrap() = entries;
 
         Ok(())
     }
 
-    async fn background(
-        self: Arc<Self>,
-        mut prepare_receiver: BasebackupPrepareReceiver,
-        mut remove_entry_receiver: BasebackupRemoveEntryReceiver,
-    ) {
+    async fn run(mut self, mut prepare_receiver: BasebackupPrepareReceiver) {
         // Panic in the background is a safe fallback.
         // It will drop receivers and the cache will be effectively disabled.
         self.on_startup()
@@ -343,11 +377,6 @@
                         continue;
                     }
                 }
-                Some(req) = remove_entry_receiver.recv() => {
-                    if let Err(e) = tokio::fs::remove_file(req).await {
-                        tracing::warn!("Failed to remove basebackup cache file: {:#}", e);
-                    }
-                }
                 _ = cleanup_ticker.tick() => {
                     self.cleanup().await.unwrap_or_else(|e| {
                         tracing::warn!("Failed to clean up basebackup cache: {:#}", e);
                     });
@@ -361,6 +390,67 @@
         }
     }
 
+    /// Try to remove an entry from disk.
+    /// The caller is responsible for removing the entry from the in-memory state.
+    /// Updates the size counters and corresponding metrics.
+    /// Filesystem errors are logged and otherwise ignored, but the size counters
+    /// are not decremented in that case, so the file continues to be counted
+    /// towards the size limits.
+    async fn try_remove_entry(
+        &mut self,
+        tenant_id: TenantId,
+        timeline_id: TimelineId,
+        entry: &CacheEntry,
+    ) {
+        let entry_path = self.c.entry_path(tenant_id, timeline_id, entry.lsn);
+
+        match tokio::fs::remove_file(&entry_path).await {
+            Ok(_) => {}
+            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
+            Err(e) => {
+                tracing::warn!(
+                    "Failed to remove basebackup cache file for tenant {} timeline {} LSN {}: {:#}",
+                    tenant_id,
+                    timeline_id,
+                    entry.lsn,
+                    e
+                );
+                return;
+            }
+        }
+
+        self.entry_count -= 1;
+        BASEBACKUP_CACHE_ENTRIES.set(self.entry_count as u64);
+
+        self.total_size_bytes -= entry.size_bytes;
+        BASEBACKUP_CACHE_SIZE.set(self.total_size_bytes);
+    }
+
+    /// Insert the cache entry into the in-memory state and update the size counters.
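+    /// The counters are updated before the old entry (if any) is removed from disk,
+    /// so the accounting stays pessimistic at every step.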
+    /// Assumes that the file for the entry already exists on disk.
+    /// If an entry with a previous LSN already exists, it will be removed.
+    async fn upsert_entry(
+        &mut self,
+        tenant_id: TenantId,
+        timeline_id: TimelineId,
+        entry: CacheEntry,
+    ) {
+        let tti = TenantTimelineId::new(tenant_id, timeline_id);
+
+        self.entry_count += 1;
+        BASEBACKUP_CACHE_ENTRIES.set(self.entry_count as u64);
+
+        self.total_size_bytes += entry.size_bytes;
+        BASEBACKUP_CACHE_SIZE.set(self.total_size_bytes);
+
+        let old_entry = self.c.entries.lock().unwrap().insert(tti, entry);
+
+        if let Some(old_entry) = old_entry {
+            self.try_remove_entry(tenant_id, timeline_id, &old_entry)
+                .await;
+        }
+    }
+
     /// Prepare a basebackup for the given timeline.
     ///
     /// If the basebackup already exists with a higher LSN or the timeline already
@@ -369,7 +459,7 @@
     /// The basebackup is prepared in a temporary directory and then moved to the final
     /// location to make the operation atomic.
     async fn prepare_basebackup(
-        &self,
+        &mut self,
         tenant_shard_id: TenantShardId,
         timeline_id: TimelineId,
         req_lsn: Lsn,
@@ -383,30 +473,44 @@
 
         let tti = TenantTimelineId::new(tenant_shard_id.tenant_id, timeline_id);
 
+        // TODO(diko): I don't think we will hit the limit,
+        // but if we do, it makes sense to try to evict the oldest entries here.
+        if self.entry_count >= self.config.max_size_entries {
+            tracing::info!(
+                %tenant_shard_id,
+                %timeline_id,
+                %req_lsn,
+                "Basebackup cache is full (max_size_entries), skipping basebackup",
+            );
+            self.prepare_skip_count.inc();
+            return Ok(());
+        }
+
+        if self.total_size_bytes >= self.config.max_total_size_bytes {
+            tracing::info!(
+                %tenant_shard_id,
+                %timeline_id,
+                %req_lsn,
+                "Basebackup cache is full (max_total_size_bytes), skipping basebackup",
+            );
+            self.prepare_skip_count.inc();
+            return Ok(());
+        }
+
         {
-            let entries = self.entries.lock().unwrap();
-            if let Some(&entry_lsn) = entries.get(&tti) {
-                if entry_lsn >= req_lsn {
+            let entries = self.c.entries.lock().unwrap();
+            if let Some(entry) = entries.get(&tti) {
+                if entry.lsn >= req_lsn {
                     tracing::info!(
                         %timeline_id,
                         %req_lsn,
-                        %entry_lsn,
+                        %entry.lsn,
                         "Basebackup entry already exists for timeline with higher LSN, skipping basebackup",
                     );
                     self.prepare_skip_count.inc();
                     return Ok(());
                 }
             }
-
-            if entries.len() as i64 >= self.config.max_size_entries {
-                tracing::info!(
-                    %timeline_id,
-                    %req_lsn,
-                    "Basebackup cache is full, skipping basebackup",
-                );
-                self.prepare_skip_count.inc();
-                return Ok(());
-            }
         }
 
         let tenant = self
@@ -442,18 +546,21 @@
             .prepare_basebackup_tmp(&entry_tmp_path, &timeline, req_lsn)
             .await;
 
-        if let Err(err) = res {
-            tracing::info!("Failed to prepare basebackup tmp file: {:#}", err);
-            // Try to clean up tmp file. If we fail, the background clean up task will take care of it.
-            match tokio::fs::remove_file(&entry_tmp_path).await {
-                Ok(_) => {}
-                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
-                Err(e) => {
-                    tracing::info!("Failed to remove basebackup tmp file: {:?}", e);
+        let entry = match res {
+            Ok(entry) => entry,
+            Err(err) => {
+                tracing::info!("Failed to prepare basebackup tmp file: {:#}", err);
+                // Try to clean up the tmp file. If we fail, the background cleanup task will take care of it.
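+                // A NotFound error below is fine: the tmp file may never have been created.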
+                match tokio::fs::remove_file(&entry_tmp_path).await {
+                    Ok(_) => {}
+                    Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
+                    Err(e) => {
+                        tracing::info!("Failed to remove basebackup tmp file: {:?}", e);
+                    }
                 }
+                return Err(err);
             }
-            return Err(err);
-        }
+        };
 
         // Move the tmp file to the final location atomically.
         // The tmp file is fsynced, so it's guaranteed that we will not have a partial file
@@ -461,17 +568,13 @@
         // It's not necessary to fsync the inode after renaming, because the worst case is that
         // the rename operation will be rolled back on the disk failure, the entry will disappear
         // from the main directory, and the entry access will cause a cache miss.
-        let entry_path = self.entry_path(tenant_shard_id.tenant_id, timeline_id, req_lsn);
+        let entry_path = self
+            .c
+            .entry_path(tenant_shard_id.tenant_id, timeline_id, req_lsn);
         tokio::fs::rename(&entry_tmp_path, &entry_path).await?;
 
-        let mut entries = self.entries.lock().unwrap();
-        if let Some(old_lsn) = entries.insert(tti, req_lsn) {
-            // Remove the old entry if it exists.
-            self.remove_entry_sender
-                .send(self.entry_path(tenant_shard_id.tenant_id, timeline_id, old_lsn))
-                .unwrap();
-        }
-        BASEBACKUP_CACHE_ENTRIES.set(entries.len() as i64);
+        self.upsert_entry(tenant_shard_id.tenant_id, timeline_id, entry)
+            .await;
 
         self.prepare_ok_count.inc();
         Ok(())
@@ -484,7 +587,7 @@
         entry_tmp_path: &Utf8Path,
         timeline: &Arc<Timeline>,
         req_lsn: Lsn,
-    ) -> anyhow::Result<()> {
+    ) -> anyhow::Result<CacheEntry> {
         let ctx = RequestContext::new(TaskKind::BasebackupCache, DownloadBehavior::Download);
         let ctx = ctx.with_scope_timeline(timeline);
@@ -524,6 +627,12 @@
         writer.flush().await?;
         writer.into_inner().sync_all().await?;
 
-        Ok(())
+        // TODO(diko): we can count the size via a Writer wrapper instead of a syscall.
+        let size_bytes = tokio::fs::metadata(entry_tmp_path).await?.len();
+
+        Ok(CacheEntry {
+            lsn: req_lsn,
+            size_bytes,
+        })
     }
 }
diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs
index 3b3522c36a..bf54614baa 100644
--- a/pageserver/src/metrics.rs
+++ b/pageserver/src/metrics.rs
@@ -4428,18 +4428,16 @@ pub(crate) static BASEBACKUP_CACHE_PREPARE: Lazy<IntCounterVec> = Lazy::new(|| {
     .expect("failed to define a metric")
 });
 
-pub(crate) static BASEBACKUP_CACHE_ENTRIES: Lazy<IntGauge> = Lazy::new(|| {
-    register_int_gauge!(
+pub(crate) static BASEBACKUP_CACHE_ENTRIES: Lazy<UIntGauge> = Lazy::new(|| {
+    register_uint_gauge!(
         "pageserver_basebackup_cache_entries_total",
         "Number of entries in the basebackup cache"
     )
     .expect("failed to define a metric")
 });
 
-// FIXME: Support basebackup cache size metrics.
-#[allow(dead_code)]
-pub(crate) static BASEBACKUP_CACHE_SIZE: Lazy<IntGauge> = Lazy::new(|| {
-    register_int_gauge!(
+pub(crate) static BASEBACKUP_CACHE_SIZE: Lazy<UIntGauge> = Lazy::new(|| {
+    register_uint_gauge!(
         "pageserver_basebackup_cache_size_bytes",
         "Total size of all basebackup cache entries on disk in bytes"
     )
diff --git a/test_runner/regress/test_basebackup.py b/test_runner/regress/test_basebackup.py
index 2d42be4051..d1b10ec85d 100644
--- a/test_runner/regress/test_basebackup.py
+++ b/test_runner/regress/test_basebackup.py
@@ -69,6 +69,11 @@ def test_basebackup_cache(neon_env_builder: NeonEnvBuilder):
             ).value
             == i + 1
         )
+        # There should be only one basebackup file in the cache.
+        assert metrics.query_one("pageserver_basebackup_cache_entries_total").value == 1
+        # The size of one basebackup for a new DB is ~20 KB.
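+        # The bounds are deliberately loose so the check stays robust across Postgres versions.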
+        size_bytes = metrics.query_one("pageserver_basebackup_cache_size_bytes").value
+        assert 10 * 1024 <= size_bytes <= 100 * 1024
 
     wait_until(check_metrics)