From 436178faeedec9f4cf0494db4598db9226a96eb7 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 20 Feb 2025 00:41:52 +0100 Subject: [PATCH] report generation in PageserverFeedback (maybe we don't need this if we include in Offer RPC response?) --- libs/utils/src/pageserver_feedback.rs | 14 +++++++++++++- safekeeper/src/send_wal.rs | 2 +- safekeeper/src/timeline.rs | 3 ++- 3 files changed, 16 insertions(+), 3 deletions(-) diff --git a/libs/utils/src/pageserver_feedback.rs b/libs/utils/src/pageserver_feedback.rs index dede65e699..e49e2ca87a 100644 --- a/libs/utils/src/pageserver_feedback.rs +++ b/libs/utils/src/pageserver_feedback.rs @@ -5,7 +5,7 @@ use pq_proto::{read_cstr, PG_EPOCH}; use serde::{Deserialize, Serialize}; use tracing::{trace, warn}; -use crate::lsn::Lsn; +use crate::{generation::Generation, lsn::Lsn}; /// Feedback pageserver sends to safekeeper and safekeeper resends to compute. /// @@ -32,6 +32,12 @@ pub struct PageserverFeedback { pub replytime: SystemTime, /// Used to track feedbacks from different shards. Always zero for unsharded tenants. pub shard_number: u32, + /// The shard's pageserver-side generation number. + /// Used to track `remote_consistent_lsn` by generation which is required + /// to determine whether + /// - WAL offers still need to be sent + /// - in future: whether WAL can be evicted and/or pruned + pub generation: Generation, } impl PageserverFeedback { @@ -43,6 +49,7 @@ impl PageserverFeedback { disk_consistent_lsn: Lsn::INVALID, replytime: *PG_EPOCH, shard_number: 0, + generation: Generation::none(), } } @@ -101,6 +108,8 @@ impl PageserverFeedback { buf.put_u32(self.shard_number); } + todo!("ps_generation"); + buf[buf_ptr] = nkeys; } @@ -147,6 +156,9 @@ impl PageserverFeedback { assert_eq!(len, 4); rf.shard_number = buf.get_u32(); } + b"ps_generation" => { + todo!(); + } _ => { let len = buf.get_i32(); warn!( diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index 4a4a74a0fd..66584f7814 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -1020,7 +1020,7 @@ impl ReplyReader { .walsenders .record_ps_feedback(self.ws_guard.id, &ps_feedback); self.tli - .update_remote_consistent_lsn(ps_feedback.remote_consistent_lsn) + .update_remote_consistent_lsn(ps_feedback.remote_consistent_lsn, ps_feedback.generation) .await; // in principle new remote_consistent_lsn could allow to // deactivate the timeline, but we check that regularly through diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 4341f13824..c1c87532db 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -11,6 +11,7 @@ use safekeeper_api::models::{ use safekeeper_api::Term; use tokio::fs::{self}; use tokio_util::sync::CancellationToken; +use utils::generation::Generation; use utils::id::TenantId; use utils::sync::gate::Gate; @@ -974,7 +975,7 @@ impl WalResidentTimeline { } /// Update in memory remote consistent lsn. - pub async fn update_remote_consistent_lsn(&self, candidate: Lsn) { + pub async fn update_remote_consistent_lsn(&self, candidate: Lsn, generation: Generation) { let mut shared_state = self.write_shared_state().await; shared_state.sk.state_mut().inmem.remote_consistent_lsn = max( shared_state.sk.state().inmem.remote_consistent_lsn,