From 024372a3db071c945cbdd7f4cc1b759e56386534 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 14 Feb 2024 20:17:12 +0100 Subject: [PATCH] Revert "refactor(VirtualFile::crashsafe_overwrite): avoid Handle::block_on in callers" (#6765) Reverts neondatabase/neon#6731 On high tenant count Pageservers in staging, memory and CPU usage shoots to 100% with this change. (NB: staging currently has tokio-epoll-uring enabled) Will analyze tomorrow. https://neondb.slack.com/archives/C03H1K0PGKH/p1707933875639379?thread_ts=1707929541.125329&cid=C03H1K0PGKH --- libs/utils/src/crashsafe.rs | 44 +----------- pageserver/src/deletion_queue.rs | 5 +- pageserver/src/tenant.rs | 33 ++++++--- pageserver/src/tenant/metadata.rs | 2 +- pageserver/src/tenant/secondary/downloader.rs | 11 ++- pageserver/src/virtual_file.rs | 72 +++++++++++-------- 6 files changed, 78 insertions(+), 89 deletions(-) diff --git a/libs/utils/src/crashsafe.rs b/libs/utils/src/crashsafe.rs index 756b19138c..1c72e9cae9 100644 --- a/libs/utils/src/crashsafe.rs +++ b/libs/utils/src/crashsafe.rs @@ -1,7 +1,7 @@ use std::{ borrow::Cow, fs::{self, File}, - io::{self, Write}, + io, }; use camino::{Utf8Path, Utf8PathBuf}; @@ -161,48 +161,6 @@ pub async fn durable_rename( Ok(()) } -/// Writes a file to the specified `final_path` in a crash safe fasion, using [`std::fs`]. -/// -/// The file is first written to the specified `tmp_path`, and in a second -/// step, the `tmp_path` is renamed to the `final_path`. Intermediary fsync -/// and atomic rename guarantee that, if we crash at any point, there will never -/// be a partially written file at `final_path` (but maybe at `tmp_path`). -/// -/// Callers are responsible for serializing calls of this function for a given `final_path`. -/// If they don't, there may be an error due to conflicting `tmp_path`, or there will -/// be no error and the content of `final_path` will be the "winner" caller's `content`. -/// I.e., the atomticity guarantees still hold. -pub fn overwrite( - final_path: &Utf8Path, - tmp_path: &Utf8Path, - content: &[u8], -) -> std::io::Result<()> { - let Some(final_path_parent) = final_path.parent() else { - return Err(std::io::Error::from_raw_os_error( - nix::errno::Errno::EINVAL as i32, - )); - }; - std::fs::remove_file(tmp_path).or_else(crate::fs_ext::ignore_not_found)?; - let mut file = std::fs::OpenOptions::new() - .write(true) - // Use `create_new` so that, if we race with ourselves or something else, - // we bail out instead of causing damage. - .create_new(true) - .open(tmp_path)?; - file.write_all(content)?; - file.sync_all()?; - drop(file); // don't keep the fd open for longer than we have to - - std::fs::rename(tmp_path, final_path)?; - - let final_parent_dirfd = std::fs::OpenOptions::new() - .read(true) - .open(final_path_parent)?; - - final_parent_dirfd.sync_all()?; - Ok(()) -} - #[cfg(test)] mod tests { diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index f8f2866a3b..81938b14b3 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -234,7 +234,7 @@ impl DeletionHeader { let header_bytes = serde_json::to_vec(self).context("serialize deletion header")?; let header_path = conf.deletion_header_path(); let temp_path = path_with_suffix_extension(&header_path, TEMP_SUFFIX); - VirtualFile::crashsafe_overwrite(header_path, temp_path, header_bytes) + VirtualFile::crashsafe_overwrite(&header_path, &temp_path, header_bytes) .await .maybe_fatal_err("save deletion header")?; @@ -325,8 +325,7 @@ impl DeletionList { let temp_path = path_with_suffix_extension(&path, TEMP_SUFFIX); let bytes = serde_json::to_vec(self).expect("Failed to serialize deletion list"); - - VirtualFile::crashsafe_overwrite(path, temp_path, bytes) + VirtualFile::crashsafe_overwrite(&path, &temp_path, bytes) .await .maybe_fatal_err("save deletion list") .map_err(Into::into) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index dc9b8247a5..88f4ae7086 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -28,6 +28,7 @@ use remote_storage::GenericRemoteStorage; use std::fmt; use storage_broker::BrokerClientChannel; use tokio::io::BufReader; +use tokio::runtime::Handle; use tokio::sync::watch; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; @@ -2877,10 +2878,17 @@ impl Tenant { let tenant_shard_id = *tenant_shard_id; let config_path = config_path.to_owned(); - let conf_content = conf_content.into_bytes(); - VirtualFile::crashsafe_overwrite(config_path.clone(), temp_path, conf_content) - .await - .with_context(|| format!("write tenant {tenant_shard_id} config to {config_path}"))?; + tokio::task::spawn_blocking(move || { + Handle::current().block_on(async move { + let conf_content = conf_content.into_bytes(); + VirtualFile::crashsafe_overwrite(&config_path, &temp_path, conf_content) + .await + .with_context(|| { + format!("write tenant {tenant_shard_id} config to {config_path}") + }) + }) + }) + .await??; Ok(()) } @@ -2907,12 +2915,17 @@ impl Tenant { let tenant_shard_id = *tenant_shard_id; let target_config_path = target_config_path.to_owned(); - let conf_content = conf_content.into_bytes(); - VirtualFile::crashsafe_overwrite(target_config_path.clone(), temp_path, conf_content) - .await - .with_context(|| { - format!("write tenant {tenant_shard_id} config to {target_config_path}") - })?; + tokio::task::spawn_blocking(move || { + Handle::current().block_on(async move { + let conf_content = conf_content.into_bytes(); + VirtualFile::crashsafe_overwrite(&target_config_path, &temp_path, conf_content) + .await + .with_context(|| { + format!("write tenant {tenant_shard_id} config to {target_config_path}") + }) + }) + }) + .await??; Ok(()) } diff --git a/pageserver/src/tenant/metadata.rs b/pageserver/src/tenant/metadata.rs index 233acfd431..dcbe781f90 100644 --- a/pageserver/src/tenant/metadata.rs +++ b/pageserver/src/tenant/metadata.rs @@ -279,7 +279,7 @@ pub async fn save_metadata( let path = conf.metadata_path(tenant_shard_id, timeline_id); let temp_path = path_with_suffix_extension(&path, TEMP_FILE_SUFFIX); let metadata_bytes = data.to_bytes().context("serialize metadata")?; - VirtualFile::crashsafe_overwrite(path, temp_path, metadata_bytes) + VirtualFile::crashsafe_overwrite(&path, &temp_path, metadata_bytes) .await .context("write metadata")?; Ok(()) diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs index c8288acc20..c23416a7f0 100644 --- a/pageserver/src/tenant/secondary/downloader.rs +++ b/pageserver/src/tenant/secondary/downloader.rs @@ -484,9 +484,14 @@ impl<'a> TenantDownloader<'a> { let temp_path = path_with_suffix_extension(&heatmap_path, TEMP_FILE_SUFFIX); let context_msg = format!("write tenant {tenant_shard_id} heatmap to {heatmap_path}"); let heatmap_path_bg = heatmap_path.clone(); - VirtualFile::crashsafe_overwrite(heatmap_path_bg, temp_path, heatmap_bytes) - .await - .maybe_fatal_err(&context_msg)?; + tokio::task::spawn_blocking(move || { + tokio::runtime::Handle::current().block_on(async move { + VirtualFile::crashsafe_overwrite(&heatmap_path_bg, &temp_path, heatmap_bytes).await + }) + }) + .await + .expect("Blocking task is never aborted") + .maybe_fatal_err(&context_msg)?; tracing::debug!("Wrote local heatmap to {}", heatmap_path); diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 858fc0ef64..45c3e19cfc 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -19,13 +19,14 @@ use once_cell::sync::OnceCell; use pageserver_api::shard::TenantShardId; use std::fs::{self, File}; use std::io::{Error, ErrorKind, Seek, SeekFrom}; -use tokio_epoll_uring::{BoundedBuf, IoBuf, IoBufMut, Slice}; +use tokio_epoll_uring::{BoundedBuf, IoBufMut, Slice}; use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}; use std::os::unix::fs::FileExt; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use tokio::time::Instant; +use utils::fs_ext; pub use pageserver_api::models::virtual_file as api; pub(crate) mod io_engine; @@ -403,34 +404,47 @@ impl VirtualFile { Ok(vfile) } - /// Async version of [`::utils::crashsafe::overwrite`]. + /// Writes a file to the specified `final_path` in a crash safe fasion /// - /// # NB: - /// - /// Doesn't actually use the [`VirtualFile`] file descriptor cache, but, - /// it did at an earlier time. - /// And it will use this module's [`io_engine`] in the near future, so, leaving it here. - pub async fn crashsafe_overwrite + Send, Buf: IoBuf + Send>( - final_path: Utf8PathBuf, - tmp_path: Utf8PathBuf, + /// The file is first written to the specified tmp_path, and in a second + /// step, the tmp path is renamed to the final path. As renames are + /// atomic, a crash during the write operation will never leave behind a + /// partially written file. + pub async fn crashsafe_overwrite( + final_path: &Utf8Path, + tmp_path: &Utf8Path, content: B, ) -> std::io::Result<()> { - // TODO: use tokio_epoll_uring if configured as `io_engine`. - // See https://github.com/neondatabase/neon/issues/6663 - - tokio::task::spawn_blocking(move || { - let slice_storage; - let content_len = content.bytes_init(); - let content = if content.bytes_init() > 0 { - slice_storage = Some(content.slice(0..content_len)); - slice_storage.as_deref().expect("just set it to Some()") - } else { - &[] - }; - utils::crashsafe::overwrite(&final_path, &tmp_path, content) - }) - .await - .expect("blocking task is never aborted") + let Some(final_path_parent) = final_path.parent() else { + return Err(std::io::Error::from_raw_os_error( + nix::errno::Errno::EINVAL as i32, + )); + }; + std::fs::remove_file(tmp_path).or_else(fs_ext::ignore_not_found)?; + let mut file = Self::open_with_options( + tmp_path, + OpenOptions::new() + .write(true) + // Use `create_new` so that, if we race with ourselves or something else, + // we bail out instead of causing damage. + .create_new(true), + ) + .await?; + let (_content, res) = file.write_all(content).await; + res?; + file.sync_all().await?; + drop(file); // before the rename, that's important! + // renames are atomic + std::fs::rename(tmp_path, final_path)?; + // Only open final path parent dirfd now, so that this operation only + // ever holds one VirtualFile fd at a time. That's important because + // the current `find_victim_slot` impl might pick the same slot for both + // VirtualFile., and it eventually does a blocking write lock instead of + // try_lock. + let final_parent_dirfd = + Self::open_with_options(final_path_parent, OpenOptions::new().read(true)).await?; + final_parent_dirfd.sync_all().await?; + Ok(()) } /// Call File::sync_all() on the underlying File. @@ -1323,7 +1337,7 @@ mod tests { let path = testdir.join("myfile"); let tmp_path = testdir.join("myfile.tmp"); - VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec()) + VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo".to_vec()) .await .unwrap(); let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap()); @@ -1332,7 +1346,7 @@ mod tests { assert!(!tmp_path.exists()); drop(file); - VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"bar".to_vec()) + VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"bar".to_vec()) .await .unwrap(); let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap()); @@ -1354,7 +1368,7 @@ mod tests { std::fs::write(&tmp_path, "some preexisting junk that should be removed").unwrap(); assert!(tmp_path.exists()); - VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec()) + VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo".to_vec()) .await .unwrap();