mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-21 23:20:40 +00:00
pageserver: include secondary tenants in disk usage eviction
This commit is contained in:
@@ -48,20 +48,21 @@ use std::{
|
||||
};
|
||||
|
||||
use anyhow::Context;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::time::Instant;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error, info, instrument, warn, Instrument};
|
||||
use utils::completion;
|
||||
use utils::serde_percent::Percent;
|
||||
use utils::{id::TimelineId, serde_percent::Percent};
|
||||
|
||||
use crate::{
|
||||
config::PageServerConf,
|
||||
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
|
||||
tenant::{
|
||||
self,
|
||||
mgr::TenantManager,
|
||||
secondary::SecondaryTenant,
|
||||
storage_layer::{AsLayerDesc, EvictionError, Layer},
|
||||
Timeline,
|
||||
},
|
||||
@@ -194,7 +195,9 @@ async fn disk_usage_eviction_task_iteration(
|
||||
let tenants_dir = tenant_manager.get_conf().tenants_path();
|
||||
let usage_pre = filesystem_level_usage::get(&tenants_dir, task_config)
|
||||
.context("get filesystem-level disk usage before evictions")?;
|
||||
let res = disk_usage_eviction_task_iteration_impl(state, storage, usage_pre, cancel).await;
|
||||
let res =
|
||||
disk_usage_eviction_task_iteration_impl(state, storage, usage_pre, tenant_manager, cancel)
|
||||
.await;
|
||||
match res {
|
||||
Ok(outcome) => {
|
||||
debug!(?outcome, "disk_usage_eviction_iteration finished");
|
||||
@@ -280,6 +283,7 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
state: &State,
|
||||
_storage: &GenericRemoteStorage,
|
||||
usage_pre: U,
|
||||
tenant_manager: &Arc<TenantManager>,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<IterationOutcome<U>> {
|
||||
// use tokio's mutex to get a Sync guard (instead of std::sync::Mutex)
|
||||
@@ -299,7 +303,7 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
"running disk usage based eviction due to pressure"
|
||||
);
|
||||
|
||||
let candidates = match collect_eviction_candidates(cancel).await? {
|
||||
let candidates = match collect_eviction_candidates(tenant_manager, cancel).await? {
|
||||
EvictionCandidates::Cancelled => {
|
||||
return Ok(IterationOutcome::Cancelled);
|
||||
}
|
||||
@@ -335,11 +339,14 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
// If we get far enough in the list that we start to evict layers that are below
|
||||
// the tenant's min-resident-size threshold, print a warning, and memorize the disk
|
||||
// usage at that point, in 'usage_planned_min_resident_size_respecting'.
|
||||
let mut secondary_by_tenant: HashMap<TenantShardId, Vec<(TimelineId, Layer)>> = HashMap::new();
|
||||
|
||||
let mut warned = None;
|
||||
let mut usage_planned = usage_pre;
|
||||
let mut evicted_amount = 0;
|
||||
|
||||
for (i, (partition, candidate)) in candidates.iter().enumerate() {
|
||||
let mut attached_candidates = Vec::new();
|
||||
for (i, (partition, candidate)) in candidates.into_iter().enumerate() {
|
||||
if !usage_planned.has_pressure() {
|
||||
debug!(
|
||||
no_candidates_evicted = i,
|
||||
@@ -348,14 +355,23 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
break;
|
||||
}
|
||||
|
||||
if partition == &MinResidentSizePartition::Below && warned.is_none() {
|
||||
if partition == MinResidentSizePartition::Below && warned.is_none() {
|
||||
warn!(?usage_pre, ?usage_planned, candidate_no=i, "tenant_min_resident_size-respecting LRU would not relieve pressure, evicting more following global LRU policy");
|
||||
warned = Some(usage_planned);
|
||||
}
|
||||
|
||||
usage_planned.add_available_bytes(candidate.layer.layer_desc().file_size);
|
||||
evicted_amount += 1;
|
||||
|
||||
// Split off attached vs. secondary tenants' layers: these are handled differently later
|
||||
if let EvictionCandidateSource::Secondary(ttid) = candidate.source {
|
||||
let batch = secondary_by_tenant.entry(ttid.0).or_default();
|
||||
batch.push((ttid.1, candidate.layer));
|
||||
} else {
|
||||
attached_candidates.push((partition, candidate));
|
||||
}
|
||||
}
|
||||
let candidates = attached_candidates;
|
||||
|
||||
let usage_planned = match warned {
|
||||
Some(respecting_tenant_min_resident_size) => PlannedUsage {
|
||||
@@ -369,7 +385,20 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
};
|
||||
debug!(?usage_planned, "usage planned");
|
||||
|
||||
// phase2: evict layers
|
||||
// phase2 (secondary tenants): evict victims batched by tenant
|
||||
for (tenant_shard_id, timeline_layers) in secondary_by_tenant {
|
||||
// Q: Why do we go via TenantManager again rather than just deleting files, or keeping
|
||||
// an Arc ref to the secondary state?
|
||||
// A: It's because a given tenant's local storage **belongs** to whoever is currently
|
||||
// live in the TenantManager. We must avoid a race where we might plan an eviction
|
||||
// for secondary, and then execute it when the tenant is actually in an attached state.
|
||||
tenant_manager
|
||||
.evict_tenant_layers(&tenant_shard_id, timeline_layers)
|
||||
.instrument(tracing::info_span!("evict_tenant_layers", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))
|
||||
.await;
|
||||
}
|
||||
|
||||
// phase2 (attached tenants): evict layers
|
||||
|
||||
let mut js = tokio::task::JoinSet::new();
|
||||
let limit = 1000;
|
||||
@@ -419,13 +448,10 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
};
|
||||
|
||||
js.spawn(async move {
|
||||
let rtc = candidate.timeline.remote_client.as_ref().expect(
|
||||
"holding the witness, all timelines must have a remote timeline client",
|
||||
);
|
||||
let file_size = candidate.layer.layer_desc().file_size;
|
||||
candidate
|
||||
.layer
|
||||
.evict_and_wait(rtc)
|
||||
.evict_and_wait()
|
||||
.await
|
||||
.map(|()| file_size)
|
||||
.map_err(|e| (file_size, e))
|
||||
@@ -456,9 +482,18 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
}))
|
||||
}
|
||||
|
||||
// An eviction candidate might originate from either an attached tenant
|
||||
// with a [`Tenant`] and [`Timeline`] object, or from a secondary tenant
|
||||
// location. These differ in how we will execute the eviction.
|
||||
#[derive(Clone)]
|
||||
enum EvictionCandidateSource {
|
||||
Attached(Arc<Timeline>),
|
||||
Secondary((TenantShardId, TimelineId)),
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct EvictionCandidate {
|
||||
timeline: Arc<Timeline>,
|
||||
source: EvictionCandidateSource,
|
||||
layer: Layer,
|
||||
last_activity_ts: SystemTime,
|
||||
}
|
||||
@@ -508,30 +543,25 @@ enum EvictionCandidates {
|
||||
/// after exhauting the `Above` partition.
|
||||
/// So, we did not respect each tenant's min_resident_size.
|
||||
async fn collect_eviction_candidates(
|
||||
tenant_manager: &Arc<TenantManager>,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<EvictionCandidates> {
|
||||
// get a snapshot of the list of tenants
|
||||
let tenants = tenant::mgr::list_tenants()
|
||||
.await
|
||||
.context("get list of tenants")?;
|
||||
|
||||
let mut candidates = Vec::new();
|
||||
|
||||
for (tenant_id, _state) in &tenants {
|
||||
let tenants = tenant_manager.get_attached_active_tenant_shards();
|
||||
|
||||
for tenant in tenants {
|
||||
if cancel.is_cancelled() {
|
||||
return Ok(EvictionCandidates::Cancelled);
|
||||
}
|
||||
let tenant = match tenant::mgr::get_tenant(*tenant_id, true) {
|
||||
Ok(tenant) => tenant,
|
||||
Err(e) => {
|
||||
// this can happen if tenant has lifecycle transition after we fetched it
|
||||
debug!("failed to get tenant: {e:#}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
if tenant.cancel.is_cancelled() {
|
||||
info!(%tenant_id, "Skipping tenant for eviction, it is shutting down");
|
||||
info!(tenant_shard_id=%tenant.get_tenant_shard_id(), "Skipping tenant for eviction, it is shutting down");
|
||||
}
|
||||
|
||||
if !tenant.is_active() {
|
||||
debug!(tenant_shard_id=%tenant.get_tenant_shard_id(), "Ignoring non-active tenant for eviction");
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -596,7 +626,7 @@ async fn collect_eviction_candidates(
|
||||
for (timeline, layer_info) in tenant_candidates.into_iter() {
|
||||
let file_size = layer_info.file_size();
|
||||
let candidate = EvictionCandidate {
|
||||
timeline,
|
||||
source: EvictionCandidateSource::Attached(timeline),
|
||||
last_activity_ts: layer_info.last_activity_ts,
|
||||
layer: layer_info.layer,
|
||||
};
|
||||
@@ -610,6 +640,60 @@ async fn collect_eviction_candidates(
|
||||
}
|
||||
}
|
||||
|
||||
// FIXME: this is a long loop over all secondary locations. At the least, respect
|
||||
// cancellation here, but really we need to break up the loop. We could extract the
|
||||
// Arc<SecondaryTenant>s and iterate over them with some tokio yields in there. Ideally
|
||||
// though we should just reduce the total amount of work: our eviction goals do not require
|
||||
// listing absolutely every layer in every tenant: we could sample this.
|
||||
let mut secondary_tenants = Vec::new();
|
||||
tenant_manager.foreach_secondary_tenants(
|
||||
|tenant_shard_id: &TenantShardId, state: &Arc<SecondaryTenant>| {
|
||||
secondary_tenants.push((*tenant_shard_id, state.clone()));
|
||||
},
|
||||
);
|
||||
|
||||
for (tenant_shard_id, secondary_tenant) in secondary_tenants {
|
||||
for (timeline_id, layer_info) in secondary_tenant
|
||||
.get_layers_for_eviction(tenant_manager.get_conf(), tenant_shard_id)
|
||||
.instrument(tracing::info_span!("get_layers_for_eviction", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))
|
||||
.await
|
||||
{
|
||||
let mut tenant_candidates = Vec::new();
|
||||
debug!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, "timeline resident layers (secondary) count: {}", layer_info.resident_layers.len());
|
||||
tenant_candidates.extend(
|
||||
layer_info
|
||||
.resident_layers
|
||||
.into_iter()
|
||||
.map(|layer_infos| (timeline_id, layer_infos)),
|
||||
);
|
||||
|
||||
tenant_candidates.sort_unstable_by_key(|(_, layer_info)| {
|
||||
std::cmp::Reverse(layer_info.last_activity_ts)
|
||||
});
|
||||
|
||||
candidates.extend(
|
||||
tenant_candidates
|
||||
.into_iter()
|
||||
.map(|(timeline_id, candidate)| {
|
||||
(
|
||||
// Secondary locations' layers are always considered above the min resident size,
|
||||
// i.e. secondary locations are permitted to be trimmed to zero layers if all
|
||||
// the layers have sufficiently old access times.
|
||||
MinResidentSizePartition::Above,
|
||||
EvictionCandidate {
|
||||
source: EvictionCandidateSource::Secondary((
|
||||
tenant_shard_id,
|
||||
timeline_id,
|
||||
)),
|
||||
last_activity_ts: candidate.last_activity_ts,
|
||||
layer: candidate.layer,
|
||||
},
|
||||
)
|
||||
}),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
debug_assert!(MinResidentSizePartition::Above < MinResidentSizePartition::Below,
|
||||
"as explained in the function's doc comment, layers that aren't in the tenant's min_resident_size are evicted first");
|
||||
candidates
|
||||
|
||||
@@ -1618,10 +1618,14 @@ async fn disk_usage_eviction_run(
|
||||
)));
|
||||
};
|
||||
|
||||
let state = state.disk_usage_eviction_state.clone();
|
||||
let eviction_state = state.disk_usage_eviction_state.clone();
|
||||
|
||||
let res = crate::disk_usage_eviction_task::disk_usage_eviction_task_iteration_impl(
|
||||
&state, storage, usage, &cancel,
|
||||
&eviction_state,
|
||||
storage,
|
||||
usage,
|
||||
&state.tenant_manager,
|
||||
&cancel,
|
||||
)
|
||||
.await;
|
||||
|
||||
|
||||
@@ -16,7 +16,7 @@ use utils::sync::heavier_once_cell;
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::RequestContext;
|
||||
use crate::repository::Key;
|
||||
use crate::tenant::{remote_timeline_client::LayerFileMetadata, RemoteTimelineClient, Timeline};
|
||||
use crate::tenant::{remote_timeline_client::LayerFileMetadata, Timeline};
|
||||
|
||||
use super::delta_layer::{self, DeltaEntry};
|
||||
use super::image_layer;
|
||||
@@ -233,17 +233,14 @@ impl Layer {
|
||||
///
|
||||
/// Technically cancellation safe, but cancelling might shift the viewpoint of what generation
|
||||
/// of download-evict cycle on retry.
|
||||
pub(crate) async fn evict_and_wait(
|
||||
&self,
|
||||
rtc: &RemoteTimelineClient,
|
||||
) -> Result<(), EvictionError> {
|
||||
self.0.evict_and_wait(rtc).await
|
||||
pub(crate) async fn evict_and_wait(&self) -> Result<(), EvictionError> {
|
||||
self.0.evict_and_wait().await
|
||||
}
|
||||
|
||||
/// Delete the layer file when the `self` gets dropped, also try to schedule a remote index upload
|
||||
/// then.
|
||||
///
|
||||
/// On drop, this will cause a call to [`RemoteTimelineClient::schedule_deletion_of_unlinked`].
|
||||
/// On drop, this will cause a call to [`crate::tenant::remote_timeline_client::RemoteTimelineClient::schedule_deletion_of_unlinked`].
|
||||
/// This means that the unlinking by [gc] or [compaction] must have happened strictly before
|
||||
/// the value this is called on gets dropped.
|
||||
///
|
||||
@@ -640,10 +637,7 @@ impl LayerInner {
|
||||
|
||||
/// Cancellation safe, however dropping the future and calling this method again might result
|
||||
/// in a new attempt to evict OR join the previously started attempt.
|
||||
pub(crate) async fn evict_and_wait(
|
||||
&self,
|
||||
_: &RemoteTimelineClient,
|
||||
) -> Result<(), EvictionError> {
|
||||
pub(crate) async fn evict_and_wait(&self) -> Result<(), EvictionError> {
|
||||
use tokio::sync::broadcast::error::RecvError;
|
||||
|
||||
assert!(self.have_remote_client);
|
||||
|
||||
@@ -1127,12 +1127,11 @@ impl Timeline {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let rtc = self
|
||||
.remote_client
|
||||
self.remote_client
|
||||
.as_ref()
|
||||
.ok_or_else(|| anyhow::anyhow!("remote storage not configured; cannot evict"))?;
|
||||
|
||||
match local_layer.evict_and_wait(rtc).await {
|
||||
match local_layer.evict_and_wait().await {
|
||||
Ok(()) => Ok(Some(true)),
|
||||
Err(EvictionError::NotFound) => Ok(Some(false)),
|
||||
Err(EvictionError::Downloaded) => Ok(Some(false)),
|
||||
@@ -4598,11 +4597,6 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let rtc = timeline
|
||||
.remote_client
|
||||
.clone()
|
||||
.expect("just configured this");
|
||||
|
||||
let layer = find_some_layer(&timeline).await;
|
||||
let layer = layer
|
||||
.keep_resident()
|
||||
@@ -4611,8 +4605,8 @@ mod tests {
|
||||
.expect("should had been resident")
|
||||
.drop_eviction_guard();
|
||||
|
||||
let first = async { layer.evict_and_wait(&rtc).await };
|
||||
let second = async { layer.evict_and_wait(&rtc).await };
|
||||
let first = async { layer.evict_and_wait().await };
|
||||
let second = async { layer.evict_and_wait().await };
|
||||
|
||||
let (first, second) = tokio::join!(first, second);
|
||||
|
||||
|
||||
@@ -215,13 +215,10 @@ impl Timeline {
|
||||
|
||||
// So, we just need to deal with this.
|
||||
|
||||
let remote_client = match self.remote_client.as_ref() {
|
||||
Some(c) => c,
|
||||
None => {
|
||||
error!("no remote storage configured, cannot evict layers");
|
||||
return ControlFlow::Continue(());
|
||||
}
|
||||
};
|
||||
if self.remote_client.is_none() {
|
||||
error!("no remote storage configured, cannot evict layers");
|
||||
return ControlFlow::Continue(());
|
||||
}
|
||||
|
||||
let mut js = tokio::task::JoinSet::new();
|
||||
{
|
||||
@@ -274,9 +271,8 @@ impl Timeline {
|
||||
};
|
||||
let layer = guard.drop_eviction_guard();
|
||||
if no_activity_for > p.threshold {
|
||||
let remote_client = remote_client.clone();
|
||||
// this could cause a lot of allocations in some cases
|
||||
js.spawn(async move { layer.evict_and_wait(&remote_client).await });
|
||||
js.spawn(async move { layer.evict_and_wait().await });
|
||||
stats.candidates += 1;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user