mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-22 21:59:59 +00:00
page_service: include socket send & recv queue length in slow flush log mesage (#10823)
# Summary In - https://github.com/neondatabase/neon/pull/10813 we added slow flush logging but it didn't log the TCP send & recv queue length. This PR adds that data to the log message. I believe the implementation to be safe & correct right now, but it's brittle and thus this PR should be reverted or improved upon once the investigation is over. Refs: - stacked atop https://github.com/neondatabase/neon/pull/10813 - context: https://neondb.slack.com/archives/C08DE6Q9C3B/p1739464533762049?thread_ts=1739462628.361019&cid=C08DE6Q9C3B - improves https://github.com/neondatabase/neon/issues/10668 - part of https://github.com/neondatabase/cloud/issues/23515 # How It Works The trouble is two-fold: 1. getting to the raw socket file descriptor through the many Rust types that wrap it and 2. integrating with the `measure()` function Rust wraps it in types to model file descriptor lifetimes and ownership, and usually one can get access using `as_raw_fd()`. However, we `split()` the stream and the resulting [`tokio::io::WriteHalf`](https://docs.rs/tokio/latest/tokio/io/struct.WriteHalf.html) . Check the PR commit history for my attempts to do it. My solution is to get the socket fd before we wrap it in our protocol types, and to store that fd in the new `PostgresBackend::socket_fd` field. I believe it's safe because the lifetime of `PostgresBackend::socket_fd` value == the lifetime of the `TcpStream` that wrap and store in `PostgresBackend::framed`. Specifically, the only place that close()s the socket is the `impl Drop for TcpStream`. I think the protocol stack calls `TcpStream::shutdown()`, but, that doesn't `close()` the file descriptor underneath. Regarding integration with the `measure()` function, the trouble is that `flush_fut` is currently a generic `Future` type. So, we just pass in the `socket_fd` as a separate argument. A clean implementation would convert the `pgb_writer.flush()` to a named future that provides an accessor for the socket fd while not being polled. I tried (see PR history), but failed to break through the `WriteHalf`. # Testing Tested locally by running ``` ./target/debug/pagebench get-page-latest-lsn --num-clients=1000 --queue-depth=1000 ``` in one terminal, waiting a bit, then ``` pkill -STOP pagebench ``` then wait for slow logs to show up in `pageserver.log`. Pick one of the slow log message's port pairs, e.g., `127.0.0.1:39500`, and then checking sockstat output ``` ss -ntp | grep '127.0.0.1:39500' ``` to ensure that send & recv queue size match those in the log message.
This commit is contained in:
committed by
GitHub
parent
3d7a32f619
commit
b992a1a62a
@@ -9,6 +9,8 @@ use bytes::Bytes;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::io::ErrorKind;
|
||||
use std::net::SocketAddr;
|
||||
use std::os::fd::AsRawFd;
|
||||
use std::os::fd::RawFd;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::task::{ready, Poll};
|
||||
@@ -268,6 +270,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> MaybeWriteOnly<IO> {
|
||||
}
|
||||
|
||||
pub struct PostgresBackend<IO> {
|
||||
pub socket_fd: RawFd,
|
||||
framed: MaybeWriteOnly<IO>,
|
||||
|
||||
pub state: ProtoState,
|
||||
@@ -293,9 +296,11 @@ impl PostgresBackend<tokio::net::TcpStream> {
|
||||
tls_config: Option<Arc<rustls::ServerConfig>>,
|
||||
) -> io::Result<Self> {
|
||||
let peer_addr = socket.peer_addr()?;
|
||||
let socket_fd = socket.as_raw_fd();
|
||||
let stream = MaybeTlsStream::Unencrypted(socket);
|
||||
|
||||
Ok(Self {
|
||||
socket_fd,
|
||||
framed: MaybeWriteOnly::Full(Framed::new(stream)),
|
||||
state: ProtoState::Initialization,
|
||||
auth_type,
|
||||
@@ -307,6 +312,7 @@ impl PostgresBackend<tokio::net::TcpStream> {
|
||||
|
||||
impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
|
||||
pub fn new_from_io(
|
||||
socket_fd: RawFd,
|
||||
socket: IO,
|
||||
peer_addr: SocketAddr,
|
||||
auth_type: AuthType,
|
||||
@@ -315,6 +321,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
|
||||
let stream = MaybeTlsStream::Unencrypted(socket);
|
||||
|
||||
Ok(Self {
|
||||
socket_fd,
|
||||
framed: MaybeWriteOnly::Full(Framed::new(stream)),
|
||||
state: ProtoState::Initialization,
|
||||
auth_type,
|
||||
|
||||
@@ -28,7 +28,7 @@ inferno.workspace = true
|
||||
fail.workspace = true
|
||||
futures = { workspace = true }
|
||||
jsonwebtoken.workspace = true
|
||||
nix.workspace = true
|
||||
nix = {workspace = true, features = [ "ioctl" ] }
|
||||
once_cell.workspace = true
|
||||
pin-project-lite.workspace = true
|
||||
regex.workspace = true
|
||||
|
||||
@@ -93,6 +93,9 @@ pub mod try_rcu;
|
||||
|
||||
pub mod guard_arc_swap;
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
pub mod linux_socket_ioctl;
|
||||
|
||||
// Re-export used in macro. Avoids adding git-version as dep in target crates.
|
||||
#[doc(hidden)]
|
||||
pub use git_version;
|
||||
|
||||
35
libs/utils/src/linux_socket_ioctl.rs
Normal file
35
libs/utils/src/linux_socket_ioctl.rs
Normal file
@@ -0,0 +1,35 @@
|
||||
//! Linux-specific socket ioctls.
|
||||
//!
|
||||
//! <https://elixir.bootlin.com/linux/v6.1.128/source/include/uapi/linux/sockios.h#L25-L27>
|
||||
|
||||
use std::{
|
||||
io,
|
||||
mem::MaybeUninit,
|
||||
os::{fd::RawFd, raw::c_int},
|
||||
};
|
||||
|
||||
use nix::libc::{FIONREAD, TIOCOUTQ};
|
||||
|
||||
unsafe fn do_ioctl(socket_fd: RawFd, cmd: nix::libc::Ioctl) -> io::Result<c_int> {
|
||||
let mut inq: MaybeUninit<c_int> = MaybeUninit::uninit();
|
||||
let err = nix::libc::ioctl(socket_fd, cmd, inq.as_mut_ptr());
|
||||
if err == 0 {
|
||||
Ok(inq.assume_init())
|
||||
} else {
|
||||
Err(io::Error::last_os_error())
|
||||
}
|
||||
}
|
||||
|
||||
/// # Safety
|
||||
///
|
||||
/// Caller must ensure that `socket_fd` is a valid TCP socket file descriptor.
|
||||
pub unsafe fn inq(socket_fd: RawFd) -> io::Result<c_int> {
|
||||
do_ioctl(socket_fd, FIONREAD)
|
||||
}
|
||||
|
||||
/// # Safety
|
||||
///
|
||||
/// Caller must ensure that `socket_fd` is a valid TCP socket file descriptor.
|
||||
pub unsafe fn outq(socket_fd: RawFd) -> io::Result<c_int> {
|
||||
do_ioctl(socket_fd, TIOCOUTQ)
|
||||
}
|
||||
@@ -1,5 +1,6 @@
|
||||
use std::collections::HashMap;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::os::fd::RawFd;
|
||||
use std::pin::Pin;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::{Arc, Mutex};
|
||||
@@ -1439,7 +1440,13 @@ impl Drop for SmgrOpTimer {
|
||||
}
|
||||
|
||||
impl SmgrOpFlushInProgress {
|
||||
pub(crate) async fn measure<Fut, O>(self, started_at: Instant, mut fut: Fut) -> O
|
||||
/// The caller must guarantee that `socket_fd`` outlives this function.
|
||||
pub(crate) async fn measure<Fut, O>(
|
||||
self,
|
||||
started_at: Instant,
|
||||
mut fut: Fut,
|
||||
socket_fd: RawFd,
|
||||
) -> O
|
||||
where
|
||||
Fut: std::future::Future<Output = O>,
|
||||
{
|
||||
@@ -1470,8 +1477,24 @@ impl SmgrOpFlushInProgress {
|
||||
} else {
|
||||
"slow flush completed or cancelled"
|
||||
};
|
||||
|
||||
let (inq, outq) = {
|
||||
// SAFETY: caller guarantees that `socket_fd` outlives this function.
|
||||
#[cfg(target_os = "linux")]
|
||||
unsafe {
|
||||
(
|
||||
utils::linux_socket_ioctl::inq(socket_fd).unwrap_or(-2),
|
||||
utils::linux_socket_ioctl::outq(socket_fd).unwrap_or(-2),
|
||||
)
|
||||
}
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
{
|
||||
(-1, -1)
|
||||
}
|
||||
};
|
||||
|
||||
let elapsed_total_secs = format!("{:.6}", elapsed_total.as_secs_f64());
|
||||
tracing::info!(elapsed_total_secs, msg);
|
||||
tracing::info!(elapsed_total_secs, inq, outq, msg);
|
||||
}
|
||||
},
|
||||
|mut observe| {
|
||||
|
||||
@@ -73,6 +73,7 @@ use pageserver_api::models::PageTraceEvent;
|
||||
use pageserver_api::reltag::SlruKind;
|
||||
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
|
||||
use postgres_ffi::BLCKSZ;
|
||||
use std::os::fd::AsRawFd;
|
||||
|
||||
/// How long we may wait for a [`crate::tenant::mgr::TenantSlot::InProgress`]` and/or a [`crate::tenant::Tenant`] which
|
||||
/// is not yet in state [`TenantState::Active`].
|
||||
@@ -257,6 +258,8 @@ async fn page_service_conn_main(
|
||||
.set_nodelay(true)
|
||||
.context("could not set TCP_NODELAY")?;
|
||||
|
||||
let socket_fd = socket.as_raw_fd();
|
||||
|
||||
let peer_addr = socket.peer_addr().context("get peer address")?;
|
||||
tracing::Span::current().record("peer_addr", field::display(peer_addr));
|
||||
|
||||
@@ -305,7 +308,7 @@ async fn page_service_conn_main(
|
||||
cancel.clone(),
|
||||
gate_guard,
|
||||
);
|
||||
let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
|
||||
let pgbackend = PostgresBackend::new_from_io(socket_fd, socket, peer_addr, auth_type, None)?;
|
||||
|
||||
match pgbackend.run(&mut conn_handler, &cancel).await {
|
||||
Ok(()) => {
|
||||
@@ -1286,12 +1289,15 @@ impl PageServerHandler {
|
||||
))?;
|
||||
|
||||
// what we want to do
|
||||
let socket_fd = pgb_writer.socket_fd;
|
||||
let flush_fut = pgb_writer.flush();
|
||||
// metric for how long flushing takes
|
||||
let flush_fut = match flushing_timer {
|
||||
Some(flushing_timer) => {
|
||||
futures::future::Either::Left(flushing_timer.measure(Instant::now(), flush_fut))
|
||||
}
|
||||
Some(flushing_timer) => futures::future::Either::Left(flushing_timer.measure(
|
||||
Instant::now(),
|
||||
flush_fut,
|
||||
socket_fd,
|
||||
)),
|
||||
None => futures::future::Either::Right(flush_fut),
|
||||
};
|
||||
// do it while respecting cancellation
|
||||
|
||||
@@ -13,6 +13,8 @@ use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::{auth::Scope, measured_stream::MeasuredStream};
|
||||
|
||||
use std::os::fd::AsRawFd;
|
||||
|
||||
use crate::metrics::TrafficMetrics;
|
||||
use crate::SafeKeeperConf;
|
||||
use crate::{handler::SafekeeperPostgresHandler, GlobalTimelines};
|
||||
@@ -62,6 +64,7 @@ async fn handle_socket(
|
||||
global_timelines: Arc<GlobalTimelines>,
|
||||
) -> Result<(), QueryError> {
|
||||
socket.set_nodelay(true)?;
|
||||
let socket_fd = socket.as_raw_fd();
|
||||
let peer_addr = socket.peer_addr()?;
|
||||
|
||||
// Set timeout on reading from the socket. It prevents hanged up connection
|
||||
@@ -107,7 +110,7 @@ async fn handle_socket(
|
||||
auth_pair,
|
||||
global_timelines,
|
||||
);
|
||||
let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
|
||||
let pgbackend = PostgresBackend::new_from_io(socket_fd, socket, peer_addr, auth_type, None)?;
|
||||
// libpq protocol between safekeeper and walproposer / pageserver
|
||||
// We don't use shutdown.
|
||||
pgbackend
|
||||
|
||||
Reference in New Issue
Block a user