diff --git a/libs/postgres_backend/src/lib.rs b/libs/postgres_backend/src/lib.rs index 9075a019b4..8c024375c1 100644 --- a/libs/postgres_backend/src/lib.rs +++ b/libs/postgres_backend/src/lib.rs @@ -834,7 +834,7 @@ impl PostgresBackend { use CopyStreamHandlerEnd::*; let expected_end = match &end { - ServerInitiated(_) | CopyDone | CopyFail | Terminate | EOF => true, + ServerInitiated(_) | CopyDone | CopyFail | Terminate | EOF | Cancelled => true, CopyStreamHandlerEnd::Disconnected(ConnectionError::Io(io_error)) if is_expected_io_error(io_error) => { @@ -874,6 +874,9 @@ impl PostgresBackend { // message from server' when it receives ErrorResponse (anything but // CopyData/CopyDone) back. CopyFail => Some((end.to_string(), SQLSTATE_SUCCESSFUL_COMPLETION)), + + // When cancelled, send no response: we must not risk blocking on sending that response + Cancelled => None, _ => None, }; if let Some((err, errcode)) = err_to_send_and_errcode { @@ -1051,6 +1054,8 @@ pub enum CopyStreamHandlerEnd { /// The connection was lost #[error("connection error: {0}")] Disconnected(#[from] ConnectionError), + #[error("Shutdown")] + Cancelled, /// Some other error #[error(transparent)] Other(#[from] anyhow::Error), diff --git a/safekeeper/src/receive_wal.rs b/safekeeper/src/receive_wal.rs index 2edcc4ef6f..bfa1764abf 100644 --- a/safekeeper/src/receive_wal.rs +++ b/safekeeper/src/receive_wal.rs @@ -239,6 +239,10 @@ impl SafekeeperPostgresHandler { pgb: &mut PostgresBackend, tli: &mut Option, ) -> Result<(), CopyStreamHandlerEnd> { + // The `tli` parameter is only used for passing _out_ a timeline, one should + // not have been passed in. + assert!(tli.is_none()); + // Notify the libpq client that it's allowed to send `CopyData` messages pgb.write_message(&BeMessage::CopyBothResponse).await?; @@ -256,6 +260,7 @@ impl SafekeeperPostgresHandler { // sends, so this avoids deadlocks. let mut pgb_reader = pgb.split().context("START_WAL_PUSH split")?; let peer_addr = *pgb.get_peer_addr(); + let mut network_reader = NetworkReader { ttid: self.ttid, conn_id: self.conn_id, @@ -275,10 +280,14 @@ impl SafekeeperPostgresHandler { .subscribe(); *tli = Some(timeline.wal_residence_guard().await?); + let timeline_cancel = timeline.cancel.clone(); tokio::select! { // todo: add read|write .context to these errors r = network_reader.run(msg_tx, msg_rx, reply_tx, timeline, next_msg) => r, r = network_write(pgb, reply_rx, pageserver_feedback_rx) => r, + _ = timeline_cancel.cancelled() => { + return Err(CopyStreamHandlerEnd::Cancelled); + } } } else { res.map(|_| ()) @@ -303,7 +312,7 @@ impl SafekeeperPostgresHandler { // Otherwise, WalAcceptor thread must have errored. match wal_acceptor_res { - Ok(Ok(_)) => Ok(()), // can't happen currently; would be if we add graceful termination + Ok(Ok(_)) => Ok(()), // Clean shutdown Ok(Err(e)) => Err(CopyStreamHandlerEnd::Other(e.context("WAL acceptor"))), Err(_) => Err(CopyStreamHandlerEnd::Other(anyhow!( "WalAcceptor task panicked", @@ -356,6 +365,7 @@ impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> { Ok((tli, next_msg)) } + /// This function is cancellation-safe (only does network I/O and channel read/writes). async fn run( self, msg_tx: Sender, @@ -397,6 +407,7 @@ async fn read_network_loop( loop { let started = Instant::now(); let size = next_msg.size(); + match msg_tx.send_timeout(next_msg, SLOW_THRESHOLD).await { Ok(()) => {} // Slow send, log a message and keep trying. Log context has timeline ID. @@ -428,6 +439,8 @@ async fn read_network_loop( /// Read replies from WalAcceptor and pass them back to socket. Returns Ok(()) /// if reply_rx closed; it must mean WalAcceptor terminated, joining it should /// tell the error. +/// +/// This function is cancellation-safe (only does network I/O and channel read/writes). async fn network_write( pgb_writer: &mut PostgresBackend, mut reply_rx: Receiver, @@ -461,7 +474,7 @@ async fn network_write( Some(AcceptorProposerMessage::AppendResponse(append_response)) } _ => None, - } + }, }; let Some(msg) = msg else { @@ -527,6 +540,10 @@ impl WalAcceptor { /// The main loop. Returns Ok(()) if either msg_rx or reply_tx got closed; /// it must mean that network thread terminated. + /// + /// This function is *not* cancellation safe, it does local disk I/O: it should always + /// be allowed to run to completion. It respects Timeline::cancel and shuts down cleanly + /// when that gets triggered. async fn run(&mut self) -> anyhow::Result<()> { let walreceiver_guard = self.tli.get_walreceivers().register(self.conn_id); @@ -541,7 +558,7 @@ impl WalAcceptor { // Tracks whether we have unflushed appends. let mut dirty = false; - loop { + while !self.tli.is_cancelled() { let reply = tokio::select! { // Process inbound message. msg = self.msg_rx.recv() => { @@ -599,6 +616,10 @@ impl WalAcceptor { WAL_RECEIVER_QUEUE_DEPTH.observe(self.msg_rx.len() as f64); None // no reply } + + _ = self.tli.cancel.cancelled() => { + break; + } }; // Send reply, if any. @@ -610,7 +631,7 @@ impl WalAcceptor { } // Flush WAL on disconnect, see https://github.com/neondatabase/neon/issues/9259. - if dirty { + if dirty && !self.tli.cancel.is_cancelled() { self.tli .process_msg(&ProposerAcceptorMessage::FlushWAL) .await?; diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index 6d94ff98b1..aa65ec851b 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -456,6 +456,8 @@ impl SafekeeperPostgresHandler { // not synchronized with sends, so this avoids deadlocks. let reader = pgb.split().context("START_REPLICATION split")?; + let tli_cancel = tli.cancel.clone(); + let mut sender = WalSender { pgb, // should succeed since we're already holding another guard @@ -479,6 +481,9 @@ impl SafekeeperPostgresHandler { // todo: add read|write .context to these errors r = sender.run() => r, r = reply_reader.run() => r, + _ = tli_cancel.cancelled() => { + return Err(CopyStreamHandlerEnd::Cancelled); + } }; let ws_state = ws_guard @@ -557,6 +562,7 @@ impl WalSender<'_, IO> { /// Send WAL until /// - an error occurs /// - receiver is caughtup and there is no computes (if streaming up to commit_lsn) + /// - timeline's cancellation token fires /// /// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ? /// convenience. @@ -601,15 +607,14 @@ impl WalSender<'_, IO> { }; let send_buf = &send_buf[..send_size]; - // and send it - self.pgb - .write_message(&BeMessage::XLogData(XLogDataBody { - wal_start: self.start_pos.0, - wal_end: self.end_pos.0, - timestamp: get_current_timestamp(), - data: send_buf, - })) - .await?; + // and send it, while respecting Timeline::cancel + let msg = BeMessage::XLogData(XLogDataBody { + wal_start: self.start_pos.0, + wal_end: self.end_pos.0, + timestamp: get_current_timestamp(), + data: send_buf, + }); + self.pgb.write_message(&msg).await?; if let Some(appname) = &self.appname { if appname == "replica" { @@ -674,13 +679,13 @@ impl WalSender<'_, IO> { } } - self.pgb - .write_message(&BeMessage::KeepAlive(WalSndKeepAlive { - wal_end: self.end_pos.0, - timestamp: get_current_timestamp(), - request_reply: true, - })) - .await?; + let msg = BeMessage::KeepAlive(WalSndKeepAlive { + wal_end: self.end_pos.0, + timestamp: get_current_timestamp(), + request_reply: true, + }); + + self.pgb.write_message(&msg).await?; } } diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 85add6bfea..ef928f7633 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -9,6 +9,7 @@ use serde::{Deserialize, Serialize}; use tokio::fs::{self}; use tokio_util::sync::CancellationToken; use utils::id::TenantId; +use utils::sync::gate::Gate; use std::cmp::max; use std::ops::{Deref, DerefMut}; @@ -467,6 +468,10 @@ pub struct Timeline { timeline_dir: Utf8PathBuf, manager_ctl: ManagerCtl, + /// Hold this gate from code that depends on the Timeline's non-shut-down state. While holding + /// this gate, you must respect [`Timeline::cancel`] + pub(crate) gate: Gate, + /// Delete/cancel will trigger this, background tasks should drop out as soon as it fires pub(crate) cancel: CancellationToken, @@ -508,6 +513,7 @@ impl Timeline { mutex: RwLock::new(shared_state), walsenders: WalSenders::new(walreceivers.clone()), walreceivers, + gate: Default::default(), cancel: CancellationToken::default(), manager_ctl: ManagerCtl::new(), broker_active: AtomicBool::new(false), @@ -533,56 +539,6 @@ impl Timeline { )) } - /// Initialize fresh timeline on disk and start background tasks. If init - /// fails, timeline is cancelled and cannot be used anymore. - /// - /// Init is transactional, so if it fails, created files will be deleted, - /// and state on disk should remain unchanged. - pub async fn init_new( - self: &Arc, - shared_state: &mut WriteGuardSharedState<'_>, - conf: &SafeKeeperConf, - broker_active_set: Arc, - partial_backup_rate_limiter: RateLimiter, - ) -> Result<()> { - match fs::metadata(&self.timeline_dir).await { - Ok(_) => { - // Timeline directory exists on disk, we should leave state unchanged - // and return error. - bail!(TimelineError::Invalid(self.ttid)); - } - Err(e) if e.kind() == std::io::ErrorKind::NotFound => {} - Err(e) => { - return Err(e.into()); - } - } - - // Create timeline directory. - fs::create_dir_all(&self.timeline_dir).await?; - - // Write timeline to disk and start background tasks. - if let Err(e) = shared_state.sk.state_mut().flush().await { - // Bootstrap failed, cancel timeline and remove timeline directory. - self.cancel(shared_state); - - if let Err(fs_err) = fs::remove_dir_all(&self.timeline_dir).await { - warn!( - "failed to remove timeline {} directory after bootstrap failure: {}", - self.ttid, fs_err - ); - } - - return Err(e); - } - self.bootstrap( - shared_state, - conf, - broker_active_set, - partial_backup_rate_limiter, - ); - Ok(()) - } - /// Bootstrap new or existing timeline starting background tasks. pub fn bootstrap( self: &Arc, @@ -593,33 +549,61 @@ impl Timeline { ) { let (tx, rx) = self.manager_ctl.bootstrap_manager(); + let Ok(gate_guard) = self.gate.enter() else { + // Init raced with shutdown + return; + }; + // Start manager task which will monitor timeline state and update // background tasks. - tokio::spawn(timeline_manager::main_task( - ManagerTimeline { tli: self.clone() }, - conf.clone(), - broker_active_set, - tx, - rx, - partial_backup_rate_limiter, - )); + tokio::spawn({ + let this = self.clone(); + let conf = conf.clone(); + async move { + let _gate_guard = gate_guard; + timeline_manager::main_task( + ManagerTimeline { tli: this }, + conf, + broker_active_set, + tx, + rx, + partial_backup_rate_limiter, + ) + .await + } + }); + } + + /// Background timeline activities (which hold Timeline::gate) will no + /// longer run once this function completes. + pub async fn shutdown(&self) { + info!("timeline {} shutting down", self.ttid); + self.cancel.cancel(); + + // Wait for any concurrent tasks to stop using this timeline, to avoid e.g. attempts + // to read deleted files. + self.gate.close().await; } /// Delete timeline from disk completely, by removing timeline directory. - /// Background timeline activities will stop eventually. /// /// Also deletes WAL in s3. Might fail if e.g. s3 is unavailable, but /// deletion API endpoint is retriable. + /// + /// Timeline must be in shut-down state (i.e. call [`Self::shutdown`] first) pub async fn delete( &self, shared_state: &mut WriteGuardSharedState<'_>, only_local: bool, ) -> Result { - self.cancel(shared_state); + // Assert that [`Self::shutdown`] was already called + assert!(self.cancel.is_cancelled()); + assert!(self.gate.close_complete()); + + // Close associated FDs. Nobody will be able to touch timeline data once + // it is cancelled, so WAL storage won't be opened again. + shared_state.sk.close_wal_store(); - // TODO: It's better to wait for s3 offloader termination before - // removing data from s3. Though since s3 doesn't have transactions it - // still wouldn't guarantee absense of data after removal. let conf = GlobalTimelines::get_global_config(); if !only_local && conf.is_wal_backup_enabled() { // Note: we concurrently delete remote storage data from multiple @@ -631,16 +615,6 @@ impl Timeline { Ok(dir_existed) } - /// Cancel timeline to prevent further usage. Background tasks will stop - /// eventually after receiving cancellation signal. - fn cancel(&self, shared_state: &mut WriteGuardSharedState<'_>) { - info!("timeline {} is cancelled", self.ttid); - self.cancel.cancel(); - // Close associated FDs. Nobody will be able to touch timeline data once - // it is cancelled, so WAL storage won't be opened again. - shared_state.sk.close_wal_store(); - } - /// Returns if timeline is cancelled. pub fn is_cancelled(&self) -> bool { self.cancel.is_cancelled() diff --git a/safekeeper/src/timeline_guard.rs b/safekeeper/src/timeline_guard.rs index 1ddac573d2..9102a40df8 100644 --- a/safekeeper/src/timeline_guard.rs +++ b/safekeeper/src/timeline_guard.rs @@ -7,6 +7,7 @@ use std::collections::HashSet; use tracing::debug; +use utils::sync::gate::GateGuard; use crate::timeline_manager::ManagerCtlMessage; @@ -16,6 +17,12 @@ pub struct GuardId(u64); pub struct ResidenceGuard { manager_tx: tokio::sync::mpsc::UnboundedSender, guard_id: GuardId, + + /// [`ResidenceGuard`] represents a guarantee that a timeline's data remains resident, + /// which by extension also means the timeline is not shut down (since after shut down + /// our data may be deleted). Therefore everyone holding a residence guard must also + /// hold a guard on [`crate::timeline::Timeline::gate`] + _gate_guard: GateGuard, } impl Drop for ResidenceGuard { @@ -52,7 +59,8 @@ impl AccessService { self.guards.is_empty() } - pub(crate) fn create_guard(&mut self) -> ResidenceGuard { + /// `timeline_gate_guard` is a guarantee that the timeline is not shut down + pub(crate) fn create_guard(&mut self, timeline_gate_guard: GateGuard) -> ResidenceGuard { let guard_id = self.next_guard_id; self.next_guard_id += 1; self.guards.insert(guard_id); @@ -63,6 +71,7 @@ impl AccessService { ResidenceGuard { manager_tx: self.manager_tx.clone(), guard_id, + _gate_guard: timeline_gate_guard, } } diff --git a/safekeeper/src/timeline_manager.rs b/safekeeper/src/timeline_manager.rs index e9fed21bf5..c02fb904cf 100644 --- a/safekeeper/src/timeline_manager.rs +++ b/safekeeper/src/timeline_manager.rs @@ -266,8 +266,10 @@ pub async fn main_task( // Start recovery task which always runs on the timeline. if !mgr.is_offloaded && mgr.conf.peer_recovery_enabled { - let tli = mgr.wal_resident_timeline(); - mgr.recovery_task = Some(tokio::spawn(recovery_main(tli, mgr.conf.clone()))); + // Recovery task is only spawned if we can get a residence guard (i.e. timeline is not already shutting down) + if let Ok(tli) = mgr.wal_resident_timeline() { + mgr.recovery_task = Some(tokio::spawn(recovery_main(tli, mgr.conf.clone()))); + } } // If timeline is evicted, reflect that in the metric. @@ -375,6 +377,13 @@ pub async fn main_task( // shutdown background tasks if mgr.conf.is_wal_backup_enabled() { + if let Some(backup_task) = mgr.backup_task.take() { + // If we fell through here, then the timeline is shutting down. This is important + // because otherwise joining on the wal_backup handle might hang. + assert!(mgr.tli.cancel.is_cancelled()); + + backup_task.join().await; + } wal_backup::update_task(&mut mgr, false, &last_state).await; } @@ -442,10 +451,18 @@ impl Manager { /// Get a WalResidentTimeline. /// Manager code must use this function instead of one from `Timeline` /// directly, because it will deadlock. - pub(crate) fn wal_resident_timeline(&mut self) -> WalResidentTimeline { + /// + /// This function is fallible because the guard may not be created if the timeline is + /// shutting down. + pub(crate) fn wal_resident_timeline(&mut self) -> anyhow::Result { assert!(!self.is_offloaded); - let guard = self.access_service.create_guard(); - WalResidentTimeline::new(self.tli.clone(), guard) + let guard = self.access_service.create_guard( + self.tli + .gate + .enter() + .map_err(|_| anyhow::anyhow!("Timeline shutting down"))?, + ); + Ok(WalResidentTimeline::new(self.tli.clone(), guard)) } /// Get a snapshot of the timeline state. @@ -559,6 +576,11 @@ impl Manager { if removal_horizon_segno > self.last_removed_segno { // we need to remove WAL + let Ok(timeline_gate_guard) = self.tli.gate.enter() else { + tracing::info!("Timeline shutdown, not spawning WAL removal task"); + return; + }; + let remover = match self.tli.read_shared_state().await.sk { StateSK::Loaded(ref sk) => { crate::wal_storage::Storage::remove_up_to(&sk.wal_store, removal_horizon_segno) @@ -573,6 +595,8 @@ impl Manager { self.wal_removal_task = Some(tokio::spawn( async move { + let _timeline_gate_guard = timeline_gate_guard; + remover.await?; Ok(removal_horizon_segno) } @@ -619,10 +643,15 @@ impl Manager { return; } + let Ok(resident) = self.wal_resident_timeline() else { + // Shutting down + return; + }; + // Get WalResidentTimeline and start partial backup task. let cancel = CancellationToken::new(); let handle = tokio::spawn(wal_backup_partial::main_task( - self.wal_resident_timeline(), + resident, self.conf.clone(), self.global_rate_limiter.clone(), cancel.clone(), @@ -664,7 +693,7 @@ impl Manager { self.partial_backup_task = None; } - let tli = self.wal_resident_timeline(); + let tli = self.wal_resident_timeline()?; let mut partial_backup = PartialBackup::new(tli, self.conf.clone()).await; // Reset might fail e.g. when cfile is already reset but s3 removal // failed, so set manager state to None beforehand. In any case caller @@ -688,7 +717,12 @@ impl Manager { let guard = if self.is_offloaded { Err(anyhow::anyhow!("timeline is offloaded, can't get a guard")) } else { - Ok(self.access_service.create_guard()) + match self.tli.gate.enter() { + Ok(gate_guard) => Ok(self.access_service.create_guard(gate_guard)), + Err(_) => Err(anyhow::anyhow!( + "timeline is shutting down, can't get a guard" + )), + } }; if tx.send(guard).is_err() { @@ -699,7 +733,10 @@ impl Manager { let result = if self.is_offloaded { None } else { - Some(self.access_service.create_guard()) + match self.tli.gate.enter() { + Ok(gate_guard) => Some(self.access_service.create_guard(gate_guard)), + Err(_) => None, + } }; if tx.send(result).is_err() { diff --git a/safekeeper/src/timelines_global_map.rs b/safekeeper/src/timelines_global_map.rs index 33d94da034..067945fd5f 100644 --- a/safekeeper/src/timelines_global_map.rs +++ b/safekeeper/src/timelines_global_map.rs @@ -457,10 +457,12 @@ impl GlobalTimelines { Ok(timeline) => { let was_active = timeline.broker_active.load(Ordering::Relaxed); + info!("deleting timeline {}, only_local={}", ttid, only_local); + timeline.shutdown().await; + // Take a lock and finish the deletion holding this mutex. let mut shared_state = timeline.write_shared_state().await; - info!("deleting timeline {}, only_local={}", ttid, only_local); let dir_existed = timeline.delete(&mut shared_state, only_local).await?; Ok(TimelineDeleteForceResult { diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs index 6c87e5a926..34b5dbeaa1 100644 --- a/safekeeper/src/wal_backup.rs +++ b/safekeeper/src/wal_backup.rs @@ -25,7 +25,6 @@ use tokio::fs::File; use tokio::select; use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::sync::{watch, OnceCell}; -use tokio::time::sleep; use tracing::*; use utils::{id::TenantTimelineId, lsn::Lsn}; @@ -46,6 +45,14 @@ pub struct WalBackupTaskHandle { handle: JoinHandle<()>, } +impl WalBackupTaskHandle { + pub(crate) async fn join(self) { + if let Err(e) = self.handle.await { + error!("WAL backup task panicked: {}", e); + } + } +} + /// Do we have anything to upload to S3, i.e. should safekeepers run backup activity? pub(crate) fn is_wal_backup_required( wal_seg_size: usize, @@ -74,11 +81,12 @@ pub(crate) async fn update_task(mgr: &mut Manager, need_backup: bool, state: &St let (shutdown_tx, shutdown_rx) = mpsc::channel(1); - let async_task = backup_task_main( - mgr.wal_resident_timeline(), - mgr.conf.backup_parallel_jobs, - shutdown_rx, - ); + let Ok(resident) = mgr.wal_resident_timeline() else { + info!("Timeline shut down"); + return; + }; + + let async_task = backup_task_main(resident, mgr.conf.backup_parallel_jobs, shutdown_rx); let handle = if mgr.conf.current_thread_runtime { tokio::spawn(async_task) @@ -108,9 +116,7 @@ async fn shut_down_task(entry: &mut Option) { // Tell the task to shutdown. Error means task exited earlier, that's ok. let _ = wb_handle.shutdown_tx.send(()).await; // Await the task itself. TODO: restart panicked tasks earlier. - if let Err(e) = wb_handle.handle.await { - warn!("WAL backup task panicked: {}", e); - } + wb_handle.join().await; } } @@ -214,6 +220,7 @@ async fn backup_task_main( let _guard = WAL_BACKUP_TASKS.guard(); info!("started"); + let cancel = tli.tli.cancel.clone(); let mut wb = WalBackupTask { wal_seg_size: tli.get_wal_seg_size().await, commit_lsn_watch_rx: tli.get_commit_lsn_watch_rx(), @@ -230,25 +237,34 @@ async fn backup_task_main( _ = wb.run() => {} _ = shutdown_rx.recv() => { canceled = true; + }, + _ = cancel.cancelled() => { + canceled = true; } } info!("task {}", if canceled { "canceled" } else { "terminated" }); } impl WalBackupTask { + /// This function must be called from a select! that also respects self.timeline's + /// cancellation token. This is done in [`backup_task_main`]. + /// + /// The future returned by this function is safe to drop at any time because it + /// does not write to local disk. async fn run(&mut self) { let mut backup_lsn = Lsn(0); let mut retry_attempt = 0u32; // offload loop - loop { + while !self.timeline.cancel.is_cancelled() { if retry_attempt == 0 { // wait for new WAL to arrive if let Err(e) = self.commit_lsn_watch_rx.changed().await { - // should never happen, as we hold Arc to timeline. + // should never happen, as we hold Arc to timeline and transmitter's lifetime + // is within Timeline's error!("commit_lsn watch shut down: {:?}", e); return; - } + }; } else { // or just sleep if we errored previously let mut retry_delay = UPLOAD_FAILURE_RETRY_MAX_MS; @@ -256,7 +272,7 @@ impl WalBackupTask { { retry_delay = min(retry_delay, backoff_delay); } - sleep(Duration::from_millis(retry_delay)).await; + tokio::time::sleep(Duration::from_millis(retry_delay)).await; } let commit_lsn = *self.commit_lsn_watch_rx.borrow(); diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py index 0676b3dd9a..6eaaa3c37f 100644 --- a/test_runner/regress/test_wal_acceptor.py +++ b/test_runner/regress/test_wal_acceptor.py @@ -1784,6 +1784,89 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool): cur.execute("INSERT INTO t (key) VALUES (123)") +def test_delete_timeline_under_load(neon_env_builder: NeonEnvBuilder): + """ + Test deleting timelines on a safekeeper while they're under load. + + This should not happen under normal operation, but it can happen if + there is some rogue compute/pageserver that is writing/reading to a + safekeeper that we're migrating a timeline away from, or if the timeline + is being deleted while such a rogue client is running. + """ + neon_env_builder.auth_enabled = True + env = neon_env_builder.init_start() + + # Create two endpoints that will generate load + timeline_id_a = env.create_branch("deleteme_a") + timeline_id_b = env.create_branch("deleteme_b") + + endpoint_a = env.endpoints.create("deleteme_a") + endpoint_a.start() + endpoint_b = env.endpoints.create("deleteme_b") + endpoint_b.start() + + # Get tenant and timeline IDs + tenant_id = env.initial_tenant + + # Start generating load on both timelines + def generate_load(endpoint: Endpoint): + with closing(endpoint.connect()) as conn: + with conn.cursor() as cur: + cur.execute("CREATE TABLE IF NOT EXISTS t(key int, value text)") + while True: + try: + cur.execute("INSERT INTO t SELECT generate_series(1,1000), 'data'") + except: # noqa + # Ignore errors since timeline may be deleted + break + + t_a = threading.Thread(target=generate_load, args=(endpoint_a,)) + t_b = threading.Thread(target=generate_load, args=(endpoint_b,)) + try: + t_a.start() + t_b.start() + + # Let the load run for a bit + log.info("Warming up...") + time.sleep(2) + + # Safekeeper errors will propagate to the pageserver: it is correct that these are + # logged at error severity because they indicate the pageserver is trying to read + # a timeline that it shouldn't. + env.pageserver.allowed_errors.extend( + [ + ".*Timeline.*was cancelled.*", + ".*Timeline.*was not found.*", + ] + ) + + # Try deleting timelines while under load + sk = env.safekeepers[0] + sk_http = sk.http_client(auth_token=env.auth_keys.generate_tenant_token(tenant_id)) + + # Delete first timeline + log.info(f"Deleting {timeline_id_a}...") + assert sk_http.timeline_delete(tenant_id, timeline_id_a, only_local=True)["dir_existed"] + + # Delete second timeline + log.info(f"Deleting {timeline_id_b}...") + assert sk_http.timeline_delete(tenant_id, timeline_id_b, only_local=True)["dir_existed"] + + # Verify timelines are gone from disk + sk_data_dir = sk.data_dir + assert not (sk_data_dir / str(tenant_id) / str(timeline_id_a)).exists() + # assert not (sk_data_dir / str(tenant_id) / str(timeline_id_b)).exists() + + finally: + log.info("Stopping endpoints...") + # Stop endpoints with immediate mode because we deleted the timeline out from under the compute, which may cause it to hang + endpoint_a.stop(mode="immediate") + endpoint_b.stop(mode="immediate") + log.info("Joining threads...") + t_a.join() + t_b.join() + + # Basic pull_timeline test. # When live_sk_change is False, compute is restarted to change set of # safekeepers; otherwise it is live reload.