use std::{ collections::HashMap, num::{NonZeroU64, NonZeroUsize}, time::SystemTime, }; use byteorder::{BigEndian, ReadBytesExt}; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DisplayFromStr}; use utils::{ history_buffer::HistoryBufferWithDropCounter, id::{NodeId, TenantId, TimelineId}, lsn::Lsn, }; use crate::reltag::RelTag; use anyhow::bail; use bytes::{BufMut, Bytes, BytesMut}; /// A state of a tenant in pageserver's memory. #[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub enum TenantState { // This tenant is being loaded from local disk Loading, // This tenant is being downloaded from cloud storage. Attaching, /// Tenant is fully operational Active, /// A tenant is recognized by pageserver, but it is being detached or the /// system is being shut down. Stopping, /// A tenant is recognized by the pageserver, but can no longer be used for /// any operations, because it failed to be activated. Broken, } pub mod state { pub const LOADING: &str = "loading"; pub const ATTACHING: &str = "attaching"; pub const ACTIVE: &str = "active"; pub const STOPPING: &str = "stopping"; pub const BROKEN: &str = "broken"; } impl TenantState { pub fn has_in_progress_downloads(&self) -> bool { match self { Self::Loading => true, Self::Attaching => true, Self::Active => false, Self::Stopping => false, Self::Broken => false, } } pub fn as_str(&self) -> &'static str { match self { TenantState::Loading => state::LOADING, TenantState::Attaching => state::ATTACHING, TenantState::Active => state::ACTIVE, TenantState::Stopping => state::STOPPING, TenantState::Broken => state::BROKEN, } } } /// A state of a timeline in pageserver's memory. #[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub enum TimelineState { /// The timeline is recognized by the pageserver but is not yet operational. /// In particular, the walreceiver connection loop is not running for this timeline. /// It will eventually transition to state Active or Broken. Loading, /// The timeline is fully operational. /// It can be queried, and the walreceiver connection loop is running. Active, /// The timeline was previously Loading or Active but is shutting down. /// It cannot transition back into any other state. Stopping, /// The timeline is broken and not operational (previous states: Loading or Active). Broken, } #[serde_as] #[derive(Serialize, Deserialize)] pub struct TimelineCreateRequest { #[serde(default)] #[serde_as(as = "Option")] pub new_timeline_id: Option, #[serde(default)] #[serde_as(as = "Option")] pub ancestor_timeline_id: Option, #[serde(default)] #[serde_as(as = "Option")] pub ancestor_start_lsn: Option, pub pg_version: Option, } #[serde_as] #[derive(Serialize, Deserialize, Default)] pub struct TenantCreateRequest { #[serde(default)] #[serde_as(as = "Option")] pub new_tenant_id: Option, pub checkpoint_distance: Option, pub checkpoint_timeout: Option, pub compaction_target_size: Option, pub compaction_period: Option, pub compaction_threshold: Option, pub gc_horizon: Option, pub gc_period: Option, pub image_creation_threshold: Option, pub pitr_interval: Option, pub walreceiver_connect_timeout: Option, pub lagging_wal_timeout: Option, pub max_lsn_wal_lag: Option, pub trace_read_requests: Option, // We defer the parsing of the eviction_policy field to the request handler. // Otherwise we'd have to move the types for eviction policy into this package. // We might do that once the eviction feature has stabilizied. // For now, this field is not even documented in the openapi_spec.yml. pub eviction_policy: Option, } #[serde_as] #[derive(Serialize, Deserialize)] #[serde(transparent)] pub struct TenantCreateResponse(#[serde_as(as = "DisplayFromStr")] pub TenantId); #[derive(Serialize)] pub struct StatusResponse { pub id: NodeId, } impl TenantCreateRequest { pub fn new(new_tenant_id: Option) -> TenantCreateRequest { TenantCreateRequest { new_tenant_id, ..Default::default() } } } #[serde_as] #[derive(Serialize, Deserialize)] pub struct TenantConfigRequest { #[serde_as(as = "DisplayFromStr")] pub tenant_id: TenantId, #[serde(default)] pub checkpoint_distance: Option, pub checkpoint_timeout: Option, pub compaction_target_size: Option, pub compaction_period: Option, pub compaction_threshold: Option, pub gc_horizon: Option, pub gc_period: Option, pub image_creation_threshold: Option, pub pitr_interval: Option, pub walreceiver_connect_timeout: Option, pub lagging_wal_timeout: Option, pub max_lsn_wal_lag: Option, pub trace_read_requests: Option, // We defer the parsing of the eviction_policy field to the request handler. // Otherwise we'd have to move the types for eviction policy into this package. // We might do that once the eviction feature has stabilizied. // For now, this field is not even documented in the openapi_spec.yml. pub eviction_policy: Option, } impl TenantConfigRequest { pub fn new(tenant_id: TenantId) -> TenantConfigRequest { TenantConfigRequest { tenant_id, checkpoint_distance: None, checkpoint_timeout: None, compaction_target_size: None, compaction_period: None, compaction_threshold: None, gc_horizon: None, gc_period: None, image_creation_threshold: None, pitr_interval: None, walreceiver_connect_timeout: None, lagging_wal_timeout: None, max_lsn_wal_lag: None, trace_read_requests: None, eviction_policy: None, } } } #[serde_as] #[derive(Serialize, Deserialize, Clone)] pub struct TenantInfo { #[serde_as(as = "DisplayFromStr")] pub id: TenantId, pub state: TenantState, /// Sum of the size of all layer files. /// If a layer is present in both local FS and S3, it counts only once. pub current_physical_size: Option, // physical size is only included in `tenant_status` endpoint pub has_in_progress_downloads: Option, } /// This represents the output of the "timeline_detail" and "timeline_list" API calls. #[serde_as] #[derive(Debug, Serialize, Deserialize, Clone)] pub struct TimelineInfo { #[serde_as(as = "DisplayFromStr")] pub tenant_id: TenantId, #[serde_as(as = "DisplayFromStr")] pub timeline_id: TimelineId, #[serde_as(as = "Option")] pub ancestor_timeline_id: Option, #[serde_as(as = "Option")] pub ancestor_lsn: Option, #[serde_as(as = "DisplayFromStr")] pub last_record_lsn: Lsn, #[serde_as(as = "Option")] pub prev_record_lsn: Option, #[serde_as(as = "DisplayFromStr")] pub latest_gc_cutoff_lsn: Lsn, #[serde_as(as = "DisplayFromStr")] pub disk_consistent_lsn: Lsn, #[serde_as(as = "DisplayFromStr")] pub remote_consistent_lsn: Lsn, pub current_logical_size: Option, // is None when timeline is Unloaded /// Sum of the size of all layer files. /// If a layer is present in both local FS and S3, it counts only once. pub current_physical_size: Option, // is None when timeline is Unloaded pub current_logical_size_non_incremental: Option, pub timeline_dir_layer_file_size_sum: Option, pub wal_source_connstr: Option, #[serde_as(as = "Option")] pub last_received_msg_lsn: Option, /// the timestamp (in microseconds) of the last received message pub last_received_msg_ts: Option, pub pg_version: u32, pub state: TimelineState, } #[derive(Debug, Clone, Serialize)] pub struct LayerMapInfo { pub in_memory_layers: Vec, pub historic_layers: Vec, } #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy, Serialize, Deserialize, enum_map::Enum)] #[repr(usize)] pub enum LayerAccessKind { GetValueReconstructData, Iter, KeyIter, Dump, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct LayerAccessStatFullDetails { pub when_millis_since_epoch: u64, pub task_kind: &'static str, pub access_kind: LayerAccessKind, } /// An event that impacts the layer's residence status. #[serde_as] #[derive(Debug, Clone, Serialize, Deserialize)] pub struct LayerResidenceEvent { /// The time when the event occurred. /// NB: this timestamp is captured while the residence status changes. /// So, it might be behind/ahead of the actual residence change by a short amount of time. /// #[serde(rename = "timestamp_millis_since_epoch")] #[serde_as(as = "serde_with::TimestampMilliSeconds")] pub timestamp: SystemTime, /// The new residence status of the layer. pub status: LayerResidenceStatus, /// The reason why we had to record this event. pub reason: LayerResidenceEventReason, } /// The reason for recording a given [`ResidenceEvent`]. #[derive(Debug, Clone, Copy, Serialize, Deserialize)] pub enum LayerResidenceEventReason { /// The layer map is being populated, e.g. during timeline load or attach. /// This includes [`RemoteLayer`] objects created in [`reconcile_with_remote`]. /// We need to record such events because there is no persistent storage for the events. LayerLoad, /// We just created the layer (e.g., freeze_and_flush or compaction). /// Such layers are always [`LayerResidenceStatus::Resident`]. LayerCreate, /// We on-demand downloaded or evicted the given layer. ResidenceChange, } /// The residence status of the layer, after the given [`LayerResidenceEvent`]. #[derive(Debug, Clone, Copy, Serialize, Deserialize)] pub enum LayerResidenceStatus { /// Residence status for a layer file that exists locally. /// It may also exist on the remote, we don't care here. Resident, /// Residence status for a layer file that only exists on the remote. Evicted, } impl LayerResidenceEvent { pub fn new(status: LayerResidenceStatus, reason: LayerResidenceEventReason) -> Self { Self { status, reason, timestamp: SystemTime::now(), } } } #[derive(Debug, Clone, Serialize)] pub struct LayerAccessStats { pub access_count_by_access_kind: HashMap, pub task_kind_access_flag: Vec<&'static str>, pub first: Option, pub accesses_history: HistoryBufferWithDropCounter, pub residence_events_history: HistoryBufferWithDropCounter, } #[serde_as] #[derive(Debug, Clone, Serialize)] #[serde(tag = "kind")] pub enum InMemoryLayerInfo { Open { #[serde_as(as = "DisplayFromStr")] lsn_start: Lsn, }, Frozen { #[serde_as(as = "DisplayFromStr")] lsn_start: Lsn, #[serde_as(as = "DisplayFromStr")] lsn_end: Lsn, }, } #[serde_as] #[derive(Debug, Clone, Serialize)] #[serde(tag = "kind")] pub enum HistoricLayerInfo { Delta { layer_file_name: String, layer_file_size: u64, #[serde_as(as = "DisplayFromStr")] lsn_start: Lsn, #[serde_as(as = "DisplayFromStr")] lsn_end: Lsn, remote: bool, access_stats: LayerAccessStats, }, Image { layer_file_name: String, layer_file_size: u64, #[serde_as(as = "DisplayFromStr")] lsn_start: Lsn, remote: bool, access_stats: LayerAccessStats, }, } #[derive(Debug, Serialize, Deserialize)] pub struct DownloadRemoteLayersTaskSpawnRequest { pub max_concurrent_downloads: NonZeroUsize, } #[derive(Debug, Serialize, Deserialize, Clone)] pub struct DownloadRemoteLayersTaskInfo { pub task_id: String, pub state: DownloadRemoteLayersTaskState, pub total_layer_count: u64, // stable once `completed` pub successful_download_count: u64, // stable once `completed` pub failed_download_count: u64, // stable once `completed` } #[derive(Debug, Serialize, Deserialize, Clone)] pub enum DownloadRemoteLayersTaskState { Running, Completed, ShutDown, } pub type ConfigureFailpointsRequest = Vec; /// Information for configuring a single fail point #[derive(Debug, Serialize, Deserialize)] pub struct FailpointConfig { /// Name of the fail point pub name: String, /// List of actions to take, using the format described in `fail::cfg` /// /// We also support `actions = "exit"` to cause the fail point to immediately exit. pub actions: String, } #[derive(Debug, Serialize, Deserialize)] pub struct TimelineGcRequest { pub gc_horizon: Option, } // Wrapped in libpq CopyData #[derive(PartialEq, Eq, Debug)] pub enum PagestreamFeMessage { Exists(PagestreamExistsRequest), Nblocks(PagestreamNblocksRequest), GetPage(PagestreamGetPageRequest), DbSize(PagestreamDbSizeRequest), } // Wrapped in libpq CopyData pub enum PagestreamBeMessage { Exists(PagestreamExistsResponse), Nblocks(PagestreamNblocksResponse), GetPage(PagestreamGetPageResponse), Error(PagestreamErrorResponse), DbSize(PagestreamDbSizeResponse), } #[derive(Debug, PartialEq, Eq)] pub struct PagestreamExistsRequest { pub latest: bool, pub lsn: Lsn, pub rel: RelTag, } #[derive(Debug, PartialEq, Eq)] pub struct PagestreamNblocksRequest { pub latest: bool, pub lsn: Lsn, pub rel: RelTag, } #[derive(Debug, PartialEq, Eq)] pub struct PagestreamGetPageRequest { pub latest: bool, pub lsn: Lsn, pub rel: RelTag, pub blkno: u32, } #[derive(Debug, PartialEq, Eq)] pub struct PagestreamDbSizeRequest { pub latest: bool, pub lsn: Lsn, pub dbnode: u32, } #[derive(Debug)] pub struct PagestreamExistsResponse { pub exists: bool, } #[derive(Debug)] pub struct PagestreamNblocksResponse { pub n_blocks: u32, } #[derive(Debug)] pub struct PagestreamGetPageResponse { pub page: Bytes, } #[derive(Debug)] pub struct PagestreamErrorResponse { pub message: String, } #[derive(Debug)] pub struct PagestreamDbSizeResponse { pub db_size: i64, } impl PagestreamFeMessage { pub fn serialize(&self) -> Bytes { let mut bytes = BytesMut::new(); match self { Self::Exists(req) => { bytes.put_u8(0); bytes.put_u8(u8::from(req.latest)); bytes.put_u64(req.lsn.0); bytes.put_u32(req.rel.spcnode); bytes.put_u32(req.rel.dbnode); bytes.put_u32(req.rel.relnode); bytes.put_u8(req.rel.forknum); } Self::Nblocks(req) => { bytes.put_u8(1); bytes.put_u8(u8::from(req.latest)); bytes.put_u64(req.lsn.0); bytes.put_u32(req.rel.spcnode); bytes.put_u32(req.rel.dbnode); bytes.put_u32(req.rel.relnode); bytes.put_u8(req.rel.forknum); } Self::GetPage(req) => { bytes.put_u8(2); bytes.put_u8(u8::from(req.latest)); bytes.put_u64(req.lsn.0); bytes.put_u32(req.rel.spcnode); bytes.put_u32(req.rel.dbnode); bytes.put_u32(req.rel.relnode); bytes.put_u8(req.rel.forknum); bytes.put_u32(req.blkno); } Self::DbSize(req) => { bytes.put_u8(3); bytes.put_u8(u8::from(req.latest)); bytes.put_u64(req.lsn.0); bytes.put_u32(req.dbnode); } } bytes.into() } pub fn parse(body: &mut R) -> anyhow::Result { // TODO these gets can fail // these correspond to the NeonMessageTag enum in pagestore_client.h // // TODO: consider using protobuf or serde bincode for less error prone // serialization. let msg_tag = body.read_u8()?; match msg_tag { 0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest { latest: body.read_u8()? != 0, lsn: Lsn::from(body.read_u64::()?), rel: RelTag { spcnode: body.read_u32::()?, dbnode: body.read_u32::()?, relnode: body.read_u32::()?, forknum: body.read_u8()?, }, })), 1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest { latest: body.read_u8()? != 0, lsn: Lsn::from(body.read_u64::()?), rel: RelTag { spcnode: body.read_u32::()?, dbnode: body.read_u32::()?, relnode: body.read_u32::()?, forknum: body.read_u8()?, }, })), 2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest { latest: body.read_u8()? != 0, lsn: Lsn::from(body.read_u64::()?), rel: RelTag { spcnode: body.read_u32::()?, dbnode: body.read_u32::()?, relnode: body.read_u32::()?, forknum: body.read_u8()?, }, blkno: body.read_u32::()?, })), 3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest { latest: body.read_u8()? != 0, lsn: Lsn::from(body.read_u64::()?), dbnode: body.read_u32::()?, })), _ => bail!("unknown smgr message tag: {:?}", msg_tag), } } } impl PagestreamBeMessage { pub fn serialize(&self) -> Bytes { let mut bytes = BytesMut::new(); match self { Self::Exists(resp) => { bytes.put_u8(100); /* tag from pagestore_client.h */ bytes.put_u8(resp.exists as u8); } Self::Nblocks(resp) => { bytes.put_u8(101); /* tag from pagestore_client.h */ bytes.put_u32(resp.n_blocks); } Self::GetPage(resp) => { bytes.put_u8(102); /* tag from pagestore_client.h */ bytes.put(&resp.page[..]); } Self::Error(resp) => { bytes.put_u8(103); /* tag from pagestore_client.h */ bytes.put(resp.message.as_bytes()); bytes.put_u8(0); // null terminator } Self::DbSize(resp) => { bytes.put_u8(104); /* tag from pagestore_client.h */ bytes.put_i64(resp.db_size); } } bytes.into() } } #[cfg(test)] mod tests { use bytes::Buf; use super::*; #[test] fn test_pagestream() { // Test serialization/deserialization of PagestreamFeMessage let messages = vec![ PagestreamFeMessage::Exists(PagestreamExistsRequest { latest: true, lsn: Lsn(4), rel: RelTag { forknum: 1, spcnode: 2, dbnode: 3, relnode: 4, }, }), PagestreamFeMessage::Nblocks(PagestreamNblocksRequest { latest: false, lsn: Lsn(4), rel: RelTag { forknum: 1, spcnode: 2, dbnode: 3, relnode: 4, }, }), PagestreamFeMessage::GetPage(PagestreamGetPageRequest { latest: true, lsn: Lsn(4), rel: RelTag { forknum: 1, spcnode: 2, dbnode: 3, relnode: 4, }, blkno: 7, }), PagestreamFeMessage::DbSize(PagestreamDbSizeRequest { latest: true, lsn: Lsn(4), dbnode: 7, }), ]; for msg in messages { let bytes = msg.serialize(); let reconstructed = PagestreamFeMessage::parse(&mut bytes.reader()).unwrap(); assert!(msg == reconstructed); } } }