Run compaction on the buffer holding received responses when useful (#3028)

This cleans up unused entries and reduces the chance of prefetch buffer
thrashing.
Author: MMeent
Date:   2022-12-08 09:49:43 +01:00 (committed via GitHub)
Parent: e1ef62f086
Commit: 0d04cd0b99

@@ -242,6 +242,14 @@ PrefetchState *MyPState;
     ) \
 )
 
+#define ReceiveBufferNeedsCompaction() (\
+    (MyPState->n_responses_buffered / 8) < ( \
+        MyPState->ring_receive - \
+        MyPState->ring_last - \
+        MyPState->n_responses_buffered \
+    ) \
+)
+
 int n_prefetch_hits = 0;
 int n_prefetch_misses = 0;
 int n_prefetch_missed_caches = 0;
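
In words: the span between ring_last and ring_receive holds both buffered responses and unused gap slots, and the macro fires once the gaps outnumber one eighth (12.5%) of the buffered responses. Below is a self-contained sketch of the same predicate; the struct and its fields merely mirror the relevant PrefetchState members and are invented here for illustration.

    #include <stdbool.h>
    #include <stdint.h>
    #include <stdio.h>

    /* Illustrative stand-in for the relevant PrefetchState fields. */
    struct ring_state
    {
        uint64_t ring_last;            /* oldest slot in the live window */
        uint64_t ring_receive;         /* first slot still awaiting a response */
        uint64_t n_responses_buffered; /* received but not yet consumed */
    };

    /*
     * Same test as ReceiveBufferNeedsCompaction(): the live span minus the
     * buffered responses is the number of unused "gap" slots; compaction is
     * considered worthwhile once the gaps outnumber 1/8th of the buffered
     * responses.
     */
    static bool
    needs_compaction(const struct ring_state *s)
    {
        uint64_t gaps = s->ring_receive - s->ring_last - s->n_responses_buffered;

        return s->n_responses_buffered / 8 < gaps;
    }

    int
    main(void)
    {
        /* span of 80 slots, 64 buffered responses => 16 gaps; 64 / 8 = 8 < 16 */
        struct ring_state s = {100, 180, 64};

        printf("%s\n", needs_compaction(&s) ? "compact" : "leave as-is");
        return 0;
    }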
@@ -249,17 +257,99 @@ int n_prefetch_dupes = 0;
 XLogRecPtr prefetch_lsn = 0;
 
+static bool compact_prefetch_buffers(void);
 static void consume_prefetch_responses(void);
 static uint64 prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_lsn);
 static bool prefetch_read(PrefetchRequest *slot);
 static void prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force_lsn);
 static bool prefetch_wait_for(uint64 ring_index);
-static void prefetch_cleanup(void);
+static void prefetch_cleanup_trailing_unused(void);
 static inline void prefetch_set_unused(uint64 ring_index);
 
 static XLogRecPtr neon_get_request_lsn(bool *latest, RelFileNode rnode,
                                        ForkNumber forknum, BlockNumber blkno);
 
+static bool
+compact_prefetch_buffers(void)
+{
+    uint64 empty_ring_index = MyPState->ring_last;
+    uint64 search_ring_index = MyPState->ring_receive;
+    int n_moved = 0;
+
+    if (MyPState->ring_receive == MyPState->ring_last)
+        return false;
+
+    while (search_ring_index > MyPState->ring_last)
+    {
+        search_ring_index--;
+        if (GetPrfSlot(search_ring_index)->status == PRFS_UNUSED)
+        {
+            empty_ring_index = search_ring_index;
+            break;
+        }
+    }
+
+    /*
+     * Here we have established:
+     *   slots < search_ring_index may be unused (not scanned)
+     *   slots >= search_ring_index and <= empty_ring_index are unused
+     *   slots > empty_ring_index are in use, or outside our buffer's range.
+     *
+     * Therefore, there is a gap of at least one unused item between
+     * search_ring_index and empty_ring_index, which grows as we hit more
+     * unused items while moving backwards through the array.
+     */
+
+    while (search_ring_index > MyPState->ring_last)
+    {
+        PrefetchRequest *source_slot;
+        PrefetchRequest *target_slot;
+        bool found;
+
+        search_ring_index--;
+
+        source_slot = GetPrfSlot(search_ring_index);
+        if (source_slot->status == PRFS_UNUSED)
+            continue;
+
+        target_slot = GetPrfSlot(empty_ring_index);
+
+        Assert(source_slot->status == PRFS_RECEIVED);
+        Assert(target_slot->status == PRFS_UNUSED);
+
+        target_slot->buftag = source_slot->buftag;
+        target_slot->status = source_slot->status;
+        target_slot->response = source_slot->response;
+        target_slot->effective_request_lsn = source_slot->effective_request_lsn;
+        target_slot->my_ring_index = empty_ring_index;
+
+        prfh_delete(MyPState->prf_hash, source_slot);
+        prfh_insert(MyPState->prf_hash, target_slot, &found);
+
+        Assert(!found);
+
+        /* Adjust the location of our known-empty slot */
+        empty_ring_index--;
+
+        source_slot->status = PRFS_UNUSED;
+        source_slot->buftag = (BufferTag) {0};
+        source_slot->response = NULL;
+        source_slot->my_ring_index = 0;
+        source_slot->effective_request_lsn = 0;
+
+        n_moved++;
+    }
+
+    if (MyPState->ring_last != empty_ring_index)
+    {
+        MyPState->ring_last = empty_ring_index;
+        return true;
+    }
+
+    return false;
+}
+
 void
 readahead_buffer_resize(int newsize, void *extra)
 {
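
For readers who want the compaction idea without the hash-table bookkeeping, here is a minimal stand-alone sketch of the same backward scan on a plain array; the toy_slot struct and its used flag are invented stand-ins for the PRFS_* slot states. Live entries are packed towards the high end of the range, so the freed space accumulates contiguously at the tail and the lower bound of the live window can advance.

    #include <stdbool.h>

    struct toy_slot
    {
        bool used;
        int  value;
    };

    /*
     * Compact live entries towards the high end of slots[last..receive),
     * mirroring the backward scan in compact_prefetch_buffers(). Returns
     * the new lower bound of the live region.
     */
    static int
    toy_compact(struct toy_slot *slots, int last, int receive)
    {
        int empty = last;
        int search = receive;

        /* find the highest unused slot; it becomes the first move target */
        while (search > last)
        {
            search--;
            if (!slots[search].used)
            {
                empty = search;
                break;
            }
        }

        /* move every live entry below the gap up into the known-free slots */
        while (search > last)
        {
            search--;
            if (!slots[search].used)
                continue;

            slots[empty] = slots[search]; /* copy into the known-empty slot */
            slots[search].used = false;   /* the source becomes a gap */
            empty--;
        }

        return empty; /* all slots above this index hold the live entries */
    }

Because the scan runs from newest to oldest and always fills the highest free slot, the relative order of the surviving entries is preserved.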
@@ -323,7 +413,7 @@ readahead_buffer_resize(int newsize, void *extra)
         prfh_insert(newPState->prf_hash, newslot, &found);
 
         Assert(!found);
 
         switch (newslot->status)
         {
             case PRFS_UNUSED:
@@ -370,7 +460,7 @@ consume_prefetch_responses(void)
 }
 
 static void
-prefetch_cleanup(void)
+prefetch_cleanup_trailing_unused(void)
 {
     uint64 ring_index;
     PrefetchRequest *slot;
@@ -531,7 +621,10 @@ prefetch_set_unused(uint64 ring_index)
 
     /* run cleanup if we're holding back ring_last */
    if (MyPState->ring_last == ring_index)
-        prefetch_cleanup();
+        prefetch_cleanup_trailing_unused();
+    /* ... and try to store the buffered responses more compactly if > 12.5% of the buffer is gaps */
+    else if (ReceiveBufferNeedsCompaction())
+        compact_prefetch_buffers();
 }
 
 static void
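
The renamed prefetch_cleanup_trailing_unused() is the cheap path: its body is unchanged by this commit and not shown in the diff, but the name and the call-site comment suggest it simply advances ring_last past unused slots at the tail of the ring. A hypothetical sketch of that pattern, for orientation only:

    /*
     * Hypothetical sketch (not the actual function body): advance `last`
     * past unused trailing slots, shrinking the live window from below.
     */
    static int
    toy_cleanup_trailing_unused(const bool *used, int last, int receive)
    {
        while (last < receive && !used[last])
            last++;
        return last;
    }

Compaction is reserved for the else branch because it is the more expensive operation: it rewrites slots and updates prf_hash entries, so it only runs when the 12.5% gap threshold says the buffer is fragmented enough to justify the work.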
@@ -702,20 +795,31 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls
-        Assert(slot->status != PRFS_UNUSED);
-
-        /* We have the slot for ring_last, so that must still be in progress */
-        switch (slot->status)
+        /*
+         * If there is good reason to run compaction on the prefetch buffers,
+         * try to do that.
+         */
+        if (ReceiveBufferNeedsCompaction() && compact_prefetch_buffers())
         {
-            case PRFS_REQUESTED:
-                Assert(MyPState->ring_receive == cleanup_index);
-                prefetch_wait_for(cleanup_index);
-                prefetch_set_unused(cleanup_index);
-                break;
-            case PRFS_RECEIVED:
-            case PRFS_TAG_REMAINS:
-                prefetch_set_unused(cleanup_index);
-                break;
-            default:
-                pg_unreachable();
+            Assert(slot->status == PRFS_UNUSED);
+        }
+        else
+        {
+            /* We have the slot for ring_last, so that must still be in progress */
+            switch (slot->status)
+            {
+                case PRFS_REQUESTED:
+                    Assert(MyPState->ring_receive == cleanup_index);
+                    prefetch_wait_for(cleanup_index);
+                    prefetch_set_unused(cleanup_index);
+                    break;
+                case PRFS_RECEIVED:
+                case PRFS_TAG_REMAINS:
+                    prefetch_set_unused(cleanup_index);
+                    break;
+                default:
+                    pg_unreachable();
+            }
         }
     }
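
The new Assert(slot->status == PRFS_UNUSED) leans on an invariant of compact_prefetch_buffers(): it only returns true after moving ring_last forward, and every slot it leaves below the new ring_last, including cleanup_index (the old ring_last), has been marked unused, so the slot can be reclaimed without waiting for a response. A hypothetical helper that states the invariant explicitly (illustration only, not part of the commit):

    /*
     * Illustration only: after a successful compaction, every slot below
     * the new ring_last -- including the old ring_last / cleanup_index --
     * must have been marked unused.
     */
    static void
    assert_compacted_tail(uint64 old_ring_last, uint64 new_ring_last)
    {
        for (uint64 i = old_ring_last; i < new_ring_last; i++)
            Assert(GetPrfSlot(i)->status == PRFS_UNUSED);
    }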
@@ -1816,7 +1920,7 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
 
         /* buffer was used, clean up for later reuse */
         prefetch_set_unused(ring_index);
-        prefetch_cleanup();
+        prefetch_cleanup_trailing_unused();
     }
 
     /*