Accept primary compute spec in /promote, promotion corner cases testing (#12574)

https://github.com/neondatabase/cloud/issues/19011
- Accept `ComputeSpec` in `/promote` instead of just passing safekeepers
and LSN. Update API spec
- Add corner case tests for promotion when promotion or perwarm fails
(using failpoints)
- Print root error for prewarm and promotion in status handlers
This commit is contained in:
Mikhail
2025-07-23 21:11:34 +01:00
committed by GitHub
parent 9e6ca2932f
commit a56afee269
10 changed files with 242 additions and 86 deletions

View File

@@ -90,6 +90,7 @@ impl ComputeNode {
}
/// If there is a prewarm request ongoing, return `false`, `true` otherwise.
/// Has a failpoint "compute-prewarm"
pub fn prewarm_lfc(self: &Arc<Self>, from_endpoint: Option<String>) -> bool {
{
let state = &mut self.state.lock().unwrap().lfc_prewarm_state;
@@ -112,9 +113,8 @@ impl ComputeNode {
Err(err) => {
crate::metrics::LFC_PREWARM_ERRORS.inc();
error!(%err, "could not prewarm LFC");
LfcPrewarmState::Failed {
error: err.to_string(),
error: format!("{err:#}"),
}
}
};
@@ -135,16 +135,20 @@ impl ComputeNode {
async fn prewarm_impl(&self, from_endpoint: Option<String>) -> Result<bool> {
let EndpointStoragePair { url, token } = self.endpoint_storage_pair(from_endpoint)?;
#[cfg(feature = "testing")]
fail::fail_point!("compute-prewarm", |_| {
bail!("prewarm configured to fail because of a failpoint")
});
info!(%url, "requesting LFC state from endpoint storage");
let request = Client::new().get(&url).bearer_auth(token);
let res = request.send().await.context("querying endpoint storage")?;
let status = res.status();
match status {
match res.status() {
StatusCode::OK => (),
StatusCode::NOT_FOUND => {
return Ok(false);
}
_ => bail!("{status} querying endpoint storage"),
status => bail!("{status} querying endpoint storage"),
}
let mut uncompressed = Vec::new();
@@ -205,7 +209,7 @@ impl ComputeNode {
crate::metrics::LFC_OFFLOAD_ERRORS.inc();
error!(%err, "could not offload LFC state to endpoint storage");
self.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Failed {
error: err.to_string(),
error: format!("{err:#}"),
};
}
@@ -213,16 +217,22 @@ impl ComputeNode {
let EndpointStoragePair { url, token } = self.endpoint_storage_pair(None)?;
info!(%url, "requesting LFC state from Postgres");
let mut compressed = Vec::new();
ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
let row = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
.await
.context("connecting to postgres")?
.query_one("select neon.get_local_cache_state()", &[])
.await
.context("querying LFC state")?
.try_get::<usize, &[u8]>(0)
.context("deserializing LFC state")
.map(ZstdEncoder::new)?
.context("querying LFC state")?;
let state = row
.try_get::<usize, Option<&[u8]>>(0)
.context("deserializing LFC state")?;
let Some(state) = state else {
info!(%url, "empty LFC state, not exporting");
return Ok(());
};
let mut compressed = Vec::new();
ZstdEncoder::new(state)
.read_to_end(&mut compressed)
.await
.context("compressing LFC state")?;

View File

@@ -1,11 +1,12 @@
use crate::compute::ComputeNode;
use anyhow::{Context, Result, bail};
use compute_api::{
responses::{LfcPrewarmState, PromoteState, SafekeepersLsn},
spec::ComputeMode,
};
use compute_api::responses::{LfcPrewarmState, PromoteConfig, PromoteState};
use compute_api::spec::ComputeMode;
use itertools::Itertools;
use std::collections::HashMap;
use std::{sync::Arc, time::Duration};
use tokio::time::sleep;
use tracing::info;
use utils::lsn::Lsn;
impl ComputeNode {
@@ -13,21 +14,22 @@ impl ComputeNode {
/// and http client disconnects, this does not stop promotion, and subsequent
/// calls block until promote finishes.
/// Called by control plane on secondary after primary endpoint is terminated
pub async fn promote(self: &Arc<Self>, safekeepers_lsn: SafekeepersLsn) -> PromoteState {
/// Has a failpoint "compute-promotion"
pub async fn promote(self: &Arc<Self>, cfg: PromoteConfig) -> PromoteState {
let cloned = self.clone();
let promote_fn = async move || {
let Err(err) = cloned.promote_impl(cfg).await else {
return PromoteState::Completed;
};
tracing::error!(%err, "promoting");
PromoteState::Failed {
error: format!("{err:#}"),
}
};
let start_promotion = || {
let (tx, rx) = tokio::sync::watch::channel(PromoteState::NotPromoted);
tokio::spawn(async move {
tx.send(match cloned.promote_impl(safekeepers_lsn).await {
Ok(_) => PromoteState::Completed,
Err(err) => {
tracing::error!(%err, "promoting");
PromoteState::Failed {
error: err.to_string(),
}
}
})
});
tokio::spawn(async move { tx.send(promote_fn().await) });
rx
};
@@ -47,9 +49,7 @@ impl ComputeNode {
task.borrow().clone()
}
// Why do we have to supply safekeepers?
// For secondary we use primary_connection_conninfo so safekeepers field is empty
async fn promote_impl(&self, safekeepers_lsn: SafekeepersLsn) -> Result<()> {
async fn promote_impl(&self, mut cfg: PromoteConfig) -> Result<()> {
{
let state = self.state.lock().unwrap();
let mode = &state.pspec.as_ref().unwrap().spec.mode;
@@ -73,7 +73,7 @@ impl ComputeNode {
.await
.context("connecting to postgres")?;
let primary_lsn = safekeepers_lsn.wal_flush_lsn;
let primary_lsn = cfg.wal_flush_lsn;
let mut last_wal_replay_lsn: Lsn = Lsn::INVALID;
const RETRIES: i32 = 20;
for i in 0..=RETRIES {
@@ -86,7 +86,7 @@ impl ComputeNode {
if last_wal_replay_lsn >= primary_lsn {
break;
}
tracing::info!("Try {i}, replica lsn {last_wal_replay_lsn}, primary lsn {primary_lsn}");
info!("Try {i}, replica lsn {last_wal_replay_lsn}, primary lsn {primary_lsn}");
sleep(Duration::from_secs(1)).await;
}
if last_wal_replay_lsn < primary_lsn {
@@ -96,7 +96,7 @@ impl ComputeNode {
// using $1 doesn't work with ALTER SYSTEM SET
let safekeepers_sql = format!(
"ALTER SYSTEM SET neon.safekeepers='{}'",
safekeepers_lsn.safekeepers
cfg.spec.safekeeper_connstrings.join(",")
);
client
.query(&safekeepers_sql, &[])
@@ -106,6 +106,12 @@ impl ComputeNode {
.query("SELECT pg_reload_conf()", &[])
.await
.context("reloading postgres config")?;
#[cfg(feature = "testing")]
fail::fail_point!("compute-promotion", |_| {
bail!("promotion configured to fail because of a failpoint")
});
let row = client
.query_one("SELECT * FROM pg_promote()", &[])
.await
@@ -125,8 +131,36 @@ impl ComputeNode {
bail!("replica in read only mode after promotion");
}
let mut state = self.state.lock().unwrap();
state.pspec.as_mut().unwrap().spec.mode = ComputeMode::Primary;
Ok(())
{
let mut state = self.state.lock().unwrap();
let spec = &mut state.pspec.as_mut().unwrap().spec;
spec.mode = ComputeMode::Primary;
let new_conf = cfg.spec.cluster.postgresql_conf.as_mut().unwrap();
let existing_conf = spec.cluster.postgresql_conf.as_ref().unwrap();
Self::merge_spec(new_conf, existing_conf);
}
info!("applied new spec, reconfiguring as primary");
self.reconfigure()
}
/// Merge old and new Postgres conf specs to apply on secondary.
/// Change new spec's port and safekeepers since they are supplied
/// differenly
fn merge_spec(new_conf: &mut String, existing_conf: &str) {
let mut new_conf_set: HashMap<&str, &str> = new_conf
.split_terminator('\n')
.map(|e| e.split_once("=").expect("invalid item"))
.collect();
new_conf_set.remove("neon.safekeepers");
let existing_conf_set: HashMap<&str, &str> = existing_conf
.split_terminator('\n')
.map(|e| e.split_once("=").expect("invalid item"))
.collect();
new_conf_set.insert("port", existing_conf_set["port"]);
*new_conf = new_conf_set
.iter()
.map(|(k, v)| format!("{k}={v}"))
.join("\n");
}
}

View File

@@ -96,7 +96,7 @@ paths:
content:
application/json:
schema:
$ref: "#/components/schemas/SafekeepersLsn"
$ref: "#/components/schemas/ComputeSchemaWithLsn"
responses:
200:
description: Promote succeeded or wasn't started
@@ -297,14 +297,7 @@ paths:
content:
application/json:
schema:
type: object
required:
- spec
properties:
spec:
# XXX: I don't want to explain current spec in the OpenAPI format,
# as it could be changed really soon. Consider doing it later.
type: object
$ref: "#/components/schemas/ComputeSchema"
responses:
200:
description: Compute configuration finished.
@@ -591,18 +584,25 @@ components:
type: string
example: "1.0.0"
SafekeepersLsn:
ComputeSchema:
type: object
required:
- safekeepers
- spec
properties:
spec:
type: object
ComputeSchemaWithLsn:
type: object
required:
- spec
- wal_flush_lsn
properties:
safekeepers:
description: Primary replica safekeepers
type: string
spec:
$ref: "#/components/schemas/ComputeState"
wal_flush_lsn:
description: Primary last WAL flush LSN
type: string
description: "last WAL flush LSN"
example: "0/028F10D8"
LfcPrewarmState:
type: object

View File

@@ -1,14 +1,14 @@
use crate::http::JsonResponse;
use axum::Form;
use axum::extract::Json;
use http::StatusCode;
pub(in crate::http) async fn promote(
compute: axum::extract::State<std::sync::Arc<crate::compute::ComputeNode>>,
Form(safekeepers_lsn): Form<compute_api::responses::SafekeepersLsn>,
Json(cfg): Json<compute_api::responses::PromoteConfig>,
) -> axum::response::Response {
let state = compute.promote(safekeepers_lsn).await;
if let compute_api::responses::PromoteState::Failed { error } = state {
return JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, error);
let state = compute.promote(cfg).await;
if let compute_api::responses::PromoteState::Failed { error: _ } = state {
return JsonResponse::create_response(StatusCode::INTERNAL_SERVER_ERROR, state);
}
JsonResponse::success(StatusCode::OK, state)
}

View File

@@ -1517,7 +1517,7 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
let endpoint = cplane
.endpoints
.get(endpoint_id.as_str())
.ok_or_else(|| anyhow::anyhow!("endpoint {endpoint_id} not found"))?;
.ok_or_else(|| anyhow!("endpoint {endpoint_id} not found"))?;
if !args.allow_multiple {
cplane.check_conflicting_endpoints(

View File

@@ -108,11 +108,10 @@ pub enum PromoteState {
Failed { error: String },
}
#[derive(Deserialize, Serialize, Default, Debug, Clone)]
#[derive(Deserialize, Default, Debug)]
#[serde(rename_all = "snake_case")]
/// Result of /safekeepers_lsn
pub struct SafekeepersLsn {
pub safekeepers: String,
pub struct PromoteConfig {
pub spec: ComputeSpec,
pub wal_flush_lsn: utils::lsn::Lsn,
}

View File

@@ -87,9 +87,10 @@ class EndpointHttpClient(requests.Session):
def prewarmed():
json = self.prewarm_lfc_status()
status, err = json["status"], json.get("error")
assert status == "completed", f"{status}, {err=}"
assert status in ["failed", "completed", "skipped"], f"{status}, {err=}"
wait_until(prewarmed, timeout=60)
assert self.prewarm_lfc_status()["status"] != "failed"
def offload_lfc_status(self) -> dict[str, str]:
res = self.get(self.offload_url)
@@ -105,19 +106,19 @@ class EndpointHttpClient(requests.Session):
def offloaded():
json = self.offload_lfc_status()
status, err = json["status"], json.get("error")
assert status == "completed", f"{status}, {err=}"
assert status in ["failed", "completed"], f"{status}, {err=}"
wait_until(offloaded)
assert self.offload_lfc_status()["status"] != "failed"
def promote(self, safekeepers_lsn: dict[str, Any], disconnect: bool = False):
def promote(self, promote_spec: dict[str, Any], disconnect: bool = False):
url = f"http://localhost:{self.external_port}/promote"
if disconnect:
try: # send first request to start promote and disconnect
self.post(url, data=safekeepers_lsn, timeout=0.001)
self.post(url, json=promote_spec, timeout=0.001)
except ReadTimeout:
pass # wait on second request which returns on promotion finish
res = self.post(url, data=safekeepers_lsn)
res.raise_for_status()
res = self.post(url, json=promote_spec)
json: dict[str, str] = res.json()
return json

View File

@@ -4794,9 +4794,10 @@ class Endpoint(PgProtocol, LogUtils):
m = re.search(r"=\s*(\S+)", line)
assert m is not None, f"malformed config line {line}"
size = m.group(1)
assert size_to_bytes(size) >= size_to_bytes("1MB"), (
"LFC size cannot be set less than 1MB"
)
if size_to_bytes(size) > 0:
assert size_to_bytes(size) >= size_to_bytes("1MB"), (
"LFC size cannot be set less than 1MB"
)
lfc_path_escaped = str(lfc_path).replace("'", "''")
config_lines = [
f"neon.file_cache_path = '{lfc_path_escaped}'",
@@ -4951,6 +4952,10 @@ class Endpoint(PgProtocol, LogUtils):
log.debug(json.dumps(dict(data_dict, **kwargs)))
json.dump(dict(data_dict, **kwargs), file, indent=4)
def get_compute_spec(self) -> dict[str, Any]:
out = json.loads((Path(self.endpoint_path()) / "config.json").read_text())["spec"]
return cast("dict[str, Any]", out)
def respec_deep(self, **kwargs: Any) -> None:
"""
Update the endpoint.json file taking into account nested keys.

View File

@@ -164,6 +164,25 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv, method: PrewarmMethod):
check_prewarmed(method, client, desired)
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
def test_lfc_prewarm_empty(neon_simple_env: NeonEnv):
"""
Test there are no errors when trying to offload or prewarm endpoint without cache using compute_ctl.
Endpoint without cache is simulated by turning off LFC manually, but in cloud/ setup this is
also reproduced on fresh endpoints
"""
env = neon_simple_env
ep = env.endpoints.create_start("main", config_lines=["neon.file_cache_size_limit=0"])
client = ep.http_client()
conn = ep.connect()
cur = conn.cursor()
cur.execute("create schema neon; create extension neon with schema neon")
method = PrewarmMethod.COMPUTE_CTL
offload_lfc(method, client, cur)
prewarm_endpoint(method, client, cur, None)
assert client.prewarm_lfc_status()["status"] == "skipped"
# autoprewarm isn't needed as we prewarm manually
WORKLOAD_VALUES = METHOD_VALUES[:-1]
WORKLOAD_IDS = METHOD_IDS[:-1]

View File

@@ -90,6 +90,7 @@ def test_replica_promote(neon_simple_env: NeonEnv, method: PromoteMethod):
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (100,)
primary_spec = primary.get_compute_spec()
primary_endpoint_id = primary.endpoint_id
stop_and_check_lsn(primary, expected_primary_lsn)
@@ -99,10 +100,9 @@ def test_replica_promote(neon_simple_env: NeonEnv, method: PromoteMethod):
if method == PromoteMethod.COMPUTE_CTL:
client = secondary.http_client()
client.prewarm_lfc(primary_endpoint_id)
# control plane knows safekeepers, simulate it by querying primary
assert (lsn := primary.terminate_flush_lsn)
safekeepers_lsn = {"safekeepers": safekeepers, "wal_flush_lsn": lsn}
assert client.promote(safekeepers_lsn)["status"] == "completed"
promote_spec = {"spec": primary_spec, "wal_flush_lsn": str(lsn)}
assert client.promote(promote_spec)["status"] == "completed"
else:
promo_cur.execute(f"alter system set neon.safekeepers='{safekeepers}'")
promo_cur.execute("select pg_reload_conf()")
@@ -131,21 +131,35 @@ def test_replica_promote(neon_simple_env: NeonEnv, method: PromoteMethod):
lsn_triple = get_lsn_triple(new_primary_cur)
log.info(f"Secondary: LSN after workload is {lsn_triple}")
expected_promoted_lsn = Lsn(lsn_triple[2])
expected_lsn = Lsn(lsn_triple[2])
with secondary.connect() as conn, conn.cursor() as new_primary_cur:
new_primary_cur.execute("select payload from t")
assert new_primary_cur.fetchall() == [(it,) for it in range(1, 201)]
if method == PromoteMethod.COMPUTE_CTL:
# compute_ctl's /promote switches replica type to Primary so it syncs
# safekeepers on finish
stop_and_check_lsn(secondary, expected_promoted_lsn)
# compute_ctl's /promote switches replica type to Primary so it syncs safekeepers on finish
stop_and_check_lsn(secondary, expected_lsn)
else:
# on testing postgres, we don't update replica type, secondaries don't
# sync so lsn should be None
# on testing postgres, we don't update replica type, secondaries don't sync so lsn should be None
stop_and_check_lsn(secondary, None)
if method == PromoteMethod.COMPUTE_CTL:
secondary.stop()
# In production, compute ultimately receives new compute spec from cplane.
secondary.respec(mode="Primary")
secondary.start()
with secondary.connect() as conn, conn.cursor() as new_primary_cur:
new_primary_cur.execute(
"INSERT INTO t (payload) SELECT generate_series(101, 200) RETURNING payload"
)
assert new_primary_cur.fetchall() == [(it,) for it in range(101, 201)]
lsn_triple = get_lsn_triple(new_primary_cur)
log.info(f"Secondary: LSN after restart and workload is {lsn_triple}")
expected_lsn = Lsn(lsn_triple[2])
stop_and_check_lsn(secondary, expected_lsn)
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary2")
with primary.connect() as new_primary, new_primary.cursor() as new_primary_cur:
@@ -154,10 +168,11 @@ def test_replica_promote(neon_simple_env: NeonEnv, method: PromoteMethod):
log.info(f"New primary: Boot LSN is {lsn_triple}")
new_primary_cur.execute("select count(*) from t")
assert new_primary_cur.fetchone() == (200,)
compute_ctl_count = 100 * (method == PromoteMethod.COMPUTE_CTL)
assert new_primary_cur.fetchone() == (200 + compute_ctl_count,)
new_primary_cur.execute("INSERT INTO t (payload) SELECT generate_series(201, 300)")
new_primary_cur.execute("select count(*) from t")
assert new_primary_cur.fetchone() == (300,)
assert new_primary_cur.fetchone() == (300 + compute_ctl_count,)
stop_and_check_lsn(primary, expected_primary_lsn)
@@ -175,18 +190,91 @@ def test_replica_promote_handler_disconnects(neon_simple_env: NeonEnv):
cur.execute("create schema neon;create extension neon with schema neon")
cur.execute("create table t(pk bigint GENERATED ALWAYS AS IDENTITY, payload integer)")
cur.execute("INSERT INTO t(payload) SELECT generate_series(1, 100)")
cur.execute("show neon.safekeepers")
safekeepers = cur.fetchall()[0][0]
primary.http_client().offload_lfc()
primary_spec = primary.get_compute_spec()
primary_endpoint_id = primary.endpoint_id
primary.stop(mode="immediate-terminate")
assert (lsn := primary.terminate_flush_lsn)
client = secondary.http_client()
client.prewarm_lfc(primary_endpoint_id)
safekeepers_lsn = {"safekeepers": safekeepers, "wal_flush_lsn": lsn}
assert client.promote(safekeepers_lsn, disconnect=True)["status"] == "completed"
promote_spec = {"spec": primary_spec, "wal_flush_lsn": str(lsn)}
assert client.promote(promote_spec, disconnect=True)["status"] == "completed"
with secondary.connect() as conn, conn.cursor() as cur:
cur.execute("select count(*) from t")
assert cur.fetchone() == (100,)
cur.execute("INSERT INTO t (payload) SELECT generate_series(101, 200) RETURNING payload")
cur.execute("select count(*) from t")
assert cur.fetchone() == (200,)
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
def test_replica_promote_fails(neon_simple_env: NeonEnv):
"""
Test that if a /promote route fails, we can safely start primary back
"""
env: NeonEnv = neon_simple_env
primary: Endpoint = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
secondary: Endpoint = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
secondary.stop()
secondary.start(env={"FAILPOINTS": "compute-promotion=return(0)"})
with primary.connect() as conn, conn.cursor() as cur:
cur.execute("create schema neon;create extension neon with schema neon")
cur.execute("create table t(pk bigint GENERATED ALWAYS AS IDENTITY, payload integer)")
cur.execute("INSERT INTO t(payload) SELECT generate_series(1, 100)")
primary.http_client().offload_lfc()
primary_spec = primary.get_compute_spec()
primary_endpoint_id = primary.endpoint_id
primary.stop(mode="immediate-terminate")
assert (lsn := primary.terminate_flush_lsn)
client = secondary.http_client()
client.prewarm_lfc(primary_endpoint_id)
promote_spec = {"spec": primary_spec, "wal_flush_lsn": str(lsn)}
assert client.promote(promote_spec)["status"] == "failed"
secondary.stop()
primary.start()
with primary.connect() as conn, conn.cursor() as cur:
cur.execute("select count(*) from t")
assert cur.fetchone() == (100,)
cur.execute("INSERT INTO t (payload) SELECT generate_series(101, 200) RETURNING payload")
cur.execute("select count(*) from t")
assert cur.fetchone() == (200,)
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
def test_replica_promote_prewarm_fails(neon_simple_env: NeonEnv):
"""
Test that if /lfc/prewarm route fails, we are able to promote
"""
env: NeonEnv = neon_simple_env
primary: Endpoint = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
secondary: Endpoint = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
secondary.stop()
secondary.start(env={"FAILPOINTS": "compute-prewarm=return(0)"})
with primary.connect() as conn, conn.cursor() as cur:
cur.execute("create schema neon;create extension neon with schema neon")
cur.execute("create table t(pk bigint GENERATED ALWAYS AS IDENTITY, payload integer)")
cur.execute("INSERT INTO t(payload) SELECT generate_series(1, 100)")
primary.http_client().offload_lfc()
primary_spec = primary.get_compute_spec()
primary_endpoint_id = primary.endpoint_id
primary.stop(mode="immediate-terminate")
assert (lsn := primary.terminate_flush_lsn)
client = secondary.http_client()
with pytest.raises(AssertionError):
client.prewarm_lfc(primary_endpoint_id)
assert client.prewarm_lfc_status()["status"] == "failed"
promote_spec = {"spec": primary_spec, "wal_flush_lsn": str(lsn)}
assert client.promote(promote_spec)["status"] == "completed"
with secondary.connect() as conn, conn.cursor() as cur:
cur.execute("select count(*) from t")