Address issues in the pagestore prefetch mechanism: (#2790)

- Update vendored PostgreSQL to address prefetch issues
- Make flushed state explicit in PrefetchState
- Move flush logic into prefetch_wait_for, where possible
- Clean up the prefetch state handling code in the various places that
handle state transitions.
- Fix a race condition in neon_read_at_lsn where a hash entry pointer
was used after the hash table was updated. This could result in
incorrect state transitions and assertion failures after disconnects
during prefetch_wait_for in neon_read_at_lsn.
 
Fixes #2780
MMeent
2022-11-15 15:12:38 +01:00
committed by GitHub
parent 03190a2161
commit 01778e37cc
5 changed files with 257 additions and 77 deletions


@@ -52,6 +52,7 @@ char *page_server_connstring_raw;
int n_unflushed_requests = 0;
int flush_every_n_requests = 8;
int readahead_buffer_size = 128;
static void pageserver_flush(void);
@@ -449,9 +450,22 @@ pg_init_libpagestore(void)
NULL,
&flush_every_n_requests,
8, -1, INT_MAX,
PGC_SIGHUP,
PGC_USERSET,
0, /* no flags required */
NULL, NULL, NULL);
DefineCustomIntVariable("neon.readahead_buffer_size",
"number of prefetches to buffer",
"This buffer is used to store prefetched data; so "
"it is important that this buffer is at least as "
"large as the configured value of all tablespaces' "
"effective_io_concurrency and maintenance_io_concurrency, "
"your sessions' values of these, and the value for "
"seqscan_prefetch_buffers.",
&readahead_buffer_size,
128, 16, 1024,
PGC_USERSET,
0, /* no flags required */
NULL, (GucIntAssignHook) &readahead_buffer_resize, NULL);
relsize_hash_init();


@@ -150,6 +150,8 @@ extern void prefetch_on_ps_disconnect(void);
extern page_server_api * page_server;
extern char *page_server_connstring;
extern int flush_every_n_requests;
extern int readahead_buffer_size;
extern bool seqscan_prefetch_enabled;
extern int seqscan_prefetch_distance;
extern char *neon_timeline;
@@ -159,6 +161,7 @@ extern int32 max_cluster_size;
extern const f_smgr *smgr_neon(BackendId backend, RelFileNode rnode);
extern void smgr_init_neon(void);
extern void readahead_buffer_resize(int newsize, void *extra);
/* Neon storage manager functionality */


@@ -116,10 +116,10 @@ static UnloggedBuildPhase unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
*
* Prefetch is performed locally by each backend.
*
* There can be up to READ_BUFFER_SIZE active IO requests registered at any
* time. Requests using smgr_prefetch are sent to the pageserver, but we don't
* wait on the response. Requests using smgr_read are either read from the
* buffer, or (if that's not possible) we wait on the response to arrive -
* There can be up to readahead_buffer_size active IO requests registered at
* any time. Requests using smgr_prefetch are sent to the pageserver, but we
* don't wait on the response. Requests using smgr_read are either read from
* the buffer, or (if that's not possible) we wait on the response to arrive -
* this also will allow us to receive other prefetched pages.
* Each request is immediately written to the output buffer of the pageserver
* connection, but may not be flushed if smgr_prefetch is used: pageserver
@@ -136,15 +136,25 @@ static UnloggedBuildPhase unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
* the connection; the responses are stored for later use.
*
* NOTE: The current implementation of the prefetch system implements a ring
* buffer of up to READ_BUFFER_SIZE requests. If there are more _read and
* buffer of up to readahead_buffer_size requests. If there are more _read and
* _prefetch requests between the initial _prefetch and the _read of a buffer,
* the prefetch request will have been dropped from this prefetch buffer, and
* your prefetch was wasted.
*/
/* Max amount of tracked buffer reads */
#define READ_BUFFER_SIZE 128
/*
* State machine:
*
* not in hash : in hash
* :
* UNUSED ------> REQUESTED --> RECEIVED
* ^ : | |
* | : v |
* | : TAG_UNUSED |
* | : | |
* +----------------+------------+
* :
*/
typedef enum PrefetchStatus {
PRFS_UNUSED = 0, /* unused slot */
PRFS_REQUESTED, /* request was written to the sendbuffer to PS, but not
@@ -192,7 +202,7 @@ typedef struct PrfHashEntry {
* It maintains a (ring) buffer of in-flight requests and responses.
*
* We maintain several indexes into the ring buffer:
* ring_unused >= ring_receive >= ring_last >= 0
* ring_unused >= ring_flush >= ring_receive >= ring_last >= 0
*
* ring_unused points to the first unused slot of the buffer
* ring_receive is the next request that is to be received
@@ -208,6 +218,7 @@ typedef struct PrefetchState {
/* buffer indexes */
uint64 ring_unused; /* first unused slot */
uint64 ring_flush; /* next request to flush */
uint64 ring_receive; /* next slot that is to receive a response */
uint64 ring_last; /* min slot with a response value */
@@ -218,11 +229,19 @@ typedef struct PrefetchState {
/* the buffers */
prfh_hash *prf_hash;
PrefetchRequest prf_buffer[READ_BUFFER_SIZE]; /* prefetch buffers */
PrefetchRequest prf_buffer[]; /* prefetch buffers */
} PrefetchState;
PrefetchState *MyPState;
#define GetPrfSlot(ring_index) ( \
( \
AssertMacro((ring_index) < MyPState->ring_unused && \
(ring_index) >= MyPState->ring_last), \
&MyPState->prf_buffer[((ring_index) % readahead_buffer_size)] \
) \
)
int n_prefetch_hits = 0;
int n_prefetch_misses = 0;
int n_prefetch_missed_caches = 0;
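
To make the ring-index bookkeeping above concrete, here is a standalone sketch (illustrative only, not part of this diff): ring indexes are monotonically increasing uint64 counters, the GetPrfSlot-style modulo maps them onto the fixed-size array, and the documented invariant ring_unused >= ring_flush >= ring_receive >= ring_last must hold at all times. All names in the sketch are made up; only the mapping and the invariant mirror PrefetchState.

#include <assert.h>
#include <stdint.h>
#include <stdio.h>

#define BUF_SIZE 8 /* stands in for readahead_buffer_size */

typedef struct
{
	uint64_t ring_last, ring_receive, ring_flush, ring_unused;
} Ring;

/* the GetPrfSlot-style mapping: indexes grow forever, slots are reused */
static uint64_t
slot_of(uint64_t ring_index)
{
	return ring_index % BUF_SIZE;
}

int
main(void)
{
	Ring r = {0, 0, 0, 0};

	/* register three requests, flush two of them, receive one response */
	r.ring_unused = 3;
	r.ring_flush = 2;
	r.ring_receive = 1;

	/* the invariant documented in the PrefetchState comment */
	assert(r.ring_unused >= r.ring_flush &&
		   r.ring_flush >= r.ring_receive &&
		   r.ring_receive >= r.ring_last);

	printf("request 10 would live in slot %llu\n",
		   (unsigned long long) slot_of(10));
	return 0;
}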
@@ -236,14 +255,112 @@ static void prefetch_read(PrefetchRequest *slot);
static void prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force_lsn);
static void prefetch_wait_for(uint64 ring_index);
static void prefetch_cleanup(void);
static inline void prefetch_set_unused(uint64 ring_index, bool hash_cleanup);
static inline void prefetch_set_unused(uint64 ring_index);
static XLogRecPtr neon_get_request_lsn(bool *latest, RelFileNode rnode,
ForkNumber forknum, BlockNumber blkno);
void
readahead_buffer_resize(int newsize, void *extra)
{
uint64 end,
nfree = newsize;
PrefetchState *newPState;
Size newprfs_size = offsetof(PrefetchState, prf_buffer) + (
sizeof(PrefetchRequest) * newsize
);
/* don't try to re-initialize if we haven't initialized yet */
if (MyPState == NULL)
return;
/*
* Make sure that we don't lose track of active prefetch requests by
* ensuring we have received all but the last n requests (n = newsize).
*/
if (MyPState->n_requests_inflight > newsize)
prefetch_wait_for(MyPState->ring_unused - newsize);
/* construct the new PrefetchState, and copy over the memory contexts */
newPState = MemoryContextAllocZero(TopMemoryContext, newprfs_size);
newPState->bufctx = MyPState->bufctx;
newPState->errctx = MyPState->errctx;
newPState->hashctx = MyPState->hashctx;
newPState->prf_hash = prfh_create(MyPState->hashctx, newsize, NULL);
newPState->n_unused = newsize;
newPState->n_requests_inflight = 0;
newPState->n_responses_buffered = 0;
newPState->ring_last = newsize;
newPState->ring_unused = newsize;
newPState->ring_receive = newsize;
newPState->ring_flush = newsize;
/*
* Copy over the prefetches.
*
* We populate the prefetch array from the end; to retain the most recent
* prefetches, but this has the benefit of only needing to do one iteration
* on the dataset, and trivial compaction.
*/
for (end = MyPState->ring_unused - 1;
end >= MyPState->ring_last && end != UINT64_MAX && nfree != 0;
end -= 1)
{
PrefetchRequest *slot = GetPrfSlot(end);
PrefetchRequest *newslot;
bool found;
if (slot->status == PRFS_UNUSED)
continue;
nfree -= 1;
newslot = &newPState->prf_buffer[nfree];
*newslot = *slot;
newslot->my_ring_index = nfree;
prfh_insert(newPState->prf_hash, newslot, &found);
Assert(!found);
switch (newslot->status)
{
case PRFS_UNUSED:
pg_unreachable();
case PRFS_REQUESTED:
newPState->n_requests_inflight += 1;
newPState->ring_receive -= 1;
newPState->ring_last -= 1;
break;
case PRFS_RECEIVED:
newPState->n_responses_buffered += 1;
newPState->ring_last -= 1;
break;
case PRFS_TAG_REMAINS:
newPState->ring_last -= 1;
break;
}
newPState->n_unused -= 1;
}
for (; end >= MyPState->ring_last && end != UINT64_MAX; end -= 1)
{
prefetch_set_unused(end);
}
prfh_destroy(MyPState->prf_hash);
pfree(MyPState);
MyPState = newPState;
}
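
The backwards copy in readahead_buffer_resize above can be hard to follow; the following standalone sketch (illustrative only, not part of this diff) reduces the idea to plain arrays: walking the old ring from the newest entry backwards keeps the most recent live entries, needs only one pass, and compacts out unused slots. The array contents and names are made up.

#include <stdio.h>

int
main(void)
{
	int old_buf[6] = {1, 0, 3, 4, 0, 6}; /* 0 plays the role of PRFS_UNUSED */
	int new_buf[3] = {0, 0, 0};
	int nfree = 3; /* new, smaller capacity */

	/* walk the old ring backwards so the newest live entries win */
	for (int end = 5; end >= 0 && nfree != 0; end--)
	{
		if (old_buf[end] == 0)
			continue; /* skip unused slots */
		new_buf[--nfree] = old_buf[end];
	}

	/* prints "3 4 6": the three most recent live entries, still in order */
	for (int i = 0; i < 3; i++)
		printf("%d ", new_buf[i]);
	printf("\n");
	return 0;
}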
/*
* Make sure that there are no responses still in the buffer.
*
* NOTE: this function may indirectly update MyPState->pfs_hash; which
* invalidates any active pointers into the hash table.
*/
static void
consume_prefetch_responses(void)
@@ -255,14 +372,12 @@ consume_prefetch_responses(void)
static void
prefetch_cleanup(void)
{
int index;
uint64 ring_index;
PrefetchRequest *slot;
while (MyPState->ring_last < MyPState->ring_receive) {
ring_index = MyPState->ring_last;
index = (ring_index % READ_BUFFER_SIZE);
slot = &MyPState->prf_buffer[index];
slot = GetPrfSlot(ring_index);
if (slot->status == PRFS_UNUSED)
MyPState->ring_last += 1;
@@ -274,19 +389,27 @@ prefetch_cleanup(void)
/*
* Wait for slot of ring_index to have received its response.
* The caller is responsible for making sure the request buffer is flushed.
*
* NOTE: this function may indirectly update MyPState->pfs_hash; which
* invalidates any active pointers into the hash table.
*/
static void
prefetch_wait_for(uint64 ring_index)
{
int index;
PrefetchRequest *entry;
if (MyPState->ring_flush <= ring_index &&
MyPState->ring_unused > MyPState->ring_flush)
{
page_server->flush();
MyPState->ring_flush = MyPState->ring_unused;
}
Assert(MyPState->ring_unused > ring_index);
while (MyPState->ring_receive <= ring_index)
{
index = (MyPState->ring_receive % READ_BUFFER_SIZE);
entry = &MyPState->prf_buffer[index];
entry = GetPrfSlot(MyPState->ring_receive);
Assert(entry->status == PRFS_REQUESTED);
prefetch_read(entry);
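
The new block at the top of prefetch_wait_for encodes a simple rule: before blocking on the response for ring_index, flush the connection if that request may still be sitting unflushed in the send buffer, otherwise the backend would wait for a reply the pageserver has never seen. A standalone sketch of that rule, with counters standing in for the real connection (illustrative only, not part of this diff):

#include <assert.h>
#include <stdint.h>
#include <stdio.h>

static uint64_t ring_unused = 5;  /* requests written to the send buffer */
static uint64_t ring_flush = 2;   /* requests actually sent to the server */
static uint64_t ring_receive = 2; /* next response we expect */

static void
flush(void)
{
	printf("flushing requests %llu..%llu\n",
		   (unsigned long long) ring_flush,
		   (unsigned long long) (ring_unused - 1));
}

static void
wait_for(uint64_t ring_index)
{
	/* the request we wait for may not have been flushed yet */
	if (ring_flush <= ring_index && ring_unused > ring_flush)
	{
		flush();
		ring_flush = ring_unused;
	}
	assert(ring_unused > ring_index);
	while (ring_receive <= ring_index)
		ring_receive++; /* stands in for prefetch_read() */
}

int
main(void)
{
	wait_for(4); /* flushes requests 2..4, then "receives" 2, 3 and 4 */
	return 0;
}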
@@ -298,6 +421,9 @@ prefetch_wait_for(uint64 ring_index)
*
* The caller is responsible for making sure that the request for this buffer
* was flushed to the PageServer.
*
* NOTE: this function may indirectly update MyPState->pfs_hash; which
* invalidates any active pointers into the hash table.
*/
static void
prefetch_read(PrefetchRequest *slot)
@@ -312,7 +438,7 @@ prefetch_read(PrefetchRequest *slot)
old = MemoryContextSwitchTo(MyPState->errctx);
response = (NeonResponse *) page_server->receive();
MemoryContextSwitchTo(old);
/* update prefetch state */
MyPState->n_responses_buffered += 1;
MyPState->n_requests_inflight -= 1;
@@ -332,19 +458,22 @@ prefetch_read(PrefetchRequest *slot)
void
prefetch_on_ps_disconnect(void)
{
for (; MyPState->ring_receive < MyPState->ring_unused; MyPState->ring_receive++)
MyPState->ring_flush = MyPState->ring_unused;
while (MyPState->ring_receive < MyPState->ring_unused)
{
PrefetchRequest *slot;
int index = MyPState->ring_receive % READ_BUFFER_SIZE;
uint64 ring_index = MyPState->ring_receive;
slot = GetPrfSlot(ring_index);
slot = &MyPState->prf_buffer[index];
Assert(slot->status == PRFS_REQUESTED);
Assert(slot->my_ring_index == MyPState->ring_receive);
Assert(slot->my_ring_index == ring_index);
/* clean up the request */
slot->status = PRFS_TAG_REMAINS;
MyPState->n_requests_inflight--;
prefetch_set_unused(MyPState->ring_receive, true);
MyPState->n_requests_inflight -= 1;
MyPState->ring_receive += 1;
prefetch_set_unused(ring_index);
}
}
@@ -353,21 +482,24 @@ prefetch_on_ps_disconnect(void)
*
* The slot at ring_index must be a current member of the ring buffer,
* and may not be in the PRFS_REQUESTED state.
*
* NOTE: this function will update MyPState->pfs_hash; which invalidates any
* active pointers into the hash table.
*/
static inline void
prefetch_set_unused(uint64 ring_index, bool hash_cleanup)
prefetch_set_unused(uint64 ring_index)
{
PrefetchRequest *slot = &MyPState->prf_buffer[ring_index % READ_BUFFER_SIZE];
PrefetchRequest *slot = GetPrfSlot(ring_index);
Assert(MyPState->ring_last <= ring_index &&
MyPState->ring_unused > ring_index);
if (ring_index < MyPState->ring_last)
return; /* Should already be unused */
Assert(MyPState->ring_unused > ring_index);
if (slot->status == PRFS_UNUSED)
return;
Assert(slot->status == PRFS_RECEIVED || slot->status == PRFS_TAG_REMAINS);
Assert(ring_index >= MyPState->ring_last &&
ring_index < MyPState->ring_unused);
if (slot->status == PRFS_RECEIVED)
{
@@ -382,8 +514,7 @@ prefetch_set_unused(uint64 ring_index, bool hash_cleanup)
Assert(slot->response == NULL);
}
if (hash_cleanup)
prfh_delete(MyPState->prf_hash, slot);
prfh_delete(MyPState->prf_hash, slot);
/* clear all fields */
MemSet(slot, 0, sizeof(PrefetchRequest));
@@ -397,6 +528,7 @@ prefetch_set_unused(uint64 ring_index, bool hash_cleanup)
static void
prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force_lsn)
{
bool found;
NeonGetPageRequest request = {
.req.tag = T_NeonGetPageRequest,
.req.latest = false,
@@ -454,6 +586,9 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force
/* update slot state */
slot->status = PRFS_REQUESTED;
prfh_insert(MyPState->prf_hash, slot, &found);
Assert(!found);
}
/*
@@ -464,13 +599,14 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force
* If force_latest and force_lsn are not NULL, those values are sent to the
* pageserver. If they are NULL, we utilize the lastWrittenLsn -infrastructure
* to fill in these values manually.
*
* NOTE: this function may indirectly update MyPState->pfs_hash; which
* invalidates any active pointers into the hash table.
*/
static uint64
prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_lsn)
{
int index;
bool found;
uint64 ring_index;
PrefetchRequest req;
PrefetchRequest *slot;
@@ -485,28 +621,49 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls
{
slot = entry->slot;
ring_index = slot->my_ring_index;
index = (ring_index % READ_BUFFER_SIZE);
Assert(slot == &MyPState->prf_buffer[index]);
Assert(slot == GetPrfSlot(ring_index));
Assert(slot->status != PRFS_UNUSED);
Assert(MyPState->ring_last <= ring_index &&
ring_index < MyPState->ring_unused);
Assert(BUFFERTAGS_EQUAL(slot->buftag, tag));
/*
* If we want a specific lsn, we do not accept requests that were made
* with a potentially different LSN.
*/
if (force_lsn && slot->effective_request_lsn != *force_lsn)
if (force_latest && force_lsn)
{
prefetch_wait_for(ring_index);
prefetch_set_unused(ring_index, true);
/* if we want the latest version, any effective_request_lsn < request lsn is OK */
if (*force_latest)
{
if (*force_lsn > slot->effective_request_lsn)
{
prefetch_wait_for(ring_index);
prefetch_set_unused(ring_index);
entry = NULL;
}
}
/* if we don't want the latest version, only accept requests with the exact same LSN */
else
{
if (*force_lsn != slot->effective_request_lsn)
{
prefetch_wait_for(ring_index);
prefetch_set_unused(ring_index);
entry = NULL;
}
}
}
/*
* We received a prefetch for a page that was recently read and
* removed from the buffers. Remove that request from the buffers.
*/
else if (slot->status == PRFS_TAG_REMAINS)
{
prefetch_set_unused(ring_index, true);
prefetch_set_unused(ring_index);
entry = NULL;
}
else
{
@@ -529,9 +686,10 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls
* output buffer, and 'not sending' a prefetch request kind of goes
* against the principles of prefetching)
*/
if (MyPState->ring_last + READ_BUFFER_SIZE - 1 == MyPState->ring_unused)
if (MyPState->ring_last + readahead_buffer_size - 1 == MyPState->ring_unused)
{
slot = &MyPState->prf_buffer[(MyPState->ring_last % READ_BUFFER_SIZE)];
uint64 cleanup_index = MyPState->ring_last;
slot = GetPrfSlot(cleanup_index);
Assert(slot->status != PRFS_UNUSED);
@@ -539,13 +697,13 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls
switch (slot->status)
{
case PRFS_REQUESTED:
Assert(MyPState->ring_receive == MyPState->ring_last);
prefetch_wait_for(MyPState->ring_last);
prefetch_set_unused(MyPState->ring_last, true);
Assert(MyPState->ring_receive == cleanup_index);
prefetch_wait_for(cleanup_index);
prefetch_set_unused(cleanup_index);
break;
case PRFS_RECEIVED:
case PRFS_TAG_REMAINS:
prefetch_set_unused(MyPState->ring_last, true);
prefetch_set_unused(cleanup_index);
break;
default:
pg_unreachable();
@@ -553,12 +711,11 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls
}
/*
* The next buffer pointed to by `ring_unused` is now unused, so we can insert
* the new request to it.
* The next buffer pointed to by `ring_unused` is now definitely empty,
* so we can insert the new request to it.
*/
ring_index = MyPState->ring_unused;
index = (ring_index % READ_BUFFER_SIZE);
slot = &MyPState->prf_buffer[index];
slot = &MyPState->prf_buffer[((ring_index) % readahead_buffer_size)];
Assert(MyPState->ring_last <= ring_index);
@@ -571,12 +728,18 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls
slot->buftag = tag;
slot->my_ring_index = ring_index;
prfh_insert(MyPState->prf_hash, slot, &found);
Assert(!found);
prefetch_do_request(slot, force_latest, force_lsn);
Assert(slot->status == PRFS_REQUESTED);
Assert(ring_index < MyPState->ring_unused);
Assert(MyPState->ring_last <= ring_index &&
ring_index < MyPState->ring_unused);
if (flush_every_n_requests > 0 &&
MyPState->ring_unused - MyPState->ring_flush >= flush_every_n_requests)
{
page_server->flush();
MyPState->ring_flush = MyPState->ring_unused;
}
return ring_index;
}
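
The last hunk above also shows how prefetch_register_buffer batches flushes: requests accumulate in the connection's send buffer and are only flushed once ring_unused has run flush_every_n_requests ahead of ring_flush. A standalone sketch of that batching (illustrative only, not part of this diff; the names and the flush() stub are made up):

#include <stdint.h>
#include <stdio.h>

#define FLUSH_EVERY_N 8 /* stands in for flush_every_n_requests */

static uint64_t ring_unused = 0; /* requests written to the send buffer */
static uint64_t ring_flush = 0;  /* requests actually flushed to the server */

static void
flush(void)
{
	printf("flush after request %llu\n", (unsigned long long) ring_unused);
}

static void
register_request(void)
{
	ring_unused++; /* the request is buffered, not yet flushed */
	if (ring_unused - ring_flush >= FLUSH_EVERY_N)
	{
		flush();
		ring_flush = ring_unused;
	}
}

int
main(void)
{
	for (int i = 0; i < 20; i++)
		register_request(); /* flushes after requests 8 and 16 */
	return 0;
}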
@@ -585,6 +748,7 @@ page_server_request(void const *req)
{
page_server->send((NeonRequest *) req);
page_server->flush();
MyPState->ring_flush = MyPState->ring_unused;
consume_prefetch_responses();
return page_server->receive();
}
@@ -1052,14 +1216,18 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, ch
void
neon_init(void)
{
HASHCTL info;
Size prfs_size;
if (MyPState != NULL)
return;
MyPState = MemoryContextAllocZero(TopMemoryContext, sizeof(PrefetchState));
prfs_size = offsetof(PrefetchState, prf_buffer) + (
sizeof(PrefetchRequest) * readahead_buffer_size
);
MyPState = MemoryContextAllocZero(TopMemoryContext, prfs_size);
MyPState->n_unused = READ_BUFFER_SIZE;
MyPState->n_unused = readahead_buffer_size;
MyPState->bufctx = SlabContextCreate(TopMemoryContext,
"NeonSMGR/prefetch",
@@ -1072,11 +1240,8 @@ neon_init(void)
"NeonSMGR/prefetch",
ALLOCSET_DEFAULT_SIZES);
info.keysize = sizeof(BufferTag);
info.entrysize = sizeof(uint64);
MyPState->prf_hash = prfh_create(MyPState->hashctx,
READ_BUFFER_SIZE, NULL);
readahead_buffer_size, NULL);
#ifdef DEBUG_COMPARE_LOCAL
mdinit();
@@ -1470,7 +1635,7 @@ neon_close(SMgrRelation reln, ForkNumber forknum)
bool
neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum)
{
uint64 ring_index;
uint64 ring_index PG_USED_FOR_ASSERTS_ONLY;
switch (reln->smgr_relpersistence)
{
@@ -1565,9 +1730,9 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
if (entry != NULL)
{
if (entry->slot->effective_request_lsn >= prefetch_lsn)
slot = entry->slot;
if (slot->effective_request_lsn >= request_lsn)
{
slot = entry->slot;
ring_index = slot->my_ring_index;
n_prefetch_hits += 1;
}
@@ -1578,13 +1743,12 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
* unlikely this happens, but it can happen if prefetch distance is
* large enough and a backend didn't consume all prefetch requests.
*/
if (entry->slot->status == PRFS_REQUESTED)
if (slot->status == PRFS_REQUESTED)
{
page_server->flush();
prefetch_wait_for(entry->slot->my_ring_index);
prefetch_wait_for(slot->my_ring_index);
}
/* drop caches */
prefetch_set_unused(entry->slot->my_ring_index, true);
prefetch_set_unused(slot->my_ring_index);
n_prefetch_missed_caches += 1;
/* make it look like a prefetch cache miss */
entry = NULL;
@@ -1597,16 +1761,15 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
ring_index = prefetch_register_buffer(buftag, &request_latest,
&request_lsn);
slot = &MyPState->prf_buffer[(ring_index % READ_BUFFER_SIZE)];
slot = GetPrfSlot(ring_index);
}
Assert(slot->my_ring_index == ring_index);
Assert(MyPState->ring_last <= ring_index &&
MyPState->ring_unused > ring_index);
Assert(slot->my_ring_index == ring_index);
Assert(slot->status != PRFS_UNUSED);
Assert(&MyPState->prf_buffer[(ring_index % READ_BUFFER_SIZE)] == slot);
Assert(GetPrfSlot(ring_index) == slot);
page_server->flush();
prefetch_wait_for(ring_index);
Assert(slot->status == PRFS_RECEIVED);
@@ -1637,7 +1800,7 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
}
/* buffer was used, clean up for later reuse */
prefetch_set_unused(ring_index, true);
prefetch_set_unused(ring_index);
prefetch_cleanup();
}
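
The neon_read_at_lsn changes above are the race-condition fix from the commit message: prefetch_wait_for and prefetch_set_unused may update the prefetch hash, so the PrfHashEntry pointer obtained from the lookup must not be dereferenced after those calls; the code now copies entry->slot up front and works with the slot (or re-resolves it via GetPrfSlot) instead. A standalone sketch of that pattern (illustrative only, not part of this diff; the types and helpers are made up):

#include <stddef.h>
#include <stdint.h>

typedef struct Slot { uint64_t my_ring_index; int status; } Slot;
typedef struct Entry { Slot *slot; } Entry;

static Slot  storage[4];  /* stands in for MyPState->prf_buffer */
static Entry entry_buf;   /* stands in for the hash entry */

static Entry *
lookup(uint64_t ring_index)
{
	entry_buf.slot = &storage[ring_index % 4];
	return &entry_buf;
}

static void
may_modify_table(void)
{
	/* like prefetch_wait_for(): the hash entry becomes stale */
	entry_buf.slot = NULL;
}

static int
read_slot(uint64_t ring_index)
{
	Entry *entry = lookup(ring_index);
	Slot  *slot = entry->slot; /* copy the pointer out of the entry first */

	may_modify_table();

	/* correct: use the copied 'slot'; 'entry->slot' is no longer valid */
	return slot->status;
}

int
main(void)
{
	return read_slot(2);
}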