mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-17 21:20:37 +00:00
Merge pull request #3890 from neondatabase/heikki/disk-usage-eviction
Rewrite parts of disk usage eviction implementation to make it more understandable (I hope).
This commit is contained in:
@@ -25,13 +25,10 @@
|
||||
//!
|
||||
//! The iteration evicts layers in LRU fashion, but, with a weak reservation per tenant.
|
||||
//! The reservation is to keep the most recently accessed X bytes per tenant resident.
|
||||
//! All layers that don't make the cut are put on a list and become eviction candidates.
|
||||
//! We evict until we're below the two thresholds.
|
||||
//! If we cannot relieve pressure by evicting layers outside of the reservation, we
|
||||
//! start evicting layers that are part of the reservation, LRU first.
|
||||
//!
|
||||
//! If the above strategy wouldn't free enough space, we fall back to global LRU right away,
|
||||
//! not respecting any per-tenant reservations.
|
||||
//!
|
||||
//! This value for the per-tenant reservation is referred to as `tenant_min_resident_size`
|
||||
//! The value for the per-tenant reservation is referred to as `tenant_min_resident_size`
|
||||
//! throughout the code, but, no actual variable carries that name.
|
||||
//! The per-tenant default value is the `max(tenant's layer file sizes, regardless of local or remote)`.
|
||||
//! The idea is to allow at least one layer to be resident per tenant, to ensure it can make forward progress
|
||||
@@ -43,7 +40,11 @@
|
||||
// - The `#[allow(dead_code)]` above various structs are to suppress warnings about only the Debug impl
|
||||
// reading these fields. We use the Debug impl for semi-structured logging, though.
|
||||
|
||||
use std::{collections::HashMap, ops::ControlFlow, sync::Arc, time::Duration};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::Arc,
|
||||
time::{Duration, SystemTime},
|
||||
};
|
||||
|
||||
use anyhow::Context;
|
||||
use nix::dir::Dir;
|
||||
@@ -53,12 +54,12 @@ use sync_wrapper::SyncWrapper;
|
||||
use tokio::time::Instant;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error, info, instrument, warn, Instrument};
|
||||
use utils::{id::TenantId, serde_percent::Percent};
|
||||
use utils::serde_percent::Percent;
|
||||
|
||||
use crate::{
|
||||
config::PageServerConf,
|
||||
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
|
||||
tenant::{self, LocalLayerInfoForDiskUsageEviction, Timeline},
|
||||
tenant::{self, storage_layer::PersistentLayer, Timeline},
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
@@ -281,7 +282,6 @@ struct LayerCount {
|
||||
count: usize,
|
||||
}
|
||||
|
||||
#[allow(clippy::needless_late_init)]
|
||||
pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
state: &State,
|
||||
storage: &GenericRemoteStorage,
|
||||
@@ -294,12 +294,6 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
.try_lock()
|
||||
.map_err(|_| anyhow::anyhow!("iteration is already executing"))?;
|
||||
|
||||
// planned post-eviction usage
|
||||
let mut usage_planned_min_resident_size_respecting = usage_pre;
|
||||
let mut usage_planned_global_lru = None;
|
||||
// achieved post-eviction usage according to internal accounting
|
||||
let mut usage_assumed = usage_pre;
|
||||
|
||||
debug!(?usage_pre, "disk usage");
|
||||
|
||||
if !usage_pre.has_pressure() {
|
||||
@@ -311,42 +305,46 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
"running disk usage based eviction due to pressure"
|
||||
);
|
||||
|
||||
let mut lru_candidates: Vec<(_, LocalLayerInfoForDiskUsageEviction)> = Vec::new();
|
||||
|
||||
// get a snapshot of the list of tenants
|
||||
let tenants = tenant::mgr::list_tenants()
|
||||
.await
|
||||
.context("get list of tenants")?;
|
||||
|
||||
{
|
||||
let mut tmp = Vec::new();
|
||||
for (tenant_id, _state) in &tenants {
|
||||
let flow = extend_lru_candidates(
|
||||
Mode::RespectTenantMinResidentSize,
|
||||
*tenant_id,
|
||||
&mut lru_candidates,
|
||||
&mut tmp,
|
||||
cancel,
|
||||
)
|
||||
.await;
|
||||
|
||||
if let ControlFlow::Break(()) = flow {
|
||||
return Ok(IterationOutcome::Cancelled);
|
||||
}
|
||||
|
||||
assert!(tmp.is_empty(), "tmp has to be fully drained each iteration");
|
||||
let candidates = match collect_eviction_candidates(cancel).await? {
|
||||
EvictionCandidates::Cancelled => {
|
||||
return Ok(IterationOutcome::Cancelled);
|
||||
}
|
||||
}
|
||||
EvictionCandidates::Finished(partitioned) => partitioned,
|
||||
};
|
||||
|
||||
if cancel.is_cancelled() {
|
||||
return Ok(IterationOutcome::Cancelled);
|
||||
// Debug-log the list of candidates
|
||||
let now = SystemTime::now();
|
||||
for (i, (partition, candidate)) in candidates.iter().enumerate() {
|
||||
debug!(
|
||||
"cand {}/{}: size={}, no_access_for={}us, parition={:?}, tenant={} timeline={} layer={}",
|
||||
i + 1,
|
||||
candidates.len(),
|
||||
candidate.layer.file_size(),
|
||||
now.duration_since(candidate.last_activity_ts)
|
||||
.unwrap()
|
||||
.as_micros(),
|
||||
partition,
|
||||
candidate.layer.get_tenant_id(),
|
||||
candidate.layer.get_timeline_id(),
|
||||
candidate.layer.filename().file_name(),
|
||||
);
|
||||
}
|
||||
|
||||
// phase1: select victims to relieve pressure
|
||||
lru_candidates.sort_unstable_by_key(|(_, layer)| layer.last_activity_ts);
|
||||
let mut batched: HashMap<_, Vec<LocalLayerInfoForDiskUsageEviction>> = HashMap::new();
|
||||
for (i, (timeline, layer)) in lru_candidates.into_iter().enumerate() {
|
||||
if !usage_planned_min_resident_size_respecting.has_pressure() {
|
||||
//
|
||||
// Walk through the list of candidates, until we have accumulated enough layers to get
|
||||
// us back under the pressure threshold. 'usage_planned' is updated so that it tracks
|
||||
// how much disk space would be used after evicting all the layers up to the current
|
||||
// point in the list. The layers are collected in 'batched', grouped per timeline.
|
||||
//
|
||||
// If we get far enough in the list that we start to evict layers that are below
|
||||
// the tenant's min-resident-size threshold, print a warning, and memorize the disk
|
||||
// usage at that point, in 'usage_planned_min_resident_size_respecting'.
|
||||
let mut batched: HashMap<_, Vec<Arc<dyn PersistentLayer>>> = HashMap::new();
|
||||
let mut warned = None;
|
||||
let mut usage_planned = usage_pre;
|
||||
for (i, (partition, candidate)) in candidates.into_iter().enumerate() {
|
||||
if !usage_planned.has_pressure() {
|
||||
debug!(
|
||||
no_candidates_evicted = i,
|
||||
"took enough candidates for pressure to be relieved"
|
||||
@@ -354,66 +352,40 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
break;
|
||||
}
|
||||
|
||||
usage_planned_min_resident_size_respecting.add_available_bytes(layer.file_size());
|
||||
if partition == MinResidentSizePartition::Below && warned.is_none() {
|
||||
warn!(?usage_pre, ?usage_planned, candidate_no=i, "tenant_min_resident_size-respecting LRU would not relieve pressure, evicting more following global LRU policy");
|
||||
warned = Some(usage_planned);
|
||||
}
|
||||
|
||||
usage_planned.add_available_bytes(candidate.layer.file_size());
|
||||
|
||||
batched
|
||||
.entry(TimelineKey(timeline.clone()))
|
||||
.entry(TimelineKey(candidate.timeline))
|
||||
.or_default()
|
||||
.push(layer);
|
||||
.push(candidate.layer);
|
||||
}
|
||||
// If we can't relieve pressure while respecting tenant_min_resident_size, fall back to global LRU.
|
||||
if usage_planned_min_resident_size_respecting.has_pressure() {
|
||||
// NB: tests depend on parts of this log message
|
||||
warn!(?usage_pre, ?usage_planned_min_resident_size_respecting, "tenant_min_resident_size-respecting LRU would not relieve pressure, falling back to global LRU");
|
||||
batched.clear();
|
||||
let mut usage_planned = usage_pre;
|
||||
let mut global_lru_candidates = Vec::new();
|
||||
let mut tmp = Vec::new();
|
||||
for (tenant_id, _state) in &tenants {
|
||||
let flow = extend_lru_candidates(
|
||||
Mode::GlobalLru,
|
||||
*tenant_id,
|
||||
&mut global_lru_candidates,
|
||||
&mut tmp,
|
||||
cancel,
|
||||
)
|
||||
.await;
|
||||
|
||||
if let ControlFlow::Break(()) = flow {
|
||||
return Ok(IterationOutcome::Cancelled);
|
||||
}
|
||||
|
||||
assert!(tmp.is_empty(), "tmp has to be fully drained each iteration");
|
||||
}
|
||||
global_lru_candidates.sort_unstable_by_key(|(_, layer)| layer.last_activity_ts);
|
||||
for (timeline, layer) in global_lru_candidates {
|
||||
usage_planned.add_available_bytes(layer.file_size());
|
||||
batched
|
||||
.entry(TimelineKey(timeline.clone()))
|
||||
.or_default()
|
||||
.push(layer);
|
||||
if cancel.is_cancelled() {
|
||||
return Ok(IterationOutcome::Cancelled);
|
||||
}
|
||||
}
|
||||
usage_planned_global_lru = Some(usage_planned);
|
||||
}
|
||||
let usage_planned = PlannedUsage {
|
||||
respecting_tenant_min_resident_size: usage_planned_min_resident_size_respecting,
|
||||
fallback_to_global_lru: usage_planned_global_lru,
|
||||
let usage_planned = match warned {
|
||||
Some(respecting_tenant_min_resident_size) => PlannedUsage {
|
||||
respecting_tenant_min_resident_size,
|
||||
fallback_to_global_lru: Some(usage_planned),
|
||||
},
|
||||
None => PlannedUsage {
|
||||
respecting_tenant_min_resident_size: usage_planned,
|
||||
fallback_to_global_lru: None,
|
||||
},
|
||||
};
|
||||
|
||||
debug!(?usage_planned, "usage planned");
|
||||
|
||||
// phase2: evict victims batched by timeline
|
||||
let mut batch = Vec::new();
|
||||
|
||||
// After the loop, `usage_assumed` is the post-eviction usage,
|
||||
// according to internal accounting.
|
||||
let mut usage_assumed = usage_pre;
|
||||
let mut evictions_failed = LayerCount::default();
|
||||
for (timeline, layers) in batched {
|
||||
for (timeline, batch) in batched {
|
||||
let tenant_id = timeline.tenant_id;
|
||||
let timeline_id = timeline.timeline_id;
|
||||
|
||||
batch.clear();
|
||||
batch.extend(layers.iter().map(|x| &x.layer).cloned());
|
||||
let batch_size = batch.len();
|
||||
|
||||
debug!(%timeline_id, "evicting batch for timeline");
|
||||
@@ -426,8 +398,8 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
warn!("failed to evict batch: {:#}", e);
|
||||
}
|
||||
Ok(results) => {
|
||||
assert_eq!(results.len(), layers.len());
|
||||
for (result, layer) in results.into_iter().zip(layers.iter()) {
|
||||
assert_eq!(results.len(), batch.len());
|
||||
for (result, layer) in results.into_iter().zip(batch.iter()) {
|
||||
match result {
|
||||
Some(Ok(true)) => {
|
||||
usage_assumed.add_available_bytes(layer.file_size());
|
||||
@@ -470,106 +442,161 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
}))
|
||||
}
|
||||
|
||||
/// Different modes of gathering tenant's least recently used layers.
|
||||
#[derive(Debug)]
|
||||
enum Mode {
|
||||
/// Add all but the most recently used `min_resident_size` worth of layers to the candidates
|
||||
/// list.
|
||||
///
|
||||
/// `min_resident_size` defaults to maximum layer file size of the tenant. This ensures that
|
||||
/// the tenant will always have one layer resident. If we cannot compute `min_resident_size`
|
||||
/// accurately because metadata is missing we use hardcoded constant. `min_resident_size` can
|
||||
/// be overridden per tenant for important tenants.
|
||||
RespectTenantMinResidentSize,
|
||||
/// Consider all layer files from all tenants in LRU order.
|
||||
///
|
||||
/// This is done if the `min_resident_size` respecting does not relieve pressure.
|
||||
GlobalLru,
|
||||
#[derive(Clone)]
|
||||
struct EvictionCandidate {
|
||||
timeline: Arc<Timeline>,
|
||||
layer: Arc<dyn PersistentLayer>,
|
||||
last_activity_ts: SystemTime,
|
||||
}
|
||||
|
||||
/// Figure out eviction candidates for the given tenant and append them to `lru_candidates`.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
|
||||
enum MinResidentSizePartition {
|
||||
Above,
|
||||
Below,
|
||||
}
|
||||
|
||||
enum EvictionCandidates {
|
||||
Cancelled,
|
||||
Finished(Vec<(MinResidentSizePartition, EvictionCandidate)>),
|
||||
}
|
||||
|
||||
/// Gather the eviction candidates.
|
||||
///
|
||||
/// The `mode` argument controls which layers get appended to `lru_candidates`.
|
||||
/// Read its type's doc comments for more details.
|
||||
/// The returned `Ok(EvictionCandidates::Finished(candidates))` is sorted in eviction
|
||||
/// order. A caller that evicts in that order, until pressure is relieved, implements
|
||||
/// the eviction policy outlined in the module comment.
|
||||
///
|
||||
/// The caller is responsible for sorting `lru_candidates` once it has called this function
|
||||
/// for all tenants.
|
||||
/// # Example
|
||||
///
|
||||
/// The `scratch` vector is temporary storage and taken as an argument to avoid allocations.
|
||||
/// It must be empty when calling this function. It is guaranteed to be empty when we
|
||||
/// return `ControlFlow::Continue`.
|
||||
#[instrument(skip_all, fields(?mode, %tenant_id))]
|
||||
async fn extend_lru_candidates(
|
||||
mode: Mode,
|
||||
tenant_id: TenantId,
|
||||
lru_candidates: &mut Vec<(Arc<Timeline>, LocalLayerInfoForDiskUsageEviction)>,
|
||||
scratch: &mut Vec<(Arc<Timeline>, LocalLayerInfoForDiskUsageEviction)>,
|
||||
/// Imagine that there are two tenants, A and B, with five layers each, a-e.
|
||||
/// Each layer has size 100, and both tenant's min_resident_size is 150.
|
||||
/// The eviction order would be
|
||||
///
|
||||
/// ```text
|
||||
/// partition last_activity_ts tenant/layer
|
||||
/// Above 18:30 A/c
|
||||
/// Above 19:00 A/b
|
||||
/// Above 18:29 B/c
|
||||
/// Above 19:05 B/b
|
||||
/// Above 20:00 B/a
|
||||
/// Above 20:03 A/a
|
||||
/// Below 20:30 A/d
|
||||
/// Below 20:40 B/d
|
||||
/// Below 20:45 B/e
|
||||
/// Below 20:58 A/e
|
||||
/// ```
|
||||
///
|
||||
/// Now, if we need to evict 300 bytes to relieve pressure, we'd evict `A/c, A/b, B/c`.
|
||||
/// They are all in the `Above` partition, so, we respected each tenant's min_resident_size.
|
||||
///
|
||||
/// But, if we need to evict 900 bytes to relieve pressure, we'd evict
|
||||
/// `A/c, A/b, B/c, B/b, B/a, A/a, A/d, B/d, B/e`, reaching into the `Below` partition
|
||||
/// after exhauting the `Above` partition.
|
||||
/// So, we did not respect each tenant's min_resident_size.
|
||||
async fn collect_eviction_candidates(
|
||||
cancel: &CancellationToken,
|
||||
) -> ControlFlow<()> {
|
||||
debug!("begin");
|
||||
) -> anyhow::Result<EvictionCandidates> {
|
||||
// get a snapshot of the list of tenants
|
||||
let tenants = tenant::mgr::list_tenants()
|
||||
.await
|
||||
.context("get list of tenants")?;
|
||||
|
||||
let tenant = match tenant::mgr::get_tenant(tenant_id, true).await {
|
||||
Ok(tenant) => tenant,
|
||||
Err(e) => {
|
||||
// this can happen if tenant has lifecycle transition after we fetched it
|
||||
debug!("failed to get tenant: {e:#}");
|
||||
return ControlFlow::Continue(());
|
||||
}
|
||||
};
|
||||
|
||||
if cancel.is_cancelled() {
|
||||
return ControlFlow::Break(());
|
||||
}
|
||||
|
||||
// If one of the timelines becomes `!is_active()` during the iteration,
|
||||
// for example because we're shutting down, then `max_layer_size` can be too small.
|
||||
// That's OK. This code only runs under a disk pressure situation, and being
|
||||
// a little unfair to tenants during shutdown in such a situation is tolerable.
|
||||
let mut max_layer_size = 0;
|
||||
for tl in tenant.list_timelines() {
|
||||
if !tl.is_active() {
|
||||
continue;
|
||||
}
|
||||
let info = tl.get_local_layers_for_disk_usage_eviction();
|
||||
debug!(timeline_id=%tl.timeline_id, "timeline resident layers count: {}", info.resident_layers.len());
|
||||
scratch.extend(
|
||||
info.resident_layers
|
||||
.into_iter()
|
||||
.map(|layer_infos| (tl.clone(), layer_infos)),
|
||||
);
|
||||
max_layer_size = max_layer_size.max(info.max_layer_size.unwrap_or(0));
|
||||
let mut candidates = Vec::new();
|
||||
|
||||
for (tenant_id, _state) in &tenants {
|
||||
if cancel.is_cancelled() {
|
||||
return ControlFlow::Break(());
|
||||
return Ok(EvictionCandidates::Cancelled);
|
||||
}
|
||||
let tenant = match tenant::mgr::get_tenant(*tenant_id, true).await {
|
||||
Ok(tenant) => tenant,
|
||||
Err(e) => {
|
||||
// this can happen if tenant has lifecycle transition after we fetched it
|
||||
debug!("failed to get tenant: {e:#}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
// collect layers from all timelines in this tenant
|
||||
//
|
||||
// If one of the timelines becomes `!is_active()` during the iteration,
|
||||
// for example because we're shutting down, then `max_layer_size` can be too small.
|
||||
// That's OK. This code only runs under a disk pressure situation, and being
|
||||
// a little unfair to tenants during shutdown in such a situation is tolerable.
|
||||
let mut tenant_candidates = Vec::new();
|
||||
let mut max_layer_size = 0;
|
||||
for tl in tenant.list_timelines() {
|
||||
if !tl.is_active() {
|
||||
continue;
|
||||
}
|
||||
let info = tl.get_local_layers_for_disk_usage_eviction();
|
||||
debug!(tenant_id=%tl.tenant_id, timeline_id=%tl.timeline_id, "timeline resident layers count: {}", info.resident_layers.len());
|
||||
tenant_candidates.extend(
|
||||
info.resident_layers
|
||||
.into_iter()
|
||||
.map(|layer_infos| (tl.clone(), layer_infos)),
|
||||
);
|
||||
max_layer_size = max_layer_size.max(info.max_layer_size.unwrap_or(0));
|
||||
|
||||
if cancel.is_cancelled() {
|
||||
return Ok(EvictionCandidates::Cancelled);
|
||||
}
|
||||
}
|
||||
|
||||
// `min_resident_size` defaults to maximum layer file size of the tenant.
|
||||
// This ensures that each tenant can have at least one layer resident at a given time,
|
||||
// ensuring forward progress for a single Timeline::get in that tenant.
|
||||
// It's a questionable heuristic since, usually, there are many Timeline::get
|
||||
// requests going on for a tenant, and, at least in Neon prod, the median
|
||||
// layer file size is much smaller than the compaction target size.
|
||||
// We could be better here, e.g., sum of all L0 layers + most recent L1 layer.
|
||||
// That's what's typically used by the various background loops.
|
||||
//
|
||||
// The default can be overriden with a fixed value in the tenant conf.
|
||||
// A default override can be put in the default tenant conf in the pageserver.toml.
|
||||
let min_resident_size = if let Some(s) = tenant.get_min_resident_size_override() {
|
||||
debug!(
|
||||
tenant_id=%tenant.tenant_id(),
|
||||
overriden_size=s,
|
||||
"using overridden min resident size for tenant"
|
||||
);
|
||||
s
|
||||
} else {
|
||||
debug!(
|
||||
tenant_id=%tenant.tenant_id(),
|
||||
max_layer_size,
|
||||
"using max layer size as min_resident_size for tenant",
|
||||
);
|
||||
max_layer_size
|
||||
};
|
||||
|
||||
// Sort layers most-recently-used first, then partition by
|
||||
// cumsum above/below min_resident_size.
|
||||
tenant_candidates
|
||||
.sort_unstable_by_key(|(_, layer_info)| std::cmp::Reverse(layer_info.last_activity_ts));
|
||||
let mut cumsum: i128 = 0;
|
||||
for (timeline, layer_info) in tenant_candidates.into_iter() {
|
||||
let file_size = layer_info.file_size();
|
||||
let candidate = EvictionCandidate {
|
||||
timeline,
|
||||
last_activity_ts: layer_info.last_activity_ts,
|
||||
layer: layer_info.layer,
|
||||
};
|
||||
let partition = if cumsum > min_resident_size as i128 {
|
||||
MinResidentSizePartition::Above
|
||||
} else {
|
||||
MinResidentSizePartition::Below
|
||||
};
|
||||
candidates.push((partition, candidate));
|
||||
cumsum += i128::from(file_size);
|
||||
}
|
||||
}
|
||||
|
||||
let min_resident_size = match mode {
|
||||
Mode::GlobalLru => {
|
||||
lru_candidates.append(scratch);
|
||||
return ControlFlow::Continue(());
|
||||
}
|
||||
Mode::RespectTenantMinResidentSize => tenant
|
||||
.get_min_resident_size_override()
|
||||
.unwrap_or(max_layer_size),
|
||||
};
|
||||
debug_assert!(MinResidentSizePartition::Above < MinResidentSizePartition::Below,
|
||||
"as explained in the function's doc comment, layers that aren't in the tenant's min_resident_size are evicted first");
|
||||
candidates
|
||||
.sort_unstable_by_key(|(partition, candidate)| (*partition, candidate.last_activity_ts));
|
||||
|
||||
scratch.sort_unstable_by_key(|(_, layer_info)| layer_info.last_activity_ts);
|
||||
|
||||
let mut current: u64 = scratch.iter().map(|(_, layer)| layer.file_size()).sum();
|
||||
for (tl, layer) in scratch.drain(..) {
|
||||
if cancel.is_cancelled() {
|
||||
return ControlFlow::Break(());
|
||||
}
|
||||
if current <= min_resident_size {
|
||||
break;
|
||||
}
|
||||
current -= layer.file_size();
|
||||
debug!(?layer, "adding layer to lru_candidates");
|
||||
lru_candidates.push((tl, layer));
|
||||
}
|
||||
|
||||
ControlFlow::Continue(())
|
||||
Ok(EvictionCandidates::Finished(candidates))
|
||||
}
|
||||
|
||||
struct TimelineKey(Arc<Timeline>);
|
||||
|
||||
@@ -13,8 +13,11 @@ from fixtures.neon_fixtures import (
|
||||
PgBin,
|
||||
RemoteStorageKind,
|
||||
wait_for_last_flush_lsn,
|
||||
wait_for_upload_queue_empty,
|
||||
)
|
||||
from fixtures.types import TenantId, TimelineId
|
||||
from fixtures.types import Lsn, TenantId, TimelineId
|
||||
|
||||
GLOBAL_LRU_LOG_LINE = "tenant_min_resident_size-respecting LRU would not relieve pressure, evicting more following global LRU policy"
|
||||
|
||||
|
||||
@pytest.mark.parametrize("config_level_override", [None, 400])
|
||||
@@ -68,6 +71,7 @@ class EvictionEnv:
|
||||
pg_bin: PgBin
|
||||
pageserver_http: PageserverHttpClient
|
||||
layer_size: int
|
||||
pgbench_init_lsns: Dict[TenantId, Lsn]
|
||||
|
||||
def timelines_du(self) -> Tuple[int, int, int]:
|
||||
return poor_mans_du(self.neon_env, [(tid, tlid) for tid, tlid, _ in self.timelines])
|
||||
@@ -78,6 +82,15 @@ class EvictionEnv:
|
||||
for tid, tlid, _ in self.timelines
|
||||
}
|
||||
|
||||
def warm_up_tenant(self, tenant_id: TenantId):
|
||||
"""
|
||||
Start a read-only compute at the LSN after pgbench -i, and run pgbench -S against it.
|
||||
This assumes that the tenant is still at the state after pbench -i.
|
||||
"""
|
||||
lsn = self.pgbench_init_lsns[tenant_id]
|
||||
with self.neon_env.postgres.create_start("main", tenant_id=tenant_id, lsn=lsn) as pg:
|
||||
self.pg_bin.run(["pgbench", "-S", pg.connstr()])
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def eviction_env(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> Iterator[EvictionEnv]:
|
||||
@@ -118,6 +131,8 @@ def eviction_env(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> It
|
||||
pgbench_scales = [4, 6]
|
||||
layer_size = 5 * 1024**2
|
||||
|
||||
pgbench_init_lsns = {}
|
||||
|
||||
for scale in pgbench_scales:
|
||||
tenant_id, timeline_id = env.neon_cli.create_tenant(
|
||||
conf={
|
||||
@@ -134,6 +149,12 @@ def eviction_env(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> It
|
||||
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
|
||||
|
||||
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
|
||||
wait_for_upload_queue_empty(env.pageserver, tenant_id, timeline_id)
|
||||
tl_info = pageserver_http.timeline_detail(tenant_id, timeline_id)
|
||||
assert tl_info["last_record_lsn"] == tl_info["disk_consistent_lsn"]
|
||||
assert tl_info["disk_consistent_lsn"] == tl_info["remote_consistent_lsn"]
|
||||
pgbench_init_lsns[tenant_id] = Lsn(tl_info["last_record_lsn"])
|
||||
|
||||
layers = pageserver_http.layer_map_info(tenant_id, timeline_id)
|
||||
log.info(f"{layers}")
|
||||
assert len(layers.historic_layers) >= 4
|
||||
@@ -146,6 +167,7 @@ def eviction_env(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> It
|
||||
pageserver_http=pageserver_http,
|
||||
layer_size=layer_size,
|
||||
pg_bin=pg_bin,
|
||||
pgbench_init_lsns=pgbench_init_lsns,
|
||||
)
|
||||
|
||||
yield eviction_env
|
||||
@@ -204,9 +226,11 @@ def test_pageserver_respects_overridden_resident_size(eviction_env: EvictionEnv)
|
||||
du_by_timeline[large_tenant] - du_by_timeline[small_tenant] > 5 * env.layer_size
|
||||
), "ensure this test will do more than 1 eviction"
|
||||
|
||||
# give the larger tenant a haircut while prevening the smaller tenant from getting one
|
||||
# Give the larger tenant a haircut while preventing the smaller tenant from getting one.
|
||||
# To prevent the smaller from getting a haircut, we set min_resident_size to its current size.
|
||||
# To ensure the larger tenant is getting a haircut, any non-zero `target` will do.
|
||||
min_resident_size = du_by_timeline[small_tenant]
|
||||
target = du_by_timeline[large_tenant] - du_by_timeline[small_tenant]
|
||||
target = 1
|
||||
assert any(
|
||||
[du > min_resident_size for du in du_by_timeline.values()]
|
||||
), "ensure the larger tenant will get a haircut"
|
||||
@@ -214,13 +238,17 @@ def test_pageserver_respects_overridden_resident_size(eviction_env: EvictionEnv)
|
||||
ps_http.set_tenant_config(small_tenant[0], {"min_resident_size_override": min_resident_size})
|
||||
ps_http.set_tenant_config(large_tenant[0], {"min_resident_size_override": min_resident_size})
|
||||
|
||||
# Make the large tenant more-recently used. An incorrect implemention would try to evict
|
||||
# from the smaller tenant first, since its layers would be the least-recently-used.
|
||||
env.warm_up_tenant(large_tenant[0])
|
||||
|
||||
# do one run
|
||||
response = ps_http.disk_usage_eviction_run({"evict_bytes": target})
|
||||
log.info(f"{response}")
|
||||
|
||||
time.sleep(1) # give log time to flush
|
||||
assert not env.neon_env.pageserver.log_contains(
|
||||
"falling back to global LRU"
|
||||
GLOBAL_LRU_LOG_LINE,
|
||||
), "this test is pointless if it fell back to global LRU"
|
||||
|
||||
(later_total_on_disk, _, _) = env.timelines_du()
|
||||
@@ -246,8 +274,8 @@ def test_pageserver_respects_overridden_resident_size(eviction_env: EvictionEnv)
|
||||
|
||||
def test_pageserver_falls_back_to_global_lru(eviction_env: EvictionEnv):
|
||||
"""
|
||||
The pageserver should fall back to global LRU if the tenant_min_resident_size-respecting eviction
|
||||
wouldn't evict enough.
|
||||
If we can't relieve pressure using tenant_min_resident_size-respecting eviction,
|
||||
we should continue to evict layers following global LRU.
|
||||
"""
|
||||
env = eviction_env
|
||||
ps_http = env.pageserver_http
|
||||
@@ -264,8 +292,8 @@ def test_pageserver_falls_back_to_global_lru(eviction_env: EvictionEnv):
|
||||
assert actual_change >= target, "eviction must always evict more than target"
|
||||
|
||||
time.sleep(1) # give log time to flush
|
||||
assert env.neon_env.pageserver.log_contains("falling back to global LRU")
|
||||
env.neon_env.pageserver.allowed_errors.append(".*falling back to global LRU")
|
||||
assert env.neon_env.pageserver.log_contains(GLOBAL_LRU_LOG_LINE)
|
||||
env.neon_env.pageserver.allowed_errors.append(".*" + GLOBAL_LRU_LOG_LINE)
|
||||
|
||||
|
||||
def test_partial_evict_tenant(eviction_env: EvictionEnv):
|
||||
@@ -281,8 +309,7 @@ def test_partial_evict_tenant(eviction_env: EvictionEnv):
|
||||
tenant_usage = du_by_timeline[our_tenant]
|
||||
|
||||
# make our tenant more recently used than the other one
|
||||
with env.neon_env.postgres.create_start("main", tenant_id=tenant_id) as pg:
|
||||
env.pg_bin.run(["pgbench", "-S", pg.connstr()])
|
||||
env.warm_up_tenant(tenant_id)
|
||||
|
||||
target = total_on_disk - (tenant_usage // 2)
|
||||
response = ps_http.disk_usage_eviction_run({"evict_bytes": target})
|
||||
|
||||
Reference in New Issue
Block a user