mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 22:10:39 +00:00
Add neon.primary_is_running GUC. (#6705)
We set it for neon replica, if primary is running. Postgres uses this GUC at the start, to determine if replica should wait for RUNNING_XACTS from primary or not. Corresponding cloud PR is https://github.com/neondatabase/cloud/pull/10183 * Add test hot-standby replica startup. * Extract oldest_running_xid from XlRunningXits WAL records. --------- Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech> Co-authored-by: Konstantin Knizhnik <knizhnik@garret.ru> Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
This commit is contained in:
committed by
GitHub
parent
cd449d66ea
commit
a12e4261a3
@@ -82,6 +82,12 @@ pub fn write_postgres_conf(
|
||||
ComputeMode::Replica => {
|
||||
// hot_standby is 'on' by default, but let's be explicit
|
||||
writeln!(file, "hot_standby=on")?;
|
||||
|
||||
// Inform the replica about the primary state
|
||||
// Default is 'false'
|
||||
if let Some(primary_is_running) = spec.primary_is_running {
|
||||
writeln!(file, "neon.primary_is_running={}", primary_is_running)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -590,6 +590,7 @@ impl Endpoint {
|
||||
remote_extensions,
|
||||
pgbouncer_settings: None,
|
||||
shard_stripe_size: Some(shard_stripe_size),
|
||||
primary_is_running: None,
|
||||
};
|
||||
let spec_path = self.endpoint_path().join("spec.json");
|
||||
std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;
|
||||
|
||||
@@ -79,6 +79,12 @@ pub struct ComputeSpec {
|
||||
// Stripe size for pageserver sharding, in pages
|
||||
#[serde(default)]
|
||||
pub shard_stripe_size: Option<usize>,
|
||||
|
||||
// When we are starting a new replica in hot standby mode,
|
||||
// we need to know if the primary is running.
|
||||
// This is used to determine if replica should wait for
|
||||
// RUNNING_XACTS from primary or not.
|
||||
pub primary_is_running: Option<bool>,
|
||||
}
|
||||
|
||||
/// Feature flag to signal `compute_ctl` to enable certain experimental functionality.
|
||||
|
||||
@@ -80,6 +80,9 @@ pub const XLOG_XACT_ABORT: u8 = 0x20;
|
||||
pub const XLOG_XACT_COMMIT_PREPARED: u8 = 0x30;
|
||||
pub const XLOG_XACT_ABORT_PREPARED: u8 = 0x40;
|
||||
|
||||
// From standbydefs.h
|
||||
pub const XLOG_RUNNING_XACTS: u8 = 0x10;
|
||||
|
||||
// From srlu.h
|
||||
pub const SLRU_PAGES_PER_SEGMENT: u32 = 32;
|
||||
pub const SLRU_SEG_SIZE: usize = BLCKSZ as usize * SLRU_PAGES_PER_SEGMENT as usize;
|
||||
|
||||
@@ -119,11 +119,6 @@ pub fn generate_pg_control(
|
||||
// Generate new pg_control needed for bootstrap
|
||||
checkpoint.redo = normalize_lsn(lsn, WAL_SEGMENT_SIZE).0;
|
||||
|
||||
//reset some fields we don't want to preserve
|
||||
//TODO Check this.
|
||||
//We may need to determine the value from twophase data.
|
||||
checkpoint.oldestActiveXid = 0;
|
||||
|
||||
//save new values in pg_control
|
||||
pg_control.checkPoint = 0;
|
||||
pg_control.checkPointCopy = checkpoint;
|
||||
|
||||
@@ -334,6 +334,12 @@ impl WalIngest {
|
||||
{
|
||||
self.checkpoint.oldestXid = xlog_checkpoint.oldestXid;
|
||||
}
|
||||
trace!(
|
||||
"xlog_checkpoint.oldestActiveXid={}, checkpoint.oldestActiveXid={}",
|
||||
xlog_checkpoint.oldestActiveXid,
|
||||
self.checkpoint.oldestActiveXid
|
||||
);
|
||||
self.checkpoint.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
|
||||
|
||||
// Write a new checkpoint key-value pair on every checkpoint record, even
|
||||
// if nothing really changed. Not strictly required, but it seems nice to
|
||||
@@ -360,6 +366,13 @@ impl WalIngest {
|
||||
}
|
||||
}
|
||||
}
|
||||
pg_constants::RM_STANDBY_ID => {
|
||||
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
|
||||
if info == pg_constants::XLOG_RUNNING_XACTS {
|
||||
let xlrec = crate::walrecord::XlRunningXacts::decode(&mut buf);
|
||||
self.checkpoint.oldestActiveXid = xlrec.oldest_running_xid;
|
||||
}
|
||||
}
|
||||
_x => {
|
||||
// TODO: should probably log & fail here instead of blindly
|
||||
// doing something without understanding the protocol
|
||||
|
||||
@@ -773,6 +773,42 @@ impl XlLogicalMessage {
|
||||
}
|
||||
}
|
||||
|
||||
#[repr(C)]
|
||||
#[derive(Debug)]
|
||||
pub struct XlRunningXacts {
|
||||
pub xcnt: u32,
|
||||
pub subxcnt: u32,
|
||||
pub subxid_overflow: bool,
|
||||
pub next_xid: TransactionId,
|
||||
pub oldest_running_xid: TransactionId,
|
||||
pub latest_completed_xid: TransactionId,
|
||||
pub xids: Vec<TransactionId>,
|
||||
}
|
||||
|
||||
impl XlRunningXacts {
|
||||
pub fn decode(buf: &mut Bytes) -> XlRunningXacts {
|
||||
let xcnt = buf.get_u32_le();
|
||||
let subxcnt = buf.get_u32_le();
|
||||
let subxid_overflow = buf.get_u32_le() != 0;
|
||||
let next_xid = buf.get_u32_le();
|
||||
let oldest_running_xid = buf.get_u32_le();
|
||||
let latest_completed_xid = buf.get_u32_le();
|
||||
let mut xids = Vec::new();
|
||||
for _ in 0..(xcnt + subxcnt) {
|
||||
xids.push(buf.get_u32_le());
|
||||
}
|
||||
XlRunningXacts {
|
||||
xcnt,
|
||||
subxcnt,
|
||||
subxid_overflow,
|
||||
next_xid,
|
||||
oldest_running_xid,
|
||||
latest_completed_xid,
|
||||
xids,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Main routine to decode a WAL record and figure out which blocks are modified
|
||||
//
|
||||
// See xlogrecord.h for details
|
||||
|
||||
@@ -38,6 +38,7 @@ PG_MODULE_MAGIC;
|
||||
void _PG_init(void);
|
||||
|
||||
static int logical_replication_max_snap_files = 300;
|
||||
bool primary_is_running = false;
|
||||
|
||||
static void
|
||||
InitLogicalReplicationMonitor(void)
|
||||
@@ -267,6 +268,7 @@ LogicalSlotsMonitorMain(Datum main_arg)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
_PG_init(void)
|
||||
{
|
||||
@@ -287,6 +289,15 @@ _PG_init(void)
|
||||
|
||||
pg_init_extension_server();
|
||||
|
||||
DefineCustomBoolVariable(
|
||||
"neon.primary_is_running",
|
||||
"true if the primary was running at replica startup. false otherwise",
|
||||
NULL,
|
||||
&primary_is_running,
|
||||
false,
|
||||
PGC_POSTMASTER,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
/*
|
||||
* Important: This must happen after other parts of the extension are
|
||||
* loaded, otherwise any settings to GUCs that were set before the
|
||||
|
||||
@@ -3104,6 +3104,8 @@ class Endpoint(PgProtocol):
|
||||
# set small 'max_replication_write_lag' to enable backpressure
|
||||
# and make tests more stable.
|
||||
config_lines = ["max_replication_write_lag=15MB"] + config_lines
|
||||
|
||||
config_lines = ["neon.primary_is_running=on"] + config_lines
|
||||
self.config(config_lines)
|
||||
|
||||
return self
|
||||
@@ -4147,6 +4149,21 @@ def tenant_get_shards(
|
||||
return [(TenantShardId(tenant_id, 0, 0), override_pageserver or env.pageserver)]
|
||||
|
||||
|
||||
def wait_replica_caughtup(primary: Endpoint, secondary: Endpoint):
|
||||
primary_lsn = Lsn(
|
||||
primary.safe_psql_scalar("SELECT pg_current_wal_flush_lsn()", log_query=False)
|
||||
)
|
||||
while True:
|
||||
secondary_lsn = Lsn(
|
||||
secondary.safe_psql_scalar("SELECT pg_last_wal_replay_lsn()", log_query=False)
|
||||
)
|
||||
caught_up = secondary_lsn >= primary_lsn
|
||||
log.info(f"caughtup={caught_up}, primary_lsn={primary_lsn}, secondary_lsn={secondary_lsn}")
|
||||
if caught_up:
|
||||
return
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
def wait_for_last_flush_lsn(
|
||||
env: NeonEnv,
|
||||
endpoint: Endpoint,
|
||||
|
||||
@@ -3,22 +3,7 @@ import re
|
||||
import time
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import Endpoint, NeonEnv
|
||||
|
||||
|
||||
def wait_caughtup(primary: Endpoint, secondary: Endpoint):
|
||||
primary_lsn = primary.safe_psql_scalar(
|
||||
"SELECT pg_current_wal_insert_lsn()::text", log_query=False
|
||||
)
|
||||
while True:
|
||||
secondary_lsn = secondary.safe_psql_scalar(
|
||||
"SELECT pg_last_wal_replay_lsn()", log_query=False
|
||||
)
|
||||
caught_up = secondary_lsn >= primary_lsn
|
||||
log.info(f"caughtup={caught_up}, primary_lsn={primary_lsn}, secondary_lsn={secondary_lsn}")
|
||||
if caught_up:
|
||||
return
|
||||
time.sleep(1)
|
||||
from fixtures.neon_fixtures import NeonEnv, wait_replica_caughtup
|
||||
|
||||
|
||||
# Check for corrupted WAL messages which might otherwise go unnoticed if
|
||||
@@ -79,7 +64,7 @@ def test_hot_standby(neon_simple_env: NeonEnv):
|
||||
primary.safe_psql("create table t(key int, value text)")
|
||||
primary.safe_psql("insert into t select generate_series(1, 100000), 'payload'")
|
||||
|
||||
wait_caughtup(primary, secondary)
|
||||
wait_replica_caughtup(primary, secondary)
|
||||
|
||||
with secondary.connect() as s_con:
|
||||
with s_con.cursor() as s_cur:
|
||||
|
||||
30
test_runner/regress/test_replication_start.py
Normal file
30
test_runner/regress/test_replication_start.py
Normal file
@@ -0,0 +1,30 @@
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv, wait_replica_caughtup
|
||||
|
||||
|
||||
def test_replication_start(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
|
||||
with env.endpoints.create_start(branch_name="main", endpoint_id="primary") as primary:
|
||||
with primary.connect() as p_con:
|
||||
with p_con.cursor() as p_cur:
|
||||
p_cur.execute("begin")
|
||||
p_cur.execute("create table t(pk integer primary key, payload integer)")
|
||||
p_cur.execute("insert into t values (generate_series(1,100000), 0)")
|
||||
p_cur.execute("select txid_current()")
|
||||
xid = p_cur.fetchall()[0][0]
|
||||
log.info(f"Master transaction {xid}")
|
||||
with env.endpoints.new_replica_start(
|
||||
origin=primary, endpoint_id="secondary"
|
||||
) as secondary:
|
||||
wait_replica_caughtup(primary, secondary)
|
||||
with secondary.connect() as s_con:
|
||||
with s_con.cursor() as s_cur:
|
||||
# Enforce setting hint bits for pg_class tuples.
|
||||
# If master's transaction is not marked as in-progress in MVCC snapshot,
|
||||
# then XMIN_INVALID hint bit will be set for table's 't' tuple makeing it invisible.
|
||||
s_cur.execute("select * from pg_class")
|
||||
p_cur.execute("commit")
|
||||
wait_replica_caughtup(primary, secondary)
|
||||
s_cur.execute("select * from t where pk = 1")
|
||||
assert s_cur.fetchone() == (1, 0)
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: 17101190de...4cdba8ec5a
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: 0baccce15a...0ec04712d5
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: dc40299045...cc98378b0f
7
vendor/revisions.json
vendored
7
vendor/revisions.json
vendored
@@ -1,5 +1,6 @@
|
||||
{
|
||||
"postgres-v16": "dc40299045a377ec3b302c900134468a1b0f58ee",
|
||||
"postgres-v15": "0baccce15a3b0446af5c403d2e869a04541b63c4",
|
||||
"postgres-v14": "17101190de8a54b95e0831c66c3da426ed33db34"
|
||||
"postgres-v16": "cc98378b0fa7413b78a197e3292a806865e4056a",
|
||||
"postgres-v15": "0ec04712d55539550278595e853c172f7aa5fe3e",
|
||||
"postgres-v14": "4cdba8ec5a3868cec4826bbb3f16c1d3d2ac2283"
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user