diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index dd8e91bd51..e75d9f0d26 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -1605,7 +1605,7 @@ impl Tenant { } /// Changes tenant status to active, unless shutdown was already requested. - fn activate(&self, ctx: &RequestContext) -> anyhow::Result<()> { + fn activate(self: &Arc<Self>, ctx: &RequestContext) -> anyhow::Result<()> { debug_assert_current_span_has_tenant_id(); let mut result = Ok(()); @@ -1638,7 +1638,7 @@ impl Tenant { // Spawn gc and compaction loops. The loops will shut themselves // down when they notice that the tenant is inactive. - tasks::start_background_loops(self.tenant_id); + tasks::start_background_loops(self); let mut activated_timelines = 0; let mut timelines_broken_during_activation = 0; diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index 6bf26f1da1..b3c8a4a3bb 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -9,13 +9,12 @@ use crate::context::{DownloadBehavior, RequestContext}; use crate::metrics::TENANT_TASK_EVENTS; use crate::task_mgr; use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME}; -use crate::tenant::mgr; use crate::tenant::{Tenant, TenantState}; use tokio_util::sync::CancellationToken; use tracing::*; -use utils::id::TenantId; -pub fn start_background_loops(tenant_id: TenantId) { +pub fn start_background_loops(tenant: &Arc<Tenant>) { + let tenant_id = tenant.tenant_id; task_mgr::spawn( BACKGROUND_RUNTIME.handle(), TaskKind::Compaction, @@ -23,11 +22,14 @@ pub fn start_background_loops(tenant_id: TenantId) { None, &format!("compactor for tenant {tenant_id}"), false, - async move { - compaction_loop(tenant_id) - .instrument(info_span!("compaction_loop", tenant_id = %tenant_id)) - .await; - Ok(()) + { + let tenant = Arc::clone(tenant); + async move { + compaction_loop(tenant) + .instrument(info_span!("compaction_loop", tenant_id = %tenant_id)) + .await; + Ok(()) + } }, ); task_mgr::spawn( @@ 
-37,11 +39,14 @@ pub fn start_background_loops(tenant_id: TenantId) { None, &format!("garbage collector for tenant {tenant_id}"), false, - async move { - gc_loop(tenant_id) - .instrument(info_span!("gc_loop", tenant_id = %tenant_id)) - .await; - Ok(()) + { + let tenant = Arc::clone(tenant); + async move { + gc_loop(tenant) + .instrument(info_span!("gc_loop", tenant_id = %tenant_id)) + .await; + Ok(()) + } }, ); } @@ -49,7 +54,7 @@ pub fn start_background_loops(tenant_id: TenantId) { /// /// Compaction task's main loop /// -async fn compaction_loop(tenant_id: TenantId) { +async fn compaction_loop(tenant: Arc<Tenant>) { let wait_duration = Duration::from_secs(2); info!("starting"); TENANT_TASK_EVENTS.with_label_values(&["start"]).inc(); @@ -60,16 +65,16 @@ async fn compaction_loop(tenant_id: TenantId) { loop { trace!("waking up"); - let tenant = tokio::select! { + tokio::select! { _ = cancel.cancelled() => { info!("received cancellation request"); return; }, - tenant_wait_result = wait_for_active_tenant(tenant_id, wait_duration) => match tenant_wait_result { + tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result { ControlFlow::Break(()) => return, - ControlFlow::Continue(tenant) => tenant, + ControlFlow::Continue(()) => (), }, - }; + } let period = tenant.get_compaction_period(); @@ -119,7 +124,7 @@ async fn compaction_loop(tenant_id: TenantId) { /// /// GC task's main loop /// -async fn gc_loop(tenant_id: TenantId) { +async fn gc_loop(tenant: Arc<Tenant>) { let wait_duration = Duration::from_secs(2); info!("starting"); TENANT_TASK_EVENTS.with_label_values(&["start"]).inc(); @@ -127,21 +132,22 @@ async fn gc_loop(tenant_id: TenantId) { let cancel = task_mgr::shutdown_token(); // GC might require downloading, to find the cutoff LSN that corresponds to the // cutoff specified as time. 
- let ctx = RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download); + let ctx = + RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download); let mut first = true; loop { trace!("waking up"); - let tenant = tokio::select! { + tokio::select! { _ = cancel.cancelled() => { info!("received cancellation request"); return; }, - tenant_wait_result = wait_for_active_tenant(tenant_id, wait_duration) => match tenant_wait_result { + tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result { ControlFlow::Break(()) => return, - ControlFlow::Continue(tenant) => tenant, + ControlFlow::Continue(()) => (), }, - }; + } let period = tenant.get_gc_period(); @@ -161,7 +167,9 @@ async fn gc_loop(tenant_id: TenantId) { Duration::from_secs(10) } else { // Run gc - let res = tenant.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &ctx).await; + let res = tenant + .gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &ctx) + .await; if let Err(e) = res { error!("Gc failed, retrying in {:?}: {e:?}", wait_duration); wait_duration @@ -187,23 +195,10 @@ async fn gc_loop(tenant_id: TenantId) { trace!("GC loop stopped."); } -async fn wait_for_active_tenant( - tenant_id: TenantId, - wait: Duration, -) -> ControlFlow<(), Arc<Tenant>> { - let tenant = loop { - match mgr::get_tenant(tenant_id, false).await { - Ok(tenant) => break tenant, - Err(e) => { - error!("Failed to get a tenant {tenant_id}: {e:#}"); - tokio::time::sleep(wait).await; - } - } - }; - +async fn wait_for_active_tenant(tenant: &Arc<Tenant>) -> ControlFlow<()> { // if the tenant has a proper status already, no need to wait for anything if tenant.current_state() == TenantState::Active { - ControlFlow::Continue(tenant) + ControlFlow::Continue(()) } else { let mut tenant_state_updates = tenant.subscribe_for_state_updates(); loop { @@ -213,7 +208,7 @@ async fn wait_for_active_tenant( match new_state { TenantState::Active => { debug!("Tenant state changed to 
active, continuing the task loop"); - return ControlFlow::Continue(tenant); + return ControlFlow::Continue(()); } state => { debug!("Not running the task loop, tenant is not active: {state:?}"); diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 59afc104e6..3ff5429616 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1603,8 +1603,6 @@ class NeonPageserver(PgProtocol): # https://github.com/neondatabase/neon/issues/2442 ".*could not remove ephemeral file.*No such file or directory.*", # FIXME: These need investigation - ".*gc_loop.*Failed to get a tenant .* Tenant .* not found.*", - ".*compaction_loop.*Failed to get a tenant .* Tenant .* not found.*", ".*manual_gc.*is_shutdown_requested\\(\\) called in an unexpected task or thread.*", ".*tenant_list: timeline is not found in remote index while it is present in the tenants registry.*", ".*Removing intermediate uninit mark file.*",