mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-07 13:32:57 +00:00
compute_ctl: return LSN in /terminate (#12240)
- Add optional `?mode=fast|immediate` to `/terminate`, `fast` is default. Immediate avoids waiting 30 seconds before returning from `terminate`. - Add `TerminateMode` to `ComputeStatus::TerminationPending` - Use `/terminate?mode=immediate` in `neon_local` instead of `pg_ctl stop` for `test_replica_promotes`. - Change `test_replica_promotes` to check returned LSN - Annotate `finish_sync_safekeepers` as `noreturn`. https://github.com/neondatabase/cloud/issues/29807
This commit is contained in:
@@ -620,7 +620,7 @@ class NeonLocalCli(AbstractNeonCli):
|
||||
destroy=False,
|
||||
check_return_code=True,
|
||||
mode: str | None = None,
|
||||
) -> subprocess.CompletedProcess[str]:
|
||||
) -> tuple[Lsn | None, subprocess.CompletedProcess[str]]:
|
||||
args = [
|
||||
"endpoint",
|
||||
"stop",
|
||||
@@ -632,7 +632,11 @@ class NeonLocalCli(AbstractNeonCli):
|
||||
if endpoint_id is not None:
|
||||
args.append(endpoint_id)
|
||||
|
||||
return self.raw_cli(args, check_return_code=check_return_code)
|
||||
proc = self.raw_cli(args, check_return_code=check_return_code)
|
||||
log.debug(f"endpoint stop stdout: {proc.stdout}")
|
||||
lsn_str = proc.stdout.split()[-1]
|
||||
lsn: Lsn | None = None if lsn_str == "null" else Lsn(lsn_str)
|
||||
return lsn, proc
|
||||
|
||||
def mappings_map_branch(
|
||||
self, name: str, tenant_id: TenantId, timeline_id: TimelineId
|
||||
|
||||
@@ -4192,6 +4192,8 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
self._running = threading.Semaphore(0)
|
||||
self.__jwt: str | None = None
|
||||
|
||||
self.terminate_flush_lsn: Lsn | None = None
|
||||
|
||||
def http_client(self, retries: Retry | None = None) -> EndpointHttpClient:
|
||||
assert self.__jwt is not None
|
||||
return EndpointHttpClient(
|
||||
@@ -4494,9 +4496,10 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
running = self._running.acquire(blocking=False)
|
||||
if running:
|
||||
assert self.endpoint_id is not None
|
||||
self.env.neon_cli.endpoint_stop(
|
||||
lsn, _ = self.env.neon_cli.endpoint_stop(
|
||||
self.endpoint_id, check_return_code=self.check_stop_result, mode=mode
|
||||
)
|
||||
self.terminate_flush_lsn = lsn
|
||||
|
||||
if sks_wait_walreceiver_gone is not None:
|
||||
for sk in sks_wait_walreceiver_gone[0]:
|
||||
@@ -4514,9 +4517,10 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
running = self._running.acquire(blocking=False)
|
||||
if running:
|
||||
assert self.endpoint_id is not None
|
||||
self.env.neon_cli.endpoint_stop(
|
||||
lsn, _ = self.env.neon_cli.endpoint_stop(
|
||||
self.endpoint_id, True, check_return_code=self.check_stop_result, mode=mode
|
||||
)
|
||||
self.terminate_flush_lsn = lsn
|
||||
self.endpoint_id = None
|
||||
|
||||
return self
|
||||
|
||||
@@ -4,13 +4,25 @@ File with secondary->primary promotion testing.
|
||||
This far, only contains a test that we don't break and that the data is persisted.
|
||||
"""
|
||||
|
||||
from typing import cast
|
||||
|
||||
import psycopg2
|
||||
from fixtures.common_types import Lsn
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import Endpoint, NeonEnv, wait_replica_caughtup
|
||||
from fixtures.pg_version import PgVersion
|
||||
from pytest import raises
|
||||
|
||||
|
||||
def stop_and_check_lsn(ep: Endpoint, expected_lsn: Lsn | None):
|
||||
ep.stop(mode="immediate-terminate")
|
||||
lsn = ep.terminate_flush_lsn
|
||||
if expected_lsn is not None:
|
||||
assert lsn >= expected_lsn, f"{expected_lsn=} < {lsn=}"
|
||||
else:
|
||||
assert lsn == expected_lsn, f"{expected_lsn=} != {lsn=}"
|
||||
|
||||
|
||||
def test_replica_promotes(neon_simple_env: NeonEnv, pg_version: PgVersion):
|
||||
"""
|
||||
Test that a replica safely promotes, and can commit data updates which
|
||||
@@ -37,7 +49,9 @@ def test_replica_promotes(neon_simple_env: NeonEnv, pg_version: PgVersion):
|
||||
pg_current_wal_flush_lsn()
|
||||
"""
|
||||
)
|
||||
log.info(f"Primary: Current LSN after workload is {primary_cur.fetchone()}")
|
||||
lsn_triple = cast("tuple[str, str, str]", primary_cur.fetchone())
|
||||
log.info(f"Primary: Current LSN after workload is {lsn_triple}")
|
||||
expected_primary_lsn: Lsn = Lsn(lsn_triple[2])
|
||||
primary_cur.execute("show neon.safekeepers")
|
||||
safekeepers = primary_cur.fetchall()[0][0]
|
||||
|
||||
@@ -57,7 +71,7 @@ def test_replica_promotes(neon_simple_env: NeonEnv, pg_version: PgVersion):
|
||||
secondary_cur.execute("select count(*) from t")
|
||||
assert secondary_cur.fetchone() == (100,)
|
||||
|
||||
primary.stop_and_destroy(mode="immediate")
|
||||
stop_and_check_lsn(primary, expected_primary_lsn)
|
||||
|
||||
# Reconnect to the secondary to make sure we get a read-write connection
|
||||
promo_conn = secondary.connect()
|
||||
@@ -109,9 +123,10 @@ def test_replica_promotes(neon_simple_env: NeonEnv, pg_version: PgVersion):
|
||||
|
||||
# wait_for_last_flush_lsn(env, secondary, env.initial_tenant, env.initial_timeline)
|
||||
|
||||
secondary.stop_and_destroy()
|
||||
# secondaries don't sync safekeepers on finish so LSN will be None
|
||||
stop_and_check_lsn(secondary, None)
|
||||
|
||||
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
|
||||
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary2")
|
||||
|
||||
with primary.connect() as new_primary:
|
||||
new_primary_cur = new_primary.cursor()
|
||||
@@ -122,7 +137,9 @@ def test_replica_promotes(neon_simple_env: NeonEnv, pg_version: PgVersion):
|
||||
pg_current_wal_flush_lsn()
|
||||
"""
|
||||
)
|
||||
log.info(f"New primary: Boot LSN is {new_primary_cur.fetchone()}")
|
||||
lsn_triple = cast("tuple[str, str, str]", new_primary_cur.fetchone())
|
||||
expected_primary_lsn = Lsn(lsn_triple[2])
|
||||
log.info(f"New primary: Boot LSN is {lsn_triple}")
|
||||
|
||||
new_primary_cur.execute("select count(*) from t")
|
||||
assert new_primary_cur.fetchone() == (200,)
|
||||
@@ -130,4 +147,4 @@ def test_replica_promotes(neon_simple_env: NeonEnv, pg_version: PgVersion):
|
||||
new_primary_cur.execute("select count(*) from t")
|
||||
assert new_primary_cur.fetchone() == (300,)
|
||||
|
||||
primary.stop(mode="immediate")
|
||||
stop_and_check_lsn(primary, expected_primary_lsn)
|
||||
|
||||
Reference in New Issue
Block a user