/*-------------------------------------------------------------------------
*
* communicator.c
* Functions for communicating with remote pageservers.
*
* This is the so-called "legacy" communicator. It consists of functions that
* are called from the smgr implementation, in pagestore_smgr.c. There are
* plans to replace this with a different implementation, see RFC.
*
* The communicator is a collection of functions that are called in each
* backend, when the backend needs to read a page or other information. It
* does not spawn background threads or anything like that. To process
* responses to prefetch requests in a timely fashion, however, it registers
* a ProcessInterrupts hook that gets called periodically from any
* CHECK_FOR_INTERRUPTS() point in the backend.
*
* By the time the functions in this file are called, the caller has already
* established that a request to the pageserver is necessary. The functions
* are only called for permanent relations (i.e. not temp or unlogged tables).
* Before making a call to the communicator, the caller has already checked
* the relation size or local file cache.
*
* However, when processing responses to getpage requests, the communicator
* writes pages directly to the LFC.
*
* The communicator functions take request LSNs as arguments; the caller is
* responsible for determining the correct LSNs to use. There's one exception
* to that, in prefetch_do_request(); it sometimes calls back to
* neon_get_request_lsns(). That's because sometimes a suitable response is
* found in the prefetch buffer and the request LSNs are not needed, and the
* caller doesn't know in advance whether they will be needed or not.
*
* The main interface consists of the following "synchronous" calls:
*
* communicator_exists - Returns true if a relation file exists
* communicator_nblocks - Returns a relation's size
* communicator_dbsize - Returns a database's total size
* communicator_read_at_lsnv - Read contents of one relation block
* communicator_read_slru_segment - Read contents of one SLRU segment
*
* In addition, there are functions related to prefetching:
* communicator_prefetch_register_bufferv - Start prefetching a page
* communicator_prefetch_lookupv - Check if a page is already in prefetch queue
*
* Misc other functions:
* - communicator_init - Initialize the module at startup
* - communicator_prefetch_pump_state - Called periodically to advance the state
*
*
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*-------------------------------------------------------------------------
*/
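/*
* For orientation, a hedged sketch (illustrative only, not part of the
* build) of how a caller in pagestore_smgr.c might drive the synchronous
* entry points above; the surrounding variables (rinfo, forkNum, blkno) are
* assumed to come from the smgr caller:
*
*     neon_request_lsns lsns;
*
*     // The caller determines the request LSNs...
*     neon_get_request_lsns(rinfo, forkNum, blkno, &lsns, 1);
*     // ...and passes them to the communicator.
*     if (!communicator_exists(rinfo, forkNum, &lsns))
*         ... the relation fork does not exist at these LSNs ...
*/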
#include "postgres.h"
#include "access/xlog.h"
#include "access/xlogdefs.h"
#include "access/xlog_internal.h"
#include "access/xlogutils.h"
#include "common/hashfn.h"
#include "executor/instrument.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "port/pg_iovec.h"
#include "postmaster/interrupt.h"
#include "replication/walsender.h"
#include "storage/ipc.h"
#include "utils/timeout.h"
#include "bitmap.h"
#include "communicator.h"
#include "file_cache.h"
#include "neon.h"
#include "neon_perf_counters.h"
#include "pagestore_client.h"
#if PG_VERSION_NUM >= 150000
#include "access/xlogrecovery.h"
#endif
#define NEON_PANIC_CONNECTION_STATE(shard_no, elvl, message, ...) \
neon_shard_log(shard_no, elvl, "Broken connection state: " message, \
##__VA_ARGS__)
page_server_api *page_server;
/*
* Various settings related to prompt (fast) handling of PageStream responses
* at any CHECK_FOR_INTERRUPTS point.
*/
int readahead_getpage_pull_timeout_ms = 50;
static int PS_TIMEOUT_ID = 0;
static bool timeout_set = false;
static bool timeout_signaled = false;
/*
* We have a CHECK_FOR_INTERRUPTS in page_server->receive(), and we don't want
* that to handle any getpage responses if we're already working on the
* backlog of those, as we'd hit issues with determining which prefetch slot
* we just got a response for.
*
* To protect against that, we have this variable that's set whenever we start
* receiving data for prefetch slots, so that we don't get confused.
*
* Note that in certain error cases during readpage we may leak readpage_reentrant_guard=true,
* which results in a failure to pick up further responses until we first
* actively try to receive new getpage responses.
*/
static bool readpage_reentrant_guard = false;
static void pagestore_timeout_handler(void);
#define START_PREFETCH_RECEIVE_WORK() \
do { \
readpage_reentrant_guard = true; \
} while (false)
#define END_PREFETCH_RECEIVE_WORK() \
do { \
readpage_reentrant_guard = false; \
if (unlikely(timeout_signaled && !InterruptPending)) \
InterruptPending = true; \
} while (false)
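/*
* To make the hazard concrete, a sketch of the re-entrant call chain the
* guard prevents (describing the scenario above, not executed code):
*
*     prefetch_read(slot)                      // guard set by caller
*       -> page_server->receive(shard_no)
*            -> CHECK_FOR_INTERRUPTS()
*                 -> communicator_processinterrupts()
*                      -> communicator_prefetch_pump_state()
*                           -> would consume the response the outer
*                              prefetch_read() is still waiting for
*
* With readpage_reentrant_guard set, the inner pump is skipped; if the
* timeout fired meanwhile, END_PREFETCH_RECEIVE_WORK() re-raises
* InterruptPending so the backlog is picked up promptly afterwards.
*/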
/*
* Prefetch implementation:
*
* Prefetch is performed locally by each backend.
*
* There can be up to readahead_buffer_size active IO requests registered at
* any time. Requests using smgr_prefetch are sent to the pageserver, but we
* don't wait on the response. Requests using smgr_read are either read from
* the buffer, or (if that's not possible) we wait on the response to arrive -
* this also will allow us to receive other prefetched pages.
* Each request is immediately written to the output buffer of the pageserver
* connection, but may not be flushed if smgr_prefetch is used: the connection
* is flushed on an explicit flush, or after every neon.flush_output_after
* unflushed requests, so a request is not necessarily sent right away.
*
* Once we have received a response, this value will be stored in the response
* buffer, indexed in a hash table. This allows us to retain our buffered
* prefetch responses even when we have cache misses.
*
* Reading of prefetch responses is delayed until they are actually needed
* (smgr_read). In case of prefetch miss or any other SMGR request other than
* smgr_read, all prefetch responses in the pipeline will need to be read from
* the connection; the responses are stored for later use.
*
* NOTE: The current implementation of the prefetch system implements a ring
* buffer of up to readahead_buffer_size requests. If there are more _read and
* _prefetch requests between the initial _prefetch and the _read of a buffer,
* the prefetch request will have been dropped from this prefetch buffer, and
* your prefetch was wasted.
*/
/*
* State machine:
*
* not in hash : in hash
* :
* UNUSED ------> REQUESTED --> RECEIVED
* ^ : | |
* | : v |
* | : TAG_REMAINS |
* | : | |
* +----------------+------------+
* :
*/
typedef enum PrefetchStatus
{
PRFS_UNUSED = 0, /* unused slot */
PRFS_REQUESTED, /* request was written to the send buffer to the
* PS, but not necessarily flushed; all fields
* except response are valid */
PRFS_RECEIVED, /* all fields valid */
PRFS_TAG_REMAINS, /* only buftag and my_ring_index are still
* valid */
} PrefetchStatus;
/* must fit in uint8; bits 0x1 are used */
typedef enum {
PRFSF_NONE = 0x0,
PRFSF_LFC = 0x1 /* received prefetch result is stored in LFC */
} PrefetchRequestFlags;
typedef struct PrefetchRequest
{
BufferTag buftag; /* must be first entry in the struct */
shardno_t shard_no;
uint8 status; /* see PrefetchStatus for valid values */
uint8 flags; /* see PrefetchRequestFlags */
neon_request_lsns request_lsns;
NeonRequestId reqid;
NeonResponse *response; /* may be null */
uint64 my_ring_index;
} PrefetchRequest;
/* prefetch buffer lookup hash table */
typedef struct PrfHashEntry
{
PrefetchRequest *slot;
uint32 status;
uint32 hash;
} PrfHashEntry;
#define SH_PREFIX prfh
#define SH_ELEMENT_TYPE PrfHashEntry
#define SH_KEY_TYPE PrefetchRequest *
#define SH_KEY slot
#define SH_STORE_HASH
#define SH_GET_HASH(tb, a) ((a)->hash)
#define SH_HASH_KEY(tb, key) hash_bytes( \
((const unsigned char *) &(key)->buftag), \
sizeof(BufferTag) \
)
#define SH_EQUAL(tb, a, b) (BufferTagsEqual(&(a)->buftag, &(b)->buftag))
#define SH_SCOPE static inline
#define SH_DEFINE
#define SH_DECLARE
#include "lib/simplehash.h"
/*
* PrefetchState maintains the state of (prefetch) getPage@LSN requests.
* It maintains a (ring) buffer of in-flight requests and responses.
*
* We maintain several indexes into the ring buffer:
* ring_unused >= ring_flush >= ring_receive >= ring_last >= 0
*
* ring_unused points to the first unused slot of the buffer
* ring_flush is the next request to be flushed to the pageserver
* ring_receive is the next slot that is to receive a response
* ring_last is the oldest entry that may still hold a response
*
* Apart from being an entry in the ring buffer of prefetch requests, each
* PrefetchRequest that is not UNUSED is indexed in prf_hash by buftag.
*/
typedef struct PrefetchState
{
MemoryContext bufctx; /* context for prf_buffer[].response
* allocations */
MemoryContext errctx; /* context for error responses received
* from the pageserver */
MemoryContext hashctx; /* context for prf_buffer */
/* buffer indexes */
uint64 ring_unused; /* first unused slot */
uint64 ring_flush; /* next request to flush */
uint64 ring_receive; /* next slot that is to receive a response */
uint64 ring_last; /* min slot with a response value */
/* metrics / statistics */
int n_responses_buffered; /* count of PS responses buffered here, not
* yet read into shared buffers */
int n_requests_inflight; /* count of PS requests considered in
* flight */
int n_unused; /* count of buffers < unused, > last, that are
* also unused */
/* the buffers */
prfh_hash *prf_hash;
int max_shard_no;
/* Mark shards involved in prefetch */
uint8 shard_bitmap[(MAX_SHARDS + 7)/8];
PrefetchRequest prf_buffer[]; /* prefetch buffers */
} PrefetchState;
static PrefetchState *MyPState;
#define GetPrfSlotNoCheck(ring_index) ( \
&MyPState->prf_buffer[((ring_index) % readahead_buffer_size)] \
)
#define GetPrfSlot(ring_index) ( \
( \
AssertMacro((ring_index) < MyPState->ring_unused && \
(ring_index) >= MyPState->ring_last), \
GetPrfSlotNoCheck(ring_index) \
) \
)
#define ReceiveBufferNeedsCompaction() (\
(MyPState->n_responses_buffered / 8) < ( \
MyPState->ring_receive - \
MyPState->ring_last - \
MyPState->n_responses_buffered \
) \
)
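/*
* A worked example of the ring bookkeeping (illustrative numbers, assuming
* readahead_buffer_size = 8): with ring_last = 20, ring_receive = 23,
* ring_flush = 24 and ring_unused = 25, slots 20..22 hold buffered
* responses (or holes left by already-consumed ones), slot 23 has been
* flushed and is awaiting its response, and slot 24 is still sitting in
* the output buffer. A logical index N maps to the physical slot
* prf_buffer[N % 8], as done by GetPrfSlotNoCheck(). Per the macro above,
* compaction is wanted once the holes in the [ring_last, ring_receive)
* span outnumber 1/8 of the buffered responses.
*/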
static process_interrupts_callback_t prev_interrupt_cb;
static bool compact_prefetch_buffers(void);
static void consume_prefetch_responses(void);
static uint64 prefetch_register_bufferv(BufferTag tag, neon_request_lsns *frlsns,
BlockNumber nblocks, const bits8 *mask,
bool is_prefetch);
static bool prefetch_read(PrefetchRequest *slot);
static void prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns);
static bool prefetch_wait_for(uint64 ring_index);
static void prefetch_cleanup_trailing_unused(void);
static inline void prefetch_set_unused(uint64 ring_index);
static bool neon_prefetch_response_usable(neon_request_lsns *request_lsns,
PrefetchRequest *slot);
static bool communicator_processinterrupts(void);
void
pg_init_communicator(void)
{
prev_interrupt_cb = ProcessInterruptsCallback;
ProcessInterruptsCallback = communicator_processinterrupts;
}
static bool
compact_prefetch_buffers(void)
{
uint64 empty_ring_index = MyPState->ring_last;
uint64 search_ring_index = MyPState->ring_receive;
int n_moved = 0;
if (MyPState->ring_receive == MyPState->ring_last)
return false;
while (search_ring_index > MyPState->ring_last)
{
search_ring_index--;
if (GetPrfSlot(search_ring_index)->status == PRFS_UNUSED)
{
empty_ring_index = search_ring_index;
break;
}
}
/*
* Here we have established:
*   slots < search_ring_index have an unknown state (not scanned);
*   slots >= search_ring_index and <= empty_ring_index are unused;
*   slots > empty_ring_index are in use, or outside our buffer's range
* ... unless search_ring_index <= ring_last.
*
* Therefore, there is a gap of at least one unused item between
* search_ring_index and empty_ring_index (both inclusive), which grows as
* we hit more unused items while moving backwards through the array.
*/
while (search_ring_index > MyPState->ring_last)
{
PrefetchRequest *source_slot;
PrefetchRequest *target_slot;
bool found;
/* update search index to an unprocessed entry */
search_ring_index--;
source_slot = GetPrfSlot(search_ring_index);
if (source_slot->status == PRFS_UNUSED)
continue;
/* slot is used -- start moving slot */
target_slot = GetPrfSlot(empty_ring_index);
Assert(source_slot->status == PRFS_RECEIVED);
Assert(target_slot->status == PRFS_UNUSED);
target_slot->buftag = source_slot->buftag;
target_slot->shard_no = source_slot->shard_no;
target_slot->status = source_slot->status;
target_slot->flags = source_slot->flags;
target_slot->response = source_slot->response;
target_slot->reqid = source_slot->reqid;
target_slot->request_lsns = source_slot->request_lsns;
target_slot->my_ring_index = empty_ring_index;
prfh_delete(MyPState->prf_hash, source_slot);
prfh_insert(MyPState->prf_hash, target_slot, &found);
Assert(!found);
/* Adjust the location of our known-empty slot */
empty_ring_index--;
/* empty the moved slot */
source_slot->status = PRFS_UNUSED;
source_slot->buftag = (BufferTag)
{
0
};
source_slot->response = NULL;
source_slot->my_ring_index = 0;
source_slot->request_lsns = (neon_request_lsns) {
InvalidXLogRecPtr, InvalidXLogRecPtr, InvalidXLogRecPtr
};
/* update bookkeeping */
n_moved++;
}
/*
* Only when we've moved slots we can expect trailing unused slots, so
* only then we clean up trailing unused slots.
*/
if (n_moved > 0)
{
prefetch_cleanup_trailing_unused();
return true;
}
return false;
}
/*
* Check that prefetch response matches the slot
*/
static void
check_getpage_response(PrefetchRequest* slot, NeonResponse* resp)
{
if (resp->tag != T_NeonGetPageResponse && resp->tag != T_NeonErrorResponse)
{
neon_shard_log(slot->shard_no, PANIC, "Unexpected prefetch response %d, ring_receive=" UINT64_FORMAT ", ring_flush=" UINT64_FORMAT ", ring_unused=" UINT64_FORMAT "",
resp->tag, MyPState->ring_receive, MyPState->ring_flush, MyPState->ring_unused);
}
if (neon_protocol_version >= 3)
{
NRelFileInfo rinfo = BufTagGetNRelFileInfo(slot->buftag);
if (resp->tag == T_NeonGetPageResponse)
{
NeonGetPageResponse * getpage_resp = (NeonGetPageResponse *)resp;
if (resp->reqid != slot->reqid ||
resp->lsn != slot->request_lsns.request_lsn ||
resp->not_modified_since != slot->request_lsns.not_modified_since ||
!RelFileInfoEquals(getpage_resp->req.rinfo, rinfo) ||
getpage_resp->req.forknum != slot->buftag.forkNum ||
getpage_resp->req.blkno != slot->buftag.blockNum)
{
NEON_PANIC_CONNECTION_STATE(slot->shard_no, PANIC,
"Receive unexpected getpage response {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u} to get page request {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), RelFileInfoFmt(getpage_resp->req.rinfo), getpage_resp->req.forknum, getpage_resp->req.blkno,
slot->reqid, LSN_FORMAT_ARGS(slot->request_lsns.request_lsn), LSN_FORMAT_ARGS(slot->request_lsns.not_modified_since), RelFileInfoFmt(rinfo), slot->buftag.forkNum, slot->buftag.blockNum);
}
}
else if (resp->reqid != slot->reqid ||
resp->lsn != slot->request_lsns.request_lsn ||
resp->not_modified_since != slot->request_lsns.not_modified_since)
{
elog(WARNING, NEON_TAG "Error message {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X} doesn't match exists request {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since),
slot->reqid, LSN_FORMAT_ARGS(slot->request_lsns.request_lsn), LSN_FORMAT_ARGS(slot->request_lsns.not_modified_since));
}
}
}
/*
* If there might be responses still in the TCP buffer, then we should try to
* use those, to reduce any TCP backpressure on the OS/PS side.
*
* This procedure handles that.
*
* Note that this works because we don't pipeline non-getPage requests.
*
* NOTE: This procedure is not allowed to throw errors that should be handled
* by SMGR-related code, as this can be called from every CHECK_FOR_INTERRUPTS
* point inside and outside PostgreSQL.
*
* This still does throw errors when it receives malformed responses from PS.
*/
void
communicator_prefetch_pump_state(void)
{
START_PREFETCH_RECEIVE_WORK();
while (MyPState->ring_receive != MyPState->ring_flush)
{
NeonResponse *response;
PrefetchRequest *slot;
MemoryContext old;
slot = GetPrfSlot(MyPState->ring_receive);
old = MemoryContextSwitchTo(MyPState->errctx);
response = page_server->try_receive(slot->shard_no);
MemoryContextSwitchTo(old);
if (response == NULL)
break;
check_getpage_response(slot, response);
/* The slot should still be valid */
if (slot->status != PRFS_REQUESTED ||
slot->response != NULL ||
slot->my_ring_index != MyPState->ring_receive)
{
neon_shard_log(slot->shard_no, PANIC,
"Incorrect prefetch slot state after receive: status=%d response=%p my=" UINT64_FORMAT " receive=" UINT64_FORMAT "",
slot->status, slot->response,
slot->my_ring_index, MyPState->ring_receive);
}
/* update prefetch state */
MyPState->n_responses_buffered += 1;
MyPState->n_requests_inflight -= 1;
MyPState->ring_receive += 1;
MyNeonCounters->getpage_prefetches_buffered =
MyPState->n_responses_buffered;
/* update slot state */
slot->status = PRFS_RECEIVED;
slot->response = response;
if (response->tag == T_NeonGetPageResponse && !(slot->flags & PRFSF_LFC) && lfc_store_prefetch_result)
{
/*
* Store the prefetched result in the LFC (please read the comments to
* lfc_prefetch explaining why this can be done without holding the shared
* buffer lock).
*/
if (lfc_prefetch(BufTagGetNRelFileInfo(slot->buftag), slot->buftag.forkNum, slot->buftag.blockNum, ((NeonGetPageResponse*)response)->page, slot->request_lsns.not_modified_since))
{
slot->flags |= PRFSF_LFC;
}
}
}
END_PREFETCH_RECEIVE_WORK();
communicator_reconfigure_timeout_if_needed();
}
void
readahead_buffer_resize(int newsize, void *extra)
{
uint64 end,
nfree = newsize;
PrefetchState *newPState;
Size newprfs_size = offsetof(PrefetchState, prf_buffer) +
(sizeof(PrefetchRequest) * newsize);
/* don't try to re-initialize if we haven't initialized yet */
if (MyPState == NULL)
return;
/*
* Make sure that we don't lose track of active prefetch requests by
* ensuring we have received all but the last n requests (n = newsize).
*/
if (MyPState->n_requests_inflight > newsize)
{
prefetch_wait_for(MyPState->ring_unused - newsize - 1);
Assert(MyPState->n_requests_inflight <= newsize);
}
/* construct the new PrefetchState, and copy over the memory contexts */
newPState = MemoryContextAllocZero(TopMemoryContext, newprfs_size);
newPState->bufctx = MyPState->bufctx;
newPState->errctx = MyPState->errctx;
newPState->hashctx = MyPState->hashctx;
newPState->prf_hash = prfh_create(MyPState->hashctx, newsize, NULL);
newPState->n_unused = newsize;
newPState->n_requests_inflight = 0;
newPState->n_responses_buffered = 0;
newPState->ring_last = newsize;
newPState->ring_unused = newsize;
newPState->ring_receive = newsize;
newPState->max_shard_no = MyPState->max_shard_no;
memcpy(newPState->shard_bitmap, MyPState->shard_bitmap, sizeof(MyPState->shard_bitmap));
/*
* Copy over the prefetches.
*
* We populate the prefetch array from the end, to retain the most recent
* prefetches; this also has the benefit of needing only one iteration over
* the dataset, and trivial compaction.
*/
for (end = MyPState->ring_unused - 1;
end >= MyPState->ring_last && end != UINT64_MAX && nfree != 0;
end -= 1)
{
PrefetchRequest *slot = GetPrfSlot(end);
PrefetchRequest *newslot;
bool found;
if (slot->status == PRFS_UNUSED)
continue;
nfree -= 1;
newslot = &newPState->prf_buffer[nfree];
*newslot = *slot;
newslot->my_ring_index = nfree;
prfh_insert(newPState->prf_hash, newslot, &found);
Assert(!found);
switch (newslot->status)
{
case PRFS_UNUSED:
pg_unreachable();
case PRFS_REQUESTED:
newPState->n_requests_inflight += 1;
newPState->ring_receive -= 1;
newPState->ring_last -= 1;
break;
case PRFS_RECEIVED:
newPState->n_responses_buffered += 1;
newPState->ring_last -= 1;
break;
case PRFS_TAG_REMAINS:
newPState->ring_last -= 1;
break;
}
newPState->n_unused -= 1;
}
newPState->ring_flush = newPState->ring_receive;
MyNeonCounters->getpage_prefetches_buffered =
MyPState->n_responses_buffered;
MyNeonCounters->pageserver_open_requests =
MyPState->n_requests_inflight;
for (; end >= MyPState->ring_last && end != UINT64_MAX; end -= 1)
{
PrefetchRequest *slot = GetPrfSlot(end);
Assert(slot->status != PRFS_REQUESTED);
if (slot->status == PRFS_RECEIVED)
{
pfree(slot->response);
}
}
prfh_destroy(MyPState->prf_hash);
pfree(MyPState);
MyPState = newPState;
}
/*
* Callback called on backend exit to ensure a correct state of compute-PS
* communication in case the backend is canceled
*/
static void
prefetch_on_exit(int code, Datum arg)
{
if (code != 0) /* do disconnect only on abnormal backend termination */
{
shardno_t shard_no = DatumGetInt32(arg);
prefetch_on_ps_disconnect();
page_server->disconnect(shard_no);
}
}
/*
* Make sure that there are no responses still in the buffer.
*
* This function may indirectly update MyPState->prf_hash, which invalidates
* any active pointers into the hash table.
*/
static void
consume_prefetch_responses(void)
{
if (MyPState->ring_receive < MyPState->ring_unused)
prefetch_wait_for(MyPState->ring_unused - 1);
/*
* We know for sure we're not working on any prefetch pages after
* this.
*/
END_PREFETCH_RECEIVE_WORK();
}
static void
prefetch_cleanup_trailing_unused(void)
{
uint64 ring_index;
PrefetchRequest *slot;
while (MyPState->ring_last < MyPState->ring_receive)
{
ring_index = MyPState->ring_last;
slot = GetPrfSlot(ring_index);
if (slot->status == PRFS_UNUSED)
MyPState->ring_last += 1;
else
break;
}
}
static bool
prefetch_flush_requests(void)
{
for (shardno_t shard_no = 0; shard_no < MyPState->max_shard_no; shard_no++)
{
if (BITMAP_ISSET(MyPState->shard_bitmap, shard_no))
{
if (!page_server->flush(shard_no))
return false;
BITMAP_CLR(MyPState->shard_bitmap, shard_no);
}
}
MyPState->max_shard_no = 0;
return true;
}
/*
* Wait for slot of ring_index to have received its response.
* The caller is responsible for making sure the request buffer is flushed.
*
* NOTE: this function may indirectly update MyPState->prf_hash, which
* invalidates any active pointers into the hash table.
* NOTE: callers should make sure they can handle query cancellations in this
* function's call path.
*/
static bool
prefetch_wait_for(uint64 ring_index)
{
PrefetchRequest *entry;
bool result = true;
if (MyPState->ring_flush <= ring_index &&
MyPState->ring_unused > MyPState->ring_flush)
{
if (!prefetch_flush_requests())
return false;
MyPState->ring_flush = MyPState->ring_unused;
}
Assert(MyPState->ring_unused > ring_index);
START_PREFETCH_RECEIVE_WORK();
while (MyPState->ring_receive <= ring_index)
{
entry = GetPrfSlot(MyPState->ring_receive);
Assert(entry->status == PRFS_REQUESTED);
if (!prefetch_read(entry))
{
result = false;
break;
}
CHECK_FOR_INTERRUPTS();
}
if (result)
{
/* Check that the slot was actually received (the server can be disconnected in prefetch_pump_state, called from CHECK_FOR_INTERRUPTS) */
PrefetchRequest *slot = GetPrfSlot(ring_index);
result = slot->status == PRFS_RECEIVED;
}
END_PREFETCH_RECEIVE_WORK();
return result;
}
/*
* Read the response of a prefetch request into its slot.
*
* The caller is responsible for making sure that the request for this buffer
* was flushed to the PageServer.
*
* NOTE: this function may indirectly update MyPState->prf_hash, which
* invalidates any active pointers into the hash table.
*
* NOTE: this does IO, and can get canceled out-of-line.
*/
static bool
prefetch_read(PrefetchRequest *slot)
{
NeonResponse *response;
MemoryContext old;
BufferTag buftag;
shardno_t shard_no;
uint64 my_ring_index;
Assert(slot->status == PRFS_REQUESTED);
Assert(slot->response == NULL);
Assert(slot->my_ring_index == MyPState->ring_receive);
Assert(readpage_reentrant_guard || AmPrewarmWorker);
if (slot->status != PRFS_REQUESTED ||
slot->response != NULL ||
slot->my_ring_index != MyPState->ring_receive)
{
neon_shard_log(slot->shard_no, PANIC,
"Incorrect prefetch read: status=%d response=%p my=" UINT64_FORMAT " receive=" UINT64_FORMAT "",
slot->status, slot->response,
slot->my_ring_index, MyPState->ring_receive);
}
/*
* Copy the request info so that if an error happens and the prefetch
* queue is flushed during the receive call, we can print the original
* values in the error message
*/
buftag = slot->buftag;
shard_no = slot->shard_no;
my_ring_index = slot->my_ring_index;
old = MemoryContextSwitchTo(MyPState->errctx);
response = (NeonResponse *) page_server->receive(shard_no);
MemoryContextSwitchTo(old);
if (response)
{
check_getpage_response(slot, response);
/* The slot should still be valid */
if (slot->status != PRFS_REQUESTED ||
slot->response != NULL ||
slot->my_ring_index != MyPState->ring_receive)
{
neon_shard_log(shard_no, PANIC,
"Incorrect prefetch slot state after receive: status=%d response=%p my=" UINT64_FORMAT " receive=" UINT64_FORMAT "",
slot->status, slot->response,
slot->my_ring_index, MyPState->ring_receive);
}
/* update prefetch state */
MyPState->n_responses_buffered += 1;
MyPState->n_requests_inflight -= 1;
MyPState->ring_receive += 1;
MyNeonCounters->getpage_prefetches_buffered =
MyPState->n_responses_buffered;
/* update slot state */
slot->status = PRFS_RECEIVED;
slot->response = response;
if (response->tag == T_NeonGetPageResponse && !(slot->flags & PRFSF_LFC) && lfc_store_prefetch_result)
{
/*
* Store the prefetched result in the LFC (please read the comments to
* lfc_prefetch explaining why this can be done without holding the shared
* buffer lock).
*/
if (lfc_prefetch(BufTagGetNRelFileInfo(buftag), buftag.forkNum, buftag.blockNum, ((NeonGetPageResponse*)response)->page, slot->request_lsns.not_modified_since))
{
slot->flags |= PRFSF_LFC;
}
}
return true;
}
else
{
/*
* Note: The slot might no longer be valid, if the connection was lost
* and the prefetch queue was flushed during the receive call
*/
neon_shard_log(shard_no, LOG,
"No response from reading prefetch entry " UINT64_FORMAT ": %u/%u/%u.%u block %u. This can be caused by a concurrent disconnect",
my_ring_index,
RelFileInfoFmt(BufTagGetNRelFileInfo(buftag)),
buftag.forkNum, buftag.blockNum);
return false;
}
}
/*
* Wait for completion of a previously registered prefetch request.
* The prefetch result should be placed in the LFC by prefetch_wait_for.
*/
bool
communicator_prefetch_receive(BufferTag tag)
{
PrfHashEntry *entry;
PrefetchRequest hashkey;
Assert(readpage_reentrant_guard || AmPrewarmWorker); /* do not pump prefetch state in prewarm worker */
hashkey.buftag = tag;
entry = prfh_lookup(MyPState->prf_hash, &hashkey);
if (entry != NULL && prefetch_wait_for(entry->slot->my_ring_index))
{
prefetch_set_unused(entry->slot->my_ring_index);
return true;
}
return false;
}
/*
* Disconnect hook - drop prefetches when the connection drops
*
* If we don't remove the failed prefetches, we'd be serving incorrect
* data to the smgr.
*/
void
prefetch_on_ps_disconnect(void)
{
MyPState->ring_flush = MyPState->ring_unused;
/* Nothing should cancel the disconnect: we must not leave the connection in an indeterminate state */
HOLD_INTERRUPTS();
while (MyPState->ring_receive < MyPState->ring_unused)
{
PrefetchRequest *slot;
uint64 ring_index = MyPState->ring_receive;
slot = GetPrfSlot(ring_index);
Assert(slot->status == PRFS_REQUESTED);
Assert(slot->my_ring_index == ring_index);
/*
* Drop the connection to all shards which have prefetch requests in
* flight. It is not a problem to call disconnect multiple times on the
* same connection, because the disconnect implementation in libpagestore.c
* checks if the connection is alive and does nothing if it was already
* dropped.
*/
page_server->disconnect(slot->shard_no);
/* clean up the request */
slot->status = PRFS_TAG_REMAINS;
MyPState->n_requests_inflight -= 1;
MyPState->ring_receive += 1;
prefetch_set_unused(ring_index);
pgBufferUsage.prefetch.expired += 1;
MyNeonCounters->getpage_prefetch_discards_total += 1;
}
/*
* We can have gone into retry due to network error, so update stats with
* the latest available
*/
MyNeonCounters->pageserver_open_requests =
MyPState->n_requests_inflight;
MyNeonCounters->getpage_prefetches_buffered =
MyPState->n_responses_buffered;
RESUME_INTERRUPTS();
}
/*
* prefetch_set_unused() - clear a received prefetch slot
*
* The slot at ring_index must be a current member of the ring buffer,
* and may not be in the PRFS_REQUESTED state.
*
* NOTE: this function will update MyPState->prf_hash, which invalidates any
* active pointers into the hash table.
*/
static inline void
prefetch_set_unused(uint64 ring_index)
{
PrefetchRequest *slot;
if (ring_index < MyPState->ring_last)
return; /* Should already be unused */
slot = GetPrfSlot(ring_index);
if (slot->status == PRFS_UNUSED)
return;
Assert(slot->status == PRFS_RECEIVED || slot->status == PRFS_TAG_REMAINS);
if (slot->status == PRFS_RECEIVED)
{
pfree(slot->response);
slot->response = NULL;
MyPState->n_responses_buffered -= 1;
MyPState->n_unused += 1;
MyNeonCounters->getpage_prefetches_buffered =
MyPState->n_responses_buffered;
}
else
{
Assert(slot->response == NULL);
}
prfh_delete(MyPState->prf_hash, slot);
/* clear all fields */
MemSet(slot, 0, sizeof(PrefetchRequest));
slot->status = PRFS_UNUSED;
/* run cleanup if we're holding back ring_last */
if (MyPState->ring_last == ring_index)
prefetch_cleanup_trailing_unused();
/*
* ... and try to store the buffered responses more compactly if > 12.5%
* of the buffer is gaps
*/
else if (ReceiveBufferNeedsCompaction())
compact_prefetch_buffers();
}
/*
* Send one prefetch request to the pageserver. To wait for the response, call
* prefetch_wait_for().
*/
static void
prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns)
{
bool found;
uint64 mySlotNo PG_USED_FOR_ASSERTS_ONLY = slot->my_ring_index;
NeonGetPageRequest request = {
.hdr.tag = T_NeonGetPageRequest,
/* lsn and not_modified_since are filled in below */
.rinfo = BufTagGetNRelFileInfo(slot->buftag),
.forknum = slot->buftag.forkNum,
.blkno = slot->buftag.blockNum,
};
Assert(mySlotNo == MyPState->ring_unused);
if (force_request_lsns)
slot->request_lsns = *force_request_lsns;
else
neon_get_request_lsns(BufTagGetNRelFileInfo(slot->buftag),
slot->buftag.forkNum, slot->buftag.blockNum,
&slot->request_lsns, 1);
request.hdr.lsn = slot->request_lsns.request_lsn;
request.hdr.not_modified_since = slot->request_lsns.not_modified_since;
Assert(slot->response == NULL);
Assert(slot->my_ring_index == MyPState->ring_unused);
while (!page_server->send(slot->shard_no, (NeonRequest *) &request))
{
Assert(mySlotNo == MyPState->ring_unused);
/* loop */
}
slot->reqid = request.hdr.reqid;
/* update prefetch state */
MyPState->n_requests_inflight += 1;
MyPState->n_unused -= 1;
MyPState->ring_unused += 1;
BITMAP_SET(MyPState->shard_bitmap, slot->shard_no);
MyPState->max_shard_no = Max(slot->shard_no+1, MyPState->max_shard_no);
/* update slot state */
slot->status = PRFS_REQUESTED;
prfh_insert(MyPState->prf_hash, slot, &found);
Assert(!found);
}
/*
* Look up already received prefetch responses. Only responses matching the
* required LSNs are accepted. Pages found are marked in the "mask" bitmap,
* and the total number of such pages is returned.
*/
int
communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blocknum,
neon_request_lsns *lsns, BlockNumber nblocks,
void **buffers, bits8 *mask)
{
int hits = 0;
PrefetchRequest hashkey;
/*
* Use an intermediate PrefetchRequest struct as the hash key to ensure
* correct alignment and that the padding bytes are cleared.
*/
memset(&hashkey.buftag, 0, sizeof(BufferTag));
CopyNRelFileInfoToBufTag(hashkey.buftag, rinfo);
hashkey.buftag.forkNum = forknum;
for (int i = 0; i < nblocks; i++)
{
PrfHashEntry *entry;
hashkey.buftag.blockNum = blocknum + i;
entry = prfh_lookup(MyPState->prf_hash, &hashkey);
if (entry != NULL)
{
PrefetchRequest *slot = entry->slot;
uint64 ring_index = slot->my_ring_index;
Assert(slot == GetPrfSlot(ring_index));
Assert(slot->status != PRFS_UNUSED);
Assert(MyPState->ring_last <= ring_index &&
ring_index < MyPState->ring_unused);
Assert(BufferTagsEqual(&slot->buftag, &hashkey.buftag));
if (slot->status != PRFS_RECEIVED)
continue;
/*
* If the caller specified a request LSN to use, only accept
* prefetch responses that satisfy that request.
*/
if (!neon_prefetch_response_usable(&lsns[i], slot))
continue;
/*
* Ignore errors
*/
if (slot->response->tag == T_NeonErrorResponse)
{
continue;
}
Assert(slot->response->tag == T_NeonGetPageResponse); /* checked by check_getpage_response when response was assigned to the slot */
memcpy(buffers[i], ((NeonGetPageResponse*)slot->response)->page, BLCKSZ);
/*
* With lfc_store_prefetch_result=true, the prefetch result is stored in the
* LFC in prefetch_pump_state when the response is received from the page
* server. With lfc_store_prefetch_result=false it is not yet stored in the
* LFC, and we have to do it here, under the buffer lock.
*/
if (!lfc_store_prefetch_result)
lfc_write(rinfo, forknum, blocknum + i, buffers[i]);
prefetch_set_unused(ring_index);
BITMAP_SET(mask, i);
hits += 1;
inc_getpage_wait(0);
}
}
pgBufferUsage.prefetch.hits += hits;
return hits;
}
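/*
* A hedged sketch (illustrative only) of how the lookup above pairs with
* the prefetch registration below; the smgr-side variables (tag, rinfo,
* forknum, blocknum, lsns, buffer) are assumed:
*
*     // 1. Speculative: queue the request without waiting for a response.
*     communicator_prefetch_register_bufferv(tag, NULL, 1, NULL);
*     ...
*     // 2. Later, at read time: consume the buffered response if it is
*     //    still usable for the LSNs required now.
*     bits8 mask = 0;
*
*     if (communicator_prefetch_lookupv(rinfo, forknum, blocknum, &lsns, 1,
*                                       &buffer, &mask) == 1)
*         return;                     // page served from the prefetch ring
*     // ... otherwise fall back to a synchronous read ...
*/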
/*
* prefetch_register_bufferv() - register and prefetch buffers
*
* Register that we may want the contents of BufferTag in the near future.
* This is used when issuing a speculative prefetch request, but also when
* performing a synchronous request and we need the buffer right now.
*
* If force_request_lsns is not NULL, those values are sent to the
* pageserver. If NULL, we utilize the lastWrittenLsn infrastructure
* to calculate the LSNs to send.
*
* Bits set in *mask (if present) indicate pages already read; i.e. pages we
* can skip in this process.
*
* When performing a prefetch rather than a synchronous request,
* is_prefetch==true. Currently, it only affects how the request is accounted
* in the perf counters.
*
* NOTE: this function may indirectly update MyPState->prf_hash, which
* invalidates any active pointers into the hash table.
*/
void
communicator_prefetch_register_bufferv(BufferTag tag, neon_request_lsns *frlsns,
BlockNumber nblocks, const bits8 *mask)
{
uint64 ring_index PG_USED_FOR_ASSERTS_ONLY;
ring_index = prefetch_register_bufferv(tag, frlsns, nblocks, mask, true);
Assert(ring_index < MyPState->ring_unused &&
MyPState->ring_last <= ring_index);
}
/*
* Internal version. Returns the ring index of the last block. (The result
* of this function is only used when nblocks==1.)
*/
static uint64
prefetch_register_bufferv(BufferTag tag, neon_request_lsns *frlsns,
BlockNumber nblocks, const bits8 *mask,
bool is_prefetch)
{
uint64 last_ring_index;
PrefetchRequest hashkey;
#ifdef USE_ASSERT_CHECKING
bool any_hits = false;
#endif
/* We will never read further ahead than our buffer can store. */
nblocks = Max(1, Min(nblocks, readahead_buffer_size));
/*
* Use an intermediate PrefetchRequest struct as the hash key to ensure
* correct alignment and that the padding bytes are cleared.
*/
memset(&hashkey.buftag, 0, sizeof(BufferTag));
hashkey.buftag = tag;
Retry:
/*
* We can have gone into retry due to network error, so update stats with
* the latest available
*/
MyNeonCounters->pageserver_open_requests =
MyPState->ring_unused - MyPState->ring_receive;
MyNeonCounters->getpage_prefetches_buffered =
MyPState->n_responses_buffered;
last_ring_index = UINT64_MAX;
for (int i = 0; i < nblocks; i++)
{
PrefetchRequest *slot = NULL;
PrfHashEntry *entry = NULL;
neon_request_lsns *lsns;
if (PointerIsValid(mask) && BITMAP_ISSET(mask, i))
continue;
if (frlsns)
lsns = &frlsns[i];
else
lsns = NULL;
#ifdef USE_ASSERT_CHECKING
any_hits = true;
#endif
slot = NULL;
entry = NULL;
hashkey.buftag.blockNum = tag.blockNum + i;
entry = prfh_lookup(MyPState->prf_hash, &hashkey);
if (entry != NULL)
{
slot = entry->slot;
last_ring_index = slot->my_ring_index;
Assert(slot == GetPrfSlot(last_ring_index));
Assert(slot->status != PRFS_UNUSED);
Assert(MyPState->ring_last <= last_ring_index &&
last_ring_index < MyPState->ring_unused);
Assert(BufferTagsEqual(&slot->buftag, &hashkey.buftag));
/*
* If the caller specified a request LSN to use, only accept
* prefetch responses that satisfy that request.
*/
if (!is_prefetch)
{
if (!neon_prefetch_response_usable(lsns, slot))
{
/* Wait for the old request to finish and discard it */
if (!prefetch_wait_for(last_ring_index))
goto Retry;
prefetch_set_unused(last_ring_index);
entry = NULL;
slot = NULL;
pgBufferUsage.prefetch.expired += 1;
MyNeonCounters->getpage_prefetch_discards_total += 1;
}
}
if (entry != NULL)
{
/*
* We received a prefetch for a page that was recently read
* and removed from the buffers. Remove that request from the
* buffers.
*/
if (slot->status == PRFS_TAG_REMAINS)
{
prefetch_set_unused(last_ring_index);
entry = NULL;
slot = NULL;
}
else
{
/* The buffered request is good enough, return that index */
if (is_prefetch)
pgBufferUsage.prefetch.duplicates++;
continue;
}
}
}
else if (!is_prefetch)
{
pgBufferUsage.prefetch.misses += 1;
MyNeonCounters->getpage_prefetch_misses_total++;
}
/*
* We can only leave the block above by finding that there's
* no entry that can satisfy this request, either because there
* was no entry, or because the entry was invalid or didn't satisfy
* the LSNs provided.
*
* The code should've made sure to clear up the data.
*/
Assert(entry == NULL);
Assert(slot == NULL);
/* There should be no buffer overflow */
Assert(MyPState->ring_last + readahead_buffer_size >= MyPState->ring_unused);
/*
* If the prefetch queue is full, we need to make room by clearing the
* oldest slot. If the oldest slot holds a buffer that was already
* received, we can just throw it away; we fetched the page
* unnecessarily in that case. If the oldest slot holds a request that
* we haven't received a response for yet, we have to wait for the
* response to that before we can continue. We might not have even
* flushed the request to the pageserver yet, it might be just sitting
* in the output buffer. In that case, we flush it and wait for the
* response. (We could decide not to send it, but it's hard to abort
* when the request is already in the output buffer, and 'not sending'
* a prefetch request kind of goes against the principles of
* prefetching)
*/
if (MyPState->ring_last + readahead_buffer_size == MyPState->ring_unused)
{
uint64 cleanup_index = MyPState->ring_last;
slot = GetPrfSlot(cleanup_index);
Assert(slot->status != PRFS_UNUSED);
/*
* If there is good reason to run compaction on the prefetch buffers,
* try to do that.
*/
if (ReceiveBufferNeedsCompaction() && compact_prefetch_buffers())
{
Assert(slot->status == PRFS_UNUSED);
}
else
{
/*
* We have the slot for ring_last, so that must still be in
* progress
*/
switch (slot->status)
{
case PRFS_REQUESTED:
Assert(MyPState->ring_receive == cleanup_index);
if (!prefetch_wait_for(cleanup_index))
goto Retry;
prefetch_set_unused(cleanup_index);
pgBufferUsage.prefetch.expired += 1;
MyNeonCounters->getpage_prefetch_discards_total += 1;
break;
case PRFS_RECEIVED:
case PRFS_TAG_REMAINS:
prefetch_set_unused(cleanup_index);
pgBufferUsage.prefetch.expired += 1;
MyNeonCounters->getpage_prefetch_discards_total += 1;
break;
default:
pg_unreachable();
}
}
}
/*
* The next buffer pointed to by `ring_unused` is now definitely empty, so
* we can insert the new request to it.
*/
last_ring_index = MyPState->ring_unused;
Assert(MyPState->ring_last <= last_ring_index &&
last_ring_index <= MyPState->ring_unused);
slot = GetPrfSlotNoCheck(last_ring_index);
Assert(slot->status == PRFS_UNUSED);
/*
* We must update the slot data before insertion, because the hash
* function reads the buffer tag from the slot.
*/
slot->buftag = hashkey.buftag;
slot->shard_no = get_shard_number(&tag);
slot->my_ring_index = last_ring_index;
slot->flags = 0;
if (is_prefetch)
MyNeonCounters->getpage_prefetch_requests_total++;
else
MyNeonCounters->getpage_sync_requests_total++;
prefetch_do_request(slot, lsns);
}
MyNeonCounters->pageserver_open_requests =
MyPState->ring_unused - MyPState->ring_receive;
Assert(any_hits);
Assert(last_ring_index != UINT64_MAX);
Assert(GetPrfSlot(last_ring_index)->status == PRFS_REQUESTED ||
GetPrfSlot(last_ring_index)->status == PRFS_RECEIVED);
Assert(MyPState->ring_last <= last_ring_index &&
last_ring_index < MyPState->ring_unused);
if (flush_every_n_requests > 0 &&
MyPState->ring_unused - MyPState->ring_flush >= flush_every_n_requests)
{
if (!prefetch_flush_requests())
{
/*
* Prefetch set is reset in case of error, so we should try to
* register our request once again
*/
goto Retry;
}
MyPState->ring_flush = MyPState->ring_unused;
}
return last_ring_index;
}
static bool
equal_requests(NeonRequest* a, NeonRequest* b)
{
return a->reqid == b->reqid && a->lsn == b->lsn && a->not_modified_since == b->not_modified_since;
}
/*
* Note: this function can get canceled and use a long jump to the next catch
* context. Take care.
*/
static NeonResponse *
page_server_request(void const *req)
{
NeonResponse *resp = NULL;
BufferTag tag = {0};
shardno_t shard_no;
switch (messageTag(req))
{
case T_NeonExistsRequest:
CopyNRelFileInfoToBufTag(tag, ((NeonExistsRequest *) req)->rinfo);
break;
case T_NeonNblocksRequest:
CopyNRelFileInfoToBufTag(tag, ((NeonNblocksRequest *) req)->rinfo);
break;
case T_NeonDbSizeRequest:
NInfoGetDbOid(BufTagGetNRelFileInfo(tag)) = ((NeonDbSizeRequest *) req)->dbNode;
break;
case T_NeonGetPageRequest:
CopyNRelFileInfoToBufTag(tag, ((NeonGetPageRequest *) req)->rinfo);
tag.blockNum = ((NeonGetPageRequest *) req)->blkno;
break;
default:
neon_log(PANIC, "Unexpected request tag: %d", messageTag(req));
}
shard_no = get_shard_number(&tag);
/*
* The current sharding model assumes that all metadata is present only on
* shard 0. We still need to call get_shard_number() to check that the
* shard map is up-to-date.
*/
if (((NeonRequest *) req)->tag != T_NeonGetPageRequest)
{
shard_no = 0;
}
consume_prefetch_responses();
PG_TRY();
{
before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
do
{
while (!page_server->send(shard_no, (NeonRequest *) req)
|| !page_server->flush(shard_no))
{
/* do nothing */
}
MyNeonCounters->pageserver_open_requests++;
resp = page_server->receive(shard_no);
MyNeonCounters->pageserver_open_requests--;
} while (resp == NULL);
cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
}
PG_CATCH();
{
cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
/* Nothing should cancel the disconnect: we must not leave the connection in an indeterminate state */
HOLD_INTERRUPTS();
page_server->disconnect(shard_no);
MyNeonCounters->pageserver_open_requests = 0;
RESUME_INTERRUPTS();
PG_RE_THROW();
}
PG_END_TRY();
return resp;
}
StringInfoData
nm_pack_request(NeonRequest *msg)
{
StringInfoData s;
initStringInfo(&s);
pq_sendbyte(&s, msg->tag);
if (neon_protocol_version >= 3)
{
pq_sendint64(&s, msg->reqid);
}
pq_sendint64(&s, msg->lsn);
pq_sendint64(&s, msg->not_modified_since);
switch (messageTag(msg))
{
/* pagestore_client -> pagestore */
case T_NeonExistsRequest:
{
NeonExistsRequest *msg_req = (NeonExistsRequest *) msg;
pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo));
pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo));
pq_sendint32(&s, NInfoGetRelNumber(msg_req->rinfo));
pq_sendbyte(&s, msg_req->forknum);
break;
}
case T_NeonNblocksRequest:
{
NeonNblocksRequest *msg_req = (NeonNblocksRequest *) msg;
pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo));
pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo));
pq_sendint32(&s, NInfoGetRelNumber(msg_req->rinfo));
pq_sendbyte(&s, msg_req->forknum);
break;
}
case T_NeonDbSizeRequest:
{
NeonDbSizeRequest *msg_req = (NeonDbSizeRequest *) msg;
pq_sendint32(&s, msg_req->dbNode);
break;
}
case T_NeonGetPageRequest:
{
NeonGetPageRequest *msg_req = (NeonGetPageRequest *) msg;
pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo));
pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo));
pq_sendint32(&s, NInfoGetRelNumber(msg_req->rinfo));
pq_sendbyte(&s, msg_req->forknum);
pq_sendint32(&s, msg_req->blkno);
break;
}
case T_NeonGetSlruSegmentRequest:
{
NeonGetSlruSegmentRequest *msg_req = (NeonGetSlruSegmentRequest *) msg;
pq_sendbyte(&s, msg_req->kind);
pq_sendint32(&s, msg_req->segno);
break;
}
/* pagestore -> pagestore_client. We never need to create these. */
case T_NeonExistsResponse:
case T_NeonNblocksResponse:
case T_NeonGetPageResponse:
case T_NeonErrorResponse:
case T_NeonDbSizeResponse:
case T_NeonGetSlruSegmentResponse:
default:
neon_log(PANIC, "unexpected neon message tag 0x%02x", msg->tag);
break;
}
return s;
}
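/*
* For reference, the wire layout produced above for a protocol-v3 getpage
* request (all integers in network byte order, via the pq_send* routines):
*
*     byte   tag                 = T_NeonGetPageRequest
*     int64  reqid               (only when neon_protocol_version >= 3)
*     int64  lsn
*     int64  not_modified_since
*     int32  spcOid
*     int32  dbOid
*     int32  relNumber
*     byte   forknum
*     int32  blkno
*/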
NeonResponse *
nm_unpack_response(StringInfo s)
{
NeonMessageTag tag = pq_getmsgbyte(s);
NeonResponse resp_hdr = {0}; /* make valgrind happy */
NeonResponse *resp = NULL;
resp_hdr.tag = tag;
if (neon_protocol_version >= 3)
{
resp_hdr.reqid = pq_getmsgint64(s);
resp_hdr.lsn = pq_getmsgint64(s);
resp_hdr.not_modified_since = pq_getmsgint64(s);
}
switch (tag)
{
/* pagestore -> pagestore_client */
case T_NeonExistsResponse:
{
NeonExistsResponse *msg_resp = palloc0(sizeof(NeonExistsResponse));
if (neon_protocol_version >= 3)
{
NInfoGetSpcOid(msg_resp->req.rinfo) = pq_getmsgint(s, 4);
NInfoGetDbOid(msg_resp->req.rinfo) = pq_getmsgint(s, 4);
NInfoGetRelNumber(msg_resp->req.rinfo) = pq_getmsgint(s, 4);
msg_resp->req.forknum = pq_getmsgbyte(s);
}
msg_resp->req.hdr = resp_hdr;
msg_resp->exists = pq_getmsgbyte(s);
pq_getmsgend(s);
resp = (NeonResponse *) msg_resp;
break;
}
case T_NeonNblocksResponse:
{
NeonNblocksResponse *msg_resp = palloc0(sizeof(NeonNblocksResponse));
if (neon_protocol_version >= 3)
{
NInfoGetSpcOid(msg_resp->req.rinfo) = pq_getmsgint(s, 4);
NInfoGetDbOid(msg_resp->req.rinfo) = pq_getmsgint(s, 4);
NInfoGetRelNumber(msg_resp->req.rinfo) = pq_getmsgint(s, 4);
msg_resp->req.forknum = pq_getmsgbyte(s);
}
msg_resp->req.hdr = resp_hdr;
msg_resp->n_blocks = pq_getmsgint(s, 4);
pq_getmsgend(s);
resp = (NeonResponse *) msg_resp;
break;
}
case T_NeonGetPageResponse:
{
NeonGetPageResponse *msg_resp;
msg_resp = MemoryContextAllocZero(MyPState->bufctx, PS_GETPAGERESPONSE_SIZE);
if (neon_protocol_version >= 3)
{
NInfoGetSpcOid(msg_resp->req.rinfo) = pq_getmsgint(s, 4);
NInfoGetDbOid(msg_resp->req.rinfo) = pq_getmsgint(s, 4);
NInfoGetRelNumber(msg_resp->req.rinfo) = pq_getmsgint(s, 4);
msg_resp->req.forknum = pq_getmsgbyte(s);
msg_resp->req.blkno = pq_getmsgint(s, 4);
}
msg_resp->req.hdr = resp_hdr;
/* XXX: should be varlena */
memcpy(msg_resp->page, pq_getmsgbytes(s, BLCKSZ), BLCKSZ);
pq_getmsgend(s);
Assert(msg_resp->req.hdr.tag == T_NeonGetPageResponse);
resp = (NeonResponse *) msg_resp;
break;
}
case T_NeonDbSizeResponse:
{
NeonDbSizeResponse *msg_resp = palloc0(sizeof(NeonDbSizeResponse));
if (neon_protocol_version >= 3)
{
msg_resp->req.dbNode = pq_getmsgint(s, 4);
}
msg_resp->req.hdr = resp_hdr;
msg_resp->db_size = pq_getmsgint64(s);
pq_getmsgend(s);
resp = (NeonResponse *) msg_resp;
break;
}
case T_NeonErrorResponse:
{
NeonErrorResponse *msg_resp;
size_t msglen;
const char *msgtext;
msgtext = pq_getmsgrawstring(s);
msglen = strlen(msgtext);
msg_resp = palloc0(sizeof(NeonErrorResponse) + msglen + 1);
msg_resp->req = resp_hdr;
memcpy(msg_resp->message, msgtext, msglen + 1);
pq_getmsgend(s);
resp = (NeonResponse *) msg_resp;
break;
}
case T_NeonGetSlruSegmentResponse:
{
NeonGetSlruSegmentResponse *msg_resp;
int n_blocks;
msg_resp = palloc0(sizeof(NeonGetSlruSegmentResponse));
if (neon_protocol_version >= 3)
{
msg_resp->req.kind = pq_getmsgbyte(s);
msg_resp->req.segno = pq_getmsgint(s, 4);
}
msg_resp->req.hdr = resp_hdr;
n_blocks = pq_getmsgint(s, 4);
msg_resp->n_blocks = n_blocks;
memcpy(msg_resp->data, pq_getmsgbytes(s, n_blocks * BLCKSZ), n_blocks * BLCKSZ);
pq_getmsgend(s);
resp = (NeonResponse *) msg_resp;
break;
}
/*
* pagestore_client -> pagestore
*
* We create these ourselves, and don't need to decode them.
*/
case T_NeonExistsRequest:
case T_NeonNblocksRequest:
case T_NeonGetPageRequest:
case T_NeonDbSizeRequest:
case T_NeonGetSlruSegmentRequest:
default:
neon_log(PANIC, "unexpected neon message tag 0x%02x", tag);
break;
}
return resp;
}
/* dump to json for debugging / error reporting purposes */
char *
nm_to_string(NeonMessage *msg)
{
StringInfoData s;
initStringInfo(&s);
switch (messageTag(msg))
{
/* pagestore_client -> pagestore */
case T_NeonExistsRequest:
{
NeonExistsRequest *msg_req = (NeonExistsRequest *) msg;
appendStringInfoString(&s, "{\"type\": \"NeonExistsRequest\"");
appendStringInfo(&s, ", \"rinfo\": \"%u/%u/%u\"", RelFileInfoFmt(msg_req->rinfo));
appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum);
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->hdr.lsn));
appendStringInfo(&s, ", \"not_modified_since\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->hdr.not_modified_since));
appendStringInfoChar(&s, '}');
break;
}
case T_NeonNblocksRequest:
{
NeonNblocksRequest *msg_req = (NeonNblocksRequest *) msg;
appendStringInfoString(&s, "{\"type\": \"NeonNblocksRequest\"");
appendStringInfo(&s, ", \"rinfo\": \"%u/%u/%u\"", RelFileInfoFmt(msg_req->rinfo));
appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum);
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->hdr.lsn));
appendStringInfo(&s, ", \"not_modified_since\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->hdr.not_modified_since));
appendStringInfoChar(&s, '}');
break;
}
case T_NeonGetPageRequest:
{
NeonGetPageRequest *msg_req = (NeonGetPageRequest *) msg;
appendStringInfoString(&s, "{\"type\": \"NeonGetPageRequest\"");
appendStringInfo(&s, ", \"rinfo\": \"%u/%u/%u\"", RelFileInfoFmt(msg_req->rinfo));
appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum);
appendStringInfo(&s, ", \"blkno\": %u", msg_req->blkno);
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->hdr.lsn));
appendStringInfo(&s, ", \"not_modified_since\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->hdr.not_modified_since));
appendStringInfoChar(&s, '}');
break;
}
case T_NeonDbSizeRequest:
{
NeonDbSizeRequest *msg_req = (NeonDbSizeRequest *) msg;
appendStringInfoString(&s, "{\"type\": \"NeonDbSizeRequest\"");
appendStringInfo(&s, ", \"dbnode\": \"%u\"", msg_req->dbNode);
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->hdr.lsn));
appendStringInfo(&s, ", \"not_modified_since\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->hdr.not_modified_since));
appendStringInfoChar(&s, '}');
break;
}
case T_NeonGetSlruSegmentRequest:
{
NeonGetSlruSegmentRequest *msg_req = (NeonGetSlruSegmentRequest *) msg;
appendStringInfoString(&s, "{\"type\": \"NeonGetSlruSegmentRequest\"");
appendStringInfo(&s, ", \"kind\": %u", msg_req->kind);
appendStringInfo(&s, ", \"segno\": %u", msg_req->segno);
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->hdr.lsn));
appendStringInfo(&s, ", \"not_modified_since\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->hdr.not_modified_since));
appendStringInfoChar(&s, '}');
break;
}
/* pagestore -> pagestore_client */
case T_NeonExistsResponse:
{
NeonExistsResponse *msg_resp = (NeonExistsResponse *) msg;
appendStringInfoString(&s, "{\"type\": \"NeonExistsResponse\"");
appendStringInfo(&s, ", \"exists\": %d}",
msg_resp->exists);
appendStringInfoChar(&s, '}');
break;
}
case T_NeonNblocksResponse:
{
NeonNblocksResponse *msg_resp = (NeonNblocksResponse *) msg;
appendStringInfoString(&s, "{\"type\": \"NeonNblocksResponse\"");
appendStringInfo(&s, ", \"n_blocks\": %u}",
msg_resp->n_blocks);
appendStringInfoChar(&s, '}');
break;
}
case T_NeonGetPageResponse:
{
NeonGetPageResponse *msg_resp = (NeonGetPageResponse *) msg;
appendStringInfoString(&s, "{\"type\": \"NeonGetPageResponse\"");
appendStringInfo(&s, ", \"rinfo\": %u/%u/%u", RelFileInfoFmt(msg_resp->req.rinfo));
appendStringInfo(&s, ", \"forknum\": %d", msg_resp->req.forknum);
appendStringInfo(&s, ", \"blkno\": %u", msg_resp->req.blkno);
appendStringInfoChar(&s, '}');
break;
}
case T_NeonErrorResponse:
{
NeonErrorResponse *msg_resp = (NeonErrorResponse *) msg;
/* FIXME: escape double-quotes in the message */
appendStringInfoString(&s, "{\"type\": \"NeonErrorResponse\"");
appendStringInfo(&s, ", \"message\": \"%s\"}", msg_resp->message);
appendStringInfoChar(&s, '}');
break;
}
case T_NeonDbSizeResponse:
{
NeonDbSizeResponse *msg_resp = (NeonDbSizeResponse *) msg;
appendStringInfoString(&s, "{\"type\": \"NeonDbSizeResponse\"");
appendStringInfo(&s, ", \"db_size\": " INT64_FORMAT "}",
msg_resp->db_size);
appendStringInfoChar(&s, '}');
break;
}
case T_NeonGetSlruSegmentResponse:
{
NeonGetSlruSegmentResponse *msg_resp = (NeonGetSlruSegmentResponse *) msg;
appendStringInfoString(&s, "{\"type\": \"NeonGetSlruSegmentResponse\"");
appendStringInfo(&s, ", \"n_blocks\": %u}",
msg_resp->n_blocks);
appendStringInfoChar(&s, '}');
break;
}
default:
appendStringInfo(&s, "{\"type\": \"unknown 0x%02x\"", msg->tag);
}
return s.data;
}
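/*
* For example, a getpage request renders roughly as follows (values are
* illustrative):
*
*     {"type": "NeonGetPageRequest", "rinfo": "1663/16384/16385",
*      "forknum": 0, "blkno": 42, "lsn": "0/1A2B3C8",
*      "not_modified_since": "0/1A2B3C0"}
*/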
/*
* communicator_init() -- Initialize per-backend private state
*/
void
communicator_init(void)
{
Size prfs_size;
if (MyPState != NULL)
return;
/*
* Sanity check that the perf counters array is sized correctly. We got
* this wrong once, and the formula for max number of backends and aux
* processes might well change in the future, so better safe than sorry.
* This is a very cheap check so we do it even without assertions. On
* v14, this gets called before initializing MyProc, so we cannot perform
* the check here. That's OK, we don't expect the logic to change in old
* releases.
*/
#if PG_VERSION_NUM>=150000
if (MyNeonCounters >= &neon_per_backend_counters_shared[NUM_NEON_PERF_COUNTER_SLOTS])
elog(ERROR, "MyNeonCounters points past end of array");
#endif
prfs_size = offsetof(PrefetchState, prf_buffer) +
sizeof(PrefetchRequest) * readahead_buffer_size;
MyPState = MemoryContextAllocZero(TopMemoryContext, prfs_size);
MyPState->n_unused = readahead_buffer_size;
MyPState->bufctx = SlabContextCreate(TopMemoryContext,
"NeonSMGR/prefetch",
SLAB_DEFAULT_BLOCK_SIZE * 17,
PS_GETPAGERESPONSE_SIZE);
MyPState->errctx = AllocSetContextCreate(TopMemoryContext,
"NeonSMGR/errors",
ALLOCSET_DEFAULT_SIZES);
MyPState->hashctx = AllocSetContextCreate(TopMemoryContext,
"NeonSMGR/prefetch",
ALLOCSET_DEFAULT_SIZES);
MyPState->prf_hash = prfh_create(MyPState->hashctx,
readahead_buffer_size, NULL);
}
/*
* neon_prefetch_response_usable -- Can a new request be satisfied by an old one?
*
* This is used to check if the response to a prefetch request can be used to
* satisfy a page read now.
*/
static bool
neon_prefetch_response_usable(neon_request_lsns *request_lsns,
PrefetchRequest *slot)
{
/* sanity check the LSN's on the old and the new request */
Assert(request_lsns->request_lsn >= request_lsns->not_modified_since);
Assert(request_lsns->effective_request_lsn >= request_lsns->not_modified_since);
Assert(request_lsns->effective_request_lsn <= request_lsns->request_lsn);
Assert(slot->request_lsns.request_lsn >= slot->request_lsns.not_modified_since);
Assert(slot->request_lsns.effective_request_lsn >= slot->request_lsns.not_modified_since);
Assert(slot->request_lsns.effective_request_lsn <= slot->request_lsns.request_lsn);
Assert(slot->status != PRFS_UNUSED);
/*
* The new request's LSN should never be older than the old one. This
* could be an Assert, except that for testing purposes, we do provide an
* interface in neon_test_utils to fetch pages at arbitrary LSNs, which
* violates this.
*
* Similarly, the not_modified_since value calculated for a page should
* never move backwards. This assumption is a bit fragile; if we updated
* the last-written cache when we read in a page, for example, then it
* might. But as the code stands, it should not.
*
* (If two backends issue a request at the same time, they might race and
* calculate LSNs "out of order" with each other, but the prefetch queue
* is backend-private at the moment.)
*/
if (request_lsns->effective_request_lsn < slot->request_lsns.effective_request_lsn ||
request_lsns->not_modified_since < slot->request_lsns.not_modified_since)
{
ereport(LOG,
(errcode(ERRCODE_IO_ERROR),
errmsg(NEON_TAG "request with unexpected LSN after prefetch"),
errdetail("Request %X/%X not_modified_since %X/%X, prefetch %X/%X not_modified_since %X/%X)",
LSN_FORMAT_ARGS(request_lsns->effective_request_lsn),
LSN_FORMAT_ARGS(request_lsns->not_modified_since),
LSN_FORMAT_ARGS(slot->request_lsns.effective_request_lsn),
LSN_FORMAT_ARGS(slot->request_lsns.not_modified_since))));
return false;
}
/*---
* Each request to the pageserver has three LSN values associated with it:
* `not_modified_since`, `request_lsn`, and 'effective_request_lsn'.
* `not_modified_since` and `request_lsn` are sent to the pageserver, but
* in the primary node, we always use UINT64_MAX as the `request_lsn`, so
* we remember `effective_request_lsn` separately. In a primary,
* `effective_request_lsn` is the same as `not_modified_since`.
 * See the comments in neon_get_request_lsns for why we cannot use the
 * last flush WAL position here.
*
* To determine whether a response to a GetPage request issued earlier is
* still valid to satisfy a new page read, we look at the
* (not_modified_since, effective_request_lsn] range of the request. It is
* effectively a claim that the page has not been modified between those
* LSNs. If the range of the old request in the queue overlaps with the
* new request, we know that the page hasn't been modified in the union of
 * the ranges. In that case we can use the response to the old request
 * to satisfy the new request. For example:
*
* 100 500
* Old request: +--------+
*
* 400 800
* New request: +--------+
*
* The old request claims that the page was not modified between LSNs 100
* and 500, and the second claims that it was not modified between 400 and
* 800. Together they mean that the page was not modified between 100 and
* 800. Therefore the response to the old request is also valid for the
* new request.
*
* This logic also holds at the boundary case that the old request's LSN
* matches the new request's not_modified_since LSN exactly:
*
* 100 500
* Old request: +--------+
*
* 500 900
* New request: +--------+
*
* The response to the old request is the page as it was at LSN 500, and
* the page hasn't been changed in the range (500, 900], therefore the
* response is valid also for the new request.
*/
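/*
 * In code terms (merely restating the check below): the monotonicity
 * check above already guarantees that the new range does not end before
 * the old one starts, so the overlap test reduces to the single
 * comparison
 *
 *   new.not_modified_since <= old.effective_request_lsn
 *
 * which is exactly what we return.
 */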
/* this follows from the checks above */
Assert(request_lsns->effective_request_lsn >= slot->request_lsns.not_modified_since);
return request_lsns->not_modified_since <= slot->request_lsns.effective_request_lsn;
}
/*
* Does the physical file exist?
*/
bool
communicator_exists(NRelFileInfo rinfo, ForkNumber forkNum, neon_request_lsns *request_lsns)
{
bool exists;
NeonResponse *resp;
{
NeonExistsRequest request = {
.hdr.tag = T_NeonExistsRequest,
.hdr.lsn = request_lsns->request_lsn,
.hdr.not_modified_since = request_lsns->not_modified_since,
.rinfo = rinfo,
.forknum = forkNum
};
resp = page_server_request(&request);
switch (resp->tag)
{
case T_NeonExistsResponse:
{
NeonExistsResponse* exists_resp = (NeonExistsResponse *) resp;
if (neon_protocol_version >= 3)
{
if (!equal_requests(resp, &request.hdr) ||
!RelFileInfoEquals(exists_resp->req.rinfo, request.rinfo) ||
exists_resp->req.forknum != request.forknum)
{
NEON_PANIC_CONNECTION_STATE(0, PANIC,
"Unexpect response {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u} to exits request {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), RelFileInfoFmt(exists_resp->req.rinfo), exists_resp->req.forknum,
request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since), RelFileInfoFmt(request.rinfo), request.forknum);
}
}
exists = exists_resp->exists;
break;
}
case T_NeonErrorResponse:
if (neon_protocol_version >= 3)
{
if (!equal_requests(resp, &request.hdr))
{
elog(WARNING, NEON_TAG "Error message {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X} doesn't match exists request {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since),
request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since));
}
}
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg(NEON_TAG "[reqid " UINT64_HEX_FORMAT "] could not read relation existence of rel %u/%u/%u.%u from page server at lsn %X/%08X",
resp->reqid,
RelFileInfoFmt(rinfo),
forkNum,
LSN_FORMAT_ARGS(request_lsns->effective_request_lsn)),
errdetail("page server returned error: %s",
((NeonErrorResponse *) resp)->message)));
break;
default:
NEON_PANIC_CONNECTION_STATE(0, PANIC,
"Expected Exists (0x%02x) or Error (0x%02x) response to ExistsRequest, but got 0x%02x",
T_NeonExistsResponse, T_NeonErrorResponse, resp->tag);
}
pfree(resp);
}
return exists;
}
/*
* Read N pages at a specific LSN.
*
 * Bits already set in *mask mark pages that were read at a previous
 * point in time, and which we must not touch or overwrite.
 * New bits should be set in *mask for the pages we successfully read.
*
* The offsets in request_lsns, buffers, and mask are linked.
*/
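/*
 * For example: with nblocks = 3 and bit 1 set in *mask, only blocks
 * base_blockno and base_blockno + 2 are fetched from the pageserver;
 * buffers[1] is left untouched.
 */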
void
communicator_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber base_blockno,
neon_request_lsns *request_lsns,
void **buffers, BlockNumber nblocks, const bits8 *mask)
{
NeonResponse *resp;
uint64 ring_index;
PrfHashEntry *entry;
PrefetchRequest *slot;
PrefetchRequest hashkey;
Assert(PointerIsValid(request_lsns));
Assert(nblocks >= 1);
/*
* Use an intermediate PrefetchRequest struct as the hash key to ensure
* correct alignment and that the padding bytes are cleared.
*/
memset(&hashkey.buftag, 0, sizeof(BufferTag));
CopyNRelFileInfoToBufTag(hashkey.buftag, rinfo);
hashkey.buftag.forkNum = forkNum;
hashkey.buftag.blockNum = base_blockno;
/*
* The redo process does not lock pages that it needs to replay but are
* not in the shared buffers, so a concurrent process may request the page
* after redo has decided it won't redo that page and updated the LwLSN
* for that page. If we're in hot standby we need to take care that we
* don't return until after REDO has finished replaying up to that LwLSN,
* as the page should have been locked up to that point.
*
* See also the description on neon_redo_read_buffer_filter below.
*
* NOTE: It is possible that the WAL redo process will still do IO due to
* concurrent failed read IOs. Those IOs should never have a request_lsn
* that is as large as the WAL record we're currently replaying, if it
* weren't for the behaviour of the LwLsn cache that uses the highest
* value of the LwLsn cache when the entry is not found.
*/
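/*
 * Register all the requested pages in the prefetch queue up front, so
 * that the requests for all the blocks are pipelined to the
 * pageserver(s) before we start waiting for the first response.
 */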
(void) prefetch_register_bufferv(hashkey.buftag, request_lsns, nblocks, mask, false);
for (int i = 0; i < nblocks; i++)
{
void *buffer = buffers[i];
BlockNumber blockno = base_blockno + i;
neon_request_lsns *reqlsns = &request_lsns[i];
TimestampTz start_ts, end_ts;
if (PointerIsValid(mask) && BITMAP_ISSET(mask, i))
continue;
start_ts = GetCurrentTimestamp();
if (RecoveryInProgress() && MyBackendType != B_STARTUP)
XLogWaitForReplayOf(reqlsns->request_lsn);
/*
* Try to find prefetched page in the list of received pages.
*/
Retry:
hashkey.buftag.blockNum = blockno;
entry = prfh_lookup(MyPState->prf_hash, &hashkey);
if (entry != NULL)
{
slot = entry->slot;
if (neon_prefetch_response_usable(reqlsns, slot))
{
ring_index = slot->my_ring_index;
}
else
{
/*
 * Cannot use this prefetch; discard it.
 *
 * We cannot drop the cached entry for a request that has not been
 * received yet, so wait for it first. That is unlikely, but it can
 * happen if the prefetch distance is large enough and the backend
 * didn't consume all of its prefetch requests.
 */
if (slot->status == PRFS_REQUESTED)
{
if (!prefetch_wait_for(slot->my_ring_index))
goto Retry;
}
/* drop caches */
prefetch_set_unused(slot->my_ring_index);
pgBufferUsage.prefetch.expired += 1;
MyNeonCounters->getpage_prefetch_discards_total++;
/* make it look like a prefetch cache miss */
entry = NULL;
}
}
do
{
if (entry == NULL)
{
ring_index = prefetch_register_bufferv(hashkey.buftag, reqlsns, 1, NULL, false);
Assert(ring_index != UINT64_MAX);
slot = GetPrfSlot(ring_index);
}
else
{
/*
* Empty our reference to the prefetch buffer's hash entry. When
* we wait for prefetches, the entry reference is invalidated by
* potential updates to the hash, and when we reconnect to the
* pageserver the prefetch we're waiting for may be dropped, in
* which case we need to retry and take the branch above.
*/
entry = NULL;
}
Assert(slot->my_ring_index == ring_index);
Assert(MyPState->ring_last <= ring_index &&
MyPState->ring_unused > ring_index);
Assert(slot->status != PRFS_UNUSED);
Assert(GetPrfSlot(ring_index) == slot);
} while (!prefetch_wait_for(ring_index));
Assert(slot->status == PRFS_RECEIVED);
Assert(memcmp(&hashkey.buftag, &slot->buftag, sizeof(BufferTag)) == 0);
Assert(hashkey.buftag.blockNum == base_blockno + i);
/* We already checked that the response matches the request when storing it in the slot */
resp = slot->response;
switch (resp->tag)
{
case T_NeonGetPageResponse:
{
NeonGetPageResponse* getpage_resp = (NeonGetPageResponse *) resp;
memcpy(buffer, getpage_resp->page, BLCKSZ);
/*
 * With lfc_store_prefetch_result=true, the prefetch result was already
 * stored in the LFC in prefetch_pump_state when the response was
 * received from the page server. With lfc_store_prefetch_result=false
 * it has not been stored yet, so we have to do it here, under the
 * buffer lock.
 */
if (!lfc_store_prefetch_result)
lfc_write(rinfo, forkNum, blockno, buffer);
break;
}
case T_NeonErrorResponse:
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg(NEON_TAG "[shard %d, reqid " UINT64_HEX_FORMAT "] could not read block %u in rel %u/%u/%u.%u from page server at lsn %X/%08X",
slot->shard_no, resp->reqid, blockno, RelFileInfoFmt(rinfo),
forkNum, LSN_FORMAT_ARGS(reqlsns->effective_request_lsn)),
errdetail("page server returned error: %s",
((NeonErrorResponse *) resp)->message)));
break;
default:
NEON_PANIC_CONNECTION_STATE(slot->shard_no, PANIC,
"Expected GetPage (0x%02x) or Error (0x%02x) response to GetPageRequest, but got 0x%02x",
T_NeonGetPageResponse, T_NeonErrorResponse, resp->tag);
}
/* buffer was used, clean up for later reuse */
prefetch_set_unused(ring_index);
prefetch_cleanup_trailing_unused();
end_ts = GetCurrentTimestamp();
inc_getpage_wait(end_ts >= start_ts ? (end_ts - start_ts) : 0);
}
}
/*
 * communicator_nblocks() -- Get the number of blocks stored in a relation.
*/
BlockNumber
communicator_nblocks(NRelFileInfo rinfo, ForkNumber forknum, neon_request_lsns *request_lsns)
{
NeonResponse *resp;
BlockNumber n_blocks;
{
NeonNblocksRequest request = {
.hdr.tag = T_NeonNblocksRequest,
.hdr.lsn = request_lsns->request_lsn,
.hdr.not_modified_since = request_lsns->not_modified_since,
.rinfo = rinfo,
.forknum = forknum,
};
resp = page_server_request(&request);
switch (resp->tag)
{
case T_NeonNblocksResponse:
{
NeonNblocksResponse * relsize_resp = (NeonNblocksResponse *) resp;
if (neon_protocol_version >= 3)
{
if (!equal_requests(resp, &request.hdr) ||
!RelFileInfoEquals(relsize_resp->req.rinfo, request.rinfo) ||
relsize_resp->req.forknum != forknum)
{
NEON_PANIC_CONNECTION_STATE(0, PANIC,
"Unexpect response {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u} to get relsize request {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), RelFileInfoFmt(relsize_resp->req.rinfo), relsize_resp->req.forknum,
request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since), RelFileInfoFmt(request.rinfo), forknum);
}
}
n_blocks = relsize_resp->n_blocks;
break;
}
case T_NeonErrorResponse:
if (neon_protocol_version >= 3)
{
if (!equal_requests(resp, &request.hdr))
{
elog(WARNING, NEON_TAG "Error message {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X} doesn't match get relsize request {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since),
request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since));
}
}
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg(NEON_TAG "[reqid " UINT64_HEX_FORMAT "] could not read relation size of rel %u/%u/%u.%u from page server at lsn %X/%08X",
resp->reqid,
RelFileInfoFmt(rinfo),
forknum,
LSN_FORMAT_ARGS(request_lsns->effective_request_lsn)),
errdetail("page server returned error: %s",
((NeonErrorResponse *) resp)->message)));
break;
default:
NEON_PANIC_CONNECTION_STATE(0, PANIC,
"Expected Nblocks (0x%02x) or Error (0x%02x) response to NblocksRequest, but got 0x%02x",
T_NeonNblocksResponse, T_NeonErrorResponse, resp->tag);
}
pfree(resp);
}
return n_blocks;
}
/*
 * communicator_dbsize() -- Get the size of the database in bytes.
*/
int64
communicator_dbsize(Oid dbNode, neon_request_lsns *request_lsns)
{
NeonResponse *resp;
int64 db_size;
{
NeonDbSizeRequest request = {
.hdr.tag = T_NeonDbSizeRequest,
.hdr.lsn = request_lsns->request_lsn,
.hdr.not_modified_since = request_lsns->not_modified_since,
.dbNode = dbNode,
};
resp = page_server_request(&request);
switch (resp->tag)
{
case T_NeonDbSizeResponse:
{
NeonDbSizeResponse* dbsize_resp = (NeonDbSizeResponse *) resp;
if (neon_protocol_version >= 3)
{
if (!equal_requests(resp, &request.hdr) ||
dbsize_resp->req.dbNode != dbNode)
{
NEON_PANIC_CONNECTION_STATE(0, PANIC,
"Unexpect response {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X, dbNode=%u} to get DB size request {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X, dbNode=%u}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), dbsize_resp->req.dbNode,
request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since), dbNode);
}
}
db_size = dbsize_resp->db_size;
break;
}
case T_NeonErrorResponse:
if (neon_protocol_version >= 3)
{
if (!equal_requests(resp, &request.hdr))
{
elog(WARNING, NEON_TAG "Error message {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X} doesn't match get DB size request {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since),
request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since));
}
}
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg(NEON_TAG "[reqid " UINT64_HEX_FORMAT "] could not read db size of db %u from page server at lsn %X/%08X",
resp->reqid,
dbNode, LSN_FORMAT_ARGS(request_lsns->effective_request_lsn)),
errdetail("page server returned error: %s",
((NeonErrorResponse *) resp)->message)));
break;
default:
NEON_PANIC_CONNECTION_STATE(0, PANIC,
"Expected DbSize (0x%02x) or Error (0x%02x) response to DbSizeRequest, but got 0x%02x",
T_NeonDbSizeResponse, T_NeonErrorResponse, resp->tag);
}
pfree(resp);
}
return db_size;
}
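/*
 * communicator_read_slru_segment() -- Read one SLRU segment from the
 * pageserver.
 *
 * The segment's blocks are copied into 'buffer', which must be large
 * enough to hold a full segment. Returns the number of blocks read.
 */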
int
communicator_read_slru_segment(SlruKind kind, int64 segno, neon_request_lsns *request_lsns,
void *buffer)
{
int n_blocks;
shardno_t shard_no = 0; /* All SLRUs are at shard 0 */
NeonResponse *resp = NULL;
NeonGetSlruSegmentRequest request;
request = (NeonGetSlruSegmentRequest) {
.hdr.tag = T_NeonGetSlruSegmentRequest,
.hdr.lsn = request_lsns->request_lsn,
.hdr.not_modified_since = request_lsns->not_modified_since,
.kind = kind,
.segno = segno
};
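/*
 * Drain any pipelined prefetch responses first: the synchronous
 * send/receive below assumes that no other responses are in flight on
 * this connection.
 */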
consume_prefetch_responses();
PG_TRY();
{
before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
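/*
 * Retry until the request has been sent and a response received:
 * send() and flush() are retried until both succeed, and a NULL result
 * from receive() means the connection was lost and the request must be
 * re-sent.
 */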
do
{
while (!page_server->send(shard_no, &request.hdr) || !page_server->flush(shard_no));
resp = page_server->receive(shard_no);
} while (resp == NULL);
cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
}
PG_CATCH();
{
cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
/* Nothing should cancel disconnect: we should not leave connection in opaque state */
HOLD_INTERRUPTS();
page_server->disconnect(shard_no);
RESUME_INTERRUPTS();
PG_RE_THROW();
}
PG_END_TRY();
switch (resp->tag)
{
case T_NeonGetSlruSegmentResponse:
{
NeonGetSlruSegmentResponse* slru_resp = (NeonGetSlruSegmentResponse *) resp;
if (neon_protocol_version >= 3)
{
if (!equal_requests(resp, &request.hdr) ||
slru_resp->req.kind != kind ||
slru_resp->req.segno != segno)
{
NEON_PANIC_CONNECTION_STATE(0, PANIC,
"Unexpect response {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X, kind=%u, segno=%u} to get SLRU segment request {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X, kind=%u, segno=%lluu}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), slru_resp->req.kind, slru_resp->req.segno,
request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since), kind, (unsigned long long) segno);
}
}
n_blocks = slru_resp->n_blocks;
memcpy(buffer, slru_resp->data, n_blocks*BLCKSZ);
break;
}
case T_NeonErrorResponse:
if (neon_protocol_version >= 3)
{
if (!equal_requests(resp, &request.hdr))
{
elog(WARNING, NEON_TAG "Error message {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X} doesn't match get SLRU segment request {reqid=" UINT64_HEX_FORMAT ",lsn=%X/%08X, since=%X/%08X}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since),
request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since));
}
}
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg(NEON_TAG "[reqid " UINT64_HEX_FORMAT "] could not read SLRU %d segment %llu at lsn %X/%08X",
resp->reqid,
kind,
(unsigned long long) segno,
LSN_FORMAT_ARGS(request_lsns->request_lsn)),
errdetail("page server returned error: %s",
((NeonErrorResponse *) resp)->message)));
break;
default:
NEON_PANIC_CONNECTION_STATE(0, PANIC,
"Expected GetSlruSegment (0x%02x) or Error (0x%02x) response to GetSlruSegmentRequest, but got 0x%02x",
T_NeonGetSlruSegmentResponse, T_NeonErrorResponse, resp->tag);
}
pfree(resp);
communicator_reconfigure_timeout_if_needed();
return n_blocks;
}
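/*
 * Arm or disarm the timeout used to pump the prefetch state.
 *
 * The timeout is needed only while responses are still outstanding
 * (ring_receive != ring_unused) and readahead_getpage_pull_timeout_ms
 * is enabled; otherwise it is disarmed to avoid useless wakeups.
 */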
void
communicator_reconfigure_timeout_if_needed(void)
{
bool needs_set = MyPState->ring_receive != MyPState->ring_unused &&
!AmPrewarmWorker && /* do not pump prefetch state in prewarm worker */
readahead_getpage_pull_timeout_ms > 0;
if (needs_set != timeout_set)
{
/* The background writer doesn't (shouldn't) read any pages */
Assert(!AmBackgroundWriterProcess());
/* The checkpointer doesn't (shouldn't) read any pages */
Assert(!AmCheckpointerProcess());
if (unlikely(PS_TIMEOUT_ID == 0))
{
PS_TIMEOUT_ID = RegisterTimeout(USER_TIMEOUT, pagestore_timeout_handler);
}
if (needs_set)
{
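/*
 * PG14's timeout API has no repeating timeouts, so we arm a one-shot
 * timeout there; the handler marks it as expired, and it is re-armed
 * from communicator_processinterrupts() if still needed.
 */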
#if PG_MAJORVERSION_NUM <= 14
enable_timeout_after(PS_TIMEOUT_ID, readahead_getpage_pull_timeout_ms);
#else
enable_timeout_every(
PS_TIMEOUT_ID,
TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
readahead_getpage_pull_timeout_ms),
readahead_getpage_pull_timeout_ms
);
#endif
timeout_set = true;
}
else
{
Assert(timeout_set);
disable_timeout(PS_TIMEOUT_ID, false);
timeout_set = false;
}
}
}
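/*
 * Timeout callback, run from the signal handler. It only sets flags;
 * the actual work happens in communicator_processinterrupts(), from
 * the next CHECK_FOR_INTERRUPTS().
 */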
static void
pagestore_timeout_handler(void)
{
#if PG_MAJORVERSION_NUM <= 14
/*
 * PG14: Setting a repeating timeout is not possible, so signal here
 * that the timeout is no longer armed; the system will re-schedule it
 * later if it is still needed.
 */
timeout_set = false;
#endif
timeout_signaled = true;
InterruptPending = true;
}
/*
* Process new data received in our active PageStream sockets.
*
 * This relies on the invariant that all pipelined yet-to-be-received requests
 * are getPage requests managed by MyPState. That currently holds; any change
 * to that invariant will probably require adjustments here.
*/
static bool
communicator_processinterrupts(void)
{
if (timeout_signaled)
{
if (!readpage_reentrant_guard && readahead_getpage_pull_timeout_ms > 0)
communicator_prefetch_pump_state();
timeout_signaled = false;
communicator_reconfigure_timeout_if_needed();
}
if (!prev_interrupt_cb)
return false;
return prev_interrupt_cb();
}