diff --git a/pageserver/src/disk_usage_eviction_task.rs b/pageserver/src/disk_usage_eviction_task.rs index b15070635d..b903a22463 100644 --- a/pageserver/src/disk_usage_eviction_task.rs +++ b/pageserver/src/disk_usage_eviction_task.rs @@ -25,13 +25,10 @@ //! //! The iteration evicts layers in LRU fashion, but, with a weak reservation per tenant. //! The reservation is to keep the most recently accessed X bytes per tenant resident. -//! All layers that don't make the cut are put on a list and become eviction candidates. -//! We evict until we're below the two thresholds. +//! If we cannot relieve pressure by evicting layers outside of the reservation, we +//! start evicting layers that are part of the reservation, LRU first. //! -//! If the above strategy wouldn't free enough space, we fall back to global LRU right away, -//! not respecting any per-tenant reservations. -//! -//! This value for the per-tenant reservation is referred to as `tenant_min_resident_size` +//! The value for the per-tenant reservation is referred to as `tenant_min_resident_size` //! throughout the code, but, no actual variable carries that name. //! The per-tenant default value is the `max(tenant's layer file sizes, regardless of local or remote)`. //! The idea is to allow at least one layer to be resident per tenant, to ensure it can make forward progress @@ -43,7 +40,11 @@ // - The `#[allow(dead_code)]` above various structs are to suppress warnings about only the Debug impl // reading these fields. We use the Debug impl for semi-structured logging, though. -use std::{collections::HashMap, ops::ControlFlow, sync::Arc, time::Duration}; +use std::{ + collections::HashMap, + sync::Arc, + time::{Duration, SystemTime}, +}; use anyhow::Context; use nix::dir::Dir; @@ -53,12 +54,12 @@ use sync_wrapper::SyncWrapper; use tokio::time::Instant; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, instrument, warn, Instrument}; -use utils::{id::TenantId, serde_percent::Percent}; +use utils::serde_percent::Percent; use crate::{ config::PageServerConf, task_mgr::{self, TaskKind, BACKGROUND_RUNTIME}, - tenant::{self, LocalLayerInfoForDiskUsageEviction, Timeline}, + tenant::{self, storage_layer::PersistentLayer, Timeline}, }; #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -281,7 +282,6 @@ struct LayerCount { count: usize, } -#[allow(clippy::needless_late_init)] pub async fn disk_usage_eviction_task_iteration_impl( state: &State, storage: &GenericRemoteStorage, @@ -294,12 +294,6 @@ pub async fn disk_usage_eviction_task_iteration_impl( .try_lock() .map_err(|_| anyhow::anyhow!("iteration is already executing"))?; - // planned post-eviction usage - let mut usage_planned_min_resident_size_respecting = usage_pre; - let mut usage_planned_global_lru = None; - // achieved post-eviction usage according to internal accounting - let mut usage_assumed = usage_pre; - debug!(?usage_pre, "disk usage"); if !usage_pre.has_pressure() { @@ -311,42 +305,46 @@ pub async fn disk_usage_eviction_task_iteration_impl( "running disk usage based eviction due to pressure" ); - let mut lru_candidates: Vec<(_, LocalLayerInfoForDiskUsageEviction)> = Vec::new(); - - // get a snapshot of the list of tenants - let tenants = tenant::mgr::list_tenants() - .await - .context("get list of tenants")?; - - { - let mut tmp = Vec::new(); - for (tenant_id, _state) in &tenants { - let flow = extend_lru_candidates( - Mode::RespectTenantMinResidentSize, - *tenant_id, - &mut lru_candidates, - &mut tmp, - cancel, - ) - .await; - - if let ControlFlow::Break(()) = flow { - return Ok(IterationOutcome::Cancelled); - } - - assert!(tmp.is_empty(), "tmp has to be fully drained each iteration"); + let candidates = match collect_eviction_candidates(cancel).await? { + EvictionCandidates::Cancelled => { + return Ok(IterationOutcome::Cancelled); } - } + EvictionCandidates::Finished(partitioned) => partitioned, + }; - if cancel.is_cancelled() { - return Ok(IterationOutcome::Cancelled); + // Debug-log the list of candidates + let now = SystemTime::now(); + for (i, (partition, candidate)) in candidates.iter().enumerate() { + debug!( + "cand {}/{}: size={}, no_access_for={}us, parition={:?}, tenant={} timeline={} layer={}", + i + 1, + candidates.len(), + candidate.layer.file_size(), + now.duration_since(candidate.last_activity_ts) + .unwrap() + .as_micros(), + partition, + candidate.layer.get_tenant_id(), + candidate.layer.get_timeline_id(), + candidate.layer.filename().file_name(), + ); } // phase1: select victims to relieve pressure - lru_candidates.sort_unstable_by_key(|(_, layer)| layer.last_activity_ts); - let mut batched: HashMap<_, Vec> = HashMap::new(); - for (i, (timeline, layer)) in lru_candidates.into_iter().enumerate() { - if !usage_planned_min_resident_size_respecting.has_pressure() { + // + // Walk through the list of candidates, until we have accumulated enough layers to get + // us back under the pressure threshold. 'usage_planned' is updated so that it tracks + // how much disk space would be used after evicting all the layers up to the current + // point in the list. The layers are collected in 'batched', grouped per timeline. + // + // If we get far enough in the list that we start to evict layers that are below + // the tenant's min-resident-size threshold, print a warning, and memorize the disk + // usage at that point, in 'usage_planned_min_resident_size_respecting'. + let mut batched: HashMap<_, Vec>> = HashMap::new(); + let mut warned = None; + let mut usage_planned = usage_pre; + for (i, (partition, candidate)) in candidates.into_iter().enumerate() { + if !usage_planned.has_pressure() { debug!( no_candidates_evicted = i, "took enough candidates for pressure to be relieved" @@ -354,66 +352,40 @@ pub async fn disk_usage_eviction_task_iteration_impl( break; } - usage_planned_min_resident_size_respecting.add_available_bytes(layer.file_size()); + if partition == MinResidentSizePartition::Below && warned.is_none() { + warn!(?usage_pre, ?usage_planned, candidate_no=i, "tenant_min_resident_size-respecting LRU would not relieve pressure, evicting more following global LRU policy"); + warned = Some(usage_planned); + } + + usage_planned.add_available_bytes(candidate.layer.file_size()); batched - .entry(TimelineKey(timeline.clone())) + .entry(TimelineKey(candidate.timeline)) .or_default() - .push(layer); + .push(candidate.layer); } - // If we can't relieve pressure while respecting tenant_min_resident_size, fall back to global LRU. - if usage_planned_min_resident_size_respecting.has_pressure() { - // NB: tests depend on parts of this log message - warn!(?usage_pre, ?usage_planned_min_resident_size_respecting, "tenant_min_resident_size-respecting LRU would not relieve pressure, falling back to global LRU"); - batched.clear(); - let mut usage_planned = usage_pre; - let mut global_lru_candidates = Vec::new(); - let mut tmp = Vec::new(); - for (tenant_id, _state) in &tenants { - let flow = extend_lru_candidates( - Mode::GlobalLru, - *tenant_id, - &mut global_lru_candidates, - &mut tmp, - cancel, - ) - .await; - if let ControlFlow::Break(()) = flow { - return Ok(IterationOutcome::Cancelled); - } - - assert!(tmp.is_empty(), "tmp has to be fully drained each iteration"); - } - global_lru_candidates.sort_unstable_by_key(|(_, layer)| layer.last_activity_ts); - for (timeline, layer) in global_lru_candidates { - usage_planned.add_available_bytes(layer.file_size()); - batched - .entry(TimelineKey(timeline.clone())) - .or_default() - .push(layer); - if cancel.is_cancelled() { - return Ok(IterationOutcome::Cancelled); - } - } - usage_planned_global_lru = Some(usage_planned); - } - let usage_planned = PlannedUsage { - respecting_tenant_min_resident_size: usage_planned_min_resident_size_respecting, - fallback_to_global_lru: usage_planned_global_lru, + let usage_planned = match warned { + Some(respecting_tenant_min_resident_size) => PlannedUsage { + respecting_tenant_min_resident_size, + fallback_to_global_lru: Some(usage_planned), + }, + None => PlannedUsage { + respecting_tenant_min_resident_size: usage_planned, + fallback_to_global_lru: None, + }, }; - debug!(?usage_planned, "usage planned"); // phase2: evict victims batched by timeline - let mut batch = Vec::new(); + + // After the loop, `usage_assumed` is the post-eviction usage, + // according to internal accounting. + let mut usage_assumed = usage_pre; let mut evictions_failed = LayerCount::default(); - for (timeline, layers) in batched { + for (timeline, batch) in batched { let tenant_id = timeline.tenant_id; let timeline_id = timeline.timeline_id; - - batch.clear(); - batch.extend(layers.iter().map(|x| &x.layer).cloned()); let batch_size = batch.len(); debug!(%timeline_id, "evicting batch for timeline"); @@ -426,8 +398,8 @@ pub async fn disk_usage_eviction_task_iteration_impl( warn!("failed to evict batch: {:#}", e); } Ok(results) => { - assert_eq!(results.len(), layers.len()); - for (result, layer) in results.into_iter().zip(layers.iter()) { + assert_eq!(results.len(), batch.len()); + for (result, layer) in results.into_iter().zip(batch.iter()) { match result { Some(Ok(true)) => { usage_assumed.add_available_bytes(layer.file_size()); @@ -470,106 +442,161 @@ pub async fn disk_usage_eviction_task_iteration_impl( })) } -/// Different modes of gathering tenant's least recently used layers. -#[derive(Debug)] -enum Mode { - /// Add all but the most recently used `min_resident_size` worth of layers to the candidates - /// list. - /// - /// `min_resident_size` defaults to maximum layer file size of the tenant. This ensures that - /// the tenant will always have one layer resident. If we cannot compute `min_resident_size` - /// accurately because metadata is missing we use hardcoded constant. `min_resident_size` can - /// be overridden per tenant for important tenants. - RespectTenantMinResidentSize, - /// Consider all layer files from all tenants in LRU order. - /// - /// This is done if the `min_resident_size` respecting does not relieve pressure. - GlobalLru, +#[derive(Clone)] +struct EvictionCandidate { + timeline: Arc, + layer: Arc, + last_activity_ts: SystemTime, } -/// Figure out eviction candidates for the given tenant and append them to `lru_candidates`. +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +enum MinResidentSizePartition { + Above, + Below, +} + +enum EvictionCandidates { + Cancelled, + Finished(Vec<(MinResidentSizePartition, EvictionCandidate)>), +} + +/// Gather the eviction candidates. /// -/// The `mode` argument controls which layers get appended to `lru_candidates`. -/// Read its type's doc comments for more details. +/// The returned `Ok(EvictionCandidates::Finished(candidates))` is sorted in eviction +/// order. A caller that evicts in that order, until pressure is relieved, implements +/// the eviction policy outlined in the module comment. /// -/// The caller is responsible for sorting `lru_candidates` once it has called this function -/// for all tenants. +/// # Example /// -/// The `scratch` vector is temporary storage and taken as an argument to avoid allocations. -/// It must be empty when calling this function. It is guaranteed to be empty when we -/// return `ControlFlow::Continue`. -#[instrument(skip_all, fields(?mode, %tenant_id))] -async fn extend_lru_candidates( - mode: Mode, - tenant_id: TenantId, - lru_candidates: &mut Vec<(Arc, LocalLayerInfoForDiskUsageEviction)>, - scratch: &mut Vec<(Arc, LocalLayerInfoForDiskUsageEviction)>, +/// Imagine that there are two tenants, A and B, with five layers each, a-e. +/// Each layer has size 100, and both tenant's min_resident_size is 150. +/// The eviction order would be +/// +/// ```text +/// partition last_activity_ts tenant/layer +/// Above 18:30 A/c +/// Above 19:00 A/b +/// Above 18:29 B/c +/// Above 19:05 B/b +/// Above 20:00 B/a +/// Above 20:03 A/a +/// Below 20:30 A/d +/// Below 20:40 B/d +/// Below 20:45 B/e +/// Below 20:58 A/e +/// ``` +/// +/// Now, if we need to evict 300 bytes to relieve pressure, we'd evict `A/c, A/b, B/c`. +/// They are all in the `Above` partition, so, we respected each tenant's min_resident_size. +/// +/// But, if we need to evict 900 bytes to relieve pressure, we'd evict +/// `A/c, A/b, B/c, B/b, B/a, A/a, A/d, B/d, B/e`, reaching into the `Below` partition +/// after exhauting the `Above` partition. +/// So, we did not respect each tenant's min_resident_size. +async fn collect_eviction_candidates( cancel: &CancellationToken, -) -> ControlFlow<()> { - debug!("begin"); +) -> anyhow::Result { + // get a snapshot of the list of tenants + let tenants = tenant::mgr::list_tenants() + .await + .context("get list of tenants")?; - let tenant = match tenant::mgr::get_tenant(tenant_id, true).await { - Ok(tenant) => tenant, - Err(e) => { - // this can happen if tenant has lifecycle transition after we fetched it - debug!("failed to get tenant: {e:#}"); - return ControlFlow::Continue(()); - } - }; - - if cancel.is_cancelled() { - return ControlFlow::Break(()); - } - - // If one of the timelines becomes `!is_active()` during the iteration, - // for example because we're shutting down, then `max_layer_size` can be too small. - // That's OK. This code only runs under a disk pressure situation, and being - // a little unfair to tenants during shutdown in such a situation is tolerable. - let mut max_layer_size = 0; - for tl in tenant.list_timelines() { - if !tl.is_active() { - continue; - } - let info = tl.get_local_layers_for_disk_usage_eviction(); - debug!(timeline_id=%tl.timeline_id, "timeline resident layers count: {}", info.resident_layers.len()); - scratch.extend( - info.resident_layers - .into_iter() - .map(|layer_infos| (tl.clone(), layer_infos)), - ); - max_layer_size = max_layer_size.max(info.max_layer_size.unwrap_or(0)); + let mut candidates = Vec::new(); + for (tenant_id, _state) in &tenants { if cancel.is_cancelled() { - return ControlFlow::Break(()); + return Ok(EvictionCandidates::Cancelled); + } + let tenant = match tenant::mgr::get_tenant(*tenant_id, true).await { + Ok(tenant) => tenant, + Err(e) => { + // this can happen if tenant has lifecycle transition after we fetched it + debug!("failed to get tenant: {e:#}"); + continue; + } + }; + + // collect layers from all timelines in this tenant + // + // If one of the timelines becomes `!is_active()` during the iteration, + // for example because we're shutting down, then `max_layer_size` can be too small. + // That's OK. This code only runs under a disk pressure situation, and being + // a little unfair to tenants during shutdown in such a situation is tolerable. + let mut tenant_candidates = Vec::new(); + let mut max_layer_size = 0; + for tl in tenant.list_timelines() { + if !tl.is_active() { + continue; + } + let info = tl.get_local_layers_for_disk_usage_eviction(); + debug!(tenant_id=%tl.tenant_id, timeline_id=%tl.timeline_id, "timeline resident layers count: {}", info.resident_layers.len()); + tenant_candidates.extend( + info.resident_layers + .into_iter() + .map(|layer_infos| (tl.clone(), layer_infos)), + ); + max_layer_size = max_layer_size.max(info.max_layer_size.unwrap_or(0)); + + if cancel.is_cancelled() { + return Ok(EvictionCandidates::Cancelled); + } + } + + // `min_resident_size` defaults to maximum layer file size of the tenant. + // This ensures that each tenant can have at least one layer resident at a given time, + // ensuring forward progress for a single Timeline::get in that tenant. + // It's a questionable heuristic since, usually, there are many Timeline::get + // requests going on for a tenant, and, at least in Neon prod, the median + // layer file size is much smaller than the compaction target size. + // We could be better here, e.g., sum of all L0 layers + most recent L1 layer. + // That's what's typically used by the various background loops. + // + // The default can be overriden with a fixed value in the tenant conf. + // A default override can be put in the default tenant conf in the pageserver.toml. + let min_resident_size = if let Some(s) = tenant.get_min_resident_size_override() { + debug!( + tenant_id=%tenant.tenant_id(), + overriden_size=s, + "using overridden min resident size for tenant" + ); + s + } else { + debug!( + tenant_id=%tenant.tenant_id(), + max_layer_size, + "using max layer size as min_resident_size for tenant", + ); + max_layer_size + }; + + // Sort layers most-recently-used first, then partition by + // cumsum above/below min_resident_size. + tenant_candidates + .sort_unstable_by_key(|(_, layer_info)| std::cmp::Reverse(layer_info.last_activity_ts)); + let mut cumsum: i128 = 0; + for (timeline, layer_info) in tenant_candidates.into_iter() { + let file_size = layer_info.file_size(); + let candidate = EvictionCandidate { + timeline, + last_activity_ts: layer_info.last_activity_ts, + layer: layer_info.layer, + }; + let partition = if cumsum > min_resident_size as i128 { + MinResidentSizePartition::Above + } else { + MinResidentSizePartition::Below + }; + candidates.push((partition, candidate)); + cumsum += i128::from(file_size); } } - let min_resident_size = match mode { - Mode::GlobalLru => { - lru_candidates.append(scratch); - return ControlFlow::Continue(()); - } - Mode::RespectTenantMinResidentSize => tenant - .get_min_resident_size_override() - .unwrap_or(max_layer_size), - }; + debug_assert!(MinResidentSizePartition::Above < MinResidentSizePartition::Below, + "as explained in the function's doc comment, layers that aren't in the tenant's min_resident_size are evicted first"); + candidates + .sort_unstable_by_key(|(partition, candidate)| (*partition, candidate.last_activity_ts)); - scratch.sort_unstable_by_key(|(_, layer_info)| layer_info.last_activity_ts); - - let mut current: u64 = scratch.iter().map(|(_, layer)| layer.file_size()).sum(); - for (tl, layer) in scratch.drain(..) { - if cancel.is_cancelled() { - return ControlFlow::Break(()); - } - if current <= min_resident_size { - break; - } - current -= layer.file_size(); - debug!(?layer, "adding layer to lru_candidates"); - lru_candidates.push((tl, layer)); - } - - ControlFlow::Continue(()) + Ok(EvictionCandidates::Finished(candidates)) } struct TimelineKey(Arc); diff --git a/test_runner/regress/test_disk_usage_eviction.py b/test_runner/regress/test_disk_usage_eviction.py index be6f6ff048..2c836f04c7 100644 --- a/test_runner/regress/test_disk_usage_eviction.py +++ b/test_runner/regress/test_disk_usage_eviction.py @@ -13,8 +13,11 @@ from fixtures.neon_fixtures import ( PgBin, RemoteStorageKind, wait_for_last_flush_lsn, + wait_for_upload_queue_empty, ) -from fixtures.types import TenantId, TimelineId +from fixtures.types import Lsn, TenantId, TimelineId + +GLOBAL_LRU_LOG_LINE = "tenant_min_resident_size-respecting LRU would not relieve pressure, evicting more following global LRU policy" @pytest.mark.parametrize("config_level_override", [None, 400]) @@ -68,6 +71,7 @@ class EvictionEnv: pg_bin: PgBin pageserver_http: PageserverHttpClient layer_size: int + pgbench_init_lsns: Dict[TenantId, Lsn] def timelines_du(self) -> Tuple[int, int, int]: return poor_mans_du(self.neon_env, [(tid, tlid) for tid, tlid, _ in self.timelines]) @@ -78,6 +82,15 @@ class EvictionEnv: for tid, tlid, _ in self.timelines } + def warm_up_tenant(self, tenant_id: TenantId): + """ + Start a read-only compute at the LSN after pgbench -i, and run pgbench -S against it. + This assumes that the tenant is still at the state after pbench -i. + """ + lsn = self.pgbench_init_lsns[tenant_id] + with self.neon_env.postgres.create_start("main", tenant_id=tenant_id, lsn=lsn) as pg: + self.pg_bin.run(["pgbench", "-S", pg.connstr()]) + @pytest.fixture def eviction_env(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> Iterator[EvictionEnv]: @@ -118,6 +131,8 @@ def eviction_env(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> It pgbench_scales = [4, 6] layer_size = 5 * 1024**2 + pgbench_init_lsns = {} + for scale in pgbench_scales: tenant_id, timeline_id = env.neon_cli.create_tenant( conf={ @@ -134,6 +149,12 @@ def eviction_env(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> It wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id) pageserver_http.timeline_checkpoint(tenant_id, timeline_id) + wait_for_upload_queue_empty(env.pageserver, tenant_id, timeline_id) + tl_info = pageserver_http.timeline_detail(tenant_id, timeline_id) + assert tl_info["last_record_lsn"] == tl_info["disk_consistent_lsn"] + assert tl_info["disk_consistent_lsn"] == tl_info["remote_consistent_lsn"] + pgbench_init_lsns[tenant_id] = Lsn(tl_info["last_record_lsn"]) + layers = pageserver_http.layer_map_info(tenant_id, timeline_id) log.info(f"{layers}") assert len(layers.historic_layers) >= 4 @@ -146,6 +167,7 @@ def eviction_env(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> It pageserver_http=pageserver_http, layer_size=layer_size, pg_bin=pg_bin, + pgbench_init_lsns=pgbench_init_lsns, ) yield eviction_env @@ -204,9 +226,11 @@ def test_pageserver_respects_overridden_resident_size(eviction_env: EvictionEnv) du_by_timeline[large_tenant] - du_by_timeline[small_tenant] > 5 * env.layer_size ), "ensure this test will do more than 1 eviction" - # give the larger tenant a haircut while prevening the smaller tenant from getting one + # Give the larger tenant a haircut while preventing the smaller tenant from getting one. + # To prevent the smaller from getting a haircut, we set min_resident_size to its current size. + # To ensure the larger tenant is getting a haircut, any non-zero `target` will do. min_resident_size = du_by_timeline[small_tenant] - target = du_by_timeline[large_tenant] - du_by_timeline[small_tenant] + target = 1 assert any( [du > min_resident_size for du in du_by_timeline.values()] ), "ensure the larger tenant will get a haircut" @@ -214,13 +238,17 @@ def test_pageserver_respects_overridden_resident_size(eviction_env: EvictionEnv) ps_http.set_tenant_config(small_tenant[0], {"min_resident_size_override": min_resident_size}) ps_http.set_tenant_config(large_tenant[0], {"min_resident_size_override": min_resident_size}) + # Make the large tenant more-recently used. An incorrect implemention would try to evict + # from the smaller tenant first, since its layers would be the least-recently-used. + env.warm_up_tenant(large_tenant[0]) + # do one run response = ps_http.disk_usage_eviction_run({"evict_bytes": target}) log.info(f"{response}") time.sleep(1) # give log time to flush assert not env.neon_env.pageserver.log_contains( - "falling back to global LRU" + GLOBAL_LRU_LOG_LINE, ), "this test is pointless if it fell back to global LRU" (later_total_on_disk, _, _) = env.timelines_du() @@ -246,8 +274,8 @@ def test_pageserver_respects_overridden_resident_size(eviction_env: EvictionEnv) def test_pageserver_falls_back_to_global_lru(eviction_env: EvictionEnv): """ - The pageserver should fall back to global LRU if the tenant_min_resident_size-respecting eviction - wouldn't evict enough. + If we can't relieve pressure using tenant_min_resident_size-respecting eviction, + we should continue to evict layers following global LRU. """ env = eviction_env ps_http = env.pageserver_http @@ -264,8 +292,8 @@ def test_pageserver_falls_back_to_global_lru(eviction_env: EvictionEnv): assert actual_change >= target, "eviction must always evict more than target" time.sleep(1) # give log time to flush - assert env.neon_env.pageserver.log_contains("falling back to global LRU") - env.neon_env.pageserver.allowed_errors.append(".*falling back to global LRU") + assert env.neon_env.pageserver.log_contains(GLOBAL_LRU_LOG_LINE) + env.neon_env.pageserver.allowed_errors.append(".*" + GLOBAL_LRU_LOG_LINE) def test_partial_evict_tenant(eviction_env: EvictionEnv): @@ -281,8 +309,7 @@ def test_partial_evict_tenant(eviction_env: EvictionEnv): tenant_usage = du_by_timeline[our_tenant] # make our tenant more recently used than the other one - with env.neon_env.postgres.create_start("main", tenant_id=tenant_id) as pg: - env.pg_bin.run(["pgbench", "-S", pg.connstr()]) + env.warm_up_tenant(tenant_id) target = total_on_disk - (tenant_usage // 2) response = ps_http.disk_usage_eviction_run({"evict_bytes": target})