diff --git a/safekeeper/src/receive_wal.rs b/safekeeper/src/receive_wal.rs index 015b53bb2e..0356def7df 100644 --- a/safekeeper/src/receive_wal.rs +++ b/safekeeper/src/receive_wal.rs @@ -183,9 +183,19 @@ impl SafekeeperPostgresHandler { &mut self, pgb: &mut PostgresBackend, ) -> Result<(), QueryError> { - if let Err(end) = self.handle_start_wal_push_guts(pgb).await { + let mut tli: Option> = None; + if let Err(end) = self.handle_start_wal_push_guts(pgb, &mut tli).await { // Log the result and probably send it to the client, closing the stream. - pgb.handle_copy_stream_end(end).await; + let handle_end_fut = pgb.handle_copy_stream_end(end); + // If we managed to create the timeline, augment logging with current LSNs etc. + if let Some(tli) = tli { + let info = tli.get_safekeeper_info(&self.conf).await; + handle_end_fut + .instrument(info_span!("", term=%info.term, last_log_term=%info.last_log_term, flush_lsn=%Lsn(info.flush_lsn), commit_lsn=%Lsn(info.commit_lsn))) + .await; + } else { + handle_end_fut.await; + } } Ok(()) } @@ -193,6 +203,7 @@ impl SafekeeperPostgresHandler { pub async fn handle_start_wal_push_guts( &mut self, pgb: &mut PostgresBackend, + tli: &mut Option>, ) -> Result<(), CopyStreamHandlerEnd> { // Notify the libpq client that it's allowed to send `CopyData` messages pgb.write_message(&BeMessage::CopyBothResponse).await?; @@ -222,13 +233,17 @@ impl SafekeeperPostgresHandler { // Read first message and create timeline if needed. let res = network_reader.read_first_message().await; - let res = if let Ok((tli, next_msg)) = res { + let network_res = if let Ok((timeline, next_msg)) = res { let pageserver_feedback_rx: tokio::sync::broadcast::Receiver = - tli.get_walreceivers().pageserver_feedback_tx.subscribe(); + timeline + .get_walreceivers() + .pageserver_feedback_tx + .subscribe(); + *tli = Some(timeline.clone()); tokio::select! { // todo: add read|write .context to these errors - r = network_reader.run(msg_tx, msg_rx, reply_tx, tli.clone(), next_msg) => r, + r = network_reader.run(msg_tx, msg_rx, reply_tx, timeline.clone(), next_msg) => r, r = network_write(pgb, reply_rx, pageserver_feedback_rx) => r, } } else { @@ -244,13 +259,13 @@ impl SafekeeperPostgresHandler { match acceptor_handle { None => { // failed even before spawning; read_network should have error - Err(res.expect_err("no error with WalAcceptor not spawn")) + Err(network_res.expect_err("no error with WalAcceptor not spawn")) } Some(handle) => { let wal_acceptor_res = handle.await; // If there was any network error, return it. - res?; + network_res?; // Otherwise, WalAcceptor thread must have errored. match wal_acceptor_res { diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index 59a8c595ab..ecaae9cfe7 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -340,12 +340,16 @@ impl SafekeeperPostgresHandler { start_pos: Lsn, term: Option, ) -> Result<(), QueryError> { + let tli = GlobalTimelines::get(self.ttid).map_err(|e| QueryError::Other(e.into()))?; if let Err(end) = self - .handle_start_replication_guts(pgb, start_pos, term) + .handle_start_replication_guts(pgb, start_pos, term, tli.clone()) .await { + let info = tli.get_safekeeper_info(&self.conf).await; // Log the result and probably send it to the client, closing the stream. - pgb.handle_copy_stream_end(end).await; + pgb.handle_copy_stream_end(end) + .instrument(info_span!("", term=%info.term, last_log_term=%info.last_log_term, flush_lsn=%Lsn(info.flush_lsn), commit_lsn=%Lsn(info.flush_lsn))) + .await; } Ok(()) } @@ -355,10 +359,9 @@ impl SafekeeperPostgresHandler { pgb: &mut PostgresBackend, start_pos: Lsn, term: Option, + tli: Arc, ) -> Result<(), CopyStreamHandlerEnd> { let appname = self.appname.clone(); - let tli = - GlobalTimelines::get(self.ttid).map_err(|e| CopyStreamHandlerEnd::Other(e.into()))?; // Use a guard object to remove our entry from the timeline when we are done. let ws_guard = Arc::new(tli.get_walsenders().register(