use std::time::{Duration, SystemTime};

use bytes::{Buf, BufMut, Bytes, BytesMut};
use pq_proto::{read_cstr, PG_EPOCH};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use tracing::{trace, warn};

use crate::lsn::Lsn;

/// Feedback pageserver sends to safekeeper and safekeeper resends to compute.
/// Serialized in custom flexible key/value format. In replication protocol, it
/// is marked with NEON_STATUS_UPDATE_TAG_BYTE to differentiate from postgres
/// Standby status update / Hot standby feedback messages.
///
/// Note that several fields travel under legacy key names on the wire
/// (e.g. `last_received_lsn` is sent as `ps_writelsn`); see `serialize`/`parse`.
///
/// serde Serialize is used only for human readable dump to json (e.g. in
/// safekeepers debug_dump).
#[serde_as]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct PageserverFeedback {
    /// Last known size of the timeline. Used to enforce timeline size limit.
    /// Wire key: `current_timeline_size`.
    pub current_timeline_size: u64,
    /// LSN last received and ingested by the pageserver. Controls backpressure.
    /// Wire key: `ps_writelsn`.
    #[serde_as(as = "DisplayFromStr")]
    pub last_received_lsn: Lsn,
    /// LSN up to which data is persisted by the pageserver to its local disk.
    /// Controls backpressure. Wire key: `ps_flushlsn`.
    #[serde_as(as = "DisplayFromStr")]
    pub disk_consistent_lsn: Lsn,
    /// LSN up to which data is persisted by the pageserver on s3; safekeepers
    /// take it into account when deciding which WAL before it can be removed.
    /// Wire key: `ps_applylsn`.
    #[serde_as(as = "DisplayFromStr")]
    pub remote_consistent_lsn: Lsn,
    /// Reply timestamp. On the wire (`ps_replytime`) it is signed microseconds
    /// relative to `PG_EPOCH`; in the JSON dump it is an RFC3339 string.
    #[serde(with = "serde_systemtime")]
    pub replytime: SystemTime,
}

// NOTE: Do not forget to increment this number when adding new fields to PageserverFeedback.
// Do not remove previously available fields because this might be backwards incompatible.
pub const PAGESERVER_FEEDBACK_FIELDS_NUMBER: u8 = 5; impl PageserverFeedback { pub fn empty() -> PageserverFeedback { PageserverFeedback { current_timeline_size: 0, last_received_lsn: Lsn::INVALID, remote_consistent_lsn: Lsn::INVALID, disk_consistent_lsn: Lsn::INVALID, replytime: *PG_EPOCH, } } // Serialize PageserverFeedback using custom format // to support protocol extensibility. // // Following layout is used: // char - number of key-value pairs that follow. // // key-value pairs: // null-terminated string - key, // uint32 - value length in bytes // value itself // // TODO: change serialized fields names once all computes migrate to rename. pub fn serialize(&self, buf: &mut BytesMut) { buf.put_u8(PAGESERVER_FEEDBACK_FIELDS_NUMBER); // # of keys buf.put_slice(b"current_timeline_size\0"); buf.put_i32(8); buf.put_u64(self.current_timeline_size); buf.put_slice(b"ps_writelsn\0"); buf.put_i32(8); buf.put_u64(self.last_received_lsn.0); buf.put_slice(b"ps_flushlsn\0"); buf.put_i32(8); buf.put_u64(self.disk_consistent_lsn.0); buf.put_slice(b"ps_applylsn\0"); buf.put_i32(8); buf.put_u64(self.remote_consistent_lsn.0); let timestamp = self .replytime .duration_since(*PG_EPOCH) .expect("failed to serialize pg_replytime earlier than PG_EPOCH") .as_micros() as i64; buf.put_slice(b"ps_replytime\0"); buf.put_i32(8); buf.put_i64(timestamp); } // Deserialize PageserverFeedback message // TODO: change serialized fields names once all computes migrate to rename. 
pub fn parse(mut buf: Bytes) -> PageserverFeedback { let mut rf = PageserverFeedback::empty(); let nfields = buf.get_u8(); for _ in 0..nfields { let key = read_cstr(&mut buf).unwrap(); match key.as_ref() { b"current_timeline_size" => { let len = buf.get_i32(); assert_eq!(len, 8); rf.current_timeline_size = buf.get_u64(); } b"ps_writelsn" => { let len = buf.get_i32(); assert_eq!(len, 8); rf.last_received_lsn = Lsn(buf.get_u64()); } b"ps_flushlsn" => { let len = buf.get_i32(); assert_eq!(len, 8); rf.disk_consistent_lsn = Lsn(buf.get_u64()); } b"ps_applylsn" => { let len = buf.get_i32(); assert_eq!(len, 8); rf.remote_consistent_lsn = Lsn(buf.get_u64()); } b"ps_replytime" => { let len = buf.get_i32(); assert_eq!(len, 8); let raw_time = buf.get_i64(); if raw_time > 0 { rf.replytime = *PG_EPOCH + Duration::from_micros(raw_time as u64); } else { rf.replytime = *PG_EPOCH - Duration::from_micros(-raw_time as u64); } } _ => { let len = buf.get_i32(); warn!( "PageserverFeedback parse. unknown key {} of len {len}. Skip it.", String::from_utf8_lossy(key.as_ref()) ); buf.advance(len as usize); } } } trace!("PageserverFeedback parsed is {:?}", rf); rf } } mod serde_systemtime { use std::time::SystemTime; use chrono::{DateTime, Utc}; use serde::{Deserialize, Deserializer, Serializer}; pub fn serialize(ts: &SystemTime, serializer: S) -> Result where S: Serializer, { let chrono_dt: DateTime = (*ts).into(); serializer.serialize_str(&chrono_dt.to_rfc3339()) } pub fn deserialize<'de, D>(deserializer: D) -> Result where D: Deserializer<'de>, { let time: String = Deserialize::deserialize(deserializer)?; Ok(DateTime::parse_from_rfc3339(&time) .map_err(serde::de::Error::custom)? 
.into()) } } #[cfg(test)] mod tests { use super::*; #[test] fn test_replication_feedback_serialization() { let mut rf = PageserverFeedback::empty(); // Fill rf with some values rf.current_timeline_size = 12345678; // Set rounded time to be able to compare it with deserialized value, // because it is rounded up to microseconds during serialization. rf.replytime = *PG_EPOCH + Duration::from_secs(100_000_000); let mut data = BytesMut::new(); rf.serialize(&mut data); let rf_parsed = PageserverFeedback::parse(data.freeze()); assert_eq!(rf, rf_parsed); } #[test] fn test_replication_feedback_unknown_key() { let mut rf = PageserverFeedback::empty(); // Fill rf with some values rf.current_timeline_size = 12345678; // Set rounded time to be able to compare it with deserialized value, // because it is rounded up to microseconds during serialization. rf.replytime = *PG_EPOCH + Duration::from_secs(100_000_000); let mut data = BytesMut::new(); rf.serialize(&mut data); // Add an extra field to the buffer and adjust number of keys if let Some(first) = data.first_mut() { *first = PAGESERVER_FEEDBACK_FIELDS_NUMBER + 1; } data.put_slice(b"new_field_one\0"); data.put_i32(8); data.put_u64(42); // Parse serialized data and check that new field is not parsed let rf_parsed = PageserverFeedback::parse(data.freeze()); assert_eq!(rf, rf_parsed); } }