From 7cfd116856d07ab34a63be6c221cc7f9ffd92a58 Mon Sep 17 00:00:00 2001
From: John Spray
Date: Mon, 30 Sep 2024 09:27:28 +0100
Subject: [PATCH] pageserver: refactor immediate_gc into TenantManager (#9183)

## Problem

Legacy functions that were called as `mgr::` and relied on the static TENANTS, see #5796

## Summary of changes

- Move the last stray function (immediate_gc) into TenantManager

Closes: https://github.com/neondatabase/neon/issues/5796
---
 pageserver/src/http/routes.rs |   9 +-
 pageserver/src/tenant/mgr.rs  | 155 +++++++++++++++++-----------------
 2 files changed, 85 insertions(+), 79 deletions(-)

diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index 6f0402e7b0..1cc5502bd6 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -56,6 +56,7 @@ use utils::http::endpoint::request_span;
 use utils::http::request::must_parse_query_param;
 use utils::http::request::{get_request_param, must_get_query_param, parse_query_param};
 
+use crate::config::PageServerConf;
 use crate::context::{DownloadBehavior, RequestContext};
 use crate::deletion_queue::DeletionQueueClient;
 use crate::pgdatadir_mapping::LsnForTimestamp;
@@ -80,7 +81,6 @@ use crate::tenant::timeline::CompactionError;
 use crate::tenant::timeline::Timeline;
 use crate::tenant::GetTimelineError;
 use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError};
-use crate::{config::PageServerConf, tenant::mgr};
 use crate::{disk_usage_eviction_task, tenant};
 use pageserver_api::models::{
     StatusResponse, TenantConfigRequest, TenantInfo, TimelineCreateRequest, TimelineGcRequest,
@@ -1719,8 +1719,13 @@ async fn timeline_gc_handler(
 
     let gc_req: TimelineGcRequest = json_request(&mut request).await?;
 
+    let state = get_state(&request);
+
     let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
-    let gc_result = mgr::immediate_gc(tenant_shard_id, timeline_id, gc_req, cancel, &ctx).await?;
+    let gc_result = state
+        .tenant_manager
+        .immediate_gc(tenant_shard_id, timeline_id, gc_req, cancel, &ctx)
+        .await?;
 
     json_response(StatusCode::OK, gc_result)
 }
diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs
index c7212e89ba..9d9852c525 100644
--- a/pageserver/src/tenant/mgr.rs
+++ b/pageserver/src/tenant/mgr.rs
@@ -2197,6 +2197,82 @@ impl TenantManager {
 
         Ok((wanted_bytes, shard_count as u32))
     }
+
+    #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id))]
+    pub(crate) async fn immediate_gc(
+        &self,
+        tenant_shard_id: TenantShardId,
+        timeline_id: TimelineId,
+        gc_req: TimelineGcRequest,
+        cancel: CancellationToken,
+        ctx: &RequestContext,
+    ) -> Result<GcResult, ApiError> {
+        let tenant = {
+            let guard = self.tenants.read().unwrap();
+            guard
+                .get(&tenant_shard_id)
+                .cloned()
+                .with_context(|| format!("tenant {tenant_shard_id}"))
+                .map_err(|e| ApiError::NotFound(e.into()))?
+        };
+
+        let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
+        // Use tenant's pitr setting
+        let pitr = tenant.get_pitr_interval();
+
+        tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
+
+        // Run in task_mgr to avoid race with tenant_detach operation
+        let ctx: RequestContext =
+            ctx.detached_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
+
+        let _gate_guard = tenant.gate.enter().map_err(|_| ApiError::ShuttingDown)?;
+
+        fail::fail_point!("immediate_gc_task_pre");
+
+        #[allow(unused_mut)]
+        let mut result = tenant
+            .gc_iteration(Some(timeline_id), gc_horizon, pitr, &cancel, &ctx)
+            .await;
+        // FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
+        // better once the types support it.
+
+        #[cfg(feature = "testing")]
+        {
+            // we need to synchronize with drop completion for python tests without polling for
+            // log messages
+            if let Ok(result) = result.as_mut() {
+                let mut js = tokio::task::JoinSet::new();
+                for layer in std::mem::take(&mut result.doomed_layers) {
+                    js.spawn(layer.wait_drop());
+                }
+                tracing::info!(
+                    total = js.len(),
+                    "starting to wait for the gc'd layers to be dropped"
+                );
+                while let Some(res) = js.join_next().await {
+                    res.expect("wait_drop should not panic");
+                }
+            }
+
+            let timeline = tenant.get_timeline(timeline_id, false).ok();
+            let rtc = timeline.as_ref().map(|x| &x.remote_client);
+
+            if let Some(rtc) = rtc {
+                // layer drops schedule actions on remote timeline client to actually do the
+                // deletions; don't care about the shutdown error, just exit fast
+                drop(rtc.wait_completion().await);
+            }
+        }
+
+        result.map_err(|e| match e {
+            GcError::TenantCancelled | GcError::TimelineCancelled => ApiError::ShuttingDown,
+            GcError::TimelineNotFound => {
+                ApiError::NotFound(anyhow::anyhow!("Timeline not found").into())
+            }
+            other => ApiError::InternalServerError(anyhow::anyhow!(other)),
+        })
+    }
 }
 
 #[derive(Debug, thiserror::Error)]
@@ -2341,7 +2417,7 @@ enum TenantSlotDropError {
 /// Errors that can happen any time we are walking the tenant map to try and acquire
 /// the TenantSlot for a particular tenant.
 #[derive(Debug, thiserror::Error)]
-pub enum TenantMapError {
+pub(crate) enum TenantMapError {
     // Tried to read while initializing
     #[error("tenant map is still initializing")]
     StillInitializing,
@@ -2371,7 +2447,7 @@ pub enum TenantMapError {
 /// The `old_value` may be dropped before the SlotGuard is dropped, by calling
 /// `drop_old_value`. It is an error to call this without shutting down
 /// the conents of `old_value`.
-pub struct SlotGuard {
+pub(crate) struct SlotGuard {
     tenant_shard_id: TenantShardId,
     old_value: Option<TenantSlot>,
     upserted: bool,
@@ -2764,81 +2840,6 @@ use {
     utils::http::error::ApiError,
 };
 
-#[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id))]
-pub(crate) async fn immediate_gc(
-    tenant_shard_id: TenantShardId,
-    timeline_id: TimelineId,
-    gc_req: TimelineGcRequest,
-    cancel: CancellationToken,
-    ctx: &RequestContext,
-) -> Result<GcResult, ApiError> {
-    let tenant = {
-        let guard = TENANTS.read().unwrap();
-        guard
-            .get(&tenant_shard_id)
-            .cloned()
-            .with_context(|| format!("tenant {tenant_shard_id}"))
-            .map_err(|e| ApiError::NotFound(e.into()))?
-    };
-
-    let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
-    // Use tenant's pitr setting
-    let pitr = tenant.get_pitr_interval();
-
-    tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
-
-    // Run in task_mgr to avoid race with tenant_detach operation
-    let ctx: RequestContext =
-        ctx.detached_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
-
-    let _gate_guard = tenant.gate.enter().map_err(|_| ApiError::ShuttingDown)?;
-
-    fail::fail_point!("immediate_gc_task_pre");
-
-    #[allow(unused_mut)]
-    let mut result = tenant
-        .gc_iteration(Some(timeline_id), gc_horizon, pitr, &cancel, &ctx)
-        .await;
-    // FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
-    // better once the types support it.
-
-    #[cfg(feature = "testing")]
-    {
-        // we need to synchronize with drop completion for python tests without polling for
-        // log messages
-        if let Ok(result) = result.as_mut() {
-            let mut js = tokio::task::JoinSet::new();
-            for layer in std::mem::take(&mut result.doomed_layers) {
-                js.spawn(layer.wait_drop());
-            }
-            tracing::info!(
-                total = js.len(),
-                "starting to wait for the gc'd layers to be dropped"
-            );
-            while let Some(res) = js.join_next().await {
-                res.expect("wait_drop should not panic");
-            }
-        }
-
-        let timeline = tenant.get_timeline(timeline_id, false).ok();
-        let rtc = timeline.as_ref().map(|x| &x.remote_client);
-
-        if let Some(rtc) = rtc {
-            // layer drops schedule actions on remote timeline client to actually do the
-            // deletions; don't care about the shutdown error, just exit fast
-            drop(rtc.wait_completion().await);
-        }
-    }
-
-    result.map_err(|e| match e {
-        GcError::TenantCancelled | GcError::TimelineCancelled => ApiError::ShuttingDown,
-        GcError::TimelineNotFound => {
-            ApiError::NotFound(anyhow::anyhow!("Timeline not found").into())
-        }
-        other => ApiError::InternalServerError(anyhow::anyhow!(other)),
-    })
-}
-
 #[cfg(test)]
 mod tests {
     use std::collections::BTreeMap;
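
The patch above follows a simple pattern: a free function that consulted a process-wide static (`TENANTS`) becomes a method on the struct that owns that state (`TenantManager`), and callers reach it through the handle they already hold in request state. The sketch below illustrates that shape with hypothetical names (`Registry`, `lookup`, `handler_state`); it is a minimal stand-in under those assumptions, not the pageserver code itself.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for a manager that owns shared state (the way
/// TenantManager owns the tenant map), instead of the state living in a
/// global static.
struct Registry {
    items: RwLock<HashMap<u32, String>>,
}

impl Registry {
    fn new() -> Self {
        Registry {
            items: RwLock::new(HashMap::new()),
        }
    }

    /// After the refactor, look-ups go through `&self` rather than a global,
    /// which keeps the dependency explicit and testable.
    fn lookup(&self, id: u32) -> Option<String> {
        self.items.read().unwrap().get(&id).cloned()
    }
}

fn main() {
    let registry = Arc::new(Registry::new());
    registry
        .items
        .write()
        .unwrap()
        .insert(1, "tenant-1".to_string());

    // The "request state" holds a shared handle to the manager, mirroring how
    // the handler change above calls `state.tenant_manager.immediate_gc(...)`
    // instead of a `mgr::` free function.
    let handler_state = Arc::clone(&registry);
    match handler_state.lookup(1) {
        Some(name) => println!("found {name}"),
        None => println!("not found"),
    }
}
```

One motivation for this shape, consistent with the problem statement above (#5796), is that handlers no longer depend on the `TENANTS` static through `mgr::` free functions; the tenant map is reached only through a `TenantManager` handle.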