Move PageserverFeedback to utils.

It allows to replace u64 with proper Lsn and pretty print PageserverFeedback
with serde(_json). Now walsenders on safekeepers queried with debug_dump look
like

"walsenders": [
  {
    "ttid": "fafe0cf39a99c608c872706149de9d2a/b4fb3be6f576935e7f0fcb84bdb909a1",
    "addr": "127.0.0.1:48774",
    "conn_id": 3,
    "appname": "pageserver",
    "feedback": {
      "Pageserver": {
	"current_timeline_size": 32096256,
	"last_received_lsn": "0/2415298",
	"disk_consistent_lsn": "0/1696628",
	"remote_consistent_lsn": "0/0",
	"replytime": "2023-04-12T13:54:53.958856+00:00"
      }
    }
  }
],
This commit is contained in:
Arseny Sher
2023-04-12 17:58:19 +04:00
committed by Arseny Sher
parent b2a3981ead
commit fdacfaabfd
12 changed files with 222 additions and 195 deletions

5
Cargo.lock generated
View File

@@ -2959,7 +2959,6 @@ dependencies = [
"pin-project-lite",
"postgres-protocol",
"rand",
"serde",
"thiserror",
"tokio",
"tracing",
@@ -4865,6 +4864,7 @@ dependencies = [
"bincode",
"byteorder",
"bytes",
"chrono",
"criterion",
"futures",
"heapless",
@@ -4876,6 +4876,7 @@ dependencies = [
"nix",
"once_cell",
"pin-project-lite",
"pq_proto",
"rand",
"regex",
"routerify",
@@ -5291,13 +5292,11 @@ name = "workspace_hack"
version = "0.1.0"
dependencies = [
"anyhow",
"byteorder",
"bytes",
"chrono",
"clap 4.2.2",
"clap_builder",
"crossbeam-utils",
"digest",
"either",
"fail",
"futures",

View File

@@ -10,7 +10,6 @@ byteorder.workspace = true
pin-project-lite.workspace = true
postgres-protocol.workspace = true
rand.workspace = true
serde.workspace = true
tokio.workspace = true
tracing.workspace = true
thiserror.workspace = true

View File

@@ -6,15 +6,10 @@ pub mod framed;
use byteorder::{BigEndian, ReadBytesExt};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use postgres_protocol::PG_EPOCH;
use serde::{Deserialize, Serialize};
use std::{
borrow::Cow,
collections::HashMap,
fmt, io, str,
time::{Duration, SystemTime},
};
use tracing::{trace, warn};
use std::{borrow::Cow, collections::HashMap, fmt, io, str};
// re-export for use in utils pageserver_feedback.rs
pub use postgres_protocol::PG_EPOCH;
pub type Oid = u32;
pub type SystemId = u64;
@@ -664,7 +659,7 @@ fn write_cstr(s: impl AsRef<[u8]>, buf: &mut BytesMut) -> Result<(), ProtocolErr
}
/// Read cstring from buf, advancing it.
fn read_cstr(buf: &mut Bytes) -> Result<Bytes, ProtocolError> {
pub fn read_cstr(buf: &mut Bytes) -> Result<Bytes, ProtocolError> {
let pos = buf
.iter()
.position(|x| *x == 0)
@@ -939,176 +934,10 @@ impl<'a> BeMessage<'a> {
}
}
/// Feedback pageserver sends to safekeeper and safekeeper resends to compute.
/// Serialized in custom flexible key/value format. In replication protocol, it
/// is marked with NEON_STATUS_UPDATE_TAG_BYTE to differentiate from postgres
/// Standby status update / Hot standby feedback messages.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct PageserverFeedback {
/// Last known size of the timeline. Used to enforce timeline size limit.
pub current_timeline_size: u64,
/// LSN last received and ingested by the pageserver. Controls backpressure.
pub last_received_lsn: u64,
/// LSN up to which data is persisted by the pageserver to its local disc.
/// Controls backpressure.
pub disk_consistent_lsn: u64,
/// LSN up to which data is persisted by the pageserver on s3; safekeepers
/// consider WAL before it can be removed.
pub remote_consistent_lsn: u64,
pub replytime: SystemTime,
}
// NOTE: Do not forget to increment this number when adding new fields to PageserverFeedback.
// Do not remove previously available fields because this might be backwards incompatible.
pub const PAGESERVER_FEEDBACK_FIELDS_NUMBER: u8 = 5;
impl PageserverFeedback {
pub fn empty() -> PageserverFeedback {
PageserverFeedback {
current_timeline_size: 0,
last_received_lsn: 0,
remote_consistent_lsn: 0,
disk_consistent_lsn: 0,
replytime: *PG_EPOCH,
}
}
// Serialize PageserverFeedback using custom format
// to support protocol extensibility.
//
// Following layout is used:
// char - number of key-value pairs that follow.
//
// key-value pairs:
// null-terminated string - key,
// uint32 - value length in bytes
// value itself
//
// TODO: change serialized fields names once all computes migrate to rename.
pub fn serialize(&self, buf: &mut BytesMut) {
buf.put_u8(PAGESERVER_FEEDBACK_FIELDS_NUMBER); // # of keys
buf.put_slice(b"current_timeline_size\0");
buf.put_i32(8);
buf.put_u64(self.current_timeline_size);
buf.put_slice(b"ps_writelsn\0");
buf.put_i32(8);
buf.put_u64(self.last_received_lsn);
buf.put_slice(b"ps_flushlsn\0");
buf.put_i32(8);
buf.put_u64(self.disk_consistent_lsn);
buf.put_slice(b"ps_applylsn\0");
buf.put_i32(8);
buf.put_u64(self.remote_consistent_lsn);
let timestamp = self
.replytime
.duration_since(*PG_EPOCH)
.expect("failed to serialize pg_replytime earlier than PG_EPOCH")
.as_micros() as i64;
buf.put_slice(b"ps_replytime\0");
buf.put_i32(8);
buf.put_i64(timestamp);
}
// Deserialize PageserverFeedback message
// TODO: change serialized fields names once all computes migrate to rename.
pub fn parse(mut buf: Bytes) -> PageserverFeedback {
let mut rf = PageserverFeedback::empty();
let nfields = buf.get_u8();
for _ in 0..nfields {
let key = read_cstr(&mut buf).unwrap();
match key.as_ref() {
b"current_timeline_size" => {
let len = buf.get_i32();
assert_eq!(len, 8);
rf.current_timeline_size = buf.get_u64();
}
b"ps_writelsn" => {
let len = buf.get_i32();
assert_eq!(len, 8);
rf.last_received_lsn = buf.get_u64();
}
b"ps_flushlsn" => {
let len = buf.get_i32();
assert_eq!(len, 8);
rf.disk_consistent_lsn = buf.get_u64();
}
b"ps_applylsn" => {
let len = buf.get_i32();
assert_eq!(len, 8);
rf.remote_consistent_lsn = buf.get_u64();
}
b"ps_replytime" => {
let len = buf.get_i32();
assert_eq!(len, 8);
let raw_time = buf.get_i64();
if raw_time > 0 {
rf.replytime = *PG_EPOCH + Duration::from_micros(raw_time as u64);
} else {
rf.replytime = *PG_EPOCH - Duration::from_micros(-raw_time as u64);
}
}
_ => {
let len = buf.get_i32();
warn!(
"PageserverFeedback parse. unknown key {} of len {len}. Skip it.",
String::from_utf8_lossy(key.as_ref())
);
buf.advance(len as usize);
}
}
}
trace!("PageserverFeedback parsed is {:?}", rf);
rf
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_replication_feedback_serialization() {
let mut rf = PageserverFeedback::empty();
// Fill rf with some values
rf.current_timeline_size = 12345678;
// Set rounded time to be able to compare it with deserialized value,
// because it is rounded up to microseconds during serialization.
rf.replytime = *PG_EPOCH + Duration::from_secs(100_000_000);
let mut data = BytesMut::new();
rf.serialize(&mut data);
let rf_parsed = PageserverFeedback::parse(data.freeze());
assert_eq!(rf, rf_parsed);
}
#[test]
fn test_replication_feedback_unknown_key() {
let mut rf = PageserverFeedback::empty();
// Fill rf with some values
rf.current_timeline_size = 12345678;
// Set rounded time to be able to compare it with deserialized value,
// because it is rounded up to microseconds during serialization.
rf.replytime = *PG_EPOCH + Duration::from_secs(100_000_000);
let mut data = BytesMut::new();
rf.serialize(&mut data);
// Add an extra field to the buffer and adjust number of keys
if let Some(first) = data.first_mut() {
*first = PAGESERVER_FEEDBACK_FIELDS_NUMBER + 1;
}
data.put_slice(b"new_field_one\0");
data.put_i32(8);
data.put_u64(42);
// Parse serialized data and check that new field is not parsed
let rf_parsed = PageserverFeedback::parse(data.freeze());
assert_eq!(rf, rf_parsed);
}
#[test]
fn test_startup_message_params_options_escaped() {
fn split_options(params: &StartupMessageParams) -> Vec<Cow<'_, str>> {

View File

@@ -11,6 +11,7 @@ async-trait.workspace = true
anyhow.workspace = true
bincode.workspace = true
bytes.workspace = true
chrono.workspace = true
heapless.workspace = true
hex = { workspace = true, features = ["serde"] }
hyper = { workspace = true, features = ["full"] }
@@ -36,6 +37,7 @@ strum_macros.workspace = true
url.workspace = true
uuid.workspace = true
pq_proto.workspace = true
metrics.workspace = true
workspace_hack.workspace = true

View File

@@ -54,6 +54,8 @@ pub mod measured_stream;
pub mod serde_percent;
pub mod serde_regex;
pub mod pageserver_feedback;
pub mod tracing_span_assert;
/// use with fail::cfg("$name", "return(2000)")

View File

@@ -0,0 +1,196 @@
use std::time::{Duration, SystemTime};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use chrono::{DateTime, Utc};
use pq_proto::{read_cstr, PG_EPOCH};
use serde::{Serialize, Serializer};
use tracing::{trace, warn};
use crate::{http::json::display_serialize, lsn::Lsn};
// serialize SystemTime as ISO string in UTC.
fn serialize_system_time_iso<S>(ts: &SystemTime, s: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let chrono_dt: DateTime<Utc> = (*ts).into();
s.serialize_str(&chrono_dt.to_rfc3339())
}
/// Feedback pageserver sends to safekeeper and safekeeper resends to compute.
/// Serialized in custom flexible key/value format. In replication protocol, it
/// is marked with NEON_STATUS_UPDATE_TAG_BYTE to differentiate from postgres
/// Standby status update / Hot standby feedback messages.
///
/// serde Serialize is used only for human readable dump to json (e.g. in
/// safekeepers debug_dump).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
pub struct PageserverFeedback {
/// Last known size of the timeline. Used to enforce timeline size limit.
pub current_timeline_size: u64,
/// LSN last received and ingested by the pageserver. Controls backpressure.
#[serde(serialize_with = "display_serialize")]
pub last_received_lsn: Lsn,
/// LSN up to which data is persisted by the pageserver to its local disc.
/// Controls backpressure.
#[serde(serialize_with = "display_serialize")]
pub disk_consistent_lsn: Lsn,
/// LSN up to which data is persisted by the pageserver on s3; safekeepers
/// consider WAL before it can be removed.
#[serde(serialize_with = "display_serialize")]
pub remote_consistent_lsn: Lsn,
#[serde(serialize_with = "serialize_system_time_iso")]
pub replytime: SystemTime,
}
// NOTE: Do not forget to increment this number when adding new fields to PageserverFeedback.
// Do not remove previously available fields because this might be backwards incompatible.
pub const PAGESERVER_FEEDBACK_FIELDS_NUMBER: u8 = 5;
impl PageserverFeedback {
pub fn empty() -> PageserverFeedback {
PageserverFeedback {
current_timeline_size: 0,
last_received_lsn: Lsn::INVALID,
remote_consistent_lsn: Lsn::INVALID,
disk_consistent_lsn: Lsn::INVALID,
replytime: *PG_EPOCH,
}
}
// Serialize PageserverFeedback using custom format
// to support protocol extensibility.
//
// Following layout is used:
// char - number of key-value pairs that follow.
//
// key-value pairs:
// null-terminated string - key,
// uint32 - value length in bytes
// value itself
//
// TODO: change serialized fields names once all computes migrate to rename.
pub fn serialize(&self, buf: &mut BytesMut) {
buf.put_u8(PAGESERVER_FEEDBACK_FIELDS_NUMBER); // # of keys
buf.put_slice(b"current_timeline_size\0");
buf.put_i32(8);
buf.put_u64(self.current_timeline_size);
buf.put_slice(b"ps_writelsn\0");
buf.put_i32(8);
buf.put_u64(self.last_received_lsn.0);
buf.put_slice(b"ps_flushlsn\0");
buf.put_i32(8);
buf.put_u64(self.disk_consistent_lsn.0);
buf.put_slice(b"ps_applylsn\0");
buf.put_i32(8);
buf.put_u64(self.remote_consistent_lsn.0);
let timestamp = self
.replytime
.duration_since(*PG_EPOCH)
.expect("failed to serialize pg_replytime earlier than PG_EPOCH")
.as_micros() as i64;
buf.put_slice(b"ps_replytime\0");
buf.put_i32(8);
buf.put_i64(timestamp);
}
// Deserialize PageserverFeedback message
// TODO: change serialized fields names once all computes migrate to rename.
pub fn parse(mut buf: Bytes) -> PageserverFeedback {
let mut rf = PageserverFeedback::empty();
let nfields = buf.get_u8();
for _ in 0..nfields {
let key = read_cstr(&mut buf).unwrap();
match key.as_ref() {
b"current_timeline_size" => {
let len = buf.get_i32();
assert_eq!(len, 8);
rf.current_timeline_size = buf.get_u64();
}
b"ps_writelsn" => {
let len = buf.get_i32();
assert_eq!(len, 8);
rf.last_received_lsn = Lsn(buf.get_u64());
}
b"ps_flushlsn" => {
let len = buf.get_i32();
assert_eq!(len, 8);
rf.disk_consistent_lsn = Lsn(buf.get_u64());
}
b"ps_applylsn" => {
let len = buf.get_i32();
assert_eq!(len, 8);
rf.remote_consistent_lsn = Lsn(buf.get_u64());
}
b"ps_replytime" => {
let len = buf.get_i32();
assert_eq!(len, 8);
let raw_time = buf.get_i64();
if raw_time > 0 {
rf.replytime = *PG_EPOCH + Duration::from_micros(raw_time as u64);
} else {
rf.replytime = *PG_EPOCH - Duration::from_micros(-raw_time as u64);
}
}
_ => {
let len = buf.get_i32();
warn!(
"PageserverFeedback parse. unknown key {} of len {len}. Skip it.",
String::from_utf8_lossy(key.as_ref())
);
buf.advance(len as usize);
}
}
}
trace!("PageserverFeedback parsed is {:?}", rf);
rf
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_replication_feedback_serialization() {
let mut rf = PageserverFeedback::empty();
// Fill rf with some values
rf.current_timeline_size = 12345678;
// Set rounded time to be able to compare it with deserialized value,
// because it is rounded up to microseconds during serialization.
rf.replytime = *PG_EPOCH + Duration::from_secs(100_000_000);
let mut data = BytesMut::new();
rf.serialize(&mut data);
let rf_parsed = PageserverFeedback::parse(data.freeze());
assert_eq!(rf, rf_parsed);
}
#[test]
fn test_replication_feedback_unknown_key() {
let mut rf = PageserverFeedback::empty();
// Fill rf with some values
rf.current_timeline_size = 12345678;
// Set rounded time to be able to compare it with deserialized value,
// because it is rounded up to microseconds during serialization.
rf.replytime = *PG_EPOCH + Duration::from_secs(100_000_000);
let mut data = BytesMut::new();
rf.serialize(&mut data);
// Add an extra field to the buffer and adjust number of keys
if let Some(first) = data.first_mut() {
*first = PAGESERVER_FEEDBACK_FIELDS_NUMBER + 1;
}
data.put_slice(b"new_field_one\0");
data.put_i32(8);
data.put_u64(42);
// Parse serialized data and check that new field is not parsed
let rf_parsed = PageserverFeedback::parse(data.freeze());
assert_eq!(rf, rf_parsed);
}
}

View File

@@ -37,8 +37,8 @@ use crate::{
use postgres_backend::is_expected_io_error;
use postgres_connection::PgConnectionConfig;
use postgres_ffi::waldecoder::WalStreamDecoder;
use pq_proto::PageserverFeedback;
use utils::lsn::Lsn;
use utils::pageserver_feedback::PageserverFeedback;
/// Status of the connection.
#[derive(Debug, Clone, Copy)]
@@ -319,12 +319,12 @@ pub(super) async fn handle_walreceiver_connection(
timeline.get_remote_consistent_lsn().unwrap_or(Lsn(0));
// The last LSN we processed. It is not guaranteed to survive pageserver crash.
let last_received_lsn = u64::from(last_lsn);
let last_received_lsn = last_lsn;
// `disk_consistent_lsn` is the LSN at which page server guarantees local persistence of all received data
let disk_consistent_lsn = u64::from(timeline.get_disk_consistent_lsn());
let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
// The last LSN that is synced to remote storage and is guaranteed to survive pageserver crash
// Used by safekeepers to remove WAL preceding `remote_consistent_lsn`.
let remote_consistent_lsn = u64::from(timeline_remote_consistent_lsn);
let remote_consistent_lsn = timeline_remote_consistent_lsn;
let ts = SystemTime::now();
// Update the status about what we just received. This is shown in the mgmt API.

View File

@@ -50,7 +50,7 @@ pub struct AppendLogicalMessage {
pub pg_version: u32,
}
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize)]
struct AppendResult {
// safekeeper state after append
state: SafeKeeperState,
@@ -133,7 +133,7 @@ fn send_proposer_elected(tli: &Arc<Timeline>, term: Term, lsn: Lsn) -> anyhow::R
Ok(())
}
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize)]
pub struct InsertedWAL {
begin_lsn: Lsn,
pub end_lsn: Lsn,

View File

@@ -15,7 +15,7 @@ use metrics::{
use once_cell::sync::Lazy;
use postgres_ffi::XLogSegNo;
use pq_proto::PageserverFeedback;
use utils::pageserver_feedback::PageserverFeedback;
use utils::{id::TenantTimelineId, lsn::Lsn};
use crate::{
@@ -557,7 +557,7 @@ impl Collector for TimelineCollector {
self.ps_last_received_lsn
.with_label_values(labels)
.set(tli.ps_feedback.last_received_lsn);
.set(tli.ps_feedback.last_received_lsn.0);
if let Ok(unix_time) = tli
.ps_feedback
.replytime

View File

@@ -18,7 +18,8 @@ use crate::control_file;
use crate::send_wal::HotStandbyFeedback;
use crate::wal_storage;
use pq_proto::{PageserverFeedback, SystemId};
use pq_proto::SystemId;
use utils::pageserver_feedback::PageserverFeedback;
use utils::{
bin_ser::LeSer,
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
@@ -346,7 +347,7 @@ pub struct AppendRequestHeader {
}
/// Report safekeeper state to proposer
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize)]
pub struct AppendResponse {
// Current term of the safekeeper; if it is higher than proposer's, the
// compute is out of date.

View File

@@ -13,12 +13,13 @@ use postgres_backend::PostgresBackend;
use postgres_backend::{CopyStreamHandlerEnd, PostgresBackendReader, QueryError};
use postgres_ffi::get_current_timestamp;
use postgres_ffi::{TimestampTz, MAX_SEND_SIZE};
use pq_proto::{BeMessage, PageserverFeedback, WalSndKeepAlive, XLogDataBody};
use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody};
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncRead, AsyncWrite};
use utils::http::json::display_serialize;
use utils::id::TenantTimelineId;
use utils::lsn::AtomicLsn;
use utils::pageserver_feedback::PageserverFeedback;
use std::cmp::{max, min};
use std::net::SocketAddr;
@@ -155,7 +156,7 @@ impl WalSenders {
let mut shared = self.mutex.lock();
shared.get_slot_mut(id).feedback = ReplicationFeedback::Pageserver(*feedback);
shared.update_ps_feedback();
self.update_remote_consistent_lsn(Lsn(shared.agg_ps_feedback.remote_consistent_lsn));
self.update_remote_consistent_lsn(shared.agg_ps_feedback.remote_consistent_lsn);
}
/// Record standby reply.
@@ -195,7 +196,7 @@ impl WalSenders {
let shared = self.mutex.lock();
let slot = shared.get_slot(id);
match slot.feedback {
ReplicationFeedback::Pageserver(feedback) => Some(Lsn(feedback.remote_consistent_lsn)),
ReplicationFeedback::Pageserver(feedback) => Some(feedback.remote_consistent_lsn),
_ => None,
}
}

View File

@@ -14,13 +14,11 @@ publish = false
### BEGIN HAKARI SECTION
[dependencies]
anyhow = { version = "1", features = ["backtrace"] }
byteorder = { version = "1" }
bytes = { version = "1", features = ["serde"] }
chrono = { version = "0.4", default-features = false, features = ["clock", "serde"] }
clap = { version = "4", features = ["derive", "string"] }
clap_builder = { version = "4", default-features = false, features = ["color", "help", "std", "string", "suggestions", "usage"] }
crossbeam-utils = { version = "0.8" }
digest = { version = "0.10", features = ["mac", "std"] }
either = { version = "1" }
fail = { version = "0.5", default-features = false, features = ["failpoints"] }
futures = { version = "0.3" }