diff --git a/libs/pq_proto/src/lib.rs b/libs/pq_proto/src/lib.rs index 53045060c9..8d92352a71 100644 --- a/libs/pq_proto/src/lib.rs +++ b/libs/pq_proto/src/lib.rs @@ -680,7 +680,7 @@ pub struct InterpretedWalRecordsBody<'a> { /// End of raw WAL in [`Self::data`] pub streaming_lsn: u64, /// Current end of WAL on the server - pub safekeeper_wal_end_lsn: u64, + pub commit_lsn: u64, /// Start LSN of the next record in PG WAL. /// Is 0 if the portion of PG WAL did not contain any records. pub next_record_lsn: u64, @@ -1020,7 +1020,7 @@ impl BeMessage<'_> { buf.put_u8(b'0'); // matches INTERPRETED_WAL_RECORD_TAG in postgres-protocol // dependency buf.put_u64(rec.streaming_lsn); - buf.put_u64(rec.safekeeper_wal_end_lsn); + buf.put_u64(rec.commit_lsn); buf.put_u64(rec.next_record_lsn); buf.put_slice(rec.data); }); diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index e60342a014..1a0e66ceb3 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -297,7 +297,7 @@ pub(super) async fn handle_walreceiver_connection( connection_status.latest_wal_update = now; } - connection_status.commit_lsn = Some(Lsn::from(raw.wal_end())); + connection_status.commit_lsn = Some(Lsn::from(raw.commit_lsn())); connection_status.streaming_lsn = Some(Lsn::from(raw.streaming_lsn())); } &_ => {} diff --git a/safekeeper/src/send_interpreted_wal.rs b/safekeeper/src/send_interpreted_wal.rs index d241bdec22..308f7c92ad 100644 --- a/safekeeper/src/send_interpreted_wal.rs +++ b/safekeeper/src/send_interpreted_wal.rs @@ -49,7 +49,7 @@ impl InterpretedWalSender<'_, IO> { tokio::select! { // Get some WAL from the stream and then: decode, interpret and send it wal = stream.next() => { - let WalBytes { wal, wal_start_lsn, wal_end_lsn, available_wal_end_lsn } = match wal { + let WalBytes { wal, wal_start_lsn, wal_end_lsn, commit_lsn } = match wal { Some(some) => some?, None => { break; } }; @@ -92,7 +92,7 @@ impl InterpretedWalSender<'_, IO> { self.pgb .write_message(&BeMessage::InterpretedWalRecords(InterpretedWalRecordsBody { streaming_lsn: wal_end_lsn.0, - safekeeper_wal_end_lsn: available_wal_end_lsn.0, + commit_lsn: commit_lsn.0, next_record_lsn: max_next_record_lsn.unwrap_or(Lsn::INVALID).0, data: buf.as_slice(), })).await?; diff --git a/safekeeper/src/wal_reader_stream.rs b/safekeeper/src/wal_reader_stream.rs index 5c941db663..9a5c0b292a 100644 --- a/safekeeper/src/wal_reader_stream.rs +++ b/safekeeper/src/wal_reader_stream.rs @@ -38,7 +38,7 @@ pub(crate) struct WalBytes { /// End LSN of [`Self::wal`] pub(crate) wal_end_lsn: Lsn, /// End LSN of WAL available on the safekeeper - pub(crate) available_wal_end_lsn: Lsn, + pub(crate) commit_lsn: Lsn, } impl WalReaderStreamBuilder { @@ -135,7 +135,7 @@ impl WalReaderStreamBuilder { wal, wal_start_lsn: start_pos, wal_end_lsn: start_pos + send_size as u64, - available_wal_end_lsn: end_pos + commit_lsn: end_pos }; start_pos += send_size as u64;