From c5412b37a0fa2f1f9c260028aef3d31d3043fd53 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Sun, 12 May 2024 06:52:39 +0300 Subject: [PATCH] safekeeper: implement timeline pause. So far it immediately stops only writes to the timeline, which is already might be useful. --- safekeeper/src/control_file.rs | 2 +- safekeeper/src/control_file_upgrade.rs | 72 +++++++++++++++++++ safekeeper/src/http/routes.rs | 25 +++++++ safekeeper/src/safekeeper.rs | 3 + safekeeper/src/state.rs | 4 ++ safekeeper/src/timeline.rs | 62 ++++++++++++---- test_runner/fixtures/safekeeper/http.py | 15 ++++ .../regress/test_wal_acceptor_async.py | 37 ++++++++++ 8 files changed, 204 insertions(+), 16 deletions(-) diff --git a/safekeeper/src/control_file.rs b/safekeeper/src/control_file.rs index fe9f2e6899..83a5180153 100644 --- a/safekeeper/src/control_file.rs +++ b/safekeeper/src/control_file.rs @@ -20,7 +20,7 @@ use utils::{bin_ser::LeSer, id::TenantTimelineId}; use crate::SafeKeeperConf; pub const SK_MAGIC: u32 = 0xcafeceefu32; -pub const SK_FORMAT_VERSION: u32 = 8; +pub const SK_FORMAT_VERSION: u32 = 9; // contains persistent metadata for safekeeper const CONTROL_FILE_NAME: &str = "safekeeper.control"; diff --git a/safekeeper/src/control_file_upgrade.rs b/safekeeper/src/control_file_upgrade.rs index 8f4dfe9b43..59bb4d0861 100644 --- a/safekeeper/src/control_file_upgrade.rs +++ b/safekeeper/src/control_file_upgrade.rs @@ -183,6 +183,53 @@ pub struct SafeKeeperStateV7 { pub peers: PersistedPeers, } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct SafeKeeperStateV8 { + #[serde(with = "hex")] + pub tenant_id: TenantId, + #[serde(with = "hex")] + pub timeline_id: TimelineId, + /// persistent acceptor state + pub acceptor_state: AcceptorState, + /// information about server + pub server: ServerInfo, + /// Unique id of the last *elected* proposer we dealt with. Not needed + /// for correctness, exists for monitoring purposes. + #[serde(with = "hex")] + pub proposer_uuid: PgUuid, + /// Since which LSN this timeline generally starts. Safekeeper might have + /// joined later. + pub timeline_start_lsn: Lsn, + /// Since which LSN safekeeper has (had) WAL for this timeline. + /// All WAL segments next to one containing local_start_lsn are + /// filled with data from the beginning. + pub local_start_lsn: Lsn, + /// Part of WAL acknowledged by quorum *and available locally*. Always points + /// to record boundary. + pub commit_lsn: Lsn, + /// LSN that points to the end of the last backed up segment. Useful to + /// persist to avoid finding out offloading progress on boot. + pub backup_lsn: Lsn, + /// Minimal LSN which may be needed for recovery of some safekeeper (end_lsn + /// of last record streamed to everyone). Persisting it helps skipping + /// recovery in walproposer, generally we compute it from peers. In + /// walproposer proto called 'truncate_lsn'. Updates are currently drived + /// only by walproposer. + pub peer_horizon_lsn: Lsn, + /// LSN of the oldest known checkpoint made by pageserver and successfully + /// pushed to s3. We don't remove WAL beyond it. Persisted only for + /// informational purposes, we receive it from pageserver (or broker). + pub remote_consistent_lsn: Lsn, + /// Peers and their state as we remember it. Knowing peers themselves is + /// fundamental; but state is saved here only for informational purposes and + /// obviously can be stale. (Currently not saved at all, but let's provision + /// place to have less file version upgrades). + pub peers: PersistedPeers, + /// Holds names of partial segments uploaded to remote storage. Used to + /// clean up old objects without leaving garbage in remote storage. + pub partial_backup: wal_backup_partial::State, +} + pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result { // migrate to storing full term history if version == 1 { @@ -213,6 +260,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result Result Result Result Result) -> Result) -> Result, ApiError> { + check_permission(&request, None)?; + + let data: PauseRequest = json_request(&mut request).await?; + let ttid = TenantTimelineId::new( + parse_request_param(&request, "tenant_id")?, + parse_request_param(&request, "timeline_id")?, + ); + + let tli = GlobalTimelines::get(ttid)?; + tli.set_pause(data.pause) + .await + .map_err(ApiError::InternalServerError)?; + json_response(StatusCode::OK, ()) +} + /// Pull timeline from peer safekeeper instances. async fn timeline_pull_handler(mut request: Request) -> Result, ApiError> { check_permission(&request, None)?; @@ -536,6 +558,9 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder .delete("/v1/tenant/:tenant_id", |r| { request_span(r, tenant_delete_handler) }) + .post("/v1/tenant/:tenant_id/timeline/:timeline_id/pause", |r| { + request_span(r, timeline_pause_handler) + }) .post("/v1/pull_timeline", |r| { request_span(r, timeline_pull_handler) }) diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index e671d4f36a..ebec4d1370 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -1238,6 +1238,7 @@ mod tests { }, )]), partial_backup: crate::wal_backup_partial::State::default(), + paused: false, }; let ser = state.ser().unwrap(); @@ -1285,6 +1286,8 @@ mod tests { 0xb0, 0x01, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00, // partial_backup 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + // paused + 0x00 ]; assert_eq!(Hex(&ser), Hex(&expected)); diff --git a/safekeeper/src/state.rs b/safekeeper/src/state.rs index be5e516296..5ae0ba406d 100644 --- a/safekeeper/src/state.rs +++ b/safekeeper/src/state.rs @@ -63,6 +63,9 @@ pub struct TimelinePersistentState { /// Holds names of partial segments uploaded to remote storage. Used to /// clean up old objects without leaving garbage in remote storage. pub partial_backup: wal_backup_partial::State, + /// Paused timeline forbids writes to it (it might be useful to pause any + /// other activity as well, but that's a todo). + pub paused: bool, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] @@ -98,6 +101,7 @@ impl TimelinePersistentState { .collect(), ), partial_backup: wal_backup_partial::State::default(), + paused: false, } } diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 6f72f7ed3a..286595e96c 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -313,6 +313,8 @@ impl SharedState { #[derive(Debug, thiserror::Error)] pub enum TimelineError { + #[error("Timeline {0} is paused")] + Paused(TenantTimelineId), #[error("Timeline {0} was cancelled and cannot be used anymore")] Cancelled(TenantTimelineId), #[error("Timeline {0} was not found in global map")] @@ -508,6 +510,27 @@ impl Timeline { } } + /// Pause timeline, forbidding writes to it, or resume after it has been paused. + pub async fn set_pause(&self, paused: bool) -> Result<()> { + { + let mut ss = self.write_shared_state(true).await?; + if paused && !ss.sk.state.paused { + let mut persistent_state = ss.sk.state.start_change(); + persistent_state.paused = true; + ss.sk.state.finish_change(&persistent_state).await?; + info!("paused timeline {}", self.ttid); + } + if !paused && ss.sk.state.paused { + let mut persistent_state = ss.sk.state.start_change(); + persistent_state.paused = false; + ss.sk.state.finish_change(&persistent_state).await?; + info!("resumed timeline {}", self.ttid); + } + } + self.update_status_notify().await?; + Ok(()) + } + /// Delete timeline from disk completely, by removing timeline directory. /// Background timeline activities will stop eventually. /// @@ -569,23 +592,30 @@ impl Timeline { /// Take a writing mutual exclusive lock on timeline shared_state. /// /// Unless allowed_cancelled is true errors out if timeline is cancelled (deleted). + /// Unless allow_paused is true errors out if timeline is paused: so far pause + /// immediately forbids only compute writes, but we might want to pause other + /// activites in the future. async fn write_shared_state_internal( &self, allow_cancelled: bool, + allow_paused: bool, ) -> Result> { let state = self.mutex.lock().await; if !allow_cancelled && self.is_cancelled() { bail!(TimelineError::Cancelled(self.ttid)); } + if !allow_paused && state.sk.state.paused { + bail!(TimelineError::Paused(self.ttid)); + } Ok(state) } - pub async fn write_shared_state(&self) -> Result> { - self.write_shared_state_internal(false).await + pub async fn write_shared_state(&self, allow_paused: bool) -> Result> { + self.write_shared_state_internal(false, allow_paused).await } pub async fn write_shared_state_allow_cancelled(&self) -> MutexGuard { - self.write_shared_state_internal(true) + self.write_shared_state_internal(true, true) .await .expect("timeline is cancelled and allow_cancelled is false") } @@ -599,7 +629,7 @@ impl Timeline { /// Update timeline status and kick wal backup launcher to stop/start offloading if needed. pub async fn update_status_notify(&self) -> Result<()> { let is_wal_backup_action_pending: bool = { - let mut shared_state = self.write_shared_state().await?; + let mut shared_state = self.write_shared_state_allow_cancelled().await; self.update_status(&mut shared_state).await }; if is_wal_backup_action_pending { @@ -615,7 +645,7 @@ impl Timeline { /// remote_consistent_lsn update through replication feedback, and we want /// to stop pushing to the broker if pageserver is fully caughtup. pub async fn should_walsender_stop(&self, reported_remote_consistent_lsn: Lsn) -> bool { - let shared_state = match self.write_shared_state().await { + let shared_state = match self.write_shared_state(false).await { Ok(state) => state, Err(_) => return true, }; @@ -626,9 +656,9 @@ impl Timeline { false } - /// Ensure thht current term is t, erroring otherwise, and lock the state. + /// Ensure that current term is t, erroring otherwise, and lock the state. pub async fn acquire_term(&self, t: Term) -> Result> { - let ss = self.write_shared_state().await?; + let ss = self.write_shared_state(false).await?; if ss.sk.state.acceptor_state.term != t { bail!( "failed to acquire term {}, current term {}", @@ -642,10 +672,12 @@ impl Timeline { /// Returns whether s3 offloading is required and sets current status as /// matching it. pub async fn wal_backup_attend(&self) -> bool { - match self.write_shared_state().await { - Ok(mut state) => state.wal_backup_attend(self.walreceivers.get_num()), - Err(_) => false, + let mut ss = self.write_shared_state_allow_cancelled().await; + if self.is_cancelled() || ss.sk.state.paused { + ss.wal_backup_active = false; + return false; } + ss.wal_backup_attend(self.walreceivers.get_num()) } /// Returns commit_lsn watch channel. @@ -667,7 +699,7 @@ impl Timeline { let commit_lsn: Lsn; let term_flush_lsn: TermLsn; { - let mut shared_state = self.write_shared_state().await?; + let mut shared_state = self.write_shared_state(false).await?; rmsg = shared_state.sk.process_msg(msg).await?; // if this is AppendResponse, fill in proper hot standby feedback. @@ -693,7 +725,7 @@ impl Timeline { /// Returns true only if the timeline is loaded and active. pub async fn is_active(&self) -> bool { - match self.write_shared_state().await { + match self.write_shared_state(false).await { Ok(state) => state.active, Err(_) => false, } @@ -717,7 +749,7 @@ impl Timeline { /// Sets backup_lsn to the given value. pub async fn set_wal_backup_lsn(&self, backup_lsn: Lsn) -> Result<()> { - let mut state = self.write_shared_state().await?; + let mut state = self.write_shared_state(true).await?; state.sk.state.inmem.backup_lsn = max(state.sk.state.inmem.backup_lsn, backup_lsn); // we should check whether to shut down offloader, but this will be done // soon by peer communication anyway. @@ -872,7 +904,7 @@ impl Timeline { let horizon_segno: XLogSegNo; let remover = { - let shared_state = self.write_shared_state().await?; + let shared_state = self.write_shared_state(true).await?; horizon_segno = shared_state.get_horizon_segno(wal_backup_enabled, replication_horizon_lsn); if horizon_segno <= 1 || horizon_segno <= shared_state.last_removed_segno { @@ -897,7 +929,7 @@ impl Timeline { /// to date so that storage nodes restart doesn't cause many pageserver -> /// safekeeper reconnections. pub async fn maybe_persist_control_file(&self) -> Result<()> { - self.write_shared_state() + self.write_shared_state(true) .await? .sk .maybe_persist_inmem_control_file() diff --git a/test_runner/fixtures/safekeeper/http.py b/test_runner/fixtures/safekeeper/http.py index b9c1986818..a730c0feee 100644 --- a/test_runner/fixtures/safekeeper/http.py +++ b/test_runner/fixtures/safekeeper/http.py @@ -77,6 +77,21 @@ class SafekeeperHttpClient(requests.Session): assert res_json is None return res_json + def set_pause( + self, + tenant_id: TenantId, + timeline_id: TimelineId, + pause: bool, + ): + res = self.post( + f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/pause", + json={ + "pause": pause, + }, + ) + res.raise_for_status() + return res.json() + def debug_dump(self, params: Optional[Dict[str, str]] = None) -> Dict[str, Any]: params = params or {} res = self.get(f"http://localhost:{self.port}/v1/debug_dump", params=params) diff --git a/test_runner/regress/test_wal_acceptor_async.py b/test_runner/regress/test_wal_acceptor_async.py index dce5616ac6..6b7122dc39 100644 --- a/test_runner/regress/test_wal_acceptor_async.py +++ b/test_runner/regress/test_wal_acceptor_async.py @@ -531,6 +531,43 @@ def test_recovery_uncommitted(neon_env_builder: NeonEnvBuilder): asyncio.run(run_recovery_uncommitted(env)) +async def run_pause(env: NeonEnv): + (sk1, sk2, _) = env.safekeepers + (sk1_http, sk2_http, _) = [sk.http_client() for sk in env.safekeepers] + + tenant_id = env.initial_tenant + timeline_id = env.neon_cli.create_branch("test_pause") + + ep = env.endpoints.create_start("test_pause") + ep.safe_psql("create table t(key int, value text)") + ep.safe_psql("insert into t select generate_series(1, 100), 'payload'") + + # Halt two safekeepers, query should hang + sk1_http.set_pause(tenant_id, timeline_id, True) + sk2_http.set_pause(tenant_id, timeline_id, True) + conn = await ep.connect_async() + # query should hang, so execute in separate task + bg_query = asyncio.create_task( + conn.execute("insert into t select generate_series(1, 200), 'payload'") + ) + sleep_sec = 2 + await asyncio.sleep(sleep_sec) + # it must still be not finished + assert not bg_query.done() + + # resume timeline on one of paused safekeepers, should be enough to commit + sk2_http.set_pause(tenant_id, timeline_id, False) + ep.safe_psql("insert into t select generate_series(1, 2000), 'payload'") + + +# Test timeline pause +def test_pause(neon_env_builder: NeonEnvBuilder): + neon_env_builder.num_safekeepers = 3 + env = neon_env_builder.init_start() + + asyncio.run(run_pause(env)) + + async def run_segment_init_failure(env: NeonEnv): env.neon_cli.create_branch("test_segment_init_failure") ep = env.endpoints.create_start("test_segment_init_failure")