PS/Prefetch: Use a timeout for reading data from TCP (#10834)

This reduces pressure on OS TCP buffers, reducing flush times in other
systems like PageServer.

## Problem

## Summary of changes
This commit is contained in:
Matthias van de Meent
2025-02-27 15:00:18 +01:00
committed by GitHub
parent ad37199745
commit a283edaccf
5 changed files with 211 additions and 26 deletions

View File

@@ -1099,6 +1099,10 @@ pageserver_try_receive(shardno_t shard_no)
{
neon_shard_log(shard_no, LOG, "pageserver_receive: disconnect due to failure while parsing response");
pageserver_disconnect(shard_no);
/*
* Malformed responses from PageServer are a reason to raise
* errors and cancel transactions.
*/
PG_RE_THROW();
}
PG_END_TRY();
@@ -1122,7 +1126,8 @@ pageserver_try_receive(shardno_t shard_no)
char *msg = pchomp(PQerrorMessage(pageserver_conn));
pageserver_disconnect(shard_no);
neon_shard_log(shard_no, ERROR, "pageserver_receive disconnect: could not read COPY data: %s", msg);
neon_shard_log(shard_no, LOG, "pageserver_receive disconnect: could not read COPY data: %s", msg);
resp = NULL;
}
else
{
@@ -1321,6 +1326,16 @@ pg_init_libpagestore(void)
PGC_USERSET,
0, /* no flags required */
NULL, (GucIntAssignHook) &readahead_buffer_resize, NULL);
DefineCustomIntVariable("neon.readahead_getpage_pull_timeout",
"readahead response pull timeout",
"Time between active tries to pull data from the "
"PageStream connection when we have pages which "
"were read ahead but not yet received.",
&readahead_getpage_pull_timeout_ms,
0, 0, 5 * 60 * 1000,
PGC_USERSET,
GUC_UNIT_MS,
NULL, NULL, NULL);
DefineCustomIntVariable("neon.protocol_version",
"Version of compute<->page server protocol",
NULL,
@@ -1334,7 +1349,7 @@ pg_init_libpagestore(void)
DefineCustomIntVariable("neon.pageserver_response_log_timeout",
"pageserver response log timeout",
"If the pageserver doesn't respond to a request within this timeout,"
"If the pageserver doesn't respond to a request within this timeout, "
"a message is printed to the log.",
&pageserver_response_log_timeout,
10000, 100, INT_MAX,
@@ -1344,7 +1359,7 @@ pg_init_libpagestore(void)
DefineCustomIntVariable("neon.pageserver_response_disconnect_timeout",
"pageserver response diconnect timeout",
"If the pageserver doesn't respond to a request within this timeout,"
"If the pageserver doesn't respond to a request within this timeout, "
"disconnect and reconnect.",
&pageserver_response_disconnect_timeout,
120000, 100, INT_MAX,

View File

@@ -437,6 +437,7 @@ _PG_init(void)
pg_init_libpagestore();
pg_init_walproposer();
pagestore_smgr_init();
Custom_XLogReaderRoutines = NeonOnDemandXLogReaderRoutines;
InitUnstableExtensionsSupport();

View File

@@ -22,6 +22,7 @@ extern char *neon_tenant;
extern char *wal_acceptors_list;
extern int wal_acceptor_reconnect_timeout;
extern int wal_acceptor_connection_timeout;
extern int readahead_getpage_pull_timeout_ms;
#if PG_MAJORVERSION_NUM >= 17
extern uint32 WAIT_EVENT_NEON_LFC_MAINTENANCE;
@@ -49,6 +50,7 @@ extern uint32 WAIT_EVENT_NEON_WAL_DL;
extern void pg_init_libpagestore(void);
extern void pg_init_walproposer(void);
extern void pagestore_smgr_init(void);
extern uint64 BackpressureThrottlingTime(void);
extern void SetNeonCurrentClusterSize(uint64 size);

View File

@@ -209,7 +209,11 @@ typedef struct
NeonResponse *(*receive) (shardno_t shard_no);
/*
* Try get the next response from the TCP buffers, if any.
* Returns NULL when the data is not yet available.
* Returns NULL when the data is not yet available.
*
* This will raise errors only for malformed responses (we can't put them
* back into connection). All other error conditions are soft errors and
* return NULL as "no response available".
*/
NeonResponse *(*try_receive) (shardno_t shard_no);
/*

View File

@@ -65,10 +65,12 @@
#include "storage/fsm_internals.h"
#include "storage/md.h"
#include "storage/smgr.h"
#include "utils/timeout.h"
#include "bitmap.h"
#include "neon.h"
#include "neon_perf_counters.h"
#include "pagestore_client.h"
#include "bitmap.h"
#if PG_VERSION_NUM >= 150000
#include "access/xlogrecovery.h"
@@ -123,6 +125,45 @@ static BlockNumber neon_nblocks(SMgrRelation reln, ForkNumber forknum);
static uint32 local_request_counter;
#define GENERATE_REQUEST_ID() (((NeonRequestId)MyProcPid << 32) | ++local_request_counter)
/*
* Various settings related to prompt (fast) handling of PageStream responses
* at any CHECK_FOR_INTERRUPTS point.
*/
int readahead_getpage_pull_timeout_ms = 0;
static int PS_TIMEOUT_ID = 0;
static bool timeout_set = false;
static bool timeout_signaled = false;
/*
* We have a CHECK_FOR_INTERRUPTS in page_server->receive(), and we don't want
* that to handle any getpage responses if we're already working on the
* backlog of those, as we'd hit issues with determining which prefetch slot
* we just got a response for.
*
* To protect against that, we have this variable that's set whenever we start
* receiving data for prefetch slots, so that we don't get confused.
*
* Note that in certain error cases during readpage we may leak r_r_g=true,
* which results in a failure to pick up further responses until we first
* actively try to receive new getpage responses.
*/
static bool readpage_reentrant_guard = false;
static void reconfigure_timeout_if_needed(void);
static void pagestore_timeout_handler(void);
#define START_PREFETCH_RECEIVE_WORK() \
do { \
readpage_reentrant_guard = true; \
} while (false)
#define END_PREFETCH_RECEIVE_WORK() \
do { \
readpage_reentrant_guard = false; \
if (unlikely(timeout_signaled && !InterruptPending)) \
InterruptPending = true; \
} while (false)
/*
* Prefetch implementation:
*
@@ -221,7 +262,6 @@ typedef struct PrfHashEntry
#define SH_DEFINE
#define SH_DECLARE
#include "lib/simplehash.h"
#include "neon.h"
/*
* PrefetchState maintains the state of (prefetch) getPage@LSN requests.
@@ -407,17 +447,26 @@ compact_prefetch_buffers(void)
}
/*
* If there might be responses still in the TCP buffer, then
* we should try to use those, so as to reduce any TCP backpressure
* on the OS/PS side.
* If there might be responses still in the TCP buffer, then we should try to
* use those, to reduce any TCP backpressure on the OS/PS side.
*
* This procedure handles that.
*
* Note that this is only valid as long as the only pipelined
* operations in the TCP buffer are getPage@Lsn requests.
* Note that this works because we don't pipeline non-getPage requests.
*
* NOTE: This procedure is not allowed to throw errors that should be handled
* by SMGR-related code, as this can be called from every CHECK_FOR_INTERRUPTS
* point inside and outside PostgreSQL.
*
* This still does throw errors when it receives malformed responses from PS.
*
* When we're not called from CHECK_FOR_INTERRUPTS (indicated by
* IsHandlingInterrupts) we also report we've ended prefetch receive work,
* just in case state tracking was lost due to an error in the sync getPage
* response code.
*/
static void
prefetch_pump_state(void)
prefetch_pump_state(bool IsHandlingInterrupts)
{
while (MyPState->ring_receive != MyPState->ring_flush)
{
@@ -466,6 +515,12 @@ prefetch_pump_state(void)
}
}
}
/* We never pump the prefetch state while handling other pages */
if (!IsHandlingInterrupts)
END_PREFETCH_RECEIVE_WORK();
reconfigure_timeout_if_needed();
}
void
@@ -581,8 +636,8 @@ readahead_buffer_resize(int newsize, void *extra)
/*
* Make sure that there are no responses still in the buffer.
*
* NOTE: this function may indirectly update MyPState->pfs_hash; which
* invalidates any active pointers into the hash table.
* This function may indirectly update MyPState->pfs_hash; which invalidates
* any active pointers into the hash table.
*/
static void
consume_prefetch_responses(void)
@@ -639,6 +694,7 @@ static bool
prefetch_wait_for(uint64 ring_index)
{
PrefetchRequest *entry;
bool result = true;
if (MyPState->ring_flush <= ring_index &&
MyPState->ring_unused > MyPState->ring_flush)
@@ -652,13 +708,21 @@ prefetch_wait_for(uint64 ring_index)
while (MyPState->ring_receive <= ring_index)
{
START_PREFETCH_RECEIVE_WORK();
entry = GetPrfSlot(MyPState->ring_receive);
Assert(entry->status == PRFS_REQUESTED);
if (!prefetch_read(entry))
return false;
{
result = false;
break;
}
END_PREFETCH_RECEIVE_WORK();
CHECK_FOR_INTERRUPTS();
}
return true;
return result;
}
/*
@@ -1316,6 +1380,12 @@ page_server_request(void const *req)
page_server->disconnect(shard_no);
MyNeonCounters->pageserver_open_requests = 0;
/*
* We know for sure we're not working on any prefetch pages after
* this.
*/
END_PREFETCH_RECEIVE_WORK();
PG_RE_THROW();
}
PG_END_TRY();
@@ -2943,7 +3013,7 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
MyPState->ring_last <= ring_index);
}
prefetch_pump_state();
prefetch_pump_state(false);
return false;
}
@@ -2986,7 +3056,7 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum)
Assert(ring_index < MyPState->ring_unused &&
MyPState->ring_last <= ring_index);
prefetch_pump_state();
prefetch_pump_state(false);
return false;
}
@@ -3030,7 +3100,7 @@ neon_writeback(SMgrRelation reln, ForkNumber forknum,
*/
neon_log(SmgrTrace, "writeback noop");
prefetch_pump_state();
prefetch_pump_state(false);
#ifdef DEBUG_COMPARE_LOCAL
if (IS_LOCAL_REL(reln))
@@ -3278,7 +3348,7 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
}
/* Try to read PS results if they are available */
prefetch_pump_state();
prefetch_pump_state(false);
neon_get_request_lsns(InfoFromSMgrRel(reln), forkNum, blkno, &request_lsns, 1);
@@ -3300,7 +3370,7 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
/*
* Try to receive prefetch results once again just to make sure we don't leave the smgr code while the OS might still have buffered bytes.
*/
prefetch_pump_state();
prefetch_pump_state(false);
#ifdef DEBUG_COMPARE_LOCAL
if (forkNum == MAIN_FORKNUM && IS_LOCAL_REL(reln))
@@ -3411,7 +3481,7 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
nblocks, PG_IOV_MAX);
/* Try to read PS results if they are available */
prefetch_pump_state();
prefetch_pump_state(false);
neon_get_request_lsns(InfoFromSMgrRel(reln), forknum, blocknum,
request_lsns, nblocks);
@@ -3456,7 +3526,7 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
/*
* Try to receive prefetch results once again just to make sure we don't leave the smgr code while the OS might still have buffered bytes.
*/
prefetch_pump_state();
prefetch_pump_state(false);
#ifdef DEBUG_COMPARE_LOCAL
if (forkNum == MAIN_FORKNUM && IS_LOCAL_REL(reln))
@@ -3626,7 +3696,7 @@ neon_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const vo
lfc_write(InfoFromSMgrRel(reln), forknum, blocknum, buffer);
prefetch_pump_state();
prefetch_pump_state(false);
#ifdef DEBUG_COMPARE_LOCAL
if (IS_LOCAL_REL(reln))
@@ -3681,7 +3751,7 @@ neon_writev(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
lfc_writev(InfoFromSMgrRel(reln), forknum, blkno, buffers, nblocks);
prefetch_pump_state();
prefetch_pump_state(false);
#ifdef DEBUG_COMPARE_LOCAL
if (IS_LOCAL_REL(reln))
@@ -3972,7 +4042,7 @@ neon_immedsync(SMgrRelation reln, ForkNumber forknum)
neon_log(SmgrTrace, "[NEON_SMGR] immedsync noop");
prefetch_pump_state();
prefetch_pump_state(false);
#ifdef DEBUG_COMPARE_LOCAL
if (IS_LOCAL_REL(reln))
@@ -4273,6 +4343,7 @@ neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buf
}
pfree(resp);
reconfigure_timeout_if_needed();
return n_blocks;
}
@@ -4308,6 +4379,7 @@ AtEOXact_neon(XactEvent event, void *arg)
}
break;
}
reconfigure_timeout_if_needed();
}
static const struct f_smgr neon_smgr =
@@ -4564,3 +4636,94 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
}
return no_redo_needed;
}
static void
reconfigure_timeout_if_needed(void)
{
bool needs_set = MyPState->ring_receive != MyPState->ring_unused &&
readahead_getpage_pull_timeout_ms > 0;
if (needs_set != timeout_set)
{
/* The background writer doens't (shouldn't) read any pages */
Assert(!AmBackgroundWriterProcess());
/* The checkpointer doens't (shouldn't) read any pages */
Assert(!AmCheckpointerProcess());
if (unlikely(PS_TIMEOUT_ID == 0))
{
PS_TIMEOUT_ID = RegisterTimeout(USER_TIMEOUT, pagestore_timeout_handler);
}
if (needs_set)
{
#if PG_MAJORVERSION_NUM <= 14
enable_timeout_after(PS_TIMEOUT_ID, readahead_getpage_pull_timeout_ms);
#else
enable_timeout_every(
PS_TIMEOUT_ID,
TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
readahead_getpage_pull_timeout_ms),
readahead_getpage_pull_timeout_ms
);
#endif
timeout_set = true;
}
else
{
Assert(timeout_set);
disable_timeout(PS_TIMEOUT_ID, false);
timeout_set = false;
}
}
}
static void
pagestore_timeout_handler(void)
{
#if PG_MAJORVERSION_NUM <= 14
/*
* PG14: Setting a repeating timeout is not possible, so we signal here
* that the timeout has already been reset, and by telling the system
* that system will re-schedule it later if we need to.
*/
timeout_set = false;
#endif
timeout_signaled = true;
InterruptPending = true;
}
static process_interrupts_callback_t prev_interrupt_cb;
/*
* Process new data received in our active PageStream sockets.
*
* This relies on the invariant that all pipelined yet-to-be-received requests
* are getPage requests managed by MyPState. This is currently true, any
* modification will probably require some stuff to make it work again.
*/
static bool
pagestore_smgr_processinterrupts(void)
{
if (timeout_signaled)
{
if (!readpage_reentrant_guard && readahead_getpage_pull_timeout_ms > 0)
prefetch_pump_state(true);
timeout_signaled = false;
reconfigure_timeout_if_needed();
}
if (!prev_interrupt_cb)
return false;
return prev_interrupt_cb();
}
void
pagestore_smgr_init(void)
{
prev_interrupt_cb = ProcessInterruptsCallback;
ProcessInterruptsCallback = pagestore_smgr_processinterrupts;
}