diff --git a/pgxn/neon/neon_walreader.c b/pgxn/neon/neon_walreader.c index b575712dbe..5854a7ef0f 100644 --- a/pgxn/neon/neon_walreader.c +++ b/pgxn/neon/neon_walreader.c @@ -611,6 +611,17 @@ NeonWALReadLocal(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size coun recptr = startptr; nbytes = count; +/* Try to read directly from WAL buffers first. */ +#if PG_MAJORVERSION_NUM >= 17 + { + Size rbytes; + rbytes = WALReadFromBuffers(p, recptr, nbytes, tli); + recptr += rbytes; + nbytes -= rbytes; + p += rbytes; + } +#endif + while (nbytes > 0) { uint32 startoff; diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index d2a6104c74..e89ffdb628 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -1361,29 +1361,35 @@ SendAppendRequests(Safekeeper *sk) if (sk->active_state == SS_ACTIVE_READ_WAL) { char *errmsg; + int req_len; req = &sk->appendRequest; + req_len = req->endLsn - req->beginLsn; - switch (wp->api.wal_read(sk, - &sk->outbuf.data[sk->outbuf.len], - req->beginLsn, - req->endLsn - req->beginLsn, - &errmsg)) + /* We send zero sized AppenRequests as heartbeats; don't wal_read for these. */ + if (req_len > 0) { - case NEON_WALREAD_SUCCESS: - break; - case NEON_WALREAD_WOULDBLOCK: - return true; - case NEON_WALREAD_ERROR: - wp_log(WARNING, "WAL reading for node %s:%s failed: %s", - sk->host, sk->port, errmsg); - ShutdownConnection(sk); - return false; - default: - Assert(false); + switch (wp->api.wal_read(sk, + &sk->outbuf.data[sk->outbuf.len], + req->beginLsn, + req_len, + &errmsg)) + { + case NEON_WALREAD_SUCCESS: + break; + case NEON_WALREAD_WOULDBLOCK: + return true; + case NEON_WALREAD_ERROR: + wp_log(WARNING, "WAL reading for node %s:%s failed: %s", + sk->host, sk->port, errmsg); + ShutdownConnection(sk); + return false; + default: + Assert(false); + } } - sk->outbuf.len += req->endLsn - req->beginLsn; + sk->outbuf.len += req_len; writeResult = wp->api.conn_async_write(sk, sk->outbuf.data, sk->outbuf.len); diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c index 706941c3f0..86444084ff 100644 --- a/pgxn/neon/walproposer_pg.c +++ b/pgxn/neon/walproposer_pg.c @@ -1489,33 +1489,11 @@ walprop_pg_wal_read(Safekeeper *sk, char *buf, XLogRecPtr startptr, Size count, { NeonWALReadResult res; -#if PG_MAJORVERSION_NUM >= 17 - if (!sk->wp->config->syncSafekeepers) - { - Size rbytes; - rbytes = WALReadFromBuffers(buf, startptr, count, - walprop_pg_get_timeline_id()); - - startptr += rbytes; - count -= rbytes; - } -#endif - - if (count == 0) - { - res = NEON_WALREAD_SUCCESS; - } - else - { - Assert(count > 0); - - /* Now read the remaining WAL from the WAL file */ - res = NeonWALRead(sk->xlogreader, - buf, - startptr, - count, - walprop_pg_get_timeline_id()); - } + res = NeonWALRead(sk->xlogreader, + buf, + startptr, + count, + walprop_pg_get_timeline_id()); if (res == NEON_WALREAD_SUCCESS) { diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index e4d6e6da5d..e23f46d1ca 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -4177,9 +4177,15 @@ class Safekeeper(LogUtils): return self def assert_no_errors(self): - assert not self.log_contains("manager task finished prematurely") - assert not self.log_contains("error while acquiring WalResidentTimeline guard") - assert not self.log_contains("timeout while acquiring WalResidentTimeline guard") + not_allowed = [ + "manager task finished prematurely", + "error while acquiring WalResidentTimeline guard", + "timeout while acquiring WalResidentTimeline guard", + "invalid xlog page header:", + "WAL record crc mismatch at", + ] + for na in not_allowed: + assert not self.log_contains(na) def append_logical_message( self, tenant_id: TenantId, timeline_id: TimelineId, request: dict[str, Any]