diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index fb30857ddf..a214a451f9 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -441,6 +441,17 @@ impl WalRedoManager {
         }
     }
 
+    /// Pings the walredo process for `pg_version` via the wrapped production manager.
+    ///
+    /// The test-only `Test` variant is not supported and panics with `unimplemented!`.
+    pub(crate) async fn ping(&self, pg_version: u32) -> Result<(), walredo::Error> {
+        match self {
+            Self::Prod(_, mgr) => mgr.ping(pg_version).await,
+            #[cfg(test)]
+            Self::Test(_) => unimplemented!(),
+        }
+    }
+
     pub(crate) fn status(&self) -> Option<WalRedoManagerStatus> {
         match self {
             WalRedoManager::Prod(_, m) => Some(m.status()),
diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs
index 74255eccd2..632dbf37ee 100644
--- a/pageserver/src/walredo.rs
+++ b/pageserver/src/walredo.rs
@@ -33,6 +33,7 @@ use crate::repository::Key;
 use crate::walrecord::NeonWalRecord;
 use anyhow::Context;
 use bytes::{Bytes, BytesMut};
+use once_cell::sync::Lazy;
 use pageserver_api::models::{WalRedoManagerProcessStatus, WalRedoManagerStatus};
 use pageserver_api::shard::TenantShardId;
 use std::future::Future;
@@ -206,6 +207,18 @@ impl PostgresRedoManager {
         }
     }
 
+    /// Sends a ping to the walredo process for `pg_version` with a one-second timeout.
+    ///
+    /// A failed ping is reported as [`Error::Other`].
+    pub async fn ping(&self, pg_version: u32) -> Result<(), Error> {
+        self.do_with_walredo_process(pg_version, |proc| async move {
+            proc.ping(Duration::from_secs(1))
+                .await
+                .map_err(Error::Other)
+        })
+        .await
+    }
+
     pub fn status(&self) -> WalRedoManagerStatus {
         WalRedoManagerStatus {
             last_redo_at: {
@@ -546,6 +559,17 @@ mod tests {
     use tracing::Instrument;
     use utils::{id::TenantId, lsn::Lsn};
 
+    #[tokio::test]
+    async fn test_ping() {
+        let h = RedoHarness::new().unwrap();
+
+        h.manager
+            .ping(14)
+            .instrument(h.span())
+            .await
+            .expect("ping should work");
+    }
+
     #[tokio::test]
     async fn short_v14_redo() {
         let expected = std::fs::read("test_data/short_v14_redo.page").unwrap();
diff --git a/pageserver/src/walredo/process.rs b/pageserver/src/walredo/process.rs
index 9140d4f6aa..7c3fe0ae36 100644
--- a/pageserver/src/walredo/process.rs
+++ b/pageserver/src/walredo/process.rs
@@ -4,10 +4,11 @@ mod protocol;
 
 use self::no_leak_child::NoLeakChild;
 use crate::{
     config::PageServerConf,
     metrics::{WalRedoKillCause, WAL_REDO_PROCESS_COUNTERS, WAL_REDO_RECORD_COUNTER},
+    page_cache::PAGE_SZ,
     span::debug_assert_current_span_has_tenant_id,
     walrecord::NeonWalRecord,
 };
 use anyhow::Context;
 use bytes::Bytes;
@@ -237,6 +238,29 @@ impl WalRedoProcess {
         res
     }
 
+    /// Sends a ping message to the walredo process and waits up to `timeout` for the reply.
+    ///
+    /// # Errors
+    ///
+    /// Fails if the reply does not arrive within `timeout`, if the request itself fails,
+    /// or if the reply is not exactly `PAGE_SZ` bytes long.
+    pub(crate) async fn ping(&self, timeout: Duration) -> anyhow::Result<()> {
+        let mut writebuf: Vec<u8> = Vec::with_capacity(5); // 1 tag byte + 4-byte u32
+        protocol::build_ping_msg(&mut writebuf);
+        let Ok(res) = tokio::time::timeout(timeout, self.apply_wal_records0(&writebuf)).await
+        else {
+            anyhow::bail!("WAL redo ping timed out");
+        };
+        let response = res?;
+        if response.len() != PAGE_SZ {
+            anyhow::bail!(
+                "WAL redo ping response should respond with page-sized response: {}",
+                response.len()
+            );
+        }
+        Ok(())
+    }
+
     /// # Cancel-Safety
     ///
     /// When not polled to completion (e.g. because in `tokio::select!` another
diff --git a/pageserver/src/walredo/process/protocol.rs b/pageserver/src/walredo/process/protocol.rs
index b703344cc8..de3ca8741b 100644
--- a/pageserver/src/walredo/process/protocol.rs
+++ b/pageserver/src/walredo/process/protocol.rs
@@ -55,3 +55,10 @@ pub(crate) fn build_get_page_msg(tag: BufferTag, buf: &mut Vec<u8>) {
     tag.ser_into(buf)
         .expect("serialize BufferTag should always succeed");
 }
+
+/// Appends a ping request to `buf`: the tag byte `b'H'` followed by the `u32` value 4
+/// (presumably the length field counting only itself, as the message has no payload).
+pub(crate) fn build_ping_msg(buf: &mut Vec<u8>) {
+    buf.put_u8(b'H');
+    buf.put_u32(4);
+}
diff --git a/pgxn/neon_walredo/walredoproc.c b/pgxn/neon_walredo/walredoproc.c
index cc545393f5..8b319a23b6 100644
--- a/pgxn/neon_walredo/walredoproc.c
+++ b/pgxn/neon_walredo/walredoproc.c
@@ -24,6 +24,7 @@
  * PushPage ('P'): Copy a page image (in the payload) to buffer cache
  * ApplyRecord ('A'): Apply a WAL record (in the payload)
  * GetPage ('G'): Return a page image from buffer cache.
+ * Ping ('H'): Return a zero-filled page.
  *
- * Currently, you only get a response to GetPage requests; the response is
+ * Currently, only GetPage and Ping requests get a response; the response is
  * simply a 8k page, without any headers. Errors are logged to stderr.
@@ -130,6 +131,7 @@ static void ApplyRecord(StringInfo input_message);
 static void apply_error_callback(void *arg);
 static bool redo_block_filter(XLogReaderState *record, uint8 block_id);
 static void GetPage(StringInfo input_message);
+static void Ping(StringInfo input_message);
 static ssize_t buffered_read(void *buf, size_t count);
 static void CreateFakeSharedMemoryAndSemaphores();
 
@@ -391,6 +393,10 @@ WalRedoMain(int argc, char *argv[])
 				GetPage(&input_message);
 				break;
 
+			case 'H':			/* Ping */
+				Ping(&input_message);
+				break;
+
 				/*
 				 * EOF means we're done. Perform normal shutdown.
 				 */
@@ -1047,6 +1053,35 @@ GetPage(StringInfo input_message)
 }
 
 
+/*
+ * Respond to a Ping request by writing one zero-filled page (BLCKSZ bytes) to
+ * stdout. The input message is not examined.
+ */
+static void
+Ping(StringInfo input_message)
+{
+	int			tot_written;
+	/* Response: a zero-filled page */
+	tot_written = 0;
+	do {
+		ssize_t		rc;
+		static const char response[BLCKSZ] = {0};
+		rc = write(STDOUT_FILENO, &response[tot_written], BLCKSZ - tot_written);
+		if (rc < 0) {
+			/* If interrupted by signal, just retry */
+			if (errno == EINTR)
+				continue;
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not write to stdout: %m")));
+		}
+		tot_written += rc;
+	} while (tot_written < BLCKSZ);
+
+	elog(TRACE, "Page sent back for ping");
+}
+
+
 /* Buffer used by buffered_read() */
 static char stdin_buf[16 * 1024];
 static size_t stdin_len = 0;	/* # of bytes in buffer */