compute: Disconnect if no response to a pageserver request is received (#10882)

We've seen some cases in production where a compute doesn't get a
response to a pageserver request for several minutes, or even more. We
haven't found the root cause for that yet, but whatever the reason is,
it seems overly optimistic to think that if the pageserver hasn't
responded for 2 minutes, we'd get a response if we just wait patiently a
little longer. More likely, the pageserver is dead or there's some kind
of a network glitch so that the TCP connection is dead, or at least
stuck for a long time. Either way, it's better to disconnect and
reconnect. I set the default timeout to 2 minutes, which should be
enough for any GetPage request under normal circumstances, even if the
pageserver has to download several layer files from remote storage.

Make the disconnect timeout configurable. Also make the "log interval",
after which we print a message to the log configurable, so that if you
change the disconnect timeout, you can set the log timeout
correspondingly. The default log interval is still 10 s. The new GUCs
are called "neon.pageserver_response_log_timeout" and
"neon.pageserver_response_disconnect_timeout".

Includes a basic test for the log and disconnect timeouts.

Implements issue #10857
This commit is contained in:
Heikki Linnakangas
2025-02-24 22:16:37 +02:00
committed by GitHub
parent 8fd0f89b94
commit 565a9e62a1
3 changed files with 347 additions and 62 deletions

View File

@@ -45,6 +45,7 @@ use utils::sync::gate::{Gate, GateGuard};
use utils::sync::spsc_fold;
use utils::{
auth::{Claims, Scope, SwappableJwtAuth},
failpoint_support,
id::{TenantId, TimelineId},
lsn::Lsn,
simple_rcu::RcuReadGuard,
@@ -1298,6 +1299,8 @@ impl PageServerHandler {
&response_msg.serialize(protocol_version),
))?;
failpoint_support::sleep_millis_async!("before-pagestream-msg-flush", cancel);
// what we want to do
let socket_fd = pgb_writer.socket_fd;
let flush_fut = pgb_writer.flush();

View File

@@ -14,6 +14,8 @@
*/
#include "postgres.h"
#include <math.h>
#include "access/xlog.h"
#include "common/hashfn.h"
#include "fmgr.h"
@@ -61,6 +63,9 @@ int neon_protocol_version = 2;
static int max_reconnect_attempts = 60;
static int stripe_size;
static int pageserver_response_log_timeout = 10000;
static int pageserver_response_disconnect_timeout = 120000; /* 2 minutes */
typedef struct
{
char connstring[MAX_SHARDS][MAX_PAGESERVER_CONNSTRING_SIZE];
@@ -129,6 +134,11 @@ typedef struct
uint64 nrequests_sent;
uint64 nresponses_received;
/* State for the receive timeout mechanism in call_PQgetCopyData() */
instr_time receive_start_time; /* when we started waiting */
instr_time receive_last_log_time; /* when we last printed a log message for the wait */
bool receive_logged; /* has the wait been logged */
/*---
* WaitEventSet containing:
* - WL_SOCKET_READABLE on 'conn'
@@ -661,6 +671,9 @@ pageserver_connect(shardno_t shard_no, int elevel)
shard->state = PS_Connected;
shard->nrequests_sent = 0;
shard->nresponses_received = 0;
INSTR_TIME_SET_ZERO(shard->receive_start_time);
INSTR_TIME_SET_ZERO(shard->receive_last_log_time);
shard->receive_logged = false;
}
/* FALLTHROUGH */
case PS_Connected:
@@ -680,6 +693,33 @@ pageserver_connect(shardno_t shard_no, int elevel)
Assert(false);
}
static void
get_socket_stats(int socketfd, int *sndbuf, int *recvbuf)
{
*sndbuf = -1;
*recvbuf = -1;
#ifdef __linux__
/*
* get kernel's send and recv queue size via ioctl
* https://elixir.bootlin.com/linux/v6.1.128/source/include/uapi/linux/sockios.h#L25-L27
*/
if (socketfd != -1)
{
int ioctl_err;
ioctl_err = ioctl(socketfd, SIOCOUTQ, sndbuf);
if (ioctl_err!= 0) {
*sndbuf = -errno;
}
ioctl_err = ioctl(socketfd, FIONREAD, recvbuf);
if (ioctl_err != 0) {
*recvbuf = -errno;
}
}
#endif
}
/*
* A wrapper around PQgetCopyData that checks for interrupts while sleeping.
*/
@@ -690,26 +730,8 @@ call_PQgetCopyData(shardno_t shard_no, char **buffer)
PageServer *shard = &page_servers[shard_no];
PGconn *pageserver_conn = shard->conn;
instr_time now,
start_ts,
since_start,
last_log_ts,
since_last_log;
bool logged = false;
/*
* As a debugging aid, if we don't get a response for a long time, print a
* log message.
*
* 10 s is a very generous threshold, normally we expect a response in a
* few milliseconds. We have metrics to track latencies in normal ranges,
* but in the cases that take exceptionally long, it's useful to log the
* exact timestamps.
*/
#define LOG_INTERVAL_MS INT64CONST(10 * 1000)
INSTR_TIME_SET_CURRENT(now);
start_ts = last_log_ts = now;
INSTR_TIME_SET_ZERO(since_last_log);
retry:
ret = PQgetCopyData(pageserver_conn, buffer, 1 /* async */ );
@@ -718,11 +740,36 @@ retry:
{
WaitEvent occurred_event;
int noccurred;
double log_timeout,
disconnect_timeout;
long timeout;
timeout = Max(0, LOG_INTERVAL_MS - INSTR_TIME_GET_MILLISEC(since_last_log));
/*
* Calculate time elapsed since the start, and since the last progress
* log message. On first call, remember the start time.
*/
INSTR_TIME_SET_CURRENT(now);
if (INSTR_TIME_IS_ZERO(shard->receive_start_time))
{
shard->receive_start_time = now;
INSTR_TIME_SET_ZERO(since_start);
shard->receive_last_log_time = now;
INSTR_TIME_SET_ZERO(since_last_log);
shard->receive_logged = false;
}
else
{
since_start = now;
INSTR_TIME_SUBTRACT(since_start, shard->receive_start_time);
since_last_log = now;
INSTR_TIME_SUBTRACT(since_last_log, shard->receive_last_log_time);
}
/* Sleep until the log or disconnect timeout is reached. */
log_timeout = Max(0, (double) pageserver_response_log_timeout - INSTR_TIME_GET_MILLISEC(since_last_log));
disconnect_timeout = Max(0, (double) pageserver_response_disconnect_timeout - INSTR_TIME_GET_MILLISEC(since_start));
timeout = (long) ceil(Min(log_timeout, disconnect_timeout));
/* Sleep until there's something to do */
noccurred = WaitEventSetWait(shard->wes_read, timeout, &occurred_event, 1,
WAIT_EVENT_NEON_PS_READ);
ResetLatch(MyLatch);
@@ -740,49 +787,61 @@ retry:
pfree(msg);
return -1;
}
goto retry;
}
/* Timeout was reached, or we were interrupted for some other reason */
INSTR_TIME_SET_CURRENT(now);
since_last_log = now;
INSTR_TIME_SUBTRACT(since_last_log, shard->receive_last_log_time);
since_start = now;
INSTR_TIME_SUBTRACT(since_start, shard->receive_start_time);
/*
* As a debugging aid, if we don't get a response to a pageserver request
* for a long time, print a log message.
*
* The default neon.pageserver_response_log_timeout value, 10 s, is
* very generous. Normally we expect a response in a few
* milliseconds. We have metrics to track latencies in normal ranges,
* but in the cases that take exceptionally long, it's useful to log
* the exact timestamps.
*/
if (INSTR_TIME_GET_MILLISEC(since_last_log) >= pageserver_response_log_timeout)
{
int sndbuf;
int recvbuf;
get_socket_stats(PQsocket(pageserver_conn), &sndbuf, &recvbuf);
neon_shard_log(shard_no, LOG,
"no response received from pageserver for %0.3f s, still waiting (sent " UINT64_FORMAT " requests, received " UINT64_FORMAT " responses) (socket sndbuf=%d recvbuf=%d)",
INSTR_TIME_GET_DOUBLE(since_start),
shard->nrequests_sent, shard->nresponses_received, sndbuf, recvbuf);
shard->receive_last_log_time = now;
shard->receive_logged = true;
}
/*
* Print a message to the log if a long time has passed with no
* response.
* If an even longer time has passed without receiving a response from
* the pageserver, disconnect. That triggers a reconnection attempt
* in the caller.
*
* If this happens, the pageserver is likely dead and isn't coming
* back, or there's some kind of a network glitch and the connection
* is permanently gone. Without this, if the pageserver or the network
* connection is dead, it could take a very long time (15 minutes or
* more) until the TCP keepalive timeout notices that. Even if we
* would in fact get a response if we just waited a little longer,
* there's a good chance that we'll get the response sooner by
* reconnecting.
*/
INSTR_TIME_SET_CURRENT(now);
since_last_log = now;
INSTR_TIME_SUBTRACT(since_last_log, last_log_ts);
if (INSTR_TIME_GET_MILLISEC(since_last_log) >= LOG_INTERVAL_MS)
if (INSTR_TIME_GET_MILLISEC(since_start) >= pageserver_response_disconnect_timeout)
{
int sndbuf = -1;
int recvbuf = -1;
#ifdef __linux__
int socketfd;
#endif
since_start = now;
INSTR_TIME_SUBTRACT(since_start, start_ts);
#ifdef __linux__
/*
* get kernel's send and recv queue size via ioctl
* https://elixir.bootlin.com/linux/v6.1.128/source/include/uapi/linux/sockios.h#L25-L27
*/
socketfd = PQsocket(pageserver_conn);
if (socketfd != -1) {
int ioctl_err;
ioctl_err = ioctl(socketfd, SIOCOUTQ, &sndbuf);
if (ioctl_err!= 0) {
sndbuf = -errno;
}
ioctl_err = ioctl(socketfd, FIONREAD, &recvbuf);
if (ioctl_err != 0) {
recvbuf = -errno;
}
}
#endif
neon_shard_log(shard_no, LOG, "no response received from pageserver for %0.3f s, still waiting (sent " UINT64_FORMAT " requests, received " UINT64_FORMAT " responses) (socket sndbuf=%d recvbuf=%d)",
INSTR_TIME_GET_DOUBLE(since_start),
shard->nrequests_sent, shard->nresponses_received, sndbuf, recvbuf);
last_log_ts = now;
logged = true;
neon_shard_log(shard_no, LOG, "no response from pageserver for %0.3f s, disconnecting",
INSTR_TIME_GET_DOUBLE(since_start));
pageserver_disconnect(shard_no);
return -1;
}
goto retry;
@@ -792,14 +851,18 @@ retry:
* If we logged earlier that the response is taking a long time, log
* another message when the response is finally received.
*/
if (logged)
if (shard->receive_logged)
{
INSTR_TIME_SET_CURRENT(now);
since_start = now;
INSTR_TIME_SUBTRACT(since_start, start_ts);
neon_shard_log(shard_no, LOG, "received response from pageserver after %0.3f s",
INSTR_TIME_SUBTRACT(since_start, shard->receive_start_time);
neon_shard_log(shard_no, LOG,
"received response from pageserver after %0.3f s",
INSTR_TIME_GET_DOUBLE(since_start));
}
INSTR_TIME_SET_ZERO(shard->receive_start_time);
INSTR_TIME_SET_ZERO(shard->receive_last_log_time);
shard->receive_logged = false;
return ret;
}
@@ -973,9 +1036,17 @@ pageserver_receive(shardno_t shard_no)
pfree(msg);
}
}
else if (rc == -1 && shard->state == PS_Disconnected)
{
/* If the state is 'Disconnected', the disconnection message was already logged */
resp = NULL;
}
else if (rc == -1)
{
neon_shard_log(shard_no, LOG, "pageserver_receive disconnect: psql end of copy data: %s", pchomp(PQerrorMessage(pageserver_conn)));
char *msg = pchomp(PQerrorMessage(pageserver_conn));
neon_shard_log(shard_no, LOG, "pageserver_receive disconnect: psql end of copy data: %s", msg);
pfree(msg);
pageserver_disconnect(shard_no);
resp = NULL;
}
@@ -1261,6 +1332,26 @@ pg_init_libpagestore(void)
0, /* no flags required */
NULL, NULL, NULL);
DefineCustomIntVariable("neon.pageserver_response_log_timeout",
"pageserver response log timeout",
"If the pageserver doesn't respond to a request within this timeout,"
"a message is printed to the log.",
&pageserver_response_log_timeout,
10000, 100, INT_MAX,
PGC_SUSET,
GUC_UNIT_MS,
NULL, NULL, NULL);
DefineCustomIntVariable("neon.pageserver_response_disconnect_timeout",
"pageserver response diconnect timeout",
"If the pageserver doesn't respond to a request within this timeout,"
"disconnect and reconnect.",
&pageserver_response_disconnect_timeout,
120000, 100, INT_MAX,
PGC_SUSET,
GUC_UNIT_MS,
NULL, NULL, NULL);
relsize_hash_init();
if (page_server != NULL)

View File

@@ -7,6 +7,7 @@ import psycopg2.errors
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.utils import USE_LFC
@pytest.mark.timeout(600)
@@ -80,3 +81,193 @@ def test_compute_pageserver_connection_stress(neon_env_builder: NeonEnvBuilder):
# do a graceful shutdown which would had caught the allowed_errors before
# https://github.com/neondatabase/neon/pull/8632
env.pageserver.stop()
def test_compute_pageserver_hung_connections(neon_env_builder: NeonEnvBuilder):
"""
Test timeouts in waiting for response to pageserver request
"""
env = neon_env_builder.init_start()
env.pageserver.allowed_errors.append(".*slow GetPage.*")
pageserver_http = env.pageserver.http_client()
endpoint = env.endpoints.create_start(
"main",
tenant_id=env.initial_tenant,
config_lines=["autovacuum = off"],
)
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
# Create table, and insert some rows. Make it big enough that it doesn't fit in
# shared_buffers, otherwise the SELECT after restart will just return answer
# from shared_buffers without hitting the page server, which defeats the point
# of this test.
cur.execute("CREATE TABLE foo (t text)")
cur.execute(
"""
INSERT INTO foo
SELECT 'long string to consume some space' || g
FROM generate_series(1, 100000) g
"""
)
# Verify that the table is larger than shared_buffers
cur.execute(
"""
select setting::int * pg_size_bytes(unit) as shared_buffers, pg_relation_size('foo') as tbl_size
from pg_settings where name = 'shared_buffers'
"""
)
row = cur.fetchone()
assert row is not None
log.debug(f"shared_buffers is {row[0]}, table size {row[1]}")
assert int(row[0]) < int(row[1])
# Print the backend PID so that it can be compared with the logs easily
cur.execute("SELECT pg_backend_pid()")
row = cur.fetchone()
assert row is not None
log.info(f"running test workload in backend PID {row[0]}")
def run_workload(duration: float):
end_time = time.time() + duration
times_executed = 0
while time.time() < end_time:
if random.random() < 0.5:
cur.execute("INSERT INTO foo VALUES ('stas'), ('heikki')")
else:
cur.execute("SELECT t FROM foo ORDER BY RANDOM() LIMIT 10")
cur.fetchall()
times_executed += 1
log.info(f"Workload executed {times_executed} times")
assert times_executed > 0
## Test short connection hiccups
##
## This is to exercise the logging timeout.
log.info("running workload with log timeout")
cur.execute("SET neon.pageserver_response_log_timeout = '500ms'")
pageserver_http.configure_failpoints(("before-pagestream-msg-flush", "10%3*return(3000)"))
run_workload(20)
# check that the message was logged
assert endpoint.log_contains("no response received from pageserver for .* s, still waiting")
assert endpoint.log_contains("received response from pageserver after .* s")
## Test connections that are hung for longer
##
## This exercises the disconnect timeout. We'll disconnect and
## reconnect after 500 ms.
log.info("running workload with disconnect timeout")
cur.execute("SET neon.pageserver_response_log_timeout = '250ms'")
cur.execute("SET neon.pageserver_response_disconnect_timeout = '500ms'")
pageserver_http.configure_failpoints(("before-pagestream-msg-flush", "10%3*return(3000)"))
run_workload(15)
assert endpoint.log_contains("no response from pageserver for .* s, disconnecting")
# do a graceful shutdown which would had caught the allowed_errors before
# https://github.com/neondatabase/neon/pull/8632
env.pageserver.stop()
def test_compute_pageserver_statement_timeout(neon_env_builder: NeonEnvBuilder):
"""
Test statement_timeout while waiting for response to pageserver request
"""
env = neon_env_builder.init_start()
env.pageserver.allowed_errors.append(".*slow GetPage.*")
pageserver_http = env.pageserver.http_client()
# Make sure the shared_buffers and LFC are tiny, to ensure the queries
# hit the storage. Disable autovacuum to make the test more deterministic.
config_lines = [
"shared_buffers='512kB'",
"autovacuum = off",
]
if USE_LFC:
config_lines = ["neon.max_file_cache_size = 1MB", "neon.file_cache_size_limit = 1MB"]
endpoint = env.endpoints.create_start(
"main",
tenant_id=env.initial_tenant,
config_lines=config_lines,
)
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
# Disable parallel query. Parallel workers open their own pageserver connections,
# which messes up the test logic.
cur.execute("SET max_parallel_workers_per_gather=0")
cur.execute("SET effective_io_concurrency=0")
# Create table, and insert some rows. Make it big enough that it doesn't fit in
# shared_buffers, otherwise the SELECT after restart will just return answer
# from shared_buffers without hitting the page server, which defeats the point
# of this test.
cur.execute("CREATE TABLE foo (t text)")
cur.execute(
"""
INSERT INTO foo
SELECT 'long string to consume some space' || g
FROM generate_series(1, 100000) g
"""
)
# Verify that the table is larger than shared_buffers
cur.execute(
"""
select setting::int * pg_size_bytes(unit) as shared_buffers, pg_relation_size('foo') as tbl_size
from pg_settings where name = 'shared_buffers'
"""
)
row = cur.fetchone()
assert row is not None
log.debug(f"shared_buffers is {row[0]}, table size {row[1]}")
assert int(row[0]) < int(row[1])
## Run a query until the compute->pageserver connection hits the failpoint and
## get stuck. This tests that the statement_timeout is obeyed while waiting on a
## GetPage request.
log.info("running workload with statement_timeout")
cur.execute("SET neon.pageserver_response_log_timeout = '2000ms'")
cur.execute("SET neon.pageserver_response_disconnect_timeout = '30000ms'")
cur.execute("SET statement_timeout='10s'")
pageserver_http.configure_failpoints(("before-pagestream-msg-flush", "10%return(60000)"))
start_time = time.time()
with pytest.raises(psycopg2.errors.QueryCanceled):
cur.execute("SELECT count(*) FROM foo")
cur.fetchall()
log.info("Statement timeout reached")
end_time = time.time()
# Verify that the statement_timeout canceled the query before
# neon.pageserver_response_disconnect_timeout expired
assert end_time - start_time < 40
times_canceled = 1
# Should not have disconnected yet
assert not endpoint.log_contains("no response from pageserver for .* s, disconnecting")
# Clear the failpoint. This doesn't affect the connection that already hit it. It
# will keep waiting. But subsequent connections will work normally.
pageserver_http.configure_failpoints(("before-pagestream-msg-flush", "off"))
# If we keep retrying, we should eventually succeed. (This tests that the
# neon.pageserver_response_disconnect_timeout is not reset on query
# cancellation.)
while times_canceled < 10:
try:
cur.execute("SELECT count(*) FROM foo")
cur.fetchall()
log.info("Statement succeeded")
break
except psycopg2.errors.QueryCanceled:
log.info("Statement timed out, retrying")
times_canceled += 1
assert times_canceled > 1 and times_canceled < 10
assert endpoint.log_contains("no response from pageserver for .* s, disconnecting")
# do a graceful shutdown which would had caught the allowed_errors before
# https://github.com/neondatabase/neon/pull/8632
env.pageserver.stop()