Keep walproposer alive until shutdown checkpoint is safe on safekeepers

The walproposer pretends to be a walsender in many ways: it has a
WalSnd slot, it claims to be a walsender by calling
MarkPostmasterChildWalSender(), and so on. But one difference from real
walsenders was that the postmaster still treated it as a bgworker
rather than a walsender. The distinction matters at shutdown:
walsenders are not killed until the very end, after the checkpointer
process has written the shutdown checkpoint and exited.

As a result, the walproposer always got killed before the shutdown
checkpoint was written, so the shutdown checkpoint never made it to
safekeepers. That's fine in principle; we don't require a clean
shutdown, after all. But it also feels a bit silly not to stream the
shutdown checkpoint. It could be useful for initializing hot standby
mode in a read replica, for example.

Change postmaster to treat background workers that have called
MarkPostmasterChildWalSender() as walsenders. That unfortunately
requires another small change in postgres core.
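
Roughly, the change extends the postmaster's child-signalling loop, which
already reclassifies normal backends as walsenders once they have called
MarkPostmasterChildWalSender(), to do the same for background workers. A
sketch of the idea only (the actual patch lives in the postgres fork
referenced by the revision bump at the end of this diff):

    /* postmaster.c, in the loop over BackendList that signals children */
    if ((bp->bkend_type == BACKEND_TYPE_NORMAL ||
         bp->bkend_type == BACKEND_TYPE_BGWORKER) &&   /* new: bgworkers too */
        IsPostmasterChildWalSender(bp->child_slot))
        bp->bkend_type = BACKEND_TYPE_WALSND;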

After doing that, walproposers stay alive longer. However, it also
means that when the checkpointer sends the PROCSIG_WALSND_INIT_STOPPING
signal, it will wait for the walproposer to switch to the
WALSNDSTATE_STOPPING state. We don't have the machinery in the
walproposer to receive and handle that signal reliably. Instead, we
always mark the walproposer as being in WALSNDSTATE_STOPPING.
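
For reference, the handshake being sidestepped looks roughly like this on
the checkpointer side (paraphrased from PostgreSQL's ShutdownXLOG() and
walsender.c; not part of this change):

    /* checkpointer, just before writing the shutdown checkpoint */
    WalSndInitStopping();   /* send PROCSIG_WALSND_INIT_STOPPING to walsenders */
    WalSndWaitStopping();   /* wait until every walsender is in WALSNDSTATE_STOPPING */
    /* ... write the shutdown checkpoint ... */

Since the walproposer's WalSnd slot always reports WALSNDSTATE_STOPPING,
WalSndWaitStopping() is satisfied without us having to handle the signal.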

In commit 568f91420a, I assumed that shutdown would wait for all the
remaining WAL to be streamed to safekeepers, but before this commit
that was not true, and the test became flaky. This should make it
stable again.

Some tests wrongly assumed that no WAL could be written between a call to
pg_current_wal_flush_lsn and a quick Postgres stop right after it. Fix them
by introducing flush_ep_to_pageserver, which first stops the endpoint and
then waits until all committed WAL reaches the pageserver.

In passing, extract the safekeeper HTTP client into its own module.
Author: Heikki Linnakangas, 2024-02-09 22:01:20 +02:00
Committed by: Arseny Sher
parent 0cf0731d8b
commit 74d09b78c7
18 changed files with 460 additions and 281 deletions


@@ -324,11 +324,11 @@ extern "C" fn finish_sync_safekeepers(wp: *mut WalProposer, lsn: XLogRecPtr) {
}
}
extern "C" fn process_safekeeper_feedback(wp: *mut WalProposer, commit_lsn: XLogRecPtr) {
extern "C" fn process_safekeeper_feedback(wp: *mut WalProposer) {
unsafe {
let callback_data = (*(*wp).config).callback_data;
let api = callback_data as *mut Box<dyn ApiImpl>;
(*api).process_safekeeper_feedback(&mut (*wp), commit_lsn)
(*api).process_safekeeper_feedback(&mut (*wp))
}
}


@@ -142,7 +142,7 @@ pub trait ApiImpl {
todo!()
}
fn process_safekeeper_feedback(&self, _wp: &mut WalProposer, _commit_lsn: u64) {
fn process_safekeeper_feedback(&mut self, _wp: &mut WalProposer) {
todo!()
}


@@ -1220,7 +1220,7 @@ PrepareAppendRequest(WalProposer *wp, AppendRequestHeader *req, XLogRecPtr begin
req->epochStartLsn = wp->propEpochStartLsn;
req->beginLsn = beginLsn;
req->endLsn = endLsn;
req->commitLsn = GetAcknowledgedByQuorumWALPosition(wp);
req->commitLsn = wp->commitLsn;
req->truncateLsn = wp->truncateLsn;
req->proposerId = wp->greetRequest.proposerId;
}
@@ -1405,7 +1405,7 @@ static bool
RecvAppendResponses(Safekeeper *sk)
{
WalProposer *wp = sk->wp;
XLogRecPtr minQuorumLsn;
XLogRecPtr newCommitLsn;
bool readAnything = false;
while (true)
@@ -1444,18 +1444,19 @@ RecvAppendResponses(Safekeeper *sk)
if (!readAnything)
return sk->state == SS_ACTIVE;
HandleSafekeeperResponse(wp);
/* update commit_lsn */
newCommitLsn = GetAcknowledgedByQuorumWALPosition(wp);
/*
* Also send the new commit lsn to all the safekeepers.
* Send the new value to all safekeepers.
*/
minQuorumLsn = GetAcknowledgedByQuorumWALPosition(wp);
if (minQuorumLsn > wp->lastSentCommitLsn)
if (newCommitLsn > wp->commitLsn)
{
wp->commitLsn = newCommitLsn;
BroadcastAppendRequest(wp);
wp->lastSentCommitLsn = minQuorumLsn;
}
HandleSafekeeperResponse(wp);
return sk->state == SS_ACTIVE;
}
@@ -1632,11 +1633,9 @@ GetDonor(WalProposer *wp, XLogRecPtr *donor_lsn)
static void
HandleSafekeeperResponse(WalProposer *wp)
{
XLogRecPtr minQuorumLsn;
XLogRecPtr candidateTruncateLsn;
minQuorumLsn = GetAcknowledgedByQuorumWALPosition(wp);
wp->api.process_safekeeper_feedback(wp, minQuorumLsn);
wp->api.process_safekeeper_feedback(wp);
/*
* Try to advance truncateLsn -- the last record flushed to all
@@ -1649,7 +1648,7 @@ HandleSafekeeperResponse(WalProposer *wp)
* can't commit entries from previous term' in Raft); 2)
*/
candidateTruncateLsn = CalculateMinFlushLsn(wp);
candidateTruncateLsn = Min(candidateTruncateLsn, minQuorumLsn);
candidateTruncateLsn = Min(candidateTruncateLsn, wp->commitLsn);
if (candidateTruncateLsn > wp->truncateLsn)
{
wp->truncateLsn = candidateTruncateLsn;


@@ -564,7 +564,7 @@ typedef struct walproposer_api
* backpressure feedback and to confirm WAL persistence (has been committed
* on the quorum of safekeepers).
*/
void (*process_safekeeper_feedback) (WalProposer *wp, XLogRecPtr commitLsn);
void (*process_safekeeper_feedback) (WalProposer *wp);
/*
* Write a log message to the internal log processor. This is used only
@@ -646,8 +646,8 @@ typedef struct WalProposer
/* WAL has been generated up to this point */
XLogRecPtr availableLsn;
/* last commitLsn broadcasted to safekeepers */
XLogRecPtr lastSentCommitLsn;
/* cached GetAcknowledgedByQuorumWALPosition result */
XLogRecPtr commitLsn;
ProposerGreeting greetRequest;


@@ -68,6 +68,8 @@ static WalproposerShmemState *walprop_shared;
static WalProposerConfig walprop_config;
static XLogRecPtr sentPtr = InvalidXLogRecPtr;
static const walproposer_api walprop_pg;
static volatile sig_atomic_t got_SIGUSR2 = false;
static bool reported_sigusr2 = false;
static void nwp_shmem_startup_hook(void);
static void nwp_register_gucs(void);
@@ -101,6 +103,8 @@ static void add_nwr_event_set(Safekeeper *sk, uint32 events);
static void update_nwr_event_set(Safekeeper *sk, uint32 events);
static void rm_safekeeper_event_set(Safekeeper *to_remove, bool is_sk);
static void CheckGracefulShutdown(WalProposer *wp);
static XLogRecPtr GetLogRepRestartLSN(WalProposer *wp);
static void
@@ -492,6 +496,24 @@ walprop_pg_init_standalone_sync_safekeepers(void)
BackgroundWorkerUnblockSignals();
}
/*
* We pretend to be a walsender process, and the lifecycle of a walsender is
* slightly different from that of other processes. At shutdown, walsender
* processes stay alive until the very end, after the checkpointer has written
* the shutdown checkpoint. When the checkpointer exits, the postmaster sends
* all remaining walsender processes SIGUSR2. On receiving SIGUSR2, we try to
* send the remaining WAL, and then exit. This ensures that the checkpoint
* record reaches durable storage (in safekeepers) before the server shuts
* down completely.
*/
static void
walprop_sigusr2(SIGNAL_ARGS)
{
got_SIGUSR2 = true;
SetLatch(MyLatch);
}
static void
walprop_pg_init_bgworker(void)
{
@@ -503,6 +525,7 @@ walprop_pg_init_bgworker(void)
pqsignal(SIGUSR1, procsignal_sigusr1_handler);
pqsignal(SIGHUP, SignalHandlerForConfigReload);
pqsignal(SIGTERM, die);
pqsignal(SIGUSR2, walprop_sigusr2);
BackgroundWorkerUnblockSignals();
@@ -1075,14 +1098,26 @@ StartProposerReplication(WalProposer *wp, StartReplicationCmd *cmd)
#endif
/*
* When we first start replication the standby will be behind the primary.
* For some applications, for example synchronous replication, it is
* important to have a clear state for this initial catchup mode, so we
* can trigger actions when we change streaming state later. We may stay
* in this state for a long time, which is exactly why we want to be able
* to monitor whether or not we are still here.
* XXX: Move straight to STOPPING state, skipping the STREAMING state.
*
* This is a bit weird. Normal walsenders stay in STREAMING state, until
* the checkpointer signals them that it is about to start writing the
* shutdown checkpoint. The walsenders acknowledge that they have received
* that signal by switching to STOPPING state. That tells the walsenders
* that they must not write any new WAL.
*
* However, we cannot easily intercept that signal from the checkpointer.
* It's sent by WalSndInitStopping(), using
* SendProcSignal(PROCSIGNAL_WALSND_INIT_STOPPING). It's received by
* HandleWalSndInitStopping, which sets a process-local got_STOPPING flag.
* However, that's all private to walsender.c.
*
* We don't need to do anything special upon receiving the signal, the
* walproposer doesn't write any WAL anyway, so we skip the STREAMING
* state and go directly to STOPPING mode. That way, the checkpointer
* won't wait for us.
*/
WalSndSetState(WALSNDSTATE_CATCHUP);
WalSndSetState(WALSNDSTATE_STOPPING);
/*
* Don't allow a request to stream from a future point in WAL that hasn't
@@ -1122,6 +1157,8 @@ StartProposerReplication(WalProposer *wp, StartReplicationCmd *cmd)
static void
WalSndLoop(WalProposer *wp)
{
XLogRecPtr flushPtr;
/* Clear any already-pending wakeups */
ResetLatch(MyLatch);
@@ -1130,9 +1167,6 @@ WalSndLoop(WalProposer *wp)
CHECK_FOR_INTERRUPTS();
XLogBroadcastWalProposer(wp);
if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
WalSndSetState(WALSNDSTATE_STREAMING);
WalProposerPoll(wp);
}
}
@@ -1744,6 +1778,9 @@ walprop_pg_wait_event_set(WalProposer *wp, long timeout, Safekeeper **sk, uint32
{
ConditionVariableCancelSleep();
ResetLatch(MyLatch);
CheckGracefulShutdown(wp);
*events = WL_LATCH_SET;
return 1;
}
@@ -1797,6 +1834,41 @@ walprop_pg_finish_sync_safekeepers(WalProposer *wp, XLogRecPtr lsn)
exit(0);
}
/*
* Like a vanilla walsender, on SIGUSR2 send all remaining WAL and exit.
*
* Note that unlike sync-safekeepers, waiting here is not reliable: we
* don't check that a majority of safekeepers received and persisted
* commit_lsn -- only that the walproposer reached it (which immediately
* broadcasts the new value). Doing that without incurring redundant control
* file syncing would need a wp -> sk protocol change. OTOH, unlike
* sync-safekeepers, which must bump commit_lsn or basebackup will fail,
* this catchup matters only for tests where safekeepers/network don't
* crash on their own.
*/
static void
CheckGracefulShutdown(WalProposer *wp)
{
if (got_SIGUSR2)
{
if (!reported_sigusr2)
{
XLogRecPtr flushPtr = walprop_pg_get_flush_rec_ptr(wp);
wpg_log(LOG, "walproposer will send and wait for remaining WAL between %X/%X and %X/%X",
LSN_FORMAT_ARGS(wp->commitLsn), LSN_FORMAT_ARGS(flushPtr));
reported_sigusr2 = true;
}
if (wp->commitLsn >= walprop_pg_get_flush_rec_ptr(wp))
{
wpg_log(LOG, "walproposer sent all WAL up to %X/%X, exiting",
LSN_FORMAT_ARGS(wp->commitLsn));
proc_exit(0);
}
}
}
/*
* Choose most advanced PageserverFeedback and set it to *rf.
*/
@@ -1877,7 +1949,7 @@ CombineHotStanbyFeedbacks(HotStandbyFeedback *hs, WalProposer *wp)
* None of that is functional in sync-safekeepers.
*/
static void
walprop_pg_process_safekeeper_feedback(WalProposer *wp, XLogRecPtr commitLsn)
walprop_pg_process_safekeeper_feedback(WalProposer *wp)
{
HotStandbyFeedback hsFeedback;
XLogRecPtr oldDiskConsistentLsn;
@@ -1892,10 +1964,10 @@ walprop_pg_process_safekeeper_feedback(WalProposer *wp, XLogRecPtr commitLsn)
replication_feedback_set(&quorumFeedback.rf);
SetZenithCurrentClusterSize(quorumFeedback.rf.currentClusterSize);
if (commitLsn > quorumFeedback.flushLsn || oldDiskConsistentLsn != quorumFeedback.rf.disk_consistent_lsn)
if (wp->commitLsn > quorumFeedback.flushLsn || oldDiskConsistentLsn != quorumFeedback.rf.disk_consistent_lsn)
{
if (commitLsn > quorumFeedback.flushLsn)
quorumFeedback.flushLsn = commitLsn;
if (wp->commitLsn > quorumFeedback.flushLsn)
quorumFeedback.flushLsn = wp->commitLsn;
/*
* Advance the replication slot to commitLsn. WAL before it is
@@ -1928,6 +2000,8 @@ walprop_pg_process_safekeeper_feedback(WalProposer *wp, XLogRecPtr commitLsn)
XidFromFullTransactionId(hsFeedback.catalog_xmin),
EpochFromFullTransactionId(hsFeedback.catalog_xmin));
}
CheckGracefulShutdown(wp);
}
static XLogRecPtr


@@ -196,6 +196,7 @@ pub struct SimulationApi {
safekeepers: RefCell<Vec<SafekeeperConn>>,
disk: Arc<DiskWalProposer>,
redo_start_lsn: Option<Lsn>,
last_logged_commit_lsn: u64,
shmem: UnsafeCell<walproposer::bindings::WalproposerShmemState>,
config: Config,
event_set: RefCell<Option<EventSet>>,
@@ -228,6 +229,7 @@ impl SimulationApi {
safekeepers: RefCell::new(sk_conns),
disk: args.disk,
redo_start_lsn: args.redo_start_lsn,
last_logged_commit_lsn: 0,
shmem: UnsafeCell::new(walproposer::bindings::WalproposerShmemState {
mutex: 0,
feedback: PageserverFeedback {
@@ -596,14 +598,11 @@ impl ApiImpl for SimulationApi {
}
}
fn process_safekeeper_feedback(
&self,
wp: &mut walproposer::bindings::WalProposer,
commit_lsn: u64,
) {
debug!("process_safekeeper_feedback, commit_lsn={}", commit_lsn);
if commit_lsn > wp.lastSentCommitLsn {
self.os.log_event(format!("commit_lsn;{}", commit_lsn));
fn process_safekeeper_feedback(&mut self, wp: &mut walproposer::bindings::WalProposer) {
debug!("process_safekeeper_feedback, commit_lsn={}", wp.commitLsn);
if wp.commitLsn > self.last_logged_commit_lsn {
self.os.log_event(format!("commit_lsn;{}", wp.commitLsn));
self.last_logged_commit_lsn = wp.commitLsn;
}
}


@@ -15,11 +15,11 @@ import threading
import time
import uuid
from contextlib import closing, contextmanager
from dataclasses import dataclass, field
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from fcntl import LOCK_EX, LOCK_UN, flock
from functools import cached_property
from functools import cached_property, partial
from itertools import chain, product
from pathlib import Path
from types import TracebackType
@@ -70,6 +70,8 @@ from fixtures.remote_storage import (
default_remote_storage,
remote_storage_to_toml_inline_table,
)
from fixtures.safekeeper.http import SafekeeperHttpClient
from fixtures.safekeeper.utils import are_walreceivers_absent
from fixtures.types import Lsn, TenantId, TenantShardId, TimelineId
from fixtures.utils import (
ATTACHMENT_NAME_REGEX,
@@ -2547,6 +2549,20 @@ class PgBin:
)
return base_path
def get_pg_controldata_checkpoint_lsn(self, pgdata: str) -> Lsn:
"""
Run pg_controldata on the given datadir and extract the checkpoint LSN.
"""
pg_controldata_path = os.path.join(self.pg_bin_path, "pg_controldata")
cmd = f"{pg_controldata_path} -D {pgdata}"
result = subprocess.run(cmd, capture_output=True, text=True, shell=True)
checkpoint_lsn = re.findall(
"Latest checkpoint location:\\s+([0-9A-F]+/[0-9A-F]+)", result.stdout
)[0]
log.info(f"last checkpoint at {checkpoint_lsn}")
return Lsn(checkpoint_lsn)
@pytest.fixture(scope="function")
def pg_bin(test_output_dir: Path, pg_distrib_dir: Path, pg_version: PgVersion) -> PgBin:
@@ -3565,220 +3581,6 @@ class Safekeeper:
return segments
# Walreceiver as returned by sk's timeline status endpoint.
@dataclass
class Walreceiver:
conn_id: int
state: str
@dataclass
class SafekeeperTimelineStatus:
acceptor_epoch: int
pg_version: int # Not exactly a PgVersion, safekeeper returns version as int, for example 150002 for 15.2
flush_lsn: Lsn
commit_lsn: Lsn
timeline_start_lsn: Lsn
backup_lsn: Lsn
peer_horizon_lsn: Lsn
remote_consistent_lsn: Lsn
walreceivers: List[Walreceiver]
@dataclass
class SafekeeperMetrics:
# These are metrics from Prometheus which uses float64 internally.
# As a consequence, values may differ from real original int64s.
flush_lsn_inexact: Dict[Tuple[TenantId, TimelineId], int] = field(default_factory=dict)
commit_lsn_inexact: Dict[Tuple[TenantId, TimelineId], int] = field(default_factory=dict)
class SafekeeperHttpClient(requests.Session):
HTTPError = requests.HTTPError
def __init__(self, port: int, auth_token: Optional[str] = None, is_testing_enabled=False):
super().__init__()
self.port = port
self.auth_token = auth_token
self.is_testing_enabled = is_testing_enabled
if auth_token is not None:
self.headers["Authorization"] = f"Bearer {auth_token}"
def check_status(self):
self.get(f"http://localhost:{self.port}/v1/status").raise_for_status()
def is_testing_enabled_or_skip(self):
if not self.is_testing_enabled:
pytest.skip("safekeeper was built without 'testing' feature")
def configure_failpoints(self, config_strings: Tuple[str, str] | List[Tuple[str, str]]):
self.is_testing_enabled_or_skip()
if isinstance(config_strings, tuple):
pairs = [config_strings]
else:
pairs = config_strings
log.info(f"Requesting config failpoints: {repr(pairs)}")
res = self.put(
f"http://localhost:{self.port}/v1/failpoints",
json=[{"name": name, "actions": actions} for name, actions in pairs],
)
log.info(f"Got failpoints request response code {res.status_code}")
res.raise_for_status()
res_json = res.json()
assert res_json is None
return res_json
def debug_dump(self, params: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
params = params or {}
res = self.get(f"http://localhost:{self.port}/v1/debug_dump", params=params)
res.raise_for_status()
res_json = json.loads(res.text)
assert isinstance(res_json, dict)
return res_json
def patch_control_file(
self,
tenant_id: TenantId,
timeline_id: TimelineId,
patch: Dict[str, Any],
) -> Dict[str, Any]:
res = self.patch(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/control_file",
json={
"updates": patch,
"apply_fields": list(patch.keys()),
},
)
res.raise_for_status()
res_json = res.json()
assert isinstance(res_json, dict)
return res_json
def pull_timeline(self, body: Dict[str, Any]) -> Dict[str, Any]:
res = self.post(f"http://localhost:{self.port}/v1/pull_timeline", json=body)
res.raise_for_status()
res_json = res.json()
assert isinstance(res_json, dict)
return res_json
def copy_timeline(self, tenant_id: TenantId, timeline_id: TimelineId, body: Dict[str, Any]):
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/copy",
json=body,
)
res.raise_for_status()
def timeline_digest(
self, tenant_id: TenantId, timeline_id: TimelineId, from_lsn: Lsn, until_lsn: Lsn
) -> Dict[str, Any]:
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/digest",
params={
"from_lsn": str(from_lsn),
"until_lsn": str(until_lsn),
},
)
res.raise_for_status()
res_json = res.json()
assert isinstance(res_json, dict)
return res_json
def timeline_create(
self,
tenant_id: TenantId,
timeline_id: TimelineId,
pg_version: int, # Not exactly a PgVersion, safekeeper returns version as int, for example 150002 for 15.2
commit_lsn: Lsn,
):
body = {
"tenant_id": str(tenant_id),
"timeline_id": str(timeline_id),
"pg_version": pg_version,
"commit_lsn": str(commit_lsn),
}
res = self.post(f"http://localhost:{self.port}/v1/tenant/timeline", json=body)
res.raise_for_status()
def timeline_status(
self, tenant_id: TenantId, timeline_id: TimelineId
) -> SafekeeperTimelineStatus:
res = self.get(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}")
res.raise_for_status()
resj = res.json()
walreceivers = [Walreceiver(wr["conn_id"], wr["status"]) for wr in resj["walreceivers"]]
return SafekeeperTimelineStatus(
acceptor_epoch=resj["acceptor_state"]["epoch"],
pg_version=resj["pg_info"]["pg_version"],
flush_lsn=Lsn(resj["flush_lsn"]),
commit_lsn=Lsn(resj["commit_lsn"]),
timeline_start_lsn=Lsn(resj["timeline_start_lsn"]),
backup_lsn=Lsn(resj["backup_lsn"]),
peer_horizon_lsn=Lsn(resj["peer_horizon_lsn"]),
remote_consistent_lsn=Lsn(resj["remote_consistent_lsn"]),
walreceivers=walreceivers,
)
def record_safekeeper_info(self, tenant_id: TenantId, timeline_id: TimelineId, body):
res = self.post(
f"http://localhost:{self.port}/v1/record_safekeeper_info/{tenant_id}/{timeline_id}",
json=body,
)
res.raise_for_status()
# only_local doesn't remove segments in the remote storage.
def timeline_delete(
self, tenant_id: TenantId, timeline_id: TimelineId, only_local: bool = False
) -> Dict[Any, Any]:
res = self.delete(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}",
params={
"only_local": str(only_local).lower(),
},
)
res.raise_for_status()
res_json = res.json()
assert isinstance(res_json, dict)
return res_json
def tenant_delete_force(self, tenant_id: TenantId) -> Dict[Any, Any]:
res = self.delete(f"http://localhost:{self.port}/v1/tenant/{tenant_id}")
res.raise_for_status()
res_json = res.json()
assert isinstance(res_json, dict)
return res_json
def get_metrics_str(self) -> str:
request_result = self.get(f"http://localhost:{self.port}/metrics")
request_result.raise_for_status()
return request_result.text
def get_metrics(self) -> SafekeeperMetrics:
all_metrics_text = self.get_metrics_str()
metrics = SafekeeperMetrics()
for match in re.finditer(
r'^safekeeper_flush_lsn{tenant_id="([0-9a-f]+)",timeline_id="([0-9a-f]+)"} (\S+)$',
all_metrics_text,
re.MULTILINE,
):
metrics.flush_lsn_inexact[(TenantId(match.group(1)), TimelineId(match.group(2)))] = int(
match.group(3)
)
for match in re.finditer(
r'^safekeeper_commit_lsn{tenant_id="([0-9a-f]+)",timeline_id="([0-9a-f]+)"} (\S+)$',
all_metrics_text,
re.MULTILINE,
):
metrics.commit_lsn_inexact[
(TenantId(match.group(1)), TimelineId(match.group(2)))
] = int(match.group(3))
return metrics
class S3Scrubber:
def __init__(self, env: NeonEnvBuilder, log_dir: Optional[Path] = None):
self.env = env
@@ -4262,6 +4064,49 @@ def wait_for_last_flush_lsn(
return min(results)
def flush_ep_to_pageserver(
env: NeonEnv,
ep: Endpoint,
tenant: TenantId,
timeline: TimelineId,
pageserver_id: Optional[int] = None,
) -> Lsn:
"""
Stop the endpoint and wait until all committed WAL reaches the pageserver
(last_record_lsn). This is for tests which want everything written so far
to reach the pageserver *and* expect that no more data will arrive until
the endpoint starts again; unlike wait_for_last_flush_lsn, it polls
safekeepers instead of the compute to learn the LSN.
Returns the catch-up LSN.
"""
ep.stop()
commit_lsn: Lsn = Lsn(0)
# In principle, in the absence of failures, polling a single sk would be enough.
for sk in env.safekeepers:
cli = sk.http_client()
# wait until compute connections are gone
wait_until(30, 0.5, partial(are_walreceivers_absent, cli, tenant, timeline))
commit_lsn = max(cli.get_commit_lsn(tenant, timeline), commit_lsn)
# Note: depending on the WAL filtering implementation, most shards probably
# won't be able to reach commit_lsn (unless gaps are also ack'ed), so this
# is broken in the sharded case.
shards = tenant_get_shards(env, tenant, pageserver_id)
for tenant_shard_id, pageserver in shards:
log.info(
f"flush_ep_to_pageserver: waiting for {commit_lsn} on shard {tenant_shard_id} on pageserver {pageserver.id})"
)
waited = wait_for_last_record_lsn(
pageserver.http_client(), tenant_shard_id, timeline, commit_lsn
)
assert waited >= commit_lsn
return commit_lsn
def wait_for_wal_insert_lsn(
env: NeonEnv,
endpoint: Endpoint,


@@ -0,0 +1,227 @@
import json
import re
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple, Union
import pytest
import requests
from fixtures.log_helper import log
from fixtures.types import Lsn, TenantId, TimelineId
# Walreceiver as returned by sk's timeline status endpoint.
@dataclass
class Walreceiver:
conn_id: int
state: str
@dataclass
class SafekeeperTimelineStatus:
acceptor_epoch: int
pg_version: int # Not exactly a PgVersion, safekeeper returns version as int, for example 150002 for 15.2
flush_lsn: Lsn
commit_lsn: Lsn
timeline_start_lsn: Lsn
backup_lsn: Lsn
peer_horizon_lsn: Lsn
remote_consistent_lsn: Lsn
walreceivers: List[Walreceiver]
@dataclass
class SafekeeperMetrics:
# These are metrics from Prometheus which uses float64 internally.
# As a consequence, values may differ from real original int64s.
flush_lsn_inexact: Dict[Tuple[TenantId, TimelineId], int] = field(default_factory=dict)
commit_lsn_inexact: Dict[Tuple[TenantId, TimelineId], int] = field(default_factory=dict)
class SafekeeperHttpClient(requests.Session):
HTTPError = requests.HTTPError
def __init__(self, port: int, auth_token: Optional[str] = None, is_testing_enabled=False):
super().__init__()
self.port = port
self.auth_token = auth_token
self.is_testing_enabled = is_testing_enabled
if auth_token is not None:
self.headers["Authorization"] = f"Bearer {auth_token}"
def check_status(self):
self.get(f"http://localhost:{self.port}/v1/status").raise_for_status()
def is_testing_enabled_or_skip(self):
if not self.is_testing_enabled:
pytest.skip("safekeeper was built without 'testing' feature")
def configure_failpoints(self, config_strings: Union[Tuple[str, str], List[Tuple[str, str]]]):
self.is_testing_enabled_or_skip()
if isinstance(config_strings, tuple):
pairs = [config_strings]
else:
pairs = config_strings
log.info(f"Requesting config failpoints: {repr(pairs)}")
res = self.put(
f"http://localhost:{self.port}/v1/failpoints",
json=[{"name": name, "actions": actions} for name, actions in pairs],
)
log.info(f"Got failpoints request response code {res.status_code}")
res.raise_for_status()
res_json = res.json()
assert res_json is None
return res_json
def debug_dump(self, params: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
params = params or {}
res = self.get(f"http://localhost:{self.port}/v1/debug_dump", params=params)
res.raise_for_status()
res_json = json.loads(res.text)
assert isinstance(res_json, dict)
return res_json
def patch_control_file(
self,
tenant_id: TenantId,
timeline_id: TimelineId,
patch: Dict[str, Any],
) -> Dict[str, Any]:
res = self.patch(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/control_file",
json={
"updates": patch,
"apply_fields": list(patch.keys()),
},
)
res.raise_for_status()
res_json = res.json()
assert isinstance(res_json, dict)
return res_json
def pull_timeline(self, body: Dict[str, Any]) -> Dict[str, Any]:
res = self.post(f"http://localhost:{self.port}/v1/pull_timeline", json=body)
res.raise_for_status()
res_json = res.json()
assert isinstance(res_json, dict)
return res_json
def copy_timeline(self, tenant_id: TenantId, timeline_id: TimelineId, body: Dict[str, Any]):
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/copy",
json=body,
)
res.raise_for_status()
def timeline_digest(
self, tenant_id: TenantId, timeline_id: TimelineId, from_lsn: Lsn, until_lsn: Lsn
) -> Dict[str, Any]:
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/digest",
params={
"from_lsn": str(from_lsn),
"until_lsn": str(until_lsn),
},
)
res.raise_for_status()
res_json = res.json()
assert isinstance(res_json, dict)
return res_json
def timeline_create(
self,
tenant_id: TenantId,
timeline_id: TimelineId,
pg_version: int, # Not exactly a PgVersion, safekeeper returns version as int, for example 150002 for 15.2
commit_lsn: Lsn,
):
body = {
"tenant_id": str(tenant_id),
"timeline_id": str(timeline_id),
"pg_version": pg_version,
"commit_lsn": str(commit_lsn),
}
res = self.post(f"http://localhost:{self.port}/v1/tenant/timeline", json=body)
res.raise_for_status()
def timeline_status(
self, tenant_id: TenantId, timeline_id: TimelineId
) -> SafekeeperTimelineStatus:
res = self.get(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}")
res.raise_for_status()
resj = res.json()
walreceivers = [Walreceiver(wr["conn_id"], wr["status"]) for wr in resj["walreceivers"]]
return SafekeeperTimelineStatus(
acceptor_epoch=resj["acceptor_state"]["epoch"],
pg_version=resj["pg_info"]["pg_version"],
flush_lsn=Lsn(resj["flush_lsn"]),
commit_lsn=Lsn(resj["commit_lsn"]),
timeline_start_lsn=Lsn(resj["timeline_start_lsn"]),
backup_lsn=Lsn(resj["backup_lsn"]),
peer_horizon_lsn=Lsn(resj["peer_horizon_lsn"]),
remote_consistent_lsn=Lsn(resj["remote_consistent_lsn"]),
walreceivers=walreceivers,
)
def get_commit_lsn(self, tenant_id: TenantId, timeline_id: TimelineId) -> Lsn:
return self.timeline_status(tenant_id, timeline_id).commit_lsn
def record_safekeeper_info(self, tenant_id: TenantId, timeline_id: TimelineId, body):
res = self.post(
f"http://localhost:{self.port}/v1/record_safekeeper_info/{tenant_id}/{timeline_id}",
json=body,
)
res.raise_for_status()
# only_local doesn't remove segments in the remote storage.
def timeline_delete(
self, tenant_id: TenantId, timeline_id: TimelineId, only_local: bool = False
) -> Dict[Any, Any]:
res = self.delete(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}",
params={
"only_local": str(only_local).lower(),
},
)
res.raise_for_status()
res_json = res.json()
assert isinstance(res_json, dict)
return res_json
def tenant_delete_force(self, tenant_id: TenantId) -> Dict[Any, Any]:
res = self.delete(f"http://localhost:{self.port}/v1/tenant/{tenant_id}")
res.raise_for_status()
res_json = res.json()
assert isinstance(res_json, dict)
return res_json
def get_metrics_str(self) -> str:
request_result = self.get(f"http://localhost:{self.port}/metrics")
request_result.raise_for_status()
return request_result.text
def get_metrics(self) -> SafekeeperMetrics:
all_metrics_text = self.get_metrics_str()
metrics = SafekeeperMetrics()
for match in re.finditer(
r'^safekeeper_flush_lsn{tenant_id="([0-9a-f]+)",timeline_id="([0-9a-f]+)"} (\S+)$',
all_metrics_text,
re.MULTILINE,
):
metrics.flush_lsn_inexact[(TenantId(match.group(1)), TimelineId(match.group(2)))] = int(
match.group(3)
)
for match in re.finditer(
r'^safekeeper_commit_lsn{tenant_id="([0-9a-f]+)",timeline_id="([0-9a-f]+)"} (\S+)$',
all_metrics_text,
re.MULTILINE,
):
metrics.commit_lsn_inexact[
(TenantId(match.group(1)), TimelineId(match.group(2)))
] = int(match.group(3))
return metrics


@@ -0,0 +1,11 @@
from fixtures.log_helper import log
from fixtures.safekeeper.http import SafekeeperHttpClient
from fixtures.types import TenantId, TimelineId
def are_walreceivers_absent(
sk_http_cli: SafekeeperHttpClient, tenant_id: TenantId, timeline_id: TimelineId
):
status = sk_http_cli.timeline_status(tenant_id, timeline_id)
log.info(f"waiting for walreceivers to be gone, currently {status.walreceivers}")
return len(status.walreceivers) == 0


@@ -4,12 +4,11 @@ import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
flush_ep_to_pageserver,
wait_for_last_flush_lsn,
)
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
from fixtures.pageserver.utils import wait_for_upload
from fixtures.remote_storage import RemoteStorageKind
from fixtures.types import Lsn
from fixtures.utils import query_scalar
# Creates a few layers, ensures that we can evict them (removing locally but keeping track of them anyway)
@@ -46,14 +45,15 @@ def test_basic_eviction(
FROM generate_series(1, 5000000) g
"""
)
current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
wait_for_last_record_lsn(client, tenant_id, timeline_id, current_lsn)
# stops the endpoint
current_lsn = flush_ep_to_pageserver(env, endpoint, tenant_id, timeline_id)
client.timeline_checkpoint(tenant_id, timeline_id)
wait_for_upload(client, tenant_id, timeline_id, current_lsn)
# disable compute & sks to avoid on-demand downloads by walreceiver / getpage
endpoint.stop()
# stop sks to avoid on-demand downloads by walreceiver / getpage; endpoint
# has already been stopped by flush_ep_to_pageserver
for sk in env.safekeepers:
sk.stop()


@@ -1,7 +1,7 @@
import time
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.neon_fixtures import NeonEnvBuilder, flush_ep_to_pageserver
from fixtures.pageserver.types import (
DeltaLayerFileName,
ImageLayerFileName,
@@ -115,8 +115,7 @@ def test_issue_5878(neon_env_builder: NeonEnvBuilder):
)
== 0
)
endpoint.stop()
last_record_lsn = flush_ep_to_pageserver(env, endpoint, tenant_id, timeline_id)
wait_for_upload_queue_empty(ps_http, tenant_id, timeline_id)


@@ -8,6 +8,7 @@ from typing import Any, DefaultDict, Dict, Tuple
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
flush_ep_to_pageserver,
last_flush_lsn_upload,
wait_for_last_flush_lsn,
)
@@ -517,7 +518,7 @@ def test_compaction_downloads_on_demand_without_image_creation(neon_env_builder:
with endpoint.cursor() as cur:
cur.execute("update a set id = -id")
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
flush_ep_to_pageserver(env, endpoint, tenant_id, timeline_id)
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
layers = pageserver_http.layer_map_info(tenant_id, timeline_id)


@@ -28,7 +28,6 @@ from fixtures.neon_fixtures import (
PgBin,
PgProtocol,
Safekeeper,
SafekeeperHttpClient,
SafekeeperPort,
last_flush_lsn_upload,
)
@@ -46,6 +45,8 @@ from fixtures.remote_storage import (
default_remote_storage,
s3_storage,
)
from fixtures.safekeeper.http import SafekeeperHttpClient
from fixtures.safekeeper.utils import are_walreceivers_absent
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import get_dir_size, query_scalar, start_in_background
@@ -1097,12 +1098,6 @@ def is_flush_lsn_aligned(sk_http_clis, tenant_id, timeline_id):
return all([flush_lsns[0] == flsn for flsn in flush_lsns])
def are_walreceivers_absent(sk_http_cli, tenant_id: TenantId, timeline_id: TimelineId):
status = sk_http_cli.timeline_status(tenant_id, timeline_id)
log.info(f"waiting for walreceivers to be gone, currently {status.walreceivers}")
return len(status.walreceivers) == 0
# Assert by xxd that WAL on given safekeepers is identical. No compute must be
# running for this to be reliable.
def cmp_sk_wal(sks: List[Safekeeper], tenant_id: TenantId, timeline_id: TimelineId):
@@ -1347,6 +1342,36 @@ def test_peer_recovery(neon_env_builder: NeonEnvBuilder):
endpoint.safe_psql("insert into t select generate_series(1,100), 'payload'")
# Test that when compute is terminated in fast (or smart) mode, walproposer is
# allowed to run and self-terminate after the shutdown checkpoint is written, so
# it commits it to safekeepers before exiting. This is not required for
# correctness, but is needed for tests using check_restored_datadir_content.
def test_wp_graceful_shutdown(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
neon_env_builder.num_safekeepers = 1
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
timeline_id = env.neon_cli.create_branch("test_wp_graceful_shutdown")
ep = env.endpoints.create_start("test_wp_graceful_shutdown")
ep.safe_psql("create table t(key int, value text)")
ep.stop()
# figure out checkpoint lsn
ckpt_lsn = pg_bin.get_pg_controldata_checkpoint_lsn(ep.pg_data_dir_path())
sk_http_cli = env.safekeepers[0].http_client()
commit_lsn = sk_http_cli.timeline_status(tenant_id, timeline_id).commit_lsn
# Note: this is an in-memory value. Graceful shutdown of walproposer currently
# doesn't guarantee the persisted value, which is ok as we need it only for
# tests. Persisting it without risking too many control file flushes needs a
# wp -> sk protocol change. (Though in reality the shutdown sync-safekeepers
# does flush the control file, so most of the time the persisted value
# wouldn't lag.)
log.info(f"sk commit_lsn {commit_lsn}")
# note that ckpt_lsn is the *beginning* of the checkpoint record, so commit_lsn
# must actually be higher
assert commit_lsn > ckpt_lsn, "safekeeper must have checkpoint record"
class SafekeeperEnv:
def __init__(
self,


@@ -1,6 +1,5 @@
{
"postgres-v16": "072697b2250da3251af75887b577104554b9cd44",
"postgres-v15": "e8b9a28006a550d7ca7cbb9bd0238eb9cd57bbd8",
"postgres-v14": "f49a962b9b3715d6f47017d1dcf905c36f93ae5e"
"postgres-v16": "90078947229aa7f9ac5f7ed4527b2c7386d5332b",
"postgres-v15": "56f32c0e7330d17aaeee8bf211a73995180bd133",
"postgres-v14": "b980d6f090c676e55fb2c830fb2434f532f635c0"
}