walredo: add a ping method

This commit is contained in:
Christian Schwarz
2024-09-04 18:48:26 +00:00
parent 12d70e7515
commit 2b08a56f90
5 changed files with 83 additions and 4 deletions

View File

@@ -441,6 +441,14 @@ impl WalRedoManager {
}
}
pub(crate) async fn ping(&self,pg_version: u32) -> Result<(), walredo::Error> {
match self {
Self::Prod(_, mgr) => mgr.ping(pg_version).await,
#[cfg(test)]
Self::Test(_) => unimplemented!(),
}
}
pub(crate) fn status(&self) -> Option<WalRedoManagerStatus> {
match self {
WalRedoManager::Prod(_, m) => Some(m.status()),

View File

@@ -33,6 +33,7 @@ use crate::repository::Key;
use crate::walrecord::NeonWalRecord;
use anyhow::Context;
use bytes::{Bytes, BytesMut};
use once_cell::sync::Lazy;
use pageserver_api::models::{WalRedoManagerProcessStatus, WalRedoManagerStatus};
use pageserver_api::shard::TenantShardId;
use std::future::Future;
@@ -206,6 +207,15 @@ impl PostgresRedoManager {
}
}
pub async fn ping(&self, pg_version: u32) -> Result<(), Error> {
self.do_with_walredo_process(pg_version, |proc| async move {
proc.ping(Duration::from_secs(1))
.await
.map_err(Error::Other)
})
.await
}
pub fn status(&self) -> WalRedoManagerStatus {
WalRedoManagerStatus {
last_redo_at: {
@@ -546,6 +556,17 @@ mod tests {
use tracing::Instrument;
use utils::{id::TenantId, lsn::Lsn};
#[tokio::test]
async fn test_ping() {
let h = RedoHarness::new().unwrap();
h.manager
.ping(14)
.instrument(h.span())
.await
.expect("ping should work");
}
#[tokio::test]
async fn short_v14_redo() {
let expected = std::fs::read("test_data/short_v14_redo.page").unwrap();

View File

@@ -4,10 +4,7 @@ mod protocol;
use self::no_leak_child::NoLeakChild;
use crate::{
config::PageServerConf,
metrics::{WalRedoKillCause, WAL_REDO_PROCESS_COUNTERS, WAL_REDO_RECORD_COUNTER},
span::debug_assert_current_span_has_tenant_id,
walrecord::NeonWalRecord,
config::PageServerConf, metrics::{WalRedoKillCause, WAL_REDO_PROCESS_COUNTERS, WAL_REDO_RECORD_COUNTER}, page_cache::PAGE_SZ, span::debug_assert_current_span_has_tenant_id, walrecord::NeonWalRecord
};
use anyhow::Context;
use bytes::Bytes;
@@ -237,6 +234,23 @@ impl WalRedoProcess {
res
}
pub(crate) async fn ping(&self, timeout: Duration) -> anyhow::Result<()> {
let mut writebuf: Vec<u8> = Vec::with_capacity(4);
protocol::build_ping_msg(&mut writebuf);
let Ok(res) = tokio::time::timeout(timeout, self.apply_wal_records0(&writebuf)).await
else {
anyhow::bail!("WAL redo ping timed out");
};
let response = res?;
if response.len() != PAGE_SZ {
anyhow::bail!(
"WAL redo ping response should respond with page-sized response: {}",
response.len()
);
}
Ok(())
}
/// # Cancel-Safety
///
/// When not polled to completion (e.g. because in `tokio::select!` another

View File

@@ -55,3 +55,8 @@ pub(crate) fn build_get_page_msg(tag: BufferTag, buf: &mut Vec<u8>) {
tag.ser_into(buf)
.expect("serialize BufferTag should always succeed");
}
pub(crate) fn build_ping_msg(buf: &mut Vec<u8>) {
buf.put_u8(b'H');
buf.put_u32(4);
}

View File

@@ -24,6 +24,7 @@
* PushPage ('P'): Copy a page image (in the payload) to buffer cache
* ApplyRecord ('A'): Apply a WAL record (in the payload)
* GetPage ('G'): Return a page image from buffer cache.
* Ping ('H'): Return the input message.
*
* Currently, you only get a response to GetPage requests; the response is
* simply a 8k page, without any headers. Errors are logged to stderr.
@@ -130,6 +131,7 @@ static void ApplyRecord(StringInfo input_message);
static void apply_error_callback(void *arg);
static bool redo_block_filter(XLogReaderState *record, uint8 block_id);
static void GetPage(StringInfo input_message);
static void Ping(StringInfo input_message);
static ssize_t buffered_read(void *buf, size_t count);
static void CreateFakeSharedMemoryAndSemaphores();
@@ -391,6 +393,10 @@ WalRedoMain(int argc, char *argv[])
GetPage(&input_message);
break;
case 'H': /* Ping */
Ping(&input_message);
break;
/*
* EOF means we're done. Perform normal shutdown.
*/
@@ -1047,6 +1053,31 @@ GetPage(StringInfo input_message)
}
static void
Ping(StringInfo input_message)
{
int tot_written;
/* Response: the input message */
tot_written = 0;
do {
ssize_t rc;
static const char response[BLCKSZ] = {0};
rc = write(STDOUT_FILENO, &response[tot_written], BLCKSZ - tot_written);
if (rc < 0) {
/* If interrupted by signal, just retry */
if (errno == EINTR)
continue;
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not write to stdout: %m")));
}
tot_written += rc;
} while (tot_written < BLCKSZ);
elog(TRACE, "Page sent back for ping");
}
/* Buffer used by buffered_read() */
static char stdin_buf[16 * 1024];
static size_t stdin_len = 0; /* # of bytes in buffer */