diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index c37adc6cb7..ece139ff08 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -42,7 +42,6 @@ PGconn *pageserver_conn = NULL; char *page_server_connstring_raw; -int n_unflushed_requests = 0; int flush_every_n_requests = 8; static void pageserver_flush(void); @@ -205,11 +204,6 @@ pageserver_send(NeonRequest * request) } pfree(req_buff.data); - n_unflushed_requests++; - - if (flush_every_n_requests > 0 && n_unflushed_requests >= flush_every_n_requests) - pageserver_flush(); - if (message_level_is_interesting(PageStoreTrace)) { char *msg = nm_to_string((NeonMessage *) request); @@ -274,7 +268,6 @@ pageserver_flush(void) pageserver_disconnect(); neon_log(ERROR, "failed to flush page requests: %s", msg); } - n_unflushed_requests = 0; } page_server_api api = { @@ -436,7 +429,7 @@ pg_init_libpagestore(void) NULL, &flush_every_n_requests, 8, -1, INT_MAX, - PGC_SIGHUP, + PGC_USERSET, 0, /* no flags required */ NULL, NULL, NULL); diff --git a/pgxn/neon/pagestore_client.h b/pgxn/neon/pagestore_client.h index be6c4b3a77..5d8617a158 100644 --- a/pgxn/neon/pagestore_client.h +++ b/pgxn/neon/pagestore_client.h @@ -150,6 +150,7 @@ extern void prefetch_on_ps_disconnect(void); extern page_server_api * page_server; extern char *page_server_connstring; +extern int flush_every_n_requests; extern bool seqscan_prefetch_enabled; extern int seqscan_prefetch_distance; extern char *neon_timeline; diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index 59c5ff8db2..e8d6211bff 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -192,7 +192,7 @@ typedef struct PrfHashEntry { * It maintains a (ring) buffer of in-flight requests and responses. * * We maintain several indexes into the ring buffer: - * ring_unused >= ring_receive >= ring_last >= 0 + * ring_unused >= ring_flush >= ring_receive >= ring_last >= 0 * * ring_unused points to the first unused slot of the buffer * ring_receive is the next request that is to be received @@ -208,6 +208,7 @@ typedef struct PrefetchState { /* buffer indexes */ uint64 ring_unused; /* first unused slot */ + uint64 ring_flush; /* next request to flush */ uint64 ring_receive; /* next slot that is to receive a response */ uint64 ring_last; /* min slot with a response value */ @@ -577,6 +578,13 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls prefetch_do_request(slot, force_latest, force_lsn); Assert(slot->status == PRFS_REQUESTED); Assert(ring_index < MyPState->ring_unused); + + if (flush_every_n_requests > 0 && MyPState->ring_unused - MyPState->ring_flush >= flush_every_n_requests) + { + page_server->flush(); + MyPState->ring_flush = MyPState->ring_unused; + } + return ring_index; } @@ -585,6 +593,7 @@ page_server_request(void const *req) { page_server->send((NeonRequest *) req); page_server->flush(); + MyPState->ring_flush = MyPState->ring_unused; consume_prefetch_responses(); return page_server->receive(); } @@ -1581,6 +1590,7 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno, if (entry->slot->status == PRFS_REQUESTED) { page_server->flush(); + MyPState->ring_flush = MyPState->ring_unused; prefetch_wait_for(entry->slot->my_ring_index); } /* drop caches */ @@ -1606,7 +1616,11 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno, Assert(slot->status != PRFS_UNUSED); Assert(&MyPState->prf_buffer[(ring_index % READ_BUFFER_SIZE)] == slot); - page_server->flush(); + if (ring_index >= MyPState->ring_flush) + { + page_server->flush(); + MyPState->ring_flush = MyPState->ring_unused; + } prefetch_wait_for(ring_index); Assert(slot->status == PRFS_RECEIVED);