From c5c4e47eddc89776c61bdda591dd958feee079b6 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Tue, 8 Nov 2022 12:11:17 +0200 Subject: [PATCH] Fix neon.flush_output_after GUC. The neon.flush_output_after was not effective, at least not in a sequential scan, because neon_read_at_lsn() flushed the prefetch queue on every call anyway. Change neon_read_at_lsn() so that it only flushes the queue if the GetPage request that it needs to wait for hasn't alrady been flushed. To make that possible, move the tracking of unflushed requests into a new ring_flush variable, alongside the other ring buffer indexes. While we're at it, mark neon.flush_output_after as PGC_USERSET, so that it can be changed per-session with "SET neon.flush_output_after = ...". Makes it easier to test different values. --- pgxn/neon/libpagestore.c | 9 +-------- pgxn/neon/pagestore_client.h | 1 + pgxn/neon/pagestore_smgr.c | 18 ++++++++++++++++-- 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index c37adc6cb7..ece139ff08 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -42,7 +42,6 @@ PGconn *pageserver_conn = NULL; char *page_server_connstring_raw; -int n_unflushed_requests = 0; int flush_every_n_requests = 8; static void pageserver_flush(void); @@ -205,11 +204,6 @@ pageserver_send(NeonRequest * request) } pfree(req_buff.data); - n_unflushed_requests++; - - if (flush_every_n_requests > 0 && n_unflushed_requests >= flush_every_n_requests) - pageserver_flush(); - if (message_level_is_interesting(PageStoreTrace)) { char *msg = nm_to_string((NeonMessage *) request); @@ -274,7 +268,6 @@ pageserver_flush(void) pageserver_disconnect(); neon_log(ERROR, "failed to flush page requests: %s", msg); } - n_unflushed_requests = 0; } page_server_api api = { @@ -436,7 +429,7 @@ pg_init_libpagestore(void) NULL, &flush_every_n_requests, 8, -1, INT_MAX, - PGC_SIGHUP, + PGC_USERSET, 0, /* no flags required */ NULL, NULL, NULL); diff --git a/pgxn/neon/pagestore_client.h b/pgxn/neon/pagestore_client.h index be6c4b3a77..5d8617a158 100644 --- a/pgxn/neon/pagestore_client.h +++ b/pgxn/neon/pagestore_client.h @@ -150,6 +150,7 @@ extern void prefetch_on_ps_disconnect(void); extern page_server_api * page_server; extern char *page_server_connstring; +extern int flush_every_n_requests; extern bool seqscan_prefetch_enabled; extern int seqscan_prefetch_distance; extern char *neon_timeline; diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index 59c5ff8db2..e8d6211bff 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -192,7 +192,7 @@ typedef struct PrfHashEntry { * It maintains a (ring) buffer of in-flight requests and responses. * * We maintain several indexes into the ring buffer: - * ring_unused >= ring_receive >= ring_last >= 0 + * ring_unused >= ring_flush >= ring_receive >= ring_last >= 0 * * ring_unused points to the first unused slot of the buffer * ring_receive is the next request that is to be received @@ -208,6 +208,7 @@ typedef struct PrefetchState { /* buffer indexes */ uint64 ring_unused; /* first unused slot */ + uint64 ring_flush; /* next request to flush */ uint64 ring_receive; /* next slot that is to receive a response */ uint64 ring_last; /* min slot with a response value */ @@ -577,6 +578,13 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls prefetch_do_request(slot, force_latest, force_lsn); Assert(slot->status == PRFS_REQUESTED); Assert(ring_index < MyPState->ring_unused); + + if (flush_every_n_requests > 0 && MyPState->ring_unused - MyPState->ring_flush >= flush_every_n_requests) + { + page_server->flush(); + MyPState->ring_flush = MyPState->ring_unused; + } + return ring_index; } @@ -585,6 +593,7 @@ page_server_request(void const *req) { page_server->send((NeonRequest *) req); page_server->flush(); + MyPState->ring_flush = MyPState->ring_unused; consume_prefetch_responses(); return page_server->receive(); } @@ -1581,6 +1590,7 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno, if (entry->slot->status == PRFS_REQUESTED) { page_server->flush(); + MyPState->ring_flush = MyPState->ring_unused; prefetch_wait_for(entry->slot->my_ring_index); } /* drop caches */ @@ -1606,7 +1616,11 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno, Assert(slot->status != PRFS_UNUSED); Assert(&MyPState->prf_buffer[(ring_index % READ_BUFFER_SIZE)] == slot); - page_server->flush(); + if (ring_index >= MyPState->ring_flush) + { + page_server->flush(); + MyPState->ring_flush = MyPState->ring_unused; + } prefetch_wait_for(ring_index); Assert(slot->status == PRFS_RECEIVED);