From a12e4261a32522f3e95602870ca44a18c95766fb Mon Sep 17 00:00:00 2001 From: Anastasia Lubennikova Date: Fri, 23 Feb 2024 13:56:41 +0000 Subject: [PATCH] Add neon.primary_is_running GUC. (#6705) We set it for neon replica, if primary is running. Postgres uses this GUC at the start, to determine if replica should wait for RUNNING_XACTS from primary or not. Corresponding cloud PR is https://github.com/neondatabase/cloud/pull/10183 * Add test for hot-standby replica startup. * Extract oldest_running_xid from XlRunningXacts WAL records. --------- Co-authored-by: Konstantin Knizhnik Co-authored-by: Konstantin Knizhnik Co-authored-by: Heikki Linnakangas --- compute_tools/src/config.rs | 6 ++++ control_plane/src/endpoint.rs | 1 + libs/compute_api/src/spec.rs | 6 ++++ libs/postgres_ffi/src/pg_constants.rs | 3 ++ libs/postgres_ffi/src/xlog_utils.rs | 5 --- pageserver/src/walingest.rs | 13 +++++++ pageserver/src/walrecord.rs | 36 +++++++++++++++++++ pgxn/neon/neon.c | 11 ++++++ test_runner/fixtures/neon_fixtures.py | 17 +++++++++ test_runner/regress/test_hot_standby.py | 19 ++-------- test_runner/regress/test_replication_start.py | 30 ++++++++++++++++ vendor/postgres-v14 | 2 +- vendor/postgres-v15 | 2 +- vendor/postgres-v16 | 2 +- vendor/revisions.json | 7 ++-- 15 files changed, 132 insertions(+), 28 deletions(-) create mode 100644 test_runner/regress/test_replication_start.py diff --git a/compute_tools/src/config.rs b/compute_tools/src/config.rs index 03fd56aa97..42b8480211 100644 --- a/compute_tools/src/config.rs +++ b/compute_tools/src/config.rs @@ -82,6 +82,12 @@ pub fn write_postgres_conf( ComputeMode::Replica => { // hot_standby is 'on' by default, but let's be explicit writeln!(file, "hot_standby=on")?; + + // Inform the replica about the primary state + // Default is 'false' + if let Some(primary_is_running) = spec.primary_is_running { + writeln!(file, "neon.primary_is_running={}", primary_is_running)?; + } } } diff --git a/control_plane/src/endpoint.rs 
b/control_plane/src/endpoint.rs index bab7a70ce7..de7eb797d6 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -590,6 +590,7 @@ impl Endpoint { remote_extensions, pgbouncer_settings: None, shard_stripe_size: Some(shard_stripe_size), + primary_is_running: None, }; let spec_path = self.endpoint_path().join("spec.json"); std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?; diff --git a/libs/compute_api/src/spec.rs b/libs/compute_api/src/spec.rs index 2f412b61a3..71ae66c45c 100644 --- a/libs/compute_api/src/spec.rs +++ b/libs/compute_api/src/spec.rs @@ -79,6 +79,12 @@ pub struct ComputeSpec { // Stripe size for pageserver sharding, in pages #[serde(default)] pub shard_stripe_size: Option, + + // When we are starting a new replica in hot standby mode, + // we need to know if the primary is running. + // This is used to determine if replica should wait for + // RUNNING_XACTS from primary or not. + pub primary_is_running: Option, } /// Feature flag to signal `compute_ctl` to enable certain experimental functionality. 
diff --git a/libs/postgres_ffi/src/pg_constants.rs b/libs/postgres_ffi/src/pg_constants.rs index d59e0e4a15..2701ddf5e0 100644 --- a/libs/postgres_ffi/src/pg_constants.rs +++ b/libs/postgres_ffi/src/pg_constants.rs @@ -80,6 +80,9 @@ pub const XLOG_XACT_ABORT: u8 = 0x20; pub const XLOG_XACT_COMMIT_PREPARED: u8 = 0x30; pub const XLOG_XACT_ABORT_PREPARED: u8 = 0x40; +// From standbydefs.h +pub const XLOG_RUNNING_XACTS: u8 = 0x10; + // From srlu.h pub const SLRU_PAGES_PER_SEGMENT: u32 = 32; pub const SLRU_SEG_SIZE: usize = BLCKSZ as usize * SLRU_PAGES_PER_SEGMENT as usize; diff --git a/libs/postgres_ffi/src/xlog_utils.rs b/libs/postgres_ffi/src/xlog_utils.rs index 977653848d..4a66a0ab1d 100644 --- a/libs/postgres_ffi/src/xlog_utils.rs +++ b/libs/postgres_ffi/src/xlog_utils.rs @@ -119,11 +119,6 @@ pub fn generate_pg_control( // Generate new pg_control needed for bootstrap checkpoint.redo = normalize_lsn(lsn, WAL_SEGMENT_SIZE).0; - //reset some fields we don't want to preserve - //TODO Check this. - //We may need to determine the value from twophase data. - checkpoint.oldestActiveXid = 0; - //save new values in pg_control pg_control.checkPoint = 0; pg_control.checkPointCopy = checkpoint; diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 8df2f1713a..3a2705bb50 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -334,6 +334,12 @@ impl WalIngest { { self.checkpoint.oldestXid = xlog_checkpoint.oldestXid; } + trace!( + "xlog_checkpoint.oldestActiveXid={}, checkpoint.oldestActiveXid={}", + xlog_checkpoint.oldestActiveXid, + self.checkpoint.oldestActiveXid + ); + self.checkpoint.oldestActiveXid = xlog_checkpoint.oldestActiveXid; // Write a new checkpoint key-value pair on every checkpoint record, even // if nothing really changed. 
Not strictly required, but it seems nice to @@ -360,6 +366,13 @@ impl WalIngest { } } } + pg_constants::RM_STANDBY_ID => { + let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + if info == pg_constants::XLOG_RUNNING_XACTS { + let xlrec = crate::walrecord::XlRunningXacts::decode(&mut buf); + self.checkpoint.oldestActiveXid = xlrec.oldest_running_xid; + } + } _x => { // TODO: should probably log & fail here instead of blindly // doing something without understanding the protocol diff --git a/pageserver/src/walrecord.rs b/pageserver/src/walrecord.rs index 1b7777a544..ae2d996879 100644 --- a/pageserver/src/walrecord.rs +++ b/pageserver/src/walrecord.rs @@ -773,6 +773,42 @@ impl XlLogicalMessage { } } +#[repr(C)] +#[derive(Debug)] +pub struct XlRunningXacts { + pub xcnt: u32, + pub subxcnt: u32, + pub subxid_overflow: bool, + pub next_xid: TransactionId, + pub oldest_running_xid: TransactionId, + pub latest_completed_xid: TransactionId, + pub xids: Vec, +} + +impl XlRunningXacts { + pub fn decode(buf: &mut Bytes) -> XlRunningXacts { + let xcnt = buf.get_u32_le(); + let subxcnt = buf.get_u32_le(); + let subxid_overflow = buf.get_u32_le() != 0; + let next_xid = buf.get_u32_le(); + let oldest_running_xid = buf.get_u32_le(); + let latest_completed_xid = buf.get_u32_le(); + let mut xids = Vec::new(); + for _ in 0..(xcnt + subxcnt) { + xids.push(buf.get_u32_le()); + } + XlRunningXacts { + xcnt, + subxcnt, + subxid_overflow, + next_xid, + oldest_running_xid, + latest_completed_xid, + xids, + } + } +} + /// Main routine to decode a WAL record and figure out which blocks are modified // // See xlogrecord.h for details diff --git a/pgxn/neon/neon.c b/pgxn/neon/neon.c index 24ec909c79..a14288b33a 100644 --- a/pgxn/neon/neon.c +++ b/pgxn/neon/neon.c @@ -38,6 +38,7 @@ PG_MODULE_MAGIC; void _PG_init(void); static int logical_replication_max_snap_files = 300; +bool primary_is_running = false; static void InitLogicalReplicationMonitor(void) @@ -267,6 +268,7 @@ 
LogicalSlotsMonitorMain(Datum main_arg) } } + void _PG_init(void) { @@ -287,6 +289,15 @@ _PG_init(void) pg_init_extension_server(); + DefineCustomBoolVariable( + "neon.primary_is_running", + "true if the primary was running at replica startup. false otherwise", + NULL, + &primary_is_running, + false, + PGC_POSTMASTER, + 0, + NULL, NULL, NULL); /* * Important: This must happen after other parts of the extension are * loaded, otherwise any settings to GUCs that were set before the diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 79a4c7cde8..441b64ebfc 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -3104,6 +3104,8 @@ class Endpoint(PgProtocol): # set small 'max_replication_write_lag' to enable backpressure # and make tests more stable. config_lines = ["max_replication_write_lag=15MB"] + config_lines + + config_lines = ["neon.primary_is_running=on"] + config_lines self.config(config_lines) return self @@ -4147,6 +4149,21 @@ def tenant_get_shards( return [(TenantShardId(tenant_id, 0, 0), override_pageserver or env.pageserver)] +def wait_replica_caughtup(primary: Endpoint, secondary: Endpoint): + primary_lsn = Lsn( + primary.safe_psql_scalar("SELECT pg_current_wal_flush_lsn()", log_query=False) + ) + while True: + secondary_lsn = Lsn( + secondary.safe_psql_scalar("SELECT pg_last_wal_replay_lsn()", log_query=False) + ) + caught_up = secondary_lsn >= primary_lsn + log.info(f"caughtup={caught_up}, primary_lsn={primary_lsn}, secondary_lsn={secondary_lsn}") + if caught_up: + return + time.sleep(1) + + def wait_for_last_flush_lsn( env: NeonEnv, endpoint: Endpoint, diff --git a/test_runner/regress/test_hot_standby.py b/test_runner/regress/test_hot_standby.py index 7822e29ed9..0497e1965c 100644 --- a/test_runner/regress/test_hot_standby.py +++ b/test_runner/regress/test_hot_standby.py @@ -3,22 +3,7 @@ import re import time from fixtures.log_helper import log -from 
fixtures.neon_fixtures import Endpoint, NeonEnv - - -def wait_caughtup(primary: Endpoint, secondary: Endpoint): - primary_lsn = primary.safe_psql_scalar( - "SELECT pg_current_wal_insert_lsn()::text", log_query=False - ) - while True: - secondary_lsn = secondary.safe_psql_scalar( - "SELECT pg_last_wal_replay_lsn()", log_query=False - ) - caught_up = secondary_lsn >= primary_lsn - log.info(f"caughtup={caught_up}, primary_lsn={primary_lsn}, secondary_lsn={secondary_lsn}") - if caught_up: - return - time.sleep(1) +from fixtures.neon_fixtures import NeonEnv, wait_replica_caughtup # Check for corrupted WAL messages which might otherwise go unnoticed if @@ -79,7 +64,7 @@ def test_hot_standby(neon_simple_env: NeonEnv): primary.safe_psql("create table t(key int, value text)") primary.safe_psql("insert into t select generate_series(1, 100000), 'payload'") - wait_caughtup(primary, secondary) + wait_replica_caughtup(primary, secondary) with secondary.connect() as s_con: with s_con.cursor() as s_cur: diff --git a/test_runner/regress/test_replication_start.py b/test_runner/regress/test_replication_start.py new file mode 100644 index 0000000000..b4699c7be8 --- /dev/null +++ b/test_runner/regress/test_replication_start.py @@ -0,0 +1,30 @@ +from fixtures.log_helper import log +from fixtures.neon_fixtures import NeonEnv, wait_replica_caughtup + + +def test_replication_start(neon_simple_env: NeonEnv): + env = neon_simple_env + + with env.endpoints.create_start(branch_name="main", endpoint_id="primary") as primary: + with primary.connect() as p_con: + with p_con.cursor() as p_cur: + p_cur.execute("begin") + p_cur.execute("create table t(pk integer primary key, payload integer)") + p_cur.execute("insert into t values (generate_series(1,100000), 0)") + p_cur.execute("select txid_current()") + xid = p_cur.fetchall()[0][0] + log.info(f"Master transaction {xid}") + with env.endpoints.new_replica_start( + origin=primary, endpoint_id="secondary" + ) as secondary: + 
wait_replica_caughtup(primary, secondary) + with secondary.connect() as s_con: + with s_con.cursor() as s_cur: + # Enforce setting hint bits for pg_class tuples. + # If master's transaction is not marked as in-progress in MVCC snapshot, + # then XMIN_INVALID hint bit will be set for table's 't' tuple makeing it invisible. + s_cur.execute("select * from pg_class") + p_cur.execute("commit") + wait_replica_caughtup(primary, secondary) + s_cur.execute("select * from t where pk = 1") + assert s_cur.fetchone() == (1, 0) diff --git a/vendor/postgres-v14 b/vendor/postgres-v14 index 17101190de..4cdba8ec5a 160000 --- a/vendor/postgres-v14 +++ b/vendor/postgres-v14 @@ -1 +1 @@ -Subproject commit 17101190de8a54b95e0831c66c3da426ed33db34 +Subproject commit 4cdba8ec5a3868cec4826bbb3f16c1d3d2ac2283 diff --git a/vendor/postgres-v15 b/vendor/postgres-v15 index 0baccce15a..0ec04712d5 160000 --- a/vendor/postgres-v15 +++ b/vendor/postgres-v15 @@ -1 +1 @@ -Subproject commit 0baccce15a3b0446af5c403d2e869a04541b63c4 +Subproject commit 0ec04712d55539550278595e853c172f7aa5fe3e diff --git a/vendor/postgres-v16 b/vendor/postgres-v16 index dc40299045..cc98378b0f 160000 --- a/vendor/postgres-v16 +++ b/vendor/postgres-v16 @@ -1 +1 @@ -Subproject commit dc40299045a377ec3b302c900134468a1b0f58ee +Subproject commit cc98378b0fa7413b78a197e3292a806865e4056a diff --git a/vendor/revisions.json b/vendor/revisions.json index d18f1588f5..540b7ec898 100644 --- a/vendor/revisions.json +++ b/vendor/revisions.json @@ -1,5 +1,6 @@ { - "postgres-v16": "dc40299045a377ec3b302c900134468a1b0f58ee", - "postgres-v15": "0baccce15a3b0446af5c403d2e869a04541b63c4", - "postgres-v14": "17101190de8a54b95e0831c66c3da426ed33db34" + "postgres-v16": "cc98378b0fa7413b78a197e3292a806865e4056a", + "postgres-v15": "0ec04712d55539550278595e853c172f7aa5fe3e", + "postgres-v14": "4cdba8ec5a3868cec4826bbb3f16c1d3d2ac2283" } +