Make WAL segment init atomic.

Since fdatasync is used for flushing WAL, changing the file size is unsafe. Make segment creation atomic by writing to a tmp file and renaming it into place, so that a partially initialized segment is never used. Fixes https://github.com/neondatabase/neon/issues/6402.
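The pattern in miniature, as a hedged sketch using plain std::fs (the helper name and tmp naming here are invented for illustration; the actual change uses the repo's async tokio helpers, shown in the diff below):

    use std::fs::{self, File, OpenOptions};
    use std::io::{self, Write};
    use std::path::Path;

    /// Create `final_path` atomically: fill and fsync a tmp file first, then
    /// rename it into place and fsync the parent directory so the rename
    /// itself is durable (the directory fsync is the Linux-specific part).
    fn create_file_atomic(final_path: &Path, contents: &[u8]) -> io::Result<()> {
        let tmp_path = final_path.with_extension("tmp"); // invented tmp naming
        let mut tmp = OpenOptions::new().create(true).write(true).open(&tmp_path)?;
        tmp.write_all(contents)?;
        tmp.sync_all()?; // contents are durable before the rename publishes them
        fs::rename(&tmp_path, final_path)?;
        if let Some(parent) = final_path.parent() {
            File::open(parent)?.sync_all()?; // make the rename itself durable
        }
        Ok(())
    }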
@@ -112,6 +112,55 @@ pub async fn fsync_async(path: impl AsRef<Utf8Path>) -> Result<(), std::io::Erro
     tokio::fs::File::open(path.as_ref()).await?.sync_all().await
 }
 
+pub async fn fsync_async_opt(
+    path: impl AsRef<Utf8Path>,
+    do_fsync: bool,
+) -> Result<(), std::io::Error> {
+    if do_fsync {
+        fsync_async(path.as_ref()).await?;
+    }
+    Ok(())
+}
+
+/// Like postgres' durable_rename, renames the file, issuing fsyncs to make it
+/// durable. After return, the file and the rename are guaranteed to be persisted.
+///
+/// Unlike postgres, it only does fsyncs to 1) the file to be renamed, to make
+/// its contents durable; 2) its directory entry, to make the rename durable;
+/// 3) the already renamed file again, which is not required by standards but
+/// postgres does it, so let's stick to that. Postgres additionally fsyncs
+/// newpath *before* the rename if it exists, to ensure that at least one of
+/// the files survives, but current callers don't need that.
+///
+/// virtual_file.rs has similar code, but it doesn't use vfs.
+///
+/// Useful links: <https://lwn.net/Articles/457667/>
+/// <https://www.postgresql.org/message-id/flat/56583BDD.9060302%402ndquadrant.com>
+/// <https://thunk.org/tytso/blog/2009/03/15/dont-fear-the-fsync/>
+pub async fn durable_rename(
+    old_path: impl AsRef<Utf8Path>,
+    new_path: impl AsRef<Utf8Path>,
+    do_fsync: bool,
+) -> io::Result<()> {
+    // first fsync the file
+    fsync_async_opt(old_path.as_ref(), do_fsync).await?;
+
+    // Time to do the real deal.
+    tokio::fs::rename(old_path.as_ref(), new_path.as_ref()).await?;
+
+    // Postgres'ish fsync of renamed file.
+    fsync_async_opt(new_path.as_ref(), do_fsync).await?;
+
+    // Now fsync the parent
+    let parent = match new_path.as_ref().parent() {
+        Some(p) => p,
+        None => Utf8Path::new("./"), // assume current dir if there is no parent
+    };
+    fsync_async_opt(parent, do_fsync).await?;
+
+    Ok(())
+}
+
 #[cfg(test)]
 mod tests {
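For orientation, a hypothetical call site of the new helper (the function, file names, and flag below are made up for illustration; the real callers appear in the safekeeper hunks that follow):

    use camino::Utf8Path;
    use utils::crashsafe::durable_rename;

    // Hypothetical caller: durably publish `contents` at `dir/name` by writing
    // a tmp file first and letting durable_rename move it into place.
    async fn publish(dir: &Utf8Path, name: &str, contents: &[u8], do_fsync: bool) -> std::io::Result<()> {
        let tmp_path = dir.join(format!("{name}.tmp")); // invented tmp naming
        tokio::fs::write(&tmp_path, contents).await?;   // contents land in the tmp file
        durable_rename(&tmp_path, &dir.join(name), do_fsync).await // fsync, rename, dir fsync
    }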

@@ -3,8 +3,9 @@
 use anyhow::{bail, ensure, Context, Result};
 use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
 use camino::Utf8PathBuf;
-use tokio::fs::{self, File};
+use tokio::fs::File;
 use tokio::io::AsyncWriteExt;
+use utils::crashsafe::durable_rename;
 
 use std::io::Read;
 use std::ops::Deref;

@@ -203,35 +204,8 @@ impl Storage for FileStorage {
             )
         })?;
 
-        // fsync the file
-        if !self.conf.no_sync {
-            control_partial.sync_all().await.with_context(|| {
-                format!(
-                    "failed to sync partial control file at {}",
-                    control_partial_path
-                )
-            })?;
-        }
-
         let control_path = self.timeline_dir.join(CONTROL_FILE_NAME);
 
-        // rename should be atomic
-        fs::rename(&control_partial_path, &control_path).await?;
-        // this sync is not required by any standard but postgres does this (see durable_rename)
-        if !self.conf.no_sync {
-            let new_f = File::open(&control_path).await?;
-            new_f
-                .sync_all()
-                .await
-                .with_context(|| format!("failed to sync control file at: {}", &control_path))?;
-
-            // fsync the directory (linux specific)
-            let tli_dir = File::open(&self.timeline_dir).await?;
-            tli_dir
-                .sync_all()
-                .await
-                .context("failed to sync control file directory")?;
-        }
-
+        durable_rename(&control_partial_path, &control_path, !self.conf.no_sync).await?;
         // update internal state
         self.state = s.clone();

@@ -249,6 +223,7 @@ mod test {
     use super::*;
     use crate::SafeKeeperConf;
     use anyhow::Result;
+    use tokio::fs;
    use utils::{id::TenantTimelineId, lsn::Lsn};
 
     fn stub_conf() -> SafeKeeperConf {

@@ -21,6 +21,7 @@ use tokio::fs::{self, remove_file, File, OpenOptions};
 use tokio::io::{AsyncRead, AsyncWriteExt};
 use tokio::io::{AsyncReadExt, AsyncSeekExt};
 use tracing::*;
+use utils::crashsafe::durable_rename;
 
 use crate::metrics::{time_io_closure, WalStorageMetrics, REMOVED_WAL_SEGMENTS};
 use crate::state::TimelinePersistentState;

@@ -196,15 +197,6 @@ impl PhysicalStorage {
         Ok(())
     }
 
-    /// Call fsync if config requires so.
-    async fn fsync_file(&mut self, file: &File) -> Result<()> {
-        if !self.conf.no_sync {
-            self.metrics
-                .observe_flush_seconds(time_io_closure(file.sync_all()).await?);
-        }
-        Ok(())
-    }
-
     /// Open or create WAL segment file. Caller must call seek to the wanted position.
     /// Returns `file` and `is_partial`.
     async fn open_or_create(&mut self, segno: XLogSegNo) -> Result<(File, bool)> {

@@ -223,15 +215,33 @@ impl PhysicalStorage {
             Ok((file, true))
         } else {
             // Create and fill new partial file
+            //
+            // We're using fdatasync during WAL writing, so file size must not
+            // change; to this end it is filled with zeros here. To avoid using
+            // half initialized segment, first bake it under tmp filename and
+            // then rename.
+            let tmp_path = self.timeline_dir.join("waltmp");
             let mut file = OpenOptions::new()
                 .create(true)
                 .write(true)
-                .open(&wal_file_partial_path)
+                .open(&tmp_path)
                 .await
-                .with_context(|| format!("Failed to open log file {:?}", &wal_file_path))?;
+                .with_context(|| format!("Failed to open tmp wal file {:?}", &tmp_path))?;
 
             write_zeroes(&mut file, self.wal_seg_size).await?;
-            self.fsync_file(&file).await?;
+
+            // Note: this doesn't get into observe_flush_seconds metric. But
+            // segment init should be separate metric, if any.
+            if let Err(e) =
+                durable_rename(&tmp_path, &wal_file_partial_path, !self.conf.no_sync).await
+            {
+                // Probably rename succeeded, but fsync of it failed. Remove
+                // the file then to avoid using it.
+                remove_file(wal_file_partial_path)
+                    .await
+                    .or_else(utils::fs_ext::ignore_not_found)?;
+                return Err(e.into());
+            }
             Ok((file, true))
         }
     }

@@ -718,6 +728,11 @@ const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
 
 /// Helper for filling file with zeroes.
 async fn write_zeroes(file: &mut File, mut count: usize) -> Result<()> {
+    fail::fail_point!("sk-write-zeroes", |_| {
+        info!("write_zeroes hit failpoint");
+        Err(anyhow::anyhow!("failpoint: sk-write-zeroes"))
+    });
+
     while count >= XLOG_BLCKSZ {
         file.write_all(ZERO_BLOCK).await?;
         count -= XLOG_BLCKSZ;
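Aside: fail_point! comes from the fail crate; the failpoint is inert until a test configures an action for it, which the new Python test below does over the safekeeper HTTP API. A minimal in-process sketch of the same mechanism (the point name here is made up, and it assumes the crate's failpoints feature is enabled):

    // Cargo.toml: fail = { version = "0.5", features = ["failpoints"] }
    fn guarded() -> Result<(), String> {
        // With no action configured this expands to a no-op.
        fail::fail_point!("demo-point", |_| Err("injected".to_string()));
        Ok(())
    }

    fn main() {
        assert!(guarded().is_ok());                 // disabled: nothing happens
        fail::cfg("demo-point", "return").unwrap(); // enable the "return" action
        assert!(guarded().is_err());                // closure now runs
        fail::remove("demo-point");                 // back to a no-op
    }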

@@ -515,6 +515,42 @@ def test_recovery_uncommitted(neon_env_builder: NeonEnvBuilder):
     asyncio.run(run_recovery_uncommitted(env))
 
 
+async def run_segment_init_failure(env: NeonEnv):
+    env.neon_cli.create_branch("test_segment_init_failure")
+    ep = env.endpoints.create_start("test_segment_init_failure")
+    ep.safe_psql("create table t(key int, value text)")
+    ep.safe_psql("insert into t select generate_series(1, 100), 'payload'")
+
+    sk = env.safekeepers[0]
+    sk_http = sk.http_client()
+    sk_http.configure_failpoints([("sk-write-zeroes", "return")])
+    conn = await ep.connect_async()
+    ep.safe_psql("select pg_switch_wal()")  # jump to the segment boundary
+    # next insertion should hang until the failpoint is disabled.
+    bg_query = asyncio.create_task(conn.execute("insert into t select generate_series(1,1), 'payload'"))
+    sleep_sec = 2
+    await asyncio.sleep(sleep_sec)
+    # also restart ep at the segment boundary to make the test more interesting
+    ep.stop()
+    # it must still be unfinished
+    # assert not bg_query.done()
+    # Without segment rename during init (#6402) the previous statement created
+    # a partially initialized 16MB segment, so sk restart also triggers #6401.
+    sk.stop().start()
+    ep = env.endpoints.create_start("test_segment_init_failure")
+    ep.safe_psql("insert into t select generate_series(1,1), 'payload'")  # should be ok now
+
+
+# Test (injected) failure during WAL segment init.
+# https://github.com/neondatabase/neon/issues/6401
+# https://github.com/neondatabase/neon/issues/6402
+def test_segment_init_failure(neon_env_builder: NeonEnvBuilder):
+    neon_env_builder.num_safekeepers = 1
+    env = neon_env_builder.init_start()
+
+    asyncio.run(run_segment_init_failure(env))
+
+
 @dataclass
 class RaceConditionTest:
     iteration: int