From 807c34b953c10c1450f23133978f48dbd331dd63 Mon Sep 17 00:00:00 2001 From: John Spray Date: Thu, 26 Oct 2023 20:26:20 +0100 Subject: [PATCH] pageserver: include secondary tenants in disk usage eviction --- pageserver/src/disk_usage_eviction_task.rs | 138 ++++++++++++++---- pageserver/src/http/routes.rs | 8 +- pageserver/src/tenant/storage_layer/layer.rs | 16 +- pageserver/src/tenant/timeline.rs | 14 +- .../src/tenant/timeline/eviction_task.rs | 14 +- 5 files changed, 131 insertions(+), 59 deletions(-) diff --git a/pageserver/src/disk_usage_eviction_task.rs b/pageserver/src/disk_usage_eviction_task.rs index 41cc2d7283..2565c7c6c4 100644 --- a/pageserver/src/disk_usage_eviction_task.rs +++ b/pageserver/src/disk_usage_eviction_task.rs @@ -48,20 +48,21 @@ use std::{ }; use anyhow::Context; +use pageserver_api::shard::TenantShardId; use remote_storage::GenericRemoteStorage; use serde::{Deserialize, Serialize}; use tokio::time::Instant; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, instrument, warn, Instrument}; use utils::completion; -use utils::serde_percent::Percent; +use utils::{id::TimelineId, serde_percent::Percent}; use crate::{ config::PageServerConf, task_mgr::{self, TaskKind, BACKGROUND_RUNTIME}, tenant::{ - self, mgr::TenantManager, + secondary::SecondaryTenant, storage_layer::{AsLayerDesc, EvictionError, Layer}, Timeline, }, @@ -194,7 +195,9 @@ async fn disk_usage_eviction_task_iteration( let tenants_dir = tenant_manager.get_conf().tenants_path(); let usage_pre = filesystem_level_usage::get(&tenants_dir, task_config) .context("get filesystem-level disk usage before evictions")?; - let res = disk_usage_eviction_task_iteration_impl(state, storage, usage_pre, cancel).await; + let res = + disk_usage_eviction_task_iteration_impl(state, storage, usage_pre, tenant_manager, cancel) + .await; match res { Ok(outcome) => { debug!(?outcome, "disk_usage_eviction_iteration finished"); @@ -280,6 +283,7 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl( state: &State, _storage: &GenericRemoteStorage, usage_pre: U, + tenant_manager: &Arc, cancel: &CancellationToken, ) -> anyhow::Result> { // use tokio's mutex to get a Sync guard (instead of std::sync::Mutex) @@ -299,7 +303,7 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl( "running disk usage based eviction due to pressure" ); - let candidates = match collect_eviction_candidates(cancel).await? { + let candidates = match collect_eviction_candidates(tenant_manager, cancel).await? { EvictionCandidates::Cancelled => { return Ok(IterationOutcome::Cancelled); } @@ -335,11 +339,14 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl( // If we get far enough in the list that we start to evict layers that are below // the tenant's min-resident-size threshold, print a warning, and memorize the disk // usage at that point, in 'usage_planned_min_resident_size_respecting'. + let mut secondary_by_tenant: HashMap> = HashMap::new(); + let mut warned = None; let mut usage_planned = usage_pre; let mut evicted_amount = 0; - for (i, (partition, candidate)) in candidates.iter().enumerate() { + let mut attached_candidates = Vec::new(); + for (i, (partition, candidate)) in candidates.into_iter().enumerate() { if !usage_planned.has_pressure() { debug!( no_candidates_evicted = i, @@ -348,14 +355,23 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl( break; } - if partition == &MinResidentSizePartition::Below && warned.is_none() { + if partition == MinResidentSizePartition::Below && warned.is_none() { warn!(?usage_pre, ?usage_planned, candidate_no=i, "tenant_min_resident_size-respecting LRU would not relieve pressure, evicting more following global LRU policy"); warned = Some(usage_planned); } usage_planned.add_available_bytes(candidate.layer.layer_desc().file_size); evicted_amount += 1; + + // Split off attached vs. secondary tenants' layers: these are handled differently later + if let EvictionCandidateSource::Secondary(ttid) = candidate.source { + let batch = secondary_by_tenant.entry(ttid.0).or_default(); + batch.push((ttid.1, candidate.layer)); + } else { + attached_candidates.push((partition, candidate)); + } } + let candidates = attached_candidates; let usage_planned = match warned { Some(respecting_tenant_min_resident_size) => PlannedUsage { @@ -369,7 +385,20 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl( }; debug!(?usage_planned, "usage planned"); - // phase2: evict layers + // phase2 (secondary tenants): evict victims batched by tenant + for (tenant_shard_id, timeline_layers) in secondary_by_tenant { + // Q: Why do we go via TenantManager again rather than just deleting files, or keeping + // an Arc ref to the secondary state? + // A: It's because a given tenant's local storage **belongs** to whoever is currently + // live in the TenantManager. We must avoid a race where we might plan an eviction + // for secondary, and then execute it when the tenant is actually in an attached state. + tenant_manager + .evict_tenant_layers(&tenant_shard_id, timeline_layers) + .instrument(tracing::info_span!("evict_tenant_layers", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug())) + .await; + } + + // phase2 (attached tenants): evict layers let mut js = tokio::task::JoinSet::new(); let limit = 1000; @@ -419,13 +448,10 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl( }; js.spawn(async move { - let rtc = candidate.timeline.remote_client.as_ref().expect( - "holding the witness, all timelines must have a remote timeline client", - ); let file_size = candidate.layer.layer_desc().file_size; candidate .layer - .evict_and_wait(rtc) + .evict_and_wait() .await .map(|()| file_size) .map_err(|e| (file_size, e)) @@ -456,9 +482,18 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl( })) } +// An eviction candidate might originate from either an attached tenant +// with a [`Tenant`] and [`Timeline`] object, or from a secondary tenant +// location. These differ in how we will execute the eviction. +#[derive(Clone)] +enum EvictionCandidateSource { + Attached(Arc), + Secondary((TenantShardId, TimelineId)), +} + #[derive(Clone)] struct EvictionCandidate { - timeline: Arc, + source: EvictionCandidateSource, layer: Layer, last_activity_ts: SystemTime, } @@ -508,30 +543,25 @@ enum EvictionCandidates { /// after exhauting the `Above` partition. /// So, we did not respect each tenant's min_resident_size. async fn collect_eviction_candidates( + tenant_manager: &Arc, cancel: &CancellationToken, ) -> anyhow::Result { // get a snapshot of the list of tenants - let tenants = tenant::mgr::list_tenants() - .await - .context("get list of tenants")?; - let mut candidates = Vec::new(); - for (tenant_id, _state) in &tenants { + let tenants = tenant_manager.get_attached_active_tenant_shards(); + + for tenant in tenants { if cancel.is_cancelled() { return Ok(EvictionCandidates::Cancelled); } - let tenant = match tenant::mgr::get_tenant(*tenant_id, true) { - Ok(tenant) => tenant, - Err(e) => { - // this can happen if tenant has lifecycle transition after we fetched it - debug!("failed to get tenant: {e:#}"); - continue; - } - }; if tenant.cancel.is_cancelled() { - info!(%tenant_id, "Skipping tenant for eviction, it is shutting down"); + info!(tenant_shard_id=%tenant.get_tenant_shard_id(), "Skipping tenant for eviction, it is shutting down"); + } + + if !tenant.is_active() { + debug!(tenant_shard_id=%tenant.get_tenant_shard_id(), "Ignoring non-active tenant for eviction"); continue; } @@ -596,7 +626,7 @@ async fn collect_eviction_candidates( for (timeline, layer_info) in tenant_candidates.into_iter() { let file_size = layer_info.file_size(); let candidate = EvictionCandidate { - timeline, + source: EvictionCandidateSource::Attached(timeline), last_activity_ts: layer_info.last_activity_ts, layer: layer_info.layer, }; @@ -610,6 +640,60 @@ async fn collect_eviction_candidates( } } + // FIXME: this is a long loop over all secondary locations. At the least, respect + // cancellation here, but really we need to break up the loop. We could extract the + // Arcs and iterate over them with some tokio yields in there. Ideally + // though we should just reduce the total amount of work: our eviction goals do not require + // listing absolutely every layer in every tenant: we could sample this. + let mut secondary_tenants = Vec::new(); + tenant_manager.foreach_secondary_tenants( + |tenant_shard_id: &TenantShardId, state: &Arc| { + secondary_tenants.push((*tenant_shard_id, state.clone())); + }, + ); + + for (tenant_shard_id, secondary_tenant) in secondary_tenants { + for (timeline_id, layer_info) in secondary_tenant + .get_layers_for_eviction(tenant_manager.get_conf(), tenant_shard_id) + .instrument(tracing::info_span!("get_layers_for_eviction", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug())) + .await + { + let mut tenant_candidates = Vec::new(); + debug!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, "timeline resident layers (secondary) count: {}", layer_info.resident_layers.len()); + tenant_candidates.extend( + layer_info + .resident_layers + .into_iter() + .map(|layer_infos| (timeline_id, layer_infos)), + ); + + tenant_candidates.sort_unstable_by_key(|(_, layer_info)| { + std::cmp::Reverse(layer_info.last_activity_ts) + }); + + candidates.extend( + tenant_candidates + .into_iter() + .map(|(timeline_id, candidate)| { + ( + // Secondary locations' layers are always considered above the min resident size, + // i.e. secondary locations are permitted to be trimmed to zero layers if all + // the layers have sufficiently old access times. + MinResidentSizePartition::Above, + EvictionCandidate { + source: EvictionCandidateSource::Secondary(( + tenant_shard_id, + timeline_id, + )), + last_activity_ts: candidate.last_activity_ts, + layer: candidate.layer, + }, + ) + }), + ); + } + } + debug_assert!(MinResidentSizePartition::Above < MinResidentSizePartition::Below, "as explained in the function's doc comment, layers that aren't in the tenant's min_resident_size are evicted first"); candidates diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index f864d4410c..ab5598cdce 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -1618,10 +1618,14 @@ async fn disk_usage_eviction_run( ))); }; - let state = state.disk_usage_eviction_state.clone(); + let eviction_state = state.disk_usage_eviction_state.clone(); let res = crate::disk_usage_eviction_task::disk_usage_eviction_task_iteration_impl( - &state, storage, usage, &cancel, + &eviction_state, + storage, + usage, + &state.tenant_manager, + &cancel, ) .await; diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index 903be784ac..994a320690 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -16,7 +16,7 @@ use utils::sync::heavier_once_cell; use crate::config::PageServerConf; use crate::context::RequestContext; use crate::repository::Key; -use crate::tenant::{remote_timeline_client::LayerFileMetadata, RemoteTimelineClient, Timeline}; +use crate::tenant::{remote_timeline_client::LayerFileMetadata, Timeline}; use super::delta_layer::{self, DeltaEntry}; use super::image_layer; @@ -233,17 +233,14 @@ impl Layer { /// /// Technically cancellation safe, but cancelling might shift the viewpoint of what generation /// of download-evict cycle on retry. - pub(crate) async fn evict_and_wait( - &self, - rtc: &RemoteTimelineClient, - ) -> Result<(), EvictionError> { - self.0.evict_and_wait(rtc).await + pub(crate) async fn evict_and_wait(&self) -> Result<(), EvictionError> { + self.0.evict_and_wait().await } /// Delete the layer file when the `self` gets dropped, also try to schedule a remote index upload /// then. /// - /// On drop, this will cause a call to [`RemoteTimelineClient::schedule_deletion_of_unlinked`]. + /// On drop, this will cause a call to [`crate::tenant::remote_timeline_client::RemoteTimelineClient::schedule_deletion_of_unlinked`]. /// This means that the unlinking by [gc] or [compaction] must have happened strictly before /// the value this is called on gets dropped. /// @@ -640,10 +637,7 @@ impl LayerInner { /// Cancellation safe, however dropping the future and calling this method again might result /// in a new attempt to evict OR join the previously started attempt. - pub(crate) async fn evict_and_wait( - &self, - _: &RemoteTimelineClient, - ) -> Result<(), EvictionError> { + pub(crate) async fn evict_and_wait(&self) -> Result<(), EvictionError> { use tokio::sync::broadcast::error::RecvError; assert!(self.have_remote_client); diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 1e84fa1848..f0a381250f 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1127,12 +1127,11 @@ impl Timeline { return Ok(None); }; - let rtc = self - .remote_client + self.remote_client .as_ref() .ok_or_else(|| anyhow::anyhow!("remote storage not configured; cannot evict"))?; - match local_layer.evict_and_wait(rtc).await { + match local_layer.evict_and_wait().await { Ok(()) => Ok(Some(true)), Err(EvictionError::NotFound) => Ok(Some(false)), Err(EvictionError::Downloaded) => Ok(Some(false)), @@ -4598,11 +4597,6 @@ mod tests { .await .unwrap(); - let rtc = timeline - .remote_client - .clone() - .expect("just configured this"); - let layer = find_some_layer(&timeline).await; let layer = layer .keep_resident() @@ -4611,8 +4605,8 @@ mod tests { .expect("should had been resident") .drop_eviction_guard(); - let first = async { layer.evict_and_wait(&rtc).await }; - let second = async { layer.evict_and_wait(&rtc).await }; + let first = async { layer.evict_and_wait().await }; + let second = async { layer.evict_and_wait().await }; let (first, second) = tokio::join!(first, second); diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index ea5f5f5fa7..86e36189d2 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -215,13 +215,10 @@ impl Timeline { // So, we just need to deal with this. - let remote_client = match self.remote_client.as_ref() { - Some(c) => c, - None => { - error!("no remote storage configured, cannot evict layers"); - return ControlFlow::Continue(()); - } - }; + if self.remote_client.is_none() { + error!("no remote storage configured, cannot evict layers"); + return ControlFlow::Continue(()); + } let mut js = tokio::task::JoinSet::new(); { @@ -274,9 +271,8 @@ impl Timeline { }; let layer = guard.drop_eviction_guard(); if no_activity_for > p.threshold { - let remote_client = remote_client.clone(); // this could cause a lot of allocations in some cases - js.spawn(async move { layer.evict_and_wait(&remote_client).await }); + js.spawn(async move { layer.evict_and_wait().await }); stats.candidates += 1; } }