From 5abe2129c6e2b2e01caeb62fcf390e1c377edeb6 Mon Sep 17 00:00:00 2001 From: anastasia Date: Wed, 22 Dec 2021 20:06:23 +0300 Subject: [PATCH] Extend replication protocol with ZentihFeedback message to pass current_timeline_size to compute node Put standby_status_update fields into ZenithFeedback and send them as one message. Pass values sizes together with keys in ZenithFeedback message. --- Cargo.lock | 99 ++++++++++++++---- control_plane/Cargo.toml | 2 +- pageserver/Cargo.toml | 8 +- pageserver/src/walreceiver.rs | 28 ++++-- proxy/Cargo.toml | 2 +- vendor/postgres | 2 +- walkeeper/Cargo.toml | 6 +- walkeeper/src/safekeeper.rs | 11 +- walkeeper/src/send_wal.rs | 35 +++++-- walkeeper/src/timeline.rs | 57 ++++++++--- zenith/Cargo.toml | 2 +- zenith_utils/Cargo.toml | 3 +- zenith_utils/src/pq_proto.rs | 184 +++++++++++++++++++++++++++++++++- 13 files changed, 373 insertions(+), 66 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e2a95d1aab..4ac9e9f3d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -375,7 +375,7 @@ dependencies = [ "hyper", "libc", "log", - "postgres", + "postgres 0.19.1 (git+https://github.com/zenithdb/rust-postgres.git?rev=9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858)", "regex", "serde", "serde_json", @@ -411,7 +411,7 @@ dependencies = [ "lazy_static", "nix", "pageserver", - "postgres", + "postgres 0.19.1 (git+https://github.com/zenithdb/rust-postgres.git?rev=2949d98df52587d562986aad155dd4e889e408b7)", "regex", "reqwest", "serde", @@ -1270,9 +1270,9 @@ dependencies = [ "nix", "once_cell", "parking_lot", - "postgres", - "postgres-protocol", - "postgres-types", + "postgres 0.19.1 (git+https://github.com/zenithdb/rust-postgres.git?rev=2949d98df52587d562986aad155dd4e889e408b7)", + "postgres-protocol 0.6.1 (git+https://github.com/zenithdb/rust-postgres.git?rev=2949d98df52587d562986aad155dd4e889e408b7)", + "postgres-types 0.2.1 (git+https://github.com/zenithdb/rust-postgres.git?rev=2949d98df52587d562986aad155dd4e889e408b7)", "postgres_ffi", "rand", "regex", @@ -1286,7 +1286,7 @@ dependencies = [ "tempfile", "thiserror", "tokio", - "tokio-postgres", + "tokio-postgres 0.7.1 (git+https://github.com/zenithdb/rust-postgres.git?rev=2949d98df52587d562986aad155dd4e889e408b7)", "tokio-stream", "toml_edit", "tracing", @@ -1395,6 +1395,20 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "postgres" +version = "0.19.1" +source = "git+https://github.com/zenithdb/rust-postgres.git?rev=2949d98df52587d562986aad155dd4e889e408b7#2949d98df52587d562986aad155dd4e889e408b7" +dependencies = [ + "bytes", + "fallible-iterator", + "futures", + "log", + "postgres-protocol 0.6.1 (git+https://github.com/zenithdb/rust-postgres.git?rev=2949d98df52587d562986aad155dd4e889e408b7)", + "tokio", + "tokio-postgres 0.7.1 (git+https://github.com/zenithdb/rust-postgres.git?rev=2949d98df52587d562986aad155dd4e889e408b7)", +] + [[package]] name = "postgres" version = "0.19.1" @@ -1404,9 +1418,27 @@ dependencies = [ "fallible-iterator", "futures", "log", - "postgres-protocol", + "postgres-protocol 0.6.1 (git+https://github.com/zenithdb/rust-postgres.git?rev=9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858)", "tokio", - "tokio-postgres", + "tokio-postgres 0.7.1 (git+https://github.com/zenithdb/rust-postgres.git?rev=9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858)", +] + +[[package]] +name = "postgres-protocol" +version = "0.6.1" +source = "git+https://github.com/zenithdb/rust-postgres.git?rev=2949d98df52587d562986aad155dd4e889e408b7#2949d98df52587d562986aad155dd4e889e408b7" +dependencies = [ + "base64 0.13.0", + "byteorder", + "bytes", + "fallible-iterator", + "hmac 0.10.1", + "lazy_static", + "md-5", + "memchr", + "rand", + "sha2", + "stringprep", ] [[package]] @@ -1427,6 +1459,16 @@ dependencies = [ "stringprep", ] +[[package]] +name = "postgres-types" +version = "0.2.1" +source = "git+https://github.com/zenithdb/rust-postgres.git?rev=2949d98df52587d562986aad155dd4e889e408b7#2949d98df52587d562986aad155dd4e889e408b7" +dependencies = [ + "bytes", + "fallible-iterator", + "postgres-protocol 0.6.1 (git+https://github.com/zenithdb/rust-postgres.git?rev=2949d98df52587d562986aad155dd4e889e408b7)", +] + [[package]] name = "postgres-types" version = "0.2.1" @@ -1434,7 +1476,7 @@ source = "git+https://github.com/zenithdb/rust-postgres.git?rev=9eb0dbfbeb6a6c1b dependencies = [ "bytes", "fallible-iterator", - "postgres-protocol", + "postgres-protocol 0.6.1 (git+https://github.com/zenithdb/rust-postgres.git?rev=9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858)", ] [[package]] @@ -1513,7 +1555,7 @@ dependencies = [ "serde", "serde_json", "tokio", - "tokio-postgres", + "tokio-postgres 0.7.1 (git+https://github.com/zenithdb/rust-postgres.git?rev=2949d98df52587d562986aad155dd4e889e408b7)", "zenith_metrics", "zenith_utils", ] @@ -2166,6 +2208,28 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-postgres" +version = "0.7.1" +source = "git+https://github.com/zenithdb/rust-postgres.git?rev=2949d98df52587d562986aad155dd4e889e408b7#2949d98df52587d562986aad155dd4e889e408b7" +dependencies = [ + "async-trait", + "byteorder", + "bytes", + "fallible-iterator", + "futures", + "log", + "parking_lot", + "percent-encoding", + "phf", + "pin-project-lite", + "postgres-protocol 0.6.1 (git+https://github.com/zenithdb/rust-postgres.git?rev=2949d98df52587d562986aad155dd4e889e408b7)", + "postgres-types 0.2.1 (git+https://github.com/zenithdb/rust-postgres.git?rev=2949d98df52587d562986aad155dd4e889e408b7)", + "socket2", + "tokio", + "tokio-util", +] + [[package]] name = "tokio-postgres" version = "0.7.1" @@ -2181,8 +2245,8 @@ dependencies = [ "percent-encoding", "phf", "pin-project-lite", - "postgres-protocol", - "postgres-types", + "postgres-protocol 0.6.1 (git+https://github.com/zenithdb/rust-postgres.git?rev=9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858)", + "postgres-types 0.2.1 (git+https://github.com/zenithdb/rust-postgres.git?rev=9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858)", "socket2", "tokio", "tokio-util", @@ -2419,8 +2483,8 @@ dependencies = [ "humantime", "hyper", "lazy_static", - "postgres", - "postgres-protocol", + "postgres 0.19.1 (git+https://github.com/zenithdb/rust-postgres.git?rev=2949d98df52587d562986aad155dd4e889e408b7)", + "postgres-protocol 0.6.1 (git+https://github.com/zenithdb/rust-postgres.git?rev=2949d98df52587d562986aad155dd4e889e408b7)", "postgres_ffi", "regex", "routerify", @@ -2430,7 +2494,7 @@ dependencies = [ "signal-hook", "tempfile", "tokio", - "tokio-postgres", + "tokio-postgres 0.7.1 (git+https://github.com/zenithdb/rust-postgres.git?rev=2949d98df52587d562986aad155dd4e889e408b7)", "tracing", "walkdir", "workspace_hack", @@ -2668,7 +2732,7 @@ dependencies = [ "clap", "control_plane", "pageserver", - "postgres", + "postgres 0.19.1 (git+https://github.com/zenithdb/rust-postgres.git?rev=2949d98df52587d562986aad155dd4e889e408b7)", "postgres_ffi", "serde_json", "walkeeper", @@ -2701,7 +2765,8 @@ dependencies = [ "jsonwebtoken", "lazy_static", "nix", - "postgres", + "postgres 0.19.1 (git+https://github.com/zenithdb/rust-postgres.git?rev=2949d98df52587d562986aad155dd4e889e408b7)", + "postgres-protocol 0.6.1 (git+https://github.com/zenithdb/rust-postgres.git?rev=2949d98df52587d562986aad155dd4e889e408b7)", "rand", "routerify", "rustls 0.19.1", diff --git a/control_plane/Cargo.toml b/control_plane/Cargo.toml index 5ea7728877..87d08e7439 100644 --- a/control_plane/Cargo.toml +++ b/control_plane/Cargo.toml @@ -8,7 +8,7 @@ edition = "2021" [dependencies] tar = "0.4.33" -postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" } +postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="2949d98df52587d562986aad155dd4e889e408b7" } serde = { version = "1.0", features = ["derive"] } toml = "0.5" lazy_static = "1.4" diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 8f6c382262..aa9afe82b3 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -18,10 +18,10 @@ log = "0.4.14" clap = "2.33.0" daemonize = "0.4.1" tokio = { version = "1.11", features = ["process", "sync", "macros", "fs", "rt", "io-util", "time"] } -postgres-types = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" } -postgres-protocol = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" } -postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" } -tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" } +postgres-types = { git = "https://github.com/zenithdb/rust-postgres.git", rev="2949d98df52587d562986aad155dd4e889e408b7" } +postgres-protocol = { git = "https://github.com/zenithdb/rust-postgres.git", rev="2949d98df52587d562986aad155dd4e889e408b7" } +postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="2949d98df52587d562986aad155dd4e889e408b7" } +tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="2949d98df52587d562986aad155dd4e889e408b7" } tokio-stream = "0.1.8" routerify = "2" anyhow = { version = "1.0", features = ["backtrace"] } diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 2999174277..b6bf117a14 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -11,6 +11,7 @@ use crate::thread_mgr; use crate::thread_mgr::ThreadKind; use crate::walingest::WalIngest; use anyhow::{bail, Context, Error, Result}; +use bytes::BytesMut; use lazy_static::lazy_static; use parking_lot::Mutex; use postgres_ffi::waldecoder::*; @@ -27,9 +28,9 @@ use tokio_postgres::{Client, NoTls, SimpleQueryMessage, SimpleQueryRow}; use tokio_stream::StreamExt; use tracing::*; use zenith_utils::lsn::Lsn; +use zenith_utils::pq_proto::ZenithFeedback; use zenith_utils::zid::ZTenantId; use zenith_utils::zid::ZTimelineId; - // // We keep one WAL Receiver active per timeline. // @@ -287,7 +288,6 @@ fn walreceiver_main( }; if let Some(last_lsn) = status_update { - let last_lsn = PgLsn::from(u64::from(last_lsn)); let timeline_synced_disk_consistent_lsn = tenant_mgr::get_repository_for_tenant(tenantid)? .get_timeline_state(timelineid) @@ -295,18 +295,32 @@ fn walreceiver_main( .unwrap_or(Lsn(0)); // The last LSN we processed. It is not guaranteed to survive pageserver crash. - let write_lsn = last_lsn; + let write_lsn = u64::from(last_lsn); // `disk_consistent_lsn` is the LSN at which page server guarantees local persistence of all received data - let flush_lsn = PgLsn::from(u64::from(timeline.get_disk_consistent_lsn())); + let flush_lsn = u64::from(timeline.get_disk_consistent_lsn()); // The last LSN that is synced to remote storage and is guaranteed to survive pageserver crash // Used by safekeepers to remove WAL preceding `remote_consistent_lsn`. - let apply_lsn = PgLsn::from(u64::from(timeline_synced_disk_consistent_lsn)); + let apply_lsn = u64::from(timeline_synced_disk_consistent_lsn); let ts = SystemTime::now(); - const NO_REPLY: u8 = 0; + + // Send zenith feedback message. + // Regular standby_status_update fields are put into this message. + let zenith_status_update = ZenithFeedback { + current_timeline_size: timeline.get_current_logical_size() as u64, + ps_writelsn: write_lsn, + ps_flushlsn: flush_lsn, + ps_applylsn: apply_lsn, + ps_replytime: ts, + }; + + debug!("zenith_status_update {:?}", zenith_status_update); + + let mut data = BytesMut::new(); + zenith_status_update.serialize(&mut data)?; runtime.block_on( physical_stream .as_mut() - .standby_status_update(write_lsn, flush_lsn, apply_lsn, ts, NO_REPLY), + .zenith_status_update(data.len() as u64, &data), )?; } } diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index 997141edf6..e989cf1dec 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -19,7 +19,7 @@ parking_lot = "0.11.2" serde = "1" serde_json = "1" tokio = { version = "1.11", features = ["macros"] } -tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" } +tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="2949d98df52587d562986aad155dd4e889e408b7" } clap = "2.33.0" rustls = "0.19.1" reqwest = { version = "0.11", default-features = false, features = ["blocking", "json", "rustls-tls"] } diff --git a/vendor/postgres b/vendor/postgres index 12250cf3af..f959267142 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit 12250cf3af0d438e6d554443b59a6b6bb64ab1f4 +Subproject commit f9592671426dc1ba713731720b5273768089433e diff --git a/walkeeper/Cargo.toml b/walkeeper/Cargo.toml index 805b93ceea..9d0dcfd542 100644 --- a/walkeeper/Cargo.toml +++ b/walkeeper/Cargo.toml @@ -20,8 +20,8 @@ clap = "2.33.0" daemonize = "0.4.1" rust-s3 = { version = "0.28", default-features = false, features = ["no-verify-ssl", "tokio-rustls-tls"] } tokio = { version = "1.11", features = ["macros"] } -postgres-protocol = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" } -postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" } +postgres-protocol = { git = "https://github.com/zenithdb/rust-postgres.git", rev="2949d98df52587d562986aad155dd4e889e408b7" } +postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="2949d98df52587d562986aad155dd4e889e408b7" } anyhow = "1.0" crc32c = "0.6.0" humantime = "2.1.0" @@ -30,7 +30,7 @@ signal-hook = "0.3.10" serde = { version = "1.0", features = ["derive"] } hex = "0.4.3" const_format = "0.2.21" -tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" } +tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="2949d98df52587d562986aad155dd4e889e408b7" } postgres_ffi = { path = "../postgres_ffi" } workspace_hack = { path = "../workspace_hack" } diff --git a/walkeeper/src/safekeeper.rs b/walkeeper/src/safekeeper.rs index 1a8633edd0..a39b67f0fd 100644 --- a/walkeeper/src/safekeeper.rs +++ b/walkeeper/src/safekeeper.rs @@ -22,6 +22,7 @@ use zenith_metrics::{ use zenith_utils::bin_ser::LeSer; use zenith_utils::lsn::Lsn; use zenith_utils::pq_proto::SystemId; +use zenith_utils::pq_proto::ZenithFeedback; use zenith_utils::zid::{ZTenantId, ZTimelineId}; pub const SK_MAGIC: u32 = 0xcafeceefu32; @@ -278,9 +279,8 @@ pub struct AppendResponse { // We report back our awareness about which WAL is committed, as this is // a criterion for walproposer --sync mode exit pub commit_lsn: Lsn, - // Min disk consistent lsn of pageservers (portion of WAL applied and written to the disk by pageservers) - pub disk_consistent_lsn: Lsn, pub hs_feedback: HotStandbyFeedback, + pub zenith_feedback: ZenithFeedback, } impl AppendResponse { @@ -289,8 +289,8 @@ impl AppendResponse { term, flush_lsn: Lsn(0), commit_lsn: Lsn(0), - disk_consistent_lsn: Lsn(0), hs_feedback: HotStandbyFeedback::empty(), + zenith_feedback: ZenithFeedback::empty(), } } } @@ -395,10 +395,11 @@ impl AcceptorProposerMessage { buf.put_u64_le(msg.term); buf.put_u64_le(msg.flush_lsn.into()); buf.put_u64_le(msg.commit_lsn.into()); - buf.put_u64_le(msg.disk_consistent_lsn.into()); buf.put_i64_le(msg.hs_feedback.ts); buf.put_u64_le(msg.hs_feedback.xmin); buf.put_u64_le(msg.hs_feedback.catalog_xmin); + + msg.zenith_feedback.serialize(buf)? } } @@ -614,9 +615,9 @@ where term: self.s.acceptor_state.term, flush_lsn: self.flush_lsn, commit_lsn: self.s.commit_lsn, - disk_consistent_lsn: Lsn(0), // will be filled by the upper code to avoid bothering safekeeper hs_feedback: HotStandbyFeedback::empty(), + zenith_feedback: ZenithFeedback::empty(), } } diff --git a/walkeeper/src/send_wal.rs b/walkeeper/src/send_wal.rs index de469aaf81..d9d6bdf680 100644 --- a/walkeeper/src/send_wal.rs +++ b/walkeeper/src/send_wal.rs @@ -9,6 +9,8 @@ use postgres_ffi::xlog_utils::{ get_current_timestamp, TimestampTz, XLogFileName, MAX_SEND_SIZE, PG_TLI, }; +use crate::callmemaybe::{CallmeEvent, SubscriptionStateKey}; +use bytes::Bytes; use serde::{Deserialize, Serialize}; use std::cmp::min; use std::fs::File; @@ -19,20 +21,21 @@ use std::sync::Arc; use std::thread::sleep; use std::time::Duration; use std::{str, thread}; +use tokio::sync::mpsc::UnboundedSender; use tracing::*; use zenith_utils::bin_ser::BeSer; use zenith_utils::lsn::Lsn; use zenith_utils::postgres_backend::PostgresBackend; -use zenith_utils::pq_proto::{BeMessage, FeMessage, WalSndKeepAlive, XLogDataBody}; +use zenith_utils::pq_proto::{BeMessage, FeMessage, WalSndKeepAlive, XLogDataBody, ZenithFeedback}; use zenith_utils::sock_split::ReadStream; -use crate::callmemaybe::{CallmeEvent, SubscriptionStateKey}; -use tokio::sync::mpsc::UnboundedSender; use zenith_utils::zid::{ZTenantId, ZTimelineId}; // See: https://www.postgresql.org/docs/13/protocol-replication.html const HOT_STANDBY_FEEDBACK_TAG_BYTE: u8 = b'h'; const STANDBY_STATUS_UPDATE_TAG_BYTE: u8 = b'r'; +// zenith extension of replication protocol +const ZENITH_STATUS_UPDATE_TAG_BYTE: u8 = b'z'; type FullTransactionId = u64; @@ -139,8 +142,8 @@ impl ReplicationConn { while let Some(msg) = FeMessage::read(&mut stream_in)? { match &msg { FeMessage::CopyData(m) => { - // There's two possible data messages that the client is supposed to send here: - // `HotStandbyFeedback` and `StandbyStatusUpdate`. + // There's three possible data messages that the client is supposed to send here: + // `HotStandbyFeedback` and `StandbyStatusUpdate` and `ZenithStandbyFeedback`. match m.first().cloned() { Some(HOT_STANDBY_FEEDBACK_TAG_BYTE) => { @@ -150,11 +153,25 @@ impl ReplicationConn { timeline.update_replica_state(replica_id, state); } Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => { - let reply = StandbyReply::des(&m[1..]) + let _reply = StandbyReply::des(&m[1..]) .context("failed to deserialize StandbyReply")?; - state.last_received_lsn = reply.write_lsn; - state.disk_consistent_lsn = reply.flush_lsn; - state.remote_consistent_lsn = reply.apply_lsn; + // This must be a regular postgres replica, + // because pageserver doesn't send this type of messages to safekeeper. + // Currently this is not implemented, so this message is ignored. + + warn!("unexpected StandbyReply. Read-only postgres replicas are not supported in safekeepers yet."); + // timeline.update_replica_state(replica_id, Some(state)); + } + Some(ZENITH_STATUS_UPDATE_TAG_BYTE) => { + // Note: deserializing is on m[9..] because we skip the tag byte and len bytes. + let buf = Bytes::copy_from_slice(&m[9..]); + let reply = ZenithFeedback::parse(buf); + + info!("ZenithFeedback is {:?}", reply); + // Only pageserver sends ZenithFeedback, so set the flag. + // This replica is the source of information to resend to compute. + state.zenith_feedback = Some(reply); + timeline.update_replica_state(replica_id, state); } _ => warn!("unexpected message {:?}", msg), diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs index 5548cd28f4..12f6fd853f 100644 --- a/walkeeper/src/timeline.rs +++ b/walkeeper/src/timeline.rs @@ -27,6 +27,7 @@ use crate::upgrade::upgrade_control_file; use crate::SafeKeeperConf; use postgres_ffi::xlog_utils::{XLogFileName, XLOG_BLCKSZ}; use std::convert::TryInto; +use zenith_utils::pq_proto::ZenithFeedback; // contains persistent metadata for safekeeper const CONTROL_FILE_NAME: &str = "safekeeper.control"; @@ -35,17 +36,17 @@ const CONTROL_FILE_NAME_PARTIAL: &str = "safekeeper.control.partial"; const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1); pub const CHECKSUM_SIZE: usize = std::mem::size_of::(); -/// Replica status: host standby feedback + disk consistent lsn +/// Replica status update + hot standby feedback #[derive(Debug, Clone, Copy)] pub struct ReplicaState { /// last known lsn received by replica pub last_received_lsn: Lsn, // None means we don't know - /// combined disk_consistent_lsn of pageservers - pub disk_consistent_lsn: Lsn, /// combined remote consistent lsn of pageservers pub remote_consistent_lsn: Lsn, /// combined hot standby feedback from all replicas pub hs_feedback: HotStandbyFeedback, + /// Zenith specific feedback received from pageserver, if any + pub zenith_feedback: Option, } impl Default for ReplicaState { @@ -58,13 +59,13 @@ impl ReplicaState { pub fn new() -> ReplicaState { ReplicaState { last_received_lsn: Lsn::MAX, - disk_consistent_lsn: Lsn(u64::MAX), - remote_consistent_lsn: Lsn(u64::MAX), + remote_consistent_lsn: Lsn(0), hs_feedback: HotStandbyFeedback { ts: 0, xmin: u64::MAX, catalog_xmin: u64::MAX, }, + zenith_feedback: None, } } } @@ -109,13 +110,40 @@ impl SharedState { acc.hs_feedback.xmin = min(acc.hs_feedback.xmin, state.hs_feedback.xmin); acc.hs_feedback.catalog_xmin = min(acc.hs_feedback.catalog_xmin, state.hs_feedback.catalog_xmin); - acc.disk_consistent_lsn = Lsn::min(acc.disk_consistent_lsn, state.disk_consistent_lsn); - // currently not used, but update it to be consistent - acc.last_received_lsn = Lsn::min(acc.last_received_lsn, state.last_received_lsn); - // When at least one replica has preserved data up to remote_consistent_lsn, - // safekeeper is free to delete it, so chose max of all replicas. - acc.remote_consistent_lsn = - Lsn::max(acc.remote_consistent_lsn, state.remote_consistent_lsn); + + // FIXME + // If multiple pageservers are streaming WAL and send feedback for the same timeline simultaneously, + // this code is not correct. + // Now the most advanced feedback is used. + // If one pageserver lags when another doesn't, the backpressure won't be activated on compute and lagging + // pageserver is prone to timeout errors. + // + // To choose what feedback to use and resend to compute node, + // we need to know which pageserver compute node considers to be main. + // See https://github.com/zenithdb/zenith/issues/1171 + // + if let Some(zenith_feedback) = state.zenith_feedback { + if let Some(acc_feedback) = acc.zenith_feedback { + if acc_feedback.ps_writelsn < zenith_feedback.ps_writelsn { + warn!("More than one pageserver is streaming WAL for the timeline. Feedback resolving is not fully supported yet."); + acc.zenith_feedback = Some(zenith_feedback); + } + } else { + acc.zenith_feedback = Some(zenith_feedback); + } + + // last lsn received by pageserver + // FIXME if multiple pageservers are streaming WAL, last_received_lsn must be tracked per pageserver. + // See https://github.com/zenithdb/zenith/issues/1171 + acc.last_received_lsn = Lsn::from(zenith_feedback.ps_writelsn); + + // When at least one pageserver has preserved data up to remote_consistent_lsn, + // safekeeper is free to delete it, so choose max of all pageservers. + acc.remote_consistent_lsn = max( + Lsn::from(zenith_feedback.ps_applylsn), + acc.remote_consistent_lsn, + ); + } } acc } @@ -280,8 +308,9 @@ impl Timeline { if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg { let state = shared_state.get_replicas_state(); resp.hs_feedback = state.hs_feedback; - resp.disk_consistent_lsn = state.disk_consistent_lsn; - // XXX Do we need to add state.last_received_lsn to resp? + if let Some(zenith_feedback) = state.zenith_feedback { + resp.zenith_feedback = zenith_feedback; + } } } // Ping wal sender that new data might be available. diff --git a/zenith/Cargo.toml b/zenith/Cargo.toml index 4409ba4295..b41f33cd74 100644 --- a/zenith/Cargo.toml +++ b/zenith/Cargo.toml @@ -10,7 +10,7 @@ edition = "2021" clap = "2.33.0" anyhow = "1.0" serde_json = "1" -postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" } +postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="2949d98df52587d562986aad155dd4e889e408b7" } # FIXME: 'pageserver' is needed for BranchInfo. Refactor pageserver = { path = "../pageserver" } diff --git a/zenith_utils/Cargo.toml b/zenith_utils/Cargo.toml index 0e4f6d68e8..34c4c03d97 100644 --- a/zenith_utils/Cargo.toml +++ b/zenith_utils/Cargo.toml @@ -11,7 +11,8 @@ byteorder = "1.4.3" bytes = "1.0.1" hyper = { version = "0.14.7", features = ["full"] } lazy_static = "1.4.0" -postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" } +postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="2949d98df52587d562986aad155dd4e889e408b7" } +postgres-protocol = { git = "https://github.com/zenithdb/rust-postgres.git", rev="2949d98df52587d562986aad155dd4e889e408b7" } routerify = "2" serde = { version = "1.0", features = ["derive"] } serde_json = "1" diff --git a/zenith_utils/src/pq_proto.rs b/zenith_utils/src/pq_proto.rs index f3365fa4c5..dec4f460a6 100644 --- a/zenith_utils/src/pq_proto.rs +++ b/zenith_utils/src/pq_proto.rs @@ -6,11 +6,14 @@ use anyhow::{bail, ensure, Context, Result}; use byteorder::{BigEndian, ByteOrder}; use byteorder::{ReadBytesExt, BE}; use bytes::{Buf, BufMut, Bytes, BytesMut}; -// use postgres_ffi::xlog_utils::TimestampTz; +use postgres_protocol::PG_EPOCH; +use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::io::Read; use std::io::{self, Cursor}; use std::str; +use std::time::{Duration, SystemTime}; +use tracing::info; pub type Oid = u32; pub type SystemId = u64; @@ -504,7 +507,7 @@ where } /// Safe write of s into buf as cstring (String in the protocol). -fn write_cstr(s: &[u8], buf: &mut BytesMut) -> Result<(), io::Error> { +pub fn write_cstr(s: &[u8], buf: &mut BytesMut) -> Result<(), io::Error> { if s.contains(&0) { return Err(io::Error::new( io::ErrorKind::InvalidInput, @@ -516,6 +519,17 @@ fn write_cstr(s: &[u8], buf: &mut BytesMut) -> Result<(), io::Error> { Ok(()) } +// Truncate 0 from C string in Bytes and stringify it (returns slice, no allocations) +// PG protocol strings are always C strings. +fn cstr_to_str(b: &Bytes) -> Result<&str> { + let without_null = if b.last() == Some(&0) { + &b[..b.len() - 1] + } else { + &b[..] + }; + std::str::from_utf8(without_null).map_err(|e| e.into()) +} + impl<'a> BeMessage<'a> { /// Write message to the given buf. // Unlike the reading side, we use BytesMut @@ -798,3 +812,169 @@ impl<'a> BeMessage<'a> { Ok(()) } } + +// Zenith extension of postgres replication protocol +// See ZENITH_STATUS_UPDATE_TAG_BYTE +#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] +pub struct ZenithFeedback { + // Last known size of the timeline. Used to enforce timeline size limit. + pub current_timeline_size: u64, + // Parts of StandbyStatusUpdate we resend to compute via safekeeper + pub ps_writelsn: u64, + pub ps_applylsn: u64, + pub ps_flushlsn: u64, + pub ps_replytime: SystemTime, +} + +// NOTE: Do not forget to increment this number when adding new fields to ZenithFeedback. +// Do not remove previously available fields because this might be backwards incompatible. +pub const ZENITH_FEEDBACK_FIELDS_NUMBER: u8 = 5; + +impl ZenithFeedback { + pub fn empty() -> ZenithFeedback { + ZenithFeedback { + current_timeline_size: 0, + ps_writelsn: 0, + ps_applylsn: 0, + ps_flushlsn: 0, + ps_replytime: SystemTime::now(), + } + } + + // Serialize ZenithFeedback using custom format + // to support protocol extensibility. + // + // Following layout is used: + // char - number of key-value pairs that follow. + // + // key-value pairs: + // null-terminated string - key, + // uint32 - value length in bytes + // value itself + pub fn serialize(&self, buf: &mut BytesMut) -> Result<()> { + buf.put_u8(ZENITH_FEEDBACK_FIELDS_NUMBER); // # of keys + write_cstr(&Bytes::from("current_timeline_size"), buf)?; + buf.put_i32(8); + buf.put_u64(self.current_timeline_size); + + write_cstr(&Bytes::from("ps_writelsn"), buf)?; + buf.put_i32(8); + buf.put_u64(self.ps_writelsn); + write_cstr(&Bytes::from("ps_flushlsn"), buf)?; + buf.put_i32(8); + buf.put_u64(self.ps_flushlsn); + write_cstr(&Bytes::from("ps_applylsn"), buf)?; + buf.put_i32(8); + buf.put_u64(self.ps_applylsn); + + let timestamp = self + .ps_replytime + .duration_since(*PG_EPOCH) + .expect("failed to serialize pg_replytime earlier than PG_EPOCH") + .as_micros() as i64; + + write_cstr(&Bytes::from("ps_replytime"), buf)?; + buf.put_i32(8); + buf.put_i64(timestamp); + Ok(()) + } + + // Deserialize ZenithFeedback message + pub fn parse(mut buf: Bytes) -> ZenithFeedback { + let mut zf = ZenithFeedback::empty(); + let nfields = buf.get_u8(); + let mut i = 0; + while i < nfields { + i += 1; + let key_cstr = read_null_terminated(&mut buf).unwrap(); + let key = cstr_to_str(&key_cstr).unwrap(); + match key { + "current_timeline_size" => { + let len = buf.get_i32(); + assert_eq!(len, 8); + zf.current_timeline_size = buf.get_u64(); + } + "ps_writelsn" => { + let len = buf.get_i32(); + assert_eq!(len, 8); + zf.ps_writelsn = buf.get_u64(); + } + "ps_flushlsn" => { + let len = buf.get_i32(); + assert_eq!(len, 8); + zf.ps_flushlsn = buf.get_u64(); + } + "ps_applylsn" => { + let len = buf.get_i32(); + assert_eq!(len, 8); + zf.ps_applylsn = buf.get_u64(); + } + "ps_replytime" => { + let len = buf.get_i32(); + assert_eq!(len, 8); + let raw_time = buf.get_i64(); + if raw_time > 0 { + zf.ps_replytime = *PG_EPOCH + Duration::from_micros(raw_time as u64); + } else { + zf.ps_replytime = *PG_EPOCH - Duration::from_micros(-raw_time as u64); + } + } + _ => { + let len = buf.get_i32(); + info!( + "ZenithFeedback parse. unknown key {} of len {}. Skip it.", + key, len + ); + buf.advance(len as usize); + } + } + } + info!("ZenithFeedback parsed is {:?}", zf); + zf + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_zenithfeedback_serialization() { + let mut zf = ZenithFeedback::empty(); + // Fill zf wih some values + zf.current_timeline_size = 12345678; + // Set rounded time to be able to compare it with deserialized value, + // because it is rounded up to microseconds during serialization. + zf.ps_replytime = *PG_EPOCH + Duration::from_secs(100_000_000); + let mut data = BytesMut::new(); + zf.serialize(&mut data).unwrap(); + + let zf_parsed = ZenithFeedback::parse(data.freeze()); + assert_eq!(zf, zf_parsed); + } + + #[test] + fn test_zenithfeedback_unknown_key() { + let mut zf = ZenithFeedback::empty(); + // Fill zf wih some values + zf.current_timeline_size = 12345678; + // Set rounded time to be able to compare it with deserialized value, + // because it is rounded up to microseconds during serialization. + zf.ps_replytime = *PG_EPOCH + Duration::from_secs(100_000_000); + let mut data = BytesMut::new(); + zf.serialize(&mut data).unwrap(); + + // Add an extra field to the buffer and adjust number of keys + if let Some(first) = data.first_mut() { + *first = ZENITH_FEEDBACK_FIELDS_NUMBER + 1; + } + + write_cstr(&Bytes::from("new_field_one"), &mut data).unwrap(); + data.put_i32(8); + data.put_u64(42); + + // Parse serialized data and check that new field is not parsed + let zf_parsed = ZenithFeedback::parse(data.freeze()); + assert_eq!(zf, zf_parsed); + } +}