From 207c527270c461868e723794a9183020e8cc14cc Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Thu, 30 Nov 2023 16:35:16 +0300 Subject: [PATCH] Safekeepers: persist state before timeline deactivation. Without it, sometimes on restart we lose latest remote_consistent_lsn which leads to excessive ps -> sk reconnections. https://github.com/neondatabase/neon/issues/5993 --- safekeeper/src/safekeeper.rs | 19 ++++++------ safekeeper/src/timeline.rs | 37 +++++++++++++++--------- test_runner/regress/test_wal_acceptor.py | 31 +++++++++++++++----- 3 files changed, 57 insertions(+), 30 deletions(-) diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index 47a624281d..217a5f89ee 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -914,9 +914,14 @@ where Ok(()) } - /// Persist control file to disk, called only after timeline creation (bootstrap). - pub async fn persist(&mut self) -> Result<()> { - self.persist_control_file(self.state.clone()).await + /// Persist in-memory state of control file to disk. + // + // TODO: passing inmem_remote_consistent_lsn everywhere is ugly, better + // separate state completely and give Arc to all those who need it. + pub async fn persist_inmem(&mut self, inmem_remote_consistent_lsn: Lsn) -> Result<()> { + let mut state = self.state.clone(); + state.remote_consistent_lsn = inmem_remote_consistent_lsn; + self.persist_control_file(state).await } /// Persist in-memory state to the disk, taking other data from state. @@ -930,7 +935,7 @@ where /// Persist control file if there is something to save and enough time /// passed after the last save. - pub async fn maybe_persist_control_file( + pub async fn maybe_persist_inmem_control_file( &mut self, inmem_remote_consistent_lsn: Lsn, ) -> Result<()> { @@ -943,9 +948,7 @@ where || self.inmem.peer_horizon_lsn > self.state.peer_horizon_lsn || inmem_remote_consistent_lsn > self.state.remote_consistent_lsn; if need_persist { - let mut state = self.state.clone(); - state.remote_consistent_lsn = inmem_remote_consistent_lsn; - self.persist_control_file(state).await?; + self.persist_inmem(inmem_remote_consistent_lsn).await?; trace!("saved control file: {CF_SAVE_INTERVAL:?} passed"); } Ok(()) @@ -1064,8 +1067,6 @@ where if sync_control_file { let mut state = self.state.clone(); - // Note: we could make remote_consistent_lsn update in cf common by - // storing Arc to walsenders in Safekeeper. state.remote_consistent_lsn = new_remote_consistent_lsn; self.persist_control_file(state).await?; } diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 2ba871207e..bdc9088138 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -182,8 +182,9 @@ impl SharedState { } /// Mark timeline active/inactive and return whether s3 offloading requires - /// start/stop action. - fn update_status( + /// start/stop action. If timeline is deactivated, control file is persisted + /// as maintenance task does that only for active timelines. + async fn update_status( &mut self, num_computes: usize, remote_consistent_lsn: Lsn, @@ -191,7 +192,15 @@ impl SharedState { ) -> bool { let is_active = self.is_active(num_computes, remote_consistent_lsn); if self.active != is_active { - info!("timeline {} active={} now", ttid, is_active); + info!( + "timeline {} active={} now, remote_consistent_lsn={}, commit_lsn={}", + ttid, is_active, remote_consistent_lsn, self.sk.inmem.commit_lsn + ); + if !is_active { + if let Err(e) = self.sk.persist_inmem(remote_consistent_lsn).await { + warn!("control file save in update_status failed: {:?}", e); + } + } } self.active = is_active; self.is_wal_backup_action_pending(num_computes) @@ -438,7 +447,7 @@ impl Timeline { fs::create_dir_all(&self.timeline_dir).await?; // Write timeline to disk and start background tasks. - if let Err(e) = shared_state.sk.persist().await { + if let Err(e) = shared_state.sk.persist_inmem(Lsn::INVALID).await { // Bootstrap failed, cancel timeline and remove timeline directory. self.cancel(shared_state); @@ -511,12 +520,14 @@ impl Timeline { self.mutex.lock().await } - fn update_status(&self, shared_state: &mut SharedState) -> bool { - shared_state.update_status( - self.walreceivers.get_num(), - self.get_walsenders().get_remote_consistent_lsn(), - self.ttid, - ) + async fn update_status(&self, shared_state: &mut SharedState) -> bool { + shared_state + .update_status( + self.walreceivers.get_num(), + self.get_walsenders().get_remote_consistent_lsn(), + self.ttid, + ) + .await } /// Update timeline status and kick wal backup launcher to stop/start offloading if needed. @@ -526,7 +537,7 @@ impl Timeline { } let is_wal_backup_action_pending: bool = { let mut shared_state = self.write_shared_state().await; - self.update_status(&mut shared_state) + self.update_status(&mut shared_state).await }; if is_wal_backup_action_pending { // Can fail only if channel to a static thread got closed, which is not normal at all. @@ -683,7 +694,7 @@ impl Timeline { shared_state.sk.record_safekeeper_info(&sk_info).await?; let peer_info = PeerInfo::from_sk_info(&sk_info, Instant::now()); shared_state.peers_info.upsert(&peer_info); - is_wal_backup_action_pending = self.update_status(&mut shared_state); + is_wal_backup_action_pending = self.update_status(&mut shared_state).await; commit_lsn = shared_state.sk.inmem.commit_lsn; } self.commit_lsn_watch_tx.send(commit_lsn)?; @@ -828,7 +839,7 @@ impl Timeline { self.write_shared_state() .await .sk - .maybe_persist_control_file(remote_consistent_lsn) + .maybe_persist_inmem_control_file(remote_consistent_lsn) .await } diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py index b7eaaf39bc..ad12b56874 100644 --- a/test_runner/regress/test_wal_acceptor.py +++ b/test_runner/regress/test_wal_acceptor.py @@ -30,6 +30,7 @@ from fixtures.neon_fixtures import ( Safekeeper, SafekeeperHttpClient, SafekeeperPort, + last_flush_lsn_upload, ) from fixtures.pageserver.utils import ( timeline_delete_wait_completed, @@ -286,29 +287,43 @@ def test_broker(neon_env_builder: NeonEnvBuilder): # wait until remote_consistent_lsn gets advanced on all safekeepers clients = [sk.http_client() for sk in env.safekeepers] stat_before = [cli.timeline_status(tenant_id, timeline_id) for cli in clients] - log.info(f"statuses is {stat_before}") + log.info(f"statuses before insert: {stat_before}") endpoint.safe_psql("INSERT INTO t SELECT generate_series(1,100), 'payload'") - # force checkpoint in pageserver to advance remote_consistent_lsn - wait_lsn_force_checkpoint(tenant_id, timeline_id, endpoint, env.pageserver) + # wait for remote_consistent_lsn to reach flush_lsn, forcing it with checkpoint + new_rcl = last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id) + log.info(f"new_rcl: {new_rcl}") + endpoint.stop() # and wait till remote_consistent_lsn propagates to all safekeepers + # + # TODO: this executes long as timeline on safekeeper is immediately + # deactivated once rcl reaches pageserver one, and thus we generally wait + # till pageserver reconnects to all safekeepers one by one here. Timeline + # status on safekeeper should take into account peers state as well. started_at = time.time() while True: stat_after = [cli.timeline_status(tenant_id, timeline_id) for cli in clients] - if all( - s_after.remote_consistent_lsn > s_before.remote_consistent_lsn - for s_after, s_before in zip(stat_after, stat_before) - ): + if all([s_after.remote_consistent_lsn >= new_rcl for s_after in stat_after]): break elapsed = time.time() - started_at - if elapsed > 20: + if elapsed > 30: raise RuntimeError( f"timed out waiting {elapsed:.0f}s for remote_consistent_lsn propagation: status before {stat_before}, status current {stat_after}" ) time.sleep(1) + # Ensure that safekeepers don't lose remote_consistent_lsn on restart. + # Control file is persisted each 5s. TODO: do that on shutdown and remove sleep. + time.sleep(6) + for sk in env.safekeepers: + sk.stop() + sk.start() + stat_after_restart = [cli.timeline_status(tenant_id, timeline_id) for cli in clients] + log.info(f"statuses after {stat_after_restart}") + assert all([s.remote_consistent_lsn >= new_rcl for s in stat_after_restart]) + # Test that old WAL consumed by peers and pageserver is removed from safekeepers. @pytest.mark.parametrize("auth_enabled", [False, True])