From 2eda484ef6dac15e0e7a72955e9d194ccfe3326f Mon Sep 17 00:00:00 2001 From: Matthias van de Meent Date: Thu, 16 Jan 2025 03:43:47 +0100 Subject: [PATCH] prefetch: Read more frequently from TCP buffer (#10394) This reduces pressure on the OS TCP read buffer by increasing the moments we read data out of the receive buffer, and increasing the number of bytes we can pull from that buffer when we do reads. ## Problem A backend may not always consume its prefetch data quick enough ## Summary of changes We add a new function `prefetch_pump_state` which pulls as many prefetch requests from the OS TCP receive buffer as possible, but without blocking. This thus reduces pressure on OS-level TCP buffers, thus increasing throughput by limiting throttling caused by full TCP buffers. --- pgxn/neon/libpagestore.c | 70 +++++++++++++++++++++++++++++++++++- pgxn/neon/pagestore_client.h | 20 +++++++++++ pgxn/neon/pagestore_smgr.c | 66 ++++++++++++++++++++++++++++++++++ 3 files changed, 155 insertions(+), 1 deletion(-) diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index 769befb4e5..4460e3b40c 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -911,7 +911,74 @@ pageserver_receive(shardno_t shard_no) } PG_CATCH(); { - neon_shard_log(shard_no, LOG, "pageserver_receive: disconnect due malformatted response"); + neon_shard_log(shard_no, LOG, "pageserver_receive: disconnect due to failure while parsing response"); + pageserver_disconnect(shard_no); + PG_RE_THROW(); + } + PG_END_TRY(); + + if (message_level_is_interesting(PageStoreTrace)) + { + char *msg = nm_to_string((NeonMessage *) resp); + + neon_shard_log(shard_no, PageStoreTrace, "got response: %s", msg); + pfree(msg); + } + } + else if (rc == -1) + { + neon_shard_log(shard_no, LOG, "pageserver_receive disconnect: psql end of copy data: %s", pchomp(PQerrorMessage(pageserver_conn))); + pageserver_disconnect(shard_no); + resp = NULL; + } + else if (rc == -2) + { + char *msg = pchomp(PQerrorMessage(pageserver_conn)); + + pageserver_disconnect(shard_no); + neon_shard_log(shard_no, ERROR, "pageserver_receive disconnect: could not read COPY data: %s", msg); + } + else + { + pageserver_disconnect(shard_no); + neon_shard_log(shard_no, ERROR, "pageserver_receive disconnect: unexpected PQgetCopyData return value: %d", rc); + } + + shard->nresponses_received++; + return (NeonResponse *) resp; +} + +static NeonResponse * +pageserver_try_receive(shardno_t shard_no) +{ + StringInfoData resp_buff; + NeonResponse *resp; + PageServer *shard = &page_servers[shard_no]; + PGconn *pageserver_conn = shard->conn; + /* read response */ + int rc; + + if (shard->state != PS_Connected) + return NULL; + + Assert(pageserver_conn); + + rc = PQgetCopyData(shard->conn, &resp_buff.data, 1 /* async = true */); + + if (rc == 0) + return NULL; + else if (rc > 0) + { + PG_TRY(); + { + resp_buff.len = rc; + resp_buff.cursor = 0; + resp = nm_unpack_response(&resp_buff); + PQfreemem(resp_buff.data); + } + PG_CATCH(); + { + neon_shard_log(shard_no, LOG, "pageserver_receive: disconnect due to failure while parsing response"); pageserver_disconnect(shard_no); PG_RE_THROW(); } @@ -980,6 +1047,7 @@ page_server_api api = .send = pageserver_send, .flush = pageserver_flush, .receive = pageserver_receive, + .try_receive = pageserver_try_receive, .disconnect = pageserver_disconnect_shard }; diff --git a/pgxn/neon/pagestore_client.h b/pgxn/neon/pagestore_client.h index 37bc4f7886..b751235595 100644 --- a/pgxn/neon/pagestore_client.h +++ b/pgxn/neon/pagestore_client.h @@ -192,9 +192,29 @@ typedef uint16 shardno_t; typedef struct { + /* + * Send this request to the PageServer associated with this shard. + */ bool (*send) (shardno_t shard_no, NeonRequest * request); + /* + * Blocking read for the next response of this shard. + * + * When a CANCEL signal is handled, the connection state will be + * unmodified. + */ NeonResponse *(*receive) (shardno_t shard_no); + /* + * Try get the next response from the TCP buffers, if any. + * Returns NULL when the data is not yet available. + */ + NeonResponse *(*try_receive) (shardno_t shard_no); + /* + * Make sure all requests are sent to PageServer. + */ bool (*flush) (shardno_t shard_no); + /* + * Disconnect from this pageserver shard. + */ void (*disconnect) (shardno_t shard_no); } page_server_api; diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index 7a4c0ef487..54cacea984 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -405,6 +405,56 @@ compact_prefetch_buffers(void) return false; } +/* + * If there might be responses still in the TCP buffer, then + * we should try to use those, so as to reduce any TCP backpressure + * on the OS/PS side. + * + * This procedure handles that. + * + * Note that this is only valid as long as the only pipelined + * operations in the TCP buffer are getPage@Lsn requests. + */ +static void +prefetch_pump_state(void) +{ + while (MyPState->ring_receive != MyPState->ring_flush) + { + NeonResponse *response; + PrefetchRequest *slot; + MemoryContext old; + + slot = GetPrfSlot(MyPState->ring_receive); + + old = MemoryContextSwitchTo(MyPState->errctx); + response = page_server->try_receive(slot->shard_no); + MemoryContextSwitchTo(old); + + if (response == NULL) + break; + + /* The slot should still be valid */ + if (slot->status != PRFS_REQUESTED || + slot->response != NULL || + slot->my_ring_index != MyPState->ring_receive) + neon_shard_log(slot->shard_no, ERROR, + "Incorrect prefetch slot state after receive: status=%d response=%p my=%lu receive=%lu", + slot->status, slot->response, + (long) slot->my_ring_index, (long) MyPState->ring_receive); + + /* update prefetch state */ + MyPState->n_responses_buffered += 1; + MyPState->n_requests_inflight -= 1; + MyPState->ring_receive += 1; + MyNeonCounters->getpage_prefetches_buffered = + MyPState->n_responses_buffered; + + /* update slot state */ + slot->status = PRFS_RECEIVED; + slot->response = response; + } +} + void readahead_buffer_resize(int newsize, void *extra) { @@ -2808,6 +2858,8 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, MyPState->ring_last <= ring_index); } + prefetch_pump_state(); + return false; } @@ -2849,6 +2901,8 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum) Assert(ring_index < MyPState->ring_unused && MyPState->ring_last <= ring_index); + prefetch_pump_state(); + return false; } #endif /* PG_MAJORVERSION_NUM < 17 */ @@ -2891,6 +2945,8 @@ neon_writeback(SMgrRelation reln, ForkNumber forknum, */ neon_log(SmgrTrace, "writeback noop"); + prefetch_pump_state(); + #ifdef DEBUG_COMPARE_LOCAL if (IS_LOCAL_REL(reln)) mdwriteback(reln, forknum, blocknum, nblocks); @@ -3145,6 +3201,8 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer neon_get_request_lsns(InfoFromSMgrRel(reln), forkNum, blkno, &request_lsns, 1, NULL); neon_read_at_lsn(InfoFromSMgrRel(reln), forkNum, blkno, request_lsns, buffer); + prefetch_pump_state(); + #ifdef DEBUG_COMPARE_LOCAL if (forkNum == MAIN_FORKNUM && IS_LOCAL_REL(reln)) { @@ -3282,6 +3340,8 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, neon_read_at_lsnv(InfoFromSMgrRel(reln), forknum, blocknum, request_lsns, buffers, nblocks, read); + prefetch_pump_state(); + #ifdef DEBUG_COMPARE_LOCAL if (forkNum == MAIN_FORKNUM && IS_LOCAL_REL(reln)) { @@ -3450,6 +3510,8 @@ neon_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const vo lfc_write(InfoFromSMgrRel(reln), forknum, blocknum, buffer); + prefetch_pump_state(); + #ifdef DEBUG_COMPARE_LOCAL if (IS_LOCAL_REL(reln)) #if PG_MAJORVERSION_NUM >= 17 @@ -3503,6 +3565,8 @@ neon_writev(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno, lfc_writev(InfoFromSMgrRel(reln), forknum, blkno, buffers, nblocks); + prefetch_pump_state(); + #ifdef DEBUG_COMPARE_LOCAL if (IS_LOCAL_REL(reln)) mdwritev(reln, forknum, blocknum, &buffer, 1, skipFsync); @@ -3792,6 +3856,8 @@ neon_immedsync(SMgrRelation reln, ForkNumber forknum) neon_log(SmgrTrace, "[NEON_SMGR] immedsync noop"); + prefetch_pump_state(); + #ifdef DEBUG_COMPARE_LOCAL if (IS_LOCAL_REL(reln)) mdimmedsync(reln, forknum);