pageserver: eviction for secondary mode tenants (#6225)

Follows #6123 

Closes: https://github.com/neondatabase/neon/issues/5342

The approach here is to avoid using `Layer` from secondary tenants, and
instead make the eviction types (e.g. `EvictionCandidate`) have a
variant that carries a Layer for attached tenants, and a different
variant for secondary tenants.

Other changes:
- EvictionCandidate no longer carries a `Timeline`: this was only used
for providing a witness reference to remote timeline client.
- The types for returning eviction candidates are all in
disk_usage_eviction_task.rs now, whereas some of them were in
timeline.rs before.
- The EvictionCandidate type replaces LocalLayerInfoForDiskUsageEviction
type, which was basically the same thing.
This commit is contained in:
John Spray
2024-01-16 10:29:26 +00:00
committed by GitHub
parent 887e94d7da
commit bf4e708646
9 changed files with 500 additions and 190 deletions

View File

@@ -527,6 +527,7 @@ fn start_pageserver(
conf,
remote_storage.clone(),
disk_usage_eviction_state.clone(),
tenant_manager.clone(),
background_jobs_barrier.clone(),
)?;
}

View File

@@ -47,21 +47,24 @@ use std::{
};
use anyhow::Context;
use camino::Utf8Path;
use pageserver_api::shard::TenantShardId;
use remote_storage::GenericRemoteStorage;
use serde::{Deserialize, Serialize};
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument, warn, Instrument};
use utils::completion;
use utils::serde_percent::Percent;
use utils::{completion, id::TimelineId};
use crate::{
config::PageServerConf,
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
tenant::{
self,
storage_layer::{AsLayerDesc, EvictionError, Layer},
mgr::TenantManager,
remote_timeline_client::LayerFileMetadata,
secondary::SecondaryTenant,
storage_layer::{AsLayerDesc, EvictionError, Layer, LayerFileName},
Timeline,
},
};
@@ -125,6 +128,7 @@ pub fn launch_disk_usage_global_eviction_task(
conf: &'static PageServerConf,
storage: GenericRemoteStorage,
state: Arc<State>,
tenant_manager: Arc<TenantManager>,
background_jobs_barrier: completion::Barrier,
) -> anyhow::Result<()> {
let Some(task_config) = &conf.disk_usage_based_eviction else {
@@ -150,8 +154,7 @@ pub fn launch_disk_usage_global_eviction_task(
_ = background_jobs_barrier.wait() => { }
};
disk_usage_eviction_task(&state, task_config, &storage, &conf.tenants_path(), cancel)
.await;
disk_usage_eviction_task(&state, task_config, &storage, tenant_manager, cancel).await;
Ok(())
},
);
@@ -164,7 +167,7 @@ async fn disk_usage_eviction_task(
state: &State,
task_config: &DiskUsageEvictionTaskConfig,
storage: &GenericRemoteStorage,
tenants_dir: &Utf8Path,
tenant_manager: Arc<TenantManager>,
cancel: CancellationToken,
) {
scopeguard::defer! {
@@ -191,7 +194,7 @@ async fn disk_usage_eviction_task(
state,
task_config,
storage,
tenants_dir,
&tenant_manager,
&cancel,
)
.await;
@@ -226,15 +229,17 @@ async fn disk_usage_eviction_task_iteration(
state: &State,
task_config: &DiskUsageEvictionTaskConfig,
storage: &GenericRemoteStorage,
tenants_dir: &Utf8Path,
tenant_manager: &Arc<TenantManager>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let usage_pre = filesystem_level_usage::get(tenants_dir, task_config)
let tenants_dir = tenant_manager.get_conf().tenants_path();
let usage_pre = filesystem_level_usage::get(&tenants_dir, task_config)
.context("get filesystem-level disk usage before evictions")?;
let res = disk_usage_eviction_task_iteration_impl(
state,
storage,
usage_pre,
tenant_manager,
task_config.eviction_order,
cancel,
)
@@ -248,7 +253,7 @@ async fn disk_usage_eviction_task_iteration(
}
IterationOutcome::Finished(outcome) => {
// Verify with statvfs whether we made any real progress
let after = filesystem_level_usage::get(tenants_dir, task_config)
let after = filesystem_level_usage::get(&tenants_dir, task_config)
// It's quite unlikely to hit the error here. Keep the code simple and bail out.
.context("get filesystem-level disk usage after evictions")?;
@@ -324,6 +329,7 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
state: &State,
_storage: &GenericRemoteStorage,
usage_pre: U,
tenant_manager: &Arc<TenantManager>,
eviction_order: EvictionOrder,
cancel: &CancellationToken,
) -> anyhow::Result<IterationOutcome<U>> {
@@ -344,29 +350,29 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
"running disk usage based eviction due to pressure"
);
let candidates = match collect_eviction_candidates(eviction_order, cancel).await? {
EvictionCandidates::Cancelled => {
return Ok(IterationOutcome::Cancelled);
}
EvictionCandidates::Finished(partitioned) => partitioned,
};
let candidates =
match collect_eviction_candidates(tenant_manager, eviction_order, cancel).await? {
EvictionCandidates::Cancelled => {
return Ok(IterationOutcome::Cancelled);
}
EvictionCandidates::Finished(partitioned) => partitioned,
};
// Debug-log the list of candidates
let now = SystemTime::now();
for (i, (partition, candidate)) in candidates.iter().enumerate() {
let nth = i + 1;
let desc = candidate.layer.layer_desc();
let total_candidates = candidates.len();
let size = desc.file_size;
let size = candidate.layer.get_file_size();
let rel = candidate.relative_last_activity;
debug!(
"cand {nth}/{total_candidates}: size={size}, rel_last_activity={rel}, no_access_for={}us, partition={partition:?}, {}/{}/{}",
now.duration_since(candidate.last_activity_ts)
.unwrap()
.as_micros(),
desc.tenant_shard_id,
desc.timeline_id,
candidate.layer,
candidate.layer.get_tenant_shard_id(),
candidate.layer.get_timeline_id(),
candidate.layer.get_name(),
);
}
@@ -398,7 +404,7 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
warned = Some(usage_planned);
}
usage_planned.add_available_bytes(candidate.layer.layer_desc().file_size);
usage_planned.add_available_bytes(candidate.layer.get_file_size());
evicted_amount += 1;
}
@@ -463,19 +469,30 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
continue;
};
js.spawn(async move {
let rtc = candidate.timeline.remote_client.as_ref().expect(
"holding the witness, all timelines must have a remote timeline client",
);
let file_size = candidate.layer.layer_desc().file_size;
candidate
.layer
.evict_and_wait(rtc)
.await
.map(|()| file_size)
.map_err(|e| (file_size, e))
});
match candidate.layer {
EvictionLayer::Attached(layer) => {
let file_size = layer.layer_desc().file_size;
js.spawn(async move {
layer
.evict_and_wait()
.await
.map(|()| file_size)
.map_err(|e| (file_size, e))
});
}
EvictionLayer::Secondary(layer) => {
let file_size = layer.metadata.file_size();
let tenant_manager = tenant_manager.clone();
js.spawn(async move {
layer
.secondary_tenant
.evict_layer(tenant_manager.get_conf(), layer.timeline_id, layer.name)
.await;
Ok(file_size)
});
}
}
tokio::task::yield_now().await;
}
@@ -502,11 +519,100 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
}
#[derive(Clone)]
struct EvictionCandidate {
timeline: Arc<Timeline>,
layer: Layer,
last_activity_ts: SystemTime,
relative_last_activity: finite_f32::FiniteF32,
pub(crate) struct EvictionSecondaryLayer {
pub(crate) secondary_tenant: Arc<SecondaryTenant>,
pub(crate) timeline_id: TimelineId,
pub(crate) name: LayerFileName,
pub(crate) metadata: LayerFileMetadata,
}
/// Full [`Layer`] objects are specific to tenants in attached mode. This type is a layer
/// of indirection to store either a `Layer`, or a reference to a secondary tenant and a layer name.
#[derive(Clone)]
pub(crate) enum EvictionLayer {
Attached(Layer),
#[allow(dead_code)]
Secondary(EvictionSecondaryLayer),
}
impl From<Layer> for EvictionLayer {
fn from(value: Layer) -> Self {
Self::Attached(value)
}
}
impl EvictionLayer {
pub(crate) fn get_tenant_shard_id(&self) -> &TenantShardId {
match self {
Self::Attached(l) => &l.layer_desc().tenant_shard_id,
Self::Secondary(sl) => sl.secondary_tenant.get_tenant_shard_id(),
}
}
pub(crate) fn get_timeline_id(&self) -> &TimelineId {
match self {
Self::Attached(l) => &l.layer_desc().timeline_id,
Self::Secondary(sl) => &sl.timeline_id,
}
}
pub(crate) fn get_name(&self) -> LayerFileName {
match self {
Self::Attached(l) => l.layer_desc().filename(),
Self::Secondary(sl) => sl.name.clone(),
}
}
pub(crate) fn get_file_size(&self) -> u64 {
match self {
Self::Attached(l) => l.layer_desc().file_size,
Self::Secondary(sl) => sl.metadata.file_size(),
}
}
}
#[derive(Clone)]
pub(crate) struct EvictionCandidate {
pub(crate) layer: EvictionLayer,
pub(crate) last_activity_ts: SystemTime,
pub(crate) relative_last_activity: finite_f32::FiniteF32,
}
impl std::fmt::Display for EvictionLayer {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
Self::Attached(l) => l.fmt(f),
Self::Secondary(sl) => {
write!(f, "{}/{}", sl.timeline_id, sl.name)
}
}
}
}
pub(crate) struct DiskUsageEvictionInfo {
/// Timeline's largest layer (remote or resident)
pub max_layer_size: Option<u64>,
/// Timeline's resident layers
pub resident_layers: Vec<EvictionCandidate>,
}
impl std::fmt::Debug for EvictionCandidate {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// format the tv_sec, tv_nsec into rfc3339 in case someone is looking at it
// having to allocate a string to this is bad, but it will rarely be formatted
let ts = chrono::DateTime::<chrono::Utc>::from(self.last_activity_ts);
let ts = ts.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true);
struct DisplayIsDebug<'a, T>(&'a T);
impl<'a, T: std::fmt::Display> std::fmt::Debug for DisplayIsDebug<'a, T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
f.debug_struct("LocalLayerInfoForDiskUsageEviction")
.field("layer", &DisplayIsDebug(&self.layer))
.field("last_activity", &ts)
.finish()
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
@@ -623,6 +729,7 @@ enum EvictionCandidates {
/// - tenant B 1 layer
/// - tenant C 8 layers
async fn collect_eviction_candidates(
tenant_manager: &Arc<TenantManager>,
eviction_order: EvictionOrder,
cancel: &CancellationToken,
) -> anyhow::Result<EvictionCandidates> {
@@ -631,13 +738,16 @@ async fn collect_eviction_candidates(
.await
.context("get list of tenants")?;
// TODO: avoid listing every layer in every tenant: this loop can block the executor,
// and the resulting data structure can be huge.
// (https://github.com/neondatabase/neon/issues/6224)
let mut candidates = Vec::new();
for (tenant_id, _state, _gen) in &tenants {
for (tenant_id, _state, _gen) in tenants {
if cancel.is_cancelled() {
return Ok(EvictionCandidates::Cancelled);
}
let tenant = match tenant::mgr::get_tenant(*tenant_id, true) {
let tenant = match tenant::mgr::get_tenant(tenant_id, true) {
Ok(tenant) => tenant,
Err(e) => {
// this can happen if tenant has lifecycle transition after we fetched it
@@ -665,11 +775,7 @@ async fn collect_eviction_candidates(
}
let info = tl.get_local_layers_for_disk_usage_eviction().await;
debug!(tenant_id=%tl.tenant_shard_id.tenant_id, shard_id=%tl.tenant_shard_id.shard_slug(), timeline_id=%tl.timeline_id, "timeline resident layers count: {}", info.resident_layers.len());
tenant_candidates.extend(
info.resident_layers
.into_iter()
.map(|layer_infos| (tl.clone(), layer_infos)),
);
tenant_candidates.extend(info.resident_layers.into_iter());
max_layer_size = max_layer_size.max(info.max_layer_size.unwrap_or(0));
if cancel.is_cancelled() {
@@ -707,7 +813,7 @@ async fn collect_eviction_candidates(
// Sort layers most-recently-used first, then partition by
// cumsum above/below min_resident_size.
tenant_candidates
.sort_unstable_by_key(|(_, layer_info)| std::cmp::Reverse(layer_info.last_activity_ts));
.sort_unstable_by_key(|layer_info| std::cmp::Reverse(layer_info.last_activity_ts));
let mut cumsum: i128 = 0;
// keeping the -1 or not decides if every tenant should lose their least recently accessed
@@ -741,12 +847,10 @@ async fn collect_eviction_candidates(
.unwrap_or(1);
let divider = total as f32;
for (i, (timeline, layer_info)) in tenant_candidates.into_iter().enumerate() {
let file_size = layer_info.file_size();
for (i, mut candidate) in tenant_candidates.into_iter().enumerate() {
// as we iterate this reverse sorted list, the most recently accessed layer will always
// be 1.0; this is for us to evict it last.
let relative_last_activity = if matches!(
candidate.relative_last_activity = if matches!(
eviction_order,
EvictionOrder::RelativeAccessed { .. }
) {
@@ -761,22 +865,46 @@ async fn collect_eviction_candidates(
finite_f32::FiniteF32::ZERO
};
let candidate = EvictionCandidate {
timeline,
last_activity_ts: layer_info.last_activity_ts,
layer: layer_info.layer,
relative_last_activity,
};
let partition = if cumsum > min_resident_size as i128 {
MinResidentSizePartition::Above
} else {
MinResidentSizePartition::Below
};
cumsum += i128::from(candidate.layer.get_file_size());
candidates.push((partition, candidate));
cumsum += i128::from(file_size);
}
}
// Note: the same tenant ID might be hit twice, if it transitions from attached to
// secondary while we run. That is okay: when we eventually try and run the eviction,
// the `Gate` on the object will ensure that whichever one has already been shut down
// will not delete anything.
let mut secondary_tenants = Vec::new();
tenant_manager.foreach_secondary_tenants(
|_tenant_shard_id: &TenantShardId, state: &Arc<SecondaryTenant>| {
secondary_tenants.push(state.clone());
},
);
for secondary_tenant in secondary_tenants {
let mut layer_info = secondary_tenant.get_layers_for_eviction();
layer_info
.resident_layers
.sort_unstable_by_key(|layer_info| std::cmp::Reverse(layer_info.last_activity_ts));
candidates.extend(layer_info.resident_layers.into_iter().map(|candidate| {
(
// Secondary locations' layers are always considered above the min resident size,
// i.e. secondary locations are permitted to be trimmed to zero layers if all
// the layers have sufficiently old access times.
MinResidentSizePartition::Above,
candidate,
)
}));
}
debug_assert!(MinResidentSizePartition::Above < MinResidentSizePartition::Below,
"as explained in the function's doc comment, layers that aren't in the tenant's min_resident_size are evicted first");
@@ -821,7 +949,7 @@ impl std::ops::Deref for TimelineKey {
}
/// A totally ordered f32 subset we can use with sorting functions.
mod finite_f32 {
pub(crate) mod finite_f32 {
/// A totally ordered f32 subset we can use with sorting functions.
#[derive(Clone, Copy, PartialEq)]

View File

@@ -1655,12 +1655,13 @@ async fn disk_usage_eviction_run(
)));
};
let state = state.disk_usage_eviction_state.clone();
let eviction_state = state.disk_usage_eviction_state.clone();
let res = crate::disk_usage_eviction_task::disk_usage_eviction_task_iteration_impl(
&state,
&eviction_state,
storage,
usage,
&state.tenant_manager,
config.eviction_order,
&cancel,
)

View File

@@ -3,22 +3,31 @@ pub mod heatmap;
mod heatmap_uploader;
mod scheduler;
use std::sync::Arc;
use std::{sync::Arc, time::SystemTime};
use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
use crate::{
config::PageServerConf,
disk_usage_eviction_task::DiskUsageEvictionInfo,
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
virtual_file::MaybeFatalIo,
};
use self::{
downloader::{downloader_task, SecondaryDetail},
heatmap_uploader::heatmap_uploader_task,
};
use super::{config::SecondaryLocationConfig, mgr::TenantManager};
use super::{
config::SecondaryLocationConfig, mgr::TenantManager,
span::debug_assert_current_span_has_tenant_id, storage_layer::LayerFileName,
};
use pageserver_api::shard::TenantShardId;
use remote_storage::GenericRemoteStorage;
use tokio_util::sync::CancellationToken;
use utils::{completion::Barrier, sync::gate::Gate};
use tracing::instrument;
use utils::{completion::Barrier, fs_ext, id::TimelineId, sync::gate::Gate};
enum DownloadCommand {
Download(TenantShardId),
@@ -107,9 +116,67 @@ impl SecondaryTenant {
self.detail.lock().unwrap().config = config.clone();
}
fn get_tenant_shard_id(&self) -> &TenantShardId {
pub(crate) fn get_tenant_shard_id(&self) -> &TenantShardId {
&self.tenant_shard_id
}
pub(crate) fn get_layers_for_eviction(self: &Arc<Self>) -> DiskUsageEvictionInfo {
self.detail.lock().unwrap().get_layers_for_eviction(self)
}
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%timeline_id, name=%name))]
pub(crate) async fn evict_layer(
&self,
conf: &PageServerConf,
timeline_id: TimelineId,
name: LayerFileName,
) {
debug_assert_current_span_has_tenant_id();
let _guard = match self.gate.enter() {
Ok(g) => g,
Err(_) => {
tracing::debug!("Dropping layer evictions, secondary tenant shutting down",);
return;
}
};
let now = SystemTime::now();
let path = conf
.timeline_path(&self.tenant_shard_id, &timeline_id)
.join(name.file_name());
// We tolerate ENOENT, because between planning eviction and executing
// it, the secondary downloader could have seen an updated heatmap that
// resulted in a layer being deleted.
// Other local I/O errors are process-fatal: these should never happen.
tokio::fs::remove_file(path)
.await
.or_else(fs_ext::ignore_not_found)
.fatal_err("Deleting layer during eviction");
// Update the timeline's state. This does not have to be synchronized with
// the download process, because:
// - If downloader is racing with us to remove a file (e.g. because it is
// removed from heatmap), then our mutual .remove() operations will both
// succeed.
// - If downloader is racing with us to download the object (this would require
// multiple eviction iterations to race with multiple download iterations), then
// if we remove it from the state, the worst that happens is the downloader
// downloads it again before re-inserting, or we delete the file but it remains
// in the state map (in which case it will be downloaded if this secondary
// tenant transitions to attached and tries to access it)
//
// The important assumption here is that the secondary timeline state does not
// have to 100% match what is on disk, because it's a best-effort warming
// of the cache.
let mut detail = self.detail.lock().unwrap();
if let Some(timeline_detail) = detail.timelines.get_mut(&timeline_id) {
timeline_detail.on_disk_layers.remove(&name);
timeline_detail.evicted_at.insert(name, now);
}
}
}
/// The SecondaryController is a pseudo-rpc client for administrative control of secondary mode downloads,

View File

@@ -8,6 +8,9 @@ use std::{
use crate::{
config::PageServerConf,
disk_usage_eviction_task::{
finite_f32, DiskUsageEvictionInfo, EvictionCandidate, EvictionLayer, EvictionSecondaryLayer,
},
metrics::SECONDARY_MODE,
tenant::{
config::SecondaryLocationConfig,
@@ -142,6 +145,46 @@ impl SecondaryDetail {
timelines: HashMap::new(),
}
}
pub(super) fn get_layers_for_eviction(
&self,
parent: &Arc<SecondaryTenant>,
) -> DiskUsageEvictionInfo {
let mut result = DiskUsageEvictionInfo {
max_layer_size: None,
resident_layers: Vec::new(),
};
for (timeline_id, timeline_detail) in &self.timelines {
result
.resident_layers
.extend(timeline_detail.on_disk_layers.iter().map(|(name, ods)| {
EvictionCandidate {
layer: EvictionLayer::Secondary(EvictionSecondaryLayer {
secondary_tenant: parent.clone(),
timeline_id: *timeline_id,
name: name.clone(),
metadata: ods.metadata.clone(),
}),
last_activity_ts: ods.access_time,
relative_last_activity: finite_f32::FiniteF32::ZERO,
}
}));
}
result.max_layer_size = result
.resident_layers
.iter()
.map(|l| l.layer.get_file_size())
.max();
tracing::debug!(
"eviction: secondary tenant {} found {} timelines, {} layers",
parent.get_tenant_shard_id(),
self.timelines.len(),
result.resident_layers.len()
);
result
}
}
struct PendingDownload {

View File

@@ -15,7 +15,7 @@ use utils::sync::heavier_once_cell;
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::repository::Key;
use crate::tenant::{remote_timeline_client::LayerFileMetadata, RemoteTimelineClient, Timeline};
use crate::tenant::{remote_timeline_client::LayerFileMetadata, Timeline};
use super::delta_layer::{self, DeltaEntry};
use super::image_layer;
@@ -204,17 +204,14 @@ impl Layer {
///
/// Technically cancellation safe, but cancelling might shift the viewpoint of what generation
/// of download-evict cycle on retry.
pub(crate) async fn evict_and_wait(
&self,
rtc: &RemoteTimelineClient,
) -> Result<(), EvictionError> {
self.0.evict_and_wait(rtc).await
pub(crate) async fn evict_and_wait(&self) -> Result<(), EvictionError> {
self.0.evict_and_wait().await
}
/// Delete the layer file when the `self` gets dropped, also try to schedule a remote index upload
/// then.
///
/// On drop, this will cause a call to [`RemoteTimelineClient::schedule_deletion_of_unlinked`].
/// On drop, this will cause a call to [`crate::tenant::remote_timeline_client::RemoteTimelineClient::schedule_deletion_of_unlinked`].
/// This means that the unlinking by [gc] or [compaction] must have happened strictly before
/// the value this is called on gets dropped.
///
@@ -606,10 +603,7 @@ impl LayerInner {
/// Cancellation safe, however dropping the future and calling this method again might result
/// in a new attempt to evict OR join the previously started attempt.
pub(crate) async fn evict_and_wait(
&self,
_: &RemoteTimelineClient,
) -> Result<(), EvictionError> {
pub(crate) async fn evict_and_wait(&self) -> Result<(), EvictionError> {
use tokio::sync::broadcast::error::RecvError;
assert!(self.have_remote_client);

View File

@@ -42,15 +42,6 @@ use std::{
ops::ControlFlow,
};
use crate::context::{
AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder,
};
use crate::tenant::storage_layer::delta_layer::DeltaEntry;
use crate::tenant::storage_layer::{
AsLayerDesc, DeltaLayerWriter, EvictionError, ImageLayerWriter, InMemoryLayer, Layer,
LayerAccessStatsReset, LayerFileName, ResidentLayer, ValueReconstructResult,
ValueReconstructState,
};
use crate::tenant::tasks::BackgroundLoopKind;
use crate::tenant::timeline::logical_size::CurrentLogicalSize;
use crate::tenant::{
@@ -58,7 +49,22 @@ use crate::tenant::{
metadata::{save_metadata, TimelineMetadata},
par_fsync,
};
use crate::{
context::{AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder},
disk_usage_eviction_task::DiskUsageEvictionInfo,
};
use crate::{deletion_queue::DeletionQueueClient, tenant::remote_timeline_client::StopError};
use crate::{
disk_usage_eviction_task::finite_f32,
tenant::storage_layer::{
AsLayerDesc, DeltaLayerWriter, EvictionError, ImageLayerWriter, InMemoryLayer, Layer,
LayerAccessStatsReset, LayerFileName, ResidentLayer, ValueReconstructResult,
ValueReconstructState,
},
};
use crate::{
disk_usage_eviction_task::EvictionCandidate, tenant::storage_layer::delta_layer::DeltaEntry,
};
use crate::config::PageServerConf;
use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceRandomAccum};
@@ -1133,12 +1139,7 @@ impl Timeline {
return Ok(None);
};
let rtc = self
.remote_client
.as_ref()
.ok_or_else(|| anyhow::anyhow!("remote storage not configured; cannot evict"))?;
match local_layer.evict_and_wait(rtc).await {
match local_layer.evict_and_wait().await {
Ok(()) => Ok(Some(true)),
Err(EvictionError::NotFound) => Ok(Some(false)),
Err(EvictionError::Downloaded) => Ok(Some(false)),
@@ -2102,7 +2103,7 @@ impl Timeline {
let layer_file_names = eviction_info
.resident_layers
.iter()
.map(|l| l.layer.layer_desc().filename())
.map(|l| l.layer.get_name())
.collect::<Vec<_>>();
let decorated = match remote_client.get_layers_metadata(layer_file_names) {
@@ -2120,7 +2121,7 @@ impl Timeline {
.filter_map(|(layer, remote_info)| {
remote_info.map(|remote_info| {
HeatMapLayer::new(
layer.layer.layer_desc().filename(),
layer.layer.get_name(),
IndexLayerMetadata::from(remote_info),
layer.last_activity_ts,
)
@@ -4423,43 +4424,6 @@ impl Timeline {
}
}
pub(crate) struct DiskUsageEvictionInfo {
/// Timeline's largest layer (remote or resident)
pub max_layer_size: Option<u64>,
/// Timeline's resident layers
pub resident_layers: Vec<LocalLayerInfoForDiskUsageEviction>,
}
pub(crate) struct LocalLayerInfoForDiskUsageEviction {
pub layer: Layer,
pub last_activity_ts: SystemTime,
}
impl std::fmt::Debug for LocalLayerInfoForDiskUsageEviction {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// format the tv_sec, tv_nsec into rfc3339 in case someone is looking at it
// having to allocate a string to this is bad, but it will rarely be formatted
let ts = chrono::DateTime::<chrono::Utc>::from(self.last_activity_ts);
let ts = ts.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true);
struct DisplayIsDebug<'a, T>(&'a T);
impl<'a, T: std::fmt::Display> std::fmt::Debug for DisplayIsDebug<'a, T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
f.debug_struct("LocalLayerInfoForDiskUsageEviction")
.field("layer", &DisplayIsDebug(&self.layer))
.field("last_activity", &ts)
.finish()
}
}
impl LocalLayerInfoForDiskUsageEviction {
pub fn file_size(&self) -> u64 {
self.layer.layer_desc().file_size
}
}
impl Timeline {
/// Returns non-remote layers for eviction.
pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
@@ -4493,9 +4457,10 @@ impl Timeline {
SystemTime::now()
});
resident_layers.push(LocalLayerInfoForDiskUsageEviction {
layer: l.drop_eviction_guard(),
resident_layers.push(EvictionCandidate {
layer: l.drop_eviction_guard().into(),
last_activity_ts,
relative_last_activity: finite_f32::FiniteF32::ZERO,
});
}
@@ -4652,11 +4617,6 @@ mod tests {
.await
.unwrap();
let rtc = timeline
.remote_client
.clone()
.expect("just configured this");
let layer = find_some_layer(&timeline).await;
let layer = layer
.keep_resident()
@@ -4665,8 +4625,8 @@ mod tests {
.expect("should had been resident")
.drop_eviction_guard();
let first = async { layer.evict_and_wait(&rtc).await };
let second = async { layer.evict_and_wait(&rtc).await };
let first = async { layer.evict_and_wait().await };
let second = async { layer.evict_and_wait().await };
let (first, second) = tokio::join!(first, second);

View File

@@ -215,13 +215,10 @@ impl Timeline {
// So, we just need to deal with this.
let remote_client = match self.remote_client.as_ref() {
Some(c) => c,
None => {
error!("no remote storage configured, cannot evict layers");
return ControlFlow::Continue(());
}
};
if self.remote_client.is_none() {
error!("no remote storage configured, cannot evict layers");
return ControlFlow::Continue(());
}
let mut js = tokio::task::JoinSet::new();
{
@@ -274,9 +271,8 @@ impl Timeline {
};
let layer = guard.drop_eviction_guard();
if no_activity_for > p.threshold {
let remote_client = remote_client.clone();
// this could cause a lot of allocations in some cases
js.spawn(async move { layer.evict_and_wait(&remote_client).await });
js.spawn(async move { layer.evict_and_wait().await });
stats.candidates += 1;
}
}

View File

@@ -9,6 +9,7 @@ from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
NeonPageserver,
PgBin,
wait_for_last_flush_lsn,
)
@@ -75,9 +76,15 @@ class EvictionOrder(str, enum.Enum):
if self == EvictionOrder.ABSOLUTE_ORDER:
return {"type": "AbsoluteAccessed"}
elif self == EvictionOrder.RELATIVE_ORDER_EQUAL:
return {"type": "RelativeAccessed", "args": {"highest_layer_count_loses_first": False}}
return {
"type": "RelativeAccessed",
"args": {"highest_layer_count_loses_first": False},
}
elif self == EvictionOrder.RELATIVE_ORDER_SPARE:
return {"type": "RelativeAccessed", "args": {"highest_layer_count_loses_first": True}}
return {
"type": "RelativeAccessed",
"args": {"highest_layer_count_loses_first": True},
}
else:
raise RuntimeError(f"not implemented: {self}")
@@ -91,14 +98,24 @@ class EvictionEnv:
layer_size: int
pgbench_init_lsns: Dict[TenantId, Lsn]
def timelines_du(self) -> Tuple[int, int, int]:
@property
def pageserver(self):
"""
Shortcut for tests that only use one pageserver.
"""
return self.neon_env.pageserver
def timelines_du(self, pageserver: NeonPageserver) -> Tuple[int, int, int]:
return poor_mans_du(
self.neon_env, [(tid, tlid) for tid, tlid in self.timelines], verbose=False
self.neon_env,
[(tid, tlid) for tid, tlid in self.timelines],
pageserver,
verbose=False,
)
def du_by_timeline(self) -> Dict[Tuple[TenantId, TimelineId], int]:
def du_by_timeline(self, pageserver: NeonPageserver) -> Dict[Tuple[TenantId, TimelineId], int]:
return {
(tid, tlid): poor_mans_du(self.neon_env, [(tid, tlid)], verbose=True)[0]
(tid, tlid): poor_mans_du(self.neon_env, [(tid, tlid)], pageserver, verbose=True)[0]
for tid, tlid in self.timelines
}
@@ -126,7 +143,13 @@ class EvictionEnv:
_avg = cur.fetchone()
def pageserver_start_with_disk_usage_eviction(
self, period, max_usage_pct, min_avail_bytes, mock_behavior, eviction_order: EvictionOrder
self,
pageserver: NeonPageserver,
period,
max_usage_pct,
min_avail_bytes,
mock_behavior,
eviction_order: EvictionOrder,
):
disk_usage_config = {
"period": period,
@@ -138,7 +161,12 @@ class EvictionEnv:
enc = toml.TomlEncoder()
self.neon_env.pageserver.start(
# these can sometimes happen during startup before any tenants have been
# loaded, so nothing can be evicted, we just wait for next iteration which
# is able to evict.
pageserver.allowed_errors.append(".*WARN.* disk usage still high.*")
pageserver.start(
overrides=(
"--pageserver-config-override=disk_usage_based_eviction="
+ enc.dump_inline_table(disk_usage_config).replace("\n", " "),
@@ -152,15 +180,10 @@ class EvictionEnv:
)
def statvfs_called():
assert self.neon_env.pageserver.log_contains(".*running mocked statvfs.*")
assert pageserver.log_contains(".*running mocked statvfs.*")
wait_until(10, 1, statvfs_called)
# these can sometimes happen during startup before any tenants have been
# loaded, so nothing can be evicted, we just wait for next iteration which
# is able to evict.
self.neon_env.pageserver.allowed_errors.append(".*WARN.* disk usage still high.*")
def human_bytes(amt: float) -> str:
suffixes = ["", "Ki", "Mi", "Gi"]
@@ -175,23 +198,28 @@ def human_bytes(amt: float) -> str:
raise RuntimeError("unreachable")
@pytest.fixture
def eviction_env(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> EvictionEnv:
def _eviction_env(
request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, num_pageservers: int
) -> EvictionEnv:
"""
Creates two tenants, one somewhat larger than the other.
"""
log.info(f"setting up eviction_env for test {request.node.name}")
neon_env_builder.num_pageservers = num_pageservers
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
# initial tenant will not be present on this pageserver
env = neon_env_builder.init_configs()
env.start()
pageserver_http = env.pageserver.http_client()
# We will create all tenants on the 0th pageserver
pageserver_http = env.pageservers[0].http_client()
# allow because we are invoking this manually; we always warn on executing disk based eviction
env.pageserver.allowed_errors.append(r".* running disk usage based eviction due to pressure.*")
for ps in env.pageservers:
ps.allowed_errors.append(r".* running disk usage based eviction due to pressure.*")
# Choose small layer_size so that we can use low pgbench_scales and still get a large count of layers.
# Large count of layers and small layer size is good for testing because it makes evictions predictable.
@@ -216,7 +244,7 @@ def eviction_env(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> Ev
with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
pg_bin.run(["pgbench", "-i", f"-s{scale}", endpoint.connstr()])
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id, pageserver_id=1)
timelines.append((tenant_id, timeline_id))
@@ -252,6 +280,20 @@ def eviction_env(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> Ev
return eviction_env
@pytest.fixture
def eviction_env(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> EvictionEnv:
return _eviction_env(request, neon_env_builder, pg_bin, num_pageservers=1)
@pytest.fixture
def eviction_env_ha(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> EvictionEnv:
"""
Variant of the eviction environment with two pageservers for testing eviction on
HA configurations with a secondary location.
"""
return _eviction_env(request, neon_env_builder, pg_bin, num_pageservers=2)
def test_broken_tenants_are_skipped(eviction_env: EvictionEnv):
env = eviction_env
@@ -264,10 +306,16 @@ def test_broken_tenants_are_skipped(eviction_env: EvictionEnv):
healthy_tenant_id, healthy_timeline_id = env.timelines[1]
broken_size_pre, _, _ = poor_mans_du(
env.neon_env, [(broken_tenant_id, broken_timeline_id)], verbose=True
env.neon_env,
[(broken_tenant_id, broken_timeline_id)],
env.pageserver,
verbose=True,
)
healthy_size_pre, _, _ = poor_mans_du(
env.neon_env, [(healthy_tenant_id, healthy_timeline_id)], verbose=True
env.neon_env,
[(healthy_tenant_id, healthy_timeline_id)],
env.pageserver,
verbose=True,
)
# try to evict everything, then validate that broken tenant wasn't touched
@@ -277,10 +325,16 @@ def test_broken_tenants_are_skipped(eviction_env: EvictionEnv):
log.info(f"{response}")
broken_size_post, _, _ = poor_mans_du(
env.neon_env, [(broken_tenant_id, broken_timeline_id)], verbose=True
env.neon_env,
[(broken_tenant_id, broken_timeline_id)],
env.pageserver,
verbose=True,
)
healthy_size_post, _, _ = poor_mans_du(
env.neon_env, [(healthy_tenant_id, healthy_timeline_id)], verbose=True
env.neon_env,
[(healthy_tenant_id, healthy_timeline_id)],
env.pageserver,
verbose=True,
)
assert broken_size_pre == broken_size_post, "broken tenant should not be touched"
@@ -302,7 +356,7 @@ def test_pageserver_evicts_until_pressure_is_relieved(
env = eviction_env
pageserver_http = env.pageserver_http
(total_on_disk, _, _) = env.timelines_du()
(total_on_disk, _, _) = env.timelines_du(env.pageserver)
target = total_on_disk // 2
@@ -311,7 +365,7 @@ def test_pageserver_evicts_until_pressure_is_relieved(
)
log.info(f"{response}")
(later_total_on_disk, _, _) = env.timelines_du()
(later_total_on_disk, _, _) = env.timelines_du(env.pageserver)
actual_change = total_on_disk - later_total_on_disk
@@ -336,8 +390,8 @@ def test_pageserver_respects_overridden_resident_size(
env = eviction_env
ps_http = env.pageserver_http
(total_on_disk, _, _) = env.timelines_du()
du_by_timeline = env.du_by_timeline()
(total_on_disk, _, _) = env.timelines_du(env.pageserver)
du_by_timeline = env.du_by_timeline(env.pageserver)
log.info("du_by_timeline: %s", du_by_timeline)
assert len(du_by_timeline) == 2, "this test assumes two tenants"
@@ -379,8 +433,8 @@ def test_pageserver_respects_overridden_resident_size(
GLOBAL_LRU_LOG_LINE,
), "this test is pointless if it fell back to global LRU"
(later_total_on_disk, _, _) = env.timelines_du()
later_du_by_timeline = env.du_by_timeline()
(later_total_on_disk, _, _) = env.timelines_du(env.pageserver)
later_du_by_timeline = env.du_by_timeline(env.pageserver)
log.info("later_du_by_timeline: %s", later_du_by_timeline)
actual_change = total_on_disk - later_total_on_disk
@@ -412,7 +466,7 @@ def test_pageserver_falls_back_to_global_lru(eviction_env: EvictionEnv, order: E
env = eviction_env
ps_http = env.pageserver_http
(total_on_disk, _, _) = env.timelines_du()
(total_on_disk, _, _) = env.timelines_du(env.pageserver)
target = total_on_disk
response = ps_http.disk_usage_eviction_run(
@@ -420,7 +474,7 @@ def test_pageserver_falls_back_to_global_lru(eviction_env: EvictionEnv, order: E
)
log.info(f"{response}")
(later_total_on_disk, _, _) = env.timelines_du()
(later_total_on_disk, _, _) = env.timelines_du(env.pageserver)
actual_change = total_on_disk - later_total_on_disk
assert 0 <= actual_change, "nothing can load layers during this test"
assert actual_change >= target, "eviction must always evict more than target"
@@ -448,8 +502,8 @@ def test_partial_evict_tenant(eviction_env: EvictionEnv, order: EvictionOrder):
env = eviction_env
ps_http = env.pageserver_http
(total_on_disk, _, _) = env.timelines_du()
du_by_timeline = env.du_by_timeline()
(total_on_disk, _, _) = env.timelines_du(env.pageserver)
du_by_timeline = env.du_by_timeline(env.pageserver)
# pick smaller or greater (iteration order is insertion order of scale=4 and scale=6)
[warm, cold] = list(du_by_timeline.keys())
@@ -467,12 +521,12 @@ def test_partial_evict_tenant(eviction_env: EvictionEnv, order: EvictionOrder):
)
log.info(f"{response}")
(later_total_on_disk, _, _) = env.timelines_du()
(later_total_on_disk, _, _) = env.timelines_du(env.pageserver)
actual_change = total_on_disk - later_total_on_disk
assert 0 <= actual_change, "nothing can load layers during this test"
assert actual_change >= target, "eviction must always evict more than target"
later_du_by_timeline = env.du_by_timeline()
later_du_by_timeline = env.du_by_timeline(env.pageserver)
for tenant, later_tenant_usage in later_du_by_timeline.items():
assert (
later_tenant_usage < du_by_timeline[tenant]
@@ -508,7 +562,10 @@ def test_partial_evict_tenant(eviction_env: EvictionEnv, order: EvictionOrder):
def poor_mans_du(
env: NeonEnv, timelines: list[Tuple[TenantId, TimelineId]], verbose: bool = False
env: NeonEnv,
timelines: list[Tuple[TenantId, TimelineId]],
pageserver: NeonPageserver,
verbose: bool = False,
) -> Tuple[int, int, int]:
"""
Disk usage, largest, smallest layer for layer files over the given (tenant, timeline) tuples;
@@ -518,7 +575,7 @@ def poor_mans_du(
largest_layer = 0
smallest_layer = None
for tenant_id, timeline_id in timelines:
timeline_dir = env.pageserver.timeline_dir(tenant_id, timeline_id)
timeline_dir = pageserver.timeline_dir(tenant_id, timeline_id)
assert timeline_dir.exists(), f"timeline dir does not exist: {timeline_dir}"
total = 0
for file in timeline_dir.iterdir():
@@ -549,6 +606,7 @@ def test_statvfs_error_handling(eviction_env: EvictionEnv):
env = eviction_env
env.neon_env.pageserver.stop()
env.pageserver_start_with_disk_usage_eviction(
env.pageserver,
period="1s",
max_usage_pct=90,
min_avail_bytes=0,
@@ -573,11 +631,12 @@ def test_statvfs_pressure_usage(eviction_env: EvictionEnv):
env.neon_env.pageserver.stop()
# make it seem like we're at 100% utilization by setting total bytes to the used bytes
total_size, _, _ = env.timelines_du()
total_size, _, _ = env.timelines_du(env.pageserver)
blocksize = 512
total_blocks = (total_size + (blocksize - 1)) // blocksize
env.pageserver_start_with_disk_usage_eviction(
env.pageserver,
period="1s",
max_usage_pct=33,
min_avail_bytes=0,
@@ -597,7 +656,7 @@ def test_statvfs_pressure_usage(eviction_env: EvictionEnv):
wait_until(10, 1, relieved_log_message)
post_eviction_total_size, _, _ = env.timelines_du()
post_eviction_total_size, _, _ = env.timelines_du(env.pageserver)
assert post_eviction_total_size <= 0.33 * total_size, "we requested max 33% usage"
@@ -612,13 +671,14 @@ def test_statvfs_pressure_min_avail_bytes(eviction_env: EvictionEnv):
env.neon_env.pageserver.stop()
# make it seem like we're at 100% utilization by setting total bytes to the used bytes
total_size, _, _ = env.timelines_du()
total_size, _, _ = env.timelines_du(env.pageserver)
blocksize = 512
total_blocks = (total_size + (blocksize - 1)) // blocksize
min_avail_bytes = total_size // 3
env.pageserver_start_with_disk_usage_eviction(
env.pageserver,
period="1s",
max_usage_pct=100,
min_avail_bytes=min_avail_bytes,
@@ -638,7 +698,67 @@ def test_statvfs_pressure_min_avail_bytes(eviction_env: EvictionEnv):
wait_until(10, 1, relieved_log_message)
post_eviction_total_size, _, _ = env.timelines_du()
post_eviction_total_size, _, _ = env.timelines_du(env.pageserver)
assert (
total_size - post_eviction_total_size >= min_avail_bytes
), "we requested at least min_avail_bytes worth of free space"
def test_secondary_mode_eviction(eviction_env_ha: EvictionEnv):
env = eviction_env_ha
tenant_ids = [t[0] for t in env.timelines]
log.info("Setting up secondary location...")
ps_attached = env.neon_env.pageservers[0]
ps_secondary = env.neon_env.pageservers[1]
for tenant_id in tenant_ids:
ps_secondary.tenant_location_configure(
tenant_id,
{
"mode": "Secondary",
"secondary_conf": {"warm": True},
"tenant_conf": {},
},
)
readback_conf = ps_secondary.read_tenant_location_conf(tenant_id)
log.info(f"Read back conf: {readback_conf}")
# Request secondary location to download all layers that the attached location has
ps_attached.http_client().tenant_heatmap_upload(tenant_id)
ps_secondary.http_client().tenant_secondary_download(tenant_id)
# Configure the secondary pageserver to have a phony small disk size
ps_secondary.stop()
total_size, _, _ = env.timelines_du(ps_secondary)
blocksize = 512
total_blocks = (total_size + (blocksize - 1)) // blocksize
min_avail_bytes = total_size // 3
env.pageserver_start_with_disk_usage_eviction(
ps_secondary,
period="1s",
max_usage_pct=100,
min_avail_bytes=min_avail_bytes,
mock_behavior={
"type": "Success",
"blocksize": blocksize,
"total_blocks": total_blocks,
# Only count layer files towards used bytes in the mock_statvfs.
# This avoids accounting for metadata files & tenant conf in the tests.
"name_filter": ".*__.*",
},
eviction_order=EvictionOrder.ABSOLUTE_ORDER,
)
def relieved_log_message():
assert ps_secondary.log_contains(".*disk usage pressure relieved")
wait_until(10, 1, relieved_log_message)
post_eviction_total_size, _, _ = env.timelines_du(ps_secondary)
assert (
total_size - post_eviction_total_size >= min_avail_bytes