automatic layer eviction

This patch adds a per-timeline periodic task that executes an eviction
policy. The eviction policy is configurable per tenant.

Two policies exist:
- NoEviction (the default)
- LayerAccessThreshold

The LayerAccessThreshold policy examines each layer's last-access timestamp
in the layer map and evicts the layer if that last access lies further in the
past than a configurable threshold.
The policy is evaluated at a configurable period. Each iteration logs a
summary statistic at `info!()` level, or at `warn!()` level if any evictions
failed.

This feature has no explicit killswitch since it's off by default.
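
For illustration, here is a minimal, self-contained sketch of how an
`eviction_policy` value is expected to deserialize, based on the serde
attributes on `EvictionPolicy` added in this patch. The type definitions are
copied from the diff; the concrete JSON values and the standalone `main` are
invented for the example, and it assumes the serde, serde_json and
humantime-serde crates as used by the patch. The same JSON string is what the
control plane's `eviction_policy` setting and the tenant-config API field
accept, since both paths parse it with serde_json.

use serde::Deserialize;
use std::time::Duration;

#[derive(Debug, Deserialize)]
#[serde(tag = "kind")]
enum EvictionPolicy {
    NoEviction,
    LayerAccessThreshold(EvictionPolicyLayerAccessThreshold),
}

#[derive(Debug, Deserialize)]
struct EvictionPolicyLayerAccessThreshold {
    #[serde(with = "humantime_serde")]
    period: Duration,
    #[serde(with = "humantime_serde")]
    threshold: Duration,
}

fn main() -> Result<(), serde_json::Error> {
    // Example values only: evaluate every 10 minutes and evict layers that
    // have not been accessed for 24 hours.
    let raw = r#"{ "kind": "LayerAccessThreshold", "period": "10m", "threshold": "24h" }"#;
    let policy: EvictionPolicy = serde_json::from_str(raw)?;
    println!("parsed: {policy:?}");
    Ok(())
}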
Author: Christian Schwarz
Date:   2023-02-06 19:20:25 +01:00
Committed by: Christian Schwarz
parent 1fdf01e3bc
commit 175a577ad4
15 changed files with 339 additions and 23 deletions

Cargo.lock (generated)
View File

@@ -917,6 +917,7 @@ dependencies = [
"reqwest",
"safekeeper_api",
"serde",
"serde_json",
"serde_with",
"storage_broker",
"tar",
@@ -2421,6 +2422,7 @@ dependencies = [
"crc32c",
"criterion",
"crossbeam-utils",
"either",
"enum-map",
"enumset",
"fail",
@@ -2484,6 +2486,7 @@ dependencies = [
"enum-map",
"postgres_ffi",
"serde",
"serde_json",
"serde_with",
"utils",
"workspace_hack",

View File

@@ -38,6 +38,7 @@ comfy-table = "6.1"
const_format = "0.2"
crc32c = "0.6"
crossbeam-utils = "0.8.5"
either = "1.8"
enum-map = "2.4.2"
enumset = "1.0.12"
fail = "0.5.0"

View File

@@ -15,6 +15,7 @@ postgres.workspace = true
regex.workspace = true
reqwest = { workspace = true, features = ["blocking", "json"] }
serde.workspace = true
serde_json.workspace = true
serde_with.workspace = true
tar.workspace = true
thiserror.workspace = true

View File

@@ -419,6 +419,11 @@ impl PageServerNode {
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'trace_read_requests' as bool")?,
eviction_policy: settings
.get("eviction_policy")
.map(|x| serde_json::from_str(x))
.transpose()
.context("Failed to parse 'eviction_policy' json")?,
})
.send()?
.error_from_body()?;

View File

@@ -14,5 +14,6 @@ byteorder.workspace = true
utils.workspace = true
postgres_ffi.workspace = true
enum-map.workspace = true
serde_json.workspace = true
workspace_hack.workspace = true

View File

@@ -155,6 +155,11 @@ pub struct TenantConfigRequest {
pub lagging_wal_timeout: Option<String>,
pub max_lsn_wal_lag: Option<NonZeroU64>,
pub trace_read_requests: Option<bool>,
// We defer the parsing of the eviction_policy field to the request handler.
// Otherwise we'd have to move the types for eviction policy into this package.
// We might do that once the eviction feature has stabilized.
// For now, this field is not even documented in the openapi_spec.yml.
pub eviction_policy: Option<serde_json::Value>,
}
impl TenantConfigRequest {
@@ -174,6 +179,7 @@ impl TenantConfigRequest {
lagging_wal_timeout: None,
max_lsn_wal_lag: None,
trace_read_requests: None,
eviction_policy: None,
}
}
}
@@ -263,11 +269,11 @@ pub struct LayerResidenceEvent {
///
#[serde(rename = "timestamp_millis_since_epoch")]
#[serde_as(as = "serde_with::TimestampMilliSeconds")]
timestamp: SystemTime,
pub timestamp: SystemTime,
/// The new residence status of the layer.
status: LayerResidenceStatus,
pub status: LayerResidenceStatus,
/// The reason why we had to record this event.
reason: LayerResidenceEventReason,
pub reason: LayerResidenceEventReason,
}
/// The reason for recording a given [`ResidenceEvent`].

View File

@@ -23,6 +23,7 @@ const_format.workspace = true
consumption_metrics.workspace = true
crc32c.workspace = true
crossbeam-utils.workspace = true
either.workspace = true
fail.workspace = true
futures.workspace = true
git-version.workspace = true
@@ -51,7 +52,7 @@ thiserror.workspace = true
tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time"] }
tokio-postgres.workspace = true
tokio-util.workspace = true
toml_edit.workspace = true
toml_edit = { workspace = true, features = [ "serde" ] }
tracing.workspace = true
url.workspace = true
walkdir.workspace = true

View File

@@ -731,6 +731,13 @@ impl PageServerConf {
})?);
}
if let Some(eviction_policy) = item.get("eviction_policy") {
t_conf.eviction_policy = Some(
toml_edit::de::from_item(eviction_policy.clone())
.context("parse eviction_policy")?,
);
}
Ok(t_conf)
}
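
As a companion sketch (not part of the patch), this is roughly how the same
policy would be written in TOML and parsed via `toml_edit::de::from_item`,
mirroring the hunk above and relying on toml_edit's serde feature enabled in
the Cargo change earlier. It reuses the `EvictionPolicy` type from the
previous sketch, treats the top-level document as the tenant's config table
(the real pageserver.toml layout may nest it differently), and the concrete
values are again invented.

use anyhow::Context;

fn parse_eviction_policy_from_toml() -> anyhow::Result<()> {
    let src = r#"
        eviction_policy = { kind = "LayerAccessThreshold", period = "10m", threshold = "24h" }
    "#;
    let doc: toml_edit::Document = src.parse().context("parse toml")?;
    // Same shape as the patch: look up the key, then deserialize the item.
    let policy: Option<EvictionPolicy> = doc
        .get("eviction_policy")
        .map(|item| toml_edit::de::from_item(item.clone()).context("parse eviction_policy"))
        .transpose()?;
    println!("parsed: {policy:?}");
    Ok(())
}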

View File

@@ -797,6 +797,14 @@ async fn update_tenant_config_handler(
);
}
if let Some(eviction_policy) = request_data.eviction_policy {
tenant_conf.eviction_policy = Some(
serde_json::from_value(eviction_policy)
.context("parse field `eviction_policy`")
.map_err(ApiError::BadRequest)?,
);
}
let state = get_state(&request);
mgr::set_new_tenant_config(state.conf, tenant_conf, tenant_id)
.instrument(info_span!("tenant_config", tenant = ?tenant_id))

View File

@@ -231,6 +231,9 @@ pub enum TaskKind {
// Compaction. One per tenant.
Compaction,
// Eviction. One per timeline.
Eviction,
// Initial logical size calculation
InitialLogicalSizeCalculation,

View File

@@ -2757,6 +2757,7 @@ pub mod harness {
lagging_wal_timeout: Some(tenant_conf.lagging_wal_timeout),
max_lsn_wal_lag: Some(tenant_conf.max_lsn_wal_lag),
trace_read_requests: Some(tenant_conf.trace_read_requests),
eviction_policy: Some(tenant_conf.eviction_policy),
}
}
}

View File

@@ -91,6 +91,7 @@ pub struct TenantConf {
/// to avoid eager reconnects.
pub max_lsn_wal_lag: NonZeroU64,
pub trace_read_requests: bool,
pub eviction_policy: EvictionPolicy,
}
/// Same as TenantConf, but this struct preserves the information about
@@ -153,6 +154,34 @@ pub struct TenantConfOpt {
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub trace_read_requests: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub eviction_policy: Option<EvictionPolicy>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind")]
pub enum EvictionPolicy {
NoEviction,
LayerAccessThreshold(EvictionPolicyLayerAccessThreshold),
}
impl EvictionPolicy {
pub fn discriminant_str(&self) -> &'static str {
match self {
EvictionPolicy::NoEviction => "NoEviction",
EvictionPolicy::LayerAccessThreshold(_) => "LayerAccessThreshold",
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct EvictionPolicyLayerAccessThreshold {
#[serde(with = "humantime_serde")]
pub period: Duration,
#[serde(with = "humantime_serde")]
pub threshold: Duration,
}
impl TenantConfOpt {
@@ -189,6 +218,7 @@ impl TenantConfOpt {
trace_read_requests: self
.trace_read_requests
.unwrap_or(global_conf.trace_read_requests),
eviction_policy: self.eviction_policy.unwrap_or(global_conf.eviction_policy),
}
}
@@ -261,6 +291,7 @@ impl Default for TenantConf {
max_lsn_wal_lag: NonZeroU64::new(DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
.expect("cannot parse default max walreceiver Lsn wal lag"),
trace_read_requests: false,
eviction_policy: EvictionPolicy::NoEviction,
}
}
}

View File

@@ -13,6 +13,7 @@ use crate::task_mgr::TaskKind;
use crate::walrecord::NeonWalRecord;
use anyhow::Result;
use bytes::Bytes;
use either::Either;
use enum_map::EnumMap;
use enumset::EnumSet;
use pageserver_api::models::LayerAccessKind;
@@ -92,7 +93,23 @@ pub enum ValueReconstructResult {
}
#[derive(Debug)]
pub struct LayerAccessStats(Mutex<LayerAccessStatsInner>);
pub struct LayerAccessStats(Mutex<LayerAccessStatsLocked>);
/// This struct holds two instances of [`LayerAccessStatsInner`].
/// Accesses are recorded to both instances.
/// The `for_scraping_api` instance can be reset from the management API via [`LayerAccessStatsReset`].
/// The `for_eviction_policy` is never reset.
#[derive(Debug, Default, Clone)]
struct LayerAccessStatsLocked {
for_scraping_api: LayerAccessStatsInner,
for_eviction_policy: LayerAccessStatsInner,
}
impl LayerAccessStatsLocked {
fn iter_mut(&mut self) -> impl Iterator<Item = &mut LayerAccessStatsInner> {
[&mut self.for_scraping_api, &mut self.for_eviction_policy].into_iter()
}
}
#[derive(Debug, Default, Clone)]
struct LayerAccessStatsInner {
@@ -104,10 +121,10 @@ struct LayerAccessStatsInner {
}
#[derive(Debug, Clone, Copy)]
struct LayerAccessStatFullDetails {
when: SystemTime,
task_kind: TaskKind,
access_kind: LayerAccessKind,
pub(super) struct LayerAccessStatFullDetails {
pub(super) when: SystemTime,
pub(super) task_kind: TaskKind,
pub(super) access_kind: LayerAccessKind,
}
#[derive(Clone, Copy, strum_macros::EnumString)]
@@ -142,13 +159,13 @@ impl LayerAccessStatFullDetails {
impl LayerAccessStats {
pub(crate) fn for_loading_layer(status: LayerResidenceStatus) -> Self {
let new = LayerAccessStats(Mutex::new(LayerAccessStatsInner::default()));
let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default()));
new.record_residence_event(status, LayerResidenceEventReason::LayerLoad);
new
}
pub(crate) fn for_new_layer_file() -> Self {
let new = LayerAccessStats(Mutex::new(LayerAccessStatsInner::default()));
let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default()));
new.record_residence_event(
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,
@@ -176,37 +193,43 @@ impl LayerAccessStats {
status: LayerResidenceStatus,
reason: LayerResidenceEventReason,
) {
let mut inner = self.0.lock().unwrap();
inner
.last_residence_changes
.write(LayerResidenceEvent::new(status, reason));
let mut locked = self.0.lock().unwrap();
locked.iter_mut().for_each(|inner| {
inner
.last_residence_changes
.write(LayerResidenceEvent::new(status, reason))
});
}
fn record_access(&self, access_kind: LayerAccessKind, task_kind: TaskKind) {
let mut inner = self.0.lock().unwrap();
let this_access = LayerAccessStatFullDetails {
when: SystemTime::now(),
task_kind,
access_kind,
};
inner.first_access.get_or_insert(this_access);
inner.count_by_access_kind[access_kind] += 1;
inner.task_kind_flag |= task_kind;
inner.last_accesses.write(this_access);
let mut locked = self.0.lock().unwrap();
locked.iter_mut().for_each(|inner| {
inner.first_access.get_or_insert(this_access);
inner.count_by_access_kind[access_kind] += 1;
inner.task_kind_flag |= task_kind;
inner.last_accesses.write(this_access);
})
}
fn as_api_model(
&self,
reset: LayerAccessStatsReset,
) -> pageserver_api::models::LayerAccessStats {
let mut inner = self.0.lock().unwrap();
let mut locked = self.0.lock().unwrap();
let inner = &mut locked.for_scraping_api;
let LayerAccessStatsInner {
first_access,
count_by_access_kind,
task_kind_flag,
last_accesses,
last_residence_changes,
} = &*inner;
} = inner;
let ret = pageserver_api::models::LayerAccessStats {
access_count_by_access_kind: count_by_access_kind
.iter()
@@ -231,6 +254,20 @@ impl LayerAccessStats {
}
ret
}
pub(super) fn most_recent_access_or_residence_event(
&self,
) -> Either<LayerAccessStatFullDetails, LayerResidenceEvent> {
let locked = self.0.lock().unwrap();
let inner = &locked.for_eviction_policy;
match inner.last_accesses.recent() {
Some(a) => Either::Left(*a),
None => match inner.last_residence_changes.recent() {
Some(e) => Either::Right(e.clone()),
None => unreachable!("constructors for LayerAccessStats ensure that there's always a residence change event"),
}
}
}
}
/// Supertrait of the [`Layer`] trait that captures the bare minimum interface
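
To make the two-copy design described in the `LayerAccessStatsLocked` doc
comment concrete, here is a distilled, stand-alone illustration of the
pattern; these are not the pageserver types, and the method names are
invented for the sketch. Accesses are recorded into both copies, a
management-API style reset clears only the scraping copy, and the eviction
side keeps seeing the true last access.

use std::sync::Mutex;
use std::time::SystemTime;

#[derive(Debug, Default)]
struct Inner {
    last_access: Option<SystemTime>,
}

#[derive(Debug, Default)]
struct Locked {
    for_scraping_api: Inner,
    for_eviction_policy: Inner,
}

impl Locked {
    fn iter_mut(&mut self) -> impl Iterator<Item = &mut Inner> {
        [&mut self.for_scraping_api, &mut self.for_eviction_policy].into_iter()
    }
}

struct AccessStats(Mutex<Locked>);

impl AccessStats {
    fn record_access(&self) {
        // Every access lands in both copies.
        let now = SystemTime::now();
        let mut locked = self.0.lock().unwrap();
        locked.iter_mut().for_each(|inner| inner.last_access = Some(now));
    }

    fn reset_for_scraping(&self) {
        // A management-API style reset clears only the scraping copy.
        self.0.lock().unwrap().for_scraping_api = Inner::default();
    }

    fn last_access_for_eviction(&self) -> Option<SystemTime> {
        // The eviction policy reads the copy that is never reset.
        self.0.lock().unwrap().for_eviction_policy.last_access
    }
}

fn main() {
    let stats = AccessStats(Mutex::new(Locked::default()));
    stats.record_access();
    stats.reset_for_scraping();
    // The reset did not hide the access from the eviction policy's view.
    assert!(stats.last_access_for_eviction().is_some());
}

The point of keeping the second copy is that resetting statistics for
observability cannot make a recently used layer look idle to the eviction
policy.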

View File

@@ -1,5 +1,6 @@
//!
mod eviction_task;
mod walreceiver;
use anyhow::{anyhow, bail, ensure, Context};
@@ -47,7 +48,7 @@ use crate::metrics::TimelineMetrics;
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key};
use crate::pgdatadir_mapping::{BlockNumber, CalculateLogicalSizeError};
use crate::tenant::config::TenantConfOpt;
use crate::tenant::config::{EvictionPolicy, TenantConfOpt};
use pageserver_api::reltag::RelTag;
use postgres_connection::PgConnectionConfig;
@@ -801,6 +802,7 @@ impl Timeline {
pub fn activate(self: &Arc<Self>) {
self.set_state(TimelineState::Active);
self.launch_wal_receiver();
self.launch_eviction_task();
}
pub fn set_state(&self, new_state: TimelineState) {
@@ -889,7 +891,10 @@ impl Timeline {
}
}
/// Evict multiple layers at once, continuing through errors.
///
/// Try to evict the given `layers_to_evict` by
///
/// 1. Replacing the given layer object in the layer map with a corresponding [`RemoteLayer`] object.
/// 2. Deleting the now unreferenced layer file from disk.
///
@@ -1057,6 +1062,13 @@ impl Timeline {
.unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
}
fn get_eviction_policy(&self) -> EvictionPolicy {
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.eviction_policy
.unwrap_or(self.conf.default_tenant_conf.eviction_policy)
}
/// Open a Timeline handle.
///
/// Loads the metadata for the timeline into memory, but not the layer map.

View File

@@ -0,0 +1,199 @@
//! The per-timeline layer eviction task.
use std::{
ops::ControlFlow,
sync::Arc,
time::{Duration, SystemTime},
};
use either::Either;
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument, warn};
use crate::{
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
tenant::{
config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
storage_layer::PersistentLayer,
},
};
use super::Timeline;
impl Timeline {
pub(super) fn launch_eviction_task(self: &Arc<Self>) {
let self_clone = Arc::clone(self);
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::Eviction,
Some(self.tenant_id),
Some(self.timeline_id),
&format!("layer eviction for {}/{}", self.tenant_id, self.timeline_id),
false,
async move {
self_clone.eviction_task(task_mgr::shutdown_token()).await;
info!("eviction task finishing");
Ok(())
},
);
}
#[instrument(skip_all, fields(tenant_id = %self.tenant_id, timeline_id = %self.timeline_id))]
async fn eviction_task(self: Arc<Self>, cancel: CancellationToken) {
loop {
let policy = self.get_eviction_policy();
let cf = self.eviction_iteration(&policy, cancel.clone()).await;
match cf {
ControlFlow::Break(()) => break,
ControlFlow::Continue(sleep_until) => {
tokio::select! {
_ = cancel.cancelled() => {
info!("shutting down");
break;
}
_ = tokio::time::sleep_until(sleep_until) => { }
}
}
}
}
}
#[instrument(skip_all, fields(policy_kind = policy.discriminant_str()))]
async fn eviction_iteration(
self: &Arc<Self>,
policy: &EvictionPolicy,
cancel: CancellationToken,
) -> ControlFlow<(), Instant> {
debug!("eviction iteration: {policy:?}");
match policy {
EvictionPolicy::NoEviction => {
// check again in 10 seconds; XXX config watch mechanism
ControlFlow::Continue(Instant::now() + Duration::from_secs(10))
}
EvictionPolicy::LayerAccessThreshold(p) => {
let start = Instant::now();
match self.eviction_iteration_threshold(p, cancel).await {
ControlFlow::Break(()) => return ControlFlow::Break(()),
ControlFlow::Continue(()) => (),
}
let elapsed = start.elapsed();
if elapsed > p.period {
warn!(
configured_period = %humantime::format_duration(p.period),
last_period = %humantime::format_duration(elapsed),
"this eviction period took longer than the configured period"
);
}
ControlFlow::Continue(start + p.period)
}
}
}
async fn eviction_iteration_threshold(
self: &Arc<Self>,
p: &EvictionPolicyLayerAccessThreshold,
cancel: CancellationToken,
) -> ControlFlow<()> {
let now = SystemTime::now();
#[allow(dead_code)]
#[derive(Debug, Default)]
struct EvictionStats {
not_considered_due_to_clock_skew: usize,
candidates: usize,
evicted: usize,
errors: usize,
not_evictable: usize,
skipped_for_shutdown: usize,
}
let mut stats = EvictionStats::default();
// Gather layers for eviction.
// NB: all the checks can be invalidated as soon as we release the layer map lock.
// We don't want to hold the layer map lock during eviction.
// So, we just need to deal with this.
let candidates: Vec<Arc<dyn PersistentLayer>> = {
let layers = self.layers.read().unwrap();
let mut candidates = Vec::new();
for hist_layer in layers.iter_historic_layers() {
if hist_layer.is_remote_layer() {
continue;
}
let last_activity_ts = match hist_layer
.access_stats()
.most_recent_access_or_residence_event()
{
Either::Left(mra) => mra.when,
Either::Right(re) => re.timestamp,
};
let no_activity_for = match now.duration_since(last_activity_ts) {
Ok(d) => d,
Err(_e) => {
// NB: don't log the error. If there are many layers and the system clock
// is skewed, we'd be flooding the log.
stats.not_considered_due_to_clock_skew += 1;
continue;
}
};
if no_activity_for > p.threshold {
candidates.push(hist_layer)
}
}
candidates
};
stats.candidates = candidates.len();
let remote_client = match self.remote_client.as_ref() {
None => {
error!(
num_candidates = candidates.len(),
"no remote storage configured, cannot evict layers"
);
return ControlFlow::Continue(());
}
Some(c) => c,
};
let results = match self
.evict_layer_batch(remote_client, &candidates[..], cancel)
.await
{
Err(pre_err) => {
stats.errors += candidates.len();
error!("could not do any evictions: {pre_err:#}");
return ControlFlow::Continue(());
}
Ok(results) => results,
};
assert_eq!(results.len(), candidates.len());
for (l, result) in candidates.iter().zip(results) {
match result {
None => {
stats.skipped_for_shutdown += 1;
}
Some(Ok(true)) => {
debug!("evicted layer {l:?}");
stats.evicted += 1;
}
Some(Ok(false)) => {
debug!("layer is not evictable: {l:?}");
stats.not_evictable += 1;
}
Some(Err(e)) => {
// This variant is the case where an unexpected error happened during eviction.
// Expected errors that result in non-eviction are `Some(Ok(false))`.
// So, dump Debug here to gather as much info as possible in this rare case.
warn!("failed to evict layer {l:?}: {e:?}");
stats.errors += 1;
}
}
}
if stats.not_considered_due_to_clock_skew > 0 || stats.errors > 0 || stats.not_evictable > 0
{
warn!(stats=?stats, "eviction iteration complete");
} else {
info!(stats=?stats, "eviction iteration complete");
}
ControlFlow::Continue(())
}
}