Files
neon/libs/pageserver_api/src/models.rs
Christian Schwarz 8895f28dae make evictions_low_residence_duration_metric_threshold per-tenant (#3949)
Before this patch, if a tenant would override its eviction_policy
setting to use a lower LayerAccessThreshold::threshold than the
`evictions_low_residence_duration_metric_threshold`, the evictions done
for that tenant would count towards the
`evictions_with_low_residence_duration` metric.

That metric is used to identify pre-mature evictions, commonly triggered
by disk-usage-based eviction under disk pressure.

We don't want that to happen for the legitimate evictions of the tenant
that overrides its eviction_policy.

So, this patch
- moves the setting into TenantConf
- adds test coverage
- updates the staging & prod yamls

Forward Compatibility:
Software before this patch will ignore the new tenant conf field and use
the global one instead.
So we can roll back safely.

Backward Compatibility:
Parsing old configs with software as of this patch will fail in
`PageServerConf::parse_and_validate` with error 
`unrecognized pageserver option 'evictions_low_residence_duration_metric_threshold'`
if the option is still present in the global section.
We deal with this by updating the configs in Ansible.

fixes https://github.com/neondatabase/neon/issues/3940
2023-04-14 13:25:45 +03:00

740 lines
23 KiB
Rust

use std::{
collections::HashMap,
num::{NonZeroU64, NonZeroUsize},
time::SystemTime,
};
use byteorder::{BigEndian, ReadBytesExt};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use strum_macros;
use utils::{
history_buffer::HistoryBufferWithDropCounter,
id::{NodeId, TenantId, TimelineId},
lsn::Lsn,
};
use crate::reltag::RelTag;
use anyhow::bail;
use bytes::{BufMut, Bytes, BytesMut};
/// A state of a tenant in pageserver's memory.
#[derive(
Clone,
PartialEq,
Eq,
serde::Serialize,
serde::Deserialize,
strum_macros::Display,
strum_macros::EnumString,
strum_macros::EnumVariantNames,
strum_macros::AsRefStr,
strum_macros::IntoStaticStr,
)]
#[serde(tag = "slug", content = "data")]
pub enum TenantState {
/// This tenant is being loaded from local disk
Loading,
/// This tenant is being downloaded from cloud storage.
Attaching,
/// Tenant is fully operational
Active,
/// A tenant is recognized by pageserver, but it is being detached or the
/// system is being shut down.
Stopping,
/// A tenant is recognized by the pageserver, but can no longer be used for
/// any operations, because it failed to be activated.
Broken { reason: String, backtrace: String },
}
impl TenantState {
pub fn has_in_progress_downloads(&self) -> bool {
match self {
Self::Loading => true,
Self::Attaching => true,
Self::Active => false,
Self::Stopping => false,
Self::Broken { .. } => false,
}
}
pub fn broken_from_reason(reason: String) -> Self {
let backtrace_str: String = format!("{}", std::backtrace::Backtrace::force_capture());
Self::Broken {
reason,
backtrace: backtrace_str,
}
}
}
impl std::fmt::Debug for TenantState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Broken { reason, backtrace } if !reason.is_empty() => {
write!(f, "Broken due to: {reason}. Backtrace:\n{backtrace}")
}
_ => write!(f, "{self}"),
}
}
}
/// A state of a timeline in pageserver's memory.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum TimelineState {
/// The timeline is recognized by the pageserver but is not yet operational.
/// In particular, the walreceiver connection loop is not running for this timeline.
/// It will eventually transition to state Active or Broken.
Loading,
/// The timeline is fully operational.
/// It can be queried, and the walreceiver connection loop is running.
Active,
/// The timeline was previously Loading or Active but is shutting down.
/// It cannot transition back into any other state.
Stopping,
/// The timeline is broken and not operational (previous states: Loading or Active).
Broken,
}
#[serde_as]
#[derive(Serialize, Deserialize)]
pub struct TimelineCreateRequest {
#[serde(default)]
#[serde_as(as = "Option<DisplayFromStr>")]
pub new_timeline_id: Option<TimelineId>,
#[serde(default)]
#[serde_as(as = "Option<DisplayFromStr>")]
pub ancestor_timeline_id: Option<TimelineId>,
#[serde(default)]
#[serde_as(as = "Option<DisplayFromStr>")]
pub ancestor_start_lsn: Option<Lsn>,
pub pg_version: Option<u32>,
}
#[serde_as]
#[derive(Serialize, Deserialize, Default)]
pub struct TenantCreateRequest {
#[serde(default)]
#[serde_as(as = "Option<DisplayFromStr>")]
pub new_tenant_id: Option<TenantId>,
pub checkpoint_distance: Option<u64>,
pub checkpoint_timeout: Option<String>,
pub compaction_target_size: Option<u64>,
pub compaction_period: Option<String>,
pub compaction_threshold: Option<usize>,
pub gc_horizon: Option<u64>,
pub gc_period: Option<String>,
pub image_creation_threshold: Option<usize>,
pub pitr_interval: Option<String>,
pub walreceiver_connect_timeout: Option<String>,
pub lagging_wal_timeout: Option<String>,
pub max_lsn_wal_lag: Option<NonZeroU64>,
pub trace_read_requests: Option<bool>,
// We defer the parsing of the eviction_policy field to the request handler.
// Otherwise we'd have to move the types for eviction policy into this package.
// We might do that once the eviction feature has stabilizied.
// For now, this field is not even documented in the openapi_spec.yml.
pub eviction_policy: Option<serde_json::Value>,
pub min_resident_size_override: Option<u64>,
pub evictions_low_residence_duration_metric_threshold: Option<String>,
}
#[serde_as]
#[derive(Serialize, Deserialize)]
#[serde(transparent)]
pub struct TenantCreateResponse(#[serde_as(as = "DisplayFromStr")] pub TenantId);
#[derive(Serialize)]
pub struct StatusResponse {
pub id: NodeId,
}
impl TenantCreateRequest {
pub fn new(new_tenant_id: Option<TenantId>) -> TenantCreateRequest {
TenantCreateRequest {
new_tenant_id,
..Default::default()
}
}
}
#[serde_as]
#[derive(Serialize, Deserialize)]
pub struct TenantConfigRequest {
#[serde_as(as = "DisplayFromStr")]
pub tenant_id: TenantId,
#[serde(default)]
pub checkpoint_distance: Option<u64>,
pub checkpoint_timeout: Option<String>,
pub compaction_target_size: Option<u64>,
pub compaction_period: Option<String>,
pub compaction_threshold: Option<usize>,
pub gc_horizon: Option<u64>,
pub gc_period: Option<String>,
pub image_creation_threshold: Option<usize>,
pub pitr_interval: Option<String>,
pub walreceiver_connect_timeout: Option<String>,
pub lagging_wal_timeout: Option<String>,
pub max_lsn_wal_lag: Option<NonZeroU64>,
pub trace_read_requests: Option<bool>,
// We defer the parsing of the eviction_policy field to the request handler.
// Otherwise we'd have to move the types for eviction policy into this package.
// We might do that once the eviction feature has stabilizied.
// For now, this field is not even documented in the openapi_spec.yml.
pub eviction_policy: Option<serde_json::Value>,
pub min_resident_size_override: Option<u64>,
pub evictions_low_residence_duration_metric_threshold: Option<String>,
}
impl TenantConfigRequest {
pub fn new(tenant_id: TenantId) -> TenantConfigRequest {
TenantConfigRequest {
tenant_id,
checkpoint_distance: None,
checkpoint_timeout: None,
compaction_target_size: None,
compaction_period: None,
compaction_threshold: None,
gc_horizon: None,
gc_period: None,
image_creation_threshold: None,
pitr_interval: None,
walreceiver_connect_timeout: None,
lagging_wal_timeout: None,
max_lsn_wal_lag: None,
trace_read_requests: None,
eviction_policy: None,
min_resident_size_override: None,
evictions_low_residence_duration_metric_threshold: None,
}
}
}
#[serde_as]
#[derive(Serialize, Deserialize, Clone)]
pub struct TenantInfo {
#[serde_as(as = "DisplayFromStr")]
pub id: TenantId,
pub state: TenantState,
/// Sum of the size of all layer files.
/// If a layer is present in both local FS and S3, it counts only once.
pub current_physical_size: Option<u64>, // physical size is only included in `tenant_status` endpoint
pub has_in_progress_downloads: Option<bool>,
}
/// This represents the output of the "timeline_detail" and "timeline_list" API calls.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct TimelineInfo {
#[serde_as(as = "DisplayFromStr")]
pub tenant_id: TenantId,
#[serde_as(as = "DisplayFromStr")]
pub timeline_id: TimelineId,
#[serde_as(as = "Option<DisplayFromStr>")]
pub ancestor_timeline_id: Option<TimelineId>,
#[serde_as(as = "Option<DisplayFromStr>")]
pub ancestor_lsn: Option<Lsn>,
#[serde_as(as = "DisplayFromStr")]
pub last_record_lsn: Lsn,
#[serde_as(as = "Option<DisplayFromStr>")]
pub prev_record_lsn: Option<Lsn>,
#[serde_as(as = "DisplayFromStr")]
pub latest_gc_cutoff_lsn: Lsn,
#[serde_as(as = "DisplayFromStr")]
pub disk_consistent_lsn: Lsn,
#[serde_as(as = "DisplayFromStr")]
pub remote_consistent_lsn: Lsn,
pub current_logical_size: Option<u64>, // is None when timeline is Unloaded
/// Sum of the size of all layer files.
/// If a layer is present in both local FS and S3, it counts only once.
pub current_physical_size: Option<u64>, // is None when timeline is Unloaded
pub current_logical_size_non_incremental: Option<u64>,
pub timeline_dir_layer_file_size_sum: Option<u64>,
pub wal_source_connstr: Option<String>,
#[serde_as(as = "Option<DisplayFromStr>")]
pub last_received_msg_lsn: Option<Lsn>,
/// the timestamp (in microseconds) of the last received message
pub last_received_msg_ts: Option<u128>,
pub pg_version: u32,
pub state: TimelineState,
}
#[derive(Debug, Clone, Serialize)]
pub struct LayerMapInfo {
pub in_memory_layers: Vec<InMemoryLayerInfo>,
pub historic_layers: Vec<HistoricLayerInfo>,
}
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy, Serialize, Deserialize, enum_map::Enum)]
#[repr(usize)]
pub enum LayerAccessKind {
GetValueReconstructData,
Iter,
KeyIter,
Dump,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LayerAccessStatFullDetails {
pub when_millis_since_epoch: u64,
pub task_kind: &'static str,
pub access_kind: LayerAccessKind,
}
/// An event that impacts the layer's residence status.
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LayerResidenceEvent {
/// The time when the event occurred.
/// NB: this timestamp is captured while the residence status changes.
/// So, it might be behind/ahead of the actual residence change by a short amount of time.
///
#[serde(rename = "timestamp_millis_since_epoch")]
#[serde_as(as = "serde_with::TimestampMilliSeconds")]
pub timestamp: SystemTime,
/// The new residence status of the layer.
pub status: LayerResidenceStatus,
/// The reason why we had to record this event.
pub reason: LayerResidenceEventReason,
}
/// The reason for recording a given [`ResidenceEvent`].
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum LayerResidenceEventReason {
/// The layer map is being populated, e.g. during timeline load or attach.
/// This includes [`RemoteLayer`] objects created in [`reconcile_with_remote`].
/// We need to record such events because there is no persistent storage for the events.
LayerLoad,
/// We just created the layer (e.g., freeze_and_flush or compaction).
/// Such layers are always [`LayerResidenceStatus::Resident`].
LayerCreate,
/// We on-demand downloaded or evicted the given layer.
ResidenceChange,
}
/// The residence status of the layer, after the given [`LayerResidenceEvent`].
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum LayerResidenceStatus {
/// Residence status for a layer file that exists locally.
/// It may also exist on the remote, we don't care here.
Resident,
/// Residence status for a layer file that only exists on the remote.
Evicted,
}
impl LayerResidenceEvent {
pub fn new(status: LayerResidenceStatus, reason: LayerResidenceEventReason) -> Self {
Self {
status,
reason,
timestamp: SystemTime::now(),
}
}
}
#[derive(Debug, Clone, Serialize)]
pub struct LayerAccessStats {
pub access_count_by_access_kind: HashMap<LayerAccessKind, u64>,
pub task_kind_access_flag: Vec<&'static str>,
pub first: Option<LayerAccessStatFullDetails>,
pub accesses_history: HistoryBufferWithDropCounter<LayerAccessStatFullDetails, 16>,
pub residence_events_history: HistoryBufferWithDropCounter<LayerResidenceEvent, 16>,
}
#[serde_as]
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "kind")]
pub enum InMemoryLayerInfo {
Open {
#[serde_as(as = "DisplayFromStr")]
lsn_start: Lsn,
},
Frozen {
#[serde_as(as = "DisplayFromStr")]
lsn_start: Lsn,
#[serde_as(as = "DisplayFromStr")]
lsn_end: Lsn,
},
}
#[serde_as]
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "kind")]
pub enum HistoricLayerInfo {
Delta {
layer_file_name: String,
layer_file_size: u64,
#[serde_as(as = "DisplayFromStr")]
lsn_start: Lsn,
#[serde_as(as = "DisplayFromStr")]
lsn_end: Lsn,
remote: bool,
access_stats: LayerAccessStats,
},
Image {
layer_file_name: String,
layer_file_size: u64,
#[serde_as(as = "DisplayFromStr")]
lsn_start: Lsn,
remote: bool,
access_stats: LayerAccessStats,
},
}
#[derive(Debug, Serialize, Deserialize)]
pub struct DownloadRemoteLayersTaskSpawnRequest {
pub max_concurrent_downloads: NonZeroUsize,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct DownloadRemoteLayersTaskInfo {
pub task_id: String,
pub state: DownloadRemoteLayersTaskState,
pub total_layer_count: u64, // stable once `completed`
pub successful_download_count: u64, // stable once `completed`
pub failed_download_count: u64, // stable once `completed`
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum DownloadRemoteLayersTaskState {
Running,
Completed,
ShutDown,
}
pub type ConfigureFailpointsRequest = Vec<FailpointConfig>;
/// Information for configuring a single fail point
#[derive(Debug, Serialize, Deserialize)]
pub struct FailpointConfig {
/// Name of the fail point
pub name: String,
/// List of actions to take, using the format described in `fail::cfg`
///
/// We also support `actions = "exit"` to cause the fail point to immediately exit.
pub actions: String,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct TimelineGcRequest {
pub gc_horizon: Option<u64>,
}
// Wrapped in libpq CopyData
#[derive(PartialEq, Eq, Debug)]
pub enum PagestreamFeMessage {
Exists(PagestreamExistsRequest),
Nblocks(PagestreamNblocksRequest),
GetPage(PagestreamGetPageRequest),
DbSize(PagestreamDbSizeRequest),
}
// Wrapped in libpq CopyData
pub enum PagestreamBeMessage {
Exists(PagestreamExistsResponse),
Nblocks(PagestreamNblocksResponse),
GetPage(PagestreamGetPageResponse),
Error(PagestreamErrorResponse),
DbSize(PagestreamDbSizeResponse),
}
#[derive(Debug, PartialEq, Eq)]
pub struct PagestreamExistsRequest {
pub latest: bool,
pub lsn: Lsn,
pub rel: RelTag,
}
#[derive(Debug, PartialEq, Eq)]
pub struct PagestreamNblocksRequest {
pub latest: bool,
pub lsn: Lsn,
pub rel: RelTag,
}
#[derive(Debug, PartialEq, Eq)]
pub struct PagestreamGetPageRequest {
pub latest: bool,
pub lsn: Lsn,
pub rel: RelTag,
pub blkno: u32,
}
#[derive(Debug, PartialEq, Eq)]
pub struct PagestreamDbSizeRequest {
pub latest: bool,
pub lsn: Lsn,
pub dbnode: u32,
}
#[derive(Debug)]
pub struct PagestreamExistsResponse {
pub exists: bool,
}
#[derive(Debug)]
pub struct PagestreamNblocksResponse {
pub n_blocks: u32,
}
#[derive(Debug)]
pub struct PagestreamGetPageResponse {
pub page: Bytes,
}
#[derive(Debug)]
pub struct PagestreamErrorResponse {
pub message: String,
}
#[derive(Debug)]
pub struct PagestreamDbSizeResponse {
pub db_size: i64,
}
impl PagestreamFeMessage {
pub fn serialize(&self) -> Bytes {
let mut bytes = BytesMut::new();
match self {
Self::Exists(req) => {
bytes.put_u8(0);
bytes.put_u8(u8::from(req.latest));
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
bytes.put_u32(req.rel.relnode);
bytes.put_u8(req.rel.forknum);
}
Self::Nblocks(req) => {
bytes.put_u8(1);
bytes.put_u8(u8::from(req.latest));
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
bytes.put_u32(req.rel.relnode);
bytes.put_u8(req.rel.forknum);
}
Self::GetPage(req) => {
bytes.put_u8(2);
bytes.put_u8(u8::from(req.latest));
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
bytes.put_u32(req.rel.relnode);
bytes.put_u8(req.rel.forknum);
bytes.put_u32(req.blkno);
}
Self::DbSize(req) => {
bytes.put_u8(3);
bytes.put_u8(u8::from(req.latest));
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.dbnode);
}
}
bytes.into()
}
pub fn parse<R: std::io::Read>(body: &mut R) -> anyhow::Result<PagestreamFeMessage> {
// TODO these gets can fail
// these correspond to the NeonMessageTag enum in pagestore_client.h
//
// TODO: consider using protobuf or serde bincode for less error prone
// serialization.
let msg_tag = body.read_u8()?;
match msg_tag {
0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
})),
1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
})),
2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
blkno: body.read_u32::<BigEndian>()?,
})),
3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
dbnode: body.read_u32::<BigEndian>()?,
})),
_ => bail!("unknown smgr message tag: {:?}", msg_tag),
}
}
}
impl PagestreamBeMessage {
pub fn serialize(&self) -> Bytes {
let mut bytes = BytesMut::new();
match self {
Self::Exists(resp) => {
bytes.put_u8(100); /* tag from pagestore_client.h */
bytes.put_u8(resp.exists as u8);
}
Self::Nblocks(resp) => {
bytes.put_u8(101); /* tag from pagestore_client.h */
bytes.put_u32(resp.n_blocks);
}
Self::GetPage(resp) => {
bytes.put_u8(102); /* tag from pagestore_client.h */
bytes.put(&resp.page[..]);
}
Self::Error(resp) => {
bytes.put_u8(103); /* tag from pagestore_client.h */
bytes.put(resp.message.as_bytes());
bytes.put_u8(0); // null terminator
}
Self::DbSize(resp) => {
bytes.put_u8(104); /* tag from pagestore_client.h */
bytes.put_i64(resp.db_size);
}
}
bytes.into()
}
}
#[cfg(test)]
mod tests {
use bytes::Buf;
use serde_json::json;
use super::*;
#[test]
fn test_pagestream() {
// Test serialization/deserialization of PagestreamFeMessage
let messages = vec![
PagestreamFeMessage::Exists(PagestreamExistsRequest {
latest: true,
lsn: Lsn(4),
rel: RelTag {
forknum: 1,
spcnode: 2,
dbnode: 3,
relnode: 4,
},
}),
PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
latest: false,
lsn: Lsn(4),
rel: RelTag {
forknum: 1,
spcnode: 2,
dbnode: 3,
relnode: 4,
},
}),
PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
latest: true,
lsn: Lsn(4),
rel: RelTag {
forknum: 1,
spcnode: 2,
dbnode: 3,
relnode: 4,
},
blkno: 7,
}),
PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
latest: true,
lsn: Lsn(4),
dbnode: 7,
}),
];
for msg in messages {
let bytes = msg.serialize();
let reconstructed = PagestreamFeMessage::parse(&mut bytes.reader()).unwrap();
assert!(msg == reconstructed);
}
}
#[test]
fn test_tenantinfo_serde() {
// Test serialization/deserialization of TenantInfo
let original_active = TenantInfo {
id: TenantId::generate(),
state: TenantState::Active,
current_physical_size: Some(42),
has_in_progress_downloads: Some(false),
};
let expected_active = json!({
"id": original_active.id.to_string(),
"state": {
"slug": "Active",
},
"current_physical_size": 42,
"has_in_progress_downloads": false,
});
let original_broken = TenantInfo {
id: TenantId::generate(),
state: TenantState::Broken {
reason: "reason".into(),
backtrace: "backtrace info".into(),
},
current_physical_size: Some(42),
has_in_progress_downloads: Some(false),
};
let expected_broken = json!({
"id": original_broken.id.to_string(),
"state": {
"slug": "Broken",
"data": {
"backtrace": "backtrace info",
"reason": "reason",
}
},
"current_physical_size": 42,
"has_in_progress_downloads": false,
});
assert_eq!(
serde_json::to_value(&original_active).unwrap(),
expected_active
);
assert_eq!(
serde_json::to_value(&original_broken).unwrap(),
expected_broken
);
assert!(format!("{:?}", &original_broken.state).contains("reason"));
assert!(format!("{:?}", &original_broken.state).contains("backtrace info"));
}
}