From c9876b099397c7b990a7d359dcc0fa3b9dade926 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Fri, 2 Feb 2024 21:49:11 +0200 Subject: [PATCH] Fix double-free bug in walredo process. (#6534) At the end of ApplyRecord(), we called pfree on the decoded record, if it was "oversized". However, we had alread linked it to the "decode queue" list in XLogReaderState. If we later called XLogBeginRead(), it called ResetDecoder and tried to free the same record again. The conditions to hit this are: - a large WAL record (larger than aboue 64 kB I think, per DEFAULT_DECODE_BUFFER_SIZE), and - another WAL record processed by the same WAL redo process after the large one. I think the reason we haven't seen this earlier is that you don't get WAL records that large that are sent to the WAL redo process, except when logical replication is enabled. Logical replication adds data to the WAL records, making them larger. To fix, allocate the buffer ourselves, and don't link it to the decode queue. Alternatively, we could perhaps have just removed the pfree(), but frankly I'm a bit scared about the whole queue thing. --- pgxn/neon_walredo/walredoproc.c | 48 +++++++--------- .../regress/test_logical_replication.py | 57 +++++++++++++++++++ 2 files changed, 78 insertions(+), 27 deletions(-) diff --git a/pgxn/neon_walredo/walredoproc.c b/pgxn/neon_walredo/walredoproc.c index 7ca4fe93df..6ca0b2a274 100644 --- a/pgxn/neon_walredo/walredoproc.c +++ b/pgxn/neon_walredo/walredoproc.c @@ -804,6 +804,9 @@ ApplyRecord(StringInfo input_message) ErrorContextCallback errcallback; #if PG_VERSION_NUM >= 150000 DecodedXLogRecord *decoded; +#define STATIC_DECODEBUF_SIZE (64 * 1024) + static char *static_decodebuf = NULL; + size_t required_space; #endif /* @@ -833,7 +836,19 @@ ApplyRecord(StringInfo input_message) XLogBeginRead(reader_state, lsn); #if PG_VERSION_NUM >= 150000 - decoded = (DecodedXLogRecord *) XLogReadRecordAlloc(reader_state, record->xl_tot_len, true); + /* + * For reasonably small records, reuse a fixed size buffer to reduce + * palloc overhead. + */ + required_space = DecodeXLogRecordRequiredSpace(record->xl_tot_len); + if (required_space <= STATIC_DECODEBUF_SIZE) + { + if (static_decodebuf == NULL) + static_decodebuf = MemoryContextAlloc(TopMemoryContext, STATIC_DECODEBUF_SIZE); + decoded = (DecodedXLogRecord *) static_decodebuf; + } + else + decoded = palloc(required_space); if (!DecodeXLogRecord(reader_state, decoded, record, lsn, &errormsg)) elog(ERROR, "failed to decode WAL record: %s", errormsg); @@ -842,37 +857,15 @@ ApplyRecord(StringInfo input_message) /* Record the location of the next record. */ decoded->next_lsn = reader_state->NextRecPtr; - /* - * If it's in the decode buffer, mark the decode buffer space as - * occupied. - */ - if (!decoded->oversized) - { - /* The new decode buffer head must be MAXALIGNed. */ - Assert(decoded->size == MAXALIGN(decoded->size)); - if ((char *) decoded == reader_state->decode_buffer) - reader_state->decode_buffer_tail = reader_state->decode_buffer + decoded->size; - else - reader_state->decode_buffer_tail += decoded->size; - } - - /* Insert it into the queue of decoded records. */ - Assert(reader_state->decode_queue_tail != decoded); - if (reader_state->decode_queue_tail) - reader_state->decode_queue_tail->next = decoded; - reader_state->decode_queue_tail = decoded; - if (!reader_state->decode_queue_head) - reader_state->decode_queue_head = decoded; - /* * Update the pointers to the beginning and one-past-the-end of this * record, again for the benefit of historical code that expected the * decoder to track this rather than accessing these fields of the record * itself. */ - reader_state->record = reader_state->decode_queue_head; - reader_state->ReadRecPtr = reader_state->record->lsn; - reader_state->EndRecPtr = reader_state->record->next_lsn; + reader_state->record = decoded; + reader_state->ReadRecPtr = decoded->lsn; + reader_state->EndRecPtr = decoded->next_lsn; } #else /* @@ -912,8 +905,9 @@ ApplyRecord(StringInfo input_message) elog(TRACE, "applied WAL record with LSN %X/%X", (uint32) (lsn >> 32), (uint32) lsn); + #if PG_VERSION_NUM >= 150000 - if (decoded && decoded->oversized) + if ((char *) decoded != static_decodebuf) pfree(decoded); #endif } diff --git a/test_runner/regress/test_logical_replication.py b/test_runner/regress/test_logical_replication.py index 51e358e60d..059ddf79ec 100644 --- a/test_runner/regress/test_logical_replication.py +++ b/test_runner/regress/test_logical_replication.py @@ -1,4 +1,6 @@ import time +from random import choice +from string import ascii_lowercase import pytest from fixtures.log_helper import log @@ -11,6 +13,10 @@ from fixtures.types import Lsn from fixtures.utils import query_scalar +def random_string(n: int): + return "".join([choice(ascii_lowercase) for _ in range(n)]) + + def test_logical_replication(neon_simple_env: NeonEnv, vanilla_pg): env = neon_simple_env @@ -238,6 +244,57 @@ def test_wal_page_boundary_start(neon_simple_env: NeonEnv, vanilla_pg): ) == endpoint.safe_psql("select sum(somedata) from replication_example") +# Test that WAL redo works for fairly large records. +# +# See https://github.com/neondatabase/neon/pull/6534. That wasn't a +# logical replication bug as such, but without logical replication, +# records passed ot the WAL redo process are never large enough to hit +# the bug. +def test_large_records(neon_simple_env: NeonEnv, vanilla_pg): + env = neon_simple_env + + env.neon_cli.create_branch("init") + endpoint = env.endpoints.create_start("init") + + cur = endpoint.connect().cursor() + cur.execute("CREATE TABLE reptbl(id int, largeval text);") + cur.execute("alter table reptbl replica identity full") + cur.execute("create publication pub1 for table reptbl") + + # now start subscriber + vanilla_pg.start() + vanilla_pg.safe_psql("CREATE TABLE reptbl(id int, largeval text);") + + log.info(f"ep connstr is {endpoint.connstr()}, subscriber connstr {vanilla_pg.connstr()}") + connstr = endpoint.connstr().replace("'", "''") + vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1") + + # Test simple insert, update, delete. But with very large values + value = random_string(10_000_000) + cur.execute(f"INSERT INTO reptbl VALUES (1, '{value}')") + logical_replication_sync(vanilla_pg, endpoint) + assert vanilla_pg.safe_psql("select id, largeval from reptbl") == [(1, value)] + + # Test delete, and reinsert another value + cur.execute("DELETE FROM reptbl WHERE id = 1") + cur.execute(f"INSERT INTO reptbl VALUES (2, '{value}')") + logical_replication_sync(vanilla_pg, endpoint) + assert vanilla_pg.safe_psql("select id, largeval from reptbl") == [(2, value)] + + value = random_string(10_000_000) + cur.execute(f"UPDATE reptbl SET largeval='{value}'") + logical_replication_sync(vanilla_pg, endpoint) + assert vanilla_pg.safe_psql("select id, largeval from reptbl") == [(2, value)] + + endpoint.stop() + endpoint.start() + cur = endpoint.connect().cursor() + value = random_string(10_000_000) + cur.execute(f"UPDATE reptbl SET largeval='{value}'") + logical_replication_sync(vanilla_pg, endpoint) + assert vanilla_pg.safe_psql("select id, largeval from reptbl") == [(2, value)] + + # # Check that slots are not inherited in brnach #