From a9ec4eb4fc7777a529ff8c5ede814dd657390e58 Mon Sep 17 00:00:00 2001
From: Conrad Ludgate
Date: Wed, 14 Feb 2024 10:26:32 +0000
Subject: [PATCH 1/5] hold cancel session (#6750)

## Problem

In a recent refactor, we accidentally dropped the cancel session early.

## Summary of changes

Hold the cancel session during proxy passthrough.
---
 proxy/src/proxy.rs             | 1 +
 proxy/src/proxy/passthrough.rs | 2 ++
 2 files changed, 3 insertions(+)

diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs
index ce77098a5f..8a9445303a 100644
--- a/proxy/src/proxy.rs
+++ b/proxy/src/proxy.rs
@@ -331,6 +331,7 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
         compute: node,
         req: _request_gauge,
         conn: _client_gauge,
+        cancel: session,
     }))
 }
 
diff --git a/proxy/src/proxy/passthrough.rs b/proxy/src/proxy/passthrough.rs
index c98f68d8d1..73c170fc0b 100644
--- a/proxy/src/proxy/passthrough.rs
+++ b/proxy/src/proxy/passthrough.rs
@@ -1,4 +1,5 @@
 use crate::{
+    cancellation,
     compute::PostgresConnection,
     console::messages::MetricsAuxInfo,
     metrics::NUM_BYTES_PROXIED_COUNTER,
@@ -57,6 +58,7 @@ pub struct ProxyPassthrough<S> {
 
     pub req: IntCounterPairGuard,
     pub conn: IntCounterPairGuard,
+    pub cancel: cancellation::Session,
 }
 
 impl<S: AsyncRead + AsyncWrite + Unpin> ProxyPassthrough<S> {

From f39b0fce9b24a049208e74cc7d2a6b006b487839 Mon Sep 17 00:00:00 2001
From: John Spray
Date: Wed, 14 Feb 2024 10:57:01 +0000
Subject: [PATCH 2/5] Revert #6666 "tests: try to make restored-datadir
 comparison tests not flaky" (#6751)

The #6666 change appears to have made the test fail more often. PR
https://github.com/neondatabase/neon/pull/6712 should reinstate this
change, along with a change that makes the overall flow more reliable.

This reverts commit 568f91420a9c677e77aeb736cb3f995a85f0b106.
---
 test_runner/fixtures/neon_fixtures.py | 21 +++++++++------------
 1 file changed, 9 insertions(+), 12 deletions(-)

diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 26f2b999a6..04af73c327 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -3967,27 +3967,24 @@ def list_files_to_compare(pgdata_dir: Path) -> List[str]:
 
 # pg is the existing and running compute node that we want to compare with a basebackup
 def check_restored_datadir_content(test_output_dir: Path, env: NeonEnv, endpoint: Endpoint):
-    pg_bin = PgBin(test_output_dir, env.pg_distrib_dir, env.pg_version)
-
-    # Get the timeline ID.
We need it for the 'basebackup' command
     timeline_id = TimelineId(endpoint.safe_psql("SHOW neon.timeline_id")[0][0])
 
+    # many tests already checkpoint, but do it just in case
+    with closing(endpoint.connect()) as conn:
+        with conn.cursor() as cur:
+            cur.execute("CHECKPOINT")
+
+    # wait for pageserver to catch up
+    wait_for_last_flush_lsn(env, endpoint, endpoint.tenant_id, timeline_id)
 
     # stop postgres to ensure that files won't change
     endpoint.stop()
 
-    # Read the shutdown checkpoint's LSN
-    pg_controldata_path = os.path.join(pg_bin.pg_bin_path, "pg_controldata")
-    cmd = f"{pg_controldata_path} -D {endpoint.pgdata_dir}"
-    result = subprocess.run(cmd, capture_output=True, text=True, shell=True)
-    checkpoint_lsn = re.findall(
-        "Latest checkpoint location:\\s+([0-9A-F]+/[0-9A-F]+)", result.stdout
-    )[0]
-    log.debug(f"last checkpoint at {checkpoint_lsn}")
-
     # Take a basebackup from pageserver
     restored_dir_path = env.repo_dir / f"{endpoint.endpoint_id}_restored_datadir"
     restored_dir_path.mkdir(exist_ok=True)
 
+    pg_bin = PgBin(test_output_dir, env.pg_distrib_dir, env.pg_version)
     psql_path = os.path.join(pg_bin.pg_bin_path, "psql")
 
     pageserver_id = env.attachment_service.locate(endpoint.tenant_id)[0]["node_id"]
@@ -3995,7 +3992,7 @@ def check_restored_datadir_content(test_output_dir: Path, env: NeonEnv, endpoint
         {psql_path}                                    \
             --no-psqlrc                                \
             postgres://localhost:{env.get_pageserver(pageserver_id).service_port.pg}  \
-            -c 'basebackup {endpoint.tenant_id} {timeline_id} {checkpoint_lsn}'  \
+            -c 'basebackup {endpoint.tenant_id} {timeline_id}'  \
         | tar -x -C {restored_dir_path}
     """
 

From df5d588f63fd329c701c37e61f77d9524ebcb19b Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Wed, 14 Feb 2024 15:22:41 +0100
Subject: [PATCH 3/5] refactor(VirtualFile::crashsafe_overwrite): avoid
 Handle::block_on in callers (#6731)

Some callers of `VirtualFile::crashsafe_overwrite` call it on the
executor thread, thereby potentially stalling it. Others are more
diligent and wrap it in `spawn_blocking(..., Handle::block_on, ...)`
to avoid stalling the executor thread.

However, because `crashsafe_overwrite` uses VirtualFile::open_with_options
internally, we spawn a new thread-local `tokio-epoll-uring::System` in the
blocking pool thread that's used for the `spawn_blocking` call.

This PR refactors the situation such that we do the `spawn_blocking`
inside `VirtualFile::crashsafe_overwrite`. This unifies the situation
for the better:

1. Callers who didn't wrap in `spawn_blocking(..., Handle::block_on, ...)`
   before no longer stall the executor.
2. Callers who did it before can now avoid the `block_on`, resolving the
   problem with the short-lived `tokio-epoll-uring::System`s in the
   blocking pool threads.

A future PR will build on top of this and divert to tokio-epoll-uring
if it's configured as the IO engine.

Changes
-------

- Convert the implementation to std::fs and move it into `crashsafe.rs`
  - Yes, I know, Safekeepers (cc @arssher ) added `durable_rename` and
    `fsync_async_opt` recently. However, `crashsafe_overwrite` is
    different in the sense that it's higher level, i.e., it's more like
    `std::fs::write`, whereas the Safekeeper team's code is more
    building-block style.
  - The consequence is that we don't use the VirtualFile file descriptor
    cache anymore.
    - I don't think it's a big deal because we have plenty of slack
      w.r.t. the production file descriptor rlimit (see
      [this dashboard](https://neonprod.grafana.net/d/e4a40325-9acf-4aa0-8fd9-f6322b3f30bd/pageserver-open-file-descriptors?orgId=1))
- Use `tokio::task::spawn_blocking` in `VirtualFile::crashsafe_overwrite`
  to call the new `crashsafe::overwrite` API.
- Inspect all callers to remove any double-`spawn_blocking`
  - spawn_blocking requires the captured data to be 'static + Send. So,
    refactor the callers. We'll need this for future tokio-epoll-uring
    support anyway, because tokio-epoll-uring requires owned buffers.

Related Issues
--------------

- overall epic to enable write path to tokio-epoll-uring: #6663
- this is also kind of relevant to the tokio-epoll-uring System creation
  failures that we encountered in staging, investigation being tracked
  in #6667
- why is it relevant? Because this PR removes two uses of
  `spawn_blocking+Handle::block_on`
---
 libs/utils/src/crashsafe.rs                   | 44 +++++++++++-
 pageserver/src/deletion_queue.rs              |  5 +-
 pageserver/src/tenant.rs                      | 33 +++------
 pageserver/src/tenant/metadata.rs             |  2 +-
 pageserver/src/tenant/secondary/downloader.rs | 11 +--
 pageserver/src/virtual_file.rs                | 72 ++++++++-----------
 6 files changed, 89 insertions(+), 78 deletions(-)

diff --git a/libs/utils/src/crashsafe.rs b/libs/utils/src/crashsafe.rs
index 1c72e9cae9..756b19138c 100644
--- a/libs/utils/src/crashsafe.rs
+++ b/libs/utils/src/crashsafe.rs
@@ -1,7 +1,7 @@
 use std::{
     borrow::Cow,
     fs::{self, File},
-    io,
+    io::{self, Write},
 };
 
 use camino::{Utf8Path, Utf8PathBuf};
@@ -161,6 +161,48 @@ pub async fn durable_rename(
     Ok(())
 }
 
+/// Writes a file to the specified `final_path` in a crash safe fashion, using [`std::fs`].
+///
+/// The file is first written to the specified `tmp_path`, and in a second
+/// step, the `tmp_path` is renamed to the `final_path`. Intermediary fsync
+/// and atomic rename guarantee that, if we crash at any point, there will never
+/// be a partially written file at `final_path` (but maybe at `tmp_path`).
+///
+/// Callers are responsible for serializing calls of this function for a given `final_path`.
+/// If they don't, there may be an error due to conflicting `tmp_path`, or there will
+/// be no error and the content of `final_path` will be the "winner" caller's `content`.
+/// I.e., the atomicity guarantees still hold.
+pub fn overwrite(
+    final_path: &Utf8Path,
+    tmp_path: &Utf8Path,
+    content: &[u8],
+) -> std::io::Result<()> {
+    let Some(final_path_parent) = final_path.parent() else {
+        return Err(std::io::Error::from_raw_os_error(
+            nix::errno::Errno::EINVAL as i32,
+        ));
+    };
+    std::fs::remove_file(tmp_path).or_else(crate::fs_ext::ignore_not_found)?;
+    let mut file = std::fs::OpenOptions::new()
+        .write(true)
+        // Use `create_new` so that, if we race with ourselves or something else,
+        // we bail out instead of causing damage.
+ .create_new(true) + .open(tmp_path)?; + file.write_all(content)?; + file.sync_all()?; + drop(file); // don't keep the fd open for longer than we have to + + std::fs::rename(tmp_path, final_path)?; + + let final_parent_dirfd = std::fs::OpenOptions::new() + .read(true) + .open(final_path_parent)?; + + final_parent_dirfd.sync_all()?; + Ok(()) +} + #[cfg(test)] mod tests { diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index 9046fe881b..e0c40ea1b0 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -234,7 +234,7 @@ impl DeletionHeader { let header_bytes = serde_json::to_vec(self).context("serialize deletion header")?; let header_path = conf.deletion_header_path(); let temp_path = path_with_suffix_extension(&header_path, TEMP_SUFFIX); - VirtualFile::crashsafe_overwrite(&header_path, &temp_path, header_bytes) + VirtualFile::crashsafe_overwrite(header_path, temp_path, header_bytes) .await .maybe_fatal_err("save deletion header")?; @@ -325,7 +325,8 @@ impl DeletionList { let temp_path = path_with_suffix_extension(&path, TEMP_SUFFIX); let bytes = serde_json::to_vec(self).expect("Failed to serialize deletion list"); - VirtualFile::crashsafe_overwrite(&path, &temp_path, bytes) + + VirtualFile::crashsafe_overwrite(path, temp_path, bytes) .await .maybe_fatal_err("save deletion list") .map_err(Into::into) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 9f1f188bf2..1f3bc13472 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -28,7 +28,6 @@ use remote_storage::GenericRemoteStorage; use std::fmt; use storage_broker::BrokerClientChannel; use tokio::io::BufReader; -use tokio::runtime::Handle; use tokio::sync::watch; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; @@ -2878,17 +2877,10 @@ impl Tenant { let tenant_shard_id = *tenant_shard_id; let config_path = config_path.to_owned(); - tokio::task::spawn_blocking(move || { - Handle::current().block_on(async move { - let conf_content = conf_content.into_bytes(); - VirtualFile::crashsafe_overwrite(&config_path, &temp_path, conf_content) - .await - .with_context(|| { - format!("write tenant {tenant_shard_id} config to {config_path}") - }) - }) - }) - .await??; + let conf_content = conf_content.into_bytes(); + VirtualFile::crashsafe_overwrite(config_path.clone(), temp_path, conf_content) + .await + .with_context(|| format!("write tenant {tenant_shard_id} config to {config_path}"))?; Ok(()) } @@ -2915,17 +2907,12 @@ impl Tenant { let tenant_shard_id = *tenant_shard_id; let target_config_path = target_config_path.to_owned(); - tokio::task::spawn_blocking(move || { - Handle::current().block_on(async move { - let conf_content = conf_content.into_bytes(); - VirtualFile::crashsafe_overwrite(&target_config_path, &temp_path, conf_content) - .await - .with_context(|| { - format!("write tenant {tenant_shard_id} config to {target_config_path}") - }) - }) - }) - .await??; + let conf_content = conf_content.into_bytes(); + VirtualFile::crashsafe_overwrite(target_config_path.clone(), temp_path, conf_content) + .await + .with_context(|| { + format!("write tenant {tenant_shard_id} config to {target_config_path}") + })?; Ok(()) } diff --git a/pageserver/src/tenant/metadata.rs b/pageserver/src/tenant/metadata.rs index dcbe781f90..233acfd431 100644 --- a/pageserver/src/tenant/metadata.rs +++ b/pageserver/src/tenant/metadata.rs @@ -279,7 +279,7 @@ pub async fn save_metadata( let path = conf.metadata_path(tenant_shard_id, timeline_id); let temp_path = 
path_with_suffix_extension(&path, TEMP_FILE_SUFFIX);
     let metadata_bytes = data.to_bytes().context("serialize metadata")?;
-    VirtualFile::crashsafe_overwrite(&path, &temp_path, metadata_bytes)
+    VirtualFile::crashsafe_overwrite(path, temp_path, metadata_bytes)
         .await
         .context("write metadata")?;
     Ok(())
 }
diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs
index c23416a7f0..c8288acc20 100644
--- a/pageserver/src/tenant/secondary/downloader.rs
+++ b/pageserver/src/tenant/secondary/downloader.rs
@@ -484,14 +484,9 @@ impl<'a> TenantDownloader<'a> {
         let temp_path = path_with_suffix_extension(&heatmap_path, TEMP_FILE_SUFFIX);
         let context_msg = format!("write tenant {tenant_shard_id} heatmap to {heatmap_path}");
         let heatmap_path_bg = heatmap_path.clone();
-        tokio::task::spawn_blocking(move || {
-            tokio::runtime::Handle::current().block_on(async move {
-                VirtualFile::crashsafe_overwrite(&heatmap_path_bg, &temp_path, heatmap_bytes).await
-            })
-        })
-        .await
-        .expect("Blocking task is never aborted")
-        .maybe_fatal_err(&context_msg)?;
+        VirtualFile::crashsafe_overwrite(heatmap_path_bg, temp_path, heatmap_bytes)
+            .await
+            .maybe_fatal_err(&context_msg)?;
 
         tracing::debug!("Wrote local heatmap to {}", heatmap_path);
 
diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs
index 6cff748d42..2a8c22430b 100644
--- a/pageserver/src/virtual_file.rs
+++ b/pageserver/src/virtual_file.rs
@@ -19,14 +19,13 @@ use once_cell::sync::OnceCell;
 use pageserver_api::shard::TenantShardId;
 use std::fs::{self, File};
 use std::io::{Error, ErrorKind, Seek, SeekFrom};
-use tokio_epoll_uring::{BoundedBuf, IoBufMut, Slice};
+use tokio_epoll_uring::{BoundedBuf, IoBuf, IoBufMut, Slice};
 
 use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
 use std::os::unix::fs::FileExt;
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
 use tokio::time::Instant;
-use utils::fs_ext;
 
 pub use pageserver_api::models::virtual_file as api;
 pub(crate) mod io_engine;
@@ -404,47 +403,34 @@ impl VirtualFile {
         Ok(vfile)
     }
 
-    /// Writes a file to the specified `final_path` in a crash safe fasion
+    /// Async version of [`::utils::crashsafe::overwrite`].
     ///
-    /// The file is first written to the specified tmp_path, and in a second
-    /// step, the tmp path is renamed to the final path. As renames are
-    /// atomic, a crash during the write operation will never leave behind a
-    /// partially written file.
-    pub async fn crashsafe_overwrite<B: BoundedBuf>(
-        final_path: &Utf8Path,
-        tmp_path: &Utf8Path,
+    /// # NB:
+    ///
+    /// Doesn't actually use the [`VirtualFile`] file descriptor cache, but
+    /// it did at an earlier time.
+    /// And it will use this module's [`io_engine`] in the near future, so it stays here.
+    pub async fn crashsafe_overwrite<B: BoundedBuf<Buf = Buf> + Send, Buf: IoBuf + Send>(
+        final_path: Utf8PathBuf,
+        tmp_path: Utf8PathBuf,
         content: B,
     ) -> std::io::Result<()> {
-        let Some(final_path_parent) = final_path.parent() else {
-            return Err(std::io::Error::from_raw_os_error(
-                nix::errno::Errno::EINVAL as i32,
-            ));
-        };
-        std::fs::remove_file(tmp_path).or_else(fs_ext::ignore_not_found)?;
-        let mut file = Self::open_with_options(
-            tmp_path,
-            OpenOptions::new()
-                .write(true)
-                // Use `create_new` so that, if we race with ourselves or something else,
-                // we bail out instead of causing damage.
- .create_new(true), - ) - .await?; - let (_content, res) = file.write_all(content).await; - res?; - file.sync_all().await?; - drop(file); // before the rename, that's important! - // renames are atomic - std::fs::rename(tmp_path, final_path)?; - // Only open final path parent dirfd now, so that this operation only - // ever holds one VirtualFile fd at a time. That's important because - // the current `find_victim_slot` impl might pick the same slot for both - // VirtualFile., and it eventually does a blocking write lock instead of - // try_lock. - let final_parent_dirfd = - Self::open_with_options(final_path_parent, OpenOptions::new().read(true)).await?; - final_parent_dirfd.sync_all().await?; - Ok(()) + // TODO: use tokio_epoll_uring if configured as `io_engine`. + // See https://github.com/neondatabase/neon/issues/6663 + + tokio::task::spawn_blocking(move || { + let slice_storage; + let content_len = content.bytes_init(); + let content = if content.bytes_init() > 0 { + slice_storage = Some(content.slice(0..content_len)); + slice_storage.as_deref().expect("just set it to Some()") + } else { + &[] + }; + utils::crashsafe::overwrite(&final_path, &tmp_path, content) + }) + .await + .expect("blocking task is never aborted") } /// Call File::sync_all() on the underlying File. @@ -1315,7 +1301,7 @@ mod tests { let path = testdir.join("myfile"); let tmp_path = testdir.join("myfile.tmp"); - VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo".to_vec()) + VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec()) .await .unwrap(); let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap()); @@ -1324,7 +1310,7 @@ mod tests { assert!(!tmp_path.exists()); drop(file); - VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"bar".to_vec()) + VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"bar".to_vec()) .await .unwrap(); let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap()); @@ -1346,7 +1332,7 @@ mod tests { std::fs::write(&tmp_path, "some preexisting junk that should be removed").unwrap(); assert!(tmp_path.exists()); - VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo".to_vec()) + VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec()) .await .unwrap(); From 774a6e74757d1b1d1e3c75ab103bdd38587a38f1 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 14 Feb 2024 15:59:06 +0100 Subject: [PATCH 4/5] refactor(virtual_file) make write_all_at take owned buffers (#6673) context: https://github.com/neondatabase/neon/issues/6663 Building atop #6664, this PR switches `write_all_at` to take owned buffers. The main challenge here is the `EphemeralFile::mutable_tail`, for which I'm picking the ugly solution of an `Option` that is `None` while the IO is in flight. After this, we will be able to switch `write_at` to take owned buffers and call tokio-epoll-uring's `write` function with that owned buffer. That'll be done in #6378. 
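As a rough illustration of the calling convention, here is a minimal,
self-contained sketch with hypothetical names (`write_all_at_owned`,
`Tail`), not the actual `VirtualFile`/`EphemeralFile` API: the tail
buffer is moved out of its `Option` slot, the write takes it by value,
and it is handed back together with the result, which is the shape an
owned-buffer IO engine such as tokio-epoll-uring needs.

```rust
// Sketch of the owned-buffer convention, with hypothetical types.
// The write takes the buffer by value and returns it with the result,
// so an io_uring-style engine could own the buffer while the IO is in
// flight. (Synchronous here for brevity.)
fn write_all_at_owned(buf: Vec<u8>, _offset: u64) -> (Vec<u8>, std::io::Result<()>) {
    // ... submit `buf` to the IO engine here ...
    (buf, Ok(()))
}

struct Tail {
    // None <=> the buffer is currently owned by an in-flight write.
    mutable_tail: Option<Vec<u8>>,
}

impl Tail {
    fn flush(&mut self, offset: u64) -> std::io::Result<()> {
        // Move the buffer out; the write owns it for the duration of the IO.
        let buf = self.mutable_tail.take().expect("no IO in flight");
        let (buf, res) = write_all_at_owned(buf, offset);
        // Put the buffer back before inspecting the result, so the slot is
        // restored even if the write failed. If we panicked in between, the
        // slot would remain None; that is the non-retryable caveat noted in
        // the diff below.
        self.mutable_tail = Some(buf);
        res
    }
}
```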
---
 pageserver/src/tenant/ephemeral_file.rs | 51 ++++++++++++++++++-------
 pageserver/src/virtual_file.rs          | 50 +++++++++++++++++-------
 2 files changed, 74 insertions(+), 27 deletions(-)

diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs
index 6b8cd77d78..2bedbf7f61 100644
--- a/pageserver/src/tenant/ephemeral_file.rs
+++ b/pageserver/src/tenant/ephemeral_file.rs
@@ -6,6 +6,7 @@ use crate::context::RequestContext;
 use crate::page_cache::{self, PAGE_SZ};
 use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
 use crate::virtual_file::{self, VirtualFile};
+use bytes::BytesMut;
 use camino::Utf8PathBuf;
 use pageserver_api::shard::TenantShardId;
 use std::cmp::min;
@@ -26,7 +27,10 @@ pub struct EphemeralFile {
     /// An ephemeral file is append-only.
     /// We keep the last page, which can still be modified, in [`Self::mutable_tail`].
     /// The other pages, which can no longer be modified, are accessed through the page cache.
-    mutable_tail: [u8; PAGE_SZ],
+    ///
+    /// None <=> IO is ongoing.
+    /// Size is fixed to PAGE_SZ at creation time and must not be changed.
+    mutable_tail: Option<BytesMut>,
 }
 
 impl EphemeralFile {
@@ -60,7 +64,7 @@ impl EphemeralFile {
             _timeline_id: timeline_id,
             file,
             len: 0,
-            mutable_tail: [0u8; PAGE_SZ],
+            mutable_tail: Some(BytesMut::zeroed(PAGE_SZ)),
         })
     }
 
@@ -103,7 +107,13 @@ impl EphemeralFile {
             };
         } else {
             debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64);
-            Ok(BlockLease::EphemeralFileMutableTail(&self.mutable_tail))
+            Ok(BlockLease::EphemeralFileMutableTail(
+                self.mutable_tail
+                    .as_deref()
+                    .expect("we're not doing IO, it must be Some()")
+                    .try_into()
+                    .expect("we ensure that it's always PAGE_SZ"),
+            ))
         }
     }
 
@@ -135,21 +145,27 @@ impl EphemeralFile {
     ) -> Result<(), io::Error> {
         let mut src_remaining = src;
         while !src_remaining.is_empty() {
-            let dst_remaining = &mut self.ephemeral_file.mutable_tail[self.off..];
+            let dst_remaining = &mut self
+                .ephemeral_file
+                .mutable_tail
+                .as_deref_mut()
+                .expect("IO is not yet ongoing")[self.off..];
             let n = min(dst_remaining.len(), src_remaining.len());
             dst_remaining[..n].copy_from_slice(&src_remaining[..n]);
             self.off += n;
             src_remaining = &src_remaining[n..];
             if self.off == PAGE_SZ {
-                match self
+                let mutable_tail = std::mem::take(&mut self.ephemeral_file.mutable_tail)
+                    .expect("IO is not yet ongoing");
+                let (mutable_tail, res) = self
                     .ephemeral_file
                     .file
-                    .write_all_at(
-                        &self.ephemeral_file.mutable_tail,
-                        self.blknum as u64 * PAGE_SZ as u64,
-                    )
-                    .await
-                {
+                    .write_all_at(mutable_tail, self.blknum as u64 * PAGE_SZ as u64)
+                    .await;
+                // TODO: If we panic before we can put the mutable_tail back, subsequent calls will fail.
+                // I.e., the IO isn't retryable if we panic.
+                self.ephemeral_file.mutable_tail = Some(mutable_tail);
+                match res {
                     Ok(_) => {
                         // Pre-warm the page cache with what we just wrote.
                         // This isn't necessary for coherency/correctness, but it's how we've always done it.
@@ -169,7 +185,12 @@ impl EphemeralFile {
                     Ok(page_cache::ReadBufResult::NotFound(mut write_guard)) => {
                         let buf: &mut [u8] = write_guard.deref_mut();
                         debug_assert_eq!(buf.len(), PAGE_SZ);
-                        buf.copy_from_slice(&self.ephemeral_file.mutable_tail);
+                        buf.copy_from_slice(
+                            self.ephemeral_file
+                                .mutable_tail
+                                .as_deref()
+                                .expect("IO is not ongoing"),
+                        );
                         let _ = write_guard.mark_valid();
 
                         // pre-warm successful
                     }
@@ -181,7 +202,11 @@ impl EphemeralFile {
 
                 // Zero the buffer for re-use.
                // Zeroing is critical for correctness because the write_blob code below
                // and similarly read_blk expect zeroed pages.
-                self.ephemeral_file.mutable_tail.fill(0);
+                self.ephemeral_file
+                    .mutable_tail
+                    .as_deref_mut()
+                    .expect("IO is not ongoing")
+                    .fill(0);
                 // This block is done, move to next one.
                 self.blknum += 1;
                 self.off = 0;
diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs
index 2a8c22430b..858fc0ef64 100644
--- a/pageserver/src/virtual_file.rs
+++ b/pageserver/src/virtual_file.rs
@@ -568,24 +568,37 @@ impl VirtualFile {
     }
 
     // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
-    pub async fn write_all_at(&self, mut buf: &[u8], mut offset: u64) -> Result<(), Error> {
+    pub async fn write_all_at<B: BoundedBuf>(
+        &self,
+        buf: B,
+        mut offset: u64,
+    ) -> (B::Buf, Result<(), Error>) {
+        let buf_len = buf.bytes_init();
+        if buf_len == 0 {
+            return (Slice::into_inner(buf.slice_full()), Ok(()));
+        }
+        let mut buf = buf.slice(0..buf_len);
         while !buf.is_empty() {
-            match self.write_at(buf, offset).await {
+            // TODO: push `buf` further down
+            match self.write_at(&buf, offset).await {
                 Ok(0) => {
-                    return Err(Error::new(
-                        std::io::ErrorKind::WriteZero,
-                        "failed to write whole buffer",
-                    ));
+                    return (
+                        Slice::into_inner(buf),
+                        Err(Error::new(
+                            std::io::ErrorKind::WriteZero,
+                            "failed to write whole buffer",
+                        )),
+                    );
                 }
                 Ok(n) => {
-                    buf = &buf[n..];
+                    buf = buf.slice(n..);
                     offset += n as u64;
                 }
                 Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
-                Err(e) => return Err(e),
+                Err(e) => return (Slice::into_inner(buf), Err(e)),
             }
         }
-        Ok(())
+        (Slice::into_inner(buf), Ok(()))
     }
 
     /// Writes `buf.slice(0..buf.bytes_init())`.
@@ -1050,10 +1063,19 @@ mod tests {
             MaybeVirtualFile::File(file) => file.read_exact_at(&mut buf, offset).map(|()| buf),
         }
     }
-    async fn write_all_at(&self, buf: &[u8], offset: u64) -> Result<(), Error> {
+    async fn write_all_at<B: BoundedBuf>(&self, buf: B, offset: u64) -> Result<(), Error> {
         match self {
-            MaybeVirtualFile::VirtualFile(file) => file.write_all_at(buf, offset).await,
-            MaybeVirtualFile::File(file) => file.write_all_at(buf, offset),
+            MaybeVirtualFile::VirtualFile(file) => {
+                let (_buf, res) = file.write_all_at(buf, offset).await;
+                res
+            }
+            MaybeVirtualFile::File(file) => {
+                let buf_len = buf.bytes_init();
+                if buf_len == 0 {
+                    return Ok(());
+                }
+                file.write_all_at(&buf.slice(0..buf_len), offset)
+            }
         }
     }
     async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
@@ -1200,8 +1222,8 @@ mod tests {
             .to_owned(),
         )
         .await?;
-        file_b.write_all_at(b"BAR", 3).await?;
-        file_b.write_all_at(b"FOO", 0).await?;
+        file_b.write_all_at(b"BAR".to_vec(), 3).await?;
+        file_b.write_all_at(b"FOO".to_vec(), 0).await?;
 
         assert_eq!(file_b.read_string_at(2, 3).await?, "OBA");
 

From 840abe395413508db40d0428e30f09343c051fed Mon Sep 17 00:00:00 2001
From: John Spray
Date: Wed, 14 Feb 2024 15:01:16 +0000
Subject: [PATCH 5/5] pageserver: store aux files as deltas (#6742)

## Problem

Aux files were stored with an O(N^2) cost, since on each modification the
entire map was re-written as a page image. This addresses one axis of the
inefficiency in logical replication's use of storage
(https://github.com/neondatabase/neon/issues/6626). It will still write a
large amount of duplicative data if the same slot's state is written every
15 seconds, but the impact will be O(N) instead of O(N^2).
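Readers reconstruct the map by replaying deltas over the most recent
image. A minimal standalone sketch of that replay rule, using
hypothetical types that mirror the record shape introduced below (not
the actual pageserver code):

```rust
use std::collections::HashMap;

// A delta mirrors the new record's shape: Some(content) upserts the
// entry at `path`, None deletes it.
type AuxDelta = (String, Option<Vec<u8>>);

// Replaying deltas over the last full image reconstructs the map, so
// each update stores O(one entry) bytes instead of O(whole map) bytes.
fn apply_deltas(
    mut base: HashMap<String, Vec<u8>>,
    deltas: impl IntoIterator<Item = AuxDelta>,
) -> HashMap<String, Vec<u8>> {
    for (path, content) in deltas {
        match content {
            Some(bytes) => {
                base.insert(path, bytes);
            }
            None => {
                base.remove(&path);
            }
        }
    }
    base
}
```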
## Summary of changes

- Introduce `NeonWalRecord::AuxFile`
- In `DatadirModification`, if the AUX_FILES_KEY has already been set,
  then write a delta instead of an image
---
 pageserver/src/pgdatadir_mapping.rs  | 162 +++++++++++++++++++++++----
 pageserver/src/tenant.rs             |  41 ++++---
 pageserver/src/walrecord.rs          |   5 +
 pageserver/src/walredo.rs            |   2 +-
 pageserver/src/walredo/apply_neon.rs |  70 +++++++++++-
 5 files changed, 242 insertions(+), 38 deletions(-)

diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs
index 5f80ea9b5e..0ff03303d4 100644
--- a/pageserver/src/pgdatadir_mapping.rs
+++ b/pageserver/src/pgdatadir_mapping.rs
@@ -156,6 +156,7 @@ impl Timeline {
             pending_updates: HashMap::new(),
             pending_deletions: Vec::new(),
             pending_nblocks: 0,
+            pending_aux_files: None,
             pending_directory_entries: Vec::new(),
             lsn,
         }
@@ -870,6 +871,14 @@ pub struct DatadirModification<'a> {
     pending_updates: HashMap<Key, Vec<(Lsn, Value)>>,
     pending_deletions: Vec<(Range<Key>, Lsn)>,
     pending_nblocks: i64,
+
+    // If we already wrote any aux file changes in this modification, stash the latest dir. If set,
+    // [`Self::put_file`] may assume that it is safe to emit a delta rather than checking
+    // if AUX_FILES_KEY is already set.
+    pending_aux_files: Option<AuxFilesDirectory>,
+
+    /// For special "directory" keys that store key-value maps, track the size of the map
+    /// if it was updated in this modification.
     pending_directory_entries: Vec<(DirectoryKind, usize)>,
 }
 
@@ -1384,31 +1393,76 @@ impl<'a> DatadirModification<'a> {
         content: &[u8],
         ctx: &RequestContext,
     ) -> anyhow::Result<()> {
-        let mut dir = match self.get(AUX_FILES_KEY, ctx).await {
-            Ok(buf) => AuxFilesDirectory::des(&buf)?,
-            Err(e) => {
-                // This is expected: historical databases do not have the key.
-                debug!("Failed to get info about AUX files: {}", e);
-                AuxFilesDirectory {
-                    files: HashMap::new(),
+        let file_path = path.to_string();
+        let content = if content.is_empty() {
+            None
+        } else {
+            Some(Bytes::copy_from_slice(content))
+        };
+
+        let dir = if let Some(mut dir) = self.pending_aux_files.take() {
+            // We already updated aux files in `self`: emit a delta and update our latest value
+
+            self.put(
+                AUX_FILES_KEY,
+                Value::WalRecord(NeonWalRecord::AuxFile {
+                    file_path: file_path.clone(),
+                    content: content.clone(),
+                }),
+            );
+
+            dir.upsert(file_path, content);
+            dir
+        } else {
+            // Check if the AUX_FILES_KEY is initialized
+            match self.get(AUX_FILES_KEY, ctx).await {
+                Ok(dir_bytes) => {
+                    let mut dir = AuxFilesDirectory::des(&dir_bytes)?;
+                    // Key is already set, we may append a delta
+                    self.put(
+                        AUX_FILES_KEY,
+                        Value::WalRecord(NeonWalRecord::AuxFile {
+                            file_path: file_path.clone(),
+                            content: content.clone(),
+                        }),
+                    );
+                    dir.upsert(file_path, content);
+                    dir
+                }
+                Err(
+                    e @ (PageReconstructError::AncestorStopping(_)
+                    | PageReconstructError::Cancelled
+                    | PageReconstructError::AncestorLsnTimeout(_)),
+                ) => {
+                    // Important that we do not interpret a shutdown error as "not found" and thereby
+                    // reset the map.
+                    return Err(e.into());
+                }
+                // FIXME: PageReconstructError doesn't have an explicit variant for key-not-found, so
+                // we are assuming that all _other_ possible errors represent a missing key. If some
+                // other error occurs, we may incorrectly reset the map of aux files.
+                Err(PageReconstructError::Other(_) | PageReconstructError::WalRedo(_)) => {
+                    // Key is missing, we must insert an image as the basis for subsequent deltas.
+
+                    let mut dir = AuxFilesDirectory {
+                        files: HashMap::new(),
+                    };
+                    dir.upsert(file_path, content);
+                    self.put(
+                        AUX_FILES_KEY,
+                        Value::Image(Bytes::from(
+                            AuxFilesDirectory::ser(&dir).context("serialize")?,
+                        )),
+                    );
+                    dir
+                }
             }
         };
 
-        let path = path.to_string();
-        if content.is_empty() {
-            dir.files.remove(&path);
-        } else {
-            dir.files.insert(path, Bytes::copy_from_slice(content));
-        }
+
         self.pending_directory_entries
             .push((DirectoryKind::AuxFiles, dir.files.len()));
+        self.pending_aux_files = Some(dir);
 
-        self.put(
-            AUX_FILES_KEY,
-            Value::Image(Bytes::from(
-                AuxFilesDirectory::ser(&dir).context("serialize")?,
-            )),
-        );
         Ok(())
     }
 
@@ -1618,8 +1672,18 @@ struct RelDirectory {
 }
 
 #[derive(Debug, Serialize, Deserialize, Default)]
-struct AuxFilesDirectory {
-    files: HashMap<String, Bytes>,
+pub(crate) struct AuxFilesDirectory {
+    pub(crate) files: HashMap<String, Bytes>,
+}
+
+impl AuxFilesDirectory {
+    pub(crate) fn upsert(&mut self, key: String, value: Option<Bytes>) {
+        if let Some(value) = value {
+            self.files.insert(key, value);
+        } else {
+            self.files.remove(&key);
+        }
+    }
 }
 
 #[derive(Debug, Serialize, Deserialize)]
@@ -1655,8 +1719,60 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
 #[allow(clippy::bool_assert_comparison)]
 #[cfg(test)]
 mod tests {
-    //use super::repo_harness::*;
-    //use super::*;
+    use hex_literal::hex;
+    use utils::id::TimelineId;
+
+    use super::*;
+
+    use crate::{tenant::harness::TenantHarness, DEFAULT_PG_VERSION};
+
+    /// Test a round trip of aux file updates, from DatadirModification to reading back from the Timeline
+    #[tokio::test]
+    async fn aux_files_round_trip() -> anyhow::Result<()> {
+        let name = "aux_files_round_trip";
+        let harness = TenantHarness::create(name)?;
+
+        pub const TIMELINE_ID: TimelineId =
+            TimelineId::from_array(hex!("11223344556677881122334455667788"));
+
+        let (tenant, ctx) = harness.load().await;
+        let tline = tenant
+            .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
+            .await?;
+        let tline = tline.raw_timeline().unwrap();
+
+        // First modification: insert two keys
+        let mut modification = tline.begin_modification(Lsn(0x1000));
+        modification.put_file("foo/bar1", b"content1", &ctx).await?;
+        modification.set_lsn(Lsn(0x1008))?;
+        modification.put_file("foo/bar2", b"content2", &ctx).await?;
+        modification.commit(&ctx).await?;
+        let expect_1008 = HashMap::from([
+            ("foo/bar1".to_string(), Bytes::from_static(b"content1")),
+            ("foo/bar2".to_string(), Bytes::from_static(b"content2")),
+        ]);
+
+        let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
+        assert_eq!(readback, expect_1008);
+
+        // Second modification: update one key, remove the other
+        let mut modification = tline.begin_modification(Lsn(0x2000));
+        modification.put_file("foo/bar1", b"content3", &ctx).await?;
+        modification.set_lsn(Lsn(0x2008))?;
+        modification.put_file("foo/bar2", b"", &ctx).await?;
+        modification.commit(&ctx).await?;
+        let expect_2008 =
+            HashMap::from([("foo/bar1".to_string(), Bytes::from_static(b"content3"))]);
+
+        let readback = tline.list_aux_files(Lsn(0x2008), &ctx).await?;
+        assert_eq!(readback, expect_2008);
+
+        // Reading back in time works
+        let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
+        assert_eq!(readback, expect_1008);
+
+        Ok(())
+    }
 
     /*
     fn assert_current_logical_size(timeline: &DatadirTimeline, lsn: Lsn) {
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index 1f3bc13472..44a446d697 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -3901,6 +3901,7 @@ pub(crate) mod harness {
use utils::lsn::Lsn;
 
     use crate::deletion_queue::mock::MockDeletionQueue;
+    use crate::walredo::apply_neon;
     use crate::{
         config::PageServerConf, repository::Key, tenant::Tenant, walrecord::NeonWalRecord,
     };
@@ -4160,20 +4161,34 @@ pub(crate) mod harness {
             records: Vec<(Lsn, NeonWalRecord)>,
             _pg_version: u32,
         ) -> anyhow::Result<Bytes> {
-            let s = format!(
-                "redo for {} to get to {}, with {} and {} records",
-                key,
-                lsn,
-                if base_img.is_some() {
-                    "base image"
-                } else {
-                    "no base image"
-                },
-                records.len()
-            );
-            println!("{s}");
+            let records_neon = records.iter().all(|r| apply_neon::can_apply_in_neon(&r.1));
 
-            Ok(TEST_IMG(&s))
+            if records_neon {
+                // For Neon wal records, we can decode without spawning postgres, so do so.
+                let base_img = base_img.expect("Neon WAL redo requires base image").1;
+                let mut page = BytesMut::new();
+                page.extend_from_slice(&base_img);
+                for (_record_lsn, record) in records {
+                    apply_neon::apply_in_neon(&record, key, &mut page)?;
+                }
+                Ok(page.freeze())
+            } else {
+                // We never spawn a postgres walredo process in unit tests: just log what we might have done.
+                let s = format!(
+                    "redo for {} to get to {}, with {} and {} records",
+                    key,
+                    lsn,
+                    if base_img.is_some() {
+                        "base image"
+                    } else {
+                        "no base image"
+                    },
+                    records.len()
+                );
+                println!("{s}");
+
+                Ok(TEST_IMG(&s))
+            }
         }
     }
 }
diff --git a/pageserver/src/walrecord.rs b/pageserver/src/walrecord.rs
index ff6bc9194b..1b7777a544 100644
--- a/pageserver/src/walrecord.rs
+++ b/pageserver/src/walrecord.rs
@@ -44,6 +44,11 @@ pub enum NeonWalRecord {
         moff: MultiXactOffset,
         members: Vec<MultiXactMember>,
     },
+    /// Update the map of AUX files, either writing or dropping an entry
+    AuxFile {
+        file_path: String,
+        content: Option<Bytes>,
+    },
 }
 
 impl NeonWalRecord {
diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs
index 98a6a0bb6c..35cbefb92c 100644
--- a/pageserver/src/walredo.rs
+++ b/pageserver/src/walredo.rs
@@ -22,7 +22,7 @@
 mod process;
 
 /// Code to apply [`NeonWalRecord`]s.
-mod apply_neon;
+pub(crate) mod apply_neon;
 
 use crate::config::PageServerConf;
 use crate::metrics::{
diff --git a/pageserver/src/walredo/apply_neon.rs b/pageserver/src/walredo/apply_neon.rs
index 52899349c4..6ce90e0c47 100644
--- a/pageserver/src/walredo/apply_neon.rs
+++ b/pageserver/src/walredo/apply_neon.rs
@@ -1,7 +1,8 @@
+use crate::pgdatadir_mapping::AuxFilesDirectory;
 use crate::walrecord::NeonWalRecord;
 use anyhow::Context;
 use byteorder::{ByteOrder, LittleEndian};
-use bytes::BytesMut;
+use bytes::{BufMut, BytesMut};
 use pageserver_api::key::{key_to_rel_block, key_to_slru_block, Key};
 use pageserver_api::reltag::SlruKind;
 use postgres_ffi::pg_constants;
@@ -12,6 +13,7 @@ use postgres_ffi::v14::nonrelfile_utils::{
 };
 use postgres_ffi::BLCKSZ;
 use tracing::*;
+use utils::bin_ser::BeSer;
 
 /// Can this request be served by neon redo functions
 /// or we need to pass it to wal-redo postgres process?
@@ -230,6 +232,72 @@ pub(crate) fn apply_in_neon( LittleEndian::write_u32(&mut page[memberoff..memberoff + 4], member.xid); } } + NeonWalRecord::AuxFile { file_path, content } => { + let mut dir = AuxFilesDirectory::des(page)?; + dir.upsert(file_path.clone(), content.clone()); + + page.clear(); + let mut writer = page.writer(); + dir.ser_into(&mut writer)?; + } } Ok(()) } + +#[cfg(test)] +mod test { + use bytes::Bytes; + use pageserver_api::key::AUX_FILES_KEY; + + use super::*; + use std::collections::HashMap; + + use crate::{pgdatadir_mapping::AuxFilesDirectory, walrecord::NeonWalRecord}; + + /// Test [`apply_in_neon`]'s handling of NeonWalRecord::AuxFile + #[test] + fn apply_aux_file_deltas() -> anyhow::Result<()> { + let base_dir = AuxFilesDirectory { + files: HashMap::from([ + ("two".to_string(), Bytes::from_static(b"content0")), + ("three".to_string(), Bytes::from_static(b"contentX")), + ]), + }; + let base_image = AuxFilesDirectory::ser(&base_dir)?; + + let deltas = vec![ + // Insert + NeonWalRecord::AuxFile { + file_path: "one".to_string(), + content: Some(Bytes::from_static(b"content1")), + }, + // Update + NeonWalRecord::AuxFile { + file_path: "two".to_string(), + content: Some(Bytes::from_static(b"content99")), + }, + // Delete + NeonWalRecord::AuxFile { + file_path: "three".to_string(), + content: None, + }, + ]; + + let file_path = AUX_FILES_KEY; + let mut page = BytesMut::from_iter(base_image); + + for record in deltas { + apply_in_neon(&record, file_path, &mut page)?; + } + + let reconstructed = AuxFilesDirectory::des(&page)?; + let expect = HashMap::from([ + ("one".to_string(), Bytes::from_static(b"content1")), + ("two".to_string(), Bytes::from_static(b"content99")), + ]); + + assert_eq!(reconstructed.files, expect); + + Ok(()) + } +}