Merge remote-tracking branch 'upstream/compute_sharding_support' into jcsp/sharding-pt1

This commit is contained in:
John Spray
2023-11-09 16:43:19 +00:00
5 changed files with 365 additions and 78 deletions

View File

@@ -27,12 +27,14 @@
#include "commands/defrem.h"
#include "miscadmin.h"
#include "utils/acl.h"
#include "utils/fmgrprotos.h"
#include "fmgr.h"
#include "utils/guc.h"
#include "port.h"
#include <curl/curl.h>
#include "utils/jsonb.h"
#include "libpq/crypt.h"
#include "pagestore_client.h"
static ProcessUtility_hook_type PreviousProcessUtilityHook = NULL;
@@ -222,6 +224,104 @@ ErrorWriteCallback(char *ptr, size_t size, size_t nmemb, void *userdata)
return nmemb;
}
static size_t
ResponseWriteCallback(char *ptr, size_t size, size_t nmemb, void *userdata)
{
appendBinaryStringInfo((StringInfo)userdata, ptr, size*nmemb);
return nmemb;
}
void
RequestShardMapFromControlPlane(ShardMap* shard_map)
{
shard_map->n_shards = 0;
if (!ConsoleURL)
{
elog(LOG, "ConsoleURL not set, skipping forwarding");
return;
}
StringInfoData resp;
initStringInfo(&resp);
curl_easy_setopt(CurlHandle, CURLOPT_CUSTOMREQUEST, "GET");
curl_easy_setopt(CurlHandle, CURLOPT_URL, ConsoleURL);
curl_easy_setopt(CurlHandle, CURLOPT_ERRORBUFFER, CurlErrorBuf);
curl_easy_setopt(CurlHandle, CURLOPT_TIMEOUT, 3L /* seconds */ );
curl_easy_setopt(CurlHandle, CURLOPT_WRITEDATA, &resp);
curl_easy_setopt(CurlHandle, CURLOPT_WRITEFUNCTION, ResponseWriteCallback);
const int num_retries = 5;
int curl_status;
for (int i = 0; i < num_retries; i++)
{
if ((curl_status = curl_easy_perform(CurlHandle)) == CURLE_OK)
break;
elog(LOG, "Curl request failed on attempt %d: %s", i, CurlErrorBuf);
pg_usleep(1000 * 1000);
}
if (curl_status != CURLE_OK)
{
curl_easy_cleanup(CurlHandle);
elog(ERROR, "Failed to perform curl request: %s", CurlErrorBuf);
}
else
{
long response_code;
if (curl_easy_getinfo(CurlHandle, CURLINFO_RESPONSE_CODE, &response_code) != CURLE_UNKNOWN_OPTION)
{
if (response_code != 200)
{
bool error_exists = resp.len != 0;
if(error_exists)
{
elog(ERROR,
"[PG_LLM] Received HTTP code %ld from OpenAI: %s",
response_code,
resp.data);
}
else
{
elog(ERROR,
"[PG_LLM] Received HTTP code %ld from OpenAI",
response_code);
}
}
}
curl_easy_cleanup(CurlHandle);
JsonbContainer *jsonb = (JsonbContainer *)DatumGetPointer(DirectFunctionCall1(jsonb_in, CStringGetDatum(resp.data)));
JsonbValue v;
JsonbIterator *it;
JsonbIteratorToken r;
it = JsonbIteratorInit(jsonb);
r = JsonbIteratorNext(&it, &v, true);
if (r != WJB_BEGIN_ARRAY)
elog(ERROR, "Array of connection strings expected");
while ((r = JsonbIteratorNext(&it, &v, true)) != WJB_DONE)
{
if (r != WJB_ELEM)
continue;
if (shard_map->n_shards >= MAX_SHARDS)
elog(ERROR, "Too many shards");
if (v.type != jbvString)
elog(ERROR, "Connection string expected");
strncpy(shard_map->shard_connstr[shard_map->n_shards++],
v.val.string.val,
MAX_PS_CONNSTR_LEN);
}
shard_map->update_counter += 1;
pfree(resp.data);
}
}
static void
SendDeltasToControlPlane()
{

View File

@@ -2,5 +2,6 @@
#define CONTROL_PLANE_CONNECTOR_H
void InitControlPlaneConnector();
void RequestShardMapFromControlPlane(ShardMap* shard_map);
#endif

View File

@@ -18,7 +18,9 @@
#include "fmgr.h"
#include "access/xlog.h"
#include "access/xlogutils.h"
#include "common/hashfn.h"
#include "storage/buf_internals.h"
#include "storage/ipc.h"
#include "c.h"
#include "libpq-fe.h"
@@ -32,22 +34,12 @@
#include "neon.h"
#include "walproposer.h"
#include "neon_utils.h"
#include "control_plane_connector.h"
#define PageStoreTrace DEBUG5
#define RECONNECT_INTERVAL_USEC 1000000
bool connected = false;
PGconn *pageserver_conn = NULL;
/*
* WaitEventSet containing:
* - WL_SOCKET_READABLE on pageserver_conn,
* - WL_LATCH_SET on MyLatch, and
* - WL_EXIT_ON_PM_DEATH.
*/
WaitEventSet *pageserver_conn_wes = NULL;
/* GUCs */
char *neon_timeline;
char *neon_tenant;
@@ -63,12 +55,136 @@ int max_reconnect_attempts = 60;
bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id) = NULL;
static bool pageserver_flush(void);
static void pageserver_disconnect(void);
static bool pageserver_flush(shardno_t shard_no);
static void pageserver_disconnect(shardno_t shard_no);
static pqsigfunc prev_signal_handler;
static shmem_startup_hook_type prev_shmem_startup_hook;
#if PG_VERSION_NUM>=150000
static shmem_request_hook_type prev_shmem_request_hook;
#endif
static ShardMap* shard_map;
static LWLockId shard_map_lock;
static size_t shard_map_update_counter;
typedef struct
{
/*
* connection for each shard
*/
PGconn *conn;
/*
* WaitEventSet containing:
* - WL_SOCKET_READABLE on pageserver_conn,
* - WL_LATCH_SET on MyLatch, and
* - WL_EXIT_ON_PM_DEATH.
*/
WaitEventSet *wes;
} PageServer;
static PageServer page_servers[MAX_SHARDS];
static void
psm_shmem_startup(void)
{
bool found;
if (prev_shmem_startup_hook)
{
prev_shmem_startup_hook();
}
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
shard_map = (ShardMap*)ShmemInitStruct("shard_map", sizeof(ShardMap), &found);
if (!found)
{
shard_map_lock = (LWLockId)GetNamedLWLockTranche("shard_map_lock");
shard_map->n_shards = 0;
shard_map->update_counter = 0;
}
LWLockRelease(AddinShmemInitLock);
}
static void
psm_shmem_request(void)
{
#if PG_VERSION_NUM>=150000
if (prev_shmem_request_hook)
prev_shmem_request_hook();
#endif
RequestAddinShmemSpace(sizeof(ShardMap));
RequestNamedLWLockTranche("shard_map_lock", 1);
}
static void
psm_init(void)
{
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = psm_shmem_startup;
#if PG_VERSION_NUM>=150000
prev_shmem_request_hook = shmem_request_hook;
shmem_request_hook = psm_shmem_request;
#else
psm_shmem_request();
#endif
}
shardno_t
get_shard_number(BufferTag* tag)
{
shardno_t shard_no;
uint32 hash;
#if PG_MAJORVERSION_NUM < 16
hash = murmurhash32(tag->rnode.spcNode);
hash_combine(hash, murmurhash32(tag->rnode.dbNode));
hash_combine(hash, murmurhash32(tag->rnode.relNode));
hash_combine(hash, murmurhash32(tag->blockNum/STRIPE_SIZE));
#else
hash = murmurhash32(tag->spcOid);
hash_combine(hash, murmurhash32(tag->dbOid));
hash_combine(hash, murmurhash32(tag->relNumber));
hash_combine(hash, murmurhash32(tag->blockNum/STRIPE_SIZE));
#endif
LWLockAcquire(shard_map_lock, LW_SHARED);
while (shard_map->n_shards == 0 || shard_map_update_counter != shard_map->update_counter)
{
/* Close all existed connections */
for (shard_no = 0; shard_no < shard_map->n_shards; shard_no++)
{
if (page_servers[shard_no].conn)
pageserver_disconnect(shard_no);
}
/* Request new shard map from control plane under exclusive lock */
LWLockRelease(shard_map_lock);
LWLockAcquire(shard_map_lock, LW_EXCLUSIVE);
if (shard_map->n_shards == 0)
{
if (*page_server_connstring)
{
shard_map->n_shards = 1;
strncpy(shard_map->shard_connstr[0], page_server_connstring, sizeof shard_map->shard_connstr[0]);
}
else
{
RequestShardMapFromControlPlane(shard_map);
}
shard_map_update_counter = shard_map->update_counter;
}
}
shard_no = hash % shard_map->n_shards;
LWLockRelease(shard_map_lock);
return shard_no;
}
static void
pageserver_sighup_handler(SIGNAL_ARGS)
{
@@ -77,19 +193,25 @@ pageserver_sighup_handler(SIGNAL_ARGS)
prev_signal_handler(postgres_signal_arg);
}
neon_log(LOG, "Received SIGHUP, disconnecting pageserver. New pageserver connstring is %s", page_server_connstring);
pageserver_disconnect();
/* force refetching shard map from control plane */
LWLockAcquire(shard_map_lock, LW_EXCLUSIVE);
shard_map->n_shards = 0;
LWLockRelease(shard_map_lock);
}
static bool
pageserver_connect(int elevel)
pageserver_connect(shardno_t shard_no, int elevel)
{
char *query;
int ret;
const char *keywords[3];
const char *values[3];
int n;
PGconn* conn;
WaitEventSet *wes;
Assert(!connected);
Assert(page_servers[shard_no].conn == NULL);
/*
* Connect using the connection string we got from the
@@ -110,19 +232,18 @@ pageserver_connect(int elevel)
n++;
}
keywords[n] = "dbname";
values[n] = page_server_connstring;
values[n] = shard_map->shard_connstr[shard_no];
n++;
keywords[n] = NULL;
values[n] = NULL;
n++;
pageserver_conn = PQconnectdbParams(keywords, values, 1);
conn = PQconnectdbParams(keywords, values, 1);
if (PQstatus(pageserver_conn) == CONNECTION_BAD)
if (PQstatus(conn) == CONNECTION_BAD)
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));
char *msg = pchomp(PQerrorMessage(conn));
PQfinish(pageserver_conn);
pageserver_conn = NULL;
PQfinish(conn);
ereport(elevel,
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
@@ -130,30 +251,28 @@ pageserver_connect(int elevel)
errdetail_internal("%s", msg)));
return false;
}
query = psprintf("pagestream %s %s", neon_tenant, neon_timeline);
ret = PQsendQuery(pageserver_conn, query);
ret = PQsendQuery(conn, query);
if (ret != 1)
{
PQfinish(pageserver_conn);
pageserver_conn = NULL;
PQfinish(conn);
neon_log(elevel, "could not send pagestream command to pageserver");
return false;
}
pageserver_conn_wes = CreateWaitEventSet(TopMemoryContext, 3);
AddWaitEventToSet(pageserver_conn_wes, WL_LATCH_SET, PGINVALID_SOCKET,
wes = CreateWaitEventSet(TopMemoryContext, 3);
AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET,
MyLatch, NULL);
AddWaitEventToSet(pageserver_conn_wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
AddWaitEventToSet(wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
NULL, NULL);
AddWaitEventToSet(pageserver_conn_wes, WL_SOCKET_READABLE, PQsocket(pageserver_conn), NULL, NULL);
AddWaitEventToSet(wes, WL_SOCKET_READABLE, PQsocket(conn), NULL, NULL);
while (PQisBusy(pageserver_conn))
while (PQisBusy(conn))
{
WaitEvent event;
/* Sleep until there's something to do */
(void) WaitEventSetWait(pageserver_conn_wes, -1L, &event, 1, PG_WAIT_EXTENSION);
(void) WaitEventSetWait(wes, -1L, &event, 1, PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
@@ -161,14 +280,12 @@ pageserver_connect(int elevel)
/* Data available in socket? */
if (event.events & WL_SOCKET_READABLE)
{
if (!PQconsumeInput(pageserver_conn))
if (!PQconsumeInput(conn))
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));
char *msg = pchomp(PQerrorMessage(conn));
PQfinish(pageserver_conn);
pageserver_conn = NULL;
FreeWaitEventSet(pageserver_conn_wes);
pageserver_conn_wes = NULL;
PQfinish(conn);
FreeWaitEventSet(wes);
neon_log(elevel, "could not complete handshake with pageserver: %s",
msg);
@@ -177,9 +294,10 @@ pageserver_connect(int elevel)
}
}
neon_log(LOG, "libpagestore: connected to '%s'", page_server_connstring);
neon_log(LOG, "libpagestore: connected to '%s'", shard_map->shard_connstr[shard_no]);
page_servers[shard_no].conn = conn;
page_servers[shard_no].wes = wes;
connected = true;
return true;
}
@@ -187,10 +305,10 @@ pageserver_connect(int elevel)
* A wrapper around PQgetCopyData that checks for interrupts while sleeping.
*/
static int
call_PQgetCopyData(char **buffer)
call_PQgetCopyData(shardno_t shard_no, char **buffer)
{
int ret;
PGconn* pageserver_conn = page_servers[shard_no].conn;
retry:
ret = PQgetCopyData(pageserver_conn, buffer, 1 /* async */ );
@@ -199,7 +317,7 @@ retry:
WaitEvent event;
/* Sleep until there's something to do */
(void) WaitEventSetWait(pageserver_conn_wes, -1L, &event, 1, PG_WAIT_EXTENSION);
(void) WaitEventSetWait(page_servers[shard_no].wes, -1L, &event, 1, PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
@@ -224,7 +342,7 @@ retry:
static void
pageserver_disconnect(void)
pageserver_disconnect(shardno_t shard_no)
{
/*
* If anything goes wrong while we were sending a request, it's not clear
@@ -233,32 +351,32 @@ pageserver_disconnect(void)
* time later after we have already sent a new unrelated request. Close
* the connection to avoid getting confused.
*/
if (connected)
if (page_servers[shard_no].conn)
{
neon_log(LOG, "dropping connection to page server due to error");
PQfinish(pageserver_conn);
pageserver_conn = NULL;
connected = false;
PQfinish(page_servers[shard_no].conn);
page_servers[shard_no].conn = NULL;
prefetch_on_ps_disconnect();
}
if (pageserver_conn_wes != NULL)
if (page_servers[shard_no].wes != NULL)
{
FreeWaitEventSet(pageserver_conn_wes);
pageserver_conn_wes = NULL;
FreeWaitEventSet(page_servers[shard_no].wes);
page_servers[shard_no].wes = NULL;
}
}
static bool
pageserver_send(NeonRequest * request)
pageserver_send(shardno_t shard_no, NeonRequest * request)
{
StringInfoData req_buff;
PGconn* pageserver_conn = page_servers[shard_no].conn;
/* If the connection was lost for some reason, reconnect */
if (connected && PQstatus(pageserver_conn) == CONNECTION_BAD)
if (pageserver_conn && PQstatus(pageserver_conn) == CONNECTION_BAD)
{
neon_log(LOG, "pageserver_send disconnect bad connection");
pageserver_disconnect();
pageserver_disconnect(shard_no);
}
req_buff = nm_pack_request(request);
@@ -270,9 +388,9 @@ pageserver_send(NeonRequest * request)
* See https://github.com/neondatabase/neon/issues/1138
* So try to reestablish connection in case of failure.
*/
if (!connected)
if (!page_servers[shard_no].conn)
{
while (!pageserver_connect(n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR))
while (!pageserver_connect(shard_no, n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR))
{
n_reconnect_attempts += 1;
pg_usleep(RECONNECT_INTERVAL_USEC);
@@ -280,7 +398,9 @@ pageserver_send(NeonRequest * request)
n_reconnect_attempts = 0;
}
/*
pageserver_conn = page_servers[shard_no].conn;
/*
* Send request.
*
* In principle, this could block if the output buffer is full, and we
@@ -291,7 +411,7 @@ pageserver_send(NeonRequest * request)
if (PQputCopyData(pageserver_conn, req_buff.data, req_buff.len) <= 0)
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));
pageserver_disconnect();
pageserver_disconnect(shard_no);
neon_log(LOG, "pageserver_send disconnect because failed to send page request (try to reconnect): %s", msg);
pfree(msg);
pfree(req_buff.data);
@@ -311,12 +431,12 @@ pageserver_send(NeonRequest * request)
}
static NeonResponse *
pageserver_receive(void)
pageserver_receive(shardno_t shard_no)
{
StringInfoData resp_buff;
NeonResponse *resp;
if (!connected)
PGconn* pageserver_conn = page_servers[shard_no].conn;
if (!pageserver_conn)
return NULL;
PG_TRY();
@@ -324,7 +444,7 @@ pageserver_receive(void)
/* read response */
int rc;
rc = call_PQgetCopyData(&resp_buff.data);
rc = call_PQgetCopyData(shard_no, &resp_buff.data);
if (rc >= 0)
{
resp_buff.len = rc;
@@ -343,25 +463,25 @@ pageserver_receive(void)
else if (rc == -1)
{
neon_log(LOG, "pageserver_receive disconnect because call_PQgetCopyData returns -1: %s", pchomp(PQerrorMessage(pageserver_conn)));
pageserver_disconnect();
pageserver_disconnect(shard_no);
resp = NULL;
}
else if (rc == -2)
{
char* msg = pchomp(PQerrorMessage(pageserver_conn));
pageserver_disconnect();
pageserver_disconnect(shard_no);
neon_log(ERROR, "pageserver_receive disconnect because could not read COPY data: %s", msg);
}
else
{
pageserver_disconnect();
pageserver_disconnect(shard_no);
neon_log(ERROR, "pageserver_receive disconnect because unexpected PQgetCopyData return value: %d", rc);
}
}
PG_CATCH();
{
neon_log(LOG, "pageserver_receive disconnect due to caught exception");
pageserver_disconnect();
pageserver_disconnect(shard_no);
PG_RE_THROW();
}
PG_END_TRY();
@@ -371,9 +491,10 @@ pageserver_receive(void)
static bool
pageserver_flush(void)
pageserver_flush(shardno_t shard_no)
{
if (!connected)
PGconn* pageserver_conn = page_servers[shard_no].conn;
if (!pageserver_conn)
{
neon_log(WARNING, "Tried to flush while disconnected");
}
@@ -382,7 +503,7 @@ pageserver_flush(void)
if (PQflush(pageserver_conn))
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));
pageserver_disconnect();
pageserver_disconnect(shard_no);
neon_log(LOG, "pageserver_flush disconnect because failed to flush page requests: %s", msg);
pfree(msg);
return false;
@@ -502,4 +623,5 @@ pg_init_libpagestore(void)
prev_signal_handler = pqsignal(SIGHUP, pageserver_sighup_handler);
lfc_init();
psm_init();
}

View File

@@ -20,12 +20,25 @@
#include RELFILEINFO_HDR
#include "storage/block.h"
#include "storage/smgr.h"
#include "storage/buf_internals.h"
#include "lib/stringinfo.h"
#include "libpq/pqformat.h"
#include "utils/memutils.h"
#include "pg_config.h"
#define MAX_SHARDS 128
#define STRIPE_SIZE (256 * 1024 / 8) /* TODO: should in betaken from control plane? */
#define MAX_PS_CONNSTR_LEN 128
typedef struct
{
size_t n_shards;
size_t update_counter;
char shard_connstr[MAX_SHARDS][MAX_PS_CONNSTR_LEN];
} ShardMap;
typedef enum
{
/* pagestore_client -> pagestore */
@@ -144,11 +157,13 @@ extern char *nm_to_string(NeonMessage * msg);
* API
*/
typedef unsigned shardno_t;
typedef struct
{
bool (*send) (NeonRequest * request);
NeonResponse *(*receive) (void);
bool (*flush) (void);
bool (*send) (shardno_t shard_no, NeonRequest * request);
NeonResponse *(*receive) (shardno_t shard_no);
bool (*flush) (shardno_t shard_no);
} page_server_api;
extern void prefetch_on_ps_disconnect(void);
@@ -165,6 +180,8 @@ extern char *neon_tenant;
extern bool wal_redo;
extern int32 max_cluster_size;
extern shardno_t get_shard_number(BufferTag* tag);
extern const f_smgr *smgr_neon(BackendId backend, NRelFileInfo rinfo);
extern void smgr_init_neon(void);
extern void readahead_buffer_resize(int newsize, void *extra);

View File

@@ -164,6 +164,7 @@ typedef struct PrefetchRequest {
XLogRecPtr actual_request_lsn;
NeonResponse *response; /* may be null */
PrefetchStatus status;
shardno_t shard_no;
uint64 my_ring_index;
} PrefetchRequest;
@@ -225,6 +226,8 @@ typedef struct PrefetchState {
/* the buffers */
prfh_hash *prf_hash;
int max_shard_no;
uint8 shard_bitmap[(MAX_SHARDS + 7)/8];
PrefetchRequest prf_buffer[]; /* prefetch buffers */
} PrefetchState;
@@ -313,6 +316,7 @@ compact_prefetch_buffers(void)
Assert(target_slot->status == PRFS_UNUSED);
target_slot->buftag = source_slot->buftag;
target_slot->shard_no = source_slot->shard_no;
target_slot->status = source_slot->status;
target_slot->response = source_slot->response;
target_slot->effective_request_lsn = source_slot->effective_request_lsn;
@@ -477,6 +481,23 @@ prefetch_cleanup_trailing_unused(void)
}
}
static bool
prefetch_flush_requests(void)
{
for (shardno_t shard_no = 0; shard_no < MyPState->max_shard_no; shard_no++)
{
if (MyPState->shard_bitmap[shard_no >> 3] & (1 << (shard_no & 7)))
{
if (!page_server->flush(shard_no))
return false;
MyPState->shard_bitmap[shard_no >> 3] &= ~(1 << (shard_no & 7));
}
}
MyPState->max_shard_no = 0;
return true;
}
/*
* Wait for slot of ring_index to have received its response.
* The caller is responsible for making sure the request buffer is flushed.
@@ -492,7 +513,7 @@ prefetch_wait_for(uint64 ring_index)
if (MyPState->ring_flush <= ring_index &&
MyPState->ring_unused > MyPState->ring_flush)
{
if (!page_server->flush())
if (!prefetch_flush_requests())
return false;
MyPState->ring_flush = MyPState->ring_unused;
}
@@ -530,7 +551,7 @@ prefetch_read(PrefetchRequest *slot)
Assert(slot->my_ring_index == MyPState->ring_receive);
old = MemoryContextSwitchTo(MyPState->errctx);
response = (NeonResponse *) page_server->receive();
response = (NeonResponse *) page_server->receive(slot->shard_no);
MemoryContextSwitchTo(old);
if (response)
{
@@ -682,12 +703,14 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force
Assert(slot->response == NULL);
Assert(slot->my_ring_index == MyPState->ring_unused);
while (!page_server->send((NeonRequest *) &request));
while (!page_server->send(slot->shard_no, (NeonRequest *) &request));
/* update prefetch state */
MyPState->n_requests_inflight += 1;
MyPState->n_unused -= 1;
MyPState->ring_unused += 1;
MyPState->shard_bitmap[slot->shard_no >> 3] |= 1 << (slot->shard_no & 7);
MyPState->max_shard_no = Max(slot->shard_no+1, MyPState->max_shard_no);
/* update slot state */
slot->status = PRFS_REQUESTED;
@@ -847,6 +870,7 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls
* function reads the buffer tag from the slot.
*/
slot->buftag = tag;
slot->shard_no = get_shard_number(&tag);
slot->my_ring_index = ring_index;
prefetch_do_request(slot, force_latest, force_lsn);
@@ -857,7 +881,7 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls
if (flush_every_n_requests > 0 &&
MyPState->ring_unused - MyPState->ring_flush >= flush_every_n_requests)
{
if (!page_server->flush())
if (!prefetch_flush_requests())
{
/* Prefetch set is reset in case of error, so we should try to register our request once again */
goto Retry;
@@ -872,11 +896,34 @@ static NeonResponse *
page_server_request(void const *req)
{
NeonResponse* resp;
BufferTag tag = {0};
shardno_t shard_no;
switch (((NeonRequest *) req)->tag)
{
case T_NeonExistsRequest:
CopyNRelFileInfoToBufTag(tag, ((NeonExistsRequest *) req)->rinfo);
break;
case T_NeonNblocksRequest:
CopyNRelFileInfoToBufTag(tag, ((NeonNblocksRequest *) req)->rinfo);
break;
case T_NeonDbSizeRequest:
NInfoGetDbOid(BufTagGetNRelFileInfo(tag)) = ((NeonDbSizeRequest *) req)->dbNode;
break;
case T_NeonGetPageRequest:
CopyNRelFileInfoToBufTag(tag, ((NeonGetPageRequest *) req)->rinfo);
tag.blockNum = ((NeonGetPageRequest *) req)->blkno;
break;
default:
elog(ERROR, "Unexpected request tag: %d", ((NeonRequest *) req)->tag);
}
shard_no = get_shard_number(&tag);
do {
while (!page_server->send((NeonRequest *) req) || !page_server->flush());
while (!page_server->send(shard_no, (NeonRequest *) req) || !page_server->flush(shard_no));
MyPState->ring_flush = MyPState->ring_unused;
consume_prefetch_responses();
resp = page_server->receive();
resp = page_server->receive(shard_no);
} while (resp == NULL);
return resp;