From b03143dfc8bf032cdea0d247e74b6bc0338221f9 Mon Sep 17 00:00:00 2001 From: Arthur Petukhovsky Date: Fri, 28 Apr 2023 13:55:07 +0300 Subject: [PATCH] Use serde_as DisplayFromStr everywhere (#4103) We used `display_serialize` previously, but it works only for Serialize. `DisplayFromStr` does the same, but also works for Deserialize. --- libs/utils/src/http/json.rs | 13 +----- libs/utils/src/id.rs | 20 ++++++++ libs/utils/src/pageserver_feedback.rs | 52 ++++++++++++++------- safekeeper/src/debug_dump.rs | 22 +++++---- safekeeper/src/http/routes.rs | 66 ++++++++++++++------------- safekeeper/src/safekeeper.rs | 2 +- safekeeper/src/send_wal.rs | 11 +++-- 7 files changed, 109 insertions(+), 77 deletions(-) diff --git a/libs/utils/src/http/json.rs b/libs/utils/src/http/json.rs index 40e61e3d0c..8981fdd1dd 100644 --- a/libs/utils/src/http/json.rs +++ b/libs/utils/src/http/json.rs @@ -1,9 +1,7 @@ -use std::fmt::Display; - use anyhow::Context; use bytes::Buf; use hyper::{header, Body, Request, Response, StatusCode}; -use serde::{Deserialize, Serialize, Serializer}; +use serde::{Deserialize, Serialize}; use super::error::ApiError; @@ -33,12 +31,3 @@ pub fn json_response( .map_err(|e| ApiError::InternalServerError(e.into()))?; Ok(response) } - -/// Serialize through Display trait. -pub fn display_serialize(z: &F, s: S) -> Result -where - S: Serializer, - F: Display, -{ - s.serialize_str(&format!("{}", z)) -} diff --git a/libs/utils/src/id.rs b/libs/utils/src/id.rs index b27c5cda35..20b601f68d 100644 --- a/libs/utils/src/id.rs +++ b/libs/utils/src/id.rs @@ -265,6 +265,26 @@ impl fmt::Display for TenantTimelineId { } } +impl FromStr for TenantTimelineId { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + let mut parts = s.split('/'); + let tenant_id = parts + .next() + .ok_or_else(|| anyhow::anyhow!("TenantTimelineId must contain tenant_id"))? + .parse()?; + let timeline_id = parts + .next() + .ok_or_else(|| anyhow::anyhow!("TenantTimelineId must contain timeline_id"))? + .parse()?; + if parts.next().is_some() { + anyhow::bail!("TenantTimelineId must contain only tenant_id and timeline_id"); + } + Ok(TenantTimelineId::new(tenant_id, timeline_id)) + } +} + // Unique ID of a storage node (safekeeper or pageserver). Supposed to be issued // by the console. #[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Hash, Debug, Serialize, Deserialize)] diff --git a/libs/utils/src/pageserver_feedback.rs b/libs/utils/src/pageserver_feedback.rs index ae2150b6df..a3b53201d3 100644 --- a/libs/utils/src/pageserver_feedback.rs +++ b/libs/utils/src/pageserver_feedback.rs @@ -1,21 +1,12 @@ use std::time::{Duration, SystemTime}; use bytes::{Buf, BufMut, Bytes, BytesMut}; -use chrono::{DateTime, Utc}; use pq_proto::{read_cstr, PG_EPOCH}; -use serde::{Serialize, Serializer}; +use serde::{Deserialize, Serialize}; +use serde_with::{serde_as, DisplayFromStr}; use tracing::{trace, warn}; -use crate::{http::json::display_serialize, lsn::Lsn}; - -// serialize SystemTime as ISO string in UTC. -fn serialize_system_time_iso(ts: &SystemTime, s: S) -> Result -where - S: Serializer, -{ - let chrono_dt: DateTime = (*ts).into(); - s.serialize_str(&chrono_dt.to_rfc3339()) -} +use crate::lsn::Lsn; /// Feedback pageserver sends to safekeeper and safekeeper resends to compute. /// Serialized in custom flexible key/value format. In replication protocol, it @@ -24,22 +15,24 @@ where /// /// serde Serialize is used only for human readable dump to json (e.g. in /// safekeepers debug_dump). -#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] +#[serde_as] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] pub struct PageserverFeedback { /// Last known size of the timeline. Used to enforce timeline size limit. pub current_timeline_size: u64, /// LSN last received and ingested by the pageserver. Controls backpressure. - #[serde(serialize_with = "display_serialize")] + #[serde_as(as = "DisplayFromStr")] pub last_received_lsn: Lsn, /// LSN up to which data is persisted by the pageserver to its local disc. /// Controls backpressure. - #[serde(serialize_with = "display_serialize")] + #[serde_as(as = "DisplayFromStr")] pub disk_consistent_lsn: Lsn, /// LSN up to which data is persisted by the pageserver on s3; safekeepers /// consider WAL before it can be removed. - #[serde(serialize_with = "display_serialize")] + #[serde_as(as = "DisplayFromStr")] pub remote_consistent_lsn: Lsn, - #[serde(serialize_with = "serialize_system_time_iso")] + // Serialize with RFC3339 format. + #[serde(with = "serde_systemtime")] pub replytime: SystemTime, } @@ -150,6 +143,31 @@ impl PageserverFeedback { } } +mod serde_systemtime { + use std::time::SystemTime; + + use chrono::{DateTime, Utc}; + use serde::{Deserialize, Deserializer, Serializer}; + + pub fn serialize(ts: &SystemTime, serializer: S) -> Result + where + S: Serializer, + { + let chrono_dt: DateTime = (*ts).into(); + serializer.serialize_str(&chrono_dt.to_rfc3339()) + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let time: String = Deserialize::deserialize(deserializer)?; + Ok(DateTime::parse_from_rfc3339(&time) + .map_err(serde::de::Error::custom)? + .into()) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/safekeeper/src/debug_dump.rs b/safekeeper/src/debug_dump.rs index 954fbfc438..f711c4429d 100644 --- a/safekeeper/src/debug_dump.rs +++ b/safekeeper/src/debug_dump.rs @@ -9,9 +9,10 @@ use std::path::PathBuf; use anyhow::Result; use chrono::{DateTime, Utc}; use postgres_ffi::XLogSegNo; +use serde::Deserialize; use serde::Serialize; -use utils::http::json::display_serialize; +use serde_with::{serde_as, DisplayFromStr}; use utils::id::NodeId; use utils::id::TenantTimelineId; use utils::id::{TenantId, TimelineId}; @@ -26,7 +27,7 @@ use crate::send_wal::WalSenderState; use crate::GlobalTimelines; /// Various filters that influence the resulting JSON output. -#[derive(Debug, Serialize)] +#[derive(Debug, Serialize, Deserialize)] pub struct Args { /// Dump all available safekeeper state. False by default. pub dump_all: bool, @@ -51,7 +52,7 @@ pub struct Args { } /// Response for debug dump request. -#[derive(Debug, Serialize)] +#[derive(Debug, Serialize, Deserialize)] pub struct Response { pub start_time: DateTime, pub finish_time: DateTime, @@ -61,7 +62,7 @@ pub struct Response { } /// Safekeeper configuration. -#[derive(Debug, Serialize)] +#[derive(Debug, Serialize, Deserialize)] pub struct Config { pub id: NodeId, pub workdir: PathBuf, @@ -72,18 +73,19 @@ pub struct Config { pub wal_backup_enabled: bool, } -#[derive(Debug, Serialize)] +#[serde_as] +#[derive(Debug, Serialize, Deserialize)] pub struct Timeline { - #[serde(serialize_with = "display_serialize")] + #[serde_as(as = "DisplayFromStr")] pub tenant_id: TenantId, - #[serde(serialize_with = "display_serialize")] + #[serde_as(as = "DisplayFromStr")] pub timeline_id: TimelineId, pub control_file: Option, pub memory: Option, pub disk_content: Option, } -#[derive(Debug, Serialize)] +#[derive(Debug, Serialize, Deserialize)] pub struct Memory { pub is_cancelled: bool, pub peers_info_len: usize, @@ -102,12 +104,12 @@ pub struct Memory { pub file_open: bool, } -#[derive(Debug, Serialize)] +#[derive(Debug, Serialize, Deserialize)] pub struct DiskContent { pub files: Vec, } -#[derive(Debug, Serialize)] +#[derive(Debug, Serialize, Deserialize)] pub struct FileInfo { pub name: String, pub size: u64, diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index ef691c5fe6..eeb08d2733 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -3,7 +3,8 @@ use hyper::{Body, Request, Response, StatusCode, Uri}; use once_cell::sync::Lazy; use postgres_ffi::WAL_SEGMENT_SIZE; use safekeeper_api::models::SkTimelineInfo; -use serde::Serialize; +use serde::{Deserialize, Serialize}; +use serde_with::{serde_as, DisplayFromStr}; use std::collections::{HashMap, HashSet}; use std::fmt; use std::str::FromStr; @@ -11,7 +12,6 @@ use std::sync::Arc; use storage_broker::proto::SafekeeperTimelineInfo; use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId; use tokio::task::JoinError; -use utils::http::json::display_serialize; use crate::debug_dump; use crate::safekeeper::ServerInfo; @@ -57,44 +57,46 @@ fn get_conf(request: &Request) -> &SafeKeeperConf { /// Same as TermSwitchEntry, but serializes LSN using display serializer /// in Postgres format, i.e. 0/FFFFFFFF. Used only for the API response. -#[derive(Debug, Serialize)] -struct TermSwitchApiEntry { +#[serde_as] +#[derive(Debug, Serialize, Deserialize)] +pub struct TermSwitchApiEntry { pub term: Term, - #[serde(serialize_with = "display_serialize")] + #[serde_as(as = "DisplayFromStr")] pub lsn: Lsn, } /// Augment AcceptorState with epoch for convenience -#[derive(Debug, Serialize)] -struct AcceptorStateStatus { - term: Term, - epoch: Term, - term_history: Vec, +#[derive(Debug, Serialize, Deserialize)] +pub struct AcceptorStateStatus { + pub term: Term, + pub epoch: Term, + pub term_history: Vec, } /// Info about timeline on safekeeper ready for reporting. -#[derive(Debug, Serialize)] -struct TimelineStatus { - #[serde(serialize_with = "display_serialize")] - tenant_id: TenantId, - #[serde(serialize_with = "display_serialize")] - timeline_id: TimelineId, - acceptor_state: AcceptorStateStatus, - pg_info: ServerInfo, - #[serde(serialize_with = "display_serialize")] - flush_lsn: Lsn, - #[serde(serialize_with = "display_serialize")] - timeline_start_lsn: Lsn, - #[serde(serialize_with = "display_serialize")] - local_start_lsn: Lsn, - #[serde(serialize_with = "display_serialize")] - commit_lsn: Lsn, - #[serde(serialize_with = "display_serialize")] - backup_lsn: Lsn, - #[serde(serialize_with = "display_serialize")] - peer_horizon_lsn: Lsn, - #[serde(serialize_with = "display_serialize")] - remote_consistent_lsn: Lsn, +#[serde_as] +#[derive(Debug, Serialize, Deserialize)] +pub struct TimelineStatus { + #[serde_as(as = "DisplayFromStr")] + pub tenant_id: TenantId, + #[serde_as(as = "DisplayFromStr")] + pub timeline_id: TimelineId, + pub acceptor_state: AcceptorStateStatus, + pub pg_info: ServerInfo, + #[serde_as(as = "DisplayFromStr")] + pub flush_lsn: Lsn, + #[serde_as(as = "DisplayFromStr")] + pub timeline_start_lsn: Lsn, + #[serde_as(as = "DisplayFromStr")] + pub local_start_lsn: Lsn, + #[serde_as(as = "DisplayFromStr")] + pub commit_lsn: Lsn, + #[serde_as(as = "DisplayFromStr")] + pub backup_lsn: Lsn, + #[serde_as(as = "DisplayFromStr")] + pub peer_horizon_lsn: Lsn, + #[serde_as(as = "DisplayFromStr")] + pub remote_consistent_lsn: Lsn, } fn check_permission(request: &Request, tenant_id: Option) -> Result<(), ApiError> { diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index f9ba4c763b..33da0c8e5a 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -206,7 +206,7 @@ pub struct SafeKeeperState { pub peers: PersistedPeers, } -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] // In memory safekeeper state. Fields mirror ones in `SafeKeeperState`; values // are not flushed yet. pub struct SafekeeperMemState { diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index ae4def3906..6b303eb0fe 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -15,8 +15,8 @@ use postgres_ffi::get_current_timestamp; use postgres_ffi::{TimestampTz, MAX_SEND_SIZE}; use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody}; use serde::{Deserialize, Serialize}; +use serde_with::{serde_as, DisplayFromStr}; use tokio::io::{AsyncRead, AsyncWrite}; -use utils::http::json::display_serialize; use utils::id::TenantTimelineId; use utils::lsn::AtomicLsn; use utils::pageserver_feedback::PageserverFeedback; @@ -81,7 +81,7 @@ impl StandbyReply { } } -#[derive(Debug, Clone, Copy, Serialize)] +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] pub struct StandbyFeedback { reply: StandbyReply, hs_feedback: HotStandbyFeedback, @@ -312,9 +312,10 @@ impl WalSendersShared { } // Serialized is used only for pretty printing in json. -#[derive(Debug, Clone, Serialize)] +#[serde_as] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct WalSenderState { - #[serde(serialize_with = "display_serialize")] + #[serde_as(as = "DisplayFromStr")] ttid: TenantTimelineId, addr: SocketAddr, conn_id: ConnectionId, @@ -325,7 +326,7 @@ pub struct WalSenderState { // Receiver is either pageserver or regular standby, which have different // feedbacks. -#[derive(Debug, Clone, Copy, Serialize)] +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] enum ReplicationFeedback { Pageserver(PageserverFeedback), Standby(StandbyFeedback),