diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index b9b8e32753..cab3d76bf8 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -45,6 +45,7 @@ use utils::sync::gate::{Gate, GateGuard};
 use utils::sync::spsc_fold;
 use utils::{
     auth::{Claims, Scope, SwappableJwtAuth},
+    failpoint_support,
     id::{TenantId, TimelineId},
     lsn::Lsn,
     simple_rcu::RcuReadGuard,
@@ -1298,6 +1299,8 @@ impl PageServerHandler {
             &response_msg.serialize(protocol_version),
         ))?;
 
+        failpoint_support::sleep_millis_async!("before-pagestream-msg-flush", cancel);
+
         // what we want to do
         let socket_fd = pgb_writer.socket_fd;
         let flush_fut = pgb_writer.flush();
diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c
index f5801b379b..f71f11ff93 100644
--- a/pgxn/neon/libpagestore.c
+++ b/pgxn/neon/libpagestore.c
@@ -14,6 +14,8 @@
  */
 #include "postgres.h"
 
+#include <math.h>
+
 #include "access/xlog.h"
 #include "common/hashfn.h"
 #include "fmgr.h"
@@ -61,6 +63,9 @@ int neon_protocol_version = 2;
 static int	max_reconnect_attempts = 60;
 static int	stripe_size;
 
+static int	pageserver_response_log_timeout = 10000;
+static int	pageserver_response_disconnect_timeout = 120000;	/* 2 minutes */
+
 typedef struct
 {
 	char		connstring[MAX_SHARDS][MAX_PAGESERVER_CONNSTRING_SIZE];
@@ -129,6 +134,11 @@ typedef struct
 	uint64		nrequests_sent;
 	uint64		nresponses_received;
 
+	/* State for the receive timeout mechanism in call_PQgetCopyData() */
+	instr_time	receive_start_time;		/* when we started waiting */
+	instr_time	receive_last_log_time;	/* when we last printed a log message for the wait */
+	bool		receive_logged;			/* has the wait been logged */
+
 	/*---
 	 * WaitEventSet containing:
 	 * - WL_SOCKET_READABLE on 'conn'
@@ -661,6 +671,9 @@ pageserver_connect(shardno_t shard_no, int elevel)
 			shard->state = PS_Connected;
 			shard->nrequests_sent = 0;
 			shard->nresponses_received = 0;
+			INSTR_TIME_SET_ZERO(shard->receive_start_time);
+			INSTR_TIME_SET_ZERO(shard->receive_last_log_time);
+			shard->receive_logged = false;
 		}
 			/* FALLTHROUGH */
 		case PS_Connected:
@@ -680,6 +693,33 @@ pageserver_connect(shardno_t shard_no, int elevel)
 	Assert(false);
 }
 
+static void
+get_socket_stats(int socketfd, int *sndbuf, int *recvbuf)
+{
+	*sndbuf = -1;
+	*recvbuf = -1;
+
+#ifdef __linux__
+	/*
+	 * get kernel's send and recv queue size via ioctl
+	 * https://elixir.bootlin.com/linux/v6.1.128/source/include/uapi/linux/sockios.h#L25-L27
+	 */
+	if (socketfd != -1)
+	{
+		int			ioctl_err;
+
+		ioctl_err = ioctl(socketfd, SIOCOUTQ, sndbuf);
+		if (ioctl_err != 0) {
+			*sndbuf = -errno;
+		}
+		ioctl_err = ioctl(socketfd, FIONREAD, recvbuf);
+		if (ioctl_err != 0) {
+			*recvbuf = -errno;
+		}
+	}
+#endif
+}
+
 /*
  * A wrapper around PQgetCopyData that checks for interrupts while sleeping.
  */
@@ -690,26 +730,8 @@ call_PQgetCopyData(shardno_t shard_no, char **buffer)
 	PageServer *shard = &page_servers[shard_no];
 	PGconn	   *pageserver_conn = shard->conn;
 	instr_time	now,
-				start_ts,
 				since_start,
-				last_log_ts,
 				since_last_log;
-	bool		logged = false;
-
-	/*
-	 * As a debugging aid, if we don't get a response for a long time, print a
-	 * log message.
-	 *
-	 * 10 s is a very generous threshold, normally we expect a response in a
-	 * few milliseconds. We have metrics to track latencies in normal ranges,
-	 * but in the cases that take exceptionally long, it's useful to log the
-	 * exact timestamps.
-	 */
-#define LOG_INTERVAL_MS INT64CONST(10 * 1000)
-
-	INSTR_TIME_SET_CURRENT(now);
-	start_ts = last_log_ts = now;
-	INSTR_TIME_SET_ZERO(since_last_log);
 
 retry:
 	ret = PQgetCopyData(pageserver_conn, buffer, 1 /* async */ );
@@ -718,11 +740,36 @@
 	{
 		WaitEvent	occurred_event;
 		int			noccurred;
+		double		log_timeout,
+					disconnect_timeout;
 		long		timeout;
 
-		timeout = Max(0, LOG_INTERVAL_MS - INSTR_TIME_GET_MILLISEC(since_last_log));
+		/*
+		 * Calculate time elapsed since the start, and since the last progress
+		 * log message. On first call, remember the start time.
+		 */
+		INSTR_TIME_SET_CURRENT(now);
+		if (INSTR_TIME_IS_ZERO(shard->receive_start_time))
+		{
+			shard->receive_start_time = now;
+			INSTR_TIME_SET_ZERO(since_start);
+			shard->receive_last_log_time = now;
+			INSTR_TIME_SET_ZERO(since_last_log);
+			shard->receive_logged = false;
+		}
+		else
+		{
+			since_start = now;
+			INSTR_TIME_SUBTRACT(since_start, shard->receive_start_time);
+			since_last_log = now;
+			INSTR_TIME_SUBTRACT(since_last_log, shard->receive_last_log_time);
+		}
+
+		/* Sleep until the log or disconnect timeout is reached. */
+		log_timeout = Max(0, (double) pageserver_response_log_timeout - INSTR_TIME_GET_MILLISEC(since_last_log));
+		disconnect_timeout = Max(0, (double) pageserver_response_disconnect_timeout - INSTR_TIME_GET_MILLISEC(since_start));
+		timeout = (long) ceil(Min(log_timeout, disconnect_timeout));
 
-		/* Sleep until there's something to do */
 		noccurred = WaitEventSetWait(shard->wes_read, timeout, &occurred_event, 1,
 									 WAIT_EVENT_NEON_PS_READ);
 		ResetLatch(MyLatch);
@@ -740,49 +787,61 @@ retry:
 				pfree(msg);
 				return -1;
 			}
+			goto retry;
+		}
+
+		/* Timeout was reached, or we were interrupted for some other reason */
+		INSTR_TIME_SET_CURRENT(now);
+		since_last_log = now;
+		INSTR_TIME_SUBTRACT(since_last_log, shard->receive_last_log_time);
+		since_start = now;
+		INSTR_TIME_SUBTRACT(since_start, shard->receive_start_time);
+
+		/*
+		 * As a debugging aid, if we don't get a response to a pageserver request
+		 * for a long time, print a log message.
+		 *
+		 * The default neon.pageserver_response_log_timeout value, 10 s, is
+		 * very generous. Normally we expect a response in a few
+		 * milliseconds. We have metrics to track latencies in normal ranges,
+		 * but in the cases that take exceptionally long, it's useful to log
+		 * the exact timestamps.
+		 */
+		if (INSTR_TIME_GET_MILLISEC(since_last_log) >= pageserver_response_log_timeout)
+		{
+			int			sndbuf;
+			int			recvbuf;
+
+			get_socket_stats(PQsocket(pageserver_conn), &sndbuf, &recvbuf);
+
+			neon_shard_log(shard_no, LOG,
+						   "no response received from pageserver for %0.3f s, still waiting (sent " UINT64_FORMAT " requests, received " UINT64_FORMAT " responses) (socket sndbuf=%d recvbuf=%d)",
+						   INSTR_TIME_GET_DOUBLE(since_start),
+						   shard->nrequests_sent, shard->nresponses_received, sndbuf, recvbuf);
+			shard->receive_last_log_time = now;
+			shard->receive_logged = true;
 		}
 
 		/*
-		 * Print a message to the log if a long time has passed with no
-		 * response.
+		 * If an even longer time has passed without receiving a response from
+		 * the pageserver, disconnect. That triggers a reconnection attempt
+		 * in the caller.
+		 *
+		 * If this happens, the pageserver is likely dead and isn't coming
+		 * back, or there's some kind of a network glitch and the connection
+		 * is permanently gone. Without this, if the pageserver or the network
+		 * connection is dead, it could take a very long time (15 minutes or
+		 * more) until the TCP keepalive timeout notices that. Even if we
+		 * would in fact get a response if we just waited a little longer,
+		 * there's a good chance that we'll get the response sooner by
+		 * reconnecting.
 		 */
-		INSTR_TIME_SET_CURRENT(now);
-		since_last_log = now;
-		INSTR_TIME_SUBTRACT(since_last_log, last_log_ts);
-		if (INSTR_TIME_GET_MILLISEC(since_last_log) >= LOG_INTERVAL_MS)
+		if (INSTR_TIME_GET_MILLISEC(since_start) >= pageserver_response_disconnect_timeout)
 		{
-			int			sndbuf = -1;
-			int			recvbuf = -1;
-#ifdef __linux__
-			int			socketfd;
-#endif
-
-			since_start = now;
-			INSTR_TIME_SUBTRACT(since_start, start_ts);
-
-#ifdef __linux__
-			/*
-			 * get kernel's send and recv queue size via ioctl
-			 * https://elixir.bootlin.com/linux/v6.1.128/source/include/uapi/linux/sockios.h#L25-L27
-			 */
-			socketfd = PQsocket(pageserver_conn);
-			if (socketfd != -1) {
-				int ioctl_err;
-				ioctl_err = ioctl(socketfd, SIOCOUTQ, &sndbuf);
-				if (ioctl_err!= 0) {
-					sndbuf = -errno;
-				}
-				ioctl_err = ioctl(socketfd, FIONREAD, &recvbuf);
-				if (ioctl_err != 0) {
-					recvbuf = -errno;
-				}
-			}
-#endif
-			neon_shard_log(shard_no, LOG, "no response received from pageserver for %0.3f s, still waiting (sent " UINT64_FORMAT " requests, received " UINT64_FORMAT " responses) (socket sndbuf=%d recvbuf=%d)",
-						   INSTR_TIME_GET_DOUBLE(since_start),
-						   shard->nrequests_sent, shard->nresponses_received, sndbuf, recvbuf);
-			last_log_ts = now;
-			logged = true;
+			neon_shard_log(shard_no, LOG, "no response from pageserver for %0.3f s, disconnecting",
+						   INSTR_TIME_GET_DOUBLE(since_start));
+			pageserver_disconnect(shard_no);
+			return -1;
 		}
 
 		goto retry;
@@ -792,14 +851,18 @@
 	 * If we logged earlier that the response is taking a long time, log
 	 * another message when the response is finally received.
 	 */
-	if (logged)
+	if (shard->receive_logged)
 	{
 		INSTR_TIME_SET_CURRENT(now);
 		since_start = now;
-		INSTR_TIME_SUBTRACT(since_start, start_ts);
-		neon_shard_log(shard_no, LOG, "received response from pageserver after %0.3f s",
+		INSTR_TIME_SUBTRACT(since_start, shard->receive_start_time);
+		neon_shard_log(shard_no, LOG,
+					   "received response from pageserver after %0.3f s",
 					   INSTR_TIME_GET_DOUBLE(since_start));
 	}
+	INSTR_TIME_SET_ZERO(shard->receive_start_time);
+	INSTR_TIME_SET_ZERO(shard->receive_last_log_time);
+	shard->receive_logged = false;
 
 	return ret;
 }
@@ -973,9 +1036,17 @@ pageserver_receive(shardno_t shard_no)
 			pfree(msg);
 		}
 	}
+	else if (rc == -1 && shard->state == PS_Disconnected)
+	{
+		/* If the state is 'Disconnected', the disconnection message was already logged */
+		resp = NULL;
+	}
 	else if (rc == -1)
 	{
-		neon_shard_log(shard_no, LOG, "pageserver_receive disconnect: psql end of copy data: %s", pchomp(PQerrorMessage(pageserver_conn)));
+		char	   *msg = pchomp(PQerrorMessage(pageserver_conn));
+
+		neon_shard_log(shard_no, LOG, "pageserver_receive disconnect: psql end of copy data: %s", msg);
+		pfree(msg);
 		pageserver_disconnect(shard_no);
 		resp = NULL;
 	}
@@ -1261,6 +1332,26 @@ pg_init_libpagestore(void)
 							 0,	/* no flags required */
 							 NULL, NULL, NULL);
 
+	DefineCustomIntVariable("neon.pageserver_response_log_timeout",
+							"pageserver response log timeout",
+							"If the pageserver doesn't respond to a request within this timeout, "
+							"a message is printed to the log.",
+							&pageserver_response_log_timeout,
+							10000, 100, INT_MAX,
+							PGC_SUSET,
+							GUC_UNIT_MS,
+							NULL, NULL, NULL);
+
+	DefineCustomIntVariable("neon.pageserver_response_disconnect_timeout",
+							"pageserver response disconnect timeout",
+							"If the pageserver doesn't respond to a request within this timeout, "
+							"disconnect and reconnect.",
+							&pageserver_response_disconnect_timeout,
+							120000, 100, INT_MAX,
+							PGC_SUSET,
+							GUC_UNIT_MS,
+							NULL, NULL, NULL);
+
 	relsize_hash_init();
 
 	if (page_server != NULL)
diff --git a/test_runner/regress/test_bad_connection.py b/test_runner/regress/test_bad_connection.py
index c0c9537421..bfc5cb174e 100644
--- a/test_runner/regress/test_bad_connection.py
+++ b/test_runner/regress/test_bad_connection.py
@@ -7,6 +7,7 @@ import psycopg2.errors
 import pytest
 from fixtures.log_helper import log
 from fixtures.neon_fixtures import NeonEnvBuilder
+from fixtures.utils import USE_LFC
 
 
 @pytest.mark.timeout(600)
@@ -80,3 +81,193 @@ def test_compute_pageserver_connection_stress(neon_env_builder: NeonEnvBuilder):
     # do a graceful shutdown which would had caught the allowed_errors before
     # https://github.com/neondatabase/neon/pull/8632
     env.pageserver.stop()
+
+
+def test_compute_pageserver_hung_connections(neon_env_builder: NeonEnvBuilder):
+    """
+    Test timeouts in waiting for response to pageserver request
+    """
+    env = neon_env_builder.init_start()
+    env.pageserver.allowed_errors.append(".*slow GetPage.*")
+    pageserver_http = env.pageserver.http_client()
+    endpoint = env.endpoints.create_start(
+        "main",
+        tenant_id=env.initial_tenant,
+        config_lines=["autovacuum = off"],
+    )
+    pg_conn = endpoint.connect()
+    cur = pg_conn.cursor()
+
+    # Create table, and insert some rows. Make it big enough that it doesn't fit in
+    # shared_buffers, otherwise the SELECT after restart will just return answer
+    # from shared_buffers without hitting the page server, which defeats the point
+    # of this test.
+    cur.execute("CREATE TABLE foo (t text)")
+    cur.execute(
+        """
+        INSERT INTO foo
+        SELECT 'long string to consume some space' || g
+        FROM generate_series(1, 100000) g
+        """
+    )
+
+    # Verify that the table is larger than shared_buffers
+    cur.execute(
+        """
+        select setting::int * pg_size_bytes(unit) as shared_buffers, pg_relation_size('foo') as tbl_size
+        from pg_settings where name = 'shared_buffers'
+        """
+    )
+    row = cur.fetchone()
+    assert row is not None
+    log.debug(f"shared_buffers is {row[0]}, table size {row[1]}")
+    assert int(row[0]) < int(row[1])
+
+    # Print the backend PID so that it can be compared with the logs easily
+    cur.execute("SELECT pg_backend_pid()")
+    row = cur.fetchone()
+    assert row is not None
+    log.info(f"running test workload in backend PID {row[0]}")
+
+    def run_workload(duration: float):
+        end_time = time.time() + duration
+        times_executed = 0
+        while time.time() < end_time:
+            if random.random() < 0.5:
+                cur.execute("INSERT INTO foo VALUES ('stas'), ('heikki')")
+            else:
+                cur.execute("SELECT t FROM foo ORDER BY RANDOM() LIMIT 10")
+                cur.fetchall()
+            times_executed += 1
+        log.info(f"Workload executed {times_executed} times")
+        assert times_executed > 0
+
+    ## Test short connection hiccups
+    ##
+    ## This is to exercise the logging timeout.
+    log.info("running workload with log timeout")
+    cur.execute("SET neon.pageserver_response_log_timeout = '500ms'")
+    pageserver_http.configure_failpoints(("before-pagestream-msg-flush", "10%3*return(3000)"))
+    run_workload(20)
+
+    # check that the message was logged
+    assert endpoint.log_contains("no response received from pageserver for .* s, still waiting")
+    assert endpoint.log_contains("received response from pageserver after .* s")
+
+    ## Test connections that are hung for longer
+    ##
+    ## This exercises the disconnect timeout. We'll disconnect and
+    ## reconnect after 500 ms.
+    log.info("running workload with disconnect timeout")
+    cur.execute("SET neon.pageserver_response_log_timeout = '250ms'")
+    cur.execute("SET neon.pageserver_response_disconnect_timeout = '500ms'")
+    pageserver_http.configure_failpoints(("before-pagestream-msg-flush", "10%3*return(3000)"))
+    run_workload(15)
+
+    assert endpoint.log_contains("no response from pageserver for .* s, disconnecting")
+
+    # do a graceful shutdown which would have caught the allowed_errors before
+    # https://github.com/neondatabase/neon/pull/8632
+    env.pageserver.stop()
+
+
+def test_compute_pageserver_statement_timeout(neon_env_builder: NeonEnvBuilder):
+    """
+    Test statement_timeout while waiting for response to pageserver request
+    """
+    env = neon_env_builder.init_start()
+    env.pageserver.allowed_errors.append(".*slow GetPage.*")
+    pageserver_http = env.pageserver.http_client()
+
+    # Make sure the shared_buffers and LFC are tiny, to ensure the queries
+    # hit the storage. Disable autovacuum to make the test more deterministic.
+    config_lines = [
+        "shared_buffers='512kB'",
+        "autovacuum = off",
+    ]
+    if USE_LFC:
+        config_lines += ["neon.max_file_cache_size = 1MB", "neon.file_cache_size_limit = 1MB"]
+    endpoint = env.endpoints.create_start(
+        "main",
+        tenant_id=env.initial_tenant,
+        config_lines=config_lines,
+    )
+    pg_conn = endpoint.connect()
+    cur = pg_conn.cursor()
+
+    # Disable parallel query. Parallel workers open their own pageserver connections,
+    # which messes up the test logic.
+    cur.execute("SET max_parallel_workers_per_gather=0")
+    cur.execute("SET effective_io_concurrency=0")
+
+    # Create table, and insert some rows. Make it big enough that it doesn't fit in
+    # shared_buffers, otherwise the SELECT after restart will just return answer
+    # from shared_buffers without hitting the page server, which defeats the point
+    # of this test.
+    cur.execute("CREATE TABLE foo (t text)")
+    cur.execute(
+        """
+        INSERT INTO foo
+        SELECT 'long string to consume some space' || g
+        FROM generate_series(1, 100000) g
+        """
+    )
+
+    # Verify that the table is larger than shared_buffers
+    cur.execute(
+        """
+        select setting::int * pg_size_bytes(unit) as shared_buffers, pg_relation_size('foo') as tbl_size
+        from pg_settings where name = 'shared_buffers'
+        """
+    )
+    row = cur.fetchone()
+    assert row is not None
+    log.debug(f"shared_buffers is {row[0]}, table size {row[1]}")
+    assert int(row[0]) < int(row[1])
+
+    ## Run a query until the compute->pageserver connection hits the failpoint and
+    ## gets stuck. This tests that the statement_timeout is obeyed while waiting on a
+    ## GetPage request.
+    log.info("running workload with statement_timeout")
+    cur.execute("SET neon.pageserver_response_log_timeout = '2000ms'")
+    cur.execute("SET neon.pageserver_response_disconnect_timeout = '30000ms'")
+    cur.execute("SET statement_timeout='10s'")
+    pageserver_http.configure_failpoints(("before-pagestream-msg-flush", "10%return(60000)"))
+
+    start_time = time.time()
+    with pytest.raises(psycopg2.errors.QueryCanceled):
+        cur.execute("SELECT count(*) FROM foo")
+        cur.fetchall()
+    log.info("Statement timeout reached")
+    end_time = time.time()
+    # Verify that the statement_timeout canceled the query before
+    # neon.pageserver_response_disconnect_timeout expired
+    assert end_time - start_time < 40
+    times_canceled = 1
+
+    # Should not have disconnected yet
+    assert not endpoint.log_contains("no response from pageserver for .* s, disconnecting")
+
+    # Clear the failpoint. This doesn't affect the connection that already hit it. It
+    # will keep waiting. But subsequent connections will work normally.
+    pageserver_http.configure_failpoints(("before-pagestream-msg-flush", "off"))
+
+    # If we keep retrying, we should eventually succeed. (This tests that the
+    # neon.pageserver_response_disconnect_timeout is not reset on query
+    # cancellation.)
+    while times_canceled < 10:
+        try:
+            cur.execute("SELECT count(*) FROM foo")
+            cur.fetchall()
+            log.info("Statement succeeded")
+            break
+        except psycopg2.errors.QueryCanceled:
+            log.info("Statement timed out, retrying")
+            times_canceled += 1
+    assert times_canceled > 1 and times_canceled < 10
+
+    assert endpoint.log_contains("no response from pageserver for .* s, disconnecting")
+
+    # do a graceful shutdown which would have caught the allowed_errors before
+    # https://github.com/neondatabase/neon/pull/8632
+    env.pageserver.stop()
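
The timeout arithmetic in the patched call_PQgetCopyData() can be summarized with a small standalone sketch (illustrative only, not code from this patch; the helper names next_wait_timeout and actions_after_wakeup are hypothetical): on every wait iteration, the WaitEventSetWait() timeout is the time remaining until the nearer of the log deadline (neon.pageserver_response_log_timeout since the last log message) and the disconnect deadline (neon.pageserver_response_disconnect_timeout since the wait started).

import math

# Hypothetical model of the wait loop added to call_PQgetCopyData(); all times in ms.
LOG_TIMEOUT_MS = 10_000          # neon.pageserver_response_log_timeout default
DISCONNECT_TIMEOUT_MS = 120_000  # neon.pageserver_response_disconnect_timeout default


def next_wait_timeout(now, start, last_log):
    """Timeout for the next WaitEventSetWait(): time left until the nearer deadline."""
    log_remaining = max(0, LOG_TIMEOUT_MS - (now - last_log))
    disconnect_remaining = max(0, DISCONNECT_TIMEOUT_MS - (now - start))
    # ceil() mirrors the patch, so a fractional remainder never becomes a zero-length wait.
    return math.ceil(min(log_remaining, disconnect_remaining))


def actions_after_wakeup(now, start, last_log):
    """What the loop does when the wait returns with no data, in the patch's order."""
    actions = []
    if now - last_log >= LOG_TIMEOUT_MS:
        actions.append("log 'still waiting' and reset last_log")
    if now - start >= DISCONNECT_TIMEOUT_MS:
        actions.append("disconnect (caller reconnects and retries)")
    return actions or ["keep waiting"]


if __name__ == "__main__":
    assert next_wait_timeout(0, 0, 0) == 10_000              # first iteration: sleep until the log deadline
    assert next_wait_timeout(9_500, 0, 0) == 500             # 500 ms left until the first log line
    assert next_wait_timeout(115_000, 0, 112_000) == 5_000   # disconnect deadline is nearer than the next log
    assert actions_after_wakeup(10_000, 0, 0) == ["log 'still waiting' and reset last_log"]
    assert actions_after_wakeup(120_000, 0, 112_000) == ["disconnect (caller reconnects and retries)"]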