From a283edaccf7413c0cb0768c775f00158eacb0af3 Mon Sep 17 00:00:00 2001 From: Matthias van de Meent Date: Thu, 27 Feb 2025 15:00:18 +0100 Subject: [PATCH] PS/Prefetch: Use a timeout for reading data from TCP (#10834) This reduces pressure on OS TCP buffers, reducing flush times in other systems like PageServer. ## Problem ## Summary of changes --- pgxn/neon/libpagestore.c | 21 +++- pgxn/neon/neon.c | 1 + pgxn/neon/neon.h | 2 + pgxn/neon/pagestore_client.h | 6 +- pgxn/neon/pagestore_smgr.c | 207 +++++++++++++++++++++++++++++++---- 5 files changed, 211 insertions(+), 26 deletions(-) diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index f71f11ff93..49f12bbb9e 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -1099,6 +1099,10 @@ pageserver_try_receive(shardno_t shard_no) { neon_shard_log(shard_no, LOG, "pageserver_receive: disconnect due to failure while parsing response"); pageserver_disconnect(shard_no); + /* + * Malformed responses from PageServer are a reason to raise + * errors and cancel transactions. + */ PG_RE_THROW(); } PG_END_TRY(); @@ -1122,7 +1126,8 @@ pageserver_try_receive(shardno_t shard_no) char *msg = pchomp(PQerrorMessage(pageserver_conn)); pageserver_disconnect(shard_no); - neon_shard_log(shard_no, ERROR, "pageserver_receive disconnect: could not read COPY data: %s", msg); + neon_shard_log(shard_no, LOG, "pageserver_receive disconnect: could not read COPY data: %s", msg); + resp = NULL; } else { @@ -1321,6 +1326,16 @@ pg_init_libpagestore(void) PGC_USERSET, 0, /* no flags required */ NULL, (GucIntAssignHook) &readahead_buffer_resize, NULL); + DefineCustomIntVariable("neon.readahead_getpage_pull_timeout", + "readahead response pull timeout", + "Time between active tries to pull data from the " + "PageStream connection when we have pages which " + "were read ahead but not yet received.", + &readahead_getpage_pull_timeout_ms, + 0, 0, 5 * 60 * 1000, + PGC_USERSET, + GUC_UNIT_MS, + NULL, NULL, NULL); DefineCustomIntVariable("neon.protocol_version", "Version of compute<->page server protocol", NULL, @@ -1334,7 +1349,7 @@ pg_init_libpagestore(void) DefineCustomIntVariable("neon.pageserver_response_log_timeout", "pageserver response log timeout", - "If the pageserver doesn't respond to a request within this timeout," + "If the pageserver doesn't respond to a request within this timeout, " "a message is printed to the log.", &pageserver_response_log_timeout, 10000, 100, INT_MAX, @@ -1344,7 +1359,7 @@ pg_init_libpagestore(void) DefineCustomIntVariable("neon.pageserver_response_disconnect_timeout", "pageserver response diconnect timeout", - "If the pageserver doesn't respond to a request within this timeout," + "If the pageserver doesn't respond to a request within this timeout, " "disconnect and reconnect.", &pageserver_response_disconnect_timeout, 120000, 100, INT_MAX, diff --git a/pgxn/neon/neon.c b/pgxn/neon/neon.c index 768d7ae9e8..4b448ba5f6 100644 --- a/pgxn/neon/neon.c +++ b/pgxn/neon/neon.c @@ -437,6 +437,7 @@ _PG_init(void) pg_init_libpagestore(); pg_init_walproposer(); + pagestore_smgr_init(); Custom_XLogReaderRoutines = NeonOnDemandXLogReaderRoutines; InitUnstableExtensionsSupport(); diff --git a/pgxn/neon/neon.h b/pgxn/neon/neon.h index 912e09c3d3..7686ce076b 100644 --- a/pgxn/neon/neon.h +++ b/pgxn/neon/neon.h @@ -22,6 +22,7 @@ extern char *neon_tenant; extern char *wal_acceptors_list; extern int wal_acceptor_reconnect_timeout; extern int wal_acceptor_connection_timeout; +extern int readahead_getpage_pull_timeout_ms; #if PG_MAJORVERSION_NUM >= 17 extern uint32 WAIT_EVENT_NEON_LFC_MAINTENANCE; @@ -49,6 +50,7 @@ extern uint32 WAIT_EVENT_NEON_WAL_DL; extern void pg_init_libpagestore(void); extern void pg_init_walproposer(void); +extern void pagestore_smgr_init(void); extern uint64 BackpressureThrottlingTime(void); extern void SetNeonCurrentClusterSize(uint64 size); diff --git a/pgxn/neon/pagestore_client.h b/pgxn/neon/pagestore_client.h index 9faab1e4f0..475697f9c0 100644 --- a/pgxn/neon/pagestore_client.h +++ b/pgxn/neon/pagestore_client.h @@ -209,7 +209,11 @@ typedef struct NeonResponse *(*receive) (shardno_t shard_no); /* * Try get the next response from the TCP buffers, if any. - * Returns NULL when the data is not yet available. + * Returns NULL when the data is not yet available. + * + * This will raise errors only for malformed responses (we can't put them + * back into connection). All other error conditions are soft errors and + * return NULL as "no response available". */ NeonResponse *(*try_receive) (shardno_t shard_no); /* diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index 091ad555e0..fe463fd4a6 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -65,10 +65,12 @@ #include "storage/fsm_internals.h" #include "storage/md.h" #include "storage/smgr.h" +#include "utils/timeout.h" +#include "bitmap.h" +#include "neon.h" #include "neon_perf_counters.h" #include "pagestore_client.h" -#include "bitmap.h" #if PG_VERSION_NUM >= 150000 #include "access/xlogrecovery.h" @@ -123,6 +125,45 @@ static BlockNumber neon_nblocks(SMgrRelation reln, ForkNumber forknum); static uint32 local_request_counter; #define GENERATE_REQUEST_ID() (((NeonRequestId)MyProcPid << 32) | ++local_request_counter) +/* + * Various settings related to prompt (fast) handling of PageStream responses + * at any CHECK_FOR_INTERRUPTS point. + */ +int readahead_getpage_pull_timeout_ms = 0; +static int PS_TIMEOUT_ID = 0; +static bool timeout_set = false; +static bool timeout_signaled = false; + +/* + * We have a CHECK_FOR_INTERRUPTS in page_server->receive(), and we don't want + * that to handle any getpage responses if we're already working on the + * backlog of those, as we'd hit issues with determining which prefetch slot + * we just got a response for. + * + * To protect against that, we have this variable that's set whenever we start + * receiving data for prefetch slots, so that we don't get confused. + * + * Note that in certain error cases during readpage we may leak r_r_g=true, + * which results in a failure to pick up further responses until we first + * actively try to receive new getpage responses. + */ +static bool readpage_reentrant_guard = false; + +static void reconfigure_timeout_if_needed(void); +static void pagestore_timeout_handler(void); + +#define START_PREFETCH_RECEIVE_WORK() \ + do { \ + readpage_reentrant_guard = true; \ + } while (false) + +#define END_PREFETCH_RECEIVE_WORK() \ + do { \ + readpage_reentrant_guard = false; \ + if (unlikely(timeout_signaled && !InterruptPending)) \ + InterruptPending = true; \ + } while (false) + /* * Prefetch implementation: * @@ -221,7 +262,6 @@ typedef struct PrfHashEntry #define SH_DEFINE #define SH_DECLARE #include "lib/simplehash.h" -#include "neon.h" /* * PrefetchState maintains the state of (prefetch) getPage@LSN requests. @@ -407,17 +447,26 @@ compact_prefetch_buffers(void) } /* - * If there might be responses still in the TCP buffer, then - * we should try to use those, so as to reduce any TCP backpressure - * on the OS/PS side. + * If there might be responses still in the TCP buffer, then we should try to + * use those, to reduce any TCP backpressure on the OS/PS side. * * This procedure handles that. * - * Note that this is only valid as long as the only pipelined - * operations in the TCP buffer are getPage@Lsn requests. + * Note that this works because we don't pipeline non-getPage requests. + * + * NOTE: This procedure is not allowed to throw errors that should be handled + * by SMGR-related code, as this can be called from every CHECK_FOR_INTERRUPTS + * point inside and outside PostgreSQL. + * + * This still does throw errors when it receives malformed responses from PS. + * + * When we're not called from CHECK_FOR_INTERRUPTS (indicated by + * IsHandlingInterrupts) we also report we've ended prefetch receive work, + * just in case state tracking was lost due to an error in the sync getPage + * response code. */ static void -prefetch_pump_state(void) +prefetch_pump_state(bool IsHandlingInterrupts) { while (MyPState->ring_receive != MyPState->ring_flush) { @@ -466,6 +515,12 @@ prefetch_pump_state(void) } } } + + /* We never pump the prefetch state while handling other pages */ + if (!IsHandlingInterrupts) + END_PREFETCH_RECEIVE_WORK(); + + reconfigure_timeout_if_needed(); } void @@ -581,8 +636,8 @@ readahead_buffer_resize(int newsize, void *extra) /* * Make sure that there are no responses still in the buffer. * - * NOTE: this function may indirectly update MyPState->pfs_hash; which - * invalidates any active pointers into the hash table. + * This function may indirectly update MyPState->pfs_hash; which invalidates + * any active pointers into the hash table. */ static void consume_prefetch_responses(void) @@ -639,6 +694,7 @@ static bool prefetch_wait_for(uint64 ring_index) { PrefetchRequest *entry; + bool result = true; if (MyPState->ring_flush <= ring_index && MyPState->ring_unused > MyPState->ring_flush) @@ -652,13 +708,21 @@ prefetch_wait_for(uint64 ring_index) while (MyPState->ring_receive <= ring_index) { + START_PREFETCH_RECEIVE_WORK(); entry = GetPrfSlot(MyPState->ring_receive); Assert(entry->status == PRFS_REQUESTED); if (!prefetch_read(entry)) - return false; + { + result = false; + break; + } + + END_PREFETCH_RECEIVE_WORK(); + CHECK_FOR_INTERRUPTS(); } - return true; + + return result; } /* @@ -1316,6 +1380,12 @@ page_server_request(void const *req) page_server->disconnect(shard_no); MyNeonCounters->pageserver_open_requests = 0; + /* + * We know for sure we're not working on any prefetch pages after + * this. + */ + END_PREFETCH_RECEIVE_WORK(); + PG_RE_THROW(); } PG_END_TRY(); @@ -2943,7 +3013,7 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, MyPState->ring_last <= ring_index); } - prefetch_pump_state(); + prefetch_pump_state(false); return false; } @@ -2986,7 +3056,7 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum) Assert(ring_index < MyPState->ring_unused && MyPState->ring_last <= ring_index); - prefetch_pump_state(); + prefetch_pump_state(false); return false; } @@ -3030,7 +3100,7 @@ neon_writeback(SMgrRelation reln, ForkNumber forknum, */ neon_log(SmgrTrace, "writeback noop"); - prefetch_pump_state(); + prefetch_pump_state(false); #ifdef DEBUG_COMPARE_LOCAL if (IS_LOCAL_REL(reln)) @@ -3278,7 +3348,7 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer } /* Try to read PS results if they are available */ - prefetch_pump_state(); + prefetch_pump_state(false); neon_get_request_lsns(InfoFromSMgrRel(reln), forkNum, blkno, &request_lsns, 1); @@ -3300,7 +3370,7 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer /* * Try to receive prefetch results once again just to make sure we don't leave the smgr code while the OS might still have buffered bytes. */ - prefetch_pump_state(); + prefetch_pump_state(false); #ifdef DEBUG_COMPARE_LOCAL if (forkNum == MAIN_FORKNUM && IS_LOCAL_REL(reln)) @@ -3411,7 +3481,7 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, nblocks, PG_IOV_MAX); /* Try to read PS results if they are available */ - prefetch_pump_state(); + prefetch_pump_state(false); neon_get_request_lsns(InfoFromSMgrRel(reln), forknum, blocknum, request_lsns, nblocks); @@ -3456,7 +3526,7 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, /* * Try to receive prefetch results once again just to make sure we don't leave the smgr code while the OS might still have buffered bytes. */ - prefetch_pump_state(); + prefetch_pump_state(false); #ifdef DEBUG_COMPARE_LOCAL if (forkNum == MAIN_FORKNUM && IS_LOCAL_REL(reln)) @@ -3626,7 +3696,7 @@ neon_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const vo lfc_write(InfoFromSMgrRel(reln), forknum, blocknum, buffer); - prefetch_pump_state(); + prefetch_pump_state(false); #ifdef DEBUG_COMPARE_LOCAL if (IS_LOCAL_REL(reln)) @@ -3681,7 +3751,7 @@ neon_writev(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno, lfc_writev(InfoFromSMgrRel(reln), forknum, blkno, buffers, nblocks); - prefetch_pump_state(); + prefetch_pump_state(false); #ifdef DEBUG_COMPARE_LOCAL if (IS_LOCAL_REL(reln)) @@ -3972,7 +4042,7 @@ neon_immedsync(SMgrRelation reln, ForkNumber forknum) neon_log(SmgrTrace, "[NEON_SMGR] immedsync noop"); - prefetch_pump_state(); + prefetch_pump_state(false); #ifdef DEBUG_COMPARE_LOCAL if (IS_LOCAL_REL(reln)) @@ -4273,6 +4343,7 @@ neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buf } pfree(resp); + reconfigure_timeout_if_needed(); return n_blocks; } @@ -4308,6 +4379,7 @@ AtEOXact_neon(XactEvent event, void *arg) } break; } + reconfigure_timeout_if_needed(); } static const struct f_smgr neon_smgr = @@ -4564,3 +4636,94 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id) } return no_redo_needed; } + +static void +reconfigure_timeout_if_needed(void) +{ + bool needs_set = MyPState->ring_receive != MyPState->ring_unused && + readahead_getpage_pull_timeout_ms > 0; + + if (needs_set != timeout_set) + { + /* The background writer doens't (shouldn't) read any pages */ + Assert(!AmBackgroundWriterProcess()); + /* The checkpointer doens't (shouldn't) read any pages */ + Assert(!AmCheckpointerProcess()); + + if (unlikely(PS_TIMEOUT_ID == 0)) + { + PS_TIMEOUT_ID = RegisterTimeout(USER_TIMEOUT, pagestore_timeout_handler); + } + + if (needs_set) + { +#if PG_MAJORVERSION_NUM <= 14 + enable_timeout_after(PS_TIMEOUT_ID, readahead_getpage_pull_timeout_ms); +#else + enable_timeout_every( + PS_TIMEOUT_ID, + TimestampTzPlusMilliseconds(GetCurrentTimestamp(), + readahead_getpage_pull_timeout_ms), + readahead_getpage_pull_timeout_ms + ); +#endif + timeout_set = true; + } + else + { + Assert(timeout_set); + disable_timeout(PS_TIMEOUT_ID, false); + timeout_set = false; + } + } +} + +static void +pagestore_timeout_handler(void) +{ +#if PG_MAJORVERSION_NUM <= 14 + /* + * PG14: Setting a repeating timeout is not possible, so we signal here + * that the timeout has already been reset, and by telling the system + * that system will re-schedule it later if we need to. + */ + timeout_set = false; +#endif + timeout_signaled = true; + InterruptPending = true; +} + +static process_interrupts_callback_t prev_interrupt_cb; + +/* + * Process new data received in our active PageStream sockets. + * + * This relies on the invariant that all pipelined yet-to-be-received requests + * are getPage requests managed by MyPState. This is currently true, any + * modification will probably require some stuff to make it work again. + */ +static bool +pagestore_smgr_processinterrupts(void) +{ + if (timeout_signaled) + { + if (!readpage_reentrant_guard && readahead_getpage_pull_timeout_ms > 0) + prefetch_pump_state(true); + + timeout_signaled = false; + reconfigure_timeout_if_needed(); + } + + if (!prev_interrupt_cb) + return false; + + return prev_interrupt_cb(); +} + + +void +pagestore_smgr_init(void) +{ + prev_interrupt_cb = ProcessInterruptsCallback; + ProcessInterruptsCallback = pagestore_smgr_processinterrupts; +}