From 21c444e85fa517d8085289296056b42da9f39161 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Mon, 22 May 2023 17:34:16 +0200 Subject: [PATCH] tenant loops: operate on the Arc directly (Instead of going through mgr every iteration.) The `wait_for_active_tenant` function's `wait` argument could be removed because it was only used for the loop that waits for the tenant to show up in the tenants map. Since we're passing the tenant in, we no longer need to get it from the tenants map. NB that there's no guarantee that the tenant object is in the tenants map at the time the background loop function starts running. But the tenant mgr guarantees that it will be quite soon. See `tenant_map_insert` way upwards in the call hierarchy for details. This is prep work to eliminate `subscribe_for_state_updates`, which I'm exploring as an alternative to https://github.com/neondatabase/neon/pull/4291 So, it's part of the https://github.com/orgs/neondatabase/projects/38 (async get_value_reconstruct_data) --- pageserver/src/tenant.rs | 4 +- pageserver/src/tenant/tasks.rs | 77 ++++++++++++++++------------------ 2 files changed, 38 insertions(+), 43 deletions(-) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 8349e1993f..7348503791 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -1588,7 +1588,7 @@ impl Tenant { } /// Changes tenant status to active, unless shutdown was already requested. - fn activate(&self, ctx: &RequestContext) -> anyhow::Result<()> { + fn activate(self: &Arc, ctx: &RequestContext) -> anyhow::Result<()> { debug_assert_current_span_has_tenant_id(); let mut result = Ok(()); @@ -1621,7 +1621,7 @@ impl Tenant { // Spawn gc and compaction loops. The loops will shut themselves // down when they notice that the tenant is inactive. 
- tasks::start_background_loops(self.tenant_id); + tasks::start_background_loops(self); let mut activated_timelines = 0; let mut timelines_broken_during_activation = 0; diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index 6bf26f1da1..b3c8a4a3bb 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -9,13 +9,12 @@ use crate::context::{DownloadBehavior, RequestContext}; use crate::metrics::TENANT_TASK_EVENTS; use crate::task_mgr; use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME}; -use crate::tenant::mgr; use crate::tenant::{Tenant, TenantState}; use tokio_util::sync::CancellationToken; use tracing::*; -use utils::id::TenantId; -pub fn start_background_loops(tenant_id: TenantId) { +pub fn start_background_loops(tenant: &Arc) { + let tenant_id = tenant.tenant_id; task_mgr::spawn( BACKGROUND_RUNTIME.handle(), TaskKind::Compaction, @@ -23,11 +22,14 @@ pub fn start_background_loops(tenant_id: TenantId) { None, &format!("compactor for tenant {tenant_id}"), false, - async move { - compaction_loop(tenant_id) - .instrument(info_span!("compaction_loop", tenant_id = %tenant_id)) - .await; - Ok(()) + { + let tenant = Arc::clone(tenant); + async move { + compaction_loop(tenant) + .instrument(info_span!("compaction_loop", tenant_id = %tenant_id)) + .await; + Ok(()) + } }, ); task_mgr::spawn( @@ -37,11 +39,14 @@ pub fn start_background_loops(tenant_id: TenantId) { None, &format!("garbage collector for tenant {tenant_id}"), false, - async move { - gc_loop(tenant_id) - .instrument(info_span!("gc_loop", tenant_id = %tenant_id)) - .await; - Ok(()) + { + let tenant = Arc::clone(tenant); + async move { + gc_loop(tenant) + .instrument(info_span!("gc_loop", tenant_id = %tenant_id)) + .await; + Ok(()) + } }, ); } @@ -49,7 +54,7 @@ pub fn start_background_loops(tenant_id: TenantId) { /// /// Compaction task's main loop /// -async fn compaction_loop(tenant_id: TenantId) { +async fn compaction_loop(tenant: Arc) { let wait_duration = 
Duration::from_secs(2); info!("starting"); TENANT_TASK_EVENTS.with_label_values(&["start"]).inc(); @@ -60,16 +65,16 @@ async fn compaction_loop(tenant_id: TenantId) { loop { trace!("waking up"); - let tenant = tokio::select! { + tokio::select! { _ = cancel.cancelled() => { info!("received cancellation request"); return; }, - tenant_wait_result = wait_for_active_tenant(tenant_id, wait_duration) => match tenant_wait_result { + tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result { ControlFlow::Break(()) => return, - ControlFlow::Continue(tenant) => tenant, + ControlFlow::Continue(()) => (), }, - }; + } let period = tenant.get_compaction_period(); @@ -119,7 +124,7 @@ async fn compaction_loop(tenant_id: TenantId) { /// /// GC task's main loop /// -async fn gc_loop(tenant_id: TenantId) { +async fn gc_loop(tenant: Arc) { let wait_duration = Duration::from_secs(2); info!("starting"); TENANT_TASK_EVENTS.with_label_values(&["start"]).inc(); @@ -127,21 +132,22 @@ async fn gc_loop(tenant_id: TenantId) { let cancel = task_mgr::shutdown_token(); // GC might require downloading, to find the cutoff LSN that corresponds to the // cutoff specified as time. - let ctx = RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download); + let ctx = + RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download); let mut first = true; loop { trace!("waking up"); - let tenant = tokio::select! { + tokio::select! 
{ _ = cancel.cancelled() => { info!("received cancellation request"); return; }, - tenant_wait_result = wait_for_active_tenant(tenant_id, wait_duration) => match tenant_wait_result { + tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result { ControlFlow::Break(()) => return, - ControlFlow::Continue(tenant) => tenant, + ControlFlow::Continue(()) => (), }, - }; + } let period = tenant.get_gc_period(); @@ -161,7 +167,9 @@ async fn gc_loop(tenant_id: TenantId) { Duration::from_secs(10) } else { // Run gc - let res = tenant.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &ctx).await; + let res = tenant + .gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &ctx) + .await; if let Err(e) = res { error!("Gc failed, retrying in {:?}: {e:?}", wait_duration); wait_duration @@ -187,23 +195,10 @@ async fn gc_loop(tenant_id: TenantId) { trace!("GC loop stopped."); } -async fn wait_for_active_tenant( - tenant_id: TenantId, - wait: Duration, -) -> ControlFlow<(), Arc> { - let tenant = loop { - match mgr::get_tenant(tenant_id, false).await { - Ok(tenant) => break tenant, - Err(e) => { - error!("Failed to get a tenant {tenant_id}: {e:#}"); - tokio::time::sleep(wait).await; - } - } - }; - +async fn wait_for_active_tenant(tenant: &Arc) -> ControlFlow<()> { // if the tenant has a proper status already, no need to wait for anything if tenant.current_state() == TenantState::Active { - ControlFlow::Continue(tenant) + ControlFlow::Continue(()) } else { let mut tenant_state_updates = tenant.subscribe_for_state_updates(); loop { @@ -213,7 +208,7 @@ async fn wait_for_active_tenant( match new_state { TenantState::Active => { debug!("Tenant state changed to active, continuing the task loop"); - return ControlFlow::Continue(tenant); + return ControlFlow::Continue(()); } state => { debug!("Not running the task loop, tenant is not active: {state:?}");