From 0baf91fcacb52a61ea69e07546cbdbe5b087f4e5 Mon Sep 17 00:00:00 2001
From: John Spray
Date: Thu, 26 Oct 2023 20:26:20 +0100
Subject: [PATCH] pageserver: include secondary tenants in disk usage eviction

---
 pageserver/src/disk_usage_eviction_task.rs | 125 ++++++++++++++++-----
 pageserver/src/http/routes.rs              |   6 +-
 2 files changed, 101 insertions(+), 30 deletions(-)

diff --git a/pageserver/src/disk_usage_eviction_task.rs b/pageserver/src/disk_usage_eviction_task.rs
index 8a2ea6483a..edaea10dd3 100644
--- a/pageserver/src/disk_usage_eviction_task.rs
+++ b/pageserver/src/disk_usage_eviction_task.rs
@@ -53,15 +53,18 @@ use serde::{Deserialize, Serialize};
 use tokio::time::Instant;
 use tokio_util::sync::CancellationToken;
 use tracing::{debug, error, info, instrument, warn, Instrument};
-use utils::completion;
-use utils::serde_percent::Percent;
+use utils::{
+    completion,
+    id::{TenantId, TenantTimelineId},
+};
+use utils::{id::TimelineId, serde_percent::Percent};
 
 use crate::{
     config::PageServerConf,
     task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
     tenant::{
-        self,
         mgr::TenantManager,
+        secondary::SecondaryTenant,
         storage_layer::{AsLayerDesc, EvictionError, Layer},
         Timeline,
     },
@@ -188,7 +191,8 @@ async fn disk_usage_eviction_task_iteration(
     let tenants_dir = tenant_manager.get_conf().tenants_path();
     let usage_pre = filesystem_level_usage::get(&tenants_dir, task_config)
         .context("get filesystem-level disk usage before evictions")?;
-    let res = disk_usage_eviction_task_iteration_impl(state, usage_pre, cancel).await;
+    let res =
+        disk_usage_eviction_task_iteration_impl(state, usage_pre, tenant_manager, cancel).await;
     match res {
         Ok(outcome) => {
             debug!(?outcome, "disk_usage_eviction_iteration finished");
@@ -273,6 +277,7 @@ struct LayerCount {
 pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
     state: &State,
     usage_pre: U,
+    tenant_manager: &Arc<TenantManager>,
     cancel: &CancellationToken,
 ) -> anyhow::Result<IterationOutcome<U>> {
     // use tokio's mutex to get a Sync guard (instead of std::sync::Mutex)
@@ -292,7 +297,7 @@ pub async fn disk_usage_eviction_task_iteration_impl(
         "running disk usage based eviction due to pressure"
     );
 
-    let candidates = match collect_eviction_candidates(cancel).await? {
+    let candidates = match collect_eviction_candidates(tenant_manager, cancel).await? {
         EvictionCandidates::Cancelled => {
            return Ok(IterationOutcome::Cancelled);
         }
@@ -328,7 +333,13 @@ pub async fn disk_usage_eviction_task_iteration_impl(
     // If we get far enough in the list that we start to evict layers that are below
     // the tenant's min-resident-size threshold, print a warning, and memorize the disk
     // usage at that point, in 'usage_planned_min_resident_size_respecting'.
+
+    // Evictions for attached tenants, batched by timeline
     let mut batched: HashMap<_, Vec<_>> = HashMap::new();
+
+    // Evictions for secondary locations, batched by tenant
+    let mut secondary_by_tenant: HashMap<TenantId, Vec<(TimelineId, Layer)>> = HashMap::new();
+
     let mut warned = None;
     let mut usage_planned = usage_pre;
     let mut max_batch_size = 0;
@@ -351,14 +362,22 @@ pub async fn disk_usage_eviction_task_iteration_impl(
         // FIXME: batching makes no sense anymore because of no layermap locking, should just spawn
         // tasks to evict all seen layers until we have evicted enough
-        let batch = batched.entry(TimelineKey(candidate.timeline)).or_default();
+        match candidate.source {
+            EvictionCandidateSource::Attached(timeline) => {
+                let batch = batched.entry(TimelineKey(timeline)).or_default();
 
-        // semaphore will later be used to limit eviction concurrency, and we can express at
-        // most u32 number of permits. unlikely we would have u32::MAX layers to be evicted,
-        // but fail gracefully by not making batches larger.
-        if batch.len() < u32::MAX as usize {
-            batch.push(candidate.layer);
-            max_batch_size = max_batch_size.max(batch.len());
+                // semaphore will later be used to limit eviction concurrency, and we can express at
+                // most u32 number of permits. unlikely we would have u32::MAX layers to be evicted,
+                // but fail gracefully by not making batches larger.
+                if batch.len() < u32::MAX as usize {
+                    batch.push(candidate.layer);
+                    max_batch_size = max_batch_size.max(batch.len());
+                }
+            }
+            EvictionCandidateSource::Secondary(ttid) => {
+                let batch = secondary_by_tenant.entry(ttid.tenant_id).or_default();
+                batch.push((ttid.timeline_id, candidate.layer));
+            }
         }
     }
 
@@ -374,7 +393,20 @@ pub async fn disk_usage_eviction_task_iteration_impl(
     };
     debug!(?usage_planned, "usage planned");
 
-    // phase2: evict victims batched by timeline
+    // phase2 (secondary tenants): evict victims batched by tenant
+    for (tenant_id, timeline_layers) in secondary_by_tenant {
+        // Q: Why do we go via TenantManager again rather than just deleting files, or keeping
+        // an Arc ref to the secondary state?
+        // A: It's because a given tenant's local storage **belongs** to whoever is currently
+        // live in the TenantManager. We must avoid a race where we plan an eviction for a
+        // secondary location, then execute it when the tenant is actually in an attached state.
+        tenant_manager
+            .evict_tenant_layers(&tenant_id, timeline_layers)
+            .instrument(tracing::info_span!("evict_batch", %tenant_id))
+            .await;
+    }
+
+    // phase2 (attached tenants): evict victims batched by timeline
 
     let mut js = tokio::task::JoinSet::new();
 
@@ -482,9 +514,18 @@ pub async fn disk_usage_eviction_task_iteration_impl(
     }))
 }
 
+// An eviction candidate might originate from either an attached tenant
+// with a [`Tenant`] and [`Timeline`] object, or from a secondary tenant
+// location. These differ in how we will execute the eviction.
+#[derive(Clone)]
+enum EvictionCandidateSource {
+    Attached(Arc<Timeline>),
+    Secondary(TenantTimelineId),
+}
+
 #[derive(Clone)]
 struct EvictionCandidate {
-    timeline: Arc<Timeline>,
+    source: EvictionCandidateSource,
     layer: Layer,
     last_activity_ts: SystemTime,
 }
@@ -534,27 +575,18 @@ enum EvictionCandidates {
 /// after exhausting the `Above` partition.
 /// So, we did not respect each tenant's min_resident_size.
 async fn collect_eviction_candidates(
+    tenant_manager: &Arc<TenantManager>,
     cancel: &CancellationToken,
 ) -> anyhow::Result<EvictionCandidates> {
     // get a snapshot of the list of tenants
-    let tenants = tenant::mgr::list_tenants()
-        .await
-        .context("get list of tenants")?;
-
     let mut candidates = Vec::new();
-    for (tenant_id, _state) in &tenants {
+    let tenants = tenant_manager.get_attached_tenants();
+
+    for tenant in tenants {
         if cancel.is_cancelled() {
             return Ok(EvictionCandidates::Cancelled);
         }
-        let tenant = match tenant::mgr::get_tenant(*tenant_id, true) {
-            Ok(tenant) => tenant,
-            Err(e) => {
-                // this can happen if tenant has lifecycle transition after we fetched it
-                debug!("failed to get tenant: {e:#}");
-                continue;
-            }
-        };
 
         // collect layers from all timelines in this tenant
         //
@@ -617,7 +649,7 @@ async fn collect_eviction_candidates(
         for (timeline, layer_info) in tenant_candidates.into_iter() {
             let file_size = layer_info.file_size();
             let candidate = EvictionCandidate {
-                timeline,
+                source: EvictionCandidateSource::Attached(timeline),
                 last_activity_ts: layer_info.last_activity_ts,
                 layer: layer_info.layer,
             };
@@ -631,6 +663,43 @@ async fn collect_eviction_candidates(
         }
     }
 
+    // FIXME: this is a long loop over all secondary locations. At the least, respect
+    // cancellation here, but really we need to break up the loop. We could extract the
+    // Arc<SecondaryTenant>s and iterate over them with some tokio yields in there. Ideally
+    // though we should just reduce the total amount of work: our eviction goals do not require
+    // listing absolutely every layer in every tenant: we could sample this.
+    tenant_manager.foreach_secondary_tenants(
+        |tenant_id: &TenantId, state: &Arc<SecondaryTenant>| {
+            let mut tenant_candidates = Vec::new();
+            for (timeline_id, layer_info) in state.get_layers_for_eviction() {
+                debug!(tenant_id=%tenant_id, timeline_id=%timeline_id, "timeline resident layers (secondary) count: {}", layer_info.resident_layers.len());
+                tenant_candidates.extend(
+                    layer_info.resident_layers
+                        .into_iter()
+                        .map(|layer_infos| (timeline_id, layer_infos)),
+                );
+            }
+
+            tenant_candidates
+                .sort_unstable_by_key(|(_, layer_info)| std::cmp::Reverse(layer_info.last_activity_ts));
+
+            candidates.extend(tenant_candidates.into_iter().map(|(timeline_id, candidate)| {
+                (
+                    // Secondary locations' layers are always considered above the min resident size,
+                    // i.e. secondary locations are permitted to be trimmed to zero layers if all
+                    // the layers have sufficiently old access times.
+                    MinResidentSizePartition::Above,
+                    EvictionCandidate {
+                        source: EvictionCandidateSource::Secondary(TenantTimelineId { tenant_id: *tenant_id, timeline_id }),
+                        last_activity_ts: candidate.last_activity_ts,
+                        layer: candidate.layer,
+                    }
+                )
+            }));
+
+        },
+    );
+
     debug_assert!(MinResidentSizePartition::Above < MinResidentSizePartition::Below,
         "as explained in the function's doc comment, layers that aren't in the tenant's min_resident_size are evicted first");
     candidates
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index 638ec9cf39..cc0b8d8948 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -1547,11 +1547,12 @@ async fn disk_usage_eviction_run(
         )));
     }
 
-    let state = state.disk_usage_eviction_state.clone();
+    let eviction_state = state.disk_usage_eviction_state.clone();
     let cancel = CancellationToken::new();
     let child_cancel = cancel.clone();
     let _g = cancel.drop_guard();
+    let tenant_manager = state.tenant_manager.clone();
 
     crate::task_mgr::spawn(
         crate::task_mgr::BACKGROUND_RUNTIME.handle(),
@@ -1562,8 +1563,9 @@ async fn disk_usage_eviction_run(
         false,
         async move {
             let res = crate::disk_usage_eviction_task::disk_usage_eviction_task_iteration_impl(
-                &state,
+                &eviction_state,
                 usage,
+                &tenant_manager,
                 &child_cancel,
             )
             .await;
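
Note: the planning step in this patch boils down to collecting candidates from both attached and secondary locations, tagging each with a partition (secondary layers always land in Above), sorting by (partition, last access time), and walking the list until enough bytes are planned, batching attached work by timeline and secondary work by tenant. The standalone sketch below models just that planning step. It is illustrative only: Source, Candidate, Partition, plan() and the u64 ids are stand-ins invented for this note (in place of Arc<Timeline>, TenantTimelineId and real layer metadata), not pageserver types, and the real code additionally computes each attached tenant's min_resident_size partition and stops planning once disk usage pressure is relieved.

use std::collections::HashMap;
use std::time::{Duration, SystemTime};

// Stand-ins for Arc<Timeline> (attached) and TenantTimelineId (secondary).
#[derive(Clone, Copy, Debug)]
enum Source {
    Attached { timeline: u64 },
    Secondary { tenant: u64, timeline: u64 },
}

#[derive(Clone, Debug)]
struct Candidate {
    source: Source,
    layer_size: u64,
    last_activity_ts: SystemTime,
}

// Above sorts before Below, so layers above the min-resident-size threshold
// (and all secondary-location layers) are planned for eviction first.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum Partition {
    Above,
    Below,
}

// Plan evictions until `want_freed` bytes are covered: attached work is
// batched by timeline id, secondary work by tenant id.
fn plan(
    mut candidates: Vec<(Partition, Candidate)>,
    want_freed: u64,
) -> (HashMap<u64, Vec<Candidate>>, HashMap<u64, Vec<Candidate>>) {
    // Global order: partition first, then least recently used layers first.
    candidates.sort_unstable_by_key(|(p, c)| (*p, c.last_activity_ts));

    let mut attached: HashMap<u64, Vec<Candidate>> = HashMap::new();
    let mut secondary: HashMap<u64, Vec<Candidate>> = HashMap::new();
    let mut freed = 0;
    for (_, candidate) in candidates {
        if freed >= want_freed {
            break;
        }
        freed += candidate.layer_size;
        match candidate.source {
            Source::Attached { timeline } => attached.entry(timeline).or_default().push(candidate),
            Source::Secondary { tenant, .. } => secondary.entry(tenant).or_default().push(candidate),
        }
    }
    (attached, secondary)
}

fn main() {
    let old = SystemTime::UNIX_EPOCH;
    let recent = old + Duration::from_secs(3600);
    let candidates = vec![
        (
            Partition::Below,
            Candidate { source: Source::Attached { timeline: 1 }, layer_size: 100, last_activity_ts: recent },
        ),
        (
            Partition::Above,
            Candidate { source: Source::Secondary { tenant: 7, timeline: 2 }, layer_size: 300, last_activity_ts: old },
        ),
    ];
    let (attached, secondary) = plan(candidates, 250);
    println!("attached batches: {attached:?}\nsecondary batches: {secondary:?}");
}

Running the sketch plans the secondary tenant's cold 300-byte layer and never touches the attached tenant's recently used layer, mirroring the Above-before-Below ordering asserted by the debug_assert! in collect_eviction_candidates above.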