From dae1b58964098be0c56e67e422a0e146ef95eead Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Tue, 24 Jun 2025 16:30:58 +0200 Subject: [PATCH] wip: more logging --- pageserver/src/tenant/timeline/walreceiver.rs | 28 ++++++++++++++++--- .../walreceiver/walreceiver_connection.rs | 13 +++++++++ 2 files changed, 37 insertions(+), 4 deletions(-) diff --git a/pageserver/src/tenant/timeline/walreceiver.rs b/pageserver/src/tenant/timeline/walreceiver.rs index a22e18ecd7..3eea68ade7 100644 --- a/pageserver/src/tenant/timeline/walreceiver.rs +++ b/pageserver/src/tenant/timeline/walreceiver.rs @@ -172,7 +172,7 @@ enum TaskStateUpdate { Progress(E), } -impl TaskHandle { +impl TaskHandle { /// Initializes the task, starting it immediately after the creation. /// /// The second argument to `task` is a child token of `cancel_parent` ([`CancellationToken::child_token`]). @@ -244,10 +244,30 @@ impl TaskHandle { } /// Aborts current task, waiting for it to finish. - async fn shutdown(self) { - if let Some(jh) = self.join_handle { + async fn shutdown(mut self) { + if let Some(mut jh) = self.join_handle { self.cancellation.cancel(); - match jh.await { + + let res = loop { + tokio::select! { + res = &mut jh => { + break res; + }, + received = self.events_receiver.changed() => { + match received { + Ok(()) => { + let event = self.events_receiver.borrow(); + tracing::info!("Received update after cancellation: {event:?}"); + }, + Err(err) => { + tracing::info!("Sender dropped after cancellation: {err}"); + } + } + } + } + }; + + match res { Ok(Ok(())) => debug!("Shutdown success"), Ok(Err(e)) => error!("Shutdown task error: {e:?}"), Err(je) if je.is_cancelled() => unreachable!("not used"), diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index b2c147caa9..ba4d039030 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -307,6 +307,19 @@ pub(super) async fn handle_walreceiver_connection( } { let replication_message = replication_message?; + match &replication_message { + ReplicationMessage::XLogData(_) => { + tracing::info!("Received XLogData replication message") + } + ReplicationMessage::PrimaryKeepAlive(_) => { + tracing::info!("Received PrimaryKeepAlive replication message") + } + ReplicationMessage::RawInterpretedWalRecords(_) => { + tracing::info!("Received RawInterpretedWalRecords replication message") + } + unknown => tracing::info!("Received unknown replication message: {unknown:?}"), + } + let now = Utc::now().naive_utc(); let last_rec_lsn_before_msg = last_rec_lsn;