Fix safekeeper log spans (#5643)

We were missing spans with ttid in "WAL backup" and several other
places; this commit adds them.

Here are examples of the logs before and after:
https://gist.github.com/petuhovskiy/711a4a4e7ddde3cab3fa6419b2f70fb9
Arthur Petukhovsky
2023-10-27 12:09:02 +01:00
committed by GitHub
parent 68f15cf967
commit 262348e41b
4 changed files with 51 additions and 49 deletions
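
The pattern the commit leans on throughout is tracing's deferred field recording: open the connection-level span with ttid = field::Empty, fill the field in with Span::current().record(...) once the tenant/timeline id is known, and let child work inherit it via .instrument(...) or .in_current_span(). Below is a minimal, self-contained sketch of that pattern; the ttid value and the do_backup helper are hypothetical, for illustration only.

// Cargo deps assumed: tokio (rt, macros), tracing, tracing-subscriber.
use tracing::{field, info, info_span, Instrument, Span};

async fn do_backup() {
    // Emitted inside whatever span the caller put us in, so ttid shows up here too.
    info!("started");
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().init();

    // Declare the span up front with an empty ttid field; the value is not known yet.
    let conn_span = info_span!("connection", cid = 1, ttid = field::Empty);

    async {
        // Later (e.g. after parsing the startup packet) the id becomes known
        // and is recorded into the already-open span.
        let ttid = "example-tenant/example-timeline"; // hypothetical value
        Span::current().record("ttid", field::display(ttid));

        info!("got query"); // carries cid and ttid from the span fields

        // Child futures inherit the span instead of building their own.
        do_backup().in_current_span().await;
    }
    .instrument(conn_span)
    .await;
}

Once every log line inside the span carries the ttid field, the individual messages no longer need to repeat it, which is why the diff below also trims ttid out of the info!/error! message texts.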


@@ -140,6 +140,12 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
                 }
             }
+            let ttid = TenantTimelineId::new(
+                self.tenant_id.unwrap_or(TenantId::from([0u8; 16])),
+                self.timeline_id.unwrap_or(TimelineId::from([0u8; 16])),
+            );
+            tracing::Span::current().record("ttid", tracing::field::display(ttid));
             Ok(())
         } else {
             Err(QueryError::Other(anyhow::anyhow!(
@@ -208,26 +214,22 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
             PG_QUERIES_FINISHED.with_label_values(&[cmd_str]).inc();
         }
-        info!(
-            "got query {:?} in timeline {:?}",
-            query_string, self.timeline_id
-        );
+        info!("got query {:?}", query_string);
         let tenant_id = self.tenant_id.context("tenantid is required")?;
         let timeline_id = self.timeline_id.context("timelineid is required")?;
         self.check_permission(Some(tenant_id))?;
         self.ttid = TenantTimelineId::new(tenant_id, timeline_id);
-        let span_ttid = self.ttid; // satisfy borrow checker
         match cmd {
             SafekeeperPostgresCommand::StartWalPush => {
                 self.handle_start_wal_push(pgb)
-                    .instrument(info_span!("WAL receiver", ttid = %span_ttid))
+                    .instrument(info_span!("WAL receiver"))
                     .await
             }
             SafekeeperPostgresCommand::StartReplication { start_lsn, term } => {
                 self.handle_start_replication(pgb, start_lsn, term)
-                    .instrument(info_span!("WAL sender", ttid = %span_ttid))
+                    .instrument(info_span!("WAL sender"))
                     .await
             }
             SafekeeperPostgresCommand::IdentifySystem => self.handle_identify_system(pgb).await,


@@ -16,20 +16,16 @@ pub async fn task_main(conf: SafeKeeperConf) -> anyhow::Result<()> {
                 continue;
             }
             let ttid = tli.ttid;
-            if let Err(e) = tli
-                .maybe_persist_control_file()
-                .instrument(info_span!("", tenant = %ttid.tenant_id, timeline = %ttid.timeline_id))
-                .await
-            {
-                warn!("failed to persist control file: {e}");
-            }
-            if let Err(e) = tli
-                .remove_old_wal(conf.wal_backup_enabled)
-                .instrument(info_span!("", tenant = %ttid.tenant_id, timeline = %ttid.timeline_id))
-                .await
-            {
-                error!("failed to remove WAL: {}", e);
-            }
+            async {
+                if let Err(e) = tli.maybe_persist_control_file().await {
+                    warn!("failed to persist control file: {e}");
+                }
+                if let Err(e) = tli.remove_old_wal(conf.wal_backup_enabled).await {
+                    error!("failed to remove WAL: {}", e);
+                }
+            }
+            .instrument(info_span!("WAL removal", ttid = %ttid))
+            .await;
         }
         sleep(wal_removal_interval).await;
     }


@@ -136,7 +136,7 @@ async fn update_task(
     if elected_me != (entry.handle.is_some()) {
         if elected_me {
-            info!("elected for backup {}: {}", ttid, election_dbg_str);
+            info!("elected for backup: {}", election_dbg_str);
             let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
             let timeline_dir = conf.timeline_dir(&ttid);
@@ -149,7 +149,7 @@ async fn update_task(
                     conf.backup_parallel_jobs,
                     shutdown_rx,
                 )
-                .instrument(info_span!("WAL backup task", ttid = %ttid)),
+                .in_current_span(),
             );
             entry.handle = Some(WalBackupTaskHandle {
@@ -157,7 +157,7 @@ async fn update_task(
                 handle,
             });
         } else {
-            info!("stepping down from backup {}: {}", ttid, election_dbg_str);
+            info!("stepping down from backup: {}", election_dbg_str);
             shut_down_task(ttid, entry).await;
         }
     }
@@ -199,29 +199,33 @@ pub async fn wal_backup_launcher_task_main(
                 if conf.remote_storage.is_none() || !conf.wal_backup_enabled {
                     continue; /* just drain the channel and do nothing */
                 }
-                let timeline = is_wal_backup_required(ttid).await;
-                // do we need to do anything at all?
-                if timeline.is_some() != tasks.contains_key(&ttid) {
-                    if let Some(timeline) = timeline {
-                        // need to start the task
-                        let entry = tasks.entry(ttid).or_insert(WalBackupTimelineEntry {
-                            timeline,
-                            handle: None,
-                        });
-                        update_task(&conf, ttid, entry).await;
-                    } else {
-                        // need to stop the task
-                        info!("stopping WAL backup task for {}", ttid);
-                        let mut entry = tasks.remove(&ttid).unwrap();
-                        shut_down_task(ttid, &mut entry).await;
-                    }
-                }
+                async {
+                    let timeline = is_wal_backup_required(ttid).await;
+                    // do we need to do anything at all?
+                    if timeline.is_some() != tasks.contains_key(&ttid) {
+                        if let Some(timeline) = timeline {
+                            // need to start the task
+                            let entry = tasks.entry(ttid).or_insert(WalBackupTimelineEntry {
+                                timeline,
+                                handle: None,
+                            });
+                            update_task(&conf, ttid, entry).await;
+                        } else {
+                            // need to stop the task
+                            info!("stopping WAL backup task");
+                            let mut entry = tasks.remove(&ttid).unwrap();
+                            shut_down_task(ttid, &mut entry).await;
+                        }
+                    }
+                }.instrument(info_span!("WAL backup", ttid = %ttid)).await;
             }
             // For each timeline needing offloading, check if this safekeeper
             // should do the job and start/stop the task accordingly.
            _ = ticker.tick() => {
                for (ttid, entry) in tasks.iter_mut() {
-                    update_task(&conf, *ttid, entry).await;
+                    update_task(&conf, *ttid, entry)
+                        .instrument(info_span!("WAL backup", ttid = %ttid))
+                        .await;
                }
            }
        }
@@ -248,7 +252,7 @@ async fn backup_task_main(
     info!("started");
     let res = GlobalTimelines::get(ttid);
     if let Err(e) = res {
-        error!("backup error for timeline {}: {}", ttid, e);
+        error!("backup error: {}", e);
         return;
     }
     let tli = res.unwrap();
@@ -346,7 +350,7 @@ impl WalBackupTask {
     }
 }
-pub async fn backup_lsn_range(
+async fn backup_lsn_range(
     timeline: &Arc<Timeline>,
     backup_lsn: &mut Lsn,
     end_lsn: Lsn,


@@ -36,14 +36,14 @@ pub async fn task_main(
             let conf = conf.clone();
             let conn_id = issue_connection_id(&mut connection_count);
-            tokio::spawn(async move {
-                if let Err(err) = handle_socket(socket, conf, conn_id, allowed_auth_scope)
-                    .instrument(info_span!("", cid = %conn_id))
-                    .await
-                {
-                    error!("connection handler exited: {}", err);
-                }
-            });
+            tokio::spawn(
+                async move {
+                    if let Err(err) = handle_socket(socket, conf, conn_id, allowed_auth_scope).await {
+                        error!("connection handler exited: {}", err);
+                    }
+                }
+                .instrument(info_span!("", cid = %conn_id, ttid = field::Empty)),
+            );
         }
     }