mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-13 16:32:56 +00:00
page_service: add peer_addr span field, and set tenant_id / timeline_id fields earlier (#4638)
Before this PR, during shutdown, we'd find naked logs like this one for every active page service connection: ``` 2023-07-05T14:13:50.791992Z INFO shutdown request received in run_message_loop ``` This PR 1. adds a peer_addr span field to distinguish the connections in logs 2. sets the tenant_id / timeline_id fields earlier It would be nice to have `tenant_id` and `timeline_id` directly on the `page_service_conn_main` span (empty, initially), then set them at the top of `process_query`. The problem is that the debug asserts for `tenant_id` and `timeline_id` presence in the tracing span doesn't support detecting empty values [1]. So, I'm a bit hesitant about over-using `Span::record`. [1] https://github.com/neondatabase/neon/issues/4676
This commit is contained in:
committed by
GitHub
parent
5177c1e4b1
commit
65ff256bb8
@@ -33,6 +33,7 @@ use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio_util::io::StreamReader;
|
||||
use tracing::field;
|
||||
use tracing::*;
|
||||
use utils::id::ConnectionId;
|
||||
use utils::{
|
||||
@@ -51,6 +52,7 @@ use crate::metrics::{LIVE_CONNECTIONS_COUNT, SMGR_QUERY_TIME};
|
||||
use crate::task_mgr;
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::tenant;
|
||||
use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
|
||||
use crate::tenant::mgr;
|
||||
use crate::tenant::mgr::GetTenantError;
|
||||
use crate::tenant::{Tenant, Timeline};
|
||||
@@ -238,6 +240,7 @@ pub async fn libpq_listener_main(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(peer_addr))]
|
||||
async fn page_service_conn_main(
|
||||
conf: &'static PageServerConf,
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
@@ -260,6 +263,7 @@ async fn page_service_conn_main(
|
||||
.context("could not set TCP_NODELAY")?;
|
||||
|
||||
let peer_addr = socket.peer_addr().context("get peer address")?;
|
||||
tracing::Span::current().record("peer_addr", field::display(peer_addr));
|
||||
|
||||
// setup read timeout of 10 minutes. the timeout is rather arbitrary for requirements:
|
||||
// - long enough for most valid compute connections
|
||||
@@ -362,7 +366,7 @@ impl PageServerHandler {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip(self, pgb, ctx))]
|
||||
#[instrument(skip_all)]
|
||||
async fn handle_pagerequests<IO>(
|
||||
&self,
|
||||
pgb: &mut PostgresBackend<IO>,
|
||||
@@ -373,6 +377,8 @@ impl PageServerHandler {
|
||||
where
|
||||
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
|
||||
{
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
// NOTE: pagerequests handler exits when connection is closed,
|
||||
// so there is no need to reset the association
|
||||
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
|
||||
@@ -473,7 +479,7 @@ impl PageServerHandler {
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[instrument(skip(self, pgb, ctx))]
|
||||
#[instrument(skip_all, fields(%base_lsn, end_lsn=%_end_lsn, %pg_version))]
|
||||
async fn handle_import_basebackup<IO>(
|
||||
&self,
|
||||
pgb: &mut PostgresBackend<IO>,
|
||||
@@ -487,6 +493,8 @@ impl PageServerHandler {
|
||||
where
|
||||
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
|
||||
{
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
|
||||
// Create empty timeline
|
||||
info!("creating new timeline");
|
||||
@@ -531,7 +539,7 @@ impl PageServerHandler {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip(self, pgb, ctx))]
|
||||
#[instrument(skip_all, fields(%start_lsn, %end_lsn))]
|
||||
async fn handle_import_wal<IO>(
|
||||
&self,
|
||||
pgb: &mut PostgresBackend<IO>,
|
||||
@@ -544,6 +552,7 @@ impl PageServerHandler {
|
||||
where
|
||||
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
|
||||
{
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
|
||||
|
||||
let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?;
|
||||
@@ -738,7 +747,7 @@ impl PageServerHandler {
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[instrument(skip(self, pgb, ctx))]
|
||||
#[instrument(skip_all, fields(?lsn, ?prev_lsn, %full_backup))]
|
||||
async fn handle_basebackup_request<IO>(
|
||||
&mut self,
|
||||
pgb: &mut PostgresBackend<IO>,
|
||||
@@ -752,6 +761,8 @@ impl PageServerHandler {
|
||||
where
|
||||
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
|
||||
{
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
let started = std::time::Instant::now();
|
||||
|
||||
// check that the timeline exists
|
||||
@@ -862,6 +873,7 @@ where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(tenant_id, timeline_id))]
|
||||
async fn process_query(
|
||||
&mut self,
|
||||
pgb: &mut PostgresBackend<IO>,
|
||||
@@ -883,6 +895,10 @@ where
|
||||
let timeline_id = TimelineId::from_str(params[1])
|
||||
.with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
|
||||
|
||||
tracing::Span::current()
|
||||
.record("tenant_id", field::display(tenant_id))
|
||||
.record("timeline_id", field::display(timeline_id));
|
||||
|
||||
self.check_permission(Some(tenant_id))?;
|
||||
|
||||
self.handle_pagerequests(pgb, tenant_id, timeline_id, ctx)
|
||||
@@ -902,6 +918,10 @@ where
|
||||
let timeline_id = TimelineId::from_str(params[1])
|
||||
.with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
|
||||
|
||||
tracing::Span::current()
|
||||
.record("tenant_id", field::display(tenant_id))
|
||||
.record("timeline_id", field::display(timeline_id));
|
||||
|
||||
self.check_permission(Some(tenant_id))?;
|
||||
|
||||
let lsn = if params.len() >= 3 {
|
||||
@@ -948,6 +968,10 @@ where
|
||||
let timeline_id = TimelineId::from_str(params[1])
|
||||
.with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
|
||||
|
||||
tracing::Span::current()
|
||||
.record("tenant_id", field::display(tenant_id))
|
||||
.record("timeline_id", field::display(timeline_id));
|
||||
|
||||
self.check_permission(Some(tenant_id))?;
|
||||
let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?;
|
||||
|
||||
@@ -979,6 +1003,10 @@ where
|
||||
let timeline_id = TimelineId::from_str(params[1])
|
||||
.with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
|
||||
|
||||
tracing::Span::current()
|
||||
.record("tenant_id", field::display(tenant_id))
|
||||
.record("timeline_id", field::display(timeline_id));
|
||||
|
||||
// The caller is responsible for providing correct lsn and prev_lsn.
|
||||
let lsn = if params.len() > 2 {
|
||||
Some(
|
||||
@@ -1033,6 +1061,10 @@ where
|
||||
let pg_version = u32::from_str(params[4])
|
||||
.with_context(|| format!("Failed to parse pg_version from {}", params[4]))?;
|
||||
|
||||
tracing::Span::current()
|
||||
.record("tenant_id", field::display(tenant_id))
|
||||
.record("timeline_id", field::display(timeline_id));
|
||||
|
||||
self.check_permission(Some(tenant_id))?;
|
||||
|
||||
match self
|
||||
@@ -1077,6 +1109,10 @@ where
|
||||
let end_lsn = Lsn::from_str(params[3])
|
||||
.with_context(|| format!("Failed to parse Lsn from {}", params[3]))?;
|
||||
|
||||
tracing::Span::current()
|
||||
.record("tenant_id", field::display(tenant_id))
|
||||
.record("timeline_id", field::display(timeline_id));
|
||||
|
||||
self.check_permission(Some(tenant_id))?;
|
||||
|
||||
match self
|
||||
@@ -1108,6 +1144,8 @@ where
|
||||
let tenant_id = TenantId::from_str(params[0])
|
||||
.with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
|
||||
|
||||
tracing::Span::current().record("tenant_id", field::display(tenant_id));
|
||||
|
||||
self.check_permission(Some(tenant_id))?;
|
||||
|
||||
let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
|
||||
|
||||
Reference in New Issue
Block a user