diff --git a/libs/postgres_backend/src/lib.rs b/libs/postgres_backend/src/lib.rs index 8c024375c1..f74b229ac4 100644 --- a/libs/postgres_backend/src/lib.rs +++ b/libs/postgres_backend/src/lib.rs @@ -9,6 +9,8 @@ use bytes::Bytes; use serde::{Deserialize, Serialize}; use std::io::ErrorKind; use std::net::SocketAddr; +use std::os::fd::AsRawFd; +use std::os::fd::RawFd; use std::pin::Pin; use std::sync::Arc; use std::task::{ready, Poll}; @@ -268,6 +270,7 @@ impl MaybeWriteOnly { } pub struct PostgresBackend { + pub socket_fd: RawFd, framed: MaybeWriteOnly, pub state: ProtoState, @@ -293,9 +296,11 @@ impl PostgresBackend { tls_config: Option>, ) -> io::Result { let peer_addr = socket.peer_addr()?; + let socket_fd = socket.as_raw_fd(); let stream = MaybeTlsStream::Unencrypted(socket); Ok(Self { + socket_fd, framed: MaybeWriteOnly::Full(Framed::new(stream)), state: ProtoState::Initialization, auth_type, @@ -307,6 +312,7 @@ impl PostgresBackend { impl PostgresBackend { pub fn new_from_io( + socket_fd: RawFd, socket: IO, peer_addr: SocketAddr, auth_type: AuthType, @@ -315,6 +321,7 @@ impl PostgresBackend { let stream = MaybeTlsStream::Unencrypted(socket); Ok(Self { + socket_fd, framed: MaybeWriteOnly::Full(Framed::new(stream)), state: ProtoState::Initialization, auth_type, diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml index 0f10300959..e9611a0f12 100644 --- a/libs/utils/Cargo.toml +++ b/libs/utils/Cargo.toml @@ -28,7 +28,7 @@ inferno.workspace = true fail.workspace = true futures = { workspace = true } jsonwebtoken.workspace = true -nix.workspace = true +nix = {workspace = true, features = [ "ioctl" ] } once_cell.workspace = true pin-project-lite.workspace = true regex.workspace = true diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs index 820ff2d5ea..9389a27bf3 100644 --- a/libs/utils/src/lib.rs +++ b/libs/utils/src/lib.rs @@ -93,6 +93,9 @@ pub mod try_rcu; pub mod guard_arc_swap; +#[cfg(target_os = "linux")] +pub mod linux_socket_ioctl; + // Re-export used in macro. Avoids adding git-version as dep in target crates. #[doc(hidden)] pub use git_version; diff --git a/libs/utils/src/linux_socket_ioctl.rs b/libs/utils/src/linux_socket_ioctl.rs new file mode 100644 index 0000000000..5ae0e86af8 --- /dev/null +++ b/libs/utils/src/linux_socket_ioctl.rs @@ -0,0 +1,35 @@ +//! Linux-specific socket ioctls. +//! +//! + +use std::{ + io, + mem::MaybeUninit, + os::{fd::RawFd, raw::c_int}, +}; + +use nix::libc::{FIONREAD, TIOCOUTQ}; + +unsafe fn do_ioctl(socket_fd: RawFd, cmd: nix::libc::Ioctl) -> io::Result { + let mut inq: MaybeUninit = MaybeUninit::uninit(); + let err = nix::libc::ioctl(socket_fd, cmd, inq.as_mut_ptr()); + if err == 0 { + Ok(inq.assume_init()) + } else { + Err(io::Error::last_os_error()) + } +} + +/// # Safety +/// +/// Caller must ensure that `socket_fd` is a valid TCP socket file descriptor. +pub unsafe fn inq(socket_fd: RawFd) -> io::Result { + do_ioctl(socket_fd, FIONREAD) +} + +/// # Safety +/// +/// Caller must ensure that `socket_fd` is a valid TCP socket file descriptor. +pub unsafe fn outq(socket_fd: RawFd) -> io::Result { + do_ioctl(socket_fd, TIOCOUTQ) +} diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 6a5dc3e749..0ffd4e851a 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; use std::num::NonZeroUsize; +use std::os::fd::RawFd; use std::pin::Pin; use std::sync::atomic::AtomicU64; use std::sync::{Arc, Mutex}; @@ -1439,7 +1440,13 @@ impl Drop for SmgrOpTimer { } impl SmgrOpFlushInProgress { - pub(crate) async fn measure(self, started_at: Instant, mut fut: Fut) -> O + /// The caller must guarantee that `socket_fd`` outlives this function. + pub(crate) async fn measure( + self, + started_at: Instant, + mut fut: Fut, + socket_fd: RawFd, + ) -> O where Fut: std::future::Future, { @@ -1470,8 +1477,24 @@ impl SmgrOpFlushInProgress { } else { "slow flush completed or cancelled" }; + + let (inq, outq) = { + // SAFETY: caller guarantees that `socket_fd` outlives this function. + #[cfg(target_os = "linux")] + unsafe { + ( + utils::linux_socket_ioctl::inq(socket_fd).unwrap_or(-2), + utils::linux_socket_ioctl::outq(socket_fd).unwrap_or(-2), + ) + } + #[cfg(not(target_os = "linux"))] + { + (-1, -1) + } + }; + let elapsed_total_secs = format!("{:.6}", elapsed_total.as_secs_f64()); - tracing::info!(elapsed_total_secs, msg); + tracing::info!(elapsed_total_secs, inq, outq, msg); } }, |mut observe| { diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index bc0ed4198b..e9d87dec71 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -73,6 +73,7 @@ use pageserver_api::models::PageTraceEvent; use pageserver_api::reltag::SlruKind; use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID; use postgres_ffi::BLCKSZ; +use std::os::fd::AsRawFd; /// How long we may wait for a [`crate::tenant::mgr::TenantSlot::InProgress`]` and/or a [`crate::tenant::Tenant`] which /// is not yet in state [`TenantState::Active`]. @@ -257,6 +258,8 @@ async fn page_service_conn_main( .set_nodelay(true) .context("could not set TCP_NODELAY")?; + let socket_fd = socket.as_raw_fd(); + let peer_addr = socket.peer_addr().context("get peer address")?; tracing::Span::current().record("peer_addr", field::display(peer_addr)); @@ -305,7 +308,7 @@ async fn page_service_conn_main( cancel.clone(), gate_guard, ); - let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?; + let pgbackend = PostgresBackend::new_from_io(socket_fd, socket, peer_addr, auth_type, None)?; match pgbackend.run(&mut conn_handler, &cancel).await { Ok(()) => { @@ -1286,12 +1289,15 @@ impl PageServerHandler { ))?; // what we want to do + let socket_fd = pgb_writer.socket_fd; let flush_fut = pgb_writer.flush(); // metric for how long flushing takes let flush_fut = match flushing_timer { - Some(flushing_timer) => { - futures::future::Either::Left(flushing_timer.measure(Instant::now(), flush_fut)) - } + Some(flushing_timer) => futures::future::Either::Left(flushing_timer.measure( + Instant::now(), + flush_fut, + socket_fd, + )), None => futures::future::Either::Right(flush_fut), }; // do it while respecting cancellation diff --git a/safekeeper/src/wal_service.rs b/safekeeper/src/wal_service.rs index 1ebcb060e7..e5ccbb3230 100644 --- a/safekeeper/src/wal_service.rs +++ b/safekeeper/src/wal_service.rs @@ -13,6 +13,8 @@ use tokio_util::sync::CancellationToken; use tracing::*; use utils::{auth::Scope, measured_stream::MeasuredStream}; +use std::os::fd::AsRawFd; + use crate::metrics::TrafficMetrics; use crate::SafeKeeperConf; use crate::{handler::SafekeeperPostgresHandler, GlobalTimelines}; @@ -62,6 +64,7 @@ async fn handle_socket( global_timelines: Arc, ) -> Result<(), QueryError> { socket.set_nodelay(true)?; + let socket_fd = socket.as_raw_fd(); let peer_addr = socket.peer_addr()?; // Set timeout on reading from the socket. It prevents hanged up connection @@ -107,7 +110,7 @@ async fn handle_socket( auth_pair, global_timelines, ); - let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?; + let pgbackend = PostgresBackend::new_from_io(socket_fd, socket, peer_addr, auth_type, None)?; // libpq protocol between safekeeper and walproposer / pageserver // We don't use shutdown. pgbackend