mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-23 16:10:37 +00:00
This is prep for https://github.com/neondatabase/neon/pull/4255 [1/X] OpenAPI: share a single definition of TenantConfig DRYs up the pageserver OpenAPI YAML's representation of tenant config. All the fields of tenant config are now located in a model schema called TenantConfig. The tenant create & config-change endpoints have separate schemas, TenantCreateInfo and TenantConfigureArg, respectively. These schemas inherit from TenantConfig, using allOf 1. The tenant config-GET handler's response was previously named TenantConfig. It's now named TenantConfigResponse. None of these changes affect how the request looks on the wire. The generated Go code will change for Console because the OpenAPI code generator maps `allOf` to a Go struct embedding. Luckily, usage of tenant config in Console is still very lightweigt, but that will change in the near future. So, this is a good chance to set things straight. The console changes are tracked in https://github.com/neondatabase/cloud/pull/5046 [2/x]: extract the tenant config parts of create & config requests [3/x]: code movement: move TenantConfigRequestConfig next to TenantCreateRequestConfig [4/x] type-alias TenantConfigRequestConfig = TenantCreateRequestConfig; They are exactly the same. [5/x] switch to qualified use for tenant create/config request api models [6/x] rename models::TenantConfig{RequestConfig,} and remove the alias [7/x] OpenAPI: sync tenant create & configure body names from Rust code [8/x]: dedupe the two TryFrom<...> for TenantConfOpt impls The only difference is that the TenantConfigRequest impl does ``` tenant_conf.max_lsn_wal_lag = request_data.max_lsn_wal_lag; tenant_conf.trace_read_requests = request_data.trace_read_requests; ``` and the TenantCreateRequest impl does ``` if let Some(max_lsn_wal_lag) = request_data.max_lsn_wal_lag { tenant_conf.max_lsn_wal_lag = Some(max_lsn_wal_lag); } if let Some(trace_read_requests) = request_data.trace_read_requests { tenant_conf.trace_read_requests = Some(trace_read_requests); } ``` As far as I can tell, these are identical.
772 lines
25 KiB
Rust
772 lines
25 KiB
Rust
use std::{
|
|
collections::HashMap,
|
|
num::{NonZeroU64, NonZeroUsize},
|
|
time::SystemTime,
|
|
};
|
|
|
|
use byteorder::{BigEndian, ReadBytesExt};
|
|
use serde::{Deserialize, Serialize};
|
|
use serde_with::{serde_as, DisplayFromStr};
|
|
use strum_macros;
|
|
use utils::{
|
|
history_buffer::HistoryBufferWithDropCounter,
|
|
id::{NodeId, TenantId, TimelineId},
|
|
lsn::Lsn,
|
|
};
|
|
|
|
use crate::reltag::RelTag;
|
|
use anyhow::bail;
|
|
use bytes::{BufMut, Bytes, BytesMut};
|
|
|
|
/// A state of a tenant in pageserver's memory.
|
|
#[derive(
|
|
Clone,
|
|
PartialEq,
|
|
Eq,
|
|
serde::Serialize,
|
|
serde::Deserialize,
|
|
strum_macros::Display,
|
|
strum_macros::EnumString,
|
|
strum_macros::EnumVariantNames,
|
|
strum_macros::AsRefStr,
|
|
strum_macros::IntoStaticStr,
|
|
)]
|
|
#[serde(tag = "slug", content = "data")]
|
|
pub enum TenantState {
|
|
/// This tenant is being loaded from local disk
|
|
Loading,
|
|
/// This tenant is being downloaded from cloud storage.
|
|
Attaching,
|
|
/// Tenant is fully operational
|
|
Active,
|
|
/// A tenant is recognized by pageserver, but it is being detached or the
|
|
/// system is being shut down.
|
|
Stopping,
|
|
/// A tenant is recognized by the pageserver, but can no longer be used for
|
|
/// any operations, because it failed to be activated.
|
|
Broken { reason: String, backtrace: String },
|
|
}
|
|
|
|
impl TenantState {
|
|
pub fn attachment_status(&self) -> TenantAttachmentStatus {
|
|
use TenantAttachmentStatus::*;
|
|
match self {
|
|
// The attach procedure writes the marker file before adding the Attaching tenant to the tenants map.
|
|
// So, technically, we can return Attached here.
|
|
// However, as soon as Console observes Attached, it will proceed with the Postgres-level health check.
|
|
// But, our attach task might still be fetching the remote timelines, etc.
|
|
// So, return `Maybe` while Attaching, making Console wait for the attach task to finish.
|
|
Self::Attaching => Maybe,
|
|
// tenant mgr startup distinguishes attaching from loading via marker file.
|
|
// If it's loading, there is no attach marker file, i.e., attach had finished in the past.
|
|
Self::Loading => Attached,
|
|
// We only reach Active after successful load / attach.
|
|
// So, call atttachment status Attached.
|
|
Self::Active => Attached,
|
|
// If the (initial or resumed) attach procedure fails, the tenant becomes Broken.
|
|
// However, it also becomes Broken if the regular load fails.
|
|
// We would need a separate TenantState variant to distinguish these cases.
|
|
// However, there's no practical difference from Console's perspective.
|
|
// It will run a Postgres-level health check as soon as it observes Attached.
|
|
// That will fail on Broken tenants.
|
|
// Console can then rollback the attach, or, wait for operator to fix the Broken tenant.
|
|
Self::Broken { .. } => Attached,
|
|
// Why is Stopping a Maybe case? Because, during pageserver shutdown,
|
|
// we set the Stopping state irrespective of whether the tenant
|
|
// has finished attaching or not.
|
|
Self::Stopping => Maybe,
|
|
}
|
|
}
|
|
|
|
pub fn broken_from_reason(reason: String) -> Self {
|
|
let backtrace_str: String = format!("{}", std::backtrace::Backtrace::force_capture());
|
|
Self::Broken {
|
|
reason,
|
|
backtrace: backtrace_str,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl std::fmt::Debug for TenantState {
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
match self {
|
|
Self::Broken { reason, backtrace } if !reason.is_empty() => {
|
|
write!(f, "Broken due to: {reason}. Backtrace:\n{backtrace}")
|
|
}
|
|
_ => write!(f, "{self}"),
|
|
}
|
|
}
|
|
}
|
|
|
|
/// A state of a timeline in pageserver's memory.
|
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
|
pub enum TimelineState {
|
|
/// The timeline is recognized by the pageserver but is not yet operational.
|
|
/// In particular, the walreceiver connection loop is not running for this timeline.
|
|
/// It will eventually transition to state Active or Broken.
|
|
Loading,
|
|
/// The timeline is fully operational.
|
|
/// It can be queried, and the walreceiver connection loop is running.
|
|
Active,
|
|
/// The timeline was previously Loading or Active but is shutting down.
|
|
/// It cannot transition back into any other state.
|
|
Stopping,
|
|
/// The timeline is broken and not operational (previous states: Loading or Active).
|
|
Broken,
|
|
}
|
|
|
|
#[serde_as]
|
|
#[derive(Serialize, Deserialize)]
|
|
pub struct TimelineCreateRequest {
|
|
#[serde(default)]
|
|
#[serde_as(as = "Option<DisplayFromStr>")]
|
|
pub new_timeline_id: Option<TimelineId>,
|
|
#[serde(default)]
|
|
#[serde_as(as = "Option<DisplayFromStr>")]
|
|
pub ancestor_timeline_id: Option<TimelineId>,
|
|
#[serde(default)]
|
|
#[serde_as(as = "Option<DisplayFromStr>")]
|
|
pub ancestor_start_lsn: Option<Lsn>,
|
|
pub pg_version: Option<u32>,
|
|
}
|
|
|
|
#[serde_as]
|
|
#[derive(Serialize, Deserialize, Default)]
|
|
pub struct TenantCreateRequest {
|
|
#[serde(default)]
|
|
#[serde_as(as = "Option<DisplayFromStr>")]
|
|
pub new_tenant_id: Option<TenantId>,
|
|
#[serde(flatten)]
|
|
pub config: TenantConfig,
|
|
}
|
|
|
|
impl std::ops::Deref for TenantCreateRequest {
|
|
type Target = TenantConfig;
|
|
|
|
fn deref(&self) -> &Self::Target {
|
|
&self.config
|
|
}
|
|
}
|
|
|
|
#[derive(Serialize, Deserialize, Default)]
|
|
pub struct TenantConfig {
|
|
pub checkpoint_distance: Option<u64>,
|
|
pub checkpoint_timeout: Option<String>,
|
|
pub compaction_target_size: Option<u64>,
|
|
pub compaction_period: Option<String>,
|
|
pub compaction_threshold: Option<usize>,
|
|
pub gc_horizon: Option<u64>,
|
|
pub gc_period: Option<String>,
|
|
pub image_creation_threshold: Option<usize>,
|
|
pub pitr_interval: Option<String>,
|
|
pub walreceiver_connect_timeout: Option<String>,
|
|
pub lagging_wal_timeout: Option<String>,
|
|
pub max_lsn_wal_lag: Option<NonZeroU64>,
|
|
pub trace_read_requests: Option<bool>,
|
|
// We defer the parsing of the eviction_policy field to the request handler.
|
|
// Otherwise we'd have to move the types for eviction policy into this package.
|
|
// We might do that once the eviction feature has stabilizied.
|
|
// For now, this field is not even documented in the openapi_spec.yml.
|
|
pub eviction_policy: Option<serde_json::Value>,
|
|
pub min_resident_size_override: Option<u64>,
|
|
pub evictions_low_residence_duration_metric_threshold: Option<String>,
|
|
}
|
|
|
|
#[serde_as]
|
|
#[derive(Serialize, Deserialize)]
|
|
#[serde(transparent)]
|
|
pub struct TenantCreateResponse(#[serde_as(as = "DisplayFromStr")] pub TenantId);
|
|
|
|
#[derive(Serialize)]
|
|
pub struct StatusResponse {
|
|
pub id: NodeId,
|
|
}
|
|
|
|
impl TenantCreateRequest {
|
|
pub fn new(new_tenant_id: Option<TenantId>) -> TenantCreateRequest {
|
|
TenantCreateRequest {
|
|
new_tenant_id,
|
|
..Default::default()
|
|
}
|
|
}
|
|
}
|
|
|
|
#[serde_as]
|
|
#[derive(Serialize, Deserialize)]
|
|
pub struct TenantConfigRequest {
|
|
#[serde_as(as = "DisplayFromStr")]
|
|
pub tenant_id: TenantId,
|
|
#[serde(flatten)]
|
|
pub config: TenantConfig,
|
|
}
|
|
|
|
impl std::ops::Deref for TenantConfigRequest {
|
|
type Target = TenantConfig;
|
|
|
|
fn deref(&self) -> &Self::Target {
|
|
&self.config
|
|
}
|
|
}
|
|
|
|
impl TenantConfigRequest {
|
|
pub fn new(tenant_id: TenantId) -> TenantConfigRequest {
|
|
let config = TenantConfig {
|
|
checkpoint_distance: None,
|
|
checkpoint_timeout: None,
|
|
compaction_target_size: None,
|
|
compaction_period: None,
|
|
compaction_threshold: None,
|
|
gc_horizon: None,
|
|
gc_period: None,
|
|
image_creation_threshold: None,
|
|
pitr_interval: None,
|
|
walreceiver_connect_timeout: None,
|
|
lagging_wal_timeout: None,
|
|
max_lsn_wal_lag: None,
|
|
trace_read_requests: None,
|
|
eviction_policy: None,
|
|
min_resident_size_override: None,
|
|
evictions_low_residence_duration_metric_threshold: None,
|
|
};
|
|
TenantConfigRequest { tenant_id, config }
|
|
}
|
|
}
|
|
|
|
/// See [`TenantState::attachment_status`] and the OpenAPI docs for context.
|
|
#[derive(Serialize, Deserialize, Clone)]
|
|
#[serde(rename_all = "snake_case")]
|
|
pub enum TenantAttachmentStatus {
|
|
Maybe,
|
|
Attached,
|
|
}
|
|
|
|
#[serde_as]
|
|
#[derive(Serialize, Deserialize, Clone)]
|
|
pub struct TenantInfo {
|
|
#[serde_as(as = "DisplayFromStr")]
|
|
pub id: TenantId,
|
|
// NB: intentionally not part of OpenAPI, we don't want to commit to a specific set of TenantState's
|
|
pub state: TenantState,
|
|
/// Sum of the size of all layer files.
|
|
/// If a layer is present in both local FS and S3, it counts only once.
|
|
pub current_physical_size: Option<u64>, // physical size is only included in `tenant_status` endpoint
|
|
pub attachment_status: TenantAttachmentStatus,
|
|
}
|
|
|
|
/// This represents the output of the "timeline_detail" and "timeline_list" API calls.
|
|
#[serde_as]
|
|
#[derive(Debug, Serialize, Deserialize, Clone)]
|
|
pub struct TimelineInfo {
|
|
#[serde_as(as = "DisplayFromStr")]
|
|
pub tenant_id: TenantId,
|
|
#[serde_as(as = "DisplayFromStr")]
|
|
pub timeline_id: TimelineId,
|
|
|
|
#[serde_as(as = "Option<DisplayFromStr>")]
|
|
pub ancestor_timeline_id: Option<TimelineId>,
|
|
#[serde_as(as = "Option<DisplayFromStr>")]
|
|
pub ancestor_lsn: Option<Lsn>,
|
|
#[serde_as(as = "DisplayFromStr")]
|
|
pub last_record_lsn: Lsn,
|
|
#[serde_as(as = "Option<DisplayFromStr>")]
|
|
pub prev_record_lsn: Option<Lsn>,
|
|
#[serde_as(as = "DisplayFromStr")]
|
|
pub latest_gc_cutoff_lsn: Lsn,
|
|
#[serde_as(as = "DisplayFromStr")]
|
|
pub disk_consistent_lsn: Lsn,
|
|
#[serde_as(as = "DisplayFromStr")]
|
|
pub remote_consistent_lsn: Lsn,
|
|
pub current_logical_size: Option<u64>, // is None when timeline is Unloaded
|
|
/// Sum of the size of all layer files.
|
|
/// If a layer is present in both local FS and S3, it counts only once.
|
|
pub current_physical_size: Option<u64>, // is None when timeline is Unloaded
|
|
pub current_logical_size_non_incremental: Option<u64>,
|
|
|
|
pub timeline_dir_layer_file_size_sum: Option<u64>,
|
|
|
|
pub wal_source_connstr: Option<String>,
|
|
#[serde_as(as = "Option<DisplayFromStr>")]
|
|
pub last_received_msg_lsn: Option<Lsn>,
|
|
/// the timestamp (in microseconds) of the last received message
|
|
pub last_received_msg_ts: Option<u128>,
|
|
pub pg_version: u32,
|
|
|
|
pub state: TimelineState,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Serialize)]
|
|
pub struct LayerMapInfo {
|
|
pub in_memory_layers: Vec<InMemoryLayerInfo>,
|
|
pub historic_layers: Vec<HistoricLayerInfo>,
|
|
}
|
|
|
|
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy, Serialize, Deserialize, enum_map::Enum)]
|
|
#[repr(usize)]
|
|
pub enum LayerAccessKind {
|
|
GetValueReconstructData,
|
|
Iter,
|
|
KeyIter,
|
|
Dump,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
pub struct LayerAccessStatFullDetails {
|
|
pub when_millis_since_epoch: u64,
|
|
pub task_kind: &'static str,
|
|
pub access_kind: LayerAccessKind,
|
|
}
|
|
|
|
/// An event that impacts the layer's residence status.
|
|
#[serde_as]
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
pub struct LayerResidenceEvent {
|
|
/// The time when the event occurred.
|
|
/// NB: this timestamp is captured while the residence status changes.
|
|
/// So, it might be behind/ahead of the actual residence change by a short amount of time.
|
|
///
|
|
#[serde(rename = "timestamp_millis_since_epoch")]
|
|
#[serde_as(as = "serde_with::TimestampMilliSeconds")]
|
|
pub timestamp: SystemTime,
|
|
/// The new residence status of the layer.
|
|
pub status: LayerResidenceStatus,
|
|
/// The reason why we had to record this event.
|
|
pub reason: LayerResidenceEventReason,
|
|
}
|
|
|
|
/// The reason for recording a given [`ResidenceEvent`].
|
|
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
|
|
pub enum LayerResidenceEventReason {
|
|
/// The layer map is being populated, e.g. during timeline load or attach.
|
|
/// This includes [`RemoteLayer`] objects created in [`reconcile_with_remote`].
|
|
/// We need to record such events because there is no persistent storage for the events.
|
|
LayerLoad,
|
|
/// We just created the layer (e.g., freeze_and_flush or compaction).
|
|
/// Such layers are always [`LayerResidenceStatus::Resident`].
|
|
LayerCreate,
|
|
/// We on-demand downloaded or evicted the given layer.
|
|
ResidenceChange,
|
|
}
|
|
|
|
/// The residence status of the layer, after the given [`LayerResidenceEvent`].
|
|
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
|
|
pub enum LayerResidenceStatus {
|
|
/// Residence status for a layer file that exists locally.
|
|
/// It may also exist on the remote, we don't care here.
|
|
Resident,
|
|
/// Residence status for a layer file that only exists on the remote.
|
|
Evicted,
|
|
}
|
|
|
|
impl LayerResidenceEvent {
|
|
pub fn new(status: LayerResidenceStatus, reason: LayerResidenceEventReason) -> Self {
|
|
Self {
|
|
status,
|
|
reason,
|
|
timestamp: SystemTime::now(),
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, Clone, Serialize)]
|
|
pub struct LayerAccessStats {
|
|
pub access_count_by_access_kind: HashMap<LayerAccessKind, u64>,
|
|
pub task_kind_access_flag: Vec<&'static str>,
|
|
pub first: Option<LayerAccessStatFullDetails>,
|
|
pub accesses_history: HistoryBufferWithDropCounter<LayerAccessStatFullDetails, 16>,
|
|
pub residence_events_history: HistoryBufferWithDropCounter<LayerResidenceEvent, 16>,
|
|
}
|
|
|
|
#[serde_as]
|
|
#[derive(Debug, Clone, Serialize)]
|
|
#[serde(tag = "kind")]
|
|
pub enum InMemoryLayerInfo {
|
|
Open {
|
|
#[serde_as(as = "DisplayFromStr")]
|
|
lsn_start: Lsn,
|
|
},
|
|
Frozen {
|
|
#[serde_as(as = "DisplayFromStr")]
|
|
lsn_start: Lsn,
|
|
#[serde_as(as = "DisplayFromStr")]
|
|
lsn_end: Lsn,
|
|
},
|
|
}
|
|
|
|
#[serde_as]
|
|
#[derive(Debug, Clone, Serialize)]
|
|
#[serde(tag = "kind")]
|
|
pub enum HistoricLayerInfo {
|
|
Delta {
|
|
layer_file_name: String,
|
|
layer_file_size: u64,
|
|
|
|
#[serde_as(as = "DisplayFromStr")]
|
|
lsn_start: Lsn,
|
|
#[serde_as(as = "DisplayFromStr")]
|
|
lsn_end: Lsn,
|
|
remote: bool,
|
|
access_stats: LayerAccessStats,
|
|
},
|
|
Image {
|
|
layer_file_name: String,
|
|
layer_file_size: u64,
|
|
|
|
#[serde_as(as = "DisplayFromStr")]
|
|
lsn_start: Lsn,
|
|
remote: bool,
|
|
access_stats: LayerAccessStats,
|
|
},
|
|
}
|
|
|
|
#[derive(Debug, Serialize, Deserialize)]
|
|
pub struct DownloadRemoteLayersTaskSpawnRequest {
|
|
pub max_concurrent_downloads: NonZeroUsize,
|
|
}
|
|
|
|
#[derive(Debug, Serialize, Deserialize, Clone)]
|
|
pub struct DownloadRemoteLayersTaskInfo {
|
|
pub task_id: String,
|
|
pub state: DownloadRemoteLayersTaskState,
|
|
pub total_layer_count: u64, // stable once `completed`
|
|
pub successful_download_count: u64, // stable once `completed`
|
|
pub failed_download_count: u64, // stable once `completed`
|
|
}
|
|
|
|
#[derive(Debug, Serialize, Deserialize, Clone)]
|
|
pub enum DownloadRemoteLayersTaskState {
|
|
Running,
|
|
Completed,
|
|
ShutDown,
|
|
}
|
|
|
|
pub type ConfigureFailpointsRequest = Vec<FailpointConfig>;
|
|
|
|
/// Information for configuring a single fail point
|
|
#[derive(Debug, Serialize, Deserialize)]
|
|
pub struct FailpointConfig {
|
|
/// Name of the fail point
|
|
pub name: String,
|
|
/// List of actions to take, using the format described in `fail::cfg`
|
|
///
|
|
/// We also support `actions = "exit"` to cause the fail point to immediately exit.
|
|
pub actions: String,
|
|
}
|
|
|
|
#[derive(Debug, Serialize, Deserialize)]
|
|
pub struct TimelineGcRequest {
|
|
pub gc_horizon: Option<u64>,
|
|
}
|
|
|
|
// Wrapped in libpq CopyData
|
|
#[derive(PartialEq, Eq, Debug)]
|
|
pub enum PagestreamFeMessage {
|
|
Exists(PagestreamExistsRequest),
|
|
Nblocks(PagestreamNblocksRequest),
|
|
GetPage(PagestreamGetPageRequest),
|
|
DbSize(PagestreamDbSizeRequest),
|
|
}
|
|
|
|
// Wrapped in libpq CopyData
|
|
pub enum PagestreamBeMessage {
|
|
Exists(PagestreamExistsResponse),
|
|
Nblocks(PagestreamNblocksResponse),
|
|
GetPage(PagestreamGetPageResponse),
|
|
Error(PagestreamErrorResponse),
|
|
DbSize(PagestreamDbSizeResponse),
|
|
}
|
|
|
|
#[derive(Debug, PartialEq, Eq)]
|
|
pub struct PagestreamExistsRequest {
|
|
pub latest: bool,
|
|
pub lsn: Lsn,
|
|
pub rel: RelTag,
|
|
}
|
|
|
|
#[derive(Debug, PartialEq, Eq)]
|
|
pub struct PagestreamNblocksRequest {
|
|
pub latest: bool,
|
|
pub lsn: Lsn,
|
|
pub rel: RelTag,
|
|
}
|
|
|
|
#[derive(Debug, PartialEq, Eq)]
|
|
pub struct PagestreamGetPageRequest {
|
|
pub latest: bool,
|
|
pub lsn: Lsn,
|
|
pub rel: RelTag,
|
|
pub blkno: u32,
|
|
}
|
|
|
|
#[derive(Debug, PartialEq, Eq)]
|
|
pub struct PagestreamDbSizeRequest {
|
|
pub latest: bool,
|
|
pub lsn: Lsn,
|
|
pub dbnode: u32,
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
pub struct PagestreamExistsResponse {
|
|
pub exists: bool,
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
pub struct PagestreamNblocksResponse {
|
|
pub n_blocks: u32,
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
pub struct PagestreamGetPageResponse {
|
|
pub page: Bytes,
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
pub struct PagestreamErrorResponse {
|
|
pub message: String,
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
pub struct PagestreamDbSizeResponse {
|
|
pub db_size: i64,
|
|
}
|
|
|
|
impl PagestreamFeMessage {
|
|
pub fn serialize(&self) -> Bytes {
|
|
let mut bytes = BytesMut::new();
|
|
|
|
match self {
|
|
Self::Exists(req) => {
|
|
bytes.put_u8(0);
|
|
bytes.put_u8(u8::from(req.latest));
|
|
bytes.put_u64(req.lsn.0);
|
|
bytes.put_u32(req.rel.spcnode);
|
|
bytes.put_u32(req.rel.dbnode);
|
|
bytes.put_u32(req.rel.relnode);
|
|
bytes.put_u8(req.rel.forknum);
|
|
}
|
|
|
|
Self::Nblocks(req) => {
|
|
bytes.put_u8(1);
|
|
bytes.put_u8(u8::from(req.latest));
|
|
bytes.put_u64(req.lsn.0);
|
|
bytes.put_u32(req.rel.spcnode);
|
|
bytes.put_u32(req.rel.dbnode);
|
|
bytes.put_u32(req.rel.relnode);
|
|
bytes.put_u8(req.rel.forknum);
|
|
}
|
|
|
|
Self::GetPage(req) => {
|
|
bytes.put_u8(2);
|
|
bytes.put_u8(u8::from(req.latest));
|
|
bytes.put_u64(req.lsn.0);
|
|
bytes.put_u32(req.rel.spcnode);
|
|
bytes.put_u32(req.rel.dbnode);
|
|
bytes.put_u32(req.rel.relnode);
|
|
bytes.put_u8(req.rel.forknum);
|
|
bytes.put_u32(req.blkno);
|
|
}
|
|
|
|
Self::DbSize(req) => {
|
|
bytes.put_u8(3);
|
|
bytes.put_u8(u8::from(req.latest));
|
|
bytes.put_u64(req.lsn.0);
|
|
bytes.put_u32(req.dbnode);
|
|
}
|
|
}
|
|
|
|
bytes.into()
|
|
}
|
|
|
|
pub fn parse<R: std::io::Read>(body: &mut R) -> anyhow::Result<PagestreamFeMessage> {
|
|
// TODO these gets can fail
|
|
|
|
// these correspond to the NeonMessageTag enum in pagestore_client.h
|
|
//
|
|
// TODO: consider using protobuf or serde bincode for less error prone
|
|
// serialization.
|
|
let msg_tag = body.read_u8()?;
|
|
match msg_tag {
|
|
0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
|
|
latest: body.read_u8()? != 0,
|
|
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
|
|
rel: RelTag {
|
|
spcnode: body.read_u32::<BigEndian>()?,
|
|
dbnode: body.read_u32::<BigEndian>()?,
|
|
relnode: body.read_u32::<BigEndian>()?,
|
|
forknum: body.read_u8()?,
|
|
},
|
|
})),
|
|
1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
|
|
latest: body.read_u8()? != 0,
|
|
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
|
|
rel: RelTag {
|
|
spcnode: body.read_u32::<BigEndian>()?,
|
|
dbnode: body.read_u32::<BigEndian>()?,
|
|
relnode: body.read_u32::<BigEndian>()?,
|
|
forknum: body.read_u8()?,
|
|
},
|
|
})),
|
|
2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
|
|
latest: body.read_u8()? != 0,
|
|
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
|
|
rel: RelTag {
|
|
spcnode: body.read_u32::<BigEndian>()?,
|
|
dbnode: body.read_u32::<BigEndian>()?,
|
|
relnode: body.read_u32::<BigEndian>()?,
|
|
forknum: body.read_u8()?,
|
|
},
|
|
blkno: body.read_u32::<BigEndian>()?,
|
|
})),
|
|
3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
|
|
latest: body.read_u8()? != 0,
|
|
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
|
|
dbnode: body.read_u32::<BigEndian>()?,
|
|
})),
|
|
_ => bail!("unknown smgr message tag: {:?}", msg_tag),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl PagestreamBeMessage {
|
|
pub fn serialize(&self) -> Bytes {
|
|
let mut bytes = BytesMut::new();
|
|
|
|
match self {
|
|
Self::Exists(resp) => {
|
|
bytes.put_u8(100); /* tag from pagestore_client.h */
|
|
bytes.put_u8(resp.exists as u8);
|
|
}
|
|
|
|
Self::Nblocks(resp) => {
|
|
bytes.put_u8(101); /* tag from pagestore_client.h */
|
|
bytes.put_u32(resp.n_blocks);
|
|
}
|
|
|
|
Self::GetPage(resp) => {
|
|
bytes.put_u8(102); /* tag from pagestore_client.h */
|
|
bytes.put(&resp.page[..]);
|
|
}
|
|
|
|
Self::Error(resp) => {
|
|
bytes.put_u8(103); /* tag from pagestore_client.h */
|
|
bytes.put(resp.message.as_bytes());
|
|
bytes.put_u8(0); // null terminator
|
|
}
|
|
Self::DbSize(resp) => {
|
|
bytes.put_u8(104); /* tag from pagestore_client.h */
|
|
bytes.put_i64(resp.db_size);
|
|
}
|
|
}
|
|
|
|
bytes.into()
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use bytes::Buf;
|
|
use serde_json::json;
|
|
|
|
use super::*;
|
|
|
|
#[test]
|
|
fn test_pagestream() {
|
|
// Test serialization/deserialization of PagestreamFeMessage
|
|
let messages = vec![
|
|
PagestreamFeMessage::Exists(PagestreamExistsRequest {
|
|
latest: true,
|
|
lsn: Lsn(4),
|
|
rel: RelTag {
|
|
forknum: 1,
|
|
spcnode: 2,
|
|
dbnode: 3,
|
|
relnode: 4,
|
|
},
|
|
}),
|
|
PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
|
|
latest: false,
|
|
lsn: Lsn(4),
|
|
rel: RelTag {
|
|
forknum: 1,
|
|
spcnode: 2,
|
|
dbnode: 3,
|
|
relnode: 4,
|
|
},
|
|
}),
|
|
PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
|
|
latest: true,
|
|
lsn: Lsn(4),
|
|
rel: RelTag {
|
|
forknum: 1,
|
|
spcnode: 2,
|
|
dbnode: 3,
|
|
relnode: 4,
|
|
},
|
|
blkno: 7,
|
|
}),
|
|
PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
|
|
latest: true,
|
|
lsn: Lsn(4),
|
|
dbnode: 7,
|
|
}),
|
|
];
|
|
for msg in messages {
|
|
let bytes = msg.serialize();
|
|
let reconstructed = PagestreamFeMessage::parse(&mut bytes.reader()).unwrap();
|
|
assert!(msg == reconstructed);
|
|
}
|
|
}
|
|
|
|
#[test]
|
|
fn test_tenantinfo_serde() {
|
|
// Test serialization/deserialization of TenantInfo
|
|
let original_active = TenantInfo {
|
|
id: TenantId::generate(),
|
|
state: TenantState::Active,
|
|
current_physical_size: Some(42),
|
|
attachment_status: TenantAttachmentStatus::Attached,
|
|
};
|
|
let expected_active = json!({
|
|
"id": original_active.id.to_string(),
|
|
"state": {
|
|
"slug": "Active",
|
|
},
|
|
"current_physical_size": 42,
|
|
"attachment_status": "attached",
|
|
});
|
|
|
|
let original_broken = TenantInfo {
|
|
id: TenantId::generate(),
|
|
state: TenantState::Broken {
|
|
reason: "reason".into(),
|
|
backtrace: "backtrace info".into(),
|
|
},
|
|
current_physical_size: Some(42),
|
|
attachment_status: TenantAttachmentStatus::Attached,
|
|
};
|
|
let expected_broken = json!({
|
|
"id": original_broken.id.to_string(),
|
|
"state": {
|
|
"slug": "Broken",
|
|
"data": {
|
|
"backtrace": "backtrace info",
|
|
"reason": "reason",
|
|
}
|
|
},
|
|
"current_physical_size": 42,
|
|
"attachment_status": "attached",
|
|
});
|
|
|
|
assert_eq!(
|
|
serde_json::to_value(&original_active).unwrap(),
|
|
expected_active
|
|
);
|
|
|
|
assert_eq!(
|
|
serde_json::to_value(&original_broken).unwrap(),
|
|
expected_broken
|
|
);
|
|
assert!(format!("{:?}", &original_broken.state).contains("reason"));
|
|
assert!(format!("{:?}", &original_broken.state).contains("backtrace info"));
|
|
}
|
|
}
|