diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c
index 1e4e18e7d1..d8e9d8b52c 100644
--- a/pgxn/neon/libpagestore.c
+++ b/pgxn/neon/libpagestore.c
@@ -52,6 +52,7 @@ char *page_server_connstring_raw;
 int			n_unflushed_requests = 0;
 int			flush_every_n_requests = 8;
+int			readahead_buffer_size = 128;
 
 static void pageserver_flush(void);
@@ -449,9 +450,22 @@ pg_init_libpagestore(void)
 							NULL,
 							&flush_every_n_requests,
 							8, -1, INT_MAX,
-							PGC_SIGHUP,
+							PGC_USERSET,
 							0,	/* no flags required */
 							NULL, NULL, NULL);
+	DefineCustomIntVariable("neon.readahead_buffer_size",
+							"number of prefetches to buffer",
+							"This buffer is used to store prefetched data, so "
+							"it should be at least as large as the configured "
+							"values of all tablespaces' effective_io_concurrency "
+							"and maintenance_io_concurrency, your sessions' "
+							"values for these, and the value of "
+							"seqscan_prefetch_buffers.",
+							&readahead_buffer_size,
+							128, 16, 1024,
+							PGC_USERSET,
+							0,	/* no flags required */
+							NULL, readahead_buffer_resize, NULL);
 
 	relsize_hash_init();
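
A note on hook ordering, since the resize hook registered above sizes its allocation from its argument: PostgreSQL's guc.c invokes an int GUC's assign hook *before* it stores the new value into the backing variable. A minimal sketch of that ordering (names are illustrative, not guc.c's internals verbatim):

	/* Sketch: how an int GUC assignment sequences the hook and the store. */
	typedef void (*IntAssignHook) (int newval, void *extra);

	static void
	assign_int_guc(int *variable, int newval, IntAssignHook hook, void *extra)
	{
		if (hook)
			hook(newval, extra);	/* hook still sees the old value in *variable */
		*variable = newval;			/* the variable is updated only afterwards */
	}

This is why readahead_buffer_resize() in pagestore_smgr.c below must compute the new allocation size from its newsize argument rather than from readahead_buffer_size.
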
diff --git a/pgxn/neon/pagestore_client.h b/pgxn/neon/pagestore_client.h
index be6c4b3a77..9b8081065c 100644
--- a/pgxn/neon/pagestore_client.h
+++ b/pgxn/neon/pagestore_client.h
@@ -150,6 +150,8 @@ extern void prefetch_on_ps_disconnect(void);
 extern page_server_api * page_server;
 extern char *page_server_connstring;
+extern int	flush_every_n_requests;
+extern int	readahead_buffer_size;
 extern bool seqscan_prefetch_enabled;
 extern int	seqscan_prefetch_distance;
 extern char *neon_timeline;
@@ -159,6 +161,7 @@ extern int32 max_cluster_size;
 extern const f_smgr *smgr_neon(BackendId backend, RelFileNode rnode);
 extern void smgr_init_neon(void);
+extern void readahead_buffer_resize(int newsize, void *extra);
 
 /* Neon storage manager functionality */
diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c
index 59c5ff8db2..d6fa7c46c9 100644
--- a/pgxn/neon/pagestore_smgr.c
+++ b/pgxn/neon/pagestore_smgr.c
@@ -116,10 +116,10 @@ static UnloggedBuildPhase unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
  *
  * Prefetch is performed locally by each backend.
  *
- * There can be up to READ_BUFFER_SIZE active IO requests registered at any
- * time. Requests using smgr_prefetch are sent to the pageserver, but we don't
- * wait on the response. Requests using smgr_read are either read from the
- * buffer, or (if that's not possible) we wait on the response to arrive -
+ * There can be up to readahead_buffer_size active IO requests registered at
+ * any time. Requests using smgr_prefetch are sent to the pageserver, but we
+ * don't wait on the response. Requests using smgr_read are either read from
+ * the buffer, or (if that's not possible) we wait on the response to arrive -
  * this also will allow us to receive other prefetched pages.
  * Each request is immediately written to the output buffer of the pageserver
  * connection, but may not be flushed if smgr_prefetch is used: pageserver
@@ -136,15 +136,25 @@ static UnloggedBuildPhase unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
  * the connection; the responses are stored for later use.
  *
  * NOTE: The current implementation of the prefetch system implements a ring
- * buffer of up to READ_BUFFER_SIZE requests. If there are more _read and
+ * buffer of up to readahead_buffer_size requests. If there are more _read and
  * _prefetch requests between the initial _prefetch and the _read of a buffer,
  * the prefetch request will have been dropped from this prefetch buffer, and
  * your prefetch was wasted.
  */
 
-/* Max amount of tracked buffer reads */
-#define READ_BUFFER_SIZE 128
-
+/*
+ * State machine:
+ *
+ *      not in hash : in hash
+ *                  :
+ * UNUSED ------> REQUESTED --> RECEIVED
+ *   ^              : |            |
+ *   |              : v            |
+ *   |              : TAG_REMAINS  |
+ *   |              : |            |
+ *   +----------------+------------+
+ *                  :
+ */
 typedef enum PrefetchStatus {
 	PRFS_UNUSED = 0,	/* unused slot */
 	PRFS_REQUESTED,		/* request was written to the sendbuffer to PS, but not
@@ -192,7 +202,7 @@ typedef struct PrfHashEntry {
  * It maintains a (ring) buffer of in-flight requests and responses.
  *
  * We maintain several indexes into the ring buffer:
- * ring_unused >= ring_receive >= ring_last >= 0
+ * ring_unused >= ring_flush >= ring_receive >= ring_last >= 0
  *
  * ring_unused points to the first unused slot of the buffer
 * ring_receive is the next request that is to be received
@@ -208,6 +218,7 @@ typedef struct PrefetchState {
 	/* buffer indexes */
 	uint64		ring_unused;	/* first unused slot */
+	uint64		ring_flush;		/* next request to flush */
 	uint64		ring_receive;	/* next slot that is to receive a response */
 	uint64		ring_last;		/* min slot with a response value */
@@ -218,11 +229,19 @@ typedef struct PrefetchState {
 	/* the buffers */
 	prfh_hash  *prf_hash;
-	PrefetchRequest prf_buffer[READ_BUFFER_SIZE];	/* prefetch buffers */
+	PrefetchRequest prf_buffer[];	/* prefetch buffers */
 } PrefetchState;
 
 PrefetchState *MyPState;
 
+#define GetPrfSlot(ring_index) ( \
+	( \
+		AssertMacro((ring_index) < MyPState->ring_unused && \
+					(ring_index) >= MyPState->ring_last), \
+		&MyPState->prf_buffer[((ring_index) % readahead_buffer_size)] \
+	) \
+)
+
 int			n_prefetch_hits = 0;
 int			n_prefetch_misses = 0;
 int			n_prefetch_missed_caches = 0;
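
A standalone sketch of the index scheme that GetPrfSlot() encodes: ring indexes are monotonically increasing uint64 values ordered as ring_unused >= ring_flush >= ring_receive >= ring_last, and only a live index (ring_last <= idx < ring_unused) may be mapped onto one of the readahead_buffer_size slots:

	#include <assert.h>
	#include <stdint.h>

	/* Toy equivalent of GetPrfSlot(): map an absolute ring index to a slot. */
	static inline unsigned
	ring_slot(uint64_t idx, uint64_t ring_last, uint64_t ring_unused,
			  unsigned buffer_size)
	{
		assert(ring_last <= idx && idx < ring_unused);	/* index must be live */
		return (unsigned) (idx % buffer_size);
	}

Because the indexes never wrap in practice (uint64 at request rates is effectively inexhaustible), equality and ordering comparisons between them stay valid across the whole session.
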
@@ -236,14 +255,112 @@ static void prefetch_read(PrefetchRequest *slot);
 static void prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force_lsn);
 static void prefetch_wait_for(uint64 ring_index);
 static void prefetch_cleanup(void);
-static inline void prefetch_set_unused(uint64 ring_index, bool hash_cleanup);
+static inline void prefetch_set_unused(uint64 ring_index);
 
 static XLogRecPtr neon_get_request_lsn(bool *latest, RelFileNode rnode,
 									   ForkNumber forknum, BlockNumber blkno);
 
+void
+readahead_buffer_resize(int newsize, void *extra)
+{
+	uint64		end,
+				nfree = newsize;
+	PrefetchState *newPState;
+	/* note: the assign hook runs before readahead_buffer_size is updated */
+	Size		newprfs_size = offsetof(PrefetchState, prf_buffer) +
+		sizeof(PrefetchRequest) * newsize;
+
+	/* don't try to re-initialize if we haven't initialized yet */
+	if (MyPState == NULL)
+		return;
+
+	/*
+	 * Make sure that we don't lose track of active prefetch requests by
+	 * ensuring we have received all but the last n requests (n = newsize).
+	 */
+	if (MyPState->n_requests_inflight > newsize)
+		prefetch_wait_for(MyPState->ring_unused - newsize);
+
+	/* construct the new PrefetchState, and copy over the memory contexts */
+	newPState = MemoryContextAllocZero(TopMemoryContext, newprfs_size);
+
+	newPState->bufctx = MyPState->bufctx;
+	newPState->errctx = MyPState->errctx;
+	newPState->hashctx = MyPState->hashctx;
+	newPState->prf_hash = prfh_create(MyPState->hashctx, newsize, NULL);
+	newPState->n_unused = newsize;
+	newPState->n_requests_inflight = 0;
+	newPState->n_responses_buffered = 0;
+	newPState->ring_last = newsize;
+	newPState->ring_unused = newsize;
+	newPState->ring_receive = newsize;
+	newPState->ring_flush = newsize;
+
+	/*
+	 * Copy over the prefetches.
+	 *
+	 * We populate the new prefetch array from the end, so that we retain
+	 * the most recent prefetches; this also has the benefit of needing
+	 * only one iteration over the dataset, and trivial compaction.
+	 */
+	for (end = MyPState->ring_unused - 1;
+		 end >= MyPState->ring_last && end != UINT64_MAX && nfree != 0;
+		 end -= 1)
+	{
+		PrefetchRequest *slot = GetPrfSlot(end);
+		PrefetchRequest *newslot;
+		bool		found;
+
+		if (slot->status == PRFS_UNUSED)
+			continue;
+
+		nfree -= 1;
+
+		newslot = &newPState->prf_buffer[nfree];
+		*newslot = *slot;
+		newslot->my_ring_index = nfree;
+
+		prfh_insert(newPState->prf_hash, newslot, &found);
+
+		Assert(!found);
+
+		switch (newslot->status)
+		{
+			case PRFS_UNUSED:
+				pg_unreachable();
+			case PRFS_REQUESTED:
+				newPState->n_requests_inflight += 1;
+				newPState->ring_receive -= 1;
+				newPState->ring_last -= 1;
+				break;
+			case PRFS_RECEIVED:
+				newPState->n_responses_buffered += 1;
+				newPState->ring_last -= 1;
+				break;
+			case PRFS_TAG_REMAINS:
+				newPState->ring_last -= 1;
+				break;
+		}
+		newPState->n_unused -= 1;
+	}
+
+	/* any older entries that didn't fit are released */
+	for (; end >= MyPState->ring_last && end != UINT64_MAX; end -= 1)
+	{
+		prefetch_set_unused(end);
+	}
+
+	prfh_destroy(MyPState->prf_hash);
+	pfree(MyPState);
+	MyPState = newPState;
+}
+
 /*
  * Make sure that there are no responses still in the buffer.
+ *
+ * NOTE: this function may indirectly update MyPState->prf_hash, which
+ * invalidates any active pointers into the hash table.
  */
 static void
 consume_prefetch_responses(void)
@@ -255,14 +372,12 @@ consume_prefetch_responses(void)
 static void
 prefetch_cleanup(void)
 {
-	int			index;
 	uint64		ring_index;
 	PrefetchRequest *slot;
 
 	while (MyPState->ring_last < MyPState->ring_receive)
 	{
 		ring_index = MyPState->ring_last;
-		index = (ring_index % READ_BUFFER_SIZE);
-		slot = &MyPState->prf_buffer[index];
+		slot = GetPrfSlot(ring_index);
 
 		if (slot->status == PRFS_UNUSED)
 			MyPState->ring_last += 1;
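
The copy loop in readahead_buffer_resize() above fills the new array from its tail. A toy version of the same strategy, which keeps the newest `newsize` entries and compacts them in a single backwards pass (plain ints stand in for PrefetchRequest):

	#include <stdint.h>

	static void
	keep_newest(const int *old, uint64_t ring_last, uint64_t ring_unused,
				unsigned oldsize, int *dst, unsigned newsize)
	{
		unsigned	nfree = newsize;

		for (uint64_t idx = ring_unused; idx-- > ring_last && nfree > 0;)
			dst[--nfree] = old[idx % oldsize];	/* newest entry lands at the end */
		/* entries older than the newest `newsize` survivors are dropped */
	}
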
@@ -274,19 +389,27 @@
 /*
  * Wait for slot of ring_index to have received its response.
  * The caller is responsible for making sure the request buffer is flushed.
+ *
+ * NOTE: this function may indirectly update MyPState->prf_hash, which
+ * invalidates any active pointers into the hash table.
  */
 static void
 prefetch_wait_for(uint64 ring_index)
 {
-	int			index;
 	PrefetchRequest *entry;
 
+	if (MyPState->ring_flush <= ring_index &&
+		MyPState->ring_unused > MyPState->ring_flush)
+	{
+		page_server->flush();
+		MyPState->ring_flush = MyPState->ring_unused;
+	}
+
 	Assert(MyPState->ring_unused > ring_index);
 
 	while (MyPState->ring_receive <= ring_index)
 	{
-		index = (MyPState->ring_receive % READ_BUFFER_SIZE);
-		entry = &MyPState->prf_buffer[index];
+		entry = GetPrfSlot(MyPState->ring_receive);
 
 		Assert(entry->status == PRFS_REQUESTED);
 		prefetch_read(entry);
@@ -298,6 +421,9 @@
  *
  * The caller is responsible for making sure that the request for this buffer
  * was flushed to the PageServer.
+ *
+ * NOTE: this function may indirectly update MyPState->prf_hash, which
+ * invalidates any active pointers into the hash table.
  */
 static void
 prefetch_read(PrefetchRequest *slot)
@@ -312,7 +438,7 @@
 	old = MemoryContextSwitchTo(MyPState->errctx);
 	response = (NeonResponse *) page_server->receive();
 	MemoryContextSwitchTo(old);
-	
+
 	/* update prefetch state */
 	MyPState->n_responses_buffered += 1;
 	MyPState->n_requests_inflight -= 1;
@@ -332,19 +458,22 @@
 void
 prefetch_on_ps_disconnect(void)
 {
-	for (; MyPState->ring_receive < MyPState->ring_unused; MyPState->ring_receive++)
+	MyPState->ring_flush = MyPState->ring_unused;
+	while (MyPState->ring_receive < MyPState->ring_unused)
 	{
 		PrefetchRequest *slot;
-		int			index = MyPState->ring_receive % READ_BUFFER_SIZE;
+		uint64		ring_index = MyPState->ring_receive;
+
+		slot = GetPrfSlot(ring_index);
 
-		slot = &MyPState->prf_buffer[index];
 		Assert(slot->status == PRFS_REQUESTED);
-		Assert(slot->my_ring_index == MyPState->ring_receive);
+		Assert(slot->my_ring_index == ring_index);
 
 		/* clean up the request */
 		slot->status = PRFS_TAG_REMAINS;
-		MyPState->n_requests_inflight--;
-		prefetch_set_unused(MyPState->ring_receive, true);
+		MyPState->n_requests_inflight -= 1;
+		MyPState->ring_receive += 1;
+		prefetch_set_unused(ring_index);
 	}
 }
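
The new ring_flush index supports two flush policies: flush eagerly once a batch of requests has accumulated (see prefetch_register_buffer() further down), and flush lazily in prefetch_wait_for() above, just before waiting on a request that may still sit in the local output buffer. Expressed as a self-contained predicate (a sketch, not the patch's code):

	#include <stdbool.h>
	#include <stdint.h>

	static bool
	needs_flush(uint64_t ring_flush, uint64_t ring_unused,
				uint64_t wait_for, int flush_every_n)
	{
		/* eager: a full batch of unflushed requests has accumulated */
		if (flush_every_n > 0 &&
			ring_unused - ring_flush >= (uint64_t) flush_every_n)
			return true;
		/* lazy: about to wait on a request that was never flushed */
		return ring_flush <= wait_for && ring_unused > ring_flush;
	}
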
@@ -353,21 +482,24 @@
 /*
  * The slot at ring_index must be a current member of the ring buffer,
  * and may not be in the PRFS_REQUESTED state.
+ *
+ * NOTE: this function will update MyPState->prf_hash, which invalidates
+ * any active pointers into the hash table.
  */
 static inline void
-prefetch_set_unused(uint64 ring_index, bool hash_cleanup)
+prefetch_set_unused(uint64 ring_index)
 {
-	PrefetchRequest *slot = &MyPState->prf_buffer[ring_index % READ_BUFFER_SIZE];
+	PrefetchRequest *slot;
 
-	Assert(MyPState->ring_last <= ring_index &&
-		   MyPState->ring_unused > ring_index);
+	if (ring_index < MyPState->ring_last)
+		return;					/* should already be unused */
+
+	Assert(MyPState->ring_unused > ring_index);
+	/* resolve the slot only after the range check above */
+	slot = GetPrfSlot(ring_index);
 
 	if (slot->status == PRFS_UNUSED)
 		return;
 
 	Assert(slot->status == PRFS_RECEIVED ||
 		   slot->status == PRFS_TAG_REMAINS);
-	Assert(ring_index >= MyPState->ring_last &&
-		   ring_index < MyPState->ring_unused);
 
 	if (slot->status == PRFS_RECEIVED)
 	{
@@ -382,8 +514,7 @@
 		Assert(slot->response == NULL);
 	}
 
-	if (hash_cleanup)
-		prfh_delete(MyPState->prf_hash, slot);
+	prfh_delete(MyPState->prf_hash, slot);
 
 	/* clear all fields */
 	MemSet(slot, 0, sizeof(PrefetchRequest));
@@ -397,6 +528,7 @@
 static void
 prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force_lsn)
 {
+	bool		found;
 	NeonGetPageRequest request = {
 		.req.tag = T_NeonGetPageRequest,
 		.req.latest = false,
@@ -454,6 +586,9 @@
 
 	/* update slot state */
 	slot->status = PRFS_REQUESTED;
+
+	prfh_insert(MyPState->prf_hash, slot, &found);
+	Assert(!found);
 }
 
@@ -464,13 +599,14 @@
  *
  * If force_latest and force_lsn are not NULL, those values are sent to the
  * pageserver. If they are NULL, we utilize the lastWrittenLsn -infrastructure
  * to fill in these values manually.
+ *
+ * NOTE: this function may indirectly update MyPState->prf_hash, which
+ * invalidates any active pointers into the hash table.
  */
 static uint64
 prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_lsn)
 {
-	int			index;
-	bool		found;
 	uint64		ring_index;
 	PrefetchRequest req;
 	PrefetchRequest *slot;
@@ -485,28 +621,49 @@
 	{
 		slot = entry->slot;
 		ring_index = slot->my_ring_index;
-		index = (ring_index % READ_BUFFER_SIZE);
 
-		Assert(slot == &MyPState->prf_buffer[index]);
+		Assert(slot == GetPrfSlot(ring_index));
 		Assert(slot->status != PRFS_UNUSED);
+		Assert(MyPState->ring_last <= ring_index &&
+			   ring_index < MyPState->ring_unused);
 		Assert(BUFFERTAGS_EQUAL(slot->buftag, tag));
-		
+
 		/*
 		 * If we want a specific lsn, we do not accept requests that were made
 		 * with a potentially different LSN.
 		 */
-		if (force_lsn && slot->effective_request_lsn != *force_lsn)
+		if (force_latest && force_lsn)
 		{
-			prefetch_wait_for(ring_index);
-			prefetch_set_unused(ring_index, true);
+			/* latest version: any effective_request_lsn >= the request LSN is OK */
+			if (*force_latest)
+			{
+				if (*force_lsn > slot->effective_request_lsn)
+				{
+					prefetch_wait_for(ring_index);
+					prefetch_set_unused(ring_index);
+					entry = NULL;
+				}
+			}
+			/* specific version: only accept requests made with exactly this LSN */
+			else
+			{
+				if (*force_lsn != slot->effective_request_lsn)
+				{
+					prefetch_wait_for(ring_index);
+					prefetch_set_unused(ring_index);
+					entry = NULL;
+				}
+			}
 		}
+
 		/*
 		 * We received a prefetch for a page that was recently read and
 		 * removed from the buffers. Remove that request from the buffers.
 		 */
 		else if (slot->status == PRFS_TAG_REMAINS)
 		{
-			prefetch_set_unused(ring_index, true);
+			prefetch_set_unused(ring_index);
+			entry = NULL;
 		}
 		else
 		{
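
The branchy acceptance logic above reduces to a small predicate. A sketch, with XLogRecPtr modeled as uint64 (prefetch_acceptable is a hypothetical helper for illustration, not part of the patch):

	#include <stdbool.h>
	#include <stdint.h>

	typedef uint64_t XLogRecPtr;	/* stand-in for PostgreSQL's typedef */

	static bool
	prefetch_acceptable(bool want_latest, XLogRecPtr request_lsn,
						XLogRecPtr effective_request_lsn)
	{
		if (want_latest)
			/* latest version: any request made at or after our LSN is fine */
			return request_lsn <= effective_request_lsn;
		/* specific version: the prefetch must have used exactly this LSN */
		return request_lsn == effective_request_lsn;
	}
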
@@ -529,9 +686,10 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls
 	 * output buffer, and 'not sending' a prefetch request kind of goes
 	 * against the principles of prefetching)
 	 */
-	if (MyPState->ring_last + READ_BUFFER_SIZE - 1 == MyPState->ring_unused)
+	if (MyPState->ring_last + readahead_buffer_size - 1 == MyPState->ring_unused)
 	{
-		slot = &MyPState->prf_buffer[(MyPState->ring_last % READ_BUFFER_SIZE)];
+		uint64		cleanup_index = MyPState->ring_last;
+
+		slot = GetPrfSlot(cleanup_index);
 
 		Assert(slot->status != PRFS_UNUSED);
 
@@ -539,13 +697,13 @@
 		switch (slot->status)
 		{
 			case PRFS_REQUESTED:
-				Assert(MyPState->ring_receive == MyPState->ring_last);
-				prefetch_wait_for(MyPState->ring_last);
-				prefetch_set_unused(MyPState->ring_last, true);
+				Assert(MyPState->ring_receive == cleanup_index);
+				prefetch_wait_for(cleanup_index);
+				prefetch_set_unused(cleanup_index);
 				break;
 			case PRFS_RECEIVED:
 			case PRFS_TAG_REMAINS:
-				prefetch_set_unused(MyPState->ring_last, true);
+				prefetch_set_unused(cleanup_index);
 				break;
 			default:
 				pg_unreachable();
@@ -553,12 +711,11 @@
 	}
 
 	/*
-	 * The next buffer pointed to by `ring_unused` is now unused, so we can insert
-	 * the new request to it.
+	 * The slot pointed to by `ring_unused` is now definitely empty, so we
+	 * can insert the new request into it.
 	 */
 	ring_index = MyPState->ring_unused;
-	index = (ring_index % READ_BUFFER_SIZE);
-	slot = &MyPState->prf_buffer[index];
+	/* can't use GetPrfSlot() here: ring_index == ring_unused is not live yet */
+	slot = &MyPState->prf_buffer[ring_index % readahead_buffer_size];
 
 	Assert(MyPState->ring_last <= ring_index);
 
@@ -571,12 +728,18 @@
 	slot->buftag = tag;
 	slot->my_ring_index = ring_index;
 
-	prfh_insert(MyPState->prf_hash, slot, &found);
-	Assert(!found);
-
 	prefetch_do_request(slot, force_latest, force_lsn);
 	Assert(slot->status == PRFS_REQUESTED);
-	Assert(ring_index < MyPState->ring_unused);
+	Assert(MyPState->ring_last <= ring_index &&
+		   ring_index < MyPState->ring_unused);
+
+	if (flush_every_n_requests > 0 &&
+		MyPState->ring_unused - MyPState->ring_flush >= flush_every_n_requests)
+	{
+		page_server->flush();
+		MyPState->ring_flush = MyPState->ring_unused;
+	}
+
 	return ring_index;
 }
 
@@ -585,6 +748,7 @@
 page_server_request(void const *req)
 {
 	page_server->send((NeonRequest *) req);
 	page_server->flush();
+	MyPState->ring_flush = MyPState->ring_unused;
 	consume_prefetch_responses();
 
 	return page_server->receive();
 }
@@ -1052,14 +1216,18 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, ch
 void
 neon_init(void)
 {
-	HASHCTL		info;
+	Size		prfs_size;
 
 	if (MyPState != NULL)
 		return;
 
-	MyPState = MemoryContextAllocZero(TopMemoryContext, sizeof(PrefetchState));
+	prfs_size = offsetof(PrefetchState, prf_buffer) +
+		sizeof(PrefetchRequest) * readahead_buffer_size;
+
+	MyPState = MemoryContextAllocZero(TopMemoryContext, prfs_size);
 
-	MyPState->n_unused = READ_BUFFER_SIZE;
+	MyPState->n_unused = readahead_buffer_size;
 
 	MyPState->bufctx = SlabContextCreate(TopMemoryContext,
 										 "NeonSMGR/prefetch",
@@ -1072,11 +1240,8 @@
 										 "NeonSMGR/prefetch",
 										 ALLOCSET_DEFAULT_SIZES);
 
-	info.keysize = sizeof(BufferTag);
-	info.entrysize = sizeof(uint64);
-
 	MyPState->prf_hash = prfh_create(MyPState->hashctx,
-									 READ_BUFFER_SIZE, NULL);
+									 readahead_buffer_size, NULL);
 
 #ifdef DEBUG_COMPARE_LOCAL
 	mdinit();
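
neon_init() now sizes PrefetchState with the standard C99 flexible-array-member idiom: offsetof(struct, member) plus n * sizeof(element). A minimal standalone sketch of the same pattern, using plain calloc in place of MemoryContextAllocZero:

	#include <stddef.h>
	#include <stdlib.h>

	typedef struct Ring
	{
		size_t		nslots;
		int			slots[];		/* flexible array member */
	} Ring;

	static Ring *
	ring_alloc(size_t nslots)
	{
		Ring	   *r = calloc(1, offsetof(Ring, slots) + nslots * sizeof(int));

		if (r != NULL)
			r->nslots = nslots;
		return r;
	}
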
@@ -1470,7 +1635,7 @@ neon_close(SMgrRelation reln, ForkNumber forknum)
 bool
 neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum)
 {
-	uint64		ring_index;
+	uint64		ring_index PG_USED_FOR_ASSERTS_ONLY;
 
 	switch (reln->smgr_relpersistence)
 	{
@@ -1565,9 +1730,9 @@
 
 	if (entry != NULL)
 	{
-		if (entry->slot->effective_request_lsn >= prefetch_lsn)
+		slot = entry->slot;
+		if (slot->effective_request_lsn >= request_lsn)
 		{
-			slot = entry->slot;
 			ring_index = slot->my_ring_index;
 			n_prefetch_hits += 1;
 		}
@@ -1578,13 +1743,12 @@
 			 * unlikely this happens, but it can happen if prefetch distance is
 			 * large enough and a backend didn't consume all prefetch requests.
 			 */
-			if (entry->slot->status == PRFS_REQUESTED)
+			if (slot->status == PRFS_REQUESTED)
 			{
-				page_server->flush();
-				prefetch_wait_for(entry->slot->my_ring_index);
+				prefetch_wait_for(slot->my_ring_index);
 			}
 			/* drop caches */
-			prefetch_set_unused(entry->slot->my_ring_index, true);
+			prefetch_set_unused(slot->my_ring_index);
 			n_prefetch_missed_caches += 1;
 			/* make it look like a prefetch cache miss */
 			entry = NULL;
@@ -1597,16 +1761,15 @@
 		ring_index = prefetch_register_buffer(buftag, &request_latest,
 											  &request_lsn);
-		slot = &MyPState->prf_buffer[(ring_index % READ_BUFFER_SIZE)];
+		slot = GetPrfSlot(ring_index);
 	}
 
+	Assert(slot->my_ring_index == ring_index);
 	Assert(MyPState->ring_last <= ring_index &&
 		   MyPState->ring_unused > ring_index);
-	Assert(slot->my_ring_index == ring_index);
 	Assert(slot->status != PRFS_UNUSED);
-	Assert(&MyPState->prf_buffer[(ring_index % READ_BUFFER_SIZE)] == slot);
+	Assert(GetPrfSlot(ring_index) == slot);
 
-	page_server->flush();
 	prefetch_wait_for(ring_index);
 	Assert(slot->status == PRFS_RECEIVED);
 
@@ -1637,7 +1800,7 @@
 	}
 
 	/* buffer was used, clean up for later reuse */
-	prefetch_set_unused(ring_index, true);
+	prefetch_set_unused(ring_index);
 	prefetch_cleanup();
 }
diff --git a/vendor/postgres-v14 b/vendor/postgres-v14
index e56b812dd8..cd0693e2be 160000
--- a/vendor/postgres-v14
+++ b/vendor/postgres-v14
@@ -1 +1 @@
-Subproject commit e56b812dd85a3d9355478cc626c10909406816ba
+Subproject commit cd0693e2be224bedfa0b61f9c5e2ff4cd88eec2c
diff --git a/vendor/postgres-v15 b/vendor/postgres-v15
index 39e3d745b3..e9e0fd5947 160000
--- a/vendor/postgres-v15
+++ b/vendor/postgres-v15
@@ -1 +1 @@
-Subproject commit 39e3d745b3701a3f47f40412fbec62cbb01a42bf
+Subproject commit e9e0fd59477587ff571189f731e0f39bdfae57e3