diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c
index 434a1c2b85..76f53aae0b 100644
--- a/pgxn/neon/pagestore_smgr.c
+++ b/pgxn/neon/pagestore_smgr.c
@@ -242,6 +242,14 @@ PrefetchState *MyPState;
 	) \
 )
 
+#define ReceiveBufferNeedsCompaction() (\
+	(MyPState->n_responses_buffered / 8) < ( \
+		MyPState->ring_receive - \
+		MyPState->ring_last - \
+		MyPState->n_responses_buffered \
+	) \
+)
+
 int n_prefetch_hits = 0;
 int n_prefetch_misses = 0;
 int n_prefetch_missed_caches = 0;
@@ -249,17 +257,99 @@ int n_prefetch_dupes = 0;
 XLogRecPtr prefetch_lsn = 0;
 
+static bool compact_prefetch_buffers(void);
 static void consume_prefetch_responses(void);
 static uint64 prefetch_register_buffer(BufferTag tag, bool *force_latest,
 									   XLogRecPtr *force_lsn);
 static bool prefetch_read(PrefetchRequest *slot);
 static void prefetch_do_request(PrefetchRequest *slot, bool *force_latest,
 								XLogRecPtr *force_lsn);
 static bool prefetch_wait_for(uint64 ring_index);
-static void prefetch_cleanup(void);
+static void prefetch_cleanup_trailing_unused(void);
 static inline void prefetch_set_unused(uint64 ring_index);
 static XLogRecPtr neon_get_request_lsn(bool *latest, RelFileNode rnode,
 									   ForkNumber forknum, BlockNumber blkno);
 
+static bool
+compact_prefetch_buffers(void)
+{
+	uint64		empty_ring_index = MyPState->ring_last;
+	uint64		search_ring_index = MyPState->ring_receive;
+	int			n_moved = 0;
+
+	if (MyPState->ring_receive == MyPState->ring_last)
+		return false;
+
+	while (search_ring_index > MyPState->ring_last)
+	{
+		search_ring_index--;
+		if (GetPrfSlot(search_ring_index)->status == PRFS_UNUSED)
+		{
+			empty_ring_index = search_ring_index;
+			break;
+		}
+	}
+
+	/*
+	 * Here we have established:
+	 *   slots < search_ring_index may be unused (not scanned)
+	 *   slots >= search_ring_index and <= empty_ring_index are unused
+	 *   slots > empty_ring_index are in use, or outside our buffer's range.
+	 *
+	 * Therefore, there is a gap of at least one unused item between
+	 * search_ring_index and empty_ring_index, which grows as we hit
+	 * more unused items while moving backwards through the array.
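+	 *
+	 * For example (hypothetical indexes): with ring_last = 10 and
+	 * ring_receive = 15, suppose slot 13 is the first UNUSED slot found
+	 * while scanning backwards. Then search_ring_index =
+	 * empty_ring_index = 13, slot 14 is known to be in use, and slots
+	 * 10..12 are not yet scanned. Each RECEIVED slot found below 13 is
+	 * subsequently moved up into the slot at empty_ring_index, and
+	 * empty_ring_index moves down by one, so the unused slots end up
+	 * packed together at the ring_last end of the range.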
+	 */
+
+	while (search_ring_index > MyPState->ring_last)
+	{
+		PrefetchRequest *source_slot;
+		PrefetchRequest *target_slot;
+		bool		found;
+
+		search_ring_index--;
+
+		source_slot = GetPrfSlot(search_ring_index);
+
+		if (source_slot->status == PRFS_UNUSED)
+			continue;
+
+		target_slot = GetPrfSlot(empty_ring_index);
+
+		Assert(source_slot->status == PRFS_RECEIVED);
+		Assert(target_slot->status == PRFS_UNUSED);
+
+		target_slot->buftag = source_slot->buftag;
+		target_slot->status = source_slot->status;
+		target_slot->response = source_slot->response;
+		target_slot->effective_request_lsn = source_slot->effective_request_lsn;
+		target_slot->my_ring_index = empty_ring_index;
+
+		prfh_delete(MyPState->prf_hash, source_slot);
+		prfh_insert(MyPState->prf_hash, target_slot, &found);
+
+		Assert(!found);
+
+		/* Adjust the location of our known-empty slot */
+		empty_ring_index--;
+
+		source_slot->status = PRFS_UNUSED;
+		source_slot->buftag = (BufferTag) {0};
+		source_slot->response = NULL;
+		source_slot->my_ring_index = 0;
+		source_slot->effective_request_lsn = 0;
+
+		n_moved++;
+	}
+
+	if (MyPState->ring_last != empty_ring_index)
+	{
+		MyPState->ring_last = empty_ring_index;
+		return true;
+	}
+
+	return false;
+}
+
 void
 readahead_buffer_resize(int newsize, void *extra)
 {
@@ -323,7 +413,7 @@ readahead_buffer_resize(int newsize, void *extra)
 
 		prfh_insert(newPState->prf_hash, newslot, &found);
 		Assert(!found);
-		
+
 		switch (newslot->status)
 		{
 			case PRFS_UNUSED:
@@ -370,7 +460,7 @@ consume_prefetch_responses(void)
 }
 
 static void
-prefetch_cleanup(void)
+prefetch_cleanup_trailing_unused(void)
 {
 	uint64		ring_index;
 	PrefetchRequest *slot;
@@ -531,7 +621,10 @@ prefetch_set_unused(uint64 ring_index)
 
 	/* run cleanup if we're holding back ring_last */
 	if (MyPState->ring_last == ring_index)
-		prefetch_cleanup();
+		prefetch_cleanup_trailing_unused();
+	/* ... and try to store the buffered responses more compactly if > 12.5% of the buffer is gaps */
+	else if (ReceiveBufferNeedsCompaction())
+		compact_prefetch_buffers();
 }
 
 static void
@@ -702,20 +795,31 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls
 
 	Assert(slot->status != PRFS_UNUSED);
 
-	/* We have the slot for ring_last, so that must still be in progress */
-	switch (slot->status)
+	/*
+	 * If there is good reason to run compaction on the prefetch buffers,
+	 * try to do that.
+	 */
+	if (ReceiveBufferNeedsCompaction() && compact_prefetch_buffers())
 	{
-		case PRFS_REQUESTED:
-			Assert(MyPState->ring_receive == cleanup_index);
-			prefetch_wait_for(cleanup_index);
-			prefetch_set_unused(cleanup_index);
-			break;
-		case PRFS_RECEIVED:
-		case PRFS_TAG_REMAINS:
-			prefetch_set_unused(cleanup_index);
-			break;
-		default:
-			pg_unreachable();
+		Assert(slot->status == PRFS_UNUSED);
+	}
+	else
+	{
+		/* We have the slot for ring_last, so that must still be in progress */
+		switch (slot->status)
+		{
+			case PRFS_REQUESTED:
+				Assert(MyPState->ring_receive == cleanup_index);
+				prefetch_wait_for(cleanup_index);
+				prefetch_set_unused(cleanup_index);
+				break;
+			case PRFS_RECEIVED:
+			case PRFS_TAG_REMAINS:
+				prefetch_set_unused(cleanup_index);
+				break;
+			default:
+				pg_unreachable();
+		}
 	}
 }
 
@@ -1816,7 +1920,7 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
 
 	/* buffer was used, clean up for later reuse */
 	prefetch_set_unused(ring_index);
-	prefetch_cleanup();
+	prefetch_cleanup_trailing_unused();
 }
 
 /*
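
Note on the compaction threshold: ReceiveBufferNeedsCompaction() fires once
the unused gaps between ring_last and ring_receive outnumber one eighth
(12.5%) of the responses still buffered. The following is a minimal
standalone C sketch of that arithmetic, not part of the patch;
needs_compaction() and the example numbers are hypothetical.

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

static bool
needs_compaction(uint64_t ring_last, uint64_t ring_receive,
				 int n_responses_buffered)
{
	/*
	 * Slots between ring_last and ring_receive that no longer hold a
	 * response are "gaps" left behind when buffers are set unused out
	 * of order.
	 */
	uint64_t	n_gaps = ring_receive - ring_last - (uint64_t) n_responses_buffered;

	/* Compact once the gaps exceed 1/8th of the buffered responses. */
	return (uint64_t) (n_responses_buffered / 8) < n_gaps;
}

int
main(void)
{
	/* 128-slot window, 112 buffered: 16 gaps > 112/8 = 14 -> compact */
	printf("%d\n", needs_compaction(0, 128, 112));
	/* 128-slot window, 120 buffered: 8 gaps < 120/8 = 15 -> leave as is */
	printf("%d\n", needs_compaction(0, 128, 120));
	return 0;
}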