Safekeepers: persist state before timeline deactivation.

Without it, we sometimes lose the latest remote_consistent_lsn on restart,
which leads to excessive pageserver -> safekeeper reconnections.

https://github.com/neondatabase/neon/issues/5993
This commit is contained in:
Arseny Sher
2023-11-30 16:35:16 +03:00
committed by Arseny Sher
parent eae49ff598
commit 207c527270
3 changed files with 57 additions and 30 deletions

View File

@@ -914,9 +914,14 @@ where
Ok(())
}
/// Persist control file to disk, called only after timeline creation (bootstrap).
pub async fn persist(&mut self) -> Result<()> {
self.persist_control_file(self.state.clone()).await
/// Persist in-memory state of control file to disk.
//
// TODO: passing inmem_remote_consistent_lsn everywhere is ugly, better
// separate state completely and give Arc to all those who need it.
pub async fn persist_inmem(&mut self, inmem_remote_consistent_lsn: Lsn) -> Result<()> {
let mut state = self.state.clone();
state.remote_consistent_lsn = inmem_remote_consistent_lsn;
self.persist_control_file(state).await
}
/// Persist in-memory state to the disk, taking other data from state.
@@ -930,7 +935,7 @@ where
/// Persist control file if there is something to save and enough time
/// passed after the last save.
pub async fn maybe_persist_control_file(
pub async fn maybe_persist_inmem_control_file(
&mut self,
inmem_remote_consistent_lsn: Lsn,
) -> Result<()> {
@@ -943,9 +948,7 @@ where
|| self.inmem.peer_horizon_lsn > self.state.peer_horizon_lsn
|| inmem_remote_consistent_lsn > self.state.remote_consistent_lsn;
if need_persist {
let mut state = self.state.clone();
state.remote_consistent_lsn = inmem_remote_consistent_lsn;
self.persist_control_file(state).await?;
self.persist_inmem(inmem_remote_consistent_lsn).await?;
trace!("saved control file: {CF_SAVE_INTERVAL:?} passed");
}
Ok(())
@@ -1064,8 +1067,6 @@ where
if sync_control_file {
let mut state = self.state.clone();
// Note: we could make remote_consistent_lsn update in cf common by
// storing Arc to walsenders in Safekeeper.
state.remote_consistent_lsn = new_remote_consistent_lsn;
self.persist_control_file(state).await?;
}

View File

@@ -182,8 +182,9 @@ impl SharedState {
}
/// Mark timeline active/inactive and return whether s3 offloading requires
/// start/stop action.
fn update_status(
/// start/stop action. If timeline is deactivated, control file is persisted
/// as maintenance task does that only for active timelines.
async fn update_status(
&mut self,
num_computes: usize,
remote_consistent_lsn: Lsn,
@@ -191,7 +192,15 @@ impl SharedState {
) -> bool {
let is_active = self.is_active(num_computes, remote_consistent_lsn);
if self.active != is_active {
info!("timeline {} active={} now", ttid, is_active);
info!(
"timeline {} active={} now, remote_consistent_lsn={}, commit_lsn={}",
ttid, is_active, remote_consistent_lsn, self.sk.inmem.commit_lsn
);
if !is_active {
if let Err(e) = self.sk.persist_inmem(remote_consistent_lsn).await {
warn!("control file save in update_status failed: {:?}", e);
}
}
}
self.active = is_active;
self.is_wal_backup_action_pending(num_computes)
@@ -438,7 +447,7 @@ impl Timeline {
fs::create_dir_all(&self.timeline_dir).await?;
// Write timeline to disk and start background tasks.
if let Err(e) = shared_state.sk.persist().await {
if let Err(e) = shared_state.sk.persist_inmem(Lsn::INVALID).await {
// Bootstrap failed, cancel timeline and remove timeline directory.
self.cancel(shared_state);
@@ -511,12 +520,14 @@ impl Timeline {
self.mutex.lock().await
}
fn update_status(&self, shared_state: &mut SharedState) -> bool {
shared_state.update_status(
self.walreceivers.get_num(),
self.get_walsenders().get_remote_consistent_lsn(),
self.ttid,
)
async fn update_status(&self, shared_state: &mut SharedState) -> bool {
shared_state
.update_status(
self.walreceivers.get_num(),
self.get_walsenders().get_remote_consistent_lsn(),
self.ttid,
)
.await
}
/// Update timeline status and kick wal backup launcher to stop/start offloading if needed.
@@ -526,7 +537,7 @@ impl Timeline {
}
let is_wal_backup_action_pending: bool = {
let mut shared_state = self.write_shared_state().await;
self.update_status(&mut shared_state)
self.update_status(&mut shared_state).await
};
if is_wal_backup_action_pending {
// Can fail only if channel to a static thread got closed, which is not normal at all.
@@ -683,7 +694,7 @@ impl Timeline {
shared_state.sk.record_safekeeper_info(&sk_info).await?;
let peer_info = PeerInfo::from_sk_info(&sk_info, Instant::now());
shared_state.peers_info.upsert(&peer_info);
is_wal_backup_action_pending = self.update_status(&mut shared_state);
is_wal_backup_action_pending = self.update_status(&mut shared_state).await;
commit_lsn = shared_state.sk.inmem.commit_lsn;
}
self.commit_lsn_watch_tx.send(commit_lsn)?;
@@ -828,7 +839,7 @@ impl Timeline {
self.write_shared_state()
.await
.sk
.maybe_persist_control_file(remote_consistent_lsn)
.maybe_persist_inmem_control_file(remote_consistent_lsn)
.await
}

View File

@@ -30,6 +30,7 @@ from fixtures.neon_fixtures import (
Safekeeper,
SafekeeperHttpClient,
SafekeeperPort,
last_flush_lsn_upload,
)
from fixtures.pageserver.utils import (
timeline_delete_wait_completed,
@@ -286,29 +287,43 @@ def test_broker(neon_env_builder: NeonEnvBuilder):
# wait until remote_consistent_lsn gets advanced on all safekeepers
clients = [sk.http_client() for sk in env.safekeepers]
stat_before = [cli.timeline_status(tenant_id, timeline_id) for cli in clients]
log.info(f"statuses is {stat_before}")
log.info(f"statuses before insert: {stat_before}")
endpoint.safe_psql("INSERT INTO t SELECT generate_series(1,100), 'payload'")
# force checkpoint in pageserver to advance remote_consistent_lsn
wait_lsn_force_checkpoint(tenant_id, timeline_id, endpoint, env.pageserver)
# wait for remote_consistent_lsn to reach flush_lsn, forcing it with checkpoint
new_rcl = last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id)
log.info(f"new_rcl: {new_rcl}")
endpoint.stop()
# and wait till remote_consistent_lsn propagates to all safekeepers
#
# TODO: this executes long as timeline on safekeeper is immediately
# deactivated once rcl reaches pageserver one, and thus we generally wait
# till pageserver reconnects to all safekeepers one by one here. Timeline
# status on safekeeper should take into account peers state as well.
started_at = time.time()
while True:
stat_after = [cli.timeline_status(tenant_id, timeline_id) for cli in clients]
if all(
s_after.remote_consistent_lsn > s_before.remote_consistent_lsn
for s_after, s_before in zip(stat_after, stat_before)
):
if all([s_after.remote_consistent_lsn >= new_rcl for s_after in stat_after]):
break
elapsed = time.time() - started_at
if elapsed > 20:
if elapsed > 30:
raise RuntimeError(
f"timed out waiting {elapsed:.0f}s for remote_consistent_lsn propagation: status before {stat_before}, status current {stat_after}"
)
time.sleep(1)
# Ensure that safekeepers don't lose remote_consistent_lsn on restart.
# Control file is persisted each 5s. TODO: do that on shutdown and remove sleep.
time.sleep(6)
for sk in env.safekeepers:
sk.stop()
sk.start()
stat_after_restart = [cli.timeline_status(tenant_id, timeline_id) for cli in clients]
log.info(f"statuses after {stat_after_restart}")
assert all([s.remote_consistent_lsn >= new_rcl for s in stat_after_restart])
# Test that old WAL consumed by peers and pageserver is removed from safekeepers.
@pytest.mark.parametrize("auth_enabled", [False, True])