From e015b2bf3eb96c7a5d8b9bf0253ae13e569f5747 Mon Sep 17 00:00:00 2001 From: John Spray Date: Wed, 22 May 2024 16:10:58 +0100 Subject: [PATCH] safekeeper: use CancellationToken instead of watch channel (#7836) ## Problem Safekeeper Timeline uses a channel for cancellation, but we have a dedicated type for that. ## Summary of changes - Use CancellationToken in Timeline --- safekeeper/src/recovery.rs | 10 ++------- safekeeper/src/timeline.rs | 31 +++++++--------------------- safekeeper/src/timeline_manager.rs | 10 +-------- safekeeper/src/wal_backup_partial.rs | 14 +++---------- 4 files changed, 13 insertions(+), 52 deletions(-) diff --git a/safekeeper/src/recovery.rs b/safekeeper/src/recovery.rs index e8fa6c55f4..dfa1892c40 100644 --- a/safekeeper/src/recovery.rs +++ b/safekeeper/src/recovery.rs @@ -37,17 +37,11 @@ use crate::{ #[instrument(name = "recovery task", skip_all, fields(ttid = %tli.ttid))] pub async fn recovery_main(tli: Arc, conf: SafeKeeperConf) { info!("started"); - let mut cancellation_rx = match tli.get_cancellation_rx() { - Ok(rx) => rx, - Err(_) => { - info!("timeline canceled during task start"); - return; - } - }; + let cancel = tli.cancel.clone(); select! { _ = recovery_main_loop(tli, conf) => { unreachable!() } - _ = cancellation_rx.changed() => { + _ = cancel.cancelled() => { info!("stopped"); } } diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index da2e3f4538..89c157d514 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -6,6 +6,7 @@ use camino::Utf8PathBuf; use postgres_ffi::XLogSegNo; use serde::{Deserialize, Serialize}; use tokio::fs; +use tokio_util::sync::CancellationToken; use std::cmp::max; use std::ops::{Deref, DerefMut}; @@ -342,12 +343,8 @@ pub struct Timeline { walsenders: Arc, walreceivers: Arc, - /// Cancellation channel. Delete/cancel will send `true` here as a cancellation signal. - cancellation_tx: watch::Sender, - - /// Timeline should not be used after cancellation. Background tasks should - /// monitor this channel and stop eventually after receiving `true` from this channel. - cancellation_rx: watch::Receiver, + /// Delete/cancel will trigger this, background tasks should drop out as soon as it fires + pub(crate) cancel: CancellationToken, /// Directory where timeline state is stored. pub timeline_dir: Utf8PathBuf, @@ -376,7 +373,6 @@ impl Timeline { shared_state.sk.flush_lsn(), ))); let (shared_state_version_tx, shared_state_version_rx) = watch::channel(0); - let (cancellation_tx, cancellation_rx) = watch::channel(false); let walreceivers = WalReceivers::new(); Ok(Timeline { @@ -390,8 +386,7 @@ impl Timeline { mutex: RwLock::new(shared_state), walsenders: WalSenders::new(walreceivers.clone()), walreceivers, - cancellation_rx, - cancellation_tx, + cancel: CancellationToken::default(), timeline_dir: conf.timeline_dir(&ttid), walsenders_keep_horizon: conf.walsenders_keep_horizon, broker_active: AtomicBool::new(false), @@ -411,7 +406,6 @@ impl Timeline { let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) = watch::channel(TermLsn::from((INVALID_TERM, Lsn::INVALID))); let (shared_state_version_tx, shared_state_version_rx) = watch::channel(0); - let (cancellation_tx, cancellation_rx) = watch::channel(false); let state = TimelinePersistentState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn); @@ -428,8 +422,7 @@ impl Timeline { mutex: RwLock::new(SharedState::create_new(conf, &ttid, state)?), walsenders: WalSenders::new(walreceivers.clone()), walreceivers, - cancellation_rx, - cancellation_tx, + cancel: CancellationToken::default(), timeline_dir: conf.timeline_dir(&ttid), walsenders_keep_horizon: conf.walsenders_keep_horizon, broker_active: AtomicBool::new(false), @@ -535,7 +528,7 @@ impl Timeline { /// eventually after receiving cancellation signal. fn cancel(&self, shared_state: &mut WriteGuardSharedState<'_>) { info!("timeline {} is cancelled", self.ttid); - let _ = self.cancellation_tx.send(true); + self.cancel.cancel(); // Close associated FDs. Nobody will be able to touch timeline data once // it is cancelled, so WAL storage won't be opened again. shared_state.sk.wal_store.close(); @@ -543,17 +536,7 @@ impl Timeline { /// Returns if timeline is cancelled. pub fn is_cancelled(&self) -> bool { - *self.cancellation_rx.borrow() - } - - /// Returns watch channel which gets value when timeline is cancelled. It is - /// guaranteed to have not cancelled value observed (errors otherwise). - pub fn get_cancellation_rx(&self) -> Result> { - let rx = self.cancellation_rx.clone(); - if *rx.borrow() { - bail!(TimelineError::Cancelled(self.ttid)); - } - Ok(rx) + self.cancel.is_cancelled() } /// Take a writing mutual exclusive lock on timeline shared_state. diff --git a/safekeeper/src/timeline_manager.rs b/safekeeper/src/timeline_manager.rs index 52ad915065..e74ba37ad8 100644 --- a/safekeeper/src/timeline_manager.rs +++ b/safekeeper/src/timeline_manager.rs @@ -47,14 +47,6 @@ pub async fn main_task( conf: SafeKeeperConf, broker_active_set: Arc, ) { - let mut cancellation_rx = match tli.get_cancellation_rx() { - Ok(rx) => rx, - Err(_) => { - info!("timeline canceled during task start"); - return; - } - }; - scopeguard::defer! { if tli.is_cancelled() { info!("manager task finished"); @@ -129,7 +121,7 @@ pub async fn main_task( // wait until something changes. tx channels are stored under Arc, so they will not be // dropped until the manager task is finished. tokio::select! { - _ = cancellation_rx.changed() => { + _ = tli.cancel.cancelled() => { // timeline was deleted break 'outer state_snapshot; } diff --git a/safekeeper/src/wal_backup_partial.rs b/safekeeper/src/wal_backup_partial.rs index 200096ac5c..29e944bff3 100644 --- a/safekeeper/src/wal_backup_partial.rs +++ b/safekeeper/src/wal_backup_partial.rs @@ -277,14 +277,6 @@ pub async fn main_task(tli: Arc, conf: SafeKeeperConf) { debug!("started"); let await_duration = conf.partial_backup_timeout; - let mut cancellation_rx = match tli.get_cancellation_rx() { - Ok(rx) => rx, - Err(_) => { - info!("timeline canceled during task start"); - return; - } - }; - // sleep for random time to avoid thundering herd { let randf64 = rand::thread_rng().gen_range(0.0..1.0); @@ -327,7 +319,7 @@ pub async fn main_task(tli: Arc, conf: SafeKeeperConf) { && flush_lsn_rx.borrow().term == seg.term { tokio::select! { - _ = cancellation_rx.changed() => { + _ = backup.tli.cancel.cancelled() => { info!("timeline canceled"); return; } @@ -340,7 +332,7 @@ pub async fn main_task(tli: Arc, conf: SafeKeeperConf) { // if we don't have any data and zero LSNs, wait for something while flush_lsn_rx.borrow().lsn == Lsn(0) { tokio::select! { - _ = cancellation_rx.changed() => { + _ = backup.tli.cancel.cancelled() => { info!("timeline canceled"); return; } @@ -357,7 +349,7 @@ pub async fn main_task(tli: Arc, conf: SafeKeeperConf) { // waiting until timeout expires OR segno changes 'inner: loop { tokio::select! { - _ = cancellation_rx.changed() => { + _ = backup.tli.cancel.cancelled() => { info!("timeline canceled"); return; }