From 30027d94a26ad6624e1b0f55d3819a1c4cb8f59d Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Mon, 1 Jul 2024 01:49:49 +0300 Subject: [PATCH 1/6] Fix tracking of the nextMulti in the pageserver's copy of CheckPoint (#6528) Whenever we see an XLOG_MULTIXACT_CREATE_ID WAL record, we need to update the nextMulti and NextMultiOffset fields in the pageserver's copy of the CheckPoint struct, to cover the new multi-XID. In PostgreSQL, this is done by updating an in-memory struct during WAL replay, but because in Neon you can start a compute node at any LSN, we need to have an up-to-date value pre-calculated in the pageserver at all times. We do the same for nextXid. However, we had a bug in WAL ingestion code that does that: the multi-XIDs will wrap around at 2^32, just like XIDs, so we need to do the comparisons in a wraparound-aware fashion. Fix that, and add tests. Fixes issue #6520 Co-authored-by: Konstantin Knizhnik --- libs/postgres_ffi/src/xlog_utils.rs | 22 ++ .../wal_craft/src/xlog_utils_test.rs | 47 +++ pageserver/src/walingest.rs | 29 +- test_runner/regress/test_next_xid.py | 273 ++++++++++++++++++ 4 files changed, 365 insertions(+), 6 deletions(-) diff --git a/libs/postgres_ffi/src/xlog_utils.rs b/libs/postgres_ffi/src/xlog_utils.rs index 0bbb91afc2..d25b23663b 100644 --- a/libs/postgres_ffi/src/xlog_utils.rs +++ b/libs/postgres_ffi/src/xlog_utils.rs @@ -356,6 +356,28 @@ impl CheckPoint { } false } + + /// Advance next multi-XID/offset to those given in arguments. + /// + /// It's important that this handles wraparound correctly. This should match the + /// MultiXactAdvanceNextMXact() logic in PostgreSQL's xlog_redo() function. + /// + /// Returns 'true' if the Checkpoint was updated. + pub fn update_next_multixid(&mut self, multi_xid: u32, multi_offset: u32) -> bool { + let mut modified = false; + + if multi_xid.wrapping_sub(self.nextMulti) as i32 > 0 { + self.nextMulti = multi_xid; + modified = true; + } + + if multi_offset.wrapping_sub(self.nextMultiOffset) as i32 > 0 { + self.nextMultiOffset = multi_offset; + modified = true; + } + + modified + } } /// Generate new, empty WAL segment, with correct block headers at the first diff --git a/libs/postgres_ffi/wal_craft/src/xlog_utils_test.rs b/libs/postgres_ffi/wal_craft/src/xlog_utils_test.rs index 496458b2e4..750affc94e 100644 --- a/libs/postgres_ffi/wal_craft/src/xlog_utils_test.rs +++ b/libs/postgres_ffi/wal_craft/src/xlog_utils_test.rs @@ -202,6 +202,53 @@ pub fn test_update_next_xid() { assert_eq!(checkpoint.nextXid.value, 2048); } +#[test] +pub fn test_update_next_multixid() { + let checkpoint_buf = [0u8; std::mem::size_of::()]; + let mut checkpoint = CheckPoint::decode(&checkpoint_buf).unwrap(); + + // simple case + checkpoint.nextMulti = 20; + checkpoint.nextMultiOffset = 20; + checkpoint.update_next_multixid(1000, 2000); + assert_eq!(checkpoint.nextMulti, 1000); + assert_eq!(checkpoint.nextMultiOffset, 2000); + + // No change + checkpoint.update_next_multixid(500, 900); + assert_eq!(checkpoint.nextMulti, 1000); + assert_eq!(checkpoint.nextMultiOffset, 2000); + + // Close to wraparound, but not wrapped around yet + checkpoint.nextMulti = 0xffff0000; + checkpoint.nextMultiOffset = 0xfffe0000; + checkpoint.update_next_multixid(0xffff00ff, 0xfffe00ff); + assert_eq!(checkpoint.nextMulti, 0xffff00ff); + assert_eq!(checkpoint.nextMultiOffset, 0xfffe00ff); + + // Wraparound + checkpoint.update_next_multixid(1, 900); + assert_eq!(checkpoint.nextMulti, 1); + assert_eq!(checkpoint.nextMultiOffset, 900); + + // Wraparound nextMulti to 0. + // + // It's a bit surprising that nextMulti can be 0, because that's a special value + // (InvalidMultiXactId). However, that's how Postgres does it at multi-xid wraparound: + // nextMulti wraps around to 0, but then when the next multi-xid is assigned, it skips + // the 0 and the next multi-xid actually assigned is 1. + checkpoint.nextMulti = 0xffff0000; + checkpoint.nextMultiOffset = 0xfffe0000; + checkpoint.update_next_multixid(0, 0xfffe00ff); + assert_eq!(checkpoint.nextMulti, 0); + assert_eq!(checkpoint.nextMultiOffset, 0xfffe00ff); + + // Wraparound nextMultiOffset to 0 + checkpoint.update_next_multixid(0, 0); + assert_eq!(checkpoint.nextMulti, 0); + assert_eq!(checkpoint.nextMultiOffset, 0); +} + #[test] pub fn test_encode_logical_message() { let expected = [ diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 4f26f2f6d1..fb10bca5a6 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -1384,14 +1384,31 @@ impl WalIngest { // Note: The multixact members can wrap around, even within one WAL record. offset = offset.wrapping_add(n_this_page as u32); } - if xlrec.mid >= self.checkpoint.nextMulti { - self.checkpoint.nextMulti = xlrec.mid + 1; - self.checkpoint_modified = true; - } - if xlrec.moff + xlrec.nmembers > self.checkpoint.nextMultiOffset { - self.checkpoint.nextMultiOffset = xlrec.moff + xlrec.nmembers; + let next_offset = offset; + assert!(xlrec.moff.wrapping_add(xlrec.nmembers) == next_offset); + + // Update next-multi-xid and next-offset + // + // NB: In PostgreSQL, the next-multi-xid stored in the control file is allowed to + // go to 0, and it's fixed up by skipping to FirstMultiXactId in functions that + // read it, like GetNewMultiXactId(). This is different from how nextXid is + // incremented! nextXid skips over < FirstNormalTransactionId when the the value + // is stored, so it's never 0 in a checkpoint. + // + // I don't know why it's done that way, it seems less error-prone to skip over 0 + // when the value is stored rather than when it's read. But let's do it the same + // way here. + let next_multi_xid = xlrec.mid.wrapping_add(1); + + if self + .checkpoint + .update_next_multixid(next_multi_xid, next_offset) + { self.checkpoint_modified = true; } + + // Also update the next-xid with the highest member. According to the comments in + // multixact_redo(), this shouldn't be necessary, but let's do the same here. let max_mbr_xid = xlrec.members.iter().fold(None, |acc, mbr| { if let Some(max_xid) = acc { if mbr.xid.wrapping_sub(max_xid) as i32 > 0 { diff --git a/test_runner/regress/test_next_xid.py b/test_runner/regress/test_next_xid.py index b9e7e642b5..51e847135e 100644 --- a/test_runner/regress/test_next_xid.py +++ b/test_runner/regress/test_next_xid.py @@ -7,6 +7,7 @@ from fixtures.log_helper import log from fixtures.neon_fixtures import ( NeonEnvBuilder, PgBin, + VanillaPostgres, import_timeline_from_vanilla_postgres, wait_for_wal_insert_lsn, ) @@ -182,3 +183,275 @@ def test_import_at_2bil( cur = conn.cursor() cur.execute("SELECT count(*) from t") assert cur.fetchone() == (10000 + 1 + 1,) + + +# Constants and macros copied from PostgreSQL multixact.c and headers. These are needed to +# calculate the SLRU segments that a particular multixid or multixid-offsets falls into. +BLCKSZ = 8192 +MULTIXACT_OFFSETS_PER_PAGE = int(BLCKSZ / 4) +SLRU_PAGES_PER_SEGMENT = int(32) +MXACT_MEMBER_BITS_PER_XACT = 8 +MXACT_MEMBER_FLAGS_PER_BYTE = 1 +MULTIXACT_FLAGBYTES_PER_GROUP = 4 +MULTIXACT_MEMBERS_PER_MEMBERGROUP = MULTIXACT_FLAGBYTES_PER_GROUP * MXACT_MEMBER_FLAGS_PER_BYTE +MULTIXACT_MEMBERGROUP_SIZE = 4 * MULTIXACT_MEMBERS_PER_MEMBERGROUP + MULTIXACT_FLAGBYTES_PER_GROUP +MULTIXACT_MEMBERGROUPS_PER_PAGE = int(BLCKSZ / MULTIXACT_MEMBERGROUP_SIZE) +MULTIXACT_MEMBERS_PER_PAGE = MULTIXACT_MEMBERGROUPS_PER_PAGE * MULTIXACT_MEMBERS_PER_MEMBERGROUP + + +def MultiXactIdToOffsetSegment(xid: int): + return int(xid / (SLRU_PAGES_PER_SEGMENT * MULTIXACT_OFFSETS_PER_PAGE)) + + +def MXOffsetToMemberSegment(off: int): + return int(off / (SLRU_PAGES_PER_SEGMENT * MULTIXACT_MEMBERS_PER_PAGE)) + + +def advance_multixid_to( + pg_bin: PgBin, vanilla_pg: VanillaPostgres, next_multi_xid: int, next_multi_offset: int +): + """ + Use pg_resetwal to advance the nextMulti and nextMultiOffset values in a stand-alone + Postgres cluster. This is useful to get close to wraparound or some other interesting + value, without having to burn a lot of time consuming the (multi-)XIDs one by one. + + The new values should be higher than the old ones, in a wraparound-aware sense. + + On entry, the server should be running. It will be shut down and restarted. + """ + + # Read old values from the last checkpoint. We will pass the old oldestMultiXid value + # back to pg_resetwal, there's no option to leave it alone. + with vanilla_pg.connect() as conn: + with conn.cursor() as cur: + # Make sure the oldest-multi-xid value in the control file is up-to-date + cur.execute("checkpoint") + cur.execute("select oldest_multi_xid, next_multixact_id from pg_control_checkpoint()") + rec = cur.fetchone() + assert rec is not None + (ckpt_oldest_multi_xid, ckpt_next_multi_xid) = rec + log.info(f"oldestMultiXid was {ckpt_oldest_multi_xid}, nextMultiXid was {ckpt_next_multi_xid}") + log.info(f"Resetting to {next_multi_xid}") + + # Use pg_resetwal to reset the next multiXid and multiOffset to given values. + vanilla_pg.stop() + pg_resetwal_path = os.path.join(pg_bin.pg_bin_path, "pg_resetwal") + cmd = [ + pg_resetwal_path, + f"--multixact-ids={next_multi_xid},{ckpt_oldest_multi_xid}", + f"--multixact-offset={next_multi_offset}", + "-D", + str(vanilla_pg.pgdatadir), + ] + pg_bin.run_capture(cmd) + + # Because we skip over a lot of values, Postgres hasn't created the SLRU segments for + # the new values yet. Create them manually, to allow Postgres to start up. + # + # This leaves "gaps" in the SLRU where segments between old value and new value are + # missing. That's OK for our purposes. Autovacuum will print some warnings about the + # missing segments, but will clean it up by truncating the SLRUs up to the new value, + # closing the gap. + segname = "%04X" % MultiXactIdToOffsetSegment(next_multi_xid) + log.info(f"Creating dummy segment pg_multixact/offsets/{segname}") + with open(vanilla_pg.pgdatadir / "pg_multixact" / "offsets" / segname, "w") as of: + of.write("\0" * SLRU_PAGES_PER_SEGMENT * BLCKSZ) + of.flush() + + segname = "%04X" % MXOffsetToMemberSegment(next_multi_offset) + log.info(f"Creating dummy segment pg_multixact/members/{segname}") + with open(vanilla_pg.pgdatadir / "pg_multixact" / "members" / segname, "w") as of: + of.write("\0" * SLRU_PAGES_PER_SEGMENT * BLCKSZ) + of.flush() + + # Start Postgres again and wait until autovacuum has processed all the databases + # + # This allows truncating the SLRUs, fixing the gaps with missing segments. + vanilla_pg.start() + with vanilla_pg.connect().cursor() as cur: + for _ in range(1000): + datminmxid = int( + query_scalar(cur, "select min(datminmxid::text::int8) from pg_database") + ) + log.info(f"datminmxid {datminmxid}") + if next_multi_xid - datminmxid < 1_000_000: # not wraparound-aware! + break + time.sleep(0.5) + + +def test_multixid_wraparound_import( + neon_env_builder: NeonEnvBuilder, + test_output_dir: Path, + pg_bin: PgBin, + vanilla_pg, +): + """ + Test that the wraparound of the "next-multi-xid" counter is handled correctly in + pageserver, And multi-offsets as well + """ + env = neon_env_builder.init_start() + + # In order to to test multixid wraparound, we need to first advance the counter to + # within spitting distance of the wraparound, that is 2^32 multi-XIDs. We could simply + # run a workload that consumes a lot of multi-XIDs until we approach that, but that + # takes a very long time. So we cheat. + # + # Our strategy is to create a vanilla Postgres cluster, and use pg_resetwal to + # directly set the multi-xid counter a higher value. However, we cannot directly set + # it to just before 2^32 (~ 4 billion), because that would make the exisitng + # 'relminmxid' values to look like they're in the future. It's not clear how the + # system would behave in that situation. So instead, we bump it up ~ 1 billion + # multi-XIDs at a time, and let autovacuum to process all the relations and update + # 'relminmxid' between each run. + # + # XXX: For the multi-offsets, most of the bump is done in the last call. This is + # because advancing it ~ 1 billion at a time hit a pathological case in the + # MultiXactMemberFreezeThreshold() function, causing autovacuum not trigger multixid + # freezing. See + # https://www.postgresql.org/message-id/85fb354c-f89f-4d47-b3a2-3cbd461c90a3%40iki.fi + # Multi-offsets don't have the same wraparound problems at 2 billion mark as + # multi-xids do, so one big jump is fine. + vanilla_pg.configure( + [ + "log_autovacuum_min_duration = 0", + # Perform anti-wraparound vacuuming aggressively + "autovacuum_naptime='1 s'", + "autovacuum_freeze_max_age = 1000000", + "autovacuum_multixact_freeze_max_age = 1000000", + ], + ) + vanilla_pg.start() + advance_multixid_to(pg_bin, vanilla_pg, 0x40000000, 0x10000000) + advance_multixid_to(pg_bin, vanilla_pg, 0x80000000, 0x20000000) + advance_multixid_to(pg_bin, vanilla_pg, 0xC0000000, 0x30000000) + advance_multixid_to(pg_bin, vanilla_pg, 0xFFFFFF00, 0xFFFFFF00) + + vanilla_pg.safe_psql("create user cloud_admin with password 'postgres' superuser") + vanilla_pg.safe_psql("create table tt as select g as id from generate_series(1, 10) g") + vanilla_pg.safe_psql("CHECKPOINT") + + # Import the cluster to the pageserver + tenant_id = TenantId.generate() + env.pageserver.tenant_create(tenant_id) + timeline_id = TimelineId.generate() + import_timeline_from_vanilla_postgres( + test_output_dir, + env, + pg_bin, + tenant_id, + timeline_id, + "imported_multixid_wraparound_test", + vanilla_pg.connstr(), + ) + vanilla_pg.stop() + + endpoint = env.endpoints.create_start( + "imported_multixid_wraparound_test", + tenant_id=tenant_id, + config_lines=[ + "log_autovacuum_min_duration = 0", + "autovacuum_naptime='5 s'", + "autovacuum=off", + ], + ) + conn = endpoint.connect() + cur = conn.cursor() + assert query_scalar(cur, "select count(*) from tt") == 10 # sanity check + + # Install extension containing function needed for test + cur.execute("CREATE EXTENSION neon_test_utils") + + # Consume a lot of XIDs, just to advance the XIDs to different range than the + # multi-xids. That avoids confusion while debugging + cur.execute("select test_consume_xids(100000)") + cur.execute("select pg_switch_wal()") + cur.execute("checkpoint") + + # Use subtransactions so that each row in 'tt' is stamped with different XID. Leave + # the transaction open. + cur.execute("BEGIN") + cur.execute( + """ +do $$ +declare + idvar int; +begin + for idvar in select id from tt loop + begin + update tt set id = idvar where id = idvar; + exception when others then + raise 'didn''t expect an error: %', sqlerrm; + end; + end loop; +end; +$$; +""" + ) + + # In a different transaction, acquire a FOR KEY SHARE lock on each row. This generates + # a new multixid for each row, with the previous xmax and this transaction's XID as the + # members. + # + # Repeat this until the multi-xid counter wraps around. + conn3 = endpoint.connect() + cur3 = conn3.cursor() + next_multixact_id_before_restart = 0 + observed_before_wraparound = False + while True: + cur3.execute("BEGIN") + cur3.execute("SELECT * FROM tt FOR KEY SHARE") + + # Get the xmax of one of the rows we locked. It should be a multi-xid. It might + # not be the latest one, but close enough. + row_xmax = int(query_scalar(cur3, "SELECT xmax FROM tt LIMIT 1")) + cur3.execute("COMMIT") + log.info(f"observed a row with xmax {row_xmax}") + + # High value means not wrapped around yet + if row_xmax >= 0xFFFFFF00: + observed_before_wraparound = True + continue + + # xmax should not be a regular XID. (We bumped up the regular XID range earlier + # to around 100000 and above.) + assert row_xmax < 100 + + # xmax values < FirstNormalTransactionId (== 3) could be special XID values, or + # multixid values after wraparound. We don't know for sure which, so keep going to + # be sure we see value that's unambiguously a wrapped-around multixid + if row_xmax < 3: + continue + + next_multixact_id_before_restart = row_xmax + log.info( + f"next_multixact_id is now at {next_multixact_id_before_restart} or a little higher" + ) + break + + # We should have observed the state before wraparound + assert observed_before_wraparound + + cur.execute("COMMIT") + + # Wait until pageserver has received all the data, and restart the endpoint + wait_for_wal_insert_lsn(env, endpoint, tenant_id, timeline_id) + endpoint.stop(mode="immediate") # 'immediate' to avoid writing shutdown checkpoint + endpoint.start() + + # Check that the next-multixid value wrapped around correctly + conn = endpoint.connect() + cur = conn.cursor() + cur.execute("select next_multixact_id from pg_control_checkpoint()") + next_multixact_id_after_restart = int( + query_scalar(cur, "select next_multixact_id from pg_control_checkpoint()") + ) + log.info(f"next_multixact_id after restart: {next_multixact_id_after_restart}") + assert next_multixact_id_after_restart >= next_multixact_id_before_restart + + # The multi-offset should wrap around as well + cur.execute("select next_multi_offset from pg_control_checkpoint()") + next_multi_offset_after_restart = int( + query_scalar(cur, "select next_multi_offset from pg_control_checkpoint()") + ) + log.info(f"next_multi_offset after restart: {next_multi_offset_after_restart}") + assert next_multi_offset_after_restart < 100000 From 57535c039c938f7c179693d9db8b052912019823 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Mon, 1 Jul 2024 11:23:31 +0300 Subject: [PATCH 2/6] tests: remove a leftover 'running' flag (#8216) The 'running' boolean was replaced with a semaphore in commit f0e2bb79b2, but this initialization was missed. Remove it so that if a test tries to access it, you get an error rather than always claiming that the endpoint is not running. Spotted by Arseny at https://github.com/neondatabase/neon/pull/7288#discussion_r1660068657 --- test_runner/fixtures/neon_fixtures.py | 1 - 1 file changed, 1 deletion(-) diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 4911917bf4..a1cb1b5195 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -3491,7 +3491,6 @@ class Endpoint(PgProtocol, LogUtils): ): super().__init__(host="localhost", port=pg_port, user="cloud_admin", dbname="postgres") self.env = env - self.running = False self.branch_name: Optional[str] = None # dubious self.endpoint_id: Optional[str] = None # dubious, see asserts below self.pgdata_dir: Optional[str] = None # Path to computenode PGDATA From 75c84c846a2517cbbe414ae5f3e0649f4a359036 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Mon, 1 Jul 2024 12:58:08 +0300 Subject: [PATCH 3/6] tests: Make neon_xlogflush() flush all WAL, if you omit the LSN arg This makes it much more convenient to use in the common case that you want to flush all the WAL. (Passing pg_current_wal_insert_lsn() as the argument doesn't work for the same reasons as explained in the comments: we need to be back off to the beginning of a page if the previous record ended at page boundary.) I plan to use this to fix the issue that Arseny Sher called out at https://github.com/neondatabase/neon/pull/7288#discussion_r1660063852 --- pgxn/neon_test_utils/Makefile | 2 +- ...tils--1.1.sql => neon_test_utils--1.2.sql} | 2 +- pgxn/neon_test_utils/neon_test_utils.control | 2 +- pgxn/neon_test_utils/neontest.c | 38 ++++++++++++++++++- 4 files changed, 40 insertions(+), 4 deletions(-) rename pgxn/neon_test_utils/{neon_test_utils--1.1.sql => neon_test_utils--1.2.sql} (96%) diff --git a/pgxn/neon_test_utils/Makefile b/pgxn/neon_test_utils/Makefile index 1ee87357e5..1371272439 100644 --- a/pgxn/neon_test_utils/Makefile +++ b/pgxn/neon_test_utils/Makefile @@ -7,7 +7,7 @@ OBJS = \ neontest.o EXTENSION = neon_test_utils -DATA = neon_test_utils--1.1.sql +DATA = neon_test_utils--1.2.sql PGFILEDESC = "neon_test_utils - helpers for neon testing and debugging" PG_CONFIG = pg_config diff --git a/pgxn/neon_test_utils/neon_test_utils--1.1.sql b/pgxn/neon_test_utils/neon_test_utils--1.2.sql similarity index 96% rename from pgxn/neon_test_utils/neon_test_utils--1.1.sql rename to pgxn/neon_test_utils/neon_test_utils--1.2.sql index 534784f319..f84a24ec8d 100644 --- a/pgxn/neon_test_utils/neon_test_utils--1.1.sql +++ b/pgxn/neon_test_utils/neon_test_utils--1.2.sql @@ -41,7 +41,7 @@ RETURNS bytea AS 'MODULE_PATHNAME', 'get_raw_page_at_lsn_ex' LANGUAGE C PARALLEL UNSAFE; -CREATE FUNCTION neon_xlogflush(lsn pg_lsn) +CREATE FUNCTION neon_xlogflush(lsn pg_lsn DEFAULT NULL) RETURNS VOID AS 'MODULE_PATHNAME', 'neon_xlogflush' LANGUAGE C PARALLEL UNSAFE; diff --git a/pgxn/neon_test_utils/neon_test_utils.control b/pgxn/neon_test_utils/neon_test_utils.control index 5f6d640835..c7b9191ddc 100644 --- a/pgxn/neon_test_utils/neon_test_utils.control +++ b/pgxn/neon_test_utils/neon_test_utils.control @@ -1,6 +1,6 @@ # neon_test_utils extension comment = 'helpers for neon testing and debugging' -default_version = '1.1' +default_version = '1.2' module_pathname = '$libdir/neon_test_utils' relocatable = true trusted = true diff --git a/pgxn/neon_test_utils/neontest.c b/pgxn/neon_test_utils/neontest.c index 47f245fbf1..944936d395 100644 --- a/pgxn/neon_test_utils/neontest.c +++ b/pgxn/neon_test_utils/neontest.c @@ -15,6 +15,7 @@ #include "access/relation.h" #include "access/xact.h" #include "access/xlog.h" +#include "access/xlog_internal.h" #include "catalog/namespace.h" #include "fmgr.h" #include "funcapi.h" @@ -444,11 +445,46 @@ get_raw_page_at_lsn_ex(PG_FUNCTION_ARGS) /* * Directly calls XLogFlush(lsn) to flush WAL buffers. + * + * If 'lsn' is not specified (is NULL), flush all generated WAL. */ Datum neon_xlogflush(PG_FUNCTION_ARGS) { - XLogRecPtr lsn = PG_GETARG_LSN(0); + XLogRecPtr lsn; + + if (RecoveryInProgress()) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("recovery is in progress"), + errhint("cannot flush WAL during recovery."))); + + if (!PG_ARGISNULL(0)) + lsn = PG_GETARG_LSN(0); + else + { + lsn = GetXLogInsertRecPtr(); + + /*--- + * The LSN returned by GetXLogInsertRecPtr() is the position where the + * next inserted record would begin. If the last record ended just at + * the page boundary, the next record will begin after the page header + * on the next page, and that's what GetXLogInsertRecPtr().returns, + * but the page header has not been written yet. If we tried to flush + * it, XLogFlush() would throw an error: + * + * ERROR : xlog flush request %X/%X is not satisfied --- flushed only to %X/%X + * + * To avoid that, if the insert position points to just after the page + * header, back off to page boundary. + */ + if (lsn % XLOG_BLCKSZ == SizeOfXLogShortPHD && + XLogSegmentOffset(lsn, wal_segment_size) > XLOG_BLCKSZ) + lsn -= SizeOfXLogShortPHD; + else if (lsn % XLOG_BLCKSZ == SizeOfXLogLongPHD && + XLogSegmentOffset(lsn, wal_segment_size) < XLOG_BLCKSZ) + lsn -= SizeOfXLogLongPHD; + } XLogFlush(lsn); PG_RETURN_VOID(); From 9ce193082a26714400a788f96e0c0cf95c7879df Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Mon, 1 Jul 2024 12:58:12 +0300 Subject: [PATCH 4/6] Restore running xacts from CLOG on replica startup (#7288) We have one pretty serious MVCC visibility bug with hot standby replicas. We incorrectly treat any transactions that are in progress in the primary, when the standby is started, as aborted. That can break MVCC for queries running concurrently in the standby. It can also lead to hint bits being set incorrectly, and that damage can last until the replica is restarted. The fundamental bug was that we treated any replica start as starting from a shut down server. The fix for that is straightforward: we need to set 'wasShutdown = false' in InitWalRecovery() (see changes in the postgres repo). However, that introduces a new problem: with wasShutdown = false, the standby will not open up for queries until it receives a running-xacts WAL record from the primary. That's correct, and that's how Postgres hot standby always works. But it's a problem for Neon, because: * It changes the historical behavior for existing users. Currently, the standby immediately opens up for queries, so if they now need to wait, we can breka existing use cases that were working fine (assuming you don't hit the MVCC issues). * The problem is much worse for Neon than it is for standalone PostgreSQL, because in Neon, we can start a replica from an arbitrary LSN. In standalone PostgreSQL, the replica always starts WAL replay from a checkpoint record, and the primary arranges things so that there is always a running-xacts record soon after each checkpoint record. You can still hit this issue with PostgreSQL if you have a transaction with lots of subtransactions running in the primary, but it's pretty rare in practice. To mitigate that, we introduce another way to collect the running-xacts information at startup, without waiting for the running-xacts WAL record: We can the CLOG for XIDs that haven't been marked as committed or aborted. It has limitations with subtransactions too, but should mitigate the problem for most users. See https://github.com/neondatabase/neon/issues/7236. Co-authored-by: Konstantin Knizhnik --- pageserver/src/walingest.rs | 40 +- pgxn/neon/neon.c | 293 ++++++++ test_runner/fixtures/neon_fixtures.py | 4 +- test_runner/fixtures/pageserver/utils.py | 2 +- test_runner/regress/test_replica_start.py | 646 ++++++++++++++++++ test_runner/regress/test_replication_start.py | 32 - vendor/postgres-v14 | 2 +- vendor/postgres-v15 | 2 +- vendor/postgres-v16 | 2 +- vendor/revisions.json | 6 +- 10 files changed, 981 insertions(+), 48 deletions(-) create mode 100644 test_runner/regress/test_replica_start.py delete mode 100644 test_runner/regress/test_replication_start.py diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index fb10bca5a6..07c90385e6 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -343,7 +343,33 @@ impl WalIngest { xlog_checkpoint.oldestActiveXid, self.checkpoint.oldestActiveXid ); - self.checkpoint.oldestActiveXid = xlog_checkpoint.oldestActiveXid; + + // A shutdown checkpoint has `oldestActiveXid == InvalidTransactionid`, + // because at shutdown, all in-progress transactions will implicitly + // end. Postgres startup code knows that, and allows hot standby to start + // immediately from a shutdown checkpoint. + // + // In Neon, Postgres hot standby startup always behaves as if starting from + // an online checkpoint. It needs a valid `oldestActiveXid` value, so + // instead of overwriting self.checkpoint.oldestActiveXid with + // InvalidTransactionid from the checkpoint WAL record, update it to a + // proper value, knowing that there are no in-progress transactions at this + // point, except for prepared transactions. + // + // See also the neon code changes in the InitWalRecovery() function. + if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID + && info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN + { + let mut oldest_active_xid = self.checkpoint.nextXid.value as u32; + for xid in modification.tline.list_twophase_files(lsn, ctx).await? { + if (xid.wrapping_sub(oldest_active_xid) as i32) < 0 { + oldest_active_xid = xid; + } + } + self.checkpoint.oldestActiveXid = oldest_active_xid; + } else { + self.checkpoint.oldestActiveXid = xlog_checkpoint.oldestActiveXid; + } // Write a new checkpoint key-value pair on every checkpoint record, even // if nothing really changed. Not strictly required, but it seems nice to @@ -375,6 +401,7 @@ impl WalIngest { if info == pg_constants::XLOG_RUNNING_XACTS { let xlrec = crate::walrecord::XlRunningXacts::decode(&mut buf); self.checkpoint.oldestActiveXid = xlrec.oldest_running_xid; + self.checkpoint_modified = true; } } pg_constants::RM_REPLORIGIN_ID => { @@ -1277,13 +1304,10 @@ impl WalIngest { xlrec.pageno, xlrec.oldest_xid, xlrec.oldest_xid_db ); - // Here we treat oldestXid and oldestXidDB - // differently from postgres redo routines. - // In postgres checkpoint.oldestXid lags behind xlrec.oldest_xid - // until checkpoint happens and updates the value. - // Here we can use the most recent value. - // It's just an optimization, though and can be deleted. - // TODO Figure out if there will be any issues with replica. + // In Postgres, oldestXid and oldestXidDB are updated in memory when the CLOG is + // truncated, but a checkpoint record with the updated values isn't written until + // later. In Neon, a server can start at any LSN, not just on a checkpoint record, + // so we keep the oldestXid and oldestXidDB up-to-date. self.checkpoint.oldestXid = xlrec.oldest_xid; self.checkpoint.oldestXidDB = xlrec.oldest_xid_db; self.checkpoint_modified = true; diff --git a/pgxn/neon/neon.c b/pgxn/neon/neon.c index b6b2db7e71..e4968bdf89 100644 --- a/pgxn/neon/neon.c +++ b/pgxn/neon/neon.c @@ -12,6 +12,8 @@ #include "fmgr.h" #include "miscadmin.h" +#include "access/subtrans.h" +#include "access/twophase.h" #include "access/xact.h" #include "access/xlog.h" #include "storage/buf_internals.h" @@ -22,10 +24,12 @@ #include "replication/logical.h" #include "replication/slot.h" #include "replication/walsender.h" +#include "storage/proc.h" #include "storage/procsignal.h" #include "tcop/tcopprot.h" #include "funcapi.h" #include "access/htup_details.h" +#include "utils/builtins.h" #include "utils/pg_lsn.h" #include "utils/guc.h" #include "utils/wait_event.h" @@ -266,6 +270,293 @@ LogicalSlotsMonitorMain(Datum main_arg) } } +/* + * XXX: These private to procarray.c, but we need them here. + */ +#define PROCARRAY_MAXPROCS (MaxBackends + max_prepared_xacts) +#define TOTAL_MAX_CACHED_SUBXIDS \ + ((PGPROC_MAX_CACHED_SUBXIDS + 1) * PROCARRAY_MAXPROCS) + +/* + * Restore running-xact information by scanning the CLOG at startup. + * + * In PostgreSQL, a standby always has to wait for a running-xacts WAL record + * to arrive before it can start accepting queries. Furthermore, if there are + * transactions with too many subxids (> 64) open to fit in the in-memory + * subxids cache, the running-xacts record will be marked as "suboverflowed", + * and the standby will need to also wait for the currently in-progress + * transactions to finish. + * + * That's not great in PostgreSQL, because a hot standby does not necessary + * open up for queries immediately as you might expect. But it's worse in + * Neon: A standby in Neon doesn't need to start WAL replay from a checkpoint + * record; it can start at any LSN. Postgres arranges things so that there is + * a running-xacts record soon after every checkpoint record, but when you + * start from an arbitrary LSN, that doesn't help. If the primary is idle, or + * not running at all, it might never write a new running-xacts record, + * leaving the replica in a limbo where it can never start accepting queries. + * + * To mitigate that, we have an additional mechanism to find the running-xacts + * information: we scan the CLOG, making note of any XIDs not marked as + * committed or aborted. They are added to the Postgres known-assigned XIDs + * array by calling ProcArrayApplyRecoveryInfo() in the caller of this + * function. + * + * There is one big limitation with that mechanism: The size of the + * known-assigned XIDs is limited, so if there are a lot of in-progress XIDs, + * we have to give up. Furthermore, we don't know how many of the in-progress + * XIDs are subtransactions, and if we use up all the space in the + * known-assigned XIDs array for subtransactions, we might run out of space in + * the array later during WAL replay, causing the replica to shut down with + * "ERROR: too many KnownAssignedXids". The safe # of XIDs that we can add to + * the known-assigned array without risking that error later is very low, + * merely PGPROC_MAX_CACHED_SUBXIDS == 64, so we take our chances and use up + * to half of the known-assigned XIDs array for the subtransactions, even + * though that risks getting the error later. + * + * Note: It's OK if the recovered list of XIDs includes some transactions that + * have crashed in the primary, and hence will never commit. They will be seen + * as in-progress, until we see a new next running-acts record with an + * oldestActiveXid that invalidates them. That's how the known-assigned XIDs + * array always works. + * + * If scraping the CLOG doesn't succeed for some reason, like the subxid + * overflow, Postgres will fall back to waiting for a running-xacts record + * like usual. + * + * Returns true if a complete list of in-progress XIDs was scraped. + */ +static bool +RestoreRunningXactsFromClog(CheckPoint *checkpoint, TransactionId **xids, int *nxids) +{ + TransactionId from; + TransactionId till; + int max_xcnt; + TransactionId *prepared_xids = NULL; + int n_prepared_xids; + TransactionId *restored_xids = NULL; + int n_restored_xids; + int next_prepared_idx; + + Assert(*xids == NULL); + + /* + * If the checkpoint doesn't have a valid oldestActiveXid, bail out. We + * don't know where to start the scan. + * + * This shouldn't happen, because the pageserver always maintains a valid + * oldestActiveXid nowadays. Except when starting at an old point in time + * that was ingested before the pageserver was taught to do that. + */ + if (!TransactionIdIsValid(checkpoint->oldestActiveXid)) + { + elog(LOG, "cannot restore running-xacts from CLOG because oldestActiveXid is not set"); + goto fail; + } + + /* + * We will scan the CLOG starting from the oldest active XID. + * + * In some corner cases, the oldestActiveXid from the last checkpoint + * might already have been truncated from the CLOG. That is, + * oldestActiveXid might be older than oldestXid. That's possible because + * oldestActiveXid is only updated at checkpoints. After the last + * checkpoint, the oldest transaction might have committed, and the CLOG + * might also have been already truncated. So if oldestActiveXid is older + * than oldestXid, start at oldestXid instead. (Otherwise we'd try to + * access CLOG segments that have already been truncated away.) + */ + from = TransactionIdPrecedes(checkpoint->oldestXid, checkpoint->oldestActiveXid) + ? checkpoint->oldestActiveXid : checkpoint->oldestXid; + till = XidFromFullTransactionId(checkpoint->nextXid); + + /* + * To avoid "too many KnownAssignedXids" error later during replay, we + * limit number of collected transactions. This is a tradeoff: if we are + * willing to consume more of the KnownAssignedXids space for the XIDs + * now, that allows us to start up, but we might run out of space later. + * + * The size of the KnownAssignedXids array is TOTAL_MAX_CACHED_SUBXIDS, + * which is (PGPROC_MAX_CACHED_SUBXIDS + 1) * PROCARRAY_MAXPROCS). In + * PostgreSQL, that's always enough because the primary will always write + * an XLOG_XACT_ASSIGNMENT record if a transaction has more than + * PGPROC_MAX_CACHED_SUBXIDS subtransactions. Seeing that record allows + * the standby to mark the XIDs in pg_subtrans and removing them from the + * KnowingAssignedXids array. + * + * Here, we don't know which XIDs belong to subtransactions that have + * already been WAL-logged with an XLOG_XACT_ASSIGNMENT record. If we + * wanted to be totally safe and avoid the possibility of getting a "too + * many KnownAssignedXids" error later, we would have to limit ourselves + * to PGPROC_MAX_CACHED_SUBXIDS, which is not much. And that includes top + * transaction IDs too, because we cannot distinguish between top + * transaction IDs and subtransactions here. + * + * Somewhat arbitrarily, we use up to half of KnownAssignedXids. That + * strikes a sensible balance between being useful, and risking a "too + * many KnownAssignedXids" error later. + */ + max_xcnt = TOTAL_MAX_CACHED_SUBXIDS / 2; + + /* + * Collect XIDs of prepared transactions in an array. This includes only + * their top-level XIDs. We assume that StandbyRecoverPreparedTransactions + * has already been called, so we can find all the sub-transactions in + * pg_subtrans. + */ + PrescanPreparedTransactions(&prepared_xids, &n_prepared_xids); + qsort(prepared_xids, n_prepared_xids, sizeof(TransactionId), xidLogicalComparator); + + /* + * Scan the CLOG, collecting in-progress XIDs into 'restored_xids'. + */ + elog(DEBUG1, "scanning CLOG between %u and %u for in-progress XIDs", from, till); + restored_xids = (TransactionId *) palloc(max_xcnt * sizeof(TransactionId)); + n_restored_xids = 0; + next_prepared_idx = 0; + for (TransactionId xid = from; xid != till;) + { + XLogRecPtr xidlsn; + XidStatus xidstatus; + + xidstatus = TransactionIdGetStatus(xid, &xidlsn); + + /* + * "Merge" the prepared transactions into the restored_xids array as + * we go. The prepared transactions array is sorted. This is mostly + * a sanity check to ensure that all the prpeared transactions are + * seen as in-progress. (There is a check after the loop that we didn't + * miss any.) + */ + if (next_prepared_idx < n_prepared_xids && xid == prepared_xids[next_prepared_idx]) + { + /* + * This is a top-level transaction ID of a prepared transaction. + * Include it in the array. + */ + + /* sanity check */ + if (xidstatus != TRANSACTION_STATUS_IN_PROGRESS) + { + elog(LOG, "prepared transaction %u has unexpected status %X, cannot restore running-xacts from CLOG", + xid, xidstatus); + Assert(false); + goto fail; + } + + elog(DEBUG1, "XID %u: was next prepared xact (%d / %d)", xid, next_prepared_idx, n_prepared_xids); + next_prepared_idx++; + } + else if (xidstatus == TRANSACTION_STATUS_COMMITTED) + { + elog(DEBUG1, "XID %u: was committed", xid); + goto skip; + } + else if (xidstatus == TRANSACTION_STATUS_ABORTED) + { + elog(DEBUG1, "XID %u: was aborted", xid); + goto skip; + } + else if (xidstatus == TRANSACTION_STATUS_IN_PROGRESS) + { + /* + * In-progress transactions are included in the array. + * + * Except subtransactions of the prepared transactions. They are + * already set in pg_subtrans, and hence don't need to be tracked + * in the known-assigned XIDs array. + */ + if (n_prepared_xids > 0) + { + TransactionId parent = SubTransGetParent(xid); + + if (TransactionIdIsValid(parent)) + { + /* + * This is a subtransaction belonging to a prepared + * transaction. + * + * Sanity check that it is in the prepared XIDs array. It + * should be, because StandbyRecoverPreparedTransactions + * populated pg_subtrans, and no other XID should be set + * in it yet. (This also relies on the fact that + * StandbyRecoverPreparedTransactions sets the parent of + * each subxid to point directly to the top-level XID, + * rather than restoring the original subtransaction + * hierarchy.) + */ + if (bsearch(&parent, prepared_xids, next_prepared_idx, + sizeof(TransactionId), xidLogicalComparator) == NULL) + { + elog(LOG, "sub-XID %u has unexpected parent %u, cannot restore running-xacts from CLOG", + xid, parent); + Assert(false); + goto fail; + } + elog(DEBUG1, "XID %u: was a subtransaction of prepared xid %u", xid, parent); + goto skip; + } + } + + /* include it in the array */ + elog(DEBUG1, "XID %u: is in progress", xid); + } + else + { + /* + * SUB_COMMITTED is a transient state used at commit. We don't + * expect to see that here. + */ + elog(LOG, "XID %u has unexpected status %X in pg_xact, cannot restore running-xacts from CLOG", + xid, xidstatus); + Assert(false); + goto fail; + } + + if (n_restored_xids >= max_xcnt) + { + /* + * Overflowed. We won't be able to install the RunningTransactions + * snapshot. + */ + elog(LOG, "too many running xacts to restore from the CLOG; oldestXid=%u oldestActiveXid=%u nextXid %u", + checkpoint->oldestXid, checkpoint->oldestActiveXid, + XidFromFullTransactionId(checkpoint->nextXid)); + goto fail; + } + + restored_xids[n_restored_xids++] = xid; + + skip: + TransactionIdAdvance(xid); + continue; + } + + /* sanity check */ + if (next_prepared_idx != n_prepared_xids) + { + elog(LOG, "prepared transaction ID %u was not visited in the CLOG scan, cannot restore running-xacts from CLOG", + prepared_xids[next_prepared_idx]); + Assert(false); + goto fail; + } + + elog(LOG, "restored %d running xacts by scanning the CLOG; oldestXid=%u oldestActiveXid=%u nextXid %u", + n_restored_xids, checkpoint->oldestXid, checkpoint->oldestActiveXid, XidFromFullTransactionId(checkpoint->nextXid)); + *nxids = n_restored_xids; + *xids = restored_xids; + return true; + + fail: + *nxids = 0; + *xids = NULL; + if (restored_xids) + pfree(restored_xids); + if (prepared_xids) + pfree(prepared_xids); + return false; +} + void _PG_init(void) { @@ -288,6 +579,8 @@ _PG_init(void) pg_init_extension_server(); + restore_running_xacts_callback = RestoreRunningXactsFromClog; + /* * Important: This must happen after other parts of the extension are * loaded, otherwise any settings to GUCs that were set before the diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index a1cb1b5195..e1c8514351 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -3856,7 +3856,9 @@ class EndpointFactory: return self - def new_replica(self, origin: Endpoint, endpoint_id: str, config_lines: Optional[List[str]]): + def new_replica( + self, origin: Endpoint, endpoint_id: str, config_lines: Optional[List[str]] = None + ): branch_name = origin.branch_name assert origin in self.endpoints assert branch_name is not None diff --git a/test_runner/fixtures/pageserver/utils.py b/test_runner/fixtures/pageserver/utils.py index 60535b7592..b75a480a63 100644 --- a/test_runner/fixtures/pageserver/utils.py +++ b/test_runner/fixtures/pageserver/utils.py @@ -198,7 +198,7 @@ def wait_for_last_record_lsn( lsn: Lsn, ) -> Lsn: """waits for pageserver to catch up to a certain lsn, returns the last observed lsn.""" - for i in range(100): + for i in range(1000): current_lsn = last_record_lsn(pageserver_http, tenant, timeline) if current_lsn >= lsn: return current_lsn diff --git a/test_runner/regress/test_replica_start.py b/test_runner/regress/test_replica_start.py new file mode 100644 index 0000000000..17d476a8a6 --- /dev/null +++ b/test_runner/regress/test_replica_start.py @@ -0,0 +1,646 @@ +""" +In PostgreSQL, a standby always has to wait for a running-xacts WAL record to +arrive before it can start accepting queries. Furthermore, if there are +transactions with too many subxids (> 64) open to fit in the in-memory subxids +cache, the running-xacts record will be marked as "suboverflowed", and the +standby will need to also wait for the currently in-progress transactions to +finish. + +In Neon, we have an additional mechanism that scans the CLOG at server startup +to determine the list of running transactions, so that the standby can start up +immediately without waiting for the running-xacts record, but that mechanism +only works if the # of active (sub-)transactions is reasonably small. Otherwise +it falls back to waiting. Furthermore, it's somewhat optimistic in using up the +known-assigned XIDs array: if too many transactions with subxids are started in +the primary later, the replay in the replica will crash with "too many +KnownAssignedXids" error. + +This module contains tests for those various cases at standby startup: starting +from shutdown checkpoint, using the CLOG scanning mechanism, waiting for +running-xacts record and for in-progress transactions to finish etc. +""" + +import threading +from contextlib import closing + +import psycopg2 +import pytest +from fixtures.log_helper import log +from fixtures.neon_fixtures import NeonEnv, wait_for_last_flush_lsn, wait_replica_caughtup +from fixtures.pg_version import PgVersion +from fixtures.utils import query_scalar, wait_until + +CREATE_SUBXACTS_FUNC = """ +create or replace function create_subxacts(n integer) returns void as $$ +declare + i integer; +begin + for i in 1..n loop + begin + insert into t (payload) values (0); + exception + when others then + raise exception 'caught something: %', sqlerrm; + end; + end loop; +end; $$ language plpgsql +""" + + +def test_replica_start_scan_clog(neon_simple_env: NeonEnv): + """ + Test the CLOG-scanning mechanism at hot standby startup. There is one + transaction active in the primary when the standby is started. The primary + is killed before it has a chance to write a running-xacts record. The + CLOG-scanning at neon startup allows the standby to start up anyway. + + See the module docstring for background. + """ + + # Initialize the primary, a test table, and a helper function to create lots + # of subtransactions. + env = neon_simple_env + primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary") + primary_conn = primary.connect() + primary_cur = primary_conn.cursor() + primary_cur.execute("CREATE EXTENSION neon_test_utils") + primary_cur.execute("create table t(pk serial primary key, payload integer)") + primary_cur.execute(CREATE_SUBXACTS_FUNC) + primary_cur.execute("select pg_switch_wal()") + + # Start a transaction in the primary. Leave the transaction open. + # + # The transaction has some subtransactions, but not too many to cause the + # CLOG-scanning mechanism to give up. + primary_cur.execute("begin") + primary_cur.execute("select create_subxacts(50)") + + # Wait for the WAL to be flushed, but then immediately kill the primary, + # before it has a chance to generate a running-xacts record. + primary_cur.execute("select neon_xlogflush()") + wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline) + primary.stop(mode="immediate") + + # Create a replica. It should start up normally, thanks to the CLOG-scanning + # mechanism. + secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") + + # The transaction did not commit, so it should not be visible in the secondary + secondary_conn = secondary.connect() + secondary_cur = secondary_conn.cursor() + secondary_cur.execute("select count(*) from t") + assert secondary_cur.fetchone() == (0,) + + +def test_replica_start_scan_clog_crashed_xids(neon_simple_env: NeonEnv): + """ + Test the CLOG-scanning mechanism at hot standby startup, after + leaving behind crashed transactions. + + See the module docstring for background. + """ + + # Initialize the primary, a test table, and a helper function to create lots + # of subtransactions. + env = neon_simple_env + primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary") + primary_conn = primary.connect() + primary_cur = primary_conn.cursor() + primary_cur.execute("create table t(pk serial primary key, payload integer)") + primary_cur.execute(CREATE_SUBXACTS_FUNC) + primary_cur.execute("select pg_switch_wal()") + + # Consume a lot of XIDs, then kill Postgres without giving it a + # chance to write abort records for them. + primary_cur.execute("begin") + primary_cur.execute("select create_subxacts(100000)") + primary.stop(mode="immediate") + + # Restart the primary. Do some light work, and shut it down cleanly + primary.start() + primary_conn = primary.connect() + primary_cur = primary_conn.cursor() + primary_cur.execute("insert into t (payload) values (0)") + primary.stop(mode="fast") + + # Create a replica. It should start up normally, thanks to the CLOG-scanning + # mechanism. (Restarting the primary writes a checkpoint and/or running-xacts + # record, which allows the standby to know that the crashed XIDs are aborted) + secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") + + secondary_conn = secondary.connect() + secondary_cur = secondary_conn.cursor() + secondary_cur.execute("select count(*) from t") + assert secondary_cur.fetchone() == (1,) + + +def test_replica_start_at_running_xacts(neon_simple_env: NeonEnv, pg_version): + """ + Test that starting a replica works right after the primary has + created a running-xacts record. This may seem like a trivial case, + but during development, we had a bug that was triggered by having + oldestActiveXid == nextXid. Starting right after a running-xacts + record is one way to test that case. + + See the module docstring for background. + """ + env = neon_simple_env + + if env.pg_version == PgVersion.V14 or env.pg_version == PgVersion.V15: + pytest.skip("pg_log_standby_snapshot() function is available only in PG16") + + primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary") + primary_conn = primary.connect() + primary_cur = primary_conn.cursor() + + primary_cur.execute("CREATE EXTENSION neon_test_utils") + primary_cur.execute("select pg_log_standby_snapshot()") + primary_cur.execute("select neon_xlogflush()") + wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline) + + secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") + + secondary_conn = secondary.connect() + secondary_cur = secondary_conn.cursor() + secondary_cur.execute("select 123") + assert secondary_cur.fetchone() == (123,) + + +def test_replica_start_wait_subxids_finish(neon_simple_env: NeonEnv): + """ + Test replica startup when there are a lot of (sub)transactions active in the + primary. That's too many for the CLOG-scanning mechanism to handle, so the + replica has to wait for the large transaction to finish before it starts to + accept queries. + + After replica startup, test MVCC with transactions that were in-progress + when the replica was started. + + See the module docstring for background. + """ + + # Initialize the primary, a test table, and a helper function to create + # lots of subtransactions. + env = neon_simple_env + primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary") + primary_conn = primary.connect() + primary_cur = primary_conn.cursor() + primary_cur.execute("create table t(pk serial primary key, payload integer)") + primary_cur.execute(CREATE_SUBXACTS_FUNC) + + # Start a transaction with 100000 subtransactions, and leave it open. That's + # too many to fit in the "known-assigned XIDs array" in the replica, and + # also too many to fit in the subxid caches so the running-xacts record will + # also overflow. + primary_cur.execute("begin") + primary_cur.execute("select create_subxacts(100000)") + + # Start another, smaller transaction in the primary. We'll come back to this + # later. + primary_conn2 = primary.connect() + primary_cur2 = primary_conn2.cursor() + primary_cur2.execute("begin") + primary_cur2.execute("insert into t (payload) values (0)") + + # Create a replica. but before that, wait for the wal to be flushed to + # safekeepers, so that the replica is started at a point where the large + # transaction is already active. (The whole transaction might not be flushed + # yet, but that's OK.) + # + # Start it in a separate thread, so that we can do other stuff while it's + # blocked waiting for the startup to finish. + wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline) + secondary = env.endpoints.new_replica(origin=primary, endpoint_id="secondary") + start_secondary_thread = threading.Thread(target=secondary.start) + start_secondary_thread.start() + + # Verify that the replica has otherwise started up, but cannot start + # accepting queries yet. + log.info("Waiting 5 s to verify that the secondary does not start") + start_secondary_thread.join(5) + assert secondary.log_contains("consistent recovery state reached") + assert secondary.log_contains("started streaming WAL from primary") + # The "redo starts" message is printed when the first WAL record is + # received. It might or might not be present in the log depending on how + # far exactly the WAL was flushed when the replica was started, and whether + # background activity caused any more WAL records to be flushed on the + # primary afterwards. + # + # assert secondary.log_contains("redo # starts") + + # should not be open for connections yet + assert start_secondary_thread.is_alive() + assert not secondary.is_running() + assert not secondary.log_contains("database system is ready to accept read-only connections") + + # Commit the large transaction in the primary. + # + # Within the next 15 s, the primary should write a new running-xacts record + # to the WAL which shows the transaction as completed. Once the replica + # replays that record, it will start accepting queries. + primary_cur.execute("commit") + start_secondary_thread.join() + + # Verify that the large transaction is correctly visible in the secondary + # (but not the second, small transaction, which is still in-progress!) + secondary_conn = secondary.connect() + secondary_cur = secondary_conn.cursor() + secondary_cur.execute("select count(*) from t") + assert secondary_cur.fetchone() == (100000,) + + # Perform some more MVCC testing using the second transaction that was + # started in the primary before the replica was created + primary_cur2.execute("select create_subxacts(10000)") + + # The second transaction still hasn't committed + wait_replica_caughtup(primary, secondary) + secondary_cur.execute("BEGIN ISOLATION LEVEL REPEATABLE READ") + secondary_cur.execute("select count(*) from t") + assert secondary_cur.fetchone() == (100000,) + + # Commit the second transaction in the primary + primary_cur2.execute("commit") + + # Should still be invisible to the old snapshot + wait_replica_caughtup(primary, secondary) + secondary_cur.execute("select count(*) from t") + assert secondary_cur.fetchone() == (100000,) + + # Commit the REPEATABLE READ transaction in the replica. Both + # primary transactions should now be visible to a new snapshot. + secondary_cur.execute("commit") + secondary_cur.execute("select count(*) from t") + assert secondary_cur.fetchone() == (110001,) + + +def test_replica_too_many_known_assigned_xids(neon_simple_env: NeonEnv): + """ + The CLOG-scanning mechanism fills the known-assigned XIDs array + optimistically at standby startup, betting that it can still fit + upcoming transactions replayed later from the WAL in the + array. This test tests what happens when that bet fails and the + known-assigned XID array fills up after the standby has already + been started. The WAL redo will fail with an error: + + FATAL: too many KnownAssignedXids + CONTEXT: WAL redo at 0/1895CB0 for neon/INSERT: off: 25, flags: 0x08; blkref #0: rel 1663/5/16385, blk 64 + + which causes the standby to shut down. + + See the module docstring for background. + """ + + # Initialize the primary, a test table, and a helper function to create lots + # of subtransactions. + env = neon_simple_env + primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary") + primary_conn = primary.connect() + primary_cur = primary_conn.cursor() + primary_cur.execute("CREATE EXTENSION neon_test_utils") + primary_cur.execute("create table t(pk serial primary key, payload integer)") + primary_cur.execute(CREATE_SUBXACTS_FUNC) + + # Determine how many connections we can use + primary_cur.execute("show max_connections") + max_connections = int(primary_cur.fetchall()[0][0]) + primary_cur.execute("show superuser_reserved_connections") + superuser_reserved_connections = int(primary_cur.fetchall()[0][0]) + n_connections = max_connections - superuser_reserved_connections + n_subxids = 200 + + # Start one top transaction in primary, with lots of subtransactions. This + # uses up much of the known-assigned XIDs space in the standby, but doesn't + # cause it to overflow. + large_p_conn = primary.connect() + large_p_cur = large_p_conn.cursor() + large_p_cur.execute("begin") + large_p_cur.execute(f"select create_subxacts({max_connections} * 30)") + + with closing(primary.connect()) as small_p_conn: + with small_p_conn.cursor() as small_p_cur: + small_p_cur.execute("select create_subxacts(1)") + + # Create a replica at this LSN + primary_cur.execute("select neon_xlogflush()") + wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline) + secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") + secondary_conn = secondary.connect() + secondary_cur = secondary_conn.cursor() + + # The transaction in primary has not committed yet. + wait_replica_caughtup(primary, secondary) + secondary_cur.execute("select count(*) from t") + assert secondary_cur.fetchone() == (1,) + + # Start max number of top transactions in primary, with a lot of + # subtransactions each. We add the subtransactions to each top transaction + # in a round-robin fashion, instead of adding a lot of subtransactions to + # one top transaction at a time. This way, we will have the max number of + # subtransactions in the in-memory subxid cache of each top transaction, + # until they all overflow. + # + # Currently, PGPROC_MAX_CACHED_SUBXIDS == 64, so this will overflow the all + # the subxid caches after creating 64 subxids in each top transaction. The + # point just before the caches have overflowed is the most interesting point + # in time, but we'll keep going beyond that, to ensure that this test is + # robust even if PGPROC_MAX_CACHED_SUBXIDS changes. + p_curs = [] + for _ in range(0, n_connections): + p_cur = primary.connect().cursor() + p_cur.execute("begin") + p_curs.append(p_cur) + + for _subxid in range(0, n_subxids): + for i in range(0, n_connections): + p_curs[i].execute("select create_subxacts(1)") + + # Commit all the transactions in the primary + for i in range(0, n_connections): + p_curs[i].execute("commit") + large_p_cur.execute("commit") + + # Wait until the replica crashes with "too many KnownAssignedXids" error. + def check_replica_crashed(): + try: + secondary.connect() + except psycopg2.Error: + # Once the connection fails, return success + return None + raise RuntimeError("connection succeeded") + + wait_until(20, 0.5, check_replica_crashed) + assert secondary.log_contains("too many KnownAssignedXids") + + # Replica is crashed, so ignore stop result + secondary.check_stop_result = False + + +def test_replica_start_repro_visibility_bug(neon_simple_env: NeonEnv): + """ + Before PR #7288, a hot standby in neon incorrectly started up + immediately, before it had received a running-xacts record. That + led to visibility bugs if there were active transactions in the + primary. This test reproduces the incorrect query results and + incorrectly set hint bits, before that was fixed. + """ + env = neon_simple_env + + primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary") + p_cur = primary.connect().cursor() + + p_cur.execute("begin") + p_cur.execute("create table t(pk integer primary key, payload integer)") + p_cur.execute("insert into t values (generate_series(1,100000), 0)") + + secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") + wait_replica_caughtup(primary, secondary) + s_cur = secondary.connect().cursor() + + # Set hint bits for pg_class tuples. If primary's transaction is + # not marked as in-progress in MVCC snapshot, then XMIN_INVALID + # hint bit will be set for table's 't' tuple, making it invisible + # even after the commit record is replayed later. + s_cur.execute("select * from pg_class") + + p_cur.execute("commit") + wait_replica_caughtup(primary, secondary) + s_cur.execute("select * from t where pk = 1") + assert s_cur.fetchone() == (1, 0) + + +@pytest.mark.parametrize("shutdown", [True, False]) +def test_replica_start_with_prepared_xacts(neon_simple_env: NeonEnv, shutdown: bool): + """ + Test the CLOG-scanning mechanism at hot standby startup in the presence of + prepared transactions. + + This test is run in two variants: one where the primary server is shut down + before starting the secondary, or not. + """ + + # Initialize the primary, a test table, and a helper function to create lots + # of subtransactions. + env = neon_simple_env + primary = env.endpoints.create_start( + branch_name="main", endpoint_id="primary", config_lines=["max_prepared_transactions=5"] + ) + primary_conn = primary.connect() + primary_cur = primary_conn.cursor() + primary_cur.execute("CREATE EXTENSION neon_test_utils") + primary_cur.execute("create table t(pk serial primary key, payload integer)") + primary_cur.execute("create table t1(pk integer primary key)") + primary_cur.execute("create table t2(pk integer primary key)") + primary_cur.execute(CREATE_SUBXACTS_FUNC) + + # Prepare a transaction for two-phase commit + primary_cur.execute("begin") + primary_cur.execute("insert into t1 values (1)") + primary_cur.execute("prepare transaction 't1'") + + # Prepare another transaction for two-phase commit, with a subtransaction + primary_cur.execute("begin") + primary_cur.execute("insert into t2 values (2)") + primary_cur.execute("savepoint sp") + primary_cur.execute("insert into t2 values (3)") + primary_cur.execute("prepare transaction 't2'") + + # Start a transaction in the primary. Leave the transaction open. + # + # The transaction has some subtransactions, but not too many to cause the + # CLOG-scanning mechanism to give up. + primary_cur.execute("begin") + primary_cur.execute("select create_subxacts(50)") + + # Wait for the WAL to be flushed + primary_cur.execute("select neon_xlogflush()") + wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline) + + if shutdown: + primary.stop(mode="fast") + + # Create a replica. It should start up normally, thanks to the CLOG-scanning + # mechanism. + secondary = env.endpoints.new_replica_start( + origin=primary, endpoint_id="secondary", config_lines=["max_prepared_transactions=5"] + ) + + # The transaction did not commit, so it should not be visible in the secondary + secondary_conn = secondary.connect() + secondary_cur = secondary_conn.cursor() + secondary_cur.execute("select count(*) from t") + assert secondary_cur.fetchone() == (0,) + secondary_cur.execute("select count(*) from t1") + assert secondary_cur.fetchone() == (0,) + secondary_cur.execute("select count(*) from t2") + assert secondary_cur.fetchone() == (0,) + + if shutdown: + primary.start() + primary_conn = primary.connect() + primary_cur = primary_conn.cursor() + else: + primary_cur.execute("commit") + primary_cur.execute("commit prepared 't1'") + primary_cur.execute("commit prepared 't2'") + + wait_replica_caughtup(primary, secondary) + + secondary_cur.execute("select count(*) from t") + if shutdown: + assert secondary_cur.fetchone() == (0,) + else: + assert secondary_cur.fetchone() == (50,) + secondary_cur.execute("select * from t1") + assert secondary_cur.fetchall() == [(1,)] + secondary_cur.execute("select * from t2") + assert secondary_cur.fetchall() == [(2,), (3,)] + + +def test_replica_start_with_prepared_xacts_with_subxacts(neon_simple_env: NeonEnv): + """ + Test the CLOG-scanning mechanism at hot standby startup in the presence of + prepared transactions, with subtransactions. + """ + + # Initialize the primary, a test table, and a helper function to create lots + # of subtransactions. + env = neon_simple_env + primary = env.endpoints.create_start( + branch_name="main", endpoint_id="primary", config_lines=["max_prepared_transactions=5"] + ) + primary_conn = primary.connect() + primary_cur = primary_conn.cursor() + + # Install extension containing function needed for test + primary_cur.execute("CREATE EXTENSION neon_test_utils") + + primary_cur.execute("create table t(pk serial primary key, payload integer)") + primary_cur.execute(CREATE_SUBXACTS_FUNC) + + # Advance nextXid close to the beginning of the next pg_subtrans segment (2^16 XIDs) + # + # This is interesting, because it tests that pg_subtrans is initialized correctly + # at standby startup. (We had a bug where it didn't at one point during development.) + while True: + xid = int(query_scalar(primary_cur, "SELECT txid_current()")) + log.info(f"xid now {xid}") + # Consume 500 transactions at a time until we get close + if xid < 65535 - 600: + primary_cur.execute("select test_consume_xids(500);") + else: + break + primary_cur.execute("checkpoint") + + # Prepare a transaction for two-phase commit + primary_cur.execute("begin") + primary_cur.execute("select create_subxacts(1000)") + primary_cur.execute("prepare transaction 't1'") + + # Wait for the WAL to be flushed, and stop the primary + wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline) + primary.stop(mode="fast") + + # Create a replica. It should start up normally, thanks to the CLOG-scanning + # mechanism. + secondary = env.endpoints.new_replica_start( + origin=primary, endpoint_id="secondary", config_lines=["max_prepared_transactions=5"] + ) + + # The transaction did not commit, so it should not be visible in the secondary + secondary_conn = secondary.connect() + secondary_cur = secondary_conn.cursor() + secondary_cur.execute("select count(*) from t") + assert secondary_cur.fetchone() == (0,) + + primary.start() + + # Open a lot of subtransactions in the primary, causing the subxids cache to overflow + primary_conn = primary.connect() + primary_cur = primary_conn.cursor() + primary_cur.execute("select create_subxacts(100000)") + + wait_replica_caughtup(primary, secondary) + + secondary_cur.execute("select count(*) from t") + assert secondary_cur.fetchone() == (100000,) + + primary_cur.execute("commit prepared 't1'") + + wait_replica_caughtup(primary, secondary) + secondary_cur.execute("select count(*) from t") + assert secondary_cur.fetchone() == (101000,) + + +def test_replica_start_with_prepared_xacts_with_many_subxacts(neon_simple_env: NeonEnv): + """ + Test the CLOG-scanning mechanism at hot standby startup in the presence of + prepared transactions, with lots of subtransactions. + + Like test_replica_start_with_prepared_xacts_with_subxacts, but with more + subxacts, to test that the prepared transaction's subxids don't consume + space in the known-assigned XIDs array. (They are set in pg_subtrans + instead) + """ + + # Initialize the primary, a test table, and a helper function to create lots + # of subtransactions. + env = neon_simple_env + primary = env.endpoints.create_start( + branch_name="main", endpoint_id="primary", config_lines=["max_prepared_transactions=5"] + ) + primary_conn = primary.connect() + primary_cur = primary_conn.cursor() + + # Install extension containing function needed for test + primary_cur.execute("CREATE EXTENSION neon_test_utils") + + primary_cur.execute("create table t(pk serial primary key, payload integer)") + primary_cur.execute(CREATE_SUBXACTS_FUNC) + + # Prepare a transaction for two-phase commit, with lots of subxids + primary_cur.execute("begin") + primary_cur.execute("select create_subxacts(50000)") + + # to make things a bit more varied, intersperse a few other XIDs in between + # the prepared transaction's sub-XIDs + with primary.connect().cursor() as primary_cur2: + primary_cur2.execute("insert into t (payload) values (123)") + primary_cur2.execute("begin; insert into t (payload) values (-1); rollback") + + primary_cur.execute("select create_subxacts(50000)") + primary_cur.execute("prepare transaction 't1'") + + # Wait for the WAL to be flushed + wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline) + + primary.stop(mode="fast") + + # Create a replica. It should start up normally, thanks to the CLOG-scanning + # mechanism. + secondary = env.endpoints.new_replica_start( + origin=primary, endpoint_id="secondary", config_lines=["max_prepared_transactions=5"] + ) + + # The transaction did not commit, so it should not be visible in the secondary + secondary_conn = secondary.connect() + secondary_cur = secondary_conn.cursor() + secondary_cur.execute("select count(*) from t") + assert secondary_cur.fetchone() == (1,) + + primary.start() + + # Open a lot of subtransactions in the primary, causing the subxids cache to overflow + primary_conn = primary.connect() + primary_cur = primary_conn.cursor() + primary_cur.execute("select create_subxacts(100000)") + + wait_replica_caughtup(primary, secondary) + + secondary_cur.execute("select count(*) from t") + assert secondary_cur.fetchone() == (100001,) + + primary_cur.execute("commit prepared 't1'") + + wait_replica_caughtup(primary, secondary) + secondary_cur.execute("select count(*) from t") + assert secondary_cur.fetchone() == (200001,) diff --git a/test_runner/regress/test_replication_start.py b/test_runner/regress/test_replication_start.py deleted file mode 100644 index 2360745990..0000000000 --- a/test_runner/regress/test_replication_start.py +++ /dev/null @@ -1,32 +0,0 @@ -import pytest -from fixtures.log_helper import log -from fixtures.neon_fixtures import NeonEnv, wait_replica_caughtup - - -@pytest.mark.xfail -def test_replication_start(neon_simple_env: NeonEnv): - env = neon_simple_env - - with env.endpoints.create_start(branch_name="main", endpoint_id="primary") as primary: - with primary.connect() as p_con: - with p_con.cursor() as p_cur: - p_cur.execute("begin") - p_cur.execute("create table t(pk integer primary key, payload integer)") - p_cur.execute("insert into t values (generate_series(1,100000), 0)") - p_cur.execute("select txid_current()") - xid = p_cur.fetchall()[0][0] - log.info(f"Master transaction {xid}") - with env.endpoints.new_replica_start( - origin=primary, endpoint_id="secondary" - ) as secondary: - wait_replica_caughtup(primary, secondary) - with secondary.connect() as s_con: - with s_con.cursor() as s_cur: - # Enforce setting hint bits for pg_class tuples. - # If master's transaction is not marked as in-progress in MVCC snapshot, - # then XMIN_INVALID hint bit will be set for table's 't' tuple makeing it invisible. - s_cur.execute("select * from pg_class") - p_cur.execute("commit") - wait_replica_caughtup(primary, secondary) - s_cur.execute("select * from t where pk = 1") - assert s_cur.fetchone() == (1, 0) diff --git a/vendor/postgres-v14 b/vendor/postgres-v14 index 223dd92595..ad73770c44 160000 --- a/vendor/postgres-v14 +++ b/vendor/postgres-v14 @@ -1 +1 @@ -Subproject commit 223dd925959f8124711dd3d867dc8ba6629d52c0 +Subproject commit ad73770c446ea361f43e4f0404798b7e5e7a62d8 diff --git a/vendor/postgres-v15 b/vendor/postgres-v15 index f54d7373eb..4874c8e52e 160000 --- a/vendor/postgres-v15 +++ b/vendor/postgres-v15 @@ -1 +1 @@ -Subproject commit f54d7373eb0de5a54bce2becdb1c801026c7edff +Subproject commit 4874c8e52ed349a9f8290bbdcd91eb92677a5d24 diff --git a/vendor/postgres-v16 b/vendor/postgres-v16 index e06bebc753..b810fdfcbb 160000 --- a/vendor/postgres-v16 +++ b/vendor/postgres-v16 @@ -1 +1 @@ -Subproject commit e06bebc75306b583e758b52c95946d41109239b2 +Subproject commit b810fdfcbb59afea7ea7bbe0cf94eaccb55a2ea2 diff --git a/vendor/revisions.json b/vendor/revisions.json index 574e371934..da49ff19c3 100644 --- a/vendor/revisions.json +++ b/vendor/revisions.json @@ -1,5 +1,5 @@ { - "v16": ["16.3", "e06bebc75306b583e758b52c95946d41109239b2"], - "v15": ["15.7", "f54d7373eb0de5a54bce2becdb1c801026c7edff"], - "v14": ["14.12", "223dd925959f8124711dd3d867dc8ba6629d52c0"] + "v16": ["16.3", "b810fdfcbb59afea7ea7bbe0cf94eaccb55a2ea2"], + "v15": ["15.7", "4874c8e52ed349a9f8290bbdcd91eb92677a5d24"], + "v14": ["14.12", "ad73770c446ea361f43e4f0404798b7e5e7a62d8"] } From aea5cfe21e62b4df285c0c55c12f79df8fbde1a4 Mon Sep 17 00:00:00 2001 From: John Spray Date: Mon, 1 Jul 2024 12:48:20 +0100 Subject: [PATCH 5/6] pageserver: add metric `pageserver_secondary_resident_physical_size` (#8204) ## Problem We lack visibility of how much local disk space is used by secondary tenant locations Close: https://github.com/neondatabase/neon/issues/8181 ## Summary of changes - Add `pageserver_secondary_resident_physical_size`, tagged by tenant - Register & de-register label sets from SecondaryTenant - Add+use wrappers in SecondaryDetail that update metrics when adding+removing layers/timelines --- pageserver/src/metrics.rs | 11 +- pageserver/src/tenant/secondary.rs | 37 +++- pageserver/src/tenant/secondary/downloader.rs | 173 ++++++++++++++---- 3 files changed, 171 insertions(+), 50 deletions(-) diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index f5aca6dfb3..9cd7ffa042 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -476,7 +476,7 @@ static STANDBY_HORIZON: Lazy = Lazy::new(|| { static RESIDENT_PHYSICAL_SIZE: Lazy = Lazy::new(|| { register_uint_gauge_vec!( "pageserver_resident_physical_size", - "The size of the layer files present in the pageserver's filesystem.", + "The size of the layer files present in the pageserver's filesystem, for attached locations.", &["tenant_id", "shard_id", "timeline_id"] ) .expect("failed to define a metric") @@ -1691,6 +1691,15 @@ pub(crate) static SECONDARY_MODE: Lazy = Lazy::new(|| { } }); +pub(crate) static SECONDARY_RESIDENT_PHYSICAL_SIZE: Lazy = Lazy::new(|| { + register_uint_gauge_vec!( + "pageserver_secondary_resident_physical_size", + "The size of the layer files present in the pageserver's filesystem, for secondary locations.", + &["tenant_id", "shard_id"] + ) + .expect("failed to define a metric") +}); + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum RemoteOpKind { Upload, diff --git a/pageserver/src/tenant/secondary.rs b/pageserver/src/tenant/secondary.rs index af6840f525..a233d11c4a 100644 --- a/pageserver/src/tenant/secondary.rs +++ b/pageserver/src/tenant/secondary.rs @@ -23,6 +23,8 @@ use super::{ storage_layer::LayerName, }; +use crate::metrics::SECONDARY_RESIDENT_PHYSICAL_SIZE; +use metrics::UIntGauge; use pageserver_api::{ models, shard::{ShardIdentity, TenantShardId}, @@ -99,6 +101,17 @@ pub(crate) struct SecondaryTenant { // Public state indicating overall progress of downloads relative to the last heatmap seen pub(crate) progress: std::sync::Mutex, + + // Sum of layer sizes on local disk + pub(super) resident_size_metric: UIntGauge, +} + +impl Drop for SecondaryTenant { + fn drop(&mut self) { + let tenant_id = self.tenant_shard_id.tenant_id.to_string(); + let shard_id = format!("{}", self.tenant_shard_id.shard_slug()); + let _ = SECONDARY_RESIDENT_PHYSICAL_SIZE.remove_label_values(&[&tenant_id, &shard_id]); + } } impl SecondaryTenant { @@ -108,6 +121,12 @@ impl SecondaryTenant { tenant_conf: TenantConfOpt, config: &SecondaryLocationConfig, ) -> Arc { + let tenant_id = tenant_shard_id.tenant_id.to_string(); + let shard_id = format!("{}", tenant_shard_id.shard_slug()); + let resident_size_metric = SECONDARY_RESIDENT_PHYSICAL_SIZE + .get_metric_with_label_values(&[&tenant_id, &shard_id]) + .unwrap(); + Arc::new(Self { tenant_shard_id, // todo: shall we make this a descendent of the @@ -123,6 +142,8 @@ impl SecondaryTenant { detail: std::sync::Mutex::new(SecondaryDetail::new(config.clone())), progress: std::sync::Mutex::default(), + + resident_size_metric, }) } @@ -211,16 +232,12 @@ impl SecondaryTenant { // have to 100% match what is on disk, because it's a best-effort warming // of the cache. let mut detail = this.detail.lock().unwrap(); - if let Some(timeline_detail) = detail.timelines.get_mut(&timeline_id) { - let removed = timeline_detail.on_disk_layers.remove(&name); - - // We might race with removal of the same layer during downloads, if it was removed - // from the heatmap. If we see that the OnDiskState is gone, then no need to - // do a physical deletion or store in evicted_at. - if let Some(removed) = removed { - removed.remove_blocking(); - timeline_detail.evicted_at.insert(name, now); - } + if let Some(removed) = + detail.evict_layer(name, &timeline_id, now, &this.resident_size_metric) + { + // We might race with removal of the same layer during downloads, so finding the layer we + // were trying to remove is optional. Only issue the disk I/O to remove it if we found it. + removed.remove_blocking(); } }) .await diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs index f6f30641db..27439d4f03 100644 --- a/pageserver/src/tenant/secondary/downloader.rs +++ b/pageserver/src/tenant/secondary/downloader.rs @@ -46,6 +46,7 @@ use crate::tenant::{ use camino::Utf8PathBuf; use chrono::format::{DelayedFormat, StrftimeItems}; use futures::Future; +use metrics::UIntGauge; use pageserver_api::models::SecondaryProgress; use pageserver_api::shard::TenantShardId; use remote_storage::{DownloadError, Etag, GenericRemoteStorage}; @@ -131,16 +132,66 @@ impl OnDiskState { .or_else(fs_ext::ignore_not_found) .fatal_err("Deleting secondary layer") } + + pub(crate) fn file_size(&self) -> u64 { + self.metadata.file_size + } } #[derive(Debug, Clone, Default)] pub(super) struct SecondaryDetailTimeline { - pub(super) on_disk_layers: HashMap, + on_disk_layers: HashMap, /// We remember when layers were evicted, to prevent re-downloading them. pub(super) evicted_at: HashMap, } +impl SecondaryDetailTimeline { + pub(super) fn remove_layer( + &mut self, + name: &LayerName, + resident_metric: &UIntGauge, + ) -> Option { + let removed = self.on_disk_layers.remove(name); + if let Some(removed) = &removed { + resident_metric.sub(removed.file_size()); + } + removed + } + + /// `local_path` + fn touch_layer( + &mut self, + conf: &'static PageServerConf, + tenant_shard_id: &TenantShardId, + timeline_id: &TimelineId, + touched: &HeatMapLayer, + resident_metric: &UIntGauge, + local_path: F, + ) where + F: FnOnce() -> Utf8PathBuf, + { + use std::collections::hash_map::Entry; + match self.on_disk_layers.entry(touched.name.clone()) { + Entry::Occupied(mut v) => { + v.get_mut().access_time = touched.access_time; + } + Entry::Vacant(e) => { + e.insert(OnDiskState::new( + conf, + tenant_shard_id, + timeline_id, + touched.name.clone(), + touched.metadata.clone(), + touched.access_time, + local_path(), + )); + resident_metric.add(touched.metadata.file_size); + } + } + } +} + // Aspects of a heatmap that we remember after downloading it #[derive(Clone, Debug)] struct DownloadSummary { @@ -158,7 +209,7 @@ pub(super) struct SecondaryDetail { last_download: Option, next_download: Option, - pub(super) timelines: HashMap, + timelines: HashMap, } /// Helper for logging SystemTime @@ -191,6 +242,38 @@ impl SecondaryDetail { } } + pub(super) fn evict_layer( + &mut self, + name: LayerName, + timeline_id: &TimelineId, + now: SystemTime, + resident_metric: &UIntGauge, + ) -> Option { + let timeline = self.timelines.get_mut(timeline_id)?; + let removed = timeline.remove_layer(&name, resident_metric); + if removed.is_some() { + timeline.evicted_at.insert(name, now); + } + removed + } + + pub(super) fn remove_timeline( + &mut self, + timeline_id: &TimelineId, + resident_metric: &UIntGauge, + ) { + let removed = self.timelines.remove(timeline_id); + if let Some(removed) = removed { + resident_metric.sub( + removed + .on_disk_layers + .values() + .map(|l| l.metadata.file_size) + .sum(), + ); + } + } + /// Additionally returns the total number of layers, used for more stable relative access time /// based eviction. pub(super) fn get_layers_for_eviction( @@ -601,8 +684,13 @@ impl<'a> TenantDownloader<'a> { Some(t) => t, None => { // We have no existing state: need to scan local disk for layers first. - let timeline_state = - init_timeline_state(self.conf, tenant_shard_id, timeline).await; + let timeline_state = init_timeline_state( + self.conf, + tenant_shard_id, + timeline, + &self.secondary_state.resident_size_metric, + ) + .await; // Re-acquire detail lock now that we're done with async load from local FS self.secondary_state @@ -671,6 +759,25 @@ impl<'a> TenantDownloader<'a> { .await?; } + // Metrics consistency check in testing builds + if cfg!(feature = "testing") { + let detail = self.secondary_state.detail.lock().unwrap(); + let resident_size = detail + .timelines + .values() + .map(|tl| { + tl.on_disk_layers + .values() + .map(|v| v.metadata.file_size) + .sum::() + }) + .sum::(); + assert_eq!( + resident_size, + self.secondary_state.resident_size_metric.get() + ); + } + // Only update last_etag after a full successful download: this way will not skip // the next download, even if the heatmap's actual etag is unchanged. self.secondary_state.detail.lock().unwrap().last_download = Some(DownloadSummary { @@ -783,7 +890,7 @@ impl<'a> TenantDownloader<'a> { for delete_timeline in &delete_timelines { // We haven't removed from disk yet, but optimistically remove from in-memory state: if removal // from disk fails that will be a fatal error. - detail.timelines.remove(delete_timeline); + detail.remove_timeline(delete_timeline, &self.secondary_state.resident_size_metric); } } @@ -801,7 +908,7 @@ impl<'a> TenantDownloader<'a> { let Some(timeline_state) = detail.timelines.get_mut(&timeline_id) else { continue; }; - timeline_state.on_disk_layers.remove(&layer_name); + timeline_state.remove_layer(&layer_name, &self.secondary_state.resident_size_metric); } for timeline_id in delete_timelines { @@ -1000,33 +1107,24 @@ impl<'a> TenantDownloader<'a> { let timeline_detail = detail.timelines.entry(timeline_id).or_default(); tracing::info!("Wrote timeline_detail for {} touched layers", touched.len()); - - for t in touched { - use std::collections::hash_map::Entry; - match timeline_detail.on_disk_layers.entry(t.name.clone()) { - Entry::Occupied(mut v) => { - v.get_mut().access_time = t.access_time; - } - Entry::Vacant(e) => { - let local_path = local_layer_path( + touched.into_iter().for_each(|t| { + timeline_detail.touch_layer( + self.conf, + tenant_shard_id, + &timeline_id, + &t, + &self.secondary_state.resident_size_metric, + || { + local_layer_path( self.conf, tenant_shard_id, &timeline_id, &t.name, &t.metadata.generation, - ); - e.insert(OnDiskState::new( - self.conf, - tenant_shard_id, - &timeline_id, - t.name, - t.metadata.clone(), - t.access_time, - local_path, - )); - } - } - } + ) + }, + ) + }); } result @@ -1135,6 +1233,7 @@ async fn init_timeline_state( conf: &'static PageServerConf, tenant_shard_id: &TenantShardId, heatmap: &HeatMapTimeline, + resident_metric: &UIntGauge, ) -> SecondaryDetailTimeline { let timeline_path = conf.timeline_path(tenant_shard_id, &heatmap.timeline_id); let mut detail = SecondaryDetailTimeline::default(); @@ -1210,17 +1309,13 @@ async fn init_timeline_state( } else { // We expect the access time to be initialized immediately afterwards, when // the latest heatmap is applied to the state. - detail.on_disk_layers.insert( - name.clone(), - OnDiskState::new( - conf, - tenant_shard_id, - &heatmap.timeline_id, - name, - remote_meta.metadata.clone(), - remote_meta.access_time, - file_path, - ), + detail.touch_layer( + conf, + tenant_shard_id, + &heatmap.timeline_id, + remote_meta, + resident_metric, + || file_path, ); } } From e823b9294714d0c5048942907c06b678c4a6c4a0 Mon Sep 17 00:00:00 2001 From: Alexander Bayandin Date: Mon, 1 Jul 2024 13:11:55 +0100 Subject: [PATCH 6/6] CI(build-tools): Remove libpq from build image (#8206) ## Problem We use `build-tools` image as a base image to build other images, and it has a pretty old `libpq-dev` installed (v13; it wasn't that old until I removed system Postgres 14 from `build-tools` image in https://github.com/neondatabase/neon/pull/6540) ## Summary of changes - Remove `libpq-dev` from `build-tools` image - Set `LD_LIBRARY_PATH` for tests (for different Postgres binaries that we use, like psql and pgbench) - Set `PQ_LIB_DIR` to build Storage Controller - Set `LD_LIBRARY_PATH`/`DYLD_LIBRARY_PATH` in the Storage Controller where it calls Postgres binaries --- .../actions/run-python-test-set/action.yml | 1 + .github/workflows/benchmarking.yml | 4 +++ .github/workflows/build-build-tools-image.yml | 1 + .github/workflows/build_and_test.yml | 7 ++++ .github/workflows/neon_extra_builds.yml | 7 ++++ Dockerfile | 3 +- Dockerfile.build-tools | 1 - control_plane/src/local_env.rs | 11 ++++-- control_plane/src/storage_controller.rs | 34 +++++++++++++++---- 9 files changed, 57 insertions(+), 12 deletions(-) diff --git a/.github/actions/run-python-test-set/action.yml b/.github/actions/run-python-test-set/action.yml index c6ea52ba88..a2aae0772b 100644 --- a/.github/actions/run-python-test-set/action.yml +++ b/.github/actions/run-python-test-set/action.yml @@ -114,6 +114,7 @@ runs: export PLATFORM=${PLATFORM:-github-actions-selfhosted} export POSTGRES_DISTRIB_DIR=${POSTGRES_DISTRIB_DIR:-/tmp/neon/pg_install} export DEFAULT_PG_VERSION=${PG_VERSION#v} + export LD_LIBRARY_PATH=${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/lib if [ "${BUILD_TYPE}" = "remote" ]; then export REMOTE_ENV=1 diff --git a/.github/workflows/benchmarking.yml b/.github/workflows/benchmarking.yml index db4209500f..0e748adeb6 100644 --- a/.github/workflows/benchmarking.yml +++ b/.github/workflows/benchmarking.yml @@ -379,6 +379,10 @@ jobs: - name: Add Postgres binaries to PATH run: | + LD_LIBRARY_PATH="${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/lib" + export LD_LIBRARY_PATH + echo "LD_LIBRARY_PATH=${LD_LIBRARY_PATH}" >> $GITHUB_ENV + ${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin/pgbench --version echo "${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin" >> $GITHUB_PATH diff --git a/.github/workflows/build-build-tools-image.yml b/.github/workflows/build-build-tools-image.yml index 5a94dd8e6f..f1c39e7e4f 100644 --- a/.github/workflows/build-build-tools-image.yml +++ b/.github/workflows/build-build-tools-image.yml @@ -82,6 +82,7 @@ jobs: tags: neondatabase/build-tools:${{ inputs.image-tag }}-${{ matrix.arch }} - name: Remove custom docker config directory + if: always() run: | rm -rf /tmp/.docker-custom diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 9cea9f4148..24ad26205b 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -335,6 +335,8 @@ jobs: - name: Run cargo build run: | + PQ_LIB_DIR=$(pwd)/pg_install/v16/lib + export PQ_LIB_DIR ${cov_prefix} mold -run cargo build $CARGO_FLAGS $CARGO_FEATURES --bins --tests # Do install *before* running rust tests because they might recompile the @@ -383,6 +385,11 @@ jobs: env: NEXTEST_RETRIES: 3 run: | + PQ_LIB_DIR=$(pwd)/pg_install/v16/lib + export PQ_LIB_DIR + LD_LIBRARY_PATH=$(pwd)/pg_install/v16/lib + export LD_LIBRARY_PATH + #nextest does not yet support running doctests cargo test --doc $CARGO_FLAGS $CARGO_FEATURES diff --git a/.github/workflows/neon_extra_builds.yml b/.github/workflows/neon_extra_builds.yml index 7d2187e59c..330d858c0e 100644 --- a/.github/workflows/neon_extra_builds.yml +++ b/.github/workflows/neon_extra_builds.yml @@ -232,12 +232,19 @@ jobs: - name: Run cargo build run: | + PQ_LIB_DIR=$(pwd)/pg_install/v16/lib + export PQ_LIB_DIR mold -run cargo build --locked $CARGO_FLAGS $CARGO_FEATURES --bins --tests -j$(nproc) - name: Run cargo test env: NEXTEST_RETRIES: 3 run: | + PQ_LIB_DIR=$(pwd)/pg_install/v16/lib + export PQ_LIB_DIR + LD_LIBRARY_PATH=$(pwd)/pg_install/v16/lib + export LD_LIBRARY_PATH + cargo nextest run $CARGO_FEATURES -j$(nproc) # Run separate tests for real S3 diff --git a/Dockerfile b/Dockerfile index b4900d4a94..f0197758e4 100644 --- a/Dockerfile +++ b/Dockerfile @@ -42,12 +42,13 @@ ARG CACHEPOT_BUCKET=neon-github-dev COPY --from=pg-build /home/nonroot/pg_install/v14/include/postgresql/server pg_install/v14/include/postgresql/server COPY --from=pg-build /home/nonroot/pg_install/v15/include/postgresql/server pg_install/v15/include/postgresql/server COPY --from=pg-build /home/nonroot/pg_install/v16/include/postgresql/server pg_install/v16/include/postgresql/server +COPY --from=pg-build /home/nonroot/pg_install/v16/lib pg_install/v16/lib COPY --chown=nonroot . . # Show build caching stats to check if it was used in the end. # Has to be the part of the same RUN since cachepot daemon is killed in the end of this RUN, losing the compilation stats. RUN set -e \ - && RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment" cargo build \ + && PQ_LIB_DIR=$(pwd)/pg_install/v16/lib RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment" cargo build \ --bin pg_sni_router \ --bin pageserver \ --bin pagectl \ diff --git a/Dockerfile.build-tools b/Dockerfile.build-tools index f85706ef6a..30314376ef 100644 --- a/Dockerfile.build-tools +++ b/Dockerfile.build-tools @@ -26,7 +26,6 @@ RUN set -e \ liblzma-dev \ libncurses5-dev \ libncursesw5-dev \ - libpq-dev \ libreadline-dev \ libseccomp-dev \ libsqlite3-dev \ diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index 6634274d2a..3ac3ce21df 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -325,11 +325,16 @@ impl LocalEnv { } } - pub fn pg_bin_dir(&self, pg_version: u32) -> anyhow::Result { - Ok(self.pg_distrib_dir(pg_version)?.join("bin")) + pub fn pg_dir(&self, pg_version: u32, dir_name: &str) -> anyhow::Result { + Ok(self.pg_distrib_dir(pg_version)?.join(dir_name)) } + + pub fn pg_bin_dir(&self, pg_version: u32) -> anyhow::Result { + self.pg_dir(pg_version, "bin") + } + pub fn pg_lib_dir(&self, pg_version: u32) -> anyhow::Result { - Ok(self.pg_distrib_dir(pg_version)?.join("lib")) + self.pg_dir(pg_version, "lib") } pub fn pageserver_bin(&self) -> PathBuf { diff --git a/control_plane/src/storage_controller.rs b/control_plane/src/storage_controller.rs index 5ca1b13b2a..47103a2e0a 100644 --- a/control_plane/src/storage_controller.rs +++ b/control_plane/src/storage_controller.rs @@ -155,16 +155,16 @@ impl StorageController { .expect("non-Unicode path") } - /// Find the directory containing postgres binaries, such as `initdb` and `pg_ctl` + /// Find the directory containing postgres subdirectories, such `bin` and `lib` /// /// This usually uses STORAGE_CONTROLLER_POSTGRES_VERSION of postgres, but will fall back /// to other versions if that one isn't found. Some automated tests create circumstances /// where only one version is available in pg_distrib_dir, such as `test_remote_extensions`. - pub async fn get_pg_bin_dir(&self) -> anyhow::Result { + async fn get_pg_dir(&self, dir_name: &str) -> anyhow::Result { let prefer_versions = [STORAGE_CONTROLLER_POSTGRES_VERSION, 15, 14]; for v in prefer_versions { - let path = Utf8PathBuf::from_path_buf(self.env.pg_bin_dir(v)?).unwrap(); + let path = Utf8PathBuf::from_path_buf(self.env.pg_dir(v, dir_name)?).unwrap(); if tokio::fs::try_exists(&path).await? { return Ok(path); } @@ -172,11 +172,20 @@ impl StorageController { // Fall through anyhow::bail!( - "Postgres binaries not found in {}", - self.env.pg_distrib_dir.display() + "Postgres directory '{}' not found in {}", + dir_name, + self.env.pg_distrib_dir.display(), ); } + pub async fn get_pg_bin_dir(&self) -> anyhow::Result { + self.get_pg_dir("bin").await + } + + pub async fn get_pg_lib_dir(&self) -> anyhow::Result { + self.get_pg_dir("lib").await + } + /// Readiness check for our postgres process async fn pg_isready(&self, pg_bin_dir: &Utf8Path) -> anyhow::Result { let bin_path = pg_bin_dir.join("pg_isready"); @@ -229,12 +238,17 @@ impl StorageController { .unwrap() .join("storage_controller_db"); let pg_bin_dir = self.get_pg_bin_dir().await?; + let pg_lib_dir = self.get_pg_lib_dir().await?; let pg_log_path = pg_data_path.join("postgres.log"); if !tokio::fs::try_exists(&pg_data_path).await? { // Initialize empty database let initdb_path = pg_bin_dir.join("initdb"); let mut child = Command::new(&initdb_path) + .envs(vec![ + ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), + ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), + ]) .args(["-D", pg_data_path.as_ref()]) .spawn() .expect("Failed to spawn initdb"); @@ -269,7 +283,10 @@ impl StorageController { &self.env.base_data_dir, pg_bin_dir.join("pg_ctl").as_std_path(), db_start_args, - [], + vec![ + ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), + ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), + ], background_process::InitialPidFile::Create(self.postgres_pid_file()), retry_timeout, || self.pg_isready(&pg_bin_dir), @@ -324,7 +341,10 @@ impl StorageController { &self.env.base_data_dir, &self.env.storage_controller_bin(), args, - [], + vec![ + ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), + ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), + ], background_process::InitialPidFile::Create(self.pid_file()), retry_timeout, || async {