Compare commits


3 Commits

Author SHA1 Message Date
Christian Schwarz
2f748e47ee fixup: typo spilled over from https://github.com/neondatabase/neon/pull/4300 2023-05-22 20:06:39 +02:00
Christian Schwarz
d96a8d60cf tenant loops: refactor wait-for-active and cancellation
`wait_for_active_tenant` is mis-named: its purpose is not to wait for
the tenant to become active, but to prevent new loop iterations while
the tenant is not active.

However, we never allow a tenant to transition from `!Active` to
`Active` state again. So, the "while not active" aspect is moot.

Further, we know that we spawn the background loops in
`Tenant::activate`, right after we have made the tenant `Active`.
So, we will never actually wait for the tenant to become active.

The only condition where the tenant can be observed as `!Active` is when
we're shutting down, i.e., transitioning the tenant to `Stopping`.
The loops should exit when that happens.

But `wait_for_active_tenant` doesn't handle that case.
The individual loops use `task_mgr::shutdown_token()` for that.

This patch simplifies the code by

1. removing `wait_for_active_tenant`, which we have shown
   above to be quite useless, and
2. making cancellation of the loops a concern of
   `Tenant::set_stopping` / `Tenant::set_broken`
   (a standalone sketch of this pattern follows the commit list).

This in turn allows us to remove `Tenant::subscribe_for_state_updates`,
which is great because now `Tenant::state` is only watched through
well-defined APIs like `Tenant::wait_to_become_active`.

The context for this PR is me trying to find an alternative to
https://github.com/neondatabase/neon/pull/4291
which is part of https://github.com/orgs/neondatabase/projects/38
(async get_value_reconstruct_data).

I don't know if this leads to a true alternative for 4291, but
it's a useful cleanup by itself.
2023-05-22 18:26:48 +02:00
Christian Schwarz
21c444e85f tenant loops: operate on the Arc<Tenant> directly
(Instead of going through mgr every iteration.)

The `wait_for_active_tenant` function's `wait` argument could be removed
because it was only used for the loop that waits for the tenant to show
up in the tenants map. Since we're passing the tenant in, we no longer
need to get it from the tenants map.

NB that there's no guarantee that the tenant object is in the tenants map
at the time the background loop function starts running. But the
tenant mgr guarantees that it will be there quite soon. See
`tenant_map_insert` way upwards in the call hierarchy for details.

This is prep work to eliminate `subscribe_for_state_updates`,
which I'm exploring as an alternative to
https://github.com/neondatabase/neon/pull/4291

So, it's part of https://github.com/orgs/neondatabase/projects/38
(async get_value_reconstruct_data)
2023-05-22 17:50:08 +02:00
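
Taken together, the two refactor commits make the tenant own the lifetime of
its background loops: `activate` installs a fresh CancellationToken and hands
owned `Arc<Tenant>` clones to the spawned loops, and `set_stopping` /
`set_broken` fire that token. The following is a minimal, self-contained
sketch of that pattern; the names (`MiniTenant`, `shut_down_loops`) are
hypothetical, not the actual pageserver code:

    use std::sync::{Arc, Mutex};
    use tokio_util::sync::CancellationToken;

    struct MiniTenant {
        // Some(token) while the loops run; None before activation and after
        // shutdown. Mirrors Tenant::background_loops_cancel in the diff below.
        background_loops_cancel: Mutex<Option<CancellationToken>>,
    }

    impl MiniTenant {
        // `self: &Arc<Self>` so the method can hand owned clones to spawned tasks.
        fn activate(self: &Arc<Self>) {
            let cancel = CancellationToken::new();
            let existing = self
                .background_loops_cancel
                .lock()
                .unwrap()
                .replace(cancel.clone());
            assert!(existing.is_none(), "re-activation is not supported");

            let tenant = Arc::clone(self);
            tokio::spawn(async move {
                // Stand-in for compaction_loop/gc_loop: work until cancelled.
                cancel.cancelled().await;
                drop(tenant); // the loop held the tenant for its whole lifetime
                println!("background loop exiting");
            });
        }

        // The equivalent of what set_stopping/set_broken do after this change:
        // take the token (so a second call is a no-op) and fire it.
        fn shut_down_loops(&self) {
            if let Some(cancel) = self.background_loops_cancel.lock().unwrap().take() {
                cancel.cancel();
            }
        }
    }

    #[tokio::main]
    async fn main() {
        let tenant = Arc::new(MiniTenant {
            background_loops_cancel: Mutex::new(None),
        });
        tenant.activate();
        tenant.shut_down_loops();
        tokio::task::yield_now().await; // give the loop a chance to observe it
    }
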
2 changed files with 50 additions and 93 deletions

pageserver/src/tenant.rs

@@ -18,6 +18,7 @@ use remote_storage::DownloadError;
 use remote_storage::GenericRemoteStorage;
 use tokio::sync::watch;
 use tokio::task::JoinSet;
+use tokio_util::sync::CancellationToken;
 use tracing::*;
 use utils::crashsafe::path_with_suffix_extension;
@@ -154,6 +155,8 @@ pub struct Tenant {
     cached_synthetic_tenant_size: Arc<AtomicU64>,
 
     eviction_task_tenant_state: tokio::sync::Mutex<EvictionTaskTenantState>,
+
+    background_loops_cancel: Mutex<Option<CancellationToken>>,
 }
 
 /// A timeline with some of its files on disk, being initialized.
@@ -1588,7 +1591,7 @@ impl Tenant {
     }
 
     /// Changes tenant status to active, unless shutdown was already requested.
-    fn activate(&self, ctx: &RequestContext) -> anyhow::Result<()> {
+    fn activate(self: &Arc<Self>, ctx: &RequestContext) -> anyhow::Result<()> {
         debug_assert_current_span_has_tenant_id();
 
         let mut result = Ok(());
@@ -1620,8 +1623,15 @@ impl Tenant {
             .filter(|timeline| timeline.current_state() != TimelineState::Broken);
 
         // Spawn gc and compaction loops. The loops will shut themselves
-        // down when they notice that the tenant is inactive.
-        tasks::start_background_loops(self.tenant_id);
+        // down in response to the cancellation token getting dropped.
+        let background_loops_cancel = CancellationToken::new();
+        let existing = self
+            .background_loops_cancel
+            .lock()
+            .unwrap()
+            .replace(background_loops_cancel.clone());
+        assert!(existing.is_none(), "we don't support re-activation");
+        tasks::start_background_loops(self, background_loops_cancel.clone());
 
         let mut activated_timelines = 0;
         let mut timelines_broken_during_activation = 0;
@@ -1677,6 +1687,10 @@ impl Tenant {
             TenantState::Active | TenantState::Loading | TenantState::Attaching => {
                 *current_state = TenantState::Stopping;
 
+                if let Some(cancel) = self.background_loops_cancel.lock().unwrap().take() {
+                    cancel.cancel();
+                }
+
                 // FIXME: If the tenant is still Loading or Attaching, new timelines
                 // might be created after this. That's harmless, as the Timelines
                 // won't be accessible to anyone, when the Tenant is in Stopping
@@ -1711,6 +1725,10 @@ impl Tenant {
                 // we can, but it shouldn't happen.
                 warn!("Changing Active tenant to Broken state, reason: {}", reason);
                 *current_state = TenantState::broken_from_reason(reason);
+
+                if let Some(cancel) = self.background_loops_cancel.lock().unwrap().take() {
+                    cancel.cancel();
+                }
             }
             TenantState::Broken { .. } => {
                 // This shouldn't happen either
@@ -1732,10 +1750,6 @@ impl Tenant {
         });
     }
 
-    pub fn subscribe_for_state_updates(&self) -> watch::Receiver<TenantState> {
-        self.state.subscribe()
-    }
-
     pub async fn wait_to_become_active(&self) -> anyhow::Result<()> {
         let mut receiver = self.state.subscribe();
         loop {
@@ -1982,6 +1996,7 @@ impl Tenant {
             cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()),
             cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
             eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()),
+            background_loops_cancel: Mutex::new(None),
         }
     }
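
A side note on the receiver change above: `activate` goes from `&self` to
`self: &Arc<Self>`, meaning it can only be called on a tenant that is already
behind an `Arc`. That is what allows it to create the owned clones the spawned
loops need. A tiny illustrative sketch with a hypothetical `Widget` type, not
neon code:

    use std::sync::Arc;
    use std::thread;

    struct Widget {
        name: String,
    }

    impl Widget {
        // Callable only through an Arc, e.g. `w.start()` where `w: Arc<Widget>`.
        fn start(self: &Arc<Self>) -> thread::JoinHandle<()> {
            let this = Arc::clone(self); // owned handle with independent lifetime
            thread::spawn(move || {
                println!("loop running for {}", this.name);
            })
        }
    }

    fn main() {
        let w = Arc::new(Widget { name: "w1".into() });
        w.start().join().unwrap();
    }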

pageserver/src/tenant/tasks.rs

@@ -1,7 +1,6 @@
 //! This module contains functions to serve per-tenant background processes,
 //! such as compaction and GC
 
-use std::ops::ControlFlow;
 use std::sync::Arc;
 use std::time::{Duration, Instant};
@@ -9,13 +8,12 @@ use crate::context::{DownloadBehavior, RequestContext};
 use crate::metrics::TENANT_TASK_EVENTS;
 use crate::task_mgr;
 use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
-use crate::tenant::mgr;
-use crate::tenant::{Tenant, TenantState};
+use crate::tenant::Tenant;
+use tokio_util::sync::CancellationToken;
 use tracing::*;
-use utils::id::TenantId;
 
-pub fn start_background_loops(tenant_id: TenantId) {
+pub fn start_background_loops(tenant: &Arc<Tenant>, cancel: CancellationToken) {
+    let tenant_id = tenant.tenant_id;
     task_mgr::spawn(
         BACKGROUND_RUNTIME.handle(),
         TaskKind::Compaction,
@@ -23,11 +21,15 @@ pub fn start_background_loops(tenant_id: TenantId) {
         None,
         &format!("compactor for tenant {tenant_id}"),
         false,
-        async move {
-            compaction_loop(tenant_id)
-                .instrument(info_span!("compaction_loop", tenant_id = %tenant_id))
-                .await;
-            Ok(())
+        {
+            let tenant = Arc::clone(tenant);
+            let cancel = cancel.clone();
+            async move {
+                compaction_loop(tenant, cancel)
+                    .instrument(info_span!("compaction_loop", tenant_id = %tenant_id))
+                    .await;
+                Ok(())
+            }
         },
     );
     task_mgr::spawn(
@@ -37,11 +39,15 @@ pub fn start_background_loops(tenant_id: TenantId) {
         None,
         &format!("garbage collector for tenant {tenant_id}"),
         false,
-        async move {
-            gc_loop(tenant_id)
-                .instrument(info_span!("gc_loop", tenant_id = %tenant_id))
-                .await;
-            Ok(())
+        {
+            let tenant = Arc::clone(tenant);
+            let cancel = cancel.clone();
+            async move {
+                gc_loop(tenant, cancel)
+                    .instrument(info_span!("gc_loop", tenant_id = %tenant_id))
+                    .await;
+                Ok(())
+            }
         },
     );
 }
@@ -49,28 +55,16 @@ pub fn start_background_loops(tenant_id: TenantId) {
 ///
 /// Compaction task's main loop
 ///
-async fn compaction_loop(tenant_id: TenantId) {
+async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
     let wait_duration = Duration::from_secs(2);
     info!("starting");
     TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
     async {
-        let cancel = task_mgr::shutdown_token();
         let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download);
         let mut first = true;
         loop {
             trace!("waking up");
-            let tenant = tokio::select! {
-                _ = cancel.cancelled() => {
-                    info!("received cancellation request");
-                    return;
-                },
-                tenant_wait_result = wait_for_active_tenant(tenant_id, wait_duration) => match tenant_wait_result {
-                    ControlFlow::Break(()) => return,
-                    ControlFlow::Continue(tenant) => tenant,
-                },
-            };
 
             let period = tenant.get_compaction_period();
             // TODO: we shouldn't need to await to find tenant and this could be moved outside of
@@ -119,30 +113,19 @@ async fn compaction_loop(tenant_id: TenantId) {
 ///
 /// GC task's main loop
 ///
-async fn gc_loop(tenant_id: TenantId) {
+async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
     let wait_duration = Duration::from_secs(2);
     info!("starting");
     TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
     async {
-        let cancel = task_mgr::shutdown_token();
         // GC might require downloading, to find the cutoff LSN that corresponds to the
         // cutoff specified as time.
-        let ctx = RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
+        let ctx =
+            RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
         let mut first = true;
         loop {
             trace!("waking up");
-            let tenant = tokio::select! {
-                _ = cancel.cancelled() => {
-                    info!("received cancellation request");
-                    return;
-                },
-                tenant_wait_result = wait_for_active_tenant(tenant_id, wait_duration) => match tenant_wait_result {
-                    ControlFlow::Break(()) => return,
-                    ControlFlow::Continue(tenant) => tenant,
-                },
-            };
 
             let period = tenant.get_gc_period();
 
             if first {
@@ -161,7 +144,9 @@ async fn gc_loop(tenant_id: TenantId) {
                 Duration::from_secs(10)
             } else {
                 // Run gc
-                let res = tenant.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &ctx).await;
+                let res = tenant
+                    .gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &ctx)
+                    .await;
                 if let Err(e) = res {
                     error!("Gc failed, retrying in {:?}: {e:?}", wait_duration);
                     wait_duration
@@ -187,49 +172,6 @@
     trace!("GC loop stopped.");
 }
 
-async fn wait_for_active_tenant(
-    tenant_id: TenantId,
-    wait: Duration,
-) -> ControlFlow<(), Arc<Tenant>> {
-    let tenant = loop {
-        match mgr::get_tenant(tenant_id, false).await {
-            Ok(tenant) => break tenant,
-            Err(e) => {
-                error!("Failed to get a tenant {tenant_id}: {e:#}");
-                tokio::time::sleep(wait).await;
-            }
-        }
-    };
-
-    // if the tenant has a proper status already, no need to wait for anything
-    if tenant.current_state() == TenantState::Active {
-        ControlFlow::Continue(tenant)
-    } else {
-        let mut tenant_state_updates = tenant.subscribe_for_state_updates();
-        loop {
-            match tenant_state_updates.changed().await {
-                Ok(()) => {
-                    let new_state = &*tenant_state_updates.borrow();
-                    match new_state {
-                        TenantState::Active => {
-                            debug!("Tenant state changed to active, continuing the task loop");
-                            return ControlFlow::Continue(tenant);
-                        }
-                        state => {
-                            debug!("Not running the task loop, tenant is not active: {state:?}");
-                            continue;
-                        }
-                    }
-                }
-                Err(_sender_dropped_error) => {
-                    info!("Tenant dropped the state updates sender, quitting waiting for tenant and the task loop");
-                    return ControlFlow::Break(());
-                }
-            }
-        }
-    }
-}
 
 #[derive(thiserror::Error, Debug)]
 #[error("cancelled")]
 pub(crate) struct Cancelled;
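
The hunks above delete the `tokio::select!` that raced `task_mgr::shutdown_token()`
against `wait_for_active_tenant`, but they don't show where the refactored loops
now observe the `cancel` parameter. A typical shape, sketched under that
assumption (hypothetical `demo_loop`, not the exact neon loop body), is to race
the inter-iteration sleep against the token:

    use std::time::Duration;
    use tokio_util::sync::CancellationToken;

    async fn demo_loop(cancel: CancellationToken) {
        loop {
            // ... one compaction/GC iteration would run here ...
            let period = Duration::from_secs(20);
            tokio::select! {
                _ = cancel.cancelled() => {
                    println!("received cancellation request");
                    return;
                }
                _ = tokio::time::sleep(period) => {} // proceed to next iteration
            }
        }
    }

    #[tokio::main]
    async fn main() {
        let cancel = CancellationToken::new();
        let task = tokio::spawn(demo_loop(cancel.clone()));
        // What Tenant::set_stopping / Tenant::set_broken now trigger:
        cancel.cancel();
        task.await.unwrap();
    }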