mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-17 21:20:37 +00:00
Use serde_as DisplayFromStr everywhere (#4103)
We used `display_serialize` previously, but it works only for Serialize. `DisplayFromStr` does the same, but also works for Deserialize.
This commit is contained in:
committed by
GitHub
parent
fdacfaabfd
commit
b03143dfc8
@@ -1,9 +1,7 @@
|
||||
use std::fmt::Display;
|
||||
|
||||
use anyhow::Context;
|
||||
use bytes::Buf;
|
||||
use hyper::{header, Body, Request, Response, StatusCode};
|
||||
use serde::{Deserialize, Serialize, Serializer};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use super::error::ApiError;
|
||||
|
||||
@@ -33,12 +31,3 @@ pub fn json_response<T: Serialize>(
|
||||
.map_err(|e| ApiError::InternalServerError(e.into()))?;
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
/// Serialize through Display trait.
|
||||
pub fn display_serialize<S, F>(z: &F, s: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
F: Display,
|
||||
{
|
||||
s.serialize_str(&format!("{}", z))
|
||||
}
|
||||
|
||||
@@ -265,6 +265,26 @@ impl fmt::Display for TenantTimelineId {
|
||||
}
|
||||
}
|
||||
|
||||
impl FromStr for TenantTimelineId {
|
||||
type Err = anyhow::Error;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
let mut parts = s.split('/');
|
||||
let tenant_id = parts
|
||||
.next()
|
||||
.ok_or_else(|| anyhow::anyhow!("TenantTimelineId must contain tenant_id"))?
|
||||
.parse()?;
|
||||
let timeline_id = parts
|
||||
.next()
|
||||
.ok_or_else(|| anyhow::anyhow!("TenantTimelineId must contain timeline_id"))?
|
||||
.parse()?;
|
||||
if parts.next().is_some() {
|
||||
anyhow::bail!("TenantTimelineId must contain only tenant_id and timeline_id");
|
||||
}
|
||||
Ok(TenantTimelineId::new(tenant_id, timeline_id))
|
||||
}
|
||||
}
|
||||
|
||||
// Unique ID of a storage node (safekeeper or pageserver). Supposed to be issued
|
||||
// by the console.
|
||||
#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Hash, Debug, Serialize, Deserialize)]
|
||||
|
||||
@@ -1,21 +1,12 @@
|
||||
use std::time::{Duration, SystemTime};
|
||||
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
use chrono::{DateTime, Utc};
|
||||
use pq_proto::{read_cstr, PG_EPOCH};
|
||||
use serde::{Serialize, Serializer};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_with::{serde_as, DisplayFromStr};
|
||||
use tracing::{trace, warn};
|
||||
|
||||
use crate::{http::json::display_serialize, lsn::Lsn};
|
||||
|
||||
// serialize SystemTime as ISO string in UTC.
|
||||
fn serialize_system_time_iso<S>(ts: &SystemTime, s: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
let chrono_dt: DateTime<Utc> = (*ts).into();
|
||||
s.serialize_str(&chrono_dt.to_rfc3339())
|
||||
}
|
||||
use crate::lsn::Lsn;
|
||||
|
||||
/// Feedback pageserver sends to safekeeper and safekeeper resends to compute.
|
||||
/// Serialized in custom flexible key/value format. In replication protocol, it
|
||||
@@ -24,22 +15,24 @@ where
|
||||
///
|
||||
/// serde Serialize is used only for human readable dump to json (e.g. in
|
||||
/// safekeepers debug_dump).
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
|
||||
#[serde_as]
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct PageserverFeedback {
|
||||
/// Last known size of the timeline. Used to enforce timeline size limit.
|
||||
pub current_timeline_size: u64,
|
||||
/// LSN last received and ingested by the pageserver. Controls backpressure.
|
||||
#[serde(serialize_with = "display_serialize")]
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub last_received_lsn: Lsn,
|
||||
/// LSN up to which data is persisted by the pageserver to its local disc.
|
||||
/// Controls backpressure.
|
||||
#[serde(serialize_with = "display_serialize")]
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub disk_consistent_lsn: Lsn,
|
||||
/// LSN up to which data is persisted by the pageserver on s3; safekeepers
|
||||
/// consider WAL before it can be removed.
|
||||
#[serde(serialize_with = "display_serialize")]
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub remote_consistent_lsn: Lsn,
|
||||
#[serde(serialize_with = "serialize_system_time_iso")]
|
||||
// Serialize with RFC3339 format.
|
||||
#[serde(with = "serde_systemtime")]
|
||||
pub replytime: SystemTime,
|
||||
}
|
||||
|
||||
@@ -150,6 +143,31 @@ impl PageserverFeedback {
|
||||
}
|
||||
}
|
||||
|
||||
mod serde_systemtime {
|
||||
use std::time::SystemTime;
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use serde::{Deserialize, Deserializer, Serializer};
|
||||
|
||||
pub fn serialize<S>(ts: &SystemTime, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
let chrono_dt: DateTime<Utc> = (*ts).into();
|
||||
serializer.serialize_str(&chrono_dt.to_rfc3339())
|
||||
}
|
||||
|
||||
pub fn deserialize<'de, D>(deserializer: D) -> Result<SystemTime, D::Error>
|
||||
where
|
||||
D: Deserializer<'de>,
|
||||
{
|
||||
let time: String = Deserialize::deserialize(deserializer)?;
|
||||
Ok(DateTime::parse_from_rfc3339(&time)
|
||||
.map_err(serde::de::Error::custom)?
|
||||
.into())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -9,9 +9,10 @@ use std::path::PathBuf;
|
||||
use anyhow::Result;
|
||||
use chrono::{DateTime, Utc};
|
||||
use postgres_ffi::XLogSegNo;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
|
||||
use utils::http::json::display_serialize;
|
||||
use serde_with::{serde_as, DisplayFromStr};
|
||||
use utils::id::NodeId;
|
||||
use utils::id::TenantTimelineId;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
@@ -26,7 +27,7 @@ use crate::send_wal::WalSenderState;
|
||||
use crate::GlobalTimelines;
|
||||
|
||||
/// Various filters that influence the resulting JSON output.
|
||||
#[derive(Debug, Serialize)]
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct Args {
|
||||
/// Dump all available safekeeper state. False by default.
|
||||
pub dump_all: bool,
|
||||
@@ -51,7 +52,7 @@ pub struct Args {
|
||||
}
|
||||
|
||||
/// Response for debug dump request.
|
||||
#[derive(Debug, Serialize)]
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct Response {
|
||||
pub start_time: DateTime<Utc>,
|
||||
pub finish_time: DateTime<Utc>,
|
||||
@@ -61,7 +62,7 @@ pub struct Response {
|
||||
}
|
||||
|
||||
/// Safekeeper configuration.
|
||||
#[derive(Debug, Serialize)]
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct Config {
|
||||
pub id: NodeId,
|
||||
pub workdir: PathBuf,
|
||||
@@ -72,18 +73,19 @@ pub struct Config {
|
||||
pub wal_backup_enabled: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
#[serde_as]
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct Timeline {
|
||||
#[serde(serialize_with = "display_serialize")]
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub tenant_id: TenantId,
|
||||
#[serde(serialize_with = "display_serialize")]
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub timeline_id: TimelineId,
|
||||
pub control_file: Option<SafeKeeperState>,
|
||||
pub memory: Option<Memory>,
|
||||
pub disk_content: Option<DiskContent>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct Memory {
|
||||
pub is_cancelled: bool,
|
||||
pub peers_info_len: usize,
|
||||
@@ -102,12 +104,12 @@ pub struct Memory {
|
||||
pub file_open: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct DiskContent {
|
||||
pub files: Vec<FileInfo>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct FileInfo {
|
||||
pub name: String,
|
||||
pub size: u64,
|
||||
|
||||
@@ -3,7 +3,8 @@ use hyper::{Body, Request, Response, StatusCode, Uri};
|
||||
use once_cell::sync::Lazy;
|
||||
use postgres_ffi::WAL_SEGMENT_SIZE;
|
||||
use safekeeper_api::models::SkTimelineInfo;
|
||||
use serde::Serialize;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_with::{serde_as, DisplayFromStr};
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::fmt;
|
||||
use std::str::FromStr;
|
||||
@@ -11,7 +12,6 @@ use std::sync::Arc;
|
||||
use storage_broker::proto::SafekeeperTimelineInfo;
|
||||
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
|
||||
use tokio::task::JoinError;
|
||||
use utils::http::json::display_serialize;
|
||||
|
||||
use crate::debug_dump;
|
||||
use crate::safekeeper::ServerInfo;
|
||||
@@ -57,44 +57,46 @@ fn get_conf(request: &Request<Body>) -> &SafeKeeperConf {
|
||||
|
||||
/// Same as TermSwitchEntry, but serializes LSN using display serializer
|
||||
/// in Postgres format, i.e. 0/FFFFFFFF. Used only for the API response.
|
||||
#[derive(Debug, Serialize)]
|
||||
struct TermSwitchApiEntry {
|
||||
#[serde_as]
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct TermSwitchApiEntry {
|
||||
pub term: Term,
|
||||
#[serde(serialize_with = "display_serialize")]
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub lsn: Lsn,
|
||||
}
|
||||
|
||||
/// Augment AcceptorState with epoch for convenience
|
||||
#[derive(Debug, Serialize)]
|
||||
struct AcceptorStateStatus {
|
||||
term: Term,
|
||||
epoch: Term,
|
||||
term_history: Vec<TermSwitchApiEntry>,
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct AcceptorStateStatus {
|
||||
pub term: Term,
|
||||
pub epoch: Term,
|
||||
pub term_history: Vec<TermSwitchApiEntry>,
|
||||
}
|
||||
|
||||
/// Info about timeline on safekeeper ready for reporting.
|
||||
#[derive(Debug, Serialize)]
|
||||
struct TimelineStatus {
|
||||
#[serde(serialize_with = "display_serialize")]
|
||||
tenant_id: TenantId,
|
||||
#[serde(serialize_with = "display_serialize")]
|
||||
timeline_id: TimelineId,
|
||||
acceptor_state: AcceptorStateStatus,
|
||||
pg_info: ServerInfo,
|
||||
#[serde(serialize_with = "display_serialize")]
|
||||
flush_lsn: Lsn,
|
||||
#[serde(serialize_with = "display_serialize")]
|
||||
timeline_start_lsn: Lsn,
|
||||
#[serde(serialize_with = "display_serialize")]
|
||||
local_start_lsn: Lsn,
|
||||
#[serde(serialize_with = "display_serialize")]
|
||||
commit_lsn: Lsn,
|
||||
#[serde(serialize_with = "display_serialize")]
|
||||
backup_lsn: Lsn,
|
||||
#[serde(serialize_with = "display_serialize")]
|
||||
peer_horizon_lsn: Lsn,
|
||||
#[serde(serialize_with = "display_serialize")]
|
||||
remote_consistent_lsn: Lsn,
|
||||
#[serde_as]
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct TimelineStatus {
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub tenant_id: TenantId,
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub timeline_id: TimelineId,
|
||||
pub acceptor_state: AcceptorStateStatus,
|
||||
pub pg_info: ServerInfo,
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub flush_lsn: Lsn,
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub timeline_start_lsn: Lsn,
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub local_start_lsn: Lsn,
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub commit_lsn: Lsn,
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub backup_lsn: Lsn,
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub peer_horizon_lsn: Lsn,
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub remote_consistent_lsn: Lsn,
|
||||
}
|
||||
|
||||
fn check_permission(request: &Request<Body>, tenant_id: Option<TenantId>) -> Result<(), ApiError> {
|
||||
|
||||
@@ -206,7 +206,7 @@ pub struct SafeKeeperState {
|
||||
pub peers: PersistedPeers,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
// In memory safekeeper state. Fields mirror ones in `SafeKeeperState`; values
|
||||
// are not flushed yet.
|
||||
pub struct SafekeeperMemState {
|
||||
|
||||
@@ -15,8 +15,8 @@ use postgres_ffi::get_current_timestamp;
|
||||
use postgres_ffi::{TimestampTz, MAX_SEND_SIZE};
|
||||
use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_with::{serde_as, DisplayFromStr};
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use utils::http::json::display_serialize;
|
||||
use utils::id::TenantTimelineId;
|
||||
use utils::lsn::AtomicLsn;
|
||||
use utils::pageserver_feedback::PageserverFeedback;
|
||||
@@ -81,7 +81,7 @@ impl StandbyReply {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, Serialize)]
|
||||
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
|
||||
pub struct StandbyFeedback {
|
||||
reply: StandbyReply,
|
||||
hs_feedback: HotStandbyFeedback,
|
||||
@@ -312,9 +312,10 @@ impl WalSendersShared {
|
||||
}
|
||||
|
||||
// Serialized is used only for pretty printing in json.
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
#[serde_as]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct WalSenderState {
|
||||
#[serde(serialize_with = "display_serialize")]
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
ttid: TenantTimelineId,
|
||||
addr: SocketAddr,
|
||||
conn_id: ConnectionId,
|
||||
@@ -325,7 +326,7 @@ pub struct WalSenderState {
|
||||
|
||||
// Receiver is either pageserver or regular standby, which have different
|
||||
// feedbacks.
|
||||
#[derive(Debug, Clone, Copy, Serialize)]
|
||||
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
|
||||
enum ReplicationFeedback {
|
||||
Pageserver(PageserverFeedback),
|
||||
Standby(StandbyFeedback),
|
||||
|
||||
Reference in New Issue
Block a user