mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-08 14:02:55 +00:00
Update prefetch mechanism: (#2687)
Prefetch requests and responses are stored in a ringbuffer instead of a queue, which means we can utilize prefetches of many relations concurrently -- page reads of un-prefetched relations now don't imply dropping prefetches. In a future iteration, this may detect sequential scans based on the read behavior of sequential scans, and will dynamically prefetch buffers for such relations as needed. Right now, it still depends on explicit prefetch requests from PostgreSQL. The main improvement here is that we now have a buffer for prefetched pages of 128 entries with random access. Before, we had a similarly sized cache, but this cache did not allow for random access, which resulted in dropped entries when multiple systems used the prefetching subsystem concurrently. See also: #2544
This commit is contained in:
@@ -42,6 +42,11 @@ PGconn *pageserver_conn = NULL;
|
||||
|
||||
char *page_server_connstring_raw;
|
||||
|
||||
int n_unflushed_requests = 0;
|
||||
int flush_every_n_requests = 8;
|
||||
|
||||
static void pageserver_flush(void);
|
||||
|
||||
static void
|
||||
pageserver_connect()
|
||||
{
|
||||
@@ -164,6 +169,8 @@ pageserver_disconnect(void)
|
||||
PQfinish(pageserver_conn);
|
||||
pageserver_conn = NULL;
|
||||
connected = false;
|
||||
|
||||
prefetch_on_ps_disconnect();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -174,11 +181,7 @@ pageserver_send(NeonRequest * request)
|
||||
|
||||
/* If the connection was lost for some reason, reconnect */
|
||||
if (connected && PQstatus(pageserver_conn) == CONNECTION_BAD)
|
||||
{
|
||||
PQfinish(pageserver_conn);
|
||||
pageserver_conn = NULL;
|
||||
connected = false;
|
||||
}
|
||||
pageserver_disconnect();
|
||||
|
||||
if (!connected)
|
||||
pageserver_connect();
|
||||
@@ -202,6 +205,11 @@ pageserver_send(NeonRequest * request)
|
||||
}
|
||||
pfree(req_buff.data);
|
||||
|
||||
n_unflushed_requests++;
|
||||
|
||||
if (flush_every_n_requests > 0 && n_unflushed_requests >= flush_every_n_requests)
|
||||
pageserver_flush();
|
||||
|
||||
if (message_level_is_interesting(PageStoreTrace))
|
||||
{
|
||||
char *msg = nm_to_string((NeonMessage *) request);
|
||||
@@ -255,25 +263,21 @@ pageserver_receive(void)
|
||||
static void
|
||||
pageserver_flush(void)
|
||||
{
|
||||
if (PQflush(pageserver_conn))
|
||||
if (!connected)
|
||||
{
|
||||
neon_log(WARNING, "Tried to flush while disconnected");
|
||||
}
|
||||
else if (PQflush(pageserver_conn))
|
||||
{
|
||||
char *msg = PQerrorMessage(pageserver_conn);
|
||||
|
||||
pageserver_disconnect();
|
||||
neon_log(ERROR, "failed to flush page requests: %s", msg);
|
||||
}
|
||||
}
|
||||
|
||||
static NeonResponse *
|
||||
pageserver_call(NeonRequest * request)
|
||||
{
|
||||
pageserver_send(request);
|
||||
pageserver_flush();
|
||||
return pageserver_receive();
|
||||
n_unflushed_requests = 0;
|
||||
}
|
||||
|
||||
page_server_api api = {
|
||||
.request = pageserver_call,
|
||||
.send = pageserver_send,
|
||||
.flush = pageserver_flush,
|
||||
.receive = pageserver_receive
|
||||
@@ -427,6 +431,14 @@ pg_init_libpagestore(void)
|
||||
PGC_SIGHUP,
|
||||
GUC_UNIT_MB,
|
||||
NULL, NULL, NULL);
|
||||
DefineCustomIntVariable("neon.flush_output_after",
|
||||
"Flush the output buffer after every N unflushed requests",
|
||||
NULL,
|
||||
&flush_every_n_requests,
|
||||
8, -1, INT_MAX,
|
||||
PGC_SIGHUP,
|
||||
0, /* no flags required */
|
||||
NULL, NULL, NULL);
|
||||
|
||||
relsize_hash_init();
|
||||
|
||||
|
||||
@@ -115,6 +115,8 @@ typedef struct
|
||||
char page[FLEXIBLE_ARRAY_MEMBER];
|
||||
} NeonGetPageResponse;
|
||||
|
||||
#define PS_GETPAGERESPONSE_SIZE (MAXALIGN(offsetof(NeonGetPageResponse, page) + BLCKSZ))
|
||||
|
||||
typedef struct
|
||||
{
|
||||
NeonMessageTag tag;
|
||||
@@ -138,15 +140,18 @@ extern char *nm_to_string(NeonMessage * msg);
|
||||
|
||||
typedef struct
|
||||
{
|
||||
NeonResponse *(*request) (NeonRequest * request);
|
||||
void (*send) (NeonRequest * request);
|
||||
NeonResponse *(*receive) (void);
|
||||
void (*flush) (void);
|
||||
} page_server_api;
|
||||
|
||||
extern void prefetch_on_ps_disconnect(void);
|
||||
|
||||
extern page_server_api * page_server;
|
||||
|
||||
extern char *page_server_connstring;
|
||||
extern bool seqscan_prefetch_enabled;
|
||||
extern int seqscan_prefetch_distance;
|
||||
extern char *neon_timeline;
|
||||
extern char *neon_tenant;
|
||||
extern bool wal_redo;
|
||||
@@ -167,7 +172,6 @@ extern void neon_extend(SMgrRelation reln, ForkNumber forknum,
|
||||
BlockNumber blocknum, char *buffer, bool skipFsync);
|
||||
extern bool neon_prefetch(SMgrRelation reln, ForkNumber forknum,
|
||||
BlockNumber blocknum);
|
||||
extern void neon_reset_prefetch(SMgrRelation reln);
|
||||
extern void neon_read(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
char *buffer);
|
||||
|
||||
|
||||
@@ -49,22 +49,20 @@
|
||||
#include "access/xlog.h"
|
||||
#include "access/xloginsert.h"
|
||||
#include "access/xlog_internal.h"
|
||||
#include "catalog/pg_class.h"
|
||||
#include "pagestore_client.h"
|
||||
#include "pagestore_client.h"
|
||||
#include "storage/smgr.h"
|
||||
#include "access/xlogdefs.h"
|
||||
#include "catalog/pg_class.h"
|
||||
#include "common/hashfn.h"
|
||||
#include "pagestore_client.h"
|
||||
#include "postmaster/interrupt.h"
|
||||
#include "postmaster/autovacuum.h"
|
||||
#include "replication/walsender.h"
|
||||
#include "storage/bufmgr.h"
|
||||
#include "storage/relfilenode.h"
|
||||
#include "storage/buf_internals.h"
|
||||
#include "storage/smgr.h"
|
||||
#include "storage/md.h"
|
||||
#include "fmgr.h"
|
||||
#include "miscadmin.h"
|
||||
#include "pgstat.h"
|
||||
#include "catalog/pg_tablespace_d.h"
|
||||
#include "postmaster/autovacuum.h"
|
||||
|
||||
|
||||
#if PG_VERSION_NUM >= 150000
|
||||
#include "access/xlogutils.h"
|
||||
@@ -113,48 +111,482 @@ typedef enum
|
||||
static SMgrRelation unlogged_build_rel = NULL;
|
||||
static UnloggedBuildPhase unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
|
||||
|
||||
|
||||
/*
|
||||
* Prefetch implementation:
|
||||
*
|
||||
* Prefetch is performed locally by each backend.
|
||||
* There can be up to MAX_PREFETCH_REQUESTS registered using smgr_prefetch
|
||||
* before smgr_read. All this requests are appended to primary smgr_read request.
|
||||
* It is assumed that pages will be requested in prefetch order.
|
||||
* Reading of prefetch responses is delayed until them are actually needed (smgr_read).
|
||||
* It make it possible to parallelize processing and receiving of prefetched pages.
|
||||
* In case of prefetch miss or any other SMGR request other than smgr_read,
|
||||
* all prefetch responses has to be consumed.
|
||||
*
|
||||
* There can be up to READ_BUFFER_SIZE active IO requests registered at any
|
||||
* time. Requests using smgr_prefetch are sent to the pageserver, but we don't
|
||||
* wait on the response. Requests using smgr_read are either read from the
|
||||
* buffer, or (if that's not possible) we wait on the response to arrive -
|
||||
* this also will allow us to receive other prefetched pages.
|
||||
* Each request is immediately written to the output buffer of the pageserver
|
||||
* connection, but may not be flushed if smgr_prefetch is used: pageserver
|
||||
* flushes sent requests on manual flush, or every neon.flush_output_after
|
||||
* unflushed requests; which is not necessarily always and all the time.
|
||||
*
|
||||
* Once we have received a response, this value will be stored in the response
|
||||
* buffer, indexed in a hash table. This allows us to retain our buffered
|
||||
* prefetch responses even when we have cache misses.
|
||||
*
|
||||
* Reading of prefetch responses is delayed until them are actually needed
|
||||
* (smgr_read). In case of prefetch miss or any other SMGR request other than
|
||||
* smgr_read, all prefetch responses in the pipeline will need to be read from
|
||||
* the connection; the responses are stored for later use.
|
||||
*
|
||||
* NOTE: The current implementation of the prefetch system implements a ring
|
||||
* buffer of up to READ_BUFFER_SIZE requests. If there are more _read and
|
||||
* _prefetch requests between the initial _prefetch and the _read of a buffer,
|
||||
* the prefetch request will have been dropped from this prefetch buffer, and
|
||||
* your prefetch was wasted.
|
||||
*/
|
||||
|
||||
#define MAX_PREFETCH_REQUESTS 128
|
||||
/* Max amount of tracked buffer reads */
|
||||
#define READ_BUFFER_SIZE 128
|
||||
|
||||
BufferTag prefetch_requests[MAX_PREFETCH_REQUESTS];
|
||||
BufferTag prefetch_responses[MAX_PREFETCH_REQUESTS];
|
||||
int n_prefetch_requests;
|
||||
int n_prefetch_responses;
|
||||
int n_prefetched_buffers;
|
||||
int n_prefetch_hits;
|
||||
int n_prefetch_misses;
|
||||
XLogRecPtr prefetch_lsn;
|
||||
typedef enum PrefetchStatus {
|
||||
PRFS_UNUSED = 0, /* unused slot */
|
||||
PRFS_REQUESTED, /* request was written to the sendbuffer to PS, but not
|
||||
* necessarily flushed.
|
||||
* all fields except response valid */
|
||||
PRFS_RECEIVED, /* all fields valid */
|
||||
PRFS_TAG_REMAINS, /* only buftag and my_ring_index are still valid */
|
||||
} PrefetchStatus;
|
||||
|
||||
typedef struct PrefetchRequest {
|
||||
BufferTag buftag; /* must be first entry in the struct */
|
||||
XLogRecPtr effective_request_lsn;
|
||||
NeonResponse *response; /* may be null */
|
||||
PrefetchStatus status;
|
||||
uint64 my_ring_index;
|
||||
} PrefetchRequest;
|
||||
|
||||
/* prefetch buffer lookup hash table */
|
||||
|
||||
typedef struct PrfHashEntry {
|
||||
PrefetchRequest *slot;
|
||||
uint32 status;
|
||||
uint32 hash;
|
||||
} PrfHashEntry;
|
||||
|
||||
#define SH_PREFIX prfh
|
||||
#define SH_ELEMENT_TYPE PrfHashEntry
|
||||
#define SH_KEY_TYPE PrefetchRequest *
|
||||
#define SH_KEY slot
|
||||
#define SH_STORE_HASH
|
||||
#define SH_GET_HASH(tb, a) ((a)->hash)
|
||||
#define SH_HASH_KEY(tb, key) hash_bytes( \
|
||||
((const unsigned char *) &(key)->buftag), \
|
||||
sizeof(BufferTag) \
|
||||
)
|
||||
|
||||
#define SH_EQUAL(tb, a, b) (BUFFERTAGS_EQUAL((a)->buftag, (b)->buftag))
|
||||
#define SH_SCOPE static inline
|
||||
#define SH_DEFINE
|
||||
#define SH_DECLARE
|
||||
#include "lib/simplehash.h"
|
||||
|
||||
/*
|
||||
* PrefetchState maintains the state of (prefetch) getPage@LSN requests.
|
||||
* It maintains a (ring) buffer of in-flight requests and responses.
|
||||
*
|
||||
* We maintain several indexes into the ring buffer:
|
||||
* ring_unused >= ring_receive >= ring_last >= 0
|
||||
*
|
||||
* ring_unused points to the first unused slot of the buffer
|
||||
* ring_receive is the next request that is to be received
|
||||
* ring_last is the oldest received entry in the buffer
|
||||
*
|
||||
* Apart from being an entry in the ring buffer of prefetch requests, each
|
||||
* PrefetchRequest that is not UNUSED is indexed in prf_hash by buftag.
|
||||
*/
|
||||
typedef struct PrefetchState {
|
||||
MemoryContext bufctx; /* context for prf_buffer[].response allocations */
|
||||
MemoryContext errctx; /* context for prf_buffer[].response allocations */
|
||||
MemoryContext hashctx; /* context for prf_buffer */
|
||||
|
||||
/* buffer indexes */
|
||||
uint64 ring_unused; /* first unused slot */
|
||||
uint64 ring_receive; /* next slot that is to receive a response */
|
||||
uint64 ring_last; /* min slot with a response value */
|
||||
|
||||
/* metrics / statistics */
|
||||
int n_responses_buffered; /* count of PS responses not yet in buffers */
|
||||
int n_requests_inflight; /* count of PS requests considered in flight */
|
||||
int n_unused; /* count of buffers < unused, > last, that are also unused */
|
||||
|
||||
/* the buffers */
|
||||
prfh_hash *prf_hash;
|
||||
PrefetchRequest prf_buffer[READ_BUFFER_SIZE]; /* prefetch buffers */
|
||||
} PrefetchState;
|
||||
|
||||
PrefetchState *MyPState;
|
||||
|
||||
int n_prefetch_hits = 0;
|
||||
int n_prefetch_misses = 0;
|
||||
int n_prefetch_missed_caches = 0;
|
||||
int n_prefetch_dupes = 0;
|
||||
|
||||
XLogRecPtr prefetch_lsn = 0;
|
||||
|
||||
static void consume_prefetch_responses(void);
|
||||
static uint64 prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_lsn);
|
||||
static void prefetch_read(PrefetchRequest *slot);
|
||||
static void prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force_lsn);
|
||||
static void prefetch_wait_for(uint64 ring_index);
|
||||
static void prefetch_cleanup(void);
|
||||
static inline void prefetch_set_unused(uint64 ring_index, bool hash_cleanup);
|
||||
|
||||
static XLogRecPtr neon_get_request_lsn(bool *latest, RelFileNode rnode,
|
||||
ForkNumber forknum, BlockNumber blkno);
|
||||
|
||||
|
||||
/*
|
||||
* Make sure that there are no responses still in the buffer.
|
||||
*/
|
||||
static void
|
||||
consume_prefetch_responses(void)
|
||||
{
|
||||
for (int i = n_prefetched_buffers; i < n_prefetch_responses; i++)
|
||||
{
|
||||
NeonResponse *resp = page_server->receive();
|
||||
if (MyPState->ring_receive < MyPState->ring_unused)
|
||||
prefetch_wait_for(MyPState->ring_unused - 1);
|
||||
}
|
||||
|
||||
pfree(resp);
|
||||
static void
|
||||
prefetch_cleanup(void)
|
||||
{
|
||||
int index;
|
||||
uint64 ring_index;
|
||||
PrefetchRequest *slot;
|
||||
|
||||
while (MyPState->ring_last < MyPState->ring_receive) {
|
||||
ring_index = MyPState->ring_last;
|
||||
index = (ring_index % READ_BUFFER_SIZE);
|
||||
slot = &MyPState->prf_buffer[index];
|
||||
|
||||
if (slot->status == PRFS_UNUSED)
|
||||
MyPState->ring_last += 1;
|
||||
else
|
||||
break;
|
||||
}
|
||||
n_prefetched_buffers = 0;
|
||||
n_prefetch_responses = 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* Wait for slot of ring_index to have received its response.
|
||||
* The caller is responsible for making sure the request buffer is flushed.
|
||||
*/
|
||||
static void
|
||||
prefetch_wait_for(uint64 ring_index)
|
||||
{
|
||||
int index;
|
||||
PrefetchRequest *entry;
|
||||
|
||||
Assert(MyPState->ring_unused > ring_index);
|
||||
|
||||
while (MyPState->ring_receive <= ring_index)
|
||||
{
|
||||
index = (MyPState->ring_receive % READ_BUFFER_SIZE);
|
||||
entry = &MyPState->prf_buffer[index];
|
||||
|
||||
Assert(entry->status == PRFS_REQUESTED);
|
||||
prefetch_read(entry);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Read the response of a prefetch request into its slot.
|
||||
*
|
||||
* The caller is responsible for making sure that the request for this buffer
|
||||
* was flushed to the PageServer.
|
||||
*/
|
||||
static void
|
||||
prefetch_read(PrefetchRequest *slot)
|
||||
{
|
||||
NeonResponse *response;
|
||||
MemoryContext old;
|
||||
|
||||
Assert(slot->status == PRFS_REQUESTED);
|
||||
Assert(slot->response == NULL);
|
||||
Assert(slot->my_ring_index == MyPState->ring_receive);
|
||||
|
||||
old = MemoryContextSwitchTo(MyPState->errctx);
|
||||
response = (NeonResponse *) page_server->receive();
|
||||
MemoryContextSwitchTo(old);
|
||||
|
||||
/* update prefetch state */
|
||||
MyPState->n_responses_buffered += 1;
|
||||
MyPState->n_requests_inflight -= 1;
|
||||
MyPState->ring_receive += 1;
|
||||
|
||||
/* update slot state */
|
||||
slot->status = PRFS_RECEIVED;
|
||||
slot->response = response;
|
||||
}
|
||||
|
||||
/*
|
||||
* Disconnect hook - drop prefetches when the connection drops
|
||||
*
|
||||
* If we don't remove the failed prefetches, we'd be serving incorrect
|
||||
* data to the smgr.
|
||||
*/
|
||||
void
|
||||
prefetch_on_ps_disconnect(void)
|
||||
{
|
||||
for (; MyPState->ring_receive < MyPState->ring_unused; MyPState->ring_receive++)
|
||||
{
|
||||
PrefetchRequest *slot;
|
||||
int index = MyPState->ring_receive % READ_BUFFER_SIZE;
|
||||
|
||||
slot = &MyPState->prf_buffer[index];
|
||||
Assert(slot->status == PRFS_REQUESTED);
|
||||
Assert(slot->my_ring_index == MyPState->ring_receive);
|
||||
|
||||
/* clean up the request */
|
||||
slot->status = PRFS_TAG_REMAINS;
|
||||
MyPState->n_requests_inflight--;
|
||||
prefetch_set_unused(MyPState->ring_receive, true);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* prefetch_set_unused() - clear a received prefetch slot
|
||||
*
|
||||
* The slot at ring_index must be a current member of the ring buffer,
|
||||
* and may not be in the PRFS_REQUESTED state.
|
||||
*/
|
||||
static inline void
|
||||
prefetch_set_unused(uint64 ring_index, bool hash_cleanup)
|
||||
{
|
||||
PrefetchRequest *slot = &MyPState->prf_buffer[ring_index % READ_BUFFER_SIZE];
|
||||
|
||||
Assert(MyPState->ring_last <= ring_index &&
|
||||
MyPState->ring_unused > ring_index);
|
||||
|
||||
if (slot->status == PRFS_UNUSED)
|
||||
return;
|
||||
|
||||
Assert(slot->status == PRFS_RECEIVED || slot->status == PRFS_TAG_REMAINS);
|
||||
Assert(ring_index >= MyPState->ring_last &&
|
||||
ring_index < MyPState->ring_unused);
|
||||
|
||||
if (slot->status == PRFS_RECEIVED)
|
||||
{
|
||||
pfree(slot->response);
|
||||
slot->response = NULL;
|
||||
|
||||
MyPState->n_responses_buffered -= 1;
|
||||
MyPState->n_unused += 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
Assert(slot->response == NULL);
|
||||
}
|
||||
|
||||
if (hash_cleanup)
|
||||
prfh_delete(MyPState->prf_hash, slot);
|
||||
|
||||
/* clear all fields */
|
||||
MemSet(slot, 0, sizeof(PrefetchRequest));
|
||||
slot->status = PRFS_UNUSED;
|
||||
|
||||
/* run cleanup if we're holding back ring_last */
|
||||
if (MyPState->ring_last == ring_index)
|
||||
prefetch_cleanup();
|
||||
}
|
||||
|
||||
static void
|
||||
prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force_lsn)
|
||||
{
|
||||
NeonGetPageRequest request = {
|
||||
.req.tag = T_NeonGetPageRequest,
|
||||
.req.latest = false,
|
||||
.req.lsn = 0,
|
||||
.rnode = slot->buftag.rnode,
|
||||
.forknum = slot->buftag.forkNum,
|
||||
.blkno = slot->buftag.blockNum,
|
||||
};
|
||||
|
||||
if (force_lsn && force_latest)
|
||||
{
|
||||
request.req.lsn = *force_lsn;
|
||||
request.req.latest = *force_latest;
|
||||
slot->effective_request_lsn = *force_lsn;
|
||||
}
|
||||
else
|
||||
{
|
||||
XLogRecPtr lsn = neon_get_request_lsn(
|
||||
&request.req.latest,
|
||||
slot->buftag.rnode,
|
||||
slot->buftag.forkNum,
|
||||
slot->buftag.blockNum
|
||||
);
|
||||
/*
|
||||
* Note: effective_request_lsn is potentially higher than the requested
|
||||
* LSN, but still correct:
|
||||
*
|
||||
* We know there are no changes between the actual requested LSN and
|
||||
* the value of effective_request_lsn: If there were, the page would
|
||||
* have been in cache and evicted between those LSN values, which
|
||||
* then would have had to result in a larger request LSN for this page.
|
||||
*
|
||||
* It is possible that a concurrent backend loads the page, modifies
|
||||
* it and then evicts it again, but the LSN of that eviction cannot be
|
||||
* smaller than the current WAL insert/redo pointer, which is already
|
||||
* larger than this prefetch_lsn. So in any case, that would
|
||||
* invalidate this cache.
|
||||
*
|
||||
* The best LSN to use for effective_request_lsn would be
|
||||
* XLogCtl->Insert.RedoRecPtr, but that's expensive to access.
|
||||
*/
|
||||
request.req.lsn = lsn;
|
||||
prefetch_lsn = Max(prefetch_lsn, lsn);
|
||||
slot->effective_request_lsn = prefetch_lsn;
|
||||
}
|
||||
|
||||
Assert(slot->response == NULL);
|
||||
Assert(slot->my_ring_index == MyPState->ring_unused);
|
||||
page_server->send((NeonRequest *) &request);
|
||||
|
||||
/* update prefetch state */
|
||||
MyPState->n_requests_inflight += 1;
|
||||
MyPState->n_unused -= 1;
|
||||
MyPState->ring_unused += 1;
|
||||
|
||||
/* update slot state */
|
||||
slot->status = PRFS_REQUESTED;
|
||||
}
|
||||
|
||||
/*
|
||||
* prefetch_register_buffer() - register and prefetch buffer
|
||||
*
|
||||
* Register that we may want the contents of BufferTag in the near future.
|
||||
*
|
||||
* If force_latest and force_lsn are not NULL, those values are sent to the
|
||||
* pageserver. If they are NULL, we utilize the lastWrittenLsn -infrastructure
|
||||
* to fill in these values manually.
|
||||
*/
|
||||
|
||||
static uint64
|
||||
prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_lsn)
|
||||
{
|
||||
int index;
|
||||
bool found;
|
||||
uint64 ring_index;
|
||||
PrefetchRequest req;
|
||||
PrefetchRequest *slot;
|
||||
PrfHashEntry *entry;
|
||||
|
||||
/* use an intermediate PrefetchRequest struct to ensure correct alignment */
|
||||
req.buftag = tag;
|
||||
|
||||
entry = prfh_lookup(MyPState->prf_hash, (PrefetchRequest *) &req);
|
||||
|
||||
if (entry != NULL)
|
||||
{
|
||||
slot = entry->slot;
|
||||
ring_index = slot->my_ring_index;
|
||||
index = (ring_index % READ_BUFFER_SIZE);
|
||||
Assert(slot == &MyPState->prf_buffer[index]);
|
||||
|
||||
Assert(slot->status != PRFS_UNUSED);
|
||||
Assert(BUFFERTAGS_EQUAL(slot->buftag, tag));
|
||||
|
||||
/*
|
||||
* If we want a specific lsn, we do not accept requests that were made
|
||||
* with a potentially different LSN.
|
||||
*/
|
||||
if (force_lsn && slot->effective_request_lsn != *force_lsn)
|
||||
{
|
||||
prefetch_wait_for(ring_index);
|
||||
prefetch_set_unused(ring_index, true);
|
||||
}
|
||||
/*
|
||||
* We received a prefetch for a page that was recently read and
|
||||
* removed from the buffers. Remove that request from the buffers.
|
||||
*/
|
||||
else if (slot->status == PRFS_TAG_REMAINS)
|
||||
{
|
||||
prefetch_set_unused(ring_index, true);
|
||||
}
|
||||
else
|
||||
{
|
||||
/* The buffered request is good enough, return that index */
|
||||
n_prefetch_dupes++;
|
||||
return ring_index;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* If the prefetch queue is full, we need to make room by clearing the
|
||||
* oldest slot. If the oldest slot holds a buffer that was already
|
||||
* received, we can just throw it away; we fetched the page unnecessarily
|
||||
* in that case. If the oldest slot holds a request that we haven't
|
||||
* received a response for yet, we have to wait for the response to that
|
||||
* before we can continue. We might not have even flushed the request to
|
||||
* the pageserver yet, it might be just sitting in the output buffer. In
|
||||
* that case, we flush it and wait for the response. (We could decide not
|
||||
* to send it, but it's hard to abort when the request is already in the
|
||||
* output buffer, and 'not sending' a prefetch request kind of goes
|
||||
* against the principles of prefetching)
|
||||
*/
|
||||
if (MyPState->ring_last + READ_BUFFER_SIZE - 1 == MyPState->ring_unused)
|
||||
{
|
||||
slot = &MyPState->prf_buffer[(MyPState->ring_last % READ_BUFFER_SIZE)];
|
||||
|
||||
Assert(slot->status != PRFS_UNUSED);
|
||||
|
||||
/* We have the slot for ring_last, so that must still be in progress */
|
||||
switch (slot->status)
|
||||
{
|
||||
case PRFS_REQUESTED:
|
||||
Assert(MyPState->ring_receive == MyPState->ring_last);
|
||||
prefetch_wait_for(MyPState->ring_last);
|
||||
prefetch_set_unused(MyPState->ring_last, true);
|
||||
break;
|
||||
case PRFS_RECEIVED:
|
||||
case PRFS_TAG_REMAINS:
|
||||
prefetch_set_unused(MyPState->ring_last, true);
|
||||
break;
|
||||
default:
|
||||
pg_unreachable();
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* The next buffer pointed to by `ring_unused` is now unused, so we can insert
|
||||
* the new request to it.
|
||||
*/
|
||||
ring_index = MyPState->ring_unused;
|
||||
index = (ring_index % READ_BUFFER_SIZE);
|
||||
slot = &MyPState->prf_buffer[index];
|
||||
|
||||
Assert(MyPState->ring_last <= ring_index);
|
||||
|
||||
Assert(slot->status == PRFS_UNUSED);
|
||||
|
||||
/*
|
||||
* We must update the slot data before insertion, because the hash
|
||||
* function reads the buffer tag from the slot.
|
||||
*/
|
||||
slot->buftag = tag;
|
||||
slot->my_ring_index = ring_index;
|
||||
|
||||
prfh_insert(MyPState->prf_hash, slot, &found);
|
||||
Assert(!found);
|
||||
|
||||
prefetch_do_request(slot, force_latest, force_lsn);
|
||||
Assert(slot->status == PRFS_REQUESTED);
|
||||
Assert(ring_index < MyPState->ring_unused);
|
||||
return ring_index;
|
||||
}
|
||||
|
||||
static NeonResponse *
|
||||
page_server_request(void const *req)
|
||||
{
|
||||
page_server->send((NeonRequest *) req);
|
||||
page_server->flush();
|
||||
consume_prefetch_responses();
|
||||
return page_server->request((NeonRequest *) req);
|
||||
return page_server->receive();
|
||||
}
|
||||
|
||||
|
||||
@@ -268,12 +700,15 @@ nm_unpack_response(StringInfo s)
|
||||
|
||||
case T_NeonGetPageResponse:
|
||||
{
|
||||
NeonGetPageResponse *msg_resp = palloc0(offsetof(NeonGetPageResponse, page) + BLCKSZ);
|
||||
NeonGetPageResponse *msg_resp;
|
||||
|
||||
msg_resp = MemoryContextAllocZero(MyPState->bufctx, PS_GETPAGERESPONSE_SIZE);
|
||||
msg_resp->tag = tag;
|
||||
/* XXX: should be varlena */
|
||||
memcpy(msg_resp->page, pq_getmsgbytes(s, BLCKSZ), BLCKSZ);
|
||||
pq_getmsgend(s);
|
||||
|
||||
Assert(msg_resp->tag == T_NeonGetPageResponse);
|
||||
|
||||
resp = (NeonResponse *) msg_resp;
|
||||
break;
|
||||
@@ -617,7 +1052,32 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, ch
|
||||
void
|
||||
neon_init(void)
|
||||
{
|
||||
/* noop */
|
||||
HASHCTL info;
|
||||
|
||||
if (MyPState != NULL)
|
||||
return;
|
||||
|
||||
MyPState = MemoryContextAllocZero(TopMemoryContext, sizeof(PrefetchState));
|
||||
|
||||
MyPState->n_unused = READ_BUFFER_SIZE;
|
||||
|
||||
MyPState->bufctx = SlabContextCreate(TopMemoryContext,
|
||||
"NeonSMGR/prefetch",
|
||||
SLAB_DEFAULT_BLOCK_SIZE * 17,
|
||||
PS_GETPAGERESPONSE_SIZE);
|
||||
MyPState->errctx = AllocSetContextCreate(TopMemoryContext,
|
||||
"NeonSMGR/errors",
|
||||
ALLOCSET_DEFAULT_SIZES);
|
||||
MyPState->hashctx = AllocSetContextCreate(TopMemoryContext,
|
||||
"NeonSMGR/prefetch",
|
||||
ALLOCSET_DEFAULT_SIZES);
|
||||
|
||||
info.keysize = sizeof(BufferTag);
|
||||
info.entrysize = sizeof(uint64);
|
||||
|
||||
MyPState->prf_hash = prfh_create(MyPState->hashctx,
|
||||
READ_BUFFER_SIZE, NULL);
|
||||
|
||||
#ifdef DEBUG_COMPARE_LOCAL
|
||||
mdinit();
|
||||
#endif
|
||||
@@ -1004,27 +1464,17 @@ neon_close(SMgrRelation reln, ForkNumber forknum)
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* neon_reset_prefetch() -- reoe all previously rgistered prefeth requests
|
||||
*/
|
||||
void
|
||||
neon_reset_prefetch(SMgrRelation reln)
|
||||
{
|
||||
n_prefetch_requests = 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* neon_prefetch() -- Initiate asynchronous read of the specified block of a relation
|
||||
*/
|
||||
bool
|
||||
neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum)
|
||||
{
|
||||
uint64 ring_index;
|
||||
|
||||
switch (reln->smgr_relpersistence)
|
||||
{
|
||||
case 0:
|
||||
/* probably shouldn't happen, but ignore it */
|
||||
break;
|
||||
|
||||
case 0: /* probably shouldn't happen, but ignore it */
|
||||
case RELPERSISTENCE_PERMANENT:
|
||||
break;
|
||||
|
||||
@@ -1036,14 +1486,17 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum)
|
||||
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
|
||||
}
|
||||
|
||||
if (n_prefetch_requests < MAX_PREFETCH_REQUESTS)
|
||||
{
|
||||
prefetch_requests[n_prefetch_requests].rnode = reln->smgr_rnode.node;
|
||||
prefetch_requests[n_prefetch_requests].forkNum = forknum;
|
||||
prefetch_requests[n_prefetch_requests].blockNum = blocknum;
|
||||
n_prefetch_requests += 1;
|
||||
return true;
|
||||
}
|
||||
BufferTag tag = (BufferTag) {
|
||||
.rnode = reln->smgr_rnode.node,
|
||||
.forkNum = forknum,
|
||||
.blockNum = blocknum
|
||||
};
|
||||
|
||||
ring_index = prefetch_register_buffer(tag, NULL, NULL);
|
||||
|
||||
Assert(ring_index < MyPState->ring_unused &&
|
||||
MyPState->ring_last <= ring_index);
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -1094,81 +1547,72 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
|
||||
XLogRecPtr request_lsn, bool request_latest, char *buffer)
|
||||
{
|
||||
NeonResponse *resp;
|
||||
int i;
|
||||
BufferTag buftag;
|
||||
uint64 ring_index;
|
||||
PrfHashEntry *entry;
|
||||
PrefetchRequest *slot;
|
||||
|
||||
buftag = (BufferTag) {
|
||||
.rnode = rnode,
|
||||
.forkNum = forkNum,
|
||||
.blockNum = blkno,
|
||||
};
|
||||
|
||||
/*
|
||||
* Try to find prefetched page. It is assumed that pages will be requested
|
||||
* in the same order as them are prefetched, but some other backend may
|
||||
* load page in shared buffers, so some prefetch responses should be
|
||||
* skipped.
|
||||
* Try to find prefetched page in the list of received pages.
|
||||
*/
|
||||
for (i = n_prefetched_buffers; i < n_prefetch_responses; i++)
|
||||
{
|
||||
resp = page_server->receive();
|
||||
if (resp->tag == T_NeonGetPageResponse &&
|
||||
RelFileNodeEquals(prefetch_responses[i].rnode, rnode) &&
|
||||
prefetch_responses[i].forkNum == forkNum &&
|
||||
prefetch_responses[i].blockNum == blkno)
|
||||
{
|
||||
char *page = ((NeonGetPageResponse *) resp)->page;
|
||||
entry = prfh_lookup(MyPState->prf_hash, (PrefetchRequest *) &buftag);
|
||||
|
||||
if (entry != NULL)
|
||||
{
|
||||
if (entry->slot->effective_request_lsn >= prefetch_lsn)
|
||||
{
|
||||
slot = entry->slot;
|
||||
ring_index = slot->my_ring_index;
|
||||
n_prefetch_hits += 1;
|
||||
}
|
||||
else /* the current prefetch LSN is not large enough, so drop the prefetch */
|
||||
{
|
||||
/*
|
||||
* Check if prefetched page is still relevant. If it is updated by
|
||||
* some other backend, then it should not be requested from smgr
|
||||
* unless it is evicted from shared buffers. In the last case
|
||||
* last_evicted_lsn should be updated and request_lsn should be
|
||||
* greater than prefetch_lsn. Maximum with page LSN is used
|
||||
* because page returned by page server may have LSN either
|
||||
* greater either smaller than requested.
|
||||
* We can't drop cache for not-yet-received requested items. It is
|
||||
* unlikely this happens, but it can happen if prefetch distance is
|
||||
* large enough and a backend didn't consume all prefetch requests.
|
||||
*/
|
||||
if (Max(prefetch_lsn, PageGetLSN(page)) >= request_lsn)
|
||||
if (entry->slot->status == PRFS_REQUESTED)
|
||||
{
|
||||
n_prefetched_buffers = i + 1;
|
||||
n_prefetch_hits += 1;
|
||||
n_prefetch_requests = 0;
|
||||
memcpy(buffer, page, BLCKSZ);
|
||||
pfree(resp);
|
||||
return;
|
||||
page_server->flush();
|
||||
prefetch_wait_for(entry->slot->my_ring_index);
|
||||
}
|
||||
/* drop caches */
|
||||
prefetch_set_unused(entry->slot->my_ring_index, true);
|
||||
n_prefetch_missed_caches += 1;
|
||||
/* make it look like a prefetch cache miss */
|
||||
entry = NULL;
|
||||
}
|
||||
pfree(resp);
|
||||
}
|
||||
n_prefetched_buffers = 0;
|
||||
n_prefetch_responses = 0;
|
||||
n_prefetch_misses += 1;
|
||||
{
|
||||
NeonGetPageRequest request = {
|
||||
.req.tag = T_NeonGetPageRequest,
|
||||
.req.latest = request_latest,
|
||||
.req.lsn = request_lsn,
|
||||
.rnode = rnode,
|
||||
.forknum = forkNum,
|
||||
.blkno = blkno
|
||||
};
|
||||
|
||||
if (n_prefetch_requests > 0)
|
||||
{
|
||||
/* Combine all prefetch requests with primary request */
|
||||
page_server->send((NeonRequest *) & request);
|
||||
for (i = 0; i < n_prefetch_requests; i++)
|
||||
{
|
||||
request.rnode = prefetch_requests[i].rnode;
|
||||
request.forknum = prefetch_requests[i].forkNum;
|
||||
request.blkno = prefetch_requests[i].blockNum;
|
||||
prefetch_responses[i] = prefetch_requests[i];
|
||||
page_server->send((NeonRequest *) & request);
|
||||
}
|
||||
page_server->flush();
|
||||
n_prefetch_responses = n_prefetch_requests;
|
||||
n_prefetch_requests = 0;
|
||||
prefetch_lsn = request_lsn;
|
||||
resp = page_server->receive();
|
||||
}
|
||||
else
|
||||
{
|
||||
resp = page_server->request((NeonRequest *) & request);
|
||||
}
|
||||
if (entry == NULL)
|
||||
{
|
||||
n_prefetch_misses += 1;
|
||||
|
||||
ring_index = prefetch_register_buffer(buftag, &request_latest,
|
||||
&request_lsn);
|
||||
slot = &MyPState->prf_buffer[(ring_index % READ_BUFFER_SIZE)];
|
||||
}
|
||||
|
||||
Assert(MyPState->ring_last <= ring_index &&
|
||||
MyPState->ring_unused > ring_index);
|
||||
Assert(slot->my_ring_index == ring_index);
|
||||
Assert(slot->status != PRFS_UNUSED);
|
||||
Assert(&MyPState->prf_buffer[(ring_index % READ_BUFFER_SIZE)] == slot);
|
||||
|
||||
page_server->flush();
|
||||
prefetch_wait_for(ring_index);
|
||||
|
||||
Assert(slot->status == PRFS_RECEIVED);
|
||||
|
||||
resp = slot->response;
|
||||
|
||||
switch (resp->tag)
|
||||
{
|
||||
case T_NeonGetPageResponse:
|
||||
@@ -1188,12 +1632,13 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
|
||||
errdetail("page server returned error: %s",
|
||||
((NeonErrorResponse *) resp)->message)));
|
||||
break;
|
||||
|
||||
default:
|
||||
elog(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
|
||||
}
|
||||
|
||||
pfree(resp);
|
||||
/* buffer was used, clean up for later reuse */
|
||||
prefetch_set_unused(ring_index, true);
|
||||
prefetch_cleanup();
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -1815,7 +2260,6 @@ static const struct f_smgr neon_smgr =
|
||||
.smgr_unlink = neon_unlink,
|
||||
.smgr_extend = neon_extend,
|
||||
.smgr_prefetch = neon_prefetch,
|
||||
.smgr_reset_prefetch = neon_reset_prefetch,
|
||||
.smgr_read = neon_read,
|
||||
.smgr_write = neon_write,
|
||||
.smgr_writeback = neon_writeback,
|
||||
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: c0284ce58e...e56b812dd8
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: e5cc262697...39e3d745b3
Reference in New Issue
Block a user