From d733bc54b8b2aa904cf0192359c0c7d6f986fe8d Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Wed, 29 Mar 2023 22:02:36 +0400 Subject: [PATCH] Rename ReplicationFeedback and its fields. This is the the feedback originating from pageserver, so change previous confusing names to s/ReplicationFeedback/PageserverFeedback s/ps_writelsn/last_receive_lsn s/ps_flushlsn/disk_consistent_lsn s/ps_apply_lsn/remote_consistent_lsn I haven't changed on the wire format to keep compatibility. However, understanding of new field names is added to compute, so once all computes receive this patch we can change the wire names as well. Safekeepers/pageservers are deployed roughly at the same time and it is ok to live without feedbacks during the short period, so this is not a problem there. --- libs/pq_proto/src/lib.rs | 90 ++++++++++-------- .../walreceiver/walreceiver_connection.rs | 18 ++-- pgxn/neon/walproposer.c | 94 +++++++++---------- pgxn/neon/walproposer.h | 24 ++--- safekeeper/src/metrics.rs | 25 +++-- safekeeper/src/safekeeper.rs | 8 +- safekeeper/src/send_wal.rs | 8 +- safekeeper/src/timeline.rs | 12 +-- 8 files changed, 142 insertions(+), 137 deletions(-) diff --git a/libs/pq_proto/src/lib.rs b/libs/pq_proto/src/lib.rs index 656c0ff312..a976e19029 100644 --- a/libs/pq_proto/src/lib.rs +++ b/libs/pq_proto/src/lib.rs @@ -936,35 +936,40 @@ impl<'a> BeMessage<'a> { } } -// Neon extension of postgres replication protocol -// See NEON_STATUS_UPDATE_TAG_BYTE +/// Feedback pageserver sends to safekeeper and safekeeper resends to compute. +/// Serialized in custom flexible key/value format. In replication protocol, it +/// is marked with NEON_STATUS_UPDATE_TAG_BYTE to differentiate from postgres +/// Standby status update / Hot standby feedback messages. #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] -pub struct ReplicationFeedback { - // Last known size of the timeline. Used to enforce timeline size limit. +pub struct PageserverFeedback { + /// Last known size of the timeline. Used to enforce timeline size limit. pub current_timeline_size: u64, - // Parts of StandbyStatusUpdate we resend to compute via safekeeper - pub ps_writelsn: u64, - pub ps_applylsn: u64, - pub ps_flushlsn: u64, - pub ps_replytime: SystemTime, + /// LSN last received and ingested by the pageserver. + pub last_received_lsn: u64, + /// LSN up to which data is persisted by the pageserver to its local disc. + pub disk_consistent_lsn: u64, + /// LSN up to which data is persisted by the pageserver on s3; safekeepers + /// consider WAL before it can be removed. + pub remote_consistent_lsn: u64, + pub replytime: SystemTime, } -// NOTE: Do not forget to increment this number when adding new fields to ReplicationFeedback. +// NOTE: Do not forget to increment this number when adding new fields to PageserverFeedback. // Do not remove previously available fields because this might be backwards incompatible. -pub const REPLICATION_FEEDBACK_FIELDS_NUMBER: u8 = 5; +pub const PAGESERVER_FEEDBACK_FIELDS_NUMBER: u8 = 5; -impl ReplicationFeedback { - pub fn empty() -> ReplicationFeedback { - ReplicationFeedback { +impl PageserverFeedback { + pub fn empty() -> PageserverFeedback { + PageserverFeedback { current_timeline_size: 0, - ps_writelsn: 0, - ps_applylsn: 0, - ps_flushlsn: 0, - ps_replytime: SystemTime::now(), + last_received_lsn: 0, + remote_consistent_lsn: 0, + disk_consistent_lsn: 0, + replytime: SystemTime::now(), } } - // Serialize ReplicationFeedback using custom format + // Serialize PageserverFeedback using custom format // to support protocol extensibility. // // Following layout is used: @@ -974,24 +979,26 @@ impl ReplicationFeedback { // null-terminated string - key, // uint32 - value length in bytes // value itself + // + // TODO: change serialized fields names once all computes migrate to rename. pub fn serialize(&self, buf: &mut BytesMut) { - buf.put_u8(REPLICATION_FEEDBACK_FIELDS_NUMBER); // # of keys + buf.put_u8(PAGESERVER_FEEDBACK_FIELDS_NUMBER); // # of keys buf.put_slice(b"current_timeline_size\0"); buf.put_i32(8); buf.put_u64(self.current_timeline_size); buf.put_slice(b"ps_writelsn\0"); buf.put_i32(8); - buf.put_u64(self.ps_writelsn); + buf.put_u64(self.last_received_lsn); buf.put_slice(b"ps_flushlsn\0"); buf.put_i32(8); - buf.put_u64(self.ps_flushlsn); + buf.put_u64(self.disk_consistent_lsn); buf.put_slice(b"ps_applylsn\0"); buf.put_i32(8); - buf.put_u64(self.ps_applylsn); + buf.put_u64(self.remote_consistent_lsn); let timestamp = self - .ps_replytime + .replytime .duration_since(*PG_EPOCH) .expect("failed to serialize pg_replytime earlier than PG_EPOCH") .as_micros() as i64; @@ -1001,9 +1008,10 @@ impl ReplicationFeedback { buf.put_i64(timestamp); } - // Deserialize ReplicationFeedback message - pub fn parse(mut buf: Bytes) -> ReplicationFeedback { - let mut rf = ReplicationFeedback::empty(); + // Deserialize PageserverFeedback message + // TODO: change serialized fields names once all computes migrate to rename. + pub fn parse(mut buf: Bytes) -> PageserverFeedback { + let mut rf = PageserverFeedback::empty(); let nfields = buf.get_u8(); for _ in 0..nfields { let key = read_cstr(&mut buf).unwrap(); @@ -1016,39 +1024,39 @@ impl ReplicationFeedback { b"ps_writelsn" => { let len = buf.get_i32(); assert_eq!(len, 8); - rf.ps_writelsn = buf.get_u64(); + rf.last_received_lsn = buf.get_u64(); } b"ps_flushlsn" => { let len = buf.get_i32(); assert_eq!(len, 8); - rf.ps_flushlsn = buf.get_u64(); + rf.disk_consistent_lsn = buf.get_u64(); } b"ps_applylsn" => { let len = buf.get_i32(); assert_eq!(len, 8); - rf.ps_applylsn = buf.get_u64(); + rf.remote_consistent_lsn = buf.get_u64(); } b"ps_replytime" => { let len = buf.get_i32(); assert_eq!(len, 8); let raw_time = buf.get_i64(); if raw_time > 0 { - rf.ps_replytime = *PG_EPOCH + Duration::from_micros(raw_time as u64); + rf.replytime = *PG_EPOCH + Duration::from_micros(raw_time as u64); } else { - rf.ps_replytime = *PG_EPOCH - Duration::from_micros(-raw_time as u64); + rf.replytime = *PG_EPOCH - Duration::from_micros(-raw_time as u64); } } _ => { let len = buf.get_i32(); warn!( - "ReplicationFeedback parse. unknown key {} of len {len}. Skip it.", + "PageserverFeedback parse. unknown key {} of len {len}. Skip it.", String::from_utf8_lossy(key.as_ref()) ); buf.advance(len as usize); } } } - trace!("ReplicationFeedback parsed is {:?}", rf); + trace!("PageserverFeedback parsed is {:?}", rf); rf } } @@ -1059,33 +1067,33 @@ mod tests { #[test] fn test_replication_feedback_serialization() { - let mut rf = ReplicationFeedback::empty(); + let mut rf = PageserverFeedback::empty(); // Fill rf with some values rf.current_timeline_size = 12345678; // Set rounded time to be able to compare it with deserialized value, // because it is rounded up to microseconds during serialization. - rf.ps_replytime = *PG_EPOCH + Duration::from_secs(100_000_000); + rf.replytime = *PG_EPOCH + Duration::from_secs(100_000_000); let mut data = BytesMut::new(); rf.serialize(&mut data); - let rf_parsed = ReplicationFeedback::parse(data.freeze()); + let rf_parsed = PageserverFeedback::parse(data.freeze()); assert_eq!(rf, rf_parsed); } #[test] fn test_replication_feedback_unknown_key() { - let mut rf = ReplicationFeedback::empty(); + let mut rf = PageserverFeedback::empty(); // Fill rf with some values rf.current_timeline_size = 12345678; // Set rounded time to be able to compare it with deserialized value, // because it is rounded up to microseconds during serialization. - rf.ps_replytime = *PG_EPOCH + Duration::from_secs(100_000_000); + rf.replytime = *PG_EPOCH + Duration::from_secs(100_000_000); let mut data = BytesMut::new(); rf.serialize(&mut data); // Add an extra field to the buffer and adjust number of keys if let Some(first) = data.first_mut() { - *first = REPLICATION_FEEDBACK_FIELDS_NUMBER + 1; + *first = PAGESERVER_FEEDBACK_FIELDS_NUMBER + 1; } data.put_slice(b"new_field_one\0"); @@ -1093,7 +1101,7 @@ mod tests { data.put_u64(42); // Parse serialized data and check that new field is not parsed - let rf_parsed = ReplicationFeedback::parse(data.freeze()); + let rf_parsed = PageserverFeedback::parse(data.freeze()); assert_eq!(rf, rf_parsed); } diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index 9398a7bee9..ea2f2392ea 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -37,7 +37,7 @@ use crate::{ use postgres_backend::is_expected_io_error; use postgres_connection::PgConnectionConfig; use postgres_ffi::waldecoder::WalStreamDecoder; -use pq_proto::ReplicationFeedback; +use pq_proto::PageserverFeedback; use utils::lsn::Lsn; /// Status of the connection. @@ -319,12 +319,12 @@ pub async fn handle_walreceiver_connection( timeline.get_remote_consistent_lsn().unwrap_or(Lsn(0)); // The last LSN we processed. It is not guaranteed to survive pageserver crash. - let write_lsn = u64::from(last_lsn); + let last_received_lsn = u64::from(last_lsn); // `disk_consistent_lsn` is the LSN at which page server guarantees local persistence of all received data - let flush_lsn = u64::from(timeline.get_disk_consistent_lsn()); + let disk_consistent_lsn = u64::from(timeline.get_disk_consistent_lsn()); // The last LSN that is synced to remote storage and is guaranteed to survive pageserver crash // Used by safekeepers to remove WAL preceding `remote_consistent_lsn`. - let apply_lsn = u64::from(timeline_remote_consistent_lsn); + let remote_consistent_lsn = u64::from(timeline_remote_consistent_lsn); let ts = SystemTime::now(); // Update the status about what we just received. This is shown in the mgmt API. @@ -343,12 +343,12 @@ pub async fn handle_walreceiver_connection( let (timeline_logical_size, _) = timeline .get_current_logical_size(&ctx) .context("Status update creation failed to get current logical size")?; - let status_update = ReplicationFeedback { + let status_update = PageserverFeedback { current_timeline_size: timeline_logical_size, - ps_writelsn: write_lsn, - ps_flushlsn: flush_lsn, - ps_applylsn: apply_lsn, - ps_replytime: ts, + last_received_lsn, + disk_consistent_lsn, + remote_consistent_lsn, + replytime: ts, }; debug!("neon_status_update {status_update:?}"); diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index b0b2a23e3c..45037a8c01 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -1872,9 +1872,9 @@ RecvAppendResponses(Safekeeper *sk) return sk->state == SS_ACTIVE; } -/* Parse a ReplicationFeedback message, or the ReplicationFeedback part of an AppendResponse */ +/* Parse a PageserverFeedback message, or the PageserverFeedback part of an AppendResponse */ void -ParseReplicationFeedbackMessage(StringInfo reply_message, ReplicationFeedback * rf) +ParsePageserverFeedbackMessage(StringInfo reply_message, PageserverFeedback * rf) { uint8 nkeys; int i; @@ -1892,45 +1892,45 @@ ParseReplicationFeedbackMessage(StringInfo reply_message, ReplicationFeedback * pq_getmsgint(reply_message, sizeof(int32)); /* read value length */ rf->currentClusterSize = pq_getmsgint64(reply_message); - elog(DEBUG2, "ParseReplicationFeedbackMessage: current_timeline_size %lu", + elog(DEBUG2, "ParsePageserverFeedbackMessage: current_timeline_size %lu", rf->currentClusterSize); } - else if (strcmp(key, "ps_writelsn") == 0) + else if ((strcmp(key, "ps_writelsn") == 0) || (strcmp(key, "last_received_lsn") == 0)) { pq_getmsgint(reply_message, sizeof(int32)); /* read value length */ - rf->ps_writelsn = pq_getmsgint64(reply_message); - elog(DEBUG2, "ParseReplicationFeedbackMessage: ps_writelsn %X/%X", - LSN_FORMAT_ARGS(rf->ps_writelsn)); + rf->last_received_lsn = pq_getmsgint64(reply_message); + elog(DEBUG2, "ParsePageserverFeedbackMessage: last_received_lsn %X/%X", + LSN_FORMAT_ARGS(rf->last_received_lsn)); } - else if (strcmp(key, "ps_flushlsn") == 0) + else if ((strcmp(key, "ps_flushlsn") == 0) || (strcmp(key, "disk_consistent_lsn") == 0)) { pq_getmsgint(reply_message, sizeof(int32)); /* read value length */ - rf->ps_flushlsn = pq_getmsgint64(reply_message); - elog(DEBUG2, "ParseReplicationFeedbackMessage: ps_flushlsn %X/%X", - LSN_FORMAT_ARGS(rf->ps_flushlsn)); + rf->disk_consistent_lsn = pq_getmsgint64(reply_message); + elog(DEBUG2, "ParsePageserverFeedbackMessage: disk_consistent_lsn %X/%X", + LSN_FORMAT_ARGS(rf->disk_consistent_lsn)); } - else if (strcmp(key, "ps_applylsn") == 0) + else if ((strcmp(key, "ps_applylsn") == 0) || (strcmp(key, "remote_consistent_lsn") == 0)) { pq_getmsgint(reply_message, sizeof(int32)); /* read value length */ - rf->ps_applylsn = pq_getmsgint64(reply_message); - elog(DEBUG2, "ParseReplicationFeedbackMessage: ps_applylsn %X/%X", - LSN_FORMAT_ARGS(rf->ps_applylsn)); + rf->remote_consistent_lsn = pq_getmsgint64(reply_message); + elog(DEBUG2, "ParsePageserverFeedbackMessage: remote_consistent_lsn %X/%X", + LSN_FORMAT_ARGS(rf->remote_consistent_lsn)); } - else if (strcmp(key, "ps_replytime") == 0) + else if ((strcmp(key, "ps_replytime") == 0) || (strcmp(key, "replytime") == 0)) { pq_getmsgint(reply_message, sizeof(int32)); /* read value length */ - rf->ps_replytime = pq_getmsgint64(reply_message); + rf->replytime = pq_getmsgint64(reply_message); { char *replyTimeStr; /* Copy because timestamptz_to_str returns a static buffer */ - replyTimeStr = pstrdup(timestamptz_to_str(rf->ps_replytime)); - elog(DEBUG2, "ParseReplicationFeedbackMessage: ps_replytime %lu reply_time: %s", - rf->ps_replytime, replyTimeStr); + replyTimeStr = pstrdup(timestamptz_to_str(rf->replytime)); + elog(DEBUG2, "ParsePageserverFeedbackMessage: replytime %lu reply_time: %s", + rf->replytime, replyTimeStr); pfree(replyTimeStr); } @@ -1944,7 +1944,7 @@ ParseReplicationFeedbackMessage(StringInfo reply_message, ReplicationFeedback * * Skip unknown keys to support backward compatibile protocol * changes */ - elog(LOG, "ParseReplicationFeedbackMessage: unknown key: %s len %d", key, len); + elog(LOG, "ParsePageserverFeedbackMessage: unknown key: %s len %d", key, len); pq_getmsgbytes(reply_message, len); }; } @@ -2024,7 +2024,7 @@ GetAcknowledgedByQuorumWALPosition(void) } /* - * ReplicationFeedbackShmemSize --- report amount of shared memory space needed + * WalproposerShmemSize --- report amount of shared memory space needed */ Size WalproposerShmemSize(void) @@ -2054,10 +2054,10 @@ WalproposerShmemInit(void) } void -replication_feedback_set(ReplicationFeedback * rf) +replication_feedback_set(PageserverFeedback * rf) { SpinLockAcquire(&walprop_shared->mutex); - memcpy(&walprop_shared->feedback, rf, sizeof(ReplicationFeedback)); + memcpy(&walprop_shared->feedback, rf, sizeof(PageserverFeedback)); SpinLockRelease(&walprop_shared->mutex); } @@ -2065,43 +2065,43 @@ void replication_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRecPtr *applyLsn) { SpinLockAcquire(&walprop_shared->mutex); - *writeLsn = walprop_shared->feedback.ps_writelsn; - *flushLsn = walprop_shared->feedback.ps_flushlsn; - *applyLsn = walprop_shared->feedback.ps_applylsn; + *writeLsn = walprop_shared->feedback.last_received_lsn; + *flushLsn = walprop_shared->feedback.disk_consistent_lsn; + *applyLsn = walprop_shared->feedback.remote_consistent_lsn; SpinLockRelease(&walprop_shared->mutex); } /* - * Get ReplicationFeedback fields from the most advanced safekeeper + * Get PageserverFeedback fields from the most advanced safekeeper */ static void -GetLatestNeonFeedback(ReplicationFeedback * rf) +GetLatestNeonFeedback(PageserverFeedback * rf) { int latest_safekeeper = 0; - XLogRecPtr ps_writelsn = InvalidXLogRecPtr; + XLogRecPtr last_received_lsn = InvalidXLogRecPtr; for (int i = 0; i < n_safekeepers; i++) { - if (safekeeper[i].appendResponse.rf.ps_writelsn > ps_writelsn) + if (safekeeper[i].appendResponse.rf.last_received_lsn > last_received_lsn) { latest_safekeeper = i; - ps_writelsn = safekeeper[i].appendResponse.rf.ps_writelsn; + last_received_lsn = safekeeper[i].appendResponse.rf.last_received_lsn; } } rf->currentClusterSize = safekeeper[latest_safekeeper].appendResponse.rf.currentClusterSize; - rf->ps_writelsn = safekeeper[latest_safekeeper].appendResponse.rf.ps_writelsn; - rf->ps_flushlsn = safekeeper[latest_safekeeper].appendResponse.rf.ps_flushlsn; - rf->ps_applylsn = safekeeper[latest_safekeeper].appendResponse.rf.ps_applylsn; - rf->ps_replytime = safekeeper[latest_safekeeper].appendResponse.rf.ps_replytime; + rf->last_received_lsn = safekeeper[latest_safekeeper].appendResponse.rf.last_received_lsn; + rf->disk_consistent_lsn = safekeeper[latest_safekeeper].appendResponse.rf.disk_consistent_lsn; + rf->remote_consistent_lsn = safekeeper[latest_safekeeper].appendResponse.rf.remote_consistent_lsn; + rf->replytime = safekeeper[latest_safekeeper].appendResponse.rf.replytime; elog(DEBUG2, "GetLatestNeonFeedback: currentClusterSize %lu," - " ps_writelsn %X/%X, ps_flushlsn %X/%X, ps_applylsn %X/%X, ps_replytime %lu", + " last_received_lsn %X/%X, disk_consistent_lsn %X/%X, remote_consistent_lsn %X/%X, replytime %lu", rf->currentClusterSize, - LSN_FORMAT_ARGS(rf->ps_writelsn), - LSN_FORMAT_ARGS(rf->ps_flushlsn), - LSN_FORMAT_ARGS(rf->ps_applylsn), - rf->ps_replytime); + LSN_FORMAT_ARGS(rf->last_received_lsn), + LSN_FORMAT_ARGS(rf->disk_consistent_lsn), + LSN_FORMAT_ARGS(rf->remote_consistent_lsn), + rf->replytime); replication_feedback_set(rf); } @@ -2115,16 +2115,16 @@ HandleSafekeeperResponse(void) XLogRecPtr minFlushLsn; minQuorumLsn = GetAcknowledgedByQuorumWALPosition(); - diskConsistentLsn = quorumFeedback.rf.ps_flushlsn; + diskConsistentLsn = quorumFeedback.rf.disk_consistent_lsn; if (!syncSafekeepers) { - /* Get ReplicationFeedback fields from the most advanced safekeeper */ + /* Get PageserverFeedback fields from the most advanced safekeeper */ GetLatestNeonFeedback(&quorumFeedback.rf); SetZenithCurrentClusterSize(quorumFeedback.rf.currentClusterSize); } - if (minQuorumLsn > quorumFeedback.flushLsn || diskConsistentLsn != quorumFeedback.rf.ps_flushlsn) + if (minQuorumLsn > quorumFeedback.flushLsn || diskConsistentLsn != quorumFeedback.rf.disk_consistent_lsn) { if (minQuorumLsn > quorumFeedback.flushLsn) @@ -2142,7 +2142,7 @@ HandleSafekeeperResponse(void) * apply_lsn - This is what processed and durably saved at* * pageserver. */ - quorumFeedback.rf.ps_flushlsn, + quorumFeedback.rf.disk_consistent_lsn, GetCurrentTimestamp(), false); } @@ -2326,7 +2326,7 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage * anymsg) msg->hs.xmin.value = pq_getmsgint64_le(&s); msg->hs.catalog_xmin.value = pq_getmsgint64_le(&s); if (buf_size > APPENDRESPONSE_FIXEDPART_SIZE) - ParseReplicationFeedbackMessage(&s, &msg->rf); + ParsePageserverFeedbackMessage(&s, &msg->rf); pq_getmsgend(&s); return true; } @@ -2462,7 +2462,7 @@ backpressure_lag_impl(void) replication_feedback_get_lsns(&writePtr, &flushPtr, &applyPtr); #define MB ((XLogRecPtr)1024 * 1024) - elog(DEBUG2, "current flushLsn %X/%X ReplicationFeedback: write %X/%X flush %X/%X apply %X/%X", + elog(DEBUG2, "current flushLsn %X/%X PageserverFeedback: write %X/%X flush %X/%X apply %X/%X", LSN_FORMAT_ARGS(myFlushLsn), LSN_FORMAT_ARGS(writePtr), LSN_FORMAT_ARGS(flushPtr), diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h index 537c733850..f016a229eb 100644 --- a/pgxn/neon/walproposer.h +++ b/pgxn/neon/walproposer.h @@ -280,21 +280,21 @@ typedef struct HotStandbyFeedback FullTransactionId catalog_xmin; } HotStandbyFeedback; -typedef struct ReplicationFeedback +typedef struct PageserverFeedback { /* current size of the timeline on pageserver */ uint64 currentClusterSize; /* standby_status_update fields that safekeeper received from pageserver */ - XLogRecPtr ps_writelsn; - XLogRecPtr ps_flushlsn; - XLogRecPtr ps_applylsn; - TimestampTz ps_replytime; -} ReplicationFeedback; + XLogRecPtr last_received_lsn; + XLogRecPtr disk_consistent_lsn; + XLogRecPtr remote_consistent_lsn; + TimestampTz replytime; +} PageserverFeedback; typedef struct WalproposerShmemState { slock_t mutex; - ReplicationFeedback feedback; + PageserverFeedback feedback; term_t mineLastElectedTerm; pg_atomic_uint64 backpressureThrottlingTime; } WalproposerShmemState; @@ -320,10 +320,10 @@ typedef struct AppendResponse /* Feedback recieved from pageserver includes standby_status_update fields */ /* and custom neon feedback. */ /* This part of the message is extensible. */ - ReplicationFeedback rf; + PageserverFeedback rf; } AppendResponse; -/* ReplicationFeedback is extensible part of the message that is parsed separately */ +/* PageserverFeedback is extensible part of the message that is parsed separately */ /* Other fields are fixed part */ #define APPENDRESPONSE_FIXEDPART_SIZE offsetof(AppendResponse, rf) @@ -383,13 +383,13 @@ extern void WalProposerSync(int argc, char *argv[]); extern void WalProposerMain(Datum main_arg); extern void WalProposerBroadcast(XLogRecPtr startpos, XLogRecPtr endpos); extern void WalProposerPoll(void); -extern void ParseReplicationFeedbackMessage(StringInfo reply_message, - ReplicationFeedback *rf); +extern void ParsePageserverFeedbackMessage(StringInfo reply_message, + PageserverFeedback *rf); extern void StartProposerReplication(StartReplicationCmd *cmd); extern Size WalproposerShmemSize(void); extern bool WalproposerShmemInit(void); -extern void replication_feedback_set(ReplicationFeedback *rf); +extern void replication_feedback_set(PageserverFeedback *rf); extern void replication_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRecPtr *applyLsn); /* libpqwalproposer hooks & helper type */ diff --git a/safekeeper/src/metrics.rs b/safekeeper/src/metrics.rs index c3077b6dc5..2aaa17bfc5 100644 --- a/safekeeper/src/metrics.rs +++ b/safekeeper/src/metrics.rs @@ -255,7 +255,7 @@ pub struct TimelineCollector { epoch_start_lsn: GenericGaugeVec, peer_horizon_lsn: GenericGaugeVec, remote_consistent_lsn: GenericGaugeVec, - feedback_ps_write_lsn: GenericGaugeVec, + ps_last_received_lsn: GenericGaugeVec, feedback_last_time_seconds: GenericGaugeVec, timeline_active: GenericGaugeVec, wal_backup_active: GenericGaugeVec, @@ -339,15 +339,15 @@ impl TimelineCollector { .unwrap(); descs.extend(remote_consistent_lsn.desc().into_iter().cloned()); - let feedback_ps_write_lsn = GenericGaugeVec::new( + let ps_last_received_lsn = GenericGaugeVec::new( Opts::new( - "safekeeper_feedback_ps_write_lsn", + "safekeeper_ps_last_received_lsn", "Last LSN received by the pageserver, acknowledged in the feedback", ), &["tenant_id", "timeline_id"], ) .unwrap(); - descs.extend(feedback_ps_write_lsn.desc().into_iter().cloned()); + descs.extend(ps_last_received_lsn.desc().into_iter().cloned()); let feedback_last_time_seconds = GenericGaugeVec::new( Opts::new( @@ -458,7 +458,7 @@ impl TimelineCollector { epoch_start_lsn, peer_horizon_lsn, remote_consistent_lsn, - feedback_ps_write_lsn, + ps_last_received_lsn, feedback_last_time_seconds, timeline_active, wal_backup_active, @@ -489,7 +489,7 @@ impl Collector for TimelineCollector { self.epoch_start_lsn.reset(); self.peer_horizon_lsn.reset(); self.remote_consistent_lsn.reset(); - self.feedback_ps_write_lsn.reset(); + self.ps_last_received_lsn.reset(); self.feedback_last_time_seconds.reset(); self.timeline_active.reset(); self.wal_backup_active.reset(); @@ -514,11 +514,11 @@ impl Collector for TimelineCollector { let timeline_id = tli.ttid.timeline_id.to_string(); let labels = &[tenant_id.as_str(), timeline_id.as_str()]; - let mut most_advanced: Option = None; + let mut most_advanced: Option = None; for replica in tli.replicas.iter() { if let Some(replica_feedback) = replica.pageserver_feedback { if let Some(current) = most_advanced { - if current.ps_writelsn < replica_feedback.ps_writelsn { + if current.last_received_lsn < replica_feedback.last_received_lsn { most_advanced = Some(replica_feedback); } } else { @@ -568,11 +568,10 @@ impl Collector for TimelineCollector { .set(tli.wal_storage.flush_wal_seconds); if let Some(feedback) = most_advanced { - self.feedback_ps_write_lsn + self.ps_last_received_lsn .with_label_values(labels) - .set(feedback.ps_writelsn); - if let Ok(unix_time) = feedback.ps_replytime.duration_since(SystemTime::UNIX_EPOCH) - { + .set(feedback.last_received_lsn); + if let Ok(unix_time) = feedback.replytime.duration_since(SystemTime::UNIX_EPOCH) { self.feedback_last_time_seconds .with_label_values(labels) .set(unix_time.as_secs()); @@ -599,7 +598,7 @@ impl Collector for TimelineCollector { mfs.extend(self.epoch_start_lsn.collect()); mfs.extend(self.peer_horizon_lsn.collect()); mfs.extend(self.remote_consistent_lsn.collect()); - mfs.extend(self.feedback_ps_write_lsn.collect()); + mfs.extend(self.ps_last_received_lsn.collect()); mfs.extend(self.feedback_last_time_seconds.collect()); mfs.extend(self.timeline_active.collect()); mfs.extend(self.wal_backup_active.collect()); diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index d8fe36d7f8..10b4842cbd 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -18,7 +18,7 @@ use crate::control_file; use crate::send_wal::HotStandbyFeedback; use crate::wal_storage; -use pq_proto::{ReplicationFeedback, SystemId}; +use pq_proto::{PageserverFeedback, SystemId}; use utils::{ bin_ser::LeSer, id::{NodeId, TenantId, TenantTimelineId, TimelineId}, @@ -360,7 +360,7 @@ pub struct AppendResponse { // a criterion for walproposer --sync mode exit pub commit_lsn: Lsn, pub hs_feedback: HotStandbyFeedback, - pub pageserver_feedback: ReplicationFeedback, + pub pageserver_feedback: PageserverFeedback, } impl AppendResponse { @@ -370,7 +370,7 @@ impl AppendResponse { flush_lsn: Lsn(0), commit_lsn: Lsn(0), hs_feedback: HotStandbyFeedback::empty(), - pageserver_feedback: ReplicationFeedback::empty(), + pageserver_feedback: PageserverFeedback::empty(), } } } @@ -708,7 +708,7 @@ where commit_lsn: self.state.commit_lsn, // will be filled by the upper code to avoid bothering safekeeper hs_feedback: HotStandbyFeedback::empty(), - pageserver_feedback: ReplicationFeedback::empty(), + pageserver_feedback: PageserverFeedback::empty(), }; trace!("formed AppendResponse {:?}", ar); ar diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index b533e87c5b..a6ca89efa4 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -11,7 +11,7 @@ use postgres_backend::PostgresBackend; use postgres_backend::{CopyStreamHandlerEnd, PostgresBackendReader, QueryError}; use postgres_ffi::get_current_timestamp; use postgres_ffi::{TimestampTz, MAX_SEND_SIZE}; -use pq_proto::{BeMessage, ReplicationFeedback, WalSndKeepAlive, XLogDataBody}; +use pq_proto::{BeMessage, PageserverFeedback, WalSndKeepAlive, XLogDataBody}; use serde::{Deserialize, Serialize}; use tokio::io::{AsyncRead, AsyncWrite}; @@ -319,11 +319,9 @@ impl ReplyReader { // pageserver sends this. // Note: deserializing is on m[9..] because we skip the tag byte and len bytes. let buf = Bytes::copy_from_slice(&msg[9..]); - let reply = ReplicationFeedback::parse(buf); + let reply = PageserverFeedback::parse(buf); - trace!("ReplicationFeedback is {:?}", reply); - // Only pageserver sends ReplicationFeedback, so set the flag. - // This replica is the source of information to resend to compute. + trace!("PageserverFeedback is {:?}", reply); self.feedback.pageserver_feedback = Some(reply); self.tli diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 931062db1a..9dd8a63cf0 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -4,7 +4,7 @@ use anyhow::{anyhow, bail, Result}; use parking_lot::{Mutex, MutexGuard}; use postgres_ffi::XLogSegNo; -use pq_proto::ReplicationFeedback; +use pq_proto::PageserverFeedback; use serde::Serialize; use std::cmp::{max, min}; use std::path::PathBuf; @@ -91,7 +91,7 @@ pub struct ReplicaState { /// combined hot standby feedback from all replicas pub hs_feedback: HotStandbyFeedback, /// Replication specific feedback received from pageserver, if any - pub pageserver_feedback: Option, + pub pageserver_feedback: Option, } impl Default for ReplicaState { @@ -276,7 +276,7 @@ impl SharedState { // if let Some(pageserver_feedback) = state.pageserver_feedback { if let Some(acc_feedback) = acc.pageserver_feedback { - if acc_feedback.ps_writelsn < pageserver_feedback.ps_writelsn { + if acc_feedback.last_received_lsn < pageserver_feedback.last_received_lsn { warn!("More than one pageserver is streaming WAL for the timeline. Feedback resolving is not fully supported yet."); acc.pageserver_feedback = Some(pageserver_feedback); } @@ -287,12 +287,12 @@ impl SharedState { // last lsn received by pageserver // FIXME if multiple pageservers are streaming WAL, last_received_lsn must be tracked per pageserver. // See https://github.com/neondatabase/neon/issues/1171 - acc.last_received_lsn = Lsn::from(pageserver_feedback.ps_writelsn); + acc.last_received_lsn = Lsn::from(pageserver_feedback.last_received_lsn); // When at least one pageserver has preserved data up to remote_consistent_lsn, // safekeeper is free to delete it, so choose max of all pageservers. acc.remote_consistent_lsn = max( - Lsn::from(pageserver_feedback.ps_applylsn), + Lsn::from(pageserver_feedback.remote_consistent_lsn), acc.remote_consistent_lsn, ); } @@ -585,7 +585,7 @@ impl Timeline { let replica_state = shared_state.replicas[replica_id].unwrap(); let reported_remote_consistent_lsn = replica_state .pageserver_feedback - .map(|f| Lsn(f.ps_applylsn)) + .map(|f| Lsn(f.remote_consistent_lsn)) .unwrap_or(Lsn::INVALID); let stop = shared_state.sk.inmem.commit_lsn == Lsn(0) || // no data at all yet (reported_remote_consistent_lsn!= Lsn::MAX && // Lsn::MAX means that we don't know the latest LSN yet.