From 965837df53c945be91e914d7fadb032f4d9e4371 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Fri, 10 Mar 2023 00:09:28 +0400 Subject: [PATCH] Log connection ids in safekeeper instead of thread ids. Fixes build on macOS (which doesn't have nix gettid) after 0d8ced85341102. --- Cargo.lock | 1 - safekeeper/Cargo.toml | 1 - safekeeper/src/handler.rs | 6 +++++- safekeeper/src/lib.rs | 1 + safekeeper/src/receive_wal.rs | 11 +++++++---- safekeeper/src/wal_service.rs | 24 +++++++++++++++++++----- 6 files changed, 32 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b96f7dbc99..2e3ea2842d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3333,7 +3333,6 @@ dependencies = [ "humantime", "hyper", "metrics", - "nix", "once_cell", "parking_lot", "postgres", diff --git a/safekeeper/Cargo.toml b/safekeeper/Cargo.toml index 36ee15347d..8b0733832a 100644 --- a/safekeeper/Cargo.toml +++ b/safekeeper/Cargo.toml @@ -19,7 +19,6 @@ git-version.workspace = true hex.workspace = true humantime.workspace = true hyper.workspace = true -nix.workspace = true once_cell.workspace = true parking_lot.workspace = true postgres.workspace = true diff --git a/safekeeper/src/handler.rs b/safekeeper/src/handler.rs index 3e7bafbd2f..7d788fe3b9 100644 --- a/safekeeper/src/handler.rs +++ b/safekeeper/src/handler.rs @@ -8,6 +8,7 @@ use tracing::{info, info_span, Instrument}; use crate::auth::check_permission; use crate::json_ctrl::{handle_json_ctrl, AppendLogicalMessage}; +use crate::wal_service::ConnectionId; use crate::{GlobalTimelines, SafeKeeperConf}; use postgres_backend::QueryError; use postgres_backend::{self, PostgresBackend}; @@ -28,6 +29,8 @@ pub struct SafekeeperPostgresHandler { pub tenant_id: Option, pub timeline_id: Option, pub ttid: TenantTimelineId, + /// Unique connection id is logged in spans for observability. + pub conn_id: ConnectionId, claims: Option, } @@ -181,13 +184,14 @@ impl postgres_backend::Handler for SafekeeperPostgresHandler { } impl SafekeeperPostgresHandler { - pub fn new(conf: SafeKeeperConf) -> Self { + pub fn new(conf: SafeKeeperConf, conn_id: u32) -> Self { SafekeeperPostgresHandler { conf, appname: None, tenant_id: None, timeline_id: None, ttid: TenantTimelineId::empty(), + conn_id, claims: None, } } diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs index 03df546a4d..f4e753cdbf 100644 --- a/safekeeper/src/lib.rs +++ b/safekeeper/src/lib.rs @@ -1,4 +1,5 @@ use remote_storage::RemoteStorageConfig; + use std::path::PathBuf; use std::time::Duration; use storage_broker::Uri; diff --git a/safekeeper/src/receive_wal.rs b/safekeeper/src/receive_wal.rs index 22c9871026..b7cf5a7310 100644 --- a/safekeeper/src/receive_wal.rs +++ b/safekeeper/src/receive_wal.rs @@ -7,10 +7,10 @@ use crate::safekeeper::AcceptorProposerMessage; use crate::safekeeper::ProposerAcceptorMessage; use crate::safekeeper::ServerInfo; use crate::timeline::Timeline; +use crate::wal_service::ConnectionId; use crate::GlobalTimelines; use anyhow::{anyhow, Context}; use bytes::BytesMut; -use nix::unistd::gettid; use postgres_backend::CopyStreamHandlerEnd; use postgres_backend::PostgresBackend; use postgres_backend::PostgresBackendReader; @@ -70,7 +70,7 @@ impl SafekeeperPostgresHandler { let peer_addr = *pgb.get_peer_addr(); let res = tokio::select! { // todo: add read|write .context to these errors - r = read_network(self.ttid, &mut pgb_reader, peer_addr, msg_tx, &mut acceptor_handle, msg_rx, reply_tx) => r, + r = read_network(self.ttid, self.conn_id, &mut pgb_reader, peer_addr, msg_tx, &mut acceptor_handle, msg_rx, reply_tx) => r, r = write_network(pgb, reply_rx) => r, }; @@ -119,6 +119,7 @@ async fn read_message( /// tell the error. async fn read_network( ttid: TenantTimelineId, + conn_id: ConnectionId, pgb_reader: &mut PostgresBackendReader, peer_addr: SocketAddr, msg_tx: Sender, @@ -151,7 +152,8 @@ async fn read_network( }; *acceptor_handle = Some( - WalAcceptor::spawn(tli.clone(), msg_rx, reply_tx).context("spawn WalAcceptor thread")?, + WalAcceptor::spawn(tli.clone(), msg_rx, reply_tx, conn_id) + .context("spawn WalAcceptor thread")?, ); // Forward all messages to WalAcceptor @@ -205,6 +207,7 @@ impl WalAcceptor { tli: Arc, msg_rx: Receiver, reply_tx: Sender, + conn_id: ConnectionId, ) -> anyhow::Result>> { let thread_name = format!("WAL acceptor {}", tli.ttid); thread::Builder::new() @@ -223,7 +226,7 @@ impl WalAcceptor { let span_ttid = wa.tli.ttid; // satisfy borrow checker runtime.block_on( wa.run() - .instrument(info_span!("WAL acceptor", tid = %gettid(), ttid = %span_ttid)), + .instrument(info_span!("WAL acceptor", cid = %conn_id, ttid = %span_ttid)), ) }) .map_err(anyhow::Error::from) diff --git a/safekeeper/src/wal_service.rs b/safekeeper/src/wal_service.rs index 8d63d604ad..96f063d686 100644 --- a/safekeeper/src/wal_service.rs +++ b/safekeeper/src/wal_service.rs @@ -3,7 +3,6 @@ //! receive WAL from wal_proposer and send it to WAL receivers //! use anyhow::{Context, Result}; -use nix::unistd::gettid; use postgres_backend::QueryError; use std::{future, thread}; use tokio::net::TcpStream; @@ -27,17 +26,19 @@ pub fn thread_main(conf: SafeKeeperConf, pg_listener: std::net::TcpListener) { // Tokio's from_std won't do this for us, per its comment. pg_listener.set_nonblocking(true)?; let listener = tokio::net::TcpListener::from_std(pg_listener)?; + let mut connection_count: ConnectionCount = 0; loop { match listener.accept().await { Ok((socket, peer_addr)) => { debug!("accepted connection from {}", peer_addr); let conf = conf.clone(); + let conn_id = issue_connection_id(&mut connection_count); let _ = thread::Builder::new() .name("WAL service thread".into()) .spawn(move || { - if let Err(err) = handle_socket(socket, conf) { + if let Err(err) = handle_socket(socket, conf, conn_id) { error!("connection handler exited: {}", err); } }) @@ -54,8 +55,12 @@ pub fn thread_main(conf: SafeKeeperConf, pg_listener: std::net::TcpListener) { /// This is run by `thread_main` above, inside a background thread. /// -fn handle_socket(socket: TcpStream, conf: SafeKeeperConf) -> Result<(), QueryError> { - let _enter = info_span!("", tid = %gettid()).entered(); +fn handle_socket( + socket: TcpStream, + conf: SafeKeeperConf, + conn_id: ConnectionId, +) -> Result<(), QueryError> { + let _enter = info_span!("", cid = %conn_id).entered(); let runtime = tokio::runtime::Builder::new_current_thread() .enable_all() @@ -68,7 +73,7 @@ fn handle_socket(socket: TcpStream, conf: SafeKeeperConf) -> Result<(), QueryErr None => AuthType::Trust, Some(_) => AuthType::NeonJWT, }; - let mut conn_handler = SafekeeperPostgresHandler::new(conf); + let mut conn_handler = SafekeeperPostgresHandler::new(conf, conn_id); let pgbackend = PostgresBackend::new(socket, auth_type, None)?; // libpq protocol between safekeeper and walproposer / pageserver // We don't use shutdown. @@ -79,3 +84,12 @@ fn handle_socket(socket: TcpStream, conf: SafeKeeperConf) -> Result<(), QueryErr Ok(()) } + +/// Unique WAL service connection ids are logged in spans for observability. +pub type ConnectionId = u32; +pub type ConnectionCount = u32; + +pub fn issue_connection_id(count: &mut ConnectionCount) -> ConnectionId { + *count = count.wrapping_add(1); + *count +}