From de780d2e0fd0718522b5011dffc4a6f96ccc0628 Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Thu, 25 May 2023 14:07:00 +0200
Subject: [PATCH] make TenantState::{Loading,Attaching,Activating} owned by
 spawn_load / spawn_attach

See the Mermaid diagram in the doc comment for the now-possible state
transitions.

The two core insights / changes are:

- spawn_load and spawn_attach own the tenant state until they're done:
  - if the load()/attach() call fails, they transition the tenant to Broken
    directly (we know there's no background activity because we haven't
    called activate yet)
  - if it succeeds, they call activate. We can make it infallible; the last
    paragraph explains why.
- set_broken() and set_stopping() are changed to wait for spawn_load() /
  spawn_attach() to finish. This sounds scary because it might hinder detach
  or shutdown, but in fact, concurrent attach+detach, attach+shutdown, and
  load+shutdown were just racy before. With this change, they no longer are.
  In the future, we can store a CancellationToken in Tenant for load/attach
  and cancel it from set_stopping() or set_broken() if necessary.

So, why can activate() be infallible now? Because we declare that spawn_load
and spawn_attach own the tenant state until they're done, and we enforce that
ownership using the wait_for at the start of set_stopping and set_broken.
---
 libs/pageserver_api/src/models.rs           |  28 ++-
 pageserver/src/tenant.rs                    | 183 ++++++++++----------
 pageserver/src/tenant/mgr.rs                |   8 +-
 test_runner/regress/test_broken_timeline.py |   2 +-
 test_runner/regress/test_remote_storage.py  |   2 +-
 test_runner/regress/test_tenant_detach.py   |   4 +-
 test_runner/regress/test_tenants.py         |  17 +-
 7 files changed, 130 insertions(+), 114 deletions(-)

diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs
index 3d98fd63a8..5d1ac5ee15 100644
--- a/libs/pageserver_api/src/models.rs
+++ b/libs/pageserver_api/src/models.rs
@@ -18,7 +18,29 @@ use crate::reltag::RelTag;
 use anyhow::bail;
 use bytes::{BufMut, Bytes, BytesMut};
 
-/// A state of a tenant in pageserver's memory.
+/// The state of a tenant in this pageserver.
+///
+/// ```mermaid
+/// stateDiagram-v2
+///
+/// [*] --> Loading: spawn_load()
+/// [*] --> Attaching: spawn_attach()
+///
+/// Loading --> Activating: activate()
+/// Attaching --> Activating: activate()
+/// Activating --> Active: infallible
+///
+/// Loading --> Broken: load() failure
+/// Attaching --> Broken: attach() failure
+///
+/// Active --> Stopping: set_stopping(), part of shutdown & detach
+/// Stopping --> Broken: late error in remove_tenant_from_memory
+///
+/// Broken --> [*]: ignore / detach / shutdown
+/// Stopping --> [*]: remove_tenant_from_memory complete
+///
+/// Active --> Broken: cfg(testing)-only tenant break point
+/// ```
 #[derive(
     Clone,
     PartialEq,
@@ -35,11 +57,11 @@ use bytes::{BufMut, Bytes, BytesMut};
 pub enum TenantState {
     /// This tenant is being loaded from local disk
     Loading,
-    /// This tenant is being downloaded from cloud storage.
+    /// This tenant is being attached to the pageserver.
     Attaching,
     /// The tenant is transitioning from Loading/Attaching to Active.
     Activating,
-    /// Tenant is fully operational
+    /// The tenant has finished activating and is open for business.
     Active,
     /// A tenant is recognized by pageserver, but it is being detached or the
     /// system is being shut down.
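
To make the ownership protocol above concrete, here is a minimal, self-contained sketch of the same pattern, using tokio::sync::watch just as Tenant::state does (Receiver::wait_for needs tokio >= 1.25). All names in it are illustrative, not pageserver code: a loader task owns the state until it is Active or Broken, and a set_stopping-style caller first waits that out, then races to transition into Stopping.

```rust
// Minimal sketch of the ownership protocol described above. All names here
// (State, the loader task, ...) are hypothetical stand-ins.
use tokio::sync::watch;

#[derive(Clone, Debug, PartialEq)]
enum State {
    Loading,
    Active,
    Broken(String),
    Stopping,
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel(State::Loading);

    // The "spawn_load" task owns the state until it is Active or Broken.
    // Nobody else transitions the state in the meantime, which is what
    // makes the Ok branch ("activate()") infallible.
    let loader = tokio::spawn(async move {
        let load_result: Result<(), &str> = Ok(()); // stand-in for load()
        tx.send_modify(|state| {
            assert_eq!(*state, State::Loading, "loader owns the state until now");
            *state = match load_result {
                Ok(()) => State::Active,
                Err(e) => State::Broken(e.to_string()),
            };
        });
        tx // keep the sender alive so waiters don't see a closed channel
    });

    // set_stopping()-style caller: wait out load/activation first, then
    // race to transition into Stopping.
    rx.wait_for(|state| matches!(*state, State::Active | State::Broken(_) | State::Stopping))
        .await
        .expect("sender is kept alive above");

    let tx = loader.await.expect("loader task panicked");
    tx.send_modify(|state| {
        if *state == State::Active {
            *state = State::Stopping;
        }
    });
    assert_eq!(*rx.borrow(), State::Stopping);
}
```
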
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index 4f100484a9..304651b6ac 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -617,16 +617,17 @@ impl Tenant {
             "attach tenant",
             false,
             async move {
-                let doit = async {
-                    tenant_clone.attach(&ctx).await?;
-                    tenant_clone.activate(broker_client, &ctx)?;
-                    anyhow::Ok(())
-                };
-                match doit.await {
-                    Ok(_) => {}
+                match tenant_clone.attach(&ctx).await {
+                    Ok(()) => {
+                        info!("attach finished, activating");
+                        tenant_clone.activate(broker_client, &ctx);
+                    }
                     Err(e) => {
-                        tenant_clone.set_broken(e.to_string()).await;
-                        error!("error attaching tenant: {:?}", e);
+                        error!("attach failed, setting tenant state to Broken: {:?}", e);
+                        tenant_clone.state.send_modify(|state| {
+                            assert_eq!(*state, TenantState::Attaching, "the attach task owns the tenant state until activation is complete");
+                            *state = TenantState::broken_from_reason(e.to_string());
+                        });
                     }
                 }
                 Ok(())
@@ -643,6 +644,8 @@ impl Tenant {
     ///
     /// Background task that downloads all data for a tenant and brings it to Active state.
     ///
+    /// No background tasks are started as part of this routine.
+    ///
     async fn attach(self: &Arc<Tenant>, ctx: &RequestContext) -> anyhow::Result<()> {
         debug_assert_current_span_has_tenant_id();
 
@@ -892,20 +895,20 @@ impl Tenant {
             "initial tenant load",
             false,
             async move {
-                let doit = async {
-                    tenant_clone.load(&ctx).await?;
-                    tenant_clone.activate(broker_client, &ctx)?;
-                    anyhow::Ok(())
-                };
-                match doit.await {
-                    Ok(()) => {}
+                match tenant_clone.load(&ctx).await {
+                    Ok(()) => {
+                        info!("load finished, activating");
+                        tenant_clone.activate(broker_client, &ctx);
+                    }
                     Err(err) => {
-                        tenant_clone.set_broken(err.to_string()).await;
-                        error!("could not load tenant {tenant_id}: {err:?}");
+                        error!("load failed, setting tenant state to Broken: {err:?}");
+                        tenant_clone.state.send_modify(|state| {
+                            assert_eq!(*state, TenantState::Loading, "the loading task owns the tenant state until activation is complete");
+                            *state = TenantState::broken_from_reason(err.to_string());
+                        });
                     }
                 }
-                info!("initial load for tenant {tenant_id} finished!");
-                Ok(())
+
+                Ok(())
             }
             .instrument({
                 let span = tracing::info_span!(parent: None, "load", tenant_id=%tenant_id);
@@ -923,6 +926,7 @@ impl Tenant {
     /// Background task to load in-memory data structures for this tenant, from
     /// files on disk. Used at pageserver startup.
     ///
+    /// No background tasks are started as part of this routine.
     async fn load(self: &Arc<Tenant>, ctx: &RequestContext) -> anyhow::Result<()> {
         debug_assert_current_span_has_tenant_id();
 
@@ -1613,37 +1617,16 @@ impl Tenant {
     }
 
     /// Changes tenant status to active, unless shutdown was already requested.
-    fn activate(
-        self: &Arc<Self>,
-        broker_client: BrokerClientChannel,
-        ctx: &RequestContext,
-    ) -> anyhow::Result<()> {
+    fn activate(self: &Arc<Self>, broker_client: BrokerClientChannel, ctx: &RequestContext) {
         debug_assert_current_span_has_tenant_id();
 
-        let mut result = Ok(());
         let mut activating = false;
         self.state.send_modify(|current_state| {
             match &*current_state {
-                TenantState::Activating => {
-                    // activate() was called on an already Activating tenant. Shouldn't happen.
-                    result = Err(anyhow::anyhow!("Tenant is already activating"));
+                TenantState::Activating | TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping => {
+                    panic!("caller is responsible for calling activate() only on Loading / Attaching tenants, got {state:?}", state = current_state);
                 }
-                TenantState::Active => {
-                    // activate() was called on an already Active tenant. Shouldn't happen.
-                    result = Err(anyhow::anyhow!("Tenant is already active"));
-                }
-                TenantState::Broken { reason, .. } => {
-                    // This shouldn't happen either
-                    result = Err(anyhow::anyhow!(
-                        "Could not activate tenant because it is in broken state due to: {reason}",
-                    ));
-                }
-                TenantState::Stopping => {
-                    // The tenant was detached, or system shutdown was requested, while we were
-                    // loading or attaching the tenant.
-                    info!("Tenant is already in Stopping state, skipping activation");
-                }
-                TenantState::Loading | TenantState::Attaching => {
+                TenantState::Loading | TenantState::Attaching => {
                     *current_state = TenantState::Activating;
                     debug!(tenant_id = %self.tenant_id, "Activating tenant");
                     activating = true;
@@ -1652,10 +1635,6 @@ impl Tenant {
                 }
             }
         });
-        if let Err(e) = result {
-            assert!(!activating, "transition into Activating is infallible");
-            return Err(e);
-        }
 
         if activating {
             let timelines_accessor = self.timelines.lock().unwrap();
@@ -1696,42 +1675,47 @@ impl Tenant {
                     "activation attempt finished"
                 );
             });
-        };
-        Ok(())
+        }
     }
 
     /// Change tenant status to Stopping, to mark that it is being shut down.
     ///
+    /// This function waits for any in-progress load/attach/activation to finish
+    /// before transitioning the tenant into Stopping state.
+    ///
     /// This function is not cancel-safe!
     pub async fn set_stopping(&self) {
-        // `Activating` is a transient state during which no external state transitions are supported.
         let mut rx = self.state.subscribe();
-        rx.wait_for(|state| state != TenantState::Activating)
-            .await
-            .expect("cannot drop self.state while on a &self method");
+
+        // we cannot stop while load/attach/activation is still running, so wait for it to finish
+        rx.wait_for(|state| match state {
+            TenantState::Activating | TenantState::Loading | TenantState::Attaching => false, // TODO log that we're waiting
+            TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping {} => true,
+        })
+        .await
+        .expect("cannot drop self.state while on a &self method");
+
+        // we're done activating; see whether this task wins the race to transition into Stopping
         let mut stopping = false;
-        self.state.send_modify(|current_state| {
-            match current_state {
-                TenantState::Activating => unreachable!("we checked above and never transition back into Activating state"),
-                // FIXME: If the tenant is still Loading or Attaching, new timelines
-                // might be created after this. That's harmless, as the Timelines
-                // won't be accessible to anyone, when the Tenant is in Stopping
-                // state.
-                TenantState::Active | TenantState::Loading | TenantState::Attaching => {
-                    *current_state = TenantState::Stopping;
-                    stopping = true;
-                    // Continue outside the closure. We need to grab timelines.lock()
-                    // and we plan to turn it into a tokio::sync::Mutex in a future patch.
-                }
-                TenantState::Broken { reason, .. } => {
-                    info!("Cannot set tenant to Stopping state, it is in Broken state due to: {reason}");
-                }
-                TenantState::Stopping => {
-                    // The tenant was detached, or system shutdown was requested, while we were
-                    // loading or attaching the tenant.
-                    info!("Tenant is already in Stopping state");
-                }
+        self.state.send_modify(|current_state| match current_state {
+            TenantState::Activating | TenantState::Loading | TenantState::Attaching => {
+                unreachable!("we ensured above that we're done with activation, and there is no re-activation")
+            }
+            TenantState::Active => {
+                // FIXME: due to time-of-check vs time-of-use issues, it can happen that new timelines
+                // are created after the transition to Stopping. That's harmless, as the Timelines
+                // won't be accessible to anyone afterwards, because the Tenant is in Stopping state.
+                *current_state = TenantState::Stopping;
+                stopping = true;
+                // Continue stopping outside the closure. We need to grab timelines.lock()
+                // and we plan to turn it into a tokio::sync::Mutex in a future patch.
+            }
+            TenantState::Broken { reason, .. } => {
+                info!(
+                    "Cannot set tenant to Stopping state, it is in Broken state due to: {reason}"
+                );
+            }
+            TenantState::Stopping => {
+                info!("Tenant is already in Stopping state");
             }
         });
@@ -1746,43 +1730,50 @@ impl Tenant {
         }
     }
 
-    pub async fn set_broken(&self, reason: String) {
-        // `Activating` is a transient state during which no external state transitions are supported.
+    /// Method for tenant::mgr to transition us into Broken state in case of a late failure in
+    /// `remove_tenant_from_memory`.
+    ///
+    /// This function waits for any in-progress load/attach/activation to finish
+    /// before transitioning the tenant into Broken state.
+    ///
+    /// In tests, we also use this to set tenants to Broken state on purpose.
+    pub(crate) async fn set_broken(&self, reason: String) {
        let mut rx = self.state.subscribe();
-        rx.wait_for(|state| state != TenantState::Activating)
-            .await
-            .expect("cannot drop self.state while on a &self method");
+
+        // The load & attach routines own the tenant state until it has reached `Active`.
+        // So, wait until it's done.
+        rx.wait_for(|state| match state {
+            TenantState::Activating | TenantState::Loading | TenantState::Attaching => false, // TODO log that we're waiting
+            TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping {} => true,
+        })
+        .await
+        .expect("cannot drop self.state while on a &self method");
+
+        // we're done activating; see whether this task wins the race to transition into Broken
        self.state.send_modify(|current_state| {
            match *current_state {
-                TenantState::Activating => {
-                    unreachable!("we checked above and never transition back into Activating state")
+                TenantState::Activating | TenantState::Loading | TenantState::Attaching => {
+                    unreachable!("we ensured above that we're done with activation, and there is no re-activation")
                 }
                 TenantState::Active => {
-                    // Broken tenants can currently only used for fatal errors that happen
-                    // while loading or attaching a tenant. A tenant that has already been
-                    // activated should never be marked as broken. We cope with it the best
-                    // we can, but it shouldn't happen.
-                    warn!("Changing Active tenant to Broken state, reason: {}", reason);
-                    *current_state = TenantState::broken_from_reason(reason);
+                    if cfg!(feature = "testing") {
+                        warn!("Changing Active tenant to Broken state, reason: {}", reason);
+                        *current_state = TenantState::broken_from_reason(reason);
+                    } else {
+                        unreachable!("not allowed to call set_broken on Active tenants in non-testing builds")
+                    }
                 }
                 TenantState::Broken { .. } => {
-                    // This shouldn't happen either
                     warn!("Tenant is already in Broken state");
                 }
+                // This is the only "expected" path, any other path is a bug.
                TenantState::Stopping => {
-                    // This shouldn't happen either
                     warn!(
                         "Marking Stopping tenant as Broken state, reason: {}",
                         reason
                     );
                     *current_state = TenantState::broken_from_reason(reason);
                 }
-                TenantState::Loading | TenantState::Attaching => {
-                    info!("Setting tenant as Broken state, reason: {}", reason);
-                    *current_state = TenantState::broken_from_reason(reason);
-                }
-            }
+            }
         });
     }
diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs
index a96364ba78..191ecf1008 100644
--- a/pageserver/src/tenant/mgr.rs
+++ b/pageserver/src/tenant/mgr.rs
@@ -246,11 +246,9 @@ pub async fn shutdown_all_tenants() {
     let mut tenants_to_freeze_and_flush = Vec::with_capacity(tenants_to_shut_down.len());
     for (_, tenant) in tenants_to_shut_down {
-        if tenant.is_active() {
-            // updates tenant state, forbidding new GC and compaction iterations from starting
-            tenant.set_stopping().await;
-            tenants_to_freeze_and_flush.push(tenant);
-        }
+        // updates tenant state, forbidding new GC and compaction iterations from starting
+        tenant.set_stopping().await;
+        tenants_to_freeze_and_flush.push(tenant);
     }
 
     // Shut down all existing walreceiver connections and stop accepting the new ones.
diff --git a/test_runner/regress/test_broken_timeline.py b/test_runner/regress/test_broken_timeline.py
index fb592bfbc3..0fb3b4f262 100644
--- a/test_runner/regress/test_broken_timeline.py
+++ b/test_runner/regress/test_broken_timeline.py
@@ -20,7 +20,7 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
             ".*is not active. Current state: Broken.*",
             ".*will not become active. Current state: Broken.*",
             ".*failed to load metadata.*",
-            ".*could not load tenant.*load local timeline.*",
+            ".*load failed.*load local timeline.*",
         ]
     )
diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py
index 02f1aac99c..aefc8befeb 100644
--- a/test_runner/regress/test_remote_storage.py
+++ b/test_runner/regress/test_remote_storage.py
@@ -140,7 +140,7 @@ def test_remote_storage_backup_and_restore(
     # This is before the failures injected by test_remote_failures, so it's a permanent error.
     pageserver_http.configure_failpoints(("storage-sync-list-remote-timelines", "return"))
     env.pageserver.allowed_errors.append(
-        ".*error attaching tenant: storage-sync-list-remote-timelines",
+        ".*attach failed.*: storage-sync-list-remote-timelines",
     )
     # Attach it. This HTTP request will succeed and launch a
     # background task to load the tenant. In that background task,
diff --git a/test_runner/regress/test_tenant_detach.py b/test_runner/regress/test_tenant_detach.py
index 82664cff94..f5e0e34bc9 100644
--- a/test_runner/regress/test_tenant_detach.py
+++ b/test_runner/regress/test_tenant_detach.py
@@ -647,7 +647,9 @@ def test_ignored_tenant_stays_broken_without_metadata(
             metadata_removed = True
     assert metadata_removed, f"Failed to find metadata file in {tenant_timeline_dir}"
 
-    env.pageserver.allowed_errors.append(".*could not load tenant .*?: failed to load metadata.*")
+    env.pageserver.allowed_errors.append(
+        f".*{tenant_id}.*: load failed.*: failed to load metadata.*"
+    )
 
     # now, load it from the local files and expect it to be broken due to inability to load tenant files into memory
     pageserver_http.tenant_load(tenant_id=tenant_id)
diff --git a/test_runner/regress/test_tenants.py b/test_runner/regress/test_tenants.py
index 5642449ce6..07da6a8145 100644
--- a/test_runner/regress/test_tenants.py
+++ b/test_runner/regress/test_tenants.py
@@ -22,6 +22,7 @@ from fixtures.neon_fixtures import (
     available_remote_storages,
 )
 from fixtures.types import Lsn, TenantId, TimelineId
+from fixtures.utils import wait_until
 from prometheus_client.samples import Sample
 
@@ -308,9 +309,7 @@ def test_pageserver_with_empty_tenants(
     env.pageserver.allowed_errors.append(
         ".*marking .* as locally complete, while it doesnt exist in remote index.*"
     )
-    env.pageserver.allowed_errors.append(
-        ".*could not load tenant.*Failed to list timelines directory.*"
-    )
+    env.pageserver.allowed_errors.append(".*load failed.*Failed to list timelines directory.*")
 
     client = env.pageserver.http_client()
 
@@ -340,9 +339,15 @@ def test_pageserver_with_empty_tenants(
     env.pageserver.start()
     client = env.pageserver.http_client()
 
-    tenants = client.tenant_list()
-    assert len(tenants) == 2
+    def not_loading():
+        tenants = client.tenant_list()
+        assert len(tenants) == 2
+        assert all(t["state"]["slug"] != "Loading" for t in tenants)
+
+    wait_until(10, 0.2, not_loading)
 
+    tenants = client.tenant_list()
     [broken_tenant] = [t for t in tenants if t["id"] == str(tenant_without_timelines_dir)]
 
     assert (
         broken_tenant_status["state"]["slug"] == "Broken"
     ), f"Tenant {tenant_without_timelines_dir} without timelines dir should be broken"
 
-    assert env.pageserver.log_contains(".*Setting tenant as Broken state, reason:.*")
-
     [loaded_tenant] = [t for t in tenants if t["id"] == str(tenant_with_empty_timelines_dir)]
     assert (
         loaded_tenant["state"]["slug"] == "Active"
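
For context on the mgr.rs hunk above, here is a hypothetical sketch (stand-in types, not the pageserver's) of why shutdown_all_tenants can now call set_stopping() unconditionally instead of filtering on is_active(): set_stopping() itself serializes against any in-flight load/attach, which is exactly the race the old check papered over.

```rust
// Hypothetical sketch of the shutdown ordering that the mgr.rs hunk relies on.
use std::sync::Arc;

struct Tenant;

impl Tenant {
    async fn set_stopping(&self) {
        // In the real code: wait_for(Active | Broken | Stopping), then
        // send_modify; this forbids new GC/compaction iterations from starting.
    }

    async fn freeze_and_flush(&self) {
        // Flush in-memory layer contents to disk before process exit.
    }
}

async fn shutdown_all_tenants(tenants_to_shut_down: Vec<Arc<Tenant>>) {
    let mut tenants_to_freeze_and_flush = Vec::with_capacity(tenants_to_shut_down.len());
    for tenant in tenants_to_shut_down {
        // No is_active() filter: a tenant still Loading/Attaching is waited
        // out by set_stopping() instead of being skipped (the old race).
        tenant.set_stopping().await;
        tenants_to_freeze_and_flush.push(tenant);
    }
    for tenant in tenants_to_freeze_and_flush {
        tenant.freeze_and_flush().await;
    }
}

#[tokio::main]
async fn main() {
    let tenants = vec![Arc::new(Tenant), Arc::new(Tenant)];
    shutdown_all_tenants(tenants).await;
}
```
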