Fix tracking of the nextMulti in the pageserver's copy of CheckPoint (#6528)

Whenever we see an XLOG_MULTIXACT_CREATE_ID WAL record, we need to
update the nextMulti and NextMultiOffset fields in the pageserver's
copy of the CheckPoint struct, to cover the new multi-XID. In
PostgreSQL, this is done by updating an in-memory struct during WAL
replay, but because in Neon you can start a compute node at any LSN,
we need to have an up-to-date value pre-calculated in the pageserver
at all times. We do the same for nextXid.

However, we had a bug in WAL ingestion code that does that: the
multi-XIDs will wrap around at 2^32, just like XIDs, so we need to do
the comparisons in a wraparound-aware fashion.

Fix that, and add tests.

Fixes issue #6520

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
This commit is contained in:
Heikki Linnakangas
2024-07-01 01:49:49 +03:00
parent bc704917a3
commit 30027d94a2
4 changed files with 365 additions and 6 deletions

View File

@@ -356,6 +356,28 @@ impl CheckPoint {
}
false
}
/// Advance next multi-XID/offset to those given in arguments.
///
/// It's important that this handles wraparound correctly. This should match the
/// MultiXactAdvanceNextMXact() logic in PostgreSQL's xlog_redo() function.
///
/// Returns 'true' if the Checkpoint was updated.
pub fn update_next_multixid(&mut self, multi_xid: u32, multi_offset: u32) -> bool {
let mut modified = false;
if multi_xid.wrapping_sub(self.nextMulti) as i32 > 0 {
self.nextMulti = multi_xid;
modified = true;
}
if multi_offset.wrapping_sub(self.nextMultiOffset) as i32 > 0 {
self.nextMultiOffset = multi_offset;
modified = true;
}
modified
}
}
/// Generate new, empty WAL segment, with correct block headers at the first

View File

@@ -202,6 +202,53 @@ pub fn test_update_next_xid() {
assert_eq!(checkpoint.nextXid.value, 2048);
}
#[test]
pub fn test_update_next_multixid() {
let checkpoint_buf = [0u8; std::mem::size_of::<CheckPoint>()];
let mut checkpoint = CheckPoint::decode(&checkpoint_buf).unwrap();
// simple case
checkpoint.nextMulti = 20;
checkpoint.nextMultiOffset = 20;
checkpoint.update_next_multixid(1000, 2000);
assert_eq!(checkpoint.nextMulti, 1000);
assert_eq!(checkpoint.nextMultiOffset, 2000);
// No change
checkpoint.update_next_multixid(500, 900);
assert_eq!(checkpoint.nextMulti, 1000);
assert_eq!(checkpoint.nextMultiOffset, 2000);
// Close to wraparound, but not wrapped around yet
checkpoint.nextMulti = 0xffff0000;
checkpoint.nextMultiOffset = 0xfffe0000;
checkpoint.update_next_multixid(0xffff00ff, 0xfffe00ff);
assert_eq!(checkpoint.nextMulti, 0xffff00ff);
assert_eq!(checkpoint.nextMultiOffset, 0xfffe00ff);
// Wraparound
checkpoint.update_next_multixid(1, 900);
assert_eq!(checkpoint.nextMulti, 1);
assert_eq!(checkpoint.nextMultiOffset, 900);
// Wraparound nextMulti to 0.
//
// It's a bit surprising that nextMulti can be 0, because that's a special value
// (InvalidMultiXactId). However, that's how Postgres does it at multi-xid wraparound:
// nextMulti wraps around to 0, but then when the next multi-xid is assigned, it skips
// the 0 and the next multi-xid actually assigned is 1.
checkpoint.nextMulti = 0xffff0000;
checkpoint.nextMultiOffset = 0xfffe0000;
checkpoint.update_next_multixid(0, 0xfffe00ff);
assert_eq!(checkpoint.nextMulti, 0);
assert_eq!(checkpoint.nextMultiOffset, 0xfffe00ff);
// Wraparound nextMultiOffset to 0
checkpoint.update_next_multixid(0, 0);
assert_eq!(checkpoint.nextMulti, 0);
assert_eq!(checkpoint.nextMultiOffset, 0);
}
#[test]
pub fn test_encode_logical_message() {
let expected = [

View File

@@ -1384,14 +1384,31 @@ impl WalIngest {
// Note: The multixact members can wrap around, even within one WAL record.
offset = offset.wrapping_add(n_this_page as u32);
}
if xlrec.mid >= self.checkpoint.nextMulti {
self.checkpoint.nextMulti = xlrec.mid + 1;
self.checkpoint_modified = true;
}
if xlrec.moff + xlrec.nmembers > self.checkpoint.nextMultiOffset {
self.checkpoint.nextMultiOffset = xlrec.moff + xlrec.nmembers;
let next_offset = offset;
assert!(xlrec.moff.wrapping_add(xlrec.nmembers) == next_offset);
// Update next-multi-xid and next-offset
//
// NB: In PostgreSQL, the next-multi-xid stored in the control file is allowed to
// go to 0, and it's fixed up by skipping to FirstMultiXactId in functions that
// read it, like GetNewMultiXactId(). This is different from how nextXid is
// incremented! nextXid skips over < FirstNormalTransactionId when the the value
// is stored, so it's never 0 in a checkpoint.
//
// I don't know why it's done that way, it seems less error-prone to skip over 0
// when the value is stored rather than when it's read. But let's do it the same
// way here.
let next_multi_xid = xlrec.mid.wrapping_add(1);
if self
.checkpoint
.update_next_multixid(next_multi_xid, next_offset)
{
self.checkpoint_modified = true;
}
// Also update the next-xid with the highest member. According to the comments in
// multixact_redo(), this shouldn't be necessary, but let's do the same here.
let max_mbr_xid = xlrec.members.iter().fold(None, |acc, mbr| {
if let Some(max_xid) = acc {
if mbr.xid.wrapping_sub(max_xid) as i32 > 0 {

View File

@@ -7,6 +7,7 @@ from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
PgBin,
VanillaPostgres,
import_timeline_from_vanilla_postgres,
wait_for_wal_insert_lsn,
)
@@ -182,3 +183,275 @@ def test_import_at_2bil(
cur = conn.cursor()
cur.execute("SELECT count(*) from t")
assert cur.fetchone() == (10000 + 1 + 1,)
# Constants and macros copied from PostgreSQL multixact.c and headers. These are needed to
# calculate the SLRU segments that a particular multixid or multixid-offsets falls into.
BLCKSZ = 8192
MULTIXACT_OFFSETS_PER_PAGE = int(BLCKSZ / 4)
SLRU_PAGES_PER_SEGMENT = int(32)
MXACT_MEMBER_BITS_PER_XACT = 8
MXACT_MEMBER_FLAGS_PER_BYTE = 1
MULTIXACT_FLAGBYTES_PER_GROUP = 4
MULTIXACT_MEMBERS_PER_MEMBERGROUP = MULTIXACT_FLAGBYTES_PER_GROUP * MXACT_MEMBER_FLAGS_PER_BYTE
MULTIXACT_MEMBERGROUP_SIZE = 4 * MULTIXACT_MEMBERS_PER_MEMBERGROUP + MULTIXACT_FLAGBYTES_PER_GROUP
MULTIXACT_MEMBERGROUPS_PER_PAGE = int(BLCKSZ / MULTIXACT_MEMBERGROUP_SIZE)
MULTIXACT_MEMBERS_PER_PAGE = MULTIXACT_MEMBERGROUPS_PER_PAGE * MULTIXACT_MEMBERS_PER_MEMBERGROUP
def MultiXactIdToOffsetSegment(xid: int):
return int(xid / (SLRU_PAGES_PER_SEGMENT * MULTIXACT_OFFSETS_PER_PAGE))
def MXOffsetToMemberSegment(off: int):
return int(off / (SLRU_PAGES_PER_SEGMENT * MULTIXACT_MEMBERS_PER_PAGE))
def advance_multixid_to(
pg_bin: PgBin, vanilla_pg: VanillaPostgres, next_multi_xid: int, next_multi_offset: int
):
"""
Use pg_resetwal to advance the nextMulti and nextMultiOffset values in a stand-alone
Postgres cluster. This is useful to get close to wraparound or some other interesting
value, without having to burn a lot of time consuming the (multi-)XIDs one by one.
The new values should be higher than the old ones, in a wraparound-aware sense.
On entry, the server should be running. It will be shut down and restarted.
"""
# Read old values from the last checkpoint. We will pass the old oldestMultiXid value
# back to pg_resetwal, there's no option to leave it alone.
with vanilla_pg.connect() as conn:
with conn.cursor() as cur:
# Make sure the oldest-multi-xid value in the control file is up-to-date
cur.execute("checkpoint")
cur.execute("select oldest_multi_xid, next_multixact_id from pg_control_checkpoint()")
rec = cur.fetchone()
assert rec is not None
(ckpt_oldest_multi_xid, ckpt_next_multi_xid) = rec
log.info(f"oldestMultiXid was {ckpt_oldest_multi_xid}, nextMultiXid was {ckpt_next_multi_xid}")
log.info(f"Resetting to {next_multi_xid}")
# Use pg_resetwal to reset the next multiXid and multiOffset to given values.
vanilla_pg.stop()
pg_resetwal_path = os.path.join(pg_bin.pg_bin_path, "pg_resetwal")
cmd = [
pg_resetwal_path,
f"--multixact-ids={next_multi_xid},{ckpt_oldest_multi_xid}",
f"--multixact-offset={next_multi_offset}",
"-D",
str(vanilla_pg.pgdatadir),
]
pg_bin.run_capture(cmd)
# Because we skip over a lot of values, Postgres hasn't created the SLRU segments for
# the new values yet. Create them manually, to allow Postgres to start up.
#
# This leaves "gaps" in the SLRU where segments between old value and new value are
# missing. That's OK for our purposes. Autovacuum will print some warnings about the
# missing segments, but will clean it up by truncating the SLRUs up to the new value,
# closing the gap.
segname = "%04X" % MultiXactIdToOffsetSegment(next_multi_xid)
log.info(f"Creating dummy segment pg_multixact/offsets/{segname}")
with open(vanilla_pg.pgdatadir / "pg_multixact" / "offsets" / segname, "w") as of:
of.write("\0" * SLRU_PAGES_PER_SEGMENT * BLCKSZ)
of.flush()
segname = "%04X" % MXOffsetToMemberSegment(next_multi_offset)
log.info(f"Creating dummy segment pg_multixact/members/{segname}")
with open(vanilla_pg.pgdatadir / "pg_multixact" / "members" / segname, "w") as of:
of.write("\0" * SLRU_PAGES_PER_SEGMENT * BLCKSZ)
of.flush()
# Start Postgres again and wait until autovacuum has processed all the databases
#
# This allows truncating the SLRUs, fixing the gaps with missing segments.
vanilla_pg.start()
with vanilla_pg.connect().cursor() as cur:
for _ in range(1000):
datminmxid = int(
query_scalar(cur, "select min(datminmxid::text::int8) from pg_database")
)
log.info(f"datminmxid {datminmxid}")
if next_multi_xid - datminmxid < 1_000_000: # not wraparound-aware!
break
time.sleep(0.5)
def test_multixid_wraparound_import(
neon_env_builder: NeonEnvBuilder,
test_output_dir: Path,
pg_bin: PgBin,
vanilla_pg,
):
"""
Test that the wraparound of the "next-multi-xid" counter is handled correctly in
pageserver, And multi-offsets as well
"""
env = neon_env_builder.init_start()
# In order to to test multixid wraparound, we need to first advance the counter to
# within spitting distance of the wraparound, that is 2^32 multi-XIDs. We could simply
# run a workload that consumes a lot of multi-XIDs until we approach that, but that
# takes a very long time. So we cheat.
#
# Our strategy is to create a vanilla Postgres cluster, and use pg_resetwal to
# directly set the multi-xid counter a higher value. However, we cannot directly set
# it to just before 2^32 (~ 4 billion), because that would make the exisitng
# 'relminmxid' values to look like they're in the future. It's not clear how the
# system would behave in that situation. So instead, we bump it up ~ 1 billion
# multi-XIDs at a time, and let autovacuum to process all the relations and update
# 'relminmxid' between each run.
#
# XXX: For the multi-offsets, most of the bump is done in the last call. This is
# because advancing it ~ 1 billion at a time hit a pathological case in the
# MultiXactMemberFreezeThreshold() function, causing autovacuum not trigger multixid
# freezing. See
# https://www.postgresql.org/message-id/85fb354c-f89f-4d47-b3a2-3cbd461c90a3%40iki.fi
# Multi-offsets don't have the same wraparound problems at 2 billion mark as
# multi-xids do, so one big jump is fine.
vanilla_pg.configure(
[
"log_autovacuum_min_duration = 0",
# Perform anti-wraparound vacuuming aggressively
"autovacuum_naptime='1 s'",
"autovacuum_freeze_max_age = 1000000",
"autovacuum_multixact_freeze_max_age = 1000000",
],
)
vanilla_pg.start()
advance_multixid_to(pg_bin, vanilla_pg, 0x40000000, 0x10000000)
advance_multixid_to(pg_bin, vanilla_pg, 0x80000000, 0x20000000)
advance_multixid_to(pg_bin, vanilla_pg, 0xC0000000, 0x30000000)
advance_multixid_to(pg_bin, vanilla_pg, 0xFFFFFF00, 0xFFFFFF00)
vanilla_pg.safe_psql("create user cloud_admin with password 'postgres' superuser")
vanilla_pg.safe_psql("create table tt as select g as id from generate_series(1, 10) g")
vanilla_pg.safe_psql("CHECKPOINT")
# Import the cluster to the pageserver
tenant_id = TenantId.generate()
env.pageserver.tenant_create(tenant_id)
timeline_id = TimelineId.generate()
import_timeline_from_vanilla_postgres(
test_output_dir,
env,
pg_bin,
tenant_id,
timeline_id,
"imported_multixid_wraparound_test",
vanilla_pg.connstr(),
)
vanilla_pg.stop()
endpoint = env.endpoints.create_start(
"imported_multixid_wraparound_test",
tenant_id=tenant_id,
config_lines=[
"log_autovacuum_min_duration = 0",
"autovacuum_naptime='5 s'",
"autovacuum=off",
],
)
conn = endpoint.connect()
cur = conn.cursor()
assert query_scalar(cur, "select count(*) from tt") == 10 # sanity check
# Install extension containing function needed for test
cur.execute("CREATE EXTENSION neon_test_utils")
# Consume a lot of XIDs, just to advance the XIDs to different range than the
# multi-xids. That avoids confusion while debugging
cur.execute("select test_consume_xids(100000)")
cur.execute("select pg_switch_wal()")
cur.execute("checkpoint")
# Use subtransactions so that each row in 'tt' is stamped with different XID. Leave
# the transaction open.
cur.execute("BEGIN")
cur.execute(
"""
do $$
declare
idvar int;
begin
for idvar in select id from tt loop
begin
update tt set id = idvar where id = idvar;
exception when others then
raise 'didn''t expect an error: %', sqlerrm;
end;
end loop;
end;
$$;
"""
)
# In a different transaction, acquire a FOR KEY SHARE lock on each row. This generates
# a new multixid for each row, with the previous xmax and this transaction's XID as the
# members.
#
# Repeat this until the multi-xid counter wraps around.
conn3 = endpoint.connect()
cur3 = conn3.cursor()
next_multixact_id_before_restart = 0
observed_before_wraparound = False
while True:
cur3.execute("BEGIN")
cur3.execute("SELECT * FROM tt FOR KEY SHARE")
# Get the xmax of one of the rows we locked. It should be a multi-xid. It might
# not be the latest one, but close enough.
row_xmax = int(query_scalar(cur3, "SELECT xmax FROM tt LIMIT 1"))
cur3.execute("COMMIT")
log.info(f"observed a row with xmax {row_xmax}")
# High value means not wrapped around yet
if row_xmax >= 0xFFFFFF00:
observed_before_wraparound = True
continue
# xmax should not be a regular XID. (We bumped up the regular XID range earlier
# to around 100000 and above.)
assert row_xmax < 100
# xmax values < FirstNormalTransactionId (== 3) could be special XID values, or
# multixid values after wraparound. We don't know for sure which, so keep going to
# be sure we see value that's unambiguously a wrapped-around multixid
if row_xmax < 3:
continue
next_multixact_id_before_restart = row_xmax
log.info(
f"next_multixact_id is now at {next_multixact_id_before_restart} or a little higher"
)
break
# We should have observed the state before wraparound
assert observed_before_wraparound
cur.execute("COMMIT")
# Wait until pageserver has received all the data, and restart the endpoint
wait_for_wal_insert_lsn(env, endpoint, tenant_id, timeline_id)
endpoint.stop(mode="immediate") # 'immediate' to avoid writing shutdown checkpoint
endpoint.start()
# Check that the next-multixid value wrapped around correctly
conn = endpoint.connect()
cur = conn.cursor()
cur.execute("select next_multixact_id from pg_control_checkpoint()")
next_multixact_id_after_restart = int(
query_scalar(cur, "select next_multixact_id from pg_control_checkpoint()")
)
log.info(f"next_multixact_id after restart: {next_multixact_id_after_restart}")
assert next_multixact_id_after_restart >= next_multixact_id_before_restart
# The multi-offset should wrap around as well
cur.execute("select next_multi_offset from pg_control_checkpoint()")
next_multi_offset_after_restart = int(
query_scalar(cur, "select next_multi_offset from pg_control_checkpoint()")
)
log.info(f"next_multi_offset after restart: {next_multi_offset_after_restart}")
assert next_multi_offset_after_restart < 100000