mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-14 03:30:36 +00:00
Compare commits
29 Commits
ephemerals
...
MMeent/tes
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bd3f4f9638 | ||
|
|
a5af7d9d63 | ||
|
|
1c4a851549 | ||
|
|
af0b3db6a0 | ||
|
|
d0dce0c995 | ||
|
|
6ed73517ab | ||
|
|
9406cf7b20 | ||
|
|
5dba897c3a | ||
|
|
a9d0ec4cab | ||
|
|
efb58f0743 | ||
|
|
d33fbeee06 | ||
|
|
684bf85ca2 | ||
|
|
5ef5bc6c2b | ||
|
|
e084c12ef6 | ||
|
|
9bc82d7ad2 | ||
|
|
279c6c0417 | ||
|
|
dc4388580e | ||
|
|
2d7a3a6f0f | ||
|
|
f7bdc138c2 | ||
|
|
2cb1e43604 | ||
|
|
cdc8057a7e | ||
|
|
a3d88258a7 | ||
|
|
a5d45bceed | ||
|
|
d7cecc485c | ||
|
|
b94054dca0 | ||
|
|
41adde29d7 | ||
|
|
45649ccd62 | ||
|
|
19c5eb53e5 | ||
|
|
397e030fb0 |
@@ -616,19 +616,17 @@ impl Endpoint {
|
||||
|
||||
/// Map safekeepers ids to the actual connection strings.
|
||||
fn build_safekeepers_connstrs(&self, sk_ids: Vec<NodeId>) -> Result<Vec<String>> {
|
||||
let mut safekeeper_connstrings = Vec::new();
|
||||
if self.mode == ComputeMode::Primary {
|
||||
for sk_id in sk_ids {
|
||||
let sk = self
|
||||
.env
|
||||
sk_ids
|
||||
.into_iter()
|
||||
.map(|node_id| {
|
||||
self.env
|
||||
.safekeepers
|
||||
.iter()
|
||||
.find(|node| node.id == sk_id)
|
||||
.ok_or_else(|| anyhow!("safekeeper {sk_id} does not exist"))?;
|
||||
safekeeper_connstrings.push(format!("127.0.0.1:{}", sk.get_compute_port()));
|
||||
}
|
||||
}
|
||||
Ok(safekeeper_connstrings)
|
||||
.find(|node| node.id == node_id)
|
||||
.map(|node| format!("127.0.0.1:{}", node.get_compute_port()))
|
||||
.ok_or_else(|| anyhow!("safekeeer {node_id} does not exist"))
|
||||
})
|
||||
.collect::<Result<Vec<String>>>()
|
||||
}
|
||||
|
||||
/// Generate a JWT with the correct claims.
|
||||
|
||||
@@ -439,6 +439,7 @@ pub fn empty_shmem() -> crate::bindings::WalproposerShmemState {
|
||||
currentClusterSize: crate::bindings::pg_atomic_uint64 { value: 0 },
|
||||
shard_ps_feedback: [empty_feedback; 128],
|
||||
num_shards: 0,
|
||||
replica_promote: false,
|
||||
min_ps_feedback: empty_feedback,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -69,6 +69,7 @@ struct NeonWALReader
|
||||
WALSegmentContext segcxt;
|
||||
WALOpenSegment seg;
|
||||
int wre_errno;
|
||||
TimeLineID local_active_tlid;
|
||||
/* Explains failure to read, static for simplicity. */
|
||||
char err_msg[NEON_WALREADER_ERR_MSG_LEN];
|
||||
|
||||
@@ -106,7 +107,8 @@ struct NeonWALReader
|
||||
|
||||
/* palloc and initialize NeonWALReader */
|
||||
NeonWALReader *
|
||||
NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, char *log_prefix)
|
||||
NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn,
|
||||
char *log_prefix, TimeLineID tlid)
|
||||
{
|
||||
NeonWALReader *reader;
|
||||
|
||||
@@ -118,6 +120,7 @@ NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, char *log_
|
||||
MemoryContextAllocZero(TopMemoryContext, sizeof(NeonWALReader));
|
||||
|
||||
reader->available_lsn = available_lsn;
|
||||
reader->local_active_tlid = tlid;
|
||||
reader->seg.ws_file = -1;
|
||||
reader->seg.ws_segno = 0;
|
||||
reader->seg.ws_tli = 0;
|
||||
@@ -577,6 +580,17 @@ NeonWALReaderIsRemConnEstablished(NeonWALReader *state)
|
||||
return state->rem_state == RS_ESTABLISHED;
|
||||
}
|
||||
|
||||
/*
|
||||
* Whether remote connection is established. Once this is done, until successful
|
||||
* local read or error socket is stable and user can update socket events
|
||||
* instead of readding it each time.
|
||||
*/
|
||||
TimeLineID
|
||||
NeonWALReaderLocalActiveTimeLineID(NeonWALReader *state)
|
||||
{
|
||||
return state->local_active_tlid;
|
||||
}
|
||||
|
||||
/*
|
||||
* Returns events user should wait on connection socket or 0 if remote
|
||||
* connection is not active.
|
||||
|
||||
@@ -19,9 +19,12 @@ typedef enum
|
||||
NEON_WALREAD_ERROR,
|
||||
} NeonWALReadResult;
|
||||
|
||||
extern NeonWALReader *NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, char *log_prefix);
|
||||
extern NeonWALReader *NeonWALReaderAllocate(int wal_segment_size,
|
||||
XLogRecPtr available_lsn,
|
||||
char *log_prefix, TimeLineID tlid);
|
||||
extern void NeonWALReaderFree(NeonWALReader *state);
|
||||
extern void NeonWALReaderResetRemote(NeonWALReader *state);
|
||||
extern TimeLineID NeonWALReaderLocalActiveTimeLineID(NeonWALReader *state);
|
||||
extern NeonWALReadResult NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli);
|
||||
extern pgsocket NeonWALReaderSocket(NeonWALReader *state);
|
||||
extern uint32 NeonWALReaderEvents(NeonWALReader *state);
|
||||
|
||||
@@ -98,6 +98,7 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
|
||||
wp = palloc0(sizeof(WalProposer));
|
||||
wp->config = config;
|
||||
wp->api = api;
|
||||
wp->localTimeLineID = config->pgTimeline;
|
||||
wp->state = WPS_COLLECTING_TERMS;
|
||||
wp->mconf.generation = INVALID_GENERATION;
|
||||
wp->mconf.members.len = 0;
|
||||
@@ -1379,7 +1380,7 @@ ProcessPropStartPos(WalProposer *wp)
|
||||
* we must bail out, as clog and other non rel data is inconsistent.
|
||||
*/
|
||||
walprop_shared = wp->api.get_shmem_state(wp);
|
||||
if (!wp->config->syncSafekeepers)
|
||||
if (!wp->config->syncSafekeepers && !walprop_shared->replica_promote)
|
||||
{
|
||||
/*
|
||||
* Basebackup LSN always points to the beginning of the record (not
|
||||
|
||||
@@ -391,6 +391,7 @@ typedef struct WalproposerShmemState
|
||||
/* last feedback from each shard */
|
||||
PageserverFeedback shard_ps_feedback[MAX_SHARDS];
|
||||
int num_shards;
|
||||
bool replica_promote;
|
||||
|
||||
/* aggregated feedback with min LSNs across shards */
|
||||
PageserverFeedback min_ps_feedback;
|
||||
@@ -805,6 +806,8 @@ typedef struct WalProposer
|
||||
|
||||
/* WAL has been generated up to this point */
|
||||
XLogRecPtr availableLsn;
|
||||
/* Current local TimeLineId in use */
|
||||
TimeLineID localTimeLineID;
|
||||
|
||||
/* cached GetAcknowledgedByQuorumWALPosition result */
|
||||
XLogRecPtr commitLsn;
|
||||
|
||||
@@ -35,6 +35,7 @@
|
||||
#include "storage/proc.h"
|
||||
#include "storage/ipc.h"
|
||||
#include "storage/lwlock.h"
|
||||
#include "storage/pg_shmem.h"
|
||||
#include "storage/shmem.h"
|
||||
#include "storage/spin.h"
|
||||
#include "tcop/tcopprot.h"
|
||||
@@ -157,12 +158,23 @@ WalProposerMain(Datum main_arg)
|
||||
{
|
||||
WalProposer *wp;
|
||||
|
||||
if (*wal_acceptors_list == '\0')
|
||||
{
|
||||
wpg_log(WARNING, "Safekeepers list is empty");
|
||||
return;
|
||||
}
|
||||
|
||||
init_walprop_config(false);
|
||||
walprop_pg_init_bgworker();
|
||||
am_walproposer = true;
|
||||
walprop_pg_load_libpqwalreceiver();
|
||||
|
||||
wp = WalProposerCreate(&walprop_config, walprop_pg);
|
||||
#if PG_MAJORVERSION_NUM < 15
|
||||
wp->localTimeLineID = ThisTimeLineID;
|
||||
#else
|
||||
wp->localTimeLineID = GetWALInsertionTimeLine();
|
||||
#endif
|
||||
wp->last_reconnect_attempt = walprop_pg_get_current_timestamp(wp);
|
||||
|
||||
walprop_pg_init_walsender();
|
||||
@@ -294,16 +306,15 @@ safekeepers_cmp(char *old, char *new)
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
* GUC assign_hook for neon.safekeepers. Restarts walproposer through FATAL if
|
||||
* the list changed.
|
||||
*/
|
||||
static void
|
||||
assign_neon_safekeepers(const char *newval, void *extra)
|
||||
{
|
||||
char *newval_copy;
|
||||
char *oldval;
|
||||
|
||||
if (newval && *newval != '\0' && UsedShmemSegAddr && walprop_shared && RecoveryInProgress())
|
||||
walprop_shared->replica_promote = true;
|
||||
|
||||
if (!am_walproposer)
|
||||
return;
|
||||
|
||||
@@ -500,10 +511,6 @@ walprop_register_bgworker(void)
|
||||
{
|
||||
BackgroundWorker bgw;
|
||||
|
||||
/* If no wal acceptors are specified, don't start the background worker. */
|
||||
if (*wal_acceptors_list == '\0')
|
||||
return;
|
||||
|
||||
memset(&bgw, 0, sizeof(bgw));
|
||||
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
|
||||
bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
|
||||
@@ -1496,7 +1503,10 @@ walprop_pg_wal_reader_allocate(Safekeeper *sk)
|
||||
|
||||
snprintf(log_prefix, sizeof(log_prefix), WP_LOG_PREFIX "sk %s:%s nwr: ", sk->host, sk->port);
|
||||
Assert(!sk->xlogreader);
|
||||
sk->xlogreader = NeonWALReaderAllocate(wal_segment_size, sk->wp->propTermStartLsn, log_prefix);
|
||||
/* note that WalProposer shouldn't access safekeepers when active */
|
||||
sk->xlogreader = NeonWALReaderAllocate(wal_segment_size,
|
||||
sk->wp->propTermStartLsn, log_prefix,
|
||||
sk->wp->localTimeLineID);
|
||||
if (sk->xlogreader == NULL)
|
||||
wpg_log(FATAL, "failed to allocate xlog reader");
|
||||
}
|
||||
@@ -1510,7 +1520,7 @@ walprop_pg_wal_read(Safekeeper *sk, char *buf, XLogRecPtr startptr, Size count,
|
||||
buf,
|
||||
startptr,
|
||||
count,
|
||||
walprop_pg_get_timeline_id());
|
||||
sk->wp->localTimeLineID);
|
||||
|
||||
if (res == NEON_WALREAD_SUCCESS)
|
||||
{
|
||||
|
||||
@@ -68,8 +68,7 @@ NeonWALReadWaitForWAL(XLogRecPtr loc)
|
||||
}
|
||||
|
||||
static int
|
||||
NeonWALPageRead(
|
||||
XLogReaderState *xlogreader,
|
||||
NeonWALPageRead(XLogReaderState *xlogreader,
|
||||
XLogRecPtr targetPagePtr,
|
||||
int reqLen,
|
||||
XLogRecPtr targetRecPtr,
|
||||
@@ -106,12 +105,11 @@ NeonWALPageRead(
|
||||
|
||||
for (;;)
|
||||
{
|
||||
NeonWALReadResult res = NeonWALRead(
|
||||
wal_reader,
|
||||
NeonWALReadResult res = NeonWALRead(wal_reader,
|
||||
readBuf,
|
||||
targetPagePtr,
|
||||
count,
|
||||
walprop_pg_get_timeline_id());
|
||||
NeonWALReaderLocalActiveTimeLineID(wal_reader));
|
||||
|
||||
if (res == NEON_WALREAD_SUCCESS)
|
||||
{
|
||||
@@ -202,7 +200,8 @@ NeonOnDemandXLogReaderRoutines(XLogReaderRoutine *xlr)
|
||||
{
|
||||
elog(ERROR, "unable to start walsender when basebackupLsn is 0");
|
||||
}
|
||||
wal_reader = NeonWALReaderAllocate(wal_segment_size, basebackupLsn, "[walsender] ");
|
||||
wal_reader = NeonWALReaderAllocate(wal_segment_size, basebackupLsn,
|
||||
"[walsender] ", 1);
|
||||
}
|
||||
xlr->page_read = NeonWALPageRead;
|
||||
xlr->segment_open = NeonWALReadSegmentOpen;
|
||||
|
||||
@@ -4658,7 +4658,7 @@ class EndpointFactory:
|
||||
origin: Endpoint,
|
||||
endpoint_id: str | None = None,
|
||||
config_lines: list[str] | None = None,
|
||||
):
|
||||
) -> Endpoint:
|
||||
branch_name = origin.branch_name
|
||||
assert origin in self.endpoints
|
||||
assert branch_name is not None
|
||||
@@ -4677,7 +4677,7 @@ class EndpointFactory:
|
||||
origin: Endpoint,
|
||||
endpoint_id: str | None = None,
|
||||
config_lines: list[str] | None = None,
|
||||
):
|
||||
) -> Endpoint:
|
||||
branch_name = origin.branch_name
|
||||
assert origin in self.endpoints
|
||||
assert branch_name is not None
|
||||
|
||||
@@ -74,8 +74,9 @@ def test_hot_standby(neon_simple_env: NeonEnv):
|
||||
for query in queries:
|
||||
with s_con.cursor() as secondary_cursor:
|
||||
secondary_cursor.execute(query)
|
||||
response = secondary_cursor.fetchone()
|
||||
assert response is not None
|
||||
res = secondary_cursor.fetchone()
|
||||
assert res is not None
|
||||
response = res
|
||||
assert response == responses[query]
|
||||
|
||||
# Check for corrupted WAL messages which might otherwise go unnoticed if
|
||||
@@ -164,7 +165,7 @@ def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder, pause_apply: bool):
|
||||
|
||||
s_cur.execute("SELECT COUNT(*) FROM test")
|
||||
res = s_cur.fetchone()
|
||||
assert res[0] == 10000
|
||||
assert res == (10000,)
|
||||
|
||||
# Clear the cache in the standby, so that when we
|
||||
# re-execute the query, it will make GetPage
|
||||
@@ -195,7 +196,7 @@ def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder, pause_apply: bool):
|
||||
s_cur.execute("SELECT COUNT(*) FROM test")
|
||||
log_replica_lag(primary, secondary)
|
||||
res = s_cur.fetchone()
|
||||
assert res[0] == 10000
|
||||
assert res == (10000,)
|
||||
|
||||
|
||||
def run_pgbench(connstr: str, pg_bin: PgBin):
|
||||
|
||||
129
test_runner/regress/test_replica_promotes.py
Normal file
129
test_runner/regress/test_replica_promotes.py
Normal file
@@ -0,0 +1,129 @@
|
||||
"""
|
||||
File with secondary->primary promotion testing.
|
||||
|
||||
This far, only contains a test that we don't break and that the data is persisted.
|
||||
"""
|
||||
|
||||
import psycopg2
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import Endpoint, NeonEnv, wait_for_last_flush_lsn, wait_replica_caughtup
|
||||
from fixtures.pg_version import PgVersion
|
||||
from pytest import raises
|
||||
|
||||
|
||||
def test_replica_promotes(neon_simple_env: NeonEnv, pg_version: PgVersion):
|
||||
"""
|
||||
Test that a replica safely promotes, and can commit data updates which
|
||||
show up when the primary boots up after the promoted secondary endpoint
|
||||
shut down.
|
||||
"""
|
||||
|
||||
# Initialize the primary, a test table, and a helper function to create lots
|
||||
# of subtransactions.
|
||||
env: NeonEnv = neon_simple_env
|
||||
primary: Endpoint = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
|
||||
secondary: Endpoint = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
|
||||
|
||||
with primary.connect() as primary_conn:
|
||||
primary_cur = primary_conn.cursor()
|
||||
primary_cur.execute(
|
||||
"create table t(pk bigint GENERATED ALWAYS AS IDENTITY, payload integer)"
|
||||
)
|
||||
primary_cur.execute("INSERT INTO t(payload) SELECT generate_series(1, 100)")
|
||||
primary_cur.execute(
|
||||
"""
|
||||
SELECT pg_current_wal_insert_lsn(),
|
||||
pg_current_wal_lsn(),
|
||||
pg_current_wal_flush_lsn()
|
||||
"""
|
||||
)
|
||||
log.info(f"Primary: Current LSN after workload is {primary_cur.fetchone()}")
|
||||
|
||||
wait_replica_caughtup(primary, secondary)
|
||||
|
||||
with secondary.connect() as secondary_conn:
|
||||
secondary_cur = secondary_conn.cursor()
|
||||
secondary_cur.execute("select count(*) from t")
|
||||
|
||||
assert secondary_cur.fetchone() == (100,)
|
||||
|
||||
with raises(psycopg2.Error):
|
||||
secondary_cur.execute("INSERT INTO t (payload) SELECT generate_series(101, 200)")
|
||||
secondary_conn.commit()
|
||||
|
||||
secondary_conn.rollback()
|
||||
secondary_cur.execute("select count(*) from t")
|
||||
assert secondary_cur.fetchone() == (100,)
|
||||
|
||||
primary.stop_and_destroy(mode="immediate")
|
||||
|
||||
# Reconnect to the secondary to make sure we get a read-write connection
|
||||
with secondary.connect() as promo_conn:
|
||||
promo_cur = promo_conn.cursor()
|
||||
|
||||
promo_cur.execute("SELECT * FROM pg_promote()")
|
||||
assert promo_cur.fetchone() == (True,)
|
||||
promo_cur.execute(
|
||||
"""
|
||||
SELECT pg_current_wal_insert_lsn(),
|
||||
pg_current_wal_lsn(),
|
||||
pg_current_wal_flush_lsn()
|
||||
"""
|
||||
)
|
||||
log.info(f"Secondary: LSN after promotion is {promo_cur.fetchone()}")
|
||||
|
||||
# Reconnect to the secondary to make sure we get a read-write connection
|
||||
with secondary.connect() as new_primary_conn:
|
||||
new_primary_cur = new_primary_conn.cursor()
|
||||
new_primary_cur.execute("select count(*) from t")
|
||||
assert new_primary_cur.fetchone() == (100,)
|
||||
|
||||
new_primary_cur.execute(
|
||||
"INSERT INTO t (payload) SELECT generate_series(101, 200) RETURNING payload"
|
||||
)
|
||||
assert new_primary_cur.fetchall() == [(it,) for it in range(101, 201)]
|
||||
|
||||
new_primary_cur = new_primary_conn.cursor()
|
||||
new_primary_cur.execute("select payload from t")
|
||||
assert new_primary_cur.fetchall() == [(it,) for it in range(1, 201)]
|
||||
|
||||
new_primary_cur.execute("select count(*) from t")
|
||||
assert new_primary_cur.fetchone() == (200,)
|
||||
new_primary_cur.execute(
|
||||
"""
|
||||
SELECT pg_current_wal_insert_lsn(),
|
||||
pg_current_wal_lsn(),
|
||||
pg_current_wal_flush_lsn()
|
||||
"""
|
||||
)
|
||||
log.info(f"Secondary: LSN after workload is {new_primary_cur.fetchone()}")
|
||||
|
||||
with secondary.connect() as second_viewpoint_conn:
|
||||
new_primary_cur = second_viewpoint_conn.cursor()
|
||||
new_primary_cur.execute("select payload from t")
|
||||
assert new_primary_cur.fetchall() == [(it,) for it in range(1, 201)]
|
||||
|
||||
wait_for_last_flush_lsn(env, secondary, env.initial_tenant, env.initial_timeline)
|
||||
|
||||
secondary.stop_and_destroy()
|
||||
|
||||
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
|
||||
|
||||
with primary.connect() as new_primary:
|
||||
new_primary_cur = new_primary.cursor()
|
||||
new_primary_cur.execute(
|
||||
"""
|
||||
SELECT pg_current_wal_insert_lsn(),
|
||||
pg_current_wal_lsn(),
|
||||
pg_current_wal_flush_lsn()
|
||||
"""
|
||||
)
|
||||
log.info(f"New primary: Boot LSN is {new_primary_cur.fetchone()}")
|
||||
|
||||
new_primary_cur.execute("select count(*) from t")
|
||||
assert new_primary_cur.fetchone() == (200,)
|
||||
new_primary_cur.execute("INSERT INTO t (payload) SELECT generate_series(201, 300)")
|
||||
new_primary_cur.execute("select count(*) from t")
|
||||
assert new_primary_cur.fetchone() == (300,)
|
||||
|
||||
primary.stop(mode="immediate")
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: 55c0d45abe...b6eece3f52
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: de7640f55d...20f8491225
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: 0bf96bd6d7...77c63bfebf
2
vendor/postgres-v17
vendored
2
vendor/postgres-v17
vendored
Submodule vendor/postgres-v17 updated: 8be779fd3a...32d704d965
8
vendor/revisions.json
vendored
8
vendor/revisions.json
vendored
@@ -1,18 +1,18 @@
|
||||
{
|
||||
"v17": [
|
||||
"17.5",
|
||||
"8be779fd3ab9e87206da96a7e4842ef1abf04f44"
|
||||
"32d704d965d8ad632c0ddef64b45a5ba95536442"
|
||||
],
|
||||
"v16": [
|
||||
"16.9",
|
||||
"0bf96bd6d70301a0b43b0b3457bb3cf8fb43c198"
|
||||
"77c63bfebff5c833682cc2654e2191fec4d5b24e"
|
||||
],
|
||||
"v15": [
|
||||
"15.13",
|
||||
"de7640f55da07512834d5cc40c4b3fb376b5f04f"
|
||||
"20f8491225f86bdedbc986e9a69ebafb1c94aa99"
|
||||
],
|
||||
"v14": [
|
||||
"14.18",
|
||||
"55c0d45abe6467c02084c2192bca117eda6ce1e7"
|
||||
"b6eece3f528fdc380e6e2c13381434470606787f"
|
||||
]
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user