mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-16 04:30:38 +00:00
make TenantState::{Loading,Attaching,Activating} owned by spawn_load / spawn_attach
See the Mermaid diagram in the doc comment for the now-possible state transitions.
The two core insights / changes are:
- spawn_load and spawn_attach own the tenant state until they're done
- once load()/attach() calls are done
- if they failed, transition them to Broken directly (we know
that there's no background activity because we didn't call activate yet)
- if they succeed, call activate. We can make it infallible. How? Later.
- set_broken() and set_stopping() are changed to wait for spawn_load() /
spawn_attach() to finish. This sounds scary because it might hinder
detach or shutdown, but actually, concurrent attach+detach, or
attach+shutdown, or load+shutdown, or attach+shutdown were just racy.
With this change, they're not anymore.
We can add a CancellationToken stored in Tenant for load/attach and cancel
it from set_stopping() or set_broken() if necessary in the future.
So, why can activate() be infallible now: because we declare that
spawn_load and spawn_attach own the tenant state until they're done.
And we enforce that ownership using the wait_for at the start of
set_stopping and set_broken.
This commit is contained in:
@@ -18,7 +18,29 @@ use crate::reltag::RelTag;
|
||||
use anyhow::bail;
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
|
||||
/// A state of a tenant in pageserver's memory.
|
||||
/// The state of a tenant in this pageserver.
|
||||
///
|
||||
/// ```mermaid
|
||||
/// stateDiagram-v2
|
||||
///
|
||||
/// [*] --> Loading: spawn_load()
|
||||
/// [*] --> Attaching: spawn_attach()
|
||||
///
|
||||
/// Loading --> Activating: activate()
|
||||
/// Attaching --> Activating: activate()
|
||||
/// Activating --> Active: infallible
|
||||
///
|
||||
/// Loading --> Broken: load() failure
|
||||
/// Attaching --> Broken: attach() failure
|
||||
///
|
||||
/// Active --> Stopping: set_stopping(), part of shutdown & detach
|
||||
/// Stopping --> Broken: late error in remove_tenant_from_memory
|
||||
///
|
||||
/// Broken --> [*]: ignore / detach / shutdown
|
||||
/// Stopping --> [*]: remove_from_memory complete
|
||||
///
|
||||
/// Active --> Broken: cfg(testing)-only tenant break point
|
||||
/// ```
|
||||
#[derive(
|
||||
Clone,
|
||||
PartialEq,
|
||||
@@ -35,11 +57,11 @@ use bytes::{BufMut, Bytes, BytesMut};
|
||||
pub enum TenantState {
|
||||
/// This tenant is being loaded from local disk
|
||||
Loading,
|
||||
/// This tenant is being downloaded from cloud storage.
|
||||
/// This tenant is being attached to the pageserver.
|
||||
Attaching,
|
||||
/// The tenant is transitioning from Loading/Attaching to Active.
|
||||
Activating,
|
||||
/// Tenant is fully operational
|
||||
/// The tenant has finished activating and is open for business.
|
||||
Active,
|
||||
/// A tenant is recognized by pageserver, but it is being detached or the
|
||||
/// system is being shut down.
|
||||
|
||||
@@ -617,16 +617,17 @@ impl Tenant {
|
||||
"attach tenant",
|
||||
false,
|
||||
async move {
|
||||
let doit = async {
|
||||
tenant_clone.attach(&ctx).await?;
|
||||
tenant_clone.activate(broker_client, &ctx)?;
|
||||
anyhow::Ok(())
|
||||
};
|
||||
match doit.await {
|
||||
Ok(_) => {}
|
||||
match tenant_clone.attach(&ctx).await {
|
||||
Ok(()) => {
|
||||
info!("attach finished, activating");
|
||||
tenant_clone.activate(broker_client, &ctx);
|
||||
}
|
||||
Err(e) => {
|
||||
tenant_clone.set_broken(e.to_string()).await;
|
||||
error!("error attaching tenant: {:?}", e);
|
||||
error!("attach failed, setting tenant state to Broken: {:?}", e);
|
||||
tenant_clone.state.send_modify(|state| {
|
||||
assert_eq!(*state, TenantState::Attaching, "the attach task owns the tenant state until activation is complete");
|
||||
*state = TenantState::broken_from_reason(e.to_string());
|
||||
});
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
@@ -643,6 +644,8 @@ impl Tenant {
|
||||
///
|
||||
/// Background task that downloads all data for a tenant and brings it to Active state.
|
||||
///
|
||||
/// No background tasks are started as part of this routine.
|
||||
///
|
||||
async fn attach(self: &Arc<Tenant>, ctx: &RequestContext) -> anyhow::Result<()> {
|
||||
debug_assert_current_span_has_tenant_id();
|
||||
|
||||
@@ -892,20 +895,20 @@ impl Tenant {
|
||||
"initial tenant load",
|
||||
false,
|
||||
async move {
|
||||
let doit = async {
|
||||
tenant_clone.load(&ctx).await?;
|
||||
tenant_clone.activate(broker_client, &ctx)?;
|
||||
anyhow::Ok(())
|
||||
};
|
||||
match doit.await {
|
||||
Ok(()) => {}
|
||||
match tenant_clone.load(&ctx).await {
|
||||
Ok(()) => {
|
||||
info!("load finished, activating");
|
||||
tenant_clone.activate(broker_client, &ctx);
|
||||
}
|
||||
Err(err) => {
|
||||
tenant_clone.set_broken(err.to_string()).await;
|
||||
error!("could not load tenant {tenant_id}: {err:?}");
|
||||
error!("load failed, setting tenant state to Broken: {err:?}");
|
||||
tenant_clone.state.send_modify(|state| {
|
||||
assert_eq!(*state, TenantState::Loading, "the loading task owns the tenant state until activation is complete");
|
||||
*state = TenantState::broken_from_reason(err.to_string());
|
||||
});
|
||||
}
|
||||
}
|
||||
info!("initial load for tenant {tenant_id} finished!");
|
||||
Ok(())
|
||||
Ok(())
|
||||
}
|
||||
.instrument({
|
||||
let span = tracing::info_span!(parent: None, "load", tenant_id=%tenant_id);
|
||||
@@ -923,6 +926,7 @@ impl Tenant {
|
||||
/// Background task to load in-memory data structures for this tenant, from
|
||||
/// files on disk. Used at pageserver startup.
|
||||
///
|
||||
/// No background tasks are started as part of this routine.
|
||||
async fn load(self: &Arc<Tenant>, ctx: &RequestContext) -> anyhow::Result<()> {
|
||||
debug_assert_current_span_has_tenant_id();
|
||||
|
||||
@@ -1613,37 +1617,16 @@ impl Tenant {
|
||||
}
|
||||
|
||||
/// Changes tenant status to active, unless shutdown was already requested.
|
||||
fn activate(
|
||||
self: &Arc<Self>,
|
||||
broker_client: BrokerClientChannel,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
fn activate(self: &Arc<Self>, broker_client: BrokerClientChannel, ctx: &RequestContext) {
|
||||
debug_assert_current_span_has_tenant_id();
|
||||
|
||||
let mut result = Ok(());
|
||||
let mut activating = false;
|
||||
self.state.send_modify(|current_state| {
|
||||
match &*current_state {
|
||||
TenantState::Activating => {
|
||||
// activate() was called on an already Activating tenant. Shouldn't happen.
|
||||
result = Err(anyhow::anyhow!("Tenant is already activating"));
|
||||
TenantState::Activating | TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping => {
|
||||
panic!("caller is responsible for calling activate() only on Loading / Attaching tenants, got {state:?}", state = current_state);
|
||||
}
|
||||
TenantState::Active => {
|
||||
// activate() was called on an already Active tenant. Shouldn't happen.
|
||||
result = Err(anyhow::anyhow!("Tenant is already active"));
|
||||
}
|
||||
TenantState::Broken { reason, .. } => {
|
||||
// This shouldn't happen either
|
||||
result = Err(anyhow::anyhow!(
|
||||
"Could not activate tenant because it is in broken state due to: {reason}",
|
||||
));
|
||||
}
|
||||
TenantState::Stopping => {
|
||||
// The tenant was detached, or system shutdown was requested, while we were
|
||||
// loading or attaching the tenant.
|
||||
info!("Tenant is already in Stopping state, skipping activation");
|
||||
}
|
||||
TenantState::Loading | TenantState::Attaching => {
|
||||
TenantState::Loading | TenantState::Attaching => {
|
||||
*current_state = TenantState::Activating;
|
||||
debug!(tenant_id = %self.tenant_id, "Activating tenant");
|
||||
activating = true;
|
||||
@@ -1652,10 +1635,6 @@ impl Tenant {
|
||||
}
|
||||
}
|
||||
});
|
||||
if let Err(e) = result {
|
||||
assert!(!activating, "transition into Activating is infallible");
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
if activating {
|
||||
let timelines_accessor = self.timelines.lock().unwrap();
|
||||
@@ -1696,42 +1675,47 @@ impl Tenant {
|
||||
"activation attempt finished"
|
||||
);
|
||||
});
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Change tenant status to Stopping, to mark that it is being shut down.
|
||||
///
|
||||
/// This function waits for the tenant to become active if it isn't already, before transitioning it into Stopping state.
|
||||
///
|
||||
/// This function is not cancel-safe!
|
||||
pub async fn set_stopping(&self) {
|
||||
// `Activating` is a transient state during which no external state transitions are supported.
|
||||
let mut rx = self.state.subscribe();
|
||||
rx.wait_for(|state| state != TenantState::Activating)
|
||||
.await
|
||||
.expect("cannot drop self.state while on a &self method");
|
||||
|
||||
// cannot stop before we're done activating, so wait out until we're done activating
|
||||
rx.wait_for(|state| match state {
|
||||
TenantState::Activating | TenantState::Loading | TenantState::Attaching => false, // TODO log that we're waiting
|
||||
TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping {} => true,
|
||||
})
|
||||
.await
|
||||
.expect("cannot drop self.state while on a &self method");
|
||||
|
||||
// we now know we're done activating, let's see whether this task is the winner to transition into Stopping
|
||||
let mut stopping = false;
|
||||
self.state.send_modify(|current_state| {
|
||||
match current_state {
|
||||
TenantState::Activating => unreachable!("we checked above and never transition back into Activating state"),
|
||||
// FIXME: If the tenant is still Loading or Attaching, new timelines
|
||||
// might be created after this. That's harmless, as the Timelines
|
||||
// won't be accessible to anyone, when the Tenant is in Stopping
|
||||
// state.
|
||||
TenantState::Active | TenantState::Loading | TenantState::Attaching => {
|
||||
*current_state = TenantState::Stopping;
|
||||
stopping = true;
|
||||
// Continue outside the closure. We need to grab timelines.lock()
|
||||
// and we plan to turn it into a tokio::sync::Mutex in a future patch.
|
||||
}
|
||||
TenantState::Broken { reason, .. } => {
|
||||
info!("Cannot set tenant to Stopping state, it is in Broken state due to: {reason}");
|
||||
}
|
||||
TenantState::Stopping => {
|
||||
// The tenant was detached, or system shutdown was requested, while we were
|
||||
// loading or attaching the tenant.
|
||||
info!("Tenant is already in Stopping state");
|
||||
}
|
||||
self.state.send_modify(|current_state| match current_state {
|
||||
TenantState::Activating | TenantState::Loading | TenantState::Attaching => {
|
||||
unreachable!("we ensured above that we're done with activation, and, there is no re-activation")
|
||||
}
|
||||
TenantState::Active => {
|
||||
// FIXME: due to time-of-check vs time-of-use issues, it can happen that new timelines
|
||||
// are created after the transition to Stopping. That's harmless, as the Timelines
|
||||
// won't be accessible to anyone afterwards, because the Tenant is in Stopping state.
|
||||
*current_state = TenantState::Stopping;
|
||||
stopping = true;
|
||||
// Continue stopping outside the closure. We need to grab timelines.lock()
|
||||
// and we plan to turn it into a tokio::sync::Mutex in a future patch.
|
||||
}
|
||||
TenantState::Broken { reason, .. } => {
|
||||
info!(
|
||||
"Cannot set tenant to Stopping state, it is in Broken state due to: {reason}"
|
||||
);
|
||||
}
|
||||
TenantState::Stopping => {
|
||||
info!("Tenant is already in Stopping state");
|
||||
}
|
||||
});
|
||||
|
||||
@@ -1746,43 +1730,50 @@ impl Tenant {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn set_broken(&self, reason: String) {
|
||||
// `Activating` is a transient state during which no external state transitions are supported.
|
||||
/// Method for tenant::mgr to transition us into Broken state in case of a late failure in
|
||||
/// `remove_tenant_from_memory`
|
||||
///
|
||||
/// This function waits for the tenant to become active if it isn't already, before transitioning it into Stopping state.
|
||||
///
|
||||
/// In tests, we also use this to set tenants to Broken state on purpose.
|
||||
pub(crate) async fn set_broken(&self, reason: String) {
|
||||
let mut rx = self.state.subscribe();
|
||||
rx.wait_for(|state| state != TenantState::Activating)
|
||||
.await
|
||||
.expect("cannot drop self.state while on a &self method");
|
||||
|
||||
// The load & attach routines own the tenant state until it has reached `Active`.
|
||||
// So, wait until it's done.
|
||||
rx.wait_for(|state| match state {
|
||||
TenantState::Activating | TenantState::Loading | TenantState::Attaching => false, // TODO log that we're waiting
|
||||
TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping {} => true,
|
||||
})
|
||||
.await
|
||||
.expect("cannot drop self.state while on a &self method");
|
||||
|
||||
// we now know we're done activating, let's see whether this task is the winner to transition into Broken
|
||||
self.state.send_modify(|current_state| {
|
||||
match *current_state {
|
||||
TenantState::Activating => {
|
||||
unreachable!("we checked above and never transition back into Activating state")
|
||||
TenantState::Activating | TenantState::Loading | TenantState::Attaching => {
|
||||
unreachable!("we ensured above that we're done with activation, and, there is no re-activation")
|
||||
}
|
||||
TenantState::Active => {
|
||||
// Broken tenants can currently only used for fatal errors that happen
|
||||
// while loading or attaching a tenant. A tenant that has already been
|
||||
// activated should never be marked as broken. We cope with it the best
|
||||
// we can, but it shouldn't happen.
|
||||
warn!("Changing Active tenant to Broken state, reason: {}", reason);
|
||||
*current_state = TenantState::broken_from_reason(reason);
|
||||
if cfg!(feature = "testing") {
|
||||
warn!("Changing Active tenant to Broken state, reason: {}", reason);
|
||||
*current_state = TenantState::broken_from_reason(reason);
|
||||
} else {
|
||||
unreachable!("not allowed to call set_broken on Active tenants in non-testing builds")
|
||||
}
|
||||
}
|
||||
TenantState::Broken { .. } => {
|
||||
// This shouldn't happen either
|
||||
warn!("Tenant is already in Broken state");
|
||||
}
|
||||
// This is the only "expected" path, any other path is a bug.
|
||||
TenantState::Stopping => {
|
||||
// This shouldn't happen either
|
||||
warn!(
|
||||
"Marking Stopping tenant as Broken state, reason: {}",
|
||||
reason
|
||||
);
|
||||
*current_state = TenantState::broken_from_reason(reason);
|
||||
}
|
||||
TenantState::Loading | TenantState::Attaching => {
|
||||
info!("Setting tenant as Broken state, reason: {}", reason);
|
||||
*current_state = TenantState::broken_from_reason(reason);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -246,11 +246,9 @@ pub async fn shutdown_all_tenants() {
|
||||
|
||||
let mut tenants_to_freeze_and_flush = Vec::with_capacity(tenants_to_shut_down.len());
|
||||
for (_, tenant) in tenants_to_shut_down {
|
||||
if tenant.is_active() {
|
||||
// updates tenant state, forbidding new GC and compaction iterations from starting
|
||||
tenant.set_stopping().await;
|
||||
tenants_to_freeze_and_flush.push(tenant);
|
||||
}
|
||||
// updates tenant state, forbidding new GC and compaction iterations from starting
|
||||
tenant.set_stopping().await;
|
||||
tenants_to_freeze_and_flush.push(tenant);
|
||||
}
|
||||
|
||||
// Shut down all existing walreceiver connections and stop accepting the new ones.
|
||||
|
||||
@@ -20,7 +20,7 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
|
||||
".*is not active. Current state: Broken.*",
|
||||
".*will not become active. Current state: Broken.*",
|
||||
".*failed to load metadata.*",
|
||||
".*could not load tenant.*load local timeline.*",
|
||||
".*load failed.*load local timeline.*",
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
@@ -140,7 +140,7 @@ def test_remote_storage_backup_and_restore(
|
||||
# This is before the failures injected by test_remote_failures, so it's a permanent error.
|
||||
pageserver_http.configure_failpoints(("storage-sync-list-remote-timelines", "return"))
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*error attaching tenant: storage-sync-list-remote-timelines",
|
||||
".*attach failed.*: storage-sync-list-remote-timelines",
|
||||
)
|
||||
# Attach it. This HTTP request will succeed and launch a
|
||||
# background task to load the tenant. In that background task,
|
||||
|
||||
@@ -647,7 +647,9 @@ def test_ignored_tenant_stays_broken_without_metadata(
|
||||
metadata_removed = True
|
||||
assert metadata_removed, f"Failed to find metadata file in {tenant_timeline_dir}"
|
||||
|
||||
env.pageserver.allowed_errors.append(".*could not load tenant .*?: failed to load metadata.*")
|
||||
env.pageserver.allowed_errors.append(
|
||||
f".*{tenant_id}.*: load failed.*: failed to load metadata.*"
|
||||
)
|
||||
|
||||
# now, load it from the local files and expect it to be broken due to inability to load tenant files into memory
|
||||
pageserver_http.tenant_load(tenant_id=tenant_id)
|
||||
|
||||
@@ -22,6 +22,7 @@ from fixtures.neon_fixtures import (
|
||||
available_remote_storages,
|
||||
)
|
||||
from fixtures.types import Lsn, TenantId, TimelineId
|
||||
from fixtures.utils import wait_until
|
||||
from prometheus_client.samples import Sample
|
||||
|
||||
|
||||
@@ -308,9 +309,7 @@ def test_pageserver_with_empty_tenants(
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*marking .* as locally complete, while it doesnt exist in remote index.*"
|
||||
)
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*could not load tenant.*Failed to list timelines directory.*"
|
||||
)
|
||||
env.pageserver.allowed_errors.append(".*load failed.*Failed to list timelines directory.*")
|
||||
|
||||
client = env.pageserver.http_client()
|
||||
|
||||
@@ -340,9 +339,15 @@ def test_pageserver_with_empty_tenants(
|
||||
env.pageserver.start()
|
||||
|
||||
client = env.pageserver.http_client()
|
||||
tenants = client.tenant_list()
|
||||
|
||||
assert len(tenants) == 2
|
||||
def not_loading():
|
||||
tenants = client.tenant_list()
|
||||
assert len(tenants) == 2
|
||||
assert all(t["state"]["slug"] != "Loading" for t in tenants)
|
||||
|
||||
wait_until(10, 0.2, not_loading)
|
||||
|
||||
tenants = client.tenant_list()
|
||||
|
||||
[broken_tenant] = [t for t in tenants if t["id"] == str(tenant_without_timelines_dir)]
|
||||
assert (
|
||||
@@ -354,8 +359,6 @@ def test_pageserver_with_empty_tenants(
|
||||
broken_tenant_status["state"]["slug"] == "Broken"
|
||||
), f"Tenant {tenant_without_timelines_dir} without timelines dir should be broken"
|
||||
|
||||
assert env.pageserver.log_contains(".*Setting tenant as Broken state, reason:.*")
|
||||
|
||||
[loaded_tenant] = [t for t in tenants if t["id"] == str(tenant_with_empty_timelines_dir)]
|
||||
assert (
|
||||
loaded_tenant["state"]["slug"] == "Active"
|
||||
|
||||
Reference in New Issue
Block a user