From 716bb3c3612e425204790a42a608cc79945cb51e Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Mon, 10 Feb 2025 23:29:48 +0100 Subject: [PATCH] pageserver: thread otel dispatch into connection req context --- pageserver/src/bin/pageserver.rs | 25 +++++++++++++++++-------- pageserver/src/page_service.rs | 13 ++++++++++--- 2 files changed, 27 insertions(+), 11 deletions(-) diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 66384ca8b4..3217971c02 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -205,7 +205,7 @@ fn main() -> anyhow::Result<()> { tracing::info!("Initializing page_cache..."); page_cache::init(conf.page_cache_size); - start_pageserver(launch_ts, conf).context("Failed to start pageserver")?; + start_pageserver(launch_ts, conf, otel_guard).context("Failed to start pageserver")?; scenario.teardown(); Ok(()) @@ -303,6 +303,7 @@ fn startup_checkpoint(started_at: Instant, phase: &str, human_phase: &str) { fn start_pageserver( launch_ts: &'static LaunchTimestamp, conf: &'static PageServerConf, + otel_guard: Option, ) -> anyhow::Result<()> { // Monotonic time for later calculating startup duration let started_startup_at = Instant::now(); @@ -650,13 +651,21 @@ fn start_pageserver( // Spawn a task to listen for libpq connections. It will spawn further tasks // for each connection. We created the listener earlier already. - let page_service = page_service::spawn(conf, tenant_manager.clone(), pg_auth, { - let _entered = COMPUTE_REQUEST_RUNTIME.enter(); // TcpListener::from_std requires it - pageserver_listener - .set_nonblocking(true) - .context("set listener to nonblocking")?; - tokio::net::TcpListener::from_std(pageserver_listener).context("create tokio listener")? - }); + let perf_trace_dispatch = otel_guard.as_ref().map(|g| g.dispatch.clone()); + let page_service = page_service::spawn( + conf, + tenant_manager.clone(), + pg_auth, + perf_trace_dispatch, + { + let _entered = COMPUTE_REQUEST_RUNTIME.enter(); // TcpListener::from_std requires it + pageserver_listener + .set_nonblocking(true) + .context("set listener to nonblocking")?; + tokio::net::TcpListener::from_std(pageserver_listener) + .context("create tokio listener")? + }, + ); // All started up! Now just sit and wait for shutdown signal. BACKGROUND_RUNTIME.block_on(async move { diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index ba2ed9dc81..e45df07fe6 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -53,7 +53,7 @@ use utils::sync::spsc_fold; use crate::auth::check_permission; use crate::basebackup::BasebackupError; use crate::config::PageServerConf; -use crate::context::{DownloadBehavior, RequestContext}; +use crate::context::{DownloadBehavior, RequestContext, RequestContextBuilder}; use crate::metrics::{ self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, LIVE_CONNECTIONS, SmgrOpTimer, }; @@ -99,6 +99,7 @@ pub fn spawn( conf: &'static PageServerConf, tenant_manager: Arc, pg_auth: Option>, + perf_trace_dispatch: Option, tcp_listener: tokio::net::TcpListener, ) -> Listener { let cancel = CancellationToken::new(); @@ -116,6 +117,7 @@ pub fn spawn( conf, tenant_manager, pg_auth, + perf_trace_dispatch, tcp_listener, conf.pg_auth_type, conf.page_service_pipelining.clone(), @@ -172,6 +174,7 @@ pub async fn libpq_listener_main( conf: &'static PageServerConf, tenant_manager: Arc, auth: Option>, + perf_trace_dispatch: Option, listener: tokio::net::TcpListener, auth_type: AuthType, pipelining_config: PageServicePipeliningConfig, @@ -204,8 +207,12 @@ pub async fn libpq_listener_main( // Connection established. Spawn a new task to handle it. debug!("accepted connection from {}", peer_addr); let local_auth = auth.clone(); - let connection_ctx = listener_ctx - .detached_child(TaskKind::PageRequestHandler, DownloadBehavior::Download); + let connection_ctx = RequestContextBuilder::from(&listener_ctx) + .task_kind(TaskKind::PageRequestHandler) + .download_behavior(DownloadBehavior::Download) + .perf_span_dispatch(perf_trace_dispatch.clone()) + .detached_child(); + connection_handler_tasks.spawn(page_service_conn_main( conf, tenant_manager.clone(),