diff --git a/libs/pq_proto/src/lib.rs b/libs/pq_proto/src/lib.rs index 656c0ff312..a976e19029 100644 --- a/libs/pq_proto/src/lib.rs +++ b/libs/pq_proto/src/lib.rs @@ -936,35 +936,40 @@ impl<'a> BeMessage<'a> { } } -// Neon extension of postgres replication protocol -// See NEON_STATUS_UPDATE_TAG_BYTE +/// Feedback pageserver sends to safekeeper and safekeeper resends to compute. +/// Serialized in custom flexible key/value format. In replication protocol, it +/// is marked with NEON_STATUS_UPDATE_TAG_BYTE to differentiate from postgres +/// Standby status update / Hot standby feedback messages. #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] -pub struct ReplicationFeedback { - // Last known size of the timeline. Used to enforce timeline size limit. +pub struct PageserverFeedback { + /// Last known size of the timeline. Used to enforce timeline size limit. pub current_timeline_size: u64, - // Parts of StandbyStatusUpdate we resend to compute via safekeeper - pub ps_writelsn: u64, - pub ps_applylsn: u64, - pub ps_flushlsn: u64, - pub ps_replytime: SystemTime, + /// LSN last received and ingested by the pageserver. + pub last_received_lsn: u64, + /// LSN up to which data is persisted by the pageserver to its local disc. + pub disk_consistent_lsn: u64, + /// LSN up to which data is persisted by the pageserver on s3; safekeepers + /// consider WAL before it can be removed. + pub remote_consistent_lsn: u64, + pub replytime: SystemTime, } -// NOTE: Do not forget to increment this number when adding new fields to ReplicationFeedback. +// NOTE: Do not forget to increment this number when adding new fields to PageserverFeedback. // Do not remove previously available fields because this might be backwards incompatible. -pub const REPLICATION_FEEDBACK_FIELDS_NUMBER: u8 = 5; +pub const PAGESERVER_FEEDBACK_FIELDS_NUMBER: u8 = 5; -impl ReplicationFeedback { - pub fn empty() -> ReplicationFeedback { - ReplicationFeedback { +impl PageserverFeedback { + pub fn empty() -> PageserverFeedback { + PageserverFeedback { current_timeline_size: 0, - ps_writelsn: 0, - ps_applylsn: 0, - ps_flushlsn: 0, - ps_replytime: SystemTime::now(), + last_received_lsn: 0, + remote_consistent_lsn: 0, + disk_consistent_lsn: 0, + replytime: SystemTime::now(), } } - // Serialize ReplicationFeedback using custom format + // Serialize PageserverFeedback using custom format // to support protocol extensibility. // // Following layout is used: @@ -974,24 +979,26 @@ impl ReplicationFeedback { // null-terminated string - key, // uint32 - value length in bytes // value itself + // + // TODO: change serialized fields names once all computes migrate to rename. pub fn serialize(&self, buf: &mut BytesMut) { - buf.put_u8(REPLICATION_FEEDBACK_FIELDS_NUMBER); // # of keys + buf.put_u8(PAGESERVER_FEEDBACK_FIELDS_NUMBER); // # of keys buf.put_slice(b"current_timeline_size\0"); buf.put_i32(8); buf.put_u64(self.current_timeline_size); buf.put_slice(b"ps_writelsn\0"); buf.put_i32(8); - buf.put_u64(self.ps_writelsn); + buf.put_u64(self.last_received_lsn); buf.put_slice(b"ps_flushlsn\0"); buf.put_i32(8); - buf.put_u64(self.ps_flushlsn); + buf.put_u64(self.disk_consistent_lsn); buf.put_slice(b"ps_applylsn\0"); buf.put_i32(8); - buf.put_u64(self.ps_applylsn); + buf.put_u64(self.remote_consistent_lsn); let timestamp = self - .ps_replytime + .replytime .duration_since(*PG_EPOCH) .expect("failed to serialize pg_replytime earlier than PG_EPOCH") .as_micros() as i64; @@ -1001,9 +1008,10 @@ impl ReplicationFeedback { buf.put_i64(timestamp); } - // Deserialize ReplicationFeedback message - pub fn parse(mut buf: Bytes) -> ReplicationFeedback { - let mut rf = ReplicationFeedback::empty(); + // Deserialize PageserverFeedback message + // TODO: change serialized fields names once all computes migrate to rename. + pub fn parse(mut buf: Bytes) -> PageserverFeedback { + let mut rf = PageserverFeedback::empty(); let nfields = buf.get_u8(); for _ in 0..nfields { let key = read_cstr(&mut buf).unwrap(); @@ -1016,39 +1024,39 @@ impl ReplicationFeedback { b"ps_writelsn" => { let len = buf.get_i32(); assert_eq!(len, 8); - rf.ps_writelsn = buf.get_u64(); + rf.last_received_lsn = buf.get_u64(); } b"ps_flushlsn" => { let len = buf.get_i32(); assert_eq!(len, 8); - rf.ps_flushlsn = buf.get_u64(); + rf.disk_consistent_lsn = buf.get_u64(); } b"ps_applylsn" => { let len = buf.get_i32(); assert_eq!(len, 8); - rf.ps_applylsn = buf.get_u64(); + rf.remote_consistent_lsn = buf.get_u64(); } b"ps_replytime" => { let len = buf.get_i32(); assert_eq!(len, 8); let raw_time = buf.get_i64(); if raw_time > 0 { - rf.ps_replytime = *PG_EPOCH + Duration::from_micros(raw_time as u64); + rf.replytime = *PG_EPOCH + Duration::from_micros(raw_time as u64); } else { - rf.ps_replytime = *PG_EPOCH - Duration::from_micros(-raw_time as u64); + rf.replytime = *PG_EPOCH - Duration::from_micros(-raw_time as u64); } } _ => { let len = buf.get_i32(); warn!( - "ReplicationFeedback parse. unknown key {} of len {len}. Skip it.", + "PageserverFeedback parse. unknown key {} of len {len}. Skip it.", String::from_utf8_lossy(key.as_ref()) ); buf.advance(len as usize); } } } - trace!("ReplicationFeedback parsed is {:?}", rf); + trace!("PageserverFeedback parsed is {:?}", rf); rf } } @@ -1059,33 +1067,33 @@ mod tests { #[test] fn test_replication_feedback_serialization() { - let mut rf = ReplicationFeedback::empty(); + let mut rf = PageserverFeedback::empty(); // Fill rf with some values rf.current_timeline_size = 12345678; // Set rounded time to be able to compare it with deserialized value, // because it is rounded up to microseconds during serialization. - rf.ps_replytime = *PG_EPOCH + Duration::from_secs(100_000_000); + rf.replytime = *PG_EPOCH + Duration::from_secs(100_000_000); let mut data = BytesMut::new(); rf.serialize(&mut data); - let rf_parsed = ReplicationFeedback::parse(data.freeze()); + let rf_parsed = PageserverFeedback::parse(data.freeze()); assert_eq!(rf, rf_parsed); } #[test] fn test_replication_feedback_unknown_key() { - let mut rf = ReplicationFeedback::empty(); + let mut rf = PageserverFeedback::empty(); // Fill rf with some values rf.current_timeline_size = 12345678; // Set rounded time to be able to compare it with deserialized value, // because it is rounded up to microseconds during serialization. - rf.ps_replytime = *PG_EPOCH + Duration::from_secs(100_000_000); + rf.replytime = *PG_EPOCH + Duration::from_secs(100_000_000); let mut data = BytesMut::new(); rf.serialize(&mut data); // Add an extra field to the buffer and adjust number of keys if let Some(first) = data.first_mut() { - *first = REPLICATION_FEEDBACK_FIELDS_NUMBER + 1; + *first = PAGESERVER_FEEDBACK_FIELDS_NUMBER + 1; } data.put_slice(b"new_field_one\0"); @@ -1093,7 +1101,7 @@ mod tests { data.put_u64(42); // Parse serialized data and check that new field is not parsed - let rf_parsed = ReplicationFeedback::parse(data.freeze()); + let rf_parsed = PageserverFeedback::parse(data.freeze()); assert_eq!(rf, rf_parsed); } diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index 9398a7bee9..ea2f2392ea 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -37,7 +37,7 @@ use crate::{ use postgres_backend::is_expected_io_error; use postgres_connection::PgConnectionConfig; use postgres_ffi::waldecoder::WalStreamDecoder; -use pq_proto::ReplicationFeedback; +use pq_proto::PageserverFeedback; use utils::lsn::Lsn; /// Status of the connection. @@ -319,12 +319,12 @@ pub async fn handle_walreceiver_connection( timeline.get_remote_consistent_lsn().unwrap_or(Lsn(0)); // The last LSN we processed. It is not guaranteed to survive pageserver crash. - let write_lsn = u64::from(last_lsn); + let last_received_lsn = u64::from(last_lsn); // `disk_consistent_lsn` is the LSN at which page server guarantees local persistence of all received data - let flush_lsn = u64::from(timeline.get_disk_consistent_lsn()); + let disk_consistent_lsn = u64::from(timeline.get_disk_consistent_lsn()); // The last LSN that is synced to remote storage and is guaranteed to survive pageserver crash // Used by safekeepers to remove WAL preceding `remote_consistent_lsn`. - let apply_lsn = u64::from(timeline_remote_consistent_lsn); + let remote_consistent_lsn = u64::from(timeline_remote_consistent_lsn); let ts = SystemTime::now(); // Update the status about what we just received. This is shown in the mgmt API. @@ -343,12 +343,12 @@ pub async fn handle_walreceiver_connection( let (timeline_logical_size, _) = timeline .get_current_logical_size(&ctx) .context("Status update creation failed to get current logical size")?; - let status_update = ReplicationFeedback { + let status_update = PageserverFeedback { current_timeline_size: timeline_logical_size, - ps_writelsn: write_lsn, - ps_flushlsn: flush_lsn, - ps_applylsn: apply_lsn, - ps_replytime: ts, + last_received_lsn, + disk_consistent_lsn, + remote_consistent_lsn, + replytime: ts, }; debug!("neon_status_update {status_update:?}"); diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index b0b2a23e3c..45037a8c01 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -1872,9 +1872,9 @@ RecvAppendResponses(Safekeeper *sk) return sk->state == SS_ACTIVE; } -/* Parse a ReplicationFeedback message, or the ReplicationFeedback part of an AppendResponse */ +/* Parse a PageserverFeedback message, or the PageserverFeedback part of an AppendResponse */ void -ParseReplicationFeedbackMessage(StringInfo reply_message, ReplicationFeedback * rf) +ParsePageserverFeedbackMessage(StringInfo reply_message, PageserverFeedback * rf) { uint8 nkeys; int i; @@ -1892,45 +1892,45 @@ ParseReplicationFeedbackMessage(StringInfo reply_message, ReplicationFeedback * pq_getmsgint(reply_message, sizeof(int32)); /* read value length */ rf->currentClusterSize = pq_getmsgint64(reply_message); - elog(DEBUG2, "ParseReplicationFeedbackMessage: current_timeline_size %lu", + elog(DEBUG2, "ParsePageserverFeedbackMessage: current_timeline_size %lu", rf->currentClusterSize); } - else if (strcmp(key, "ps_writelsn") == 0) + else if ((strcmp(key, "ps_writelsn") == 0) || (strcmp(key, "last_received_lsn") == 0)) { pq_getmsgint(reply_message, sizeof(int32)); /* read value length */ - rf->ps_writelsn = pq_getmsgint64(reply_message); - elog(DEBUG2, "ParseReplicationFeedbackMessage: ps_writelsn %X/%X", - LSN_FORMAT_ARGS(rf->ps_writelsn)); + rf->last_received_lsn = pq_getmsgint64(reply_message); + elog(DEBUG2, "ParsePageserverFeedbackMessage: last_received_lsn %X/%X", + LSN_FORMAT_ARGS(rf->last_received_lsn)); } - else if (strcmp(key, "ps_flushlsn") == 0) + else if ((strcmp(key, "ps_flushlsn") == 0) || (strcmp(key, "disk_consistent_lsn") == 0)) { pq_getmsgint(reply_message, sizeof(int32)); /* read value length */ - rf->ps_flushlsn = pq_getmsgint64(reply_message); - elog(DEBUG2, "ParseReplicationFeedbackMessage: ps_flushlsn %X/%X", - LSN_FORMAT_ARGS(rf->ps_flushlsn)); + rf->disk_consistent_lsn = pq_getmsgint64(reply_message); + elog(DEBUG2, "ParsePageserverFeedbackMessage: disk_consistent_lsn %X/%X", + LSN_FORMAT_ARGS(rf->disk_consistent_lsn)); } - else if (strcmp(key, "ps_applylsn") == 0) + else if ((strcmp(key, "ps_applylsn") == 0) || (strcmp(key, "remote_consistent_lsn") == 0)) { pq_getmsgint(reply_message, sizeof(int32)); /* read value length */ - rf->ps_applylsn = pq_getmsgint64(reply_message); - elog(DEBUG2, "ParseReplicationFeedbackMessage: ps_applylsn %X/%X", - LSN_FORMAT_ARGS(rf->ps_applylsn)); + rf->remote_consistent_lsn = pq_getmsgint64(reply_message); + elog(DEBUG2, "ParsePageserverFeedbackMessage: remote_consistent_lsn %X/%X", + LSN_FORMAT_ARGS(rf->remote_consistent_lsn)); } - else if (strcmp(key, "ps_replytime") == 0) + else if ((strcmp(key, "ps_replytime") == 0) || (strcmp(key, "replytime") == 0)) { pq_getmsgint(reply_message, sizeof(int32)); /* read value length */ - rf->ps_replytime = pq_getmsgint64(reply_message); + rf->replytime = pq_getmsgint64(reply_message); { char *replyTimeStr; /* Copy because timestamptz_to_str returns a static buffer */ - replyTimeStr = pstrdup(timestamptz_to_str(rf->ps_replytime)); - elog(DEBUG2, "ParseReplicationFeedbackMessage: ps_replytime %lu reply_time: %s", - rf->ps_replytime, replyTimeStr); + replyTimeStr = pstrdup(timestamptz_to_str(rf->replytime)); + elog(DEBUG2, "ParsePageserverFeedbackMessage: replytime %lu reply_time: %s", + rf->replytime, replyTimeStr); pfree(replyTimeStr); } @@ -1944,7 +1944,7 @@ ParseReplicationFeedbackMessage(StringInfo reply_message, ReplicationFeedback * * Skip unknown keys to support backward compatibile protocol * changes */ - elog(LOG, "ParseReplicationFeedbackMessage: unknown key: %s len %d", key, len); + elog(LOG, "ParsePageserverFeedbackMessage: unknown key: %s len %d", key, len); pq_getmsgbytes(reply_message, len); }; } @@ -2024,7 +2024,7 @@ GetAcknowledgedByQuorumWALPosition(void) } /* - * ReplicationFeedbackShmemSize --- report amount of shared memory space needed + * WalproposerShmemSize --- report amount of shared memory space needed */ Size WalproposerShmemSize(void) @@ -2054,10 +2054,10 @@ WalproposerShmemInit(void) } void -replication_feedback_set(ReplicationFeedback * rf) +replication_feedback_set(PageserverFeedback * rf) { SpinLockAcquire(&walprop_shared->mutex); - memcpy(&walprop_shared->feedback, rf, sizeof(ReplicationFeedback)); + memcpy(&walprop_shared->feedback, rf, sizeof(PageserverFeedback)); SpinLockRelease(&walprop_shared->mutex); } @@ -2065,43 +2065,43 @@ void replication_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRecPtr *applyLsn) { SpinLockAcquire(&walprop_shared->mutex); - *writeLsn = walprop_shared->feedback.ps_writelsn; - *flushLsn = walprop_shared->feedback.ps_flushlsn; - *applyLsn = walprop_shared->feedback.ps_applylsn; + *writeLsn = walprop_shared->feedback.last_received_lsn; + *flushLsn = walprop_shared->feedback.disk_consistent_lsn; + *applyLsn = walprop_shared->feedback.remote_consistent_lsn; SpinLockRelease(&walprop_shared->mutex); } /* - * Get ReplicationFeedback fields from the most advanced safekeeper + * Get PageserverFeedback fields from the most advanced safekeeper */ static void -GetLatestNeonFeedback(ReplicationFeedback * rf) +GetLatestNeonFeedback(PageserverFeedback * rf) { int latest_safekeeper = 0; - XLogRecPtr ps_writelsn = InvalidXLogRecPtr; + XLogRecPtr last_received_lsn = InvalidXLogRecPtr; for (int i = 0; i < n_safekeepers; i++) { - if (safekeeper[i].appendResponse.rf.ps_writelsn > ps_writelsn) + if (safekeeper[i].appendResponse.rf.last_received_lsn > last_received_lsn) { latest_safekeeper = i; - ps_writelsn = safekeeper[i].appendResponse.rf.ps_writelsn; + last_received_lsn = safekeeper[i].appendResponse.rf.last_received_lsn; } } rf->currentClusterSize = safekeeper[latest_safekeeper].appendResponse.rf.currentClusterSize; - rf->ps_writelsn = safekeeper[latest_safekeeper].appendResponse.rf.ps_writelsn; - rf->ps_flushlsn = safekeeper[latest_safekeeper].appendResponse.rf.ps_flushlsn; - rf->ps_applylsn = safekeeper[latest_safekeeper].appendResponse.rf.ps_applylsn; - rf->ps_replytime = safekeeper[latest_safekeeper].appendResponse.rf.ps_replytime; + rf->last_received_lsn = safekeeper[latest_safekeeper].appendResponse.rf.last_received_lsn; + rf->disk_consistent_lsn = safekeeper[latest_safekeeper].appendResponse.rf.disk_consistent_lsn; + rf->remote_consistent_lsn = safekeeper[latest_safekeeper].appendResponse.rf.remote_consistent_lsn; + rf->replytime = safekeeper[latest_safekeeper].appendResponse.rf.replytime; elog(DEBUG2, "GetLatestNeonFeedback: currentClusterSize %lu," - " ps_writelsn %X/%X, ps_flushlsn %X/%X, ps_applylsn %X/%X, ps_replytime %lu", + " last_received_lsn %X/%X, disk_consistent_lsn %X/%X, remote_consistent_lsn %X/%X, replytime %lu", rf->currentClusterSize, - LSN_FORMAT_ARGS(rf->ps_writelsn), - LSN_FORMAT_ARGS(rf->ps_flushlsn), - LSN_FORMAT_ARGS(rf->ps_applylsn), - rf->ps_replytime); + LSN_FORMAT_ARGS(rf->last_received_lsn), + LSN_FORMAT_ARGS(rf->disk_consistent_lsn), + LSN_FORMAT_ARGS(rf->remote_consistent_lsn), + rf->replytime); replication_feedback_set(rf); } @@ -2115,16 +2115,16 @@ HandleSafekeeperResponse(void) XLogRecPtr minFlushLsn; minQuorumLsn = GetAcknowledgedByQuorumWALPosition(); - diskConsistentLsn = quorumFeedback.rf.ps_flushlsn; + diskConsistentLsn = quorumFeedback.rf.disk_consistent_lsn; if (!syncSafekeepers) { - /* Get ReplicationFeedback fields from the most advanced safekeeper */ + /* Get PageserverFeedback fields from the most advanced safekeeper */ GetLatestNeonFeedback(&quorumFeedback.rf); SetZenithCurrentClusterSize(quorumFeedback.rf.currentClusterSize); } - if (minQuorumLsn > quorumFeedback.flushLsn || diskConsistentLsn != quorumFeedback.rf.ps_flushlsn) + if (minQuorumLsn > quorumFeedback.flushLsn || diskConsistentLsn != quorumFeedback.rf.disk_consistent_lsn) { if (minQuorumLsn > quorumFeedback.flushLsn) @@ -2142,7 +2142,7 @@ HandleSafekeeperResponse(void) * apply_lsn - This is what processed and durably saved at* * pageserver. */ - quorumFeedback.rf.ps_flushlsn, + quorumFeedback.rf.disk_consistent_lsn, GetCurrentTimestamp(), false); } @@ -2326,7 +2326,7 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage * anymsg) msg->hs.xmin.value = pq_getmsgint64_le(&s); msg->hs.catalog_xmin.value = pq_getmsgint64_le(&s); if (buf_size > APPENDRESPONSE_FIXEDPART_SIZE) - ParseReplicationFeedbackMessage(&s, &msg->rf); + ParsePageserverFeedbackMessage(&s, &msg->rf); pq_getmsgend(&s); return true; } @@ -2462,7 +2462,7 @@ backpressure_lag_impl(void) replication_feedback_get_lsns(&writePtr, &flushPtr, &applyPtr); #define MB ((XLogRecPtr)1024 * 1024) - elog(DEBUG2, "current flushLsn %X/%X ReplicationFeedback: write %X/%X flush %X/%X apply %X/%X", + elog(DEBUG2, "current flushLsn %X/%X PageserverFeedback: write %X/%X flush %X/%X apply %X/%X", LSN_FORMAT_ARGS(myFlushLsn), LSN_FORMAT_ARGS(writePtr), LSN_FORMAT_ARGS(flushPtr), diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h index 537c733850..f016a229eb 100644 --- a/pgxn/neon/walproposer.h +++ b/pgxn/neon/walproposer.h @@ -280,21 +280,21 @@ typedef struct HotStandbyFeedback FullTransactionId catalog_xmin; } HotStandbyFeedback; -typedef struct ReplicationFeedback +typedef struct PageserverFeedback { /* current size of the timeline on pageserver */ uint64 currentClusterSize; /* standby_status_update fields that safekeeper received from pageserver */ - XLogRecPtr ps_writelsn; - XLogRecPtr ps_flushlsn; - XLogRecPtr ps_applylsn; - TimestampTz ps_replytime; -} ReplicationFeedback; + XLogRecPtr last_received_lsn; + XLogRecPtr disk_consistent_lsn; + XLogRecPtr remote_consistent_lsn; + TimestampTz replytime; +} PageserverFeedback; typedef struct WalproposerShmemState { slock_t mutex; - ReplicationFeedback feedback; + PageserverFeedback feedback; term_t mineLastElectedTerm; pg_atomic_uint64 backpressureThrottlingTime; } WalproposerShmemState; @@ -320,10 +320,10 @@ typedef struct AppendResponse /* Feedback recieved from pageserver includes standby_status_update fields */ /* and custom neon feedback. */ /* This part of the message is extensible. */ - ReplicationFeedback rf; + PageserverFeedback rf; } AppendResponse; -/* ReplicationFeedback is extensible part of the message that is parsed separately */ +/* PageserverFeedback is extensible part of the message that is parsed separately */ /* Other fields are fixed part */ #define APPENDRESPONSE_FIXEDPART_SIZE offsetof(AppendResponse, rf) @@ -383,13 +383,13 @@ extern void WalProposerSync(int argc, char *argv[]); extern void WalProposerMain(Datum main_arg); extern void WalProposerBroadcast(XLogRecPtr startpos, XLogRecPtr endpos); extern void WalProposerPoll(void); -extern void ParseReplicationFeedbackMessage(StringInfo reply_message, - ReplicationFeedback *rf); +extern void ParsePageserverFeedbackMessage(StringInfo reply_message, + PageserverFeedback *rf); extern void StartProposerReplication(StartReplicationCmd *cmd); extern Size WalproposerShmemSize(void); extern bool WalproposerShmemInit(void); -extern void replication_feedback_set(ReplicationFeedback *rf); +extern void replication_feedback_set(PageserverFeedback *rf); extern void replication_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRecPtr *applyLsn); /* libpqwalproposer hooks & helper type */ diff --git a/safekeeper/src/metrics.rs b/safekeeper/src/metrics.rs index c3077b6dc5..2aaa17bfc5 100644 --- a/safekeeper/src/metrics.rs +++ b/safekeeper/src/metrics.rs @@ -255,7 +255,7 @@ pub struct TimelineCollector { epoch_start_lsn: GenericGaugeVec, peer_horizon_lsn: GenericGaugeVec, remote_consistent_lsn: GenericGaugeVec, - feedback_ps_write_lsn: GenericGaugeVec, + ps_last_received_lsn: GenericGaugeVec, feedback_last_time_seconds: GenericGaugeVec, timeline_active: GenericGaugeVec, wal_backup_active: GenericGaugeVec, @@ -339,15 +339,15 @@ impl TimelineCollector { .unwrap(); descs.extend(remote_consistent_lsn.desc().into_iter().cloned()); - let feedback_ps_write_lsn = GenericGaugeVec::new( + let ps_last_received_lsn = GenericGaugeVec::new( Opts::new( - "safekeeper_feedback_ps_write_lsn", + "safekeeper_ps_last_received_lsn", "Last LSN received by the pageserver, acknowledged in the feedback", ), &["tenant_id", "timeline_id"], ) .unwrap(); - descs.extend(feedback_ps_write_lsn.desc().into_iter().cloned()); + descs.extend(ps_last_received_lsn.desc().into_iter().cloned()); let feedback_last_time_seconds = GenericGaugeVec::new( Opts::new( @@ -458,7 +458,7 @@ impl TimelineCollector { epoch_start_lsn, peer_horizon_lsn, remote_consistent_lsn, - feedback_ps_write_lsn, + ps_last_received_lsn, feedback_last_time_seconds, timeline_active, wal_backup_active, @@ -489,7 +489,7 @@ impl Collector for TimelineCollector { self.epoch_start_lsn.reset(); self.peer_horizon_lsn.reset(); self.remote_consistent_lsn.reset(); - self.feedback_ps_write_lsn.reset(); + self.ps_last_received_lsn.reset(); self.feedback_last_time_seconds.reset(); self.timeline_active.reset(); self.wal_backup_active.reset(); @@ -514,11 +514,11 @@ impl Collector for TimelineCollector { let timeline_id = tli.ttid.timeline_id.to_string(); let labels = &[tenant_id.as_str(), timeline_id.as_str()]; - let mut most_advanced: Option = None; + let mut most_advanced: Option = None; for replica in tli.replicas.iter() { if let Some(replica_feedback) = replica.pageserver_feedback { if let Some(current) = most_advanced { - if current.ps_writelsn < replica_feedback.ps_writelsn { + if current.last_received_lsn < replica_feedback.last_received_lsn { most_advanced = Some(replica_feedback); } } else { @@ -568,11 +568,10 @@ impl Collector for TimelineCollector { .set(tli.wal_storage.flush_wal_seconds); if let Some(feedback) = most_advanced { - self.feedback_ps_write_lsn + self.ps_last_received_lsn .with_label_values(labels) - .set(feedback.ps_writelsn); - if let Ok(unix_time) = feedback.ps_replytime.duration_since(SystemTime::UNIX_EPOCH) - { + .set(feedback.last_received_lsn); + if let Ok(unix_time) = feedback.replytime.duration_since(SystemTime::UNIX_EPOCH) { self.feedback_last_time_seconds .with_label_values(labels) .set(unix_time.as_secs()); @@ -599,7 +598,7 @@ impl Collector for TimelineCollector { mfs.extend(self.epoch_start_lsn.collect()); mfs.extend(self.peer_horizon_lsn.collect()); mfs.extend(self.remote_consistent_lsn.collect()); - mfs.extend(self.feedback_ps_write_lsn.collect()); + mfs.extend(self.ps_last_received_lsn.collect()); mfs.extend(self.feedback_last_time_seconds.collect()); mfs.extend(self.timeline_active.collect()); mfs.extend(self.wal_backup_active.collect()); diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index d8fe36d7f8..10b4842cbd 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -18,7 +18,7 @@ use crate::control_file; use crate::send_wal::HotStandbyFeedback; use crate::wal_storage; -use pq_proto::{ReplicationFeedback, SystemId}; +use pq_proto::{PageserverFeedback, SystemId}; use utils::{ bin_ser::LeSer, id::{NodeId, TenantId, TenantTimelineId, TimelineId}, @@ -360,7 +360,7 @@ pub struct AppendResponse { // a criterion for walproposer --sync mode exit pub commit_lsn: Lsn, pub hs_feedback: HotStandbyFeedback, - pub pageserver_feedback: ReplicationFeedback, + pub pageserver_feedback: PageserverFeedback, } impl AppendResponse { @@ -370,7 +370,7 @@ impl AppendResponse { flush_lsn: Lsn(0), commit_lsn: Lsn(0), hs_feedback: HotStandbyFeedback::empty(), - pageserver_feedback: ReplicationFeedback::empty(), + pageserver_feedback: PageserverFeedback::empty(), } } } @@ -708,7 +708,7 @@ where commit_lsn: self.state.commit_lsn, // will be filled by the upper code to avoid bothering safekeeper hs_feedback: HotStandbyFeedback::empty(), - pageserver_feedback: ReplicationFeedback::empty(), + pageserver_feedback: PageserverFeedback::empty(), }; trace!("formed AppendResponse {:?}", ar); ar diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index b533e87c5b..a6ca89efa4 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -11,7 +11,7 @@ use postgres_backend::PostgresBackend; use postgres_backend::{CopyStreamHandlerEnd, PostgresBackendReader, QueryError}; use postgres_ffi::get_current_timestamp; use postgres_ffi::{TimestampTz, MAX_SEND_SIZE}; -use pq_proto::{BeMessage, ReplicationFeedback, WalSndKeepAlive, XLogDataBody}; +use pq_proto::{BeMessage, PageserverFeedback, WalSndKeepAlive, XLogDataBody}; use serde::{Deserialize, Serialize}; use tokio::io::{AsyncRead, AsyncWrite}; @@ -319,11 +319,9 @@ impl ReplyReader { // pageserver sends this. // Note: deserializing is on m[9..] because we skip the tag byte and len bytes. let buf = Bytes::copy_from_slice(&msg[9..]); - let reply = ReplicationFeedback::parse(buf); + let reply = PageserverFeedback::parse(buf); - trace!("ReplicationFeedback is {:?}", reply); - // Only pageserver sends ReplicationFeedback, so set the flag. - // This replica is the source of information to resend to compute. + trace!("PageserverFeedback is {:?}", reply); self.feedback.pageserver_feedback = Some(reply); self.tli diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 931062db1a..9dd8a63cf0 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -4,7 +4,7 @@ use anyhow::{anyhow, bail, Result}; use parking_lot::{Mutex, MutexGuard}; use postgres_ffi::XLogSegNo; -use pq_proto::ReplicationFeedback; +use pq_proto::PageserverFeedback; use serde::Serialize; use std::cmp::{max, min}; use std::path::PathBuf; @@ -91,7 +91,7 @@ pub struct ReplicaState { /// combined hot standby feedback from all replicas pub hs_feedback: HotStandbyFeedback, /// Replication specific feedback received from pageserver, if any - pub pageserver_feedback: Option, + pub pageserver_feedback: Option, } impl Default for ReplicaState { @@ -276,7 +276,7 @@ impl SharedState { // if let Some(pageserver_feedback) = state.pageserver_feedback { if let Some(acc_feedback) = acc.pageserver_feedback { - if acc_feedback.ps_writelsn < pageserver_feedback.ps_writelsn { + if acc_feedback.last_received_lsn < pageserver_feedback.last_received_lsn { warn!("More than one pageserver is streaming WAL for the timeline. Feedback resolving is not fully supported yet."); acc.pageserver_feedback = Some(pageserver_feedback); } @@ -287,12 +287,12 @@ impl SharedState { // last lsn received by pageserver // FIXME if multiple pageservers are streaming WAL, last_received_lsn must be tracked per pageserver. // See https://github.com/neondatabase/neon/issues/1171 - acc.last_received_lsn = Lsn::from(pageserver_feedback.ps_writelsn); + acc.last_received_lsn = Lsn::from(pageserver_feedback.last_received_lsn); // When at least one pageserver has preserved data up to remote_consistent_lsn, // safekeeper is free to delete it, so choose max of all pageservers. acc.remote_consistent_lsn = max( - Lsn::from(pageserver_feedback.ps_applylsn), + Lsn::from(pageserver_feedback.remote_consistent_lsn), acc.remote_consistent_lsn, ); } @@ -585,7 +585,7 @@ impl Timeline { let replica_state = shared_state.replicas[replica_id].unwrap(); let reported_remote_consistent_lsn = replica_state .pageserver_feedback - .map(|f| Lsn(f.ps_applylsn)) + .map(|f| Lsn(f.remote_consistent_lsn)) .unwrap_or(Lsn::INVALID); let stop = shared_state.sk.inmem.commit_lsn == Lsn(0) || // no data at all yet (reported_remote_consistent_lsn!= Lsn::MAX && // Lsn::MAX means that we don't know the latest LSN yet.