diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 8728559d72..118b0c0bae 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -33,6 +33,7 @@ use std::sync::Arc; use std::time::Duration; use tokio::io::{AsyncRead, AsyncWrite}; use tokio_util::io::StreamReader; +use tracing::field; use tracing::*; use utils::id::ConnectionId; use utils::{ @@ -51,6 +52,7 @@ use crate::metrics::{LIVE_CONNECTIONS_COUNT, SMGR_QUERY_TIME}; use crate::task_mgr; use crate::task_mgr::TaskKind; use crate::tenant; +use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id; use crate::tenant::mgr; use crate::tenant::mgr::GetTenantError; use crate::tenant::{Tenant, Timeline}; @@ -238,6 +240,7 @@ pub async fn libpq_listener_main( Ok(()) } +#[instrument(skip_all, fields(peer_addr))] async fn page_service_conn_main( conf: &'static PageServerConf, broker_client: storage_broker::BrokerClientChannel, @@ -260,6 +263,7 @@ async fn page_service_conn_main( .context("could not set TCP_NODELAY")?; let peer_addr = socket.peer_addr().context("get peer address")?; + tracing::Span::current().record("peer_addr", field::display(peer_addr)); // setup read timeout of 10 minutes. the timeout is rather arbitrary for requirements: // - long enough for most valid compute connections @@ -362,7 +366,7 @@ impl PageServerHandler { } } - #[instrument(skip(self, pgb, ctx))] + #[instrument(skip_all)] async fn handle_pagerequests( &self, pgb: &mut PostgresBackend, @@ -373,6 +377,8 @@ impl PageServerHandler { where IO: AsyncRead + AsyncWrite + Send + Sync + Unpin, { + debug_assert_current_span_has_tenant_and_timeline_id(); + // NOTE: pagerequests handler exits when connection is closed, // so there is no need to reset the association task_mgr::associate_with(Some(tenant_id), Some(timeline_id)); @@ -473,7 +479,7 @@ impl PageServerHandler { } #[allow(clippy::too_many_arguments)] - #[instrument(skip(self, pgb, ctx))] + #[instrument(skip_all, fields(%base_lsn, end_lsn=%_end_lsn, %pg_version))] async fn handle_import_basebackup( &self, pgb: &mut PostgresBackend, @@ -487,6 +493,8 @@ impl PageServerHandler { where IO: AsyncRead + AsyncWrite + Send + Sync + Unpin, { + debug_assert_current_span_has_tenant_and_timeline_id(); + task_mgr::associate_with(Some(tenant_id), Some(timeline_id)); // Create empty timeline info!("creating new timeline"); @@ -531,7 +539,7 @@ impl PageServerHandler { Ok(()) } - #[instrument(skip(self, pgb, ctx))] + #[instrument(skip_all, fields(%start_lsn, %end_lsn))] async fn handle_import_wal( &self, pgb: &mut PostgresBackend, @@ -544,6 +552,7 @@ impl PageServerHandler { where IO: AsyncRead + AsyncWrite + Send + Sync + Unpin, { + debug_assert_current_span_has_tenant_and_timeline_id(); task_mgr::associate_with(Some(tenant_id), Some(timeline_id)); let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?; @@ -738,7 +747,7 @@ impl PageServerHandler { } #[allow(clippy::too_many_arguments)] - #[instrument(skip(self, pgb, ctx))] + #[instrument(skip_all, fields(?lsn, ?prev_lsn, %full_backup))] async fn handle_basebackup_request( &mut self, pgb: &mut PostgresBackend, @@ -752,6 +761,8 @@ impl PageServerHandler { where IO: AsyncRead + AsyncWrite + Send + Sync + Unpin, { + debug_assert_current_span_has_tenant_and_timeline_id(); + let started = std::time::Instant::now(); // check that the timeline exists @@ -862,6 +873,7 @@ where Ok(()) } + #[instrument(skip_all, fields(tenant_id, timeline_id))] async fn process_query( &mut self, pgb: &mut PostgresBackend, @@ -883,6 +895,10 @@ where let timeline_id = TimelineId::from_str(params[1]) .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?; + tracing::Span::current() + .record("tenant_id", field::display(tenant_id)) + .record("timeline_id", field::display(timeline_id)); + self.check_permission(Some(tenant_id))?; self.handle_pagerequests(pgb, tenant_id, timeline_id, ctx) @@ -902,6 +918,10 @@ where let timeline_id = TimelineId::from_str(params[1]) .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?; + tracing::Span::current() + .record("tenant_id", field::display(tenant_id)) + .record("timeline_id", field::display(timeline_id)); + self.check_permission(Some(tenant_id))?; let lsn = if params.len() >= 3 { @@ -948,6 +968,10 @@ where let timeline_id = TimelineId::from_str(params[1]) .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?; + tracing::Span::current() + .record("tenant_id", field::display(tenant_id)) + .record("timeline_id", field::display(timeline_id)); + self.check_permission(Some(tenant_id))?; let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?; @@ -979,6 +1003,10 @@ where let timeline_id = TimelineId::from_str(params[1]) .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?; + tracing::Span::current() + .record("tenant_id", field::display(tenant_id)) + .record("timeline_id", field::display(timeline_id)); + // The caller is responsible for providing correct lsn and prev_lsn. let lsn = if params.len() > 2 { Some( @@ -1033,6 +1061,10 @@ where let pg_version = u32::from_str(params[4]) .with_context(|| format!("Failed to parse pg_version from {}", params[4]))?; + tracing::Span::current() + .record("tenant_id", field::display(tenant_id)) + .record("timeline_id", field::display(timeline_id)); + self.check_permission(Some(tenant_id))?; match self @@ -1077,6 +1109,10 @@ where let end_lsn = Lsn::from_str(params[3]) .with_context(|| format!("Failed to parse Lsn from {}", params[3]))?; + tracing::Span::current() + .record("tenant_id", field::display(tenant_id)) + .record("timeline_id", field::display(timeline_id)); + self.check_permission(Some(tenant_id))?; match self @@ -1108,6 +1144,8 @@ where let tenant_id = TenantId::from_str(params[0]) .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?; + tracing::Span::current().record("tenant_id", field::display(tenant_id)); + self.check_permission(Some(tenant_id))?; let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;