tenant loops: refactor wait-for-active and cancellation

`wait_for_active_tenant` is mis-named: its purpose is not to wait for
the tenant to become active, but to prevent new loop iterations while
the tenant is not active.

However, we never allow a tenant to transition from `!Active` back to
`Active`. So the "while not active" aspect is moot.

Further, we know that we spawn the background loops in `Tenant::activate`,
right after we have made the tenant `Active`. So we will never actually
wait for the tenant to become active.

The only situation in which the loops can observe the tenant as `!Active`
is during shutdown, i.e., when we transition the tenant to `Stopping`.
The loops should exit when that happens.

But `wait_for_active_tenant` doesn't handle that case.
The individual loops use `task_mgr::shutdown_token()` for that.
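
For illustration, here is a minimal, self-contained sketch of that token-based
cancellation pattern: a loop that runs an iteration, waits out its period, and
exits as soon as its `CancellationToken` is cancelled (whether that is the
process-wide shutdown token or, after this patch, a per-tenant token). This is
not the actual pageserver code; `background_loop` and `period` are made-up
names, and it assumes the `tokio` and `tokio-util` crates:

    use std::time::Duration;
    use tokio_util::sync::CancellationToken;

    async fn background_loop(cancel: CancellationToken) {
        let period = Duration::from_secs(2);
        loop {
            // One iteration of work (compaction, GC, ...) would go here.
            tokio::select! {
                // Exit promptly once the token is cancelled.
                _ = cancel.cancelled() => return,
                // Otherwise sleep out the period and run the next iteration.
                _ = tokio::time::sleep(period) => {}
            }
        }
    }

    #[tokio::main]
    async fn main() {
        let cancel = CancellationToken::new();
        let task = tokio::spawn(background_loop(cancel.clone()));
        cancel.cancel(); // e.g. as part of shutdown
        task.await.unwrap();
    }

The only thing this patch changes is who owns and cancels that token.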

This patch simplifies the code by

1. removing `wait_for_active_tenant`, which we have shown
   above to be quite useless, and
2. making cancellation of the loops a concern of
   `Tenant::set_stopping` / `Tenant::set_broken`.

This in turn allows us to remove `Tenant::subscribe_for_state_updates`,
which is great because now `Tenant::state` is only watched through
well-defined APIs like `Tenant::wait_to_become_active`.
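
For illustration only, here is a condensed sketch of the resulting ownership
pattern: the tenant owns an optional `CancellationToken`, activation creates it
and hands clones to the background loops, and `set_stopping` / `set_broken`
take it out and cancel it. The stripped-down `Tenant` and the helper name
`set_stopping_or_broken` are made up for this sketch; the real change is in the
diff below:

    use std::sync::Mutex;
    use tokio_util::sync::CancellationToken;

    struct Tenant {
        background_loops_cancel: Mutex<Option<CancellationToken>>,
    }

    impl Tenant {
        /// Called when the tenant becomes Active: create the token and return it
        /// so the caller can pass clones to the gc/compaction loops it spawns.
        fn activate(&self) -> CancellationToken {
            let cancel = CancellationToken::new();
            let existing = self
                .background_loops_cancel
                .lock()
                .unwrap()
                .replace(cancel.clone());
            assert!(existing.is_none(), "we don't support re-activation");
            cancel
        }

        /// Called from set_stopping / set_broken: cancel the loops exactly once.
        fn set_stopping_or_broken(&self) {
            if let Some(cancel) = self.background_loops_cancel.lock().unwrap().take() {
                cancel.cancel();
            }
        }
    }

    fn main() {
        let tenant = Tenant { background_loops_cancel: Mutex::new(None) };
        let cancel = tenant.activate();
        assert!(!cancel.is_cancelled());
        tenant.set_stopping_or_broken();
        assert!(cancel.is_cancelled());
    }

The `take()` ensures the token is cancelled at most once, no matter which of
the two state transitions happens.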

The context for this PR is me trying to find an alternative to
https://github.com/neondatabase/neon/pull/4291
which is part of https://github.com/orgs/neondatabase/projects/38
(async get_value_reconstruct_data).

I don't know if this leads to a true alternative for 4291, but
it's a useful cleanup by itself.
Christian Schwarz
2023-05-22 18:26:48 +02:00
parent 21c444e85f
commit d96a8d60cf
2 changed files with 30 additions and 68 deletions


@@ -18,6 +18,7 @@ use remote_storage::DownloadError;
use remote_storage::GenericRemoteStorage;
use tokio::sync::watch;
use tokio::task::JoinSet;
+use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::crashsafe::path_with_suffix_extension;
@@ -154,6 +155,8 @@ pub struct Tenant {
cached_synthetic_tenant_size: Arc<AtomicU64>,
eviction_task_tenant_state: tokio::sync::Mutex<EvictionTaskTenantState>,
+background_loops_cancel: Mutex<Option<CancellationToken>>,
}
/// A timeline with some of its files on disk, being initialized.
@@ -1620,8 +1623,15 @@ impl Tenant {
.filter(|timeline| timeline.current_state() != TimelineState::Broken);
// Spawn gc and compaction loops. The loops will shut themselves
-// down when they notice that the tenant is inactive.
-tasks::start_background_loops(self);
+// down in response to the cancellation token getting dropped.
+let background_loops_cancel = CancellationToken::new();
+let existing = self
+    .background_loops_cancel
+    .lock()
+    .unwrap()
+    .replace(background_loops_cancel.clone());
+assert!(existing.is_none(), "we don't support re-activation");
+tasks::start_background_loops(self, background_loops_cancel.clone());
let mut activated_timelines = 0;
let mut timelines_broken_during_activation = 0;
@@ -1677,6 +1687,10 @@ impl Tenant {
TenantState::Active | TenantState::Loading | TenantState::Attaching => {
*current_state = TenantState::Stopping;
+if let Some(cancel) = self.background_loops_cancel.lock().unwrap().take() {
+    cancel.cancel();
+}
// FIXME: If the tenant is still Loading or Attaching, new timelines
// might be created after this. That's harmless, as the Timelines
// won't be accessible to anyone, when the Tenant is in Stopping
@@ -1711,6 +1725,10 @@ impl Tenant {
// we can, but it shouldn't happen.
warn!("Changing Active tenant to Broken state, reason: {}", reason);
*current_state = TenantState::broken_from_reason(reason);
+if let Some(cancel) = self.background_loops_cancel.lock().unwrap().take() {
+    cancel.cancel();
+}
}
TenantState::Broken { .. } => {
// This shouldn't happen either
@@ -1732,11 +1750,7 @@ impl Tenant {
});
}
-pub fn subscribe_for_state_updates(&self) -> watch::Receiver<TenantState> {
-    self.state.subscribe()
-}
-pub async fn wait_to_become_active(&self) -> anyhow::Result<()> {
+pub async fn wait_to_become_active(&self) -> Result<(), WaitToBecomeActiveError> {
let mut receiver = self.state.subscribe();
loop {
let current_state = receiver.borrow_and_update().clone();
@@ -1982,6 +1996,7 @@ impl Tenant {
cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()),
cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()),
+background_loops_cancel: Mutex::new(None),
}
}


@@ -1,7 +1,6 @@
//! This module contains functions to serve per-tenant background processes,
//! such as compaction and GC
-use std::ops::ControlFlow;
use std::sync::Arc;
use std::time::{Duration, Instant};
@@ -9,11 +8,11 @@ use crate::context::{DownloadBehavior, RequestContext};
use crate::metrics::TENANT_TASK_EVENTS;
use crate::task_mgr;
use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
-use crate::tenant::{Tenant, TenantState};
+use crate::tenant::Tenant;
+use tokio_util::sync::CancellationToken;
use tracing::*;
-pub fn start_background_loops(tenant: &Arc<Tenant>) {
+pub fn start_background_loops(tenant: &Arc<Tenant>, cancel: CancellationToken) {
let tenant_id = tenant.tenant_id;
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
@@ -24,8 +23,9 @@ pub fn start_background_loops(tenant: &Arc<Tenant>) {
false,
{
let tenant = Arc::clone(tenant);
+let cancel = cancel.clone();
async move {
-compaction_loop(tenant)
+compaction_loop(tenant, cancel)
.instrument(info_span!("compaction_loop", tenant_id = %tenant_id))
.await;
Ok(())
@@ -41,8 +41,9 @@ pub fn start_background_loops(tenant: &Arc<Tenant>) {
false,
{
let tenant = Arc::clone(tenant);
+let cancel = cancel.clone();
async move {
-gc_loop(tenant)
+gc_loop(tenant, cancel)
.instrument(info_span!("gc_loop", tenant_id = %tenant_id))
.await;
Ok(())
@@ -54,28 +55,16 @@ pub fn start_background_loops(tenant: &Arc<Tenant>) {
///
/// Compaction task's main loop
///
-async fn compaction_loop(tenant: Arc<Tenant>) {
+async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
let wait_duration = Duration::from_secs(2);
info!("starting");
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
async {
-let cancel = task_mgr::shutdown_token();
let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download);
let mut first = true;
loop {
trace!("waking up");
-tokio::select! {
-    _ = cancel.cancelled() => {
-        info!("received cancellation request");
-        return;
-    },
-    tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
-        ControlFlow::Break(()) => return,
-        ControlFlow::Continue(()) => (),
-    },
-}
let period = tenant.get_compaction_period();
// TODO: we shouldn't need to await to find tenant and this could be moved outside of
@@ -124,12 +113,11 @@ async fn compaction_loop(tenant: Arc<Tenant>) {
///
/// GC task's main loop
///
-async fn gc_loop(tenant: Arc<Tenant>) {
+async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
let wait_duration = Duration::from_secs(2);
info!("starting");
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
async {
-let cancel = task_mgr::shutdown_token();
// GC might require downloading, to find the cutoff LSN that corresponds to the
// cutoff specified as time.
let ctx =
@@ -138,17 +126,6 @@ async fn gc_loop(tenant: Arc<Tenant>) {
loop {
trace!("waking up");
-tokio::select! {
-    _ = cancel.cancelled() => {
-        info!("received cancellation request");
-        return;
-    },
-    tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
-        ControlFlow::Break(()) => return,
-        ControlFlow::Continue(()) => (),
-    },
-}
let period = tenant.get_gc_period();
if first {
@@ -195,36 +172,6 @@ async fn gc_loop(tenant: Arc<Tenant>) {
trace!("GC loop stopped.");
}
-async fn wait_for_active_tenant(tenant: &Arc<Tenant>) -> ControlFlow<()> {
-    // if the tenant has a proper status already, no need to wait for anything
-    if tenant.current_state() == TenantState::Active {
-        ControlFlow::Continue(())
-    } else {
-        let mut tenant_state_updates = tenant.subscribe_for_state_updates();
-        loop {
-            match tenant_state_updates.changed().await {
-                Ok(()) => {
-                    let new_state = &*tenant_state_updates.borrow();
-                    match new_state {
-                        TenantState::Active => {
-                            debug!("Tenant state changed to active, continuing the task loop");
-                            return ControlFlow::Continue(());
-                        }
-                        state => {
-                            debug!("Not running the task loop, tenant is not active: {state:?}");
-                            continue;
-                        }
-                    }
-                }
-                Err(_sender_dropped_error) => {
-                    info!("Tenant dropped the state updates sender, quitting waiting for tenant and the task loop");
-                    return ControlFlow::Break(());
-                }
-            }
-        }
-    }
-}
#[derive(thiserror::Error, Debug)]
#[error("cancelled")]
pub(crate) struct Cancelled;