Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-05 20:42:54 +00:00
Fix double-free bug in walredo process. (#6534)
At the end of ApplyRecord(), we called pfree on the decoded record if it was "oversized". However, we had already linked it to the "decode queue" list in XLogReaderState, so if we later called XLogBeginRead(), it called ResetDecoder, which tried to free the same record a second time. The conditions to hit this are:

- a large WAL record (larger than about 64 kB, per DEFAULT_DECODE_BUFFER_SIZE), and
- another WAL record processed by the same WAL redo process after the large one.

The likely reason we haven't seen this earlier is that WAL records sent to the WAL redo process never get that large unless logical replication is enabled: logical replication adds data to the WAL records, making them larger.

To fix, allocate the buffer ourselves and don't link it to the decode queue. Alternatively, we could perhaps have just removed the pfree(), but the decode queue bookkeeping is subtle enough that owning the allocation outright seems safer.
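The fix boils down to an ownership rule: ApplyRecord() now allocates the decode buffer itself, reusing one fixed-size static buffer for small records and heap-allocating only for oversized ones, and it never hands the record to the reader's decode queue, so the buffer has exactly one owner and is freed exactly once. The standalone C sketch below illustrates that allocation pattern with plain malloc/free instead of palloc/MemoryContextAlloc; DecodedRecordStub, get_decode_buffer() and release_decode_buffer() are illustrative names invented for this sketch, not part of the PostgreSQL or Neon APIs.

/*
 * Standalone sketch (not Neon code) of the caller-owned decode buffer
 * strategy: small records reuse one static buffer, oversized records get
 * their own allocation, and only the owner ever frees it.
 */
#include <stdio.h>
#include <stdlib.h>

#define STATIC_DECODEBUF_SIZE (64 * 1024)	/* same threshold as the fix */

/* Simplified stand-in for DecodedXLogRecord. */
typedef struct DecodedRecordStub
{
	size_t		size;
} DecodedRecordStub;

/* Long-lived buffer, allocated once and reused for every small record. */
static char *static_decodebuf = NULL;

static DecodedRecordStub *
get_decode_buffer(size_t required_space)
{
	if (required_space <= STATIC_DECODEBUF_SIZE)
	{
		if (static_decodebuf == NULL)
			static_decodebuf = malloc(STATIC_DECODEBUF_SIZE);
		return (DecodedRecordStub *) static_decodebuf;
	}
	/* Oversized record: a private heap allocation owned by the caller. */
	return malloc(required_space);
}

static void
release_decode_buffer(DecodedRecordStub *decoded)
{
	/*
	 * Free only the oversized case; the static buffer is kept for reuse.
	 * Because the record was never linked into a shared queue, this is the
	 * only place it can be freed, which rules out a double free.
	 */
	if ((char *) decoded != static_decodebuf)
		free(decoded);
}

int
main(void)
{
	DecodedRecordStub *small_rec = get_decode_buffer(1024);
	DecodedRecordStub *large_rec = get_decode_buffer(10 * 1024 * 1024);

	printf("small record reuses static buffer: %s\n",
		   (char *) small_rec == static_decodebuf ? "yes" : "no");
	printf("large record reuses static buffer: %s\n",
		   (char *) large_rec == static_decodebuf ? "yes" : "no");

	release_decode_buffer(small_rec);	/* no-op: static buffer kept */
	release_decode_buffer(large_rec);	/* freed exactly once */
	return 0;
}

In the patch itself, the same pointer comparison against static_decodebuf decides whether pfree() runs at the end of ApplyRecord().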
committed by GitHub
parent 786e9cf75b
commit c9876b0993
@@ -804,6 +804,9 @@ ApplyRecord(StringInfo input_message)
 	ErrorContextCallback errcallback;
 #if PG_VERSION_NUM >= 150000
 	DecodedXLogRecord *decoded;
+#define STATIC_DECODEBUF_SIZE (64 * 1024)
+	static char *static_decodebuf = NULL;
+	size_t		required_space;
 #endif

 	/*
@@ -833,7 +836,19 @@ ApplyRecord(StringInfo input_message)
 	XLogBeginRead(reader_state, lsn);

 #if PG_VERSION_NUM >= 150000
-	decoded = (DecodedXLogRecord *) XLogReadRecordAlloc(reader_state, record->xl_tot_len, true);
+	/*
+	 * For reasonably small records, reuse a fixed size buffer to reduce
+	 * palloc overhead.
+	 */
+	required_space = DecodeXLogRecordRequiredSpace(record->xl_tot_len);
+	if (required_space <= STATIC_DECODEBUF_SIZE)
+	{
+		if (static_decodebuf == NULL)
+			static_decodebuf = MemoryContextAlloc(TopMemoryContext, STATIC_DECODEBUF_SIZE);
+		decoded = (DecodedXLogRecord *) static_decodebuf;
+	}
+	else
+		decoded = palloc(required_space);

 	if (!DecodeXLogRecord(reader_state, decoded, record, lsn, &errormsg))
 		elog(ERROR, "failed to decode WAL record: %s", errormsg);
@@ -842,37 +857,15 @@ ApplyRecord(StringInfo input_message)
 	/* Record the location of the next record. */
 	decoded->next_lsn = reader_state->NextRecPtr;

-	/*
-	 * If it's in the decode buffer, mark the decode buffer space as
-	 * occupied.
-	 */
-	if (!decoded->oversized)
-	{
-		/* The new decode buffer head must be MAXALIGNed. */
-		Assert(decoded->size == MAXALIGN(decoded->size));
-		if ((char *) decoded == reader_state->decode_buffer)
-			reader_state->decode_buffer_tail = reader_state->decode_buffer + decoded->size;
-		else
-			reader_state->decode_buffer_tail += decoded->size;
-	}
-
-	/* Insert it into the queue of decoded records. */
-	Assert(reader_state->decode_queue_tail != decoded);
-	if (reader_state->decode_queue_tail)
-		reader_state->decode_queue_tail->next = decoded;
-	reader_state->decode_queue_tail = decoded;
-	if (!reader_state->decode_queue_head)
-		reader_state->decode_queue_head = decoded;
-
 	/*
 	 * Update the pointers to the beginning and one-past-the-end of this
 	 * record, again for the benefit of historical code that expected the
 	 * decoder to track this rather than accessing these fields of the record
 	 * itself.
 	 */
-	reader_state->record = reader_state->decode_queue_head;
-	reader_state->ReadRecPtr = reader_state->record->lsn;
-	reader_state->EndRecPtr = reader_state->record->next_lsn;
+	reader_state->record = decoded;
+	reader_state->ReadRecPtr = decoded->lsn;
+	reader_state->EndRecPtr = decoded->next_lsn;
 	}
 #else
 	/*
@@ -912,8 +905,9 @@ ApplyRecord(StringInfo input_message)

 	elog(TRACE, "applied WAL record with LSN %X/%X",
 		 (uint32) (lsn >> 32), (uint32) lsn);
+
 #if PG_VERSION_NUM >= 150000
-	if (decoded && decoded->oversized)
+	if ((char *) decoded != static_decodebuf)
 		pfree(decoded);
 #endif
 }

@@ -1,4 +1,6 @@
 import time
+from random import choice
+from string import ascii_lowercase

 import pytest
 from fixtures.log_helper import log
@@ -11,6 +13,10 @@ from fixtures.types import Lsn
 from fixtures.utils import query_scalar


+def random_string(n: int):
+    return "".join([choice(ascii_lowercase) for _ in range(n)])
+
+
 def test_logical_replication(neon_simple_env: NeonEnv, vanilla_pg):
     env = neon_simple_env

@@ -238,6 +244,57 @@ def test_wal_page_boundary_start(neon_simple_env: NeonEnv, vanilla_pg):
     ) == endpoint.safe_psql("select sum(somedata) from replication_example")


+# Test that WAL redo works for fairly large records.
+#
+# See https://github.com/neondatabase/neon/pull/6534. That wasn't a
+# logical replication bug as such, but without logical replication,
+# records passed to the WAL redo process are never large enough to hit
+# the bug.
+def test_large_records(neon_simple_env: NeonEnv, vanilla_pg):
+    env = neon_simple_env
+
+    env.neon_cli.create_branch("init")
+    endpoint = env.endpoints.create_start("init")
+
+    cur = endpoint.connect().cursor()
+    cur.execute("CREATE TABLE reptbl(id int, largeval text);")
+    cur.execute("alter table reptbl replica identity full")
+    cur.execute("create publication pub1 for table reptbl")
+
+    # now start subscriber
+    vanilla_pg.start()
+    vanilla_pg.safe_psql("CREATE TABLE reptbl(id int, largeval text);")
+
+    log.info(f"ep connstr is {endpoint.connstr()}, subscriber connstr {vanilla_pg.connstr()}")
+    connstr = endpoint.connstr().replace("'", "''")
+    vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1")
+
+    # Test simple insert, update, delete. But with very large values
+    value = random_string(10_000_000)
+    cur.execute(f"INSERT INTO reptbl VALUES (1, '{value}')")
+    logical_replication_sync(vanilla_pg, endpoint)
+    assert vanilla_pg.safe_psql("select id, largeval from reptbl") == [(1, value)]
+
+    # Test delete, and reinsert another value
+    cur.execute("DELETE FROM reptbl WHERE id = 1")
+    cur.execute(f"INSERT INTO reptbl VALUES (2, '{value}')")
+    logical_replication_sync(vanilla_pg, endpoint)
+    assert vanilla_pg.safe_psql("select id, largeval from reptbl") == [(2, value)]
+
+    value = random_string(10_000_000)
+    cur.execute(f"UPDATE reptbl SET largeval='{value}'")
+    logical_replication_sync(vanilla_pg, endpoint)
+    assert vanilla_pg.safe_psql("select id, largeval from reptbl") == [(2, value)]
+
+    endpoint.stop()
+    endpoint.start()
+    cur = endpoint.connect().cursor()
+    value = random_string(10_000_000)
+    cur.execute(f"UPDATE reptbl SET largeval='{value}'")
+    logical_replication_sync(vanilla_pg, endpoint)
+    assert vanilla_pg.safe_psql("select id, largeval from reptbl") == [(2, value)]
+
+
 #
 # Check that slots are not inherited in branch
 #