From bc684e9d3bcc9285a8ad6d47651fb90bcb47886c Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Fri, 26 Jan 2024 16:51:41 +0300 Subject: [PATCH] Make WAL segment init atomic. Since fdatasync is used for flushing WAL, changing file size is unsafe. Make segment creation atomic by using tmp file + rename to avoid using partially initialized segments. fixes https://github.com/neondatabase/neon/issues/6402 --- libs/utils/src/crashsafe.rs | 49 +++++++++++++++++++ safekeeper/src/control_file.rs | 33 ++----------- safekeeper/src/wal_storage.rs | 39 ++++++++++----- .../regress/test_wal_acceptor_async.py | 36 ++++++++++++++ 4 files changed, 116 insertions(+), 41 deletions(-) diff --git a/libs/utils/src/crashsafe.rs b/libs/utils/src/crashsafe.rs index b089af4a02..1c72e9cae9 100644 --- a/libs/utils/src/crashsafe.rs +++ b/libs/utils/src/crashsafe.rs @@ -112,6 +112,55 @@ pub async fn fsync_async(path: impl AsRef<Utf8Path>) -> Result<(), std::io::Error> { tokio::fs::File::open(path.as_ref()).await?.sync_all().await } +pub async fn fsync_async_opt( + path: impl AsRef<Utf8Path>, + do_fsync: bool, +) -> Result<(), std::io::Error> { + if do_fsync { + fsync_async(path.as_ref()).await?; + } + Ok(()) +} + +/// Like postgres' durable_rename, renames file issuing fsyncs to make it +/// durable. After return, file and rename are guaranteed to be persisted. +/// +/// Unlike postgres, it only does fsyncs to 1) file to be renamed to make +/// contents durable; 2) its directory entry to make rename durable 3) again to +/// already renamed file, which is not required by standards but postgres does +/// it, let's stick to that. Postgres additionally fsyncs newpath *before* +/// rename if it exists to ensure that at least one of the files survives, but +/// current callers don't need that. +/// +/// virtual_file.rs has similar code, but it doesn't use vfs. 
+/// +/// Useful links: +/// +pub async fn durable_rename( + old_path: impl AsRef<Utf8Path>, + new_path: impl AsRef<Utf8Path>, + do_fsync: bool, +) -> io::Result<()> { + // first fsync the file + fsync_async_opt(old_path.as_ref(), do_fsync).await?; + + // Time to do the real deal. + tokio::fs::rename(old_path.as_ref(), new_path.as_ref()).await?; + + // Postgres'ish fsync of renamed file. + fsync_async_opt(new_path.as_ref(), do_fsync).await?; + + // Now fsync the parent + let parent = match new_path.as_ref().parent() { + Some(p) => p, + None => Utf8Path::new("./"), // assume current dir if there is no parent + }; + fsync_async_opt(parent, do_fsync).await?; + + Ok(()) +} + #[cfg(test)] mod tests { diff --git a/safekeeper/src/control_file.rs b/safekeeper/src/control_file.rs index f1daddd7c3..c39c1dbf28 100644 --- a/safekeeper/src/control_file.rs +++ b/safekeeper/src/control_file.rs @@ -3,8 +3,9 @@ use anyhow::{bail, ensure, Context, Result}; use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; use camino::Utf8PathBuf; -use tokio::fs::{self, File}; +use tokio::fs::File; use tokio::io::AsyncWriteExt; +use utils::crashsafe::durable_rename; use std::io::Read; use std::ops::Deref; @@ -203,35 +204,8 @@ impl Storage for FileStorage { ) })?; - // fsync the file - if !self.conf.no_sync { - control_partial.sync_all().await.with_context(|| { - format!( - "failed to sync partial control file at {}", - control_partial_path - ) - })?; - } - let control_path = self.timeline_dir.join(CONTROL_FILE_NAME); - - // rename should be atomic - fs::rename(&control_partial_path, &control_path).await?; - // this sync is not required by any standard but postgres does this (see durable_rename) - if !self.conf.no_sync { - let new_f = File::open(&control_path).await?; - new_f - .sync_all() - .await - .with_context(|| format!("failed to sync control file at: {}", &control_path))?; - - // fsync the directory (linux specific) - let tli_dir = File::open(&self.timeline_dir).await?; - tli_dir - .sync_all() - 
.await - .context("failed to sync control file directory")?; - } + durable_rename(&control_partial_path, &control_path, !self.conf.no_sync).await?; // update internal state self.state = s.clone(); @@ -249,6 +223,7 @@ mod test { use super::*; use crate::SafeKeeperConf; use anyhow::Result; + use tokio::fs; use utils::{id::TenantTimelineId, lsn::Lsn}; fn stub_conf() -> SafeKeeperConf { diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs index ed6190042a..8bbd95e9e8 100644 --- a/safekeeper/src/wal_storage.rs +++ b/safekeeper/src/wal_storage.rs @@ -21,6 +21,7 @@ use tokio::fs::{self, remove_file, File, OpenOptions}; use tokio::io::{AsyncRead, AsyncWriteExt}; use tokio::io::{AsyncReadExt, AsyncSeekExt}; use tracing::*; +use utils::crashsafe::durable_rename; use crate::metrics::{time_io_closure, WalStorageMetrics, REMOVED_WAL_SEGMENTS}; use crate::state::TimelinePersistentState; @@ -196,15 +197,6 @@ impl PhysicalStorage { Ok(()) } - /// Call fsync if config requires so. - async fn fsync_file(&mut self, file: &File) -> Result<()> { - if !self.conf.no_sync { - self.metrics - .observe_flush_seconds(time_io_closure(file.sync_all()).await?); - } - Ok(()) - } - /// Open or create WAL segment file. Caller must call seek to the wanted position. /// Returns `file` and `is_partial`. async fn open_or_create(&mut self, segno: XLogSegNo) -> Result<(File, bool)> { @@ -223,15 +215,33 @@ impl PhysicalStorage { Ok((file, true)) } else { // Create and fill new partial file + // + // We're using fdatasync during WAL writing, so file size must not + // change; to this end it is filled with zeros here. To avoid using + // half initialized segment, first bake it under tmp filename and + // then rename. 
+ let tmp_path = self.timeline_dir.join("waltmp"); let mut file = OpenOptions::new() .create(true) .write(true) - .open(&wal_file_partial_path) + .open(&tmp_path) .await - .with_context(|| format!("Failed to open log file {:?}", &wal_file_path))?; + .with_context(|| format!("Failed to open tmp wal file {:?}", &tmp_path))?; write_zeroes(&mut file, self.wal_seg_size).await?; - self.fsync_file(&file).await?; + + // Note: this doesn't get into observe_flush_seconds metric. But + // segment init should be separate metric, if any. + if let Err(e) = + durable_rename(&tmp_path, &wal_file_partial_path, !self.conf.no_sync).await + { + // Probably rename succeeded, but fsync of it failed. Remove + // the file then to avoid using it. + remove_file(wal_file_partial_path) + .await + .or_else(utils::fs_ext::ignore_not_found)?; + return Err(e.into()); + } Ok((file, true)) } } @@ -718,6 +728,11 @@ const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ]; /// Helper for filling file with zeroes. async fn write_zeroes(file: &mut File, mut count: usize) -> Result<()> { + fail::fail_point!("sk-write-zeroes", |_| { + info!("write_zeroes hit failpoint"); + Err(anyhow::anyhow!("failpoint: sk-write-zeroes")) + }); + while count >= XLOG_BLCKSZ { file.write_all(ZERO_BLOCK).await?; count -= XLOG_BLCKSZ; diff --git a/test_runner/regress/test_wal_acceptor_async.py b/test_runner/regress/test_wal_acceptor_async.py index 77d67cd63a..720633189e 100644 --- a/test_runner/regress/test_wal_acceptor_async.py +++ b/test_runner/regress/test_wal_acceptor_async.py @@ -515,6 +515,42 @@ def test_recovery_uncommitted(neon_env_builder: NeonEnvBuilder): asyncio.run(run_recovery_uncommitted(env)) +async def run_segment_init_failure(env: NeonEnv): + env.neon_cli.create_branch("test_segment_init_failure") + ep = env.endpoints.create_start("test_segment_init_failure") + ep.safe_psql("create table t(key int, value text)") + ep.safe_psql("insert into t select generate_series(1, 100), 'payload'") + + sk = env.safekeepers[0] + 
sk_http = sk.http_client() + sk_http.configure_failpoints([("sk-write-zeroes", "return")]) + conn = await ep.connect_async() + ep.safe_psql("select pg_switch_wal()") # jump to the segment boundary + # next insertion should hang until failpoint is disabled. + asyncio.create_task(conn.execute("insert into t select generate_series(1,1), 'payload'")) + sleep_sec = 2 + await asyncio.sleep(sleep_sec) + # also restart ep at segment boundary to make test more interesting + ep.stop() + # it must still be not finished + # assert not bg_query.done() + # Without segment rename during init (#6402) previous statement created + # partially initialized 16MB segment, so sk restart also triggers #6401. + sk.stop().start() + ep = env.endpoints.create_start("test_segment_init_failure") + ep.safe_psql("insert into t select generate_series(1,1), 'payload'") # should be ok now + + +# Test (injected) failure during WAL segment init. +# https://github.com/neondatabase/neon/issues/6401 +# https://github.com/neondatabase/neon/issues/6402 +def test_segment_init_failure(neon_env_builder: NeonEnvBuilder): + neon_env_builder.num_safekeepers = 1 + env = neon_env_builder.init_start() + + asyncio.run(run_segment_init_failure(env)) + + @dataclass class RaceConditionTest: iteration: int