From e95f2f9a671d276afd72e669bbece122f305b39a Mon Sep 17 00:00:00 2001 From: Mikhail Date: Wed, 18 Jun 2025 13:25:19 +0100 Subject: [PATCH] compute_ctl: return LSN in /terminate (#12240) - Add optional `?mode=fast|immediate` to `/terminate`, `fast` is default. Immediate avoids waiting 30 seconds before returning from `terminate`. - Add `TerminateMode` to `ComputeStatus::TerminationPending` - Use `/terminate?mode=immediate` in `neon_local` instead of `pg_ctl stop` for `test_replica_promotes`. - Change `test_replica_promotes` to check returned LSN - Annotate `finish_sync_safekeepers` as `noreturn`. https://github.com/neondatabase/cloud/issues/29807 --- compute_tools/src/compute.rs | 33 ++++++++--- compute_tools/src/http/routes/terminate.rs | 39 ++++++++----- compute_tools/src/monitor.rs | 4 +- control_plane/src/bin/neon_local.rs | 21 ++++--- control_plane/src/endpoint.rs | 58 ++++++++++++++++--- libs/compute_api/src/responses.rs | 19 +++++- libs/desim/src/executor.rs | 4 +- libs/walproposer/src/api_bindings.rs | 2 +- libs/walproposer/src/walproposer.rs | 4 +- pgxn/neon/walproposer.h | 3 +- pgxn/neon/walproposer_pg.c | 2 +- .../tests/walproposer_sim/walproposer_api.rs | 2 +- test_runner/fixtures/neon_cli.py | 8 ++- test_runner/fixtures/neon_fixtures.py | 8 ++- test_runner/regress/test_replica_promotes.py | 29 ++++++++-- 15 files changed, 176 insertions(+), 60 deletions(-) diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 9744cc2dac..d42e3cc860 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -163,6 +163,10 @@ pub struct ComputeState { pub lfc_prewarm_state: LfcPrewarmState, pub lfc_offload_state: LfcOffloadState, + /// WAL flush LSN that is set after terminating Postgres and syncing safekeepers if + /// mode == ComputeMode::Primary. None otherwise + pub terminate_flush_lsn: Option, + pub metrics: ComputeMetrics, } @@ -178,6 +182,7 @@ impl ComputeState { metrics: ComputeMetrics::default(), lfc_prewarm_state: LfcPrewarmState::default(), lfc_offload_state: LfcOffloadState::default(), + terminate_flush_lsn: None, } } @@ -531,12 +536,21 @@ impl ComputeNode { // Reap the postgres process delay_exit |= this.cleanup_after_postgres_exit()?; + // /terminate returns LSN. If we don't sleep at all, connection will break and we + // won't get result. If we sleep too much, tests will take significantly longer + // and Github Action run will error out + let sleep_duration = if delay_exit { + Duration::from_secs(30) + } else { + Duration::from_millis(300) + }; + // If launch failed, keep serving HTTP requests for a while, so the cloud // control plane can get the actual error. if delay_exit { info!("giving control plane 30s to collect the error before shutdown"); - std::thread::sleep(Duration::from_secs(30)); } + std::thread::sleep(sleep_duration); Ok(exit_code) } @@ -908,20 +922,25 @@ impl ComputeNode { // Maybe sync safekeepers again, to speed up next startup let compute_state = self.state.lock().unwrap().clone(); let pspec = compute_state.pspec.as_ref().expect("spec must be set"); - if matches!(pspec.spec.mode, compute_api::spec::ComputeMode::Primary) { + let lsn = if matches!(pspec.spec.mode, compute_api::spec::ComputeMode::Primary) { info!("syncing safekeepers on shutdown"); let storage_auth_token = pspec.storage_auth_token.clone(); let lsn = self.sync_safekeepers(storage_auth_token)?; - info!("synced safekeepers at lsn {lsn}"); - } + info!(%lsn, "synced safekeepers"); + Some(lsn) + } else { + info!("not primary, not syncing safekeepers"); + None + }; let mut delay_exit = false; let mut state = self.state.lock().unwrap(); - if state.status == ComputeStatus::TerminationPending { + state.terminate_flush_lsn = lsn; + if let ComputeStatus::TerminationPending { mode } = state.status { state.status = ComputeStatus::Terminated; self.state_changed.notify_all(); // we were asked to terminate gracefully, don't exit to avoid restart - delay_exit = true + delay_exit = mode == compute_api::responses::TerminateMode::Fast } drop(state); @@ -1792,7 +1811,7 @@ impl ComputeNode { // exit loop ComputeStatus::Failed - | ComputeStatus::TerminationPending + | ComputeStatus::TerminationPending { .. } | ComputeStatus::Terminated => break 'cert_update, // wait diff --git a/compute_tools/src/http/routes/terminate.rs b/compute_tools/src/http/routes/terminate.rs index 92a89c0ee7..32d90a5990 100644 --- a/compute_tools/src/http/routes/terminate.rs +++ b/compute_tools/src/http/routes/terminate.rs @@ -1,29 +1,39 @@ -use std::sync::Arc; - +use crate::compute::{ComputeNode, forward_termination_signal}; +use crate::http::JsonResponse; use axum::extract::State; -use axum::response::{IntoResponse, Response}; -use compute_api::responses::ComputeStatus; +use axum::response::Response; +use axum_extra::extract::OptionalQuery; +use compute_api::responses::{ComputeStatus, TerminateResponse}; use http::StatusCode; +use serde::Deserialize; +use std::sync::Arc; use tokio::task; use tracing::info; -use crate::compute::{ComputeNode, forward_termination_signal}; -use crate::http::JsonResponse; +#[derive(Deserialize, Default)] +pub struct TerminateQuery { + mode: compute_api::responses::TerminateMode, +} /// Terminate the compute. -pub(in crate::http) async fn terminate(State(compute): State>) -> Response { +pub(in crate::http) async fn terminate( + State(compute): State>, + OptionalQuery(terminate): OptionalQuery, +) -> Response { + let mode = terminate.unwrap_or_default().mode; { let mut state = compute.state.lock().unwrap(); if state.status == ComputeStatus::Terminated { - return StatusCode::CREATED.into_response(); + return JsonResponse::success(StatusCode::CREATED, state.terminate_flush_lsn); } if !matches!(state.status, ComputeStatus::Empty | ComputeStatus::Running) { return JsonResponse::invalid_status(state.status); } - - state.set_status(ComputeStatus::TerminationPending, &compute.state_changed); - drop(state); + state.set_status( + ComputeStatus::TerminationPending { mode }, + &compute.state_changed, + ); } forward_termination_signal(false); @@ -34,7 +44,7 @@ pub(in crate::http) async fn terminate(State(compute): State>) // be able to serve other requests while some particular request // is waiting for compute to finish configuration. let c = compute.clone(); - task::spawn_blocking(move || { + let lsn = task::spawn_blocking(move || { let mut state = c.state.lock().unwrap(); while state.status != ComputeStatus::Terminated { state = c.state_changed.wait(state).unwrap(); @@ -44,11 +54,10 @@ pub(in crate::http) async fn terminate(State(compute): State>) state.status ); } + state.terminate_flush_lsn }) .await .unwrap(); - info!("terminated Postgres"); - - StatusCode::OK.into_response() + JsonResponse::success(StatusCode::OK, TerminateResponse { lsn }) } diff --git a/compute_tools/src/monitor.rs b/compute_tools/src/monitor.rs index bacaf05cd5..8a2f6addad 100644 --- a/compute_tools/src/monitor.rs +++ b/compute_tools/src/monitor.rs @@ -83,7 +83,9 @@ impl ComputeMonitor { let compute_status = self.compute.get_status(); if matches!( compute_status, - ComputeStatus::Terminated | ComputeStatus::TerminationPending | ComputeStatus::Failed + ComputeStatus::Terminated + | ComputeStatus::TerminationPending { .. } + | ComputeStatus::Failed ) { info!( "compute is in {} status, stopping compute monitor", diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index aeabf4a519..21f55336aa 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -18,7 +18,7 @@ use clap::Parser; use compute_api::requests::ComputeClaimsScope; use compute_api::spec::ComputeMode; use control_plane::broker::StorageBroker; -use control_plane::endpoint::{ComputeControlPlane, PageserverProtocol}; +use control_plane::endpoint::{ComputeControlPlane, EndpointTerminateMode, PageserverProtocol}; use control_plane::endpoint_storage::{ENDPOINT_STORAGE_DEFAULT_ADDR, EndpointStorage}; use control_plane::local_env; use control_plane::local_env::{ @@ -711,10 +711,9 @@ struct EndpointStopCmdArgs { )] destroy: bool, - #[clap(long, help = "Postgres shutdown mode, passed to \"pg_ctl -m \"")] - #[arg(value_parser(["smart", "fast", "immediate"]))] - #[arg(default_value = "fast")] - mode: String, + #[clap(long, help = "Postgres shutdown mode")] + #[clap(default_value = "fast")] + mode: EndpointTerminateMode, } #[derive(clap::Args)] @@ -1658,7 +1657,10 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res .endpoints .get(endpoint_id) .with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?; - endpoint.stop(&args.mode, args.destroy)?; + match endpoint.stop(args.mode, args.destroy).await?.lsn { + Some(lsn) => println!("{lsn}"), + None => println!("null"), + } } EndpointCmd::GenerateJwt(args) => { let endpoint = { @@ -2090,11 +2092,16 @@ async fn handle_stop_all(args: &StopCmdArgs, env: &local_env::LocalEnv) -> Resul } async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) { + let mode = if immediate { + EndpointTerminateMode::Immediate + } else { + EndpointTerminateMode::Fast + }; // Stop all endpoints match ComputeControlPlane::load(env.clone()) { Ok(cplane) => { for (_k, node) in cplane.endpoints { - if let Err(e) = node.stop(if immediate { "immediate" } else { "fast" }, false) { + if let Err(e) = node.stop(mode, false).await { eprintln!("postgres stop failed: {e:#}"); } } diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index 2df71df57d..ae81e7abbe 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -52,7 +52,8 @@ use compute_api::requests::{ COMPUTE_AUDIENCE, ComputeClaims, ComputeClaimsScope, ConfigurationRequest, }; use compute_api::responses::{ - ComputeConfig, ComputeCtlConfig, ComputeStatus, ComputeStatusResponse, TlsConfig, + ComputeConfig, ComputeCtlConfig, ComputeStatus, ComputeStatusResponse, TerminateResponse, + TlsConfig, }; use compute_api::spec::{ Cluster, ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, Database, PgIdent, @@ -341,13 +342,33 @@ pub enum EndpointStatus { impl Display for EndpointStatus { fn fmt(&self, writer: &mut std::fmt::Formatter) -> std::fmt::Result { - let s = match self { + writer.write_str(match self { Self::Running => "running", Self::Stopped => "stopped", Self::Crashed => "crashed", Self::RunningNoPidfile => "running, no pidfile", - }; - write!(writer, "{}", s) + }) + } +} + +#[derive(Default, Clone, Copy, clap::ValueEnum)] +pub enum EndpointTerminateMode { + #[default] + /// Use pg_ctl stop -m fast + Fast, + /// Use pg_ctl stop -m immediate + Immediate, + /// Use /terminate?mode=immediate + ImmediateTerminate, +} + +impl std::fmt::Display for EndpointTerminateMode { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(match &self { + EndpointTerminateMode::Fast => "fast", + EndpointTerminateMode::Immediate => "immediate", + EndpointTerminateMode::ImmediateTerminate => "immediate-terminate", + }) } } @@ -918,7 +939,7 @@ impl Endpoint { ComputeStatus::Empty | ComputeStatus::ConfigurationPending | ComputeStatus::Configuration - | ComputeStatus::TerminationPending + | ComputeStatus::TerminationPending { .. } | ComputeStatus::Terminated => { bail!("unexpected compute status: {:?}", state.status) } @@ -1040,8 +1061,27 @@ impl Endpoint { } } - pub fn stop(&self, mode: &str, destroy: bool) -> Result<()> { - self.pg_ctl(&["-m", mode, "stop"], &None)?; + pub async fn stop( + &self, + mode: EndpointTerminateMode, + destroy: bool, + ) -> Result { + // pg_ctl stop is fast but doesn't allow us to collect LSN. /terminate is + // slow, and test runs time out. Solution: special mode "immediate-terminate" + // which uses /terminate + let response = if let EndpointTerminateMode::ImmediateTerminate = mode { + let ip = self.external_http_address.ip(); + let port = self.external_http_address.port(); + let url = format!("http://{ip}:{port}/terminate?mode=immediate"); + let token = self.generate_jwt(Some(ComputeClaimsScope::Admin))?; + let request = reqwest::Client::new().post(url).bearer_auth(token); + let response = request.send().await.context("/terminate")?; + let text = response.text().await.context("/terminate result")?; + serde_json::from_str(&text).with_context(|| format!("deserializing {text}"))? + } else { + self.pg_ctl(&["-m", &mode.to_string(), "stop"], &None)?; + TerminateResponse { lsn: None } + }; // Also wait for the compute_ctl process to die. It might have some // cleanup work to do after postgres stops, like syncing safekeepers, @@ -1051,7 +1091,7 @@ impl Endpoint { // waiting. Sometimes we do *not* want this cleanup: tests intentionally // do stop when majority of safekeepers is down, so sync-safekeepers // would hang otherwise. This could be a separate flag though. - let send_sigterm = destroy || mode == "immediate"; + let send_sigterm = destroy || !matches!(mode, EndpointTerminateMode::Fast); self.wait_for_compute_ctl_to_exit(send_sigterm)?; if destroy { println!( @@ -1060,7 +1100,7 @@ impl Endpoint { ); std::fs::remove_dir_all(self.endpoint_path())?; } - Ok(()) + Ok(response) } pub fn connstr(&self, user: &str, db_name: &str) -> String { diff --git a/libs/compute_api/src/responses.rs b/libs/compute_api/src/responses.rs index 24d371c6eb..5cad849e3d 100644 --- a/libs/compute_api/src/responses.rs +++ b/libs/compute_api/src/responses.rs @@ -83,6 +83,16 @@ pub struct ComputeStatusResponse { pub error: Option, } +#[derive(Serialize, Clone, Copy, Debug, Deserialize, PartialEq, Eq, Default)] +#[serde(rename_all = "snake_case")] +pub enum TerminateMode { + #[default] + /// wait 30s till returning from /terminate to allow control plane to get the error + Fast, + /// return from /terminate immediately as soon as all components are terminated + Immediate, +} + #[derive(Serialize, Clone, Copy, Debug, Deserialize, PartialEq, Eq)] #[serde(rename_all = "snake_case")] pub enum ComputeStatus { @@ -103,11 +113,16 @@ pub enum ComputeStatus { // control-plane to terminate it. Failed, // Termination requested - TerminationPending, + TerminationPending { mode: TerminateMode }, // Terminated Postgres Terminated, } +#[derive(Deserialize, Serialize)] +pub struct TerminateResponse { + pub lsn: Option, +} + impl Display for ComputeStatus { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { @@ -117,7 +132,7 @@ impl Display for ComputeStatus { ComputeStatus::Running => f.write_str("running"), ComputeStatus::Configuration => f.write_str("configuration"), ComputeStatus::Failed => f.write_str("failed"), - ComputeStatus::TerminationPending => f.write_str("termination-pending"), + ComputeStatus::TerminationPending { .. } => f.write_str("termination-pending"), ComputeStatus::Terminated => f.write_str("terminated"), } } diff --git a/libs/desim/src/executor.rs b/libs/desim/src/executor.rs index df8b071c06..51b11ff97e 100644 --- a/libs/desim/src/executor.rs +++ b/libs/desim/src/executor.rs @@ -419,13 +419,13 @@ pub fn now() -> u64 { with_thread_context(|ctx| ctx.clock.get().unwrap().now()) } -pub fn exit(code: i32, msg: String) { +pub fn exit(code: i32, msg: String) -> ! { with_thread_context(|ctx| { ctx.allow_panic.store(true, Ordering::SeqCst); let mut result = ctx.result.lock(); *result = (code, msg); panic!("exit"); - }); + }) } pub(crate) fn get_thread_ctx() -> Arc { diff --git a/libs/walproposer/src/api_bindings.rs b/libs/walproposer/src/api_bindings.rs index 4d6cbae9a9..b89f1877fd 100644 --- a/libs/walproposer/src/api_bindings.rs +++ b/libs/walproposer/src/api_bindings.rs @@ -311,7 +311,7 @@ extern "C" fn get_redo_start_lsn(wp: *mut WalProposer) -> XLogRecPtr { } } -extern "C-unwind" fn finish_sync_safekeepers(wp: *mut WalProposer, lsn: XLogRecPtr) { +unsafe extern "C-unwind" fn finish_sync_safekeepers(wp: *mut WalProposer, lsn: XLogRecPtr) -> ! { unsafe { let callback_data = (*(*wp).config).callback_data; let api = callback_data as *mut Box; diff --git a/libs/walproposer/src/walproposer.rs b/libs/walproposer/src/walproposer.rs index e95494297c..c853658ddf 100644 --- a/libs/walproposer/src/walproposer.rs +++ b/libs/walproposer/src/walproposer.rs @@ -144,7 +144,7 @@ pub trait ApiImpl { todo!() } - fn finish_sync_safekeepers(&self, _lsn: u64) { + fn finish_sync_safekeepers(&self, _lsn: u64) -> ! { todo!() } @@ -469,7 +469,7 @@ mod tests { true } - fn finish_sync_safekeepers(&self, lsn: u64) { + fn finish_sync_safekeepers(&self, lsn: u64) -> ! { self.sync_channel.send(lsn).unwrap(); panic!("sync safekeepers finished at lsn={}", lsn); } diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h index 08087e5a55..4b223b6b18 100644 --- a/pgxn/neon/walproposer.h +++ b/pgxn/neon/walproposer.h @@ -679,8 +679,7 @@ typedef struct walproposer_api * Finish sync safekeepers with the given LSN. This function should not * return and should exit the program. */ - void (*finish_sync_safekeepers) (WalProposer *wp, XLogRecPtr lsn); - + void (*finish_sync_safekeepers) (WalProposer *wp, XLogRecPtr lsn) __attribute__((noreturn)) ; /* * Called after every AppendResponse from the safekeeper. Used to * propagate backpressure feedback and to confirm WAL persistence (has diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c index 3d6a92ad79..185fc83ace 100644 --- a/pgxn/neon/walproposer_pg.c +++ b/pgxn/neon/walproposer_pg.c @@ -1890,7 +1890,7 @@ walprop_pg_wait_event_set(WalProposer *wp, long timeout, Safekeeper **sk, uint32 return rc; } -static void +static void __attribute__((noreturn)) walprop_pg_finish_sync_safekeepers(WalProposer *wp, XLogRecPtr lsn) { fprintf(stdout, "%X/%X\n", LSN_FORMAT_ARGS(lsn)); diff --git a/safekeeper/tests/walproposer_sim/walproposer_api.rs b/safekeeper/tests/walproposer_sim/walproposer_api.rs index 82e7a32881..c2604c4bdc 100644 --- a/safekeeper/tests/walproposer_sim/walproposer_api.rs +++ b/safekeeper/tests/walproposer_sim/walproposer_api.rs @@ -499,7 +499,7 @@ impl ApiImpl for SimulationApi { true } - fn finish_sync_safekeepers(&self, lsn: u64) { + fn finish_sync_safekeepers(&self, lsn: u64) -> ! { debug!("finish_sync_safekeepers, lsn={}", lsn); executor::exit(0, Lsn(lsn).to_string()); } diff --git a/test_runner/fixtures/neon_cli.py b/test_runner/fixtures/neon_cli.py index 48a1a36e66..e177145294 100644 --- a/test_runner/fixtures/neon_cli.py +++ b/test_runner/fixtures/neon_cli.py @@ -620,7 +620,7 @@ class NeonLocalCli(AbstractNeonCli): destroy=False, check_return_code=True, mode: str | None = None, - ) -> subprocess.CompletedProcess[str]: + ) -> tuple[Lsn | None, subprocess.CompletedProcess[str]]: args = [ "endpoint", "stop", @@ -632,7 +632,11 @@ class NeonLocalCli(AbstractNeonCli): if endpoint_id is not None: args.append(endpoint_id) - return self.raw_cli(args, check_return_code=check_return_code) + proc = self.raw_cli(args, check_return_code=check_return_code) + log.debug(f"endpoint stop stdout: {proc.stdout}") + lsn_str = proc.stdout.split()[-1] + lsn: Lsn | None = None if lsn_str == "null" else Lsn(lsn_str) + return lsn, proc def mappings_map_branch( self, name: str, tenant_id: TenantId, timeline_id: TimelineId diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index df34573b12..970175a631 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -4192,6 +4192,8 @@ class Endpoint(PgProtocol, LogUtils): self._running = threading.Semaphore(0) self.__jwt: str | None = None + self.terminate_flush_lsn: Lsn | None = None + def http_client(self, retries: Retry | None = None) -> EndpointHttpClient: assert self.__jwt is not None return EndpointHttpClient( @@ -4494,9 +4496,10 @@ class Endpoint(PgProtocol, LogUtils): running = self._running.acquire(blocking=False) if running: assert self.endpoint_id is not None - self.env.neon_cli.endpoint_stop( + lsn, _ = self.env.neon_cli.endpoint_stop( self.endpoint_id, check_return_code=self.check_stop_result, mode=mode ) + self.terminate_flush_lsn = lsn if sks_wait_walreceiver_gone is not None: for sk in sks_wait_walreceiver_gone[0]: @@ -4514,9 +4517,10 @@ class Endpoint(PgProtocol, LogUtils): running = self._running.acquire(blocking=False) if running: assert self.endpoint_id is not None - self.env.neon_cli.endpoint_stop( + lsn, _ = self.env.neon_cli.endpoint_stop( self.endpoint_id, True, check_return_code=self.check_stop_result, mode=mode ) + self.terminate_flush_lsn = lsn self.endpoint_id = None return self diff --git a/test_runner/regress/test_replica_promotes.py b/test_runner/regress/test_replica_promotes.py index e378d37635..4486901bae 100644 --- a/test_runner/regress/test_replica_promotes.py +++ b/test_runner/regress/test_replica_promotes.py @@ -4,13 +4,25 @@ File with secondary->primary promotion testing. This far, only contains a test that we don't break and that the data is persisted. """ +from typing import cast + import psycopg2 +from fixtures.common_types import Lsn from fixtures.log_helper import log from fixtures.neon_fixtures import Endpoint, NeonEnv, wait_replica_caughtup from fixtures.pg_version import PgVersion from pytest import raises +def stop_and_check_lsn(ep: Endpoint, expected_lsn: Lsn | None): + ep.stop(mode="immediate-terminate") + lsn = ep.terminate_flush_lsn + if expected_lsn is not None: + assert lsn >= expected_lsn, f"{expected_lsn=} < {lsn=}" + else: + assert lsn == expected_lsn, f"{expected_lsn=} != {lsn=}" + + def test_replica_promotes(neon_simple_env: NeonEnv, pg_version: PgVersion): """ Test that a replica safely promotes, and can commit data updates which @@ -37,7 +49,9 @@ def test_replica_promotes(neon_simple_env: NeonEnv, pg_version: PgVersion): pg_current_wal_flush_lsn() """ ) - log.info(f"Primary: Current LSN after workload is {primary_cur.fetchone()}") + lsn_triple = cast("tuple[str, str, str]", primary_cur.fetchone()) + log.info(f"Primary: Current LSN after workload is {lsn_triple}") + expected_primary_lsn: Lsn = Lsn(lsn_triple[2]) primary_cur.execute("show neon.safekeepers") safekeepers = primary_cur.fetchall()[0][0] @@ -57,7 +71,7 @@ def test_replica_promotes(neon_simple_env: NeonEnv, pg_version: PgVersion): secondary_cur.execute("select count(*) from t") assert secondary_cur.fetchone() == (100,) - primary.stop_and_destroy(mode="immediate") + stop_and_check_lsn(primary, expected_primary_lsn) # Reconnect to the secondary to make sure we get a read-write connection promo_conn = secondary.connect() @@ -109,9 +123,10 @@ def test_replica_promotes(neon_simple_env: NeonEnv, pg_version: PgVersion): # wait_for_last_flush_lsn(env, secondary, env.initial_tenant, env.initial_timeline) - secondary.stop_and_destroy() + # secondaries don't sync safekeepers on finish so LSN will be None + stop_and_check_lsn(secondary, None) - primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary") + primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary2") with primary.connect() as new_primary: new_primary_cur = new_primary.cursor() @@ -122,7 +137,9 @@ def test_replica_promotes(neon_simple_env: NeonEnv, pg_version: PgVersion): pg_current_wal_flush_lsn() """ ) - log.info(f"New primary: Boot LSN is {new_primary_cur.fetchone()}") + lsn_triple = cast("tuple[str, str, str]", new_primary_cur.fetchone()) + expected_primary_lsn = Lsn(lsn_triple[2]) + log.info(f"New primary: Boot LSN is {lsn_triple}") new_primary_cur.execute("select count(*) from t") assert new_primary_cur.fetchone() == (200,) @@ -130,4 +147,4 @@ def test_replica_promotes(neon_simple_env: NeonEnv, pg_version: PgVersion): new_primary_cur.execute("select count(*) from t") assert new_primary_cur.fetchone() == (300,) - primary.stop(mode="immediate") + stop_and_check_lsn(primary, expected_primary_lsn)