From 9aeee51bf3fe09dba2ccd1067cf309780b26674d Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 11 Jul 2025 12:53:33 +0200 Subject: [PATCH] safekeeper compiles now --- libs/neon_failpoint/src/macros.rs | 28 ++++++++++++++++++++++++++++ safekeeper/src/bin/safekeeper.rs | 2 +- safekeeper/src/send_wal.rs | 12 ++++++------ safekeeper/src/wal_backup.rs | 2 +- 4 files changed, 36 insertions(+), 8 deletions(-) diff --git a/libs/neon_failpoint/src/macros.rs b/libs/neon_failpoint/src/macros.rs index 76d3a8aa04..7029b4cf3e 100644 --- a/libs/neon_failpoint/src/macros.rs +++ b/libs/neon_failpoint/src/macros.rs @@ -28,6 +28,20 @@ macro_rules! fail_point { } } }}; + ($name:literal, $condition:expr, $closure:expr) => {{ + if cfg!(feature = "testing") { + if $condition { + match $crate::failpoint($name, None).await { + $crate::FailpointResult::Continue => {}, + $crate::FailpointResult::Return(value) => { + let closure = $closure; + return closure(value.as_str()); + }, + $crate::FailpointResult::Cancelled => {}, + } + } + } + }}; } /// Failpoint macro with context support @@ -56,6 +70,20 @@ macro_rules! fail_point_with_context { } } }}; + ($name:literal, $context:expr, $condition:expr, $closure:expr) => {{ + if cfg!(feature = "testing") { + if $condition { + match $crate::failpoint($name, Some($context)).await { + $crate::FailpointResult::Continue => {}, + $crate::FailpointResult::Return(value) => { + let closure = $closure; + return closure(value.as_str()); + }, + $crate::FailpointResult::Cancelled => {}, + } + } + } + }}; } /// Pausable failpoint macro - equivalent to the old pausable_failpoint diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index b2d5976ef4..ec3144a17c 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -65,7 +65,7 @@ const FEATURES: &[&str] = &[ fn version() -> String { format!( "{GIT_VERSION} failpoints: {}, features: {:?}", - fail::has_failpoints(), + neon_failpoint::has_failpoints(), FEATURES, ) } diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index 177e759db5..f3fa8a5e47 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -872,14 +872,14 @@ impl WalSender<'_, IO> { async fn wait_wal(&mut self) -> Result<(), CopyStreamHandlerEnd> { loop { self.end_pos = self.end_watch.get(); - let have_something_to_send = (|| { - fail::fail_point!( + let have_something_to_send = async { + neon_failpoint::fail_point!( "sk-pause-send", self.appname.as_deref() != Some("pageserver"), |_| { false } ); self.end_pos > self.start_pos - })(); + }.await; if have_something_to_send { trace!("got end_pos {:?}, streaming", self.end_pos); @@ -931,14 +931,14 @@ impl WalSender<'_, IO> { /// - Err in case of error -- only if 1) term changed while fetching in recovery /// mode 2) watch channel closed, which must never happen. async fn wait_for_lsn(&mut self) -> anyhow::Result> { - let fp = (|| { - fail::fail_point!( + let fp = async { + neon_failpoint::fail_point!( "sk-pause-send", self.appname.as_deref() != Some("pageserver"), |_| { true } ); false - })(); + }.await; if fp { tokio::time::sleep(POLL_STATE_TIMEOUT).await; return Ok(None); diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs index 0e8dfd64c3..50a764a4d2 100644 --- a/safekeeper/src/wal_backup.rs +++ b/safekeeper/src/wal_backup.rs @@ -657,7 +657,7 @@ pub async fn delete_timeline( pausable_failpoint!("sk-delete-timeline-remote-pause"); - fail::fail_point!("sk-delete-timeline-remote", |_| { + neon_failpoint::fail_point!("sk-delete-timeline-remote", |_| { Err(anyhow::anyhow!("failpoint: sk-delete-timeline-remote")) });