From d96a8d60cff4e9667bd67c8d88a86cd6b2abcba6 Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Mon, 22 May 2023 18:26:48 +0200
Subject: [PATCH] tenant loops: refactor wait-for-active and cancellation

`wait_for_active_tenant` is mis-named: its purpose is not to wait for the
tenant to become active, but to prevent new loop iterations while the tenant
is not active.

However, we never allow a tenant to transition from `!Active` back to the
`Active` state. So, the "while not active" aspect is moot.

Further, we know that we spawn the background loops from `Tenant::activate`,
right after we have made the tenant `Active`. So, we will never actually wait
for the tenant to become active.

The only condition under which the tenant can be observed `!Active` is when we
are shutting down, i.e., transitioning the tenant to `Stopping`. The loops
should exit when that happens, but `wait_for_active_tenant` doesn't handle
that case; the individual loops use `task_mgr::shutdown_token()` for it.

This patch simplifies the code by

1. removing `wait_for_active_tenant`, which we have shown above to be quite
   useless, and
2. making cancellation of the loops a concern of `Tenant::set_stopping` /
   `Tenant::set_broken`.

This in turn allows us to remove `Tenant::subscribe_for_state_updates`, which
is great because now `Tenant::state` is only watched through well-defined APIs
like `Tenant::wait_to_become_active`.

The context for this PR is me trying to find an alternative to
https://github.com/neondatabase/neon/pull/4291, which is part of
https://github.com/orgs/neondatabase/projects/38 (async
get_value_reconstruct_data). I don't know if this leads to a true alternative
to 4291, but it's a useful cleanup by itself.
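
For illustration, a minimal, self-contained sketch of the pattern this patch
adopts (illustrative only: `Worker`, `background_loop`, and the 2-second period
are made-up names for this example, not the pageserver code):

    use std::sync::Mutex;
    use std::time::Duration;

    use tokio_util::sync::CancellationToken;

    struct Worker {
        // `None` until activation; taken (and cancelled) on shutdown.
        background_loops_cancel: Mutex<Option<CancellationToken>>,
    }

    impl Worker {
        fn new() -> Self {
            Worker {
                background_loops_cancel: Mutex::new(None),
            }
        }

        // Called once, when the worker becomes active: create the token,
        // remember it, and hand a clone to the background loop.
        fn activate(&self) {
            let cancel = CancellationToken::new();
            let existing = self
                .background_loops_cancel
                .lock()
                .unwrap()
                .replace(cancel.clone());
            assert!(existing.is_none(), "we don't support re-activation");
            tokio::spawn(background_loop(cancel));
        }

        // Called on the transition to a terminal state: cancel the loop,
        // if it was ever started.
        fn shut_down(&self) {
            if let Some(cancel) = self.background_loops_cancel.lock().unwrap().take() {
                cancel.cancel();
            }
        }
    }

    // The loop observes cancellation between iterations by racing the token
    // against its sleep.
    async fn background_loop(cancel: CancellationToken) {
        loop {
            // ... one iteration of work (compaction, GC, ...) would go here ...

            tokio::select! {
                _ = cancel.cancelled() => {
                    // The owner cancelled us; exit the loop.
                    return;
                }
                _ = tokio::time::sleep(Duration::from_secs(2)) => {}
            }
        }
    }

    #[tokio::main]
    async fn main() {
        let worker = Worker::new();
        worker.activate();
        tokio::time::sleep(Duration::from_millis(100)).await;
        worker.shut_down();
        // Give the loop a moment to observe the cancellation and exit.
        tokio::time::sleep(Duration::from_millis(100)).await;
    }

The important property is that the token is created exactly once on activation
and cancelled exactly once on the transition out of `Active`, so the loops
never have to watch the tenant state themselves.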
---
 pageserver/src/tenant.rs       | 29 ++++++++++----
 pageserver/src/tenant/tasks.rs | 69 ++++------------------------------
 2 files changed, 30 insertions(+), 68 deletions(-)

diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index 7348503791..4b7c5388e0 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -18,6 +18,7 @@
 use remote_storage::DownloadError;
 use remote_storage::GenericRemoteStorage;
 use tokio::sync::watch;
 use tokio::task::JoinSet;
+use tokio_util::sync::CancellationToken;
 use tracing::*;
 use utils::crashsafe::path_with_suffix_extension;
@@ -154,6 +155,8 @@ pub struct Tenant {
     cached_synthetic_tenant_size: Arc<AtomicU64>,
 
     eviction_task_tenant_state: tokio::sync::Mutex<EvictionTaskTenantState>,
+
+    background_loops_cancel: Mutex<Option<CancellationToken>>,
 }
 
 /// A timeline with some of its files on disk, being initialized.
@@ -1620,8 +1623,15 @@ impl Tenant {
             .filter(|timeline| timeline.current_state() != TimelineState::Broken);
 
         // Spawn gc and compaction loops. The loops will shut themselves
-        // down when they notice that the tenant is inactive.
-        tasks::start_background_loops(self);
+        // down in response to the cancellation token getting dropped.
+        let background_loops_cancel = CancellationToken::new();
+        let existing = self
+            .background_loops_cancel
+            .lock()
+            .unwrap()
+            .replace(background_loops_cancel.clone());
+        assert!(existing.is_none(), "we don't support re-activation");
+        tasks::start_background_loops(self, background_loops_cancel.clone());
 
         let mut activated_timelines = 0;
         let mut timelines_broken_during_activation = 0;
@@ -1677,6 +1687,10 @@ impl Tenant {
             TenantState::Active | TenantState::Loading | TenantState::Attaching => {
                 *current_state = TenantState::Stopping;
+                if let Some(cancel) = self.background_loops_cancel.lock().unwrap().take() {
+                    cancel.cancel();
+                }
+
                 // FIXME: If the tenant is still Loading or Attaching, new timelines
                 // might be created after this. That's harmless, as the Timelines
                 // won't be accessible to anyone, when the Tenant is in Stopping
@@ -1711,6 +1725,10 @@ impl Tenant {
                 // we can, but it shouldn't happen.
                 warn!("Changing Active tenant to Broken state, reason: {}", reason);
                 *current_state = TenantState::broken_from_reason(reason);
+
+                if let Some(cancel) = self.background_loops_cancel.lock().unwrap().take() {
+                    cancel.cancel();
+                }
             }
             TenantState::Broken { .. } => {
                 // This shouldn't happen either
@@ -1732,11 +1750,7 @@ impl Tenant {
         });
     }
 
-    pub fn subscribe_for_state_updates(&self) -> watch::Receiver<TenantState> {
-        self.state.subscribe()
-    }
-
-    pub async fn wait_to_become_active(&self) -> anyhow::Result<()> {
+    pub async fn wait_to_become_active(&self) -> Result<(), WaitToBecomeActiveError> {
         let mut receiver = self.state.subscribe();
         loop {
             let current_state = receiver.borrow_and_update().clone();
@@ -1982,6 +1996,7 @@ impl Tenant {
             cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()),
             cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
             eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()),
+            background_loops_cancel: Mutex::new(None),
         }
     }
diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs
index b3c8a4a3bb..273aaa492c 100644
--- a/pageserver/src/tenant/tasks.rs
+++ b/pageserver/src/tenant/tasks.rs
@@ -1,7 +1,6 @@
 //! This module contains functions to serve per-tenant background processes,
 //! such as compaction and GC
 
-use std::ops::ControlFlow;
 use std::sync::Arc;
 use std::time::{Duration, Instant};
 
@@ -9,11 +8,11 @@ use crate::context::{DownloadBehavior, RequestContext};
 use crate::metrics::TENANT_TASK_EVENTS;
 use crate::task_mgr;
 use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
-use crate::tenant::{Tenant, TenantState};
+use crate::tenant::Tenant;
 use tokio_util::sync::CancellationToken;
 use tracing::*;
 
-pub fn start_background_loops(tenant: &Arc<Tenant>) {
+pub fn start_background_loops(tenant: &Arc<Tenant>, cancel: CancellationToken) {
     let tenant_id = tenant.tenant_id;
     task_mgr::spawn(
         BACKGROUND_RUNTIME.handle(),
@@ -24,8 +23,9 @@ pub fn start_background_loops(tenant: &Arc<Tenant>) {
         false,
         {
             let tenant = Arc::clone(tenant);
+            let cancel = cancel.clone();
             async move {
-                compaction_loop(tenant)
+                compaction_loop(tenant, cancel)
                     .instrument(info_span!("compaction_loop", tenant_id = %tenant_id))
                     .await;
                 Ok(())
@@ -41,8 +41,9 @@ pub fn start_background_loops(tenant: &Arc<Tenant>) {
         false,
         {
             let tenant = Arc::clone(tenant);
+            let cancel = cancel.clone();
             async move {
-                gc_loop(tenant)
+                gc_loop(tenant, cancel)
                     .instrument(info_span!("gc_loop", tenant_id = %tenant_id))
                     .await;
                 Ok(())
@@ -54,28 +55,16 @@
 ///
 /// Compaction task's main loop
 ///
-async fn compaction_loop(tenant: Arc<Tenant>) {
+async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
     let wait_duration = Duration::from_secs(2);
     info!("starting");
     TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
     async {
-        let cancel = task_mgr::shutdown_token();
         let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download);
         let mut first = true;
         loop {
             trace!("waking up");
 
-            tokio::select! {
-                _ = cancel.cancelled() => {
-                    info!("received cancellation request");
-                    return;
-                },
-                tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
-                    ControlFlow::Break(()) => return,
-                    ControlFlow::Continue(()) => (),
-                },
-            }
-
             let period = tenant.get_compaction_period();
 
             // TODO: we shouldn't need to await to find tenant and this could be moved outside of
@@ -124,12 +113,11 @@ async fn compaction_loop(tenant: Arc<Tenant>) {
 ///
 /// GC task's main loop
 ///
-async fn gc_loop(tenant: Arc<Tenant>) {
+async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
     let wait_duration = Duration::from_secs(2);
     info!("starting");
     TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
     async {
-        let cancel = task_mgr::shutdown_token();
         // GC might require downloading, to find the cutoff LSN that corresponds to the
         // cutoff specified as time.
         let ctx =
@@ -138,17 +126,6 @@ async fn gc_loop(tenant: Arc<Tenant>) {
         loop {
             trace!("waking up");
 
-            tokio::select! {
-                _ = cancel.cancelled() => {
-                    info!("received cancellation request");
-                    return;
-                },
-                tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
-                    ControlFlow::Break(()) => return,
-                    ControlFlow::Continue(()) => (),
-                },
-            }
-
             let period = tenant.get_gc_period();
 
             if first {
@@ -195,36 +172,6 @@ async fn gc_loop(tenant: Arc<Tenant>) {
     trace!("GC loop stopped.");
 }
 
-async fn wait_for_active_tenant(tenant: &Arc<Tenant>) -> ControlFlow<()> {
-    // if the tenant has a proper status already, no need to wait for anything
-    if tenant.current_state() == TenantState::Active {
-        ControlFlow::Continue(())
-    } else {
-        let mut tenant_state_updates = tenant.subscribe_for_state_updates();
-        loop {
-            match tenant_state_updates.changed().await {
-                Ok(()) => {
-                    let new_state = &*tenant_state_updates.borrow();
-                    match new_state {
-                        TenantState::Active => {
-                            debug!("Tenant state changed to active, continuing the task loop");
-                            return ControlFlow::Continue(());
-                        }
-                        state => {
-                            debug!("Not running the task loop, tenant is not active: {state:?}");
-                            continue;
-                        }
-                    }
-                }
-                Err(_sender_dropped_error) => {
-                    info!("Tenant dropped the state updates sender, quitting waiting for tenant and the task loop");
-                    return ControlFlow::Break(());
-                }
-            }
-        }
-    }
-}
-
 #[derive(thiserror::Error, Debug)]
 #[error("cancelled")]
 pub(crate) struct Cancelled;