diff --git a/pgxn/neon/control_plane_connector.c b/pgxn/neon/control_plane_connector.c index debbbce117..abb350e777 100644 --- a/pgxn/neon/control_plane_connector.c +++ b/pgxn/neon/control_plane_connector.c @@ -27,12 +27,14 @@ #include "commands/defrem.h" #include "miscadmin.h" #include "utils/acl.h" +#include "utils/fmgrprotos.h" #include "fmgr.h" #include "utils/guc.h" #include "port.h" #include #include "utils/jsonb.h" #include "libpq/crypt.h" +#include "pagestore_client.h" static ProcessUtility_hook_type PreviousProcessUtilityHook = NULL; @@ -222,6 +224,104 @@ ErrorWriteCallback(char *ptr, size_t size, size_t nmemb, void *userdata) return nmemb; } + +static size_t +ResponseWriteCallback(char *ptr, size_t size, size_t nmemb, void *userdata) +{ + appendBinaryStringInfo((StringInfo)userdata, ptr, size*nmemb); + return nmemb; +} + +void +RequestShardMapFromControlPlane(ShardMap* shard_map) +{ + shard_map->n_shards = 0; + if (!ConsoleURL) + { + elog(LOG, "ConsoleURL not set, skipping forwarding"); + return; + } + StringInfoData resp; + initStringInfo(&resp); + + curl_easy_setopt(CurlHandle, CURLOPT_CUSTOMREQUEST, "GET"); + curl_easy_setopt(CurlHandle, CURLOPT_URL, ConsoleURL); + curl_easy_setopt(CurlHandle, CURLOPT_ERRORBUFFER, CurlErrorBuf); + curl_easy_setopt(CurlHandle, CURLOPT_TIMEOUT, 3L /* seconds */ ); + curl_easy_setopt(CurlHandle, CURLOPT_WRITEDATA, &resp); + curl_easy_setopt(CurlHandle, CURLOPT_WRITEFUNCTION, ResponseWriteCallback); + + const int num_retries = 5; + int curl_status; + + for (int i = 0; i < num_retries; i++) + { + if ((curl_status = curl_easy_perform(CurlHandle)) == CURLE_OK) + break; + elog(LOG, "Curl request failed on attempt %d: %s", i, CurlErrorBuf); + pg_usleep(1000 * 1000); + } + if (curl_status != CURLE_OK) + { + curl_easy_cleanup(CurlHandle); + elog(ERROR, "Failed to perform curl request: %s", CurlErrorBuf); + } + else + { + long response_code; + if (curl_easy_getinfo(CurlHandle, CURLINFO_RESPONSE_CODE, &response_code) != CURLE_UNKNOWN_OPTION) + { + if (response_code != 200) + { + bool error_exists = resp.len != 0; + if(error_exists) + { + elog(ERROR, + "[PG_LLM] Received HTTP code %ld from OpenAI: %s", + response_code, + resp.data); + } + else + { + elog(ERROR, + "[PG_LLM] Received HTTP code %ld from OpenAI", + response_code); + } + } + } + curl_easy_cleanup(CurlHandle); + + JsonbContainer *jsonb = (JsonbContainer *)DatumGetPointer(DirectFunctionCall1(jsonb_in, CStringGetDatum(resp.data))); + JsonbValue v; + JsonbIterator *it; + JsonbIteratorToken r; + + it = JsonbIteratorInit(jsonb); + r = JsonbIteratorNext(&it, &v, true); + if (r != WJB_BEGIN_ARRAY) + elog(ERROR, "Array of connection strings expected"); + + while ((r = JsonbIteratorNext(&it, &v, true)) != WJB_DONE) + { + if (r != WJB_ELEM) + continue; + + if (shard_map->n_shards >= MAX_SHARDS) + elog(ERROR, "Too many shards"); + + if (v.type != jbvString) + elog(ERROR, "Connection string expected"); + + strncpy(shard_map->shard_connstr[shard_map->n_shards++], + v.val.string.val, + MAX_PS_CONNSTR_LEN); + } + shard_map->update_counter += 1; + pfree(resp.data); + } +} + + static void SendDeltasToControlPlane() { diff --git a/pgxn/neon/control_plane_connector.h b/pgxn/neon/control_plane_connector.h index 12d6a97562..9c6e2b42a5 100644 --- a/pgxn/neon/control_plane_connector.h +++ b/pgxn/neon/control_plane_connector.h @@ -2,5 +2,6 @@ #define CONTROL_PLANE_CONNECTOR_H void InitControlPlaneConnector(); +void RequestShardMapFromControlPlane(ShardMap* shard_map); #endif diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index 0a944a6bf0..701c5f61e7 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -18,7 +18,9 @@ #include "fmgr.h" #include "access/xlog.h" #include "access/xlogutils.h" +#include "common/hashfn.h" #include "storage/buf_internals.h" +#include "storage/ipc.h" #include "c.h" #include "libpq-fe.h" @@ -32,22 +34,12 @@ #include "neon.h" #include "walproposer.h" #include "neon_utils.h" +#include "control_plane_connector.h" #define PageStoreTrace DEBUG5 #define RECONNECT_INTERVAL_USEC 1000000 -bool connected = false; -PGconn *pageserver_conn = NULL; - -/* - * WaitEventSet containing: - * - WL_SOCKET_READABLE on pageserver_conn, - * - WL_LATCH_SET on MyLatch, and - * - WL_EXIT_ON_PM_DEATH. - */ -WaitEventSet *pageserver_conn_wes = NULL; - /* GUCs */ char *neon_timeline; char *neon_tenant; @@ -63,12 +55,136 @@ int max_reconnect_attempts = 60; bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id) = NULL; -static bool pageserver_flush(void); -static void pageserver_disconnect(void); +static bool pageserver_flush(shardno_t shard_no); +static void pageserver_disconnect(shardno_t shard_no); static pqsigfunc prev_signal_handler; +static shmem_startup_hook_type prev_shmem_startup_hook; +#if PG_VERSION_NUM>=150000 +static shmem_request_hook_type prev_shmem_request_hook; +#endif + +static ShardMap* shard_map; +static LWLockId shard_map_lock; +static size_t shard_map_update_counter; + +typedef struct +{ + /* + * connection for each shard + */ + PGconn *conn; + /* + * WaitEventSet containing: + * - WL_SOCKET_READABLE on pageserver_conn, + * - WL_LATCH_SET on MyLatch, and + * - WL_EXIT_ON_PM_DEATH. + */ + WaitEventSet *wes; +} PageServer; + +static PageServer page_servers[MAX_SHARDS]; + +static void +psm_shmem_startup(void) +{ + bool found; + if (prev_shmem_startup_hook) + { + prev_shmem_startup_hook(); + } + + LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); + + shard_map = (ShardMap*)ShmemInitStruct("shard_map", sizeof(ShardMap), &found); + if (!found) + { + shard_map_lock = (LWLockId)GetNamedLWLockTranche("shard_map_lock"); + shard_map->n_shards = 0; + shard_map->update_counter = 0; + } + LWLockRelease(AddinShmemInitLock); +} + +static void +psm_shmem_request(void) +{ +#if PG_VERSION_NUM>=150000 + if (prev_shmem_request_hook) + prev_shmem_request_hook(); +#endif + + RequestAddinShmemSpace(sizeof(ShardMap)); + RequestNamedLWLockTranche("shard_map_lock", 1); +} + +static void +psm_init(void) +{ + prev_shmem_startup_hook = shmem_startup_hook; + shmem_startup_hook = psm_shmem_startup; +#if PG_VERSION_NUM>=150000 + prev_shmem_request_hook = shmem_request_hook; + shmem_request_hook = psm_shmem_request; +#else + psm_shmem_request(); +#endif +} + +shardno_t +get_shard_number(BufferTag* tag) +{ + shardno_t shard_no; + uint32 hash; + +#if PG_MAJORVERSION_NUM < 16 + hash = murmurhash32(tag->rnode.spcNode); + hash_combine(hash, murmurhash32(tag->rnode.dbNode)); + hash_combine(hash, murmurhash32(tag->rnode.relNode)); + hash_combine(hash, murmurhash32(tag->blockNum/STRIPE_SIZE)); +#else + hash = murmurhash32(tag->spcOid); + hash_combine(hash, murmurhash32(tag->dbOid)); + hash_combine(hash, murmurhash32(tag->relNumber)); + hash_combine(hash, murmurhash32(tag->blockNum/STRIPE_SIZE)); +#endif + + LWLockAcquire(shard_map_lock, LW_SHARED); + while (shard_map->n_shards == 0 || shard_map_update_counter != shard_map->update_counter) + { + /* Close all existed connections */ + for (shard_no = 0; shard_no < shard_map->n_shards; shard_no++) + { + if (page_servers[shard_no].conn) + pageserver_disconnect(shard_no); + } + + /* Request new shard map from control plane under exclusive lock */ + LWLockRelease(shard_map_lock); + LWLockAcquire(shard_map_lock, LW_EXCLUSIVE); + if (shard_map->n_shards == 0) + { + if (*page_server_connstring) + { + shard_map->n_shards = 1; + strncpy(shard_map->shard_connstr[0], page_server_connstring, sizeof shard_map->shard_connstr[0]); + } + else + { + RequestShardMapFromControlPlane(shard_map); + } + shard_map_update_counter = shard_map->update_counter; + } + } + shard_no = hash % shard_map->n_shards; + + LWLockRelease(shard_map_lock); + + return shard_no; +} + static void pageserver_sighup_handler(SIGNAL_ARGS) { @@ -77,19 +193,25 @@ pageserver_sighup_handler(SIGNAL_ARGS) prev_signal_handler(postgres_signal_arg); } neon_log(LOG, "Received SIGHUP, disconnecting pageserver. New pageserver connstring is %s", page_server_connstring); - pageserver_disconnect(); + + /* force refetching shard map from control plane */ + LWLockAcquire(shard_map_lock, LW_EXCLUSIVE); + shard_map->n_shards = 0; + LWLockRelease(shard_map_lock); } static bool -pageserver_connect(int elevel) +pageserver_connect(shardno_t shard_no, int elevel) { char *query; int ret; const char *keywords[3]; const char *values[3]; int n; + PGconn* conn; + WaitEventSet *wes; - Assert(!connected); + Assert(page_servers[shard_no].conn == NULL); /* * Connect using the connection string we got from the @@ -110,19 +232,18 @@ pageserver_connect(int elevel) n++; } keywords[n] = "dbname"; - values[n] = page_server_connstring; + values[n] = shard_map->shard_connstr[shard_no]; n++; keywords[n] = NULL; values[n] = NULL; n++; - pageserver_conn = PQconnectdbParams(keywords, values, 1); + conn = PQconnectdbParams(keywords, values, 1); - if (PQstatus(pageserver_conn) == CONNECTION_BAD) + if (PQstatus(conn) == CONNECTION_BAD) { - char *msg = pchomp(PQerrorMessage(pageserver_conn)); + char *msg = pchomp(PQerrorMessage(conn)); - PQfinish(pageserver_conn); - pageserver_conn = NULL; + PQfinish(conn); ereport(elevel, (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), @@ -130,30 +251,28 @@ pageserver_connect(int elevel) errdetail_internal("%s", msg))); return false; } - query = psprintf("pagestream %s %s", neon_tenant, neon_timeline); - ret = PQsendQuery(pageserver_conn, query); + ret = PQsendQuery(conn, query); if (ret != 1) { - PQfinish(pageserver_conn); - pageserver_conn = NULL; + PQfinish(conn); neon_log(elevel, "could not send pagestream command to pageserver"); return false; } - pageserver_conn_wes = CreateWaitEventSet(TopMemoryContext, 3); - AddWaitEventToSet(pageserver_conn_wes, WL_LATCH_SET, PGINVALID_SOCKET, + wes = CreateWaitEventSet(TopMemoryContext, 3); + AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL); - AddWaitEventToSet(pageserver_conn_wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, + AddWaitEventToSet(wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, NULL, NULL); - AddWaitEventToSet(pageserver_conn_wes, WL_SOCKET_READABLE, PQsocket(pageserver_conn), NULL, NULL); + AddWaitEventToSet(wes, WL_SOCKET_READABLE, PQsocket(conn), NULL, NULL); - while (PQisBusy(pageserver_conn)) + while (PQisBusy(conn)) { WaitEvent event; /* Sleep until there's something to do */ - (void) WaitEventSetWait(pageserver_conn_wes, -1L, &event, 1, PG_WAIT_EXTENSION); + (void) WaitEventSetWait(wes, -1L, &event, 1, PG_WAIT_EXTENSION); ResetLatch(MyLatch); CHECK_FOR_INTERRUPTS(); @@ -161,14 +280,12 @@ pageserver_connect(int elevel) /* Data available in socket? */ if (event.events & WL_SOCKET_READABLE) { - if (!PQconsumeInput(pageserver_conn)) + if (!PQconsumeInput(conn)) { - char *msg = pchomp(PQerrorMessage(pageserver_conn)); + char *msg = pchomp(PQerrorMessage(conn)); - PQfinish(pageserver_conn); - pageserver_conn = NULL; - FreeWaitEventSet(pageserver_conn_wes); - pageserver_conn_wes = NULL; + PQfinish(conn); + FreeWaitEventSet(wes); neon_log(elevel, "could not complete handshake with pageserver: %s", msg); @@ -177,9 +294,10 @@ pageserver_connect(int elevel) } } - neon_log(LOG, "libpagestore: connected to '%s'", page_server_connstring); + neon_log(LOG, "libpagestore: connected to '%s'", shard_map->shard_connstr[shard_no]); + page_servers[shard_no].conn = conn; + page_servers[shard_no].wes = wes; - connected = true; return true; } @@ -187,10 +305,10 @@ pageserver_connect(int elevel) * A wrapper around PQgetCopyData that checks for interrupts while sleeping. */ static int -call_PQgetCopyData(char **buffer) +call_PQgetCopyData(shardno_t shard_no, char **buffer) { int ret; - + PGconn* pageserver_conn = page_servers[shard_no].conn; retry: ret = PQgetCopyData(pageserver_conn, buffer, 1 /* async */ ); @@ -199,7 +317,7 @@ retry: WaitEvent event; /* Sleep until there's something to do */ - (void) WaitEventSetWait(pageserver_conn_wes, -1L, &event, 1, PG_WAIT_EXTENSION); + (void) WaitEventSetWait(page_servers[shard_no].wes, -1L, &event, 1, PG_WAIT_EXTENSION); ResetLatch(MyLatch); CHECK_FOR_INTERRUPTS(); @@ -224,7 +342,7 @@ retry: static void -pageserver_disconnect(void) +pageserver_disconnect(shardno_t shard_no) { /* * If anything goes wrong while we were sending a request, it's not clear @@ -233,32 +351,32 @@ pageserver_disconnect(void) * time later after we have already sent a new unrelated request. Close * the connection to avoid getting confused. */ - if (connected) + if (page_servers[shard_no].conn) { neon_log(LOG, "dropping connection to page server due to error"); - PQfinish(pageserver_conn); - pageserver_conn = NULL; - connected = false; + PQfinish(page_servers[shard_no].conn); + page_servers[shard_no].conn = NULL; prefetch_on_ps_disconnect(); } - if (pageserver_conn_wes != NULL) + if (page_servers[shard_no].wes != NULL) { - FreeWaitEventSet(pageserver_conn_wes); - pageserver_conn_wes = NULL; + FreeWaitEventSet(page_servers[shard_no].wes); + page_servers[shard_no].wes = NULL; } } static bool -pageserver_send(NeonRequest * request) +pageserver_send(shardno_t shard_no, NeonRequest * request) { StringInfoData req_buff; + PGconn* pageserver_conn = page_servers[shard_no].conn; /* If the connection was lost for some reason, reconnect */ - if (connected && PQstatus(pageserver_conn) == CONNECTION_BAD) + if (pageserver_conn && PQstatus(pageserver_conn) == CONNECTION_BAD) { neon_log(LOG, "pageserver_send disconnect bad connection"); - pageserver_disconnect(); + pageserver_disconnect(shard_no); } req_buff = nm_pack_request(request); @@ -270,9 +388,9 @@ pageserver_send(NeonRequest * request) * See https://github.com/neondatabase/neon/issues/1138 * So try to reestablish connection in case of failure. */ - if (!connected) + if (!page_servers[shard_no].conn) { - while (!pageserver_connect(n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR)) + while (!pageserver_connect(shard_no, n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR)) { n_reconnect_attempts += 1; pg_usleep(RECONNECT_INTERVAL_USEC); @@ -280,7 +398,9 @@ pageserver_send(NeonRequest * request) n_reconnect_attempts = 0; } - /* + pageserver_conn = page_servers[shard_no].conn; + + /* * Send request. * * In principle, this could block if the output buffer is full, and we @@ -291,7 +411,7 @@ pageserver_send(NeonRequest * request) if (PQputCopyData(pageserver_conn, req_buff.data, req_buff.len) <= 0) { char *msg = pchomp(PQerrorMessage(pageserver_conn)); - pageserver_disconnect(); + pageserver_disconnect(shard_no); neon_log(LOG, "pageserver_send disconnect because failed to send page request (try to reconnect): %s", msg); pfree(msg); pfree(req_buff.data); @@ -311,12 +431,12 @@ pageserver_send(NeonRequest * request) } static NeonResponse * -pageserver_receive(void) +pageserver_receive(shardno_t shard_no) { StringInfoData resp_buff; NeonResponse *resp; - - if (!connected) + PGconn* pageserver_conn = page_servers[shard_no].conn; + if (!pageserver_conn) return NULL; PG_TRY(); @@ -324,7 +444,7 @@ pageserver_receive(void) /* read response */ int rc; - rc = call_PQgetCopyData(&resp_buff.data); + rc = call_PQgetCopyData(shard_no, &resp_buff.data); if (rc >= 0) { resp_buff.len = rc; @@ -343,25 +463,25 @@ pageserver_receive(void) else if (rc == -1) { neon_log(LOG, "pageserver_receive disconnect because call_PQgetCopyData returns -1: %s", pchomp(PQerrorMessage(pageserver_conn))); - pageserver_disconnect(); + pageserver_disconnect(shard_no); resp = NULL; } else if (rc == -2) { char* msg = pchomp(PQerrorMessage(pageserver_conn)); - pageserver_disconnect(); + pageserver_disconnect(shard_no); neon_log(ERROR, "pageserver_receive disconnect because could not read COPY data: %s", msg); } else { - pageserver_disconnect(); + pageserver_disconnect(shard_no); neon_log(ERROR, "pageserver_receive disconnect because unexpected PQgetCopyData return value: %d", rc); } } PG_CATCH(); { neon_log(LOG, "pageserver_receive disconnect due to caught exception"); - pageserver_disconnect(); + pageserver_disconnect(shard_no); PG_RE_THROW(); } PG_END_TRY(); @@ -371,9 +491,10 @@ pageserver_receive(void) static bool -pageserver_flush(void) +pageserver_flush(shardno_t shard_no) { - if (!connected) + PGconn* pageserver_conn = page_servers[shard_no].conn; + if (!pageserver_conn) { neon_log(WARNING, "Tried to flush while disconnected"); } @@ -382,7 +503,7 @@ pageserver_flush(void) if (PQflush(pageserver_conn)) { char *msg = pchomp(PQerrorMessage(pageserver_conn)); - pageserver_disconnect(); + pageserver_disconnect(shard_no); neon_log(LOG, "pageserver_flush disconnect because failed to flush page requests: %s", msg); pfree(msg); return false; @@ -502,4 +623,5 @@ pg_init_libpagestore(void) prev_signal_handler = pqsignal(SIGHUP, pageserver_sighup_handler); lfc_init(); + psm_init(); } diff --git a/pgxn/neon/pagestore_client.h b/pgxn/neon/pagestore_client.h index d61f74b5c8..8b7b4422c5 100644 --- a/pgxn/neon/pagestore_client.h +++ b/pgxn/neon/pagestore_client.h @@ -20,12 +20,25 @@ #include RELFILEINFO_HDR #include "storage/block.h" #include "storage/smgr.h" +#include "storage/buf_internals.h" #include "lib/stringinfo.h" #include "libpq/pqformat.h" #include "utils/memutils.h" #include "pg_config.h" +#define MAX_SHARDS 128 +#define STRIPE_SIZE (256 * 1024 / 8) /* TODO: should in betaken from control plane? */ +#define MAX_PS_CONNSTR_LEN 128 + +typedef struct +{ + size_t n_shards; + size_t update_counter; + char shard_connstr[MAX_SHARDS][MAX_PS_CONNSTR_LEN]; +} ShardMap; + + typedef enum { /* pagestore_client -> pagestore */ @@ -144,11 +157,13 @@ extern char *nm_to_string(NeonMessage * msg); * API */ +typedef unsigned shardno_t; + typedef struct { - bool (*send) (NeonRequest * request); - NeonResponse *(*receive) (void); - bool (*flush) (void); + bool (*send) (shardno_t shard_no, NeonRequest * request); + NeonResponse *(*receive) (shardno_t shard_no); + bool (*flush) (shardno_t shard_no); } page_server_api; extern void prefetch_on_ps_disconnect(void); @@ -165,6 +180,8 @@ extern char *neon_tenant; extern bool wal_redo; extern int32 max_cluster_size; +extern shardno_t get_shard_number(BufferTag* tag); + extern const f_smgr *smgr_neon(BackendId backend, NRelFileInfo rinfo); extern void smgr_init_neon(void); extern void readahead_buffer_resize(int newsize, void *extra); diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index 3a841b04ec..e468a2596c 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -164,6 +164,7 @@ typedef struct PrefetchRequest { XLogRecPtr actual_request_lsn; NeonResponse *response; /* may be null */ PrefetchStatus status; + shardno_t shard_no; uint64 my_ring_index; } PrefetchRequest; @@ -225,6 +226,8 @@ typedef struct PrefetchState { /* the buffers */ prfh_hash *prf_hash; + int max_shard_no; + uint8 shard_bitmap[(MAX_SHARDS + 7)/8]; PrefetchRequest prf_buffer[]; /* prefetch buffers */ } PrefetchState; @@ -313,6 +316,7 @@ compact_prefetch_buffers(void) Assert(target_slot->status == PRFS_UNUSED); target_slot->buftag = source_slot->buftag; + target_slot->shard_no = source_slot->shard_no; target_slot->status = source_slot->status; target_slot->response = source_slot->response; target_slot->effective_request_lsn = source_slot->effective_request_lsn; @@ -477,6 +481,23 @@ prefetch_cleanup_trailing_unused(void) } } + +static bool +prefetch_flush_requests(void) +{ + for (shardno_t shard_no = 0; shard_no < MyPState->max_shard_no; shard_no++) + { + if (MyPState->shard_bitmap[shard_no >> 3] & (1 << (shard_no & 7))) + { + if (!page_server->flush(shard_no)) + return false; + MyPState->shard_bitmap[shard_no >> 3] &= ~(1 << (shard_no & 7)); + } + } + MyPState->max_shard_no = 0; + return true; +} + /* * Wait for slot of ring_index to have received its response. * The caller is responsible for making sure the request buffer is flushed. @@ -492,7 +513,7 @@ prefetch_wait_for(uint64 ring_index) if (MyPState->ring_flush <= ring_index && MyPState->ring_unused > MyPState->ring_flush) { - if (!page_server->flush()) + if (!prefetch_flush_requests()) return false; MyPState->ring_flush = MyPState->ring_unused; } @@ -530,7 +551,7 @@ prefetch_read(PrefetchRequest *slot) Assert(slot->my_ring_index == MyPState->ring_receive); old = MemoryContextSwitchTo(MyPState->errctx); - response = (NeonResponse *) page_server->receive(); + response = (NeonResponse *) page_server->receive(slot->shard_no); MemoryContextSwitchTo(old); if (response) { @@ -682,12 +703,14 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force Assert(slot->response == NULL); Assert(slot->my_ring_index == MyPState->ring_unused); - while (!page_server->send((NeonRequest *) &request)); + while (!page_server->send(slot->shard_no, (NeonRequest *) &request)); /* update prefetch state */ MyPState->n_requests_inflight += 1; MyPState->n_unused -= 1; MyPState->ring_unused += 1; + MyPState->shard_bitmap[slot->shard_no >> 3] |= 1 << (slot->shard_no & 7); + MyPState->max_shard_no = Max(slot->shard_no+1, MyPState->max_shard_no); /* update slot state */ slot->status = PRFS_REQUESTED; @@ -847,6 +870,7 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls * function reads the buffer tag from the slot. */ slot->buftag = tag; + slot->shard_no = get_shard_number(&tag); slot->my_ring_index = ring_index; prefetch_do_request(slot, force_latest, force_lsn); @@ -857,7 +881,7 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls if (flush_every_n_requests > 0 && MyPState->ring_unused - MyPState->ring_flush >= flush_every_n_requests) { - if (!page_server->flush()) + if (!prefetch_flush_requests()) { /* Prefetch set is reset in case of error, so we should try to register our request once again */ goto Retry; @@ -872,11 +896,34 @@ static NeonResponse * page_server_request(void const *req) { NeonResponse* resp; + BufferTag tag = {0}; + shardno_t shard_no; + + switch (((NeonRequest *) req)->tag) + { + case T_NeonExistsRequest: + CopyNRelFileInfoToBufTag(tag, ((NeonExistsRequest *) req)->rinfo); + break; + case T_NeonNblocksRequest: + CopyNRelFileInfoToBufTag(tag, ((NeonNblocksRequest *) req)->rinfo); + break; + case T_NeonDbSizeRequest: + NInfoGetDbOid(BufTagGetNRelFileInfo(tag)) = ((NeonDbSizeRequest *) req)->dbNode; + break; + case T_NeonGetPageRequest: + CopyNRelFileInfoToBufTag(tag, ((NeonGetPageRequest *) req)->rinfo); + tag.blockNum = ((NeonGetPageRequest *) req)->blkno; + break; + default: + elog(ERROR, "Unexpected request tag: %d", ((NeonRequest *) req)->tag); + } + shard_no = get_shard_number(&tag); + do { - while (!page_server->send((NeonRequest *) req) || !page_server->flush()); + while (!page_server->send(shard_no, (NeonRequest *) req) || !page_server->flush(shard_no)); MyPState->ring_flush = MyPState->ring_unused; consume_prefetch_responses(); - resp = page_server->receive(); + resp = page_server->receive(shard_no); } while (resp == NULL); return resp;