Compare commits

...

26 Commits

Author SHA1 Message Date
Mikhail Kot
00bb419ed5 initial 2025-06-05 18:11:05 +01:00
Konstantin Knizhnik
9624fbfeb1 Fix ThisTimeLineID in StartProposerReplication 2025-06-03 17:09:01 +01:00
Konstantin Knizhnik
1435f4e142 Make ruff happy 2025-06-03 17:09:01 +01:00
Konstantin Knizhnik
76ed9db896 Add test_replica_promotes.py 2025-06-03 17:09:01 +01:00
Konstantin Knizhnik
7cef29c663 Bump Postgre version 2025-06-03 17:09:01 +01:00
Konstantin Knizhnik
0c40f22516 Add check that shared mmeory was not detached 2025-06-03 17:09:01 +01:00
Konstantin Knizhnik
355c4568cf Move replica_promote flag to WalproposerShmemState 2025-06-03 17:09:01 +01:00
Konstantin Knizhnik
2fdcf3e49a Do not panic in WalProposerMain is safekeeper list is empty 2025-06-03 17:09:01 +01:00
Konstantin Knizhnik
411ae952c8 Add replicaPromote to WalProposerConfig 2025-06-03 17:09:01 +01:00
Konstantin Knizhnik
fe2c8c2f59 Add replicaPromote flag to walproposer config 2025-06-03 17:09:01 +01:00
Konstantin Knizhnik
22ac47a7cf Remove unused field 2025-06-03 17:09:01 +01:00
Konstantin Knizhnik
32e3b7dfd2 Do not explicitly launch wal_proposer: rely on BgWorkerStart_RecoveryFinished 2025-06-03 17:09:01 +01:00
Konstantin Knizhnik
2e35475c9a Remove special implementation of pg_promote for PG14 2025-06-03 17:09:01 +01:00
Konstantin Knizhnik
2dc9b4640c Make mypy happy 2025-06-03 17:09:01 +01:00
Konstantin Knizhnik
58f7662f65 Bump Postgres version 2025-06-03 17:09:01 +01:00
Konstantin Knizhnik
2ed5ba7ab2 Make mypy happy 2025-06-03 17:09:01 +01:00
Konstantin Knizhnik
04baa8bc3b Add priomote support for pg14-16 2025-06-03 17:09:01 +01:00
Konstantin Knizhnik
dc238f086b Add priomote support for pg14-16 2025-06-03 17:09:01 +01:00
Konstantin Knizhnik
a2633a8a6a Make ruff happy 2025-06-03 17:09:01 +01:00
Konstantin Knizhnik
ad524e2920 Make test_replica_promote.py pass at pg17 2025-06-03 17:09:01 +01:00
Konstantin Knizhnik
8caf3d6ae2 Undo adding set_redo_start_lsn function to walproposer API 2025-06-03 17:09:01 +01:00
Konstantin Knizhnik
6704883ded Some hacks for replica primotion 2025-06-03 17:09:01 +01:00
Konstantin Knizhnik
607813a1cf Start walproposer on replica promotion 2025-06-03 17:09:01 +01:00
Matthias van de Meent
bf00aed76c Update test_replica_promote.py 2025-06-03 17:09:01 +01:00
Matthias van de Meent
05677e1b78 Add test for replica promotion
This validates that replicas can promote, and start write changes,
and that these changes are also persisted.  However, this does not
check any less-than-happy paths.
2025-06-03 17:09:01 +01:00
Matthias van de Meent
29d9d558f1 Add test for replica promotion
This validates that replicas can promote, and start write changes,
and that these changes are also persisted.  However, this does not
check any less-than-happy paths.
2025-06-03 17:09:01 +01:00
21 changed files with 318 additions and 29 deletions

View File

@@ -3,7 +3,7 @@ use chrono::{DateTime, Utc};
use compute_api::privilege::Privilege;
use compute_api::responses::{
ComputeConfig, ComputeCtlConfig, ComputeMetrics, ComputeStatus, LfcOffloadState,
LfcPrewarmState,
LfcPrewarmState, PromoteState
};
use compute_api::spec::{
ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, ExtVersion, PgIdent,
@@ -161,6 +161,8 @@ pub struct ComputeState {
pub lfc_prewarm_state: LfcPrewarmState,
pub lfc_offload_state: LfcOffloadState,
pub promote_state: PromoteState,
pub metrics: ComputeMetrics,
}

View File

@@ -0,0 +1,26 @@
use compute_api::responses::{LfcOffloadState, PromoteState};
use crate::compute::ComputeNode;
impl ComputeNode {
pub async fn promote(&self) -> PromoteState {
{
let state = &mut self.state.lock().unwrap().promote_state;
if let PromoteState::Promoting =
std::mem::replace(state, PromoteState::Promoting)
{
return state;
}
}
// reference:: configure
// 1. Check if we're not primary
// 4. Check we have safekeepers list supplied from primary
// 2. Check we have prewarmed LFC
// 3. Wait for last LSN to be committed
// 4. Call pg_promote
if !matches!(self.lfc_offload_state(), LfcOffloadState::Completed) {
}
}
}

View File

@@ -10,7 +10,7 @@ pub(in crate::http) async fn prewarm_state(compute: Compute) -> Json<LfcPrewarmS
}
// Following functions are marked async for axum, as it's more convenient than wrapping these
// in async lambdas at call site
// in asyncr is:closed lambdas at call site
pub(in crate::http) async fn offload_state(compute: Compute) -> Json<LfcOffloadState> {
Json(compute.lfc_offload_state())

View File

@@ -12,6 +12,7 @@ pub(in crate::http) mod failpoints;
pub(in crate::http) mod grants;
pub(in crate::http) mod insights;
pub(in crate::http) mod lfc;
pub(in crate::http) mod promote;
pub(in crate::http) mod metrics;
pub(in crate::http) mod metrics_json;
pub(in crate::http) mod status;

View File

@@ -0,0 +1,17 @@
use axum::response::Response;
use compute_api::responses::PromoteState;
use http::StatusCode;
use crate::http::JsonResponse;
type Compute = axum::extract::State<std::sync::Arc<crate::compute::ComputeNode>>;
/// Returns only when promote failes or succeeds.
/// If a network error occurs, this does not stop promotion, and subsequent
/// calls block until first error or success
pub(in crate::http) async fn promote(compute: Compute) -> Response {
let state = compute.promote().await;
if let PromoteState::Failed { error } = state {
return JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, error);
}
JsonResponse::success(StatusCode::OK, state)
}

View File

@@ -23,7 +23,7 @@ use super::{
middleware::authorize::Authorize,
routes::{
check_writability, configure, database_schema, dbs_and_roles, extension_server, extensions,
grants, insights, lfc, metrics, metrics_json, status, terminate,
grants, insights, lfc, metrics, metrics_json, promote, status, terminate,
},
};
use crate::compute::ComputeNode;
@@ -87,6 +87,7 @@ impl From<&Server> for Router<Arc<ComputeNode>> {
let authenticated_router = Router::<Arc<ComputeNode>>::new()
.route("/lfc/prewarm", get(lfc::prewarm_state).post(lfc::prewarm))
.route("/lfc/offload", get(lfc::offload_state).post(lfc::offload))
.route("/promote", post(promote::promote))
.route("/check_writability", post(check_writability::is_writable))
.route("/configure", post(configure::configure))
.route("/database_schema", get(database_schema::get_schema_dump))

View File

@@ -12,6 +12,7 @@ pub mod logger;
pub mod catalog;
pub mod compute;
pub mod compute_prewarm;
pub mod compute_promote;
pub mod disk_quota;
pub mod extension_server;
pub mod installed_extensions;

View File

@@ -70,6 +70,18 @@ pub enum LfcOffloadState {
},
}
#[derive(Serialize, Default, Debug, Clone)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum PromoteState {
#[default]
NotPromoted,
Promoting,
Completed,
Failed {
error: String,
},
}
/// Response of the /status API
#[derive(Serialize, Debug, Deserialize)]
#[serde(rename_all = "snake_case")]

View File

@@ -439,6 +439,7 @@ pub fn empty_shmem() -> crate::bindings::WalproposerShmemState {
currentClusterSize: crate::bindings::pg_atomic_uint64 { value: 0 },
shard_ps_feedback: [empty_feedback; 128],
num_shards: 0,
replica_promote: false,
min_ps_feedback: empty_feedback,
}
}

View File

@@ -1380,7 +1380,7 @@ ProcessPropStartPos(WalProposer *wp)
* we must bail out, as clog and other non rel data is inconsistent.
*/
walprop_shared = wp->api.get_shmem_state(wp);
if (!wp->config->syncSafekeepers)
if (!wp->config->syncSafekeepers && !walprop_shared->replica_promote)
{
/*
* Basebackup LSN always points to the beginning of the record (not

View File

@@ -391,6 +391,7 @@ typedef struct WalproposerShmemState
/* last feedback from each shard */
PageserverFeedback shard_ps_feedback[MAX_SHARDS];
int num_shards;
bool replica_promote;
/* aggregated feedback with min LSNs across shards */
PageserverFeedback min_ps_feedback;

View File

@@ -35,6 +35,7 @@
#include "storage/proc.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
#include "storage/pg_shmem.h"
#include "storage/shmem.h"
#include "storage/spin.h"
#include "tcop/tcopprot.h"
@@ -159,6 +160,12 @@ WalProposerMain(Datum main_arg)
{
WalProposer *wp;
if (*wal_acceptors_list == '\0')
{
wpg_log(WARNING, "Safekeepers list is empty");
return;
}
init_walprop_config(false);
walprop_pg_init_bgworker();
am_walproposer = true;
@@ -306,16 +313,15 @@ safekeepers_cmp(char *old, char *new)
return true;
}
/*
* GUC assign_hook for neon.safekeepers. Restarts walproposer through FATAL if
* the list changed.
*/
static void
assign_neon_safekeepers(const char *newval, void *extra)
{
char *newval_copy;
char *oldval;
if (newval && *newval != '\0' && UsedShmemSegAddr && walprop_shared && RecoveryInProgress())
walprop_shared->replica_promote = true;
if (!am_walproposer)
return;
@@ -512,10 +518,6 @@ walprop_register_bgworker(void)
{
BackgroundWorker bgw;
/* If no wal acceptors are specified, don't start the background worker. */
if (*wal_acceptors_list == '\0')
return;
memset(&bgw, 0, sizeof(bgw));
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
@@ -1292,9 +1294,7 @@ StartProposerReplication(WalProposer *wp, StartReplicationCmd *cmd)
#if PG_VERSION_NUM < 150000
if (ThisTimeLineID == 0)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
ThisTimeLineID = 1;
#endif
/*

View File

@@ -4710,7 +4710,7 @@ class EndpointFactory:
origin: Endpoint,
endpoint_id: str | None = None,
config_lines: list[str] | None = None,
):
) -> Endpoint:
branch_name = origin.branch_name
assert origin in self.endpoints
assert branch_name is not None
@@ -4729,7 +4729,7 @@ class EndpointFactory:
origin: Endpoint,
endpoint_id: str | None = None,
config_lines: list[str] | None = None,
):
) -> Endpoint:
branch_name = origin.branch_name
assert origin in self.endpoints
assert branch_name is not None

View File

@@ -74,8 +74,9 @@ def test_hot_standby(neon_simple_env: NeonEnv):
for query in queries:
with s_con.cursor() as secondary_cursor:
secondary_cursor.execute(query)
response = secondary_cursor.fetchone()
assert response is not None
res = secondary_cursor.fetchone()
assert res is not None
response = res
assert response == responses[query]
# Check for corrupted WAL messages which might otherwise go unnoticed if
@@ -164,7 +165,7 @@ def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder, pause_apply: bool):
s_cur.execute("SELECT COUNT(*) FROM test")
res = s_cur.fetchone()
assert res[0] == 10000
assert res == (10000,)
# Clear the cache in the standby, so that when we
# re-execute the query, it will make GetPage
@@ -195,7 +196,7 @@ def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder, pause_apply: bool):
s_cur.execute("SELECT COUNT(*) FROM test")
log_replica_lag(primary, secondary)
res = s_cur.fetchone()
assert res[0] == 10000
assert res == (10000,)
def run_pgbench(connstr: str, pg_bin: PgBin):

View File

@@ -0,0 +1,93 @@
"""
File with secondary->primary promotion testing.
This far, only contains a test that we don't break and that the data is persisted.
"""
import psycopg2
from fixtures.neon_fixtures import Endpoint, NeonEnv, wait_replica_caughtup
from fixtures.pg_version import PgVersion
from pytest import raises
def test_replica_promotes(neon_simple_env: NeonEnv, pg_version: PgVersion):
"""
Test that a replica safely promotes, and can commit data updates which
show up when the primary boots up after the promoted secondary endpoint
shut down.
"""
# Initialize the primary, a test table, and a helper function to create lots
# of subtransactions.
env: NeonEnv = neon_simple_env
primary: Endpoint = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
secondary: Endpoint = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
with primary.connect() as primary_conn:
primary_cur = primary_conn.cursor()
primary_cur.execute(
"create table t(pk bigint GENERATED ALWAYS AS IDENTITY, payload integer)"
)
primary_cur.execute("INSERT INTO t(payload) SELECT generate_series(1, 100)")
primary_cur.execute("select pg_switch_wal()")
primary_cur.execute("show neon.safekeepers")
safekeepers = primary_cur.fetchall()[0][0]
wait_replica_caughtup(primary, secondary)
with secondary.connect() as secondary_conn:
secondary_cur = secondary_conn.cursor()
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (100,)
with raises(psycopg2.Error):
secondary_cur.execute("INSERT INTO t (payload) SELECT generate_series(101, 200)")
secondary_conn.commit()
secondary_conn.rollback()
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (100,)
wait_replica_caughtup(primary, secondary)
primary.stop()
secondary_conn = secondary.connect()
secondary_cur = secondary_conn.cursor()
secondary_cur.execute(f"alter system set neon.safekeepers='{safekeepers}'")
secondary_cur.execute("select pg_reload_conf()")
secondary_cur.execute("SELECT * FROM pg_promote()")
assert secondary_cur.fetchone() == (True,)
new_primary = secondary
old_primary = primary
new_primary_conn = new_primary.connect()
new_primary_cur = new_primary_conn.cursor()
new_primary_cur.execute("INSERT INTO t (payload) SELECT generate_series(101, 200)")
new_primary_cur.execute("select count(*) from t")
assert new_primary_cur.fetchone() == (200,)
new_primary.stop(mode="immediate")
new_primary.respec(mode="Primary")
new_primary.start()
with new_primary.connect() as new_primary_conn:
new_primary_cur = new_primary_conn.cursor()
new_primary_cur.execute("select count(*) from t")
assert new_primary_cur.fetchone() == (200,)
new_primary_cur.execute("INSERT INTO t (payload) SELECT generate_series(201, 300)")
new_primary.stop()
old_primary.start()
with old_primary.connect() as old_primary_conn:
old_primary_cur = old_primary_conn.cursor()
old_primary_cur.execute("select count(*) from t")
assert old_primary_cur.fetchone() == (300,)
old_primary_cur.execute("INSERT INTO t (payload) SELECT generate_series(301, 400)")
old_primary_cur.execute("select count(*) from t")
assert old_primary_cur.fetchone() == (400,)

View File

@@ -0,0 +1,133 @@
"""
File with secondary->primary promotion testing.
This far, only contains a test that we don't break and that the data is persisted.
"""
import psycopg2
from fixtures.log_helper import log
from fixtures.neon_fixtures import Endpoint, NeonEnv, wait_replica_caughtup
from fixtures.pg_version import PgVersion
from pytest import raises
def test_replica_promotes(neon_simple_env: NeonEnv, pg_version: PgVersion):
"""
Test that a replica safely promotes, and can commit data updates which
show up when the primary boots up after the promoted secondary endpoint
shut down.
"""
# Initialize the primary, a test table, and a helper function to create lots
# of subtransactions.
env: NeonEnv = neon_simple_env
primary: Endpoint = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
secondary: Endpoint = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
with primary.connect() as primary_conn:
primary_cur = primary_conn.cursor()
primary_cur.execute(
"create table t(pk bigint GENERATED ALWAYS AS IDENTITY, payload integer)"
)
primary_cur.execute("INSERT INTO t(payload) SELECT generate_series(1, 100)")
primary_cur.execute(
"""
SELECT pg_current_wal_insert_lsn(),
pg_current_wal_lsn(),
pg_current_wal_flush_lsn()
"""
)
log.info(f"Primary: Current LSN after workload is {primary_cur.fetchone()}")
primary_cur.execute("show neon.safekeepers")
safekeepers = primary_cur.fetchall()[0][0]
wait_replica_caughtup(primary, secondary)
with secondary.connect() as secondary_conn:
secondary_cur = secondary_conn.cursor()
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (100,)
with raises(psycopg2.Error):
secondary_cur.execute("INSERT INTO t (payload) SELECT generate_series(101, 200)")
secondary_conn.commit()
secondary_conn.rollback()
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (100,)
primary.stop_and_destroy(mode="immediate")
# Reconnect to the secondary to make sure we get a read-write connection
promo_conn = secondary.connect()
promo_cur = promo_conn.cursor()
promo_cur.execute(f"alter system set neon.safekeepers='{safekeepers}'")
promo_cur.execute("select pg_reload_conf()")
promo_cur.execute("SELECT * FROM pg_promote()")
assert promo_cur.fetchone() == (True,)
promo_cur.execute(
"""
SELECT pg_current_wal_insert_lsn(),
pg_current_wal_lsn(),
pg_current_wal_flush_lsn()
"""
)
log.info(f"Secondary: LSN after promotion is {promo_cur.fetchone()}")
# Reconnect to the secondary to make sure we get a read-write connection
with secondary.connect() as new_primary_conn:
new_primary_cur = new_primary_conn.cursor()
new_primary_cur.execute("select count(*) from t")
assert new_primary_cur.fetchone() == (100,)
new_primary_cur.execute(
"INSERT INTO t (payload) SELECT generate_series(101, 200) RETURNING payload"
)
assert new_primary_cur.fetchall() == [(it,) for it in range(101, 201)]
new_primary_cur = new_primary_conn.cursor()
new_primary_cur.execute("select payload from t")
assert new_primary_cur.fetchall() == [(it,) for it in range(1, 201)]
new_primary_cur.execute("select count(*) from t")
assert new_primary_cur.fetchone() == (200,)
new_primary_cur.execute(
"""
SELECT pg_current_wal_insert_lsn(),
pg_current_wal_lsn(),
pg_current_wal_flush_lsn()
"""
)
log.info(f"Secondary: LSN after workload is {new_primary_cur.fetchone()}")
with secondary.connect() as second_viewpoint_conn:
new_primary_cur = second_viewpoint_conn.cursor()
new_primary_cur.execute("select payload from t")
assert new_primary_cur.fetchall() == [(it,) for it in range(1, 201)]
# wait_for_last_flush_lsn(env, secondary, env.initial_tenant, env.initial_timeline)
secondary.stop_and_destroy()
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
with primary.connect() as new_primary:
new_primary_cur = new_primary.cursor()
new_primary_cur.execute(
"""
SELECT pg_current_wal_insert_lsn(),
pg_current_wal_lsn(),
pg_current_wal_flush_lsn()
"""
)
log.info(f"New primary: Boot LSN is {new_primary_cur.fetchone()}")
new_primary_cur.execute("select count(*) from t")
assert new_primary_cur.fetchone() == (200,)
new_primary_cur.execute("INSERT INTO t (payload) SELECT generate_series(201, 300)")
new_primary_cur.execute("select count(*) from t")
assert new_primary_cur.fetchone() == (300,)
primary.stop(mode="immediate")

View File

@@ -1,18 +1,18 @@
{
"v17": [
"17.5",
"8be779fd3ab9e87206da96a7e4842ef1abf04f44"
"32d704d965d8ad632c0ddef64b45a5ba95536442"
],
"v16": [
"16.9",
"0bf96bd6d70301a0b43b0b3457bb3cf8fb43c198"
"77c63bfebff5c833682cc2654e2191fec4d5b24e"
],
"v15": [
"15.13",
"de7640f55da07512834d5cc40c4b3fb376b5f04f"
"20f8491225f86bdedbc986e9a69ebafb1c94aa99"
],
"v14": [
"14.18",
"55c0d45abe6467c02084c2192bca117eda6ce1e7"
"b6eece3f528fdc380e6e2c13381434470606787f"
]
}