diff --git a/pageserver/src/basebackup_cache.rs b/pageserver/src/basebackup_cache.rs index 3a8ec555f7..7dde3e02fe 100644 --- a/pageserver/src/basebackup_cache.rs +++ b/pageserver/src/basebackup_cache.rs @@ -1,5 +1,6 @@ use std::{collections::HashMap, sync::Arc}; +use anyhow::Context; use async_compression::tokio::write::GzipEncoder; use camino::{Utf8Path, Utf8PathBuf}; use metrics::core::{AtomicU64, GenericCounter}; @@ -167,14 +168,17 @@ impl BasebackupCache { .join(Self::entry_filename(tenant_id, timeline_id, lsn)) } + fn tmp_dir(&self) -> Utf8PathBuf { + self.data_dir.join("tmp") + } + fn entry_tmp_path( &self, tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn, ) -> Utf8PathBuf { - self.data_dir - .join("tmp") + self.tmp_dir() .join(Self::entry_filename(tenant_id, timeline_id, lsn)) } @@ -194,15 +198,18 @@ impl BasebackupCache { Some((tenant_id, timeline_id, lsn)) } - async fn cleanup(&self) -> anyhow::Result<()> { - // Cleanup tmp directory. - let tmp_dir = self.data_dir.join("tmp"); - let mut tmp_dir = tokio::fs::read_dir(&tmp_dir).await?; - while let Some(dir_entry) = tmp_dir.next_entry().await? { - if let Err(e) = tokio::fs::remove_file(dir_entry.path()).await { - tracing::warn!("Failed to remove basebackup cache tmp file: {:#}", e); - } + // Recreate the tmp directory to clear all files in it. + async fn clean_tmp_dir(&self) -> anyhow::Result<()> { + let tmp_dir = self.tmp_dir(); + if tmp_dir.exists() { + tokio::fs::remove_dir_all(&tmp_dir).await?; } + tokio::fs::create_dir_all(&tmp_dir).await?; + Ok(()) + } + + async fn cleanup(&self) -> anyhow::Result<()> { + self.clean_tmp_dir().await?; // Remove outdated entries. let entries_old = self.entries.lock().unwrap().clone(); @@ -241,16 +248,14 @@ impl BasebackupCache { } async fn on_startup(&self) -> anyhow::Result<()> { - // Create data_dir and tmp directory if they do not exist. - tokio::fs::create_dir_all(&self.data_dir.join("tmp")) + // Create data_dir if it does not exist. + tokio::fs::create_dir_all(&self.data_dir) .await - .map_err(|e| { - anyhow::anyhow!( - "Failed to create basebackup cache data_dir {:?}: {:?}", - self.data_dir, - e - ) - })?; + .context("Failed to create basebackup cache data directory")?; + + self.clean_tmp_dir() + .await + .context("Failed to clean tmp directory")?; // Read existing entries from the data_dir and add them to in-memory state. let mut entries = HashMap::new(); @@ -451,6 +456,11 @@ impl BasebackupCache { } // Move the tmp file to the final location atomically. + // The tmp file is fsynced, so it's guaranteed that we will not have a partial file + // in the main directory. + // It's not necessary to fsync the inode after renaming, because the worst case is that + // the rename operation will be rolled back on the disk failure, the entry will disappear + // from the main directory, and the entry access will cause a cache miss. let entry_path = self.entry_path(tenant_shard_id.tenant_id, timeline_id, req_lsn); tokio::fs::rename(&entry_tmp_path, &entry_path).await?; @@ -468,16 +478,17 @@ impl BasebackupCache { } /// Prepares a basebackup in a temporary file. + /// Guarantees that the tmp file is fsynced before returning. async fn prepare_basebackup_tmp( &self, - emptry_tmp_path: &Utf8Path, + entry_tmp_path: &Utf8Path, timeline: &Arc, req_lsn: Lsn, ) -> anyhow::Result<()> { let ctx = RequestContext::new(TaskKind::BasebackupCache, DownloadBehavior::Download); let ctx = ctx.with_scope_timeline(timeline); - let file = tokio::fs::File::create(emptry_tmp_path).await?; + let file = tokio::fs::File::create(entry_tmp_path).await?; let mut writer = BufWriter::new(file); let mut encoder = GzipEncoder::with_quality(