From f54f0e8e2d4da4f2d297d73ffaceee37de4500d0 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Thu, 7 Nov 2024 14:29:52 +0300 Subject: [PATCH] Fix direct reading from WAL buffers. (#9639) Fix direct reading from WAL buffers. Pointer wasn't advanced which resulted in sending corrupted WAL if part of read used WAL buffers and part read from the file. Also move it to neon_walreader so that e.g. replication could also make use of it. ref https://github.com/neondatabase/cloud/issues/19567 --- pgxn/neon/neon_walreader.c | 11 ++++++++ pgxn/neon/walproposer.c | 40 +++++++++++++++------------ pgxn/neon/walproposer_pg.c | 32 ++++----------------- test_runner/fixtures/neon_fixtures.py | 12 ++++++-- 4 files changed, 48 insertions(+), 47 deletions(-) diff --git a/pgxn/neon/neon_walreader.c b/pgxn/neon/neon_walreader.c index b575712dbe..5854a7ef0f 100644 --- a/pgxn/neon/neon_walreader.c +++ b/pgxn/neon/neon_walreader.c @@ -611,6 +611,17 @@ NeonWALReadLocal(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size coun recptr = startptr; nbytes = count; +/* Try to read directly from WAL buffers first. */ +#if PG_MAJORVERSION_NUM >= 17 + { + Size rbytes; + rbytes = WALReadFromBuffers(p, recptr, nbytes, tli); + recptr += rbytes; + nbytes -= rbytes; + p += rbytes; + } +#endif + while (nbytes > 0) { uint32 startoff; diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index d2a6104c74..e89ffdb628 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -1361,29 +1361,35 @@ SendAppendRequests(Safekeeper *sk) if (sk->active_state == SS_ACTIVE_READ_WAL) { char *errmsg; + int req_len; req = &sk->appendRequest; + req_len = req->endLsn - req->beginLsn; - switch (wp->api.wal_read(sk, - &sk->outbuf.data[sk->outbuf.len], - req->beginLsn, - req->endLsn - req->beginLsn, - &errmsg)) + /* We send zero sized AppenRequests as heartbeats; don't wal_read for these. */ + if (req_len > 0) { - case NEON_WALREAD_SUCCESS: - break; - case NEON_WALREAD_WOULDBLOCK: - return true; - case NEON_WALREAD_ERROR: - wp_log(WARNING, "WAL reading for node %s:%s failed: %s", - sk->host, sk->port, errmsg); - ShutdownConnection(sk); - return false; - default: - Assert(false); + switch (wp->api.wal_read(sk, + &sk->outbuf.data[sk->outbuf.len], + req->beginLsn, + req_len, + &errmsg)) + { + case NEON_WALREAD_SUCCESS: + break; + case NEON_WALREAD_WOULDBLOCK: + return true; + case NEON_WALREAD_ERROR: + wp_log(WARNING, "WAL reading for node %s:%s failed: %s", + sk->host, sk->port, errmsg); + ShutdownConnection(sk); + return false; + default: + Assert(false); + } } - sk->outbuf.len += req->endLsn - req->beginLsn; + sk->outbuf.len += req_len; writeResult = wp->api.conn_async_write(sk, sk->outbuf.data, sk->outbuf.len); diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c index 706941c3f0..86444084ff 100644 --- a/pgxn/neon/walproposer_pg.c +++ b/pgxn/neon/walproposer_pg.c @@ -1489,33 +1489,11 @@ walprop_pg_wal_read(Safekeeper *sk, char *buf, XLogRecPtr startptr, Size count, { NeonWALReadResult res; -#if PG_MAJORVERSION_NUM >= 17 - if (!sk->wp->config->syncSafekeepers) - { - Size rbytes; - rbytes = WALReadFromBuffers(buf, startptr, count, - walprop_pg_get_timeline_id()); - - startptr += rbytes; - count -= rbytes; - } -#endif - - if (count == 0) - { - res = NEON_WALREAD_SUCCESS; - } - else - { - Assert(count > 0); - - /* Now read the remaining WAL from the WAL file */ - res = NeonWALRead(sk->xlogreader, - buf, - startptr, - count, - walprop_pg_get_timeline_id()); - } + res = NeonWALRead(sk->xlogreader, + buf, + startptr, + count, + walprop_pg_get_timeline_id()); if (res == NEON_WALREAD_SUCCESS) { diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index e4d6e6da5d..e23f46d1ca 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -4177,9 +4177,15 @@ class Safekeeper(LogUtils): return self def assert_no_errors(self): - assert not self.log_contains("manager task finished prematurely") - assert not self.log_contains("error while acquiring WalResidentTimeline guard") - assert not self.log_contains("timeout while acquiring WalResidentTimeline guard") + not_allowed = [ + "manager task finished prematurely", + "error while acquiring WalResidentTimeline guard", + "timeout while acquiring WalResidentTimeline guard", + "invalid xlog page header:", + "WAL record crc mismatch at", + ] + for na in not_allowed: + assert not self.log_contains(na) def append_logical_message( self, tenant_id: TenantId, timeline_id: TimelineId, request: dict[str, Any]