From 262348e41b04dab7643c2854f4af0cd4a8a20a58 Mon Sep 17 00:00:00 2001 From: Arthur Petukhovsky Date: Fri, 27 Oct 2023 12:09:02 +0100 Subject: [PATCH] Fix safekeeper log spans (#5643) We were missing spans with ttid in "WAL backup" and several other places; this commit should fix it. Here are examples of logs before and after: https://gist.github.com/petuhovskiy/711a4a4e7ddde3cab3fa6419b2f70fb9 --- safekeeper/src/handler.rs | 16 +++++++----- safekeeper/src/remove_wal.rs | 22 +++++++--------- safekeeper/src/wal_backup.rs | 48 +++++++++++++++++++---------------- safekeeper/src/wal_service.rs | 14 +++++----- 4 files changed, 51 insertions(+), 49 deletions(-) diff --git a/safekeeper/src/handler.rs b/safekeeper/src/handler.rs index 71b99ab1d8..6d0ed8650d 100644 --- a/safekeeper/src/handler.rs +++ b/safekeeper/src/handler.rs @@ -140,6 +140,12 @@ impl postgres_backend::Handler } } + let ttid = TenantTimelineId::new( + self.tenant_id.unwrap_or(TenantId::from([0u8; 16])), + self.timeline_id.unwrap_or(TimelineId::from([0u8; 16])), + ); + tracing::Span::current().record("ttid", tracing::field::display(ttid)); + Ok(()) } else { Err(QueryError::Other(anyhow::anyhow!( @@ -208,26 +214,22 @@ impl postgres_backend::Handler PG_QUERIES_FINISHED.with_label_values(&[cmd_str]).inc(); } - info!( - "got query {:?} in timeline {:?}", - query_string, self.timeline_id - ); + info!("got query {:?}", query_string); let tenant_id = self.tenant_id.context("tenantid is required")?; let timeline_id = self.timeline_id.context("timelineid is required")?; self.check_permission(Some(tenant_id))?; self.ttid = TenantTimelineId::new(tenant_id, timeline_id); - let span_ttid = self.ttid; // satisfy borrow checker match cmd { SafekeeperPostgresCommand::StartWalPush => { self.handle_start_wal_push(pgb) - .instrument(info_span!("WAL receiver", ttid = %span_ttid)) + .instrument(info_span!("WAL receiver")) .await } SafekeeperPostgresCommand::StartReplication { start_lsn, term } => { 
self.handle_start_replication(pgb, start_lsn, term) - .instrument(info_span!("WAL sender", ttid = %span_ttid)) + .instrument(info_span!("WAL sender")) .await } SafekeeperPostgresCommand::IdentifySystem => self.handle_identify_system(pgb).await, diff --git a/safekeeper/src/remove_wal.rs b/safekeeper/src/remove_wal.rs index 3306f0b63a..d96eedf401 100644 --- a/safekeeper/src/remove_wal.rs +++ b/safekeeper/src/remove_wal.rs @@ -16,20 +16,16 @@ pub async fn task_main(conf: SafeKeeperConf) -> anyhow::Result<()> { continue; } let ttid = tli.ttid; - if let Err(e) = tli - .maybe_persist_control_file() - .instrument(info_span!("", tenant = %ttid.tenant_id, timeline = %ttid.timeline_id)) - .await - { - warn!("failed to persist control file: {e}"); - } - if let Err(e) = tli - .remove_old_wal(conf.wal_backup_enabled) - .instrument(info_span!("", tenant = %ttid.tenant_id, timeline = %ttid.timeline_id)) - .await - { - error!("failed to remove WAL: {}", e); + async { + if let Err(e) = tli.maybe_persist_control_file().await { + warn!("failed to persist control file: {e}"); + } + if let Err(e) = tli.remove_old_wal(conf.wal_backup_enabled).await { + error!("failed to remove WAL: {}", e); + } } + .instrument(info_span!("WAL removal", ttid = %ttid)) + .await; } sleep(wal_removal_interval).await; } diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs index da8c197411..22c68ce3c9 100644 --- a/safekeeper/src/wal_backup.rs +++ b/safekeeper/src/wal_backup.rs @@ -136,7 +136,7 @@ async fn update_task( if elected_me != (entry.handle.is_some()) { if elected_me { - info!("elected for backup {}: {}", ttid, election_dbg_str); + info!("elected for backup: {}", election_dbg_str); let (shutdown_tx, shutdown_rx) = mpsc::channel(1); let timeline_dir = conf.timeline_dir(&ttid); @@ -149,7 +149,7 @@ async fn update_task( conf.backup_parallel_jobs, shutdown_rx, ) - .instrument(info_span!("WAL backup task", ttid = %ttid)), + .in_current_span(), ); entry.handle = Some(WalBackupTaskHandle 
{ @@ -157,7 +157,7 @@ async fn update_task( handle, }); } else { - info!("stepping down from backup {}: {}", ttid, election_dbg_str); + info!("stepping down from backup: {}", election_dbg_str); shut_down_task(ttid, entry).await; } } @@ -199,29 +199,33 @@ pub async fn wal_backup_launcher_task_main( if conf.remote_storage.is_none() || !conf.wal_backup_enabled { continue; /* just drain the channel and do nothing */ } - let timeline = is_wal_backup_required(ttid).await; - // do we need to do anything at all? - if timeline.is_some() != tasks.contains_key(&ttid) { - if let Some(timeline) = timeline { - // need to start the task - let entry = tasks.entry(ttid).or_insert(WalBackupTimelineEntry { - timeline, - handle: None, - }); - update_task(&conf, ttid, entry).await; - } else { - // need to stop the task - info!("stopping WAL backup task for {}", ttid); - let mut entry = tasks.remove(&ttid).unwrap(); - shut_down_task(ttid, &mut entry).await; + async { + let timeline = is_wal_backup_required(ttid).await; + // do we need to do anything at all? + if timeline.is_some() != tasks.contains_key(&ttid) { + if let Some(timeline) = timeline { + // need to start the task + let entry = tasks.entry(ttid).or_insert(WalBackupTimelineEntry { + timeline, + handle: None, + }); + update_task(&conf, ttid, entry).await; + } else { + // need to stop the task + info!("stopping WAL backup task"); + let mut entry = tasks.remove(&ttid).unwrap(); + shut_down_task(ttid, &mut entry).await; + } } - } + }.instrument(info_span!("WAL backup", ttid = %ttid)).await; } // For each timeline needing offloading, check if this safekeeper // should do the job and start/stop the task accordingly. 
_ = ticker.tick() => { for (ttid, entry) in tasks.iter_mut() { - update_task(&conf, *ttid, entry).await; + update_task(&conf, *ttid, entry) + .instrument(info_span!("WAL backup", ttid = %ttid)) + .await; } } } @@ -248,7 +252,7 @@ async fn backup_task_main( info!("started"); let res = GlobalTimelines::get(ttid); if let Err(e) = res { - error!("backup error for timeline {}: {}", ttid, e); + error!("backup error: {}", e); return; } let tli = res.unwrap(); @@ -346,7 +350,7 @@ impl WalBackupTask { } } -pub async fn backup_lsn_range( +async fn backup_lsn_range( timeline: &Arc, backup_lsn: &mut Lsn, end_lsn: Lsn, diff --git a/safekeeper/src/wal_service.rs b/safekeeper/src/wal_service.rs index 9fabaa79fb..bceaad1e16 100644 --- a/safekeeper/src/wal_service.rs +++ b/safekeeper/src/wal_service.rs @@ -36,14 +36,14 @@ pub async fn task_main( let conf = conf.clone(); let conn_id = issue_connection_id(&mut connection_count); - tokio::spawn(async move { - if let Err(err) = handle_socket(socket, conf, conn_id, allowed_auth_scope) - .instrument(info_span!("", cid = %conn_id)) - .await - { - error!("connection handler exited: {}", err); + tokio::spawn( + async move { + if let Err(err) = handle_socket(socket, conf, conn_id, allowed_auth_scope).await { + error!("connection handler exited: {}", err); + } } - }); + .instrument(info_span!("", cid = %conn_id, ttid = field::Empty)), + ); } }