From a56afee2692caeaa7a2d2323cd311a8a14d2a03a Mon Sep 17 00:00:00 2001 From: Mikhail Date: Wed, 23 Jul 2025 21:11:34 +0100 Subject: [PATCH] Accept primary compute spec in /promote, promotion corner cases testing (#12574) https://github.com/neondatabase/cloud/issues/19011 - Accept `ComputeSpec` in `/promote` instead of just passing safekeepers and LSN. Update API spec - Add corner case tests for promotion when promotion or perwarm fails (using failpoints) - Print root error for prewarm and promotion in status handlers --- compute_tools/src/compute_prewarm.rs | 34 ++++-- compute_tools/src/compute_promote.rs | 84 +++++++++---- compute_tools/src/http/openapi_spec.yaml | 30 ++--- compute_tools/src/http/routes/promote.rs | 10 +- control_plane/src/bin/neon_local.rs | 2 +- libs/compute_api/src/responses.rs | 7 +- test_runner/fixtures/endpoint/http.py | 13 +- test_runner/fixtures/neon_fixtures.py | 11 +- test_runner/regress/test_lfc_prewarm.py | 19 +++ test_runner/regress/test_replica_promotes.py | 118 ++++++++++++++++--- 10 files changed, 242 insertions(+), 86 deletions(-) diff --git a/compute_tools/src/compute_prewarm.rs b/compute_tools/src/compute_prewarm.rs index 07b4a596cc..97e62c1c80 100644 --- a/compute_tools/src/compute_prewarm.rs +++ b/compute_tools/src/compute_prewarm.rs @@ -90,6 +90,7 @@ impl ComputeNode { } /// If there is a prewarm request ongoing, return `false`, `true` otherwise. + /// Has a failpoint "compute-prewarm" pub fn prewarm_lfc(self: &Arc, from_endpoint: Option) -> bool { { let state = &mut self.state.lock().unwrap().lfc_prewarm_state; @@ -112,9 +113,8 @@ impl ComputeNode { Err(err) => { crate::metrics::LFC_PREWARM_ERRORS.inc(); error!(%err, "could not prewarm LFC"); - LfcPrewarmState::Failed { - error: err.to_string(), + error: format!("{err:#}"), } } }; @@ -135,16 +135,20 @@ impl ComputeNode { async fn prewarm_impl(&self, from_endpoint: Option) -> Result { let EndpointStoragePair { url, token } = self.endpoint_storage_pair(from_endpoint)?; + #[cfg(feature = "testing")] + fail::fail_point!("compute-prewarm", |_| { + bail!("prewarm configured to fail because of a failpoint") + }); + info!(%url, "requesting LFC state from endpoint storage"); let request = Client::new().get(&url).bearer_auth(token); let res = request.send().await.context("querying endpoint storage")?; - let status = res.status(); - match status { + match res.status() { StatusCode::OK => (), StatusCode::NOT_FOUND => { return Ok(false); } - _ => bail!("{status} querying endpoint storage"), + status => bail!("{status} querying endpoint storage"), } let mut uncompressed = Vec::new(); @@ -205,7 +209,7 @@ impl ComputeNode { crate::metrics::LFC_OFFLOAD_ERRORS.inc(); error!(%err, "could not offload LFC state to endpoint storage"); self.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Failed { - error: err.to_string(), + error: format!("{err:#}"), }; } @@ -213,16 +217,22 @@ impl ComputeNode { let EndpointStoragePair { url, token } = self.endpoint_storage_pair(None)?; info!(%url, "requesting LFC state from Postgres"); - let mut compressed = Vec::new(); - ComputeNode::get_maintenance_client(&self.tokio_conn_conf) + let row = ComputeNode::get_maintenance_client(&self.tokio_conn_conf) .await .context("connecting to postgres")? .query_one("select neon.get_local_cache_state()", &[]) .await - .context("querying LFC state")? - .try_get::(0) - .context("deserializing LFC state") - .map(ZstdEncoder::new)? + .context("querying LFC state")?; + let state = row + .try_get::>(0) + .context("deserializing LFC state")?; + let Some(state) = state else { + info!(%url, "empty LFC state, not exporting"); + return Ok(()); + }; + + let mut compressed = Vec::new(); + ZstdEncoder::new(state) .read_to_end(&mut compressed) .await .context("compressing LFC state")?; diff --git a/compute_tools/src/compute_promote.rs b/compute_tools/src/compute_promote.rs index 42256faa22..a34368c531 100644 --- a/compute_tools/src/compute_promote.rs +++ b/compute_tools/src/compute_promote.rs @@ -1,11 +1,12 @@ use crate::compute::ComputeNode; use anyhow::{Context, Result, bail}; -use compute_api::{ - responses::{LfcPrewarmState, PromoteState, SafekeepersLsn}, - spec::ComputeMode, -}; +use compute_api::responses::{LfcPrewarmState, PromoteConfig, PromoteState}; +use compute_api::spec::ComputeMode; +use itertools::Itertools; +use std::collections::HashMap; use std::{sync::Arc, time::Duration}; use tokio::time::sleep; +use tracing::info; use utils::lsn::Lsn; impl ComputeNode { @@ -13,21 +14,22 @@ impl ComputeNode { /// and http client disconnects, this does not stop promotion, and subsequent /// calls block until promote finishes. /// Called by control plane on secondary after primary endpoint is terminated - pub async fn promote(self: &Arc, safekeepers_lsn: SafekeepersLsn) -> PromoteState { + /// Has a failpoint "compute-promotion" + pub async fn promote(self: &Arc, cfg: PromoteConfig) -> PromoteState { let cloned = self.clone(); + let promote_fn = async move || { + let Err(err) = cloned.promote_impl(cfg).await else { + return PromoteState::Completed; + }; + tracing::error!(%err, "promoting"); + PromoteState::Failed { + error: format!("{err:#}"), + } + }; + let start_promotion = || { let (tx, rx) = tokio::sync::watch::channel(PromoteState::NotPromoted); - tokio::spawn(async move { - tx.send(match cloned.promote_impl(safekeepers_lsn).await { - Ok(_) => PromoteState::Completed, - Err(err) => { - tracing::error!(%err, "promoting"); - PromoteState::Failed { - error: err.to_string(), - } - } - }) - }); + tokio::spawn(async move { tx.send(promote_fn().await) }); rx }; @@ -47,9 +49,7 @@ impl ComputeNode { task.borrow().clone() } - // Why do we have to supply safekeepers? - // For secondary we use primary_connection_conninfo so safekeepers field is empty - async fn promote_impl(&self, safekeepers_lsn: SafekeepersLsn) -> Result<()> { + async fn promote_impl(&self, mut cfg: PromoteConfig) -> Result<()> { { let state = self.state.lock().unwrap(); let mode = &state.pspec.as_ref().unwrap().spec.mode; @@ -73,7 +73,7 @@ impl ComputeNode { .await .context("connecting to postgres")?; - let primary_lsn = safekeepers_lsn.wal_flush_lsn; + let primary_lsn = cfg.wal_flush_lsn; let mut last_wal_replay_lsn: Lsn = Lsn::INVALID; const RETRIES: i32 = 20; for i in 0..=RETRIES { @@ -86,7 +86,7 @@ impl ComputeNode { if last_wal_replay_lsn >= primary_lsn { break; } - tracing::info!("Try {i}, replica lsn {last_wal_replay_lsn}, primary lsn {primary_lsn}"); + info!("Try {i}, replica lsn {last_wal_replay_lsn}, primary lsn {primary_lsn}"); sleep(Duration::from_secs(1)).await; } if last_wal_replay_lsn < primary_lsn { @@ -96,7 +96,7 @@ impl ComputeNode { // using $1 doesn't work with ALTER SYSTEM SET let safekeepers_sql = format!( "ALTER SYSTEM SET neon.safekeepers='{}'", - safekeepers_lsn.safekeepers + cfg.spec.safekeeper_connstrings.join(",") ); client .query(&safekeepers_sql, &[]) @@ -106,6 +106,12 @@ impl ComputeNode { .query("SELECT pg_reload_conf()", &[]) .await .context("reloading postgres config")?; + + #[cfg(feature = "testing")] + fail::fail_point!("compute-promotion", |_| { + bail!("promotion configured to fail because of a failpoint") + }); + let row = client .query_one("SELECT * FROM pg_promote()", &[]) .await @@ -125,8 +131,36 @@ impl ComputeNode { bail!("replica in read only mode after promotion"); } - let mut state = self.state.lock().unwrap(); - state.pspec.as_mut().unwrap().spec.mode = ComputeMode::Primary; - Ok(()) + { + let mut state = self.state.lock().unwrap(); + let spec = &mut state.pspec.as_mut().unwrap().spec; + spec.mode = ComputeMode::Primary; + let new_conf = cfg.spec.cluster.postgresql_conf.as_mut().unwrap(); + let existing_conf = spec.cluster.postgresql_conf.as_ref().unwrap(); + Self::merge_spec(new_conf, existing_conf); + } + info!("applied new spec, reconfiguring as primary"); + self.reconfigure() + } + + /// Merge old and new Postgres conf specs to apply on secondary. + /// Change new spec's port and safekeepers since they are supplied + /// differenly + fn merge_spec(new_conf: &mut String, existing_conf: &str) { + let mut new_conf_set: HashMap<&str, &str> = new_conf + .split_terminator('\n') + .map(|e| e.split_once("=").expect("invalid item")) + .collect(); + new_conf_set.remove("neon.safekeepers"); + + let existing_conf_set: HashMap<&str, &str> = existing_conf + .split_terminator('\n') + .map(|e| e.split_once("=").expect("invalid item")) + .collect(); + new_conf_set.insert("port", existing_conf_set["port"]); + *new_conf = new_conf_set + .iter() + .map(|(k, v)| format!("{k}={v}")) + .join("\n"); } } diff --git a/compute_tools/src/http/openapi_spec.yaml b/compute_tools/src/http/openapi_spec.yaml index 3cf5ea7c51..ab729d62b5 100644 --- a/compute_tools/src/http/openapi_spec.yaml +++ b/compute_tools/src/http/openapi_spec.yaml @@ -96,7 +96,7 @@ paths: content: application/json: schema: - $ref: "#/components/schemas/SafekeepersLsn" + $ref: "#/components/schemas/ComputeSchemaWithLsn" responses: 200: description: Promote succeeded or wasn't started @@ -297,14 +297,7 @@ paths: content: application/json: schema: - type: object - required: - - spec - properties: - spec: - # XXX: I don't want to explain current spec in the OpenAPI format, - # as it could be changed really soon. Consider doing it later. - type: object + $ref: "#/components/schemas/ComputeSchema" responses: 200: description: Compute configuration finished. @@ -591,18 +584,25 @@ components: type: string example: "1.0.0" - SafekeepersLsn: + ComputeSchema: type: object required: - - safekeepers + - spec + properties: + spec: + type: object + ComputeSchemaWithLsn: + type: object + required: + - spec - wal_flush_lsn properties: - safekeepers: - description: Primary replica safekeepers - type: string + spec: + $ref: "#/components/schemas/ComputeState" wal_flush_lsn: - description: Primary last WAL flush LSN type: string + description: "last WAL flush LSN" + example: "0/028F10D8" LfcPrewarmState: type: object diff --git a/compute_tools/src/http/routes/promote.rs b/compute_tools/src/http/routes/promote.rs index bc5f93b4da..7ca3464b63 100644 --- a/compute_tools/src/http/routes/promote.rs +++ b/compute_tools/src/http/routes/promote.rs @@ -1,14 +1,14 @@ use crate::http::JsonResponse; -use axum::Form; +use axum::extract::Json; use http::StatusCode; pub(in crate::http) async fn promote( compute: axum::extract::State>, - Form(safekeepers_lsn): Form, + Json(cfg): Json, ) -> axum::response::Response { - let state = compute.promote(safekeepers_lsn).await; - if let compute_api::responses::PromoteState::Failed { error } = state { - return JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, error); + let state = compute.promote(cfg).await; + if let compute_api::responses::PromoteState::Failed { error: _ } = state { + return JsonResponse::create_response(StatusCode::INTERNAL_SERVER_ERROR, state); } JsonResponse::success(StatusCode::OK, state) } diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index e036e9d44b..f68bc1ed48 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -1517,7 +1517,7 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res let endpoint = cplane .endpoints .get(endpoint_id.as_str()) - .ok_or_else(|| anyhow::anyhow!("endpoint {endpoint_id} not found"))?; + .ok_or_else(|| anyhow!("endpoint {endpoint_id} not found"))?; if !args.allow_multiple { cplane.check_conflicting_endpoints( diff --git a/libs/compute_api/src/responses.rs b/libs/compute_api/src/responses.rs index 5b8fc49750..2ef1e6aab8 100644 --- a/libs/compute_api/src/responses.rs +++ b/libs/compute_api/src/responses.rs @@ -108,11 +108,10 @@ pub enum PromoteState { Failed { error: String }, } -#[derive(Deserialize, Serialize, Default, Debug, Clone)] +#[derive(Deserialize, Default, Debug)] #[serde(rename_all = "snake_case")] -/// Result of /safekeepers_lsn -pub struct SafekeepersLsn { - pub safekeepers: String, +pub struct PromoteConfig { + pub spec: ComputeSpec, pub wal_flush_lsn: utils::lsn::Lsn, } diff --git a/test_runner/fixtures/endpoint/http.py b/test_runner/fixtures/endpoint/http.py index c43445e89d..64db2b1f17 100644 --- a/test_runner/fixtures/endpoint/http.py +++ b/test_runner/fixtures/endpoint/http.py @@ -87,9 +87,10 @@ class EndpointHttpClient(requests.Session): def prewarmed(): json = self.prewarm_lfc_status() status, err = json["status"], json.get("error") - assert status == "completed", f"{status}, {err=}" + assert status in ["failed", "completed", "skipped"], f"{status}, {err=}" wait_until(prewarmed, timeout=60) + assert self.prewarm_lfc_status()["status"] != "failed" def offload_lfc_status(self) -> dict[str, str]: res = self.get(self.offload_url) @@ -105,19 +106,19 @@ class EndpointHttpClient(requests.Session): def offloaded(): json = self.offload_lfc_status() status, err = json["status"], json.get("error") - assert status == "completed", f"{status}, {err=}" + assert status in ["failed", "completed"], f"{status}, {err=}" wait_until(offloaded) + assert self.offload_lfc_status()["status"] != "failed" - def promote(self, safekeepers_lsn: dict[str, Any], disconnect: bool = False): + def promote(self, promote_spec: dict[str, Any], disconnect: bool = False): url = f"http://localhost:{self.external_port}/promote" if disconnect: try: # send first request to start promote and disconnect - self.post(url, data=safekeepers_lsn, timeout=0.001) + self.post(url, json=promote_spec, timeout=0.001) except ReadTimeout: pass # wait on second request which returns on promotion finish - res = self.post(url, data=safekeepers_lsn) - res.raise_for_status() + res = self.post(url, json=promote_spec) json: dict[str, str] = res.json() return json diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 33a18e4394..e7763de0e7 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -4794,9 +4794,10 @@ class Endpoint(PgProtocol, LogUtils): m = re.search(r"=\s*(\S+)", line) assert m is not None, f"malformed config line {line}" size = m.group(1) - assert size_to_bytes(size) >= size_to_bytes("1MB"), ( - "LFC size cannot be set less than 1MB" - ) + if size_to_bytes(size) > 0: + assert size_to_bytes(size) >= size_to_bytes("1MB"), ( + "LFC size cannot be set less than 1MB" + ) lfc_path_escaped = str(lfc_path).replace("'", "''") config_lines = [ f"neon.file_cache_path = '{lfc_path_escaped}'", @@ -4951,6 +4952,10 @@ class Endpoint(PgProtocol, LogUtils): log.debug(json.dumps(dict(data_dict, **kwargs))) json.dump(dict(data_dict, **kwargs), file, indent=4) + def get_compute_spec(self) -> dict[str, Any]: + out = json.loads((Path(self.endpoint_path()) / "config.json").read_text())["spec"] + return cast("dict[str, Any]", out) + def respec_deep(self, **kwargs: Any) -> None: """ Update the endpoint.json file taking into account nested keys. diff --git a/test_runner/regress/test_lfc_prewarm.py b/test_runner/regress/test_lfc_prewarm.py index 0f0cf4cc6d..2bbe8c3e97 100644 --- a/test_runner/regress/test_lfc_prewarm.py +++ b/test_runner/regress/test_lfc_prewarm.py @@ -164,6 +164,25 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv, method: PrewarmMethod): check_prewarmed(method, client, desired) +@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping") +def test_lfc_prewarm_empty(neon_simple_env: NeonEnv): + """ + Test there are no errors when trying to offload or prewarm endpoint without cache using compute_ctl. + Endpoint without cache is simulated by turning off LFC manually, but in cloud/ setup this is + also reproduced on fresh endpoints + """ + env = neon_simple_env + ep = env.endpoints.create_start("main", config_lines=["neon.file_cache_size_limit=0"]) + client = ep.http_client() + conn = ep.connect() + cur = conn.cursor() + cur.execute("create schema neon; create extension neon with schema neon") + method = PrewarmMethod.COMPUTE_CTL + offload_lfc(method, client, cur) + prewarm_endpoint(method, client, cur, None) + assert client.prewarm_lfc_status()["status"] == "skipped" + + # autoprewarm isn't needed as we prewarm manually WORKLOAD_VALUES = METHOD_VALUES[:-1] WORKLOAD_IDS = METHOD_IDS[:-1] diff --git a/test_runner/regress/test_replica_promotes.py b/test_runner/regress/test_replica_promotes.py index 8d39ac123a..9415d6886c 100644 --- a/test_runner/regress/test_replica_promotes.py +++ b/test_runner/regress/test_replica_promotes.py @@ -90,6 +90,7 @@ def test_replica_promote(neon_simple_env: NeonEnv, method: PromoteMethod): secondary_cur.execute("select count(*) from t") assert secondary_cur.fetchone() == (100,) + primary_spec = primary.get_compute_spec() primary_endpoint_id = primary.endpoint_id stop_and_check_lsn(primary, expected_primary_lsn) @@ -99,10 +100,9 @@ def test_replica_promote(neon_simple_env: NeonEnv, method: PromoteMethod): if method == PromoteMethod.COMPUTE_CTL: client = secondary.http_client() client.prewarm_lfc(primary_endpoint_id) - # control plane knows safekeepers, simulate it by querying primary assert (lsn := primary.terminate_flush_lsn) - safekeepers_lsn = {"safekeepers": safekeepers, "wal_flush_lsn": lsn} - assert client.promote(safekeepers_lsn)["status"] == "completed" + promote_spec = {"spec": primary_spec, "wal_flush_lsn": str(lsn)} + assert client.promote(promote_spec)["status"] == "completed" else: promo_cur.execute(f"alter system set neon.safekeepers='{safekeepers}'") promo_cur.execute("select pg_reload_conf()") @@ -131,21 +131,35 @@ def test_replica_promote(neon_simple_env: NeonEnv, method: PromoteMethod): lsn_triple = get_lsn_triple(new_primary_cur) log.info(f"Secondary: LSN after workload is {lsn_triple}") - expected_promoted_lsn = Lsn(lsn_triple[2]) + expected_lsn = Lsn(lsn_triple[2]) with secondary.connect() as conn, conn.cursor() as new_primary_cur: new_primary_cur.execute("select payload from t") assert new_primary_cur.fetchall() == [(it,) for it in range(1, 201)] if method == PromoteMethod.COMPUTE_CTL: - # compute_ctl's /promote switches replica type to Primary so it syncs - # safekeepers on finish - stop_and_check_lsn(secondary, expected_promoted_lsn) + # compute_ctl's /promote switches replica type to Primary so it syncs safekeepers on finish + stop_and_check_lsn(secondary, expected_lsn) else: - # on testing postgres, we don't update replica type, secondaries don't - # sync so lsn should be None + # on testing postgres, we don't update replica type, secondaries don't sync so lsn should be None stop_and_check_lsn(secondary, None) + if method == PromoteMethod.COMPUTE_CTL: + secondary.stop() + # In production, compute ultimately receives new compute spec from cplane. + secondary.respec(mode="Primary") + secondary.start() + + with secondary.connect() as conn, conn.cursor() as new_primary_cur: + new_primary_cur.execute( + "INSERT INTO t (payload) SELECT generate_series(101, 200) RETURNING payload" + ) + assert new_primary_cur.fetchall() == [(it,) for it in range(101, 201)] + lsn_triple = get_lsn_triple(new_primary_cur) + log.info(f"Secondary: LSN after restart and workload is {lsn_triple}") + expected_lsn = Lsn(lsn_triple[2]) + stop_and_check_lsn(secondary, expected_lsn) + primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary2") with primary.connect() as new_primary, new_primary.cursor() as new_primary_cur: @@ -154,10 +168,11 @@ def test_replica_promote(neon_simple_env: NeonEnv, method: PromoteMethod): log.info(f"New primary: Boot LSN is {lsn_triple}") new_primary_cur.execute("select count(*) from t") - assert new_primary_cur.fetchone() == (200,) + compute_ctl_count = 100 * (method == PromoteMethod.COMPUTE_CTL) + assert new_primary_cur.fetchone() == (200 + compute_ctl_count,) new_primary_cur.execute("INSERT INTO t (payload) SELECT generate_series(201, 300)") new_primary_cur.execute("select count(*) from t") - assert new_primary_cur.fetchone() == (300,) + assert new_primary_cur.fetchone() == (300 + compute_ctl_count,) stop_and_check_lsn(primary, expected_primary_lsn) @@ -175,18 +190,91 @@ def test_replica_promote_handler_disconnects(neon_simple_env: NeonEnv): cur.execute("create schema neon;create extension neon with schema neon") cur.execute("create table t(pk bigint GENERATED ALWAYS AS IDENTITY, payload integer)") cur.execute("INSERT INTO t(payload) SELECT generate_series(1, 100)") - cur.execute("show neon.safekeepers") - safekeepers = cur.fetchall()[0][0] primary.http_client().offload_lfc() + primary_spec = primary.get_compute_spec() primary_endpoint_id = primary.endpoint_id primary.stop(mode="immediate-terminate") assert (lsn := primary.terminate_flush_lsn) client = secondary.http_client() client.prewarm_lfc(primary_endpoint_id) - safekeepers_lsn = {"safekeepers": safekeepers, "wal_flush_lsn": lsn} - assert client.promote(safekeepers_lsn, disconnect=True)["status"] == "completed" + promote_spec = {"spec": primary_spec, "wal_flush_lsn": str(lsn)} + assert client.promote(promote_spec, disconnect=True)["status"] == "completed" + + with secondary.connect() as conn, conn.cursor() as cur: + cur.execute("select count(*) from t") + assert cur.fetchone() == (100,) + cur.execute("INSERT INTO t (payload) SELECT generate_series(101, 200) RETURNING payload") + cur.execute("select count(*) from t") + assert cur.fetchone() == (200,) + + +@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping") +def test_replica_promote_fails(neon_simple_env: NeonEnv): + """ + Test that if a /promote route fails, we can safely start primary back + """ + env: NeonEnv = neon_simple_env + primary: Endpoint = env.endpoints.create_start(branch_name="main", endpoint_id="primary") + secondary: Endpoint = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") + secondary.stop() + secondary.start(env={"FAILPOINTS": "compute-promotion=return(0)"}) + + with primary.connect() as conn, conn.cursor() as cur: + cur.execute("create schema neon;create extension neon with schema neon") + cur.execute("create table t(pk bigint GENERATED ALWAYS AS IDENTITY, payload integer)") + cur.execute("INSERT INTO t(payload) SELECT generate_series(1, 100)") + + primary.http_client().offload_lfc() + primary_spec = primary.get_compute_spec() + primary_endpoint_id = primary.endpoint_id + primary.stop(mode="immediate-terminate") + assert (lsn := primary.terminate_flush_lsn) + + client = secondary.http_client() + client.prewarm_lfc(primary_endpoint_id) + promote_spec = {"spec": primary_spec, "wal_flush_lsn": str(lsn)} + assert client.promote(promote_spec)["status"] == "failed" + secondary.stop() + + primary.start() + with primary.connect() as conn, conn.cursor() as cur: + cur.execute("select count(*) from t") + assert cur.fetchone() == (100,) + cur.execute("INSERT INTO t (payload) SELECT generate_series(101, 200) RETURNING payload") + cur.execute("select count(*) from t") + assert cur.fetchone() == (200,) + + +@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping") +def test_replica_promote_prewarm_fails(neon_simple_env: NeonEnv): + """ + Test that if /lfc/prewarm route fails, we are able to promote + """ + env: NeonEnv = neon_simple_env + primary: Endpoint = env.endpoints.create_start(branch_name="main", endpoint_id="primary") + secondary: Endpoint = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") + secondary.stop() + secondary.start(env={"FAILPOINTS": "compute-prewarm=return(0)"}) + + with primary.connect() as conn, conn.cursor() as cur: + cur.execute("create schema neon;create extension neon with schema neon") + cur.execute("create table t(pk bigint GENERATED ALWAYS AS IDENTITY, payload integer)") + cur.execute("INSERT INTO t(payload) SELECT generate_series(1, 100)") + + primary.http_client().offload_lfc() + primary_spec = primary.get_compute_spec() + primary_endpoint_id = primary.endpoint_id + primary.stop(mode="immediate-terminate") + assert (lsn := primary.terminate_flush_lsn) + + client = secondary.http_client() + with pytest.raises(AssertionError): + client.prewarm_lfc(primary_endpoint_id) + assert client.prewarm_lfc_status()["status"] == "failed" + promote_spec = {"spec": primary_spec, "wal_flush_lsn": str(lsn)} + assert client.promote(promote_spec)["status"] == "completed" with secondary.connect() as conn, conn.cursor() as cur: cur.execute("select count(*) from t")