Compare commits

...

21 Commits

Author SHA1 Message Date
Matthias van de Meent
a9d0ec4cab Update the replica promotion test
The new one works and is slightly more ergonomic.
2025-05-28 14:36:51 +02:00
Matthias van de Meent
efb58f0743 Pass PG's current TimeLineID around to the right places
This allows pg_promote() to change PG's local timeline ID to a new one.
2025-05-28 14:34:40 +02:00
Matthias van de Meent
d33fbeee06 Always include SK endpoints in the neon.safekeepers configuration
This makes endpoint promotion much easier.
2025-05-28 14:31:35 +02:00
Konstantin Knizhnik
684bf85ca2 Add replicaPromote to WalProposerConfig 2025-05-28 10:35:42 +03:00
Konstantin Knizhnik
5ef5bc6c2b Add replicaPromote flag to walproposer config 2025-05-28 08:57:59 +03:00
Konstantin Knizhnik
e084c12ef6 Remove unused field 2025-05-27 22:04:13 +03:00
Konstantin Knizhnik
9bc82d7ad2 Do not explicitly launch wal_proposer: rely on BgWorkerStart_RecoveryFinished 2025-05-27 17:39:39 +03:00
Konstantin Knizhnik
279c6c0417 Remove special implementation of pg_promote for PG14 2025-05-27 11:42:29 +03:00
Konstantin Knizhnik
dc4388580e Make mypy happy 2025-05-25 21:42:20 +03:00
Konstantin Knizhnik
2d7a3a6f0f Bump Postgres version 2025-05-25 21:42:20 +03:00
Konstantin Knizhnik
f7bdc138c2 Make mypy happy 2025-05-25 21:42:20 +03:00
Konstantin Knizhnik
2cb1e43604 Add priomote support for pg14-16 2025-05-25 21:42:19 +03:00
Konstantin Knizhnik
cdc8057a7e Add priomote support for pg14-16 2025-05-25 21:42:19 +03:00
Konstantin Knizhnik
a3d88258a7 Make ruff happy 2025-05-25 21:42:19 +03:00
Konstantin Knizhnik
a5d45bceed Make test_replica_promote.py pass at pg17 2025-05-25 21:42:19 +03:00
Konstantin Knizhnik
d7cecc485c Undo adding set_redo_start_lsn function to walproposer API 2025-05-25 21:42:18 +03:00
Konstantin Knizhnik
b94054dca0 Some hacks for replica primotion 2025-05-25 21:42:18 +03:00
Konstantin Knizhnik
41adde29d7 Start walproposer on replica promotion 2025-05-25 21:42:18 +03:00
Matthias van de Meent
45649ccd62 Update test_replica_promote.py 2025-05-25 21:42:18 +03:00
Matthias van de Meent
19c5eb53e5 Add test for replica promotion
This validates that replicas can promote, and start write changes,
and that these changes are also persisted.  However, this does not
check any less-than-happy paths.
2025-05-25 21:42:17 +03:00
Matthias van de Meent
397e030fb0 Add test for replica promotion
This validates that replicas can promote, and start write changes,
and that these changes are also persisted.  However, this does not
check any less-than-happy paths.
2025-05-25 21:42:17 +03:00
16 changed files with 206 additions and 45 deletions

View File

@@ -616,19 +616,16 @@ impl Endpoint {
/// Map safekeepers ids to the actual connection strings.
fn build_safekeepers_connstrs(&self, sk_ids: Vec<NodeId>) -> Result<Vec<String>> {
let mut safekeeper_connstrings = Vec::new();
if self.mode == ComputeMode::Primary {
for sk_id in sk_ids {
let sk = self
.env
sk_ids.into_iter()
.map(|node_id| {
self.env
.safekeepers
.iter()
.find(|node| node.id == sk_id)
.ok_or_else(|| anyhow!("safekeeper {sk_id} does not exist"))?;
safekeeper_connstrings.push(format!("127.0.0.1:{}", sk.get_compute_port()));
}
}
Ok(safekeeper_connstrings)
.find(|node| node.id == node_id)
.map(|node| format!("127.0.0.1:{}", node.get_compute_port()))
.ok_or_else(|| anyhow!("safekeeer {node_id} does not exist"))
})
.collect::<Result<Vec<String>>>()
}
/// Generate a JWT with the correct claims.

View File

@@ -213,6 +213,7 @@ impl Wrapper {
safekeeper_connection_timeout: config.safekeeper_connection_timeout,
wal_segment_size: WAL_SEGMENT_SIZE as i32, // default 16MB
syncSafekeepers: config.sync_safekeepers,
replicaPromote: false,
systemId: 0,
pgTimeline: 1,
proto_version: 3,

View File

@@ -69,6 +69,7 @@ struct NeonWALReader
WALSegmentContext segcxt;
WALOpenSegment seg;
int wre_errno;
TimeLineID local_active_tlid;
/* Explains failure to read, static for simplicity. */
char err_msg[NEON_WALREADER_ERR_MSG_LEN];
@@ -106,7 +107,8 @@ struct NeonWALReader
/* palloc and initialize NeonWALReader */
NeonWALReader *
NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, char *log_prefix)
NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn,
char *log_prefix, TimeLineID tlid)
{
NeonWALReader *reader;
@@ -118,6 +120,7 @@ NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, char *log_
MemoryContextAllocZero(TopMemoryContext, sizeof(NeonWALReader));
reader->available_lsn = available_lsn;
reader->local_active_tlid = tlid;
reader->seg.ws_file = -1;
reader->seg.ws_segno = 0;
reader->seg.ws_tli = 0;
@@ -577,6 +580,17 @@ NeonWALReaderIsRemConnEstablished(NeonWALReader *state)
return state->rem_state == RS_ESTABLISHED;
}
/*
* Whether remote connection is established. Once this is done, until successful
* local read or error socket is stable and user can update socket events
* instead of readding it each time.
*/
TimeLineID
NeonWALReaderLocalActiveTimeLineID(NeonWALReader *state)
{
return state->local_active_tlid;
}
/*
* Returns events user should wait on connection socket or 0 if remote
* connection is not active.

View File

@@ -19,9 +19,12 @@ typedef enum
NEON_WALREAD_ERROR,
} NeonWALReadResult;
extern NeonWALReader *NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, char *log_prefix);
extern NeonWALReader *NeonWALReaderAllocate(int wal_segment_size,
XLogRecPtr available_lsn,
char *log_prefix, TimeLineID tlid);
extern void NeonWALReaderFree(NeonWALReader *state);
extern void NeonWALReaderResetRemote(NeonWALReader *state);
extern TimeLineID NeonWALReaderLocalActiveTimeLineID(NeonWALReader *state);
extern NeonWALReadResult NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli);
extern pgsocket NeonWALReaderSocket(NeonWALReader *state);
extern uint32 NeonWALReaderEvents(NeonWALReader *state);

View File

@@ -98,6 +98,7 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
wp = palloc0(sizeof(WalProposer));
wp->config = config;
wp->api = api;
wp->localTimeLineID = config->pgTimeline;
wp->state = WPS_COLLECTING_TERMS;
wp->mconf.generation = INVALID_GENERATION;
wp->mconf.members.len = 0;
@@ -1379,7 +1380,7 @@ ProcessPropStartPos(WalProposer *wp)
* we must bail out, as clog and other non rel data is inconsistent.
*/
walprop_shared = wp->api.get_shmem_state(wp);
if (!wp->config->syncSafekeepers)
if (!wp->config->syncSafekeepers && !wp->config->replicaPromote)
{
/*
* Basebackup LSN always points to the beginning of the record (not

View File

@@ -739,6 +739,11 @@ typedef struct WalProposerConfig
*/
bool syncSafekeepers;
/*
* Replica is promoted to primary
*/
bool replicaPromote;
/* Will be passed to safekeepers in greet request. */
uint64 systemId;
@@ -805,6 +810,8 @@ typedef struct WalProposer
/* WAL has been generated up to this point */
XLogRecPtr availableLsn;
/* Current local TimeLineId in use */
TimeLineID localTimeLineID;
/* cached GetAcknowledgedByQuorumWALPosition result */
XLogRecPtr commitLsn;

View File

@@ -73,6 +73,7 @@ static XLogRecPtr sentPtr = InvalidXLogRecPtr;
static const walproposer_api walprop_pg;
static volatile sig_atomic_t got_SIGUSR2 = false;
static bool reported_sigusr2 = false;
static bool start_as_replica = false;
static XLogRecPtr standby_flush_lsn = InvalidXLogRecPtr;
static XLogRecPtr standby_apply_lsn = InvalidXLogRecPtr;
@@ -123,6 +124,7 @@ init_walprop_config(bool syncSafekeepers)
walprop_config.safekeeper_connection_timeout = wal_acceptor_connection_timeout;
walprop_config.wal_segment_size = wal_segment_size;
walprop_config.syncSafekeepers = syncSafekeepers;
walprop_config.replicaPromote = start_as_replica;
if (!syncSafekeepers)
walprop_config.systemId = GetSystemIdentifier();
else
@@ -149,6 +151,8 @@ WalProposerSync(int argc, char *argv[])
WalProposerStart(wp);
}
#define GUC_POLL_DELAY 100000L // 0.1 sec
/*
* WAL proposer bgworker entry point.
*/
@@ -157,12 +161,18 @@ WalProposerMain(Datum main_arg)
{
WalProposer *wp;
if (*wal_acceptors_list == '\0')
{
elog(PANIC, "Safekeepers list is empty");
}
init_walprop_config(false);
walprop_pg_init_bgworker();
am_walproposer = true;
walprop_pg_load_libpqwalreceiver();
wp = WalProposerCreate(&walprop_config, walprop_pg);
wp->localTimeLineID = GetWALInsertionTimeLine();
wp->last_reconnect_attempt = walprop_pg_get_current_timestamp(wp);
walprop_pg_init_walsender();
@@ -294,16 +304,15 @@ safekeepers_cmp(char *old, char *new)
return true;
}
/*
* GUC assign_hook for neon.safekeepers. Restarts walproposer through FATAL if
* the list changed.
*/
static void
assign_neon_safekeepers(const char *newval, void *extra)
{
char *newval_copy;
char *oldval;
if (newval && *newval == '\0')
start_as_replica = true;
if (!am_walproposer)
return;
@@ -500,10 +509,6 @@ walprop_register_bgworker(void)
{
BackgroundWorker bgw;
/* If no wal acceptors are specified, don't start the background worker. */
if (*wal_acceptors_list == '\0')
return;
memset(&bgw, 0, sizeof(bgw));
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
@@ -514,7 +519,6 @@ walprop_register_bgworker(void)
bgw.bgw_restart_time = 1;
bgw.bgw_notify_pid = 0;
bgw.bgw_main_arg = (Datum) 0;
RegisterBackgroundWorker(&bgw);
}
@@ -1496,7 +1500,10 @@ walprop_pg_wal_reader_allocate(Safekeeper *sk)
snprintf(log_prefix, sizeof(log_prefix), WP_LOG_PREFIX "sk %s:%s nwr: ", sk->host, sk->port);
Assert(!sk->xlogreader);
sk->xlogreader = NeonWALReaderAllocate(wal_segment_size, sk->wp->propTermStartLsn, log_prefix);
/* note that WalProposer shouldn't access safekeepers when active */
sk->xlogreader = NeonWALReaderAllocate(wal_segment_size,
sk->wp->propTermStartLsn, log_prefix,
sk->wp->localTimeLineID);
if (sk->xlogreader == NULL)
wpg_log(FATAL, "failed to allocate xlog reader");
}
@@ -1510,7 +1517,7 @@ walprop_pg_wal_read(Safekeeper *sk, char *buf, XLogRecPtr startptr, Size count,
buf,
startptr,
count,
walprop_pg_get_timeline_id());
sk->wp->localTimeLineID);
if (res == NEON_WALREAD_SUCCESS)
{
@@ -2009,6 +2016,7 @@ walprop_pg_get_redo_start_lsn(WalProposer *wp)
return GetRedoStartLsn();
}
static bool
walprop_pg_strong_random(WalProposer *wp, void *buf, size_t len)
{

View File

@@ -68,8 +68,7 @@ NeonWALReadWaitForWAL(XLogRecPtr loc)
}
static int
NeonWALPageRead(
XLogReaderState *xlogreader,
NeonWALPageRead(XLogReaderState *xlogreader,
XLogRecPtr targetPagePtr,
int reqLen,
XLogRecPtr targetRecPtr,
@@ -106,12 +105,11 @@ NeonWALPageRead(
for (;;)
{
NeonWALReadResult res = NeonWALRead(
wal_reader,
NeonWALReadResult res = NeonWALRead(wal_reader,
readBuf,
targetPagePtr,
count,
walprop_pg_get_timeline_id());
NeonWALReaderLocalActiveTimeLineID(wal_reader));
if (res == NEON_WALREAD_SUCCESS)
{
@@ -202,7 +200,8 @@ NeonOnDemandXLogReaderRoutines(XLogReaderRoutine *xlr)
{
elog(ERROR, "unable to start walsender when basebackupLsn is 0");
}
wal_reader = NeonWALReaderAllocate(wal_segment_size, basebackupLsn, "[walsender] ");
wal_reader = NeonWALReaderAllocate(wal_segment_size, basebackupLsn,
"[walsender] ", 1);
}
xlr->page_read = NeonWALPageRead;
xlr->segment_open = NeonWALReadSegmentOpen;

View File

@@ -4658,7 +4658,7 @@ class EndpointFactory:
origin: Endpoint,
endpoint_id: str | None = None,
config_lines: list[str] | None = None,
):
) -> Endpoint:
branch_name = origin.branch_name
assert origin in self.endpoints
assert branch_name is not None
@@ -4677,7 +4677,7 @@ class EndpointFactory:
origin: Endpoint,
endpoint_id: str | None = None,
config_lines: list[str] | None = None,
):
) -> Endpoint:
branch_name = origin.branch_name
assert origin in self.endpoints
assert branch_name is not None

View File

@@ -74,8 +74,9 @@ def test_hot_standby(neon_simple_env: NeonEnv):
for query in queries:
with s_con.cursor() as secondary_cursor:
secondary_cursor.execute(query)
response = secondary_cursor.fetchone()
assert response is not None
res = secondary_cursor.fetchone()
assert res is not None
response = res
assert response == responses[query]
# Check for corrupted WAL messages which might otherwise go unnoticed if
@@ -164,7 +165,7 @@ def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder, pause_apply: bool):
s_cur.execute("SELECT COUNT(*) FROM test")
res = s_cur.fetchone()
assert res[0] == 10000
assert res == (10000,)
# Clear the cache in the standby, so that when we
# re-execute the query, it will make GetPage
@@ -195,7 +196,7 @@ def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder, pause_apply: bool):
s_cur.execute("SELECT COUNT(*) FROM test")
log_replica_lag(primary, secondary)
res = s_cur.fetchone()
assert res[0] == 10000
assert res == (10000,)
def run_pgbench(connstr: str, pg_bin: PgBin):

View File

@@ -0,0 +1,130 @@
"""
File with secondary->primary promotion testing.
This far, only contains a test that we don't break and that the data is persisted.
"""
import psycopg2
from fixtures.log_helper import log
from fixtures.neon_fixtures import Endpoint, NeonEnv, wait_replica_caughtup, wait_for_last_flush_lsn
from fixtures.pg_version import PgVersion
from pytest import raises
def test_replica_promotes(neon_simple_env: NeonEnv, pg_version: PgVersion):
"""
Test that a replica safely promotes, and can commit data updates which
show up when the primary boots up after the promoted secondary endpoint
shut down.
"""
# Initialize the primary, a test table, and a helper function to create lots
# of subtransactions.
env: NeonEnv = neon_simple_env
primary: Endpoint = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
secondary: Endpoint = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
with primary.connect() as primary_conn:
primary_cur = primary_conn.cursor()
primary_cur.execute(
"create table t(pk bigint GENERATED ALWAYS AS IDENTITY, payload integer)"
)
primary_cur.execute("INSERT INTO t(payload) SELECT generate_series(1, 100)")
primary_cur.execute(
"""
SELECT pg_current_wal_insert_lsn(),
pg_current_wal_lsn(),
pg_current_wal_flush_lsn()
"""
)
log.info(f"Primary: Current LSN after workload is {primary_cur.fetchone()}")
wait_replica_caughtup(primary, secondary)
with secondary.connect() as secondary_conn:
secondary_cur = secondary_conn.cursor()
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (100,)
with raises(psycopg2.Error):
secondary_cur.execute("INSERT INTO t (payload) SELECT generate_series(101, 200)")
secondary_conn.commit()
secondary_conn.rollback()
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (100,)
primary.stop_and_destroy(mode="immediate")
primary = None
# Reconnect to the secondary to make sure we get a read-write connection
with secondary.connect() as promo_conn:
promo_cur = promo_conn.cursor()
promo_cur.execute("SELECT * FROM pg_promote()")
assert promo_cur.fetchone() == (True,)
promo_cur.execute(
"""
SELECT pg_current_wal_insert_lsn(),
pg_current_wal_lsn(),
pg_current_wal_flush_lsn()
"""
)
log.info(f"Secondary: LSN after promotion is {promo_cur.fetchone()}")
# Reconnect to the secondary to make sure we get a read-write connection
with secondary.connect() as new_primary_conn:
new_primary_cur = new_primary_conn.cursor()
new_primary_cur.execute("select count(*) from t")
assert new_primary_cur.fetchone() == (100,)
new_primary_cur.execute("INSERT INTO t (payload) SELECT generate_series(101, 200) RETURNING payload")
assert new_primary_cur.fetchall() == [(it,) for it in range(101, 201)]
new_primary_cur = new_primary_conn.cursor()
new_primary_cur.execute("select payload from t")
assert new_primary_cur.fetchall() == [(it,) for it in range(1, 201)]
new_primary_cur.execute("select count(*) from t")
assert new_primary_cur.fetchone() == (200,)
new_primary_cur.execute(
"""
SELECT pg_current_wal_insert_lsn(),
pg_current_wal_lsn(),
pg_current_wal_flush_lsn()
"""
)
log.info(f"Secondary: LSN after workload is {new_primary_cur.fetchone()}")
with secondary.connect() as second_viewpoint_conn:
new_primary_cur = second_viewpoint_conn.cursor()
new_primary_cur.execute("select payload from t")
assert new_primary_cur.fetchall() == [(it,) for it in range(1, 201)]
wait_for_last_flush_lsn(env, secondary, env.initial_tenant, env.initial_timeline)
secondary.stop_and_destroy()
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
with primary.connect() as new_primary:
new_primary_cur = new_primary.cursor()
new_primary_cur.execute(
"""
SELECT pg_current_wal_insert_lsn(),
pg_current_wal_lsn(),
pg_current_wal_flush_lsn()
"""
)
log.info(f"New primary: Boot LSN is {new_primary_cur.fetchone()}")
new_primary_cur.execute("select count(*) from t")
assert new_primary_cur.fetchone() == (200,)
new_primary_cur.execute("INSERT INTO t (payload) SELECT generate_series(201, 300)")
new_primary_cur.execute("select count(*) from t")
assert new_primary_cur.fetchone() == (300,)
primary.stop(mode="immediate")

View File

@@ -1,18 +1,18 @@
{
"v17": [
"17.5",
"8be779fd3ab9e87206da96a7e4842ef1abf04f44"
"28b88cfedf9a6028cf7ea32a80ea15a9bf971803"
],
"v16": [
"16.9",
"0bf96bd6d70301a0b43b0b3457bb3cf8fb43c198"
"3d6896afaa89c05b693e9a29275001b801b8b479"
],
"v15": [
"15.13",
"de7640f55da07512834d5cc40c4b3fb376b5f04f"
"f2cd31737536218c39c66f920ff62f72c51d0d61"
],
"v14": [
"14.18",
"55c0d45abe6467c02084c2192bca117eda6ce1e7"
"78c1568afbbacc1604ac5e5cc2ebc9b7ed0cfde9"
]
}