undo the remote_consistent_lsn feedback channel brought in by the PoC merge (includes undoing the funneling of the pageserver_generation field via connection options)

Christian Schwarz
2025-06-02 12:00:39 +02:00
parent 39039d1be7
commit 18a43eeab3
7 changed files with 1 addition and 63 deletions

View File

@@ -46,8 +46,6 @@ pub struct ConnectionConfigArgs<'a> {
     pub auth_token: Option<&'a str>,
     pub availability_zone: Option<&'a str>,
-    pub pageserver_generation: Option<u32>,
 }
 
 impl<'a> ConnectionConfigArgs<'a> {
@@ -74,10 +72,6 @@ impl<'a> ConnectionConfigArgs<'a> {
             ));
         }
-        if let Some(pageserver_generation) = self.pageserver_generation {
-            options.push(format!("pageserver_generation={pageserver_generation}"));
-        }
         options
     }
 }
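
For readers outside the codebase: the two hunks above delete the sending half of the option funnel, where optional connection parameters are serialized as `key=value` strings destined for the libpq `options` startup parameter. Below is a minimal, self-contained sketch of that pattern; the struct and field names mirror the diff, but the surrounding API is illustrative, not the real neon one.

// Hypothetical, minimal sketch of the option-funneling pattern deleted above.
struct ConnectionConfigArgs<'a> {
    availability_zone: Option<&'a str>,
    pageserver_generation: Option<u32>,
}

impl<'a> ConnectionConfigArgs<'a> {
    fn options(&self) -> Vec<String> {
        let mut options = Vec::new();
        if let Some(az) = self.availability_zone {
            options.push(format!("availability_zone={az}"));
        }
        // The field this commit removes from the real struct.
        if let Some(generation) = self.pageserver_generation {
            options.push(format!("pageserver_generation={generation}"));
        }
        options
    }
}

fn main() {
    let args = ConnectionConfigArgs {
        availability_zone: Some("eu-central-1a"),
        pageserver_generation: Some(7),
    };
    // e.g. "availability_zone=eu-central-1a pageserver_generation=7"
    println!("{}", args.options().join(" "));
}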

View File

@@ -880,8 +880,6 @@ impl ConnectionManagerState {
             listen_pg_addr_str: info.safekeeper_connstr.as_ref(),
             auth_token: self.conf.auth_token.as_ref().map(|t| t.as_str()),
             availability_zone: self.conf.availability_zone.as_deref(),
-            // TODO: do we still have the emergency mode that runs without generations? If so, this expect would panic in that mode.
-            pageserver_generation: Some(self.timeline.generation.into().expect("attachments always have a generation number nowadays")),
         };
         match wal_stream_connection_config(connection_conf_args) {
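
The deleted line converts the timeline's generation into a concrete `u32` via `.into().expect(...)`; the TODO notes this would panic in an emergency mode that runs without generations. A hedged stand-in for that conversion, with a made-up `Generation` enum (the real neon type differs):

// Hedged stand-in for the deleted call site's Generation handling.
#[derive(Clone, Copy)]
enum Generation {
    None, // e.g. an attachment without a generation number
    Valid(u32),
}

impl From<Generation> for Option<u32> {
    fn from(g: Generation) -> Self {
        match g {
            Generation::None => None,
            Generation::Valid(n) => Some(n),
        }
    }
}

fn main() {
    let generation = Generation::Valid(3);
    // Mirrors the deleted line: this panics if the emergency mode ever
    // hands us Generation::None.
    let n: u32 = Option::<u32>::from(generation)
        .expect("attachments always have a generation number nowadays");
    println!("pageserver_generation={n}");
}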

View File

@@ -38,7 +38,6 @@ pub struct SafekeeperPostgresHandler {
     pub timeline_id: Option<TimelineId>,
     pub ttid: TenantTimelineId,
     pub shard: Option<ShardIdentity>,
-    pub pageserver_generation: Option<Generation>,
     pub protocol: Option<PostgresClientProtocol>,
     /// Unique connection id is logged in spans for observability.
     pub conn_id: ConnectionId,
@@ -161,7 +160,6 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
         let mut shard_count: Option<u8> = None;
         let mut shard_number: Option<u8> = None;
         let mut shard_stripe_size: Option<u32> = None;
-        let mut pageserver_generation: Option<Generation> = None;
         for opt in options {
             // FIXME `ztenantid` and `ztimelineid` left for compatibility during deploy,
@@ -204,12 +202,6 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
                         format!("Failed to parse {value} as shard stripe size")
                     })?);
                 }
-                Some(("pageserver_generation", value)) => {
-                    self.pageserver_generation =
-                        Some(value.parse::<u32>().map(Generation::new).with_context(
-                            || format!("Failed to parse {value} as generation"),
-                        )?);
-                }
                 _ => continue,
             }
         }
@@ -268,12 +260,6 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
                 tracing::Span::current().record("shard", tracing::field::display(slug));
             }
         }
-        if let Some(pageserver_generation) = self.pageserver_generation {
-            tracing::Span::current().record(
-                "pageserver_generation",
-                tracing::field::display(pageserver_generation.get_suffix()),
-            );
-        }
         Ok(())
     } else {
@@ -385,7 +371,6 @@ impl SafekeeperPostgresHandler {
             timeline_id: None,
             ttid: TenantTimelineId::empty(),
             shard: None,
-            pageserver_generation: None,
             protocol: None,
             conn_id,
             claims: None,
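
These hunks are the receiving side of the funnel: the safekeeper splits the startup `options` string back into key/value pairs and parses `pageserver_generation` out of it. A minimal sketch of that parsing loop, with a stand-in `Generation` newtype rather than the real neon type:

// Hedged sketch of the option parsing torn out above.
#[derive(Debug, Clone, Copy)]
struct Generation(u32);

fn parse_pageserver_generation(options: &str) -> Result<Option<Generation>, String> {
    for opt in options.split_whitespace() {
        match opt.split_once('=') {
            Some(("pageserver_generation", value)) => {
                let n: u32 = value
                    .parse()
                    .map_err(|e| format!("Failed to parse {value} as generation: {e}"))?;
                return Ok(Some(Generation(n)));
            }
            _ => continue,
        }
    }
    Ok(None) // client did not pass the option
}

fn main() {
    let opts = "shard_count=4 shard_number=0 pageserver_generation=7";
    println!("{:?}", parse_pageserver_generation(opts)); // Ok(Some(Generation(7)))
}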

View File

@@ -360,7 +360,6 @@ async fn recovery_stream(
         listen_pg_addr_str: &donor.pg_connstr,
         auth_token: None,
         availability_zone: None,
-        pageserver_generation: None,
     };
     let cfg = wal_stream_connection_config(connection_conf_args)?;
     let mut cfg = cfg.to_tokio_postgres_config();

View File

@@ -658,29 +658,10 @@ impl SafekeeperPostgresHandler {
         let tli_cancel = tli.cancel.clone();
-        let wal_advertiser = match (self.shard, self.pageserver_generation) {
-            (Some(shard), Some(pageserver_generation)) => {
-                Some(tli.wal_advertiser.get_pageserver_timeline(
-                    self.ttid,
-                    shard.shard_index(),
-                    pageserver_generation,
-                ))
-            }
-            (shard, pageserver_generation) => {
-                debug!(
-                    ?shard,
-                    ?pageserver_generation,
-                    "cannot feedback last_record_lsn to wal_advertiser subsystem, client must specify shard and pageserver_generation"
-                );
-                None
-            }
-        };
         let mut reply_reader = ReplyReader {
             reader,
             ws_guard: ws_guard.clone(),
             tli,
-            wal_advertiser,
         };
         let res = tokio::select! {
@@ -997,7 +978,6 @@ struct ReplyReader<IO> {
     reader: PostgresBackendReader<IO>,
     ws_guard: Arc<WalSenderGuard>,
     tli: WalResidentTimeline,
-    wal_advertiser: Option<Arc<wal_advertiser::advmap::PageserverTimeline>>,
 }
 
 impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
@@ -1044,9 +1024,6 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
             self.tli
                 .update_remote_consistent_lsn(ps_feedback.remote_consistent_lsn)
                 .await;
-            if let Some(wal_advertiser) = &self.wal_advertiser {
-                wal_advertiser.update_remote_consistent_lsn(ps_feedback.remote_consistent_lsn);
-            }
             // in principle, a new remote_consistent_lsn could allow us to
             // deactivate the timeline, but we check that regularly through
             // broker updates; no need to do it here
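
This hunk removes the second consumer of pageserver feedback: `remote_consistent_lsn` keeps flowing into the timeline, but no longer into the advertiser. The essential property of such a sink is that stale feedback must never move the LSN backwards. A sketch of that monotonic update, using a plain `AtomicU64` in place of the real `Lsn` and timeline types:

// Hedged sketch of the feedback sink deleted above: a monotonic ratchet.
use std::sync::atomic::{AtomicU64, Ordering};

struct PageserverTimeline {
    remote_consistent_lsn: AtomicU64,
}

impl PageserverTimeline {
    // Monotonically ratchet the advertised LSN forward.
    fn update_remote_consistent_lsn(&self, lsn: u64) {
        self.remote_consistent_lsn.fetch_max(lsn, Ordering::Relaxed);
    }
}

fn main() {
    let tl = PageserverTimeline { remote_consistent_lsn: AtomicU64::new(0) };
    tl.update_remote_consistent_lsn(0x1000);
    tl.update_remote_consistent_lsn(0x0800); // stale feedback: no regression
    assert_eq!(tl.remote_consistent_lsn.load(Ordering::Relaxed), 0x1000);
}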

View File

@@ -60,19 +60,4 @@ impl SafekeeperTimeline {
     pub fn update_commit_lsn(&self, commit_lsn: Lsn) {
         todo!()
     }
-    pub fn get_pageserver_timeline(
-        &self,
-        ttld: TenantTimelineId,
-        shard: ShardIndex,
-        pageserver_generation: Generation,
-    ) -> Arc<PageserverTimeline> {
-        assert!(!pageserver_generation.is_none());
-        todo!()
-    }
 }
-
-impl PageserverTimeline {
-    pub fn update_remote_consistent_lsn(&self, lsn: Lsn) -> anyhow::Result<()> {
-        todo!()
-    }
-}
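
The deleted `get_pageserver_timeline` was only ever a `todo!()` stub, but its signature implies a map keyed by (tenant-timeline, shard, pageserver generation) that hands out shared handles. One plausible shape for such a map, with simplified stand-in key types; this is speculation about an implementation the commit never had:

// Hedged sketch of an advertiser map keyed by (ttid, shard, generation).
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Stand-ins for TenantTimelineId, ShardIndex, and Generation.
type Key = (u64, u8, u32);

#[derive(Default)]
struct PageserverTimeline;

#[derive(Default)]
struct AdvMap {
    inner: Mutex<HashMap<Key, Arc<PageserverTimeline>>>,
}

impl AdvMap {
    fn get_pageserver_timeline(&self, key: Key) -> Arc<PageserverTimeline> {
        let mut map = self.inner.lock().unwrap();
        Arc::clone(map.entry(key).or_default())
    }
}

fn main() {
    let map = AdvMap::default();
    let a = map.get_pageserver_timeline((1, 0, 7));
    let b = map.get_pageserver_timeline((1, 0, 7));
    assert!(Arc::ptr_eq(&a, &b)); // one handle per (ttid, shard, generation)
}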

View File

@@ -51,7 +51,7 @@ pub async fn task_main(
                 error!("connection handler exited: {}", err);
             }
         }
-        .instrument(info_span!("", cid = %conn_id, ttid = field::Empty, application_name = field::Empty, shard = field::Empty, pageserver_generation = field::Empty)),
+        .instrument(info_span!("", cid = %conn_id, ttid = field::Empty, application_name = field::Empty, shard = field::Empty)),
     );
 }
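
Finally, the span change above reflects a standard `tracing` pattern: a field is declared `field::Empty` at span creation and recorded later, once the handler has parsed the connection options (the handler hunks earlier did exactly that for `pageserver_generation`). A self-contained sketch of the pattern, assuming the `tracing` and `tracing-subscriber` crates as dependencies; the field values are invented for illustration:

// Declare-empty-then-record pattern used by the span in task_main.
use tracing::{field, info, info_span};

fn main() {
    tracing_subscriber::fmt().init();
    // Declare the field empty up front, as task_main does...
    let span = info_span!("conn", cid = 42, shard = field::Empty);
    let _guard = span.enter();
    // ...and back-fill it later, once the handler has parsed the options.
    tracing::Span::current().record("shard", field::display("0008"));
    info!("handshake complete");
}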
}