report generation in PageserverFeedback (maybe we don't need this if we include in Offer RPC response?)

This commit is contained in:
Christian Schwarz
2025-02-20 00:41:52 +01:00
parent 84bbe87d60
commit 436178faee
3 changed files with 16 additions and 3 deletions

View File

@@ -5,7 +5,7 @@ use pq_proto::{read_cstr, PG_EPOCH};
use serde::{Deserialize, Serialize};
use tracing::{trace, warn};
use crate::lsn::Lsn;
use crate::{generation::Generation, lsn::Lsn};
/// Feedback pageserver sends to safekeeper and safekeeper resends to compute.
///
@@ -32,6 +32,12 @@ pub struct PageserverFeedback {
pub replytime: SystemTime,
/// Used to track feedbacks from different shards. Always zero for unsharded tenants.
pub shard_number: u32,
/// The shard's pageserver-side generation number.
/// Used to track `remote_consistent_lsn` by generation which is required
/// to determine whether
/// - WAL offers still need to be sent
/// - in future: whether WAL can be evicted and/or pruned
pub generation: Generation,
}
impl PageserverFeedback {
@@ -43,6 +49,7 @@ impl PageserverFeedback {
disk_consistent_lsn: Lsn::INVALID,
replytime: *PG_EPOCH,
shard_number: 0,
generation: Generation::none(),
}
}
@@ -101,6 +108,8 @@ impl PageserverFeedback {
buf.put_u32(self.shard_number);
}
todo!("ps_generation");
buf[buf_ptr] = nkeys;
}
@@ -147,6 +156,9 @@ impl PageserverFeedback {
assert_eq!(len, 4);
rf.shard_number = buf.get_u32();
}
b"ps_generation" => {
todo!();
}
_ => {
let len = buf.get_i32();
warn!(

View File

@@ -1020,7 +1020,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
.walsenders
.record_ps_feedback(self.ws_guard.id, &ps_feedback);
self.tli
.update_remote_consistent_lsn(ps_feedback.remote_consistent_lsn)
.update_remote_consistent_lsn(ps_feedback.remote_consistent_lsn, ps_feedback.generation)
.await;
// in principle new remote_consistent_lsn could allow to
// deactivate the timeline, but we check that regularly through

View File

@@ -11,6 +11,7 @@ use safekeeper_api::models::{
use safekeeper_api::Term;
use tokio::fs::{self};
use tokio_util::sync::CancellationToken;
use utils::generation::Generation;
use utils::id::TenantId;
use utils::sync::gate::Gate;
@@ -974,7 +975,7 @@ impl WalResidentTimeline {
}
/// Update in memory remote consistent lsn.
pub async fn update_remote_consistent_lsn(&self, candidate: Lsn) {
pub async fn update_remote_consistent_lsn(&self, candidate: Lsn, generation: Generation) {
let mut shared_state = self.write_shared_state().await;
shared_state.sk.state_mut().inmem.remote_consistent_lsn = max(
shared_state.sk.state().inmem.remote_consistent_lsn,